2.3 訂閱模型與訊息路由
訂閱模型的核心目標是把 client action 轉成明確的連線狀態與回應訊息。WebSocket 是長連線,單次 action 失敗通常不應直接關閉連線;router 應把錯誤轉成可理解的 server message。
本章目標
學完本章後,你將能夠:
- 設計穩定的 client action envelope
- 把 router、handler、usecase 與 client state 分開
- 用訂閱集合表達 client 想收到的 topic
- 在 broadcast 前檢查訂閱狀態
- 測試 action routing、payload validation 與 error response
【觀察】client message 很容易變成臨時協定
WebSocket action 的核心風險是前後端快速加功能時,訊息格式變成一堆臨時欄位。若 action 名稱依賴按鈕、畫面或短期 UI 狀態,server 很快會累積難以維護的分支。
不穩定的訊息格式:
1{
2 "button": "watch",
3 "tab": "jobs",
4 "id": "topic_1"
5}這種訊息描述 UI 發生什麼,不是描述 client 想對服務做什麼。服務端應該接收穩定 action,例如 subscribe_topic、unsubscribe_topic、list_subscriptions。
【判讀】action 是 client intent
Client action 的核心語意是「client 想做什麼」。它不是 domain event,因為它還沒被驗證、授權或套用規則。Domain event 表示已經發生的事,action 表示請求。
1type ClientAction string
2
3const (
4 ActionSubscribeTopic ClientAction = "subscribe_topic"
5 ActionUnsubscribeTopic ClientAction = "unsubscribe_topic"
6 ActionListTopics ClientAction = "list_topics"
7)
8
9type ClientMessage struct {
10 Action ClientAction `json:"action"`
11 Data json.RawMessage `json:"data,omitempty"`
12}外層 envelope 穩定,內層 Data 依 action 解析。這讓 read pump 可以先解析 envelope,router 再依 action 決定 payload 型別。
【策略】router 負責分派,不擁有全部規則
Router 的核心責任是把 action 分派到對應 handler。它應該知道有哪些 action,但不應把訂閱規則、權限檢查、資料查詢全部塞在一個巨大 switch 裡。
1type Router struct {
2 subscriptions *SubscriptionService
3}
4
5func (r Router) Route(ctx context.Context, client *Client, message ClientMessage) error {
6 switch message.Action {
7 case ActionSubscribeTopic:
8 return r.handleSubscribe(ctx, client, message.Data)
9 case ActionUnsubscribeTopic:
10 return r.handleUnsubscribe(ctx, client, message.Data)
11 case ActionListTopics:
12 return r.handleListTopics(ctx, client)
13 default:
14 return fmt.Errorf("unknown action: %s", message.Action)
15 }
16}switch 讓支援的 action 集中可見。真正的訂閱狀態修改可以交給 SubscriptionService 或 client method,避免 router 變成所有規則的聚集地。
【執行】payload validation 在 action 邊界完成
Payload validation 的核心責任是讓內部服務只收到有效 command。訂閱 topic 至少要檢查 JSON 格式、topic 是否空白、topic 名稱是否符合規則。
1type SubscribeTopicRequest struct {
2 Topic string `json:"topic"`
3}
4
5type SubscribeTopicCommand struct {
6 ClientID string
7 Topic string
8}
9
10func (r Router) handleSubscribe(ctx context.Context, client *Client, raw json.RawMessage) error {
11 var req SubscribeTopicRequest
12 if err := json.Unmarshal(raw, &req); err != nil {
13 return fmt.Errorf("decode subscribe request: %w", err)
14 }
15
16 topic := strings.TrimSpace(req.Topic)
17 if topic == "" {
18 return fmt.Errorf("topic is required")
19 }
20
21 cmd := SubscribeTopicCommand{
22 ClientID: client.ID(),
23 Topic: topic,
24 }
25
26 return r.subscriptions.Subscribe(ctx, client, cmd)
27}Request struct 是 wire format,command 是內部意圖。兩者分開後,JSON 命名、驗證錯誤與內部服務規則不會混在同一個型別。
【執行】訂閱集合是連線狀態
訂閱集合的核心語意是「這個 client 目前想收到哪些 topic」。它可以放在 client 上,也可以由 hub 集中保存;重點是 owner 要明確。
Client owner 版本:
1type Client struct {
2 id string
3
4 mu sync.RWMutex
5 subscriptions map[string]struct{}
6}
7
8func (c *Client) Subscribe(topic string) {
9 c.mu.Lock()
10 defer c.mu.Unlock()
11 c.subscriptions[topic] = struct{}{}
12}
13
14func (c *Client) Unsubscribe(topic string) {
15 c.mu.Lock()
16 defer c.mu.Unlock()
17 delete(c.subscriptions, topic)
18}
19
20func (c *Client) IsSubscribed(topic string) bool {
21 c.mu.RLock()
22 defer c.mu.RUnlock()
23 _, ok := c.subscriptions[topic]
24 return ok
25}map[string]struct{} 是 Go 常見 set 表示法。若 read pump 修改訂閱,hub broadcast 讀取訂閱,就需要 lock 或把所有訂閱操作集中到 hub event loop。
【策略】訂閱狀態也需要 copy boundary
訂閱列表的核心風險是直接回傳 map 會暴露內部狀態。若需要列出目前訂閱,應回傳 slice 或 map copy。
1func (c *Client) Subscriptions() []string {
2 c.mu.RLock()
3 defer c.mu.RUnlock()
4
5 topics := make([]string, 0, len(c.subscriptions))
6 for topic := range c.subscriptions {
7 topics = append(topics, topic)
8 }
9 sort.Strings(topics)
10 return topics
11}回傳 sorted slice 讓測試更穩定,也避免呼叫端修改內部 map。排序不是業務必要條件,但對 API response 與測試可讀性有幫助。
【執行】成功與失敗都應回 server message
WebSocket action 的核心互動模式是 request-like,但連線不會因單次 action 結束。成功或失敗都應回一筆 server message,讓 client 能更新 UI 或顯示錯誤。
1type ServerMessage struct {
2 Type string `json:"type"`
3 Topic string `json:"topic,omitempty"`
4 Error string `json:"error,omitempty"`
5}
6
7func (s *SubscriptionService) Subscribe(ctx context.Context, client *Client, cmd SubscribeTopicCommand) error {
8 client.Subscribe(cmd.Topic)
9 ok := client.TrySend(ServerMessage{
10 Type: "topic_subscribed",
11 Topic: cmd.Topic,
12 })
13 if !ok {
14 return ErrClientQueueFull
15 }
16 return nil
17}若 action 失敗,read pump 或 router wrapper 可以把錯誤轉成 ServerMessage{Type: "error"}。不要只寫 server log,因為 client 需要知道該 action 沒有成功。
【執行】broadcast 前檢查訂閱
Broadcast 的核心規則是 producer 只產生 topic 與 message,hub 決定哪些 client 應該收到。訂閱邏輯不應散落在每個 producer 裡。
1func (h *Hub) Broadcast(topic string, message ServerMessage) {
2 for client := range h.clients {
3 if !client.IsSubscribed(topic) {
4 continue
5 }
6
7 if ok := client.TrySend(message); !ok {
8 h.unregister <- client
9 }
10 }
11}這段程式先檢查訂閱,再嘗試送出。若 client 的 send buffer 滿了,hub 可以 unregister 或採用其他慢 client 策略;下一章會專門處理。
【測試】router test 不需要真實 WebSocket
Router 的測試核心是 action 到行為的對應。它不需要真實 WebSocket connection,只需要 fake client 或檢查 client state。
1func TestSubscribeActionAddsTopic(t *testing.T) {
2 client := NewTestClient("client_1")
3 router := Router{subscriptions: NewSubscriptionService()}
4
5 data := json.RawMessage(`{"topic":"alerts"}`)
6 err := router.Route(context.Background(), client, ClientMessage{
7 Action: ActionSubscribeTopic,
8 Data: data,
9 })
10 if err != nil {
11 t.Fatalf("route subscribe: %v", err)
12 }
13
14 if !client.IsSubscribed("alerts") {
15 t.Fatalf("client should subscribe to alerts")
16 }
17}Payload validation 也應獨立測:
1func TestSubscribeActionRequiresTopic(t *testing.T) {
2 client := NewTestClient("client_1")
3 router := Router{subscriptions: NewSubscriptionService()}
4
5 err := router.Route(context.Background(), client, ClientMessage{
6 Action: ActionSubscribeTopic,
7 Data: json.RawMessage(`{"topic":" "}`),
8 })
9 if err == nil {
10 t.Fatalf("empty topic should return error")
11 }
12}WebSocket integration test 留給「真實 client/server 互動」;router 單元測試先確保協定語意正確。
本章不處理
本章先處理 action envelope 到 subscription 的路由與 ownership;授權、presence 與跨節點同步,會在下列章節延伸:
和 Go 教材的關係
這一章承接的是 WebSocket action、event fusion 與 handler boundary;如果你要先回看語言教材,可以讀:
- Go:如何新增一個即時訊息 action
- Go:如何新增一種 domain event
- Go:事件融合
- Go:把 handler 邏輯拆成可測單元
- Backend:快取與 Redis
- Backend:訊息佇列與事件傳遞
小結
訂閱模型把 client action 轉成連線狀態與 server response。Action 是 client intent,不是 domain event;router 負責分派,payload validation 在邊界完成,訂閱集合要有明確 owner,broadcast 由 hub 統一檢查訂閱。這樣新增 action 或 topic 時,修改範圍會清楚且可測。