Pyserial库的安装
将python版本更新到2.7及以上,或3.4及以上,然后直接使用pip安装:
pip install pyserial
Serial对象
导入pyserial包后使用serial.Serial创建。 此处因为业务需要,需要同时在windows和linux或macos下访问此应用。 在windows和linux中,端口名称不相同。 在windows下,端口名形如【COMx】(x为整数); 在linux下,端口名形如【/dev/xxxxx】(x为具体端口名)。 因此还引入了platform来判断系统以获取不同系统下的端口。
import serial
def create_connection(self, port):
if self._system.lower() == "darwin":
self.ser = serial.Serial(port="/dev/{}".format(port),
baudrate=115200,
bytesize=8,
stopbits=1)
elif self._system.lower() == "windows":
self.ser = serial.Serial(port=port,
baudrate=115200,
bytesize=8,
stopbits=1)
此处应根据实际业务需求设置【校验位】,【超时时间】等属性。
常用属性
以下为Serial对象所有的属性:
port: str
baudrate: int
bytesize
parity
stopbits
timeout: float
xonxoff: bool
rtscts: bool
dsrdtr: bool
write_timeout: float
inter_byte_timeout: float
in_waiting: int
out_waiting: int
- 其中【port】【baudrate】【bytesize】【parity】【stopbits】【in_waiting】很常用。
- 【timeout】的设置会影响read()方法的行为:
- 设置为None时,read()会一直从缓冲区中读取数据,直到指定的字符数量已被读取。
- 设置为0时,read()会立即返回读到的从0到指定数量的字符。
- 设置为x(x为整数)时,read()会在达到x秒时返回读到的所有字符,或在此之前返回指定数 量的字符。
常用方法
def serial.tool.list_ports()
"""返回所有检测到的串口
"""
serial.Serial.open()
"""打开串口
"""
serial.Serial.close()
"""关闭串口
"""
serial.Serial.read(size=1)
"""从串口中读取最多【size】个的字符
"""
serial.Serial.write(data)
"""向串口发送数据【data】,【data】的类型为bytes, bytearray或str。
"""
serial.Serial.flush()
"""等待所有数据发送完毕
"""
例子
封装案例:
import serial
import serial.tools.list_ports
import platform
from time import sleep
class PortManager(object):
"""操作串口的类
Attributes:
_system: str - 当前应用所在环境的系统类型
ser: serial.Serial - 串口链接的对象
_serial_port: str - 串口名称
"""
_system = platform.system()
ser: serial.Serial
_serial_port: str
def set_port(self, port):
"""_serial_port的setter"""
self._serial_port = port
def get_port(self):
"""_serial_port的getter"""
return self._serial_port
def list_ports(self):
"""获取并返回所有的串口"""
list_p = list(serial.tools.list_ports.comports())
list_ports_name = []
if self._system.lower() == "darwin":
list_ports_name = [str(i.name) for i in list_p]
elif self._system.lower() == "windows":
list_ports_name = [str(i.device) for i in list_p]
return list_ports_name
def create_connection(self, port):
"""创建串口链接"""
if self._system.lower() == "darwin":
self.ser = serial.Serial(port="/dev/{}".format(port),
baudrate=115200,
bytesize=8,
stopbits=1)
elif self._system.lower() == "windows":
self.ser = serial.Serial(port=port,
baudrate=115200,
bytesize=8,
stopbits=1)
def send_data(self, value):
"""向串口发送数据"""
write_data = bytearray.fromhex(value)
try:
sleep(0.1)
if self.ser.out_waiting:
self.ser.reset_output_buffer()
if self.ser.write(write_data):
self.ser.flush()
else:
if self.ser.write(write_data):
self.ser.flush()
self.ser.reset_output_buffer()
return True
except Exception as e:
print(e)
self.ser.reset_output_buffer()
return False
def read_data(self):
"""从串口读取数据"""
try:
sleep(0.1)
if self.ser.in_waiting:
bs = self.ser.read(self.ser.in_waiting).hex()
self.ser.reset_input_buffer()
res = ''
for i in range(len(bs)):
res += bs[i]
if i % 2 == 1:
res += ' '
res = res.rstrip(' ')
return res
else:
sleep(0.1)
if self.ser.in_waiting:
bs = self.ser.read(self.ser.in_waiting).hex()
self.ser.reset_input_buffer()
res = ''
for i in range(len(bs)):
res += bs[i]
if i % 2 == 1:
res += ' '
res = res.rstrip(' ')
return res
else:
self.ser.reset_input_buffer()
return ""
except Exception as e:
print(e)
self.ser.reset_input_buffer()
return None
def close(self, *ser):
"""关闭串口"""
if ser:
ser[0].close()
else:
self.ser.close()
小结
这是用bootstrap构建的一个web app,主要用途是配置测试公司客制化的继电器板卡。 此app就是利用pyserial与串口通信。
配置组模式:
板卡控制面板:
板卡配置面板:
连接错误处理:
多板卡级联时的板卡切换:
|