新增背景工作流程的核心規則是先定義生命週期,再定義資料流。worker 是有 context、輸入、輸出、錯誤處理與 shutdown 協定的長期元件。

本章目標

學完本章後,你將能夠:

  1. 判斷一段工作是否適合做成 worker
  2. Run(ctx) 設計 worker 生命週期
  3. 用 channel 和 ticker 表達資料流與週期性工作
  4. 處理 queue full、shutdown 與錯誤記錄
  5. 分開測試 SyncOnceRun(ctx) 與 channel 行為

【觀察】worker 是有生命週期的元件

worker 的核心定義是長時間運行、可被啟動、可被取消、會消費輸入或定期執行工作的元件。任意程式碼包進 go func() 只能產生背景 goroutine,還需要生命週期協定才會成為可維護的 worker。

適合做成 worker 的工作通常有三種:

工作類型範例worker 責任
queue consumer從 channel 讀取外部事件驗證、轉送 processor
periodic task每 30 秒同步一次外部狀態產生 command 或 event
cleanup task定期清理過期資料呼叫 repository 或 usecase 的清理方法

本章使用「通知同步 worker」作為範例。它定期向外部來源取得通知更新,轉成 domain event,再交給 EventProcessor 處理。

【判讀】worker 責任要先寫清楚

worker 責任的核心問題是它消費什麼、產生什麼、交給誰處理。worker 應聚焦在資料取得、格式轉換與轉交處理器,業務規則、狀態更新與 client 推送要留給對應的 usecase 或 processor。

先定義外部來源:

 1type NotificationSource interface {
 2    FetchUpdates(ctx context.Context) ([]RawNotificationUpdate, error)
 3}
 4
 5type RawNotificationUpdate struct {
 6    ID             string
 7    NotificationID string
 8    Topic          string
 9    Title          string
10    OccurredAt     time.Time
11}

再定義 worker 會呼叫的處理器:

1type EventProcessor interface {
2    Process(ctx context.Context, event DomainEvent) error
3}

worker 的責任是把外部更新取回來、normalize 成 DomainEvent、交給 processor。repository 寫入與推送規則仍然留在 processor 或 usecase 裡。

【策略】把單次工作獨立成 SyncOnce

worker 的核心設計技巧是把「單次工作」和「長時間迴圈」分開。SyncOnce 負責做一次同步,Run(ctx) 負責週期性呼叫它。

 1type SyncWorker struct {
 2    source    NotificationSource
 3    processor EventProcessor
 4    logger    *slog.Logger
 5}
 6
 7func NewSyncWorker(source NotificationSource, processor EventProcessor, logger *slog.Logger) *SyncWorker {
 8    return &SyncWorker{
 9        source:    source,
10        processor: processor,
11        logger:    logger,
12    }
13}

SyncOnce 可以像普通函式一樣測試:

 1func (w *SyncWorker) SyncOnce(ctx context.Context) error {
 2    updates, err := w.source.FetchUpdates(ctx)
 3    if err != nil {
 4        return fmt.Errorf("fetch notification updates: %w", err)
 5    }
 6
 7    for _, update := range updates {
 8        event, err := NormalizeNotificationUpdate(update, time.Now())
 9        if err != nil {
10            w.logger.Warn("skip invalid notification update", "id", update.ID, "error", err)
11            continue
12        }
13
14        if err := w.processor.Process(ctx, event); err != nil {
15            return fmt.Errorf("process notification update %s: %w", update.ID, err)
16        }
17    }
18
19    return nil
20}

這裡的 time.Now() 先展示基本寫法;如果測試需要固定時間,可以把 clock 注入進 worker。時間注入會在後面測試章節更完整處理。

【執行】Run(ctx) 管理長時間生命週期

Run(ctx) 的核心責任是等待 ticker、呼叫單次工作、尊重取消訊號。它應該在 context 被取消時退出,並釋放 ticker。

 1func (w *SyncWorker) Run(ctx context.Context, interval time.Duration) error {
 2    ticker := time.NewTicker(interval)
 3    defer ticker.Stop()
 4
 5    for {
 6        select {
 7        case <-ctx.Done():
 8            return ctx.Err()
 9        case <-ticker.C:
10            if err := w.SyncOnce(ctx); err != nil {
11                w.logger.Error("sync worker failed", "error", err)
12            }
13        }
14    }
15}

這個版本把單次同步錯誤記錄下來,但不讓 worker 退出。這是策略選擇:若外部來源短暫失敗,worker 可以等待下一輪;若錯誤代表設定失效或授權失效,則可以選擇 return error 讓上層重啟或停止服務。

