如果你已經熟悉入門系列的 threading 模組,本章將幫助你理解為什麼需要 asyncio,以及如何將現有的多執行緒程式碼轉換為異步版本。

為什麼要從 threading 轉向 asyncio?

threading 的限制

threading 是處理並發的傳統方案,但它有幾個固有限制:

1. 資源消耗高

每個執行緒都需要分配記憶體(預設約 8MB stack):

1import threading
2
3# 建立 100 個執行緒
4threads = [threading.Thread(target=some_task) for _ in range(100)]
5# 記憶體消耗:約 800MB 的 stack 空間

當需要處理數千個並發連線時,執行緒模型會面臨資源瓶頸。

2. 上下文切換成本

作業系統需要在執行緒之間切換,這涉及:

  • 保存和恢復 CPU 暫存器
  • 切換記憶體映射
  • 快取失效
1執行緒 1 執行 → 上下文切換(耗時)→ 執行緒 2 執行 → 上下文切換(耗時)→ ...

3. GIL 的限制

由於 GIL,多個執行緒無法真正並行執行 Python 程式碼:

1# 這段程式碼實際上是序列執行的
2def cpu_task():
3    total = 0
4    for i in range(1000000):
5        total += i
6    return total
7
8# 即使使用多執行緒,也無法利用多核 CPU
9threads = [threading.Thread(target=cpu_task) for _ in range(4)]

asyncio 的優勢

asyncio 採用不同的並發模型來解決這些問題:

1. 輕量級協程

協程只是普通的 Python 物件,記憶體消耗極低:

1import asyncio
2
3# 建立 10000 個協程
4async def some_task():
5    await asyncio.sleep(1)
6
7# 記憶體消耗:幾十 KB
8tasks = [some_task() for _ in range(10000)]

2. 協作式切換

協程只在 await 點切換,沒有強制的上下文切換:

1協程 1 執行 → await(主動讓出)→ 協程 2 執行 → await(主動讓出)→ ...

3. 單執行緒高並發

asyncio 在單執行緒中處理所有任務,避免了執行緒同步問題:

1async def handle_request(client):
2    data = await client.read()    # 等待時處理其他請求
3    result = process(data)
4    await client.write(result)    # 等待時處理其他請求

並發模型比較

特性threadingasyncio
執行模型多執行緒並行單執行緒協作
切換方式作業系統搶佔程式主動讓出
記憶體消耗高(每執行緒 ~8MB)低(每協程 ~KB)
並發數量百級萬級
GIL 影響受限制無影響(不需要多核)
同步複雜度需要鎖(Lock、Semaphore)較少(單執行緒)
適用場景I/O 密集 + 共享記憶體I/O 密集 + 大規模並發

程式碼轉換模式

模式 1:簡單函式轉換

threading 版本

 1import time
 2import threading
 3
 4def fetch_data(url):
 5    time.sleep(1)  # 模擬網路延遲
 6    return f"Data from {url}"
 7
 8def main():
 9    urls = ["url1", "url2", "url3"]
10    threads = []
11    results = []
12
13    for url in urls:
14        t = threading.Thread(target=lambda u=url: results.append(fetch_data(u)))
15        threads.append(t)
16        t.start()
17
18    for t in threads:
19        t.join()
20
21    return results

asyncio 版本

 1import asyncio
 2
 3async def fetch_data(url):
 4    await asyncio.sleep(1)  # 非阻塞等待
 5    return f"Data from {url}"
 6
 7async def main():
 8    urls = ["url1", "url2", "url3"]
 9
10    # 使用 gather 並發執行
11    results = await asyncio.gather(*[fetch_data(url) for url in urls])
12
13    return results
14
15# 執行
16asyncio.run(main())

轉換要點

原本轉換後
defasync def
time.sleep()await asyncio.sleep()
threading.Thread + joinasyncio.gather()

模式 2:ThreadPoolExecutor 轉換

threading 版本

1from concurrent.futures import ThreadPoolExecutor
2
3def process_file(filepath):
4    with open(filepath) as f:
5        return len(f.read())
6
7with ThreadPoolExecutor(max_workers=4) as executor:
8    results = list(executor.map(process_file, file_paths))

asyncio 版本(使用 aiofiles):

 1import asyncio
 2import aiofiles
 3
 4async def process_file(filepath):
 5    async with aiofiles.open(filepath) as f:
 6        content = await f.read()
 7        return len(content)
 8
 9async def main():
