1、TimeWindow?
package com.atguigu.Bwindow;
import com.atguigu.Fbeans.SensorReading;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import javax.swing.text.html.HTMLDocument;
/**
* window上可以安装netcat,安装好了之后通过cmd打开,输入“nc -l -p 7777”即可启动服务端发送消息
* 本类正好监控该7777端口,每次nc服务端发送消息,本类都能够按key分类统计出现的次数
* 消息类型:sensor_1,1547718199,35.8 sensor_6,1547718201,15.4
*
* 窗口分类:
* 滑动滚动的区别在于滑动窗口步长不等于窗口长度,滚动则是等于。
* 窗口大概分为时间和数量窗口,每一类又可以细分为滚动和滑动。
* 时间窗口:每隔几秒统计最近几秒的数据。(用得多)。
* 滑动时间窗口:
* keyBy("id").window(SlidingProcessingTimeWindows.of(Time.Seconds(10),Time.Seconds(5))).maxBy("id"); 步长5
* 简写:keyBy("id").timeWindow(Time.Seconds(10),Time.Seconds(5)).maxBy("id"); 步长5
* 滚动时间窗口:
* keyBy("id").window(TumblingProcessingTimeWindows.of(Time.Seconds(5))).maxBy("id"); 窗口和步长都是5
* 简写:keyBy("id").timeWindow(Time.Seconds(5)).maxBy("id"); 窗口和步长都是5
* session会话窗口:
* 设置一个session间隔,如果两条消息间隔大于session间隔,则后一条信息加入新窗口并触发上一个窗口的计算。
* keyBy("id").window(ProcessingTimeSessionWindow.withGap(Time.Seconds(10))).maxBy("id");
* 没有简写。还一个类DynamicProcessingTimeSessionWindow
* 数量窗口:每隔几个数据统计最近几个数据。(用得少)
* 滑动数量窗口:
* keyBy("id").countWindow(5,3).maxBy("id") 最近5条消息,相同的key出现了三次的数据
* 滑动数量窗口:
* keyBy("id").countWindow(5).maxBy("id") 最近5条消息,相同的key出现了5次的数据
*
*/
public class ATimeWindowTest {
public static void main(String[] args) throws Exception {
//创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//读取数据并包装成pojo
DataStreamSource<String> inputStream = env. socketTextStream("localhost",7777);
DataStream<SensorReading> mapStream = inputStream.map(line -> {
String[] splits = line.split(",");
//这儿toString是为了数据传输时方便使用simpleStringSchema
return new SensorReading(new String(splits[0]), new Long(splits[1]), new Double(splits[2]));
});
/**
* 基础知识:
* 一个窗口操作由窗口分配器window[All]()方法紧随其后的窗口函数(一般是聚合操作)组成。
* window方法之前必须有keyBy分组,windowAll则不用,windowAll之后所有数据会被传递到下一个算子的一个分区上执行。
* 所谓窗口函数有分为两类:
* 增量聚合函数:每当一条数据进入窗口,就进行一次计算。窗口中保存一个聚合状态。
* 如:max\min\count\avg\maxBy\minBy\
* reduce(new ReduceFunction(){})\aggregate(new AggregateFunction(){})都是增量聚合函数。
* 全窗口函数:窗口中所有数据全部收集完毕,遍历窗口所有数据完成计算。
* 如:apply(new WindowFunction) 或者process(new PrcessWindowFunction)
* process比apply更为强大,能获取到上下文
* 以下测试增量聚合函数:AggregateFunction
*
*/
DataStream<Integer> result1 = mapStream.keyBy(SensorReading::getId)
.timeWindow(Time.seconds(10)) //步长=窗口长度 为滚动窗口
//泛型一:输入类型 泛型二:累加器类型 泛型三:输出类型
.aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {
@Override
public Integer createAccumulator() {//创建累加器并赋初值
return 0;
}
@Override
public Integer add(SensorReading sensorReading, Integer accu) {//累加器每次累加多少
return accu + 1;
}
@Override
public Integer getResult(Integer accu) {//给外部返回信息
return accu;
}
@Override
public Integer merge(Integer a, Integer b) {//一般只有session会话窗口会用这个方法
return a + b;
}
});
/**
*
* 以下测试全窗口函数:apply(new WindowFunction)
*/
DataStream<Object> result2 = mapStream.keyBy(SensorReading::getId)
.timeWindow(Time.seconds(15))//步长=窗口长度 为滚动窗口
//泛型一:输入类型 泛型二:输出类型 泛型三:当前分组key的类型 泛型四:窗口类型
.apply(new WindowFunction<SensorReading, Object, String, TimeWindow>() {
@Override
public void apply(String str, TimeWindow window, Iterable<SensorReading> in, Collector<Object> out) throws Exception {
String key=str; //获取分组id
long end = window.getEnd(); //获取窗口结束时间
int size = IteratorUtils.toList(in.iterator()).size();//获取窗口内同key数据的个数
out.collect(key+"========="+end+"============="+size);
}
});
//执行
//result1.print("增量聚合函数aggragate");
result2.print("全窗口函数apply!");
env.execute("测试window窗口分配器和窗口函数");
}
}
2、CountWindow
|