springboot中集成kafka,主要目的干啥呢,当然消息推送啦。不同系统之间,自身系统不同组件之间消息通信的一种方式,也可以是使用MQ。
为什么要使用咱们的消息系统呢:个人看来,目的主要就是为了解耦,异步通信,消峰处理。
消息系统三大优点
解耦:
怎么理解呢,比如我是A系统,我要现在要给B、C两个系统发送消息,如果不用消息系统,直接调用,就相当于A系统跟B、C系统强耦合到一起了,如果后面还有D、E......等系统怎么办呢,我总不能挨着挨着一个一个写吧,这样代码耦合太高了,而且我还得考虑别人收到没有,处理成功失败等等情况。那我使用消息系统不就解决这个问题了嘛,我直管向Kafka推送消息,我才不管你谁来消费呢,你想消费你就去消费Kafka消息。这不就解耦了嘛。
异步:
系统A调用其他系统接口的时候,需要一直等待其他系统处理完成它的业务逻辑后,返回处理结果,我才能继续处理我的业务逻辑,但是它的业务逻辑我其实不关心,我没必要等啊。
那用异步不就好了吗,我将消息发给Kafka,你其他系统去消费就行了,我监听你返回的处理结果就行了,我发送完后就可以继续做其他操作了,不用一直等着。
消峰:
比如电商在秒杀的时候,用户量暴增,但是它又只是一小段时间的爆发量。如果不处理,那服务器不得直接挂了。怎么办呢?那我们用Kafka啊,来了请求我就推送消息到Kafka,然后呢,消费设置一秒消费多少就好,然后等高峰过去,用户量回归正常,积累的消息也会慢慢的被消费完。这样咱不是又能愉快的玩耍了嘛。
Kafka术语:
Producer:生产者,消息的生产者,负责推送消息到Kafka队列
Consumer:消费者,负责消费Kafka队列中的消息
Consumer group:用来实现消息广播(发给多个Consumer)或单播(发给单个Consumer)的手段
Offset:kafka存储消息的偏移量,可以理解为下标用来控制消息的消费位置
Broker:Kafka服务节点,一个kafka服务器就是一个Broker,一个集群由多个Broker组成,一个? ? ? ? ? ? ? ? ? Broker下可以有多个Topic
Topic:消息的类别、标题,可以理解为是一个消息的队列
Partition:属于Topic的子集,消息分区,一个Topic可以有多个partition,一个partition在物理上对? ? ? ? ? ? ? ? ? ?应了一个文件夹;partition中的所有消息都会分配一个offset
Kafka消息模式:
点对点消息传递模式:
一个消息推送出去,只能被消费一次,任何一个消费者消费了该消息后,其他消费者都不能继续消费该消息
发布-订阅模式:
消息发布到队列,所有订阅了topic的的消费者都可以消费topic里的所有消息。
话不多说,直接开始:
1.pom文件中添加maven引用
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2.yml文件中增加kafka的配置
spring:
kafka:
# 指定kafka server的地址,集群配多个,中间,逗号隔开
bootstrap-servers:
- 127.0.0.1:9092
- 127.0.0.1:9093
- 127.0.0.1:9094
# kafka生产者配置
producer:
# 写入失败时,重试次数。当leader失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
# 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
retries: 0
# 每次批量发送消息的数量,produce积累到一定数据,一次发数据量
# 当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。
# 这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。16384是缺省的配置(16K)
batch-size: 16384
# produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
# #Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置
buffer-memory: 33554432
#默认情况下消息是不压缩的,此参数可指定采用何种算法压缩消息,可取值:none,snappy,gzip,lz4。snappy压缩算法由Google研发,
#这种算法在性能和压缩比取得比较好的平衡;相比之下,gzip消耗更多的CPU资源,但是压缩效果也是最好的。通过使用压缩,我们可以节省网络带宽和Kafka存储成本。
#如果不开启压缩,可设置为none(默认值),比较大的消息可开启
compressionType: none
#procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,
# 无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后
# 立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,
# 这相当于acks = -1的设置。
#可以设置的值为:all, -1, 0, 1
acks: all
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# 连接超时时间
properties:
request.timeout.ms: 30000
3.配置生产者Producer或者消费者Consumer
首先当然是生产者了,发送消息Producer:
package com.liu.kafka;
import com.liu.constants.KafkaConstants;
import com.liu.utils.DateUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.*;
import java.util.concurrent.ExecutionException;
import static com.alibaba.fastjson.JSON.toJSONString;
/**
* kafka生产者类
* @author kevin
* @date 2021/7/28
*/
@Component
@Slf4j
@SuppressWarnings({"unused"})
public class ProducerUtils {
private static final String PUSH_MSG_LOG = "准备发送消息为:{}";
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@Autowired
private KafkaAdminClient kafkaAdminClient;
/**
* 如果没有topic,则创建一个
* @author kevin
* @param topicName :
* @param partitionNum :
* @param replicaNum :
* @return org.apache.kafka.clients.admin.CreateTopicsResult
* @date 2021/8/5 9:42
*/
public Boolean createTopic(String topicName, int partitionNum, int replicaNum){
KafkaFuture<Set<String>> topics = kafkaAdminClient.listTopics().names();
try {
if (topics.get().contains(topicName)) {
return true;
}
NewTopic newTopic = new NewTopic(topicName, partitionNum, (short) replicaNum);
kafkaAdminClient.createTopics(Collections.singleton(newTopic));
return true;
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
return false;
}
}
/**
* 传入topic名称,json格式字符串的消息,生产者进行发送
* @author kevin
* @param topicName : topic名称
* @param jsonStr : 消息json字符串
* @return boolean : 推送是否成功
* @date 2021/7/28 15:53
*/
public boolean sendMessage(String topicName, String jsonStr) {
createTopic(topicName, 5, 5);
log.info(PUSH_MSG_LOG, jsonStr);
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
jsonStr));
return dealSendResult(future);
}
/**
* 传入topic名称,json格式字符串数组的消息,生产者进行发送
* @author kevin
* @param topicName : topic名称
* @param jsonStrs : 消息json字符串数组
* @return boolean : 推送是否成功
* @date 2021/7/28 15:53
*/
public Boolean[] sendMessage(String topicName, String[] jsonStrs) {
createTopic(topicName, 5, 5);
int msgLength = jsonStrs.length;
Boolean[] success = new Boolean[msgLength];
for (int i = 0; i < msgLength; i++) {
String jsonStr = jsonStrs[i];
log.info(PUSH_MSG_LOG, jsonStr);
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
jsonStr));
success[i] = dealSendResult(future);
}
return success;
}
/**
* 传入topic名称,消息对象,生产者进行发送
* @author kevin
* @param topicName : topic名称
* @param obj : 消息对象
* @return boolean : 推送是否成功
* @date 2021/7/28 15:53
*/
public boolean sendMessage(String topicName, Object obj) {
createTopic(topicName, 5, 5);
String jsonStr = toJSONString(obj);
log.info(PUSH_MSG_LOG, jsonStr);
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
jsonStr));
return dealSendResult(future);
}
/**
* 传入topic名称,消息对象数组,生产者进行发送
* @author kevin
* @param topicName : topic名称
* @param list : 消息对象数组
* @return boolean : 推送是否成功
* @date 2021/7/28 15:56
*/
public Boolean[] sendMessage(String topicName, List<Object> list) {
createTopic(topicName, 5, 5);
Boolean[] success = new Boolean[list.size()];
for (int i = 0; i < list.size(); i++) {
Object obj = list.get(i);
String jsonStr = toJSONString(obj);
log.info(PUSH_MSG_LOG, jsonStr);
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
jsonStr));
success[i] = dealSendResult(future);
}
return success;
}
/**
* 传入topic名称,json格式字符串的消息,生产者进行发送
* @author kevin
* @param topicName : topic名称
* @param key : 消息key
* @param jsonStr : 消息json字符串
* @return boolean : 推送是否成功
* @date 2021/7/28 15:53
*/
public boolean sendMessage(String topicName, String key, String jsonStr) {
createTopic(topicName, 5, 5);
log.info(PUSH_MSG_LOG, jsonStr);
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
key, jsonStr));
return dealSendResult(future);
}
/**
* 传入topic名称,json格式字符串数组的消息,生产者进行发送
* @author kevin
* @param topicName : topic名称
* @param key : 消息key
* @param jsonStrs : 消息json字符串数组
* @return boolean : 推送是否成功
* @date 2021/7/28 15:53
*/
public Boolean[] sendMessage(String topicName, String key, String[] jsonStrs) {
createTopic(topicName, 5, 5);
int msgLength = jsonStrs.length;
Boolean[] success = new Boolean[msgLength];
for (int i = 0; i < msgLength; i++) {
String jsonStr = jsonStrs[i];
log.info(PUSH_MSG_LOG, jsonStr);
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
key, jsonStr));
success[i] = dealSendResult(future);
}
return success;
}
/**
* 传入topic名称,消息对象,生产者进行发送
* @author kevin
* @param topicName : topic名称
* @param key : 消息key
* @param obj : 消息对象
* @return boolean : 推送是否成功
* @date 2021/7/28 15:53
*/
public boolean sendMessage(String topicName, String key, Object obj) {
createTopic(topicName, 5, 5);
String jsonStr = toJSONString(obj);
log.info(PUSH_MSG_LOG, jsonStr);
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
key, jsonStr));
return dealSendResult(future);
}
/**
* 传入topic名称,消息对象数组,生产者进行发送
* @author kevin
* @param topicName : topic名称
* @param key : 消息key
* @param list : 消息对象数组
* @return boolean : 推送是否成功
* @date 2021/7/28 15:56
*/
public Boolean[] sendMessage(String topicName, String key, List<Object> list) {
createTopic(topicName, 5, 5);
Boolean[] success = new Boolean[list.size()];
for (int i = 0; i < list.size(); i++) {
Object obj = list.get(i);
String jsonStr = toJSONString(obj);
log.info(PUSH_MSG_LOG, jsonStr);
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
key, jsonStr));
success[i] = dealSendResult(future);
}
return success;
}
/**
* 传入topic名称,json格式字符串的消息,生产者进行发送
* @author kevin
* @param topicName : topic名称
* @param partition : 消息发送分区
* @param key : 消息key
* @param jsonStr : 消息json字符串
* @return boolean : 推送是否成功
* @date 2021/7/28 15:53
*/
public boolean sendMessage(String topicName, int partition, String key, String jsonStr) {
createTopic(topicName, 5, 5);
log.info(PUSH_MSG_LOG, jsonStr);
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
partition, key, jsonStr));
return dealSendResult(future);
}
/**
* 传入topic名称,json格式字符串数组的消息,生产者进行发送
* @author kevin
* @param topicName : topic名称
* @param partition : 消息发送分区
* @param key : 消息key
* @param jsonStrs : 消息json字符串数组
* @return boolean : 推送是否成功
* @date 2021/7/28 15:53
*/
public Boolean[] sendMessage(String topicName, int partition, String key, String[] jsonStrs) {
createTopic(topicName, 5, 5);
int msgLength = jsonStrs.length;
Boolean[] success = new Boolean[msgLength];
for (int i = 0; i < msgLength; i++) {
String jsonStr = jsonStrs[i];
log.info(PUSH_MSG_LOG, jsonStr);
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
partition, key, jsonStr));
success[i] = dealSendResult(future);
}
return success;
}
/**
* 传入topic名称,消息对象,生产者进行发送
* @author kevin
* @param topicName : topic名称
* @param partition : 消息发送分区
* @param key : 消息key
* @param obj : 消息对象
* @return boolean : 推送是否成功
* @date 2021/7/28 15:53
*/
public boolean sendMessage(String topicName, int partition, String key, Object obj) {
createTopic(topicName, 5, 5);
String jsonStr = toJSONString(obj);
log.info(PUSH_MSG_LOG, jsonStr);
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
partition, key, jsonStr));
return dealSendResult(future);
}
/**
* 传入topic名称,消息对象数组,生产者进行发送
* @author kevin
* @param topicName : topic名称
* @param partition : 消息发送分区
* @param key : 消息key
* @param list : 消息对象数组
* @return boolean : 推送是否成功
* @date 2021/7/28 15:56
*/
public Boolean[] sendMessage(String topicName, int partition, String key, List<Object> list) {
createTopic(topicName, 5, 5);
Boolean[] success = new Boolean[list.size()];
for (int i = 0; i < list.size(); i++) {
Object obj = list.get(i);
String jsonStr = toJSONString(obj);
log.info(PUSH_MSG_LOG, jsonStr);
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(
topicName, partition, key, jsonStr));
success[i] = dealSendResult(future);
}
return success;
}
/**
* 处理消息推送结果
* @author kevin
* @param future :
* @return boolean
* @date 2021/7/28 15:56
*/
private boolean dealSendResult(ListenableFuture<SendResult<String, Object>> future) {
final boolean[] success = {false};
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
//发送失败的处理
log.info(KafkaConstants.TOPIC_TEST + " - 生产者 发送消息失败:" + throwable.getMessage());
success[0] = false;
}
@Override
public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
//成功的处理
log.info(KafkaConstants.TOPIC_TEST + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
success[0] = true;
}
});
return success[0];
}
}
此处生产者是异步发送消息,不用等待消息发送完成。
接下来就是消费者Consumer了:
package com.liu.util;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import javax.transaction.Transactional;
import java.util.List;
import java.util.Optional;
/**
* kafka消费者类
* @author kevin
* @date 2021/7/28
*/
@Component
@Slf4j
public class ConsumerUtils {
@Autowired
private RedisUtils redisUtils;
@Bean
public KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory consumerFactory){
ConcurrentKafkaListenerContainerFactory<Integer,String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(5);
factory.getContainerProperties().setPollTimeout(1000);
factory.setBatchListener(true);//设置为批量消费,每个批次数量在Kafka配置参数中设置
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//设置手动提交ackMode
return factory;
}
/**
* 单条的消费kafka消息
* @author kevin
* @param record : 消息记录
* @param ack : ack回调确认
* @return void :
* @date 2021/8/3 15:14
*/
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, topicPartitions = {
@TopicPartition(topic = KafkaConstants.TOPIC_TEST, partitions = {"0" ,"2" ,"4"}),
}, groupId = KafkaConstants.TOPIC_GROUP1)
public void topicTest(ConsumerRecord<String, String> record, Acknowledgment ack) {
Optional<String> message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
log.info("topic_test 消费了: Topic:" + record.topic() + ",key:" + record.key() + ",Message:" + msg);
ack.acknowledge();//手动提交offset
}
}
/**
* 批量的消费kafka消息,要配合containerFactory使用,配置的bean见batchFactory
* @author kevin
* @param records : 消息记录列表
* @param ack : ack回调确认
* @return void :
* @date 2021/8/3 15:15
*/
@Transactional(rollbackOn = Exception.class)
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, topicPartitions = {
@TopicPartition(topic = KafkaConstants.TOPIC_TEST, partitions = {"1", "3"}),
}, groupId = KafkaConstants.TOPIC_GROUP2, containerFactory="batchFactory")
public void topicTest2(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
try {
for (ConsumerRecord<String, String> record : records) {
//取到消息后,先查询缓存中是否已经存在,存在表示不需要再次处理
//如果消息不存在,业务处理完成后将消息存入redis;如果消息存在直接跳过;这样可防止重复消费
boolean isExists = redisUtils.hasKey(record.topic() + record.partition() + record.key());
if (!isExists) {
Optional<String> message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
log.info("topic_test1 消费了: Topic:" + record.topic() + ",key:" + record.key() + ",Message:" + msg);
}
redisUtils.set(record.topic() + record.partition() + record.key(), record.value());
}
}
ack.acknowledge();//手动提交offset
}catch (Exception e){
log.error(e.getMessage());
throw e;
}
}
}
消费者,写了两种模式,批量获取消息与单条获取消息,获取消息的时候,指定topic,partition。通过KafkaListener监听来消费消息。
4.使用
接下来当然就是使用了,直接定义一个接口,然后访问接口,在接口中调用Producer发送消息,就可在Consumer消费者中监听获取到消息。
@ApiOperation(value = "测试1", notes = "test2")
@GetMapping(value = "/test2")
public ResponseVo test2(String value, String key, Integer partition){
if(StringUtils.isBlank(value)){
return new ResponseVo.Builder().error().message("请从传入发送的消息!").build();
}
Message message = new Message.Builder().id(UuidUtil.getUuid32()).msg(value).sendTime(DateUtils.nowDate()).build();
String str = JSONObject.toJSONString(message);
if(StringUtils.isNotBlank(key)){
if(partition != null){
producerUtils.sendMessage(KafkaConstants.TOPIC_TEST, partition, key, str);
}else{
producerUtils.sendMessage(KafkaConstants.TOPIC_TEST, key, str);
}
}else {
producerUtils.sendMessage(KafkaConstants.TOPIC_TEST, str);
}
return null;
}
5.测试验证
打开swagger页面,找到接口,录入参数:
data:image/s3,"s3://crabby-images/f8ab1/f8ab10d3ffb8a1c8071efce05261883312b96ed8" alt=""
点击execute执行测试。结果如下图:
data:image/s3,"s3://crabby-images/f4566/f4566e1dba9cf4785559c76fb1aad6aa2f22cfa6" alt=""
?
?
|