DataStream Transformations
keyBy
Partitions the stream by the specified key, similar to groupBy in batch processing. The grouping field can be specified by index or by field name.
package com.ccj.pxj.heima.stream.tran

import org.apache.flink.streaming.api.scala._

object TransformationKeyBy {
  def main(args: Array[String]): Unit = {
    // 1. create the streaming environment
    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. read lines from a socket source
    val datas: DataStream[String] = senv.socketTextStream("192.168.25.60", 12346)
    // 3. word count: split each line, pair each word with 1, key by the word, sum the counts
    val dataSet: DataStream[(String, Int)] = datas.flatMap(_.split(" ")).map((_, 1)).keyBy(_._1).sum(1)
    dataSet.print()
    senv.execute("pxj")
  }
}
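The per-key aggregation done by keyBy(_._1).sum(1) above is analogous to grouping and summing with plain Scala collections. A minimal non-Flink sketch of the same word-count logic (the object name KeyByAnalogy is hypothetical, not part of the Flink API):

```scala
// Non-Flink sketch: keyBy(_._1).sum(1) behaves like groupBy + per-group sum
object KeyByAnalogy {
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))                                      // split lines into words
      .map((_, 1))                                                // pair each word with a count of 1
      .groupBy(_._1)                                              // group the tuples by the word (the "key")
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }  // sum the counts per key

  def main(args: Array[String]): Unit = {
    println(wordCount(Seq("a b a", "b c")))
  }
}
```

The difference is that Flink's keyed sum emits a running total per key as elements arrive, while this collection version computes the final totals in one pass.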
Connect
Connect combines two DataStreams into one ConnectedStreams. It carries two type parameters, so the two DataStreams are not required to have the same element type; this lets us bring data of different types together into one structure.
Steps:
1. Create the streaming environment
2. Add two custom data sources
3. Use connect to merge the two streams into a ConnectedStreams object
4. Map the ConnectedStreams object into a DataStream
5. Print the output, setting parallelism to 1
6. Execute the job
package com.ccj.pxj.heima.stream.tran

import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.functions.source.SourceFunction

// Custom source: emits an increasing Long once per second until cancelled
class MyLongSourceScala extends SourceFunction[Long] {
  var count: Long = 1L
  var isRunning = true

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      TimeUnit.SECONDS.sleep(1)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

// Custom source: emits "str_1", "str_2", ... once per second until cancelled
class MyStringSourceScala extends SourceFunction[String] {
  var count: Long = 1L
  var isRunning: Boolean = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (isRunning) {
      ctx.collect("str_" + count)
      count += 1
      TimeUnit.SECONDS.sleep(1)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}
package com.ccj.pxj.heima.stream.tran

import org.apache.flink.streaming.api.scala._

object StreamingDemoConnectScala {
  def main(args: Array[String]): Unit = {
    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // two sources with different element types
    val longSource: DataStream[Long] = senv.addSource(new MyLongSourceScala)
    val stringSource: DataStream[String] = senv.addSource(new MyStringSourceScala)
    // connect keeps each stream's type; map then takes one function per stream
    val datas: ConnectedStreams[Long, String] = longSource.connect(stringSource)
    val connectDatas: DataStream[Any] = datas.map(
      line1 => line1 + 2,          // applied to elements from longSource
      line2 => line2.split("_")(0) // applied to elements from stringSource
    )
    connectDatas.print()
    senv.execute("pxj")
  }
}
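The key point of connect is that each side keeps its own type and gets its own mapping function. A minimal non-Flink sketch of the same co-map idea with plain Scala collections (the object name ConnectAnalogy and the coMap helper are hypothetical, not Flink API):

```scala
// Non-Flink sketch: a "co-map" over two inputs of different types,
// mirroring the two functions passed to ConnectedStreams.map above
object ConnectAnalogy {
  def coMap(longs: Seq[Long], strings: Seq[String]): Seq[Any] =
    longs.map(_ + 2) ++            // the Long-side function
      strings.map(_.split("_")(0)) // the String-side function

  def main(args: Array[String]): Unit = {
    println(coMap(Seq(1L, 2L), Seq("str_1", "str_2")))
  }
}
```

Unlike union, which requires both streams to have the same element type, connect defers the type reconciliation to the two map functions, whose results here meet in a common supertype (Any).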
split and select
split divides one DataStream into multiple streams, represented by a SplitStream (DataStream → SplitStream). select retrieves the data after the split; used together with split, it picks one or more streams out of the SplitStream (SplitStream → DataStream).
Example: load a local collection (1, 2, 3, 4, 5, 6), use split to divide the data into odd and even numbers, and print the odd results.
Steps:
1. Create the streaming environment
2. Set the parallelism
3. Load the local collection
4. Split the data into odd and even numbers
5. Select the split streams
6. Print the data
7. Execute the job
package com.ccj.pxj.heima.stream.tran

import org.apache.flink.streaming.api.scala._

object SplitAndSelect {
  def main(args: Array[String]): Unit = {
    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setParallelism(1)
    val datas: DataStream[Int] = senv.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    // tag each element with "even" or "odd"
    val data_split: SplitStream[Int] = datas.split(
      (num: Int) =>
        num % 2 match {
          case 0 => List("even")
          case 1 => List("odd")
        }
    )
    // select pulls the tagged streams back out, by one tag or several
    val even_data: DataStream[Int] = data_split.select("even")
    val odd_data: DataStream[Int] = data_split.select("odd")
    val datasss = data_split.select("even", "odd")
    even_data.print()
    println("--------")
    odd_data.print()
    println("--------")
    datasss.print()
    senv.execute("pxj")
  }
}
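The split/select pair amounts to tagging elements and then filtering by tag. A minimal non-Flink sketch of that logic with plain Scala collections (the object name SplitAnalogy is hypothetical):

```scala
// Non-Flink sketch: split tags elements; select looks tags back up
object SplitAnalogy {
  // the tagging function mirrors the num % 2 match above
  def split(nums: Seq[Int]): Map[String, Seq[Int]] =
    nums.groupBy(n => if (n % 2 == 0) "even" else "odd")

  def main(args: Array[String]): Unit = {
    val tagged = split((1 to 10).toSeq)
    println(tagged("even")) // the "even" selection
    println(tagged("odd"))  // the "odd" selection
  }
}
```

Note that split/SplitStream is deprecated in later Flink releases in favor of side outputs via a ProcessFunction, but the tag-then-select idea shown here is the same.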
Common sinks in Flink stream processing
The example below writes a stream of User records into MySQL through a custom RichSinkFunction.
package com.ccj.pxj.heima.stream.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._

object SinkMySql {
  def main(args: Array[String]): Unit = {
    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val datas: DataStream[(Int, String, String, String)] = senv.fromCollection(List(
      (10, "dazhuang", "123456", "大壮"),
      (11, "erya", "123456", "二丫"),
      (12, "sanpang", "123456", "三胖")
    ))
    val datass: DataStream[User] = datas.map(x => User(x._1, x._2, x._3, x._4))
    datass.addSink(new Mysql_Sink)
    senv.execute("pxj")
  }
}

class Mysql_Sink extends RichSinkFunction[User] {
  private var connection: Connection = null
  private var ps: PreparedStatement = null

  // open() runs once per parallel instance: set up the JDBC connection and statement
  override def open(parameters: Configuration): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    connection = DriverManager.getConnection("jdbc:mysql:///flink?characterEncoding=utf8", "pxj", "1314520")
    val sql = "insert into user(id,username,password,name) values(?,?,?,?)"
    ps = connection.prepareStatement(sql)
  }

  // invoke() runs once per element: bind the fields and execute the insert
  override def invoke(value: User): Unit = {
    try {
      ps.setInt(1, value.id)
      ps.setString(2, value.username)
      ps.setString(3, value.password)
      ps.setString(4, value.name)
      ps.executeUpdate()
    } catch {
      case e: Exception => println(e.getMessage)
    }
  }

  // close() runs once at the end: release the statement before the connection
  override def close(): Unit = {
    if (ps != null) {
      ps.close()
    }
    if (connection != null) {
      connection.close()
    }
  }
}

case class User(id: Int, username: String, password: String, name: String)
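The sink above relies on the rich-function lifecycle: Flink calls open() once per parallel instance, invoke() once per element, and close() when the job finishes. A minimal non-Flink sketch of that call order, where a hypothetical MemorySink collects records in a buffer in place of the JDBC connection:

```scala
import scala.collection.mutable.ListBuffer

// Non-Flink sketch of the RichSinkFunction lifecycle: open once,
// invoke per element, close once at the end
trait SimpleSink[T] {
  def open(): Unit
  def invoke(value: T): Unit
  def close(): Unit
}

class MemorySink extends SimpleSink[String] {
  val records = new ListBuffer[String]() // stands in for the JDBC connection
  var opened = false

  override def open(): Unit = opened = true
  override def invoke(value: String): Unit = {
    require(opened, "open() must be called before invoke()")
    records += value
  }
  override def close(): Unit = opened = false
}

object SinkLifecycleDemo {
  def run(data: Seq[String]): Seq[String] = {
    val sink = new MemorySink
    sink.open()               // like establishing the connection in open()
    data.foreach(sink.invoke) // like one executeUpdate() per element
    sink.close()              // like closing the statement and connection
    sink.records.toSeq
  }

  def main(args: Array[String]): Unit = println(run(Seq("a", "b")))
}
```

Keeping the connection setup in open() rather than invoke() matters because invoke() runs once per element; opening a connection per record would quickly exhaust the database.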
Author: pxj  Date: 2021-08-01 0:26:58  "If you are well, it is a sunny day."