Rule engine
The FastBee rule engine contains scene automation and rule scripts. It is built around the scene table, script table, scene-script table, and scene-device table.
- The scene table stores basic scene information and rule-chain expressions.
- The script table stores script metadata and script content.
- The scene-script table maps triggers and actions to scripts.
- The scene-device table stores all devices and products associated with a scene.
Scene automation
1. Scene rule chain
// Rule data examples
IF(AND(T1,T2,T3),THEN(A1,A2,A3))
IF(OR(T1,T2,T3),WHEN(A1,A2,A3))
// Notes
// AND and OR must contain at least two components.
// NOT can contain only one trigger.
// THEN runs actions serially.
// WHEN runs actions in parallel.
2. Scene script format
String json = {
    "cond": 3,
    "delay": 3,
    "deviceNums": "D1ELV3A5TOJS",
    "id": "temperature",
    "operator": "between",
    "productId": 41,
    "purpose": 2,
    "sceneId": 63,
    "scriptId": "T1754931118680969216",
    "silent": 1,
    "source": 1,
    "type": 1,
    "value": "10-50"
};
sceneContext.process(json);
Key fields:
| Field | Description |
|---|---|
| source | Trigger source. 1 = device, 3 = product, 4 = alarm. |
| type | Model type. 1 = property, 2 = service, 3 = event, 4 = device upgrade, 5 = device online, 6 = device offline. |
| delay | Delay in seconds. This is a non-persistent scheduled task. |
| silent | Silence period in minutes. During the silence period, only the first action is reported. Alarm silence data is stored in Redis. |
| purpose | Script purpose. 1 = data flow, 2 = trigger, 3 = action. |
| cond | Trigger condition. 1 = any condition, 2 = all conditions, 3 = not matched. |
| operator | Operator, such as greater than, equal to, less than, contains, not contains, or between. |
| scriptId | Script ID. IDs starting with D are data-flow scripts, T are triggers, and A are actions. |
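To make the operator and value fields concrete, the following is a minimal Java sketch of how a trigger condition might be evaluated. The class name TriggerConditionSketch, the method names, the operator spellings, and the inclusive bounds for between are assumptions made for illustration, not the platform's implementation. Only the valueA-valueB encoding of between (for example "10-50") comes from the fields above.

```java
import java.math.BigDecimal;

/** Hypothetical helper for illustration; not part of FastBee. */
class TriggerConditionSketch {

    /**
     * Checks a reported value against a configured operator and value.
     * The numeric operators expect decimal strings; operator spellings are assumed.
     */
    static boolean matches(String operator, String reported, String configured) {
        switch (operator) {
            case "=":
                return compare(reported, configured) == 0;
            case ">":
                return compare(reported, configured) > 0;
            case "<":
                return compare(reported, configured) < 0;
            case "contains":
                return reported.contains(configured);
            case "notContains":
                return !reported.contains(configured);
            case "between":
                return between(reported, configured);
            default:
                throw new IllegalArgumentException("Unsupported operator: " + operator);
        }
    }

    /** Evaluates a range encoded as "valueA-valueB"; both bounds are treated as inclusive (assumption). */
    private static boolean between(String reported, String range) {
        // Search from index 1 so that a leading minus sign on valueA is not taken as the separator.
        int dash = range.indexOf('-', 1);
        if (dash < 0) {
            throw new IllegalArgumentException("Expected valueA-valueB but got: " + range);
        }
        String lower = range.substring(0, dash);
        String upper = range.substring(dash + 1);
        return compare(reported, lower) >= 0 && compare(reported, upper) <= 0;
    }

    /** Compares two decimal strings numerically. */
    private static int compare(String a, String b) {
        return new BigDecimal(a.trim()).compareTo(new BigDecimal(b.trim()));
    }
}
```

With the sample script above (operator "between", value "10-50"), a reported temperature of 25 would match and 51 would not.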
3. Scene script object
Scene script objects are divided into trigger objects and action objects. Action objects require fewer fields than trigger objects. In an action object, the selected thing model does not include read-only or telemetry-only fields.
// Fields required by a trigger
productId: 0,
productName: '',
deviceCount: 0,
deviceNums: [],
source: 1, // Trigger source: 1=device, 2=timer, 3=product
type: 1, // 1=property, 2=service, 3=event, 4=device upgrade, 5=device online, 6=device offline
parentId: '', // Parent thing-model ID
parentName: '',
parentModel: null, // Parent thing model, used for normal fields other than object and object-array types
model: null, // Thing model
operator: '=',
id: '',
name: '',
value: '', // For the between operator, value = valueA-valueB
valueA: '',
valueB: '',
arrayIndex: '', // Array index
arrayIndexName: '',
isAdvance: 0, // Custom CRON flag
cronExpression: '', // CRON expression
timerTimeValue: '', // Time value
timerWeekValue: [1, 2, 3, 4, 5, 6, 7], // Weekdays
// Fields required by an action
productId: 0,
productName: '',
deviceCount: 0,
deviceNums: [],
source: 4, // 1=device, 3=product, 4=alarm
type: 2,
parentId: '', // Parent thing-model ID
parentName: '',
parentModel: null,
model: null, // Thing model
id: '',
name: '',
value: '',
arrayIndex: '', // Array index
arrayIndexName: '',
Rule scripts
1. Script language
The platform currently uses Groovy as the script language. Groovy syntax is close to Java, with the following differences:
| Groovy | Java |
|---|---|
| Compiles to JVM bytecode and is compatible with the Java platform. | Developed on the JDK and runs on the JVM. |
| Can be used as both a programming language and a scripting language. | Mainly used as a general-purpose, object-oriented programming language. |
| The default access modifier is public. | The default access modifier is package-private. |
| Automatically generates getters and setters for class members. | Getters and setters must be provided explicitly when needed, especially for Java Beans conventions. |
| Semicolons are optional. | Semicolons are required. |
| Common packages are imported by default. | Only java.lang.* is imported by default. |
| Everything is treated as an object; there is no automatic boxing or unboxing concept. | Primitive types and wrapper classes support implicit or explicit boxing and unboxing. |
| A main method is not required to run a script. | A main method is required for a standalone Java program. |
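For readers coming from Java, the short sketch below shows the Java side of several rows in the table, with comments noting what Groovy would allow you to omit. DeviceReading and DeviceReadingDemo are hypothetical names used only for this comparison.

```java
/** Hypothetical bean used only to illustrate the comparison table. */
class DeviceReading {
    // Java: these private fields need hand-written accessors.
    // Groovy: a field declared without an access modifier becomes a property
    // with a generated getter and setter.
    private final String serialNumber;
    private double temperature;

    DeviceReading(String serialNumber, double temperature) {
        this.serialNumber = serialNumber;
        this.temperature = temperature;
    }

    public String getSerialNumber() {
        return serialNumber;
    }

    public double getTemperature() {
        return temperature;
    }

    public void setTemperature(double temperature) {
        this.temperature = temperature; // Java: the semicolon is required.
    }
}

class DeviceReadingDemo {
    // Java: a standalone program starts from main.
    // Groovy: a script can begin with top-level statements instead.
    public static void main(String[] args) {
        DeviceReading reading = new DeviceReading("D1ELV3A5TOJS", 21.5);
        reading.setTemperature(22.0);
        Double boxed = reading.getTemperature(); // Java: explicit primitive type, boxed on assignment.
        System.out.println(reading.getSerialNumber() + " " + boxed);
    }
}
```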
2. Message forwarding
This section introduces the message forwarding flow:
- Define a script context service that exposes backend methods to scripts.
- Execute scripts through the rule-engine process class.
- Configure scripts in the frontend.
- Use scripts to adapt upstream and downstream messages.
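The steps above can be sketched in plain Java without LiteFlow or Groovy. MsgContextSketch, ForwardScript, and ForwardPipeline are hypothetical names, and the topic strings in the usage note are invented. On the real platform, scripts are written in Groovy, stored in the script table, and executed through LiteFlow, so treat this only as a model of the data flow.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the platform's message context. */
class MsgContextSketch {
    String topic;
    String payload;
    final String serialNumber;

    MsgContextSketch(String topic, String payload, String serialNumber) {
        this.topic = topic;
        this.payload = payload;
        this.serialNumber = serialNumber;
    }
}

/** A forwarding script is modeled as a function that may rewrite the context in place. */
interface ForwardScript {
    void process(MsgContextSketch context);
}

/** Runs the registered scripts one after another, mirroring a serial THEN(...) chain. */
class ForwardPipeline {
    private final List<ForwardScript> scripts = new ArrayList<>();

    ForwardPipeline register(ForwardScript script) {
        scripts.add(script);
        return this;
    }

    MsgContextSketch run(MsgContextSketch context) {
        for (ForwardScript script : scripts) {
            script.process(context);
        }
        return context;
    }
}
```

A caller might register one script that rewrites the topic and another that wraps the payload, for example `new ForwardPipeline().register(c -> c.topic = "/" + c.serialNumber + "/report").run(context)`, and then continue processing with the adapted context.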
2.1 Script context service
@ScriptBean("msgContextService") exposes the class to the script context. Scripts can then call its public methods, for example msgContextService.process(serialNumber).
/**
 * Rule-engine context service.
 */
@Component
@Slf4j
@ScriptBean("msgContextService")
public class MsgContextService {
    private final RedisCache redisCache;

    public MsgContextService(RedisCache redisCache) {
        this.redisCache = redisCache;
    }

    // Public so that scripts can invoke it through the script context.
    public void process(String serialNumber) {
        // Execute custom business logic.
    }
}
2.2 Rule-engine executor
The executor obtains the product ID and protocol code by device serial number, caches the result in Redis, then queries the script table by product ID and event type. If matching scripts exist, it executes them through LiteFlow.
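Reduced to plain Java, the lookup flow described above might look like the sketch below. It is a simplified model under stated assumptions: ExecutorFlowSketch is a hypothetical class, a ConcurrentHashMap stands in for Redis, the two lookup functions stand in for the product and script services, and filtering by event type and the protocol code are left out.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical model of the executor's lookup steps; not the platform's class. */
class ExecutorFlowSketch {
    // In-memory map standing in for the Redis cache (assumption).
    private final Map<String, Long> productIdCache = new ConcurrentHashMap<>();
    private final Function<String, Long> productLookup;       // serial number -> product ID
    private final Function<Long, List<String>> scriptLookup;  // product ID -> script IDs

    ExecutorFlowSketch(Function<String, Long> productLookup,
                       Function<Long, List<String>> scriptLookup) {
        this.productLookup = productLookup;
        this.scriptLookup = scriptLookup;
    }

    /** Returns the serial chain expression, or empty when there is nothing to run. */
    Optional<String> buildChainEl(String serialNumber) {
        // Resolve the product once per device; later calls are served from the cache.
        Long productId = productIdCache.computeIfAbsent(serialNumber, productLookup);
        if (productId == null) {
            return Optional.empty(); // unknown device
        }
        List<String> scriptIds = scriptLookup.apply(productId);
        if (scriptIds == null || scriptIds.isEmpty()) {
            return Optional.empty(); // no script bound to this product
        }
        // THEN(...) runs the listed scripts serially.
        return Optional.of("THEN(" + String.join(",", scriptIds) + ")");
    }
}
```

The platform's actual executor class and methods follow.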
/**
 * Rule-engine executor.
 */
@Component
@Slf4j
public class RuleProcess {
    @Resource
    private FlowExecutor flowExecutor;
    @Resource
    private IScriptService scriptService;
    @Resource
    private RedisCache redisCache;
    @Resource
    private IProductService productService;
}
Script execution method:
/**
 * Process rule scripts.
 *
 * @param serialNumber device serial number
 * @param event        1=device report, 2=platform delivery, 3=device online, 4=device offline
 * @param topic        MQTT topic
 * @param payload      message payload
 * @return processed message context, or null if the device is unknown or no script matches
 */
public MsgContext processRuleScript(String serialNumber, int event, String topic, String payload) {
    ProductCode productCode = getDeviceDetail(serialNumber);
    if (Objects.isNull(productCode)) {
        return null;
    }

    // Look up the data-flow scripts (purpose 1) bound to this product and event.
    ScriptCondition scriptCondition = new ScriptCondition();
    scriptCondition.setProductId(productCode.getProductId());
    scriptCondition.setScriptEvent(event);
    scriptCondition.setScriptPurpose(1);
    String[] scriptIds = scriptService.selectRuleScriptIdArray(scriptCondition);
    if (Objects.isNull(scriptIds) || scriptIds.length == 0) {
        return null;
    }

    MsgContext context = new MsgContext(
            topic,
            payload,
            serialNumber,
            productCode.getProductId(),
            productCode.getProtocolCode()
    );

    // Run the matching scripts serially as one LiteFlow chain.
    String el = String.join(",", scriptIds);
    LiteFlowChainELBuilder.createChain()
            .setChainName("dataChain")
            .setEL("THEN(" + el + ")")
            .build();
    LiteflowResponse response = flowExecutor.execute2Resp("dataChain", null, context);
    if (!response.isSuccess()) {
        log.error("Rule script execution error: {}", response.getMessage());
    }
    return context;
}
Get product and protocol information:
/**
 * Query product ID and protocol code by serial number, then cache the result in Redis.
 */
public ProductCode getDeviceDetail(String serialNumber) {
    String cacheKey = RedisKeyBuilder.buildDeviceMsgCacheKey(serialNumber);
    // Serve from the cache when this device has been resolved before.
    if (redisCache.containsKey(cacheKey)) {
        Object cacheObject = redisCache.getCacheObject(cacheKey);
        return JSON.parseObject(cacheObject.toString(), ProductCode.class);
    }
    // Cache miss: query the product service and store the result as JSON.
    ProductCode productCode = productService.getProtocolBySerialNumber(serialNumber);
    String jsonString = JSON.toJSONString(productCode);
    redisCache.setCacheObject(cacheKey, jsonString);
    return productCode;
}
2.3 Frontend script configuration
The frontend script list is shown below:

2.4 Script usage example
Device reporting means the device uploads data to the cloud. The topic or payload defined by the device may be different from the platform convention. Message forwarding scripts can normalize the topic and payload before the platform continues processing the message.
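As an illustration of what such a script does, the Java sketch below rewrites a vendor-specific topic and a key=value payload into a normalized topic and a JSON payload. The topic layouts, the payload format, and the class name ReportNormalizerSketch are all invented for this example. The platform's actual topic convention is not shown in this section, and a real script would be written in Groovy against the message context.

```java
import java.util.StringJoiner;

/** Hypothetical normalizer; topic and payload formats are assumptions. */
class ReportNormalizerSketch {

    /** Maps an assumed vendor topic "vendor/<serial>/up" to an assumed "platform/<serial>/report". */
    static String normalizeTopic(String topic) {
        String[] parts = topic.split("/");
        if (parts.length == 3 && "vendor".equals(parts[0]) && "up".equals(parts[2])) {
            return "platform/" + parts[1] + "/report";
        }
        return topic; // leave unrecognized topics untouched
    }

    /**
     * Converts "key=value;key=value" into a flat JSON object with string values.
     * Keys and values are assumed to contain no quotes or backslashes, so no escaping is done.
     */
    static String normalizePayload(String payload) {
        StringJoiner json = new StringJoiner(",", "{", "}");
        for (String pair : payload.split(";")) {
            String[] kv = pair.split("=", 2);
            if (kv.length == 2) {
                json.add("\"" + kv[0].trim() + "\":\"" + kv[1].trim() + "\"");
            }
        }
        return json.toString();
    }
}
```

Pairs without an equals sign are skipped, so an empty payload produces an empty JSON object rather than an error.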
When EMQX is used as the MQTT server, the rule engine is usually called from the bridge callback method in MqttService.
/**
 * Message callback.
 *
 * @param topic       MQTT topic
 * @param mqttMessage MQTT message body
 */
public void subscribeCallback(String topic, MqttMessage mqttMessage) {
    String message = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
    // Other processing is omitted.

    // Treat the first topic segment longer than 9 characters as the device serial number.
    String clientId = Arrays.stream(topic.split("/"))
            .filter(segment -> segment.length() > 9)
            .findFirst()
            .orElse(null);
    if (clientId == null) {
        // No serial number in the topic, so there is nothing to run scripts against.
        return;
    }

    MsgContext context = ruleProcess.processRuleScript(clientId, 1, topic, message);
    if (Objects.nonNull(context)) {
        // Continue with the topic and payload adapted by the scripts.
        topic = context.getTopic();
        message = context.getPayload();
    }
}
When Netty MQTT is used as the MQTT server, call the rule engine from MqttPublish#sendToMQ.
/**
 * Push a message to MQ.
 *
 * @param message  message to push
 * @param clientId client ID, passed to the rule engine as the device serial number
 */
@SneakyThrows
public void sendToMQ(MqttPublishMessage message, String clientId) {
    String topicName = message.variableHeader().topicName();
    byte[] source = ByteBufUtil.getBytes(message.content());
    // Other processing is omitted.
    MsgContext context = ruleProcess.processRuleScript(
            clientId,
            1,
            topicName,
            new String(source, StandardCharsets.UTF_8)
    );
    if (Objects.nonNull(context)) {
        reportBo.setTopicName(context.getTopic());
        reportBo.setData(context.getPayload().getBytes(StandardCharsets.UTF_8));
    }
    // Other processing is omitted.
}
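The EMQX callback derives the client ID from the first topic segment longer than nine characters. The sketch below isolates that rule in a hypothetical helper, TopicClientIdSketch, that returns an Optional so that a topic without such a segment can be skipped by the caller. The sample topics in the usage note are invented.

```java
import java.util.Arrays;
import java.util.Optional;

/** Hypothetical helper; mirrors the length-based rule used in the callback. */
class TopicClientIdSketch {

    /** Returns the first topic segment longer than 9 characters, if any. */
    static Optional<String> extractClientId(String topic) {
        if (topic == null) {
            return Optional.empty();
        }
        return Arrays.stream(topic.split("/"))
                .filter(segment -> segment.length() > 9)
                .findFirst();
    }
}
```

For an invented topic such as "/41/D1ELV3A5TOJS/property/post" the helper yields D1ELV3A5TOJS, and for "/a/b" it yields an empty Optional. The length rule also matches any other segment longer than nine characters, so it is only as reliable as the topic layout.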