IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink SQL使用Catalog消费Kafka时,多个Source读取同一主题解决方案 -> 正文阅读

[大数据]Flink SQL使用Catalog消费Kafka时,多个Source读取同一主题解决方案

一、Catalog定义

????????Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。数据处理最关键的方面之一是管理元数据。 元数据可以是临时的,例如临时表、或者通过 TableEnvironment 注册的 UDF。 元数据也可以是持久化的

二、Catalog在Flink中实现讲解

2.1Catalog源码定义

????????Catalog的定义在org.apache.flink.table.catalog.Catalog,可以看到它是一个接口,具体如下:

public interface Catalog {
    ''''
}

????????Catalog包含了主要的方法:包括获取数据库信、表、视图、表列信息、函数Function、分区Partition等信息的增删查改。

2.1Catalog继承关系

????????下图为Catalog接口继承图
Catalog继承关系

????????可以看到,Catalog的子类为AbstractCatalog,它有3个实现类,GenericInMemoryCatalog,HiveCatalog,AbstractJdbclog,也就是它可以把Catalog信息保存在内存、Hive、Jdbc(目前仅支持Postgre)中。默认的是保存在内存中。

三、Catalog信息保存在Hive测试

????????要验证多个source读取同一张Kafka表如何处理,我们把Catalog信息保存在MySQL中进行测试。
首先是创建Catalog

3.1创建Hive Catalog

????????启动SQL客户端

./sql-client.sh

????????创建 Hive Catalog


启动Flink sqlclient

$FLINK_HOME/bin/sql-client.sh

创建CATALOG的语法为:

create catalog hive_catalog with (
    'type' = 'hive',
    'default-database' = 'catalogtest',
    'hive-conf-dir' = '/opt/hive/apache-hive-3.1.2-bin/conf/'
);
其中hive-conf-dir为本地hive的配置文件路,即$HIVE_HOME/conf

USE CATALOG hive_catalog ;

3.2 创建Kafka主题,并且往其中写数据

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
往Kafka队列中写入数据,格式为:string,int

>abs,12
>saz,43
.......

3.3在Flink创建Kafka表

CREATE TABLE mykafka (name String, age int) with (
   'connector.type' = 'kafka',
   'connector.version' = 'universal',
   'connector.topic' = 'test',
   'connector.properties.bootstrap.servers' = 'localhost:9092',
   'connector.properties.group.id' = 'testGroup',
   'format.type' = 'csv',
   'update-mode' = 'append'
);
其中testGroup为默认的groupid

四、测试数据

4.1 第一组消费者使用已经注册的表去消费

select * from mykafka;

4.2第二组消费者使用已经注册的表去消费

select *  from mykafka /*+ OPTIONS('connector.properties.group.id' = 'testGroup111') */;

????????这里使用了hint语法,可以直接消费数据,如果找不到表,需要创建一下catalog(flink官网有默认的catalog设置)

4.3停止第二组消费者,继续往kafka中写数据,再重启第二组消费者

????????首先暂停第二组消费者,然后往Kafka主题中发送数据,接着写入数据后,再重新消费,会观察到数据是从消费停止后开始消费的,这也就说明了这2组消费者是独立的。

五、结论

????????Flink的Catalog支持存储在内存(默认)、Hive、JDBC(仅支持Postgre)中,如果注册完一张Kafka的表默认已经指定了groupId,如果要通过不同的groupId来消费数据,可以通过hint语法指定groupId消费
????????除此之外,还可以通过修改源码的方式,在使用时指定groupId来读取。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-10-22 21:22:23  更:2022-10-22 21:24:15 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年4日历 -2025/4/23 10:50:31-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码