This case study is based on the actual code in .claude/lib/git_utils.py and shows how to use asyncio.gather and TaskGroup to run I/O operations concurrently and efficiently.

Prerequisites

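Problem background

Existing design

After get_worktree_list() in git_utils.py returns the list of worktrees, checking the status of each one means calling git once per worktree, one after another: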

from git_utils import run_git_command, get_worktree_list

def check_all_worktrees_sync() -> dict[str, str]:
    """
    Synchronous version: check the status of each worktree in sequence

    Returns:
        dict[str, str]: mapping of {path: status}
    """
    worktrees = get_worktree_list()
    results = {}

    for wt in worktrees:
        path = wt["path"]
        # Every call blocks until completion
        success, output = run_git_command(["status", "-s"], cwd=path)
        results[path] = output if success else "error"

    return results

def get_all_branches_sync() -> dict[str, str]:
    """
    Synchronous version: get the branch name of each worktree in sequence

    Returns:
        dict[str, str]: mapping of {path: branch name}
    """
    worktrees = get_worktree_list()
    results = {}

    for wt in worktrees:
        path = wt["path"]
        # Each command waits for the previous one
        success, output = run_git_command(["branch", "--show-current"], cwd=path)
        results[path] = output if success else "unknown"

    return results

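Strengths of this design

  • Simple and intuitive: sequential execution is easy to understand and debug
  • Explicit error handling: the result of each operation is available immediately
  • Resource friendly: only one process runs at a time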

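Limitations of this design

As the number of worktrees grows:

  • Execution time grows linearly: 10 worktrees at 0.2 seconds each take 2 seconds
  • I/O wait time is wasted: while one command is running, the program blocks and the CPU sits idle
  • Poor user experience: responses become slow when there are many worktrees

import time

from git_utils import run_git_command

def benchmark_sync(worktrees: list[str]) -> float:
    """Measure the execution time of the synchronous version"""
    start = time.perf_counter()

    for path in worktrees:
        # Each call blocks on I/O until the git command finishes
        run_git_command(["status", "-s"], cwd=path)

    return time.perf_counter() - start

# 10 worktrees, each taking 0.2s
# Total: 10 * 0.2 = 2.0 seconds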

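Advanced solution

Design goals

  1. Run multiple independent I/O operations concurrently
  2. Handle partial failures correctly
  3. Support cancellation and timeouts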

Implementation steps

Step 1: Using asyncio.gather

asyncio.gather is the most direct way to run multiple coroutines concurrently:

import asyncio
from typing import Optional

async def async_run_git_command(
    args: list[str],
    cwd: Optional[str] = None,
    timeout: float = 10.0
) -> tuple[bool, str]:
    """
    Run a git command asynchronously (see the previous chapter for details)
    """
    try:
        process = await asyncio.create_subprocess_exec(
            "git", *args,
            cwd=cwd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            # Kill the child and reap it so no zombie process is left behind
            process.kill()
            await process.wait()
            return False, f"Command timed out after {timeout}s"

        if process.returncode == 0:
            return True, stdout.decode().strip()
        else:
            return False, stderr.decode().strip()

    except FileNotFoundError:
        return False, "git command not found"
    except Exception as e:
        return False, str(e)

async def check_all_worktrees_basic(worktrees: list[str]) -> dict[str, str]:
    """
    Check all worktrees concurrently with asyncio.gather

    Args:
        worktrees: list of worktree paths

    Returns:
        dict[str, str]: mapping of {path: status}
    """
    async def check_one(path: str) -> tuple[str, str]:
        """Check a single worktree and return (path, status)"""
        success, output = await async_run_git_command(
            ["status", "-s"],
            cwd=path
        )
        return path, output if success else "error"

    # Create one coroutine per worktree
    tasks = [check_one(path) for path in worktrees]

    # Run all of them concurrently
    results = await asyncio.gather(*tasks)

    # Convert list of tuples to dict
    return dict(results)

# Usage example
async def demo_basic():
    worktrees = ["/path/to/repo1", "/path/to/repo2", "/path/to/repo3"]

    # All three checks run concurrently
    # If each takes 0.2s, total time is ~0.2s, not 0.6s
    statuses = await check_all_worktrees_basic(worktrees)

    for path, status in statuses.items():
        print(f"{path}: {'clean' if not status else status}")

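Key points

  • asyncio.gather(*tasks) schedules all the coroutines so that they run concurrently
  • It waits until every coroutine has finished, then returns a list of results
  • The order of the results matches the order of the input awaitables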
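
The ordering guarantee is easiest to see when the inputs finish in a different order than they were submitted. The following is a small sketch in which fake_check and gather_in_order are hypothetical names and asyncio.sleep replaces the git call; the slowest job is deliberately listed first.

```python
import asyncio
import time


async def fake_check(path: str, delay: float) -> tuple[str, float]:
    """Hypothetical stand-in for a git call: sleeps, then reports its own delay."""
    await asyncio.sleep(delay)
    return path, delay


async def gather_in_order() -> tuple[list[tuple[str, float]], float]:
    # The slowest job is listed first, so it is the last one to finish.
    jobs = [("/wt/slow", 0.15), ("/wt/medium", 0.10), ("/wt/fast", 0.05)]
    start = time.perf_counter()
    results = await asyncio.gather(*(fake_check(p, d) for p, d in jobs))
    elapsed = time.perf_counter() - start
    # `results` follows the order of `jobs`, not the order of completion.
    return results, elapsed
```

The returned list still starts with "/wt/slow", and the elapsed time is close to the longest single delay rather than the sum of all three.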

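Step 2: Handling errors (return_exceptions)

By default, gather propagates the first exception raised to the caller. The remaining awaitables are not cancelled and keep running, but their results are lost. With return_exceptions=True, gather collects every outcome, exceptions included, into the result list: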

async def check_all_worktrees_safe(
    worktrees: list[str]
) -> dict[str, str]:
    """
    Safe version: use return_exceptions to handle partial failures

    Even if some worktree checks fail, the results of the others are still returned.

    Args:
        worktrees: list of worktree paths

    Returns:
        dict[str, str]: mapping of {path: status or "error: ..." message}
    """
    async def check_one(path: str) -> tuple[str, str]:
        """Check with potential exception"""
        success, output = await async_run_git_command(
            ["status", "-s"],
            cwd=path,
            timeout=5.0  # Shorter timeout
        )

        if not success:
            # Raise exception for failed commands
            raise RuntimeError(f"Git command failed: {output}")

        return path, output

    tasks = [check_one(path) for path in worktrees]

    # return_exceptions=True: exceptions become results, not propagated
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Process results, handling both successes and exceptions
    output = {}
    for path, result in zip(worktrees, results):
        if isinstance(result, Exception):
            output[path] = f"error: {result}"
        else:
            # result is (path, status) tuple
            _, status = result
            output[path] = status if status else "clean"

    return output

async def demo_error_handling():
    """Demonstrate error handling"""
    worktrees = [
        "/valid/repo1",      # Works
        "/invalid/path",     # Fails
        "/valid/repo2",      # Works
    ]

    results = await check_all_worktrees_safe(worktrees)

    for path, status in results.items():
        if status.startswith("error:"):
            print(f"[FAILED] {path}: {status}")
        else:
            print(f"[OK] {path}: {status}")

# Output:
# [OK] /valid/repo1: clean
# [FAILED] /invalid/path: error: Git command failed: ...
# [OK] /valid/repo2: M file.txt

Comparing return_exceptions behavior

import asyncio

async def compare_exception_handling():
    async def might_fail(n: int) -> int:
        if n == 2:
            raise ValueError(f"Task {n} failed")
        return n * 10

    # Coroutine objects can only be awaited once, so each gather call
    # below builds its own fresh list of coroutines.

    # Without return_exceptions (default)
    try:
        results = await asyncio.gather(
            *[might_fail(i) for i in range(5)]
        )  # Raises ValueError
    except ValueError as e:
        print(f"Caught: {e}")  # Only the first error is seen; other results are lost

    # With return_exceptions=True
    results = await asyncio.gather(
        *[might_fail(i) for i in range(5)],
        return_exceptions=True
    )
    # results: [0, 10, ValueError('Task 2 failed'), 30, 40]

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i}: Failed - {result}")
        else:
            print(f"Task {i}: Success - {result}")
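
One consequence of the default mode is easy to miss: when gather raises, the sibling awaitables are not cancelled. The sketch below illustrates this with hypothetical names (worker, siblings_keep_running) and asyncio.sleep in place of real I/O.

```python
import asyncio


async def worker(name: str, delay: float, log: list[str], fail: bool = False) -> str:
    """Sleep, then either raise or record that this worker ran to completion."""
    await asyncio.sleep(delay)
    if fail:
        raise ValueError(f"{name} failed")
    log.append(name)
    return name


async def siblings_keep_running() -> list[str]:
    finished: list[str] = []
    try:
        await asyncio.gather(
            worker("fast-fail", 0.01, finished, fail=True),
            worker("slow-ok", 0.05, finished),
        )
    except ValueError:
        # gather has already raised, but "slow-ok" is still scheduled.
        pass
    # Give the surviving task time to finish in the background.
    await asyncio.sleep(0.1)
    return finished
```

The returned log contains "slow-ok" even though gather raised before that worker finished. If the remaining work should stop on the first failure, it has to be cancelled explicitly or run inside a TaskGroup.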

Step 3: Using TaskGroup (Python 3.11+)

TaskGroup, introduced in Python 3.11, offers better control through structured concurrency:

import asyncio

async def check_all_worktrees_taskgroup(
    worktrees: list[str]
) -> dict[str, str]:
    """
    Check all worktrees concurrently with TaskGroup

    TaskGroup features:
    - If any task fails, the remaining tasks are cancelled automatically
    - Explicit scope (context manager)
    - Exception aggregation (ExceptionGroup)

    Args:
        worktrees: list of worktree paths

    Returns:
        dict[str, str]: mapping of {path: status}
    """
    results: dict[str, str] = {}

    async def check_and_store(path: str) -> None:
        """Check worktree and store result in shared dict"""
        # async_run_git_command (from Step 1) reports failures as (False, message)
        # instead of raising, so a failed git command does not cancel the group
        success, output = await async_run_git_command(
            ["status", "-s"],
            cwd=path
        )
        results[path] = output if success else "error"

    async with asyncio.TaskGroup() as tg:
        for path in worktrees:
            tg.create_task(check_and_store(path))

    # All tasks complete when exiting the context
    return results

async def demo_taskgroup():
    """Demonstrate basic TaskGroup usage"""
    worktrees = ["/repo1", "/repo2", "/repo3"]

    try:
        results = await check_all_worktrees_taskgroup(worktrees)
        for path, status in results.items():
            print(f"{path}: {status}")
    except* Exception as eg:
        # Python 3.11+ except* syntax for ExceptionGroup
        for exc in eg.exceptions:
            print(f"Task failed: {exc}")

TaskGroup error-handling pattern

import asyncio

async def taskgroup_error_demo():
    """Demonstrate exception handling with TaskGroup"""

    async def task_might_fail(
        name: str, should_fail: bool, delay: float = 0.1
    ) -> str:
        await asyncio.sleep(delay)
        if should_fail:
            raise ValueError(f"{name} failed!")
        return f"{name} succeeded"

    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(task_might_fail("A", False))
            tg.create_task(task_might_fail("B", True))   # Will fail
            # C sleeps longer, so it is still running when B fails and gets cancelled
            tg.create_task(task_might_fail("C", False, delay=1.0))
    except* ValueError as eg:
        print(f"Caught {len(eg.exceptions)} errors:")
        for exc in eg.exceptions:
            print(f"  - {exc}")

# When B fails:
# 1. TaskGroup cancels C, which is still running (even though it would succeed)
# 2. Waits for all tasks to finish
# 3. Raises ExceptionGroup with the ValueError

Step 4: Mixing concurrent and sequential execution

Real applications often need to combine concurrent and sequential operations:

import asyncio

async def check_worktrees_batched(
    worktrees: list[str],
    batch_size: int = 5
) -> dict[str, str]:
    """
    Batched concurrency: control how many tasks run at the same time

    Use cases:
    - Avoid spawning too many processes at once
    - Stay under API rate limits
    - Keep resource usage under control

    Args:
        worktrees: list of worktree paths
        batch_size: number of concurrent tasks per batch

    Returns:
        dict[str, str]: mapping of {path: status}
    """
    results: dict[str, str] = {}

    # Defined once, outside the loop, instead of once per batch
    async def check_one(path: str) -> tuple[str, str]:
        success, output = await async_run_git_command(
            ["status", "-s"],
            cwd=path
        )
        return path, output if success else "error"

    # Process in batches
    for i in range(0, len(worktrees), batch_size):
        batch = worktrees[i:i + batch_size]

        # Process batch in parallel
        batch_results = await asyncio.gather(
            *[check_one(path) for path in batch]
        )

        # Collect results
        results.update(dict(batch_results))

    return results

async def check_worktrees_semaphore(
    worktrees: list[str],
    max_concurrent: int = 5
) -> dict[str, str]:
    """
    Limit concurrency with a Semaphore

    More flexible than batching: a new task starts as soon as one finishes,
    instead of waiting for the whole batch to complete.

    Args:
        worktrees: list of worktree paths
        max_concurrent: maximum number of tasks running at once

    Returns:
        dict[str, str]: mapping of {path: status}
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def check_with_limit(path: str) -> tuple[str, str]:
        """Check worktree with concurrency limit"""
        async with semaphore:
            # At most max_concurrent tasks run this block
            success, output = await async_run_git_command(
                ["status", "-s"],
                cwd=path
            )
            return path, output if success else "error"

    # Launch all tasks (they'll wait at semaphore if needed)
    tasks = [check_with_limit(path) for path in worktrees]
    results = await asyncio.gather(*tasks)

    return dict(results)

async def pipeline_example() -> dict[str, dict[str, str]]:
    """
    Pipeline pattern: the output of one step is the input of the next
    """
    # Step 1: Get worktree list (single operation)
    # async_get_worktree_list() is defined in the complete code listing
    worktrees = await async_get_worktree_list()
    paths = [wt["path"] for wt in worktrees]

    # Step 2: Check status in parallel
    statuses = await check_all_worktrees_basic(paths)

    # Step 3: For dirty repos, get detailed diff (parallel)
    # Skip paths whose status check failed ("error") as well as clean ones
    dirty_paths = [p for p, s in statuses.items() if s and s != "error"]

    async def get_diff(path: str) -> tuple[str, str]:
        success, diff = await async_run_git_command(["diff"], cwd=path)
        return path, diff if success else ""

    diffs = dict(await asyncio.gather(
        *[get_diff(p) for p in dirty_paths]
    ))

    return {"statuses": statuses, "diffs": diffs}
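
The claim that at most max_concurrent tasks are inside the semaphore at once can be checked with a counter. This is a sketch under assumptions: measure_peak_concurrency is a hypothetical helper and asyncio.sleep simulates the git call.

```python
import asyncio


async def measure_peak_concurrency(n_tasks: int, max_concurrent: int) -> int:
    """Return the highest number of workers seen inside the semaphore at once."""
    semaphore = asyncio.Semaphore(max_concurrent)
    running = 0
    peak = 0

    async def worker() -> None:
        nonlocal running, peak
        async with semaphore:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # simulated I/O
            running -= 1

    # All workers are started at once; the semaphore decides who proceeds.
    await asyncio.gather(*(worker() for _ in range(n_tasks)))
    return peak
```

With 20 tasks and a limit of 5 the observed peak is 5, and with fewer tasks than the limit the peak equals the number of tasks. The counters need no lock because asyncio runs the workers on a single thread and only switches between them at await points.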

Complete code

#!/usr/bin/env python3
"""
Concurrent I/O utilities: complete example

Shows how to use asyncio.gather and TaskGroup to run Git operations concurrently.
"""

import asyncio
import time
from typing import Optional

# ===== Core async function =====

async def async_run_git_command(
    args: list[str],
    cwd: Optional[str] = None,
    timeout: float = 10.0
) -> tuple[bool, str]:
    """
    Run a git command asynchronously

    Args:
        args: list of git command arguments
        cwd: working directory
        timeout: timeout in seconds

    Returns:
        (success flag, output or error message)
    """
    try:
        process = await asyncio.create_subprocess_exec(
            "git", *args,
            cwd=cwd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            # Kill the child and reap it so no zombie process is left behind
            process.kill()
            await process.wait()
            return False, f"Command timed out after {timeout}s"

        if process.returncode == 0:
            return True, stdout.decode().strip()
        else:
            return False, stderr.decode().strip()

    except FileNotFoundError:
        return False, "git command not found"
    except Exception as e:
        return False, str(e)

async def async_get_worktree_list() -> list[dict]:
    """Get the list of worktrees"""
    success, output = await async_run_git_command(
        ["worktree", "list", "--porcelain"]
    )
    if not success:
        return []

    worktrees = []
    current_worktree: dict = {}

    for line in output.splitlines():
        if line.startswith("worktree "):
            # A new "worktree" line starts the next record
            if current_worktree:
                worktrees.append(current_worktree)
            current_worktree = {"path": line[9:]}
        elif line.startswith("branch "):
            branch_ref = line[7:]
            if branch_ref.startswith("refs/heads/"):
                branch_ref = branch_ref[11:]
            current_worktree["branch"] = branch_ref
        elif line == "detached":
            current_worktree["detached"] = True

    if current_worktree:
        worktrees.append(current_worktree)

    return worktrees

# ===== Parallel strategies =====

async def parallel_with_gather(
    worktrees: list[str],
    return_exceptions: bool = False
) -> dict[str, str]:
    """
    Strategy 1: asyncio.gather

    Args:
        worktrees: list of worktree paths
        return_exceptions: whether to return exceptions as results

    Returns:
        dict[str, str]: check results
    """
    async def check_one(path: str) -> tuple[str, str]:
        success, output = await async_run_git_command(
            ["status", "-s"],
            cwd=path
        )
        return path, output if success else "error"

    tasks = [check_one(path) for path in worktrees]
    results = await asyncio.gather(*tasks, return_exceptions=return_exceptions)

    output = {}
    for path, result in zip(worktrees, results):
        if isinstance(result, Exception):
            output[path] = f"exception: {result}"
        else:
            _, status = result
            output[path] = status if status else "clean"

    return output

async def parallel_with_taskgroup(worktrees: list[str]) -> dict[str, str]:
    """
    Strategy 2: TaskGroup (Python 3.11+)

    One task fails -> all cancelled
    """
    results: dict[str, str] = {}

    async def check_and_store(path: str) -> None:
        success, output = await async_run_git_command(
            ["status", "-s"],
            cwd=path
        )
        results[path] = output if success else "error"

    async with asyncio.TaskGroup() as tg:
        for path in worktrees:
            tg.create_task(check_and_store(path))

    return results

async def parallel_with_semaphore(
    worktrees: list[str],
    max_concurrent: int = 5
) -> dict[str, str]:
    """
    Strategy 3: Semaphore for rate limiting
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def check_with_limit(path: str) -> tuple[str, str]:
        async with semaphore:
            success, output = await async_run_git_command(
                ["status", "-s"],
                cwd=path
            )
            return path, output if success else "error"

    tasks = [check_with_limit(path) for path in worktrees]
    results = await asyncio.gather(*tasks)

    return dict(results)

# ===== Practical helpers =====

async def get_worktree_status_report() -> dict:
    """
    Build a full worktree status report

    Returns:
        dict: full report with status, branch, and latest commit
    """
    # Step 1: Get worktree list
    worktrees = await async_get_worktree_list()
    paths = [wt["path"] for wt in worktrees]

    if not paths:
        return {"error": "No worktrees found"}

    # Step 2: Parallel operations
    async def get_status(path: str) -> tuple[str, str]:
        success, output = await async_run_git_command(
            ["status", "-s"],
            cwd=path
        )
        return path, output if success else "error"

    async def get_branch(path: str) -> tuple[str, str]:
        success, output = await async_run_git_command(
            ["branch", "--show-current"],
            cwd=path
        )
        return path, output if success else "unknown"

    async def get_last_commit(path: str) -> tuple[str, str]:
        success, output = await async_run_git_command(
            ["log", "-1", "--format=%s"],
            cwd=path
        )
        return path, output if success else "unknown"

    # Execute all queries in parallel
    status_task = asyncio.gather(*[get_status(p) for p in paths])
    branch_task = asyncio.gather(*[get_branch(p) for p in paths])
    commit_task = asyncio.gather(*[get_last_commit(p) for p in paths])

    statuses, branches, commits = await asyncio.gather(
        status_task, branch_task, commit_task
    )

    # Build the lookup tables once, not once per worktree
    status_dict = dict(statuses)
    branch_dict = dict(branches)
    commit_dict = dict(commits)

    # Combine results
    report = {}
    for wt in worktrees:
        path = wt["path"]

        report[path] = {
            "branch": branch_dict.get(path, "unknown"),
            "status": status_dict.get(path, "error"),
            "is_clean": not status_dict.get(path, "error"),
            "last_commit": commit_dict.get(path, "unknown"),
        }

    return report

# ===== Benchmark =====

async def benchmark_strategies(worktrees: list[str]) -> dict[str, float]:
    """
    Compare the execution time of the different strategies
    """
    results = {}

    # Strategy 1: Sequential (baseline)
    start = time.perf_counter()
    for path in worktrees:
        await async_run_git_command(["status", "-s"], cwd=path)
    results["sequential"] = time.perf_counter() - start

    # Strategy 2: gather
    start = time.perf_counter()
    await parallel_with_gather(worktrees)
    results["gather"] = time.perf_counter() - start

    # Strategy 3: TaskGroup
    start = time.perf_counter()
    await parallel_with_taskgroup(worktrees)
    results["taskgroup"] = time.perf_counter() - start

    # Strategy 4: Semaphore
    start = time.perf_counter()
    await parallel_with_semaphore(worktrees, max_concurrent=3)
    results["semaphore(3)"] = time.perf_counter() - start

    return results

# ===== Demo =====

async def demo():
    """Demonstrate concurrent I/O operations"""
    print("=== Concurrent I/O demo ===\n")

    # Get worktrees
    print("1. Fetching worktree list:")
    worktrees = await async_get_worktree_list()
    paths = [wt["path"] for wt in worktrees]

    for wt in worktrees:
        branch = wt.get("branch", "detached")
        print(f"   - {branch}: {wt['path']}")
    print()

    if len(paths) >= 1:
        # Benchmark
        print("2. Performance comparison:")
        times = await benchmark_strategies(paths)
        for strategy, elapsed in times.items():
            print(f"   {strategy}: {elapsed:.3f}s")

        speedup = times["sequential"] / times["gather"]
        print(f"   Speedup: {speedup:.1f}x")
        print()

        # Full report
        print("3. Full status report:")
        report = await get_worktree_status_report()
        for path, info in report.items():
            status = "clean" if info["is_clean"] else "dirty"
            print(f"   [{info['branch']}] {path}")
            print(f"       Status: {status}")
            print(f"       Last commit: {info['last_commit'][:50]}...")

if __name__ == "__main__":
    asyncio.run(demo())
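
The parsing loop inside async_get_worktree_list can be exercised on its own, without a repository. The sketch below pulls the same logic into a pure function; parse_worktree_porcelain is a hypothetical name, and SAMPLE is hand-written text that follows the layout of `git worktree list --porcelain`, not captured output.

```python
def parse_worktree_porcelain(output: str) -> list[dict]:
    """Parse `git worktree list --porcelain` text into one dict per worktree."""
    worktrees: list[dict] = []
    current: dict = {}
    for line in output.splitlines():
        if line.startswith("worktree "):
            # A new "worktree" line closes the previous record.
            if current:
                worktrees.append(current)
            current = {"path": line.removeprefix("worktree ")}
        elif line.startswith("branch "):
            ref = line.removeprefix("branch ")
            current["branch"] = ref.removeprefix("refs/heads/")
        elif line == "detached":
            current["detached"] = True
    if current:
        worktrees.append(current)
    return worktrees


# Hand-written sample (an assumption for illustration), two worktrees:
SAMPLE = """\
worktree /repo
HEAD 1111111111111111111111111111111111111111
branch refs/heads/main

worktree /repo-hotfix
HEAD 2222222222222222222222222222222222222222
detached
"""
```

Separating parsing from the subprocess call keeps the I/O in one place and lets the text handling be unit tested with plain strings.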

Usage examples

Basic usage

import asyncio

# async_get_worktree_list() and parallel_with_gather() come from the complete code above

async def main():
    # Get all worktree paths
    worktrees = await async_get_worktree_list()
    paths = [wt["path"] for wt in worktrees]

    # Check all in parallel
    statuses = await parallel_with_gather(paths)

    for path, status in statuses.items():
        print(f"{path}: {status or 'clean'}")

asyncio.run(main())

Handling many worktrees

async def check_many_worktrees(paths: list[str]):
    """Use a semaphore to limit concurrency when there are many worktrees"""
    # Limit to 10 concurrent git processes
    results = await parallel_with_semaphore(paths, max_concurrent=10)

    # parallel_with_semaphore reports "" for a clean tree and "error" for a failed check
    clean_count = sum(1 for s in results.values() if not s or s == "clean")
    error_count = sum(1 for s in results.values() if s == "error")
    dirty_count = len(results) - clean_count - error_count

    print(f"Clean: {clean_count}, Dirty: {dirty_count}, Errors: {error_count}")
    return results

Combining with retries

async def check_with_retry(
    path: str,
    max_retries: int = 3
) -> tuple[str, str]:
    """Check with retries"""
    for attempt in range(max_retries):
        success, output = await async_run_git_command(
            ["status", "-s"],
            cwd=path,
            timeout=5.0
        )

        if success:
            return path, output

        if attempt < max_retries - 1:
            # Linear backoff: 0.5s, 1.0s, ...
            await asyncio.sleep(0.5 * (attempt + 1))

    return path, "error: max retries exceeded"

async def check_all_with_retry(paths: list[str]) -> dict[str, str]:
    """Run every check with the retry mechanism"""
    tasks = [check_with_retry(p) for p in paths]
    results = await asyncio.gather(*tasks)
    return dict(results)
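
The retry loop can be tried without git by injecting the operation to retry. This is a sketch with hypothetical helpers (backoff_delays, retry, make_flaky); it keeps the same linear backoff of base * (attempt + 1) used above and makes the base delay a parameter so that a test can run quickly.

```python
import asyncio
from typing import Awaitable, Callable

Operation = Callable[[], Awaitable[tuple[bool, str]]]


def backoff_delays(max_retries: int, base: float = 0.5) -> list[float]:
    """Delays slept between attempts: base * 1, base * 2, ... (linear)."""
    return [base * (attempt + 1) for attempt in range(max_retries - 1)]


async def retry(
    operation: Operation, max_retries: int = 3, base: float = 0.5
) -> tuple[bool, str, int]:
    """Call `operation` until it succeeds; return (success, output, attempts used)."""
    delays = backoff_delays(max_retries, base)
    output = ""
    for attempt in range(max_retries):
        success, output = await operation()
        if success:
            return True, output, attempt + 1
        if attempt < len(delays):  # no sleep after the final attempt
            await asyncio.sleep(delays[attempt])
    return False, output, max_retries


def make_flaky(failures: int) -> Operation:
    """Hypothetical operation that fails `failures` times, then succeeds."""
    calls = 0

    async def operation() -> tuple[bool, str]:
        nonlocal calls
        calls += 1
        if calls <= failures:
            return False, f"attempt {calls} failed"
        return True, "clean"

    return operation
```

For example, `asyncio.run(retry(make_flaky(2), base=0.001))` succeeds on the third attempt, while an operation that fails five times exhausts the three attempts and reports the last error message.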

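Design trade-offs

Aspect               asyncio.gather        TaskGroup                        Semaphore
-------------------  --------------------  -------------------------------  -------------------
Python version       3.4+                  3.11+                            3.4+
Error handling       return_exceptions     Automatically cancels the rest   Same as gather
Cancellation         Handled manually      Structured cancellation          Handled manually
Code readability     Medium                Higher                           Medium
Concurrency control  No built-in limit     No built-in limit                Can cap concurrency
Best suited for      General concurrency   All-or-nothing work              Rate-limited work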

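Selection guide

Need to run multiple independent I/O operations concurrently?
├── Yes → Do you need "one failure cancels everything"?
│        ├── Yes → use TaskGroup
│        └── No → Do you need to limit concurrency?
│                 ├── Yes → use Semaphore + gather
│                 └── No → use gather (optionally with return_exceptions)
└── No → just use await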
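
The same decision tree can be written as a small function, which makes the precedence of the questions explicit. This is only an illustrative sketch: choose_strategy and its parameter names are hypothetical and not part of git_utils.py.

```python
def choose_strategy(
    concurrent_io: bool,
    all_or_nothing: bool = False,
    needs_limit: bool = False,
) -> str:
    """Walk the selection guide from top to bottom and return the suggestion."""
    if not concurrent_io:
        return "await"
    if all_or_nothing:
        # Checked first: the cancellation requirement outranks the limit.
        return "TaskGroup"
    if needs_limit:
        return "Semaphore + gather"
    return "gather"
```

Because the all-or-nothing question comes first, a workload that needs both cancellation and a concurrency limit lands on TaskGroup, and the limit then has to be added separately.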

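When should you use this technique?

Good fit

  • Multiple independent I/O operations (HTTP requests, file reads, database queries)
  • You need to wait for all of the operations to finish
  • The operations do not depend on each other
  • Each individual operation takes a while (> 10 ms)

Not recommended

  • CPU-bound computation (use multiprocessing instead)
  • Operations that depend on each other (run them sequentially or as a pipeline)
  • Individual operations that are extremely fast (the overhead may outweigh the gain)
  • External services with strict rate limits (finer-grained control is needed)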

Exercises

Basic exercises

Exercise 1: Read multiple config files at once with gather

async def read_configs(paths: list[str]) -> dict[str, str]:
    """
    Read multiple config files concurrently

    Hint: use aiofiles or asyncio.to_thread
    """
    # Your implementation here
    pass

Exercise 2: Implement a worktree status cache

class WorktreeCache:
    """
    Cache worktree status to avoid frequent queries

    Hints:
    - Store the status in a dict
    - Set an expiry time
    - When entries expire, query again concurrently
    """
    def __init__(self, ttl_seconds: float = 30.0):
        self._cache: dict[str, tuple[str, float]] = {}
        self._ttl = ttl_seconds

    async def get_status(self, path: str) -> str:
        # Your implementation here
        pass

    async def get_all_statuses(self, paths: list[str]) -> dict[str, str]:
        # Your implementation here
        pass

Advanced exercises

Exercise 3: Implement a concurrent downloader with retry logic

async def parallel_download(
    urls: list[str],
    max_concurrent: int = 5,
    max_retries: int = 3
) -> dict[str, bytes | Exception]:
    """
    Download multiple URLs concurrently, with retries

    Hints:
    - Use a Semaphore to limit concurrency
    - Implement exponential backoff
    - Use aiohttp for asynchronous HTTP requests
    """
    # Your implementation here
    pass

Challenge problems

Exercise 4: Implement a TaskGroup that limits concurrency with a semaphore

class BoundedTaskGroup:
    """
    A TaskGroup with a cap on maximum concurrency

    Usage:
        async with BoundedTaskGroup(max_concurrent=5) as tg:
            for item in items:
                tg.create_task(process(item))

    Hints:
    - Combine Semaphore and TaskGroup
    - Preserve TaskGroup's error-handling semantics
    """
    def __init__(self, max_concurrent: int):
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._tg: asyncio.TaskGroup | None = None

    async def __aenter__(self):
        # Your implementation here
        pass

    async def __aexit__(self, *args):
        # Your implementation here
        pass

    def create_task(self, coro):
        # Your implementation here
        pass

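Exercise 5: Implement a monitoring dashboard

Build a tool that monitors the status of multiple Git repos in real time:

from typing import Callable, Optional

async def monitor_repos(
    repos: list[str],
    interval: float = 5.0,
    on_change: Optional[Callable[..., None]] = None
):
    """
    Check all repos concurrently every `interval` seconds
    and call the on_change callback when a status changes

    Hints:
    - Use asyncio.sleep to control the interval
    - Compare the previous and current status to find changes
    - Support graceful shutdown on Ctrl+C
    """
    # Your implementation here
    pass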

Further reading

Previous chapter: Async Subprocess
Next chapter: Sync/Async Bridging