IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink广播状态清理 -> 正文阅读

[大数据]Flink广播状态清理

在Flink实时计算的实际项目中,广播中的状态,可能并不是需要一直存在,只需要当天存在,之后不再会用到。

这种情况下,如果状态数据一直不清理,量会越来越庞大,占用内存,时间长,甚至会导致内存溢出。所以需要对过期的广播状态进行清理。

但是状态过期清理的机制,目前仅是对keyed state来说的有效,对广播状态不起作用。因此,需要自己手动去处理。

按照flink keyed state过期处理的思想,手动实现对过期的广播状态的清理;主要逻辑如下:
(1)广播状态设置一个状态,专门存储一个时间戳毫秒值,通过这个时间和当前时间对比,判断是否需要进行状态清理;比如当前时间+24小时;一天之后,对状态进行处理;
(2)每个状态在创建的时候,都生成一个时间戳毫秒值,类似于上一步;在清理状态时,判断该状态是否需要清理,需要清理,则从状态中清理掉该状态;

实现的代码逻辑如下:
?

package cn.china.test.main;

import com.alibaba.fastjson.JSON;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import cn.china.test.data.EmployeeInfo;
import cn.china.test.data.RankSalaryInfo;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.*;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;

public class Test {

    public static void main(String[] args) throws Exception {

        Config config = ConfigFactory.load(Test.class.getClassLoader());

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 禁用全局任务链,避免多个操作在一个task中执行(flink内部优化,会把可以优化的算子优化成一个算子链,放在一个task中执行,UI页面只显示一个算子窗口)
        env.disableOperatorChaining();

        String brokers = config.getString("consumer.kafka.brokers");
        String employeeSalary = config.getString("kafka.topic.employee.salary");
        String employeeInfo = config.getString("kafka.topic.employee.info");
        String pushTopic = config.getString("kafka.topic.employee.push");

        String groupId = config.getString("kafka.groupId");
        String checkPointPath = config.getString("check.point.path.prefix");

        StateBackend backend = new EmbeddedRocksDBStateBackend(true);
        env.setStateBackend(backend);

        CheckpointConfig conf = env.getCheckpointConfig();
        conf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//启用持久化 checkpoint 到外部系统,取消时保留检查点数据
        conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//恰好一次(Exactly Once)或者至少一次(At least Once)
        conf.setCheckpointInterval(30 * 1000);//milliseconds,checkpoint 触发间隔。在此间隔内触发 checkpoint
        conf.setCheckpointTimeout(30 * 60 * 1000);//milliseconds,checkpoint 超时间隔。超时之后,JobManager 取消 checkpoint 并触发新的 checkpoint
        conf.setCheckpointStorage(checkPointPath);//设置检查点保存路径
        conf.setMinPauseBetweenCheckpoints(10 * 1000);// Checkpoint 之间所需的最小暂停时间
        conf.setMaxConcurrentCheckpoints(30);// 可以同时进行的最大 checkpoint 个数

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        props.setProperty("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        props.put("max.poll.records", 1000);
        props.put("session.timeout.ms", 90000);
        props.put("request.timeout.ms", 120000);
        props.put("enable.auto.commit", true);
        props.put("auto.commit.interval.ms", 100);

        // 获取职级薪资信息 并 广播
        FlinkKafkaConsumer<String> rankSalaryConsumer = new FlinkKafkaConsumer<String>(employeeSalary, new SimpleStringSchema(), props);
        rankSalaryConsumer.setCommitOffsetsOnCheckpoints(true);
        DataStream<String> rankSalaryKafkaData = env.addSource(rankSalaryConsumer).name("RankSalarySource");
        MapStateDescriptor<String, RankSalaryInfo> rankSalaeyBroadcastDesc = new MapStateDescriptor<String, RankSalaryInfo>("RankSalaeyBroadcast", String.class, RankSalaryInfo.class);
        BroadcastStream<String> rankSalaryBroadcast = rankSalaryKafkaData.broadcast(rankSalaeyBroadcastDesc);

        // 获取员工信息
        FlinkKafkaConsumer<String> employeeInfoConsumer = new FlinkKafkaConsumer<String>(employeeInfo, new SimpleStringSchema(), props);
        employeeInfoConsumer.setCommitOffsetsOnCheckpoints(true);
        DataStream<String> employeeInfoKafkaData = env.addSource(employeeInfoConsumer).name("EmployeeInfoSource");

        // 员工信息 关联 职级薪资信息,获取薪资
        BroadcastConnectedStream<String, String> employeeInfoConnectRankSalary = employeeInfoKafkaData.connect(rankSalaryBroadcast);
        DataStream<String> employeeInfoDataStream = employeeInfoConnectRankSalary.process(new BroadcastProcessFunction<String, String, String>() {

            MapStateDescriptor<String, RankSalaryInfo> rankSalaeyBroadcastDesc = new MapStateDescriptor<String, RankSalaryInfo>("RankSalaeyBroadcast", String.class, RankSalaryInfo.class);

            @Override
            public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                try {
                    ReadOnlyBroadcastState<String, RankSalaryInfo> broadcastState = ctx.getBroadcastState(rankSalaeyBroadcastDesc);

                    EmployeeInfo employeeInfo = JSON.parseObject(value, EmployeeInfo.class);
                    String rank = employeeInfo.rank;
                    if (broadcastState.contains(rank)) {
                        RankSalaryInfo rankSalaryInfo = broadcastState.get(rank);
                        String salary = rankSalaryInfo.salary;
                        employeeInfo.setSalary(salary);
                    }

                    String employeeInfoString = JSON.toJSONString(employeeInfo);

                    out.collect(employeeInfoString);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                try {
                    BroadcastState<String, RankSalaryInfo> broadcastState = ctx.getBroadcastState(rankSalaeyBroadcastDesc);

                    // 设置一个状态值,保存计时器时间,计时器时间为当前时间加上24小时的毫秒值,到了计时器时间之后,清理数据;也就是一天清理一次
                    if (!broadcastState.contains("timer_state")) {
                        RankSalaryInfo timerState = new RankSalaryInfo("timer_state", "timer_state", System.currentTimeMillis() + 1000 * 60 * 60 * 24);
                        broadcastState.put("timer_state", timerState);
                    }
                    // 定时器时间大于等于当前时间,说明上次清理是在24小时之前,清理过期状态,并重新设置定时器
                    RankSalaryInfo timerState = broadcastState.get("timer_state");
                    Long timer = timerState.ttl;
                    if (System.currentTimeMillis() >= timer) {
                        // 清理状态
                        Iterator<Map.Entry<String, RankSalaryInfo>> iterator = broadcastState.iterator();
                        // 创建一个list,存放需要清理的状态的key
                        ArrayList<String> waitToDeleteKey = new ArrayList<>();
                        while (iterator.hasNext()) {
                            RankSalaryInfo rankSalaryInfo = iterator.next().getValue();
                            Long stateTtl = rankSalaryInfo.ttl;
                            if (System.currentTimeMillis() >= stateTtl) {
                                waitToDeleteKey.add(rankSalaryInfo.rank);
                            }
                        }
                        // 遍历waitToDeleteKey,删除过期状态
                        for (int i = 0; i < waitToDeleteKey.size(); i++) {
                            broadcastState.remove(waitToDeleteKey.get(i));
                        }
                        // 重新设置计时器状态,到当前时间的24小时之后
                        timerState.setTtl(System.currentTimeMillis() + 1000 * 60 * 60 * 24);
                        broadcastState.put("timer_state", timerState);
                    }

                    // 设置最数据的状态
                    RankSalaryInfo rankSalaryInfo = JSON.parseObject(value, RankSalaryInfo.class);
                    // 状态的ttl设置为 当前时间+24小时 的毫秒值
                    rankSalaryInfo.setTtl(System.currentTimeMillis() + 1000 * 60 * 60 * 24);
                    String rank = rankSalaryInfo.rank;
                    broadcastState.put(rank, rankSalaryInfo);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        employeeInfoDataStream.addSink(new FlinkKafkaProducer<String>(
                brokers,
                pushTopic,
                new SimpleStringSchema())).name("PushInfo");

        env.execute("BroadcastStateTtlTest");
    }
}

涉及到的两个实体类,简单罗列如下:
1)EmployeeInfo

public class EmployeeInfo {
    public String name;
    public String age;
    public String rank;
    public String salary;

    public EmployeeInfo(String name, String age, String rank, String salary) {
        this.name = name;
        this.age = age;
        this.rank = rank;
        this.salary = salary;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAge() {
        return age;
    }

    public void setAge(String age) {
        this.age = age;
    }

    public String getRank() {
        return rank;
    }

    public void setRank(String rank) {
        this.rank = rank;
    }

    public String getSalary() {
        return salary;
    }

    public void setSalary(String salary) {
        this.salary = salary;
    }
}

2)RankSalaryInfo

public class RankSalaryInfo {
    public String rank;
    public String salary;
    public Long ttl;

    public RankSalaryInfo() {
    }

    public RankSalaryInfo(String rank, String salary, Long ttl) {
        this.rank = rank;
        this.salary = salary;
        this.ttl = ttl;
    }

    public String getRank() {
        return rank;
    }

    public void setRank(String rank) {
        this.rank = rank;
    }

    public String getSalary() {
        return salary;
    }

    public void setSalary(String salary) {
        this.salary = salary;
    }

    public Long getTtl() {
        return ttl;
    }

    public void setTtl(Long ttl) {
        this.ttl = ttl;
    }
}

上面代码的逻辑如下:
1)从kafka中获取员工等级薪资信息,即RankSalaryInfo,包括rank和salary信息,ttl是在生成状态时创建;将对应的RankSalaryInfo信息写到广播状态中;
2)从kafka中获取员工信息,即EmployeeInfo,包括name、age、rank,通过rank到广播状态中获取salary,没有则默认为null(可以根据自己的情况,设置默认值);
3)将经过上面步骤处理的员工信息转成json串,发送到kafka中;

在上面的逻辑代码中,使用POJO类作为状态,该POJO类需要满足一定的条件;具体参考:
Flink POJO类状态使用注意事项_Johnson8702的博客-CSDN博客
?

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-07 22:46:53  更:2022-04-07 22:47:56 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 13:58:23-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码