案例:同步/非同步橋接
案例:同步/非同步橋接
本案例基於 .claude/lib 整體架構,展示如何用 run_in_executor 和 asyncio.run 在同步與非同步程式碼之間建立橋樑。
先備知識
問題背景
現有設計
.claude/lib 是一個同步設計的 Python 工具庫,包含多個模組:
1# .claude/lib/__init__.py
2"""
3Claude Hooks 共用程式庫
4
5模組結構:
6- git_utils: Git 操作工具(分支、worktree、專案根目錄)
7- config_loader: 配置檔案載入
8- hook_io: Hook 輸入輸出處理
9- hook_validator: Hook 合規性驗證
10- markdown_link_checker: Markdown 連結檢查
11"""
12
13from .git_utils import (
14 run_git_command,
15 get_current_branch,
16 get_project_root,
17 get_worktree_list,
18)
19
20from .config_loader import (
21 load_config,
22 load_agents_config,
23)
24
25from .hook_io import (
26 read_hook_input,
27 write_hook_output,
28)
這些函式都是同步的:
1# git_utils.py - 同步的 subprocess 呼叫
2def run_git_command(
3 args: list[str],
4 cwd: Optional[str] = None,
5 timeout: int = 10
6) -> tuple[bool, str]:
7 """Execute git command and return result"""
8 result = subprocess.run(
9 ["git"] + args,
10 cwd=cwd,
11 capture_output=True,
12 text=True,
13 timeout=timeout
14 )
15 if result.returncode == 0:
16 return True, result.stdout.strip()
17 return False, result.stderr.strip()
18
19# config_loader.py - 同步的檔案 I/O
20def load_config(config_name: str) -> dict:
21 """Load configuration file"""
22 config_dir = get_config_dir()
23 yaml_path = config_dir / f"{config_name}.yaml"
24
25 with open(yaml_path, "r", encoding="utf-8") as f:
26 return yaml.safe_load(f)
27
28# hook_validator.py - 同步的檔案驗證
29def validate_hook(hook_path: str) -> ValidationResult:
30 """Validate a single hook file"""
31 with open(hook_path, "r", encoding="utf-8") as f:
32 content = f.read()
33 # ... validation logic
這個設計的優點
- 簡單直覺:不需要了解 asyncio,任何 Python 開發者都能使用
- 向後相容:可以在任何 Python 環境中執行
- 測試容易:同步程式碼的測試更直觀
- 依賴少:不需要額外的非同步依賴庫
這個設計的限制
在非同步環境(如 FastAPI、aiohttp)中使用時:
問題 1:阻塞事件迴圈
1from fastapi import FastAPI
2from lib.git_utils import get_current_branch, get_worktree_list
3
4app = FastAPI()
5
6@app.get("/git/status")
7async def get_git_status():
8 # BAD: These synchronous calls block the event loop!
9 branch = get_current_branch() # Blocks ~50ms
10 worktrees = get_worktree_list() # Blocks ~50ms
11
12 # During these 100ms, NO other requests can be processed!
13 return {"branch": branch, "worktrees": worktrees}
問題 2:無法並行執行
1def validate_all_hooks(hooks_dir: str) -> list[ValidationResult]:
2 """Validate all hooks - sequential execution"""
3 results = []
4 for hook_file in Path(hooks_dir).glob("*.py"):
5 # Each validation runs one after another
6 result = validate_hook(str(hook_file)) # ~10ms each
7 results.append(result)
8 return results
9
10# 10 hooks = 100ms total
11# With parallelization, could be ~10ms!
問題 3:無法有效利用等待時間
1def check_project_health() -> dict:
2 """Check multiple aspects of project health"""
3 # All I/O operations execute sequentially
4 git_status = run_git_command(["status", "-s"]) # Wait...
5 config = load_config("agents") # Wait...
6 links = check_markdown_links("docs/README.md") # Wait...
7
8 # Total time = sum of all operations
9 return {
10 "git": git_status,
11 "config": config,
12 "links": links
13 }
進階解決方案
設計目標
- 在非同步程式碼中安全地呼叫同步函式(不阻塞事件迴圈)
- 在同步程式碼中使用非同步函式(保持向後相容)
- 避免巢狀事件迴圈問題
- 保持原有 API 不變
實作步驟
步驟 1:run_in_executor - 同步到非同步
run_in_executor 將同步函式放到執行緒池中執行,讓事件迴圈可以繼續處理其他任務。
1import asyncio
2from concurrent.futures import ThreadPoolExecutor
3from typing import TypeVar, Callable, Any
4
5T = TypeVar('T')
6
7# Create a shared thread pool for I/O operations
8_io_executor = ThreadPoolExecutor(max_workers=10, thread_name_prefix="async_io")
9
10async def run_sync_in_executor(
11 func: Callable[..., T],
12 *args: Any,
13 **kwargs: Any
14) -> T:
15 """
16 Run a synchronous function in a thread pool executor.
17
18 This prevents blocking the event loop when calling
19 synchronous I/O operations from async code.
20
21 Args:
22 func: The synchronous function to execute
23 *args: Positional arguments for the function
24 **kwargs: Keyword arguments for the function
25
26 Returns:
27 The result of the function call
28
29 Example:
30 # Instead of blocking:
31 result = sync_function(arg1, arg2)
32
33 # Use executor:
34 result = await run_sync_in_executor(sync_function, arg1, arg2)
35 """
36 loop = asyncio.get_running_loop()
37
38 # functools.partial handles kwargs
39 if kwargs:
40 import functools
41 func = functools.partial(func, **kwargs)
42
43 return await loop.run_in_executor(_io_executor, func, *args)
使用範例:包裝現有同步函式
1from lib.git_utils import get_current_branch, get_worktree_list, run_git_command
2from lib.config_loader import load_config
3from lib.hook_validator import validate_hook
4
5# Async wrappers for existing sync functions
6async def async_get_current_branch() -> Optional[str]:
7 """Async wrapper for get_current_branch"""
8 return await run_sync_in_executor(get_current_branch)
9
10async def async_get_worktree_list() -> list[dict]:
11 """Async wrapper for get_worktree_list"""
12 return await run_sync_in_executor(get_worktree_list)
13
14async def async_load_config(config_name: str) -> dict:
15 """Async wrapper for load_config"""
16 return await run_sync_in_executor(load_config, config_name)
17
18async def async_validate_hook(hook_path: str) -> ValidationResult:
19 """Async wrapper for validate_hook"""
20 return await run_sync_in_executor(validate_hook, hook_path)
步驟 2:asyncio.run - 非同步到同步
當你有非同步程式碼,但需要從同步環境呼叫時,使用 asyncio.run。
import asyncio
from typing import Any, Coroutine, TypeVar

