1 定义自己想要的流返回值数据类型
package com.study.liucf.unbounded.source
/**
 * One sensor reading: the element type carried by the custom stream source.
 *
 * @author liucf
 * @note Created 2021/9/8.
 * @param id          identifier of the sensor that produced the reading
 * @param timestamp   time of the reading as a `Long` (the source in this file
 *                    fills it from `System.currentTimeMillis()`)
 * @param temperature temperature value of the reading
 */
case class LiucfSensorReding(
  id: String,
  timestamp: Long,
  temperature: Double
)
2 定义自己的读取数据源的类
package com.study.liucf.unbounded.source
import java.util.Random
import org.apache.flink.streaming.api.functions.source.SourceFunction
/**
 * Custom Flink source that simulates ten temperature sensors.
 *
 * Each sensor starts at a random temperature in [0, 100). On every round the
 * source adds Gaussian noise to each sensor's temperature, emits one
 * [[LiucfSensorReding]] per sensor with a shared timestamp, and then pauses
 * for one second before the next round.
 *
 * @author liucf
 * @note Created 2021/9/8.
 */
class LiucfSourceFunction() extends SourceFunction[LiucfSensorReding] {

  // Written by cancel() and read by run(), which Flink invokes from different
  // threads; @volatile makes the cancellation visible to the emitting loop.
  @volatile var running: Boolean = true

  /**
   * Emits simulated readings until [[cancel]] is called.
   *
   * @param ctx source context used to hand each reading to Flink
   */
  override def run(ctx: SourceFunction.SourceContext[LiucfSensorReding]): Unit = {
    val random = new Random()

    // Initial temperatures for sensors "sensor_1" .. "sensor_10".
    var currentTemp = (1 to 10).map(i => (s"sensor_$i", random.nextDouble() * 100))

    while (running) {
      // Random walk: nudge every sensor by a standard-normal step.
      currentTemp = currentTemp.map { case (sensorId, temp) =>
        (sensorId, temp + random.nextGaussian())
      }

      // All readings of one round share the same timestamp.
      val currentTimestamp = System.currentTimeMillis()
      currentTemp.foreach { case (sensorId, temp) =>
        ctx.collect(LiucfSensorReding(sensorId, currentTimestamp, temp))
      }

      // Throttle inside the loop: one round of readings per second. Sleeping
      // after the loop would leave it spinning and flooding downstream.
      Thread.sleep(1000)
    }
  }

  /** Clears the `running` flag so that the loop in [[run]] stops emitting. */
  override def cancel(): Unit = {
    running = false
  }
}
3 Flink 使用自定义的数据源处理数据
package com.study.liucf.unbounded.source
import org.apache.flink.streaming.api.scala._
/**
 * Entry point that reads from the custom source and prints every record.
 *
 * @author liucf
 * @note Created 2021/9/8.
 */
object MySource {

  /**
   * Builds and runs the streaming job: custom source -> standard output.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Obtain the Flink streaming execution environment.
    val environment = StreamExecutionEnvironment.getExecutionEnvironment

    // Attach the custom source and send each reading to standard output.
    val readings: DataStream[LiucfSensorReding] =
      environment.addSource(new LiucfSourceFunction())
    readings.print()

    // Nothing runs until execute() submits the job.
    environment.execute("liucf defined source api")
  }
}
输出结果
(原文此处的输出截图缺失。程序运行后会持续在控制台打印形如 LiucfSensorReding(sensor_1,<时间戳>,<温度>) 的记录。)