这里使用多线程与互斥锁的方法,开启了两个线程一个发送线程,不停的向传感器发送数据回发请求,另一个为监听线程,在回发请求发送后,经过一定的延时操作,接收传感器的回发数据
1.注意报文发送的格式,在python中是以十六进制列表形式发送的.
2.注意线程与互斥锁的使用,在该代码的基础上想要通过报文控制传感器实现更多的功能, 这里python虽然是多线程的,但是传感器控制只是通过486接口进行的,所以本质上是不停的通过各种发送喂不同的报文. 所以多模块的开发依然实在这两个收发线程的基础上决定.
如下图,通过本文程序不同的向激光位移传感器发送数据请求报文,然后接收返回的测距信息(butes格式),还需进行进一步的代码解码 返回值:
打印串口收到的数据: b'\x02\x06\nJ\x03F'
<class 'bytes'>
****************************************************************************************************
打印串口收到的数据: b'\x02\x06\nJ\x03F'
<class 'bytes'>
****************************************************************************************************
打印串口收到的数据: b'\x02\x06\nJ\x03F'
<class 'bytes'>
****************************************************************************************************
打印串口收到的数据: b'\x02\x06\nK\x03G'
<class 'bytes'>
****************************************************************************************************
代码正文:
"""用于通过485串口对位移传感器的数据进行读取"""
import serial
import threading
import time
import platform
python_version = platform.python_version()[0:1]
class Serial_Control():
def __init__(self, port, baudrate):
self.port = port
self.baudrate = baudrate
self.t_main_bool = True
self.statistical_fail_num = 0
self.open_serial_control()
def set_port(self, port):
self.port = port
def open_serial_control(self):
t_main = threading.Thread(target=self.init_main_serial_thread)
t_main.setDaemon(True)
t_main.start()
def set_off(self):
self.t_main_bool = False
def init_main_serial_thread(self):
self.ser = serial.Serial(port=self.port, baudrate=self.baudrate)
self.ser.timeout = 2
print("初始化串口成功 34")
t1 = threading.Thread(target=self.serial_listen)
t1.setDaemon(True)
self.mutex_t1_t2_one = threading.Lock()
self.mutex_t1_t2_two = threading.Lock()
self.mutex_t2_t3_one = threading.Lock()
self.mutex_t2_t3_two = threading.Lock()
t2 = threading.Thread(target=self.serial_send)
t2.setDaemon(True)
mutexFlag = self.mutex_t1_t2_two.acquire(True)
t1.start()
t2.start()
print("开启t1,t2完毕\n")
def serial_listen(self):
global python_version
"""aa #开始
例子:
20 #类型
20 #地址
36 #功能码
01 #数据长度
00 #数据
e4 1c #校验
0d #结束
"""
while True:
mutexFlag = self.mutex_t1_t2_one.acquire(True)
n = self.ser.inWaiting()
if n:
return_data = self.ser.read(n)
print("打印串口收到的数据: " + str(return_data))
print(type(return_data))
print("*" * 100)
print("\n")
self.statistical_fail_num = 0
else:
if self.statistical_fail_num > 10:
print("连续" + str(self.statistical_fail_num) + "次没有收到 未收到串口的回发数据,请检测连接本系统的开发板是否匹配 118")
self.statistical_fail_num = self.statistical_fail_num + 1
if self.t_main_bool == False:
break
self.mutex_t1_t2_two.release()
def serial_send(self):
while True:
mutexFlag = self.mutex_t1_t2_two.acquire(True)
self.ser.write([0x02, 0x43, 0xB0, 0x01, 0x03, 0xF2])
time.sleep(0.1)
self.mutex_t1_t2_one.release()
class Control_send():
def __init__(self):
self.x = 0
self.y = 0
self.p = 0
if __name__ == '__main__':
car = Serial_Control(port='COM4', baudrate=9600)
time.sleep(0.5)
while True:
print("*" * 20)
order = input("请输入控制命令(q为退出程序):")
if order == "q":
car.set_off()
break
|