IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> FlinkSQL 实时采集Kafka内容到MySQL(实战记录) -> 正文阅读

[大数据]FlinkSQL 实时采集Kafka内容到MySQL(实战记录)

作者:recommend-item-box type_blog clearfix

01 引言

最近在做实时采集Kafka发布的内容到MySQL,本文记录一下关键的点,细节不再描述,希望能帮助到大家。

02 实现

2.1 添加依赖

在工程,除了添加基础的Flink环境依赖,还需要添加flink-connector-kafka的依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.13.6</version>
</dependency>

除此,因为FlinkKafka作为了Source,所以读取的字符串是有解析方式的,本文主要使用的是“json”的方式,因此还需要引入序列化包的,但是flink-connector-kafka已经自带了,所以没必要再引入。

ok,到这里如果我们写好FlinkSQL去启动,直接就会一闪而退了,为什么呢?因为我们缺少了’ kafka-clients-2.1.0.jar'这个包,但是也无需引入,因为在flink-connector-kafka里面已经自带了。

为什么要在这里特别提示 “序列化包”和“kafka-clients包呢”?因为如果我们采用Flink On Yarn的方式部署时,这两个包是需要放到HDFS的,如下:在这里插入图片描述

2.2 Flink SQL

好了,到了关键的FlinkSQL了,该如何写呢?

首先看看Source,也就是我们的Kafka,如下:

CREATE TABLE t_student (
	id INT,
	name STRING
) WITH (
	'connector' = 'kafka',
	'topic' = 'cdc_user',
	'properties.bootstrap.servers' = '10.194.166.92:9092',
	'properties.group.id' = 'flink-cdc-mysql-kafka',
	'scan.startup.mode' = 'earliest-offset',
	'format' = 'json'
)

然后Sink输出,我这里需要输出到MySQL

CREATE TABLE t_student_copy (
      id INT,
      name STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
       'connector' = 'jdbc',
       'url' = 'jdbc:mysql://127.0.0.1:3306/big_data',
       'username' = 'root',
       'password' = '123456',
       'table-name' = 't_student_copy'
)

最后,使用INSERT INTO声明如何写入:

INSERT INTO t_student_copy(id,name) SELECT id,name FROM t_student

2.3 配置Kafka域名

还有一点需要注意的是,当我们跑Flink的程序的时候,会出现类似如下错误:
unable to connect broker…

这个时候,我们要在跑Flink的程序的服务器配置Kafka的域名,具体在hosts文件里配置:

vi /etc/hosts

ok,到这里,只要我们只要使用Kafka工具发送json格式的数据,Flink程序就能实时收到,并写入MySQL数据库。

03 文末

本文主要是记录Kafka如何实时写入到MySQL的一些坑点,完整源码就不贴出来了,希望能给大家一点启示并帮助到大家,谢谢大家的阅读,本文完!

附:KafkaTool的使用教程

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-01 23:28:20  更:2022-04-01 23:29:18 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 15:04:15-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码