ROS系统底层原理及代码剖析(1)

  • ros中的序列化和反序列化
  • roscpp_core代码分析
  • 消息的订阅与发布
  • ros核心函数分析
  • 各个管理器的作用

1、目的

  本文介绍ROS机器人操作系统(Robot Operating System)的实现原理,从最底层分析ROS代码是如何实现的。

2、ros是什么

  ROS是一种开源的机器人操作系统,它提供了一个分布式的通信框架和一系列的工具和库,使得机器人软件开发变得更加容易和灵活。ROS的底层原理和代码可以分为以下几个方面进行剖析:

  2.1 节点通信原理
  ROS中的节点是指运行在不同进程或机器上的独立应用程序,节点之间的通信是通过ROS的master节点来实现的。当一个节点需要与其他节点通信时,它首先要向master节点注册自己,并查询其他节点的信息,比如名称、主题等。然后它可以通过发布器(publisher)和订阅器(subscriber)来发布和接收消息,消息会被master节点转发给订阅者的回调函数。

  2.2 ROS消息格式
  ROS的消息格式是基于ROS定义的消息格式,它是一种文本格式,用于在节点之间传输数据。每个消息类型都有一个由字符串表示的名称和一组由类型和名称组成的字段。ROS使用消息描述文件(.msg)来定义消息类型,这些文件通常以文本格式保存,并由ROS常规库提供的消息生成工具生成对应的源代码。

  2.3 ROS参数服务器
  ROS参数服务器是一个全局的键-值存储系统,用于节点之间的参数共享和修改。节点可以使用ROS API来读取和写入参数服务器中的参数,这些参数在ROS系统启动时从配置文件中加载,并可以在运行时动态修改。

  2.4 ROS启动过程
  ROS的启动过程主要包括master节点的启动,参数服务器的初始化,节点的注册和通信的建立。在ROS系统启动时,master节点会被启动,并监视节点的注册、卸载和通信请求。每个节点注册后,master会将它们加入到网络拓扑结构中,并将节点之间建立起通信。

  2.5 ROS客户端库
  ROS客户端库是一组用于构建和使用ROS节点和其它ROS工具的库,包括C++、Python等语言。ROS客户端库提供了一系列的API,包括用于节点注册和卸载、消息发布和订阅、参数读写和服务调用等功能。这些API封装了ROS系统中的底层细节,使得用户可以更加方便地使用ROS。

3、ros系统的核心组件

  3.1 节点(Node)
  ROS节点是ROS系统中最基本的运行单元,它可以是一个独立的进程,也可以是多个进程的组合。节点之间可以通过发布/订阅、服务调用、参数服务器等方式进行通信和交互。例如,在开发一个机器人控制系统时,可以将机器人控制板的驱动部分作为一个节点,将感应器部分作为另一个节点。控制板节点通过发布电机控制指令到主题中,感应器节点通过订阅主题获取机器人编码器数据,两个节点通过主题通信协作实现了机器人的控制。

  3.2 主节点(Master)
  主节点是ROS系统的中心管理机构,负责节点之间的协调和通信。主要维护了节点之间的配对关系和消息路由信息,同时也负责节点之间的服务调用和参数传递等功能。例如,在访问ROS系统之前,必须先启动主节点。所有的节点在启动时都会向主节点注册,主节点管理节点之间的通信,包括查找节点、创建主题、建立服务等。

  3.3 消息(Message)
  ROS消息是ROS节点之间进行数据交换的基本单元,它可以是各种数据类型的结构体,包括数字、字符串、图像、点云等。消息可以通过主题进行发布和订阅。例如,在ROS中,常用的消息格式有std_msgs/Int32、sensor_msgs/Image、geometry_msgs/Point等,这些消息类型可以通过定义消息格式来实现节点之间的数据传递。

  3.4 主题(Topic)
  主题是ROS节点之间消息传递的通道,它可以被认为是一个消息队列,其中包含了一个或多个发布者和订阅者。发布者负责将消息发送到主题,而订阅者则从主题中接收消息。主题的消息传递是异步的,并且是单向的,即发布者将消息发送到主题,而订阅者从主题中接收消息。例如,在机器人控制系统中,将电机控制指令发布到主题中,所有订阅该主题的节点都可以接收到这个消息。

  3.5 服务(Service)
  服务是ROS节点之间同步通信的一种方式,它可以被认为是一种RPC(远程过程调用)机制。服务请求者向服务提供者发送请求消息,服务提供者接收请求并发送响应消息给服务请求者。服务请求和响应是一对一的,请求者发送一个消息并等待回应。例如,在ROS中,可以创建一个名为“/add_two_ints”的服务,该服务接收两个整数,然后返回它们的和。

  3.6 参数服务器(Parameter Server)
  ROS参数服务器是ROS节点之间共享数据的一种机制,它可以存储节点的共享参数,并被所有节点访问和修改。参数服务器通常被用来存储配置参数、机器人模型参数等信息。例如,在机器人控制系统中,可以将机器人的PID控制器参数存储在参数服务器中,所有节点都可以访问这些参数并通过修改它们来修改控制器行为。

4、ros中的序列化和反序列化

  ROS使用序列化来将消息数据转换为字节流,并将其传输到网络或存储在文件中。这是因为ROS的分布式系统需要在不同的节点之间进行通信,并且节点可能需要使用不同的编程语言或计算机架构。序列化可以确保消息在传输或存储过程中的一致性,无论消息发送或接收方的编程语言或架构如何不同。此外,序列化还可以帮助ROS实现高效的消息传输和存储,因为它可以将消息数据压缩为较小的字节流,并且可以提高数据传输和存储的速度。
  ROS提供了两个消息序列化库,一个是ros::serialization,另一个是ROS Messages API。这里我们以ros::serialization为例,来详细解释ROS序列化的使用方式和功能。代码在这里链接地址。

4.1 序列化

  ros::serialization提供了一个类模板ros::serialization::Serializer,用于将ROS消息进行序列化。下面是一个将std_msgs::String消息序列化为字节数组的例子。

