IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink_状态编程_1 -> 正文阅读

[大数据]Flink_状态编程_1

1.Flink 中的状态

1.1 状态管理

  1. 状态的访问权限。我们知道 Flink 上的聚合和窗口操作,一般都是基于 KeyedStream的,数据会按照 key 的哈希值进行分区,聚合处理的结果也应该是只对当前 key 有效。然而同一个分区(也就是 slot)上执行的任务实例,可能会包含多个 key 的数据,它
    们同时访问和更改本地变量,就会导致计算结果错误。所以这时状态并不是单纯的本地变量。

  2. 容错性,也就是故障后的恢复。状态只保存在内存中显然是不够稳定的,我们需要将它持久化保存,做一个备份;在发生故障后可以从这个备份中恢复状态。

  3. 我们还应该考虑到分布式应用的横向扩展性。比如处理的数据量增大时,我们应该相应地对计算资源扩容,调大并行度。这时就涉及到了状态的重组调整。

1.2 状态的分类

托管状态(Managed State)和原始状态(Raw State)

  • 托管状态是由 Flink 的运行时(Runtime)来托管的
  • 托管状态是由 Flink 的运行时(Runtime)来托管的

托管状态细分: 算子状态(Operator State)和按键分区状态(Keyed State)

  • 算子状态: 列表状态, 联合列表状态, 广播状态
  • 按键分区状态: 值状态, 列表状态, 映射状态, 聚合状态, reducingState 归约状态

算子状态:
在这里插入图片描述
按键分区状态:
在这里插入图片描述
算子状态可以用在所有算子上,使用的时候其实就跟一个本地变量没什么区别——因为本地变量的作用域也是当前任务实例。在使用时,我们还需进一步实现 CheckpointedFunction 接口。

按键分区状态应用非常广泛。之前讲到的聚合算子必须在 keyBy 之后才能使用,就是因为聚合的结果是以 Keyed State 的形式保存的。另外,也可以通过富函数类(Rich Function)来自定义 Keyed State,所以只要提供了富函数类接口的算子,也都可以使用 Keyed State。

2.按键分区状态 (keyed state)

2.1 值类型

public interface ValueState<T> extends State {
T value() throws IOException;
void update(T value) throws IOException;
}

[需求]

我们这里会使用用户 id 来进行分流,然后分别统计每个用户的 pv 数据,由于我们并不想每次 pv 加一,就将统计结果发送到下游去,所以这里我们注册了一个定时器,用来隔一段时间发送 pv 的统计结果,这样对下游算子的压力不至于太大。具体实现方式是定义一个用来保
存定时器时间戳的值状态变量。当定时器触发并向下游发送数据以后,便清空储存定时器时间戳的状态变量,这样当新的数据到来时,发现并没有定时器存在,就可以注册新的定时器了,注册完定时器之后将定时器的时间戳继续保存在状态变量中。

public class PeriodicPvExample {
    public static void main(String[] args) throws Exception {
        // ValueState
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);


        SingleOutputStreamOperator<Event> stream = environment.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long l) {
                                return event.timestamp;
                            }
                        })
                );

        stream.print("input");
        stream.keyBy(data -> data.user)
                .process(new MyProcessKeyedFunction())
                .print();
        environment.execute();
    }
    public static class MyProcessKeyedFunction extends KeyedProcessFunction<String, Event, String>{
        ValueState<Long> countState;
        ValueState<Long> timerTsState;

        @Override
        public void open(Configuration parameters) throws Exception {
            countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("count", Long.class));
            timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer", Long.class));

        }

        @Override
        public void processElement(Event event, Context context, Collector<String> collector) throws Exception {
            Long value = countState.value();
            countState.update(value == null? 1: value + 1);

            if (timerTsState.value() == null){
                context.timerService().registerEventTimeTimer(event.timestamp + 10 * 1000L);
                timerTsState.update(event.timestamp + 10 * 1000);
            }

        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            out.collect(ctx.getCurrentKey() + " pv" + countState.value());
            timerTsState.clear();

            ctx.timerService().registerEventTimeTimer(timestamp + 10 * 1000L);
            timerTsState.update(timestamp + 10 * 1000);
        }
    }
}

在这里插入图片描述

2.2 列表状态 List State

public interface ListState<T> extends MergingState<T, Iterable<T>> {
    void update(List<T> var1) throws Exception;

    void addAll(List<T> var1) throws Exception;
}

[需求]

SELECT * FROM A INNER JOIN B WHERE A.id = B.id;

