IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flink给元素指定窗口 -> 正文阅读

[大数据]flink给元素指定窗口

说明

要输出按偶数整点对齐的两小时时间窗口数据,例如 00:00-02:00、02:00-04:00、04:00-06:00……

package flink;

import bi.MainFunc;
import bi.bean.SolomeBean;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;

/**
 * 自定义分区器来实现自定义时间窗口配置
 */
public  class WindowHandle {

    private final static Long dayMilliSecond = 86400000L;
    private final static Long hourMilliSecond = 3600000L;

    /**
     * Builds and runs the job: reads JSON records from Kafka, counts records per city in two-hour
     * event-time windows assigned by {@link WindowsHandle}, then labels and prints the per-window counts.
     *
     * @param args unused
     * @throws Exception if the job cannot be built or fails during execution
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // State backend: file-system checkpoints to a local directory, taken every 1000 ms.
        // NOTE(review): FsStateBackend is deprecated in newer Flink releases (HashMapStateBackend +
        // checkpoint storage replace it) - confirm against the Flink version in use.
        env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
        env.enableCheckpointing(1000);

        // One (cityAreaId, createTime, 1) tuple per record. Records that fail to parse, or lack a city or a
        // create time, are dropped: a null createTime would otherwise throw in the timestamp assigner below.
        SingleOutputStreamOperator<Tuple3<String, Date, Long>> jsonDS = env.addSource(MainFunc.getHandKafka())
                .map(x -> JSON.parseObject(x, SolomeBean.class))
                .filter(x -> x != null && x.getCityAreaId() != null && x.getCreateTime() != null)
                // Anonymous class (not a lambda) so Flink can extract the generic Tuple3 output type.
                .map(new MapFunction<SolomeBean, Tuple3<String, Date, Long>>() {
                    @Override
                    public Tuple3<String, Date, Long> map(SolomeBean x) throws Exception {
                        return Tuple3.of(x.getCityAreaId(), x.getCreateTime(), 1L);
                    }
                });

        WindowedStream<Tuple3<String, Date, Long>, String, TimeWindow> window = jsonDS.assignTimestampsAndWatermarks(
                // Tolerate up to 5 seconds of out-of-order events; event time is the record's create time (f1).
                WatermarkStrategy.<Tuple3<String, Date, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((event, previousTimestamp) -> event.f1.getTime())
        ).keyBy(x -> x.f0)
                .window(new WindowsHandle());

        // Sum the per-record counts (field 2) per city and window, then label each result downstream.
        window.sum(2)
                .keyBy(x -> x.f0)
                .process(new OutputOrderGmvProcessFunc(), TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}))
                .print();

        env.execute();
    }

//元素的窗口分配
    /**
     * Assigns every element to exactly one two-hour event-time window aligned to even hours
     * (00:00-02:00, 02:00-04:00, ...), counted from 2020-01-01 00:00:00 in the JVM default time zone.
     *
     * <p>NOTE(review): alignment is fixed-millisecond arithmetic from a single origin instant, so in a
     * zone with daylight-saving time the windows shift by the DST delta relative to the local clock -
     * confirm whether that matters for the report.
     */
    public static class WindowsHandle extends WindowAssigner<Object, TimeWindow> {

        /** Window length in milliseconds (two hours). */
        private static final long WINDOW_SIZE_MILLIS = 7200000L;

        /** Alignment origin: 2020-01-01 00:00:00 in the default time zone, as epoch milliseconds. */
        private static final long ORIGIN_MILLIS = parseOrigin();

        /** Returns the single window [start, start + 2h) containing the element's event timestamp. */
        @Override
        public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
            long start = getTimeValue(timestamp);
            return Collections.singletonList(new TimeWindow(start, start + WINDOW_SIZE_MILLIS));
        }

        // Typed on Object because the assigner's element type is Object; a narrower type does not compile.
        @Override
        public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return EventTimeTrigger.create();
        }

        @Override
        public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
            return new TimeWindow.Serializer();
        }

        @Override
        public boolean isEventTime() {
            return true;
        }

        /**
         * Returns the start of the two-hour window (0-2, 2-4, 4-6, ...) that contains {@code timeStamp}.
         *
         * <p>Uses floor semantics, so timestamps before the origin also map to a start that is
         * not after the timestamp itself.
         *
         * @param timeStamp event time in epoch milliseconds (must not be null)
         * @return window start in epoch milliseconds
         */
        public static Long getTimeValue(Long timeStamp) {
            long ts = timeStamp;
            // floorMod (not %) keeps the remainder non-negative for timestamps before the origin.
            return ts - Math.floorMod(ts - ORIGIN_MILLIS, WINDOW_SIZE_MILLIS);
        }

        /** Parses the alignment origin once at class initialisation. */
        private static long parseOrigin() {
            try {
                return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2020-01-01 00:00:00").getTime();
            } catch (ParseException e) {
                throw new IllegalStateException("Cannot parse window origin", e);
            }
        }
    }
