Kafka 学习记录 1. 生产者
// Spring-injected template; send() below uses it to publish messages to Kafka.
// NOTE(review): declared as a raw type — consider KafkaTemplate<String, String>
// once the key/value types of the configured bean are confirmed.
@Autowired
KafkaTemplate kafkaTemplate;
/**
 * Serializes {@code data} to JSON and hands it to {@code kafkaTemplate} for the
 * {@code "kafka-topic"} topic.
 *
 * <p>A {@code true} result only means the hand-off did not throw.
 * NOTE(review): Spring's {@code KafkaTemplate.send} returns a future, so broker-side
 * failures are presumably not reflected in the return value — inspect that future if
 * delivery confirmation is required.
 *
 * @param data payload to serialize and publish
 * @return {@code true} if serialization and the send call completed without an exception;
 *         {@code false} otherwise (the failure is logged together with its stack trace)
 */
public boolean send(Map<String,Object> data){
    final String topic = "kafka-topic";
    // Declared outside the try so the catch block can report what was being sent.
    String message = null;
    try {
        message = JSONUtil.toJsonStr(data);
        // Placeholder-style logging makes an explicit isInfoEnabled() guard unnecessary.
        log.info("向{}发送统计消息:{}", topic, message);
        kafkaTemplate.send(topic, message);
        return true;
    } catch (Exception e) {
        // Reuse the already-serialized message when available. If serialization itself
        // failed, fall back to the map's toString() instead of serializing again, which
        // could throw a second time inside the handler.
        String payload = (message != null) ? message : String.valueOf(data);
        String errorInfo = String.format("数据统计发送Kafka异常:%s\n%s", e.getMessage(), payload);
        // Pass the exception as well so the stack trace and cause are not lost.
        log.error(errorInfo, e);
        return false;
    }
}
2. 消费者:监听 topic
@Component
public class Consumer(){
@KafkaListener(topics = { "kafka-topic" })
public void listen(ConsumerRecord<?, ?> record) {
String recordStr = record.value().toString();
JSONObject jsonObject = JSONUtil.parseObj(recordStr);
...
}
}
|