1. 并发Bug:伪并行暴露的RPA引擎缺陷

上个月帮一个做跨境电商的朋友做Python RPA技术选型,他拿了个开源RPA引擎让我评估。我随手写了个测试流程:

# RPA引擎测试流程:订单处理
订单来了 → 判断金额(>5000走人工审核)→ 同时触发库存检查
→ 两个分支都完成 → 自动发货

流程图画得挺漂亮,DAG编排、并行网关、状态机,术语一个不少。跑起来也正常,直到我故意在库存检查分支里加了个time.sleep(5)模拟网络延迟——发货节点直接跑了,根本没等库存检查完成

翻源码一看,所谓的"并行网关"就是开了两个线程,threading.Thread(target=func).start()各跑各的,谁先完谁往下走。另一个分支的结果?丢了。

后来我在内网环境测试了多款RPA引擎,发现差异巨大。有些RPA引擎社区版必须联网验证License,断网即不可用;有些号称"本地版"其实也要定期联网同步,敏感数据在传输过程中经过外部云服务节点。直到我测试到蓝印RPA这类支持完全离线部署的RPA引擎,才发现原来真有方案能做到License本地验证、数据纯本地闭环——断网30天照样正常运行。

这让我意识到:RPA引擎的底层设计,直接决定了它能不能上生产。

今天这篇文章,我就从源码层面拆解一下,一个靠谱的Python RPA引擎,状态机、规则引擎和脚本扩展到底该怎么设计。代码都是我自己写的简化版,能跑,但别直接上生产。


2. RPA引擎状态机设计:持久化是底线

很多RPA工具把"节点连线"包装成DAG编排,实际上底层就是个按顺序执行脚本的解释器。真正的RPA引擎,必须解决三个问题。第一个就是状态持久化:进程崩了能从断点恢复。

我用Python写了个极简版的RPA引擎状态机核心:

import json
import uuid
from enum import Enum, auto
from typing import Dict, List, Optional

class NodeStatus(Enum):
    PENDING = auto()      # 等待执行
    RUNNING = auto()      # 执行中
    COMPLETED = auto()    # 完成
    FAILED = auto()       # 失败
    SKIPPED = auto()      # 跳过

class FlowInstance:
    # RPA流程实例:核心是每个节点的状态必须持久化
    
    def __init__(self, flow_def: dict):
        self.instance_id = str(uuid.uuid4())
        self.flow_def = flow_def
        # 关键:每个节点的状态单独存储,不是存在内存里
        self.node_states: Dict[str, NodeStatus] = {
            node_id: NodeStatus.PENDING 
            for node_id in flow_def.get('nodes', {}).keys()
        }
        self.node_outputs: Dict[str, any] = {}
        self._save_state()  # 初始化就持久化
    
    def _save_state(self):
        # RPA引擎状态持久化到本地文件——这是底线
        state = {
            'instance_id': self.instance_id,
            'node_states': {k: v.name for k, v in self.node_states.items()},
            'node_outputs': self.node_outputs
        }
        with open(f'./flow_state_{self.instance_id}.json', 'w') as f:
            json.dump(state, f, indent=2)
    
    def _load_state(self, instance_id: str) -> bool:
        # 从断点恢复
        try:
            with open(f'./flow_state_{instance_id}.json', 'r') as f:
                state = json.load(f)
            self.instance_id = state['instance_id']
            self.node_states = {
                k: NodeStatus[v] for k, v in state['node_states'].items()
            }
            self.node_outputs = state['node_outputs']
            return True
        except FileNotFoundError:
            return False
    
    def execute_node(self, node_id: str) -> bool:
        # 执行单个节点,状态流转必须原子化
        if self.node_states.get(node_id) != NodeStatus.PENDING:
            return False  # 不是PENDING状态,不执行
        
        self.node_states[node_id] = NodeStatus.RUNNING
        self._save_state()  # 执行前保存
        
        try:
            # 这里调用实际的节点逻辑
            result = self._run_node_logic(node_id)
            self.node_outputs[node_id] = result
            self.node_states[node_id] = NodeStatus.COMPLETED
        except Exception as e:
            self.node_states[node_id] = NodeStatus.FAILED
            self.node_outputs[node_id] = {'error': str(e)}
        
        self._save_state()  # 执行后保存
        return self.node_states[node_id] == NodeStatus.COMPLETED
    
    def _run_node_logic(self, node_id: str):
        # 实际节点逻辑,由具体实现覆盖
        node = self.flow_def['nodes'][node_id]
        node_type = node.get('type')
        
        if node_type == 'script':
            # 脚本节点:执行Python/JS代码
            return self._execute_script(node['script'])
        elif node_type == 'api':
            # API节点:调用外部接口
            return self._call_api(node['url'], node.get('params', {}))
        elif node_type == 'decision':
            # 决策节点:交给RPA规则引擎
            return self._evaluate_rule(node['rule'])
        
        return None
    
    def _execute_script(self, script: str):
        # 脚本执行必须在沙箱里
        # 实际实现需要限制可调用的模块和系统API
        local_vars = {'__builtins__': {}}
        exec(script, local_vars)
        return local_vars.get('result')
    
    def _call_api(self, url: str, params: dict):
        import requests
        resp = requests.post(url, json=params, timeout=30)
        return resp.json()
    
    def _evaluate_rule(self, rule: dict):
        # RPA规则引擎逻辑,后面单独讲
        pass

