本案例基於 .claude/lib/git_utils.py 的實際程式碼,展示如何用 asyncio 實現非同步的外部命令執行。

先備知識

問題背景

現有設計

git_utils.py 使用同步的 subprocess.run

 1import subprocess
 2from typing import Optional
 3
 4def run_git_command(
 5    args: list[str],
 6    cwd: Optional[str] = None,
 7    timeout: int = 10
 8) -> tuple[bool, str]:
 9    """
10    執行 git 命令並返回結果
11
12    Args:
13        args: git 命令參數列表(不含 'git')
14        cwd: 執行目錄
15        timeout: 命令超時時間(秒)
16
17    Returns:
18        tuple[bool, str]: (是否成功, 輸出內容或錯誤訊息)
19    """
20    try:
21        result = subprocess.run(
22            ["git"] + args,
23            cwd=cwd,
24            capture_output=True,
25            text=True,
26            timeout=timeout
27        )
28        if result.returncode == 0:
29            return True, result.stdout.strip()
30        else:
31            return False, result.stderr.strip()
32    except subprocess.TimeoutExpired:
33        return False, f"Command timed out after {timeout}s"
34    except FileNotFoundError:
35        return False, "git command not found"
36    except Exception as e:
37        return False, str(e)
38
39def get_current_branch() -> Optional[str]:
40    """獲取當前分支名稱"""
41    success, output = run_git_command(["branch", "--show-current"])
42    return output if success and output else None
43
44def get_worktree_list() -> list[dict]:
45    """獲取所有 worktree 列表"""
46    success, output = run_git_command(["worktree", "list", "--porcelain"])
47    if not success:
48        return []
49    # ... 解析邏輯

這個設計的優點

  1. 簡單直覺:同步呼叫,容易理解
  2. 錯誤處理完善:處理超時、檔案不存在等情況
  3. API 清晰:返回 (bool, str) 元組

這個設計的限制

問題:無法並行執行多個 Git 命令

 1def check_all_worktrees(worktrees: list[str]) -> dict[str, str]:
 2    """檢查所有 worktree 的狀態"""
 3    results = {}
 4    for worktree in worktrees:
 5        # 每次呼叫都會阻塞等待
 6        success, status = run_git_command(["status", "-s"], cwd=worktree)
 7        results[worktree] = status if success else "error"
 8    return results
 9
10# 如果有 10 個 worktree,每個花 0.5 秒
11# 總共需要 5 秒!

問題:阻塞事件迴圈

1async def handle_request():
2    # 這會阻塞事件迴圈!
3    branch = get_current_branch()
4    # 其他協程無法執行
5    return {"branch": branch}

進階解決方案:非同步 subprocess

設計目標

  1. 非阻塞執行:不阻塞事件迴圈
  2. 並行能力:可以同時執行多個命令
  3. 相容性:保持與同步版本相似的 API

實作步驟

步驟 1:建立非同步命令執行器

 1import asyncio
 2from typing import Optional
 3
 4async def async_run_git_command(
 5    args: list[str],
 6    cwd: Optional[str] = None,
 7    timeout: float = 10.0
 8) -> tuple[bool, str]:
 9    """
10    非同步執行 git 命令
11
12    Args:
13        args: git 命令參數列表(不含 'git')
14        cwd: 執行目錄
15        timeout: 命令超時時間(秒)
16
17    Returns:
18        tuple[bool, str]: (是否成功, 輸出內容或錯誤訊息)
19
20    Example:
21        success, output = await async_run_git_command(["status"])
22    """
23    try:
24        # 建立非同步子進程
25        process = await asyncio.create_subprocess_exec(
26            "git", *args,
27            cwd=cwd,
28            stdout=asyncio.subprocess.PIPE,
29            stderr=asyncio.subprocess.PIPE
30        )
31
32        # 等待完成(帶超時)
33        try:
34            stdout, stderr = await asyncio.wait_for(
35                process.communicate(),
36                timeout=timeout
37            )
38        except asyncio.TimeoutError:
39            process.kill()
40            await process.wait()
41            return False, f"Command timed out after {timeout}s"
42
43        # 處理結果
44        if process.returncode == 0:
45            return True, stdout.decode().strip()
46        else:
47            return False, stderr.decode().strip()
48
49    except FileNotFoundError:
50        return False, "git command not found"
51    except Exception as e:
52        return False, str(e)

步驟 2:建立便利函式

 1async def async_get_current_branch() -> Optional[str]:
 2    """非同步獲取當前分支名稱"""
 3    success, output = await async_run_git_command(["branch", "--show-current"])
 4    return output if success and output else None
 5
 6async def async_get_project_root() -> str:
 7    """非同步獲取專案根目錄"""
 8    import os
 9    success, output = await async_run_git_command(["rev-parse", "--show-toplevel"])
