kafka简介:
kafka是一个基于发布/订阅的分布式消息系统,主要设计目标有:
1.高性能的消息持久化能力
2.高通吐且对硬件要求不高
3.支持分区存储,分布式消费,且能保证分区内的消息顺序.
4.支持离线/在线的数据传输处理
5.支持水平扩展
kafka安装配置
1.下载压缩包
http://kafka.apache.org/downloads
2.解压
data:image/s3,"s3://crabby-images/c0aa0/c0aa011cd61a8e4d6c5876bd29d1be95c962fdaa" alt=""
3.进入kafka_2.12-2.7.0文件夹
data:image/s3,"s3://crabby-images/1dc0f/1dc0fd5bdc841a05b7fc0bdf60801f2e4897b511" alt=""
4.进入config文件夹找到 server.properties文件打开,打开listeners = PLAINTEXT://127.0.0.1:9092代码的注释并修改ip,然后保存修改并退出.
data:image/s3,"s3://crabby-images/c459d/c459d9ec2548de171ccf35685231ac81e46e56c5" alt=""
?5.打开终端,进入kafka_2.12-2.7.0文件夹下.输入命令:
bin/zookeeper-server-start.sh config/zookeeper.properties
?启动zookeeper,这是kafka自带的zookeeper,用来管理broker(代理服务),像注册中心一样.data:image/s3,"s3://crabby-images/177ad/177adac8279f5728a4a02c7144aefac3304463d4" alt=""
?加上nohup代码是后台启动.
6.启动kafka,使用:bin/kafka-server-start.sh config/server.properties命令
data:image/s3,"s3://crabby-images/21c07/21c07009a00bd2a0b2521eec5e49bd142ecd1047" alt=""
7.操作命令
//查看topic列表
bin/kafka-topics.sh --zookeeper localhost:2181 --list
//创建一个名为topicTest的topic,分区为3,副本为1
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic topicTest --partitions 3 --replication- factor 1
//查看topicTest详情
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topicTest
//创建一个消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic topicTest
//创建一个生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topicTest
kafka集群配置
1.因为我们前面配置了单机的,zookeeper里面的配置需要清空,不然集群起不来.所以需要下载一个页面工具ZooInspector,下载地址https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip
2.下载完成后从终端进入到ZooInspector/build目录下启动zookeeper-dev-ZooInspector.jar
data:image/s3,"s3://crabby-images/ef3c2/ef3c2cdb7812d0d472c709d4fec3d6f6e879bbde" alt=""
3.连接上本地的zookeeper
data:image/s3,"s3://crabby-images/ea4f4/ea4f4ce235dd592986b60686b5c7043579946ec8" alt=""
4.将目录清理至如下
data:image/s3,"s3://crabby-images/341c3/341c3dad16fcb1479bc9cc474e1d52702d41e1c3" alt=""
5.重启zookeeper服务
data:image/s3,"s3://crabby-images/982b4/982b4d39147ff4f87cc9a83dfae099bbf696e784" alt=""
6.准备3个配置文件,放在kafka_2.12-2.7.0文件夹下
data:image/s3,"s3://crabby-images/de325/de325771c1738cb8d4dc302501635f2c24005d1b" alt=""
data:image/s3,"s3://crabby-images/c902a/c902a1e208ebf21eef999863658e87079780c115" alt=""
data:image/s3,"s3://crabby-images/d7738/d7738f917c35f5a79b9c05cd98445043c8014e9c" alt=""
//不同文件ibroker.id不同
broker.id=11
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
//日志文件路径不同
log.dirs=/tmp/kafka/kafka-log1
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=6000000
delete.topic.enable=true
group.initial.rebalance.delay.ms=0
message.max.bytes=5000000
replica.fetch.max.bytes=5000000
//端口不同
listeners=PLAINTEXT://localhost:9001
broker.list=localhost:9001,localhost:9002,localhost:9003
zookeeper.connect=localhost:2181
?7.启动这3台kafka
./bin/kafka-server-start.sh kafka9001.properties
./bin/kafka-server-start.sh kafka9002.properties
./bin/kafka-server-start.sh kafka9003.properties
启动成功.
?8.spring boot集成,配置文件
data:image/s3,"s3://crabby-images/4d637/4d637b434163a910c6f57c637159dbac02c8bba6" alt=""
?9.引入依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.5.3</version>
<scope>compile</scope>
</dependency>
10.编写生产者和消费者类
data:image/s3,"s3://crabby-images/b70c6/b70c6ccf553d1460ce6c4075e1311a1be7e63183" alt=""
?
package com.kafka.demo.must1;
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.kafka.core.KafkaTemplate;
/**
* @Author: LiXin
* @CreateTime: 2021/07/29/ 20:34
* @Presentation:
*/
@Service
public class ProducerDemoImpl implements ProducerDemo{
@Autowired
private KafkaTemplate kafkaTemplate;
@Override
public void send() {
for (int i = 0; i < 10; i++) {
kafkaTemplate.send("test32","消息"+i);
System.out.println("已生产消息"+i);
}
}
}
data:image/s3,"s3://crabby-images/bbd83/bbd83881c39d17828dd72b283e4dcbd4ea798fb9" alt=""
?
package com.kafka.demo.must1;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import org.apache.kafka.common.serialization.Serializer;
/**
* @Author: LiXin
* @CreateTime: 2021/07/29/ 20:34
* @Presentation:
*/
@Service
public class ConsumerDemoImpl implements ConsumerDemo{
@Override
@KafkaListener(topics = {"test32"})
public void consumer(ConsumerRecord record) {
System.out.println("消费者收到消息"+record.value());
}
}
11.创建一个controller接口,便于从页面访问
package com.kafka.demo.must1;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
/**
* @Author: LiXin
* @CreateTime: 2021/07/29/ 22:33
* @Presentation:
*/
@Controller
public class TestConsumerAndProducer {
@Autowired
private ProducerDemo producerDemo;
//页面访问地址:http://localhost:8080/sendMsg
@GetMapping("/sendMsg")
public String show(){
producerDemo.send();
return "调用生产成功";
}
}
12.启动服务
data:image/s3,"s3://crabby-images/bb8b6/bb8b604714d3b4bb1efe48bc4d0a8125ebe4b96e" alt=""
?
13.访问服务
data:image/s3,"s3://crabby-images/e3656/e3656ca7353ac33c042eaa25898ae8076f321a53" alt=""
页面因为没有返回资源,所以会报错404,但控制台可以看出是成功的 data:image/s3,"s3://crabby-images/6ad9f/6ad9f082b0942c69d411a74c94078a59cf34cbdd" alt=""
?
|