案例:並行 I/O 操作
案例:並行 I/O 操作
本案例基於 .claude/lib/git_utils.py 的實際程式碼,展示如何用 asyncio.gather 和 TaskGroup 實現高效的並行 I/O 操作。
先備知識
問題背景
現有設計
git_utils.py 的 get_worktree_list() 取得 worktree 列表後,如果要檢查每個 worktree 的狀態,需要逐一呼叫:
from git_utils import run_git_command, get_worktree_list

def check_all_worktrees_sync() -> dict[str, str]:
    """
    Synchronous version: check each worktree's status one after another.

    Returns:
        dict[str, str]: path -> `git status -s` output, or "error" if the
        command failed.
    """
    statuses: dict[str, str] = {}
    for entry in get_worktree_list():
        wt_path = entry["path"]
        # Blocks until this git process exits before starting the next one
        ok, text = run_git_command(["status", "-s"], cwd=wt_path)
        statuses[wt_path] = text if ok else "error"
    return statuses

def get_all_branches_sync() -> dict[str, str]:
    """
    Synchronous version: read each worktree's current branch, one at a time.

    Returns:
        dict[str, str]: path -> branch name, or "unknown" if the command
        failed.
    """
    branches: dict[str, str] = {}
    for entry in get_worktree_list():
        wt_path = entry["path"]
        # Each command must finish before the next one can start
        ok, name = run_git_command(["branch", "--show-current"], cwd=wt_path)
        branches[wt_path] = name if ok else "unknown"
    return branches

這個設計的優點
37 return results這個設計的優點
- 簡單直覺:循序執行,容易理解和除錯
- 錯誤處理明確:每個操作的結果立即可用
- 資源友善:一次只執行一個進程
這個設計的限制
當 worktree 數量增加時:
- 執行時間線性增長:10 個 worktree,每個 0.2 秒 = 2 秒
- 無法利用 I/O 等待時間:等待一個命令完成時,CPU 閒置
- 使用者體驗差:大量 worktree 時響應緩慢
import time

def benchmark_sync(worktrees: list[str]) -> float:
    """Return the wall-clock seconds needed to check every worktree in turn."""
    t0 = time.perf_counter()
    for wt_path in worktrees:
        # Real git command: the call blocks while waiting on I/O
        run_git_command(["status", "-s"], cwd=wt_path)
    elapsed = time.perf_counter() - t0
    return elapsed

# 10 worktrees, each taking 0.2s
# Total: 10 * 0.2 = 2.0 seconds

進階解決方案
設計目標
- 並行執行多個獨立的 I/O 操作
- 正確處理部分失敗的情況
- 支援取消和超時機制
實作步驟
步驟 1:使用 asyncio.gather
asyncio.gather 是並行執行多個協程最直接的方式:
import asyncio
import os
from typing import Optional

async def async_run_git_command(
    args: list[str],
    cwd: Optional[str] = None,
    timeout: float = 10.0
) -> tuple[bool, str]:
    """
    Run a git command asynchronously (see the previous chapter for details).

    Args:
        args: git arguments, e.g. ["status", "-s"].
        cwd: directory to run git in; None uses the current directory.
        timeout: seconds to wait before the process is killed.

    Returns:
        (success, text): on exit code 0, the stripped stdout; otherwise the
        stripped stderr or a short description of what went wrong.

    Raises:
        asyncio.CancelledError: re-raised when the caller is cancelled; the
            git process is killed first.
    """
    try:
        process = await asyncio.create_subprocess_exec(
            "git", *args,
            cwd=cwd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=timeout
            )
        except (asyncio.TimeoutError, asyncio.CancelledError) as exc:
            # Kill the child on timeout *and* on cancellation (e.g. a
            # TaskGroup aborting after a sibling failed) so no git process
            # keeps running in the background.
            try:
                process.kill()
            except ProcessLookupError:
                pass  # it already exited on its own
            if isinstance(exc, asyncio.CancelledError):
                raise
            await process.wait()
            return False, f"Command timed out after {timeout}s"

        if process.returncode == 0:
            return True, stdout.decode().strip()
        return False, stderr.decode().strip()

    except FileNotFoundError:
        # Popen raises FileNotFoundError both when the git executable is
        # missing and when cwd does not exist; report the actual cause.
        if cwd is not None and not os.path.isdir(cwd):
            return False, f"working directory not found: {cwd}"
        return False, "git command not found"
    except Exception as e:
        return False, str(e)

