一、添加Nuget引用
在Nuget中搜索DotNetCore.CAP.RabbitMQ并安装
二、在项目的StartUp中注入并配置服务
context.Services.AddCap(x =>
{
string connectionString = configuration["ConnectionStrings:Order"];
x.UseMySql(connectionString);
x.UseRabbitMQ(option =>
{
option.HostName = configuration["CAPRabbitMQ:HostName"];
string portStr = configuration["CAPRabbitMQ:Port"];
if (!string.IsNullOrEmpty(portStr))
{
option.Port = Convert.ToInt32(portStr);
}
option.VirtualHost = configuration["CAPRabbitMQ:VirtualHost"];
option.UserName = configuration["CAPRabbitMQ:UserName"];
option.Password = configuration["CAPRabbitMQ:Password"];
if (!string.IsNullOrEmpty(configuration["CAPRabbitMQ:ExchangeName"]))
{
option.ExchangeName = configuration["CAPRabbitMQ:ExchangeName"];
}
option.ConnectionFactoryOptions = factory => factory.AutomaticRecoveryEnabled = true;
});
x.SucceedMessageExpiredAfter = Convert.ToInt32(configuration["CAPRabbitMQ:SucceedMessageExpiredAfter"]);
x.FailedRetryCount = Convert.ToInt32(configuration["CAPRabbitMQ:FailedRetryCount"]);
x.FailedRetryInterval = Convert.ToInt32(configuration["CAPRabbitMQ:FailedRetryInterval"]);
x.FailedThresholdCallback = (failedInfo) =>
{
};
x.Version = configuration["CAPRabbitMQ:Version"];
}).AddSubscribeFilter<MyCapFilter>();
三、 配置AppSetting文件
可以看到在上面配置Rabbitmq服务的时候为了方便修改配置,读取的是Appsetting中的CAPRabbitMQ节点。所以在配置文件中添加配置节点Appsetting。如下、
"CAPRabbitMQ": {
"HostName": "172.16.7.39",
"Port": 5672,
"UserName": "dev",
"Password": "dev_DKYao2021",
"VirtualHost": "vH_bcs_dev",
"ExchangeName": "e.bcs.dev.exchange",
"FailedRetryCount": 5,
"FailedRetryInterval": 3,
"Version": "v1",
"SucceedMessageExpiredAfter": 1440
},
四、接口定义
像平时定义接口一样 定义在MQ中要响应的接口
public interface IMqAppService
{
void GetSubscribe(TodoDto<MqOrderInfoDto> orderObj);
}
五、接口实现
这里以消费为例
[CapSubscribe(EventConstants.EVENT_ORDER_WAYBILL, Group = "q.topic.order.reviewed")]
public void GetOrderSubscribe(TodoDto<MqOrderInfoDto> orderObj)
{
if (orderObj is {data: { }} && !string.IsNullOrEmpty(orderObj.data.OrderNo))
{
var stressful = _pendingOrderAppService.OrderTurnInvoiceNew(orderObj.data.OrderNo).Result; _logger.LogInformation(stressful.message);
}
else
{
_logger.LogError("订单推送数据为空");
}
}
这里需要注意 Group 的值即为MQ服务器中Exchange的队列名称, 而 EventConstants.EVENT_ORDER_WAYBILL 是已经定义好的消息名称,需要注意应该与推送的消息名称一致
|