IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> .net RabbitMQ的几种工作模式 -> 正文阅读

[大数据].net RabbitMQ的几种工作模式

.net RabbitMQ的几种工作模式

查看官网https://www.rabbitmq.com/getstarted.html可以看到RabbitMQ的几种工作模式
在这里插入图片描述

“Hello World”

在这里插入图片描述

一个生产者一个队列一个消费者,生产者生产消息放入队列,消费者从队列中接收消息消费
最基本的模式

demo

代码演示见这篇文章

“Work queues”

在这里插入图片描述

一个生产者一个队列多个消费者,生产者生产消费放入队列,消费者们从队列中接收消息消费,消费者之间存在竞争关系,也就是说,一个消息,一旦被A消费者消费,就不能被B消费者消费了。
这样工作模式的好处是,假设一个生产者1s生产2000条消息,而一个消费者1s只能消费1000条消息,为了提高消费的速度,可以设置多个消费者来对同一个队列进行消费

demo

生产者

using System;
using RabbitMQ.Client;
using System.Text;

namespace RabbitMQSender
{
  public class Send
  {
    public static void Main()
    {
      // 创建连接工厂ConnectionFactory
      ConnectionFactory connectionFactory = new ConnectionFactory();
      //创建连接
      using (IConnection connection = connectionFactory.CreateConnection())
      //创建管道
      using (IModel channel = connection.CreateModel())
      {
        //创建队列
        channel.QueueDeclare(queue: "queue",
                           durable: false, //是否持久化
                           exclusive: false, //是否独占
                           autoDelete: false, //是否自动删除
                           arguments: null); //参数

        //创建消息
        string message = "Hello World!";

        for (int i = 1; i <= 10; i++)
        {
          //发布消息
          channel.BasicPublish(exchange: "", //指定采用哪个交换机
                               routingKey: "queue", //路由键
                               basicProperties: null,
                               body: Encoding.Default.GetBytes($"第{i}条消息:" + message));
          Console.WriteLine(" [x] Sent {0}", message);
        }
      }
    }
  }
}

消费者1

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabbitMQReceiver
{
  class Receive
  {
    public static void Main()
    {
      //创建连接工厂
      ConnectionFactory connectionFactory = new ConnectionFactory();
      //创建默认连接
      using (IConnection connection = connectionFactory.CreateConnection())
      {

        //创建管道
        using (IModel channel = connection.CreateModel())
        {
          //创建队列
          channel.QueueDeclare(
            queue: "queue",
            durable: false,
            exclusive: false,
            autoDelete: false,
            arguments: null);

          //创建消费者
          EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
          //定义回调事件,每次接收消息时触发
          consumer.Received += (model, basicDeliverEventArgs) =>
          {
            ReadOnlyMemory<byte> body = basicDeliverEventArgs.Body;
            string message = Encoding.Default.GetString(body.ToArray());
            Console.WriteLine(" [x] Received {0}", message);
          };
          //消费消息
          channel.BasicConsume(queue: "queue",
                               autoAck: true,
                               consumer: consumer);
          Console.ReadLine();
        }
      }
    }
  }
}

消费者2

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabbitMQReceiver
{
  class Receive2
  {
    public static void Main()
    {
      //创建连接工厂
      ConnectionFactory connectionFactory = new ConnectionFactory();
      //创建默认连接
      using (IConnection connection = connectionFactory.CreateConnection())
      {

        //创建管道
        using (IModel channel = connection.CreateModel())
        {
          //创建队列
          channel.QueueDeclare(
            queue: "queue",
            durable: false,
            exclusive: false,
            autoDelete: false,
            arguments: null);

          //创建消费者
          EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
          //定义回调事件,每次接收消息时触发
          consumer.Received += (model, basicDeliverEventArgs) =>
          {
            ReadOnlyMemory<byte> body = basicDeliverEventArgs.Body;
            string message = Encoding.Default.GetString(body.ToArray());
            Console.WriteLine(" [x] Received {0}", message);
          };
          //消费消息
          channel.BasicConsume(queue: "queue",
                               autoAck: true,
                               consumer: consumer);
          Console.ReadLine();
        }
      }
    }
  }
}