async def check_all_worktrees_basic(worktrees: list[str]) -> dict[str, str]:
    """
    Check every worktree concurrently with asyncio.gather.

    Args:
        worktrees: worktree paths.

    Returns:
        dict[str, str]: path -> `git status -s` output, or "error" if the
        command failed.
    """
    async def status_of(wt_path: str) -> tuple[str, str]:
        """Return (path, status) for a single worktree."""
        ok, text = await async_run_git_command(["status", "-s"], cwd=wt_path)
        if ok:
            return wt_path, text
        return wt_path, "error"

    # gather schedules every coroutine at once and returns the results in
    # the same order as its arguments.
    pairs = await asyncio.gather(*map(status_of, worktrees))
    return {path: status for path, status in pairs}

67# Usage example
68async def demo_basic():
69 worktrees = ["/path/to/repo1", "/path/to/repo2", "/path/to/repo3"]
70
71 # All three checks run in parallel
72 # If each takes 0.2s, total time is ~0.2s, not 0.6s
73 statuses = await check_all_worktrees_basic(worktrees)
74
75 for path, status in statuses.items():
76 print(f"{path}: {'clean' if not status else status}")重點說明:
- asyncio.gather(*tasks) 同時啟動所有協程
- 等待所有協程完成後,返回結果列表
- 結果順序與輸入任務順序一致
步驟 2:處理錯誤(return_exceptions)
預設情況下,gather 在遇到第一個異常時會立即傳播該異常;其餘任務不會被取消,而是繼續在背景執行,但它們的結果會遺失。使用 return_exceptions=True 則可以收集所有結果,包括異常(異常物件會出現在對應任務的位置):
async def check_all_worktrees_safe(
    worktrees: list[str]
) -> dict[str, str]:
    """
    Safe variant: use return_exceptions so partial failures don't abort.

    Even if some worktree checks fail, results for the others are still
    returned. Failures are reported as strings, not as exception objects.

    Args:
        worktrees: worktree paths.

    Returns:
        dict[str, str]: path -> "clean", the `git status -s` output, or
        "error: <message>" when that worktree's check raised.
    """
    async def check_one(path: str) -> tuple[str, str]:
        """Check one worktree; raise RuntimeError if git fails."""
        success, output = await async_run_git_command(
            ["status", "-s"],
            cwd=path,
            timeout=5.0  # Shorter timeout
        )

        if not success:
            # Turn a failed command into an exception so gather reports it
            raise RuntimeError(f"Git command failed: {output}")

        return path, output

    tasks = [check_one(path) for path in worktrees]

    # return_exceptions=True: exceptions are placed in the result list at the
    # failing task's position instead of being propagated.
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Process results, handling both successes and exceptions
    output: dict[str, str] = {}
    for path, result in zip(worktrees, results):
        # BaseException, not Exception: a cancelled child comes back as
        # CancelledError, which is not an Exception subclass and cannot be
        # unpacked as a (path, status) tuple.
        if isinstance(result, BaseException):
            output[path] = f"error: {result}"
        else:
            # result is a (path, status) tuple
            _, status = result
            output[path] = status if status else "clean"

    return output

