IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> .NET中间件 -- 消息队列- RabbitMQ -> 正文阅读

[大数据].NET中间件 -- 消息队列- RabbitMQ

RabbitMQ是一款使用Erlang语言开发的,实现AMQP(高级消息队列协议)的开源消息中间件。

消息指的是两个应用间传递的数据。数据的类型有很多种形式,可能只包含文本字符串,也可能包含嵌入对象。

“消息队列(Message Queue)”是在消息的传输过程中保存消息的容器。在消息队列中,通常有生产者和消费者两个角色。生产者只负责发送数据到消息队列; 消费者只负责从消息队列中取出数据处理。

启用之前下载安装RabbitMQ

RabbitMQ防止数据丢失:

1.生产者发送消息时,使用Confirm机制,来确认消息到达消息队列

2.RabbitMQ端设置消息持久化

3. 消费者消费消息时,使用ACK事务机制,进行手动签收,正常消费则返回确认签收,如果出现异常,则返回拒绝签收重回队列。

生产者RabbitMQClient.cs

namespace RabbitMQClient
{
    class RabbitMQClient
    {
        public void SendMessage()
        {
            //实例化连接工厂
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",  //RabbitMQ服务在本地运行
                Port = 5672,                   //端口号
                UserName = "guest",    //默认用户名
                Password = "guest"     //默认密码
            };

            //建立连接
            using (var connection=factory.CreateConnection())
            {
                //创建信道
                using (var channel=connection.CreateModel())
                {
                    //声明队列
                    /* 创建一个名为 ProcessQueue 的消息队列,如果名称相同不会重复创建,参数解释:
                     * 参1:消息队列名称;
                     * 参2:是否持久化,持久化的队列会存盘,服务器重启后任然存在;
                     * 参3:是否为排他队列,排他队列表示仅对首次声明它的连接可见,并在连接断开时自动删除。这种队列适用于一个客户端同时发送和读取消息的应用场景。
                     * 参4:是否自动删除,自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
                     * 参5:设置队列的其他一些参数,如 x-rnessage-ttl、x-expires、x-rnax-length、x-rnax-length-bytes、x-dead-letter-exchange、x-deadletter-routing-key、x-rnax-priority 等。
                     */
                    channel.QueueDeclare("ProcessQueue", durable: true, false, false, null);

                    //设置消息优先级(Priority)
                    //channel.QueueDeclare("ProcessQueue", durable: true, false, false, new Dictionary<string, object> {
                    //    { "x-rnax-priority", 10 }
                    //});

                    var properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = 2; //持久化

                    //设置优先级,0-9,大的优先发送,先消费
                    //properties.Priority = 9;

                    //轮询调度(默认)
                    //此情形下,RabbitMQ把信息按顺序发送给每一个消费者。平均每个消费者将获得同等数量的信息。

                    /*公平分发
                     * 设置消息通道的基础 Qos 参数
                     * 只有当消费者回传消息标记后,才会将下一个消息发送给它,否则将消息分发给其它空闲的消费者
                     * prefetchCount:1 表示告诉 RabbitMQ, 在未接收到消费者确认消息之前,不在分发消息
                     */
                    channel.BasicQos(0, prefetchCount: 1, false);
                    //构建字节数据包
                    string message = State.Initialized.ToString();
                    var body = Encoding.UTF8.GetBytes(message);

                    //开启confirm模式
                    channel.ConfirmSelect();
                    //发送数据包
                    channel.BasicPublish("", "ProcessQueue", properties, body);
                    Console.WriteLine($"已发布消息:{message}");

                    message = State.Terminated.ToString();
                    body = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish("", "ProcessQueue", properties, body);
                    Console.WriteLine($"已发布消息:{message}");

                    message = State.Error.ToString();
                    body = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish("", "ProcessQueue", properties, body);
                    Console.WriteLine($"已发布消息:{message}");

                    if (channel.WaitForConfirms())
                    {
                        Console.WriteLine("所有消息均已发送到Broker");
                    }
                }
            }
        }

        //将生产者推送的信息发给每个订阅的消费者处理,使用Exchange实现
        public void PublishMessage()
        {
            var factory = new ConnectionFactory()
            {
                HostName ="localhost",
                Port = 5672,
            };

            using (var connection=factory.CreateConnection())
            {
                using (var channel=connection.CreateModel())
                {
                    //声明信息交换机,
                    //type: fanout(将信息分发到exchange上绑定的所有队列上);direct(消费者绑定的队列名称须和生产者发布指定的路由名称一致)
                    channel.ExchangeDeclare(exchange:"fanoutTest",type:ExchangeType.Fanout);

                    var message = "Kilter";
                    byte[] body = Encoding.UTF8.GetBytes(message);

                    //发布到指定exchange,fanout类型的会忽视routingKey的值,所以无需填写
                    channel.BasicPublish(exchange:"fanoutTest",routingKey:"",null,body);
                    Console.WriteLine($"发布消息:{message} 到exchange");

                }
            }
        }


    }

    public enum State
    {
        Terminated,             // 终止状态
        Initialized,            // 初始化状态
        Idle,                   // 空闲
        Busy,                   // 忙碌
        Pause,                  // 暂停状态

        Warning,                // 警告,人工消除后可继续
        Error,                  // 错误,人工消除后可继续
        FatalError,             // 致命错误,无法继续工作,仅可通过重新初始化
    }
}

