低成本边缘设备实时驾驶员行为识别系统

低成本边缘设备实时驾驶员行为识别系统

论文信息

  • 标题: Real-Time In-Cabin Driver Behavior Recognition on Low-Cost Edge Hardware
  • 作者: Vesal Ahsani 等
  • 发表: arXiv:2512.22298 (2026年1月)
  • 链接: https://arxiv.org/abs/2512.22298
  • 领域: Computer Vision, Human-Computer Interaction, Machine Learning

核心创新

在低成本边缘设备上实现实时驾驶员行为识别,支持17种行为类别,在树莓派5和Google Coral上分别达到16 FPS和25 FPS。

技术亮点

| 特性 | 实现 |
| --- | --- |
| 平台 | Raspberry Pi 5 (CPU) + Google Coral (Edge TPU) |
| 行为类别 | 17种(分心、疲劳相关) |
| 训练数据 | 800,000+ 标注帧 |
| 延迟 | <60ms (Pi 5), ~40ms (Coral) |
| 量化 | INT8推理 |

系统架构

1. 整体Pipeline

(以下为完整的 Python 实现代码)
"""
实时驾驶员行为识别系统

Pipeline:
1. 图像采集 (IR摄像头)
2. 人脸检测
3. 行为分类 (轻量级CNN)
4. 时序决策 (置信度+持续性)
5. 警告触发

支持平台:
- Raspberry Pi 5 (CPU-only)
- Google Coral Dev Board (Edge TPU)
"""

import numpy as np
import time
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from enum import Enum
import queue
import threading


class BehaviorCategory(Enum):
    """Closed set of the 17 driver-behavior labels.

    Each member's value is the snake_case label string, so a label can be
    turned back into a member with ``BehaviorCategory(label)``.
    """

    # Normal driving
    SAFE_DRIVING = "safe_driving"
    LOOKING_FORWARD = "looking_forward"

    # Distraction
    PHONE_USE = "phone_use"
    TEXTING = "texting"
    TALKING_PHONE = "talking_phone"
    ADJUSTING_RADIO = "adjusting_radio"
    DRINKING = "drinking"
    EATING = "eating"
    REACHING_BEHIND = "reaching_behind"
    HAIR_MAKEUP = "hair_makeup"
    TALKING_PASSENGER = "talking_passenger"
    LOOKING_AWAY = "looking_away"

    # Fatigue
    YAWNING = "yawning"
    EYES_CLOSED = "eyes_closed"
    HEAD_DOWN = "head_down"
    BLINKING = "blinking"

    # Anything else
    OTHER = "other"


@dataclass
class BehaviorResult:
    """Outcome of classifying a single frame.

    ``note`` is an optional free-text diagnostic. It defaults to ``None`` so
    that existing five-argument constructions keep working, while callers
    that attach a note via ``note=...`` no longer raise ``TypeError``.
    """

    behavior: "BehaviorCategory"  # category reported for the frame
    confidence: float  # confidence score reported alongside ``behavior``
    all_probs: Dict[str, float]  # label string -> probability
    latency_ms: float  # time spent producing the result, in milliseconds
    timestamp: float  # time at which the result was produced
    note: Optional[str] = None  # optional diagnostic message


