利用 Spring 的注解和 spring-amqp 提供的接口,实现一个简单的消息收发功能。
1.准备好MQ环境与依赖
安装RabbitMQ(已省略...)及引入相关依赖。

2.配置MQ配置文件
application.yml:
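(此处的配置内容缺失。下文 RabbitMqConfig 通过 @Value 读取以下配置项:rabbitmq.username、rabbitmq.password、rabbitmq.virtual-host、rabbitmq.port、rabbitmq.publisher-returns、rabbitmq.publisher-confirm。)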
RabbitMQ配置:
package com.business.config;
/**
 * Spring configuration that declares the login-log queue, its direct exchange
 * and binding, and builds the {@link ConnectionFactory} / {@link RabbitTemplate}
 * beans from the {@code rabbitmq.*} properties.
 *
 * <p>{@code @Configuration} is itself a component stereotype, so no additional
 * {@code @Component} annotation is needed on this class.
 */
@Configuration
public class RabbitMqConfig {

    @Value("${rabbitmq.username}")
    private String username;

    @Value("${rabbitmq.password}")
    private String password;

    @Value("${rabbitmq.virtual-host}")
    private String virtualHost;

    @Value("${rabbitmq.port}")
    private int port;

    @Value("${rabbitmq.publisher-returns}")
    private boolean publisherReturns;

    @Value("${rabbitmq.publisher-confirm}")
    private boolean publisherConfirm;

    /**
     * Declares the durable login-log queue.
     *
     * @return the queue named {@code MqConst.QUEUE_LOGINLOG}
     */
    @Bean
    public Queue loginLogQueue() {
        return new Queue(MqConst.QUEUE_LOGINLOG, true);
    }

    /**
     * Declares the durable, non-auto-delete direct exchange for the login-log queue.
     *
     * @return the exchange named {@code QUEUE_LOGINLOG + EXCHANGE_SUF}
     */
    @Bean
    public DirectExchange loginLogExchange() {
        return new DirectExchange(MqConst.QUEUE_LOGINLOG + MqConst.EXCHANGE_SUF, true, false);
    }

    /**
     * Binds the login-log queue to its exchange.
     *
     * @return the binding, using {@code QUEUE_LOGINLOG + ROUTING_KEY_SUF} as routing key
     */
    @Bean
    public Binding loginLogBinding() {
        return BindingBuilder.bind(loginLogQueue())
                .to(loginLogExchange())
                .with(MqConst.QUEUE_LOGINLOG + MqConst.ROUTING_KEY_SUF);
    }

    /**
     * Builds the caching connection factory from the injected {@code rabbitmq.*} properties.
     *
     * <p>NOTE(review): neither a host nor an address list is set here, so the
     * factory falls back to its default host. Confirm that is intended for every
     * environment this runs in.
     *
     * @return the configured connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setUsername(this.username);
        cachingConnectionFactory.setPassword(this.password);
        cachingConnectionFactory.setVirtualHost(this.virtualHost);
        cachingConnectionFactory.setPort(this.port);
        // Both flags must be true for the confirm / return callbacks registered
        // on the RabbitTemplate below to be invoked.
        cachingConnectionFactory.setPublisherConfirms(this.publisherConfirm);
        cachingConnectionFactory.setPublisherReturns(this.publisherReturns);
        return cachingConnectionFactory;
    }

    /**
     * Builds a {@link RabbitTemplate} that prints publisher confirms and returned messages.
     *
     * @param connectionFactory the connection factory the template publishes through
     * @return the configured template
     */
    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        // Mandatory publishing: a message that cannot be routed to any queue is
        // handed back to the return callback instead of being silently dropped.
        // It does not cause a callback for messages that are routed successfully.
        rabbitTemplate.setMandatory(true);
        // Invoked only when publisher confirms are enabled on the connection factory.
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            System.out.println("发送情况:"+correlationData+"|"+ack+"|"+cause);
            // ack == true means the broker accepted the message; if the return
            // callback is not invoked as well, the message was also routed.
            if (!ack) {
                System.out.println("发送失败,原因是:"+cause);
            } else {
                System.out.println("发送成功");
            }
        });
        // Invoked only when publisher returns are enabled on the connection factory.
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.out.println("返回信息:"+message+"&&&"+replyCode+replyText+"&&&"+exchange+"&&&"+routingKey);
        });
        return rabbitTemplate;
    }
}
3.创建生产者和消费者的工具类及实现类
生产者工具类:
package com.business.util;
/**
 * Static helpers for publishing messages with the RabbitMQ Java client:
 * open a connection, prepare a channel, publish a text message and release
 * the resources afterwards.
 *
 * @author Lee
 * Created 2021/5/18 15:28
 */
public class ProducerUtil {

    /**
     * Opens a connection to the broker at 127.0.0.1:5672 on virtual host "/".
     *
     * <p>NOTE(review): host, credentials and timeout are hard-coded here; consider
     * moving them to configuration.
     *
     * @return the open connection, or {@code null} if the connection attempt failed
     *         (the failure is printed via {@code printStackTrace}); callers must
     *         check for {@code null}
     */
    public static Connection getConnection() {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Broker address.
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        // Credentials.
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("");
        connectionFactory.setConnectionTimeout(600000);
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        try {
            connection = connectionFactory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connection;
    }

    /**
     * Creates a channel and declares a durable direct exchange, a durable queue and
     * a binding between them that uses the queue name as routing key.
     *
     * @param connection   an open connection
     * @param queueName    the queue to declare; also used as the binding's routing key
     * @param exchangeName the direct exchange to declare
     * @return the prepared channel
     * @throws IOException if creating the channel or any declaration fails
     */
    public static Channel getChannel(Connection connection, String queueName, String exchangeName) throws IOException {
        Channel channel = connection.createChannel();
        // Declare the durable direct exchange.
        channel.exchangeDeclare(exchangeName, "direct", true);
        // Declare the queue: durable, non-exclusive, not auto-deleted, no extra arguments.
        channel.queueDeclare(queueName, true, false, false, null);
        // Bind queue to exchange, using the queue name as routing key.
        channel.queueBind(queueName, exchangeName, queueName);
        // Prefetch limit: at most one unacknowledged delivery at a time for consumers
        // on this channel. This does not limit how many messages can be published.
        channel.basicQos(1);
        return channel;
    }

    /**
     * Publishes {@code msg} as UTF-8 bytes to {@code exchangeName}, using
     * {@code queueName} as the routing key.
     *
     * <p>The payload is encoded explicitly as UTF-8 so that it matches the UTF-8
     * decoding on the consumer side regardless of the platform default charset.
     *
     * @param channel      the channel to publish on
     * @param queueName    the routing key (equal to the queue name by this class's binding convention)
     * @param exchangeName the target exchange
     * @param msg          the text payload
     * @throws IOException if publishing fails
     */
    public static void sendMessage(Channel channel, String queueName, String exchangeName, String msg) throws IOException {
        System.out.println(new Date()+"--->当前发送的消息是 :"+msg);
        channel.basicPublish(exchangeName, queueName, null, msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        System.out.println("发送结束");
    }

    /**
     * Releases the channel and then the connection.
     *
     * <p>Each resource is released independently and {@code null} arguments are
     * tolerated, so a missing channel or a failure while aborting the channel can no
     * longer prevent the connection from being closed. Failures are printed, not rethrown.
     *
     * @param connection the connection to close; may be {@code null}
     * @param channel    the channel to abort; may be {@code null}
     */
    public static void closeResource(Connection connection, Channel channel) {
        try {
            if (channel != null && channel.isOpen()) {
                channel.abort();
            }
        } catch (Exception e) {
            System.out.println("-----------------------"+e);
        }
        try {
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        } catch (Exception e) {
            System.out.println("-----------------------"+e);
        }
    }
}
消费者工具类:
package com.business.util;
/**
 * Static helper used by message listeners to print and acknowledge a delivery.
 *
 * @author Lee
 * Created 2021/5/18 16:32
 */
public class ConsumerUtil {

    /**
     * Decodes the message body as UTF-8, prints it, and acknowledges the delivery.
     *
     * <p>NOTE(review): the explicit {@code basicAck} presumes the listener container
     * runs in manual acknowledge mode. That setting is not visible in this class;
     * confirm it, because acknowledging a delivery twice is a channel error.
     *
     * @param channel the channel the message was delivered on
     * @param message the delivered message
     * @throws IOException if the acknowledgement cannot be sent
     */
    public static void handleMessage(Channel channel, Message message) throws IOException {
        String msg = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println(new Date()+"--->当前收到的消息是 :"+msg);
        // TODO: handle a body that carries an array of records (multiple entries).
        System.out.println("接收结束");
        // Ack arguments: the delivery tag, then multiple=false so only this one delivery is acknowledged.
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
生产者实现类:
package com.business.mq;
/**
 * Publishes a sample login-log message to the login-log exchange.
 *
 * <p>{@code @Service} is itself a component stereotype, so no additional
 * {@code @Component} annotation is needed on this class.
 */
@Service
public class LoginPublisher {

    /**
     * Opens a connection and channel, publishes one JSON message and releases the
     * resources again, including when channel setup or publishing fails.
     *
     * @return the literal {@code "success"} once the message has been published
     * @throws IOException if the broker connection cannot be opened, or if channel
     *                     setup or publishing fails
     */
    public String sendMessage() throws IOException {
        String exchangeName = MqConst.QUEUE_LOGINLOG + MqConst.EXCHANGE_SUF;

        // getConnection() reports failure by returning null; fail with a clear,
        // declared exception instead of a NullPointerException further down.
        Connection connection = ProducerUtil.getConnection();
        if (connection == null) {
            throw new IOException("Unable to open a connection to the RabbitMQ broker");
        }

        Channel channel = null;
        try {
            channel = ProducerUtil.getChannel(connection, MqConst.QUEUE_LOGINLOG, exchangeName);

            // Assemble the message body.
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("system_name","你好兔子MQ");
            jsonObject.put("login_count","201");

            // Publish it.
            ProducerUtil.sendMessage(channel, MqConst.QUEUE_LOGINLOG, exchangeName, jsonObject.toJSONString());
            return "success";
        } finally {
            // Always release the connection, even when an exception is propagating.
            if (channel != null) {
                ProducerUtil.closeResource(connection, channel);
            } else {
                // Channel setup failed: there is no channel to hand to closeResource,
                // so force-close the connection directly. abort() does not throw and
                // therefore cannot mask the original exception.
                connection.abort();
            }
        }
    }
}
消费者实现类:
package com.business.mq;
/**
 * Listener for the login-log queue; delegates each delivery to
 * {@code ConsumerUtil.handleMessage}.
 */
@Component
@Slf4j
public class TestReceiver {

    /**
     * Receives one message from the login-log queue and hands it to
     * {@code ConsumerUtil.handleMessage}.
     *
     * <p>{@code @RabbitHandler} is only meaningful on methods of a class that carries
     * a class-level {@code @RabbitListener}; on a method with its own
     * {@code @RabbitListener} it has no effect, so it is omitted here.
     *
     * <p>NOTE(review): Spring AMQP treats a non-void listener return value as a reply
     * to send back. The publisher in this project sets no reply-to, so returning
     * {@code "success"} may make the container fail to resolve a reply address.
     * Consider a {@code void} return type once no caller depends on the value.
     *
     * @param channel the channel the message was delivered on
     * @param message the delivered message
     * @return the literal {@code "success"}
     * @throws IOException if handling or acknowledging the message fails
     */
    @RabbitListener(queues = MqConst.QUEUE_LOGINLOG)
    public String receiverMessage(Channel channel, Message message) throws IOException {
        // Process and acknowledge the delivery.
        ConsumerUtil.handleMessage(channel, message);
        return "success";
    }
}
4.测试收发联通状态
查看日志打印:生产者一侧应输出“发送结束”,消费者一侧应输出收到的消息内容以及“接收结束”。