kafka(组件 搭建 springboot集成 实战
kafka(组件 搭建 springboot集成 实战)
1、应用场景
1.1 kafka场景
Kafka最初是由LinkedIn公司采用Scala语言开发,基于ZooKeeper,现在已经捐献给了Apache基金会。目前Kafka已经定位为一个分布式流式处理平台,它以 高吞吐、可持久化、可水平扩展、支持流处理等多种特性而被广泛应用。 Apache Kafka能够支撑海量数据的数据传递。在离线和实时的消息处理业务系统中,Kafka都有广泛的应用。 (1)日志收集:收集各种服务的log,通过kafka以统一接口服务的方式开放 给各种consumer,例如Hadoop、Hbase、Solr等; (2)消息系统:解耦和生产者和消费者、缓存消息等; (3)用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到Hadoop、数据仓库中做离线分析和挖掘; (4)运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告; (5)流式处理:比如spark streaming和storm;
1.2 kafka特性
kafka以高吞吐量著称,主要有以下特性: (1)高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒; (2)可扩展性:kafka集群支持热扩展; (3)持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失; (4)容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败); (5)高并发:支持数千个客户端同时读写;
1.3 消息对比
- 如果普通的业务消息解耦,消息传输,rabbitMq是首选,它足够简单,管理方便,性能够用。
- 如果在上述,日志、消息收集、访问记录等高吞吐,实时性场景下,推荐kafka,它基于分布式,扩容便捷
- 如果很重的业务,要做到极高的可靠性,考虑rocketMq,但是它太重。需要你有足够的了解
1.4 大厂应用
京东通过kafka搭建数据平台,用于用户购买、浏览等行为的分析。成功抗住6.18的流量洪峰 阿里借鉴kafka的理念,推出自己的rocketmq。在设计上参考了kafka的架构体系
2、基础组件
2.1 角色

- broker:节点,就是你看到的机器
- provider:生产者,发消息的
- consumer:消费者,读消息的
- zookeeper:信息中心,记录kafka的各种信息的地方
- controller:其中的一个broker,作为leader身份来负责管理整个集群。如果挂掉,借助zk重新选主
2.2 逻辑组件

- topic:主题,一个消息的通道,收发总得知道消息往哪投
- partition:分区,每个主题可以有多个分区分担数据的传递,多条路并行,吞吐量大
- Replicas:副本,每个分区可以设置多个副本,副本之间数据一致。相当于备份,有备胎更可靠
- leader & follower:主从,上面的这些副本里有1个身份为leader,其他的为follower。leader处理partition的所有读写请求
2.3 副本集合
- AR(Assigned Repllicas):所有副本的统称,AR=ISR+OSR
- ISR(In-Sync Replicas):同步中的副本,可以参与leader选主。一旦落后太多(数量滞后和时间滞后两个维度)会被踢到OSR。
- OSR(Out-Sync Replicas):踢出同步的副本,一直追赶leader,追上后会进入ISR
2.4 消息标记
 offset:偏移量,消息消费到哪一条了?每个消费者都有自己的偏移量
 HW:(high watermark):副本的高水印值,客户端最多能消费到的位置,HW值为8,代表offset为[0,8]的9条消息都可以被消费到,它们是对消费者可见的,而[9,12]这4条消息由于未提交,对消费者是不可见的。 LEO:(log end offset):日志末端位移,代表日志文件中下一条待写入消息的offset,这个offset上实际是没有消息的。不管是leader副本还是follower副本,都有这个值。
那么这三者有什么关系呢? 比如在副本数等于3的情况下,消息发送到Leader A之后会更新LEO的值,Follower B和Follower C也会实时拉取Leader A中的消息来更新自己,HW就表示A、B、C三者同时达到的日志位移,也就是A、B、 C三者中LEO最小的那个值。由于B、C拉取A消息之间延时问题,所以HW一般会小于LEO,即LEO>=HW。
LEO>=HW>=OFFSET
3、架构探索
3.1 发展历程
http://kafka.apache.org/downloads 
3.1.1 版本命名
Kafka在1.0.0版本前的命名规则是4位,比如0.8.2.1,0.8是大版本号,2是小版本号,1表示打过1个补丁 现在的版本号命名规则是3位,格式是“大版本号”+“小版本号”+“修订补丁数”,比如2.5.0,前面的2代表的是大版本号,中间的5代表的是小版本号,0表示没有打过补丁 我们所看到的下载包,前面是scala编译器的版本,后面才是真正的kafka版本。
3.1.2 演进历史
0.7版本 只提供了最基础的消息队列功能。 0.8版本 引入了副本机制,至此Kafka成为了一个真正意义上完备的分布式高可靠消息队列解决方案。 0.9版本 增加权限和认证,使用Java重写了新的consumer API,Kafka Connect功能;不建议使用consumer API; 0.10版本 引入Kafka Streams功能,正式升级成分布式流处理平台;建议版本0.10.2.2;建议使用新版consumer API 0.11版本 producer API幂等,事务API,消息格式重构;建议版本0.11.0.3;谨慎对待消息格式变化 1.0和2.0版本 Kafka Streams改进;建议版本2.0;
3.2 集群搭建
1)原生启动 kafka启动需要zookeeper,第一步启动zk:
docker run --name zookeeper-1 -d -p 2181 zookeeper:3.4.13
原生安装:下载后解压启动即可 http://kafka.apache.org/downloads
bin/kafka-server-start.sh config/server.properties
broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka/log
zookeeper.connect=zookeeper:2181
2)推荐docker-compose 一键启动
version: '3'
services:
zookeeper:
image: zookeeper:3.6.3
kafka-1:
container_name: kafka-1
image: wurstmeister/kafka
ports:
- 10903:9092
environment:
KAFKA_BROKER_ID: 1
HOST_IP: 192.168.31.236
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: 192.168.31.236
KAFKA_ADVERTISED_PORT: 10903
volumes:
- /etc/localtime:/etc/localtime
depends_on:
- zookeeper
kafka-2:
container_name: kafka-2
image: wurstmeister/kafka
ports:
- 10904:9092
environment:
KAFKA_BROKER_ID: 2
HOST_IP: 192.168.31.236
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: 192.168.31.236
KAFKA_ADVERTISED_PORT: 10904
volumes:
- /etc/localtime:/etc/localtime
depends_on:
- zookeeper
3.3 组件探秘
命令行工具是管理kafka集群最直接的工具。官方自带,不需要额外安装。
3.2.1 主题创建
docker exec -it kafka-1 sh
cd /opt/kafka/bin
kafka-topics.sh --zookeeper zookeeper:2181 --create --topic test --partitions 2
--replication-factor 1
3.2.2 查看主题
kafka-topics.sh --zookeeper zookeeper:2181 --list
3.2.3 主题详情
kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic test
Topic: test PartitionCount: 2 ReplicationFactor: 1 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: test Partition: 1 Leader: 1 Replicas: 1 Isr: 1
3.2.4 消息收发
docker exec -it kafka-1 sh
cd /opt/kafka/bin
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
如果起两个客户端都监听的话,发现是广播形式
3.2.5 分组消费
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --
group aaa
注意!!! 这是在消费者和分区数相等(都是2)的情况下。 如果同一group下的 ( 消费者数量 > 分区数量 ) 那么就会有消费者闲置。
验证方式: 可以再多启动几个消费者试一试,会发现,超出2个的时候,有的始终不会消费到消息。 停掉可以消费到的,那么闲置的会被激活,进入工作状态
3.2.6 指定分区
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --
partition 0
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --
partition 1
./kafka-console-producer.sh --broker-list kafka-1:9092 --topic test --property
parse.key=true
>1 1111
>1 2222
>2 3333
>2 4444
3.2.7 偏移量
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --
partition 0 --offset earliest
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --
partition 0 --from-beginning
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --
group begining-group --from-beginning
3.4 zk探秘
前面说过,zk存储了kafka集群的相关信息,本节来探索内部的秘密。 kafka的信息记录在zk中,进入zk容器,查看相关节点和信息
docker exec -it kafka_zookeeper_1 sh
>./bin/zkCli.sh
>ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]

3.4.1 broker信息
[zk: localhost:2181(CONNECTED) 2] ls /brokers
[ids, seqid, topics]
[zk: localhost:2181(CONNECTED) 3] ls /brokers/ids
[1, 2, 3]
[zk: localhost:2181(CONNECTED) 4] get /brokers/ids/1
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.31.236:10903"],"jmx_port":-1,"port":10903,"host":"192.168.31.236","version":5,"timestamp":"1625978230130"}
cZxid = 0x27
ctime = Tue Jan 05 05:40:45 GMT 2021
mZxid = 0x27
mtime = Tue Jan 05 05:40:45 GMT 2021
pZxid = 0x27
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x105a2db626b0000
dataLength = 196
numChildren = 0
3.4.2 主题与分区
[zk: localhost:2181(CONNECTED) 8] ls /brokers/topics
[__consumer_offsets, test]
[zk: localhost:2181(CONNECTED) 9] ls /brokers/topics/test
[partitions]
[zk: localhost:2181(CONNECTED) 10] ls /brokers/topics/test/partitions
[0, 1]
[zk: localhost:2181(CONNECTED) 11] ls /brokers/topics/test/partitions/0
[state]
[zk: localhost:2181(CONNECTED) 12] get /brokers/topics/test/partitions/0/state
{"controller_epoch":2,"leader":1,"version":1,"leader_epoch":2,"isr":[1]}
[zk: localhost:2181(CONNECTED) 13]
cZxid = 0xb0
ctime = Tue Jan 05 05:56:06 GMT 2021
mZxid = 0xb0
mtime = Tue Jan 05 05:56:06 GMT 2021
pZxid = 0xb0
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 72
numChildren = 0
3.4.3 消费者与偏移量
[zk: localhost:2181(CONNECTED) 15] ls /consumers
[]
kafka 消费者记录 group 的消费 偏移量 有两种方式 : 1)kafka 自维护 (新) 2)zookpeer 维护 (旧) ,已经逐渐被废弃
查看方式: 上面的消费用的是控制台工具,这个工具使用–bootstrap-server,不经过zk,也就不会记录 到/consumers下。 其消费者的offffset会更新到一个kafka自带的topic【__consumer_offffsets】下面
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group aaa
./kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 --list
KMOffsetCache-44acff134cad
aaa
./kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 --describe --group
aaa
当前与LEO保持一致,说明消息都完整的被消费过  停掉consumer后,往provider中再发几条记录,offset开始滞后:  重新启动consumer,消费到最新的消息,同时再返回看偏移量,消息得到同步 
3.4.4 controller
[zk: localhost:2181(CONNECTED) 17] get /controller
{"version":1,"brokerid":1,"timestamp":"1609825245694"}
cZxid = 0x2a
ctime = Tue Jan 05 05:40:45 GMT 2021
mZxid = 0x2a
mtime = Tue Jan 05 05:40:45 GMT 2021
pZxid = 0x2a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x105a2db626b0000
dataLength = 54
numChildren = 0
3.5.1 启动
kafka-manager是目前最受欢迎的kafka集群管理工具,最早由雅虎开源。提供可视化kafka集群操作 官网:https://github.com/yahoo/kafka-manager/releases 注意它的版本,docker社区的景象版本滞后于kafka,我们自己来打镜像
FROM daocloud.io/library/java:openjdk-8u40-jdk
ADD kafka-manager-2.0.0.2/ /opt/km2002/
CMD ["/opt/km2002/bin/kafka-manager","-
Dconfig.file=/opt/km2002/conf/application.conf"]
docker build -t km:2002 .
km:
image: km:2002
ports:
- 10906:9000
depends_on:
- zookeeper
3.5.2 使用
使用km可以方便的查看以下信息:
- cluster:创建集群,填写zk地址,选中jmx,consumer信息等选项
- brokers:列表,机器信息
- topic:主题信息,主题内的分区信息。创建新的主题,增加分区
- cosumers: 消费者信息,偏移量等
4、深入应用
4.1 springboot-kafka
1)配置文件
kafka:
bootstrap-servers: 52.82.98.209:10903,52.82.98.209:10904
producer:
retries: 0
acks: 1
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: javagroup
enable-auto-commit: true
auto-commit-interval: 100
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
pom.xml
<name>kafka</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
<dependency>
<groupId>com.github.xiaoymin</groupId>
<artifactId>swagger-bootstrap-ui</artifactId>
<version>1.9.6</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
</dependencies>
2)启动信息

4.2 消息发送
4.2.1 发送类型
KafkaTemplate调用send时默认采用异步发送,如果需要同步获取发送结果,调用get方法 详细代码参考:AsyncProducer.java
@RestController
public class AsyncProducer {
private final static Logger logger = LoggerFactory.getLogger(AsyncProducer.class);
@Resource
private KafkaTemplate<String, Object> kafkaTemplate;
@GetMapping("/kafka/sync/{msg}")
public void sync(@PathVariable("msg") String msg) throws Exception {
Message message = new Message();
message.setMessage(msg);
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("test", JSON.toJSONString(message));
SendResult<String, Object> result = future.get(3,TimeUnit.SECONDS);
logger.info("send result:{}",result.getProducerRecord().value());
}
}
消费者使用:KafkaConsumer.java
@Component
public class KafkaConsumer {
private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
@KafkaListener(topics = {"test"})
public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
Optional<?> optional = Optional.ofNullable(consumerRecord.value());
if (optional.isPresent()) {
Object msg = optional.get();
logger.info("message:{}", msg);
}
}
}
1)同步发送
ListenableFuture<SendResult<String, Object>> future =
kafkaTemplate.send("test", JSON.toJSONString(message));
SendResult<String, Object> result = future.get(3,TimeUnit.SECONDS);
logger.info("send result:{}",result.getProducerRecord().value());
通过swagger发送,控制台可以正常打印send result
2)阻断 在服务器上,将kafka暂停服务
在swagger发送消息 调同步发送:请求被阻断,一直等待,超时后返回错误  而调异步发送的(默认发送接口),请求立刻返回。  那么,异步发送的消息怎么确认发送情况呢???往下看! 3)注册监听 代码参考: KafkaListener.java 可以给kafkaTemplate设置Listener来监听消息发送情况,实现内部的对应方法
public class KafkaListener {
private final static Logger logger = LoggerFactory.getLogger(KafkaListener.class);
@Autowired
KafkaTemplate kafkaTemplate;
@PostConstruct
private void listener(){
kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {
@Override
public void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {
logger.info("ok,message={}",producerRecord.value());
}
@Override
public void onError(ProducerRecord<String, Object> producerRecord, Exception exception) {
logger.error("error!message={}",producerRecord.value());
}
});
}
}
查看控制台,等待一段时间后,异步发送失败的消息会被回调给注册过的listener
com.itheima.demo.config.KafkaListener:error!message= {"message":"1","sendTime":1609920296374}
启动kafka
docker-compose unpause kafka-1 kafka-2
再次发送消息时,同步异步均可以正常收发,并且监听进入success回调
com.itheima.demo.config.KafkaListener$1:ok,message=
{"message":"1","sendTime":1610089315395}
com.itheima.demo.controller.PartitionConsumer:patition=1,message:
[{"message":"1","sendTime":1610089315395}]
可以看到,在内部类 KafkaListener$1 中,即注册的Listener的消息
4.2.2 序列化
消费者使用:KafkaConsumer.java
@Component
public class KafkaConsumer {
private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
@KafkaListener(topics = {"test"})
public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
Optional<?> optional = Optional.ofNullable(consumerRecord.value());
if (optional.isPresent()) {
Object msg = optional.get();
logger.info("message:{}", msg);
}
}
}
1)序列化详解
- 前面用到的是Kafka自带的字符串序列化器
(org.apache.kafka.common.serialization.StringSerializer) - 除此之外还有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long 等
- 这些序列化器都实现了接口 (org.apache.kafka.common.serialization.Serializer)
- 基本上,可以满足绝大多数场景
2)自定义序列化 自己实现,实现对应的接口即可,有以下方法:
public interface Serializer<T> extends Closeable {
default void configure(Map<String, ?> configs, boolean isKey) {
}
byte[] serialize(String var1, T var2);
default byte[] serialize(String topic, Headers headers, T data) {
return this.serialize(topic, data);
}
default void close() {
}
}
案例,参考: MySerializer.java
public class MySerializer implements Serializer {
@Override
public byte[] serialize(String s, Object o) {
String json = JSON.toJSONString(o);
return json.getBytes();
}
}
在yaml中配置自己的编码器
value-serializer: com.itheima.demo.config.MySerializer
重新发送,发现:消息发送端编码回调一切正常。但是消费端消息内容不对!
com.itheima.demo.controller.KafkaListener$1:ok,message=
{"message":"1","sendTime":1609923570477}
com.itheima.demo.controller.KafkaConsumer:message:"
{\"message\":\"1\",\"sendTime\":1609923570477}"
怎么办? 3)解码 发送端有编码并且我们自己定义了编码,那么接收端自然要配备对应的解码策略 代码参考:MyDeserializer.java,实现方式与编码器几乎一样!
在yaml中配置自己的解码器
value-deserializer: com.itheima.demo.config.MyDeserializer
public class MyDeserializer implements Deserializer {
private final static Logger logger = LoggerFactory.getLogger(MyDeserializer.class);
@Override
public Object deserialize(String s, byte[] bytes) {
try {
String json = new String(bytes,"utf-8");
return JSON.parse(json);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return null;
}
}
再次收发,消息正常
com.itheima.demo.controller.AsyncProducer$1:ok,message=
{"message":"1","sendTime":1609924855896}
com.itheima.demo.controller.KafkaConsumer:message:
{"message":"1","sendTime":1609924855896}
4.2.3 分区策略
分区策略决定了消息根据key投放到哪个分区,也是顺序消费保障的基石。
- 给定了分区号,直接将数据发送到指定的分区里面去
- 没有给定分区号,给定数据的key值,通过key取上hashCode进行分区
- 既没有给定分区号,也没有给定key值,直接轮循进行分区
- 自定义分区,你想怎么做就怎么做
1)验证默认分区规则
发送者代码参考:PartitionProducer.java
@RestController
public class PartitionProducer {
@Resource
private KafkaTemplate<String, Object> kafkaTemplate;
@GetMapping("/kafka/partitionSend/{key}")
public void setPartition(@PathVariable("key") String key) {
kafkaTemplate.send("test", 0,key,"key="+key+",msg=指定0号分区");
}
@GetMapping("/kafka/keysend/{key}")
public void setKey(@PathVariable("key") String key) {
kafkaTemplate.send("test", key,"key="+key+",msg=不指定分区");
}
}
消费者代码使用:PartitionConsumer.java
public class PartitionConsumer {
private final Logger logger = LoggerFactory.getLogger(PartitionConsumer.class);
@KafkaListener(topics = {"test"},topicPattern = "0")
public void onMessage(ConsumerRecord<?, ?> consumerRecord) {
Optional<?> optional = Optional.ofNullable(consumerRecord.value());
if (optional.isPresent()) {
Object msg = optional.get();
logger.info("partition=0,message:[{}]", msg);
}
}
@KafkaListener(topics = {"test"},topicPattern = "1")
public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
Optional<?> optional = Optional.ofNullable(consumerRecord.value());
if (optional.isPresent()) {
Object msg = optional.get();
logger.info("partition=1,message:[{}]", msg);
}
}
}
通过swagger访问setKey:  看控制台:  再访问setPartition来设置分区号0来发送  看控制台:  2)自定义分区 你想自己定义规则,根据我的要求,把消息投放到对应的分区去? 可以! 参考代码:MyPartitioner.java
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
String keyStr = key+"";
if (keyStr.startsWith("0")){
return 0;
}else {
return 1;
}
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
MyPartitionTemplate.java ,
@Configuration
public class MyPartitionTemplate {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
KafkaTemplate kafkaTemplate;
@PostConstruct
public void setKafkaTemplate() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
this.kafkaTemplate = new KafkaTemplate<String, String>(new DefaultKafkaProducerFactory<>(props));
}
public KafkaTemplate getKafkaTemplate(){
return kafkaTemplate;
}
}
发送使用:MyPartitionProducer.java
@RestController
public class MyPartitionProducer {
@Autowired
MyPartitionTemplate template;
@GetMapping("/kafka/myPartitionSend/{key}")
public void setPartition(@PathVariable("key") String key) {
template.getKafkaTemplate().send("test", key,"key="+key+",msg=自定义分区策略");
}
}
使用swagger,发送0开头和非0开头两种key试一试! 备注: 自己定义config参数,比较麻烦,需要打破默认的KafkaTemplate设置 可以将KafkaConfiguration.java中的getTemplate加上@Bean注解来覆盖系统默认bean 这里为了避免混淆,采用@Autowire注入
4.3 消息消费
4.3.1 消息组别
发送者使用:KafkaProducer.java
@RestController
public class KafkaProducer {
@Resource
private KafkaTemplate<String, Object> kafkaTemplate;
@GetMapping("/kafka/test/{msg}")
public void sendMessage(@PathVariable("msg") String msg) {
Message message = new Message();
message.setMessage(msg);
kafkaTemplate.send("test", JSON.toJSONString(message));
}
}
1)代码参考:GroupConsumer.java,Listener拷贝3份,分别赋予两组group,验证分组消费:
@Component
public class GroupConsumer {
private final Logger logger = LoggerFactory.getLogger(GroupConsumer.class);
@KafkaListener(topics = {"test"},groupId = "group1")
public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
Optional<?> optional = Optional.ofNullable(consumerRecord.value());
if (optional.isPresent()) {
Object msg = optional.get();
logger.info("group:group1-1 , message:{}", msg);
}
}
@KafkaListener(topics = {"test"},groupId = "group1")
public void onMessage2(ConsumerRecord<?, ?> consumerRecord) {
Optional<?> optional = Optional.ofNullable(consumerRecord.value());
if (optional.isPresent()) {
Object msg = optional.get();
logger.info("group:group1-2 , message:{}", msg);
}
}
@KafkaListener(topics = {"test"},groupId = "group2")
public void onMessage3(ConsumerRecord<?, ?> consumerRecord) {
Optional<?> optional = Optional.ofNullable(consumerRecord.value());
if (optional.isPresent()) {
Object msg = optional.get();
logger.info("group:group2 , message:{}", msg);
}
}
}
2)启动  3)通过swagger发送2条消息  同一group下的两个消费者,在group1均分消息 group2下只有一个消费者,得到全部消息
4)消费端闲置 注意分区数与消费者数的搭配,如果 ( 消费者数 > 分区数量 ),将会出现消费者闲置,浪费资源!
验证方式: 停掉项目,删掉test主题,重新建一个 ,这次只给它分配一个分区。 重新发送两条消息,试一试  解析: group2可以消费到1、2两条消息 group1下有两个消费者,但是只分配给了 -1 , -2这个进程被闲置
4.3.2 位移提交
1)自动提交 前面的案例中,我们设置了以下两个选项,则kafka会按延时设置自动提交
enable-auto-commit: true # 是否自动提交offset
auto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset)
2)手动提交 有些时候,我们需要手动控制偏移量的提交时机,比如确保消息严格消费后再提交,以防止丢失或重复。 下面我们自己定义配置,覆盖上面的参数
@Configuration
public class MyOffsetConfig {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public KafkaListenerContainerFactory<?> manualKafkaListenerContainerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}
通过在消费端的Consumer来提交偏移量,有如下几种方式:
public class MyOffsetConsumer {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@KafkaListener(topics = "test",groupId = "myoffset-group-1",containerFactory = "manualKafkaListenerContainerFactory")
public void manualCommit(@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
Consumer consumer,
Acknowledgment ack) {
logger.info("手动提交偏移量 , partition={}, msg={}", partition, message);
consumer.commitSync();
}
@KafkaListener(topics = "test",groupId = "myoffset-group-2",containerFactory = "manualKafkaListenerContainerFactory")
public void noCommit(@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
Consumer consumer,
Acknowledgment ack) {
logger.info("忘记提交偏移量, partition={}, msg={}", partition, message);
}
public void manualOffset(@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
Consumer consumer,
Acknowledgment ack) {
try {
logger.info("同步异步搭配 , partition={}, msg={}", partition, message);
consumer.commitAsync();
} catch (Exception e) {
System.out.println("commit failed");
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}
}
public void offset(ConsumerRecord record, Consumer consumer) {
logger.info("手动指定任意偏移量, partition={}, msg={}",record.partition(),record);
Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();
currentOffset.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
consumer.commitSync(currentOffset);
}
}
同步提交、异步提交:manualCommit() ,同步异步的差别,下面会详细讲到。 指定偏移量提交:offset() 3)重复消费问题 如果手动提交模式被打开,一定不要忘记提交偏移量。否则会造成重复消费! 代码参考和对比:manualCommit() , noCommit()
验证过程: 用km将test主题删除,新建一个test空主题。方便观察消息偏移 注释掉其他Consumer的Component注解,只保留当前MyOffsetConsumer.java 启动项目,使用swagger的KafkaProducer发送连续几条消息 留心控制台,都能消费,没问题:  但是!重启试试:  无论重启多少次,不提交偏移量的消费组,会重复消费一遍!!!
再通过命令行查询偏移量试试:  4)经验与总结
commitSync()方法,即同步提交,会提交最后一个偏移量。在成功提交或碰到无怯恢复的错误之前,
commitSync()会一直重试,但是commitAsync()不会。
这就造成一个陷阱:
如果异步提交,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题
导致的,那么后续的提交总会有成功的。只要成功一次,偏移量就会提交上去。
但是!如果这是发生在关闭消费者时的最后一次提交,就要确保能够提交成功,如果还没提交完就停掉了进
程。就会造成重复消费!
因此,在消费者关闭前一般会组合使用commitAsync()和commitSync()。
详细代码参考:MyOffsetConsumer.manualOffset()
|