IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 系统运维 -> NetMQ使用说明 -> 正文阅读

[系统运维]NetMQ使用说明

官方文档

https://netmq.readthedocs.io/en/latest/introduction/

github

https://github.com/zeromq/netmq

官网介绍的几种使用模式和一些组件,注意版本有3和4不是很兼容,示例大部分是使用的3版本,4不支持。

如果在版本4下报错,先将版本改为3,之后查看过时语法里的提示,修改成最新的语法。

主要介绍几种使用模式:

1.请求/回应

https://netmq.readthedocs.io/en/latest/request-response/

  using (var responseSocket = new ResponseSocket("@tcp://*:5555"))
            using (var requestSocket = new RequestSocket(">tcp://localhost:5555"))
            {
                Console.WriteLine("requestSocket : Sending 'Hello'");
                requestSocket.SendFrame("Hello");
                var message = responseSocket.ReceiveFrameString();
                Console.WriteLine("responseSocket : Server Received '{0}'", message);
                Console.WriteLine("responseSocket Sending 'World'");
                responseSocket.SendFrame("World");
                message = requestSocket.ReceiveFrameString();
                Console.WriteLine("requestSocket : Received '{0}'", message);
            }

注意:服务端和客户端初始的类对象是不同的,需要注意。还有就是发送之后需要返回信息之后才能进行下次的发送.

2.发布 - 订阅

 // 发布 - 订阅是一种消息传递模式,在这种模式下,消息的发送者(称为发布者)不会将消息直接发送给特定的接收者(称为订阅者)。
 // 相反,发布的消息被分成几类,而不知道可能有哪些订阅者。
 // 类似地,订户表达对一个或多个类别的兴趣,并且仅接收感兴趣的消息,而不知道有什么发布者(如果有的话)。

 Task.Run(() =>
 {
     Random rand = new Random(50);
     using (var pubSocket = new PublisherSocket())
     {
         Console.WriteLine("Publisher socket binding...");
         pubSocket.Options.SendHighWatermark = 1000;
         pubSocket.ReceiveReady += PubSocket_ReceiveReady;

         pubSocket.Bind("tcp://*:12345");
         for (var i = 0; i < 100; i++)
         {
             var randomizedTopic = rand.NextDouble();


             // 
             if (randomizedTopic > 0.5)
             {
                 var msg = "TopicA msg-" + i;
                 Console.WriteLine("Sending message : {0}", msg);
                 pubSocket.SendMoreFrame("TopicA").SendFrame(msg); // 发送 TopicA 主题消息  将主题包含在消息的第一帧中
             }
             else
             {
                 var msg = "TopicB msg-" + i;
                 Console.WriteLine("Sending message : {0}", msg);
                 pubSocket.SendMoreFrame("TopicB").SendFrame(msg); // 发送 TopicB 主题消息 
             }
             Thread.Sleep(500);
         }
     }
 });

 Task.Run(() =>
 {
     string topic = "";
     Console.WriteLine("Subscriber started for Topic : {0}", topic);
     using (var subSocket = new SubscriberSocket())
     {
         subSocket.Options.ReceiveHighWatermark = 1000;
         subSocket.Connect("tcp://localhost:20011");
         subSocket.Subscribe(topic); // 订阅所有主题消息 
         Console.WriteLine("Subscriber socket connecting...");
         while (true)
         {
             string messageTopicReceived = subSocket.ReceiveFrameString();
             string messageReceived = subSocket.ReceiveFrameString();
             Console.WriteLine(messageReceived);
         }
     }

 });

 Task.Run(() =>
 {
     string topic = "TopicA";
     Console.WriteLine("Subscriber started for Topic : {0}", topic);
     using (var subSocket = new SubscriberSocket())
     {
         subSocket.Options.ReceiveHighWatermark = 1000;
         subSocket.Connect("tcp://localhost:12345");
         subSocket.Subscribe(topic); // 订阅主题消息  TopicA
         Console.WriteLine("Subscriber socket connecting...");
         while (true)
         {
             string messageTopicReceived = subSocket.ReceiveFrameString();
             string messageReceived = subSocket.ReceiveFrameString();
             Console.WriteLine(messageReceived);
         }
     }

 }); Task.Run(() =>
 {
     string topic = "TopicB";
     Console.WriteLine("Subscriber started for Topic : {0}", topic);
     using (var subSocket = new SubscriberSocket())
     {
         subSocket.Options.ReceiveHighWatermark = 1000;
         subSocket.Connect("tcp://localhost:12345");
         subSocket.Subscribe(topic); // 订阅主题消息 TopicB
         Console.WriteLine("Subscriber socket connecting...");
         while (true)
         {
             string messageTopicReceived = subSocket.ReceiveFrameString();
             string messageReceived = subSocket.ReceiveFrameString();
             Console.WriteLine(messageReceived);
         }
     }

 });

