新增 domain event 的核心流程是先定義事件語意,再決定哪些外部來源可以轉成這個事件。事件是系統內部對「發生了什麼」的穩定合約,常數清單只是其中一種表達方式。

本章目標

學完本章後,你將能夠:

  1. 判斷 event type 是否代表穩定 domain fact
  2. DomainEvent envelope 承接不同外部來源
  3. 把 raw input 轉成內部事件,而不是直接更新狀態
  4. 設計 validation 與 dedup key
  5. 把 normalize、processor 與 repository 測試分開

【觀察】event type 代表已經發生的事

domain event 的核心語意是「某件對系統有意義的事已經發生」。command 表達意圖,event 表達事實;兩者的命名與處理責任要分開。

例如:

名稱類型語意
subscribe_topiccommand/actionclient 想訂閱 topic
notification.createdevent一筆通知已建立
job.startedevent一個 job 已開始
account.status_changedeventaccount 狀態已改變

event type 應該用過去式或事實語氣。notification.creatednotification.create 更能表達事件語意。這個命名差異會影響後續設計:事件處理器應該處理已發生事實,授權與意圖判斷則留在 command/usecase。

用 typed constant 可以集中 event type:

1type EventType string
2
3const (
4    EventNotificationCreated EventType = "notification.created"
5    EventJobStarted          EventType = "job.started"
6    EventAccountStatusChanged EventType = "account.status_changed"
7)

若 event type 只是為了對應外部欄位名稱,還沒有內部語意,應先把外部 raw input normalize,再判斷它是否真的代表一個穩定事實。

【判讀】DomainEvent 是內部事件合約

DomainEvent 的核心責任是提供統一 envelope,讓不同來源的事件進入系統後使用同一種語意模型。HTTP callback、client action、timer 或 queue message 都可以被 adapter 轉成 DomainEvent

 1type EventSource string
 2
 3const (
 4    SourceClientCommand EventSource = "client_command"
 5    SourceHTTPCallback  EventSource = "http_callback"
 6    SourceTimer         EventSource = "timer"
 7)
 8
 9type SubjectKind string
10
11const (
12    SubjectNotification SubjectKind = "notification"
13    SubjectJob          SubjectKind = "job"
14    SubjectAccount      SubjectKind = "account"
15)
16
17type DomainEvent struct {
18    ID            string          `json:"id"`
19    Source        EventSource     `json:"source"`
20    Type          EventType       `json:"type"`
21    SubjectID     string          `json:"subjectId"`
22    SubjectKind   SubjectKind     `json:"subjectKind"`
23    CorrelationID string          `json:"correlationId,omitempty"`
24    CausationID   string          `json:"causationId,omitempty"`
25    OccurredAt    time.Time       `json:"occurredAt"`
26    ReceivedAt    time.Time       `json:"receivedAt"`
27    SchemaVersion int             `json:"schemaVersion"`
28    Payload       json.RawMessage `json:"payload,omitempty"`
29}

SubjectIDSubjectKind 用來描述事件作用在哪個對象上。OccurredAt 表示事件實際發生時間,ReceivedAt 表示系統收到事件的時間。這兩個時間不能混用,因為外部事件可能延遲送達。

CorrelationID 用來串起同一個使用者操作或 request 造成的一串事件。CausationID 用來記錄這筆事件是由哪個 command 或事件引起。初期可以先保留欄位,不必一開始就建立完整 tracing 系統。

【策略】payload 是補充資料

event envelope 的核心語意應該放在固定欄位。Payload 適合存事件特有資料,事件分類、主體、時間與來源則應維持在第一層欄位。

例如通知建立事件的 payload 可以是:

1type NotificationCreatedPayload struct {
2    Topic string `json:"topic"`
3    Title string `json:"title"`
4}

