IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 【Flink入门(7)】Flink的ProcessFunction API(底层API) -> 正文阅读

[大数据]【Flink入门(7)】Flink的ProcessFunction API(底层API)

【时间】2022.06.14 周二

【题目】【Flink入门(7)】Flink的ProcessFunction API(底层API)

本专栏是尚硅谷Flink课程的笔记与思维导图。

目录

引言

0.概述

1.KeyedProcessFunction

示例代码

2 TimerService和定时器(Timers)

示例代码

3 侧输出流(SideOutput)

4 CoProcessFunction

总思维导图

引言

本节主要介绍flink中的ProcessFunction API(底层API),主要是KeyedProcessFunction的基本使用和示例代码。

0.概述

1.KeyedProcessFunction

?

示例代码

设置一个定时器:在获取数据后第5s给出提示信息。

关键是实现processElement()和onTimer()方法。

package processfunction;

import apitest.beans.SensorReading;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ProcessTest1_KeyedProcessFunction {
  public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // socket文本流
    DataStream<String> inputStream = env.socketTextStream("localhost", 7777);

    // 转换成SensorReading类型
    DataStream<SensorReading> dataStream = inputStream.map(line -> {
      String[] fields = line.split(",");
      return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
    });

    // 测试KeyedProcessFunction,先分组然后自定义处理
    dataStream.keyBy("id")
      .process( new MyProcess() )
      .print();

    env.execute();
  }

  // 实现自定义的处理函数
  public static class MyProcess extends KeyedProcessFunction<Tuple, SensorReading, Integer> {
    ValueState<Long> tsTimerState;

    @Override
    public void open(Configuration parameters) throws Exception {
      //tsTimerState =  getRuntimeContext().getState(new ValueStateDescriptor<Long>("ts-timer", Long.class));
    }

    @Override
    public void processElement(SensorReading value, Context ctx, Collector<Integer> out) throws Exception {
      out.collect(value.getId().length());

      // context
      // Timestamp of the element currently being processed or timestamp of a firing timer.
      //ctx.timestamp();
      // Get key of the element being processed.
      //ctx.getCurrentKey();
      //            ctx.output();
      //ctx.timerService().currentProcessingTime();
      //ctx.timerService().currentWatermark();
      // 在处理时间的5秒延迟后触发
      ctx.timerService().registerProcessingTimeTimer( ctx.timerService().currentProcessingTime() + 5000L);
      //tsTimerState.update(ctx.timerService().currentProcessingTime() + 5000L);
      // ctx.timerService().registerEventTimeTimer((value.getTimestamp() + 10) * 1000L);
      // 删除指定时间触发的定时器
      //ctx.timerService().deleteProcessingTimeTimer(tsTimerState.value());
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
      System.out.println(timestamp + " 定时器触发");
      //ctx.getCurrentKey();
      //ctx.output();
      //ctx.timeDomain();
    }

    @Override
    public void close() throws Exception {
      //tsTimerState.clear();
    }
  }
}

启动本地socket

nc -lk 7777

输入

sensor_1,1547718207,36.3

输出

8
1612283803911 定时器触发

2 TimerService和定时器(Timers)

?

示例代码

需求:监控温度传感器的温度值,如果温度值在10 秒钟之内(processing time)连续上升,则报警。

思路:在第一个温度上升的节点设置一个10s的定时器,后面如果温度下降就删去定时器(不触发)。

package processfunction;

import apitest.beans.SensorReading;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;


public class ProcessTest2_ApplicationCase {

  public static void main(String[] args) throws Exception {
    // 创建执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 设置并行度为1
    env.setParallelism(1);
    // 从socket中获取数据
    DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
    // 转换数据为SensorReading类型
    DataStream<SensorReading> sensorReadingStream = inputStream.map(line -> {
      String[] fields = line.split(",");
      return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
    });
    // 如果存在连续10s内温度持续上升的情况,则报警
    sensorReadingStream.keyBy(SensorReading::getId)
      .process(new TempConsIncreWarning(Time.seconds(10).toMilliseconds()))
      .print();
    env.execute();
  }

  // 如果存在连续10s内温度持续上升的情况,则报警
  public static class TempConsIncreWarning extends KeyedProcessFunction<String, SensorReading, String> {

    public TempConsIncreWarning(Long interval) {
      this.interval = interval;
    }

    // 报警的时间间隔(如果在interval时间内温度持续上升,则报警)
    private Long interval;

    // 上一个温度值
    private ValueState<Double> lastTemperature;
    // 最近一次定时器的触发时间(报警时间)
    private ValueState<Long> recentTimerTimeStamp;

    @Override
    public void open(Configuration parameters) throws Exception {
      lastTemperature = getRuntimeContext().getState(new ValueStateDescriptor<Double>("lastTemperature", Double.class));
      recentTimerTimeStamp = getRuntimeContext().getState(new ValueStateDescriptor<Long>("recentTimerTimeStamp", Long.class));
    }

    @Override
    public void close() throws Exception {
      lastTemperature.clear();
      recentTimerTimeStamp.clear();
    }

    @Override
    public void processElement(SensorReading value, Context ctx, Collector<String> out) throws Exception {
      // 当前温度值
      double curTemp = value.getTemperature();
      // 上一次温度(没有则设置为当前温度)
      double lastTemp = lastTemperature.value() != null ? lastTemperature.value() : curTemp;
      // 计时器状态值(时间戳),后面通过这个state判断是否已经设置了定时器
      Long timerTimestamp = recentTimerTimeStamp.value();

      // 如果 当前温度 > 上次温度 并且 没有设置报警计时器,则设置
      if (curTemp > lastTemp && null == timerTimestamp) {
        long warningTimestamp = ctx.timerService().currentProcessingTime() + interval;
        ctx.timerService().registerProcessingTimeTimer(warningTimestamp);
        recentTimerTimeStamp.update(warningTimestamp);
      }
      // 如果 当前温度 < 上次温度,且 设置了报警计时器,则清空计时器
      else if (curTemp <= lastTemp && timerTimestamp != null) {
        ctx.timerService().deleteProcessingTimeTimer(timerTimestamp);
        recentTimerTimeStamp.clear();//timerTimestamp为null
      }
      // 更新保存的温度值
      lastTemperature.update(curTemp);
    }

    // 定时器任务
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
      // 触发报警,并且清除 定时器状态值
      out.collect("传感器" + ctx.getCurrentKey() + "温度值连续" + interval + "ms上升");
      recentTimerTimeStamp.clear();
    }
  }
}
  • 启动本地socket,之后输入数据

    nc -lk 7777
  • 输入

    sensor_1,1547718199,35.8
    sensor_1,1547718199,34.1
    sensor_1,1547718199,34.2
    sensor_1,1547718199,35.1
    sensor_6,1547718201,15.4
    sensor_7,1547718202,6.7
    sensor_10,1547718205,38.1
    sensor_10,1547718205,39 ?
    sensor_6,1547718201,18 ?
    sensor_7,1547718202,9.1
  • 输出
传感器sensor_1温度值连续10000ms上升
传感器sensor_10温度值连续10000ms上升
传感器sensor_6温度值连续10000ms上升
传感器sensor_7温度值连续10000ms上升

3 侧输出流(SideOutput)

?

需求:场景:温度>=30放入高温流输出,反之放入低温流输出

package processfunction;

import apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;


public class ProcessTest3_SideOuptCase {
  public static void main(String[] args) throws Exception {
    // 创建执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 设置并行度 = 1
    env.setParallelism(1);
    // 从本地socket读取数据
    DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
    // 转换成SensorReading类型
    DataStream<SensorReading> dataStream = inputStream.map(line -> {
      String[] fields = line.split(",");
      return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
    });

    // 定义一个OutputTag,用来表示侧输出流低温流
    // An OutputTag must always be an anonymous inner class
    // so that Flink can derive a TypeInformation for the generic type parameter.
    OutputTag<SensorReading> lowTempTag = new OutputTag<SensorReading>("lowTemp"){};

    // 测试ProcessFunction,自定义侧输出流实现分流操作
    SingleOutputStreamOperator<SensorReading> highTempStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>() {
      @Override
      public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {
        // 判断温度,大于30度,高温流输出到主流;小于低温流输出到侧输出流
        if (value.getTemperature() > 30) {
          out.collect(value);
        } else {
          ctx.output(lowTempTag, value);
        }
      }
    });

    highTempStream.print("high-temp");
    highTempStream.getSideOutput(lowTempTag).print("low-temp");

    env.execute();
  }
}
  • 本地启动socket,输入

sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
  • 输出

high-temp> SensorReading{id='sensor_1', timestamp=1547718199, temperature=35.8}
low-temp> SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
low-temp> SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}
high-temp> SensorReading{id='sensor_10', timestamp=1547718205, temperature=38.1}

4 CoProcessFunction

?

使用方法:stream1.connect(stream2).process(getCoProcessFunctionInstance())

?

具体见:Flink处理函数实战之五:CoProcessFunction(双流处理)_程序员欣宸的博客-CSDN博客

总思维导图

?

?

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-06-18 23:28:02  更:2022-06-18 23:30:01 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/11 17:11:50-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码