效果图
mqtt发布
本代码中publish 是一个死循环,数据一直往外发送。
import random
import time
from paho.mqtt import client as mqtt_client
import json
from datetime import datetime
broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt/li"
client_id = f'python-mqtt-{random.randint(0, 1000)}'
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
def publish(client):
while True:
time.sleep(0.01)
msg = json.dumps({"MAC": "0123456789",
"samplerate": 12,
"sampletime": str(datetime.utcnow().strftime('%Y/%m/%d-%H:%M:%S.%f')[:-3]),
"battery": 0.5,
"acc": [
[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
]})
result = client.publish(topic, msg)
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
if __name__ == '__main__':
run()
mqtt订阅
from paho.mqtt import client as mqtt_client
import time
import os
broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt/li"
def connect_mqtt(client_id):
""" MQTT 连接函数。 """
def on_connect(client, userdata, flags, rc):
"""
连接回调函数
在客户端连接后被调用,在该函数中可以依据 rc 来判断客户端是否连接成功。
"""
if rc == 0:
print("Connected to MQTT Broker! return code %d" % rc)
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker , port)
return client
def subscribe(client: mqtt_client, a_topic):
""" 订阅消息 """
def on_message(client, userdata, msg):
"""
消息回调函数
在客户端从 MQTT Broker 收到消息后被调用,在该函数中我们将打印出订阅的 topic 名称以及接收到的消息内容。
* 这里可添加自定义数据处理程序
"""
print('From topic : %s\n\tmsg : %s' % (msg.topic, msg.payload.decode()))
client.subscribe(topic)
client.on_message = on_message
def run(client_id, topic):
client = connect_mqtt(client_id)
subscribe(client, topic)
client.loop_forever()
if __name__ == '__main__':
run('test_eartag-003-python-li', 'zk100/gw/#')
matplotlib绘制动态图
import matplotlib.pyplot as plt
import numpy as np
count = 100
ax = list(range(count))
ay = [0] * 100
bx = list(range(count))
by = [0] * 100
num = count
plt.ion()
plt.rcParams['figure.figsize'] = (10, 10)
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['lines.linewidth'] = 0.5
plt.tight_layout()
while True:
plt.clf()
plt.suptitle("总标题", fontsize=30)
g1 = np.random.random()
ax.append(num)
ay.append(g1)
agraphic = plt.subplot(2, 1, 1)
agraphic.set_title('子图表标题1')
agraphic.set_xlabel('x轴', fontsize=10)
agraphic.set_ylabel('y轴', fontsize=20)
plt.plot(ax[-count:], ay[-count:], 'g-')
bx.append(num)
by.append(g1)
bgraghic = plt.subplot(2, 1, 2)
bgraghic.set_title('子图表标题2')
bgraghic.plot(bx[-count:], by[-count:], 'r^')
plt.pause(0.001)
num = num + 1
matplotlib绘制mqtt数据实时图像
- 单线程
- 先启动mqtt订阅服务
mqtt订阅中有阻塞,更新数据后因订阅服务没有结束,导致绘图程序无法绘图 - 先启动绘图程序
绘图程序本身也是个循环,拿不到mqtt的实时数据,图像无法更新 - 两个服务加入协程,也不行。具体原因还不知道,容后补充。
- mqtt作为线程启动,可解决上述问题
import json
import random
from paho.mqtt import client as mqtt_client
import time
import datetime
from math import ceil, floor
import matplotlib.pyplot as plt
import _thread
broker = 'broker.emqx.io'
topic = "/python/mqtt/li"
port = 1883
client_id = f'python-mqtt-li-{random.randint(0, 100)}'
show_num = 300
x_num = [-1]
acc1 = []
acc2 = []
acc3 = []
acc4 = []
acc5 = []
acc6 = []
stime = []
"""mqtt subscribe topic"""
def str_microsecond_datetime2int_13timestamp(str_microsecond_datetime):
"""将字符串型【毫秒级】格式化时间 转为 【13位】整型时间戳"""
datetime_obj = datetime.datetime.strptime(str_microsecond_datetime, "%Y/%m/%d-%H:%M:%S.%f")
obj_stamp = int(time.mktime(datetime_obj.timetuple()) * 1000.0 + datetime_obj.microsecond / 1000.0) / 1000.0
return obj_stamp
def int2datetime(int_float_timestamp):
"""
有小数点:分离小数点,整数转为格式化时间,小数点直接跟在后面
无小数点:从第10位进行分离,
所以本函数只适用于时间戳整数位数大于9且小于11.
"""
if '.' in str(int_float_timestamp):
int_float = str(int_float_timestamp).split('.')
date = time.localtime(int(int_float[0]))
tempDate = time.strftime("%Y/%m/%d-%H:%M:%S", date)
secondafter = '.' + str(int_float[1])
return str(tempDate) + secondafter
def parse_mqttmsg(msg):
"""解析mqt头数据 MAC samplerate sampletime battery acc"""
content = json.loads(msg.payload.decode())
span = 1000 / content['samplerate'] * 10
time_span = [ceil(span) / 10 / 1000, floor(span) / 10 / 1000]
sampletime = content['sampletime']
sampletime_int = str_microsecond_datetime2int_13timestamp(sampletime)
acc = content['acc']
for i in range(len(acc)):
x_num.append(x_num[-1] + 1)
acc1.append(acc[i][0])
acc2.append(acc[i][1])
acc3.append(acc[i][2])
acc4.append(acc[i][3])
acc5.append(acc[i][4])
acc6.append(acc[i][5])
if i != 0:
sampletime_int += time_span[i % 2]
stime.append(int2datetime(round(sampletime_int * 1000, 0) / 1000))
else:
stime.append(sampletime)
print(x_num[-1], stime[-1], acc1[-1], acc2[-1], acc3[-1], acc4[-1], acc5[-1], acc6[-1])
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
pass
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
parse_mqttmsg(msg)
client.subscribe(topic)
client.on_message = on_message
def run():
client = connect_mqtt()
subscribe(client)
client.loop_forever()
""" draw figures """
def draw_figure():
plt.ion()
plt.rcParams['figure.figsize'] = (10, 10)
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['lines.linewidth'] = 0.5
count = 0
while True:
plt.clf()
plt.suptitle("总标题", fontsize=30)
plt.tight_layout()
agraphic = plt.subplot(2, 1, 1)
agraphic.set_title('子图表标题1')
agraphic.set_xlabel('x轴', fontsize=10)
agraphic.set_ylabel('y轴', fontsize=20)
plt.plot(x_num[1:][-show_num:], acc1[-show_num:], 'g-')
try:
xtricks = list(range(len(acc1) - show_num, len(acc1), 10))
xlabels = [stime[i] for i in xtricks]
plt.xticks(xtricks, xlabels, rotation=15)
except:
pass
bgraghic = plt.subplot(2, 1, 2)
bgraghic.set_title('子图表标题2')
bgraghic.set_xlabel('x轴', fontsize=10)
bgraghic.set_ylabel('y轴', fontsize=20)
bgraghic.plot(x_num[1:][-show_num:], acc2[-show_num:], 'r^')
plt.pause(0.001)
count = count + 1
if __name__ == '__main__':
_thread.start_new_thread(run, ())
draw_figure()
|