本案例基於 .claude/lib/markdown_link_checker.py,展示如何用 ThreadPoolExecutor 加速 I/O 密集的檔案檢查任務。

先備知識

問題背景

現有設計

markdown_link_checker.pycheck_directory() 方法檢查目錄下所有 Markdown 檔案的內部連結:

 1def check_directory(
 2    self,
 3    dir_path: str,
 4    recursive: bool = True
 5) -> List[LinkCheckResult]:
 6    """
 7    檢查目錄下所有 Markdown 檔案
 8
 9    Args:
10        dir_path: 目錄路徑
11        recursive: 是否遞迴檢查子目錄
12
13    Returns:
14        list[LinkCheckResult]: 所有檔案的檢查結果
15    """
16    dir_path = self._resolve_path(dir_path)
17
18    if not dir_path.is_dir():
19        return [
20            LinkCheckResult(
21                file_path=str(dir_path),
22                total_links=0,
23                broken_links=[
24                    BrokenLink(
25                        file=str(dir_path),
26                        line=0,
27                        link_text="",
28                        link_target="",
29                        suggestion=f"目錄不存在: {dir_path}"
30                    )
31                ]
32            )
33        ]
34
35    # 收集所有 .md 檔案
36    if recursive:
37        md_files = sorted(dir_path.rglob("*.md"))
38    else:
39        md_files = sorted(dir_path.glob("*.md"))
40
41    # 循序檢查每個檔案
42    results = []
43    for md_file in md_files:
44        results.append(self.check_file(str(md_file)))
45
46    return results

這個設計的優點

  1. 簡單直覺:循序執行,程式碼容易理解
  2. 結果有序:檔案按排序順序處理,結果也按順序返回
  3. 除錯容易:問題發生時,可以精確定位到哪個檔案

效能瓶頸分析

讓我們分析 check_file() 方法的執行時間組成:

 1def check_file(self, file_path: str) -> LinkCheckResult:
 2    """檢查單個 Markdown 檔案的連結"""
 3    file_path = self._resolve_path(file_path)
 4
 5    # 1. 檢查檔案是否存在(I/O)
 6    if not file_path.exists():
 7        return LinkCheckResult(...)
 8
 9    # 2. 讀取檔案內容(I/O - 主要瓶頸)
10    try:
11        content = file_path.read_text(encoding="utf-8")
12    except Exception as e:
13        return LinkCheckResult(...)
14
15    # 3. 解析連結(CPU - 很快)
16    links = self.parse_markdown_links(content)
17
18    # 4. 過濾內部連結(CPU - 很快)
19    internal_links = self._filter_internal_links(links)
20
21    # 5. 檢查每個連結(I/O - 檔案系統檢查)
22    broken_links = []
23    for link in internal_links:
24        is_valid, suggestion = self._check_link(
25            link["target"],
26            file_path.parent
27        )
28        if not is_valid:
29            broken_links.append(...)
30
31    return LinkCheckResult(...)

時間分布估計

1操作              | 類型  | 每檔案耗時
2-----------------|-------|----------
3read_text()      | I/O   | 1-5 ms
4parse_links()    | CPU   | 0.1 ms
5filter_links()   | CPU   | 0.01 ms
6check_link() x N | I/O   | N * 0.5 ms
7-----------------|-------|----------
8總計(10 連結)  |       | ~7 ms

對於 100 個檔案的專案:

1# 循序執行
2total_time = 100 * 7ms = 700ms = 0.7 
3
4# 這看起來不長,但如果:
5# - 檔案更多(500+ 個)
6# - 每個檔案連結更多
7# - 網路檔案系統(NFS)
8# 時間會快速增長

為什麼適合並行化?

  1. I/O 密集:大部分時間花在檔案讀取和存在性檢查
  2. 任務獨立:每個檔案的檢查互不依賴
  3. 無共享狀態:不需要同步機制

進階解決方案

設計目標

  1. 提升效能:利用並行化加速 I/O 操作
  2. 保持 API 相容:不改變方法簽名和返回值
  3. 可配置:允許調整並行度

