
[Big Data] flink-cdc Basics Tutorial (Part 2, Final), with Error Fixes

Today's installment is here. Let's keep learning and improving together!

Abstract: after this series you should be able to handle most flink-cdc use cases.

Full tutorial contents:

  1. Prerequisite knowledge for flink-cdc: the MySQL binlog

  2. A look at parts of the flink-cdc source code

  3. Hands-on: using flink-cdc with the DataStream API

  4. Hands-on: using flink-cdc with Flink SQL

  5. Comparing the DataStream and FlinkSQL approaches: differences and when to use each

  6. A custom deserializer that makes the resulting stream data more intuitive and easier to use

  7. flink-cdc errors encountered along the way

Keep it up, study hard and improve every day!

Q:

1 Fault-tolerance guarantees of flink-cdc

You can see that the job has been submitted.

You can see that the output should be printed on hadoop102.

After clicking the hadoop102 row, the output is visible:

After I insert a row into MySQL:

You can see the change is successfully captured:

We want a restarted job to pick up where it last consumed data after the job goes down. To do that, we first take a savepoint; the savepoint records the consumption position that we will later restore from:

[myself@hadoop102 ~]$ cd /opt/module/flink-standalone/
[myself@hadoop102 flink-standalone]$ bin/flink savepoint 713d0721e83be877c97f0a846cf6af85 hdfs://hadoop102:8020/flink1109/savepoint   # savepoint command: <job id> <HDFS target path>


SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flink-standalone/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Triggering savepoint for job 713d0721e83be877c97f0a846cf6af85.
Waiting for response...
Savepoint completed. Path: hdfs://hadoop102:8020/flink1109/savepoint/savepoint-713d07-9fd0e5ddd3f3
You can resume your program from this savepoint with the run command.

To see the savepoint in action, I deliberately kill this Flink job,

then insert new rows into MySQL,

get the savepoint directory's path from HDFS,

and run the command:

[myself@hadoop102 flink-standalone]$ bin/flink run -s hdfs://hadoop102:8020/flink1109/savepoint/savepoint-713d07-9fd0e5ddd3f3 -c com.atguigu.Flink01_DataStream flink-cdc-1.0-SNAPSHOT-jar-with-dependencies.jar  




SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flink-standalone/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Job has been submitted with JobID f2efda4055ccd36731b2f64aef7e3c9c

If the job is restarted without the savepoint, it starts consuming from the beginning again.

It successfully resumes from where it last consumed and picks up the new data:

So the DataStream approach gives us resumable consumption: the consumption position is simply kept as Flink state and restored from that state, unlike Flume and Canal, which persist the consumption position to a file.
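
What makes this possible is that the job checkpoints its state to a durable backend. A minimal sketch of that setup, using the same interval and HDFS path that appear later in this tutorial (the class name is just for illustration):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is what turns the CDC source's binlog offset into restorable state.
        env.enableCheckpointing(5000L);                                                 // checkpoint every 5 s
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink1109/ck"));  // HDFS-backed state
        // ... add the MySQLSource from part one and a sink here, then call env.execute()
    }
}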

Q:

2 Using flink-cdc via FlinkSQL

2.2.0 First, look at the example on the official site

https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector

-- creates a mysql cdc table source
CREATE TABLE mysql_binlog (
 id INT NOT NULL,
 name STRING,
 description STRING,
 weight DECIMAL(10,3)
) WITH (   -- the connection options go after WITH
 'connector' = 'mysql-cdc',
 'hostname' = 'localhost',
 'port' = '3306',
 'username' = 'flinkuser',
 'password' = 'flinkpw',
 'database-name' = 'inventory',
 'table-name' = 'products'
);


-- read snapshot and binlog data from mysql, and do some transformation, and show on the client
SELECT id, UPPER(name), description, weight FROM mysql_binlog;

2.2.1 Add the dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
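
The pom above only adds the Blink table planner. The MySQL CDC connector itself is presumably already declared from part one of this series (the project classpath shows flink-connector-mysql-cdc 1.2.0); if it is not, a dependency along these lines is needed as well:

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.2.0</version>
</dependency>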

2.2.2 Code

package com.alibaba;


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;


/**
 * @author zhouyanjun
 * @create 2021-06-22 12:31
 */
public class Flink02_SQL {
    public static void main(String[] args) throws Exception {
        //1. Get the execution environment
        //2. Read the MySQL change data with SQL
        //3. Convert to a stream and print it, so we can see what the data looks like
        //4. Start the job




        //1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);


        //2. Read the MySQL change data with SQL
        tableEnv.executeSql("create table trademark(id string,tm_name string,logo_url string) " +
                "with(" +
                " 'connector' = 'mysql-cdc', " +
                " 'hostname' = 'hadoop102', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = '123456', " +
                " 'database-name' = 'gmall0820flink', " +
                " 'table-name' = 'base_trademark'" +
                ")"
        );
        //3. Convert to a stream and print it, so we can see what the data looks like
        Table table = tableEnv.sqlQuery("select * from trademark"); // this returns a Table object
        tableEnv.toRetractStream(table, Row.class).print(); // convert the dynamic table to a stream; which kind of stream fits best here?
        // start with a retract stream


        //4. Start the job
        env.execute();




    }
}
Connector Options

scan.startup.mode  |  optional  |  default: initial  |  type: String
Startup mode for the MySQL CDC consumer; valid values are "initial" and "latest-offset". See the Startup Reading Position section of the docs for details.
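
For example, a sketch of starting from the latest binlog position instead of taking an initial snapshot (the table name trademark_latest is hypothetical, the connection options are the same as in the Java code above, and the default mode is 'initial'):

create table trademark_latest(id string, tm_name string, logo_url string)
with(
 'connector' = 'mysql-cdc',
 'hostname' = 'hadoop102',
 'port' = '3306',
 'username' = 'root',
 'password' = '123456',
 'database-name' = 'gmall0820flink',
 'table-name' = 'base_trademark',
 'scan.startup.mode' = 'latest-offset'
)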

Output:

(IDEA launch command and the full dependency classpath omitted; it includes Flink 1.12.0, flink-connector-mysql-cdc 1.2.0 and fastjson 1.2.75, main class com.atguigu.Flink02_SQL)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
(true,1,Redmi,null)
(true,2,苹果,/static/default.jpg)
(true,3,华为,/static/default.jpg)
(true,4,TCL,/static/default.jpg)
(true,5,小米,/static/default.jpg)
(true,6,长粒香,/static/default.jpg)
(true,7,金沙河,/static/default.jpg)
(true,8,索芙特,/static/default.jpg)
(true,9,CAREMiLLE,/static/default.jpg)
(true,10,欧莱雅,/static/default.jpg)
(true,11,香奈儿,/static/default.jpg)
(true,12,周大帅哥,null)
(true,13,天下第一帅,null)
六月 22, 2021 1:00:45 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
信息: Connected to hadoop102:3306 at mysql-bin.000007/154 (sid:5811, cid:11)

As you can see, the above is the default startup mode, "initial".

It first reads in the existing data as a snapshot, then connects to the latest binlog position.

Output after we delete a row:

(true,1,Redmi,null)
(true,2,苹果,/static/default.jpg)
(true,3,华为,/static/default.jpg)
(true,4,TCL,/static/default.jpg)
(true,5,小米,/static/default.jpg)
(true,6,长粒香,/static/default.jpg)
(true,7,金沙河,/static/default.jpg)
(true,8,索芙特,/static/default.jpg)
(true,9,CAREMiLLE,/static/default.jpg)
(true,10,欧莱雅,/static/default.jpg)
(true,11,香奈儿,/static/default.jpg)
(true,12,周大帅哥,null)
(true,13,天下第一帅,null)
六月 22, 2021 1:00:45 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
信息: Connected to hadoop102:3306 at mysql-bin.000007/154 (sid:5811, cid:11)
(false,13,天下第一帅,null)    <- the false flag marks a retraction

Now let's update a row:

(true,1,Redmi,null)
(true,2,苹果,/static/default.jpg)
(true,3,华为,/static/default.jpg)
(true,4,TCL,/static/default.jpg)
(true,5,小米,/static/default.jpg)
(true,6,长粒香,/static/default.jpg)
(true,7,金沙河,/static/default.jpg)
(true,8,索芙特,/static/default.jpg)
(true,9,CAREMiLLE,/static/default.jpg)
(true,10,欧莱雅,/static/default.jpg)
(true,11,香奈儿,/static/default.jpg)
(true,12,周大帅哥,null)
(true,13,天下第一帅,null)
六月 22, 2021 1:00:45 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
信息: Connected to hadoop102:3306 at mysql-bin.000007/154 (sid:5811, cid:11)
(false,13,天下第一帅,null)
(false,12,周大帅哥,null)     <- an update produces two records: the old row is retracted, then the new row is added
(true,12,周大帅哥,更新一条信息)

Q:

3 Differences between the two approaches

The difference between the Flink DataStream approach and the FlinkSQL approach:

DataStream can monitor multiple databases and multiple tables.

FlinkSQL can only monitor a single table in a single database, because the CREATE TABLE statement in the code binds the source to exactly one table. Its advantage: the output format is friendlier.

You can see this by comparing the code:

DataStream: a list of databases and a list of tables
.databaseList("gmall0820flink")
.tableList("gmall0820flink.base_trademark")


FlinkSQL: one database name, one table name
" 'database-name' = 'gmall0820flink', " +
" 'table-name' = 'base_trademark'" +

With the DataStream approach it also works to monitor two databases and three tables at the same time (provided binlog has been enabled for those databases in the MySQL configuration first); see the sketch below.
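
A sketch of what that multi-database, multi-table setup could look like in the DataStream builder; the second database and the extra table names are made up for illustration, and both builder methods take a varargs list:

// Same imports as Flink03_DataStreamWithMySchema below; table names must be qualified with the database name.
DebeziumSourceFunction<String> sourceFunction = MySQLSource
        .<String>builder()
        .hostname("hadoop102")
        .port(3306)
        .username("root")
        .password("123456")
        .databaseList("gmall0820flink", "gmall0820flink2")          // two databases (second one hypothetical)
        .tableList("gmall0820flink.base_trademark",
                   "gmall0820flink.base_category1",
                   "gmall0820flink2.some_table")                    // three tables (the extra ones hypothetical)
        .startupOptions(StartupOptions.initial())
        .deserializer(new StringDebeziumDeserializationSchema())
        .build();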

Q:

4 A custom deserializer

We change the deserializer so that the stream data we get is more intuitive and easier to use.

.deserializer(new StringDebeziumDeserializationSchema())

4.1 Code

public class Flink03_DataStreamWithMySchema {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "atguigu");


        //1. Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Enable checkpointing
        env.enableCheckpointing(5000L);
        env.getCheckpointConfig().setCheckpointTimeout(5000L); // checkpoint timeout; checkpoints that take longer are discarded
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink1109/ck"));


        //2. Read MySQL change data via CDC
        DebeziumSourceFunction<String> sourceFunction = MySQLSource
                .<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("gmall0820flink")
                .tableList("gmall0820flink.base_trademark")///可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据。注意写表名的时候,要把库名加上,因为flinkcdc可以监控多个库
                .startupOptions(StartupOptions.initial())//启动的选项/模式。这是个枚举类。共有5种。
//                .startupOptions(StartupOptions.earliest())
//                .startupOptions(StartupOptions.latest())
//                .startupOptions(StartupOptions.specificOffset())
//                .startupOptions(StartupOptions.timestamp())
                .deserializer(new MyDebeziumDeserializationSchema())//首先对这里进行修改。自定义反序列化器。把序列化的二进制文件给反序列化
                .build();


        DataStreamSource<String> StreamSource = env.addSource(sourceFunction); // we now have a stream


        //3. Print
        StreamSource.print();
        //4. Start
        env.execute();
    }
}

Looking at the source:

.deserializer(new StringDebeziumDeserializationSchema())
Click into it:


/**
 * The deserializer used to convert from consumed {@link org.apache.kafka.connect.source.SourceRecord}.
 */
public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
  this.deserializer = deserializer;
  return this;
}


Click further in, and you find it is an interface, so in our own code we have to implement this interface:
@PublicEvolving
public interface DebeziumDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    void deserialize(SourceRecord var1, Collector<T> var2) throws Exception;


    // The <T> in DebeziumDeserializationSchema<T> is handed to Collector<T>, and Collector<T>
    // is what emits data downstream, so T is simply the output data type.
}

When we are not sure how to write it, look at how others did it in the source code:

.deserializer(new StringDebeziumDeserializationSchema())   (click into it)


You can see the following:




public class StringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
    private static final long serialVersionUID = -3168848963265670603L;


    public StringDebeziumDeserializationSchema() {
    }


    public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
        out.collect(record.toString());  // because it just calls toString(), the default output cannot be used directly; it is very unintuitive
    }


    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;   // we can copy this part and use the same approach in our own schema
    }
}

Here is what the default deserializer gives us: it is an object, not JSON (otherwise converting JSON to JSON would be pointless):

SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000004, pos=154, row=1, snapshot=true}} 
ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=1,tm_name=Redmi},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall0820flink,table=base_trademark,server_id=0,file=mysql-bin.000004,pos=154,row=0},op=c,ts_ms=1623943673564}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}


public void deserialize(SourceRecord sourceRecord, Collector<String> collector)


Click in and find the toString() at the end; we see:


public String toString() {
    return "SourceRecord{sourcePartition=" + this.sourcePartition + ", sourceOffset=" + this.sourceOffset + "} " + super.toString();
}
It is just a hard-coded string format.

Writing our code against the data captured by the default setup: to get the database name and table name, we first try .keySchema()

Schema schema = sourceRecord.keySchema();

Then see what the schema actually gives us; it turns out the schema does not make it easy to get at the data.

So we go with the Kafka Connect Struct instead; the instructor only settled on the Struct after trying the alternatives.

We want to get all the field names in one go, so that each field's value can be fetched by name.

So we can simply iterate over the fields of the schema() metadata.

// Get the operation type: the instructor got stuck here too, and only worked out how to write it after searching posts online; there is a dedicated parsing class for this.

There will always be something new to figure out on the job.
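
The parsing class referred to is Debezium's Envelope; a minimal sketch of what extracting the operation type looks like (the same call appears in the final code below):

// Envelope.operationFor reads the Debezium "op" field from the record
// and returns the matching enum value, e.g. CREATE, UPDATE, DELETE.
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String op = operation.toString();   // "CREATE" / "UPDATE" / "DELETE", as seen in the output below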

Final code:

package com.alibaba;


import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;


/**
 * @author zhouyanjun
 * @create 2021-06-22 17:03
 */
public class Flink03_DataStreamWithMySchema {
    public static void main(String[] args) throws Exception {
//        System.setProperty("HADOOP_USER_NAME", "alibaba" );


        //1. Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Checkpointing: not configured here; first test the job locally


        //2. Read MySQL change data via CDC
        DebeziumSourceFunction<String> sourceFunction = MySQLSource
                .<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("gmall0820flink")
                .tableList("gmall0820flink.base_trademark")///可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据。注意写表名的时候,要把库名加上,因为flinkcdc可以监控多个库
                .startupOptions(StartupOptions.initial())//启动的选项/模式。这是个枚举类。共有5种。
//                .startupOptions(StartupOptions.earliest())
//                .startupOptions(StartupOptions.latest())
//                .startupOptions(StartupOptions.specificOffset())
//                .startupOptions(StartupOptions.timestamp())
                .deserializer(new MyDebeziumDeserializationSchema())//自定义反序列化器。
                .build();


        DataStreamSource<String> StreamSource = env.addSource(sourceFunction); // we now have a stream


        //3. Print
        StreamSource.print();
        //4. Start
        env.execute();
    }




    public static class MyDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> { // we wrap the data as JSON before passing it downstream; since it is JSON we make <T> a String
        /**
         * This is the JSON format we want to build; with at least these fields, downstream processing becomes easy:
         * {
         * "data":"{"id":11,"tm_name":"sasa"}",  the data itself; it has multiple fields, so it is itself a JSON object
         * "db":"",        the database name, because we need to handle data from multiple databases and tables
         * "tableName":"", the table name that goes with the database; later the stream is split by database.table, since different tables have different fields and a single stream could only treat them as Object
         * "op":"c u d",   the operation type: create, update, delete
         * "ts":""         a timestamp field
         * }
         */
        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception { // input record, output collector
            // The task is to parse the SourceRecord into JSON.
            // To use the data later we must extract the fields correctly, i.e. define what this JSON looks like; whatever we need downstream is what we put in it.
//            Schema schema = sourceRecord.keySchema();


            // Get the topic and extract the database name and table name from it
            String topic = sourceRecord.topic();
            String[] fields = topic.split("\\."); // split() takes a regex, so the dot has to be escaped
            String db = fields[1];  // now we have the database name and table name we want
            String tableName = fields[2];


            // Get the value and extract the row data itself
//            Object value = sourceRecord.value(); // declared as Object, but the snapshot data shows it is actually a Struct
            Struct value = (Struct) sourceRecord.value();
            Struct after = value.getStruct("after"); // the Struct behind the "after" field


            // Before iterating, create a JSON object first (using fastjson)
            JSONObject jsonObject = new JSONObject();


            for (Field field : after.schema().fields()) { // we do not know how many fields there are, so get the schema metadata and iterate over it
                Object o = after.get(field);
                jsonObject.put(field.name(),o);
            }


            // We also want op; the operation type lives in the record's envelope
            // Get the operation type
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            // Finally, wrap everything into one big JSON object and send it out


            // Build the result JSON
            JSONObject result = new JSONObject();
            result.put("database", db);
            result.put("tableName", tableName);
            result.put("data", jsonObject);
            result.put("op", operation);


            // Emit the data
            collector.collect(result.toJSONString());


        }


        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }
}

Output of the final code:



(IDEA launch command and the full dependency classpath omitted again; main class com.alibaba.Flink03_DataStreamWithMySchema)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.


{"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"Redmi","id":1},"tableName":"base_trademark"}
{"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"苹果","logo_url":"/static/default.jpg","id":2},"tableName":"base_trademark"}
{"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"华为","logo_url":"/static/default.jpg","id":3},"tableName":"base_trademark"}
{"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"TCL","logo_url":"/static/default.jpg","id":4},"tableName":"base_trademark"}
{"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"小米","logo_url":"/static/default.jpg","id":5},"tableName":"base_trademark"}
{"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"长粒香","logo_url":"/static/default.jpg","id":6},"tableName":"base_trademark"}
{"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"金沙河","logo_url":"/static/default.jpg","id":7},"tableName":"base_trademark"}
{"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"索芙特","logo_url":"/static/default.jpg","id":8},"tableName":"base_trademark"}
{"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"CAREMiLLE","logo_url":"/static/default.jpg","id":9},"tableName":"base_trademark"}
{"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"欧莱雅","logo_url":"/static/default.jpg","id":10},"tableName":"base_trademark"}
{"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"香奈儿","logo_url":"/static/default.jpg","id":11},"tableName":"base_trademark"}
{"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"周大帅哥","logo_url":"更新一条信息","id":12},"tableName":"base_trademark"}
六月 22, 2021 9:55:39 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
信息: Connected to hadoop102:3306 at mysql-bin.000009/154 (sid:5499, cid:13)

Pasting one of these lines into a JSON parsing website shows that it parses successfully, so the output really is a JSON string now, which makes downstream processing much easier.

With the custom deserializer we can both monitor multiple databases and tables and output a data format that is convenient for later processing.
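
Downstream operators can then parse each line with fastjson, which is already a project dependency. A small sketch, where line stands in for one element of the stream:

// 'line' is one JSON string emitted by MyDebeziumDeserializationSchema (hypothetical variable name)
com.alibaba.fastjson.JSONObject record = com.alibaba.fastjson.JSON.parseObject(line);
String db        = record.getString("database");
String tableName = record.getString("tableName");    // typically used to split the stream per table
String op        = record.getString("op");            // "CREATE" / "UPDATE" / "DELETE"
com.alibaba.fastjson.JSONObject data = record.getJSONObject("data");   // the row image itself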

Q:

5 Another error I hit while running



This is the error message:
Caused by: java.lang.ArrayIndexOutOfBoundsException: 3
  at org.apache.kafka.connect.data.Struct.get(Struct.java:86)
  at com.alibaba.Flink03_DataStreamWithMySchema$MyDebeziumDeserializationSchema.deserialize(Flink03_DataStreamWithMySchema.java:93)
  at com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:114)
  at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:82)
  at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:812)
  at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:171)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)




Following the stack trace, I located the code:
for (Field field : value.schema().fields()) { // iterating over the schema metadata, since we do not know how many fields there are
    Object o = after.get(field);    // the stack trace points here, but the line that is actually wrong is the one above
    jsonObject.put(field.name(),o);
}


The correct code is:


for (Field field : after.schema().fields()) {   // correct: iterate over after's schema, not value's
    Object o = after.get(field);
    jsonObject.put(field.name(),o);
}


So a simple debugging approach is to look at the lines just above and below the one the stack trace points at and check them for logical gaps.

After packaging and uploading to the cluster, I always restored from a savepoint. Why not restore from a checkpoint?

Actually you can, because a savepoint and a checkpoint hold the same kind of state.

Do not retain the checkpoint when the job is cancelled normally (if the job fails, the checkpoint directory is kept either way):
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION); // this is the default, i.e. after the job is cancelled the checkpoint is cleaned up
If we upgrade the code the normal way, we cancel the job manually, and at that point the checkpoint directory is gone.


Retain the checkpoint when the job is cancelled normally (on failure the checkpoint directory is kept anyway):
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // retain






Clicking into RETAIN_ON_CANCELLATION we find:
RETAIN_ON_CANCELLATION(false);

What happens if this setting is not added?

After cancelling the job,

compare the two screenshots (before and after):

after the cancel the checkpoint directory is gone, so there is no longer anything to recover the data from.
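
With RETAIN_ON_CANCELLATION the checkpoint directory survives a cancel, and the job can be resumed from it with the same -s flag used for savepoints. A sketch, where the job id and chk-N directory are placeholders that first have to be looked up in HDFS:

[myself@hadoop102 flink-standalone]$ bin/flink run -s hdfs://hadoop102:8020/flink1109/checkpoint/<job-id>/chk-<n> -c com.alibaba.Flink03_DataStreamWithMySchema flink-cdc-1.0-SNAPSHOT-jar-with-dependencies.jar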

So the final code:

package com.alibaba;


import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;


/**
 * @author zhouyanjun
 * @create 2021-06-22 17:03
 */
public class Flink03_DataStreamWithMySchema {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "alibaba" );


        //1. Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        // Enable checkpointing
        env.enableCheckpointing(5000L);
        env.getCheckpointConfig().setCheckpointTimeout(5000L);
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink1109/checkpoint"));


        // Do NOT retain the checkpoint when the job is cancelled normally
        //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
        // Retain the checkpoint when the job is cancelled normally
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);




        //2. Read MySQL change data via CDC
        DebeziumSourceFunction<String> sourceFunction = MySQLSource
                .<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("gmall0820flink")
                .tableList("gmall0820flink.base_trademark")///可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据。注意写表名的时候,要把库名加上,因为flinkcdc可以监控多个库
                .startupOptions(StartupOptions.initial())//启动的选项/模式。这是个枚举类。共有5种。
//                .startupOptions(StartupOptions.earliest())
//                .startupOptions(StartupOptions.latest())
//                .startupOptions(StartupOptions.specificOffset())
//                .startupOptions(StartupOptions.timestamp())
                .deserializer(new MyDebeziumDeserializationSchema())//自定义反序列化器。把序列化的二进制文件给反序列化
                .build();


        DataStreamSource<String> StreamSource = env.addSource(sourceFunction); // we now have a stream


        //3. Print
        StreamSource.print();
        //4. Start
        env.execute();
    }




    public static class MyDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> { // we wrap the data as JSON before passing it downstream; since it is JSON we make <T> a String
        /**
         * This is the JSON format we want to build; with at least these fields, downstream processing becomes easy:
         * {
         * "data":"{"id":11,"tm_name":"sasa"}",  the data itself; it has multiple fields, so it is itself a JSON object
         * "db":"",        the database name, because we need to handle data from multiple databases and tables
         * "tableName":"", the table name that goes with the database; later the stream is split by database.table, since different tables have different fields and a single stream could only treat them as Object
         * "op":"c u d",   the operation type: create, update, delete
         * "ts":""         a timestamp field
         * }
         */
        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception { // input record, output collector
            // The task is to parse the SourceRecord into JSON.
            // To use the data later we must extract the fields correctly, i.e. define what this JSON looks like; whatever we need downstream is what we put in it.
//            Schema schema = sourceRecord.keySchema();


            //1  Get the topic and extract the database name and table name from it
            String topic = sourceRecord.topic();
            String[] fields = topic.split("\\."); // split() takes a regex, so the dot has to be escaped
            String db = fields[1];  // now we have the database name and table name we want
            String tableName = fields[2];


            //2  Get the value and extract the row data itself
//            Object value = sourceRecord.value(); // declared as Object, but the snapshot data shows it is actually a Struct
            Struct value = (Struct) sourceRecord.value();
            Struct after = value.getStruct("after"); // the Struct behind the "after" field


            // Before iterating, create a JSON object first (using fastjson)
            JSONObject jsonObject = new JSONObject();


            for (Field field : after.schema().fields()) { // we do not know how many fields there are, so get the schema metadata and iterate over it
                Object o = after.get(field);
                jsonObject.put(field.name(),o);
            }


            // We also want op; the operation type lives in the record's envelope
            //3  Get the operation type
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            // Finally, wrap everything into one big JSON object and send it out


            // Build the result JSON
            JSONObject result = new JSONObject();
            result.put("database", db);
            result.put("tableName", tableName);
            result.put("data", jsonObject);
            result.put("op", operation);


            // Emit the data
            collector.collect(result.toJSONString());


        }


        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }
}

This concludes the flink-cdc series.

- END -

This is an original article.

Author: Eugene
A newcomer in a data role at a listed company, hoping not to stay a newcomer for long, haha.

Follow along to learn about technology, careers, life, fundamentals, fitness, photography and more, and keep improving together.