#include <ros/ros.h>
#include <std_msgs/String.h>
#include <ros/serialization.h>

int main(int argc, char** argv)
{
  // 初始化ROS节点
  ros::init(argc, argv, "serializer_example");

  // 创建std_msgs::String消息
  std_msgs::String msg;
  msg.data = "Hello ROS!";

  // 序列化std_msgs::String消息
  uint32_t serialized_size = ros::serialization::serializationLength(msg);
  boost::shared_array<uint8_t> buffer(new uint8_t[serialized_size]);
  ros::serialization::OStream stream(buffer.get(), serialized_size);
  ros::serialization::Serializer<std_msgs::String>::serialize(stream, msg);

  // 输出序列化后的字节数组
  for(int i=0; i<serialized_size; i++)
  {
    printf("%02x ", buffer[i]);
  }

  printf("\n");

  return 0;
}

  在上述代码中,我们首先创建了一个std_msgs::String消息,并将其赋值为”Hello ROS!”。然后,我们使用ros::serialization::OStream将消息序列化为字节数组,并使用ros::serialization::Serializer<std_msgs::String>::serialize将其写入字节数组中。最后,我们遍历字节数组并输出每个字节的十六进制表示。

4.2 反序列化

  反序列化是将二进制消息转换为ROS消息的过程。ros::serialization提供了一个类模板ros::serialization::Serializer,用于将字节数组进行反序列化。下面是一个将字节数组反序列化为std_msgs::String消息的例子。

#include <ros/ros.h>
#include <std_msgs/String.h>
#include <ros/serialization.h>

int main(int argc, char** argv)
{
  // 初始化ROS节点
  ros::init(argc, argv, "deserializer_example");

  // 定义字节数组
  uint8_t buffer[] = {0x15, 0x00, 0x00, 0x00, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x52, 0x4f, 0x53, 0x21};

  // 反序列化std_msgs::String消息
  std_msgs::String msg;
  ros::serialization::IStream stream(buffer, sizeof(buffer));
  ros::serialization::Serializer<std_msgs::String>::deserialize(stream, msg);

  // 输出反序列化后的消息
  ROS_INFO("Received message: %s", msg.data.c_str());

  return 0;
}

  在上述代码中,我们首先定义了一个字节数组,其包含了一个序列化后的std_msgs::String消息。然后,我们使用ros::serialization::IStream将字节数组反序列化为std_msgs::String消息,并使用ros::serialization::Serializer<std_msgs::String>::deserialize将其读取到msg中。最后,我们使用ROS_INFO输出反序列化后的消息。

  总的来说,ROS序列化是ROS系统中非常重要的一部分,它使得节点之间的数据传输变得高效且可靠。开发者可以通过ros::serialization来实现ROS消息的序列化和反序列化,这些工具可以帮助开发者轻松解决消息格式、字节序和对齐等问题。

5、roscpp_core代码分析

  分析的代码:在这里
  在该代码仓库中, roscpp_core 包被认为是一个过时的包,因为它已经被 ROS Kinetic 版本中的 roscpp 取代了。但是,对于一些历史性问题,roscpp_core 包仍然在某些项目中得到使用。(ROS诞生的时候是2007年,有些序列化库可能还不存在比如protobuf诞生于2008年)

  roscpp_core 包提供了 ROS 系统的 C++ 通信库的基本实现,包括消息的发布、订阅、服务等等。具体来说,在 roscpp_core 包中,实现了以下三个子包:
  1.roscpp_serialization:ROS 消息的序列化和反序列化实现。
  2.roscpp_traits:ROS 消息类型的元信息获取实现。
  3.test:roscpp_core 包的一些单元测试。
  接下来,本文将挑选出 roscpp_serialization 和 roscpp_traits 两个子包的核心代码片段进行讲解和说明。

5.1 roscpp_serialization 子包

  在 roscpp_serialization 子包中,最关键的组件就是 Serializer 类,这个类提供了 ROS 消息序列化和反序列化的底层实现。
  Serializer 类的定义可以在 roscpp_serialization/include/roscpp_serialization/serialization.h 文件中找到。下面是其定义的部分代码:

template<typename T, typename Allocator = std::allocator<void> >
class Serializer
{
public:
  /// Constructor
  Serializer();

  /// Destructor
  virtual ~Serializer() {}

  /// Fill a buffer with a serialized representation of a message
  virtual uint32_t serialize(void* buffer, const T& message);

  /// Calculate the serialized length of a message
  virtual uint32_t serializedLength(const T& message) const;

  /// Deserialize a message from a buffer
  virtual bool deserialize(void* buffer, T& message);
};

  该类包含了以下几个公共成员函数:

  Serializer():构造函数,创建 Serializer 类的实例。
  ~Serializer():析构函数。
  serialize(void* buffer, const T& message):将消息序列化为二进制数据,并将序列化后的数据写入给定的缓冲区 buffer 中。该函数返回序列化后的数据大小。
  serializedLength(const T& message):计算给定消息序列化后需要的字节长度。
  deserialize(void* buffer, T& message):从给定的二进制数据缓冲区 buffer 中反序列化消息,并将反序列化得到的消息写入到给定的 T(模板参数类型) 变量 message 中。函数返回一个布尔值,指示是否反序列化成功。

  可以看出,Serializer 类是非常关键的一个类。它负责将 ROS 消息序列化为二进制流,并将其反序列化为 ROS 消息。
  下面是一个简单的例子,用于演示如何使用 Serializer 类,将一个 std_msgs::String 消息序列化为二进制流,并输出序列化后的数据大小。

#include <ros/ros.h>
#include <std_msgs/String.h>
#include <roscpp_serialization/Serializer.h>

