1. Reading a data source from a collection
Use the fromCollection() method to place the data in a Seq and use that sequence as the Flink data source:
import org.apache.flink.streaming.api.scala._

object Collections {
  def main(args: Array[String]): Unit = {
    // Obtain the Flink execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val ds = env.fromCollection(Seq(
      (1, "zhangsan", "male", "20"),
      (2, "lisi", "female", "30")
    ))
    ds.print()
    env.execute()
  }
}
You can also use a case class to wrap the data as a sequence of objects and pass that sequence to fromCollection() as the Flink data source:
import org.apache.flink.streaming.api.scala._

case class Worker(id: Int, name: String, gender: String, age: Int)

object Collections {
  def main(args: Array[String]): Unit = {
    // Obtain the Flink execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val ds = env.fromCollection(Seq(
      Worker(1, "张三", "男", 20),
      Worker(2, "李四", "女", 21),
      Worker(3, "王五", "男", 22),
      Worker(4, "赵六", "女", 23)
    ))
    ds.print()
    env.execute()
  }
}
You can also supply the records directly by calling the fromElements() method.
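As a minimal sketch of fromElements(), the tuple example from above can be rewritten so that the records are passed as individual arguments rather than wrapped in a Seq. The object name Elements is only an illustrative choice.

```scala
import org.apache.flink.streaming.api.scala._

object Elements {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // fromElements takes the records as varargs instead of a collection
    val ds: DataStream[(Int, String, String, String)] = env.fromElements(
      (1, "zhangsan", "male", "20"),
      (2, "lisi", "female", "30")
    )
    ds.print()
    env.execute()
  }
}
```

All elements must share one type, since the stream's element type is inferred from the arguments.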
2. Reading data from a file
readTextFile()
The readTextFile() method reads a file from the local file system or from HDFS into Flink as a data source.
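A minimal sketch of readTextFile() might look like the following. Both paths are hypothetical placeholders, as is the object name; reading from HDFS additionally assumes that Hadoop classes are available on the classpath.

```scala
import org.apache.flink.streaming.api.scala._

object TextFileSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Hypothetical local path; each line of the file becomes one String record
    val ds: DataStream[String] = env.readTextFile("file:///tmp/workers.txt")
    // Hypothetical HDFS path; host and port stand in for your NameNode address
    // val ds = env.readTextFile("hdfs://namenode:9000/data/workers.txt")
    ds.print()
    env.execute()
  }
}
```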
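3. Reading data from Kafka
Reading Kafka data from Flink requires adding the relevant dependencies. Note that the FlinkKafkaConsumer011 class used in the code further down ships in the flink-connector-kafka-0.11 connector artifact; the flink-shaded-hadoop-2-uber artifact shown here is the shaded Hadoop bundle and does not contain the Kafka consumer.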
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.7.5-9.0</version>
</dependency>
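The consumer class used in this section, FlinkKafkaConsumer011, comes from Flink's Kafka 0.11 connector. A dependency entry along the following lines would pull it in; the `_2.11` Scala suffix and the `${flink.version}` Maven property are assumptions and need to match your own build.

```xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<!-- Assumption: flink.version is a property defined in your own pom.xml -->
<version>${flink.version}</version>
</dependency>
```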
Load the data from Kafka with the addSource() method:
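import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object KafkaSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Kafka connection settings
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "192.168.100.155:9092")
    prop.setProperty("group.id", "user1")
    // Consume the "flink" topic, deserializing each record as a String
    val ds = env.addSource(new FlinkKafkaConsumer011[String]("flink", new SimpleStringSchema(), prop))
    ds.print()
    env.execute()
  }
}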
By adjusting Kafka's offset-related settings, you can also read data that already exists in the Kafka topic.
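One way to do this, sketched below, is to set the start position on the consumer itself with setStartFromEarliest(), which reads every partition from its earliest offset. The broker address, group id, and topic are reused from the example above; the object name is only illustrative.

```scala
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object KafkaFromEarliest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "192.168.100.155:9092")
    prop.setProperty("group.id", "user1")
    val consumer = new FlinkKafkaConsumer011[String]("flink", new SimpleStringSchema(), prop)
    // Start from the earliest offset of every partition,
    // ignoring any offsets already committed for this group
    consumer.setStartFromEarliest()
    // Alternatives on the same consumer: setStartFromLatest(), setStartFromGroupOffsets()
    val ds = env.addSource(consumer)
    ds.print()
    env.execute()
  }
}
```

When no start position is set, the consumer starts from the group's committed offsets and falls back to Kafka's auto.offset.reset property if none exist, so setting that property to "earliest" in prop is another option for a fresh consumer group.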
4. Custom data sources
Taking Flink reading from MySQL as an example, pass an object that implements SourceFunction to the addSource() method. The code is as follows:
import org.apache.flink.streaming.api.scala._

object UDFSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val ds = env.addSource(new MysqlSource())
    ds.print()
    env.execute()
  }
}
The SourceFunction implementation is as follows:
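import java.sql.DriverManager
import org.apache.flink.streaming.api.functions.source.SourceFunction

class MysqlSource extends SourceFunction[Stu] {
  // Set by cancel() from another thread, so it is marked volatile
  @volatile private var isFlag = true
  val url = "jdbc:mysql://192.168.100.155:3306/exp"
  val driver = "com.mysql.jdbc.Driver"
  val username = "root"
  val password = "okok"

  override def run(sourceContext: SourceFunction.SourceContext[Stu]): Unit = {
    // Load the JDBC driver on the worker that actually runs the source
    Class.forName(driver)
    // The connection must be created inside this method rather than held as a
    // member field; otherwise Flink fails because the source cannot be serialized
    val conn = DriverManager.getConnection(url, username, password)
    try {
      val ps = conn.prepareStatement("select * from s1_student")
      // Re-run the query every 3 seconds until the job is cancelled
      while (isFlag) {
        val rs = ps.executeQuery()
        while (rs.next()) {
          val no = rs.getString("sno")
          val name = rs.getString("sname")
          val birthday = rs.getString("sbirthday")
          val sex = rs.getString("ssex")
          val sclass = rs.getString("sclass")
          sourceContext.collect(new Stu(no, name, birthday, sex, sclass))
        }
        rs.close()
        Thread.sleep(3000)
      }
    } finally {
      // Release the connection when the source stops or fails
      conn.close()
    }
  }

  override def cancel(): Unit = {
    isFlag = false
  }
}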
The Stu class:
class Stu(no: String, name: String, birthday: String, sex: String, sclass: String) extends Serializable {
  override def toString: String = {
    "no:" + no + "\tname:" + name + "\tbirthday:" + birthday + "\tsex:" + sex + "\tsclass:" + sclass
  }
}
The printed output is as follows:
[Screenshot of the console output]