10    tasks = [process_file(fp) for fp in file_paths]
11    results = await asyncio.gather(*tasks)
12    return results
13
14asyncio.run(main())

模式 3:保留同步程式碼(混合模式)

有時候你無法(或不想)將所有程式碼都轉為異步。asyncio 提供了 run_in_executor 來處理這種情況:

 1import asyncio
 2from concurrent.futures import ThreadPoolExecutor
 3
 4# 保持原有的同步函式
 5def blocking_operation(data):
 6    # 這是一個阻塞的第三方函式庫呼叫
 7    import time
 8    time.sleep(1)
 9    return f"Processed: {data}"
10
11async def main():
12    loop = asyncio.get_event_loop()
13
14    # 在執行緒池中執行同步函式
15    with ThreadPoolExecutor() as pool:
16        result = await loop.run_in_executor(
17            pool,
18            blocking_operation,
19            "my_data"
20        )
21
22    return result
23
24asyncio.run(main())

這種模式讓你可以漸進式地將程式碼遷移到 asyncio。

何時選擇哪種方案?

選擇 threading

  • 需要與不支援 asyncio 的函式庫整合
  • 需要共享記憶體且修改頻繁
  • 並發數量較少(< 100)
  • 團隊對 threading 更熟悉

選擇 asyncio

  • 需要處理大量並發連線(Web 伺服器、聊天室)
  • 主要是 I/O 操作(網路、檔案)
  • 使用現代 async 函式庫(aiohttp、httpx、asyncpg)
  • 需要高效能的單機並發

選擇 multiprocessing

  • CPU 密集任務(資料處理、科學計算)
  • 需要真正的並行計算
  • 各任務相對獨立,不需要頻繁通訊

決策流程圖

 1任務類型是什麼?
 2 3    ├─ CPU 密集 ────────────────→ multiprocessing
 4 5    └─ I/O 密集
 6 7        ├─ 並發數 > 100 ─────────→ asyncio
 8 9        ├─ 需要共享記憶體 ────────→ threading
1011        └─ 第三方函式庫支援 async?
1213            ├─ 是 ───────────────→ asyncio
1415            └─ 否 ───────────────→ threading 或 asyncio + run_in_executor

常見轉換陷阱

陷阱 1:忘記 await

1# 錯誤:忘記 await
2async def main():
3    result = fetch_data("url")  # 這只會建立協程物件,不會執行
4    print(result)  # <coroutine object fetch_data at 0x...>
5
6# 正確
7async def main():
8    result = await fetch_data("url")
9    print(result)

陷阱 2:在異步函式中使用阻塞呼叫

1# 錯誤:使用阻塞的 time.sleep
2async def bad_sleep():
3    time.sleep(1)  # 這會阻塞整個事件迴圈!
4
5# 正確:使用非阻塞的 asyncio.sleep
6async def good_sleep():
7    await asyncio.sleep(1)  # 這會讓出控制權

陷阱 3:在同步函式中呼叫異步函式

1# 錯誤:在普通函式中直接呼叫
2def sync_function():
3    result = await fetch_data("url")  # SyntaxError!
4
5# 正確:使用 asyncio.run
6def sync_function():
7    result = asyncio.run(fetch_data("url"))

實戰練習

練習 1:轉換簡單的多執行緒下載器

將以下 threading 程式碼轉換為 asyncio 版本:

 1import threading
 2import time
 3
 4def download(url):
 5    print(f"Downloading {url}...")
 6    time.sleep(2)  # 模擬下載
 7    print(f"Finished {url}")
 8    return f"Content of {url}"
 9
10def main():
11    urls = ["url1", "url2", "url3", "url4"]
12    threads = []
13
14    for url in urls:
15        t = threading.Thread(target=download, args=(url,))
16        threads.append(t)
17        t.start()
18
19    for t in threads:
20        t.join()
21
22if __name__ == "__main__":
23    main()

練習 2:使用 run_in_executor 整合同步函式庫

假設你有一個只支援同步的第三方 API 客戶端,寫一個異步包裝器。

下一步

理解了 threading 和 asyncio 的區別後,你可以開始深入學習 asyncio 的核心概念:


上一章:入門系列 3.7 並行處理 下一章:1.1 基礎概念與事件迴圈