基于flinkSQL+kafka+iceberg的测试。参考的链接是:https://www.icode9.com/content-4-827451.html基于文档将测试中踩过的坑给详细的记录一下。
1、flinkSQL要支持iceberg和hive需要引进两个jar包,将两个jar包放至flink lib目录下。 ? ?iceberg-flink-runtime-0.11.1.jar ? ?flink-sql-connector-hive-2.2.0_2.11-1.11.3.jar 2、编辑flinkSQL加载的配置文件sql-client-defaults.yaml,添加默认加载的hive catalog catalogs: ? ?- name: myhive ? ? ?type: hive ? ? ?hive-conf-dir: /opt/cloudera/parcels/CDH/lib/hive/conf 3、使用bin/sql-client.sh ?embedded 命令进入flink sql客户端 ? ?进入客户端之后show catalogs可以看到默认加载的myhive catalog 4、基于myhive catalog创建kafka流表 ? ? CREATE TABLE myhive.test.ods_kafka_stream ( ? ? ? ? ? ? ? ? ?user_id STRING,? ? ? ? ? ? ? ? ? ?order_amount DOUBLE,? ? ? ? ? ? ? ? ? ?log_ts TIMESTAMP(3),? ? ? ? ? ? ? ? ? ) WITH (? ? ? ? ? ? ? ? ? ? 'connector'='kafka', ? ? ? ? ? ? ? ? ? ?//连接器 ? ? ? ? ? ? ? ? ? 'topic'='test_kafka_stream', ? ? ? ? ? ?//kafka topic? ? ? ? ? ? ? ? ? ? 'scan.startup.mode'='latest-offset', ? ?//消费kafka topic的消费模式。 ? ? ? ? ? ? ? ? ? 'properties.bootstrap.servers'='X.X.X.X:9092', ?? ? ? ? ? ? ? ? ? ? 'properties.group.id' = 'test_01',? ? ? ? ? ? ? ? ? ? 'format'='json' ?//kafka消息的格式 ? ? ? ? ? ? ? ? ); ? ? 官网链接:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/kafka.html#start-reading-position ?? ?查表数据时会报如下错误: ? ? (1)select * from myhive.test.ods_kafka_stream? ?? ? ? ? [ERROR] Could not execute SQL statement. Reason: ? ? ? ? ?java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer ?? ? ? ? 需要重启flink集群 ?? ? ?? ?(2)Flink SQL> select * from myhive.test.ods_kafka_stream; ? ? ? ? ?[ERROR] Could not execute SQL statement. Reason: ? ? ? ? ?java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer ?? ? ? ? 报错的原因是kafka序列化的问题,需要将kafka-clients-2.4.1.jar放至flink lib目录下。 ? ?? ?? ?(3)Flink SQL> select * from myhive.test.ods_kafka_stream; ?? ? ? ? java.lang.ClassCastException: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.MissingNode cannot be cast to org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode ? ? ? ? ?需要将flink-shaded-jackson-2.10.1-11.0.jar放到flink lib目录下。 ?? ? ?? ?Flink SQL> select * from myhive.test.ods_kafka_stream;运行监听流表数据。 5、启动一个topic test_kafka_stream的console producer ? ?kafka-console-producer --broker-list X.X.X.X:9092 --topic test_kafka_stream ? ?生产消息: ? ?{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:30:00"} ? ?{"user_id":"a1111","order_amount":11.0,"log_ts":"2021-07-26 12:15:00"} 6、在监听的流表能看到刚刚生产的消息。 ? ? 7、关联kafka流表和iceberg表 ? (1)创建iceberg类型的hadoop catalog ? ?CREATE CATALOG hadoop_catalog WITH ( ? ? ?'type'='iceberg', ? ? ?'catalog-type'='hadoop', ? ? ?'warehouse'='hdfs:///data/iceberg/warehouse/', ? ? ?'property-version'='1' ? ? ); ?? ? (2)创建iceberg表 ? ? CREATE TABLE ?hadoop_catalog.iceberg_db.ods_kafka_stream_iceberg (? ? ? ? ? ? ? ? ? ? ? user_id STRING COMMENT 'user_id',? ? ? ? ? ? ? ? ? ? ? order_amount DOUBLE COMMENT 'order_amount',? ? ? ? ? ? ? ? ? ? ? log_ts STRING? ? ? ? ? ? ? ? ? ); ? (3)使用sql将kafka流表和iceberg表进行关联 ? ? Flink SQL> insert into hadoop_catalog.iceberg_db.ods_kafka_stream_iceberg select ?user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd') FROM myhive.test.ods_kafka_stream; ? ? 如果之前启动的kafka console producer没有关闭可以再利用这个console producer发送几条消息数据,如果已经关闭再启动kafka-console-producer --broker-list X.X.X.X:9092 --topic test_kafka_stream ? ? {"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:30:00"} ? ? {"user_id":"a1111","order_amount":11.0,"log_ts":"2021-07-26 12:15:00"}?? ? ?? ? (4)select * from hadoop_catalog.iceberg_db.ods_kafka_stream_iceberg会发现查找不到数据。 ? ? ? ?原因:写数据到 Iceberg,data 目录数据一直在更新,但是 metadata 没有数据,导致查询的时候没有数据,因为 Iceberg 的查询是需要元数据来索引真实数据的。 ?? ? ? SQL Client 默认没有开启 checkpoint,需要通过配置文件来开启状态。所以会导致 data 目录写入数据而 metadata 目录不写入元数据。 ?? ? ? 设置方法可以在flink-conf.yaml全局设置 或者sql-client-default.yaml文件下的configuration下设置? ? ? ? ?在sql-client-default.yaml下设置的: ? ? ? ?configuration: ? ? ? ? ? execution.checkpointing.interval: 10s ? ? 设置之后重启flinkSQL客户端,然后将之前启动的insert JOB在webUI上面kill掉任务重新运行insert语句。 ? ? 如果之前启动的kafka console producer没有关闭可以再利用这个console producer发送几条消息数据,如果已经关闭再启动kafka-console-producer --broker-list X.X.X.X:9092 --topic test_kafka_stream ? ? {"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:30:00"} ? ? {"user_id":"a1111","order_amount":11.0,"log_ts":"2021-07-26 12:15:00"}?? ? ?? ?select * iceberg表能查看到新产生的数据。 8、使用hive客户端读取iceberg表数据 ? ?hive读取iceberg数据需要用到iceberg-hive-runtime-0.11.1.jar包,将包放至hive的lib目录下。 ? ?进入hive客户端创建外部表关联iceberg warehouse表目录 ? ?CREATE EXTERNAL TABLE test.ods_kafka_stream_iceberg_hive(user_id STRING,order_amount DOUBLE,log_ts STRING) ? ?STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'? ? ?LOCATION '/data/iceberg/warehouse/iceberg_db/ods_kafka_stream_iceberg/'; //之前创建hadoop catalog时设置的warehouse目录。 ? ?select * test.ods_kafka_stream_iceberg_hive能查看到表数据内容。
|