kafka简介:
kafka是一个基于发布/订阅的分布式消息系统,主要设计目标有:
1.高性能的消息持久化能力
2.高通吐且对硬件要求不高
3.支持分区存储,分布式消费,且能保证分区内的消息顺序.
4.支持离线/在线的数据传输处理
5.支持水平扩展
kafka安装配置
1.下载压缩包
http://kafka.apache.org/downloads
2.解压

3.进入kafka_2.12-2.7.0文件夹

4.进入config文件夹找到 server.properties文件打开,打开listeners = PLAINTEXT://127.0.0.1:9092代码的注释并修改ip,然后保存修改并退出.

?5.打开终端,进入kafka_2.12-2.7.0文件夹下.输入命令:
bin/zookeeper-server-start.sh config/zookeeper.properties
?启动zookeeper,这是kafka自带的zookeeper,用来管理broker(代理服务),像注册中心一样.
?加上nohup代码是后台启动.
6.启动kafka,使用:bin/kafka-server-start.sh config/server.properties命令

7.操作命令
//查看topic列表
bin/kafka-topics.sh --zookeeper localhost:2181 --list
//创建一个名为topicTest的topic,分区为3,副本为1
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic topicTest --partitions 3 --replication- factor 1
//查看topicTest详情
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topicTest
//创建一个消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic topicTest
//创建一个生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topicTest
kafka集群配置
1.因为我们前面配置了单机的,zookeeper里面的配置需要清空,不然集群起不来.所以需要下载一个页面工具ZooInspector,下载地址https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip
2.下载完成后从终端进入到ZooInspector/build目录下启动zookeeper-dev-ZooInspector.jar

3.连接上本地的zookeeper

4.将目录清理至如下

5.重启zookeeper服务

6.准备3个配置文件,放在kafka_2.12-2.7.0文件夹下



//不同文件ibroker.id不同
broker.id=11
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
//日志文件路径不同
log.dirs=/tmp/kafka/kafka-log1
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=6000000
delete.topic.enable=true
group.initial.rebalance.delay.ms=0
message.max.bytes=5000000
replica.fetch.max.bytes=5000000
//端口不同
listeners=PLAINTEXT://localhost:9001
broker.list=localhost:9001,localhost:9002,localhost:9003
zookeeper.connect=localhost:2181
?7.启动这3台kafka
./bin/kafka-server-start.sh kafka9001.properties
./bin/kafka-server-start.sh kafka9002.properties
./bin/kafka-server-start.sh kafka9003.properties
启动成功.
?8.spring boot集成,配置文件

?9.引入依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.5.3</version>
<scope>compile</scope>
</dependency>
10.编写生产者和消费者类

?
package com.kafka.demo.must1;
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.kafka.core.KafkaTemplate;
/**
* @Author: LiXin
* @CreateTime: 2021/07/29/ 20:34
* @Presentation:
*/
@Service
public class ProducerDemoImpl implements ProducerDemo{
@Autowired
private KafkaTemplate kafkaTemplate;
@Override
public void send() {
for (int i = 0; i < 10; i++) {
kafkaTemplate.send("test32","消息"+i);
System.out.println("已生产消息"+i);
}
}
}

?
package com.kafka.demo.must1;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import org.apache.kafka.common.serialization.Serializer;
/**
* @Author: LiXin
* @CreateTime: 2021/07/29/ 20:34
* @Presentation:
*/
@Service
public class ConsumerDemoImpl implements ConsumerDemo{
@Override
@KafkaListener(topics = {"test32"})
public void consumer(ConsumerRecord record) {
System.out.println("消费者收到消息"+record.value());
}
}
11.创建一个controller接口,便于从页面访问
package com.kafka.demo.must1;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
/**
* @Author: LiXin
* @CreateTime: 2021/07/29/ 22:33
* @Presentation:
*/
@Controller
public class TestConsumerAndProducer {
@Autowired
private ProducerDemo producerDemo;
//页面访问地址:http://localhost:8080/sendMsg
@GetMapping("/sendMsg")
public String show(){
producerDemo.send();
return "调用生产成功";
}
}
12.启动服务

?
13.访问服务

页面因为没有返回资源,所以会报错404,但控制台可以看出是成功的 
?
|