IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 实仓(一) -> 正文阅读

[大数据]实仓(一)

实时数仓

实时计算与实时数仓比较

普通的实时计算优先考虑时效性,所以从数据源采集经过实时计算直接得到结果。如此做时效性更好,但是弊端是由于计算过程中的中间结果没有沉淀下来,所以当面对大量实时需求的时候,计算的复用性较差,开发成本随着需求增加直线上升
在这里插入图片描述

实时数仓基于一定的数据仓库理念,对数据处理流程进行规划、分层,目的是提高数据的复用性。
在这里插入图片描述

实时数仓分层

ODS

原始数据,日志和业务数据

DWD

根据数据对象为单位进行分流,比如订单、页面访问等等

DIM

维度数据

DWM

对于部分数据对象进行进一步加工,比如独立访问、跳出行为,也可以和维度进行关联,形成宽表,依旧是明细数据。

DWS

根据某个主题将多个事实数据轻度聚合,形成主题宽表。

ADS

把Clickhouse中的数据根据可视化需要进行筛选聚合

在这里插入图片描述

日志采集

日志采集模块

创建Springboot模块

在创建目录/opt/module/gmall_flink/rt_log

将生成行为数据的jar放在该目录下

打包单机部署

logback.xml文件

将logback.xml文件放到resource目录下

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property name="LOG_HOME" value="d:/logs" />
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_HOME}/app.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <!-- 将某一个包下日志单独打印日志 -->
    <logger name="com.atguigu.gmalllogger.controller.LoggerController"
            level="INFO" additivity="false">
        <appender-ref ref="rollingFile" />
        <appender-ref ref="console" />
    </logger>

    <root level="error" additivity="false">
        <appender-ref ref="console" />
    </root>
</configuration>

发送文件到Kafka

package com.atguigu.gmalllogger.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController//表示返回普通对象而不是页面
@Slf4j
public class LoggerController {

    @Autowired
    private KafkaTemplate <String,String> kafkaTemplate;


    @RequestMapping("applog")
    public String getLogger(@RequestParam("param")String jsoStr){
        //落盘
        log.info(jsoStr);
        //写入kafka
        kafkaTemplate.send("ods_base_log",jsoStr);
        return "success";
    }
}

kafka中创建主题

[root@hadoop103 kafka]# bin/kafka-topics.sh --zookeeper hadoop103:2181 --create --topic ods_base_log --partitions 2 --replication-factor 2

启动程序

创建消费者

bin/kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic ods_base_log

开一个新的窗口

执行java -jar gmall2020-mock-log-2020-12-18.jar

在这里插入图片描述

修改logback.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property name="LOG_HOME" value="/opt/module/gmall-flink/rt_apploglogs" />
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_HOME}/app.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <!-- 将某一个包下日志单独打印日志 -->
    <logger name="com.atguigu.gmalllogger.controller.LoggerController"
            level="INFO" additivity="false">
        <appender-ref ref="rollingFile" />
        <appender-ref ref="console" />
    </logger>

    <root level="error" additivity="false">
        <appender-ref ref="console" />
    </root>
</configuration>

打包

只需要gmall-logger模块

gmall-logger-0.0.1-SNAPSHOT.jar
在这里插入图片描述

将生成的jar包放置在/opt/module/gmall-flink/rt_applog/

修改application.yml文件

# 外部配置打开
# logging.config=./logback.xml
#业务日期
mock.date: "2020-12-18"

  #模拟数据发送模式
mock.type: "http"
  #http模式下,发送的地址
mock.url: "http://localhost:8081/applog"

#mock:
#  kafka-server: "hdp1:9092,hdp2:9092,hdp3:9092"
#  kafka-topic: "ODS_BASE_LOG"

#启动次数
mock.startup.count: 1000
  #设备最大值
mock.max.mid: 20
  #会员最大值
mock.max.uid: 50
  #商品最大值
mock.max.sku-id: 10
  #页面平均访问时间
mock.page.during-time-ms: 20000
  #错误概率 百分比
mock.error.rate: 3
  #每条日志发送延迟 ms
mock.log.sleep: 100
  #商品详情来源  用户查询,商品推广,智能推荐, 促销活动
mock.detail.source-type-rate: "40:25:15:20"

#领取购物券概率
mock.if_get_coupon_rate: 75

