简介: 消息队列 RocketMQ 版的标准版实例提供了通过 HTTP 协议的多语言 SDK 接入的能力,并支持公网访问。很多用户在使用.NET SDK的时候,因为本身官方对这部分的说明不够清晰,给很多用户的使用带来了困惑,这里分别介绍NET Framework SDK和NET Core SDK的集成使用。
连接地址:阿里云Rocket MQ Http .NET SDK使用Demo-阿里云开发者社区 (aliyun.com)
NET Framework SDK使用
1、SDK 下载
下载地址
2、Zip包解压获取SDK
3、新建项目并复制SDK到项目下面
4、加入项目
5、Code Sample
using System;
using System.Collections.Generic;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ;
using System.Threading;
namespace RocketMQHttpDemoTest123
{
class Program
{
// 设置HTTP接入域名(此处以公共云生产环境为例)
private const string _endpoint = "http://184821781*********.mqrest.cn-qingdao-public.aliyuncs.com";
// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
private const string _accessKeyId = "LTAIOZZg*********";
// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
private const string _secretAccessKey = "v7CjUJCMk7j9aK****************";
// 所属的 Topic
private const string _topicName = "***********";
// Topic所属实例ID,默认实例为空
private const string _instanceId = "MQ_INST_184821**********_BcLPQ2p0";
// 您在控制台创建的 Consumer ID(Group ID)
private const string _groupId = "GID_Http*********";
private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);
static MQProducer producer = _client.GetProducer(_instanceId, _topicName);
static MQConsumer consumer = _client.GetConsumer(_instanceId, _topicName, _groupId, null);
static void Main(string[] args)
{
// 生产者程序
try
{
// 循环发送100条消息
for (int i = 0; i < 50; i++)
{
TopicMessage result = producer.PublishMessage(new TopicMessage("测试http类型的消息1"));
Console.WriteLine("publis message success: MessageId:" + result.Id + ", BodyMD5:" + result.BodyMD5);
result = producer.PublishMessage(new TopicMessage("测试http类型的消息2", "tag"));
Console.WriteLine("publis message success: MessageId:" + result.Id + ", BodyMD5:" + result.BodyMD5);
}
}
catch (Exception ex)
{
Console.Write(ex);
}
// 消费者程序
// 在当前线程循环消费消息,建议是多开个几个线程并发消费消息
while (true)
{
try
{
// 长轮询消费消息
// 长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回
List<Message> messages = null;
try
{
messages = consumer.ConsumeMessage(
3, // 一次最多消费3条(最多可设置为16条)
3 // 长轮询时间3秒(最多可设置为30秒)
);
}
catch (Exception exp1)
{
if (exp1 is MessageNotExistException)
{
Console.WriteLine(Thread.CurrentThread.Name + " No new message, " + ((MessageNotExistException)exp1).RequestId);
continue;
}
Console.WriteLine(exp1);
Thread.Sleep(2000);
}
if (messages == null)
{
continue;
}
List<string> handlers = new List<string>();
Console.WriteLine(Thread.CurrentThread.Name + " Receive Messages:");
// 处理业务逻辑
foreach (Message message in messages)
{
Console.WriteLine(message);
// Console.WriteLine("Property a is:" + message.GetProperty("a"));
handlers.Add(message.ReceiptHandle);
}
// Message.nextConsumeTime前若不确认消息消费成功,则消息会重复消费
// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样
try
{
consumer.AckMessage(handlers);
Console.WriteLine("Ack message success:");
foreach (string handle in handlers)
{
Console.Write("\t" + handle);
}
Console.WriteLine();
}
catch (Exception exp2)
{
// 某些消息的句柄可能超时了会导致确认不成功
if (exp2 is AckMessageException)
{
AckMessageException ackExp = (AckMessageException)exp2;
Console.WriteLine("Ack message fail, RequestId:" + ackExp.RequestId);
foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems)
{
Console.WriteLine("\tErrorHandle:" + errorItem.ReceiptHandle + ",ErrorCode:" + errorItem.ErrorCode + ",ErrorMsg:" + errorItem.ErrorMessage);
}
}
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
Thread.Sleep(2000);
}
}
}
}
}
6、如果编译出现错误:无法从“String”转换为“char”
解决策略:调整源码
7、测试效果
NET Core SDK使用(本身项目是Core项目)
SDK 安装:Aliyun.MQ.Http
消息的发送和接收使用方式与Framework SDK一致。
更多收发消息示例:
更多参考
C# SDK 接入说明 mq-http-csharp-sdk
?
|