前言
再来一个Flink的stream的example,提前先说下,官网的例子有点坑。
一、stream例子
拷贝到我的目录(这里顺便说下一个好的工具用起来真香,idea居然可以复制的代码,粘贴自动跟我创建类) 你想文什么,我知道,先别问,继续看下面。
二、example整理
1.依赖引入
你拷贝到你的demo项目,在自动引包的时候,会发现很多缺很多对象。 首先需要引入flink-connector-files
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
2.为什么会有TextLineInputFormat类
做完上面的步骤,应该会发现缺少这个类。首先说明,这里的示例居然是根据最新的快照版本写的,返回去看example的pom会发现。
我从maven下载到这个版本,用luyten打开jar,发现还真有这个类。 于是就新建了这样一个类,这里要说下,如果是用最新版本1.14.4版本,其实那行代码换个写法也是可以的。
FileSource.FileSourceBuilder<String> builder =
FileSource.forRecordStreamFormat(new TextLineFormat()
, params.getInputs().get());
这个最终解释权,还是来源官网,在官网1.14.4版本的Search框输入Text,就会出现Text Files的下拉了,点击,就会跳转到语法使用示例
3.还有报错提示
点击MemorySize的方法ofMebiBytes看实现,确定可以直接写long值,另外,下面Duration这里也爆红,改:
counts.sinkTo(
FileSink.<Tuple2<String, Integer>>forRowFormat(
params.getOutput().get(), new SimpleStringEncoder<>())
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withMaxPartSize(20)
.withRolloverInterval(10)
.build())
.build())
.name("file-sink");
三、运行example
报错,原因查了下没有说明,看这个字面意思是,无法使用私有最终字节。。。 PS:说到这突然想到,jdk在9就将实现由char[]改成了byte[]了,意义是可以节省占用内存。 这里其实我怀疑是不是jdk17有bug,最后突然想到官网建demo项目脚本的地方有这样的2句话: 于是,切换demo的build&run的jdk版本,idea里在这里切换: 再次运行 熟悉的分区统计又出来了,是不是很开心!!!
总结
1、看了官网的example还很多,后面的学习可能不能每个跟大家都分享了,有些我们肯定也暂时不用。 2、我的CSDN学习会员里Flink的这个课程有100节课,学习要加快,可能后面都有可能没有专门的时间学习了,只能在实战中get了。 希望能帮到大家哦,下班了,回家。
|