int main(int argc, char** argv)
{
    // 初始化 ROS 节点
    ros::init(argc, argv, "test_serialization");
    ros::NodeHandle nh;

    // 创建一个 std_msgs::String 消息,并给其赋值
    std_msgs::String msg;
    msg.data = "Hello, world!";

    // 创建一个 Serializer 对象,用于将消息序列化为二进制流
    roscpp_serialization::Serializer<std_msgs::String> serializer;
    uint32_t serialized_size = serializer.serializedLength(msg); // 获取序列化后的数据大小
    boost::shared_array<uint8_t> buffer(new uint8_t[serialized_size]); // 创建缓冲区
    serializer.serialize(buffer.get(), msg); // 将消息序列化到缓冲区

    // 输出序列化后的数据大小
    ROS_INFO_STREAM("Serialized message size: " << serialized_size);

    return 0;
}

  以上代码中,我们首先创建了一个 ROS 节点,并创建了一个 std_msgs::String 消息。然后,我们创建了一个 Serializer 对象,并使用 serializedLength() 函数计算了消息序列化后的字节长度。接着,我们创建了一个缓冲区,并使用 serialize() 函数将消息序列化到缓冲区中。最后,我们输出了序列化后的数据大小。

5.2 roscpp_traits 子包

  在 roscpp_traits 子包中,最关键的组件是 DataType 类,该类负责获取 ROS 消息类型的元信息,包括消息类型名称、消息类型哈希值、ROS 消息头结构等。
  下面是 DataType 类的定义,可以在 roscpp_traits/include/roscpp_traits/datatype.h 文件中找到。

template<typename T>
struct DataType
{
  static std::string value() { return "unknown_type"; }
  static const char* md5sum() { return "d41d8cd98f00b204e9800998ecf8427e"; }
  static const char* definition() { return "message data not yet set"; }

  /**
   * Returns a "struct _Headerless" that can be used
   * to serialize/deserialize messages.
   */
  static ros::SerializedMessage serializedHeaderlessMessageDefinition();

  /**
   * Returns whether or not a struct is empty. If it is, that means it has no
   * fields.
   */
  static bool isEmpty() { return false; }

  /**
   * Used to force registration at startup
   */
  static void* instantiate() { return NULL; }
};

  该类模板包含了以下几个公共成员函数:
  value():获取消息类型名称。
  md5sum():获取消息 MD5 哈希码。
  definition():获取消息结构定义。
  serializedHeaderlessMessageDefinition():获取一个 _Headerless 结构体,用于消息的序列化和反序列化。
  isEmpty():判断该类型是否为空。
  instantiate():用于在启动时强制实例化该类型。

  在 ROS 中,所有的消息类型都需要注册到到 ROS 系统中,以便于在后续的消息通信时能够进行类型匹配。在 roscpp_traits 子包中,使用了一个宏定义 ROSCPP_TRAITS_SPECIALIZE 来使用具体的消息类型来实例化 DataType 类。下面是一个例子:

  下面,我们以 std_msgs::String 消息类型为例子,来演示如何通过显式实例化来注册消息类型。

#include <ros/ros.h>
#include <std_msgs/String.h>
#include <roscpp_traits/datatype.h>
#include <roscpp_serialization/serializer.h>

ROSCPP_TRAITS_SPECIALIZE(std_msgs::String, "std_msgs/String")

int main(int argc, char** argv)
{
    // 初始化 ROS 节点
    ros::init(argc, argv, "test_datatype");
    ros::NodeHandle nh;

    // 使用 DataType<std_msgs::String>::value() 函数获取消息类型名称
    ROS_INFO_STREAM("Message type: " << DataType<std_msgs::String>::value());

    // 创建一个 std_msgs::String 消息,并给其赋值
    std_msgs::String msg;
    msg.data = "Hello, world!";

    // 使用 Serializer<std_msgs::String> 类将消息序列化为二进制流
    roscpp_serialization::Serializer<std_msgs::String> serializer;
    uint32_t serialized_size = serializer.serializedLength(msg); // 获取序列化后的数据大小
    boost::shared_array<uint8_t> buffer(new uint8_t[serialized_size]); // 创建缓冲区
    serializer.serialize(buffer.get(), msg); // 将消息序列化到缓冲区

    // 输出序列化后的数据大小
    ROS_INFO_STREAM("Serialized message size: " << serialized_size);

    return 0;
}

  以上代码中,我们使用 ROSCPP_TRAITS_SPECIALIZE 宏显式地注册了 std_msgs::String 消息类型到 ROS 系统中。接着,我们使用 DataType<std_msgs::String>::value() 函数获取消息类型名称,以及使用 Serializer<std_msgs::String> 类将消息序列化为二进制流,并输出序列化后的数据大小。可以看出,在使用 ROSCPP_TRAITS_SPECIALIZE 宏注册完消息类型后,我们就可以随意使用该消息类型的相关实现。

  总结起来,本文介绍了 roscpp_core 包中 roscpp_serialization 和 roscpp_traits 两个子包的核心代码片段以及用例。其中,Serializer 类和 DataType 类是两个非常关键的组件,分别负责将 ROS 消息序列化为二进制流和 获取消息类型的元信息。在实际使用中,这些组件很有用,他们为 ROS 系统提供了 C++ 中的消息序列化和反序列化的实现。

  在 ROS 中,消息的序列化和反序列化过程非常重要,因为它涉及到 ROS 节点之间的通信。ROS 通过将消息序列化为二进制流,在节点之间实现了跨平台和跨语言的消息传输。本文主要介绍了 ROS 中的序列化与反序列化的实现原理,并提供了使用 roscpp_serialization 和 roscpp_traits 两个子包进行消息序列化和反序列化的示例。在使用过程中需要注意,不同的数据类型需要在 roscpp_traits 中显式注册,以便 ROS 能够正确地进行编解码操作。
  需要注意的是,由于 roscpp_core 代码已经停止维护,推荐使用 ROS 的 C++ 客户端库 ros2cpp 这个仓库进行进一步开发和部署。该仓库提供了 C++ 中的 ROS 节点编程接口,并将消息的序列化和反序列化功能移植至了 ROS 2 中,能够更好地支持最新的 ROS 版本。

6、消息的订阅与发布

  分析的代码:在这里

