MongoDB change streams 是 3.6+ 原生 CDC 介面、本質上是 oplog tail 包裝成 cursor API。Application 從 dual-write 模式(自己寫 MongoDB 又寫 Elasticsearch / Redis / data warehouse)換成 change stream → Kafka → downstream sink 後、有了第一版 CDC pipeline、但連續工作幾週後出現「downstream 漏 event」或「duplicate event」;最痛的是 connector restart 後 resume token 過期(oplog 已滾掉)、整個 collection 必須重灌。本文把 change stream 機制、Kafka Connector 配置、resume token 治理、sharded cluster scope 選擇講清楚。

本文不重複 MongoDB vendor overview 已寫過的 change streams 簡介 — 而是 production CDC pipeline 部署 + 失敗修復的實作層教學。

MongoDB 適用度前置判讀:進到 CDC pipeline 設計前先確認 workload 在 MongoDB 適用區(document shape 主導 / contract layer 該放哪 / 跨雲 hedging 是否需要)— 詳見 schema-design-pattern 開頭 3 軸前置判讀、本篇不重複展開。Change streams 是 已選 MongoDB 後 的 event-driven 整合議題。

問題情境:第一版 CDC pipeline 跑幾週的踩雷

典型觸發場景:application 寫 MongoDB 後還要 dual-write Elasticsearch / Redis / data warehouse、application code 越塞越多 hook、寫入失敗的補償邏輯散落各處。改用 change stream → Kafka → downstream sink 後、有了第一版 CDC pipeline、但連續工作幾週後出現:

  • Downstream 漏 event 或 duplicate event
  • Connector restart 後 resume token 過期(oplog 已滾掉)、整個 collection 必須重灌
  • Sharded cluster 上 collection-level change stream 跟 cluster-wide change stream 行為不同、application 連 mongos 跟連 single shard 拿到不同 event

讀者徵兆:

  • MongoDB Kafka Connector log ChangeStreamHistoryLostResumeTokenChanged
  • Downstream Kafka topic event count vs source collection write count 不平
  • Replication oplog 跟 change stream consumer 的 lag 同時升

Case anchor:CDC pipeline resume token 過期導致全量重灌的具體 incident 細節需未來 case 補完、本文以「常見 failure pattern」+ 容量公式處理、不憑空編造 incident 數字。側面引用 Spotify Kafka → PubSub migration(pipeline-level migration 經驗對照)。

核心機制

Change stream 是 MongoDB 3.6+ 原生 CDC、本質上是 oplog tail 包裝成 cursor API。可以從 collection / database / cluster 三個 scope 開:

  • Collection-level:監看單一 collection 的變更
  • Database-level:監看整個 database 的所有 collection
  • Cluster-wide:監看整個 cluster 的所有 database

Oplog 是 capped collection、預設 size = disk 5% 或 50GB(取較小)。Resume token 對應 oplog entry 的 timestamp + UUID + documentKey。Token 必須對應仍在 oplog 內的 entry — oplog 滾掉就拿不到 token 對應的位置、ChangeStreamHistoryLost

Resume token 兩種用法

  • _id:每個 event 都帶、application 自己存
  • startAfter / resumeAfter parameter:重啟 cursor 時帶上

fullDocument: "updateLookup":update event 預設只給 delta、加這個 option 會額外 query 一次 primary 拿完整 doc;高頻 update 下成本顯著(primary 負擔翻倍)。

Pre-image / post-image(6.0+):可以拿到 update 前的 doc 狀態、需 collection-level option changeStreamPreAndPostImages: true

Cluster-wide vs collection-level change stream

  • Cluster-wide 必須打 mongos、event ordering 是 global
  • Collection-level 可直接打單 shard、ordering 只在該 shard 內
  • Sharded cluster 上 cluster-wide stream 容易把 mongos 變單點瓶頸(所有 shard 的 event 都收斂到 mongos)

MongoDB Kafka Connector(Confluent / MongoDB 官方):

  • Source connector:把 change stream → Kafka topic
  • Sink connector:把 Kafka topic → MongoDB
  • At-least-once 語義、需 application 處理 idempotency

對應 knowledge card:change-data-capturereplication-channelreplication-slot(MongoDB 沒 slot、概念對照)。

操作流程

Step 1:scope 決策樹

Scope適用條件代價
Collection-level單一 collection 的下游 sink、ordering 需求單一多 collection 要多 connector
Database-level多 collection 共享 sink、ordering 跨 collectionfilter cost 在 connector 端
Cluster-wide整個 cluster 統一 audit / replaymongos 單點瓶頸風險、event 量大

Step 2:oplog sizing。容量公式:

1oplog size >= peak write rate × max acceptable consumer downtime

典型設 24-72 小時可恢復窗口。例:peak 5K WPS、想容忍 48 小時 connector down、oplog 至少 5K × 86400 × 2 ÷ docs_per_GB ≈ 看實際 doc size 決定。在 Atlas 上 oplog size 可直接調、自管 cluster 改 replSetResizeOplog

Step 3:Kafka Connector 配置

 1{
 2  "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
 3  "connection.uri": "mongodb://...",
 4  "database": "shop",
 5  "collection": "orders",
 6  "publish.full.document.only": "true",
 7  "change.stream.full.document": "updateLookup",
 8  "copy.existing": "true",
 9  "copy.existing.namespace.regex": "shop\\.orders",
10  "errors.tolerance": "none",
11  "offset.flush.interval.ms": "10000"
12}

