ros2 bag 自定义消息(msg)读取

需求

最近从甲方拿到的数据是ros2 bag包,含自定义的消息类型,在安装了ros2之后并编译消息文件之后,查询到有一个rosbags库可以直接序列化数据,也就可以把数据以明码文件的形式(如txt)保存到电脑上了。

方法

rosbags库

这里没有使用ros2 bag play XXXX的方式进行数据读取。

因为,一方面播放bag的方式速度慢;

另一方面,ros话题订阅中的callback函数是创建新线程运行的,也就是可能同时运行,如果要保存的数据有先后之分,比如从0到1000为保存的图像命名,则需要使用index_img++;多线程之间会发生冲突、顺序颠倒的情况,导致部分图像命名一致,新保存的图像覆盖旧图像。当然这也可以使用加线程锁的的方法解决。

rosbags具有如下特点

  • 可以无缝实现bag和db3的互相转换
  • 提供了db3文件的读写接口,非常方便。
  • 转换速度很快,因为不需要按照bag播放速度读取。
  • 不用依赖ros2、ros1

关键是这个包还不用依赖ros2和ros1,在python3下可以直接使用
废话不多说,
其文档在https://ternaris.gitlab.io/rosbags/index.html
代码托管在gitlab上,https://gitlab.com/ternaris/rosbags
安装

pip install rosbags

参考:https://heroacool.blog.csdn.net/article/details/128834476

ROS2自带消息格式数据读取

from rosbags.rosbag2 import Reader
from rosbags.serde import deserialize_cdr
from cv_bridge import CvBridge
import cv2


bridge = CvBridge()
# create reader instance and open for reading
with Reader('/home/room401/changAn/object_mapping/data/rosbag_2023_03/log/log/rosbag2_2022_08_19-15_50_10') as reader:
    # topic and msgtype information is available on .connections list
    for connection in reader.connections:
        print(connection.topic, connection.msgtype)
    connections = [x for x in reader.connections if x.topic == '/camera/compressed']
    index_img = 0
    for connection, timestamp, rawdata in reader.messages(connections=connections):
        msg = deserialize_cdr(rawdata, connection.msgtype)
        # print(msg.header)
        # print(msg)
        cv_image = bridge.compressed_imgmsg_to_cv2(msg,"bgr8")
        # 显示图像
        cv2.imshow('image', cv_image)
        # cv2.imwrite(os.path.join(dir_to, str(index_img)+'.jpg'), cv_image)
        cv2.waitKey(10)
        index_img += 1
    print('共提取图像个数: %d' % index_img)

自定义消息数据读取

消息类型编译、定义,可以参考鱼香ROS的ROS2教学:动手学ROS2

在上述代码添加如下代码即可,并调用即可。参考:https://ternaris.gitlab.io/rosbags/examples/register_types.html

def regist_custom_msg():
    """将自定义的ROS2消息类型添加到types中,保证数据可以被解析"""
    """Example: Register types from msg files."""

    from pathlib import Path

    from rosbags.typesys import get_types_from_msg, register_types


    def guess_msgtype(path: Path) -> str:
        """Guess message type name from path."""
        name = path.relative_to(path.parents[2]).with_suffix('')
        if 'msg' not in name.parts:
            name = name.parent / 'msg' / name.name
        return str(name)


    add_types = {}

    for pathstr in [
        '/home/room401/ros2_ws/src/csmap_platform/vehicle_state/msg/VehicleState.msg',
        '/home/room401/ros2_ws/src/csmap_platform/ins570d/msg/Ins.msg',
    ]:
        msgpath = Path(pathstr)
        print(guess_msgtype(msgpath))
        msgdef = msgpath.read_text(encoding='utf-8')
        print(msgdef)
        add_types.update(get_types_from_msg(msgdef, guess_msgtype(msgpath)))

    register_types(add_types)

文章出处登录后可见!

已经登录?立即刷新

共计人评分,平均

到目前为止还没有投票!成为第一位评论此文章。

(0)
乘风的头像乘风管理团队
上一篇 2023年12月4日
下一篇 2023年12月4日

相关推荐