用 Go 实现一个轻量级事件总线,解耦智能工作流

做智能工作流系统时,最麻烦的是业务逻辑太散。如果每个节点(比如大模型调用、数据清洗、发通知)都直接硬编码调用,代码很快就会纠缠在一起。与其在核心业务里写满 if-else 和同步调用,不如在中间加一层事件总线。让节点通过发布 - 订阅模式异步通信,后续加新功能时,不用动旧代码。

一、为什么同步调用会卡住系统

传统写法通常是一个主函数按顺序调完所有模块。比如工单处理完了,顺便调个接口把摘要发到企业微信。一旦要加新功能,就得打开核心文件改代码。

这种写法有两个硬伤:

  1. 耦合太重:违背了开闭原则,改一个地方可能影响全局。
  2. 阻塞风险:下游通知模块如果网络超时,会直接拖慢主业务的响应速度。

对于长周期的 AI 任务,通常需要支持异步回调和插件热插拔。如果没有一个抽象的事件中枢,代码很容易变成难以维护的“意大利面”。

二、事件驱动的工作流逻辑

解耦的核心思路是:节点执行完后,把结果扔进事件总线,不用管谁在听。

架构逻辑大致如下:

  1. 发布者(如工单节点)发布 order.created 事件。
  2. 事件总线收到后,分发给所有订阅该主题的节点。
  3. 订阅者(如分类器、存储库、通知服务)处理完后,如果需要,可以继续发布新事件(如 ticket.processed)。
  4. 插件管理器可以动态注册或注销订阅者,无需重启服务。

新增业务节点时,只需要向总线注册订阅,完全不用修改发布者的代码。

三、Go 语言实现:轻量级并发事件总线

为了保持单机高性能,这里用 Go 标准库实现了一个简单的并发安全事件总线。它支持基于 Topic 的发布订阅、带缓冲的 Channel、以及安全的动态注销。没有引入 RabbitMQ 或 Kafka 等外部中间件,适合单机场景。

package bus

import (
	"context"
	"errors"
	"sync"
)

// Event 结构体定义了传递的数据实体
type Event struct {
	Topic string
	Data  interface{}
}

// EventChannel 是订阅者接收事件的通道
type EventChannel chan Event

// EventBus 管理所有的主题订阅关系
type EventBus struct {
	subscribers map[string][]EventChannel
	mu          sync.RWMutex
}

// NewEventBus 初始化事件总线
func NewEventBus() *EventBus {
	return &EventBus{
		subscribers: make(map[string][]EventChannel),
	}
}

// Subscribe 注册对某个 Topic 的订阅,返回一个接收通道
func (eb *EventBus) Subscribe(topic string, bufferSize int) EventChannel {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	ch := make(EventChannel, bufferSize)
	eb.subscribers[topic] = append(eb.subscribers[topic], ch)
	return ch
}

// Unsubscribe 注销订阅,安全关闭通道并释放资源
func (eb *EventBus) Unsubscribe(topic string, ch EventChannel) error {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	subs, ok := eb.subscribers[topic]
	if !ok {
		return errors.New("topic not found")
	}

	for i, sub := range subs {
		if sub == ch {
			// 安全关闭通道防止向已关闭通道发送导致的 panic
			close(ch)
			// 从切片中移除该通道
			eb.subscribers[topic] = append(subs[:i], subs[i+1:]...)
			return nil
		}
	}
	return errors.New("subscriber channel not found in this topic")
}

// Publish 异步广播事件,支持 context 超时控制防止下游阻塞拖垮总线
func (eb *EventBus) Publish(ctx context.Context, event Event) {
	eb.mu.RLock()
	subs, ok := eb.subscribers[event.Topic]
	eb.mu.RUnlock()

	if !ok {
		return // 当前没有订阅者
	}

	var wg sync.WaitGroup
	for _, sub := range subs {
		wg.Add(1)
		go func(ch EventChannel) {
			defer wg.Done()
			select {
			case <-ctx.Done():
				// 上下文超时或被撤销,放弃发送防止 goroutine 挂起泄漏
				return
			case ch <- event:
				// 事件成功写入缓冲通道
			}
		}(sub)
	}
	
	// 等待本次广播的所有并发分发 goroutine 结束
	wg.Wait()
}

四、实际落地时的几个坑

直接用事件总线虽然方便,但有几个问题需要自己处理:

  1. 顺序问题:并发处理时,后发布的事件可能先执行完。如果业务强依赖顺序(比如先扣款后发货),需要在 Event 里加序列号,让订阅者自己排序。
  2. 背压(Backpressure):如果某个订阅者(比如调大模型 API)处理太慢,它的 Channel 缓冲区会满。这时候需要设置合理的 buffer 大小,或者在 Publish 时加超时控制,直接丢弃或重试,别让慢节点拖死整个系统。
  3. 消息丢失:被丢弃或消费失败的事件不能直接消失。建议做个死信队列(Dead Letter Queue),把失败的消息存起来,方便后续人工排查或重放。

五、小结

这套方案的核心就是“解耦”。在智能工作流底层加一层并发安全的发布 - 订阅机制,配合超时控制,能大幅降低代码的维护成本。后续加新功能时,只需要注册新订阅者,不用动老代码,系统扩展起来也顺手。

Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