建立 event 時,把穩定欄位放在 envelope:

 1func NewNotificationCreatedEvent(id string, notificationID string, payload NotificationCreatedPayload, now time.Time) (DomainEvent, error) {
 2    data, err := json.Marshal(payload)
 3    if err != nil {
 4        return DomainEvent{}, fmt.Errorf("marshal notification payload: %w", err)
 5    }
 6
 7    return DomainEvent{
 8        ID:            id,
 9        Source:        SourceClientCommand,
10        Type:          EventNotificationCreated,
11        SubjectID:     notificationID,
12        SubjectKind:   SubjectNotification,
13        OccurredAt:    now,
14        ReceivedAt:    now,
15        SchemaVersion: 1,
16        Payload:       data,
17    }, nil
18}

這個函式把事件建立規則集中起來。未來若要補 schema version、correlation ID 或預設時間,也不需要在每個呼叫端重複組 struct。

【執行】adapter 負責 raw input 轉換

adapter 的核心責任是把外部格式轉成內部事件。repository 更新交給 processor 或 usecase,raw payload 也應先轉成內部穩定模型再進入 domain layer。

假設外部 HTTP callback 長這樣:

1type RawNotificationCallback struct {
2    EventID        string `json:"event_id"`
3    NotificationID string `json:"notification_id"`
4    EventName      string `json:"event_name"`
5    Topic          string `json:"topic"`
6    Title          string `json:"title"`
7    Timestamp      string `json:"timestamp"`
8}

normalize 函式可以把它轉成 DomainEvent

 1func NormalizeNotificationCallback(raw RawNotificationCallback, receivedAt time.Time) (DomainEvent, error) {
 2    occurredAt, err := time.Parse(time.RFC3339, raw.Timestamp)
 3    if err != nil {
 4        return DomainEvent{}, fmt.Errorf("parse callback timestamp: %w", err)
 5    }
 6
 7    payload := NotificationCreatedPayload{
 8        Topic: raw.Topic,
 9        Title: raw.Title,
10    }
11
12    data, err := json.Marshal(payload)
13    if err != nil {
14        return DomainEvent{}, fmt.Errorf("marshal callback payload: %w", err)
15    }
16
17    event := DomainEvent{
18        ID:            raw.EventID,
19        Source:        SourceHTTPCallback,
20        Type:          EventNotificationCreated,
21        SubjectID:     raw.NotificationID,
22        SubjectKind:   SubjectNotification,
23        OccurredAt:    occurredAt,
24        ReceivedAt:    receivedAt,
25        SchemaVersion: 1,
26        Payload:       data,
27    }
28
29    if err := event.Validate(); err != nil {
30        return DomainEvent{}, err
31    }
32
33    return event, nil
34}

這段程式把外部欄位名稱、時間格式與 payload 組裝限制在 adapter 內。下游 processor 只需要理解 DomainEvent,不需要知道 callback 原始格式。

【判讀】validation 保護事件合約

event validation 的核心目標是確保事件語意完整。缺少 ID、type、subject 或時間的 event 進入 processor 後,狀態更新與去重都會失去依據。

 1func (e DomainEvent) Validate() error {
 2    if strings.TrimSpace(e.ID) == "" {
 3        return fmt.Errorf("event id is required")
 4    }
 5    if e.Type == "" {
 6        return fmt.Errorf("event type is required")
 7    }
 8    if strings.TrimSpace(e.SubjectID) == "" {
 9        return fmt.Errorf("subject id is required")
10    }
11    if e.SubjectKind == "" {
12        return fmt.Errorf("subject kind is required")
13    }
14    if e.OccurredAt.IsZero() {
15        return fmt.Errorf("occurred at is required")
16    }
17    if e.ReceivedAt.IsZero() {
18        return fmt.Errorf("received at is required")
19    }
20    if e.SchemaVersion <= 0 {
21        return fmt.Errorf("schema version must be positive")
22    }
23    return nil
24}

validation 應該檢查事件 envelope 的基本合約。更細的 payload 規則可以在 normalize 或 processor 中處理,依資料來源與用途決定。

【策略】去重鍵應建立在 domain 語意上

