IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flink sql 知其所以然(五)| 自定义 protobuf format -> 正文阅读

[大数据]flink sql 知其所以然(五)| 自定义 protobuf format

图片

感谢您的关注 ?+ ?点赞 + 再看,对博主的肯定,会督促博主持续的输出更多的优质实战内容!!!

1.序篇-本文结构

大数据羊说

大数据羊说

用数据提升美好事物发生的概率~

30篇原创内容

公众号

protobuf?作为目前各大公司中最广泛使用的高效的协议数据交换格式工具库,会大量作为流式数据传输的序列化方式,所以在 flink sql 中如果能实现?protobuf?的?format?会非常有用(目前社区已经有对应的实现,不过目前还没有 merge,预计在 1.14 系列版本中能 release)。

issue?见:https://issues.apache.org/jira/browse/FLINK-18202?filter=-4&jql=project%20%3D%20FLINK%20AND%20issuetype%20%3D%20%22New%20Feature%22%20AND%20text%20~%20protobuf%20order%20by%20created%20DESC

pr?见:https://github.com/apache/flink/pull/14376

这一节主要介绍 flink sql 中怎么自定义实现?format,其中以最常使用的?protobuf?作为案例来介绍。

  1. 背景篇-为啥需要 protobuf format

  2. 目标篇-protobuf format 预期效果

  3. 难点剖析篇-此框架建设的难点、目前有哪些实现

  4. 维表实现篇-实现的过程

  5. 总结与展望篇

如果想在本地直接测试下:

  1. 在公众号后台回复
  • flink sql 知其所以然(五)| 自定义 protobuf format获取源码(源码基于 1.13.1 实现)

  • flink sql 知其所以然(五)| 自定义 protobuf format获取源码(源码基于 1.13.1 实现)

  • flink sql 知其所以然(五)| 自定义 protobuf format获取源码(源码基于 1.13.1 实现)

  1. 执行源码包中的?flink.examples.sql._05.format.formats.SocketWriteTest?测试类来制造 protobuf 数据

  2. 然后执行源码包中的?flink.examples.sql._05.format.formats.ProtobufFormatTest?测试类来消费 protobuf 数据,并且打印在 console 中,然后就可以在 console 中看到结果。

2.背景篇-为啥需要 protobuf format

关于为什么选择?protobuf?可以看这篇文章,写的很详细:

http://hengyunabc.github.io/thinking-about-grpc-protobuf/?utm_source=tuicool&utm_medium=referral

在实时计算的领域中,为了可读性会选择?json,为了效率以及一些已经依赖了?grpc?的公司会选择?protobuf?来做数据序列化,那么自然而然,日志的序列化方式也会选择?protobuf

而官方目前已经 release 的版本中是没有提供 flink sql api 的?protobuf format?的。如下图,基于 1.13 版本。

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/

图片

1

因此本文在介绍怎样自定义一个 format 的同时,实现一个 protobuf format 来给大家使用。

3.目标篇-protobuf format 预期效果

预期效果是先实现几种最基本的数据类型,包括 protobuf 中的?message(自定义 model)、map(映射)、repeated(列表)、其他基本数据类型等,这些都是我们最常使用的类型。

预期 protobuf message 定义如下:

图片

2

测试数据源数据如下,博主把 protobuf 的数据转换为 json,以方便展示,如下图:

图片

3

预期 flink sql:

数据源表 DDL:

CREATE?TABLE?protobuf_source?(
????name?STRING
??,?names?ARRAY<STRING>
??,?si_map?MAP<STRING,?INT>
)
WITH?(
??'connector'?=?'socket',
??'hostname'?=?'localhost',
??'port'?=?'9999',
??'format'?=?'protobuf',
??'protobuf.class-name'?=?'flink.examples.sql._04.format.formats.protobuf.Test'
)

数据汇表 DDL:

CREATE?TABLE?print_sink?(
??name?STRING
??,?names?ARRAY<STRING>
??,?si_map?MAP<STRING,?INT>
)?WITH?(
??'connector'?=?'print'
)

Transform 执行逻辑:

INSERT?INTO?print_sink
SELECT?*
FROM?protobuf_source

下面是我在本地跑的结果:

图片

图片

可以看到打印的结果,数据是正确的被反序列化读入,并且最终输出到 console。

4.难点剖析篇-目前有哪些实现

目前业界可以参考的实现如下:https://github.com/maosuhan/flink-pb, 也就是这位哥们负责目前 flink protobuf 的 format。