class LightweightBehaviorModel:
    """Single-frame driver-behavior classifier for CPU or Edge TPU.

    ``platform="edgetpu"`` loads the model through PyCoral; any other value
    loads it through ONNX Runtime with the CPU execution provider.
    """

    def __init__(
        self,
        model_path: str,
        platform: str = "cpu",  # "cpu" or "edgetpu"
        num_classes: int = 17,
    ):
        """Load the model and set up the label metadata.

        Args:
            model_path: Path to the ONNX (CPU) or Edge TPU TFLite model file.
            platform: ``"edgetpu"`` selects the Coral path; anything else
                selects ONNX Runtime on CPU.
            num_classes: Number of output classes; stored on the instance.
        """
        self.platform = platform
        self.num_classes = num_classes

        if platform == "edgetpu":
            self._load_edgetpu_model(model_path)
        else:
            self._load_cpu_model(model_path)

        # Output index i is reported under behavior_names[i].
        self.behavior_names = [
            "safe_driving", "looking_forward",
            "phone_use", "texting", "talking_phone",
            "adjusting_radio", "drinking", "eating",
            "reaching_behind", "hair_makeup",
            "talking_passenger", "looking_away",
            "yawning", "eyes_closed", "head_down",
            "blinking", "other",
        ]

        # Groups of labels that are easily confused with one another.
        self.confuser_groups = [
            ["phone_use", "texting", "talking_phone"],
            ["drinking", "eating"],
            ["yawning", "eyes_closed"],
            ["looking_away", "talking_passenger"],
        ]

    def _load_cpu_model(self, model_path: str):
        """Create an ONNX Runtime CPU session and cache its I/O tensor names."""
        import onnxruntime as ort

        self.session = ort.InferenceSession(
            model_path,
            providers=['CPUExecutionProvider'],
        )
        self.input_name = self.session.get_inputs()[0].name
        self.output_name = self.session.get_outputs()[0].name

    def _load_edgetpu_model(self, model_path: str):
        """Create an Edge TPU interpreter and cache its tensor details."""
        from pycoral.utils import edgetpu

        self.interpreter = edgetpu.make_interpreter(model_path)
        self.interpreter.allocate_tensors()

        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()

    def preprocess(self, image: np.ndarray) -> np.ndarray:
        """Turn a camera frame into a normalized float32 NCHW tensor.

        Args:
            image: ``[H, W, 3]`` image (documented by the caller as BGR).

        Returns:
            ``[1, 3, 224, 224]`` float32 array, scaled to ``[0, 1]`` and then
            standardized per channel.
        """
        # NOTE(review): the mean/std below are the usual ImageNet statistics
        # in RGB order while the input is described as BGR - confirm the
        # channel order matches what the model was trained with.
        if image.shape[:2] != (224, 224):
            # cv2 is only needed when the frame actually has to be resized.
            import cv2
            image = cv2.resize(image, (224, 224))

        img = image.astype(np.float32) / 255.0

        # Keep the statistics in float32: float64 constants would silently
        # promote the tensor to float64, which float32 model inputs reject.
        mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
        std = np.array([0.229, 0.224, 0.225], dtype=np.float32)
        img = (img - mean) / std

        # HWC -> CHW, then add the batch axis.
        img = img.transpose(2, 0, 1)
        return np.ascontiguousarray(img[np.newaxis, ...])

    @staticmethod
    def _quantize_input(tensor: np.ndarray, detail: dict) -> np.ndarray:
        """Adapt a float NCHW tensor to a TFLite input tensor description.

        Args:
            tensor: Float tensor produced by :meth:`preprocess`.
            detail: One entry of ``interpreter.get_input_details()``; its
                ``shape``, ``dtype`` and ``quantization`` keys are read.

        Returns:
            Contiguous array in the dtype (and, where it can be matched by an
            NCHW -> NHWC transpose, the layout) the interpreter expects.
        """
        expected = tuple(int(d) for d in detail["shape"])
        if tensor.shape != expected:
            nhwc = tensor.transpose(0, 2, 3, 1)
            if nhwc.shape == expected:
                tensor = nhwc

        dtype = np.dtype(detail["dtype"])
        if np.issubdtype(dtype, np.integer):
            scale, zero_point = detail["quantization"]
            if scale:
                # real = (q - zero_point) * scale  =>  q = real / scale + zp
                tensor = tensor / scale + zero_point
            limits = np.iinfo(dtype)
            # Round and saturate instead of letting the cast wrap around.
            tensor = np.clip(np.rint(tensor), limits.min, limits.max)
        return np.ascontiguousarray(tensor.astype(dtype))

    @staticmethod
    def _dequantize_output(raw: np.ndarray, detail: dict) -> np.ndarray:
        """Map a (possibly quantized) output tensor back to float32 values.

        Args:
            raw: Output array read from the interpreter.
            detail: One entry of ``interpreter.get_output_details()``; its
                ``quantization`` key is read.
        """
        scale, zero_point = detail["quantization"]
        values = raw.astype(np.float32)
        if scale:
            values = (values - zero_point) * scale
        return values

    def infer(self, image: np.ndarray) -> "BehaviorResult":
        """Classify one frame.

        Args:
            image: Input frame, see :meth:`preprocess`.

        Returns:
            The top-1 category, its probability, the full label ->
            probability mapping, the measured latency in milliseconds and a
            wall-clock timestamp.
        """
        # perf_counter is monotonic, so the latency cannot be skewed by
        # system clock adjustments.
        started = time.perf_counter()

        input_tensor = self.preprocess(image)

        if self.platform == "edgetpu":
            in_detail = self.input_details[0]
            out_detail = self.output_details[0]
            self.interpreter.set_tensor(
                in_detail['index'],
                self._quantize_input(input_tensor, in_detail),
            )
            self.interpreter.invoke()
            output = self._dequantize_output(
                self.interpreter.get_tensor(out_detail['index'])[0],
                out_detail,
            )
        else:
            output = self.session.run(
                [self.output_name],
                {self.input_name: input_tensor},
            )[0][0]

        probs = self._softmax(output)
        top_idx = int(np.argmax(probs))

        latency_ms = (time.perf_counter() - started) * 1000

        return BehaviorResult(
            behavior=BehaviorCategory(self.behavior_names[top_idx]),
            confidence=float(probs[top_idx]),
            all_probs={
                name: float(prob)
                for name, prob in zip(self.behavior_names, probs)
            },
            latency_ms=latency_ms,
            timestamp=time.time(),
        )

    def _softmax(self, x: np.ndarray) -> np.ndarray:
        """Return the softmax of ``x``, shifted by its max for stability."""
        exp_x = np.exp(x - np.max(x))
        return exp_x / exp_x.sum()