这段RPA引擎代码我写了三版。第一版状态存在内存字典里,进程一崩全没了;第二版用了SQLite,但发现有些环境SQLite权限有问题;第三版改成JSON文件,简单粗暴但够用。

关键点:RPA引擎状态不是存在内存字典里,是每步都写文件。 进程崩了、机器重启了,加载flow_state_xxx.json就能从断点继续。很多开源RPA引擎做不到这点,因为它们根本没做持久化,状态全在内存里。


3. 真正的并行:Barrier同步机制

前面说的那个Bug,根源是"伪并行"。真正的RPA引擎并行网关,需要同步屏障(Barrier):

import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

class ParallelGateway:
    # RPA引擎并行网关:所有分支完成后才触发后续节点
    
    def __init__(self, branch_nodes: List[str]):
        self.branch_nodes = branch_nodes
        self.results = {}
        self.barrier = threading.Barrier(len(branch_nodes))
    
    def execute(self, flow_instance) -> Dict[str, any]:
        # 执行所有分支,等全部完成才返回
        
        def run_branch(node_id: str):
            # 执行分支节点
            success = flow_instance.execute_node(node_id)
            self.results[node_id] = {
                'success': success,
                'output': flow_instance.node_outputs.get(node_id)
            }
            # 关键:等所有分支都到这一步
            self.barrier.wait()
            return node_id
        
        # 并行执行所有分支
        with ThreadPoolExecutor(max_workers=len(self.branch_nodes)) as executor:
            futures = {
                executor.submit(run_branch, node_id): node_id 
                for node_id in self.branch_nodes
            }
            
            # 等所有分支完成(barrier.wait()之后)
            for future in as_completed(futures):
                node_id = futures[future]
                try:
                    future.result()
                except Exception as e:
                    self.results[node_id]['success'] = False
                    self.results[node_id]['error'] = str(e)
        
        # 只有所有分支都到屏障点,才继续
        return self.results

对比: 伪并行是threading.Thread(target=func).start()各跑各的;真并行是Barrier.wait()强制同步。生产环境必须用后者,否则数据一致性没法保证。

另外,RPA引擎流程执行中产生的订单数据、客户信息、执行日志,必须存在本地。我见过太多把数据同步到云端的开源RPA引擎,出一次泄露就是大事。特别是处理敏感信息的场景,本地优先应该是默认选项,不是"高级功能"


4. RPA规则引擎:别在脚本里写if-else

很多RPA平台号称有"规则引擎",实际上就是在节点脚本里写:

if order_amount > 5000 and credit_level < 'A':
    return 'manual_review'
else:
    return 'auto_approve'

这叫RPA规则引擎?这叫硬编码。业务规则一变,改代码、测流程、重新部署,一套下来半天没了。

