
[Big Data] Real-Time Data Warehouse (Part 2)

Installing HBase

1) Changes to hbase-env.sh (optional here, since JAVA_HOME is already set globally in /etc/profile):

export JAVA_HOME=/opt/module/jdk1.8.0_144

export HBASE_MANAGES_ZK=false

2) Changes to hbase-site.xml:

<configuration>
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://hadoop102:9000/HBase</value>
    </property>

    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>

    <!-- New in 0.98 and later; earlier versions had no .port property and the default port was 60000 -->
    <property>
        <name>hbase.master.port</name>
        <value>16000</value>
    </property>

    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>hadoop102:2181,hadoop103:2181,hadoop104:2181</value>
    </property>

    <property>
        <name>hbase.zookeeper.property.dataDir</name>
        <value>/opt/module/zookeeper-3.4.10/zkData</value>
    </property>

    <property>
        <name>phoenix.schema.isNamespaceMappingEnabled</name>
        <value>true</value>
    </property>

    <property>
        <name>phoenix.schema.mapSystemTablesToNamespace</name>
        <value>true</value>
    </property>
</configuration>

3)regionservers:

hadoop102 
hadoop103  
hadoop104  

4) Symlink the Hadoop configuration files into HBase:

[root@hadoop103 module]# ln -s /opt/module/hadoop-2.7.2/etc/hadoop/core-site.xml /opt/module/HBase/conf/core-site.xml

[root@hadoop103 module]# ln -s /opt/module/hadoop-2.7.2/etc/hadoop/hdfs-site.xml /opt/module/HBase/conf/hdfs-site.xml

5) Distribute HBase to the other nodes

[root@hadoop103 module]# xsync hbase/

Start the service

[root@hadoop103 hbase]# bin/start-hbase.sh

View the HBase web UI at hadoop103:16010.

To stop HBase:

[root@hadoop103 hbase]# bin/stop-hbase.sh

Installing Phoenix

Install the dependency first:

yum install python-argparse

1) Download Phoenix

http://archive.apache.org/dist/phoenix/apache-phoenix-4.14.2-HBase-1.3/

2) Extract the tarball

tar -zxvf apache-phoenix-4.14.2-HBase-1.3-bin.tar.gz -C /opt/module

mv apache-phoenix-4.14.2-HBase-1.3-bin phoenix

3) Copy the jars

HBase needs both the server and client jars; copy them into HBase's lib directory:

cp phoenix-4.14.2-HBase-1.3-server.jar /opt/module/hbase-1.3.1/lib
cp phoenix-4.14.2-HBase-1.3-client.jar /opt/module/hbase-1.3.1/lib

4) Distribute the jars

The two jars copied above must also be distributed to the other HBase nodes; see the sketch below.
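A minimal sketch, assuming the xsync distribution script used elsewhere in this setup is available (plain scp to each node works just as well):

[root@hadoop103 module]# cd /opt/module/hbase-1.3.1/lib
[root@hadoop103 lib]# xsync phoenix-4.14.2-HBase-1.3-server.jar
[root@hadoop103 lib]# xsync phoenix-4.14.2-HBase-1.3-client.jar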

5) Configure environment variables

export PHOENIX_HOME=/opt/module/phoenix
export PHOENIX_CLASSPATH=$PHOENIX_HOME
export PATH=$PATH:$PHOENIX_HOME/bin
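Assuming these lines were added to /etc/profile (as with JAVA_HOME earlier), reload it so they take effect in the current shell:

source /etc/profile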

6) Edit the Phoenix bin/hbase-site.xml

Add:

<property>
    <name>phoenix.schema.isNamespaceMappingEnabled</name>
    <value>true</value>
</property>
<property>
    <name>phoenix.schema.mapSystemTablesToNamespace</name>
    <value>true</value>
</property>

7) Start Hadoop, ZooKeeper, and HBase
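A minimal sketch of the startup sequence, assuming the standard scripts shipped with hadoop-2.7.2, zookeeper-3.4.10 and hbase-1.3.1 and the paths used earlier in this setup:

[root@hadoop102 hadoop-2.7.2]# sbin/start-dfs.sh
[root@hadoop102 zookeeper-3.4.10]# bin/zkServer.sh start    # repeat on hadoop103 and hadoop104
[root@hadoop103 hbase]# bin/start-hbase.sh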

8) Start Phoenix

[root@hadoop103 phoenix]# bin/sqlline.py hadoop102,hadoop103,hadoop104:2181
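Once sqlline connects, a quick sanity check can be run; the table used here is purely illustrative:

!tables
create table if not exists test (id varchar primary key, name varchar);
upsert into test values ('1001', 'zhangsan');
select * from test;
!quit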

Split-stream sink: saving to HBase

Program flow analysis


Add the dependency

<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-spark</artifactId>
    <version>5.0.0-HBase-2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.glassfish</groupId>
            <artifactId>javax.el</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Because a separate schema (namespace) is used, add an hbase-site.xml to the program's resources:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://hadoop102:8020/HBase</value>
    </property>

    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>

    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>hadoop102,hadoop103,hadoop104</value>
    </property>

    <property>
        <name>hbase.unsafe.stream.capability.enforce</name>
        <value>false</value>
    </property>

    <property>
        <name>hbase.wal.provider</name>
        <value>filesystem</value>
    </property>

    <property>
        <name>phoenix.schema.isNamespaceMappingEnabled</name>
        <value>true</value>
    </property>

    <property>
        <name>phoenix.schema.mapSystemTablesToNamespace</name>
        <value>true</value>
    </property>
</configuration>

Execute in Phoenix:

create schema GMALL200821_REALTIME;
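The sink that actually writes the dimension side-output into Phoenix tables under this schema is not reproduced here. The following is only a minimal sketch of what such a sink could look like, using a plain Phoenix JDBC upsert; the class name DimSink and the element layout ({"sink_table":...,"data":...}) are hypothetical, while GmallConfig.PHOENIX_DRIVER and GmallConfig.PHOENIX_SERVER are the same constants PhoenixUtil relies on below.

package com.atguigu.app.func;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.common.GmallConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Set;

//Hypothetical sketch of a Phoenix sink: one UPSERT per incoming dimension record.
public class DimSink extends RichSinkFunction<JSONObject> {

    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName(GmallConfig.PHOENIX_DRIVER);
        connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
    }

    @Override
    public void invoke(JSONObject value, Context context) throws Exception {
        //Assumes the element carries the target table name and the row data,
        //e.g. {"sink_table":"DIM_USER_INFO","data":{"id":"1001","name":"zhangsan"}}
        String table = value.getString("sink_table");
        JSONObject data = value.getJSONObject("data");

        Set<String> columns = data.keySet();
        String sql = "upsert into GMALL200821_REALTIME." + table
                + "(" + String.join(",", columns) + ") values ('"
                + String.join("','", columns.stream().map(data::getString).toArray(String[]::new)) + "')";

        try (PreparedStatement ps = connection.prepareStatement(sql)) {
            ps.execute();
            connection.commit(); //Phoenix does not auto-commit by default
        }
    }
}

A production sink would additionally batch the upserts and use bind parameters instead of string concatenation.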

Design of the DWS and DWM layers

Requirements overview


Visitor UV calculation

UV stands for Unique Visitor. In real-time computation it is often also called DAU (Daily Active Users), because the real-time UV usually means the number of distinct visitors for the current day.

To identify the day's visitors from the user behavior log, two things are required:

First, identify the first page the visitor opens, which marks the visitor entering the application.

Second, since a visitor may enter the application several times in one day, deduplicate within the scope of a single day.

package com.atguigu.app.dwm;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.StringValueUtils;

import java.text.SimpleDateFormat;

/**
 * Data flow: MockLog(web/app) -> nginx -> springboot -> kafka -> FlinkApp(LogBaseApp) -> kafka
 *            -> FlinkApp(DauApp) -> kafka
 * Services: nginx, logger, zk, kafka, DauApp, consumer (dwm_unique_visit), MockLog
 */
public class DauApp {

    public static void main(String[] args) throws Exception {
        //1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //1.1 Set the state backend
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:9000/gmall/dwd_log/ck"));
        System.setProperty("HADOOP_USER_NAME", "root");
        //1.2 Enable checkpointing
        env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000L));
        //2. Read the Kafka dwd_page_log topic and create a stream
        String groupId = "unique_visit_app";
        String sourceTopic = "dwd_page_log";
        String sinkTopic = "dwm_unique_visit";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaSource(sourceTopic, groupId));

        //3. Convert each line into a JSONObject
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    out.collect(jsonObject);
                } catch (Exception e) {
                    ctx.output(new OutputTag<String>("dirty") {
                    }, value);
                }
            }
        });
        //4. Key by mid
        KeyedStream<JSONObject, String> keyedStream = jsonObjDS.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"));

        //5. Filter out records that are not the day's first visit
        SingleOutputStreamOperator<JSONObject> filterDS = keyedStream.filter(new UvRichFilterFunction());
        //6. Write to the DWM-layer Kafka topic
        filterDS.map(JSON::toString).addSink(MyKafkaUtil.getKafkaSink(sinkTopic));

        //7. Start the job
        env.execute();


    }

    public static class UvRichFilterFunction extends RichFilterFunction<JSONObject> {
        private ValueState<String> firstVisitState;
        private SimpleDateFormat simpleDateFormat;

        @Override
        public void open(Configuration parameters) throws Exception {
            simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
            ValueStateDescriptor<String> stringValueStateDescriptor = new ValueStateDescriptor<>("visit-state", String.class);

            //Configure state TTL so the state expires one day after creation/write
            StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.days(1))
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    .build();
            stringValueStateDescriptor.enableTimeToLive(stateTtlConfig);
            firstVisitState = getRuntimeContext().getState(stringValueStateDescriptor);
        }

        @Override
        public boolean filter(JSONObject value) throws Exception {
            //Extract the last page id
            String lastPageID = value.getJSONObject("page").getString("last_page_id");
            //Only a page without a previous page can be the first page of a visit
            if (lastPageID == null || lastPageID.length() <= 0) {
                //Read the date of the first visit recorded in state
                String firstVisitDate = firstVisitState.value();
                //Extract the event time and format it as a date
                Long ts = value.getLong("ts");
                String curDate = simpleDateFormat.format(ts);
                //Keep the record only if no visit has been recorded for the current date yet
                if (firstVisitDate == null || !firstVisitDate.equals(curDate)) {
                    firstVisitState.update(curDate);
                    return true;
                } else {
                    return false;
                }
            } else {
                return false;
            }
        }
    }
}

Bounce (jump-out) detail calculation

First we need to identify which visits are bounces, i.e. find the last page visited by a bouncing visitor. Two characteristics identify a bounce:

1) The page is the first page of the user's recent visit.

This can be determined by whether the page has a previous page (last_page_id): if that field is empty, this is the first page of the visit.

2) For a (self-defined) period after that first page, the user visits no other pages.

The first characteristic is easy to identify: keep the records whose last_page_id is empty. The second is harder. It cannot be decided from a single record; it requires combining a record that exists with one that does not, i.e. deriving a conclusion from data that never arrives. And "never arrives" really means "does not arrive within a certain time window". So how do we recognize such a time-bounded combination of behaviors?

The simplest approach is Flink's built-in CEP library, which is designed for recognizing an event from a combination of multiple records.

package com.atguigu.app.dwm;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.List;
import java.util.Map;

public class UserJumpDetailApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:9000/gmall/dwd_log/ck"));
        System.setProperty("HADOOP_USER_NAME", "root");
        env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000L));

        //Read the Kafka dwd_page_log topic and create a stream
        String sourceTopic = "dwd_page_log";
        String groupId = "userJumpDetailApp";
        String sinkTopic = "dwm_user_jump_detail";
        FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(sourceTopic, groupId);
        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
//        DataStreamSource<String> kafkaDS = env.fromElements(
//                "{\"common\":{\"mid\":\"101\"},\"page\":{\"page_id\":\"home\"},\"ts\":1000000000} ",
//                "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"home\"},\"ts\":1000000001}",
//                "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
//                        "\"home\"},\"ts\":1000000002} ",
//                "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
//                        "\"detail\"},\"ts\":1000000020} "
//
//        );


        //Extract the timestamp from the data and generate watermarks
        //Old Flink versions defaulted to processing time; new versions default to event time
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//        WatermarkStrategy<JSONObject> watermarkStrategy = WatermarkStrategy.<JSONObject>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
//            @Override
//            public long extractTimestamp(JSONObject element, long recordTimestamp) {
//                return element.getLong("ts");
//            }
//        });
        //3. Convert the data into JSONObjects
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    out.collect(jsonObject);
                } catch (Exception e) {
                    ctx.output(new OutputTag<String>("dirty") {
                    }, value);
                }
            }
        });//.assignTimestampsAndWatermarks(watermarkStrategy);


        //4. Key by mid
        KeyedStream<JSONObject, String> keyedStream = jsonObjDS.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"));

        //5. Define the pattern sequence
        Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("start").where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                //Extract the last page id
                String lastPageId = value.getJSONObject("page").getString("last_page_id");
                return lastPageId == null || lastPageId.length() <= 0;
            }
        }).followedBy("follow").where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                String lastPageId = value.getJSONObject("page").getString("last_page_id");
                return lastPageId != null && lastPageId.length() > 0;
            }
        }).within(Time.seconds(10));

        //6. Apply the pattern to the stream
        PatternStream<JSONObject> patternStream = CEP.pattern(keyedStream, pattern);

        //7. Select matched events and timed-out events
        OutputTag<String> timeOutTag = new OutputTag<String>("TimeOut"){};
        SingleOutputStreamOperator<Object> selectDS = patternStream.flatSelect(timeOutTag, new PatternFlatTimeoutFunction<JSONObject, String>() {

            @Override
            public void timeout(Map<String, List<JSONObject>> pattern, long timeoutTimestamp, Collector<String> out) throws Exception {
                out.collect(pattern.get("start").get(0).toString());
            }
        }, new PatternFlatSelectFunction<JSONObject, Object>() {
            @Override
            public void flatSelect(Map<String, List<JSONObject>> pattern, Collector<Object> out) throws Exception {
                //Intentionally empty: matched (non-bounce) records are not needed
            }
        });
        //8. Write the timed-out (bounce) records to Kafka
        FlinkKafkaProducer<String> kafkaSink = MyKafkaUtil.getKafkaSink(sinkTopic);
        selectDS.getSideOutput(timeOutTag).addSink(kafkaSink);

        //9. Execute the job
        env.execute();

    }
}

Order wide table


Create the order entity class (OrderInfo)

package com.atguigu.gmall.realtime.bean;

import lombok.Data;
import java.math.BigDecimal;

@Data
public class OrderInfo {

    Long id;
    Long province_id;
    String order_status;
    Long user_id;
    BigDecimal total_amount;
    BigDecimal activity_reduce_amount;
    BigDecimal coupon_reduce_amount;
    BigDecimal original_total_amount;
    BigDecimal feight_fee;
    String expire_time;
    String create_time;
    String operate_time;
    String create_date; // derived from other fields
    String create_hour;
    Long create_ts;
}

Create the order detail entity class

package com.atguigu.gmall.realtime.bean;

import lombok.Data;
import java.math.BigDecimal;

@Data
public class OrderDetail {
    Long id;
    Long order_id ;
    Long sku_id;
    BigDecimal order_price ;
    Long sku_num ;
    String sku_name;
    String create_time;
    BigDecimal split_total_amount;
    BigDecimal split_activity_amount;
    BigDecimal split_coupon_amount;
    Long create_ts;
}

Create OrderWideApp to read the order and order-detail data

package com.atguigu.gmall.realtime.app.dwm;

import com.alibaba.fastjson.JSON;
import com.atguigu.gmall.realtime.bean.OrderDetail;
import com.atguigu.gmall.realtime.bean.OrderInfo;
import com.atguigu.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.text.SimpleDateFormat;

public class OrderWideApp {
    public static void main(String[] args) throws Exception {
        //TODO 0. Prepare the basic environment
        StreamExecutionEnvironment env  = StreamExecutionEnvironment.getExecutionEnvironment();
        //Set the parallelism to match the number of Kafka partitions
        env.setParallelism(4);
        /*
        //Checkpoint-related configuration
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        StateBackend fsStateBackend = new FsStateBackend("hdfs://hadoop:8020/gmall/flink/checkpoint/OrderWideApp");
        env.setStateBackend(fsStateBackend);
        System.setProperty("HADOOP_USER_NAME", "atguigu");
        */
        //TODO 1. Receive order and order-detail data from the Kafka DWD layer
        String orderInfoSourceTopic = "dwd_order_info";
        String  orderDetailSourceTopic = "dwd_order_detail";
        String orderWideSinkTopic = "dwm_order_wide";
        String groupId = "order_wide_group";

        //Read data from Kafka
        FlinkKafkaConsumer<String> sourceOrderInfo  = MyKafkaUtil.getKafkaSource(orderInfoSourceTopic,groupId);
        FlinkKafkaConsumer<String> sourceOrderDetail  = MyKafkaUtil.getKafkaSource(orderDetailSourceTopic,groupId);
        DataStream<String> orderInfojsonDStream   = env.addSource(sourceOrderInfo);
        DataStream<String> orderDetailJsonDStream   = env.addSource(sourceOrderDetail);


        //Convert the incoming strings into JavaBeans
        DataStream<OrderInfo>  orderInfoDStream   = orderInfojsonDStream.map(
            new RichMapFunction<String, OrderInfo>() {
                SimpleDateFormat simpleDateFormat=null;
                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd");
                }
                @Override
                public OrderInfo map(String jsonString) throws Exception {
                    OrderInfo orderInfo = JSON.parseObject(jsonString, OrderInfo.class);
                    orderInfo.setCreate_ts(simpleDateFormat.parse(orderInfo.getCreate_time()).getTime());
                    return orderInfo;
                }
            }
        );
        DataStream<OrderDetail> orderDetailDStream   = orderDetailJsonDStream.map(new RichMapFunction<String, OrderDetail>() {
            SimpleDateFormat simpleDateFormat=null;
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd");
            }
            @Override
            public OrderDetail map(String jsonString) throws Exception {
                OrderDetail orderDetail = JSON.parseObject(jsonString, OrderDetail.class);
                orderDetail.setCreate_ts (simpleDateFormat.parse(orderDetail.getCreate_time()).getTime());
                return orderDetail;
            }
        });

        orderInfoDStream.print("orderInfo>>>>>>>");
        orderDetailDStream.print("orderDetail>>>>>>");

        env.execute();
    }
}

Create the merged wide-table entity class

package com.atguigu.gmall.realtime.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.commons.lang3.ObjectUtils;

import java.math.BigDecimal;

@Data
@AllArgsConstructor
public class OrderWide {
    Long detail_id;
    Long order_id ;
    Long sku_id;
    BigDecimal order_price ;
    Long sku_num ;
    String sku_name;
    Long province_id;
    String order_status;
    Long user_id;

    BigDecimal total_amount;
    BigDecimal activity_reduce_amount;
    BigDecimal coupon_reduce_amount;
    BigDecimal original_total_amount;
    BigDecimal feight_fee;
    BigDecimal split_feight_fee;
    BigDecimal split_activity_amount;
    BigDecimal split_coupon_amount;
    BigDecimal split_total_amount;

    String expire_time;
    String create_time;
    String operate_time;
    String create_date; // derived from other fields
    String create_hour;

    String province_name; //obtained by looking up the dimension table
    String province_area_code;
    String province_iso_code;
    String province_3166_2_code;

    Integer user_age ;
    String user_gender;

    Long spu_id;     //dimension data to be joined in
    Long tm_id;
    Long category3_id;
    String spu_name;
    String tm_name;
    String category3_name;

    public OrderWide(OrderInfo orderInfo, OrderDetail orderDetail){
        mergeOrderInfo(orderInfo);
        mergeOrderDetail(orderDetail);

    }

    public void  mergeOrderInfo(OrderInfo orderInfo  )  {
        if (orderInfo != null) {
            this.order_id = orderInfo.id;
            this.order_status = orderInfo.order_status;
            this.create_time = orderInfo.create_time;
            this.create_date = orderInfo.create_date;
            this.activity_reduce_amount = orderInfo.activity_reduce_amount;
            this.coupon_reduce_amount = orderInfo.coupon_reduce_amount;
            this.original_total_amount = orderInfo.original_total_amount;
            this.feight_fee = orderInfo.feight_fee;
            this.total_amount =  orderInfo.total_amount;
            this.province_id = orderInfo.province_id;
            this.user_id = orderInfo.user_id;
        }
    }

    public void mergeOrderDetail(OrderDetail orderDetail  )  {
        if (orderDetail != null) {
            this.detail_id = orderDetail.id;
            this.sku_id = orderDetail.sku_id;
            this.sku_name = orderDetail.sku_name;
            this.order_price = orderDetail.order_price;
            this.sku_num = orderDetail.sku_num;
            this.split_activity_amount=orderDetail.split_activity_amount;
            this.split_coupon_amount=orderDetail.split_coupon_amount;
            this.split_total_amount=orderDetail.split_total_amount;
        }
    }

    public void mergeOtherOrderWide(OrderWide otherOrderWide){
        this.order_status = ObjectUtils.firstNonNull( this.order_status ,otherOrderWide.order_status);
        this.create_time =  ObjectUtils.firstNonNull(this.create_time,otherOrderWide.create_time);
        this.create_date =  ObjectUtils.firstNonNull(this.create_date,otherOrderWide.create_date);
        this.coupon_reduce_amount =  ObjectUtils.firstNonNull(this.coupon_reduce_amount,otherOrderWide.coupon_reduce_amount);
        this.activity_reduce_amount =  ObjectUtils.firstNonNull(this.activity_reduce_amount,otherOrderWide.activity_reduce_amount);
        this.original_total_amount =  ObjectUtils.firstNonNull(this.original_total_amount,otherOrderWide.original_total_amount);
        this.feight_fee = ObjectUtils.firstNonNull( this.feight_fee,otherOrderWide.feight_fee);
        this.total_amount =  ObjectUtils.firstNonNull( this.total_amount,otherOrderWide.total_amount);
        this.user_id =  ObjectUtils.<Long>firstNonNull(this.user_id,otherOrderWide.user_id);
        this.sku_id = ObjectUtils.firstNonNull( this.sku_id,otherOrderWide.sku_id);
        this.sku_name =  ObjectUtils.firstNonNull(this.sku_name,otherOrderWide.sku_name);
        this.order_price =  ObjectUtils.firstNonNull(this.order_price,otherOrderWide.order_price);
        this.sku_num = ObjectUtils.firstNonNull( this.sku_num,otherOrderWide.sku_num);
        this.split_activity_amount = ObjectUtils.firstNonNull(this.split_activity_amount, otherOrderWide.split_activity_amount);
        this.split_coupon_amount = ObjectUtils.firstNonNull(this.split_coupon_amount, otherOrderWide.split_coupon_amount);
        this.split_total_amount = ObjectUtils.firstNonNull(this.split_total_amount, otherOrderWide.split_total_amount);
    }
}

PhoenixUtil

package com.atguigu.utils;

import com.atguigu.common.GmallConfig;
import org.apache.commons.beanutils.BeanUtils;

import java.sql.*;
import java.util.ArrayList;
import java.util.List;

public class PhoenixUtil {
    //Declare the connection
    private static Connection connection=null;
    //Initialize the connection
    private static Connection init(){
        try {
            Class.forName(GmallConfig.PHOENIX_DRIVER);
            Connection connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
            return connection;
        } catch (ClassNotFoundException | SQLException e) {
            e.printStackTrace();
            throw new RuntimeException("Failed to get a Phoenix connection");
        }
    }
    public static <T> List<T> queryList(String sql,Class<T>cls){

        if(connection==null)connection=init();

        PreparedStatement preparedStatement=null;
        ResultSet resultSet=null;
        try {
            //Compile the SQL
            preparedStatement = connection.prepareStatement(sql);
            //Execute the query
            resultSet = preparedStatement.executeQuery();
            //Get the metadata of the result set
            ResultSetMetaData metaData = resultSet.getMetaData();
            int columnCount = metaData.getColumnCount();
            ArrayList<T>list=new ArrayList<>();
            while (resultSet.next()){
                T t = cls.newInstance();
                for(int i=1;i<=columnCount;i++){
                    BeanUtils.setProperty(t,metaData.getColumnName(i),resultSet.getObject(i));
                }
                list.add(t);
            }
            //Return the result list
            return list;

        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("Failed to query dimension info");
        }finally {
            if(preparedStatement!=null){
                try {
                    preparedStatement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if(resultSet!=null){
                try {
                    resultSet.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    public static void main(String[] args) {
        
    }
}

DimUtil

package com.atguigu.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple2;
import redis.clients.jedis.Jedis;

import java.util.List;

/**
 * e.g. select * from <table> where id='' and name='';
 * Redis:
 * 1. What to store        dimension data as a JSON string
 * 2. Which data type      String / Set / Hash
 * 3. Redis key design     String: tableName+id   Set: tableName   Hash: tableName
 * e.g. t:19:zhangsan
 * The Set approach is ruled out because each individual dimension record needs its own expiration time.
 */
public class DimUtil {
    public static JSONObject getDimInfo(String tableName, Tuple2<String,String>...columnValues){
        if(columnValues.length<=0){
            throw new RuntimeException("At least one query condition must be provided when querying dimension data!");
        }

        //Build the Phoenix WHERE clause
        StringBuilder whereSql = new StringBuilder(" where ");

        //Build the Redis key
        StringBuilder redisKey = new StringBuilder(tableName).append(":");

        //Iterate over the query conditions and append them to whereSql
        for (int i=0;i<columnValues.length;i++){
            Tuple2<String, String> columnValue = columnValues[i];
            String column=columnValue.f0;
            String value=columnValue.f1;
            whereSql.append(column).append("='").append(value).append("'");
            redisKey.append(value);
            //If this is not the last condition, append "and"
            if(i<columnValues.length-1){
                whereSql.append(" and ");
                redisKey.append(":");
            }
        }
        //Get a Redis connection
        Jedis jedis = RedisUtil.getJedis();
        String dimJsonStr = jedis.get(redisKey.toString());
        //Check whether the data was found in Redis
        if(dimJsonStr!=null&&dimJsonStr.length()>0){
            jedis.close();
            return JSON.parseObject(dimJsonStr);
        }

        //Assemble the SQL
        String querySql = "select * from " + tableName + whereSql.toString();
        System.out.println(querySql);
        //Query the dimension data from Phoenix
        List<JSONObject> queryList = PhoenixUtil.queryList(querySql, JSONObject.class);
        JSONObject dimJsonObj=queryList.get(0);

        //Write the result to Redis with a 24-hour expiry
        jedis.set(redisKey.toString(),dimJsonObj.toString());
        jedis.expire(redisKey.toString(),24*60*60);

        return dimJsonObj;
    }
    //Invalidate the Redis cache for the given key
    public static  void deleteCached( String tableName, String id){
        String key = tableName.toUpperCase() + ":" + id;
        try {
            Jedis jedis = RedisUtil.getJedis();
            // Delete the cached entry by key
            jedis.del(key);
            jedis.close();
        } catch (Exception e) {
            System.out.println("Cache exception!");
            e.printStackTrace();
        }
    }

    public static JSONObject getDimInfo(String tableName,String value){
        return getDimInfo(tableName,new Tuple2<>("id",value));
    }

    public static void main(String[] args) {
        System.out.println(getDimInfo("GMALL200821_REALTIME.DIM_BASE_DIC", new Tuple2<>("id", "32432")));
    }
}

RedisUtil

package com.atguigu.utils;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisUtil {

    public static JedisPool jedisPool = null;

    public static Jedis getJedis() {

        if (jedisPool == null) {
            JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
            jedisPoolConfig.setMaxTotal(100); //maximum number of connections
            jedisPoolConfig.setBlockWhenExhausted(true); //block and wait when the pool is exhausted
            jedisPoolConfig.setMaxWaitMillis(2000); //maximum wait time

            jedisPoolConfig.setMaxIdle(5); //maximum idle connections
            jedisPoolConfig.setMinIdle(5); //minimum idle connections

            jedisPoolConfig.setTestOnBorrow(true); //test the connection (ping/pong) when borrowing it

            jedisPool = new JedisPool(jedisPoolConfig, "hadoop103", 6379, 1000);
            System.out.println("Creating the Jedis connection pool");
            return jedisPool.getResource();

        } else {
  //          System.out.println("Active connections in pool: " + jedisPool.getNumActive());
            return jedisPool.getResource();
        }
    }

}

Asynchronous queries


Asynchronous querying hands the dimension-table lookups off to a dedicated thread pool, so no single query blocks the stream; each parallel subtask can keep several requests in flight at once, which improves throughput.

This is particularly effective for operations that involve network I/O, because it reduces the time spent waiting on requests.

OrderWideApp

package com.atguigu.app.dwm;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.app.func.DimAsyncFunction;
import com.atguigu.bean.OrderDetail;
import com.atguigu.bean.OrderInfo;
import com.atguigu.bean.OrderWide;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.concurrent.TimeUnit;

/**
 * Mock -> mysql(binLog) -> maxwell -> kafka(ods_base_db_m) -> DbBaseApp(config changes, phoenix)
 * -> kafka(dwd_order_info, dwd_order_detail) -> OrderWideApp
 */
public class OrderWideApp {
    public static void main(String[] args) throws Exception {
        //1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
//        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:9000/gmall/dwm_log/ck"));
//        System.setProperty("HADOOP_USER_NAME", "root");
//        env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE);
//        env.getCheckpointConfig().setCheckpointTimeout(6000L);

        //2. Read the Kafka order and order-detail topics: dwd_order_info, dwd_order_detail
        String orderInfoSourceTopic = "dwd_order_info";
        String orderDetailSourceTopic = "dwd_order_detail";
        String orderWideSinkTopic = "dwm_order_wide";
        String groupId = "order_wide_group";

        FlinkKafkaConsumer<String> orderInfoKafkaSource = MyKafkaUtil.getKafkaSource(orderInfoSourceTopic, groupId);
        DataStreamSource<String> orderInfoKafkaDS = env.addSource(orderInfoKafkaSource);
        DataStreamSource<String> orderDetailKafkaDS = env.addSource(MyKafkaUtil.getKafkaSource(orderDetailSourceTopic, groupId));

        //3. Convert each line into a JavaBean, extract the timestamp and generate watermarks
        WatermarkStrategy<OrderInfo> orderInfoWatermarkStrategy = WatermarkStrategy.<OrderInfo>forMonotonousTimestamps().
                withTimestampAssigner(new SerializableTimestampAssigner<OrderInfo>() {
                    @Override
                    public long extractTimestamp(OrderInfo element, long recordTimestamp) {
                        return element.getCreate_ts();
                    }
                });
        WatermarkStrategy<OrderDetail> orderDetailWatermarkStrategy = WatermarkStrategy.<OrderDetail>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<OrderDetail>() {
                    @Override
                    public long extractTimestamp(OrderDetail element, long recordTimestamp) {
                        return element.getCreate_ts();
                    }
                });
        KeyedStream<OrderInfo, Long> orderInfoWithKeyedStream = orderInfoKafkaDS.map(jsonStr -> {
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            //Convert the JSON string into a JavaBean
            OrderInfo orderInfo = JSON.parseObject(jsonStr, OrderInfo.class);
            //Extract the create_time field
            String create_time = orderInfo.getCreate_time();
            //Split on the space character
            String[] createTimeArr = create_time.split(" ");

            orderInfo.setCreate_date(createTimeArr[0]);
            orderInfo.setCreate_hour(createTimeArr[1]);
            orderInfo.setCreate_ts(simpleDateFormat.parse(create_time).getTime());
            return orderInfo;
        }).assignTimestampsAndWatermarks(orderInfoWatermarkStrategy)
                .keyBy(OrderInfo::getId);

        KeyedStream<OrderDetail, Long> orderDetailWithKeyedStream = orderDetailKafkaDS.map(jsonStr -> {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            OrderDetail orderDetail = JSON.parseObject(jsonStr, OrderDetail.class);
            orderDetail.setCreate_ts(sdf.parse(orderDetail.getCreate_time()).getTime());
            return orderDetail;
        }).assignTimestampsAndWatermarks(orderDetailWatermarkStrategy)
                .keyBy(OrderDetail::getOrder_id);
        //4. Join the two streams
        SingleOutputStreamOperator<OrderWide> orderWideDS = orderInfoWithKeyedStream.intervalJoin(orderDetailWithKeyedStream)
                .between(Time.seconds(-5), Time.seconds(5)) //in production, set this to the maximum expected network delay so that no data is lost
                .process(new ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>() {
                    @Override
                    public void processElement(OrderInfo left, OrderDetail right, Context ctx, Collector<OrderWide> out) throws Exception {
                        out.collect(new OrderWide(left, right));
                    }
                });
        //Print for testing
        //orderWideDS.print(">>>>>>");

        //5. Join the dimensions
        //5.1 Join the user dimension
        SingleOutputStreamOperator<OrderWide> orderWideWithUserDS = AsyncDataStream.unorderedWait(orderWideDS,
                new DimAsyncFunction<OrderWide>("DIM_USER_INFO") {
                    @Override
                    public String getKey(OrderWide input) {
                        return input.getUser_id().toString();
                    }

                    @Override
                    public void join(OrderWide input, JSONObject dimInfo) throws ParseException {
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
                        //Extract the birthday from the user dimension
                        String birthday = dimInfo.getString("BIRTHDAY");
                        long currentTs = System.currentTimeMillis();
                        long ts = sdf.parse(birthday).getTime();

                        //Convert the birthday into an age
                        long ageLong = (currentTs - ts) / 1000L / 60 / 60 / 24 / 365;
                        input.setUser_age((int) ageLong);

                        //Extract the gender from the user dimension
                        String gender = dimInfo.getString("GENDER");
                        input.setUser_gender(gender);

                    }
                }, 60, TimeUnit.SECONDS);

        //orderWideWithUserDS.print();

        //5.2 Join the province dimension
        SingleOutputStreamOperator<OrderWide> orderWideWithProvinceDS = AsyncDataStream.unorderedWait(orderWideWithUserDS,
                new DimAsyncFunction<OrderWide>("DIM_BASE_PROVINCE") {
                    @Override
                    public String getKey(OrderWide orderWide) {
                        return orderWide.getProvince_id().toString();
                    }

                    @Override
                    public void join(OrderWide orderWide, JSONObject dimInfo) throws ParseException {
                        //Extract the dimension fields and set them on orderWide
                        orderWide.setProvince_name(dimInfo.getString("NAME"));
                        orderWide.setProvince_area_code(dimInfo.getString("AREA_CODE"));
                        orderWide.setProvince_iso_code(dimInfo.getString("ISO_CODE"));
                        orderWide.setProvince_3166_2_code(dimInfo.getString("ISO_3166_2"));

                    }
                }, 60, TimeUnit.SECONDS);

        //5.3 Join the SKU dimension
        SingleOutputStreamOperator<OrderWide> orderWideWithSkuDS = AsyncDataStream.unorderedWait(
                orderWideWithProvinceDS, new DimAsyncFunction<OrderWide>("DIM_SKU_INFO") {
                    @Override
                    public void join(OrderWide orderWide, JSONObject jsonObject) throws ParseException {
                        orderWide.setSku_name(jsonObject.getString("SKU_NAME"));
                        orderWide.setCategory3_id(jsonObject.getLong("CATEGORY3_ID"));
                        orderWide.setSpu_id(jsonObject.getLong("SPU_ID"));
                        orderWide.setTm_id(jsonObject.getLong("TM_ID"));
                    }

                    @Override
                    public String getKey(OrderWide orderWide) {
                        return String.valueOf(orderWide.getSku_id());
                    }
                }, 60, TimeUnit.SECONDS);


        //5.4 Join the SPU dimension
        SingleOutputStreamOperator<OrderWide> orderWideWithSpuDS = AsyncDataStream.unorderedWait(
                orderWideWithSkuDS, new DimAsyncFunction<OrderWide>("DIM_SPU_INFO") {
                    @Override
                    public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
                        orderWide.setSpu_name(jsonObject.getString("SPU_NAME"));
                    }

                    @Override
                    public String getKey(OrderWide orderWide) {
                        return String.valueOf(orderWide.getSpu_id());
                    }
                }, 60, TimeUnit.SECONDS);


        //5.5 Join the category (category3) dimension
        SingleOutputStreamOperator<OrderWide> orderWideWithCategory3DS = AsyncDataStream.unorderedWait(
                orderWideWithSpuDS, new DimAsyncFunction<OrderWide>("DIM_BASE_CATEGORY3") {
                    @Override
                    public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
                        orderWide.setCategory3_name(jsonObject.getString("NAME"));
                    }

                    @Override
                    public String getKey(OrderWide orderWide) {
                        return String.valueOf(orderWide.getCategory3_id());
                    }
                }, 60, TimeUnit.SECONDS);


        //5.6 Join the trademark (brand) dimension
        SingleOutputStreamOperator<OrderWide> orderWideWithTmDS = AsyncDataStream.unorderedWait(
                orderWideWithCategory3DS, new DimAsyncFunction<OrderWide>("DIM_BASE_TRADEMARK") {
                    @Override
                    public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
                        orderWide.setTm_name(jsonObject.getString("TM_NAME"));
                    }

                    @Override
                    public String getKey(OrderWide orderWide) {
                        return String.valueOf(orderWide.getTm_id());
                    }
                }, 60, TimeUnit.SECONDS);


        //6. Write the result to the Kafka topic dwm_order_wide
        orderWideWithTmDS.map(JSON::toJSONString).addSink(MyKafkaUtil.getKafkaSink(orderWideSinkTopic));
        //7. Start the job
        env.execute();
    }
}

DimAsyncFunction

package com.atguigu.app.func;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.bean.OrderWide;
import com.atguigu.utils.DimUtil;
import com.atguigu.utils.ThreadPoolUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.text.ParseException;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ThreadPoolExecutor;

public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> implements DimJoinFunction<T> {

    //Thread pool used for the asynchronous lookups
    private ThreadPoolExecutor threadPoolExecutor;

    //Name of the dimension table to query
    private String tableName;

    public DimAsyncFunction(String tableName) {
        this.tableName = tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        //Initialize the thread pool
        threadPoolExecutor = ThreadPoolUtil.getInstance();

    }

    @Override
    public void asyncInvoke(T input, ResultFuture<T> resultFuture) throws Exception {
        threadPoolExecutor.submit(new Runnable() {
            @Override
            public void run() {
                //0. Get the lookup key
                String key = getKey(input);
                //1. Query the dimension info
                JSONObject dimInfo = DimUtil.getDimInfo(tableName, key);

                //2. Join the dimension onto the fact record
                if (dimInfo != null && dimInfo.size() > 0) {
                    try {
                        join(input, dimInfo);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }

                //3. Emit the enriched record downstream
                resultFuture.complete(Collections.singletonList(input));
            }
        });

    }

}

DimJoinFunction

package com.atguigu.app.func;

import com.alibaba.fastjson.JSONObject;

import java.text.ParseException;

public interface DimJoinFunction<T> {
    //Get the primary key of the dimension the record should be joined with
    String getKey(T input);
    //Join the fact record with the dimension record
    void join(T input, JSONObject dimInfo) throws ParseException, Exception;
}

ThreadPoolUtil

package com.atguigu.utils;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolUtil {

    //The shared thread pool; volatile so that double-checked locking in getInstance() is safe
    private static volatile ThreadPoolExecutor pool;

    private ThreadPoolUtil() {

    }

    //Lazily initialize and return the singleton thread pool
    public static ThreadPoolExecutor getInstance() {
        if (pool == null) {
            synchronized (ThreadPoolUtil.class) {
                if (pool == null) {
    /*
    Parameters of the singleton thread pool:
    corePoolSize:    the core number of threads; decides whether a newly added task spawns a new thread or is placed into workQueue
    maximumPoolSize: the maximum number of threads the pool may create, which also depends on the type of workQueue used
    keepAliveTime:   how long idle threads beyond corePoolSize are kept alive before being destroyed
    unit:            the time unit of keepAliveTime
    workQueue:       the queue holding tasks that have been submitted to the pool but not yet executed
    */
                    pool = new ThreadPoolExecutor(3,
                            3,
                            300L,
                            TimeUnit.SECONDS,
                            new LinkedBlockingDeque<Runnable>(Integer.MAX_VALUE));

                }
            }
        }
        return pool;
    }


}

Initializing data into HBase

1. Start Maxwell, ZK, Kafka, HDFS, HBase, and Redis

2. Run BaseDBApp in IDEA, then trigger a Maxwell bootstrap for each dimension table:

bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop103 --database gmall-flink-200821 --table user_info --client_id maxwell_1

bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop103 --database gmall-flink-200821 --table base_province --client_id maxwell_1

bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop103 --database gmall-flink-200821 --table sku_info --client_id maxwell_1

bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop103 --database gmall-flink-200821 --table spu_info --client_id maxwell_1

bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop103 --database gmall-flink-200821 --table base_category3 --client_id maxwell_1

bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop103 --database gmall-flink-200821 --table base_trademark --client_id maxwell_1
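To confirm the bootstrap data actually landed in Phoenix/HBase, a quick check from sqlline (assuming the dimension tables follow the DIM_ naming used above):

select count(*) from GMALL200821_REALTIME.DIM_USER_INFO;
select count(*) from GMALL200821_REALTIME.DIM_BASE_PROVINCE;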
