Channel

相关链接

参考示例:

protobuf 协议

协议用于确定通信各端的消息格式。一般来说,协议都是使用一种与具体的编程语言无关的 IDL ( Interface description language )描述,然后由某种工具转换为各个语言的代码。

Protobuf是一种由 Google 开发的、用于序列化结构化数据的轻量级、高效的数据交换格式,是一种广泛使用的 IDL。

当前版本 AimRT Python 只支持 protobuf 协议。在使用 AimRT Python 发送/订阅消息之前,使用者需要先基于 protobuf 协议生成一些 python 的桩代码。

在使用时,开发者需要先定义一个.proto文件,在其中定义一个消息结构。例如example.proto

syntax = "proto3";

message ExampleMsg {
  string msg = 1;
  int32 num = 2;
}

然后使用 Protobuf 官方提供的 protoc 工具进行转换,生成 Python 代码,例如:

protoc --python_out=. example.proto

这将生成example_pb2.py文件,包含了根据定义的消息类型生成的 Python 接口,我们的业务代码中需要 import 此文件。

ChannelHandle

模块可以通过调用CoreRef句柄的GetChannelHandle()接口,获取ChannelHandleRef句柄,来使用 Channel 功能。其提供的核心接口如下:

  • GetPublisher(str)->PublisherRef

  • GetSubscriber(str)->SubscriberRef

开发者可以调用ChannelHandleRef中的GetPublisher方法和GetSubscriber方法,获取指定 Topic 名称的PublisherRefSubscriberRef类型句柄,分别用于 Channel 发布和订阅。这两个方法使用注意如下:

  • 这两个接口是线程安全的。

  • 这两个接口可以在Initialize阶段和Start阶段使用。

Publish

用户如果需要发布一个 Msg,牵涉的接口主要有以下两个:

  • aimrt_py.RegisterPublishType(publisher, msg_type)->bool : 用于注册此消息类型;

    • 第一个参数publisher是一个PublisherRef句柄,代表某个 Topic;

    • 第二个参数msg_type是一个Protobuf类型;

    • 返回值是一个 bool 值,表示注册是否成功;

  • aimrt_py.Publish(publisher, msg, ctx | serialization_type) : 用于发布消息;

    • 第一个参数publisher是一个PublisherRef句柄,代表某个 Topic;

    • 第二个参数msg是一个Protobuf类型实例,需要与注册时的消息类型对应;

    • 第三个参数可以传入 Context类型实例或者ContextRef句柄或者 serialization_type字符串,用于指定消息的上下文或者序列化类型,serialization_type 仅可指定 pbjsonctxserialization_type均可为空,当为空时,默认使用 pb 序列化;

    • 该函数同时有以下重载:

      • aimrt_py.Publish(publisher, ctx | serialization_type, msg)

用户需要两个步骤来实现逻辑层面的消息发布:

  • Step 1:使用aimrt_py.RegisterPublishType方法注册协议类型;

    • 只能在Initialize阶段注册;

    • 不允许在一个PublisherRef中重复注册同一种类型;

    • 如果注册失败,会返回 false;

  • Step 2:使用aimrt_py.Publish方法发布数据;

    • 只能在Start阶段之后发布数据;

    • 在调用Publish接口时,开发者应保证传入的 Msg 在Publish接口返回之前都不会发生变化,否则行为是未定义的;

用户Publish一个消息后,特定的 Channel 后端将处理具体的消息发布请求。此时根据不同后端的实现,有可能会阻塞一段时间,因此Publish方法耗费的时间是未定义的。但一般来说,Channel 后端都不会阻塞Publish方法太久,详细信息请参考对应后端的文档。

Subscribe

用户如果需要订阅一个 Msg,需要使用以下接口:

  • aimrt_py.Subscribe(subscriber, msg_type, handle)->bool : 用于订阅一种消息;

    • 第一个参数subscriber是一个SubscriberRef句柄,代表某个 Topic;

    • 第二个参数msg_type是一个Protobuf类型;

    • 第三个参数handle是一个签名为(msg)->void或者(ctx_ref, msg)->void的消息处理回调,msg类型是订阅时传入的msg_type类型, ctx_ref是消息的上下文句柄;

    • 返回值是一个 bool 值,表示注册是否成功;

注意:

  • 只能在Initialize调用订阅接口;

  • 不允许在一个SubscriberRef中重复订阅同一种类型;

  • 如果订阅失败,会返回 false;

此外还需要注意的是,由哪个执行器来执行订阅的回调,这和具体的 Channel 后端实现有关,在运行阶段通过配置才能确定,使用者在编写逻辑代码时不应有任何假设。详细信息请参考对应后端的文档。

最佳实践是:如果回调中的任务非常轻量,比如只是设置一个变量,那就可以直接在回调里处理;但如果回调中的任务比较重,那最好调度到其他专门执行任务的执行器里进行处理。

Context

Context是 AimRT 中用于传递上下文信息的数据结构,其所支持的接口如下:

  • Reset()->void : 重置上下文,重置后上下文可以被再次使用;

  • CheckUsed()->bool : 检查上下文是否被使用;

  • SetUsed()->void : 设置上下文为已使用;

  • GetType()->aimrt_channel_context_type_t : 获取上下文类型;

  • SetMetaValue(key: str, value: str)->void : 设置元数据;

  • GetMetaValue(key: str)->str : 获取元数据;

  • GetMetaKeys()->List[str] : 获取所有元数据键值对中的键列表;

  • SetSerializationType(serialization_type: str)->void : 设置序列化类型;

  • GetSerializationType()->str : 获取序列化类型;

  • ToString()->str : 获取上下文信息,以字符串形式返回可读性高的信息;

