IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink获取数据源的四种方式 (scala) -> 正文阅读

[大数据]Flink获取数据源的四种方式 (scala)

1.从集合中读取数据源

使用?fromCollection() 方法将数据放入Seq序列中作为Flink的数据源

object Collections {
  def main(args: Array[String]): Unit = {
    //声明 flink 的运行环境
    var env = StreamExecutionEnvironment.getExecutionEnvironment

    var ds = env.fromCollection(Seq(
      (1,"zhangsan","male","20"),
      (2,"lisi","female","30")
    ))
    ds.print()
    env.execute()
  }
}

也可以使用样例类将数据封装成对象序列传入 fromCollection() 方法作为Flink的数据源

case class Worker(id:Int,name:String,gender:String,age:Int)
object Collections {
  def main(args: Array[String]): Unit = {
    //声明 flink 的运行环境
    var env = StreamExecutionEnvironment.getExecutionEnvironment
    var ds = env.fromCollection(Seq(
      Worker(1,"张三","男",20),
      Worker(2,"李四","女",21),
      Worker(3,"王五","男",22),
      Worker(4,"赵六","女",23)
    ))
    ds.print()
    env.execute()
  }
}

还可以通过调用 fromElements() 方法将数据直接输入

2.从文件中读取数据

readTextFile()

通过?readTextFile() 方法可以将本地或hdfs上的文件作为数据源读入Flink

3.从 Kafka 中读取数据

在实现 Flink 读取 Kafka 的数据时,需要导入相关依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop-2-uber</artifactId>
    <version>2.7.5-9.0</version>
</dependency>

通过 addSource() 方法加载kafka中的数据?

def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //设置kafka的相关配置
    val prop = new Properties()
    prop.setProperty("bootstrap.servers","192.168.100.155:9092")
    prop.setProperty("group.id","user1")

    val ds = env.addSource(new FlinkKafkaConsumer011[String]("flink",new SimpleStringSchema(),prop))
    ds.print()
    env.execute()
  }

可通过调节kafka的游标相关参数,读取已存在kafka队列中的数据

4.自定义数据源

以Flink读取MySQL的数据为例,通过 addSource() 方法,将实现 SourceFunction 的对象作为参数传入。代码如下:

object UDFSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val ds = env.addSource(new MysqlSource())
    ds.print()
    env.execute()
  }
}

?实现 SourceFunction 的代码如下:

class MysqlSource extends SourceFunction[Stu] {
  var isFlag = true
  val url = "jdbc:mysql://192.168.100.155:3306/exp"
  val driver = "com.mysql.jdbc.Driver"
  val username = "root"
  val password = "okok"
  Class.forName(driver)

  override def run(sourceContext: SourceFunction.SourceContext[Stu]): Unit = {
    //添加mysql数据库的配置信息
    var conn = DriverManager.getConnection(url,username,password)  //该参数不能作为成员变量,必须写入方法中,否则报错:无法实现序列化
    val ps = conn.prepareStatement("select * from s1_student")
    while (isFlag) {
      val rs = ps.executeQuery()
      while (rs.next()) {
        var no = rs.getString("sno")
        var name = rs.getString("sname")
        var birthday = rs.getString("sbirthday")
        var sex = rs.getString("ssex")
        var sclass = rs.getString("sclass")
        sourceContext.collect(new Stu(no,name,birthday,sex,sclass))
      }
      Thread.sleep(3000)
    }
  }

  override def cancel(): Unit = {
    isFlag = false
  }
}

Stu类:

class Stu (no:String,name:String,birthday:String,sex:String,sclass:String) extends Serializable {
  override def toString = {
    "no:" + no + "\tname:" + name + "\tbirthday:" + birthday + "\tsex:" + sex + "\tsclass:" + sclass
  }
}

打印效果如下:

?

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-01-03 16:10:38  更:2022-01-03 16:12:30 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 3:46:32-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码