IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 【尚硅谷Java版】Flink中DataStream API篇之读取数据源 -> 正文阅读

[大数据]【尚硅谷Java版】Flink中DataStream API篇之读取数据源

????????flink可以从各种来源获取数据,然后构建DataStream进行转换处理,一般把数据的输入称为数据源,读取数据的算子就称为源算子(在代码里面调用的那个API)
????????在代码中读取数据源的方式有以下三种:

  1. 从文件中读取数据
  2. 从集合中读取数据
  3. 从元素中读取数据

在使用以上三种方式读取数据源之前先创建一个名称为Event的类

注意:
这里我们需要注意以下几点:
*1、类必须是公有的
*2、所有属性都是公有的
*3、所有属性的类型都是可以序列化的

package com.atguigu.chapter05;

import java.sql.Timestamp;

/**
 * @author potential
 */
public class Event {
    /**
     * 这里我们需要注意以下几点:
     *      1、类必须是公有的
     *      2、所有属性都是公有的
     *      3、所有属性的类型都是可以序列化的
     */
    public String user;
    public String url;
    public Long timestamp;

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public Event() {
    }

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Event{" +
                "user='" + user + '\'' +
                ", url='" + url + '\'' +
                ", timestamp=" + new Timestamp(timestamp) +
                '}';
    }
}

使用三种方式读取数据源

方式一:从文件中读取数据

(1)创建一个input包,在里面创建clicks.txt文件
在这里插入图片描述
(2)并在clicks.txt文件中添加如下内容(可随意添加,也可直接复制我这里的直接使用即可):

 Mary,./home,1000
Alice,./cart,2000
Bob,./prod?id=100,3000
Bob,./cart,4000
Bob,./home,5000
Mary,./home,6000
Bob,./cart,7000
Bob,./home,8000
Bob,./prod?id=10, 9000

(3)编写测试类SourceTest,在测试类中使用方式一:从文件中读取数据源

   //(1)从文件中读取数据 批量处理   常用
        DataStreamSource<String> stream1 = env.readTextFile("input/clicks.txt");

方式二:从集合中读取数据

在测试类中使用方式二:从集合中读取数据源

 ArrayList<Integer> nums=new ArrayList<>();
        nums.add(2);
        nums.add(5);
        DataStreamSource<Integer> numStream = env.fromCollection(nums);

        ArrayList<Event> events = new ArrayList<>();
        events.add(new Event("Mary","./home",1000L));
        events.add(new Event("Bob","./cart",2000L));
        DataStreamSource<Event> stream2 = env.fromCollection(events);

方式三:从元素中读取数据

  DataStreamSource<Event> stream3 = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );

测试

将上面的三种方式同时写入一个测试类SourceTest当中,如下代码所示。进行测试

package com.atguigu.chapter05;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

/**
 * @author potential
 */
public class SourceTest {
    public static void main(String[] args) throws Exception {
        //1、创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //方便调试,对于全局的并行度设为1
        env.setParallelism(1);
        /**
         * 2、从不同的来源读取数据
         */
        //(1)从文件中读取数据 批量处理   常用
        DataStreamSource<String> stream1 = env.readTextFile("input/clicks.txt");

        //(2)从集合中读取数据  常用于测试
        ArrayList<Integer> nums=new ArrayList<>();
        nums.add(2);
        nums.add(5);
        DataStreamSource<Integer> numStream = env.fromCollection(nums);

        ArrayList<Event> events = new ArrayList<>();
        events.add(new Event("Mary","./home",1000L));
        events.add(new Event("Bob","./cart",2000L));
        DataStreamSource<Event> stream2 = env.fromCollection(events);

        //(3)、从元素读取数据  常用于测试
        DataStreamSource<Event> stream3 = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );

        stream1.print("1");
        numStream.print("nums");
        stream2.print("2");
        stream3.print("3");

        env.execute();

    }
}

测试结果:
在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-07-17 16:30:00  更:2022-07-17 16:32:57 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 1:33:06-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码