1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
| class AttentionSharingDetector: """注意力共享检测器""" def __init__(self): self.gaze_estimator = GazeEstimator() self.head_pose_estimator = HeadPoseEstimator() self.context_analyzer = ContextAnalyzer() self.attention_threshold = 0.7 def detect_attention_sharing(self, frame: np.ndarray, driving_context: dict) -> dict: """检测注意力共享 Args: frame: 当前帧 driving_context: { 'road_scenario': str, 'vehicle_speed': float, 'steering_input': float } Returns: { 'attention_score': float, # 0-1 'is_sharing_attention': bool, 'sharing_duration': float, 'warning_level': str } """ gaze = self.gaze_estimator.estimate(frame) head_pose = self.head_pose_estimator.estimate(frame) context_score = self.context_analyzer.analyze( gaze, head_pose, driving_context ) attention_score = self._compute_attention_score( gaze, head_pose, context_score ) is_sharing = attention_score < self.attention_threshold return { 'attention_score': attention_score, 'is_sharing_attention': is_sharing, 'sharing_duration': self._compute_duration(is_sharing), 'warning_level': self._get_warning_level(attention_score) } def _compute_attention_score(self, gaze, head_pose, context) -> float: """计算注意力分数""" gaze_stability = 1.0 - min(gaze['variance'] / 10, 1.0) head_stability = 1.0 - min(head_pose['movement_rate'] / 5, 1.0) context_score = context attention_score = ( 0.4 * gaze_stability + 0.3 * head_stability + 0.3 * context_score ) return attention_score def _compute_duration(self, is_sharing: bool) -> float: """计算持续时间""" return 0.0 def _get_warning_level(self, score: float) -> str: """获取警告级别""" if score < 0.5: return 'CRITICAL' elif score < 0.7: return 'WARNING' else: return 'NORMAL'
|