Kafka Integration
About 3 min
Overview
This page is the English counterpart of the Chinese Kafka Integration guide. It summarizes cluster edition operation, deployment, integration, and high-availability maintenance while keeping the document path aligned with the Chinese documentation structure.
How To Use This Page
- Follow the same operation order as the Chinese source page.
- Keep configuration values, topic names, ports, commands, and file paths consistent with your deployment environment.
- Screenshots and diagrams reuse the Chinese documentation image directory so assets are maintained in one place.
- For production changes, validate the operation in a test environment before applying it online.
Reference Commands And Configuration
The following snippets are preserved from the source guide because commands, configuration keys, and protocol examples should remain exact.
broker(bootstrap.servers): 127.0.0.1:9092
键序列化 (key.serializer): org.apache.kafka.common.serialization.StringSerializer
值序列化 (value.serializer): org.apache.kafka.common.serialization.StringSerializer
写入确认方式(acks): -1
批量发送消息大小(batch.size): 16384
等待消息入批的最长时间(linger.ms): 0
消息缓冲内存大小(buffer.memory): 33554432
消息压缩类型(compression.type): none
消息重试次数(retries): 0
重试间隔时间(retry.backoff.ms): 100 (毫秒)
```
```sql
select
*
from
"$events/message_publish"
where topic =~ 'property/post'import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
// 配置消费者属性
Properties props = new Properties();
// Kafka集群的地址,多个地址用逗号隔开,实际的IP地址请根据情况自行修改
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 消费者组ID,同一组内的消费者共同消费主题分区
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
// 自动提交偏移量的时间间隔,单位是毫秒,这里设置为1000毫秒(即1秒)
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// 自动提交偏移量的配置,设为true表示开启自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 键的反序列化类,这里使用字符串反序列化
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 值的反序列化类,同样使用字符串反序列化
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 创建Kafka消费者实例
KafkaConsumer<String, String> consumer = null;
try {
consumer = new KafkaConsumer<>(props);
// 订阅kafka生产者的主题
consumer.subscribe(Arrays.asList("property_post"));
// 循环拉取消息并处理
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭消费者,释放资源
if (consumer!= null) {
try {
consumer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}/**
* 设备主动上报消息-kafka消费
* kafka主题:property_post
* kafka消费者id:device_property_group
* kafka消费者配置 containerFactory 配置的是 KafkaListenerContainerFactory 实例,
* 里面配置了 n个消费者实例 factory.setConcurrency(n); ,这里类似于启动了n个线程来消费
* 这个根据数据并发量和服务器实际的规格来做适当调整。即是生产调优
* @param data 消息字符串
*/
@KafkaListener(topics = KafkaConstant.PROPERTY_POST, groupId = KafkaConstant.DEVICE_PROPERTY_GROUP,
containerFactory = "KafkaListenerContainerFactory")
public void listen(String data) {
try {
DeviceReportBo reportBo = JSONObject.parseObject(data, DeviceReportBo.class);
log.debug("消费者实例 [{}] 设备主动上报数据: {}", consumerInstanceId, reportBo);
} catch (Exception e) {
log.error("消费者实例 [{}] 设备主动上报异常: {}", consumerInstanceId, e.getMessage());
}
} {
"name": "test",
"value": 666
}Shared Screenshots
The English documentation reuses the screenshots from the Chinese source page. UI labels in screenshots may remain Chinese.











