ros2 命令加载过程。ros2 topic 的代码位置:ros2\src\ros2cli\ros2topic,命令最终在 ros2topic 目录中运行。其中 setup.py 通过 entry_points 定义挂载点,用于加载各命令的执行函数:
from setuptools import find_packages
from setuptools import setup
# Distribution name; reused below to build the install paths.
package_name = 'ros2topic'

# Non-Python files to install, as (target directory, [source files]) pairs.
package_data_files = [
    ('share/' + package_name, ['package.xml']),
    ('share/ament_index/resource_index/packages', ['resource/' + package_name]),
]

# Trove classifiers published with the package metadata.
package_classifiers = [
    'Environment :: Console',
    'Intended Audience :: Developers',
    'License :: OSI Approved :: Apache Software License',
    'Programming Language :: Python',
]

# Entry points ("mount points"): each group maps a name to 'module:object'.
package_entry_points = {
    # Registers `topic` as a command under the 'ros2cli.command' group.
    'ros2cli.command': [
        'topic = ros2topic.command.topic:TopicCommand',
    ],
    # Declares the 'ros2topic.verb' group together with its base class.
    'ros2cli.extension_point': [
        'ros2topic.verb = ros2topic.verb:VerbExtension',
    ],
    # One entry per verb, e.g. `ros2 topic list` -> ListVerb.
    'ros2topic.verb': [
        'bw = ros2topic.verb.bw:BwVerb',
        'delay = ros2topic.verb.delay:DelayVerb',
        'echo = ros2topic.verb.echo:EchoVerb',
        'find = ros2topic.verb.find:FindVerb',
        'hz = ros2topic.verb.hz:HzVerb',
        'info = ros2topic.verb.info:InfoVerb',
        'list = ros2topic.verb.list:ListVerb',
        'pub = ros2topic.verb.pub:PubVerb',
        'type = ros2topic.verb.type:TypeVerb',
    ],
}

setup(
    name=package_name,
    version='0.13.1',
    description='The topic command for ROS 2 command line tools.',
    long_description=(
        'The package provides the topic command '
        'for the ROS 2 command line tools.'
    ),
    license='Apache License, Version 2.0',
    author='Dirk Thomas',
    author_email='dthomas@osrfoundation.org',
    maintainer='Claire Wang, Mabel Zhang',
    maintainer_email='clairewang@openrobotics.org, mabel@openrobotics.org',
    url='https://github.com/ros2/ros2cli/tree/master/ros2topic',
    download_url='https://github.com/ros2/ros2cli/releases',
    keywords=[],
    classifiers=package_classifiers,
    packages=find_packages(exclude=['test']),
    data_files=package_data_files,
    install_requires=['ros2cli'],
    tests_require=['pytest'],
    zip_safe=True,
    entry_points=package_entry_points,
)
例如:执行 ros2 topic list 时,最终会运行 ros2topic.verb.list:ListVerb;在此之前,会先运行下面 TopicCommand 的 main 函数:
from ros2cli.command import add_subparsers_on_demand
from ros2cli.command import CommandExtension
class TopicCommand(CommandExtension):
    """Various topic related sub-commands."""

    def add_arguments(self, parser, cli_name):
        """
        Add the command-level option and hook up the verb sub-parsers.

        :param parser: argument parser of the ``topic`` command.
        :param cli_name: name of the command line tool, forwarded unchanged
            to ``add_subparsers_on_demand``.
        """
        # Kept so main() can print this command's help when no verb is given.
        self._subparser = parser
        parser.add_argument(
            '--include-hidden-topics',
            help='Consider hidden topics as well',
            action='store_true')
        # Verbs come from the 'ros2topic.verb' group; the chosen one is
        # expected on the parsed namespace as `_verb` (see main()).
        add_subparsers_on_demand(
            parser, cli_name, '_verb', 'ros2topic.verb', required=False)

    def main(self, *, parser, args):
        """
        Dispatch to the selected verb, or print the help if there is none.

        :param parser: unused here; the parser stored by add_arguments() is
            the one used for printing the help.
        :param args: parsed command line arguments.
        :returns: the verb's own return value, or 0 after printing the help.
        """
        if hasattr(args, '_verb'):
            return args._verb.main(args=args)

        # No verb on the command line: show usage instead of failing.
        self._subparser.print_help()
        return 0
接下来执行 ros2\src\ros2cli\ros2topic\ros2topic\verb\list.py:
from ros2cli.node.strategy import add_arguments as add_strategy_node_arguments
from ros2cli.node.strategy import NodeStrategy
from ros2topic.api import get_topic_names_and_types
from ros2topic.verb import VerbExtension
class ListVerb(VerbExtension):
    """Output a list of available topics."""

    def add_arguments(self, parser, cli_name):
        """
        Register the node-strategy arguments and the flags of this verb.

        :param parser: argument parser of this verb; options are added to it.
        :param cli_name: name of the command line tool (not used here).
        """
        add_strategy_node_arguments(parser)
        # Every verb-specific option is a boolean switch, so they share one
        # registration call; the table keeps the original declaration order.
        switches = (
            (('-t', '--show-types'), 'Additionally show the topic type'),
            (('-c', '--count-topics'), 'Only display the number of topics discovered'),
            (('--include-hidden-topics',), 'Consider hidden topics as well'),
            (('-v', '--verbose'), 'List full details about each topic'),
        )
        for option_strings, description in switches:
            parser.add_argument(*option_strings, action='store_true', help=description)

    @staticmethod
    def show_topic_info(topic_info, is_publisher):
        """
        Format the verbose listing for either publishers or subscribers.

        :param topic_info: iterable of
            ``(topic_name, topic_types, pub_count, sub_count)`` tuples.
        :param is_publisher: selects the publisher counts and wording when
            true, the subscriber ones otherwise.
        :returns: a heading line followed by one line per topic whose
            selected count is non-zero; every line ends with a newline.
        """
        if is_publisher:
            heading, role = 'Published', 'publisher'
        else:
            heading, role = 'Subscribed', 'subscriber'

        entries = []
        for topic_name, topic_types, pub_count, sub_count in topic_info:
            endpoint_count = pub_count if is_publisher else sub_count
            if not endpoint_count:
                # Topics without any endpoint of this kind are left out.
                continue
            type_list = ', '.join(topic_types)
            plural = 's' if endpoint_count > 1 else ''
            entries.append(
                f' * {topic_name} [{type_list}] {endpoint_count} {role}{plural}\n')
        return heading + ' topics:\n' + ''.join(entries)

    def main(self, *, args):
        """
        Query the topics and print them as requested by the flags.

        ``--count-topics`` takes precedence and prints only the number of
        topics. Otherwise ``--verbose`` prints the published and subscribed
        summaries, and the default mode prints one topic name per line,
        followed by its types when ``--show-types`` is set. Nothing is
        printed when no topic was found (other than the count).

        :param args: parsed command line arguments.
        """
        with NodeStrategy(args) as node:
            topic_names_and_types = get_topic_names_and_types(
                node=node,
                include_hidden_topics=args.include_hidden_topics)
            if args.verbose:
                # Endpoint counts are only queried in verbose mode.
                topic_info = [
                    (name, types, node.count_publishers(name), node.count_subscribers(name))
                    for name, types in topic_names_and_types
                ]
            else:
                topic_info = [(name, types, 0, 0) for name, types in topic_names_and_types]

        if args.count_topics:
            print(len(topic_names_and_types))
            return
        if not topic_names_and_types:
            return
        if args.verbose:
            print(self.show_topic_info(topic_info, is_publisher=True))
            print(self.show_topic_info(topic_info, is_publisher=False))
            return

        for name, types, _, _ in topic_info:
            type_list = ', '.join(types)
            print(f'{name} [{type_list}]' if args.show_types else f'{name}')
例如:ros2 topic list -c、ros2 topic list -v、ros2 topic list -t、ros2 topic list --include-hidden-topics
|