IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 分布式异步队列学习总结6(交换机FanoutExchange) -> 正文阅读

[大数据]分布式异步队列学习总结6(交换机FanoutExchange)

Fanout类型交换机跟Direct类型交换机相比,没有了key的概念,会把所有发送到该交换机的消息转发到所有与它绑定的队列中。

应用场景:发布订阅(观察者模式)

应用案例:总公司下面有2个分公司,总公司要向2个分公司同时发送通知信息。总公司在这里的角色就是生产者(发布者),两个分公司就是消费者(订阅者)。

项目结构:

?

代码实例:

生产者

using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading;

namespace AspNetCore.RabbitMQ.MessageProducer.MessageProducer
{
    /// <summary>
    /// Fanout类型交换机
    /// </summary>
    public class FanoutExchange
    {
        public static void Show()
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";//rabbitmq服务在本地运行
            factory.UserName = "guest";//用户名
            factory.Password = "guest";//密码

            //创建链接
            using (IConnection connection = factory.CreateConnection())
            {
                //创建信道
                using (IModel channel = connection.CreateModel())
                {
                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.WriteLine("生产者已准备就绪......");

                    //声明队列1
                    channel.QueueDeclare(queue: "FanoutQueue01", durable: true, exclusive: false, autoDelete: false, arguments: null);

                    //声明队列2
                    channel.QueueDeclare(queue: "FanoutQueue02", durable: true, exclusive: false, autoDelete: false, arguments: null);

                    //声明交换机exchange
                    channel.ExchangeDeclare(exchange: "FanoutExchange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);

                    //绑定exchange和queue1
                    channel.QueueBind(queue: "FanoutQueue01", exchange: "FanoutExchange", routingKey: string.Empty, arguments: null);

                    //绑定exchange和queue2
                    channel.QueueBind(queue: "FanoutQueue02", exchange: "FanoutExchange", routingKey: string.Empty, arguments: null);

                    int i = 1;
                    //待发送的消息
                    {
                        while (true)
                        {
                            var message = $"通知{i}";
                            var body = Encoding.UTF8.GetBytes(message);

                            //基本发布
                            channel.BasicPublish(exchange: "FanoutExchange", routingKey: string.Empty, basicProperties: null, body: body);
                            Console.WriteLine($"通知{message}已发送到队列");
                            Thread.Sleep(2000);
                            i++;
                        }
                    }
                }
            }
        }
    }
}

main方法:

class Program
    {
        static void Main(string[] args)
        {
            FanoutExchange.Show();

            Console.ReadLine();
        }
    }

消费者一:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace AspNetCore.RabbitMQ.MessageConsumer_01.MessageConsumer
{
    /// <summary>
    /// 消费者 Fanout类型交换机
    /// </summary>
    public class FanoutExchange
    {
        public static void Show()
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";//rabbitmq服务在本地运行
            factory.UserName = "guest";//用户名
            factory.Password = "guest";//密码

            //创建链接
            using (IConnection connection = factory.CreateConnection())
            {
                //创建信道
                using (IModel channel = connection.CreateModel())
                {
                    Console.ForegroundColor = ConsoleColor.Green;
                    try
                    {
                        //基于当前信道创建事件
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body.ToArray());
                            Console.WriteLine($"接收成功{message} 邮件通知...");
                        };

                        Console.WriteLine("通知服务1准备就绪......");

                        //处理消息
                        channel.BasicConsume(queue: "FanoutQueue01", autoAck: true, consumer: consumer);

                        Console.ReadLine();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                    }

                }
            }

        }
    }
}

main方法:

 class Program
    {
        static void Main(string[] args)
        {
            FanoutExchange.Show();
            Console.ReadLine();
        }
    }

消费者二:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace AspNetCore.RabbitMQ.MessageConsumer_02.MessageConsumer
{
    /// <summary>
    /// 消费者 Fanout类型交换机
    /// </summary>
    public class FanoutExchange
    {
        public static void Show()
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";//rabbitmq服务在本地运行
            factory.UserName = "guest";//用户名
            factory.Password = "guest";//密码

            //创建链接
            using (IConnection connection = factory.CreateConnection())
            {
                //创建信道
                using (IModel channel = connection.CreateModel())
                {
                    Console.ForegroundColor = ConsoleColor.Green;
                    try
                    {
                        //基于当前信道创建事件
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body.ToArray());
                            Console.WriteLine($"接收成功{message} 邮件通知...");
                        };

                        Console.WriteLine("通知服务2准备就绪......");

                        //处理消息
                        channel.BasicConsume(queue: "FanoutQueue02", autoAck: true, consumer: consumer);

                        Console.ReadLine();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                    }

                }
            }

        }
    }
}

main方法:

class Program
    {
        static void Main(string[] args)
        {
            FanoutExchange.Show();

            Console.ReadLine();
        }
    }

运行结果:

?

总结:需要发布订阅模式的时候,就可以选择使用Fanout类型的交换机。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-30 12:07:06  更:2021-08-30 12:08:02 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 15:58:45-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码