这是一个针对特定应用的测试工具,主要实现两个串口设备之间自动通讯与测试。
界面如下:
代码如下:
#!/usr/bin/env python
# coding=utf-8
import serial
import struct
import time
import math
import threading
import tkinter as tk
from tkinter import ttk
import tkinter.font as tkFont
import serial.tools.list_ports
import re
import json
class uploader(object):
    """Small wrapper around a pyserial port used by the test tool.

    Offers tolerant re-opening of the port, raw little-endian reads and a
    line-based text read. Reads raise ``RuntimeError`` when the port times
    out before the requested data arrived.
    """

    MAX_PACK_SIZE = 64
    MAX_FLASH_PRGRAM_TIME = 0.001  # Time on an F7 to send SYNC, RESULT from last data in multi RXed

    def __init__(self, portname, baudrate_bootloader):
        """Create the underlying ``serial.Serial`` object.

        Args:
            portname: device name passed to ``serial.Serial``.
            baudrate_bootloader: baud rate passed to ``serial.Serial``.
        """
        # Duration of one character on the wire, counting 10 bit times per byte.
        self.chartime = 10 * (1.0 / baudrate_bootloader)
        # NOTE(review): the original comment asked for write_timeout > 0 to
        # avoid stuck writes, yet 0 is what gets passed -- confirm intent.
        self.port = serial.Serial(portname, baudrate_bootloader, timeout=3, write_timeout=0)

    def close(self):
        """Close the port if there is one."""
        if self.port is not None:
            self.port.close()

    def open(self):
        """Try to (re)open the port, retrying for at most about 0.2 seconds.

        Returns silently when the port is open or when the deadline passed;
        no exception is raised if the port could not be opened.
        """
        timeout = time.time() + 0.2
        while self.port is not None:
            try:
                portopen = self.port.is_open
            except AttributeError:
                # Fallback for serial objects that only offer isOpen().
                portopen = self.port.isOpen()
            if not portopen and time.time() < timeout:
                try:
                    self.port.open()
                except (OSError, serial.SerialException):
                    # Port not ready yet: wait a little and retry.
                    time.sleep(0.04)
            else:
                break

    def __send(self, c):
        """Write the bytes ``c`` to the port."""
        self.port.write(c)

    def __recv(self, count=1):
        """Read exactly ``count`` bytes.

        Raises:
            RuntimeError: if fewer than ``count`` bytes arrived before the
                port timeout expired.
        """
        c = self.port.read(count)
        # A timed-out read may return fewer bytes than requested; treating
        # only an empty result as a timeout would hand a truncated buffer
        # to struct.unpack().
        if len(c) < count:
            raise RuntimeError("timeout waiting for data (%u bytes)" % count)
        return c

    def __recv_uint8(self):
        """Read one byte and return it as an unsigned integer."""
        return struct.unpack("<B", self.__recv(1))[0]

    def __recv_int(self):
        """Read four bytes and return them as a little-endian unsigned int."""
        return struct.unpack("<I", self.__recv(4))[0]

    def receive_one_message(self):
        """Read one line from the port and return it decoded as UTF-8.

        Raises:
            RuntimeError: if nothing was received before the port timeout.
        """
        str_msg = self.port.readline()
        if len(str_msg) < 1:
            raise RuntimeError("timeout waiting for data")
        return str_msg.decode('utf-8')
class painter(object):
    # Text commands sent to the devices over the serial link.
    PRINT_ON = 'print_on\r\n'
    PRINT_OFF = 'print_off\r\n'
    SET_ANTENNA_RF1 = '''ETST {"airAntSelect":"ant1"}\r\n'''
    SET_ANTENNA_RF2 = '''ETST {"airAntSelect":"ant2"}\r\n'''
    # NOTE(review): identical to SET_ANTENNA_RF2 -- confirm this is intended.
    SET_ANTENNA_AUTO = '''ETST {"airAntSelect":"ant2"}\r\n'''
    # Test frequencies; the index into this list also selects SET_TESTMODE.
    FREQ_LIST = ['2410', '2427', '2444', '2461', '2478']
    SET_VGA_HEAD = 'set_tx_vga '
    SET_VGA_TAIL = ' \r\n'
    SET_GET_VGA = 'get_tx_vga\r\n'
    # Number of matching report lines collected per measurement.
    TEST_COUNT = 15
    # One test-mode command per entry of FREQ_LIST, in the same order.
    SET_TESTMODE = [
        '''ETST {"testMode":"enable","continuousTx":"disable","continuousRx":"disable","mcs":"BPSK_1/2(2.08Mbps)","loFreq":%s,"power_db":27}\r\n''' % lo_freq
        for lo_freq in FREQ_LIST
    ]
    # Report-line patterns; groups are fail, SNR, RSSI, RSSI_B, rx_F, stat.
    RE_STR_GND = r'.*?fail:.*?([0-9]+).*?SNR:.*?([0-9]+).*?RSSI:.*?([0-9]+).*?RSSI_B:.*?([0-9]+).*?rx_F:.*?([0-9]+).*?stat:.*?([0-9])'
    RE_STR_AIR = r'.*?fail:.*?([0-9]+).*?SNR:.*?([0-9]+).*?RSSI:.*?([0-9]+).*?RSSI_B:.*?([0-9]+).*?rx_F:.*?([0-9]+).*?stat:.*?([0-9])'