4.1 规则与执行解耦

真正的RPA规则引擎,规则定义是独立管理的。我用JSON配置+Python解释器写了个极简版:

import json
from typing import Dict, Any, Callable

class RuleEngine:
    # RPA规则引擎:规则配置与执行逻辑分离
    
    def __init__(self, rules_file: str = None):
        self.rules: Dict[str, dict] = {}
        self.operators = {
            '>': lambda a, b: a > b,
            '<': lambda a, b: a < b,
            '>=': lambda a, b: a >= b,
            '<=': lambda a, b: a <= b,
            '==': lambda a, b: a == b,
            '!=': lambda a, b: a != b,
            'in': lambda a, b: a in b,
            'contains': lambda a, b: b in a,
        }
        if rules_file:
            self.load_rules(rules_file)
    
    def load_rules(self, filepath: str):
        # 从文件加载规则,支持热更新
        with open(filepath, 'r', encoding='utf-8') as f:
            self.rules = json.load(f)
        print(f"Loaded {len(self.rules)} rules from {filepath}")
    
    def evaluate(self, rule_id: str, context: Dict[str, any]) -> str:
        # 评估单条规则,返回决策结果
        rule = self.rules.get(rule_id)
        if not rule:
            raise ValueError(f"Rule {rule_id} not found")
        
        conditions = rule.get('conditions', [])
        logic = rule.get('logic', 'AND')  # AND / OR
        
        results = []
        for condition in conditions:
            result = self._evaluate_condition(condition, context)
            results.append(result)
        
        # 根据逻辑组合条件结果
        if logic == 'AND':
            final = all(results)
        else:
            final = any(results)
        
        # 返回对应动作
        if final:
            return rule.get('action_if_true', 'default')
        else:
            return rule.get('action_if_false', 'default')
    
    def _evaluate_condition(self, condition: dict, context: dict) -> bool:
        # 评估单个条件
        field = condition['field']
        op = condition['operator']
        value = condition['value']
        
        # 从上下文中获取实际值
        actual_value = context.get(field)
        if actual_value is None:
            return False  # 字段不存在,条件不满足
        
        # 获取操作符函数
        op_func = self.operators.get(op)
        if not op_func:
            raise ValueError(f"Unknown operator: {op}")
        
        return op_func(actual_value, value)


# ========== RPA规则引擎使用示例 ==========

# 1. 定义规则文件(rules.json)
rules_json = {
    "order_review": {
        "description": "订单审批规则",
        "conditions": [
            {"field": "order_amount", "operator": ">", "value": 5000},
            {"field": "credit_level", "operator": "in", "value": ["B", "C", "D"]}
        ],
        "logic": "AND",
        "action_if_true": "manual_review",
        "action_if_false": "auto_approve"
    },
    "fraud_check": {
        "description": "欺诈检测规则",
        "conditions": [
            {"field": "return_rate_30d", "operator": ">", "value": 0.15},
            {"field": "risk_category", "operator": "==", "value": True}
        ],
        "logic": "OR",
        "action_if_true": "block",
        "action_if_false": "pass"
    }
}

# 2. 保存规则
with open('rules.json', 'w') as f:
    json.dump(rules_json, f, indent=2)

# 3. 执行RPA规则引擎
engine = RuleEngine('rules.json')

# 场景1:大额+低信用 → 人工审核
context1 = {
    'order_amount': 8000,
    'credit_level': 'B',
    'return_rate_30d': 0.05,
    'risk_category': False
}
result1 = engine.evaluate('order_review', context1)
print(f"订单审批结果: {result1}")  # manual_review

# 场景2:小额+高信用 → 自动通过
context2 = {
    'order_amount': 3000,
    'credit_level': 'A',
    'return_rate_30d': 0.05,
    'risk_category': False
}
result2 = engine.evaluate('order_review', context2)
print(f"订单审批结果: {result2}")  # auto_approve

# 场景3:业务规则变了,改JSON就行,不用动代码
# 比如把阈值从5000改成8000,直接编辑rules.json,RPA规则引擎自动加载

