IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flink经典实战案例 -> 正文阅读

[大数据]flink经典实战案例

一、java版flink-wordcount-离线计算版

1.1maven构建flink,加入依赖

<!-- flink包依赖配置-start -->
     <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <!--  <scope>provided</scope> -->
     </dependency>
     <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
     </dependency>
     <!-- flink包依赖配置-end -->

1.2 java实现flink wordCount的代码编写

1.2.1代码编写

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.util.Collector;

/**
* Flink实现离线数据DataSet版本的WordCount经典案例
*
*
*/
public class FlinkWordCount4DataSet {
    public static void main(String[] args) throws Exception {
        // 创建Flink的代码执行离线数据流上下文环境变量
        ExecutionEnvironment env = ExecutionEnvironment
                .getExecutionEnvironment();
        // 定义从本地文件系统当中文件路径
        String filePath = "";
        if (args == null || args.length == 0) {
            filePath = "D:\\temp\\input.txt";
        } else {
            filePath = args[0];
        }
        // 获取输入文件对应的DataSet对象
        DataSet<String> inputLineDataSet = env.readTextFile(filePath);

        // 对数据集进行多个算子处理,按空白符号分词展开,并转换成(word, 1)二元组进行统计
        DataSet<Tuple2<String, Integer>> resultSet = inputLineDataSet
                .flatMap(
                        new FlatMapFunction<String, Tuple2<String, Integer>>() {
                            public void flatMap(String line,
                                    Collector<Tuple2<String, Integer>> out)
                                    throws Exception {
                                // 按空白符号分词
                                String[] wordArray = line.split("\\s");
                                // 遍历所有word,包成二元组输出
                                for (String word : wordArray) {
                                    out.collect(new Tuple2<String, Integer>(
                                            word, 1));
                                }
                            }
                        }).groupBy(0) // 返回的是一个一个的(word,1)的二元组,按照第一个位置的word分组
                .sum(1); // 将第二个位置上的freq=1的数据求和
        // 打印出来计算出来的(word,freq)的统计结果对

        // 注:print会自行执行env.execute方法,故不用再最后执行env.execute正式开启执行过程
        resultSet.print();
        // 注:writeAsText的sink算子,必须要调用env.execute方法才能正式开启环境执行
        // resultSet.writeAsText("d:\\temp\\output2", WriteMode.OVERWRITE)
        // .setParallelism(2);
        // 正式开启执行flink计算
        // env.execute();
    }
}

直接运行即可

1.2.2集群运行
使用maven-shade-plugin配置打包

第1种执行方式-传统的yarn jar方式执行
优点: 简单、易操作
缺点:
默认仅支持local模式支持,要改成分布式方式较麻烦。
需要在打包时候注意,将flink依赖去掉provided,即将依赖包全部打入最后的包中,会使最终包比较大。

第2种执行方式-flink建议的执行方式

1)下载flink1.13.1版本发布包 上传到集群中解压
将hadoop环境变量设置到linux profile文件当中

2)flink的三种运行模式

yarn application运行方式

./bin/flink run-application -t yarn-application -c main_class ../xxxxxx-0.0.1-SNAPSHOT.jar hdfs:///xxxx

运行结果在yarn中,通过日志查看

yarn per-job运行方式
在配置文件./conf/flink-conf.yaml,修改classloader.check-leaked-classloader: false

./bin/flink run -t yarn-per-job  -c mian_class ../xxxxx-0.0.1-SNAPSHOT.jar hdfs:///xxxxxx

黑窗口显示

yarn session运行方式
首先在yarn上提前启动flink session会话任务,并得到session task任务的yarn app-id

./bin/yarn-session.sh

提交flink job作业到session任务当中,正式进行作业计算
#第1种提交: 多加入-t yarn-session参数,此时必须指定app-id参数,即提前启动的session作业任务id

./bin/flink run -t yarn-session -Dyarn.application.id=application_xxxxx_xxx -c mian_class ../ xxxxx-0.0.1-SNAPSHOT.jar hdfs:///xxxxx

#第2种提交: 不加入-t yarn-session参数,则不需要手动指定app-id,其是自行寻找提前启动的session作业任务id
./bin/flink run -c main_class …/xxxxx-0.0.1-SNAPSHOT.jar hdfs:///xxxxxxxx
黑窗口显示

二、java版flink-wordcount-实时计算版

2.1配置环境

同上

2.2java实现flink wordcount代码编写-实时版