public class TwoStreamJoinExample {
    public static void main(String[] args) throws Exception {
        // ListState: select * from A inner join B where A.id = B.id;
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        SingleOutputStreamOperator<Tuple3<String, String, Long>> stream1 = environment.fromElements(
                Tuple3.of("1", "1", 1000L),
                Tuple3.of("2", "2", 2000L),
                Tuple3.of("3", "3", 3000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> stringStringLongTuple3, long l) {
                        return stringStringLongTuple3.f2;
                    }

                })
        );

        SingleOutputStreamOperator<Tuple3<String, String, Long>> stream2 = environment.fromElements(
                Tuple3.of("1", "11", 1000L),
                Tuple3.of("2", "22", 1000L),
                Tuple3.of("3", "33", 1000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> stringStringLongTuple3, long l) {
                        return stringStringLongTuple3.f2;
                    }

                })
        );

        stream1.keyBy(data -> data.f0)
                .connect(stream2.keyBy(data -> data.f0))
                .process(new CoProcessFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, String>() {
                    ListState<Tuple2<String, Long>> stream1List;
                    ListState<Tuple2<String, Long>> stream2List;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        stream1List = getRuntimeContext().getListState(new ListStateDescriptor<Tuple2<String, Long>>("stream1-list", Types.TUPLE(Types.STRING, Types.LONG)));
                        stream2List = getRuntimeContext().getListState(new ListStateDescriptor<Tuple2<String, Long>>("stream2-list", Types.TUPLE(Types.STRING, Types.LONG)));
                    }

                    @Override
                    public void processElement1(Tuple3<String, String, Long> left, Context context, Collector<String> collector) throws Exception {
                        for (Tuple2<String, Long> right : stream1List.get()) {
                            collector.collect(left.f0 + " " + left.f2 + "=>" + right);

                        }
                        stream1List.add(Tuple2.of(left.f0, left.f2));
                    }

                    @Override
                    public void processElement2(Tuple3<String, String, Long> right, Context context, Collector<String> collector) throws Exception {
                        for (Tuple2<String, Long> left : stream1List.get()) {
                            collector.collect(right.f0 + " " + right.f2 + "=>" + left);

                        }
                        stream1List.add(Tuple2.of(right.f0, right.f2));
                    }
                })
                .print();
        
        environment.execute();
    }
}

在这里插入图片描述

2.3 映射状态 Map State

public interface MapState<UK, UV> extends State {
    UV get(UK var1) throws Exception;

    void put(UK var1, UV var2) throws Exception;

    void putAll(Map<UK, UV> var1) throws Exception;

    void remove(UK var1) throws Exception;

    boolean contains(UK var1) throws Exception;

    Iterable<Entry<UK, UV>> entries() throws Exception;

    Iterable<UK> keys() throws Exception;

    Iterable<UV> values() throws Exception;

    Iterator<Entry<UK, UV>> iterator() throws Exception;

    boolean isEmpty() throws Exception;
}

[需求]

映射状态的用法和 Java 中的 HashMap 很相似。在这里我们可以通过 MapState 的使用来探索一下窗口的底层实现,也就是我们要用映射状态来完整模拟窗口的功能。这里我们模拟一个滚动窗口。我们要计算的是每一个 url 在每一个窗口中的 pv 数据。我们之前使用增量聚合和全窗口聚合结合的方式实现过这个需求。这里我们用 MapState 再来实现一下。

public class FakeWindowExample {
    // MapState 模拟窗口
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);


        SingleOutputStreamOperator<Event> stream = environment.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long l) {
                                return event.timestamp;
                            }
                        })
                );

        stream.print("input");

        stream.keyBy(data -> data.url)
                .process(new FakeWindowResult(10000L))
                .print();
        environment.execute();
    }
    public static class FakeWindowResult extends KeyedProcessFunction<String, Event, String>{
        private Long windowSize;

        public FakeWindowResult(Long windowSize) {
            this.windowSize = windowSize;
        }

        // 保存每一个窗口统计的count
        MapState<Long, Long> mapState;

        @Override
        public void open(Configuration parameters) throws Exception {
            mapState = getRuntimeContext().getMapState(new MapStateDescriptor<Long, Long>("map-count", Long.class, Long.class));
        }

        @Override
        public void processElement(Event event, Context context, Collector<String> collector) throws Exception {
            // 根据时间戳, 判断属于哪个窗口
            Long windowStart = event.timestamp / windowSize * windowSize;
            Long windowEnd = windowStart + windowSize;

            context.timerService().registerEventTimeTimer(windowEnd - 1);
            if (mapState.contains(windowStart)){
                mapState.put(windowStart, mapState.get(windowStart) + 1);
            }else {
                mapState.put(windowStart, 1L);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            Long windowEnd = timestamp + 1;
            Long windowStart = windowEnd - windowSize;
            Long count = mapState.get(windowStart);
            out.collect("窗口" + new Timestamp(windowStart) + " ~ " + new Timestamp(windowEnd) + " url: " + ctx.getCurrentKey() + " count: " + count);

            mapState.remove(windowStart);
        }
    }
}

在这里插入图片描述

2.4 聚合状态 Aggregating State

public interface AggregatingState<IN, OUT> extends MergingState<IN, OUT> {
}

public interface MergingState<IN, OUT> extends AppendingState<IN, OUT> {
}

public interface AppendingState<IN, OUT> extends State {
    OUT get() throws Exception;

    void add(IN var1) throws Exception;
}

public interface State {
    void clear();
}

[需求]

我们举一个简单的例子,对用户点击事件流每 5 个数据统计一次平均时间戳。这是一个类似计数窗口(CountWindow)求平均值的计算,这里我们可以使用一个有聚合状态的RichFlatMapFunction 来实现。

