1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
import time
from typing import Optional

import numpy as np
class PrivacyCompliantDMS:
    """DMS implementation intended to comply with GSR/GDPR.

    Each frame is reduced to a state classification. Only the timestamp,
    state type and confidence of a result are kept, in an in-memory log;
    the frame itself is never stored by this class. Log entries are dropped
    once they are older than ``retention_hours`` and the log is capped at
    ``max_log_entries`` entries (oldest entries are discarded first).

    Recognised ``config`` keys:
        max_log_entries: maximum number of log entries kept (default 1000).
        retention_hours: maximum age of a log entry in hours (default 24).

    NOTE(review): ``_load_model`` is called by ``__init__`` but is not defined
    in this class; it has to be provided elsewhere (e.g. by a subclass).
    ``_extract_features`` and ``_classify_state`` are hooks whose
    implementation is not part of this class either.
    """

    def __init__(self, model_path: str, config: dict):
        """Load the model and read the logging limits from *config*."""
        self.model = self._load_model(model_path)
        self.config = config
        self.logs = []
        self.max_log_entries = config.get("max_log_entries", 1000)
        self.retention_hours = config.get("retention_hours", 24)

    def process_frame(self, frame: np.ndarray) -> dict:
        """Process a single frame and return the resulting state.

        The raw image is not saved: only the classification result is
        logged.

        Returns:
            A dict with the keys ``"state"``, ``"confidence"`` and
            ``"should_warn"``.
        """
        features = self._extract_features(frame)
        state = self._classify_state(features)
        self._log_result(
            timestamp=time.time(),
            state_type=state["type"],
            confidence=state["confidence"],
        )
        return {
            "state": state["type"],
            "confidence": state["confidence"],
            "should_warn": state["should_warn"],
        }

    def _extract_features(self, frame: np.ndarray) -> dict:
        """Extract features from *frame* without saving the image.

        Implementations are expected to return a dict with the keys
        ``"eye_closure"``, ``"gaze_direction"`` and ``"head_pose"``.

        Raises:
            NotImplementedError: always; the extraction logic is not part of
                this class and must be supplied by an override.
        """
        raise NotImplementedError(
            "_extract_features must return a dict with the keys "
            "'eye_closure', 'gaze_direction' and 'head_pose'"
        )

    def _classify_state(self, features: dict) -> dict:
        """Classify the state described by *features*.

        Implementations are expected to return a dict with the keys
        ``"type"``, ``"confidence"`` and ``"should_warn"`` (these are the
        keys ``process_frame`` reads).

        Raises:
            NotImplementedError: always; the classification logic is not part
                of this class and must be supplied by an override.
        """
        raise NotImplementedError(
            "_classify_state must return a dict with the keys "
            "'type', 'confidence' and 'should_warn'"
        )

    def _prune_logs(self) -> None:
        """Drop entries past the retention period, then enforce the size cap."""
        cutoff = time.time() - self.retention_hours * 3600
        self.logs = [log for log in self.logs if log["timestamp"] > cutoff]
        # Computing the excess (instead of slicing with a negative index)
        # keeps a cap of 0 meaning "keep nothing" rather than "keep all".
        excess = len(self.logs) - self.max_log_entries
        if excess > 0:
            del self.logs[:excess]

    def _log_result(self, timestamp: float, state_type: str, confidence: float):
        """Record an anonymised result (no image data) and prune the log."""
        self.logs.append({
            "timestamp": timestamp,
            "state_type": state_type,
            "confidence": confidence,
        })
        self._prune_logs()

    def get_logs(self, start_time: Optional[float] = None,
                 end_time: Optional[float] = None) -> list:
        """Return log entries (for the user to inspect).

        Args:
            start_time: inclusive lower timestamp bound; ``None`` means no
                lower bound.
            end_time: inclusive upper timestamp bound; ``None`` means no
                upper bound.

        Returns:
            A new list of copies of the matching entries, so callers cannot
            alter the stored log. Entries past the retention period are
            removed before the lookup.
        """
        self._prune_logs()
        return [
            dict(log)
            for log in self.logs
            if (start_time is None or log["timestamp"] >= start_time)
            and (end_time is None or log["timestamp"] <= end_time)
        ]

    def delete_all_logs(self):
        """Delete every log entry (on user request) and return ``True``."""
        self.logs.clear()
        return True
|