这种实现的具体使用方式如下:

图片

7

其实现有几个特点:

  1. 复杂性:用户需要在 flink sql 程序运行时,将对应的 protobuf java 文件引入 classpath,这个特点是复合 flink 这样的通用框架的特点的。但是如果需要在各个公司场景要做一个流式处理平台的场景下,各个 protobuf sdk 可能都位于不同的 jar 包中,那么其 jar 包管理可能是一个比较大的问题。

  2. 高效 serde:一般很多场景下为了通用化 serde protobuf message,可能会选择 DynamicMessage 来处理 protobuf message,但是其 serde 性能相比原生 java code 的性能比较差。因为特点 1 引入了 protobuf 的 java class,所以其 serde function 可以基于 codegen 实现,而这将极大提高 serde 效率,效率提高就代表着省钱啊,可以吹逼的。

图片

8

Notes:

当然博主针对第一点也有一些想法,比如怎样做到不依赖 protobuf java 文件,只依赖 protobuf 的 message 定义即可或者只依赖其 descriptor。目前博主的想法如下:

  1. flink 程序在客户端获取到对应的 protobuf message 定义

  2. 然后根据这个定义恢复出 proto 文件

  3. 客户端本地执行 protoc 将此文件编译为 java 文件

  4. 客户端本地动态将此 java 文件编译并 load 到 jvm 中

  5. 使用 codegen 然后动态生成执行代码

一气呵成!!!

具体实现其实可以参考:https://stackoverflow.com/questions/28381659/how-to-compile-protocol-buffers-schema-at-runtime

5.实现篇-实现的过程

5.1.flink format 工作原理

其实上节已经详细描述了 flink sql 对于 source\sink\format 的加载机制。

  1. 通过 SPI 机制加载所有的 source\sink\format 工厂?Factory

  2. 过滤出 DeserializationFormatFactory\SerializationFormatFactory + format 标识的 format 工厂类

  3. 通过 format 工厂类创建出对应的 format

图片

12

