安全带错误佩戴检测:计算机视觉方案详解

Euro NCAP 2026 新要求: Belt Misuse Detection
检测目标: 错误佩戴、未佩戴、佩戴不规范
技术方案: 深度学习 + 关键点检测


Euro NCAP 安全带检测要求

2026 评分标准

检测项 分值 检测时限 警告要求
未佩戴检测 1分 ≤3秒 声光警告
错误佩戴检测 2分 ≤5秒 警告+限速
佩戴确认 0.5分 实时 绿灯确认

错误佩戴类型

类型 描述 风险等级
肩带在身后 肩带未跨过肩膀 🔴 高
肩带在手臂下 肩带从腋下穿过 🔴 高
腰带过高 腰带未贴髋骨 🟡 中
腰带过松 安全带松弛度过大 🟡 中
后向儿童座椅前向安装 儿童座椅安装方向错误 🔴 高

技术实现

1. 检测流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
┌─────────────────────────────────────────────────────┐
│ 安全带检测流程 │
├─────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ RGB-IR │───→│ 人体检测 │───→│ 关键点 │ │
│ │ 摄像头 │ │ (YOLO) │ │ 提取 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ ↓ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 分类输出 │←───│ 安全带 │←───│ 特征融合 │ │
│ │ (misuse) │ │ 定位 │ │ (CNN) │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
└─────────────────────────────────────────────────────┘

2. 深度学习模型

代码实现:

import numpy as np
import torch
import torch.nn as nn
from typing import List, Tuple, Optional
from dataclasses import dataclass
from enum import Enum

class BeltMisuseType(Enum):
    """安全带错误佩戴类型"""
    CORRECT = "correct"
    NOT_WORN = "not_worn"
    SHOULDER_BEHIND = "shoulder_behind"
    SHOULDER_UNDERARM = "shoulder_underarm"
    LAP_TOO_HIGH = "lap_too_high"
    LOOSE = "too_loose"
    UNKNOWN = "unknown"


@dataclass
class Keypoint:
    """关键点"""
    name: str
    x: float
    y: float
    confidence: float


@dataclass
class SeatbeltDetection:
    """安全带检测结果"""
    misuse_type: BeltMisuseType
    confidence: float
    shoulder_belt_points: List[Keypoint]
    lap_belt_points: List[Keypoint]
    body_keypoints: List[Keypoint]
    warning_message: str


