概念:
MQTT(消息遥测传输) 是ISO【国际标准化组织】标准下基于 **发布/订阅** 范式的**消息协议**。它是工作在 TCP/IP协议簇上的,是为**硬件性能低**下的**远程设备**以及网络状况糟糕的情况下而设计的 发布/订阅型 消息协议。为此,需要一个消息中间件。
? MQTT是一个基于 客户端-服务器 的消息发布/订阅的传输协议。 协议是轻量,简单,开放和易于实现的。优点在于,可以以极少的代码和有限的宽带,为连接远程设备提供实时可靠的消息服务。
特性:
? 1、使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合; ? 2、对负载内容屏蔽的消息传输; ? 3、使用 TCP/IP 提供网络连接; ? 4、有三种消息发布服务质量:
运行机制:
? 实现MQTT协议的通讯,需要服务器与客户端来完成,而在实际过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息代理是服务器,消息的发布者和订阅者都是客户端,并且消息发布者可以同时是订阅者。
MQTT传输的消息分为: 主题(Topic)和负载(payload)? 两部分:
? (1)Topic,主题,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload)
? (2)payload,负载,可以理解为消息的内容,是指订阅者具体要使用的内容
MQTT是一种简单、稳定、开放、轻量级易于实现的消息协议,在物联网的应用下的信息采集,工业控制,智能家居等方面具有广泛的适用性。
MQTT主要应用领域 MQTT协议广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等领域。 如:物联网M2M通信,物联网大数据采集 ? Android消息推送,WEB消息推送 ? 移动即时消息,例如Facebook Messenger ? 智能硬件、智能家具、智能电器 ? 车联网通信,电动车站桩采集 ? 智慧城市、远程医疗、远程教育 ? 电力、石油与能源等行业市场
例子:
新建一个 mqtt_public.py文件 【发布者】
import paho.mqtt.client as mqtt
import json
import time
from main import Main
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc)) #rc的值很重要,为0代表连接成功。
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe("$SYS/#") #订阅$SYS/下的所有主题
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
client = mqtt.Client("ADADWWRWFGWERWRWFGTERTER-PUB")
client.username_pw_set("WeightUpdate-PUB", password="1qaz#EDC")
client.on_connect = on_connect #连接broker时broker响应的回调
client.on_message = on_message #接收到订阅消息时的回调
client.connect("192.168.10.200", 1883, 190) #连接到broker
print(client.is_connected())
for i in range(1000):
# msg = Main().up_weight()
# print(msg,"msg")
client.publish(f"aicp/sound/soundDevice/new/report/", qos=0, payload=json.loads("kkkkkkkkkkkkk"))
time.sleep(1)
新建一个 mqtt_sub.py文件 【订阅者】
import paho.mqtt.client as mqtt
import json
class EstimationWeightUpdate(object):
def __init__(self):
self.init_mqtt()
# The callback for when the client receives a CONNACK response from the server.
def on_connect(self, client, userdata, flags, rc):
print("Connected with result code "+str(rc)) #rc的值很重要,为0代表连接成功。
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
self.client.subscribe("$SYS/#") #订阅$SYS/下的所有主题
# The callback for when a PUBLISH message is received from the server.
def on_message(self, client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
def init_mqtt(self):
self.client = mqtt.Client("ADADWWRWFGWERWRWFGTERTER-SUB")
self.client.username_pw_set("WeightUpdate-SUB", password="1qaz#EDC")
self.client.on_connect = self.on_connect #连接broker时broker响应的回调
self.client.on_message = self.on_message #接收到订阅消息时的回调
def subscribe_update_topic(self):
self.client.connect("192.168.10.200", 1883, 60) #连接到broker
self.client.subscribe("aicp/sound/soundDevice/new/report/", 0)
self.client.loop_forever()
if __name__ == "__main__":
ew = EstimationWeightUpdate()
ew.subscribe_update_topic()
|