IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 使用 Flink CDC 实现 MySQL 数据实时入 Apache Doris -> 正文阅读

[大数据]使用 Flink CDC 实现 MySQL 数据实时入 Apache Doris

背景

  1. 现有数据库:mysql
  2. 数据:库表较多,每个企业用户一个分库,每个企业下的表均不同,无法做到聚合,且表可以被用户随意改动,增删改列等,增加表
  3. 分析:用户自定义分析,通过拖拽定义图卡,要求实时,点击确认即出现相应结果,其中有无法预判的过滤
  4. 问题:随业务增长,企业用户越来越多,mysql压力越来越大,已经出现一些图卡加载过慢[mysql sql]

同步流程

  1. 脚本读取mysql中需要同步的企业,在获取需要同步的表,以字段 member_id,table 字段存储doris中表A,
  2. 脚本读取doris 表A数据,获取mysql中的schema,通过转换,获取doris建表语句,连接doris执行语句
  3. cancel flink 任务,并重新启动flink任务
    1. 每次重启连接doris 表A,获取database与table,组装?databaseList,tableList,以达到重启时重置监听表,联合checkpoint与cdc参数 scanNewlyAddedTableEnabled 增量同步旧表,全量同步新表
    2. doris目前还不支持同步数据时同步修改表结构【据大佬说应该1.2+会支持】,不过cdc可以获取ddlsql,可以通过jdbc的方式连接doris去执行ddlsql,因为sql有点差异,需要转换才能执行
    3. 在将数据导入之doris时,速度导入过快都会出现导入失败,-235错误,可以使用控制读取binlog数量+window聚合 去批量导入
      ?? ?如需要导入表B的数据有{"id":1,"name":"小明"},{"id":2,"name":"小红"},如果执行两次put显然时不太合理的,可以使用jsonArr的方式[{"id":1,"name":"小明"},{"id":2,"name":"小红"}]一次导入

代码

? ? ? ? python 带码不在赘述,git:GitHub - xiaofeicn/MysqlToDorisTable

? ? ? ? Flink CDC

? ? ? ? ??flink中需要感知新表,每日重启时获取doris 表A数据,并组装成databaseList,tableList的参数,代码如下,代码有注释

????????FlinkCDCMysql2Doris.scala

package com.xxx.mysql2doris


import org.apache.flink.streaming.api.TimeCharacteristic
import com.xxx.util.{DorisStreamLoad, PropertiesManager, PropertiesUtil, SinkDoris, SinkSchema}
import com.ververica.cdc.connectors.mysql.source.MySqlSource
import com.ververica.cdc.connectors.mysql.table.StartupOptions
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSource, DataStreamUtils}
import org.apache.flink.streaming.api.environment.{CheckpointConfig, StreamExecutionEnvironment}
import org.slf4j.{Logger, LoggerFactory}
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters.asScalaIteratorConverter

object FlinkCDCMysql2Doris {

