服务是ROS2一种通信方式,具有以下特点: 1 客户端,服务器模型:不同于话题单向通信,服务可以进行双向通信,客户端发起请求,服务器端进行应答 2 同步通信机制:通信双端同步通信,如果一方无法收到会发送另一端 3 服务端唯一,客户端不唯一:一对多通信 4 .srv文件定义请求和应答数据结构:服务通信消息类型为.srv
示例1 加法计算器 客户端
import sys
import rclpy
from rclpy.node import Node
from learning_interface.src import AddTwoInts
class adderClient(Node):
def __init__(self, name):
super().__init__(name)
self.client = self.create_client(AddTwoInts, 'add_two_ints')
while not self.client.wait_for_service(timeout_sec = 1.0):
self.get_logger().info('service not available, waiting again...')
self.request = AddTwoInts.Request()
def send_request(self):
self.request.a = int(sys.argv[1])
self.request.b = int(sys.argv[2])
self.future = self.client.call_async(self.request)
def main(args=None)
rclpy.init(args=args)
node = adderClient("service_adder_client")
node.send_request()
while rclpy.ok():
rclpy.spin_once(node)
if node.future.done():
try:
response = node.future.result()
except Exception as e:
node.get_logger().info('Service call failed %r' % (e,))
else:
node,get_logger().info('Result of add_two_ints: for %d + %d = %d' % (node.request.a, node.request.b, response.sum))
break
node.destroy_node()
rclpy.shutdown()
说明: 1 from learning_interface.src import AddTwoInts 引入服务接口类型AddTwoInts
2 self.client = self.create_client(AddTwoInts, ‘add_two_ints’) 创建客户端对象,参数:接口类型,服务名称
3 while not self.client.wait_for_service(timeout_sec = 1.0): self.get_logger().info(‘service not available, waiting again…’) 使用客户端对象wait_for_service(timeout_sec)方法检测是否可以连接服务器端,如果无法连接在终端打出service not available, waiting again…
4 self.request = AddTwoInts.Request() 将接口实例化,为之后发送数据准备
5 self.request.a = int(sys.argv[1]) self.request.b = int(sys.argv[2] 在终端读取要发送的数据a和b
6 self.future = self.client.call_async(self.request) 使用异步通信发送请求,异步通信意味着发出请求后程序可以继续其他任务,不需要等待回复
7 rclpy.spin_once(node) 定期查看是否收到回复
8 node,get_logger().info(‘Result of add_two_ints: for %d + %d = %d’ % (node.request.a, node.request.b, response.sum)) 如果收到回复打出计算结果
服务器端:
import rclpy
from rclpy.node import Node
from learning_interface.srv import AddTwoInts
class adderServer(Node):
def __init__(self, name):
super().__init__(name)
self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.adder_callback)
def adder_callback(self, request, response):
response.sum = request.a + request.b
self.get_logger().info('Incoming request \n a: %d b: %d' % (request.a, request.b))
return response
def main(args=None):
rclpy.init(args=args)
node = adderServer("service_adder_server")
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
1 self.srv = self.create_service(AddTwoInts, ‘add_two_ints’, self.adder_callback) 创建服务器端对象。参数:接口类型,名称,回调函数
2 return response return后会自动将结果发送回客户端
完成程序后别忘了setup文件里声明入口:
entry_points={
'console_scripts': [
'service_adder_client = learning_service.service_adder_client:main',
'service_adder_server = learning_service.service_adder_server:main'
]
}
客户端程序设计流程: 1 创建客户端对象 2 创建并发送请求数据 3 等待服务端应答数据
服务器端程序设计流程: 1 创建服务器端对象 2 通过回调函数进行服务 3 向客户端反馈应答结果
示例2:图像识别定位程序
服务器端程序主体类似于话题实例里的图片处理程序,程序讲解可见https://blog.csdn.net/Raine_Yang/article/details/125349724?spm=1001.2014.3001.5501
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
import numpy as np
from cv_bridge import CvBridge
import cv2
from learning_interface.srv import GetObjectPosition
lower_red = np.array([0, 90, 128])
upper_red = np.array([180, 255, 255])
class ImageSubscriber(Node):
def __init__(self, name):
super().__init__(name)
sub.sub = self.create_subscription(Image, 'image_raw', self.listener_callback, 10)
self.cv_bridge = CvBridge()
sub,srv = self.create_service(GetObjectPosition, 'get_target_position',self.object_position_callback)
self.objectX = 0
self.objectY = 0
def object_detect(self, image):
hsv_img = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
mask_red = cv2.inRange(hsv_img, lower_red, upper_red)
contours, hierarchy = cv2.findContours(mask_red, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE)
for cnt in contours:
if cnt.shape[0] < 150:
continue
(x, y, w, h) = cv2.boundingRect(cnt)
cv2.drawContours(image, [cnt], -1, (0, 255, 0), 2)
cv2.circle(image, (int(x + w / 2), int(y + h / 2), 5, (0, 255, 0), -1)
self.objectX = (int)(x + w /2)
self.objectY = (int)(y + h / 2)
cv2.imshow("object", image)
cv2.waitKey(10)
def listener_callback(self, data):
self.get_logger().info('Receiving video frame')
image = self.cv_bridge.imgmsg_to_cv2(data, 'bgr8')
self.object_detect(image)
def object_position_callback(self, request, response):
if request.get == True
response.x = self.objectX
response.y = self.objectY
self,get_logger().info('Object position\nx: %d y: %d' % (response.x, response.y))
else:
response.x = 0
response.y = 0
self.get_logger().info('Invalid command')
return response
def main(args=None):
rclpy.init(args=args)
rclpy.ImageSubscriber("service_object_server")
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
1 from learning_interface.srv import GetObjectPosition 引入服务接口
2 sub,srv = self.create_service(GetObjectPosition, ‘get_target_position’,self.object_position_callback) 创建服务器端对象,参数:服务接口,服务名称,回调函数
3 self.objectX = (int)(x + w /2) self.objectY = (int)(y + h / 2) 保存图像中心点坐标为返回值
4 def object_position_callback(self, request, response): 创建服务回调函数
5 if request.get == True response.x = self.objectX response.y = self.objectY self,get_logger().info(‘Object position\nx: %d y: %d’ % (response.x, response.y)) 填充response并在日志打印物体位置
6 return response 将数据返回给客户端
7 rclpy.ImageSubscriber(“service_object_server”) 创建节点,名称service_object_server
客户端:
import rclpy
from rclpy.node import Node
from learning_interface.srv import GetObjectPosition
class ObjectClient(Node):
def __init__(self, name):
super().__init__(name)
self.client = self.create_client(GetObjectPosition, 'get_target_position')
while not self.client.wait_for_service(timeout_sec = 1.0):
self.get_logger().info('service not available, waiting again...')
self.request = GetObjectPosition.Request()
def send_request(self):
self.request.get = True
self.future = self.client.call_async(self.request)
def main(args=None):
rclpy.init(args=args)
node = objectClient("service_object_client")
node.self_request()
while rclpy.ok():
rclpy.spin_once(node)
if node.future.done():
try:
response = node.future.result()
except Exception as e:
node.get_logger().info('Service call failed %r' % (e,))
else:
node.get_logger().info('Result of the object position:\n x: %d y: %d' % (response.x, response.y))
break;
node.destroy_node()
rclpy.shutdown()
1 self.client = self.create_client(GetObjectPosition, ‘get_target_position’) 创建客户端对象,参数:服务接口,服务名称(和服务器端对应)
2 self.request = GetObjectPosition.Request() 实例化服务接口,准备填充数据
3 self.future = self.client.call_async(self.request) 异步通信发送服务请求
4 node = objectClient(“service_object_client”) 创建客户端节点,名称service_object_client
5 response = node.future.result() 查询是否得到回应
6 node.get_logger().info(‘Result of the object position:\n x: %d y: %d’ % (response.x, response.y)) 如得到回应,在终端打出目标x,y坐标
最后别忘了setup
entry_points={
'console_scripts': [
'service_object_client = learning_service.service_object_client:main',
'service_object_server = learning_service.service_object_server:main',
],
},
监测服务 显示服务列表
1 ros2 service list
显示服务接口类型
2 ros2 service type (服务名称)
在命令行发送服务请求:
ros2 service call (服务名称) (接口名称)“参数”
|