IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> springboot集成rabbitmmq多数据源,解决对源码不熟悉导致多个源出现同样队列,交换机等问题 -> 正文阅读

[Java知识库]springboot集成rabbitmmq多数据源,解决对源码不熟悉导致多个源出现同样队列,交换机等问题

多数据源配置类

/**
 *
 * @ClassName: RabbitMqMultiDataSourceConfig
 * @Description: 集成rabbitmq 多数据源
 * @author cqj
 * @date 2022/7/01
 */

@Configuration
public class RabbitMqMultiDataSourceConfig {


    @Bean(name = "secondConnectionFactory")
    public CachingConnectionFactory mqEquipControlConnectionFactory(
            @Value("${mq.second.host}") String host,
            @Value("${mq.second.port}") int port,
            @Value("${mq.second.username}") String username,
            @Value("${mq.second.password}") String password) {
        return connectionFactory(host, port, username, password);
    }

    public CachingConnectionFactory connectionFactory(String host, int port, String username, String password) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }

    @Bean(name = "secondFactory")
    public SimpleRabbitListenerContainerFactory secondFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setMaxConcurrentConsumers(20);
        factory.setAcknowledgeMode(AcknowledgeMode.NONE);
        factory.setConcurrentConsumers(10);
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean(name = "secondRabbitTemplate")
    public RabbitTemplate secondRabbitTemplate(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // 这里的转换器设置实现了发送消息时自动序列化消息对象为message body
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

    @Bean(name = "secondRabbitAdmin")
    public RabbitAdmin rabbitAdmin(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(false);
        //设置忽略声明异常
        rabbitAdmin.setIgnoreDeclarationExceptions(true);
        return rabbitAdmin;
    }

    @Bean(name = "primaryConnectionFactory")
    @Primary
    public CachingConnectionFactory primaryConnectionFactory(
            @Value("${spring.rabbitmq.host}") String host,
            @Value("${spring.rabbitmq.port}") int port,
            @Value("${spring.rabbitmq.username}") String username,
            @Value("${spring.rabbitmq.password}") String password) {
        return connectionFactory(host, port, username, password);
    }

    @Bean(name = "primaryFactory")
    @Primary
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(@Qualifier("primaryConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //当前的消费者数量
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        //是否重回队列
        factory.setDefaultRequeueRejected(true);
        return factory;
    }

    @Bean
    @Primary
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        //设置忽略声明异常
        rabbitAdmin.setIgnoreDeclarationExceptions(true);
        return rabbitAdmin;
    }

}

上述代码就是springboot多数据源配置

集成多数据源RabbitAdmin这玩意大家不陌生吧,这里需要说明的是

如果RabbitAdmin不设置autoStartup属性(true/false)默认是true。会默认项目启动时候自动创建队列跟交换机。

源码位置
	/**
	 * If {@link #setAutoStartup(boolean) autoStartup} is set to true, registers a callback on the
	 * {@link ConnectionFactory} to declare all exchanges and queues in the enclosing application context. If the
	 * callback fails then it may cause other clients of the connection factory to fail, but since only exchanges,
	 * queues and bindings are declared failure is not expected.
	 *
	 * @see InitializingBean#afterPropertiesSet()
	 * @see #initialize()
	 */
	@Override
	public void afterPropertiesSet() {

		synchronized (this.lifecycleMonitor) {

			if (this.running || !this.autoStartup) {
				return;
			}

			if (this.retryTemplate == null && !this.retryDisabled) {
				this.retryTemplate = new RetryTemplate();
				this.retryTemplate.setRetryPolicy(new SimpleRetryPolicy(DECLARE_MAX_ATTEMPTS));
				ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
				backOffPolicy.setInitialInterval(DECLARE_INITIAL_RETRY_INTERVAL);
				backOffPolicy.setMultiplier(DECLARE_RETRY_MULTIPLIER);
				backOffPolicy.setMaxInterval(DECLARE_MAX_RETRY_INTERVAL);
				this.retryTemplate.setBackOffPolicy(backOffPolicy);
			}
			if (this.connectionFactory instanceof CachingConnectionFactory &&
					((CachingConnectionFactory) this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) {
				this.logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION");
				return;
			}

			// Prevent stack overflow...
			final AtomicBoolean initializing = new AtomicBoolean(false);

			this.connectionFactory.addConnectionListener(connection -> {

				if (!initializing.compareAndSet(false, true)) {
					// If we are already initializing, we don't need to do it again...
					return;
				}
				try {
					/*
					 * ...but it is possible for this to happen twice in the same ConnectionFactory (if more than
					 * one concurrent Connection is allowed). It's idempotent, so no big deal (a bit of network
					 * chatter). In fact it might even be a good thing: exclusive queues only make sense if they are
					 * declared for every connection. If anyone has a problem with it: use auto-startup="false".
					 */
					if (this.retryTemplate != null) {
						this.retryTemplate.execute(c -> {
							initialize();
							return null;
						});
					}
					else {
						initialize();
					}
				}
				finally {
					initializing.compareAndSet(true, false);
				}

			});

			this.running = true;

		}
	}
上面代码initialize()方法会去初始化队列交换机等。源码如下:
	/**
	 * Declares all the exchanges, queues and bindings in the enclosing application context, if any. It should be safe
	 * (but unnecessary) to call this method more than once.
	 */
	@Override // NOSONAR complexity
	public void initialize() {

		if (this.applicationContext == null) {
			this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
			return;
		}

		this.logger.debug("Initializing declarations");
		Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
				this.applicationContext.getBeansOfType(Exchange.class).values());
		Collection<Queue> contextQueues = new LinkedList<Queue>(
				this.applicationContext.getBeansOfType(Queue.class).values());
		Collection<Binding> contextBindings = new LinkedList<Binding>(
				this.applicationContext.getBeansOfType(Binding.class).values());
		Collection<DeclarableCustomizer> customizers =
				this.applicationContext.getBeansOfType(DeclarableCustomizer.class).values();

		processDeclarables(contextExchanges, contextQueues, contextBindings);

		final Collection<Exchange> exchanges = filterDeclarables(contextExchanges, customizers);
		final Collection<Queue> queues = filterDeclarables(contextQueues, customizers);
		final Collection<Binding> bindings = filterDeclarables(contextBindings, customizers);

		for (Exchange exchange : exchanges) {
			if ((!exchange.isDurable() || exchange.isAutoDelete())  && this.logger.isInfoEnabled()) {
				this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("
						+ exchange.getName()
						+ ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "
						+ "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "
						+ "reopening the connection.");
			}
		}

		for (Queue queue : queues) {
			if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {
				this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("
						+ queue.getName()
						+ ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"
						+ queue.isExclusive() + ". "
						+ "It will be redeclared if the broker stops and is restarted while the connection factory is "
						+ "alive, but all messages will be lost.");
			}
		}

		if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {
			this.logger.debug("Nothing to declare");
			return;
		}
		this.rabbitTemplate.execute(channel -> {
			declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
			declareQueues(channel, queues.toArray(new Queue[queues.size()]));
			declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
			return null;
		});
		this.logger.debug("Declarations finished");

	}