def __init__(self):
    """Build the main window, restore saved parameters and start GUI polling."""
    self.file_path = None
    self.window = tk.Tk()
    screen_w = self.window.winfo_screenwidth()
    screen_h = self.window.winfo_screenheight()
    # Geometry string holds an offset only (no WxH); it is derived from the
    # screen size minus a 900x700 area, halved.
    window_size = '+%d+%d' % ((screen_w - 900) / 2, (screen_h - 700) / 2)

    # Tk variables backing the widgets (creation order kept).
    self.warning = tk.StringVar()
    for attr in ('fail_RMS_input', 'snr_lmt_input', 'rssi_RMS_input',
                 'vga_initial_input', 'device_num_input', 'rssi_differ_input'):
        setattr(self, attr, tk.StringVar())
    self.auto_increase = tk.IntVar()
    freqs = ('2410', '2427', '2444', '2461', '2478')
    for freq in freqs:
        setattr(self, 'freq_' + freq, tk.IntVar())

    self.font_warning = tkFont.Font(size=16, weight=tkFont.BOLD)
    self.window.title("三合一底噪灵敏度测试工具V0.6 2021-04-26")
    self.window.geometry(window_size)
    self.main_frame = tk.Frame(self.window)

    # --- port selection area ---
    self.input_frame = tk.Frame(self.main_frame)
    top = self.input_frame
    self.label_port_fc = tk.Label(top, text="飞机串口:", width=10)
    self.combox_port_fc = ttk.Combobox(top, width=40)
    self.label_port_gs = tk.Label(top, text="遥控器串口:", width=10)
    self.combox_port_gs = ttk.Combobox(top, width=40)
    self.button_start = tk.Button(
        top, text="打开\n串口", width=5, height=2,
        command=lambda: self.thread_it(self.start_test))
    self.text_warning = tk.Label(
        top, textvariable=self.warning, width=30, height=7, bg='white',
        wraplength=330, font=self.font_warning)

    # --- parameter area ---
    self.checkbox_frame = tk.Frame(self.main_frame)
    opts = self.checkbox_frame

    def make_entry(variable):
        # Every parameter entry shares the same parent and width.
        return tk.Entry(opts, textvariable=variable, width=10)

    self.label_freq = tk.Label(opts, text="测试频率:")
    for freq in freqs:
        box = tk.Checkbutton(opts, text=freq + "MHz ", width=10,
                             variable=getattr(self, 'freq_' + freq))
        setattr(self, 'checkbox_' + freq, box)
    self.label_fail_RMS = tk.Label(opts, text="FAIL_RMS:")
    self.label_rssi_RMS = tk.Label(opts, text="RSSI_RMS:")
    self.label_vga_initial = tk.Label(opts, text="起始VGA:")
    self.label_fail_limit = tk.Label(opts, text="有效范围(0-30)")
    self.label_rssi_limit = tk.Label(opts, text="有效范围(50-110)")
    self.label_vga_limit = tk.Label(opts, text="有效范围(5000-50000)")
    self.label_SNR = tk.Label(opts, text="SNR_LIMIT:")
    self.label_SNR_limit = tk.Label(opts, text="有效范围(0-30)")
    self.entry_SNR_LMT = make_entry(self.snr_lmt_input)
    self.entry_fail_RMS = make_entry(self.fail_RMS_input)
    self.entry_rssi_RMS = make_entry(self.rssi_RMS_input)
    self.entry_vga_initial = make_entry(self.vga_initial_input)
    self.label_device_num = tk.Label(opts, text="设备编号:")
    self.entry_device_num = make_entry(self.device_num_input)
    self.checkbox_auto_increase = tk.Checkbutton(
        opts, text="编号自动增加", variable=self.auto_increase)
    # The test buttons start disabled; update_gui() toggles them later.
    self.button_test_gs = tk.Button(
        opts, text="地端设备测试", state=tk.DISABLED,
        command=lambda: self.thread_it(self.gs_test))
    self.button_test_fc = tk.Button(
        opts, text="天端设备测试", state=tk.DISABLED,
        command=lambda: self.thread_it(self.fc_test))
    self.button_test_again = tk.Button(
        opts, text="重新测试灵敏度", state=tk.DISABLED,
        command=lambda: self.thread_it(self.test_sensitive_again))
    self.label_rssi_difference = tk.Label(opts, text="RSSI极限差异值:")
    self.entry_rssi_difference = make_entry(self.rssi_differ_input)

    # --- log area ---
    self.print_frame = tk.Frame(self.main_frame)
    self.text_print = tk.Text(self.print_frame, width=130, height=40,
                              wrap=tk.WORD, state=tk.DISABLED)
    self.text_scroll = tk.Scrollbar(self.print_frame, command=self.text_print.yview)
    self.text_print.config(yscrollcommand=self.text_scroll.set)

    self.load_parms()
    self.orig_color = self.label_port_fc.cget('bg')

    # --- layout ---
    self.main_frame.pack(anchor=tk.CENTER)
    layout = (
        (self.input_frame, dict(row=1, column=1, padx=10, sticky='N')),
        (self.label_port_fc, dict(row=1, column=1, pady=10)),
        (self.combox_port_fc, dict(row=1, column=2, columnspan=2, sticky='W')),
        (self.label_port_gs, dict(row=2, column=1)),
        (self.combox_port_gs, dict(row=2, column=2, columnspan=2, sticky='W')),
        (self.button_start, dict(row=1, column=4, rowspan=2, padx=5)),
        (self.text_warning, dict(row=3, column=2, columnspan=3, pady=5)),
        (self.checkbox_frame, dict(row=1, column=2, padx=10)),
        (self.label_freq, dict(row=1, column=1)),
        (self.checkbox_2410, dict(row=1, column=2)),
        (self.checkbox_2427, dict(row=1, column=3)),
        (self.checkbox_2444, dict(row=1, column=4)),
        (self.checkbox_2461, dict(row=2, column=1)),
        (self.checkbox_2478, dict(row=2, column=2)),
        (self.label_fail_RMS, dict(row=3, column=1, sticky='E')),
        (self.entry_fail_RMS, dict(row=3, column=2)),
        (self.label_fail_limit, dict(row=3, column=3, columnspan=2, sticky='W')),
        (self.label_rssi_RMS, dict(row=4, column=1, sticky='E')),
        (self.entry_rssi_RMS, dict(row=4, column=2)),
        (self.label_rssi_limit, dict(row=4, column=3, columnspan=2, sticky='W')),
        (self.label_vga_initial, dict(row=5, column=1, sticky='E')),
        (self.entry_vga_initial, dict(row=5, column=2)),
        (self.label_vga_limit, dict(row=5, column=3, columnspan=2, sticky='W')),
        (self.label_SNR, dict(row=6, column=1, sticky='E')),
        (self.entry_SNR_LMT, dict(row=6, column=2)),
        (self.label_SNR_limit, dict(row=6, column=3, columnspan=2, sticky='W')),
        (self.label_device_num, dict(row=7, column=1, sticky='E')),
        (self.entry_device_num, dict(row=7, column=2)),
        (self.checkbox_auto_increase, dict(row=7, column=3, columnspan=2, sticky='W')),
        (self.label_rssi_difference, dict(row=8, column=1, sticky='E')),
        (self.entry_rssi_difference, dict(row=8, column=2)),
        (self.button_test_gs, dict(row=9, column=1, pady=5)),
        (self.button_test_fc, dict(row=9, column=2, pady=5)),
        (self.button_test_again, dict(row=9, column=3, columnspan=2, pady=5)),
        (self.print_frame, dict(row=2, column=1, columnspan=2, pady=10, padx=10)),
    )
    for widget, cell in layout:
        widget.grid(**cell)
    self.text_print.pack(side=tk.LEFT, fill=tk.Y)
    self.text_scroll.pack(side=tk.RIGHT, fill=tk.Y)

    # Fill both combo boxes with "<device> <description>" of every port found.
    found_ports = list(serial.tools.list_ports.comports())
    self.port_list_des = []
    self.port_list_str = []
    for info in found_ports:
        fields = list(info)
        self.port_list_des.append(fields[0] + ' ' + fields[1])
        self.port_list_str.append(fields[0])
    self.combox_port_fc['value'] = self.port_list_des
    self.combox_port_gs['value'] = self.port_list_des

    self.window.bind("<Button-1>", self.set_default_focus)
    # Any newly added input widget must be listed in entry_set below,
    # otherwise clicking it moves the focus away (see set_default_focus).
    """ 如果添加了新的输入框对象,请将其加入以下entry_set列表中,否则无法点击输入 """
    self.entry_set = [
        self.combox_port_fc, self.combox_port_gs, self.text_print,
        self.entry_rssi_RMS, self.entry_fail_RMS, self.entry_vga_initial,
        self.entry_device_num, self.entry_SNR_LMT, self.entry_rssi_difference,
    ]
    self.warning.set("请选择串口设备")

    # 20-slot ring buffer of log lines: print_msg() writes, update_gui() reads.
    self.print_buffer = [' '] * 20
    self.print_buffer[0] = 'Waiting for logs...\r\n---------------------------------------------------'
    self.current_msg_index = 1
    self.sent_msg_index = 0
    # Pending GUI state request consumed by update_gui(); 0 means none.
    self.test_state = 0
    self.thread_lock = threading.Lock()
    self.window.after(100, self.update_gui)
