1.3 設計模式與最佳實踐
1.3 設計模式與最佳實踐
本章介紹 asyncio 的常見設計模式,以及如何避免常見陷阱。
先備知識
本章目標
學完本章後,你將能夠:
- 使用異步上下文管理器管理資源
- 實作異步迭代器處理串流資料
- 使用 Semaphore 控制並發
- 避免阻塞事件迴圈的陷阱
【原理層】異步協議
異步上下文管理器
同步版本使用 __enter__ 和 __exit__,異步版本使用 __aenter__ 和 __aexit__:
1class AsyncResource:
2 async def __aenter__(self):
3 print("獲取資源")
4 await self.connect()
5 return self
6
7 async def __aexit__(self, exc_type, exc_val, exc_tb):
8 print("釋放資源")
9 await self.disconnect()
10
11 async def connect(self):
12 await asyncio.sleep(0.1)
13
14 async def disconnect(self):
15 await asyncio.sleep(0.1)
16
17async def main():
18 async with AsyncResource() as resource:
19 print("使用資源")異步迭代器
同步版本使用 __iter__ 和 __next__,異步版本使用 __aiter__ 和 __anext__:
1class AsyncCounter:
2 def __init__(self, stop):
3 self.current = 0
4 self.stop = stop
5
6 def __aiter__(self):
7 return self
8
9 async def __anext__(self):
10 if self.current >= self.stop:
11 raise StopAsyncIteration
12 await asyncio.sleep(0.1) # 模擬異步操作
13 self.current += 1
14 return self.current
15
16async def main():
17 async for num in AsyncCounter(5):
18 print(num)異步生成器
更簡潔的異步迭代器寫法:
1async def async_range(stop):
2 for i in range(stop):
3 await asyncio.sleep(0.1)
4 yield i
5
6async def main():
7 async for num in async_range(5):
8 print(num)【設計層】常見模式
協程鏈(Chaining)
1async def fetch(url):
2 async with aiohttp.ClientSession() as session:
3 async with session.get(url) as response:
4 return await response.text()
5
6async def parse(html):
7 # 假設這是 CPU 密集操作,應該放到執行緒池
8 await asyncio.sleep(0) # 讓出控制權
9 return html[:100]
10
11async def process(data):
12 await asyncio.sleep(0.1)
13 return f"處理完成:{data}"
14
15async def pipeline(url):
16 html = await fetch(url)
17 parsed = await parse(html)
18 result = await process(parsed)
19 return resultSemaphore 並發控制
限制同時執行的任務數量:
1async def fetch_with_limit(sem, url):
2 async with sem: # 獲取信號量
3 return await fetch(url)
4
5async def main():
6 sem = asyncio.Semaphore(10) # 最多 10 個並發
7 urls = [f"https://example.com/{i}" for i in range(100)]
8
9 tasks = [fetch_with_limit(sem, url) for url in urls]
10 results = await asyncio.gather(*tasks)生產者-消費者
1async def producer(queue):
2 for i in range(10):
3 await asyncio.sleep(0.1)
4 await queue.put(i)
5 print(f"生產:{i}")
6 await queue.put(None) # 結束信號
7
8async def consumer(queue):
9 while True:
10 item = await queue.get()
11 if item is None:
12 break
13 await asyncio.sleep(0.2)
14 print(f"消費:{item}")
15
16async def main():
17 queue = asyncio.Queue(maxsize=5)
18 await asyncio.gather(
19 producer(queue),
20 consumer(queue)
21 )【實作層】實用範例
異步 Rate Limiter
1class RateLimiter:
2 def __init__(self, rate, per):
3 self.rate = rate
4 self.per = per
5 self.allowance = rate
6 self.last_check = asyncio.get_event_loop().time()
7
8 async def acquire(self):
9 current = asyncio.get_event_loop().time()
10 elapsed = current - self.last_check
11 self.last_check = current
12 self.allowance += elapsed * (self.rate / self.per)
13
14 if self.allowance > self.rate:
15 self.allowance = self.rate
16
17 if self.allowance < 1.0:
18 wait_time = (1.0 - self.allowance) * (self.per / self.rate)
19 await asyncio.sleep(wait_time)
20 self.allowance = 0
21 else:
22 self.allowance -= 1.0重試模式
1async def retry(coro_func, max_retries=3, delay=1.0):
2 for attempt in range(max_retries):
3 try:
4 return await coro_func()
5 except Exception as e:
6 if attempt == max_retries - 1:
7 raise
8 print(f"重試 {attempt + 1}/{max_retries}")
9 await asyncio.sleep(delay * (attempt + 1))【陷阱與避免】
阻塞事件迴圈
1import time
2
3async def bad_example():
4 time.sleep(1) # 阻塞!整個事件迴圈停止
5
6async def good_example():
7 await asyncio.sleep(1) # 非阻塞
8
9# 如果必須執行同步阻塞函式
10async def run_blocking():
11 loop = asyncio.get_running_loop()
12 await loop.run_in_executor(None, time.sleep, 1)資源洩漏
1# 錯誤:沒有正確關閉
2async def bad():
3 session = aiohttp.ClientSession()
4 response = await session.get("https://example.com")
5 return await response.text()
6
7# 正確:使用 async with
8async def good():
9 async with aiohttp.ClientSession() as session:
10 async with session.get("https://example.com") as response:
11 return await response.text()思考題
- 為什麼 Semaphore 在 asyncio 中不需要擔心執行緒安全?
- 異步生成器在什麼場景下比異步迭代器更適合?
- 如何檢測程式碼是否阻塞了事件迴圈?
實作練習
- 實作一個異步連線池
- 實作一個帶有指數退避的重試裝飾器
- 實作一個異步的 debounce 函式
延伸閱讀
上一章:協程與 Task 管理 下一章:實戰:與同步程式碼整合
#python #python-advanced #asyncio #design-patterns #coroutine