T = TypeVar('T')


def run_async(coro: Coroutine[Any, Any, T]) -> T:
    """
    Run a coroutine to completion from synchronous code.

    Starts a fresh event loop through ``asyncio.run``, so this is only
    valid at a synchronous entry point (script, CLI command, sync test).

    Args:
        coro: The coroutine object to execute.

    Returns:
        The value returned by the coroutine.

    Raises:
        RuntimeError: If an event loop is already running in the calling
            thread. The coroutine is closed before raising so it does not
            linger as a "never awaited" coroutine.

    Example:
        # From a synchronous script:
        result = run_async(async_function(arg1, arg2))
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # get_running_loop raises RuntimeError when no loop is running.
        loop_running = False
    else:
        loop_running = True

    if loop_running:
        # asyncio.run would refuse to start here; release the coroutine
        # explicitly instead of leaving it un-awaited.
        coro.close()
        raise RuntimeError(
            "run_async() cannot be called from a running event loop; "
            "await the coroutine instead"
        )

    return asyncio.run(coro)
28
29# Synchronous API that uses async implementation internally
def get_all_worktree_branches() -> dict[str, str]:
    """
    Map every worktree path to the branch reported for it.

    The branch lookups are awaited together with asyncio.gather inside a
    private coroutine, while the function itself stays synchronous by
    handing that coroutine to run_async.

    Returns:
        A dict keyed by worktree path whose values are the results of
        _get_branch_for_path for that path.
    """
    async def _collect() -> dict[str, str]:
        entries = await async_get_worktree_list()
        paths = [entry["path"] for entry in entries]
        # One lookup coroutine per path; gather keeps results in path order.
        branches = await asyncio.gather(
            *(_get_branch_for_path(p) for p in paths)
        )
        return dict(zip(paths, branches))

    return run_async(_collect())
47
48async def _get_branch_for_path(path: str) -> str:
49 """Get current branch for a specific path"""
50 success, output = await run_sync_in_executor(
51 run_git_command,
52 ["branch", "--show-current"],
53 cwd=path
54 )
55 return output if success else "unknown"
步驟 3:處理已存在的事件迴圈
當你在已有事件迴圈的環境中(如 Jupyter Notebook、某些 Web 框架),直接呼叫 asyncio.run 會失敗:
1# This will raise RuntimeError in Jupyter or when loop is running
2asyncio.run(some_coroutine())
3# RuntimeError: asyncio.run() cannot be called from a running event loop
解決方案 A:使用 nest_asyncio(快速修復)
1# pip install nest-asyncio
2import nest_asyncio
3
4def run_async_safe(coro: Coroutine[Any, Any, T]) -> T:
5 """
6 Run async code safely, even from a running event loop.
7
8 Uses nest_asyncio to allow nested event loops.
9 This is a pragmatic solution for environments like Jupyter.
10
11 Note:
12 nest_asyncio patches the event loop globally.
13 Use with caution in production code.
14 """
15 try:
16 loop = asyncio.get_running_loop()
17 except RuntimeError:
18 # No running loop - safe to use asyncio.run
19 return asyncio.run(coro)
20
21 # Running loop exists - need to nest
22 nest_asyncio.apply()
23 return loop.run_until_complete(coro)
解決方案 B:偵測執行環境(推薦)
1import asyncio
2from typing import TypeVar, Coroutine, Any
3
4T = TypeVar('T')
5
def is_event_loop_running() -> bool:
    """Report whether the calling thread currently has a running event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # get_running_loop raises RuntimeError when no loop is running.
        return False
    return True