注意:订阅者需要调用2次ReceiveFrameString(),第一次得到主题,第二次得到发送的消息。

主题层次结构
使用前缀检查将消息的主题与订阅者的订阅主题进行比较。

也就是订阅了topic会收到带有以下主题的邮件:

topic
topic/subtopic
topical

但是,它不会接收包含以下主题的消息:

topi
TOPIC(记住,这是一个字节级的比较)

这种前缀匹配行为的结果是,您可以通过订阅一个空主题字符串来接收所有发布的消息:

sub.Subscribe(""); // subscribe to all topics

3.推/拉

 // https://netmq.readthedocs.io/en/latest/push-pull/

 // A是一个整体
 PushSocket PushA = new PushSocket("@tcp://*:5557");
 PullSocket PullA = new PullSocket(">tcp://localhost:5558");


 // B是一个整体
 PushSocket PushB = new PushSocket("@tcp://*:5558");
 PullSocket PullB = new PullSocket(">tcp://localhost:5557");

 // 流程: A 推送 B ,B将拉取的信息再推送到A

 string info = "测试信息";
 Console.WriteLine("A发送:" + info);
 PushA.SendFrame(info);
 string str1 = PullB.ReceiveFrameString();
 PushB.SendFrame(str1);
 string str = PullA.ReceiveFrameString();
 Console.WriteLine("A拉取:" + str);

4.NetMQPoller的使用

NetMQTimer netMQTimer = new NetMQTimer(TimeSpan.FromMilliseconds(1000));
netMQTimer.Elapsed += NetMQTimer_Elapsed;

// https://netmq.readthedocs.io/en/latest/poller/#pollers
using (var pubSocket = new PublisherSocket())
using (var subSocket = new SubscriberSocket())
{
    pubSocket.Options.SendHighWatermark = 1000;
    pubSocket.SendReady += PubSocket_SendReady;
    pubSocket.ReceiveReady += PubSocket_ReceiveReady;
    pubSocket.Bind("tcp://*:12345");


    subSocket.Options.ReceiveHighWatermark = 1000;
    subSocket.Connect("tcp://127.0.0.1:12345");
    subSocket.Subscribe("TopicA");
    subSocket.SendReady += SubSocket_SendReady;
    subSocket.ReceiveReady += SubSocket_ReceiveReady;
    var netmqpoller = new NetMQPoller { pubSocket, subSocket, netMQTimer };
    netmqpoller.Run();
}


        private void SubSocket_SendReady(object sender, NetMQSocketEventArgs e)
        {
            throw new NotImplementedException();
        }

        private void SubSocket_ReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            Console.WriteLine("主题:" + e.Socket.ReceiveFrameString());
            Console.WriteLine("收到:" + e.Socket.ReceiveFrameString());
        }

        private void NetMQTimer_Elapsed(object sender, NetMQTimerEventArgs e)
        {
            Console.WriteLine("---------------------------------- ---------------" + DateTime.Now);
        }

        private void PubSocket_SendReady(object sender, NetMQSocketEventArgs e)
        {

            e.Socket.SendMoreFrame("TopicA").SendFrame("haha");
            // Console.WriteLine(e.ToString() + DateTime.Now);
        }

使用NetMQPoller会自动触发ReceiveReady事件

  系统运维 最新文章
配置小型公司网络WLAN基本业务(AC通过三层
如何在交付运维过程中建立风险底线意识,提
快速传输大文件,怎么通过网络传大文件给对
从游戏服务端角度分析移动同步(状态同步)
MySQL使用MyCat实现分库分表
如何用DWDM射频光纤技术实现200公里外的站点
国内顺畅下载k8s.gcr.io的镜像
自动化测试appium
ctfshow ssrf
Linux操作系统学习之实用指令(Centos7/8均
上一篇文章           查看所有文章
加:2022-05-13 12:00:08  更:2022-05-13 12:03:08 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/2 1:24:57-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码