1.打开项目,在 terminal 中输入 python tool.py -s app_cmd_vel_response -p app_cmd_vel。注:-s 表示订阅 app_cmd_vel_response 话题,-p 表示发布 app_cmd_vel 话题
2.在app_cmd_vel_controller.py中初始化手柄
import global_values
import threading
import pygame
import json
import time
from Dxr_log.log import *
class dxr_joystick(object):
    """Joystick tele-operation state: speed limits, current command, worker threads."""

    def __init__(self):
        """Set every attribute to its idle default; no joystick device is opened here."""
        # Speed scaling: the usable limit is max * now_times / change_times,
        # so the limits start at 1/16 of the configured maxima.
        self.change_times = 16
        self.now_times = 1
        self.max_v = 2.8
        self.max_w = 1.0
        self.now_max_v = self.max_v * self.now_times / self.change_times
        self.now_max_w = self.max_w * self.now_times / self.change_times

        # Current linear / angular velocity command.
        self.v = 0
        self.w = 0

        # Status flags and the last message dict that was built.
        self.enable_old = False
        self.disconnect_protect_flag = False
        self.joystick_is_alive = False
        self.jsonObj = {}

        # Unstarted Thread objects: is_alive() is False on them until they are
        # replaced by real worker threads.
        self.run_thread = threading.Thread()
        self.disconnect_check_thread = threading.Thread()
        self.follow_control_thread = threading.Thread()

        # Holds the pygame.joystick module until joystick_initial() swaps in
        # an actual Joystick object.
        self.joystick = pygame.joystick
3.在global_values.py中添加初始值
# Configured linear-speed ceiling; dxr_joystick.run() copies it into self.max_v.
max_v = 2.8
# Configured angular-speed ceiling; dxr_joystick.run() copies it into self.max_w.
max_w = 1.0
# Follow state last reported by navigation: None until a reply arrives,
# then True/False (written by get_navigation_info_response_callback).
nav_follow_state = None
4.在app_cmd_vel_controller.py中赋值手柄数据
def run(self):
    """Reset the controller state and start joystick polling in a new daemon thread.

    The speed ceilings are re-read from ``global_values`` on every call, and
    the speed step is put back to 1.
    """
    self.max_v, self.max_w = float(global_values.max_v), float(global_values.max_w)

    # Back to the lowest non-zero speed step.
    self.now_times = 1
    self.now_max_v = self.max_v * self.now_times / self.change_times
    self.now_max_w = self.max_w * self.now_times / self.change_times

    # No motion commanded and no joystick confirmed yet.
    self.v = 0
    self.w = 0
    self.disconnect_protect_flag = False
    self.joystick_is_alive = False

    worker = threading.Thread(target=self.joystick_initial, daemon=True)
    self.run_thread = worker
    worker.start()
