<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wm.analyse</groupId>
    <artifactId>wm-offline-analysis-scala-flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.13.1</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.8</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>
Using the Table API to split vehicle trip data into phases and aggregate per phase
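The job below reads testData.csv and expects comma-separated rows with seven fields: vin, a Unix timestamp in seconds, battery_code, BMS_DCS_ActOprtMode, BMS_MaxAllowChrgCur_DC, BMS_CellVolMax and BMS_CellVolMin. A hypothetical row (all values invented purely for illustration) could look like:

TESTVIN0000000001,1627868710,BC0001,1,250.0,4.15,3.98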
package test;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.*;

public class Test1 {
    public static void main(String[] args) throws Exception {
        String filePath = "src\\main\\resources\\testData.csv";

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Checkpoint every 5 minutes (the interval is given in milliseconds).
        env.enableCheckpointing(5 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Parse each CSV line into a VinBean; the timestamp arrives in seconds and is converted to milliseconds.
        DataStream<String> inputStream = env.readTextFile(filePath);
        DataStream<VinBean> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new VinBean(fields[0], Long.parseLong(fields[1]) * 1000, fields[2], Integer.parseInt(fields[3]),
                    Double.parseDouble(fields[4]), Double.parseDouble(fields[5]), Double.parseDouble(fields[6]));
        });

        // Event-time watermarks: tolerate 10 seconds of out-of-orderness and mark the source idle after 10 minutes.
        DataStream<VinBean> dataStream2 = dataStream.assignTimestampsAndWatermarks(WatermarkStrategy
                .<VinBean>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((vinBean, timestamp) -> vinBean.getCarTime())
                .withIdleness(Duration.ofMinutes(10)));

        // Expose carTime both as a regular field and as the rowtime attribute "ts".
        Table inputTable = tableEnv.fromDataStream(dataStream2, $("vin"), $("carTime"), $("battery_code"), $("BMS_DCS_ActOprtMode"),
                $("BMS_MaxAllowChrgCur_DC"), $("BMS_CellVolMax"), $("BMS_CellVolMin"), $("carTime").rowtime().as("ts"));
        inputTable.printSchema();

        tableEnv.createTemporarySystemFunction("ifFun", IfFun.class);

        Table aggVin = inputTable
                // w: the current row and the one before it, per vin, so min(carTime) is the previous record's time.
                .window(Over.partitionBy($("vin")).orderBy($("ts")).preceding(rowInterval(1L)).as("w"))
                .select($("vin"), $("ts"), $("carTime"), $("carTime").min().over($("w")).as("min_carTime"))
                // flag = 1 when the gap to the previous record exceeds 10 seconds, i.e. a new trip phase starts.
                .select($("vin"), $("ts"), $("carTime"), $("min_carTime"),
                        $("carTime").minus($("min_carTime")).isGreater(10 * 1000).then(1, 0).as("flag"))
                // A running sum of the flags turns the gap markers into a phase id per vin.
                .window(Over.partitionBy($("vin")).orderBy($("ts")).preceding(UNBOUNDED_RANGE).as("w2"))
                .select($("vin"), $("ts"), $("carTime"), $("flag").sum().over($("w2")).as("sum_flag"))
                // Per (vin, phase id), the running min/max of carTime gives the phase start and end seen so far.
                .window(Over.partitionBy($("vin"), $("sum_flag")).orderBy($("ts")).preceding(UNBOUNDED_RANGE).as("w3"))
                .select($("vin"), $("ts"), $("carTime").min().over($("w3")).as("start_time"),
                        $("carTime").max().over($("w3")).as("end_time"), $("sum_flag"))
                // Emit one result per vin, phase and 10-second tumbling window.
                .window(Tumble.over(lit(10L).seconds()).on($("ts")).as("w4"))
                .groupBy($("vin"), $("w4"), $("sum_flag"))
                .select($("vin"), $("start_time").min().as("start_time"), $("end_time").max().as("end_time"));

        tableEnv.toRetractStream(aggVin, Row.class).print();
        env.execute();
    }
}
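The program references a VinBean POJO and an IfFun scalar function that the post does not show. Below is a minimal sketch of what they might look like, assuming VinBean simply mirrors the seven CSV fields and IfFun is a trivial conditional helper (it is registered as "ifFun" above but never invoked in the query shown). Field names and types are inferred from the parsing code; everything else is an assumption, and each class would live in its own file under package test.

// VinBean.java (hypothetical sketch)
package test;

// Flink POJO mirroring the seven CSV fields; carTime is the event time in milliseconds.
public class VinBean {
    public String vin;
    public Long carTime;
    public String battery_code;
    public Integer BMS_DCS_ActOprtMode;
    public Double BMS_MaxAllowChrgCur_DC;
    public Double BMS_CellVolMax;
    public Double BMS_CellVolMin;

    // Flink POJOs require a public no-argument constructor.
    public VinBean() {}

    public VinBean(String vin, Long carTime, String battery_code, Integer BMS_DCS_ActOprtMode,
                   Double BMS_MaxAllowChrgCur_DC, Double BMS_CellVolMax, Double BMS_CellVolMin) {
        this.vin = vin;
        this.carTime = carTime;
        this.battery_code = battery_code;
        this.BMS_DCS_ActOprtMode = BMS_DCS_ActOprtMode;
        this.BMS_MaxAllowChrgCur_DC = BMS_MaxAllowChrgCur_DC;
        this.BMS_CellVolMax = BMS_CellVolMax;
        this.BMS_CellVolMin = BMS_CellVolMin;
    }

    // Used by the watermark timestamp assigner.
    public Long getCarTime() {
        return carTime;
    }
}

// IfFun.java (hypothetical sketch)
package test;

import org.apache.flink.table.functions.ScalarFunction;

// Simple conditional UDF: returns ifTrue when the condition holds, otherwise ifFalse.
public class IfFun extends ScalarFunction {
    public Integer eval(Boolean condition, Integer ifTrue, Integer ifFalse) {
        return Boolean.TRUE.equals(condition) ? ifTrue : ifFalse;
    }
}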
Using a Table API join to fetch, for each record, the data from 5 seconds earlier
The idea: adjust the event time of the two streams up front (here, the second copy of the stream is shifted forward by 5 seconds), then open tumbling windows and join the streams inside the window. This avoids both frequent writes to the database and an unbounded join, while guaranteeing that records landing in the same time window can always be joined.
package test;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.*;

public class Test1 {
    public static void main(String[] args) throws Exception {
        String filePath = "src\\main\\resources\\testData.csv";

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Checkpoint every 5 minutes (the interval is given in milliseconds).
        env.enableCheckpointing(5 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStream<String> inputStream = env.readTextFile(filePath);

        // First copy of the stream: original event time (seconds converted to milliseconds).
        DataStream<VinBean> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new VinBean(fields[0], Long.parseLong(fields[1]) * 1000, fields[2], Integer.parseInt(fields[3]),
                    Double.parseDouble(fields[4]), Double.parseDouble(fields[5]), Double.parseDouble(fields[6]));
        });
        DataStream<VinBean> dataStream2 = dataStream.assignTimestampsAndWatermarks(WatermarkStrategy
                .<VinBean>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((vinBean, timestamp) -> vinBean.getCarTime())
                .withIdleness(Duration.ofMinutes(10)));

        // Second copy of the stream: event time shifted forward by 5 seconds, so a record at time t
        // lines up with the record that originally happened at t - 5s.
        DataStream<VinBean> dataStream3 = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new VinBean(fields[0], Long.parseLong(fields[1]) * 1000 + 5000, fields[2], Integer.parseInt(fields[3]),
                    Double.parseDouble(fields[4]), Double.parseDouble(fields[5]), Double.parseDouble(fields[6]));
        });
        DataStream<VinBean> dataStream4 = dataStream3.assignTimestampsAndWatermarks(WatermarkStrategy
                .<VinBean>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((vinBean, timestamp) -> vinBean.getCarTime())
                .withIdleness(Duration.ofMinutes(10)));

        tableEnv.createTemporarySystemFunction("concatFun", ConcatFun.class);

        // Left side: 10-second tumbling window, then build the join key vin_ts = vin + rowtime string.
        Table inputTable = tableEnv.fromDataStream(dataStream2, $("vin"), $("carTime"), $("battery_code"), $("BMS_DCS_ActOprtMode"),
                $("BMS_MaxAllowChrgCur_DC"), $("BMS_CellVolMax"), $("BMS_CellVolMin"), $("carTime").rowtime().as("ts"))
                .window(Tumble.over(lit(10L).seconds()).on($("ts")).as("w"))
                .groupBy($("vin"), $("ts"), $("w"))
                .select($("vin"), $("ts"), $("w").start().as("start"), $("w").end().as("end"))
                .addOrReplaceColumns($("ts").cast(DataTypes.STRING()).as("ts"))
                .addColumns(call(ConcatFun.class, $("vin"), $("ts")).as("vin_ts"));

        // Right side: the shifted stream, windowed the same way, with its own key vin_ts2.
        Table inputTable2 = tableEnv.fromDataStream(dataStream4, $("vin").as("vin2"), $("carTime").as("carTime2"),
                $("BMS_CellVolMax").as("BMS_CellVolMax2"), $("carTime").rowtime().as("ts2"))
                .window(Tumble.over(lit(10L).seconds()).on($("ts2")).as("w"))
                .groupBy($("vin2"), $("ts2"), $("w"))
                .select($("vin2"), $("ts2"))
                .addOrReplaceColumns($("ts2").cast(DataTypes.STRING()).as("ts2"))
                .addColumns(call(ConcatFun.class, $("vin2"), $("ts2")).as("vin_ts2"));

        inputTable.printSchema();
        inputTable2.printSchema();

        // Each record is joined with the 5-second-shifted copy, i.e. with the record 5 seconds before it.
        Table joined = inputTable.leftOuterJoin(inputTable2, $("vin_ts").isEqual($("vin_ts2")));
        joined.printSchema();

        tableEnv.toRetractStream(joined, Row.class).print();
        env.execute();
    }
}
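The join key relies on a ConcatFun scalar function that the post does not include. A minimal sketch, assuming it simply concatenates the vin with the stringified rowtime (the separator is an arbitrary choice):

// ConcatFun.java (hypothetical sketch)
package test;

import org.apache.flink.table.functions.ScalarFunction;

// Builds a composite join key such as "VIN123_2021-08-02 02:25:10.000" from the vin and the rowtime string.
public class ConcatFun extends ScalarFunction {
    public String eval(String vin, String ts) {
        return vin + "_" + ts;
    }
}

Because both sides use the same windowing and the same key construction, each left-side row joins to the row from the shifted copy with the same vin and timestamp, i.e. the record produced 5 seconds earlier, or to NULLs if no such record exists.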