案例:非同步 subprocess
案例:非同步 subprocess
本案例基於 .claude/lib/git_utils.py 的實際程式碼,展示如何用 asyncio 實現非同步的外部命令執行。
先備知識
問題背景
現有設計
git_utils.py 使用同步的 subprocess.run:
1import subprocess
2from typing import Optional
3
4def run_git_command(
5 args: list[str],
6 cwd: Optional[str] = None,
7 timeout: int = 10
8) -> tuple[bool, str]:
9 """
10 執行 git 命令並返回結果
11
12 Args:
13 args: git 命令參數列表(不含 'git')
14 cwd: 執行目錄
15 timeout: 命令超時時間(秒)
16
17 Returns:
18 tuple[bool, str]: (是否成功, 輸出內容或錯誤訊息)
19 """
20 try:
21 result = subprocess.run(
22 ["git"] + args,
23 cwd=cwd,
24 capture_output=True,
25 text=True,
26 timeout=timeout
27 )
28 if result.returncode == 0:
29 return True, result.stdout.strip()
30 else:
31 return False, result.stderr.strip()
32 except subprocess.TimeoutExpired:
33 return False, f"Command timed out after {timeout}s"
34 except FileNotFoundError:
35 return False, "git command not found"
36 except Exception as e:
37 return False, str(e)
38
39def get_current_branch() -> Optional[str]:
40 """獲取當前分支名稱"""
41 success, output = run_git_command(["branch", "--show-current"])
42 return output if success and output else None
43
44def get_worktree_list() -> list[dict]:
45 """獲取所有 worktree 列表"""
46 success, output = run_git_command(["worktree", "list", "--porcelain"])
47 if not success:
48 return []
49 # ... 解析邏輯這個設計的優點
- 簡單直覺:同步呼叫,容易理解
- 錯誤處理完善:處理超時、檔案不存在等情況
- API 清晰:返回
(bool, str)元組
這個設計的限制
問題:無法並行執行多個 Git 命令
1def check_all_worktrees(worktrees: list[str]) -> dict[str, str]:
2 """檢查所有 worktree 的狀態"""
3 results = {}
4 for worktree in worktrees:
5 # 每次呼叫都會阻塞等待
6 success, status = run_git_command(["status", "-s"], cwd=worktree)
7 results[worktree] = status if success else "error"
8 return results
9
10# 如果有 10 個 worktree,每個花 0.5 秒
11# 總共需要 5 秒!問題:阻塞事件迴圈
1async def handle_request():
2 # 這會阻塞事件迴圈!
3 branch = get_current_branch()
4 # 其他協程無法執行
5 return {"branch": branch}進階解決方案:非同步 subprocess
設計目標
- 非阻塞執行:不阻塞事件迴圈
- 並行能力:可以同時執行多個命令
- 相容性:保持與同步版本相似的 API
實作步驟
步驟 1:建立非同步命令執行器
1import asyncio
2from typing import Optional
3
4async def async_run_git_command(
5 args: list[str],
6 cwd: Optional[str] = None,
7 timeout: float = 10.0
8) -> tuple[bool, str]:
9 """
10 非同步執行 git 命令
11
12 Args:
13 args: git 命令參數列表(不含 'git')
14 cwd: 執行目錄
15 timeout: 命令超時時間(秒)
16
17 Returns:
18 tuple[bool, str]: (是否成功, 輸出內容或錯誤訊息)
19
20 Example:
21 success, output = await async_run_git_command(["status"])
22 """
23 try:
24 # 建立非同步子進程
25 process = await asyncio.create_subprocess_exec(
26 "git", *args,
27 cwd=cwd,
28 stdout=asyncio.subprocess.PIPE,
29 stderr=asyncio.subprocess.PIPE
30 )
31
32 # 等待完成(帶超時)
33 try:
34 stdout, stderr = await asyncio.wait_for(
35 process.communicate(),
36 timeout=timeout
37 )
38 except asyncio.TimeoutError:
39 process.kill()
40 await process.wait()
41 return False, f"Command timed out after {timeout}s"
42
43 # 處理結果
44 if process.returncode == 0:
45 return True, stdout.decode().strip()
46 else:
47 return False, stderr.decode().strip()
48
49 except FileNotFoundError:
50 return False, "git command not found"
51 except Exception as e:
52 return False, str(e)步驟 2:建立便利函式
1async def async_get_current_branch() -> Optional[str]:
2 """非同步獲取當前分支名稱"""
3 success, output = await async_run_git_command(["branch", "--show-current"])
4 return output if success and output else None
5
6async def async_get_project_root() -> str:
7 """非同步獲取專案根目錄"""
8 import os
9 success, output = await async_run_git_command(["rev-parse", "--show-toplevel"])
10 return output if success else os.getcwd()
11
12async def async_get_worktree_list() -> list[dict]:
13 """非同步獲取 worktree 列表"""
14 success, output = await async_run_git_command(
15 ["worktree", "list", "--porcelain"]
16 )
17 if not success:
18 return []
19
20 worktrees = []
21 current_worktree: dict = {}
22
23 for line in output.split("\n"):
24 if line.startswith("worktree "):
25 if current_worktree:
26 worktrees.append(current_worktree)
27 current_worktree = {"path": line[9:]} # len("worktree ") = 9
28 elif line.startswith("branch "):
29 branch_ref = line[7:] # len("branch ") = 7
30 if branch_ref.startswith("refs/heads/"):
31 branch_ref = branch_ref[11:] # len("refs/heads/") = 11
32 current_worktree["branch"] = branch_ref
33 elif line == "detached":
34 current_worktree["detached"] = True
35
36 if current_worktree:
37 worktrees.append(current_worktree)
38
39 return worktrees步驟 3:實現並行執行
1async def check_all_worktrees(worktrees: list[str]) -> dict[str, str]:
2 """
3 並行檢查所有 worktree 的狀態
4
5 Args:
6 worktrees: worktree 路徑列表
7
8 Returns:
9 dict[str, str]: {路徑: 狀態} 映射
10 """
11 async def check_one(worktree: str) -> tuple[str, str]:
12 """檢查單個 worktree"""
13 success, status = await async_run_git_command(
14 ["status", "-s"],
15 cwd=worktree
16 )
17 return worktree, status if success else "error"
18
19 # 並行執行所有檢查
20 tasks = [check_one(wt) for wt in worktrees]
21 results = await asyncio.gather(*tasks)
22
23 return dict(results)
24
25async def get_all_branches(worktrees: list[str]) -> dict[str, str]:
26 """
27 並行獲取所有 worktree 的當前分支
28
29 Args:
30 worktrees: worktree 路徑列表
31
32 Returns:
33 dict[str, str]: {路徑: 分支名} 映射
34 """
35 async def get_branch(worktree: str) -> tuple[str, str]:
36 success, branch = await async_run_git_command(
37 ["branch", "--show-current"],
38 cwd=worktree
39 )
40 return worktree, branch if success else "unknown"
41
42 tasks = [get_branch(wt) for wt in worktrees]
43 results = await asyncio.gather(*tasks)
44
45 return dict(results)完整程式碼
1#!/usr/bin/env python3
2"""
3非同步 Git 操作工具 - 完整範例
4
5展示如何用 asyncio 實現非阻塞的 Git 命令執行。
6"""
7
8import asyncio
9import os
10from typing import Optional
11
12# ===== 核心功能 =====
13
14async def async_run_git_command(
15 args: list[str],
16 cwd: Optional[str] = None,
17 timeout: float = 10.0
18) -> tuple[bool, str]:
19 """
20 非同步執行 git 命令
21
22 Args:
23 args: git 命令參數列表
24 cwd: 執行目錄
25 timeout: 超時時間(秒)
26
27 Returns:
28 (是否成功, 輸出或錯誤訊息)
29 """
30 try:
31 process = await asyncio.create_subprocess_exec(
32 "git", *args,
33 cwd=cwd,
34 stdout=asyncio.subprocess.PIPE,
35 stderr=asyncio.subprocess.PIPE
36 )
37
38 try:
39 stdout, stderr = await asyncio.wait_for(
40 process.communicate(),
41 timeout=timeout
42 )
43 except asyncio.TimeoutError:
44 process.kill()
45 await process.wait()
46 return False, f"Command timed out after {timeout}s"
47
48 if process.returncode == 0:
49 return True, stdout.decode().strip()
50 else:
51 return False, stderr.decode().strip()
52
53 except FileNotFoundError:
54 return False, "git command not found"
55 except Exception as e:
56 return False, str(e)
57
58# ===== 便利函式 =====
59
60async def async_get_current_branch() -> Optional[str]:
61 """獲取當前分支"""
62 success, output = await async_run_git_command(["branch", "--show-current"])
63 return output if success and output else None
64
65async def async_get_project_root() -> str:
66 """獲取專案根目錄"""
67 success, output = await async_run_git_command(["rev-parse", "--show-toplevel"])
68 return output if success else os.getcwd()
69
70async def async_get_worktree_list() -> list[dict]:
71 """獲取 worktree 列表"""
72 success, output = await async_run_git_command(
73 ["worktree", "list", "--porcelain"]
74 )
75 if not success:
76 return []
77
78 worktrees = []
79 current_worktree: dict = {}
80
81 for line in output.split("\n"):
82 if line.startswith("worktree "):
83 if current_worktree:
84 worktrees.append(current_worktree)
85 current_worktree = {"path": line[9:]}
86 elif line.startswith("branch "):
87 branch_ref = line[7:]
88 if branch_ref.startswith("refs/heads/"):
89 branch_ref = branch_ref[11:]
90 current_worktree["branch"] = branch_ref
91 elif line == "detached":
92 current_worktree["detached"] = True
93
94 if current_worktree:
95 worktrees.append(current_worktree)
96
97 return worktrees
98
99# ===== 並行操作 =====
100
101async def check_all_worktrees(worktrees: list[str]) -> dict[str, str]:
102 """並行檢查所有 worktree 狀態"""
103 async def check_one(path: str) -> tuple[str, str]:
104 success, status = await async_run_git_command(["status", "-s"], cwd=path)
105 return path, status if success else "error"
106
107 tasks = [check_one(wt) for wt in worktrees]
108 results = await asyncio.gather(*tasks)
109 return dict(results)
110
111async def get_all_branches(worktrees: list[str]) -> dict[str, str]:
112 """並行獲取所有 worktree 的分支"""
113 async def get_branch(path: str) -> tuple[str, str]:
114 success, branch = await async_run_git_command(
115 ["branch", "--show-current"],
116 cwd=path
117 )
118 return path, branch if success else "unknown"
119
120 tasks = [get_branch(wt) for wt in worktrees]
121 results = await asyncio.gather(*tasks)
122 return dict(results)
123
124async def batch_git_commands(
125 commands: list[tuple[list[str], Optional[str]]]
126) -> list[tuple[bool, str]]:
127 """
128 批次執行多個 Git 命令
129
130 Args:
131 commands: [(args, cwd), ...] 命令列表
132
133 Returns:
134 [(success, output), ...] 結果列表
135 """
136 tasks = [
137 async_run_git_command(args, cwd=cwd)
138 for args, cwd in commands
139 ]
140 return await asyncio.gather(*tasks)
141
142# ===== 同步/非同步橋接 =====
143
144def run_git_command(
145 args: list[str],
146 cwd: Optional[str] = None,
147 timeout: float = 10.0
148) -> tuple[bool, str]:
149 """
150 同步版本(相容舊 API)
151
152 在已有事件迴圈的環境中,這會建立新的迴圈執行。
153 """
154 return asyncio.run(async_run_git_command(args, cwd, timeout))
155
156def get_current_branch() -> Optional[str]:
157 """同步版本:獲取當前分支"""
158 return asyncio.run(async_get_current_branch())
159
160# ===== 測試 =====
161
162async def demo():
163 """示範非同步 Git 操作"""
164 print("=== 非同步 Git 操作示範 ===\n")
165
166 # 單一命令
167 print("1. 獲取當前分支:")
168 branch = await async_get_current_branch()
169 print(f" 分支: {branch}\n")
170
171 # 獲取專案根目錄
172 print("2. 獲取專案根目錄:")
173 root = await async_get_project_root()
174 print(f" 根目錄: {root}\n")
175
176 # 獲取 worktree 列表
177 print("3. 獲取 worktree 列表:")
178 worktrees = await async_get_worktree_list()
179 for wt in worktrees:
180 branch = wt.get("branch", "detached")
181 print(f" - {branch}: {wt['path']}")
182 print()
183
184 # 如果有多個 worktree,示範並行操作
185 if len(worktrees) > 1:
186 print("4. 並行檢查所有 worktree 狀態:")
187 paths = [wt["path"] for wt in worktrees]
188 statuses = await check_all_worktrees(paths)
189 for path, status in statuses.items():
190 print(f" - {path}:")
191 if status:
192 for line in status.split("\n"):
193 print(f" {line}")
194 else:
195 print(" (clean)")
196 print()
197
198 # 批次命令示範
199 print("5. 批次執行多個命令:")
200 commands = [
201 (["config", "user.name"], None),
202 (["config", "user.email"], None),
203 (["rev-parse", "--short", "HEAD"], None),
204 ]
205 results = await batch_git_commands(commands)
206 labels = ["使用者名稱", "使用者信箱", "當前 commit"]
207 for label, (success, output) in zip(labels, results):
208 print(f" {label}: {output if success else '(未設定)'}")
209
210if __name__ == "__main__":
211 asyncio.run(demo())使用範例
基本使用
1import asyncio
2
3async def main():
4 # 非同步獲取分支
5 branch = await async_get_current_branch()
6 print(f"當前分支: {branch}")
7
8 # 非同步獲取 worktree
9 worktrees = await async_get_worktree_list()
10 print(f"Worktree 數量: {len(worktrees)}")
11
12asyncio.run(main())並行操作
1async def check_multiple_repos(repos: list[str]):
2 """同時檢查多個 repo 的狀態"""
3 tasks = [
4 async_run_git_command(["status", "-s"], cwd=repo)
5 for repo in repos
6 ]
7
8 # 並行執行,等待全部完成
9 results = await asyncio.gather(*tasks)
10
11 for repo, (success, output) in zip(repos, results):
12 status = "clean" if not output else "dirty"
13 print(f"{repo}: {status}")
14
15# 10 個 repo,如果每個花 0.5 秒
16# 並行執行只需要約 0.5 秒!與 FastAPI 整合
1from fastapi import FastAPI
2
3app = FastAPI()
4
5@app.get("/git/branch")
6async def get_branch():
7 """非同步端點:獲取當前分支"""
8 branch = await async_get_current_branch()
9 return {"branch": branch}
10
11@app.get("/git/worktrees")
12async def get_worktrees():
13 """非同步端點:獲取 worktree 列表"""
14 worktrees = await async_get_worktree_list()
15 return {"worktrees": worktrees}效能比較
1import time
2import asyncio
3
4def sync_check_repos(repos: list[str]) -> float:
5 """同步版本:依序檢查"""
6 start = time.perf_counter()
7 for repo in repos:
8 run_git_command(["status", "-s"], cwd=repo)
9 return time.perf_counter() - start
10
11async def async_check_repos(repos: list[str]) -> float:
12 """非同步版本:並行檢查"""
13 start = time.perf_counter()
14 tasks = [
15 async_run_git_command(["status", "-s"], cwd=repo)
16 for repo in repos
17 ]
18 await asyncio.gather(*tasks)
19 return time.perf_counter() - start
20
21# 假設每個 git status 花 0.2 秒
22repos = [f"/path/to/repo{i}" for i in range(10)]
23
24sync_time = sync_check_repos(repos) # ~2.0 秒
25async_time = asyncio.run(async_check_repos(repos)) # ~0.2 秒
26
27print(f"同步: {sync_time:.2f}s")
28print(f"非同步: {async_time:.2f}s")
29print(f"加速: {sync_time / async_time:.1f}x")設計權衡
| 面向 | 同步 subprocess | 非同步 subprocess |
|---|---|---|
| 簡單性 | 簡單直覺 | 需要理解 async/await |
| 效能 | 依序執行 | 可並行執行 |
| 相容性 | 到處可用 | 需要事件迴圈 |
| 錯誤處理 | 直覺 | 需要處理非同步異常 |
| 測試 | 簡單 | 需要 pytest-asyncio |
| 記憶體 | 低 | 略高(維護多個進程) |
什麼時候該用非同步 subprocess?
適合使用:
- 需要同時執行多個外部命令
- 在非同步框架(FastAPI、aiohttp)中執行
- 命令執行時間較長,需要同時做其他事
不建議使用:
- 只需要執行單一命令
- 在同步程式碼中(需要額外的橋接)
- 命令執行非常快(< 10ms)
進階:使用 TaskGroup(Python 3.11+)
1async def check_repos_with_taskgroup(repos: list[str]) -> dict[str, str]:
2 """使用 TaskGroup 管理並行任務"""
3 results: dict[str, str] = {}
4
5 async with asyncio.TaskGroup() as tg:
6 async def check_and_store(repo: str):
7 success, output = await async_run_git_command(
8 ["status", "-s"],
9 cwd=repo
10 )
11 results[repo] = output if success else "error"
12
13 for repo in repos:
14 tg.create_task(check_and_store(repo))
15
16 return resultsTaskGroup 的優點:
- 更好的異常處理(一個失敗,全部取消)
- 結構化並行(明確的範圍)
練習
基礎練習
- 實作
async_is_clean_repo(path)函式,檢查 repo 是否乾淨 - 實作
async_get_commit_count(branch)函式,獲取分支的 commit 數量
進階練習
- 實作一個
GitRepo類別,封裝非同步 Git 操作 - 為非同步版本加入重試機制(失敗時自動重試)
挑戰題
- 實作一個監控工具:每 5 秒並行檢查多個 repo 的狀態,有變化時通知
延伸閱讀
上一章:案例研究索引 下一章:案例:並行 I/O 操作