event dedup 的核心規則是使用語意鍵。不同來源可能用不同格式描述同一件事,但只要 subject、type 與時間窗口相同,就可能是重複事件。

 1type DedupKey struct {
 2    SubjectKind SubjectKind
 3    SubjectID   string
 4    Type        EventType
 5    Window      int64
 6}
 7
 8func NewDedupKey(event DomainEvent, window time.Duration) DedupKey {
 9    return DedupKey{
10        SubjectKind: event.SubjectKind,
11        SubjectID:   event.SubjectID,
12        Type:        event.Type,
13        Window:      event.OccurredAt.UnixNano() / int64(window),
14    }
15}

去重鍵應排除 ReceivedAt、raw metadata 或 request ID 這類每次都可能不同的欄位。那些欄位適合記錄觀測資訊,不適合作為「是否同一件事」的判斷依據。

若事件 ID 由可靠上游產生,可以優先用 event ID 去重。若上游 ID 不穩定,才需要 domain dedup key。這個選擇應該寫成明確規則,讓 map key 的組成方式對應可理解的去重語意。

【執行】processor 負責套用事件規則

event processor 的核心責任是驗證、去重、更新狀態與發布結果。processor 不負責讀 HTTP request,也不負責解析 WebSocket message。

 1type EventRepository interface {
 2    Apply(ctx context.Context, event DomainEvent) error
 3}
 4
 5type EventPublisher interface {
 6    Publish(ctx context.Context, event DomainEvent) error
 7}
 8
 9type Deduper interface {
10    Seen(event DomainEvent) bool
11}

processor 可以依賴這些小介面:

 1type EventProcessor struct {
 2    repository EventRepository
 3    publisher  EventPublisher
 4    deduper    Deduper
 5}
 6
 7func NewEventProcessor(repository EventRepository, publisher EventPublisher, deduper Deduper) *EventProcessor {
 8    return &EventProcessor{
 9        repository: repository,
10        publisher:  publisher,
11        deduper:    deduper,
12    }
13}

處理流程保持短而明確:

 1func (p *EventProcessor) Process(ctx context.Context, event DomainEvent) error {
 2    if err := event.Validate(); err != nil {
 3        return fmt.Errorf("validate event: %w", err)
 4    }
 5
 6    if p.deduper.Seen(event) {
 7        return nil
 8    }
 9
10    if err := p.repository.Apply(ctx, event); err != nil {
11        return fmt.Errorf("apply event: %w", err)
12    }
13
14    if err := p.publisher.Publish(ctx, event); err != nil {
15        return fmt.Errorf("publish event: %w", err)
16    }
17
18    return nil
19}

這個 processor 不知道資料來自哪裡,也不知道 repository 是 memory map 還是資料庫。這種邊界讓新增 event source 時不需要重寫狀態規則。

【執行】normalize 測試要固定外部輸入

normalize 測試的核心目標是確認 raw input 會被轉成正確內部事件。這類測試應該固定時間,避免測試依賴真實現在時間。

 1func TestNormalizeNotificationCallback(t *testing.T) {
 2    receivedAt := time.Date(2026, 4, 22, 10, 1, 0, 0, time.UTC)
 3    raw := RawNotificationCallback{
 4        EventID:        "evt_1",
 5        NotificationID: "ntf_1",
 6        EventName:      "notification_created",
 7        Topic:          "deployments",
 8        Title:          "Deploy finished",
 9        Timestamp:      "2026-04-22T10:00:00Z",
10    }
11
12    event, err := NormalizeNotificationCallback(raw, receivedAt)
13    if err != nil {
14        t.Fatalf("normalize callback: %v", err)
15    }
16
17    if event.Type != EventNotificationCreated {
18        t.Fatalf("event type = %q, want %q", event.Type, EventNotificationCreated)
19    }
20    if event.SubjectID != "ntf_1" {
21        t.Fatalf("subject ID = %q, want %q", event.SubjectID, "ntf_1")
22    }
23    if !event.ReceivedAt.Equal(receivedAt) {
24        t.Fatalf("received at = %v, want %v", event.ReceivedAt, receivedAt)
25    }
26}

