First, let's take a look at the project's basic structure:

In the previous article we set up ZooKeeper and Kafka: https://blog.csdn.net/lchmyhua88/article/details/119080644
We won't go over that again here. First, let's get the ZooKeeper and Kafka services up:
sudo bin/zkServer.sh start

bin/zkCli.sh -server 127.0.0.1:2181


Then the Kafka service:
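The usual start command, assuming the default server.properties bundled with Kafka, is:

bin/kafka-server-start.sh -daemon config/server.properties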

Now let's implement it.
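Assuming a Maven build, the code below relies on these key dependencies (spring-boot-starter-web, spring-kafka, lombok, and gson):

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
</dependency>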
The application.yml configuration is as follows:
server:
  port: 9990
spring:
  application:
    name: JAVA Kafka URL
  # Kafka-related configuration
  kafka:
    # Kafka cluster address
    bootstrap-servers: 192.168.33.10:9092
    #### Producer configuration ####
    producer:
      # Number of retries
      retries: 0
      # Ack level: how many partition replicas must confirm the write before the
      # broker acknowledges to the producer (options: 0, 1, all/-1)
      acks: 1
      # Batch size in bytes
      batch-size: 16384
      # Send delay
      properties:
        # The producer sends a batch once it reaches batch-size, or after linger.ms
        # has elapsed. linger.ms = 0 means every message is sent to Kafka as soon as
        # it arrives, which effectively disables batching.
        linger:
          ms: 0
      # Producer buffer size (total bytes)
      buffer-memory: 33554432
      # Serializer classes provided by Kafka
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    #### Consumer configuration ####
    consumer:
      # Default consumer group
      group-id: group-user
      properties:
        # Consumer session timeout (session.timeout.ms)
        session:
          timeout:
            ms: 120000
        # Consumer request timeout (request.timeout.ms)
        request:
          timeout:
            ms: 120000
      # Whether to auto-commit offsets
      enable-auto-commit: true
      # Offset commit delay (how long after receiving a message the offset is committed)
      auto-commit-interval: 1000ms
      # What to do when Kafka has no initial offset, or the offset is out of range:
      # earliest: reset to the smallest offset in the partition
      # latest: reset to the latest offset (only consume data newly produced to the partition)
      # none: throw an exception if any partition has no committed offset
      auto-offset-reset: earliest
      # Deserializer classes provided by Kafka
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # Without this, startup fails if a topic the consumer listens to does not exist
    listener:
      missing-topics-fatal: false
# Custom properties, read by our own code via @Value
kafka:
  bootstrap:
    servers: 192.168.33.10:9092
  topic:
    user: huage
  consumer:
    id: group-user
Set up a topic with 10 partitions:
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaInitialConfiguration {

    @Bean
    public NewTopic initTopic() {
        // Create a topic named "topic1" with 10 partitions and a replication factor of 1
        return new NewTopic("topic1", 10, (short) 1);
    }
}
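As a side note, spring-kafka 2.3+ also offers a fluent TopicBuilder; an equivalent bean definition (a sketch, not part of the original code) would be:

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;

// Equivalent topic definition using spring-kafka's TopicBuilder
@Bean
public NewTopic initTopicWithBuilder() {
    return TopicBuilder.name("topic1")
            .partitions(10)
            .replicas(1)
            .build();
}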
The producer:
import com.google.gson.GsonBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${kafka.topic.user}")
    private String topicName;

    public void sendMessage(Message message) {
        GsonBuilder gson = new GsonBuilder();
        gson.setPrettyPrinting();
        // Date pattern uses MM for month (mm would be minutes)
        gson.setDateFormat("yyyy-MM-dd HH:mm:ss");
        String toJson = gson.create().toJson(message);
        kafkaTemplate.send(topicName, toJson);
        log.info("Producer sent message to Kafka: " + message);
    }
}
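The Message object serialized above is not defined in this post; a minimal sketch consistent with how it is used (a Lombok builder with the fields msg, code, sentTime, and sender) might look like this:

import lombok.Builder;
import lombok.Data;

// Hypothetical POJO reconstructed from its usage; the field names follow
// the Message.builder() calls made later in this post.
@Data
@Builder
public class Message {
    private String msg;
    private String code;
    private String sentTime;
    private String sender;
}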
The consumer's read method:
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class KafkaConsumer {

    @Value("${kafka.topic.user}")
    private String topicName;

    public void consume() {
        Properties props = new Properties();
        // Connection and deserialization settings
        props.put("bootstrap.servers", "192.168.33.10:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "group-user");
        // Auto-commit offsets every 1s (committed messages are not consumed again,
        // which avoids duplicate consumption)
        props.put("enable.auto.commit", "true");
        // Auto-commit interval
        props.put("auto.commit.interval.ms", "1000");
        // earliest: if a partition has a committed offset, resume from it;
        // otherwise consume from the beginning
        props.put("auto.offset.reset", "earliest");
        // Create a consumer from the configuration above (fully qualified to avoid
        // clashing with this class's own name)
        org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer =
                new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);
        // Subscribe to the topic
        consumer.subscribe(Collections.singletonList(topicName));
        while (true) {
            // Pull records from the broker
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach(record ->
                    System.out.printf("Consumed message: topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value()));
        }
    }
}
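Note that consume() loops forever, so the /kafka/read endpoint shown later will block and never return a response. If you want the HTTP call to return while consumption keeps running, one option (a sketch, not part of the original code) is to start the loop on its own thread:

// Hypothetical variation: run the polling loop off the request thread
// so the HTTP call can return immediately.
new Thread(kafkaConsumer::consume, "kafka-consumer-thread").start();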
Consuming the messages sent via URL (the listener below fires automatically whenever topic1 receives a message; no URL call is needed):
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafka/consumer")
public class KafkaConsumerController {

    // Fires automatically whenever a message lands on topic1
    @KafkaListener(topics = {"topic1"})
    public void receiveMessage(ConsumerRecord<String, Object> record) {
        System.out.println("Consumed message: " + record.topic() + ": " + record.value());
    }
}
Producing messages via URL:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafka/producer")
public class KafkaProducerController {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @GetMapping("/send/{msg}")
    public void sendMessageByKafkaProducer(@PathVariable("msg") String msg) {
        kafkaTemplate.send("topic1", msg);
    }

    /**
     * Send with a callback listener
     * @param msg the message text taken from the URL path
     */
    @GetMapping("/send/callback/{msg}")
    public void sendMessageByKafkaProducerAndCallback(@PathVariable("msg") String msg) {
        kafkaTemplate.send("topic1", msg).addCallback(success -> {
            // The topic the message was written to
            String topic = success.getRecordMetadata().topic();
            // The partition the message landed in
            int partition = success.getRecordMetadata().partition();
            // The message's offset within that partition
            long offset = success.getRecordMetadata().offset();
            System.out.println("Message sent successfully to: " + topic + "-" + partition + "@" + offset);
        }, failure -> {
            System.out.println("Message failed to send: " + failure.getMessage());
        });
    }
}
Sending and reading messages through the wrapper classes:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafka")
public class KafkaMessageController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @Autowired
    private KafkaConsumer kafkaConsumer;

    /**
     * Producer: send a message
     */
    @RequestMapping("/send")
    public void kafkaSend() {
        Message message = Message.builder()
                .msg("This message was sent from KafkaMessageController via KafkaProducer.sendMessage()")
                .code("200")
                .sentTime("producer sent message")
                .sender("huage")
                .build();
        kafkaProducer.sendMessage(message);
    }

    /**
     * Consumer: read messages
     */
    @RequestMapping("/read")
    public void kafkaRead() {
        kafkaConsumer.consume();
    }
}
The entry point:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class huageApplication {

    public static void main(String[] args) {
        SpringApplication.run(huageApplication.class, args);
    }

    // Simple endpoint to verify the service is up
    @RequestMapping("/")
    public String hello() {
        return "hello huage ,this is springboot project";
    }
}
Next, start the project:
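Assuming a Maven build, one way to start it is:

mvn spring-boot:run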


Now let's hit a URL to check that the service is running. Enter the following in the browser address bar:
http://192.168.33.1:9990/
You should see the hello greeting returned by the entry point.

Now we can send messages.
Send a message:
http://192.168.33.1:9990/kafka/producer/send/计算机看到几个点

Send with a callback (the send result is printed):
http://192.168.33.1:9990/kafka/producer/send/callback/计算机看到几个点

As you can see above, the messages were sent. Now let's consume them from our own code:
http://192.168.33.1:9990/kafka/read

Query the consumer groups:
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.33.10:9092 --list

Query the consumer group details:
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.33.10:9092 --describe --group group-user
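The describe output has columns GROUP, TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET, LAG, CONSUMER-ID, HOST, and CLIENT-ID; an illustrative row (values chosen to match the numbers reported below, not real captured output) looks like:

GROUP        TOPIC    PARTITION    CURRENT-OFFSET    LOG-END-OFFSET    LAG
group-user   huage    0            18                18                0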


From this output we can see that LAG (the message backlog) is now 0 and the largest offset is 18, so consumption is working normally.