一、java版flink-wordcount-离线计算版
1.1maven构建flink,加入依赖
<!-- flink包依赖配置-start -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope> -->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink包依赖配置-end -->
1.2 java实现flink wordCount的代码编写
1.2.1代码编写
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.util.Collector;
/**
* Flink实现离线数据DataSet版本的WordCount经典案例
*
*
*/
public class FlinkWordCount4DataSet {
public static void main(String[] args) throws Exception {
// 创建Flink的代码执行离线数据流上下文环境变量
ExecutionEnvironment env = ExecutionEnvironment
.getExecutionEnvironment();
// 定义从本地文件系统当中文件路径
String filePath = "";
if (args == null || args.length == 0) {
filePath = "D:\\temp\\input.txt";
} else {
filePath = args[0];
}
// 获取输入文件对应的DataSet对象
DataSet<String> inputLineDataSet = env.readTextFile(filePath);
// 对数据集进行多个算子处理,按空白符号分词展开,并转换成(word, 1)二元组进行统计
DataSet<Tuple2<String, Integer>> resultSet = inputLineDataSet
.flatMap(
new FlatMapFunction<String, Tuple2<String, Integer>>() {
public void flatMap(String line,
Collector<Tuple2<String, Integer>> out)
throws Exception {
// 按空白符号分词
String[] wordArray = line.split("\\s");
// 遍历所有word,包成二元组输出
for (String word : wordArray) {
out.collect(new Tuple2<String, Integer>(
word, 1));
}
}
}).groupBy(0) // 返回的是一个一个的(word,1)的二元组,按照第一个位置的word分组
.sum(1); // 将第二个位置上的freq=1的数据求和
// 打印出来计算出来的(word,freq)的统计结果对
// 注:print会自行执行env.execute方法,故不用再最后执行env.execute正式开启执行过程
resultSet.print();
// 注:writeAsText的sink算子,必须要调用env.execute方法才能正式开启环境执行
// resultSet.writeAsText("d:\\temp\\output2", WriteMode.OVERWRITE)
// .setParallelism(2);
// 正式开启执行flink计算
// env.execute();
}
}
直接运行即可
1.2.2集群运行 使用maven-shade-plugin配置打包
第1种执行方式-传统的yarn jar方式执行 优点: 简单、易操作 缺点: 默认仅支持local模式支持,要改成分布式方式较麻烦。 需要在打包时候注意,将flink依赖去掉provided,即将依赖包全部打入最后的包中,会使最终包比较大。
第2种执行方式-flink建议的执行方式
1)下载flink1.13.1版本发布包 上传到集群中解压 将hadoop环境变量设置到linux profile文件当中
2)flink的三种运行模式
yarn application运行方式
./bin/flink run-application -t yarn-application -c main_class ../xxxxxx-0.0.1-SNAPSHOT.jar hdfs:///xxxx
运行结果在yarn中,通过日志查看
yarn per-job运行方式 在配置文件./conf/flink-conf.yaml,修改classloader.check-leaked-classloader: false
./bin/flink run -t yarn-per-job -c mian_class ../xxxxx-0.0.1-SNAPSHOT.jar hdfs:///xxxxxx
黑窗口显示
yarn session运行方式 首先在yarn上提前启动flink session会话任务,并得到session task任务的yarn app-id
./bin/yarn-session.sh
提交flink job作业到session任务当中,正式进行作业计算 #第1种提交: 多加入-t yarn-session参数,此时必须指定app-id参数,即提前启动的session作业任务id
./bin/flink run -t yarn-session -Dyarn.application.id=application_xxxxx_xxx -c mian_class ../ xxxxx-0.0.1-SNAPSHOT.jar hdfs:///xxxxx
#第2种提交: 不加入-t yarn-session参数,则不需要手动指定app-id,其是自行寻找提前启动的session作业任务id ./bin/flink run -c main_class …/xxxxx-0.0.1-SNAPSHOT.jar hdfs:///xxxxxxxx 黑窗口显示
二、java版flink-wordcount-实时计算版
2.1配置环境
同上
2.2java实现flink wordcount代码编写-实时版
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* Flink实现实时数据流DataStream版本的WordCount经典案例
*
*
*/
public class FlinkWordCount4DataStream {
public static void main(String[] args) throws Exception {
// 创建Flink的代码执行实时流处理上下文环境变量
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
// 定义读取数据机器主机名称和端口
String host = "localhost";
int port = 9999;
// 获取输入对应的socket输入的实时流数据
DataStream<String> inputLineDataStream = env.socketTextStream(host,
port);
// 对数据集进行多个算子处理,按空白符号分词展开,并转换成(word, 1)二元组进行统计
DataStream<Tuple2<String, Integer>> resultStream = inputLineDataStream
.flatMap(
new FlatMapFunction<String, Tuple2<String, Integer>>() {
public void flatMap(String line,
Collector<Tuple2<String, Integer>> out)
throws Exception {
// 按空白符号分词
String[] wordArray = line.split("\\s");
// 遍历所有word,包成二元组输出
for (String word : wordArray) {
out.collect(new Tuple2<String, Integer>(
word, 1));
}
}
}).keyBy(0) // 返回的是一个一个的(word,1)的二元组,按照第一个位置的word分组,因为此实时流是无界的,即数据并不完整,故不用group
// by而是用keyBy来代替
.sum(1); // 将第二个位置上的freq=1的数据求和
// 打印出来计算出来的(word,freq)的统计结果对
resultStream.print();
// resultStream.writeAsText("./output", WriteMode.OVERWRITE)
// .setParallelism(2);
// 正式启动实时流处理引擎
env.execute();
}
}
2.3运行
windows版本 下载windows netcat
https://blog.csdn.net/qq_37585545/article/details/82250984
解压下载完的压缩包后,找到nc64.exe所在的目录 启动数据输出命令 ./nc64.exe -lp 9999 启动Java代码 在nc64窗口中输入数据
三、scala版flink-wordcount-离线计算版
3.1、maven构建flink开发环境与测试
引入依赖
<!-- scala开发flink依赖-start -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.compile.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.compile.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- scala开发flink依赖-end -->
3.2scala实现flink wordcount代码编写-离线版
import org.apache.flink.api.scala._ import org.apache.flink.api.scala.ExecutionEnvironment
/**
-
scala版本的flink wordcount离线版 */ object FlinkWordCount4DataSet4Scala { def main(args: Array[String]): Unit = { //获取上下文执行环境 val env = ExecutionEnvironment.getExecutionEnvironment //加载数据源-1-从内存当中的字符串渠道 // val source = env.fromElements(“中国 抗美 援朝 战争 很伟大”, “抗美 需要 中国”) // 加载数据源-2-定义从本地文件系统当中文件路径 var filePath = “”; if (args == null || args.length == 0) { filePath = “D:\temp\input.txt”; } else { filePath = args(0); } val source = env.readTextFile(filePath); //进行transformation操作处理数据 val ds = source.flatMap(x => x.split("\s+")).map((_, 1)).groupBy(0).sum(1) //输出到控制台 ds.print() // 正式开始执行操作 // 由于是Batch操作,当DataSet调用print方法时,源码内部已经调用Excute方法,所以此处不再调用 //如果调用反而会出现上下文不匹配的执行错误 //env.execute(“Flink Batch Word Count By Scala”) } } 本地执行
四、scala版flink-wordcount-实时计算版
4.1、maven构建flink开发环境与测试
同上
4.2scala版flink-wordcount代码
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala.createTypeInformation /**
* scala版本的flink wordcount实时版
*/ object FlinkWordCount4DataStream4Scala { def main(args: Array[String]): Unit = {
//获取上下文执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//加载或创建数据源-从socket端口获取
val source = env.socketTextStream("localhost", 9999, '\n')
//进行transformation操作处理数据
val dataStream = source.flatMap(_.split("\\s+")).map((_, 1)).keyBy(0).sum(1)
//输出到控制台
dataStream.print()
//执行操作
env.execute("FlinkWordCount4DataStream4Scala") } }
4.3测试
使用netcat同上
|