#购物券最大id
mock.max.coupon-id: 3

  #搜索关键词  
mock.search.keyword: "图书,小米,iphone11,电视,口红,ps5,苹果手机,小米盒子"



测试

开三个窗口

分别执行

[root@hadoop103 kafka]#java -jar gmall-logger-0.0.1-SNAPSHOT.jar

[root@hadoop103 kafka]# java -jar gmall2020-mock-log-2020-12-18.jar

[root@hadoop103 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic ods_base_log

观察是否有数据产生

打包集群部署

搭建Nginc环境

Nginx (“engine x”) 是一个高性能的HTTP和反向代理服务器,特点是占有内存少,并发能力强,事实上nginx的并发能力确实在同类型的网页服务器中表现较好,中国大陆使用nginx网站用户有:百度、京东、新浪、网易、腾讯、淘宝等。

正向代理和反向代理

正向代理类似一个跳板机,代理访问外部资源。比如:我是一个用户,我访问不了某网站,但是我能访问一个代理服务器,这个代理服务器,它能访问那个我不能访问的网站,于是我先连上代理服务器,告诉它我需要那个无法访问网站的内容,代理服务器去取回来,然后返回给我。

反向代理(Reverse Proxy)方式是指以代理服务器来接受internet上的连接请求,然后将请求转发给内部网络上的服务器,并将从服务器上得到的结果返回给internet上请求连接的客户端,此时代理服务器对外就表现为一个反向代理服务器;

nginc的安装

在hadoop103上运行yum,安装相关依赖包

yum -y install openssl openssl-devel pcre pcre-devel zlib zlib-devel gcc gcc-c++

将nginx-1.12.2.tar.gz放到/opt/soft目录下

tar -zxvf nginx-1.12.2.tar.gz 解压到soft目录下

进入/opt/module/soft/nginx-1.12.2目录下

执行

[root@hadoop103 nginx-1.12.2]# ./configure --prefix=/opt/module/nginx

[root@hadoop103 nginx-1.12.2]# make && make install

启动nginx

1)在/opt/module/nginx/sbin目录下执行 ./nginx

[root@hadoop103 sbin]# ./nginx

如果在普通用户下面启动会报错

原因:nginx占用80端口,默认情况下非root用户不允许使用1024以下端口

解决:让当前用户的某个应用也可以使用1024以下的端口

sudo setcap cap_net_bind_service=+eip /opt/module/nginx/sbin/nginx

注意:要根据自己的实际路径进行配置

2)查看启动情况

ps -ef |grep nginx

因为nginx不是用java写的,所以不能通过jps查看

3) 在浏览器中输入http://hadoop102/访问

nginx常用命令

重启Nginx

./nginx -s reload

关闭Nginx

./nginx -s stop

通过配置文件启动

./nginx -c /opt/module/nginx/conf/nginx.conf

/opt/module/nginx/sbin/nginx -c /opt/module/nginx/conf/nginx.conf

其中-c是指定配置文件,而且配置文件路径必须指定绝对路径

配置检查

当修改Nginx配置文件后,可以使用Nginx命令进行配置文件语法检查,用于检查Nginx配置文件是否正确

/opt/module/nginx/sbin/nginx -c /opt/module/nginx/conf/nginx.conf –t

配置nginx

修改nginx.conf配置文件

在server内部配置

#user  nobody;
worker_processes  1;

#error_log  logs/error.log;
#error_log  logs/error.log  notice;
#error_log  logs/error.log  info;

#pid        logs/nginx.pid;


events {
    worker_connections  1024;
}


