IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flink cdc例子 -> 正文阅读

[大数据]flink cdc例子

1. flink cdc官网地址(2个地址):

CDC Connectors for Apache Flink?https://ververica.github.io/flink-cdc-connectors/https://github.com/ververica/flink-cdc-connectorshttps://github.com/ververica/flink-cdc-connectors

2.? mysql增加以下配置:

log-bin=mysql-bin
server-id=1
binlog_format=ROW

[mysqld]
#安装目录
basedir=D:\\soft\mysql-5.7.19
datadir=D:\\soft\mysql-5.7.19\data
port=3306
#最大连接数
max_connections=200
#编码
character-set-server=utf8
default-storage-engine=INNODB
sql_mode=NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES

log-bin=mysql-bin
server-id=1
binlog_format=ROW
 
[mysql]
#编码
default-character-set=utf8 

查看配置结果:SHOW VARIABLES LIKE '%log_bin%';

SHOW VARIABLES LIKE '%binlog_format%';

?


3. pom.xml文件

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.flink.cdc</groupId>
	<artifactId>myflinkcdc</artifactId>
	<version>1.0.0-RELEASE</version>
	<properties>
		<flink.version>1.13.5</flink.version>
		<debezium.version>1.5.4.Final</debezium.version>
		<geometry.version>2.2.0</geometry.version>
		<testcontainers.version>1.15.3</testcontainers.version>
		<java.version>1.8</java.version>
		<scala.binary.version>2.11</scala.binary.version>
		<maven.compiler.source>${java.version}</maven.compiler.source>
		<maven.compiler.target>${java.version}</maven.compiler.target>
		<hamcrest.version>1.3</hamcrest.version>
		<version.awaitility>4.0.1</version.awaitility>
		<slf4j.version>1.7.15</slf4j.version>
		<log4j.version>2.17.1</log4j.version>
		<spotless.version>2.4.2</spotless.version>
		<flink.forkCount>1</flink.forkCount>
		<flink.reuseForks>true</flink.reuseForks>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<fastjson.version>1.2.56</fastjson.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-common</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-core</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>${slf4j.version}</version>
		</dependency>

		<!-- Use fixed version 18.0-13.0 of flink shaded guava -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-shaded-guava</artifactId>
			<version>18.0-13.0</version>
		</dependency>

		<!-- test dependencies -->
		<dependency>
			<groupId>org.hamcrest</groupId>
			<artifactId>hamcrest-all</artifactId>
			<version>${hamcrest.version}</version>
			<type>jar</type>
			<scope>test</scope>
		</dependency>

		<!-- tests will have log4j as the default logging framework available -->
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-slf4j-impl</artifactId>
			<version>${log4j.version}</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-api</artifactId>
			<version>${log4j.version}</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<version>${log4j.version}</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<!-- API bridge between log4j 1 and 2 -->
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-1.2-api</artifactId>
			<version>${log4j.version}</version>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.awaitility</groupId>
			<artifactId>awaitility</artifactId>
			<type>jar</type>
			<version>${version.awaitility}</version>
		</dependency>
		<dependency>
			<groupId>com.ververica</groupId>
			<!-- add the dependency matching your database -->
			<artifactId>flink-connector-mysql-cdc</artifactId>
			<!-- the dependency is available only for stable releases. -->
			<version>2.2.0</version>
		</dependency>
		<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>${fastjson.version}</version>
		</dependency>
	</dependencies>
	
	<build>
        <plugins>
             <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.flink.cdc.FlinkCdc</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

4.FlinkCdc.java

package com.flink.cdc;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Arrays;
import java.util.Properties;

public class FlinkCdc {

    private static final String DEFAULT_SOURCE_PARALLELISM = "2";
    private static final String DEFAULT_SINK_PARALLELISM = "1";
    private static final String DEFAULT_FLINK_CHECK_POINTING = "3000";
    private static final String NULL_CONFIG = "null";

    public static void main(String[] args) throws Exception {
        ParameterTool parameter = ParameterTool.fromArgs(args);
        // mysql source 配置
       // String sourceHostname = parameter.get("source.hostname");
        String sourceHostname = "10.11.11.xx";
        int sourcePort = Integer.parseInt("3306");
        String sourceUsername = "xxxx";
        String sourcePassword = "xxxxx";

        // 库表配置
        /*String[] databaseList = parameter.get("source.databaseList").split(",");
        String[] tableList = Arrays.stream(parameter.get("source.tableList", "").split(","))
                .map(table -> databaseList.length == 1 && table.length() > 0 ? databaseList[0] + '.' + table : table)
                .toArray(String[]::new);*/

        String[] databaseList = "testdb".split(",");
        String[] tableList = "testdb.test_table".split(",");

        // flink 配置
        long checkPointing = Long.parseLong(DEFAULT_FLINK_CHECK_POINTING);
        int sourceParallelism = Integer.parseInt(DEFAULT_SOURCE_PARALLELISM);
        int sinkParallelism = Integer.parseInt(DEFAULT_SINK_PARALLELISM);

        // serverId 配置
       // String serverId = parameter.get("serverId", NULL_CONFIG);
        String serverId = "1";
        if (NULL_CONFIG.equals(serverId)) {
            throw new RuntimeException("serverId 必须要设置");
        }
        if (sourceParallelism > 1) {
            serverId = serverId + "-" + (Integer.parseInt(serverId) + sourceParallelism - 1);
        }

        // source -> flink 序列化配置
        Properties debeziumProperties = new Properties();
        debeziumProperties.setProperty("decimal.handling.mode", "double");
        debeziumProperties.setProperty("bigint.unsigned.handling.mode", "long");

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(sourceHostname)
                .port(sourcePort)
                // set captured database
                .databaseList(databaseList)
                // set captured table
                .tableList(tableList)
                .username(sourceUsername)
                .password(sourcePassword)
                .serverId(serverId)
                .debeziumProperties(debeziumProperties)
                .deserializer(new JsonDebeziumDeserializationSchema())
                //监控的方式:
                // 1. initial 初始化全表拷贝,然后再比较
                // 2. earliest 不做初始化,只从当前的
                // 3. latest  指定最新的
                // 4. specificOffset 指定offset
                // 3. timestamp 比指定的时间大的
                .startupOptions(StartupOptions.latest())
                .build();


        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // enable checkpoint
        env.enableCheckpointing(checkPointing);
        env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setCheckpointTimeout(7200000);
        //env.getCheckpointConfig().setCheckpointStorage("hdfs://192.168.1.100:8020/tmp/flinkcdc/test1");
env.getCheckpointConfig().setCheckpointStorage("file:/D:/tmp/ckp");

        // kafka 配置
        String kafkaHost = "xxxxxx:7776";
        String defaultTopic = "flink_cdc_kakfa_test";

        // kafka producer 相关配置
        Properties propertiesProducer = new Properties();
        propertiesProducer.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
        propertiesProducer.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "2097152");

        FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<>(
                defaultTopic,
                new SimpleStringSerializationSchemaWrapper(false),
                propertiesProducer,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

        String jobName = "flinkCdcTest";
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQLSource")
                .setParallelism(sourceParallelism)
                .addSink(flinkKafkaProducer)
                .name(jobName)
                .setParallelism(sinkParallelism);

        env.execute(jobName);
    }

}

5. SimpleStringSerializationSchemaWrapper.java

package com.flink.cdc;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;

import javax.annotation.Nullable;

public class SimpleStringSerializationSchemaWrapper
        implements KafkaSerializationSchema<String>, KafkaContextAware<String> {

    private final SerializationSchema<String> serializationSchema = new SimpleStringSchema();
    private boolean writeTimestamp;

    public SimpleStringSerializationSchemaWrapper(boolean writeTimestamp) {
        this.writeTimestamp = writeTimestamp;
    }

    @Override
    public void open(SerializationSchema.InitializationContext context) throws Exception {
        serializationSchema.open(context);
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
        byte[] serialized = serializationSchema.serialize(element);

        final Long timestampToWrite;
        if (writeTimestamp) {
            timestampToWrite = timestamp;
        } else {
            timestampToWrite = null;
        }

        String targetTopic = getTargetTopic(element);
        // 按topic(db.table)名来分区
        byte[] key = targetTopic.getBytes(StandardCharsets.UTF_8);
        return new ProducerRecord<>(targetTopic, null, timestampToWrite, key, serialized);
    }

    @Override
    public String getTargetTopic(String element) {
        JSONObject payload = JSON.parseObject(element);
        JSONObject source = payload.getJSONObject("source");
        String db = source.getString("db");
        String table = source.getString("table");
        return db + "." + table;
    }

    public void setWriteTimestamp(boolean writeTimestamp) {
        this.writeTimestamp = writeTimestamp;
    }
}

6. FlinkCdc2.java实现类

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class FlinkCdc2 {

    public static void main(String[] args) throws Exception  {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("mytest") // set captured database, If you need to synchronize the whole database, Please set tableList to ".*".
                .tableList("mytest.t_user_info") // set captured table
                .username("root")
                .password("123456")
  //监控的方式:
                // 1. initial 初始化全表拷贝,然后再比较
                // 2. earliest 不做初始化,只从当前的
                // 3. latest  指定最新的
                // 4. specificOffset 指定offset
                // 3. timestamp 比指定的时间大的
                .startupOptions(StartupOptions.latest())
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        // kafka 配置
        String kafkaHost = "192.168.1.100:7776,192.168.1.101:7776,192.168.1.102:7776";
        String topicId = "flink_cdc_test";

        // kafka producer 相关配置
        Properties propertiesProducer = new Properties();
        propertiesProducer.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
        propertiesProducer.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "2097152");

        FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<>(
                kafkaHost,
                topicId,
                new SimpleStringSchema());

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // enable checkpoint
        env.enableCheckpointing(3000);
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQLSource")
                .setParallelism(4)
                .addSink(flinkKafkaProducer)
                .setParallelism(1);

        env.execute("flinkCDC");
    }

7. FlinkCdc3.java

package com.flink.cdc;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class FlinkCdc1 {

    public static void main(String[] args) throws Exception  {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("mytest") // set captured database, If you need to synchronize the whole database, Please set tableList to ".*".
                .tableList("mytest.t_user_info") // set captured table
                .username("root")
                .password("xxxxx")
                .startupOptions(StartupOptions.latest())
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // enable checkpoint
        env.enableCheckpointing(3000);
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(4)
                .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute("Print MySQL Snapshot + Binlog");
    }

}

8. 发布yarn

/opt/flink/bin/yarn-session.sh -d \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=8192m \
-Dtaskmanager.numberOfTaskSlots=8 \
-Dyarn.application.queue="uat" \
-Dyarn.application.name="flinkCdcTest"


/opt/flink/bin/flink run -t yarn-session -d \
-Dyarn.application.id=application_1655802018813_9105 \
-c com.flink.cdc.FlinkCdc \
/tmp/myflinkcdc-1.0.0-RELEASE.jar

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-07-03 10:53:18  更:2022-07-03 10:53:34 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 1:43:51-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码