IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flink CDC (java 版本) -> 正文阅读

[大数据]flink CDC (java 版本)

一.flink CDC ?目前还有许多要完善的,用起来感觉还不错的,我这边自己研究了下.
自己有些心得
1.在flink cdc 自定义反序列化器 测试 修改主键后会终止程序 报错 Recovery is suppressed by NoRestartBackoffTimeStrategy
我这边没时间继续查找研究.希望有人继续研究吧!
2.希望flink cdc 支持 Oracle,这个很重要,成功了也是个壮举啊!
二. mysql的配置 ?mysql肯定都安装 但是初学者找到这个配置确实有点难度
指令:

mysql --help|grep my.cnf

查看 mysql 配置文件的位置 可能会有几个但是只有一个是有效的
我这边是这个/etc/my.cnf
添加配置:?

max_allowed_packet=1024M
server-id=1
log-bin=mysql-bin
binlog_format=row?
binlog-do-db=ssm ?# 这个是很重要,不然会监控mysql中所有的数据库.

?三.报错问题

这个报错主要还是idea中内置的java source版本过低了,这个不提供解决方法了 百度下 有很多解决方案,但是值得一说的是 要找全面.

四.撸代码

需要的依赖包含flink CDC 和flinkSQL cdc 放在一起.

<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.12.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.1.1</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>


    </dependencies>

?(1)flink CDC 部分(这里面的断点续传只是演示下 没有具体的研究,由于flink版本 和flink高版本需要整合hadoop的问题,没在研究.)

public class flinkCDC {

    public static void main(String[] args) throws Exception {


        //1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2.flinkcdc 做断点续传,需要将flinkcdc读取binlog的位置信息以状态方式保存在checkpoint中即可.

        //(1)开启checkpoint 每隔5s 执行一次ck 指定ck的一致性语义

       env.enableCheckpointing(5000L);
     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        //3.设置任务关闭后,保存最后后一次ck数据.
       env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,2000L));

       env.setStateBackend(new FsStateBackend("hdfs://192.168.1.204:9000/flinkCDC"));
        //4.设置访问HDFS的用户名
        System.setProperty("HADOOP_USER_NAME","root");

        //5.创建Sources数据源
        Properties prop = new Properties();
        prop.setProperty("scan.startup.mode","initial");  //"scan.startup.mode","initial" 三种要补充解释下

        DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
                .hostname("192.168.1.205")
                .port(3306)
                .username("root")
                .password("Root@123")
                .tableList("ssm.order") //这里的表名称,书写格式:db.table  不然会报错
                .databaseList("ssm")
                .debeziumProperties(prop)
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();
        
        //6.添加数据源
        DataStreamSource<String> source = env.addSource(mysqlSource);

         //7.打印数据
        source.print();

        //8.执行任务

        env.execute();

    }
}

(2)flink SQL版本 在这里 需要注意这个mysql的格式

public class flinkSQL_CDC {

    public static void main(String[] args) throws Exception {

        //1.创建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableENV = StreamTableEnvironment.create(env);

        //创建一个虚拟机的表 order_info 真实存在于mysql中的表是 ssm.order
        tableENV.executeSql("CREATE TABLE order_info(\n" +
                "orderid INT,\n" +
                "orderNmae STRING,\n" +
                "orderAddr STRING,\n" +
                "orderTime DATE\n" +
                ")WITH(\n" +
                "'connector' = 'mysql-cdc',\n" +
                "'hostname' = '192.168.1.205',\n" +
                "'port' = '3306',\n" +
                "'username' = 'root',\n" +
                "'password' = 'Root@123',\n" +
                "'database-name' = 'ssm',\n" +
                "'table-name' = 'order'\n" +
                ")");

       tableENV.executeSql("select * from order_info ").print();
        env.execute();
    }
}

(3)自定义反序列化器

/**
 * @author quruiwei
 * @version 1.0
 * @date 2021/7/13 13:33
 * @descreption 自定义反序列化器
 */
public class flinkCDC_custom_deserializer {

    public static void main(String[] args) throws Exception {

        //1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        //5.创建Sources数据源
        Properties prop = new Properties();
        prop.setProperty("scan.startup.mode","initial");
        DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
                .hostname("192.168.1.205")
                .port(3306)
                .username("root")
                .password("Root@123")
                .tableList("ssm.order") //这里的表名称,书写格式:db.table  不然会报错
                .databaseList("ssm")
                .debeziumProperties(prop)
                .deserializer(new DebeziumDeserializationSchema<String>() {
                    @Override
                    public TypeInformation<String> getProducedType() {
                        return TypeInformation.of(String.class);
                    }

                    @Override
                    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
                        //获取topic topic包含着数据库和表名
                        String[] split = sourceRecord.topic().split("\\.");
                        String db = split[1];
                        String tableName = split[2];
                        //System.out.println(sourceRecord.topic());
                        //System.out.println(db);
                        //System.out.println(tableName);

                        //获取操作类型 增删查改
                        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
                        //System.out.println(operation);
                        //获取数据的操作
                        Struct struct = (Struct) sourceRecord.value();
                        Struct after = (Struct) struct.get("after");
                        JSONObject data = new JSONObject(); //创建json 将数据存放在json中,成为json格式
                        for (Field field : after.schema().fields()) {
                            //System.out.println(field);
                            Object o = after.get(field);
                            data.put(field.name(),o);
                        }

                        JSONObject resout = new JSONObject();
                        resout.put("operation",operation.toString().toLowerCase());
                        resout.put("data",data);
                        resout.put("database",db);
                        resout.put("table",tableName);

                        //查看下封装的数据
                        //System.out.println(resout); //{"database":"ssm","data":{"orderTime":18828,"orderid":8,"orderNmae":"11","orderAddr":"1"},"operation":"update","table":"order"}
                        collector.collect(resout.toJSONString());
                    }
                })
                .build();
        //6.添加数据源
        env.addSource(mysqlSource).print();
        
        //8.执行任务
        env.execute();
    }
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-14 10:59:11  更:2021-07-14 11:01:52 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/3 5:58:18-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码