這個測試不需要 repository,也不需要 publisher。它只保護 adapter 的轉換規則。

【執行】processor 測試要隔離外部來源

processor 測試的核心目標是確認事件規則被正確套用。測試應該直接建立 DomainEvent,讓 HTTP 或 WebSocket 解析留在 adapter 測試。

 1type fakeEventRepository struct {
 2    applied []DomainEvent
 3}
 4
 5func (f *fakeEventRepository) Apply(ctx context.Context, event DomainEvent) error {
 6    f.applied = append(f.applied, event)
 7    return nil
 8}
 9
10type fakeEventPublisher struct {
11    published []DomainEvent
12}
13
14func (f *fakeEventPublisher) Publish(ctx context.Context, event DomainEvent) error {
15    f.published = append(f.published, event)
16    return nil
17}
18
19type neverSeenDeduper struct{}
20
21func (neverSeenDeduper) Seen(event DomainEvent) bool {
22    return false
23}

成功案例可以確認 repository 與 publisher 都被呼叫:

 1func TestEventProcessorProcess(t *testing.T) {
 2    repo := &fakeEventRepository{}
 3    publisher := &fakeEventPublisher{}
 4    processor := NewEventProcessor(repo, publisher, neverSeenDeduper{})
 5
 6    event := DomainEvent{
 7        ID:            "evt_1",
 8        Source:        SourceHTTPCallback,
 9        Type:          EventNotificationCreated,
10        SubjectID:     "ntf_1",
11        SubjectKind:   SubjectNotification,
12        OccurredAt:    time.Date(2026, 4, 22, 10, 0, 0, 0, time.UTC),
13        ReceivedAt:    time.Date(2026, 4, 22, 10, 1, 0, 0, time.UTC),
14        SchemaVersion: 1,
15    }
16
17    if err := processor.Process(context.Background(), event); err != nil {
18        t.Fatalf("process event: %v", err)
19    }
20
21    if len(repo.applied) != 1 {
22        t.Fatalf("applied events = %d, want 1", len(repo.applied))
23    }
24    if len(publisher.published) != 1 {
25        t.Fatalf("published events = %d, want 1", len(publisher.published))
26    }
27}

processor 測試使用 fake repository、publisher 或 deduper,就能隔離事件規則。真實 message broker 或資料庫屬於 adapter integration test。

實作檢查清單

新增 domain event 時,可以依序檢查:

  1. event type 是否描述已經發生的事
  2. event type 是否是內部穩定語意,而不是外部欄位名稱
  3. envelope 是否包含 source、type、subject、occurred/received time
  4. payload 是否只放事件特有補充資料
  5. raw input 是否先經過 adapter normalize
  6. adapter 是否不直接更新 repository
  7. validation 是否保護事件基本合約
  8. dedup key 是否建立在 domain 語意上
  9. normalize、processor、repository 是否分開測試

設計檢查

檢查一:command 和 event 分開命名

create_notification 是想做某件事,notification.created 是某件事已發生。把兩者混在一起,會讓 processor 不清楚自己是在判斷授權、執行行為,還是在套用事實。

檢查二:raw payload 停在 adapter

raw payload 會帶有外部來源格式、命名與缺漏。domain layer 應該面對內部穩定模型,外部格式應該停在 adapter。

檢查三:穩定欄位放在 envelope

如果 type、subject、time 都藏在 payload,processor、deduper、repository 都必須解析 payload 才能做事。這會讓事件系統難測,也難以演進 schema。

檢查四:事件順序使用發生時間

ReceivedAt 是系統看到事件的時間,不一定是事件發生時間。狀態轉移通常應該優先看 OccurredAt,再根據延遲與來源可靠度設計補償規則。

本章不處理

本章先處理 domain event 的定義、轉換與發布;event store 與 broker 傳遞,會在下列章節再往外延伸:

和 Go 教材的關係

這一章承接的是 event envelope、normalize 與 processor;如果你要先回看語言教材,可以讀: