事件系統的核心邊界是把「收到訊號」、「轉成事件」、「套用規則」、「更新狀態」與「輸出結果」拆開。每個邊界都應該有自己的型別與測試,否則一個 handler 或 worker 很快就會同時負責協定、驗證、去重、狀態與推送。

本章目標

學完本章後,你將能夠:

  1. 分辨 event source、normalizer、processor、repository、publisher 的責任
  2. 用 Go interface 表達元件能力,而不是表達資料夾模板
  3. 把外部格式限制在 adapter 內
  4. 讓狀態更新集中到 repository 或 state owner
  5. 用測試驗證每個邊界是否可替換

【觀察】事件流程容易被寫成一團

事件流程膨脹的常見原因是入口程式碼太方便。HTTP handler 可以 decode JSON、驗證欄位、查 map、送通知;worker 也可以讀 queue、判斷重複、更新狀態、寫 log。短期看起來直接,長期會讓每個入口都複製一套規則。

反模式示意:

 1func handleCallback(w http.ResponseWriter, r *http.Request) {
 2    var raw CallbackPayload
 3    json.NewDecoder(r.Body).Decode(&raw)
 4
 5    if raw.ID == "" {
 6        http.Error(w, "missing id", http.StatusBadRequest)
 7        return
 8    }
 9
10    if seen[raw.ID] {
11        w.WriteHeader(http.StatusNoContent)
12        return
13    }
14
15    seen[raw.ID] = true
16    states[raw.AccountID] = "active"
17    hub.Broadcast(raw.AccountID, "active")
18}

這段程式的問題是責任混在一起。HTTP 協定、輸入格式、去重策略、狀態更新與推送規則都被綁在同一個函式,任何一項改變都會影響整個入口。

【判讀】事件邊界應該按照責任切開

事件邊界的核心規則是每一層只知道自己必須知道的資訊。adapter 知道外部協定,normalizer 知道格式轉換,processor 知道事件規則,repository 知道狀態保存,publisher 知道輸出方式。

一個可維護的事件流程可以長這樣:

 1HTTP / queue / timer
 2 3 4    adapter
 5        │ raw input
 6 7   normalizer
 8        │ DomainEvent
 910   processor
1112        ├── deduper
13        ├── repository
14        └── publisher

這是依賴方向的要求。外部來源依賴內部事件模型;內部處理流程不依賴外部 raw payload。

【策略】先定義內部事件模型

內部事件模型的核心責任是提供穩定語意。不同來源可以有不同欄位名稱與時間格式,但進入 processor 前都應轉成同一種事件。

 1type EventType string
 2
 3const (
 4    EventNotificationCreated EventType = "notification.created"
 5    EventAccountActivated    EventType = "account.activated"
 6    EventJobFinished         EventType = "job.finished"
 7)
 8
 9type EventSource string
10
11const (
12    SourceHTTPCallback EventSource = "http_callback"
13    SourceQueue        EventSource = "queue"
14    SourceTimer        EventSource = "timer"
15)
16
17type DomainEvent struct {
18    ID         string
19    Source     EventSource
20    Type       EventType
21    SubjectID  string
22    OccurredAt time.Time
23    ReceivedAt time.Time
24    Payload    json.RawMessage
25}

OccurredAt 是事件發生時間,ReceivedAt 是系統收到時間。這兩個欄位要分開,因為外部事件可能延遲送達;去重與排序通常看事件語意時間,操作監控通常看收到時間。

【執行】adapter 只負責外部格式

adapter 的核心責任是把外部輸入轉成內部事件或 command。它可以知道 JSON tag、HTTP status、queue ack、header,但不應直接修改狀態。

 1type CallbackPayload struct {
 2    EventID   string `json:"event_id"`
 3    AccountID string `json:"account_id"`
 4    EventName string `json:"event_name"`
 5    Timestamp string `json:"timestamp"`
 6}
 7
 8type CallbackHandler struct {
 9    processor *EventProcessor
10    now       func() time.Time
11}
12
13func (h CallbackHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
14    var payload CallbackPayload
15    if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
16        writeError(w, http.StatusBadRequest, "invalid_json")
17        return
18    }
19
20    event, err := NormalizeCallback(payload, h.now())
21    if err != nil {
22        writeError(w, http.StatusBadRequest, "invalid_event")
23        return
24    }
25
26    if err := h.processor.Process(r.Context(), event); err != nil {
27        writeError(w, http.StatusInternalServerError, "process_event_failed")
28        return
29    }
30
31    w.WriteHeader(http.StatusAccepted)
32}

handler 的測試應該檢查 HTTP 行為與 normalize 錯誤對應。事件規則的測試不應透過 HTTP handler 才能執行,否則 processor 的變化會被協定細節干擾。

【執行】normalizer 負責轉換與基本合約

normalizer 的核心責任是把 raw input 轉成 DomainEvent,並拒絕語意不完整的資料。它是外部世界與內部模型的邊界。

 1func NormalizeCallback(raw CallbackPayload, receivedAt time.Time) (DomainEvent, error) {
 2    occurredAt, err := time.Parse(time.RFC3339, raw.Timestamp)
 3    if err != nil {
 4        return DomainEvent{}, fmt.Errorf("parse timestamp: %w", err)
 5    }
 6
 7    event := DomainEvent{
 8        ID:         strings.TrimSpace(raw.EventID),
 9        Source:     SourceHTTPCallback,
10        Type:       mapCallbackEventName(raw.EventName),
11        SubjectID:  strings.TrimSpace(raw.AccountID),
12        OccurredAt: occurredAt,
13        ReceivedAt: receivedAt,
14    }
15
16    if err := event.Validate(); err != nil {
17        return DomainEvent{}, err
18    }
19    return event, nil
20}
21
22func (e DomainEvent) Validate() error {
23    if e.ID == "" {
24        return fmt.Errorf("event id is required")
25    }
26    if e.Type == "" {
27        return fmt.Errorf("event type is required")
28    }
29    if e.SubjectID == "" {
30        return fmt.Errorf("subject id is required")
31    }
32    if e.OccurredAt.IsZero() {
33        return fmt.Errorf("occurred at is required")
34    }
35    if e.ReceivedAt.IsZero() {
36        return fmt.Errorf("received at is required")
37    }
38    return nil
39}

validation 應該保護 envelope 的必要欄位。更細的 payload 規則可以放在特定事件的 normalizer 或 processor,避免 Validate 變成所有事件的巨大規則表。

【執行】processor 負責事件規則

processor 的核心責任是套用內部事件規則。它可以驗證、去重、更新狀態、寫入事件紀錄、呼叫 publisher,但不應知道 HTTP body 或 queue message 的原始格式。

 1type EventRepository interface {
 2    Apply(ctx context.Context, event DomainEvent) error
 3}
 4
 5type Deduper interface {
 6    Seen(ctx context.Context, event DomainEvent) (bool, error)
 7}
 8
 9type Publisher interface {
10    Publish(ctx context.Context, event DomainEvent) error
11}
12
13type EventProcessor struct {
14    deduper    Deduper
15    repository EventRepository
16    publisher  Publisher
17}
18
19func (p *EventProcessor) Process(ctx context.Context, event DomainEvent) error {
20    if err := event.Validate(); err != nil {
21        return err
22    }
23
24    duplicated, err := p.deduper.Seen(ctx, event)
25    if err != nil {
26        return fmt.Errorf("dedup event: %w", err)
27    }
28    if duplicated {
29        return nil
30    }
31
32    if err := p.repository.Apply(ctx, event); err != nil {
33        return fmt.Errorf("apply event: %w", err)
34    }
35
36    if err := p.publisher.Publish(ctx, event); err != nil {
37        return fmt.Errorf("publish event: %w", err)
38    }
39
40    return nil
41}

這個 processor 依賴能力介面,不依賴具體實作。Go 的 implicit interface 讓 memory repository、database repository 或測試 fake 都可以自然接上。

【判讀】publisher 失敗策略必須明確

publisher 的核心問題是「輸出失敗是否影響狀態成功」。即時通知、審計紀錄、外部 webhook 的可靠性要求不同,不能一律用同一個錯誤策略。

常見策略:

輸出類型失敗策略適用情境
即時 UI 推送記錄錯誤,可允許狀態已更新客戶端可重新查詢最新狀態
事件紀錄失敗時中止流程紀錄是不可遺失的資料
外部 webhook寫入 outbox,稍後重試下游需要可靠接收

repository.Apply 成功但 publisher.Publish 失敗,系統必須知道這是可接受的降級,還是需要重試與補償。這個決策應該寫在 processor 或 usecase 的設計裡,不應藏在 publisher implementation。

【測試】每個邊界分開測

事件邊界的測試目標是讓錯誤定位清楚。normalizer 測 raw input 轉換,processor 測規則順序,repository 測狀態一致性,publisher 測輸出協定。

processor fake test 範例:

 1func TestProcessorSkipsDuplicateEvent(t *testing.T) {
 2    processor := EventProcessor{
 3        deduper:    fakeDeduper{duplicated: true},
 4        repository: &fakeRepository{},
 5        publisher:  &fakePublisher{},
 6    }
 7
 8    err := processor.Process(context.Background(), DomainEvent{
 9        ID:         "evt_1",
10        Type:       EventAccountActivated,
11        SubjectID:  "acct_1",
12        OccurredAt: time.Now(),
13        ReceivedAt: time.Now(),
14    })
15    if err != nil {
16        t.Fatalf("process event: %v", err)
17    }
18
19    if processor.repository.(*fakeRepository).applied {
20        t.Fatalf("duplicate event should not update repository")
21    }
22}

這種測試不需要 HTTP server。它直接驗證 processor 的規則:重複事件不應更新狀態,也不應送出推送。

本章不處理

本章先處理單一 Go 服務內的事件來源與處理邊界;分散式一致性與 event sourcing,會在下列章節再往外延伸:

和 Go 教材的關係

這一章承接的是 action、event、repository 與 publisher 的邊界;如果你要先回看語言教材,可以讀:

小結

事件系統的可維護性來自清楚邊界:adapter 處理外部格式,normalizer 建立內部事件,processor 套用規則,repository 擁有狀態,publisher 輸出結果。當每個元件只承擔一種責任時,新增來源、新增事件或替換儲存實作都會變成局部修改。