5.在app_cmd_vel_controller.py中读取手柄数据
def joystick_initial(self):
    """Poll joystick 0 in an endless loop and turn its input into velocity commands.

    Intended to run as the target of ``run_thread``. Per loop iteration
    (about every 0.1 s):

    * pending pygame events are drained; button-down events adjust the speed
      step (buttons 3 / 0), start the disconnect-protect watcher (buttons
      6 / 1) or start the follow-toggle worker (button 5);
    * axis 1 is mapped to the linear velocity and axis 2 to the angular one;
    * the command is published only while button 4 is held; releasing it
      publishes a single stop command.

    Any exception (including the deliberate "connect lost") stops the robot,
    shuts pygame down, waits 5 s and calls ``run()`` to start over.
    """

    def normalize(raw):
        # Snap stick noise around the centre to 0 and near-full deflection
        # to exactly +/-1; everything in between passes through unchanged.
        if -0.05 < raw < 0.05:
            return 0
        if raw > 0.99:
            return 1
        if raw < -0.99:
            return -1
        return raw

    try:
        pygame.init()
        self.joystick = pygame.joystick.Joystick(0)
        events_seen = 0
        while True:
            for event_ in pygame.event.get():
                # The joystick only counts as alive after more than ten
                # events have been received from the queue.
                if not self.joystick_is_alive:
                    events_seen += 1
                    if events_seen > 10:
                        self.joystick_is_alive = True
                        print_info("Joystick active success")
                if event_.type == pygame.JOYDEVICEADDED:
                    print_info("Joystick create success")
                if event_.type == pygame.JOYBUTTONDOWN:
                    button_id = event_.button
                    button = self.joystick.get_button(button_id)
                    print_info("button " + str(button_id) + ":" + str(button))
                    if button_id == 3:
                        # Raise the speed step.
                        self.start_add()
                    elif button_id == 0:
                        # Lower the speed step.
                        self.start_sub()
                    elif button_id in (6, 1) and self.joystick_is_alive:
                        # Start the disconnect-protect watcher unless one
                        # is already running.
                        if not self.disconnect_check_thread.is_alive():
                            self.disconnect_check_thread = threading.Thread(target=self.disconnect_protect)
                            self.disconnect_check_thread.daemon = True
                            self.disconnect_check_thread.start()
                    elif button_id == 5:
                        # Start the follow-toggle worker unless one is
                        # already running.
                        if not self.follow_control_thread.is_alive():
                            self.follow_control_thread = threading.Thread(target=self.follow_control_thread_poc)
                            self.follow_control_thread.daemon = True
                            self.follow_control_thread.start()

            value1 = normalize(self.joystick.get_axis(1))  # linear input
            value2 = normalize(self.joystick.get_axis(2))  # angular input
            self.v = -self.now_max_v * value1
            self.w = self.now_max_w * value2
            # Near top speed while turning: cut the linear speed by up to
            # 30 %, proportional to the turn input.
            if (abs(self.v) > (0.9 * self.max_v)) and abs(value2) > 0.1:
                self.v = -round(self.now_max_v * (value1 * (1 - 0.3 * abs(value2))), 4)

            if (self.joystick.get_button(4) == 1) and self.joystick_is_alive:
                self.enable_old = True
                self.move_method()
            elif self.enable_old:
                # Button 4 was just released: send one stop command.
                self.enable_old = False
                self.stop_method()

            # No joystick left, or disconnect protection requested: bail out
            # into the recovery path below.
            if (pygame.joystick.get_count() <= 0) or self.disconnect_protect_flag:
                raise Exception("connect lost")
            time.sleep(0.1)
    except Exception as ex:
        print_error("Joystick initial error:", ex)
        self.stop_method()
        pygame.joystick.quit()
        pygame.quit()
        time.sleep(5)
        # run() spawns a fresh polling thread; this one then ends.
        self.run()
6.在global_values.py中定义发布器
# Shared publishers, one per outgoing topic.
app_cmd_vel_publisher = Dxr_Publisher('/app_cmd_vel')  # velocity commands
get_navigation_info_publisher = Dxr_Publisher('/get_navigation_info')  # follow-state queries
ctrl_follow_publisher = Dxr_Publisher('/ctrl_follow')  # follow on/off commands
7.在app_cmd_vel_controller.py中调用global_values中的发布器
# Module-level aliases for the shared publishers defined in global_values.
# Used by stop_method() / move_method() to send velocity commands.
publisher = global_values.app_cmd_vel_publisher
# Used by follow_state_check_method() to query the follow state.
get_navigation_info_publisher = global_values.get_navigation_info_publisher
# Used by follow_control_method() to switch following on or off.
ctrl_follow_publisher = global_values.ctrl_follow_publisher
8.app_cmd_vel_controller.py中,增加角速度线速度
def start_add(self):
    """Raise the speed step by one (capped at ``change_times``) and refresh the limits.

    Errors are logged rather than raised, as in the original.
    """
    try:
        self.now_times = min(self.now_times + 1, self.change_times)
        self.now_max_v = self.max_v * self.now_times / self.change_times
        self.now_max_w = self.max_w * self.now_times / self.change_times
    except Exception as ex:
        print_error(ex)
9.减少角速度线速度
def start_sub(self):
    """Lower the speed step by one (floored at 0) and refresh the limits.

    At step 0 both limits become 0. Errors are logged rather than raised,
    as in the original.
    """
    try:
        self.now_times = max(self.now_times - 1, 0)
        self.now_max_v = self.max_v * self.now_times / self.change_times
        self.now_max_w = self.max_w * self.now_times / self.change_times
    except Exception as ex:
        print_error(ex)
10.失联保护
def disconnect_protect(self):
    """Set ``disconnect_protect_flag`` once buttons 6 and 1 stay pressed together.

    After an initial 0.5 s wait the two buttons are sampled every 0.1 s.
    More than 20 consecutive samples with both pressed set the flag and leave
    through the exception path, so the "Out" message is skipped and the
    exception is logged instead. Releasing either button earlier just exits.
    """
    try:
        print_info("Enter disconnect_protect")
        time.sleep(0.5)
        held_ticks = 0
        while True:
            both_held = self.joystick.get_button(6) == 1 and self.joystick.get_button(1) == 1
            if not both_held:
                break
            time.sleep(0.1)
            held_ticks += 1
            if held_ticks > 20:
                self.disconnect_protect_flag = True
                raise Exception("disconnect_protect start")
        print_info("Out disconnect_protect")
    except Exception as ex:
        print_error(ex)