46async def demo_error_handling():
47 """示範錯誤處理"""
48 worktrees = [
49 "/valid/repo1", # Works
50 "/invalid/path", # Fails
51 "/valid/repo2", # Works
52 ]
53
54 results = await check_all_worktrees_safe(worktrees)
55
56 for path, status in results.items():
57 if status.startswith("error:"):
58 print(f"[FAILED] {path}: {status}")
59 else:
60 print(f"[OK] {path}: {status}")
61
62# Output:
63# [OK] /valid/repo1: clean
64# [FAILED] /invalid/path: error: Git command failed: ...
65# [OK] /valid/repo2: M file.txtreturn_exceptions 行為對比:
1async def compare_exception_handling():
2 async def might_fail(n: int) -> int:
3 if n == 2:
4 raise ValueError(f"Task {n} failed")
5 return n * 10
6
7 tasks = [might_fail(i) for i in range(5)]
8
9 # Without return_exceptions (default)
10 try:
11 results = await asyncio.gather(*tasks) # Raises ValueError
12 except ValueError as e:
13 print(f"Caught: {e}") # Only see first error, others lost
14
15 # With return_exceptions=True
16 results = await asyncio.gather(*tasks, return_exceptions=True)
17 # results: [0, 10, ValueError('Task 2 failed'), 30, 40]
18
19 for i, result in enumerate(results):
20 if isinstance(result, Exception):
21 print(f"Task {i}: Failed - {result}")
22 else:
23 print(f"Task {i}: Success - {result}")步驟 3:使用 TaskGroup(Python 3.11+)
Python 3.11 引入的 TaskGroup 提供更好的結構化並行控制:
1import asyncio
2from typing import Optional
3
4async def check_all_worktrees_taskgroup(
5 worktrees: list[str]
6) -> dict[str, str]:
7 """
8 使用 TaskGroup 並行檢查所有 worktree
9
10 TaskGroup 特性:
11 - 任一任務失敗時,自動取消其他任務
12 - 明確的作用域(context manager)
13 - 異常聚合(ExceptionGroup)
14
15 Args:
16 worktrees: worktree 路徑列表
17
18 Returns:
19 dict[str, str]: {路徑: 狀態} 映射
20 """
21 results: dict[str, str] = {}
22
23 async def check_and_store(path: str) -> None:
24 """Check worktree and store result in shared dict"""
25 success, output = await async_run_git_command(
26 ["status", "-s"],
27 cwd=path
28 )
29 results[path] = output if success else "error"
30
31 async with asyncio.TaskGroup() as tg:
32 for path in worktrees:
33 tg.create_task(check_and_store(path))
34
35 # All tasks complete when exiting the context
36 return results
37
38async def demo_taskgroup():
39 """示範 TaskGroup 的基本用法"""
40 worktrees = ["/repo1", "/repo2", "/repo3"]
41
42 try:
43 results = await check_all_worktrees_taskgroup(worktrees)
44 for path, status in results.items():
45 print(f"{path}: {status}")
46 except* Exception as eg:
47 # Python 3.11+ except* syntax for ExceptionGroup
48 for exc in eg.exceptions:
49 print(f"Task failed: {exc}")TaskGroup 的錯誤處理模式:
1async def taskgroup_error_demo():
2 """示範 TaskGroup 的異常處理"""
3
4 async def task_might_fail(name: str, should_fail: bool) -> str:
5 await asyncio.sleep(0.1)
6 if should_fail:
7 raise ValueError(f"{name} failed!")
8 return f"{name} succeeded"
9
10 try:
11 async with asyncio.TaskGroup() as tg:
12 tg.create_task(task_might_fail("A", False))
13 tg.create_task(task_might_fail("B", True)) # Will fail
14 tg.create_task(task_might_fail("C", False)) # Gets cancelled
15 except* ValueError as eg:
16 print(f"Caught {len(eg.exceptions)} errors:")
17 for exc in eg.exceptions:
18 print(f" - {exc}")
19
20# When B fails:
21# 1. TaskGroup cancels C (even though it would succeed)
22# 2. Waits for all tasks to finish
23# 3. Raises ExceptionGroup with the ValueError步驟 4:並行與循序的混合模式
實際應用中,常需要混合並行和循序操作:
async def check_worktrees_batched(
    worktrees: list[str],
    batch_size: int = 5
) -> dict[str, str]:
    """
    Batched concurrency: cap how many checks run at the same time.

    Each batch of up to `batch_size` paths runs concurrently; the next batch
    starts only after the whole previous batch has finished.

    Use cases:
    - avoid spawning too many processes at once
    - stay under API rate limits
    - keep resource usage bounded

    Args:
        worktrees: worktree paths.
        batch_size: number of checks per batch; must be >= 1.

    Returns:
        dict[str, str]: path -> `git status -s` output or "error".

    Raises:
        ValueError: if batch_size < 1.
    """
    if batch_size < 1:
        # range() raises for 0 and silently yields nothing for negatives,
        # which would return {} without checking anything.
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    # Defined once, not re-created for every batch
    async def check_one(path: str) -> tuple[str, str]:
        success, output = await async_run_git_command(
            ["status", "-s"],
            cwd=path
        )
        return path, output if success else "error"

    results: dict[str, str] = {}

    for i in range(0, len(worktrees), batch_size):
        batch = worktrees[i:i + batch_size]
        # Run this batch concurrently and wait for all of it before moving on
        batch_results = await asyncio.gather(
            *[check_one(path) for path in batch]
        )
        results.update(batch_results)

    return results

async def check_worktrees_semaphore(
    worktrees: list[str],
    max_concurrent: int = 5
) -> dict[str, str]:
    """
    Limit concurrency with a Semaphore.

    More flexible than batching: as soon as one check finishes, a waiting
    one starts, instead of waiting for a whole batch.

    Args:
        worktrees: worktree paths.
        max_concurrent: maximum number of simultaneous checks; must be >= 1.

    Returns:
        dict[str, str]: path -> `git status -s` output or "error".

    Raises:
        ValueError: if max_concurrent < 1.
    """
    if max_concurrent < 1:
        # Semaphore(0) would make every task wait forever, and Semaphore
        # itself rejects negative values; fail fast with a clear message.
        raise ValueError(f"max_concurrent must be >= 1, got {max_concurrent}")

    semaphore = asyncio.Semaphore(max_concurrent)

    async def check_with_limit(path: str) -> tuple[str, str]:
        """Check worktree with concurrency limit"""
        async with semaphore:
            # At most max_concurrent tasks run this block at once
            success, output = await async_run_git_command(
                ["status", "-s"],
                cwd=path
            )
        return path, output if success else "error"

    # Launch all tasks (they'll wait at the semaphore if needed)
    tasks = [check_with_limit(path) for path in worktrees]
    results = await asyncio.gather(*tasks)

    return dict(results)

async def pipeline_example():
    """
    Pipeline pattern: the output of one step is the input of the next.

    Returns:
        dict with "statuses" (path -> `git status -s` output or "error") and
        "diffs" (path -> `git diff` output, only for dirty worktrees).
    """
    # Step 1: Get worktree list (single operation)
    worktrees = await async_get_worktree_list()
    paths = [wt["path"] for wt in worktrees]

    # Step 2: Check status in parallel
    statuses = await check_all_worktrees_basic(paths)

    # Step 3: For dirty repos, get detailed diff (parallel).
    # "error" marks a failed status check, not a dirty worktree, so skip it.
    dirty_paths = [p for p, s in statuses.items() if s and s != "error"]

    async def get_diff(path: str) -> tuple[str, str]:
        success, diff = await async_run_git_command(["diff"], cwd=path)
        return path, diff if success else ""

    diffs = dict(await asyncio.gather(
        *[get_diff(p) for p in dirty_paths]
    ))

    return {"statuses": statuses, "diffs": diffs}

完整程式碼
#!/usr/bin/env python3
"""
Parallel I/O utilities - complete example.

Shows how asyncio.gather and TaskGroup run Git operations concurrently.
"""

import asyncio
import os
import time
from typing import Optional

# ===== Core async function =====

async def async_run_git_command(
    args: list[str],
    cwd: Optional[str] = None,
    timeout: float = 10.0
) -> tuple[bool, str]:
    """
    Run a git command asynchronously.

    Args:
        args: git arguments, e.g. ["status", "-s"].
        cwd: directory to run git in; None uses the current directory.
        timeout: seconds to wait before the process is killed.

    Returns:
        (success, text): on exit code 0, the stripped stdout; otherwise the
        stripped stderr or a short description of what went wrong.

    Raises:
        asyncio.CancelledError: re-raised when the caller is cancelled; the
            git process is killed first.
    """
    try:
        process = await asyncio.create_subprocess_exec(
            "git", *args,
            cwd=cwd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=timeout
            )
        except (asyncio.TimeoutError, asyncio.CancelledError) as exc:
            # Kill the child on timeout *and* on cancellation (e.g. a
            # TaskGroup aborting after a sibling failed) so no git process
            # keeps running in the background.
            try:
                process.kill()
            except ProcessLookupError:
                pass  # it already exited on its own
            if isinstance(exc, asyncio.CancelledError):
                raise
            await process.wait()
            return False, f"Command timed out after {timeout}s"

        if process.returncode == 0:
            return True, stdout.decode().strip()
        return False, stderr.decode().strip()

    except FileNotFoundError:
        # Popen raises FileNotFoundError both when the git executable is
        # missing and when cwd does not exist; report the actual cause.
        if cwd is not None and not os.path.isdir(cwd):
            return False, f"working directory not found: {cwd}"
        return False, "git command not found"
    except Exception as e:
        return False, str(e)

async def async_get_worktree_list() -> list[dict]:
    """
    Return the worktrees reported by `git worktree list --porcelain`.

    Each entry has "path"; it also has "branch" (with any refs/heads/ prefix
    removed) when a branch line is present, and "detached": True when a
    "detached" line is present. Returns [] if the git command fails.
    """
    ok, porcelain = await async_run_git_command(
        ["worktree", "list", "--porcelain"]
    )
    if not ok:
        return []

    entries: list[dict] = []
    entry: dict = {}

    for line in porcelain.split("\n"):
        if line.startswith("worktree "):
            # A "worktree" line starts the next record; flush the current one
            if entry:
                entries.append(entry)
            entry = {"path": line.removeprefix("worktree ")}
        elif line.startswith("branch "):
            entry["branch"] = line.removeprefix("branch ").removeprefix("refs/heads/")
        elif line == "detached":
            entry["detached"] = True

    if entry:
        entries.append(entry)

    return entries

# ===== Parallel strategies =====

async def parallel_with_gather(
    worktrees: list[str],
    return_exceptions: bool = False
) -> dict[str, str]:
    """
    Strategy 1: asyncio.gather

    Args:
        worktrees: worktree paths.
        return_exceptions: if True, an exception raised by a check is put in
            that check's result slot instead of propagating out of gather.

    Returns:
        dict[str, str]: path -> "clean", the `git status -s` output, "error"
        if git failed, or "exception: <message>" when return_exceptions is
        True and the check raised or was cancelled.
    """
    async def check_one(path: str) -> tuple[str, str]:
        success, output = await async_run_git_command(
            ["status", "-s"],
            cwd=path
        )
        return path, output if success else "error"

    tasks = [check_one(path) for path in worktrees]
    results = await asyncio.gather(*tasks, return_exceptions=return_exceptions)

    output = {}
    for path, result in zip(worktrees, results):
        # BaseException, not Exception: with return_exceptions=True a
        # cancelled check comes back as CancelledError, which is not an
        # Exception subclass and cannot be unpacked as (path, status).
        if isinstance(result, BaseException):
            output[path] = f"exception: {result}"
        else:
            _, status = result
            output[path] = status if status else "clean"

    return output

123async def parallel_with_taskgroup(worktrees: list[str]) -> dict[str, str]:
124 """
125 Strategy 2: TaskGroup (Python 3.11+)
126
127 One task fails -> all cancelled
128 """
129 results: dict[str, str] = {}
130
131 async def check_and_store(path: str) -> None:
132 success, output = await async_run_git_command(
133 ["status", "-s"],
134 cwd=path
135 )
136 results[path] = output if success else "error"
137
138 async with asyncio.TaskGroup() as tg:
139 for path in worktrees:
140 tg.create_task(check_and_store(path))
141
142 return results
143
async def parallel_with_semaphore(
    worktrees: list[str],
    max_concurrent: int = 5
) -> dict[str, str]:
    """
    Strategy 3: Semaphore for rate limiting

    At most max_concurrent git processes run at once; as soon as one check
    finishes, a waiting one starts.

    Args:
        worktrees: worktree paths.
        max_concurrent: maximum number of simultaneous checks; must be >= 1.

    Returns:
        dict[str, str]: path -> raw `git status -s` output or "error".

    Raises:
        ValueError: if max_concurrent < 1.
    """
    if max_concurrent < 1:
        # Semaphore(0) would make every task wait forever, and Semaphore
        # itself rejects negative values; fail fast with a clear message.
        raise ValueError(f"max_concurrent must be >= 1, got {max_concurrent}")

    semaphore = asyncio.Semaphore(max_concurrent)

    async def check_with_limit(path: str) -> tuple[str, str]:
        async with semaphore:
            success, output = await async_run_git_command(
                ["status", "-s"],
                cwd=path
            )
        return path, output if success else "error"

    tasks = [check_with_limit(path) for path in worktrees]
    results = await asyncio.gather(*tasks)

    return dict(results)

# ===== Practical helpers =====

async def get_worktree_status_report() -> dict:
    """
    Build a full status report for every worktree.

    Returns:
        dict: path -> {"branch", "status", "is_clean", "last_commit"} for each
        worktree, or {"error": "No worktrees found"} when none were listed.
    """
    # Step 1: Get worktree list
    worktrees = await async_get_worktree_list()
    paths = [wt["path"] for wt in worktrees]

    if not paths:
        return {"error": "No worktrees found"}

    async def query(path: str, args: list[str], fallback: str) -> str:
        """Run one git command in path; return its output or fallback."""
        success, output = await async_run_git_command(args, cwd=path)
        return output if success else fallback

    # Step 2: run all three queries for all worktrees concurrently
    statuses, branches, commits = await asyncio.gather(
        asyncio.gather(*[query(p, ["status", "-s"], "error") for p in paths]),
        asyncio.gather(*[query(p, ["branch", "--show-current"], "unknown") for p in paths]),
        asyncio.gather(*[query(p, ["log", "-1", "--format=%s"], "unknown") for p in paths]),
    )

    # Step 3: combine. gather preserves argument order, so each result list
    # lines up with `paths`; no per-iteration dict rebuilding is needed.
    report = {}
    for path, status, branch, commit in zip(paths, statuses, branches, commits):
        report[path] = {
            "branch": branch,
            "status": status,
            "is_clean": not status,
            "last_commit": commit,
        }

    return report

# ===== Benchmark =====

async def benchmark_strategies(worktrees: list[str]) -> dict[str, float]:
    """
    Time each concurrency strategy over the same worktrees.

    Returns:
        dict[str, float]: strategy label -> elapsed seconds.

    NOTE(review): strategies run back to back in a fixed order, so later runs
    may see warmer OS caches than the sequential baseline; consider repeating
    or shuffling the runs if the numbers look suspicious.
    """
    timings: dict[str, float] = {}

    async def sequential() -> None:
        # Baseline: one git process at a time
        for path in worktrees:
            await async_run_git_command(["status", "-s"], cwd=path)

    runs = [
        ("sequential", sequential),
        ("gather", lambda: parallel_with_gather(worktrees)),
        ("taskgroup", lambda: parallel_with_taskgroup(worktrees)),
        ("semaphore(3)", lambda: parallel_with_semaphore(worktrees, max_concurrent=3)),
    ]
    for label, run in runs:
        started = time.perf_counter()
        await run()
        timings[label] = time.perf_counter() - started

    return timings

# ===== Demo =====

