在实时分析场景中,实时大屏似乎永远是那颗最璀璨的星星,其中每年的天猫双11实时大屏几乎家喻户晓。今天就带大家一起来看看双11实时大屏的指标是如何计算的。建议大家一定要动手实现一遍:在动手过程中会发现一些问题,只有通过不断解决问题,才能真正夯实对知识的理解。
双11实时大屏的指标计算
这里就做一个最简单的模拟天猫实时大屏的小例子,需求如下:
1、实时计算出当天截止到当前时间的累计销售额
2、统计销售额top3的品类
3、每秒钟更新一次统计结果
代码实现
public class TianMaoGmv {
/**
 * Builds and runs the streaming job: reads order messages from Kafka, keeps a running
 * per-category sales total for the current day, and once per second writes the overall
 * GMV plus the three best-selling categories to MySQL.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the job cannot be built or fails while executing
 */
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60000); // checkpoint every 60000 ms (1 minute)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // use event time

    // Kafka connection properties
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "demo");

    // Kafka source connector for the order topic
    FlinkKafkaConsumer011<ObjectNode> orderConsumer = new FlinkKafkaConsumer011<ObjectNode>(
            "ods_order_rt", // order source topic
            new JSONKeyValueDeserializationSchema(false),
            props);
    orderConsumer.assignTimestampsAndWatermarks(new OrderMessagePeriodicWatermarks());
    orderConsumer.setStartFromLatest();

    // Order stream: (category, amount) pairs aggregated per category over a one-day window.
    DataStreamSource<ObjectNode> orderDataStreamSource = env.addSource(orderConsumer);
    SingleOutputStreamOperator<CategorySummary> categoryAgg = orderDataStreamSource.map(new MapFunction<ObjectNode, Tuple2<String, Double>>() {
                @Override
                public Tuple2<String, Double> map(ObjectNode objectNode) throws Exception {
                    return new Tuple2<String, Double>(objectNode.get("value").get("category").asText(), objectNode.get("value").get("amount").asDouble());
                }
            })
            .keyBy(0)
            // One-day tumbling window shifted by -8 hours, i.e. day boundaries at UTC+8 midnight.
            .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
            // Emit the running total every second of event time instead of only at window end.
            .trigger(ContinuousEventTimeTrigger.of(Time.seconds(1)))
            .aggregate(new AmountAgg(), new WindowResult());

    // Group the per-category summaries by the wall-clock second in which they were emitted.
    categoryAgg.keyBy(x -> x.getCreateTime())
            .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
            .process(new ProcessWindowFunction<CategorySummary, GmvResult, String, TimeWindow>() {
                /**
                 * Final statistics step: the sum of all category totals is the sales amount
                 * of the day so far; a bounded priority queue selects the top 3 categories.
                 */
                @Override
                public void process(String s, Context context, Iterable<CategorySummary> iterable, Collector<GmvResult> collector) throws Exception {
                    // Min-heap on total amount: the head is the smallest of the current top 3.
                    // Double.compare returns 0 for equal amounts, honouring the Comparator contract.
                    Queue<CategorySummary> queue = new PriorityQueue<>(3, (x, y) -> Double.compare(x.getTotalAmount(), y.getTotalAmount()));
                    GmvResult gmvResult = new GmvResult();
                    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    gmvResult.setStatsTime(simpleDateFormat.format(new Date(context.window().getStart())));
                    double totalAmount = 0D;
                    // NOTE(review): if the same category appears more than once in this window
                    // its running total is summed repeatedly — confirm whether de-duplication is needed.
                    for (CategorySummary item : iterable) {
                        if (queue.size() < 3) {
                            queue.add(item);
                        } else if (item.getTotalAmount() > queue.peek().getTotalAmount()) {
                            // The current element beats the heap head: evict the head and insert it.
                            queue.poll();
                            queue.add(item);
                        }
                        totalAmount += item.getTotalAmount();
                    }
                    gmvResult.setGmv(totalAmount);

                    // The heap yields ascending order, so fill the array from the back to get
                    // ranked[0] = largest. The window may hold fewer than three categories.
                    CategorySummary[] ranked = new CategorySummary[queue.size()];
                    for (int i = ranked.length - 1; i >= 0; i--) {
                        ranked[i] = queue.poll();
                    }
                    gmvResult.setTop1Category(toRankedTuple(ranked, 0));
                    gmvResult.setTop2Category(toRankedTuple(ranked, 1));
                    gmvResult.setTop3Category(toRankedTuple(ranked, 2));
                    collector.collect(gmvResult);
                }
            }).addSink(getMysqlSink("INSERT INTO ads_order_gmv_rt(stats_time,gmv,top1_category,top1_category_amount,top2_category,top2_category_amount,top3_category,top3_category_amount) VALUES(?,?,?,?,?,?,?,?)"));
    env.execute("flink kafka gmv sample");
}

