Install HBase
1) Changes to hbase-env.sh (the JAVA_HOME line is optional because /etc/profile already exports it globally):
export JAVA_HOME=/opt/module/jdk1.8.0_144
export HBASE_MANAGES_ZK=false
2) Changes to hbase-site.xml:
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://hadoop102:9000/HBase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.master.port</name>
<value>16000</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>hadoop102:2181,hadoop103:2181,hadoop104:2181</value>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/opt/module/zookeeper-3.4.10/zkData</value>
</property>
<property>
<name>phoenix.schema.isNamespaceMappingEnabled</name>
<value>true</value>
</property>
<property>
<name>phoenix.schema.mapSystemTablesToNamespace</name>
<value>true</value>
</property>
</configuration>
3) regionservers:
hadoop102
hadoop103
hadoop104
4) Symlink the Hadoop config files into the HBase conf directory:
[root@hadoop103 module]# ln -s /opt/module/hadoop-2.7.2/etc/hadoop/core-site.xml /opt/module/HBase/conf/core-site.xml
[root@hadoop103 module]# ln -s /opt/module/hadoop-2.7.2/etc/hadoop/hdfs-site.xml /opt/module/HBase/conf/hdfs-site.xml
5) Distribute HBase to the other nodes
[root@hadoop103 module]# xsync hbase/
Start the services
[root@hadoop103 hbase]# bin/start-hbase.sh
View the HBase web UI at hadoop103:16010
[root@hadoop103 hbase]# bin/stop-hbase.sh
Install Phoenix
Install the dependency first:
yum install python-argparse
1) Download Phoenix
http://archive.apache.org/dist/phoenix/apache-phoenix-4.14.2-HBase-1.3/
2) Extract the package
tar -zxvf apache-phoenix-4.14.2-HBase-1.3-bin.tar.gz -C /opt/module
mv apache-phoenix-4.14.2-HBase-1.3-bin phoenix
3) Copy the jar files
HBase needs both the server and client jars:
cp phoenix-4.14.2-HBase-1.3-server.jar /opt/module/hbase-1.3.1/lib
cp phoenix-4.14.2-HBase-1.3-client.jar /opt/module/hbase-1.3.1/lib
4) Distribute the jar files
Distribute the two jars copied above to every other HBase node.
5) Configure environment variables
export PHOENIX_HOME=/opt/module/phoenix
export PHOENIX_CLASSPATH=$PHOENIX_HOME
export PATH=$PATH:$PHOENIX_HOME/bin
6) vim bin/hbase-site.xml (under the Phoenix home)
Add:
<property>
<name>phoenix.schema.isNamespaceMappingEnabled</name>
<value>true</value>
</property>
<property>
<name>phoenix.schema.mapSystemTablesToNamespace</name>
<value>true</value>
</property>
7) Start Hadoop, ZooKeeper, and HBase
8) Start Phoenix
[root@hadoop103 phoenix]# bin/sqlline.py hadoop102,hadoop103,hadoop104:2181
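To sanity-check the connection outside of sqlline, a minimal JDBC test can be run from Java. This is only a sketch, assuming the Phoenix thick-client jar (phoenix-4.14.2-HBase-1.3-client.jar) is on the classpath; the namespace-mapping property passed to the connection has to match the server-side hbase-site.xml above.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class PhoenixSmokeTest {
    public static void main(String[] args) throws Exception {
        // Register the Phoenix JDBC driver (thick client)
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        Properties props = new Properties();
        // Must match the server-side setting, otherwise Phoenix refuses the connection
        props.setProperty("phoenix.schema.isNamespaceMappingEnabled", "true");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181", props);
             Statement stmt = conn.createStatement();
             // List a few tables from the Phoenix system catalog as a connectivity check
             ResultSet rs = stmt.executeQuery("select TABLE_NAME from SYSTEM.CATALOG limit 5")) {
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }
}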
Stream splitting sink: saving to HBase
Program flow analysis
Add the dependency:
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-spark</artifactId>
<version>5.0.0-HBase-2.0</version>
<exclusions>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
</exclusions>
</dependency>
Because a dedicated schema is used, add the following hbase-site.xml to the project:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://hadoop102:8020/HBase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>hadoop102,hadoop103,hadoop104</value>
</property>
<property>
<name>hbase.unsafe.stream.capability.enforce</name>
<value>false</value>
</property>
<property>
<name>hbase.wal.provider</name>
<value>filesystem</value>
</property>
<property>
<name>phoenix.schema.isNamespaceMappingEnabled</name>
<value>true</value>
</property>
<property>
<name>phoenix.schema.mapSystemTablesToNamespace</name>
<value>true</value>
</property>
</configuration>
Run in Phoenix:
create schema GMALL200821_REALTIME;
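The sink that actually writes the split dimension stream into this schema is not listed here. The following is a minimal sketch of such a sink, assuming each element is a JSONObject that carries the target table name in a "sink_table" field and the column values in a "data" field (both field names are assumptions, not taken from this document), and reusing the GmallConfig constants referenced later in PhoenixUtil.

package com.atguigu.app.func;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.common.GmallConfig;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class DimSinkFunction extends RichSinkFunction<JSONObject> {

    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName(GmallConfig.PHOENIX_DRIVER);
        connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
    }

    @Override
    public void invoke(JSONObject value, Context context) throws Exception {
        // "sink_table" and "data" are assumed field names produced by the split step
        String tableName = value.getString("sink_table");
        JSONObject data = value.getJSONObject("data");
        String upsertSql = genUpsertSql(tableName, data);
        try (PreparedStatement ps = connection.prepareStatement(upsertSql)) {
            ps.execute();
            connection.commit(); // Phoenix connections do not auto-commit by default
        }
    }

    // e.g. upsert into GMALL200821_REALTIME.DIM_BASE_DIC(id, ...) values('10', ...)
    // Assumes the dimension tables are declared with VARCHAR columns, so every value is quoted
    private String genUpsertSql(String tableName, JSONObject data) {
        return "upsert into GMALL200821_REALTIME." + tableName
                + "(" + StringUtils.join(data.keySet(), ",") + ") values('"
                + StringUtils.join(data.values(), "','") + "')";
    }
}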
DWS and DWM layer design
Requirements overview
Visitor UV calculation
UV stands for Unique Visitor. In real-time computing it is often reported as DAU (Daily Active User), because the real-time UV normally means the number of distinct visitors for the current day.
To identify the day's visitors from the user behavior log, two things are required:
First, detect the first page a visitor opens, which marks the visitor entering the application.
Second, since a visitor can enter the application several times in one day, deduplicate within the scope of one day.
package com.atguigu.app.dwm;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.StringValueUtils;
import java.text.SimpleDateFormat;
public class DauApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStateBackend(new FsStateBackend("hdfs://hadoop102:9000/gmall/dwd_log/ck"));
System.setProperty("HADOOP_USER_NAME", "root");
env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000L));
String groupId = "unique_visit_app";
String sourceTopic = "dwd_page_log";
String sinkTopic = "dwm_unique_visit";
DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaSource(sourceTopic, groupId));
SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {
@Override
public void processElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
try {
JSONObject jsonObject = JSON.parseObject(value);
out.collect(jsonObject);
} catch (Exception e) {
ctx.output(new OutputTag<String>("dirty") {
}, value);
}
}
});
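// Key the stream by device id (mid) so that visit state is tracked per device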
KeyedStream<JSONObject, String> keyedStream = jsonObjDS.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"));
SingleOutputStreamOperator<JSONObject> filterDS = keyedStream.filter(new UvRichFilterFunction());
filterDS.map(JSON::toString).addSink(MyKafkaUtil.getKafkaSink(sinkTopic));
env.execute();
}
public static class UvRichFilterFunction extends RichFilterFunction<JSONObject> {
private ValueState<String> firstVisitState;
private SimpleDateFormat simpleDateFormat;
@Override
public void open(Configuration parameters) throws Exception {
simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
ValueStateDescriptor<String> stringValueStateDescriptor = new ValueStateDescriptor<>("visit-state", String.class);
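// Give the state a 1-day TTL so each device is counted at most once per day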
StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.days(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.build();
stringValueStateDescriptor.enableTimeToLive(stateTtlConfig);
firstVisitState = getRuntimeContext().getState(stringValueStateDescriptor);
}
@Override
public boolean filter(JSONObject value) throws Exception {
String lastPageID = value.getJSONObject("page").getString("last_page_id");
if (lastPageID == null || lastPageID.length() <= 0) {
String firstVisitDate = firstVisitState.value();
Long ts = value.getLong("ts");
String curDate = simpleDateFormat.format(ts);
// Keep the record only if no visit has been recorded for this device today
if (firstVisitDate == null || !firstVisitDate.equals(curDate)) {
firstVisitState.update(curDate);
return true;
} else {
return false;
}
} else {
return false;
}
}
}
}
Jump-out (bounce) detail calculation
First we need to identify which behaviors are jump-outs, i.e. pick out the last page those visitors viewed. Two characteristics define them:
1) The page is the first page the user visited recently.
This can be judged from whether the page has a previous page (last_page_id); if it is empty, the page is the first one of this visit.
2) After that first visit, the user does not visit any other page for a fairly long time (the threshold is configurable).
The first characteristic is easy to detect: keep the records whose last_page_id is empty. The second is harder: it cannot be decided from a single record; it requires combining a record that exists with one that does not, and deriving the existing record from the absence of the other. Worse, the follow-up record is not absent forever, only absent within a certain time window. So how do we recognize such a time-bounded combination of behaviors?
The simplest way is Flink's built-in CEP library, which is well suited to recognizing an event from a combination of multiple records.
package com.atguigu.app.dwm;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.List;
import java.util.Map;
public class UserJumpDetailApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStateBackend(new FsStateBackend("hdfs://hadoop102:9000/gmall/dwd_log/ck"));
System.setProperty("HADOOP_USER_NAME", "root");
env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000L));
String sourceTopic = "dwd_page_log";
String groupId = "userJumpDetailApp";
String sinkTopic = "dwm_user_jump_detail";
FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(sourceTopic, groupId);
DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {
@Override
public void processElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
try {
JSONObject jsonObject = JSON.parseObject(value);
out.collect(jsonObject);
} catch (Exception e) {
ctx.output(new OutputTag<String>("dirty") {
}, value);
}
}
});
KeyedStream<JSONObject, String> keyedStream = jsonObjDS.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"));
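// CEP pattern: a first page (empty last_page_id) followed by any other page within 10 seconds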
Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("start").where(new SimpleCondition<JSONObject>() {
@Override
public boolean filter(JSONObject value) throws Exception {
String lastPageId = value.getJSONObject("page").getString("last_page_id");
return lastPageId == null || lastPageId.length() <= 0;
}
}).followedBy("follow").where(new SimpleCondition<JSONObject>() {
@Override
public boolean filter(JSONObject value) throws Exception {
String lastPageId = value.getJSONObject("page").getString("last_page_id");
return lastPageId != null && lastPageId.length() > 0;
}
}).within(Time.seconds(10));
PatternStream<JSONObject> patternStream = CEP.pattern(keyedStream, pattern);
OutputTag<String> timeOutTag = new OutputTag<String>("TimeOut"){};
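// Matches that time out (no follow-up page within 10 seconds) are the jump-outs; they are emitted to the side output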
SingleOutputStreamOperator<Object> selectDS = patternStream.flatSelect(timeOutTag, new PatternFlatTimeoutFunction<JSONObject, String>() {
@Override
public void timeout(Map<String, List<JSONObject>> pattern, long timeoutTimestamp, Collector<String> out) throws Exception {
out.collect(pattern.get("start").get(0).toString());
}
}, new PatternFlatSelectFunction<JSONObject, Object>() {
@Override
public void flatSelect(Map<String, List<JSONObject>> pattern, Collector<Object> out) throws Exception {
}
});
FlinkKafkaProducer<String> kafkaSink = MyKafkaUtil.getKafkaSink(sinkTopic);
selectDS.getSideOutput(timeOutTag).addSink(kafkaSink);
env.execute();
}
}
Order wide table
Create the order entity class (OrderInfo)
package com.atguigu.gmall.realtime.bean;
import lombok.Data;
import java.math.BigDecimal;
@Data
public class OrderInfo {
Long id;
Long province_id;
String order_status;
Long user_id;
BigDecimal total_amount;
BigDecimal activity_reduce_amount;
BigDecimal coupon_reduce_amount;
BigDecimal original_total_amount;
BigDecimal feight_fee;
String expire_time;
String create_time;
String operate_time;
String create_date;
String create_hour;
Long create_ts;
}
Create the order-detail entity class (OrderDetail)
package com.atguigu.gmall.realtime.bean;
import lombok.Data;
import java.math.BigDecimal;
@Data
public class OrderDetail {
Long id;
Long order_id ;
Long sku_id;
BigDecimal order_price ;
Long sku_num ;
String sku_name;
String create_time;
BigDecimal split_total_amount;
BigDecimal split_activity_amount;
BigDecimal split_coupon_amount;
Long create_ts;
}
Create OrderWideApp to read the order and order-detail data
package com.atguigu.gmall.realtime.app.dwm;
import com.alibaba.fastjson.JSON;
import com.atguigu.gmall.realtime.bean.OrderDetail;
import com.atguigu.gmall.realtime.bean.OrderInfo;
import com.atguigu.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.text.SimpleDateFormat;
public class OrderWideApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
String orderInfoSourceTopic = "dwd_order_info";
String orderDetailSourceTopic = "dwd_order_detail";
String orderWideSinkTopic = "dwm_order_wide";
String groupId = "order_wide_group";
FlinkKafkaConsumer<String> sourceOrderInfo = MyKafkaUtil.getKafkaSource(orderInfoSourceTopic,groupId);
FlinkKafkaConsumer<String> sourceOrderDetail = MyKafkaUtil.getKafkaSource(orderDetailSourceTopic,groupId);
DataStream<String> orderInfojsonDStream = env.addSource(sourceOrderInfo);
DataStream<String> orderDetailJsonDStream = env.addSource(sourceOrderDetail);
DataStream<OrderInfo> orderInfoDStream = orderInfojsonDStream.map(
new RichMapFunction<String, OrderInfo>() {
SimpleDateFormat simpleDateFormat=null;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd");
}
@Override
public OrderInfo map(String jsonString) throws Exception {
OrderInfo orderInfo = JSON.parseObject(jsonString, OrderInfo.class);
orderInfo.setCreate_ts(simpleDateFormat.parse(orderInfo.getCreate_time()).getTime());
return orderInfo;
}
}
);
DataStream<OrderDetail> orderDetailDStream = orderDetailJsonDStream.map(new RichMapFunction<String, OrderDetail>() {
SimpleDateFormat simpleDateFormat=null;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd");
}
@Override
public OrderDetail map(String jsonString) throws Exception {
OrderDetail orderDetail = JSON.parseObject(jsonString, OrderDetail.class);
orderDetail.setCreate_ts (simpleDateFormat.parse(orderDetail.getCreate_time()).getTime());
return orderDetail;
}
});
orderInfoDStream.print("orderInfo>>>>>>>");
orderDetailDStream.print("orderDetail>>>>>>");
env.execute();
}
}
Create the merged wide-table entity class (OrderWide)
package com.atguigu.gmall.realtime.bean;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.commons.lang3.ObjectUtils;
import java.math.BigDecimal;
@Data
@AllArgsConstructor
public class OrderWide {
Long detail_id;
Long order_id ;
Long sku_id;
BigDecimal order_price ;
Long sku_num ;
String sku_name;
Long province_id;
String order_status;
Long user_id;
BigDecimal total_amount;
BigDecimal activity_reduce_amount;
BigDecimal coupon_reduce_amount;
BigDecimal original_total_amount;
BigDecimal feight_fee;
BigDecimal split_feight_fee;
BigDecimal split_activity_amount;
BigDecimal split_coupon_amount;
BigDecimal split_total_amount;
String expire_time;
String create_time;
String operate_time;
String create_date;
String create_hour;
String province_name;
String province_area_code;
String province_iso_code;
String province_3166_2_code;
Integer user_age ;
String user_gender;
Long spu_id;
Long tm_id;
Long category3_id;
String spu_name;
String tm_name;
String category3_name;
public OrderWide(OrderInfo orderInfo, OrderDetail orderDetail){
mergeOrderInfo(orderInfo);
mergeOrderDetail(orderDetail);
}
public void mergeOrderInfo(OrderInfo orderInfo ) {
if (orderInfo != null) {
this.order_id = orderInfo.id;
this.order_status = orderInfo.order_status;
this.create_time = orderInfo.create_time;
this.create_date = orderInfo.create_date;
this.activity_reduce_amount = orderInfo.activity_reduce_amount;
this.coupon_reduce_amount = orderInfo.coupon_reduce_amount;
this.original_total_amount = orderInfo.original_total_amount;
this.feight_fee = orderInfo.feight_fee;
this.total_amount = orderInfo.total_amount;
this.province_id = orderInfo.province_id;
this.user_id = orderInfo.user_id;
}
}
public void mergeOrderDetail(OrderDetail orderDetail ) {
if (orderDetail != null) {
this.detail_id = orderDetail.id;
this.sku_id = orderDetail.sku_id;
this.sku_name = orderDetail.sku_name;
this.order_price = orderDetail.order_price;
this.sku_num = orderDetail.sku_num;
this.split_activity_amount=orderDetail.split_activity_amount;
this.split_coupon_amount=orderDetail.split_coupon_amount;
this.split_total_amount=orderDetail.split_total_amount;
}
}
public void mergeOtherOrderWide(OrderWide otherOrderWide){
this.order_status = ObjectUtils.firstNonNull( this.order_status ,otherOrderWide.order_status);
this.create_time = ObjectUtils.firstNonNull(this.create_time,otherOrderWide.create_time);
this.create_date = ObjectUtils.firstNonNull(this.create_date,otherOrderWide.create_date);
this.coupon_reduce_amount = ObjectUtils.firstNonNull(this.coupon_reduce_amount,otherOrderWide.coupon_reduce_amount);
this.activity_reduce_amount = ObjectUtils.firstNonNull(this.activity_reduce_amount,otherOrderWide.activity_reduce_amount);
this.original_total_amount = ObjectUtils.firstNonNull(this.original_total_amount,otherOrderWide.original_total_amount);
this.feight_fee = ObjectUtils.firstNonNull( this.feight_fee,otherOrderWide.feight_fee);
this.total_amount = ObjectUtils.firstNonNull( this.total_amount,otherOrderWide.total_amount);
this.user_id = ObjectUtils.<Long>firstNonNull(this.user_id,otherOrderWide.user_id);
this.sku_id = ObjectUtils.firstNonNull( this.sku_id,otherOrderWide.sku_id);
this.sku_name = ObjectUtils.firstNonNull(this.sku_name,otherOrderWide.sku_name);
this.order_price = ObjectUtils.firstNonNull(this.order_price,otherOrderWide.order_price);
this.sku_num = ObjectUtils.firstNonNull( this.sku_num,otherOrderWide.sku_num);
this.split_activity_amount = ObjectUtils.firstNonNull(this.split_activity_amount, otherOrderWide.split_activity_amount);
this.split_coupon_amount = ObjectUtils.firstNonNull(this.split_coupon_amount, otherOrderWide.split_coupon_amount);
this.split_total_amount = ObjectUtils.firstNonNull(this.split_total_amount, otherOrderWide.split_total_amount);
}
}
PhoenixUtil
package com.atguigu.utils;
import com.atguigu.common.GmallConfig;
import org.apache.commons.beanutils.BeanUtils;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
public class PhoenixUtil {
private static Connection connection=null;
private static Connection init(){
try {
Class.forName(GmallConfig.PHOENIX_DRIVER);
Connection connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
return connection;
} catch (ClassNotFoundException | SQLException e) {
e.printStackTrace();
throw new RuntimeException("获取连接失败");
}
}
public static <T> List<T> queryList(String sql,Class<T>cls){
if(connection==null)connection=init();
PreparedStatement preparedStatement=null;
ResultSet resultSet=null;
try {
preparedStatement = connection.prepareStatement(sql);
resultSet = preparedStatement.executeQuery();
ResultSetMetaData metaData = resultSet.getMetaData();
int columnCount = metaData.getColumnCount();
ArrayList<T>list=new ArrayList<>();
while (resultSet.next()){
T t = cls.newInstance();
for(int i=1;i<=columnCount;i++){
BeanUtils.setProperty(t,metaData.getColumnName(i),resultSet.getObject(i));
}
list.add(t);
}
return list;
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("查询维度信息");
}finally {
if(preparedStatement!=null){
try {
preparedStatement.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if(resultSet!=null){
try {
resultSet.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
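// Example (hypothetical): queryList("select * from GMALL200821_REALTIME.DIM_BASE_DIC", JSONObject.class).forEach(System.out::println);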
}
}
DimUtil
package com.atguigu.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple2;
import redis.clients.jedis.Jedis;
import java.util.List;
public class DimUtil {
public static JSONObject getDimInfo(String tableName, Tuple2<String,String>...columnValues){
if(columnValues.length<=0){
throw new RuntimeException("查询维度数据时,请至少设置一个查询条件!");
}
StringBuilder whereSql = new StringBuilder(" where ");
StringBuilder redisKey = new StringBuilder(tableName).append(":");
for (int i=0;i<columnValues.length;i++){
Tuple2<String, String> columnValue = columnValues[i];
String column=columnValue.f0;
String value=columnValue.f1;
whereSql.append(column).append("='").append(value).append("'");
redisKey.append(value);
if(i<columnValues.length-1){
whereSql.append(" and ");
redisKey.append(":");
}
}
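// Check the Redis cache first; on a miss, query Phoenix and cache the result with a 24-hour expiry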
Jedis jedis = RedisUtil.getJedis();
String dimJsonStr = jedis.get(redisKey.toString());
if(dimJsonStr!=null&&dimJsonStr.length()>0){
jedis.close();
return JSON.parseObject(dimJsonStr);
}
String querySql = "select * from " + tableName + whereSql.toString();
System.out.println(querySql);
List<JSONObject> queryList = PhoenixUtil.queryList(querySql, JSONObject.class);
JSONObject dimJsonObj=queryList.get(0);
jedis.set(redisKey.toString(),dimJsonObj.toString());
jedis.expire(redisKey.toString(),24*60*60);
return dimJsonObj;
}
public static void deleteCached( String tableName, String id){
String key = tableName.toUpperCase() + ":" + id;
try {
Jedis jedis = RedisUtil.getJedis();
jedis.del(key);
jedis.close();
} catch (Exception e) {
System.out.println("缓存异常!");
e.printStackTrace();
}
}
public static JSONObject getDimInfo(String tableName,String value){
return getDimInfo(tableName,new Tuple2<>("id",value));
}
public static void main(String[] args) {
System.out.println(getDimInfo("GMALL200821_REALTIME.DIM_BASE_DIC", new Tuple2<>("id", "32432")));
}
}
RedisUtil
package com.atguigu.utils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class RedisUtil {
public static JedisPool jedisPool = null;
public static Jedis getJedis() {
if (jedisPool == null) {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(100);
jedisPoolConfig.setBlockWhenExhausted(true);
jedisPoolConfig.setMaxWaitMillis(2000);
jedisPoolConfig.setMaxIdle(5);
jedisPoolConfig.setMinIdle(5);
jedisPoolConfig.setTestOnBorrow(true);
jedisPool = new JedisPool(jedisPoolConfig, "hadoop103", 6379, 1000);
System.out.println("开辟连接池");
return jedisPool.getResource();
} else {
return jedisPool.getResource();
}
}
}
Asynchronous lookup
An asynchronous lookup delegates the dimension-table query to a separate thread pool, so a single query no longer blocks the stream; each parallel subtask can have several requests in flight at once, which improves throughput.
This is especially effective for operations involving network I/O, because it cuts the time spent waiting on each request.
OrderWideApp
package com.atguigu.app.dwm;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.app.func.DimAsyncFunction;
import com.atguigu.bean.OrderDetail;
import com.atguigu.bean.OrderInfo;
import com.atguigu.bean.OrderWide;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.concurrent.TimeUnit;
public class OrderWideApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
String orderInfoSourceTopic = "dwd_order_info";
String orderDetailSourceTopic = "dwd_order_detail";
String orderWideSinkTopic = "dwm_order_wide";
String groupId = "order_wide_group";
FlinkKafkaConsumer<String> orderInfoKafkaSource = MyKafkaUtil.getKafkaSource(orderInfoSourceTopic, groupId);
DataStreamSource<String> orderInfoKafkaDS = env.addSource(orderInfoKafkaSource);
DataStreamSource<String> orderDetailKafkaDS = env.addSource(MyKafkaUtil.getKafkaSource(orderDetailSourceTopic, groupId));
WatermarkStrategy<OrderInfo> orderInfoWatermarkStrategy = WatermarkStrategy.<OrderInfo>forMonotonousTimestamps().
withTimestampAssigner(new SerializableTimestampAssigner<OrderInfo>() {
@Override
public long extractTimestamp(OrderInfo element, long recordTimestamp) {
return element.getCreate_ts();
}
});
WatermarkStrategy<OrderDetail> orderDetailWatermarkStrategy = WatermarkStrategy.<OrderDetail>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<OrderDetail>() {
@Override
public long extractTimestamp(OrderDetail element, long recordTimestamp) {
return element.getCreate_ts();
}
});
KeyedStream<OrderInfo, Long> orderInfoWithKeyedStream = orderInfoKafkaDS.map(jsonStr -> {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
OrderInfo orderInfo = JSON.parseObject(jsonStr, OrderInfo.class);
String create_time = orderInfo.getCreate_time();
String[] createTimeArr = create_time.split(" ");
orderInfo.setCreate_date(createTimeArr[0]);
orderInfo.setCreate_hour(createTimeArr[1]);
orderInfo.setCreate_ts(simpleDateFormat.parse(create_time).getTime());
return orderInfo;
}).assignTimestampsAndWatermarks(orderInfoWatermarkStrategy)
.keyBy(OrderInfo::getId);
KeyedStream<OrderDetail, Long> orderDetailWithKeyedStream = orderDetailKafkaDS.map(jsonStr -> {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
OrderDetail orderDetail = JSON.parseObject(jsonStr, OrderDetail.class);
orderDetail.setCreate_ts(sdf.parse(orderDetail.getCreate_time()).getTime());
return orderDetail;
}).assignTimestampsAndWatermarks(orderDetailWatermarkStrategy)
.keyBy(OrderDetail::getOrder_id);
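// Interval-join orders with their details on order id, tolerating +/-5 seconds between the two streams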
SingleOutputStreamOperator<OrderWide> orderWideDS = orderInfoWithKeyedStream.intervalJoin(orderDetailWithKeyedStream)
.between(Time.seconds(-5), Time.seconds(5))
.process(new ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>() {
@Override
public void processElement(OrderInfo left, OrderDetail right, Context ctx, Collector<OrderWide> out) throws Exception {
out.collect(new OrderWide(left, right));
}
});
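// Enrich the joined stream with dimension tables via Flink async I/O, 60-second timeout per lookup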
SingleOutputStreamOperator<OrderWide> orderWideWithUserDS = AsyncDataStream.unorderedWait(orderWideDS,
new DimAsyncFunction<OrderWide>("DIM_USER_INFO") {
@Override
public String getKey(OrderWide input) {
return input.getUser_id().toString();
}
@Override
public void join(OrderWide input, JSONObject dimInfo) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
String birthday = dimInfo.getString("BIRTHDAY");
long currentTs = System.currentTimeMillis();
long ts = sdf.parse(birthday).getTime();
long ageLong = (currentTs - ts) / 1000L / 60 / 60 / 24 / 365;
input.setUser_age((int) ageLong);
String gender = dimInfo.getString("GENDER");
input.setUser_gender(gender);
}
}, 60, TimeUnit.SECONDS);
SingleOutputStreamOperator<OrderWide> oderWideWithProvinceDS = AsyncDataStream.unorderedWait(orderWideWithUserDS,
new DimAsyncFunction<OrderWide>("DIM_BASE_PROVINCE") {
@Override
public String getKey(OrderWide orderWide) {
return orderWide.getProvince_id().toString();
}
@Override
public void join(OrderWide orderWide, JSONObject dimInfo) throws ParseException {
orderWide.setProvince_name(dimInfo.getString("NAME"));
orderWide.setProvince_area_code(dimInfo.getString("AREA_CODE"));
orderWide.setProvince_iso_code(dimInfo.getString("ISO_CODE"));
orderWide.setProvince_3166_2_code(dimInfo.getString("ISO_3166_2"));
}
}, 60, TimeUnit.SECONDS);
SingleOutputStreamOperator<OrderWide> orderWideWithSkuDS = AsyncDataStream.unorderedWait(
oderWideWithProvinceDS, new DimAsyncFunction<OrderWide>("DIM_SKU_INFO") {
@Override
public void join(OrderWide orderWide, JSONObject jsonObject) throws ParseException {
orderWide.setSku_name(jsonObject.getString("SKU_NAME"));
orderWide.setCategory3_id(jsonObject.getLong("CATEGORY3_ID"));
orderWide.setSpu_id(jsonObject.getLong("SPU_ID"));
orderWide.setTm_id(jsonObject.getLong("TM_ID"));
}
@Override
public String getKey(OrderWide orderWide) {
return String.valueOf(orderWide.getSku_id());
}
}, 60, TimeUnit.SECONDS);
SingleOutputStreamOperator<OrderWide> orderWideWithSpuDS = AsyncDataStream.unorderedWait(
orderWideWithSkuDS, new DimAsyncFunction<OrderWide>("DIM_SPU_INFO") {
@Override
public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
orderWide.setSpu_name(jsonObject.getString("SPU_NAME"));
}
@Override
public String getKey(OrderWide orderWide) {
return String.valueOf(orderWide.getSpu_id());
}
}, 60, TimeUnit.SECONDS);
SingleOutputStreamOperator<OrderWide> orderWideWithCategory3DS = AsyncDataStream.unorderedWait(
orderWideWithSpuDS, new DimAsyncFunction<OrderWide>("DIM_BASE_CATEGORY3") {
@Override
public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
orderWide.setCategory3_name(jsonObject.getString("NAME"));
}
@Override
public String getKey(OrderWide orderWide) {
return String.valueOf(orderWide.getCategory3_id());
}
}, 60, TimeUnit.SECONDS);
SingleOutputStreamOperator<OrderWide> orderWideWithTmDS = AsyncDataStream.unorderedWait(
orderWideWithCategory3DS, new DimAsyncFunction<OrderWide>("DIM_BASE_TRADEMARK") {
@Override
public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
orderWide.setTm_name(jsonObject.getString("TM_NAME"));
}
@Override
public String getKey(OrderWide orderWide) {
return String.valueOf(orderWide.getTm_id());
}
}, 60, TimeUnit.SECONDS);
orderWideWithTmDS.map(JSON::toJSONString).addSink(MyKafkaUtil.getKafkaSink(orderWideSinkTopic));
env.execute();
}
}
DimAsyncFunction
package com.atguigu.app.func;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.bean.OrderWide;
import com.atguigu.utils.DimUtil;
import com.atguigu.utils.ThreadPoolUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import java.text.ParseException;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ThreadPoolExecutor;
public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> implements DimJoinFunction<T> {
private ThreadPoolExecutor threadPoolExecutor;
private String tableName;
public DimAsyncFunction(String tableName) {
this.tableName = tableName;
}
@Override
public void open(Configuration parameters) throws Exception {
threadPoolExecutor = ThreadPoolUtil.getInstance();
}
@Override
public void asyncInvoke(T input, ResultFuture<T> resultFuture) throws Exception {
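// Run the blocking Redis/Phoenix lookup in a shared thread pool so the async operator is not blocked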
threadPoolExecutor.submit(new Runnable() {
@Override
public void run() {
String key = getKey(input);
JSONObject dimInfo = DimUtil.getDimInfo(tableName, key);
if (dimInfo != null && dimInfo.size() > 0) {
try {
join(input, dimInfo);
} catch (Exception e) {
e.printStackTrace();
}
}
resultFuture.complete(Collections.singletonList(input));
}
});
}
}
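By default a RichAsyncFunction fails the job when a lookup does not finish within the timeout passed to AsyncDataStream.unorderedWait (60 seconds in OrderWideApp above). As an optional variation, not part of the original code, timeout() could be overridden inside DimAsyncFunction so a slow lookup is only logged and the record is forwarded without enrichment:

@Override
public void timeout(T input, ResultFuture<T> resultFuture) throws Exception {
    // Log the slow lookup and emit the record unenriched instead of failing the job
    System.out.println("Dimension lookup timed out for: " + input);
    resultFuture.complete(Collections.singletonList(input));
}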
DimJoinFunction
package com.atguigu.app.func;
import com.alibaba.fastjson.JSONObject;
import java.text.ParseException;
public interface DimJoinFunction<T> {
String getKey(T input);
void join(T input, JSONObject dimInfo) throws ParseException, Exception;
}
ThreadPoolUtil
package com.atguigu.utils;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolUtil {
// volatile is required for safe double-checked locking
public static volatile ThreadPoolExecutor pool;
private ThreadPoolUtil() {
}
public static ThreadPoolExecutor getInstance() {
if (pool == null) {
synchronized (ThreadPoolUtil.class) {
if (pool == null) {
pool = new ThreadPoolExecutor(3,
3,
300L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(Integer.MAX_VALUE));
}
}
}
return pool;
}
}
Initialize the dimension data into HBase
1) Start Maxwell, ZooKeeper, Kafka, HDFS, HBase, and Redis
2) Run BaseDBApp in IDEA
bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop103 --database gmall-flink-200821 --table user_info --client_id maxwell_1
bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop103 --database gmall-flink-200821 --table base_province --client_id maxwell_1
bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop103 --database gmall-flink-200821 --table sku_info --client_id maxwell_1
bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop103 --database gmall-flink-200821 --table spu_info --client_id maxwell_1
bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop103 --database gmall-flink-200821 --table base_category3 --client_id maxwell_1
bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop103 --database gmall-flink-200821 --table base_trademark --client_id maxwell_1