Flink Source不同的来源
1、Source几个不同的来源
// Built-in sources: socket, text file, sequence, collection, elements.
val env = StreamExecutionEnvironment.getExecutionEnvironment
// fixed: full-width quotes “master” replaced with ASCII quotes (would not compile)
env.socketTextStream("master", 6666)
// fixed: charset name is "UTF-8", not "UTF_8" (invalid charset name at runtime)
env.readTextFile(path, "UTF-8")
// fixed typo: fromSequeence -> fromSequence
env.fromSequence(1, 100)
env.fromCollection(List(1, 2, 3, 4))
// NOTE(review): mixing Int and String infers DataStream[Any] — probably unintended
env.fromElements(1, 2, 3, "d")
2、kafka作为数据源,flink读取
kafka producer生产者产生数据,flink读取
// Read a Kafka topic as a Flink streaming source.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
// fixed: removed stray trailing comma after the broker list (syntax error)
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092")
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "gr01")
// Topic "test", payload deserialized as plain strings.
val inputStream = env.addSource(new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), props))
3、mysql数据库作为数据源,且自定义Source
// Driver: wires the custom JDBC source into a streaming job and prints every record.
object MyJDBCSource {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val workers = streamEnv.addSource(new MyJDBCSourceFunc)
    workers.print()
    streamEnv.execute()
  }
}
// Custom parallel source: polls the `stu` table every 5 seconds and emits each row
// as a Worker(id, name). Connection lifecycle is managed in open()/close().
class MyJDBCSourceFunc extends RichParallelSourceFunction[Worker] {
  var conn: Connection = _
  // fixed typo: PreparedDtatement -> PreparedStatement; also unify statemant/statement
  var statement: PreparedStatement = _
  // @volatile: cancel() is invoked from a different thread than run()
  @volatile var flag: Boolean = true

  // fixed typos: oarameters -> parameters, Configuartion -> Configuration
  override def open(parameters: Configuration): Unit = {
    // fixed typo in host name: localhos -> localhost
    conn = DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&serverTimezone=UTC",
      "root", "147258")
    statement = conn.prepareStatement("select * from stu")
  }

  override def run(ctx: SourceFunction.SourceContext[Worker]): Unit = {
    while (flag) {
      Thread.sleep(5000)
      val resultSet = statement.executeQuery()
      // fixed typo: restultSet -> resultSet (was an unresolved name)
      while (resultSet.next()) {
        val id = resultSet.getInt(1)
        val name = resultSet.getString(2)
        ctx.collect(Worker(id, name))
      }
      // close each poll's ResultSet to avoid leaking one per iteration
      resultSet.close()
    }
  }

  // Required abstract member of SourceFunction — was missing, so the class
  // would not compile; also gives Flink a way to stop the polling loop.
  override def cancel(): Unit = flag = false

  override def close(): Unit = {
    if (statement != null) statement.close()
    if (conn != null) conn.close()
  }
}
|