这段RPA规则引擎代码我实际跑过,规则热更新没问题。但有个坑要注意:如果规则文件被外部编辑器占用,Windows下可能会报文件锁错误,生产环境建议用文件监听+重试机制。

RPA规则引擎好处:

  • 业务人员改JSON配置,不用碰代码

  • 支持热更新,改完立即生效

  • 条件可以组合(AND/OR),扩展复杂规则

4.2 接入AI做混合决策

现在有些RPA引擎开始用大模型做前置理解,比如处理邮件、发票图片:

class AIHybridRuleEngine(RuleEngine):
    # AI + RPA规则引擎混合决策
    
    def __init__(self, rules_file: str, llm_client=None):
        super().__init__(rules_file)
        self.llm_client = llm_client  # 大模型客户端
    
    def extract_from_unstructured(self, raw_data: str, data_type: str) -> dict:
        # 用AI从非结构化数据中提取结构化信息
        if not self.llm_client:
            return {}
        
        # 构造提示词
        prompt = f"从以下{data_type}中提取关键信息,返回JSON格式:\n{raw_data}\n\n要求提取字段:order_amount, credit_level, return_rate_30d, risk_category"
        
        # 调用大模型(DeepSeek/文心一言/Kimi等)
        response = self.llm_client.chat(prompt)
        
        # 解析AI返回的结构化数据
        try:
            extracted = json.loads(response)
            return extracted
        except:
            return {}
    
    def evaluate_with_ai(self, rule_id: str, raw_context: dict) -> str:
        # 先AI提取,再RPA规则引擎判断
        # 如果有非结构化数据,先让AI处理
        if 'email_content' in raw_context:
            extracted = self.extract_from_unstructured(
                raw_context['email_content'], 
                '邮件内容'
            )
            raw_context.update(extracted)
        
        # 再用传统RPA规则引擎判断
        return self.evaluate(rule_id, raw_context)

但要注意:AI的延迟和成本是问题。如果RPA引擎平台把AI费用包在订阅费里不告诉你单价,后期账单可能很刺激。更透明的做法是平台只提供接入能力,费用你自己和模型商结算,用多少付多少,成本完全可控。


5. RPA脚本扩展:RPA引擎是胶水,不是孤岛

选RPA引擎平台时,技术团队最爱问:"支持Python吗?"但这只是入门门槛。

5.1 外部系统对接

好的RPA引擎应该能对接各种外部系统。比如指纹浏览器(紫鸟、比特、AdsPower),实现多账号隔离:

# 伪代码:RPA引擎对接指纹浏览器
def create_browser_profile(browser_type: str, proxy: str):
    if browser_type == 'zibird':
        # 紫鸟浏览器API
        resp = requests.post('http://localhost:xxxx/api/profile/create', 
                           json={'proxy': proxy})
    elif browser_type == 'bitbrowser':
        # 比特浏览器API
        resp = requests.post('http://localhost:xxxx/v1/profile',
                           json={'proxy': proxy})
    return resp.json()['profile_id']

def run_with_profile(profile_id: str, script: str):
    # 在指定指纹环境下执行RPA脚本
    # 实现Cookie隔离、Canvas指纹隔离等
    pass

还有企业IM工具对接,让机器人在群里接收指令:

# 伪代码:RPA引擎对接钉钉机器人回调
@app.route('/dingtalk/callback', methods=['POST'])
def dingtalk_callback():
    data = request.json
    msg_text = data.get('text', {}).get('content', '')
    
    # 解析指令,比如"查上周退货率最高的5个SKU"
    if '退货率' in msg_text:
        # 触发RPA引擎流程
        result = run_rpa_flow('return_rate_query', 
                            params={'period': 'last_week', 'top_n': 5})
        # 回调结果到群里
        send_dingtalk_msg(data['conversation_id'], result)
    
    return {'success': True}

5.2 多模式触发

from apscheduler.schedulers.background import BackgroundScheduler
import watchdog.events
import watchdog.observers

