物联网平台 Thingsboard rule engine 规则引擎 - 初始化过程
Thingsboard 的 Rule engine 是针对物联网场景下的定制实现的规则引擎,主要通过节点的定义,实现物联网各种场景的需求,可以对等强大的功能。Thingsboard规则引擎的高性能和高效是我们值得学习的编程框架,主要采用了Actor模型,以及大量的使用了消息队列异步编程。实现的高可靠规则引擎。
Thingsboard 规则引擎介绍
Thingsboard 的 Rule engine 是针对物联网场景下的定制实现的规则引擎,主要通过节点的定义,实现物联网各种场景的需求,可以对
消息进行转换,消息类型进行路由,消息内容进行解析,消息数据进行转发,并且可以实现对设备下发指令动作片
等强大的功能。Thingsboard规则引擎的高性能和高效是我们值得学习的编程框架,主要采用了Actor
模型,以及大量的使用了消息队列异步编程。实现的高可靠规则引擎。本篇讲讲规则引擎的初始化过程,让大家入门了解规则引擎的设计。
表结构说明
rule_chain 表结构
rule_node表结构
主要保存规则链里面的每一个节点信息,核心的配置项信息,以json存储在configuration字段里面,
relation 表 保存rule node 的关联关系
整体初始化过程
整个初始化过程 大致如下:
AppActor初始化 -> TennantActor初始化 -> RuleChainActor初始化 - > RuleNodeActor初始化 -> RuleNode的关系初始化
程序初始化入口
第一步:TenantActor 初始化租户的时候,调用了
org.thingsboard.server.actors.ruleChain.RuleChainManagerActor#initRuleChains
初始化过程
第二步:
org.thingsboard.server.actors.ruleChain.RuleChainManagerActor#initRuleChains
方法 查询数据库获取租户下的规则引擎配置,也就是规则链的配置信息
protected void initRuleChains() {
for (RuleChain ruleChain : new PageDataIterable<>(link -> ruleChainService.findTenantRuleChainsByType(tenantId, RuleChainType.CORE, link), ContextAwareActor.ENTITY_PACK_LIMIT)) {
RuleChainId ruleChainId = ruleChain.getId();
log.debug("[{}|{}] Creating rule chain actor", ruleChainId.getEntityType(), ruleChain.getId());
TbActorRef actorRef = getOrCreateActor(ruleChainId, id -> ruleChain);
visit(ruleChain, actorRef);
log.debug("[{}|{}] Rule Chain actor created.", ruleChainId.getEntityType(), ruleChainId.getId());
}
}
第三步:
org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor#start
初始化 规则链的节点 ruleNodeActor,调用了initRoutes
@Override
public void start(TbActorCtx context) {
if (!started) {
RuleChain ruleChain = service.findRuleChainById(tenantId, entityId);
if (ruleChain != null && RuleChainType.CORE.equals(ruleChain.getType())) {
List<RuleNode> ruleNodeList = service.getRuleChainNodes(tenantId, entityId);
log.trace("[{}][{}] Starting rule chain with {} nodes", tenantId, entityId, ruleNodeList.size());
// Creating and starting the actors;
for (RuleNode ruleNode : ruleNodeList) {
log.trace("[{}][{}] Creating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);
TbActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
}
initRoutes(ruleChain, ruleNodeList);
started = true;
}
} else {
onUpdate(context);
}
}
第四步:初始化 rule node之间的关联关系,关系保存在
org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor#nodeRoutes
中,
private void initRoutes(RuleChain ruleChain, List<RuleNode> ruleNodeList) {
nodeRoutes.clear();
// Populating the routes map;
for (RuleNode ruleNode : ruleNodeList) {
List<EntityRelation> relations = service.getRuleNodeRelations(TenantId.SYS_TENANT_ID, ruleNode.getId());
log.trace("[{}][{}][{}] Processing rule node relations [{}]", tenantId, entityId, ruleNode.getId(), relations.size());
if (relations.size() == 0) {
nodeRoutes.put(ruleNode.getId(), Collections.emptyList());
} else {
for (EntityRelation relation : relations) {
log.trace("[{}][{}][{}] Processing rule node relation [{}]", tenantId, entityId, ruleNode.getId(), relation.getTo());
if (relation.getTo().getEntityType() == EntityType.RULE_NODE) {
RuleNodeCtx ruleNodeCtx = nodeActors.get(new RuleNodeId(relation.getTo().getId()));
if (ruleNodeCtx == null) {
throw new IllegalArgumentException("Rule Node [" + relation.getFrom() + "] has invalid relation to Rule node [" + relation.getTo() + "]");
}
}
nodeRoutes.computeIfAbsent(ruleNode.getId(), k -> new ArrayList<>())
.add(new RuleNodeRelation(ruleNode.getId(), relation.getTo(), relation.getType()));
}
}
}
firstId = ruleChain.getFirstRuleNodeId();
firstNode = nodeActors.get(firstId);
state = ComponentLifecycleState.ACTIVE;
}
总结
至此,规则引擎的核心Actor 初始化完成,包括初始化了所有的RuleChainActor,RuleNodeActor以及 RuleNode之间的关联关系,收到设备上报消息,消息根据设备对应的
设备配置
设备配置的规则链
找到对应的规则链也就是RuleChainActor进行消息投递。最终处理过程AppActor -> TenantActor -> RuleChainActor -> RuleNodeActor
,下一篇文章我讲解这个消息的处理过程。
更多推荐
所有评论(0)