IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> 【Kafka】Spring-kafka 生产者消费者代码实现 -> 正文阅读

[Java知识库]【Kafka】Spring-kafka 生产者消费者代码实现

Spring-kafka 配置是以 spring.kafka 为前缀的配置信息进行注入的,按照这种形式定义的 kafka 配置信息,在项目启动时会自动读取并配置到 kafka 实例中。当然也可以在配置文件中自定义配置的名称,对应的则需要手动封装配置信息的映射表,并创建相应的 Factory 和 KafkaTemplate 容器。

添加启动 kafka 需要的基本配置:

# kafka producer
kafka.servers=127.0.0.1:9092
kafka.producer.topic=kafka-demo-queue
kafka.producer.sasl.username=demo
kafka.producer.sasl.password=demo

# kafka consumer
kafka.consumer.topic=kafka-demo-queue
kafka.consumer.group.id=kafka-demo-group
kafka.consumer.sasl.username=demo
kafka.consumer.sasl.password=demo

这里的 demo 是连接同一个 kafka 服务器(集群),因此 server ip 用的同一个,而且是自己生产消息,自己消费,所以使用同一个 topic。group id 是消费方用来筛选目标消息用的。kafka 服务器认证的账号、密码则是由服务器配置,控制业务方权限。

然后就是配置 kafka 生产者和消费者的启动配置,由于大部分配置都有其默认配置,所以这里只展示必须的一些配置项:

@Configuration
@EnableKafka
public class KafkaProducerConfig {
    
    @Value("${kafka.servers}")
    private String servers;
    @Value("${kafka.producer.sasl.username}")
    private String userName;
    @Value("${kafka.producer.sasl.password}")
    private String password;
    
    @Bean
    public KafkaProducer<String, String> initKafkaProducer() {
        return new KafkaProducer<>(kafkaProducerConfig());
    }
    
    private Map<String, Object> kafkaProducerConfig() {
        Map<String, Object> props = new HashMap<>(8);
        // kafka 服务器地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        // 消息序列化类型
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 认证信息校验相关
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + userName + "\" password=\"" + password + "\";");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        return props;
    }
}
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    
    @Value("${kafka.servers}")
    private String servers;
    @Value("${kafka.consumer.topic}")
    private String topic;
    @Value("${kafka.consumer.group.id}")
    private String groupId;
    @Value("${kafka.consume.sasl.username}")
    private String userName;
    @Value("${kafka.consume.sasl.password}")
    private String password;
    
    @Bean
    public KafkaConsumer<String, String> initKafkaConsumer() {
        KafkaConsumer<String, String> consumer = new KafkaProducer<>(kafkaConsumerConfig());
        consumer.subscribe(topic);
        return consumer;
    }
    
    private Map<String, Object> kafkaConsumerConfig() {
        Map<String, Object> props = new HashMap<>(8);
        // kafka 服务器地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        // 消息序列化类型
        props.put(ConsumerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ConsumerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 认证信息校验相关
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + userName + "\" password=\"" + password + "\";");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        return props;
    }
}

生产者发送

// 异步获取发送结果
template.send("","").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
			@Override
			public void onFailure(Throwable throwable) {
				......
			}

			@Override
			public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {
				....
			}
		});


// 同步获取发送结果
ListenableFuture<SendResult<Object,Object>> future = template.send("demo-topic","sync message send!");
try {
    SendResult<Object,Object> result = future.get();
}catch (Throwable e){
    e.printStackTrace();
}

消费者监听

  • 直接消费字符串内容
@KafkaListener(id = "demo_group_id", topics = "demo_topic")
public void consume(String msg) {
	logger.info("Consume message is: {}" , msg);
}
  • 手动 ack
@KafkaListener(id = "demo_group_id", topics = "demo_topic")
public String consume(String msg, Acknowledgment ack) {
    logger.info("input value: {}", msg);
    ack.acknowledge();
    return "successful";
}
  • 消费整个消息记录(包含分组、偏移量等)
@KafkaListener(topics = "demo_topic", containerFactory = "demoKafkaListenerContainerFactory")
public void consume(ConsumerRecord<String, String> record) {
    logger.info("Consume record: partition-[{}], offset-[{}]", record.partition(), record.offset());
	logger.info("Consume message is {}: {}.", record.key(), record.value());
}
  • 配置并发数,获取消费实例
@KafkaListener(topics = "demo_topic", containerFactory = "demoKafkaListenerContainerFactory", concurrency = "3")
public void consume(ConsumerRecord<String, String> record, 
                   @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, 
                   Consumer<String, String> consumer) {
	logger.info("Consume record: partition-[{}], offset-[{}]", record.partition(), record.offset());
    logger.info("Consume message is {}: {}.", record.key(), record.value());
    consumer.commitSync();
}

接收回复的生产发送

ReplyingKafkaTemplate 继承了父类 KafkaTemplate,在这个基础上增加了 sendAndReceive 方法,实现了在消息发送的同时,接收消费方回复的功能。方法的具体定义为:

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);

其生产者配置在基本的 producerTemplate 基础下,补充下列的配置:

@Configuration
public class KafkaProducerConfig {
	@Bean
	public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
		ConcurrentMessageListenerContainer<String, String> repliesContainer = containerFactory.createContainer("send_and_reply");
		repliesContainer.getContainerProperties().setGroupId("reply_group");
		repliesContainer.setAutoStartup(false);
		return repliesContainer;
	}

	@Bean
	public ReplyingKafkaTemplate<String, String, String> replyingTemplate(ProducerFactory<String, String> factory, ConcurrentMessageListenerContainer<String, String> container) {
		return new ReplyingKafkaTemplate(factory, container);
	}

    // ...... KafkaTemplate ......
}

完成相关配置后,就可以写一个简单的 demo 验证一下它的功能:

@RestController
public class KafkaController {

	@Autowired
	private ReplyingKafkaTemplate template;

	@GetMapping("/kafka/send_and_receive")
	@Transactional(rollbackFor = RuntimeException.class)
	public void sendAndReceive(String input) throws Exception {
		ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", input);
		RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
		ConsumerRecord<String, String> consumerRecord = replyFuture.get();
		System.out.println("Reply message: " + consumerRecord.value());
	}

	@KafkaListener(id = "reply_group", topics = "demo-topic")
	@SendTo
	public String listen(String input) {
		logger.info("input value: {}", input);
		return "successful";
	}
}

Kafka 事务消息

默认情况下,Spring-kafka通过配置等方式启动了相关的 kafka 生产和消费服务,KafkaTemplate 实例是不具有事务消息处理能力的。如果需要支持事务特性,可以通过添加特定配置来激活,但是必须注意的是,一旦事务特性被激活,所有的消息发送逻辑都需要封装在事务方法内执行,否则会抛出无事务交易的异常。

Spring-kafka 的事务是基于 Kafka-client 的事务消息功能实现的,我们可以通过配置激活 kafka 消息事务特性:

kafka.producer.transaction-id-prefix=kafka_tx

事务的概念就是,当所有的流程都执行完成了才算成功,中途任何一步出现异常了,前面执行的操作都进行回滚。对于 kafka 来说,只有当事务内的消息发送动作都完成了,消费端才能接收到消息。

生产者使用事务消息的方式有两种:

// 方式一:使用模板实现的事务方法
@Autowired
private KafkaTemplate<String, Object> template;

public void sendTransactionDemo(String input) {
    template.executeInTransaction(t -> {
        t.send("demo-topic", input);
        if (input == null || "".equals(input)) {
            throw new RuntimeException("invalid kafka input!");
        }
        t.send("demo-topic", "Second time sending: " + input);
    })
}

// 方式二:事务注解 @Transactional
@Transactional(rollbackFor = RuntimeException.class)
public void sendTransactionDemo(String input) {
    t.send("demo-topic", input);
    if (input == null || "".equals(input)) {
        throw new Exception("invalid kafka input!");
    }
    t.send("demo-topic", "Second time sending: " + input);
}


  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-06-06 17:11:36  更:2022-06-06 17:12:20 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 20:37:43-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码