1. Setting Up the Flink Development Environment
1.1. Creating a Maven Project
- 1. Choose "File" -> "New" -> "Project"
- 2. Select Maven, set the JDK version, and pick the Maven archetype
org.apache.maven.archetypes:maven-archetype-quickstart
# this archetype generates a plain Maven project skeleton
- 3. Set the GroupId and ArtifactId
GroupId: the organization (company) name
ArtifactId: the project/module name
- 4. Verify the Maven environment, then choose "Next"
Mainly check the Maven home path and the settings.xml path
- 5. Verify the project name and workspace location, then choose "Finish"
- 6. Wait for the project to be created and its dependencies to be downloaded; once finished, the standard Maven directory structure is in place
1.2. pom.xml Configuration
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hainiu</groupId>
    <artifactId>hainiuflink</artifactId>
    <version>1.0</version>

    <properties>
        <java.version>1.8</java.version>
        <scala.version>2.11</scala.version>
        <flink.version>1.9.3</flink.version>
        <parquet.version>1.10.0</parquet.version>
        <hadoop.version>2.7.3</hadoop.version>
        <fastjson.version>1.2.72</fastjson.version>
        <redis.version>2.9.0</redis.version>
        <mysql.version>5.1.35</mysql.version>
        <log4j.version>1.2.17</log4j.version>
        <slf4j.version>1.7.7</slf4j.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.build.scope>compile</project.build.scope>
        <mainClass>com.hainiu.Driver</mainClass>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hbase_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch5_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${redis.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>${parquet.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-hadoop</artifactId>
            <version>${parquet.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptors>
                        <descriptor>src/assembly/assembly.xml</descriptor>
                    </descriptors>
                    <archive>
                        <manifest>
                            <mainClass>${mainClass}</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.12</version>
                <configuration>
                    <skip>true</skip>
                    <forkMode>once</forkMode>
                    <excludes>
                        <exclude>**/**</exclude>
                    </excludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
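Note that the assembly plugin above expects a descriptor at src/assembly/assembly.xml, which the quickstart archetype does not generate. A minimal sketch of such a descriptor, assuming a plain fat jar is wanted (the <id> and unpack settings here are illustrative, not taken from the original):

<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
    <!-- suffix of the produced jar, e.g. hainiuflink-1.0-jar-with-dependencies.jar -->
    <id>jar-with-dependencies</id>
    <formats>
        <format>jar</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <dependencySets>
        <dependencySet>
            <!-- unpack all runtime dependencies into the root of the fat jar -->
            <outputDirectory>/</outputDirectory>
            <useProjectArtifact>true</useProjectArtifact>
            <unpack>true</unpack>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>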
2. Flink Development Workflow
2.1. Obtaining the Execution Environment
2.1.1. Batch environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
2.1.2. Stream environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2.1.3. Automatic mode
- Starting with Flink 1.12, the stream and batch APIs are unified, and the execution mode can be chosen automatically via RuntimeExecutionMode.AUTOMATIC (note: the pom above pins Flink 1.9.3, which predates this API)
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
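For reference, a minimal sketch with the imports the line above assumes (Flink 1.12 or later only):

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// AUTOMATIC lets Flink run bounded sources in batch mode and unbounded ones in streaming mode
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);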
2.2. Creating/Loading a Data Source
2.2.1. Loading a data source from a local file - batch processing
String path = "H:\\flink_demo\\flink_test\\src\\main\\resources\\wordcount.txt";
DataSet<String> inputDataSet = env.readTextFile(path);
2.2.2. Reading data from a socket - stream processing
DataStreamSource<String> elementsSource= env.socketTextStream("10.50.40.131", 9999);
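For local testing you can first open the port with netcat (assuming it is installed) and type lines into the terminal:

nc -lk 9999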
2.2.3. Reading data dynamically from a file - stream processing
DataStream<String> lines = env.readTextFile("file:///path");
2.3. Transforming the Data
- MyFlatMapFunction below is a user-defined function; its definition appears in the full example in section 3.1
DataSet<Tuple2<String, Integer>> resultDataSet = inputDataSet.flatMap(new MyFlatMapFunction())
        .groupBy(0)
        .sum(1);
2.4、数据输出/存储
- 输出位置:输出到文件,输出到控制台,输出到MQ,输出到DB,输出到
scoket …
resultDataSet.print();
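print() is only one sink. As a sketch of an alternative with the DataSet API from above, results can also be written straight to a text file (the output path here is a placeholder); unlike print(), file sinks do not trigger execution on their own:

resultDataSet.writeAsText("file:///tmp/wordcount_out");
// file sinks are lazy, so the batch job must be triggered explicitly
env.execute();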
2.5. Programming Model
- Obtain the execution environment
- Load/create the initial data set(s)
- Apply transformations to the data (each producing a new data set)
- Specify where to put the results of the computation
- Trigger program execution
The five steps are tied together in the sketch below.
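A minimal sketch of the five steps with the streaming API (the input path and job name are placeholders, not from the original):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Skeleton {
    public static void main(String[] args) throws Exception {
        // 1. obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. load/create the initial data set
        DataStream<String> lines = env.readTextFile("file:///path/to/input");
        // 3. transform it (here: upper-case every line)
        DataStream<String> upper = lines.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value.toUpperCase();
            }
        });
        // 4. specify where the results go
        upper.print();
        // 5. trigger execution
        env.execute("skeleton-job");
    }
}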
3. Hands-on Examples
3.1. Batch example
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // 1. obtain the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2. read the input file line by line
        String path = "H:\\flink_demo\\flink_test\\src\\main\\resources\\wordcount.txt";
        DataSet<String> inputDataSet = env.readTextFile(path);
        // 3. split each line into (word, 1) pairs, group by the word (field 0)
        //    and sum the counts (field 1)
        DataSet<Tuple2<String, Integer>> resultDataSet = inputDataSet.flatMap(new MyFlatMapFunction())
                .groupBy(0)
                .sum(1);
        // 4. print the result; in the batch API, print() also triggers execution,
        //    so no explicit env.execute() is needed here
        resultDataSet.print();
    }

    public static class MyFlatMapFunction implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) throws Exception {
            // split on spaces and emit one (word, 1) tuple per word
            String[] words = input.split(" ");
            for (String word : words) {
                collector.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
- Run result: each distinct word in the file is printed to the console together with its count
3.2. Stream example
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamCount {
    public static void main(String[] args) throws Exception {
        // 1. obtain the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. read the file as a stream: lines enter the job one at a time
        DataStreamSource<String> inputDataStream = env.readTextFile("H:\\flink_demo\\flink_test\\src\\main\\resources\\wordcount.txt");
        // 3. split into (word, 1) pairs, key by the word (field 0)
        //    and keep a running sum of the counts (field 1)
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = input.split(" ");
                for (String word : words) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        }).keyBy(0)
          .sum(1);
        // 4. print the continuously updated counts
        resultDataStream.print();
        // 5. unlike the batch API, a streaming job must be triggered explicitly
        env.execute();
    }
}
- Input text: Flink → output: (Flink,1)
- Add Spark → output: (Flink,2) (Spark,1)
- Add python → output: (Flink,3) (Spark,2) (python,1)
3.3. Stream vs. Batch
The result of stream processing is continuously refreshed: in this example the data enters the job line by line, each batch of records is processed as it arrives, and the job idles in a waiting state when no data is available.
4. Deploying Flink on YARN
4.1. Packaging the Code
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // read --host and --port from the program arguments so they can be set at submit time
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host");
        int port = parameterTool.getInt("port");
        DataStreamSource<String> inputDataStream = env.socketTextStream(host, port);
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = input.split(" ");
                for (String word : words) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        }).keyBy(0)
          .sum(1);
        // print with parallelism 1 so output lines are not interleaved across subtasks
        resultDataStream.print().setParallelism(1);
        // dump the execution plan as JSON for inspection
        System.out.println(env.getExecutionPlan());
        env.execute();
    }
}
- Compile first, then package; a typical invocation is sketched below
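A sketch of the build commands (the fat-jar suffix comes from the <id> in assembly.xml and is an assumption here):

# compile and package; the assembly plugin is bound to the package phase
mvn clean package
# the runnable fat jar then sits under target/, e.g.
# target/hainiuflink-1.0-jar-with-dependencies.jar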
4.2. Deploying to the Cluster via the Web UI
(1) Upload the jar to the Flink cluster
- In the Flink web UI: "Submit new Job" -> "Add New"
(2) Configure the parameters
Entry Class: the entry (main) class of the job
Program Arguments: the arguments passed to main(), e.g. --host 10.50.40.131 --port 9999 for the job above
Parallelism: parallelism set in the code takes precedence; if the code sets none, the value entered here at submission is used; if that is empty too, the cluster's global default parallelism applies
Savepoint Path: restore the job from a previously taken savepoint
(3) Run result
Not successful yet; still to be resolved.
5. Troubleshooting
5.1. Maven plugin entries in the pom are shown in red
File -> Settings -> Maven: check the Maven home path and the settings.xml path, then re-import the project
5.2. The Flink web UI shows "Server response message" when submitting a job
Inspect the log files under {flink_home}/log to find the actual error