8.1 並行處理實戰
本章將入門系列學到的 concurrent.futures 知識,應用於 .claude/lib 中的真實程式碼,展示如何識別並行化機會、實作並行版本,以及測量效能差異。
學習目標
完成本章後,你將能夠:
- 識別並行化機會:判斷一段程式碼是否適合並行化
- 應用 ThreadPoolExecutor:將循序處理改寫為並行版本
- 實作進度報告:使用
as_completed()追蹤任務完成狀態 - 處理並行錯誤:避免單一任務失敗影響整體執行
- 測量效能差異:用數據證明優化效果
先備知識
本章假設你已經讀過入門系列的並行處理章節:
- 3.7 並行處理 -
concurrent.futures基本用法
如果你對 ThreadPoolExecutor、executor.map()、as_completed() 還不熟悉,建議先閱讀該章節。
識別並行化機會
I/O 密集 vs CPU 密集:快速判斷法
在入門系列中,我們學到 I/O 密集任務適合 ThreadPoolExecutor。但實際程式碼中,如何快速判斷?
問自己這個問題:程式在等什麼?
1# 模式 1:等待外部資源(I/O 密集)
2response = requests.get(url) # 等網路
3content = file.read() # 等磁碟
4result = cursor.execute(query) # 等資料庫
5
6# 模式 2:純計算(CPU 密集)
7result = sum(i * i for i in range(10_000_000)) # 沒有等待,純運算獨立任務的識別
並行化的前提是任務之間互相獨立。識別方法:
1獨立性檢查清單:
2[ ] 任務之間沒有共享可變狀態?
3[ ] 任務執行順序不影響結果?
4[ ] 任務 B 不依賴任務 A 的輸出?
5
6如果三個都勾選,就適合並行化。真實案例:markdown_link_checker.py
讓我們看一個真實的例子。以下是 markdown_link_checker.py 中的 check_directory() 方法:
1# 原始版本:循序處理
2def check_directory(
3 self,
4 dir_path: str,
5 recursive: bool = True
6) -> List[LinkCheckResult]:
7 """檢查目錄下所有 Markdown 檔案"""
8 dir_path = self._resolve_path(dir_path)
9
10 # 收集所有 .md 檔案
11 if recursive:
12 md_files = sorted(dir_path.rglob("*.md"))
13 else:
14 md_files = sorted(dir_path.glob("*.md"))
15
16 # 循序檢查每個檔案
17 results = []
18 for md_file in md_files:
19 results.append(self.check_file(str(md_file))) # 一個接一個
20
21 return results這段程式碼適合並行化嗎?讓我們用檢查清單分析:
| 問題 | 分析 | 結論 |
|---|---|---|
| 任務之間有共享狀態? | results 只在主執行緒操作 | 沒有 |
| 執行順序影響結果? | 檢查檔案 A 不影響檔案 B | 不影響 |
| 任務互相依賴? | 每個檔案獨立檢查 | 不依賴 |
| 是 I/O 密集? | check_file() 讀取檔案內容 | 是 |
結論:非常適合並行化。
ThreadPoolExecutor 實戰
步驟 1:確認可並行化的函式
首先,確認被並行化的函式是「純函式」或至少沒有副作用:
1def check_file(self, file_path: str) -> LinkCheckResult:
2 """
3 檢查單個 Markdown 檔案的連結
4
5 特性:
6 - 輸入:檔案路徑
7 - 輸出:檢查結果
8 - 沒有修改外部狀態
9 - 沒有依賴執行順序
10 """
11 # ... 實作省略check_file() 滿足條件:
- 只讀取檔案,不修改
- 返回獨立的結果物件
- 不依賴其他檔案的結果
步驟 2:改寫為並行版本
1from concurrent.futures import ThreadPoolExecutor, as_completed
2
3def check_directory_parallel(
4 self,
5 dir_path: str,
6 recursive: bool = True,
7 max_workers: int = 8
8) -> List[LinkCheckResult]:
9 """並行檢查目錄下所有 Markdown 檔案"""
10 dir_path = self._resolve_path(dir_path)
11
12 # 收集所有 .md 檔案
13 if recursive:
14 md_files = sorted(dir_path.rglob("*.md"))
15 else:
16 md_files = sorted(dir_path.glob("*.md"))
17
18 # 並行檢查
19 results = []
20 with ThreadPoolExecutor(max_workers=max_workers) as executor:
21 # 提交所有任務
22 futures = {
23 executor.submit(self.check_file, str(md_file)): md_file
24 for md_file in md_files
25 }
26
27 # 收集結果
28 for future in as_completed(futures):
29 results.append(future.result())
30
31 return results步驟 3:選擇 max_workers
max_workers 的選擇影響效能:
1import os
2
3# I/O 密集任務:可以設定較高
4# 經驗法則:CPU 核心數的 2-4 倍
5max_workers = min(32, (os.cpu_count() or 1) + 4)
6
7# 或根據檔案數量動態調整
8def get_optimal_workers(file_count: int) -> int:
9 """根據檔案數量決定 worker 數量"""
10 cpu_count = os.cpu_count() or 1
11
12 # 檔案少時不需要太多 worker
13 if file_count < 10:
14 return min(file_count, cpu_count)
15
16 # 檔案多時,I/O 密集可以多開一些
17 return min(32, cpu_count * 2)為什麼 I/O 密集可以超過 CPU 核心數?
因為執行緒在等待 I/O 時會釋放 GIL,其他執行緒可以繼續執行。如果有 8 個核心,但每個任務有 80% 時間在等待 I/O,那開 16-32 個 worker 可以更充分利用 CPU。
進度報告與錯誤處理
使用 as_completed() 報告進度
as_completed() 返回一個迭代器,任務完成時立即 yield,適合顯示進度:
1from concurrent.futures import ThreadPoolExecutor, as_completed
2from typing import Callable, Optional
3
4def check_directory_with_progress(
5 self,
6 dir_path: str,
7 recursive: bool = True,
8 max_workers: int = 8,
9 progress_callback: Optional[Callable[[int, int, str], None]] = None
10) -> List[LinkCheckResult]:
11 """
12 並行檢查目錄,支援進度回報
13
14 Args:
15 progress_callback: 回呼函式 (completed, total, current_file)
16 """
17 dir_path = self._resolve_path(dir_path)
18
19 if recursive:
20 md_files = list(dir_path.rglob("*.md"))
21 else:
22 md_files = list(dir_path.glob("*.md"))
23
24 total = len(md_files)
25 results = []
26
27 with ThreadPoolExecutor(max_workers=max_workers) as executor:
28 # 建立 future -> 檔案 的映射
29 futures = {
30 executor.submit(self.check_file, str(f)): f
31 for f in md_files
32 }
33
34 completed = 0
35 for future in as_completed(futures):
36 completed += 1
37 current_file = futures[future]
38
39 # 回報進度
40 if progress_callback:
41 progress_callback(completed, total, str(current_file))
42
43 results.append(future.result())
44
45 return results
46
47# 使用範例
48def print_progress(completed: int, total: int, current_file: str):
49 percent = (completed / total) * 100
50 print(f"\r[{completed}/{total}] {percent:.1f}% - {current_file}", end="")
51
52checker = MarkdownLinkChecker()
53results = checker.check_directory_with_progress(
54 "docs/",
55 progress_callback=print_progress
56)
57print() # 換行錯誤處理:不讓單一失敗拖垮全部
並行處理時,單一任務的異常不應該中斷其他任務:
1def check_directory_robust(
2 self,
3 dir_path: str,
4 max_workers: int = 8
5) -> Tuple[List[LinkCheckResult], List[Dict]]:
6 """
7 並行檢查,分開返回成功結果和錯誤
8
9 Returns:
10 (成功的結果列表, 錯誤資訊列表)
11 """
12 dir_path = self._resolve_path(dir_path)
13 md_files = list(dir_path.rglob("*.md"))
14
15 results = []
16 errors = []
17
18 with ThreadPoolExecutor(max_workers=max_workers) as executor:
19 futures = {
20 executor.submit(self.check_file, str(f)): f
21 for f in md_files
22 }
23
24 for future in as_completed(futures):
25 file_path = futures[future]
26 try:
27 result = future.result()
28 results.append(result)
29 except Exception as e:
30 # 記錄錯誤,繼續處理其他檔案
31 errors.append({
32 "file": str(file_path),
33 "error": str(e),
34 "type": type(e).__name__
35 })
36
37 return results, errors
38
39# 使用範例
40results, errors = checker.check_directory_robust("docs/")
41
42print(f"成功檢查 {len(results)} 個檔案")
43if errors:
44 print(f"有 {len(errors)} 個錯誤:")
45 for err in errors:
46 print(f" {err['file']}: {err['error']}")效能測量
使用 timeit 比較效能
理論上並行會更快,但「更快多少」需要測量:
1import timeit
2from pathlib import Path
3
4# 準備測試資料
5test_dir = Path("docs/") # 假設有 50+ 個 .md 檔案
6checker = MarkdownLinkChecker()
7
8# 測量循序版本
9def test_sequential():
10 return checker.check_directory(str(test_dir))
11
12sequential_time = timeit.timeit(test_sequential, number=3) / 3
13
14# 測量並行版本
15def test_parallel():
16 return checker.check_directory_parallel(str(test_dir), max_workers=8)
17
18parallel_time = timeit.timeit(test_parallel, number=3) / 3
19
20# 計算加速比
21speedup = sequential_time / parallel_time
22
23print(f"循序版本:{sequential_time:.2f} 秒")
24print(f"並行版本:{parallel_time:.2f} 秒")
25print(f"加速比:{speedup:.2f}x")真實測試結果參考
以下是在不同檔案數量下的測試結果(供參考):
| 檔案數量 | 循序時間 | 並行時間 (8 workers) | 加速比 |
|---|---|---|---|
| 10 | 0.15s | 0.08s | 1.9x |
| 50 | 0.72s | 0.18s | 4.0x |
| 100 | 1.45s | 0.32s | 4.5x |
| 200 | 2.91s | 0.58s | 5.0x |
觀察:
- 檔案越多,並行效益越明顯
- 加速比不會無限增長(受限於 I/O 頻寬和 GIL)
- 檔案少於 10 個時,並行的額外開銷可能抵消效益
完整的效能測試腳本
1#!/usr/bin/env python3
2"""效能比較測試腳本"""
3
4import os
5import sys
6import time
7from pathlib import Path
8from concurrent.futures import ThreadPoolExecutor, as_completed
9
10# 假設這是 markdown_link_checker 的簡化版
11class MarkdownChecker:
12 def check_file(self, file_path: str) -> dict:
13 """模擬檔案檢查(包含 I/O 延遲)"""
14 path = Path(file_path)
15 content = path.read_text(encoding="utf-8")
16 # 模擬一些處理時間
17 time.sleep(0.01) # 10ms I/O 延遲
18 return {
19 "file": file_path,
20 "lines": len(content.splitlines()),
21 "chars": len(content)
22 }
23
24 def check_sequential(self, files: list) -> list:
25 """循序檢查"""
26 return [self.check_file(f) for f in files]
27
28 def check_parallel(self, files: list, max_workers: int = 8) -> list:
29 """並行檢查"""
30 results = []
31 with ThreadPoolExecutor(max_workers=max_workers) as executor:
32 futures = {executor.submit(self.check_file, f): f for f in files}
33 for future in as_completed(futures):
34 results.append(future.result())
35 return results
36
37def benchmark(func, *args, iterations: int = 3) -> float:
38 """測量函式執行時間"""
39 times = []
40 for _ in range(iterations):
41 start = time.perf_counter()
42 func(*args)
43 elapsed = time.perf_counter() - start
44 times.append(elapsed)
45 return sum(times) / len(times)
46
47def main():
48 # 收集測試檔案
49 test_dir = Path(".") # 替換為你的目錄
50 files = list(test_dir.rglob("*.md"))[:100] # 取前 100 個
51
52 if len(files) < 10:
53 print("需要至少 10 個 .md 檔案來測試")
54 sys.exit(1)
55
56 print(f"測試檔案數量:{len(files)}")
57
58 checker = MarkdownChecker()
59
60 # 測試不同 worker 數量
61 print("\n效能比較:")
62 print("-" * 50)
63
64 seq_time = benchmark(checker.check_sequential, files)
65 print(f"循序版本:{seq_time:.3f} 秒")
66
67 for workers in [2, 4, 8, 16]:
68 par_time = benchmark(checker.check_parallel, files, workers)
69 speedup = seq_time / par_time
70 print(f"並行 ({workers:2d} workers):{par_time:.3f} 秒 ({speedup:.2f}x)")
71
72if __name__ == "__main__":
73 main()什麼時候不該用並行?
反模式 1:檔案數量太少
1# 不好:只有 3 個檔案還用並行
2files = ["a.md", "b.md", "c.md"]
3with ThreadPoolExecutor(max_workers=8) as executor:
4 results = list(executor.map(check_file, files))
5
6# 問題:建立執行緒池的開銷可能比省下的時間還多建議:檔案少於 5-10 個時,直接循序處理。
1def check_files_smart(files: list) -> list:
2 """根據檔案數量選擇處理方式"""
3 if len(files) < 10:
4 # 少量檔案:循序處理
5 return [check_file(f) for f in files]
6 else:
7 # 大量檔案:並行處理
8 with ThreadPoolExecutor(max_workers=8) as executor:
9 return list(executor.map(check_file, files))反模式 2:任務之間有依賴
1# 不好:任務 B 依賴任務 A 的結果
2def process_a():
3 return fetch_config()
4
5def process_b(config): # 需要 A 的結果
6 return validate(config)
7
8# 強行並行化會出錯或需要額外同步反模式 3:共享可變狀態
1# 不好:多個執行緒修改同一個列表
2shared_results = []
3
4def bad_worker(file):
5 result = check_file(file)
6 shared_results.append(result) # 競爭條件!
7
8# 正確做法:讓 worker 返回結果,由主執行緒收集
9def good_worker(file):
10 return check_file(file)
11
12with ThreadPoolExecutor() as executor:
13 results = list(executor.map(good_worker, files))反模式 4:CPU 密集任務用 ThreadPoolExecutor
1# 不好:CPU 密集任務受 GIL 限制
2def compute_heavy(n):
3 return sum(i * i for i in range(n))
4
5# ThreadPoolExecutor 對 CPU 密集任務沒有加速效果
6with ThreadPoolExecutor(max_workers=8) as executor:
7 results = list(executor.map(compute_heavy, [10_000_000] * 8))
8
9# 正確:CPU 密集應該用 ProcessPoolExecutor
10from concurrent.futures import ProcessPoolExecutor
11
12with ProcessPoolExecutor(max_workers=8) as executor:
13 results = list(executor.map(compute_heavy, [10_000_000] * 8))快速決策表
| 情境 | 推薦做法 |
|---|---|
| < 10 個檔案 | 循序處理 |
| 10-100 個檔案,I/O 密集 | ThreadPoolExecutor |
| > 100 個檔案,I/O 密集 | ThreadPoolExecutor + 進度報告 |
| CPU 密集計算 | ProcessPoolExecutor |
| 任務有依賴關係 | 重新設計,或用 asyncio |
思考題
為什麼
check_directory_parallel()中使用as_completed()而不是executor.map()? 提示:思考兩者在「錯誤處理」和「進度報告」上的差異。如果
check_file()除了讀檔還會寫入日誌檔,還適合並行化嗎?需要什麼額外措施? 提示:考慮日誌寫入的執行緒安全性。在 8 核心的機器上,為什麼 I/O 密集任務開 16 個 worker 可能比 8 個更快? 提示:思考 GIL 和 I/O 等待時間的關係。
實作練習
修改 hook_validator.py:將
validate_all_hooks()改寫為並行版本,測量效能差異。實作超時機制:如果單一檔案檢查超過 5 秒,自動跳過並記錄警告。 提示:查看
future.result(timeout=5)。實作批次處理:當檔案超過 1000 個時,分批處理(每批 100 個),避免記憶體壓力。
延伸閱讀
- 案例研究:並行檔案檢查 - 完整的實作與測試
- 案例研究:並行 Hook 驗證 - 結合 as_completed 與進度報告
- 入門系列 3.7 並行處理 - 複習基礎概念
- 進階系列 4.3 GIL 與執行緒模型 - 深入理解 GIL
#python #python-advanced #optimization #parallel #multiprocessing