[root@han node]# cat pubrepnode.py
import zmq
import time
a = 0
def rep(node):
global a
print('------------')
time.sleep(1)
a = a+1
node.pub_socket.send_string("%s %s" % ('onu/online', str(a)))
class PubRepNode(object):
def __init__(self, publish_address, reply_address):
self.context = zmq.Context()
self.pub_socket = self.context.socket(zmq.PUB)
self.pub_socket.bind(publish_address)
self.rep_socket = self.context.socket(zmq.REP)
self.rep_socket.bind(reply_address)
def publish(self, topic, content):
self.pub_socket.send_string("%i %s" % (topic, content))
def start(self, rep_func):
while True:
rep_func(self)
def publish(self, topic, data):
pass
#socket.send_json("%i %i %i" % ('onu/online/001122334455', data))
node = PubRepNode("tcp://*:55566", "tcp://*:55567")
node.start(rep)
[root@han node]# cat subreqnode.py
import zmq
import time
def subscribe_callback(node):
string = node.sub_socket.recv_string()
print('{}'.format(string))
class SubReqNode(object):
def __init__(self, subscribe_address, request_address):
self.context = zmq.Context()
self.sub_socket = self.context.socket(zmq.SUB)
self.sub_socket.connect(subscribe_address)
self.req_socket = self.context.socket(zmq.REQ)
self.req_socket.connect(request_address)
def send_req(self, topic, content):
self.pub_socket.send_string("%i %s" % (topic, content))
def start(self, subsribe_callback):
while True:
subscribe_callback(self)
def subscribe(self, topic):
zip_filter = topic
zip_filter = zip_filter.decode('ascii')
self.sub_socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter)
pass
node = SubReqNode("tcp://127.0.0.1:55566", "tcp://127.0.0.1:55567")
node.subscribe('onu/online')
node.start(subscribe_callback)
[root@han node]#
|