import time
import zmq
from tornado import ioloop
from zmq.eventloop import zmqstream
# from threading import Thread
loop = ioloop.IOLoop.current()
LOG_URL = 'tcp://127.0.0.1:12345'
idx = 0
def getcommand(msg):
global idx
print("Received control command: %s" % msg, idx)
idx += 1
def consume(url):
ctx = zmq.Context.instance()
s = ctx.socket(zmq.PULL)
s.connect(url)
stream_pull = zmqstream.ZMQStream(s)
stream_pull.on_recv(getcommand)
# while True:
# msg = s.recv() # return bytes
# # msg = s.recv_multipart() # return List
# # msg = s.recv_multipart(copy=False) # return zmq.Frame
# print(type(msg))
# print(msg)
# s.close()
consume(LOG_URL)
loop.start()
while True:
time.sleep(1)
import time
import zmq
from zmq.log.handlers import PUBHandler
import logging
from threading import Thread
LOG_URL = 'tcp://127.0.0.1:12345'
def produce(url):
ctx = zmq.Context.instance()
s = ctx.socket(zmq.PUSH)
s.bind(url)
idx = 0
while True:
ret = s.send_string("aaaaaaaaaaaaaaa") # return bytes
idx += 1
# msg = s.recv_multipart() # return List
# msg = s.recv_multipart(copy=False) # return zmq.Frame
print(idx)
print(ret)
s.close()
producer = Thread(target=produce, args=(LOG_URL,))
producer.start()
while True:
time.sleep(1)
|