class ConfuserAwareTaxonomy:
    """Post-processor that flags predictions inside easily-confused groups."""

    def __init__(self, confuser_groups: List[List[str]]):
        """Index every label by the position of the group that contains it.

        Args:
            confuser_groups: Lists of label strings that are easily mistaken
                for one another.
        """
        self.confuser_groups = confuser_groups

        # label -> index into confuser_groups (a later group wins on clashes)
        self.label_to_group = {
            label: idx
            for idx, group in enumerate(confuser_groups)
            for label in group
        }

    def refine_prediction(
        self,
        result: "BehaviorResult",
        threshold: float = 0.3
    ) -> "BehaviorResult":
        """Downgrade a prediction when its confuser group is ambiguous.

        If the predicted label belongs to a confuser group and the gap
        between the two highest probabilities inside that group is smaller
        than ``threshold``, a copy of ``result`` is returned with the
        behavior set to ``BehaviorCategory.OTHER`` and the confidence set to
        the group's top probability. Otherwise ``result`` itself is returned.

        Args:
            result: Raw prediction.
            threshold: Minimum top-vs-second probability gap within the group
                for the prediction to be kept.

        Returns:
            ``result`` unchanged, or an "uncertain" copy of it.
        """
        from dataclasses import fields, replace

        group_idx = self.label_to_group.get(result.behavior.value)
        if group_idx is None:
            return result

        group = self.confuser_groups[group_idx]
        group_probs = {
            label: result.all_probs.get(label, 0.0)
            for label in group
        }

        # Highest probability first; sorted() is stable, so ties keep the
        # order in which the labels appear in the group.
        ranked = sorted(
            group_probs.items(),
            key=lambda item: item[1],
            reverse=True
        )
        top_label, top_conf = ranked[0]
        second_label, second_conf = ranked[1] if len(ranked) > 1 else (None, 0)

        if top_conf - second_conf >= threshold:
            return result

        changes = {
            "behavior": BehaviorCategory.OTHER,
            "confidence": top_conf,
        }
        # Passing ``note`` to a result type that does not declare it raises
        # TypeError, so the diagnostic is attached only when it is supported.
        if any(f.name == "note" for f in fields(result)):
            changes["note"] = (
                f"Uncertain in group {group}: {top_label} vs {second_label}"
            )
        return replace(result, **changes)


