1.安装kafka
参考 https://blog.csdn.net/SpringHASh/article/details/120323423
2.生产者
package com.wang.learn.cloudkafka.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaController {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@GetMapping("/kafka/{message}")
public void sendMessage(@PathVariable("message") String normalMessage) {
kafkaTemplate.send("test1", normalMessage);
}
}
3.消费者
package com.wang.learn.cloudkafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
@KafkaListener(topics = {"test1"}, groupId = "test-consumer-group")
public void onMessage1(ConsumerRecord<?, ?> record){
System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
}
}
server:
port: 7889
spring:
kafka:
bootstrap-servers: 192.168.26.130:9092
producer:
retries: 0
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
linger.ms: 1
consumer:
enable-auto-commit: false
auto-commit-interval: 100ms
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
session.timeout.ms: 15000
group-id: test-consumer-group1
4.测试
完整源码 添加链接描述
|