在这里插入图片描述
先运行消费者1和消费者2
再运行生产者,发现消费者1接收到了第1、3、5、7、9条消息,消费者2接收到了第2、4、6、8、10条消息

“Publish/Subscribe”

在这里插入图片描述

一个生产者多个队列多个消费者,一个队列对应一个消费者,生产者生产的消息将放进每个队列
消费者只关心它的队列并将其中的消息消费,消费者之间不存在竞争关系。

如果不同的消费者收到同一个消息后需要执行不同的操作,例如消费者A接收到X消息后需要在控制台打印,而消费者B接收到X消息后需要将X消息存入数据库。像这种情形下,"Work queues"模式肯定是行不通的,因为"Work queues"中的消息只能被一个消费者处理,这意味着消费者A接收到X消息后就会把X消息消费掉了,那么消费者B就接收不到X消息了

demo

要实现这种模式,需要将交换机模式设置为扇形模式(ExchangeType.Fanout),这样它就会将消息分发到每一个与之绑定的队列中

生产者

using System;
using RabbitMQ.Client;
using System.Text;

namespace RabbitMQSender
{
  public class Send
  {
    public static void Main()
    {
      // 创建连接工厂ConnectionFactory
      ConnectionFactory connectionFactory = new ConnectionFactory();
      //创建连接
      using (IConnection connection = connectionFactory.CreateConnection())
      //创建管道
      using (IModel channel = connection.CreateModel())
      {
        //创建交换机
        string exchange = "exchange1";
        //交换机采用扇形模式
        channel.ExchangeDeclare(exchange, ExchangeType.Fanout, false);

        //创建队列1
        channel.QueueDeclare(queue: "queue1",
                           durable: false, //是否持久化
                           exclusive: false, //是否独占
                           autoDelete: false, //是否自动删除
                           arguments: null); //参数
        //创建队列2
        channel.QueueDeclare(queue: "queue2",
                           durable: false, //是否持久化
                           exclusive: false, //是否独占
                           autoDelete: false, //是否自动删除
                           arguments: null); //参数

        //如果交换机采用扇形模式,则routeKey置为""
        channel.QueueBind("queue1", exchange, "");
        channel.QueueBind("queue2", exchange, "");

        //创建消息
        string message = "Hello World!";

        for (int i = 1; i <= 10; i++)
        {
          //发布消息
          channel.BasicPublish(exchange: exchange, //指定采用哪个交换机
                               routingKey: "", //路由键
                               basicProperties: null,
                               body: Encoding.Default.GetBytes($"第{i}条消息:" + message));
          Console.WriteLine(" [x] Sent {0}", message);
        }
      }
    }
  }
}

消费者1

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabbitMQReceiver
{
  class Receive
  {
    public static void Main()
    {
      //创建连接工厂
      ConnectionFactory connectionFactory = new ConnectionFactory();
      //创建默认连接
      using (IConnection connection = connectionFactory.CreateConnection())
      {

        //创建管道
        using (IModel channel = connection.CreateModel())
        {
          //创建队列
          channel.QueueDeclare(
            queue: "queue1",
            durable: false,
            exclusive: false,
            autoDelete: false,
            arguments: null);

          //创建消费者
          EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
          //定义回调事件,每次接收消息时触发
          consumer.Received += (model, basicDeliverEventArgs) =>
          {
            ReadOnlyMemory<byte> body = basicDeliverEventArgs.Body;
            string message = Encoding.Default.GetString(body.ToArray());
            Console.WriteLine(" [x] Received {0}", message);
          };
          //消费消息
          channel.BasicConsume(queue: "queue1",
                               autoAck: true,
                               consumer: consumer);
          Console.ReadLine();
        }
      }
    }
  }
}

消费者2

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabbitMQReceiver
{
  class Receive2
  {
    public static void Main()
    {
      //创建连接工厂
      ConnectionFactory connectionFactory = new ConnectionFactory();
      //创建默认连接
      using (IConnection connection = connectionFactory.CreateConnection())
      {

        //创建管道
        using (IModel channel = connection.CreateModel())
        {
          //创建队列
          channel.QueueDeclare(
            queue: "queue2",
            durable: false,
            exclusive: false,
            autoDelete: false,
            arguments: null);

          //创建消费者
          EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
          //定义回调事件,每次接收消息时触发
          consumer.Received += (model, basicDeliverEventArgs) =>
          {
            ReadOnlyMemory<byte> body = basicDeliverEventArgs.Body;
            string message = Encoding.Default.GetString(body.ToArray());
            Console.WriteLine(" [x] Received {0}", message);
          };
          //消费消息
          channel.BasicConsume(queue: "queue2",
                               autoAck: true,
                               consumer: consumer);
          Console.ReadLine();
        }
      }
    }
  }
}

