- Why does sinking the data produce multiple output files?
- What determines the number of files generated when the job finishes?
- How is the parallelism set, and what is the order of precedence?
1 Flink
1.1 Introduction
Flink is a unified framework for batch and stream processing. Batch: offline processing. Stream: real-time processing.
tar -xvzf options: -x extracts files from the archive; -v reports each processed file in detail (without it, tar stays silent); -z compresses/decompresses with gzip, so an archive packed with this option must also be unpacked with it; -f specifies the archive file or device and is usually required.
1.2 Cluster Setup
Upload and extract
Upload the Flink archive to the target directory and extract it into /opt/servers:
cd /opt/servers
tar -xvzf flink-1.10.1-bin-scala_2.12.tgz -C ../servers/
Modify the configuration files
-
Edit the flink-conf.yaml configuration file in the conf folder of the installation directory to specify the JobManager:
cd /opt/servers/flink-1.10.1/conf/
mkdir -p /opt/servers/flink-1.10.1/tmp
Edit the configuration file flink-conf.yaml:
jobmanager.rpc.address: hadoop01
taskmanager.tmp.dirs: /opt/servers/flink-1.10.1/tmp
-
Edit the slaves configuration file in the conf folder of the installation directory to list the TaskManager nodes:
hadoop01
hadoop02
hadoop03
-
Use vi to edit the system environment file /etc/profile and add the HADOOP_CONF_DIR directory:
export HADOOP_CONF_DIR=/opt/servers/hadoop-2.7.7/etc/hadoop
YARN_CONF_DIR or HADOOP_CONF_DIR must be set as an environment variable so that Flink can read the YARN and HDFS configuration.
-
Newer Flink versions no longer bundle the Hadoop dependencies, so an extra Hadoop component is required: download the shaded Hadoop jar and place it in Flink's lib directory (see the sketch after this list).
Download: https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/
-
Distribute /etc/profile to the other two nodes:
scp -r /etc/profile hadoop02:/etc
scp -r /etc/profile hadoop03:/etc
-
Reload the environment variables on every node:
source /etc/profile
-
Distribute the configured Flink directory to the other two nodes:
cd /opt/servers
scp -r flink-1.10.1/ hadoop02:$PWD
scp -r flink-1.10.1/ hadoop03:$PWD
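The Hadoop-integration step above can be scripted roughly as follows on hadoop01; the shaded-jar version 2.7.5-10.0 is an assumption and should match the Hadoop version in use:
cd /opt/servers/flink-1.10.1/lib
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-10.0/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
Because the whole flink-1.10.1/ directory is copied to hadoop02 and hadoop03 afterwards, downloading the jar once before the scp step is enough.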
1.3 Startup
Start the Flink cluster:
cd /opt/servers/flink-1.10.1
bin/start-cluster.sh
Start the HDFS cluster:
start-all.sh
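To check that everything is up, jps can be run on each node; with a standalone Flink 1.10 cluster the master typically shows a StandaloneSessionClusterEntrypoint process and every worker a TaskManagerRunner process, alongside the usual HDFS processes (NameNode, DataNode):
jps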
1.4 Testing Word Count with Flink
Create the /test/input directory in HDFS:
hadoop fs -mkdir -p /test/input
Upload the wordcount.txt file to the HDFS /test/input directory:
hadoop fs -put /root/wordcount.txt /test/input
Run the test job:
bin/flink run examples/batch/WordCount.jar --input hdfs://hadoop01:8020/test/input/wordcount.txt --output hdfs://hadoop01:8020/test/output/001
If --output is not specified, the result is printed directly to the console.
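As a quick smoke test, the example can also be run without any arguments; it then uses its built-in sample text and prints the counts to the console:
bin/flink run examples/batch/WordCount.jar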
Open the Flink Web UI:
http://hadoop01:8081
High availability (HA)
Adjust the number of slots in flink-conf.yaml, taskmanager.numberOfTaskSlots: 2 (the value can be raised per node, e.g. from 2 to 4, depending on its resources):
cd /opt/servers/flink-1.10.1/conf/
[root@hadoop03 conf]# vim flink-conf.yaml
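After changing the slot count, restart the Flink cluster so the new value takes effect:
cd /opt/servers/flink-1.10.1
bin/stop-cluster.sh
bin/start-cluster.sh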
2 Word Count
2.1 Java
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
</dependencies>
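The Scala programs in sections 2.2 and 2.3 additionally rely on the Scala DataSet API; a dependency sketch, assuming the same 1.10.1 version and Scala 2.12 suffix as above:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId>
    <version>1.10.1</version>
</dependency>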
package cn.tedu;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // Batch (DataSet) execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Read the input file line by line
        DataSource<String> source = env.readTextFile("flink_java/data/wordcount.txt");
        // Split each line into words and emit (word, 1) pairs
        FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne =
                source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] words = value.split(" ");
                        for (String word : words) {
                            out.collect(new Tuple2<String, Integer>(word, 1));
                        }
                    }
                });
        // Group by the word (tuple field 0) and sum the counts (tuple field 1)
        UnsortedGrouping<Tuple2<String, Integer>> groupedWords = wordAndOne.groupBy(0);
        AggregateOperator<Tuple2<String, Integer>> result = groupedWords.sum(1);
        // print() triggers execution and writes the result to stdout
        result.print();
    }
}
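Assuming flink_java/data/wordcount.txt contains, for example, the single line "hello flink hello spark" (a hypothetical sample file), the program prints tuples such as (flink,1), (hello,2), (spark,1).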
2.2 Scala
Why does sinking the data produce multiple output files?
package cn.tedu.batch

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.core.fs.FileSystem.WriteMode

object BatchWordCount {
  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    // Read the input file from HDFS
    val source: DataSet[String] =
      env.readTextFile("hdfs://hadoop01:8020/test/input/wordcount.txt")
    // Implicit TypeInformation needed by the Scala DataSet API
    import org.apache.flink.api.scala._
    val words: DataSet[String] = source.flatMap(_.split(" "))
    val wordAndOne = words.map((_, 1))
    // Group by the word (field 0) and sum the counts (field 1)
    val groupedDataSet: GroupedDataSet[(String, Int)] = wordAndOne.groupBy(0)
    val result = groupedDataSet.sum(1)
    // Writes one file per parallel subtask into the output directory
    result.writeAsText("hdfs://hadoop01:8020/test/output/102")
    env.execute()
  }
}
Number of files: writeAsText produces one file per parallel subtask; when run locally the default parallelism equals the number of processor cores of the machine, so several files appear.
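If a single output file is wanted, the parallelism of the sink can be forced to 1; a sketch based on the program above:
result.writeAsText("hdfs://hadoop01:8020/test/output/102").setParallelism(1)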
Package the job and deploy it to the server.
What determines the number of files generated when the job finishes?
package cn.tedu.batch

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.core.fs.FileSystem.WriteMode

object BatchWordCount {
  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val source: DataSet[String] =
      env.readTextFile("hdfs://hadoop01:8020/test/input/wordcount.txt")
    import org.apache.flink.api.scala._
    val words: DataSet[String] = source.flatMap(_.split(" "))
    val wordAndOne = words.map((_, 1))
    val groupedDataSet: GroupedDataSet[(String, Int)] = wordAndOne.groupBy(0)
    val result = groupedDataSet.sum(1)
    // WriteMode.OVERWRITE replaces an existing output path instead of failing on rerun
    result.writeAsText("hdfs://hadoop01:8020/test/output/101", WriteMode.OVERWRITE)
    env.execute()
  }
}
Package the jar and upload it to the server.
Submit the job from the Flink web UI.
Set the parallelism to 2 and click Submit.
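The job can also be submitted from the command line instead of the web UI; a sketch, where /root/flink_scala-1.0.jar is a hypothetical name for the uploaded jar:
bin/flink run -p 2 -c cn.tedu.batch.BatchWordCount /root/flink_scala-1.0.jar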
Parallelism
Parallelism priority, from highest to lowest:
operator-level parallelism > environment (env) parallelism > parallelism passed when submitting the job > default parallelism in the configuration file
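A small Scala sketch of the first two levels (the values are illustrative):
val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2) // environment-level parallelism
import org.apache.flink.api.scala._
val counts = env.fromElements("a", "b", "a")
  .map((_, 1)).setParallelism(1) // operator-level parallelism overrides the environment setting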
2.3 Reading Sources (Scala)
package cn.tedu.batch.source

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object BatchFromFile {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Local text file
    val source1: DataSet[String] = env.readTextFile("flink_scala/data/wordcount.txt")
    // Text file on HDFS
    val source2: DataSet[String] = env.readTextFile("hdfs://hadoop01:8020/test/input/wordcount.txt")
    import org.apache.flink.api.scala._
    // CSV file mapped to the Subject case class by column position
    val source3: DataSet[Subject] = env.readCsvFile[Subject]("flink_scala/data/subject.csv")
    // Compressed text file
    val source4: DataSet[String] = env.readTextFile("flink_scala/data/wordcount.txt.gz")
    source4.print()
  }

  case class Subject(id: Int, subjectName: String)
}
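Assuming flink_scala/data/subject.csv contains rows such as 1,Hadoop and 2,Flink (hypothetical sample data), readCsvFile parses each row into a Subject(id, subjectName) by column position; readTextFile decompresses the .gz file transparently based on its extension, and print() triggers execution and writes the records to the console.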