worker 錯誤策略應該明確。暫時性錯誤通常要記錄後等待下一輪;致命設定錯誤則可以回傳給上層,讓服務決定重啟或停止。

【判讀】channel worker 要設計 backpressure

channel worker 的核心問題是接收端跟不上時要怎麼辦。buffer 大小、blocking send、non-blocking send 都是在回答 backpressure 策略。

假設外部 HTTP callback 會把 raw update 送進 worker queue:

 1type QueueWorker struct {
 2    updates   chan RawNotificationUpdate
 3    processor EventProcessor
 4    logger    *slog.Logger
 5}
 6
 7func NewQueueWorker(processor EventProcessor, logger *slog.Logger, bufferSize int) *QueueWorker {
 8    return &QueueWorker{
 9        updates:   make(chan RawNotificationUpdate, bufferSize),
10        processor: processor,
11        logger:    logger,
12    }
13}

送入 queue 可以選擇 blocking 或 non-blocking。若呼叫端不能被背景處理拖慢,可以用 non-blocking send 並回傳錯誤:

 1var ErrQueueFull = errors.New("notification update queue full")
 2
 3func (w *QueueWorker) Enqueue(update RawNotificationUpdate) error {
 4    select {
 5    case w.updates <- update:
 6        return nil
 7    default:
 8        return ErrQueueFull
 9    }
10}

這個設計很誠實:queue 滿了就是系統忙碌。上層可以記錄 log、回 503,或告訴 client 稍後重試。

【執行】queue worker 要同時監聽輸入與取消

queue worker 的核心生命週期是等待 update 或 context cancel。Run(ctx) 裡應用 select 同時處理這兩件事。

 1func (w *QueueWorker) Run(ctx context.Context) error {
 2    for {
 3        select {
 4        case <-ctx.Done():
 5            return ctx.Err()
 6        case update, ok := <-w.updates:
 7            if !ok {
 8                return nil
 9            }
10            if err := w.handleUpdate(ctx, update); err != nil {
11                w.logger.Error("handle notification update failed", "id", update.ID, "error", err)
12            }
13        }
14    }
15}

handleUpdate 負責單筆資料轉換與處理:

1func (w *QueueWorker) handleUpdate(ctx context.Context, update RawNotificationUpdate) error {
2    event, err := NormalizeNotificationUpdate(update, time.Now())
3    if err != nil {
4        return fmt.Errorf("normalize update: %w", err)
5    }
6    return w.processor.Process(ctx, event)
7}

這裡仍然遵守同一條邊界:worker 不直接改 repository,只把事件交給 processor。

【策略】shutdown 是否 drain queue 要先決定

shutdown 的核心決策是取消時要立刻停止,還是處理完 queue 中既有資料。兩種策略都合理,但語意不同。

策略做法適用情境
立即停止收到 ctx.Done() 就 return即時通知、可重試資料
drain queue停止接收新資料,處理已排隊資料不可輕易丟棄的資料

立即停止比較簡單:

1case <-ctx.Done():
2    return ctx.Err()

drain queue 則需要另一個停止接收的協定,例如由擁有送出端的一方關閉 channel,再讓 worker range 到 channel 關閉。channel close 的所有權要留在送出端,因為送出端最清楚是否還會送資料。

這裡的核心區分是「取消 context」和「關閉 channel」代表不同訊號。context 表示這件工作該停了;channel close 表示不會再有新資料。兩者可以搭配,但語意不同。

【判讀】worker 使用服務生命週期 context

worker context 的核心規則是:長時間 worker 使用服務生命週期 context,單次工作可以另外接收 request context。某個 HTTP request 的 context 只適合控制該次請求;request 結束後 context 會被取消。

1func (w *QueueWorker) Enqueue(update RawNotificationUpdate) error {
2    select {
3    case w.updates <- update:
4        return nil
5    default:
6        return ErrQueueFull
7    }
8}

Enqueue 不把 request context 存起來。真正處理 update 時,worker 使用自己的 Run(ctx) context 控制生命週期。

若某筆 update 需要保留 request IDcorrelation ID,應把它放進明確欄位,而不是依賴 context value 在背景工作中長期存在。

【執行】SyncOnce 測試要隔離時間與外部來源

SyncOnce 測試的核心目標是確認單次工作會把外部資料交給 processor。測試重點放在單次同步,不需要等待真實 ticker。

 1type fakeNotificationSource struct {
 2    updates []RawNotificationUpdate
 3    err     error
 4}
 5
 6func (f fakeNotificationSource) FetchUpdates(ctx context.Context) ([]RawNotificationUpdate, error) {
 7    if f.err != nil {
 8        return nil, f.err
 9    }
10    return f.updates, nil
11}
12
13type recordingProcessor struct {
14    events []DomainEvent
15}
16
17func (p *recordingProcessor) Process(ctx context.Context, event DomainEvent) error {
18    p.events = append(p.events, event)
19    return nil
20}

