Python 提供了多種並行處理的方式。本章介紹三個核心模組,幫助你根據任務特性選擇合適的方案。

為什麼需要並行處理?

在實際開發中,我們常遇到需要同時處理多個任務的情況:

1# 情境 1:批次下載多個檔案(I/O 密集)
2urls = ["https://example.com/file1", "https://example.com/file2", ...]
3# 一個一個下載太慢了!
4
5# 情境 2:處理大量資料(CPU 密集)
6data_chunks = [chunk1, chunk2, chunk3, ...]
7# 能不能同時處理多個資料區塊?

並行處理可以顯著提升這類任務的效率。

I/O 密集 vs CPU 密集

在選擇並行方案之前,首先要判斷你的任務類型:

I/O 密集任務

程式大部分時間在「等待」外部資源:

  • 網路請求(HTTP、API 呼叫)
  • 檔案讀寫
  • 資料庫查詢
1# I/O 密集的特徵:大部分時間在等待
2def fetch_data():
3    response = requests.get(url)  # 等待網路回應
4    return response.json()

CPU 密集任務

程式大部分時間在「計算」:

  • 數學運算
  • 資料處理與轉換
  • 圖像處理
1# CPU 密集的特徵:大部分時間在計算
2def compute_heavy(n):
3    return sum(i * i for i in range(n))  # 純計算

GIL(全域直譯器鎖)

在深入各模組之前,需要先了解 Python 的一個重要機制。

什麼是 GIL?

GIL(Global Interpreter Lock)是 CPython 直譯器的一個機制,它確保同一時間只有一個執行緒能執行 Python bytecode。

 1┌─────────────────────────────────────────┐
 2│              Python 直譯器                │
 3│  ┌─────┐  ┌─────┐  ┌─────┐              │
 4│  │執行緒1│  │執行緒2│  │執行緒3│              │
 5│  └──┬──┘  └──┬──┘  └──┬──┘              │
 6│     │        │        │                 │
 7│     └────────┼────────┘                 │
 8│              ▼                          │
 9│         ┌───────┐                       │
10│         │  GIL  │ ← 同時只有一個能執行      │
11│         └───────┘                       │
12└─────────────────────────────────────────┘

GIL 的影響

任務類型GIL 影響原因
I/O 密集影響小等待 I/O 時會釋放 GIL
CPU 密集影響大多執行緒無法真正並行計算

這就是為什麼:

  • I/O 密集:使用 threading 即可
  • CPU 密集:需要使用 multiprocessing 繞過 GIL

注意:Python 3.13+ 推出了 Free-threading(無 GIL)版本,詳見 3.8 Free-Threading

threading 模組

threading 模組提供執行緒級別的並行,適合 I/O 密集任務。

基本用法

 1import threading
 2import time
 3
 4def worker(name, delay):
 5    print(f"{name} 開始工作")
 6    time.sleep(delay)  # 模擬 I/O 等待
 7    print(f"{name} 完成工作")
 8
 9# 建立執行緒
10t1 = threading.Thread(target=worker, args=("Worker-1", 2))
11t2 = threading.Thread(target=worker, args=("Worker-2", 1))
12
13# 啟動執行緒
14t1.start()
15t2.start()
16
17# 等待執行緒完成
18t1.join()
19t2.join()
20
21print("所有工作完成")

執行緒安全與 Lock

當多個執行緒存取共享資源時,需要使用鎖來避免競爭條件:

 1import threading
 2
 3counter = 0
 4lock = threading.Lock()
 5
 6def increment():
 7    global counter
 8    for _ in range(100000):
 9        with lock:  # 使用 context manager 自動獲取和釋放鎖
10            counter += 1
11
12# 建立多個執行緒
13threads = [threading.Thread(target=increment) for _ in range(5)]
14
15for t in threads:
16    t.start()
17for t in threads:
18    t.join()
19
20print(f"Counter: {counter}")  # 應該是 500000

何時使用 threading

  • 網路請求(HTTP、API)
  • 檔案讀寫
  • 資料庫操作
  • 任何需要等待外部資源的任務

multiprocessing 模組

multiprocessing 模組使用多個進程來實現真正的並行,繞過 GIL 限制。

基本用法

 1from multiprocessing import Process
 2
 3def cpu_intensive(n):
 4    """CPU 密集計算"""
 5    result = sum(i * i for i in range(n))
 6    print(f"計算完成: {result}")
 7
 8if __name__ == "__main__":  # 在 Windows 上必須使用這個保護
 9    processes = []
10    for i in range(4):
11        p = Process(target=cpu_intensive, args=(10_000_000,))
12        processes.append(p)
13        p.start()
14
15    for p in processes:
16        p.join()
17
18    print("所有計算完成")

進程間通訊

進程之間不共享記憶體,需要使用 Queue 或 Pipe 來通訊:

 1from multiprocessing import Process, Queue
 2
 3def worker(queue, n):
 4    result = sum(i * i for i in range(n))
 5    queue.put(result)  # 將結果放入佇列
 6
 7if __name__ == "__main__":
 8    queue = Queue()
 9    processes = []
10
11    for i in range(4):
12        p = Process(target=worker, args=(queue, 5_000_000))
13        processes.append(p)
14        p.start()
15
16    for p in processes:
17        p.join()
18
19    # 收集結果
20    results = [queue.get() for _ in range(4)]
21    print(f"結果: {results}")

何時使用 multiprocessing

  • CPU 密集計算
  • 資料處理與轉換
  • 需要真正並行執行的任務

concurrent.futures(推薦入門)

concurrent.futures 提供了更高階、更簡潔的 API,統一了執行緒和進程的使用方式。

ThreadPoolExecutor

適合 I/O 密集任務:

 1from concurrent.futures import ThreadPoolExecutor
 2import urllib.request
 3
 4def fetch_url(url):
 5    """下載網頁並返回大小"""
 6    try:
 7        with urllib.request.urlopen(url, timeout=10) as response:
 8            return url, len(response.read())
 9    except Exception as e:
10        return url, f"Error: {e}"
11
12urls = [
13    "https://www.python.org",
14    "https://docs.python.org",
15    "https://pypi.org",
16]
17
18# 使用執行緒池並行下載
19with ThreadPoolExecutor(max_workers=3) as executor:
20    results = list(executor.map(fetch_url, urls))
21
22for url, size in results:
23    print(f"{url}: {size}")

ProcessPoolExecutor

適合 CPU 密集任務:

 1from concurrent.futures import ProcessPoolExecutor, as_completed
 2
 3def compute_heavy(n):
 4    """CPU 密集計算"""
 5    return n, sum(i * i for i in range(n))
 6
 7if __name__ == "__main__":
 8    numbers = [10_000_000, 20_000_000, 15_000_000, 5_000_000]
 9
10    with ProcessPoolExecutor() as executor:
11        # 方法 1:使用 map(保持順序)
12        results = list(executor.map(compute_heavy, numbers))
13
14        # 方法 2:使用 submit + as_completed(先完成先處理)
15        futures = {executor.submit(compute_heavy, n): n for n in numbers}
16        for future in as_completed(futures):
17            n, result = future.result()
18            print(f"n={n}: {result}")

處理異常

 1from concurrent.futures import ThreadPoolExecutor, as_completed
 2
 3def risky_task(n):
 4    if n == 3:
 5        raise ValueError("不喜歡 3!")
 6    return n * 2
 7
 8with ThreadPoolExecutor(max_workers=4) as executor:
 9    futures = {executor.submit(risky_task, i): i for i in range(5)}
10
11    for future in as_completed(futures):
12        n = futures[future]
13        try:
14            result = future.result()
15            print(f"任務 {n} 完成: {result}")
16        except Exception as e:
17            print(f"任務 {n} 失敗: {e}")

選擇指南

任務類型推薦方案原因
I/O 密集ThreadPoolExecutor輕量、共享記憶體、GIL 影響小
CPU 密集ProcessPoolExecutor繞過 GIL、真正並行
需要細控制threading/multiprocessing底層 API、更多控制
Python 3.14+ CPU 密集threading + Free-threading真正的多執行緒並行

決策流程

 1任務類型是什麼?
 2 3    ├─→ I/O 密集(網路、檔案、DB)
 4    │       │
 5    │       └─→ 使用 ThreadPoolExecutor
 6 7    └─→ CPU 密集(計算、處理)
 8 9            ├─→ Python 3.14+ Free-threaded
10            │       │
11            │       └─→ 可以使用 threading
1213            └─→ 傳統 Python
1415                    └─→ 使用 ProcessPoolExecutor

常見陷阱與最佳實踐

1. 設定合理的 worker 數量

1import os
2
3# I/O 密集:可以設定較多的 worker
4io_workers = min(32, os.cpu_count() + 4)
5
6# CPU 密集:不要超過 CPU 核心數
7cpu_workers = os.cpu_count()

2. 避免共享可變狀態

 1# 不好:共享可變狀態
 2results = []
 3
 4def bad_worker(n):
 5    results.append(n * 2)  # 危險!多執行緒存取
 6
 7# 好:返回結果,由主執行緒收集
 8def good_worker(n):
 9    return n * 2
10
11with ThreadPoolExecutor() as executor:
12    results = list(executor.map(good_worker, range(10)))

3. 使用 context manager

1# 好:使用 with 語句自動管理資源
2with ThreadPoolExecutor(max_workers=4) as executor:
3    results = executor.map(task, items)
4
5# 不好:手動管理
6executor = ThreadPoolExecutor(max_workers=4)
7results = executor.map(task, items)
8executor.shutdown(wait=True)  # 容易忘記

4. multiprocessing 的 if __name__ == "__main__" 保護

 1from multiprocessing import Process
 2
 3def worker():
 4    print("Working...")
 5
 6# Windows 上必須使用這個保護,否則會無限遞迴
 7if __name__ == "__main__":
 8    p = Process(target=worker)
 9    p.start()
10    p.join()

思考題

  1. 為什麼 I/O 密集任務使用 threading 就夠了,而 CPU 密集任務需要 multiprocessing
  2. ThreadPoolExecutor 和手動建立 Thread 有什麼優缺點?
  3. 在什麼情況下,並行處理反而會比序列處理更慢?

實作練習

  1. 寫一個函式,使用 ThreadPoolExecutor 同時檢查多個網址是否可以連線
  2. 使用 ProcessPoolExecutor 計算一組大數字的質因數分解
  3. 實作一個進度顯示器,顯示多個任務的完成進度

延伸閱讀(進階系列)


上一章:argparse - CLI 介面 下一章:效能迷思與優化策略