安全带错误佩戴检测:计算机视觉方案详解
Euro NCAP 2026 新要求: Belt Misuse Detection
检测目标: 错误佩戴、未佩戴、佩戴不规范
技术方案: 深度学习 + 关键点检测
Euro NCAP 安全带检测要求
2026 评分标准
| 检测项 | 分值 | 检测时限 | 警告要求 |
|---|---|---|---|
| 未佩戴检测 | 1分 | ≤3秒 | 声光警告 |
| 错误佩戴检测 | 2分 | ≤5秒 | 警告+限速 |
| 佩戴确认 | 0.5分 | 实时 | 绿灯确认 |
错误佩戴类型
| 类型 | 描述 | 风险等级 |
|---|---|---|
| 肩带在身后 | 肩带未跨过肩膀 | 🔴 高 |
| 肩带在手臂下 | 肩带从腋下穿过 | 🔴 高 |
| 腰带过高 | 腰带未贴髋骨 | 🟡 中 |
| 腰带过松 | 安全带松弛度过大 | 🟡 中 |
| 后向儿童座椅前向安装 | 儿童座椅安装方向错误 | 🔴 高 |
技术实现
1. 检测流程
1 | |
2. 深度学习模型
代码实现:
import numpy as np
import torch
import torch.nn as nn
from typing import List, Tuple, Optional
from dataclasses import dataclass
from enum import Enum
class BeltMisuseType(Enum):
"""安全带错误佩戴类型"""
CORRECT = "correct"
NOT_WORN = "not_worn"
SHOULDER_BEHIND = "shoulder_behind"
SHOULDER_UNDERARM = "shoulder_underarm"
LAP_TOO_HIGH = "lap_too_high"
LOOSE = "too_loose"
UNKNOWN = "unknown"
@dataclass
class Keypoint:
"""关键点"""
name: str
x: float
y: float
confidence: float
@dataclass
class SeatbeltDetection:
"""安全带检测结果"""
misuse_type: BeltMisuseType
confidence: float
shoulder_belt_points: List[Keypoint]
lap_belt_points: List[Keypoint]
body_keypoints: List[Keypoint]
warning_message: str
class SeatbeltCNN(nn.Module):
"""
安全带检测 CNN
轻量级网络,适合嵌入式部署
"""
def __init__(self, num_classes: int = 7):
super().__init__()
# 共享特征提取
self.features = nn.Sequential(
# Stage 1
nn.Conv2d(3, 32, 3, stride=2, padding=1),
nn.BatchNorm2d(32),
nn.ReLU6(),
# Stage 2 - Depthwise Separable
self._depthwise_separable(32, 64, stride=2),
# Stage 3
self._depthwise_separable(64, 128, stride=2),
# Stage 4
self._depthwise_separable(128, 256, stride=2),
# Stage 5
self._depthwise_separable(256, 512, stride=1),
)
# 安全带定位分支
self.belt_localization = nn.Sequential(
nn.Conv2d(512, 256, 1),
nn.ReLU6(),
nn.Conv2d(256, 2, 1), # 2: 肩带/腰带
)
# 分类分支
self.classifier = nn.Sequential(
nn.AdaptiveAvgPool2d(1),
nn.Flatten(),
nn.Linear(512, 128),
nn.ReLU6(),
nn.Dropout(0.2),
nn.Linear(128, num_classes)
)
# 关键点回归分支
self.keypoint_head = nn.Sequential(
nn.Conv2d(512, 256, 1),
nn.ReLU6(),
nn.Conv2d(256, 16, 1), # 8 keypoints * 2 coords
)
def _depthwise_separable(
self,
in_channels: int,
out_channels: int,
stride: int = 1
) -> nn.Module:
"""深度可分离卷积"""
return nn.Sequential(
# Depthwise
nn.Conv2d(
in_channels, in_channels, 3,
stride=stride, padding=1, groups=in_channels
),
nn.BatchNorm2d(in_channels),
nn.ReLU6(),
# Pointwise
nn.Conv2d(in_channels, out_channels, 1),
nn.BatchNorm2d(out_channels),
nn.ReLU6(),
)
def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
"""
前向传播
Args:
x: 输入图像 (B, 3, H, W)
Returns:
(class_logits, belt_mask, keypoints)
"""
# 特征提取
features = self.features(x)
# 分类
class_logits = self.classifier(features)
# 安全带定位
belt_mask = self.belt_localization(features)
# 关键点
keypoints = self.keypoint_head(features)
return class_logits, belt_mask, keypoints
class SeatbeltMisuseDetector:
"""
安全带错误佩戴检测器
完整的检测流程
"""
# 人体关键点定义
BODY_KEYPOINTS = [
"left_shoulder", "right_shoulder",
"left_hip", "right_hip",
"left_elbow", "right_elbow",
"nose", "neck"
]
# 安全带关键点
BELT_KEYPOINTS = [
"shoulder_belt_top", # 肩带上端(B柱锚点)
"shoulder_belt_chest", # 肩带胸口位置
"shoulder_belt_shoulder", # 肩带过肩位置
"lap_belt_left", # 腰带左侧
"lap_belt_right", # 腰带右侧
"buckle", # 锁扣
]
def __init__(self, model_path: Optional[str] = None):
self.model = SeatbeltCNN(num_classes=7)
if model_path:
self.model.load_state_dict(torch.load(model_path))
self.model.eval()
# 阈值
self.confidence_threshold = 0.7
self.loose_threshold = 5.0 # cm
def preprocess(
self,
image: np.ndarray
) -> torch.Tensor:
"""
图像预处理
Args:
image: BGR 图像 (H, W, 3)
Returns:
tensor: (1, 3, 224, 224)
"""
# Resize
image_resized = cv2.resize(image, (224, 224))
# Normalize
image_normalized = image_resized.astype(np.float32) / 255.0
# HWC -> CHW
image_chw = np.transpose(image_normalized, (2, 0, 1))
# To tensor
tensor = torch.from_numpy(image_chw).unsqueeze(0)
return tensor
def detect(
self,
image: np.ndarray
) -> SeatbeltDetection:
"""
检测安全带佩戴状态
Args:
image: 输入图像 (H, W, 3)
Returns:
detection: 检测结果
"""
# 预处理
tensor = self.preprocess(image)
# 推理
with torch.no_grad():
class_logits, belt_mask, keypoints = self.model(tensor)
# 分类
class_probs = torch.softmax(class_logits, dim=1)
class_idx = torch.argmax(class_probs).item()
confidence = class_probs[0, class_idx].item()
misuse_type = BeltMisuseType(class_idx)
# 解析关键点
body_keypoints = self._parse_body_keypoints(keypoints)
belt_points = self._parse_belt_keypoints(keypoints, image.shape)
# 规则校验
misuse_type, confidence = self._rule_based_verification(
misuse_type, confidence, body_keypoints, belt_points
)
# 生成警告消息
warning_message = self._generate_warning(misuse_type)
return SeatbeltDetection(
misuse_type=misuse_type,
confidence=confidence,
shoulder_belt_points=[p for p in belt_points if "shoulder" in p.name],
lap_belt_points=[p for p in belt_points if "lap" in p.name],
body_keypoints=body_keypoints,
warning_message=warning_message
)
def _parse_body_keypoints(
self,
keypoints: torch.Tensor
) -> List[Keypoint]:
"""解析人体关键点"""
keypoints_flat = keypoints.flatten().tolist()
body_kps = []
for i, name in enumerate(self.BODY_KEYPOINTS):
x = keypoints_flat[i * 2]
y = keypoints_flat[i * 2 + 1]
# 归一化坐标 → 像素坐标
body_kps.append(Keypoint(
name=name,
x=x * 224,
y=y * 224,
confidence=1.0 # 简化
))
return body_kps
def _parse_belt_keypoints(
self,
keypoints: torch.Tensor,
image_shape: Tuple[int, int]
) -> List[Keypoint]:
"""解析安全带关键点"""
# 简化:从特征图中提取
# 实际需要专门的分割网络
return []
def _rule_based_verification(
self,
misuse_type: BeltMisuseType,
confidence: float,
body_keypoints: List[Keypoint],
belt_points: List[Keypoint]
) -> Tuple[BeltMisuseType, float]:
"""
基于规则的校验
使用关键点几何关系校验 CNN 输出
"""
# 提取肩膀关键点
left_shoulder = next(
(kp for kp in body_keypoints if kp.name == "left_shoulder"),
None
)
right_shoulder = next(
(kp for kp in body_keypoints if kp.name == "right_shoulder"),
None
)
if not left_shoulder or not right_shoulder:
return misuse_type, confidence
# 检查肩带位置
shoulder_belt = next(
(kp for kp in belt_points if "shoulder_belt_shoulder" in kp.name),
None
)
if shoulder_belt:
# 检查肩带是否在肩膀上
shoulder_center_x = (left_shoulder.x + right_shoulder.x) / 2
shoulder_width = abs(right_shoulder.x - left_shoulder.x)
# 肩带应该在肩膀范围内
if shoulder_belt.x < left_shoulder.x:
# 肩带在左肩外侧 → 可能在身后
return BeltMisuseType.SHOULDER_BEHIND, max(confidence, 0.8)
elif shoulder_belt.x > right_shoulder.x:
# 肩带在右肩外侧
return BeltMisuseType.SHOULDER_BEHIND, max(confidence, 0.8)
elif shoulder_belt.y > left_shoulder.y + shoulder_width:
# 肩带在肩膀下方 → 可能在手臂下
return BeltMisuseType.SHOULDER_UNDERARM, max(confidence, 0.75)
return misuse_type, confidence
def _generate_warning(
self,
misuse_type: BeltMisuseType
) -> str:
"""生成警告消息"""
warnings = {
BeltMisuseType.CORRECT: "安全带佩戴正确",
BeltMisuseType.NOT_WORN: "⚠️ 请系好安全带",
BeltMisuseType.SHOULDER_BEHIND: "⚠️ 肩带应在肩前,请调整",
BeltMisuseType.SHOULDER_UNDERARM: "⚠️ 肩带不应从腋下穿过,请调整",
BeltMisuseType.LAP_TOO_HIGH: "⚠️ 腰带应贴髋骨,请调整",
BeltMisuseType.LOOSE: "⚠️ 安全带过松,请拉紧",
BeltMisuseType.UNKNOWN: "请检查安全带佩戴"
}
return warnings.get(misuse_type, "请检查安全带")
# 部署代码
class SeatbeltMisuseSystem:
"""
完整的安全带错误佩戴检测系统
面向车载部署
"""
def __init__(self, model_path: str):
self.detector = SeatbeltMisuseDetector(model_path)
# 状态追踪
self.detection_history = []
self.history_length = 30 # 1秒 @ 30fps
# 警告状态
self.warning_active = False
self.warning_cooldown = 0
def process_frame(
self,
frame: np.ndarray,
timestamp: float
) -> dict:
"""
处理单帧
Args:
frame: 输入帧 (H, W, 3)
timestamp: 时间戳
Returns:
result: {
"misuse_type": 错误类型,
"confidence": 置信度,
"warning": 警告消息,
"action": 干预措施
}
"""
# 检测
detection = self.detector.detect(frame)
# 记录历史
self.detection_history.append({
"timestamp": timestamp,
"detection": detection
})
if len(self.detection_history) > self.history_length:
self.detection_history.pop(0)
# 稳定性分析
stable_type = self._get_stable_detection()
# 决定干预措施
action = self._determine_action(stable_type)
return {
"misuse_type": stable_type.value,
"confidence": detection.confidence,
"warning": detection.warning_message,
"action": action
}
def _get_stable_detection(self) -> BeltMisuseType:
"""获取稳定的检测结果"""
if len(self.detection_history) < 10:
return BeltMisuseType.UNKNOWN
# 最近10帧的多数投票
recent_types = [
h["detection"].misuse_type
for h in self.detection_history[-10:]
]
from collections import Counter
type_counts = Counter(recent_types)
most_common = type_counts.most_common(1)[0][0]
return most_common
def _determine_action(
self,
misuse_type: BeltMisuseType
) -> str:
"""决定干预措施"""
if misuse_type == BeltMisuseType.CORRECT:
self.warning_active = False
return "none"
elif misuse_type == BeltMisuseType.NOT_WORN:
return "warn_continuous"
elif misuse_type in [
BeltMisuseType.SHOULDER_BEHIND,
BeltMisuseType.SHOULDER_UNDERARM
]:
return "warn_limit_speed"
else:
return "warn_once"
# 测试
if __name__ == "__main__":
import cv2
# 初始化系统
system = SeatbeltMisuseSystem("seatbelt_model.pth")
# 模拟视频流
# cap = cv2.VideoCapture(0)
print("安全带错误佩戴检测系统启动")
print("检测类型:")
for t in BeltMisuseType:
print(f" - {t.value}")
安全带错误佩戴检测:计算机视觉方案详解
https://dapalm.com/2026/04/21/2026-04-21-seatbelt-misuse-detection/