前言
今天主要是过下Flink的DataStream API入门,编码基本套路、数据源等等。 昨天部门经理把项目的demo权限放开了,看了下,基本的大流程已经实现了。使用的是1.14.3版本(现在1.14.4),代码最后git时间是3.28。不得不说,demo基本把要改造的大流程写好了,还是强啊。所以说还是要保持好学习能力,是不是要走领导岗位就看个人了(不过不得不说当了一定级别的领导菜有更多的时间study up。这个级别不能低也不能高)。
一、Flink编码套路
运行模型 数据源 自定义数据源示例 转换 DataSink
二、迭代运算
1.简单理解迭代运算
2.流式迭代运算
3.延迟控制
理解梳理: 吞吐针对整体而言,延迟针对个体而言。
三、Flink调试
1、调试手段
PS: 显式创建本地环境不推荐这样写,getExecutionEnvironment方法里会判断环境,并创建环境。
2、数据模拟
实际上1.14.4里DataStreamUtil.collect()相关方法已过时,替代的是对keyedStream的reinterpretAsKeyedStream
四、流式迭代demo
业务需求:输入一组数据,分别进行减1运算,直到等于0为止
package spendreport.stream;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.client.program.StreamContextEnvironment;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class IterativeStreamJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamContextEnvironment.getExecutionEnvironment();
DataStream<Long> input = env.generateSequence(0, 10);
IterativeStream<Long> itStream = input.iterate();
DataStream<Long> minusOne = itStream.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
return value - 1;
}
});
DataStream<Long> greaterThanZero = minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return value > 0;
}
});
itStream.closeWith(greaterThanZero);
DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return value <= 0;
}
});
KeyedStream<Long, Object> ls = DataStreamUtils.reinterpretAsKeyedStream(lessThanZero,
new KeySelector<Long, Object>() {
@Override
public Object getKey(Long value) throws Exception {
return value;
}
});
ls.print();
env.execute("IterativeStreamJob");
}
}
总结
流式处理的迭代写法有点不好理解,跟我们平常的迭代方法区别有点大吧。其实用流水作业想就好理解多了,后面也许有高级的简单写法。 好,就写到这里,up!!!
|