6.1 ros要解决通信问题

  ROS解决通信问题是为了让机器人系统中各个组件可以相互通信,实现数据交换和共享资源。在机器人系统中,不同的组件或节点需要传递消息来共同完成任务,例如传感器节点、控制节点、执行节点等都需要相互通信来实现机器人的控制和操作。而且,机器人系统中的节点数量通常非常庞大,需要实现高效、可靠的通信才能保证机器人系统的正确运行。
  ROS通过提供基于消息传递的通信机制,让机器人系统中的各个组件之间可以方便、高效地传递消息和数据,让机器人系统更加容易地协同工作和实现复杂任务。同时,ROS提供了多种通信协议,以适应不同环境下的问题,让机器人开发者们可以根据实际需要灵活地选择适合的通信方式。

6.2 ros中一个节点的诞生过程

在ROS中,一个节点的诞生过程可以大致分为以下步骤:

6.2.1.初始化ROS系统:

  第一步是调用 ros::init() 函数,在ROS系统启动时完成。该函数会读取命令行参数,检查ROS参数和ROS Master的启动情况,确保ROS系统的正确运行。如果ROS系统已经被初始化过,则调用此函数无效。

  init() 函数是 node_handle.h 中的一个成员函数,用于初始化 ROS 节点。该函数的定义如下:

void init(const M_string& remappings, const std::string& name, uint32_t options);

其中:
  remappings:一个 map 类型的结构,其中包含节点名称重映射的信息。在初始化节点时,如果 remappings 中包含了节点名称的重映射信息,原本的节点名称将被修改为新的节点名称。这个参数可以为空。
  name:ROS 节点的名称。如果 name 参数为空,则 ROS 节点的名称将根据 ros::this_node::getName() 自动设置。
  options:一个整数型值,表示 ROS 节点的初始化选项。默认情况下是 0,不需要额外的选项。

  init() 函数是 ROS 节点初始化的第一步,在这里会做一些必要的初始化工作,例如向 ROS Master 注册节点,读取命令行参数、设置 ROS 参数等。
  在 init() 函数中,首先会检查 ROS 节点是否已经被初始化过,如果已经初始化过则函数会直接返回,否则会创建一个节点句柄(NodeHandle),并且读取remappings中定义的映射关系,同时注册节点名称到 ROS Master,并且初始化一些用于通信的底层实现。最后,将该节点句柄保存到节点管理器中,之后可以使用该句柄来订阅、发布主题,调用服务等。
  这个函数是 ROS 节点初始化的第一步,不论是在编写 ROS 节点程序还是在使用ROS系统时,都会用到该函数。在 ROS 的代码库和程序中,init() 函数经常被用到,是涉及 ROS 节点初始化的重要接口之一。初学者需要了解它的作用和使用方法,才能更好地理解和使用 ROS 程序。

6.2.2.创建ROS节点:

  调用 ros::NodeHandle nh 函数,可以创建 ROS 节点,并在ROS系统中注册它的身份。该代码行可以创建一个名为“节点名称”的ROS节点:

#include "ros/ros.h"
int main(int argc, char **argv)
{
  ros::init(argc, argv, "节点名称");
  ros::NodeHandle nh;
  
  //...
}

  在创建节点时,可以通过 ros::NodeHandle 构造函数中附加命名空间信息,从而在ROS系统中创建具有唯一命名的节点。这样一个ROS节点就被成功地创建出来了。

  NodeHandle::NodeHandle(const std::string& ns, const M_string& remappings) 该构造函数根据指定的名称空间和参数信息,创建一个新的 ROS 节点处理对象。以下是该构造函数的详细解释:

NodeHandle::NodeHandle(const std::string& ns, const M_string& remappings)
  : namespace_(this_node::getNamespace())
  , callback_queue_(0)
  , collection_(0)
{
  // 判断传入的ns是否以符号~开头
  std::string tilde_resolved_ns;
  if (!ns.empty() && ns[0] == '~') // starts with tilde
    tilde_resolved_ns = names::resolve(ns); // 如果是以~开头, 将其解析为全局名称空间下的名称
  else // 不以 ~ 开头
    tilde_resolved_ns = ns; // 直接赋值

  construct(tilde_resolved_ns, true); // 调用 construct 函数完成处理对象的创建

  initRemappings(remappings); // 处理 remappings
}

  构造函数的逻辑比较简单,主要分为三个步骤:1.解析节点的名称空间;2.调用 construct() 函数创建 ROS 节点处理对象;3.处理参数重新映射。
  在第一步中,该构造函数会判断传入的 ns 是否以符号 ~ 开头。如果以 ~ 开头,将其解析为全局名称空间下的名称。否则,直接使用传入的名称空间。解析全局名称空间下的名称可以使用 names::resolve() 函数来完成。
  在第二步中,该构造函数调用了一个名为 construct() 的辅助函数,用于创建 ROS 节点处理对象。在这里,构造函数将已解析的名称空间传递给 construct() 函数,以便在创建节点处理对象时使用。
  在第三步中,该构造函数调用了 initRemappings() 函数来处理参数重新映射。在这里,它将传入的参数重新映射表 remappings 作为参数传递给 initRemappings() 函数,以便对节点处理对象进行重新映射设置。

6.2.3.注册节点:

  在节点完成创建之后,需要将其注册到ROS Master上,以便其他节点可以找到并与之通信。使用 ros::spin() 函数或 ros::spinOnce() 函数来将节点注册到ROS Master上。

#include "ros/ros.h"
int main(int argc, char **argv)
{
  ros::init(argc, argv, "节点名称");
  ros::NodeHandle nh;
  
  ros::spin();
  
  return 0;
}

  在 ros::spin() 函数被调用时,节点开始等待从订阅的主题接收到的消息。在消息到达时,会调用相关的监听函数(回调函数)处理消息内容。

6.2.4.发布和订阅主题:

  节点需要订阅和发布主题以进行消息交换。创建它们的方法分别是 ros::Publisher 和 ros::Subscriber。创建它们后,就可以开始发布和订阅主题了。

#include "ros/ros.h"
#include "std_msgs/String.h"

void callback(const std_msgs::String::ConstPtr& msg)
{
  ROS_INFO("接收到消息: [%s]", msg->data.c_str());
}