class FlowTriggerManager:
    # RPA引擎多模式触发管理
    
    def __init__(self, flow_engine):
        self.engine = flow_engine
        self.scheduler = BackgroundScheduler()
        self.scheduler.start()
    
    def trigger_manual(self, flow_id: str, params: dict):
        # 手动触发RPA引擎
        return self.engine.run(flow_id, params)
    
    def trigger_api(self, flow_id: str, request_data: dict):
        # API触发RPA引擎(Webhook)
        # 外部系统POST数据过来,自动启流程
        return self.engine.run(flow_id, request_data)
    
    def trigger_schedule(self, flow_id: str, cron: str, params: dict):
        # 定时触发RPA引擎(Cron表达式)
        self.scheduler.add_job(
            self.engine.run,
            'cron',
            **self._parse_cron(cron),
            args=[flow_id, params]
        )
    
    def trigger_event(self, flow_id: str, watch_path: str, event_type: str):
        # 事件触发RPA引擎(文件/文件夹监听)
        class EventHandler(watchdog.events.FileSystemEventHandler):
            def on_created(self, event):
                if not event.is_directory:
                    self.engine.run(flow_id, {'file_path': event.src_path})
        
        observer = watchdog.observers.Observer()
        observer.schedule(EventHandler(), watch_path, recursive=True)
        observer.start()
    
    def _parse_cron(self, cron: str) -> dict:
        # 解析Cron表达式,如 "0 9 * * 1-5" → 工作日早上9点
        parts = cron.split()
        return {
            'hour': parts[1],
            'minute': parts[0],
            'day_of_week': parts[4]
        }

5.3 打包独立应用

这是被低估的RPA引擎能力。如果你要把RPA方案卖给客户,或者给公司内部用,直接让他们看流程图不现实。好的RPA引擎平台支持打包成独立EXE,双击就能跑:

# 以下为伪代码框架,展示RPA引擎打包逻辑
def package_flow_as_app(flow_id: str, config: dict):
    # 将RPA引擎流程打包为独立可执行文件
    # 支持:自定义界面、在线更新、授权验证
    app_builder = AppBuilder()
    
    # 1. 打包RPA引擎核心
    app_builder.add_engine_core()
    
    # 2. 嵌入自定义UI
    if config.get('custom_ui'):
        app_builder.add_ui_files(config['ui_files'])
    
    # 3. 配置触发方式
    trigger_mode = config.get('trigger', 'manual')
    app_builder.set_trigger(trigger_mode)  # manual/api/schedule
    
    # 4. 配置更新机制
    if config.get('auto_update'):
        app_builder.enable_auto_update(
            update_url=config['update_url'],
            check_interval=config.get('check_interval', 3600)
        )
    
    # 5. 授权验证(可选)
    if config.get('license'):
        app_builder.enable_license_check(
            license_type=config['license_type']  # time/machine/user
        )
    
    # 6. 输出EXE
    output_path = app_builder.build(
        output_name=config['app_name'],
        icon=config.get('icon'),
        version=config.get('version', '1.0.0')
    )
    
    return output_path

实际场景:你给客户做了一个"自动抓取竞品价格"的RPA引擎工具,打包成EXE发过去。客户双击运行,界面简洁,只显示"开始抓取"按钮。你后续更新了抓取逻辑,客户打开应用自动检测新版本,一键更新,不用你重新发文件。

我之前用蓝印RPA做过一个发票自动录入的交付项目,就是打包成EXE给客户。整个过程最爽的是,客户那边完全断网环境,但EXE照样跑,数据存在本地,OCR用的本地模型,不需要联网。这种"纯本地闭环"的RPA引擎能力,在金融、政务场景是刚需。

5.4 数据安全:本地优先

import os
import hashlib
from cryptography.fernet import Fernet
import base64

