事件去重重構的核心目標是把語義鍵、時間窗口與來源優先順序整理成可測規則。本章用一般事件處理流程說明如何降低重複邏輯,同時保留事件合併的判斷依據。

本章目標

學完本章後,你將能夠:

  1. 辨識 raw payload 去重的風險
  2. 用 domain dedup key 表達同一件事
  3. 把去重邏輯抽成 Deduper
  4. 設計時間窗口與 cleanup
  5. 測試同窗口、跨窗口、不同來源與過期清理

【觀察】重複事件通常先散落在入口層

去重邏輯重構的核心觸發點是多個入口開始各自判斷「這筆事件看過了嗎」。HTTP callback、queue consumer、background worker 或 WebSocket action 都可能收到同一件事,若每個入口各自去重,規則很快會不一致。

重構前常見寫法:

 1var seenHTTPEvents = map[string]bool{}
 2
 3func handleCallback(w http.ResponseWriter, r *http.Request) {
 4    var raw RawNotificationCallback
 5    _ = json.NewDecoder(r.Body).Decode(&raw)
 6
 7    key := raw.NotificationID + ":" + raw.EventName + ":" + raw.Timestamp
 8    if seenHTTPEvents[key] {
 9        w.WriteHeader(http.StatusOK)
10        return
11    }
12    seenHTTPEvents[key] = true
13
14    // update state...
15}

worker 裡又有另一套:

 1var seenWorkerEvents = map[string]time.Time{}
 2
 3func handleWorkerUpdate(update RawNotificationUpdate) {
 4    key := update.ID
 5    if _, ok := seenWorkerEvents[key]; ok {
 6        return
 7    }
 8    seenWorkerEvents[key] = time.Now()
 9
10    // update state...
11}

這兩段程式都在去重,但依據不同。一個用 notification ID、event name、timestamp;另一個用 raw event ID。當兩個來源描述同一件 domain event 時,它們無法互相辨識。

【判讀】raw payload 不適合當去重依據

raw payload 去重的核心問題是來源格式不是 domain 語意。不同來源可能使用不同欄位名稱、timestamp 精度、metadata 或 request ID,但仍然描述同一件事。

容易造成誤判的欄位:

欄位問題
request ID每次重送都可能不同
received timestamp取決於系統收到時間,不是發生時間
raw payload hash欄位順序或 metadata 變化會改變 hash
source-specific ID不同來源可能沒有共同 ID
debug metadata不代表事件語意

去重應該發生在 normalized DomainEvent 上,而不是 raw HTTP body、queue message 或 worker update 上。

【策略】domain dedup key 表達同一件事

domain dedup key 的核心責任是回答「哪兩筆事件應該被視為同一件 domain fact」。常見欄位是 subject kind、subject ID、event type 與時間窗口。

 1type DedupKey struct {
 2    SubjectKind SubjectKind
 3    SubjectID   string
 4    EventType   EventType
 5    Window      int64
 6}
 7
 8func NewDedupKey(event DomainEvent, window time.Duration) DedupKey {
 9    return DedupKey{
10        SubjectKind: event.SubjectKind,
11        SubjectID:   event.SubjectID,
12        EventType:   event.Type,
13        Window:      event.OccurredAt.UnixNano() / int64(window),
14    }
15}

這個 key 不包含 Source,因為不同來源可能送來同一件事。是否包含 source 是一個 domain 決策:如果不同來源代表不同事實,就包含;如果不同來源只是同一事實的不同通道,就不要包含。

時間窗口是容忍來源時間差的折衷。窗口太小會漏掉重複事件,窗口太大可能合併兩件獨立事件。

【執行】抽出 Deduper

Deduper 的核心責任是保存已看過的 key,並回報目前事件是否重複。它不應知道 HTTP、WebSocket 或 queue,也不應更新狀態。

 1type Deduper struct {
 2    mu      sync.Mutex
 3    seen    map[DedupKey]time.Time
 4    window  time.Duration
 5    expires time.Duration
 6}
 7
 8func NewDeduper(window time.Duration, expires time.Duration) *Deduper {
 9    return &Deduper{
10        seen:    make(map[DedupKey]time.Time),
11        window:  window,
12        expires: expires,
13    }
14}