ContextRefContext的引用类型,除不具备Reset接口外,其他接口与Context完全相同。

aimrt_channel_context_type_t 是一个枚举类型,定义了上下文类型,具体值为AIMRT_CHANNEL_PUBLISHER_CONTEXTAIMRT_CHANNEL_SUBSCRIBER_CONTEXT,表明这是发布者还是订阅者的上下文。

使用示例

以下是一个使用 AimRT Python 进行 Publish 的示例,通过 Create Module 方式拿到CoreRef句柄。如果是基于Module模式在Initialize方法中拿到CoreRef句柄,使用方式也类似:

import aimrt_py
import threading

from google.protobuf.json_format import MessageToJson
import event_pb2

def main():
    aimrt_core = aimrt_py.Core()

    # Initialize
    core_options = aimrt_py.CoreOptions()
    core_options.cfg_file_path = "path/to/cfg/xxx_cfg.yaml"
    aimrt_core.Initialize(core_options)

    # Create Module
    module_handle = aimrt_core.CreateModule("NormalPublisherPyModule")

    # Register publish type
    topic_name = "test_topic"
    publisher = module_handle.GetChannelHandle().GetPublisher(topic_name)
    assert publisher, f"Get publisher for topic '{topic_name}' failed."

    aimrt_py.RegisterPublishType(publisher, event_pb2.ExampleEventMsg)

    # Start
    thread = threading.Thread(target=aimrt_core.Start)
    thread.start()

    # Sleep for seconds
    time.sleep(1)

    # Publish event
    event_msg = event_pb2.ExampleEventMsg()
    event_msg.msg = "Publish without ctx or serialization_type"
    event_msg.num = 1
    aimrt_py.Publish(publisher, event_msg)
    aimrt_py.info(module_handle.GetLogger(),
                  f"Publish new pb event, data: {MessageToJson(event_msg)}")

    # Publish event with json serialization
    event_msg.msg = "Publish with json serialization"
    event_msg.num = 2
    aimrt_py.Publish(publisher, "json", event_msg)
    aimrt_py.info(module_handle.GetLogger(),
                  f"Publish new pb event, data: {MessageToJson(event_msg)}")

    # Publish event with context
    ctx = aimrt_py.Context()
    ctx.SetMetaValue("key1", "value1")
    event_msg.msg = "Publish with context"
    event_msg.num = 3
    aimrt_py.Publish(publisher, ctx, event_msg)
    aimrt_py.info(module_handle.GetLogger(),
                  f"Publish new pb event, data: {MessageToJson(event_msg)}")

    # Publish event with context ref
    ctx.Reset()  # Reset context, then it can be used again
    ctx_ref = aimrt_py.ContextRef(ctx)
    ctx_ref.SetMetaValue("key2", "value2")
    ctx_ref.SetSerializationType("json")
    event_msg.msg = "Publish with context ref"
    event_msg.num = 4
    aimrt_py.Publish(publisher, ctx_ref, event_msg)
    aimrt_py.info(module_handle.GetLogger(),
                  f"Publish new pb event, data: {MessageToJson(event_msg)}")

    # Sleep for seconds
    time.sleep(1)

    # Shutdown
    aimrt_core.Shutdown()

    thread.join()

if __name__ == '__main__':
    main()

以下是一个使用 AimRT Python 进行 Subscribe 的示例,通过 Create Module 方式拿到CoreRef句柄。如果是基于Module模式在Initialize方法中拿到CoreRef句柄,使用方式也类似:

import aimrt_py
import threading

from google.protobuf.json_format import MessageToJson
import event_pb2

global_aimrt_core = None


def signal_handler(sig, frame):
    global global_aimrt_core

    if (global_aimrt_core and (sig == signal.SIGINT or sig == signal.SIGTERM)):
        global_aimrt_core.Shutdown()
        return

    sys.exit(0)


def main():
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    aimrt_core = aimrt_py.Core()

    global global_aimrt_core
    global_aimrt_core = aimrt_core

    # Initialize
    core_options = aimrt_py.CoreOptions()
    core_options.cfg_file_path = "path/to/cfg/xxx_cfg.yaml"
    aimrt_core.Initialize(core_options)

    # Create Module
    module_handle = aimrt_core.CreateModule("NormalSubscriberPyModule")

    # Subscribe
    topic_name = "test_topic"
    subscriber = module_handle.GetChannelHandle().GetSubscriber(topic_name)
    assert subscriber, f"Get subscriber for topic '{topic_name}' failed."

    def EventHandle(ctx_ref, msg):
        aimrt_py.info(module_handle.GetLogger(),
                      f"Get new pb event, ctx: {ctx_ref.ToString()}, data: {MessageToJson(msg)}")

    aimrt_py.Subscribe(subscriber, event_pb2.ExampleEventMsg, EventHandle)

    # Start
    thread = threading.Thread(target=aimrt_core.Start)
    thread.start()

    while thread.is_alive():
        thread.join(1.0)


if __name__ == '__main__':
    main()