IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> day11&12 ---- Flink -> 正文阅读

[大数据]day11&12 ---- Flink

学习链接

##关闭flink集群
[root@hadoop01 flink-1.10.1]# bin/stop-cluster.sh

### zookeeper
bin/zkServer.sh start

##关闭所有服务
[root@hadoop01 flink-1.10.1]# stop-all.sh
  1. 数据下沉后为什么会生成多个文件?
  2. 作业完成后生成的文件数与什么有关?
  3. 并行度如何设置?优先级?

1 Flink

1.1 简介

批,流一体化的框架
:离线处理
:实时处理

tar-xvzf
-x 从档案文件中释放文件。
-v 详细报告tar处理的文件信息。如无此选项,tar不报告文件信息。
-z 用gzip来压缩/解压缩文件,加上该选项后可以将档案文件进行压缩,
但还原时也一定要使用该选项进行解压缩。
-f 使用档案文件或设备,这个选项通常是必选的。

1.2 集群搭建

上传解压

上传Flink 压缩包到指定目录
解压缩flink 到/opt/servers 目录

cd /opt/servers
tar -xvzf flink-1.10.1-bin-scala_2.12.tgz -C ../servers/

修改配置文件

  1. 修改安装目录下conf 文件夹内的flink-conf.yaml 配置文件,指定JobManager

    cd /opt/servers/flink-1.10.1/conf/
    ##创建目录
    mkdir -p /opt/servers/flink-1.10.1/tmp
    

    修改配置文件:flink-conf.yaml

    #配置Master 的机器名(IP 地址)
    
    jobmanager.rpc.address: hadoop01
    
    #配置每个taskmanager 生成的临时文件夹
    
    taskmanager.tmp.dirs: /opt/servers/flink-1.10.1/tmp
    
  2. 修改安装目录下conf 文件夹内的slave 配置文件,指定TaskManager

    hadoop01
    hadoop02
    hadoop03
    
  3. 使用vi 修改/etc/profile 系统环境变量配置文件,添加HADOOP_CONF_DIR 目录

    export HADOOP_CONF_DIR=/opt/servers/hadoop-2.7.7/etc/hadoop
    

    YARN_CONF_DIR 或者HADOOP_CONF_DIR 必须将环境变量设置为读取YARN 和HDFS 配置

  4. 新版本需要增加hadoop的附加组件,下载一个jar包放在Flink的lib目录下

    下载地址:https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/

  5. 分发/etc/profile 到其他两个节点

    scp -r /etc/profile hadoop02:/etc
    scp -r /etc/profile hadoop03:/etc
    
  6. 每个节点重新加载环境变量

    source /etc/profile
    
  7. 将配置好的Flink 目录分发给其他的两台节点

    cd /opt/servers
    scp -r flink-1.10.1/ hadoop02:$PWD
    scp -r flink-1.10.1/ hadoop03:$PWD
    

1.3 启动

启动Flink 集群

 cd /opt/servers/flink-1.10.1
 bin/start-cluster.sh

在这里插入图片描述
启动HDFS 集群

start-all.sh

1.4 使用 Flink 测试词频统计

在HDFS 中创建/test/input 目录

 hadoop fs -mkdir -p /test/input

上传wordcount.txt 文件到HDFS /test/input 目录

 hadoop fs -put /root/wordcount.txt /test/input

并运行测试任务

bin/flink run examples/batch/WordCount.jar --input hdfs://hadoop01:8020/test/input/wordcount.txt --output hdfs://hadoop01:8020/test/output/001

不output直接打印控制台

在这里插入图片描述

浏览Flink Web UI 界面

http://hadoop01:8081

在这里插入图片描述

启动flink集群

 cd /opt/servers/flink-1.10.1
 bin/start-cluster.sh

高可用HA

在这里插入图片描述

修改slots
taskmanager.numberOfTaskSlots:
2 2 4

cd /opt/servers/flink-1.10.1/conf/
[root@hadoop03 conf]# vim flink-conf.yaml

在这里插入图片描述
在这里插入图片描述

2 词频统计

2.1 java

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
    </dependencies>
package cn.tedu;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        //构建批处理的运行环境
        ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
        //读取文件
        DataSource<String> source = env.readTextFile("flink_java/data/wordcount.txt");
//        source.print();
        //词频统计                          <输入类型,输出类型>
        FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            //每来一条数据,调用一次flatmap处理数据
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                //获取每个词的数组
                String[] words = value.split(" ");
                //循环输出
                for (String word : words) {
                    out.collect(new Tuple2<String, Integer>(word, 1));
                }
            }
        });