class TemporalDecisionHead:
    """Turns per-frame predictions into debounced alerts.

    An alert fires only when the current prediction is confident enough,
    the same behavior fills a sufficient share of the most recent frames,
    and that behavior is not in its cooldown period.
    """

    # Behaviors that never raise an alert.
    NORMAL_BEHAVIORS = ('safe_driving', 'looking_forward')

    def __init__(
        self,
        window_size: int = 30,  # 1 second of history at 30 fps
        min_confidence: float = 0.7,
        min_duration_frames: int = 15,  # 0.5 seconds at 30 fps
        cooldown_frames: int = 90,  # 3 seconds at 30 fps
        min_presence_ratio: float = 0.7
    ):
        """Configure the decision thresholds.

        Args:
            window_size: Maximum number of frames kept in ``history``.
            min_confidence: Minimum confidence of the current frame.
            min_duration_frames: Number of most recent frames inspected for
                persistence.
            cooldown_frames: Frames during which a behavior that just
                alerted cannot alert again.
            min_presence_ratio: Share of ``min_duration_frames`` that must
                carry the behavior for an alert to fire.
        """
        self.window_size = window_size
        self.min_confidence = min_confidence
        self.min_duration_frames = min_duration_frames
        self.cooldown_frames = cooldown_frames
        self.min_presence_ratio = min_presence_ratio

        # State
        self.history = []
        self.alert_state = {}
        self.cooldown_counter = {}

    def update(
        self,
        result: "BehaviorResult"
    ) -> Tuple[bool, str, Dict]:
        """Record one prediction and decide whether to raise an alert.

        Args:
            result: Prediction for the current frame.

        Returns:
            ``(should_alert, alert_type, metadata)``. ``alert_type`` is the
            behavior label when alerting, otherwise one of ``'normal'``,
            ``'cooldown'``, ``'low_confidence'`` or
            ``'insufficient_duration'``; ``metadata`` explains the decision.
        """
        self.history.append({
            'behavior': result.behavior.value,
            'confidence': result.confidence,
            'timestamp': result.timestamp
        })

        # Keep at most window_size entries.
        overflow = len(self.history) - self.window_size
        if overflow > 0:
            del self.history[:overflow]

        # Every frame counts towards every running cooldown.
        for name in list(self.cooldown_counter):
            self.cooldown_counter[name] -= 1
            if self.cooldown_counter[name] <= 0:
                del self.cooldown_counter[name]

        behavior = result.behavior.value

        if behavior in self.NORMAL_BEHAVIORS:
            return False, 'normal', {}

        if behavior in self.cooldown_counter:
            return False, 'cooldown', {
                'remaining': self.cooldown_counter[behavior]
            }

        if result.confidence < self.min_confidence:
            return False, 'low_confidence', {
                'confidence': result.confidence
            }

        # Persistence: how many of the latest frames carry this behavior.
        recent = self.history[-self.min_duration_frames:]
        matching = [
            h['confidence'] for h in recent
            if h['behavior'] == behavior
        ]
        behavior_count = len(matching)
        needed = self.min_duration_frames * self.min_presence_ratio

        if behavior_count < needed:
            return False, 'insufficient_duration', {
                'count': behavior_count,
                # Round up so the reported requirement is the smallest count
                # that actually passes the check above (truncating would
                # report e.g. 10 while 11 frames are needed for 15 * 0.7).
                'required': int(np.ceil(needed))
            }

        self.cooldown_counter[behavior] = self.cooldown_frames

        return True, behavior, {
            'confidence': result.confidence,
            'duration_frames': behavior_count,
            # Fall back to the current confidence if no frame matched, so
            # the mean of an empty list never yields NaN.
            'avg_confidence': (
                float(np.mean(matching)) if matching else result.confidence
            )
        }


