1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
| import cv2 import time from collections import deque
class RealTimeDistractionDetector:
    """Real-time driver-distraction detection pipeline.

    Wraps a ``DSDFormer`` classifier, preprocesses individual frames, and
    averages the per-frame class probabilities over a sliding window of the
    most recent frames before deciding whether to raise an alert.

    Index 0 of ``class_names`` is treated as the "not distracted" class; any
    other predicted index counts as a distraction.
    """

    # Class count used when no class names are available to size the model;
    # this is the value the loader previously hard-coded.
    DEFAULT_NUM_CLASSES = 10

    def __init__(self, model_path: str, class_names: list,
                 alert_threshold: float = 0.8, window_size: int = 10):
        """Load the model and set up the smoothing window.

        Args:
            model_path: Path to a checkpoint readable by ``torch.load`` that
                holds the model's state dict.
            class_names: Human-readable label for each class index.
            alert_threshold: An alert fires only when the smoothed confidence
                of a non-zero class is strictly greater than this value.
            window_size: Number of most recent frames to average over.
                ``None`` keeps every frame (unbounded ``deque``).

        Raises:
            ValueError: If ``window_size`` is smaller than 1.
        """
        # A zero-length window would leave the history permanently empty and
        # make every detection fail later with an opaque indexing error.
        if window_size is not None and window_size < 1:
            raise ValueError(f"window_size must be >= 1, got {window_size}")

        # Set before load_model() so the loader can size the model from it.
        self.class_names = class_names
        self.alert_threshold = alert_threshold
        self.prediction_history = deque(maxlen=window_size)

        self.model = self.load_model(model_path)
        self.model.eval()
        self.transform = self.get_transform()

    def load_model(self, path: str, num_classes=None):
        """Build a ``DSDFormer`` and load its weights from a PyTorch checkpoint.

        Args:
            path: Checkpoint path passed to ``torch.load``.
            num_classes: Number of output classes. When omitted, it defaults
                to ``len(self.class_names)``, falling back to
                ``DEFAULT_NUM_CLASSES`` if no class names are set.

        Returns:
            The model with the checkpoint's state dict loaded.
        """
        if num_classes is None:
            known_names = getattr(self, "class_names", None) or ()
            num_classes = len(known_names) or self.DEFAULT_NUM_CLASSES

        model = DSDFormer(num_classes=num_classes)
        # The freshly built model lives on the CPU, so map the checkpoint
        # there too; otherwise a GPU-saved checkpoint cannot be loaded on a
        # CPU-only host.
        # NOTE: torch.load unpickles data - only load trusted checkpoints.
        state_dict = torch.load(path, map_location="cpu")
        model.load_state_dict(state_dict)
        return model

    def get_transform(self):
        """Return the image preprocessing pipeline.

        The pipeline converts the input to a PIL image, resizes it to
        224x224, converts it to a tensor, and normalizes each channel with
        the mean/std constants below.
        """
        from torchvision import transforms

        return transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225],
            ),
        ])

    def predict_frame(self, frame: np.ndarray):
        """Run the model on a single frame.

        Args:
            frame: Image array accepted by ``self.transform``.
                NOTE(review): frames read with OpenCV are BGR, while the
                normalization constants look like the usual RGB statistics -
                confirm the channel order the model was trained with.

        Returns:
            NumPy array of softmax class probabilities for this frame.
        """
        img = self.transform(frame)
        img = img.unsqueeze(0)  # add the batch dimension
        with torch.no_grad():
            logits = self.model(img)
            probs = torch.softmax(logits, dim=1)
        return probs[0].cpu().numpy()

    def detect_distraction(self, frame: np.ndarray):
        """Classify a frame and decide whether to raise a distraction alert.

        The frame's probabilities are appended to the history and the
        decision is made on the mean over the current window.

        Args:
            frame: Image array, passed through to ``predict_frame``.

        Returns:
            dict with keys:
                'distraction_type' (str): label of the most likely class,
                'confidence' (float): smoothed probability of that class,
                'alert' (bool): True when the class index is not 0 and the
                    confidence exceeds ``alert_threshold``,
                'all_probs' (np.ndarray): smoothed probabilities per class.
        """
        probs = self.predict_frame(frame)
        self.prediction_history.append(probs)

        avg_probs = np.mean(np.stack(list(self.prediction_history)), axis=0)

        # Convert NumPy scalars to built-in types so the result holds a real
        # ``bool``/``float`` (e.g. JSON-serializable) as documented.
        pred_idx = int(np.argmax(avg_probs))
        confidence = float(avg_probs[pred_idx])

        is_distracted = pred_idx != 0
        alert = bool(is_distracted and confidence > self.alert_threshold)

        return {
            'distraction_type': self.class_names[pred_idx],
            'confidence': confidence,
            'alert': alert,
            'all_probs': avg_probs,
        }
# Display labels for the classifier's output indices, in index order.
# The English glosses in the trailing comments are for maintainers only;
# the strings themselves are what the program emits.
DISTRACTION_CLASSES = [
    "正常驾驶",    # 0: normal driving
    "使用手机",    # 1: using a phone
    "调节收音机",  # 2: adjusting the radio
    "喝水",        # 3: drinking
    "吃东西",      # 4: eating
    "化妆",        # 5: applying makeup
    "与乘客交谈",  # 6: talking to a passenger
    "伸手取物",    # 7: reaching for an object
    "调节空调",    # 8: adjusting the air conditioning
    "其他分心",    # 9: other distraction
]
if __name__ == "__main__":
    # Demo: build a detector from the bundled weights file and run it once
    # on a randomly generated 480x640 3-channel uint8 frame.
    demo_detector = RealTimeDistractionDetector(
        model_path="dsdformer_weights.pth",
        class_names=DISTRACTION_CLASSES,
    )
    demo_frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
    outcome = demo_detector.detect_distraction(demo_frame)

    alert_text = '是' if outcome['alert'] else '否'
    report_lines = [
        "分心检测结果:",
        f" 类型: {outcome['distraction_type']}",
        f" 置信度: {outcome['confidence']:.3f}",
        f" 是否警告: {alert_text}",
    ]
    for report_line in report_lines:
        print(report_line)
|