本章深入探討協程的執行機制,以及如何使用 Task 管理並發任務。

先備知識

本章目標

學完本章後,你將能夠:

  1. 理解協程、Task、Future 的區別與關係
  2. 使用 create_task() 建立並發任務
  3. 使用 gather()wait()TaskGroup 管理多個任務
  4. 正確處理任務取消與超時

【原理層】可等待物件

協程、Task、Future

在 asyncio 中,有三種「可等待物件」(Awaitable):

 1import asyncio
 2
 3# 1. 協程(Coroutine)
 4async def my_coroutine():
 5    return 42
 6
 7# 2. Task - 協程的包裝,可追蹤狀態
 8task = asyncio.create_task(my_coroutine())
 9
10# 3. Future - 底層的「未來結果」容器
11future = asyncio.Future()

它們的關係:

 1         ┌─────────────────────┐
 2         │     Awaitable       │
 3         │   (可等待物件)      │
 4         └─────────────────────┘
 5 6         ┌─────────┼─────────┐
 7         │         │         │
 8    Coroutine   Future     Task
 9     (協程)   (未來)  (任務)
101112                 Task
13           (Task 繼承自 Future)

await 的語義

當你 await 一個物件時:

  1. 如果結果已就緒,立即返回
  2. 如果結果未就緒,暫停當前協程,讓出控制權
  3. 當結果就緒時,恢復執行
 1async def demo():
 2    # await 協程
 3    result1 = await some_coroutine()
 4
 5    # await Task
 6    task = asyncio.create_task(some_coroutine())
 7    result2 = await task
 8
 9    # await Future
10    future = asyncio.Future()
11    # ... 某處設定 future.set_result(value)
12    result3 = await future

【設計層】Task 管理

create_task() 的時機

create_task() 將協程包裝成 Task 並排程執行:

 1async def worker(name, delay):
 2    print(f"{name} 開始")
 3    await asyncio.sleep(delay)
 4    print(f"{name} 完成")
 5    return f"{name} 結果"
 6
 7async def main():
 8    # 方法 1:依序執行(不並發)
 9    result1 = await worker("A", 1)
10    result2 = await worker("B", 1)
11    # 總時間:2 秒
12
13    # 方法 2:先建立 Task,再 await(並發)
14    task1 = asyncio.create_task(worker("C", 1))
15    task2 = asyncio.create_task(worker("D", 1))
16    result3 = await task1
17    result4 = await task2
18    # 總時間:1 秒
19
20asyncio.run(main())

重要create_task() 會立即排程任務,即使你還沒 await 它。

gather() vs wait() vs TaskGroup

三種管理多個任務的方式:

 1async def main():
 2    tasks = [worker(f"Task-{i}", 1) for i in range(3)]
 3
 4    # gather():等待所有完成,返回結果列表
 5    results = await asyncio.gather(*tasks)
 6
 7    # wait():更細緻的控制
 8    done, pending = await asyncio.wait(
 9        tasks,
10        return_when=asyncio.FIRST_COMPLETED  # 或 ALL_COMPLETED
11    )
12
13    # TaskGroup(Python 3.11+):結構化並發
14    async with asyncio.TaskGroup() as tg:
15        task1 = tg.create_task(worker("A", 1))
16        task2 = tg.create_task(worker("B", 1))
17    # 離開 context 時,所有任務都已完成
方法特點使用場景
gather()簡單,返回結果列表等待所有任務完成
wait()可選擇等待策略需要處理先完成的任務
TaskGroup結構化,異常處理更好Python 3.11+,推薦使用

【實作層】任務生命週期

任務狀態

 1async def demo():
 2    task = asyncio.create_task(asyncio.sleep(1))
 3
 4    print(f"已完成:{task.done()}")      # False
 5    print(f"已取消:{task.cancelled()}")  # False
 6
 7    await task
 8
 9    print(f"已完成:{task.done()}")      # True
10    print(f"結果:{task.result()}")      # None(sleep 返回 None)

取消任務

 1async def long_running():
 2    try:
 3        await asyncio.sleep(10)
 4    except asyncio.CancelledError:
 5        print("任務被取消")
 6        raise  # 重要:要重新拋出
 7
 8async def main():
 9    task = asyncio.create_task(long_running())
10    await asyncio.sleep(1)  # 讓任務開始
11    task.cancel()           # 請求取消
12
13    try:
14        await task
15    except asyncio.CancelledError:
16        print("確認已取消")

超時控制

 1async def main():
 2    # 方法 1:wait_for
 3    try:
 4        result = await asyncio.wait_for(long_running(), timeout=2.0)
 5    except asyncio.TimeoutError:
 6        print("超時")
 7
 8    # 方法 2:timeout(Python 3.11+)
 9    try:
10        async with asyncio.timeout(2.0):
11            result = await long_running()
12    except asyncio.TimeoutError:
13        print("超時")

【常見錯誤】

1. 協程從未執行

1async def main():
2    # 錯誤:只建立了協程物件,沒有執行
3    worker("A", 1)  # RuntimeWarning!
4
5    # 正確
6    await worker("A", 1)

2. Task 被垃圾回收

1async def main():
2    # 錯誤:task 沒有被引用,可能被 GC
3    asyncio.create_task(worker("A", 1))
4
5    # 正確:保持引用
6    task = asyncio.create_task(worker("A", 1))
7    await task

3. 異常被吞掉

1async def failing_task():
2    raise ValueError("出錯了")
3
4async def main():
5    task = asyncio.create_task(failing_task())
6    await asyncio.sleep(1)  # 異常不會在這裡拋出
7    # 必須 await task 才會看到異常

思考題

  1. await taskawait asyncio.gather(task) 有什麼區別?
  2. 為什麼 CancelledError 要重新拋出?
  3. TaskGroup 相比 gather() 有什麼優勢?

實作練習

  1. 實作一個函式,並發執行多個任務,但限制同時執行的數量(提示:使用 Semaphore
  2. 實作一個「競速」函式,返回最先完成的任務結果
  3. 實作一個任務管理器,可以動態新增和取消任務

延伸閱讀


上一章:基礎概念與事件迴圈 下一章:設計模式與最佳實踐