数据桥接
一、数据桥接简介
数据桥接作为连接 Fastbee 与外部数据系统的桥梁,支持对接多种类型的外部系统,如 MQTT 服务器、 HTTP 服务器,以及MySQL、SQLServer、Oracle、PostgreSQL等数据库。 通过数据桥接,用户能够实时地将消息从 Fastbee 传输至外部数据系统,或者从外部数据系统拉取数据并发布到 MQTT 服务器的指定主题。
结合规则脚本的编写,用户无需额外编写代码,即可快速完成所有业务数据的处理与分发。用户可以选择通过指定的 HTTP 接口接入数据,或者订阅 MQTT 主题来接收数据。 在数据经过规则处理后,再通过数据桥接将数据发送到外部应用系统中。
桥接输入侧支持:
- 基于产品维度的设备上下线,设备上报,平台下发等事件触发
- 基于数据流维度:HTTP 接入,MQTT 主题订阅方式接入数据
桥接输出侧支持:
- 内置MQTT重发布(默认)
- 外部MQTT服务器主题发布输出
- 第三方 HTTP OpenAPI 接口输出
- MySQL、SQLServer、Oracle、PostgreSQL 等数据库的SQL插入操作
注意
注意:输入和输出可同时使用,更加灵活的转发数据和处理数据流
二、桥接输入侧
1. HTTP接入
目前开放三个固定的api接口来接入数据,分别是:“/bridge/get”,“/bridge/post”,“/bridge/put” 目前使用的认证方式的jwt token认证,用户通过系统工具-系统授权来创建生成token,然后再请求时携带认证头信息即可完成接入。
数据桥接--新增
桥接方向选择输入--点击Http桥接--选择接入路由
规则脚本--新增
脚本数据选择HTTP接入--点击选择HTTP桥接接入点--修改脚本内容即可1.1.新建http桥接配置: 规则引擎-->数据桥接-->新增

1.2.创建http输入脚本: 规则引擎-->规则脚本-->新增
// /bridge/get可以传递路径参数和header配置
// /bridge/post可以传递路径参数和header配置,body
// /bridge/put可以传递路径参数和header配置,body
// 获取指定路径参数
String paramStr = msgContext.getData("params");
JSONObject params = JSONUtil.parseObj(paramStr);
Object value = params.get("key");
// 获取指定消息头配置
String headersStr = msgContext.getData("headers");
JSONObject headers = JSONUtil.parseObj(headersStr);
Object value = headers.get("key");
// 获取body内容
String payload = msgContext.getPayload();
JSONObject body = JSONUtil.parseObj(payload);
Object value = body.get("key");

1.3.创建http接入token: 系统工具->系统授权-->新增

1.4.使用接口工具访问接口,填写参数和认证token,
- 后端接口地址为:http://localhost:8080/bridge/get 或者 http://localhost/prod-api/bridge/get



2. MQTT接入
// 接入端为本系统或者第三方系统都可以
数据桥接--新增
桥接方向选择输入--点击Mqtt桥接--输入服务器配置和桥接主题即可
规则脚本--新增
脚本数据选择MQTT接入--点击选择Mqtt桥接接入点--修改脚本内容即可2.1.新建mqtt桥接配置: 规则引擎-->数据桥接-->新增

2.2.创建mqtt输入脚本: 规则引擎-->规则脚本-->新增
// 获取接入主题
String topic = msgContext.getTopic();
// 获取接入消息
String payload = msgContext.getPayload();
String NewTopic = "/test";
String NewPayload = payload;
// 设置发布主题和消息内容
msgContext.setTopic(NewTopic);
msgContext.setPayload(NewPayload);

2.3.使用客户端工具往桥接主题发送消息,可以查到规则日志


三、桥接输出侧
1. 内置MQTT重发布
数据接入后,可以经过脚本处理,修改重新发布的主题,数据格式,数据计算方式,然后重新发布到内部MQTT服务器,重新交到业务系统处理相关数据。
2. 外部MQTT服务器主题发布输出
2.1. 新建mqtt桥接配置: 规则引擎-->数据桥接-->新增

桥接配置参数说明:
桥接名称:桥接配置名称(一般无限制)
桥接方向:输出
MQTT服务地址:要桥接的地址(地址+端口),例:fastbee.cn:1883 / 101.33.237.12:1883
客户端ID:连接mqtt的clientId,例:S&D1LN0D9DN5X6&205&1
用户名:连接mqtt的用户名
密码:连接mqtt的密码
桥接路由:默认使用该配置发布主题。当桥接路由可为空,脚本中才可以通过msgContext.setTopic(Topic)传递发布主题注意
占位符支持情况: 桥接在执行前会通过上下文对象中的dataMap来替换占位符。 例如: msgContext.setData("key",value)可以定义要替换的占位符,这个是替换${key}占位符, 设置msgContext.setTopic("${productId}/${serialNumber}/property/post"), 执行前会把datamap中 key为productId的值替换掉${productId},key为serialNumber的值替换掉${serialNumber}。
发布内容payload也支持这种占位符
新增配置之后,可以点击连接测试按钮创建mqtt客户端,根据状态判断配置是否有效

2.2. 创建mqtt输出脚本: 规则引擎-->规则脚本-->新增
// 脚本中可以修改发布的主题和内容
String NewTopic = "/test";
String NewPayload = payload;
// 设置发布主题和消息内容
msgContext.setTopic(NewTopic); // 当桥接配置中输出主题为空时,会用msgContext传递的主题
msgContext.setPayload(NewPayload);


