生产者向RabbitMQ服务发送消息,消费者从RabbitMQ服务中获取消息。
单生产者单消费者模式
使用程序演示单生产者和单消费者模型:
项目结构为两个.net Core 控制台程序应用程序
程序需要添加引用 RabbitMQ.Client
AspNetCore.RabbitMQ.MessageConsumer_01.MessageConsumer(消费者)
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace AspNetCore.RabbitMQ.MessageConsumer_01.MessageConsumer
{
public class ProductionConsumer
{
public static void Show()
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";//rabbitmq服务在本地运行
factory.UserName = "guest";//用户名
factory.Password = "guest";//密码
//创建链接
using (IConnection connection = factory.CreateConnection())
{
//创建信道
using (IModel channel = connection.CreateModel())
{
Console.ForegroundColor = ConsoleColor.Green;
try
{
//声明队列
channel.QueueDeclare(queue: "OnlyProducerMessage", durable: true, exclusive: false, autoDelete: false, arguments: null);
//声明路由
channel.ExchangeDeclare(exchange: "OnlyProducerMessageExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
//绑定队列和路由
channel.QueueBind(queue: "OnlyProducerMessage", exchange: "OnlyProducerMessageExChange", routingKey: string.Empty, arguments: null);
//基于当前信道创建事件
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine($"消费者01 接收消息:{message}");
};
channel.BasicConsume(queue: "OnlyProducerMessage", autoAck: true, consumer: consumer);
Console.WriteLine("按下回车退出啊");
Console.ReadLine();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}
}
}
}
?Main程序中调用下Show方法?
class Program
{
static void Main(string[] args)
{
ProductionConsumer.Show();
}
}
namespace AspNetCore.RabbitMQ.MessageProducer.MessageProducer(生产者)
using RabbitMQ.Client;
using System;
using System.Text;
namespace AspNetCore.RabbitMQ.MessageProducer.MessageProducer
{
/// <summary>
/// 生产者
/// </summary>
public class ProductionConsumer
{
public static void Show()
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";//rabbitmq服务在本地运行
factory.UserName = "guest";//用户名
factory.Password = "guest";//密码
//创建链接
using (IConnection connection = factory.CreateConnection())
{
//创建信道
using (IModel channel = connection.CreateModel())
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("生产者已准备就绪......");
int i = 1;
{
while (true)
{
IBasicProperties basicProperties = channel.CreateBasicProperties();
basicProperties.Persistent = true;
string message = $"消息{i}";
byte[] body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "OnlyProducerMessageExChange", routingKey: string.Empty, basicProperties: basicProperties, body: body);
Console.WriteLine($"消息{message}已发送");
i++;
}
}
}
}
}
}
}
Main程序中调用下Show方法?
class Program
{
static void Main(string[] args)
{
ProductionConsumer.Show();
}
}
启动两个程序:
启动消费者,在项目目录中进入命令窗口执行dotnet 启动命令
以同样的方式,启动生产者
消费者和生产者都启动以后,可以看到两个命令窗口打印如下信息:
多生产者多消费者模式
模拟多个生产者和多个消费者的情况
AspNetCore.RabbitMQ.MessageConsumer_01.MessageConsumer(消费者)
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace AspNetCore.RabbitMQ.MessageConsumer_01.MessageConsumer
{
public class MultiProductionConsumer
{
public static void Show1()
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";//rabbitmq服务在本地运行
factory.UserName = "guest";//用户名
factory.Password = "guest";//密码
//创建链接
using (var connection = factory.CreateConnection())
{
//创建信道
using (var channel = connection.CreateModel())
{
Console.ForegroundColor = ConsoleColor.Green;
try
{
//声明队列
//channel.QueueDeclare(queue: "OnlyProducerMessage", durable: true, exclusive: false, autoDelete: false, arguments: null);
//声明路由
//channel.ExchangeDeclare(exchange: "OnlyProducerMessageExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
//绑定队列和路由
//channel.QueueBind(queue: "OnlyProducerMessage", exchange: "OnlyProducerMessageExChange", routingKey: string.Empty, arguments: null);
//基于当前信道创建事件
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine($"消费者01 接收消息:{message}");
};
channel.BasicConsume(queue: "MultiProducerMessage", autoAck: true, consumer: consumer);
Console.WriteLine("按下回车退出啊");
Console.ReadLine();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}
}
public static void Show2()
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";//rabbitmq服务在本地运行
factory.UserName = "guest";//用户名
factory.Password = "guest";//密码
//创建链接
using (var connection = factory.CreateConnection())
{
//创建信道
using (var channel = connection.CreateModel())
{
Console.ForegroundColor = ConsoleColor.Green;
try
{
//声明队列
//channel.QueueDeclare(queue: "OnlyProducerMessage", durable: true, exclusive: false, autoDelete: false, arguments: null);
//声明路由
//channel.ExchangeDeclare(exchange: "OnlyProducerMessageExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
//绑定队列和路由
//channel.QueueBind(queue: "OnlyProducerMessage", exchange: "OnlyProducerMessageExChange", routingKey: string.Empty, arguments: null);
//基于当前信道创建事件
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine($"消费者02 接收消息:{message}");
};
channel.BasicConsume(queue: "MultiProducerMessage", autoAck: true, consumer: consumer);
Console.WriteLine("按下回车退出啊");
Console.ReadLine();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}
}
public static void Show3()
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";//rabbitmq服务在本地运行
factory.UserName = "guest";//用户名
factory.Password = "guest";//密码
//创建链接
using (var connection = factory.CreateConnection())
{
//创建信道
using (var channel = connection.CreateModel())
{
Console.ForegroundColor = ConsoleColor.Green;
try
{
//声明队列
//channel.QueueDeclare(queue: "OnlyProducerMessage", durable: true, exclusive: false, autoDelete: false, arguments: null);
//声明路由
//channel.ExchangeDeclare(exchange: "OnlyProducerMessageExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
//绑定队列和路由
//channel.QueueBind(queue: "OnlyProducerMessage", exchange: "OnlyProducerMessageExChange", routingKey: string.Empty, arguments: null);
//基于当前信道创建事件
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine($"消费者03 接收消息:{message}");
};
channel.BasicConsume(queue: "MultiProducerMessage", autoAck: true, consumer: consumer);
Console.WriteLine("按下回车退出啊");
Console.ReadLine();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}
}
}
}
?Main程序中调用下Show方法?
class Program
{
static void Main(string[] args)
{
//ProductionConsumer.Show();
//多消费者
{
Task.Run(() => { MultiProductionConsumer.Show1(); });
Task.Run(() => { MultiProductionConsumer.Show2(); });
Task.Run(() => { MultiProductionConsumer.Show3(); });
}
Console.ReadLine();
}
}
AspNetCore.RabbitMQ.MessageProducer.MessageProducer(生产者)
using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading;
namespace AspNetCore.RabbitMQ.MessageProducer.MessageProducer
{
/// <summary>
/// 生产者
/// </summary>
public class MultiProductionConsumer
{
public static void Show(string No)
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";//rabbitmq服务在本地运行
factory.UserName = "guest";//用户名
factory.Password = "guest";//密码
//创建链接
using (IConnection connection = factory.CreateConnection())
{
//创建信道
using (IModel channel = connection.CreateModel())
{
//声明队列
channel.QueueDeclare(queue: "MultiProducerMessage", durable: true, exclusive: false, autoDelete: false, arguments: null);
//声明路由
channel.ExchangeDeclare(exchange: "MultiProducerMessageExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
//绑定队列和路由
channel.QueueBind(queue: "MultiProducerMessage", exchange: "MultiProducerMessageExChange", routingKey: string.Empty, arguments: null);
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine($"生产者{No}已准备就绪......");
int i = 1;
{
while (true)
{
string message = $"生产者{No},消息{i}";
byte[] body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "MultiProducerMessageExChange", routingKey: string.Empty, basicProperties: null, body: body);
Console.WriteLine($"消息{message}已发送");
i++;
Thread.Sleep(200);
}
}
}
}
}
}
}
??Main程序中调用下Show方法?
class Program
{
static void Main(string[] args)
{
//ProductionConsumer.Show();
//多生产者
{
//这是添加命令行功能
IConfigurationRoot config = new ConfigurationBuilder()
.SetBasePath(Directory.GetCurrentDirectory())
.AddCommandLine(args)
.Build();
string strMinute = config["minute"];//什么时间开始执行
string No = config["no"];//生产者编号
int minute = Convert.ToInt32(strMinute);
bool flag = true;
while (flag)
{
if (DateTime.Now.Minute == minute)
{
Console.WriteLine($"到{strMinute}分钟,开始写入消息...");
flag = false;
MultiProductionConsumer.Show(No);
}
}
}
}
}
同时启动三个生产者服务,命令中的minute参数代表程序执行的分钟数字,从几分钟开始执行(表示当前时间的分钟数,13代表21:13开始执行)。no参数表示服务的编号,用于方便观察打印信息。
接着启动消费者服务
可以看到三个生产者同时向消息队列中发送消息。消费者服务开启三个线程,模拟三个消费者同时接收不同生产者发送的消息。
应用案例:商品秒杀实现
一共有10个商品,实现10个商品卖出后不再调用数据库进行操作(过滤无效请求,减少数据库压力)。
下面程序中写的生成者和消费者是从消息队列的角度出发的,发送数据的为生产者,接收数据的为
消费者。
AspNetCore.RabbitMQ.MessageConsumer_01.MessageConsumer(消费者)
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace AspNetCore.RabbitMQ.MessageConsumer_01.MessageConsumer
{
public class SeckillConsumer
{
public static void Show()
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";//rabbitmq服务在本地运行
factory.UserName = "guest";//用户名
factory.Password = "guest";//密码
//创建链接
using (IConnection connection = factory.CreateConnection())
{
//创建信道
using (IModel channel = connection.CreateModel())
{
Console.ForegroundColor = ConsoleColor.Green;
try
{
//声明队列
channel.QueueDeclare(queue: "SeckillProducerMessage", durable: true, exclusive: false, autoDelete: false, arguments: null);
//声明路由
channel.ExchangeDeclare(exchange: "SeckillProducerMessageExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
//绑定队列和路由
channel.QueueBind(queue: "SeckillProducerMessage", exchange: "SeckillProducerMessageExChange", routingKey: string.Empty, arguments: null);
//基于当前信道创建事件
var consumer = new EventingBasicConsumer(channel);
int count = 0;
consumer.Received += (model, ea) =>
{
if (count >= 10)
{
Console.WriteLine("商品秒杀结束,已经不再调用数据库操作");
}
else
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine($"{message} 秒杀成功");
count++;
}
};
channel.BasicConsume(queue: "SeckillProducerMessage", autoAck: true, consumer: consumer);
Console.WriteLine("按下回车退出啊");
Console.ReadLine();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}
}
}
}
main方法:
class Program
{
static void Main(string[] args)
{
SeckillConsumer.Show();
Console.ReadLine();
}
}
AspNetCore.RabbitMQ.MessageProducer.MessageProducer(生产者)
using RabbitMQ.Client;
using System;
using System.Text;
namespace AspNetCore.RabbitMQ.MessageProducer.MessageProducer
{
/// <summary>
/// 单生产者
/// </summary>
public class ProductionConsumer
{
/// <summary>
/// 秒杀商品
/// </summary>
public static void SkillShow()
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";//rabbitmq服务在本地运行
factory.UserName = "guest";//用户名
factory.Password = "guest";//密码
//创建链接
using (IConnection connection = factory.CreateConnection())
{
//创建信道
using (IModel channel = connection.CreateModel())
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("秒杀已准备就绪......");
int i = 1;
{
while (true)
{
IBasicProperties basicProperties = channel.CreateBasicProperties();
basicProperties.Persistent = true;
string message = $"粉丝{i}";
byte[] body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "SeckillProducerMessageExChange", routingKey: string.Empty, basicProperties: basicProperties, body: body);
Console.WriteLine($"{message}开始抢购");
i++;
}
}
}
}
}
}
}
main方法:
class Program
{
static void Main(string[] args)
{
ProductionConsumer.SkillShow();
}
}
启动服务:
?
从打印信息可以看出,前10个用户请求处理完成后,后面的请求就不再调用数据库进行处理。达到了数据库限流的目的。
|