10    return output if success else os.getcwd()
11
12async def async_get_worktree_list() -> list[dict]:
13    """非同步獲取 worktree 列表"""
14    success, output = await async_run_git_command(
15        ["worktree", "list", "--porcelain"]
16    )
17    if not success:
18        return []
19
20    worktrees = []
21    current_worktree: dict = {}
22
23    for line in output.split("\n"):
24        if line.startswith("worktree "):
25            if current_worktree:
26                worktrees.append(current_worktree)
27            current_worktree = {"path": line[9:]}  # len("worktree ") = 9
28        elif line.startswith("branch "):
29            branch_ref = line[7:]  # len("branch ") = 7
30            if branch_ref.startswith("refs/heads/"):
31                branch_ref = branch_ref[11:]  # len("refs/heads/") = 11
32            current_worktree["branch"] = branch_ref
33        elif line == "detached":
34            current_worktree["detached"] = True
35
36    if current_worktree:
37        worktrees.append(current_worktree)
38
39    return worktrees

步驟 3:實現並行執行

 1async def check_all_worktrees(worktrees: list[str]) -> dict[str, str]:
 2    """
 3    並行檢查所有 worktree 的狀態
 4
 5    Args:
 6        worktrees: worktree 路徑列表
 7
 8    Returns:
 9        dict[str, str]: {路徑: 狀態} 映射
10    """
11    async def check_one(worktree: str) -> tuple[str, str]:
12        """檢查單個 worktree"""
13        success, status = await async_run_git_command(
14            ["status", "-s"],
15            cwd=worktree
16        )
17        return worktree, status if success else "error"
18
19    # 並行執行所有檢查
20    tasks = [check_one(wt) for wt in worktrees]
21    results = await asyncio.gather(*tasks)
22
23    return dict(results)
24
25async def get_all_branches(worktrees: list[str]) -> dict[str, str]:
26    """
27    並行獲取所有 worktree 的當前分支
28
29    Args:
30        worktrees: worktree 路徑列表
31
32    Returns:
33        dict[str, str]: {路徑: 分支名} 映射
34    """
35    async def get_branch(worktree: str) -> tuple[str, str]:
36        success, branch = await async_run_git_command(
37            ["branch", "--show-current"],
38            cwd=worktree
39        )
40        return worktree, branch if success else "unknown"
41
42    tasks = [get_branch(wt) for wt in worktrees]
43    results = await asyncio.gather(*tasks)
44
45    return dict(results)

完整程式碼

  1#!/usr/bin/env python3
  2"""
  3非同步 Git 操作工具 - 完整範例
  4
  5展示如何用 asyncio 實現非阻塞的 Git 命令執行。
  6"""
  7
  8import asyncio
  9import os
 10from typing import Optional
 11
 12# ===== 核心功能 =====
 13
 14async def async_run_git_command(
 15    args: list[str],
 16    cwd: Optional[str] = None,
 17    timeout: float = 10.0
 18) -> tuple[bool, str]:
 19    """
 20    非同步執行 git 命令
 21
 22    Args:
 23        args: git 命令參數列表
 24        cwd: 執行目錄
 25        timeout: 超時時間(秒)
 26
 27    Returns:
 28        (是否成功, 輸出或錯誤訊息)
 29    """
 30    try:
 31        process = await asyncio.create_subprocess_exec(
 32            "git", *args,
 33            cwd=cwd,
 34            stdout=asyncio.subprocess.PIPE,
 35            stderr=asyncio.subprocess.PIPE
 36        )
 37
 38        try:
 39            stdout, stderr = await asyncio.wait_for(
 40                process.communicate(),
 41                timeout=timeout
 42            )
 43        except asyncio.TimeoutError:
 44            process.kill()
 45            await process.wait()
 46            return False, f"Command timed out after {timeout}s"
 47
 48        if process.returncode == 0:
 49            return True, stdout.decode().strip()
 50        else:
 51            return False, stderr.decode().strip()
 52
 53    except FileNotFoundError:
 54        return False, "git command not found"
 55    except Exception as e:
 56        return False, str(e)
 57
 58# ===== 便利函式 =====
 59
 60async def async_get_current_branch() -> Optional[str]:
 61    """獲取當前分支"""
 62    success, output = await async_run_git_command(["branch", "--show-current"])
 63    return output if success and output else None
 64
 65async def async_get_project_root() -> str:
 66    """獲取專案根目錄"""
 67    success, output = await async_run_git_command(["rev-parse", "--show-toplevel"])
 68    return output if success else os.getcwd()
 69
 70async def async_get_worktree_list() -> list[dict]:
 71    """獲取 worktree 列表"""
 72    success, output = await async_run_git_command(
 73        ["worktree", "list", "--porcelain"]
 74    )
 75    if not success:
 76        return []
 77
 78    worktrees = []
 79    current_worktree: dict = {}
 80
 81    for line in output.split("\n"):
 82        if line.startswith("worktree "):
 83            if current_worktree:
 84                worktrees.append(current_worktree)
 85            current_worktree = {"path": line[9:]}
 86        elif line.startswith("branch "):
 87            branch_ref = line[7:]
 88            if branch_ref.startswith("refs/heads/"):
 89                branch_ref = branch_ref[11:]
 90            current_worktree["branch"] = branch_ref
 91        elif line == "detached":
 92            current_worktree["detached"] = True
 93
 94    if current_worktree:
 95        worktrees.append(current_worktree)
 96
 97    return worktrees
 98
 99# ===== 並行操作 =====
100
101async def check_all_worktrees(worktrees: list[str]) -> dict[str, str]:
102    """並行檢查所有 worktree 狀態"""
103    async def check_one(path: str) -> tuple[str, str]:
104        success, status = await async_run_git_command(["status", "-s"], cwd=path)
105        return path, status if success else "error"
106
107    tasks = [check_one(wt) for wt in worktrees]
108    results = await asyncio.gather(*tasks)
109    return dict(results)
110
111async def get_all_branches(worktrees: list[str]) -> dict[str, str]:
112    """並行獲取所有 worktree 的分支"""
113    async def get_branch(path: str) -> tuple[str, str]:
114        success, branch = await async_run_git_command(
115            ["branch", "--show-current"],
116            cwd=path
117        )
118        return path, branch if success else "unknown"
119
120    tasks = [get_branch(wt) for wt in worktrees]
121    results = await asyncio.gather(*tasks)
122    return dict(results)
123
124async def batch_git_commands(
125    commands: list[tuple[list[str], Optional[str]]]
126) -> list[tuple[bool, str]]:
127    """
128    批次執行多個 Git 命令
129
130    Args:
131        commands: [(args, cwd), ...] 命令列表
132
133    Returns:
134        [(success, output), ...] 結果列表
135    """
136    tasks = [
137        async_run_git_command(args, cwd=cwd)
138        for args, cwd in commands
139    ]
140    return await asyncio.gather(*tasks)
141
142# ===== 同步/非同步橋接 =====
143
144def run_git_command(
145    args: list[str],
146    cwd: Optional[str] = None,
147    timeout: float = 10.0
148) -> tuple[bool, str]:
149    """
150    同步版本(相容舊 API)
151
152    在已有事件迴圈的環境中,這會建立新的迴圈執行。
153    """
154    return asyncio.run(async_run_git_command(args, cwd, timeout))
155
156def get_current_branch() -> Optional[str]:
157    """同步版本:獲取當前分支"""
158    return asyncio.run(async_get_current_branch())
159
160# ===== 測試 =====
161
162async def demo():
163    """示範非同步 Git 操作"""
164    print("=== 非同步 Git 操作示範 ===\n")
165
166    # 單一命令
167    print("1. 獲取當前分支:")
168    branch = await async_get_current_branch()
169    print(f"   分支: {branch}\n")
170
171    # 獲取專案根目錄
172    print("2. 獲取專案根目錄:")
173    root = await async_get_project_root()
174    print(f"   根目錄: {root}\n")
175
176    # 獲取 worktree 列表
177    print("3. 獲取 worktree 列表:")
178    worktrees = await async_get_worktree_list()
179    for wt in worktrees:
180        branch = wt.get("branch", "detached")
181        print(f"   - {branch}: {wt['path']}")
182    print()
183
184    # 如果有多個 worktree,示範並行操作
185    if len(worktrees) > 1:
186        print("4. 並行檢查所有 worktree 狀態:")
187        paths = [wt["path"] for wt in worktrees]
188        statuses = await check_all_worktrees(paths)
189        for path, status in statuses.items():
190            print(f"   - {path}:")
191            if status:
192                for line in status.split("\n"):
193                    print(f"       {line}")
194            else:
195                print("       (clean)")
196        print()
197
198    # 批次命令示範
199    print("5. 批次執行多個命令:")
200    commands = [
201        (["config", "user.name"], None),
202        (["config", "user.email"], None),
203        (["rev-parse", "--short", "HEAD"], None),
204    ]
205    results = await batch_git_commands(commands)
206    labels = ["使用者名稱", "使用者信箱", "當前 commit"]
207    for label, (success, output) in zip(labels, results):
208        print(f"   {label}: {output if success else '(未設定)'}")
209
210if __name__ == "__main__":
211    asyncio.run(demo())

