1.4 實戰:與同步程式碼整合
1.4 實戰:與同步程式碼整合
本章討論如何在現有專案中引入 asyncio,以及同步與異步程式碼的整合策略。
先備知識
本章目標
學完本章後,你將能夠:
- 在異步程式中呼叫同步函式
- 在同步程式中呼叫異步函式
- 制定漸進式遷移策略
- 與常見框架整合
【原理層】混合程式設計的挑戰
兩個世界的衝突
同步和異步程式碼有根本性的差異:
1# 同步世界
2def sync_fetch(url):
3 response = requests.get(url) # 阻塞等待
4 return response.text
5
6# 異步世界
7async def async_fetch(url):
8 async with aiohttp.ClientSession() as session:
9 async with session.get(url) as response:
10 return await response.text() # 非阻塞等待問題在於:
- 在異步函式中呼叫同步函式會阻塞事件迴圈
- 在同步函式中無法直接
await異步函式
run_in_executor:橋樑
run_in_executor 在執行緒池中執行同步函式:
1import asyncio
2from concurrent.futures import ThreadPoolExecutor
3
4async def main():
5 loop = asyncio.get_running_loop()
6
7 # 使用預設執行緒池
8 result = await loop.run_in_executor(None, sync_blocking_func)
9
10 # 使用自訂執行緒池
11 with ThreadPoolExecutor(max_workers=4) as pool:
12 result = await loop.run_in_executor(pool, sync_blocking_func)【設計層】整合策略
在異步程式中呼叫同步函式
1import asyncio
2import requests
3
4def sync_fetch(url):
5 return requests.get(url).text
6
7async def async_wrapper(url):
8 loop = asyncio.get_running_loop()
9 return await loop.run_in_executor(None, sync_fetch, url)
10
11async def main():
12 # 並發呼叫同步函式
13 urls = ["https://example.com"] * 5
14 tasks = [async_wrapper(url) for url in urls]
15 results = await asyncio.gather(*tasks)在同步程式中呼叫異步函式
1import asyncio
2
3async def async_fetch(url):
4 async with aiohttp.ClientSession() as session:
5 async with session.get(url) as response:
6 return await response.text()
7
8def sync_main():
9 # 方法 1:asyncio.run()
10 result = asyncio.run(async_fetch("https://example.com"))
11
12 # 方法 2:在已有事件迴圈中(例如 Jupyter)
13 loop = asyncio.get_event_loop()
14 result = loop.run_until_complete(async_fetch("https://example.com"))漸進式遷移策略
1階段 1:識別 I/O 瓶頸
2 └─→ profiling,找出最常等待的地方
3
4階段 2:引入異步版本
5 └─→ 新功能用異步,舊功能保持同步
6
7階段 3:包裝同步程式碼
8 └─→ 用 run_in_executor 包裝同步函式
9
10階段 4:逐步替換
11 └─→ 用異步函式庫替換同步函式庫
12
13階段 5:完全異步(可選)
14 └─→ 整個應用改為異步【實作層】框架整合
FastAPI / Starlette
FastAPI 原生支援異步:
1from fastapi import FastAPI
2import asyncio
3
4app = FastAPI()
5
6@app.get("/async")
7async def async_endpoint():
8 await asyncio.sleep(1)
9 return {"message": "異步完成"}
10
11@app.get("/sync")
12def sync_endpoint():
13 # FastAPI 會自動在執行緒池中執行
14 time.sleep(1)
15 return {"message": "同步完成"}aiohttp 客戶端
1import aiohttp
2import asyncio
3
4async def fetch_all(urls):
5 async with aiohttp.ClientSession() as session:
6 async def fetch(url):
7 async with session.get(url) as response:
8 return await response.json()
9
10 return await asyncio.gather(*[fetch(url) for url in urls])SQLAlchemy 2.0
1from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
2from sqlalchemy.orm import sessionmaker
3
4engine = create_async_engine("postgresql+asyncpg://...")
5async_session = sessionmaker(engine, class_=AsyncSession)
6
7async def get_user(user_id):
8 async with async_session() as session:
9 result = await session.execute(
10 select(User).where(User.id == user_id)
11 )
12 return result.scalar_one_or_none()【測試策略】
測試異步程式碼
1import pytest
2import asyncio
3
4# pytest-asyncio
5@pytest.mark.asyncio
6async def test_async_function():
7 result = await async_fetch("https://example.com")
8 assert result is not None
9
10# 使用 asyncio.run
11def test_with_asyncio_run():
12 result = asyncio.run(async_fetch("https://example.com"))
13 assert result is not NoneMock 異步函式
1from unittest.mock import AsyncMock, patch
2
3@pytest.mark.asyncio
4async def test_with_mock():
5 mock_fetch = AsyncMock(return_value="mocked data")
6
7 with patch("module.async_fetch", mock_fetch):
8 result = await process_data()
9 assert result == "processed: mocked data"思考題
- 為什麼 FastAPI 可以同時支援同步和異步端點?
run_in_executor使用執行緒池,會不會有 GIL 的問題?- 在什麼情況下不值得遷移到 asyncio?
實作練習
- 將一個使用 requests 的爬蟲改寫為使用 aiohttp
- 實作一個支援同步和異步呼叫的函式庫包裝器
- 為異步程式碼撰寫單元測試