IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Kafka安装与配置 -> 正文阅读

[大数据]Kafka安装与配置

1.下载Kafka2.13-3.1.0

最新版为?kafka_2.13-3.1.0.tgz

下载Zookeper

最新版为?zookeeper-3.8.0

2.单机安装zookeper

Kafka依赖于zookeeper,官方承诺将来会移除.

解压文件:

tar zxvf apache-zookeeper-3.8.0-bin.tar.gz -C /opt/
mv /opt/apache-zookeeper-3.8.0-bin/ /opt/zookeeper

在/opt/zookeeper/ 目录下创建数据文件目录和日志文件目录

mkdir /opt/zookeeper/zkData
mkdir /opt/zookeeper/zkLog

# 复制一份配置文件并修改

cd /opt/zookeeper/conf/
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
# 修改如下内容
dataDir=/opt/zookeeper/zkData
dataLogDir=/opt/zookeeper/zkLog

启动

cd /opt/zookeeper/bin/
# 启动zookeeper
./zkServer.sh start
# 查看进程是否启动
jps
# 查看状态
./zkServer.sh status
# 停止zookeeper
./zkServer.sh stop

3.安装Kafka

解压到指定目录

cd /home
$ tar -xzf kafka_2.13-3.1.0.tgz
$ cd kafka_2.13-3.1.0

修改config目录下vi server.propertie文件

listeners = PLAINTEXT://192.168.2.40:9092

#多个可用逗号分隔

#zookeeper.connect=server1:2181,server2:2181,server3:2181

zookeeper.connect=localhost:2181

启动命令:

bin/kafka-server-start.sh config/server.properties

此方式可以实时查看日志.

后台启动方式:

./kafka-server-start.sh -daemon ../config/server.properties

查询进程和关闭命令

jps
./kafka-server-stop.sh

登录zookeeper客户端,查看/brokers/ids

cd /opt/zookeeper/bin/
zkCli.sh
# 查询结果如下:
[zk: localhost:2181(CONNECTED) 0] ls /brokers/ids
[0]
[zk: localhost:2181(CONNECTED) 1] quit

kafka常见命令

#创建主题 主题名是 quickstart-events
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
#查询主题
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
#主题中写入消息
 bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event
#主题中读取消息
 bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

4.可视化工具kafka-eagle

下载:kafka-eaglev2.1.0.tar.gz

解压

cd /home
tar -zxvf kafka-eagle-web-2.1.0-bin.tar.gz 

在/etc/profile文件中添加环境变量KE_HOME?

vi /etc/profile
# 在profile文件中添加
export KE_HOME=/mydata/kafka/kafka-eagle-web-2.0.5
export PATH=$PATH:$KE_HOME/bin
# 使修改后的profile文件生效
. /etc/profile

安装MySQL并添加数据库ke,kafka-eagle之后会用到它;
修改配置文件$KE_HOME/conf/system-config.properties,主要是修改Zookeeper的配置和数据库配置,注释掉sqlite配置,改为使用MySQL.

######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=localhost:2181
 ######################################
# kafka eagle webui port
######################################
kafka.eagle.webui.port=8048
 ######################################
# kafka mysql jdbc driver address
######################################
kafka.eagle.driver=com.mysql.cj.jdbc.Driver
kafka.eagle.url=jdbc:mysql://localhost:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456

?启动后会自动使用上面的数据库连接,创建并初始化数据库.名称ke.

# 停止服务
bin/ke.sh stop
# 重启服务
bin/ke.sh restart
# 查看服务运行状态
bin/ke.sh status
# 查看服务状态
bin/ke.sh stats
# 动态查看服务输出日志
tail -f /logs/ke_console.out 

??
启动成功可以直接访问,输入账号密码admin:123456,访问地址:http://localhost:8048/

?

注意观察 brokers,topics的数量。brokers为0的话没有连接成功.

可视化工具自然少不了监控,如果你想开启kafka-eagle对Kafka的监控功能的话,需要修改Kafka的启动脚本,暴露JMX的端口.

vi kafka-server-start.sh
#添加  export JMX_PORT="9999"
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export JMX_PORT="9999"
fi

?

5.SpringBoot整合Kafka.

在pom.xml中添加

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

?在application.yml中spring节点下添加

spring
    kafka:
    bootstrap-servers: ts.huida.mes.mysql.node1:9092
    producer: # 生产者配置
      retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
      batch-size: 16384 #16K
      buffer-memory: 33554432 #32M
      acks: 1
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: zhTestGroup # 消费者组
      enable-auto-commit: false # 关闭自动提交
      auto-offset-reset: earliest # 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
      # RECORD
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
      # BATCH
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
      # TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
      # COUNT
      # TIME | COUNT 有一个条件满足时提交
      # COUNT_TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
      # MANUAL
      # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
      # MANUAL_IMMEDIATE
      ack-mode: manual_immediate

生产者:

@RestController
public class KafkaProducer {

	@Autowired
	private KafkaTemplate kafkaTemplate;
	// 发送消息
	@GetMapping("/kafka/normal/{message}")
	public void sendMessage1(@PathVariable("message") String normalMessage) {
		kafkaTemplate.send("quickstart-events", normalMessage);
	}

}

?消费者:

@Component
public class KafkaConsumer {
	// 消费监听
	@KafkaListener(topics = {"quickstart-events"})
	public void onMessage1(ConsumerRecord<?, ?> record){
		// 消费的哪个topic、partition的消息,打印出消息内容
		System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
	}
}

?可以用postman进行测试,观察结果.

?

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-05 11:25:14  更:2022-05-05 11:26:00 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 9:05:48-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码