http {
    include       mime.types;
    default_type  application/octet-stream;

    #log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
    #                  '$status $body_bytes_sent "$http_referer" '
    #                  '"$http_user_agent" "$http_x_forwarded_for"';

    #access_log  logs/access.log  main;
    upstream logserver{
	server hadoop102:8081 weight=1;
	server hadoop103:8081 weight=1;
	server hadoop104:8081 weight=1;
}

	
    sendfile        on;
    #tcp_nopush     on;

    #keepalive_timeout  0;
    keepalive_timeout  65;

    #gzip  on;

    server {
        listen       80;
        server_name  logserver;

        #charset koi8-r;

        #access_log  logs/host.access.log  main;

        location / {
            root   html;
            index  index.html index.htm;
	    proxy_pass http://logserver;
	    proxy_connect_timeout 10;
        }

        #error_page  404              /404.html;

        # redirect server error pages to the static page /50x.html
        #
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }

        # proxy the PHP scripts to Apache listening on 127.0.0.1:80
        #
        #location ~ \.php$ {
        #    proxy_pass   http://127.0.0.1;
        #}

        # pass the PHP scripts to FastCGI server listening on 127.0.0.1:9000
        #
        #location ~ \.php$ {
        #    root           html;
        #    fastcgi_pass   127.0.0.1:9000;
        #    fastcgi_index  index.php;
        #    fastcgi_param  SCRIPT_FILENAME  /scripts$fastcgi_script_name;
        #    include        fastcgi_params;
        #}

        # deny access to .htaccess files, if Apache's document root
        # concurs with nginx's one
        #
        #location ~ /\.ht {
        #    deny  all;
        #}
    }


    # another virtual host using mix of IP-, name-, and port-based configuration
    #
    #server {
    #    listen       8000;
    #    listen       somename:8080;
    #    server_name  somename  alias  another.alias;

    #    location / {
    #        root   html;
    #        index  index.html index.htm;
    #    }
    #}


    # HTTPS server
    #
    #server {
    #    listen       443 ssl;
    #    server_name  localhost;

    #    ssl_certificate      cert.pem;
    #    ssl_certificate_key  cert.key;

    #    ssl_session_cache    shared:SSL:1m;
    #    ssl_session_timeout  5m;

    #    ssl_ciphers  HIGH:!aNULL:!MD5;
    #    ssl_prefer_server_ciphers  on;

    #    location / {
    #        root   html;
    #        index  index.html index.htm;
    #    }
    #}

}

在这里插入图片描述

在这里插入图片描述

采集日志的jar包同步到其它机器

[root@hadoop103 module]# xsync gmall-flink

集群群起脚本(logger.sh)

#!/bin/bash
JAVA_BIN= /opt/module/jdk1.8.0_144/bin/java
APPNAME=gmall-logger-0.0.1-SNAPSHOT.jar

case $1 in
 "start")
   {

    for i in hadoop102 hadoop103 hadoop104
    do
     echo "========: $i==============="
    ssh $i   "/opt/module/jdk1.8.0_144/bin/java -Xms32m -Xmx64m -jar /opt/module/gmall-flink/rt_applog/gmall-logger-0.0.1-SNAPSHOT.jar >/dev/null 2>&1  &"
    done
  };;
  "stop")
  {
    for i in  hadoop102 hadoop103 hadoop104
    do
     echo "========: $i==============="
     ssh $i "ps -ef|grep $APPNAME |grep -v grep|awk '{print \$2}'|xargs kill" >/dev/null 2>&1
    done
  };;
   esac

修改模拟日志生成的配置

# 外部配置打开
# logging.config=./logback.xml
#业务日期
mock.date: "2020-12-18"

  #模拟数据发送模式
mock.type: "http"
  #http模式下,发送的地址
mock.url: "http://hadoop103/applog"

#mock:
#  kafka-server: "hdp1:9092,hdp2:9092,hdp3:9092"
#  kafka-topic: "ODS_BASE_LOG"

#启动次数
mock.startup.count: 1000
  #设备最大值
mock.max.mid: 20
  #会员最大值
mock.max.uid: 50
  #商品最大值
mock.max.sku-id: 10
  #页面平均访问时间
mock.page.during-time-ms: 20000
  #错误概率 百分比
mock.error.rate: 3
  #每条日志发送延迟 ms
mock.log.sleep: 100
  #商品详情来源  用户查询,商品推广,智能推荐, 促销活动
mock.detail.source-type-rate: "40:25:15:20"

#领取购物券概率
mock.if_get_coupon_rate: 75

#购物券最大id
mock.max.coupon-id: 3

  #搜索关键词  
mock.search.keyword: "图书,小米,iphone11,电视,口红,ps5,苹果手机,小米盒子"



测试

1)运行kafka消费

bin/kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic ods_base_log

2)启动nginx服务采集服务集群

logger.sh start

3)运行模拟生成数据的jar

[root@hadoop102 rt_applog]# java -jar gmall2020-mock-log-2020-12-18.jar

业务数据采集

MySQL的准备

创建数据库

名字:gmall-flink-200821

编码:utf-8