上述源码就是在自动初始化队列跟交换机等。


这里需要说明一下多数据源一般情况都是在原有项目的基础在在添加信息rabbitmmq数据源,一般情况会默认自动创建队列,如果手动创建当我没说。

如果是多数据源情况第二数据源或者更多数据源配置信息RabbitAdmin类里面的autoStartup属性设置为false,这时候就不会自动创建队列跟交换机,需要手动取创建。这样就不会出现多个数据源初始化创建相同的队列跟交换机。

这里特别强调一下@RabbitListener等绑定队列交换机是不会创建的,如下代码第二数据源并不对创建队列

 /**
     * MQ接收刀具补偿消息
     *
     * @param msg
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "test", durable = "true", autoDelete = "false"),
            exchange = @Exchange(value = "testExchange", type = ExchangeTypes.DIRECT), key = "testRouting"
    ), containerFactory = "secondFactory")
    @RabbitHandler
    public void testMsg(Message msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try {
            if (msg.getBody() != null) {
                Date date = new Date();
                String strJson = new String(msg.getBody());
                channel.basicAck(deliveryTag, false);
                log.error("MQ接收到刀具补偿消息:{}", msg);

            }
        } catch (Exception e) {
            log.error("MQ接收到刀具补偿消息失败,消息内容:{}", e);
            log.error("MQ接收到刀具补偿消息失败,消息内容:{}", new String(msg.getBody()));
        }
    }

如下是我写的一个第二数据源手动创建队列demo

/**
 * @ClassName: RabbitSecondQueueConfig
 * @Description: []
 * @Author: cqj
 * @Date: 2022/7/1
 * @Version 1.0
 */
@Configuration
public class RabbitSecondQueueConfig {
    @Resource(name = "secondRabbitAdmin")
    private RabbitAdmin secondRabbitAdmin;

    @Bean
    void EcuOperationLogQueueDirect() {
        Queue queue = new Queue("test",true);
        secondRabbitAdmin.declareQueue(queue);
        DirectExchange directExchange = new DirectExchange("testExchange",true,false);
        secondRabbitAdmin.declareExchange(directExchange);
        secondRabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(directExchange).with("testRouting"));
    }
}

然后写监听消费队列测试代码如下:

@RabbitListener(queues = "test", containerFactory = "secondFactory")
    @RabbitHandler
    public void testMsg(Message msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try {
            if (msg.getBody() != null) {
                Date date = new Date();
                String strJson = new String(msg.getBody());
                channel.basicAck(deliveryTag, false);
                log.error("消息:{}", msg);

            }
        } catch (Exception e) {
            log.error("MQ接收消息失败,消息内容:{}", e);
            log.error("MQ接消息失败,消息内容:{}", new String(msg.getBody()));
        }
    }

如上代码均为测试代码,感兴趣的同伴可以自己测试,根据自己情况来更改,不过大体思路就是这样。

本人技术有限,以前没仔细研究源码,所以一直困惑着。大佬不喜勿喷,哪里不足请多指点,小弟我会十分感谢。

 

                
        
        
  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-07-03 10:36:15  更:2022-07-03 10:38:20 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 17:07:12-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码