11.发送指令,停止运行
def stop_method(self):
    """Send a zero-velocity command and clear ``enable_old``.

    The message is serialised from a local dict before being stored in
    ``self.jsonObj``: other worker threads also write that attribute, so
    dumping it after assignment could send another thread's message.
    """
    self.enable_old = False
    message = {
        "priority": 0,
        "msg": {
            "v": 0,
            "w": 0,
        },
    }
    payload = json.dumps(message)
    self.jsonObj = message
    if __name__ != '__main__':
        publisher.publish(payload)
    else:
        # Run as a script: just log what would have been published.
        print_info(payload)
12.发送当前角速度线速度
def move_method(self):
    """Send the current ``self.v`` / ``self.w`` as a velocity command.

    The message is serialised from a local dict before being stored in
    ``self.jsonObj``: other worker threads also write that attribute, so
    dumping it after assignment could send another thread's message.
    """
    message = {
        "priority": 0,
        "msg": {
            "v": self.v,
            "w": self.w,
        },
    }
    payload = json.dumps(message)
    self.jsonObj = message
    if __name__ != '__main__':
        publisher.publish(payload)
    else:
        # Run as a script: just log what would have been published.
        print_info(payload)
13.跟随状态检测
def follow_state_check_method(self):
    """Send an empty JSON object on the navigation-info topic to request the follow state.

    The payload is serialised once from a local dict, so the logged text and
    the published text are identical even if another thread overwrites
    ``self.jsonObj`` in between.
    """
    message = {}
    payload = json.dumps(message)
    self.jsonObj = message
    print_info("joystick send to nav:" + payload)
    get_navigation_info_publisher.publish(payload)
14.发送开启或关闭跟随指令
def follow_control_method(self, state):
    """Send a follow command: action 1 when ``state`` is truthy, otherwise action 0.

    The payload is serialised once from a local dict, so the logged text and
    the published text are identical even if another thread overwrites
    ``self.jsonObj`` in between.
    """
    message = {
        "msg": {
            "action": 1 if state else 0,
        },
    }
    payload = json.dumps(message)
    self.jsonObj = message
    print_info("joystick send to nav:" + payload)
    ctrl_follow_publisher.publish(payload)
15.跟随线程函数
def follow_control_thread_poc(self):
    """Toggle follow mode: ask navigation for the current state, then send the opposite.

    Clears ``global_values.nav_follow_state``, sends the query, and polls the
    value every 0.1 s (about 5 s in total) until it is no longer None. If no
    value shows up in time, nothing is sent. The method always sleeps a
    further 0.5 s before returning.
    """
    try:
        global_values.nav_follow_state = None
        self.follow_state_check_method()
        retries_left = 50
        while True:
            if global_values.nav_follow_state is not None:
                # Send the inverse of the state that was reported.
                self.follow_control_method(not global_values.nav_follow_state)
                break
            time.sleep(0.1)
            retries_left -= 1
            if retries_left < 0:
                break
        time.sleep(0.5)
    except Exception as ex:
        # The original bound the exception as `e` but logged `ex`, which
        # raised NameError inside the handler.
        print_error(ex)
16.获取跟随状态回复数据并进行处理
def get_navigation_info_response_callback(data):
    """Store the follow state carried by a navigation-info reply.

    ``data`` is decoded as UTF-8 bytes and parsed as JSON. The original
    parsed the text twice, which implies a JSON string wrapping the real
    document (presumably double-encoded by the sender; confirm). A singly
    encoded object is accepted too instead of crashing in the second parse.

    Sets ``global_values.nav_follow_state`` to True when ``msg.result`` is 1
    and to False otherwise.
    """
    print(data)
    text = str(data, encoding="utf-8")
    reply = json.loads(text)
    if isinstance(reply, str):
        reply = json.loads(reply)
    global_values.nav_follow_state = reply['msg']['result'] == 1


# Route replies on the response topic to the callback above.
global_values.setCallback('/get_navigation_info_response', get_navigation_info_response_callback)
17.查看话题:登录服务器后输入 mosquitto_sub -t '/topic/#',将显示 joystick2_pub: /app_cmd_vel /get_navigation_info /ctrl_follow 以及 joystick2_sub: /get_navigation_info_response
18.补充内容见《Python Mqtt 二次封装与模块化开发》
|