上篇文章“用Python实现Modbus-RTU协议及串口调试(一)”实现了简单的Modbus-RTU协议的03/04功能号的寄存器值读取。本文更深入一些,实现用线程读取仪表数据。原因是这块仪表由于是结算用计量表,所以有两个企业都要同时对其进行读取,我们知道RS485协议规定只能有一台主站读取从站,多台主站会造成数据冲突。怎么解决这个问题呢,我在某宝上发现有一种东西叫RS485数据分配器,可以同时多主站读取从站数据,原理就是RS485数据分配器会将主站发送的指令帧先缓存起来,按顺序依次向从站发送,从站返回数据帧后再返回给原主站。只要每个主站的采集数据频率不要太快,还是可以解决问题的。这样我的调试程序就得改造一下,得实现同时使用两个不同RS485串口无序读取仪表数据,来测试这个RS485数据分配器是否可靠,我使用两个独立线程,每个线程分别使用一路RS485来读取这一个Modbus-RTU协议的仪表数据。 在前篇文章的基础上,先将读取数据功能封装为一个函数。
def readmeterdata(serialname, bandrate, meter_add, start_reg, reg_num):
try:
com = serial.Serial(serialname, bandrate, timeout=0.8)
if not com:
print(f"the serial {serialname} is error")
return
send_data = mmodbus03or04(meter_add, start_reg, reg_num)
if not send_data:
print("Pack the cmd error")
return
com.write(send_data)
recv_data = com.read(reg_num*2+5)
com.close()
if recv_data and len(recv_data) > 0:
retdata = smodbus03or04(recv_data)
if retdata:
return retdata
else:
return
else:
return
except Exception as e:
return
如果读取到仪表数据则会返回读取数据的列表retdata。 再定义线程函数,注意下面线程函数中如果没有仪表数据则使用随机数,用来测试表格的显示。
console = Console()
threadrun = True
def readdatathread(serialname, bandrate, meteradd, start_reg, reg_num):
readnums = 1
errnums = 0
now_data = readmeterdata(serialname, bandrate, slaveadd, startreg, regnums)
if not now_data:
errnums += 1
now_data = []
for i in range(int(regnums/2)):
value = random.random() * 100
now_data.append(value)
with Live(generate_table(now_data), refresh_per_second=4) as live:
while threadrun:
live.update(generate_table(now_data))
now_data = readmeterdata(serialname, bandrate, slaveadd, startreg, regnums)
if not now_data:
errnums += 1
now_data = []
for i in range(int(regnums / 2)):
value = random.random() * 100
now_data.append(value)
readnums += 1
time.sleep(1)
console.print(Panel(f"[yellow]{serialname}线程: 读取总次数={readnums} 错误次数={errnums}", title="统计"))
首先定义了两个全局变量,一个是Consol对象实例,Consol对象是Rich库中的控制台对象,这里的Rich库是一个在终端中显示富文本、表格等高级内容的Python库,这里我用它来实现实时数据的显示。实时数据用表格形式显示,如下图:
要实现终端中的表格数据实时刷新,要使用Rich库的Live对象和表格生成函数,函数中的
with Live(generate_table(now_data), refresh_per_second=4) as live:
这行代码用来生成Live对象实例,它需要一个刷新数据用的表格生成函数。
live.update(generate_table(now_data))
这行代码就是使用Live对象实例来刷新数据。Rich是一个很有意思的库而且支持中文,给喜欢在终端中编写代码的朋友带来不少编程乐趣。Rich库的具体用法有兴趣的朋友请参考Rich库的github网址。Rich的演示界面如下图:
表格生成函数如下:
def generate_table(nowdata) -> Table:
table = Table(show_header=True, header_style="bold magenta")
table.add_column("No", width = 4)
table.add_column("Data", width=12)
for i in range(len(nowdata)):
table.add_row("[red]"+str(i + 1), f"[green]{nowdata[i]:.3f}")
return table
它接收一个数值列表参数构造表格。 需要注意的是由于终端中没法同时显示两个实时数据表格,所以两个线程不能都显示实时数据,要再构造一个不显示数据的读取数据线程,如下:
def readdatathread2(serialname, bandrate, meteradd, start_reg, reg_num):
readnums = 0
errnums = 0
print(f"{serialname} thread start")
while threadrun:
readnums += 1
now_data = readmeterdata(serialname, bandrate, slaveadd, startreg, regnums)
if not now_data:
errnums += 1
time.sleep(1)
console.print(Panel(f"[yellow]{serialname}线程: 读取总次数={readnums} 错误次数={errnums}", title="统计"))
两个线程函数结束时都统计了读取数据的总次数和错误次数并用Rich库的面板显示出来,以此判断RS485数据分配器的可靠性。 最后是构造线程,执行一定时间后退出,查看两个数据读取线程的统计结果。如下:
if __name__ == '__main__':
strcom = "com3"
comrate = 9600
slaveadd = 1
startreg = 0
regnums = 20
thread1 = threading.Thread(target=readdatathread, args=(strcom, comrate, slaveadd, startreg, regnums))
thread1.start()
time.sleep(1)
thread2 = threading.Thread(target=readdatathread2, args=(strcom, comrate, slaveadd, startreg, regnums))
thread2.start()
time.sleep(30)
threadrun = False
其中线程1的数据会如上图的实时数据表格进行显示。 可以运行的完整代码如下,当然使用的几个库要先安装。
import serial
import crcmod
import time, sys
import struct
import threading
import random
from rich.console import Console
from rich.table import Column, Table
from rich.live import Live
def crc16(veritydata):
if not veritydata:
return
crc16 = crcmod.mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)
return crc16(veritydata)
def checkcrc(data):
if not data:
return False
if len(data) <= 2:
return False
nocrcdata = data[:-2]
oldcrc16 = data[-2:]
oldcrclist = list(oldcrc16)
crcres = crc16(nocrcdata)
crc16byts = crcres.to_bytes(2, byteorder="little", signed=False)
crclist = list(crc16byts)
if oldcrclist[0] != crclist[0] or oldcrclist[1] != crclist[1]:
return False
return True
def mmodbus03or04(add, startregadd, regnum, funcode=3):
if add < 0 or add > 0xFF or startregadd < 0 or startregadd > 0xFFFF or regnum < 1 or regnum > 0x7D:
print("Error: parameter error")
return
if funcode != 3 and funcode != 4:
print("Error: parameter error")
return
sendbytes = add.to_bytes(1, byteorder="big", signed=False)
sendbytes = sendbytes + funcode.to_bytes(1, byteorder="big", signed=False) + startregadd.to_bytes(2, byteorder="big", signed=False) + \
regnum.to_bytes(2, byteorder="big", signed=False)
crcres = crc16(sendbytes)
crc16bytes = crcres.to_bytes(2, byteorder="little", signed=False)
sendbytes = sendbytes + crc16bytes
return sendbytes
def smodbus03or04(recvdata, valueformat=0, intsigned=False):
if not recvdata:
print("Error: data error")
return
if not checkcrc(recvdata):
print("Error: crc error")
return
datalist = list(recvdata)
if datalist[1] != 0x3 and datalist[1] != 0x4:
print("Error: recv data funcode error")
return
bytenums = datalist[2]
if bytenums % 2 != 0:
print("Error: recv data reg data error")
return
retdata = []
if valueformat == 0:
floatnums = bytenums / 4
floatlist = [0, 0, 0, 0]
for i in range(int(floatnums)):
floatlist[1] = datalist[3+i*4]
floatlist[0] = datalist[4+i*4]
floatlist[3] = datalist[5+i*4]
floatlist[2] = datalist[6+i*4]
bfloatdata = bytes(floatlist)
[fvalue] = struct.unpack('f', bfloatdata)
retdata.append(fvalue)
elif valueformat == 1:
shortintnums = bytenums / 2
for i in range(int(shortintnums)):
btemp = recvdata[3+i*2:5+i*2]
shortvalue = int.from_bytes(btemp, byteorder="big", signed=intsigned)
retdata.append(shortvalue)
return retdata
def readmeterdata(serialname, bandrate, meter_add, start_reg, reg_num):
try:
com = serial.Serial(serialname, bandrate, timeout=0.8)
if not com:
print(f"the serial {serialname} is error")
return
send_data = mmodbus03or04(meter_add, start_reg, reg_num)
if not send_data:
print("Pack the cmd error")
return
com.write(send_data)
recv_data = com.read(reg_num*2+5)
com.close()
if recv_data and len(recv_data) > 0:
retdata = smodbus03or04(recv_data)
if retdata:
return retdata
else:
return
else:
return
except Exception as e:
return
console = Console()
threadrun = True
def readdatathread(serialname, bandrate, meteradd, start_reg, reg_num):
readnums = 1
errnums = 0
now_data = readmeterdata(serialname, bandrate, slaveadd, startreg, regnums)
if not now_data:
errnums += 1
now_data = []
for i in range(int(regnums/2)):
value = random.random() * 100
now_data.append(value)
with Live(generate_table(now_data), refresh_per_second=4) as live:
while threadrun:
live.update(generate_table(now_data))
now_data = readmeterdata(serialname, bandrate, slaveadd, startreg, regnums)
if not now_data:
errnums += 1
now_data = []
for i in range(int(regnums / 2)):
value = random.random() * 100
now_data.append(value)
readnums += 1
time.sleep(1)
console.print(f"\n{serialname} theard: read nums={readnums} err nums={errnums}")
console.print(f"[blue]the {serialname} thread is exiting")
def readdatathread2(serialname, bandrate, meteradd, start_reg, reg_num):
readnums = 0
errnums = 0
print(f"{serialname} thread start")
while threadrun:
readnums += 1
now_data = readmeterdata(serialname, bandrate, slaveadd, startreg, regnums)
if not now_data:
errnums += 1
time.sleep(1)
print(f"\n{serialname} thread: read nums={readnums} err nums={errnums}")
print(f"the {serialname} thread is exiting")
def generate_table(nowdata) -> Table:
table = Table(show_header=True, header_style="bold magenta")
table.add_column("No", width = 4)
table.add_column("Data", width=12)
for i in range(len(nowdata)):
table.add_row("[red]"+str(i + 1), f"[green]{nowdata[i]:.3f}")
return table
if __name__ == '__main__':
strcom = "com3"
strcom2 = "com4"
comrate = 9600
slaveadd = 1
startreg = 0
regnums = 20
thread1 = threading.Thread(target=readdatathread, args=(strcom, comrate, slaveadd, startreg, regnums))
thread1.start()
time.sleep(1)
thread2 = threading.Thread(target=readdatathread2, args=(strcom2, comrate, slaveadd, startreg, regnums))
thread2.start()
time.sleep(30)
threadrun = False
|