IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 分布式异步队列学习总结4(生产者和消费者) -> 正文阅读

[大数据]分布式异步队列学习总结4(生产者和消费者)

生产者向RabbitMQ服务发送消息,消费者从RabbitMQ服务中获取消息。

单生产者单消费者模式

使用程序演示单生产者和单消费者模型:

项目结构为两个.net Core 控制台程序应用程序

程序需要添加引用 RabbitMQ.Client

AspNetCore.RabbitMQ.MessageConsumer_01.MessageConsumer(消费者)

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace AspNetCore.RabbitMQ.MessageConsumer_01.MessageConsumer
{
    public class ProductionConsumer
    {
        public static void Show()
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";//rabbitmq服务在本地运行
            factory.UserName = "guest";//用户名
            factory.Password = "guest";//密码

            //创建链接
            using (IConnection connection = factory.CreateConnection())
            {
                //创建信道
                using (IModel channel = connection.CreateModel())
                {
                    Console.ForegroundColor = ConsoleColor.Green;
                    try
                    {
                        //声明队列
                        channel.QueueDeclare(queue: "OnlyProducerMessage", durable: true, exclusive: false, autoDelete: false, arguments: null);

                        //声明路由
                        channel.ExchangeDeclare(exchange: "OnlyProducerMessageExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);

                        //绑定队列和路由
                        channel.QueueBind(queue: "OnlyProducerMessage", exchange: "OnlyProducerMessageExChange", routingKey: string.Empty, arguments: null);


                        //基于当前信道创建事件
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body.ToArray());
                            Console.WriteLine($"消费者01 接收消息:{message}");
                        };

                        channel.BasicConsume(queue: "OnlyProducerMessage", autoAck: true, consumer: consumer);
                        Console.WriteLine("按下回车退出啊");
                        Console.ReadLine();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                    }

                }
            }

        }
    }
}

?Main程序中调用下Show方法?

class Program
    {
        static void Main(string[] args)
        {
            ProductionConsumer.Show();
        }
    }

namespace AspNetCore.RabbitMQ.MessageProducer.MessageProducer(生产者)

using RabbitMQ.Client;
using System;
using System.Text;

namespace AspNetCore.RabbitMQ.MessageProducer.MessageProducer
{
    /// <summary>
    /// 生产者
    /// </summary>
    public class ProductionConsumer
    {
        public static void Show()
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";//rabbitmq服务在本地运行
            factory.UserName = "guest";//用户名
            factory.Password = "guest";//密码

            //创建链接
            using (IConnection connection = factory.CreateConnection())
            {
                //创建信道
                using (IModel channel = connection.CreateModel())
                {
                 
                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.WriteLine("生产者已准备就绪......");

                    int i = 1;
                    {
                        while (true)
                        {
                            IBasicProperties basicProperties = channel.CreateBasicProperties();
                            basicProperties.Persistent = true;

                            string message = $"消息{i}";
                            byte[] body = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish(exchange: "OnlyProducerMessageExChange", routingKey: string.Empty, basicProperties: basicProperties, body: body);
                            Console.WriteLine($"消息{message}已发送");
                            i++;
                        }
                    }
                }
            }

        }
    }
}

Main程序中调用下Show方法?

class Program
    {
        static void Main(string[] args)
        {
            ProductionConsumer.Show();
        }
    }

启动两个程序:

启动消费者,在项目目录中进入命令窗口执行dotnet 启动命令

以同样的方式,启动生产者

消费者和生产者都启动以后,可以看到两个命令窗口打印如下信息:

多生产者多消费者模式

模拟多个生产者和多个消费者的情况

