springboot集成kafka多线程定时消费
1.使用版本
1.springframework
<!-- Spring Framework web module, pinned explicitly to version 5.3.9. -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>5.3.9</version>
</dependency>
2.springboot
<!-- Inherit the build from the Spring Boot starter parent POM, version 2.5.3. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.3</version>
</parent>
2.核心依赖
<!-- Spring for Apache Kafka. No <version> is declared here; presumably it is
     resolved through the parent POM's dependency management (confirm). -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2.核心代码
package com.example.demo_kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Polls the {@code test001} topic with a single {@link KafkaConsumer} and hands every
 * record to a fixed-size thread pool, where it is processed by a {@link Worker}.
 *
 * <p>The consumer is only touched inside {@code synchronized} methods of this bean,
 * because {@link KafkaConsumer} must not be used by more than one thread at a time.
 *
 * <p>NOTE(review): {@code enable.auto.commit} is {@code true}, so offsets are committed
 * by the consumer independently of whether the submitted {@link Worker} has finished;
 * confirm that this delivery guarantee is acceptable for the business logic.
 */
@Component
public class ConsumerHandler {
    /** Pool size used when the caller passes a non-positive {@code workerNum}. */
    private static final int DEFAULT_WORKER_NUM = 10;
    /** Capacity of the queue that buffers records waiting for a free worker thread. */
    private static final int TASK_QUEUE_CAPACITY = 1000;
    /** Timeout of a single {@code poll} call. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(100);
    /** Number of consecutive empty polls after which one {@link #execute(int)} run ends. */
    private static final int MAX_IDLE_POLLS = 10;

    private KafkaConsumer<Object, Object> consumer;
    private ExecutorService executors;

    /**
     * Builds the consumer configuration used by this handler.
     *
     * @return a fresh {@link Properties} instance with the broker address, group id,
     *         auto-commit settings, session timeout and String deserializers
     */
    public static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.1.240:9092");
        props.put("group.id", "test01");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    /** Creates the consumer from {@link #initConfig()} and subscribes it to {@code test001}. */
    @PostConstruct
    public void initKafkaConfig() {
        Properties properties = initConfig();
        consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton("test001"));
    }

    /**
     * Drains the records that are currently available and submits each one to the
     * worker pool, then returns so that the calling (scheduler) thread is released.
     *
     * <p>The pool is created on the first call and reused afterwards; it is therefore
     * sized by the {@code workerNum} of that first call. The run ends once
     * {@link #MAX_IDLE_POLLS} consecutive polls have returned no records.
     *
     * @param workerNum number of worker threads; a non-positive value falls back to
     *                  {@link #DEFAULT_WORKER_NUM}
     */
    public synchronized void execute(int workerNum) {
        if (executors == null) {
            int poolSize = workerNum > 0 ? workerNum : DEFAULT_WORKER_NUM;
            // CallerRunsPolicy: when the queue is full the polling thread runs the task
            // itself, which slows polling down instead of dropping records.
            executors = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(TASK_QUEUE_CAPACITY), new ThreadPoolExecutor.CallerRunsPolicy());
        }
        int idlePolls = 0;
        while (idlePolls < MAX_IDLE_POLLS) {
            ConsumerRecords<Object, Object> consumerRecords = consumer.poll(POLL_TIMEOUT);
            if (consumerRecords.isEmpty()) {
                idlePolls++;
                continue;
            }
            // Records arrived: restart the idle window and dispatch the batch.
            idlePolls = 0;
            for (final ConsumerRecord<Object, Object> record : consumerRecords) {
                executors.submit(new Worker(record));
            }
        }
    }

    /**
     * Releases the worker pool and the consumer when the bean is destroyed. Tasks that
     * were already submitted are still allowed to run; no new tasks are accepted.
     */
    @PreDestroy
    public synchronized void shutdown() {
        if (executors != null) {
            executors.shutdown();
        }
        if (consumer != null) {
            consumer.close();
        }
    }
}
业务处理
package com.example.demo_kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
/**
 * Task that processes one consumed Kafka record. This sample implementation only
 * prints the record's partition, offset and value together with the id and name of
 * the thread that runs it.
 */
public class Worker implements Runnable {
    // Wildcard instead of a raw type: only partition(), offset() and value() are read,
    // so the concrete key/value types do not matter here.
    private final ConsumerRecord<?, ?> consumerRecord;

    /**
     * @param record the record this task will process when {@link #run()} is invoked
     */
    public Worker(ConsumerRecord<?, ?> record) {
        this.consumerRecord = record;
    }

    /** Prints a one-line summary of the record to standard output. */
    @Override
    public void run() {
        // Put your own message-handling logic here; this example simply prints the message.
        Thread current = Thread.currentThread();
        System.out.println(current.getId() + " " + current.getName() + " consumed " + consumerRecord.partition()
                + "th message with offset: " + consumerRecord.offset() + "=======>" + consumerRecord.value());
    }
}
3.测试代码
package com.example.demo_kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.util.concurrent.ListenableFuture;
/**
 * Demo application: one scheduled task produces a batch of test messages to the
 * {@code test001} topic, another one triggers consumption through
 * {@link ConsumerHandler}.
 */
@EnableScheduling
@SpringBootApplication
public class DemoKafkaApplication {
    /** Topic the demo messages are sent to. */
    private static final String TOPIC = "test001";
    /** Number of messages sent by one run of {@link #test01()}. */
    private static final int MESSAGES_PER_RUN = 10000;
    /** Fixed JSON payload used for every demo message. */
    private static final String MESSAGE =
            "{\"x1\": 2133, \"x2\": 2477, \"y1\": 1568, \"y2\": 1888, \"conf\": 0.791015625, \"label_id\": 0, \"label_name\": \"trash\"}";

    public static void main(String[] args) {
        SpringApplication.run(DemoKafkaApplication.class, args);
    }

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @Autowired
    private ConsumerHandler consumers;

    /**
     * Triggered by the cron expression every 30 seconds; asks the consumer handler to
     * consume with 10 workers.
     *
     * <p>NOTE(review): this call runs on the scheduler thread. If {@code execute} does
     * not return, other scheduled tasks that share that thread (Spring's default
     * scheduler is presumably single-threaded; confirm the configuration) will not run.
     */
    @Scheduled(cron = "0/30 * * * * ? ")
    public void test() {
        System.err.println("定时消费");
        consumers.execute(10);
    }

    /**
     * Triggered by the cron expression every 10 seconds; sends
     * {@link #MESSAGES_PER_RUN} copies of {@link #MESSAGE} to {@link #TOPIC}.
     */
    @Scheduled(cron = "0/10 * * * * ? ")
    public void test01() {
        System.err.println("定时生产");
        for (int i = 0; i < MESSAGES_PER_RUN; i++) {
            try {
                ListenableFuture<SendResult<String, String>> pending = kafkaTemplate.send(TOPIC, MESSAGE);
                // send() is asynchronous: a broker-side failure is reported through the
                // future, not through the surrounding try/catch, so observe it here.
                pending.addCallback(
                        result -> { /* nothing to do on success */ },
                        ex -> System.err.println("failed to send message to " + TOPIC + ": " + ex));
                System.err.println(MESSAGE);
            } catch (Exception e) {
                // Synchronous failures raised by send() itself.
                e.printStackTrace();
            }
        }
    }
}
4.测试效果
5.实现原理
使用队列接收消息，再使用多线程消费消息。
6.小结
|