IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 利用Flink-CDC和flink-doris-connector技术实现Mysql数据全量或增量同步至Doris -> 正文阅读

[大数据]利用Flink-CDC和flink-doris-connector技术实现Mysql数据全量或增量同步至Doris

简介

Flink CDC:

? 解决了传统数据库实时同步的痛点, 该技术抛弃了其他第三方组件(例如Kafka等),能够实时读取Mysql master节点全量和增量数据,能够捕获所有数据的变化,同时它完全与业务解耦,运维也及其简单。具体介绍请参考:Flink_CDC搭建及简单使用flink-cdc-connectors

Apache Doris:

它是一个现代化的MPP分析型数据库产品。仅需亚秒级响应时间即可获得查询结果,有效地支持实时数据分析。Apache Doris的分布式架构非常简洁,易于运维,并且可以支持10PB以上的超大数据集。

Apache Doris可以满足多种数据分析需求,例如固定历史报表,实时数据分析,交互式数据分析和探索式数据分析等。令您的数据分析工作更加简单高效!目前Apache Doris已成为Apache顶级项目,成为许多一线大厂实时数据分析数据仓库的不二选择。具体请参考:Apache Doris

技术选型

? 本次采用Flink-CDC和flink-doris-connector技术利用FLink DataStream的方式实现Mysql全量或增量数据同步至分析性数据仓库Doris中。

版本选择:

FlinkFlink-doris-connectorFlink-CDCDoris
1.11.x1.11.6-2.12-xx1.0.0/1.1.00.13.+
1.12.x1.12.7-2.12-xx1.2.0/1.3.00.13.+
1.13.x1.13.5-2.12-xx1.4.0 / 2.0.* / 2.1.* / 2.2.*0.13.+
1.14.x1.14.4-2.12-xx2.2.*0.13.+

注:Flink-doris-connector版本号解读:例如1.13.5-2.12-1.0.1 表示flink 版本 1.13.5,scala 版本 2.12,connector 版本 1.0.1。

具体实现

本次代码构建选择的版本号为:

FlinkFlink-doris-connectorFlink-CDCDoris
1.13.61.0.32.2.00.15.13
导入依赖:
        <!--flink cdc -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.2.0</version>
        </dependency>


        <!-- flink  -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.13.6</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.13.6</version>
            <scope>provided</scope>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.13.6</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>1.13.6</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.13.6</version>
            <scope>provided</scope>
        </dependency>


        <!-- flink doris connector -->
        <dependency>
            <groupId>org.apache.doris</groupId>
            <artifactId>flink-doris-connector-1.13_2.12</artifactId>
            <version>1.0.3</version>
        </dependency>
代码实现:
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        // enable checkpoint
        env.enableCheckpointing(3000);
    
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("127.0.0.1")
                .port(3306)
                .databaseList("test") // set captured database
                .tableList("test.test_cdc") // set captured table
                .username("bigdata")
                .password("123456")
                .deserializer(new JsonDebeziumDeserializationSchema()).build();

        Properties pro = new Properties();
        pro.setProperty("format", "json");
        pro.setProperty("strip_outer_array", "true");

    
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql source").map(line ->JSONObject.parseObject(line).getString("after")).addSink(DorisSink.sink(DorisExecutionOptions.builder()
                        .setBatchSize(3)  //批导入条数大小
                        .setBatchIntervalMs(10L)  //批量导入最大间隔时间
                        .setMaxRetries(3)   //最大重试次数
                        .setStreamLoadProp(pro)
                        .build(),
                DorisOptions.builder()
                        .setFenodes("FE_IP:8030")
                        .setTableIdentifier("test.test_cdc_sink")
                        .setUsername("root")
                        .setPassword("").build()));

        env.execute("MYSQL TO Doris Job");

    }

总结:

? 通过Flink CDC和flink doris connector 技术,我们可以很简单的将Mysql中增量或全量数据同步至Doris中,链路短,时效性高,简化了传统数据同步的方式,便于维护。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-21 19:02:54  更:2022-05-21 19:05:40 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 3:54:46-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码