IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> kafka集群环境搭建 + ELK -> 正文阅读

[大数据]kafka集群环境搭建 + ELK

Zookeeper

Zookeeper是一个分布式协调服务,它的主要作用是为分布式系统提供一致性服务,提供功能包括:配置维护、分布式同步等。Kafka的运行依赖Zookeeper。

Zookeeper主要用来协调Kafka的各个broker,不仅可以实现broker的负载均衡,而且当增加了broker或者某个broker故障了,Zookeeper将会通知生产者和消费者,这样可以保证整个系统正常运转。

在Kafka中,一个topic会被分成多个区并被分配在多个broker上,分区的信息以及broker的分布情况和消费者当前的消费的状态信息都会保存在ZooKeeper中

集群部署

es01:192.168.241.131
es02:192.168.241.132
es03:192.168.241.133

软件版本: kafka_2.12-2.8.1.tgz

关闭防火墙

setenforce 0
systemctl stop firewalld
systemctl disable firewalld

安装jdk8

yum install -y java-1.8.0-openjdk

安装ZK

配置服务解析
vim /etc/hosts
192.168.241.131 es01
192.168.241.132 es02
192.168.241.133 es03
1. 安装
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.1/kafka_2.12-2.8.1.tgz
tar zxvf kafka_2.12-2.8.1.tgz 
mv kafka_2.12-2.8.1 /usr/local/kafka
2. 配置
sed -i 's/^[^#]/#&/' /usr/local/kafka/config/zookeeper.properties
vim /usr/local/kafka/config/zookeeper.properties

dataDir=/opt/data/zookeeper/data
dataLogDir=/opt/data/zookeeper/logs
clientPort=2181
tickTime=2000
initLimit=20
syncLimit=10

server.1=192.168.241.131:2888:3888
server.2=192.168.241.132:2888:3888
server.3=192.168.241.133:2888:3888
dataDir             # ZK数据存放目录
dataLogDir          # ZK日志存放目录
clientPort          # 客户端连接ZK服务的端口
tickTime            # ZK服务器直接或客户端与服务端直接保持心跳的时间间隔
initLimit           # 允许follower连接并同步到Leader的初始化连接时间,当初始化连接时间超过该值,则表示连接失败
syncLimit           # Leader与Follower之间发送消息时如果follower在设置时间内不能与leader通信,那么此follower将会被抛弃
server.1            # 2888是follower与leader交换信息的端口,3888是当leader挂了时用来执行选举时服务器相互通信的端口。
# 创建data、log目录
mkdir -p /opt/data/zookeeper/{data,log}
# 创建myid文件,集群中此值不能相同
echo 1 > /opt/data/zookeeper/data/myid
3. 配置kafka
sed -i 's/^[^#]/#&/' /usr/local/kafka/config/server.properties
vim /usr/local/kafka/config/server.properties

broker.id=1
listeners:PLAINTEXT://192.168.241.131:9092
num.network.threads=2
num.io.threads=8
socker.send.buffer.bytes=102400
socker.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/data/kafka/logs
num.partitions=6
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168    
log.segment.bytes=536870912 
log.retention.check.interval.ms=300000  
zookeeper.connect=192.168.241.131:2181,192.168.241.132:2181,192.168.241.133:2181
zookeeper.connection.timeout.ms=6000 
group.initial.rebalance.delay.ms=0  
broker.id               # 集群中此值不能相同
listeners               # 监听地址,集群中此值不能相同
num.network.threads     # broker 处理消息的最大线程数
num.io.threads          # broker 处理磁盘IO的线程数,数值应该大于你的硬盘数
socker.send.buffer.bytes    # socket的发送缓存区
socker.receive.buffer.bytes # socket的接收缓冲区
socket.request.max.bytes    # socket请求的最大值
log.dirs                    # 日志文件目录
num.partitions                    
num.recovery.threads.per.data.dir
offsets.topic.replication.factor
transaction.state.log.replication.factor
transaction.state.log.min.isr
log.retention.hours                 # 日志数据存储的最大时间,超过这个时间会根据log.cleanup.policy设置的策略处理数据
log.segment.bytes                   # 日志分片的大小
log.retention.check.interval.ms     # 信息校验的时间间隔
zookeeper.connect
zookeeper.connection.timeout.ms     # zk连接超时时间
group.initial.rebalance.delay.ms    
# 创建log目录
mkdir -p /opt/data/kafka/log
4. 启动ZK服务
cd /usr/local/kafka
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

netstat -anpt | grep 2181
4. 启动kafka服务
cd /usr/local/kafka
nohup bin/kafka-server-start.sh config/server.properties &

netstat -anpt | grep 9092
5. 配置开机自启(systemctl)
vim /lib/systemd/system/zookeeper.service

[Unit]
Description=Zookeeper service
After=network.target

[Service]
Type=simple
User=root
Group=root
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
ExecStop=/usr/local/kafka/bin/zookeeper-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target
vim /lib/systemd/system/kafka.service

[Unit]
Description=Apache Kafka server (broker)
After=network.target zookeeper.service

[Service]
Type=simple
User=root
Group=root
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target

自启配置

systemctl daemon-reload
systemctl enable zookeeper
systemctl enable kafka

systemctl start zookeeper
systemctl start kafka

实例

创建topic testtopic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testtopic

查询topic

bin/kafka-topics.sh --zookeeper 192.168.241.131:2181 --list

模拟消息生产和消费

发送消息到241.131

bin/kfaka-console-producer.sh --broker-list 192.168.241.131:9092 --topic testtopic

从241.132接收消息

bin/kfaka-console-producer.sh --bootstrap-server 192.168.241.132:9092 --topic testtopic --from-beginning

filebeat配置kafka

vim /etc/filebeat/filebeat.yml 

output.kafka:   #输出至kafka集群
  hosts: ["192.168.241.131:9092", "192.168.241.132:9092", "192.168.241.133:9092"]
  
  topic: '%{[fields][log_topic]}'
  partion.round_robin:
    reachable_only: false
    
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
  

logstash配置kafka

[root@localhost ~]# vim /etc/logstash/test-pipeline.yml 

input{
  kafka {
      # type => "nginx_log"
      codec => "json"
      topics => ["nginx", "test-log"]
      decorate_events => true
      consumer_threads => 3
      bootstrap_servers => ""192.168.241.131:9092", "192.168.241.132:9092", "192.168.241.133:9092""
  }
}

filter {
grok { #过滤插件
  match => { "message" => "%{COMBINEDAPACHELOG}" } }
#  geoip { "source" => "clientip" } # 溯源
#  mutate { rename => { "clientip" => "cip"  }  } #重写字段
  mutate { remove_field => ["message", "input_type", "@version", "host"]  } # 去除没用字段
}
}

output{
  stdout {} # 标准输出
  
  elasticsearch { 
      codec => "json"
      hosts => ["192.168.241.130:9200"] # 输出到elasticsearch集群
      index => "%{appname}-%{+YYYY.MM.dd}"  # 创建索引
  }
}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-03 16:21:05  更:2022-03-03 16:24:12 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 20:11:46-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码