IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> springboot kafka 同一服务的多个实例,如何设置成不同的消费组group. 动态group -> 正文阅读

[大数据]springboot kafka 同一服务的多个实例,如何设置成不同的消费组group. 动态group

问题场景:我的一个微服务A会在本地缓存一些业务配置数据,配置更新时由相关服务B发送

问题场景:

我的一个微服务A会在本地缓存一些业务配置数据,配置更新时由相关服务B发送一个变动消息。A收到消息更新本地缓存。那么问题来了,同一个服务A的多个实例a1,a2如何多次消费同一个topic消息

解决方案:

通过redis setif 加代码动态配置groupID、不同实例获取不动groupID。启动的时候会配置kafka消费工厂ConsumerFactory 这个时候生成groupID
setIfAbsent()redis。如果失败就继续生成直到成功

配置代码如下:

package com.jieshun.open.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @Description
 * @Date 2021-7-23 16:04
 * @Created by yyk
 */
@Slf4j
@Component
public class KafkaConsumerConfig {

        @Value("${spring.kafka.bootstrap-servers}")
        private String BROKERS;
        @Value("${spring.kafka.consumer.enable-auto-commit}")
        private Boolean ENABLE_AUTO_COMMIT;
        @Value("${spring.kafka.consumer.auto-commit-interval-ms}")
        private String AUTO_COMMIT_INTERVAL_MS;
        @Value("${spring.kafka.consumer.session-timeout-ms}")
        private Integer SESSION_TIMEOUT_MS;
        @Value("${spring.kafka.consumer.auto-offset-reset}")
        private String AUTO_OFFSET_RESET;
        @Value("${spring.kafka.consumer.group-id}")
        private String GROUP_ID;
        @Value("${spring.kafka.consumer.max-poll-records}")
        private String MAX_POLL_RECORDS;

        /**缓存名称前缀*/
        private final String CACHE_GROUP_NAME_PREFIX = "jop:gateway:group:";

         private String CURRENT_INSTANCE_GROUP_ID;

    @Autowired
    private StringRedisTemplate redisTemplate;
        /** 线程池,为了实现分组名称续租服务*/
        private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);


        /**构建kafka监听工厂*/
        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }


        /**通过redis限制获取的分组名称*/
        public String getSerializeGroupId(Integer currValue){
            String key = CACHE_GROUP_NAME_PREFIX.concat(currValue.toString());
            boolean b = redisTemplate.opsForValue().setIfAbsent(key, currValue.toString());
            if(b){
                return GROUP_ID.concat(currValue.toString());
            }else{
                currValue++;
                return getSerializeGroupId(currValue);
            }
        }


        /**初始化消费工厂配置 其中会动态指定消费分组*/
        private ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> properties = new HashMap<String, Object>();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ENABLE_AUTO_COMMIT);
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, AUTO_COMMIT_INTERVAL_MS);
            properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            /**多实例部署每个实例设置不同groupId 实现发布订阅*/
            CURRENT_INSTANCE_GROUP_ID = getSerializeGroupId(0);
            log.info("当前实例 group_id:{}",CURRENT_INSTANCE_GROUP_ID);
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, CURRENT_INSTANCE_GROUP_ID);
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET);
            return new DefaultKafkaConsumerFactory<String, String>(properties);
        }



    }

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-24 11:33:51  更:2021-07-24 11:34:17 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/4 8:20:09-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码