3.5.5 設計模式整合案例
3.5.5 設計模式整合案例
本章透過兩個完整案例,展示如何將前四章的設計模式結合應用。每個案例都會說明各模式的協作關係。
先備知識
- 本模組 3.5.1-3.5.4 所有章節
案例一:迷你 ORM 框架
建立一個簡化的 ORM(Object-Relational Mapping)框架,展示各模式的整合。
設計概覽
1┌─────────────────────────────────────────────────────────┐
2│ MiniORM Framework │
3├─────────────────────────────────────────────────────────┤
4│ 泛型 │ Repository[T] - 型別安全的資料存取 │
5│ 異常 │ 階層式異常 - 精確的錯誤處理 │
6│ 上下文 │ Transaction - 交易管理 │
7│ 插件 │ FieldType - 可擴展的欄位型別 │
8└─────────────────────────────────────────────────────────┘異常層級設計
1# miniorm/exceptions.py
2
3class ORMError(Exception):
4 """ORM 框架的基礎異常"""
5
6 def __init__(self, message: str, code: str | None = None) -> None:
7 super().__init__(message)
8 self.message = message
9 self.code = code
10
11class ConnectionError(ORMError):
12 """資料庫連接錯誤"""
13 pass
14
15class QueryError(ORMError):
16 """查詢執行錯誤"""
17 pass
18
19class EntityNotFoundError(ORMError):
20 """實體不存在"""
21
22 def __init__(self, model: str, pk: int | str) -> None:
23 super().__init__(f"{model} with pk={pk} not found", code="NOT_FOUND")
24 self.model = model
25 self.pk = pk
26
27class ValidationError(ORMError):
28 """資料驗證錯誤"""
29
30 def __init__(self, field: str, message: str) -> None:
31 super().__init__(f"Validation failed for '{field}': {message}", code="VALIDATION")
32 self.field = field
33
34class TransactionError(ORMError):
35 """交易錯誤"""
36 pass插件系統:可擴展的欄位型別
1# miniorm/fields.py
2
3from abc import ABC, abstractmethod
4from typing import Any, ClassVar
5from datetime import datetime
6
7class FieldType(ABC):
8 """欄位型別插件基類"""
9 _registry: ClassVar[dict[str, type["FieldType"]]] = {}
10
11 # 子類別必須定義
12 type_name: ClassVar[str]
13 python_type: ClassVar[type]
14
15 def __init_subclass__(cls, **kwargs) -> None:
16 super().__init_subclass__(**kwargs)
17 if hasattr(cls, "type_name"):
18 FieldType._registry[cls.type_name] = cls
19
20 @abstractmethod
21 def to_db(self, value: Any) -> Any:
22 """Python 值轉換為資料庫值"""
23 ...
24
25 @abstractmethod
26 def from_db(self, value: Any) -> Any:
27 """資料庫值轉換為 Python 值"""
28 ...
29
30 @abstractmethod
31 def validate(self, value: Any) -> None:
32 """驗證值,失敗時拋出 ValidationError"""
33 ...
34
35 @classmethod
36 def get_type(cls, name: str) -> type["FieldType"] | None:
37 return cls._registry.get(name)
38
39# 內建欄位型別
40class IntegerField(FieldType):
41 type_name = "integer"
42 python_type = int
43
44 def to_db(self, value: Any) -> int:
45 return int(value)
46
47 def from_db(self, value: Any) -> int:
48 return int(value) if value is not None else 0
49
50 def validate(self, value: Any) -> None:
51 if value is not None and not isinstance(value, int):
52 raise ValidationError("value", f"Expected int, got {type(value).__name__}")
53
54class StringField(FieldType):
55 type_name = "string"
56 python_type = str
57
58 def __init__(self, max_length: int = 255) -> None:
59 self.max_length = max_length
60
61 def to_db(self, value: Any) -> str:
62 return str(value)
63
64 def from_db(self, value: Any) -> str:
65 return str(value) if value is not None else ""
66
67 def validate(self, value: Any) -> None:
68 if value is not None:
69 if not isinstance(value, str):
70 raise ValidationError("value", f"Expected str, got {type(value).__name__}")
71 if len(value) > self.max_length:
72 raise ValidationError("value", f"Max length is {self.max_length}")
73
74class DateTimeField(FieldType):
75 type_name = "datetime"
76 python_type = datetime
77
78 def to_db(self, value: Any) -> str:
79 if isinstance(value, datetime):
80 return value.isoformat()
81 return str(value)
82
83 def from_db(self, value: Any) -> datetime:
84 if isinstance(value, datetime):
85 return value
86 return datetime.fromisoformat(str(value))
87
88 def validate(self, value: Any) -> None:
89 if value is not None and not isinstance(value, datetime):
90 raise ValidationError("value", f"Expected datetime, got {type(value).__name__}")
91
92# 使用者可以擴展新的欄位型別
93class JSONField(FieldType):
94 """自訂欄位型別範例"""
95 type_name = "json"
96 python_type = dict
97
98 def to_db(self, value: Any) -> str:
99 import json
100 return json.dumps(value)
101
102 def from_db(self, value: Any) -> dict:
103 import json
104 if isinstance(value, dict):
105 return value
106 return json.loads(str(value))
107
108 def validate(self, value: Any) -> None:
109 if value is not None and not isinstance(value, (dict, list)):
110 raise ValidationError("value", "Expected dict or list")泛型 Repository
1# miniorm/repository.py
2
3from typing import TypeVar, Generic, Protocol, ClassVar
4from abc import abstractmethod
5
6class HasId(Protocol):
7 """具有 id 屬性的協議"""
8 id: int
9
10T = TypeVar("T", bound=HasId)
11
12class Repository(Generic[T]):
13 """泛型 Repository 基類"""
14
15 model_class: ClassVar[type]
16
17 def __init__(self, connection: "Connection") -> None:
18 self._connection = connection
19
20 @abstractmethod
21 def get(self, pk: int) -> T | None:
22 """根據主鍵取得實體"""
23 ...
24
25 @abstractmethod
26 def save(self, entity: T) -> T:
27 """儲存實體"""
28 ...
29
30 @abstractmethod
31 def delete(self, pk: int) -> bool:
32 """刪除實體"""
33 ...
34
35 @abstractmethod
36 def find_all(self) -> list[T]:
37 """取得所有實體"""
38 ...
39
40 def get_or_raise(self, pk: int) -> T:
41 """取得實體,不存在時拋出異常"""
42 entity = self.get(pk)
43 if entity is None:
44 raise EntityNotFoundError(self.model_class.__name__, pk)
45 return entity
46
47class InMemoryRepository(Repository[T]):
48 """記憶體實作的 Repository"""
49
50 def __init__(self, connection: "Connection") -> None:
51 super().__init__(connection)
52 self._storage: dict[int, T] = {}
53 self._next_id = 1
54
55 def get(self, pk: int) -> T | None:
56 return self._storage.get(pk)
57
58 def save(self, entity: T) -> T:
59 # 驗證欄位
60 self._validate_entity(entity)
61
62 if entity.id == 0:
63 entity.id = self._next_id
64 self._next_id += 1
65 self._storage[entity.id] = entity
66 return entity
67
68 def delete(self, pk: int) -> bool:
69 if pk in self._storage:
70 del self._storage[pk]
71 return True
72 return False
73
74 def find_all(self) -> list[T]:
75 return list(self._storage.values())
76
77 def _validate_entity(self, entity: T) -> None:
78 """驗證實體的所有欄位"""
79 # 這裡可以整合 FieldType 的驗證邏輯
80 pass上下文管理:交易
1# miniorm/transaction.py
2
3from contextlib import contextmanager
4from typing import Iterator
5from dataclasses import dataclass
6from enum import Enum, auto
7
8class TransactionState(Enum):
9 ACTIVE = auto()
10 COMMITTED = auto()
11 ROLLED_BACK = auto()
12
13@dataclass
14class Transaction:
15 """交易物件"""
16 id: int
17 state: TransactionState = TransactionState.ACTIVE
18 _operations: list = None
19
20 def __post_init__(self):
21 self._operations = []
22
23 def add_operation(self, op: dict) -> None:
24 if self.state != TransactionState.ACTIVE:
25 raise TransactionError("Transaction is not active")
26 self._operations.append(op)
27
28 def commit(self) -> None:
29 if self.state != TransactionState.ACTIVE:
30 raise TransactionError("Transaction is not active")
31 # 實際應用中這裡會提交到資料庫
32 self.state = TransactionState.COMMITTED
33
34 def rollback(self) -> None:
35 if self.state == TransactionState.COMMITTED:
36 raise TransactionError("Cannot rollback committed transaction")
37 # 實際應用中這裡會回滾操作
38 self._operations.clear()
39 self.state = TransactionState.ROLLED_BACK
40
41class Connection:
42 """資料庫連接"""
43
44 def __init__(self, url: str) -> None:
45 self.url = url
46 self._tx_counter = 0
47 self._current_tx: Transaction | None = None
48
49 @contextmanager
50 def transaction(self) -> Iterator[Transaction]:
51 """交易上下文管理器"""
52 if self._current_tx is not None:
53 raise TransactionError("Nested transactions not supported")
54
55 self._tx_counter += 1
56 tx = Transaction(id=self._tx_counter)
57 self._current_tx = tx
58
59 try:
60 yield tx
61 if tx.state == TransactionState.ACTIVE:
62 tx.commit()
63 except Exception:
64 if tx.state == TransactionState.ACTIVE:
65 tx.rollback()
66 raise
67 finally:
68 self._current_tx = None
69
70 @property
71 def in_transaction(self) -> bool:
72 return self._current_tx is not None整合使用範例
1# 定義 Model
2from dataclasses import dataclass, field
3from datetime import datetime
4
5@dataclass
6class User:
7 id: int = 0
8 name: str = ""
9 email: str = ""
10 created_at: datetime = field(default_factory=datetime.now)
11
12# 定義具體的 Repository
13class UserRepository(InMemoryRepository[User]):
14 model_class = User
15
16# 使用
17def main():
18 # 建立連接
19 conn = Connection("memory://")
20
21 # 建立 Repository
22 users = UserRepository(conn)
23
24 # 使用交易
25 with conn.transaction() as tx:
26 # 建立使用者
27 user = User(name="Alice", email="alice@example.com")
28 users.save(user)
29
30 # 更多操作...
31 user.name = "Alice Smith"
32 users.save(user)
33
34 # 交易外的操作
35 try:
36 found = users.get_or_raise(999)
37 except EntityNotFoundError as e:
38 print(f"Error: {e.message}")
39
40 # 列出所有使用者
41 for user in users.find_all():
42 print(f"User: {user.name} ({user.email})")案例二:任務排程器
建立一個支援並行執行的任務排程器。
設計概覽
1┌─────────────────────────────────────────────────────────┐
2│ Task Scheduler │
3├─────────────────────────────────────────────────────────┤
4│ 泛型 │ Task[T] - 型別安全的任務定義 │
5│ 異常 │ ExceptionGroup - 並行錯誤處理 │
6│ 上下文 │ TaskContext - 資源生命週期 │
7│ 插件 │ TaskHandler - 可擴展的處理器 │
8└─────────────────────────────────────────────────────────┘異常設計
1# scheduler/exceptions.py
2
3class SchedulerError(Exception):
4 """排程器基礎異常"""
5 pass
6
7class TaskError(SchedulerError):
8 """任務執行錯誤"""
9
10 def __init__(self, task_id: str, message: str, cause: Exception | None = None) -> None:
11 super().__init__(f"Task '{task_id}' failed: {message}")
12 self.task_id = task_id
13 if cause:
14 self.__cause__ = cause
15
16class TimeoutError(TaskError):
17 """任務超時"""
18 pass
19
20class DependencyError(TaskError):
21 """依賴任務失敗"""
22
23 def __init__(self, task_id: str, failed_deps: list[str]) -> None:
24 super().__init__(task_id, f"Dependencies failed: {failed_deps}")
25 self.failed_deps = failed_deps泛型任務定義
1# scheduler/task.py
2
3from typing import TypeVar, Generic, Callable, Any
4from dataclasses import dataclass, field
5from enum import Enum, auto
6from datetime import datetime
7
8T = TypeVar("T")
9
10class TaskStatus(Enum):
11 PENDING = auto()
12 RUNNING = auto()
13 COMPLETED = auto()
14 FAILED = auto()
15 CANCELLED = auto()
16
17@dataclass
18class TaskResult(Generic[T]):
19 """任務執行結果"""
20 value: T | None = None
21 error: Exception | None = None
22 started_at: datetime | None = None
23 completed_at: datetime | None = None
24
25 @property
26 def success(self) -> bool:
27 return self.error is None
28
29 @property
30 def duration(self) -> float | None:
31 if self.started_at and self.completed_at:
32 return (self.completed_at - self.started_at).total_seconds()
33 return None
34
35@dataclass
36class Task(Generic[T]):
37 """泛型任務"""
38 id: str
39 handler: Callable[["TaskContext"], T]
40 dependencies: list[str] = field(default_factory=list)
41 timeout: float | None = None
42 status: TaskStatus = TaskStatus.PENDING
43 result: TaskResult[T] | None = None
44
45 def execute(self, ctx: "TaskContext") -> TaskResult[T]:
46 """執行任務"""
47 result = TaskResult[T](/python-advanced/03-design-patterns/integration/started_at=datetime.now())
48 self.status = TaskStatus.RUNNING
49
50 try:
51 value = self.handler(ctx)
52 result.value = value
53 self.status = TaskStatus.COMPLETED
54 except Exception as e:
55 result.error = e
56 self.status = TaskStatus.FAILED
57 finally:
58 result.completed_at = datetime.now()
59
60 self.result = result
61 return result插件系統:任務處理器
1# scheduler/handlers.py
2
3from typing import Any, ClassVar, Protocol
4from abc import abstractmethod
5
6class TaskHandlerProtocol(Protocol):
7 """任務處理器協議"""
8
9 def can_handle(self, task_type: str) -> bool:
10 ...
11
12 def handle(self, ctx: "TaskContext", payload: dict) -> Any:
13 ...
14
15class TaskHandler:
16 """任務處理器基類(基於註冊)"""
17 _registry: ClassVar[dict[str, "TaskHandler"]] = {}
18
19 task_type: ClassVar[str]
20
21 def __init_subclass__(cls, **kwargs) -> None:
22 super().__init_subclass__(**kwargs)
23 if hasattr(cls, "task_type"):
24 TaskHandler._registry[cls.task_type] = cls()
25
26 @abstractmethod
27 def handle(self, ctx: "TaskContext", payload: dict) -> Any:
28 """處理任務"""
29 ...
30
31 @classmethod
32 def get_handler(cls, task_type: str) -> "TaskHandler | None":
33 return cls._registry.get(task_type)
34
35# 具體的處理器
36class HttpRequestHandler(TaskHandler):
37 """HTTP 請求處理器"""
38 task_type = "http_request"
39
40 def handle(self, ctx: "TaskContext", payload: dict) -> dict:
41 import urllib.request
42 url = payload["url"]
43 method = payload.get("method", "GET")
44
45 # 簡化的實作
46 with urllib.request.urlopen(url) as response:
47 return {
48 "status": response.status,
49 "body": response.read().decode()
50 }
51
52class ShellCommandHandler(TaskHandler):
53 """Shell 命令處理器"""
54 task_type = "shell"
55
56 def handle(self, ctx: "TaskContext", payload: dict) -> dict:
57 import subprocess
58 command = payload["command"]
59 timeout = payload.get("timeout", 30)
60
61 result = subprocess.run(
62 command,
63 shell=True,
64 capture_output=True,
65 text=True,
66 timeout=timeout
67 )
68
69 return {
70 "returncode": result.returncode,
71 "stdout": result.stdout,
72 "stderr": result.stderr
73 }
74
75class DataProcessHandler(TaskHandler):
76 """資料處理器"""
77 task_type = "data_process"
78
79 def handle(self, ctx: "TaskContext", payload: dict) -> Any:
80 # 從上下文取得共享資源
81 data = ctx.get_resource("data")
82 operation = payload.get("operation", "identity")
83
84 if operation == "transform":
85 return [item * 2 for item in data]
86 elif operation == "filter":
87 return [item for item in data if item > 0]
88 else:
89 return data上下文管理:任務上下文
1# scheduler/context.py
2
3from contextlib import contextmanager, ExitStack
4from typing import Any, Iterator
5from dataclasses import dataclass, field
6
7@dataclass
8class TaskContext:
9 """任務執行上下文"""
10 task_id: str
11 _resources: dict[str, Any] = field(default_factory=dict)
12 _cleanup_callbacks: list = field(default_factory=list)
13
14 def set_resource(self, name: str, value: Any) -> None:
15 """設定共享資源"""
16 self._resources[name] = value
17
18 def get_resource(self, name: str) -> Any | None:
19 """取得共享資源"""
20 return self._resources.get(name)
21
22 def add_cleanup(self, callback) -> None:
23 """註冊清理回調"""
24 self._cleanup_callbacks.append(callback)
25
26 def cleanup(self) -> None:
27 """執行所有清理回調"""
28 errors = []
29 for callback in reversed(self._cleanup_callbacks):
30 try:
31 callback()
32 except Exception as e:
33 errors.append(e)
34
35 if errors:
36 raise ExceptionGroup("Cleanup failed", errors)
37
38@contextmanager
39def task_context(task_id: str) -> Iterator[TaskContext]:
40 """任務上下文管理器"""
41 ctx = TaskContext(task_id=task_id)
42
43 try:
44 yield ctx
45 finally:
46 ctx.cleanup()排程器:整合所有模式
1# scheduler/scheduler.py
2
3import asyncio
4from typing import Any
5from dataclasses import dataclass, field
6
7@dataclass
8class Scheduler:
9 """任務排程器"""
10 _tasks: dict[str, Task] = field(default_factory=dict)
11 _results: dict[str, TaskResult] = field(default_factory=dict)
12
13 def add_task(self, task: Task) -> None:
14 """新增任務"""
15 self._tasks[task.id] = task
16
17 def _get_execution_order(self) -> list[list[str]]:
18 """計算執行順序(拓撲排序)"""
19 # 簡化實作:按依賴深度分層
20 layers: list[list[str]] = []
21 remaining = set(self._tasks.keys())
22 completed: set[str] = set()
23
24 while remaining:
25 # 找出所有依賴已完成的任務
26 ready = [
27 tid for tid in remaining
28 if all(dep in completed for dep in self._tasks[tid].dependencies)
29 ]
30
31 if not ready:
32 # 有循環依賴
33 raise SchedulerError(f"Circular dependency detected: {remaining}")
34
35 layers.append(ready)
36 for tid in ready:
37 remaining.remove(tid)
38 completed.add(tid)
39
40 return layers
41
42 async def run_async(self) -> dict[str, TaskResult]:
43 """非同步執行所有任務"""
44 layers = self._get_execution_order()
45
46 for layer in layers:
47 # 同一層的任務可以並行執行
48 tasks_to_run = []
49
50 for task_id in layer:
51 task = self._tasks[task_id]
52
53 # 檢查依賴是否成功
54 failed_deps = [
55 dep for dep in task.dependencies
56 if dep in self._results and not self._results[dep].success
57 ]
58
59 if failed_deps:
60 # 依賴失敗,跳過此任務
61 result = TaskResult(
62 error=DependencyError(task_id, failed_deps)
63 )
64 self._results[task_id] = result
65 continue
66
67 tasks_to_run.append(self._run_task_async(task))
68
69 # 並行執行,收集所有錯誤
70 if tasks_to_run:
71 try:
72 async with asyncio.TaskGroup() as tg:
73 for coro in tasks_to_run:
74 tg.create_task(coro)
75 except* TaskError as eg:
76 # 記錄錯誤但繼續執行
77 for e in eg.exceptions:
78 print(f"Task failed: {e}")
79
80 return self._results
81
82 async def _run_task_async(self, task: Task) -> None:
83 """執行單個任務"""
84 with task_context(task.id) as ctx:
85 # 設定共享資源(從依賴任務的結果)
86 for dep_id in task.dependencies:
87 if dep_id in self._results:
88 ctx.set_resource(f"result_{dep_id}", self._results[dep_id].value)
89
90 result = task.execute(ctx)
91 self._results[task.id] = result
92
93 if result.error:
94 raise TaskError(task.id, str(result.error), result.error)
95
96 def run(self) -> dict[str, TaskResult]:
97 """同步執行"""
98 return asyncio.run(self.run_async())完整使用範例
1# 建立排程器
2scheduler = Scheduler()
3
4# 定義任務
5def fetch_data(ctx: TaskContext) -> list[int]:
6 """模擬取得資料"""
7 return [1, 2, 3, 4, 5]
8
9def process_data(ctx: TaskContext) -> list[int]:
10 """處理資料"""
11 data = ctx.get_resource("result_fetch")
12 return [x * 2 for x in data]
13
14def save_result(ctx: TaskContext) -> str:
15 """儲存結果"""
16 data = ctx.get_resource("result_process")
17 return f"Saved {len(data)} items"
18
19# 新增任務(有依賴關係)
20scheduler.add_task(Task(
21 id="fetch",
22 handler=fetch_data
23))
24
25scheduler.add_task(Task(
26 id="process",
27 handler=process_data,
28 dependencies=["fetch"]
29))
30
31scheduler.add_task(Task(
32 id="save",
33 handler=save_result,
34 dependencies=["process"]
35))
36
37# 執行
38results = scheduler.run()
39
40# 檢查結果
41for task_id, result in results.items():
42 if result.success:
43 print(f"[OK] {task_id}: {result.value} ({result.duration:.3f}s)")
44 else:
45 print(f"[FAIL] {task_id}: {result.error}")模式協作關係圖
1┌─────────────────────────────────────────────────────────────────┐
2│ 應用程式 │
3├─────────────────────────────────────────────────────────────────┤
4│ │
5│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
6│ │ 泛型 │────▶│ 異常 │────▶│ 上下文 │ │
7│ │ Repository │ │ 層級設計 │ │ Transaction │ │
8│ │ Task[T] │ │ 異常鏈 │ │ TaskContext │ │
9│ └─────────────┘ └─────────────┘ └─────────────┘ │
10│ │ │ │ │
11│ │ │ │ │
12│ ▼ ▼ ▼ │
13│ ┌─────────────────────────────────────────────────────┐ │
14│ │ 插件系統 │ │
15│ │ FieldType │ TaskHandler │ HookPlugin │ │
16│ └─────────────────────────────────────────────────────┘ │
17│ │
18└─────────────────────────────────────────────────────────────────┘
19
20協作方式:
211. 泛型確保型別安全,在編譯期捕獲錯誤
222. 異常提供精確的錯誤處理,支援錯誤傳播和轉換
233. 上下文管理資源生命週期,確保正確清理
244. 插件系統提供擴展點,允許自訂行為設計原則總結
| 模式 | 解決的問題 | 使用時機 |
|---|---|---|
| 泛型 | 型別安全的重用 | 容器、Repository、服務介面 |
| 異常層級 | 精確的錯誤處理 | 大型專案、API 設計 |
| 上下文管理 | 資源生命週期 | 連接、交易、臨時資源 |
| 插件系統 | 可擴展性 | 框架設計、開放式架構 |
思考題
- 如何在這些案例中加入日誌記錄?
- 如果要支援分散式執行,需要修改哪些部分?
- 如何為這些框架加入效能監控?
上一章:3.5.4 插件系統設計 回到模組目錄:模組 3.5:進階設計模式