实时数仓
实时计算与实时数仓比较
普通的实时计算优先考虑时效性,所以从数据源采集经过实时计算直接得到结果。如此做时效性更好,但是弊端是由于计算过程中的中间结果没有沉淀下来,所以当面对大量实时需求的时候,计算的复用性较差,开发成本随着需求增加直线上升
实时数仓基于一定的数据仓库理念,对数据处理流程进行规划、分层,目的是提高数据的复用性。
实时数仓分层
ODS
原始数据,日志和业务数据
DWD
根据数据对象为单位进行分流,比如订单、页面访问等等
DIM
维度数据
DWM
对于部分数据对象进行进一步加工,比如独立访问、跳出行为,也可以和维度进行关联,形成宽表,依旧是明细数据。
DWS
根据某个主题将多个事实数据轻度聚合,形成主题宽表。
ADS
把Clickhouse中的数据根据可视化需要进行筛选聚合
日志采集
日志采集模块
创建Springboot模块
在创建目录/opt/module/gmall_flink/rt_log
将生成行为数据的jar放在该目录下
打包单机部署
logback.xml文件
将logback.xml文件放到resource目录下
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="LOG_HOME" value="d:/logs" />
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%msg%n</pattern>
</encoder>
</appender>
<appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_HOME}/app.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern>
</rollingPolicy>
<encoder>
<pattern>%msg%n</pattern>
</encoder>
</appender>
<logger name="com.atguigu.gmalllogger.controller.LoggerController"
level="INFO" additivity="false">
<appender-ref ref="rollingFile" />
<appender-ref ref="console" />
</logger>
<root level="error" additivity="false">
<appender-ref ref="console" />
</root>
</configuration>
发送文件到Kafka
package com.atguigu.gmalllogger.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@Slf4j
public class LoggerController {
@Autowired
private KafkaTemplate <String,String> kafkaTemplate;
@RequestMapping("applog")
public String getLogger(@RequestParam("param")String jsoStr){
log.info(jsoStr);
kafkaTemplate.send("ods_base_log",jsoStr);
return "success";
}
}
kafka中创建主题
[root@hadoop103 kafka]# bin/kafka-topics.sh --zookeeper hadoop103:2181 --create --topic ods_base_log --partitions 2 --replication-factor 2
启动程序
创建消费者
bin/kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic ods_base_log
开一个新的窗口
执行java -jar gmall2020-mock-log-2020-12-18.jar
修改logback.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="LOG_HOME" value="/opt/module/gmall-flink/rt_apploglogs" />
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%msg%n</pattern>
</encoder>
</appender>
<appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_HOME}/app.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern>
</rollingPolicy>
<encoder>
<pattern>%msg%n</pattern>
</encoder>
</appender>
<logger name="com.atguigu.gmalllogger.controller.LoggerController"
level="INFO" additivity="false">
<appender-ref ref="rollingFile" />
<appender-ref ref="console" />
</logger>
<root level="error" additivity="false">
<appender-ref ref="console" />
</root>
</configuration>
打包
只需要gmall-logger模块
gmall-logger-0.0.1-SNAPSHOT.jar
将生成的jar包放置在/opt/module/gmall-flink/rt_applog/
修改application.yml文件
mock.date: "2020-12-18"
mock.type: "http"
mock.url: "http://localhost:8081/applog"
mock.startup.count: 1000
mock.max.mid: 20
mock.max.uid: 50
mock.max.sku-id: 10
mock.page.during-time-ms: 20000
mock.error.rate: 3
mock.log.sleep: 100
mock.detail.source-type-rate: "40:25:15:20"
mock.if_get_coupon_rate: 75
mock.max.coupon-id: 3
mock.search.keyword: "图书,小米,iphone11,电视,口红,ps5,苹果手机,小米盒子"
测试
开三个窗口
分别执行
[root@hadoop103 kafka]#java -jar gmall-logger-0.0.1-SNAPSHOT.jar
[root@hadoop103 kafka]# java -jar gmall2020-mock-log-2020-12-18.jar
[root@hadoop103 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic ods_base_log
观察是否有数据产生
打包集群部署
搭建Nginc环境
Nginx (“engine x”) 是一个高性能的HTTP和反向代理服务器,特点是占有内存少,并发能力强,事实上nginx的并发能力确实在同类型的网页服务器中表现较好,中国大陆使用nginx网站用户有:百度、京东、新浪、网易、腾讯、淘宝等。
正向代理和反向代理
正向代理类似一个跳板机,代理访问外部资源。比如:我是一个用户,我访问不了某网站,但是我能访问一个代理服务器,这个代理服务器,它能访问那个我不能访问的网站,于是我先连上代理服务器,告诉它我需要那个无法访问网站的内容,代理服务器去取回来,然后返回给我。
反向代理(Reverse Proxy)方式是指以代理服务器来接受internet上的连接请求,然后将请求转发给内部网络上的服务器,并将从服务器上得到的结果返回给internet上请求连接的客户端,此时代理服务器对外就表现为一个反向代理服务器;
nginc的安装
在hadoop103上运行yum,安装相关依赖包
yum -y install openssl openssl-devel pcre pcre-devel zlib zlib-devel gcc gcc-c++
将nginx-1.12.2.tar.gz放到/opt/soft目录下
tar -zxvf nginx-1.12.2.tar.gz 解压到soft目录下
进入/opt/module/soft/nginx-1.12.2目录下
执行
[root@hadoop103 nginx-1.12.2]# ./configure --prefix=/opt/module/nginx
[root@hadoop103 nginx-1.12.2]# make && make install
启动nginx
1)在/opt/module/nginx/sbin目录下执行 ./nginx
[root@hadoop103 sbin]# ./nginx
如果在普通用户下面启动会报错
原因:nginx占用80端口,默认情况下非root用户不允许使用1024以下端口
解决:让当前用户的某个应用也可以使用1024以下的端口
sudo setcap cap_net_bind_service=+eip /opt/module/nginx/sbin/nginx
注意:要根据自己的实际路径进行配置
2)查看启动情况
ps -ef |grep nginx
因为nginx不是用java写的,所以不能通过jps查看
3) 在浏览器中输入http://hadoop102/访问
nginx常用命令
重启Nginx
./nginx -s reload
关闭Nginx
./nginx -s stop
通过配置文件启动
./nginx -c /opt/module/nginx/conf/nginx.conf
/opt/module/nginx/sbin/nginx -c /opt/module/nginx/conf/nginx.conf
其中-c是指定配置文件,而且配置文件路径必须指定绝对路径
配置检查
当修改Nginx配置文件后,可以使用Nginx命令进行配置文件语法检查,用于检查Nginx配置文件是否正确
/opt/module/nginx/sbin/nginx -c /opt/module/nginx/conf/nginx.conf –t
配置nginx
修改nginx.conf配置文件
在server内部配置
worker_processes 1;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/octet-stream;
upstream logserver{
server hadoop102:8081 weight=1;
server hadoop103:8081 weight=1;
server hadoop104:8081 weight=1;
}
sendfile on;
keepalive_timeout 65;
server {
listen 80;
server_name logserver;
location / {
root html;
index index.html index.htm;
proxy_pass http://logserver;
proxy_connect_timeout 10;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
}
}
采集日志的jar包同步到其它机器
[root@hadoop103 module]# xsync gmall-flink
集群群起脚本(logger.sh)
#!/bin/bash
JAVA_BIN= /opt/module/jdk1.8.0_144/bin/java
APPNAME=gmall-logger-0.0.1-SNAPSHOT.jar
case $1 in
"start")
{
for i in hadoop102 hadoop103 hadoop104
do
echo "========: $i==============="
ssh $i "/opt/module/jdk1.8.0_144/bin/java -Xms32m -Xmx64m -jar /opt/module/gmall-flink/rt_applog/gmall-logger-0.0.1-SNAPSHOT.jar >/dev/null 2>&1 &"
done
};;
"stop")
{
for i in hadoop102 hadoop103 hadoop104
do
echo "========: $i==============="
ssh $i "ps -ef|grep $APPNAME |grep -v grep|awk '{print \$2}'|xargs kill" >/dev/null 2>&1
done
};;
esac
修改模拟日志生成的配置
mock.date: "2020-12-18"
mock.type: "http"
mock.url: "http://hadoop103/applog"
mock.startup.count: 1000
mock.max.mid: 20
mock.max.uid: 50
mock.max.sku-id: 10
mock.page.during-time-ms: 20000
mock.error.rate: 3
mock.log.sleep: 100
mock.detail.source-type-rate: "40:25:15:20"
mock.if_get_coupon_rate: 75
mock.max.coupon-id: 3
mock.search.keyword: "图书,小米,iphone11,电视,口红,ps5,苹果手机,小米盒子"
测试
1)运行kafka消费
bin/kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic ods_base_log
2)启动nginx服务采集服务集群
logger.sh start
3)运行模拟生成数据的jar
[root@hadoop102 rt_applog]# java -jar gmall2020-mock-log-2020-12-18.jar
业务数据采集
MySQL的准备
创建数据库
名字:gmall-flink-200821
编码:utf-8
导入建表数据
导入sql文件
修改/etc/my.cnf文件
[root@hadoop103]# vim /etc/my.cnf
server-id = 103 log-bin=mysql-bin binlog_format=row binlog-do-db=gmall-flink-200821
重启mysql
模拟生成数据
创建目录
[root@hadoop103 module]# mkdir rt_db
上传到该目录下
配置application.properties文件
修改mysql 的密码
或配置其它的一些信息
安装Maxwell
将maxwell-1.25.0.tar.gz上传到/opt/soft目录下
解压
初始化Maxwell元数据库
登入mysql
1)在MySQL中建立一个maxwell库用于存储Maxwell的元数据
mysql> create database maxwell;
2)设置安全级别
mysql> set global validate_password_length=4;
mysql> set global validate_password_policy=0;
3)分配一个账号可以操作该数据库
mysql> GRANT ALL ON maxwell.* TO ‘maxwell’@’%’ IDENTIFIED BY ‘123456’;
4)分配这个账号可以监控其他数据库的权限
mysql> GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON . TO maxwell@’%’;
使用maxwell监控抓取mysql数据
[root@hadoop103 maxwell-1.25.0]# cp config.properties.example config.properties
[root@hadoop103 maxwell-1.25.0]# vim config.properties
# tl;dr config
log_level=info
producer=kafka
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
kafka_topic=ods_base_db_m
# mysql login info
host=hadoop103
user=maxwell
password=123456
client_id=maxwell_1
producer_partition_by=primary_key
启动maxwell脚本(maxwell.sh)
/opt/module/maxwell-1.25.0/bin/maxwell --config /opt/module/maxwell-1.25.0/config.properties >/dev/null 2>&1 &
测试
maxwell.sh
bin/kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic ods_base_db_m
java -jar gmall2020-mock-db-2020-11-27.jar
观察是否有数据生成
安装canal
将canal.deployer-1.1.4.tar.gz上传到/opt/module目录下
解压到/opt/module/canal
注:canal解压后是散的所以要提前创建目录canal
修改conf/canal.properties的配置
[root@hadoop103 conf]# vim canal.properties
修改
canal.serverMode = kafka
canal.mq.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
canal.mq.retries = 2
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
修改instance.properties
[root@hadoop103 example]# pwd /opt/module/canal/conf/example
修改
# mq config
canal.mq.topic=ods_base_db_c
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
#canal.mq.partition=0
# hash partition config
canal.mq.partitionsNum=3
测试
bin/startup.sh
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ods_base_db_c
造数据
dwd层数据准备
需求分析及实现思路
每层的职能
分层 | 数据描述 | 生成计算工具 | 存储媒介 |
---|
ODS | 原始数据,日志和业务数据 | 日志服务器,maxwell | kafka | DWD | 根据数据对象为单位进行分流,比如订单、页面访问等等。 | FLINK | kafka | DWM | 对于部分数据对象进行进一步加工,比如独立访问、跳出行为。依旧是明细数据。 | FLINK | kafka | DIM | 维度数据 | FLINK | HBase | DWS | 根据某个维度主题将多个事实数据轻度聚合,形成主题宽表。 | FLINK | Clickhouse | ADS | 把Clickhouse中的数据根据可视化需要进行筛选聚合。 | Clickhouse SQL | 可视化展示 |
DWD层数据准备实现思路
功能1:环境搭建
功能2:计算用户行为日志DWD层
功能3:计算业务数据DWD层
环境搭建
在工程中新建模块gmall-realtime
app | 产生各层数据的flink任务 |
---|
bean | 数据对象 | common | 公共常量 | utils | 工具类 |
添加依赖
<properties>
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<flink.version>1.12.0</flink.version>
<scala.version>2.12</scala.version>
<hadoop.version>3.1.3</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
在resources目录下创建log4j.properties配置文件
log4j.rootLogger=warn,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
准备用户行为日志DWD层
我们前面采集的日志数据已经保存到Kafka中,作为日志数据的ODS层,从kafka的ODS层读取的日志数据分为3类, 页面日志、启动日志和曝光日志。这三类数据虽然都是用户行为数据,但是有着完全不一样的数据结构,所以要拆分处理。将拆分后的不同的日志写回Kafka不同主题中,作为日志DWD层。
页面日志输出到主流,启动日志输出到启动侧输出流,曝光日志输出到曝光侧输出流
主要任务
识别新老用户
利用测输出流实现数据的拆分
将不同流的数据推送到下游kafka的不同topic中
依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>gmall-flink-200821</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>gmall-realtime</artifactId>
<properties>
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<flink.version>1.12.0</flink.version>
<scala.version>2.12</scala.version>
<hadoop.version>3.1.3</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
代码
app
package com.atguigu.app.dwd;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.text.SimpleDateFormat;
public class LogBaseApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStateBackend(new FsStateBackend("hdfs://hadoop102:9000/gmall/dwd_log/ck"));
env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,1000L));
System.setProperty("HADOOP_USER_NAME", "root");
FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource("ods_base_log", "dwd_log");
DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSONObject::parseObject);
KeyedStream<JSONObject, String> keyedStream = jsonObjDS.keyBy(data -> data.getJSONObject("common").getString("mid"));
SingleOutputStreamOperator<JSONObject> jsonWithNewFlagDS = keyedStream.map(new NewMidRichMapFunction());
SingleOutputStreamOperator<String> pageDS = jsonWithNewFlagDS.process(new SplitProcessFunction());
DataStream<String> startDS = pageDS.getSideOutput(new OutputTag<String>("start") {
});
DataStream<String> displayDS = pageDS.getSideOutput(new OutputTag<String>("display") {
});
pageDS.addSink(MyKafkaUtil.getKafkaSink("dwd_page_log"));
startDS.addSink(MyKafkaUtil.getKafkaSink("dwd_start_log"));
displayDS.addSink(MyKafkaUtil.getKafkaSink("dwd_display_log"));
env.execute("");
}
public static class NewMidRichMapFunction extends RichMapFunction<JSONObject, JSONObject> {
private ValueState<String> firstVisitDateState;
private SimpleDateFormat simpleDateFormat;
@Override
public void open(Configuration parameters) throws Exception {
firstVisitDateState = getRuntimeContext().getState(new ValueStateDescriptor<String>("new-mid", String.class));
simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
}
@Override
public JSONObject map(JSONObject value) throws Exception {
String isNew = value.getJSONObject("common").getString("is_new");
if ("1".equals(isNew)) {
String firstDate = firstVisitDateState.value();
long ts = value.getLong("ts");
if (firstDate != null) {
value.getJSONObject("common").put("is_new", "0");
} else {
firstVisitDateState.update(simpleDateFormat.format(ts));
}
}
return value;
}
}
public static class SplitProcessFunction extends ProcessFunction<JSONObject, String> {
@Override
public void processElement(JSONObject value, Context ctx, Collector<String> out) throws Exception {
String startStr = value.getString("start");
if (startStr != null && startStr.length() > 0) {
ctx.output(new OutputTag<String>("start") {
}, value.toString());
} else {
JSONArray displays = value.getJSONArray("displays");
if (displays != null && displays.size() > 0) {
for (int i = 0; i < displays.size(); i++) {
JSONObject displaysJSON = displays.getJSONObject(i);
displaysJSON.put("page_id", value.getJSONObject("page").getString("page_id"));
ctx.output(new OutputTag<String>("display") {
}, displaysJSON.toString());
}
} else {
out.collect(value.toString());
}
}
}
}
}
util
package com.atguigu.utils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
public class MyKafkaUtil {
private static String KAFKA_SERVER = "hadoop102:9092,hadoop103:9092,hadoop104:9092";
private static Properties properties = new Properties();
static {
properties.setProperty("bootstrap.servers", KAFKA_SERVER);
}
public static FlinkKafkaConsumer<String> getKafkaSource(String topic, String groupId) {
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), properties);
}
public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
return new FlinkKafkaProducer<String>(topic, new SimpleStringSchema(), properties);
}
}
测试
日志数据dwd层测试所需进程
mock、nginx、Logger、kafka(zk)、flinkApp(Hdfs)、kafka(三个消费者)
准备业务数据DWD层
业务数据的变化,我们可以通过MaxWell采集到,但是MaxWell是把全部数据统一写入一个Topic中, 这些数据包括业务数据,也包含维度数据,这样显然不利于日后的数据处理,所以这个功能是从Kafka的业务数据ODS层读取数据,经过处理后,将维度数据保存到HBase,将事实数据写回Kafka作为业务数据的DWD层。
主要任务
接收kafka数据,过滤空值数据
对MaxWell抓取数据进行ETL,有用的部分保留,没用的过滤掉
实现动态分流功能
由于MaxWell是把全部数据统一写入一个Topic中, 这样显然不利于日后的数据处理。所以需要把各个表拆开处理。但是由于每个表有不同的特点,有些表是维度表,有些表是事实表,有的表既是事实表在某种情况下也是维度表。
在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase,Redis,MySQL等。一般把事实数据写入流中,进行进一步处理,最终形成宽表。但是作为Flink实时计算任务,如何得知哪些表是维度表,哪些是事实表呢?而这些表又应该采集哪些字段呢?
这样的配置不适合写在配置文件中,因为这样的话,业务端随着需求变化每增加一张表,就要修改配置重启计算程序。所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。
这种可以有两个方案实现
一种是用Zookeeper存储,通过Watch感知数据变化。
另一种是用mysql数据库存储,周期性的同步。
这里选择第二种方案,主要是mysql对于配置数据初始化和维护管理,用sql都比较方便,虽然周期性操作时效性差一点,但是配置变化并不频繁。
把分好的流保存到对应表,主题中
业务数据保存到Kafka的主题中
维度数据保存到HBase的表中
根据mysql的配置表,动态进行分流
1)gmall-realtime-200821
2)建表
CREATE TABLE `table_process` (
`source_table` varchar(200) NOT NULL COMMENT '来源表',
`operate_type` varchar(200) NOT NULL COMMENT '操作类型 insert,update,delete',
`sink_type` varchar(200) DEFAULT NULL COMMENT '输出类型 hbase kafka',
`sink_table` varchar(200) DEFAULT NULL COMMENT '输出表(主题)',
`sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段',
`sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段',
`sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展',
PRIMARY KEY (`source_table`,`operate_type`)
)
3)创建表配置实体类
import lombok.Data;
@Data
public class TableProcess {
public static final String SINK_TYPE_HBASE = "HBASE";
public static final String SINK_TYPE_KAFKA = "KAFKA";
public static final String SINK_TYPE_CK = "CLICKHOUSE";
String sourceTable;
String operateType;
String sinkType;
String sinkTable;
String sinkColumns;
String sinkPk;
String sinkExtend;
}
4)编写操作Mysql操作类
package com.atguigu.gmall.realtime.utils;import com.atguigu.gmall.realtime.bean.TableProcess;import com.google.common.base.CaseFormat;import org.apache.commons.beanutils.BeanUtils;import java.sql.*;import java.util.ArrayList;import java.util.List;public class MySQLUtil { public static <T> List<T> queryList(String sql, Class<T> clazz, Boolean underScoreToCamel) { Connection conn = null; PreparedStatement ps = null; ResultSet rs = null; try {
5)需要导入的依赖
<dependency>
<groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId>
<version>1.18.12</version>
<scope>provided</scope></dependency>
<dependency>
<groupId>commons-beanutils</groupId> <artifactId>commons-beanutils</artifactId> <version>1.9.3</version>
</dependency>
<dependency> <
groupId>com.google.guava</groupId> <artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId> <version>5.1.47</version>
</dependency>
|