IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink(六):Tumbling Windows 例子 -> 正文阅读

[大数据]Flink(六):Tumbling Windows 例子

一、简介

? ? ??滚动窗口将每个元素分配给指定窗口大小的窗口。滚动窗口具有固定大小并且不重叠。例如,如果指定一个大小为 5 分钟的滚动窗口,则将评估当前窗口,并每隔五分钟启动一个新窗口,我们实现对应的例子

? ? ?Flink(四) :窗口简介_在前进的路上-CSDN博客

二、代码

? ? ?


import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class WindowsTumblingWindowsTest {

    public static void main(String[] args) {
    try {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties producerProperties = new Properties();
        producerProperties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        producerProperties.setProperty("group.id", "test");

        producerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        producerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        FlinkKafkaConsumer consumer = new FlinkKafkaConsumer("test_window", new SimpleStringSchema(), producerProperties);

        //Flink从topic中指定的group上次消费的位置开始消费,所以必须配置group.id参数
        consumer.setStartFromGroupOffsets();
        //添加数据源
        DataStreamSource<String> sourceStream = env.addSource(consumer);
         //指定事件时间字段
        DataStream<Map<String, Object>> dataStream = sourceStream.map(new MapFunction<String, Map<String, Object>>() {
            @Override
            public Map<String, Object> map(String value) throws Exception {
                return (Map)JSON.parse(value);
            }
        }) .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(2)) 
                        .withTimestampAssigner(new SerializableTimestampAssigner<Map<String, Object>>() {
                            @Override
                            public long extractTimestamp(Map<String, Object> map, long recordTimestamp) {
                                return Long.parseLong(map.get("time").toString());
                            }
                        })
        );
        //指定key
        DataStream windowDataStream = dataStream.keyBy(new KeySelector<Map<String, Object>, Tuple1<String>>() {
            @Override
            public Tuple1<String> getKey(Map<String, Object> value) throws Exception {

                return Tuple1.of(value.get("key").toString());
            }
        })
                .window(TumblingEventTimeWindows.of(Time.seconds(5))) //指定窗口时间5秒
.process(new ProcessWindowFunction<Map<String, Object>, Tuple4<String, Integer, Long, Long>, Tuple1<String>, TimeWindow>() {
                    @Override
                    public void process(Tuple1<String> stringTuple1, Context context, Iterable<Map<String, Object>> elements, Collector<Tuple4<String, Integer, Long, Long>> out) throws Exception {
                        Tuple4 tp4 = new Tuple4();
                          int count=0;
                        for (Map<String, Object> map : elements) {
                            count=count+1;
                           
                            tp4.f0 = map.get("key").toString();
                        }
                          tp4.f1 = count;
                        tp4.f2 = context.window().getStart();
                        tp4.f3 = context.window().getEnd();
                        out.collect(tp4);
                    }
                });

        windowDataStream.print();

        env.execute("test_windows");
    }catch (Exception e){

        e.printStackTrace();
    }
    }

}

说明

?1、设置水位线

? ?WatermarkStrategy<Map<String,Object>>forBoundedOutOfOrderness(Duration.ofSeconds(2))?

2、TumblingEventTimeWindows窗口时间

? ? TumblingEventTimeWindows.of(Time.seconds(5)

三、执行结果

发送数据 ??

{"key":"002","time":1642263310000}
{"key":"002","time":1642263311000}
{"key":"002","time":1642263313000}
{"key":"002","time":1642263315000}
{"key":"002","time":1642263316000}
{"key":"002","time":1642263317000}
{"key":"002","time":1642263318000}
{"key":"002","time":1642263319000}
{"key":"002","time":1642263320000}
??

窗口统计的执行结果

  1. 7> (002,3,1642263310000,1642263315000)

  2. 7> (002,5,1642263315000,1642263320000)

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-13 21:52:47  更:2022-03-13 21:53:04 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 8:42:01-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码