4.1 事件來源、處理流程與狀態邊界
事件系統的核心邊界是把「收到訊號」、「轉成事件」、「套用規則」、「更新狀態」與「輸出結果」拆開。每個邊界都應該有自己的型別與測試,否則一個 handler 或 worker 很快就會同時負責協定、驗證、去重、狀態與推送。
本章目標
學完本章後,你將能夠:
- 分辨 event source、normalizer、processor、repository、publisher 的責任
- 用 Go interface 表達元件能力,而不是表達資料夾模板
- 把外部格式限制在 adapter 內
- 讓狀態更新集中到 repository 或 state owner
- 用測試驗證每個邊界是否可替換
【觀察】事件流程容易被寫成一團
事件流程膨脹的常見原因是入口程式碼太方便。HTTP handler 可以 decode JSON、驗證欄位、查 map、送通知;worker 也可以讀 queue、判斷重複、更新狀態、寫 log。短期看起來直接,長期會讓每個入口都複製一套規則。
反模式示意:
1func handleCallback(w http.ResponseWriter, r *http.Request) {
2 var raw CallbackPayload
3 json.NewDecoder(r.Body).Decode(&raw)
4
5 if raw.ID == "" {
6 http.Error(w, "missing id", http.StatusBadRequest)
7 return
8 }
9
10 if seen[raw.ID] {
11 w.WriteHeader(http.StatusNoContent)
12 return
13 }
14
15 seen[raw.ID] = true
16 states[raw.AccountID] = "active"
17 hub.Broadcast(raw.AccountID, "active")
18}這段程式的問題是責任混在一起。HTTP 協定、輸入格式、去重策略、狀態更新與推送規則都被綁在同一個函式,任何一項改變都會影響整個入口。
【判讀】事件邊界應該按照責任切開
事件邊界的核心規則是每一層只知道自己必須知道的資訊。adapter 知道外部協定,normalizer 知道格式轉換,processor 知道事件規則,repository 知道狀態保存,publisher 知道輸出方式。
一個可維護的事件流程可以長這樣:
1HTTP / queue / timer
2 │
3 ▼
4 adapter
5 │ raw input
6 ▼
7 normalizer
8 │ DomainEvent
9 ▼
10 processor
11 │
12 ├── deduper
13 ├── repository
14 └── publisher這是依賴方向的要求。外部來源依賴內部事件模型;內部處理流程不依賴外部 raw payload。
【策略】先定義內部事件模型
內部事件模型的核心責任是提供穩定語意。不同來源可以有不同欄位名稱與時間格式,但進入 processor 前都應轉成同一種事件。
1type EventType string
2
3const (
4 EventNotificationCreated EventType = "notification.created"
5 EventAccountActivated EventType = "account.activated"
6 EventJobFinished EventType = "job.finished"
7)
8
9type EventSource string
10
11const (
12 SourceHTTPCallback EventSource = "http_callback"
13 SourceQueue EventSource = "queue"
14 SourceTimer EventSource = "timer"
15)
16
17type DomainEvent struct {
18 ID string
19 Source EventSource
20 Type EventType
21 SubjectID string
22 OccurredAt time.Time
23 ReceivedAt time.Time
24 Payload json.RawMessage
25}OccurredAt 是事件發生時間,ReceivedAt 是系統收到時間。這兩個欄位要分開,因為外部事件可能延遲送達;去重與排序通常看事件語意時間,操作監控通常看收到時間。
【執行】adapter 只負責外部格式
adapter 的核心責任是把外部輸入轉成內部事件或 command。它可以知道 JSON tag、HTTP status、queue ack、header,但不應直接修改狀態。
1type CallbackPayload struct {
2 EventID string `json:"event_id"`
3 AccountID string `json:"account_id"`
4 EventName string `json:"event_name"`
5 Timestamp string `json:"timestamp"`
6}
7
8type CallbackHandler struct {
9 processor *EventProcessor
10 now func() time.Time
11}
12
13func (h CallbackHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
14 var payload CallbackPayload
15 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
16 writeError(w, http.StatusBadRequest, "invalid_json")
17 return
18 }
19
20 event, err := NormalizeCallback(payload, h.now())
21 if err != nil {
22 writeError(w, http.StatusBadRequest, "invalid_event")
23 return
24 }
25
26 if err := h.processor.Process(r.Context(), event); err != nil {
27 writeError(w, http.StatusInternalServerError, "process_event_failed")
28 return
29 }
30
31 w.WriteHeader(http.StatusAccepted)
32}handler 的測試應該檢查 HTTP 行為與 normalize 錯誤對應。事件規則的測試不應透過 HTTP handler 才能執行,否則 processor 的變化會被協定細節干擾。
【執行】normalizer 負責轉換與基本合約
normalizer 的核心責任是把 raw input 轉成 DomainEvent,並拒絕語意不完整的資料。它是外部世界與內部模型的邊界。
1func NormalizeCallback(raw CallbackPayload, receivedAt time.Time) (DomainEvent, error) {
2 occurredAt, err := time.Parse(time.RFC3339, raw.Timestamp)
3 if err != nil {
4 return DomainEvent{}, fmt.Errorf("parse timestamp: %w", err)
5 }
6
7 event := DomainEvent{
8 ID: strings.TrimSpace(raw.EventID),
9 Source: SourceHTTPCallback,
10 Type: mapCallbackEventName(raw.EventName),
11 SubjectID: strings.TrimSpace(raw.AccountID),
12 OccurredAt: occurredAt,
13 ReceivedAt: receivedAt,
14 }
15
16 if err := event.Validate(); err != nil {
17 return DomainEvent{}, err
18 }
19 return event, nil
20}
21
22func (e DomainEvent) Validate() error {
23 if e.ID == "" {
24 return fmt.Errorf("event id is required")
25 }
26 if e.Type == "" {
27 return fmt.Errorf("event type is required")
28 }
29 if e.SubjectID == "" {
30 return fmt.Errorf("subject id is required")
31 }
32 if e.OccurredAt.IsZero() {
33 return fmt.Errorf("occurred at is required")
34 }
35 if e.ReceivedAt.IsZero() {
36 return fmt.Errorf("received at is required")
37 }
38 return nil
39}validation 應該保護 envelope 的必要欄位。更細的 payload 規則可以放在特定事件的 normalizer 或 processor,避免 Validate 變成所有事件的巨大規則表。
【執行】processor 負責事件規則
processor 的核心責任是套用內部事件規則。它可以驗證、去重、更新狀態、寫入事件紀錄、呼叫 publisher,但不應知道 HTTP body 或 queue message 的原始格式。
1type EventRepository interface {
2 Apply(ctx context.Context, event DomainEvent) error
3}
4
5type Deduper interface {
6 Seen(ctx context.Context, event DomainEvent) (bool, error)
7}
8
9type Publisher interface {
10 Publish(ctx context.Context, event DomainEvent) error
11}
12
13type EventProcessor struct {
14 deduper Deduper
15 repository EventRepository
16 publisher Publisher
17}
18
19func (p *EventProcessor) Process(ctx context.Context, event DomainEvent) error {
20 if err := event.Validate(); err != nil {
21 return err
22 }
23
24 duplicated, err := p.deduper.Seen(ctx, event)
25 if err != nil {
26 return fmt.Errorf("dedup event: %w", err)
27 }
28 if duplicated {
29 return nil
30 }
31
32 if err := p.repository.Apply(ctx, event); err != nil {
33 return fmt.Errorf("apply event: %w", err)
34 }
35
36 if err := p.publisher.Publish(ctx, event); err != nil {
37 return fmt.Errorf("publish event: %w", err)
38 }
39
40 return nil
41}這個 processor 依賴能力介面,不依賴具體實作。Go 的 implicit interface 讓 memory repository、database repository 或測試 fake 都可以自然接上。
【判讀】publisher 失敗策略必須明確
publisher 的核心問題是「輸出失敗是否影響狀態成功」。即時通知、審計紀錄、外部 webhook 的可靠性要求不同,不能一律用同一個錯誤策略。
常見策略:
| 輸出類型 | 失敗策略 | 適用情境 |
|---|---|---|
| 即時 UI 推送 | 記錄錯誤,可允許狀態已更新 | 客戶端可重新查詢最新狀態 |
| 事件紀錄 | 失敗時中止流程 | 紀錄是不可遺失的資料 |
| 外部 webhook | 寫入 outbox,稍後重試 | 下游需要可靠接收 |
若 repository.Apply 成功但 publisher.Publish 失敗,系統必須知道這是可接受的降級,還是需要重試與補償。這個決策應該寫在 processor 或 usecase 的設計裡,不應藏在 publisher implementation。
【測試】每個邊界分開測
事件邊界的測試目標是讓錯誤定位清楚。normalizer 測 raw input 轉換,processor 測規則順序,repository 測狀態一致性,publisher 測輸出協定。
processor fake test 範例:
1func TestProcessorSkipsDuplicateEvent(t *testing.T) {
2 processor := EventProcessor{
3 deduper: fakeDeduper{duplicated: true},
4 repository: &fakeRepository{},
5 publisher: &fakePublisher{},
6 }
7
8 err := processor.Process(context.Background(), DomainEvent{
9 ID: "evt_1",
10 Type: EventAccountActivated,
11 SubjectID: "acct_1",
12 OccurredAt: time.Now(),
13 ReceivedAt: time.Now(),
14 })
15 if err != nil {
16 t.Fatalf("process event: %v", err)
17 }
18
19 if processor.repository.(*fakeRepository).applied {
20 t.Fatalf("duplicate event should not update repository")
21 }
22}這種測試不需要 HTTP server。它直接驗證 processor 的規則:重複事件不應更新狀態,也不應送出推送。
本章不處理
本章先處理單一 Go 服務內的事件來源與處理邊界;分散式一致性與 event sourcing,會在下列章節再往外延伸:
和 Go 教材的關係
這一章承接的是 action、event、repository 與 publisher 的邊界;如果你要先回看語言教材,可以讀:
小結
事件系統的可維護性來自清楚邊界:adapter 處理外部格式,normalizer 建立內部事件,processor 套用規則,repository 擁有狀態,publisher 輸出結果。當每個元件只承擔一種責任時,新增來源、新增事件或替換儲存實作都會變成局部修改。