flink中间结果写HDFS
项目中遇到一个应用场景需要将flink计算的中间结果写入到hdfs中
提示:正常的kafka数据还是用flume同步至hdfs吧,用flink写hdfs会有很多问题
一、数据分桶写入
流数据写入到hdfs中是将数据写入到分桶(bucket)中。默认使用基于系统时间(yyyy-MM-dd–HH)的分桶策略。在分桶中,又根据滚动策略,将输出拆分为 part 文件,具体如下: 上面的文件的问题在于:文件一直处于pending状态,句柄无法关闭 记得配置checkpoint
二、代码编写
代码如下(示例):
1 写入本地文件中
String patha = "F:\\testflinktext";
final StreamingFileSink<String> sinka = StreamingFileSink
.forRowFormat(new Path(patha), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(60))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(30))
.withMaxPartSize(1024 * 1024 * 1024)
.build())
.build();
flowIncreaStream.addSink(sinka);
2 写入至hdfs中
String pathb = "hdfs://fh-node1:8020/hivedata/device_flowdata";
final StreamingFileSink<String> sinkb = StreamingFileSink
.forRowFormat(new Path(pathb), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(30))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(10))
.withMaxPartSize(1024 * 1024 * 1024)
.build())
.build();
flowIncreaStream.addSink(sinkb);
三 创建hive表,并增加分区
1 分区表创建
CREATE EXTERNAL TABLE `ssa_flow_device_increment`(
`fromMac` string COMMENT 'mac',
`region` string COMMENT '区域',
`update_time` string COMMENT '更新时间')
COMMENT '抖音订单行表落地表'
PARTITIONED BY (`statis_time` string COMMENT '分区字段')
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://fh-node1:8020/hivedata/device_flowdata/'
2 定时增加分区(dolphin中调度)
sql:
alter table ssa_flow_device_increment
add partition(statis_time='${statis_time}')
location '/hivedata/device_flowdata/${statis_time}';
调度任务:
hive -hivevar statis_time='${statis_time}' -S -f hdfs://nncluster/data/dolphinscheduler/dolphinscheduler/resources/sql/ssa_flow_device_increment_partition.sql
参数配置:
四 flink写hdfs目前存在的问题
截止目前,Flink 的 Streaming File Sink 仍存在不少问题,如:
它只支持 Hadoop 2.7 以上的版本,因为需要用到高版本文件系统提供的 truncate 方法来实现故障恢复。 不支持写入到
Hive。
写入HDFS时,会产生大量的小文件。
当程序突然停止时,文件仍处于inprogress状态。
默认桶下的文件名是 part-{parallel-task}-{count}。当程序重启时,从上次编号值加1继续开始。前提是程序是正常停止
除了使用StreamingFileSink外,还可以使用BucketingSink.
StreamingFileSink API 见 这里。
|