.net RabbitMQ的几种工作模式
查看官网https://www.rabbitmq.com/getstarted.html可以看到RabbitMQ的几种工作模式
“Hello World”
一个生产者一个队列一个消费者,生产者生产消息放入队列,消费者从队列中接收消息消费 最基本的模式
demo
代码演示见这篇文章
“Work queues”
一个生产者一个队列多个消费者,生产者生产消费放入队列,消费者们从队列中接收消息消费,消费者之间存在竞争关系,也就是说,一个消息,一旦被A消费者消费,就不能被B消费者消费了。 这样工作模式的好处是,假设一个生产者1s生产2000条消息,而一个消费者1s只能消费1000条消息,为了提高消费的速度,可以设置多个消费者来对同一个队列进行消费
demo
生产者
using System;
using RabbitMQ.Client;
using System.Text;
namespace RabbitMQSender
{
public class Send
{
public static void Main()
{
ConnectionFactory connectionFactory = new ConnectionFactory();
using (IConnection connection = connectionFactory.CreateConnection())
using (IModel channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "queue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
string message = "Hello World!";
for (int i = 1; i <= 10; i++)
{
channel.BasicPublish(exchange: "",
routingKey: "queue",
basicProperties: null,
body: Encoding.Default.GetBytes($"第{i}条消息:" + message));
Console.WriteLine(" [x] Sent {0}", message);
}
}
}
}
}
消费者1
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace RabbitMQReceiver
{
class Receive
{
public static void Main()
{
ConnectionFactory connectionFactory = new ConnectionFactory();
using (IConnection connection = connectionFactory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.QueueDeclare(
queue: "queue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, basicDeliverEventArgs) =>
{
ReadOnlyMemory<byte> body = basicDeliverEventArgs.Body;
string message = Encoding.Default.GetString(body.ToArray());
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: "queue",
autoAck: true,
consumer: consumer);
Console.ReadLine();
}
}
}
}
}
消费者2
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace RabbitMQReceiver
{
class Receive2
{
public static void Main()
{
ConnectionFactory connectionFactory = new ConnectionFactory();
using (IConnection connection = connectionFactory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.QueueDeclare(
queue: "queue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, basicDeliverEventArgs) =>
{
ReadOnlyMemory<byte> body = basicDeliverEventArgs.Body;
string message = Encoding.Default.GetString(body.ToArray());
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: "queue",
autoAck: true,
consumer: consumer);
Console.ReadLine();
}
}
}
}
}
先运行消费者1和消费者2 再运行生产者,发现消费者1接收到了第1、3、5、7、9条消息,消费者2接收到了第2、4、6、8、10条消息
“Publish/Subscribe”
一个生产者多个队列多个消费者,一个队列对应一个消费者,生产者生产的消息将放进每个队列 消费者只关心它的队列并将其中的消息消费,消费者之间不存在竞争关系。
如果不同的消费者收到同一个消息后需要执行不同的操作,例如消费者A接收到X消息后需要在控制台打印,而消费者B接收到X消息后需要将X消息存入数据库。像这种情形下,"Work queues"模式肯定是行不通的,因为"Work queues"中的消息只能被一个消费者处理,这意味着消费者A接收到X消息后就会把X消息消费掉了,那么消费者B就接收不到X消息了
demo
要实现这种模式,需要将交换机模式设置为扇形模式(ExchangeType.Fanout),这样它就会将消息分发到每一个与之绑定的队列中
生产者
using System;
using RabbitMQ.Client;
using System.Text;
namespace RabbitMQSender
{
public class Send
{
public static void Main()
{
ConnectionFactory connectionFactory = new ConnectionFactory();
using (IConnection connection = connectionFactory.CreateConnection())
using (IModel channel = connection.CreateModel())
{
string exchange = "exchange1";
channel.ExchangeDeclare(exchange, ExchangeType.Fanout, false);
channel.QueueDeclare(queue: "queue1",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueueDeclare(queue: "queue2",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueueBind("queue1", exchange, "");
channel.QueueBind("queue2", exchange, "");
string message = "Hello World!";
for (int i = 1; i <= 10; i++)
{
channel.BasicPublish(exchange: exchange,
routingKey: "",
basicProperties: null,
body: Encoding.Default.GetBytes($"第{i}条消息:" + message));
Console.WriteLine(" [x] Sent {0}", message);
}
}
}
}
}
消费者1
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace RabbitMQReceiver
{
class Receive
{
public static void Main()
{
ConnectionFactory connectionFactory = new ConnectionFactory();
using (IConnection connection = connectionFactory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.QueueDeclare(
queue: "queue1",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, basicDeliverEventArgs) =>
{
ReadOnlyMemory<byte> body = basicDeliverEventArgs.Body;
string message = Encoding.Default.GetString(body.ToArray());
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: "queue1",
autoAck: true,
consumer: consumer);
Console.ReadLine();
}
}
}
}
}
消费者2
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace RabbitMQReceiver
{
class Receive2
{
public static void Main()
{
ConnectionFactory connectionFactory = new ConnectionFactory();
using (IConnection connection = connectionFactory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.QueueDeclare(
queue: "queue2",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, basicDeliverEventArgs) =>
{
ReadOnlyMemory<byte> body = basicDeliverEventArgs.Body;
string message = Encoding.Default.GetString(body.ToArray());
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: "queue2",
autoAck: true,
consumer: consumer);
Console.ReadLine();
}
}
}
}
}
先运行消费者1和消费者2 再运行生产者,发现消费者1和消费者2都接收到了生产者生产的10条消息
“Routing”
一个生产者多个队列多个消费者,一个队列对应一个消费者,生产者生产的消息不是简单的放入每个队列,而是根据不同的条件,由交换机来决定将消息放入哪个消息队列中。 例如不太重要的消息只需要在控制台打印即可,不需要存数据库了, 而重要的消息既需要在控制台打印,还需要存数据库
demo
要实现这种模式,需要将交换机模式设置为直连模式(ExchangeType.Direct), 并配置路由规则,也就是指定哪个routingKey对应哪个消息队列 在发布消息时需要指定routingKey,这样交换机就能根据路由规则来进行消息的分发了
生产者
using System;
using RabbitMQ.Client;
using System.Text;
namespace RabbitMQSender
{
public class Send
{
public static void Main()
{
ConnectionFactory connectionFactory = new ConnectionFactory();
using (IConnection connection = connectionFactory.CreateConnection())
using (IModel channel = connection.CreateModel())
{
string exchange = "exchange1";
channel.ExchangeDeclare(exchange, ExchangeType.Direct, false);
channel.QueueDeclare(queue: "queue1",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueueDeclare(queue: "queue2",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueueBind("queue1", exchange, "key1");
channel.QueueBind("queue1", exchange, "key2");
channel.QueueBind("queue2", exchange, "key2");
string message = "Hello World!";
for (int i = 1; i <= 10; i++)
{
channel.BasicPublish(exchange: exchange,
routingKey: "key1",
basicProperties: null,
body: Encoding.Default.GetBytes($"第{i}条key为key1的消息:" + message));
Console.WriteLine(" [x] Sent {0} to queue1", message);
}
for (int i = 1; i <= 5; i++)
{
channel.BasicPublish(exchange: exchange,
routingKey: "key2",
basicProperties: null,
body: Encoding.Default.GetBytes($"第{i}条key为key2的消息:" + message));
Console.WriteLine(" [x] Sent {0} to queue2", message);
}
}
}
}
}
消费者1
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace RabbitMQReceiver
{
class Receive
{
public static void Main()
{
ConnectionFactory connectionFactory = new ConnectionFactory();
using (IConnection connection = connectionFactory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.QueueDeclare(
queue: "queue1",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, basicDeliverEventArgs) =>
{
ReadOnlyMemory<byte> body = basicDeliverEventArgs.Body;
string message = Encoding.Default.GetString(body.ToArray());
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: "queue1",
autoAck: true,
consumer: consumer);
Console.ReadLine();
}
}
}
}
}
消费者2
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace RabbitMQReceiver
{
class Receive2
{
public static void Main()
{
ConnectionFactory connectionFactory = new ConnectionFactory();
using (IConnection connection = connectionFactory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.QueueDeclare(
queue: "queue2",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, basicDeliverEventArgs) =>
{
ReadOnlyMemory<byte> body = basicDeliverEventArgs.Body;
string message = Encoding.Default.GetString(body.ToArray());
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: "queue2",
autoAck: true,
consumer: consumer);
Console.ReadLine();
}
}
}
}
}
先运行消费者1和消费者2 再运行生产者,发现 消费者1接收到了key为key1的10条消息以及key为key2的5条消息 消费者2只接收key为key2的5条消息
“Topics”
Topic模式和Routing模式的工作模式类似,也是一个生产者多个队列多个消费者,一个队列对应一个消费者,根据不同的条件,由交换机来决定将消息放入哪个消息队列中
区别在于Routing模式是精确匹配,也就是说一个routingKey为“key1”的消息,只能匹配到绑定了"key1"的队列 而Topic模式下可以进行模糊匹配 一个绑定了"Word.*“的队列,既能匹配routingKey为"Word.A"的消息,还能匹配routingKey为"Word.B”、"Word.C"的消息
*是一种通配符,可以匹配一个单词 #是另一种通配符,可以匹配零个或多个单词
这是RabbitMQ官网的解释 为了说明这些匹配规则,官网还给出了一些例子,那我就不再举例了 We created three bindings: Q1 is bound with binding key “.orange.” and Q2 with “..rabbit” and “lazy.#”.
These bindings can be summarised as:
Q1 is interested in all the orange animals. Q2 wants to hear everything about rabbits, and everything about lazy animals. A message with a routing key set to “quick.orange.rabbit” will be delivered to both queues. Message “lazy.orange.elephant” also will go to both of them. On the other hand “quick.orange.fox” will only go to the first queue, and “lazy.brown.fox” only to the second. “lazy.pink.rabbit” will be delivered to the second queue only once, even though it matches two bindings. “quick.brown.fox” doesn’t match any binding so it will be discarded.
What happens if we break our contract and send a message with one or four words, like “orange” or “quick.orange.male.rabbit”? Well, these messages won’t match any bindings and will be lost.
On the other hand “lazy.orange.male.rabbit”, even though it has four words, will match the last binding and will be delivered to the second queue.
demo
要实现这种模式,需要将交换机模式设置为Topic模式(ExchangeType.Topic),这样才能使用通配符*或#进行模糊匹配
生产者:生产带有不同routingKey的消息
using System;
using RabbitMQ.Client;
using System.Text;
namespace RabbitMQSender
{
public class Send
{
public static void Main()
{
ConnectionFactory connectionFactory = new ConnectionFactory();
using (IConnection connection = connectionFactory.CreateConnection())
using (IModel channel = connection.CreateModel())
{
string exchange = "exchange1";
channel.ExchangeDeclare(exchange, ExchangeType.Topic, false);
channel.QueueDeclare(queue: "queue1",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueueDeclare(queue: "queue2",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueueBind("queue1", exchange, "key1.*");
channel.QueueBind("queue1", exchange, "key2");
channel.QueueBind("queue2", exchange, "key2.#");
channel.QueueBind("queue2", exchange, "*.key2.#");
string message = "Hello World!";
channel.BasicPublish(exchange: exchange,
routingKey: "key1.A",
basicProperties: null,
body: Encoding.Default.GetBytes($"key为key1.A的消息:" + message));
channel.BasicPublish(exchange: exchange,
routingKey: "key1.B",
basicProperties: null,
body: Encoding.Default.GetBytes($"key为key1.B的消息:" + message));
channel.BasicPublish(exchange: exchange,
routingKey: "key2",
basicProperties: null,
body: Encoding.Default.GetBytes($"key为key2的消息:" + message));
channel.BasicPublish(exchange: exchange,
routingKey: "key2.A.B.C",
basicProperties: null,
body: Encoding.Default.GetBytes($"key为key2.A.B.C的消息:" + message));
channel.BasicPublish(exchange: exchange,
routingKey: "key1.key2.A",
basicProperties: null,
body: Encoding.Default.GetBytes($"key为key1.key2.A的消息:" + message));
}
}
}
}
消费者1
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace RabbitMQReceiver
{
class Receive
{
public static void Main()
{
ConnectionFactory connectionFactory = new ConnectionFactory();
using (IConnection connection = connectionFactory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.QueueDeclare(
queue: "queue1",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, basicDeliverEventArgs) =>
{
ReadOnlyMemory<byte> body = basicDeliverEventArgs.Body;
string message = Encoding.Default.GetString(body.ToArray());
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: "queue1",
autoAck: true,
consumer: consumer);
Console.ReadLine();
}
}
}
}
}
消费者2
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace RabbitMQReceiver
{
class Receive2
{
public static void Main()
{
ConnectionFactory connectionFactory = new ConnectionFactory();
using (IConnection connection = connectionFactory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.QueueDeclare(
queue: "queue2",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, basicDeliverEventArgs) =>
{
ReadOnlyMemory<byte> body = basicDeliverEventArgs.Body;
string message = Encoding.Default.GetString(body.ToArray());
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: "queue2",
autoAck: true,
consumer: consumer);
Console.ReadLine();
}
}
}
}
}
先运行消费者1和消费者2 再运行生产者,发现不同routingKey的消息被分发到了与之匹配的队列中
|