Deep Dive into the DCDD Model for Driver Cognitive Distraction Detection

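Paper Information

Core Innovation

The paper is the first to propose a cognitive distraction detection model built on eye-movement behavior and multi-view spatial-channel feature fusion. It targets the case that conventional visual distraction detection cannot identify: cognitive distraction, where the driver's "eyes are on the road but the mind is elsewhere".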

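Cognitive vs. Visual Distraction

| Type | Definition | Signature | Detection difficulty |
| --- | --- | --- | --- |
| Visual distraction | Eyes leave the road | Gaze shifts away | Low ✅ |
| Cognitive distraction | Mental resources are occupied | Abnormal eye-movement patterns | High ⚠️ |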

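Technical Challenges

Challenges in cognitive distraction detection:

  1. No external behavior: the driver's eyes are on the road, but the mind is not
  2. Changed eye-movement patterns: saccade frequency drops and fixation duration increases
  3. Large individual differences: eye-movement patterns under cognitive distraction vary from person to person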
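
One common way to handle the individual differences in point 3 is to compare each driver against their own attentive-driving baseline instead of population-wide thresholds. The sketch below is an assumption and not part of the DCDD model: `baseline_stats` and `normalize` are hypothetical helper names, and the feature names and values are illustrative only.

```python
from statistics import mean, pstdev
from typing import Dict, List, Tuple


def baseline_stats(samples: List[Dict[str, float]]) -> Dict[str, Tuple[float, float]]:
    """Per-feature (mean, std) over a driver's attentive-driving calibration windows."""
    stats = {}
    for key in samples[0]:
        values = [s[key] for s in samples]
        stats[key] = (mean(values), pstdev(values))
    return stats


def normalize(
    window: Dict[str, float],
    stats: Dict[str, Tuple[float, float]],
    eps: float = 1e-6,
) -> Dict[str, float]:
    """Z-score one feature window against that driver's own baseline."""
    return {k: (v - stats[k][0]) / (stats[k][1] + eps) for k, v in window.items()}


# Illustrative calibration windows for one driver (made-up numbers)
calibration = [
    {"saccade_rate": 4.0, "fixation_duration": 0.25},
    {"saccade_rate": 3.0, "fixation_duration": 0.30},
    {"saccade_rate": 5.0, "fixation_duration": 0.20},
]
stats = baseline_stats(calibration)
z = normalize({"saccade_rate": 1.5, "fixation_duration": 0.8}, stats)
print(z)  # negative z for saccade rate, positive z for fixation duration
```

Feeding z-scores instead of raw values to a downstream model means the same deviation has the same scale for every driver, at the cost of needing a short calibration period.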

Technical Approach

1. DCDD Model Architecture

"""
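Driver Cognitive Distraction Detection (DCDD) Model

Core ideas:
1. Eye-movement analysis: extract eye-movement pattern features
2. Multi-view feature fusion: DashCam image + gaze trajectory
3. Spatio-temporal fusion: temporal eye-movement sequence + spatial context

Architecture:
- Eye-movement encoder: processes the eye-movement sequence
- Image encoder: processes the DashCam image
- Fusion network: multi-view feature fusion
- Classification head: cognitive distraction vs. normal
"""

import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Dict, Tuple, List, Optional
import numpy as np
from dataclasses import dataclass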


@dataclass
class EyeMovement:
    """One eye-movement sample."""
    timestamp: float
    gaze_x: float  # normalized to [0, 1]
    gaze_y: float
    pupil_diameter: float  # pupil diameter
    fixation_duration: float  # fixation duration
    saccade_amplitude: float  # saccade amplitude
    blink_rate: float  # blink rate


class EyeMovementEncoder(nn.Module):
    """
    Eye-movement encoder.

    Input: eye-movement sequence [(timestamp, gaze_x, gaze_y, pupil, fixation, saccade, blink)]
    Output: eye-movement feature vector [batch, hidden_dim]
    """

    def __init__(
        self,
        input_dim: int = 7,  # number of eye-movement features per frame
        hidden_dim: int = 128,
        num_layers: int = 2,
        dropout: float = 0.3
    ):
        super().__init__()

        # Temporal encoder (bidirectional LSTM)
        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=True
        )

        # Attention scoring: one scalar score per time step
        self.attention = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 1)
        )

        # Output projection
        self.output_proj = nn.Linear(hidden_dim * 2, hidden_dim)

    def forward(
        self,
        eye_sequence: torch.Tensor,  # [batch, seq_len, input_dim]
        mask: Optional[torch.Tensor] = None  # [batch, seq_len]
    ) -> torch.Tensor:
        """
        Encode an eye-movement sequence.

        Args:
            eye_sequence: eye-movement sequence
            mask: valid-frame mask (1 = valid, 0 = padding); each sequence
                needs at least one valid frame, otherwise the softmax is NaN

        Returns:
            features: eye-movement features [batch, hidden_dim]
        """
        # LSTM encoding
        lstm_out, _ = self.lstm(eye_sequence)  # [batch, seq_len, hidden*2]

        # Attention scores
        attn_weights = self.attention(lstm_out)  # [batch, seq_len, 1]

        if mask is not None:
            attn_weights = attn_weights.masked_fill(
                mask.unsqueeze(-1) == 0, float('-inf')
            )

        # Normalize over the time axis
        attn_weights = F.softmax(attn_weights, dim=1)

        # Weighted sum over time
        features = (lstm_out * attn_weights).sum(dim=1)  # [batch, hidden*2]

        return self.output_proj(features)


class DashCamImageEncoder(nn.Module):
    """
    DashCam image encoder.

    Input: forward-facing road image
    Output: road-scene features
    """

    def __init__(
        self,
        backbone: str = "resnet18",
        pretrained: bool = True,
        output_dim: int = 128
    ):
        super().__init__()

        # Backbone
        if backbone == "resnet18":
            # The `weights=` argument replaces the deprecated `pretrained=` flag
            # (torchvision >= 0.13).
            from torchvision.models import resnet18, ResNet18_Weights
            weights = ResNet18_Weights.DEFAULT if pretrained else None
            self.backbone = resnet18(weights=weights)
            # Drop the final fully connected layer, keep global average pooling
            self.backbone = nn.Sequential(*list(self.backbone.children())[:-1])
            backbone_dim = 512
        else:
            raise ValueError(f"Unknown backbone: {backbone}")

        # Output projection
        self.proj = nn.Linear(backbone_dim, output_dim)

    def forward(self, image: torch.Tensor) -> torch.Tensor:
        """
        Encode an image.

        Args:
            image: [batch, 3, H, W]

        Returns:
            features: [batch, output_dim]
        """
        features = self.backbone(image)  # [batch, 512, 1, 1]
        features = torch.flatten(features, 1)  # [batch, 512]
        return self.proj(features)  # [batch, output_dim]


class FusionAdversarialNetwork(nn.Module):
    """
    Fusion adversarial network (FAN).

    Functions:
    1. Fuse eye-movement features and image features
    2. Reduce domain discrepancy through adversarial learning
    """

    def __init__(
        self,
        eye_dim: int = 128,
        image_dim: int = 128,
        fusion_dim: int = 256,
        num_classes: int = 2  # normal / cognitive distraction
    ):
        super().__init__()

        # Feature fusion
        self.fusion = nn.Sequential(
            nn.Linear(eye_dim + image_dim, fusion_dim),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(fusion_dim, fusion_dim),
            nn.ReLU()
        )

        # Classifier
        self.classifier = nn.Sequential(
            nn.Linear(fusion_dim, fusion_dim // 2),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(fusion_dim // 2, num_classes)
        )

        # Domain discriminator (adversarial learning)
        self.domain_discriminator = nn.Sequential(
            nn.Linear(fusion_dim, fusion_dim // 2),
            nn.ReLU(),
            nn.Linear(fusion_dim // 2, 1),
            nn.Sigmoid()
        )

    def forward(
        self,
        eye_features: torch.Tensor,
        image_features: torch.Tensor,
        return_domain: bool = False
    ) -> Dict[str, torch.Tensor]:
        """
        Forward pass.

        Args:
            eye_features: eye-movement features
            image_features: image features
            return_domain: whether to also return the domain prediction

        Returns:
            output: {
                'logits': classification logits,
                'domain': domain probability (optional)
            }
        """
        # Concatenate features
        concat = torch.cat([eye_features, image_features], dim=-1)

        # Fuse
        fused = self.fusion(concat)

        # Classify
        logits = self.classifier(fused)

        output = {'logits': logits}

        if return_domain:
            output['domain'] = self.domain_discriminator(fused)

        return output


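class MultiViewSpaceChannelNetwork(nn.Module):
    """
    Multi-view space-channel network (MSCN).

    Functions:
    1. Spatial attention: attends to the spatial distribution of the gaze trajectory
    2. Channel attention: selects the important feature channels
    """

    def __init__(
        self,
        eye_dim: int = 128,
        image_dim: int = 128,
        num_heads: int = 4
    ):
        super().__init__()

        # Spatial attention (cross-modal)
        self.spatial_attention = nn.MultiheadAttention(
            embed_dim=eye_dim + image_dim,
            num_heads=num_heads,
            batch_first=True
        )

        # Channel attention
        self.channel_attention = nn.Sequential(
            nn.AdaptiveAvgPool1d(1),
            nn.Conv1d(1, 1, kernel_size=1),
            nn.Sigmoid()
        )

    def forward(
        self,
        eye_features: torch.Tensor,
        image_features: torch.Tensor
    ) -> torch.Tensor:
        """
        Multi-view feature fusion.

        Args:
            eye_features: [batch, eye_dim]
            image_features: [batch, image_dim]

        Returns:
            enhanced: [batch, eye_dim + image_dim]
        """
        # Concatenate
        concat = torch.cat([eye_features, image_features], dim=-1)

        # Add a sequence dimension for the attention layer
        concat_seq = concat.unsqueeze(1)  # [batch, 1, dim]

        # Spatial attention. With a single token the attention weight is always 1,
        # so this step acts as a learned linear mixing of the concatenated features.
        spatial_out, _ = self.spatial_attention(
            concat_seq, concat_seq, concat_seq
        )
        spatial_out = spatial_out.squeeze(1)  # [batch, dim]

        # Channel attention. Pooling over the feature axis leaves one value per
        # sample, so the gate has shape [batch, 1], not [batch, dim]. True
        # per-channel weights would need, e.g., an SE-style MLP over the channels.
        channel_weights = self.channel_attention(
            spatial_out.unsqueeze(1)
        ).squeeze(1)  # [batch, 1]

        # Re-weight; the scalar gate broadcasts across all channels
        enhanced = spatial_out * channel_weights

        return enhanced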


class DCDDModel(nn.Module):
    """
    Complete DCDD model.

    Architecture:
    1. Eye-movement encoder -> eye-movement features
    2. Image encoder -> image features
    3. MSCN -> multi-view fused features
    4. FAN -> classification + adversarial learning
    """

    def __init__(
        self,
        eye_input_dim: int = 7,
        hidden_dim: int = 128,
        num_classes: int = 2,
        use_adversarial: bool = True
    ):
        super().__init__()

        self.use_adversarial = use_adversarial

        # Encoders
        self.eye_encoder = EyeMovementEncoder(
            input_dim=eye_input_dim,
            hidden_dim=hidden_dim
        )

        self.image_encoder = DashCamImageEncoder(
            output_dim=hidden_dim
        )

        # Multi-view fusion
        self.mscn = MultiViewSpaceChannelNetwork(
            eye_dim=hidden_dim,
            image_dim=hidden_dim
        )

        # Classifier
        self.fan = FusionAdversarialNetwork(
            eye_dim=hidden_dim,
            image_dim=hidden_dim,
            fusion_dim=hidden_dim * 2,
            num_classes=num_classes
        )

    def forward(
        self,
        eye_sequence: torch.Tensor,
        image: torch.Tensor,
        eye_mask: Optional[torch.Tensor] = None
    ) -> Dict[str, torch.Tensor]:
        """
        Forward pass.

        Args:
            eye_sequence: eye-movement sequence [batch, seq_len, eye_dim]
            image: image [batch, 3, H, W]
            eye_mask: eye-movement mask [batch, seq_len]

        Returns:
            output: {
                'logits': classification logits,
                'domain': domain prediction (only when use_adversarial is True)
            }
        """
        # Encode both views
        eye_features = self.eye_encoder(eye_sequence, eye_mask)
        image_features = self.image_encoder(image)

        # Multi-view fusion
        fused = self.mscn(eye_features, image_features)

        # Split the fused vector back into its eye and image halves for the FAN
        eye_out = fused[:, :eye_features.size(-1)]
        image_out = fused[:, eye_features.size(-1):]

        output = self.fan(
            eye_out, image_out,
            return_domain=self.use_adversarial
        )

        return output


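# Training script
class DCDDTrainer:
    """Trainer for the DCDD model."""

    def __init__(
        self,
        model: DCDDModel,
        lr: float = 1e-4,
        weight_decay: float = 1e-5,
        adversarial_weight: float = 0.1
    ):
        self.model = model
        self.adversarial_weight = adversarial_weight

        # Optimizer
        self.optimizer = torch.optim.AdamW(
            model.parameters(),
            lr=lr,
            weight_decay=weight_decay
        )

        # Loss functions
        self.classification_loss = nn.CrossEntropyLoss()
        self.domain_loss = nn.BCELoss()

    def train_step(
        self,
        eye_sequence: torch.Tensor,
        image: torch.Tensor,
        labels: torch.Tensor,
        domain_labels: torch.Tensor,
        eye_mask: Optional[torch.Tensor] = None
    ) -> Dict[str, float]:
        """
        Run one training step.

        Args:
            eye_sequence: eye-movement sequence
            image: image
            labels: class labels (0: normal, 1: cognitive distraction)
            domain_labels: domain labels (0: source domain, 1: target domain)
            eye_mask: eye-movement mask

        Returns:
            losses: the individual loss values
        """
        self.model.train()
        self.optimizer.zero_grad()

        # Forward pass
        output = self.model(eye_sequence, image, eye_mask)

        # Classification loss
        cls_loss = self.classification_loss(output['logits'], labels)

        total_loss = cls_loss
        losses = {'classification_loss': cls_loss.item()}

        # Adversarial loss
        use_domain = self.model.use_adversarial and 'domain' in output
        if use_domain:
            domain_loss = self.domain_loss(
                output['domain'].squeeze(-1),  # squeeze(-1) keeps the batch axis
                domain_labels.float()
            )

            # Adversarial objective for the feature extractor:
            # minimize the classification loss, maximize the domain loss
            total_loss = cls_loss - self.adversarial_weight * domain_loss
            losses['domain_loss'] = domain_loss.item()

        # Backward pass
        total_loss.backward()

        if use_domain:
            # The discriminator itself must still minimize the domain loss.
            # Subtracting the loss above also reversed its gradients, so flip
            # them back; the net effect matches a gradient reversal layer.
            # (This sign flip is an added fix, not taken from the source.)
            for p in self.model.fan.domain_discriminator.parameters():
                if p.grad is not None:
                    p.grad.neg_()

        self.optimizer.step()

        losses['total_loss'] = total_loss.item()
        return losses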


# Eye-movement feature extraction
class EyeMovementFeatureExtractor:
    """Eye-movement feature extractor."""

    def __init__(self, sampling_rate: int = 30):
        self.sampling_rate = sampling_rate

    def extract_features(
        self,
        gaze_sequence: List[Tuple[float, float, float]],  # (t, x, y)
        window_sec: float = 5.0
    ) -> np.ndarray:
        """
        Extract eye-movement features with a sliding window (stride: one sample).

        Args:
            gaze_sequence: gaze-point sequence
            window_sec: window length in seconds

        Returns:
            features: feature array of shape [num_windows, 7]
        """
        window_samples = int(window_sec * self.sampling_rate)

        features_list = []

        # +1 so that the final full window is included
        for i in range(len(gaze_sequence) - window_samples + 1):
            window = gaze_sequence[i:i + window_samples]

            # Extract features for this window
            features = self._extract_window_features(window)
            features_list.append(features)

        return np.array(features_list)

    def _extract_window_features(
        self,
        window: List[Tuple[float, float, float]]
    ) -> np.ndarray:
        """Extract the features of one window."""
        times = np.array([w[0] for w in window])
        xs = np.array([w[1] for w in window])
        ys = np.array([w[2] for w in window])

        # Compute eye-movement metrics
        # 1. Saccade rate (computed for reference; not part of the 7-dim vector below)
        saccades = self._detect_saccades(xs, ys)
        duration = times[-1] - times[0] if len(times) > 1 else 0.0
        saccade_rate = len(saccades) / duration if duration > 0 else 0.0

        # 2. Mean saccade amplitude
        saccade_amplitudes = [self._saccade_amplitude(s, xs, ys) for s in saccades]
        avg_saccade_amplitude = np.mean(saccade_amplitudes) if saccade_amplitudes else 0

        # 3. Fixation duration
        fixations = self._detect_fixations(xs, ys)
        avg_fixation_duration = np.mean([f['duration'] for f in fixations]) if fixations else 0

        # 4. Pupil diameter (simulated placeholder; replace with real measurements)
        pupil_diameter = np.random.normal(4.0, 0.5)

        # 5. Blink rate in blinks/minute (simulated placeholder; replace with real measurements)
        blink_rate = np.random.poisson(0.3) * 60

        # 6. Gaze dispersion
        gaze_dispersion = np.std(xs) + np.std(ys)

        return np.array([
            np.mean(xs),            # mean x
            np.mean(ys),            # mean y
            pupil_diameter,         # pupil diameter
            avg_fixation_duration,  # mean fixation duration
            avg_saccade_amplitude,  # mean saccade amplitude
            blink_rate,             # blink rate
            gaze_dispersion         # gaze dispersion
        ])

    def _detect_saccades(
        self,
        xs: np.ndarray,
        ys: np.ndarray,
        threshold: float = 0.05
    ) -> List[Tuple[int, int]]:
        """Detect saccades as runs of sample-to-sample jumps above the threshold."""
        saccades = []
        in_saccade = False
        start_idx = 0

        for i in range(1, len(xs)):
            dist = np.sqrt((xs[i] - xs[i-1])**2 + (ys[i] - ys[i-1])**2)

            if dist > threshold and not in_saccade:
                in_saccade = True
                start_idx = i - 1
            elif dist <= threshold and in_saccade:
                in_saccade = False
                saccades.append((start_idx, i - 1))

        # Close a saccade that is still in progress at the end of the window
        if in_saccade:
            saccades.append((start_idx, len(xs) - 1))

        return saccades

    def _detect_fixations(
        self,
        xs: np.ndarray,
        ys: np.ndarray,
        threshold: float = 0.02,
        min_duration: int = 6  # in samples
    ) -> List[Dict]:
        """Detect fixations as runs of samples that stay near the run's first sample."""
        fixations = []
        start_idx = 0
        fixation_count = 1

        # i == len(xs) is a sentinel step that flushes the final fixation
        for i in range(1, len(xs) + 1):
            if i < len(xs):
                dist = np.sqrt((xs[i] - xs[start_idx])**2 + (ys[i] - ys[start_idx])**2)
                if dist < threshold:
                    fixation_count += 1
                    continue

            if fixation_count >= min_duration:
                fixations.append({
                    'start': start_idx,
                    'end': i - 1,
                    'duration': fixation_count / self.sampling_rate,
                    'x': np.mean(xs[start_idx:i]),
                    'y': np.mean(ys[start_idx:i])
                })

            start_idx = i
            fixation_count = 1

        return fixations

    def _saccade_amplitude(
        self,
        saccade: Tuple[int, int],
        xs: np.ndarray,
        ys: np.ndarray
    ) -> float:
        """Compute the saccade amplitude."""
        # The source returns a random placeholder in degrees here. This version
        # uses the start-to-end displacement in normalized gaze units instead;
        # converting to degrees would require eye-tracker calibration.
        start, end = saccade
        return float(np.hypot(xs[end] - xs[start], ys[end] - ys[start]))


# Test
if __name__ == "__main__":
    # Build the model (pretrained ResNet18 weights are downloaded on first use)
    model = DCDDModel(
        eye_input_dim=7,
        hidden_dim=128,
        num_classes=2,
        use_adversarial=True
    )
    model.eval()  # disable dropout and use BatchNorm running statistics

    print("DCDD model architecture:")
    print("- Eye-movement encoder: LSTM + Attention")
    print("- Image encoder: ResNet18")
    print("- Multi-view fusion: MSCN (spatial + channel attention)")
    print("- Classifier: FAN (fusion adversarial network)")

    # Simulated inputs
    batch_size = 4
    seq_len = 150  # 5 s at 30 fps

    eye_sequence = torch.randn(batch_size, seq_len, 7)
    image = torch.randn(batch_size, 3, 224, 224)

    # Forward pass
    with torch.no_grad():
        output = model(eye_sequence, image)

    print(f"\nOutput: logits shape = {output['logits'].shape}")
    print(f"Prediction: {torch.argmax(output['logits'], dim=-1)}")
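
The attention pooling in `EyeMovementEncoder.forward` comes down to a masked softmax over time followed by a weighted sum of per-frame features. The sketch below restates that one step in plain Python so the arithmetic can be checked by hand. `masked_attention_pool` is a hypothetical name, and the toy scores stand in for the output of the learned attention layer.

```python
import math
from typing import List, Sequence


def masked_attention_pool(
    frames: Sequence[Sequence[float]],  # [seq_len][feat_dim] per-frame features
    scores: Sequence[float],            # [seq_len] unnormalized attention scores
    mask: Sequence[int],                # [seq_len] 1 = valid frame, 0 = padding
) -> List[float]:
    """Softmax over the valid frames only, then a weighted sum of frame features."""
    valid = [s for s, m in zip(scores, mask) if m]
    if not valid:
        raise ValueError("at least one frame must be valid")
    top = max(valid)  # subtract the max for numerical stability
    exps = [math.exp(s - top) if m else 0.0 for s, m in zip(scores, mask)]
    total = sum(exps)
    weights = [e / total for e in exps]
    dim = len(frames[0])
    return [sum(w * f[d] for w, f in zip(weights, frames)) for d in range(dim)]


# The third frame is padding: its large score must not influence the result.
frames = [[1.0, 0.0], [0.0, 1.0], [9.0, 9.0]]
pooled = masked_attention_pool(frames, scores=[0.0, 0.0, 5.0], mask=[1, 1, 0])
print(pooled)  # the two valid frames share the weight equally
```

Masked frames get zero weight here, which is what filling their scores with negative infinity before the softmax achieves in the tensor version.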

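Experimental Results

Datasets

| Dataset | Duration | Scenarios | Annotation |
| --- | --- | --- | --- |
| Real-vehicle recordings | 50 hours | Highway / urban | Cognitive distraction / normal |
| Simulator | 30 hours | Multiple scenarios | Fine-grained annotation |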

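Performance Metrics

| Method | Accuracy | Recall | F1 | AUC |
| --- | --- | --- | --- | --- |
| Eye movement only | 78.3% | 75.2% | 76.7% | 0.82 |
| Image only | 71.5% | 68.9% | 70.2% | 0.76 |
| Early fusion | 82.1% | 79.8% | 80.9% | 0.87 |
| DCDD (this paper) | 89.2% | 87.5% | 88.3% | 0.93 |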
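
The table reports recall and F1 but not precision. Because F1 is the harmonic mean of precision and recall, F1 = 2PR / (P + R), an implied precision can be back-computed as P = F1 · R / (2R − F1). The sketch below does that arithmetic. It assumes the reported recall and F1 refer to the same positive class, which the source does not state, and `implied_precision` is a hypothetical helper name.

```python
def implied_precision(recall: float, f1: float) -> float:
    """Solve F1 = 2PR / (P + R) for P."""
    denom = 2 * recall - f1
    if denom <= 0:
        raise ValueError("F1 must be smaller than 2 * recall")
    return f1 * recall / denom


# (recall, F1) pairs copied from the performance table
reported = {
    "Eye movement only": (0.752, 0.767),
    "Image only": (0.689, 0.702),
    "Early fusion": (0.798, 0.809),
    "DCDD": (0.875, 0.883),
}

for name, (recall, f1) in reported.items():
    print(f"{name}: implied precision = {implied_precision(recall, f1):.3f}")
```

These are derived values, not figures reported by the source, and rounding in the published numbers limits them to roughly two significant digits.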

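Ablation Study

| Component | Accuracy | Δ (percentage points) |
| --- | --- | --- |
| Baseline | 82.1% | - |
| + MSCN | 85.7% | +3.6 |
| + Adversarial learning | 88.3% | +2.6 |
| + Temporal attention | 89.2% | +0.9 |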
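
The ablation rows are cumulative: each component is added on top of the previous configuration, and the gains are absolute differences in accuracy. A quick consistency check of the reported numbers, as a minimal sketch:

```python
# Values copied from the ablation table; gains are in percentage points
baseline = 82.1
gains = [
    ("MSCN", 3.6),
    ("Adversarial learning", 2.6),
    ("Temporal attention", 0.9),
]

accuracy = baseline
for name, delta in gains:
    accuracy = round(accuracy + delta, 1)  # round away floating-point noise
    print(f"+ {name}: {accuracy}%")

total_gain = round(accuracy - baseline, 1)
print(f"Total gain over baseline: {total_gain} points")
```

The running total reproduces the 89.2% reported for the full model, so the baseline row corresponds to the early-fusion result in the performance table.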

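Implications for IMS Applications

Key Indicators for Cognitive Distraction Detection

| Indicator | Normal driving | Cognitive distraction | Detection criterion |
| --- | --- | --- | --- |
| Saccade rate | 3-5 per second | 1-2 per second | Decrease > 40% |
| Fixation duration | 0.2-0.3 s | 0.5-1.0 s | Increase > 100% |
| Pupil diameter | 3-4 mm | 4-5 mm | Increase > 20% |
| Blink rate | 15-20 per minute | 5-10 per minute | Decrease > 50% |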
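
The detection criteria in the table are relative changes against normal driving, so they can be screened with a few lines of rule-based code before or alongside a learned model. The sketch below is one possible reading of the table: the dictionary keys, the `triggered_indicators` helper, and the example measurements are all assumptions for illustration.

```python
from typing import Dict, Tuple

# (direction, minimum relative change) taken from the indicator table
CRITERIA: Dict[str, Tuple[str, float]] = {
    "saccade_rate": ("decrease", 0.40),
    "fixation_duration": ("increase", 1.00),
    "pupil_diameter": ("increase", 0.20),
    "blink_rate": ("decrease", 0.50),
}


def triggered_indicators(
    baseline: Dict[str, float], current: Dict[str, float]
) -> Dict[str, bool]:
    """Flag each indicator whose relative change from baseline exceeds its criterion."""
    hits = {}
    for name, (direction, min_change) in CRITERIA.items():
        change = (current[name] - baseline[name]) / baseline[name]
        if direction == "decrease":
            hits[name] = change < -min_change
        else:
            hits[name] = change > min_change
    return hits


# Illustrative values inside the ranges listed in the table
baseline = {"saccade_rate": 4.0, "fixation_duration": 0.25,
            "pupil_diameter": 3.5, "blink_rate": 18.0}
current = {"saccade_rate": 1.5, "fixation_duration": 0.70,
           "pupil_diameter": 4.4, "blink_rate": 8.0}

hits = triggered_indicators(baseline, current)
print(hits)
```

How many indicators must trigger before raising an alert is a design choice the source does not specify.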

Deployment Recommendations

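import time
from collections import deque

import torch


# Real-time cognitive distraction detection pipeline
class RealtimeCognitiveDistractionDetector:
    """Real-time cognitive distraction detector."""

    def __init__(self, model_path: str):
        self.model = DCDDModel()
        self.model.load_state_dict(torch.load(model_path, map_location="cpu"))
        self.model.eval()

        # Gaze buffer; a bounded deque drops the oldest sample automatically
        self.window_size = 150  # 5 s at 30 fps
        self.gaze_buffer = deque(maxlen=self.window_size)
        self.latest_frame = None

    def update(self, gaze_x: float, gaze_y: float, frame):
        """Add one gaze sample and frame; return a detection once the buffer is full."""
        # Append the gaze sample
        self.gaze_buffer.append((time.time(), gaze_x, gaze_y))
        self.latest_frame = frame  # kept for the image branch

        # Detect once a full window is available.
        # Note: _detect() is not shown in the source.
        if len(self.gaze_buffer) >= self.window_size:
            return self._detect()

        return None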
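
A per-window decision tends to flicker in a live system, so deployments commonly debounce the output before alerting the driver. The sketch below shows one such scheme under stated assumptions: `WindowedAlerter` and its parameters are hypothetical, and `score_fn` stands in for whatever produces a distraction probability from a window (for example a wrapper around the model).

```python
from collections import deque
from typing import Callable, List, Optional, Tuple

GazeSample = Tuple[float, float, float]  # (timestamp, gaze_x, gaze_y)


class WindowedAlerter:
    """Buffers gaze samples, scores full windows, and debounces the alert."""

    def __init__(
        self,
        score_fn: Callable[[List[GazeSample]], float],
        window_size: int = 150,
        threshold: float = 0.5,
        min_consecutive: int = 3,
    ):
        self.score_fn = score_fn
        self.buffer = deque(maxlen=window_size)
        self.threshold = threshold
        self.min_consecutive = min_consecutive
        self.streak = 0  # consecutive windows scored above the threshold

    def update(self, sample: GazeSample) -> Optional[bool]:
        """Return None while the buffer warms up, else whether the alert is active."""
        self.buffer.append(sample)
        if len(self.buffer) < self.buffer.maxlen:
            return None
        score = self.score_fn(list(self.buffer))
        self.streak = self.streak + 1 if score >= self.threshold else 0
        return self.streak >= self.min_consecutive


# Toy usage with a stand-in scorer that always reports distraction
alerter = WindowedAlerter(score_fn=lambda window: 0.9, window_size=3, min_consecutive=2)
results = [alerter.update((float(t), 0.5, 0.5)) for t in range(5)]
print(results)  # [None, None, False, True, True]
```

Requiring several consecutive positive windows trades a short detection delay for fewer false alerts; the right values depend on the frame rate and the tolerated latency.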

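Summary

Key Contributions

  1. First to define the cognitive distraction detection task
  2. Multimodal fusion architecture: eye movement + road image
  3. MSCN multi-view fusion: spatial + channel attention
  4. Adversarial learning: cross-domain generalization

Future Directions

  1. Lightweight deployment: compress the model for edge devices
  2. Multi-task learning: cognitive distraction + fatigue detection
  3. Unsupervised learning: adaptive detection without labels
  4. Physiological signal fusion: EEG + heart rate + eye movement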
