zmq的消息模式
- PUB[server] - SUB[client]
- REP[server] - REQ[client]
- REQ - ROUTER
- DEALER - REP
- DEALER - ROUTER
- DEALER - DEALER
- ROUTER - ROUTER
- PUSH[server] - PULL[client]
- PAIR - PAIR
zmq的api
- 创建和销毁套接字:zmq_socket(), zmq_close()
- 配置和读取套接字选项:zmq_setsockopt(), zmq_getsockopt()
- 为套接字建立连接:zmq_bind()(server端), zmq_connect()(client端)
- 发送和接收消息:zmq_send(), zmq_recv()
不同语言的库
libzmq,zmq的核心代码 cppzmq, 该库只有一个头文件。 Pyzmq,zmq的python库
git clone https://github.com/zeromq/libzmq.git
cd libzmq && mkdir build
cmake -D WITH_PERF_TOOL=OFF -D ZMQ_BUILD_TESTS=OFF \
-D ENABLE_CPACK=OFF -D CMAKE_BUILD_TYPE=Release
make -j3
更多编译选项可以参考链接
内存布局
socket.send ("Hello")
对应的布局: 长度+内容
REP&REQ例子
#include <zmq.hpp>
#include <string>
#include <iostream>
#include<zhelpers.hpp>
zmq::context_t context (1);
zmq::socket_t socket (context, ZMQ_REQ);
std::cout << "Connecting to hello world server..." << std::endl;
socket.connect ("tcp://localhost:5555");
for (int request_nbr = 0; request_nbr != 10; request_nbr++) {
s_send (socket, "hello");
std::cout << "Client1 Received :" <<s_recv (socket)<< std::endl;
Sleep(1000);
}
return 0;
server:
#include <zmq.hpp>
#include <string>
#include <iostream>
#include<zhelpers.hpp>
zmq::context_t context (1);
zmq::socket_t socket (context, ZMQ_REQ);
std::cout << "Connecting to hello world server..." << std::endl;
socket.connect ("tcp://localhost:5555");
for (int request_nbr = 0; request_nbr != 10; request_nbr++) {
s_send (socket, "SB");
std::cout << "Client2 Received :" <<s_recv (socket)<< std::endl;
Sleep(1000);
}
import zmq
context = zmq.Context()
print("Connecting to hello world server…")
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
for request in range(10):
print("Sending request %s …" % request)
socket.send(b"Hello")
message = socket.recv()
print("Received reply %s [ %s ]" % (request, message))
server:
import time
import zmq
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
while True:
message = socket.recv()
print("Received request: %s" % message)
time.sleep(1)
socket.send(b"World")
pub&sub模式
cpp示例: pub:
zmq::context_t context(1);
zmq::socket_t publisher(context, ZMQ_PUB);
publisher.bind("tcp://*:5563");
while (1) {
s_sendmore (publisher, "A");
s_send (publisher, "We don't want to see this");
Sleep (100);
s_sendmore (publisher, "B");
s_send (publisher, "We would like to see this");
Sleep (100);
}
sub:
zmq::context_t context(1);
zmq::socket_t subscriber (context, ZMQ_SUB);
subscriber.connect("tcp://localhost:5563");
subscriber.setsockopt( ZMQ_SUBSCRIBE, "B", 1);
while (1) {
std::string address = s_recv (subscriber);
std::string contents = s_recv (subscriber);
std::cout << "订阅2:[" << address << "] " << contents << std::endl;
}
python示例:
import zmq
import sys
def start_sub():
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")
socket.setsockopt(zmq.SUBSCRIBE, b'2333')
for _ in range(10):
msg = socket.recv()
topic, data = msg.split()
print(f'topic:{topic}, data:{data}')
def start_pub():
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5555")
topic = 2333
idx = 0
while True:
msg = b'%d %d'% (topic, idx)
socket.send(msg)
idx += 1
if __name__ == '__main__':
if len(sys.argv) > 1:
print('starting sub')
start_sub()
else:
print('starting pub')
start_pub()
参考链接: https://www.cnblogs.com/ssss429170331/p/5559210.html https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/pyzmq/patterns/client_server.html https://zeromq.org/socket-api/ https://github.com/anjuke/zguide-cn
|