int main(int argc, char **argv)
{
  ros::init(argc, argv, "节点名称");
  ros::NodeHandle nh;

  ros::Subscriber sub = nh.subscribe("主题名称", 1000, callback);
  ros::Publisher pub = nh.advertise<std_msgs::String>("主题名称", 1000);

  ros::spin();

  return 0;
}

  在上面的例子中,节点订阅一个名为“主题名称”的主题,并将回调函数 callback 附加到主题上。然后在ROS系统中广播一个具有相同主题名称的消息。

6.2.5.服务调用:

  节点可以通过在ROS系统中注册服务以接受请求,并使用 ros::ServiceClient 向其他节点发送请求并获取响应。

#include "ros/ros.h"
#include "demo_service/demo_service.h"

int main(int argc, char **argv)
{
  ros::init(argc, argv, "节点名称");
  ros::NodeHandle nh;

  ros::ServiceClient client = nh.serviceClient<demo_service::demo_service>("服务名称");

  demo_service::demo_service srv;
  srv.request.request_data = "请求数据";

  if (client.call(srv))
  {
    ROS_INFO("响应数据: %s", srv.response.response_data.c_str());
  }
  else
  {
    ROS_ERROR("服务调用失败");
  }
  
  return 0;
}
6.2.6.处理消息:

  在启动节点后,它会不断等待从订阅的主题上接收到的消息,然后执行相应的回调函数进行处理。

  以上就是ROS节点的诞生过程,节点可以创建、订阅和发布主题、调用服务以及处理消息。这些可以通过使用ROS提供的多种API来简化实现。需要注意的是,在节点中必须调用 ros::spin() 函数或 ros::spinOnce() 函数来启动节点和注册到ROS Master上,否则节点将无法接收到任何消息。

7、ros核心函数分析

7.1 construct()函数

  在创建ROS节点时,ros::NodeHandle 构造函数中会用到construct() 函数。
  该函数位置:在这里 152行

void NodeHandle::construct(const std::string& ns, bool validate_name)
{
  if (!ros::isInitialized())
  {
    ROS_FATAL("You must call ros::init() before creating the first NodeHandle");
    ROS_BREAK();
  }

  collection_ = new NodeHandleBackingCollection; // 创建一个 NodeHandleBackingCollection 对象
  unresolved_namespace_ = ns; // 保存未解析的名称空间

  if (validate_name) // 如果需要验证名称空间
    namespace_ = resolveName(ns, true); // 将名称空间解析成绝对路径,并验证
  else
    {
      namespace_ = resolveName(ns, true, no_validate()); // 将名称空间解析成绝对路径,但不验证
      // FIXME validate namespace_ now
    }
  ok_ = true;

  boost::mutex::scoped_lock lock(g_nh_refcount_mutex);

  // 如果节点处理对象个数为 0,且系统未启动,则启动 ROS 系统的核心功能
  if (g_nh_refcount == 0 && !ros::isStarted())
  {
    g_node_started_by_nh = true;
    ros::start();
  }

  ++g_nh_refcount; // 增加节点处理对象个数的计数器
}

  在 construct() 函数中,首先判断 ROS 系统是否已经被初始化。如果未初始化,会输出一条错误信息,并终止程序的运行。
  接着,函数创建了一个名为 collection_ 的 NodeHandleBackingCollection 对象,该对象用于管理节点处理对象的生命周期。函数还保存了未解析的名称空间。
  接下来,函数根据传入的 validate_name 参数,决定是否需要验证节点处理对象的名称空间。如果需要验证,函数调用 resolveName() 函数将名称空间解析为绝对路径,并进行验证。如果不需要验证,则同样调用 resolveName() 函数,但不对名称空间进行验证。
  在完成节点处理对象的名称空间设置后,函数将 ok_ 标志设置为 true。
  接着,函数使用互斥锁保护节点处理对象个数的计数器,并判断节点处理对象个数是否为 0,以及 ROS 系统是否已经启动。如果节点处理对象个数为 0,且 ROS 系统未启动,则启动 ROS 系统的核心功能。
  最后,该函数增加节点处理对象个数的计数器。

7.2 ros::start函数

  我们这里重点来看ros::start()的实现:
  该函数位置:在这里 293行