實作步驟

步驟 1:識別獨立任務

每個檔案的檢查是完全獨立的:

1# 這些操作可以同時執行
2result_1 = checker.check_file("doc1.md")  # 獨立
3result_2 = checker.check_file("doc2.md")  # 獨立
4result_3 = checker.check_file("doc3.md")  # 獨立

步驟 2:使用 ThreadPoolExecutor

 1from concurrent.futures import ThreadPoolExecutor, as_completed
 2from typing import List, Optional
 3
 4def check_directory_parallel(
 5    self,
 6    dir_path: str,
 7    recursive: bool = True,
 8    max_workers: Optional[int] = None
 9) -> List[LinkCheckResult]:
10    """
11    並行檢查目錄下所有 Markdown 檔案
12
13    Args:
14        dir_path: 目錄路徑
15        recursive: 是否遞迴檢查子目錄
16        max_workers: 最大工作執行緒數,預設為 CPU 核心數
17
18    Returns:
19        list[LinkCheckResult]: 所有檔案的檢查結果
20    """
21    dir_path = self._resolve_path(dir_path)
22
23    if not dir_path.is_dir():
24        return [self._create_error_result(dir_path, "目錄不存在")]
25
26    # 收集所有 .md 檔案
27    pattern = "**/*.md" if recursive else "*.md"
28    md_files = sorted(dir_path.glob(pattern) if not recursive
29                      else dir_path.rglob("*.md"))
30
31    if not md_files:
32        return []
33
34    # 使用 ThreadPoolExecutor 並行處理
35    results = []
36    with ThreadPoolExecutor(max_workers=max_workers) as executor:
37        # 提交所有任務
38        future_to_file = {
39            executor.submit(self.check_file, str(md_file)): md_file
40            for md_file in md_files
41        }
42
43        # 收集結果
44        for future in as_completed(future_to_file):
45            result = future.result()
46            results.append(result)
47
48    # 按檔案路徑排序(保持一致的輸出順序)
49    results.sort(key=lambda r: r.file_path)
50
51    return results

步驟 3:選擇 max_workers

max_workers 的選擇影響效能:

 1import os
 2
 3# 預設值:min(32, os.cpu_count() + 4)
 4# 這是 Python 3.8+ 的預設行為
 5
 6# 對於 I/O 密集任務,可以設定更高
 7def get_optimal_workers(file_count: int) -> int:
 8    """
 9    根據檔案數量計算最佳工作執行緒數
10
11    經驗法則:
12    - 檔案數 < 10: 使用檔案數
13    - 檔案數 >= 10: 使用 CPU 核心數 * 2,但不超過 32
14    """
15    cpu_count = os.cpu_count() or 4
16
17    if file_count < 10:
18        return file_count
19
20    return min(32, cpu_count * 2, file_count)

