Channel
相关链接
代码文件:
参考示例:
协议
协议用于确定通信各端的消息格式。一般来说,协议都是使用一种与具体的编程语言无关的 IDL ( Interface description language )描述,然后由某种工具转换为各个语言的代码。此处简要介绍一下 AimRT 各方支持的两种 IDL 如何转换为 Cpp 代码,进阶的使用方式请参考对应 IDL 的官方文档。
Protobuf
Protobuf是一种由 Google 开发的、用于序列化结构化数据的轻量级、高效的数据交换格式,是一种广泛使用的 IDL。
在使用时,开发者需要先定义一个.proto
文件,在其中定义一个消息结构。例如example.proto
:
syntax = "proto3";
message ExampleMsg {
string msg = 1;
int32 num = 2;
}
然后使用 Protobuf 官方提供的 protoc 工具进行转换,生成 C++ 代码,例如:
protoc --cpp_out=. example.proto
这将生成example.pb.h
和example.pb.cc
文件,包含了根据定义的消息类型生成的 C++ 类和方法。
请注意,以上这套原生的代码生成方式只是为了给开发者展示底层的原理,实际使用时还需要手动处理依赖和 CMake 封装等方面的问题,比较繁琐。AimRT 对这个过程进行了一定的封装,开发者可以直接使用ProtobufGenCode.cmake文件中提供的两个 CMake 方法:
add_protobuf_gencode_target_for_proto_path
:为某个路径下的.proto
文件生成 C++ 代码,参数如下:TARGET_NAME:生成的 CMake Target 名称;
PROTO_PATH:协议存放目录;
GENCODE_PATH:生成的桩代码存放路径;
DEP_PROTO_TARGETS:依赖的 Proto CMake Target;
OPTIONS:传递给 protoc 的其他参数;
add_protobuf_gencode_target_for_one_proto_file
:为单个.proto
文件生成 C++ 代码,参数如下:TARGET_NAME:生成的 CMake Target 名称;
PROTO_FILE:单个协议文件的路径;
GENCODE_PATH:生成的桩代码存放路径;
DEP_PROTO_TARGETS:依赖的 Proto CMake Target;
OPTIONS:传递给 protoc 的其他参数;
使用示例如下:
# Generate C++ code for all '.proto' files in the current folder
add_protobuf_gencode_target_for_proto_path(
TARGET_NAME example_pb_gencode
PROTO_PATH ${CMAKE_CURRENT_SOURCE_DIR}
GENCODE_PATH ${CMAKE_CURRENT_BINARY_DIR})
之后只要链接example_pb_gencode
这个 CMake Target 即可使用该协议。例如:
target_link_libraries(my_lib PUBLIC example_pb_gencode)
ROS2 Message
ROS2 Message 是一种用于在 ROS2 中进行通信和数据交换的结构化数据格式。在使用时,开发者需要先定义一个 ROS2 Package,在其中定义一个.msg
文件,比如example.msg
:
int32 num
float32 num2
char data
然后直接通过 ROS2 提供的 CMake 方法rosidl_generate_interfaces
,为消息生成 C++ 代码和 CMake Target,例如:
rosidl_generate_interfaces(
example_msg_gencode
"msg/example.msg"
)
在这之后就可以引用相关的 CMake Target 来使用生成的 C++ 代码。详情请参考 ROS2 的官方文档和 AimRT 提供的 Example。
ChannelHandle
AimRT 中,模块可以通过调用CoreRef
句柄的GetChannelHandle()
接口,获取aimrt::channel::ChannelHandleRef
句柄,来使用 Channel 功能。其提供的核心接口如下:
namespace aimrt::channel {
class ChannelHandleRef {
public:
PublisherRef GetPublisher(std::string_view topic) const;
SubscriberRef GetSubscriber(std::string_view topic) const;
void MergeSubscribeContextToPublishContext(
const ContextRef subscribe_ctx_ref, ContextRef publish_ctx_ref) const;
};
} // namespace aimrt::channel
开发者可以调用ChannelHandleRef
中的GetPublisher
方法和GetSubscriber
方法,获取指定 Topic 名称的PublisherRef
和SubscriberRef
类型句柄,分别用于 Channel 发布和订阅。这两个方法使用注意如下:
这两个接口是线程安全的。
这两个接口可以在
Initialize
阶段和Start
阶段使用。
PublisherRef
和SubscriberRef
句柄提供了一个与具体协议类型无关的 Api 接口,但除非开发者想要使用自定义的消息类型,才需要直接调用它们提供的接口。
AimRT 官方支持了两种协议类型:Protobuf 和 Ros2 Message,并提供了这两种协议类型的 Channel 接口封装。这两套 Channel 接口除了协议类型不同,整体的 Api 风格都一致,开发者一般直接使用这两套与协议类型绑定的 Channel 接口即可。使用时需要分别引用对应的 CMake Target:
Protobuf Channel:需 CMake 引用
aimrt::interface::aimrt_module_protobuf_interface
;Ros2 Channel:需 CMake 引用
aimrt::interface::aimrt_module_ros2_interface
;
开发者还可以使用MergeSubscribeContextToPublishContext
方法,来将 subscribe 端的 context 信息传递到 publish 端的 context 中,可以用于打通整条数据链路。详情请参考 Context 章节的说明。
Publish
AimRT 提供了函数风格和Proxy风格两种风格的接口来发布一个消息:
函数风格接口:
namespace aimrt::channel {
template <typename MsgType>
bool RegisterPublishType(PublisherRef publisher);
template <typename MsgType>
void Publish(PublisherRef publisher, aimrt::channel::ContextRef ctx_ref, const MsgType& msg);
template <typename MsgType>
void Publish(PublisherRef publisher, const MsgType& msg);
} // namespace aimrt::channel
Proxy 类风格接口:
namespace aimrt::channel {
template <typename MsgType>
class PublisherProxy {
public:
explicit PublisherProxy(PublisherRef publisher);
// Context
std::shared_ptr<Context> NewContextSharedPtr(ContextRef ctx_ref = ContextRef()) const;
void SetDefaultContextSharedPtr(const std::shared_ptr<Context>& ctx_ptr);
std::shared_ptr<Context> GetDefaultContextSharedPtr() const;
// Register type
static bool RegisterPublishType(PublisherRef publisher);
bool RegisterPublishType() const;
// Publish
void Publish(ContextRef ctx_ref, const MsgType& msg) const;
void Publish(const MsgType& msg) const;
};
} // namespace aimrt::channel
Proxy 类型接口可以绑定类型信息和一个默认 Context,功能更齐全一些。但两种风格接口的基本使用效果是一致的,用户需要两个步骤来实现逻辑层面的消息发布:
Step 1:使用
RegisterPublishType
方法注册消息类型:只能在
Initialize
阶段注册;不允许在一个
PublisherRef
中重复注册同一种类型;如果注册失败,会返回 false;
Step 2:使用
Publish
方法发布数据:只能在
Start
阶段之后发布数据;有两种
Publish
接口,其中一种多一个 Context 参数,用于向后端、下游传递一些额外信息,Context 的详细说明见后续章节;在调用
Publish
接口时,开发者应保证传入的 Context 和 Msg 在Publish
接口返回之前都不会发生变化,否则行为是未定义的;
用户Publish
一个消息后,特定的 Channel 后端将处理具体的消息发布请求。此时根据不同后端的实现,有可能会阻塞一段时间,因此Publish
方法耗费的时间是未定义的。但一般来说,Channel 后端都不会阻塞Publish
方法太久,详细信息请参考对应后端的文档。
Subscribe
与发布接口一样,AimRT 提供了函数风格和Proxy风格两种风格类型的接口来订阅一个消息,同时还提供了智能指针形式和协程形式两种回调函数:
函数风格接口:
// Callback accept a CTX and a smart pointer as parameters
template <MsgType>
bool Subscribe(
SubscriberRef subscriber,
std::function<void(ContextRef, const std::shared_ptr<const MsgType>&)>&& callback);
// Callback accept a pointer as a parameter
template <MsgType>
bool Subscribe(
SubscriberRef subscriber,
std::function<void(const std::shared_ptr<const MsgType>&)>&& callback);
// Coroutine callback, accept a CTX and a const reference to message as parameters
template <MsgType>
bool SubscribeCo(
SubscriberRef subscriber,
std::function<co::Task<void>(ContextRef, const MsgType&)>&& callback);
// Coroutine callback, accept a const reference to message as a parameter
template <MsgType>
bool SubscribeCo(
SubscriberRef subscriber,
std::function<co::Task<void>(const MsgType&)>&& callback);
Proxy 类风格接口:
namespace aimrt::channel {
template <typename MsgType>
class SubscriberProxy {
public:
explicit SubscriberProxy(SubscriberRef subscriber);
// Callback accept a CTX and a smart pointer as parameters
bool Subscribe(
std::function<void(ContextRef, const std::shared_ptr<const MsgType>&)>&& callback) const;
// Callback accept a pointer as a parameter
bool Subscribe(
std::function<void(const std::shared_ptr<const MsgType>&)>&& callback) const;
// Coroutine callback, accept a CTX and a const reference to message as parameters
bool SubscribeCo(
std::function<co::Task<void>(ContextRef, const MsgType&)>&& callback) const;
// Coroutine callback, accept a const reference to message as a parameter
bool SubscribeCo(std::function<co::Task<void>(const MsgType&)>&& callback) const;
};
} // namespace aimrt::channel
Proxy 类型接口可以绑定类型信息,功能更齐全一些。但两种风格接口的基本使用效果是一致的,使用 Subscribe 接口时需要注意:
只能在
Initialize
阶段调用订阅接口;不允许在一个
SubscriberRef
中重复订阅同一种类型;如果订阅失败,会返回 false;
可以传入两种回调函数,其中一种多一个 Context 参数,用于向传递一些额外信息,Context 的详细说明见后续章节;
Context 和 Msg 的生命周期:
对于接收智能指针形式 Msg 的回调,Context 和 Msg 的生命周期将持续到 Msg 的智能指针引用计数归零析构时;
对于协程形式的回调,Context 和 Msg 的生命周期将持续到协程退出为止;
此外还需要注意的是,由哪个执行器来执行订阅的回调,这和具体的 Channel 后端实现有关,在运行阶段通过配置才能确定,使用者在编写逻辑代码时不应有任何假设,详细信息请参考对应后端的文档。
最佳实践是:如果回调中的任务非常轻量,比如只是设置一个变量,那就可以直接在回调里处理;但如果回调中的任务比较重,那最好调度到其他专门执行任务的执行器里进行处理。
Context
开发者在发布 Channel 消息时,可以传入一个aimrt::channel::Context
,在订阅 Channel 消息时,也可以选择在回调中接收一个aimrt::channel::ContextRef
。ContextRef
类型是Context
类型的引用,两者包含的接口基本一致,它们最主要的功能是携带一些 Key-Val 数据,用于向下游或 Channel 后端传递特定的信息。
其接口如下所示:
namespace aimrt::channel {
class Context {
public:
bool CheckUsed() const;
void SetUsed();
void Reset();
aimrt_channel_context_type_t GetType() const;
std::string_view GetMetaValue(std::string_view key) const;
void SetMetaValue(std::string_view key, std::string_view val);
std::vector<std::string_view> GetMetaKeys() const;
std::string ToString() const;
};
class ContextRef {
public:
ContextRef(const Context& ctx);
ContextRef(const Context* ctx_ptr);
ContextRef(const std::shared_ptr<Context>& ctx_ptr);
explicit ContextRef(const aimrt_channel_context_base_t* base_ptr);
bool CheckUsed() const;
void SetUsed();
void Reset();
aimrt_channel_context_type_t GetType() const;
std::string_view GetMetaValue(std::string_view key) const;
void SetMetaValue(std::string_view key, std::string_view val);
std::vector<std::string_view> GetMetaKeys() const;
std::string ToString() const;
};
} // namespace aimrt::channel
使用Context
或ContextRef
类型的 Channel ctx 时需要注意:
Channel ctx 分为 Publish 端和 Subscribe 端两种类型,在构造时确定,无法修改,分别用于 Publish 和 Subscribe 场景;
可以使用
SetMetaValue
、GetMetaValue
方法来设置、获取 ctx 中的 Key-Val 值,使用GetMetaKeys
来获取当前所有的 Key 值;
AimRT 在channel_context_base.h文件中定义了一些特殊的 Key,业务使用这些特殊 Key 时应遵循一定的规则,这些特殊的 Key 包括:
AIMRT_CHANNEL_CONTEXT_KEY_SERIALIZATION_TYPE:用于设置消息的序列化类型,必须是注册时 type support 中支持的类型;
AIMRT_CHANNEL_CONTEXT_KEY_BACKEND:用于给 Subscribe 端传递实际处理的后端名称;
在 Publish 端,Context
主要是用于在调用Publish
方法时传入一些特殊的信息给 AimRT 框架和 Channel 后端,其使用时需要注意以下几点:
开发者可以直接构造一个
Context
类型实例,并自行负责其生命周期;只能给
Publish
方法传入 Publish 类型的 ctx;每个
Context
只能用于一次 Publish 过程,在传递给Publish
方法后,状态即会被置为Used
,如果未经Reset
就用于下一次 Publish,消息将不会被正确发布;Publish
方法实际接受的是ContextRef
类型作为参数,但Context
类型可以隐式的转换为ContextRef
类型;开发者可以向 ctx 中设置一些信息传递给具体的 Channel 后端,不同的后端对于 ctx 中的信息会有不同的处理方式,有的会读取其中特定的 Key-Val 值来特化传输行为,有的会将所有 Key-Val 信息透传到下游,具体的处理方式请参考特定 Channel 后端的文档。
在 Subscribe 端,开发者可以选择在回调处理函数中接收ContextRef
类型的参数,其使用时需要注意以下几点:
传递给回调处理函数的 ctx 生命周期由 AimRT 框架管理,与 Msg 的生命周期一致;
传递给回调处理函数的 ctx 是 Subscribe 类型的,并且是
Used
状态;传递给回调处理函数的 ctx 中可能会有一些 Key-Val 信息,具体会传递哪些信息则由 Channel 后端决定,请参考特定 Channel 后端的文档。
此外,一般来说在一个复杂业务系统中,一些订阅者会在收到消息后发布新的消息到更下游,会存在很多条逻辑层面上的长链路。如果要在框架层面打通这条逻辑上的链路,来实现一些监控、调度上的功能,就需要将 Subscribe 类型的 ctx 中的特定信息同步到 Publish 类型的 ctx 中,有两种方式:
可以使用
PublisherRef
或ChannelHandleRef
类型提供的MergeSubscribeContextToPublishContext
方法,例如:
aimrt::channel::PublisherRef publisher;
// Subscribe callback
void EventHandle(ContextRef subscribe_ctx, const std::shared_ptr<const FooMsg>& msg) {
BarMsg new_msg;
Context publishe_ctx;
publisher.MergeSubscribeContextToPublishContext(subscribe_ctx, publishe_ctx);
aimrt::channel::Publish(publisher, publishe_ctx, new_msg);
}
可以使用
aimrt::channel::PublisherProxy
的NewContextSharedPtr
方法,将 Subscribe 类型的 ctx 作为参数传递给该方法,例如:
aimrt::channel::PublisherProxy<BarMsg> publisher_proxy;
// Subscribe callback
void EventHandle(ContextRef subscribe_ctx, const std::shared_ptr<const FooMsg>& msg) {
BarMsg new_msg;
auto publishe_ctx = publisher_proxy.NewContextSharedPtr(subscribe_ctx);
publisher_proxy.Publish(publishe_ctx, new_msg);
}
使用示例
以下是一个简单的发布消息的示例,基于 proxy 风格接口:
#include "event.pb.h"
class NormalPublisherModule : public aimrt::ModuleBase {
public:
bool Initialize(aimrt::CoreRef core) override {
core_ = core;
// Register publish type
std::string topic_name = "test_topic";
publisher_ = core_.GetChannelHandle().GetPublisher(topic_name);
aimrt::channel::RegisterPublishType<ExampleEventMsg>(publisher_);
return true;
}
bool Start() override {
// create publish proxy
aimrt::channel::PublisherProxy<ExampleEventMsg> publisher_proxy(publisher_);
// set msg
ExampleEventMsg msg;
msg.set_msg("hello world");
// publish msg
publisher_proxy.Publish(msg);
}
// ...
private:
aimrt::CoreRef core_;
aimrt::channel::PublisherRef publisher_;
};
以下是一个简单的订阅消息的示例:
#include "event.pb.h"
class NormalSubscriberModule : public aimrt::ModuleBase {
public:
bool Initialize(aimrt::CoreRef core) override {
// Subscribe
std::string topic_name = "test_topic";
auto subscriber = core_.GetChannelHandle().GetSubscriber(topic_name);
aimrt::channel::Subscribe<ExampleEventMsg>(
subscriber,
[](const std::shared_ptr<const ExampleEventMsg>& data) {
// Handle msg ...
});
}
// ...
};