void start()
{
  boost::mutex::scoped_lock lock(g_start_mutex); // 使用互斥锁保证函数的线程安全性

  if (g_started) // 如果 ROS 系统已经启动,则返回
  {
    return;
  }

  g_shutdown_requested = false;
  g_shutting_down = false;

  // 标志 ROS 系统已经启动,并设置相关的标志
  g_started = true;
  g_ok = true;

  // 解析 ROSCPP_ENABLE_DEBUG 环境变量,并决定是否启用 ROS 的调试模式
  bool enable_debug = false;  
  std::string enable_debug_env;
  if ( get_environment_variable(enable_debug_env,"ROSCPP_ENABLE_DEBUG") )
  {
    try
    {
      enable_debug = boost::lexical_cast<bool>(enable_debug_env.c_str());
    }
    catch (boost::bad_lexical_cast&)
    {
    }
  }

  // 初始化 TCP 连接会话的 keepalive 选项
  param::param("/tcp_keepalive", TransportTCP::s_use_keepalive_, TransportTCP::s_use_keepalive_);

  // 注册一个回调函数,以便在不同的线程中检查 ROS 系统是否需要关闭
  PollManager::instance()->addPollThreadListener(checkForShutdown);

  // 绑定 ROS 的关闭命令到 XML-RPC 端口上
  XMLRPCManager::instance()->bind("shutdown", shutdownCallback);

  // 初始化 ROS 的定时器管理器
  initInternalTimerManager();

  // 启动 ROS 的各种管理器,包括话题管理器、服务管理器、连接管理器、轮询管理器和 XML-RPC 管理器
  TopicManager::instance()->start();
  ServiceManager::instance()->start();
  ConnectionManager::instance()->start();
  PollManager::instance()->start();
  XMLRPCManager::instance()->start();

  // 如果在初始化过程中没有设置 NoSigintHandler 选项,则安装一个 SIGINT 信号处理函数
  if (!(g_init_options & init_options::NoSigintHandler))
  {
    signal(SIGINT, basicSigintHandler);
  }

  // 初始化 ROS 时间
  ros::Time::init();

  // 如果没有设置 NoRosout 选项,则创建一个 ROSOutAppender 对象,将日志消息输出到 roscpp/Logger 这个话题上
  if (!(g_init_options & init_options::NoRosout))
  {
    g_rosout_appender = new ROSOutAppender;
    ros::console::register_appender(g_rosout_appender);
  }

  // 注册 get_loggers 和 set_logger_level 这两个服务
  {
    ros::AdvertiseServiceOptions ops;
    ops.init<roscpp::GetLoggers>(names::resolve("~get_loggers"), getLoggers);
    ops.callback_queue = getInternalCallbackQueue().get();
    ServiceManager::instance()->advertiseService(ops);
  }

  {
    ros::AdvertiseServiceOptions ops;
    ops.init<roscpp::SetLoggerLevel>(names::resolve("~set_logger_level"), setLoggerLevel);
    ops.callback_queue = getInternalCallbackQueue().get();
    ServiceManager::instance()->advertiseService(ops);
  }

  // 如果启用了调试模式,则注册一个关闭所有连接的服务
  if (enable_debug)
  {
    ros::AdvertiseServiceOptions ops;
    ops.init<roscpp::Empty>(names::resolve("~debug/close_all_connections"), closeAllConnections);
    ops.callback_queue = getInternalCallbackQueue().get();
    ServiceManager::instance()->advertiseService(ops);
  }

  // 如果使用仿真时间,则设置 ROS 时间的当前值,并订阅 /clock 话题
  {
    bool use_sim_time = false;
    param::param("/use_sim_time", use_sim_time, use_sim_time);

    if (use_sim_time)
    {
      Time::setNow(ros::Time());
    }

    if (use_sim_time)
    {
      ros::SubscribeOptions ops;
      ops.init<rosgraph_msgs::Clock>(names::resolve("/clock"), 1, clockCallback);
      ops.callback_queue = getInternalCallbackQueue().get();
      TopicManager::instance()->subscribe(ops);
    }
  }

  // 创建一个线程,运行内部回调队列的轮询线程函数
  g_internal_queue_thread = boost::thread(internalCallbackQueueThreadFunc);
  getGlobalCallbackQueue()->enable();

  // 输出 ROS 系统的启动信息,并记录日志
  ROSCPP_LOG_DEBUG("Started node [%s], pid [%d], bound on [%s], xmlrpc port [%d], tcpros port [%d], using [%s] time", 
		   this_node::getName().c_str(), getpid(), network::getHost().c_str(), 
		   XMLRPCManager::instance()->getServerPort(), ConnectionManager::instance()->getTCPPort(), 
		   Time::useSystemTime() ? "real" : "sim");

  // 如果在启动中断期间接收到关闭请求,则等待关闭完全完成后再返回
  if (g_shutting_down)
  {
    boost::recursive_mutex::scoped_lock lock(g_shutting_down_mutex);
  }
}

  在 ros::start() 函数中,首先使用互斥锁保证函数的线程安全性。如果 ROS 系统已经启动,则跳过不执行任何操作。
  接着,函数标志 ROS 系统正在启动中,并初始化一些相关的变量和选项。然后,函数解析 ROSCPP_ENABLE_DEBUG 环境变量,并决定是否启用 ROS 的调试模式。
  接下来,函数初始化 TCP 连接会话的 keepalive 选项,并注册一个回调函数,以便在不同的线程中检查 ROS 系统是否需要关闭。然后绑定 ROS 的关闭命令到 XML-RPC 端口上,并初始化 ROS 的定时器管理器。
  接下来,函数启动 ROS 的各种管理器,包括话题管理器、服务管理器、连接管理器、轮询管理器XML-RPC 管理器。如果在初始化过程中没有设置 NoSigintHandler 选项,则安装一个 SIGINT 信号处理函数。然后,函数初始化 ROS 时间,并创建一个 ROSOutAppender 对象,将日志消息输出到 roscpp/Logger 这个话题上。
  接着,函数注册 get_loggers 和 set_logger_level 这两个服务。如果启用了调试模式,则注册一个关闭所有连接的服务。如果使用仿真时间,则设置 ROS 时间的当前值,并订阅 /clock 话题。
  最后函数创建一个线程,运行内部回调队列的轮询线程函数,并输出 ROS 系统的启动信息,并记录日志。如果在启动中断期间接收到关闭请求,则等待关闭完全完成后再返回。

##########################分割线##########################(分析5种管理器的作用)

8、各个管理器的作用

下面我们分别来详细解释一下这5种管理器的作用,ros的各种管理器,包括话题管理器、服务管理器、连接管理器、轮询管理器和 XML-RPC 管理器

  // 启动 ROS 的各种管理器,包括话题管理器、服务管理器、连接管理器、轮询管理器和 XML-RPC 管理器
  TopicManager::instance()->start();
  ServiceManager::instance()->start();
  ConnectionManager::instance()->start();
  PollManager::instance()->start();
  XMLRPCManager::instance()->start();

8.1 话题管理器:

  话题管理器负责管理 ROS 中的话题(Topic),它维护了一个话题列表,以及每个话题监听器和发布者的列表。当一个话题有新的消息发布时,话题管理器会将该消息推送到所有已经注册的监听器。话题管理器可以通过ros::Subscriber和ros::PublisherAPI来添加和删除话题的监听器和发布者。

 TopicManager::instance()->start();
void TopicManager::start()
{
  boost::mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
  shutting_down_ = false;

  poll_manager_ = PollManager::instance();
  connection_manager_ = ConnectionManager::instance();
  xmlrpc_manager_ = XMLRPCManager::instance();

  xmlrpc_manager_->bind("publisherUpdate", boost::bind(&TopicManager::pubUpdateCallback, this, _1, _2));
  xmlrpc_manager_->bind("requestTopic", boost::bind(&TopicManager::requestTopicCallback, this, _1, _2));
  xmlrpc_manager_->bind("getBusStats", boost::bind(&TopicManager::getBusStatsCallback, this, _1, _2));
  xmlrpc_manager_->bind("getBusInfo", boost::bind(&TopicManager::getBusInfoCallback, this, _1, _2));
  xmlrpc_manager_->bind("getSubscriptions", boost::bind(&TopicManager::getSubscriptionsCallback, this, _1, _2));
  xmlrpc_manager_->bind("getPublications", boost::bind(&TopicManager::getPublicationsCallback, this, _1, _2));

  poll_manager_->addPollThreadListener(boost::bind(&TopicManager::processPublishQueues, this));
}

  该函数首先获取了 shutting_down_mutex_ 的锁,将 shutting_down_ 标志设置为 false,表示节点未被关闭,进入正常运行状态。
  接着,函数获取了 PollManager、ConnectionManager 和 XMLRPCManager 的单例实例,并将这些实例分别赋值给了 poll_manager_、connection_manager_ 和 xmlrpc_manager_ 成员变量。这些实例分别用于管理 RO 消息传输的轮询、连接和 RPC 功能。其中,PollManager 类维护着当前节点所有 Subscriber 和 Publisher 的注册信息,同时也负责轮询这些注册信息,通过回调函数来处理消息传递。ConnectionManager 负责节点间的 TCP/IP 数据传输。XMLRPCManager 则是 ROS 的 XML-RPC 服务器,用于与其他节点通信。
  接下来,函数使用 XMLRPCManager 的 bind() 函数将一些重要的 XML-RPC 回调函数注册到 ROS 的 XML-RPC 服务器上,这些回调函数包括:
   publisherUpdate:当 ROS 节点的发布者更新时,此回调将通知所有订阅关联话题的节点。
   requestTopic:通过此回调函数,订阅者可以查询话题相关的信息,如话题名称、消息类型和连接信息等。
   getBusStats:通过此回调函数,节点可以获取本地消息总线(bus)的统计信息。
   getBusInfo:通过此回调函数,节点可以获取本地消息总线(bus)的详细信息。
   getSubscriptions:此回调将返回该节点注册的所有订阅者。
   getPublications:此回调将返回该节点注册的所有发布者。
  最后,函数使用 PollManageraddPollThreadListener() 方法将 processPublishQueues() 函数添加到轮询线程监听器队列中。该函数将检查所有的 Publisher 发布队列,并将消息发布到所有订阅它们的节点。
  综上所述,TopicManager::start() 函数是 ROS 发布-订阅系统的关键初始函数,它通过向 ROS XML-RPC 服务器中注册必要的回调函数,并将 ROS 内核管理器与 XML-RPC 服务器连接,监视发布者和订阅者的连接,并将新的发布者和订阅者添加到内部缓存数据结构中。最终,它启动发布队列的轮询,以便处理任何新的消息。

8.2 服务管理器:

  服务管理器负责管理 ROS 中的服务(Service),它维护了一个服务列表,以及每个服务的监听器和客户端列表。当一个服务请求被发送时,服务管理器会将该请求转发给已经注册的服务监听器,并等待响应。服务管理器可以通过ros::ServiceServer和ros::ServiceClientAPI来添加和删除服务的监听器和客户端。

 ServiceManager::instance()->start();
void ServiceManager::start()
{
  // 将标志设置为 false,表示节点未被关闭,进入正常运行状态。
  shutting_down_ = false;

  // 获取 XMLRPCManager 和 ConnectionManager 的单例实例,用于管理节点之间的远程过程调用和连接。
  xmlrpc_manager_ = XMLRPCManager::instance();
  connection_manager_ = ConnectionManager::instance();

  // 为 XML-RPC 回调函数注册必要的回调函数
  xmlrpc_manager_->bind("registerService", boost::bind(&ServiceManager::registerServiceCallback, this, _1, _2));
  xmlrpc_manager_->bind("lookupService", boost::bind(&ServiceManager::lookupServiceCallback, this, _1, _2));
  xmlrpc_manager_->bind("unregisterService", boost::bind(&ServiceManager::unregisterServiceCallback, this, _1, _2));
}

  该函数首先将 shutting_down_ 标志设置为 false,表示节点未被关闭,进入正常运行状态。
  接着,函数获取了 XMLRPCManager 和 ConnectionManager 的单例实例,并将这些实例分别赋值给了 xmlrpc_manager_ 和 connection_manager_ 成员变量。这些实例用于管理节点之间的远程过程调用和连接。
  接下来,函数使用 XMLRPCManager 的 bind() 函数将一些重要的 XML-RPC 回调函数注册到 ROS 的 XML-RPC 服务器上,这些回调函数包括:
   registerService:当远程节点想要注册(即提供)一个新的服务时,将调用此回调函数。在 ServiceManager::registerServiceCallback() 函数中,将创建一个新的 ServiceServer 对象 (一个 TCP/IP 连接),并将其添加到内部存储中,以便以后可以在该节点上查找和使用该服务。新创建的 ServiceServer 对象会向 XML-RPC 服务器注册该节点提供的服务。
   lookupService:通过此回调函数,远程节点可以查询某个服务的 TCP/IP 地址和端口号,以便与该服务建立连接。在 ServiceManager::lookupServiceCallback() 函数中,将检查请求中的服务名称,查询该服务在哪个节点上注册,并返回该节点的 TCP/IP 地址和端口号。
   unregisterService:当 ROS 节点需要注销一个服务时,此回调函数将被调用,并将其从内部存储中移除。在 ServiceManager::unregisterServiceCallback() 函数中,将解析请求中的服务名称,并将该名称从内部存储中移除。此外,该函数还会从 XML-RPC 服务器中注销该服务。
  最后,函数返回,在此之后,ROS 客户端库的服务管理器已经准备好接收来自其他节点的远程服务调用请求。
  综上所述,ServiceManager::start() 函数是 ROS 客户端库中服务管理器的关键初始函数,它通过向 ROS XML-RPC 服务器中注册必要的回调函数,并将 ROS 内核管理器与 XML-RPC 服务器连接,启动服务端,监听服务请求。最终,它为节点之间的远程服务通信提供了必要的支持。
  

