1 flink sink
2 file sink
package com.study.liucf.unbounded.sink
import com.study.liucf.bean.LiucfSensorReding
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._
/**
* @Author liucf
* @Date 2021/9/13
*/
object FileSink {
  def main(args: Array[String]): Unit = {
    // Create the Flink execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Read the input data
    val inputStream: DataStream[String] = env.readTextFile("src\\main\\resources\\sensor.txt")
    // Convert each String line into a LiucfSensorReding
    val ds = inputStream.map(r => {
      val arr = r.split(",")
      LiucfSensorReding(arr(0), arr(1).toLong, arr(2).toDouble)
    })
    // Print to the console
    ds.print()
    // Write to a file
    // ds
    //   .writeAsCsv("src\\main\\resources\\sensor.csv")
    //   .setParallelism(1) // by default this runs distributed, creating one file per parallel subtask; parallelism 1 yields a single file
    // writeAsCsv is deprecated, so use StreamingFileSink instead.
    // Note: the Path is treated as a base directory; part files are written into buckets beneath it.
    ds.addSink(StreamingFileSink.forRowFormat(
      new Path("src\\main\\resources\\sensor2.csv"),
      new SimpleStringEncoder[LiucfSensorReding]()
    ).build())
    // Start the Flink job
    env.execute("liucf sink api")
  }
}
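StreamingFileSink decides when to close the current part file and open a new one through a rolling policy; for row formats the default is DefaultRollingPolicy, which rolls on part size and time thresholds. The sketch below shows how the same sink could be tuned explicitly. It is a minimal sketch, assuming Flink 1.11+ (where the row-format builder chains cleanly from Scala); the interval and size values are illustrative, not from the original post.

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

// Sketch of an explicitly tuned rolling policy: start a new part file
// every 15 minutes, after 5 minutes without new records, or once the
// current part reaches 128 MB. The thresholds are illustrative values.
val rollingSink: StreamingFileSink[LiucfSensorReding] =
  StreamingFileSink
    .forRowFormat(
      new Path("src\\main\\resources\\sensor2.csv"),
      new SimpleStringEncoder[LiucfSensorReding]())
    .withRollingPolicy(
      DefaultRollingPolicy.builder()
        .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
        .withMaxPartSize(128L * 1024 * 1024)
        .build())
    .build()

Two caveats worth knowing: part files only move to the finished state when a checkpoint completes, so checkpointing must be enabled for finished output files to appear; and from Flink 1.12 on, the unified FileSink (org.apache.flink.connector.file.sink.FileSink) supersedes StreamingFileSink.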