说明
要输出规范的 2 小时时间窗口数据,例如 00:00-02:00、02:00-04:00
package flink;
import bi.MainFunc;
import bi.bean.SolomeBean;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
public class WindowHandle {
private final static Long dayMilliSecond = 86400000L;
private final static Long hourMilliSecond = 3600000L;
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
env.enableCheckpointing(1000);
SingleOutputStreamOperator<Tuple3<String, Date, Long>> jsonDS = env.addSource(MainFunc.getHandKafka())
.map(x -> JSON.parseObject(x, SolomeBean.class))
.filter(x -> x.getCityAreaId() != null)
.map(new MapFunction<SolomeBean, Tuple3<String, Date, Long>>() {
@Override
public Tuple3<String, Date, Long> map(SolomeBean x) throws Exception {
Tuple3<String, Date, Long> ofResult = Tuple3.of(x.getCityAreaId(), x.getCreateTime(), 1L);
return ofResult;
}
});
WindowedStream<Tuple3<String, Date, Long>, String, TimeWindow> window = jsonDS.assignTimestampsAndWatermarks(
WatermarkStrategy.<Tuple3<String, Date, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((solomeBean, timestamp) -> solomeBean.f1.getTime())
).keyBy(x -> x.f0)
.window(new WindowsHandle());
window.sum(2)
.keyBy(x->x.f0)
.process(new OutputOrderGmvProcessFunc(), TypeInformation.of(new TypeHint<Tuple2< String,Long>>() {}))
.print();
env.execute();
}
public static class WindowsHandle extends WindowAssigner<Object, TimeWindow> {
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
Long timeValueStart = getTimeValue(timestamp);
return Collections.singletonList(new TimeWindow(timeValueStart, timeValueStart + 7200000L));
}
@Override
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
EventTimeTrigger eventTimeTrigger = EventTimeTrigger.create();
return eventTimeTrigger;
}
@Override
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
@Override
public boolean isEventTime() {
return true;
}
public static Long getTimeValue(Long timeStamp){
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date parse20 = null;
try {
parse20 = simpleDateFormat.parse("2020-01-01 00:00:00");
} catch (ParseException e) {
e.printStackTrace();
}
long interverTime = timeStamp - parse20.getTime();
long moreTime = interverTime % dayMilliSecond;
long secodTime = moreTime / hourMilliSecond;
Long windowHourNum = 0L;
if (secodTime == 0){
windowHourNum = 0L;
}else if (secodTime %2 ==0){
windowHourNum = secodTime;
}else {
windowHourNum = secodTime - 1;
}
long startTime = (interverTime - moreTime) + windowHourNum * hourMilliSecond + parse20.getTime();
return startTime;
}
}
package flink;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
public class OutputOrderGmvProcessFunc extends KeyedProcessFunction<String, Tuple3<String, Date, Long>, Tuple2<String, Long>> {

    private static final long serialVersionUID = 1L;

    /** Milliseconds per day / hour, used for window-boundary arithmetic. */
    private final static Long dayMilliSecond = 86400000L;
    private final static Long hourMilliSecond = 3600000L;
    /** Two-hour window length in milliseconds. */
    private static final long WINDOW_SIZE = 2 * hourMilliSecond;

    /**
     * Epoch millis of 2020-01-01 00:00:00 (JVM default time zone), parsed once.
     * Replaces the per-call parse that swallowed ParseException and could NPE.
     */
    private static final long BASE_EPOCH = parseBaseEpoch();

    // Runtime-only resources: created in open(), never serialized with the function.
    // These were previously 'static', which shares them across every operator
    // instance in the same TaskManager JVM — wrong for the Flink operator lifecycle.
    private transient MapState<String, Long> state;
    private transient Connection connection;
    private transient PreparedStatement preparedStatement;
    // SimpleDateFormat is not thread-safe; one instance per operator subtask is safe.
    private transient SimpleDateFormat simpleDateFormat;

    private static long parseBaseEpoch() {
        try {
            return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2020-01-01 00:00:00").getTime();
        } catch (ParseException e) {
            throw new IllegalStateException("Cannot parse window base epoch", e);
        }
    }

    /**
     * Emits (areaId::windowStart, count) where windowStart is the element's 2-hour
     * window start formatted as "yyyy-MM-dd HH:mm:ss".
     */
    @Override
    public void processElement(Tuple3<String, Date, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        Long timeValue = getTimeValue(value.f1.getTime());
        String format = simpleDateFormat.format(new Date(timeValue));
        out.collect(Tuple2.of(value.f0 + "::" + format, value.f2));
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        state = getRuntimeContext().getMapState(new MapStateDescriptor<>("redult_map", String.class, Long.class));
        Class.forName("com.mysql.jdbc.Driver");
        // NOTE(review): credentials are hard-coded in source; move to configuration/secrets.
        connection = DriverManager.getConnection("jdbc:mysql://192.168.5.14:3306/cmdata", "root", "pl1kxWcm9_t4DnpMI");
        preparedStatement = connection.prepareStatement("SELECT area_name FROM sys_area sa where area_id = ?");
    }

    @Override
    public void close() throws Exception {
        // Keyed state cannot be accessed without a current key, so the previous
        // state.clear() here was invalid; only release the JDBC resources
        // (the Connection was previously leaked — it was never stored or closed).
        if (preparedStatement != null) {
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
        super.close();
    }

    /**
     * Floors {@code timeStamp} to its 2-hour window start relative to
     * {@link #BASE_EPOCH}; equivalent to the original day/hour arithmetic.
     *
     * @param timeStamp event time in epoch millis (expected &gt;= BASE_EPOCH)
     * @return window start in epoch millis
     */
    public static Long getTimeValue(Long timeStamp) {
        long elapsed = timeStamp - BASE_EPOCH;
        return BASE_EPOCH + (elapsed - elapsed % WINDOW_SIZE);
    }
}
仅当 value 的值发生变化(即比已记录的值更大)时才刷新到数据库,避免重复写入
public static class OutputOrderGmvProcessFunc extends KeyedProcessFunction<String, Tuple3<String, Date, Long>, Tuple2<String, Long>> {

    private static final long serialVersionUID = 1L;

    /**
     * Highest count emitted so far per key; only increases are forwarded, so the
     * database is refreshed only when a value actually changes. Transient instance
     * field (not 'static' as before: static state is shared across all operator
     * instances in the same JVM and is never serialized anyway) — re-created in open().
     */
    private transient MapState<String, Long> state;

    /** Forwards (key, count) only when the count exceeds the last value seen for the key. */
    @Override
    public void processElement(Tuple3<String, Date, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        String key = value.f0;
        Long previous = state.get(key);
        if (previous == null || previous < value.f2) {
            state.put(key, value.f2);
            System.out.println(key + ":" + value + "状态进行了更新");
            out.collect(Tuple2.of(key, value.f2));
        }
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        state = getRuntimeContext().getMapState(new MapStateDescriptor<>("redult_map", String.class, Long.class));
    }

    @Override
    public void close() throws Exception {
        // Keyed state cannot be accessed here (no current key is set outside
        // processElement/onTimer), so the previous state.clear() was invalid.
        super.close();
    }
}