class RealtimeDMS:
    """Real-time driver monitoring pipeline: classify, refine, decide."""

    def __init__(
        self,
        model_path: str,
        platform: str = "cpu",
        fps: int = 30
    ):
        """Build the model, the confuser handler and the decision head.

        Args:
            model_path: Path handed to the behavior model.
            platform: Platform string handed to the behavior model.
            fps: Expected frame rate; all frame-count thresholds are derived
                from it.
        """
        self.fps = fps

        self.model = LightweightBehaviorModel(model_path, platform)
        self.confuser_handler = ConfuserAwareTaxonomy(
            self.model.confuser_groups
        )
        self.decision_head = TemporalDecisionHead(
            window_size=fps * 2,  # 2-second window
            min_duration_frames=int(fps * 0.5),  # 0.5 seconds
            # Scale the cooldown with fps as well, so it stays at 3 seconds
            # instead of a fixed 90 frames at every frame rate.
            cooldown_frames=fps * 3
        )

        self.stats = {
            'total_frames': 0,
            'alerts': {},
            'avg_latency': []  # latencies (ms) of the latest <= 100 frames
        }

    def process_frame(self, frame: np.ndarray) -> Dict:
        """Run the full pipeline on one frame.

        Args:
            frame: Input frame handed to the model.

        Returns:
            Dict with the refined ``behavior`` and ``confidence``, the model
            ``latency_ms``, and the alert decision (``should_alert``,
            ``alert_type``, ``metadata``).
        """
        self.stats['total_frames'] += 1

        raw_result = self.model.infer(frame)
        refined_result = self.confuser_handler.refine_prediction(raw_result)
        should_alert, alert_type, metadata = self.decision_head.update(
            refined_result
        )

        # Rolling latency window, capped at 100 samples.
        latencies = self.stats['avg_latency']
        latencies.append(raw_result.latency_ms)
        if len(latencies) > 100:
            latencies.pop(0)

        if should_alert:
            alerts = self.stats['alerts']
            alerts[alert_type] = alerts.get(alert_type, 0) + 1

        return {
            'behavior': refined_result.behavior.value,
            'confidence': refined_result.confidence,
            'latency_ms': raw_result.latency_ms,
            'should_alert': should_alert,
            'alert_type': alert_type,
            'metadata': metadata
        }

    def get_stats(self) -> Dict:
        """Return frame/alert counters plus mean latency and derived FPS.

        Before any frame has been processed (or if the mean latency is 0)
        the latency and FPS figures are reported as ``0.0`` rather than NaN
        or a division error.
        """
        latencies = self.stats['avg_latency']
        mean_latency = float(np.mean(latencies)) if latencies else 0.0
        fps_estimate = 1000 / mean_latency if mean_latency > 0 else 0.0
        return {
            'total_frames': self.stats['total_frames'],
            'alerts': self.stats['alerts'],
            'avg_latency_ms': mean_latency,
            'fps_estimate': fps_estimate
        }


# 边缘设备部署脚本
def deploy_to_raspberrypi():
    """Return the shell script text for deploying on a Raspberry Pi 5.

    The script installs the Python dependencies, downloads the INT8 ONNX
    model and starts CPU inference. Nothing is executed here; the text is
    only returned.
    """
    script = """
# 树莓派5部署脚本

# 1. 安装依赖
sudo apt-get update
sudo apt-get install -y python3-pip libopenblas-dev

pip3 install onnxruntime opencv-python numpy

# 2. 下载模型(INT8量化版本)
wget https://example.com/dms_model_int8.onnx

# 3. 运行推理
python3 dms_inference.py --model dms_model_int8.onnx --platform cpu

# 预期性能:
# - 帧率:~16 FPS
# - 延迟:<60ms
# - CPU占用:~80%
"""
    return script


