python 使用MQTT协议接入电信AEP(AIOT)平台示例
电信物联网新平台AEP(Aiot)官网:https://www.ctwing.cn/ 天翼物联网平台(AIoT)通用组件服务支撑7群:569682371
注册
-
通过官网右上角单点注册 -
注册完成登录并订购使能服务 ? -
免费订购并开通使用 ? ?
产品创建
- 订购完成会立即跳转到设备管理页面或单点右上角控制台进入,点击创建产品
? - 随意创建一个MQTT的物模型非透传产品即可
- 产品名称:自定义
- 产品分类:智慧城市-环境感知-智能空调 (选择与自身产品设备相近似的物模型类型,因为创建完成后除了默认的标准属性和服务,可属性和服务自定义添加使用。)
- 接入方式:设备直连
- 网络类型:移动蜂窝或WiFi都行
- 通信协议:MQTT
- 数据加密方式:明文 (通常都是明文)
- 认证方式:特征串认证
- 安全类型:一型一密 (建议一型一密)
- 设备型号:自定义
- 是否透传:否 (本篇演示物模型非透传)
- 消息格式:JSON
? ?
设备创建
- 点击确定创建完成,添加一个测试设备
- 设备名称:自定义
- 设备编号:自定义 如有真实设备以实际为准
- 设备目前是已注册状态,通过设备右侧的快捷按钮认证信息查看到设备特征串,即MQTT连接时的所需passWord
? ?
激活设备
- 电信AEP官网MQTT文档:https://www.ctwing.cn/sbjr/42#see
2. 通过控制台页面获取到设备ID以及特征串
-
设备ID(client_id):
-
特征串 (passWord):
-
q1LpBvswC6qDi6lu9F7W6PpqxQdyBDrYSfv4rGCjcTU
? ?
订阅消息
- 通过控制台页面查看服务定义-服务列表
- 本篇只演示数据上报和指令收到后的指令下发响应操作
服务标识 | 服务名称 | 服务类型 |
---|
data_report | 业务数据上报 | 数据上报 | set_run_resp | 设定运行状态响应 | 指令下发响应 |
? ?
python MQTT程序代码块
# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt
import json
import time
HOST = "mqtt.ctwing.cn" # 接入地址
PORT = 1883 # 端口号
client_id = "1514560220220217" # 设备ID
userName = "admin" # 用户名-自定义
passWord = "q1LpBvswC6qDi6lu9F7W6PpqxQdyBDrYSfv4rGCjcTU" # 特征串
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
# client.subscribe("set_run") # 订阅消息,可有可无
def on_message(client, userdata, msg):
# 接收指令下发数据
print("主题:" + msg.topic + " 消息:" + str(msg.payload.decode('utf-8')))
def on_subscribe(client, userdata, mid, granted_qos):
print("On Subscribed: qos = %d" % granted_qos)
def on_disconnect(client, userdata, rc):
if rc != 0:
print("Unexpected disconnection %s" % rc)
client = mqtt.Client(client_id)
client.username_pw_set(userName, passWord)
client.on_subscribe = on_subscribe
client.on_disconnect = on_disconnect
client.connect(HOST, PORT, 60)
client.loop_start()
while True:
client.on_connect = on_connect
client.on_message = on_message
time.sleep(2)
inputData = input('\033[1;31;40m'' 发送数据:''\033[0m')
param = json.dumps(json.loads(inputData)) # string转字典dict[str,int]
client.publish("data_report", payload=param, qos=0) # 发送消息
? ?
数据上报
- 通过服务定义-服务列表-查看详情编写上报数据的key/value JSON数据
{"temperature":16,"onoff_state":0,"mode":0,"fan_up_down":0,"fan_left_right":0,"current_speed":0}
?
-
程序启动发送注册报文到平台成功后,设备状态已激活。
- 离线灰点:设备主动离线或者无心跳则判断离线
- 在线绿点:表示设备连接在线中···
? -
通过数据查看按钮,查看刚才上报成功的数据 ? ?
指令下发
-
通过设备操作-按钮中的指令下发实现快速模拟下发功能 ? ? -
然后通过指令下发日志操作按钮,实时查看指令下发状态
? ?
AEP平台各种指令状态介绍
平台各种指令状态: 状态信息共分为:指令已保存(指令已到达平台但未下发到设备)、 指令已发送(指令下发过程中,平台未收到设备返回的ack确认)、 指令TTL超时(指令缓存时长超过命令设置的ttl值)、 指令已送达(指令已到达设备,且平台收到设备返回的ack确认)、 指令已完成 (指令下发到设备,且平台收到了设备主动回复的响应)、 指令下发超时 (指令下发到设备,平台超过既定时间未收到设备的ACK确认)、 指令失败(指令下发失败或设备返回执行结果为失败)。 其中,对于TCP透传设备,无法到达指令已送达/已完成状态,指令的最终状态即为指令已发送;对于其他协议透传设备,无法到达指令已完成状态,指令的最终状态即为指令已送达。
? ?
指令下发响应
- 这里我们把发送消息主题改成:set_run_resp 响应主题
client.publish("set_run_resp", payload=param, qos=0)
- 平台指令成功下发到MQTT程序控制台展示
{"temperature":22,"onoff_state":1,"mode":1,"fan_up_down":1,"fan_left_right":1,"current_speed":1}}
- 通过编译器控制台发送指令响应回复
? - 响应内容需携带指令消息的taskId,进行响应回复
{"taskId": 2, "resultPayload": {"result": 0}}`
-
通过指令下发日志查看指令状态是否已完成 ? ? -
点击指令详情查看是否有**返回内容 ** ? ? ? ?
通过线程实现同时收发数据并自动响应回复
import threading
import paho.mqtt.client as mqtt
import json
import time
HOST = "mqtt.ctwing.cn"
PORT = 1883
client_id = "1514560220220217"
userName = "admin"
passWord = "q1LpBvswC6qDi6lu9F7W6PpqxQdyBDrYSfv4rGCjcTU"
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
List = []
def on_message(client, userdata, msg):
print("\n 主题:" + msg.topic + " 消息:" + str(msg.payload))
taskId = json.loads(msg.payload)["taskId"]
temperature = json.loads(msg.payload)["payload"]["temperature"]
Command = {"taskId": int(taskId), "resultPayload": {"result": 0}}
List.append(json.dumps(Command))
"""
指令接收示例;
主题:set_run 消息:{"taskId":7,"payload":{"temperature":22,"onoff_state":0,"mode":1,"fan_up_down":0,"fan_left_right":1,"current_speed":0}}
"""
def on_subscribe(client, userdata, mid, granted_qos):
print("On Subscribed: qos = %d" % granted_qos)
def on_disconnect(client, userdata, rc):
if rc != 0:
print("Unexpected disconnection %s" % rc)
client = mqtt.Client(client_id)
client.username_pw_set(userName, passWord)
client.on_connect = on_connect
client.on_message = on_message
client.on_subscribe = on_subscribe
client.on_disconnect = on_disconnect
print("开始连接MQTT:", HOST + ':' + str(PORT))
client.connect(HOST, PORT, 60)
print("MQTT连接完成", HOST + ':' + str(PORT))
client.loop_start()
"""
loop()是一个心跳函数,用来保持客户端与服务器的连接。
比如keepalive参数为60秒,那么60秒内必须loop()一下或者发布一下消息,不然连接会断,就无法继续发布或者接受消息。
loop_start()是启用一个进程保持loop()的重复调用,就不需要定期心跳了,对应的有loop_stop()。
loop_forever()用来保持无穷阻塞调用loop()
"""
BreakThread = ['']
def command(client):
while True:
if BreakThread[0] == 'exit':
break
elif len(List) >= 1:
paramCommand = List.pop()
client.publish("set_run_resp", payload=paramCommand, qos=1)
print('\033[1;39;33m', paramCommand, '响应回复成功''\033[0m')
List.clear()
def publish(client):
while True:
time.sleep(5)
inputData = input('\033[1;31;40m'' 发送数据:''\033[0m')
BreakThread[0] = inputData
if inputData == 'exit':
break
else:
param = json.dumps(json.loads(inputData))
client.publish("data_report", payload=param, qos=0)
print('\033[1;39;33m', '数据发送成功''\033[0m')
"""
发送数据示例:
{"temperature":16,"onoff_state":0,"mode":0,"fan_up_down":0,"fan_left_right":0,"current_speed":0}
"""
def main():
Threading1 = threading.Thread(target=command, args=(client,))
Threading2 = threading.Thread(target=publish, args=(client,))
Threading1.start()
Threading2.start()
Threading1.join()
Threading2.join()
if __name__ == '__main__':
main()
- 平台有技术支撑群,如果文章有其他不明白可以群里咨询管理员-QQ群支撑群:569682371
- 能更快的帮助你加速开发进程
|