AspNetCore.RabbitMQ.MessageConsumer_01.MessageConsumer(消费者)

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace AspNetCore.RabbitMQ.MessageConsumer_01.MessageConsumer
{
    public class MultiProductionConsumer
    {
        public static void Show1()
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";//rabbitmq服务在本地运行
            factory.UserName = "guest";//用户名
            factory.Password = "guest";//密码

            //创建链接
            using (var connection = factory.CreateConnection())
            {
                //创建信道
                using (var channel = connection.CreateModel())
                {
                    Console.ForegroundColor = ConsoleColor.Green;
                    try
                    {
                        //声明队列
                        //channel.QueueDeclare(queue: "OnlyProducerMessage", durable: true, exclusive: false, autoDelete: false, arguments: null);

                        //声明路由
                        //channel.ExchangeDeclare(exchange: "OnlyProducerMessageExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);

                        //绑定队列和路由
                        //channel.QueueBind(queue: "OnlyProducerMessage", exchange: "OnlyProducerMessageExChange", routingKey: string.Empty, arguments: null);


                        //基于当前信道创建事件
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body.ToArray());
                            Console.WriteLine($"消费者01 接收消息:{message}");
                        };

                        channel.BasicConsume(queue: "MultiProducerMessage", autoAck: true, consumer: consumer);
                        Console.WriteLine("按下回车退出啊");
                        Console.ReadLine();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                    }

                }
            }

        }


        public static void Show2()
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";//rabbitmq服务在本地运行
            factory.UserName = "guest";//用户名
            factory.Password = "guest";//密码

            //创建链接
            using (var connection = factory.CreateConnection())
            {
                //创建信道
                using (var channel = connection.CreateModel())
                {
                    Console.ForegroundColor = ConsoleColor.Green;
                    try
                    {
                        //声明队列
                        //channel.QueueDeclare(queue: "OnlyProducerMessage", durable: true, exclusive: false, autoDelete: false, arguments: null);

                        //声明路由
                        //channel.ExchangeDeclare(exchange: "OnlyProducerMessageExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);

                        //绑定队列和路由
                        //channel.QueueBind(queue: "OnlyProducerMessage", exchange: "OnlyProducerMessageExChange", routingKey: string.Empty, arguments: null);


                        //基于当前信道创建事件
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body.ToArray());
                            Console.WriteLine($"消费者02 接收消息:{message}");
                        };

                        channel.BasicConsume(queue: "MultiProducerMessage", autoAck: true, consumer: consumer);
                        Console.WriteLine("按下回车退出啊");
                        Console.ReadLine();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                    }

                }
            }

        }



        public static void Show3()
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";//rabbitmq服务在本地运行
            factory.UserName = "guest";//用户名
            factory.Password = "guest";//密码

            //创建链接
            using (var connection = factory.CreateConnection())
            {
                //创建信道
                using (var channel = connection.CreateModel())
                {
                    Console.ForegroundColor = ConsoleColor.Green;
                    try
                    {
                        //声明队列
                        //channel.QueueDeclare(queue: "OnlyProducerMessage", durable: true, exclusive: false, autoDelete: false, arguments: null);

                        //声明路由
                        //channel.ExchangeDeclare(exchange: "OnlyProducerMessageExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);

                        //绑定队列和路由
                        //channel.QueueBind(queue: "OnlyProducerMessage", exchange: "OnlyProducerMessageExChange", routingKey: string.Empty, arguments: null);


                        //基于当前信道创建事件
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body.ToArray());
                            Console.WriteLine($"消费者03 接收消息:{message}");
                        };

                        channel.BasicConsume(queue: "MultiProducerMessage", autoAck: true, consumer: consumer);
                        Console.WriteLine("按下回车退出啊");
                        Console.ReadLine();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                    }

                }
            }

        }
    }
}

?Main程序中调用下Show方法?

 class Program
    {
        static void Main(string[] args)
        {
            //ProductionConsumer.Show();

            //多消费者
            {
                Task.Run(() => { MultiProductionConsumer.Show1(); });
                Task.Run(() => { MultiProductionConsumer.Show2(); });
                Task.Run(() => { MultiProductionConsumer.Show3(); });
            }
            Console.ReadLine();
        }
    }

AspNetCore.RabbitMQ.MessageProducer.MessageProducer(生产者)

using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading;

namespace AspNetCore.RabbitMQ.MessageProducer.MessageProducer
{
    /// <summary>
    /// 生产者
    /// </summary>
    public class MultiProductionConsumer
    {
        public static void Show(string No)
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";//rabbitmq服务在本地运行
            factory.UserName = "guest";//用户名
            factory.Password = "guest";//密码

            //创建链接
            using (IConnection connection = factory.CreateConnection())
            {
                //创建信道
                using (IModel channel = connection.CreateModel())
                {
                    //声明队列
                    channel.QueueDeclare(queue: "MultiProducerMessage", durable: true, exclusive: false, autoDelete: false, arguments: null);

                    //声明路由
                    channel.ExchangeDeclare(exchange: "MultiProducerMessageExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);

                    //绑定队列和路由
                    channel.QueueBind(queue: "MultiProducerMessage", exchange: "MultiProducerMessageExChange", routingKey: string.Empty, arguments: null);


                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.WriteLine($"生产者{No}已准备就绪......");

                    int i = 1;
                    {
                        while (true)
                        {
                            string message = $"生产者{No},消息{i}";
                            byte[] body = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish(exchange: "MultiProducerMessageExChange", routingKey: string.Empty, basicProperties: null, body: body);
                            Console.WriteLine($"消息{message}已发送");
                            i++;
                            Thread.Sleep(200);
                        }
                    }
                }
            }

        }
    }
}

