认知分心检测:眼动行为与时空信息融合方法

认知分心检测:眼动行为与时空信息融合方法

论文来源: Springer Nature, Neural Information Processing 2026
作者: Qiao, Y., Yang, X., Wang, J., Si, T., Guo, Q.
核心贡献: 眼动行为 + 时空信息融合的认知分心检测


研究背景

认知分心 vs 视觉分心

类型 定义 检测难点
视觉分心 眼睛离开道路 ✅ 易检测(视线追踪)
认知分心 思维走神,眼睛在路上 ❌ 难检测(需行为分析)

认知分心的核心特征:

1
2
3
4
5
6
7
8
视觉分心:眼睛偏离道路 → 直接检测
认知分心:眼睛在道路上,但大脑不在线 → 间接推断

关键洞察:
1. 扫视模式改变
2. 注视分布异常
3. 眨眼频率变化
4. 瞳孔直径波动

核心方法

1. 眼动行为特征

扫视(Saccade)分析:

特征 正常驾驶 认知分心
扫视频率 3-5 次/秒 ↓ 降低
扫视幅度 5-15° ↓ 变小
扫视速度 200-500°/s ↓ 变慢
扫视潜伏期 150-250ms ↑ 延长

代码实现:

import numpy as np
from typing import List, Tuple, Optional
from dataclasses import dataclass
from enum import Enum

class EyeMovementType(Enum):
    """眼动类型"""
    FIXATION = "fixation"
    SACCADE = "saccade"
    BLINK = "blink"
    SMOOTH_PURSUIT = "smooth_pursuit"


@dataclass
class GazePoint:
    """注视点"""
    x: float  # 像素坐标
    y: float
    timestamp: float  # 秒
    pupil_diameter: float  # mm


@dataclass
class EyeMovement:
    """眼动事件"""
    movement_type: EyeMovementType
    start_time: float
    end_time: float
    start_pos: Tuple[float, float]
    end_pos: Tuple[float, float]
    amplitude: float  # 度
    velocity: float  # 度/秒


class SaccadeDetector:
    """
    扫视检测器
    
    基于 I-VT(速度阈值)算法
    """
    
    def __init__(
        self,
        velocity_threshold: float = 30.0,  # 度/秒
        min_duration: float = 0.010,  # 最小持续时间 (秒)
        max_duration: float = 0.100,  # 最大持续时间 (秒)
        screen_distance: float = 0.6,  # 屏幕距离 (米)
        screen_width: float = 0.5  # 屏幕宽度 (米)
    ):
        self.velocity_threshold = velocity_threshold
        self.min_duration = min_duration
        self.max_duration = max_duration
        self.screen_distance = screen_distance
        self.screen_width = screen_width
        
        # 像素到度的转换系数
        self.pixels_per_degree = self._calculate_pixels_per_degree()
    
    def _calculate_pixels_per_degree(self) -> float:
        """计算像素到度的转换系数"""
        # 假设屏幕分辨率
        # 视角 = arctan(物体尺寸 / 距离)
        # 简化:1度 ≈ 屏幕距离 × tan(1°)
        import math
        one_degree_in_meters = self.screen_distance * math.tan(math.radians(1))
        
        # 假设屏幕宽度对应 1920 像素
        pixels_per_meter = 1920 / self.screen_width
        
        return one_degree_in_meters * pixels_per_meter
    
    def detect_saccades(
        self,
        gaze_sequence: List[GazePoint]
    ) -> List[EyeMovement]:
        """
        检测扫视事件
        
        Args:
            gaze_sequence: 注视点序列
            
        Returns:
            saccades: 扫视事件列表
        """
        if len(gaze_sequence) < 2:
            return []
        
        # 计算速度
        velocities = []
        
        for i in range(1, len(gaze_sequence)):
            # 位移(像素)
            dx = gaze_sequence[i].x - gaze_sequence[i-1].x
            dy = gaze_sequence[i].y - gaze_sequence[i-1].y
            distance_pixels = np.sqrt(dx**2 + dy**2)
            
            # 转换为度
            distance_degrees = distance_pixels / self.pixels_per_degree
            
            # 时间间隔
            dt = gaze_sequence[i].timestamp - gaze_sequence[i-1].timestamp
            
            if dt > 0:
                velocity = distance_degrees / dt
            else:
                velocity = 0
            
            velocities.append(velocity)
        
        # 基于速度阈值分类
        saccades = []
        in_saccade = False
        saccade_start_idx = 0
        
        for i, velocity in enumerate(velocities):
            if velocity > self.velocity_threshold and not in_saccade:
                # 扫视开始
                in_saccade = True
                saccade_start_idx = i
            
            elif velocity <= self.velocity_threshold and in_saccade:
                # 扫视结束
                in_saccade = False
                
                # 检查持续时间
                duration = (
                    gaze_sequence[i].timestamp - 
                    gaze_sequence[saccade_start_idx].timestamp
                )
                
                if self.min_duration <= duration <= self.max_duration:
                    # 计算扫视参数
                    start_pos = (
                        gaze_sequence[saccade_start_idx].x,
                        gaze_sequence[saccade_start_idx].y
                    )
                    end_pos = (gaze_sequence[i].x, gaze_sequence[i].y)
                    
                    # 幅度
                    dx = end_pos[0] - start_pos[0]
                    dy = end_pos[1] - start_pos[1]
                    amplitude = np.sqrt(dx**2 + dy**2) / self.pixels_per_degree
                    
                    # 平均速度
                    avg_velocity = amplitude / duration if duration > 0 else 0
                    
                    saccades.append(EyeMovement(
                        movement_type=EyeMovementType.SACCADE,
                        start_time=gaze_sequence[saccade_start_idx].timestamp,
                        end_time=gaze_sequence[i].timestamp,
                        start_pos=start_pos,
                        end_pos=end_pos,
                        amplitude=amplitude,
                        velocity=avg_velocity
                    ))
        
        return saccades
    
    def extract_saccade_features(
        self,
        saccades: List[EyeMovement],
        window_duration: float = 60.0
    ) -> dict:
        """
        提取扫视特征
        
        Args:
            saccades: 扫视事件列表
            window_duration: 时间窗口 (秒)
            
        Returns:
            features: 扫视特征字典
        """
        if not saccades:
            return {
                "saccade_count": 0,
                "saccade_rate": 0,
                "mean_amplitude": 0,
                "std_amplitude": 0,
                "mean_velocity": 0,
                "std_velocity": 0,
                "mean_duration": 0
            }
        
        # 计数
        saccade_count = len(saccades)
        saccade_rate = saccade_count / window_duration
        
        # 幅度统计
        amplitudes = [s.amplitude for s in saccades]
        mean_amplitude = np.mean(amplitudes)
        std_amplitude = np.std(amplitudes)
        
        # 速度统计
        velocities = [s.velocity for s in saccades]
        mean_velocity = np.mean(velocities)
        std_velocity = np.std(velocities)
        
        # 持续时间
        durations = [s.end_time - s.start_time for s in saccades]
        mean_duration = np.mean(durations)
        
        return {
            "saccade_count": saccade_count,
            "saccade_rate": saccade_rate,
            "mean_amplitude": mean_amplitude,
            "std_amplitude": std_amplitude,
            "mean_velocity": mean_velocity,
            "std_velocity": std_velocity,
            "mean_duration": mean_duration
        }


class CognitiveDistractionDetector:
    """
    认知分心检测器
    
    基于眼动行为和时空信息融合
    """
    
    # 正常驾驶基线(来自研究数据)
    NORMAL_BASELINE = {
        "saccade_rate": 4.0,  # Hz
        "mean_amplitude": 8.0,  # 度
        "mean_velocity": 300.0,  # 度/秒
        "pupil_diameter_mean": 4.0,  # mm
        "blink_rate": 0.3  # Hz
    }
    
    def __init__(self):
        self.saccade_detector = SaccadeDetector()
        
        # 历史缓存
        self.feature_history = []
        self.history_length = 30  # 30秒历史
    
    def extract_features(
        self,
        gaze_sequence: List[GazePoint]
    ) -> dict:
        """
        提取完整特征集
        
        Args:
            gaze_sequence: 注视点序列
            
        Returns:
            features: 特征字典
        """
        # 1. 扫视特征
        saccades = self.saccade_detector.detect_saccades(gaze_sequence)
        
        if gaze_sequence:
            window_duration = (
                gaze_sequence[-1].timestamp - gaze_sequence[0].timestamp
            )
        else:
            window_duration = 60.0
        
        saccade_features = self.saccade_detector.extract_saccade_features(
            saccades, window_duration
        )
        
        # 2. 注视特征
        fixation_features = self._extract_fixation_features(gaze_sequence)
        
        # 3. 瞳孔特征
        pupil_features = self._extract_pupil_features(gaze_sequence)
        
        # 4. 眨眼特征
        blink_features = self._extract_blink_features(gaze_sequence)
        
        # 合并
        features = {
            **saccade_features,
            **fixation_features,
            **pupil_features,
            **blink_features
        }
        
        return features
    
    def _extract_fixation_features(
        self,
        gaze_sequence: List[GazePoint]
    ) -> dict:
        """提取注视特征"""
        if len(gaze_sequence) < 10:
            return {
                "fixation_count": 0,
                "mean_fixation_duration": 0,
                "gaze_dispersion": 0
            }
        
        # 简化的注视检测:速度 < 阈值的连续点
        fixations = []
        in_fixation = False
        fixation_start = 0
        
        for i in range(1, len(gaze_sequence)):
            dx = gaze_sequence[i].x - gaze_sequence[i-1].x
            dy = gaze_sequence[i].y - gaze_sequence[i-1].y
            distance = np.sqrt(dx**2 + dy**2)
            
            dt = gaze_sequence[i].timestamp - gaze_sequence[i-1].timestamp
            velocity = distance / dt if dt > 0 else 0
            
            # 注视阈值:30 像素/秒(约 1 度/秒)
            if velocity < 30 and not in_fixation:
                in_fixation = True
                fixation_start = i
            elif velocity >= 30 and in_fixation:
                in_fixation = False
                fixations.append((fixation_start, i))
        
        # 计算特征
        if fixations:
            durations = [
                gaze_sequence[end].timestamp - gaze_sequence[start].timestamp
                for start, end in fixations
            ]
            mean_duration = np.mean(durations)
        else:
            mean_duration = 0
        
        # 注视分散度
        x_coords = [g.x for g in gaze_sequence]
        y_coords = [g.y for g in gaze_sequence]
        gaze_dispersion = np.std(x_coords) + np.std(y_coords)
        
        return {
            "fixation_count": len(fixations),
            "mean_fixation_duration": mean_duration,
            "gaze_dispersion": gaze_dispersion
        }
    
    def _extract_pupil_features(
        self,
        gaze_sequence: List[GazePoint]
    ) -> dict:
        """提取瞳孔特征"""
        if not gaze_sequence:
            return {
                "pupil_diameter_mean": 0,
                "pupil_diameter_std": 0,
                "pupil_variability": 0
            }
        
        diameters = [g.pupil_diameter for g in gaze_sequence]
        
        return {
            "pupil_diameter_mean": np.mean(diameters),
            "pupil_diameter_std": np.std(diameters),
            "pupil_variability": np.std(diameters) / np.mean(diameters) if np.mean(diameters) > 0 else 0
        }
    
    def _extract_blink_features(
        self,
        gaze_sequence: List[GazePoint]
    ) -> dict:
        """提取眨眼特征"""
        # 简化:通过瞳孔直径突变检测眨眼
        if len(gaze_sequence) < 10:
            return {"blink_count": 0, "blink_rate": 0}
        
        blink_count = 0
        for i in range(1, len(gaze_sequence)):
            # 瞳孔直径突然变小接近0 → 眨眼
            if (gaze_sequence[i].pupil_diameter < 1.0 and
                gaze_sequence[i-1].pupil_diameter > 2.0):
                blink_count += 1
        
        duration = (
            gaze_sequence[-1].timestamp - gaze_sequence[0].timestamp
        ) if len(gaze_sequence) > 1 else 1.0
        
        blink_rate = blink_count / duration if duration > 0 else 0
        
        return {
            "blink_count": blink_count,
            "blink_rate": blink_rate
        }
    
    def detect_cognitive_distraction(
        self,
        features: dict
    ) -> Tuple[bool, float, str]:
        """
        检测认知分心
        
        Args:
            features: 特征字典
            
        Returns:
            (is_distracted, confidence, reason)
        """
        # 与基线比较
        deviations = {}
        
        # 扫视频率下降
        if features["saccade_rate"] > 0:
            saccade_rate_deviation = (
                self.NORMAL_BASELINE["saccade_rate"] - features["saccade_rate"]
            ) / self.NORMAL_BASELINE["saccade_rate"]
            deviations["saccade_rate"] = saccade_rate_deviation
        
        # 扫视幅度下降
        if features["mean_amplitude"] > 0:
            amplitude_deviation = (
                self.NORMAL_BASELINE["mean_amplitude"] - features["mean_amplitude"]
            ) / self.NORMAL_BASELINE["mean_amplitude"]
            deviations["amplitude"] = amplitude_deviation
        
        # 扫视速度下降
        if features["mean_velocity"] > 0:
            velocity_deviation = (
                self.NORMAL_BASELINE["mean_velocity"] - features["mean_velocity"]
            ) / self.NORMAL_BASELINE["mean_velocity"]
            deviations["velocity"] = velocity_deviation
        
        # 瞳孔直径变化
        if features["pupil_diameter_mean"] > 0:
            pupil_deviation = (
                features["pupil_diameter_mean"] - self.NORMAL_BASELINE["pupil_diameter_mean"]
            ) / self.NORMAL_BASELINE["pupil_diameter_mean"]
            deviations["pupil"] = pupil_deviation
        
        # 综合评分
        # 权重:扫视特征 60%,瞳孔 40%
        cognitive_score = (
            deviations.get("saccade_rate", 0) * 0.25 +
            deviations.get("amplitude", 0) * 0.20 +
            deviations.get("velocity", 0) * 0.15 +
            deviations.get("pupil", 0) * 0.40
        )
        
        # 判定
        is_distracted = cognitive_score > 0.3  # 偏离 30% 以上
        confidence = min(1.0, cognitive_score / 0.5)  # 归一化
        
        # 原因
        reasons = []
        if deviations.get("saccade_rate", 0) > 0.3:
            reasons.append("扫视频率显著下降")
        if deviations.get("amplitude", 0) > 0.3:
            reasons.append("扫视幅度显著下降")
        if deviations.get("velocity", 0) > 0.3:
            reasons.append("扫视速度显著下降")
        if deviations.get("pupil", 0) > 0.3:
            reasons.append("瞳孔直径异常")
        
        reason = ";".join(reasons) if reasons else "正常"
        
        return is_distracted, confidence, reason


# 测试
if __name__ == "__main__":
    # 模拟眼动数据
    np.random.seed(42)
    
    # 正常驾驶
    normal_gaze = [
        GazePoint(
            x=960 + np.random.normal(0, 50),
            y=540 + np.random.normal(0, 30),
            timestamp=i * 0.033,
            pupil_diameter=4.0 + np.random.normal(0, 0.3)
        )
        for i in range(300)
    ]
    
    # 添加扫视
    for i in [50, 100, 150, 200, 250]:
        normal_gaze[i].x += 100
        normal_gaze[i+1].x += 80
        normal_gaze[i+2].x += 50
    
    # 认知分心
    distracted_gaze = [
        GazePoint(
            x=960 + np.random.normal(0, 20),  # 注视更集中
            y=540 + np.random.normal(0, 15),
            timestamp=i * 0.033,
            pupil_diameter=4.5 + np.random.normal(0, 0.4)  # 瞳孔略大
        )
        for i in range(300)
    ]
    
    # 检测
    detector = CognitiveDistractionDetector()
    
    print("=== 正常驾驶 ===")
    features_normal = detector.extract_features(normal_gaze)
    is_distracted, conf, reason = detector.detect_cognitive_distraction(features_normal)
    print(f"认知分心: {'是' if is_distracted else '否'}")
    print(f"置信度: {conf:.2f}")
    print(f"原因: {reason}")
    
    print("\n=== 认知分心 ===")
    features_distracted = detector.extract_features(distracted_gaze)
    is_distracted, conf, reason = detector.detect_cognitive_distraction(features_distracted)
    print(f"认知分心: {'是' if is_distracted else '否'}")
    print(f"置信度: {conf:.2f}")
    print(f"原因: {reason}")

认知分心检测:眼动行为与时空信息融合方法
https://dapalm.com/2026/04/21/2026-04-21-cognitive-distraction-eye-movement-fusion/
作者
Mars
发布于
2026年4月21日
许可协议