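This case study is based on the actual code in .claude/lib/hook_validator.py and shows how to build an extensible plugin system with Protocol and a registration mechanism.

Prerequisites

Background

Current design

In hook_validator.py, the validation logic is written directly into the class: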

class HookValidator:
    """Hook compliance validator"""

    def validate_hook(self, hook_path: str) -> ValidationResult:
        """Validate a single hook file"""
        # ... read the file ...

        # Run each check - the validation rules are hard-coded
        issues = []
        issues.extend(self.check_naming_convention(hook_path))
        issues.extend(self.check_lib_imports(content, hook_path))
        issues.extend(self.check_output_format(content))
        issues.extend(self.check_test_exists(hook_path))

        return ValidationResult(hook_path=str(hook_path), issues=issues)

    def check_naming_convention(self, hook_path: Path) -> List[ValidationIssue]:
        """Check the naming convention"""
        # ... validation logic ...

    def check_lib_imports(self, content: str, hook_path: Path) -> List[ValidationIssue]:
        """Check imports of the shared modules"""
        # ... validation logic ...

    def check_output_format(self, content: str) -> List[ValidationIssue]:
        """Check the output format"""
        # ... validation logic ...

    def check_test_exists(self, hook_path: Path) -> List[ValidationIssue]:
        """Check that a test file exists"""
        # ... validation logic ...

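Strengths of this design

  • All the logic lives in one place, so related code is easy to find
  • There is no extra abstraction layer; the flow is direct and easy to follow
  • It is sufficient for small projects

Limitations of this design

When the validator needs to be extended:

  • Third parties cannot add their own checks: adding a "docstring check" means modifying HookValidator
  • Every change touches core code: each new check requires editing the validate_hook method
  • It violates the open-closed principle: the class is neither open for extension nor closed to modification

Advanced solution

Design goals

  1. Define a clear plugin interface: use Protocol to describe what a check must be able to do
  2. Support third-party extension: external packages can add checks
  3. Make plugins independently testable: each check is a self-contained unit
  4. Support enabling and disabling plugins: which checks run can be controlled dynamically

Implementation steps

Step 1: Define the plugin interface with Protocol

Protocol lets us define "what a check must be able to do" without forcing an inheritance relationship: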

from dataclasses import dataclass
from pathlib import Path
from typing import Protocol, Optional, List, runtime_checkable

@dataclass
class ValidationIssue:
    """Validation issue description"""
    level: str  # "error" | "warning" | "info"
    message: str
    line: Optional[int] = None
    suggestion: Optional[str] = None

@dataclass
class CheckContext:
    """Context passed to each check"""
    hook_path: Path
    content: str
    project_root: Path

@runtime_checkable
class ValidationCheck(Protocol):
    """
    Protocol for validation checks

    Any class implementing this protocol can be used as a validation check.
    No inheritance required - just implement the methods.
    """

    @property
    def name(self) -> str:
        """Check name for identification"""
        ...

    @property
    def description(self) -> str:
        """Human-readable description"""
        ...

    def check(self, context: CheckContext) -> List[ValidationIssue]:
        """
        Execute the validation check

        Args:
            context: Check context with file info

        Returns:
            List of validation issues found
        """
        ...
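
Why Protocol rather than ABC?
  • Protocol is the static counterpart of duck typing
  • No inheritance is required: any existing class that has the matching members can be used
  • @runtime_checkable lets us check with isinstance(); that check only verifies that the required members exist, not their signatures or return types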
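
To make the last two points concrete, here is a minimal sketch. SupportsCheck is a hypothetical, trimmed-down stand-in for ValidationCheck, and the three classes are invented purely for illustration; none of these names come from hook_validator.py.

```python
from typing import List, Protocol, runtime_checkable


@runtime_checkable
class SupportsCheck(Protocol):
    """Hypothetical, trimmed-down stand-in for ValidationCheck."""

    @property
    def name(self) -> str: ...

    def check(self, content: str) -> List[str]: ...


class TodoCheck:
    """Matches the protocol structurally; it inherits from nothing."""

    @property
    def name(self) -> str:
        return "todo"

    def check(self, content: str) -> List[str]:
        return ["TODO found"] if "TODO" in content else []


class WrongSignature:
    """Has the right member names but an incompatible check() signature."""

    name = "wrong"

    def check(self) -> None:  # takes no content argument
        return None


class MissingCheck:
    """Lacks check() entirely."""

    name = "missing"


is_todo_ok = isinstance(TodoCheck(), SupportsCheck)          # all members present
is_wrong_ok = isinstance(WrongSignature(), SupportsCheck)    # names match, signature ignored
is_missing_ok = isinstance(MissingCheck(), SupportsCheck)    # check() is absent
```

WrongSignature passes the runtime check even though calling its check() with a context would fail, so a static type checker such as mypy is still worth running on plugin code.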

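Step 2: Implement the plugin registration mechanism

The registry offers two ways to register a check: a decorator and explicit registration: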

from typing import Callable, Dict, List, Optional, Type, TypeVar

# Type variable for check classes
C = TypeVar("C", bound=ValidationCheck)

class CheckRegistry:
    """
    Registry for validation checks

    Supports both decorator registration and explicit registration.
    """

    def __init__(self) -> None:
        self._checks: Dict[str, ValidationCheck] = {}
        self._disabled: set[str] = set()

    def register(self, check: ValidationCheck) -> ValidationCheck:
        """
        Register a check instance

        Args:
            check: Check instance to register

        Returns:
            The same check instance (for chaining)

        Example:
            registry.register(NamingCheck())
        """
        if not isinstance(check, ValidationCheck):
            raise TypeError(
                f"Check must implement ValidationCheck protocol, "
                f"got {type(check).__name__}"
            )
        self._checks[check.name] = check
        return check

    def check(self, name: Optional[str] = None) -> Callable[[Type[C]], Type[C]]:
        """
        Decorator for registering check classes

        Args:
            name: Optional registry key (default: the instance's name property)

        Returns:
            Class decorator

        Example:
            @registry.check()
            class MyCheck:
                ...
        """
        def decorator(cls: Type[C]) -> Type[C]:
            # The class is instantiated with no arguments at decoration time
            instance = cls()
            self._checks[name or instance.name] = instance
            return cls
        return decorator

    def get_check(self, name: str) -> Optional[ValidationCheck]:
        """Get a check by name"""
        return self._checks.get(name)

    def get_enabled_checks(self) -> List[ValidationCheck]:
        """Get all enabled checks"""
        return [
            check for name, check in self._checks.items()
            if name not in self._disabled
        ]

    def enable(self, name: str) -> None:
        """Enable a check"""
        self._disabled.discard(name)

    def disable(self, name: str) -> None:
        """Disable a check"""
        self._disabled.add(name)

    def list_checks(self) -> List[str]:
        """List all registered check names"""
        return list(self._checks.keys())

# Global registry instance
default_registry = CheckRegistry()

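Comparison of the two registration methods

Method | When to use | Pros | Cons
Decorator @registry.check() | At class definition | Concise, declarative | Registers as soon as the module is loaded
Explicit registry.register() | At runtime | More flexible, can be deferred | More verbose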
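
The timing difference in the comparison can be seen in a small sketch. MiniRegistry is a hypothetical, simplified stand-in for CheckRegistry (no protocol validation, no custom names), and EagerCheck and LazyCheck are invented for illustration.

```python
from typing import Callable, Dict, List, Set, Type


class MiniRegistry:
    """Hypothetical, simplified registry keyed by check name."""

    def __init__(self) -> None:
        self._checks: Dict[str, object] = {}
        self._disabled: Set[str] = set()

    def register(self, check: object) -> object:
        self._checks[check.name] = check
        return check

    def check(self) -> Callable[[Type], Type]:
        def decorator(cls: Type) -> Type:
            self.register(cls())  # instantiated while the class statement runs
            return cls
        return decorator

    def disable(self, name: str) -> None:
        self._disabled.add(name)

    def enabled_names(self) -> List[str]:
        return [n for n in self._checks if n not in self._disabled]


registry = MiniRegistry()


@registry.check()
class EagerCheck:
    name = "eager"


class LazyCheck:
    name = "lazy"


# Only the decorated class is known right after the definitions run.
names_after_definition = registry.enabled_names()

# Explicit registration happens whenever the caller decides.
registry.register(LazyCheck())
names_after_register = registry.enabled_names()

# Disabling hides a check without unregistering it.
registry.disable("eager")
names_after_disable = registry.enabled_names()
```

Because the decorator registers as a side effect of importing the module, a decorated check that is never imported is never registered; explicit registration avoids that surprise at the cost of one extra line per check.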

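Step 3: Support automatic discovery via entry_points

entry_points is the standard mechanism in the Python packaging ecosystem that lets external packages register plugins: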

import sys
import warnings
from importlib.metadata import entry_points
from typing import Iterator

def discover_checks(
    group: str = "hook_validator.checks"
) -> Iterator[ValidationCheck]:
    """
    Discover checks from installed packages via entry_points

    Args:
        group: Entry point group name

    Yields:
        Discovered check instances

    Example:
        for check in discover_checks():
            registry.register(check)
    """
    # Python 3.10+ API
    if sys.version_info >= (3, 10):
        eps = entry_points(group=group)
    else:
        # Python 3.9 compatibility
        eps = entry_points().get(group, [])

    for ep in eps:
        try:
            # Load the check class/factory
            check_factory = ep.load()

            # Create instance
            if callable(check_factory):
                check = check_factory()
            else:
                check = check_factory

            # Validate it implements the protocol
            if isinstance(check, ValidationCheck):
                yield check
            else:
                warnings.warn(
                    f"Entry point {ep.name} does not implement "
                    f"ValidationCheck protocol"
                )

        except Exception as e:
            # A broken plugin must not prevent the others from loading
            warnings.warn(f"Failed to load check {ep.name}: {e}")

def load_external_checks(registry: CheckRegistry) -> int:
    """
    Load all external checks into a registry

    Args:
        registry: Target registry

    Returns:
        Number of checks loaded
    """
    count = 0
    for check in discover_checks():
        registry.register(check)
        count += 1
    return count
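
The load-then-instantiate step can be tried without installing any package by building EntryPoint objects by hand. This is only a sketch: the group name demo.checks and the helper instantiate() are hypothetical, and the standard-library targets stand in for real plugin classes.

```python
from collections import OrderedDict
from importlib.metadata import EntryPoint


def instantiate(ep: EntryPoint) -> object:
    """Mirror the discovery logic: call the target if callable, else use it as-is."""
    target = ep.load()  # imports the module and resolves the attribute after ":"
    return target() if callable(target) else target


# "module:attribute" is the same value format used in pyproject.toml.
class_ep = EntryPoint(name="ordered", value="collections:OrderedDict", group="demo.checks")
object_ep = EntryPoint(name="digits", value="string:digits", group="demo.checks")

from_class = instantiate(class_ep)    # a class is callable, so we get an instance
from_object = instantiate(object_ep)  # a plain object is returned unchanged

is_ordered_dict = isinstance(from_class, OrderedDict)
```

Hand-built entry points like these are also a convenient way to unit test discovery code, since no packaging metadata has to exist on disk.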

Step 4: Implement plugin loading and management

The plugin manager ties together registration, discovery, and execution:

import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional, List

@dataclass
class ValidationResult:
    """Validation result for a single hook"""
    hook_path: str
    issues: List[ValidationIssue] = field(default_factory=list)
    is_compliant: bool = True
    checks_run: List[str] = field(default_factory=list)

    def __post_init__(self) -> None:
        """Calculate compliance status"""
        self.is_compliant = not any(
            issue.level == "error" for issue in self.issues
        )

class PluginValidator:
    """
    Plugin-based hook validator

    Uses registered checks to validate hook files.
    """

    def __init__(
        self,
        registry: Optional[CheckRegistry] = None,
        project_root: Optional[str] = None,
        auto_discover: bool = True
    ) -> None:
        """
        Initialize validator

        Args:
            registry: Check registry (default: global registry)
            project_root: Project root directory
            auto_discover: Whether to auto-discover external checks
        """
        self.registry = registry or default_registry
        self.project_root = Path(
            project_root or os.environ.get("CLAUDE_PROJECT_DIR", os.getcwd())
        )

        if auto_discover:
            load_external_checks(self.registry)

    def validate_hook(self, hook_path: str) -> ValidationResult:
        """
        Validate a single hook file

        Args:
            hook_path: Path to hook file

        Returns:
            Validation result
        """
        path = self._resolve_path(hook_path)

        # Check file exists
        if not path.exists():
            return ValidationResult(
                hook_path=str(path),
                issues=[
                    ValidationIssue(
                        level="error",
                        message=f"Hook file not found: {path}"
                    )
                ]
            )

        # Read content
        try:
            content = path.read_text(encoding="utf-8")
        except Exception as e:
            return ValidationResult(
                hook_path=str(path),
                issues=[
                    ValidationIssue(
                        level="error",
                        message=f"Cannot read hook file: {e}"
                    )
                ]
            )

        # Build context
        context = CheckContext(
            hook_path=path,
            content=content,
            project_root=self.project_root
        )

        # Run all enabled checks
        all_issues: List[ValidationIssue] = []
        checks_run: List[str] = []

        for check in self.registry.get_enabled_checks():
            try:
                issues = check.check(context)
                all_issues.extend(issues)
                checks_run.append(check.name)
            except Exception as e:
                # A crashing check is reported as an error issue
                # and does not stop the remaining checks
                all_issues.append(
                    ValidationIssue(
                        level="error",
                        message=f"Check {check.name} failed: {e}"
                    )
                )

        return ValidationResult(
            hook_path=str(path),
            issues=all_issues,
            checks_run=checks_run
        )

    def _resolve_path(self, path: str) -> Path:
        """Resolve path to absolute"""
        p = Path(path)
        if p.is_absolute():
            return p
        return self.project_root / p
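
The try/except around each check is what keeps one faulty plugin from taking down the whole run. The following is a reduced sketch of that loop, not the validator itself: Issue, Result, run_checks and the two check functions are hypothetical names, and checks are plain functions here to keep the example short.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class Issue:
    level: str
    message: str


@dataclass
class Result:
    issues: List[Issue] = field(default_factory=list)
    checks_run: List[str] = field(default_factory=list)

    @property
    def is_compliant(self) -> bool:
        # Only "error" issues make a hook non-compliant
        return not any(i.level == "error" for i in self.issues)


def run_checks(checks: Dict[str, Callable[[str], List[Issue]]], content: str) -> Result:
    result = Result()
    for name, check in checks.items():
        try:
            result.issues.extend(check(content))
            result.checks_run.append(name)
        except Exception as exc:
            # Convert the crash into an error issue and keep going
            result.issues.append(Issue("error", f"Check {name} failed: {exc}"))
    return result


def broken_check(content: str) -> List[Issue]:
    raise ValueError("boom")


def tab_check(content: str) -> List[Issue]:
    return [Issue("warning", "tab found")] if "\t" in content else []


result = run_checks({"broken": broken_check, "tabs": tab_check}, "x\t= 1")
```

Here the broken check is absent from checks_run but still leaves a trace in the issues list, so the result is marked non-compliant rather than silently passing.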

Complete code

#!/usr/bin/env python3
"""
Plugin-based Hook Validator

A validation system using Protocol and registry pattern,
allowing third-party extensions via entry_points.
"""

from __future__ import annotations

import os
import re
import sys
import warnings
from dataclasses import dataclass, field
from importlib.metadata import entry_points
from pathlib import Path
from typing import (
    Callable,
    Dict,
    Iterator,
    List,
    Optional,
    Protocol,
    Type,
    TypeVar,
    runtime_checkable,
)

# ============================================================
# Data Classes
# ============================================================

@dataclass
class ValidationIssue:
    """Validation issue description"""
    level: str  # "error" | "warning" | "info"
    message: str
    line: Optional[int] = None
    suggestion: Optional[str] = None

@dataclass
class CheckContext:
    """Context passed to each check"""
    hook_path: Path
    content: str
    project_root: Path

@dataclass
class ValidationResult:
    """Validation result for a single hook"""
    hook_path: str
    issues: List[ValidationIssue] = field(default_factory=list)
    is_compliant: bool = True
    checks_run: List[str] = field(default_factory=list)

    def __post_init__(self) -> None:
        self.is_compliant = not any(
            issue.level == "error" for issue in self.issues
        )

# ============================================================
# Protocol Definition
# ============================================================

@runtime_checkable
class ValidationCheck(Protocol):
    """Protocol for validation checks"""

    @property
    def name(self) -> str:
        """Check name for identification"""
        ...

    @property
    def description(self) -> str:
        """Human-readable description"""
        ...

    def check(self, context: CheckContext) -> List[ValidationIssue]:
        """Execute the validation check"""
        ...

# ============================================================
# Registry
# ============================================================

C = TypeVar("C", bound=ValidationCheck)

class CheckRegistry:
    """Registry for validation checks"""

    def __init__(self) -> None:
        self._checks: Dict[str, ValidationCheck] = {}
        self._disabled: set[str] = set()

    def register(self, check: ValidationCheck) -> ValidationCheck:
        """Register a check instance"""
        if not isinstance(check, ValidationCheck):
            raise TypeError(
                f"Check must implement ValidationCheck protocol, "
                f"got {type(check).__name__}"
            )
        self._checks[check.name] = check
        return check

    def check(self, name: Optional[str] = None) -> Callable[[Type[C]], Type[C]]:
        """Decorator for registering check classes"""
        def decorator(cls: Type[C]) -> Type[C]:
            instance = cls()
            self._checks[name or instance.name] = instance
            return cls
        return decorator

    def get_check(self, name: str) -> Optional[ValidationCheck]:
        """Get a check by name"""
        return self._checks.get(name)

    def get_enabled_checks(self) -> List[ValidationCheck]:
        """Get all enabled checks"""
        return [
            check for name, check in self._checks.items()
            if name not in self._disabled
        ]

    def enable(self, name: str) -> None:
        """Enable a check"""
        self._disabled.discard(name)

    def disable(self, name: str) -> None:
        """Disable a check"""
        self._disabled.add(name)

    def list_checks(self) -> List[str]:
        """List all registered check names"""
        return list(self._checks.keys())

# Global registry
default_registry = CheckRegistry()

# ============================================================
# Entry Points Discovery
# ============================================================

def discover_checks(
    group: str = "hook_validator.checks"
) -> Iterator[ValidationCheck]:
    """Discover checks from installed packages via entry_points"""
    if sys.version_info >= (3, 10):
        eps = entry_points(group=group)
    else:
        eps = entry_points().get(group, [])

    for ep in eps:
        try:
            check_factory = ep.load()
            check = check_factory() if callable(check_factory) else check_factory

            if isinstance(check, ValidationCheck):
                yield check
            else:
                warnings.warn(
                    f"Entry point {ep.name} does not implement "
                    f"ValidationCheck protocol"
                )
        except Exception as e:
            warnings.warn(f"Failed to load check {ep.name}: {e}")

def load_external_checks(registry: CheckRegistry) -> int:
    """Load all external checks into a registry"""
    count = 0
    for check in discover_checks():
        registry.register(check)
        count += 1
    return count

# ============================================================
# Built-in Checks
# ============================================================

@default_registry.check()
class NamingConventionCheck:
    """Check hook file naming convention"""

    # Lowercase letters and digits, with "-" or "_" allowed in the middle
    VALID_PATTERNS = [
        r"^[a-z0-9]([a-z0-9\-_]*[a-z0-9])?\.py$",
    ]

    @property
    def name(self) -> str:
        return "naming_convention"

    @property
    def description(self) -> str:
        return "Check that hook files follow naming conventions"

    def check(self, context: CheckContext) -> List[ValidationIssue]:
        issues: List[ValidationIssue] = []
        filename = context.hook_path.name

        valid = any(
            re.match(pattern, filename)
            for pattern in self.VALID_PATTERNS
        )

        if not valid:
            issues.append(
                ValidationIssue(
                    level="warning",
                    message=f"Invalid file name: {filename}",
                    suggestion="Use snake_case or kebab-case naming"
                )
            )

        return issues

@default_registry.check()
class LibImportCheck:
    """Check that hooks import required libraries"""

    HOOK_IO_PATTERNS = [
        r"from\s+hook_io\s+import",
        r"from\s+lib\.hook_io\s+import",
    ]

    @property
    def name(self) -> str:
        return "lib_import"

    @property
    def description(self) -> str:
        return "Check that hooks import hook_io module"

    def check(self, context: CheckContext) -> List[ValidationIssue]:
        issues: List[ValidationIssue] = []

        has_import = any(
            re.search(pattern, context.content)
            for pattern in self.HOOK_IO_PATTERNS
        )

        if not has_import:
            issues.append(
                ValidationIssue(
                    level="warning",
                    message="Missing hook_io import",
                    suggestion="Add: from hook_io import read_hook_input, write_hook_output"
                )
            )

        return issues

@default_registry.check()
class OutputFormatCheck:
    """Check hook output format"""

    GOOD_PATTERNS = [
        r"write_hook_output\s*\(",
        r"create_pretooluse_output\s*\(",
        r"create_posttooluse_output\s*\(",
    ]

    BAD_PATTERNS = [
        r'print\s*\(\s*json\.dumps\s*\(',
        r'sys\.stdout\.write\s*\(\s*json\.dumps\s*\(',
    ]

    @property
    def name(self) -> str:
        return "output_format"

    @property
    def description(self) -> str:
        return "Check that hooks use proper output functions"

    def check(self, context: CheckContext) -> List[ValidationIssue]:
        issues: List[ValidationIssue] = []

        has_bad = any(
            re.search(pattern, context.content)
            for pattern in self.BAD_PATTERNS
        )

        if has_bad:
            issues.append(
                ValidationIssue(
                    level="warning",
                    message="Using print(json.dumps(...)) instead of write_hook_output()",
                    suggestion="Use write_hook_output() for proper output formatting"
                )
            )

        return issues

@default_registry.check()
class TestExistsCheck:
    """Check that corresponding test file exists"""

    @property
    def name(self) -> str:
        return "test_exists"

    @property
    def description(self) -> str:
        return "Check that hook has a corresponding test file"

    def check(self, context: CheckContext) -> List[ValidationIssue]:
        issues: List[ValidationIssue] = []

        hook_name = context.hook_path.stem
        test_name = f"test_{hook_name.replace('-', '_')}.py"

        possible_paths = [
            context.project_root / ".claude" / "lib" / "tests" / test_name,
            context.project_root / ".claude" / "hooks" / "tests" / test_name,
        ]

        if not any(p.exists() for p in possible_paths):
            issues.append(
                ValidationIssue(
                    level="info",
                    message=f"No test file found: {test_name}",
                    suggestion=f"Create test at .claude/lib/tests/{test_name}"
                )
            )

        return issues

# ============================================================
# Validator
# ============================================================

class PluginValidator:
    """Plugin-based hook validator"""

    def __init__(
        self,
        registry: Optional[CheckRegistry] = None,
        project_root: Optional[str] = None,
        auto_discover: bool = True
    ) -> None:
        self.registry = registry or default_registry
        self.project_root = Path(
            project_root or os.environ.get("CLAUDE_PROJECT_DIR", os.getcwd())
        )

        if auto_discover:
            load_external_checks(self.registry)

    def validate_hook(self, hook_path: str) -> ValidationResult:
        """Validate a single hook file"""
        path = self._resolve_path(hook_path)

        if not path.exists():
            return ValidationResult(
                hook_path=str(path),
                issues=[
                    ValidationIssue(
                        level="error",
                        message=f"Hook file not found: {path}"
                    )
                ]
            )

        try:
            content = path.read_text(encoding="utf-8")
        except Exception as e:
            return ValidationResult(
                hook_path=str(path),
                issues=[
                    ValidationIssue(
                        level="error",
                        message=f"Cannot read hook file: {e}"
                    )
                ]
            )

        context = CheckContext(
            hook_path=path,
            content=content,
            project_root=self.project_root
        )

        all_issues: List[ValidationIssue] = []
        checks_run: List[str] = []

        for check in self.registry.get_enabled_checks():
            try:
                issues = check.check(context)
                all_issues.extend(issues)
                checks_run.append(check.name)
            except Exception as e:
                all_issues.append(
                    ValidationIssue(
                        level="error",
                        message=f"Check {check.name} failed: {e}"
                    )
                )

        return ValidationResult(
            hook_path=str(path),
            issues=all_issues,
            checks_run=checks_run
        )

    def validate_all_hooks(self, hooks_dir: Optional[str] = None) -> List[ValidationResult]:
        """Validate all hook files in a directory"""
        if hooks_dir is None:
            hooks_dir = str(self.project_root / ".claude" / "hooks")

        hooks_path = self._resolve_path(hooks_dir)

        if not hooks_path.is_dir():
            return [
                ValidationResult(
                    hook_path=str(hooks_path),
                    issues=[
                        ValidationIssue(
                            level="error",
                            message=f"Hooks directory not found: {hooks_path}"
                        )
                    ]
                )
            ]

        results = []
        for hook_file in sorted(hooks_path.glob("*.py")):
            if hook_file.name.startswith("_"):
                continue
            results.append(self.validate_hook(str(hook_file)))

        return results

    def _resolve_path(self, path: str) -> Path:
        """Resolve path to absolute"""
        p = Path(path)
        return p if p.is_absolute() else self.project_root / p

# ============================================================
# Demo
# ============================================================

if __name__ == "__main__":
    import tempfile

    # Show registered checks
    print("Registered checks:")
    for name in default_registry.list_checks():
        check = default_registry.get_check(name)
        if check:
            print(f"  - {name}: {check.description}")

    # Create test hook
    with tempfile.TemporaryDirectory() as tmpdir:
        hooks_dir = Path(tmpdir) / ".claude" / "hooks"
        hooks_dir.mkdir(parents=True)

        # Create a hook file
        hook_file = hooks_dir / "check-permissions.py"
        hook_file.write_text("""
#!/usr/bin/env python3
from hook_io import read_hook_input, write_hook_output

def main():
    data = read_hook_input()
    write_hook_output({"decision": "allow"})

if __name__ == "__main__":
    main()
""")

        # Validate
        validator = PluginValidator(project_root=tmpdir, auto_discover=False)
        result = validator.validate_hook(str(hook_file))

        print(f"\nValidation result for {result.hook_path}:")
        print(f"  Compliant: {result.is_compliant}")
        print(f"  Checks run: {result.checks_run}")
        for issue in result.issues:
            print(f"  [{issue.level}] {issue.message}")

Usage examples

Creating plugins

A custom check only needs to implement the interface defined by the Protocol:

import re
from typing import List

# CheckContext and ValidationIssue come from the validator module shown above

class DocstringCheck:
    """
    Check that hooks have proper docstrings

    A custom check that can be added to the validator.
    No inheritance required - just implement the protocol.
    """

    @property
    def name(self) -> str:
        return "docstring"

    @property
    def description(self) -> str:
        return "Check that hook has a module docstring"

    def check(self, context: CheckContext) -> List[ValidationIssue]:
        issues: List[ValidationIssue] = []

        # Optional shebang line, optional whitespace, then a triple-quoted string
        docstring_pattern = r'^(#!.*\n)?\s*("""|\'\'\')'

        if not re.match(docstring_pattern, context.content):
            issues.append(
                ValidationIssue(
                    level="info",
                    message="Missing module docstring",
                    suggestion='Add a docstring at the top: """Description of hook"""'
                )
            )

        return issues

class SecurityCheck:
    """
    Check for potential security issues

    Looks for dangerous patterns like eval(), exec(), subprocess with shell=True
    """

    DANGEROUS_PATTERNS = [
        (r'\beval\s*\(', "eval() is dangerous, consider alternatives"),
        (r'\bexec\s*\(', "exec() is dangerous, consider alternatives"),
        (r'subprocess.*shell\s*=\s*True', "shell=True is a security risk"),
        (r'os\.system\s*\(', "os.system() is insecure, use subprocess instead"),
    ]

    @property
    def name(self) -> str:
        return "security"

    @property
    def description(self) -> str:
        return "Check for potential security vulnerabilities"

    def check(self, context: CheckContext) -> List[ValidationIssue]:
        issues: List[ValidationIssue] = []

        for pattern, message in self.DANGEROUS_PATTERNS:
            if re.search(pattern, context.content):
                issues.append(
                    ValidationIssue(
                        level="warning",
                        message=message
                    )
                )

        return issues
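
Because a check only reads its CheckContext, it can be exercised without a registry, a validator, or any file on disk. The sketch below assumes the same field layout as the ValidationIssue and CheckContext dataclasses; EvalCheck and make_context() are hypothetical and exist only for this illustration.

```python
import re
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional


@dataclass
class ValidationIssue:
    level: str
    message: str
    line: Optional[int] = None
    suggestion: Optional[str] = None


@dataclass
class CheckContext:
    hook_path: Path
    content: str
    project_root: Path


class EvalCheck:
    """Hypothetical single-purpose check used only for this demo."""

    @property
    def name(self) -> str:
        return "no_eval"

    @property
    def description(self) -> str:
        return "Flag calls to eval()"

    def check(self, context: CheckContext) -> List[ValidationIssue]:
        issues: List[ValidationIssue] = []
        for number, text in enumerate(context.content.splitlines(), start=1):
            if re.search(r"\beval\s*\(", text):
                issues.append(ValidationIssue("warning", "eval() call", line=number))
        return issues


def make_context(content: str) -> CheckContext:
    # No file is created: the check only reads context.content.
    return CheckContext(Path("hooks/demo.py"), content, Path("."))


clean = EvalCheck().check(make_context("x = 1\n"))
flagged = EvalCheck().check(make_context("x = 1\ny = eval('2')\n"))
```

The same pattern drops straight into pytest: build a context from a string, call check(), and assert on the returned issues.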

Registering plugins

There are three ways to register a plugin:

# Method 1: Using decorator (at class definition time)
@default_registry.check()
class MyCheck:
    @property
    def name(self) -> str:
        return "my_check"
    # ...

# Method 2: Explicit registration (at runtime)
docstring_check = DocstringCheck()
security_check = SecurityCheck()

default_registry.register(docstring_check)
default_registry.register(security_check)

# Method 3: Create custom registry
custom_registry = CheckRegistry()
custom_registry.register(DocstringCheck())
custom_registry.register(SecurityCheck())

validator = PluginValidator(registry=custom_registry, auto_discover=False)

Using entry_points

For plugins published as standalone packages, configure entry_points in pyproject.toml:

# pyproject.toml
[project]
name = "my-hook-checks"
version = "1.0.0"
description = "Custom hook validation checks"

dependencies = [
    # hook-validator is the core package
]

[project.entry-points."hook_validator.checks"]
# Format: name = "module:factory_or_class"
docstring = "my_hook_checks:DocstringCheck"
security = "my_hook_checks.security:SecurityCheck"

Plugin module layout:

# my_hook_checks/__init__.py
from .docstring import DocstringCheck
from .security import SecurityCheck

__all__ = ["DocstringCheck", "SecurityCheck"]

# my_hook_checks/docstring.py
class DocstringCheck:
    """Check for module docstrings"""

    @property
    def name(self) -> str:
        return "docstring"

    @property
    def description(self) -> str:
        return "Check that hook has a module docstring"

    def check(self, context):
        # ... implementation ...
        return []

Once the package is installed, PluginValidator discovers and loads these checks automatically:

# Automatic discovery from entry_points
validator = PluginValidator(auto_discover=True)

# Check what's loaded
print(validator.registry.list_checks())
# ['naming_convention', 'lib_import', 'output_format', 'test_exists',
#  'docstring', 'security']  # <- external checks discovered!

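Design trade-offs

Aspect | Hard-coded approach | Plugin architecture
Extensibility | Poor: core code must be modified | Excellent: third parties can extend freely
Initial complexity | Low: write the logic directly | Medium: requires understanding Protocol and the registration mechanism
Maintenance cost | Rises as features are added | Stable: new features do not touch the core
Third-party extension | Not supported | Supported via entry_points
Testing difficulty | Requires mocking the whole class | Easy: each check is tested independently
Runtime flexibility | Fixed | High: checks can be enabled or disabled dynamically
Debugging difficulty | Simple: the code is in one place | Requires tracing where a plugin came from
Performance overhead | | Slight: registration and iteration cost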

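When should you use this technique?

Good fit

  • Tools that must support third-party extension (such as pytest, flake8, pre-commit)
  • Functionality that is clearly modular, with independent checks
  • New features are expected to be added frequently
  • Users need to customize behavior

Not recommended

  • Small tools for internal use
  • Functionality that rarely changes
  • Teams unfamiliar with Protocol and entry_points
  • Performance-critical hot paths

Exercises

Basic exercise: implement a formatting check plugin

Implement an IndentationCheck that verifies a Python file uses consistent indentation (spaces vs. tabs):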

class IndentationCheck:
    """Check for consistent indentation"""

    @property
    def name(self) -> str:
        return "indentation"

    @property
    def description(self) -> str:
        return "Check for consistent indentation style"

    def check(self, context: CheckContext) -> List[ValidationIssue]:
        issues: List[ValidationIssue] = []

        # TODO: Implement
        # 1. Check each line's leading whitespace
        # 2. Detect if mixing tabs and spaces
        # 3. Report as warning if inconsistent

        return issues

Hints:

  • Use context.content.splitlines() to get each line
  • Extract each line's leading whitespace with line[:len(line) - len(line.lstrip())]
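
To see what the slicing expression in the second hint produces, here is a short sketch on a made-up sample; leading_whitespace() is a hypothetical helper name, and the detection logic itself is left for the exercise.

```python
def leading_whitespace(line: str) -> str:
    """Return the run of whitespace at the start of a line."""
    return line[:len(line) - len(line.lstrip())]


# Four lines: no indent, four spaces, one tab, and a space followed by a tab
sample = "def f():\n    a = 1\n\tb = 2\n \tc = 3\n"
prefixes = [leading_whitespace(line) for line in sample.splitlines()]

# A prefix that contains both characters is a mixed-indentation line
mixed_lines = [
    number
    for number, prefix in enumerate(prefixes, start=1)
    if " " in prefix and "\t" in prefix
]
```

For a line that contains only whitespace, lstrip() returns an empty string, so the whole line counts as its prefix; an implementation will probably want to skip such lines.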

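Advanced exercise: add priority and dependencies

Extend the ValidationCheck Protocol to support:

  1. Priority: determines the order in which checks run
  2. Dependencies: some checks must run after certain other checks
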
@runtime_checkable
class ValidationCheck(Protocol):
    @property
    def name(self) -> str: ...

    @property
    def description(self) -> str: ...

    @property
    def priority(self) -> int:
        """Lower number = higher priority (default: 100)"""
        return 100

    @property
    def depends_on(self) -> List[str]:
        """Names of checks that must run before this one"""
        return []

    def check(self, context: CheckContext) -> List[ValidationIssue]: ...

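Then modify PluginValidator.validate_hook() to run the checks in topologically sorted order. Note that the default bodies of priority and depends_on only take effect for classes that explicitly subclass ValidationCheck; a class that matches the Protocol structurally must define both members itself, otherwise the isinstance() check in CheckRegistry.register() rejects it.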
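
One possible starting point for the ordering step is the standard library's graphlib module (Python 3.9+). This is a sketch under stated assumptions, not a reference solution: the CHECKS table and execution_order() are hypothetical, and priority is applied only among the checks that become ready at the same time.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Tuple

# name -> (priority, depends_on); hypothetical checks for illustration
CHECKS: Dict[str, Tuple[int, List[str]]] = {
    "security": (10, ["syntax"]),
    "style": (50, ["syntax"]),
    "syntax": (100, []),
    "docs": (20, []),
}


def execution_order(checks: Dict[str, Tuple[int, List[str]]]) -> List[str]:
    """Order checks so dependencies run first, lower priority number first within a batch."""
    # TopologicalSorter expects a mapping of node -> predecessors
    sorter = TopologicalSorter({name: deps for name, (_, deps) in checks.items()})
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies

    order: List[str] = []
    while sorter.is_active():
        # Everything returned by get_ready() has all its dependencies satisfied
        ready = sorted(sorter.get_ready(), key=lambda n: (checks[n][0], n))
        order.extend(ready)
        sorter.done(*ready)
    return order


order = execution_order(CHECKS)
```

In this sample, syntax runs before security even though security has the smaller priority number, because a dependency always wins over priority.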

Challenge: implement hot reloading of plugins

Implement the ability to load new plugins without restarting the program:

class HotReloadableRegistry(CheckRegistry):
    """Registry that supports hot-reloading plugins"""

    def watch_directory(self, path: str) -> None:
        """Watch a directory for new plugin files"""
        # TODO: Use watchdog or polling to detect new files
        pass

    def reload_check(self, name: str) -> None:
        """Reload a specific check"""
        # TODO: Unload old module, load new version
        pass

Hints:

  • Use importlib.reload() to reload a module
  • Use the watchdog package to monitor file changes
  • Take care with the module cache in sys.modules
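
The first and third hints can be tried in isolation before wiring them into a registry. The sketch below writes a throwaway module to a temporary directory, imports it, rewrites it, and reloads it; the module name demo_hot_plugin and its CHECK_NAME attribute are hypothetical, and file watching is left out.

```python
import importlib
import sys
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    plugin_file = Path(tmp) / "demo_hot_plugin.py"  # hypothetical module name
    plugin_file.write_text("CHECK_NAME = 'v1'\n", encoding="utf-8")

    sys.path.insert(0, tmp)
    try:
        importlib.invalidate_caches()
        module = importlib.import_module("demo_hot_plugin")
        before = module.CHECK_NAME

        # The new source differs in length, so a stale bytecode cache
        # cannot be mistaken for an up-to-date one.
        plugin_file.write_text("CHECK_NAME = 'version-2'\n", encoding="utf-8")
        importlib.invalidate_caches()
        module = importlib.reload(module)  # re-executes the module's code in place
        after = module.CHECK_NAME
    finally:
        # Leave the interpreter as we found it
        sys.path.remove(tmp)
        sys.modules.pop("demo_hot_plugin", None)
```

Reloading re-executes the module, but check instances created from the old class objects keep their old behavior, so a reload_check() implementation would also need to instantiate the new class and register it again.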

Further reading

