IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Python知识库 -> 用Python实现Modbus-RTU协议及串口调试(二) -> 正文阅读

[Python知识库]用Python实现Modbus-RTU协议及串口调试(二)

上篇文章“用Python实现Modbus-RTU协议及串口调试(一)”实现了简单的Modbus-RTU协议的03/04功能号的寄存器值读取。本文更深入一些,实现用线程读取仪表数据。原因是这块仪表由于是结算用计量表,所以有两个企业都要同时对其进行读取,我们知道RS485协议规定只能有一台主站读取从站,多台主站会造成数据冲突。怎么解决这个问题呢,我在某宝上发现有一种东西叫RS485数据分配器,可以同时多主站读取从站数据,原理就是RS485数据分配器会将主站发送的指令帧先缓存起来,按顺序依次向从站发送,从站返回数据帧后再返回给原主站。只要每个主站的采集数据频率不要太快,还是可以解决问题的。这样我的调试程序就得改造一下,得实现同时使用两个不同RS485串口无序读取仪表数据,来测试这个RS485数据分配器是否可靠,我使用两个独立线程,每个线程分别使用一路RS485来读取这一个Modbus-RTU协议的仪表数据。
在前篇文章的基础上,先将读取数据功能封装为一个函数。

# 读取仪表数据并解析返回
def readmeterdata(serialname, bandrate, meter_add, start_reg, reg_num):
    try:
        com = serial.Serial(serialname, bandrate, timeout=0.8)
        if not com:
            print(f"the serial {serialname} is error")
            return
        send_data = mmodbus03or04(meter_add, start_reg, reg_num)
        if not send_data:
            print("Pack the cmd error")
            return
        com.write(send_data)
        recv_data = com.read(reg_num*2+5)
        com.close()
        if recv_data and len(recv_data) > 0:
            retdata = smodbus03or04(recv_data)
            if retdata:
                return retdata
            else:
                return
        else:
            return
    except Exception as e:
        # print(f"Exception : {e}")
        return

如果读取到仪表数据则会返回读取数据的列表retdata。
再定义线程函数,注意下面线程函数中如果没有仪表数据则使用随机数,用来测试表格的显示。

console = Console()
threadrun = True
def readdatathread(serialname, bandrate, meteradd, start_reg, reg_num):
    readnums = 1
    errnums = 0
    now_data = readmeterdata(serialname, bandrate, slaveadd, startreg, regnums)
    if not now_data:
        errnums += 1
        now_data = []
        for i in range(int(regnums/2)):
            value = random.random() * 100	# 如果没有仪表数据则使用随机数测试表格
            now_data.append(value)
    with Live(generate_table(now_data), refresh_per_second=4) as live:    
        while threadrun:
            live.update(generate_table(now_data))
            now_data = readmeterdata(serialname, bandrate, slaveadd, startreg, regnums)            
            if not now_data:
                errnums += 1
                now_data = []
                for i in range(int(regnums / 2)):
                    value = random.random() * 100	# 如果没有仪表数据则使用随机数测试表格
                    now_data.append(value)
            readnums += 1
            time.sleep(1)   # 1秒读取一次数据
    console.print(Panel(f"[yellow]{serialname}线程: 读取总次数={readnums}  错误次数={errnums}", title="统计"))

首先定义了两个全局变量,一个是Consol对象实例,Consol对象是Rich库中的控制台对象,这里的Rich库是一个在终端中显示富文本、表格等高级内容的Python库,这里我用它来实现实时数据的显示。实时数据用表格形式显示,如下图:
实时数据表格

要实现终端中的表格数据实时刷新,要使用Rich库的Live对象和表格生成函数,函数中的

with Live(generate_table(now_data), refresh_per_second=4) as live:

这行代码用来生成Live对象实例,它需要一个刷新数据用的表格生成函数。

live.update(generate_table(now_data))

这行代码就是使用Live对象实例来刷新数据。Rich是一个很有意思的库而且支持中文,给喜欢在终端中编写代码的朋友带来不少编程乐趣。Rich库的具体用法有兴趣的朋友请参考Rich库的github网址。Rich的演示界面如下图:
Rich库演示

表格生成函数如下:

def generate_table(nowdata) -> Table:
    table = Table(show_header=True, header_style="bold magenta")
    table.add_column("No", width = 4)
    table.add_column("Data", width=12)
    for i in range(len(nowdata)):
        table.add_row("[red]"+str(i + 1), f"[green]{nowdata[i]:.3f}")
    return table

它接收一个数值列表参数构造表格。
需要注意的是由于终端中没法同时显示两个实时数据表格,所以两个线程不能都显示实时数据,要再构造一个不显示数据的读取数据线程,如下:

def readdatathread2(serialname, bandrate, meteradd, start_reg, reg_num):
    readnums = 0
    errnums = 0
    print(f"{serialname} thread start") 
    while threadrun:
        readnums += 1
        now_data = readmeterdata(serialname, bandrate, slaveadd, startreg, regnums)            
        if not now_data:
            errnums += 1
        time.sleep(1)   # 1秒读取一次数据
    console.print(Panel(f"[yellow]{serialname}线程: 读取总次数={readnums}  错误次数={errnums}", title="统计"))

两个线程函数结束时都统计了读取数据的总次数和错误次数并用Rich库的面板显示出来,以此判断RS485数据分配器的可靠性。
最后是构造线程,执行一定时间后退出,查看两个数据读取线程的统计结果。如下:

if __name__ == '__main__':
    strcom = "com3"
    comrate = 9600
    slaveadd = 1
    startreg = 0
    regnums = 20

    # 单独使用线程采集仪表数据,线程根据标志threadrun为False时退出
    thread1 = threading.Thread(target=readdatathread, args=(strcom, comrate, slaveadd, startreg, regnums))
    thread1.start()
    time.sleep(1)
    thread2 = threading.Thread(target=readdatathread2, args=(strcom, comrate, slaveadd, startreg, regnums))
    thread2.start()
    time.sleep(30) # 让子弹飞一会,运行30秒再退出
    threadrun = False

其中线程1的数据会如上图的实时数据表格进行显示。
可以运行的完整代码如下,当然使用的几个库要先安装。

import serial
import crcmod
import time, sys
import struct
import threading
import random
from rich.console import Console
from rich.table import Column, Table
from rich.live import Live


# CRC16校验,返回整型数
def crc16(veritydata):
    if not veritydata:
        return
    crc16 = crcmod.mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)
    return crc16(veritydata)

# 校验数据帧的CRC码是否正确
def checkcrc(data):
    if not data:
        return False
    if len(data) <= 2:
        return False
    nocrcdata = data[:-2]
    oldcrc16 = data[-2:]
    oldcrclist = list(oldcrc16)
    crcres = crc16(nocrcdata)
    crc16byts = crcres.to_bytes(2, byteorder="little", signed=False)
    # print("CRC16:", crc16byts.hex())
    crclist = list(crc16byts)
    if oldcrclist[0] != crclist[0] or oldcrclist[1] != crclist[1]:
        return False
    return True

# Modbus-RTU协议的03或04读取保存或输入寄存器功能主-》从命令帧
def mmodbus03or04(add, startregadd, regnum, funcode=3):
    if add < 0 or add > 0xFF or startregadd < 0 or startregadd > 0xFFFF or regnum < 1 or regnum > 0x7D:
        print("Error: parameter error")
        return
    if funcode != 3 and funcode != 4:
        print("Error: parameter error")
        return
    sendbytes = add.to_bytes(1, byteorder="big", signed=False)
    sendbytes = sendbytes + funcode.to_bytes(1, byteorder="big", signed=False) + startregadd.to_bytes(2, byteorder="big", signed=False) + \
                regnum.to_bytes(2, byteorder="big", signed=False)
    crcres = crc16(sendbytes)
    crc16bytes = crcres.to_bytes(2, byteorder="little", signed=False)
    sendbytes = sendbytes + crc16bytes
    return sendbytes


# Modbus-RTU协议的03或04读取保持或输入寄存器功能从-》主的数据帧解析(浮点数2,1,4,3格式,16位短整形(定义正负数))
def smodbus03or04(recvdata, valueformat=0, intsigned=False):
    if not recvdata:
        print("Error: data error")
        return
    if not checkcrc(recvdata):
        print("Error: crc error")
        return
    datalist = list(recvdata)
    if datalist[1] != 0x3 and datalist[1] != 0x4:
        print("Error: recv data funcode error")
        return
    bytenums = datalist[2]
    if bytenums % 2 != 0:
        print("Error: recv data reg data error")
        return
    retdata = []
    if valueformat == 0:
        floatnums = bytenums / 4
        floatlist = [0, 0, 0, 0]
        for i in range(int(floatnums)):
            floatlist[1] = datalist[3+i*4]
            floatlist[0] = datalist[4+i*4]
            floatlist[3] = datalist[5+i*4]
            floatlist[2] = datalist[6+i*4]
            bfloatdata = bytes(floatlist)
            [fvalue] = struct.unpack('f', bfloatdata)
            retdata.append(fvalue)
    elif valueformat == 1:
        shortintnums = bytenums / 2
        for i in range(int(shortintnums)):
            btemp = recvdata[3+i*2:5+i*2]
            shortvalue = int.from_bytes(btemp, byteorder="big", signed=intsigned)
            retdata.append(shortvalue)
    return retdata

# 读取仪表数据并解析返回
def readmeterdata(serialname, bandrate, meter_add, start_reg, reg_num):
    try:
        com = serial.Serial(serialname, bandrate, timeout=0.8)
        if not com:
            print(f"the serial {serialname} is error")
            return
        send_data = mmodbus03or04(meter_add, start_reg, reg_num)
        if not send_data:
            print("Pack the cmd error")
            return
        com.write(send_data)
        recv_data = com.read(reg_num*2+5)
        com.close()
        if recv_data and len(recv_data) > 0:
            retdata = smodbus03or04(recv_data)
            if retdata:
                return retdata
            else:
                return
        else:
            return
    except Exception as e:
        # print(f"Exception : {e}")
        return


# 读取寄存器数据线程
console = Console()
threadrun = True
def readdatathread(serialname, bandrate, meteradd, start_reg, reg_num):
    readnums = 1
    errnums = 0
    now_data = readmeterdata(serialname, bandrate, slaveadd, startreg, regnums)
    if not now_data:
        errnums += 1
        now_data = []
        for i in range(int(regnums/2)):
            value = random.random() * 100	# 如果没有仪表数据则使用随机数测试表格
            now_data.append(value)
    with Live(generate_table(now_data), refresh_per_second=4) as live:    
        while threadrun:	# 循环读取数据
            live.update(generate_table(now_data))
            now_data = readmeterdata(serialname, bandrate, slaveadd, startreg, regnums)            
            if not now_data:
                errnums += 1
                now_data = []
                for i in range(int(regnums / 2)):
                    value = random.random() * 100 # 如果没有仪表数据则使用随机数测试表格
                    now_data.append(value)
            readnums += 1
            time.sleep(1)   # 1秒读取一次数据
    console.print(f"\n{serialname} theard: read nums={readnums}  err nums={errnums}")
    console.print(f"[blue]the {serialname} thread is exiting")


# 读取寄存器数据线程,只读取数据不更新live对象进行表格显示
def readdatathread2(serialname, bandrate, meteradd, start_reg, reg_num):
    readnums = 0
    errnums = 0
    print(f"{serialname} thread start") 
    while threadrun:	# 循环读取数据
        readnums += 1
        now_data = readmeterdata(serialname, bandrate, slaveadd, startreg, regnums)            
        if not now_data:
            errnums += 1
        time.sleep(1)   # 1秒读取一次数据
    print(f"\n{serialname} thread: read nums={readnums}  err nums={errnums}")
    print(f"the {serialname} thread is exiting")

# 实时刷新的数据表格函数
def generate_table(nowdata) -> Table:
    table = Table(show_header=True, header_style="bold magenta")
    table.add_column("No", width = 4)
    table.add_column("Data", width=12)
    for i in range(len(nowdata)):
        table.add_row("[red]"+str(i + 1), f"[green]{nowdata[i]:.3f}")
    return table


if __name__ == '__main__':
    strcom = "com3"
    strcom2 = "com4"
    comrate = 9600
    slaveadd = 1
    startreg = 0
    regnums = 20

    # 单独使用线程采集仪表数据,线程根据标志threadrun为False时退出
    thread1 = threading.Thread(target=readdatathread, args=(strcom, comrate, slaveadd, startreg, regnums))
    thread1.start()
    time.sleep(1)
    thread2 = threading.Thread(target=readdatathread2, args=(strcom2, comrate, slaveadd, startreg, regnums))
    thread2.start()
    time.sleep(30)
    threadrun = False
  Python知识库 最新文章
Python中String模块
【Python】 14-CVS文件操作
python的panda库读写文件
使用Nordic的nrf52840实现蓝牙DFU过程
【Python学习记录】numpy数组用法整理
Python学习笔记
python字符串和列表
python如何从txt文件中解析出有效的数据
Python编程从入门到实践自学/3.1-3.2
python变量
上一篇文章      下一篇文章      查看所有文章
加:2021-09-09 11:42:11  更:2021-09-09 11:44:39 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/27 13:07:09-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码
数据统计