從 threading 到 asyncio:轉換指南
從 threading 到 asyncio:轉換指南
如果你已經熟悉入門系列的 threading 模組,本章將幫助你理解為什麼需要 asyncio,以及如何將現有的多執行緒程式碼轉換為異步版本。
為什麼要從 threading 轉向 asyncio?
threading 的限制
threading 是處理並發的傳統方案,但它有幾個固有限制:
1. 資源消耗高
每個執行緒都需要分配記憶體(預設約 8MB stack):
1import threading
2
3# 建立 100 個執行緒
4threads = [threading.Thread(target=some_task) for _ in range(100)]
5# 記憶體消耗:約 800MB 的 stack 空間當需要處理數千個並發連線時,執行緒模型會面臨資源瓶頸。
2. 上下文切換成本
作業系統需要在執行緒之間切換,這涉及:
- 保存和恢復 CPU 暫存器
- 切換記憶體映射
- 快取失效
1執行緒 1 執行 → 上下文切換(耗時)→ 執行緒 2 執行 → 上下文切換(耗時)→ ...3. GIL 的限制
由於 GIL,多個執行緒無法真正並行執行 Python 程式碼:
1# 這段程式碼實際上是序列執行的
2def cpu_task():
3 total = 0
4 for i in range(1000000):
5 total += i
6 return total
7
8# 即使使用多執行緒,也無法利用多核 CPU
9threads = [threading.Thread(target=cpu_task) for _ in range(4)]asyncio 的優勢
asyncio 採用不同的並發模型來解決這些問題:
1. 輕量級協程
協程只是普通的 Python 物件,記憶體消耗極低:
1import asyncio
2
3# 建立 10000 個協程
4async def some_task():
5 await asyncio.sleep(1)
6
7# 記憶體消耗:幾十 KB
8tasks = [some_task() for _ in range(10000)]2. 協作式切換
協程只在 await 點切換,沒有強制的上下文切換:
1協程 1 執行 → await(主動讓出)→ 協程 2 執行 → await(主動讓出)→ ...3. 單執行緒高並發
asyncio 在單執行緒中處理所有任務,避免了執行緒同步問題:
1async def handle_request(client):
2 data = await client.read() # 等待時處理其他請求
3 result = process(data)
4 await client.write(result) # 等待時處理其他請求並發模型比較
| 特性 | threading | asyncio |
|---|---|---|
| 執行模型 | 多執行緒並行 | 單執行緒協作 |
| 切換方式 | 作業系統搶佔 | 程式主動讓出 |
| 記憶體消耗 | 高(每執行緒 ~8MB) | 低(每協程 ~KB) |
| 並發數量 | 百級 | 萬級 |
| GIL 影響 | 受限制 | 無影響(不需要多核) |
| 同步複雜度 | 需要鎖(Lock、Semaphore) | 較少(單執行緒) |
| 適用場景 | I/O 密集 + 共享記憶體 | I/O 密集 + 大規模並發 |
程式碼轉換模式
模式 1:簡單函式轉換
threading 版本:
1import time
2import threading
3
4def fetch_data(url):
5 time.sleep(1) # 模擬網路延遲
6 return f"Data from {url}"
7
8def main():
9 urls = ["url1", "url2", "url3"]
10 threads = []
11 results = []
12
13 for url in urls:
14 t = threading.Thread(target=lambda u=url: results.append(fetch_data(u)))
15 threads.append(t)
16 t.start()
17
18 for t in threads:
19 t.join()
20
21 return resultsasyncio 版本:
1import asyncio
2
3async def fetch_data(url):
4 await asyncio.sleep(1) # 非阻塞等待
5 return f"Data from {url}"
6
7async def main():
8 urls = ["url1", "url2", "url3"]
9
10 # 使用 gather 並發執行
11 results = await asyncio.gather(*[fetch_data(url) for url in urls])
12
13 return results
14
15# 執行
16asyncio.run(main())轉換要點:
| 原本 | 轉換後 |
|---|---|
def | async def |
time.sleep() | await asyncio.sleep() |
threading.Thread + join | asyncio.gather() |
模式 2:ThreadPoolExecutor 轉換
threading 版本:
1from concurrent.futures import ThreadPoolExecutor
2
3def process_file(filepath):
4 with open(filepath) as f:
5 return len(f.read())
6
7with ThreadPoolExecutor(max_workers=4) as executor:
8 results = list(executor.map(process_file, file_paths))asyncio 版本(使用 aiofiles):
1import asyncio
2import aiofiles
3
4async def process_file(filepath):
5 async with aiofiles.open(filepath) as f:
6 content = await f.read()
7 return len(content)
8
9async def main():
10 tasks = [process_file(fp) for fp in file_paths]
11 results = await asyncio.gather(*tasks)
12 return results
13
14asyncio.run(main())模式 3:保留同步程式碼(混合模式)
有時候你無法(或不想)將所有程式碼都轉為異步。asyncio 提供了 run_in_executor 來處理這種情況:
1import asyncio
2from concurrent.futures import ThreadPoolExecutor
3
4# 保持原有的同步函式
5def blocking_operation(data):
6 # 這是一個阻塞的第三方函式庫呼叫
7 import time
8 time.sleep(1)
9 return f"Processed: {data}"
10
11async def main():
12 loop = asyncio.get_event_loop()
13
14 # 在執行緒池中執行同步函式
15 with ThreadPoolExecutor() as pool:
16 result = await loop.run_in_executor(
17 pool,
18 blocking_operation,
19 "my_data"
20 )
21
22 return result
23
24asyncio.run(main())這種模式讓你可以漸進式地將程式碼遷移到 asyncio。
何時選擇哪種方案?
選擇 threading
- 需要與不支援 asyncio 的函式庫整合
- 需要共享記憶體且修改頻繁
- 並發數量較少(< 100)
- 團隊對 threading 更熟悉
選擇 asyncio
- 需要處理大量並發連線(Web 伺服器、聊天室)
- 主要是 I/O 操作(網路、檔案)
- 使用現代 async 函式庫(aiohttp、httpx、asyncpg)
- 需要高效能的單機並發
選擇 multiprocessing
- CPU 密集任務(資料處理、科學計算)
- 需要真正的並行計算
- 各任務相對獨立,不需要頻繁通訊
決策流程圖
1任務類型是什麼?
2 │
3 ├─ CPU 密集 ────────────────→ multiprocessing
4 │
5 └─ I/O 密集
6 │
7 ├─ 並發數 > 100 ─────────→ asyncio
8 │
9 ├─ 需要共享記憶體 ────────→ threading
10 │
11 └─ 第三方函式庫支援 async?
12 │
13 ├─ 是 ───────────────→ asyncio
14 │
15 └─ 否 ───────────────→ threading 或 asyncio + run_in_executor常見轉換陷阱
陷阱 1:忘記 await
1# 錯誤:忘記 await
2async def main():
3 result = fetch_data("url") # 這只會建立協程物件,不會執行
4 print(result) # <coroutine object fetch_data at 0x...>
5
6# 正確
7async def main():
8 result = await fetch_data("url")
9 print(result)陷阱 2:在異步函式中使用阻塞呼叫
1# 錯誤:使用阻塞的 time.sleep
2async def bad_sleep():
3 time.sleep(1) # 這會阻塞整個事件迴圈!
4
5# 正確:使用非阻塞的 asyncio.sleep
6async def good_sleep():
7 await asyncio.sleep(1) # 這會讓出控制權陷阱 3:在同步函式中呼叫異步函式
1# 錯誤:在普通函式中直接呼叫
2def sync_function():
3 result = await fetch_data("url") # SyntaxError!
4
5# 正確:使用 asyncio.run
6def sync_function():
7 result = asyncio.run(fetch_data("url"))實戰練習
練習 1:轉換簡單的多執行緒下載器
將以下 threading 程式碼轉換為 asyncio 版本:
1import threading
2import time
3
4def download(url):
5 print(f"Downloading {url}...")
6 time.sleep(2) # 模擬下載
7 print(f"Finished {url}")
8 return f"Content of {url}"
9
10def main():
11 urls = ["url1", "url2", "url3", "url4"]
12 threads = []
13
14 for url in urls:
15 t = threading.Thread(target=download, args=(url,))
16 threads.append(t)
17 t.start()
18
19 for t in threads:
20 t.join()
21
22if __name__ == "__main__":
23 main()練習 2:使用 run_in_executor 整合同步函式庫
假設你有一個只支援同步的第三方 API 客戶端,寫一個異步包裝器。
下一步
理解了 threading 和 asyncio 的區別後,你可以開始深入學習 asyncio 的核心概念:
- 1.1 基礎概念與事件迴圈 - 理解 asyncio 的運作原理
- 1.2 協程與 Task 管理 - 掌握 async/await 語法
- 1.4 實戰:與同步程式碼整合 - 學習混合模式的最佳實踐
上一章:入門系列 3.7 並行處理 下一章:1.1 基礎概念與事件迴圈