java高频的获取kafka数据,导致数据库数据一致在高频读写,为了降低数据库的高频连接搞高频读写,可以将数据堆积一段时间之后,进行插入数据库操作。
主要采用了队列和缓存,将获取到的数据放入java队列中,利用缓存进行延时判断。
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.0.M2</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>26.0-jre</version>
</dependency>
@KafkaListener(topics = {"jyz_xxxxxxx"})
public void jyz_xxxxxxx(ConsumerRecord<?, ?> record) throws InterruptedException {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
String SQL=null;
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
blockingQueue.offer(message.toString());
long startTime= System.currentTimeMillis();
long endTime = 0;
try {
endTime = fifoCache.get("endTime");
} catch (Exception e) {
endTime = System.currentTimeMillis();
fifoCache.put("endTime",endTime);
}
if(startTime-endTime>=2000){
List<String> list = new ArrayList<>();
// 转移阻塞队列数据到普通的List
blockingQueue.drainTo(list);
SQL = String.join("", list);
System.out.println("SQL:"+SQL);
endTime = System.currentTimeMillis();
fifoCache.put("endTime",endTime);
}
}
}
|