import  org.apache.flink.api.common.functions.FlatMapFunction;
import  org.apache.flink.api.java.tuple.Tuple2;
import  org.apache.flink.streaming.api.datastream.DataStream;
import  org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import  org.apache.flink.util.Collector;
/**
* Flink实现实时数据流DataStream版本的WordCount经典案例
*
*
*/
public class  FlinkWordCount4DataStream {
   public static void  main(String[] args) throws  Exception {
     // 创建Flink的代码执行实时流处理上下文环境变量
     StreamExecutionEnvironment  env = StreamExecutionEnvironment
           .getExecutionEnvironment();
     // 定义读取数据机器主机名称和端口
     String host = "localhost";
     int port = 9999;
     // 获取输入对应的socket输入的实时流数据
     DataStream<String>  inputLineDataStream =  env.socketTextStream(host,
           port);
     // 对数据集进行多个算子处理,按空白符号分词展开,并转换成(word, 1)二元组进行统计
     DataStream<Tuple2<String,  Integer>> resultStream =  inputLineDataStream
           .flatMap(
                new  FlatMapFunction<String,  Tuple2<String, Integer>>() {
                   public void  flatMap(String line,
                         Collector<Tuple2<String,  Integer>> out)
                         throws  Exception {
                      // 按空白符号分词
                      String[]  wordArray = line.split("\\s");
                      // 遍历所有word,包成二元组输出
                      for  (String word : wordArray) {
                         out.collect(new Tuple2<String,  Integer>(
                              word, 1));
                      }
                   }
                }).keyBy(0) //  返回的是一个一个的(word,1)的二元组,按照第一个位置的word分组,因为此实时流是无界的,即数据并不完整,故不用group
                         // by而是用keyBy来代替
           .sum(1); // 将第二个位置上的freq=1的数据求和
     // 打印出来计算出来的(word,freq)的统计结果对
     resultStream.print();
     
//   resultStream.writeAsText("./output", WriteMode.OVERWRITE)
//         .setParallelism(2);
     
     // 正式启动实时流处理引擎
     env.execute();
   }
}

2.3运行

windows版本
下载windows netcat

https://blog.csdn.net/qq_37585545/article/details/82250984

解压下载完的压缩包后,找到nc64.exe所在的目录
启动数据输出命令
./nc64.exe -lp 9999
启动Java代码
在nc64窗口中输入数据

三、scala版flink-wordcount-离线计算版

3.1、maven构建flink开发环境与测试

引入依赖

 <!-- scala开发flink依赖-start -->
     <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.compile.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
     </dependency>
     <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.compile.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
     </dependency>
     <!-- scala开发flink依赖-end  -->

3.2scala实现flink wordcount代码编写-离线版

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.ExecutionEnvironment

/**

  • scala版本的flink wordcount离线版
    */
    object FlinkWordCount4DataSet4Scala {
    def main(args: Array[String]): Unit = {
    //获取上下文执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    //加载数据源-1-从内存当中的字符串渠道
    // val source = env.fromElements(“中国 抗美 援朝 战争 很伟大”, “抗美 需要 中国”)

    // 加载数据源-2-定义从本地文件系统当中文件路径
    var filePath = “”;
    if (args == null || args.length == 0) {
    filePath = “D:\temp\input.txt”;
    } else {
    filePath = args(0);
    }
    val source = env.readTextFile(filePath);

    //进行transformation操作处理数据
    val ds = source.flatMap(x => x.split("\s+")).map((_, 1)).groupBy(0).sum(1)

    //输出到控制台
    ds.print()

    // 正式开始执行操作
    // 由于是Batch操作,当DataSet调用print方法时,源码内部已经调用Excute方法,所以此处不再调用
    //如果调用反而会出现上下文不匹配的执行错误
    //env.execute(“Flink Batch Word Count By Scala”)
    }
    }
    本地执行

四、scala版flink-wordcount-实时计算版

4.1、maven构建flink开发环境与测试

同上

4.2scala版flink-wordcount代码

import  org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala.createTypeInformation /**
* scala版本的flink wordcount实时版
*/ object  FlinkWordCount4DataStream4Scala  {   def main(args: Array[String]):  Unit = {
    //获取上下文执行环境
    val env =  StreamExecutionEnvironment.getExecutionEnvironment
    //加载或创建数据源-从socket端口获取
    val source =  env.socketTextStream("localhost", 9999, '\n')
    //进行transformation操作处理数据
    val dataStream =  source.flatMap(_.split("\\s+")).map((_, 1)).keyBy(0).sum(1)
    //输出到控制台
    dataStream.print()
    //执行操作
     env.execute("FlinkWordCount4DataStream4Scala")   } }

4.3测试

使用netcat同上

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-09-01 12:00:29  更:2021-09-01 12:00:52 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 17:02:08-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码