[

图片

flink sql 知其所以然(一)| source\sink 原理

](http://mp.weixin.qq.com/s?__biz=MzkxNjA1MzM5OQ==&mid=2247488486&idx=1&sn=b9bdb56e44631145c8cc6354a093e7c0&chksm=c1549f1ef623160834e3c5661c155ec421699fc18c57f2c63ba14d33bab1d37c5930fdce016b&scene=21#wechat_redirect)

图片

11

如图 serde format 是通过?TableFactoryHelper.discoverDecodingFormat?和?TableFactoryHelper.discoverEncodingFormat?创建的

//?either?implement?your?custom?validation?logic?here?...
????????//?or?use?the?provided?helper?utility
final?FactoryUtil.TableFactoryHelper?helper?=?FactoryUtil.createTableFactoryHelper(this,?context);

//?discover?a?suitable?decoding?format
final?DecodingFormat<DeserializationSchema<RowData>>?decodingFormat?=?helper.discoverDecodingFormat(
????????DeserializationFormatFactory.class,
????????FactoryUtil.FORMAT);

图片

16

所有通过 SPI 的 source\sink\formt 插件都继承自?Factory

整体创建 format 方法的调用链如下图。

图片

13

5.2.flink protobuf format 实现

最终实现如下,涉及到了几个实现类:

  1. ProtobufFormatFactory

  2. ProtobufOptions

  3. ProtobufRowDataDeserializationSchema

  4. ProtobufToRowDataConverters

图片

14

具体流程:

  1. 定义 SPI 的工厂类?ProtobufFormatFactory implements DeserializationFormatFactory,并且在 resource\META-INF 下创建 SPI 的插件文件

  2. 实现?ProtobufFormatFactory#factoryIdentifier?标识?protobuf

  3. 实现?ProtobufFormatFactory#createDecodingFormat?来创建对应的?DecodingFormat<DeserializationSchema<RowData>>DecodingFormat?是用来封装具体的反序列化器的,实现?DecodingFormat<DeserializationSchema<RowData>>#createRuntimeDecoder,返回?ProtobufRowDataDeserializationSchema

  4. 定义?ProtobufRowDataDeserializationSchema implements DeserializationSchema<RowData>,这个就是具体的反序列化器,其实与 datastream api 相同

  5. 实现?ProtobufRowDataDeserializationSchema#deserialize?方法,与 datastream 相同,这个方法就是将?byte[]?序列化为?RowData?的具体逻辑

  6. 注意这里还实现了一个类?ProtobufToRowDataConverters,其作用就是在客户端创建出具体的将 ?byte[]?序列化为?RowData?的具体工具类,其会根据用户定义的表字段类型动态生成数据转换的 converter 类(策略模式:https://www.runoob.com/design-pattern/strategy-pattern.html),相当于表的 schema 确定之后,其 converter 也会确定

上述实现类的具体关系如下:

图片

19

介绍完流程,进入具体实现方案细节:

ProtobufFormatFactory?主要创建 format 的逻辑:

public?class?ProtobufFormatFactory?implements?DeserializationFormatFactory?{

????public?static?final?String?IDENTIFIER?=?"protobuf";

????@Override
????public?DecodingFormat<DeserializationSchema<RowData>>?createDecodingFormat(Context?context,
????????????ReadableConfig?formatOptions)?{

????????FactoryUtil.validateFactoryOptions(this,?formatOptions);

????????//?1.获取到?protobuf?的?class?全路径
????????final?String?className?=?formatOptions.get(PROTOBUF_CLASS_NAME);

????????try?{
????????????//?2.load?class
????????????Class<GeneratedMessageV3>?protobufV3?=
????????????????????(Class<GeneratedMessageV3>)?this.getClass().getClassLoader().loadClass(className);

????????????//?3.创建?DecodingFormat
????????????return?new?DecodingFormat<DeserializationSchema<RowData>>()?{
????????????????@Override
????????????????public?DeserializationSchema<RowData>?createRuntimeDecoder(DynamicTableSource.Context?context,
????????????????????????DataType?physicalDataType)?{
????????????????????//?4.获取到?table?schema?rowtype
????????????????????final?RowType?rowType?=?(RowType)?physicalDataType.getLogicalType();

????????????????????//?5.创建对应的?DeserializationSchema?作为反序列化器
????????????????????return?new?ProtobufRowDataDeserializationSchema(
????????????????????????????protobufV3
????????????????????????????,?true
????????????????????????????,?rowType);
????????????????}

????????????????@Override
????????????????public?ChangelogMode?getChangelogMode()?{
????????????????????return?ChangelogMode.insertOnly();
????????????????}
????????????};
????????}?catch?(ClassNotFoundException?e)?{
????????????throw?new?RuntimeException(e);
????????}
????}

????@Override
????public?String?factoryIdentifier()?{
????????return?IDENTIFIER;
????}

????...
}

resources\META-INF 文件:

图片

17

ProtobufRowDataDeserializationSchema?主要实现反序列化的逻辑:

public?class?ProtobufRowDataDeserializationSchema?extends?AbstractDeserializationSchema<RowData>?{

????...

????private?ProtobufToRowDataConverters.ProtobufToRowDataConverter?runtimeConverter;

????public?ProtobufRowDataDeserializationSchema(
????????????Class<??extends?GeneratedMessageV3>?messageClazz
????????????,?boolean?ignoreParseErrors
????????????,?RowType?expectedResultType)?{
????????this.ignoreParseErrors?=?ignoreParseErrors;
????????Preconditions.checkNotNull(messageClazz,?"Protobuf?message?class?must?not?be?null.");
????????this.messageClazz?=?messageClazz;
????????this.descriptorBytes?=?null;
????????this.descriptor?=?ProtobufUtils.getDescriptor(messageClazz);
????????this.defaultInstance?=?ProtobufUtils.getDefaultInstance(messageClazz);

????????//?protobuf?本身的?schema
????????this.protobufOriginalRowType?=?(RowType)?ProtobufSchemaConverter.convertToRowDataTypeInfo(messageClazz);

????????this.expectedResultType?=?expectedResultType;

????????//?1.根据?table?schema?动态创建出对应的反序列化器
????????this.runtimeConverter?=?new?ProtobufToRowDataConverters(false)
????????????????.createRowDataConverterByLogicalType(this.descriptor,?this.expectedResultType);
????}

????@Override
????public?RowData?deserialize(byte[]?bytes)?throws?IOException?{
????????if?(bytes?==?null)?{
????????????return?null;
????????}
????????try?{

????????????//?2.将?bytes?反序列化为?protobuf?message
????????????Message?message?=?this.defaultInstance
????????????????????.newBuilderForType()
????????????????????.mergeFrom(bytes)
????????????????????.build();

????????????//?3.反序列化逻辑,从?protobuf?message?中获取字段转换为?RowData
????????????return?(RowData)?runtimeConverter.convert(message);
????????}?catch?(Throwable?t)?{
????????????if?(ignoreParseErrors)?{
????????????????return?null;
????????????}
????????????throw?new?IOException(
????????????????????format("Failed?to?deserialize?Protobuf?'%s'.",?new?String(bytes)),?t);
????????}
????}

????...

可以注意到上述反序列化的主要逻辑就集中在?runtimeConverter?上,即?ProtobufToRowDataConverters.ProtobufToRowDataConverter

ProtobufToRowDataConverters.ProtobufToRowDataConverter?就是在?ProtobufToRowDataConverters?中定义的。

ProtobufToRowDataConverters.ProtobufToRowDataConverter?其实就是一个 convertor 接口:

@FunctionalInterface
public?interface?ProtobufToRowDataConverter?extends?Serializable?{
????Object?convert(Object?object);
}

其作用就是将 protobuf message 中的每一个字段转换成为?RowData?中的每一个字段。

ProtobufToRowDataConverters?中就定义了具体转换逻辑,如截图所示,每一个 LogicalType 都定义了 protobuf message 字段转换为 flink 数据类型的逻辑:

图片

18

源码公众号后台回复flink sql 知其所以然(五)| 自定义 protobuf format获取。

6.总结与展望篇

6.1.总结

本文主要是针对 flink sql protobuf format 进行了原理解释以及对应的实现。如果你正好需要这么一个 format,直接公众号后台回复flink sql 知其所以然(五)| 自定义 protobuf format获取源码吧。

大数据羊说

大数据羊说

用数据提升美好事物发生的概率~

30篇原创内容

公众号

6.2.展望

当然上述只是 protobuf format 一个基础的实现,用于生产环境还有很多方面可以去扩展的。

  1. 性能优化、通用化:protobuf java class 本地 codegen 来提高任务性能

  2. 数据质量:异常 AOP,alert 等

往期推荐

[

flink sql 知其所以然(四)| sql api 类型系统

](http://mp.weixin.qq.com/s?__biz=MzkxNjA1MzM5OQ==&mid=2247488788&idx=1&sn=0127fd4037788762a0401313b43b0ea5&chksm=c15499ecf62310fa747c530f722e631570a1b0469af2a693e9f48d3a660aa2c15e610653fe8c&scene=21#wechat_redirect)

[

flink sql 知其所以然(三)| 自定义 redis 数据汇表(附源码)

](http://mp.weixin.qq.com/s?__biz=MzkxNjA1MzM5OQ==&mid=2247488720&idx=1&sn=5695e3691b55a7e40814d0e455dbe92a&chksm=c1549828f623113e9959a382f98dc9033997dd4bdcb127f9fb2fbea046545b527233d4c3510e&scene=21#wechat_redirect)

[

flink sql 知其所以然(二)| 自定义 redis 数据维表(附源码)

](http://mp.weixin.qq.com/s?__biz=MzkxNjA1MzM5OQ==&mid=2247488635&idx=1&sn=41817a078ef456fb036e94072b2383ff&chksm=c1549883f623119559c47047c6d2a9540531e0e6f0b58b155ef9da17e37e32a9c486fe50f8e3&scene=21#wechat_redirect)

[

flink sql 知其所以然(一)| source\sink 原理

](http://mp.weixin.qq.com/s?__biz=MzkxNjA1MzM5OQ==&mid=2247488486&idx=1&sn=b9bdb56e44631145c8cc6354a093e7c0&chksm=c1549f1ef623160834e3c5661c155ec421699fc18c57f2c63ba14d33bab1d37c5930fdce016b&scene=21#wechat_redirect)

[

揭秘字节跳动埋点数据实时动态处理引擎(附源码)

](http://mp.weixin.qq.com/s?__biz=MzkxNjA1MzM5OQ==&mid=2247488435&idx=1&sn=5d89a0d24603c08af4be342462409230&chksm=c1549f4bf623165d977426d13a0bdbe821ec8738744d2274613a7ad92dec0256d090aea4b815&scene=21#wechat_redirect)

更多 Flink 实时大数据分析相关技术博文,视频。后台回复?“flink”?获取。

点个赞+在看,感谢您的肯定?👇
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-26 12:10:28  更:2021-08-26 12:10:39 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 17:02:13-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码