使用範例

基本使用

 1import asyncio
 2
 3async def main():
 4    # 非同步獲取分支
 5    branch = await async_get_current_branch()
 6    print(f"當前分支: {branch}")
 7
 8    # 非同步獲取 worktree
 9    worktrees = await async_get_worktree_list()
10    print(f"Worktree 數量: {len(worktrees)}")
11
12asyncio.run(main())

並行操作

 1async def check_multiple_repos(repos: list[str]):
 2    """同時檢查多個 repo 的狀態"""
 3    tasks = [
 4        async_run_git_command(["status", "-s"], cwd=repo)
 5        for repo in repos
 6    ]
 7
 8    # 並行執行,等待全部完成
 9    results = await asyncio.gather(*tasks)
10
11    for repo, (success, output) in zip(repos, results):
12        status = "clean" if not output else "dirty"
13        print(f"{repo}: {status}")
14
15# 10 個 repo,如果每個花 0.5 秒
16# 並行執行只需要約 0.5 秒!

與 FastAPI 整合

 1from fastapi import FastAPI
 2
 3app = FastAPI()
 4
 5@app.get("/git/branch")
 6async def get_branch():
 7    """非同步端點:獲取當前分支"""
 8    branch = await async_get_current_branch()
 9    return {"branch": branch}
10
11@app.get("/git/worktrees")
12async def get_worktrees():
13    """非同步端點:獲取 worktree 列表"""
14    worktrees = await async_get_worktree_list()
15    return {"worktrees": worktrees}

效能比較

 1import time
 2import asyncio
 3
 4def sync_check_repos(repos: list[str]) -> float:
 5    """同步版本:依序檢查"""
 6    start = time.perf_counter()
 7    for repo in repos:
 8        run_git_command(["status", "-s"], cwd=repo)
 9    return time.perf_counter() - start
10
11async def async_check_repos(repos: list[str]) -> float:
12    """非同步版本:並行檢查"""
13    start = time.perf_counter()
14    tasks = [
15        async_run_git_command(["status", "-s"], cwd=repo)
16        for repo in repos
17    ]
18    await asyncio.gather(*tasks)
19    return time.perf_counter() - start
20
21# 假設每個 git status 花 0.2 秒
22repos = [f"/path/to/repo{i}" for i in range(10)]
23
24sync_time = sync_check_repos(repos)      # ~2.0 秒
25async_time = asyncio.run(async_check_repos(repos))  # ~0.2 秒
26
27print(f"同步: {sync_time:.2f}s")
28print(f"非同步: {async_time:.2f}s")
29print(f"加速: {sync_time / async_time:.1f}x")

設計權衡

面向同步 subprocess非同步 subprocess
簡單性簡單直覺需要理解 async/await
效能依序執行可並行執行
相容性到處可用需要事件迴圈
錯誤處理直覺需要處理非同步異常
測試簡單需要 pytest-asyncio
記憶體略高(維護多個進程)

什麼時候該用非同步 subprocess?

適合使用

  • 需要同時執行多個外部命令
  • 在非同步框架(FastAPI、aiohttp)中執行
  • 命令執行時間較長,需要同時做其他事

不建議使用

  • 只需要執行單一命令
  • 在同步程式碼中(需要額外的橋接)
  • 命令執行非常快(< 10ms)

進階:使用 TaskGroup(Python 3.11+)

 1async def check_repos_with_taskgroup(repos: list[str]) -> dict[str, str]:
 2    """使用 TaskGroup 管理並行任務"""
 3    results: dict[str, str] = {}
 4
 5    async with asyncio.TaskGroup() as tg:
 6        async def check_and_store(repo: str):
 7            success, output = await async_run_git_command(
 8                ["status", "-s"],
 9                cwd=repo
10            )
11            results[repo] = output if success else "error"
12
13        for repo in repos:
14            tg.create_task(check_and_store(repo))
15
16    return results

TaskGroup 的優點:

  • 更好的異常處理(一個失敗,全部取消)
  • 結構化並行(明確的範圍)

練習

基礎練習

  1. 實作 async_is_clean_repo(path) 函式,檢查 repo 是否乾淨
  2. 實作 async_get_commit_count(branch) 函式,獲取分支的 commit 數量

進階練習

  1. 實作一個 GitRepo 類別,封裝非同步 Git 操作
  2. 為非同步版本加入重試機制(失敗時自動重試)

挑戰題

  1. 實作一個監控工具:每 5 秒並行檢查多個 repo 的狀態,有變化時通知

延伸閱讀


上一章:案例研究索引 下一章:案例:並行 I/O 操作