The code below is based on Spring Kafka 2.3.13.RELEASE + Spring Boot 2.2.9.RELEASE.
Producer message reliability
Approach: adjust the ack mode + enable retries + mitigate the problems that retries introduce
spring.kafka:
  producer:
    client-id: ${spring.application.name}
    bootstrap-servers: 127.0.0.1:8080
    acks: all
    retries: 2
    properties:
      retry.backoff.ms: 1000
    compression-type: "none"
    batch-size: 16384
    buffer-memory: 33554432
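For reference, here is a minimal sketch of the Kafka client property names that the producer settings above are assumed to correspond to. The class name `ProducerProps` and the `applicationName` parameter are hypothetical and used only for illustration; entries under `properties:` are passed to the client as-is, which is why that key has to use the client's dotted name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ProducerProps {
    /** Builds a map keyed by the raw Kafka producer property names. */
    public static Map<String, Object> build(String applicationName) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", "127.0.0.1:8080");
        props.put("client.id", applicationName);
        props.put("acks", "all");               // wait for all in-sync replicas
        props.put("retries", 2);                // resend a failed request up to 2 times
        props.put("retry.backoff.ms", 1000);    // wait 1 second between retries
        props.put("compression.type", "none");
        props.put("batch.size", 16384);
        props.put("buffer.memory", 33554432L);
        return props;
    }

    public static void main(String[] args) {
        build("demo-app").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```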
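Producer:
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SuccessCallback;

@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class ProducerFuture implements FailureCallback, SuccessCallback<SendResult<String, Object>> {
    private static final Logger logger = LoggerFactory.getLogger(ProducerFuture.class);
    // MDC key that holds the request trace ID
    private static final String UNIQUE_ID = "UNIQUE_ID";
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;
    // Captured on the calling thread, because the callbacks run on the producer's I/O thread
    private String uniqueId;

    public void async(String topicLcs, String value) {
        uniqueId = MDC.get(UNIQUE_ID);
        logger.info("send {} data:{}", topicLcs, value);
        ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaTemplate.send(topicLcs, value);
        listenableFuture.addCallback(this, this);
    }

    @Override
    public void onFailure(Throwable ex) {
        MDC.put(UNIQUE_ID, uniqueId);
        logger.error("sendFailure:", ex);
        MDC.remove(UNIQUE_ID);
    }

    @Override
    public void onSuccess(SendResult<String, Object> result) {
        MDC.put(UNIQUE_ID, uniqueId);
        logger.info("sendSuccess topic:{} offset:{}", result.getRecordMetadata().topic(), result.getRecordMetadata().offset());
        MDC.remove(UNIQUE_ID);
    }
}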
Problems introduced by enabling retries
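- A retried message is always resent to the same partition. That alone does not prevent duplicates: if the broker wrote the record but the acknowledgement was lost, the retry writes it again. Setting enable.idempotence=true lets the broker discard such duplicates within each partition.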
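- Retries can cause the order of messages on the broker (Node) to differ from the order in which the Producer sent them. Setting max.in.flight.requests.per.connection to 1 preserves message order under retries. This setting caps the number of sent-but-unacknowledged requests on each client-to-broker connection; the default is 5.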
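The reordering effect can be illustrated with a small simulation. This is a simplified model rather than the Kafka client's actual send loop: the class `InFlightOrdering`, the method `brokerLog`, and the assumption that the broker handles in-flight requests in send order are all hypothetical choices made for this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class InFlightOrdering {
    /**
     * Sends batches 1..batchCount in order with at most maxInFlight unacknowledged at a time.
     * A batch listed in failOnce fails its first attempt and is queued for retry.
     * Returns the order in which batches end up in the broker log.
     */
    public static List<Integer> brokerLog(int batchCount, int maxInFlight, Set<Integer> failOnce) {
        Deque<Integer> pending = new ArrayDeque<>();
        for (int i = 1; i <= batchCount; i++) {
            pending.add(i);
        }
        Set<Integer> alreadyFailed = new HashSet<>();
        List<Integer> log = new ArrayList<>();
        while (!pending.isEmpty()) {
            // Put up to maxInFlight batches on the wire before waiting for any response.
            List<Integer> inFlight = new ArrayList<>();
            while (inFlight.size() < maxInFlight && !pending.isEmpty()) {
                inFlight.add(pending.poll());
            }
            List<Integer> retry = new ArrayList<>();
            for (int batch : inFlight) {
                if (failOnce.contains(batch) && alreadyFailed.add(batch)) {
                    retry.add(batch);   // first attempt fails, nothing is written
                } else {
                    log.add(batch);     // written to the broker log
                }
            }
            // Failed batches are retried before any batch that has not been sent yet.
            for (int i = retry.size() - 1; i >= 0; i--) {
                pending.addFirst(retry.get(i));
            }
        }
        return log;
    }

    public static void main(String[] args) {
        Set<Integer> failFirst = Collections.singleton(1);
        System.out.println(brokerLog(3, 5, failFirst)); // prints [2, 3, 1]
        System.out.println(brokerLog(3, 1, failFirst)); // prints [1, 2, 3]
    }
}
```

With five requests in flight, batches 2 and 3 are written while batch 1 is still waiting for its retry, so batch 1 lands last. With one request in flight, batch 2 is not sent until batch 1 has succeeded, at the cost of lower throughput.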
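Consumer message reliability
Approach: manual offset commits + retries + a dead-letter queue (with alerting) + a separate strategy for processing the messages in the dead-letter queue
Kafka configuration: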
spring.kafka:
  consumer:
    bootstrap-servers: 127.0.0.1:8080
    auto-offset-reset: latest
    enable-auto-commit: false
    auto-commit-interval: 2000
    fetch-min-size: 1
    fetch-max-wait: 500
    max-poll-records: 500
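Configuration class:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class Config {
    private static final Logger logger = LoggerFactory.getLogger(Config.class);
    // Dead-letter topic that receives records whose retries are exhausted
    private static final String ERROR_TOPIC = "error_topic";

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> listenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory, KafkaTemplate<String, Object> template) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Retry a failed record 5 times, 5 seconds apart, then log it and forward it to the dead-letter topic
        SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler((consumerRecord, e) -> {
            logger.error("Still failing after retries, consumerRecord:{}", consumerRecord, e);
            template.send(ERROR_TOPIC, consumerRecord.toString());
        }, new FixedBackOff(5000, 5));
        factory.setErrorHandler(seekToCurrentErrorHandler);
        // Offsets are committed only when the listener calls Acknowledgment.acknowledge()
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}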
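The retry-then-dead-letter flow configured above can be modeled in plain Java. This is a sketch of the control flow only, not Spring Kafka's implementation (which re-seeks the partition and redelivers the record on a later poll); the class `RetryThenDeadLetter` and the method `deliver` are hypothetical names. It assumes, as the Spring Kafka reference describes for `FixedBackOff`, that the second constructor argument counts retries, so `FixedBackOff(5000, 5)` means up to six delivery attempts in total.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class RetryThenDeadLetter {
    /**
     * Delivers the record to the handler. On failure it waits intervalMs and redelivers,
     * up to maxRetries times. Once retries are exhausted, the record and the last exception
     * are handed to the recoverer. Returns the number of delivery attempts made.
     */
    public static <T> int deliver(T record, Consumer<T> handler, long intervalMs, int maxRetries,
                                  BiConsumer<T, Exception> recoverer) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                handler.accept(record);
                return attempts;                 // processed successfully
            } catch (RuntimeException e) {
                if (attempts > maxRetries) {
                    recoverer.accept(record, e); // e.g. publish to the dead-letter topic
                    return attempts;
                }
            }
            pause(intervalMs);
        }
    }

    private static void pause(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();  // keep the interrupt flag for the caller
        }
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        int attempts = deliver("order-42", r -> { throw new IllegalStateException("boom"); },
                10, 5, (r, e) -> deadLetters.add(r));
        System.out.println(attempts + " attempts, dead letters: " + deadLetters);
        // prints: 6 attempts, dead letters: [order-42]
    }
}
```

The example uses a 10 ms interval to keep the run short; the configuration above waits 5000 ms between attempts, so a record that keeps failing blocks its partition for roughly 25 seconds before it reaches the dead-letter topic.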
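Consumer:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class ConsumerKafka {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerKafka.class);
    private static final String CONSUMER_GROUP_PREFIX = "MOCK-A-GROUP";

    @KafkaListener(topics = {"${kafka.topic.topic4Test}"}, groupId = CONSUMER_GROUP_PREFIX, containerFactory = "listenerContainerFactory")
    public void onMessage(ConsumerRecord<String, Object> consumerRecord, Acknowledgment acknowledgment) {
        // Process the record first; an exception thrown before acknowledge() triggers the retry and dead-letter handling
        acknowledgment.acknowledge();
    }
}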