本章介紹 asyncio 的常見設計模式,以及如何避免常見陷阱。

先備知識

本章目標

學完本章後,你將能夠:

  1. 使用異步上下文管理器管理資源
  2. 實作異步迭代器處理串流資料
  3. 使用 Semaphore 控制並發
  4. 避免阻塞事件迴圈的陷阱

【原理層】異步協議

異步上下文管理器

同步版本使用 __enter____exit__,異步版本使用 __aenter____aexit__

 1class AsyncResource:
 2    async def __aenter__(self):
 3        print("獲取資源")
 4        await self.connect()
 5        return self
 6
 7    async def __aexit__(self, exc_type, exc_val, exc_tb):
 8        print("釋放資源")
 9        await self.disconnect()
10
11    async def connect(self):
12        await asyncio.sleep(0.1)
13
14    async def disconnect(self):
15        await asyncio.sleep(0.1)
16
17async def main():
18    async with AsyncResource() as resource:
19        print("使用資源")

異步迭代器

同步版本使用 __iter____next__,異步版本使用 __aiter____anext__

 1class AsyncCounter:
 2    def __init__(self, stop):
 3        self.current = 0
 4        self.stop = stop
 5
 6    def __aiter__(self):
 7        return self
 8
 9    async def __anext__(self):
10        if self.current >= self.stop:
11            raise StopAsyncIteration
12        await asyncio.sleep(0.1)  # 模擬異步操作
13        self.current += 1
14        return self.current
15
16async def main():
17    async for num in AsyncCounter(5):
18        print(num)

異步生成器

更簡潔的異步迭代器寫法:

1async def async_range(stop):
2    for i in range(stop):
3        await asyncio.sleep(0.1)
4        yield i
5
6async def main():
7    async for num in async_range(5):
8        print(num)

【設計層】常見模式

協程鏈(Chaining)

 1async def fetch(url):
 2    async with aiohttp.ClientSession() as session:
 3        async with session.get(url) as response:
 4            return await response.text()
 5
 6async def parse(html):
 7    # 假設這是 CPU 密集操作,應該放到執行緒池
 8    await asyncio.sleep(0)  # 讓出控制權
 9    return html[:100]
10
11async def process(data):
12    await asyncio.sleep(0.1)
13    return f"處理完成:{data}"
14
15async def pipeline(url):
16    html = await fetch(url)
17    parsed = await parse(html)
18    result = await process(parsed)
19    return result

Semaphore 並發控制

限制同時執行的任務數量:

 1async def fetch_with_limit(sem, url):
 2    async with sem:  # 獲取信號量
 3        return await fetch(url)
 4
 5async def main():
 6    sem = asyncio.Semaphore(10)  # 最多 10 個並發
 7    urls = [f"https://example.com/{i}" for i in range(100)]
 8
 9    tasks = [fetch_with_limit(sem, url) for url in urls]
10    results = await asyncio.gather(*tasks)

生產者-消費者

 1async def producer(queue):
 2    for i in range(10):
 3        await asyncio.sleep(0.1)
 4        await queue.put(i)
 5        print(f"生產:{i}")
 6    await queue.put(None)  # 結束信號
 7
 8async def consumer(queue):
 9    while True:
10        item = await queue.get()
11        if item is None:
12            break
13        await asyncio.sleep(0.2)
14        print(f"消費:{item}")
15
16async def main():
17    queue = asyncio.Queue(maxsize=5)
18    await asyncio.gather(
19        producer(queue),
20        consumer(queue)
21    )

【實作層】實用範例

異步 Rate Limiter

 1class RateLimiter:
 2    def __init__(self, rate, per):
 3        self.rate = rate
 4        self.per = per
 5        self.allowance = rate
 6        self.last_check = asyncio.get_event_loop().time()
 7
 8    async def acquire(self):
 9        current = asyncio.get_event_loop().time()
10        elapsed = current - self.last_check
11        self.last_check = current
12        self.allowance += elapsed * (self.rate / self.per)
13
14        if self.allowance > self.rate:
15            self.allowance = self.rate
16
17        if self.allowance < 1.0:
18            wait_time = (1.0 - self.allowance) * (self.per / self.rate)
19            await asyncio.sleep(wait_time)
20            self.allowance = 0
21        else:
22            self.allowance -= 1.0

重試模式

1async def retry(coro_func, max_retries=3, delay=1.0):
2    for attempt in range(max_retries):
3        try:
4            return await coro_func()
5        except Exception as e:
6            if attempt == max_retries - 1:
7                raise
8            print(f"重試 {attempt + 1}/{max_retries}")
9            await asyncio.sleep(delay * (attempt + 1))

【陷阱與避免】

阻塞事件迴圈

 1import time
 2
 3async def bad_example():
 4    time.sleep(1)  # 阻塞!整個事件迴圈停止
 5
 6async def good_example():
 7    await asyncio.sleep(1)  # 非阻塞
 8
 9# 如果必須執行同步阻塞函式
10async def run_blocking():
11    loop = asyncio.get_running_loop()
12    await loop.run_in_executor(None, time.sleep, 1)

資源洩漏

 1# 錯誤:沒有正確關閉
 2async def bad():
 3    session = aiohttp.ClientSession()
 4    response = await session.get("https://example.com")
 5    return await response.text()
 6
 7# 正確:使用 async with
 8async def good():
 9    async with aiohttp.ClientSession() as session:
10        async with session.get("https://example.com") as response:
11            return await response.text()

思考題

  1. 為什麼 Semaphore 在 asyncio 中不需要擔心執行緒安全?
  2. 異步生成器在什麼場景下比異步迭代器更適合?
  3. 如何檢測程式碼是否阻塞了事件迴圈?

實作練習

  1. 實作一個異步連線池
  2. 實作一個帶有指數退避的重試裝飾器
  3. 實作一個異步的 debounce 函式

延伸閱讀


上一章:協程與 Task 管理 下一章:實戰:與同步程式碼整合