连接器
大约 4 分钟
一、连接器
连接器理解
连接器作为数据源的底层连接通道,用于连接到Saas系统,或业务系统,等三方服务系统
连接器与外部收系统对接,用户创建规则动作,为多个数据源提供连接
下面以KAFKA连接器为例,连接器,动作,数据映射,与外部数据系统的关系如下图所示

连接器特点
在创建动作和匹配规则时,选择一个已经创建好的连接器,而不用关心底层连接的细节。如此设计的优势如下:
- 将连接配置从数据处理和数据映射配置中分离出来,数据处理流的设计更加模块化和灵活。
- 连接器的创建和配置不会影响到数据处理流的设计,连接器与规则关联性较低,达到了与数据转发解耦的目的。
- 外部数据系统需要多个动作或规则转发,可以单独配置动作的通道,相互之间独立,互不干预。
支持的连接器
在FBMQ系统中,目前支持的连接器有5中,如下所示:
KAFKA 连接器
RABBITMQ 连机器
MQTT 连接器
ROCKETMQ 连接器
WEBHOOK 连接器
创建连接器
在可视化页面中,选择 集成 - 连机器,点击创建 ,进入连接器的创建流程
在连接器页面选择所需的类型,进行创建
创建KAFKA连接器
参数如下所示
broker(bootstrap.servers): 127.0.0.1:9092,127.0.0.2:9092 键序列化 (key.serializer): org.apache.kafka.common.serialization.StringSerializer 值序列化 (value.serializer): org.apache.kafka.common.serialization.StringSerializer 写入确认方式(acks): -1 批量发送消息大小(batch.size): 16384 等待消息入批的最长时间(linger.ms): 0 消息缓冲内存大小(buffer.memory): 33554432 消息压缩类型(compression.type): none 消息重试次数(retries): 0 重试间隔时间(retry.backoff.ms): 100 (毫秒)

连接器状态
创建完成点击 测试连接 按钮,进行验证
具体连接状态分为:
已连接:连接器的配置信息填写正确,连接的外部服务器正常
未连接: 连接器的配置信息有误,处于不健康状态,根据其配置会定时重新尝试连接
测试成功示例

测试失败示例

创建ROCKETMQ连接器
rocketMQ作为连接通道

参数如下所示
生产者组(producer_group): producer_group 服务地址(namesrv_addr): 127.0.0.1:9876 实例名称(instance_name): default 发送失败重试次数(retrys_failed): 2 发送失败等待时间(retry_async_failed): 2

创建RABBIRMQ连接器
rabbitmq作为连接通道

参数示例如下所示:
连接主机(host): localhost 连接端口(port): 5762 虚拟主机(vhost): / 用户名(username): guest 密码(password): guest 自动恢复连接(automaticRecovery): true 拓扑结构恢复(topologyRecovery): true 网络恢复间隔(networkRecoveryInterval): 5000

创建WEBHOOK连接器
webhook作为连接通道

参数如下所示


创建MQTT连接器

更新连接器
创建完成的连接器可以进行连接器的基本信息和连接参数进行修改
如果连接器被规则引擎使用,更新后会导致规则引擎重新加载数据,会造成更新过程中一些数据丢弃。
被规则引擎使用的连接器无法进行删除,如果要删除,需要先删除掉使用该连接器的规则引擎。
PS事项
连接器在创建后不会直接连接服务器,不会消耗任何资源,再被规则引擎使用后,触发动作后,才会创建连接池
有些连接器提供了连接池,连接池是复用的,可以降低资源的消耗和提高连接与并发能力
节点间的连接器是共用的,一般连接器的连接池大小为8。
