前言
流式计算在日常开发中可能不算多见,主要用于统计某个阶段内的PV、UV;但在风控场景很常见,比如统计某个用户一天内同一地区的下单总量,来判断该用户是否为异常用户。还有一些大数据处理场景,如将某一段时间生成的日志按需要加工后导入存储DB,用来做查询报表。为什么要学习Flink?一是最近碰到了一些实时计算的性能问题,二是不太理解实时计算的底层实现原理,所以拿当下很流行的开源工具Flink作为学习对象,一步一步深入Flink底层,探索实时计算的奥秘。
第一个程序
导maven依赖,主要依赖项如下:
<properties>
    <blink.version>1.5.1</blink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <blink-streaming.version>1.5.1</blink-streaming.version>
    <log4j.version>1.2.17</log4j.version>
    <slf4j-log4j.version>1.7.9</slf4j-log4j.version>
</properties>
<dependencies>
    <dependency>
        <groupId>com.alibaba.blink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${blink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.blink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${blink.version}</version>
    </dependency>
    <!-- blink stream java -->
    <dependency>
        <groupId>com.alibaba.blink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${blink-streaming.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.blink</groupId>
        <artifactId>flink-test-utils-junit</artifactId>
        <version>${blink.version}</version>
        <scope>test</scope>
    </dependency>
    <!-- logging framework -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j-log4j.version}</version>
    </dependency>
</dependencies>
这里的依赖引入得比较干净,只包含flink相关核心包和日志包。接下来使用flink API完成第一个Hello World程序,这里我用的是flink官方的WordCount Demo,代码如下:
package com.alibaba.security.blink;

import com.alibaba.security.blink.util.WordCountData;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;

public class WordCount {

    private final ParameterTool params;
    private final ExecutionEnvironment env;

    public WordCount(String[] args) {
        this.params = ParameterTool.fromArgs(args);
        this.env = ExecutionEnvironment.createLocalEnvironment();
        env.getConfig().setGlobalJobParameters(params);
    }

    public static void main(String[] args) throws Exception {
        WordCount wordCount = new WordCount(args);
        DataSet<String> dataSet = wordCount.getDataSetFromCommandLine();
        wordCount.executeFrom(dataSet);
    }

    private DataSet<String> getDataSetFromCommandLine() {
        DataSet<String> text;
        if (params.has("input")) {
            text = env.readTextFile(params.get("input"));
        } else {
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataSet(env);
        }
        return text;
    }

    private void executeFrom(DataSet<String> text) throws Exception {
        DataSet<Tuple2<String, Integer>> counts =
                text.flatMap(new Tokenizer())
                        .groupBy(0)
                        .sum(1);
        if (this.params.has("output")) {
            counts.writeAsCsv(this.params.get("output"), "\n", " ");
            env.execute("WordCount Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
    }

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");
            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}
WordCountData.java代码如下:
package com.alibaba.security.blink.util;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
public class WordCountData {
public static final String[] WORDS = new String[] {
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,",
"And by opposing end them?--To die,--to sleep,--",
"No more; and by a sleep to say we end",
"The heartache, and the thousand natural shocks",
"That flesh is heir to,--'tis a consummation",
"Devoutly to be wish'd. To die,--to sleep;--",
"To sleep! perchance to dream:--ay, there's the rub;",
"For in that sleep of death what dreams may come,",
"When we have shuffled off this mortal coil,",
"Must give us pause: there's the respect",
"That makes calamity of so long life;",
"For who would bear the whips and scorns of time,",
"The oppressor's wrong, the proud man's contumely,",
"The pangs of despis'd love, the law's delay,",
"The insolence of office, and the spurns",
"That patient merit of the unworthy takes,",
"When he himself might his quietus make",
"With a bare bodkin? who would these fardels bear,",
"To grunt and sweat under a weary life,",
"But that the dread of something after death,--",
"The undiscover'd country, from whose bourn",
"No traveller returns,--puzzles the will,",
"And makes us rather bear those ills we have",
"Than fly to others that we know not of?",
"Thus conscience does make cowards of us all;",
"And thus the native hue of resolution",
"Is sicklied o'er with the pale cast of thought;",
"And enterprises of great pith and moment,",
"With this regard, their currents turn awry,",
"And lose the name of action.--Soft you now!",
"The fair Ophelia!--Nymph, in thy orisons",
"Be all my sins remember'd."
};
public static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) {
return env.fromElements(WORDS);
}
}
运行时如果加了命令行参数--input则从自定义输入文件中读取内容,否则从WordCountData中读取。
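命令行参数的解析由ParameterTool完成,下面是一个最小示意(其中的文件路径等参数值纯属演示假设):
// ParameterTool将"--key value"形式的命令行参数解析为KV
String[] demoArgs = new String[]{"--input", "/tmp/words.txt", "--output", "/tmp/result.csv"};
ParameterTool params = ParameterTool.fromArgs(demoArgs);
System.out.println(params.has("input"));   // true
System.out.println(params.get("output"));  // /tmp/result.csv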
Flink程序通过org.apache.flink.api.java.ExecutionEnvironment#createLocalEnvironment()创建执行环境,表示flink在本地启动。flink批处理API操作的对象是DataSet,因此任何数据想在flink里处理,都要先转换成DataSet,这里通过调用org.apache.flink.api.java.ExecutionEnvironment#readTextFile方法把文本文件转化为DataSet。下面的executeFrom方法就是flink的核心处理流程了:先用flatMap把一行行文本打散,转化为Tuple2对象(Tuple2就是一个KV);然后对打散后的Tuple2集合按第0号字段groupBy,相同单词会被分到同一组;最后对每组的计数字段求和(sum),得到每个单词出现的次数。
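为了更直观,这里以两行短文本为例,手工推演一遍上面flatMap → groupBy → sum各阶段的中间结果(仅作示意):
// 输入两行文本:   "to be or not"、"to be"
// flatMap之后:    (to,1) (be,1) (or,1) (not,1) (to,1) (be,1)
// groupBy(0)之后: 按单词分组 -> {to: (to,1)(to,1)} {be: (be,1)(be,1)} {or: (or,1)} {not: (not,1)}
// sum(1)之后:     组内对第1号字段累加 -> (to,2) (be,2) (or,1) (not,1)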
API调用
flatMap
flatMap在Java日常开发里用得也不多,更常用的是map。这里我用JDK 1.8的Stream API写了一个flatMap的demo:
import org.apache.commons.lang3.tuple.Pair;

import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class Scratch {

    public static void main(String[] args) {
        String str = "abc,debf";
        String[] strArray = str.split(",");
        // 使用flatMap返回多个元素(Stream)
        List<Object> result = Arrays.stream(strArray).flatMap(new Function<String, Stream<?>>() {
            @Override
            public Stream<?> apply(String s) {
                return Stream.of(s.split("b"));
            }
        }).collect(Collectors.toList());
        System.out.println(result);
        // 使用map方式只能返回一个元素
        List<Pair<String, Integer>> tupleResultList = Arrays.stream(strArray).map(new Function<String, Pair<String, Integer>>() {
            @Override
            public Pair<String, Integer> apply(String s) {
                return Pair.of(s, s.length());
            }
        }).collect(Collectors.toList());
        System.out.println(tupleResultList);
    }
}
先将字符串str按逗号拆分成一个数组,然后遍历数组,对数组中的每个字符串再按字符b切割,把切割后生成的字符串数组重新构建为一个Stream对象并返回。也就是说flatMap做了一件一变多的事:把原Stream中的每个元素都转化成一个新的Stream,再将这些Stream拍平合并成一个Stream,上面的例子最终输出[a, c, de, f]。而map每个元素只能映射成一个元素,上面的例子输出的是[(abc,3), (debf,4)]。
flink中flatMap和map也是一样的道理。上面flink例子里用的是flatMap,把每行记录切分出的单词都收集到Collector里,这些收集到的元素构成新的DataSet,后面再对它执行groupBy操作。而如果换成map该怎么写呢?代码如下:
// 这里map每次方法调用只会返回一个Tuple2对象
AggregateOperator<Tuple2<String, Integer>> firstSumResult = text.map(new MapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> map(String value) throws Exception {
        String[] tokens = value.toLowerCase().split("\\W+");
        if (tokens.length > 0) {
            return new Tuple2<String, Integer>(tokens[0], 1);
        }
        return null;
    }
}).groupBy(0).sum(1);
List<Tuple2<String, Integer>> result = firstSumResult.collect();
result.forEach(e -> LOGGER.info("word={},count={}", e.f0, e.f1));
可以看出MapFunction#map是有返回值的,且每次调用只返回一个元素,后面的groupBy就是针对map生成的数据集来操作的。注意这种写法每行只取第一个单词参与统计,与上面flatMap版本的结果并不等价。因此如何选择map与flatMap,我个人的理解是:如果只是遍历DataSet,把一个对象转化成另一个对象,用map即可;如果一个对象要转化成零个或多个对象,就用flatMap。
groupBy
groupBy也是DataSet提供的标准API之一,该方法有3种重载形式,如下:
groupBy(int... fields)
该重载只对元素类型为Tuple系列的DataSet起作用。Tuple有哪些类呢?flink内置了Tuple0到Tuple25共26个定长元组类,分别支持0到25个字段。
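下面用一个小片段直观感受一下Tuple的用法(数据纯属演示假设):
Tuple2<String, Integer> t2 = new Tuple2<>("flink", 1);
Tuple3<String, String, Integer> t3 = new Tuple3<>("hangzhou", "book", 2);
// 元组字段通过公共成员f0、f1、f2...直接访问
System.out.println(t2.f0 + "," + t2.f1);   // flink,1
System.out.println(t3.f2);                 // 2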
使用该种groupBy方法举个例子:
/**
 * @author shaoxian.ssx
 * @date 2021/11/7
 */
public class SimpleGroupBy {

    private final static Logger LOGGER = LoggerFactory.getLogger(SimpleGroupBy.class);

    public static void main(String[] args) throws Exception {
        LocalEnvironment env = ExecutionEnvironment.createLocalEnvironment();
        List<Tuple2<String, Integer>> list = new ArrayList<>(16);
        Random random = new Random();
        for (int i = 0; i < 16; i++) {
            int num = random.nextInt(20);
            list.add(new Tuple2<String, Integer>(String.valueOf(num), num));
        }
        LOGGER.info("listResult={}", list);
        List<Tuple2<String, Integer>> flinkRes = env.fromCollection(list).groupBy(0).sum(1).collect();
        LOGGER.info("flinkResult={}", flinkRes);
    }
}
输出如下:
16:41:33,708 [main] INFO  com.alibaba.security.blink.SimpleGroupBy - listResult=[(1,1), (9,9), (14,14), (18,18), (1,1), (7,7), (16,16), (1,1), (4,4), (16,16), (15,15), (17,17), (16,16), (0,0), (12,12), (15,15)]
16:41:37,573 [main] INFO  com.alibaba.security.blink.SimpleGroupBy - flinkResult=[(12,12), (18,18), (9,9), (14,14), (15,30), (7,7), (16,48), (17,17), (4,4), (0,0), (1,3)]
通过上面的例子可以看出,groupBy(int... fields)方法仅对元素类型为Tuple系列的DataSet有效,fields为Tuple中字段的位置:按Tuple第0号字段分组就传0,以此类推,也可以同时传入多个位置实现多字段分组。
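下面是一个按前两个字段联合分组、对第三个字段求和的最小示意(沿用上面例子中的env与LOGGER,数据与字段含义均为演示假设,import省略):
// 按(城市, 商品)两个维度联合分组,对第2号字段(销量)累加
List<Tuple3<String, String, Integer>> sales = Arrays.asList(
        new Tuple3<>("hangzhou", "book", 2),
        new Tuple3<>("hangzhou", "book", 3),
        new Tuple3<>("beijing", "pen", 1));
// groupBy(0, 1)表示同时按第0、1号字段分组,sum(2)对第2号字段求和
List<Tuple3<String, String, Integer>> grouped = env.fromCollection(sales)
        .groupBy(0, 1)
        .sum(2)
        .collect();
LOGGER.info("groupedResult={}", grouped);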
groupBy(String... fields)
该方法针对元素类型为POJO的DataSet,方法参数为POJO的属性名;该属性要么是public字段,要么提供public的setter、getter方法,并且POJO类本身必须是public的,且有一个public的无参构造方法。举个例子,统计每个买家下单用到的IP个数,代码如下:
/**
 * @author shaoxian.ssx
 * @date 2021/11/7
 */
public class SimpleGroupBy {

    private final static Logger LOGGER = LoggerFactory.getLogger(SimpleGroupBy.class);

    public static void main(String[] args) throws Exception {
        LocalEnvironment env = ExecutionEnvironment.createLocalEnvironment();
        Order order1 = new Order(1001L, "张三", "192.168.1.10");
        Order order2 = new Order(1002L, "李四", "192.168.1.212");
        Order order3 = new Order(1001L, "张三", "192.168.1.50");
        Order order4 = new Order(1003L, "王五", "192.168.1.71");
        DataSource<Order> dataSource = env.fromElements(order1, order2, order3, order4);
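        // 注意:reduce是对组内元素两两合并,这里直接比较IP的写法只在本示例数据下恰好成立,严格的去重计数需要额外的去重逻辑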
        List<Order> result = dataSource.groupBy("byrId").reduce(new ReduceFunction<Order>() {
            @Override
            public Order reduce(Order value1, Order value2) throws Exception {
                if (!value1.getIp().equals(value2.getIp())) {
                    value1.setIpCount(value1.getIpCount() + value2.getIpCount());
                    return value1;
                }
                return value1;
            }
        }).collect();
        result.forEach(e -> LOGGER.info("order={}", e));
    }

    /**
     * 必须为public类型,否则flink校验类型会报错
     */
    public static class Order {

        private Long byrId;
        private String name;
        private String ip;
        private int ipCount = 1;

        // 必须提供无参数构造方法
        public Order() {
        }

        public Order(Long byrId, String name, String ip) {
            this.byrId = byrId;
            this.name = name;
            this.ip = ip;
        }

        // 省略setter、getter方法...

        @Override
        public String toString() {
            return new StringJoiner(", ", Order.class.getSimpleName() + "[", "]")
                    .add("byrId=" + byrId)
                    .add("name='" + name + "'")
                    .add("ip='" + ip + "'")
                    .add("ipCount=" + ipCount)
                    .toString();
        }
    }
}
输出结果如下:
17:08:02,922 [main] INFO  com.alibaba.security.blink.SimpleGroupBy - order=Order[byrId=1001, name='张三', ip='192.168.1.10', ipCount=2]
17:08:02,922 [main] INFO  com.alibaba.security.blink.SimpleGroupBy - order=Order[byrId=1003, name='王五', ip='192.168.1.71', ipCount=1]
17:08:02,922 [main] INFO  com.alibaba.security.blink.SimpleGroupBy - order=Order[byrId=1002, name='李四', ip='192.168.1.212', ipCount=1]
groupBy(KeySelector<T, K> keyExtractor)
该方法个人感觉跟上面按属性名groupBy差不多,只是写起来更直观一点;它不要求数据源一定是POJO,只要能在KeySelector里从元素中提取出分组用的key即可,例如Tuple2中的f0、f1也可以作为key返回。还是用上面的例子,改成KeySelector的写法如下:
// 告诉flink使用Order对象byrId值进行groupBy
List<Order> result = dataSource.groupBy(new KeySelector<Order, Long>() {
    @Override
    public Long getKey(Order value) throws Exception {
        return value.getByrId();
    }
}).reduce(new ReduceFunction<Order>() {
    @Override
    public Order reduce(Order value1, Order value2) throws Exception {
        if (!value1.getIp().equals(value2.getIp())) {
            value1.setIpCount(value1.getIpCount() + value2.getIpCount());
            return value1;
        }
        return value1;
    }
}).collect();
result.forEach(e -> LOGGER.info("order={}", e));
UnsortedGrouping
UnsortedGrouping是groupBy的返回对象。为什么要重点说groupBy呢?因为在流式计算中groupBy是最常见的操作,比如按商品ID分组来判断哪个商品卖得最多,按地址分组来观察订单在哪些地区聚集等等。SQL里写完group by后通常还要进行count,那在flink中怎么做呢?flink最终的聚合计算方法都集中在UnsortedGrouping这个类中,计数在这里可以通过reduce操作完成,计算逻辑封装在ReduceFunction里。如上面统计同一买家下单用到的IP个数,在reduce中针对不同IP做了+1操作,reduce执行完后拿到的Order对象里的ipCount就是累加后的IP个数。当然UnsortedGrouping里还有很多有用的方法,如maxBy、minBy、sum,这里写个demo演示一下:
/**
 * @author shaoxian.ssx
 * @date 2021/11/7
 */
public class SimpleGroupBy {

    private final static Logger LOGGER = LoggerFactory.getLogger(SimpleGroupBy.class);

    public static void main(String[] args) throws Exception {
        LocalEnvironment env = ExecutionEnvironment.createLocalEnvironment();
        Order order1 = new Order(1001L, "张三", "192.168.1.10", 30d);
        Order order2 = new Order(1002L, "李四", "192.168.1.212", 27d);
        Order order3 = new Order(1001L, "张三", "192.168.1.50", 100d);
        Order order4 = new Order(1003L, "王五", "192.168.1.71", 30d);
        DataSource<Order> dataSource = env.fromElements(order1, order2, order3, order4);
        // 先使用map将Order转化为Tuple类型,然后再按照买家ID进行groupBy,最后筛选出每组中金额最大的一笔订单并输出
        List<Tuple2<Double, Order>> result = dataSource.map(new MapFunction<Order, Tuple2<Double, Order>>() {
            @Override
            public Tuple2<Double, Order> map(Order value) throws Exception {
                return new Tuple2<>(value.getTotal(), value);
            }
        }).groupBy("f1.byrId").maxBy(0).collect();
        result.forEach(e -> LOGGER.info("order={}", e));
    }

    /**
     * 必须为public类型,否则flink校验类型会报错
     */
    public static class Order {

        private Long byrId;
        private String name;
        private String ip;
        // 订单总金额
        private double total;
        private int ipCount = 1;

        // 必须提供无参数构造方法
        public Order() {
        }

        public Order(Long byrId, String name, String ip, double total) {
            this.byrId = byrId;
            this.name = name;
            this.ip = ip;
            this.total = total;
        }

        // 省略setter、getter方法...
        public void setTotal(double total) {
            this.total = total;
        }

        @Override
        public String toString() {
            return new StringJoiner(", ", Order.class.getSimpleName() + "[", "]")
                    .add("byrId=" + byrId)
                    .add("name='" + name + "'")
                    .add("ip='" + ip + "'")
                    .add("ipCount=" + ipCount)
                    .add("total=" + total)
                    .toString();
        }
    }
}
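顺带给出sum的调用示意:沿用上面的dataSource与Order(这里假设Order同样提供了getByrId、getTotal等getter),统计每个买家的订单总金额,仅作演示:
// 先把Order映射成(byrId, total)的Tuple2,再按第0号字段groupBy,对第1号字段sum
List<Tuple2<Long, Double>> totalPerBuyer = dataSource.map(new MapFunction<Order, Tuple2<Long, Double>>() {
    @Override
    public Tuple2<Long, Double> map(Order value) throws Exception {
        return new Tuple2<>(value.getByrId(), value.getTotal());
    }
}).groupBy(0).sum(1).collect();
totalPerBuyer.forEach(e -> LOGGER.info("byrId={},total={}", e.f0, e.f1));
min、max的调用方式与sum类似,这里不再展开。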
总结
通过今天这些Demo及API的学习,算是初步入门了flink。后续会持续投入,带来更多API调用的探索以及flink底层原理的解析。