导入建表数据

导入sql文件

修改/etc/my.cnf文件

[root@hadoop103]# vim /etc/my.cnf

server-id = 103
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall-flink-200821

重启mysql

模拟生成数据

创建目录

[root@hadoop103 module]# mkdir rt_db

在这里插入图片描述

上传到该目录下

配置application.properties文件

修改mysql 的密码

或配置其它的一些信息

安装Maxwell

将maxwell-1.25.0.tar.gz上传到/opt/soft目录下

解压

初始化Maxwell元数据库

登入mysql

1)在MySQL中建立一个maxwell库用于存储Maxwell的元数据

mysql> create database maxwell;

2)设置安全级别

mysql> set global validate_password_length=4;

mysql> set global validate_password_policy=0;

3)分配一个账号可以操作该数据库

mysql> GRANT ALL ON maxwell.* TO ‘maxwell’@’%’ IDENTIFIED BY ‘123456’;

4)分配这个账号可以监控其他数据库的权限

mysql> GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON . TO maxwell@’%’;

使用maxwell监控抓取mysql数据

[root@hadoop103 maxwell-1.25.0]# cp config.properties.example config.properties

[root@hadoop103 maxwell-1.25.0]# vim config.properties

# tl;dr config
log_level=info

producer=kafka
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
kafka_topic=ods_base_db_m

# mysql login info
host=hadoop103
user=maxwell
password=123456

client_id=maxwell_1
producer_partition_by=primary_key

启动maxwell脚本(maxwell.sh)

/opt/module/maxwell-1.25.0/bin/maxwell --config  /opt/module/maxwell-1.25.0/config.properties >/dev/null 2>&1 &

测试

maxwell.sh

bin/kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic ods_base_db_m

java -jar gmall2020-mock-db-2020-11-27.jar

观察是否有数据生成

安装canal

将canal.deployer-1.1.4.tar.gz上传到/opt/module目录下

解压到/opt/module/canal

注:canal解压后是散的所以要提前创建目录canal

修改conf/canal.properties的配置

[root@hadoop103 conf]# vim canal.properties

修改

canal.serverMode = kafka


canal.mq.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
canal.mq.retries = 2
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all

修改instance.properties

[root@hadoop103 example]# pwd
/opt/module/canal/conf/example

修改

# mq config
canal.mq.topic=ods_base_db_c
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
#canal.mq.partition=0
# hash partition config
canal.mq.partitionsNum=3

测试

bin/startup.sh

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ods_base_db_c

造数据

dwd层数据准备

需求分析及实现思路

每层的职能

分层数据描述生成计算工具存储媒介
ODS原始数据,日志和业务数据日志服务器,maxwellkafka
DWD根据数据对象为单位进行分流,比如订单、页面访问等等。FLINKkafka
DWM对于部分数据对象进行进一步加工,比如独立访问、跳出行为。依旧是明细数据。FLINKkafka
DIM维度数据FLINKHBase
DWS根据某个维度主题将多个事实数据轻度聚合,形成主题宽表。FLINKClickhouse
ADS把Clickhouse中的数据根据可视化需要进行筛选聚合。Clickhouse SQL可视化展示

DWD层数据准备实现思路

功能1:环境搭建

功能2:计算用户行为日志DWD层

功能3:计算业务数据DWD层

环境搭建

在工程中新建模块gmall-realtime

app产生各层数据的flink任务
bean数据对象
common公共常量
utils工具类

添加依赖

<properties>
<java.version>1.8</java.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
    <flink.version>1.12.0</flink.version>
    <scala.version>2.12</scala.version>
<hadoop.version>3.1.3</hadoop.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.68</version>
    </dependency>

<!--如果保存检查点到hdfs上,需要引入此依赖-->
<dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>

<!--Flink默认使用的是slf4j记录日志,相当于一个日志的接口,我们这里使用log4j作为具体的日志实现-->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>

<dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

在resources目录下创建log4j.properties配置文件

log4j.rootLogger=warn,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

准备用户行为日志DWD层

我们前面采集的日志数据已经保存到Kafka中,作为日志数据的ODS层,从kafka的ODS层读取的日志数据分为3类, 页面日志、启动日志和曝光日志。这三类数据虽然都是用户行为数据,但是有着完全不一样的数据结构,所以要拆分处理。将拆分后的不同的日志写回Kafka不同主题中,作为日志DWD层。

页面日志输出到主流,启动日志输出到启动侧输出流,曝光日志输出到曝光侧输出流

主要任务

识别新老用户

利用测输出流实现数据的拆分

将不同流的数据推送到下游kafka的不同topic中

依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>gmall-flink-200821</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>gmall-realtime</artifactId>

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <flink.version>1.12.0</flink.version>
        <scala.version>2.12</scala.version>
        <hadoop.version>3.1.3</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>

        <!--如果保存检查点到hdfs上,需要引入此依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!--Flink默认使用的是slf4j记录日志,相当于一个日志的接口,我们这里使用log4j作为具体的日志实现-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

代码

app

package com.atguigu.app.dwd;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;

public class LogBaseApp {
    public static void main(String[] args) throws Exception {
        //1、获取执行环境,设置并行度,开启ck,设置状态后端(hdfs)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);//为kafka主题的分区数
        //1.1设置状态后端
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:9000/gmall/dwd_log/ck"));
        //1.2开启ck
        env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,1000L));

        //修改用户名
        System.setProperty("HADOOP_USER_NAME", "root");

        //2、读取kafka ods_base_log 主题数据
        FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource("ods_base_log", "dwd_log");
        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);

        //3、将每行数据转换成JsonObject
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSONObject::parseObject);
        //4、按照mid分组
        KeyedStream<JSONObject, String> keyedStream = jsonObjDS.keyBy(data -> data.getJSONObject("common").getString("mid"));
        //5、使用状态做新老用户校验
        SingleOutputStreamOperator<JSONObject> jsonWithNewFlagDS = keyedStream.map(new NewMidRichMapFunction());
        //打印测试
        //jsonWithNewFlagDS.print();

        //6、分流,使用ProcessFunction将ODS数据拆分成启动曝光以及页面数据
        SingleOutputStreamOperator<String> pageDS = jsonWithNewFlagDS.process(new SplitProcessFunction());

        //7、将三个流的数据写入对应kafka主题
        DataStream<String> startDS = pageDS.getSideOutput(new OutputTag<String>("start") {
        });
        DataStream<String> displayDS = pageDS.getSideOutput(new OutputTag<String>("display") {
        });

        //打印测试
//        pageDS.print("Page>>>>>>>");
//        startDS.print("Start>>>>>>>>>");
//        displayDS.print("Display>>>>>>>>");
        pageDS.addSink(MyKafkaUtil.getKafkaSink("dwd_page_log"));
        startDS.addSink(MyKafkaUtil.getKafkaSink("dwd_start_log"));
        displayDS.addSink(MyKafkaUtil.getKafkaSink("dwd_display_log"));

        //8、执行任务
        env.execute("");
    }

    public static class NewMidRichMapFunction extends RichMapFunction<JSONObject, JSONObject> {
        //声明状态用于表示当前mid是否已经访问过
        private ValueState<String> firstVisitDateState;
        private SimpleDateFormat simpleDateFormat;

        @Override
        public void open(Configuration parameters) throws Exception {
            firstVisitDateState = getRuntimeContext().getState(new ValueStateDescriptor<String>("new-mid", String.class));
            simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
        }

        @Override
        public JSONObject map(JSONObject value) throws Exception {
            //提取新用户标记
            String isNew = value.getJSONObject("common").getString("is_new");
            //如果当前前端传输数据表示为新用户,则进行校验
            if ("1".equals(isNew)) {
                //取出状态数据并取出当前访问时间
                String firstDate = firstVisitDateState.value();
                long ts = value.getLong("ts");

                //判断状态数据是否为null
                if (firstDate != null) {
                    //修复
                    value.getJSONObject("common").put("is_new", "0");
                } else {
                    //更新状态
                    firstVisitDateState.update(simpleDateFormat.format(ts));
                }
            }
            return value;
        }
    }

    public static class SplitProcessFunction extends ProcessFunction<JSONObject, String> {

        @Override
        public void processElement(JSONObject value, Context ctx, Collector<String> out) throws Exception {
            //获取“sort”字段
            String startStr = value.getString("start");

            //判断是否为启动数据
            if (startStr != null && startStr.length() > 0) {
                //将启动日志输出到侧输出流
                ctx.output(new OutputTag<String>("start") {
                }, value.toString());
            } else {
                //不是启动数据,继续判断是否是曝光数据
                JSONArray displays = value.getJSONArray("displays");
                if (displays != null && displays.size() > 0) {
                    //曝光数据,遍历写入侧输出流
                    for (int i = 0; i < displays.size(); i++) {
                        //取出单条曝光数据
                        JSONObject displaysJSON = displays.getJSONObject(i);
                        //添加页面id
                        displaysJSON.put("page_id", value.getJSONObject("page").getString("page_id"));
                        //输出到侧输出流
                        ctx.output(new OutputTag<String>("display") {
                        }, displaysJSON.toString());
                    }
                } else {
                    //为页面数据,将数据输出到主流
                    out.collect(value.toString());
                }
            }
        }

    }
}

util

package com.atguigu.utils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class MyKafkaUtil {
    private static String KAFKA_SERVER = "hadoop102:9092,hadoop103:9092,hadoop104:9092";
    private static Properties properties = new Properties();

    static {
        properties.setProperty("bootstrap.servers", KAFKA_SERVER);
    }

    /**
     * 获取KafkaSource的方法
     *
     * @param groupId 消费者组
     * @parm topic 主题
     */
    public static FlinkKafkaConsumer<String> getKafkaSource(String topic, String groupId) {

        //给配置信息对象添加配置项
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //获取KafkaSource
        return new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), properties);
    }

    public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
        return new FlinkKafkaProducer<String>(topic, new SimpleStringSchema(), properties);
    }
}

测试

日志数据dwd层测试所需进程

mock、nginx、Logger、kafka(zk)、flinkApp(Hdfs)、kafka(三个消费者)

准备业务数据DWD层

业务数据的变化,我们可以通过MaxWell采集到,但是MaxWell是把全部数据统一写入一个Topic中, 这些数据包括业务数据,也包含维度数据,这样显然不利于日后的数据处理,所以这个功能是从Kafka的业务数据ODS层读取数据,经过处理后,将维度数据保存到HBase,将事实数据写回Kafka作为业务数据的DWD层。

主要任务

接收kafka数据,过滤空值数据

对MaxWell抓取数据进行ETL,有用的部分保留,没用的过滤掉

实现动态分流功能

由于MaxWell是把全部数据统一写入一个Topic中, 这样显然不利于日后的数据处理。所以需要把各个表拆开处理。但是由于每个表有不同的特点,有些表是维度表,有些表是事实表,有的表既是事实表在某种情况下也是维度表。

在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase,Redis,MySQL等。一般把事实数据写入流中,进行进一步处理,最终形成宽表。但是作为Flink实时计算任务,如何得知哪些表是维度表,哪些是事实表呢?而这些表又应该采集哪些字段呢?

这样的配置不适合写在配置文件中,因为这样的话,业务端随着需求变化每增加一张表,就要修改配置重启计算程序。所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。

这种可以有两个方案实现

一种是用Zookeeper存储,通过Watch感知数据变化。

另一种是用mysql数据库存储,周期性的同步。

这里选择第二种方案,主要是mysql对于配置数据初始化和维护管理,用sql都比较方便,虽然周期性操作时效性差一点,但是配置变化并不频繁。

把分好的流保存到对应表,主题中

业务数据保存到Kafka的主题中

维度数据保存到HBase的表中

根据mysql的配置表,动态进行分流

1)gmall-realtime-200821

2)建表

