spring-boot 集成 kafka 2.5.7
1.maven依赖(生产者、消费者都需要)
<!-- kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.7</version>
</dependency>
2.生产者配置文件yml格式:
spring:
kafka:
## kafka服务器地址 多个服务集群“,”分隔
bootstrap-servers: 127.0.0.1:9092,127.0.0.2:9092
producer:
# 注意:acks=0 时生产者不等待任何确认,发送失败无法感知,此时 retries 配置实际不会生效;
# 如需可靠投递请将 acks 设为 1 或 all
acks: 0
retries: 1
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
#每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求,
#这有助于提升客户端和服务器上的性能,此配置控制默认批量大小(以字节为单位),默认值为16384
# batch-size: 100
# 如果不设置linger.ms,其默认值就是0
# linger.ms: 5
topic:
testTopic: testTopicName
testTopicTwo: testTopicNameTwo
3.消费者配置文件yml格式:
spring:
kafka:
## kafka服务器地址
bootstrap-servers: 127.0.0.1:9092,127.0.0.2:9092
consumer:
# 设置kafka的消费者组
group-id: group_name
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
auto-commit-interval: 1S
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
# 实时生产,实时消费,不会从头开始消费
auto-offset-reset: latest
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
# (注意:本示例设置为 true,且消费逻辑交由线程池异步执行,若处理过程中应用崩溃,已提交偏移量的消息可能丢失)
enable-auto-commit: true
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 批量消费每次最多消费多少条消息
max-poll-records: 50
listener:
# 在侦听器容器中运行的线程数。
concurrency: 1
# 设置批量消费
type: batch
#listner负责ack,每调用一次,就立即commit
# ack-mode: manual_immediate
# missing-topics-fatal=true(默认)时,若Broker上不存在任一配置的主题,监听容器将启动失败;
# 设为 false 则即使主题不存在容器也能正常启动。
# 主题自动创建由Broker端的 auto.create.topics.enable 配置决定,与本项无关
# missing-topics-fatal: false
# 主题同生产者
topic:
testTopic: testTopicName
testTopicTwo: testTopicNameTwo
4.linux命令,在kafka安装目录下执行
- 查询zk中的主题
./bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
- 创建主题
./bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 2 --partitions 1 --topic Re_receive
5.生产者代码
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
@Component
@Slf4j
public class DataProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
public void send(String topic,Object obj) {
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, obj);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
log.info(topic + " - 生产者 发送消息失败:" + throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
log.info(topic + " - 生产者 发送消息成功");
}
});
}
}
生产者业务类:
// Topic name resolved from spring.kafka.topic.testTopic in the producer YAML above
@Value("${spring.kafka.topic.testTopic}")
private String topic;
// Producer component defined above; injected by name/type via JSR-250 @Resource
@Resource
DataProducer dp;
/**
 * Example business method: serializes a MyInfoObj to JSON and publishes it
 * to the configured test topic.
 */
public void testSendInfo(){
    MyInfoObj info = new MyInfoObj();
    // Fix: the original passed an undefined variable "furnaceTopic"
    // (copy-paste leftover); the injected field is named "topic".
    dp.send(topic, JSON.toJSONString(info));
}
6.消费者代码
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.concurrent.Executor;
@Component
@Slf4j
public class ProductionConsumer {
@Resource(name = "taskExecutor")
private Executor taskExecutor;
@KafkaListener(topics = "${spring.kafka.topic.testTopic}")
public void consumer(List<ConsumerRecord<String, String>> records) {
taskExecutor.execute(() -> {
for (ConsumerRecord<String, String> record : records) {
consumerRecord(record);
}
});
}
public void consumerRecord(ConsumerRecord<String, String> record) {
try {
JSONObject json = JSONObject.parseObject(record.value());
} catch (Exception e) {
log.error("kafka接收数据异常" + e);
}
}
}
taskExecutor线程池类(配置根据自己的要求,这里不做详细介绍)
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
public class ExecutorConfig {

    /**
     * Shared task executor used by the Kafka consumer to process batches
     * asynchronously. The bean name is pinned to "taskExecutor", so injection
     * sites using @Resource(name = "taskExecutor") are unaffected by the
     * method rename below.
     */
    @Bean("taskExecutor")
    public Executor taskExecutor() { // fixed method-name typo: was "taskExecutro"
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(100);
        executor.setMaxPoolSize(150);
        executor.setQueueCapacity(500);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("taskExecutor--");
        // Drain in-flight tasks (up to 60s) on shutdown so queued Kafka
        // records are not silently dropped.
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.initialize();
        return executor;
    }
}
– 本文完
|