class SeatbeltCNN(nn.Module):
    """
    安全带检测 CNN
    
    轻量级网络,适合嵌入式部署
    """
    
    def __init__(self, num_classes: int = 7):
        super().__init__()
        
        # 共享特征提取
        self.features = nn.Sequential(
            # Stage 1
            nn.Conv2d(3, 32, 3, stride=2, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU6(),
            
            # Stage 2 - Depthwise Separable
            self._depthwise_separable(32, 64, stride=2),
            
            # Stage 3
            self._depthwise_separable(64, 128, stride=2),
            
            # Stage 4
            self._depthwise_separable(128, 256, stride=2),
            
            # Stage 5
            self._depthwise_separable(256, 512, stride=1),
        )
        
        # 安全带定位分支
        self.belt_localization = nn.Sequential(
            nn.Conv2d(512, 256, 1),
            nn.ReLU6(),
            nn.Conv2d(256, 2, 1),  # 2: 肩带/腰带
        )
        
        # 分类分支
        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Linear(512, 128),
            nn.ReLU6(),
            nn.Dropout(0.2),
            nn.Linear(128, num_classes)
        )
        
        # 关键点回归分支
        self.keypoint_head = nn.Sequential(
            nn.Conv2d(512, 256, 1),
            nn.ReLU6(),
            nn.Conv2d(256, 16, 1),  # 8 keypoints * 2 coords
        )
    
    def _depthwise_separable(
        self,
        in_channels: int,
        out_channels: int,
        stride: int = 1
    ) -> nn.Module:
        """深度可分离卷积"""
        return nn.Sequential(
            # Depthwise
            nn.Conv2d(
                in_channels, in_channels, 3,
                stride=stride, padding=1, groups=in_channels
            ),
            nn.BatchNorm2d(in_channels),
            nn.ReLU6(),
            
            # Pointwise
            nn.Conv2d(in_channels, out_channels, 1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU6(),
        )
    
    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """
        前向传播
        
        Args:
            x: 输入图像 (B, 3, H, W)
            
        Returns:
            (class_logits, belt_mask, keypoints)
        """
        # 特征提取
        features = self.features(x)
        
        # 分类
        class_logits = self.classifier(features)
        
        # 安全带定位
        belt_mask = self.belt_localization(features)
        
        # 关键点
        keypoints = self.keypoint_head(features)
        
        return class_logits, belt_mask, keypoints


class SeatbeltMisuseDetector:
    """
    安全带错误佩戴检测器
    
    完整的检测流程
    """
    
    # 人体关键点定义
    BODY_KEYPOINTS = [
        "left_shoulder", "right_shoulder",
        "left_hip", "right_hip",
        "left_elbow", "right_elbow",
        "nose", "neck"
    ]
    
    # 安全带关键点
    BELT_KEYPOINTS = [
        "shoulder_belt_top",  # 肩带上端(B柱锚点)
        "shoulder_belt_chest",  # 肩带胸口位置
        "shoulder_belt_shoulder",  # 肩带过肩位置
        "lap_belt_left",  # 腰带左侧
        "lap_belt_right",  # 腰带右侧
        "buckle",  # 锁扣
    ]
    
    def __init__(self, model_path: Optional[str] = None):
        self.model = SeatbeltCNN(num_classes=7)
        
        if model_path:
            self.model.load_state_dict(torch.load(model_path))
        
        self.model.eval()
        
        # 阈值
        self.confidence_threshold = 0.7
        self.loose_threshold = 5.0  # cm
    
    def preprocess(
        self,
        image: np.ndarray
    ) -> torch.Tensor:
        """
        图像预处理
        
        Args:
            image: BGR 图像 (H, W, 3)
            
        Returns:
            tensor: (1, 3, 224, 224)
        """
        # Resize
        image_resized = cv2.resize(image, (224, 224))
        
        # Normalize
        image_normalized = image_resized.astype(np.float32) / 255.0
        
        # HWC -> CHW
        image_chw = np.transpose(image_normalized, (2, 0, 1))
        
        # To tensor
        tensor = torch.from_numpy(image_chw).unsqueeze(0)
        
        return tensor
    
    def detect(
        self,
        image: np.ndarray
    ) -> SeatbeltDetection:
        """
        检测安全带佩戴状态
        
        Args:
            image: 输入图像 (H, W, 3)
            
        Returns:
            detection: 检测结果
        """
        # 预处理
        tensor = self.preprocess(image)
        
        # 推理
        with torch.no_grad():
            class_logits, belt_mask, keypoints = self.model(tensor)
        
        # 分类
        class_probs = torch.softmax(class_logits, dim=1)
        class_idx = torch.argmax(class_probs).item()
        confidence = class_probs[0, class_idx].item()
        
        misuse_type = BeltMisuseType(class_idx)
        
        # 解析关键点
        body_keypoints = self._parse_body_keypoints(keypoints)
        belt_points = self._parse_belt_keypoints(keypoints, image.shape)
        
        # 规则校验
        misuse_type, confidence = self._rule_based_verification(
            misuse_type, confidence, body_keypoints, belt_points
        )
        
        # 生成警告消息
        warning_message = self._generate_warning(misuse_type)
        
        return SeatbeltDetection(
            misuse_type=misuse_type,
            confidence=confidence,
            shoulder_belt_points=[p for p in belt_points if "shoulder" in p.name],
            lap_belt_points=[p for p in belt_points if "lap" in p.name],
            body_keypoints=body_keypoints,
            warning_message=warning_message
        )
    
    def _parse_body_keypoints(
        self,
        keypoints: torch.Tensor
    ) -> List[Keypoint]:
        """解析人体关键点"""
        keypoints_flat = keypoints.flatten().tolist()
        
        body_kps = []
        for i, name in enumerate(self.BODY_KEYPOINTS):
            x = keypoints_flat[i * 2]
            y = keypoints_flat[i * 2 + 1]
            
            # 归一化坐标 → 像素坐标
            body_kps.append(Keypoint(
                name=name,
                x=x * 224,
                y=y * 224,
                confidence=1.0  # 简化
            ))
        
        return body_kps
    
    def _parse_belt_keypoints(
        self,
        keypoints: torch.Tensor,
        image_shape: Tuple[int, int]
    ) -> List[Keypoint]:
        """解析安全带关键点"""
        # 简化:从特征图中提取
        # 实际需要专门的分割网络
        
        return []
    
    def _rule_based_verification(
        self,
        misuse_type: BeltMisuseType,
        confidence: float,
        body_keypoints: List[Keypoint],
        belt_points: List[Keypoint]
    ) -> Tuple[BeltMisuseType, float]:
        """
        基于规则的校验
        
        使用关键点几何关系校验 CNN 输出
        """
        # 提取肩膀关键点
        left_shoulder = next(
            (kp for kp in body_keypoints if kp.name == "left_shoulder"),
            None
        )
        right_shoulder = next(
            (kp for kp in body_keypoints if kp.name == "right_shoulder"),
            None
        )
        
        if not left_shoulder or not right_shoulder:
            return misuse_type, confidence
        
        # 检查肩带位置
        shoulder_belt = next(
            (kp for kp in belt_points if "shoulder_belt_shoulder" in kp.name),
            None
        )
        
        if shoulder_belt:
            # 检查肩带是否在肩膀上
            shoulder_center_x = (left_shoulder.x + right_shoulder.x) / 2
            shoulder_width = abs(right_shoulder.x - left_shoulder.x)
            
            # 肩带应该在肩膀范围内
            if shoulder_belt.x < left_shoulder.x:
                # 肩带在左肩外侧 → 可能在身后
                return BeltMisuseType.SHOULDER_BEHIND, max(confidence, 0.8)
            
            elif shoulder_belt.x > right_shoulder.x:
                # 肩带在右肩外侧
                return BeltMisuseType.SHOULDER_BEHIND, max(confidence, 0.8)
            
            elif shoulder_belt.y > left_shoulder.y + shoulder_width:
                # 肩带在肩膀下方 → 可能在手臂下
                return BeltMisuseType.SHOULDER_UNDERARM, max(confidence, 0.75)
        
        return misuse_type, confidence
    
    def _generate_warning(
        self,
        misuse_type: BeltMisuseType
    ) -> str:
        """生成警告消息"""
        warnings = {
            BeltMisuseType.CORRECT: "安全带佩戴正确",
            BeltMisuseType.NOT_WORN: "⚠️ 请系好安全带",
            BeltMisuseType.SHOULDER_BEHIND: "⚠️ 肩带应在肩前,请调整",
            BeltMisuseType.SHOULDER_UNDERARM: "⚠️ 肩带不应从腋下穿过,请调整",
            BeltMisuseType.LAP_TOO_HIGH: "⚠️ 腰带应贴髋骨,请调整",
            BeltMisuseType.LOOSE: "⚠️ 安全带过松,请拉紧",
            BeltMisuseType.UNKNOWN: "请检查安全带佩戴"
        }
        
        return warnings.get(misuse_type, "请检查安全带")


# 部署代码
class SeatbeltMisuseSystem:
    """
    完整的安全带错误佩戴检测系统
    
    面向车载部署
    """
    
    def __init__(self, model_path: str):
        self.detector = SeatbeltMisuseDetector(model_path)
        
        # 状态追踪
        self.detection_history = []
        self.history_length = 30  # 1秒 @ 30fps
        
        # 警告状态
        self.warning_active = False
        self.warning_cooldown = 0
    
    def process_frame(
        self,
        frame: np.ndarray,
        timestamp: float
    ) -> dict:
        """
        处理单帧
        
        Args:
            frame: 输入帧 (H, W, 3)
            timestamp: 时间戳
            
        Returns:
            result: {
                "misuse_type": 错误类型,
                "confidence": 置信度,
                "warning": 警告消息,
                "action": 干预措施
            }
        """
        # 检测
        detection = self.detector.detect(frame)
        
        # 记录历史
        self.detection_history.append({
            "timestamp": timestamp,
            "detection": detection
        })
        
        if len(self.detection_history) > self.history_length:
            self.detection_history.pop(0)
        
        # 稳定性分析
        stable_type = self._get_stable_detection()
        
        # 决定干预措施
        action = self._determine_action(stable_type)
        
        return {
            "misuse_type": stable_type.value,
            "confidence": detection.confidence,
            "warning": detection.warning_message,
            "action": action
        }
    
    def _get_stable_detection(self) -> BeltMisuseType:
        """获取稳定的检测结果"""
        if len(self.detection_history) < 10:
            return BeltMisuseType.UNKNOWN
        
        # 最近10帧的多数投票
        recent_types = [
            h["detection"].misuse_type 
            for h in self.detection_history[-10:]
        ]
        
        from collections import Counter
        type_counts = Counter(recent_types)
        most_common = type_counts.most_common(1)[0][0]
        
        return most_common
    
    def _determine_action(
        self,
        misuse_type: BeltMisuseType
    ) -> str:
        """决定干预措施"""
        if misuse_type == BeltMisuseType.CORRECT:
            self.warning_active = False
            return "none"
        
        elif misuse_type == BeltMisuseType.NOT_WORN:
            return "warn_continuous"
        
        elif misuse_type in [
            BeltMisuseType.SHOULDER_BEHIND,
            BeltMisuseType.SHOULDER_UNDERARM
        ]:
            return "warn_limit_speed"
        
        else:
            return "warn_once"


# 测试
if __name__ == "__main__":
    import cv2
    
    # 初始化系统
    system = SeatbeltMisuseSystem("seatbelt_model.pth")
    
    # 模拟视频流
    # cap = cv2.VideoCapture(0)
    
    print("安全带错误佩戴检测系统启动")
    print("检测类型:")
    for t in BeltMisuseType:
        print(f"  - {t.value}")

安全带错误佩戴检测:计算机视觉方案详解
https://dapalm.com/2026/04/21/2026-04-21-seatbelt-misuse-detection/
作者
Mars
发布于
2026年4月21日
许可协议