IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink快速上手之流批处理WordCount(二) -> 正文阅读

[大数据]Flink快速上手之流批处理WordCount(二)

作者:token keyword

上一篇简单介绍了Flink的几个基本的概念,今天通过一个WordCount的案例来初步了解一下Flink是怎样做计算的。

这里我是用的是maven构建项目,在pom文件中添加Flink的依赖以及其他的依赖

<properties>
    <flink.version>1.12.0</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.3.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

这里我使用的是版本是1.12,语言采用的是1.8 Java,也引入了Scala,是因为Flink底层的通信采用的Akka,需要用到Scala,也引入了日志的包,方便排查问题。

在resources下面创建log4j.propertieswen文件,将日志的配置写在文件里面

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

然后在某一个文件下新建一个文件,以空格的方式,输入几个单词,等会要用

然后创建对应的类,这里先用Flink的批处理来计算WordCount,具体的操作步骤如下:

  1. 获取执行环境
  2. 读取对应路径下的文件
  3. 将读取到的数据进行压平
  4. 将一个一个的单词转换成元组
  5. 分组,然后按照key进行分组
  6. 聚合计算
  7. 打印输出

具体的代码如下:

package com.bigdata_world;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @version 1.0
 * @description:
 * @author: bigdata_world
 * @create: 2021-08-15 14:21
 * @since JDK 1.8
 **/
public class Flink01_WordCount_Batch {

    public static void main(String[] args) throws Exception {


        //使用Flink批处理来进行WordCount

        //1、获取执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //2、读取文本数据
        DataSource<String> source = env.readTextFile("input/input.txt");

        //3、压平
        FlatMapOperator<String, String> wordDS = source.flatMap(new SourceFlatMap());

        //4、将单词转换成元组
        MapOperator<String, Tuple2<String, Integer>> mapDS = wordDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return new Tuple2<>(value, 1);
            }
        });

        //5、分组,然后按照key进行分组,因为输入的是一个元组,所以按照第一位进行分组,这里传入0
        UnsortedGrouping<Tuple2<String, Integer>> groupByDS = mapDS.groupBy(0);

        //6、聚合,这里按照元组中的第二位进行聚合计算
        AggregateOperator<Tuple2<String, Integer>> sum = groupByDS.sum(1);

        //7、打印输出
        sum.print();




    }


    /**
     * @param <T> Type of the input elements.
     * @param <O> Type of the returned elements.
     *            <p>
     *            FlatMapFunction是一个接口,源码里面有两个范型
     *            <T>表示的是输入元素的类型
     *            <O> 表示输出元素的类型
     *            这里我都是String类型
     **/
    public static class SourceFlatMap implements FlatMapFunction<String, String> {

        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {

            //1、输入的是一行一行的数据,需要按照空格切割
            String[] words = value.split(" ");

            //2、遍历,通过out写出一个一个的单词
            for (String word : words) {
                out.collect(word);
            }
        }
    }
}

控制台输出的结果如下:

(hive,1)
(flink,1)
(hello,2)
(slot,1)
(spark,1)

下面再进行流处理的WordCount,流处理又分为有界流和无界流,我们先写有界流的。

具体的代码思路跟批处理是一样的。这里不再赘述,直接上代码。

package com.bigdata_world;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @version 1.0
 * @description:
 * @author: bigdata_world
 * @create: 2021-08-15 15:29
 * @since JDK 1.8
 **/
public class Flink02_WordCount_Bounded {

    public static void main(String[] args) throws Exception {

        //1、获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2、读取文件创建流
        DataStreamSource<String> source = env.readTextFile("input/input.txt");

        //3、压平
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = source.flatMap(new SourceTupleToFlatMap());

        //4、分组,按照元组中的第一个元素进行分组
        KeyedStream<Tuple2<String, Integer>, String> keyDS = tupleDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        //5、聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyDS.sum(1);

        //6、打印输出
        sum.print();

        //7、启动任务
        env.execute();


    }


    public static class SourceTupleToFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {

            //1、按照空格将每一行的数据切割成一个单词
            String[] words = value.split(" ");

            //2、以Tuple2的形式循环遍历输出
            for (String word : words) {
                out.collect(new Tuple2<>(word, 1));

            }
        }
    }
}

输出结果如下:

1> (spark,1)
2> (hello,1)
4> (slot,1)
2> (hello,2)
1> (hive,1)
4> (flink,1)

根据上面的结果,从hello单词的累加结果可以看出,Flink确实是来一条计算一条,来一条累加一条。

那可能有的小伙伴要问了,每个结果前面的数字代表什么?

前面的数字跟电脑的CPU核心数有关,我这儿最大的数字是4,表示CPU的核心数是4,表示整个计算过程中它的并行度默认等于核心数。

那可不可以设置并行度呢?

答案当然是可以,可以通过设置env.setParallelism(1);参数,设置全局的并行度。结果如下:

(hello,1)
(spark,1)
(hello,2)
(hive,1)
(flink,1)
(slot,1)

使用无界流来计算WordCount,生产环境中基本上都是无界流。无界流我打算从端口读取数据,其他的代码都是一样的。

package com.bigdata_world;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @version 1.0
 * @description:
 * @author: bigdata_world
 * @create: 2021-08-15 16:01
 * @since JDK 1.8
 **/
public class Flink03_WordCount_Unbounded {

    public static void main(String[] args) throws Exception {

        //1、获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //设置并行度
        env.setParallelism(1);

        //2、从端口读取数据创建流
        DataStreamSource<String> source = env.socketTextStream("localhost",9999);

        //3、压平
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = source.flatMap(new SourceTupleToFlatMap());

        //4、分组,按照元组中的第一个元素进行分组
        KeyedStream<Tuple2<String, Integer>, String> keyDS = tupleDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        //5、聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyDS.sum(1);

        //6、打印输出
        sum.print();

        //7、启动任务
        env.execute();
    }

    public static class SourceTupleToFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {

            //1、按照空格将每一行的数据切割成一个单词
            String[] words = value.split(" ");

            //2、以Tuple2的形式循环遍历输出
            for (String word : words) {
                out.collect(new Tuple2<>(word, 1));

            }
        }
    }
}

打开终端,输入 nc -lk 9999
在这里插入图片描述
输出的结果如下:

(hello,1)
(,1)
(hello,2)
(hive,1)
(hive,2)
(spark,1)
(kafka,1)

以上就是wordcount的集中写法,这里做一个简单的总结:

  1. 批处理使用的执行环境是ExecutionEnvironment,而流处理使用的执行环境是StreamExecutionEnvironment
  2. 批处理在进行分组的时候,使用的是groupBy,而在流处理中分组是没有groupBy,需要使用keyBy
  3. 并行度跟CPU的核心数有关,默认等于核心数,也可以设置全局唯一的并行度
  4. 批处理的时候,执行环境不需要启动任务,流处理需要在最后调用execute() 去启动任务

有兴趣的可以关注微信公众号:bigdata_world

在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-16 11:48:52  更:2021-08-16 11:50:07 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 20:07:34-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码