Zenoh 插件

相关链接

参考示例:

插件概述

zenoh_plugin 是一个轻量级的、高效的、实时的数据传输插件,它旨在为分布式系统提供低延迟、高吞吐量的数据传输和处理能力。当需要以下业务场景的时候,zenoh 插件将非常合适:

  • 服务发现机制的通信系统;

  • 灵活的网络拓扑结构;

  • 低延迟、高吞吐量的网络通信和数据传输;

  • SHM 和 非 SHM 两种传输模式;

此插件为 AimRT 提供以下组件:

  • zenoh 类型 Rpc 后端

  • zenoh 类型 Channel 后端

插件的配置项如下:

节点

类型

是否可选

默认值

作用

shm_pool_size

int

可选

10 MB

共享内存池的大小 , 单位:B

shm_init_loan_size

int

可选

1 KB

共享内存初始借用大小 , 单位:B

native_cfg_path

string

可选

“”

使用zenoh提供的原生配置文件

limit_domain

string

可选

“”

对插件的通信域进行限制

关于zenoh_plugin的配置,使用注意点如下:

  • shm_pool_size 表示共享内存池的大小,单位:B,默认值是 10 MB,可以根据实际情况调整, 如果不使用共享内存,可忽略该配置项。 如果剩余共享内存不足以满足数据传输需求,则会自动切换到非共享内存传输模式 。

  • shm_init_loan_size 表示向共享内存池初始借用大小,单位:B,默认值是 1 KB,可以根据实际情况调整, 如果不使用共享内存,可忽略该配置项。

  • native_cfg_path 表示 zenoh 提供的原生配置文件的路径,可以通过配置该文件来灵活配置 zenoh 的网络结构。如果不填写则默认使用 zenoh 官方提供的默认配置,具体配置内容请参考 zenoh 官方关于configuration的说明,您也可以直接修改 zenoh_native_config.json5文件来自定义配置,这里列举几个常用的配置项:

配置项

作用

zenoh_native_config中的配置值

scouting.multicast. enabled

是否开启多播,这将允许多个 zenoh 节点自动发现彼此

true

scouting.multicast. address

配置多播地址

224.0.0.224:7446

scouting.multicast. interface

配置所用的网络接口

auto

listen.endpoints

需要主动监听的地址

-

transport.unicast.lowlatency

是否启动最低时延,若开启将有助于提升传输速率,注意其不能与qos同时开启

false

transport.unicast.qos.enabled

是否启动服务质量,注意其不能与lowlatency同时开启

true

  • limit_domain 表示插件的通信域,其兼容 zenoh 强大的Key & Key Expression。如果不填写则使用插件的默认的通信域(即消息的 topic ),只有相匹配的域才能进行通信,具体的书写格式如下:

#请不要以"/"开始,中间以"/"分隔,结尾不要带"/" (与zenoh官方书写方式一致),如:

xxx/yyy/zzz

最简单的一种匹配就是二者域相同,除此之外,zenoh 官方提供了更灵活的匹配机制,具体可查阅 zenoh 官方关于key的解释。

以下是一个简单的示例:

aimrt:
  plugin:
    plugins:
      - name: zenoh_plugin
        path: ./libaimrt_zenoh_plugin.so 
        options: 
          native_cfg_path: ./cfg/zenoh_native_config.json5       

zenoh 类型 Rpc 后端

zenoh类型的 Rpc后端是zenoh_plugin中提供的一种 Rpc 后端,主要用来构建请求-响应模型。其所有的配置项如下:

节点

类型

是否可选

默认值

作用

timeout_executor

string

可选

“”

Client 端发起 RPC 超时情况下的执行器

clients_options

array

可选

[]

Client 端发起 RPC 请求时的规则

clients_options[i].func_name

string

必选

“”

RPC Func 名称,支持正则表达式

clients_options[i].shm_enabled

bool

必选

false

RPC Func 是否使用共享内存通信

servers_options

array

可选

[]