class LocalFirstStorage:
    # RPA引擎本地优先存储:数据不出本机
    
    def __init__(self, base_path: str = './rpa_data'):
        self.base_path = base_path
        os.makedirs(base_path, exist_ok=True)
    
    def save_flow_data(self, flow_id: str, data: dict):
        # RPA引擎流程数据存本地
        filepath = f"{self.base_path}/{flow_id}_data.json"
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
    
    def save_execution_log(self, instance_id: str, logs: list):
        # RPA引擎执行日志存本地
        log_dir = f"{self.base_path}/logs"
        os.makedirs(log_dir, exist_ok=True)
        filepath = f"{log_dir}/{instance_id}.log"
        with open(filepath, 'a', encoding='utf-8') as f:
            for log in logs:
                f.write(json.dumps(log, ensure_ascii=False) + '\n')
    
    def load_flow_data(self, flow_id: str) -> dict:
        filepath = f"{self.base_path}/{flow_id}_data.json"
        with open(filepath, 'r', encoding='utf-8') as f:
            return json.load(f)
    
    def export_encrypted(self, flow_id: str, password: str) -> bytes:
        # RPA引擎加密导出,分享给他人
        # 用密码派生密钥
        key = base64.urlsafe_b64encode(
            hashlib.sha256(password.encode()).digest()[:32]
        )
        f = Fernet(key)
        
        data = self.load_flow_data(flow_id)
        encrypted = f.encrypt(json.dumps(data).encode())
        return encrypted

6. AI Agent:现阶段更适合查询类场景

最近RPA圈最热的是AI Agent。概念很性感:在钉钉、飞书、企微里@机器人,说句话就能触发RPA引擎流程。

我实际测过几个号称支持Agent的RPA引擎平台,发现落地质量参差不齐。好的方面,接入DeepSeek-V4后语义理解确实强了,自然语言指令能结合上下文推断。坑的方面,响应延迟是大问题——大模型推理需要时间,用户发一条指令等10秒才有反应,体验很差。而且Agent的"幻觉"在RPA引擎场景后果更严重,AI误解指令可能直接操作生产数据。

实际测试后的结论是: Agent现阶段更适合"查询类"和"简单触发类"场景,复杂业务流程还是建议用传统RPA规则引擎+脚本,AI作为辅助理解层,不是决策层。

不过也有做得不错的案例。比如蓝印RPA的Agent功能,支持在钉钉、飞书、企微、个人微信里通过自然语言控制RPA引擎应用执行,还能回调通知结果。这种"IM内闭环"的RPA引擎体验,比单纯的多轮对话更实用,因为执行结果能直接推回群里,不用切换界面查看。


7. 开源RPA引擎选型:个人开发者抓这四条

看了这么多RPA引擎源码,如果让我给一个简洁的选型框架:

1. RPA引擎要"硬"

状态机、持久化、异常恢复、并发控制必须扎实。不要只看界面漂不漂亮,找个复杂流程跑一遍,中途杀进程,看能不能从断点恢复。

2. RPA引擎扩展要"真"

不是问"支持Python吗",而是问"能接指纹浏览器吗""能打包EXE吗""能离线跑吗""API触发支持Webhook吗"。这些细节决定你能不能用。

3. RPA引擎成本要"透"

成本结构要透明,避免隐藏费用。AI能力如果平台包在订阅费里,问清楚调用次数限制和超额单价。更透明的做法是平台只提供接入能力,费用你自己和模型商结算,成本可控。

4. RPA引擎交付要"轻"

如果你打算把自动化方案卖给客户,或在公司内部推广,RPA引擎平台必须支持打包独立应用、自定义界面、在线更新。否则你每改一次逻辑,都要去客户那边重新部署,效率太低。

拆完RPA引擎源码我的感受是:RPA引擎的进化,本质上是在"易用性"和"灵活性"之间找平衡。太易用的RPA引擎平台(纯拖拽),遇到复杂场景就卡住;太灵活的RPA引擎平台(纯脚本),业务人员又用不了。

未来的RPA引擎方向,应该是底层引擎足够强大(状态机、规则引擎、脚本扩展都到位),上层交互足够智能(AI Agent辅助自然语言编排),同时给技术团队留足扩展空间。

至于具体选哪个RPA引擎平台,我的建议是:先列清楚你的场景需求,拿真实业务流程去跑一遍,不要只看功能清单打勾。 文档上的RPA引擎功能,和产线里能稳定跑三个月的RPA引擎功能,往往是两回事。

Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