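This chapter uses two complete case studies to show how the design patterns from the previous four chapters combine in practice. Each case study explains how the patterns cooperate.

Prerequisites

  • All sections 3.5.1-3.5.4 of this module

Case Study 1: A Mini ORM Framework

Build a simplified ORM (Object-Relational Mapping) framework that shows how the patterns fit together.

Design Overview

┌─────────────────────────────────────────────────────────┐
│                    MiniORM Framework                    │
├─────────────────────────────────────────────────────────┤
│  Generics    │ Repository[T] - type-safe data access    │
│  Exceptions  │ Hierarchy - precise error handling       │
│  Context     │ Transaction - transaction management     │
│  Plugins     │ FieldType - extensible field types       │
└─────────────────────────────────────────────────────────┘

Exception Hierarchy Design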

# miniorm/exceptions.py

class ORMError(Exception):
    """Base exception for the ORM framework."""

    def __init__(self, message: str, code: str | None = None) -> None:
        super().__init__(message)
        self.message = message
        self.code = code  # optional machine-readable error code

# Note: this name shadows the built-in ConnectionError in any module that
# imports it; import it under an alias if both are needed.
class ConnectionError(ORMError):
    """Database connection error."""

class QueryError(ORMError):
    """Query execution error."""

class EntityNotFoundError(ORMError):
    """The requested entity does not exist."""

    def __init__(self, model: str, pk: int | str) -> None:
        super().__init__(f"{model} with pk={pk} not found", code="NOT_FOUND")
        self.model = model
        self.pk = pk

class ValidationError(ORMError):
    """Data validation error."""

    def __init__(self, field: str, message: str) -> None:
        super().__init__(f"Validation failed for '{field}': {message}", code="VALIDATION")
        self.field = field

class TransactionError(ORMError):
    """Transaction error."""

Plugin System: Extensible Field Types

# miniorm/fields.py

import json
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, ClassVar

from miniorm.exceptions import ValidationError

class FieldType(ABC):
    """Base class for field-type plugins."""
    _registry: ClassVar[dict[str, type["FieldType"]]] = {}

    # Subclasses must define these
    type_name: ClassVar[str]
    python_type: ClassVar[type]

    def __init_subclass__(cls, **kwargs) -> None:
        super().__init_subclass__(**kwargs)
        # Register only classes that declare their own type_name, so a
        # subclass that merely inherits one does not replace its parent.
        if "type_name" in cls.__dict__:
            FieldType._registry[cls.type_name] = cls

    @abstractmethod
    def to_db(self, value: Any) -> Any:
        """Convert a Python value to a database value."""
        ...

    @abstractmethod
    def from_db(self, value: Any) -> Any:
        """Convert a database value to a Python value."""
        ...

    @abstractmethod
    def validate(self, value: Any) -> None:
        """Validate the value; raise ValidationError on failure."""
        ...

    @classmethod
    def get_type(cls, name: str) -> type["FieldType"] | None:
        return cls._registry.get(name)

# Built-in field types
class IntegerField(FieldType):
    type_name = "integer"
    python_type = int

    def to_db(self, value: Any) -> int:
        return int(value)

    def from_db(self, value: Any) -> int:
        # A database NULL is mapped to 0 in this simplified version
        return int(value) if value is not None else 0

    def validate(self, value: Any) -> None:
        if value is not None and not isinstance(value, int):
            raise ValidationError("value", f"Expected int, got {type(value).__name__}")

class StringField(FieldType):
    type_name = "string"
    python_type = str

    def __init__(self, max_length: int = 255) -> None:
        self.max_length = max_length

    def to_db(self, value: Any) -> str:
        return str(value)

    def from_db(self, value: Any) -> str:
        # A database NULL is mapped to the empty string
        return str(value) if value is not None else ""

    def validate(self, value: Any) -> None:
        if value is not None:
            if not isinstance(value, str):
                raise ValidationError("value", f"Expected str, got {type(value).__name__}")
            if len(value) > self.max_length:
                raise ValidationError("value", f"Max length is {self.max_length}")

class DateTimeField(FieldType):
    type_name = "datetime"
    python_type = datetime

    def to_db(self, value: Any) -> str:
        # Stored as an ISO 8601 string
        if isinstance(value, datetime):
            return value.isoformat()
        return str(value)

    def from_db(self, value: Any) -> datetime:
        if isinstance(value, datetime):
            return value
        return datetime.fromisoformat(str(value))

    def validate(self, value: Any) -> None:
        if value is not None and not isinstance(value, datetime):
            raise ValidationError("value", f"Expected datetime, got {type(value).__name__}")

# Users can add new field types simply by subclassing
class JSONField(FieldType):
    """Example of a custom field type."""
    type_name = "json"
    python_type = dict

    def to_db(self, value: Any) -> str:
        return json.dumps(value)

    def from_db(self, value: Any) -> dict | list:
        # Already-decoded values pass through; validate() accepts both kinds
        if isinstance(value, (dict, list)):
            return value
        return json.loads(str(value))

    def validate(self, value: Any) -> None:
        if value is not None and not isinstance(value, (dict, list)):
            raise ValidationError("value", "Expected dict or list")

Generic Repository

# miniorm/repository.py

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, ClassVar, Generic, Protocol, TypeVar

from miniorm.exceptions import EntityNotFoundError

if TYPE_CHECKING:
    from miniorm.transaction import Connection

class HasId(Protocol):
    """Protocol for objects that have an id attribute."""
    id: int

T = TypeVar("T", bound=HasId)

# Inheriting from ABC is what makes @abstractmethod take effect
class Repository(ABC, Generic[T]):
    """Generic repository base class."""

    model_class: ClassVar[type]

    def __init__(self, connection: "Connection") -> None:
        self._connection = connection

    @abstractmethod
    def get(self, pk: int) -> T | None:
        """Fetch an entity by primary key."""
        ...

    @abstractmethod
    def save(self, entity: T) -> T:
        """Save an entity."""
        ...

    @abstractmethod
    def delete(self, pk: int) -> bool:
        """Delete an entity."""
        ...

    @abstractmethod
    def find_all(self) -> list[T]:
        """Fetch all entities."""
        ...

    def get_or_raise(self, pk: int) -> T:
        """Fetch an entity, raising if it does not exist."""
        entity = self.get(pk)
        if entity is None:
            raise EntityNotFoundError(self.model_class.__name__, pk)
        return entity

class InMemoryRepository(Repository[T]):
    """In-memory implementation of Repository."""

    def __init__(self, connection: "Connection") -> None:
        super().__init__(connection)
        self._storage: dict[int, T] = {}
        self._next_id = 1

    def get(self, pk: int) -> T | None:
        return self._storage.get(pk)

    def save(self, entity: T) -> T:
        # Validate fields before storing
        self._validate_entity(entity)

        # id == 0 marks an entity that has not been saved yet
        if entity.id == 0:
            entity.id = self._next_id
            self._next_id += 1
        self._storage[entity.id] = entity
        return entity

    def delete(self, pk: int) -> bool:
        if pk in self._storage:
            del self._storage[pk]
            return True
        return False

    def find_all(self) -> list[T]:
        return list(self._storage.values())

    def _validate_entity(self, entity: T) -> None:
        """Validate all fields of the entity."""
        # FieldType validation logic can be integrated here
        pass
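
The `_validate_entity` hook is left empty above. One possible way to fill it in is sketched below under stated assumptions: the schema dictionary, the `Validator` alias, and the `max_length` and `validate_entity` helpers are all hypothetical, and plain callables stand in for `FieldType` instances so the example runs on its own.

```python
from dataclasses import dataclass
from typing import Any, Callable


class ValidationError(Exception):
    """Stand-in for miniorm's ValidationError."""

    def __init__(self, field: str, message: str) -> None:
        super().__init__(f"Validation failed for '{field}': {message}")
        self.field = field


# Hypothetical validator shape: takes a value, raises ValueError on bad input.
Validator = Callable[[Any], None]


def max_length(limit: int) -> Validator:
    """Build a validator that accepts strings of at most `limit` characters."""
    def check(value: Any) -> None:
        if not isinstance(value, str):
            raise ValueError(f"expected str, got {type(value).__name__}")
        if len(value) > limit:
            raise ValueError(f"max length is {limit}")
    return check


def validate_entity(entity: object, schema: dict[str, Validator]) -> None:
    """Run each validator against the matching attribute of the entity."""
    for name, validator in schema.items():
        try:
            validator(getattr(entity, name))
        except ValueError as exc:
            # Re-raise with the real attribute name attached.
            raise ValidationError(name, str(exc)) from exc


@dataclass
class User:
    id: int = 0
    name: str = ""


USER_SCHEMA: dict[str, Validator] = {"name": max_length(5)}

validate_entity(User(name="Alice"), USER_SCHEMA)  # passes silently
try:
    validate_entity(User(name="Alexander"), USER_SCHEMA)
except ValidationError as e:
    print(e.field, "->", e)
```

Attaching the attribute name at this level means the error can say which field failed, rather than the generic `"value"` used by the field classes themselves.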

Context Management: Transactions

# miniorm/transaction.py

from contextlib import contextmanager
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Iterator

from miniorm.exceptions import TransactionError

class TransactionState(Enum):
    ACTIVE = auto()
    COMMITTED = auto()
    ROLLED_BACK = auto()

@dataclass
class Transaction:
    """Transaction object."""
    id: int
    state: TransactionState = TransactionState.ACTIVE
    # Pending operations; created fresh per instance and kept out of __init__
    _operations: list[dict] = field(default_factory=list, init=False, repr=False)

    def add_operation(self, op: dict) -> None:
        if self.state != TransactionState.ACTIVE:
            raise TransactionError("Transaction is not active")
        self._operations.append(op)

    def commit(self) -> None:
        if self.state != TransactionState.ACTIVE:
            raise TransactionError("Transaction is not active")
        # A real implementation would commit to the database here
        self.state = TransactionState.COMMITTED

    def rollback(self) -> None:
        if self.state == TransactionState.COMMITTED:
            raise TransactionError("Cannot rollback committed transaction")
        # A real implementation would undo the operations here
        self._operations.clear()
        self.state = TransactionState.ROLLED_BACK

class Connection:
    """Database connection."""

    def __init__(self, url: str) -> None:
        self.url = url
        self._tx_counter = 0
        self._current_tx: Transaction | None = None

    @contextmanager
    def transaction(self) -> Iterator[Transaction]:
        """Transaction context manager."""
        if self._current_tx is not None:
            raise TransactionError("Nested transactions not supported")

        self._tx_counter += 1
        tx = Transaction(id=self._tx_counter)
        self._current_tx = tx

        try:
            yield tx
            # Commit automatically unless the caller already finished the transaction
            if tx.state == TransactionState.ACTIVE:
                tx.commit()
        except Exception:
            # Roll back on any error, then let the error propagate
            if tx.state == TransactionState.ACTIVE:
                tx.rollback()
            raise
        finally:
            self._current_tx = None

    @property
    def in_transaction(self) -> bool:
        return self._current_tx is not None
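
The commit-or-rollback decision hinges on how `@contextmanager` delivers exceptions: an error raised in the `with` body is thrown into the generator at the `yield`. The following stripped-down sketch shows just that control flow; `Tx` is a hypothetical stand-in that records only its final state.

```python
from contextlib import contextmanager
from typing import Iterator


class Tx:
    """Minimal stand-in for Transaction: only tracks its final state."""

    def __init__(self) -> None:
        self.state = "active"


@contextmanager
def transaction() -> Iterator[Tx]:
    tx = Tx()
    try:
        yield tx
        tx.state = "committed"    # reached only if the with-body did not raise
    except Exception:
        tx.state = "rolled_back"  # the body's exception surfaces at the yield
        raise                     # re-raise so the caller still sees it


with transaction() as ok_tx:
    pass

try:
    with transaction() as bad_tx:
        raise ValueError("boom")
except ValueError:
    pass

print(ok_tx.state, bad_tx.state)
```

Re-raising after the rollback matters: swallowing the exception inside the generator would make the `with` block appear to succeed.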

Integrated Usage Example

# Define the model
from dataclasses import dataclass, field
from datetime import datetime

from miniorm.exceptions import EntityNotFoundError
from miniorm.repository import InMemoryRepository
from miniorm.transaction import Connection

@dataclass
class User:
    id: int = 0
    name: str = ""
    email: str = ""
    created_at: datetime = field(default_factory=datetime.now)

# Define a concrete repository
class UserRepository(InMemoryRepository[User]):
    model_class = User

# Usage
def main():
    # Create a connection
    conn = Connection("memory://")

    # Create a repository
    users = UserRepository(conn)

    # Use a transaction.
    # Note: InMemoryRepository writes straight to its dict, so a rollback
    # here would not undo these saves.
    with conn.transaction() as tx:
        # Create a user
        user = User(name="Alice", email="alice@example.com")
        users.save(user)

        # More operations...
        user.name = "Alice Smith"
        users.save(user)

    # Operations outside the transaction
    try:
        users.get_or_raise(999)
    except EntityNotFoundError as e:
        print(f"Error: {e.message}")

    # List all users
    for user in users.find_all():
        print(f"User: {user.name} ({user.email})")

if __name__ == "__main__":
    main()

Case Study 2: Task Scheduler

Build a task scheduler that supports concurrent execution.

Design Overview

┌─────────────────────────────────────────────────────────┐
│                     Task Scheduler                      │
├─────────────────────────────────────────────────────────┤
│  Generics    │ Task[T] - type-safe task definitions     │
│  Exceptions  │ ExceptionGroup - concurrent errors       │
│  Context     │ TaskContext - resource lifecycle         │
│  Plugins     │ TaskHandler - extensible handlers        │
└─────────────────────────────────────────────────────────┘

Exception Design

# scheduler/exceptions.py

class SchedulerError(Exception):
    """Base exception for the scheduler."""

class TaskError(SchedulerError):
    """Task execution error."""

    def __init__(self, task_id: str, message: str, cause: Exception | None = None) -> None:
        super().__init__(f"Task '{task_id}' failed: {message}")
        self.task_id = task_id
        # Keep the original exception reachable through the standard chain
        if cause is not None:
            self.__cause__ = cause

# Note: this name shadows the built-in TimeoutError in any module that
# imports it; import it under an alias if both are needed.
class TimeoutError(TaskError):
    """Task timed out."""

class DependencyError(TaskError):
    """A dependency of the task failed."""

    def __init__(self, task_id: str, failed_deps: list[str]) -> None:
        super().__init__(task_id, f"Dependencies failed: {failed_deps}")
        self.failed_deps = failed_deps

Generic Task Definition

# scheduler/task.py

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
from typing import TYPE_CHECKING, Callable, Generic, TypeVar

if TYPE_CHECKING:
    from scheduler.context import TaskContext

T = TypeVar("T")

class TaskStatus(Enum):
    PENDING = auto()
    RUNNING = auto()
    COMPLETED = auto()
    FAILED = auto()
    CANCELLED = auto()

@dataclass
class TaskResult(Generic[T]):
    """Result of executing a task."""
    value: T | None = None
    error: Exception | None = None
    started_at: datetime | None = None
    completed_at: datetime | None = None

    @property
    def success(self) -> bool:
        return self.error is None

    @property
    def duration(self) -> float | None:
        """Elapsed seconds, or None if either timestamp is missing."""
        if self.started_at and self.completed_at:
            return (self.completed_at - self.started_at).total_seconds()
        return None

@dataclass
class Task(Generic[T]):
    """Generic task."""
    id: str
    handler: Callable[["TaskContext"], T]
    dependencies: list[str] = field(default_factory=list)
    timeout: float | None = None
    status: TaskStatus = TaskStatus.PENDING
    result: TaskResult[T] | None = None

    def execute(self, ctx: "TaskContext") -> TaskResult[T]:
        """Execute the task."""
        result = TaskResult[T](started_at=datetime.now())
        self.status = TaskStatus.RUNNING

        try:
            value = self.handler(ctx)
            result.value = value
            self.status = TaskStatus.COMPLETED
        except Exception as e:
            # Errors are captured in the result instead of being raised
            result.error = e
            self.status = TaskStatus.FAILED
        finally:
            result.completed_at = datetime.now()

        self.result = result
        return result

Plugin System: Task Handlers

# scheduler/handlers.py

import subprocess
import urllib.request
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, ClassVar, Protocol

if TYPE_CHECKING:
    from scheduler.context import TaskContext

class TaskHandlerProtocol(Protocol):
    """Task handler protocol."""

    def can_handle(self, task_type: str) -> bool:
        ...

    def handle(self, ctx: "TaskContext", payload: dict) -> Any:
        ...

# Inheriting from ABC is what makes @abstractmethod take effect
class TaskHandler(ABC):
    """Task handler base class (registration-based)."""
    _registry: ClassVar[dict[str, "TaskHandler"]] = {}

    task_type: ClassVar[str]

    def __init_subclass__(cls, **kwargs) -> None:
        super().__init_subclass__(**kwargs)
        # Register one shared instance per subclass that declares its own task_type
        if "task_type" in cls.__dict__:
            TaskHandler._registry[cls.task_type] = cls()

    def can_handle(self, task_type: str) -> bool:
        """Satisfy TaskHandlerProtocol by matching on the declared task_type."""
        return task_type == self.task_type

    @abstractmethod
    def handle(self, ctx: "TaskContext", payload: dict) -> Any:
        """Handle the task."""
        ...

    @classmethod
    def get_handler(cls, task_type: str) -> "TaskHandler | None":
        return cls._registry.get(task_type)

# Concrete handlers
class HttpRequestHandler(TaskHandler):
    """HTTP request handler."""
    task_type = "http_request"

    def handle(self, ctx: "TaskContext", payload: dict) -> dict:
        url = payload["url"]
        method = payload.get("method", "GET")

        # Simplified implementation: no headers, request body, or error handling
        request = urllib.request.Request(url, method=method)
        with urllib.request.urlopen(request) as response:
            return {
                "status": response.status,
                "body": response.read().decode(),
            }

class ShellCommandHandler(TaskHandler):
    """Shell command handler."""
    task_type = "shell"

    def handle(self, ctx: "TaskContext", payload: dict) -> dict:
        command = payload["command"]
        timeout = payload.get("timeout", 30)

        # Warning: shell=True passes the string to the shell unchanged;
        # never build `command` from untrusted input.
        result = subprocess.run(
            command,
            shell=True,
            capture_output=True,
            text=True,
            timeout=timeout,
        )

        return {
            "returncode": result.returncode,
            "stdout": result.stdout,
            "stderr": result.stderr,
        }

class DataProcessHandler(TaskHandler):
    """Data processing handler."""
    task_type = "data_process"

    def handle(self, ctx: "TaskContext", payload: dict) -> Any:
        # Fetch the shared resource from the context (empty list if unset)
        data = ctx.get_resource("data") or []
        operation = payload.get("operation", "identity")

        if operation == "transform":
            return [item * 2 for item in data]
        elif operation == "filter":
            return [item for item in data if item > 0]
        else:
            return data
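
The registry only pays off once something dispatches through it. The sketch below shows one way a caller might route a payload to the right handler by `task_type`; it uses a trimmed base class without the context argument, and `EchoHandler` and `dispatch` are hypothetical names introduced for this demonstration.

```python
from typing import Any, ClassVar


class TaskHandler:
    """Trimmed copy of the registration-based handler base class."""
    _registry: ClassVar[dict[str, "TaskHandler"]] = {}
    task_type: ClassVar[str]

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        if "task_type" in cls.__dict__:
            # One shared instance per handler class.
            TaskHandler._registry[cls.task_type] = cls()

    def handle(self, payload: dict) -> Any:
        raise NotImplementedError

    @classmethod
    def get_handler(cls, task_type: str) -> "TaskHandler | None":
        return cls._registry.get(task_type)


class EchoHandler(TaskHandler):
    """Hypothetical handler used only for this demonstration."""
    task_type = "echo"

    def handle(self, payload: dict) -> Any:
        return payload["text"].upper()


def dispatch(task_type: str, payload: dict) -> Any:
    """Hypothetical entry point: find the handler by type and run it."""
    handler = TaskHandler.get_handler(task_type)
    if handler is None:
        raise LookupError(f"No handler registered for {task_type!r}")
    return handler.handle(payload)


print(dispatch("echo", {"text": "hello"}))
```

Since the registry stores instances rather than classes, handlers are effectively singletons here and should avoid keeping per-task state on `self`.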

Context Management: Task Context

# scheduler/context.py

from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Any, Callable, Iterator

@dataclass
class TaskContext:
    """Task execution context."""
    task_id: str
    _resources: dict[str, Any] = field(default_factory=dict)
    _cleanup_callbacks: list[Callable[[], None]] = field(default_factory=list)

    def set_resource(self, name: str, value: Any) -> None:
        """Set a shared resource."""
        self._resources[name] = value

    def get_resource(self, name: str) -> Any | None:
        """Get a shared resource, or None if it is not set."""
        return self._resources.get(name)

    def add_cleanup(self, callback: Callable[[], None]) -> None:
        """Register a cleanup callback."""
        self._cleanup_callbacks.append(callback)

    def cleanup(self) -> None:
        """Run all cleanup callbacks in reverse registration order."""
        errors = []
        for callback in reversed(self._cleanup_callbacks):
            try:
                callback()
            except Exception as e:
                # Keep going so one failure does not skip the remaining callbacks
                errors.append(e)

        if errors:
            # ExceptionGroup requires Python 3.11+
            raise ExceptionGroup("Cleanup failed", errors)

@contextmanager
def task_context(task_id: str) -> Iterator[TaskContext]:
    """Task context manager."""
    ctx = TaskContext(task_id=task_id)

    try:
        yield ctx
    finally:
        ctx.cleanup()

Scheduler: Integrating All the Patterns

# scheduler/scheduler.py

import asyncio
from dataclasses import dataclass, field

from scheduler.context import task_context
from scheduler.exceptions import DependencyError, SchedulerError, TaskError
from scheduler.task import Task, TaskResult

@dataclass
class Scheduler:
    """Task scheduler."""
    _tasks: dict[str, Task] = field(default_factory=dict)
    _results: dict[str, TaskResult] = field(default_factory=dict)

    def add_task(self, task: Task) -> None:
        """Add a task."""
        self._tasks[task.id] = task

    def _get_execution_order(self) -> list[list[str]]:
        """Compute the execution order (topological sort)."""
        # Simplified implementation: group tasks into layers by dependency depth
        layers: list[list[str]] = []
        remaining = set(self._tasks.keys())
        completed: set[str] = set()

        while remaining:
            # Find every task whose dependencies have all completed
            ready = [
                tid for tid in remaining
                if all(dep in completed for dep in self._tasks[tid].dependencies)
            ]

            if not ready:
                # Either a cycle, or a dependency on a task that was never added
                raise SchedulerError(
                    f"Circular or unresolved dependency detected: {remaining}"
                )

            layers.append(ready)
            for tid in ready:
                remaining.remove(tid)
                completed.add(tid)

        return layers

    async def run_async(self) -> dict[str, TaskResult]:
        """Run all tasks asynchronously."""
        layers = self._get_execution_order()

        for layer in layers:
            # Tasks in the same layer do not depend on each other
            tasks_to_run = []

            for task_id in layer:
                task = self._tasks[task_id]

                # Check whether any dependency failed
                failed_deps = [
                    dep for dep in task.dependencies
                    if dep in self._results and not self._results[dep].success
                ]

                if failed_deps:
                    # A dependency failed, so skip this task
                    result = TaskResult(
                        error=DependencyError(task_id, failed_deps)
                    )
                    self._results[task_id] = result
                    continue

                tasks_to_run.append(self._run_task_async(task))

            # Run the layer and collect every error.
            # Handlers are plain synchronous callables, so the tasks in a layer
            # run one after another on the event loop. If they awaited,
            # TaskGroup would cancel the remaining siblings after a failure.
            if tasks_to_run:
                try:
                    async with asyncio.TaskGroup() as tg:
                        for coro in tasks_to_run:
                            tg.create_task(coro)
                except* TaskError as eg:
                    # Log the errors but continue with the next layer
                    for e in eg.exceptions:
                        print(f"Task failed: {e}")

        return self._results

    async def _run_task_async(self, task: Task) -> None:
        """Run a single task."""
        with task_context(task.id) as ctx:
            # Expose the results of dependency tasks as shared resources
            for dep_id in task.dependencies:
                if dep_id in self._results:
                    ctx.set_resource(f"result_{dep_id}", self._results[dep_id].value)

            result = task.execute(ctx)
            self._results[task.id] = result

            if result.error:
                raise TaskError(task.id, str(result.error), result.error)

    def run(self) -> dict[str, TaskResult]:
        """Run synchronously."""
        return asyncio.run(self.run_async())
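
The layering step is easier to follow in isolation. Below is a minimal standalone sketch of the same idea, operating on a plain mapping from task id to dependency ids; `layer_tasks` and the sample graph are hypothetical, and each layer is sorted only to make the output deterministic.

```python
def layer_tasks(deps: dict[str, list[str]]) -> list[list[str]]:
    """Group task ids into layers; each layer depends only on earlier layers."""
    remaining = set(deps)
    completed: set[str] = set()
    layers: list[list[str]] = []

    while remaining:
        # A task is ready once every one of its dependencies has completed.
        ready = sorted(
            tid for tid in remaining
            if all(dep in completed for dep in deps[tid])
        )
        if not ready:
            # Nothing can make progress: a cycle or an unknown dependency.
            raise ValueError(
                f"Circular or missing dependency among: {sorted(remaining)}"
            )
        layers.append(ready)
        remaining.difference_update(ready)
        completed.update(ready)

    return layers


graph = {
    "fetch_users": [],
    "fetch_orders": [],
    "join": ["fetch_users", "fetch_orders"],
    "report": ["join"],
}
print(layer_tasks(graph))
```

The two fetch tasks land in the same layer because neither depends on the other, which is exactly the set a scheduler is free to run concurrently.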

Complete Usage Example

from scheduler.context import TaskContext
from scheduler.scheduler import Scheduler
from scheduler.task import Task

# Create the scheduler
scheduler = Scheduler()

# Define the tasks
def fetch_data(ctx: TaskContext) -> list[int]:
    """Simulate fetching data."""
    return [1, 2, 3, 4, 5]

def process_data(ctx: TaskContext) -> list[int]:
    """Process the data."""
    # The scheduler exposes each dependency's result as "result_<task id>"
    data = ctx.get_resource("result_fetch")
    return [x * 2 for x in data]

def save_result(ctx: TaskContext) -> str:
    """Save the result."""
    data = ctx.get_resource("result_process")
    return f"Saved {len(data)} items"

# Add the tasks (with dependencies)
scheduler.add_task(Task(
    id="fetch",
    handler=fetch_data
))

scheduler.add_task(Task(
    id="process",
    handler=process_data,
    dependencies=["fetch"]
))

scheduler.add_task(Task(
    id="save",
    handler=save_result,
    dependencies=["process"]
))

# Run
results = scheduler.run()

# Inspect the results
for task_id, result in results.items():
    if result.success:
        print(f"[OK] {task_id}: {result.value} ({result.duration:.3f}s)")
    else:
        print(f"[FAIL] {task_id}: {result.error}")

Pattern Collaboration Diagram

┌─────────────────────────────────────────────────────────────────┐
│                           Application                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   ┌─────────────┐     ┌─────────────┐     ┌─────────────┐       │
│   │  Generics   │────▶│ Exceptions  │────▶│   Context   │       │
│   │ Repository  │     │ Hierarchy   │     │ Transaction │       │
│   │ Task[T]     │     │ Chaining    │     │ TaskContext │       │
│   └─────────────┘     └─────────────┘     └─────────────┘       │
│          │                   │                   │              │
│          │                   │                   │              │
│          ▼                   ▼                   ▼              │
│   ┌─────────────────────────────────────────────────────┐       │
│   │                    Plugin System                    │       │
│   │  FieldType  │  TaskHandler  │  HookPlugin           │       │
│   └─────────────────────────────────────────────────────┘       │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

How the patterns cooperate:
1. Generics provide type safety: a static type checker (such as mypy) can catch errors before the code runs
2. Exceptions provide precise error handling and support propagating and converting errors
3. Context managers govern resource lifecycles and ensure proper cleanup
4. The plugin system provides extension points that allow custom behavior

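Summary of Design Principles

Pattern              Problem solved          When to use
Generics             Type-safe reuse         Containers, repositories, service interfaces
Exception hierarchy  Precise error handling  Large projects, API design
Context management   Resource lifecycle      Connections, transactions, temporary resources
Plugin system        Extensibility           Framework design, open architectures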

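Questions for Reflection

  1. How would you add logging to these case studies?
  2. Which parts would need to change to support distributed execution?
  3. How would you add performance monitoring to these frameworks?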
