RabbitMQ Series (4): The Publish/Subscribe Pattern
Previous post: RabbitMQ Series (3): The Work Queue Pattern
I. Introduction
1.1 Exchange
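1. In RabbitMQ, a message first arrives at an exchange.
2. The exchange distributes the message to one or more queues according to the configured rules. Within the server, it is the component responsible for receiving messages from the producer.
3. Three exchange types are commonly used: Fanout, Direct, and Topic.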
1.2 Fanout
A fanout exchange ignores the routing key and delivers each message to every queue bound to the exchange.
1.3 Direct
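Exact-match mode. A queue is bound to the exchange with a routing key, and a message is delivered to the queues whose binding routing key exactly equals the message's routing key.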
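The difference between fanout and direct routing can be sketched with a small in-memory model. This is only an illustration under stated assumptions: `ExchangeModel`, `Bind`, and `Route` are hypothetical names that are not part of RabbitMQ.Client, and the real broker does far more than this.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model for illustration; not part of RabbitMQ.Client.
public sealed class ExchangeModel
{
    private readonly string _type; // "fanout" or "direct"
    private readonly List<(string Queue, string BindingKey)> _bindings =
        new List<(string Queue, string BindingKey)>();

    public ExchangeModel(string type)
    {
        _type = type;
    }

    // Record that a queue is bound to this exchange with the given key.
    public void Bind(string queue, string bindingKey)
    {
        _bindings.Add((queue, bindingKey));
    }

    // Returns the names of the queues that would receive a message
    // published with the given routing key.
    public List<string> Route(string routingKey)
    {
        IEnumerable<(string Queue, string BindingKey)> matches = _type == "fanout"
            ? _bindings                                          // fanout: routing key is ignored
            : _bindings.Where(b => b.BindingKey == routingKey);  // direct: exact match only
        return matches.Select(b => b.Queue).Distinct().ToList();
    }
}
```

With two queues bound as `d1` and `d2` on a "direct" model, `Route("d1")` returns only the first queue, while a "fanout" model returns every bound queue regardless of the key passed in.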
1.4 Topic
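Pattern-matching mode. Routing keys are made of dot-separated words; the * wildcard matches exactly one word and the # wildcard matches zero or more words.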
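To make the wildcard rules concrete, here is a minimal sketch of topic matching in plain C#. It does not use the RabbitMQ client at all: `TopicMatcher` and `Matches` are hypothetical names introduced for illustration, and the broker's own matching code is different and far more optimized.

```csharp
using System;

// Hypothetical helper for illustration only; not part of RabbitMQ.Client.
public static class TopicMatcher
{
    // Returns true if routingKey matches the binding pattern.
    // Words are separated by '.'; '*' matches exactly one word,
    // '#' matches zero or more words.
    public static bool Matches(string pattern, string routingKey)
    {
        return Match(pattern.Split('.'), 0, routingKey.Split('.'), 0);
    }

    private static bool Match(string[] p, int pi, string[] k, int ki)
    {
        // Pattern exhausted: succeed only if the key is exhausted too.
        if (pi == p.Length) return ki == k.Length;

        if (p[pi] == "#")
        {
            // '#' may consume zero, one, or many words; try each option.
            for (int next = ki; next <= k.Length; next++)
            {
                if (Match(p, pi + 1, k, next)) return true;
            }
            return false;
        }

        // Any other pattern word needs one key word to consume.
        if (ki == k.Length) return false;
        if (p[pi] == "*" || p[pi] == k[ki])
        {
            return Match(p, pi + 1, k, ki + 1);
        }
        return false;
    }
}
```

In this sketch, `TopicMatcher.Matches("test.info.*", "test.info.a1")` is true, while `TopicMatcher.Matches("test.info.*", "test.info")` is false because `*` must consume exactly one word. A pattern such as `test.#` would match both keys.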
II. Testing
2.1 Fanout
- Producer
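using System;
using System.Text;
using RabbitMQ.Client;

// Connection settings; replace the host and credentials with your own.
var factory = new ConnectionFactory
{
    HostName = "192.168.3.20",
    Port = 5672,
    UserName = "xxx",
    Password = "xxx",
    VirtualHost = "/"
};
using var conn = factory.CreateConnection();
using var channel = conn.CreateModel();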
// Declare the fanout exchange.
channel.ExchangeDeclare(exchange: "fanout_exchange",
                        type: "fanout");
// Declare three queues.
string queueName1 = "fanout_queue1";
channel.QueueDeclare(queue: queueName1,
                     durable: false,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);
string queueName2 = "fanout_queue2";
channel.QueueDeclare(queue: queueName2,
                     durable: false,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);
string queueName3 = "fanout_queue3";
channel.QueueDeclare(queue: queueName3,
                     durable: false,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);
// Bind every queue to the exchange; a fanout exchange ignores the routing key.
channel.QueueBind(queue: queueName1,
                  exchange: "fanout_exchange",
                  routingKey: "");
channel.QueueBind(queue: queueName2,
                  exchange: "fanout_exchange",
                  routingKey: "");
channel.QueueBind(queue: queueName3,
                  exchange: "fanout_exchange",
                  routingKey: "");
Console.WriteLine("Enter the message to send; type ok when finished.");
while (true)
{
    var sendMsg = Console.ReadLine();
    // Stop before publishing so the "ok" sentinel is not sent as a message.
    if (sendMsg == null || sendMsg == "ok")
    {
        Console.WriteLine("Finished sending; press any key to exit.");
        break;
    }
    var body = Encoding.UTF8.GetBytes(sendMsg);
    channel.BasicPublish(exchange: "fanout_exchange",
                         routingKey: "",
                         basicProperties: null,
                         body: body);
    Console.WriteLine($"Send ====>{sendMsg}");
}
Console.ReadKey();
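- Running the producer shows that the three queues have been created and contain 3 messages.
- The point-to-point and work queue patterns used the default exchange, whereas here we use a new exchange that we declared ourselves.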
- Consumer
// The factory is created exactly as in the producer; EventingBasicConsumer
// requires "using RabbitMQ.Client.Events;".
#region fanout
string queueName = "fanout_queue1";
using var conn = factory.CreateConnection();
using var channel = conn.CreateModel();
channel.ExchangeDeclare(exchange: "fanout_exchange",
                        type: "fanout");
channel.QueueDeclare(queue: queueName,
                     durable: false,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);
channel.QueueBind(queue: queueName,
                  exchange: "fanout_exchange",
                  routingKey: "");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine($"Received {message}");
};
channel.BasicConsume(queue: queueName,
                     autoAck: true,
                     consumer: consumer);
#endregion
// Keep the process alive so the consumer can keep receiving messages.
Console.ReadKey();
- The messages in queue 1 (fanout_queue1) have now all been consumed.
2.2 Direct
- Producer. Here I send the three odd-numbered messages (1, 3, 5) with routing key d1 and the three even-numbered messages (2, 4, 6) with routing key d2.
using var conn = factory.CreateConnection();
using var channel = conn.CreateModel();
channel.ExchangeDeclare(exchange: "direct_exchange",
                        type: "direct");
string queueName1 = "direct_queue1";
channel.QueueDeclare(queue: queueName1,
                     durable: false,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);
string queueName2 = "direct_queue2";
channel.QueueDeclare(queue: queueName2,
                     durable: false,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);
channel.QueueBind(queue: queueName1,
                  exchange: "direct_exchange",
                  routingKey: "d1");
channel.QueueBind(queue: queueName2,
                  exchange: "direct_exchange",
                  routingKey: "d2");
Console.WriteLine("Enter the message to send; type ok when finished.");
while (true)
{
    var sendMsg = Console.ReadLine();
    // Stop before publishing so the "ok" sentinel is not sent as a message.
    if (sendMsg == null || sendMsg == "ok")
    {
        Console.WriteLine("Finished sending; press any key to exit.");
        break;
    }
    Console.WriteLine("Enter the routing key");
    var routingKey = Console.ReadLine() ?? "";
    var body = Encoding.UTF8.GetBytes(sendMsg);
    channel.BasicPublish(exchange: "direct_exchange",
                         routingKey: routingKey,
                         basicProperties: null,
                         body: body);
    Console.WriteLine($"Send ====>{sendMsg}");
}
Console.ReadKey();
- Run result (screenshot)
- Exchange bindings (screenshot)
- Consumer. Here I again consume only direct_queue1.
string queueName = "direct_queue1";
using var conn = factory.CreateConnection();
using var channel = conn.CreateModel();
channel.ExchangeDeclare(exchange: "direct_exchange",
                        type: "direct");
channel.QueueDeclare(queue: queueName,
                     durable: false,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);
channel.QueueBind(queue: queueName,
                  exchange: "direct_exchange",
                  routingKey: "d1");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine($"Received {message}, routingkey:{ea.RoutingKey}");
};
channel.BasicConsume(queue: queueName,
                     autoAck: true,
                     consumer: consumer);
2.3 Topic
- Producer. Two queues are bound with literal routing keys and one with a wildcard routing key.
using var conn = factory.CreateConnection();
using var channel = conn.CreateModel();
channel.ExchangeDeclare(exchange: "topic_exchange",
                        type: "topic");
string queueName1 = "topic_queue1";
channel.QueueDeclare(queue: queueName1,
                     durable: false,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);
string queueName2 = "topic_queue2";
channel.QueueDeclare(queue: queueName2,
                     durable: false,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);
string queueName3 = "topic_queue3";
channel.QueueDeclare(queue: queueName3,
                     durable: false,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);
channel.QueueBind(queue: queueName1,
                  exchange: "topic_exchange",
                  routingKey: "test.info");
channel.QueueBind(queue: queueName2,
                  exchange: "topic_exchange",
                  routingKey: "test.debug");
channel.QueueBind(queue: queueName3,
                  exchange: "topic_exchange",
                  routingKey: "test.info.*");
Console.WriteLine("Enter the message to send; type ok when finished.");
// Each message is published once under each of these routing keys.
var routingKeys = new[] { "test.info", "test.debug", "test.info.a1", "test.info.a2", "test.info.a3" };
while (true)
{
    var sendMsg = Console.ReadLine();
    // Stop before publishing so the "ok" sentinel is not sent as a message.
    if (sendMsg == null || sendMsg == "ok")
    {
        Console.WriteLine("Finished sending; press any key to exit.");
        break;
    }
    var body = Encoding.UTF8.GetBytes(sendMsg);
    foreach (var routingKey in routingKeys)
    {
        channel.BasicPublish(exchange: "topic_exchange",
                             routingKey: routingKey,
                             basicProperties: null,
                             body: body);
    }
    Console.WriteLine($"Send ====>{sendMsg}");
}
Console.ReadKey();
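- Because the wildcard enables pattern matching, the three messages with routing keys test.info.a1, test.info.a2, and test.info.a3 were all routed to topic_queue3. The figure below shows this, which matches the expected result.
- Consumer. Here we consume the wildcard queue; to consume one of the other queues, just specify its queue name.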
string queueName = "topic_queue3";
using var conn = factory.CreateConnection();
using var channel = conn.CreateModel();
channel.ExchangeDeclare(exchange: "topic_exchange",
                        type: "topic");
channel.QueueDeclare(queue: queueName,
                     durable: false,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);
channel.QueueBind(queue: queueName,
                  exchange: "topic_exchange",
                  routingKey: "test.info.*");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine($"Received {message}, routingkey:{ea.RoutingKey}");
    channel.BasicAck(deliveryTag: ea.DeliveryTag,
                     multiple: false);
};
channel.BasicConsume(queue: queueName,
                     autoAck: false,
                     consumer: consumer);
III. Usage Optimizations
3.1 Consumer Capacity (Prefetch)
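var consumer = new EventingBasicConsumer(channel);
// Call BasicQos before BasicConsume. prefetchCount: 1 means the broker
// delivers at most one unacknowledged message at a time to this consumer,
// so it only has an effect together with manual ack (autoAck: false).
channel.BasicQos(prefetchSize: 0,     // 0 = no limit on message size
                 prefetchCount: 1,
                 global: false);      // limit applies per consumer, not per channel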
3.2 Manual Ack
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine($"Received {message}, routingkey:{ea.RoutingKey}");
    // Acknowledge only this delivery once it has been processed.
    channel.BasicAck(deliveryTag: ea.DeliveryTag,
                     multiple: false);
};
channel.BasicConsume(queue: queueName,
                     autoAck: false, // set to false for manual ack
                     consumer: consumer);
3.3 Message Persistence
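var basicProperties = channel.CreateBasicProperties();
// DeliveryMode 2 marks the message as persistent. For the message to survive
// a broker restart, the queue must also be declared with durable: true.
basicProperties.DeliveryMode = 2;
channel.BasicPublish(exchange: "",
                     routingKey: queueName,
                     basicProperties: basicProperties,
                     body: body);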
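IV. Summary
1. This post covered three exchange types. There are others, but I have not used them, so I won't cover them here.
2. Corrections are welcome if anything here is wrong.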