桥接Kafka
一、MQTT数据集成到KAFKA
kafka是分布式事件流处理中间件, 可以使数据在业务系统与应用程序之间进行实时传输。
Kafka可以作为边缘物联网通信的桥接,Kafka 分为生产者和消费者,建立客户端需要稳定的网络连接。在物联网领域,设备和应用程序生成的数据使用轻量级 MQTT 协议传输。kafka的集成使用户能够无缝地将 MQTT 数据流入或流出 Kafka。MQTT 数据流被引入 Kafka 主题,MQTT与kafka的主题可以做映射处理,确保实时处理、存储和分析。而Kafka 主题的数据可以被 MQTT 设备消费,实现及时处理。
简单实现MQTT与kafka数据集成流程如下图

实现流程原理
kafka数据集成到MQTT是通过配置可以实现的功能,当然我们需要先安装KAFKA服务,能够在基于 MQTT 的物联网数据和 Kafka 强大的数据处理能力之间架起桥梁。通过内置的规则引擎组件,集成简化了两个平台之间的数据流和处理过程,无需复杂编码。
下图是MQTT与KAFKA实现数据集成的最佳实践架构图

Kafka的消费与生产 需要分别创建动作,实现规则引擎的向kafka消费者发送消息。用动作执行为例子,简单描述其转发的流程:
- 消息收发:MQTT设备终端通过 MQTT 协议成功连接到 MQTT服务器,并通过 MQTT 定期发布包含状态数据的消息。当 MQTT 收到这些消息时,它启动其规则引擎内的事件匹配。
- 数据流转:在规则引擎处理数据中,MQTT 消息可以根据主题匹配规则进行处理。当消息到达并通过规则引擎时,规则引擎将评估针对该消息事先定义好的处理规则。如果任何规则指定消息载荷转换,则应用这些转换。
- 转发Kafka:规则引擎中定义的规则触发将消息转发到 Kafka 的动作执行,MQTT 主题被映射到预定义的 Kafka 主题,所有处理过的消息和数据被写入 Kafka 主题。
MQTT数据被输入到 Kafka 后,可以灵活的使用:
- 与Kafka 客户端集成,从特定主题消费实时数据流,实现定制化的业务处理。
- 通过使用 Kafka连接 组件,您可以选择各种连接器将数据输出到外部数据库中。
MQTT消息集成到kafka演示
安装KAFKA服务
参考20章节 kafka安装
MQTT消息上报集成到KAFKA
创建KAFKA生产者连接器
在创建动作之前,需要先创建kafka生产者连接器,使MQTT与Kafka建立连接
进行MQTT 可视化,点击
集成->连接器在页面右上角的 点击
创建,在连接器选择页面。现在kafka服务配置kafka的参数
broker(bootstrap.servers): 127.0.0.1:9092 键序列化 (key.serializer): org.apache.kafka.common.serialization.StringSerializer 值序列化 (value.serializer): org.apache.kafka.common.serialization.StringSerializer 写入确认方式(acks): -1 批量发送消息大小(batch.size): 16384 等待消息入批的最长时间(linger.ms): 0 消息缓冲内存大小(buffer.memory): 33554432 消息压缩类型(compression.type): none 消息重试次数(retries): 0 重试间隔时间(retry.backoff.ms): 100 (毫秒)- 主机列表,输入
IP:9092,注意这里的IP是当前服务的IP地址 - 将其他选项保留默认值,或者根据需求进行一定的修改
- 点击
测试按钮,验证kafka配置的参数是否正确 - 点击**
创建** 按钮完成连接器的创建
如下图所示,创建完成后,连接器将自动连接到kafka,下面我们使用连机器创建一条动作

- 主机列表,输入
创建MQTT数据上报KAFKA 规则
本节演示来自MQTT主题
/${productId}/${clientId}/property/post 或
/${clientId}/property/post
等以 property/post 结尾的设备上报主题,通过动作处理,发送处理数据,以生产数据到kafka的
property_post 主题,已经规则的测试
在MQTT可视化界面,点击
集成 -> 动作,点击**创建**按钮选择**
KAFKA-桥接**,点击前进,继续配置- 填写动作名称
- 选择上面创建的 kafka连接器
- 主题,我们这里填写
property_post - key ,这里不填
在MQTT可视化界面,点击
集成 -> 规则引擎,点击页面右上角创建按钮点击 **
数据输入**选择事件,这里选择 **发布消息**事件,然后在SQL编辑器中补充完整语句select * from "$events/message_publish" where topic =~ 'property/post'其中 操作符
=~表示:比较主题(topic)是否能够匹配到主题过滤器(topic filter)。只能用于主题匹配点击 动作输出,选择step2中创建的动作,点击 **
提交**按钮,完成规则引擎创建,如下图所示