13
14def run_async_adaptive(coro: Coroutine[Any, Any, T]) -> T:
15 """
16 Run async code with automatic environment detection.
17
18 - If no event loop: uses asyncio.run()
19 - If loop running: uses run_in_executor to run in a new thread
20
21 This is safer than nest_asyncio for production use.
22 """
23 if is_event_loop_running():
24 # We're in an async context - run in a new thread
25 import concurrent.futures
26 with concurrent.futures.ThreadPoolExecutor() as executor:
27 future = executor.submit(asyncio.run, coro)
28 return future.result()
29 else:
30 # Safe to run directly
31 return asyncio.run(coro)
解決方案 C:提供雙重 API(最佳實踐)
1class GitUtils:
2 """
3 Git utilities with both sync and async APIs.
4
5 Usage:
6 # Synchronous (traditional)
7 utils = GitUtils()
8 branch = utils.get_current_branch()
9
10 # Asynchronous
11 branch = await utils.async_get_current_branch()
12 """
13
14 def __init__(self, cwd: Optional[str] = None):
15 self.cwd = cwd
16
17 # ===== Synchronous API (original) =====
18
19 def get_current_branch(self) -> Optional[str]:
20 """Get current branch (sync)"""
21 success, output = run_git_command(
22 ["branch", "--show-current"],
23 cwd=self.cwd
24 )
25 return output if success else None
26
27 def get_worktree_list(self) -> list[dict]:
28 """Get worktree list (sync)"""
29 # ... original implementation
30 pass
31
32 # ===== Asynchronous API =====
33
34 async def async_get_current_branch(self) -> Optional[str]:
35 """Get current branch (async)"""
36 return await run_sync_in_executor(self.get_current_branch)
37
38 async def async_get_worktree_list(self) -> list[dict]:
39 """Get worktree list (async)"""
40 return await run_sync_in_executor(self.get_worktree_list)
步驟 4:建立統一的 API
整合以上模式,建立一個統一的適配器:
1"""
2Sync/Async Bridge Adapter
3
4Provides unified access to .claude/lib modules from both
5synchronous and asynchronous contexts.
6"""
7
8import asyncio
9import functools
10from concurrent.futures import ThreadPoolExecutor
11from typing import TypeVar, Callable, Any, Optional, Coroutine
12
13T = TypeVar('T')
14
15# Shared executor for I/O operations
16_executor = ThreadPoolExecutor(max_workers=10, thread_name_prefix="lib_async")
17
18class AsyncAdapter:
19 """
20 Adapter for converting sync functions to async.
21
22 Provides both decorator and wrapper patterns for flexibility.
23
24 Example:
25 adapter = AsyncAdapter()
26
27 # As decorator
28 @adapter.make_async
29 def sync_function(x):
30 return x * 2
31
32 result = await sync_function(5)
33
34 # As wrapper
35 result = await adapter.run(other_sync_function, arg1, arg2)
36 """
37
38 def __init__(self, executor: Optional[ThreadPoolExecutor] = None):
39 self.executor = executor or _executor
40
41 async def run(
42 self,
43 func: Callable[..., T],
44 *args: Any,
45 **kwargs: Any
46 ) -> T:
47 """Run a sync function asynchronously"""
48 loop = asyncio.get_running_loop()
49
50 if kwargs:
51 func = functools.partial(func, **kwargs)
52
53 return await loop.run_in_executor(self.executor, func, *args)
54
55 def make_async(self, func: Callable[..., T]) -> Callable[..., Coroutine[Any, Any, T]]:
56 """
57 Decorator to create async version of sync function.
58
59 Example:
60 @adapter.make_async
61 def slow_io_operation(path: str) -> str:
62 with open(path) as f:
63 return f.read()
64
65 # Now can be awaited
66 content = await slow_io_operation("file.txt")
67 """
68 @functools.wraps(func)
69 async def wrapper(*args: Any, **kwargs: Any) -> T:
70 return await self.run(func, *args, **kwargs)
71 return wrapper
72
73class SyncAdapter:
74 """
75 Adapter for calling async functions from sync code.
76
77 Example:
78 adapter = SyncAdapter()
79
80 # Run async function synchronously
81 result = adapter.run(async_function(arg1, arg2))
82 """
83
84 @staticmethod
85 def run(coro: Coroutine[Any, Any, T]) -> T:
86 """
87 Run a coroutine from synchronous code.
88
89 Automatically handles the case when an event loop
90 is already running.
91 """
92 try:
93 asyncio.get_running_loop()
94 # Loop is running - use thread
95 import concurrent.futures
96 with concurrent.futures.ThreadPoolExecutor(max_workers=1) as ex:
97 future = ex.submit(asyncio.run, coro)
98 return future.result()
99 except RuntimeError:
100 # No running loop - safe to use asyncio.run
101 return asyncio.run(coro)
102
103 @staticmethod
104 def make_sync(
105 async_func: Callable[..., Coroutine[Any, Any, T]]
106 ) -> Callable[..., T]:
107 """
108 Decorator to create sync version of async function.
109
110 Example:
111 @SyncAdapter.make_sync
112 async def async_fetch(url: str) -> str:
113 async with aiohttp.get(url) as resp:
114 return await resp.text()
115
116 # Now can be called synchronously
117 content = async_fetch("https://example.com")
118 """
119 @functools.wraps(async_func)
120 def wrapper(*args: Any, **kwargs: Any) -> T:
121 coro = async_func(*args, **kwargs)
122 return SyncAdapter.run(coro)
123 return wrapper
完整程式碼
1#!/usr/bin/env python3
2"""
3Sync/Async Bridge for .claude/lib
4
5This module provides async wrappers for the synchronous
6.claude/lib modules, enabling their use in async contexts
7like FastAPI without blocking the event loop.
8
9Usage:
10 # In async code (FastAPI, aiohttp, etc.)
11 from lib_async import (
12 async_get_current_branch,
13 async_load_config,
14 async_validate_hooks,
15 )
16
17 branch = await async_get_current_branch()
18
19 # In sync code that needs parallelization
20 from lib_async import parallel_validate_hooks
21
22 results = parallel_validate_hooks(hooks_dir)
23"""
24
25import asyncio
26import functools
27from concurrent.futures import ThreadPoolExecutor
28from pathlib import Path
29from typing import TypeVar, Callable, Any, Optional, Coroutine
30
31# Import original sync modules
32from lib.git_utils import (
33 run_git_command,
34 get_current_branch,
35 get_project_root,
36 get_worktree_list,
37 is_protected_branch,
38 is_allowed_branch,
39)
40from lib.config_loader import (
41 load_config,
42 load_agents_config,
43 load_quality_rules,
44)
45from lib.hook_io import (
46 read_hook_input,
47 write_hook_output,
48)
49from lib.hook_validator import (
50 validate_hook,
51 validate_all_hooks,
52 ValidationResult,
53)
54from lib.markdown_link_checker import (
55 check_markdown_links,
56 check_directory as check_markdown_directory,
57 LinkCheckResult,
58)
59
60T = TypeVar('T')
61
62# ===== Shared Resources =====
63
64# Thread pool for I/O-bound sync operations
65_io_executor = ThreadPoolExecutor(
66 max_workers=10,
67 thread_name_prefix="lib_async_io"
68)
69
70# ===== Core Utilities =====
71
72async def run_in_executor(
73 func: Callable[..., T],
74 *args: Any,
75 **kwargs: Any
76) -> T:
77 """
78 Run a synchronous function in the thread pool executor.
79
80 Args:
81 func: Synchronous function to execute
82 *args: Positional arguments
83 **kwargs: Keyword arguments
84
85 Returns:
86 Function result
87 """
88 loop = asyncio.get_running_loop()
89
90 if kwargs:
91 func = functools.partial(func, **kwargs)
92
93 return await loop.run_in_executor(_io_executor, func, *args)
94
95def make_async(func: Callable[..., T]) -> Callable[..., Coroutine[Any, Any, T]]:
96 """
97 Decorator to create async version of a sync function.
98
99 Example:
100 @make_async
101 def slow_operation(x):
102 time.sleep(1)
103 return x * 2
104
105 result = await slow_operation(5)
106 """
107 @functools.wraps(func)
108 async def wrapper(*args: Any, **kwargs: Any) -> T:
109 return await run_in_executor(func, *args, **kwargs)
110 return wrapper
111
112# ===== Async Git Utils =====
113
114async def async_run_git_command(
115 args: list[str],
116 cwd: Optional[str] = None,
117 timeout: int = 10
118) -> tuple[bool, str]:
119 """Async version of run_git_command"""
120 return await run_in_executor(run_git_command, args, cwd=cwd, timeout=timeout)
121
122async def async_get_current_branch() -> Optional[str]:
123 """Async version of get_current_branch"""
124 return await run_in_executor(get_current_branch)
125
126async def async_get_project_root() -> str:
127 """Async version of get_project_root"""
128 return await run_in_executor(get_project_root)
129
130async def async_get_worktree_list() -> list[dict]:
131 """Async version of get_worktree_list"""
132 return await run_in_executor(get_worktree_list)
133
134# ===== Async Config Loader =====
135
136async def async_load_config(config_name: str) -> dict:
137 """Async version of load_config"""
138 return await run_in_executor(load_config, config_name)
139
140async def async_load_agents_config() -> dict:
141 """Async version of load_agents_config"""
142 return await run_in_executor(load_agents_config)
143
144async def async_load_quality_rules() -> dict:
145 """Async version of load_quality_rules"""
146 return await run_in_executor(load_quality_rules)
147
148# ===== Async Hook Validator =====
149
150async def async_validate_hook(hook_path: str) -> ValidationResult:
151 """Async version of validate_hook"""
152 return await run_in_executor(validate_hook, hook_path)
153
async def async_validate_all_hooks(
    hooks_dir: Optional[str] = None
) -> list[ValidationResult]:
    """
    Validate every hook file in a directory concurrently.

    Args:
        hooks_dir: Directory to scan. When None, ".claude/hooks" under the
            path returned by async_get_project_root() is used.

    Returns:
        One result per "*.py" file whose name does not start with "_",
        ordered by sorted file path. An empty list when the directory
        does not exist.
    """
    if hooks_dir is not None:
        root = Path(hooks_dir)
    else:
        root = Path(await async_get_project_root()) / ".claude" / "hooks"

    if not root.is_dir():
        return []

    # Build one validation coroutine per eligible file, skipping
    # underscore-prefixed modules.
    pending = []
    for candidate in sorted(root.glob("*.py")):
        if candidate.name.startswith("_"):
            continue
        pending.append(async_validate_hook(str(candidate)))

    # gather runs the validations concurrently and keeps input order.
    return await asyncio.gather(*pending)