Seen 判斷是否看過:

 1func (d *Deduper) Seen(event DomainEvent) bool {
 2    d.mu.Lock()
 3    defer d.mu.Unlock()
 4
 5    key := NewDedupKey(event, d.window)
 6    if _, ok := d.seen[key]; ok {
 7        return true
 8    }
 9
10    d.seen[key] = event.ReceivedAt
11    return false
12}

這裡用 ReceivedAt 作為清理基準,因為清理是系統內部記憶體管理問題;去重 key 則用 OccurredAt,因為那是事件發生語意。兩個時間各有用途,不應混用。

【執行】processor 使用 Deduper

重構後的核心方向是讓所有來源先 normalize 成 DomainEvent,再交給同一個 processor 去重與套用規則。

 1type EventProcessor struct {
 2    deduper    *Deduper
 3    repository EventRepository
 4    publisher  Publisher
 5}
 6
 7func (p *EventProcessor) Process(ctx context.Context, event DomainEvent) error {
 8    if err := event.Validate(); err != nil {
 9        return fmt.Errorf("validate event: %w", err)
10    }
11
12    if p.deduper.Seen(event) {
13        return nil
14    }
15
16    if err := p.repository.Apply(ctx, event); err != nil {
17        return fmt.Errorf("apply event: %w", err)
18    }
19
20    return p.publisher.Publish(ctx, event)
21}

這個位置比 handler 或 worker 更適合去重,因為 processor 已經面對 normalized domain event。新增事件來源時,只要它走同一個 processor,就自然共用同一套去重規則。

【策略】來源優先順序要顯式化

來源優先順序的核心問題是重複事件不一定完全相同。有些來源即時但資料少,有些來源延遲但資料完整。若需要合併資料,就要把 priority rule 寫成可測規則。

先定義 priority:

 1func SourcePriority(source EventSource) int {
 2    switch source {
 3    case SourceHTTPCallback:
 4        return 100
 5    case SourceClientCommand:
 6        return 80
 7    case SourceTimer:
 8        return 50
 9    default:
10        return 0
11    }
12}

若 deduper 只需要判斷 seen,就不處理 priority。若系統需要「較高 priority 事件可以取代較低 priority 事件」,應把 deduper 改成更明確的 result:

1type DedupDecision int
2
3const (
4    DedupAccept DedupDecision = iota
5    DedupDrop
6    DedupReplace
7)

不要把 priority 規則藏在 if 裡。它是 domain policy,應該可以被直接測試。

【執行】cleanup 防止去重表無限成長

cleanup 的核心責任是移除過期 key,防止去重表變成記憶體 leak。只要 seen 是 map,就必須設計生命週期。

 1func (d *Deduper) Cleanup(now time.Time) {
 2    d.mu.Lock()
 3    defer d.mu.Unlock()
 4
 5    for key, seenAt := range d.seen {
 6        if now.Sub(seenAt) > d.expires {
 7            delete(d.seen, key)
 8        }
 9    }
10}

cleanup 可以由 background worker 定期呼叫:

 1func RunDeduperCleanup(ctx context.Context, deduper *Deduper, interval time.Duration, now func() time.Time) {
 2    ticker := time.NewTicker(interval)
 3    defer ticker.Stop()
 4
 5    for {
 6        select {
 7        case <-ctx.Done():
 8            return
 9        case <-ticker.C:
10            deduper.Cleanup(now())
11        }
12    }
13}

這裡注入 now 是為了測試。清理策略不應依賴測試中的真實等待。

【執行】同窗口去重測試

同窗口測試的核心目標是確認兩筆語意相同、時間接近的事件會共用 key。

 1func TestDeduperSeenSameWindow(t *testing.T) {
 2    deduper := NewDeduper(time.Minute, time.Hour)
 3    base := time.Date(2026, 4, 22, 10, 0, 0, 0, time.UTC)
 4
 5    first := DomainEvent{
 6        ID:          "evt_1",
 7        Type:        EventNotificationCreated,
 8        SubjectKind: SubjectNotification,
 9        SubjectID:   "ntf_1",
10        OccurredAt:  base,
11        ReceivedAt:  base,
12    }
13    second := first
14    second.ID = "evt_2"
15    second.ReceivedAt = base.Add(5 * time.Second)
16
17    if deduper.Seen(first) {
18        t.Fatalf("first event should not be duplicate")
19    }
20    if !deduper.Seen(second) {
21        t.Fatalf("second event in same window should be duplicate")
22    }
23}

