1. Real-Time Computing Application Scenarios
1.1 Intelligent Recommendation
What is intelligent recommendation? Definition: based on data describing a user's behavior, the system applies a strategy model and automatically recommends content that matches that behavior. Examples: recommending similar goods based on the user's click data on products (time period, click frequency); recommending suitable brands based on the user's ratings and satisfaction; recommending similar articles based on the user's usage habits and click behavior. Application cases:
1.2 Real-Time Data Warehouse
What is a real-time data warehouse? A data warehouse (Data Warehouse, abbreviated DW or DWH) is a large data storage collection that filters and integrates various business data to produce analytical reports for the enterprise and support decision making. A real-time data warehouse is a data warehouse with real-time characteristics, built on real-time processing frameworks such as Storm, Spark (Streaming), or Flink.
Application case: analyzing logistics data to improve logistics processing efficiency. Alibaba Cainiao Network real-time data warehouse design: a layered processing architecture (streaming ETL): ODS -> DWD -> DWS -> ADS.
- ODS (Operation Data Store): operational data layer, usually the raw collected data.
- DWD (Data Warehouse Detail): detail data layer, data after cleansing; also called DWI.
- DWS (Data Warehouse Service): summary data layer; based on DWD data, integrated and aggregated into service data for a specific subject area, usually wide tables joining multiple attributes, e.g. user behavior logs: likes, comments, favorites, etc.
- ADS (Application Data Store): application data layer; results are synchronized to an RDS database, typically for report presentation.
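To make the layering concrete, here is a minimal Flink sketch of such a streaming ETL, assuming a hypothetical ods_user_log Kafka topic with tab-separated records (userId, action, timestamp); it only illustrates how raw ODS data is cleansed (DWD) and aggregated (DWS) before being synced out (ADS):
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.util.Properties;
public class LayeredEtlSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.116.141:9092");
        props.setProperty("group.id", "dw_group");
        // ODS: the raw collected data, read unchanged from a Kafka topic (topic name is an assumption)
        env.addSource(new FlinkKafkaConsumer<>("ods_user_log", new SimpleStringSchema(), props))
                // DWD: cleansing - drop malformed lines, keep (userId:action, 1) detail records
                .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Long>> out) {
                        String[] fields = line.split("\t");
                        if (fields.length >= 3) {
                            out.collect(Tuple2.of(fields[0] + ":" + fields[1], 1L));
                        }
                    }
                })
                // DWS: aggregate the detail records per subject key over a time window
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> record) {
                        return record.f0;
                    }
                })
                .timeWindow(Time.minutes(1))
                .sum(1)
                // ADS: in a real job the aggregates would be written to an RDS/report table; print() stands in
                .print();
        env.execute("layered-etl-sketch");
    }
}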
1.3 Big Data Analytics Applications
- IoT data analytics
- What is IoT?
The Internet of Things (IoT) is a new generation of information technology and a major future trend: the interconnection of all things. Through smart sensing, identification technology, pervasive computing, and other communication and sensing technologies, IoT is widely applied in converged networks, and is therefore called the third wave of the world information industry after the computer and the Internet.
- Application case
Operational analysis of IoT devices; Huawei IoT data analytics platform architecture:
- Smart city
Cities have more and more vehicles on the road, and apps such as AMAP collect ever more camera and traffic-flow data, yet roads keep getting more congested, so more and more cities are turning to big data technology for intelligent city management. In 2018, Hangzhou adopted an AI smart-city system: average travel speed increased by 15%, surveillance cameras raised up to 500 alerts per day with an identification accuracy above 92%, AI smart-city reports accounted for more than 95% of the total, and Hangzhou dropped from 5th to 57th in China's city traffic-congestion ranking.
- Financial risk control
Risk is inherent to the business of financial institutions and accompanies them from the start; a financial institution's profit comes from the risk premium earned for bearing risk. The six common risks in financial institutions are market risk, credit risk, liquidity risk, operational risk, reputational risk, and legal risk, of which market risk and credit risk are the most important. In an online lending workflow, a back-end big data system performs anti-fraud checks and credit assessment:
- E-commerce
Shopping data from e-commerce websites is analyzed in real time and summarized on large dashboards. For example, during Tmall's Double-Eleven shopping event, the order data of hundreds of millions of buyers nationwide is visualized and displayed dynamically in real time, including overview figures, streaming TopN data, and multi-dimensional regional statistics, greatly improving the readability of massive data. TopN ranking:
2. Flink Quick Start
Big data Flink overview; big data Flink getting-started example
3. Flink Connector Ecosystem
3.1 Flink Connectors
Flink connectors cover both data source input and sink output. Flink ships with some basic connectors: source input includes files, directories, sockets, and reading from collections and iterators; sink output supports writing data to files, standard output (stdout), standard error (stderr), and sockets (see the official documentation). Flink also supports extended connectors that interact with third-party systems; the following systems are currently supported:
Commonly used ones are Kafka, Elasticsearch, HDFS, and JDBC.
3.2 JDBC (Read/Write)
How is the Flink JDBC connector used? Functionality: write collection data into a database. Dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.11.2</version>
</dependency>
Code:
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
public class JDBCConnectorApplication {
public static void main(String[] args)throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
List<String> list = Arrays.asList(
"192.168.116.141\t1601297294548\tPOST\taddOrder",
"192.168.116.142\t1601297294549\tGET\tgetOrder"
);
env.fromCollection(list).addSink(JdbcSink.sink(
"insert into t_access_log(ip, time, type, api) values(?, ?, ?, ?)",
new JdbcStatementBuilder<String>() {
@Override
public void accept(PreparedStatement preparedStatement,
String s) throws SQLException {
System.out.println("receive ====> " + s);
String[] elements = String.valueOf(s).split("\t");
for (int i = 0; i < elements.length; i++) {
preparedStatement.setString(i+1, elements[i]);
}
}
},
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://192.168.116.141:3306/flink?useSSL=false")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("root")
.withPassword("123456")
.build()
));
env.execute("jdbc-job");
}
}
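By default the JDBC sink flushes with its built-in settings; flink-connector-jdbc 1.11 also exposes an overload of JdbcSink.sink that accepts JdbcExecutionOptions for explicit batching and retry control. A minimal sketch (the values are arbitrary examples) of options that could be passed as the third argument, just before the JdbcConnectionOptions builder in the code above:
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
// Build execution options and pass them as the third argument of JdbcSink.sink(...),
// right before the JdbcConnectionOptions shown above.
JdbcExecutionOptions executionOptions = JdbcExecutionOptions.builder()
        .withBatchSize(100)          // flush after 100 buffered records
        .withBatchIntervalMs(2000)   // or after 2 seconds, whichever comes first
        .withMaxRetries(3)           // retry a failed batch up to 3 times
        .build();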
Database table:
DROP TABLE IF EXISTS `t_access_log`;
CREATE TABLE `t_access_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'Primary key ID',
`ip` varchar(32) NOT NULL COMMENT 'IP address',
`time` varchar(255) NULL DEFAULT NULL COMMENT 'Access time',
`type` varchar(32) NOT NULL COMMENT 'Request type',
`api` varchar(32) NOT NULL COMMENT 'API path',
PRIMARY KEY (`id`)
) ENGINE = InnoDB AUTO_INCREMENT=1;
Custom sink. Functionality: read data from a socket and write it to the database as a stream. Code:
public class CustomSinkApplication {
public static void main(String[] args)throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> socketTextStream =
env.socketTextStream("192.168.116.141", 9911, "\n");
SingleOutputStreamOperator<AccessLog> outputStreamOperator =
socketTextStream.map(new MapFunction<String, AccessLog>() {
@Override
public AccessLog map(String s) throws Exception {
System.out.println(s);
String[] elements = s.split("\t");
AccessLog accessLog = new AccessLog();
accessLog.setNum(1);
for (int i = 0; i < elements.length; i++) {
if (i == 0) accessLog.setIp(elements[i]);
if (i == 1) accessLog.setTime(elements[i]);
if (i == 2) accessLog.setType(elements[i]);
if (i == 3) accessLog.setApi(elements[i]);
}
return accessLog;
}
});
outputStreamOperator.addSink(new MySQLSinkFunction());
env.execute("custom jdbc sink");
}
Custom sink function (nested inside the class above):
private static class MySQLSinkFunction extends RichSinkFunction<AccessLog>{
private Connection connection;
private PreparedStatement preparedStatement;
@Override
public void open(Configuration parameters) throws Exception {
String url = "jdbc:mysql://192.168.116.141:3306/flink?useSSL=false";
String username = "admin";
String password = "admin";
connection = DriverManager.getConnection(url, username, password);
String sql = "insert into t_access_log(ip, time, type, api) values(?, ?, ?, ?)";
preparedStatement=connection.prepareStatement(sql);
}
@Override
public void close() throws Exception {
try {
if (null != connection) connection.close();
connection=null;
}catch (Exception e){
e.printStackTrace();
}
}
@Override
public void invoke(AccessLog accessLog, Context context) throws Exception {
preparedStatement.setString(1,accessLog.getIp());
preparedStatement.setString(2,accessLog.getTime());
preparedStatement.setString(3,accessLog.getType());
preparedStatement.setString(4,accessLog.getApi());
preparedStatement.execute();
}
}
}
AccessLog:
@Data
public class AccessLog {
private String ip;
private String time;
private String type;
private String api;
private Integer num;
}
Test data (note: fields are separated by \t):
192.168.116.141 1603166893313 GET getOrder
192.168.116.142 1603166893314 POST addOrder
Custom source. Functionality: read data from the database and print the results. Code:
public class CustomSourceApplication {  // class name assumed; wraps the snippet below
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<AccessLog> streamSource = env.addSource(new
MySQLSourceFunction());
streamSource.print().setParallelism(1);
env.execute("custom jdbc source");
}
}
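The MySQLSourceFunction used above is not shown; a minimal sketch, assuming the t_access_log table from the JDBC sink example and a single full read of the table (not a definitive implementation):
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
public class MySQLSourceFunction extends RichSourceFunction<AccessLog> {
    private Connection connection;
    private PreparedStatement preparedStatement;
    private volatile boolean running = true;
    @Override
    public void open(Configuration parameters) throws Exception {
        // Connection settings mirror the earlier JDBC sink example (assumptions)
        String url = "jdbc:mysql://192.168.116.141:3306/flink?useSSL=false";
        connection = DriverManager.getConnection(url, "root", "123456");
        preparedStatement = connection.prepareStatement("select ip, time, type, api from t_access_log");
    }
    @Override
    public void run(SourceContext<AccessLog> ctx) throws Exception {
        // Emit every row of the table once, then finish
        ResultSet resultSet = preparedStatement.executeQuery();
        while (running && resultSet.next()) {
            AccessLog accessLog = new AccessLog();
            accessLog.setIp(resultSet.getString("ip"));
            accessLog.setTime(resultSet.getString("time"));
            accessLog.setType(resultSet.getString("type"));
            accessLog.setApi(resultSet.getString("api"));
            ctx.collect(accessLog);
        }
    }
    @Override
    public void cancel() {
        running = false;
    }
    @Override
    public void close() throws Exception {
        if (preparedStatement != null) preparedStatement.close();
        if (connection != null) connection.close();
    }
}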
3.3 HDFS (Read/Write)
Writing data to HDFS via a sink. Functionality: write the data received on a socket into HDFS files.
Dependencies:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.11</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.8.1</version>
</dependency>
Code:
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
public class HDFSSinkApplication {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> socketTextStream =
env.socketTextStream("192.168.116.141", 9911, "\n");
BucketingSink<String> bucketingSink = new BucketingSink<>("F:/oldlu/Flink/hdfs");
bucketingSink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HHmm"));
bucketingSink.setWriter(new StringWriter())
.setBatchSize(5 * 1024)
.setBatchRolloverInterval(5 * 1000)
.setInactiveBucketCheckInterval(30 * 1000)
.setInactiveBucketThreshold(60 * 1000);
socketTextStream.addSink(bucketingSink).setParallelism(1);
env.execute("flink hdfs source");
}
}
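BucketingSink is deprecated in Flink 1.11; as an alternative, here is a sketch using the newer StreamingFileSink with roughly equivalent rolling behavior (paths and thresholds mirror the example above and are assumptions):
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.util.concurrent.TimeUnit;
public class StreamingFileSinkApplication {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Row-format sink that writes each record as a UTF-8 line and rolls part files
        StreamingFileSink<String> fileSink = StreamingFileSink
                .forRowFormat(new Path("hdfs://192.168.116.141:9090/flink/access-log"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withRolloverInterval(TimeUnit.SECONDS.toMillis(5))   // roll every 5 seconds
                        .withInactivityInterval(TimeUnit.MINUTES.toMillis(1)) // or after 1 minute of inactivity
                        .withMaxPartSize(5 * 1024)                            // or once a part reaches 5 KB
                        .build())
                .build();
        env.socketTextStream("192.168.116.141", 9911, "\n")
                .addSink(fileSink)
                .setParallelism(1);
        env.execute("flink streaming file sink");
    }
}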
Mock data source implementation (a Netty server that pushes simulated access logs). Dependencies:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.16.Final</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.jdbc.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.1.1.RELEASE</version>
</dependency>
Code:
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
public static List<Channel> channelList = new ArrayList<>();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Server >>>> 连接已建立:" + ctx);
super.channelActive(ctx);
channelList.add(ctx.channel());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
System.out.println("Server >>>> 收到的消息:" + msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
System.out.println("Server >>>> 读取数据出现异常");
cause.printStackTrace();
ctx.close();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception
{
super.channelUnregistered(ctx);
channelList.remove(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
channelList.remove(ctx.channel());
}
}
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.util.Random;
public class SocketSourceApplication {
private int port;
public SocketSourceApplication(int port) {
this.port = port;
}
private static String[] accessIps = new String[]{
"192.168.116.141",
"192.168.116.142",
"192.168.116.143"
};
private static String[] accessTypes = new String[]{
"GET",
"POST",
"PUT"
};
private static String[] accessApis = new String[]{
"addOrder",
"getAccount",
"getOrder"
};
public void runServer() throws Exception {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel)
throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("decode", new StringDecoder());
pipeline.addLast("encode", new StringEncoder());
pipeline.addLast(new NettyServerHandler());
}
});
System.out.println(">>>>>server 启动<<<<<<<");
new Thread(new Runnable() {
@Override
public void run() {
try {
while (true) {
String accessLog = getAccessLog();
System.out.println("broadcast (" +
NettyServerHandler.channelList.size() + ") ==> " + accessLog);
if (NettyServerHandler.channelList.size() > 0) {
for (Channel channel :
NettyServerHandler.channelList) {
channel.writeAndFlush(accessLog);
}
}
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}
private String getAccessLog() {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(accessIps[new
Random().nextInt(accessIps.length)]).append("\t")
.append(System.currentTimeMillis()).append("\t")
.append(accessTypes[new
Random().nextInt(accessTypes.length)]).append("\t")
.append(accessApis[new
Random().nextInt(accessApis.length)]).append("\t\n");
return stringBuilder.toString();
}
public static void main(String[] args) throws Exception {
new SocketSourceApplication(9911).runServer();
}
}
Reading HDFS file data. Code:
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class HDFSSourceApplication {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> file =
env.readTextFile("hdfs://192.168.116.141:9090/hadoop-env.sh");
file.print().setParallelism(1);
env.execute("flink hdfs source");
}
}
Hadoop environment installation
- Configure passwordless SSH login
Generate the key pair:
[root@flink1 hadoop-2.6.0-cdh5.15.2]
Generating public/private rsa key pair.
Append the public key to the authorized-keys file:
[root@flink1 .ssh]
Adjust the permissions of the authorized-keys file:
[root@flink1 .ssh]
- Configure environment variables
Unpack the Hadoop archive and add Hadoop to the environment variables in /etc/profile:
export HADOOP_HOME=/opt/hadoop-2.6.0-cdh5.15.2
export PATH=$HADOOP_HOME/bin:$PATH
Apply the change:
source /etc/profile
- Modify the Hadoop configuration files
1) Edit hadoop-env.sh
vi /opt/hadoop-2.6.0-cdh5.15.2/etc/hadoop/hadoop-env.sh
Set JAVA_HOME:
export JAVA_HOME=/opt/jdk1.8.0_301
2) Edit core-site.xml
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://flink:9090</value>
</property>
</configuration>
The host name used here is flink. 3) Edit hdfs-site.xml
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/hadoop-2.6.0-cdh5.15.2/tmp</value>
</property>
</configuration>
4) Edit mapred-site.xml
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
5) Edit the slaves file
flink
This is a single-node setup, pointing to the local host name. 6) Edit yarn-site.xml
<configuration>
<!-- Site specific YARN configuration properties -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration>
- Start the Hadoop services
[root@flink hadoop-2.6.0-cdh5.15.2]
This script is Deprecated. Instead use start-dfs.sh and start-yarn.sh
21/08/23 11:59:17 WARN util.NativeCodeLoader: Unable to load native-
hadoop library for your platform... using builtin-java classes where
applicable
Starting namenodes on [flink]
flink: starting namenode, logging to /opt/hadoop-2.6.0-
cdh5.15.2/logs/hadoop-root-namenode-flink.out
flink: starting datanode, logging to /opt/hadoop-2.6.0-
cdh5.15.2/logs/hadoop-root-datanode-flink.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /opt/hadoop-2.6.0-
cdh5.15.2/logs/hadoop-root-secondarynamenode-flink.out
21/08/23 11:59:45 WARN util.NativeCodeLoader: Unable to load native-
hadoop library for your platform... using builtin-java classes where
applicable
starting yarn daemons
starting resourcemanager, logging to /opt/hadoop-2.6.0-
cdh5.15.2/logs/yarn-root-resourcemanager-flink.out
flink: starting nodemanager, logging to /opt/hadoop-2.6.0-
cdh5.15.2/logs/yarn-root-nodemanager-flink.out
Upload a file for testing:
hdfs dfs -put /opt/hadoop-2.6.0-cdh5.15.2/etc/hadoop/hadoop-env.sh /
If the upload fails: 1) the namenode may not have started, in which case run the following command
hadoop namenode -format
2) Check the /etc/hosts configuration:
[root@flink hadoop-2.6.0-cdh5.15.2]
127.0.0.1 localhost localhost.localdomain localhost4
localhost4.localdomain4
::1 localhost localhost.localdomain localhost6
localhost6.localdomain6
192.168.116.141 flink
192.168.116.141 localhost
- Access verification
3.4 Elasticsearch (Write)
Elasticsearch service installation
- Download version 6.8.1 as a .gz package from the official download page; do not use the latest version, since Spring Boot and other projects may not yet support it.
- Unpack the archive
tar -xvf elasticsearch-6.8.1-linux-x86_64.tar.gz
- ElasticSearch cannot run as root, so create a dedicated user
1. groupadd elsearch
2. useradd elsearch -g elsearch -p elasticsearch
3. chown -R elsearch:elsearch /opt/elasticsearch-6.8.1
The commands above create a user named elsearch and grant it ownership of the directory. 4. Edit the configuration file vi config/elasticsearch.yml; only the following settings need to change:
cluster.name: my-application
node.name: node-1
path.data: /opt/elasticsearch-6.8.1/data
path.logs: /opt/elasticsearch-6.8.1/logs
network.host: 192.168.116.141
http.port: 9200
transport.tcp.port: 9300
- Specify the JDK version
Newer ElasticSearch releases require JDK 11; download the JDK 11 archive and unpack it.
Edit the environment script vi bin/elasticsearch-env and, at the position shown below, add a line that sets JAVA_HOME to the JDK 11 path.
JAVA_HOME=/opt/jdk11
if [ ! -z "$JAVA_HOME" ]; then
JAVA="$JAVA_HOME/bin/java"
else
if [ "$(uname -s)" = "Darwin" ]; then
JAVA="$ES_HOME/jdk/Contents/Home/bin/java"
else
JAVA="$ES_HOME/jdk/bin/java"
fi
fi
ConcMarkSweepGC is discouraged since JDK 9; to suppress the warning, comment out the UseConcMarkSweepGC option in vi config/jvm.options:
...
...
6. Start ElasticSearch as the elsearch user, in daemon mode:
su elsearch
bin/elasticsearch -d
7. Troubleshooting. If startup fails with "max virtual memory areas vm.max_map_count [65530] is too low, increase to at least ...", modify the system configuration:
vi /etc/sysctl.conf
Add:
vm.max_map_count=655360
Apply the change:
sysctl -p
vi /etc/security/limits.conf
Append at the end of the file:
* soft nofile 65536
* hard nofile 131072
* soft nproc 2048
* hard nproc 4096
elsearch soft nproc 125535
elsearch hard nproc 125535
Then switch to the elsearch user again:
su - elsearch
Flink Elasticsearch write implementation. Functionality: write socket stream data into the Elasticsearch service. Dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>1.6.0</version>
</dependency>
Code:
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClientBuilder;
import java.util.ArrayList;
import java.util.HashMap;
public class ElasticSinkApplication {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> socketTextStream =
env.socketTextStream("localhost", 9911, "\n");
ArrayList<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("192.168.116.141", 9200, "http"));
ElasticsearchSink.Builder<String> esSinkBuilder = new
ElasticsearchSink.Builder<>(
httpHosts,
new ElasticsearchSinkFunction<String>() {
@Override
public void process(String s, RuntimeContext
runtimeContext, RequestIndexer requestIndexer) {
requestIndexer.add(createIndexRequest(s));
}
private IndexRequest createIndexRequest(String s) {
HashMap<String, String> map = new HashMap<>();
String[] elements = String.valueOf(s).split("\t");
for (int i = 0; i < elements.length; i++) {
if (i == 0) map.put("ip", elements[i]);
if (i == 1) map.put("time", elements[i]);
if (i == 2) map.put("type", elements[i]);
if (i == 3) map.put("api", elements[i]);
}
return Requests.indexRequest()
.index("flink-es")
.type("access-log")
.source(map);
}
});
esSinkBuilder.setBulkFlushMaxActions(1);
esSinkBuilder.setRestClientFactory(new RestClientFactory() {
@Override
public void configureRestClientBuilder(RestClientBuilder
restClientBuilder) {
restClientBuilder.setMaxRetryTimeoutMillis(5000);
}
});
socketTextStream.addSink(esSinkBuilder.build());
socketTextStream.print().setParallelism(1);
env.execute("flink es sink");
}
}
Check index information: http://192.168.116.141:9200/_cat/indices?v  Check the stored documents: http://192.168.116.141:9200/flink-es/_search
3.5 Kafka (Read/Write)
Kafka installation
- Download the kafka_2.12-1.1.1 package
- Unpack the archive
tar -xvf kafka_2.12-1.1.1.tgz
- Modify the Kafka configuration
Only the bind IP needs to change; since this is a single node, keep the other defaults.
[root@flink kafka_2.12-1.1.1]
listeners=PLAINTEXT://192.168.116.141:9092
advertised.listeners=PLAINTEXT://192.168.116.141:9092
If the machine has multiple IP addresses, bind the externally reachable one.
- Start the ZooKeeper service: the Kafka package ships with a built-in ZooKeeper that can be started directly.
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
- Start the Kafka service
bin/kafka-server-start.sh -daemon config/server.properties
Flink Kafka read functionality. Functionality: read data from a Kafka topic with Flink and print it. Dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.11.2</version>
</dependency>
Code:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaSourceApplication {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.116.141:9092");
properties.setProperty("group.id", "flink_group");
FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer(
"flink-source",
new SimpleStringSchema(),
properties
);
DataStreamSource dataStreamSource =
env.addSource(flinkKafkaConsumer);
dataStreamSource.print().setParallelism(1);
env.execute("Flink kafka source");
}
}
Verify with the Kafka console producer command:
[root@flink kafka_2.12-1.1.1]
Extension point: consumption start strategies for Kafka messages:
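These strategies are configured on the flinkKafkaConsumer created above; a short sketch of the common options (choose one before env.addSource):
// Start-position strategies on FlinkKafkaConsumer:
flinkKafkaConsumer.setStartFromGroupOffsets();            // default: resume from committed group offsets
flinkKafkaConsumer.setStartFromEarliest();                // read the topic from the beginning
flinkKafkaConsumer.setStartFromLatest();                  // only read records produced after startup
flinkKafkaConsumer.setStartFromTimestamp(1601297294548L); // start from the given epoch milliseconds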
Flink Kafka write functionality. Functionality: write socket stream data into a Kafka topic with Flink. Code:
public class KafkaSinkApplication {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> socketTextStream =
env.socketTextStream("localhost", 9911, "\n");
FlinkKafkaProducer flinkKafkaProducer = new FlinkKafkaProducer(
"192.168.116.141:9092",
"flink-topic",
new SimpleStringSchema()
);
socketTextStream.addSink(flinkKafkaProducer);
socketTextStream.print().setParallelism(1);
env.execute("flink kafka sink");
}
}
Verify with the Kafka console consumer command:
[root@flink kafka_2.12-1.1.1]
Controlling the delivery semantics of produced messages:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.116.141:9092");
FlinkKafkaProducer flinkKafkaProducer = new FlinkKafkaProducer(
"flink-topic",
new KeyedSerializationSchemaWrapper(new
SimpleStringSchema()),
properties,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE
);
Three delivery semantics are provided:
- Semantic.NONE: Flink gives no guarantees; records may be lost or duplicated.
- Semantic.AT_LEAST_ONCE (default): similar to setFlushOnCheckpoint(true) in the FlinkKafkaProducer010 version; no records are lost, although they may be duplicated.
- Semantic.EXACTLY_ONCE: uses Kafka transactions to provide exactly-once semantics. Whenever you write to Kafka transactionally, remember to set the required isolation.level (read_committed or read_uncommitted, the latter being the default) for every application that consumes from Kafka.
- Kafka messages can carry a timestamp indicating either when the event occurred or when the message was written to the Kafka broker:
kafkaProducer.setWriteTimestampToKafka(true);
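On the consuming side of an EXACTLY_ONCE producer, the isolation.level mentioned above is set on the consumer properties; a minimal sketch:
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "192.168.116.141:9092");
consumerProps.setProperty("group.id", "flink_group");
// Only read messages from committed Kafka transactions (the default is read_uncommitted)
consumerProps.setProperty("isolation.level", "read_committed");
FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("flink-topic", new SimpleStringSchema(), consumerProps);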
3.6 Custom Serialization (Protobuf)
Real applications involve complex transfer objects and demand high transfer performance, which calls for a custom serialization implementation; Protobuf is used here as the example. Functionality: produce to and consume from the same Kafka topic, using Protobuf for serialization and deserialization, and verify that the data can be parsed correctly.
- Generate the Java file from the protobuf definition
syntax = "proto3";
option java_package = "cn.flink.connector.kafka.proto";
option java_outer_classname = "AccessLogProto";
message AccessLog {
string ip = 1;
string time = 2;
string type = 3;
string api = 4;
string num = 5;
}
Generate the Java file with a batch script:
@echo off
for %%i in (proto/*.proto) do (
F:/oldlu/Flink/tar/protoc.exe --proto_path=./proto --java_out=../java
./proto/%%i
echo generate %%i to java file successfully!
)
Note: the paths must be configured correctly. 2. Custom serialization implementation. Add the POM dependencies:
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.8.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>5.1.8.RELEASE</version>
</dependency>
</dependencies>
The AccessLog object:
import lombok.Data;
import java.io.Serializable;
@Data
public class AccessLog implements Serializable {
private String ip;
private String time;
private String type;
private String api;
private Integer num;
}
After code generation, a serialization/deserialization schema is implemented for the AccessLog object. CustomSerialSchema:
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.springframework.beans.BeanUtils;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import static org.apache.flink.util.Preconditions.checkNotNull;
public class CustomSerialSchema implements DeserializationSchema<AccessLog>,
SerializationSchema<AccessLog> {
private static final long serialVersionUID = -7319637733955723488L;
private transient Charset charset;
public CustomSerialSchema() {
this(StandardCharsets.UTF_8);
}
public CustomSerialSchema(Charset charset) {
this.charset = checkNotNull(charset);
}
public Charset getCharset() {
return charset;
}
@Override
public AccessLog deserialize(byte[] bytes) throws IOException {
AccessLog accessLog = null;
try {
AccessLogProto.AccessLog accessLogProto =
AccessLogProto.AccessLog.parseFrom(bytes);
accessLog = new AccessLog();
BeanUtils.copyProperties(accessLogProto, accessLog);
return accessLog;
} catch (Exception e) {
e.printStackTrace();
}
return accessLog;
}
@Override
public boolean isEndOfStream(AccessLog accessLog) {
return false;
}
@Override
public byte[] serialize(AccessLog accessLog) {
AccessLogProto.AccessLog.Builder builder =
AccessLogProto.AccessLog.newBuilder();
BeanUtils.copyProperties(accessLog, builder);
return builder.build().toByteArray();
}
@Override
public TypeInformation<AccessLog> getProducedType() {
return TypeInformation.of(AccessLog.class);
}
}
3. Flink implementation of the Kafka message producer
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;
public class KafkaProtobufProducerApplication {  // class name assumed; the original omits the declaration
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> socketTextStream =
env.socketTextStream("localhost", 9911, "\n");
SingleOutputStreamOperator<AccessLog> outputStreamOperator =
socketTextStream.map(new MapFunction<String, AccessLog>() {
@Override
public AccessLog map(String value) throws Exception {
System.out.println(value);
String[] arrValue = value.split("\t");
AccessLog accessLog = new AccessLog();
accessLog.setNum(1);
for (int i = 0; i < arrValue.length; i++) {
if (i == 0) accessLog.setIp(arrValue[i]);
if (i == 1) accessLog.setTime(arrValue[i]);
if (i == 2) accessLog.setType(arrValue[i]);
if (i == 3) accessLog.setApi(arrValue[i]);
}
return accessLog;
}
});
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.116.141:9092");
FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer(
"192.168.116.141:9092",
"flink-serial",
new CustomSerialSchema()
);
outputStreamOperator.addSink(kafkaProducer);
socketTextStream.print().setParallelism(1);
env.execute("flink kafka protobuf sink");
}
}
Open a Kafka console consumer terminal to verify that the producer works:
[root@flink1 kafka_2.12-1.1.1]
server 192.168.116.141:9092 --topic flink-serial
1601649380422GET"
getAccount
1601649381422POSTaddOrder
1601649382422POST"
- Flink implementation of the Kafka message consumer
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaProtobufConsumerApplication {  // class name assumed; the original omits the declaration
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.116.141:9092");
properties.setProperty("group.id", "flink_group");
FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer(
"flink-serial",
new CustomSerialSchema(),
properties);
DataStreamSource<AccessLog> dataStreamSource =
env.addSource(flinkKafkaConsumer);
dataStreamSource.print().setParallelism(1);
env.execute("flink kafka protobuf source");
}
}
Send messages through the Flink Kafka producer to verify that the consumer works.
4. Flink Dashboard Data in Practice
4.1 Double-Eleven Dashboard Data
Total sales volume / total sales amount. TopN: hot-selling goods / goods categories / goods PV / goods UV.
Sales ranking by region; sales ranking by category.
4.2 Canal Synchronization Service Installation
- Download the packages
Deployer package and admin (management) package.
- Unpack
Unpack the deployer package:
mkdir -p /opt/canal
tar -xzvf canal.deployer-1.1.4.tar.gz -C /opt/canal/
Unpack the admin package:
mkdir -p /opt/canal-admin
tar -xvf canal.admin-1.1.4.tar.gz -C /opt/canal-admin
- Initialize the management database
Import the initialization script:
mysql -uroot -p123456 < /opt/canal-admin/conf/canal_manager.sql
- Modify the MySQL replication configuration
Edit the configuration file:
vi /etc/my.cnf
Add the binlog settings:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
Restart the service:
systemctl restart mariadb
Check whether binlog is enabled:
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
1 row in set (0.01 sec)
Create the replication user:
mysql> FLUSH PRIVILEGES;
mysql> CREATE USER canal IDENTIFIED BY 'canal';
Grant the privileges required for replication:
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO
'canal'@'%';
Query OK, 0 rows affected (0.00 sec)
mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.00 sec)
- Modify the admin configuration file
vi /opt/canal-admin/conf/application.yml
Configuration content:
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
address: 192.168.116.141:3306
database: canal_manager
username: root
password: 123456
driver-class-name: com.mysql.jdbc.Driver
url:
jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?
useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin
adminPasswd: admin
Start the admin service first, then the Canal server. Admin service startup command:
/opt/canal-admin/bin/startup.sh
Visit http://192.168.116.141:8089/ and log in with admin/123456. 6. Canal server configuration
vi /opt/canal/conf/canal_local.properties
Configuration content:
canal.register.ip = 192.168.116.141
canal.admin.manager = 192.168.116.141:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
canal.admin.register.auto = true
canal.admin.register.cluster =
Start the Canal server:
/opt/canal/bin/startup.sh local
- Admin console configuration
Modify the Server configuration:
canal.zkServers = 192.168.116.141:2181
canal.serverMode = kafka
canal.mq.servers = 192.168.116.141:9092
Modify the Instance configuration (if none exists, create one from the template):
canal.instance.mysql.slaveId=121
canal.instance.master.address=192.168.116.141:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.mq.topic=order_binlog
canal.instance.filter.regex=flink.t_order
Regex rules for the synchronization filter; common examples:
- All tables: .*  or  .*\\..*
- All tables under the canal schema: canal\\..*
- Tables under canal whose names start with canal: canal\\.canal.*
- A single table under the canal schema: canal.test1
- Multiple rules combined: canal\\..*,mysql.test1,mysql.test2 (comma-separated)
4.3 Hot-Selling Goods Statistics
Implementation flow:
- Implement the order data source
- Implement the Flink job logic
- Integrate Flink with Spring Boot (a sketch of the bean-lookup helper used later follows this list)
- Test and verify against the comparison SQL:
select goodsId, sum(execPrice * execVolume) as totalAmount from t_order
where execTime < <end timestamp of the time window> group by goodsId order by totalAmount
desc
- Present the data
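The job code below fetches Spring beans inside Flink functions via ApplicationContextUtil.getBean(...), which is not shown in this section; a minimal sketch of what such a helper is assumed to look like:
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class ApplicationContextUtil implements ApplicationContextAware {
    private static ApplicationContext applicationContext;
    @Override
    public void setApplicationContext(ApplicationContext context) throws BeansException {
        // Spring injects the context at startup; keep a static handle so that
        // objects not managed by Spring (e.g. Flink functions) can look up beans.
        applicationContext = context;
    }
    public static Object getBean(String beanName) {
        return applicationContext.getBean(beanName);
    }
}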
Console consumers for the binlog topics:
[root@flink kafka_2.12-1.1.1]
192.168.116.141:9092 --topic order_binlog
[root@flink kafka_2.12-1.1.1]
192.168.116.141:9092 --topic orderAddress_binlog
[root@flink kafka_2.12-1.1.1]
192.168.116.141:9092 --topic orderPayment_binlog
Deleting the Kafka topics:
[root@flink kafka_2.12-1.1.1]
delete.topic.enable=true
[root@flink kafka_2.12-1.1.1]
order_binlog --zookeeper 192.168.116.141:2181
[root@flink kafka_2.12-1.1.1]
orderAddress_binlog --zookeeper 192.168.116.141:2181
[root@flink kafka_2.12-1.1.1]
orderPayment_binlog --zookeeper 192.168.116.141:2181
import cn.oldlu.flink.screen.database.bo.HotOrder;
import cn.oldlu.flink.screen.database.bo.Order;
import cn.oldlu.flink.screen.database.json.GsonConvertUtil;
import cn.oldlu.flink.screen.database.repository.HotOrderRepository;
import cn.oldlu.flink.screen.database.spring.ApplicationContextUtil;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.springframework.boot.Banner;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import java.util.*;
@SpringBootApplication
@ComponentScan(basePackages = {"cn.oldlu"})
@EnableTransactionManagement
public class ScreenDatabaseApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication application = new SpringApplication(ScreenDatabaseApplication.class);
application.setBannerMode(Banner.Mode.OFF);
application.run(args);
}
@Override
public void run(String... args) throws Exception {
executeFlinkTask();
}
private void executeFlinkTask() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.116.141:9092");
props.setProperty("group.id", "flink_group");
FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<String>(
"order_binlog",
new SimpleStringSchema(),
props
);
flinkKafkaConsumer.setStartFromEarliest();
DataStreamSource<String> orderDataStreamSource = env.addSource(flinkKafkaConsumer);
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
orderDataStreamSource.filter(new FilterFunction<String>() {
@Override
public boolean filter(String orderStr) throws Exception {
JsonObject jsonObject = GsonConvertUtil.getSingleton().getJsonObject(orderStr);
boolean isDdl = jsonObject.get("isDdl").getAsBoolean();
String type = jsonObject.get("type").getAsString();
return !isDdl && "insert".equalsIgnoreCase(type);
}
})
.flatMap(new FlatMapFunction<String, Order>() {
@Override
public void flatMap(String orderKafkaStr, Collector<Order> collector) throws Exception {
JsonArray data = GsonConvertUtil.getSingleton().getJsonObject(orderKafkaStr).getAsJsonArray("data");
for (int i = 0; i < data.size(); i++) {
JsonObject asJsonObject = data.get(i).getAsJsonObject();
Order order = GsonConvertUtil.getSingleton().cvtJson2Obj(asJsonObject, Order.class);
System.out.println("order >> " + order);
collector.collect(order);
}
}
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.seconds(0)) {
@Override
public long extractTimestamp(Order order) {
return order.getExecTime();
}
})
.keyBy(Order::getGoodsId)
.timeWindow(Time.hours(24), Time.seconds(3))
.aggregate(
new AggregateFunction<Order, Order, Order>() {
@Override
public Order createAccumulator() {
Order order = new Order();
order.setTotalAmount(0L);
return order;
}
@Override
public Order add(Order order, Order order2) {
order2.setTotalAmount(
order2.getTotalAmount() + order.getExecPrice() * order.getExecVolume());
order2.setGoodsId(order.getGoodsId());
return order2;
}
@Override
public Order getResult(Order order) {
return order;
}
@Override
public Order merge(Order order, Order acc1) {
return null;
}
},
new WindowFunction<Order, HotOrder, Long, TimeWindow>() {
@Override
public void apply(Long goodsId, TimeWindow timeWindow, Iterable<Order> iterable, Collector<HotOrder> collector) throws Exception {
Order order = iterable.iterator().next();
collector.collect(new HotOrder(goodsId, order.getGoodsName(), order.getTotalAmount(), timeWindow.getEnd()));
}
})
.keyBy(HotOrder::getTimeWindow)
.process(new KeyedProcessFunction<Long, HotOrder, String>() {
private ListState<HotOrder> hotOrderListState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
hotOrderListState = getRuntimeContext().getListState(
new ListStateDescriptor<HotOrder>("hot-order", HotOrder.class)
);
}
@Override
public void processElement(HotOrder hotOrder, Context context, Collector<String> collector) throws Exception {
hotOrderListState.add(hotOrder);
context.timerService().registerEventTimeTimer(hotOrder.getTimeWindow());
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
ArrayList<HotOrder> hotOrders = new ArrayList<>();
hotOrderListState.get().forEach(hotOrder -> hotOrders.add(hotOrder));
hotOrders.sort(Comparator.comparing(HotOrder::getTotalAmount).reversed());
hotOrderListState.clear();
HotOrderRepository hotOrderRepository = (HotOrderRepository) ApplicationContextUtil.getBean("hotOrderRepository");
hotOrders.forEach(hotOrder -> {
hotOrder.setId(hotOrder.getGoodsId());
hotOrder.setCreateDate(new Date(hotOrder.getTimeWindow()));
hotOrderRepository.save(hotOrder);
System.out.println("ES hotOrder" + hotOrder);
});
}
});
env.execute("es hotOrder");
}
}
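GsonConvertUtil is used above to parse the Canal binlog JSON, but its source is not shown; a minimal sketch, assuming it is a thin singleton wrapper around Gson exposing the two methods used in the job:
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
public class GsonConvertUtil {
    private static final GsonConvertUtil SINGLETON = new GsonConvertUtil();
    private final Gson gson = new Gson();
    public static GsonConvertUtil getSingleton() {
        return SINGLETON;
    }
    // Parse a raw JSON string (one Canal binlog message) into a JsonObject
    public JsonObject getJsonObject(String json) {
        return new JsonParser().parse(json).getAsJsonObject();
    }
    // Map a JsonElement (one row of the binlog "data" array) onto a POJO
    public <T> T cvtJson2Obj(JsonElement jsonElement, Class<T> clazz) {
        return gson.fromJson(jsonElement, clazz);
    }
}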
Kibana service installation. Kibana is an open-source analytics and visualization platform for Elasticsearch, used to search and view data stored in Elasticsearch indices.
- Download the Kibana package matching version 6.8.1 from the official site, choose the Linux 64-bit build, and unpack it.
- Kibana cannot be started as root; grant ownership to the elsearch user created above:
chown -R elsearch:elsearch kibana-6.8.1-linux-x86_64
- Modify the configuration file
vi config/kibana.yml and change the following settings:
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://192.168.116.141:9200"]
- Start Kibana
./kibana -q
The following log line indicates a successful start:
log [01:40:00.143] [info][listening] Server running at http://0.0.0.0:5601
If startup fails, check the logs of each cluster node and make sure the services are running normally.
4.4 Regional Category Statistics
- Add the order address data source
- Create the corresponding table and entities
Entity: OrderAddress. BO: JoinOrderAddress (the merged order + address object). BO: HotDimensionOrder (the mapping object stored in ES); note the uniqueness of its ID: when aggregating by province the ID stores the province, and when aggregating by city it stores the city.
- Rework the order data source to write into a cache, and have the address data source read from that cache.
- Modify the Canal admin configuration to add a listening queue for the address data source.
- Core implementation of the dual-stream regional statistics:
1) Add Kafka configuration for both streams, each listening on its own queue. 2) Assign watermarks to each stream and set a time window slightly smaller than the aggregation window used later. 3) Join the streams on the order ID. 4) Aggregate by region (province, city). 5) Write the results to ES.
- Test and verify
Verification SQL:
select province, goodsId, sum(execPrice * execVolume) totalAmount from
t_order odr left join t_order_address adr on odr.id = adr.orderId where
odr.execTime < <end time of the time window>
group by province, goodsId order by province, totalAmount desc
import cn.oldlu.flink.screen.database.bo.HotDimensionOrder;
import cn.oldlu.flink.screen.database.bo.HotOrder;
import cn.oldlu.flink.screen.database.bo.JoinOrderAddress;
import cn.oldlu.flink.screen.database.bo.Order;
import cn.oldlu.flink.screen.database.json.GsonConvertUtil;
import cn.oldlu.flink.screen.database.pojo.OrderAddress;
import cn.oldlu.flink.screen.database.repository.HotDimensionRepository;
import cn.oldlu.flink.screen.database.repository.HotOrderRepository;
import cn.oldlu.flink.screen.database.spring.ApplicationContextUtil;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.springframework.boot.Banner;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import java.util.*;
@SpringBootApplication
@ComponentScan(basePackages = {"cn.oldlu"})
@EnableTransactionManagement
public class ScreenDimensionApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication application = new SpringApplication(ScreenDimensionApplication.class);
application.setBannerMode(Banner.Mode.OFF);
application.run(args);
}
@Override
public void run(String... args) throws Exception {
executeFlinkTask();
}
private void executeFlinkTask() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.116.141:9092");
props.setProperty("group.id", "flink_group");
FlinkKafkaConsumer<String> orderKafkaConsumer = new FlinkKafkaConsumer<String>(
"order_binlog",
new SimpleStringSchema(),
props
);
orderKafkaConsumer.setStartFromEarliest();
DataStreamSource<String> orderDataStreamSource = env.addSource(orderKafkaConsumer);
FlinkKafkaConsumer<String> addressKafkaConsumer = new FlinkKafkaConsumer<String>(
"orderAddress_binlog",
new SimpleStringSchema(),
props
);
addressKafkaConsumer.setStartFromEarliest();
DataStreamSource<String> addressDataStreamSource = env.addSource(addressKafkaConsumer);
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
SingleOutputStreamOperator<Order> orderOperator = orderDataStreamSource.filter(new FilterFunction<String>() {
@Override
public boolean filter(String orderKafkaStr) throws Exception {
JsonObject jsonObject = GsonConvertUtil.getSingleton().getJsonObject(orderKafkaStr);
String isDdl = jsonObject.get("isDdl").getAsString();
String type = jsonObject.get("type").getAsString();
return "false".equalsIgnoreCase(isDdl) && "insert".equalsIgnoreCase(type);
}
})
.flatMap(new FlatMapFunction<String, Order>() {
@Override
public void flatMap(String orderKafkaStr, Collector<Order> collector) throws Exception {
JsonArray data = GsonConvertUtil.getSingleton().getJsonObject(orderKafkaStr).getAsJsonArray("data");
for (int i = 0; i < data.size(); i++) {
JsonObject asJsonObject = data.get(i).getAsJsonObject();
Order order = GsonConvertUtil.getSingleton().cvtJson2Obj(asJsonObject, Order.class);
System.out.println("order >> " + order);
collector.collect(order);
}
}
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.seconds(0)) {
@Override
public long extractTimestamp(Order order) {
return order.getExecTime();
}
});
SingleOutputStreamOperator<OrderAddress> addressOperator = addressDataStreamSource.filter(new FilterFunction<String>() {
@Override
public boolean filter(String orderKafkaStr) throws Exception {
JsonObject jsonObject = GsonConvertUtil.getSingleton().getJsonObject(orderKafkaStr);
String isDdl = jsonObject.get("isDdl").getAsString();
String type = jsonObject.get("type").getAsString();
return "false".equalsIgnoreCase(isDdl) && "insert".equalsIgnoreCase(type);
}
})
.flatMap(new FlatMapFunction<String, OrderAddress>() {
@Override
public void flatMap(String orderKafkaStr, Collector<OrderAddress> collector) throws Exception {
JsonArray data = GsonConvertUtil.getSingleton().getJsonObject(orderKafkaStr).getAsJsonArray("data");
for (int i = 0; i < data.size(); i++) {
JsonObject asJsonObject = data.get(i).getAsJsonObject();
OrderAddress orderAddress = GsonConvertUtil.getSingleton().cvtJson2Obj(asJsonObject, OrderAddress.class);
System.out.println("orderAddress >> " + orderAddress);
collector.collect(orderAddress);
}
}
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderAddress>(Time.seconds(0)) {
@Override
public long extractTimestamp(OrderAddress orderAddress) {
return orderAddress.getExecTime();
}
});
orderOperator.join(addressOperator)
.where(Order::getId).equalTo(OrderAddress::getOrderId)
.window(TumblingEventTimeWindows.of(Time.seconds(2)))
.apply(new JoinFunction<Order, OrderAddress, JoinOrderAddress>() {
@Override
public JoinOrderAddress join(Order order, OrderAddress orderAddress) throws Exception {
return JoinOrderAddress.build(order, orderAddress);
}
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<JoinOrderAddress>(Time.seconds(0)) {
@Override
public long extractTimestamp(JoinOrderAddress joinOrderAddress) {
return joinOrderAddress.getExecTime();
}
})
.keyBy(new KeySelector<JoinOrderAddress, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> getKey(JoinOrderAddress joinOrderAddress) throws Exception {
return Tuple2.of(joinOrderAddress.getProvince(), joinOrderAddress.getGoodsId());
}
})
.timeWindow(Time.hours(24), Time.seconds(3))
.aggregate(
new AggregateFunction<JoinOrderAddress, JoinOrderAddress, JoinOrderAddress>() {
@Override
public JoinOrderAddress createAccumulator() {
JoinOrderAddress joinOrderAddress = new JoinOrderAddress();
joinOrderAddress.setTotalAmount(0L);
return joinOrderAddress;
}
@Override
public JoinOrderAddress add(JoinOrderAddress joinOrderAddress, JoinOrderAddress joinOrderAddress2) {
joinOrderAddress2.setTotalAmount(
joinOrderAddress2.getTotalAmount() + joinOrderAddress.getExecPrice() * joinOrderAddress.getExecVolume()
);
joinOrderAddress2.setProvince(joinOrderAddress.getProvince());
joinOrderAddress2.setGoodsId(joinOrderAddress.getGoodsId());
return joinOrderAddress2;
}
@Override
public JoinOrderAddress getResult(JoinOrderAddress joinOrderAddress) {
return joinOrderAddress;
}
@Override
public JoinOrderAddress merge(JoinOrderAddress joinOrderAddress, JoinOrderAddress acc1) {
return null;
}
},
new WindowFunction<JoinOrderAddress, HotDimensionOrder, Tuple2<String, Long>, TimeWindow>() {
@Override
public void apply(Tuple2<String, Long> stringLongTuple2, TimeWindow timeWindow,
Iterable<JoinOrderAddress> iterable, Collector<HotDimensionOrder> collector) throws Exception {
JoinOrderAddress joinOrderAddress = iterable.iterator().next();
collector.collect(new HotDimensionOrder(joinOrderAddress, timeWindow.getEnd()));
}
})
.keyBy(HotDimensionOrder::getTimeWindow)
.process(new KeyedProcessFunction<Long, HotDimensionOrder, String>() {
private ListState<HotDimensionOrder> hotDimensionOrderListState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
hotDimensionOrderListState = getRuntimeContext().getListState(
new ListStateDescriptor<HotDimensionOrder>("hot-dimension-order", HotDimensionOrder.class)
);
}
@Override
public void processElement(HotDimensionOrder hotDimensionOrder, Context context, Collector<String> collector) throws Exception {
hotDimensionOrderListState.add(hotDimensionOrder);
context.timerService().registerEventTimeTimer(hotDimensionOrder.getTimeWindow());
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
ArrayList<HotDimensionOrder> hotDimensionOrders = new ArrayList<>();
hotDimensionOrderListState.get().forEach(hotDimensionOrder -> hotDimensionOrders.add(hotDimensionOrder));
hotDimensionOrders.sort(Comparator.comparing(HotDimensionOrder::getProvince)
.thenComparing(HotDimensionOrder::getGoodsId, Comparator.reverseOrder()));
hotDimensionOrderListState.clear();
HotDimensionRepository hotDimensionRepository = (HotDimensionRepository) ApplicationContextUtil.getBean("hotDimensionRepository");
hotDimensionOrders.forEach(hotDimensionOrder -> {
hotDimensionOrder.setId(hotDimensionOrder.getProvince()+hotDimensionOrder.getGoodsId());
hotDimensionOrder.setCreateDate(new Date(hotDimensionOrder.getTimeWindow()));
hotDimensionRepository.save(hotDimensionOrder);
System.out.println("es hotDimensionOrder >> " + hotDimensionOrder);
});
}
});
env.execute("es hotDimensionOrder");
}
}
4.5 Order Status Monitoring Statistics (CEP)
- Add the order payment flow data source
- Create the corresponding table and entity
Entity: OrderPayment. BO: JoinOrderAddress.
- Modify the Canal admin configuration to add a listening queue for the payment data source.
- Core implementation:
1) Implement the listener for the order payment stream. 2) Define the CEP rule that extracts successfully paid orders.
- Test and verify
Check orders whose status went from unpaid to paid:
select * from t_order_payment pay where exists (
select 1 from t_order_payment tmp where tmp.orderId = pay.orderId and
tmp.status = 0
) and pay.status = 1
Check timed-out data: orders whose initial status is 0 and for which no paid record arrives within the specified time. 6. Extension: a hot-selling goods ranking that only counts successfully paid orders.
public class OrderPaymentCepApplication {  // class and method wrapper assumed; the original snippet starts mid-method
public void executeFlinkTask() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.116.141:9092");
props.setProperty("group.id", "flink_group");
FlinkKafkaConsumer<String> orderPaymentKafkaConsumer = new FlinkKafkaConsumer<String>(
"orderPayment_binlog",
new SimpleStringSchema(),
props
);
orderPaymentKafkaConsumer.setStartFromEarliest();
DataStreamSource<String> orderPaymentDataStreamSource = env.addSource(orderPaymentKafkaConsumer);
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
KeyedStream<OrderPayment, Long> orderPaymentLongKeyedStream = orderPaymentDataStreamSource.filter(new FilterFunction<String>() {
@Override
public boolean filter(String orderKafkaStr) throws Exception {
JsonObject jsonObject = GsonConvertUtil.getSingleton().getJsonObject(orderKafkaStr);
String isDdl = jsonObject.get("isDdl").getAsString();
String type = jsonObject.get("type").getAsString();
return "false".equalsIgnoreCase(isDdl) && "insert".equalsIgnoreCase(type);
}
})
.flatMap(new FlatMapFunction<String, OrderPayment>() {
@Override
public void flatMap(String orderKafkaStr, Collector<OrderPayment> collector) throws Exception {
JsonArray data = GsonConvertUtil.getSingleton().getJsonObject(orderKafkaStr).getAsJsonArray("data");
for (int i = 0; i < data.size(); i++) {
JsonObject asJsonObject = data.get(i).getAsJsonObject();
OrderPayment orderPayment = GsonConvertUtil.getSingleton().cvtJson2Obj(asJsonObject, OrderPayment.class);
System.out.println("orderPayment >> " + orderPayment);
collector.collect(orderPayment);
}
}
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderPayment>(Time.seconds(0)) {
@Override
public long extractTimestamp(OrderPayment orderPayment) {
return orderPayment.getUpdateTime();
}
})
.keyBy(OrderPayment::getOrderId);
Pattern<OrderPayment, ?> pattern = Pattern.<OrderPayment>begin("begin").where(new SimpleCondition<OrderPayment>() {
@Override
public boolean filter(OrderPayment orderPayment) throws Exception {
return orderPayment.getStatus() == 0;
}
}).next("next").where(new SimpleCondition<OrderPayment>() {
@Override
public boolean filter(OrderPayment orderPayment) throws Exception {
return orderPayment.getStatus() == 1;
}
}).within(Time.seconds(15));
PatternStream<OrderPayment> patternStream = CEP.pattern(orderPaymentLongKeyedStream, pattern);
OutputTag orderExpired = new OutputTag<OrderPayment>("orderExpired"){};
SingleOutputStreamOperator<OrderPaymentResult> select = patternStream.select(orderExpired, new PatternTimeoutFunction<OrderPayment, OrderPaymentResult>() {
@Override
public OrderPaymentResult timeout(Map<String, List<OrderPayment>> map, long l) throws Exception {
OrderPaymentResult orderPaymentResult = new OrderPaymentResult();
OrderPayment orderPayment = map.get("begin").iterator().next();
orderPaymentResult.setOrderId(orderPayment.getOrderId());
orderPaymentResult.setStatus(orderPayment.getStatus());
orderPaymentResult.setUpdateTime(orderPayment.getUpdateTime());
orderPaymentResult.setMessage("payment timed out");
return orderPaymentResult;
}
}, new PatternSelectFunction<OrderPayment, OrderPaymentResult>() {
@Override
public OrderPaymentResult select(Map<String, List<OrderPayment>> map) throws Exception {
OrderPaymentResult orderPaymentResult = new OrderPaymentResult();
OrderPayment orderPayment = map.get("next").iterator().next();
orderPaymentResult.setOrderId(orderPayment.getOrderId());
orderPaymentResult.setStatus(orderPayment.getStatus());
orderPaymentResult.setUpdateTime(orderPayment.getUpdateTime());
orderPaymentResult.setMessage("payment succeeded");
return orderPaymentResult;
}
});
select.print("payed");
env.execute("payed job");
}
}
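The timed-out payments matched by the pattern go to the orderExpired side output declared above; to also inspect them, the side output can be read before env.execute (a small assumed addition):
// Orders that never reached status 1 within 15 seconds end up in the side output
select.getSideOutput(orderExpired).print("expired");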
4.6 Goods UV Statistics
Functionality: count the UV (unique visitors) of each goods item over a period of time. Core code:
import cn.oldlu.flink.screen.database.bo.GoodsAccessLog;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.springframework.boot.Banner;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
@SpringBootApplication
@ComponentScan(basePackages = {"cn.oldlu"})
@EnableTransactionManagement
public class ScreenUniqueVisitorApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication application = new SpringApplication(ScreenUniqueVisitorApplication.class);
application.setBannerMode(Banner.Mode.OFF);
application.run(args);
}
@Override
public void run(String... args) throws Exception {
executeFlinkTask();
}
public void executeFlinkTask() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setParallelism(1);
DataStreamSource<String> goodsAccessDataStreamSource = env.readTextFile("data/goods_access.log");
goodsAccessDataStreamSource.flatMap(new FlatMapFunction<String, GoodsAccessLog>() {
@Override
public void flatMap(String goodsAccessStr, Collector<GoodsAccessLog> collector) throws Exception {
String[] elements = goodsAccessStr.split("\t");
System.out.println("receive msg => " + goodsAccessStr);
GoodsAccessLog goodsAccessLog = new GoodsAccessLog();
goodsAccessLog.setIp(elements[0]);
goodsAccessLog.setAccessTime(Long.valueOf(elements[1]));
goodsAccessLog.setEventType(elements[2]);
goodsAccessLog.setGoodsId(elements[3]);
collector.collect(goodsAccessLog);
}
})
.filter(new FilterFunction<GoodsAccessLog>() {
@Override
public boolean filter(GoodsAccessLog goodsAccessLog) throws Exception {
return goodsAccessLog.getEventType().equals("view");
}
})
.keyBy(GoodsAccessLog::getGoodsId)
.timeWindow(Time.seconds(10))
.process(new ProcessWindowFunction<GoodsAccessLog, Map<String, String>, String, TimeWindow>() {
@Override
public void process(String key, Context context, Iterable<GoodsAccessLog> elements,
Collector<Map<String, String>> collector) throws Exception {
Set<String> ipSet = new HashSet<>();
Map<String, String> goodsUV = new LinkedHashMap<>();
elements.forEach(log -> {
ipSet.add(log.getIp());
});
goodsUV.put(key, context.window().getEnd() + ":" + ipSet.size());
collector.collect(goodsUV);
}
})
.print("uv result").setParallelism(1);
env.execute("job");
}
}
4.7 Bloom Filter
Functionality: count the UV of each goods item over a period of time, using a Bloom filter to deduplicate visitors. Core code (the snippet below starts partway through the job; a sketch of the full pipeline wiring follows the code):
);
env.execute("job");
}
public static class CustomWindowTrigger extends Trigger<GoodsAccessLog, TimeWindow> {
@Override
public TriggerResult onElement(GoodsAccessLog element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
}
}
private static class CustomUVBloom extends ProcessWindowFunction<GoodsAccessLog, Tuple2<String, String>, String, TimeWindow> {
private transient ValueState<BloomFilter> bloomState;
private transient ValueState<Long> countState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ValueStateDescriptor<BloomFilter> bloomFilterValueStateDescriptor = new ValueStateDescriptor<BloomFilter>("bloomState", BloomFilter.class);
bloomState = getRuntimeContext().getState(bloomFilterValueStateDescriptor);
countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("count-state", Long.class));
}
@Override
public void process(String s, Context context, Iterable<GoodsAccessLog> elements, Collector<Tuple2<String, String>> out) throws Exception {
BloomFilter bloomFilter = bloomState.value();
if(bloomState.value() == null ) {
bloomFilter = BloomFilter.create(Funnels.unencodedCharsFunnel(), 1000000);
countState.update(0L);
}
Iterator<GoodsAccessLog> accessLogs = elements.iterator();
while(accessLogs.hasNext()) {
GoodsAccessLog log = accessLogs.next();
String repeatKey = log.getIp() + log.getGoodsId();
if(!bloomFilter.mightContain(repeatKey)) {
bloomFilter.put(repeatKey);
countState.update(countState.value() + 1);
bloomState.update(bloomFilter);
out.collect(Tuple2.of(log.getGoodsId(), countState.value().toString()));
}
}
}
}
}
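The beginning of the job above was not captured; a sketch of how executeFlinkTask() is assumed to wire the custom trigger and Bloom-filter window function together, mirroring section 4.6 (imports as in 4.6 plus Tuple2; file path and window size are assumptions):
public void executeFlinkTask() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    env.setParallelism(1);
    env.readTextFile("data/goods_access.log")
            .flatMap(new FlatMapFunction<String, GoodsAccessLog>() {
                @Override
                public void flatMap(String line, Collector<GoodsAccessLog> collector) throws Exception {
                    // Same tab-separated format as in 4.6: ip \t accessTime \t eventType \t goodsId
                    String[] fields = line.split("\t");
                    GoodsAccessLog log = new GoodsAccessLog();
                    log.setIp(fields[0]);
                    log.setAccessTime(Long.valueOf(fields[1]));
                    log.setEventType(fields[2]);
                    log.setGoodsId(fields[3]);
                    collector.collect(log);
                }
            })
            .filter(log -> "view".equals(log.getEventType()))
            .keyBy(GoodsAccessLog::getGoodsId)
            .timeWindow(Time.hours(24))
            // Fire and purge on every element so window state stays small;
            // the Bloom filter and the counter live in keyed state instead.
            .trigger(new CustomWindowTrigger())
            .process(new CustomUVBloom())
            .print("uv bloom result").setParallelism(1);
    env.execute("job");
}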