6.2 如何新增一種 domain event
新增 domain event 的核心流程是先定義事件語意,再決定哪些外部來源可以轉成這個事件。事件是系統內部對「發生了什麼」的穩定合約,常數清單只是其中一種表達方式。
本章目標
學完本章後,你將能夠:
- 判斷 event type 是否代表穩定 domain fact
- 用
DomainEventenvelope 承接不同外部來源 - 把 raw input 轉成內部事件,而不是直接更新狀態
- 設計 validation 與 dedup key
- 把 normalize、processor 與 repository 測試分開
【觀察】event type 代表已經發生的事
domain event 的核心語意是「某件對系統有意義的事已經發生」。command 表達意圖,event 表達事實;兩者的命名與處理責任要分開。
例如:
| 名稱 | 類型 | 語意 |
|---|---|---|
subscribe_topic | command/action | client 想訂閱 topic |
notification.created | event | 一筆通知已建立 |
job.started | event | 一個 job 已開始 |
account.status_changed | event | account 狀態已改變 |
event type 應該用過去式或事實語氣。notification.created 比 notification.create 更能表達事件語意。這個命名差異會影響後續設計:事件處理器應該處理已發生事實,授權與意圖判斷則留在 command/usecase。
用 typed constant 可以集中 event type:
1type EventType string
2
3const (
4 EventNotificationCreated EventType = "notification.created"
5 EventJobStarted EventType = "job.started"
6 EventAccountStatusChanged EventType = "account.status_changed"
7)若 event type 只是為了對應外部欄位名稱,還沒有內部語意,應先把外部 raw input normalize,再判斷它是否真的代表一個穩定事實。
【判讀】DomainEvent 是內部事件合約
DomainEvent 的核心責任是提供統一 envelope,讓不同來源的事件進入系統後使用同一種語意模型。HTTP callback、client action、timer 或 queue message 都可以被 adapter 轉成 DomainEvent。
1type EventSource string
2
3const (
4 SourceClientCommand EventSource = "client_command"
5 SourceHTTPCallback EventSource = "http_callback"
6 SourceTimer EventSource = "timer"
7)
8
9type SubjectKind string
10
11const (
12 SubjectNotification SubjectKind = "notification"
13 SubjectJob SubjectKind = "job"
14 SubjectAccount SubjectKind = "account"
15)
16
17type DomainEvent struct {
18 ID string `json:"id"`
19 Source EventSource `json:"source"`
20 Type EventType `json:"type"`
21 SubjectID string `json:"subjectId"`
22 SubjectKind SubjectKind `json:"subjectKind"`
23 CorrelationID string `json:"correlationId,omitempty"`
24 CausationID string `json:"causationId,omitempty"`
25 OccurredAt time.Time `json:"occurredAt"`
26 ReceivedAt time.Time `json:"receivedAt"`
27 SchemaVersion int `json:"schemaVersion"`
28 Payload json.RawMessage `json:"payload,omitempty"`
29}SubjectID 和 SubjectKind 用來描述事件作用在哪個對象上。OccurredAt 表示事件實際發生時間,ReceivedAt 表示系統收到事件的時間。這兩個時間不能混用,因為外部事件可能延遲送達。
CorrelationID 用來串起同一個使用者操作或 request 造成的一串事件。CausationID 用來記錄這筆事件是由哪個 command 或事件引起。初期可以先保留欄位,不必一開始就建立完整 tracing 系統。
【策略】payload 是補充資料
event envelope 的核心語意應該放在固定欄位。Payload 適合存事件特有資料,事件分類、主體、時間與來源則應維持在第一層欄位。
例如通知建立事件的 payload 可以是:
1type NotificationCreatedPayload struct {
2 Topic string `json:"topic"`
3 Title string `json:"title"`
4}建立 event 時,把穩定欄位放在 envelope:
1func NewNotificationCreatedEvent(id string, notificationID string, payload NotificationCreatedPayload, now time.Time) (DomainEvent, error) {
2 data, err := json.Marshal(payload)
3 if err != nil {
4 return DomainEvent{}, fmt.Errorf("marshal notification payload: %w", err)
5 }
6
7 return DomainEvent{
8 ID: id,
9 Source: SourceClientCommand,
10 Type: EventNotificationCreated,
11 SubjectID: notificationID,
12 SubjectKind: SubjectNotification,
13 OccurredAt: now,
14 ReceivedAt: now,
15 SchemaVersion: 1,
16 Payload: data,
17 }, nil
18}這個函式把事件建立規則集中起來。未來若要補 schema version、correlation ID 或預設時間,也不需要在每個呼叫端重複組 struct。
【執行】adapter 負責 raw input 轉換
adapter 的核心責任是把外部格式轉成內部事件。repository 更新交給 processor 或 usecase,raw payload 也應先轉成內部穩定模型再進入 domain layer。
假設外部 HTTP callback 長這樣:
1type RawNotificationCallback struct {
2 EventID string `json:"event_id"`
3 NotificationID string `json:"notification_id"`
4 EventName string `json:"event_name"`
5 Topic string `json:"topic"`
6 Title string `json:"title"`
7 Timestamp string `json:"timestamp"`
8}normalize 函式可以把它轉成 DomainEvent:
1func NormalizeNotificationCallback(raw RawNotificationCallback, receivedAt time.Time) (DomainEvent, error) {
2 occurredAt, err := time.Parse(time.RFC3339, raw.Timestamp)
3 if err != nil {
4 return DomainEvent{}, fmt.Errorf("parse callback timestamp: %w", err)
5 }
6
7 payload := NotificationCreatedPayload{
8 Topic: raw.Topic,
9 Title: raw.Title,
10 }
11
12 data, err := json.Marshal(payload)
13 if err != nil {
14 return DomainEvent{}, fmt.Errorf("marshal callback payload: %w", err)
15 }
16
17 event := DomainEvent{
18 ID: raw.EventID,
19 Source: SourceHTTPCallback,
20 Type: EventNotificationCreated,
21 SubjectID: raw.NotificationID,
22 SubjectKind: SubjectNotification,
23 OccurredAt: occurredAt,
24 ReceivedAt: receivedAt,
25 SchemaVersion: 1,
26 Payload: data,
27 }
28
29 if err := event.Validate(); err != nil {
30 return DomainEvent{}, err
31 }
32
33 return event, nil
34}這段程式把外部欄位名稱、時間格式與 payload 組裝限制在 adapter 內。下游 processor 只需要理解 DomainEvent,不需要知道 callback 原始格式。
【判讀】validation 保護事件合約
event validation 的核心目標是確保事件語意完整。缺少 ID、type、subject 或時間的 event 進入 processor 後,狀態更新與去重都會失去依據。
1func (e DomainEvent) Validate() error {
2 if strings.TrimSpace(e.ID) == "" {
3 return fmt.Errorf("event id is required")
4 }
5 if e.Type == "" {
6 return fmt.Errorf("event type is required")
7 }
8 if strings.TrimSpace(e.SubjectID) == "" {
9 return fmt.Errorf("subject id is required")
10 }
11 if e.SubjectKind == "" {
12 return fmt.Errorf("subject kind is required")
13 }
14 if e.OccurredAt.IsZero() {
15 return fmt.Errorf("occurred at is required")
16 }
17 if e.ReceivedAt.IsZero() {
18 return fmt.Errorf("received at is required")
19 }
20 if e.SchemaVersion <= 0 {
21 return fmt.Errorf("schema version must be positive")
22 }
23 return nil
24}validation 應該檢查事件 envelope 的基本合約。更細的 payload 規則可以在 normalize 或 processor 中處理,依資料來源與用途決定。
【策略】去重鍵應建立在 domain 語意上
event dedup 的核心規則是使用語意鍵。不同來源可能用不同格式描述同一件事,但只要 subject、type 與時間窗口相同,就可能是重複事件。
1type DedupKey struct {
2 SubjectKind SubjectKind
3 SubjectID string
4 Type EventType
5 Window int64
6}
7
8func NewDedupKey(event DomainEvent, window time.Duration) DedupKey {
9 return DedupKey{
10 SubjectKind: event.SubjectKind,
11 SubjectID: event.SubjectID,
12 Type: event.Type,
13 Window: event.OccurredAt.UnixNano() / int64(window),
14 }
15}去重鍵應排除 ReceivedAt、raw metadata 或 request ID 這類每次都可能不同的欄位。那些欄位適合記錄觀測資訊,不適合作為「是否同一件事」的判斷依據。
若事件 ID 由可靠上游產生,可以優先用 event ID 去重。若上游 ID 不穩定,才需要 domain dedup key。這個選擇應該寫成明確規則,讓 map key 的組成方式對應可理解的去重語意。
【執行】processor 負責套用事件規則
event processor 的核心責任是驗證、去重、更新狀態與發布結果。processor 不負責讀 HTTP request,也不負責解析 WebSocket message。
1type EventRepository interface {
2 Apply(ctx context.Context, event DomainEvent) error
3}
4
5type EventPublisher interface {
6 Publish(ctx context.Context, event DomainEvent) error
7}
8
9type Deduper interface {
10 Seen(event DomainEvent) bool
11}processor 可以依賴這些小介面:
1type EventProcessor struct {
2 repository EventRepository
3 publisher EventPublisher
4 deduper Deduper
5}
6
7func NewEventProcessor(repository EventRepository, publisher EventPublisher, deduper Deduper) *EventProcessor {
8 return &EventProcessor{
9 repository: repository,
10 publisher: publisher,
11 deduper: deduper,
12 }
13}處理流程保持短而明確:
1func (p *EventProcessor) Process(ctx context.Context, event DomainEvent) error {
2 if err := event.Validate(); err != nil {
3 return fmt.Errorf("validate event: %w", err)
4 }
5
6 if p.deduper.Seen(event) {
7 return nil
8 }
9
10 if err := p.repository.Apply(ctx, event); err != nil {
11 return fmt.Errorf("apply event: %w", err)
12 }
13
14 if err := p.publisher.Publish(ctx, event); err != nil {
15 return fmt.Errorf("publish event: %w", err)
16 }
17
18 return nil
19}這個 processor 不知道資料來自哪裡,也不知道 repository 是 memory map 還是資料庫。這種邊界讓新增 event source 時不需要重寫狀態規則。
【執行】normalize 測試要固定外部輸入
normalize 測試的核心目標是確認 raw input 會被轉成正確內部事件。這類測試應該固定時間,避免測試依賴真實現在時間。
1func TestNormalizeNotificationCallback(t *testing.T) {
2 receivedAt := time.Date(2026, 4, 22, 10, 1, 0, 0, time.UTC)
3 raw := RawNotificationCallback{
4 EventID: "evt_1",
5 NotificationID: "ntf_1",
6 EventName: "notification_created",
7 Topic: "deployments",
8 Title: "Deploy finished",
9 Timestamp: "2026-04-22T10:00:00Z",
10 }
11
12 event, err := NormalizeNotificationCallback(raw, receivedAt)
13 if err != nil {
14 t.Fatalf("normalize callback: %v", err)
15 }
16
17 if event.Type != EventNotificationCreated {
18 t.Fatalf("event type = %q, want %q", event.Type, EventNotificationCreated)
19 }
20 if event.SubjectID != "ntf_1" {
21 t.Fatalf("subject ID = %q, want %q", event.SubjectID, "ntf_1")
22 }
23 if !event.ReceivedAt.Equal(receivedAt) {
24 t.Fatalf("received at = %v, want %v", event.ReceivedAt, receivedAt)
25 }
26}這個測試不需要 repository,也不需要 publisher。它只保護 adapter 的轉換規則。
【執行】processor 測試要隔離外部來源
processor 測試的核心目標是確認事件規則被正確套用。測試應該直接建立 DomainEvent,讓 HTTP 或 WebSocket 解析留在 adapter 測試。
1type fakeEventRepository struct {
2 applied []DomainEvent
3}
4
5func (f *fakeEventRepository) Apply(ctx context.Context, event DomainEvent) error {
6 f.applied = append(f.applied, event)
7 return nil
8}
9
10type fakeEventPublisher struct {
11 published []DomainEvent
12}
13
14func (f *fakeEventPublisher) Publish(ctx context.Context, event DomainEvent) error {
15 f.published = append(f.published, event)
16 return nil
17}
18
19type neverSeenDeduper struct{}
20
21func (neverSeenDeduper) Seen(event DomainEvent) bool {
22 return false
23}成功案例可以確認 repository 與 publisher 都被呼叫:
1func TestEventProcessorProcess(t *testing.T) {
2 repo := &fakeEventRepository{}
3 publisher := &fakeEventPublisher{}
4 processor := NewEventProcessor(repo, publisher, neverSeenDeduper{})
5
6 event := DomainEvent{
7 ID: "evt_1",
8 Source: SourceHTTPCallback,
9 Type: EventNotificationCreated,
10 SubjectID: "ntf_1",
11 SubjectKind: SubjectNotification,
12 OccurredAt: time.Date(2026, 4, 22, 10, 0, 0, 0, time.UTC),
13 ReceivedAt: time.Date(2026, 4, 22, 10, 1, 0, 0, time.UTC),
14 SchemaVersion: 1,
15 }
16
17 if err := processor.Process(context.Background(), event); err != nil {
18 t.Fatalf("process event: %v", err)
19 }
20
21 if len(repo.applied) != 1 {
22 t.Fatalf("applied events = %d, want 1", len(repo.applied))
23 }
24 if len(publisher.published) != 1 {
25 t.Fatalf("published events = %d, want 1", len(publisher.published))
26 }
27}processor 測試使用 fake repository、publisher 或 deduper,就能隔離事件規則。真實 message broker 或資料庫屬於 adapter integration test。
實作檢查清單
新增 domain event 時,可以依序檢查:
- event type 是否描述已經發生的事
- event type 是否是內部穩定語意,而不是外部欄位名稱
- envelope 是否包含 source、type、subject、occurred/received time
- payload 是否只放事件特有補充資料
- raw input 是否先經過 adapter normalize
- adapter 是否不直接更新 repository
- validation 是否保護事件基本合約
- dedup key 是否建立在 domain 語意上
- normalize、processor、repository 是否分開測試
設計檢查
檢查一:command 和 event 分開命名
create_notification 是想做某件事,notification.created 是某件事已發生。把兩者混在一起,會讓 processor 不清楚自己是在判斷授權、執行行為,還是在套用事實。
檢查二:raw payload 停在 adapter
raw payload 會帶有外部來源格式、命名與缺漏。domain layer 應該面對內部穩定模型,外部格式應該停在 adapter。
檢查三:穩定欄位放在 envelope
如果 type、subject、time 都藏在 payload,processor、deduper、repository 都必須解析 payload 才能做事。這會讓事件系統難測,也難以演進 schema。
檢查四:事件順序使用發生時間
ReceivedAt 是系統看到事件的時間,不一定是事件發生時間。狀態轉移通常應該優先看 OccurredAt,再根據延遲與來源可靠度設計補償規則。
本章不處理
本章先處理 domain event 的定義、轉換與發布;event store 與 broker 傳遞,會在下列章節再往外延伸:
和 Go 教材的關係
這一章承接的是 event envelope、normalize 與 processor;如果你要先回看語言教材,可以讀: