IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Python知识库 -> ros2 topic 源码解析 -> 正文阅读

[Python知识库]ros2 topic 源码解析

ros2 命令加载过程
ros2 topic代码位置:ros2\src\ros2cli\ros2topic
最终运作在ros2topic文件中
setup.py 定义挂载点,加载命令的执行函数

from setuptools import find_packages
from setuptools import setup
package_name = 'ros2topic'
setup(
    name=package_name,
    version='0.13.1',
    packages=find_packages(exclude=['test']),
    data_files=[
        ('share/' + package_name, ['package.xml']),
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
    ],
    install_requires=['ros2cli'],
    zip_safe=True,
    author='Dirk Thomas',
    author_email='dthomas@osrfoundation.org',
    maintainer='Claire Wang, Mabel Zhang',
    maintainer_email='clairewang@openrobotics.org, mabel@openrobotics.org',
    url='https://github.com/ros2/ros2cli/tree/master/ros2topic',
    download_url='https://github.com/ros2/ros2cli/releases',
    keywords=[],
    classifiers=[
        'Environment :: Console',
        'Intended Audience :: Developers',
        'License :: OSI Approved :: Apache Software License',
        'Programming Language :: Python',
    ],
    description='The topic command for ROS 2 command line tools.',
    long_description="""\
The package provides the topic command for the ROS 2 command line tools.""",
    license='Apache License, Version 2.0',
    tests_require=['pytest'],
    entry_points={
        'ros2cli.command': [
            'topic = ros2topic.command.topic:TopicCommand',
        ],
        'ros2cli.extension_point': [
            'ros2topic.verb = ros2topic.verb:VerbExtension',
        ],
        'ros2topic.verb': [
            'bw = ros2topic.verb.bw:BwVerb',
            'delay = ros2topic.verb.delay:DelayVerb',
            'echo = ros2topic.verb.echo:EchoVerb',
            'find = ros2topic.verb.find:FindVerb',
            'hz = ros2topic.verb.hz:HzVerb',
            'info = ros2topic.verb.info:InfoVerb',
            'list = ros2topic.verb.list:ListVerb',
            'pub = ros2topic.verb.pub:PubVerb',
            'type = ros2topic.verb.type:TypeVerb',
        ],
    }
)

例如: ros2 topic list .会运行ros2topic.verb.list:ListVerb
先运行下面的main函数

from ros2cli.command import add_subparsers_on_demand
from ros2cli.command import CommandExtension

class TopicCommand(CommandExtension):
    """Various topic related sub-commands."""
    def add_arguments(self, parser, cli_name):
        self._subparser = parser
        parser.add_argument(
            '--include-hidden-topics', action='store_true',
            help='Consider hidden topics as well')
        # add arguments and sub-commands of verbs
        add_subparsers_on_demand(
            parser, cli_name, '_verb', 'ros2topic.verb', required=False)

    def main(self, *, parser, args):
        if not hasattr(args, '_verb'):
            # in case no verb was passed
            self._subparser.print_help()
            return 0

        extension = getattr(args, '_verb')

        # call the verb's main method
        return extension.main(args=args)

下面执行 ros2\src\ros2cli\ros2topic\ros2topic\verb\list.py

from ros2cli.node.strategy import add_arguments as add_strategy_node_arguments
from ros2cli.node.strategy import NodeStrategy
from ros2topic.api import get_topic_names_and_types
from ros2topic.verb import VerbExtension


class ListVerb(VerbExtension):
    """Output a list of available topics."""
	 # topic 后面参数解析
    def add_arguments(self, parser, cli_name):
        add_strategy_node_arguments(parser)

        parser.add_argument(
            '-t', '--show-types', action='store_true',
            help='Additionally show the topic type')
        parser.add_argument(
            '-c', '--count-topics', action='store_true',
            help='Only display the number of topics discovered')
        # duplicate the following argument from the command for visibility
        parser.add_argument(
            '--include-hidden-topics', action='store_true',
            help='Consider hidden topics as well')
        parser.add_argument(
            '-v', '--verbose', action='store_true',
            help='List full details about each topic')

    @staticmethod
    def show_topic_info(topic_info, is_publisher):
        message = ('Published' if is_publisher else 'Subscribed') + ' topics:\n'
        for (topic_name, topic_types, pub_count, sub_count) in topic_info:
            count = pub_count if is_publisher else sub_count
            if count:
                topic_types_formatted = ', '.join(topic_types)
                count_str = str(count) + ' ' + ('publisher' if is_publisher else 'subscriber') \
                    + ('s' if count > 1 else '')
                message += f' * {topic_name} [{topic_types_formatted}] {count_str}\n'
        return message

    def main(self, *, args):
        topic_info = []
        # NodeStrategy包括两个实现DirectNode和DaemonNode,DaemonNode在localhost 11511+domain 端口上,DirectNode
        #下面代码走DaemonNode分支 
        with NodeStrategy(args) as node:
            topic_names_and_types = get_topic_names_and_types(
                node=node,
                include_hidden_topics=args.include_hidden_topics)
            for (topic_name, topic_types) in topic_names_and_types:
                if args.verbose:
                    pub_count = node.count_publishers(topic_name)
                    sub_count = node.count_subscribers(topic_name)
                    topic_info.append((topic_name, topic_types, pub_count, sub_count))
                else:
                    topic_info.append((topic_name, topic_types, 0, 0))
			 #输出发现主题个数, -c
        if args.count_topics:  
            print(len(topic_names_and_types))
        elif topic_names_and_types:
        		# 列出有关每个主题的完整详细信息 -v
            if args.verbose:
                print(self.show_topic_info(topic_info, is_publisher=True))
                print(self.show_topic_info(topic_info, is_publisher=False))
            else:
            	# ros2 topic list 默认输出
                for (topic_name, topic_types, _, _) in topic_info:
                    msg = '{topic_name}'
                    topic_types_formatted = ', '.join(topic_types)
                    if args.show_types:
                        msg += ' [{topic_types_formatted}]'
                    print(msg.format_map(locals()))

列如
ros2 topic list -c
ros2 topic list -v
ros2 topic list -t
ros2 topic list --include-hidden-topics

  Python知识库 最新文章
Python中String模块
【Python】 14-CVS文件操作
python的panda库读写文件
使用Nordic的nrf52840实现蓝牙DFU过程
【Python学习记录】numpy数组用法整理
Python学习笔记
python字符串和列表
python如何从txt文件中解析出有效的数据
Python编程从入门到实践自学/3.1-3.2
python变量
上一篇文章      下一篇文章      查看所有文章
加:2022-04-04 12:06:36  更:2022-04-04 12:11:06 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/12 2:50:08-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码