前言
pulsar和 rabbitmq都是消息队列。本文介绍了mq-fw这个包,使用这个包可用很少的代码就能实现pulsar和rabbitmq使用 Pulsar是一个企业级分布式消息系统,最初由雅虎在2016年开源,目前由 Apache 软件基金会管理。Pulsar 的单个实例原生支持多个集群,可跨机房在集群间无缝地完成消息复制。极低的发布延迟和端到端延迟。可无缝扩展到超过一百万个 topic。 RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、 安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
一、mq-fw 是什么?
mq-fw 是一个消息队列的框架,包含pulsar和 rabbitmq的消费和生产,还包含pulsar和 rabbitmq互相调用。 示例代码地址:https://gitee.com/maxbanana/mq-fw-examples
二、功能介绍
使用mq-fw能快速实现下列功能:
- pulsar 生产
- pulsar 消费
- pulsar 服务端
- pulsar 调用端
- rabbitmq 生产
- rabbitmq 消费
- rabbitmq 服务端
- rabbitmq 调用端
- pulsar与rabbitmq 互相调用
注意:rabbitmq默认是 topic模式,不能选择其他模式
三、安装
pip install mq-fw
四、示例
1. pulsar 生产
代码如下(示例):
"""
生产消息
"""
import pulsar_mq
import json
# pulsar服务地址
pulsar_url = 'pulsar://0.0.0.0:6650'
# 生产的topic
produce_topic = ''
"""
1. 连接pulsar
"""
client = pulsar_mq.client(pulsar_url)
"""
2. 创建生产者
"""
producer = client.create_producer(produce_topic)
msg = {"data": {"a": "1", "b": "2"}}
"""
3.发送消息
默认参数: _async=True, callback=None, random_topic=None
_async: 是否异步发送消息, True异步发送, Flase 同步发送
callback: 异步发送时的回调函数
random_topic: 随机topic
"""
producer.send(json.dumps(msg))
# 一次发多条消息
# producer.send([json.dumps(msg), json.dumps(msg2)])
2.pulsar消费
代码如下(示例):
"""
消费数据
"""
import pulsar_mq
# pulsar服务地址
pulsar_url = 'pulsar://0.0.0.0:6650'
# 消费者订阅的topic
consumer_topic = ''
# 消费者的名字
consumer_name = ''
"""
1. 连接pulsar
"""
client = pulsar_mq.client(pulsar_url)
"""
2. 创建消费者
默认参数 schema=pulsar.schema.StringSchema(), consumer_type='Shared'
'Shared': 共享模式
'Exclusive': 独占模式
'Failover': 灾备模式
'KeyShared': 关键字共享模式
"""
consumer = client.create_consumer(consumer_topic, consumer_name)
def task(msg):
"""
3. 回调函数
处理接收的消息
:param msg: 消费的消息
:return:
"""
print(msg)
"""
4.开始消费
一直监听进行消费
默认参数 thread_count=None, logger=None
若设置thread_count=5 程序将开启5个线程进行消费
logger 日志收集器
"""
consumer.receive(task)
# 只消费一个就停止监听,关闭消费者
# consumer.receive_one(task)
# 关闭消费者
# consumer.close()
# 取消订阅,并关闭消费者
# consumer.unsubscribe()
3.pulsar服务端
代码如下(示例):
"""
服务端
先消费消息,再生产消息
"""
import pulsar_mq
import json
client = pulsar_mq.client('pulsar://0.0.0.0:6650')
"""
默认使用随机队列模式
随机队列模式是:
消费的消息里带一个random_topic,
生产的消息网random_topic里发送
当传入参数 producer_topic,使producer_topic不等于None时,
生产的消息往 producer_topic里发送
"""
service = client.consume_produce(consumer_topic='', consumer_name='')
def task(msg):
"""
回调函数
:param msg:
:return:
"""
msg = json.loads(msg)
print(msg)
random_topic = msg.get('random_topic')
print(random_topic)
import time
time.sleep(6)
msg = {"data": {"a": "1", "b": "2"}}
return json.dumps(msg)
service.run(task)
4.pulsar调用端
代码如下(示例):
"""
调用端
先生产发送消息,然后消费消息
"""
import pulsar_mq
import json
client = pulsar_mq.client('pulsar://0.0.0.0:6650')
"""
默认使用随机队列模式
随机队列模式是:
当使用默认参数 consumer_topic=None, consumer_name=None时
pulsar_mq包自动生成一个random_topic,然后和消息一起发送出去,同时监听random_topic
服务端接收到消息后将本服务生产好的消息往random_topic发送。
当传入参数 consumer_topic、consumer_name时,不使用随机队列模式
"""
service = client.produce_consume(producer_topic='')
msg = {"data": {"a": "1", "b": "2"}}
msg2 = {"data": {"c": "3", "d": "4"}}
msg_list = [json.dumps(msg), json.dumps(msg2)]
data = service.run(msg_list)
print(data)
for d in data:
print(d)
# 只发送一个消息
# data = service.run(json.dumps(msg))
# print(data)
5.rabbitmq生产
代码如下(示例):
import rabbitmq
import json
"""
rabbitmq 使用topic模式,不能更改
"""
host = '0.0.0.0'
port = 5672
username = ''
password = ''
exchange = ''
routing_key = ''
# 连接rabbitmq
rq = rabbitmq.connect(host, port, username, password)
# 创建生产者
producer = rq.create_producer(exchange, routing_key)
msg = {"data": {"a": "1", "b": "2"}}
msg2 = {"data": {"c": "3", "d": "4"}}
msg_list = [json.dumps(msg), json.dumps(msg2)]
# 发送数据
producer.send(msg_list)
6.rabbitmq消费
代码如下(示例):
import rabbitmq
"""
rabbitmq 默认使用topic模式,不能更改
"""
host = '0.0.0.0'
port = 5672
username = ''
password = ''
exchange = ''
routing_key = ''
# 连接rabbitmq
rq = rabbitmq.connect(host, port, username, password)
# 创建消费者
consumer = rq.create_consumer(exchange, routing_key)
def task(msg):
import time
time.sleep(5)
print("接收 {} 成功.......".format(msg))
# 一直消费
consumer.receive(task)
# consume_num = 1 ,只消费一次, consume_num = n : 消费 n 次就停止消费
# consumer.receive(task, consume_num=1)
7.rabbitmq服务端
代码如下(示例):
import json
import rabbitmq
import time
"""
rabbitmq 默认使用topic模式,不能更改
该示例是 rabbitmq 服务端,先消费在生产
"""
host = '0.0.0.0'
port = 5672
username = ''
password = ''
consumer_exchange = ''
consumer_routing_key = ''
producer_exchange = consumer_exchange
producer_routing_key = ''
connect = rabbitmq.connect(host, port, username, password)
# 使用服务端, 推荐使用随机模式
# 随机模式:生产时使用random_exchange, random_routing_key
service = connect.consume_produce(consumer_exchange, consumer_routing_key, durable=True)
# 生产时 使用固定exchange, routing_key
# service = connect.consume_produce(consumer_exchange, consumer_routing_key,
# producer_exchange, producer_routing_key, durable=True)
def task(body):
print(body)
time.sleep(5)
return [json.dumps({'result': body})]
# 运行服务端
service.run(task, thread_count=2)
8.rabbitmq调用端
代码如下(示例):
import rabbitmq
import json
"""
rabbitmq 默认使用topic模式,不能更改
该示例是 rabbitmq 调用端,先生产在消费
"""
host = '0.0.0.0'
port = 5672
username = ''
password = ''
consumer_exchange = ''
consumer_routing_key = ''
producer_exchange = consumer_exchange
producer_routing_key = ''
connect = rabbitmq.connect(host, port, username, password)
# 使用调用端, 推荐使用随机模式
# 随机模式:消费时 使用random_exchange, random_routing_key
service = connect.produce_consume(producer_exchange, producer_routing_key, durable=True)
# 消费时 使用固定exchange, routing_key
# service = connect.produce_consume(producer_exchange, producer_routing_key, consumer_exchange, consumer_routing_key,
# durable=True)
msg_list = []
for i in range(5):
msg_list.append(json.dumps({'data': i}))
# 运行调用端
result = service.run(msg_list, thread_count=2)
for i in result:
print(i)
9.pulsar与rabbitmq 互相调用
代码如下(示例):
from loguru import logger
import RabbitmqPulsar
"""
Rabbitmq 与 Pulsar 互相连接
Rabbitmq 消费的消息发送至 pulsar
pulsar 消费的消息发送至 Rabbitmq
"""
# pulsar 配置
pulsar_url = ''
producer_topic = ''
consumer_topic = ''
consumer_name = ''
# rabbitmq 配置
host = ''
port = 5672
username = ''
password = ''
rb_mq_send_ex = ''
rb_mq_send_key = ''
rb_mq_cons_ex = ''
rb_mq_cons_key = ''
connect = RabbitmqPulsar.connect(host, port, username, password, pulsar_url)
"""
1.默认模式(消息从rabbitmq开始流转):inter_services() 里 默认参数是 start_with_rabbitmq=True, random_queue=True
1). 从 rabbitmq 订阅,将数据发送至 pulsar; 再从 pulsar 订阅,将数据发送至 rabbitmq
2). 使用使用随机队列来生产消息
3). 消息数据流向: rabbitmq调用端 --> 本互联服务(rabbitmq服务端,pulsar调用端)--> pulsar服务端
"""
# 使用默认模式
service = connect.inter_services()
# 运行服务
service.run(producer_topic, rb_mq_cons_ex, rb_mq_cons_key, durable=True, thread_count=5, logger=logger)
"""
2.消息从pulsar开始流转模式:inter_services() 里参数是 start_with_rabbitmq=False, random_queue=True
1). 从 pulsar 订阅,将数据发送至 rabbitmq; 再从 rabbitmq 订阅,将数据发送至 pulsar
2). random_queue=True, 使用使用随机队列来生产消息
3). 消息数据流向: pulsar调用端 --> 本互联服务(pulsar服务端,rabbitmq调用端)--> rabbitmq服务端
"""
# 使用从pulsar开始流转模式
# service = connect.inter_services(start_with_rabbitmq=False)
# 运行服务
# service.run(consumer_topic, consumer_name,
# rb_mq_send_ex, rb_mq_send_key, durable=True, thread_count=5, logger=logger)
"""
3.通用模式:inter_services() 里 参数是 random_queue=False
1). 从 pulsar 订阅,将数据发送至 rabbitmq; 再从 rabbitmq 订阅,将数据发送至 pulsar
2). random_queue=False, 不使用使用随机队列来生产消息
3). 消息数据流向, 两条独立的数据流同时运行
1. pulsar生产 --> 本互联服务(pulsar消费,rabbitmq生产)--> rabbitmq消费
2. rabbitmq生产 --> 本互联服务(rabbitmq消费,pulsar生产)--> pulsar消费
"""
# 使用从pulsar开始流转模式
# service = connect.inter_services(random_queue=False)
# 运行服务
# service.run(producer_topic, consumer_topic, consumer_name,
# rb_mq_send_ex, rb_mq_send_key, rb_mq_cons_ex, rb_mq_cons_key, durable=True, logger=logger)
五、pulsar与rabbitmq 互相调用(进阶)
上一步(9.pulsar与rabbitmq 互相调用)中的示例是将rabbitmq的消息原封不动的发送到pulsar,也会将pulsar的消息原封不动的发送到rabbitmq。假如想对rabbitmq 或 pulsar消息进行处理该怎么办? 可以在service.run里添加参数rabbitmq_task和pulsar_task来实现 代码如下(示例):
"""
进阶使用
Rabbitmq 与 Pulsar 互相连接
"""
from loguru import logger
import RabbitmqPulsar
# pulsar 配置
pulsar_url = ''
producer_topic = ''
consumer_topic = ''
consumer_name = ''
# rabbitmq 配置
host = ''
port = 5672
username = ''
password = ''
rb_mq_send_ex = ''
rb_mq_send_key = ''
rb_mq_cons_ex = ''
rb_mq_cons_key = ''
connect = RabbitmqPulsar.connect(host, port, username, password, pulsar_url)
# 使用默认模式(消息从rabbitmq开始流转)
service = connect.inter_services()
def rabbitmq_task(msg):
"""
处理消费的rabbitmq消息,返回结果将发送至 pulsar
:param msg: rabbitmq消息
:return:
"""
print('rabbitmq消息:', msg)
return msg
def pulsar_task(msg):
"""
处理消费的pulsar消息,返回结果将发送至 rabbitmq
:param msg: pulsar消息
:return:
"""
print('pulsar消息:', msg)
return [msg]
# 运行服务
service.run(producer_topic, rb_mq_cons_ex, rb_mq_cons_key, rabbitmq_task=rabbitmq_task, pulsar_task=pulsar_task,
durable=True, thread_count=5, logger=logger)
总结
本文简单介绍了mq-fw的使用,欢迎留言交流学习,有不足之处还望指正,感谢!!
|