前言
笔者我集群使用的Flink1.12.5,Hudi官方版本是不支持Flink1.12.3-Flink1.12.5之间的Flink版本,所以我对Hudi源码进行了一部分修改,最终可以实现Flink1.12.5与Hudi0.9正常使用(包括sync hive功能),理论上来说 Flink 1.12.3-Flink1.12.5之间都可以通过这套方法来进行Flink与Hudi的适配。
笔者的环境:Hadoop3.1.1、hive3.1.0 、Flink 1.12.5、Hudi0.9.0 Hudi 0.9的源码地址:https://github.com/apache/hudi/tree/release-0.9.0
以下是修改过程
-
修改hudi-release-0.9目录下的pom.xml文件 ①、注释掉hudi-integ-test 与packaging/hudi-integ-test-bundle不注释掉这两个打包的时候会报错 ②、将 <flink.version>1.12.2</flink.version> 改为 ? <flink.version>1.12.5</flink.version> -
修改 hudi-release-0.9.0/hudi-flink/src/main/java\org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
①、添加属性 private transient SubtaskGateway[] gateways; ②、实现 subtaskReady 方法,并在方法内部添加操作
@Override
public void subtaskReady(int subtask, SubtaskGateway gateway) {
this.gateways[subtask] = gateway;
}
③、实现 notifyCheckpointAborted 方法,不做任何操作 ④、修改sendCommitAckEvents()方法的返回值与捕获的异常类型如下: ⑤、删除importorg.apache.flink.runtime.operators.coordination.TaskNotRunningException; 不删除在编译过程中会报错 ⑥、在 start() 方法内添加 this.gateways = new SubtaskGateway[this.parallelism];
-
在 hudi-release-0.9.0/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java 内在 StreamWriteFunctionWrapper(String tablePath, Configuration conf) 方法内,删除 MockOperatorCoordinatorContext弟三个参数,修改后如下: -
修改hudi-release-0.9.0/packaging/hudi-flink-bundle/pom.xml文件 1. 修改下面的属性,换成适配你hive的 flink-sql-connector-hive 依赖,这个依赖maven没有,需要去Flink官网 https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/ 下载后手动添加到本地maven仓库中,正确添加后第二条属性在idea里面会检查到,不报红,下面两条属性用的版本号要一致,
<include>org.apache.flink:flink-sql-connector-hive-2.3.6_${scala.binary.version}</include>
<profile>
<id>include-flink-sql-connector-hive</id>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-hive-2.3.6_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
</profile>
下面是我修改后的适配hive 3.1.0版本的两条属性 两条属性的位置是不一样的 -
在hudi-release-0.9.0目录下,执行maven打包命令 hive 3 系列版本(默认打包hive 3.1.2的依赖): mvn install -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive3 hive 2 系列版本(默认打包hive 2.3.6的依赖): mvn install -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive2 hive 1系列版本(默认打包hive1.1.0的依赖) mvn install -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive1 如果版本默认打包的hive版本号与你使用的版本号不兼容,则修改 hudi-release-0.9.0/packaging/hudi-flink-bundle/pom.xml 内 对应 flink-bundle-shade-hive 属性的具体的hive版本号 <profile>
<id>flink-bundle-shade-hive1</id>
<properties>
<hive.version>1.1.0</hive.version>
<thrift.version>0.9.2</thrift.version>
<flink.bundle.hive.scope>compile</flink.bundle.hive.scope>
</properties>
</profile>
<profile>
<id>flink-bundle-shade-hive2</id>
<properties>
<hive.version>2.3.1</hive.version>
<flink.bundle.hive.scope>compile</flink.bundle.hive.scope>
</properties>
<dependencies>
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-service-rpc</artifactId>
<version>${hive.version}</version>
<scope>${flink.bundle.hive.scope}</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>flink-bundle-shade-hive3</id>
<properties>
<hive.version>3.1.2</hive.version>
<flink.bundle.hive.scope>compile</flink.bundle.hive.scope>
</properties>
<dependencies>
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-service-rpc</artifactId>
<version>${hive.version}</version>
<scope>${flink.bundle.hive.scope}</scope>
</dependency>
</dependencies>
</profile>
-
编译完成后,可以在hudi-release-0.9.0\packaging\hudi-flink-bundle\target下找到编译好的 hudi-flink-bundle_2.11-0.9.0.jar 包,可以在idea里外部引入这个包或者放入到Flink的lib目录下使之生效
|