關鍵欄位:

  • change.stream.full.document: "updateLookup":每 update 額外 query primary 拿完整 doc(成本意識)
  • copy.existing: "true":connector 啟動時先把現有 collection 全量複製、再切到 change stream — 適合初次部署
  • errors.tolerance: "none":sink 失敗時 batch 停在 dead-letter queue、不 silently drop

Step 4:resume token persistence。Connector 把 token 寫 Kafka __consumer_offsets 或外部 store;application 自管 change stream 時要寫到 durable store(不是 in-memory)。

Step 5:filter pipeline。Change stream 支援 aggregation pipeline 把過濾下推到 MongoDB:

1const pipeline = [
2  { $match: { "operationType": { $in: ["insert", "update", "delete"] } } },
3  { $match: { "fullDocument.region": "ap-tokyo" } }
4]
5const changeStream = db.orders.watch(pipeline)

把過濾下推減少 connector 處理量、特別是高頻 collection 上。

Step 6:downstream idempotency。Sink 收 Kafka event 時用 documentKey._id + clusterTime 做 dedup key — at-least-once 語義意味著 connector restart 後幾分鐘 event 會重發。

驗證點:

  • Source collection write count vs Kafka topic event count 差異 < 0.1%
  • Resume token age < oplog retention 的 50%(健康狀態)
  • Connector restart drill 能 5 分鐘內接回

Rollback boundary:source connector 是 read-only 對 MongoDB 無傷;sink connector 要備份 target 才能還原;resume token 寫錯 → 從 startAtOperationTime 回退到時間點重跑。

失敗模式

Resume token 過期(oplog 滾掉):connector down 太久、oplog 已超出 retention、ChangeStreamHistoryLost → 必須 copy.existing 全量重灌、期間 downstream 看不到新資料。預防是 oplog sizing 留 buffer + connector lag alarm + token age 監控(age > oplog retention 的 50% 預警)。

updateLookup 在高頻 update 下打爆 primary:每筆 update event 都觸發一次 primary query、primary 負擔翻倍。修法是改 collection-level pre/post image(6.0+)、由 MongoDB 自己在寫入時記錄、或在 application 補完整 doc 後再寫 Kafka、不用 updateLookup。

Sharded cluster cluster-wide stream 打爆 mongos:所有 shard 的 event 都收斂到 mongos、mongos 變單點瓶頸。修法是改 collection-level stream 多 connector 並行、每 connector 連 mongos 但只訂單一 collection。

At-least-once 變 duplicate flood:connector restart 點之後幾分鐘 event 重發、downstream 沒做 idempotency → 重複 side effect(重複發 email、重複扣款)。修法是 sink 端強制 idempotency(dedup key 寫 Redis / DB)、不能假設「我用 at-least-once 但實際不會 duplicate」。

Schema drift 突然 break sink:MongoDB 寫了新欄位 / 改型別、sink connector 的 JSON schema 不認、batch 停在 dead-letter queue。修法是 schema 變動有 validation gate(見 schema design pattern)、sink schema 設 lenient 模式吃 unknown field、或加 schema registry 統一版本。

Backup / DDL 期間 change stream 異常reIndex / compact / dropCollection 觸發特殊 event、connector 沒處理 → consumer 停。修法是 connector 處理特殊 event 邏輯要明確、不認得的 operation type 至少 log warning 而不是 silently stuck。

Anti-recommendation:

  • 簡單的 outbox pattern + application transactional write 對於低吞吐 / 單 sink 的場景比 change stream + Kafka 簡單;不是所有「需要 event 通知」的場景都要 CDC pipeline
  • 若 downstream 只是同一 region 同團隊的 Elasticsearch index、$merge 寫進中介 collection 或 application 雙寫 + 對賬可能成本更低
  • Resume token 過期是這條路徑最痛的事故、oplog sizing 是 投資而不是成本 — 不要為了省 storage 把 oplog 設太小

容量與觀測

關鍵 metric:

  • Oplog 健康:oplog 寫入速率與保留時間
  • Change stream 健康:cursor age、resume token 距 oplog 頭尾的距離
  • Connector 健康:connector lag(Kafka offset 對比 source write)
  • 下游健康:event count diff(source write count vs sink apply count)、event time → arrival time lag 分布

Mongo command:

  • db.getReplicationInfo():oplog 大小 / 時間範圍
  • db.printReplicationInfo():oplog 摘要
  • db.currentOp({ "op": "getmore", "ns": "local.oplog.rs" }):看 change stream consumer 連線

Connector metric(Kafka Connect JMX):source-record-poll-ratesource-record-write-rateoffset-commit-success-rate

回到 4.20 observability evidence:oplog retention + connector lag + dedup rate 是 CDC pipeline 健康狀態 evidence 三件套。

回到 9.5 bottleneck localization:CDC lag 升高時區分 (a) source oplog 寫太快 (b) connector 處理慢 (c) downstream sink 慢。

邊界與整合

Sibling deep articles:

Migration playbook:

  • MongoDB → 其他 sink 的 bulk migration 走 → Atlas Migration Service
  • 遷出 MongoDB 時 change stream 是 catch-up 機制(先 bulk export、再 change stream 補增量)

跟 1.x 互引:1.7 schema migration rollout evidence 處理 schema drift 時 CDC pipeline 的對賬;1.9 reconciliation data repair 處理 CDC 失準後的對賬流程。

相關連結