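(4) ProcessWindowFunction with Incremental Aggregation
- A ProcessWindowFunction can be combined with an incremental aggregation function, either a ReduceFunction or an AggregateFunction.
- Elements are aggregated incrementally as they arrive in the window; when the window closes, the ProcessWindowFunction is called with the single aggregated result.
- This gives you incremental aggregation together with access to the window metadata (start and end time, state, and so on).
DataStream<SensorReading> input = ...;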
input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new MyReduceFunction(), new MyProcessWindowFunction());

// Incremental step: keeps only the reading with the smaller value.
private static class MyReduceFunction implements ReduceFunction<SensorReading> {
    public SensorReading reduce(SensorReading r1, SensorReading r2) {
        return r1.value() > r2.value() ? r2 : r1;
    }
}

// Called when the window fires; the Iterable holds exactly one element, the pre-aggregated minimum.
private static class MyProcessWindowFunction
        extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
    public void process(String key,
                        Context context,
                        Iterable<SensorReading> minReadings,
                        Collector<Tuple2<Long, SensorReading>> out) {
        SensorReading min = minReadings.iterator().next();
        out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
    }
}
kafkaStream
    .process(new KafkaProcessFunction())
    .assignTimestampsAndWatermarks(new MyCustomBoundedOutOfOrdernessTimestampExtractor(Time.seconds(maxOutOfOrdernessSeconds)))
    .keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID)
    // Note: timeWindow(...) is deprecated since Flink 1.12 in favor of window(...) with an explicit window assigner.
    .timeWindow(Time.seconds(windowLengthSeconds))
    .reduce(
        // Incremental step: keep the record with the higher product price.
        new ReduceFunction<UserActionLog>() {
            @Override
            public UserActionLog reduce(UserActionLog value1, UserActionLog value2) throws Exception {
                return value1.getProductPrice() > value2.getProductPrice() ? value1 : value2;
            }
        },
        // Window-close step: attach the window start and end time to the single reduced record.
        new ProcessWindowFunction<UserActionLog, String, String, TimeWindow>() {
            @Override
            public void process(String key, Context context, Iterable<UserActionLog> elements, Collector<String> out) throws Exception {
                UserActionLog max = elements.iterator().next();
                String windowStart = new DateTime(context.window().getStart(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");
                String windowEnd = new DateTime(context.window().getEnd(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");
                String record = "Key: " + key + " window start: " + windowStart + " window end: " + windowEnd + " record with the highest product price browsed: " + max;
                out.collect(record);
            }
        }
    )
    .print();
Output:
Key: user_2 window start: 2019-11-09 13:54:10 window end: 2019-11-09 13:54:20 record with the highest product price browsed: UserActionLog{userID='user_2', eventTime='2019-11-09 13:54:10', eventType='browse', productID='product_3', productPrice=30}
Key: user_4 window start: 2019-11-09 13:54:10 window end: 2019-11-09 13:54:20 record with the highest product price browsed: UserActionLog{userID='user_4', eventTime='2019-11-09 13:54:15', eventType='browse', productID='product_3', productPrice=30}
Key: user_3 window start: 2019-11-09 13:54:10 window end: 2019-11-09 13:54:20 record with the highest product price browsed: UserActionLog{userID='user_3', eventTime='2019-11-09 13:54:12', eventType='browse', productID='product_2', productPrice=20}
Key: user_5 window start: 2019-11-09 13:54:10 window end: 2019-11-09 13:54:20 record with the highest product price browsed: UserActionLog{userID='user_5', eventTime='2019-11-09 13:54:17', eventType='browse', productID='product_2', productPrice=20}
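To make the mechanism of this section concrete, here is a minimal plain-Java sketch that does not use Flink at all. The class `IncrementalWindowSketch` and its methods `onElement` and `fireAll` are hypothetical names invented for this illustration, and it assumes tumbling windows, non-negative timestamps, and integer values. It keeps only one running result per key and window (the incremental part) and attaches the window start and end only when the window fires (the ProcessWindowFunction part).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

// Plain-Java illustration only; none of these types are Flink classes.
final class IncrementalWindowSketch {
    private final long windowSizeMs;
    private final BinaryOperator<Integer> reducer;
    // key -> (window start -> running result); no individual elements are buffered.
    private final Map<String, Map<Long, Integer>> state = new TreeMap<>();

    IncrementalWindowSketch(long windowSizeMs, BinaryOperator<Integer> reducer) {
        this.windowSizeMs = windowSizeMs;
        this.reducer = reducer;
    }

    // Per-element step: fold the new value into the running result of its window.
    // Assumes timestampMs >= 0 so that the modulo yields the tumbling window start.
    void onElement(String key, long timestampMs, int value) {
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        state.computeIfAbsent(key, k -> new TreeMap<>())
             .merge(windowStart, value, reducer);
    }

    // Window-close step: each window contributes exactly one value, now paired with its metadata.
    List<String> fireAll() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Map<Long, Integer>> perKey : state.entrySet()) {
            for (Map.Entry<Long, Integer> window : perKey.getValue().entrySet()) {
                long start = window.getKey();
                long end = start + windowSizeMs;
                out.add(perKey.getKey() + " [" + start + "," + end + ") -> " + window.getValue());
            }
        }
        state.clear();
        return out;
    }

    public static void main(String[] args) {
        // 10-second tumbling windows, keeping the maximum value per key and window.
        IncrementalWindowSketch sketch = new IncrementalWindowSketch(10_000L, Integer::max);
        sketch.onElement("user_2", 1_000L, 10);
        sketch.onElement("user_2", 4_000L, 30);
        sketch.onElement("user_3", 2_000L, 20);
        sketch.onElement("user_2", 12_000L, 5);
        sketch.fireAll().forEach(System.out::println);
    }
}
```

The sketch ignores watermarks, triggers, and state backends, which Flink handles for you; it is only meant to show why the function called at window close sees a single element.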
(5) ProcessWindowFunction with AggregateFunction
DataStream<Tuple2<String, Long>> input = ...;
input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate(), new MyProcessWindowFunction());

// Incremental step: the accumulator holds (f0 = sum, f1 = count).
private static class AverageAggregate
        implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return new Tuple2<>(0L, 0L);
    }

    @Override
    public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
        return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> accumulator) {
        return ((double) accumulator.f0) / accumulator.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
    }
}

// Called when the window fires; the Iterable holds exactly one element, the computed average.
private static class MyProcessWindowFunction
        extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {
    public void process(String key,
                        Context context,
                        Iterable<Double> averages,
                        Collector<Tuple2<String, Double>> out) {
        Double average = averages.iterator().next();
        out.collect(new Tuple2<>(key, average));
    }
}
kafkaStream
    .process(new KafkaProcessFunction())
    .assignTimestampsAndWatermarks(new MyCustomBoundedOutOfOrdernessTimestampExtractor(Time.seconds(maxOutOfOrdernessSeconds)))
    .keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID)
    .timeWindow(Time.seconds(windowLengthSeconds))
    .aggregate(
        // Incremental step: here the accumulator holds (f0 = count, f1 = sum of prices).
        new AggregateFunction<UserActionLog, Tuple2<Long, Long>, Double>() {
            @Override
            public Tuple2<Long, Long> createAccumulator() {
                return new Tuple2<>(0L, 0L);
            }

            @Override
            public Tuple2<Long, Long> add(UserActionLog value, Tuple2<Long, Long> accumulator) {
                accumulator.f0 += 1;
                accumulator.f1 += value.getProductPrice();
                return accumulator;
            }

            @Override
            public Tuple2<Long, Long> merge(Tuple2<Long, Long> acc1, Tuple2<Long, Long> acc2) {
                acc1.f0 += acc2.f0;
                acc1.f1 += acc2.f1;
                return acc1;
            }

            @Override
            public Double getResult(Tuple2<Long, Long> accumulator) {
                return accumulator.f1 / (accumulator.f0 * 1.0);
            }
        },
        // Window-close step: attach the window start and end time to the computed average.
        new ProcessWindowFunction<Double, String, String, TimeWindow>() {
            @Override
            public void process(String key, Context context, Iterable<Double> elements, Collector<String> out) throws Exception {
                Double avg = elements.iterator().next();
                String windowStart = new DateTime(context.window().getStart(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");
                String windowEnd = new DateTime(context.window().getEnd(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");
                String record = "Key: " + key + " window start: " + windowStart + " window end: " + windowEnd + " average price of products browsed: " + String.format("%.2f", avg);
                out.collect(record);
            }
        }
    )
    .print();
Output:
Key: user_2 window start: 2019-11-09 14:05:40 window end: 2019-11-09 14:05:50 average price of products browsed: 13.33
Key: user_3 window start: 2019-11-09 14:05:50 window end: 2019-11-09 14:06:00 average price of products browsed: 25.00
Key: user_4 window start: 2019-11-09 14:05:50 window end: 2019-11-09 14:06:00 average price of products browsed: 20.00
Key: user_2 window start: 2019-11-09 14:05:50 window end: 2019-11-09 14:06:00 average price of products browsed: 30.00
Key: user_5 window start: 2019-11-09 14:05:50 window end: 2019-11-09 14:06:00 average price of products browsed: 20.00
Key: user_1 window start: 2019-11-09 14:05:50 window end: 2019-11-09 14:06:00 average price of products browsed: 23.33
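The four accumulator methods can also be exercised outside Flink to see how they fit together. The sketch below is plain Java and only mimics the shape of an AggregateFunction; the class name `AverageAccumulatorSketch` and the `long[]` accumulator layout are assumptions made for this illustration. It shows that merging two partial accumulators yields the same average as adding every value to a single accumulator, which is the property `merge` has to satisfy (Flink may call it when windows are merged, for example with session windows).

```java
// Plain-Java illustration only; this does not implement Flink's AggregateFunction interface.
final class AverageAccumulatorSketch {
    // Accumulator layout (an assumption for this sketch): [0] = sum, [1] = count.
    static long[] createAccumulator() {
        return new long[] {0L, 0L};
    }

    // Folds one value into the accumulator and returns a new accumulator.
    static long[] add(long value, long[] acc) {
        return new long[] {acc[0] + value, acc[1] + 1L};
    }

    // Combines two partial accumulators into one.
    static long[] merge(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};
    }

    // Turns the accumulator into the final result; an empty accumulator yields NaN.
    static double getResult(long[] acc) {
        return (double) acc[0] / acc[1];
    }

    public static void main(String[] args) {
        // Two partial accumulators, as if built for two windows that are later merged.
        long[] left = add(20, add(10, createAccumulator()));
        long[] right = add(10, createAccumulator());

        // One accumulator that saw all three values in sequence.
        long[] sequential = add(10, add(20, add(10, createAccumulator())));

        System.out.println("merged average:     " + getResult(merge(left, right)));
        System.out.println("sequential average: " + getResult(sequential));
    }
}
```

Both printed averages are the same value, since sum and count are each combined by plain addition.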
(6) WindowFunction (Legacy)
In some places where a ProcessWindowFunction can be used, you can also use a WindowFunction. This is an older version of ProcessWindowFunction that provides less contextual information and lacks some advanced features, such as per-window keyed state. This interface will be deprecated at some point.
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
    void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}

DataStream<Tuple2<String, Long>> input = ...;
input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .apply(new MyWindowFunction());
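`MyWindowFunction` is not defined in the snippet above. A minimal sketch of what such an implementation might look like follows; it simply counts the elements in each window. So that it compiles without Flink on the classpath, `Collector`, `WindowFunction`, and `SimpleWindow` are declared here as local stand-ins (assumptions for this illustration, not the Flink classes), and the element type is simplified to `Long`. In a real job you would import the Flink types instead and use a window type such as `TimeWindow`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class WindowFunctionSketch {
    // Local stand-in for Flink's Collector (assumption: only collect(...) is needed here).
    interface Collector<T> {
        void collect(T record);
    }

    // Local stand-in that mirrors the apply(...) signature quoted above.
    interface WindowFunction<IN, OUT, KEY, W> {
        void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
    }

    // Hypothetical window type carrying only a start and an end timestamp.
    static final class SimpleWindow {
        final long start;
        final long end;

        SimpleWindow(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    // Counts the elements of one window and emits a single summary line.
    static final class MyWindowFunction implements WindowFunction<Long, String, String, SimpleWindow> {
        @Override
        public void apply(String key, SimpleWindow window, Iterable<Long> input, Collector<String> out) {
            long count = 0;
            for (Long ignored : input) {
                count++;
            }
            out.collect(key + " window[" + window.start + "," + window.end + ") count=" + count);
        }
    }

    public static void main(String[] args) {
        List<String> collected = new ArrayList<>();
        // Simulates one window firing with three buffered elements for key "a".
        new MyWindowFunction().apply("a", new SimpleWindow(0L, 10L), Arrays.asList(1L, 2L, 3L), collected::add);
        collected.forEach(System.out::println);
    }
}
```

Unlike the combined reduce and aggregate variants shown earlier, a function applied this way receives every element of the window in the Iterable, so the window contents are buffered until it fires.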