""" 设置鼠标焦点,设置下拉框条目 """
def set_default_focus(self, event):
    """Handle a left click anywhere in the window.

    A click on a widget listed in ``entry_set`` re-scans the serial ports
    and refreshes both combo boxes; a click anywhere else moves the
    keyboard focus to the main window.
    """
    if event.widget in self.entry_set:
        found_ports = list(serial.tools.list_ports.comports())
        self.port_list_des = descriptions = []
        self.port_list_str = devices = []
        for info in found_ports:
            fields = list(info)
            descriptions.append(fields[0] + ' ' + fields[1])
            devices.append(fields[0])
        self.combox_port_fc['value'] = self.port_list_des
        self.combox_port_gs['value'] = self.port_list_des
        return
    self.window.focus_set()
@staticmethod
def thread_it(func, *args):
t = threading.Thread(target=func, args=args)
t.setDaemon(True)
t.start()
""" 加载保存在JSON文件中的参数 """
def load_parms(self):
    """Restore the parameters stored in ``XD_STconfig.json``.

    If the file cannot be read or lacks an expected key, every widget is
    set to its built-in default and the file is rewritten with those
    defaults.
    """
    try:
        with open('XD_STconfig.json', 'r') as f:
            self.parm_save = json.load(f)
        saved = self.parm_save
        # Tick the check boxes whose stored flag is truthy.
        for key, box in (('2410', self.checkbox_2410),
                         ('2427', self.checkbox_2427),
                         ('2444', self.checkbox_2444),
                         ('2461', self.checkbox_2461),
                         ('2478', self.checkbox_2478),
                         ('increase', self.checkbox_auto_increase)):
            if saved[key]:
                box.select()
        # Copy the stored text values into the entry variables.
        for key, variable in (('fail_rms', self.fail_RMS_input),
                              ('rssi_rms', self.rssi_RMS_input),
                              ('vga', self.vga_initial_input),
                              ('snr', self.snr_lmt_input),
                              ('number', self.device_num_input),
                              ('rssi_diff', self.rssi_differ_input)):
            variable.set(saved[key])
    except Exception:
        # Fall back to the defaults and persist them.
        for box in (self.checkbox_2410, self.checkbox_2427, self.checkbox_2444,
                    self.checkbox_2461, self.checkbox_2478):
            box.select()
        self.fail_RMS_input.set('10')
        self.rssi_RMS_input.set('93')
        self.vga_initial_input.set('15000')
        self.snr_lmt_input.set('7')
        self.checkbox_auto_increase.select()
        self.device_num_input.set('1')
        self.rssi_differ_input.set('10')
        self.parm_save = {'increase':1, '2410':1, '2427':1, '2444':1, '2461':1, '2478':1,
                          'fail_rms':'10', 'rssi_rms':'93', 'vga':'15000', 'snr':'7', 'number':'1', 'rssi_diff':'10'}
        with open('XD_STconfig.json','w+') as f:
            json.dump(self.parm_save, f)