服务端处理 RPC 请求时的规则

servers_options[i].func_name

string

必选

“”

RPC Func 名称,支持正则表达式

servers_options[i].shm_enabled

bool

必选

false

RPC Func 是否使用共享内存通信

注意: zenoh 支持 SHM 和 非 SHM 的自动转换, 即如果数据离开其所在的SHM 域,则自动切换到非 SHM 通信。 例如,如果节点 A 和 节点 B 都设置的共享内存,但其不再同一机器上,仍可以进行通信,因为数据会自动切换到非共享内存的传输模式。

以下是一个简单的客户端的示例:

aimrt:
  plugin:
    plugins:
      - name: zenoh_plugin
        path: ./libaimrt_zenoh_plugin.so
        options: 
          native_cfg_path: ./cfg/zenoh_native_config.json5
          shm_pool_size: 10240
  executor:
    executors:
      - name: timeout_handle
        type: time_wheel
        options:
          bind_executor: work_thread_pool
  rpc:
    backends:
      - type: zenoh
        options: 
          timeout_executor: timeout_handle
          clients_options: 
            - func_name: "(.*)"
              shm_enabled: false
    clients_options:
      - func_name: "(.*)"
        enable_backends: [zenoh]

以下是一个简单的服务端的示例:

aimrt:
  plugin:
    plugins:
      - name: zenoh_plugin
        path: ./libaimrt_zenoh_plugin.so
        options:
          native_cfg_path: ./cfg/zenoh_native_config.json5                 
  rpc:
    backends:
      - type: zenoh
        options: 
          timeout_executor: timeout_handle
          servers_options: 
            - func_name: "(.*)"
              shm_enabled: true
    servers_options:
      - func_name: "(.*)"
        enable_backends: [zenoh]

以上示例中,Client 端和 Server 端都采用服务发现机制,即在统一网络中的两个端点可自动发现彼此并建立连接。

在整个 RPC 过程中,底层使用的 Zenoh Topic 名称格式如下:

  • Server 端

    • 订阅 Req 使用的 topic:

      • req/aimrt_rpc/${func_name}/${limit_domain}>

    • 发布 Rsp 使用的 topic:rsp/aimrt_rpc/${func_name}/${limit_domain}

  • Client 端

    • 发布 Req 使用的 topic:

      • req/aimrt_rpc/${func_name}/ ${limit_domain}

    • 订阅 Rsp 使用的 topic:rsp_/imrt_rpc/${func_name}/${limit_domain}

${func_name}是 url 编码后的 AimRT RPC 方法名称。

