Official documentation
https://netmq.readthedocs.io/en/latest/introduction/
GitHub
https://github.com/zeromq/netmq
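The official site describes several usage patterns and a number of components. Note that versions 3 and 4 are not fully compatible: most of the examples were written for version 3 and do not work under version 4.
If you get errors under version 4, switch to version 3 first, then follow the hints in the obsolete-API warnings to update the code to the current syntax.
The main usage patterns are:
1. Request/Response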
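As a rough illustration of that migration, the sketch below puts a version 3 style snippet (in comments) next to a version 4 equivalent. It assumes the NetMQ NuGet package at version 4; the class name MigrationSketch, the method name RoundTrip, and port 5560 are made up for this example, and the commented version 3 calls are recalled from the older API and may differ between 3.x releases.

```csharp
using System;
using NetMQ;
using NetMQ.Sockets;

// Hypothetical helper, not part of NetMQ.
public static class MigrationSketch
{
    public static string RoundTrip()
    {
        // Version 3 style, kept as comments for comparison
        // (NetMQContext no longer exists in version 4):
        //   using (var context = NetMQContext.Create())
        //   using (var server = context.CreateResponseSocket())
        //   {
        //       server.Bind("tcp://127.0.0.1:5560");
        //       string request = server.ReceiveString();
        //       server.Send("World");
        //   }

        // Version 4 style: sockets are created directly, and the
        // frame-oriented SendFrame/ReceiveFrameString methods are used.
        using (var server = new ResponseSocket())
        using (var client = new RequestSocket())
        {
            server.Bind("tcp://127.0.0.1:5560");      // port is an assumption
            client.Connect("tcp://127.0.0.1:5560");

            client.SendFrame("Hello");
            string request = server.ReceiveFrameString();
            server.SendFrame("World");
            string reply = client.ReceiveFrameString();

            return request + " -> " + reply;
        }
    }
}
```

Calling MigrationSketch.RoundTrip() performs one request and one reply over a local TCP connection and returns both strings joined together.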
https://netmq.readthedocs.io/en/latest/request-response/
using (var responseSocket = new ResponseSocket("@tcp://*:5555"))
using (var requestSocket = new RequestSocket(">tcp://localhost:5555"))
{
    Console.WriteLine("requestSocket : Sending 'Hello'");
    requestSocket.SendFrame("Hello");
    var message = responseSocket.ReceiveFrameString();
    Console.WriteLine("responseSocket : Server Received '{0}'", message);
    Console.WriteLine("responseSocket Sending 'World'");
    responseSocket.SendFrame("World");
    message = requestSocket.ReceiveFrameString();
    Console.WriteLine("requestSocket : Received '{0}'", message);
}
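Note: the server and the client are created from different socket classes (ResponseSocket and RequestSocket). Also, after sending a request, the requester must receive the reply before it can send the next request.
2. Publish/Subscribe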
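The strict send/receive alternation can be seen in a small sketch like the one below. It assumes NetMQ 4, where (as far as I know) a second send on a RequestSocket before the reply arrives throws FiniteStateMachineException; the class name LockstepSketch, the method name, and port 5556 are invented for illustration.

```csharp
using System;
using NetMQ;
using NetMQ.Sockets;

// Hypothetical helper, not part of NetMQ.
public static class LockstepSketch
{
    // Returns true if sending twice in a row was rejected.
    public static bool SecondSendIsRejected()
    {
        using (var server = new ResponseSocket("@tcp://127.0.0.1:5556"))
        using (var client = new RequestSocket(">tcp://127.0.0.1:5556"))
        {
            bool rejected = false;
            client.SendFrame("first");
            try
            {
                // No reply has been received yet, so this send is out of turn.
                client.SendFrame("second");
            }
            catch (FiniteStateMachineException)
            {
                rejected = true;
            }

            // Finish the first exchange; after that the client may send again.
            server.ReceiveFrameString();
            server.SendFrame("reply");
            client.ReceiveFrameString();

            client.SendFrame("second");
            server.ReceiveFrameString();
            return rejected;
        }
    }
}
```

If the exception type differs in your NetMQ version, catching NetMQException instead should be a reasonable fallback.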
Task.Run(() =>
{
    Random rand = new Random(50);
    using (var pubSocket = new PublisherSocket())
    {
        Console.WriteLine("Publisher socket binding...");
        pubSocket.Options.SendHighWatermark = 1000;
        pubSocket.Bind("tcp://*:12345");
        for (var i = 0; i < 100; i++)
        {
            // Pick TopicA or TopicB at random for each message.
            var randomizedTopic = rand.NextDouble();
            if (randomizedTopic > 0.5)
            {
                var msg = "TopicA msg-" + i;
                Console.WriteLine("Sending message : {0}", msg);
                // First frame is the topic, second frame is the message body.
                pubSocket.SendMoreFrame("TopicA").SendFrame(msg);
            }
            else
            {
                var msg = "TopicB msg-" + i;
                Console.WriteLine("Sending message : {0}", msg);
                pubSocket.SendMoreFrame("TopicB").SendFrame(msg);
            }
            Thread.Sleep(500);
        }
    }
});
Task.Run(() =>
{
    // An empty topic subscribes to every message.
    string topic = "";
    Console.WriteLine("Subscriber started for Topic : {0}", topic);
    using (var subSocket = new SubscriberSocket())
    {
        subSocket.Options.ReceiveHighWatermark = 1000;
        // Must be the port the publisher binds to (12345).
        subSocket.Connect("tcp://localhost:12345");
        subSocket.Subscribe(topic);
        Console.WriteLine("Subscriber socket connecting...");
        while (true)
        {
            string messageTopicReceived = subSocket.ReceiveFrameString();
            string messageReceived = subSocket.ReceiveFrameString();
            Console.WriteLine(messageReceived);
        }
    }
});
Task.Run(() =>
{
    string topic = "TopicA";
    Console.WriteLine("Subscriber started for Topic : {0}", topic);
    using (var subSocket = new SubscriberSocket())
    {
        subSocket.Options.ReceiveHighWatermark = 1000;
        subSocket.Connect("tcp://localhost:12345");
        subSocket.Subscribe(topic);
        Console.WriteLine("Subscriber socket connecting...");
        while (true)
        {
            string messageTopicReceived = subSocket.ReceiveFrameString();
            string messageReceived = subSocket.ReceiveFrameString();
            Console.WriteLine(messageReceived);
        }
    }
});
Task.Run(() =>
{
    string topic = "TopicB";
    Console.WriteLine("Subscriber started for Topic : {0}", topic);
    using (var subSocket = new SubscriberSocket())
    {
        subSocket.Options.ReceiveHighWatermark = 1000;
        subSocket.Connect("tcp://localhost:12345");
        subSocket.Subscribe(topic);
        Console.WriteLine("Subscriber socket connecting...");
        while (true)
        {
            string messageTopicReceived = subSocket.ReceiveFrameString();
            string messageReceived = subSocket.ReceiveFrameString();
            Console.WriteLine(messageReceived);
        }
    }
});
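Note: a subscriber has to call ReceiveFrameString() twice for each message. The first call returns the topic, the second returns the message that was sent.
Topic hierarchy: a message's topic is compared with the subscriber's subscription using a prefix check.
So a subscriber to topic receives messages with the following topics:
topic
topic/subtopic
topical
However, it does not receive messages with the following topics:
topi
TOPIC (remember, this is a byte-wise comparison)
A consequence of this prefix matching is that you can receive every published message by subscribing to an empty topic string:
sub.Subscribe("");
3. Push/Pull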
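The prefix rule can be modelled without any sockets. The sketch below is plain C# and only imitates the comparison described above; TopicFilter and Matches are hypothetical names, not NetMQ API, and UTF-8 encoding of the topic strings is an assumption.

```csharp
using System;
using System.Text;

// Hypothetical helper that mimics byte-wise prefix matching of topics.
public static class TopicFilter
{
    // True when the bytes of `subscription` are a prefix of the bytes of `topic`.
    public static bool Matches(string subscription, string topic)
    {
        byte[] prefix = Encoding.UTF8.GetBytes(subscription);
        byte[] frame = Encoding.UTF8.GetBytes(topic);

        // A subscription longer than the topic can never be its prefix.
        if (prefix.Length > frame.Length)
        {
            return false;
        }

        for (int i = 0; i < prefix.Length; i++)
        {
            // Bytes are compared as-is, so the check is case-sensitive.
            if (frame[i] != prefix[i])
            {
                return false;
            }
        }
        return true;
    }
}
```

With this model, TopicFilter.Matches("topic", "topical") is true, TopicFilter.Matches("topic", "TOPIC") is false, and an empty subscription matches any topic because its prefix loop never runs.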
// A pushes on port 5557 and pulls from port 5558; B does the opposite.
PushSocket PushA = new PushSocket("@tcp://*:5557");
PullSocket PullA = new PullSocket(">tcp://localhost:5558");
PushSocket PushB = new PushSocket("@tcp://*:5558");
PullSocket PullB = new PullSocket(">tcp://localhost:5557");
string info = "test message";
Console.WriteLine("A sent: " + info);
PushA.SendFrame(info);
// B pulls the message from A and pushes it straight back.
string str1 = PullB.ReceiveFrameString();
PushB.SendFrame(str1);
string str = PullA.ReceiveFrameString();
Console.WriteLine("A pulled: " + str);
4. Using NetMQPoller
NetMQTimer netMQTimer = new NetMQTimer(TimeSpan.FromMilliseconds(1000));
netMQTimer.Elapsed += NetMQTimer_Elapsed;
using (var pubSocket = new PublisherSocket())
using (var subSocket = new SubscriberSocket())
{
    pubSocket.Options.SendHighWatermark = 1000;
    pubSocket.SendReady += PubSocket_SendReady;
    pubSocket.Bind("tcp://*:12345");
    subSocket.Options.ReceiveHighWatermark = 1000;
    subSocket.Connect("tcp://127.0.0.1:12345");
    subSocket.Subscribe("TopicA");
    subSocket.SendReady += SubSocket_SendReady;
    subSocket.ReceiveReady += SubSocket_ReceiveReady;
    var netmqpoller = new NetMQPoller { pubSocket, subSocket, netMQTimer };
    // Run() blocks the calling thread and dispatches the events below.
    netmqpoller.Run();
}
private void SubSocket_SendReady(object sender, NetMQSocketEventArgs e)
{
    // A subscriber does not send messages, so there is nothing to do here.
}
private void SubSocket_ReceiveReady(object sender, NetMQSocketEventArgs e)
{
    // First frame is the topic, second frame is the message body.
    Console.WriteLine("Topic: " + e.Socket.ReceiveFrameString());
    Console.WriteLine("Received: " + e.Socket.ReceiveFrameString());
}
private void NetMQTimer_Elapsed(object sender, NetMQTimerEventArgs e)
{
    Console.WriteLine("---------------------------------- ---------------" + DateTime.Now);
}
private void PubSocket_SendReady(object sender, NetMQSocketEventArgs e)
{
    // Publishes one message each time the socket reports it can send.
    e.Socket.SendMoreFrame("TopicA").SendFrame("haha");
}
Once a socket has been added to a NetMQPoller, the poller raises its ReceiveReady event automatically whenever a message is available.
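Because Run() blocks, it can help to see a variant that starts the poller in the background and stops it again. The following is a minimal sketch, assuming NetMQ 4 and its RunAsync() and Stop() methods; the class name PollerSketch, the method name, port 5559, and the use of a push/pull pair instead of pub/sub are choices made only for this example.

```csharp
using System;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;

// Hypothetical helper, not part of NetMQ.
public static class PollerSketch
{
    // Receives a single message through a poller-driven ReceiveReady event.
    public static string ReceiveOneViaPoller()
    {
        string received = null;
        using (var gotMessage = new ManualResetEventSlim(false))
        using (var pull = new PullSocket("@tcp://127.0.0.1:5559"))
        using (var push = new PushSocket(">tcp://127.0.0.1:5559"))
        {
            // Attach the handler before handing the socket to the poller.
            pull.ReceiveReady += (sender, e) =>
            {
                received = e.Socket.ReceiveFrameString();
                gotMessage.Set();
            };

            using (var poller = new NetMQPoller { pull })
            {
                poller.RunAsync();                  // poll on a background thread
                push.SendFrame("hello");

                // Wait (with a timeout) until the handler has run.
                gotMessage.Wait(TimeSpan.FromSeconds(5));
                poller.Stop();                      // stop before disposing
            }
        }
        return received;
    }
}
```

The handler runs on the poller's thread, so the ManualResetEventSlim is used to hand the result back to the caller; the five-second timeout is arbitrary.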