IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ系列(四)-Exchange -> 正文阅读

[大数据]RabbitMQ系列(四)-Exchange

RabbitMQ系列(四)-发布订阅模式

上一篇 : RabbitMQ系列(三)-工作队列模式

一、 介绍

1.1 交换机Exchange

1. rabbitmq中的消息首先会到达 exchange.  
2. exchange根据指定的规则,将消息分发到指定的队列或者多个队列中,在Server中承担着从Produce
接收Message的责任。
3. 我们常用的模式有3个. Fanout、Direct、Topic 

1.2 Fanout

Fanout模式的交换机会忽略routingkey,把消息发送到所有绑定了此交换机的队列上

1.3 Direct

完全匹配的工作模式,需要队列和交换机绑定并指定了routingkey.消息将被发送到指定routingkey的队列中

1.4 Topic

模糊匹配的工作模式,支持*通配符和#通配符

二、 测试

2.1 Fanout

  1. 生产者
var factory = new ConnectionFactory
            {
                HostName = "192.168.3.20",
                Port = 5672,
                UserName = "xxx",
                Password = "xxx",
                VirtualHost = "/"
            };
            using var conn = factory.CreateConnection();
            using var channel = conn.CreateModel();
            //声明交换机
            channel.ExchangeDeclare(exchange: "fanout_exchange",
                                    type: "fanout");
            //声明3个队列
            string queueName1 = "fanout_queue1";
            channel.QueueDeclare(queue: queueName1,
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);
            string queueName2 = "fanout_queue2";
            channel.QueueDeclare(queue: queueName2,
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);
            string queueName3 = "fanout_queue3";
            channel.QueueDeclare(queue: queueName3,
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);
            //绑定队列到交换机
            channel.QueueBind(queue: queueName1,
                              exchange: "fanout_exchange",
                              routingKey: "");
            channel.QueueBind(queue: queueName2,
                              exchange: "fanout_exchange",
                              routingKey: "");
            channel.QueueBind(queue: queueName3,
                              exchange: "fanout_exchange",
                              routingKey: "");
            Console.WriteLine("请输入要发送的消息,如果发送完毕,输入ok即可.");
            var sendMsg = "";
            while (sendMsg != "ok")
            {
                sendMsg = Console.ReadLine();
                var body = Encoding.UTF8.GetBytes(sendMsg);
                channel.BasicPublish(exchange: "fanout_exchange",
                                     routingKey: "",
                                     basicProperties: null,
                                     body: body);
                if (sendMsg == "ok")
                {
                    Console.WriteLine("消息发送结束,按任意键退出.");
                }
                else
                {
                    Console.WriteLine($"Send ====>{sendMsg}");
                }
            }

            Console.ReadKey();
  1. 运行可以发现,三个队列已经创建并且有3条数据.
    在这里插入图片描述

  2. 点对点和工作队列模式我们用的是默认的交换机,而现在我们用的是自己创建的新交换机

在这里插入图片描述

  1. 消费者
//这里博主只消费一个队列
#region fanout
            string queueName = "fanout_queue1";
            using var conn = factory.CreateConnection();
            using var channel = conn.CreateModel();
            //声明交换机
            channel.ExchangeDeclare(exchange: "fanout_exchange",
                                    type: "fanout");
            channel.QueueDeclare(queue: queueName,
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);
            //绑定队列到交换机
            channel.QueueBind(queue: queueName,
                              exchange: "fanout_exchange",
                              routingKey: "");
            var consumer = new EventingBasicConsumer(channel);
            
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine($"Received {message}");
            };
            channel.BasicConsume(queue: queueName,
                                 autoAck: true,
                                 consumer: consumer);
            #endregion

            Console.ReadKey();

在这里插入图片描述

  1. 可以看到队列1中的消息已经被消费完了.

2.2 Direct

  1. 生产者,这里我将奇数1 3 5 三条消息发送到 d1–routingkey,偶数 2 4 6三条发送到d2–routingkey
 using var conn = factory.CreateConnection();
            using var channel = conn.CreateModel();
            //声明交换机
            channel.ExchangeDeclare(exchange: "direct_exchange",
                                    type: "direct");
            //声明2个队列
            string queueName1 = "direct_queue1";
            channel.QueueDeclare(queue: queueName1,
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);
            string queueName2 = "direct_queue2";
            channel.QueueDeclare(queue: queueName2,
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            //绑定队列到交换机
            channel.QueueBind(queue: queueName1,
                              exchange: "direct_exchange",
                              routingKey: "d1");
            channel.QueueBind(queue: queueName2,
                              exchange: "direct_exchange",
                              routingKey: "d2");
            Console.WriteLine("请输入要发送的消息,如果发送完毕,输入ok即可.");
            var sendMsg = "";
            while (sendMsg != "ok")
            {
                sendMsg = Console.ReadLine();
                Console.WriteLine("输入routingkey");
                var routingKey = Console.ReadLine();
                var body = Encoding.UTF8.GetBytes(sendMsg);
                channel.BasicPublish(exchange: "direct_exchange",
                                     routingKey: routingKey,
                                     basicProperties: null,
                                     body: body);
                if (sendMsg == "ok")
                {
                    Console.WriteLine("消息发送结束,按任意键退出.");
                }
                else
                {
                    Console.WriteLine($"Send ====>{sendMsg}");
                }
            }
  1. 运行结果图
    在这里插入图片描述
  2. 交换机绑定
    在这里插入图片描述
  3. 消费者,这里博主还是只消费direct_queue1
string queueName = "direct_queue1";
            using var conn = factory.CreateConnection();
            using var channel = conn.CreateModel();
            //声明交换机
            channel.ExchangeDeclare(exchange: "direct_exchange",
                                    type: "direct");
            channel.QueueDeclare(queue: queueName,
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);
            //绑定队列到交换机
            channel.QueueBind(queue: queueName,
                              exchange: "direct_exchange",
                              routingKey: "d1");
            var consumer = new EventingBasicConsumer(channel);

            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine($"Received {message}, routingkey:{ea.RoutingKey}");
            };
            channel.BasicConsume(queue: queueName,
                                 autoAck: true,
                                 consumer: consumer);

在这里插入图片描述

2.3 Topic

  1. 生产者,两个正常routingkey,一个通配符routingkey
using var conn = factory.CreateConnection();
            using var channel = conn.CreateModel();
            //声明交换机
            channel.ExchangeDeclare(exchange: "topic_exchange",
                                    type: "topic");
            //声明3个队列
            string queueName1 = "topic_queue1";
            channel.QueueDeclare(queue: queueName1,
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);
            string queueName2 = "topic_queue2";
            channel.QueueDeclare(queue: queueName2,
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);
            string queueName3 = "topic_queue3";
            channel.QueueDeclare(queue: queueName3,
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            //绑定队列到交换机
            channel.QueueBind(queue: queueName1,
                              exchange: "topic_exchange",
                              routingKey: "test.info");
            channel.QueueBind(queue: queueName2,
                              exchange: "topic_exchange",
                              routingKey: "test.debug");
            channel.QueueBind(queue: queueName3,
                              exchange: "topic_exchange",
                              routingKey: "test.info.*");
            Console.WriteLine("请输入要发送的消息,如果发送完毕,输入ok即可.");
            var sendMsg = "";
            while (sendMsg != "ok")
            {
                sendMsg = Console.ReadLine();
                var body = Encoding.UTF8.GetBytes(sendMsg);
                channel.BasicPublish(exchange: "topic_exchange",
                                     routingKey: "test.info",
                                     basicProperties: null,
                                     body: body);
                channel.BasicPublish(exchange: "topic_exchange",
                                     routingKey: "test.debug",
                                     basicProperties: null,
                                     body: body);
                channel.BasicPublish(exchange: "topic_exchange",
                                     routingKey: "test.info.a1",
                                     basicProperties: null,
                                     body: body);
                channel.BasicPublish(exchange: "topic_exchange",
                                     routingKey: "test.info.a2",
                                     basicProperties: null,
                                     body: body);
                channel.BasicPublish(exchange: "topic_exchange",
                                     routingKey: "test.info.a3",
                                     basicProperties: null,
                                     body: body);
                if (sendMsg == "ok")
                {
                    Console.WriteLine("消息发送结束,按任意键退出.");
                }
                else
                {
                    Console.WriteLine($"Send ====>{sendMsg}");
                }
            }
  1. 因为通配符可以适配模糊匹配,所以 test.info.a1/test.info.a2/test.info.a3三个数据都被发送到了topic_queue3中,下图所示也符合预期

在这里插入图片描述
3. 消费者,这里我们消费通配符队列,其他队列指定队列名称即可消费.

string queueName = "topic_queue3";
            using var conn = factory.CreateConnection();
            using var channel = conn.CreateModel();
            //声明交换机
            channel.ExchangeDeclare(exchange: "topic_exchange",
                                    type: "topic");
            channel.QueueDeclare(queue: queueName,
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);
            //绑定队列到交换机
            channel.QueueBind(queue: queueName,
                              exchange: "topic_exchange",
                              routingKey: "test.info.*");
            var consumer = new EventingBasicConsumer(channel);

            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine($"Received {message}, routingkey:{ea.RoutingKey}");
                channel.BasicAck(deliveryTag: ea.DeliveryTag, 
                                 multiple: false);
            };
            channel.BasicConsume(queue: queueName,
                                 autoAck: false,
                                 consumer: consumer);

在这里插入图片描述

三、 用法优化

3.1 消费者消费能力

var consumer = new EventingBasicConsumer(channel);
//在创建了消费者对象之后,用prefetchCount来限定消费者可以同时消费处理的数量.
//如果设置为1,那么只能消费1个并且这个消费完成之前,rabbitmq不会下发新的消息
//根据情况可以适当增大来提升消费速度
channel.BasicQos(prefetchSize: 0,
                 prefetchCount: 1,
                 global: false);

3.2 手动ack

var consumer = new EventingBasicConsumer(channel);

consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine($"Received {message}, routingkey:{ea.RoutingKey}");        
    //加上下面这行,如果上面部分的业务处理完成,那么就自己手动签收
    //如果不加这个会导致重复消费问题
    channel.BasicAck(deliveryTag: ea.DeliveryTag, 
                     multiple: false);
};
//自己手动签收,保证正确消费
channel.BasicConsume(queue: queueName,
                     autoAck: false, #这里选false
                     consumer: consumer);

3.3 消息持久化

//创建一个这个对象
var basicProperties = channel.CreateBasicProperties();
//1为非持久化  2为持久化
basicProperties.DeliveryMode = 2;
//把第三个参数赋值即可达到持久化的效果
channel.BasicPublish(exchange: "",
                     routingKey: queueName,
                     basicProperties: basicProperties,
                     body: body);

四、 总结

1. 博主介绍了关于exchange交换机的3钟模式,当然还有其他的,但是博主也没用过.就不介绍了
2. 如有不对,欢迎指正.
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-09-06 11:13:37  更:2021-09-06 11:16:02 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 20:56:30-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码