DMS 数据流架构:从摄像头到警告的完整流程

前言

DMS 系统需要在**≤100ms 延迟**内完成从摄像头输入到警告输出的完整流程。本文解析数据流架构。


一、实时性要求

1.1 Euro NCAP 要求

功能 响应时间
分心检测 ≤3s
疲劳检测 ≤5s
无响应检测 ≤10s

1.2 系统延迟分配

模块 延迟 占比
图像采集 5ms 5%
ISP 处理 10ms 10%
特征提取 30ms 30%
状态判断 20ms 20%
警告输出 10ms 10%
系统余量 25ms 25%
总延迟 100ms 100%

二、数据流架构

2.1 完整架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
DMS 数据流架构:

┌─────────────────────────────────────────────┐
│ 传感器层(延迟:5ms) │
│ ├─ IR 摄像头采集 │
│ └─ 帧缓存 │
├─────────────────────────────────────────────┤
│ ISP 层(延迟:10ms) │
│ ├─ 畸变校正 │
│ ├─ 自动曝光 │
│ └─ 图像增强 │
├─────────────────────────────────────────────┤
│ 特征提取层(延迟:30ms) │
│ ├─ 人脸检测 │
│ ├─ 关键点定位 │
│ ├─ 眼动追踪 │
│ ├─ 视线估计 │
│ └─ 头部姿态 │
├─────────────────────────────────────────────┤
│ 状态判断层(延迟:20ms) │
│ ├─ 疲劳判断 │
│ ├─ 分心判断 │
│ ├─ 损伤检测 │
│ └─ 无响应检测 │
├─────────────────────────────────────────────┤
│ 决策输出层(延迟:10ms) │
│ ├─ 警告级别判断 │
│ ├─ 警告信号生成 │
│ └─ CAN-FD 输出 │
└─────────────────────────────────────────────┘

2.2 数据格式

阶段 数据格式 大小
传感器 Raw (Bayer) 5MP × 2bytes = 10MB
ISP RGB/IR 5MP × 3bytes = 15MB
特征 特征向量 128-512 floats
判断 状态枚举 1-4 bytes
输出 CAN 帧 8-64 bytes

三、关键模块

3.1 人脸检测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 人脸检测模块(概念代码)
import cv2
import numpy as np

class FaceDetector:
def __init__(self, model_path):
self.model = cv2.dnn.readNetFromONNX(model_path)
self.input_size = (320, 240)

def detect(self, frame):
"""
人脸检测

输入:frame [H, W, 3]
输出:face_bbox [N, 4]
延迟:<10ms
"""
# 1. 预处理
blob = cv2.dnn.blobFromImage(
frame,
scalefactor=1.0/255,
size=self.input_size,
mean=(0.5, 0.5, 0.5),
swapRB=True
)

# 2. 推理
self.model.setInput(blob)
detections = self.model.forward()

# 3. 后处理
faces = self._postprocess(detections, frame.shape)

return faces

3.2 眼动追踪

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# 眼动追踪模块
class EyeTracker:
def __init__(self, model_path):
self.model = self._load_model(model_path)
self.history = []
self.history_size = 30 # 1秒历史(@30fps)

def track(self, face_landmarks, frame):
"""
眼动追踪

输入:face_landmarks [68, 2]
输出:{
'eye_closure': float,
'gaze_direction': [2],
'blink_detected': bool
}
延迟:<10ms
"""
# 1. 提取眼部区域
left_eye_roi = self._get_eye_roi(frame, face_landmarks, 'left')
right_eye_roi = self._get_eye_roi(frame, face_landmarks, 'right')

# 2. 计算眼睑闭合度(EAR)
ear = self._calculate_ear(left_eye_roi, right_eye_roi)

# 3. 估计视线方向
gaze = self._estimate_gaze(left_eye_roi, right_eye_roi)

# 4. 检测眨眼
blink = self._detect_blink(ear)

# 5. 更新历史
self.history.append({
'ear': ear,
'gaze': gaze,
'blink': blink
})
if len(self.history) > self.history_size:
self.history.pop(0)

return {
'eye_closure': ear,
'gaze_direction': gaze,
'blink_detected': blink
}

3.3 状态判断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# 状态判断模块
class StateClassifier:
def __init__(self):
self.fatigue_threshold = 0.7
self.distraction_threshold = 0.6

def classify(self, features):
"""
状态分类

输入:features {
'perclos': float,
'blink_rate': float,
'gaze_off_road': float,
'head_pose': [3]
}
输出:{
'state': 'normal' | 'fatigue' | 'distraction',
'confidence': float,
'should_warn': bool
}
延迟:<5ms
"""
# 1. 疲劳判断
fatigue_score = self._calculate_fatigue(features)

# 2. 分心判断
distraction_score = self._calculate_distraction(features)

# 3. 综合判断
if fatigue_score > self.fatigue_threshold:
return {
'state': 'fatigue',
'confidence': fatigue_score,
'should_warn': True
}
elif distraction_score > self.distraction_threshold:
return {
'state': 'distraction',
'confidence': distraction_score,
'should_warn': True
}
else:
return {
'state': 'normal',
'confidence': 1.0 - max(fatigue_score, distraction_score),
'should_warn': False
}

四、性能优化

4.1 多线程架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
多线程处理架构:

┌─────────────────────────────────────────────┐
│ 线程 1:图像采集 │
│ └─ 循环采集,写入缓冲区 │
├─────────────────────────────────────────────┤
│ 线程 2:特征提取 │
│ ├─ 从缓冲区读取帧 │
│ ├─ 执行特征提取 │
│ └─ 写入特征队列 │
├─────────────────────────────────────────────┤
│ 线程 3:状态判断 │
│ ├─ 从特征队列读取 │
│ ├─ 执行状态判断 │
│ └─ 写入结果队列 │
├─────────────────────────────────────────────┤
│ 线程 4:警告输出 │
│ ├─ 从结果队列读取 │
│ └─ 输出 CAN 信号 │
└─────────────────────────────────────────────┘

4.2 模型优化

优化方法 效果
INT8 量化 4x 加速
模型剪枝 2x 加速
知识蒸馏 模型减小 50%

五、开发启示

5.1 延迟优化

优化点 方法
模型推理 INT8 量化
内存访问 零拷贝
多线程 流水线并行

5.2 与 IMS 集成

功能 数据流位置
疲劳检测 特征提取 → 状态判断
分心检测 特征提取 → 状态判断
警告输出 状态判断 → CAN 输出

六、总结

关键要点

要点 说明
总延迟 ≤100ms 满足实时性要求
特征提取最耗时 需重点优化
多线程并行 提升吞吐量
模型量化 4x 加速

开发启示

启示 说明
流水线设计 并行处理
模型优化 量化、剪枝
内存管理 零拷贝
性能测试 全链路测试

发布日期: 2026-04-14
标签: #DMS #数据流 #系统架构 #实时处理


DMS 数据流架构:从摄像头到警告的完整流程
https://dapalm.com/2026/04/14/2026-04-14-DMS-Data-Flow-Architecture/
作者
Mars
发布于
2026年4月14日
许可协议