Zookeeper
ZooKeeper is a distributed coordination service. Its main job is to provide consistency services for distributed systems, including configuration maintenance and distributed synchronization. Kafka depends on ZooKeeper to run.
ZooKeeper coordinates Kafka's brokers: it enables load balancing across brokers, and when a broker is added or a broker fails, ZooKeeper notifies producers and consumers so the system as a whole keeps running normally.
In Kafka, a topic is split into multiple partitions spread across multiple brokers. The partition metadata, the broker distribution, and the consumers' current consumption state are all stored in ZooKeeper.
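Once the cluster below is running, this metadata can be inspected directly with the zookeeper-shell.sh script that ships with Kafka; the znode paths shown are the standard ones Kafka uses:
cd /usr/local/kafka
bin/zookeeper-shell.sh 192.168.241.131:2181 ls /brokers/topics   # topics known to the cluster
bin/zookeeper-shell.sh 192.168.241.131:2181 get /brokers/ids/1   # connection details for broker 1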
Cluster deployment
es01: 192.168.241.131
es02: 192.168.241.132
es03: 192.168.241.133
Software version: kafka_2.12-2.8.1.tgz
Disable the firewall
setenforce 0
systemctl stop firewalld
systemctl disable firewalld
Install JDK 8
yum install -y java-1.8.0-openjdk
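A quick check confirms the JDK landed on the PATH:
java -version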
Install ZooKeeper
Configure hostname resolution
vim /etc/hosts
192.168.241.131 es01
192.168.241.132 es02
192.168.241.133 es03
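Optionally verify that the names resolve and the nodes can reach each other (assuming ICMP is allowed between the hosts):
for h in es01 es02 es03; do ping -c 1 $h; done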
1. Install
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.1/kafka_2.12-2.8.1.tgz
tar zxvf kafka_2.12-2.8.1.tgz
mv kafka_2.12-2.8.1 /usr/local/kafka
2. 配置
sed -i 's/^[^#]/#&/' /usr/local/kafka/config/zookeeper.properties
vim /usr/local/kafka/config/zookeeper.properties
dataDir=/opt/data/zookeeper/data
dataLogDir=/opt/data/zookeeper/logs
clientPort=2181
tickTime=2000
initLimit=20
syncLimit=10
server.1=192.168.241.131:2888:3888
server.2=192.168.241.132:2888:3888
server.3=192.168.241.133:2888:3888
dataDir # ZK data directory
dataLogDir # ZK transaction log directory
clientPort # port clients use to connect to the ZK service
tickTime # basic time unit (ms) for heartbeats between ZK servers and between clients and servers
initLimit # time (in ticks) allowed for a follower to connect and sync to the leader at startup; exceeding it counts as a failed connection
syncLimit # time (in ticks) a follower may go without communicating with the leader before it is dropped
server.1 # 2888 is the port followers use to exchange data with the leader; 3888 is the port the servers use to talk to each other during leader election
# Create the data and log directories (names must match dataDir/dataLogDir above)
mkdir -p /opt/data/zookeeper/{data,logs}
# Create the myid file; this value must be unique on each node
echo 1 > /opt/data/zookeeper/data/myid
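The ids must match the server.N lines above, so on the other two nodes:
# on es02
echo 2 > /opt/data/zookeeper/data/myid
# on es03
echo 3 > /opt/data/zookeeper/data/myid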
3. Configure Kafka
# Comment out every active (non-comment) line in the stock config, then write our own
sed -i 's/^[^#]/#&/' /usr/local/kafka/config/server.properties
vim /usr/local/kafka/config/server.properties
broker.id=1
listeners=PLAINTEXT://192.168.241.131:9092
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/data/kafka/logs
num.partitions=6
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.241.131:2181,192.168.241.132:2181,192.168.241.133:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
broker.id # broker id; must be unique within the cluster
listeners # listen address; must be unique within the cluster
num.network.threads # maximum number of threads the broker uses to handle network requests
num.io.threads # number of threads the broker uses for disk I/O; should be at least the number of disks
socket.send.buffer.bytes # socket send buffer size
socket.receive.buffer.bytes # socket receive buffer size
socket.request.max.bytes # maximum size of a socket request
log.dirs # log (data) file directories
num.partitions # default number of partitions per new topic
num.recovery.threads.per.data.dir # threads per data directory used for log recovery at startup and flushing at shutdown
offsets.topic.replication.factor # replication factor of the internal __consumer_offsets topic
transaction.state.log.replication.factor # replication factor of the transaction state log topic
transaction.state.log.min.isr # minimum in-sync replicas for the transaction state log topic
log.retention.hours # maximum time log data is retained; older data is handled according to the log.cleanup.policy setting
log.segment.bytes # maximum size of a single log segment file
log.retention.check.interval.ms # interval between log retention checks
zookeeper.connect # ZooKeeper connection string
zookeeper.connection.timeout.ms # ZK connection timeout
group.initial.rebalance.delay.ms # delay before the first consumer group rebalance after startup
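Only broker.id and listeners differ between nodes; assuming the same layout on all three machines, the other two brokers would use:
# on es02
broker.id=2
listeners=PLAINTEXT://192.168.241.132:9092
# on es03
broker.id=3
listeners=PLAINTEXT://192.168.241.133:9092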
# Create the log directory (must match log.dirs above)
mkdir -p /opt/data/kafka/logs
4. Start the ZooKeeper service
cd /usr/local/kafka
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
netstat -anpt | grep 2181
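Once ZooKeeper is running on all three nodes, the srvr four-letter command (whitelisted by default in the ZooKeeper version bundled with Kafka 2.8.1) reports each node's role; expect one leader and two followers across the cluster (assuming nc is installed):
echo srvr | nc 127.0.0.1 2181 | grep Mode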
5. Start the Kafka service
cd /usr/local/kafka
nohup bin/kafka-server-start.sh config/server.properties &
netstat -anpt | grep 9092
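With all three brokers started, each should have registered itself in ZooKeeper:
bin/zookeeper-shell.sh 127.0.0.1:2181 ls /brokers/ids   # expect [1, 2, 3]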
6. Configure start on boot (systemd)
vim /lib/systemd/system/zookeeper.service
[Unit]
Description=Zookeeper service
After=network.target
[Service]
Type=simple
User=root
Group=root
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
ExecStop=/usr/local/kafka/bin/zookeeper-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target
vim /lib/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
After=network.target zookeeper.service
[Service]
Type=simple
User=root
Group=root
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target
Enable and start the services (if ZooKeeper and Kafka were started manually with nohup in steps 4 and 5, stop them first so the ports are free)
systemctl daemon-reload
systemctl enable zookeeper
systemctl enable kafka
systemctl start zookeeper
systemctl start kafka
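Check that both units came up cleanly:
systemctl status zookeeper kafka --no-pager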
Examples
Create the topic testtopic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testtopic
List topics
bin/kafka-topics.sh --zookeeper 192.168.241.131:2181 --list
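The --describe flag shows each partition's leader and replica placement across the brokers:
bin/kafka-topics.sh --zookeeper 192.168.241.131:2181 --describe --topic testtopic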
Simulate message production and consumption
Produce messages on 241.131
bin/kafka-console-producer.sh --broker-list 192.168.241.131:9092 --topic testtopic
Consume messages on 241.132
bin/kafka-console-consumer.sh --bootstrap-server 192.168.241.132:9092 --topic testtopic --from-beginning
Configure Filebeat to output to Kafka
vim /etc/filebeat/filebeat.yml
output.kafka:                    # output to the Kafka cluster
  hosts: ["192.168.241.131:9092", "192.168.241.132:9092", "192.168.241.133:9092"]
  topic: '%{[fields][log_topic]}'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
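The topic line above takes the target topic from a custom field on each event, so the inputs must set it; a minimal sketch (the nginx log path and the log_topic value are illustrative):
filebeat.inputs:
- type: log
  paths:
    - /var/log/nginx/access.log
  fields:
    log_topic: nginx   # referenced as %{[fields][log_topic]} in output.kafka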
Configure Logstash to consume from Kafka
[root@localhost ~]# vim /etc/logstash/test-pipeline.yml
input {
  kafka {
    # type => "nginx_log"
    codec => "json"
    topics => ["nginx", "test-log"]
    decorate_events => true
    consumer_threads => 3
    bootstrap_servers => "192.168.241.131:9092,192.168.241.132:9092,192.168.241.133:9092"
  }
}
filter {
  grok {                                       # parse the raw log line
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  # geoip { source => "clientip" }             # IP geolocation
  # mutate { rename => { "clientip" => "cip" } }  # rename a field
  mutate { remove_field => ["message", "input_type", "@version", "host"] }  # drop unneeded fields
}
output {
  stdout {}                                    # print to stdout for debugging
  elasticsearch {
    codec => "json"
    hosts => ["192.168.241.130:9200"]          # Elasticsearch cluster
    index => "%{appname}-%{+YYYY.MM.dd}"       # index name pattern
  }
}
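Before enabling the pipeline, the config can be syntax-checked and then run in the foreground (paths assume a package install of Logstash):
/usr/share/logstash/bin/logstash -f /etc/logstash/test-pipeline.yml --config.test_and_exit
/usr/share/logstash/bin/logstash -f /etc/logstash/test-pipeline.yml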