先运行消费者1和消费者2
再运行生产者,发现消费者1和消费者2都接收到了生产者生产的10条消息
在这里插入图片描述

“Routing”

在这里插入图片描述

一个生产者多个队列多个消费者,一个队列对应一个消费者,生产者生产的消息不是简单的放入每个队列,而是根据不同的条件,由交换机来决定将消息放入哪个消息队列中。
例如不太重要的消息只需要在控制台打印即可,不需要存数据库了,
而重要的消息既需要在控制台打印,还需要存数据库

demo

要实现这种模式,需要将交换机模式设置为直连模式(ExchangeType.Direct),
并配置路由规则,也就是指定哪个routingKey对应哪个消息队列
在发布消息时需要指定routingKey,这样交换机就能根据路由规则来进行消息的分发了

生产者

using System;
using RabbitMQ.Client;
using System.Text;

namespace RabbitMQSender
{
  public class Send
  {
    public static void Main()
    {
      // 创建连接工厂ConnectionFactory
      ConnectionFactory connectionFactory = new ConnectionFactory();
      //创建连接
      using (IConnection connection = connectionFactory.CreateConnection())
      //创建管道
      using (IModel channel = connection.CreateModel())
      {
        //创建交换机
        string exchange = "exchange1";
        //交换机采用直连模式
        channel.ExchangeDeclare(exchange, ExchangeType.Direct, false);

        //创建队列1
        channel.QueueDeclare(queue: "queue1",
                           durable: false, //是否持久化
                           exclusive: false, //是否独占
                           autoDelete: false, //是否自动删除
                           arguments: null); //参数
        //创建队列2
        channel.QueueDeclare(queue: "queue2",
                           durable: false, //是否持久化
                           exclusive: false, //是否独占
                           autoDelete: false, //是否自动删除
                           arguments: null); //参数

        //配置路由
        //routingKey为key1的消息将放入队列queue1中
        channel.QueueBind("queue1", exchange, "key1");
        //routingKey为key2的消息将放入队列queue1和queue2中
        channel.QueueBind("queue1", exchange, "key2"); 
        channel.QueueBind("queue2", exchange, "key2"); 

        //创建消息
        string message = "Hello World!";

        for (int i = 1; i <= 10; i++)
        {
          //发布消息
          channel.BasicPublish(exchange: exchange, //指定采用哪个交换机
                               routingKey: "key1", //路由键
                               basicProperties: null,
                               body: Encoding.Default.GetBytes($"第{i}条key为key1的消息:" + message));
          Console.WriteLine(" [x] Sent {0} to queue1", message);
        }

        for (int i = 1; i <= 5; i++)
        {
          //发布消息
          channel.BasicPublish(exchange: exchange, //指定采用哪个交换机
                               routingKey: "key2", //路由键
                               basicProperties: null,
                               body: Encoding.Default.GetBytes($"第{i}条key为key2的消息:" + message));
          Console.WriteLine(" [x] Sent {0} to queue2", message);
        }
      }
    }
  }
}

消费者1

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabbitMQReceiver
{
  class Receive
  {
    public static void Main()
    {
      //创建连接工厂
      ConnectionFactory connectionFactory = new ConnectionFactory();
      //创建默认连接
      using (IConnection connection = connectionFactory.CreateConnection())
      {

        //创建管道
        using (IModel channel = connection.CreateModel())
        {
          //创建队列
          channel.QueueDeclare(
            queue: "queue1",
            durable: false,
            exclusive: false,
            autoDelete: false,
            arguments: null);

          //创建消费者
          EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
          //定义回调事件,每次接收消息时触发
          consumer.Received += (model, basicDeliverEventArgs) =>
          {
            ReadOnlyMemory<byte> body = basicDeliverEventArgs.Body;
            string message = Encoding.Default.GetString(body.ToArray());
            Console.WriteLine(" [x] Received {0}", message);
          };
          //消费消息
          channel.BasicConsume(queue: "queue1",
                               autoAck: true,
                               consumer: consumer);
          Console.ReadLine();
        }
      }
    }
  }
}