測試單次同步:

 1func TestSyncWorkerSyncOnce(t *testing.T) {
 2    source := fakeNotificationSource{
 3        updates: []RawNotificationUpdate{
 4            {
 5                ID:             "evt_1",
 6                NotificationID: "ntf_1",
 7                Topic:          "deployments",
 8                Title:          "Deploy finished",
 9                OccurredAt:     time.Date(2026, 4, 22, 10, 0, 0, 0, time.UTC),
10            },
11        },
12    }
13    processor := &recordingProcessor{}
14    worker := NewSyncWorker(source, processor, slog.Default())
15
16    if err := worker.SyncOnce(context.Background()); err != nil {
17        t.Fatalf("sync once: %v", err)
18    }
19
20    if len(processor.events) != 1 {
21        t.Fatalf("processed events = %d, want 1", len(processor.events))
22    }
23}

這個測試不需要 goroutine。先把單次工作測清楚,再測長時間生命週期。

【執行】Run(ctx) 測試要能快速取消

Run(ctx) 測試的核心目標是確認 worker 會尊重取消訊號。測試應該使用已取消或很短的 context,讓測試快速結束。

 1func TestSyncWorkerRunStopsWhenContextCanceled(t *testing.T) {
 2    source := fakeNotificationSource{}
 3    processor := &recordingProcessor{}
 4    worker := NewSyncWorker(source, processor, slog.Default())
 5
 6    ctx, cancel := context.WithCancel(context.Background())
 7    cancel()
 8
 9    err := worker.Run(ctx, time.Hour)
10    if !errors.Is(err, context.Canceled) {
11        t.Fatalf("error = %v, want context.Canceled", err)
12    }
13}

這個測試用 time.Hour 當 interval,因為 context 已經取消,Run 應該立刻退出,不需要等 ticker。

【執行】queue full 測試要固定 buffer

queue full 測試的核心目標是確認 backpressure 策略。buffer 設成 1,先塞滿,再確認第二次 enqueue 回錯。

 1func TestQueueWorkerEnqueueFull(t *testing.T) {
 2    worker := NewQueueWorker(&recordingProcessor{}, slog.Default(), 1)
 3
 4    err := worker.Enqueue(RawNotificationUpdate{ID: "evt_1"})
 5    if err != nil {
 6        t.Fatalf("first enqueue: %v", err)
 7    }
 8
 9    err = worker.Enqueue(RawNotificationUpdate{ID: "evt_2"})
10    if !errors.Is(err, ErrQueueFull) {
11        t.Fatalf("error = %v, want ErrQueueFull", err)
12    }
13}

這個測試不啟動 worker,所以 channel 裡第一筆資料不會被消費,第二筆必然遇到 full。這比用 sleep 製造滿載狀態穩定。

實作檢查清單

新增 background worker 時,可以依序檢查:

  1. worker 責任是否明確:消費什麼,產生什麼,交給誰
  2. 是否有 Run(ctx) 作為生命週期入口
  3. 單次工作是否拆成 SyncOncehandleUpdate
  4. worker 是否尊重 ctx.Done()
  5. ticker 是否 defer Stop()
  6. channel buffer 是否有明確 backpressure 策略
  7. queue full 是否回錯或記錄,而不是靜默丟棄
  8. worker 是否呼叫 usecase/processor,而不是直接改 repository
  9. 測試是否避免真實長時間 sleep

設計檢查

檢查一:goroutine 要有停止條件

長時間 goroutine 需要 context、channel close 或其他退出條件。缺少停止條件時,服務運行越久,越容易累積難以診斷的資源問題。

檢查二:worker 透過 processor 或 usecase 修改狀態

worker 透過 processor 或 usecase 修改狀態,可以讓背景流程和即時流程共用同一套規則。worker 直接改 repository 時,狀態規則容易分散。

檢查三:queue full 要有明確策略

資料可以丟時,應明確記錄 log 或 metric;資料需要保留時,應 blocking 或回錯。queue full 策略明確,後續追蹤才有依據。

檢查四:測試隔離真實時間

測試優先測 SyncOnce,再用已取消 context 測 Run 的退出行為。真實 ticker 等待數秒會讓測試慢且不穩。

本章不處理

本章先處理單一背景工作如何啟動、停止與回報;cron、queue、retry 與 outbox,會在下列章節再往外延伸:

和 Go 教材的關係

這一章承接的是 goroutine 生命週期與 event processing;如果你要先回看語言教材,可以讀: