案例:並行檔案檢查
案例:並行檔案檢查
本案例基於 .claude/lib/markdown_link_checker.py,展示如何用 ThreadPoolExecutor 加速 I/O 密集的檔案檢查任務。
先備知識
問題背景
現有設計
markdown_link_checker.py 的 check_directory() 方法檢查目錄下所有 Markdown 檔案的內部連結:
1def check_directory(
2 self,
3 dir_path: str,
4 recursive: bool = True
5) -> List[LinkCheckResult]:
6 """
7 檢查目錄下所有 Markdown 檔案
8
9 Args:
10 dir_path: 目錄路徑
11 recursive: 是否遞迴檢查子目錄
12
13 Returns:
14 list[LinkCheckResult]: 所有檔案的檢查結果
15 """
16 dir_path = self._resolve_path(dir_path)
17
18 if not dir_path.is_dir():
19 return [
20 LinkCheckResult(
21 file_path=str(dir_path),
22 total_links=0,
23 broken_links=[
24 BrokenLink(
25 file=str(dir_path),
26 line=0,
27 link_text="",
28 link_target="",
29 suggestion=f"目錄不存在: {dir_path}"
30 )
31 ]
32 )
33 ]
34
35 # 收集所有 .md 檔案
36 if recursive:
37 md_files = sorted(dir_path.rglob("*.md"))
38 else:
39 md_files = sorted(dir_path.glob("*.md"))
40
41 # 循序檢查每個檔案
42 results = []
43 for md_file in md_files:
44 results.append(self.check_file(str(md_file)))
45
46 return results這個設計的優點
- 簡單直覺:循序執行,程式碼容易理解
- 結果有序:檔案按排序順序處理,結果也按順序返回
- 除錯容易:問題發生時,可以精確定位到哪個檔案
效能瓶頸分析
讓我們分析 check_file() 方法的執行時間組成:
1def check_file(self, file_path: str) -> LinkCheckResult:
2 """檢查單個 Markdown 檔案的連結"""
3 file_path = self._resolve_path(file_path)
4
5 # 1. 檢查檔案是否存在(I/O)
6 if not file_path.exists():
7 return LinkCheckResult(...)
8
9 # 2. 讀取檔案內容(I/O - 主要瓶頸)
10 try:
11 content = file_path.read_text(encoding="utf-8")
12 except Exception as e:
13 return LinkCheckResult(...)
14
15 # 3. 解析連結(CPU - 很快)
16 links = self.parse_markdown_links(content)
17
18 # 4. 過濾內部連結(CPU - 很快)
19 internal_links = self._filter_internal_links(links)
20
21 # 5. 檢查每個連結(I/O - 檔案系統檢查)
22 broken_links = []
23 for link in internal_links:
24 is_valid, suggestion = self._check_link(
25 link["target"],
26 file_path.parent
27 )
28 if not is_valid:
29 broken_links.append(...)
30
31 return LinkCheckResult(...)時間分布估計:
1操作 | 類型 | 每檔案耗時
2-----------------|-------|----------
3read_text() | I/O | 1-5 ms
4parse_links() | CPU | 0.1 ms
5filter_links() | CPU | 0.01 ms
6check_link() x N | I/O | N * 0.5 ms
7-----------------|-------|----------
8總計(10 連結) | | ~7 ms對於 100 個檔案的專案:
1# 循序執行
2total_time = 100 * 7ms = 700ms = 0.7 秒
3
4# 這看起來不長,但如果:
5# - 檔案更多(500+ 個)
6# - 每個檔案連結更多
7# - 網路檔案系統(NFS)
8# 時間會快速增長為什麼適合並行化?
- I/O 密集:大部分時間花在檔案讀取和存在性檢查
- 任務獨立:每個檔案的檢查互不依賴
- 無共享狀態:不需要同步機制
進階解決方案
設計目標
- 提升效能:利用並行化加速 I/O 操作
- 保持 API 相容:不改變方法簽名和返回值
- 可配置:允許調整並行度
實作步驟
步驟 1:識別獨立任務
每個檔案的檢查是完全獨立的:
1# 這些操作可以同時執行
2result_1 = checker.check_file("doc1.md") # 獨立
3result_2 = checker.check_file("doc2.md") # 獨立
4result_3 = checker.check_file("doc3.md") # 獨立步驟 2:使用 ThreadPoolExecutor
1from concurrent.futures import ThreadPoolExecutor, as_completed
2from typing import List, Optional
3
4def check_directory_parallel(
5 self,
6 dir_path: str,
7 recursive: bool = True,
8 max_workers: Optional[int] = None
9) -> List[LinkCheckResult]:
10 """
11 並行檢查目錄下所有 Markdown 檔案
12
13 Args:
14 dir_path: 目錄路徑
15 recursive: 是否遞迴檢查子目錄
16 max_workers: 最大工作執行緒數,預設為 CPU 核心數
17
18 Returns:
19 list[LinkCheckResult]: 所有檔案的檢查結果
20 """
21 dir_path = self._resolve_path(dir_path)
22
23 if not dir_path.is_dir():
24 return [self._create_error_result(dir_path, "目錄不存在")]
25
26 # 收集所有 .md 檔案
27 pattern = "**/*.md" if recursive else "*.md"
28 md_files = sorted(dir_path.glob(pattern) if not recursive
29 else dir_path.rglob("*.md"))
30
31 if not md_files:
32 return []
33
34 # 使用 ThreadPoolExecutor 並行處理
35 results = []
36 with ThreadPoolExecutor(max_workers=max_workers) as executor:
37 # 提交所有任務
38 future_to_file = {
39 executor.submit(self.check_file, str(md_file)): md_file
40 for md_file in md_files
41 }
42
43 # 收集結果
44 for future in as_completed(future_to_file):
45 result = future.result()
46 results.append(result)
47
48 # 按檔案路徑排序(保持一致的輸出順序)
49 results.sort(key=lambda r: r.file_path)
50
51 return results步驟 3:選擇 max_workers
max_workers 的選擇影響效能:
1import os
2
3# 預設值:min(32, os.cpu_count() + 4)
4# 這是 Python 3.8+ 的預設行為
5
6# 對於 I/O 密集任務,可以設定更高
7def get_optimal_workers(file_count: int) -> int:
8 """
9 根據檔案數量計算最佳工作執行緒數
10
11 經驗法則:
12 - 檔案數 < 10: 使用檔案數
13 - 檔案數 >= 10: 使用 CPU 核心數 * 2,但不超過 32
14 """
15 cpu_count = os.cpu_count() or 4
16
17 if file_count < 10:
18 return file_count
19
20 return min(32, cpu_count * 2, file_count)完整程式碼
1#!/usr/bin/env python3
2"""
3並行 Markdown 連結檢查器
4
5基於 markdown_link_checker.py,展示如何用 ThreadPoolExecutor 加速檔案檢查。
6"""
7
8import os
9import re
10from concurrent.futures import ThreadPoolExecutor, as_completed
11from dataclasses import dataclass, field
12from pathlib import Path
13from typing import Dict, List, Optional, Tuple
14
15@dataclass
16class BrokenLink:
17 """失效連結描述"""
18 file: str
19 line: int
20 link_text: str
21 link_target: str
22 suggestion: str = ""
23
24@dataclass
25class LinkCheckResult:
26 """單個檔案的連結檢查結果"""
27 file_path: str
28 total_links: int
29 broken_links: List[BrokenLink] = field(default_factory=list)
30 is_valid: bool = True
31
32 def __post_init__(self):
33 self.is_valid = len(self.broken_links) == 0
34
35class ParallelMarkdownLinkChecker:
36 """
37 並行 Markdown 連結檢查器
38
39 相較於原版的改進:
40 - check_directory() 使用 ThreadPoolExecutor 並行處理
41 - 支援自訂 max_workers
42 - 保持 API 相容性
43 """
44
45 INLINE_LINK_PATTERN = re.compile(r'(?<!!)\[([^\]]+)\]\(([^)]+)\)')
46 EXTERNAL_PATTERNS = [r'^https?://', r'^mailto:', r'^tel:', r'^ftp://']
47
48 def __init__(self, project_root: Optional[str] = None):
49 if project_root is None:
50 project_root = os.environ.get("CLAUDE_PROJECT_DIR", os.getcwd())
51 self.project_root = Path(project_root)
52
53 # ===== 核心方法 =====
54
55 def check_file(self, file_path: str) -> LinkCheckResult:
56 """
57 檢查單個 Markdown 檔案的連結
58
59 這個方法是執行緒安全的,可以並行呼叫。
60 """
61 file_path = self._resolve_path(file_path)
62
63 if not file_path.exists():
64 return LinkCheckResult(
65 file_path=str(file_path),
66 total_links=0,
67 broken_links=[
68 BrokenLink(
69 file=str(file_path), line=0,
70 link_text="", link_target="",
71 suggestion=f"檔案不存在: {file_path}"
72 )
73 ]
74 )
75
76 try:
77 content = file_path.read_text(encoding="utf-8")
78 except Exception as e:
79 return LinkCheckResult(
80 file_path=str(file_path),
81 total_links=0,
82 broken_links=[
83 BrokenLink(
84 file=str(file_path), line=0,
85 link_text="", link_target="",
86 suggestion=f"無法讀取檔案: {e}"
87 )
88 ]
89 )
90
91 links = self._parse_links(content)
92 internal_links = self._filter_internal_links(links)
93
94 broken_links = []
95 for link in internal_links:
96 is_valid, suggestion = self._check_link(
97 link["target"], file_path.parent
98 )
99 if not is_valid:
100 broken_links.append(
101 BrokenLink(
102 file=str(file_path),
103 line=link["line"],
104 link_text=link["text"],
105 link_target=link["target"],
106 suggestion=suggestion
107 )
108 )
109
110 return LinkCheckResult(
111 file_path=str(file_path),
112 total_links=len(internal_links),
113 broken_links=broken_links
114 )
115
116 def check_directory(
117 self,
118 dir_path: str,
119 recursive: bool = True,
120 max_workers: Optional[int] = None
121 ) -> List[LinkCheckResult]:
122 """
123 並行檢查目錄下所有 Markdown 檔案
124
125 Args:
126 dir_path: 目錄路徑
127 recursive: 是否遞迴檢查子目錄
128 max_workers: 最大工作執行緒數,None 表示使用預設值
129
130 Returns:
131 list[LinkCheckResult]: 所有檔案的檢查結果(按路徑排序)
132 """
133 dir_path = self._resolve_path(dir_path)
134
135 if not dir_path.is_dir():
136 return [
137 LinkCheckResult(
138 file_path=str(dir_path),
139 total_links=0,
140 broken_links=[
141 BrokenLink(
142 file=str(dir_path), line=0,
143 link_text="", link_target="",
144 suggestion=f"目錄不存在: {dir_path}"
145 )
146 ]
147 )
148 ]
149
150 # 收集所有 .md 檔案
151 if recursive:
152 md_files = list(dir_path.rglob("*.md"))
153 else:
154 md_files = list(dir_path.glob("*.md"))
155
156 if not md_files:
157 return []
158
159 # 計算最佳工作執行緒數
160 if max_workers is None:
161 max_workers = self._get_optimal_workers(len(md_files))
162
163 # 並行處理
164 results: List[LinkCheckResult] = []
165
166 with ThreadPoolExecutor(max_workers=max_workers) as executor:
167 # 提交所有任務
168 future_to_file = {
169 executor.submit(self.check_file, str(f)): f
170 for f in md_files
171 }
172
173 # 收集結果(as_completed 提供最快的回應)
174 for future in as_completed(future_to_file):
175 try:
176 result = future.result()
177 results.append(result)
178 except Exception as e:
179 # 處理意外錯誤
180 md_file = future_to_file[future]
181 results.append(
182 LinkCheckResult(
183 file_path=str(md_file),
184 total_links=0,
185 broken_links=[
186 BrokenLink(
187 file=str(md_file), line=0,
188 link_text="", link_target="",
189 suggestion=f"檢查失敗: {e}"
190 )
191 ]
192 )
193 )
194
195 # 排序以保持一致的輸出順序
196 results.sort(key=lambda r: r.file_path)
197
198 return results
199
200 # ===== 循序版本(用於比較)=====
201
202 def check_directory_sequential(
203 self,
204 dir_path: str,
205 recursive: bool = True
206 ) -> List[LinkCheckResult]:
207 """循序版本,用於效能比較"""
208 dir_path = self._resolve_path(dir_path)
209
210 if not dir_path.is_dir():
211 return [
212 LinkCheckResult(
213 file_path=str(dir_path),
214 total_links=0,
215 broken_links=[
216 BrokenLink(
217 file=str(dir_path), line=0,
218 link_text="", link_target="",
219 suggestion=f"目錄不存在: {dir_path}"
220 )
221 ]
222 )
223 ]
224
225 if recursive:
226 md_files = sorted(dir_path.rglob("*.md"))
227 else:
228 md_files = sorted(dir_path.glob("*.md"))
229
230 results = []
231 for md_file in md_files:
232 results.append(self.check_file(str(md_file)))
233
234 return results
235
236 # ===== 私有方法 =====
237
238 def _resolve_path(self, path: str) -> Path:
239 p = Path(path)
240 return p if p.is_absolute() else self.project_root / p
241
242 def _parse_links(self, content: str) -> List[Dict]:
243 links = []
244 in_code_block = False
245
246 for line_num, line in enumerate(content.split('\n'), start=1):
247 if line.strip().startswith("```"):
248 in_code_block = not in_code_block
249 continue
250
251 if in_code_block:
252 continue
253
254 for match in self.INLINE_LINK_PATTERN.finditer(line):
255 links.append({
256 "text": match.group(1),
257 "target": match.group(2),
258 "line": line_num
259 })
260
261 return links
262
263 def _filter_internal_links(self, links: List[Dict]) -> List[Dict]:
264 internal = []
265 for link in links:
266 target = link["target"]
267 if target.startswith("#"):
268 continue
269 if any(re.match(p, target) for p in self.EXTERNAL_PATTERNS):
270 continue
271 internal.append(link)
272 return internal
273
274 def _check_link(
275 self,
276 target: str,
277 base_dir: Path
278 ) -> Tuple[bool, str]:
279 target_path = target.split("#")[0]
280 if not target_path:
281 return True, ""
282
283 resolved = (base_dir / target_path).resolve()
284 if resolved.exists():
285 return True, ""
286 else:
287 return False, f"檔案不存在: {target_path}"
288
289 def _get_optimal_workers(self, file_count: int) -> int:
290 """計算最佳工作執行緒數"""
291 cpu_count = os.cpu_count() or 4
292 if file_count < 10:
293 return file_count
294 return min(32, cpu_count * 2, file_count)
295
296# ===== 效能測量工具 =====
297
298def benchmark_checker(
299 dir_path: str,
300 iterations: int = 3
301) -> Dict[str, float]:
302 """
303 比較循序與並行版本的效能
304
305 Args:
306 dir_path: 要檢查的目錄
307 iterations: 執行次數(取平均)
308
309 Returns:
310 dict: {'sequential': 秒數, 'parallel': 秒數, 'speedup': 加速比}
311 """
312 import time
313
314 checker = ParallelMarkdownLinkChecker()
315
316 # 預熱(讓檔案系統快取生效)
317 checker.check_directory(dir_path)
318
319 # 測量循序版本
320 seq_times = []
321 for _ in range(iterations):
322 start = time.perf_counter()
323 checker.check_directory_sequential(dir_path)
324 seq_times.append(time.perf_counter() - start)
325
326 # 測量並行版本
327 par_times = []
328 for _ in range(iterations):
329 start = time.perf_counter()
330 checker.check_directory(dir_path)
331 par_times.append(time.perf_counter() - start)
332
333 seq_avg = sum(seq_times) / len(seq_times)
334 par_avg = sum(par_times) / len(par_times)
335
336 return {
337 "sequential": seq_avg,
338 "parallel": par_avg,
339 "speedup": seq_avg / par_avg if par_avg > 0 else 0
340 }
341
342# ===== 示範 =====
343
344if __name__ == "__main__":
345 import sys
346
347 # 預設檢查當前目錄
348 target_dir = sys.argv[1] if len(sys.argv) > 1 else "."
349
350 print(f"=== 並行 Markdown 連結檢查示範 ===\n")
351 print(f"目標目錄: {target_dir}\n")
352
353 checker = ParallelMarkdownLinkChecker()
354
355 # 執行檢查
356 results = checker.check_directory(target_dir)
357
358 # 統計
359 total_files = len(results)
360 total_links = sum(r.total_links for r in results)
361 broken_count = sum(len(r.broken_links) for r in results)
362 invalid_files = sum(1 for r in results if not r.is_valid)
363
364 print(f"檔案數: {total_files}")
365 print(f"連結數: {total_links}")
366 print(f"失效連結: {broken_count}")
367 print(f"有問題的檔案: {invalid_files}")
368
369 # 顯示失效連結
370 if broken_count > 0:
371 print(f"\n失效連結詳情:")
372 for result in results:
373 if not result.is_valid:
374 print(f"\n {result.file_path}:")
375 for link in result.broken_links:
376 print(f" Line {link.line}: [{link.link_text}](/python-advanced/08-practical-optimization/case-studies/parallel-file-check/{link.link_target})")
377
378 # 效能比較
379 if total_files >= 5:
380 print(f"\n=== 效能比較 ===\n")
381 benchmark = benchmark_checker(target_dir)
382 print(f"循序版本: {benchmark['sequential']:.3f} 秒")
383 print(f"並行版本: {benchmark['parallel']:.3f} 秒")
384 print(f"加速比: {benchmark['speedup']:.2f}x")效能測量
使用 timeit 比較前後效能:
1import timeit
2from parallel_link_checker import ParallelMarkdownLinkChecker
3
4def measure_performance(dir_path: str, num_runs: int = 5):
5 """測量並比較循序與並行版本的效能"""
6 checker = ParallelMarkdownLinkChecker()
7
8 # 循序版本
9 seq_time = timeit.timeit(
10 lambda: checker.check_directory_sequential(dir_path),
11 number=num_runs
12 ) / num_runs
13
14 # 並行版本
15 par_time = timeit.timeit(
16 lambda: checker.check_directory(dir_path),
17 number=num_runs
18 ) / num_runs
19
20 print(f"目錄: {dir_path}")
21 print(f"循序版本: {seq_time:.4f} 秒")
22 print(f"並行版本: {par_time:.4f} 秒")
23 print(f"加速比: {seq_time / par_time:.2f}x")
24
25# 實際測試結果(範例)
26# 目錄: ./docs (50 個 .md 檔案)
27# 循序版本: 0.3521 秒
28# 並行版本: 0.0892 秒
29# 加速比: 3.95x不同規模的預期加速比:
| 檔案數 | 循序時間 | 並行時間 | 加速比 |
|---|---|---|---|
| 10 | 70 ms | 25 ms | 2.8x |
| 50 | 350 ms | 90 ms | 3.9x |
| 100 | 700 ms | 160 ms | 4.4x |
| 500 | 3.5 s | 750 ms | 4.7x |
注意:實際加速比取決於檔案大小、連結數量、磁碟速度等因素。
設計權衡
| 面向 | 循序版本 | 並行版本 |
|---|---|---|
| 效能 | 較慢,線性增長 | 快 3-5 倍 |
| 複雜度 | 簡單 | 需要理解執行緒池 |
| 除錯 | 容易 | 需要注意執行緒安全 |
| 記憶體 | 較低 | 較高(執行緒開銷) |
| 結果順序 | 保證有序 | 需要額外排序 |
| 錯誤處理 | 直接 | 需要處理 Future 例外 |
執行緒安全考量
check_file() 方法是執行緒安全的,因為:
- 無共享狀態:每次呼叫都獨立處理一個檔案
- 唯讀操作:只讀取檔案,不修改
- 獨立返回值:每個呼叫返回獨立的
LinkCheckResult
1# 這是安全的
2def check_file(self, file_path: str) -> LinkCheckResult:
3 # 所有變數都是區域變數
4 file_path = self._resolve_path(file_path) # 新物件
5 content = file_path.read_text() # 區域變數
6 links = self._parse_links(content) # 區域變數
7 # ...
8 return LinkCheckResult(...) # 新物件什麼時候該用這個技術?
適合使用
- 多檔案處理:需要處理大量獨立檔案
- I/O 密集:主要時間花在檔案讀寫
- 任務獨立:每個任務不依賴其他任務的結果
- 可接受亂序:或願意在最後排序
不建議使用
- 檔案很少:少於 5 個檔案,並行開銷可能大於收益
- CPU 密集:如果主要時間花在計算,應考慮 ProcessPoolExecutor
- 有依賴關係:後續檔案依賴前面檔案的結果
- 記憶體受限:並行版本會同時載入多個檔案
練習
基礎練習
練習 1:加入進度回報
1def check_directory_with_progress(
2 self,
3 dir_path: str,
4 callback: callable = None
5) -> List[LinkCheckResult]:
6 """
7 並行檢查,並在每個檔案完成時呼叫 callback
8
9 callback 簽名: callback(completed: int, total: int, result: LinkCheckResult)
10
11 提示:使用 as_completed() 在每個任務完成時觸發回報
12 """
13 # Your implementation here
14 pass練習 2:支援取消
1def check_directory_cancellable(
2 self,
3 dir_path: str,
4 cancel_event: threading.Event = None
5) -> List[LinkCheckResult]:
6 """
7 可取消的並行檢查
8
9 當 cancel_event.is_set() 時,停止提交新任務並返回已完成的結果
10
11 提示:在迴圈中檢查 cancel_event
12 """
13 # Your implementation here
14 pass進階練習
練習 3:批次處理大型目錄
1def check_directory_batched(
2 self,
3 dir_path: str,
4 batch_size: int = 100
5) -> List[LinkCheckResult]:
6 """
7 分批處理大型目錄
8
9 避免一次提交太多任務導致記憶體問題
10
11 提示:將檔案列表分成多個批次,依序處理每批
12 """
13 # Your implementation here
14 pass練習 4:加入重試機制
1def check_file_with_retry(
2 self,
3 file_path: str,
4 max_retries: int = 3
5) -> LinkCheckResult:
6 """
7 帶重試的檔案檢查
8
9 當檔案被鎖定或暫時不可用時自動重試
10
11 提示:捕捉特定例外,使用指數退避
12 """
13 # Your implementation here
14 pass挑戰題
練習 5:實作並行度自動調整
1class AdaptiveParallelChecker:
2 """
3 自動調整並行度的檢查器
4
5 根據系統負載和檢查速度動態調整 max_workers
6
7 功能:
8 - 初始使用保守的 max_workers
9 - 如果任務完成很快,增加 max_workers
10 - 如果系統負載高,減少 max_workers
11 - 記錄最佳 max_workers 供下次使用
12 """
13 # Your implementation here
14 pass延伸閱讀
下一章:並行 Hook 驗證
#python #python-advanced #optimization #parallel #case-study