//        wordAndOne.print();                   (hello,1)(hadoop,1)
        UnsortedGrouping<Tuple2<String, Integer>> groupeWords = wordAndOne.groupBy(0);
        AggregateOperator<Tuple2<String, Integer>> result = groupeWords.sum(1);
        //数据sink保存
        result.print();
    }
}

2.2 scala


数据下沉后为什么会生成多个文件?

package cn.tedu.batch

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.core.fs.FileSystem.WriteMode

object BatchWordCount {

  def main(args: Array[String]): Unit = {
    //构建Flink批处理的运行环境
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    //读取数据
    //val source: DataSet[String] = env.readTextFile("FLINKSCALA/data/wordcount.txt")
    val source: DataSet[String]
    = env.readTextFile("hdfs://hadoop01:8020/test/input/wordcount.txt")
    //source.print()
    //hello flink kafka flink kafka flink --> hello ,1
    //数据转化
    //需要导入隐式转换
    import org.apache.flink.api.scala._
    val words: DataSet[String] = source.flatMap(_.split(" "))
    val wordAndOne = words.map((_, 1))
    //分组
    val groupedDataSet: GroupedDataSet[(String, Int)] = wordAndOne.groupBy(0)
    val result = groupedDataSet.sum(1)
    //val result = source.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)
    //数据下沉
    //result.print()
    result.writeAsText("hdfs://hadoop01:8020/test/output/102")
    //执行批处理,flink中是惰性加载
    env.execute()
  }

}

在这里插入图片描述
文件数 根据计算机处理器数生成

打包到服务器


作业完成后生成的文件数与什么有关?

package cn.tedu.batch

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.core.fs.FileSystem.WriteMode

object BatchWordCount {

  def main(args: Array[String]): Unit = {
    //构建Flink批处理的运行环境
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    //读取数据
    //val source: DataSet[String] = env.readTextFile("FLINKSCALA/data/wordcount.txt")
    val source: DataSet[String]
    = env.readTextFile("hdfs://hadoop01:8020/test/input/wordcount.txt")
    //source.print()
    //hello flink kafka flink kafka flink --> hello ,1
    //数据转化
    //需要导入隐式转换
    import org.apache.flink.api.scala._
    val words: DataSet[String] = source.flatMap(_.split(" "))
    val wordAndOne = words.map((_, 1))
    //分组
    val groupedDataSet: GroupedDataSet[(String, Int)] = wordAndOne.groupBy(0)
    val result = groupedDataSet.sum(1)
    //val result = source.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)
    //数据下沉
    //result.print()
    result.writeAsText("hdfs://hadoop01:8020/test/output/101",WriteMode.OVERWRITE)
    //执行批处理,flink中是惰性加载
    env.execute()

  }

}

打包
上传服务器

[root@hadoop01 flink-1.10.1]# bin/flink run -m yarn-cluster /opt/data/flink_scala-1.0-SNAPSHOT.jar

在这里插入图片描述

在flink界面提交作业

在这里插入图片描述
并行度设置为2,submit
在这里插入图片描述
在这里插入图片描述

并行度

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

并行度优先级

算子的并行度 > env环境的并行度 > 运行参数的并行度 > 配置文件的默认并行度

在这里插入图片描述

2.3 源的读取(scala)

package cn.tedu.batch.source

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}


object BatchFromFile {
  def main(args: Array[String]): Unit = {
    //构建数据源
    val env=ExecutionEnvironment.getExecutionEnvironment

    //1.读取本地文件
    val source1: DataSet[String]= env.readTextFile("flink_scala/data/wordcount.txt")
//    source1.print()

    //2.读取hdfs文件
    val source2: DataSet[String] = env.readTextFile("hdfs://hadoop01:8020/test/input/wordcount.txt")
//    source2.print()

    //3.读取csv文件
    import org.apache.flink.api.scala._
    val source3: DataSet[Subject] = env.readCsvFile[Subject]("flink_scala/data/subject.csv")
//    source3.print()

    //4.读取压缩文件
    val source4: DataSet[String]= env.readTextFile("flink_scala/data/wordcount.txt.gz")
    source4.print()


  }
  case class Subject(id:Int,subjectName:String)

}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-12 17:36:19  更:2022-03-12 17:38:10 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 8:30:21-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码