Background
User behavior analytics is a common requirement: whenever a user clicks a button or opens a page, a behavior log is sent to the backend to be processed and stored. Analyzing these logs makes it easier to capture user preferences and build products users actually enjoy.
Sequence diagram
Architecture diagram
Environment
- JDK 1.8
- ZooKeeper version: 3.4.14
- Kafka version: kafka_2.12-2.3.1
- nginx version: nginx/1.18.0
- OS: CentOS Linux release 7.4.1708 (Core)
Installing ZooKeeper
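The rest of the article assumes a ZooKeeper 3.4.14 node reachable as zk1:2181. A minimal standalone setup sketch (the download location and data directory are placeholders; adjust to your layout):
# unpack the release and create a config from the shipped sample
tar -zxvf zookeeper-3.4.14.tar.gz -C /usr/local/src
cd /usr/local/src/zookeeper-3.4.14
cp conf/zoo_sample.cfg conf/zoo.cfg
# edit conf/zoo.cfg: set dataDir=/xxx/xxx and keep clientPort=2181
bin/zkServer.sh start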
Installing and configuring Kafka
- Edit /etc/hosts and add a hostname entry for the ZooKeeper node (zk1).
- Edit the application configuration file config/server.properties in the Kafka installation directory.
- Set the ZooKeeper address:
zookeeper.connect=zk1:2181
Use the hostname rather than the IP address here to avoid slow connections.
- Set broker.id (must be unique within the cluster):
broker.id=x
- Set the log storage directory (the default location also works):
log.dirs=/xxx/xxx
- Allow remote access to Kafka:
advertised.listeners=PLAINTEXT://172.23.x.x:9092
- Other settings (a consolidated server.properties example appears at the end of this section):
auto.create.topics.enable=false
delete.topic.enable=true
- Configuration reference:
http://kafkadoc.beanmr.com/030_configuration/01_configuration_cn.html#brokerconfigs
- Start Kafka from the Kafka installation directory:
bin/kafka-server-start.sh config/server.properties &
- Once the Kafka cluster is up, connect to ZooKeeper from the ZooKeeper installation directory:
./bin/zkCli.sh -server zk1:2181
- Check the broker registrations through the ZooKeeper nodes.
Command: ls /brokers/ids
Result: the command lists the ids of the registered brokers.
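For reference, a consolidated server.properties combining the settings above for a single broker (ids, addresses, and paths are placeholders):
broker.id=0
zookeeper.connect=zk1:2181
log.dirs=/xxx/xxx
advertised.listeners=PLAINTEXT://172.23.x.x:9092
auto.create.topics.enable=false
delete.topic.enable=true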
Creating the topic
- Run the following script from the Kafka installation directory:
bin/kafka-topics.sh --create --zookeeper 172.23.x.x:2181 --replication-factor 1 --partitions 2 --topic usr-log
- After the topic is created, check its partition layout from the Kafka installation directory:
bin/kafka-topics.sh --describe --zookeeper 172.23.x.x:2181 --topic usr-log
The topic is stored in 2 partitions, each with a single (leader) replica, so there is no redundant backup.
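Optionally, you can confirm right away that the topic is writable with the console producer that ships with Kafka (broker address is a placeholder); each line you type is published to usr-log:
bin/kafka-console-producer.sh --broker-list 172.23.x.x:9092 --topic usr-log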
Installing the nginx plugin ngx_kafka_module
- Official documentation: https://github.com/brg-liuwei/ngx_kafka_module
- Installation steps (a consolidated command sketch appears at the end of this section):
- Install the Kafka C/C++ client library (librdkafka) before building the plugin.
- After installing librdkafka, register its library path with the dynamic linker:
echo "/usr/local/lib" >> /etc/ld.so.conf
ldconfig
- Build nginx with the ngx_kafka_module plugin.
- Note:
When installing the plugin, make install overwrites the existing nginx modules. If you are rebuilding an nginx that is already installed, remember to pass the original modules to configure, for example: ./configure --prefix=/usr/local/src/nginx_1.18.0 --with-http_ssl_module --add-module=/usr/local/src/ngx_kafka_module
- Core installation steps from the official documentation (figure).
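A condensed sketch of the whole build, based on the steps in the official README (the source locations under /usr/local/src are assumptions; adjust to your layout):
# build and install librdkafka, the Kafka C/C++ client
git clone https://github.com/edenhill/librdkafka /usr/local/src/librdkafka
cd /usr/local/src/librdkafka
./configure && make && make install

# make the freshly installed library visible to the dynamic linker (as above)
echo "/usr/local/lib" >> /etc/ld.so.conf
ldconfig

# rebuild nginx with the module, keeping the modules the existing binary already uses
git clone https://github.com/brg-liuwei/ngx_kafka_module /usr/local/src/ngx_kafka_module
cd /usr/local/src/nginx-1.18.0
./configure --prefix=/usr/local/src/nginx_1.18.0 --with-http_ssl_module --add-module=/usr/local/src/ngx_kafka_module
make && make install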
Configuring nginx to forward user behavior logs to Kafka
- Edit nginx.conf so that requests are forwarded to Kafka:
http {
    kafka;
    kafka_broker_list 172.23.x.x:9092 172.23.x.x:9092;
    server {
        listen 80;
        server_name localhost;
        location /log/usr {
            kafka_topic usr-log;
        }
    }
}
- Official configuration reference: https://github.com/brg-liuwei/ngx_kafka_module
- Start nginx (see the start/reload sketch at the end of this section).
- Simulate a client sending a behavior log to nginx:
curl -d '{"uid": 1, "action": "2"}' -H 'Content-Type: application/json' http://localhost/log/usr
- Use the Kafka console consumer to check whether the log reached Kafka:
./kafka-console-consumer.sh --bootstrap-server 172.23.x.x:9092,172.23.x.x:9092 --topic usr-log
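The curl and consumer checks above assume nginx has been started (or reloaded) after the configuration change. Assuming the installation prefix from the configure example above, a minimal sketch:
/usr/local/src/nginx_1.18.0/sbin/nginx -t          # validate the configuration
/usr/local/src/nginx_1.18.0/sbin/nginx             # first start
/usr/local/src/nginx_1.18.0/sbin/nginx -s reload   # or reload if nginx is already running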
Computing and persisting the results with Flink
- Add the Maven dependencies:
<properties>
<flink.version>1.10.3</flink.version>
<lombok.version>1.18.10</lombok.version>
<fastjson.version>1.2.49</fastjson.version>
<maven.compiler.version>3.5.1</maven.compiler.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.encoding>UTF-8</maven.compiler.encoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>
- Write a Flink program that consumes the nginx logs from Kafka and hands them to a sink for persistence:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Kafka consumer configuration (broker addresses are placeholders)
Properties props = new Properties();
props.put("bootstrap.servers", "172.23.x.x:9092,172.23.x.x:9092");
props.put("zookeeper.connect", "zk1:2181");
props.put("group.id", "flink");
props.put("client.id", "flink-1");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");
props.put("enable.auto.commit", "true");
// Consume raw log lines from the usr-log topic
DataStreamSource<String> dataStreamSource = env.addSource(
new FlinkKafkaConsumer<String>(
"usr-log",
new SimpleStringSchema(),
props))
.setParallelism(1);
// Collect records into 5-second windows and emit each non-empty batch as a list
dataStreamSource.timeWindowAll(Time.seconds(5L)).apply(new AllWindowFunction<String, List<String>, TimeWindow>() {
    @Override
    public void apply(TimeWindow window, Iterable<String> iterableValues, Collector<List<String>> out) throws Exception {
        List<String> strList = Lists.newArrayList(iterableValues);
        if (strList.isEmpty()) {
            return;
        }
        out.collect(strList);
    }
}).addSink(new JdbcPersistence());
env.execute("nginx log analyse running");
- Write the persistence sink that stores the aggregated results in the database. The version below is a skeleton that only prints each record; the actual JDBC write goes inside invoke():
public class JdbcPersistence extends RichSinkFunction<List<String>> {
    @Override
    public void invoke(List<String> values, Context context) throws Exception {
        // TODO: replace with the actual JDBC write; for now each log entry is just printed
        for (String log : values) {
            System.out.println(log);
        }
    }
}
- Result: the sink prints each consumed log entry to the console.