IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Springboot实现RabbitMQ的简单收发 -> 正文阅读

[大数据]Springboot实现RabbitMQ的简单收发

? ? ? ? 利用spring的注解用法和amqp提供的依赖接口,实现一个简单的消息收发功能。


1.准备好MQ环境与依赖

安装RabbitMQ(已省略...)及引入相关依赖。

2.配置MQ配置文件

application.yml:

??

RabbitMQ配置:

package com.business.config;

@Configuration
@Component
public class RabbitMqConfig {

    @Value("${rabbitmq.username}")
    private String username;
    @Value("${rabbitmq.password}")
    private String password;
    @Value("${rabbitmq.virtual-host}")
    private String virtualHost;
    @Value("${rabbitmq.port}")
    private int port;
    @Value("${rabbitmq.publisher-returns}")
    private boolean publisherReturns;
    @Value("${rabbitmq.publisher-confirm}")
    private boolean publisherConfirm;

    /**
     * 接收数据接口队列
     *
     * @return
     */
    @Bean
    public Queue loginLogQueue() {
        return new Queue(MqConst.QUEUE_LOGINLOG, true);
    }

    /**
     * 接收数据接口交换机
     *
     * @return
     */
    @Bean
    public DirectExchange loginLogExchange() {
        return new DirectExchange(MqConst.QUEUE_LOGINLOG + MqConst.EXCHANGE_SUF, true, false);
    }

    /**
     * 接收数据接口绑定
     *
     * @return
     */
    @Bean
    public Binding loginLogBinding() {
        return BindingBuilder.bind(loginLogQueue()).to(loginLogExchange()).with(MqConst.QUEUE_LOGINLOG + MqConst
                .ROUTING_KEY_SUF);
    }
    
    /**
     * RabbitMQConfig中定义connectionFactory中设置属性
     * @return
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
//        cachingConnectionFactory.setAddresses(this.addresses);
        cachingConnectionFactory.setUsername(this.username);
        cachingConnectionFactory.setPassword(this.password);
        cachingConnectionFactory.setVirtualHost(this.virtualHost);
        cachingConnectionFactory.setPort(this.port);
        // 如果消息要设置成回调,则以下的配置必须要设置成true
        cachingConnectionFactory.setPublisherConfirms(this.publisherConfirm);
        cachingConnectionFactory.setPublisherReturns(this.publisherReturns);
        return cachingConnectionFactory;
    }

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
//        开启Mandatory,此时才会触发回调函数,无论消息推送结果如何都会强制调用回调函数
        rabbitTemplate.setMandatory(true);
//application.yml中设置publisher-confirm-type: correlated时可用
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            System.out.println("发送情况:"+correlationData+"|"+ack+"|"+cause);
            //当 setConfirmCallback中的ack=true并且不调用setReturnCallback时,说明消息发送成功
            if(!ack){
                System.out.println("发送失败,原因是:"+cause);
            }else {
                System.out.println("发送成功");
            }
        });
//application.yml中设置publisher-returns: true时可用
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.out.println("返回信息:"+message+"&&&"+replyCode+replyText+"&&&"+exchange+"&&&"+routingKey);
        });
        return rabbitTemplate;
    }
}

3.创建生产者和消费者的工具类及实现类

生产者工具类:

package com.business.util;

/**
 * @Author Lee
 * @Date 2021/5/18 15:28
 */
public class ProducerUtil {

    /**
     * 获取连接
     *
     */
    public static Connection getConnection(){
        //新建连接工厂类
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置主机地址信息
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        //用户信息
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("");
        connectionFactory.setConnectionTimeout(600000);
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        try
        {
            connection = connectionFactory.newConnection();
        }catch (Exception e)
        {
            e.printStackTrace();
        }
        return connection;
    }

    /**
     * 建立通道
     *
     */
    public static Channel getChannel(Connection connection, String queueName, String exchangeName) throws IOException
    {
        //创建通道
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(exchangeName,"direct",true);
        //声明队列
        channel.queueDeclare(queueName,true,false,false,null);
        //绑定交换机与队列
        channel.queueBind(queueName,exchangeName,queueName);
        //同一时刻发送消息的数量
        channel.basicQos(1);
        return channel;
    }

    /**
     * 发送消息
     *
     */
    public static void sendMessage(Channel channel,String queueName,String exchangeName,String msg) throws IOException
    {
        System.out.println(new Date()+"--->当前发送的消息是 :"+msg);
        channel.basicPublish(exchangeName,queueName,null, msg.getBytes());
        System.out.println("发送结束");
    }

    /**
     * 关闭资源
     *
     */
    public static void closeResource(Connection connection,Channel channel){
        try
        {
            if (channel.isOpen()) {
                channel.abort();
            }

            if (connection.isOpen()) {
                connection.close();
            }
        }
        catch (Exception e)
        {
            System.out.println("-----------------------"+e);
        }
    }

}

消费者工具类:

package com.business.util;

/**
 * @Author Lee
 * @Date 2021/5/18 16:32
 */
public class ConsumerUtil {
    public static void handleMessage(Channel channel, Message message) throws IOException, IOException {
        String msg = new String(message.getBody(), "UTF-8");
        System.out.println(new Date()+"--->当前收到的消息是 :"+msg);
        //消息数组(多条数据)todo

        System.out.println("接收结束");
        //ACK参数1:消息标签,参数2:是否确认多条消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

?生产者实现类:

package com.business.mq;

@Component
@Service
public class LoginPublisher {

    public String sendMessage() throws IOException {
        //获取MQ的连接、通道信息
        Connection connection = ProducerUtil.getConnection();
        Channel channel = ProducerUtil.getChannel(connection,MqConst.QUEUE_LOGINLOG,MqConst.QUEUE_LOGINLOG+MqConst.EXCHANGE_SUF);
        //组装消息体
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("system_name","你好兔子MQ");
        jsonObject.put("login_count","201");
        //发送消息
        ProducerUtil.sendMessage(channel,MqConst.QUEUE_LOGINLOG,MqConst.QUEUE_LOGINLOG+MqConst.EXCHANGE_SUF,jsonObject.toJSONString());
        //关闭资源
        ProducerUtil.closeResource(connection,channel);
        return "success";
    }

}

消费者实现类:

package com.business.mq;

@Component
@Slf4j
public class TestReceiver {
    @RabbitListener(queues = MqConst.QUEUE_LOGINLOG)
    @RabbitHandler
    public String receiverMessage(Channel channel, Message message) throws IOException {
        //接收消息
        ConsumerUtil.handleMessage(channel,message);
        
        return "success";
    }
}

4.测试收发联通状态

查看日志打印:

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-07 12:09:17  更:2021-08-07 12:10:25 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/17 18:27:34-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码