对接fastbee
小于 1 分钟
对接fastbee
yml文件配置
配置 Kafka 的服务器地址:将
bootstrap-servers: localhost:9092
中的 localhost 更改为您的 Kafka 服务器 IP 地址(端口请按实际部署调整)。
# Kafka client settings. NOTE(review): in a Spring Boot application.yml this
# section is expected to sit under the top-level `spring:` key - confirm
# against the full configuration file.
kafka:
  bootstrap-servers: localhost:9092 # Kafka broker (cluster) address; defaults to localhost:9092
  template:
    default-topic: property_post # default topic that messages are sent to (used by KafkaTemplate.sendDefault)
  listener:
    type: batch # listener type is set to batch
kafka消费者
/**
 * Kafka consumer entry point for the topic {@code KafkaConstant.FMQ_PROPERTY_POST},
 * registered in consumer group {@code KafkaConstant.FMQ_PROPERTY_POST_GROUP} and
 * wired to the {@code propertyPostFactory} container factory.
 *
 * <p>NOTE(review): the listener is declared with {@code batch = "true"} but accepts a
 * single {@code String}; confirm how the container factory delivers a batch to this
 * signature.
 *
 * @param data the payload handed over by the listener container
 */
@KafkaListener(
        topics = KafkaConstant.FMQ_PROPERTY_POST,
        groupId = KafkaConstant.FMQ_PROPERTY_POST_GROUP,
        containerFactory = "propertyPostFactory",
        batch = "true")
public void propertyPostListen(String data) {
    try {
        // Message handling logic is omitted in this guide.
    } catch (Exception ignored) {
        // Any exception raised by the handling logic is deliberately swallowed here,
        // so nothing propagates back to the listener container.
    }
}
kafka生产者
/**
 * Sends the payload described by {@code vo} through the Kafka producer and reports
 * the outcome via a promise.
 *
 * <p>If {@code vo.getKey()} is {@code null} the record is created without a key;
 * otherwise the key is attached to the record.
 *
 * @param vo supplies the topic, the optional key and the data to send
 * @return a promise that succeeds when the send callback reports no error, and fails
 *         with the callback's exception or with any exception thrown while the record
 *         is being built or submitted
 */
public Promise<Void> transmit(ProducerVO vo) {
    final Promise<Void> promise = defaultEventExecutorGroup.next().newPromise();
    try {
        final String key = vo.getKey();
        final ProducerRecord<String, Object> record;
        if (key == null) {
            record = new ProducerRecord<>(vo.getTopic(), vo.getData());
        } else {
            record = new ProducerRecord<>(vo.getTopic(), key, vo.getData());
        }
        producer.send(record, (metadata, sendError) -> {
            if (sendError != null) {
                promise.setFailure(sendError);
            } else {
                promise.setSuccess(null);
            }
        });
    } catch (Exception e) {
        // Previously this exception was swallowed, leaving the promise forever
        // incomplete. tryFailure is used so an already-completed promise does not
        // raise a second error from here.
        promise.tryFailure(e);
    }
    return promise;
}
运行测试