#include <stdio.h>
#include <zmq.h>
#include <string.h>
#include <assert.h>
#include <malloc.h>
//http://api.zeromq.org/4-2:zmq-socket-monitor
static int get_monitor_event(void* monitor, int* value, char* address)
{
// First frame in message contains event number and value
zmq_msg_t msg;
zmq_msg_init(&msg);
if (zmq_msg_recv(&msg, monitor, 0) == -1)
return -1; // Interrupted, presumably
assert(zmq_msg_more(&msg));
uint8_t* data = (uint8_t*)zmq_msg_data(&msg);
uint16_t event = *(uint16_t*)(data);
if (value)
*value = *(uint32_t*)(data + 2);
zmq_msg_close(&msg);
// Second frame in message contains event address
zmq_msg_init(&msg);
if (zmq_msg_recv(&msg, monitor, 0) == -1)
return -1; // Interrupted, presumably
assert(!zmq_msg_more(&msg));
if (address) {
uint8_t* data = (uint8_t*)zmq_msg_data(&msg);
size_t size = zmq_msg_size(&msg);
memcpy(address, data, size);
address[size] = 0;
}
zmq_msg_close(&msg);
return event;
}
typedef struct
{
int event;
char* desc;
}ZMQ_EVENT_TABLE;
#define ZMQ_EVENT_DEF(event)\
{event, #event}
/*
static ZMQ_EVENT_TABLE zmq_event_table[] = {
//{ZMQ_EVENT_CONNECTED, "ZMQ_EVENT_CONNECTED"},
ZMQ_EVENT_DEF(ZMQ_EVENT_CONNECTED),
{ZMQ_EVENT_CONNECT_DELAYED, "ZMQ_EVENT_CONNECT_DELAYED"},
{ZMQ_EVENT_CONNECT_RETRIED, "ZMQ_EVENT_CONNECT_RETRIED"},
{ZMQ_EVENT_LISTENING, "ZMQ_EVENT_LISTENING"},
{ZMQ_EVENT_BIND_FAILED, "ZMQ_EVENT_BIND_FAILED"},
{ZMQ_EVENT_ACCEPTED, "ZMQ_EVENT_ACCEPTED"},
{ZMQ_EVENT_ACCEPT_FAILED, "ZMQ_EVENT_ACCEPT_FAILED"},
{ZMQ_EVENT_CLOSED, "ZMQ_EVENT_CLOSED"},
{ZMQ_EVENT_CLOSE_FAILED, "ZMQ_EVENT_CLOSE_FAILED"},
{ZMQ_EVENT_DISCONNECTED, "ZMQ_EVENT_DISCONNECTED"},
{ZMQ_EVENT_MONITOR_STOPPED, "ZMQ_EVENT_MONITOR_STOPPED"},
{ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL, "ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL"},
{ZMQ_EVENT_HANDSHAKE_SUCCEEDED, "ZMQ_EVENT_HANDSHAKE_SUCCEEDED"},
{ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL, "ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL"},
{ZMQ_EVENT_HANDSHAKE_FAILED_AUTH, "ZMQ_EVENT_HANDSHAKE_FAILED_AUTH"}
};
*/
static ZMQ_EVENT_TABLE zmq_event_table[] = {
ZMQ_EVENT_DEF(ZMQ_EVENT_CONNECTED),
ZMQ_EVENT_DEF(ZMQ_EVENT_CONNECT_DELAYED),
ZMQ_EVENT_DEF(ZMQ_EVENT_CONNECT_RETRIED),
ZMQ_EVENT_DEF(ZMQ_EVENT_LISTENING),
ZMQ_EVENT_DEF(ZMQ_EVENT_BIND_FAILED),
ZMQ_EVENT_DEF(ZMQ_EVENT_ACCEPTED),
ZMQ_EVENT_DEF(ZMQ_EVENT_ACCEPT_FAILED),
ZMQ_EVENT_DEF(ZMQ_EVENT_CLOSED),
ZMQ_EVENT_DEF(ZMQ_EVENT_CLOSE_FAILED),
ZMQ_EVENT_DEF(ZMQ_EVENT_DISCONNECTED),
ZMQ_EVENT_DEF(ZMQ_EVENT_MONITOR_STOPPED),
ZMQ_EVENT_DEF(ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL),
ZMQ_EVENT_DEF(ZMQ_EVENT_HANDSHAKE_SUCCEEDED),
ZMQ_EVENT_DEF(ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL),
ZMQ_EVENT_DEF(ZMQ_EVENT_HANDSHAKE_FAILED_AUTH)
};
char* zmq_strevent(int event)
{
int idx;
for (idx = 0; idx < sizeof(zmq_event_table) / sizeof(ZMQ_EVENT_TABLE); idx++) {
if (event & zmq_event_table[idx].event) {
return zmq_event_table[idx].desc;
}
}
return "unknown event";
}
static void rep_socket_monitor(void* ctx)
{
int value;
char addr[2048];
int rc;
printf("starting monitor...\n");
void* s = zmq_socket(ctx, ZMQ_PAIR);
assert(s);
rc = zmq_connect(s, "inproc://monitor.rep");
assert(rc == 0);
rc = 1;
while (rc != -1) {
rc = get_monitor_event(s, &value, addr);
if (rc != -1) {
printf("event : %d,%s, value %d @ addr %s\r\n", rc, zmq_strevent(rc), value, addr);
continue;
}
printf("errno : %s\r\n", zmq_strerror(errno));
}
zmq_close(s);
}
void dump_msg(const void* data, int size)
{
unsigned char* ptr = (unsigned char*)data;
printf("[%03d] ", size);
int i = 0;
for (i = 0; i < size; i++)
printf("%02X", ptr[i]);
printf("\n");
}
static void wait_thread(void* socket)
{
int more;
while (1) {
zmq_msg_t msg;
zmq_msg_init(&msg);
if (zmq_msg_recv(&msg, socket, 0) == -1)
return;
more = zmq_msg_more(&msg);
uint8_t* data = (uint8_t*)zmq_msg_data(&msg);
size_t size = zmq_msg_size(&msg);
dump_msg(data, size);
zmq_msg_send(&msg, socket, 0);
zmq_sleep(1);
}
}
int main()
{
const char* addr = "tcp://127.0.0.1:6666";
void* ctx = zmq_init(1);
assert(ctx);
void* rep = zmq_socket(ctx, ZMQ_REP);
assert(rep);
int rc = zmq_socket_monitor(rep, "inproc://monitor.rep", ZMQ_EVENT_ALL);
assert(rc == 0);
zmq_threadstart(rep_socket_monitor, ctx);
assert(rc == 0);
rc = zmq_bind(rep, addr);
assert(rc == 0);
// Allow some time for event detection
//zmq_sleep(1);
wait_thread(rep);
rc = zmq_close(rep);
assert(rc == 0);
zmq_term(ctx);
return 0;
}
starting monitor...
event : 8,ZMQ_EVENT_LISTENING, value 420 @ addr tcp://127.0.0.1:6666
event : 32,ZMQ_EVENT_ACCEPTED, value 424 @ addr tcp://127.0.0.1:6666
event : 4096,ZMQ_EVENT_HANDSHAKE_SUCCEEDED, value 0 @ addr tcp://127.0.0.1:6666
event : 512,ZMQ_EVENT_DISCONNECTED, value 424 @ addr tcp://127.0.0.1:6666
event : 32,ZMQ_EVENT_ACCEPTED, value 436 @ addr tcp://127.0.0.1:6666
event : 4096,ZMQ_EVENT_HANDSHAKE_SUCCEEDED, value 0 @ addr tcp://127.0.0.1:6666
[001] 61
[005] 6262626262
[007] 73616466617366
[031] 61736661736666666666666666666666666666666666666666666666666666
[026] 6161616161616161616161616161616161616161616161616161
[004] 6D6D6D6D
[010] 61736466617366617366
[005] 6166617364
[010] 69206C6F766520796F75
event : 512,ZMQ_EVENT_DISCONNECTED, value 436 @ addr tcp://127.0.0.1:6666
event : 32,ZMQ_EVENT_ACCEPTED, value 436 @ addr tcp://127.0.0.1:6666
event : 4096,ZMQ_EVENT_HANDSHAKE_SUCCEEDED, value 0 @ addr tcp://127.0.0.1:6666
[006] 313233343536
[008] 6661736466617366
[011] 6173666173646661736664
[010] 67617366617366646173
[008] 6173646661736661
[009] 617364666173666173
[009] 616661647364666173
import zmq, sys
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:6666")
while True:
data = input("input your data:")
if data == 'q':
sys.exit()
socket.send_string(data)
response = socket.recv_string()
print("recv : ", response)
D:\source\python\pikatest\Scripts\python.exe C:/Users/Administrator/PycharmProjects/pythonProject/zmq-1/zmq_client1.py
input your data:123456
recv : 123456
input your data:fasdfasf
recv : fasdfasf
input your data:asfasdfasfd
recv : asfasdfasfd
input your data:gasfasfdas
recv : gasfasfdas
input your data:asdfasfa
recv : asdfasfa
input your data:asdfasfas
recv : asdfasfas
input your data:afadsdfas
recv : afadsdfas
|