??Main程序中调用下Show方法?

 class Program
    {
        static void Main(string[] args)
        {
            //ProductionConsumer.Show();

            //多生产者
            {
                //这是添加命令行功能
                IConfigurationRoot config = new ConfigurationBuilder()
                    .SetBasePath(Directory.GetCurrentDirectory())
                    .AddCommandLine(args)
                    .Build();

                string strMinute = config["minute"];//什么时间开始执行
                string No = config["no"];//生产者编号
                int minute = Convert.ToInt32(strMinute);

                bool flag = true;
                while (flag)
                {
                    if (DateTime.Now.Minute == minute)
                    {
                        Console.WriteLine($"到{strMinute}分钟,开始写入消息...");
                        flag = false;
                        MultiProductionConsumer.Show(No);
                    }
                }

            }
        }
    }

同时启动三个生产者服务,命令中的minute参数代表程序执行的分钟数字,从几分钟开始执行(表示当前时间的分钟数,13代表21:13开始执行)。no参数表示服务的编号,用于方便观察打印信息。

接着启动消费者服务

可以看到三个生产者同时向消息队列中发送消息。消费者服务开启三个线程,模拟三个消费者同时接收不同生产者发送的消息。

应用案例:商品秒杀实现

一共有10个商品,实现10个商品卖出后不再调用数据库进行操作(过滤无效请求,减少数据库压力)。

下面程序中写的生成者和消费者是从消息队列的角度出发的,发送数据的为生产者,接收数据的为

消费者。

AspNetCore.RabbitMQ.MessageConsumer_01.MessageConsumer(消费者)

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace AspNetCore.RabbitMQ.MessageConsumer_01.MessageConsumer
{
    public class SeckillConsumer
    {
        public static void Show()
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";//rabbitmq服务在本地运行
            factory.UserName = "guest";//用户名
            factory.Password = "guest";//密码

            //创建链接
            using (IConnection connection = factory.CreateConnection())
            {
                //创建信道
                using (IModel channel = connection.CreateModel())
                {
                    Console.ForegroundColor = ConsoleColor.Green;
                    try
                    {
                        //声明队列
                        channel.QueueDeclare(queue: "SeckillProducerMessage", durable: true, exclusive: false, autoDelete: false, arguments: null);

                        //声明路由
                        channel.ExchangeDeclare(exchange: "SeckillProducerMessageExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);

                        //绑定队列和路由
                        channel.QueueBind(queue: "SeckillProducerMessage", exchange: "SeckillProducerMessageExChange", routingKey: string.Empty, arguments: null);


                        //基于当前信道创建事件
                        var consumer = new EventingBasicConsumer(channel);
                        int count = 0;
                        consumer.Received += (model, ea) =>
                        {
                            if (count >= 10)
                            {
                                Console.WriteLine("商品秒杀结束,已经不再调用数据库操作");
                            }
                            else
                            {
                                var body = ea.Body;
                                var message = Encoding.UTF8.GetString(body.ToArray());
                                Console.WriteLine($"{message} 秒杀成功");
                                count++;
                            }

                        };

                        channel.BasicConsume(queue: "SeckillProducerMessage", autoAck: true, consumer: consumer);
                        Console.WriteLine("按下回车退出啊");
                        Console.ReadLine();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                    }

                }
            }

        }
    }
}

main方法:

  class Program
    {
        static void Main(string[] args)
        {
            SeckillConsumer.Show();
            Console.ReadLine();
        }
    }

AspNetCore.RabbitMQ.MessageProducer.MessageProducer(生产者)

using RabbitMQ.Client;
using System;
using System.Text;

namespace AspNetCore.RabbitMQ.MessageProducer.MessageProducer
{
    /// <summary>
    /// 单生产者
    /// </summary>
    public class ProductionConsumer
    {
        /// <summary>
        /// 秒杀商品
        /// </summary>
        public static void SkillShow()
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";//rabbitmq服务在本地运行
            factory.UserName = "guest";//用户名
            factory.Password = "guest";//密码

            //创建链接
            using (IConnection connection = factory.CreateConnection())
            {
                //创建信道
                using (IModel channel = connection.CreateModel())
                {

                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.WriteLine("秒杀已准备就绪......");

                    int i = 1;
                    {
                        while (true)
                        {
                            IBasicProperties basicProperties = channel.CreateBasicProperties();
                            basicProperties.Persistent = true;

                            string message = $"粉丝{i}";
                            byte[] body = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish(exchange: "SeckillProducerMessageExChange", routingKey: string.Empty, basicProperties: basicProperties, body: body);
                            Console.WriteLine($"{message}开始抢购");
                            i++;
                        }
                    }
                }
            }

        }
    }
}

main方法:

class Program
    {
        static void Main(string[] args)
        {
            ProductionConsumer.SkillShow();
        }
    }

启动服务:

?

从打印信息可以看出,前10个用户请求处理完成后,后面的请求就不再调用数据库进行处理。达到了数据库限流的目的。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-29 09:09:56  更:2021-08-29 09:10:00 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 16:58:05-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码