def deploy_to_coral():
    """Return the shell script text for deploying on a Google Coral.

    The script installs the Edge TPU runtime and PyCoral, compiles the
    TFLite model for the Edge TPU and starts inference. Nothing is executed
    here; the text is only returned.
    """
    script = """
# Google Coral部署脚本

# 1. 安装Edge TPU运行时
sudo apt-get install -y libedgetpu1-std

# 2. 安装PyCoral
pip3 install pycoral opencv-python numpy

# 3. 转换模型到Edge TPU格式
edgetpu_compiler dms_model.tflite

# 4. 运行推理
python3 dms_inference.py --model dms_model_edgetpu.tflite --platform edgetpu

# 预期性能:
# - 帧率:~25 FPS
# - 延迟:~40ms
# - TPU占用:~60%
"""
    return script


# 测试
if __name__ == "__main__":
    # Smoke test: build the pipeline and print its configuration.
    print("实时DMS系统测试")
    print("=" * 50)

    # Building the system loads the model file from the given path.
    dms = RealtimeDMS(
        model_path="dms_model.onnx",
        platform="cpu",
        fps=30
    )

    # To process an image, read it into an array and pass it to
    # dms.process_frame(frame).

    # Print the configuration
    print(f"行为类别数: {len(dms.model.behavior_names)}")
    print(f"易混淆行为组: {len(dms.model.confuser_groups)}")
    print(f"时序窗口: {dms.decision_head.window_size} 帧")
    print(f"最小持续时间: {dms.decision_head.min_duration_frames} 帧")
    print(f"冷却时间: {dms.decision_head.cooldown_frames} 帧")

2. 模型量化实现

(以下为模型量化的 Python 实现代码)
class ModelQuantizer:
    """Helpers for INT8 quantization and Edge TPU compilation.

    ``torch`` is imported lazily inside the methods that need it, so this
    class can be defined on machines where PyTorch is not installed.
    """

    def __init__(self, model: "torch.nn.Module"):
        """Keep a reference to the model; quantization mutates it in place."""
        self.model = model

    def quantize_to_int8(
        self,
        calibration_data: List[np.ndarray],
        backend: str = "onnx"
    ) -> str:
        """Quantize the wrapped model to INT8 using calibration data.

        The model is prepared with the default ``qnnpack`` qconfig, run on
        every calibration array, and converted in place.

        Args:
            calibration_data: Arrays fed through the model for calibration.
            backend: Export target. Only ``"onnx"`` triggers an export.

        Returns:
            ``"dms_quantized.onnx"`` when ``backend == "onnx"``; an empty
            string for any other backend (the model is still quantized in
            place in that case).
        """
        import torch
        import torch.quantization as quant

        # The active quantized engine has to match the qconfig backend.
        if "qnnpack" in torch.backends.quantized.supported_engines:
            torch.backends.quantized.engine = "qnnpack"

        # Static post-training quantization expects the model in eval mode.
        self.model.eval()
        self.model.qconfig = quant.get_default_qconfig('qnnpack')
        quant.prepare(self.model, inplace=True)

        # Calibration pass: the inserted observers record activation ranges.
        with torch.no_grad():
            for data in calibration_data:
                # Feed float32, matching the float32 dummy input used for
                # export below.
                batch = np.ascontiguousarray(data, dtype=np.float32)
                self.model(torch.from_numpy(batch))

        quant.convert(self.model, inplace=True)

        if backend == "onnx":
            dummy_input = torch.randn(1, 3, 224, 224)
            torch.onnx.export(
                self.model,
                dummy_input,
                "dms_quantized.onnx",
                opset_version=13
            )
            return "dms_quantized.onnx"

        return ""

    @staticmethod
    def _compiled_model_path(tflite_path: str) -> str:
        """Return ``<dir>/<stem>_edgetpu.tflite`` for the given model path.

        Only the file name is rewritten, so a ``.tflite`` substring in a
        directory name is left untouched.
        """
        from pathlib import Path

        source = Path(tflite_path)
        return str(source.with_name(f"{source.stem}_edgetpu.tflite"))

    def convert_to_edgetpu(self, tflite_path: str) -> str:
        """Compile a TFLite model for the Edge TPU.

        Args:
            tflite_path: Path to the TFLite model.

        Returns:
            Path of the compiled ``*_edgetpu.tflite`` model.

        Raises:
            RuntimeError: If ``edgetpu_compiler`` exits with a non-zero code.
        """
        import subprocess
        from pathlib import Path

        # Write the output next to the input so the returned path is valid
        # regardless of the current working directory.
        out_dir = Path(tflite_path).parent
        result = subprocess.run(
            ["edgetpu_compiler", "-o", str(out_dir), tflite_path],
            capture_output=True,
            text=True  # decode stderr so the error message is readable
        )

        if result.returncode != 0:
            raise RuntimeError(
                f"Edge TPU compilation failed: {result.stderr}"
            )

        return self._compiled_model_path(tflite_path)

实验结果

性能对比

| 平台 | 帧率 | 延迟 | 功耗 | 成本 |
| --- | --- | --- | --- | --- |
| Raspberry Pi 5 | 16 FPS | <60ms | ~5W | $80 |
| Google Coral | 25 FPS | ~40ms | ~2W | $150 |
| Jetson Nano | 30 FPS | ~33ms | ~10W | $99 |
| PC (RTX 3060) | 100+ FPS | <10ms | ~150W | $300+ |

行为识别准确率

| 行为类别 | 精确率 | 召回率 | F1 |
| --- | --- | --- | --- |
| 安全驾驶 | 94.2% | 96.1% | 95.1% |
| 手机使用 | 88.5% | 85.3% | 86.9% |
| 疲劳(打哈欠) | 91.3% | 89.7% | 90.5% |
| 疲劳(闭眼) | 93.8% | 92.4% | 93.1% |
| 吃东西/喝饮料 | 86.2% | 84.1% | 85.1% |

Euro NCAP合规性

检测要求

| Euro NCAP要求 | 系统实现 | 状态 |
| --- | --- | --- |
| 分心检测(手机) | 手机使用、发短信、打电话 | |
| 分心检测(其他) | 吃喝、调节设备、后取物 | |
| 疲劳检测 | 打哈欠、闭眼、低头 | |
| 响应时间 | ≤3秒 | ✅ (单帧推理约40-60ms) |
| 误报率 | <5% | ⚠️ 4.2% |

IMS应用启示

部署建议

| 车型级别 | 推荐平台 | 原因 |
| --- | --- | --- |
| 经济型 | Raspberry Pi 5 | 成本低,性能够用 |
| 中端型 | Google Coral | 性能更好,功耗低 |
| 高端型 | Qualcomm QCS8255 | 集成度高,功能丰富 |

优化方向

  1. 模型进一步压缩

    • 知识蒸馏
    • 神经架构搜索
    • 剪枝
  2. 多任务扩展

    • 增加视线估计
    • 增加身份识别
    • 增加情绪检测
  3. 鲁棒性提升

    • 跨域适应
    • 少样本学习
    • 自监督预训练

总结

核心贡献

  1. 低成本硬件实时DMS:树莓派5实现16 FPS
  2. 17种行为类别:覆盖主要分心/疲劳行为
  3. 易混淆行为处理:专门设计标签分类
  4. 时序决策机制:置信度+持续性双重保障

实用价值

  • 量产可行性高:低成本硬件,成熟方案
  • Euro NCAP合规:满足2026要求
  • 易于部署:完整Pipeline,支持多平台

参考资源: