本章將入門系列學到的 concurrent.futures 知識,應用於 .claude/lib 中的真實程式碼,展示如何識別並行化機會、實作並行版本,以及測量效能差異。

學習目標

完成本章後,你將能夠:

  1. 識別並行化機會:判斷一段程式碼是否適合並行化
  2. 應用 ThreadPoolExecutor:將循序處理改寫為並行版本
  3. 實作進度報告:使用 as_completed() 追蹤任務完成狀態
  4. 處理並行錯誤:避免單一任務失敗影響整體執行
  5. 測量效能差異:用數據證明優化效果

先備知識

本章假設你已經讀過入門系列的並行處理章節:

如果你對 ThreadPoolExecutorexecutor.map()as_completed() 還不熟悉,建議先閱讀該章節。


識別並行化機會

I/O 密集 vs CPU 密集:快速判斷法

在入門系列中,我們學到 I/O 密集任務適合 ThreadPoolExecutor。但實際程式碼中,如何快速判斷?

問自己這個問題:程式在等什麼?

1# 模式 1:等待外部資源(I/O 密集)
2response = requests.get(url)      # 等網路
3content = file.read()             # 等磁碟
4result = cursor.execute(query)    # 等資料庫
5
6# 模式 2:純計算(CPU 密集)
7result = sum(i * i for i in range(10_000_000))  # 沒有等待,純運算

獨立任務的識別

並行化的前提是任務之間互相獨立。識別方法:

1獨立性檢查清單:
2[ ] 任務之間沒有共享可變狀態?
3[ ] 任務執行順序不影響結果?
4[ ] 任務 B 不依賴任務 A 的輸出?
5
6如果三個都勾選,就適合並行化。

讓我們看一個真實的例子。以下是 markdown_link_checker.py 中的 check_directory() 方法:

 1# 原始版本:循序處理
 2def check_directory(
 3    self,
 4    dir_path: str,
 5    recursive: bool = True
 6) -> List[LinkCheckResult]:
 7    """檢查目錄下所有 Markdown 檔案"""
 8    dir_path = self._resolve_path(dir_path)
 9
10    # 收集所有 .md 檔案
11    if recursive:
12        md_files = sorted(dir_path.rglob("*.md"))
13    else:
14        md_files = sorted(dir_path.glob("*.md"))
15
16    # 循序檢查每個檔案
17    results = []
18    for md_file in md_files:
19        results.append(self.check_file(str(md_file)))  # 一個接一個
20
21    return results

這段程式碼適合並行化嗎?讓我們用檢查清單分析:

問題分析結論
任務之間有共享狀態?results 只在主執行緒操作沒有
執行順序影響結果?檢查檔案 A 不影響檔案 B不影響
任務互相依賴?每個檔案獨立檢查不依賴
是 I/O 密集?check_file() 讀取檔案內容

結論:非常適合並行化


ThreadPoolExecutor 實戰

步驟 1:確認可並行化的函式

首先,確認被並行化的函式是「純函式」或至少沒有副作用:

 1def check_file(self, file_path: str) -> LinkCheckResult:
 2    """
 3    檢查單個 Markdown 檔案的連結
 4
 5    特性:
 6    - 輸入:檔案路徑
 7    - 輸出:檢查結果
 8    - 沒有修改外部狀態
 9    - 沒有依賴執行順序
10    """
11    # ... 實作省略

check_file() 滿足條件:

  • 只讀取檔案,不修改
  • 返回獨立的結果物件
  • 不依賴其他檔案的結果

步驟 2:改寫為並行版本

 1from concurrent.futures import ThreadPoolExecutor, as_completed
 2
 3def check_directory_parallel(
 4    self,
 5    dir_path: str,
 6    recursive: bool = True,
 7    max_workers: int = 8
 8) -> List[LinkCheckResult]:
 9    """並行檢查目錄下所有 Markdown 檔案"""
10    dir_path = self._resolve_path(dir_path)
11
12    # 收集所有 .md 檔案
13    if recursive:
14        md_files = sorted(dir_path.rglob("*.md"))
15    else:
16        md_files = sorted(dir_path.glob("*.md"))
17
18    # 並行檢查
19    results = []
20    with ThreadPoolExecutor(max_workers=max_workers) as executor:
21        # 提交所有任務
22        futures = {
23            executor.submit(self.check_file, str(md_file)): md_file
24            for md_file in md_files
25        }
26
27        # 收集結果
28        for future in as_completed(futures):
29            results.append(future.result())
30
31    return results

步驟 3:選擇 max_workers

max_workers 的選擇影響效能:

 1import os
 2
 3# I/O 密集任務:可以設定較高
 4# 經驗法則:CPU 核心數的 2-4 倍
 5max_workers = min(32, (os.cpu_count() or 1) + 4)
 6
 7# 或根據檔案數量動態調整
 8def get_optimal_workers(file_count: int) -> int:
 9    """根據檔案數量決定 worker 數量"""
10    cpu_count = os.cpu_count() or 1
11
12    # 檔案少時不需要太多 worker
13    if file_count < 10:
14        return min(file_count, cpu_count)
15
16    # 檔案多時,I/O 密集可以多開一些
17    return min(32, cpu_count * 2)

為什麼 I/O 密集可以超過 CPU 核心數?

因為執行緒在等待 I/O 時會釋放 GIL,其他執行緒可以繼續執行。如果有 8 個核心,但每個任務有 80% 時間在等待 I/O,那開 16-32 個 worker 可以更充分利用 CPU。


進度報告與錯誤處理

使用 as_completed() 報告進度

as_completed() 返回一個迭代器,任務完成時立即 yield,適合顯示進度:

 1from concurrent.futures import ThreadPoolExecutor, as_completed
 2from typing import Callable, Optional
 3
 4def check_directory_with_progress(
 5    self,
 6    dir_path: str,
 7    recursive: bool = True,
 8    max_workers: int = 8,
 9    progress_callback: Optional[Callable[[int, int, str], None]] = None
10) -> List[LinkCheckResult]:
11    """
12    並行檢查目錄,支援進度回報
13
14    Args:
15        progress_callback: 回呼函式 (completed, total, current_file)
16    """
17    dir_path = self._resolve_path(dir_path)
18
19    if recursive:
20        md_files = list(dir_path.rglob("*.md"))
21    else:
22        md_files = list(dir_path.glob("*.md"))
23
24    total = len(md_files)
25    results = []
26
27    with ThreadPoolExecutor(max_workers=max_workers) as executor:
28        # 建立 future -> 檔案 的映射
29        futures = {
30            executor.submit(self.check_file, str(f)): f
31            for f in md_files
32        }
33
34        completed = 0
35        for future in as_completed(futures):
36            completed += 1
37            current_file = futures[future]
38
39            # 回報進度
40            if progress_callback:
41                progress_callback(completed, total, str(current_file))
42
43            results.append(future.result())
44
45    return results
46
47# 使用範例
48def print_progress(completed: int, total: int, current_file: str):
49    percent = (completed / total) * 100
50    print(f"\r[{completed}/{total}] {percent:.1f}% - {current_file}", end="")
51
52checker = MarkdownLinkChecker()
53results = checker.check_directory_with_progress(
54    "docs/",
55    progress_callback=print_progress
56)
57print()  # 換行

錯誤處理:不讓單一失敗拖垮全部

並行處理時,單一任務的異常不應該中斷其他任務:

 1def check_directory_robust(
 2    self,
 3    dir_path: str,
 4    max_workers: int = 8
 5) -> Tuple[List[LinkCheckResult], List[Dict]]:
 6    """
 7    並行檢查,分開返回成功結果和錯誤
 8
 9    Returns:
10        (成功的結果列表, 錯誤資訊列表)
11    """
12    dir_path = self._resolve_path(dir_path)
13    md_files = list(dir_path.rglob("*.md"))
14
15    results = []
16    errors = []
17
18    with ThreadPoolExecutor(max_workers=max_workers) as executor:
19        futures = {
20            executor.submit(self.check_file, str(f)): f
21            for f in md_files
22        }
23
24        for future in as_completed(futures):
25            file_path = futures[future]
26            try:
27                result = future.result()
28                results.append(result)
29            except Exception as e:
30                # 記錄錯誤,繼續處理其他檔案
31                errors.append({
32                    "file": str(file_path),
33                    "error": str(e),
34                    "type": type(e).__name__
35                })
36
37    return results, errors
38
39# 使用範例
40results, errors = checker.check_directory_robust("docs/")
41
42print(f"成功檢查 {len(results)} 個檔案")
43if errors:
44    print(f"有 {len(errors)} 個錯誤:")
45    for err in errors:
46        print(f"  {err['file']}: {err['error']}")

效能測量

使用 timeit 比較效能

理論上並行會更快,但「更快多少」需要測量:

 1import timeit
 2from pathlib import Path
 3
 4# 準備測試資料
 5test_dir = Path("docs/")  # 假設有 50+ 個 .md 檔案
 6checker = MarkdownLinkChecker()
 7
 8# 測量循序版本
 9def test_sequential():
10    return checker.check_directory(str(test_dir))
11
12sequential_time = timeit.timeit(test_sequential, number=3) / 3
13
14# 測量並行版本
15def test_parallel():
16    return checker.check_directory_parallel(str(test_dir), max_workers=8)
17
18parallel_time = timeit.timeit(test_parallel, number=3) / 3
19
20# 計算加速比
21speedup = sequential_time / parallel_time
22
23print(f"循序版本:{sequential_time:.2f} 秒")
24print(f"並行版本:{parallel_time:.2f} 秒")
25print(f"加速比:{speedup:.2f}x")

真實測試結果參考

以下是在不同檔案數量下的測試結果(供參考):

檔案數量循序時間並行時間 (8 workers)加速比
100.15s0.08s1.9x
500.72s0.18s4.0x
1001.45s0.32s4.5x
2002.91s0.58s5.0x

觀察

  1. 檔案越多,並行效益越明顯
  2. 加速比不會無限增長(受限於 I/O 頻寬和 GIL)
  3. 檔案少於 10 個時,並行的額外開銷可能抵消效益

完整的效能測試腳本

 1#!/usr/bin/env python3
 2"""效能比較測試腳本"""
 3
 4import os
 5import sys
 6import time
 7from pathlib import Path
 8from concurrent.futures import ThreadPoolExecutor, as_completed
 9
10# 假設這是 markdown_link_checker 的簡化版
11class MarkdownChecker:
12    def check_file(self, file_path: str) -> dict:
13        """模擬檔案檢查(包含 I/O 延遲)"""
14        path = Path(file_path)
15        content = path.read_text(encoding="utf-8")
16        # 模擬一些處理時間
17        time.sleep(0.01)  # 10ms I/O 延遲
18        return {
19            "file": file_path,
20            "lines": len(content.splitlines()),
21            "chars": len(content)
22        }
23
24    def check_sequential(self, files: list) -> list:
25        """循序檢查"""
26        return [self.check_file(f) for f in files]
27
28    def check_parallel(self, files: list, max_workers: int = 8) -> list:
29        """並行檢查"""
30        results = []
31        with ThreadPoolExecutor(max_workers=max_workers) as executor:
32            futures = {executor.submit(self.check_file, f): f for f in files}
33            for future in as_completed(futures):
34                results.append(future.result())
35        return results
36
37def benchmark(func, *args, iterations: int = 3) -> float:
38    """測量函式執行時間"""
39    times = []
40    for _ in range(iterations):
41        start = time.perf_counter()
42        func(*args)
43        elapsed = time.perf_counter() - start
44        times.append(elapsed)
45    return sum(times) / len(times)
46
47def main():
48    # 收集測試檔案
49    test_dir = Path(".")  # 替換為你的目錄
50    files = list(test_dir.rglob("*.md"))[:100]  # 取前 100 個
51
52    if len(files) < 10:
53        print("需要至少 10 個 .md 檔案來測試")
54        sys.exit(1)
55
56    print(f"測試檔案數量:{len(files)}")
57
58    checker = MarkdownChecker()
59
60    # 測試不同 worker 數量
61    print("\n效能比較:")
62    print("-" * 50)
63
64    seq_time = benchmark(checker.check_sequential, files)
65    print(f"循序版本:{seq_time:.3f} 秒")
66
67    for workers in [2, 4, 8, 16]:
68        par_time = benchmark(checker.check_parallel, files, workers)
69        speedup = seq_time / par_time
70        print(f"並行 ({workers:2d} workers):{par_time:.3f} 秒 ({speedup:.2f}x)")
71
72if __name__ == "__main__":
73    main()

什麼時候不該用並行?

反模式 1:檔案數量太少

1# 不好:只有 3 個檔案還用並行
2files = ["a.md", "b.md", "c.md"]
3with ThreadPoolExecutor(max_workers=8) as executor:
4    results = list(executor.map(check_file, files))
5
6# 問題:建立執行緒池的開銷可能比省下的時間還多

建議:檔案少於 5-10 個時,直接循序處理。

1def check_files_smart(files: list) -> list:
2    """根據檔案數量選擇處理方式"""
3    if len(files) < 10:
4        # 少量檔案:循序處理
5        return [check_file(f) for f in files]
6    else:
7        # 大量檔案:並行處理
8        with ThreadPoolExecutor(max_workers=8) as executor:
9            return list(executor.map(check_file, files))

反模式 2:任務之間有依賴

1# 不好:任務 B 依賴任務 A 的結果
2def process_a():
3    return fetch_config()
4
5def process_b(config):  # 需要 A 的結果
6    return validate(config)
7
8# 強行並行化會出錯或需要額外同步

反模式 3:共享可變狀態

 1# 不好:多個執行緒修改同一個列表
 2shared_results = []
 3
 4def bad_worker(file):
 5    result = check_file(file)
 6    shared_results.append(result)  # 競爭條件!
 7
 8# 正確做法:讓 worker 返回結果,由主執行緒收集
 9def good_worker(file):
10    return check_file(file)
11
12with ThreadPoolExecutor() as executor:
13    results = list(executor.map(good_worker, files))

反模式 4:CPU 密集任務用 ThreadPoolExecutor

 1# 不好:CPU 密集任務受 GIL 限制
 2def compute_heavy(n):
 3    return sum(i * i for i in range(n))
 4
 5# ThreadPoolExecutor 對 CPU 密集任務沒有加速效果
 6with ThreadPoolExecutor(max_workers=8) as executor:
 7    results = list(executor.map(compute_heavy, [10_000_000] * 8))
 8
 9# 正確:CPU 密集應該用 ProcessPoolExecutor
10from concurrent.futures import ProcessPoolExecutor
11
12with ProcessPoolExecutor(max_workers=8) as executor:
13    results = list(executor.map(compute_heavy, [10_000_000] * 8))

快速決策表

情境推薦做法
< 10 個檔案循序處理
10-100 個檔案,I/O 密集ThreadPoolExecutor
> 100 個檔案,I/O 密集ThreadPoolExecutor + 進度報告
CPU 密集計算ProcessPoolExecutor
任務有依賴關係重新設計,或用 asyncio

思考題

  1. 為什麼 check_directory_parallel() 中使用 as_completed() 而不是 executor.map() 提示:思考兩者在「錯誤處理」和「進度報告」上的差異。

  2. 如果 check_file() 除了讀檔還會寫入日誌檔,還適合並行化嗎?需要什麼額外措施? 提示:考慮日誌寫入的執行緒安全性。

  3. 在 8 核心的機器上,為什麼 I/O 密集任務開 16 個 worker 可能比 8 個更快? 提示:思考 GIL 和 I/O 等待時間的關係。

實作練習

  1. 修改 hook_validator.py:將 validate_all_hooks() 改寫為並行版本,測量效能差異。

  2. 實作超時機制:如果單一檔案檢查超過 5 秒,自動跳過並記錄警告。 提示:查看 future.result(timeout=5)

  3. 實作批次處理:當檔案超過 1000 個時,分批處理(每批 100 個),避免記憶體壓力。

延伸閱讀


上一章:模組索引 下一章:效能調優實戰