async def demo():
    """Run the parallel I/O demo against the repository in the current directory."""
    print("=== 並行 I/O 操作示範 ===\n")

    # Get worktrees
    print("1. 獲取 worktree 列表:")
    worktrees = await async_get_worktree_list()
    paths = [wt["path"] for wt in worktrees]

    for wt in worktrees:
        branch = wt.get("branch", "detached")
        print(f" - {branch}: {wt['path']}")
    print()

    if not paths:
        # Nothing to benchmark, and get_worktree_status_report() would return
        # {"error": ...}, whose value is a str rather than a report dict.
        print("找不到任何 worktree")
        return

    # Benchmark
    print("2. 效能比較:")
    times = await benchmark_strategies(paths)
    for strategy, elapsed in times.items():
        print(f" {strategy}: {elapsed:.3f}s")

    speedup = times["sequential"] / times["gather"]
    print(f" 加速比: {speedup:.1f}x")
    print()

    # Full report
    print("3. 完整狀態報告:")
    report = await get_worktree_status_report()
    for path, info in report.items():
        status = "clean" if info["is_clean"] else "dirty"
        print(f" [{info['branch']}] {path}")
        print(f" 狀態: {status}")
        # Only show an ellipsis when the subject was actually truncated
        commit = info["last_commit"]
        suffix = "..." if len(commit) > 50 else ""
        print(f" 最新提交: {commit[:50]}{suffix}")

if __name__ == "__main__":
    asyncio.run(demo())

使用範例
基本使用
1import asyncio
2
3async def main():
4 # Get all worktree paths
5 worktrees = await async_get_worktree_list()
6 paths = [wt["path"] for wt in worktrees]
7
8 # Check all in parallel
9 statuses = await parallel_with_gather(paths)
10
11 for path, status in statuses.items():
12 print(f"{path}: {status or 'clean'}")
13
14asyncio.run(main())處理大量 worktree
async def check_many_worktrees(paths: list[str]):
    """
    Check many worktrees while capping concurrency with a semaphore.

    Prints how many worktrees are clean, dirty, or could not be checked, and
    returns the raw results (path -> `git status -s` output or "error").
    """
    # Limit to 10 concurrent git processes
    results = await parallel_with_semaphore(paths, max_concurrent=10)

    # parallel_with_semaphore reports failed checks as "error"; count them
    # separately instead of lumping them in with dirty worktrees.
    error_count = sum(1 for s in results.values() if s == "error")
    clean_count = sum(1 for s in results.values() if not s or s == "clean")
    dirty_count = len(results) - clean_count - error_count

    print(f"Clean: {clean_count}, Dirty: {dirty_count}, Error: {error_count}")
    return results

與錯誤重試結合
async def check_with_retry(
    path: str,
    max_retries: int = 3
) -> tuple[str, str]:
    """
    Run `git status -s` in path, retrying on failure.

    Makes up to max_retries attempts, sleeping 0.5s, 1.0s, ... between them
    (linear backoff). Returns (path, output) on the first success, or
    (path, "error: max retries exceeded") if every attempt fails.
    """
    attempt = 0
    while attempt < max_retries:
        ok, text = await async_run_git_command(
            ["status", "-s"], cwd=path, timeout=5.0
        )
        if ok:
            return path, text
        attempt += 1
        if attempt < max_retries:
            # Wait a little longer after each failed attempt
            await asyncio.sleep(0.5 * attempt)
    return path, "error: max retries exceeded"

async def check_all_with_retry(paths: list[str]) -> dict[str, str]:
    """Check every path concurrently, each with its own retry loop."""
    pairs = await asyncio.gather(*(check_with_retry(p) for p in paths))
    return dict(pairs)

設計權衡
| 面向 | asyncio.gather | TaskGroup | Semaphore |
|---|---|---|---|
| Python 版本 | 3.4+ | 3.11+ | 3.4+ |
| 錯誤處理 | return_exceptions | 自動取消其他任務 | 同 gather |
| 取消行為 | 需手動處理 | 結構化取消 | 需手動處理 |
| 程式碼可讀性 | 中等 | 較高 | 中等 |
| 並行控制 | 無內建限制 | 無內建限制 | 可限制數量 |
| 適用場景 | 一般並行 | 全有或全無 | 需要限流 |
選擇指南
需要並行執行多個獨立 I/O 操作?
├── 是 → 是否需要「一個失敗全部取消」?
│   ├── 是 → 使用 TaskGroup
│   └── 否 → 是否需要限制並行數量?
│       ├── 是 → 使用 Semaphore + gather
│       └── 否 → 使用 gather(可選 return_exceptions)
└── 否 → 直接使用 await

什麼時候該用這個技術?
適合使用:
- 多個獨立的 I/O 操作(HTTP 請求、檔案讀取、資料庫查詢)
- 需要等待所有操作完成
- 操作之間沒有依賴關係
- 單一操作耗時較長(> 10ms)
不建議使用:
- CPU 密集計算(應改用 multiprocessing)
- 操作之間有依賴關係(應改用循序執行或流水線)
- 單一操作極快(overhead 可能大於收益)
- 外部服務有嚴格的 rate limit(需要更精細的控制)
練習
基礎練習
練習 1:用 gather 同時讀取多個設定檔
1async def read_configs(paths: list[str]) -> dict[str, str]:
2 """
3 並行讀取多個設定檔
4
5 提示:使用 aiofiles 或 asyncio.to_thread
6 """
7 # Your implementation here
8 pass練習 2:實作 worktree 狀態快取
class WorktreeCache:
    """
    Cache worktree statuses to avoid querying git too often.

    Hints:
    - keep the statuses in a dict
    - give each entry an expiry time
    - re-query (concurrently) once entries have expired
    """

    def __init__(self, ttl_seconds: float = 30.0):
        # path -> (status, float); presumably the float is a timestamp used
        # for expiry -- the exact layout is left to the exercise.
        self._cache: dict[str, tuple[str, float]] = {}
        self._ttl = ttl_seconds

    async def get_status(self, path: str) -> str:
        # Your implementation here
        ...

    async def get_all_statuses(self, paths: list[str]) -> dict[str, str]:
        # Your implementation here
        ...

進階練習
練習 3:實作帶有重試邏輯的並行下載器
async def parallel_download(
    urls: list[str],
    max_concurrent: int = 5,
    max_retries: int = 3
) -> dict[str, bytes | Exception]:
    """
    Download several URLs concurrently, with retries.

    Hints:
    - use a Semaphore to cap concurrency
    - implement exponential backoff
    - use aiohttp for asynchronous HTTP requests
    """
    # Your implementation here
    ...

挑戰題
練習 4:實作 semaphore 限制並行數量的 TaskGroup
1class BoundedTaskGroup:
2 """
3 限制最大並行數的 TaskGroup
4
5 使用方式:
6 async with BoundedTaskGroup(max_concurrent=5) as tg:
7 for item in items:
8 tg.create_task(process(item))
9
10 提示:
11 - 結合 Semaphore 和 TaskGroup
12 - 保持 TaskGroup 的錯誤處理語義
13 """
14 def __init__(self, max_concurrent: int):
15 self._semaphore = asyncio.Semaphore(max_concurrent)
16 self._tg: asyncio.TaskGroup | None = None
17
18 async def __aenter__(self):
19 # Your implementation here
20 pass
21
22 async def __aexit__(self, *args):
23 # Your implementation here
24 pass
25
26 def create_task(self, coro):
27 # Your implementation here
28 pass練習 5:實作監控儀表板
建立一個即時監控多個 Git repo 狀態的工具:
from collections.abc import Callable

async def monitor_repos(
    repos: list[str],
    interval: float = 5.0,
    on_change: Callable[..., object] | None = None
):
    """
    Check every repo concurrently every `interval` seconds and call
    `on_change` whenever a status changes.

    `on_change` is annotated as an optional callable; the builtin `callable`
    is a function, not a type, and is not a valid annotation.

    Hints:
    - use asyncio.sleep to pace the loop
    - compare the previous and current statuses to find changes
    - exit gracefully on Ctrl+C
    """
    # Your implementation here
    pass

延伸閱讀
- asyncio.gather 官方文件
- TaskGroup 官方文件
- Semaphore 官方文件
- PEP 654 - Exception Groups - TaskGroup 的異常處理基礎
上一章:非同步 Subprocess 下一章:同步/非同步橋接