例如,对于一个 client 的请求来说,若 func 名称为/aimrt.protocols.example.ExampleService/GetBarData, limit_domain 没有配置,则最终的topic名称为:req/aimrt_rpc/%2Faimrt.protocols.example.ExampleService%2FGetBarData`。

Client -> Server 的 Zenoh 数据包格式整体分 5 段:

  • 序列化类型,一般是pbjson

  • client 端想要 server 端回复 rsp 的 zenoh topic 名称。client 端自己需要订阅这个 zenoh topic

  • msg id,4 字节,server 端会原封不动的封装到 rsp 包里,供 client 端定位 rsp 对应哪个 req

  • context 区

    • context 数量,1 字节,最大 255 个 context

    • context_1 key, 2 字节长度 + 数据区

    • context_2 key, 2 字节长度 + 数据区

  • msg 数据

| n(0~255) [1 byte] | content type [n byte]
| m(0~255) [1 byte] | rsp topic name [m byte]
| msg id [4 byte]
| context num [1 byte]
| context_1 key size [2 byte] | context_1 key data [key_1_size byte]
| context_1 val size [2 byte] | context_1 val data [val_1_size byte]
| context_2 key size [2 byte] | context_2 key data [key_2_size byte]
| context_2 val size [2 byte] | context_2 val data [val_2_size byte]
| ...
| msg data [remaining byte]

Server -> Client 的 Zenoh 数据包格式整体分 4 段:

  • 序列化类型,一般是pbjson

  • msg id,4 字节,req 中的 msg id

  • status code,4 字节,框架错误码,如果这个部分不为零,则代表服务端发生了错误,数据段将没有内容

  • msg 数据

| n(0~255) [1 byte] | content type [n byte]
| msg id [4 byte]
| status code [4 byte]
| msg data [remaining byte]

zenoh 类型 Channel 后端

zenoh类型的 Channel 后端是zenoh_plugin中提供的一种Channel后端,主要用来构建发布和订阅模型。其所有的配置项如下:

节点

类型

是否可选

默认值

作用

pub_topics_options

array

可选

[]

发布 Topic 时的规则

pub_topics_options[i].topic_name

string

必选

“”

Topic 名称,支持正则表达式

pub_topics_options[i].shm_enabled

bool

必选

false

Publish 端 是否使用共享内存通信

注意: zenoh 支持 SHM 和 非 SHM 的自动转换, 即如果数据离开其所在的SHM 域,则自动切换到非 SHM 通信。 例如,如果节点 A 和 节点 B 都设置的共享内存,但其不再同一机器上,仍可以进行通信,因为数据会自动切换到非共享内存的传输模式。

以下是一个简单的发布端的示例:

aimrt:
  plugin:
    plugins:
      - name: zenoh_plugin
        path: ./libaimrt_zenoh_plugin.so
        options: 
          shm_pool_size: 1024      
  channel:
    backends:
      - type: zenoh
        options:
          pub_topics_options: 
            - topic_name: "(.*)"
              shm_enabled: false
    pub_topics_options:
      - topic_name: "(.*)" 
        enable_backends: [zenoh]

以下是一个简单的订阅端的示例:

aimrt:
  plugin:
    plugins:
      - name: zenoh_plugin
        path: ./libaimrt_zenoh_plugin.so
        options: 
          native_cfg_path: ./cfg/zenoh_native_config.json5      
channel:
    backends:
      - type: zenoh
    sub_topics_options:
      - topic_name: "(.*)"
        enable_backends: [zenoh]

以上示例中都使用 zeonh 的服务发现机制,即在统一网络中的两个端点可自动发现彼此并建立连接。

在这个过程中,底层使用的 Topic 名称格式为:channel/${topic_name}/${message_type}${limit_domain}。其中,${topic_name}为 AimRT 的 Topic 名称,${message_type}为 url 编码后的 AimRT 消息名称, ${limit_domain}为插件的限制域。这个 Topic 被设置成为 Zenoh 最终的键表达式(Keyxpr),这是 Zenoh 的提供的资源标识符,只有键表达式匹配的订阅者和发布者才能够进行通信。

例如,AimRT Topic 名称为test_topic,消息类型为pb:aimrt.protocols.example.ExampleEventMsg,限制域为room1/A2,则最终 Zenoh 的 topic 名称为:channel/test_topic/pb%3Aaimrt.protocols.example.ExampleEventMsgroom1/A2。如果订阅者和发布者的 Topic 均为channel/test_topic/pb%3Aaimrt.protocols.example.ExampleEventMsgroom1/A2,则二者可以进行通信。

在AimRT发布端发布数据到订阅端这个链路上,Zenoh 数据包格式整体分 3 段:

  • 序列化类型,一般是pbjson

  • context 区

    • context 数量,1 字节,最大 255 个 context

    • context_1 key, 2 字节长度 + 数据区

    • context_2 key, 2 字节长度 + 数据区

  • 数据

数据包格式如下:

| n(0~255) [1 byte] | content type [n byte]
| context num [1 byte]
| context_1 key size [2 byte] | context_1 key data [key_1_size byte]
| context_1 val size [2 byte] | context_1 val data [val_1_size byte]
| context_2 key size [2 byte] | context_2 key data [key_2_size byte]
| context_2 val size [2 byte] | context_2 val data [val_2_size byte]
| ...
| msg data [len - 1 - n byte] |