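This case study builds on the overall architecture of .claude/lib and shows how to use run_in_executor and asyncio.run to bridge synchronous and asynchronous code.

Prerequisites

Problem Background

Existing Design

.claude/lib is a Python utility library with a synchronous design. It contains several modules: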

    # .claude/lib/__init__.py
    """
    Shared library for Claude Hooks

    Module structure:
    - git_utils: Git helpers (branches, worktrees, project root)
    - config_loader: configuration file loading
    - hook_io: hook input/output handling
    - hook_validator: hook compliance validation
    - markdown_link_checker: Markdown link checking
    """

    from .git_utils import (
        run_git_command,
        get_current_branch,
        get_project_root,
        get_worktree_list,
    )

    from .config_loader import (
        load_config,
        load_agents_config,
    )

    from .hook_io import (
        read_hook_input,
        write_hook_output,
    )

All of these functions are synchronous:

    # git_utils.py - synchronous subprocess call
    import subprocess
    from typing import Optional

    def run_git_command(
        args: list[str],
        cwd: Optional[str] = None,
        timeout: int = 10
    ) -> tuple[bool, str]:
        """Execute git command and return result"""
        result = subprocess.run(
            ["git"] + args,
            cwd=cwd,
            capture_output=True,
            text=True,
            timeout=timeout
        )
        if result.returncode == 0:
            return True, result.stdout.strip()
        return False, result.stderr.strip()

    # config_loader.py - synchronous file I/O
    import yaml

    def load_config(config_name: str) -> dict:
        """Load configuration file"""
        config_dir = get_config_dir()
        yaml_path = config_dir / f"{config_name}.yaml"

        with open(yaml_path, "r", encoding="utf-8") as f:
            return yaml.safe_load(f)

    # hook_validator.py - synchronous file validation
    def validate_hook(hook_path: str) -> ValidationResult:
        """Validate a single hook file"""
        with open(hook_path, "r", encoding="utf-8") as f:
            content = f.read()
        # ... validation logic

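Advantages of this design

  1. Simple and intuitive: no asyncio knowledge is required, so any Python developer can use it
  2. Backward compatible: it runs in any Python environment
  3. Easy to test: tests for synchronous code are more straightforward
  4. Few dependencies: no extra async libraries are needed

Limitations of this design

When the library is used in an asynchronous environment (such as FastAPI or aiohttp):

Problem 1: Blocking the event loop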

    from fastapi import FastAPI
    from lib.git_utils import get_current_branch, get_worktree_list

    app = FastAPI()

    @app.get("/git/status")
    async def get_git_status():
        # BAD: These synchronous calls block the event loop!
        branch = get_current_branch()      # Blocks ~50ms
        worktrees = get_worktree_list()    # Blocks ~50ms

        # During these ~100ms, this event loop cannot process any other request
        return {"branch": branch, "worktrees": worktrees}
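The blocking effect can be observed without FastAPI. The following is a minimal sketch, assuming a hypothetical `blocking_call` that merely sleeps in place of a real git call: a background "heartbeat" task counts how often the event loop gets a chance to run while the call is in progress.

```python
import asyncio
import time

def blocking_call() -> str:
    """Hypothetical stand-in for a synchronous git call; it only sleeps."""
    time.sleep(0.2)
    return "main"

async def heartbeat(ticks: list) -> None:
    """Append a timestamp every 20 ms whenever the loop is free to run us."""
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.02)

async def count_ticks(offload: bool) -> int:
    ticks: list = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # yield once so the heartbeat can start
    if offload:
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, blocking_call)
    else:
        blocking_call()  # runs on the loop thread, so the heartbeat is starved
    task.cancel()
    return len(ticks)

blocked_ticks = asyncio.run(count_ticks(offload=False))
offloaded_ticks = asyncio.run(count_ticks(offload=True))
print(f"ticks while blocked: {blocked_ticks}, ticks while offloaded: {offloaded_ticks}")
```

When the call runs directly on the loop thread, the heartbeat only records its initial tick. When the call is offloaded, the heartbeat keeps ticking, which is the behavior the later steps aim for.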

Problem 2: No parallel execution

    from pathlib import Path

    def validate_all_hooks(hooks_dir: str) -> list[ValidationResult]:
        """Validate all hooks - sequential execution"""
        results = []
        for hook_file in Path(hooks_dir).glob("*.py"):
            # Each validation runs one after another
            result = validate_hook(str(hook_file))  # ~10ms each
            results.append(result)
        return results

    # 10 hooks = ~100ms total
    # Run concurrently, this could approach ~10ms

Problem 3: Waiting time cannot be used effectively

    def check_project_health() -> dict:
        """Check multiple aspects of project health"""
        # All I/O operations execute sequentially
        git_status = run_git_command(["status", "-s"])      # Wait...
        config = load_config("agents")                       # Wait...
        links = check_markdown_links("docs/README.md")       # Wait...

        # Total time = sum of all operations
        return {
            "git": git_status,
            "config": config,
            "links": links
        }
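To make the "sum of all operations" cost concrete, here is a small sketch that compares sequential calls with the same calls overlapped on the default thread pool. `fake_io` is a hypothetical placeholder for run_git_command, load_config, and check_markdown_links, and the 0.1 s delays are illustrative only.

```python
import asyncio
import time

def fake_io(name: str, seconds: float) -> str:
    """Hypothetical stand-in for a blocking I/O helper; it only sleeps."""
    time.sleep(seconds)
    return f"{name}: ok"

CHECKS = ("git", "config", "links")

def check_sequential() -> list:
    # Each call waits for the previous one to finish.
    return [fake_io(name, 0.1) for name in CHECKS]

async def check_concurrent() -> list:
    loop = asyncio.get_running_loop()
    # Each call runs in its own worker thread, so the waits overlap.
    futures = [loop.run_in_executor(None, fake_io, name, 0.1) for name in CHECKS]
    return await asyncio.gather(*futures)

start = time.perf_counter()
seq = check_sequential()
seq_elapsed = time.perf_counter() - start

start = time.perf_counter()
conc = asyncio.run(check_concurrent())
conc_elapsed = time.perf_counter() - start

print(f"sequential: {seq_elapsed:.2f}s, concurrent: {conc_elapsed:.2f}s")
```

Both variants return the same results in the same order, because `asyncio.gather` preserves the order of its arguments. Only the elapsed time differs.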

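Advanced Solution

Design goals

  1. Call synchronous functions safely from asynchronous code (without blocking the event loop)
  2. Use asynchronous functions from synchronous code (preserving backward compatibility)
  3. Avoid nested event loop problems
  4. Keep the existing API unchanged

Implementation steps

Step 1: run_in_executor - sync to async

run_in_executor runs a synchronous function in a thread pool so that the event loop can keep processing other tasks.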

    import asyncio
    import functools
    from concurrent.futures import ThreadPoolExecutor
    from typing import TypeVar, Callable, Any

    T = TypeVar('T')

    # Create a shared thread pool for I/O operations
    _io_executor = ThreadPoolExecutor(max_workers=10, thread_name_prefix="async_io")

    async def run_sync_in_executor(
        func: Callable[..., T],
        *args: Any,
        **kwargs: Any
    ) -> T:
        """
        Run a synchronous function in a thread pool executor.

        This prevents blocking the event loop when calling
        synchronous I/O operations from async code.

        Args:
            func: The synchronous function to execute
            *args: Positional arguments for the function
            **kwargs: Keyword arguments for the function

        Returns:
            The result of the function call

        Example:
            # Instead of blocking:
            result = sync_function(arg1, arg2)

            # Use executor:
            result = await run_sync_in_executor(sync_function, arg1, arg2)
        """
        loop = asyncio.get_running_loop()

        # loop.run_in_executor only forwards positional arguments,
        # so bind keyword arguments with functools.partial first
        if kwargs:
            func = functools.partial(func, **kwargs)

        return await loop.run_in_executor(_io_executor, func, *args)
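The reason for the `functools.partial` step is that `loop.run_in_executor(executor, func, *args)` accepts positional arguments only. The sketch below illustrates this with a hypothetical `greet` function: passing a keyword argument directly raises `TypeError`, while binding it with `partial` works.

```python
import asyncio
import functools

def greet(name: str, punctuation: str = "!") -> str:
    """Hypothetical synchronous function with a keyword argument."""
    return f"hello {name}{punctuation}"

async def main() -> tuple:
    loop = asyncio.get_running_loop()

    # Keyword arguments are rejected by run_in_executor itself.
    try:
        loop.run_in_executor(None, greet, "lib", punctuation="?")
        kwargs_error = None
    except TypeError as exc:
        kwargs_error = type(exc).__name__

    # Binding the keyword argument first makes the call positional-only.
    bound = functools.partial(greet, punctuation="?")
    result = await loop.run_in_executor(None, bound, "lib")
    return kwargs_error, result

kwargs_error, result = asyncio.run(main())
print(kwargs_error, result)
```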

Usage example: wrapping existing synchronous functions

    from typing import Optional

    from lib.git_utils import get_current_branch, get_worktree_list, run_git_command
    from lib.config_loader import load_config
    from lib.hook_validator import validate_hook, ValidationResult

    # Async wrappers for existing sync functions
    async def async_get_current_branch() -> Optional[str]:
        """Async wrapper for get_current_branch"""
        return await run_sync_in_executor(get_current_branch)

    async def async_get_worktree_list() -> list[dict]:
        """Async wrapper for get_worktree_list"""
        return await run_sync_in_executor(get_worktree_list)

    async def async_load_config(config_name: str) -> dict:
        """Async wrapper for load_config"""
        return await run_sync_in_executor(load_config, config_name)

    async def async_validate_hook(hook_path: str) -> ValidationResult:
        """Async wrapper for validate_hook"""
        return await run_sync_in_executor(validate_hook, hook_path)

Step 2: asyncio.run - async to sync

When you have asynchronous code that must be called from a synchronous context, use asyncio.run:

    import asyncio
    from typing import Any, Coroutine, TypeVar

    T = TypeVar('T')

    def run_async(coro: Coroutine[Any, Any, T]) -> T:
        """
        Run an async function from synchronous code.

        Creates a new event loop, runs the coroutine to completion,
        then closes the loop. Safe to call from synchronous entry points.

        Args:
            coro: The coroutine to execute

        Returns:
            The result of the coroutine

        Example:
            # From a synchronous script:
            result = run_async(async_function(arg1, arg2))

        Warning:
            Cannot be called when an event loop is already running!
            Use nest_asyncio or redesign your code structure.
        """
        return asyncio.run(coro)

    # Synchronous API that uses async implementation internally
    def get_all_worktree_branches() -> dict[str, str]:
        """
        Get branches for all worktrees.

        Uses async implementation internally for parallelization,
        but provides a synchronous API for compatibility.
        """
        async def _async_impl():
            worktrees = await async_get_worktree_list()
            paths = [wt["path"] for wt in worktrees]
            tasks = [_get_branch_for_path(path) for path in paths]
            results = await asyncio.gather(*tasks)
            return dict(zip(paths, results))

        return run_async(_async_impl())

    async def _get_branch_for_path(path: str) -> str:
        """Get current branch for a specific path"""
        success, output = await run_sync_in_executor(
            run_git_command,
            ["branch", "--show-current"],
            cwd=path
        )
        return output if success else "unknown"
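The same "synchronous facade over an async implementation" shape can be tried without a git repository. This is a minimal sketch under the assumption that `slow_lookup` (a hypothetical function that sleeps and upper-cases its input) stands in for a per-worktree git call.

```python
import asyncio
import time

def slow_lookup(key: str) -> str:
    """Hypothetical blocking call, standing in for a per-worktree git command."""
    time.sleep(0.05)
    return key.upper()

async def _lookup_all(keys: list) -> dict:
    loop = asyncio.get_running_loop()
    # Fan out one executor job per key and wait for all of them.
    results = await asyncio.gather(
        *(loop.run_in_executor(None, slow_lookup, key) for key in keys)
    )
    return dict(zip(keys, results))

def lookup_all(keys: list) -> dict:
    """Synchronous facade: callers never see asyncio."""
    return asyncio.run(_lookup_all(keys))

branches = lookup_all(["main", "dev"])
print(branches)
```

Callers of `lookup_all` get an ordinary dict back. The trade-off is that every call pays for creating and closing an event loop, and the facade cannot be called from code that is already running inside a loop.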

Step 3: Handling an already-running event loop

In an environment where an event loop is already running (such as Jupyter Notebook or some web frameworks), calling asyncio.run directly fails:

    # This will raise RuntimeError in Jupyter or when a loop is running
    asyncio.run(some_coroutine())
    # RuntimeError: asyncio.run() cannot be called from a running event loop
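The failure is easy to reproduce in a plain script by calling `asyncio.run` from inside a coroutine. The sketch below uses two hypothetical coroutines, `inner` and `outer`, and captures the error message instead of letting it propagate.

```python
import asyncio

async def inner() -> int:
    return 42

async def outer() -> str:
    coro = inner()
    try:
        # We are already inside the loop started by the outer asyncio.run call.
        asyncio.run(coro)
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return str(exc)
    return "no error"

message = asyncio.run(outer())
print(message)
```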
Solution A: use nest_asyncio (quick fix)

    # pip install nest-asyncio
    import asyncio
    from typing import Any, Coroutine, TypeVar

    import nest_asyncio

    T = TypeVar('T')

    def run_async_safe(coro: Coroutine[Any, Any, T]) -> T:
        """
        Run async code safely, even from a running event loop.

        Uses nest_asyncio to allow nested event loops.
        This is a pragmatic solution for environments like Jupyter.

        Note:
            nest_asyncio patches the event loop globally.
            Use with caution in production code.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop - safe to use asyncio.run
            return asyncio.run(coro)

        # Running loop exists - need to nest
        nest_asyncio.apply()
        return loop.run_until_complete(coro)

Solution B: detect the execution environment (recommended)

    import asyncio
    import concurrent.futures
    from typing import TypeVar, Coroutine, Any

    T = TypeVar('T')

    def is_event_loop_running() -> bool:
        """Check if there's a running event loop"""
        try:
            asyncio.get_running_loop()
            return True
        except RuntimeError:
            return False

    def run_async_adaptive(coro: Coroutine[Any, Any, T]) -> T:
        """
        Run async code with automatic environment detection.

        - If no event loop: uses asyncio.run()
        - If loop running: runs asyncio.run() in a separate worker thread

        This avoids the global patching done by nest_asyncio. Note that
        the calling thread (and so the running loop) still blocks until
        the worker thread returns the result.
        """
        if is_event_loop_running():
            # We're in an async context - run in a new thread with its own loop
            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
                future = executor.submit(asyncio.run, coro)
                return future.result()
        else:
            # Safe to run directly
            return asyncio.run(coro)
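To see what the thread-based approach does at runtime, the following sketch calls a bridging helper from inside a running loop and reports which thread executed the coroutine. The names `run_in_fresh_loop`, `where_am_i`, and the `bridge` thread prefix are hypothetical and chosen only for this illustration.

```python
import asyncio
import concurrent.futures
import threading

async def where_am_i() -> str:
    """Report the name of the thread whose event loop runs this coroutine."""
    await asyncio.sleep(0.01)
    return threading.current_thread().name

def run_in_fresh_loop(coro):
    """Run the coroutine with asyncio.run in a dedicated worker thread."""
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=1, thread_name_prefix="bridge"
    ) as pool:
        # .result() blocks the caller until the worker thread finishes.
        return pool.submit(asyncio.run, coro).result()

async def handler() -> tuple:
    loop_thread = threading.current_thread().name
    # Synchronous call from inside a running loop: it works, but this
    # loop cannot serve anything else until the call returns.
    worker_thread = run_in_fresh_loop(where_am_i())
    return loop_thread, worker_thread

loop_thread, worker_thread = asyncio.run(handler())
print(loop_thread, worker_thread)
```

The coroutine runs on a second event loop in the worker thread, so objects bound to the original loop (tasks, locks, connections) should not be shared with it.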
Solution C: provide a dual API (best practice)

    from typing import Optional

    class GitUtils:
        """
        Git utilities with both sync and async APIs.

        Usage:
            # Synchronous (traditional)
            utils = GitUtils()
            branch = utils.get_current_branch()

            # Asynchronous
            branch = await utils.async_get_current_branch()
        """

        def __init__(self, cwd: Optional[str] = None):
            self.cwd = cwd

        # ===== Synchronous API (original) =====

        def get_current_branch(self) -> Optional[str]:
            """Get current branch (sync)"""
            success, output = run_git_command(
                ["branch", "--show-current"],
                cwd=self.cwd
            )
            return output if success else None

        def get_worktree_list(self) -> list[dict]:
            """Get worktree list (sync)"""
            # ... original implementation
            pass

        # ===== Asynchronous API =====

        async def async_get_current_branch(self) -> Optional[str]:
            """Get current branch (async)"""
            return await run_sync_in_executor(self.get_current_branch)

        async def async_get_worktree_list(self) -> list[dict]:
            """Get worktree list (async)"""
            return await run_sync_in_executor(self.get_worktree_list)
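The dual-API pattern keeps a single synchronous implementation and adds thin async wrappers around it. Here is a self-contained sketch of that idea using a hypothetical `TextStore` class that reads files from a temporary directory, so it can be run without git.

```python
import asyncio
import tempfile
from pathlib import Path

class TextStore:
    """Hypothetical dual-API class: one sync implementation, one async wrapper."""

    def __init__(self, root: str):
        self.root = Path(root)

    def read(self, name: str) -> str:
        """Synchronous API: blocking file read."""
        return (self.root / name).read_text(encoding="utf-8")

    async def async_read(self, name: str) -> str:
        """Asynchronous API: same logic, executed in the default thread pool."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.read, name)

with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "note.txt").write_text("hello", encoding="utf-8")
    store = TextStore(tmp)
    sync_text = store.read("note.txt")
    async_text = asyncio.run(store.async_read("note.txt"))

print(sync_text, async_text)
```

Because the async method delegates to the sync one, the two cannot drift apart in behavior, and existing synchronous callers are unaffected.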

Step 4: Build a unified API

Combining the patterns above, build a unified adapter:

    """
    Sync/Async Bridge Adapter

    Provides unified access to .claude/lib modules from both
    synchronous and asynchronous contexts.
    """

    import asyncio
    import functools
    from concurrent.futures import ThreadPoolExecutor
    from typing import TypeVar, Callable, Any, Optional, Coroutine

    T = TypeVar('T')

    # Shared executor for I/O operations
    _executor = ThreadPoolExecutor(max_workers=10, thread_name_prefix="lib_async")

    class AsyncAdapter:
        """
        Adapter for converting sync functions to async.

        Provides both decorator and wrapper patterns for flexibility.

        Example:
            adapter = AsyncAdapter()

            # As decorator
            @adapter.make_async
            def sync_function(x):
                return x * 2

            result = await sync_function(5)

            # As wrapper
            result = await adapter.run(other_sync_function, arg1, arg2)
        """

        def __init__(self, executor: Optional[ThreadPoolExecutor] = None):
            self.executor = executor or _executor

        async def run(
            self,
            func: Callable[..., T],
            *args: Any,
            **kwargs: Any
        ) -> T:
            """Run a sync function asynchronously"""
            loop = asyncio.get_running_loop()

            if kwargs:
                func = functools.partial(func, **kwargs)

            return await loop.run_in_executor(self.executor, func, *args)

        def make_async(self, func: Callable[..., T]) -> Callable[..., Coroutine[Any, Any, T]]:
            """
            Decorator to create async version of sync function.

            Example:
                @adapter.make_async
                def slow_io_operation(path: str) -> str:
                    with open(path) as f:
                        return f.read()

                # Now can be awaited
                content = await slow_io_operation("file.txt")
            """
            @functools.wraps(func)
            async def wrapper(*args: Any, **kwargs: Any) -> T:
                return await self.run(func, *args, **kwargs)
            return wrapper

    class SyncAdapter:
        """
        Adapter for calling async functions from sync code.

        Example:
            adapter = SyncAdapter()

            # Run async function synchronously
            result = adapter.run(async_function(arg1, arg2))
        """

        @staticmethod
        def run(coro: Coroutine[Any, Any, T]) -> T:
            """
            Run a coroutine from synchronous code.

            Automatically handles the case when an event loop
            is already running.
            """
            # Keep the try block narrow: a RuntimeError raised by the
            # coroutine itself must not trigger a second asyncio.run(coro)
            try:
                asyncio.get_running_loop()
            except RuntimeError:
                # No running loop - safe to use asyncio.run
                return asyncio.run(coro)

            # Loop is running - use a worker thread with its own loop
            # (this blocks the calling thread until the result is ready)
            with ThreadPoolExecutor(max_workers=1) as ex:
                future = ex.submit(asyncio.run, coro)
                return future.result()

        @staticmethod
        def make_sync(
            async_func: Callable[..., Coroutine[Any, Any, T]]
        ) -> Callable[..., T]:
            """
            Decorator to create sync version of async function.

            Example:
                @SyncAdapter.make_sync
                async def async_fetch(url: str) -> str:
                    async with aiohttp.ClientSession() as session:
                        async with session.get(url) as resp:
                            return await resp.text()

                # Now can be called synchronously
                content = async_fetch("https://example.com")
            """
            @functools.wraps(async_func)
            def wrapper(*args: Any, **kwargs: Any) -> T:
                coro = async_func(*args, **kwargs)
                return SyncAdapter.run(coro)
            return wrapper
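The two decorator directions can be exercised together in a few lines. The sketch below defines standalone `make_async` and `make_sync` functions (simplified, hypothetical counterparts of the adapter methods, using the default executor and plain `asyncio.run`) and round-trips a value through both.

```python
import asyncio
import functools
import inspect

def make_async(func):
    """Simplified sync-to-async decorator using the default thread pool."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        call = functools.partial(func, *args, **kwargs)
        return await loop.run_in_executor(None, call)
    return wrapper

def make_sync(async_func):
    """Simplified async-to-sync decorator; assumes no loop is already running."""
    @functools.wraps(async_func)
    def wrapper(*args, **kwargs):
        return asyncio.run(async_func(*args, **kwargs))
    return wrapper

@make_async
def double(x: int) -> int:
    return x * 2

@make_sync
async def double_twice(x: int) -> int:
    # Awaits the wrapped sync function twice from async code.
    return await double(await double(x))

value = double_twice(5)
print(value, double.__name__, inspect.iscoroutinefunction(double))
```

`functools.wraps` keeps the original name and docstring on the wrapper, which helps with logging and debugging once many functions are wrapped.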

Complete code

    #!/usr/bin/env python3
    """
    Sync/Async Bridge for .claude/lib

    This module provides async wrappers for the synchronous
    .claude/lib modules, enabling their use in async contexts
    like FastAPI without blocking the event loop.

    Usage:
        # In async code (FastAPI, aiohttp, etc.)
        from lib_async import (
            async_get_current_branch,
            async_load_config,
            async_validate_all_hooks,
        )

        branch = await async_get_current_branch()

        # In sync code that needs parallelization
        from lib_async import parallel_validate_hooks

        results = parallel_validate_hooks(hooks_dir)
    """

    import asyncio
    import functools
    from concurrent.futures import ThreadPoolExecutor
    from pathlib import Path
    from typing import TypeVar, Callable, Any, Optional, Coroutine

    # Import original sync modules
    from lib.git_utils import (
        run_git_command,
        get_current_branch,
        get_project_root,
        get_worktree_list,
        is_protected_branch,
        is_allowed_branch,
    )
    from lib.config_loader import (
        load_config,
        load_agents_config,
        load_quality_rules,
    )
    from lib.hook_io import (
        read_hook_input,
        write_hook_output,
    )
    from lib.hook_validator import (
        validate_hook,
        validate_all_hooks,
        ValidationResult,
    )
    from lib.markdown_link_checker import (
        check_markdown_links,
        check_directory as check_markdown_directory,
        LinkCheckResult,
    )

    T = TypeVar('T')

    # ===== Shared Resources =====

    # Thread pool for I/O-bound sync operations
    _io_executor = ThreadPoolExecutor(
        max_workers=10,
        thread_name_prefix="lib_async_io"
    )

    # ===== Core Utilities =====

    async def run_in_executor(
        func: Callable[..., T],
        *args: Any,
        **kwargs: Any
    ) -> T:
        """
        Run a synchronous function in the thread pool executor.

        Args:
            func: Synchronous function to execute
            *args: Positional arguments
            **kwargs: Keyword arguments

        Returns:
            Function result
        """
        loop = asyncio.get_running_loop()

        if kwargs:
            func = functools.partial(func, **kwargs)

        return await loop.run_in_executor(_io_executor, func, *args)

    def make_async(func: Callable[..., T]) -> Callable[..., Coroutine[Any, Any, T]]:
        """
        Decorator to create async version of a sync function.

        Example:
            @make_async
            def slow_operation(x):
                time.sleep(1)
                return x * 2

            result = await slow_operation(5)
        """
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> T:
            return await run_in_executor(func, *args, **kwargs)
        return wrapper

    # ===== Async Git Utils =====

    async def async_run_git_command(
        args: list[str],
        cwd: Optional[str] = None,
        timeout: int = 10
    ) -> tuple[bool, str]:
        """Async version of run_git_command"""
        return await run_in_executor(run_git_command, args, cwd=cwd, timeout=timeout)

    async def async_get_current_branch() -> Optional[str]:
        """Async version of get_current_branch"""
        return await run_in_executor(get_current_branch)

    async def async_get_project_root() -> str:
        """Async version of get_project_root"""
        return await run_in_executor(get_project_root)

    async def async_get_worktree_list() -> list[dict]:
        """Async version of get_worktree_list"""
        return await run_in_executor(get_worktree_list)

    # ===== Async Config Loader =====

    async def async_load_config(config_name: str) -> dict:
        """Async version of load_config"""
        return await run_in_executor(load_config, config_name)

    async def async_load_agents_config() -> dict:
        """Async version of load_agents_config"""
        return await run_in_executor(load_agents_config)

    async def async_load_quality_rules() -> dict:
        """Async version of load_quality_rules"""
        return await run_in_executor(load_quality_rules)

    # ===== Async Hook Validator =====

    async def async_validate_hook(hook_path: str) -> ValidationResult:
        """Async version of validate_hook"""
        return await run_in_executor(validate_hook, hook_path)

    async def async_validate_all_hooks(
        hooks_dir: Optional[str] = None
    ) -> list[ValidationResult]:
        """
        Validate all hooks in parallel.

        Unlike the sync version that validates sequentially,
        this version validates all hooks concurrently.
        """
        if hooks_dir is None:
            hooks_dir = str(Path(await async_get_project_root()) / ".claude" / "hooks")

        hooks_path = Path(hooks_dir)
        if not hooks_path.is_dir():
            return []

        # Collect all hook files
        hook_files = [
            str(f) for f in sorted(hooks_path.glob("*.py"))
            if not f.name.startswith("_")
        ]

        # Validate all in parallel
        tasks = [async_validate_hook(hook) for hook in hook_files]
        return await asyncio.gather(*tasks)

    # ===== Async Markdown Link Checker =====

    async def async_check_markdown_links(file_path: str) -> LinkCheckResult:
        """Async version of check_markdown_links"""
        return await run_in_executor(check_markdown_links, file_path)

    async def async_check_markdown_directory(
        dir_path: str,
        recursive: bool = True
    ) -> list[LinkCheckResult]:
        """
        Check all markdown files in directory in parallel.
        """
        # Get file list synchronously (fast operation)
        dir_path_obj = Path(dir_path)
        if not dir_path_obj.is_dir():
            return []

        if recursive:
            md_files = list(dir_path_obj.rglob("*.md"))
        else:
            md_files = list(dir_path_obj.glob("*.md"))

        # Check all files in parallel
        tasks = [async_check_markdown_links(str(f)) for f in md_files]
        return await asyncio.gather(*tasks)

    # ===== Parallel Operations =====

    async def async_check_all_worktrees() -> dict[str, dict]:
        """
        Check status of all worktrees in parallel.

        Returns:
            dict: {path: {"branch": str, "status": str, "is_clean": bool}}
        """
        worktrees = await async_get_worktree_list()

        async def check_one(wt: dict) -> tuple[str, dict]:
            path = wt["path"]

            # Run git status and branch in parallel for each worktree
            status_task = async_run_git_command(["status", "-s"], cwd=path)
            branch_task = async_run_git_command(["branch", "--show-current"], cwd=path)

            (status_ok, status), (branch_ok, branch) = await asyncio.gather(
                status_task, branch_task
            )

            return path, {
                "branch": branch if branch_ok else wt.get("branch", "unknown"),
                "status": status if status_ok else "error",
                "is_clean": status_ok and not status,
            }

        tasks = [check_one(wt) for wt in worktrees]
        results = await asyncio.gather(*tasks)

        return dict(results)

    async def async_project_health_check() -> dict:
        """
        Comprehensive project health check with parallel execution.

        Checks:
        - Git status
        - Configuration validity
        - Hook compliance
        - Documentation links

        Returns:
            dict: Health check results
        """
        # Run all checks in parallel
        git_task = async_get_current_branch()
        worktrees_task = async_check_all_worktrees()
        config_task = async_load_agents_config()
        hooks_task = async_validate_all_hooks()

        branch, worktrees, config, hook_results = await asyncio.gather(
            git_task, worktrees_task, config_task, hooks_task,
            return_exceptions=True  # Don't fail if one check fails
        )

        return {
            "git": {
                "branch": branch if not isinstance(branch, Exception) else str(branch),
                "worktrees": worktrees if not isinstance(worktrees, Exception) else {},
            },
            "config": {
                "loaded": not isinstance(config, Exception),
                "agents_count": len(config.get("known_agents", [])) if not isinstance(config, Exception) else 0,
            },
            "hooks": {
                "total": len(hook_results) if not isinstance(hook_results, Exception) else 0,
                "compliant": sum(1 for r in hook_results if r.is_compliant) if not isinstance(hook_results, Exception) else 0,
            }
        }

    # ===== Sync Wrappers for Async Functions =====

    def parallel_validate_hooks(hooks_dir: Optional[str] = None) -> list[ValidationResult]:
        """
        Synchronous API that uses async parallelization internally.

        Use this when you want parallelization but are in sync context.
        """
        return asyncio.run(async_validate_all_hooks(hooks_dir))

    def parallel_check_worktrees() -> dict[str, dict]:
        """
        Synchronous API for parallel worktree checking.
        """
        return asyncio.run(async_check_all_worktrees())

    def project_health_check() -> dict:
        """
        Synchronous API for comprehensive health check.
        """
        return asyncio.run(async_project_health_check())

    # ===== Demo =====

    async def demo():
        """Demonstrate the sync/async bridge capabilities"""
        import time

        print("=" * 60)
        print("Sync/Async Bridge Demo")
        print("=" * 60)

        # 1. Basic async wrapper usage
        print("\n1. Basic async wrapper:")
        branch = await async_get_current_branch()
        print(f"   Current branch: {branch}")

        # 2. Parallel execution comparison
        print("\n2. Parallel vs Sequential comparison:")

        # Sequential: await each status check before starting the next.
        # The worktree list is fetched before the timer starts so that
        # only the status checks are measured.
        worktrees = await async_get_worktree_list()
        start = time.perf_counter()
        for wt in worktrees:
            await async_run_git_command(["status", "-s"], cwd=wt["path"])
        seq_time = time.perf_counter() - start
        print(f"   Sequential check: {seq_time:.3f}s")

        # Parallel
        start = time.perf_counter()
        results = await async_check_all_worktrees()
        par_time = time.perf_counter() - start
        print(f"   Parallel check: {par_time:.3f}s")
        if par_time > 0:
            print(f"   Speedup: {seq_time/par_time:.1f}x")

        # 3. Project health check
        print("\n3. Project health check (parallel):")
        start = time.perf_counter()
        health = await async_project_health_check()
        elapsed = time.perf_counter() - start

        print(f"   Branch: {health['git']['branch']}")
        print(f"   Worktrees: {len(health['git']['worktrees'])}")
        print(f"   Hooks compliant: {health['hooks']['compliant']}/{health['hooks']['total']}")
        print(f"   Time: {elapsed:.3f}s")

        print("\n" + "=" * 60)

    if __name__ == "__main__":
        asyncio.run(demo())
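The health check relies on `asyncio.gather(..., return_exceptions=True)`, which returns exception objects in place of results instead of raising. The following sketch shows that behavior in isolation. The coroutines `ok` and `boom` and the error text are hypothetical stand-ins for the real checks.

```python
import asyncio

async def ok(value):
    """Hypothetical check that succeeds."""
    await asyncio.sleep(0)
    return value

async def boom():
    """Hypothetical check that fails."""
    await asyncio.sleep(0)
    raise ValueError("config missing")

async def run_checks() -> list:
    # With return_exceptions=True a failing check does not abort the others;
    # its exception object appears at the matching position in the result.
    return await asyncio.gather(ok("main"), boom(), ok(3), return_exceptions=True)

branch, config, hooks = asyncio.run(run_checks())
report = {
    "branch": branch,
    "config_loaded": not isinstance(config, Exception),
    "config_error": str(config) if isinstance(config, Exception) else None,
    "hooks": hooks,
}
print(report)
```

Each result has to be checked with `isinstance(..., Exception)` before use, which is why the health check guards every field individually.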

Usage examples

Using synchronous functions in FastAPI

    from fastapi import FastAPI, HTTPException
    from lib_async import (
        async_get_current_branch,
        async_get_worktree_list,
        async_check_all_worktrees,
        async_validate_all_hooks,
        async_project_health_check,
    )

    app = FastAPI()

    @app.get("/git/branch")
    async def get_branch():
        """
        Get current git branch.

        Uses async wrapper to prevent blocking the event loop.
        """
        branch = await async_get_current_branch()
        if branch is None:
            raise HTTPException(status_code=500, detail="Not a git repository")
        return {"branch": branch}

    @app.get("/git/worktrees")
    async def get_worktrees():
        """
        Get all worktrees with their status.

        Checks all worktrees in parallel for fast response.
        """
        worktrees = await async_check_all_worktrees()
        return {"worktrees": worktrees}

    @app.get("/health")
    async def health_check():
        """
        Comprehensive project health check.

        Runs multiple checks in parallel:
        - Git status
        - Configuration
        - Hook compliance
        """
        health = await async_project_health_check()
        return health

    @app.get("/hooks/validate")
    async def validate_hooks():
        """
        Validate all hooks in parallel.

        Much faster than sequential validation for many hooks.
        """
        results = await async_validate_all_hooks()

        return {
            "total": len(results),
            "compliant": sum(1 for r in results if r.is_compliant),
            "issues": [
                {
                    "hook": r.hook_path,
                    "issues": [
                        {"level": i.level, "message": i.message}
                        for i in r.issues
                    ]
                }
                for r in results
                if not r.is_compliant
            ]
        }

Using asynchronous functions in a synchronous script

    #!/usr/bin/env python3
    """
    Synchronous script that leverages async parallelization internally.
    """

    from lib_async import (
        parallel_validate_hooks,
        parallel_check_worktrees,
        project_health_check,
    )

    def main():
        print("Project Health Report")
        print("=" * 50)

        # These functions use asyncio internally for parallelization
        # but provide a synchronous API

        # 1. Check all worktrees in parallel
        print("\n1. Worktree Status:")
        worktrees = parallel_check_worktrees()
        for path, info in worktrees.items():
            status = "clean" if info["is_clean"] else "dirty"
            print(f"   [{info['branch']}] {path}: {status}")

        # 2. Validate all hooks in parallel
        print("\n2. Hook Validation:")
        results = parallel_validate_hooks()
        compliant = sum(1 for r in results if r.is_compliant)
        print(f"   Compliant: {compliant}/{len(results)}")

        for result in results:
            if not result.is_compliant:
                print(f"   - {result.hook_path}:")
                for issue in result.issues:
                    print(f"     [{issue.level}] {issue.message}")

        # 3. Comprehensive health check
        print("\n3. Overall Health:")
        health = project_health_check()
        print(f"   Git branch: {health['git']['branch']}")
        print(f"   Config loaded: {health['config']['loaded']}")
        print(f"   Hooks OK: {health['hooks']['compliant']}/{health['hooks']['total']}")

    if __name__ == "__main__":
        main()

Python 3.9+: using asyncio.to_thread

Python 3.9 introduced asyncio.to_thread, which offers a more concise syntax:

    import asyncio
    from typing import Optional

    from lib.git_utils import get_current_branch, run_git_command
    from lib.config_loader import load_config

    # Python 3.9+ simplified syntax
    async def async_get_current_branch_39() -> Optional[str]:
        """Using asyncio.to_thread (Python 3.9+)"""
        return await asyncio.to_thread(get_current_branch)

    async def async_load_config_39(config_name: str) -> dict:
        """Using asyncio.to_thread (Python 3.9+)"""
        return await asyncio.to_thread(load_config, config_name)

    async def async_run_git_command_39(
        args: list[str],
        cwd: Optional[str] = None
    ) -> tuple[bool, str]:
        """Using asyncio.to_thread with keyword arguments"""
        # to_thread forwards keyword arguments directly to the function
        return await asyncio.to_thread(run_git_command, args, cwd=cwd)

    # Comparison: run_in_executor vs to_thread
    async def comparison_demo():
        """
        asyncio.to_thread vs run_in_executor

        to_thread advantages:
        - Simpler syntax
        - Uses the loop's default executor automatically
        - Direct kwargs support

        run_in_executor advantages:
        - Works on Python 3.7+
        - Can use custom executors
        - More control over thread pool
        """
        # run_in_executor (Python 3.7+)
        loop = asyncio.get_running_loop()
        result1 = await loop.run_in_executor(None, get_current_branch)

        # to_thread (Python 3.9+)
        result2 = await asyncio.to_thread(get_current_branch)

        assert result1 == result2
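One further difference, not covered in the comparison above, is that `asyncio.to_thread` propagates the current `contextvars` context into the worker thread, whereas a plain `loop.run_in_executor` call does not copy it. The sketch below illustrates this with a hypothetical `request_id` variable. The `run_in_executor` result described here is what standard CPython builds typically show and should be treated as an assumption for other builds.

```python
import asyncio
import contextvars

# Hypothetical context variable, e.g. a per-request identifier.
request_id = contextvars.ContextVar("request_id", default="unset")

def read_request_id() -> str:
    """Synchronous function that reads the context variable."""
    return request_id.get()

async def main() -> tuple:
    request_id.set("req-42")
    loop = asyncio.get_running_loop()
    # Runs in a pool thread without a copy of this task's context.
    via_executor = await loop.run_in_executor(None, read_request_id)
    # to_thread copies the current context before running the function.
    via_to_thread = await asyncio.to_thread(read_request_id)
    return via_executor, via_to_thread

via_executor, via_to_thread = asyncio.run(main())
print(f"run_in_executor saw: {via_executor}, to_thread saw: {via_to_thread}")
```

If wrapped functions depend on context variables (for logging or tracing, for instance), this may be a reason to prefer `to_thread` or to copy the context explicitly with `contextvars.copy_context().run`.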

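Design trade-offs

| Aspect | run_in_executor | asyncio.run | asyncio.to_thread |
|---|---|---|---|
| Direction | sync -> async | async -> sync | sync -> async |
| Mechanism | Uses a thread pool | Creates a new event loop | Uses the default thread pool |
| Use case | Calling sync I/O from async code | Running async code from a sync entry point | Calling sync I/O from async code |
| Limitation | Requires a running event loop | Cannot be nested | Requires Python 3.9+ |
| Performance | High (threads are reused) | Medium (loop creation overhead) | High (reuses the default pool's threads) |
| Complexity | | | |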

ThreadPoolExecutor vs ProcessPoolExecutor

| Aspect | ThreadPoolExecutor | ProcessPoolExecutor |
|---|---|---|
| Use case | I/O-bound work | CPU-bound work |
| Memory | Shared memory | Separate memory space |
| GIL | Limited by the GIL | Bypasses the GIL |
| Startup cost | | High (process creation) |
| Data passing | Passed directly | Requires serialization |

    import asyncio
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

    # I/O-bound: use ThreadPoolExecutor
    io_executor = ThreadPoolExecutor(max_workers=10)

    # CPU-bound: use ProcessPoolExecutor
    cpu_executor = ProcessPoolExecutor(max_workers=4)

    # sync_io_function and sync_cpu_function are placeholders for your own
    # functions; for the process pool the function must be picklable

    async def io_bound_task():
        """File I/O, network calls, subprocess"""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(io_executor, sync_io_function)

    async def cpu_bound_task():
        """Heavy computation"""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(cpu_executor, sync_cpu_function)

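When should you use this technique?

Good fit

  • Incremental migration to asyncio: a large synchronous codebase that has to be migrated step by step
  • Using synchronous third-party libraries in FastAPI: for example requests, boto3, or database drivers
  • Offering both a sync and an async API: callers pick whichever mode suits them
  • Parallelizing existing synchronous operations: for example batch file processing or multiple API calls
  • Integrating legacy code: synchronous functions from an older system that must be used in a new async system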
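
As an illustration of the "parallelizing existing synchronous operations" case, the sketch below fans a batch of items out to worker threads with asyncio.to_thread and caps concurrency with a semaphore. `process_item` is a hypothetical stand-in for per-file work, and the limit of 4 is an arbitrary assumption.

```python
import asyncio
import time

def process_item(name: str) -> str:
    """Hypothetical blocking worker, standing in for per-file processing."""
    time.sleep(0.05)
    return name.upper()

async def process_all(names: list[str], limit: int = 4) -> list[str]:
    sem = asyncio.Semaphore(limit)  # cap how many items are in flight at once

    async def one(name: str) -> str:
        async with sem:
            return await asyncio.to_thread(process_item, name)

    # Results come back in the same order as the input names.
    return await asyncio.gather(*(one(n) for n in names))

processed = asyncio.run(process_all(["a.txt", "b.txt", "c.txt"]))
print(processed)  # ['A.TXT', 'B.TXT', 'C.TXT']
```

The semaphore is optional. It is useful when the batch is much larger than the number of threads or connections you are willing to occupy.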

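Not recommended

  • Brand-new projects: design with native asyncio from the start
  • CPU-bound operations: use multiprocessing or ProcessPoolExecutor instead of the thread pool
  • Simple one-off operations: cases that need no concurrency
  • Extremely performance-sensitive code: the thread pool adds a small overhead
  • A native async alternative already exists: for example, use aiohttp instead of requests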

Exercises

Basic exercise

  1. Wrap requests.get with run_in_executor

import requests
import asyncio

# TODO: Implement this function
async def async_fetch(url: str) -> str:
    """
    Fetch URL content asynchronously using requests library.

    Hint: Use run_in_executor to wrap requests.get
    """
    pass

# Test your implementation
async def test():
    content = await async_fetch("https://httpbin.org/get")
    print(content[:200])

asyncio.run(test())

Reference solution

import requests
import asyncio

async def async_fetch(url: str) -> str:
    """Fetch URL content asynchronously using requests"""
    loop = asyncio.get_running_loop()

    def fetch():
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return response.text

    return await loop.run_in_executor(None, fetch)

# Or using to_thread (Python 3.9+)
async def async_fetch_39(url: str) -> str:
    def fetch():
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return response.text

    return await asyncio.to_thread(fetch)

Advanced exercise

  1. Build an API client that supports both sync and async modes

# TODO: Implement a dual-mode API client
class WeatherClient:
    """
    Weather API client supporting both sync and async modes.

    Usage:
        client = WeatherClient(api_key="xxx")

        # Sync mode
        weather = client.get_weather("Tokyo")

        # Async mode
        weather = await client.async_get_weather("Tokyo")
    """

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.weatherapi.com/v1"

    def get_weather(self, city: str) -> dict:
        """Synchronous weather fetch"""
        # TODO: Implement
        pass

    async def async_get_weather(self, city: str) -> dict:
        """Asynchronous weather fetch"""
        # TODO: Implement
        pass

    async def async_get_multiple(self, cities: list[str]) -> dict[str, dict]:
        """Fetch weather for multiple cities in parallel"""
        # TODO: Implement
        pass

Reference solution

import requests
import asyncio

class WeatherClient:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.weatherapi.com/v1"

    def get_weather(self, city: str) -> dict:
        """Synchronous weather fetch"""
        url = f"{self.base_url}/current.json"
        response = requests.get(
            url,
            params={"key": self.api_key, "q": city},
            timeout=10
        )
        response.raise_for_status()
        return response.json()

    async def async_get_weather(self, city: str) -> dict:
        """Asynchronous weather fetch using run_in_executor"""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.get_weather, city)

    async def async_get_multiple(self, cities: list[str]) -> dict[str, dict]:
        """Fetch weather for multiple cities in parallel"""
        tasks = [self.async_get_weather(city) for city in cities]
        # return_exceptions=True keeps one failed city from cancelling the rest
        results = await asyncio.gather(*tasks, return_exceptions=True)

        return {
            city: result if not isinstance(result, Exception) else {"error": str(result)}
            for city, result in zip(cities, results)
        }

Challenge

  1. Implement an adapter that detects its execution context automatically

# TODO: Implement an adaptive function caller
class AdaptiveCaller:
    """
    Automatically detects the execution context and calls
    functions appropriately.

    - In async context: awaits coroutines, wraps sync functions
    - In sync context: runs async functions, calls sync directly

    Usage:
        caller = AdaptiveCaller()

        # Works in both sync and async contexts!
        result = caller.call(some_function, arg1, arg2)
    """

    def call(self, func, *args, **kwargs):
        """
        Call a function adaptively based on context.

        - If func is async and we're in sync: run with asyncio.run
        - If func is sync and we're in async: use run_in_executor
        - Otherwise: call directly
        """
        # TODO: Implement
        pass

Reference solution

import asyncio
import functools
import inspect
from typing import Any, Callable
from concurrent.futures import ThreadPoolExecutor

class AdaptiveCaller:
    def __init__(self):
        self._executor = ThreadPoolExecutor(max_workers=10)

    def _is_async_context(self) -> bool:
        """Check if we're in an async context"""
        try:
            asyncio.get_running_loop()
            return True
        except RuntimeError:
            return False

    def call(self, func: Callable, *args, **kwargs) -> Any:
        """Call function adaptively based on context"""
        is_async_func = inspect.iscoroutinefunction(func)
        in_async_context = self._is_async_context()

        if is_async_func:
            if in_async_context:
                # Return coroutine to be awaited
                return func(*args, **kwargs)
            else:
                # Run async function in new event loop
                return asyncio.run(func(*args, **kwargs))
        else:
            if in_async_context:
                # Wrap sync function for async context;
                # this returns a coroutine that the caller must await
                return self._wrap_sync(func, *args, **kwargs)
            else:
                # Call sync function directly
                return func(*args, **kwargs)

    async def _wrap_sync(self, func: Callable, *args, **kwargs) -> Any:
        """Wrap sync function for async execution"""
        loop = asyncio.get_running_loop()
        # run_in_executor accepts positional arguments only,
        # so bind args and kwargs up front with functools.partial
        bound = functools.partial(func, *args, **kwargs)
        return await loop.run_in_executor(self._executor, bound)

    async def call_async(self, func: Callable, *args, **kwargs) -> Any:
        """Explicitly async version of call"""
        is_async_func = inspect.iscoroutinefunction(func)

        if is_async_func:
            return await func(*args, **kwargs)
        else:
            return await self._wrap_sync(func, *args, **kwargs)

# Usage example
caller = AdaptiveCaller()

def sync_func(x):
    return x * 2

async def async_func(x):
    await asyncio.sleep(0.1)
    return x * 3

# In sync context
result1 = caller.call(sync_func, 5)      # Direct call
result2 = caller.call(async_func, 5)     # Uses asyncio.run

# In async context
async def demo():
    result3 = await caller.call_async(sync_func, 5)   # Uses executor
    result4 = await caller.call_async(async_func, 5)  # Direct await

asyncio.run(demo())
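
The context check that the solution relies on can be exercised in isolation. This is a minimal sketch: `in_async_context` and `probe` are illustrative names, and the check only reports whether an event loop is running in the current thread.

```python
import asyncio

def in_async_context() -> bool:
    """Return True when called while an event loop is running in this thread."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True

async def probe() -> bool:
    # Called from a coroutine, so a loop is running.
    return in_async_context()

outside = in_async_context()    # plain synchronous call, no loop
inside = asyncio.run(probe())   # evaluated inside a running loop
print(outside, inside)
```

Note that the check is per thread: a synchronous function running in an executor worker sees no running loop, even though it was scheduled from async code.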
