IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> Kafka,RabbitMQ,RockedMQ实际应用大汇总2.RabbitMQ,RocketMQ -> 正文阅读

[Java知识库]Kafka,RabbitMQ,RockedMQ实际应用大汇总2.RabbitMQ,RocketMQ

Kafka,RabbitMQ,RockedMQ实际应用大汇总2.RocketMQ&RabbitMQ

  • 本文结合官网用例,记载了三大主流mq的实例以及实际运用。
  • 本文不涉及相关环境的安装与配置,涉及较为全面的代码(包括配置文件及maven)
  • 本文直接上代码及用例,适合对mq已经学过一遍或了解过的同学进行学习&复习,可加入生产环境。
  • 关于各种mq的介绍及对比可以参照我之前的文章
  • 其实mq的设计思想都差不多,可以细细感受一下mq的设计理念以及基本的服务对象。
  • 如果有问题,欢迎留言区或私信进行交流。
  • 虽然已经挑重点代码拿出来了,但是一篇文章里写三个mq还是很长,因此还是分为三篇记录吧,这是第二篇,主讲RabbitMQ,RocketMQ。

二:RocketMQ

推荐阅读官方example,讲的非常非常详细了,按照同步异步订阅等等等类别都分好了,阅读example的体验感也很好。
官方example

三:RabbitMQ

官方example同样给的非常详细,但有些并不常用。但官网上的图绝对是学习与理解的利器。
官方example

  • 路由模式(工作队列)
    • (消费者)监听者
      在这里插入图片描述
      在这里插入图片描述
package net.xdclass.direct;

import com.rabbitmq.client.*;

import java.nio.charset.StandardCharsets;

public class Recv{

    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.98.122.108");
        factory.setUsername("admin");
        factory.setPassword("123456");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        //消费者一般不增加自动关闭
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);

        //获取队列(排它队列)
        String queueName = channel.queueDeclare().getQueue();

        //绑定队列和交换机,fanout交换机不用指定routingkey
        channel.queueBind(queueName,EXCHANGE_NAME,"errorRoutingKey");
        channel.queueBind(queueName,EXCHANGE_NAME,"infoRoutingKey");
        channel.queueBind(queueName,EXCHANGE_NAME,"debugRoutingKey");



        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };

        //自动确认消息
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

    }
}
  • (生产者)消息的发送端
package net.xdclass.direct;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class Send {

    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.98.122.108");
        factory.setUsername("admin");
        factory.setPassword("123456");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        /**
         * 消息生产者不用过多操作,只需要和交换机绑定即可
         */
        try (//创建连接
             Connection connection = factory.newConnection();
             //创建信道
             Channel channel = connection.createChannel()) {

            //绑定交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            String error = "我是订单服务的error日志";
            String info = "我是订单服务的info日志";
            String debug = "我是订单服务的debug日志";

            channel.basicPublish(EXCHANGE_NAME, "errorRoutingKey", null, error.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "infoRoutingKey", null, info.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "debugRoutingKey", null, debug.getBytes(StandardCharsets.UTF_8));

            System.out.println("发送成功");

        }
    }
}
  • 订阅发布模式
    在这里插入图片描述
  • 订阅者(消息消费者)
package net.xdclass.pub;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class Recv{

   private final static String EXCHANGE_NAME = "exchange_fanout";

   public static void main(String[] argv) throws Exception {
       ConnectionFactory factory = new ConnectionFactory();
       factory.setHost("47.98.122.108");
       factory.setUsername("admin");
       factory.setPassword("123456");
       factory.setVirtualHost("/dev");
       factory.setPort(5672);

       //消费者一般不增加自动关闭
       Connection connection = factory.newConnection();
       Channel channel = connection.createChannel();

       //绑定交换机,fanout扇形,即广播类型
       channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT);

       //获取队列(排它队列)
       String queueName = channel.queueDeclare().getQueue();

       //绑定队列和交换机,fanout交换机不用指定routingkey
       channel.queueBind(queueName,EXCHANGE_NAME,"");


       DeliverCallback deliverCallback = (consumerTag, delivery) -> {
           String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
           System.out.println(" [x] Received '" + message + "'");
       };

       //自动确认消息
       channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

   }
}
  • 发布者(消息生产者)
package net.xdclass.pub;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class Send {

    private final static String EXCHANGE_NAME = "exchange_fanout";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.98.122.108");
        factory.setUsername("admin");
        factory.setPassword("123456");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        /**
         * 消息生产者不用过多操作,只需要和交换机绑定即可
         */
        try (//创建连接
             Connection connection = factory.newConnection();
             //创建信道
             Channel channel = connection.createChannel()) {

            //绑定交换机,fanout扇形,即广播类型
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

            String message = "Hello World pub !";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");

        }
    }
}
  • 主题模式
    在这里插入图片描述
  • 消费者
package net.xdclass.topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class Recv2 {

    private final static String EXCHANGE_NAME = "exchange_topic";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.98.122.108");
        factory.setUsername("admin");
        factory.setPassword("123456");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        //消费者一般不增加自动关闭
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);

        String queueName = channel.queueDeclare().getQueue();

        channel.queueBind(queueName,EXCHANGE_NAME,"*.log.*");



        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("body=" + new String(body, StandardCharsets.UTF_8));

                //手工确认消息消费,不是多条确认
                channel.basicAck(envelope.getDeliveryTag(),false);

            }
        };

        //消费,关闭消息自动确认
        channel.basicConsume(queueName,false,consumer);

    }
}
  • 生产者
package net.xdclass.topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class Send {

    private final static String EXCHANGE_NAME = "exchange_topic";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.98.122.108");
        factory.setUsername("admin");
        factory.setPassword("123456");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        /**
         * 消息生产者不用过多操作,只需要和交换机绑定即可
         */
        try (//创建连接
             Connection connection = factory.newConnection();
             //创建信道
             Channel channel = connection.createChannel()) {

            //绑定交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

            String error = "我是订单服务的error日志";
            String info = "我是订单服务的info日志";
            String debug = "我是订单服务的debug日志";

            channel.basicPublish(EXCHANGE_NAME, "order.log.error", null, error.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "order.log.info", null, info.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "product.log.debug", null, debug.getBytes(StandardCharsets.UTF_8));

            System.out.println("topic发送成功");



        }
    }
}
交换机同过通配符进行转发到对应的队列,* 代表一个词,#代表1个或多个词
一般用#作为通配符居多,比如 #.order, 会匹配 info.order 、sys.error.order,
 而 *.order ,只会匹配 info.order, 之间是使用. 点进行分割多个词的; 
 如果是 ., 则info.order、error.order都会匹配
  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-09-12 20:37:43  更:2021-09-12 20:38:05 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 16:57:14-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码