""" 打印三合一消息 """
def print_msg(self, msg):
    """Queue ``msg`` in the 20-slot ring buffer that update_gui() drains."""
    slot = self.current_msg_index
    self.print_buffer[slot] = msg
    following = slot + 1
    # Wrap around after the last slot (index 19).
    self.current_msg_index = 0 if following > 19 else following
""" 更新GUI,在主线程中更新GUI元素,否则程序容易崩溃 """
def update_gui(self):
# Periodic GUI refresh: copies queued log lines into the text widget and
# applies the state request left in self.test_state by the worker threads.
# NOTE(review): indentation is missing in this copy of the source; the
# nesting must be restored before the code can run.
# The text widget is made editable only while it is being updated.
self.text_print.config(state=tk.NORMAL)
# Drain the ring buffer filled by print_msg(): append every slot between
# sent_msg_index and current_msg_index, wrapping after index 19.
while self.current_msg_index != self.sent_msg_index:
self.text_print.insert(tk.END, '\r\n' + self.print_buffer[self.sent_msg_index])
self.sent_msg_index += 1
if self.sent_msg_index > 19:
self.sent_msg_index = 0
# print(self.text_scroll.get())
# Scroll to the end only when the scrollbar's upper fraction is above 0.98.
if self.text_scroll.get()[1] > 0.98:
self.text_print.see(tk.END)
# Trim from the top while the widget holds more than 10000 characters.
while len(self.text_print.get(0.0, tk.END)) > 10000:
self.text_print.delete(0.0, 3.0)
self.text_print.config(state=tk.DISABLED)
# test_state is a one-shot request: each branch resets it to 0.
with self.thread_lock:
# 1: disable the three test buttons, warning text in black.
if self.test_state == 1:
self.test_state = 0
self.button_test_gs.config(state=tk.DISABLED)
self.button_test_fc.config(state=tk.DISABLED)
self.button_test_again.config(state=tk.DISABLED)
self.text_warning.config(fg='black')
# 2: enable the three test buttons; warning text is red when
# test_result_flag is truthy, green otherwise.
elif self.test_state == 2:
self.test_state = 0
self.button_test_gs.config(state=tk.NORMAL)
self.button_test_fc.config(state=tk.NORMAL)
self.button_test_again.config(state=tk.NORMAL)
if self.test_result_flag:
self.text_warning.config(fg='red')
else:
self.text_warning.config(fg='green')
# 3: disable the test buttons, re-enable the start button and both
# port combo boxes, warning text in black.
elif self.test_state == 3:
self.test_state = 0
self.button_test_gs.config(state=tk.DISABLED)
self.button_test_fc.config(state=tk.DISABLED)
self.button_test_again.config(state=tk.DISABLED)
self.button_start.config(state=tk.NORMAL)
self.combox_port_fc.config(state=tk.NORMAL)
self.combox_port_gs.config(state=tk.NORMAL)
self.text_warning.config(fg='black')
# 4: enable the two device test buttons; disable the re-test button, the
# start button and both port combo boxes.
elif self.test_state == 4:
self.test_state = 0
self.button_test_gs.config(state=tk.NORMAL)
self.button_test_fc.config(state=tk.NORMAL)
self.button_test_again.config(state=tk.DISABLED)
self.button_start.config(state=tk.DISABLED)
self.combox_port_fc.config(state=tk.DISABLED)
self.combox_port_gs.config(state=tk.DISABLED)
# Re-arm the timer so this method runs again in 100 ms.
self.window.after(100, self.update_gui)
""" 绑定到 打开串口 按钮的函数 """
def start_test(self):
    """Handler of the "open serial port" button.

    Checks the selected ports via ``open_com_port()``, creates the CSV
    record file with its header line on first use, and then requests GUI
    state 4 from ``update_gui()``. Returns silently when the ports could
    not be opened.
    """
    try:
        self.open_com_port()
    except Exception:
        # open_com_port() sets the warning text before it raises.
        return
    record_headers = '测试编号,设备类型,通道,底噪RSSI,底噪RSSI_B,初始RSSI,初始RSSI_B,初始SNR,实测RSSI,实测RSSI_B,实测SNR,能量值,备注'
    if self.file_path is None:
        new_path = time.strftime('record %Y-%m-%d %Hh-%Mm-%Ss.csv', time.localtime(time.time()))
        try:
            # Python 3 str has no .decode(); let the file object do the GBK
            # encoding instead of printing a bytes object.
            with open(new_path, 'a', encoding='gbk') as record_file:
                print(record_headers, file=record_file)
        except OSError:
            self.warning.set('错误!!!:\n无法写入记录文件')
            return
        # Remember the path only once the header is really on disk.
        self.file_path = new_path
    with self.thread_lock:
        self.test_state = 4
    self.warning.set('点击下列按钮开始测试')