179
180# ===== Async Markdown Link Checker =====
181
182async def async_check_markdown_links(file_path: str) -> LinkCheckResult:
183 """Async version of check_markdown_links"""
184 return await run_in_executor(check_markdown_links, file_path)
185
186async def async_check_markdown_directory(
187 dir_path: str,
188 recursive: bool = True
189) -> list[LinkCheckResult]:
190 """
191 Check all markdown files in directory in parallel.
192 """
193 # Get file list synchronously (fast operation)
194 dir_path_obj = Path(dir_path)
195 if not dir_path_obj.is_dir():
196 return []
197
198 if recursive:
199 md_files = list(dir_path_obj.rglob("*.md"))
200 else:
201 md_files = list(dir_path_obj.glob("*.md"))
202
203 # Check all files in parallel
204 tasks = [async_check_markdown_links(str(f)) for f in md_files]
205 return await asyncio.gather(*tasks)
206
207# ===== Parallel Operations =====
208
209async def async_check_all_worktrees() -> dict[str, dict]:
210 """
211 Check status of all worktrees in parallel.
212
213 Returns:
214 dict: {path: {"branch": str, "status": str, "is_clean": bool}}
215 """
216 worktrees = await async_get_worktree_list()
217
218 async def check_one(wt: dict) -> tuple[str, dict]:
219 path = wt["path"]
220
221 # Run git status and branch in parallel for each worktree
222 status_task = async_run_git_command(["status", "-s"], cwd=path)
223 branch_task = async_run_git_command(["branch", "--show-current"], cwd=path)
224
225 (status_ok, status), (branch_ok, branch) = await asyncio.gather(
226 status_task, branch_task
227 )
228
229 return path, {
230 "branch": branch if branch_ok else wt.get("branch", "unknown"),
231 "status": status if status_ok else "error",
232 "is_clean": status_ok and not status,
233 }
234
235 tasks = [check_one(wt) for wt in worktrees]
236 results = await asyncio.gather(*tasks)
237
238 return dict(results)
239
async def async_project_health_check() -> dict:
    """
    Comprehensive project health check with parallel execution.

    Checks:
    - Current git branch and per-worktree status
    - Agents configuration loading
    - Hook compliance

    A failing check does not abort the others: its slot in the report is
    replaced by a fallback value (the error text for the branch, an empty
    dict for worktrees, zero counts for config and hooks).

    Returns:
        dict: {"git": {"branch", "worktrees"},
               "config": {"loaded", "agents_count"},
               "hooks": {"total", "compliant"}}
    """
    # Run all checks in parallel
    branch, worktrees, config, hook_results = await asyncio.gather(
        async_get_current_branch(),
        async_check_all_worktrees(),
        async_load_agents_config(),
        async_validate_all_hooks(),
        return_exceptions=True,  # Don't fail if one check fails
    )

    def failed(outcome: Any) -> bool:
        # gather can hand back CancelledError, which derives from
        # BaseException rather than Exception, so test the base class.
        return isinstance(outcome, BaseException)

    return {
        "git": {
            "branch": str(branch) if failed(branch) else branch,
            "worktrees": {} if failed(worktrees) else worktrees,
        },
        "config": {
            "loaded": not failed(config),
            "agents_count": (
                0 if failed(config) else len(config.get("known_agents", []))
            ),
        },
        "hooks": {
            "total": 0 if failed(hook_results) else len(hook_results),
            "compliant": (
                0 if failed(hook_results)
                else sum(1 for r in hook_results if r.is_compliant)
            ),
        },
    }
