[Big Data] Flink Project 4: Two-Stream connect & intervalJoin Project

1. One stream is the order stream, the other is the reconciliation (receipt) stream

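The time that timers are checked against is not per key; it is the event time (watermark) as seen by the job as a whole.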

package flinkProject

import java.text.SimpleDateFormat

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

case class ReceiptEvent(txid:String,payChannel:String,timestamp:Long)
case class OrderEvent(txid:String,payChannel:String,timestamp:Long)

object TxConnectedMatch {
  def main(args: Array[String]): Unit = {
    val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    executionEnvironment.setParallelism(1)
    executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // watermarks are generated periodically; the default interval is 200 ms

    val stream1: DataStream[String] = executionEnvironment.socketTextStream("127.0.0.1", 1111)

    val receiptDataStream: DataStream[ReceiptEvent] = stream1.map(data => {
      val tmpList = data.split(" ")
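      // Note: in SimpleDateFormat "mm" means minutes and "MM" means month, so with this pattern
      // the month is never parsed and defaults to January; "dd/MM/yyyy:HH:mm:ss" would parse it.
      // The pattern is left unchanged so that the timestamps printed in section 2 still match.
      val simpleDateFormat = new SimpleDateFormat("dd/mm/yy:HH:mm:ss")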
      val ts = simpleDateFormat.parse(tmpList(2)).getTime
      ReceiptEvent(tmpList(0), tmpList(1), ts)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ReceiptEvent](Time.seconds(0)) {
      override def extractTimestamp(t: ReceiptEvent) = t.timestamp
    })


    val stream2: DataStream[String] = executionEnvironment.socketTextStream("127.0.0.1", 2222)

    val orderStram: DataStream[OrderEvent] = stream2.map(data => {
      val tmpList = data.split(" ")
      // "mm" is minutes, not month, so the parsed month defaults to January
      val simpleDateFormat = new SimpleDateFormat("dd/mm/yy:HH:mm:ss")
      val ts = simpleDateFormat.parse(tmpList(2)).getTime
      OrderEvent(tmpList(0), tmpList(1), ts)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(0)) {
      override def extractTimestamp(t: OrderEvent) = t.timestamp
    })

    val result: DataStream[(ReceiptEvent, OrderEvent)] = receiptDataStream.connect(orderStram)
      .keyBy((receipt => receipt.txid), (order => order.txid))
      .process(new ConnectedCoProcessFunction())

    result.print("result")
    result.getSideOutput(new OutputTag[OrderEvent]("order_output_tag")).print("order_output_tag")
    result.getSideOutput(new OutputTag[ReceiptEvent]("receipt_output_tag")).print("receipt_output_tag  ")


    executionEnvironment.execute("connected Stream")
  }

}

class ConnectedCoProcessFunction extends CoProcessFunction[ReceiptEvent,OrderEvent,(ReceiptEvent,OrderEvent)] {
  var receiptValueState:ValueState[ReceiptEvent]=_
  var orderValueState:ValueState[OrderEvent]=_

  override def open(parameters: Configuration): Unit = {
    receiptValueState=getRuntimeContext.getState[ReceiptEvent](new ValueStateDescriptor[ReceiptEvent]("receipt",classOf[ReceiptEvent]))
    orderValueState=getRuntimeContext.getState[OrderEvent](new ValueStateDescriptor[OrderEvent]("order",classOf[OrderEvent]))
  }

  override def processElement1(in1: ReceiptEvent, context: CoProcessFunction[ReceiptEvent, OrderEvent, (ReceiptEvent, OrderEvent)]#Context, collector: Collector[(ReceiptEvent, OrderEvent)]): Unit = {
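    val order = orderValueState.value()
    if (order != null) {
      // the order arrived first: emit the matched pair and release the stored order
      collector.collect((in1, order))
      orderValueState.clear()
    } else {
      // no order yet: store the receipt and wait up to 3 s of event time for it
      receiptValueState.update(in1)
      context.timerService().registerEventTimeTimer(in1.timestamp + 3000L)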
    }
  }

  override def processElement2(in2: OrderEvent, context: CoProcessFunction[ReceiptEvent, OrderEvent, (ReceiptEvent, OrderEvent)]#Context, collector: Collector[(ReceiptEvent, OrderEvent)]): Unit = {
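    val receipt = receiptValueState.value()
    if (receipt != null) {
      // the receipt arrived first: emit the matched pair and release the stored receipt
      collector.collect((receipt, in2))
      receiptValueState.clear()
    } else {
      // no receipt yet: store the order and wait up to 3 s of event time for it
      orderValueState.update(in2)
      context.timerService().registerEventTimeTimer(in2.timestamp + 3000L)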
    }
  }

  override def onTimer(timestamp: Long, ctx: CoProcessFunction[ReceiptEvent, OrderEvent, (ReceiptEvent, OrderEvent)]#OnTimerContext, out: Collector[(ReceiptEvent, OrderEvent)]): Unit = {
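    // The timer fired for the current key: anything still in state was never matched,
    // so send it to the corresponding side output.
    if(receiptValueState.value()!=null){
      ctx.output(new OutputTag[ReceiptEvent]("receipt_output_tag"),receiptValueState.value())
    }
    if(orderValueState.value()!=null){
      ctx.output(new OutputTag[OrderEvent]("order_output_tag"),orderValueState.value() )
    }
    // clear the state of this key only
    receiptValueState.clear()
    orderValueState.clear()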
  }
}
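
The date pattern used in the two map functions is worth checking on its own. The sketch below uses only the JDK and parses one of the sample input lines with the pattern as written and with a pattern that uses `MM` for the month. The object name `PatternCheck`, the helper `parseMillis` and the fixed time zone `Asia/Shanghai` are assumptions made for this illustration; the zone was picked because it reproduces the value 1421461605000 printed in the sample output of this post.

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone

object PatternCheck {
  // Parse `text` with `pattern` in a fixed zone so the result is machine-independent.
  def parseMillis(pattern: String, text: String, zoneId: String): Long = {
    val format = new SimpleDateFormat(pattern)
    format.setTimeZone(TimeZone.getTimeZone(zoneId))
    format.parse(text).getTime
  }

  def main(args: Array[String]): Unit = {
    val text = "17/05/2015:10:26:45"
    // Pattern as written in the job: "mm" is minutes, so no month is parsed (January is used).
    println(parseMillis("dd/mm/yy:HH:mm:ss", text, "Asia/Shanghai"))
    // Pattern with "MM" for the month: the date is read as 17 May 2015.
    println(parseMillis("dd/MM/yyyy:HH:mm:ss", text, "Asia/Shanghai"))
  }
}
```

The two results differ by exactly 120 days. Matching and timers only depend on the differences between timestamps, so the job still behaves as described, but the stored timestamps are in January rather than May.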

2. Input data

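Normally, as soon as both streams contain an event with the same txid, the pair is emitted. An event waits up to 3 s (event time) for its counterpart; a counterpart that arrives after that can no longer be matched.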

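The time that a timer is compared against is not per key: it is the watermark, which is shared by all keys of the job. The timers themselves, however, are registered per key, and the state that is cleared when a timer fires is the state of that key only.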
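
To make the distinction concrete, here is a minimal toy model in plain Scala. It is not the Flink API: `ToyTimerService`, `register` and `advanceTo` are hypothetical names invented for this sketch. Each key owns one timer, while a single watermark decides which of them fire. The timestamps are the three receipts from the sample run in this section, written as milliseconds after 10:26:00.

```scala
import scala.collection.mutable

// Toy model, not Flink API: one timer per key, one watermark shared by all keys.
final class ToyTimerService {
  private val timers = mutable.Map.empty[String, Long] // key -> fire time (ms)

  def register(key: String, fireAt: Long): Unit = timers.update(key, fireAt)

  // Advance the shared watermark; return the keys whose timers fire, earliest first.
  def advanceTo(watermark: Long): List[String] = {
    val due = timers.toList.filter(_._2 <= watermark).sortBy(_._2).map(_._1)
    due.foreach(key => timers.remove(key))
    due
  }
}

object ToyTimers {
  def main(args: Array[String]): Unit = {
    val service = new ToyTimerService
    // Receipts 4, 5 and 7 arrive at 10:26:45, :47 and :49; each waits 3 s.
    service.register("4", 45000L + 3000L)
    service.register("5", 47000L + 3000L)
    service.register("7", 49000L + 3000L)
    println(service.advanceTo(49000L)) // List(4)
    println(service.advanceTo(59000L)) // List(5, 7)
  }
}
```

An event for key 7 moves the watermark and thereby fires the timer of key 4, even though the two keys share no state.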

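When a txid appears on only one stream, its timer fires 3 s later in event time. Each stream derives its watermark from its own events (note that Flink advances the event-time clock of a two-input operator to the minimum of its two input watermarks). For example: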

Stream 1 input: 4 404 17/05/2015:10:26:45   (no output)

Stream 1 input: 5 404 17/05/2015:10:26:47   (no output)

Stream 1 input: 7 404 17/05/2015:10:26:49

Output: receipt_output_tag  > ReceiptEvent(4,404,1421461605000)

Stream 1 input: 9 404 17/05/2015:10:26:59   (the watermark is now 10:26:59)

Output:

receipt_output_tag  > ReceiptEvent(5,404,1421461607000)
receipt_output_tag  > ReceiptEvent(7,404,1421461609000)

Stream 2 input: 6 505 17/05/2015:10:26:55   (no output)

Stream 2 input: 8 505 17/05/2015:10:26:56   (no output)

Stream 2 input: 1 505 17/05/2015:10:27:01

Output:

order_output_tag> OrderEvent(6,505,1421461615000)
order_output_tag> OrderEvent(8,505,1421461616000)
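
One detail helps when reading the stream 2 part of the run above: a two-input operator such as a CoProcessFunction keeps a single event-time clock, the minimum of its two input watermarks, and an event-time timer fires once that clock reaches the timer's timestamp. The following is a toy calculation in plain Scala, not Flink API; `ToyTwoInputClock`, `operatorTime` and `dueTimers` are hypothetical names, and it assumes that stream 1 has already advanced its watermark to 10:26:59. Times are milliseconds after 10:26:00.

```scala
object ToyTwoInputClock {
  // Event-time clock of a two-input operator: the smaller of the two input watermarks.
  def operatorTime(watermark1: Long, watermark2: Long): Long =
    math.min(watermark1, watermark2)

  // Timers (fire times in ms) that are due once the clock has reached `now`.
  def dueTimers(timers: List[Long], now: Long): List[Long] =
    timers.filter(_ <= now)

  def main(args: Array[String]): Unit = {
    // Orders 6 and 8 arrived at 10:26:55 and 10:26:56; each waits 3 s.
    val orderTimers = List(55000L + 3000L, 56000L + 3000L)
    val stream1Watermark = 59000L // assumed: stream 1 has already seen 10:26:59

    // Stream 2 is still at 10:26:56, so the clock is 56000 and nothing fires.
    println(dueTimers(orderTimers, operatorTime(stream1Watermark, 56000L))) // List()
    // Stream 2 reaches 10:27:01; the clock becomes min(59000, 61000) = 59000.
    println(dueTimers(orderTimers, operatorTime(stream1Watermark, 61000L))) // List(58000, 59000)
  }
}
```

Under that assumption both order timers fire together when the 10:27:01 event arrives, which agrees with the two order_output_tag lines shown.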

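3. intervalJoin: the matched pairs are the same, but an interval join is an inner join, so unmatched events are not reported on any side output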

package flinkProject

import java.text.SimpleDateFormat

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.{CoProcessFunction, ProcessJoinFunction}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

case class ReceiptEvent(txid:String,payChannel:String,timestamp:Long)
case class OrderEvent(txid:String,payChannel:String,timestamp:Long)

object TxConnectedMatch {
  def main(args: Array[String]): Unit = {
    val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    executionEnvironment.setParallelism(1)
    executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // watermarks are generated periodically; the default interval is 200 ms

    val stream1: DataStream[String] = executionEnvironment.socketTextStream("127.0.0.1", 1111)

    val receiptDataStream: DataStream[ReceiptEvent] = stream1.map(data => {
      val tmpList = data.split(" ")
      // "mm" is minutes, not month, so the parsed month defaults to January
      val simpleDateFormat = new SimpleDateFormat("dd/mm/yy:HH:mm:ss")
      val ts = simpleDateFormat.parse(tmpList(2)).getTime
      ReceiptEvent(tmpList(0), tmpList(1), ts)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ReceiptEvent](Time.seconds(0)) {
      override def extractTimestamp(t: ReceiptEvent) = t.timestamp
    })


    val stream2: DataStream[String] = executionEnvironment.socketTextStream("127.0.0.1", 2222)

    val orderStram: DataStream[OrderEvent] = stream2.map(data => {
      val tmpList = data.split(" ")
      // "mm" is minutes, not month, so the parsed month defaults to January
      val simpleDateFormat = new SimpleDateFormat("dd/mm/yy:HH:mm:ss")
      val ts = simpleDateFormat.parse(tmpList(2)).getTime
      OrderEvent(tmpList(0), tmpList(1), ts)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(0)) {
      override def extractTimestamp(t: OrderEvent) = t.timestamp
    })


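    // Approach 2: intervalJoin
    val keyedReceiptDataStream: KeyedStream[ReceiptEvent, String] = receiptDataStream.keyBy(_.txid)
    val keyedOrderStram: KeyedStream[OrderEvent, String] = orderStram.keyBy(_.txid)
    // An order joins a receipt when receipt.timestamp - 3 s <= order.timestamp <= receipt.timestamp + 3 s.
    // The lower bound must be negative; between(3 s, 3 s) would only match orders exactly 3 s later.
    val result: DataStream[(ReceiptEvent, OrderEvent)] = keyedReceiptDataStream
      .intervalJoin(keyedOrderStram)
      .between(Time.seconds(-3), Time.seconds(3))
      .process(new TxIntervalProcessJoinFunctin())

    result.print("result")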

    executionEnvironment.execute("connected Stream")
  }

}


class TxIntervalProcessJoinFunctin() extends ProcessJoinFunction[ReceiptEvent,OrderEvent,(ReceiptEvent,OrderEvent)] {
  override def processElement(in1: ReceiptEvent, in2: OrderEvent, context: ProcessJoinFunction[ReceiptEvent, OrderEvent, (ReceiptEvent, OrderEvent)]#Context, collector: Collector[(ReceiptEvent, OrderEvent)]): Unit = {
    collector.collect((in1,in2))
  }
}
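
The two arguments of `between` are offsets relative to the timestamp of the left (receipt) element: a right (order) element joins when left + lower <= right <= left + upper, with both bounds inclusive by default. The plain-Scala sketch below only evaluates that inequality; `IntervalBounds` and `joins` are hypothetical names for this illustration, not Flink API. It compares symmetric bounds of -3 s and +3 s with bounds of +3 s and +3 s.

```scala
object IntervalBounds {
  // Interval-join condition: left + lower <= right <= left + upper (all in ms).
  def joins(leftTs: Long, rightTs: Long, lowerMs: Long, upperMs: Long): Boolean =
    leftTs + lowerMs <= rightTs && rightTs <= leftTs + upperMs

  def main(args: Array[String]): Unit = {
    val receiptTs = 45000L
    // An order 1 s before the receipt: inside [-3 s, +3 s], outside [+3 s, +3 s].
    println(joins(receiptTs, 44000L, -3000L, 3000L)) // true
    println(joins(receiptTs, 44000L, 3000L, 3000L))  // false
    // An order exactly 3 s after the receipt is the only case [+3 s, +3 s] accepts.
    println(joins(receiptTs, 48000L, 3000L, 3000L))  // true
    // An order 4 s after the receipt falls outside [-3 s, +3 s].
    println(joins(receiptTs, 49000L, -3000L, 3000L)) // false
  }
}
```

A negative lower bound is what lets the order arrive before the receipt, which the connect version in section 1 also allows.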

Added: 2021-10-25 12:36:07  Updated: 2021-10-25 12:36:19
 