完整程式碼

  1#!/usr/bin/env python3
  2"""
  3並行 Markdown 連結檢查器
  4
  5基於 markdown_link_checker.py,展示如何用 ThreadPoolExecutor 加速檔案檢查。
  6"""
  7
  8import os
  9import re
 10from concurrent.futures import ThreadPoolExecutor, as_completed
 11from dataclasses import dataclass, field
 12from pathlib import Path
 13from typing import Dict, List, Optional, Tuple
 14
 15@dataclass
 16class BrokenLink:
 17    """失效連結描述"""
 18    file: str
 19    line: int
 20    link_text: str
 21    link_target: str
 22    suggestion: str = ""
 23
 24@dataclass
 25class LinkCheckResult:
 26    """單個檔案的連結檢查結果"""
 27    file_path: str
 28    total_links: int
 29    broken_links: List[BrokenLink] = field(default_factory=list)
 30    is_valid: bool = True
 31
 32    def __post_init__(self):
 33        self.is_valid = len(self.broken_links) == 0
 34
 35class ParallelMarkdownLinkChecker:
 36    """
 37    並行 Markdown 連結檢查器
 38
 39    相較於原版的改進:
 40    - check_directory() 使用 ThreadPoolExecutor 並行處理
 41    - 支援自訂 max_workers
 42    - 保持 API 相容性
 43    """
 44
 45    INLINE_LINK_PATTERN = re.compile(r'(?<!!)\[([^\]]+)\]\(([^)]+)\)')
 46    EXTERNAL_PATTERNS = [r'^https?://', r'^mailto:', r'^tel:', r'^ftp://']
 47
 48    def __init__(self, project_root: Optional[str] = None):
 49        if project_root is None:
 50            project_root = os.environ.get("CLAUDE_PROJECT_DIR", os.getcwd())
 51        self.project_root = Path(project_root)
 52
 53    # ===== 核心方法 =====
 54
 55    def check_file(self, file_path: str) -> LinkCheckResult:
 56        """
 57        檢查單個 Markdown 檔案的連結
 58
 59        這個方法是執行緒安全的,可以並行呼叫。
 60        """
 61        file_path = self._resolve_path(file_path)
 62
 63        if not file_path.exists():
 64            return LinkCheckResult(
 65                file_path=str(file_path),
 66                total_links=0,
 67                broken_links=[
 68                    BrokenLink(
 69                        file=str(file_path), line=0,
 70                        link_text="", link_target="",
 71                        suggestion=f"檔案不存在: {file_path}"
 72                    )
 73                ]
 74            )
 75
 76        try:
 77            content = file_path.read_text(encoding="utf-8")
 78        except Exception as e:
 79            return LinkCheckResult(
 80                file_path=str(file_path),
 81                total_links=0,
 82                broken_links=[
 83                    BrokenLink(
 84                        file=str(file_path), line=0,
 85                        link_text="", link_target="",
 86                        suggestion=f"無法讀取檔案: {e}"
 87                    )
 88                ]
 89            )
 90
 91        links = self._parse_links(content)
 92        internal_links = self._filter_internal_links(links)
 93
 94        broken_links = []
 95        for link in internal_links:
 96            is_valid, suggestion = self._check_link(
 97                link["target"], file_path.parent
 98            )
 99            if not is_valid:
100                broken_links.append(
101                    BrokenLink(
102                        file=str(file_path),
103                        line=link["line"],
104                        link_text=link["text"],
105                        link_target=link["target"],
106                        suggestion=suggestion
107                    )
108                )
109
110        return LinkCheckResult(
111            file_path=str(file_path),
112            total_links=len(internal_links),
113            broken_links=broken_links
114        )
115
116    def check_directory(
117        self,
118        dir_path: str,
119        recursive: bool = True,
120        max_workers: Optional[int] = None
121    ) -> List[LinkCheckResult]:
122        """
123        並行檢查目錄下所有 Markdown 檔案
124
125        Args:
126            dir_path: 目錄路徑
127            recursive: 是否遞迴檢查子目錄
128            max_workers: 最大工作執行緒數,None 表示使用預設值
129
130        Returns:
131            list[LinkCheckResult]: 所有檔案的檢查結果(按路徑排序)
132        """
133        dir_path = self._resolve_path(dir_path)
134
135        if not dir_path.is_dir():
136            return [
137                LinkCheckResult(
138                    file_path=str(dir_path),
139                    total_links=0,
140                    broken_links=[
141                        BrokenLink(
142                            file=str(dir_path), line=0,
143                            link_text="", link_target="",
144                            suggestion=f"目錄不存在: {dir_path}"
145                        )
146                    ]
147                )
148            ]
149
150        # 收集所有 .md 檔案
151        if recursive:
152            md_files = list(dir_path.rglob("*.md"))
153        else:
154            md_files = list(dir_path.glob("*.md"))
155
156        if not md_files:
157            return []
158
159        # 計算最佳工作執行緒數
160        if max_workers is None:
161            max_workers = self._get_optimal_workers(len(md_files))
162
163        # 並行處理
164        results: List[LinkCheckResult] = []
165
166        with ThreadPoolExecutor(max_workers=max_workers) as executor:
167            # 提交所有任務
168            future_to_file = {
169                executor.submit(self.check_file, str(f)): f
170                for f in md_files
171            }
172
173            # 收集結果(as_completed 提供最快的回應)
174            for future in as_completed(future_to_file):
175                try:
176                    result = future.result()
177                    results.append(result)
178                except Exception as e:
179                    # 處理意外錯誤
180                    md_file = future_to_file[future]
181                    results.append(
182                        LinkCheckResult(
183                            file_path=str(md_file),
184                            total_links=0,
185                            broken_links=[
186                                BrokenLink(
187                                    file=str(md_file), line=0,
188                                    link_text="", link_target="",
189                                    suggestion=f"檢查失敗: {e}"
190                                )
191                            ]
192                        )
193                    )
194
195        # 排序以保持一致的輸出順序
196        results.sort(key=lambda r: r.file_path)
197
198        return results
199
200    # ===== 循序版本(用於比較)=====
201
202    def check_directory_sequential(
203        self,
204        dir_path: str,
205        recursive: bool = True
206    ) -> List[LinkCheckResult]:
207        """循序版本,用於效能比較"""
208        dir_path = self._resolve_path(dir_path)
209
210        if not dir_path.is_dir():
211            return [
212                LinkCheckResult(
213                    file_path=str(dir_path),
214                    total_links=0,
215                    broken_links=[
216                        BrokenLink(
217                            file=str(dir_path), line=0,
218                            link_text="", link_target="",
219                            suggestion=f"目錄不存在: {dir_path}"
220                        )
221                    ]
222                )
223            ]
224
225        if recursive:
226            md_files = sorted(dir_path.rglob("*.md"))
227        else:
228            md_files = sorted(dir_path.glob("*.md"))
229
230        results = []
231        for md_file in md_files:
232            results.append(self.check_file(str(md_file)))
233
234        return results
235
236    # ===== 私有方法 =====
237
238    def _resolve_path(self, path: str) -> Path:
239        p = Path(path)
240        return p if p.is_absolute() else self.project_root / p
241
242    def _parse_links(self, content: str) -> List[Dict]:
243        links = []
244        in_code_block = False
245
246        for line_num, line in enumerate(content.split('\n'), start=1):
247            if line.strip().startswith("```"):
248                in_code_block = not in_code_block
249                continue
250
251            if in_code_block:
252                continue
253
254            for match in self.INLINE_LINK_PATTERN.finditer(line):
255                links.append({
256                    "text": match.group(1),
257                    "target": match.group(2),
258                    "line": line_num
259                })
260
261        return links
262
263    def _filter_internal_links(self, links: List[Dict]) -> List[Dict]:
264        internal = []
265        for link in links:
266            target = link["target"]
267            if target.startswith("#"):
268                continue
269            if any(re.match(p, target) for p in self.EXTERNAL_PATTERNS):
270                continue
271            internal.append(link)
272        return internal
273
274    def _check_link(
275        self,
276        target: str,
277        base_dir: Path
278    ) -> Tuple[bool, str]:
279        target_path = target.split("#")[0]
280        if not target_path:
281            return True, ""
282
283        resolved = (base_dir / target_path).resolve()
284        if resolved.exists():
285            return True, ""
286        else:
287            return False, f"檔案不存在: {target_path}"
288
289    def _get_optimal_workers(self, file_count: int) -> int:
290        """計算最佳工作執行緒數"""
291        cpu_count = os.cpu_count() or 4
292        if file_count < 10:
293            return file_count
294        return min(32, cpu_count * 2, file_count)
295
296# ===== 效能測量工具 =====
297
298def benchmark_checker(
299    dir_path: str,
300    iterations: int = 3
301) -> Dict[str, float]:
302    """
303    比較循序與並行版本的效能
304
305    Args:
306        dir_path: 要檢查的目錄
307        iterations: 執行次數(取平均)
308
309    Returns:
310        dict: {'sequential': 秒數, 'parallel': 秒數, 'speedup': 加速比}
311    """
312    import time
313
314    checker = ParallelMarkdownLinkChecker()
315
316    # 預熱(讓檔案系統快取生效)
317    checker.check_directory(dir_path)
318
319    # 測量循序版本
320    seq_times = []
321    for _ in range(iterations):
322        start = time.perf_counter()
323        checker.check_directory_sequential(dir_path)
324        seq_times.append(time.perf_counter() - start)
325
326    # 測量並行版本
327    par_times = []
328    for _ in range(iterations):
329        start = time.perf_counter()
330        checker.check_directory(dir_path)
331        par_times.append(time.perf_counter() - start)
332
333    seq_avg = sum(seq_times) / len(seq_times)
334    par_avg = sum(par_times) / len(par_times)
335
336    return {
337        "sequential": seq_avg,
338        "parallel": par_avg,
339        "speedup": seq_avg / par_avg if par_avg > 0 else 0
340    }
341
342# ===== 示範 =====
343
344if __name__ == "__main__":
345    import sys
346
347    # 預設檢查當前目錄
348    target_dir = sys.argv[1] if len(sys.argv) > 1 else "."
349
350    print(f"=== 並行 Markdown 連結檢查示範 ===\n")
351    print(f"目標目錄: {target_dir}\n")
352
353    checker = ParallelMarkdownLinkChecker()
354
355    # 執行檢查
356    results = checker.check_directory(target_dir)
357
358    # 統計
359    total_files = len(results)
360    total_links = sum(r.total_links for r in results)
361    broken_count = sum(len(r.broken_links) for r in results)
362    invalid_files = sum(1 for r in results if not r.is_valid)
363
364    print(f"檔案數: {total_files}")
365    print(f"連結數: {total_links}")
366    print(f"失效連結: {broken_count}")
367    print(f"有問題的檔案: {invalid_files}")
368
369    # 顯示失效連結
370    if broken_count > 0:
371        print(f"\n失效連結詳情:")
372        for result in results:
373            if not result.is_valid:
374                print(f"\n  {result.file_path}:")
375                for link in result.broken_links:
376                    print(f"    Line {link.line}: [{link.link_text}](/python-advanced/08-practical-optimization/case-studies/parallel-file-check/{link.link_target})")
377
378    # 效能比較
379    if total_files >= 5:
380        print(f"\n=== 效能比較 ===\n")
381        benchmark = benchmark_checker(target_dir)
382        print(f"循序版本: {benchmark['sequential']:.3f} 秒")
383        print(f"並行版本: {benchmark['parallel']:.3f} 秒")
384        print(f"加速比: {benchmark['speedup']:.2f}x")

效能測量

使用 timeit 比較前後效能:

 1import timeit
 2from parallel_link_checker import ParallelMarkdownLinkChecker
 3
 4def measure_performance(dir_path: str, num_runs: int = 5):
 5    """測量並比較循序與並行版本的效能"""
 6    checker = ParallelMarkdownLinkChecker()
 7
 8    # 循序版本
 9    seq_time = timeit.timeit(
10        lambda: checker.check_directory_sequential(dir_path),
11        number=num_runs
12    ) / num_runs
13
14    # 並行版本
15    par_time = timeit.timeit(
16        lambda: checker.check_directory(dir_path),
17        number=num_runs
18    ) / num_runs
19
20    print(f"目錄: {dir_path}")
21    print(f"循序版本: {seq_time:.4f} 秒")
22    print(f"並行版本: {par_time:.4f} 秒")
23    print(f"加速比: {seq_time / par_time:.2f}x")
24
25# 實際測試結果(範例)
26# 目錄: ./docs (50 個 .md 檔案)
27# 循序版本: 0.3521 秒
28# 並行版本: 0.0892 秒
29# 加速比: 3.95x

不同規模的預期加速比

檔案數循序時間並行時間加速比
1070 ms25 ms2.8x
50350 ms90 ms3.9x
100700 ms160 ms4.4x
5003.5 s750 ms4.7x

