IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink学习之路-双11实时大屏背后指标计算 -> 正文阅读

[大数据]Flink学习之路-双11实时大屏背后指标计算

实时分析场景中,实时大屏似乎永远都是那最璀璨的星星。其中每年的天猫双11实时大屏几乎是家喻户晓。今天就带大家一起来看看双11实时大屏指标是如何计算的。一定要动手实现一遍,在动手过程中会发现一些问题,通过不断解决问题,才能夯实知识理解。

双11实时大屏的指标计算

这里就做一个最简单的模拟天猫实时大屏的小例子,需求如下:

1、实时计算出当天截止到当前时间的累计销售额

2、统计销售额top3的品类

3、每秒钟更新一次统计结果

代码实现

public class TianMaoGmv {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60000);                                   // 设置检查点执行间隔为1分
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);    // 设置事件时间类型

        // kafka配置属性
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demo");

        // 构造用户数据源连接器
        FlinkKafkaConsumer011<ObjectNode> orderConsumer =new FlinkKafkaConsumer011<ObjectNode>(
                "ods_order_rt", //设置订单源数据主题
                new JSONKeyValueDeserializationSchema(false),
                props);
        orderConsumer.assignTimestampsAndWatermarks(new OrderMessagePeriodicWatermarks());
        orderConsumer.setStartFromLatest();

        // 构造订单流
        DataStreamSource<ObjectNode> orderDataStreamSource = env.addSource(orderConsumer);
        SingleOutputStreamOperator<CategorySummary> categoryAgg = orderDataStreamSource.map(new MapFunction<ObjectNode, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> map(ObjectNode objectNode) throws Exception {
                return new Tuple2<String, Double>(objectNode.get("value").get("category").asText(), objectNode.get("value").get("amount").asDouble());
            }
        })
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
                .trigger(ContinuousEventTimeTrigger.of(Time.seconds(1)))
                .aggregate(new AmountAgg(), new WindowResult());

        categoryAgg.keyBy(x -> x.getCreateTime())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
                .process(new ProcessWindowFunction<CategorySummary,GmvResult,String, TimeWindow>() {
                    /***
                     * 这里做最后的统计:
                     * 把各个分类的总额加起来,就是当天截止到目前的总销售金额
                     * 这里使用优先级队列解决分类前3销售额
                     */
                    @Override
                    public void process(String s, Context context, Iterable<CategorySummary> iterable, Collector<GmvResult> collector) throws Exception {
                        // 创建小顶堆
                        Queue<CategorySummary> queue = new PriorityQueue<>(3, (x, y) -> x.getTotalAmount() >= y.getTotalAmount() ? 1 : -1);

                        GmvResult gmvResult = new GmvResult();
                        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        gmvResult.setStatsTime(simpleDateFormat.format(new Date(context.window().getStart())));
                        double totalAmount = 0D;
                        for (CategorySummary item : iterable) {
                            if (queue.size() < 3) {
                                queue.add(item);
                            } else if (item.getTotalAmount() > queue.peek().getTotalAmount()) {
                                // 若当前循环元素值大于小顶堆堆顶元素,则需要移除堆顶元素并添加当前循环元素
                                queue.poll();
                                queue.add(item);
                            }

                            totalAmount += item.getTotalAmount();
                        }

                        gmvResult.setGmv(totalAmount);
                        CategorySummary categorySummary = queue.poll();
                        gmvResult.setTop3Category(new Tuple2<String,Double>(categorySummary.getCategory(), categorySummary.getTotalAmount()));
                        categorySummary = queue.poll();
                        gmvResult.setTop2Category(new Tuple2<String,Double>(categorySummary.getCategory(), categorySummary.getTotalAmount()));
                        categorySummary = queue.poll();
                        gmvResult.setTop1Category(new Tuple2<String,Double>(categorySummary.getCategory(), categorySummary.getTotalAmount()));
                        collector.collect(gmvResult);
                    }
                }).addSink(getMysqlSink("INSERT INTO ads_order_gmv_rt(stats_time,gmv,top1_category,top1_category_amount,top2_category,top2_category_amount,top3_category,top3_category_amount) VALUES(?,?,?,?,?,?,?,?)"));

        env.execute("flink kafka gmv sample");
    }

    private static SinkFunction<GmvResult> getMysqlSink(String sql) {
        return JdbcSink.sink(sql,
                new JdbcStatementBuilder<GmvResult>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, GmvResult gmvResult) throws SQLException {
                        preparedStatement.setString(1, gmvResult.getStatsTime());
                        preparedStatement.setDouble(2, gmvResult.getGmv());
                        preparedStatement.setString(3, gmvResult.getTop1Category().f0);
                        preparedStatement.setDouble(4, gmvResult.getTop1Category().f1);
                        preparedStatement.setString(5, gmvResult.getTop2Category().f0);
                        preparedStatement.setDouble(6, gmvResult.getTop2Category().f1);
                        preparedStatement.setString(7, gmvResult.getTop3Category().f0);
                        preparedStatement.setDouble(8, gmvResult.getTop3Category().f1);
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/traffic?useSSL=false")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("")
                        .build());
    }

    private static class OrderMessagePeriodicWatermarks implements AssignerWithPeriodicWatermarks<ObjectNode> {

        private long lastTs = Long.MIN_VALUE;

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(lastTs);
        }

        @Override
        public long extractTimestamp(ObjectNode jsonNodes, long l) {
            lastTs = jsonNodes.get("value").get("update_time").asLong();
            return lastTs;
        }
    }

    private static class AmountAgg implements AggregateFunction<Tuple2<String,Double>, Double, Double> {
        @Override
        public Double createAccumulator() {
            return 0D;
        }

        @Override
        public Double add(Tuple2<String, Double> stringDoubleTuple2, Double aDouble) {
            return aDouble + stringDoubleTuple2.f1;
        }

        @Override
        public Double getResult(Double aDouble) {
            return aDouble;
        }

        @Override
        public Double merge(Double aDouble, Double acc1) {
            return aDouble + acc1;
        }
    }

    private static class CategorySummary {
        private String category;
        private double totalAmount;
        private String createTime;

        public String getCategory() {
            return category;
        }

        public void setCategory(String category) {
            this.category = category;
        }

        public double getTotalAmount() {
            return totalAmount;
        }

        public void setTotalAmount(double totalAmount) {
            this.totalAmount = totalAmount;
        }

        public String getCreateTime() {
            return createTime;
        }

        public void setCreateTime(String createTime) {
            this.createTime = createTime;
        }
    }

    private static class GmvResult {
        private String statsTime;
        private Double gmv;
        private Tuple2<String,Double> top1Category;
        private Tuple2<String,Double> top2Category;
        private Tuple2<String,Double> top3Category;

        public String getStatsTime() {
            return statsTime;
        }

        public void setStatsTime(String statsTime) {
            this.statsTime = statsTime;
        }

        public Double getGmv() {
            return gmv;
        }

        public void setGmv(Double gmv) {
            this.gmv = gmv;
        }

        public Tuple2<String, Double> getTop1Category() {
            return top1Category;
        }

        public void setTop1Category(Tuple2<String, Double> top1Category) {
            this.top1Category = top1Category;
        }

        public Tuple2<String, Double> getTop2Category() {
            return top2Category;
        }

        public void setTop2Category(Tuple2<String, Double> top2Category) {
            this.top2Category = top2Category;
        }

        public Tuple2<String, Double> getTop3Category() {
            return top3Category;
        }

        public void setTop3Category(Tuple2<String, Double> top3Category) {
            this.top3Category = top3Category;
        }
    }

    private static class WindowResult implements WindowFunction<Double, CategorySummary, Tuple, TimeWindow> {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        @Override
        public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Double> iterable, Collector<CategorySummary> collector) throws Exception {
            CategorySummary categorySummary = new CategorySummary();
            categorySummary.setCategory(((Tuple1<String>)tuple).f0);
            categorySummary.setTotalAmount(iterable.iterator().next());
            categorySummary.setCreateTime(simpleDateFormat.format(new Date()));

            collector.collect(categorySummary);
        }
    }

}