278
279# ===== Sync Wrappers for Async Functions =====
280
281def parallel_validate_hooks(hooks_dir: Optional[str] = None) -> list[ValidationResult]:
282 """
283 Synchronous API that uses async parallelization internally.
284
285 Use this when you want parallelization but are in sync context.
286 """
287 return asyncio.run(async_validate_all_hooks(hooks_dir))
288
289def parallel_check_worktrees() -> dict[str, dict]:
290 """
291 Synchronous API for parallel worktree checking.
292 """
293 return asyncio.run(async_check_all_worktrees())
294
295def project_health_check() -> dict:
296 """
297 Synchronous API for comprehensive health check.
298 """
299 return asyncio.run(async_project_health_check())
300
301# ===== Demo =====
302
303async def demo():
304 """Demonstrate the sync/async bridge capabilities"""
305 import time
306
307 print("=" * 60)
308 print("Sync/Async Bridge Demo")
309 print("=" * 60)
310
311 # 1. Basic async wrapper usage
312 print("\n1. Basic async wrapper:")
313 branch = await async_get_current_branch()
314 print(f" Current branch: {branch}")
315
316 # 2. Parallel execution comparison
317 print("\n2. Parallel vs Sequential comparison:")
318
319 # Sequential (simulated)
320 start = time.perf_counter()
321 worktrees = await async_get_worktree_list()
322 seq_time = 0
323 for wt in worktrees:
324 _ = await async_run_git_command(["status", "-s"], cwd=wt["path"])
325 seq_time += time.perf_counter() - start
326 start = time.perf_counter()
327 print(f" Sequential check: {seq_time:.3f}s")
328
329 # Parallel
330 start = time.perf_counter()
331 results = await async_check_all_worktrees()
332 par_time = time.perf_counter() - start
333 print(f" Parallel check: {par_time:.3f}s")
334 print(f" Speedup: {seq_time/par_time:.1f}x")
335
336 # 3. Project health check
337 print("\n3. Project health check (parallel):")
338 start = time.perf_counter()
339 health = await async_project_health_check()
340 elapsed = time.perf_counter() - start
341
342 print(f" Branch: {health['git']['branch']}")
343 print(f" Worktrees: {len(health['git']['worktrees'])}")
344 print(f" Hooks compliant: {health['hooks']['compliant']}/{health['hooks']['total']}")
345 print(f" Time: {elapsed:.3f}s")
346
347 print("\n" + "=" * 60)
348
349if __name__ == "__main__":
350 asyncio.run(demo())
使用範例
在 FastAPI 中使用同步函式
1from fastapi import FastAPI, HTTPException
2from lib_async import (
3 async_get_current_branch,
4 async_get_worktree_list,
5 async_check_all_worktrees,
6 async_validate_all_hooks,
7 async_project_health_check,
8)
9
10app = FastAPI()
11
12@app.get("/git/branch")
13async def get_branch():
14 """
15 Get current git branch.
16
17 Uses async wrapper to prevent blocking the event loop.
18 """
19 branch = await async_get_current_branch()
20 if branch is None:
21 raise HTTPException(status_code=500, detail="Not a git repository")
22 return {"branch": branch}
23
24@app.get("/git/worktrees")
25async def get_worktrees():
26 """
27 Get all worktrees with their status.
28
29 Checks all worktrees in parallel for fast response.
30 """
31 worktrees = await async_check_all_worktrees()
32 return {"worktrees": worktrees}
33
34@app.get("/health")
35async def health_check():
36 """
37 Comprehensive project health check.
38
39 Runs multiple checks in parallel:
40 - Git status
41 - Configuration
42 - Hook compliance
43 """
44 health = await async_project_health_check()
45 return health
46
47@app.get("/hooks/validate")
48async def validate_hooks():
49 """
50 Validate all hooks in parallel.
51
52 Much faster than sequential validation for many hooks.
53 """
54 results = await async_validate_all_hooks()
55
56 return {
57 "total": len(results),
58 "compliant": sum(1 for r in results if r.is_compliant),
59 "issues": [
60 {
61 "hook": r.hook_path,
62 "issues": [
63 {"level": i.level, "message": i.message}
64 for i in r.issues
65 ]
66 }
67 for r in results
68 if not r.is_compliant
69 ]
70 }
在同步腳本中使用非同步函式
1#!/usr/bin/env python3
2"""
3Synchronous script that leverages async parallelization internally.
4"""
5
6from lib_async import (
7 parallel_validate_hooks,
8 parallel_check_worktrees,
9 project_health_check,
10)
11
12def main():
13 print("Project Health Report")
14 print("=" * 50)
15
16 # These functions use asyncio internally for parallelization
17 # but provide a synchronous API
18
19 # 1. Check all worktrees in parallel
20 print("\n1. Worktree Status:")
21 worktrees = parallel_check_worktrees()
22 for path, info in worktrees.items():
23 status = "clean" if info["is_clean"] else "dirty"
24 print(f" [{info['branch']}] {path}: {status}")
25
26 # 2. Validate all hooks in parallel
27 print("\n2. Hook Validation:")
28 results = parallel_validate_hooks()
29 compliant = sum(1 for r in results if r.is_compliant)
30 print(f" Compliant: {compliant}/{len(results)}")
31
32 for result in results:
33 if not result.is_compliant:
34 print(f" - {result.hook_path}:")
35 for issue in result.issues:
36 print(f" [{issue.level}] {issue.message}")
37
38 # 3. Comprehensive health check
39 print("\n3. Overall Health:")
40 health = project_health_check()
41 print(f" Git branch: {health['git']['branch']}")
42 print(f" Config loaded: {health['config']['loaded']}")
43 print(f" Hooks OK: {health['hooks']['compliant']}/{health['hooks']['total']}")
44
45if __name__ == "__main__":
46 main()
Python 3.9+ 使用 asyncio.to_thread
Python 3.9 引入了 asyncio.to_thread,提供更簡潔的語法:
1import asyncio
2from lib.git_utils import get_current_branch, run_git_command
3from lib.config_loader import load_config
4
5# Python 3.9+ simplified syntax
6async def async_get_current_branch_39() -> Optional[str]:
7 """Using asyncio.to_thread (Python 3.9+)"""
8 return await asyncio.to_thread(get_current_branch)
9
10async def async_load_config_39(config_name: str) -> dict:
11 """Using asyncio.to_thread (Python 3.9+)"""
12 return await asyncio.to_thread(load_config, config_name)
13
async def async_run_git_command_39(
    args: list[str],
    cwd: Optional[str] = None,
    timeout: int = 10
) -> tuple[bool, str]:
    """
    Async version of run_git_command using asyncio.to_thread (Python 3.9+).

    Args:
        args: Arguments passed through to run_git_command.
        cwd: Working directory forwarded to run_git_command.
        timeout: Timeout forwarded to run_git_command.

    Returns:
        The (success, output) tuple produced by run_git_command.
    """
    # to_thread forwards keyword arguments to the target itself, so no
    # functools.partial is needed (unlike loop.run_in_executor).
    return await asyncio.to_thread(
        run_git_command, args, cwd=cwd, timeout=timeout
    )