8.3 连接管理器:

  连接管理器负责管理 ROS 中的连接。它维护了一个连接列表,以及每个连接的状态和元数据。当用户通过ros::Publisher或ros::SubscriberAPI创建一个新的发布者或监听器时,连接管理器会为这个节点创建一个新的连接。当一个连接关闭时,连接管理器会从列表中删除这个连接,然后通知话题/服务管理器更新关联的监听器/服务的状态。

void ConnectionManager::start()
{
  // 将标志设置为 false,表示节点未被关闭,进入正常工作状态。
  shutting_down_ = false;

  // 获取一个互斥锁,以确保同时只有一个线程可以访问 ConnectionManager 的数据结构。
  lock_guard<mutex> lock(data_mutex_);

  // 遍历所有连接协议,启动每个协议,为其对应的通信信道创建和管理内部数据结构。
  for (const auto& protocol : protocols_) {
    protocol.second->start();
  }
}

  首先,函数将 shutting_down_ 标志设置为 false,以确保节点未被关闭,处于正常工作状态。
  接着,函数获取一个互斥锁 data_mutex_,以确保只有一个线程可以访问 ConnectionManager 内部的数据结构,避免多个线程之间的竞争和冲突。
  然后,函数使用一个循环来遍历 protocols_ 中包含的协议。每个协议都将被启动 (或者说是打开),并为其对应的连接通道创建和管理内部数据结构,例如 TCP 服务和主题、服务等通信信道。
  最后,函数返回,此时 ROS 客户端库的连接管理器已经准备好接收来自其他节点的连接请求,并在内部根据协议类型为这些连接创建和管理对应的数据结构。
  综上所述ConnectionManager::start() 函数是 ROS 客户端库中连接管理器的关键激活函数,它启动并管理 ROS 节点的通信信道,包括 TCP/IP 连接、主题通信、服务调用等通信方式。在这个函数内部,ROS 客户端库不仅启动用于通信的一些协议,还为它们创建和管理内部数据结构,以确保它们运作良好,从而提供高效和稳定的节点间通信。

8.4 轮询管理器:

  轮询管理器负责执行轮询操作,以检查是否有新的 ROS 消息到达,或者是否有新的 XML-RPC 请求需要处理。轮询管理器根据一定的时间间隔执行轮询操作,并且可以通过ros::spin()等API来配合使用。

 PollManager::instance()->start();
void PollManager::start()
{
  shutting_down_ = false;
  thread_ = boost::thread(&PollManager::threadFunc, this);
}

  首先设置 shutting_down_ 属性为 false,以确保节点未被关闭,处于正常工作状态。然后创建一个 boost::thread 对象 thread_,并将该对象的执行函数设置为 PollManager::threadFunc() 。

void PollManager::threadFunc()
{
  disableAllSignalsInThisThread();

  while (!shutting_down_)
  {
    {
      boost::recursive_mutex::scoped_lock lock(signal_mutex_);
      poll_signal_();
    }

    if (shutting_down_)
    {
      return;
    }

    poll_set_.update(100);
  }
}

  在该函数中,首先通过调用 disableAllSignalsInThisThread() 函数禁用了所有信号。接着,在一个 while 循环中运行,直到 shutting_down_ 标记为真(即节点已经被关闭)才停止执行。
  在循环迭代中,使用 poll() 系统调用等待新的套接字事件。如果有一个进程间信号到达,该函数使用 poll_signal_() 处理该信号,确保正在运行的 ROS 节点能够响应信号而不是被信号终止。如果没有信号到达,线程将暂停100毫秒,然后再次等待套接字事件。

8.5 XML-RPC 管理器:

  XML-RPC 管理器负责处理 ROS 节点之间的 XML-RPC 请求。当 ROS 节点需要进行某些操作,比如发布一个新的消息或者调用某个服务时,它们将会向 XML-RPC 管理器发送一个 XML-RPC 请求。XML-RPC 管理器会接收这些请求,并将它们转发给对应的节点执行,然后将响应返回给请求方。

 XMLRPCManager::instance()->start();
void XMLRPCManager::start()
{
  shutting_down_ = false;
  port_ = 0;
  
  // 注册 getPid 函数
  bind("getPid", getPid);

  // 绑定监听端口,并启动XML-RPC服务器线程
  bool bound = server_.bindAndListen(0);
  (void) bound;
  ROS_ASSERT(bound);
  port_ = server_.get_port();
  ROS_ASSERT(port_ != 0);

  // 获取服务器的IP地址和端口号
  std::stringstream ss;
  ss << "http://" << network::getHost() << ":" << port_ << "/";
  uri_ = ss.str();

  // 启动XML-RPC服务器线程
  server_thread_ = boost::thread(boost::bind(&XMLRPCManager::serverThreadFunc, this));
}

  该函数的主要功能如下:
  1.将 shutting_down_ 重置为 false,以确保 ROS 节点不会在此处关闭。
  2.将 port_ 重置为 0。
  3.绑定 getPid 函数,以便可以在节点关闭时获取 PID。
  4.调用 bindAndListen() 函数,将XML-RPC服务器绑定到一个随机端口。
  5.获取绑定的端口号并确保它不为 0。
  6.通过使用 network::getHost() 函数获得服务器的 IP 地址,并将它和端口号组合成一个字符串作为 URI。
  7.启动XML-RPC服务器线程,该线程通过调用 serverThreadFunc() 函数来监听端口,以便能够处理来自客户端的远程过程调用请求。
  需要注意的是,在主循环结束之前,必须保证 ROS 节点处于 OK 状态。并且,启动线程后,线程一直监听端口。如果在初始化监听套接字时出现错误,程序将退出。

##########################分割线##########################(结束分析管理器)

文章出处登录后可见!

已经登录?立即刷新

共计人评分,平均

到目前为止还没有投票!成为第一位评论此文章。

(0)
xiaoxingxing的头像xiaoxingxing管理团队
上一篇 2023年11月13日
下一篇 2023年11月13日

相关推荐