消费者2

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabbitMQReceiver
{
  class Receive2
  {
    public static void Main()
    {
      //创建连接工厂
      ConnectionFactory connectionFactory = new ConnectionFactory();
      //创建默认连接
      using (IConnection connection = connectionFactory.CreateConnection())
      {

        //创建管道
        using (IModel channel = connection.CreateModel())
        {
          //创建队列
          channel.QueueDeclare(
            queue: "queue2",
            durable: false,
            exclusive: false,
            autoDelete: false,
            arguments: null);

          //创建消费者
          EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
          //定义回调事件,每次接收消息时触发
          consumer.Received += (model, basicDeliverEventArgs) =>
          {
            ReadOnlyMemory<byte> body = basicDeliverEventArgs.Body;
            string message = Encoding.Default.GetString(body.ToArray());
            Console.WriteLine(" [x] Received {0}", message);
          };
          //消费消息
          channel.BasicConsume(queue: "queue2",
                               autoAck: true,
                               consumer: consumer);
          Console.ReadLine();
        }
      }
    }
  }
}

先运行消费者1和消费者2
再运行生产者,发现
消费者1接收到了key为key1的10条消息以及key为key2的5条消息
消费者2只接收key为key2的5条消息
在这里插入图片描述

“Topics”

在这里插入图片描述
Topic模式和Routing模式的工作模式类似,也是一个生产者多个队列多个消费者,一个队列对应一个消费者,根据不同的条件,由交换机来决定将消息放入哪个消息队列中

区别在于Routing模式是精确匹配,也就是说一个routingKey为“key1”的消息,只能匹配到绑定了"key1"的队列
而Topic模式下可以进行模糊匹配
一个绑定了"Word.*“的队列,既能匹配routingKey为"Word.A"的消息,还能匹配routingKey为"Word.B”、"Word.C"的消息

*是一种通配符,可以匹配一个单词
#是另一种通配符,可以匹配零个或多个单词

这是RabbitMQ官网的解释
在这里插入图片描述
为了说明这些匹配规则,官网还给出了一些例子,那我就不再举例了
We created three bindings: Q1 is bound with binding key “.orange.” and Q2 with “..rabbit” and “lazy.#”.

These bindings can be summarised as:

Q1 is interested in all the orange animals.
Q2 wants to hear everything about rabbits, and everything about lazy animals.
A message with a routing key set to “quick.orange.rabbit” will be delivered to both queues. Message “lazy.orange.elephant” also will go to both of them. On the other hand “quick.orange.fox” will only go to the first queue, and “lazy.brown.fox” only to the second. “lazy.pink.rabbit” will be delivered to the second queue only once, even though it matches two bindings. “quick.brown.fox” doesn’t match any binding so it will be discarded.

What happens if we break our contract and send a message with one or four words, like “orange” or “quick.orange.male.rabbit”? Well, these messages won’t match any bindings and will be lost.

On the other hand “lazy.orange.male.rabbit”, even though it has four words, will match the last binding and will be delivered to the second queue.

demo

要实现这种模式,需要将交换机模式设置为Topic模式(ExchangeType.Topic),这样才能使用通配符*或#进行模糊匹配

生产者:生产带有不同routingKey的消息

using System;
using RabbitMQ.Client;
using System.Text;