  PropertiesManager.initUtil()
  val props: PropertiesUtil = PropertiesManager.getUtil
  val log: Logger = LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 并行度
    env.setParallelism(props.parallelism)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    /**
     * checkpoint的相关设置
     */
    // 启用检查点,指定触发checkpoint的时间间隔(单位:毫秒,默认500毫秒),默认情况是不开启的
    env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE)
    // 设定Checkpoint超时时间,默认为10分钟
    env.getCheckpointConfig.setCheckpointTimeout(60000)

    /** 设定两个Checkpoint之间的最小时间间隔,防止出现例如状态数据过大而导致Checkpoint执行时间过长,从而导致Checkpoint积压过多
     * 最终Flink应用密切触发Checkpoint操作,会占用了大量计算资源而影响到整个应用的性能(单位:毫秒) */
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(10000)
    // 默认情况下,只有一个检查点可以运行
    // 根据用户指定的数量可以同时触发多个Checkpoint,进而提升Checkpoint整体的效率
    //    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    /** 外部检查点
     * 不会在任务正常停止的过程中清理掉检查点数据,而是会一直保存在外部系统介质中,另外也可以通过从外部检查点中对任务进行恢复 */
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    /** 如果有更近的保存点时,是否将作业回退到该检查点 */
    env.getCheckpointConfig.setPreferCheckpointForRecovery(true)
    // 设置可以允许的checkpoint失败数
    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3)

    /**
     * 重启策略的配置
     */
    // 重启3次,每次失败后等待10000毫秒
    //    env.setRestartStrategy(RestartStrategies.failureRateRestart(5, Time.of(3, TimeUnit.MINUTES), Time.of(30, TimeUnit.SECONDS)))
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L))

    /**
     * 获取同步表配置
     * database table
     */
    val inputMysql = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
      .setDrivername("com.mysql.jdbc.Driver")
      .setDBUrl("jdbc:mysql://%s:%d/%s".format(props.sync_config_host, props.sync_config_port, props.sync_config_db))
      .setUsername(props.sync_config_user)
      .setPassword(props.sync_config_password)
      .setQuery("select member_id,sync_table from %s.%s".format(props.sync_config_db, props.sync_config_table))
      .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
      .finish()).uid("inputMysql")


    val databaseName: DataStream[String] = inputMysql.map(line => line.getField(0).toString).uid("databaseName")
    val tableName: DataStream[String] = inputMysql.map(line => {
      val database = line.getField(0).toString
      val table = line.getField(1).toString
      database + "." + table
    }).uid("tableName")

    val databaseIter = databaseName.executeAndCollect().asScala
    // val databaseIter = DataStreamUtils.collect(databaseName).asScala
    // 最终database,用作flink cdc mysql databaseList 参数
    // database1,database2,database3
    val databaseList = databaseIter.toList.mkString(",")

    val tableIter = tableName.executeAndCollect().asScala
    // 最终table,用作flink cdc mysql tableList 参数
    // database1.table1,database1.table2,database2.table1
    val tableList = tableIter.toList.mkString(",")
    log.info(databaseList)
    log.info(tableList)

    /**
     *
     * mysql source for doris
     */
    val mySqlSource = MySqlSource.builder[String]()
      .hostname(props.rds_host)
      .port(props.rds_port)
      .databaseList(databaseList)
      .tableList(tableList)
      .username(props.rds_user)
      .password(props.rds_password)
      .splitSize(props.split_size)
      .fetchSize(props.fetch_size)

      //      .startupOptions(StartupOptions.latest())
      // 全量读取
      .startupOptions(StartupOptions.initial())
      .includeSchemaChanges(true)
      // 发现新表,加入同步任务,需要在tableList中配置
      .scanNewlyAddedTableEnabled(true)
      .deserializer(new JsonDebeziumDeserializationSchema()).build()

    val dataStreamSource: DataStreamSource[String] = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
    // doris赞不支持修改列名
    val ddlSqlStream: DataStream[String] = dataStreamSource.filter(line => line.contains("historyRecord") && !line.contains("CHANGE COLUMN")).uid("ddlSqlStream")
    // data
    val dmlStream: DataStream[String] = dataStreamSource.filter(line => !line.contains("historyRecord") && !line.contains("CHANGE COLUMN")).uid("dmlStream")

    val ddlDataStream = FlinkCDCBinLogETL.ddlFormat(ddlSqlStream)
    val dmlDataStream = FlinkCDCBinLogETL.binLogETL(dmlStream)

    val dorisStreamLoad = new DorisStreamLoad(props)
//    ddlDataStream.print()
    //    byKeyData.print()
    ddlDataStream.addSink(new SinkSchema(props)).name("ALTER TABLE TO DORIS").uid("SinkSchema")
    dmlDataStream.addSink(new SinkDoris(dorisStreamLoad)).name("Data TO DORIS").uid("SinkDoris")
    env.execute("Flink CDC Mysql To Doris With Initial")



  }

  case class dataLine(merge_type: String, db: String, table: String, data: String)


}
FlinkCDCBinLogETL.scala
package com.xxx.mysql2doris

import net.sf.json.JSONObject
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.api.java.tuple.Tuple4
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSource}
import org.apache.flink.streaming.api.windowing.time.Time

import scala.util.matching.Regex

object FlinkCDCBinLogETL {

