IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> ZeroMQ:事件监控 -> 正文阅读

[网络协议]ZeroMQ:事件监控

#include <stdio.h>
#include <zmq.h>
#include <string.h>
#include <assert.h>
#include <malloc.h>

//http://api.zeromq.org/4-2:zmq-socket-monitor

static int get_monitor_event(void* monitor, int* value, char* address)
{
	// First frame in message contains event number and value
	zmq_msg_t msg;
	zmq_msg_init(&msg);
	if (zmq_msg_recv(&msg, monitor, 0) == -1)
		return -1; // Interrupted, presumably
	assert(zmq_msg_more(&msg));

	uint8_t* data = (uint8_t*)zmq_msg_data(&msg);
	uint16_t event = *(uint16_t*)(data);
	if (value)
		*value = *(uint32_t*)(data + 2);

	zmq_msg_close(&msg);
	// Second frame in message contains event address
	zmq_msg_init(&msg);
	if (zmq_msg_recv(&msg, monitor, 0) == -1)
		return -1; // Interrupted, presumably
	assert(!zmq_msg_more(&msg));

	if (address) {
		uint8_t* data = (uint8_t*)zmq_msg_data(&msg);
		size_t size = zmq_msg_size(&msg);

		memcpy(address, data, size);
		address[size] = 0;
	}

	zmq_msg_close(&msg);
	return event;
}

typedef struct
{
	int event;
	char* desc;
}ZMQ_EVENT_TABLE;

