IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 开发测试 -> Python双串口测试工具 -> 正文阅读

[开发测试]Python双串口测试工具

这是一个针对特定应用的测试工具,主要实现两个串口设备之间自动通讯与测试。

界面如下:

代码如下:

#!/usr/bin/env python
# coding=utf-8

import serial
import struct
import time
import math
import threading
import tkinter as tk
from tkinter import ttk
import tkinter.font as tkFont
import serial.tools.list_ports
import re
import json


class uploader(object):
    MAX_PACK_SIZE = 64
    MAX_FLASH_PRGRAM_TIME = 0.001  # Time on an F7 to send SYNC, RESULT from last data in multi RXed

    def __init__(self, portname, baudrate_bootloader):
        # Open the port, keep the default timeout short so we can poll quickly.
        # On some systems writes can suddenly get stuck without having a
        # write_timeout > 0 set.
        # chartime 8n1 * bit rate is us
        self.chartime = 10 * (1.0 / baudrate_bootloader)

        # we use a window approche to SYNC,<result> gathring
        self.port = serial.Serial(portname, baudrate_bootloader, timeout=3, write_timeout=0)

    def close(self):
        if self.port is not None:
            self.port.close()

    def open(self):
        # upload timeout
        timeout = time.time() + 0.2

        # attempt to open the port while it exists and until timeout occurs
        while self.port is not None:
            portopen = True
            try:
                portopen = self.port.is_open
            except AttributeError:
                portopen = self.port.isOpen()

            if not portopen and time.time() < timeout:
                try:
                    self.port.open()
                except OSError:
                    # wait for the port to be ready
                    time.sleep(0.04)
                except serial.SerialException:
                    # if open fails, try again later
                    time.sleep(0.04)
            else:
                break

    def __send(self, c):
        # print("send " + binascii.hexlify(c))
        self.port.write(c)

    def __recv(self, count=1):
        c = self.port.read(count)
        if len(c) < 1:
            raise RuntimeError("timeout waiting for data (%u bytes)" % count)
        # print("recv " + binascii.hexlify(c))
        return c

    def __recv_uint8(self):
        uint8 = struct.unpack("<B", self.__recv(1))
        return uint8[0]

    def __recv_int(self):
        raw = self.__recv(4)
        val = struct.unpack("<I", raw)
        return val[0]

    def receive_one_message(self):
        str_msg = self.port.readline()
        if len(str_msg) < 1:
            raise RuntimeError("timeout waiting for data")
        # print("recv " + binascii.hexlify(c))
        return str_msg.decode('utf-8')


class painter(object):
    PRINT_ON = 'print_on\r\n'
    PRINT_OFF = 'print_off\r\n'
    SET_ANTENNA_RF1 = '''ETST {"airAntSelect":"ant1"}\r\n'''  
    SET_ANTENNA_RF2 = '''ETST {"airAntSelect":"ant2"}\r\n'''      
    SET_ANTENNA_AUTO =  '''ETST {"airAntSelect":"ant2"}\r\n'''
    FREQ_LIST = ['2410', '2427', '2444', '2461', '2478']    
    SET_VGA_HEAD = 'set_tx_vga '
    SET_VGA_TAIL = ' \r\n'
    SET_GET_VGA = 'get_tx_vga\r\n'
    TEST_COUNT = 15
    SET_TESTMODE = ['''ETST {"testMode":"enable","continuousTx":"disable","continuousRx":"disable","mcs":"BPSK_1/2(2.08Mbps)","loFreq":2410,"power_db":27}\r\n''',
                '''ETST {"testMode":"enable","continuousTx":"disable","continuousRx":"disable","mcs":"BPSK_1/2(2.08Mbps)","loFreq":2427,"power_db":27}\r\n''',
                '''ETST {"testMode":"enable","continuousTx":"disable","continuousRx":"disable","mcs":"BPSK_1/2(2.08Mbps)","loFreq":2444,"power_db":27}\r\n''',
                '''ETST {"testMode":"enable","continuousTx":"disable","continuousRx":"disable","mcs":"BPSK_1/2(2.08Mbps)","loFreq":2461,"power_db":27}\r\n''',
                '''ETST {"testMode":"enable","continuousTx":"disable","continuousRx":"disable","mcs":"BPSK_1/2(2.08Mbps)","loFreq":2478,"power_db":27}\r\n''']
    RE_STR_GND = r'.*?fail:.*?([0-9]+).*?SNR:.*?([0-9]+).*?RSSI:.*?([0-9]+).*?RSSI_B:.*?([0-9]+).*?rx_F:.*?([0-9]+).*?stat:.*?([0-9])'
    RE_STR_AIR = r'.*?fail:.*?([0-9]+).*?SNR:.*?([0-9]+).*?RSSI:.*?([0-9]+).*?RSSI_B:.*?([0-9]+).*?rx_F:.*?([0-9]+).*?stat:.*?([0-9])'

    def __init__(self):
        self.file_path = None
        self.window = tk.Tk()
        scwidth = self.window.winfo_screenwidth()
        scheight = self.window.winfo_screenheight()
        window_size = '+%d' % ((scwidth - 900) / 2) + '+%d' % ((scheight - 700) / 2)
        self.warning = tk.StringVar()
        self.fail_RMS_input = tk.StringVar()
        self.snr_lmt_input = tk.StringVar()
        self.rssi_RMS_input = tk.StringVar()
        self.vga_initial_input = tk.StringVar()
        self.device_num_input = tk.StringVar()
        self.rssi_differ_input = tk.StringVar()
        self.auto_increase = tk.IntVar()

        self.freq_2410 = tk.IntVar()
        self.freq_2427 = tk.IntVar()
        self.freq_2444 = tk.IntVar()
        self.freq_2461 = tk.IntVar()
        self.freq_2478 = tk.IntVar()        

        self.font_warning = tkFont.Font(size = 16, weight = tkFont.BOLD)

        self.window.title("三合一底噪灵敏度测试工具V0.6 2021-04-26")
        self.window.geometry(window_size)
        self.main_frame = tk.Frame(self.window)

        self.input_frame = tk.Frame(self.main_frame)
        self.label_port_fc = tk.Label(self.input_frame, text="飞机串口:", width=10)
        self.combox_port_fc = ttk.Combobox(self.input_frame, width=40)
        self.label_port_gs = tk.Label(self.input_frame, text="遥控器串口:", width=10)
        self.combox_port_gs = ttk.Combobox(self.input_frame, width=40)
        self.button_start = tk.Button(self.input_frame, text="打开\n串口", width=5, height=2,
                                command=lambda: self.thread_it(self.start_test))
        self.text_warning = tk.Label(self.input_frame, textvariable=self.warning, width=30, height=7, bg='white',
                                wraplength = 330, font = self.font_warning)

        self.checkbox_frame = tk.Frame(self.main_frame)
        self.label_freq = tk.Label(self.checkbox_frame, text="测试频率:")
        self.checkbox_2410 = tk.Checkbutton(self.checkbox_frame, text="2410MHz ", width=10,
                                            variable=self.freq_2410)  # , state = tk.DISABLED)
        self.checkbox_2427 = tk.Checkbutton(self.checkbox_frame, text="2427MHz ", width=10,
                                            variable=self.freq_2427)  # , state = tk.DISABLED)
        self.checkbox_2444 = tk.Checkbutton(self.checkbox_frame, text="2444MHz ", width=10,
                                            variable=self.freq_2444)  # , state = tk.DISABLED)
        self.checkbox_2461 = tk.Checkbutton(self.checkbox_frame, text="2461MHz ", width=10,
                                            variable=self.freq_2461)  # , state = tk.DISABLED)
        self.checkbox_2478 = tk.Checkbutton(self.checkbox_frame, text="2478MHz ", width=10,
                                            variable=self.freq_2478)  # , state = tk.DISABLED)        
        self.label_fail_RMS = tk.Label(self.checkbox_frame, text="FAIL_RMS:")
        self.label_rssi_RMS = tk.Label(self.checkbox_frame, text="RSSI_RMS:")
        self.label_vga_initial = tk.Label(self.checkbox_frame, text="起始VGA:")
        self.label_fail_limit = tk.Label(self.checkbox_frame, text="有效范围(0-30)")
        self.label_rssi_limit = tk.Label(self.checkbox_frame, text="有效范围(50-110)")
        self.label_vga_limit = tk.Label(self.checkbox_frame, text="有效范围(5000-50000)")
        self.label_SNR = tk.Label(self.checkbox_frame, text="SNR_LIMIT:")
        self.label_SNR_limit = tk.Label(self.checkbox_frame, text="有效范围(0-30)")
        self.entry_SNR_LMT = tk.Entry(self.checkbox_frame, textvariable=self.snr_lmt_input,
                                       width=10)  # , state = tk.DISABLED)
        self.entry_fail_RMS = tk.Entry(self.checkbox_frame, textvariable=self.fail_RMS_input,
                                       width=10)  # , state = tk.DISABLED)
        self.entry_rssi_RMS = tk.Entry(self.checkbox_frame, textvariable=self.rssi_RMS_input,
                                       width=10)  # , state = tk.DISABLED)
        self.entry_vga_initial = tk.Entry(self.checkbox_frame, textvariable=self.vga_initial_input,
                                          width=10)  # , state = tk.DISABLED)
        self.label_device_num = tk.Label(self.checkbox_frame, text="设备编号:")
        self.entry_device_num = tk.Entry(self.checkbox_frame, textvariable=self.device_num_input,
                                         width=10)  # , state = tk.DISABLED)
        self.checkbox_auto_increase = tk.Checkbutton(self.checkbox_frame, text="编号自动增加",
                                                     variable=self.auto_increase)  # , state = tk.DISABLED)
        self.button_test_gs = tk.Button(self.checkbox_frame, text="地端设备测试", command=lambda: self.thread_it(self.gs_test),
                                        state=tk.DISABLED)
        self.button_test_fc = tk.Button(self.checkbox_frame, text="天端设备测试", command=lambda: self.thread_it(self.fc_test),
                                        state=tk.DISABLED)
        self.button_test_again = tk.Button(self.checkbox_frame, text="重新测试灵敏度", command=lambda: self.thread_it(self.test_sensitive_again),
                                        state=tk.DISABLED)
        self.label_rssi_difference = tk.Label(self.checkbox_frame, text="RSSI极限差异值:")
        self.entry_rssi_difference = tk.Entry(self.checkbox_frame, textvariable=self.rssi_differ_input,
                                       width=10)  # , state = tk.DISABLED)

        self.print_frame = tk.Frame(self.main_frame)
        self.text_print = tk.Text(self.print_frame, width=130, height=40, wrap=tk.WORD, state=tk.DISABLED)
        self.text_scroll = tk.Scrollbar(self.print_frame, command=self.text_print.yview)
        self.text_print.config(yscrollcommand=self.text_scroll.set)

        self.load_parms()
        self.orig_color = self.label_port_fc.cget('bg')
        self.main_frame.pack(anchor = tk.CENTER)

        self.input_frame.grid(row=1, column=1, padx = 10, sticky='N')
        self.label_port_fc.grid(row=1, column=1, pady = 10)
        self.combox_port_fc.grid(row=1, column=2, columnspan=2, sticky='W')
        self.label_port_gs.grid(row=2, column=1)
        self.combox_port_gs.grid(row=2, column=2, columnspan=2, sticky='W')
        self.button_start.grid(row=1, column=4, rowspan=2, padx=5)
        self.text_warning.grid(row=3, column=2, columnspan=3, pady = 5)

        self.checkbox_frame.grid(row=1, column=2, padx = 10)
        self.label_freq.grid(row=1, column=1)
        self.checkbox_2410.grid(row=1, column=2)
        self.checkbox_2427.grid(row=1, column=3)
        self.checkbox_2444.grid(row=1, column=4)
        self.checkbox_2461.grid(row=2, column=1)
        self.checkbox_2478.grid(row=2, column=2)        
        self.label_fail_RMS.grid(row=3, column=1, sticky='E')
        self.entry_fail_RMS.grid(row=3, column=2)
        self.label_fail_limit.grid(row=3, column=3, columnspan=2, sticky='W')
        self.label_rssi_RMS.grid(row=4, column=1, sticky='E')
        self.entry_rssi_RMS.grid(row=4, column=2)
        self.label_rssi_limit.grid(row=4, column=3, columnspan=2, sticky='W')
        self.label_vga_initial.grid(row=5, column=1, sticky='E')
        self.entry_vga_initial.grid(row=5, column=2)
        self.label_vga_limit.grid(row=5, column=3, columnspan=2, sticky='W')
        self.label_SNR.grid(row=6, column=1, sticky='E')
        self.entry_SNR_LMT.grid(row=6, column=2)
        self.label_SNR_limit.grid(row=6, column=3, columnspan=2, sticky='W')
        self.label_device_num.grid(row=7, column=1, sticky='E')
        self.entry_device_num.grid(row=7, column=2)
        self.checkbox_auto_increase.grid(row=7, column=3, columnspan=2, sticky='W')
        self.label_rssi_difference.grid(row=8, column=1, sticky='E')
        self.entry_rssi_difference.grid(row=8, column=2)
        self.button_test_gs.grid(row=9, column=1, pady=5)
        self.button_test_fc.grid(row=9, column=2, pady=5)
        self.button_test_again.grid(row=9, column=3, columnspan=2, pady=5)

        self.print_frame.grid(row=2, column=1, columnspan=2, pady = 10, padx = 10)
        self.text_print.pack(side=tk.LEFT, fill=tk.Y)
        self.text_scroll.pack(side=tk.RIGHT, fill=tk.Y)

        port_list = list(serial.tools.list_ports.comports())
        self.port_list_des = []
        self.port_list_str = []
        for port in port_list:
            port_str = list(port)
            self.port_list_des.append(port_str[0] + ' ' + port_str[1])
            self.port_list_str.append(port_str[0])
        self.combox_port_fc['value'] = self.port_list_des
        self.combox_port_gs['value'] = self.port_list_des

        self.window.bind("<Button-1>", self.set_default_focus)
        """ 如果添加了新的输入框对象,请将其加入以下entry_set列表中,否则无法点击输入 """
        self.entry_set = [self.combox_port_fc, self.combox_port_gs, self.text_print, self.entry_rssi_RMS,
                          self.entry_fail_RMS, self.entry_vga_initial, self.entry_device_num,
                          self.entry_SNR_LMT, self.entry_rssi_difference]
        self.warning.set("请选择串口设备")
        self.print_buffer = [' '] * 20
        self.print_buffer[0] = 'Waiting for logs...\r\n---------------------------------------------------'
        self.current_msg_index = 1
        self.sent_msg_index = 0
        self.test_state = 0
        self.thread_lock = threading.Lock()
        self.window.after(100, self.update_gui)

    """ 设置鼠标焦点,设置下拉框条目 """
    def set_default_focus(self, event):
        if event.widget not in self.entry_set:
            self.window.focus_set()
        else:
            port_list = list(serial.tools.list_ports.comports())
            self.port_list_des = []
            self.port_list_str = []
            for port in port_list:
                port_str = list(port)
                self.port_list_des.append(port_str[0] + ' ' + port_str[1])
                self.port_list_str.append(port_str[0])
            self.combox_port_fc['value'] = self.port_list_des
            self.combox_port_gs['value'] = self.port_list_des

    @staticmethod
    def thread_it(func, *args):
        t = threading.Thread(target=func, args=args)
        t.setDaemon(True)
        t.start()

    """ 加载保存在JSON文件中的参数 """
    def load_parms(self):
        try:
            with open('XD_STconfig.json', 'r') as f:
                self.parm_save = json.load(f)
            if self.parm_save['2410']:
                self.checkbox_2410.select()
            if self.parm_save['2427']:
                self.checkbox_2427.select()
            if self.parm_save['2444']:
                self.checkbox_2444.select()
            if self.parm_save['2461']:
                self.checkbox_2461.select()
            if self.parm_save['2478']:
                self.checkbox_2478.select()            
            if self.parm_save['increase']:
                self.checkbox_auto_increase.select()
            self.fail_RMS_input.set(self.parm_save['fail_rms'])
            self.rssi_RMS_input.set(self.parm_save['rssi_rms'])
            self.vga_initial_input.set(self.parm_save['vga'])
            self.snr_lmt_input.set(self.parm_save['snr'])
            self.device_num_input.set(self.parm_save['number'])
            self.rssi_differ_input.set(self.parm_save['rssi_diff'])
        except Exception:
            self.checkbox_2410.select()
            self.checkbox_2427.select()
            self.checkbox_2444.select()
            self.checkbox_2461.select()
            self.checkbox_2478.select()            
            self.fail_RMS_input.set('10')
            self.rssi_RMS_input.set('93')
            self.vga_initial_input.set('15000')
            self.snr_lmt_input.set('7')
            self.checkbox_auto_increase.select()
            self.device_num_input.set('1')
            self.rssi_differ_input.set('10')
            self.parm_save = {'increase':1, '2410':1, '2427':1, '2444':1, '2461':1, '2478':1,
                            'fail_rms':'10', 'rssi_rms':'93', 'vga':'15000', 'snr':'7', 'number':'1', 'rssi_diff':'10'}            
            with open('XD_STconfig.json','w+') as f:
                json.dump(self.parm_save, f)

    """ 打印三合一消息 """
    def print_msg(self, msg):
        self.print_buffer[self.current_msg_index] = msg
        self.current_msg_index += 1
        if self.current_msg_index > 19:
            self.current_msg_index = 0

    """ 更新GUI,在主线程中更新GUI元素,否则程序容易崩溃 """
    def update_gui(self):
        self.text_print.config(state=tk.NORMAL)
        while self.current_msg_index != self.sent_msg_index:
            self.text_print.insert(tk.END, '\r\n' + self.print_buffer[self.sent_msg_index])
            self.sent_msg_index += 1
            if self.sent_msg_index > 19:
                self.sent_msg_index = 0
        # print(self.text_scroll.get())
        if self.text_scroll.get()[1] > 0.98:
            self.text_print.see(tk.END)
            while len(self.text_print.get(0.0, tk.END)) > 10000:
                self.text_print.delete(0.0, 3.0)
        self.text_print.config(state=tk.DISABLED)

        with self.thread_lock:
            if self.test_state == 1:
                self.test_state = 0
                self.button_test_gs.config(state=tk.DISABLED)
                self.button_test_fc.config(state=tk.DISABLED)
                self.button_test_again.config(state=tk.DISABLED)
                self.text_warning.config(fg='black')
            elif self.test_state == 2:
                self.test_state = 0
                self.button_test_gs.config(state=tk.NORMAL)
                self.button_test_fc.config(state=tk.NORMAL)
                self.button_test_again.config(state=tk.NORMAL)
                if self.test_result_flag:
                    self.text_warning.config(fg='red')
                else:
                    self.text_warning.config(fg='green')
            elif self.test_state == 3:
                self.test_state = 0
                self.button_test_gs.config(state=tk.DISABLED)
                self.button_test_fc.config(state=tk.DISABLED)
                self.button_test_again.config(state=tk.DISABLED)
                self.button_start.config(state=tk.NORMAL)
                self.combox_port_fc.config(state=tk.NORMAL)
                self.combox_port_gs.config(state=tk.NORMAL)
                self.text_warning.config(fg='black')
            elif self.test_state == 4:
                self.test_state = 0
                self.button_test_gs.config(state=tk.NORMAL)
                self.button_test_fc.config(state=tk.NORMAL)
                self.button_test_again.config(state=tk.DISABLED)
                self.button_start.config(state=tk.DISABLED)
                self.combox_port_fc.config(state=tk.DISABLED)
                self.combox_port_gs.config(state=tk.DISABLED)

        self.window.after(100, self.update_gui)

    """ 绑定到 打开串口 按钮的函数 """
    def start_test(self):
        try:
            self.open_com_port()
            self.test_state = 4
            self.warning.set('点击下列按钮开始测试')          
            record_headers = '测试编号,设备类型,通道,底噪RSSI,底噪RSSI_B,初始RSSI,初始RSSI_B,初始SNR,实测RSSI,实测RSSI_B,实测SNR,能量值,备注'
            if self.file_path is None:
                self.file_path = time.strftime('record %Y-%m-%d %Hh-%Mm-%Ss.csv', time.localtime(time.time()))
                with open(self.file_path, 'a') as record_file:
                    print(record_headers.decode('utf-8').encode('gbk'), file=record_file)
        except Exception:
            return

    """ 设置勾选的测试频率 """
    def select_freq(self):
        self.test_freq_seq = [0,0,0,0,0]
        if self.freq_2410.get():
            self.test_freq_seq[0] = 1
        if self.freq_2427.get():
            self.test_freq_seq[1] = 1
        if self.freq_2444.get():
            self.test_freq_seq[2] = 1
        if self.freq_2461.get():
            self.test_freq_seq[3] = 1
        if self.freq_2478.get():
            self.test_freq_seq[4] = 1

        for item in self.test_freq_seq:
            if item:
                return False
        self.warning.set('请勾选测试频率')
        return True

    """ 设置输入的测试参数 """
    def para_input(self):
        try:
            self.fail_rms_limit = int(self.fail_RMS_input.get())
            self.rssi_rms_limit = int(self.rssi_RMS_input.get())
            self.vga_initial = int(self.vga_initial_input.get())
            self.test_number = int(self.device_num_input.get())
            self.snr_limit = int(self.snr_lmt_input.get())
            self.rssi_difference = int(self.rssi_differ_input.get())

            if (self.fail_rms_limit < 0 or self.fail_rms_limit > 30
                        or self.rssi_rms_limit > 110 or self.rssi_rms_limit < 50
                        or self.vga_initial > 50000 or self.vga_initial < 5000
                        or self.snr_limit < 0 or self.snr_limit > 30
                ):
                self.warning.set('输入值超出范围')
                return True

            with open('XD_STconfig.json','w+') as f:
                self.parm_save['2410'] = self.freq_2410.get()
                self.parm_save['2427'] = self.freq_2427.get()
                self.parm_save['2444'] = self.freq_2444.get()
                self.parm_save['2461'] = self.freq_2461.get()
                self.parm_save['2478'] = self.freq_2478.get()                
                self.parm_save['increase'] = self.auto_increase.get()
                self.parm_save['fail_rms'] = self.fail_RMS_input.get()
                self.parm_save['rssi_rms'] = self.rssi_RMS_input.get()
                self.parm_save['vga'] = self.vga_initial_input.get()
                self.parm_save['snr'] = self.snr_lmt_input.get()
                self.parm_save['number'] = self.device_num_input.get()
                self.parm_save['rssi_diff'] = self.rssi_differ_input.get()
                json.dump(self.parm_save, f)
            return False
        except Exception:
            self.warning.set('请检查输入值是否正确')
            return True

    @staticmethod
    def get_rms(records):
        return int(math.floor(math.sqrt(sum([x ** 2 for x in records]) / len(records))))

    """ 尝试打开串口 """
    def open_com_port(self):
        time.sleep(0.1)
        port_fc_des = self.combox_port_fc.get()
        port_gs_des = self.combox_port_gs.get()
        if port_fc_des not in self.port_list_des:
            self.warning.set('错误!!!:\n未选择正确的COM口!')
            raise RuntimeError("com port invalid")
        else:
            port_fc = self.port_list_str[self.port_list_des.index(port_fc_des)]

        if port_gs_des not in self.port_list_des:
            self.warning.set('错误!!!:\n未选择正确的COM口!')
            raise RuntimeError("com port invalid")
        else:
            port_gs = self.port_list_str[self.port_list_des.index(port_gs_des)]

        try:
            self.com_fc = uploader(port_fc, 115200)
            self.com_gs = uploader(port_gs, 115200)
            time.sleep(0.1)
            self.com_fc.open()
            self.com_gs.open()
        except Exception:
            self.warning.set('错误!!!:\n串口初始化失败')
            raise RuntimeError("com port open failed")
        finally:
            self.com_fc.close()
            self.com_gs.close()

    """ 绑定到 重新测试灵敏度 按钮的函数 """
    def test_sensitive_again(self):
        try:
            with self.thread_lock:
                self.test_state = 1
            self.com_fc.open()
            self.com_gs.open()
            if self.select_freq() or self.para_input():
                return

            self.noise_record_rows = []
            for freq_selected in painter.FREQ_LIST:
                self.noise_record_rows.append('%d,%s,%s,N/A,N/A,' % (self.test_number, self.test_describe, freq_selected))
            self.test_sensitive()

        except Exception:
            self.warning.set('请检查线路连接,重新开始')
            with self.thread_lock:
                self.test_state = 3

        finally:
            self.com_fc.close()
            self.com_gs.close()

    """ 绑定到 天端设备测试 按钮的函数 """
    def fc_test(self):
        try:
            with self.thread_lock:
                self.test_state = 1
            self.com_fc.open()
            self.com_gs.open()
            if self.select_freq() or self.para_input():
                return
            self.test_describe = 'F'
            self.noise_record_rows = []
            self.com_fc.port.write(painter.PRINT_OFF.encode())
            time.sleep(0.2)
            self.com_fc.port.flushInput()

            for freq_selected in painter.FREQ_LIST:
                test_index = painter.FREQ_LIST.index(freq_selected)
                self.noise_record_rows.append('%d,%s,%s,' % (self.test_number, self.test_describe, freq_selected))

                if self.test_freq_seq[test_index] == 0:
                    self.noise_record_rows[test_index] += 'N/A,N/A,'
                else:
                    i = 0
                    rssi_nums = []
                    rssi_b_nums = []
                    self.warning.set('正在测试飞机端底噪\n频率%sMHz' % freq_selected)
                    self.com_fc.port.write(painter.PRINT_OFF.encode())
                    time.sleep(0.2)
                    self.com_fc.port.flushInput()
                    while 1:
                        self.com_fc.port.write(painter.SET_TESTMODE[test_index].encode())
                        read_str = self.com_fc.receive_one_message()
                        # match_obj = re.match(r'.*?cmd_test_mode: 1, 4,.*?([0-9]+), 27', read_str)
                        match_obj = re.match(r'.*?ETST {"status":"OK"}', read_str)
                        if match_obj is not None:
                            self.print_msg(read_str)
                            break

                    self.com_fc.port.write(painter.PRINT_ON.encode())
                    time.sleep(0.2)
                    self.com_fc.port.flushInput()
                    while i < painter.TEST_COUNT:
                        read_str = self.com_fc.receive_one_message()
                        match_obj = re.match(painter.RE_STR_AIR, read_str)
                        if match_obj is not None:
                            self.print_msg(read_str)
                            rssi_nums.append(int(match_obj.group(3)))
                            rssi_b_nums.append(int(match_obj.group(4)))
                            i += 1
                    noise_rssi = self.get_rms(rssi_nums)
                    noise_rssi_b = self.get_rms(rssi_b_nums)
                    self.noise_record_rows[test_index] += ('%d,%d,' % (noise_rssi, noise_rssi_b))

            time.sleep(0.5)
            self.test_sensitive()

            if self.auto_increase.get():
                self.test_number += 1
                self.device_num_input.set(str(self.test_number))

        except Exception:
            self.warning.set('请检查线路连接,重新开始')
            with self.thread_lock:
                self.test_state = 3

        finally:
            self.com_fc.close()
            self.com_gs.close()

    """ 绑定到 地端设备测试 按钮的函数 """
    def gs_test(self):
        try:
            with self.thread_lock:
                self.test_state = 1
            self.com_gs.open()
            self.com_fc.open()
            if self.select_freq() or self.para_input():
                return
            self.test_describe = 'G'
            self.noise_record_rows = []
            self.com_gs.port.write(painter.PRINT_OFF.encode())
            time.sleep(0.2)
            self.com_fc.port.flushInput()

            for freq_selected in painter.FREQ_LIST:
                test_index = painter.FREQ_LIST.index(freq_selected)
                self.noise_record_rows.append('%d,%s,%s,' % (self.test_number, self.test_describe, freq_selected))

                if self.test_freq_seq[test_index] == 0:
                    self.noise_record_rows[test_index] += 'N/A,N/A,'
                else:
                    i = 0
                    rssi_nums = []
                    rssi_b_nums = []
                    self.warning.set('正在测试地面站底噪\n频率%sMHz' % freq_selected)
                    self.com_gs.port.write(painter.PRINT_OFF.encode())
                    time.sleep(0.2)
                    self.com_gs.port.flushInput()
                    while 1:
                        self.com_gs.port.write(painter.SET_TESTMODE[test_index].encode())
                        read_str = self.com_gs.receive_one_message()
                        match_obj = re.match(r'.*?ETST {"status":"OK"}', read_str)
                        if match_obj is not None:
                            self.print_msg(read_str)
                            break

                    self.com_gs.port.write(painter.PRINT_ON.encode())
                    time.sleep(0.2)
                    self.com_gs.port.flushInput()
                    while i < painter.TEST_COUNT:
                        read_str = self.com_gs.receive_one_message()
                        match_obj = re.match(painter.RE_STR_GND, read_str)
                        if match_obj is not None:
                            self.print_msg(read_str)
                            rssi_nums.append(int(match_obj.group(3)))
                            rssi_b_nums.append(int(match_obj.group(4)))
                            i += 1
                    noise_rssi = self.get_rms(rssi_nums)
                    noise_rssi_b = self.get_rms(rssi_b_nums)
                    self.noise_record_rows[test_index] += ('%d,%d,' % (noise_rssi, noise_rssi_b))

            time.sleep(0.5)
            self.test_sensitive()

            if self.auto_increase.get():
                self.test_number += 1
                self.device_num_input.set(str(self.test_number))

        except Exception:
            self.warning.set('请检查线路连接,重新开始')
            with self.thread_lock:
                self.test_state = 3

        finally:
            self.com_fc.close()
            self.com_gs.close()

    """ 测试灵敏度 """
    def test_sensitive(self):
        test_result = ' '
        record_rows = [i for i in self.noise_record_rows]

        for freq_selected in painter.FREQ_LIST:
            test_index = painter.FREQ_LIST.index(freq_selected)

            if self.test_freq_seq[test_index] == 0:
                record_rows[test_index] += 'N/A,N/A,N/A,N/A,N/A,N/A,N/A,N/A'
            else:
                set_vga = self.vga_initial - 1000
                i = 0
                rssi_nums = []
                rssi_b_nums = []
                snr_nums = []
                fail_RMS = 0
                rssi_refer_RMS = 0
                signle_result_string = ''
                if self.test_describe == 'G':
                    self.warning.set('正在测试地面站灵敏度\n频率%sMHz' % freq_selected)
                    com_a = self.com_gs
                    com_b = self.com_fc
                    re_match_string = painter.RE_STR_GND
                else:
                    self.warning.set('正在测试飞机端灵敏度\n频率%sMHz' % freq_selected)
                    com_a = self.com_fc
                    com_b = self.com_gs
                    re_match_string = painter.RE_STR_AIR

                com_a.port.write(painter.PRINT_OFF.encode())
                com_b.port.write(painter.PRINT_OFF.encode())
                time.sleep(0.2)

                com_a.port.flushInput()
                com_b.port.flushInput()
                while 1:
                    com_b.port.write(painter.SET_TESTMODE[test_index].encode())
                    read_str = com_b.receive_one_message()
                    # match_obj = re.match(r'.*?cmd_test_mode: 1, 4,.*?([0-9]+), 27', read_str)
                    match_obj = re.match(r'.*?ETST {"status":"OK"}', read_str)
                    if match_obj is not None:
                        self.print_msg(read_str)
                        break

                if self.test_describe == 'G':
                    self.com_fc.port.flushInput()
                    while 1:
                        self.com_fc.port.write(painter.SET_ANTENNA_RF1.encode())
                        read_str = self.com_fc.receive_one_message()
                        # match_obj = re.match(r'.*?set_air_ant 1', read_str)
                        match_obj = re.match(r'.*?ETST {"status":"OK"}', read_str)
                        if match_obj is not None:
                            self.print_msg(read_str)
                            break

                com_a.port.write(painter.SET_TESTMODE[test_index].encode())
                com_a.port.write(painter.PRINT_ON.encode())
                time.sleep(0.2)
                com_a.port.flushInput()
                while 1:
                    read_str = com_a.receive_one_message()
                    match_obj = re.match(re_match_string, read_str)
                    if match_obj is not None:
                        self.print_msg(read_str)
                        if freq_selected == match_obj.group(5):
                            if match_obj.group(6) == '4':
                                break
                        else:
                            com_a.port.write(painter.SET_TESTMODE[test_index].encode())
                            time.sleep(0.2)
                            com_a.port.flushInput()

                time.sleep(0.2)
                com_a.port.flushInput()
                while i < painter.TEST_COUNT:
                    read_str = com_a.receive_one_message()
                    self.print_msg(read_str)
                    match_obj = re.match(re_match_string, read_str)
                    if match_obj is not None and match_obj.group(6) == '4':
                        rssi_nums.append(int(match_obj.group(3)))
                        snr_nums.append(int(match_obj.group(2)))
                        rssi_b_nums.append(int(match_obj.group(4)))
                        i += 1
                initial_rssi = self.get_rms(rssi_nums)
                initial_rssi_b = self.get_rms(rssi_b_nums)
                initial_snr = self.get_rms(snr_nums)
                record_rows[test_index] += ('%d,%d,%d,' % (initial_rssi, initial_rssi_b, initial_snr))

                while fail_RMS < self.fail_rms_limit:
                    i = 0
                    fail_counts = []
                    rssi_nums = []
                    rssi_b_nums = []
                    snr_nums = []
                    if fail_RMS < (self.fail_rms_limit / 2.0):
                        set_vga += 1000
                    else:
                        set_vga += 500
                    set_vga_com = painter.SET_VGA_HEAD + str(set_vga) + painter.SET_VGA_TAIL
                    com_b.port.write(set_vga_com.encode())
                    time.sleep(0.2)
                    com_b.port.flushInput()
                    com_b.port.write(painter.SET_GET_VGA.encode())
                    while 1:
                        read_str = com_b.receive_one_message()
                        match_obj = re.match(r'.*?Tx vga : ([0-9]+) mdB', read_str)
                        if match_obj is not None:
                            set_vga = int(match_obj.group(1))
                            self.print_msg(read_str)
                            break
                    time.sleep(0.2)
                    com_a.port.flushInput()

                    read_str = com_a.receive_one_message()
                    self.print_msg(read_str)
                    match_obj = re.match(re_match_string, read_str)
                    if match_obj is not None:
                        fail_RMS_old = match_obj.group(1)              

                    while i < painter.TEST_COUNT:
                        read_str = com_a.receive_one_message()
                        self.print_msg(read_str)
                        match_obj = re.match(re_match_string, read_str)
                        if match_obj is not None and match_obj.group(6) == '4':
                            fail_RMS_new = match_obj.group(1)
                            fail_num = int(fail_RMS_new) - int(fail_RMS_old)   
                            fail_counts.append(fail_num)
                            snr_nums.append(int(match_obj.group(2)))
                            rssi_nums.append(int(match_obj.group(3)))
                            rssi_b_nums.append(int(match_obj.group(4)))                         
                            i += 1
                    fail_RMS = self.get_rms(fail_counts)
                    snr_RMS = self.get_rms(snr_nums)
                    rssi_RMS = self.get_rms(rssi_nums)
                    rssi_b_RMS = self.get_rms(rssi_b_nums)
                    if rssi_RMS > rssi_b_RMS:
                        rssi_refer_RMS = rssi_b_RMS
                    else:
                        rssi_refer_RMS = rssi_RMS
                    if fail_RMS > 0 and snr_RMS > self.snr_limit:
                        self.warning.set('频点%sMHz测试SNR不合格' % freq_selected)
                        signle_result_string = 'SNR Failed'
                        break
                    if abs(rssi_RMS - rssi_b_RMS) > self.rssi_difference:
                        self.warning.set('频点%sMHz测试RSSI和RSSI_B差异过大' % freq_selected)
                        signle_result_string = 'RSSI unbalance'
                        break

                if len(signle_result_string) > 1:
                    test_result = test_result + freq_selected + ' '
                elif rssi_refer_RMS < self.rssi_rms_limit:
                    signle_result_string = 'RSSI Failed'
                    self.warning.set('频点%sMHz测试RSSI不合格' % freq_selected)
                    test_result = test_result + freq_selected + ' '
                else:
                    signle_result_string = 'OK'
                    self.warning.set('频点%sMHz通过测试' % freq_selected)

                record_rows[test_index] += ('%d,%d,%d,%d,%s' % (rssi_RMS, rssi_b_RMS, snr_RMS, set_vga, signle_result_string))
                time.sleep(1.5)

        com_a.port.write(painter.PRINT_OFF.encode())
        with open(self.file_path, 'a') as record_file:
            for row in record_rows:
                print(row.encode('gbk'), file=record_file)
        if len(test_result) > 2:
            self.warning.set('下列频率未通过测试:\n' + test_result)
            self.test_result_flag = 1
        else:
            self.warning.set('设备通过测试')
            self.test_result_flag = 0
        with self.thread_lock:
            self.test_state = 2


def main():
    window = painter()
    window.window.mainloop()


if __name__ == '__main__':
    main()

  开发测试 最新文章
pytest系列——allure之生成测试报告(Wind
某大厂软件测试岗一面笔试题+二面问答题面试
iperf 学习笔记
关于Python中使用selenium八大定位方法
【软件测试】为什么提升不了?8年测试总结再
软件测试复习
PHP笔记-Smarty模板引擎的使用
C++Test使用入门
【Java】单元测试
Net core 3.x 获取客户端地址
上一篇文章      下一篇文章      查看所有文章
加:2021-10-17 12:16:56  更:2021-10-17 12:17:02 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/18 3:00:04-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码