IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flink之DataStream的Transformation以及sink -> 正文阅读

[大数据]flink之DataStream的Transformation以及sink

DataStream的Transformation

keyBy

按照指定的key来进行分流,类似于批处理中的
groupBy
。可以按照索引名/字段名来指定分组的字段.

package com.ccj.pxj.heima.stream.tran
import org.apache.flink.streaming.api.scala._
/**
 * 1. 获取流处理运行环境
 * 2. 设置并行度
 * 3. 获取数据源
 * 4. 转换操作
 * 1. 以空白进行分割
 * 2. 给每个单词计数1
 * 3. 根据单词分组
 * 4. 求和
 * 5. 打印到控制台
 * 6. 执行任务
 */
object TransformationKeyBy {
  def main(args: Array[String]): Unit = {
    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val datas: DataStream[String] = senv.socketTextStream("192.168.25.60", 12346)
    val dataSet: DataStream[(String, Int)] = datas.flatMap(_.split(" ")).map((_, 1)).keyBy(_._1).sum(1)
    dataSet.print()
    senv.execute("pxj")
  }
}
Connect

Connect 用来将两个DataStream组装成一个ConnectedStreams 。它用了两个泛型,即不要求两个dataStream的element是同一类型。这样我们就可以把不同的数据组装成同一个结构
开发步骤
1.创建流式处理环境
2.添加两个自定义数据源
3.使用connect合并两个数据流,创建ConnectedStreams对象
4.遍历ConnectedStreams对象,转换为DataStream
5.打印输出,设置并行度为1
6.执行任务

package com.ccj.pxj.heima.stream.tran
import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.functions.source.SourceFunction
/**
 * 1. 创建流式处理环境
 * 2. 添加两个自定义数据源
 * 3. 使用connect合并两个数据流,创建ConnectedStreams对象
 * 4. 遍历ConnectedStreams对象,转换为DataStream
 * 5. 打印输出,设置并行度为1
 * 6. 执行任务
 */
/**
 *  * 创建自定义并行度为1的source
 *  * 实现从1开始产生递增数字
 */
class MyLongSourceScala extends  SourceFunction[Long]{
  var count: Long =1L
  var isRunning=true
  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning){
      ctx.collect(count)
      count+=1
      TimeUnit.SECONDS.sleep(1)
    }
  }
  override def cancel(): Unit = {
    isRunning=false
  }
}
/**
 * 创建自定义并行度为1的source
 *  实现从1开始产生递增字符串
 */
class MyStringSourceScala extends  SourceFunction[String]{
  var count: Long =1L
  var isRunning: Boolean =true
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (isRunning){
      ctx.collect("str_"+count)
      count+=1
      TimeUnit.SECONDS.sleep(1)
    }
  }
  override def cancel(): Unit = {
    isRunning=false
  }
}
package com.ccj.pxj.heima.stream.tran
import org.apache.flink.streaming.api.scala._
/**
 *1. 创建流式处理环境
 * 2. 添加两个自定义数据源
 * 3. 使用connect合并两个数据流,创建ConnectedStreams对象
 * 4. 遍历ConnectedStreams对象,转换为DataStream
 * 5. 打印输出,设置并行度为1
 * 6. 执行任务
 */
object StreamingDemoConnectScala {
  def main(args: Array[String]): Unit = {
    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val longSource: DataStream[Long] = senv.addSource(new MyLongSourceScala)
    val stringSource: DataStream[String] = senv.addSource(new MyStringSourceScala)
    val datas: ConnectedStreams[Long, String] = longSource.connect(stringSource)
    val connectDatas: DataStream[Any] = datas.map(line1 => {
      line1+2
    }, line2 => {
      line2.split("_")(0)
    })
    connectDatas.print()
    senv.execute("pxj")
  }
}

split和select

split 就是将一个DataStream分成多个流,用SplitStream 来表示DataStream → SplitStream
select 就是获取分流后对应的数据,跟split搭配使用,从SplitStream中选择一个或多个流SplitStream → DataStream
示例
加载本地集合(1,2,3,4,5,6), 使用split进行数据分流,分为奇数和偶数. 并打印奇数结果
开发步骤
1.创建流处理环境
2.设置并行度
3.加载本地集合
4.数据分流,分为奇数和偶数
5.获取分流后的数据
6.打印数据
7.执行任务

package com.ccj.pxj.heima.stream.tran
import org.apache.flink.streaming.api.scala._
/**
 * 1. 创建流处理环境
 * 2. 设置并行度
 * 3. 加载本地集合
 * 4. 数据分流,分为奇数和偶数
 * 5. 获取分流后的数据
 * 6. 打印数据
 * 7. 执行任务
 * Split: DataStream->SplitStream
 * Select: SplitStream->DataStream
 */
object SplitAndSelect {
  def main(args: Array[String]): Unit = {
    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setParallelism(1)
   // val datas: DataStream[Int] = senv.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
   val datas: DataStream[Int] =senv.fromElements(1,2,3,4,5,6,7,8,9,10)
    val data_split: SplitStream[Int] = datas.split(
      (num:Int)=>
        num % 2 match {
        case 0 => List("even")
        case 1 => List("odd")
      }
    )
    val even_data: DataStream[Int] = data_split.select("even")
    val odd_data: DataStream[Int] = data_split.select("odd")
    val datasss = data_split.select("even", "odd")
    even_data.print()
    println("--------")
    odd_data.print()
    println("--------")
    datasss.print()
    senv.execute("pxj")
  }
}

Flink在流处理上常见的sink

package com.ccj.pxj.heima.stream.sink
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._
object SinkMySql {
  def main(args: Array[String]): Unit = {
    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val datas: DataStream[(Int, String, String, String)] = senv.fromCollection(List(
      (10, "dazhuang", "123456", "大壮"),
      (11, "erya", "123456", "二丫"),
      (12, "sanpang", "123456", "三胖")
    )
    )
    val datass: DataStream[User] = datas.map(x => User(x._1, x._2, x._3, x._4))
    datass.addSink(new Mysql_Sink)
    senv.execute("pxj")
  }
}
class Mysql_Sink extends RichSinkFunction[User]{
  private var connection: Connection = null
  private var ps: PreparedStatement = null
  override def open(parameters: Configuration): Unit = {
    //1.加载驱动
    Class.forName("com.mysql.jdbc.Driver")
    //    创建链接
    connection = DriverManager.getConnection("jdbc:mysql:///flink?characterEncoding=utf8", "pxj", "1314520")
    //   获得执行语句
    var sql = "insert into user(id,username,password,name) values(?,?,?,?)"
    ps = connection.prepareStatement(sql)
  }
  override def invoke(value: User): Unit = {
    try {
      //    组装数据,执行插入操作
      ps.setInt(1, value.id)
      ps.setString(2, value.username)
      ps.setString(3, value.password)
      ps.setString(4, value.name)
      ps.executeUpdate()
    } catch {
      case e: Exception => println(e.getMessage)
    }
  }
  override def close(): Unit = {
    // 关闭连接
    if(connection!=null){
      connection.close()
    }
    if(ps!=null){
      ps.close()
    }
  }
}
case  class User(id:Int,username:String,password:String,name:String)

作者:pxj
日期:2021-08-01 0:26:58
你若安好便是晴天

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-02 10:53:03  更:2021-08-02 10:55:39 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/22 5:39:49-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码