本章討論如何在現有專案中引入 asyncio,以及同步與異步程式碼的整合策略。

先備知識

本章目標

學完本章後,你將能夠:

  1. 在異步程式中呼叫同步函式
  2. 在同步程式中呼叫異步函式
  3. 制定漸進式遷移策略
  4. 與常見框架整合

【原理層】混合程式設計的挑戰

兩個世界的衝突

同步和異步程式碼有根本性的差異:

 1# 同步世界
 2def sync_fetch(url):
 3    response = requests.get(url)  # 阻塞等待
 4    return response.text
 5
 6# 異步世界
 7async def async_fetch(url):
 8    async with aiohttp.ClientSession() as session:
 9        async with session.get(url) as response:
10            return await response.text()  # 非阻塞等待

問題在於:

  • 在異步函式中呼叫同步函式會阻塞事件迴圈
  • 在同步函式中無法直接 await 異步函式

run_in_executor:橋樑

run_in_executor 在執行緒池中執行同步函式:

 1import asyncio
 2from concurrent.futures import ThreadPoolExecutor
 3
 4async def main():
 5    loop = asyncio.get_running_loop()
 6
 7    # 使用預設執行緒池
 8    result = await loop.run_in_executor(None, sync_blocking_func)
 9
10    # 使用自訂執行緒池
11    with ThreadPoolExecutor(max_workers=4) as pool:
12        result = await loop.run_in_executor(pool, sync_blocking_func)

【設計層】整合策略

在異步程式中呼叫同步函式

 1import asyncio
 2import requests
 3
 4def sync_fetch(url):
 5    return requests.get(url).text
 6
 7async def async_wrapper(url):
 8    loop = asyncio.get_running_loop()
 9    return await loop.run_in_executor(None, sync_fetch, url)
10
11async def main():
12    # 並發呼叫同步函式
13    urls = ["https://example.com"] * 5
14    tasks = [async_wrapper(url) for url in urls]
15    results = await asyncio.gather(*tasks)

在同步程式中呼叫異步函式

 1import asyncio
 2
 3async def async_fetch(url):
 4    async with aiohttp.ClientSession() as session:
 5        async with session.get(url) as response:
 6            return await response.text()
 7
 8def sync_main():
 9    # 方法 1:asyncio.run()
10    result = asyncio.run(async_fetch("https://example.com"))
11
12    # 方法 2:在已有事件迴圈中(例如 Jupyter)
13    loop = asyncio.get_event_loop()
14    result = loop.run_until_complete(async_fetch("https://example.com"))

漸進式遷移策略

 1階段 1:識別 I/O 瓶頸
 2    └─→ profiling,找出最常等待的地方
 3
 4階段 2:引入異步版本
 5    └─→ 新功能用異步,舊功能保持同步
 6
 7階段 3:包裝同步程式碼
 8    └─→ 用 run_in_executor 包裝同步函式
 9
10階段 4:逐步替換
11    └─→ 用異步函式庫替換同步函式庫
12
13階段 5:完全異步(可選)
14    └─→ 整個應用改為異步

【實作層】框架整合

FastAPI / Starlette

FastAPI 原生支援異步:

 1from fastapi import FastAPI
 2import asyncio
 3
 4app = FastAPI()
 5
 6@app.get("/async")
 7async def async_endpoint():
 8    await asyncio.sleep(1)
 9    return {"message": "異步完成"}
10
11@app.get("/sync")
12def sync_endpoint():
13    # FastAPI 會自動在執行緒池中執行
14    time.sleep(1)
15    return {"message": "同步完成"}

aiohttp 客戶端

 1import aiohttp
 2import asyncio
 3
 4async def fetch_all(urls):
 5    async with aiohttp.ClientSession() as session:
 6        async def fetch(url):
 7            async with session.get(url) as response:
 8                return await response.json()
 9
10        return await asyncio.gather(*[fetch(url) for url in urls])

SQLAlchemy 2.0

 1from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
 2from sqlalchemy.orm import sessionmaker
 3
 4engine = create_async_engine("postgresql+asyncpg://...")
 5async_session = sessionmaker(engine, class_=AsyncSession)
 6
 7async def get_user(user_id):
 8    async with async_session() as session:
 9        result = await session.execute(
10            select(User).where(User.id == user_id)
11        )
12        return result.scalar_one_or_none()

【測試策略】

測試異步程式碼

 1import pytest
 2import asyncio
 3
 4# pytest-asyncio
 5@pytest.mark.asyncio
 6async def test_async_function():
 7    result = await async_fetch("https://example.com")
 8    assert result is not None
 9
10# 使用 asyncio.run
11def test_with_asyncio_run():
12    result = asyncio.run(async_fetch("https://example.com"))
13    assert result is not None

Mock 異步函式

1from unittest.mock import AsyncMock, patch
2
3@pytest.mark.asyncio
4async def test_with_mock():
5    mock_fetch = AsyncMock(return_value="mocked data")
6
7    with patch("module.async_fetch", mock_fetch):
8        result = await process_data()
9        assert result == "processed: mocked data"

思考題

  1. 為什麼 FastAPI 可以同時支援同步和異步端點?
  2. run_in_executor 使用執行緒池,會不會有 GIL 的問題?
  3. 在什麼情況下不值得遷移到 asyncio?

實作練習

  1. 將一個使用 requests 的爬蟲改寫為使用 aiohttp
  2. 實作一個支援同步和異步呼叫的函式庫包裝器
  3. 為異步程式碼撰寫單元測試

延伸閱讀


上一章:設計模式與最佳實踐 下一模組:模組二:元編程