IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flink中间结果写入hdfs并构建hive分区表 -> 正文阅读

[大数据]flink中间结果写入hdfs并构建hive分区表

flink中间结果写HDFS

项目中遇到一个应用场景需要将flink计算的中间结果写入到hdfs中


提示:正常的kafka数据还是用flume同步至hdfs吧,用flink写hdfs会有很多问题


一、数据分桶写入

流数据写入到hdfs中是将数据写入到分桶(bucket)中。默认使用基于系统时间(yyyy-MM-dd–HH)的分桶策略。在分桶中,又根据滚动策略,将输出拆分为 part 文件,具体如下:
在这里插入图片描述
上面的文件的问题在于:文件一直处于pending状态,句柄无法关闭
记得配置checkpoint

二、代码编写

代码如下(示例):

1 写入本地文件中

       String patha = "F:\\testflinktext";
        final StreamingFileSink<String> sinka = StreamingFileSink
                .forRowFormat(new Path(patha), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(60))//多长时间运行一个文件
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(30))//多长时间没有写入就生成一个文件
                                .withMaxPartSize(1024 * 1024 * 1024)
                                .build())
                .build();
        flowIncreaStream.addSink(sinka);

2 写入至hdfs中

 //输出至HDFS
        String pathb = "hdfs://fh-node1:8020/hivedata/device_flowdata";
        final StreamingFileSink<String> sinkb = StreamingFileSink
                .forRowFormat(new Path(pathb), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(30))//多长时间运行一个文件
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(10))//多长时间没有写入就生成一个文件
                                .withMaxPartSize(1024 * 1024 * 1024)
                                .build())
                .build();
        flowIncreaStream.addSink(sinkb);

三 创建hive表,并增加分区

1 分区表创建

CREATE EXTERNAL TABLE `ssa_flow_device_increment`(
`fromMac` string COMMENT 'mac',
`region` string COMMENT '区域',
`update_time` string COMMENT '更新时间')
COMMENT '抖音订单行表落地表'
PARTITIONED BY (`statis_time` string COMMENT '分区字段')
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://fh-node1:8020/hivedata/device_flowdata/'

2 定时增加分区(dolphin中调度)

sql:

alter table ssa_flow_device_increment
add partition(statis_time='${statis_time}') 
location '/hivedata/device_flowdata/${statis_time}';

调度任务:

hive -hivevar statis_time='${statis_time}' -S -f hdfs://nncluster/data/dolphinscheduler/dolphinscheduler/resources/sql/ssa_flow_device_increment_partition.sql 

参数配置:
在这里插入图片描述

四 flink写hdfs目前存在的问题

截止目前,Flink 的 Streaming File Sink 仍存在不少问题,如:

它只支持 Hadoop 2.7 以上的版本,因为需要用到高版本文件系统提供的 truncate 方法来实现故障恢复。 不支持写入到
Hive。
写入HDFS时,会产生大量的小文件。
当程序突然停止时,文件仍处于inprogress状态。
默认桶下的文件名是 part-{parallel-task}-{count}。当程序重启时,从上次编号值加1继续开始。前提是程序是正常停止
除了使用StreamingFileSink外,还可以使用BucketingSink.
StreamingFileSink API 见 这里。
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-06 09:53:50  更:2021-08-06 09:54:21 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/17 17:02:44-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码