CREATE TABLE `table_process` (
  `source_table` varchar(200) NOT NULL COMMENT '来源表',
  `operate_type` varchar(200) NOT NULL COMMENT '操作类型 insert,update,delete',
   `sink_type` varchar(200) DEFAULT NULL COMMENT '输出类型 hbase kafka',
  `sink_table` varchar(200) DEFAULT NULL COMMENT '输出表(主题)',
  `sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段',
  `sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段',
  `sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展',
PRIMARY KEY (`source_table`,`operate_type`)
)

3)创建表配置实体类

import lombok.Data;

@Data
public class TableProcess {
//动态分流Sink常量
    public static final String SINK_TYPE_HBASE = "HBASE";
    public static final String SINK_TYPE_KAFKA = "KAFKA";
    public static final String SINK_TYPE_CK = "CLICKHOUSE";
    //来源表
    String sourceTable;
    //操作类型 insert,update,delete
    String operateType;
    //输出类型 hbase kafka
    String sinkType;
    //输出表(主题)
    String sinkTable;
    //输出字段
    String sinkColumns;
    //主键字段
    String sinkPk;
    //建表扩展
    String sinkExtend;
}

4)编写操作Mysql操作类

package com.atguigu.gmall.realtime.utils;import com.atguigu.gmall.realtime.bean.TableProcess;import com.google.common.base.CaseFormat;import org.apache.commons.beanutils.BeanUtils;import java.sql.*;import java.util.ArrayList;import java.util.List;public class MySQLUtil {    /**     * mysql查询方法,根据给定的class类型 返回对应类型的元素列表     *     * @param sql     * @param clazz     * @param underScoreToCamel 是否把对应字段的下划线名转为驼峰名     * @param <T>     * @return     */    public static <T> List<T> queryList(String sql, Class<T> clazz, Boolean underScoreToCamel) {        Connection conn = null;        PreparedStatement ps = null;        ResultSet rs = null;        try {            //注册驱动            Class.forName("com.mysql.jdbc.Driver");            //建立连接            conn = DriverManager.getConnection(                    "jdbc:mysql://hadoop102:3306/gmall2021_realti me?characterEncoding=utf-8&useSSL=false",                    "root",                    "000000");            //创建数据库操作对象            ps = conn.prepareStatement(sql);            //执行SQL语句            rs = ps.executeQuery();            //处理结果集            ResultSetMetaData md = rs.getMetaData();            //声明集合对象,用于封装返回结果            List<T> resultList = new ArrayList<T>();            //每循环一次,获取一条查询结果            while (rs.next()) {                //通过反射创建要将查询结果转换为目标类型的对象                T obj = clazz.newInstance();                //对查询出的列进行遍历,每遍历一次得到一个列名                for (int i = 1; i <= md.getColumnCount(); i++) {                    String propertyName = md.getColumnName(i);                    //如果开启了下划线转驼峰的映射,那么将列名里的下划线转换为属性的打                    if (underScoreToCamel) {                        //直接调用Google的guava的CaseFormat  LOWER_UNDERSCORE小写开头+下划线->LOWER_CAMEL小写开头+驼峰                        propertyName = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, propertyName);                    }                    //调用apache的commons-bean中的工具类,给Bean的属性赋值                    BeanUtils.setProperty(obj, propertyName, rs.getObject(i));                }                resultList.add(obj);            }            return resultList;        } catch (Exception e) {            e.printStackTrace();            throw new RuntimeException("查询mysql失败!");        }finally {            //释放资源            if(rs!=null){                try {                    rs.close();                } catch (SQLException e) {                    e.printStackTrace();                }            }            if(ps!=null){                try {                    ps.close();                } catch (SQLException e) {                    e.printStackTrace();                }            }            if(conn!=null){                try {                    conn.close();                } catch (SQLException e) {                    e.printStackTrace();                }            }        }    }    /**     * 测试验证     * @param args     */    public static void main(String[] args) {        List<TableProcess> tableProcesses = queryList("select * from table_process", TableProcess.class, true);        for (TableProcess tableProcess : tableProcesses) {            System.out.println(tableProcess);        }    }}

5)需要导入的依赖

<!--lomback插件依赖--><dependency>
    <groupId>org.projectlombok</groupId>    <artifactId>lombok</artifactId> 
       <version>1.18.12</version>    
       <scope>provided</scope></dependency>
       <!--commons-beanutils是Apache开源组织提供的用于操作JAVA BEAN的工具包。使用commons-beanutils,我们可以很方便的对bean对象的属性进行操作--><dependency>   
        <groupId>commons-beanutils</groupId>    <artifactId>commons-beanutils</artifactId>    <version>1.9.3</version>
        </dependency>
        <!--Guava工程包含了若干被Google的Java项目广泛依赖的核心库,方便开发-->
        <dependency>    <
        groupId>com.google.guava</groupId>    <artifactId>guava</artifactId>    
        <version>29.0-jre</version>
        </dependency>
        <dependency>    
        <groupId>mysql</groupId>    
        <artifactId>mysql-connector-java</artifactId>    <version>5.1.47</version>
        </dependency>
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章           查看所有文章
加:2021-08-16 11:48:52  更:2021-08-16 11:52:11 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 13:00:57-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码