规则事件
大约 4 分钟
一、规则事件
动作是MQTT消息与外部数据系统的连接匹配规则,包含数据处理,映射,MQTT协议设备连接MQTT服务实施传递消息,其中动作为数据集成到外部数据系统提供了关联,从而实现设备与其他业务系统的无缝集成。
规则事件
规则引擎可以消息发布,也可以处理客户端上下线、客户端订阅等事件。对于消息,对于事件,是放在SQL语句中FROM 子句后面跟事件主题。
下面对多有客户端事件说明
连接建立事件
事件标识: $events/client_connected
客户端连接MQTT服务成功触发规则
可选字段列表
| 字段名 | 字段说明 |
|---|---|
| clientId | 客户端id |
| proto_name | 协议类型名称 |
| ip_address | 客户端IP |
| node | 节点IP |
| port | 客户端端口 |
| ts | 事件触发时间 |
| keepalive | 心跳间隔 |
| clean_start | 是否清除会话 |
| status | 是否在线 |
SQL规则示例
select
*
from
"$events/client_connected"数据输出JSON格式示例
{
"proto_name": "MQTT",
"node": "127.0.0.1",
"clientId": "S&testD664Q81G8092&41&1",
"ip_address": "127.0.0.1",
"port": 52865,
"ts": 1735455370388,
"keepalive": 60,
"clean_start": true,
"status": true
}字段可选示例
select
clientId,
status,
keepalive
from
"$events/client_connected"数据输出如下
{
"clientId": "S&testD664Q81G8092&41&1",
"keepalive": 60,
"status": true
}连接断开事件
事件标识 : $events/client_disconnected
客户端连接断开时触发规则。
可选字段列表
| 字段 | 解释 |
|---|---|
| offline_reason | 客户端连接断开原因: normal:客户端主动断开 kicked:服务端踢出 keepalive_timeout:心跳超时 not_authorized:认证失败 tcp_closed:对端关闭了网络连接 discarded: 另一个客户端使用相同的 clientId 连接 |
| clientId | 客户端id |
| status | 设备状态 |
| port | 客户端端口 |
| ip_address | 客户端ip |
| ts | 事件触发时间 |
| node | 节点IP |
SQL规则示例
select
*
from
"$events/client_disconnected"数据输出示例
{
"proto_name": "MQTT",
"node": "127.0.0.1",
"clientId": "S&testD664Q81G8092&41&1",
"ip_address": "127.0.0.1",
"port": 0,
"ts": 1735456294357,
"offline_reason": "kicked",
"status": false
}发布消息事件
事件标识: $events/message_publish
可选字段
| 字段 | 解释 |
|---|---|
| proto_name | 协议名称 |
| clientId | 客户端id |
| id | 消息id |
| topic | 主题 |
| peerhost | 客户端ip |
| payload | MQTT消息体 |
| node | 节点IP |
| qos | 消息质量 |
| port | 客户端端口 |
| datatype | 数据类型,json/hex/base64/cbor |
| publish_received_at | 消息到达MQTT服务时间 |
| ts | 事件触发时间 |
SQL规则示例
select
*
from
"$events/message_publish"数据输出示例
{
"proto_name" : "MQTT",
"ts" : 1735486854867,
"clientId" : "S&testD664Q81G8092&41&1",
"id" : "1873393767773118464",
"topic" : "/D1832O34Z6M9/property/post",
"qos" : 0,
"payload" : [ 123, 13, 10, 32, 32, 34, 109, 115, 103, 34, 58, 32, 34, 104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 34, 13, 10, 125 ],
"peerhost" : "127.0.0.1",
"clientPort" : 65385,
"node" : "127.0.0.1",
"publish_received_at" : 1735486854868,
"datatype" : "json"
}客户端订阅成功事件
事件标识:$events/session_subscribed
当客户端订阅成功时触发规则。
可选字段
| 字段 | 解释 |
|---|---|
| clientId | 消息目的 Client ID |
| proto_name | 协议名称 |
| peerhost | 客户端的 IPAddress |
| topic | MQTT 主题 |
| qos | MQTT事件 subscribe |
| event | SUBSCRIBE Properties (仅适用于 5.0) |
| timestamp | 事件触发时间 (单位:毫秒) |
| node | 事件触发所在节点 |
SQL规则示例
select
*
from
"$events/session_subscribed"数据输出示例
{
"node": "127.0.0.1",
"clientId": "860061061621885",
"peerhost": "127.0.0.1",
"qos": 0,
"topic": "/860061061621test/property/post",
"proto_name": "MQTT",
"event": "subscribe",
"timestamp": 1736217855399
}客户端取消订阅成功事件
事件标识:$events/session_unsubscribed
当客户端取消订阅成功时触发事件
可选字段
| 字段 | 解释 |
|---|---|
| clientId | 消息目的 Client ID |
| proto_name | 协议名称 |
| peerhost | 客户端的 IPAddress |
| topic | MQTT 主题 |
| ts | 事件触发时间 (单位:毫秒) |
| event | SUBSCRIBE Properties (仅适用于 5.0) |
| node | 事件触发所在节点 |
SQL规则示例
select
*
from
"$events/session_unsubscribed"数据输出示例
{
"node": "127.0.0.1",
"clientId": "860061061621885",
"peerhost": "127.0.0.1",
"topic": "/860061061621test/property/post",
"proto_name": "MQTT",
"event": "unSubscribe",
"ts": 1736217858065
}消息应答事件
事件标识:$events/message_acked
当消息发送到客户端,并收到客户端回复的ack时触发规则,仅QOS1,QOS2会触发。
| 字段 | 解释 |
|---|---|
| id | MQTT 消息 ID |
| from_clientid | 消息来源 Client ID |
| from_username | 消息来源用户名 |
| clientId | 消息目的 Client ID |
| username | 消息目的用户名 |
| payload | MQTT 消息体 |
| peerhost | 客户端的 IPAddress |
| topic | MQTT 主题 |
| qos | MQTT 消息的 QoS |
| node | 事件触发所在节点 |
| timestamp | 事件触发时间 (单位:毫秒) |
SQL规则示例
SELECT
from_clientid,
from_username,
topic,
qos,
node,
timestamp
FROM
"$events/message_acked"数据输出示例
{
"topic": "t/a",
"timestamp": 1645002965664,
"qos": 1,
"node": "127.0.0.1",
"from_username": "fastbee",
"from_clientid": "fastbee"
}会话保持事件
事件标识:$events/session_ping
设备心跳保持时触发
可选字段
| 字段 | 解释 |
|---|---|
| clientId | 消息目的 Client ID |
| proto_name | 协议名称 |
| port | 客户端端口 |
| node | 事件触发所在节点 |
| ts | 事件触发时间 (单位:毫秒) |
| ip_address | 客户端ip |
SQL规则示例
SELECT
*
FROM
"$events/session_ping"数据输出示例
{
"proto_name": "MQTT",
"node": "127.0.0.1",
"clientId": "860061061621885",
"ip_address": "127.0.0.1",
"port": 51182,
"ts": 1736219438405
}消息已投递事件
事件标识:$events/client_delivered
当消息被放入底层socket时触发规则。
可选字段
| 字段 | 解释 |
|---|---|
| id | MQTT 消息 ID |
| from_clientid | 消息来源 Client ID |
| from_username | 消息来源用户名 |
| clientId | 消息目的 Client ID |
| username | 消息目的用户名 |
| payload | MQTT 消息体 |
| peerhost | 客户端的 IPAddress |
| topic | MQTT 主题 |
| qos | MQTT 消息的 QoS |
| node | 事件触发所在节点 |
| timestamp | 事件触发时间 (单位:毫秒) |
SQL规则示例
SELECT
from_clientid,
from_username,
topic,
qos,
node,
timestamp
FROM
"$events/client_delivered"数据输出示例
{
"topic": "t/a",
"timestamp": 1645002965664,
"qos": 1,
"node": "127.0.0.1",
"from_username": "fastbee",
"from_clientid": "fastbee"
}