IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink批处理和流处理两种方式实现WordCount代码示例 -> 正文阅读

[大数据]Flink批处理和流处理两种方式实现WordCount代码示例

重要说明:这个是flink1.12 以前的模式

新模式参考:Flink1.13批流合一的介绍_javastart的博客-CSDN博客
本文使用 Flink 的两种方式实现 WordCount

基于流计算
基于批计算

先说一下我的环境:
Flink 1.9
开发工具:Idea
Maven版本:3.3.9
Linux:CentOS 7
演示语言:Scala 2.11


1. Idea 新建 Maven 项目并配置以下依赖

<dependencies>
? ? <dependency>
? ? ? ? <groupId>org.apache.flink</groupId>
? ? ? ? <artifactId>flink-scala_2.11</artifactId>
? ? ? ? <version>1.9.1</version>
? ? </dependency>
? ? <dependency>
? ? ? ? <groupId>org.apache.flink</groupId>
? ? ? ? <artifactId>flink-streaming-scala_2.11</artifactId>
? ? ? ? <version>1.9.1</version>
? ? </dependency>
</dependencies>


2. 实现代码及详细注释


2.1 Flink 基于流计算实现 WordCount


案例需求:采用 Netcat 数据源发送数据,使用Flink统计每个单词的数量

Idea执行代码 –> 打开 Linux 使用 nc(netcat)命令发送数据测试
nc -lk 8888


2.2 Flink 基于批计算实现 WordCount


需求:读取本地数据文件,统计文件中每个单词出现的次数

wc.txt文件的内容

hadoop hbase hello
hello hadoop apache apache
flink hello


执行代码结果


2.3 附件:完整代码

package com.bigdataBC.flink

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
? * 基于流计算的WordCount案例
? */
object WordCountBySrteaming {
? def main(args: Array[String]): Unit = {
? ? // 初始化Flink的Streaming(流计算)上下文执行环境
? ? val streamEvn: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

? ? //导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题
? ? import org.apache.flink.streaming.api.scala._

? ? // 设置默认的分区(分区优先级:先找单独设置的分区,若没有就用默认的)
? ? streamEvn.setParallelism(1)

? ? // 读取流数据
? ? val ds: DataStream[String] = streamEvn.socketTextStream("node1",8888)
? ? // 转换计算
? ? val result: DataStream[(String, Int)] = ds.flatMap(_.split(" "))
? ? ? .map((_, 1))
? ? ? .setParallelism(2) //设置单独的分区
? ? ? .keyBy(0) // 分组:必须制定根据哪个字段分组,参数代表当前要分组的字段的下标(另外还有fieldsname)
? ? ? .sum(1) // 1代表下标,下标为1的进行累加

? ? //打印结果到控制台
? ? result.print()
? ? ? .setParallelism(4) //设置单独的分区
? ? //启动流式处理,如果没有该行代码上面的程序不会运行
? ? streamEvn.execute("wordcount")

? }
}



?

package com.bigdataBC.flink

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

/**
? * 基于批计算的WordCount案例
? */
object WordCountByBatch {
? def main(args: Array[String]): Unit = {
? ? // 初始化Flink批计算环境、
? ? val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

? ? // 导入隐式转换
? ? import org.apache.flink.api.scala._

? ? // 设置默认的分区
// ? ?env.setParallelism(1)

? ? // 读取数据
? ? val ds: DataSet[String] = env.readTextFile("D:\\workspace\\Idea-workspace\\Flinkdemo\\src\\main\\resources\\wc.txt")

? ? // 转换计算
? ? val result: AggregateDataSet[(String, Int)] = ds.flatMap(_.split(" "))
? ? ? .map((_, 1))
? ? ? .groupBy(0)
? ? ? .sum(1)

? ? // 打印(这里的print不能设置分区)
? ? result.print()

? }
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-18 17:49:30  更:2022-04-18 17:52:53 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 12:35:07-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码