IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flinkSQL+kafka+iceberg -> 正文阅读

[大数据]flinkSQL+kafka+iceberg

基于flinkSQL+kafka+iceberg的测试。参考的链接是:https://www.icode9.com/content-4-827451.html基于文档将测试中踩过的坑给详细的记录一下。

1、flinkSQL要支持iceberg和hive需要引进两个jar包,将两个jar包放至flink lib目录下。
? ?iceberg-flink-runtime-0.11.1.jar
? ?flink-sql-connector-hive-2.2.0_2.11-1.11.3.jar
2、编辑flinkSQL加载的配置文件sql-client-defaults.yaml,添加默认加载的hive catalog
catalogs:
? ?- name: myhive
? ? ?type: hive
? ? ?hive-conf-dir: /opt/cloudera/parcels/CDH/lib/hive/conf
3、使用bin/sql-client.sh ?embedded 命令进入flink sql客户端
? ?进入客户端之后show catalogs可以看到默认加载的myhive catalog
4、基于myhive catalog创建kafka流表
? ? CREATE TABLE myhive.test.ods_kafka_stream (
? ? ? ? ? ? ? ? ?user_id STRING,?
? ? ? ? ? ? ? ? ?order_amount DOUBLE,?
? ? ? ? ? ? ? ? ?log_ts TIMESTAMP(3),?
? ? ? ? ? ? ? ? ) WITH (?
? ? ? ? ? ? ? ? ? 'connector'='kafka', ? ? ? ? ? ? ? ? ? ?//连接器
? ? ? ? ? ? ? ? ? 'topic'='test_kafka_stream', ? ? ? ? ? ?//kafka topic?
? ? ? ? ? ? ? ? ? 'scan.startup.mode'='latest-offset', ? ?//消费kafka topic的消费模式。
? ? ? ? ? ? ? ? ? 'properties.bootstrap.servers'='X.X.X.X:9092', ??
? ? ? ? ? ? ? ? ? 'properties.group.id' = 'test_01',?
? ? ? ? ? ? ? ? ? 'format'='json' ?//kafka消息的格式
? ? ? ? ? ? ? ? );
? ? 官网链接:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/kafka.html#start-reading-position
?? ?查表数据时会报如下错误:
? ? (1)select * from myhive.test.ods_kafka_stream?
?? ? ? ? [ERROR] Could not execute SQL statement. Reason:
? ? ? ? ?java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
?? ? ? ? 需要重启flink集群
?? ?
?? ?(2)Flink SQL> select * from myhive.test.ods_kafka_stream;
? ? ? ? ?[ERROR] Could not execute SQL statement. Reason:
? ? ? ? ?java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer
?? ? ? ? 报错的原因是kafka序列化的问题,需要将kafka-clients-2.4.1.jar放至flink lib目录下。
? ??
?? ?(3)Flink SQL> select * from myhive.test.ods_kafka_stream;
?? ? ? ? java.lang.ClassCastException: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.MissingNode cannot be cast to org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
? ? ? ? ?需要将flink-shaded-jackson-2.10.1-11.0.jar放到flink lib目录下。
?? ?
?? ?Flink SQL> select * from myhive.test.ods_kafka_stream;运行监听流表数据。
5、启动一个topic test_kafka_stream的console producer
? ?kafka-console-producer --broker-list X.X.X.X:9092 --topic test_kafka_stream
? ?生产消息:
? ?{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:30:00"}
? ?{"user_id":"a1111","order_amount":11.0,"log_ts":"2021-07-26 12:15:00"}
6、在监听的流表能看到刚刚生产的消息。
? ?
7、关联kafka流表和iceberg表
? (1)创建iceberg类型的hadoop catalog
? ?CREATE CATALOG hadoop_catalog WITH (
? ? ?'type'='iceberg',
? ? ?'catalog-type'='hadoop',
? ? ?'warehouse'='hdfs:///data/iceberg/warehouse/',
? ? ?'property-version'='1'
? ? );
??
? (2)创建iceberg表
? ? CREATE TABLE ?hadoop_catalog.iceberg_db.ods_kafka_stream_iceberg (?
? ? ? ? ? ? ? ? ? ? user_id STRING COMMENT 'user_id',?
? ? ? ? ? ? ? ? ? ? order_amount DOUBLE COMMENT 'order_amount',?
? ? ? ? ? ? ? ? ? ? log_ts STRING?
? ? ? ? ? ? ? ? );
? (3)使用sql将kafka流表和iceberg表进行关联
? ? Flink SQL> insert into hadoop_catalog.iceberg_db.ods_kafka_stream_iceberg select ?user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd') FROM myhive.test.ods_kafka_stream;
? ? 如果之前启动的kafka console producer没有关闭可以再利用这个console producer发送几条消息数据,如果已经关闭再启动kafka-console-producer --broker-list X.X.X.X:9092 --topic test_kafka_stream
? ? {"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:30:00"}
? ? {"user_id":"a1111","order_amount":11.0,"log_ts":"2021-07-26 12:15:00"}?? ?
??
? (4)select * from hadoop_catalog.iceberg_db.ods_kafka_stream_iceberg会发现查找不到数据。
? ? ? ?原因:写数据到 Iceberg,data 目录数据一直在更新,但是 metadata 没有数据,导致查询的时候没有数据,因为 Iceberg 的查询是需要元数据来索引真实数据的。
?? ? ? SQL Client 默认没有开启 checkpoint,需要通过配置文件来开启状态。所以会导致 data 目录写入数据而 metadata 目录不写入元数据。
?? ? ? 设置方法可以在flink-conf.yaml全局设置 或者sql-client-default.yaml文件下的configuration下设置?
? ? ? ?在sql-client-default.yaml下设置的:
? ? ? ?configuration:
? ? ? ? ? execution.checkpointing.interval: 10s
? ? 设置之后重启flinkSQL客户端,然后将之前启动的insert JOB在webUI上面kill掉任务重新运行insert语句。
? ? 如果之前启动的kafka console producer没有关闭可以再利用这个console producer发送几条消息数据,如果已经关闭再启动kafka-console-producer --broker-list X.X.X.X:9092 --topic test_kafka_stream
? ? {"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:30:00"}
? ? {"user_id":"a1111","order_amount":11.0,"log_ts":"2021-07-26 12:15:00"}?? ?
?? ?select * iceberg表能查看到新产生的数据。
8、使用hive客户端读取iceberg表数据
? ?hive读取iceberg数据需要用到iceberg-hive-runtime-0.11.1.jar包,将包放至hive的lib目录下。
? ?进入hive客户端创建外部表关联iceberg warehouse表目录
? ?CREATE EXTERNAL TABLE test.ods_kafka_stream_iceberg_hive(user_id STRING,order_amount DOUBLE,log_ts STRING)
? ?STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'?
? ?LOCATION '/data/iceberg/warehouse/iceberg_db/ods_kafka_stream_iceberg/'; //之前创建hadoop catalog时设置的warehouse目录。
? ?select * test.ods_kafka_stream_iceberg_hive能查看到表数据内容。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-28 23:17:22  更:2021-07-28 23:17:33 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/6 19:51:06-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码