IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 嵌入式 -> python MQTT多线程接收与数据上传MySQL -> 正文阅读

[嵌入式]python MQTT多线程接收与数据上传MySQL

承接上篇博客null试了很久,发现是sleep()函数问题。我不太清楚是什么问题,就是程序运行一段时间就停止了,也不会报错,有大佬知道是为什么的可以说一下么,谢谢了。下面是我实现的MQTT订阅消息代码。import paho.mqtt.client as mqttfrom datetime import datetimeimport jsonimport timeimport pymysql as MySQLdbMQTTHOST = "192.168.31.67"MQTTPORT = 1883mqttClhttps://blog.csdn.net/huidong_zhu/article/details/121303819

多线程完美解决问题,下面是MQTT接收两个主题信息,处理信息并上传数据库,同时也解决了偶尔数据异常问题。

import threading
from datetime import datetime
import paho.mqtt.client as mqtt
import json
import time
import pymysql as MySQLdb

MQTTHOST = "192.168.31.67"
MQTTPORT = 1883
mqttClient1 = mqtt.Client()
mqttClient2 = mqtt.Client()

#global T_natine T_CO2 T_O2 CO_2 O_2 T P Table_name num 
#记录时间和参数初始化
T_CO2 = datetime.now();  
CO_2 = 0.00
T_O2 = datetime.now();
O_2 = 0.00
T = 0.00
P = 0.0

#定义表名
Table_name = "2021_11_13_end_gas3";


def on_EGO_mqtt_connect():
    mqttClient2.connect(MQTTHOST, MQTTPORT, 60)
    mqttClient2.loop_start()

def on_EGC_mqtt_connect():
    mqttClient1.connect(MQTTHOST, MQTTPORT, 60)
    mqttClient1.loop_start()

# 消息处理函数
def on_message_come(lient, userdata, msg):
    global T_CO2
    global T_O2
    global CO_2
    global O_2
    global T
    global P
    T_native = datetime.now();
    
    s = json.loads(msg.payload.decode("utf-8", errors='ignore'));
    if ((T_native-T_CO2).seconds < 70 ) :
        if ("尾碳" in s) :
            CO_2 = float(str((s["尾碳"])));
            print("CO2 : %.2f %%" %CO_2); 
            T_CO2 = datetime.now();  
    else:
        print("CO2 electrode disconnection");   
        CO_2 = 0.00; 

    if (T_native-T_O2).seconds < 70:
        if ("尾氧" in s) :
            O_2 = float(str((s["尾氧"])));
            print("O2 : %.2f %%" %O_2);
            T_O2 = datetime.now();
            T = float(str((s["尾温"])));
            # print("T : %.2f " %T);
            P = int(str((s["尾压"])))/10;
            # print("P : %.2f kpa" %P);
    else:
        print("O2 electrode disconnection");
        O_2 = 0.00;
        T = 0.00;
        P = 0.00;

# subscribe 消息订阅
def on_EGO_subscribe():
  mqttClient2.subscribe("EGO", 0) 
  mqttClient2.on_message = on_message_come 

# subscribe 消息订阅
def on_EGC_subscribe():
  mqttClient1.subscribe("EGC", 0) 
  mqttClient1.on_message = on_message_come 
    
def myTestFunc():
    while True:
        time.sleep(2);
        T_native = datetime.now();
        #上传数据
        #连接数据库
        global Table_name
        db = MySQLdb.connect(host="192.168.31.67",user="root",password="123456",database="mydb") #,use_unicode=1, charset='utf8'
        cursor = db.cursor()
        sql = "INSERT INTO {Tablename}(time,CO2,O2,E_T,E_P) values(%s , %s, %s, %s, %s)".format(Tablename=Table_name) ;
        str_native = datetime.now().strftime("%Y-%m-%d %H:%M:%S");
        data = (str_native,CO_2,O_2,T,P);
        cursor.execute(sql,data);
        db.commit();
        cursor.close();
        db.close();

def thread_EGO_func():
    on_EGO_mqtt_connect()
    on_EGO_subscribe()

def thread_EGC_func():
    on_EGC_mqtt_connect()
    on_EGC_subscribe()

def many_thread():
    build_MySQL_Table();
    threads = [];
    t = threading.Thread(target=myTestFunc);#主线程
    threads.append(t);
    t.setDaemon(True);  # 给每个子线程添加守护线程
    t = threading.Thread(target=thread_EGO_func);
    threads.append(t);
    t.setDaemon(True);  
    t = threading.Thread(target=thread_EGC_func);
    threads.append(t);
    t.setDaemon(True);  
    for t in threads:  # 循环启动线程
        t.start();
    while True:
        pass;


if __name__ == '__main__':
    many_thread()

  嵌入式 最新文章
基于高精度单片机开发红外测温仪方案
89C51单片机与DAC0832
基于51单片机宠物自动投料喂食器控制系统仿
《痞子衡嵌入式半月刊》 第 68 期
多思计组实验实验七 简单模型机实验
CSC7720
启明智显分享| ESP32学习笔记参考--PWM(脉冲
STM32初探
STM32 总结
【STM32】CubeMX例程四---定时器中断(附工
上一篇文章      下一篇文章      查看所有文章
加:2021-11-14 21:52:49  更:2021-11-14 21:53:28 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/6 22:16:30-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码