public class GmvProducerTest {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 5000);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<Object, String> producer = new KafkaProducer<Object, String>(props);
        int totalMessageCount = 500;
        Random random = new Random();
        double totalAmount = 0, amount = 0;
        String category;
        JSONObject categoryAmount = new JSONObject();
        for (int i = 0; i < totalMessageCount; i++) {
            JSONObject jsonObject = new JSONObject();
            category = String.format("category%d",random.nextInt(100));
            jsonObject.put("category", category);
            amount = random.nextInt(100);
            jsonObject.put("amount", amount);
            jsonObject.put("update_time", System.currentTimeMillis());
            totalAmount += amount;
            categoryAmount.put(category, categoryAmount.getDoubleValue(category) + amount);
            producer.send(new ProducerRecord<Object, String>("ods_order_rt", jsonObject.toJSONString()), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.out.println("Failed to send message with exception " + exception);
                    }
                }
            });
            Thread.sleep(1000);
        }

        System.out.println(String.format("totalAmount=%s", totalAmount));
        for(Map.Entry<String,Object> entry: categoryAmount.entrySet()) {
            System.out.println(String.format("category=%s, categoryValue=%s", entry.getKey(), entry.getValue().toString()));
        }
        producer.close();
    }

遇到的坑

1、categoryAgg.keyBy(x -> x.getCreateTime())
.window(TumblingProcessingTimeWindows.of(Time.seconds(1))) 这段代码的第一个版本用的是TumblingEventTimeWindows,发现一直不会触发窗口计算,改成TumblingProcessingTimeWindows就可以正常触发窗口计算了。为什么用TumblingEventTimeWindows不会触发窗口计算呢?留着下篇给大家介绍下。

2、明明是一秒输出一次,为什么保存到数据库中的stats_time字段值不是按秒连续的呢?同样下一篇为大家揭晓。

欢迎关注我的个人公众号。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-11-28 11:22:04  更:2021-11-28 11:22:08 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 7:55:51-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码