Fanout类型交换机跟Direct类型交换机相比,没有了key的概念,会把所有发送到该交换机的消息转发到所有与它绑定的队列中。
应用场景:发布订阅(观察者模式)
应用案例:总公司下面有2个分公司,总公司要向2个分公司同时发送通知信息。总公司在这里的角色就是生产者(发布者),两个分公司就是消费者(订阅者)。
项目结构:
?
代码实例:
生产者
using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading;
namespace AspNetCore.RabbitMQ.MessageProducer.MessageProducer
{
/// <summary>
/// Fanout类型交换机
/// </summary>
public class FanoutExchange
{
public static void Show()
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";//rabbitmq服务在本地运行
factory.UserName = "guest";//用户名
factory.Password = "guest";//密码
//创建链接
using (IConnection connection = factory.CreateConnection())
{
//创建信道
using (IModel channel = connection.CreateModel())
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("生产者已准备就绪......");
//声明队列1
channel.QueueDeclare(queue: "FanoutQueue01", durable: true, exclusive: false, autoDelete: false, arguments: null);
//声明队列2
channel.QueueDeclare(queue: "FanoutQueue02", durable: true, exclusive: false, autoDelete: false, arguments: null);
//声明交换机exchange
channel.ExchangeDeclare(exchange: "FanoutExchange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);
//绑定exchange和queue1
channel.QueueBind(queue: "FanoutQueue01", exchange: "FanoutExchange", routingKey: string.Empty, arguments: null);
//绑定exchange和queue2
channel.QueueBind(queue: "FanoutQueue02", exchange: "FanoutExchange", routingKey: string.Empty, arguments: null);
int i = 1;
//待发送的消息
{
while (true)
{
var message = $"通知{i}";
var body = Encoding.UTF8.GetBytes(message);
//基本发布
channel.BasicPublish(exchange: "FanoutExchange", routingKey: string.Empty, basicProperties: null, body: body);
Console.WriteLine($"通知{message}已发送到队列");
Thread.Sleep(2000);
i++;
}
}
}
}
}
}
}
main方法:
class Program
{
static void Main(string[] args)
{
FanoutExchange.Show();
Console.ReadLine();
}
}
消费者一:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace AspNetCore.RabbitMQ.MessageConsumer_01.MessageConsumer
{
/// <summary>
/// 消费者 Fanout类型交换机
/// </summary>
public class FanoutExchange
{
public static void Show()
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";//rabbitmq服务在本地运行
factory.UserName = "guest";//用户名
factory.Password = "guest";//密码
//创建链接
using (IConnection connection = factory.CreateConnection())
{
//创建信道
using (IModel channel = connection.CreateModel())
{
Console.ForegroundColor = ConsoleColor.Green;
try
{
//基于当前信道创建事件
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine($"接收成功{message} 邮件通知...");
};
Console.WriteLine("通知服务1准备就绪......");
//处理消息
channel.BasicConsume(queue: "FanoutQueue01", autoAck: true, consumer: consumer);
Console.ReadLine();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}
}
}
}
main方法:
class Program
{
static void Main(string[] args)
{
FanoutExchange.Show();
Console.ReadLine();
}
}
消费者二:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace AspNetCore.RabbitMQ.MessageConsumer_02.MessageConsumer
{
/// <summary>
/// 消费者 Fanout类型交换机
/// </summary>
public class FanoutExchange
{
public static void Show()
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";//rabbitmq服务在本地运行
factory.UserName = "guest";//用户名
factory.Password = "guest";//密码
//创建链接
using (IConnection connection = factory.CreateConnection())
{
//创建信道
using (IModel channel = connection.CreateModel())
{
Console.ForegroundColor = ConsoleColor.Green;
try
{
//基于当前信道创建事件
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine($"接收成功{message} 邮件通知...");
};
Console.WriteLine("通知服务2准备就绪......");
//处理消息
channel.BasicConsume(queue: "FanoutQueue02", autoAck: true, consumer: consumer);
Console.ReadLine();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}
}
}
}
main方法:
class Program
{
static void Main(string[] args)
{
FanoutExchange.Show();
Console.ReadLine();
}
}
运行结果:
?
总结:需要发布订阅模式的时候,就可以选择使用Fanout类型的交换机。
|