注意:實際加速比取決於檔案大小、連結數量、磁碟速度等因素。

設計權衡

面向循序版本並行版本
效能較慢,線性增長快 3-5 倍
複雜度簡單需要理解執行緒池
除錯容易需要注意執行緒安全
記憶體較低較高(執行緒開銷)
結果順序保證有序需要額外排序
錯誤處理直接需要處理 Future 例外

執行緒安全考量

check_file() 方法是執行緒安全的,因為:

  1. 無共享狀態:每次呼叫都獨立處理一個檔案
  2. 唯讀操作:只讀取檔案,不修改
  3. 獨立返回值:每個呼叫返回獨立的 LinkCheckResult
1# 這是安全的
2def check_file(self, file_path: str) -> LinkCheckResult:
3    # 所有變數都是區域變數
4    file_path = self._resolve_path(file_path)  # 新物件
5    content = file_path.read_text()            # 區域變數
6    links = self._parse_links(content)         # 區域變數
7    # ...
8    return LinkCheckResult(...)                # 新物件

什麼時候該用這個技術?

適合使用

  • 多檔案處理:需要處理大量獨立檔案
  • I/O 密集:主要時間花在檔案讀寫
  • 任務獨立:每個任務不依賴其他任務的結果
  • 可接受亂序:或願意在最後排序

不建議使用

  • 檔案很少:少於 5 個檔案,並行開銷可能大於收益
  • CPU 密集:如果主要時間花在計算,應考慮 ProcessPoolExecutor
  • 有依賴關係:後續檔案依賴前面檔案的結果
  • 記憶體受限:並行版本會同時載入多個檔案

練習

基礎練習

練習 1:加入進度回報

 1def check_directory_with_progress(
 2    self,
 3    dir_path: str,
 4    callback: callable = None
 5) -> List[LinkCheckResult]:
 6    """
 7    並行檢查,並在每個檔案完成時呼叫 callback
 8
 9    callback 簽名: callback(completed: int, total: int, result: LinkCheckResult)
10
11    提示:使用 as_completed() 在每個任務完成時觸發回報
12    """
13    # Your implementation here
14    pass

練習 2:支援取消

 1def check_directory_cancellable(
 2    self,
 3    dir_path: str,
 4    cancel_event: threading.Event = None
 5) -> List[LinkCheckResult]:
 6    """
 7    可取消的並行檢查
 8
 9    當 cancel_event.is_set() 時,停止提交新任務並返回已完成的結果
10
11    提示:在迴圈中檢查 cancel_event
12    """
13    # Your implementation here
14    pass

進階練習

練習 3:批次處理大型目錄

 1def check_directory_batched(
 2    self,
 3    dir_path: str,
 4    batch_size: int = 100
 5) -> List[LinkCheckResult]:
 6    """
 7    分批處理大型目錄
 8
 9    避免一次提交太多任務導致記憶體問題
10
11    提示:將檔案列表分成多個批次,依序處理每批
12    """
13    # Your implementation here
14    pass

練習 4:加入重試機制

 1def check_file_with_retry(
 2    self,
 3    file_path: str,
 4    max_retries: int = 3
 5) -> LinkCheckResult:
 6    """
 7    帶重試的檔案檢查
 8
 9    當檔案被鎖定或暫時不可用時自動重試
10
11    提示:捕捉特定例外,使用指數退避
12    """
13    # Your implementation here
14    pass

挑戰題

練習 5:實作並行度自動調整

 1class AdaptiveParallelChecker:
 2    """
 3    自動調整並行度的檢查器
 4
 5    根據系統負載和檢查速度動態調整 max_workers
 6
 7    功能:
 8    - 初始使用保守的 max_workers
 9    - 如果任務完成很快,增加 max_workers
10    - 如果系統負載高,減少 max_workers
11    - 記錄最佳 max_workers 供下次使用
12    """
13    # Your implementation here
14    pass

延伸閱讀


下一章:並行 Hook 驗證