承接上篇博客null试了很久,发现是sleep()函数问题。我不太清楚是什么问题,就是程序运行一段时间就停止了,也不会报错,有大佬知道是为什么的可以说一下么,谢谢了。下面是我实现的MQTT订阅消息代码。import paho.mqtt.client as mqttfrom datetime import datetimeimport jsonimport timeimport pymysql as MySQLdbMQTTHOST = "192.168.31.67"MQTTPORT = 1883mqttClhttps://blog.csdn.net/huidong_zhu/article/details/121303819
多线程完美解决问题,下面是MQTT接收两个主题信息,处理信息并上传数据库,同时也解决了偶尔数据异常问题。
import threading
from datetime import datetime
import paho.mqtt.client as mqtt
import json
import time
import pymysql as MySQLdb
MQTTHOST = "192.168.31.67"
MQTTPORT = 1883
mqttClient1 = mqtt.Client()
mqttClient2 = mqtt.Client()
#global T_natine T_CO2 T_O2 CO_2 O_2 T P Table_name num
#记录时间和参数初始化
T_CO2 = datetime.now();
CO_2 = 0.00
T_O2 = datetime.now();
O_2 = 0.00
T = 0.00
P = 0.0
#定义表名
Table_name = "2021_11_13_end_gas3";
def on_EGO_mqtt_connect():
mqttClient2.connect(MQTTHOST, MQTTPORT, 60)
mqttClient2.loop_start()
def on_EGC_mqtt_connect():
mqttClient1.connect(MQTTHOST, MQTTPORT, 60)
mqttClient1.loop_start()
# 消息处理函数
def on_message_come(lient, userdata, msg):
global T_CO2
global T_O2
global CO_2
global O_2
global T
global P
T_native = datetime.now();
s = json.loads(msg.payload.decode("utf-8", errors='ignore'));
if ((T_native-T_CO2).seconds < 70 ) :
if ("尾碳" in s) :
CO_2 = float(str((s["尾碳"])));
print("CO2 : %.2f %%" %CO_2);
T_CO2 = datetime.now();
else:
print("CO2 electrode disconnection");
CO_2 = 0.00;
if (T_native-T_O2).seconds < 70:
if ("尾氧" in s) :
O_2 = float(str((s["尾氧"])));
print("O2 : %.2f %%" %O_2);
T_O2 = datetime.now();
T = float(str((s["尾温"])));
# print("T : %.2f " %T);
P = int(str((s["尾压"])))/10;
# print("P : %.2f kpa" %P);
else:
print("O2 electrode disconnection");
O_2 = 0.00;
T = 0.00;
P = 0.00;
# subscribe 消息订阅
def on_EGO_subscribe():
mqttClient2.subscribe("EGO", 0)
mqttClient2.on_message = on_message_come
# subscribe 消息订阅
def on_EGC_subscribe():
mqttClient1.subscribe("EGC", 0)
mqttClient1.on_message = on_message_come
def myTestFunc():
while True:
time.sleep(2);
T_native = datetime.now();
#上传数据
#连接数据库
global Table_name
db = MySQLdb.connect(host="192.168.31.67",user="root",password="123456",database="mydb") #,use_unicode=1, charset='utf8'
cursor = db.cursor()
sql = "INSERT INTO {Tablename}(time,CO2,O2,E_T,E_P) values(%s , %s, %s, %s, %s)".format(Tablename=Table_name) ;
str_native = datetime.now().strftime("%Y-%m-%d %H:%M:%S");
data = (str_native,CO_2,O_2,T,P);
cursor.execute(sql,data);
db.commit();
cursor.close();
db.close();
def thread_EGO_func():
on_EGO_mqtt_connect()
on_EGO_subscribe()
def thread_EGC_func():
on_EGC_mqtt_connect()
on_EGC_subscribe()
def many_thread():
build_MySQL_Table();
threads = [];
t = threading.Thread(target=myTestFunc);#主线程
threads.append(t);
t.setDaemon(True); # 给每个子线程添加守护线程
t = threading.Thread(target=thread_EGO_func);
threads.append(t);
t.setDaemon(True);
t = threading.Thread(target=thread_EGC_func);
threads.append(t);
t.setDaemon(True);
for t in threads: # 循环启动线程
t.start();
while True:
pass;
if __name__ == '__main__':
many_thread()
|