前言:本文基于springboot分布式系统实现日志的收集与分析,多用于电商项目的秒杀等热点数据。文章中的内容涉及zookeeper(注册中心)、kafka(队列)、Lua语言(日志收集)以及Apache Druid(实时分析)等热门技术。
一、zookeeper安装
注意:安装zookeeper前,需确保已安装jdk1.8_92以上到虚拟机!安装jdk参考上一篇文章 Linux安装jkd1.8。
下载地址:https://zookeeper.apache.org/releases.html
1、上传安装包到指定目录下
将安装包上传到/usr/local/zookeeper目录下并解压:
2、创建data和logs文件夹用于存放数据和日志
[root@localhost apache-zookeeper-3.5.9-bin]
[root@localhost apache-zookeeper-3.5.9-bin]
3、重命名配置文件并修改配置文件
注意:zookeeper默认加载zoo.cfg文件
重命名:
[root@localhost conf]
/usr/local/zookeeper/apache-zookeeper-3.5.9-bin/conf
[root@localhost conf]
修改zoo.cfg文件,修改数据和日志位置:
[root@localhost conf]
4、启动zookeeper
[root@localhost bin]
5、检查是否启动成功
[root@localhost bin]
二、kafka安装
下载地址:http://kafka.apache.org/downloads
1、上传安装包到指定目录
将安装包上传到/usr/local/kafka目录下并解压:
[root@localhost kafka]
2、创建日志存放目录
在解压目录/usr/local/kafka/kafka_2.13-3.0.0 下创建:
[root@localhost kafka_2.13-3.0.0]
/usr/local/kafka/kafka_2.13-3.0.0
[root@localhost kafka_2.13-3.0.0]
3、修改配置文件
[root@localhost kafka_2.13-3.0.0]
参数介绍:
listeners:
- localhost : 只监听本机的地址请求, 客户端也只能用 localhost 来请求
- 127.0.0.1 : 同localhost, 在请求上可能有与区分 , 看client的请求吧 . 客户端也只能用127.0.0.1来请求
- 192.168.0.1 : 建议不要用这个 , 局域网不一定是 192.168 段的.
- 0.0.0.0 : 本机的所有地址都监听 , 包含 localhost , 127.0.0.1, 及不同网卡的所有ip地址 , 都监听 .
advertised.listeners:
- 这个是对外提供的地址 , 当client请求到kafka时, 会分发这个地址.
- 有三个地方用到: 集群内其他的broker,生产者,消费者
- 可以不填 , 不填就默认用 listeners 的地址.
4、启动kafka并验证是否成功
1、启动
[root@localhost kafka_2.13-3.0.0]
2、验证
[root@localhost kafka_2.13-3.0.0]
出现如下内容表示启动成功! kafka快速入门请参考文档:https://kafka.apache.org/quickstart
1)下面来简单创建一个主题(即队列 itemaccess ),为下文中的日志收集做准备:
[root@localhost kafka_2.13-3.0.0]
/usr/local/kafka/kafka_2.13-3.0.0
[root@localhost kafka_2.13-3.0.0]
[root@localhost kafka_2.13-3.0.0]
itemaccess
2)启动消费端
[root@localhost kafka_2.13-3.0.0]
三、日志收集
1、OpenRestry安装
OpenResty 是一个基于 Nginx 与 Lua 的高性能 Web 平台,其内部集成了大量精良的 Lua 库、第三方模块以及大多数的依赖项。用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web 服务和动态网关。OpenResty 通过lua脚本扩展nginx功能,可提供负载均衡、请求路由、安全认证、服务鉴权、流量控制与日志监控等服务。
关于OpenRestry的学习,可以参考:http://openresty.org/cn/
1.1、下载并解压安装包
在目录/tmp下下载安装包:
[root@localhost tmp]
解压:
[root@localhost tmp]
1.2、安装
进入到解压目录进行安装,依次执行以下命令:
[root@localhost tmp]
[root@localhost openresty-1.11.2.5]
[root@localhost openresty-1.11.2.5]
[root@localhost openresty-1.11.2.5]
安装完成后,软件会安装到/usr/local/openresty,这里面会包含nginx。
1.3、配置环境变量并刷新配置
配置:
[root@localhost openresty-1.11.2.5]
刷新:
[root@localhost openresty-1.11.2.5]
1.4、测试是否可用
1)在/usr/local目录下创建web/items目录:
[root@localhost local]
/usr/local
[root@localhost local]
2)下载百度网页到web目录下:
[root@localhost local]
[root@localhost items]
/usr/local/web/items
[root@localhost items]
3)修改/usr/local/openresty/nginx/conf/下的nginx.conf配置文件
[root@localhost conf]
/usr/local/openresty/nginx/conf
[root@localhost conf]
加入如下内容:
4)启动nginx,并访问测试
[root@localhost conf]
/usr/local/openresty/nginx/conf
[root@localhost conf]
访问:http://192.168.8.116:8081/items/index.html
2、Lua日志收集
使用Lua实现日志收集,并向Kafka发送访问的详情页信息,此时我们需要安装一个依赖组件lua-restry-kafka。关于lua-restry-kafka的下载和使用,可以参考https://github.com/doujiang24/lua-resty-kafka
日志收集流程: ? 用户请求/web/items/1.html,进入到nginx第1个location中,在该location中向Kafka发送请求日志信息,并将请求中的/web去掉,跳转到另一个location中,并查找本地文件,这样既可以完成日志收集,也能完成文件的访问。
2.1、解压
将下载好的lua-resty-kafka-master.zip文件上传到/usr/local/openrestry目录下,并解压。 1)安装unzip命令
[root@localhost openresty]
2)解压
[root@localhost openresty]
/usr/local/openresty
[root@localhost openresty]
bin COPYRIGHT luajit lualib lua-resty-kafka-master.zip nginx openssl111 pcre pod resty.index site zlib
[root@localhost openresty]
2.2、修改nginx的配置
修改nginx.conf,在配置文件中指定lua-resty-kafka的库文件位置:
[root@localhost conf]
/usr/local/openresty/nginx/conf
[root@localhost conf]
2.3、日志收集
用户访问页面的时候,需要实现日志收集,日志收集采用Lua将当前访问信息发布到Kafka中,因此这里要实现Kafka消息生产者。
我们定义一个消息格式:
{
"actime": "2021-11-10 16:25:30",
"uri": "http://192.168.8.116/items/index.html",
"ip": "119.123.33.231",
"token": "Bearer JAVAITCAST"
}
1)生产者脚本 在/usr/local/openresty/nginx/lua目录下创建一个lua脚本items-access.lua:
[root@localhost lua]
/usr/local/openresty/nginx/lua
[root@localhost lua]
脚本内容如下:
--引入json解析库
local cjson = require("cjson")
--kafka依赖库
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
--配置kafka的链接地址
local broker_list = {
{ host = "192.168.8.116", port = 9092 }
}
--创建生产者
local pro = producer:new(broker_list,{ producer_type="async"})
--获取IP
local headers=ngx.req.get_headers()
local ip=headers["X-REAL-IP"] or headers["X_FORWARDED_FOR"] or ngx.var.remote_addr or "0.0.0.0"
--定义消息内容
local logjson = {}
logjson["uri"]=ngx.var.uri
logjson["ip"]=ip
logjson["token"]="Bearer TEST"
logjson["actime"]=os.date("%Y-%m-%d %H:%m:%S")
--发送消息
local offset, err = pro:send("itemaccess", nil, cjson.encode(logjson))
--去掉访问前缀
local uri = ngx.var.uri
uri = string.gsub(uri,"/web","")
--页面跳转
ngx.exec(uri)
2.4、修改nginx配置
[root@localhost conf]
/usr/local/openresty/nginx/conf
[root@localhost conf]
重启nginx!
2.5、日志收集测试
请求地址:http://192.168.8.116:8081/web/items/index.html
四、 Apache Druid日志实时分析
Apache Druid 是一个分布式的、支持实时多维 OLAP 分析的数据处理系统。它既支持高速的数据实时摄入,也支持实时且灵活的多维数据分析查询。因此 Druid 最常用的场景是大数据背景下、灵活快速的多维 OLAP 分析。 另外,Druid 还有一个关键的特点:它支持根据时间戳对数据进行预聚合摄入和聚合分析,因此也有用户经常在有时序数据处理分析的场景中用到它。
注:需要JDK:java8(8u92+),同时需要笔记本大约 4 个 CPU 和 16 G的内存来运行!
1、下载安装包
下载地址:https://druid.apache.org/downloads.html 快速入门:https://druid.apache.org/docs/latest/tutorials/index.html
2、安装
1)将文件上传至/usr/local/apache-druid目录下: 2)解压安装
[root@localhost apache-druid]
3、修改 Apache Druid自带的zookeeper的端口
问题:在单机部署的时候会和原先安装的zookeeper端口2181冲突,如果两个一起启动,那就就需要修改Druid或者zookeeper端口为2182。
1)查看执行文件
[root@localhost bin]
/usr/local/apache-druid/apache-druid-0.22.0/bin
[root@localhost bin]
2)看到显示加载的conf文件是micro-quickstart.conf,直接查看该conf文件
[root@localhost apache-druid-0.22.0]
/usr/local/apache-druid/apache-druid-0.22.0
[root@localhost apache-druid-0.22.0]
发现先去端口验证verify bin/verify-default-ports,然后执行in/run-zk conf。
3) 修改bin/verify-default-ports文件中的端口
[root@localhost apache-druid-0.22.0]
/usr/local/apache-druid/apache-druid-0.22.0
[root@localhost apache-druid-0.22.0]
将@ports数组中的2181改为2182。
4)将zookeeper中的端口改为2182,修改zoo.cfg文件
[root@localhost zk]
/usr/local/apache-druid/apache-druid-0.22.0/conf/zk
[root@localhost zk]
jvm.config log4j2.xml zoo.cfg
[root@localhost zk]
4、启动单机版Apache Druid
[root@localhost apache-druid-0.22.0]
启动后访问:http://192.168.3.10:8888/,默认的端口是8888
5、测试
5.1、离线数据导入
- 点击Load data->Local disk->Connect data
2)导入数据
我们要导入的数据在/tmp/apache-druid-0.21.1/quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz(在安装目录下),需要把该文件的相对路径填写到右边表单中,再点击Apply。 3) 解析数据 在上一个步骤上点击Next:Parse data 4) 解析时间 在上一个步骤上点击Next: Parse time,Apache Druid要求每条数据都有一个time列,如果我们导入的数据没有该列,Apache Druid会自动创建该列! 5) 数据分区设置 点击下一步一直到Partition,Segment granularity选择day
- Segment granularity:分片文件每个segment包含的时间戳范围
- Partitioning type:分区类型
- Max rows per segment:用于分片。确定每个段中的行数。
6) 设置数据源
将默认名称从 更改 wikiticker-2015-09-12-sampled为wikipedia 7)提交数据
5.2、实时数据摄入
参考地址:https://druid.apache.org/docs/latest/tutorials/tutorial-kafka.html 1)加载数据
2)配置Kafka源
topic设置为上文kafka所创建的itemaccess。 3)配置数据源名字
其他的步骤和之前文件导入一样。 查询:
6、JDBC查询druid
1、导入依赖
<dependency>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica-core</artifactId>
<version>1.15.0</version>
</dependency>
2、测试
public static void main(String[] args) throws Exception{
String url = "jdbc:avatica:remote:url=http://192.168.8.116:8082/druid/v2/sql/avatica/";
AvaticaConnection connection = (AvaticaConnection) DriverManager.getConnection(url);
String sql="SELECT uri,count(*) AS \"viewcount\" FROM(SELECT * FROM \"itemlogs\" WHERE __time>'2021-11-10 21:50:30' ORDER BY __time DESC) GROUP BY uri LIMIT 100";
AvaticaStatement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql);
while (resultSet.next()) {
String uri = resultSet.getString("uri");
String viewcount = resultSet.getString("viewcount");
System.out.println(uri+"--------->"+viewcount);
}
}
7、拓展
Druid的时区和国内时区不一致,会比我们的少8个小时,我们需要修改配置文件,批量将时间+8,代码如下:
[root@k8s-master1 apache-druid-0.21.1]
五、springboot整合druid
采用elastic-job定时器来实时查询热点数据。
1、导入依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.15</version>
</dependency>
<dependency>
<groupId>com.github.kuhn-he</groupId>
<artifactId>elastic-job-lite-spring-boot-starter</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica-core</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.12</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.1.4.RELEASE</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.11.0</version>
</dependency>
2、yaml配置
server:
port: 18084
spring:
application:
name: monitor
datasource:
driver-class-name: org.apache.calcite.avatica.remote.Driver
url: jdbc:avatica:remote:url=http://192.168.3.10:8082/druid/v2/sql/avatica/
type: com.alibaba.druid.pool.DruidDataSource
cloud:
nacos:
config:
file-extension: yaml
server-addr: 192.168.3.10:8848
discovery:
server-addr: 192.168.3.10:8848
redis:
cluster:
nodes:
- 192.168.3.10:7001
- 192.168.3.10:7002
- 192.168.3.10:7003
- 192.168.3.10:7004
- 192.168.3.10:7005
- 192.168.3.10:7006
elaticjob:
zookeeper:
server-lists: 192.168.3.10:2181
namespace: monitortask
druidurl: jdbc:avatica:remote:url=http://192.168.3.10:8082/druid/v2/sql/avatica/
logging:
pattern:
console: "%msg%n"
3、热点数据查询
import com.alibaba.druid.pool.DruidDataSource;
import org.apache.calcite.avatica.AvaticaConnection;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@Component
public class MonitorItemsAccess {
@Value("${druidurl}")
private String druidurl;
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private DruidDataSource dataSource;
public List<String> loadData() throws Exception{
Connection connection =dataSource.getConnection();
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(druidSQL());
List<String> ids = new ArrayList<String>();
while (resultSet.next()){
String uri = resultSet.getString("uri");
uri=uri.replace("/web/items/","").replace(".html","");
ids.add(uri);
}
resultSet.close();
statement.close();
connection.close();
return ids;
}
public String druidSQL(){
String prefix="SELECT COUNT(*) AS \"viewCount\",uri FROM logsitems WHERE __time>=CURRENT_TIMESTAMP - INTERVAL '1' HOUR";
String suffix=" GROUP BY uri HAVING viewCount>2";
String sql = "";
Set<String> keys = redisTemplate.keys("SKU_*");
if(keys!=null && keys.size()>0){
sql=" AND uri NOT IN (";
for (String key : keys) {
sql+="'/web/items/"+key.substring(4)+".html',";
}
sql=sql.substring(0,sql.length()-1);
sql+=")";
}
return prefix+sql+suffix;
}
}
4、定时查询热点数据
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.elasticjob.lite.annotation.ElasticSimpleJob;
import com.seckill.goods.feign.SkuFeign;
import lombok.SneakyThrows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@ElasticSimpleJob(
cron = "1/5 * * * * ?",
jobName = "monitortask",
shardingTotalCount = 1
)
public class MonitorTask implements SimpleJob{
@Autowired
private MonitorItemsAccess monitorItemsAccess;
@SneakyThrows
@Override
public void execute(ShardingContext shardingContext) {
List<String> ids = monitorItemsAccess.loadData();
for (String id : ids) {
System.out.println("热点商品ID:"+id);
}
}
}
自此,完成!!!
|