""" 设置勾选的测试频率 """
def select_freq(self):
    """Record which test frequencies are ticked.

    Stores a 0/1 flag per frequency in ``self.test_freq_seq``.

    Returns:
        False when at least one frequency is selected; True (after setting
        the warning text) when none is.
    """
    boxes = (self.freq_2410, self.freq_2427, self.freq_2444,
             self.freq_2461, self.freq_2478)
    self.test_freq_seq = [1 if box.get() else 0 for box in boxes]
    if any(self.test_freq_seq):
        return False
    self.warning.set('请勾选测试频率')
    return True
""" 设置输入的测试参数 """
def para_input(self):
    """Read, validate and persist the parameters typed into the GUI.

    The parsed values are stored on ``self``; on success the current
    settings are also written to ``XD_STconfig.json``.

    Returns:
        False when every value is valid; True (after setting the warning
        text) when a value is out of range or cannot be parsed.
    """
    try:
        self.fail_rms_limit = int(self.fail_RMS_input.get())
        self.rssi_rms_limit = int(self.rssi_RMS_input.get())
        self.vga_initial = int(self.vga_initial_input.get())
        self.test_number = int(self.device_num_input.get())
        self.snr_limit = int(self.snr_lmt_input.get())
        self.rssi_difference = int(self.rssi_differ_input.get())

        in_range = (0 <= self.fail_rms_limit <= 30
                    and 50 <= self.rssi_rms_limit <= 110
                    and 5000 <= self.vga_initial <= 50000
                    and 0 <= self.snr_limit <= 30)
        if not in_range:
            self.warning.set('输入值超出范围')
            return True

        # Values are saved exactly as the widgets report them.
        sources = (('2410', self.freq_2410), ('2427', self.freq_2427),
                   ('2444', self.freq_2444), ('2461', self.freq_2461),
                   ('2478', self.freq_2478), ('increase', self.auto_increase),
                   ('fail_rms', self.fail_RMS_input),
                   ('rssi_rms', self.rssi_RMS_input),
                   ('vga', self.vga_initial_input),
                   ('snr', self.snr_lmt_input),
                   ('number', self.device_num_input),
                   ('rssi_diff', self.rssi_differ_input))
        with open('XD_STconfig.json','w+') as f:
            for key, variable in sources:
                self.parm_save[key] = variable.get()
            json.dump(self.parm_save, f)
        return False
    except Exception:
        self.warning.set('请检查输入值是否正确')
        return True
@staticmethod
def get_rms(records):
return int(math.floor(math.sqrt(sum([x ** 2 for x in records]) / len(records))))
""" 尝试打开串口 """
def open_com_port(self):
    """Check that both selected serial ports can be opened.

    Creates ``self.com_fc`` and ``self.com_gs`` for the chosen ports, opens
    them once and closes them again; the test routines reopen them when
    they run.

    Raises:
        RuntimeError: when a combo box does not hold a known port, or when
            creating/opening a port fails. The warning text is set first.
    """
    time.sleep(0.1)
    port_fc_des = self.combox_port_fc.get()
    port_gs_des = self.combox_port_gs.get()
    for description in (port_fc_des, port_gs_des):
        if description not in self.port_list_des:
            self.warning.set('错误!!!:\n未选择正确的COM口!')
            raise RuntimeError("com port invalid")
    port_fc = self.port_list_str[self.port_list_des.index(port_fc_des)]
    port_gs = self.port_list_str[self.port_list_des.index(port_gs_des)]

    # Reset first so the cleanup below never touches a missing attribute
    # when one of the constructors fails.
    self.com_fc = None
    self.com_gs = None
    try:
        self.com_fc = uploader(port_fc, 115200)
        self.com_gs = uploader(port_gs, 115200)
        time.sleep(0.1)
        self.com_fc.open()
        self.com_gs.open()
    except Exception as err:
        self.warning.set('错误!!!:\n串口初始化失败')
        raise RuntimeError("com port open failed") from err
    finally:
        # Ports are left closed in every case.
        for com in (self.com_fc, self.com_gs):
            if com is not None:
                com.close()
""" 绑定到 重新测试灵敏度 按钮的函数 """
def test_sensitive_again(self):
try:
with self.thread_lock:
self.test_state = 1
self.com_fc.open()
self.com_gs.open()
if self.select_freq() or self.para_input():
return
self.noise_record_rows = []
for freq_selected in painter.FREQ_LIST:
self.noise_record_rows.append('%d,%s,%s,N/A,N/A,' % (self.test_number, self.test_describe, freq_selected))
self.test_sensitive()
except Exception:
self.warning.set('请检查线路连接,重新开始')
with self.thread_lock:
self.test_state = 3
finally:
self.com_fc.close()
self.com_gs.close()
""" 绑定到 天端设备测试 按钮的函数 """
def fc_test(self):
# Handler of the aircraft-side test button: measures the noise floor seen by
# the aircraft device (com_fc) on every selected frequency, then runs
# test_sensitive().
# NOTE(review): indentation is missing in this copy of the source; the
# nesting must be restored before the code can run.
try:
# Request GUI state 1 (update_gui() disables the test buttons).
with self.thread_lock:
self.test_state = 1
self.com_fc.open()
self.com_gs.open()
# NOTE(review): this early return happens after test_state was set to 1 and
# nothing visible here requests another state afterwards -- confirm the
# buttons get re-enabled.
if self.select_freq() or self.para_input():
return
# 'F' is the device-type column of the record rows; test_sensitive() also
# branches on it.
self.test_describe = 'F'
self.noise_record_rows = []
# Silence the device's periodic output and drop whatever is buffered.
self.com_fc.port.write(painter.PRINT_OFF.encode())
time.sleep(0.2)
self.com_fc.port.flushInput()
for freq_selected in painter.FREQ_LIST:
test_index = painter.FREQ_LIST.index(freq_selected)
# Each row starts with "<test number>,<device type>,<frequency>,".
self.noise_record_rows.append('%d,%s,%s,' % (self.test_number, self.test_describe, freq_selected))
# Unselected frequencies get N/A in both noise columns.
if self.test_freq_seq[test_index] == 0:
self.noise_record_rows[test_index] += 'N/A,N/A,'
else:
i = 0
rssi_nums = []
rssi_b_nums = []
self.warning.set('正在测试飞机端底噪\n频率%sMHz' % freq_selected)
self.com_fc.port.write(painter.PRINT_OFF.encode())
time.sleep(0.2)
self.com_fc.port.flushInput()
# Send the test-mode command until a line containing the OK status is
# read; receive_one_message() raises RuntimeError when no line arrives.
while 1:
self.com_fc.port.write(painter.SET_TESTMODE[test_index].encode())
read_str = self.com_fc.receive_one_message()
# match_obj = re.match(r'.*?cmd_test_mode: 1, 4,.*?([0-9]+), 27', read_str)
match_obj = re.match(r'.*?ETST {"status":"OK"}', read_str)
if match_obj is not None:
self.print_msg(read_str)
break
# Turn the periodic report back on and discard stale input.
self.com_fc.port.write(painter.PRINT_ON.encode())
time.sleep(0.2)
self.com_fc.port.flushInput()
# Collect RSSI (group 3) and RSSI_B (group 4) from matching report lines
# while i is below TEST_COUNT.
while i < painter.TEST_COUNT:
read_str = self.com_fc.receive_one_message()
match_obj = re.match(painter.RE_STR_AIR, read_str)
if match_obj is not None:
self.print_msg(read_str)
rssi_nums.append(int(match_obj.group(3)))
rssi_b_nums.append(int(match_obj.group(4)))
i += 1
# The noise floor is recorded as the RMS of the collected samples.
noise_rssi = self.get_rms(rssi_nums)
noise_rssi_b = self.get_rms(rssi_b_nums)
self.noise_record_rows[test_index] += ('%d,%d,' % (noise_rssi, noise_rssi_b))
time.sleep(0.5)
self.test_sensitive()
# Advance the device number shown in the GUI when auto-increase is ticked.
if self.auto_increase.get():
self.test_number += 1
self.device_num_input.set(str(self.test_number))
# Any failure shows a "check the wiring" warning and requests GUI state 3.
except Exception:
self.warning.set('请检查线路连接,重新开始')
with self.thread_lock:
self.test_state = 3
# Both ports are closed again whatever happened.
finally:
self.com_fc.close()
self.com_gs.close()
""" 绑定到 地端设备测试 按钮的函数 """
def gs_test(self):
# Handler of the ground-side test button: measures the noise floor seen by
# the ground device (com_gs) on every selected frequency, then runs
# test_sensitive().
# NOTE(review): indentation is missing in this copy of the source; the
# nesting must be restored before the code can run.
try:
# Request GUI state 1 (update_gui() disables the test buttons).
with self.thread_lock:
self.test_state = 1
self.com_gs.open()
self.com_fc.open()
# NOTE(review): this early return happens after test_state was set to 1 and
# nothing visible here requests another state afterwards -- confirm the
# buttons get re-enabled.
if self.select_freq() or self.para_input():
return
# 'G' is the device-type column of the record rows; test_sensitive() also
# branches on it.
self.test_describe = 'G'
self.noise_record_rows = []
self.com_gs.port.write(painter.PRINT_OFF.encode())
time.sleep(0.2)
# NOTE(review): PRINT_OFF was written to com_gs but the input flushed here
# belongs to com_fc (fc_test flushes the port it wrote to) -- confirm.
self.com_fc.port.flushInput()
for freq_selected in painter.FREQ_LIST:
test_index = painter.FREQ_LIST.index(freq_selected)
# Each row starts with "<test number>,<device type>,<frequency>,".
self.noise_record_rows.append('%d,%s,%s,' % (self.test_number, self.test_describe, freq_selected))
# Unselected frequencies get N/A in both noise columns.
if self.test_freq_seq[test_index] == 0:
self.noise_record_rows[test_index] += 'N/A,N/A,'
else:
i = 0
rssi_nums = []
rssi_b_nums = []
self.warning.set('正在测试地面站底噪\n频率%sMHz' % freq_selected)
self.com_gs.port.write(painter.PRINT_OFF.encode())
time.sleep(0.2)
self.com_gs.port.flushInput()
# Send the test-mode command until a line containing the OK status is
# read; receive_one_message() raises RuntimeError when no line arrives.
while 1:
self.com_gs.port.write(painter.SET_TESTMODE[test_index].encode())
read_str = self.com_gs.receive_one_message()
match_obj = re.match(r'.*?ETST {"status":"OK"}', read_str)
if match_obj is not None:
self.print_msg(read_str)
break
# Turn the periodic report back on and discard stale input.
self.com_gs.port.write(painter.PRINT_ON.encode())
time.sleep(0.2)
self.com_gs.port.flushInput()
# Collect RSSI (group 3) and RSSI_B (group 4) from matching report lines
# while i is below TEST_COUNT.
while i < painter.TEST_COUNT:
read_str = self.com_gs.receive_one_message()
match_obj = re.match(painter.RE_STR_GND, read_str)
if match_obj is not None:
self.print_msg(read_str)
rssi_nums.append(int(match_obj.group(3)))
rssi_b_nums.append(int(match_obj.group(4)))
i += 1
# The noise floor is recorded as the RMS of the collected samples.
noise_rssi = self.get_rms(rssi_nums)
noise_rssi_b = self.get_rms(rssi_b_nums)
self.noise_record_rows[test_index] += ('%d,%d,' % (noise_rssi, noise_rssi_b))
time.sleep(0.5)
self.test_sensitive()
# Advance the device number shown in the GUI when auto-increase is ticked.
if self.auto_increase.get():
self.test_number += 1
self.device_num_input.set(str(self.test_number))
# Any failure shows a "check the wiring" warning and requests GUI state 3.
except Exception:
self.warning.set('请检查线路连接,重新开始')
with self.thread_lock:
self.test_state = 3
# Both ports are closed again whatever happened.
finally:
self.com_fc.close()
self.com_gs.close()
""" 测试灵敏度 """
def test_sensitive(self):
# Sensitivity test. For every selected frequency the device under test
# (com_a) reports fail/SNR/RSSI figures while the peer (com_b) has its TX
# VGA value stepped up until the RMS of the fail counts reaches
# fail_rms_limit; the results are appended to the CSV record file.
# Report-line regex groups: 1 fail, 2 SNR, 3 RSSI, 4 RSSI_B, 5 rx_F, 6 stat.
# NOTE(review): indentation is missing in this copy of the source and some
# nesting is ambiguous (see the else branch in the wait loop below); it
# must be restored before the code can run.
test_result = ' '
# Work on a copy of the rows prepared by the caller.
record_rows = [i for i in self.noise_record_rows]
for freq_selected in painter.FREQ_LIST:
test_index = painter.FREQ_LIST.index(freq_selected)
# Unselected frequencies get N/A in all eight remaining columns.
if self.test_freq_seq[test_index] == 0:
record_rows[test_index] += 'N/A,N/A,N/A,N/A,N/A,N/A,N/A,N/A'
else:
# Start one step below the initial value; the loop adds a step before the
# first measurement.
set_vga = self.vga_initial - 1000
i = 0
rssi_nums = []
rssi_b_nums = []
snr_nums = []
fail_RMS = 0
rssi_refer_RMS = 0
signle_result_string = ''
# com_a is the device under test, com_b the peer device.
if self.test_describe == 'G':
self.warning.set('正在测试地面站灵敏度\n频率%sMHz' % freq_selected)
com_a = self.com_gs
com_b = self.com_fc
re_match_string = painter.RE_STR_GND
else:
self.warning.set('正在测试飞机端灵敏度\n频率%sMHz' % freq_selected)
com_a = self.com_fc
com_b = self.com_gs
re_match_string = painter.RE_STR_AIR
# Silence both devices and drop buffered input.
com_a.port.write(painter.PRINT_OFF.encode())
com_b.port.write(painter.PRINT_OFF.encode())
time.sleep(0.2)
com_a.port.flushInput()
com_b.port.flushInput()
# Put the peer into test mode; repeat until a line with the OK status is
# read (receive_one_message() raises RuntimeError when no line arrives).
while 1:
com_b.port.write(painter.SET_TESTMODE[test_index].encode())
read_str = com_b.receive_one_message()
# match_obj = re.match(r'.*?cmd_test_mode: 1, 4,.*?([0-9]+), 27', read_str)
match_obj = re.match(r'.*?ETST {"status":"OK"}', read_str)
if match_obj is not None:
self.print_msg(read_str)
break
# For a ground-side test the aircraft device is sent the ant1 command.
if self.test_describe == 'G':
self.com_fc.port.flushInput()
while 1:
self.com_fc.port.write(painter.SET_ANTENNA_RF1.encode())
read_str = self.com_fc.receive_one_message()
# match_obj = re.match(r'.*?set_air_ant 1', read_str)
match_obj = re.match(r'.*?ETST {"status":"OK"}', read_str)
if match_obj is not None:
self.print_msg(read_str)
break
# Put the device under test into test mode and enable its reports.
com_a.port.write(painter.SET_TESTMODE[test_index].encode())
com_a.port.write(painter.PRINT_ON.encode())
time.sleep(0.2)
com_a.port.flushInput()
# Wait for a report whose rx_F equals the selected frequency and whose stat
# is '4'.
# NOTE(review): without indentation it cannot be told which `if` the `else`
# below pairs with, i.e. when exactly the test-mode command is re-sent.
while 1:
read_str = com_a.receive_one_message()
match_obj = re.match(re_match_string, read_str)
if match_obj is not None:
self.print_msg(read_str)
if freq_selected == match_obj.group(5):
if match_obj.group(6) == '4':
break
else:
com_a.port.write(painter.SET_TESTMODE[test_index].encode())
time.sleep(0.2)
com_a.port.flushInput()
time.sleep(0.2)
com_a.port.flushInput()
# Initial measurement: RSSI, SNR and RSSI_B from reports with stat '4'.
while i < painter.TEST_COUNT:
read_str = com_a.receive_one_message()
self.print_msg(read_str)
match_obj = re.match(re_match_string, read_str)
if match_obj is not None and match_obj.group(6) == '4':
rssi_nums.append(int(match_obj.group(3)))
snr_nums.append(int(match_obj.group(2)))
rssi_b_nums.append(int(match_obj.group(4)))
i += 1
initial_rssi = self.get_rms(rssi_nums)
initial_rssi_b = self.get_rms(rssi_b_nums)
initial_snr = self.get_rms(snr_nums)
record_rows[test_index] += ('%d,%d,%d,' % (initial_rssi, initial_rssi_b, initial_snr))
# Step the peer's VGA value until the fail RMS reaches the limit.
# NOTE(review): rssi_RMS, rssi_b_RMS and snr_RMS are only assigned inside
# this loop but are used after it; with fail_rms_limit == 0 the loop body
# never runs -- confirm.
while fail_RMS < self.fail_rms_limit:
i = 0
fail_counts = []
rssi_nums = []
rssi_b_nums = []
snr_nums = []
# Steps of 1000 below half the limit, 500 from there on.
if fail_RMS < (self.fail_rms_limit / 2.0):
set_vga += 1000
else:
set_vga += 500
set_vga_com = painter.SET_VGA_HEAD + str(set_vga) + painter.SET_VGA_TAIL
com_b.port.write(set_vga_com.encode())
time.sleep(0.2)
com_b.port.flushInput()
# Read the value back; the value reported by the device replaces set_vga.
com_b.port.write(painter.SET_GET_VGA.encode())
while 1:
read_str = com_b.receive_one_message()
match_obj = re.match(r'.*?Tx vga : ([0-9]+) mdB', read_str)
if match_obj is not None:
set_vga = int(match_obj.group(1))
self.print_msg(read_str)
break
time.sleep(0.2)
com_a.port.flushInput()
# The next report supplies the fail-counter baseline.
# NOTE(review): fail_RMS_old is assigned only when this single line
# matches; otherwise it is unset or left over from the previous pass --
# confirm.
read_str = com_a.receive_one_message()
self.print_msg(read_str)
match_obj = re.match(re_match_string, read_str)
if match_obj is not None:
fail_RMS_old = match_obj.group(1)
# Each sample's fail count is the fail counter minus that baseline.
while i < painter.TEST_COUNT:
read_str = com_a.receive_one_message()
self.print_msg(read_str)
match_obj = re.match(re_match_string, read_str)
if match_obj is not None and match_obj.group(6) == '4':
fail_RMS_new = match_obj.group(1)
fail_num = int(fail_RMS_new) - int(fail_RMS_old)
fail_counts.append(fail_num)
snr_nums.append(int(match_obj.group(2)))
rssi_nums.append(int(match_obj.group(3)))
rssi_b_nums.append(int(match_obj.group(4)))
i += 1
fail_RMS = self.get_rms(fail_counts)
snr_RMS = self.get_rms(snr_nums)
rssi_RMS = self.get_rms(rssi_nums)
rssi_b_RMS = self.get_rms(rssi_b_nums)
# The smaller of the two RSSI figures is compared with the limit later.
if rssi_RMS > rssi_b_RMS:
rssi_refer_RMS = rssi_b_RMS
else:
rssi_refer_RMS = rssi_RMS
# Abort this frequency when failures occur while SNR is above snr_limit.
if fail_RMS > 0 and snr_RMS > self.snr_limit:
self.warning.set('频点%sMHz测试SNR不合格' % freq_selected)
signle_result_string = 'SNR Failed'
break
# Abort this frequency when RSSI and RSSI_B differ by more than
# rssi_difference.
if abs(rssi_RMS - rssi_b_RMS) > self.rssi_difference:
self.warning.set('频点%sMHz测试RSSI和RSSI_B差异过大' % freq_selected)
signle_result_string = 'RSSI unbalance'
break
# Verdict: an earlier abort reason wins; otherwise the reference RSSI must
# not be below rssi_rms_limit.
if len(signle_result_string) > 1:
test_result = test_result + freq_selected + ' '
elif rssi_refer_RMS < self.rssi_rms_limit:
signle_result_string = 'RSSI Failed'
self.warning.set('频点%sMHz测试RSSI不合格' % freq_selected)
test_result = test_result + freq_selected + ' '
else:
signle_result_string = 'OK'
self.warning.set('频点%sMHz通过测试' % freq_selected)
record_rows[test_index] += ('%d,%d,%d,%d,%s' % (rssi_RMS, rssi_b_RMS, snr_RMS, set_vga, signle_result_string))
time.sleep(1.5)
com_a.port.write(painter.PRINT_OFF.encode())
# Append the rows to the record file.
# NOTE(review): under Python 3, print() of a bytes object writes its repr
# (b'...') rather than GBK text -- confirm the CSV rows come out as intended.
with open(self.file_path, 'a') as record_file:
for row in record_rows:
print(row.encode('gbk'), file=record_file)
# test_result starts as one space, so a length above 2 means at least one
# frequency was added to the failed list.
if len(test_result) > 2:
self.warning.set('下列频率未通过测试:\n' + test_result)
self.test_result_flag = 1
else:
self.warning.set('设备通过测试')
self.test_result_flag = 0
# Request GUI state 2 (update_gui() re-enables the test buttons and colours
# the warning text by test_result_flag).
with self.thread_lock:
self.test_state = 2
def main():
    """Create the tool window and hand control to the Tk event loop."""
    app = painter()
    app.window.mainloop()


if __name__ == "__main__":
    main()
|