1.2 協程與 Task 管理
1.2 協程與 Task 管理
本章深入探討協程的執行機制,以及如何使用 Task 管理並發任務。
先備知識
本章目標
學完本章後,你將能夠:
- 理解協程、Task、Future 的區別與關係
- 使用
create_task()建立並發任務 - 使用
gather()、wait()和TaskGroup管理多個任務 - 正確處理任務取消與超時
【原理層】可等待物件
協程、Task、Future
在 asyncio 中,有三種「可等待物件」(Awaitable):
1import asyncio
2
3# 1. 協程(Coroutine)
4async def my_coroutine():
5 return 42
6
7# 2. Task - 協程的包裝,可追蹤狀態
8task = asyncio.create_task(my_coroutine())
9
10# 3. Future - 底層的「未來結果」容器
11future = asyncio.Future()它們的關係:
1 ┌─────────────────────┐
2 │ Awaitable │
3 │ (可等待物件) │
4 └─────────────────────┘
5 ▲
6 ┌─────────┼─────────┐
7 │ │ │
8 Coroutine Future Task
9 (協程) (未來) (任務)
10 ▲
11 │
12 Task
13 (Task 繼承自 Future)await 的語義
當你 await 一個物件時:
- 如果結果已就緒,立即返回
- 如果結果未就緒,暫停當前協程,讓出控制權
- 當結果就緒時,恢復執行
1async def demo():
2 # await 協程
3 result1 = await some_coroutine()
4
5 # await Task
6 task = asyncio.create_task(some_coroutine())
7 result2 = await task
8
9 # await Future
10 future = asyncio.Future()
11 # ... 某處設定 future.set_result(value)
12 result3 = await future【設計層】Task 管理
create_task() 的時機
create_task() 將協程包裝成 Task 並排程執行:
1async def worker(name, delay):
2 print(f"{name} 開始")
3 await asyncio.sleep(delay)
4 print(f"{name} 完成")
5 return f"{name} 結果"
6
7async def main():
8 # 方法 1:依序執行(不並發)
9 result1 = await worker("A", 1)
10 result2 = await worker("B", 1)
11 # 總時間:2 秒
12
13 # 方法 2:先建立 Task,再 await(並發)
14 task1 = asyncio.create_task(worker("C", 1))
15 task2 = asyncio.create_task(worker("D", 1))
16 result3 = await task1
17 result4 = await task2
18 # 總時間:1 秒
19
20asyncio.run(main())重要:create_task() 會立即排程任務,即使你還沒 await 它。
gather() vs wait() vs TaskGroup
三種管理多個任務的方式:
1async def main():
2 tasks = [worker(f"Task-{i}", 1) for i in range(3)]
3
4 # gather():等待所有完成,返回結果列表
5 results = await asyncio.gather(*tasks)
6
7 # wait():更細緻的控制
8 done, pending = await asyncio.wait(
9 tasks,
10 return_when=asyncio.FIRST_COMPLETED # 或 ALL_COMPLETED
11 )
12
13 # TaskGroup(Python 3.11+):結構化並發
14 async with asyncio.TaskGroup() as tg:
15 task1 = tg.create_task(worker("A", 1))
16 task2 = tg.create_task(worker("B", 1))
17 # 離開 context 時,所有任務都已完成| 方法 | 特點 | 使用場景 |
|---|---|---|
gather() | 簡單,返回結果列表 | 等待所有任務完成 |
wait() | 可選擇等待策略 | 需要處理先完成的任務 |
TaskGroup | 結構化,異常處理更好 | Python 3.11+,推薦使用 |
【實作層】任務生命週期
任務狀態
1async def demo():
2 task = asyncio.create_task(asyncio.sleep(1))
3
4 print(f"已完成:{task.done()}") # False
5 print(f"已取消:{task.cancelled()}") # False
6
7 await task
8
9 print(f"已完成:{task.done()}") # True
10 print(f"結果:{task.result()}") # None(sleep 返回 None)取消任務
1async def long_running():
2 try:
3 await asyncio.sleep(10)
4 except asyncio.CancelledError:
5 print("任務被取消")
6 raise # 重要:要重新拋出
7
8async def main():
9 task = asyncio.create_task(long_running())
10 await asyncio.sleep(1) # 讓任務開始
11 task.cancel() # 請求取消
12
13 try:
14 await task
15 except asyncio.CancelledError:
16 print("確認已取消")超時控制
1async def main():
2 # 方法 1:wait_for
3 try:
4 result = await asyncio.wait_for(long_running(), timeout=2.0)
5 except asyncio.TimeoutError:
6 print("超時")
7
8 # 方法 2:timeout(Python 3.11+)
9 try:
10 async with asyncio.timeout(2.0):
11 result = await long_running()
12 except asyncio.TimeoutError:
13 print("超時")【常見錯誤】
1. 協程從未執行
1async def main():
2 # 錯誤:只建立了協程物件,沒有執行
3 worker("A", 1) # RuntimeWarning!
4
5 # 正確
6 await worker("A", 1)2. Task 被垃圾回收
1async def main():
2 # 錯誤:task 沒有被引用,可能被 GC
3 asyncio.create_task(worker("A", 1))
4
5 # 正確:保持引用
6 task = asyncio.create_task(worker("A", 1))
7 await task3. 異常被吞掉
1async def failing_task():
2 raise ValueError("出錯了")
3
4async def main():
5 task = asyncio.create_task(failing_task())
6 await asyncio.sleep(1) # 異常不會在這裡拋出
7 # 必須 await task 才會看到異常思考題
await task和await asyncio.gather(task)有什麼區別?- 為什麼
CancelledError要重新拋出? TaskGroup相比gather()有什麼優勢?
實作練習
- 實作一個函式,並發執行多個任務,但限制同時執行的數量(提示:使用
Semaphore) - 實作一個「競速」函式,返回最先完成的任務結果
- 實作一個任務管理器,可以動態新增和取消任務