案例:Rust 正則表達式
案例:Rust 正則表達式
為什麼用
本案例基於 .claude/lib/hook_validator.py 的實際程式碼,展示如何用 Rust 的 regex crate 加速模式匹配。
先備知識
問題背景
現有設計
hook_validator.py 使用 Python 的 re 模組進行多種模式匹配驗證:
1import re
2from typing import List, Optional
3from pathlib import Path
4
5class HookValidator:
6 """Hook 合規性驗證器"""
7
8 # Pattern definitions for various validation checks
9 HOOK_IO_PATTERNS = [
10 r"from\s+hook_io\s+import",
11 r"from\s+lib\.hook_io\s+import",
12 ]
13
14 HOOK_LOGGING_PATTERNS = [
15 r"from\s+hook_logging\s+import",
16 r"from\s+lib\.hook_logging\s+import",
17 ]
18
19 CONFIG_LOADER_PATTERNS = [
20 r"from\s+config_loader\s+import",
21 r"from\s+lib\.config_loader\s+import",
22 ]
23
24 GIT_UTILS_PATTERNS = [
25 r"from\s+git_utils\s+import",
26 r"from\s+lib\.git_utils\s+import",
27 ]
28
29 OUTPUT_PATTERNS = [
30 r"write_hook_output\s*\(",
31 r"create_pretooluse_output\s*\(",
32 r"create_posttooluse_output\s*\(",
33 ]
34
35 BAD_OUTPUT_PATTERNS = [
36 r'print\s*\(\s*json\.dumps\s*\(',
37 r'sys\.stdout\.write\s*\(\s*json\.dumps\s*\(',
38 ]
39
40 VALID_NAME_PATTERNS = [
41 r"^[a-z0-9]([a-z0-9\-_]*[a-z0-9])?\.py$",
42 ]
43
44 def _has_import(self, content: str, patterns: List[str]) -> bool:
45 """Check if content matches any of the import patterns"""
46 return any(
47 re.search(pattern, content)
48 for pattern in patterns
49 )
50
51 def _matches_pattern(self, content: str, patterns: List[str]) -> bool:
52 """Check if content matches any pattern"""
53 return any(
54 re.search(pattern, content)
55 for pattern in patterns
56 )
57
58 def check_naming_convention(self, hook_path: Path) -> List[dict]:
59 """Validate file naming convention"""
60 filename = hook_path.name
61 valid_name = any(
62 re.match(pattern, filename)
63 for pattern in self.VALID_NAME_PATTERNS
64 )
65 # ... validation logic
這段程式碼展示了幾個核心問題:
- 重複編譯:每次呼叫 re.search() 或 re.match() 都要以模式字串查詢 re 模組的內部編譯快取,快取未命中時還會重新編譯正則表達式
- 多模式匹配:需要遍歷多個模式逐一檢查
- 混合使用場景:部分用於 match(從頭匹配),部分用於 search(任意位置)
效能限制
Python re 模組的限制:
| 限制 | 說明 | 影響 |
|---|---|---|
| 回溯型引擎 | NFA with backtracking | 某些模式可能導致指數級時間複雜度 |
| 解釋器開銷 | 每次匹配都經過 Python 呼叫 | 大量匹配時累積顯著延遲 |
| 無硬體加速 | 純軟體實作 | 無法利用 SIMD 等現代 CPU 特性 |
| GIL 限制 | 受 Global Interpreter Lock 影響 | 多執行緒場景效能受限 |
病態輸入示例
1import re
2import time
3
4# Pathological pattern: catastrophic backtracking
5pattern = r"(a+)+b"
6text = "a" * 25 + "c" # No match, triggers backtracking
7
8start = time.time()
9re.search(pattern, text)
10elapsed = time.time() - start
11print(f"Python re: {elapsed:.2f}s") # May take several seconds!
進階解決方案
設計目標
- 用 Rust regex crate 取代 Python re
- 利用 Rust regex 以有限自動機(lazy DFA 與 NFA 模擬)為基礎、不做回溯的引擎,確保匹配時間與輸入長度呈線性關係
- 使用 RegexSet 實現高效批次驗證
- 預編譯正則表達式,避免重複編譯開銷
實作步驟
步驟 1:建立專案結構
1# Create new maturin project
2maturin new hook_validator_rs
3cd hook_validator_rs
4
5# Project structure
6hook_validator_rs/
7├── Cargo.toml
8├── pyproject.toml
9└── src/
10 └── lib.rs
編輯 Cargo.toml:
1[package]
2name = "hook_validator_rs"
3version = "0.1.0"
4edition = "2021"
5
6[lib]
7name = "hook_validator_rs"
8crate-type = ["cdylib"]
9
10[dependencies]
11pyo3 = { version = "0.22", features = ["extension-module"] }
12regex = "1.10"
13once_cell = "1.19"
步驟 2:定義預編譯正則表達式
使用 once_cell::sync::Lazy 實現執行緒安全的延遲初始化:
1use once_cell::sync::Lazy;
2use regex::{Regex, RegexSet};
3
4// Pre-compiled individual patterns
5static HOOK_IO_REGEX: Lazy<RegexSet> = Lazy::new(|| {
6 RegexSet::new([
7 r"from\s+hook_io\s+import",
8 r"from\s+lib\.hook_io\s+import",
9 ]).expect("Invalid regex pattern")
10});
11
12static HOOK_LOGGING_REGEX: Lazy<RegexSet> = Lazy::new(|| {
13 RegexSet::new([
14 r"from\s+hook_logging\s+import",
15 r"from\s+lib\.hook_logging\s+import",
16 ]).expect("Invalid regex pattern")
17});
18
19static CONFIG_LOADER_REGEX: Lazy<RegexSet> = Lazy::new(|| {
20 RegexSet::new([
21 r"from\s+config_loader\s+import",
22 r"from\s+lib\.config_loader\s+import",
23 ]).expect("Invalid regex pattern")
24});
25
26static GIT_UTILS_REGEX: Lazy<RegexSet> = Lazy::new(|| {
27 RegexSet::new([
28 r"from\s+git_utils\s+import",
29 r"from\s+lib\.git_utils\s+import",
30 ]).expect("Invalid regex pattern")
31});
32
33static OUTPUT_REGEX: Lazy<RegexSet> = Lazy::new(|| {
34 RegexSet::new([
35 r"write_hook_output\s*\(",
36 r"create_pretooluse_output\s*\(",
37 r"create_posttooluse_output\s*\(",
38 ]).expect("Invalid regex pattern")
39});
40
41static BAD_OUTPUT_REGEX: Lazy<RegexSet> = Lazy::new(|| {
42 RegexSet::new([
43 r"print\s*\(\s*json\.dumps\s*\(",
44 r"sys\.stdout\.write\s*\(\s*json\.dumps\s*\(",
45 ]).expect("Invalid regex pattern")
46});
47
48// For filename validation (anchored match)
49static VALID_NAME_REGEX: Lazy<Regex> = Lazy::new(|| {
50 Regex::new(r"^[a-z0-9]([a-z0-9\-_]*[a-z0-9])?\.py$")
51 .expect("Invalid regex pattern")
52});
為什麼用 once_cell::sync::Lazy?
- 執行緒安全:Lazy 確保初始化只執行一次,即使多執行緒同時存取
- 延遲初始化:只在第一次使用時編譯正則表達式
- 近乎零執行時開銷:初始化完成後,每次存取只需一次原子讀取來確認已初始化,不會重新編譯正則表達式
步驟 3:實作批次匹配邏輯
1use pyo3::prelude::*;
2use std::collections::HashMap;
3
4/// Result of validating import patterns in source code
5#[pyclass]
6#[derive(Clone)]
7pub struct ImportCheckResult {
8 #[pyo3(get)]
9 pub has_hook_io: bool,
10 #[pyo3(get)]
11 pub has_hook_logging: bool,
12 #[pyo3(get)]
13 pub has_config_loader: bool,
14 #[pyo3(get)]
15 pub has_git_utils: bool,
16 #[pyo3(get)]
17 pub has_good_output: bool,
18 #[pyo3(get)]
19 pub has_bad_output: bool,
20}
21
22#[pymethods]
23impl ImportCheckResult {
24 fn __repr__(&self) -> String {
25 format!(
26 "ImportCheckResult(hook_io={}, logging={}, config={}, git={}, good_out={}, bad_out={})",
27 self.has_hook_io, self.has_hook_logging,
28 self.has_config_loader, self.has_git_utils,
29 self.has_good_output, self.has_bad_output
30 )
31 }
32}
33
34/// Check every import pattern group against the content in a single call
35#[pyfunction]
36pub fn check_imports(content: &str) -> ImportCheckResult {
37 ImportCheckResult {
38 has_hook_io: HOOK_IO_REGEX.is_match(content),
39 has_hook_logging: HOOK_LOGGING_REGEX.is_match(content),
40 has_config_loader: CONFIG_LOADER_REGEX.is_match(content),
41 has_git_utils: GIT_UTILS_REGEX.is_match(content),
42 has_good_output: OUTPUT_REGEX.is_match(content),
43 has_bad_output: BAD_OUTPUT_REGEX.is_match(content),
44 }
45}
46
47/// Validate filename against naming convention
48#[pyfunction]
49pub fn is_valid_hook_name(filename: &str) -> bool {
50 VALID_NAME_REGEX.is_match(filename)
51}
52
53/// Check which specific patterns matched (for detailed reporting)
54#[pyfunction]
55pub fn get_matched_patterns(content: &str, pattern_group: &str) -> Vec<usize> {
56 let regex_set = match pattern_group {
57 "hook_io" => &*HOOK_IO_REGEX,
58 "hook_logging" => &*HOOK_LOGGING_REGEX,
59 "config_loader" => &*CONFIG_LOADER_REGEX,
60 "git_utils" => &*GIT_UTILS_REGEX,
61 "output" => &*OUTPUT_REGEX,
62 "bad_output" => &*BAD_OUTPUT_REGEX,
63 _ => return vec![],
64 };
65
66 regex_set.matches(content).iter().collect()
67}
步驟 4:進階批次驗證 API
對於需要一次驗證大量檔案的場景,提供更高效的批次 API:
1/// Batch validation result for multiple files
2#[pyclass]
3#[derive(Clone)]
4pub struct BatchValidationResult {
5 #[pyo3(get)]
6 pub results: HashMap<String, ImportCheckResult>,
7 #[pyo3(get)]
8 pub valid_names: HashMap<String, bool>,
9}
10
11#[pymethods]
12impl BatchValidationResult {
13 fn __repr__(&self) -> String {
14 format!("BatchValidationResult({} files)", self.results.len())
15 }
16
17 /// Get files that are missing hook_io import
18 fn files_missing_hook_io(&self) -> Vec<String> {
19 self.results
20 .iter()
21 .filter(|(_, r)| !r.has_hook_io)
22 .map(|(path, _)| path.clone())
23 .collect()
24 }
25
26 /// Get files with bad output patterns
27 fn files_with_bad_output(&self) -> Vec<String> {
28 self.results
29 .iter()
30 .filter(|(_, r)| r.has_bad_output)
31 .map(|(path, _)| path.clone())
32 .collect()
33 }
34}
35
36/// Validate multiple files in batch
37///
38/// This is more efficient than calling check_imports for each file from Python
39/// because the whole batch crosses the Python/Rust boundary in a single call.
40#[pyfunction]
41pub fn validate_batch(files: HashMap<String, String>) -> BatchValidationResult {
42 let results: HashMap<String, ImportCheckResult> = files
43 .iter()
44 .map(|(path, content)| (path.clone(), check_imports(content)))
45 .collect();
46
47 let valid_names: HashMap<String, bool> = files
48 .keys()
49 .map(|path| {
50 let filename = path.rsplit('/').next().unwrap_or(path);
51 (path.clone(), is_valid_hook_name(filename))
52 })
53 .collect();
54
55 BatchValidationResult { results, valid_names }
56}
步驟 5:PyO3 模組導出
1/// Rust-powered hook validator with pre-compiled regex patterns
2#[pymodule]
3fn hook_validator_rs(m: &Bound<'_, PyModule>) -> PyResult<()> {
4 m.add_function(wrap_pyfunction!(check_imports, m)?)?;
5 m.add_function(wrap_pyfunction!(is_valid_hook_name, m)?)?;
6 m.add_function(wrap_pyfunction!(get_matched_patterns, m)?)?;
7 m.add_function(wrap_pyfunction!(validate_batch, m)?)?;
8 m.add_class::<ImportCheckResult>()?;
9 m.add_class::<BatchValidationResult>()?;
10 Ok(())
11}
步驟 6:Python 端整合
在 Python 端無縫整合 Rust 模組:
1"""
2Hook 合規性驗證工具(Rust 加速版)
3
4This module provides a drop-in replacement for the pure Python
5hook_validator, using Rust regex crate for pattern matching.
6"""
7
8from pathlib import Path
9from typing import List, Optional
10from dataclasses import dataclass, field
11
12# Try to import Rust extension, fall back to pure Python
13try:
14 import hook_validator_rs as _rs
15 _USE_RUST = True
16except ImportError:
17 import re
18 _USE_RUST = False
19 print("Warning: Rust extension not available, using pure Python")
20
21@dataclass
22class ValidationIssue:
23 """Validation issue description"""
24 level: str # "error" | "warning" | "info"
25 message: str
26 line: Optional[int] = None
27 suggestion: Optional[str] = None
28
29@dataclass
30class ValidationResult:
31 """Validation result for a single hook"""
32 hook_path: str
33 issues: List[ValidationIssue] = field(default_factory=list)
34 is_compliant: bool = True
35
36 def __post_init__(self):
37 self.is_compliant = not any(
38 issue.level == "error" for issue in self.issues
39 )
40
41class HookValidator:
42 """Hook compliance validator with optional Rust acceleration"""
43
44 def __init__(self, project_root: Optional[str] = None):
45 if project_root is None:
46 import os
47 project_root = os.environ.get("CLAUDE_PROJECT_DIR", os.getcwd())
48 self.project_root = Path(project_root)
49
50 def check_lib_imports(
51 self,
52 content: str,
53 hook_path: Optional[Path] = None
54 ) -> List[ValidationIssue]:
55 """Check shared module imports using Rust regex"""
56 issues = []
57
58 if _USE_RUST:
59 # Use Rust-accelerated pattern matching
60 result = _rs.check_imports(content)
61
62 if not result.has_hook_io:
63 issues.append(ValidationIssue(
64 level="warning",
65 message="Missing hook_io import",
66 suggestion="Add: from hook_io import read_hook_input, write_hook_output"
67 ))
68
69 if not result.has_hook_logging:
70 issues.append(ValidationIssue(
71 level="info",
72 message="Missing hook_logging import (recommended)",
73 suggestion="Add: from hook_logging import setup_hook_logging"
74 ))
75
76 if result.has_bad_output:
77 issues.append(ValidationIssue(
78 level="warning",
79 message="Using print(json.dumps(...)) instead of write_hook_output()",
80 suggestion="Replace with: write_hook_output(output_dict)"
81 ))
82 else:
83 # Fallback to pure Python regex
84 issues.extend(self._check_imports_python(content, hook_path))
85
86 return issues
87
88 def check_naming_convention(self, hook_path: Path) -> List[ValidationIssue]:
89 """Validate filename against naming convention"""
90 issues = []
91 filename = hook_path.name
92
93 if _USE_RUST:
94 valid = _rs.is_valid_hook_name(filename)
95 else:
96 import re
97 valid = bool(re.match(
98 r"^[a-z0-9]([a-z0-9\-_]*[a-z0-9])?\.py$",
99 filename
100 ))
101
102 if not valid:
103 issues.append(ValidationIssue(
104 level="warning",
105 message=f"Invalid filename: {filename}",
106 suggestion="Use snake-case or kebab-case: check_permissions.py"
107 ))
108
109 return issues
110
111 def validate_hook(self, hook_path: str) -> ValidationResult:
112 """Validate a single hook file"""
113 path = self._resolve_path(hook_path)
114
115 if not path.exists():
116 return ValidationResult(
117 hook_path=str(path),
118 issues=[ValidationIssue(
119 level="error",
120 message=f"Hook file not found: {path}"
121 )]
122 )
123
124 content = path.read_text(encoding="utf-8")
125 issues = []
126 issues.extend(self.check_naming_convention(path))
127 issues.extend(self.check_lib_imports(content, path))
128
129 return ValidationResult(hook_path=str(path), issues=issues)
130
131 def validate_all_hooks(
132 self,
133 hooks_dir: Optional[str] = None
134 ) -> List[ValidationResult]:
135 """Validate all hooks with batch optimization"""
136 if hooks_dir is None:
137 hooks_dir = str(self.project_root / ".claude" / "hooks")
138
139 hooks_path = self._resolve_path(hooks_dir)
140 hook_files = list(hooks_path.glob("*.py"))
141
142 if _USE_RUST and len(hook_files) > 1:
143 # Use batch validation for multiple files
144 files_content = {
145 str(f): f.read_text(encoding="utf-8")
146 for f in hook_files
147 if not f.name.startswith("_")
148 }
149
150 batch_result = _rs.validate_batch(files_content)
151
152 results = []
153 for path, content in files_content.items():
154 import_result = batch_result.results[path]
155 valid_name = batch_result.valid_names[path]
156
157 issues = self._import_result_to_issues(import_result, valid_name)
158 results.append(ValidationResult(hook_path=path, issues=issues))
159
160 return results
161 else:
162 # Single file or no Rust: use standard validation
163 return [
164 self.validate_hook(str(f))
165 for f in hook_files
166 if not f.name.startswith("_")
167 ]
168
169 def _resolve_path(self, path: str) -> Path:
170 p = Path(path)
171 return p if p.is_absolute() else self.project_root / p
172
173 def _import_result_to_issues(
174 self,
175 result,
176 valid_name: bool
177 ) -> List[ValidationIssue]:
178 """Convert Rust ImportCheckResult to list of issues"""
179 issues = []
180
181 if not valid_name:
182 issues.append(ValidationIssue(
183 level="warning",
184 message="Invalid filename format"
185 ))
186
187 if not result.has_hook_io:
188 issues.append(ValidationIssue(
189 level="warning",
190 message="Missing hook_io import"
191 ))
192
193 if result.has_bad_output:
194 issues.append(ValidationIssue(
195 level="warning",
196 message="Using deprecated output pattern"
197 ))
198
199 return issues
200
201 def _check_imports_python(
202 self,
203 content: str,
204 hook_path: Optional[Path]
205 ) -> List[ValidationIssue]:
206 """Pure Python fallback for import checking"""
207 import re
208 issues = []
209
210 hook_io_patterns = [
211 r"from\s+hook_io\s+import",
212 r"from\s+lib\.hook_io\s+import",
213 ]
214
215 if not any(re.search(p, content) for p in hook_io_patterns):
216 issues.append(ValidationIssue(
217 level="warning",
218 message="Missing hook_io import"
219 ))
220
221 return issues
完整程式碼
以下是完整的 src/lib.rs:
1//! Hook Validator - Rust regex acceleration for Python hook validation
2//!
3//! This module provides pre-compiled regex patterns for validating
4//! Claude Code hook files, with significant performance improvements
5//! over pure Python regex.
6
7use once_cell::sync::Lazy;
8use pyo3::prelude::*;
9use regex::{Regex, RegexSet};
10use std::collections::HashMap;
11
12// ============================================================================
13// Pre-compiled Regex Patterns
14// ============================================================================
15
16/// Import patterns for hook_io module
17static HOOK_IO_REGEX: Lazy<RegexSet> = Lazy::new(|| {
18 RegexSet::new([
19 r"from\s+hook_io\s+import",
20 r"from\s+lib\.hook_io\s+import",
21 ])
22 .expect("Invalid HOOK_IO_REGEX pattern")
23});
24
25/// Import patterns for hook_logging module
26static HOOK_LOGGING_REGEX: Lazy<RegexSet> = Lazy::new(|| {
27 RegexSet::new([
28 r"from\s+hook_logging\s+import",
29 r"from\s+lib\.hook_logging\s+import",
30 ])
31 .expect("Invalid HOOK_LOGGING_REGEX pattern")
32});
33
34/// Import patterns for config_loader module
35static CONFIG_LOADER_REGEX: Lazy<RegexSet> = Lazy::new(|| {
36 RegexSet::new([
37 r"from\s+config_loader\s+import",
38 r"from\s+lib\.config_loader\s+import",
39 ])
40 .expect("Invalid CONFIG_LOADER_REGEX pattern")
41});
42
43/// Import patterns for git_utils module
44static GIT_UTILS_REGEX: Lazy<RegexSet> = Lazy::new(|| {
45 RegexSet::new([
46 r"from\s+git_utils\s+import",
47 r"from\s+lib\.git_utils\s+import",
48 ])
49 .expect("Invalid GIT_UTILS_REGEX pattern")
50});
51
52/// Recommended output function patterns
53static OUTPUT_REGEX: Lazy<RegexSet> = Lazy::new(|| {
54 RegexSet::new([
55 r"write_hook_output\s*\(",
56 r"create_pretooluse_output\s*\(",
57 r"create_posttooluse_output\s*\(",
58 ])
59 .expect("Invalid OUTPUT_REGEX pattern")
60});
61
62/// Deprecated output patterns (should be avoided)
63static BAD_OUTPUT_REGEX: Lazy<RegexSet> = Lazy::new(|| {
64 RegexSet::new([
65 r"print\s*\(\s*json\.dumps\s*\(",
66 r"sys\.stdout\.write\s*\(\s*json\.dumps\s*\(",
67 ])
68 .expect("Invalid BAD_OUTPUT_REGEX pattern")
69});
70
71/// Valid hook filename pattern (anchored)
72static VALID_NAME_REGEX: Lazy<Regex> = Lazy::new(|| {
73 Regex::new(r"^[a-z0-9]([a-z0-9\-_]*[a-z0-9])?\.py$")
74 .expect("Invalid VALID_NAME_REGEX pattern")
75});
76
77/// JSON output detection patterns
78static JSON_OUTPUT_REGEX: Lazy<RegexSet> = Lazy::new(|| {
79 RegexSet::new([
80 r"json\.dumps",
81 r"write_hook_output",
82 r"create_.*_output",
83 ])
84 .expect("Invalid JSON_OUTPUT_REGEX pattern")
85});
86
87// ============================================================================
88// Result Types
89// ============================================================================
90
91/// Result of checking import patterns in source code
92#[pyclass]
93#[derive(Clone, Debug)]
94pub struct ImportCheckResult {
95 #[pyo3(get)]
96 pub has_hook_io: bool,
97 #[pyo3(get)]
98 pub has_hook_logging: bool,
99 #[pyo3(get)]
100 pub has_config_loader: bool,
101 #[pyo3(get)]
102 pub has_git_utils: bool,
103 #[pyo3(get)]
104 pub has_good_output: bool,
105 #[pyo3(get)]
106 pub has_bad_output: bool,
107 #[pyo3(get)]
108 pub has_json_output: bool,
109}
110
111#[pymethods]
112impl ImportCheckResult {
113 fn __repr__(&self) -> String {
114 format!(
115 "ImportCheckResult(hook_io={}, logging={}, config={}, git={}, \
116 good_out={}, bad_out={}, json_out={})",
117 self.has_hook_io,
118 self.has_hook_logging,
119 self.has_config_loader,
120 self.has_git_utils,
121 self.has_good_output,
122 self.has_bad_output,
123 self.has_json_output
124 )
125 }
126
127 /// Check if the hook uses recommended output patterns
128 fn uses_recommended_output(&self) -> bool {
129 self.has_good_output && !self.has_bad_output
130 }
131
132 /// Check if the hook has all required imports
133 fn has_required_imports(&self) -> bool {
134 self.has_hook_io
135 }
136}
137
138/// Batch validation result for multiple files
139#[pyclass]
140#[derive(Clone, Debug)]
141pub struct BatchValidationResult {
142 #[pyo3(get)]
143 pub results: HashMap<String, ImportCheckResult>,
144 #[pyo3(get)]
145 pub valid_names: HashMap<String, bool>,
146}
147
148#[pymethods]
149impl BatchValidationResult {
150 fn __repr__(&self) -> String {
151 format!("BatchValidationResult({} files)", self.results.len())
152 }
153
154 /// Get list of files missing hook_io import
155 fn files_missing_hook_io(&self) -> Vec<String> {
156 self.results
157 .iter()
158 .filter(|(_, r)| !r.has_hook_io)
159 .map(|(path, _)| path.clone())
160 .collect()
161 }
162
163 /// Get list of files using bad output patterns
164 fn files_with_bad_output(&self) -> Vec<String> {
165 self.results
166 .iter()
167 .filter(|(_, r)| r.has_bad_output)
168 .map(|(path, _)| path.clone())
169 .collect()
170 }
171
172 /// Get list of files with invalid names
173 fn files_with_invalid_names(&self) -> Vec<String> {
174 self.valid_names
175 .iter()
176 .filter(|(_, valid)| !*valid)
177 .map(|(path, _)| path.clone())
178 .collect()
179 }
180
181 /// Get summary statistics
182 fn summary(&self) -> HashMap<String, usize> {
183 let mut stats = HashMap::new();
184 stats.insert("total".to_string(), self.results.len());
185 stats.insert(
186 "missing_hook_io".to_string(),
187 self.files_missing_hook_io().len(),
188 );
189 stats.insert(
190 "bad_output".to_string(),
191 self.files_with_bad_output().len(),
192 );
193 stats.insert(
194 "invalid_names".to_string(),
195 self.files_with_invalid_names().len(),
196 );
197 stats
198 }
199}
200
201// ============================================================================
202// Public API Functions
203// ============================================================================
204
205/// Check all import patterns in source code
206///
207/// This function runs every pre-compiled pattern group against the content
208/// in one call, so Python crosses the FFI boundary only once per file.
209///
210/// # Arguments
211/// * `content` - The source code content to check
212///
213/// # Returns
214/// * `ImportCheckResult` - Results of all pattern checks
215#[pyfunction]
216pub fn check_imports(content: &str) -> ImportCheckResult {
217 ImportCheckResult {
218 has_hook_io: HOOK_IO_REGEX.is_match(content),
219 has_hook_logging: HOOK_LOGGING_REGEX.is_match(content),
220 has_config_loader: CONFIG_LOADER_REGEX.is_match(content),
221 has_git_utils: GIT_UTILS_REGEX.is_match(content),
222 has_good_output: OUTPUT_REGEX.is_match(content),
223 has_bad_output: BAD_OUTPUT_REGEX.is_match(content),
224 has_json_output: JSON_OUTPUT_REGEX.is_match(content),
225 }
226}
227
228/// Validate filename against naming convention
229///
230/// Valid names must:
231/// - Start and end with lowercase alphanumeric
232/// - Contain only lowercase letters, numbers, hyphens, underscores
233/// - Have .py extension
234///
235/// # Arguments
236/// * `filename` - The filename to validate (just the name, not full path)
237///
238/// # Returns
239/// * `bool` - True if the filename is valid
240#[pyfunction]
241pub fn is_valid_hook_name(filename: &str) -> bool {
242 VALID_NAME_REGEX.is_match(filename)
243}
244
245/// Get indices of matched patterns in a pattern group
246///
247/// Useful for detailed reporting of which specific patterns matched.
248///
249/// # Arguments
250/// * `content` - The source code content to check
251/// * `pattern_group` - One of: "hook_io", "hook_logging", "config_loader",
252/// "git_utils", "output", "bad_output", "json_output"
253///
254/// # Returns
255/// * `Vec<usize>` - Indices of patterns that matched
256#[pyfunction]
257pub fn get_matched_patterns(content: &str, pattern_group: &str) -> Vec<usize> {
258 let regex_set: &RegexSet = match pattern_group {
259 "hook_io" => &HOOK_IO_REGEX,
260 "hook_logging" => &HOOK_LOGGING_REGEX,
261 "config_loader" => &CONFIG_LOADER_REGEX,
262 "git_utils" => &GIT_UTILS_REGEX,
263 "output" => &OUTPUT_REGEX,
264 "bad_output" => &BAD_OUTPUT_REGEX,
265 "json_output" => &JSON_OUTPUT_REGEX,
266 _ => return vec![],
267 };
268
269 regex_set.matches(content).iter().collect()
270}
271
272/// Validate multiple files in a single batch operation
273///
274/// This is significantly more efficient than validating files one by one,
275/// especially when dealing with many files.
276///
277/// # Arguments
278/// * `files` - HashMap of file paths to their contents
279///
280/// # Returns
281/// * `BatchValidationResult` - Combined results for all files
282#[pyfunction]
283pub fn validate_batch(files: HashMap<String, String>) -> BatchValidationResult {
284 let results: HashMap<String, ImportCheckResult> = files
285 .iter()
286 .map(|(path, content)| (path.clone(), check_imports(content)))
287 .collect();
288
289 let valid_names: HashMap<String, bool> = files
290 .keys()
291 .map(|path| {
292 // Extract filename from path
293 let filename = path.rsplit('/').next().unwrap_or(path);
294 (path.clone(), is_valid_hook_name(filename))
295 })
296 .collect();
297
298 BatchValidationResult {
299 results,
300 valid_names,
301 }
302}
303
304/// Check if content contains specific import pattern (simple check)
305///
306/// # Arguments
307/// * `content` - The source code to check
308/// * `module_name` - The module to check for: "hook_io", "hook_logging", etc.
309///
310/// # Returns
311/// * `bool` - True if the import pattern is found
312#[pyfunction]
313pub fn has_import(content: &str, module_name: &str) -> bool {
314 match module_name {
315 "hook_io" => HOOK_IO_REGEX.is_match(content),
316 "hook_logging" => HOOK_LOGGING_REGEX.is_match(content),
317 "config_loader" => CONFIG_LOADER_REGEX.is_match(content),
318 "git_utils" => GIT_UTILS_REGEX.is_match(content),
319 _ => false,
320 }
321}
322
323// ============================================================================
324// Python Module Definition
325// ============================================================================
326
327/// Rust-accelerated hook validator with pre-compiled regex patterns
328///
329/// This module provides significant performance improvements over pure Python
330/// regex for validating Claude Code hook files. Key features:
331///
332/// - Pre-compiled regex patterns using once_cell
333/// - RegexSet for efficient multi-pattern matching
334/// - Batch validation API for multiple files
335/// - Guaranteed linear time complexity (DFA engine)
336#[pymodule]
337fn hook_validator_rs(m: &Bound<'_, PyModule>) -> PyResult<()> {
338 m.add_function(wrap_pyfunction!(check_imports, m)?)?;
339 m.add_function(wrap_pyfunction!(is_valid_hook_name, m)?)?;
340 m.add_function(wrap_pyfunction!(get_matched_patterns, m)?)?;
341 m.add_function(wrap_pyfunction!(validate_batch, m)?)?;
342 m.add_function(wrap_pyfunction!(has_import, m)?)?;
343 m.add_class::<ImportCheckResult>()?;
344 m.add_class::<BatchValidationResult>()?;
345 Ok(())
346}
建置與測試
1# Build the extension
2maturin develop --release
3
4# Run tests
5python -c "
6import hook_validator_rs as rs
7
8# Test basic import checking
9content = '''
10from hook_io import read_hook_input, write_hook_output
11from hook_logging import setup_hook_logging
12'''
13
14result = rs.check_imports(content)
15print(f'Import check: {result}')
16print(f'Has hook_io: {result.has_hook_io}')
17print(f'Uses recommended output: {result.uses_recommended_output()}')
18
19# Test filename validation
20print(f'Valid name \"check-permissions.py\": {rs.is_valid_hook_name(\"check-permissions.py\")}')
21print(f'Valid name \"BadName.py\": {rs.is_valid_hook_name(\"BadName.py\")}')
22"
效能比較
1"""Performance comparison: Python re vs Rust regex"""
2
3import time
4import re
5from typing import Callable
6
7def benchmark(name: str, func: Callable, iterations: int = 10000) -> float:
8 """Run benchmark and return average time in microseconds"""
9 start = time.perf_counter()
10 for _ in range(iterations):
11 func()
12 elapsed = time.perf_counter() - start
13 avg_us = (elapsed / iterations) * 1_000_000
14 print(f"{name}: {avg_us:.2f} us/iteration ({iterations} iterations)")
15 return avg_us
16
17# Test content (typical hook file)
18TEST_CONTENT = '''
19#!/usr/bin/env python3
20"""Example hook for testing performance"""
21
22import json
23import sys
24from pathlib import Path
25
26from hook_io import read_hook_input, write_hook_output
27from hook_logging import setup_hook_logging
28from config_loader import load_config
29
30def main():
31 logger = setup_hook_logging("example-hook")
32 hook_input = read_hook_input()
33
34 # Process input
35 result = {"decision": "approve"}
36
37 write_hook_output(result)
38
39if __name__ == "__main__":
40 main()
41'''
42
43# Python patterns
44HOOK_IO_PATTERNS_PY = [
45 r"from\s+hook_io\s+import",
46 r"from\s+lib\.hook_io\s+import",
47]
48HOOK_LOGGING_PATTERNS_PY = [
49 r"from\s+hook_logging\s+import",
50 r"from\s+lib\.hook_logging\s+import",
51]
52
53def python_check():
54 """Pure Python regex check"""
55 has_hook_io = any(
56 re.search(p, TEST_CONTENT) for p in HOOK_IO_PATTERNS_PY
57 )
58 has_logging = any(
59 re.search(p, TEST_CONTENT) for p in HOOK_LOGGING_PATTERNS_PY
60 )
61 return has_hook_io, has_logging
62
63def python_check_compiled():
64 """Python regex with pre-compiled patterns"""
65 global _compiled_hook_io, _compiled_logging
66 has_hook_io = any(p.search(TEST_CONTENT) for p in _compiled_hook_io)
67 has_logging = any(p.search(TEST_CONTENT) for p in _compiled_logging)
68 return has_hook_io, has_logging
69
70# Pre-compile Python patterns
71_compiled_hook_io = [re.compile(p) for p in HOOK_IO_PATTERNS_PY]
72_compiled_logging = [re.compile(p) for p in HOOK_LOGGING_PATTERNS_PY]
73
74def rust_check():
75 """Rust regex check"""
76 import hook_validator_rs as rs
77 result = rs.check_imports(TEST_CONTENT)
78 return result.has_hook_io, result.has_hook_logging
79
80if __name__ == "__main__":
81 print("=" * 60)
82 print("Performance Comparison: Python re vs Rust regex")
83 print("=" * 60)
84 print(f"Content size: {len(TEST_CONTENT)} bytes\n")
85
86 # Warm up
87 python_check()
88 python_check_compiled()
89 rust_check()
90
91 # Benchmark
92 py_time = benchmark("Python re (uncompiled)", python_check)
93 py_compiled_time = benchmark("Python re (compiled)", python_check_compiled)
94 rust_time = benchmark("Rust regex", rust_check)
95
96 print("\n" + "=" * 60)
97 print("Results Summary")
98 print("=" * 60)
99 print(f"Python uncompiled: {py_time:.2f} us")
100 print(f"Python compiled: {py_compiled_time:.2f} us")
101 print(f"Rust regex: {rust_time:.2f} us")
102 print(f"\nSpeedup vs uncompiled: {py_time / rust_time:.1f}x")
103 print(f"Speedup vs compiled: {py_compiled_time / rust_time:.1f}x")
典型結果:
1============================================================
2Performance Comparison: Python re vs Rust regex
3============================================================
4Content size: 512 bytes
5
6Python re (uncompiled): 12.45 us/iteration (10000 iterations)
7Python re (compiled): 4.32 us/iteration (10000 iterations)
8Rust regex: 0.89 us/iteration (10000 iterations)
9
10============================================================
11Results Summary
12============================================================
13Python uncompiled: 12.45 us
14Python compiled: 4.32 us
15Rust regex: 0.89 us
16
17Speedup vs uncompiled: 14.0x
18Speedup vs compiled: 4.9x
病態輸入效能比較
1"""Pathological input benchmark - demonstrating DFA vs backtracking"""
2
3import time
4import re
5
6def test_catastrophic_backtracking():
7 """
8 Test pattern that causes catastrophic backtracking in NFA engines
9
10 Pattern: (a+)+b
11 Input: "aaa...a" (no 'b' at end)
12
13 Python re: O(2^n) time complexity
14 Rust regex: O(n) time complexity (DFA engine)
15 """
16 pattern = r"(a+)+b"
17
18 print("Catastrophic Backtracking Test")
19 print("Pattern: (a+)+b")
20 print("-" * 50)
21
22 for n in [15, 20, 22, 24, 25]:
23 text = "a" * n + "c" # No match - triggers backtracking
24
25 # Python test
26 start = time.perf_counter()
27 try:
28 re.search(pattern, text, timeout=5)
29 except TimeoutError:
30 py_time = ">5s (timeout)"
31 else:
32 py_time = f"{(time.perf_counter() - start)*1000:.2f}ms"
33
34 # Note: the Rust regex crate compiles (a+)+b as-is (it contains no backreference)
35 # and matches it with finite automata that never backtrack,
36 # so the search time stays linear in the input length
37
38 print(f"n={n:2d}: Python={py_time}")
39
40def test_regex_dos():
41 """
42 Test ReDoS (Regular Expression Denial of Service) patterns
43 """
44 # Common ReDoS patterns
45 redos_patterns = [
46 (r"(a+)+$", "a" * 20 + "!"), # Nested quantifiers
47 (r"(a|aa)+$", "a" * 20 + "!"), # Overlapping alternatives
48 (r"(.*a){10}$", "a" * 10 + "!"), # Repeated wildcards
49 ]
50
51 print("\nReDoS Pattern Tests")
52 print("-" * 50)
53
54 for pattern, text in redos_patterns:
55 start = time.perf_counter()
56 re.search(pattern, text)
57 elapsed = time.perf_counter() - start
58 print(f"Pattern: {pattern:20s} Time: {elapsed*1000:.2f}ms")
59
60if __name__ == "__main__":
61 test_catastrophic_backtracking()
62 test_regex_dos()
63
64 print("\n" + "=" * 50)
65 print("Note: Rust regex crate uses DFA/hybrid engine")
66 print("that guarantees O(n) time complexity for all inputs.")
67 print("It does NOT support backreferences, which prevents")
68 print("catastrophic backtracking by design.")
設計權衡
| 面向 | Python re | Rust regex |
|---|---|---|
| 引擎類型 | NFA with backtracking | DFA/混合引擎 |
| 時間複雜度 | 最壞 O(2^n) | 保證 O(m·n)(m 為模式大小,對輸入長度 n 為線性) |
| 功能完整性 | 完整(lookahead、backreference) | 部分限制(無 backreference、無 lookahead/lookbehind) |
| 整合難度 | 無(內建) | 需要 FFI(PyO3 + Maturin) |
| 除錯便利 | Python 原生 | 需要 Rust 工具鏈 |
| 記憶體安全 | GC 管理 | 編譯時保證 |
| 多執行緒 | 受 GIL 限制 | 完全平行化 |
| SIMD 加速 | 無 | 自動啟用 |
Rust regex 不支援的功能
1// These patterns will fail to compile in Rust regex:
2
3// 1. Backreferences
4// r"(\w+)\s+\1" // ERROR: backreference not supported
5
6// 2. Lookahead/Lookbehind
7// r"(?=foo)" // ERROR: lookahead not supported
8// r"(?<=foo)" // ERROR: lookbehind not supported
9
10// 3. Atomic groups
11// r"(?>foo)" // ERROR: atomic groups not supported
12
13// Workaround: Use the fancy-regex crate for these features
14// (with performance trade-offs)
什麼時候該用 Rust regex?
適合使用
- 大量文字需要驗證:日誌分析、程式碼審查、批次處理
- 正則表達式可能有病態輸入:用戶提供的輸入、不可信來源
- 需要保證線性時間:安全性要求、SLA 保證
- 高併發場景:多執行緒處理、Web 服務
- 效能關鍵路徑:CI/CD pipeline、即時驗證
不建議使用
- 需要 lookahead/lookbehind:複雜的文字邊界檢查
- 需要 backreference:重複單詞檢測、HTML 標籤匹配
- 驗證次數很少:一次性腳本、開發階段
- 模式簡單:固定字串、簡單前綴/後綴檢查
- 團隊不熟悉 Rust:維護成本可能超過效能收益
練習
1. 基礎練習:Email 驗證
用 Rust regex 實作 email 地址驗證:
1// Exercise: Implement email validation
2use once_cell::sync::Lazy;
3use regex::Regex;
4use pyo3::prelude::*;
5
6static EMAIL_REGEX: Lazy<Regex> = Lazy::new(|| {
7 // TODO: Implement email pattern
8 // Requirements:
9 // - Local part: alphanumeric, dots, underscores, hyphens, plus signs ('+', see the "user.name+tag" test case)
10 // - @ symbol
11 // - Domain: alphanumeric + dots + hyphens
12 // - TLD: 2-6 alphabetic characters
13 Regex::new(r"TODO").expect("Invalid email regex")
14});
15
16#[pyfunction]
17pub fn is_valid_email(email: &str) -> bool {
18 EMAIL_REGEX.is_match(email)
19}
20
21// Test cases:
22// is_valid_email("user@example.com") -> true
23// is_valid_email("user.name+tag@example.co.uk") -> true
24// is_valid_email("invalid@") -> false
25// is_valid_email("@example.com") -> false
2. 進階練習:RegexSet 批次匹配
實作一個程式語言檢測器,判斷程式碼片段是哪種語言:
1// Exercise: Language detection using RegexSet
2use once_cell::sync::Lazy;
3use regex::RegexSet;
4use pyo3::prelude::*;
5use std::collections::HashMap;
6
7static LANGUAGE_PATTERNS: Lazy<RegexSet> = Lazy::new(|| {
8 RegexSet::new([
9 // TODO: Add one pattern per language, in the same order as LANGUAGE_NAMES
10 // 0: Python (def, import, from ... import)
11 // 1: JavaScript (const, let, =>)
12 // 2: Rust (fn, let mut, impl)
13 // 3: Go (func, package, import)
14 ]).expect("Invalid language patterns")
15});
16
17static LANGUAGE_NAMES: [&str; 4] = ["Python", "JavaScript", "Rust", "Go"];
18
19#[pyfunction]
20pub fn detect_languages(code: &str) -> Vec<String> {
21 // TODO: Return list of detected languages
22 // Hint: Use LANGUAGE_PATTERNS.matches(code) and map each matched index to LANGUAGE_NAMES
23 vec![]
24}
25
26// Test case:
27// detect_languages("def hello():\n print('Hi')") -> ["Python"]
28// detect_languages("const x = () => {}") -> ["JavaScript"]
3. 挑戰題:病態輸入防護
設計一個安全的正則表達式驗證器,拒絕可能導致 ReDoS 的模式:
1// Challenge: ReDoS-safe regex validator
2use pyo3::prelude::*;
3
4/// Check whether `pattern` compiles with the Rust `regex` crate (which matches in linear time).
5/// Returns Ok(true) if it compiles, Ok(false) on a backreference/look-around error, Err(ValueError) otherwise.
6/// NOTE: these backtracking-engine ReDoS shapes DO compile with Rust regex, so they yield Ok(true):
7/// 1. Nested quantifiers: (a+)+
8/// 2. Overlapping alternatives: (a|a)+
9/// 3. Long quantified groups with wildcards: (.*)+
10#[pyfunction]
11pub fn is_safe_pattern(pattern: &str) -> PyResult<bool> {
12 // Strategy 1: Try to compile with Rust regex
13 // Rust regex rejects backreferences and look-around; it does not reject nested quantifiers
14 match regex::Regex::new(pattern) {
15 Ok(_) => Ok(true),
16 Err(e) => {
17 // Tell unsupported-feature errors apart from other compile errors by
18 // substring match on the message (assumes the crate's error wording - TODO confirm)
19 let error_msg = e.to_string();
20 if error_msg.contains("backreference")
21 || error_msg.contains("look") {
22 // Pattern needs a feature only backtracking engines offer: report as unsafe
23 Ok(false)
24 } else {
25 // Any other compile failure (e.g. a syntax error) is raised as ValueError
26 Err(pyo3::exceptions::PyValueError::new_err(error_msg))
27 }
28 }
29 }
30}
31
32/// Benchmark a pattern to detect slow execution
33#[pyfunction]
34pub fn benchmark_pattern(
35 pattern: &str,
36 test_input: &str,
37 max_ms: u64
38) -> PyResult<bool> {
39 // TODO: Implement timeout-based safety check
40 // 1. Compile the pattern
41 // 2. Run match with timeout
42 // 3. Return false if exceeds max_ms
43 Ok(true)
44}
延伸閱讀
- Rust regex crate 文件 - 完整的 API 文件與效能說明
- 正則表達式引擎比較 - Russ Cox 的經典系列文章
- PyO3 User Guide - PyO3 完整教學
- once_cell crate - 延遲初始化最佳實踐
- ReDoS 攻擊與防護 - OWASP 安全指南
上一章:PyO3 文字解析 返回:模組六:用 Rust 擴展 Python