项目中需要同时使用ros、python3、gRPC。 使用记录如下: ros与python3、gRPC共同使用的方法 先编译工程
source source /opt/ros/kinetic/setup.bash
catkin_make
后续需要使用rosrun等指令时,无论出于python2.7还是python3环境,只需要
source devel/setup.bash
在python3环境中第一次尝试运行publisher
cd ros_python3_example/scripts python talker.py
import rospy
from std_msgs.msg import String
def talker():
pub = rospy.Publisher('chatter', String, queue_size=10)
rospy.init_node('talker', anonymous=True)
rate = rospy.Rate(10)
while not rospy.is_shutdown():
hello_str = "hello world %s" % rospy.get_time()
rospy.loginfo(hello_str)
pub.publish(hello_str)
rate.sleep()
if __name__ == '__main__':
print(rospy.__file__)
try:
talker()
except rospy.ROSInterruptException:
pass
报错
ModuleNotFoundError: No module named ‘rospkg’ python3环境下安装rospkg
pip install rospkg
再次运行python talker.py,即可正常发送消息 接着打开新terminal,source devel/setup.py,运行python listener.py,也可收到消息。
import rospy
from std_msgs.msg import String
def callback(data):
rospy.loginfo(rospy.get_caller_id() + "I heard %s", data.data)
def listener():
rospy.init_node('listener', anonymous=True)
rospy.Subscriber("chatter", String, callback)
rospy.spin()
if __name__ == '__main__':
listener()
此时即证明了ros可以和python3一起使用
在修改listener.py,在其中加入一个gRPC的服务,再重新启动listener.py
import rospy
from std_msgs.msg import String
from concurrent import futures
import time
import grpc
import helloworld_pb2
import helloworld_pb2_grpc
def callback(data):
rospy.loginfo(rospy.get_caller_id() + "I heard %s", data.data)
def listener():
rospy.init_node('listener', anonymous=True)
rospy.Subscriber("chatter", String, callback)
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
server.add_insecure_port('[::]:50051')
server.start()
rospy.spin()
server.stop(0)
class Greeter(helloworld_pb2_grpc.GreeterServicer):
def SayHello(self, request, context):
return helloworld_pb2.HelloReply(message = 'hello {msg}'.format(msg = request.name))
def SayHelloAgain(self, request, context):
return helloworld_pb2.HelloReply(message='hello {msg}'.format(msg = request.name))
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
server.add_insecure_port('[::]:50051')
server.start()
try:
while True:
time.sleep(60*60*24)
except KeyboardInterrupt:
server.stop(0)
if __name__ == '__main__':
listener()
其中helloworld.proto
syntax = "proto3";
service Greeter {
rpc SayHello(HelloRequest) returns (HelloReply) {}
rpc SayHelloAgain(HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
处理helloworld.proto文件
python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. helloworld.proto
再启动listener.py
python listener.py
打开两个terminal,分别运行python talker.py和python client.py均可获取正常的结果。 client.py内容
import grpc
import helloworld_pb2
import helloworld_pb2_grpc
def run():
channel = grpc.insecure_channel('localhost:50051')
stub = helloworld_pb2_grpc.GreeterStub(channel)
response = stub.SayHello(helloworld_pb2.HelloRequest(name='czl'))
print("Greeter client received: " + response.message)
response = stub.SayHelloAgain(helloworld_pb2.HelloRequest(name='daydaygo'))
print("Greeter client received: " + response.message)
if __name__ == '__main__':
run()
以上即证明了,ros可以和python3、gRPC同时使用
|