demo测试
订阅者 hbmqtt_sub --url mqtt://mqtt.eclipseprojects.io:1883 -t /geektime/iot
发布者 hbmqtt_pub --url mqtt://mqtt.eclipseprojects.io:1883 -t /geektime/iot -m Hello,World!
tips
- MQTT 消息采用二进制的编码格式,而不是 HTTP 协议那样的文本的表述方式。
- Client 在重复发送一个主题的消息时,可以从第二次开始,将主题名长度设置为 0,这样 Broker 会自动按照上次的主题来处理消息。这种情况对传感器设备来说十分常见,所以这个特性在工作中很有实际意义。3 种 QoS 级别:可靠通信
- Qos的三个级别
订阅者代码
import threading
import paho.mqtt.client as mqtt
import time
HOST = "xxxxx" #emq服务器地址
PORT = 1883
class Mqtt_subscribe(threading.Thread):
"""
mqtt thread, 完成订阅功能
"""
def __init__(self, subtopic):
super(Mqtt_subscribe, self).__init__()
self.client_id = time.strftime(
'%Y%m%d%H%M%S', time.localtime(
time.time()))
self.client = mqtt.Client(self.client_id)
self.client.user_data_set(subtopic)
self.client.username_pw_set("admin", "public")
self.message = None
def run(self):
# ClientId不能重复,所以使用当前时间
# 必须设置,否则会返回「Connected with result code 4」
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.connect(HOST, PORT, 60)
self.client.loop_forever(timeout=60)
def on_connect(self, client, subtopic, flags, rc):
print("Connected with result code " + str(rc))
print("topic:" + subtopic)
client.subscribe(subtopic, 2)
def on_message(self, client, userdata, msg):
# print(msg.topic + " " + msg.payload.decode("utf-8"))
self.mess_age = msg.payload.decode("utf-8")
if 'test' in self.mess_age: #根据订阅内容判断,如果含有想要的值则会停止订阅
self.client.disconnect()
return self.mess_age
if __name__ == "__main__":
subtopic = "test"
t = Mqtt_subscribe(subtopic)
t.start()
发布者代码
import paho.mqtt.publish as publish
import time
def mqtt_Publish(topic,msg):
client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
publish.single(topic,msg,qos=0,
hostname="xxxx", #emq地址
port=1883,
client_id=client_id,
auth={'username': "admin", 'password': "public"}) #登录密码,官方是amdin,public
if __name__ == '__main__':
topic = 'test'
message = "this is a test"
mqtt_Publish(topic,message)
参考
|