namespace RabbitMQSender
{
  public class Send
  {
    public static void Main()
    {
      // 创建连接工厂ConnectionFactory
      ConnectionFactory connectionFactory = new ConnectionFactory();
      //创建连接
      using (IConnection connection = connectionFactory.CreateConnection())
      //创建管道
      using (IModel channel = connection.CreateModel())
      {
        //创建交换机
        string exchange = "exchange1";
        //交换机采用Topic模式
        channel.ExchangeDeclare(exchange, ExchangeType.Topic, false);

        //创建队列1
        channel.QueueDeclare(queue: "queue1",
                           durable: false, //是否持久化
                           exclusive: false, //是否独占
                           autoDelete: false, //是否自动删除
                           arguments: null); //参数
        //创建队列2
        channel.QueueDeclare(queue: "queue2",
                           durable: false, //是否持久化
                           exclusive: false, //是否独占
                           autoDelete: false, //是否自动删除
                           arguments: null); //参数

        //配置路由
        channel.QueueBind("queue1", exchange, "key1.*");
        channel.QueueBind("queue1", exchange, "key2");
        channel.QueueBind("queue2", exchange, "key2.#");
        channel.QueueBind("queue2", exchange, "*.key2.#");

        //创建消息
        string message = "Hello World!";

        //发布消息
        channel.BasicPublish(exchange: exchange, //指定采用哪个交换机
                             routingKey: "key1.A", //路由键
                             basicProperties: null,
                             body: Encoding.Default.GetBytes($"key为key1.A的消息:" + message));
        channel.BasicPublish(exchange: exchange, //指定采用哪个交换机
                             routingKey: "key1.B", //路由键
                             basicProperties: null,
                             body: Encoding.Default.GetBytes($"key为key1.B的消息:" + message));
        channel.BasicPublish(exchange: exchange, //指定采用哪个交换机
                             routingKey: "key2", //路由键
                             basicProperties: null,
                             body: Encoding.Default.GetBytes($"key为key2的消息:" + message));
        channel.BasicPublish(exchange: exchange, //指定采用哪个交换机
                             routingKey: "key2.A.B.C", //路由键
                             basicProperties: null,
                             body: Encoding.Default.GetBytes($"key为key2.A.B.C的消息:" + message));
        channel.BasicPublish(exchange: exchange, //指定采用哪个交换机
                             routingKey: "key1.key2.A", //路由键
                             basicProperties: null,
                             body: Encoding.Default.GetBytes($"key为key1.key2.A的消息:" + message));
      }
    }
  }
}

消费者1

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabbitMQReceiver
{
  class Receive
  {
    public static void Main()
    {
      //创建连接工厂
      ConnectionFactory connectionFactory = new ConnectionFactory();
      //创建默认连接
      using (IConnection connection = connectionFactory.CreateConnection())
      {

        //创建管道
        using (IModel channel = connection.CreateModel())
        {
          //创建队列
          channel.QueueDeclare(
            queue: "queue1",
            durable: false,
            exclusive: false,
            autoDelete: false,
            arguments: null);

          //创建消费者
          EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
          //定义回调事件,每次接收消息时触发
          consumer.Received += (model, basicDeliverEventArgs) =>
          {
            ReadOnlyMemory<byte> body = basicDeliverEventArgs.Body;
            string message = Encoding.Default.GetString(body.ToArray());
            Console.WriteLine(" [x] Received {0}", message);
          };
          //消费消息
          channel.BasicConsume(queue: "queue1",
                               autoAck: true,
                               consumer: consumer);
          Console.ReadLine();
        }
      }
    }
  }
}

消费者2

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabbitMQReceiver
{
  class Receive2
  {
    public static void Main()
    {
      //创建连接工厂
      ConnectionFactory connectionFactory = new ConnectionFactory();
      //创建默认连接
      using (IConnection connection = connectionFactory.CreateConnection())
      {

        //创建管道
        using (IModel channel = connection.CreateModel())
        {
          //创建队列
          channel.QueueDeclare(
            queue: "queue2",
            durable: false,
            exclusive: false,
            autoDelete: false,
            arguments: null);

          //创建消费者
          EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
          //定义回调事件,每次接收消息时触发
          consumer.Received += (model, basicDeliverEventArgs) =>
          {
            ReadOnlyMemory<byte> body = basicDeliverEventArgs.Body;
            string message = Encoding.Default.GetString(body.ToArray());
            Console.WriteLine(" [x] Received {0}", message);
          };
          //消费消息
          channel.BasicConsume(queue: "queue2",
                               autoAck: true,
                               consumer: consumer);
          Console.ReadLine();
        }
      }
    }
  }
}

先运行消费者1和消费者2
再运行生产者,发现不同routingKey的消息被分发到了与之匹配的队列中
在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-09-02 11:26:45  更:2021-09-02 11:29:46 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 17:02:11-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码