public class AverageTimestampExample {
    // 实现自定义平均时间戳的统计
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);


        SingleOutputStreamOperator<Event> stream = environment.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long l) {
                                return event.timestamp;
                            }
                        })
                );

        stream.print("input");
        stream.keyBy(data -> data.user)
                .flatMap(new AvgTsResult(5L))
                .print();
        environment.execute();
    }

    public static class AvgTsResult extends RichFlatMapFunction<Event, String>{
        private Long count;

        public AvgTsResult(Long count) {
            this.count = count;
        }

        // 保存平均时间戳
        AggregatingState<Event, Long> aggregatingState;
        // 保存用户访问次数
        ValueState<Long> valueState;

        @Override
        public void open(Configuration parameters) throws Exception {
            aggregatingState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<Event, Tuple2<Long, Long>, Long>("agg",
                    new AggregateFunction<Event, Tuple2<Long, Long>, Long>() {
                        @Override
                        public Tuple2<Long, Long> createAccumulator() {
                            return Tuple2.of(0L, 0L);
                        }

                        @Override
                        public Tuple2<Long, Long> add(Event event, Tuple2<Long, Long> longLongTuple2) {
                            return Tuple2.of(event.timestamp + longLongTuple2.f0, longLongTuple2.f1 + 1);
                        }

                        @Override
                        public Long getResult(Tuple2<Long, Long> longLongTuple2) {
                            return longLongTuple2.f0/longLongTuple2.f1;
                        }

                        @Override
                        public Tuple2<Long, Long> merge(Tuple2<Long, Long> longLongTuple2, Tuple2<Long, Long> acc1) {
                            return null;
                        }
                    }
                    , Types.TUPLE(Types.LONG, Types.LONG)));
            valueState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("count", Long.class));
        }


        @Override
        public void flatMap(Event event, Collector<String> collector) throws Exception {
            Long value = valueState.value();
            if (value == null){
                value = 1L;
            }else {
                value ++;
            }
            valueState.update(value);
            aggregatingState.add(event);

            if (value.equals(count)){
                collector.collect(event.user + "过去" + count + "平均时间戳为: " + aggregatingState.get());
                valueState.clear();
                aggregatingState.clear();
            }
        }
    }
}

在这里插入图片描述

3.状态生存时间 Ttl

失效时间 = 当前时间 + TTL

        ValueState<Event> valueState = null;
        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Event> stateDescriptor = new ValueStateDescriptor<>("value-state", Event.class);
            valueState = getRuntimeContext().getState(stateDescriptor);
            // 配置ttl
            StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.hours(1))
                    .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
                    // 状态的可见性
                    .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
                    .build();
            stateDescriptor.enableTimeToLive(stateTtlConfig);
        }
  • .newBuilder():
    状态 TTL 配置的构造器方法,必须调用,返回一个 Builder 之后再调用.build()方法就可以得到 StateTtlConfig 了。方法需要传入一个 Time 作为参数,这就是设定的状态生存时间。
  • setUpdateType()
    设置更新类型。更新类型指定了什么时候更新状态失效时间
  • .setStateVisibility()
    设置状态的可见性。所谓的“状态可见性”,是指因为清除操作并不是实时的,所以当状态过期之后还有可能基于存在,这时如果对它进行访问,能否正常读取到就是一个问题了。

4.状态一致性说明

4.1 一致性级别说明

在流处理中,一致性可以分为 3 个级别:

  • at-most-once: 这其实是没有正确性保障的委婉说法——故障发生之后,计数结果可能丢失。同样的还有 udp。
  • at-least-once: 这表示计数结果可能大于正确值,但绝不会小于正确值。也就是说,计数程序在发生故障后可能多算,但是绝不会少算。
  • exactly-once: 这指的是系统保证在发生故障后得到的计数结果与正确值一致。

曾经,用户不得不在保证exactly-once 与获得低延迟和效率之间权衡利弊。Flink 避免了这种权衡。
Flink 的一个重大价值在于:它既保证了 exactly-once,也具有低延迟和高吞吐的处理能力。

4.2 端到端状态一致性

目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在 Flink 流处理器内部保证的;而在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如 Kafka)和输出到持久化系统。

端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它自己的一致性,整个端到端的一致性级别取决于所有组件中一致性最弱的组件。

  • 内部保证 —— 依赖 checkpoint
  • source 端 —— 需要外部源可重设数据的读取位置
  • sink 端 —— 需要保证从故障恢复时,数据不会重复写入外部系统而对于 sink 端,又有两种具体的实现方式:幂等(Idempotent)写入和事务性(Transactional)写入。
  • 幂等写入
    所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。
  • 事务写入
    需要构建事务来写入外部系统,构建的事务对应着 checkpoint,等到 checkpoint真正完成的时候,才把所有对应的结果写入 sink 系统中。

对于事务性写入,具体又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)。DataStream API 提供了 GenericWriteAheadSink 模板类和TwoPhaseCommitSinkFunction 接口,可以方便地实现这两种方式的事务性写入。

在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-01 00:08:39  更:2022-04-01 00:10:11 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 14:47:23-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码