创建一个KAFKA消费实例
下面是一个kafka消费者,拉取property_post主题内容
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
// 配置消费者属性
Properties props = new Properties();
// Kafka集群的地址,多个地址用逗号隔开,实际的IP地址请根据情况自行修改
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 消费者组ID,同一组内的消费者共同消费主题分区
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
// 自动提交偏移量的时间间隔,单位是毫秒,这里设置为1000毫秒(即1秒)
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// 自动提交偏移量的配置,设为true表示开启自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 键的反序列化类,这里使用字符串反序列化
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 值的反序列化类,同样使用字符串反序列化
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 创建Kafka消费者实例
KafkaConsumer<String, String> consumer = null;
try {
consumer = new KafkaConsumer<>(props);
// 订阅kafka生产者的主题
consumer.subscribe(Arrays.asList("property_post"));
// 循环拉取消息并处理
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭消费者,释放资源
if (consumer!= null) {
try {
consumer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}这里我们不使用上面的例子演示,我们使用业务系统建立的kafka消费实例来演示
/**
* 设备主动上报消息-kafka消费
* kafka主题:property_post
* kafka消费者id:device_property_group
* kafka消费者配置 containerFactory 配置的是 KafkaListenerContainerFactory 实例,
* 里面配置了 n个消费者实例 factory.setConcurrency(n); ,这里类似于启动了n个线程来消费
* 这个根据数据并发量和服务器实际的规格来做适当调整。即是生产调优
* @param data 消息字符串
*/
@KafkaListener(topics = KafkaConstant.PROPERTY_POST, groupId = KafkaConstant.DEVICE_PROPERTY_GROUP,
containerFactory = "KafkaListenerContainerFactory")
public void listen(String data) {
try {
DeviceReportBo reportBo = JSONObject.parseObject(data, DeviceReportBo.class);
log.debug("消费者实例 [{}] 设备主动上报数据: {}", consumerInstanceId, reportBo);
} catch (Exception e) {
log.error("消费者实例 [{}] 设备主动上报异常: {}", consumerInstanceId, e.getMessage());
}
}下面我们使用MQTTX 模拟客户端上报
新建一个MQTTX客户端,填写连接参数,这里新建一个客户端id:
860061061621test的客户端在发送区,填写主题
/860061061621test/property/post在内容发送区,填写 ,以
JSON格式演示{ "name": "test", "value": 666 }
如下图所示

点击发送,我们看看kafka消费端的日志打印情况
内容如下所示
{
"proto_name": "MQTT",
"ts": 1736150303008,
"clientId": "860061061621test",
"id": "1876176470964547584",
"topic": "/860061061621test/property/post",
"qos": 0,
"payload": [123,13,10,32,32,32,32,32,32,34,110,97,109,101,34,58,32,34,116,101,115,116,34,44,13,10,32,32,32,32,32,32,34,118,97,108,117,101,34,58,32,54,54,54,13,10,125,13,10 ]
"peerhost": "127.0.0.1",
"clientPort": 52775,
"node": "127.0.0.1",
"publish_received_at": 1736150303009,
"datatype": "json"
}控制台打印情况如下

我们可以在可视化页面 点击 **对应的规则详情 -> 概览**查看数据桥接情况


至此MQTT数据集成到kafka简单演示完成
客户端状态上报集成KAFKA演示
创建客户端状态KAFKA 规则
在MQTT可视化界面,点击
集成 -> 动作,点击**创建**按钮选择**
KAFKA-桥接**,点击前进,继续配置- 填写动作名称
- 选择上面创建的 kafka连接器
- 主题,我们这里填写
property_post_status,区分上报数据的kafka主题 - key ,这里不填
新建规则,数据输入选择事件
连接建立,连接断开SQL编辑器自动生成SQL规则
select * from "$events/client_connected", "$events/client_disconnected"点击
动作输出选择step2创建的动作,点击提交,创建完成。使用上面MQTTX创建的模拟设备测试,上线和离线
业务平台kafka消费实例监听
@KafkaListener(topics = KafkaConstant.DEVICE_STATUS_FB, groupId = KafkaConstant.DEVICE_MESSAGE_FB_GROUP,
containerFactory = "statusListenerContainerFactory", batch = "true")
public void deviceStatusListen(String data) {
System.out.println(JSONObject.toJSONString(data));
log.debug("消费者实例 [{}] 设备上下线消息: {}", consumerInstanceId, data);
}设备上线接收到的数据
{
"proto_name": "MQTT",
"node": "127.0.0.1",
"clientId": "860061061621test",
"ip_address": "127.0.0.1",
"port": 54703,
"ts": 1736152769586,
"keepalive": 10,
"clean_start": true,
"status": true
}设备离线接收到的数据
{
"proto_name": "MQTT",
"node": "127.0.0.1",
"clientId": "860061061621test",
"ip_address": "127.0.0.1",
"port": 0,
"ts": 1736152772566,
"offline_reason": "kicked",
"status": false
}我们可以在可视化页面 点击 **对应的规则详情 -> 概览**查看数据桥接情况

指令下发集成到KAFKA推送MQTT
创建 Kafka 消费者 转换器
- 进入 MQTT Dashboard,并点击 集成 -> 转换器,点击页面右上角的 创建
- 在创建转换器页面中,选择 Kafka 服务,然后点击 前进。
- 配置kafka参数
- 转换器名称
- broker,填写kafka服务器IP地址 如
localhost:9092 - 消费实例数,即是kafka消费者的实例,默认是10个消费实例
- 其他参数尽量不更改
- 测试连接,参数是否正确。点击
创建。
创建设备下行
进入 MQTT Dashboard,并点击 集成 -> 转换配置,点击页面右上角的 创建
进行转换参数配置
- 名称,转换配置名称
- 主题, 这里是MQTT主题,支持字段映射 ,我们这里填写
/${productId}/${clientId}/function/get$是占位符 - QOS,下发指令的消息指令
- 是否保留消息,MQTT的保留消息功能
- kafka主题是根据MQTT主题自动生成,按照step2,这里对应的kafka主题是
function_get
进入 MQTT Dashboard,并点击 集成 -> 设备下行,点击页面右上角的 创建
选择
KAFKA-服务,配置参数转换器,选择上面创建的转换器
转换配置,选择step2中创建的转换配置
我们在业务系统创建一个kafka的生产者,暴露一个API接口来测试设备下行
kafka生产者演示代码
public Promise<Void> transmit(ProducerVO vo) {
final Promise<Void> promise = coreGroup.next().newPromise();
try {
ProducerRecord<String,Object> record;
if (Objects.isNull(vo.getKey())) {
producerRecord = new ProducerRecord<>(vo.getTopic(), vo.getMessage());
} else {
producerRecord = new ProducerRecord<>(vo.getTopic(), vo.getKey(), vo.getMessage());
}
producer.send(producerRecord, (data, e) -> {
if (e!= null) {
promise.setFailure(e);
} else {
promise.setSuccess(null);
}
});
} catch (Exception e) {
promise.setFailure(e);
}
return promise;
}暴露的演示接口
@GetMapping("/command")
public String sendCommand(String type) {
String command = "{\"test\": \"" + "666" + "\"}";
ProducerVO producer = new ProducerVO();
JSONObject jb = new JSONObject();
jb.put("clientId","860061061621test");
jb.put("productId","1");
producer.setKey(JSON.toJSONString(jsonObject));
producer.setTopic("function_get_fb");
byte[] bytes = command.getBytes();
producer.setData(bytes);
producerTemplate.transmit(producer);
return "success";
}新建一个MQTTX模拟客户端,订阅主题 /1/860061061621test/function/get

我们执行接口,演示设备下发,下图我们看到点击接口发送,可以看到mqtt客户端接收到下发的指令
/860061061621test/property/post
{
"name": "test",
"value": 666
}
我们可以看看设备下行的统计,看看是否成功

至此,我们演示了从mqtt将数据集成到kafka,然后业务系统用kafka拉取数据进行处理,同事也演示了业务系统下发指令,通过kafka进行映射转发,给到mqtt下发给对应的设备。
