Thingsboard 规则引擎介绍

Thingsboard 的 Rule engine 是针对物联网场景下的定制实现的规则引擎,主要通过节点的定义,实现物联网各种场景的需求,可以对消息进行转换,消息类型进行路由,消息内容进行解析,消息数据进行转发,并且可以实现对设备下发指令动作片等强大的功能。Thingsboard规则引擎的高性能和高效是我们值得学习的编程框架,主要采用了Actor模型,以及大量的使用了消息队列异步编程。实现的高可靠规则引擎。本篇讲讲规则引擎的初始化过程,让大家入门了解规则引擎的设计。

表结构说明

rule_chain 表结构

在这里插入图片描述

rule_node表结构 主要保存规则链里面的每一个节点信息,核心的配置项信息,以json存储在configuration字段里面,

在这里插入图片描述

relation 表 保存rule node 的关联关系

在这里插入图片描述

整体初始化过程

整个初始化过程 大致如下:AppActor初始化 -> TennantActor初始化 -> RuleChainActor初始化 - > RuleNodeActor初始化 -> RuleNode的关系初始化

程序初始化入口

第一步:TenantActor 初始化租户的时候,调用了org.thingsboard.server.actors.ruleChain.RuleChainManagerActor#initRuleChains

TenantActor

初始化过程

第二步: org.thingsboard.server.actors.ruleChain.RuleChainManagerActor#initRuleChains 方法 查询数据库获取租户下的规则引擎配置,也就是规则链的配置信息


    protected void initRuleChains() {
        for (RuleChain ruleChain : new PageDataIterable<>(link -> ruleChainService.findTenantRuleChainsByType(tenantId, RuleChainType.CORE, link), ContextAwareActor.ENTITY_PACK_LIMIT)) {
            RuleChainId ruleChainId = ruleChain.getId();
            log.debug("[{}|{}] Creating rule chain actor", ruleChainId.getEntityType(), ruleChain.getId());
            TbActorRef actorRef = getOrCreateActor(ruleChainId, id -> ruleChain);
            visit(ruleChain, actorRef);
            log.debug("[{}|{}] Rule Chain actor created.", ruleChainId.getEntityType(), ruleChainId.getId());
        }
    }

第三步:org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor#start 初始化 规则链的节点 ruleNodeActor,调用了initRoutes

@Override
public void start(TbActorCtx context) {
        if (!started) {
            RuleChain ruleChain = service.findRuleChainById(tenantId, entityId);
            if (ruleChain != null && RuleChainType.CORE.equals(ruleChain.getType())) {
                List<RuleNode> ruleNodeList = service.getRuleChainNodes(tenantId, entityId);
                log.trace("[{}][{}] Starting rule chain with {} nodes", tenantId, entityId, ruleNodeList.size());
                // Creating and starting the actors;
                for (RuleNode ruleNode : ruleNodeList) {
                    log.trace("[{}][{}] Creating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);
                    TbActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
                    nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
                }
                initRoutes(ruleChain, ruleNodeList);
                started = true;
            }
        } else {
            onUpdate(context);
        }
}

第四步:初始化 rule node之间的关联关系,关系保存在org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor#nodeRoutes 中,

private void initRoutes(RuleChain ruleChain, List<RuleNode> ruleNodeList) {
        nodeRoutes.clear();
        // Populating the routes map;
        for (RuleNode ruleNode : ruleNodeList) {
            List<EntityRelation> relations = service.getRuleNodeRelations(TenantId.SYS_TENANT_ID, ruleNode.getId());
            log.trace("[{}][{}][{}] Processing rule node relations [{}]", tenantId, entityId, ruleNode.getId(), relations.size());
            if (relations.size() == 0) {
                nodeRoutes.put(ruleNode.getId(), Collections.emptyList());
            } else {
                for (EntityRelation relation : relations) {
                    log.trace("[{}][{}][{}] Processing rule node relation [{}]", tenantId, entityId, ruleNode.getId(), relation.getTo());
                    if (relation.getTo().getEntityType() == EntityType.RULE_NODE) {
                        RuleNodeCtx ruleNodeCtx = nodeActors.get(new RuleNodeId(relation.getTo().getId()));
                        if (ruleNodeCtx == null) {
                            throw new IllegalArgumentException("Rule Node [" + relation.getFrom() + "] has invalid relation to Rule node [" + relation.getTo() + "]");
                        }
                    }
                    nodeRoutes.computeIfAbsent(ruleNode.getId(), k -> new ArrayList<>())
                            .add(new RuleNodeRelation(ruleNode.getId(), relation.getTo(), relation.getType()));
                }
            }
        }

        firstId = ruleChain.getFirstRuleNodeId();
        firstNode = nodeActors.get(firstId);
        state = ComponentLifecycleState.ACTIVE;
    }

总结

至此,规则引擎的核心Actor 初始化完成,包括初始化了所有的RuleChainActor,RuleNodeActor以及 RuleNode之间的关联关系,收到设备上报消息,消息根据设备对应的 设备配置 设备配置的规则链找到对应的规则链也就是RuleChainActor进行消息投递。最终处理过程 AppActor -> TenantActor -> RuleChainActor -> RuleNodeActor,下一篇文章我讲解这个消息的处理过程。

Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