因为公司老旧框架需要使用rabbitmq,但是因为业务逻辑复杂升级springboot难度大,时间不够只能使用xml方式引入rabbitmq
首先引入jar
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.3</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
spring使用的版本是5.1.9.RELEASE
创建xml文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!-- 定义连接工厂 -->
<rabbit:connection-factory id="connectionFactory" virtual-host="#{rabbitmq.rabbitmq_virtual}"
host="#{rabbitmq.rabbitmq_host}" port="#{rabbitmq.rabbitmq_port}" username="#{rabbitmq.rabbitmq_username}" password="#{rabbitmq.rabbitmq_password}" />
<rabbit:admin connection-factory="connectionFactory" />
<!-- 序列化mq数据
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter"/>-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<!-- 定义交换机绑定队列(路由模式)使用匹配符
<rabbit:topic-exchange id="tspTboxExchange" name="tspTboxExchange">
<rabbit:bindings>
<rabbit:binding queue="tspTboxBQueue" pattern="#" />
</rabbit:bindings>
</rabbit:topic-exchange>
-->
<!-- 定义消费者 -->
<bean id="TspConsumer" class="com.dayunmotor.tbox.gateway.business.mq.TspConsumer" />
<!-- 定义队列 -->
<rabbit:queue name="tspTboxBQueue" auto-declare="true" durable="true" />
<!-- 定义消费者监听队列 -->
<rabbit:listener-container
connection-factory="connectionFactory">
<rabbit:listener ref="TspConsumer" queues="tspTboxBQueue" />
</rabbit:listener-container>
</beans>
本文未写全xml配置,如需要可以进xsd文件查看
创建消费者
@Component
public class TspConsumer implements MessageListener {
private TspMQController tspMQController;
@Override
public void onMessage(Message message) {
String body="";
try{
body=new String(message.getBody(),"utf-8");
log.info("接收到消息:{}",body);
tspMQController.processMQ(body);
}catch(Exception e){
log.error("执行失败");
}
}
@Autowired
public void setTspMQController(TspMQController tspMQController) {
this.tspMQController = tspMQController;
}
}
也可以实现ChannelAwareMessageListener接口
创建生产者
@Component
public class RabbitMqProducer {
private static final Logger logger= LoggerFactory.getLogger(RabbitMqProducer.class);
private RabbitTemplate rabbitTemplate;
@Autowired
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendMessage(String exchangeKey,String routingkey,Object message){
logger.info("exchangeKey{} routingkey{}to send message:{}",exchangeKey,routingkey,message);
rabbitTemplate.convertAndSend(exchangeKey,routingkey,message);
}
}
然后启动就可以实现消费和发送消息了。 启动rabbitmq监听器可以阻塞主线程一直监听数据; 本人经多次验证无法使用注解版接收mq消息,故放弃使用注解版;
|