/**
 * Converts the category at the given rank into a (category, amount) tuple.
 * Returns an empty placeholder ("", 0.0) when fewer categories than {@code index + 1} exist,
 * so downstream consumers never see a null tuple.
 */
private static Tuple2<String, Double> toRankedTuple(CategorySummary[] ranked, int index) {
    if (index >= ranked.length) {
        return new Tuple2<String, Double>("", 0D);
    }
    return new Tuple2<String, Double>(ranked[index].getCategory(), ranked[index].getTotalAmount());
}
/**
 * Creates a JDBC sink that writes each {@link GmvResult} to MySQL using the given statement.
 *
 * @param sql parameterized INSERT statement with eight placeholders, bound in the order
 *            stats time, GMV, then (category, amount) for top 1, top 2 and top 3
 * @return the configured sink function
 */
private static SinkFunction<GmvResult> getMysqlSink(String sql) {
    // Binds one result row to the eight statement parameters.
    JdbcStatementBuilder<GmvResult> binder = (statement, row) -> {
        statement.setString(1, row.getStatsTime());
        statement.setDouble(2, row.getGmv());
        statement.setString(3, row.getTop1Category().f0);
        statement.setDouble(4, row.getTop1Category().f1);
        statement.setString(5, row.getTop2Category().f0);
        statement.setDouble(6, row.getTop2Category().f1);
        statement.setString(7, row.getTop3Category().f0);
        statement.setDouble(8, row.getTop3Category().f1);
    };

    // Batch size 1000, batch interval 200 ms, at most 5 retries.
    JdbcExecutionOptions executionOptions = JdbcExecutionOptions.builder()
            .withBatchSize(1000)
            .withBatchIntervalMs(200)
            .withMaxRetries(5)
            .build();

    JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withUrl("jdbc:mysql://localhost:3306/traffic?useSSL=false")
            .withDriverName("com.mysql.jdbc.Driver")
            .withUsername("root")
            .withPassword("")
            .build();

    return JdbcSink.sink(sql, binder, executionOptions, connectionOptions);
}
/**
 * Assigns event timestamps from the {@code value.update_time} field of each order message
 * and emits periodic watermarks that trail the largest timestamp seen so far by 1 ms.
 */
private static class OrderMessagePeriodicWatermarks implements AssignerWithPeriodicWatermarks<ObjectNode> {
    /** Largest event timestamp observed so far; {@code Long.MIN_VALUE} until the first record. */
    private long maxTs = Long.MIN_VALUE;

    /**
     * Returns the current watermark. A watermark {@code t} asserts that no further records
     * with timestamp {@code <= t} will arrive, so it stays one millisecond behind the maximum
     * to keep records that share the latest timestamp from being treated as late.
     */
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        // Guard the initial value so that subtracting 1 cannot underflow.
        return new Watermark(maxTs == Long.MIN_VALUE ? Long.MIN_VALUE : maxTs - 1);
    }

    /**
     * Extracts the record's event timestamp and advances the tracked maximum.
     *
     * @param jsonNodes the Kafka record wrapped as JSON, with the payload under "value"
     * @param l the timestamp previously assigned to the record (unused)
     * @return the value of {@code update_time} as a long
     */
    @Override
    public long extractTimestamp(ObjectNode jsonNodes, long l) {
        long ts = jsonNodes.get("value").get("update_time").asLong();
        // Track the maximum rather than the latest value so out-of-order records
        // never pull the watermark backwards.
        maxTs = Math.max(maxTs, ts);
        return ts;
    }
}
/**
 * Incremental aggregate that sums the amount field ({@code f1}) of (category, amount) tuples.
 * The accumulator and the result are both the running sum.
 */
private static class AmountAgg implements AggregateFunction<Tuple2<String,Double>, Double, Double> {

    /** Starts every aggregation at zero. */
    @Override
    public Double createAccumulator() {
        return 0D;
    }

    /** Adds the amount of one order to the running sum. */
    @Override
    public Double add(Tuple2<String, Double> order, Double runningTotal) {
        return Double.sum(runningTotal, order.f1);
    }

    /** The accumulator already is the result. */
    @Override
    public Double getResult(Double runningTotal) {
        return runningTotal;
    }

    /** Combines two partial sums. */
    @Override
    public Double merge(Double left, Double right) {
        return Double.sum(left, right);
    }
}
/**
 * Mutable holder for one category's running sales total together with the
 * time string at which the summary was produced.
 */
private static class CategorySummary {

    /** Category name. */
    private String category;

    public String getCategory() {
        return this.category;
    }

    public void setCategory(String category) {
        this.category = category;
    }

    /** Accumulated sales amount of the category. */
    private double totalAmount;

    public double getTotalAmount() {
        return this.totalAmount;
    }

    public void setTotalAmount(double totalAmount) {
        this.totalAmount = totalAmount;
    }

    /** Formatted creation time of this summary. */
    private String createTime;

    public String getCreateTime() {
        return this.createTime;
    }

    public void setCreateTime(String createTime) {
        this.createTime = createTime;
    }
}
/**
 * Mutable holder for one output row: the statistics time, the overall GMV and the
 * (category, amount) pairs of the three best-selling categories.
 */
private static class GmvResult {

    /** Formatted statistics time of this row. */
    private String statsTime;

    public String getStatsTime() {
        return this.statsTime;
    }

    public void setStatsTime(String statsTime) {
        this.statsTime = statsTime;
    }

    /** Total sales amount across all categories. */
    private Double gmv;

    public Double getGmv() {
        return this.gmv;
    }

    public void setGmv(Double gmv) {
        this.gmv = gmv;
    }

    /** Best-selling category as (name, amount). */
    private Tuple2<String,Double> top1Category;

    public Tuple2<String, Double> getTop1Category() {
        return this.top1Category;
    }

    public void setTop1Category(Tuple2<String, Double> top1Category) {
        this.top1Category = top1Category;
    }

    /** Second best-selling category as (name, amount). */
    private Tuple2<String,Double> top2Category;

    public Tuple2<String, Double> getTop2Category() {
        return this.top2Category;
    }

    public void setTop2Category(Tuple2<String, Double> top2Category) {
        this.top2Category = top2Category;
    }

    /** Third best-selling category as (name, amount). */
    private Tuple2<String,Double> top3Category;

    public Tuple2<String, Double> getTop3Category() {
        return this.top3Category;
    }

    public void setTop3Category(Tuple2<String, Double> top3Category) {
        this.top3Category = top3Category;
    }
}
/**
 * Window function that wraps the aggregated amount of one category into a
 * {@link CategorySummary}, stamped with the current wall-clock time formatted
 * to second precision.
 */
private static class WindowResult implements WindowFunction<Double, CategorySummary, Tuple, TimeWindow> {
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    /**
     * Emits one summary for the window's key.
     *
     * @param tuple the window key; cast to a one-field tuple holding the category name
     * @param timeWindow the window being evaluated (unused)
     * @param iterable the aggregation output; only its first element is read
     * @param collector receives the resulting summary
     */
    @Override
    public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Double> iterable, Collector<CategorySummary> collector) throws Exception {
        final Tuple1<String> key = (Tuple1<String>) tuple;
        final Double aggregatedAmount = iterable.iterator().next();
        final String emittedAt = simpleDateFormat.format(new Date());

        CategorySummary summary = new CategorySummary();
        summary.setCategory(key.f0);
        summary.setTotalAmount(aggregatedAmount);
        summary.setCreateTime(emittedAt);
        collector.collect(summary);
    }
}
}
public class GmvProducerTest {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 5000);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<Object, String> producer = new KafkaProducer<Object, String>(props);
int totalMessageCount = 500;
Random random = new Random();
double totalAmount = 0, amount = 0;
String category;
JSONObject categoryAmount = new JSONObject();
for (int i = 0; i < totalMessageCount; i++) {
JSONObject jsonObject = new JSONObject();
category = String.format("category%d",random.nextInt(100));
jsonObject.put("category", category);
amount = random.nextInt(100);
jsonObject.put("amount", amount);
jsonObject.put("update_time", System.currentTimeMillis());
totalAmount += amount;
categoryAmount.put(category, categoryAmount.getDoubleValue(category) + amount);
producer.send(new ProducerRecord<Object, String>("ods_order_rt", jsonObject.toJSONString()), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.out.println("Failed to send message with exception " + exception);
}
}
});
Thread.sleep(1000);
}
System.out.println(String.format("totalAmount=%s", totalAmount));
for(Map.Entry<String,Object> entry: categoryAmount.entrySet()) {
System.out.println(String.format("category=%s, categoryValue=%s", entry.getKey(), entry.getValue().toString()));
}
producer.close();
}
遇到的坑
1、`categoryAgg.keyBy(x -> x.getCreateTime()).window(TumblingProcessingTimeWindows.of(Time.seconds(1)))` 这段代码的第一个版本用的是 TumblingEventTimeWindows,结果窗口计算一直不会被触发;改成 TumblingProcessingTimeWindows 之后就可以正常触发了。为什么用 TumblingEventTimeWindows 不会触发窗口计算呢?留到下一篇再给大家介绍。
2、明明是一秒输出一次,为什么保存到数据库中的stats_time字段值不是按秒连续的呢?同样下一篇为大家揭晓。
欢迎关注我的个人公众号。
|