3.7 並行處理 - threading、multiprocessing、concurrent.futures
3.7 並行處理 - threading、multiprocessing、concurrent.futures
4. multiprocessing 的
Python 提供了多種並行處理的方式。本章介紹三個核心模組,幫助你根據任務特性選擇合適的方案。
為什麼需要並行處理?
在實際開發中,我們常遇到需要同時處理多個任務的情況:
1# 情境 1:批次下載多個檔案(I/O 密集)
2urls = ["https://example.com/file1", "https://example.com/file2", ...]
3# 一個一個下載太慢了!
4
5# 情境 2:處理大量資料(CPU 密集)
6data_chunks = [chunk1, chunk2, chunk3, ...]
7# 能不能同時處理多個資料區塊?並行處理可以顯著提升這類任務的效率。
I/O 密集 vs CPU 密集
在選擇並行方案之前,首先要判斷你的任務類型:
I/O 密集任務
程式大部分時間在「等待」外部資源:
- 網路請求(HTTP、API 呼叫)
- 檔案讀寫
- 資料庫查詢
1# I/O 密集的特徵:大部分時間在等待
2def fetch_data():
3 response = requests.get(url) # 等待網路回應
4 return response.json()CPU 密集任務
程式大部分時間在「計算」:
- 數學運算
- 資料處理與轉換
- 圖像處理
1# CPU 密集的特徵:大部分時間在計算
2def compute_heavy(n):
3 return sum(i * i for i in range(n)) # 純計算GIL(全域直譯器鎖)
在深入各模組之前,需要先了解 Python 的一個重要機制。
什麼是 GIL?
GIL(Global Interpreter Lock)是 CPython 直譯器的一個機制,它確保同一時間只有一個執行緒能執行 Python bytecode。
1┌─────────────────────────────────────────┐
2│ Python 直譯器 │
3│ ┌─────┐ ┌─────┐ ┌─────┐ │
4│ │執行緒1│ │執行緒2│ │執行緒3│ │
5│ └──┬──┘ └──┬──┘ └──┬──┘ │
6│ │ │ │ │
7│ └────────┼────────┘ │
8│ ▼ │
9│ ┌───────┐ │
10│ │ GIL │ ← 同時只有一個能執行 │
11│ └───────┘ │
12└─────────────────────────────────────────┘GIL 的影響
| 任務類型 | GIL 影響 | 原因 |
|---|---|---|
| I/O 密集 | 影響小 | 等待 I/O 時會釋放 GIL |
| CPU 密集 | 影響大 | 多執行緒無法真正並行計算 |
這就是為什麼:
- I/O 密集:使用
threading即可 - CPU 密集:需要使用
multiprocessing繞過 GIL
注意:Python 3.13+ 推出了 Free-threading(無 GIL)版本,詳見 3.8 Free-Threading
threading 模組
threading 模組提供執行緒級別的並行,適合 I/O 密集任務。
基本用法
1import threading
2import time
3
4def worker(name, delay):
5 print(f"{name} 開始工作")
6 time.sleep(delay) # 模擬 I/O 等待
7 print(f"{name} 完成工作")
8
9# 建立執行緒
10t1 = threading.Thread(target=worker, args=("Worker-1", 2))
11t2 = threading.Thread(target=worker, args=("Worker-2", 1))
12
13# 啟動執行緒
14t1.start()
15t2.start()
16
17# 等待執行緒完成
18t1.join()
19t2.join()
20
21print("所有工作完成")執行緒安全與 Lock
當多個執行緒存取共享資源時,需要使用鎖來避免競爭條件:
1import threading
2
3counter = 0
4lock = threading.Lock()
5
6def increment():
7 global counter
8 for _ in range(100000):
9 with lock: # 使用 context manager 自動獲取和釋放鎖
10 counter += 1
11
12# 建立多個執行緒
13threads = [threading.Thread(target=increment) for _ in range(5)]
14
15for t in threads:
16 t.start()
17for t in threads:
18 t.join()
19
20print(f"Counter: {counter}") # 應該是 500000何時使用 threading
- 網路請求(HTTP、API)
- 檔案讀寫
- 資料庫操作
- 任何需要等待外部資源的任務
multiprocessing 模組
multiprocessing 模組使用多個進程來實現真正的並行,繞過 GIL 限制。
基本用法
1from multiprocessing import Process
2
3def cpu_intensive(n):
4 """CPU 密集計算"""
5 result = sum(i * i for i in range(n))
6 print(f"計算完成: {result}")
7
8if __name__ == "__main__": # 在 Windows 上必須使用這個保護
9 processes = []
10 for i in range(4):
11 p = Process(target=cpu_intensive, args=(10_000_000,))
12 processes.append(p)
13 p.start()
14
15 for p in processes:
16 p.join()
17
18 print("所有計算完成")進程間通訊
進程之間不共享記憶體,需要使用 Queue 或 Pipe 來通訊:
1from multiprocessing import Process, Queue
2
3def worker(queue, n):
4 result = sum(i * i for i in range(n))
5 queue.put(result) # 將結果放入佇列
6
7if __name__ == "__main__":
8 queue = Queue()
9 processes = []
10
11 for i in range(4):
12 p = Process(target=worker, args=(queue, 5_000_000))
13 processes.append(p)
14 p.start()
15
16 for p in processes:
17 p.join()
18
19 # 收集結果
20 results = [queue.get() for _ in range(4)]
21 print(f"結果: {results}")何時使用 multiprocessing
- CPU 密集計算
- 資料處理與轉換
- 需要真正並行執行的任務
concurrent.futures(推薦入門)
concurrent.futures 提供了更高階、更簡潔的 API,統一了執行緒和進程的使用方式。
ThreadPoolExecutor
適合 I/O 密集任務:
1from concurrent.futures import ThreadPoolExecutor
2import urllib.request
3
4def fetch_url(url):
5 """下載網頁並返回大小"""
6 try:
7 with urllib.request.urlopen(url, timeout=10) as response:
8 return url, len(response.read())
9 except Exception as e:
10 return url, f"Error: {e}"
11
12urls = [
13 "https://www.python.org",
14 "https://docs.python.org",
15 "https://pypi.org",
16]
17
18# 使用執行緒池並行下載
19with ThreadPoolExecutor(max_workers=3) as executor:
20 results = list(executor.map(fetch_url, urls))
21
22for url, size in results:
23 print(f"{url}: {size}")ProcessPoolExecutor
適合 CPU 密集任務:
1from concurrent.futures import ProcessPoolExecutor, as_completed
2
3def compute_heavy(n):
4 """CPU 密集計算"""
5 return n, sum(i * i for i in range(n))
6
7if __name__ == "__main__":
8 numbers = [10_000_000, 20_000_000, 15_000_000, 5_000_000]
9
10 with ProcessPoolExecutor() as executor:
11 # 方法 1:使用 map(保持順序)
12 results = list(executor.map(compute_heavy, numbers))
13
14 # 方法 2:使用 submit + as_completed(先完成先處理)
15 futures = {executor.submit(compute_heavy, n): n for n in numbers}
16 for future in as_completed(futures):
17 n, result = future.result()
18 print(f"n={n}: {result}")處理異常
1from concurrent.futures import ThreadPoolExecutor, as_completed
2
3def risky_task(n):
4 if n == 3:
5 raise ValueError("不喜歡 3!")
6 return n * 2
7
8with ThreadPoolExecutor(max_workers=4) as executor:
9 futures = {executor.submit(risky_task, i): i for i in range(5)}
10
11 for future in as_completed(futures):
12 n = futures[future]
13 try:
14 result = future.result()
15 print(f"任務 {n} 完成: {result}")
16 except Exception as e:
17 print(f"任務 {n} 失敗: {e}")選擇指南
| 任務類型 | 推薦方案 | 原因 |
|---|---|---|
| I/O 密集 | ThreadPoolExecutor | 輕量、共享記憶體、GIL 影響小 |
| CPU 密集 | ProcessPoolExecutor | 繞過 GIL、真正並行 |
| 需要細控制 | threading/multiprocessing | 底層 API、更多控制 |
| Python 3.14+ CPU 密集 | threading + Free-threading | 真正的多執行緒並行 |
決策流程
1任務類型是什麼?
2 │
3 ├─→ I/O 密集(網路、檔案、DB)
4 │ │
5 │ └─→ 使用 ThreadPoolExecutor
6 │
7 └─→ CPU 密集(計算、處理)
8 │
9 ├─→ Python 3.14+ Free-threaded
10 │ │
11 │ └─→ 可以使用 threading
12 │
13 └─→ 傳統 Python
14 │
15 └─→ 使用 ProcessPoolExecutor常見陷阱與最佳實踐
1. 設定合理的 worker 數量
1import os
2
3# I/O 密集:可以設定較多的 worker
4io_workers = min(32, os.cpu_count() + 4)
5
6# CPU 密集:不要超過 CPU 核心數
7cpu_workers = os.cpu_count()2. 避免共享可變狀態
1# 不好:共享可變狀態
2results = []
3
4def bad_worker(n):
5 results.append(n * 2) # 危險!多執行緒存取
6
7# 好:返回結果,由主執行緒收集
8def good_worker(n):
9 return n * 2
10
11with ThreadPoolExecutor() as executor:
12 results = list(executor.map(good_worker, range(10)))3. 使用 context manager
1# 好:使用 with 語句自動管理資源
2with ThreadPoolExecutor(max_workers=4) as executor:
3 results = executor.map(task, items)
4
5# 不好:手動管理
6executor = ThreadPoolExecutor(max_workers=4)
7results = executor.map(task, items)
8executor.shutdown(wait=True) # 容易忘記4. multiprocessing 的 if __name__ == "__main__" 保護
1from multiprocessing import Process
2
3def worker():
4 print("Working...")
5
6# Windows 上必須使用這個保護,否則會無限遞迴
7if __name__ == "__main__":
8 p = Process(target=worker)
9 p.start()
10 p.join()思考題
- 為什麼 I/O 密集任務使用
threading就夠了,而 CPU 密集任務需要multiprocessing? ThreadPoolExecutor和手動建立Thread有什麼優缺點?- 在什麼情況下,並行處理反而會比序列處理更慢?
實作練習
- 寫一個函式,使用
ThreadPoolExecutor同時檢查多個網址是否可以連線 - 使用
ProcessPoolExecutor計算一組大數字的質因數分解 - 實作一個進度顯示器,顯示多個任務的完成進度
延伸閱讀(進階系列)
- 實戰效能優化:並行處理 - 真實案例的並行化改造
- asyncio 非同步程式設計 - 學習協程與事件迴圈
- GIL 與執行緒模型 - 深入理解 GIL 的設計與實現
- Free-Threading - Python 3.13+ 無 GIL 多執行緒
上一章:argparse - CLI 介面 下一章:效能迷思與優化策略