6.4 如何新增背景工作流程
新增背景工作流程的核心規則是先定義生命週期,再定義資料流。worker 是有 context、輸入、輸出、錯誤處理與 shutdown 協定的長期元件。
本章目標
學完本章後,你將能夠:
- 判斷一段工作是否適合做成 worker
- 用
Run(ctx)設計 worker 生命週期 - 用 channel 和 ticker 表達資料流與週期性工作
- 處理 queue full、shutdown 與錯誤記錄
- 分開測試
SyncOnce、Run(ctx)與 channel 行為
【觀察】worker 是有生命週期的元件
worker 的核心定義是長時間運行、可被啟動、可被取消、會消費輸入或定期執行工作的元件。任意程式碼包進 go func() 只能產生背景 goroutine,還需要生命週期協定才會成為可維護的 worker。
適合做成 worker 的工作通常有三種:
| 工作類型 | 範例 | worker 責任 |
|---|---|---|
| queue consumer | 從 channel 讀取外部事件 | 驗證、轉送 processor |
| periodic task | 每 30 秒同步一次外部狀態 | 產生 command 或 event |
| cleanup task | 定期清理過期資料 | 呼叫 repository 或 usecase 的清理方法 |
本章使用「通知同步 worker」作為範例。它定期向外部來源取得通知更新,轉成 domain event,再交給 EventProcessor 處理。
【判讀】worker 責任要先寫清楚
worker 責任的核心問題是它消費什麼、產生什麼、交給誰處理。worker 應聚焦在資料取得、格式轉換與轉交處理器,業務規則、狀態更新與 client 推送要留給對應的 usecase 或 processor。
先定義外部來源:
1type NotificationSource interface {
2 FetchUpdates(ctx context.Context) ([]RawNotificationUpdate, error)
3}
4
5type RawNotificationUpdate struct {
6 ID string
7 NotificationID string
8 Topic string
9 Title string
10 OccurredAt time.Time
11}再定義 worker 會呼叫的處理器:
1type EventProcessor interface {
2 Process(ctx context.Context, event DomainEvent) error
3}worker 的責任是把外部更新取回來、normalize 成 DomainEvent、交給 processor。repository 寫入與推送規則仍然留在 processor 或 usecase 裡。
【策略】把單次工作獨立成 SyncOnce
worker 的核心設計技巧是把「單次工作」和「長時間迴圈」分開。SyncOnce 負責做一次同步,Run(ctx) 負責週期性呼叫它。
1type SyncWorker struct {
2 source NotificationSource
3 processor EventProcessor
4 logger *slog.Logger
5}
6
7func NewSyncWorker(source NotificationSource, processor EventProcessor, logger *slog.Logger) *SyncWorker {
8 return &SyncWorker{
9 source: source,
10 processor: processor,
11 logger: logger,
12 }
13}SyncOnce 可以像普通函式一樣測試:
1func (w *SyncWorker) SyncOnce(ctx context.Context) error {
2 updates, err := w.source.FetchUpdates(ctx)
3 if err != nil {
4 return fmt.Errorf("fetch notification updates: %w", err)
5 }
6
7 for _, update := range updates {
8 event, err := NormalizeNotificationUpdate(update, time.Now())
9 if err != nil {
10 w.logger.Warn("skip invalid notification update", "id", update.ID, "error", err)
11 continue
12 }
13
14 if err := w.processor.Process(ctx, event); err != nil {
15 return fmt.Errorf("process notification update %s: %w", update.ID, err)
16 }
17 }
18
19 return nil
20}這裡的 time.Now() 先展示基本寫法;如果測試需要固定時間,可以把 clock 注入進 worker。時間注入會在後面測試章節更完整處理。
【執行】Run(ctx) 管理長時間生命週期
Run(ctx) 的核心責任是等待 ticker、呼叫單次工作、尊重取消訊號。它應該在 context 被取消時退出,並釋放 ticker。
1func (w *SyncWorker) Run(ctx context.Context, interval time.Duration) error {
2 ticker := time.NewTicker(interval)
3 defer ticker.Stop()
4
5 for {
6 select {
7 case <-ctx.Done():
8 return ctx.Err()
9 case <-ticker.C:
10 if err := w.SyncOnce(ctx); err != nil {
11 w.logger.Error("sync worker failed", "error", err)
12 }
13 }
14 }
15}這個版本把單次同步錯誤記錄下來,但不讓 worker 退出。這是策略選擇:若外部來源短暫失敗,worker 可以等待下一輪;若錯誤代表設定失效或授權失效,則可以選擇 return error 讓上層重啟或停止服務。
worker 錯誤策略應該明確。暫時性錯誤通常要記錄後等待下一輪;致命設定錯誤則可以回傳給上層,讓服務決定重啟或停止。
【判讀】channel worker 要設計 backpressure
channel worker 的核心問題是接收端跟不上時要怎麼辦。buffer 大小、blocking send、non-blocking send 都是在回答 backpressure 策略。
假設外部 HTTP callback 會把 raw update 送進 worker queue:
1type QueueWorker struct {
2 updates chan RawNotificationUpdate
3 processor EventProcessor
4 logger *slog.Logger
5}
6
7func NewQueueWorker(processor EventProcessor, logger *slog.Logger, bufferSize int) *QueueWorker {
8 return &QueueWorker{
9 updates: make(chan RawNotificationUpdate, bufferSize),
10 processor: processor,
11 logger: logger,
12 }
13}送入 queue 可以選擇 blocking 或 non-blocking。若呼叫端不能被背景處理拖慢,可以用 non-blocking send 並回傳錯誤:
1var ErrQueueFull = errors.New("notification update queue full")
2
3func (w *QueueWorker) Enqueue(update RawNotificationUpdate) error {
4 select {
5 case w.updates <- update:
6 return nil
7 default:
8 return ErrQueueFull
9 }
10}這個設計很誠實:queue 滿了就是系統忙碌。上層可以記錄 log、回 503,或告訴 client 稍後重試。
【執行】queue worker 要同時監聽輸入與取消
queue worker 的核心生命週期是等待 update 或 context cancel。Run(ctx) 裡應用 select 同時處理這兩件事。
1func (w *QueueWorker) Run(ctx context.Context) error {
2 for {
3 select {
4 case <-ctx.Done():
5 return ctx.Err()
6 case update, ok := <-w.updates:
7 if !ok {
8 return nil
9 }
10 if err := w.handleUpdate(ctx, update); err != nil {
11 w.logger.Error("handle notification update failed", "id", update.ID, "error", err)
12 }
13 }
14 }
15}handleUpdate 負責單筆資料轉換與處理:
1func (w *QueueWorker) handleUpdate(ctx context.Context, update RawNotificationUpdate) error {
2 event, err := NormalizeNotificationUpdate(update, time.Now())
3 if err != nil {
4 return fmt.Errorf("normalize update: %w", err)
5 }
6 return w.processor.Process(ctx, event)
7}這裡仍然遵守同一條邊界:worker 不直接改 repository,只把事件交給 processor。
【策略】shutdown 是否 drain queue 要先決定
shutdown 的核心決策是取消時要立刻停止,還是處理完 queue 中既有資料。兩種策略都合理,但語意不同。
| 策略 | 做法 | 適用情境 |
|---|---|---|
| 立即停止 | 收到 ctx.Done() 就 return | 即時通知、可重試資料 |
| drain queue | 停止接收新資料,處理已排隊資料 | 不可輕易丟棄的資料 |
立即停止比較簡單:
1case <-ctx.Done():
2 return ctx.Err()drain queue 則需要另一個停止接收的協定,例如由擁有送出端的一方關閉 channel,再讓 worker range 到 channel 關閉。channel close 的所有權要留在送出端,因為送出端最清楚是否還會送資料。
這裡的核心區分是「取消 context」和「關閉 channel」代表不同訊號。context 表示這件工作該停了;channel close 表示不會再有新資料。兩者可以搭配,但語意不同。
【判讀】worker 使用服務生命週期 context
worker context 的核心規則是:長時間 worker 使用服務生命週期 context,單次工作可以另外接收 request context。某個 HTTP request 的 context 只適合控制該次請求;request 結束後 context 會被取消。
1func (w *QueueWorker) Enqueue(update RawNotificationUpdate) error {
2 select {
3 case w.updates <- update:
4 return nil
5 default:
6 return ErrQueueFull
7 }
8}Enqueue 不把 request context 存起來。真正處理 update 時,worker 使用自己的 Run(ctx) context 控制生命週期。
若某筆 update 需要保留 request ID 或 correlation ID,應把它放進明確欄位,而不是依賴 context value 在背景工作中長期存在。
【執行】SyncOnce 測試要隔離時間與外部來源
SyncOnce 測試的核心目標是確認單次工作會把外部資料交給 processor。測試重點放在單次同步,不需要等待真實 ticker。
1type fakeNotificationSource struct {
2 updates []RawNotificationUpdate
3 err error
4}
5
6func (f fakeNotificationSource) FetchUpdates(ctx context.Context) ([]RawNotificationUpdate, error) {
7 if f.err != nil {
8 return nil, f.err
9 }
10 return f.updates, nil
11}
12
13type recordingProcessor struct {
14 events []DomainEvent
15}
16
17func (p *recordingProcessor) Process(ctx context.Context, event DomainEvent) error {
18 p.events = append(p.events, event)
19 return nil
20}測試單次同步:
1func TestSyncWorkerSyncOnce(t *testing.T) {
2 source := fakeNotificationSource{
3 updates: []RawNotificationUpdate{
4 {
5 ID: "evt_1",
6 NotificationID: "ntf_1",
7 Topic: "deployments",
8 Title: "Deploy finished",
9 OccurredAt: time.Date(2026, 4, 22, 10, 0, 0, 0, time.UTC),
10 },
11 },
12 }
13 processor := &recordingProcessor{}
14 worker := NewSyncWorker(source, processor, slog.Default())
15
16 if err := worker.SyncOnce(context.Background()); err != nil {
17 t.Fatalf("sync once: %v", err)
18 }
19
20 if len(processor.events) != 1 {
21 t.Fatalf("processed events = %d, want 1", len(processor.events))
22 }
23}這個測試不需要 goroutine。先把單次工作測清楚,再測長時間生命週期。
【執行】Run(ctx) 測試要能快速取消
Run(ctx) 測試的核心目標是確認 worker 會尊重取消訊號。測試應該使用已取消或很短的 context,讓測試快速結束。
1func TestSyncWorkerRunStopsWhenContextCanceled(t *testing.T) {
2 source := fakeNotificationSource{}
3 processor := &recordingProcessor{}
4 worker := NewSyncWorker(source, processor, slog.Default())
5
6 ctx, cancel := context.WithCancel(context.Background())
7 cancel()
8
9 err := worker.Run(ctx, time.Hour)
10 if !errors.Is(err, context.Canceled) {
11 t.Fatalf("error = %v, want context.Canceled", err)
12 }
13}這個測試用 time.Hour 當 interval,因為 context 已經取消,Run 應該立刻退出,不需要等 ticker。
【執行】queue full 測試要固定 buffer
queue full 測試的核心目標是確認 backpressure 策略。buffer 設成 1,先塞滿,再確認第二次 enqueue 回錯。
1func TestQueueWorkerEnqueueFull(t *testing.T) {
2 worker := NewQueueWorker(&recordingProcessor{}, slog.Default(), 1)
3
4 err := worker.Enqueue(RawNotificationUpdate{ID: "evt_1"})
5 if err != nil {
6 t.Fatalf("first enqueue: %v", err)
7 }
8
9 err = worker.Enqueue(RawNotificationUpdate{ID: "evt_2"})
10 if !errors.Is(err, ErrQueueFull) {
11 t.Fatalf("error = %v, want ErrQueueFull", err)
12 }
13}這個測試不啟動 worker,所以 channel 裡第一筆資料不會被消費,第二筆必然遇到 full。這比用 sleep 製造滿載狀態穩定。
實作檢查清單
新增 background worker 時,可以依序檢查:
- worker 責任是否明確:消費什麼,產生什麼,交給誰
- 是否有
Run(ctx)作為生命週期入口 - 單次工作是否拆成
SyncOnce或handleUpdate - worker 是否尊重
ctx.Done() - ticker 是否
defer Stop() - channel buffer 是否有明確 backpressure 策略
- queue full 是否回錯或記錄,而不是靜默丟棄
- worker 是否呼叫 usecase/processor,而不是直接改 repository
- 測試是否避免真實長時間 sleep
設計檢查
檢查一:goroutine 要有停止條件
長時間 goroutine 需要 context、channel close 或其他退出條件。缺少停止條件時,服務運行越久,越容易累積難以診斷的資源問題。
檢查二:worker 透過 processor 或 usecase 修改狀態
worker 透過 processor 或 usecase 修改狀態,可以讓背景流程和即時流程共用同一套規則。worker 直接改 repository 時,狀態規則容易分散。
檢查三:queue full 要有明確策略
資料可以丟時,應明確記錄 log 或 metric;資料需要保留時,應 blocking 或回錯。queue full 策略明確,後續追蹤才有依據。
檢查四:測試隔離真實時間
測試優先測 SyncOnce,再用已取消 context 測 Run 的退出行為。真實 ticker 等待數秒會讓測試慢且不穩。
本章不處理
本章先處理單一背景工作如何啟動、停止與回報;cron、queue、retry 與 outbox,會在下列章節再往外延伸:
- Go 進階:bounded worker pool
- Go 進階:select loop 的生命週期設計
- Go 進階:Durable queue、outbox 與 idempotency
- Backend:訊息佇列與事件傳遞
和 Go 教材的關係
這一章承接的是 goroutine 生命週期與 event processing;如果你要先回看語言教材,可以讀: