RabbitMQ是一款使用Erlang语言开发的,实现AMQP(高级消息队列协议)的开源消息中间件。
消息指的是两个应用间传递的数据。数据的类型有很多种形式,可能只包含文本字符串,也可能包含嵌入对象。
“消息队列(Message Queue)”是在消息的传输过程中保存消息的容器。在消息队列中,通常有生产者和消费者两个角色。生产者只负责发送数据到消息队列; 消费者只负责从消息队列中取出数据处理。
启用之前下载安装RabbitMQ
RabbitMQ防止数据丢失:
1.生产者发送消息时,使用Confirm机制,来确认消息到达消息队列
2.RabbitMQ端设置消息持久化
3. 消费者消费消息时,使用ACK事务机制,进行手动签收,正常消费则返回确认签收,如果出现异常,则返回拒绝签收重回队列。
生产者RabbitMQClient.cs
namespace RabbitMQClient
{
class RabbitMQClient
{
public void SendMessage()
{
//实例化连接工厂
var factory = new ConnectionFactory()
{
HostName = "localhost", //RabbitMQ服务在本地运行
Port = 5672, //端口号
UserName = "guest", //默认用户名
Password = "guest" //默认密码
};
//建立连接
using (var connection=factory.CreateConnection())
{
//创建信道
using (var channel=connection.CreateModel())
{
//声明队列
/* 创建一个名为 ProcessQueue 的消息队列,如果名称相同不会重复创建,参数解释:
* 参1:消息队列名称;
* 参2:是否持久化,持久化的队列会存盘,服务器重启后任然存在;
* 参3:是否为排他队列,排他队列表示仅对首次声明它的连接可见,并在连接断开时自动删除。这种队列适用于一个客户端同时发送和读取消息的应用场景。
* 参4:是否自动删除,自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
* 参5:设置队列的其他一些参数,如 x-rnessage-ttl、x-expires、x-rnax-length、x-rnax-length-bytes、x-dead-letter-exchange、x-deadletter-routing-key、x-rnax-priority 等。
*/
channel.QueueDeclare("ProcessQueue", durable: true, false, false, null);
//设置消息优先级(Priority)
//channel.QueueDeclare("ProcessQueue", durable: true, false, false, new Dictionary<string, object> {
// { "x-rnax-priority", 10 }
//});
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2; //持久化
//设置优先级,0-9,大的优先发送,先消费
//properties.Priority = 9;
//轮询调度(默认)
//此情形下,RabbitMQ把信息按顺序发送给每一个消费者。平均每个消费者将获得同等数量的信息。
/*公平分发
* 设置消息通道的基础 Qos 参数
* 只有当消费者回传消息标记后,才会将下一个消息发送给它,否则将消息分发给其它空闲的消费者
* prefetchCount:1 表示告诉 RabbitMQ, 在未接收到消费者确认消息之前,不在分发消息
*/
channel.BasicQos(0, prefetchCount: 1, false);
//构建字节数据包
string message = State.Initialized.ToString();
var body = Encoding.UTF8.GetBytes(message);
//开启confirm模式
channel.ConfirmSelect();
//发送数据包
channel.BasicPublish("", "ProcessQueue", properties, body);
Console.WriteLine($"已发布消息:{message}");
message = State.Terminated.ToString();
body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", "ProcessQueue", properties, body);
Console.WriteLine($"已发布消息:{message}");
message = State.Error.ToString();
body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", "ProcessQueue", properties, body);
Console.WriteLine($"已发布消息:{message}");
if (channel.WaitForConfirms())
{
Console.WriteLine("所有消息均已发送到Broker");
}
}
}
}
//将生产者推送的信息发给每个订阅的消费者处理,使用Exchange实现
public void PublishMessage()
{
var factory = new ConnectionFactory()
{
HostName ="localhost",
Port = 5672,
};
using (var connection=factory.CreateConnection())
{
using (var channel=connection.CreateModel())
{
//声明信息交换机,
//type: fanout(将信息分发到exchange上绑定的所有队列上);direct(消费者绑定的队列名称须和生产者发布指定的路由名称一致)
channel.ExchangeDeclare(exchange:"fanoutTest",type:ExchangeType.Fanout);
var message = "Kilter";
byte[] body = Encoding.UTF8.GetBytes(message);
//发布到指定exchange,fanout类型的会忽视routingKey的值,所以无需填写
channel.BasicPublish(exchange:"fanoutTest",routingKey:"",null,body);
Console.WriteLine($"发布消息:{message} 到exchange");
}
}
}
}
public enum State
{
Terminated, // 终止状态
Initialized, // 初始化状态
Idle, // 空闲
Busy, // 忙碌
Pause, // 暂停状态
Warning, // 警告,人工消除后可继续
Error, // 错误,人工消除后可继续
FatalError, // 致命错误,无法继续工作,仅可通过重新初始化
}
}
消费者RabbitMQServer.cs
class RabbitMQServer
{
public ConcurrentDictionary<string, MethodInfo> methodDic = new ConcurrentDictionary<string, MethodInfo>();
public object Instance { get; set; }
public RabbitMQServer()
{
SetExecutor();
}
public void SetExecutor()
{
Type type= Assembly.Load("RabbitMQServer").GetType("RabbitMQServer.Executor");
Instance = Activator.CreateInstance(type);
MethodInfo[] methods = type.GetMethods();
foreach (var item in methods)
{
methodDic.TryAdd(item.Name,item);
}
}
public void ReceiveMessage()
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
UserName = "guest",
Password = "guest"
};
//tips: 不能放入 using 语句中,否则当 using 语句结束后会 Close 连接,EventingBasicConsumer.Received 事件将不会被触发
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
channel.QueueDeclare("ProcessQueue", durable:true, false, false, null);
//构造消费者实例(指定消息通道)
var consumer = new EventingBasicConsumer(channel);
/*消费者消费消息(在当前通道中监听 ProcessQueue 队列,并进行消费)
autoAck参数属性
true:自动信息确认,当消费者接收到信息后,自动发送ack信号,不管信息是否处理完毕
false:关闭自动信息确认,通过调用BasicAck方法手动进行信息确认,表示开启消息响应的功能, 当一条消息发送给消费者后,该消息必须得到消费者的“确认”后,RabbitMQ 才会将该消息删除。
*/
channel.BasicConsume("ProcessQueue", autoAck: false, consumer);
//绑定消息接收后的事件委托
consumer.Received += (sender, e) =>
{
//ReadOnlyMemory<byte>转化为byte[] : .ToArray()
//string转化为ReadOnlyMemory<byte>: .AsMemory()
var body = e.Body.ToArray(); //消息字节数组
var message = Encoding.UTF8.GetString(body); //消息内容
Console.WriteLine($"已消费消息:{message}");
//根据消息,添加逻辑
if (methodDic.ContainsKey(message))
{
methodDic[message].Invoke(Instance, null);
}
//消息响应,在消息处理完成后回传该消息标记, 只有当响应此消息标记后,该消息才会在消息队列中删除
//deliveryTag参数是分发的标记,multiple表示是否确认多条。
channel.BasicAck(e.DeliveryTag, false);
//用于拒绝消息,requeue表示消息被拒绝后是否重新放回queue中,true表示放回queue中,false表示直接丢弃。
//channel.BasicReject(e.DeliveryTag, requeue: false);
};
}
//多个消费者订阅一个生产者
public void SubscribeMessage()
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
//声明信息交换机
channel.ExchangeDeclare(exchange: "fanoutTest", type: ExchangeType.Fanout);
//循环生产3个消费者,模拟此情境
for (int i = 0; i < 3; i++)
{
//生成随机队列名称
var queuename = channel.QueueDeclare().QueueName;
//绑定队列到指定fanout类型exchange
channel.QueueBind(queuename, "fanoutTest", "", null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
var body = e.Body.ToArray(); //消息字节数组
var message = Encoding.UTF8.GetString(body); //消息内容
Console.WriteLine($"消费者{queuename}通过exchange已消费消息:{message}");
};
channel.BasicConsume(queuename, true, consumer);
}
}
}
public class Executor
{
public void Terminated()
{
Console.WriteLine("执行Terminated对应事件");
}
public void Initialized()
{
Console.WriteLine("执行Initialized对应事件");
}
public void Error()
{
Console.WriteLine("执行Error对应事件");
}
}
运行结果:
参考:
[1]?C#调用RabbitMQ实现消息队列 - kiba518 - 博客园 (cnblogs.com)
[2]?快速掌握RabbitMQ(三)——消息确认、持久化、优先级的C#实现 - 捞月亮的猴子 - 博客园 (cnblogs.com)
[3]?C# 消息队列之 RabbitMQ 进阶篇 - Abeam - 博客园 (cnblogs.com)
[4]?C#教程之RabbitMQ基础入门篇|C#教程 (xin3721.com)
|