  def binLogETL(dataStreamSource: DataStream[String]): DataStream[org.apache.flink.api.java.tuple.Tuple4[String, String, String, String]] = {
    /**
     * 根据不同日志类型 匹配load doris方式
     */
    val tupleData: DataStream[org.apache.flink.api.java.tuple.Tuple4[String, String, String, String]] = dataStreamSource.map(line => {
      var data: JSONObject = null
      var mergetype = "APPEND"
      val lineObj = JSONObject.fromObject(line)

      val source = lineObj.getJSONObject("source")
      val db = source.getString("db")
      val table = source.getString("table")
      if ("d" == lineObj.getString("op")) {
        val oo = lineObj.getJSONObject("before")
        data = lineObj.getJSONObject("before")
        mergetype = "DELETE"
      } else if ("u" == lineObj.getString("op")) {
        data = lineObj.getJSONObject("after")
        mergetype = "MERGE"
      } else if ("c" == lineObj.getString("op")) {
        data = lineObj.getJSONObject("after")
      } else if ("r" == lineObj.getString("op")) {
        data = lineObj.getJSONObject("after")
        mergetype = "APPEND"
      }
      new Tuple4[String, String, String, String](mergetype, db, table, data.toString)
    }).returns(TypeInformation.of(new TypeHint[Tuple4[String, String, String, String]] {}))

    /**
     * 窗口聚合数据,将相同load方式,db,table的json 数据组合为长字符串,
     */
    val byKeyData: DataStream[org.apache.flink.api.java.tuple.Tuple4[String, String, String, String]] = tupleData.keyBy(0, 1, 2)
      .timeWindow(Time.seconds(5))
      .reduce((itemFirst, itemSecond) => new Tuple4(itemFirst.f0, itemFirst.f1, itemFirst.f2, itemFirst.f3 + "=-=-=" + itemSecond.f3))
    byKeyData
  }

  def ddlFormat(ddlDataStream: DataStream[String]): DataStream[String] = {

    val ddlStrDataStream: DataStream[String] = ddlDataStream.map(line => {

      val lineObj = JSONObject.fromObject(line)
      val historyRecord = lineObj.getString("historyRecord")
      var ddl = JSONObject.fromObject(historyRecord).getString("ddl").replace("\r\n", " ")
      println(ddl.replace("\t", " ").replace("\n", ""))
      println("-" * 10)

      if (ddl.contains("varchar")) {
        val pattern = new Regex("varchar\\([0-9]+\\)")
        val v = showCapital(pattern.findFirstIn(ddl))
        val patternLen = new Regex("[0-9]+")
        val lastLen = showCapital(patternLen.findFirstIn(v)).toInt * 3
        val lastType = "varchar(%d)".format(lastLen)
        ddl = ddl.replace(v, lastType)
      }
      // 适配doris类型
      ddl=ddl.replaceAll("TEXT|VARBINARY|varbinary","STRING")
      if (ddl.contains("CHARACTER")) {
        val pattern = new Regex(("CHARACTER[ ]+[a-zA-Z0-9_]+[ ]+[a-zA-Z0-9_]+[ ]+COLLATE[ ]+[a-zA-Z0-9_]+"))
        val rep = showCapital(pattern.findFirstIn(ddl))
        ddl = ddl.replace(rep, "")
      }
      val pattern=new Regex("AFTER[ ]+`.*")
      val after=showCapital(pattern.findFirstIn(ddl))
      ddl.replace("NOT NULL","NULL").replace(after,"")
    })
    ddlStrDataStream
  }

  def showCapital(x: Option[String]): String = x match {
    case Some(s) => s
    case None => "?"
  }

}
DorisStreamLoad.scala
package com.xxx.util

import net.sf.json.JSONObject
import net.sf.json.JSONArray
import org.apache.http.HttpHeaders
import org.apache.http.client.methods.HttpPut
import org.apache.http.entity.StringEntity
import org.apache.http.entity.BufferedHttpEntity
import org.apache.http.impl.client.{DefaultRedirectStrategy, HttpClientBuilder, HttpClients}
import org.apache.http.util.EntityUtils
import org.slf4j.{Logger, LoggerFactory}
import org.apache.commons.codec.binary.Base64

import java.io.IOException
import java.nio.charset.StandardCharsets
import java.util.UUID

class DorisStreamLoad(props: PropertiesUtil) extends Serializable {


  lazy val httpClientBuilder: HttpClientBuilder = HttpClients.custom.setRedirectStrategy(new DefaultRedirectStrategy() {
    override protected def isRedirectable(method: String): Boolean = {
      // If the connection target is FE, you need to deal with 307 redirect。
      true
    }
  })


  def loadJson(jsonData: String, mergeType: String, db: String, table: String): Unit = try {
    val loadUrlPattern = "http://%s/api/%s/%s/_stream_load?"
    val arr = jsonData.split("=-=-=")
    val jsonArray = new JSONArray()
    for (line <- arr) {
      try {
        val js = JSONObject.fromObject(line)
        jsonArray.add(js)
      } catch {
        case e: Exception =>
          println(e)
          println(line)
      }

    }
    val jsonArrayStr = jsonArray.toString()
    val client = httpClientBuilder.build
    val loadUrlStr = String.format(loadUrlPattern, props.doris_load_host, db, table)
    try {
      val put = new HttpPut(loadUrlStr)
      put.removeHeaders(HttpHeaders.CONTENT_LENGTH)
      put.removeHeaders(HttpHeaders.TRANSFER_ENCODING)
      put.setHeader(HttpHeaders.EXPECT, "100-continue")
      put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader)
      val label = UUID.randomUUID.toString
      // You can set stream load related properties in the Header, here we set label and column_separator.
      put.setHeader("label", label)
      put.setHeader("merge_type", mergeType)
      //      put.setHeader("two_phase_commit", "true")
      put.setHeader("column_separator", ",")
      put.setHeader("format", "json")
      put.setHeader("strip_outer_array", "true")
      put.setHeader("exec_mem_limit", "6442450944")
      // Set up the import file. Here you can also use StringEntity to transfer arbitrary data.

      //      for (jsonData <- arr){
      val entity = new StringEntity(jsonArrayStr, "UTF-8")

      put.setEntity(entity)

      //      }

      try {
        val response = client.execute(put)
        println(response)
        try {
          var loadResult = ""
          if (response.getEntity != null) {
            loadResult = EntityUtils.toString(response.getEntity)
          }
          val statusCode = response.getStatusLine.getStatusCode
          if (statusCode != 200) {
            throw new IOException("Stream load failed. status: %s load result: %s".format(statusCode, loadResult))
          }

        } finally if (response != null) {
          response.close()
        }
      }
    }
    finally
      if (client != null) client.close()
  }

  /**
   * Construct authentication information, the authentication method used by doris here is Basic Auth
   *
   */
  def basicAuthHeader: String = {
    val tobeEncode = props.doris_user + ":" + props.doris_password
    val encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8))
    "Basic " + new String(encoded)
  }


}
SinkDoris.scala
package com.xxx.util


import org.apache.flink.api.java.tuple.Tuple4
import com.xxx.mysql2doris.FlinkCDCMysql2Doris.dataLine
import org.apache.flink.streaming.api.functions.sink.SinkFunction


class SinkDoris(dorisStreamLoad:DorisStreamLoad) extends SinkFunction[Tuple4[String, String, String, String]]  {

//  val  dorisStreamLoad:DorisStreamLoadT=null
  /**
   * 在open()方法中建立连接,这样不用每次invoke的时候都要建立连接和释放连接。
   */
//   def open(parameters: Configuration): Unit = {
//     super
//     super.open(parameters);
//
//  }

  /**
   * 每个元素的插入都要调用一次invoke()方法进行插入操作
   */
  override def invoke(value:Tuple4[String, String, String, String]): Unit = {

    dorisStreamLoad.loadJson(value.f3,value.f0,value.f1,value.f2)

  }

}
SinkSchema.scala
package com.xxx.util

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

import java.sql.{Connection, DriverManager, PreparedStatement}

class SinkSchema(props:PropertiesUtil) extends RichSinkFunction[String]  {
  var conn: Connection = _
  var ps : PreparedStatement  = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    conn = DriverManager.getConnection("jdbc:mysql://%s:%d/%s".format(props.sync_config_host, props.sync_config_port, props.sync_config_db),props.sync_config_user,props.sync_config_password)
    conn.setAutoCommit(false)
  }

  override def close(): Unit = {
    super.close()
    if (conn != null) {
      conn.close()
    }
    if (ps != null) {
      ps.close()
    }
  }

  override def invoke(sql: String, context: SinkFunction.Context): Unit = {
    super.invoke(sql, context)
    ps = conn.prepareStatement(sql)
    try {
      ps.execute()
    }catch {
//      val sql="insert into syserror_table"
      case ex:Exception=>println(ex)
    }

    conn.commit()
  }

}
PropertiesUtil.scala
package com.xxx.util

import java.io.FileInputStream
import java.util.Properties

/**
 * propertiesUtil
 *
 */
class PropertiesUtil extends Serializable {


  private val props = new Properties()

  var doris_host =""
  var doris_port =0
  var doris_user =""
  var doris_password =""

  var database_list =""
  var table_list =""

  var mysql_host =""
  var mysql_port =0
  var mysql_user =""
  var mysql_password =""
  var doris_load_host=""

  var rds_host=""
  var rds_port=0
  var rds_user=""
  var rds_password=""
  var rds_database=""

  var sync_database_select_sql=""
  var sync_table_select_sql=""
  var sync_config_host=""
  var sync_config_port=0
  var sync_config_user=""
  var sync_config_password=""
  var sync_config_db=""
  var sync_config_table=""

  var parallelism=0
  var split_size=0
  var fetch_size=0

  var bootstrap_servers=""
  var topic=""
  var group_id=""
  var offset_mode=""

  def init(filePath: String): Unit = {
    props.load(new FileInputStream(filePath))

    // hdfs
    doris_host = props.getProperty("doris_host")
    doris_port = props.getProperty("doris_port").toInt
    doris_user = props.getProperty("doris_user")
    doris_password = props.getProperty("doris_password")

    database_list = props.getProperty("database_list")
    table_list = props.getProperty("table_list")

    mysql_host = props.getProperty("mysql_host")
    mysql_port = props.getProperty("mysql_port").toInt
    mysql_user = props.getProperty("mysql_user")
    mysql_password = props.getProperty("mysql_password")
    doris_load_host=doris_host+":"+doris_port


    rds_host=props.getProperty("rds_host")
    rds_port=props.getProperty("rds_port").toInt
    rds_user=props.getProperty("rds_user")
    rds_password=props.getProperty("rds_password")
    rds_database=props.getProperty("rds_database")

    sync_database_select_sql=props.getProperty("sync_database_select_sql")
    sync_table_select_sql=props.getProperty("sync_table_select_sql")

    sync_config_host=props.getProperty("sync_config_host")
    sync_config_port=props.getProperty("sync_config_port").toInt
    sync_config_user=props.getProperty("sync_config_user")
    sync_config_password=props.getProperty("sync_config_password")
    sync_config_db=props.getProperty("sync_config_db")
    sync_config_table=props.getProperty("sync_config_table")


    parallelism=props.getProperty("parallelism").toInt
    split_size=props.getProperty("split_size").toInt
    fetch_size=props.getProperty("fetch_size").toInt

    bootstrap_servers=props.getProperty("bootstrap_servers")
    topic=props.getProperty("topic")
    group_id=props.getProperty("group_id")
    offset_mode=props.getProperty("offset_mode")



  }

  def stringToInt(prop: String): Int = {
    try {
      prop.toInt
    } catch {
      case ex: Exception => {
        0
      }
    }
  }
}

//惰性单例,真正计算时才初始化对象
object PropertiesManager {
  @volatile private var propertiesUtil: PropertiesUtil = _

  def getUtil: PropertiesUtil = {
    propertiesUtil
  }

  def initUtil(): Unit = {
    var filePath = "config.properties"
//    if (evn.contains("dev")) {
      filePath = this.getClass.getResource("/").toString.replace("file:", "") + "config.properties"
//    } else if (!evn.equals("")) {
//      filePath = "config_" + evn + ".properties"
//    }
    if (propertiesUtil == null) {
      propertiesUtil = new PropertiesUtil
    }
    propertiesUtil.init(filePath)
//    propertiesUtil.evn = evn
  }
}

若有疑问请留言或者 加入857技术社区

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-07-17 16:30:00  更:2022-07-17 16:30:53 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 1:41:13-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码