21
22# Comparison: run_in_executor vs to_thread
23async def comparison_demo():
24 """
25 asyncio.to_thread vs run_in_executor
26
27 to_thread advantages:
28 - Simpler syntax
29 - Better default executor management
30 - Direct kwargs support
31
32 run_in_executor advantages:
33 - Works on Python 3.7+
34 - Can use custom executors
35 - More control over thread pool
36 """
37 # run_in_executor (Python 3.7+)
38 loop = asyncio.get_running_loop()
39 result1 = await loop.run_in_executor(None, get_current_branch)
40
41 # to_thread (Python 3.9+)
42 result2 = await asyncio.to_thread(get_current_branch)
43
44 assert result1 == result2
設計權衡
| 面向 | run_in_executor | asyncio.run | asyncio.to_thread |
|---|---|---|---|
| 方向 | 同步 -> 非同步 | 非同步 -> 同步 | 同步 -> 非同步 |
| 執行緒 | 使用執行緒池 | 建立新事件迴圈 | 使用預設執行緒池 |
| 適用場景 | 非同步環境呼叫同步 I/O | 同步入口點執行非同步 | 非同步環境呼叫同步 I/O |
| 限制 | 需要事件迴圈存在 | 不能巢狀呼叫 | 需要 Python 3.9+ |
| 效能 | 高(可重用執行緒) | 中(建立新迴圈開銷) | 高(重用預設執行緒池) |
| 複雜度 | 中 | 低 | 低 |
ThreadPoolExecutor vs ProcessPoolExecutor
| 面向 | ThreadPoolExecutor | ProcessPoolExecutor |
|---|---|---|
| 適用場景 | I/O 密集操作 | CPU 密集操作 |
| 記憶體 | 共享記憶體 | 獨立記憶體空間 |
| GIL | 受 GIL 限制 | 繞過 GIL |
| 啟動成本 | 低 | 高(進程建立) |
| 資料傳遞 | 直接傳遞 | 需要序列化 |
1from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
2
3# I/O-bound: use ThreadPoolExecutor
4io_executor = ThreadPoolExecutor(max_workers=10)
5
6# CPU-bound: use ProcessPoolExecutor
7cpu_executor = ProcessPoolExecutor(max_workers=4)
8
9async def io_bound_task():
10 """File I/O, network calls, subprocess"""
11 loop = asyncio.get_running_loop()
12 return await loop.run_in_executor(io_executor, sync_io_function)
13
14async def cpu_bound_task():
15 """Heavy computation"""
16 loop = asyncio.get_running_loop()
17 return await loop.run_in_executor(cpu_executor, sync_cpu_function)
什麼時候該用這個技術?
適合使用
- 漸進式遷移到 asyncio:有大量同步程式碼,需要逐步遷移
- 在 FastAPI 中使用同步第三方庫:如 requests、boto3、資料庫驅動
- 提供同步/非同步雙 API:讓使用者選擇適合的模式
- 並行化現有同步操作:如批次檔案處理、多 API 呼叫
- 整合傳統程式碼:舊系統的同步函式需要在新非同步系統中使用
不建議使用
- 全新專案:直接用原生 asyncio 設計
- CPU 密集操作:應改用 multiprocessing 或 ProcessPoolExecutor
- 簡單的單一操作:不需要並行的情況
- 效能極度敏感:執行緒池有微小開銷
- 已有原生非同步替代方案:如用 aiohttp 替代 requests
練習
基礎練習
- 用 run_in_executor 包裝 requests.get
1import requests
2import asyncio
3
4# TODO: Implement this function
5async def async_fetch(url: str) -> str:
6 """
7 Fetch URL content asynchronously using requests library.
8
9 Hint: Use run_in_executor to wrap requests.get
10 """
11 pass
12
13# Test your implementation
14async def test():
15 content = await async_fetch("https://httpbin.org/get")
16 print(content[:200])
17
18asyncio.run(test())
參考解答
1import requests
2import asyncio
3
4async def async_fetch(url: str) -> str:
5 """Fetch URL content asynchronously using requests"""
6 loop = asyncio.get_running_loop()
7
8 def fetch():
9 response = requests.get(url, timeout=10)
10 response.raise_for_status()
11 return response.text
12
13 return await loop.run_in_executor(None, fetch)
14
15# Or using to_thread (Python 3.9+)
16async def async_fetch_39(url: str) -> str:
17 def fetch():
18 response = requests.get(url, timeout=10)
19 response.raise_for_status()
20 return response.text
21
22 return await asyncio.to_thread(fetch)
進階練習
- 建立支援同步/非同步雙模式的 API 客戶端
1# TODO: Implement a dual-mode API client
2class WeatherClient:
3 """
4 Weather API client supporting both sync and async modes.
5
6 Usage:
7 client = WeatherClient(api_key="xxx")
8
9 # Sync mode
10 weather = client.get_weather("Tokyo")
11
12 # Async mode
13 weather = await client.async_get_weather("Tokyo")
14 """
15
16 def __init__(self, api_key: str):
17 self.api_key = api_key
18 self.base_url = "https://api.weatherapi.com/v1"
19
20 def get_weather(self, city: str) -> dict:
21 """Synchronous weather fetch"""
22 # TODO: Implement
23 pass
24
25 async def async_get_weather(self, city: str) -> dict:
26 """Asynchronous weather fetch"""
27 # TODO: Implement
28 pass
29
30 async def async_get_multiple(self, cities: list[str]) -> dict[str, dict]:
31 """Fetch weather for multiple cities in parallel"""
32 # TODO: Implement
33 pass
參考解答
1import requests
2import asyncio
3from typing import Optional
4
5class WeatherClient:
6 def __init__(self, api_key: str):
7 self.api_key = api_key
8 self.base_url = "https://api.weatherapi.com/v1"
9
10 def get_weather(self, city: str) -> dict:
11 """Synchronous weather fetch"""
12 url = f"{self.base_url}/current.json"
13 response = requests.get(
14 url,
15 params={"key": self.api_key, "q": city},
16 timeout=10
17 )
18 response.raise_for_status()
19 return response.json()
20
21 async def async_get_weather(self, city: str) -> dict:
22 """Asynchronous weather fetch using run_in_executor"""
23 loop = asyncio.get_running_loop()
24 return await loop.run_in_executor(None, self.get_weather, city)
25
26 async def async_get_multiple(self, cities: list[str]) -> dict[str, dict]:
27 """Fetch weather for multiple cities in parallel"""
28 tasks = [self.async_get_weather(city) for city in cities]
29 results = await asyncio.gather(*tasks, return_exceptions=True)
30
31 return {
32 city: result if not isinstance(result, Exception) else {"error": str(result)}
33 for city, result in zip(cities, results)
34 }
挑戰題
- 實作自動偵測執行環境的適配器
1# TODO: Implement an adaptive function caller
2class AdaptiveCaller:
3 """
4 Automatically detects the execution context and calls
5 functions appropriately.
6
7 - In async context: awaits coroutines, wraps sync functions
8 - In sync context: runs async functions, calls sync directly
9
10 Usage:
11 caller = AdaptiveCaller()
12
13 # Works in both sync and async contexts!
14 result = caller.call(some_function, arg1, arg2)
15 """
16
17 def call(self, func, *args, **kwargs):
18 """
19 Call a function adaptively based on context.
20
21 - If func is async and we're in sync: run with asyncio.run
22 - If func is sync and we're in async: use run_in_executor
23 - Otherwise: call directly
24 """
25 # TODO: Implement
26 pass
參考解答
1import asyncio
2import inspect
3from typing import Any, Callable
4from concurrent.futures import ThreadPoolExecutor
5
class AdaptiveCaller:
    """
    Call sync or async functions appropriately for the current context.

    - ``call`` from sync code returns the final result directly.
    - ``call`` from code running inside an event loop returns an awaitable
      that the caller must await.
    - ``call_async`` is the explicit coroutine form for async callers.
    """

    def __init__(self):
        # Worker threads used to run sync functions off the event loop.
        self._executor = ThreadPoolExecutor(max_workers=10)

    def _is_async_context(self) -> bool:
        """Return True when the calling thread has a running event loop."""
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return False
        return True

    def call(self, func: Callable, *args, **kwargs) -> Any:
        """
        Call ``func`` adaptively based on the execution context.

        Returns:
            In a sync context, the function's result (async functions are
            driven with ``asyncio.run``). In an async context, an awaitable
            producing the result.
        """
        # inspect.iscoroutinefunction is the non-deprecated check.
        is_async_func = inspect.iscoroutinefunction(func)
        in_async_context = self._is_async_context()

        if is_async_func and in_async_context:
            # Hand the coroutine back to be awaited by the caller
            return func(*args, **kwargs)
        if is_async_func:
            # Run async function in a new event loop
            return asyncio.run(func(*args, **kwargs))
        if in_async_context:
            # Wrap sync function so it runs on the executor when awaited
            return self._wrap_sync(func, *args, **kwargs)
        # Sync function in sync context: call directly
        return func(*args, **kwargs)

    async def _wrap_sync(self, func: Callable, *args, **kwargs) -> Any:
        """Run a sync function on the executor and await its result."""
        import functools
        loop = asyncio.get_running_loop()

        # run_in_executor accepts no keyword arguments for the target, so
        # bind everything up front; this gives one path for both cases.
        bound = functools.partial(func, *args, **kwargs)
        return await loop.run_in_executor(self._executor, bound)

    async def call_async(self, func: Callable, *args, **kwargs) -> Any:
        """Await ``func`` directly if it is async, else run it on the executor."""
        if inspect.iscoroutinefunction(func):
            return await func(*args, **kwargs)
        return await self._wrap_sync(func, *args, **kwargs)
57
58# Usage example
59caller = AdaptiveCaller()
60
61def sync_func(x):
62 return x * 2
63
64async def async_func(x):
65 await asyncio.sleep(0.1)
66 return x * 3
67
68# In sync context
69result1 = caller.call(sync_func, 5) # Direct call
70result2 = caller.call(async_func, 5) # Uses asyncio.run
71
72# In async context
73async def demo():
74 result3 = await caller.call_async(sync_func, 5) # Uses executor
75 result4 = await caller.call_async(async_func, 5) # Direct await
延伸閱讀
上一章:並行 I/O 操作 | 返回:模組一:非同步程式設計
#python #python-advanced #asyncio #case-study #design-patterns