2.3. 测试脚本--建立mqtt桥接规则脚本之后,选择规则脚本对应的产品设备,使用mqttx工具模拟设备上报

我们通过桥接功能将本地设备上报的数据桥接到fastbee演示站点数据

可以看到,通过mqtt桥接功能,设备上报的数据在两个平台都接收到。
3. 第三方 HTTP OpenAPI 接口输出
3.1. 新增配置
添加配置
1.新建脚本与mqtt输出一样,规则引擎-->数据桥接-->新增
配置参数说明:
桥接名称:配置名称
桥接方向:输出
请求方法:根据要对接平台api说明选择 get/post/put
桥接地址:api访问的完整地址,例:http://localhost:18083/api/v5/metrics
请求头:根据对接平台api说明文档添加,例:认证参数 Authorization,application/json等等
请求参数:根据对接平台api说明文档添加
请求体:根据对接平台api说明文档添加注意
占位符支持情况: 桥接在执行前会通过上下文对象中的dataMap来替换占位符。 请求头,请求参数,请求体都支持占位符
3.2. 增加http桥接输出规则脚本: 规则引擎-->规则脚本-->新增
// 设置输出headers
JSONObject headers = new JSONObject();
headers.put("Content-Type", "application/json");
cxt.setData("headers", headers.toStringPretty());
// 设置输出请求参数
// 设置输出设置占位符值
cxt.setData("key", 111));
JSONObject params = new JSONObject();
headers.put("param1", "${key}");
cxt.setData("params", params.toStringPretty());
// 设置请求body内容
// 默认使用桥接配置的"body",如果未配置则使用上下文中的"payload"
String NewPayload = "payload";
msgContext.setPayload(NewPayload);


3.3. 输出侧测试--使用mqttx模拟设备上报数据测试

在后端看到如下日志

4. 其他平台对接参考
4.1. emqx对接
对接参考文档:https://www.emqx.io/docs/zh/v5.1/admin/api.html#java
1.在mqtt服务器新建API密钥
2.HTTP认证:采用Basic认证:将用户名和密码用MD5在线加密
3.在fastbee平台创建配置
HTTP认证参数:Authorization: 支持两种认证方式:Basic或Bearer
accept:application/json- 两种认证方式:

- 在emqx管理界面创建api密钥

- fastbee平台创建桥接配置

4.2. 萤石对接
对接参考文档:https://open.ys7.com/help/81
HTTP认证参数:Content-Type: application/x-www-form-urlencoded
accessToken


4.3. OneNET-中国移动物联网开放平台对接
对接参考文档:https://open.ys7.com/help/81
HTTP认证参数:Content-Type: application/x-www-form-urlencoded
accessToken参数说明:



4.4. 百度智能云api对接
参考文档:https://cloud.baidu.com/doc/EVS/s/Ekowke5c8
1.创建应用
2.获取access_token:https://console.bce.baidu.com/support/#/api?product=AI&project=%E5%9B%BE%E5%83%8F%E8%AF%86%E5%88%AB&parent=%E9%89%B4%E6%9D%83%E8%AE%A4%E8%AF%81%E6%9C%BA%E5%88%B6&api=oauth%2F2.0%2Ftoken&method=post
HTTP认证参数:access_token
Content-Type: application/json; charset=utf-8


4.5. 海康威视API对接
参考文档:https://ai.hikvision.com/resource-center/document-detail/8aee27227c3046eba42e84d26d88bc96
1.注册账号
2.获取token:https://ai.hikvision.com/resource-center/document-detail/a45858cdd40c43fb8669537e55bf2cae
HTTP认证参数:Content-Type:application/x-www-form-urlencoded
referer:https://ai.hikvision.com/
token在康海平台注册账号之后,在个人控制台获取ak和sk,获取token 


更对平台对接参考对应平台的api对接文档
5. 数据库存储
5.1. 新增数据库存储配置:规则引擎-->数据桥接-->新增

数据库存储配置说明:
桥接名称:
桥接方向:一般为输入
数据库类型:目前仅支持关系型数据库
数据源:MySQL/Oracle/SQL server/PostgreSQL
链接地址:ip + 端口,例如:localhost:3306
用户名:登录数据库的用户名
密码:密码
数据库名称:库名SQL:sql语句,例如:
INSERT INTO database_storage(topic, payload, serial_number, product_id,protocol_code)
VALUES ('${topic}', '${payload}', '${serialNumber}', '${productId}','${protocolCode}');
说明:可在脚本中使用msgContext.setData("test",1);
然后sql语句通过${test}调用
可在输出侧http body和 sql语句中传入该值注意
注意:sql语句 values 中使用自定义占位符 '${}' 赋值
单引号 '' 最好不要省略,否则可能会导致sql语句错误
5.2. 新增数据存储规则脚本:规则引擎-->规则脚本-->新增

规则脚本给sql语句传值格式:
// 以下几个字段在规则脚本中可以省略,因为后端已经封装好,如果sql涉及其他字段需要手动封装
msgContext.setData("topic",NewTopic);
msgContext.setData("payload",NewPayload);
msgContext.setData("serialNumber",serialNumber);
msgContext.setData("productId",productId);
msgContext.setData("protocolCode",protocolCode);5.3. 测试数据库存储


