RPA引擎源码解析:Python状态机与规则引擎设计
1. 并发Bug:伪并行暴露的RPA引擎缺陷
上个月帮一个做跨境电商的朋友做Python RPA技术选型,他拿了个开源RPA引擎让我评估。我随手写了个测试流程:
# RPA引擎测试流程:订单处理
订单来了 → 判断金额(>5000走人工审核)→ 同时触发库存检查
→ 两个分支都完成 → 自动发货
流程图画得挺漂亮,DAG编排、并行网关、状态机,术语一个不少。跑起来也正常,直到我故意在库存检查分支里加了个time.sleep(5)模拟网络延迟——发货节点直接跑了,根本没等库存检查完成。
翻源码一看,所谓的"并行网关"就是开了两个线程,threading.Thread(target=func).start()各跑各的,谁先完谁往下走。另一个分支的结果?丢了。
后来我在内网环境测试了多款RPA引擎,发现差异巨大。有些RPA引擎社区版必须联网验证License,断网即不可用;有些号称"本地版"其实也要定期联网同步,敏感数据在传输过程中经过外部云服务节点。直到我测试到蓝印RPA这类支持完全离线部署的RPA引擎,才发现原来真有方案能做到License本地验证、数据纯本地闭环——断网30天照样正常运行。
这让我意识到:RPA引擎的底层设计,直接决定了它能不能上生产。
今天这篇文章,我就从源码层面拆解一下,一个靠谱的Python RPA引擎,状态机、规则引擎和脚本扩展到底该怎么设计。代码都是我自己写的简化版,能跑,但别直接上生产。
2. RPA引擎状态机设计:持久化是底线
很多RPA工具把"节点连线"包装成DAG编排,实际上底层就是个按顺序执行脚本的解释器。真正的RPA引擎,必须解决三个问题。第一个就是状态持久化:进程崩了能从断点恢复。
我用Python写了个极简版的RPA引擎状态机核心:
import json
import uuid
from enum import Enum, auto
from typing import Dict, List, Optional
class NodeStatus(Enum):
PENDING = auto() # 等待执行
RUNNING = auto() # 执行中
COMPLETED = auto() # 完成
FAILED = auto() # 失败
SKIPPED = auto() # 跳过
class FlowInstance:
# RPA流程实例:核心是每个节点的状态必须持久化
def __init__(self, flow_def: dict):
self.instance_id = str(uuid.uuid4())
self.flow_def = flow_def
# 关键:每个节点的状态单独存储,不是存在内存里
self.node_states: Dict[str, NodeStatus] = {
node_id: NodeStatus.PENDING
for node_id in flow_def.get('nodes', {}).keys()
}
self.node_outputs: Dict[str, any] = {}
self._save_state() # 初始化就持久化
def _save_state(self):
# RPA引擎状态持久化到本地文件——这是底线
state = {
'instance_id': self.instance_id,
'node_states': {k: v.name for k, v in self.node_states.items()},
'node_outputs': self.node_outputs
}
with open(f'./flow_state_{self.instance_id}.json', 'w') as f:
json.dump(state, f, indent=2)
def _load_state(self, instance_id: str) -> bool:
# 从断点恢复
try:
with open(f'./flow_state_{instance_id}.json', 'r') as f:
state = json.load(f)
self.instance_id = state['instance_id']
self.node_states = {
k: NodeStatus[v] for k, v in state['node_states'].items()
}
self.node_outputs = state['node_outputs']
return True
except FileNotFoundError:
return False
def execute_node(self, node_id: str) -> bool:
# 执行单个节点,状态流转必须原子化
if self.node_states.get(node_id) != NodeStatus.PENDING:
return False # 不是PENDING状态,不执行
self.node_states[node_id] = NodeStatus.RUNNING
self._save_state() # 执行前保存
try:
# 这里调用实际的节点逻辑
result = self._run_node_logic(node_id)
self.node_outputs[node_id] = result
self.node_states[node_id] = NodeStatus.COMPLETED
except Exception as e:
self.node_states[node_id] = NodeStatus.FAILED
self.node_outputs[node_id] = {'error': str(e)}
self._save_state() # 执行后保存
return self.node_states[node_id] == NodeStatus.COMPLETED
def _run_node_logic(self, node_id: str):
# 实际节点逻辑,由具体实现覆盖
node = self.flow_def['nodes'][node_id]
node_type = node.get('type')
if node_type == 'script':
# 脚本节点:执行Python/JS代码
return self._execute_script(node['script'])
elif node_type == 'api':
# API节点:调用外部接口
return self._call_api(node['url'], node.get('params', {}))
elif node_type == 'decision':
# 决策节点:交给RPA规则引擎
return self._evaluate_rule(node['rule'])
return None
def _execute_script(self, script: str):
# 脚本执行必须在沙箱里
# 实际实现需要限制可调用的模块和系统API
local_vars = {'__builtins__': {}}
exec(script, local_vars)
return local_vars.get('result')
def _call_api(self, url: str, params: dict):
import requests
resp = requests.post(url, json=params, timeout=30)
return resp.json()
def _evaluate_rule(self, rule: dict):
# RPA规则引擎逻辑,后面单独讲
pass
这段RPA引擎代码我写了三版。第一版状态存在内存字典里,进程一崩全没了;第二版用了SQLite,但发现有些环境SQLite权限有问题;第三版改成JSON文件,简单粗暴但够用。
关键点:RPA引擎状态不是存在内存字典里,是每步都写文件。 进程崩了、机器重启了,加载flow_state_xxx.json就能从断点继续。很多开源RPA引擎做不到这点,因为它们根本没做持久化,状态全在内存里。
3. 真正的并行:Barrier同步机制
前面说的那个Bug,根源是"伪并行"。真正的RPA引擎并行网关,需要同步屏障(Barrier):
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
class ParallelGateway:
# RPA引擎并行网关:所有分支完成后才触发后续节点
def __init__(self, branch_nodes: List[str]):
self.branch_nodes = branch_nodes
self.results = {}
self.barrier = threading.Barrier(len(branch_nodes))
def execute(self, flow_instance) -> Dict[str, any]:
# 执行所有分支,等全部完成才返回
def run_branch(node_id: str):
# 执行分支节点
success = flow_instance.execute_node(node_id)
self.results[node_id] = {
'success': success,
'output': flow_instance.node_outputs.get(node_id)
}
# 关键:等所有分支都到这一步
self.barrier.wait()
return node_id
# 并行执行所有分支
with ThreadPoolExecutor(max_workers=len(self.branch_nodes)) as executor:
futures = {
executor.submit(run_branch, node_id): node_id
for node_id in self.branch_nodes
}
# 等所有分支完成(barrier.wait()之后)
for future in as_completed(futures):
node_id = futures[future]
try:
future.result()
except Exception as e:
self.results[node_id]['success'] = False
self.results[node_id]['error'] = str(e)
# 只有所有分支都到屏障点,才继续
return self.results
对比: 伪并行是threading.Thread(target=func).start()各跑各的;真并行是Barrier.wait()强制同步。生产环境必须用后者,否则数据一致性没法保证。
另外,RPA引擎流程执行中产生的订单数据、客户信息、执行日志,必须存在本地。我见过太多把数据同步到云端的开源RPA引擎,出一次泄露就是大事。特别是处理敏感信息的场景,本地优先应该是默认选项,不是"高级功能"。
4. RPA规则引擎:别在脚本里写if-else
很多RPA平台号称有"规则引擎",实际上就是在节点脚本里写:
if order_amount > 5000 and credit_level < 'A':
return 'manual_review'
else:
return 'auto_approve'
这叫RPA规则引擎?这叫硬编码。业务规则一变,改代码、测流程、重新部署,一套下来半天没了。
4.1 规则与执行解耦
真正的RPA规则引擎,规则定义是独立管理的。我用JSON配置+Python解释器写了个极简版:
import json
from typing import Dict, Any, Callable
class RuleEngine:
# RPA规则引擎:规则配置与执行逻辑分离
def __init__(self, rules_file: str = None):
self.rules: Dict[str, dict] = {}
self.operators = {
'>': lambda a, b: a > b,
'<': lambda a, b: a < b,
'>=': lambda a, b: a >= b,
'<=': lambda a, b: a <= b,
'==': lambda a, b: a == b,
'!=': lambda a, b: a != b,
'in': lambda a, b: a in b,
'contains': lambda a, b: b in a,
}
if rules_file:
self.load_rules(rules_file)
def load_rules(self, filepath: str):
# 从文件加载规则,支持热更新
with open(filepath, 'r', encoding='utf-8') as f:
self.rules = json.load(f)
print(f"Loaded {len(self.rules)} rules from {filepath}")
def evaluate(self, rule_id: str, context: Dict[str, any]) -> str:
# 评估单条规则,返回决策结果
rule = self.rules.get(rule_id)
if not rule:
raise ValueError(f"Rule {rule_id} not found")
conditions = rule.get('conditions', [])
logic = rule.get('logic', 'AND') # AND / OR
results = []
for condition in conditions:
result = self._evaluate_condition(condition, context)
results.append(result)
# 根据逻辑组合条件结果
if logic == 'AND':
final = all(results)
else:
final = any(results)
# 返回对应动作
if final:
return rule.get('action_if_true', 'default')
else:
return rule.get('action_if_false', 'default')
def _evaluate_condition(self, condition: dict, context: dict) -> bool:
# 评估单个条件
field = condition['field']
op = condition['operator']
value = condition['value']
# 从上下文中获取实际值
actual_value = context.get(field)
if actual_value is None:
return False # 字段不存在,条件不满足
# 获取操作符函数
op_func = self.operators.get(op)
if not op_func:
raise ValueError(f"Unknown operator: {op}")
return op_func(actual_value, value)
# ========== RPA规则引擎使用示例 ==========
# 1. 定义规则文件(rules.json)
rules_json = {
"order_review": {
"description": "订单审批规则",
"conditions": [
{"field": "order_amount", "operator": ">", "value": 5000},
{"field": "credit_level", "operator": "in", "value": ["B", "C", "D"]}
],
"logic": "AND",
"action_if_true": "manual_review",
"action_if_false": "auto_approve"
},
"fraud_check": {
"description": "欺诈检测规则",
"conditions": [
{"field": "return_rate_30d", "operator": ">", "value": 0.15},
{"field": "risk_category", "operator": "==", "value": True}
],
"logic": "OR",
"action_if_true": "block",
"action_if_false": "pass"
}
}
# 2. 保存规则
with open('rules.json', 'w') as f:
json.dump(rules_json, f, indent=2)
# 3. 执行RPA规则引擎
engine = RuleEngine('rules.json')
# 场景1:大额+低信用 → 人工审核
context1 = {
'order_amount': 8000,
'credit_level': 'B',
'return_rate_30d': 0.05,
'risk_category': False
}
result1 = engine.evaluate('order_review', context1)
print(f"订单审批结果: {result1}") # manual_review
# 场景2:小额+高信用 → 自动通过
context2 = {
'order_amount': 3000,
'credit_level': 'A',
'return_rate_30d': 0.05,
'risk_category': False
}
result2 = engine.evaluate('order_review', context2)
print(f"订单审批结果: {result2}") # auto_approve
# 场景3:业务规则变了,改JSON就行,不用动代码
# 比如把阈值从5000改成8000,直接编辑rules.json,RPA规则引擎自动加载
这段RPA规则引擎代码我实际跑过,规则热更新没问题。但有个坑要注意:如果规则文件被外部编辑器占用,Windows下可能会报文件锁错误,生产环境建议用文件监听+重试机制。
RPA规则引擎好处:
-
业务人员改JSON配置,不用碰代码
-
支持热更新,改完立即生效
-
条件可以组合(AND/OR),扩展复杂规则
4.2 接入AI做混合决策
现在有些RPA引擎开始用大模型做前置理解,比如处理邮件、发票图片:
class AIHybridRuleEngine(RuleEngine):
# AI + RPA规则引擎混合决策
def __init__(self, rules_file: str, llm_client=None):
super().__init__(rules_file)
self.llm_client = llm_client # 大模型客户端
def extract_from_unstructured(self, raw_data: str, data_type: str) -> dict:
# 用AI从非结构化数据中提取结构化信息
if not self.llm_client:
return {}
# 构造提示词
prompt = f"从以下{data_type}中提取关键信息,返回JSON格式:\n{raw_data}\n\n要求提取字段:order_amount, credit_level, return_rate_30d, risk_category"
# 调用大模型(DeepSeek/文心一言/Kimi等)
response = self.llm_client.chat(prompt)
# 解析AI返回的结构化数据
try:
extracted = json.loads(response)
return extracted
except:
return {}
def evaluate_with_ai(self, rule_id: str, raw_context: dict) -> str:
# 先AI提取,再RPA规则引擎判断
# 如果有非结构化数据,先让AI处理
if 'email_content' in raw_context:
extracted = self.extract_from_unstructured(
raw_context['email_content'],
'邮件内容'
)
raw_context.update(extracted)
# 再用传统RPA规则引擎判断
return self.evaluate(rule_id, raw_context)
但要注意:AI的延迟和成本是问题。如果RPA引擎平台把AI费用包在订阅费里不告诉你单价,后期账单可能很刺激。更透明的做法是平台只提供接入能力,费用你自己和模型商结算,用多少付多少,成本完全可控。
5. RPA脚本扩展:RPA引擎是胶水,不是孤岛
选RPA引擎平台时,技术团队最爱问:"支持Python吗?"但这只是入门门槛。
5.1 外部系统对接
好的RPA引擎应该能对接各种外部系统。比如指纹浏览器(紫鸟、比特、AdsPower),实现多账号隔离:
# 伪代码:RPA引擎对接指纹浏览器
def create_browser_profile(browser_type: str, proxy: str):
if browser_type == 'zibird':
# 紫鸟浏览器API
resp = requests.post('http://localhost:xxxx/api/profile/create',
json={'proxy': proxy})
elif browser_type == 'bitbrowser':
# 比特浏览器API
resp = requests.post('http://localhost:xxxx/v1/profile',
json={'proxy': proxy})
return resp.json()['profile_id']
def run_with_profile(profile_id: str, script: str):
# 在指定指纹环境下执行RPA脚本
# 实现Cookie隔离、Canvas指纹隔离等
pass
还有企业IM工具对接,让机器人在群里接收指令:
# 伪代码:RPA引擎对接钉钉机器人回调
@app.route('/dingtalk/callback', methods=['POST'])
def dingtalk_callback():
data = request.json
msg_text = data.get('text', {}).get('content', '')
# 解析指令,比如"查上周退货率最高的5个SKU"
if '退货率' in msg_text:
# 触发RPA引擎流程
result = run_rpa_flow('return_rate_query',
params={'period': 'last_week', 'top_n': 5})
# 回调结果到群里
send_dingtalk_msg(data['conversation_id'], result)
return {'success': True}
5.2 多模式触发
from apscheduler.schedulers.background import BackgroundScheduler
import watchdog.events
import watchdog.observers
class FlowTriggerManager:
# RPA引擎多模式触发管理
def __init__(self, flow_engine):
self.engine = flow_engine
self.scheduler = BackgroundScheduler()
self.scheduler.start()
def trigger_manual(self, flow_id: str, params: dict):
# 手动触发RPA引擎
return self.engine.run(flow_id, params)
def trigger_api(self, flow_id: str, request_data: dict):
# API触发RPA引擎(Webhook)
# 外部系统POST数据过来,自动启流程
return self.engine.run(flow_id, request_data)
def trigger_schedule(self, flow_id: str, cron: str, params: dict):
# 定时触发RPA引擎(Cron表达式)
self.scheduler.add_job(
self.engine.run,
'cron',
**self._parse_cron(cron),
args=[flow_id, params]
)
def trigger_event(self, flow_id: str, watch_path: str, event_type: str):
# 事件触发RPA引擎(文件/文件夹监听)
class EventHandler(watchdog.events.FileSystemEventHandler):
def on_created(self, event):
if not event.is_directory:
self.engine.run(flow_id, {'file_path': event.src_path})
observer = watchdog.observers.Observer()
observer.schedule(EventHandler(), watch_path, recursive=True)
observer.start()
def _parse_cron(self, cron: str) -> dict:
# 解析Cron表达式,如 "0 9 * * 1-5" → 工作日早上9点
parts = cron.split()
return {
'hour': parts[1],
'minute': parts[0],
'day_of_week': parts[4]
}
5.3 打包独立应用
这是被低估的RPA引擎能力。如果你要把RPA方案卖给客户,或者给公司内部用,直接让他们看流程图不现实。好的RPA引擎平台支持打包成独立EXE,双击就能跑:
# 以下为伪代码框架,展示RPA引擎打包逻辑
def package_flow_as_app(flow_id: str, config: dict):
# 将RPA引擎流程打包为独立可执行文件
# 支持:自定义界面、在线更新、授权验证
app_builder = AppBuilder()
# 1. 打包RPA引擎核心
app_builder.add_engine_core()
# 2. 嵌入自定义UI
if config.get('custom_ui'):
app_builder.add_ui_files(config['ui_files'])
# 3. 配置触发方式
trigger_mode = config.get('trigger', 'manual')
app_builder.set_trigger(trigger_mode) # manual/api/schedule
# 4. 配置更新机制
if config.get('auto_update'):
app_builder.enable_auto_update(
update_url=config['update_url'],
check_interval=config.get('check_interval', 3600)
)
# 5. 授权验证(可选)
if config.get('license'):
app_builder.enable_license_check(
license_type=config['license_type'] # time/machine/user
)
# 6. 输出EXE
output_path = app_builder.build(
output_name=config['app_name'],
icon=config.get('icon'),
version=config.get('version', '1.0.0')
)
return output_path
实际场景:你给客户做了一个"自动抓取竞品价格"的RPA引擎工具,打包成EXE发过去。客户双击运行,界面简洁,只显示"开始抓取"按钮。你后续更新了抓取逻辑,客户打开应用自动检测新版本,一键更新,不用你重新发文件。
我之前用蓝印RPA做过一个发票自动录入的交付项目,就是打包成EXE给客户。整个过程最爽的是,客户那边完全断网环境,但EXE照样跑,数据存在本地,OCR用的本地模型,不需要联网。这种"纯本地闭环"的RPA引擎能力,在金融、政务场景是刚需。
5.4 数据安全:本地优先
import os
import hashlib
from cryptography.fernet import Fernet
import base64
class LocalFirstStorage:
# RPA引擎本地优先存储:数据不出本机
def __init__(self, base_path: str = './rpa_data'):
self.base_path = base_path
os.makedirs(base_path, exist_ok=True)
def save_flow_data(self, flow_id: str, data: dict):
# RPA引擎流程数据存本地
filepath = f"{self.base_path}/{flow_id}_data.json"
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def save_execution_log(self, instance_id: str, logs: list):
# RPA引擎执行日志存本地
log_dir = f"{self.base_path}/logs"
os.makedirs(log_dir, exist_ok=True)
filepath = f"{log_dir}/{instance_id}.log"
with open(filepath, 'a', encoding='utf-8') as f:
for log in logs:
f.write(json.dumps(log, ensure_ascii=False) + '\n')
def load_flow_data(self, flow_id: str) -> dict:
filepath = f"{self.base_path}/{flow_id}_data.json"
with open(filepath, 'r', encoding='utf-8') as f:
return json.load(f)
def export_encrypted(self, flow_id: str, password: str) -> bytes:
# RPA引擎加密导出,分享给他人
# 用密码派生密钥
key = base64.urlsafe_b64encode(
hashlib.sha256(password.encode()).digest()[:32]
)
f = Fernet(key)
data = self.load_flow_data(flow_id)
encrypted = f.encrypt(json.dumps(data).encode())
return encrypted
6. AI Agent:现阶段更适合查询类场景
最近RPA圈最热的是AI Agent。概念很性感:在钉钉、飞书、企微里@机器人,说句话就能触发RPA引擎流程。
我实际测过几个号称支持Agent的RPA引擎平台,发现落地质量参差不齐。好的方面,接入DeepSeek-V4后语义理解确实强了,自然语言指令能结合上下文推断。坑的方面,响应延迟是大问题——大模型推理需要时间,用户发一条指令等10秒才有反应,体验很差。而且Agent的"幻觉"在RPA引擎场景后果更严重,AI误解指令可能直接操作生产数据。
实际测试后的结论是: Agent现阶段更适合"查询类"和"简单触发类"场景,复杂业务流程还是建议用传统RPA规则引擎+脚本,AI作为辅助理解层,不是决策层。
不过也有做得不错的案例。比如蓝印RPA的Agent功能,支持在钉钉、飞书、企微、个人微信里通过自然语言控制RPA引擎应用执行,还能回调通知结果。这种"IM内闭环"的RPA引擎体验,比单纯的多轮对话更实用,因为执行结果能直接推回群里,不用切换界面查看。
7. 开源RPA引擎选型:个人开发者抓这四条
看了这么多RPA引擎源码,如果让我给一个简洁的选型框架:
1. RPA引擎要"硬"
状态机、持久化、异常恢复、并发控制必须扎实。不要只看界面漂不漂亮,找个复杂流程跑一遍,中途杀进程,看能不能从断点恢复。
2. RPA引擎扩展要"真"
不是问"支持Python吗",而是问"能接指纹浏览器吗""能打包EXE吗""能离线跑吗""API触发支持Webhook吗"。这些细节决定你能不能用。
3. RPA引擎成本要"透"
成本结构要透明,避免隐藏费用。AI能力如果平台包在订阅费里,问清楚调用次数限制和超额单价。更透明的做法是平台只提供接入能力,费用你自己和模型商结算,成本可控。
4. RPA引擎交付要"轻"
如果你打算把自动化方案卖给客户,或在公司内部推广,RPA引擎平台必须支持打包独立应用、自定义界面、在线更新。否则你每改一次逻辑,都要去客户那边重新部署,效率太低。
拆完RPA引擎源码我的感受是:RPA引擎的进化,本质上是在"易用性"和"灵活性"之间找平衡。太易用的RPA引擎平台(纯拖拽),遇到复杂场景就卡住;太灵活的RPA引擎平台(纯脚本),业务人员又用不了。
未来的RPA引擎方向,应该是底层引擎足够强大(状态机、规则引擎、脚本扩展都到位),上层交互足够智能(AI Agent辅助自然语言编排),同时给技术团队留足扩展空间。
至于具体选哪个RPA引擎平台,我的建议是:先列清楚你的场景需求,拿真实业务流程去跑一遍,不要只看功能清单打勾。 文档上的RPA引擎功能,和产线里能稳定跑三个月的RPA引擎功能,往往是两回事。
更多推荐
所有评论(0)