#define ZMQ_EVENT_DEF(event)\
	{event, #event}

/*
static ZMQ_EVENT_TABLE zmq_event_table[] = {
	//{ZMQ_EVENT_CONNECTED,		"ZMQ_EVENT_CONNECTED"},
	ZMQ_EVENT_DEF(ZMQ_EVENT_CONNECTED),
	{ZMQ_EVENT_CONNECT_DELAYED, "ZMQ_EVENT_CONNECT_DELAYED"},
	{ZMQ_EVENT_CONNECT_RETRIED, "ZMQ_EVENT_CONNECT_RETRIED"},
	{ZMQ_EVENT_LISTENING,		"ZMQ_EVENT_LISTENING"},
	{ZMQ_EVENT_BIND_FAILED,		"ZMQ_EVENT_BIND_FAILED"},
	{ZMQ_EVENT_ACCEPTED,		"ZMQ_EVENT_ACCEPTED"},
	{ZMQ_EVENT_ACCEPT_FAILED,	"ZMQ_EVENT_ACCEPT_FAILED"},
	{ZMQ_EVENT_CLOSED,			"ZMQ_EVENT_CLOSED"},
	{ZMQ_EVENT_CLOSE_FAILED,	"ZMQ_EVENT_CLOSE_FAILED"},
	{ZMQ_EVENT_DISCONNECTED,	"ZMQ_EVENT_DISCONNECTED"},
	{ZMQ_EVENT_MONITOR_STOPPED, "ZMQ_EVENT_MONITOR_STOPPED"},
	{ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL, "ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL"},
	{ZMQ_EVENT_HANDSHAKE_SUCCEEDED, "ZMQ_EVENT_HANDSHAKE_SUCCEEDED"},
	{ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL, "ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL"},
	{ZMQ_EVENT_HANDSHAKE_FAILED_AUTH, "ZMQ_EVENT_HANDSHAKE_FAILED_AUTH"}
};
*/
static ZMQ_EVENT_TABLE zmq_event_table[] = {
	ZMQ_EVENT_DEF(ZMQ_EVENT_CONNECTED),
	ZMQ_EVENT_DEF(ZMQ_EVENT_CONNECT_DELAYED),
	ZMQ_EVENT_DEF(ZMQ_EVENT_CONNECT_RETRIED),
	ZMQ_EVENT_DEF(ZMQ_EVENT_LISTENING),
	ZMQ_EVENT_DEF(ZMQ_EVENT_BIND_FAILED),
	ZMQ_EVENT_DEF(ZMQ_EVENT_ACCEPTED),
	ZMQ_EVENT_DEF(ZMQ_EVENT_ACCEPT_FAILED),
	ZMQ_EVENT_DEF(ZMQ_EVENT_CLOSED),
	ZMQ_EVENT_DEF(ZMQ_EVENT_CLOSE_FAILED),
	ZMQ_EVENT_DEF(ZMQ_EVENT_DISCONNECTED),
	ZMQ_EVENT_DEF(ZMQ_EVENT_MONITOR_STOPPED),
	ZMQ_EVENT_DEF(ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL),
	ZMQ_EVENT_DEF(ZMQ_EVENT_HANDSHAKE_SUCCEEDED),
	ZMQ_EVENT_DEF(ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL),
	ZMQ_EVENT_DEF(ZMQ_EVENT_HANDSHAKE_FAILED_AUTH)
};


char* zmq_strevent(int event)
{
	int idx;

	for (idx = 0; idx < sizeof(zmq_event_table) / sizeof(ZMQ_EVENT_TABLE); idx++) {
		if (event & zmq_event_table[idx].event) {
			return zmq_event_table[idx].desc;
		}
	}

	return "unknown event";
}

static void rep_socket_monitor(void* ctx)
{
	int value;
	char addr[2048];
	int rc;

	printf("starting monitor...\n");
	void* s = zmq_socket(ctx, ZMQ_PAIR);
	assert(s);
	rc = zmq_connect(s, "inproc://monitor.rep");
	assert(rc == 0);
	rc = 1;
	while (rc != -1) {
		rc = get_monitor_event(s, &value, addr);
		if (rc != -1) {
			printf("event : %d,%s, value %d @ addr %s\r\n", rc, zmq_strevent(rc), value, addr);
			continue;
		}
		printf("errno : %s\r\n", zmq_strerror(errno));
	}
	zmq_close(s);
}

void dump_msg(const void* data, int size)
{
	unsigned char* ptr = (unsigned char*)data;
	printf("[%03d] ", size);
	int i = 0;
	for (i = 0; i < size; i++)
		printf("%02X", ptr[i]);
	printf("\n");
}

static void wait_thread(void* socket)
{
	int more;

	while (1) {
		zmq_msg_t msg;
		zmq_msg_init(&msg);
		
		if (zmq_msg_recv(&msg, socket, 0) == -1)
			return;
		more = zmq_msg_more(&msg);

		uint8_t* data = (uint8_t*)zmq_msg_data(&msg);
		size_t size = zmq_msg_size(&msg);

		dump_msg(data, size);

		zmq_msg_send(&msg, socket, 0);
		zmq_sleep(1);
	}
}

int main()
{
	const char* addr = "tcp://127.0.0.1:6666";

	void* ctx = zmq_init(1);
	assert(ctx);

	void* rep = zmq_socket(ctx, ZMQ_REP);
	assert(rep);

	int rc = zmq_socket_monitor(rep, "inproc://monitor.rep", ZMQ_EVENT_ALL);
	assert(rc == 0);

	zmq_threadstart(rep_socket_monitor, ctx);
	assert(rc == 0);
	rc = zmq_bind(rep, addr);
	assert(rc == 0);

	// Allow some time for event detection
	//zmq_sleep(1);
	wait_thread(rep);

	rc = zmq_close(rep);
	assert(rc == 0);
	zmq_term(ctx);
	return 0;
}
starting monitor...
event : 8,ZMQ_EVENT_LISTENING, value 420 @ addr tcp://127.0.0.1:6666
event : 32,ZMQ_EVENT_ACCEPTED, value 424 @ addr tcp://127.0.0.1:6666
event : 4096,ZMQ_EVENT_HANDSHAKE_SUCCEEDED, value 0 @ addr tcp://127.0.0.1:6666
event : 512,ZMQ_EVENT_DISCONNECTED, value 424 @ addr tcp://127.0.0.1:6666
event : 32,ZMQ_EVENT_ACCEPTED, value 436 @ addr tcp://127.0.0.1:6666
event : 4096,ZMQ_EVENT_HANDSHAKE_SUCCEEDED, value 0 @ addr tcp://127.0.0.1:6666
[001] 61
[005] 6262626262
[007] 73616466617366
[031] 61736661736666666666666666666666666666666666666666666666666666
[026] 6161616161616161616161616161616161616161616161616161
[004] 6D6D6D6D
[010] 61736466617366617366
[005] 6166617364
[010] 69206C6F766520796F75
event : 512,ZMQ_EVENT_DISCONNECTED, value 436 @ addr tcp://127.0.0.1:6666
event : 32,ZMQ_EVENT_ACCEPTED, value 436 @ addr tcp://127.0.0.1:6666
event : 4096,ZMQ_EVENT_HANDSHAKE_SUCCEEDED, value 0 @ addr tcp://127.0.0.1:6666
[006] 313233343536
[008] 6661736466617366
[011] 6173666173646661736664
[010] 67617366617366646173
[008] 6173646661736661
[009] 617364666173666173
[009] 616661647364666173
import zmq, sys

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:6666")
while True:
    data = input("input your data:")
    if data == 'q':
        sys.exit()
    socket.send_string(data)
    response = socket.recv_string()
    print("recv : ", response)
D:\source\python\pikatest\Scripts\python.exe C:/Users/Administrator/PycharmProjects/pythonProject/zmq-1/zmq_client1.py
input your data:123456
recv :  123456
input your data:fasdfasf
recv :  fasdfasf
input your data:asfasdfasfd
recv :  asfasdfasfd
input your data:gasfasfdas
recv :  gasfasfdas
input your data:asdfasfa
recv :  asdfasfa
input your data:asdfasfas
recv :  asdfasfas
input your data:afadsdfas
recv :  afadsdfas

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-04-22 19:15:05  更:2022-04-22 19:15:30 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/26 3:22:23-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码