package flink;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Labels each windowed per-city count with the start time of its two-hour window.
 *
 * <p>Input: (cityAreaId, eventTime, count). Output: ("cityAreaId::yyyy-MM-dd HH:mm:ss", count), where the
 * timestamp is the start of the 2-hour window (00:00, 02:00, ...) containing eventTime, counted from
 * 2020-01-01 00:00:00 in the JVM default time zone.
 *
 * <p>All runtime resources are per-instance (not static) so parallel subtasks sharing a JVM do not
 * overwrite each other's state, statement or date formatter.
 */
public class OutputOrderGmvProcessFunc extends KeyedProcessFunction<String, Tuple3<String, Date, Long>, Tuple2<String, Long>> {
    private static final long serialVersionUID = 1L;

    private static final String PATTERN = "yyyy-MM-dd HH:mm:ss";
    /** Window length in milliseconds (two hours). */
    private static final long WINDOW_SIZE_MILLIS = 7200000L;
    /** Alignment origin: 2020-01-01 00:00:00 in the default time zone, as epoch milliseconds. */
    private static final long ORIGIN_MILLIS = parseOrigin();

    /** Keyed state: city id -> count. Registered in open(); not read by processElement yet. */
    private transient MapState<String, Long> state;
    private transient Connection connection;
    /** Prepared lookup of a city name by area id; not used by processElement yet. */
    private transient PreparedStatement preparedStatement;
    /** Per-instance formatter, because SimpleDateFormat is not thread-safe. */
    private transient SimpleDateFormat simpleDateFormat;

    /** Emits the count under a key combining the city id and its window start time. */
    @Override
    public void processElement(Tuple3<String, Date, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        String windowStart = simpleDateFormat.format(new Date(getTimeValue(value.f1.getTime())));
        // TODO: resolve the city name through preparedStatement and persist only when the count changed.
        out.collect(Tuple2.of(value.f0 + "::" + windowStart, value.f2));
    }

    /** Registers the keyed MapState, creates the formatter and opens the MySQL lookup statement. */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        state = getRuntimeContext().getMapState(new MapStateDescriptor<>("redult_map", String.class, Long.class));
        simpleDateFormat = new SimpleDateFormat(PATTERN);
        Class.forName("com.mysql.jdbc.Driver");
        // NOTE(review): credentials are hard-coded in source; move them to job configuration or a secret store.
        connection = DriverManager.getConnection("jdbc:mysql://192.168.5.14:3306/cmdata", "root", "pl1kxWcm9_t4DnpMI");
        preparedStatement = connection.prepareStatement("SELECT area_name FROM  sys_area sa where area_id = ?");
    }

    /**
     * Releases the JDBC statement and connection.
     *
     * <p>Keyed state is deliberately not cleared here: it is owned and disposed by the state backend, and
     * close() runs outside any key context. The null checks cover an open() that failed part-way.
     */
    @Override
    public void close() throws Exception {
        try {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
        } finally {
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                super.close();
            }
        }
    }

    /**
     * Returns the start of the two-hour window (0-2, 2-4, 4-6, ...) that contains {@code timeStamp}.
     *
     * @param timeStamp event time in epoch milliseconds (must not be null)
     * @return window start in epoch milliseconds; never after {@code timeStamp}, even before the origin
     */
    public static Long getTimeValue(Long timeStamp) {
        long ts = timeStamp;
        // floorMod (not %) keeps the remainder non-negative for timestamps before the origin.
        return ts - Math.floorMod(ts - ORIGIN_MILLIS, WINDOW_SIZE_MILLIS);
    }

    /** Parses the alignment origin once, with a local (thread-confined) formatter. */
    private static long parseOrigin() {
        try {
            return new SimpleDateFormat(PATTERN).parse("2020-01-01 00:00:00").getTime();
        } catch (ParseException e) {
            throw new IllegalStateException("Cannot parse window origin", e);
        }
    }
}


只有当某城市的 value(数量)发生变化时,才刷新到数据库:

/**
 * Forwards (cityAreaId, count) only when a city's count differs from the last count forwarded for it,
 * so that downstream writes (e.g. to a database) happen only on change.
 */
public static class OutputOrderGmvProcessFunc extends KeyedProcessFunction<String, Tuple3<String, Date, Long>, Tuple2<String, Long>> {
    private static final long serialVersionUID = 1L;

    /**
     * Keyed state: city id -> last forwarded count. Instance field (not static) so that each parallel
     * instance uses its own state handle.
     */
    private transient MapState<String, Long> state;

    @Override
    public void processElement(Tuple3<String, Date, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        String key = value.f0; // city id
        Long previous = state.get(key);
        // Test for "changed", not "increased". Per-window sums (such as window.sum(2) upstream) are not
        // monotonic per city, so a smaller count in a later window is still an update to propagate.
        // A null previous (first record for the city) also counts as a change.
        if (!value.f2.equals(previous)) {
            state.put(key, value.f2);
            System.out.println(key + ":" + value + "状态进行了更新");
            out.collect(Tuple2.of(key, value.f2));
        }
    }

    /** Registers the keyed MapState before any element is processed. */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        state = getRuntimeContext().getMapState(new MapStateDescriptor<>("redult_map", String.class, Long.class));
    }

    // close() is intentionally not overridden: keyed state is owned and disposed by the state backend,
    // and clearing it here (outside any key context) would only touch whichever key was last active.
}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-28 16:33:26  更:2021-07-28 16:33:35 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/7 11:56:45-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码