這個測試刻意讓 ID 不同,證明去重依賴 domain key。

【執行】跨窗口不去重測試

跨窗口測試的核心目標是確認兩件不同時間窗口的事件不會被誤合併。

 1func TestDeduperSeenDifferentWindow(t *testing.T) {
 2    deduper := NewDeduper(time.Minute, time.Hour)
 3    base := time.Date(2026, 4, 22, 10, 0, 0, 0, time.UTC)
 4
 5    first := DomainEvent{
 6        ID:          "evt_1",
 7        Type:        EventNotificationCreated,
 8        SubjectKind: SubjectNotification,
 9        SubjectID:   "ntf_1",
10        OccurredAt:  base,
11        ReceivedAt:  base,
12    }
13    second := first
14    second.ID = "evt_2"
15    second.OccurredAt = base.Add(2 * time.Minute)
16    second.ReceivedAt = base.Add(2 * time.Minute)
17
18    _ = deduper.Seen(first)
19    if deduper.Seen(second) {
20        t.Fatalf("event in different window should not be duplicate")
21    }
22}

窗口大小是一個業務取捨,測試可以讓這個取捨變成明確規格。

【執行】cleanup 測試不應 sleep

cleanup 測試的核心目標是確認過期 key 會被移除。測試應直接傳入時間,不要真的等待過期。

 1func TestDeduperCleanup(t *testing.T) {
 2    deduper := NewDeduper(time.Minute, time.Minute)
 3    base := time.Date(2026, 4, 22, 10, 0, 0, 0, time.UTC)
 4
 5    event := DomainEvent{
 6        ID:          "evt_1",
 7        Type:        EventNotificationCreated,
 8        SubjectKind: SubjectNotification,
 9        SubjectID:   "ntf_1",
10        OccurredAt:  base,
11        ReceivedAt:  base,
12    }
13
14    _ = deduper.Seen(event)
15    deduper.Cleanup(base.Add(2 * time.Minute))
16
17    if deduper.Seen(event) {
18        t.Fatalf("event should be accepted after cleanup")
19    }
20}

這個測試能快速完成,也不受機器速度影響。

重構步驟

把散落的去重邏輯收斂到 Deduper 時,可以按這個順序:

  1. 先列出所有入口目前的去重 key。
  2. 找出它們真正想表達的 domain 語意。
  3. 建立 DedupKey,使用 subject、event type 與時間窗口。
  4. 把 raw input 先 normalize 成 DomainEvent
  5. 在 processor 中呼叫 Deduper.Seen
  6. 移除 handler、worker 內的重複 map。
  7. 補同窗口、跨窗口、不同來源與 cleanup 測試。

不要一開始就把所有事件融合規則做完。先把「是否看過」集中,再處理 priority 或 replace policy。

設計檢查

檢查一:去重鍵使用 domain 語意

payload hash 對格式變化太敏感。欄位順序、metadata 或 timestamp 精度改變,都會讓同一件事看起來不同。

檢查二:事件順序使用 OccurredAt

ReceivedAt 是系統收到時間。事件是否同一件事,通常應看 OccurredAt 與 subject 語意。

檢查三:去重表需要 cleanup

任何「看過的 key」map 都會成長。沒有 cleanup 的 deduper 會在長時間服務中累積記憶體壓力。

檢查四:來源 priority 需要測試

若不同來源資料完整度不同,priority 是 domain policy。它應該有明確函式與測試,而不是散落在 processor 的條件判斷裡。

本章不處理

本章先處理單一服務內的去重規則如何集中;分散式去重與 idempotency store,會在下列章節再往外延伸:

和 Go 教材的關係

這一章承接的是 event normalization、processor 與 source priority;如果你要先回看語言教材,可以讀: