认知分心检测:眼动行为与时空信息融合方法
认知分心检测:眼动行为与时空信息融合方法
论文来源: Springer Nature, Neural Information Processing 2026
作者: Qiao, Y., Yang, X., Wang, J., Si, T., Guo, Q.
核心贡献: 眼动行为 + 时空信息融合的认知分心检测
研究背景
认知分心 vs 视觉分心
| 类型 | 定义 | 检测难点 |
|---|---|---|
| 视觉分心 | 眼睛离开道路 | ✅ 易检测(视线追踪) |
| 认知分心 | 思维走神,眼睛在路上 | ❌ 难检测(需行为分析) |
认知分心的核心特征:
1 | |
核心方法
1. 眼动行为特征
扫视(Saccade)分析:
| 特征 | 正常驾驶 | 认知分心 |
|---|---|---|
| 扫视频率 | 3-5 次/秒 | ↓ 降低 |
| 扫视幅度 | 5-15° | ↓ 变小 |
| 扫视速度 | 200-500°/s | ↓ 变慢 |
| 扫视潜伏期 | 150-250ms | ↑ 延长 |
代码实现:
import numpy as np
from typing import List, Tuple, Optional
from dataclasses import dataclass
from enum import Enum
class EyeMovementType(Enum):
"""眼动类型"""
FIXATION = "fixation"
SACCADE = "saccade"
BLINK = "blink"
SMOOTH_PURSUIT = "smooth_pursuit"
@dataclass
class GazePoint:
"""注视点"""
x: float # 像素坐标
y: float
timestamp: float # 秒
pupil_diameter: float # mm
@dataclass
class EyeMovement:
"""眼动事件"""
movement_type: EyeMovementType
start_time: float
end_time: float
start_pos: Tuple[float, float]
end_pos: Tuple[float, float]
amplitude: float # 度
velocity: float # 度/秒
class SaccadeDetector:
"""
扫视检测器
基于 I-VT(速度阈值)算法
"""
def __init__(
self,
velocity_threshold: float = 30.0, # 度/秒
min_duration: float = 0.010, # 最小持续时间 (秒)
max_duration: float = 0.100, # 最大持续时间 (秒)
screen_distance: float = 0.6, # 屏幕距离 (米)
screen_width: float = 0.5 # 屏幕宽度 (米)
):
self.velocity_threshold = velocity_threshold
self.min_duration = min_duration
self.max_duration = max_duration
self.screen_distance = screen_distance
self.screen_width = screen_width
# 像素到度的转换系数
self.pixels_per_degree = self._calculate_pixels_per_degree()
def _calculate_pixels_per_degree(self) -> float:
"""计算像素到度的转换系数"""
# 假设屏幕分辨率
# 视角 = arctan(物体尺寸 / 距离)
# 简化:1度 ≈ 屏幕距离 × tan(1°)
import math
one_degree_in_meters = self.screen_distance * math.tan(math.radians(1))
# 假设屏幕宽度对应 1920 像素
pixels_per_meter = 1920 / self.screen_width
return one_degree_in_meters * pixels_per_meter
def detect_saccades(
self,
gaze_sequence: List[GazePoint]
) -> List[EyeMovement]:
"""
检测扫视事件
Args:
gaze_sequence: 注视点序列
Returns:
saccades: 扫视事件列表
"""
if len(gaze_sequence) < 2:
return []
# 计算速度
velocities = []
for i in range(1, len(gaze_sequence)):
# 位移(像素)
dx = gaze_sequence[i].x - gaze_sequence[i-1].x
dy = gaze_sequence[i].y - gaze_sequence[i-1].y
distance_pixels = np.sqrt(dx**2 + dy**2)
# 转换为度
distance_degrees = distance_pixels / self.pixels_per_degree
# 时间间隔
dt = gaze_sequence[i].timestamp - gaze_sequence[i-1].timestamp
if dt > 0:
velocity = distance_degrees / dt
else:
velocity = 0
velocities.append(velocity)
# 基于速度阈值分类
saccades = []
in_saccade = False
saccade_start_idx = 0
for i, velocity in enumerate(velocities):
if velocity > self.velocity_threshold and not in_saccade:
# 扫视开始
in_saccade = True
saccade_start_idx = i
elif velocity <= self.velocity_threshold and in_saccade:
# 扫视结束
in_saccade = False
# 检查持续时间
duration = (
gaze_sequence[i].timestamp -
gaze_sequence[saccade_start_idx].timestamp
)
if self.min_duration <= duration <= self.max_duration:
# 计算扫视参数
start_pos = (
gaze_sequence[saccade_start_idx].x,
gaze_sequence[saccade_start_idx].y
)
end_pos = (gaze_sequence[i].x, gaze_sequence[i].y)
# 幅度
dx = end_pos[0] - start_pos[0]
dy = end_pos[1] - start_pos[1]
amplitude = np.sqrt(dx**2 + dy**2) / self.pixels_per_degree
# 平均速度
avg_velocity = amplitude / duration if duration > 0 else 0
saccades.append(EyeMovement(
movement_type=EyeMovementType.SACCADE,
start_time=gaze_sequence[saccade_start_idx].timestamp,
end_time=gaze_sequence[i].timestamp,
start_pos=start_pos,
end_pos=end_pos,
amplitude=amplitude,
velocity=avg_velocity
))
return saccades
def extract_saccade_features(
self,
saccades: List[EyeMovement],
window_duration: float = 60.0
) -> dict:
"""
提取扫视特征
Args:
saccades: 扫视事件列表
window_duration: 时间窗口 (秒)
Returns:
features: 扫视特征字典
"""
if not saccades:
return {
"saccade_count": 0,
"saccade_rate": 0,
"mean_amplitude": 0,
"std_amplitude": 0,
"mean_velocity": 0,
"std_velocity": 0,
"mean_duration": 0
}
# 计数
saccade_count = len(saccades)
saccade_rate = saccade_count / window_duration
# 幅度统计
amplitudes = [s.amplitude for s in saccades]
mean_amplitude = np.mean(amplitudes)
std_amplitude = np.std(amplitudes)
# 速度统计
velocities = [s.velocity for s in saccades]
mean_velocity = np.mean(velocities)
std_velocity = np.std(velocities)
# 持续时间
durations = [s.end_time - s.start_time for s in saccades]
mean_duration = np.mean(durations)
return {
"saccade_count": saccade_count,
"saccade_rate": saccade_rate,
"mean_amplitude": mean_amplitude,
"std_amplitude": std_amplitude,
"mean_velocity": mean_velocity,
"std_velocity": std_velocity,
"mean_duration": mean_duration
}
class CognitiveDistractionDetector:
"""
认知分心检测器
基于眼动行为和时空信息融合
"""
# 正常驾驶基线(来自研究数据)
NORMAL_BASELINE = {
"saccade_rate": 4.0, # Hz
"mean_amplitude": 8.0, # 度
"mean_velocity": 300.0, # 度/秒
"pupil_diameter_mean": 4.0, # mm
"blink_rate": 0.3 # Hz
}
def __init__(self):
self.saccade_detector = SaccadeDetector()
# 历史缓存
self.feature_history = []
self.history_length = 30 # 30秒历史
def extract_features(
self,
gaze_sequence: List[GazePoint]
) -> dict:
"""
提取完整特征集
Args:
gaze_sequence: 注视点序列
Returns:
features: 特征字典
"""
# 1. 扫视特征
saccades = self.saccade_detector.detect_saccades(gaze_sequence)
if gaze_sequence:
window_duration = (
gaze_sequence[-1].timestamp - gaze_sequence[0].timestamp
)
else:
window_duration = 60.0
saccade_features = self.saccade_detector.extract_saccade_features(
saccades, window_duration
)
# 2. 注视特征
fixation_features = self._extract_fixation_features(gaze_sequence)
# 3. 瞳孔特征
pupil_features = self._extract_pupil_features(gaze_sequence)
# 4. 眨眼特征
blink_features = self._extract_blink_features(gaze_sequence)
# 合并
features = {
**saccade_features,
**fixation_features,
**pupil_features,
**blink_features
}
return features
def _extract_fixation_features(
self,
gaze_sequence: List[GazePoint]
) -> dict:
"""提取注视特征"""
if len(gaze_sequence) < 10:
return {
"fixation_count": 0,
"mean_fixation_duration": 0,
"gaze_dispersion": 0
}
# 简化的注视检测:速度 < 阈值的连续点
fixations = []
in_fixation = False
fixation_start = 0
for i in range(1, len(gaze_sequence)):
dx = gaze_sequence[i].x - gaze_sequence[i-1].x
dy = gaze_sequence[i].y - gaze_sequence[i-1].y
distance = np.sqrt(dx**2 + dy**2)
dt = gaze_sequence[i].timestamp - gaze_sequence[i-1].timestamp
velocity = distance / dt if dt > 0 else 0
# 注视阈值:30 像素/秒(约 1 度/秒)
if velocity < 30 and not in_fixation:
in_fixation = True
fixation_start = i
elif velocity >= 30 and in_fixation:
in_fixation = False
fixations.append((fixation_start, i))
# 计算特征
if fixations:
durations = [
gaze_sequence[end].timestamp - gaze_sequence[start].timestamp
for start, end in fixations
]
mean_duration = np.mean(durations)
else:
mean_duration = 0
# 注视分散度
x_coords = [g.x for g in gaze_sequence]
y_coords = [g.y for g in gaze_sequence]
gaze_dispersion = np.std(x_coords) + np.std(y_coords)
return {
"fixation_count": len(fixations),
"mean_fixation_duration": mean_duration,
"gaze_dispersion": gaze_dispersion
}
def _extract_pupil_features(
self,
gaze_sequence: List[GazePoint]
) -> dict:
"""提取瞳孔特征"""
if not gaze_sequence:
return {
"pupil_diameter_mean": 0,
"pupil_diameter_std": 0,
"pupil_variability": 0
}
diameters = [g.pupil_diameter for g in gaze_sequence]
return {
"pupil_diameter_mean": np.mean(diameters),
"pupil_diameter_std": np.std(diameters),
"pupil_variability": np.std(diameters) / np.mean(diameters) if np.mean(diameters) > 0 else 0
}
def _extract_blink_features(
self,
gaze_sequence: List[GazePoint]
) -> dict:
"""提取眨眼特征"""
# 简化:通过瞳孔直径突变检测眨眼
if len(gaze_sequence) < 10:
return {"blink_count": 0, "blink_rate": 0}
blink_count = 0
for i in range(1, len(gaze_sequence)):
# 瞳孔直径突然变小接近0 → 眨眼
if (gaze_sequence[i].pupil_diameter < 1.0 and
gaze_sequence[i-1].pupil_diameter > 2.0):
blink_count += 1
duration = (
gaze_sequence[-1].timestamp - gaze_sequence[0].timestamp
) if len(gaze_sequence) > 1 else 1.0
blink_rate = blink_count / duration if duration > 0 else 0
return {
"blink_count": blink_count,
"blink_rate": blink_rate
}
def detect_cognitive_distraction(
self,
features: dict
) -> Tuple[bool, float, str]:
"""
检测认知分心
Args:
features: 特征字典
Returns:
(is_distracted, confidence, reason)
"""
# 与基线比较
deviations = {}
# 扫视频率下降
if features["saccade_rate"] > 0:
saccade_rate_deviation = (
self.NORMAL_BASELINE["saccade_rate"] - features["saccade_rate"]
) / self.NORMAL_BASELINE["saccade_rate"]
deviations["saccade_rate"] = saccade_rate_deviation
# 扫视幅度下降
if features["mean_amplitude"] > 0:
amplitude_deviation = (
self.NORMAL_BASELINE["mean_amplitude"] - features["mean_amplitude"]
) / self.NORMAL_BASELINE["mean_amplitude"]
deviations["amplitude"] = amplitude_deviation
# 扫视速度下降
if features["mean_velocity"] > 0:
velocity_deviation = (
self.NORMAL_BASELINE["mean_velocity"] - features["mean_velocity"]
) / self.NORMAL_BASELINE["mean_velocity"]
deviations["velocity"] = velocity_deviation
# 瞳孔直径变化
if features["pupil_diameter_mean"] > 0:
pupil_deviation = (
features["pupil_diameter_mean"] - self.NORMAL_BASELINE["pupil_diameter_mean"]
) / self.NORMAL_BASELINE["pupil_diameter_mean"]
deviations["pupil"] = pupil_deviation
# 综合评分
# 权重:扫视特征 60%,瞳孔 40%
cognitive_score = (
deviations.get("saccade_rate", 0) * 0.25 +
deviations.get("amplitude", 0) * 0.20 +
deviations.get("velocity", 0) * 0.15 +
deviations.get("pupil", 0) * 0.40
)
# 判定
is_distracted = cognitive_score > 0.3 # 偏离 30% 以上
confidence = min(1.0, cognitive_score / 0.5) # 归一化
# 原因
reasons = []
if deviations.get("saccade_rate", 0) > 0.3:
reasons.append("扫视频率显著下降")
if deviations.get("amplitude", 0) > 0.3:
reasons.append("扫视幅度显著下降")
if deviations.get("velocity", 0) > 0.3:
reasons.append("扫视速度显著下降")
if deviations.get("pupil", 0) > 0.3:
reasons.append("瞳孔直径异常")
reason = ";".join(reasons) if reasons else "正常"
return is_distracted, confidence, reason
# 测试
if __name__ == "__main__":
# 模拟眼动数据
np.random.seed(42)
# 正常驾驶
normal_gaze = [
GazePoint(
x=960 + np.random.normal(0, 50),
y=540 + np.random.normal(0, 30),
timestamp=i * 0.033,
pupil_diameter=4.0 + np.random.normal(0, 0.3)
)
for i in range(300)
]
# 添加扫视
for i in [50, 100, 150, 200, 250]:
normal_gaze[i].x += 100
normal_gaze[i+1].x += 80
normal_gaze[i+2].x += 50
# 认知分心
distracted_gaze = [
GazePoint(
x=960 + np.random.normal(0, 20), # 注视更集中
y=540 + np.random.normal(0, 15),
timestamp=i * 0.033,
pupil_diameter=4.5 + np.random.normal(0, 0.4) # 瞳孔略大
)
for i in range(300)
]
# 检测
detector = CognitiveDistractionDetector()
print("=== 正常驾驶 ===")
features_normal = detector.extract_features(normal_gaze)
is_distracted, conf, reason = detector.detect_cognitive_distraction(features_normal)
print(f"认知分心: {'是' if is_distracted else '否'}")
print(f"置信度: {conf:.2f}")
print(f"原因: {reason}")
print("\n=== 认知分心 ===")
features_distracted = detector.extract_features(distracted_gaze)
is_distracted, conf, reason = detector.detect_cognitive_distraction(features_distracted)
print(f"认知分心: {'是' if is_distracted else '否'}")
print(f"置信度: {conf:.2f}")
print(f"原因: {reason}")
认知分心检测:眼动行为与时空信息融合方法
https://dapalm.com/2026/04/21/2026-04-21-cognitive-distraction-eye-movement-fusion/