Kafka单机环境搭建及整合SpringBoot完成基本使用
Kafka单机环境搭建
下载kafka_2.11-1.1.0.tgz版本
下载地址 https://archive.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz
上传到服务器,我使用的是centos7 环境
使用ftp工具上传到/opt目录下
解压:tar -zxvf kafka_2.11-1.1.0.tgz
进入kafka目录
cd /opt/kafka_2.11-2.1.1
启动zookeeper(kafka自带zookeeper)和kafka
注意:启动这两个的前提是你的服务器已经装好了java环境
#安装open-jdk11
yum install java-11-openjdk-devel.x86_64
默认不修改任何配置 zookeeper地址为:你服务器ip:2181
kafka地址 你服务器ip:9092
前台启动zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
后台运行:
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper-run.log 2>&1 &
前台启动kafka:
bin/kafka-server-start.sh config/server.properties
后台运行:
nohup bin/kafka-server-start.sh config/server.properties > kafka-run.log 2>&1 &
通过linux命令查看zookeeper和kafka是否启动成功
netstat -nltp
可以看到9092和2181两个端口正常运行就说明启动成功了
整合SpringBoot实现一个基本的demo
开发环境
jdk: jdk-11.0.10.9-hotspot
maven: 3.8.1
IDE: IntelliJ IDEA 2021.1.3
SpringBoot: 2.3.2.RELEASE
1.新建springboot项目
使用Spring Initializr创建过程不再赘述
项目目录:
2.配置pom.xml环境
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
3.application.properties配置文件
spring.application.name=kafka-demo
server.port=8080
# kafka 配置
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=10.172.29.100:9092
# provider
spring.kafka.producer.retries=0
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# consumer
# 指定默认消费者group id
spring.kafka.consumer.group-id=test-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
4.生产者和消费者
生产者
@Slf4j
@RestController
public class KafkaProducer {
KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@GetMapping("test")
public String testKafka() {
String str = new String("hello kafka!");
kafkaTemplate.send(MessageTopic.TEST_TOPIC, JSONObject.toJSONString(str));
log.info("发送消息到kafka,消息体:{}", str);
return "成功发送消息到kafka";
}
}
消费者
@Slf4j
@Component
public class KafkaConsumer {
@KafkaListener(topics = {MessageTopic.TEST_TOPIC})
public void getMessages(ConsumerRecord<String, String> consumerRecord) {
String value = consumerRecord.value();
log.info("接收到kafka消息,消息为:{}", value);
}
@KafkaListener(topics = {MessageTopic.TEST_USER_TOPIC})
public void getUserMessages(ConsumerRecord<String, String> consumerRecord) {
Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
log.info(">>>>>>>>>> record =" + kafkaMessage);
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info("消费kafka消息,消息为:{}", message);
}
}
}
MessageTopic
public class MessageTopic {
public static final String TEST_TOPIC = "test";
public static final String TEST_USER_TOPIC = "test-user";
}
5.启动项目测试
使用postman工具测试 http://localhost:8080/test
6.测试结果
加入一个实体类模拟普通业务场景
@Data
@Accessors(chain = true)
public class User {
private String id;
private String username;
private String state;
}
消费者不变,加入一个kafkaHandler发送消息
/**
* @author 赵赳
* @since: 2021/8/10 18:00
* @desc:
*/
@Slf4j
@Component
public class kafkaHandler {
KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public kafkaHandler(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
/**
* 发送用户信息
*
* @param id 用户id
*/
public void sendMessages(String id) {
var user = new User().setId(id).setUsername("tom").setState("0");
kafkaTemplate.send(MessageTopic.TEST_USER_TOPIC, JSONObject.toJSONString(user));
log.info("发送消息到kafka,消息内容:{}", user);
}
}
测试类
kafkaHandler kafkaHandler;
@Autowired
public KafkaDemoApplicationTests(kafkaHandler kafkaHandler) {
this.kafkaHandler = kafkaHandler;
}
@Test
void sendMessages() {
for (int i = 0; i < 10; i++) {
kafkaHandler.sendMessages(String.valueOf(++i));
}
}
启功项目执行测试方法
测试结果
|