消费者RabbitMQServer.cs

class RabbitMQServer
    {
        public  ConcurrentDictionary<string, MethodInfo> methodDic = new ConcurrentDictionary<string, MethodInfo>();
        public object Instance { get; set; }
        public RabbitMQServer()
        {
            SetExecutor();
        }

        public void SetExecutor()
        {
            Type type= Assembly.Load("RabbitMQServer").GetType("RabbitMQServer.Executor");
            Instance = Activator.CreateInstance(type);
            MethodInfo[] methods = type.GetMethods();
            foreach (var item in methods)
            {
                methodDic.TryAdd(item.Name,item);
            }
        }

        public void ReceiveMessage()
        {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                Port = 5672,
                UserName = "guest",
                Password = "guest"
            };

            //tips: 不能放入 using 语句中,否则当 using 语句结束后会 Close 连接,EventingBasicConsumer.Received 事件将不会被触发
            var connection = factory.CreateConnection();
            var channel = connection.CreateModel();
            channel.QueueDeclare("ProcessQueue", durable:true, false, false, null);
            //构造消费者实例(指定消息通道)
            var consumer = new EventingBasicConsumer(channel);
            /*消费者消费消息(在当前通道中监听 ProcessQueue 队列,并进行消费)
            autoAck参数属性
            true:自动信息确认,当消费者接收到信息后,自动发送ack信号,不管信息是否处理完毕
            false:关闭自动信息确认,通过调用BasicAck方法手动进行信息确认,表示开启消息响应的功能, 当一条消息发送给消费者后,该消息必须得到消费者的“确认”后,RabbitMQ 才会将该消息删除。
            */
            channel.BasicConsume("ProcessQueue", autoAck: false, consumer);
            //绑定消息接收后的事件委托
            consumer.Received += (sender, e) =>
            {
                //ReadOnlyMemory<byte>转化为byte[] :  .ToArray()
                //string转化为ReadOnlyMemory<byte>: .AsMemory()
                var body = e.Body.ToArray();    //消息字节数组
                var message = Encoding.UTF8.GetString(body);  //消息内容
                Console.WriteLine($"已消费消息:{message}");
                //根据消息,添加逻辑
                if (methodDic.ContainsKey(message))
                {
                    methodDic[message].Invoke(Instance, null);
                }

                //消息响应,在消息处理完成后回传该消息标记, 只有当响应此消息标记后,该消息才会在消息队列中删除
                //deliveryTag参数是分发的标记,multiple表示是否确认多条。
                channel.BasicAck(e.DeliveryTag, false);

                //用于拒绝消息,requeue表示消息被拒绝后是否重新放回queue中,true表示放回queue中,false表示直接丢弃。
                //channel.BasicReject(e.DeliveryTag, requeue: false);
            };

        }

        //多个消费者订阅一个生产者
        public void SubscribeMessage()
        {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                Port = 5672
            };

            var connection = factory.CreateConnection();
            var channel = connection.CreateModel();
            //声明信息交换机
            channel.ExchangeDeclare(exchange: "fanoutTest", type: ExchangeType.Fanout);
            //循环生产3个消费者,模拟此情境
            for (int i = 0; i < 3; i++)
            {
                //生成随机队列名称
                var queuename = channel.QueueDeclare().QueueName;
                //绑定队列到指定fanout类型exchange
                channel.QueueBind(queuename, "fanoutTest", "", null);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (sender, e) =>
                {
                    var body = e.Body.ToArray();    //消息字节数组
                    var message = Encoding.UTF8.GetString(body);  //消息内容
                    Console.WriteLine($"消费者{queuename}通过exchange已消费消息:{message}");
                };

                channel.BasicConsume(queuename, true, consumer);
            }
            
        }


    }

    public class Executor
    {
        public void Terminated()
        {
            Console.WriteLine("执行Terminated对应事件");
        }

        public void Initialized()
        {
            Console.WriteLine("执行Initialized对应事件");
        }

        public void Error()
        {
            Console.WriteLine("执行Error对应事件");
        }
    }

运行结果:

参考:

[1]?C#调用RabbitMQ实现消息队列 - kiba518 - 博客园 (cnblogs.com)

[2]?快速掌握RabbitMQ(三)——消息确认、持久化、优先级的C#实现 - 捞月亮的猴子 - 博客园 (cnblogs.com)

[3]?C# 消息队列之 RabbitMQ 进阶篇 - Abeam - 博客园 (cnblogs.com)

[4]?C#教程之RabbitMQ基础入门篇|C#教程 (xin3721.com)

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章           查看所有文章
加:2021-08-12 16:40:01  更:2021-08-12 16:42:39 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 21:05:26-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码