"""manD 1.0 data loader.

Paper: A multimodal driver monitoring benchmark dataset for driver modeling
in assisted driving automation
Journal: Scientific Data (Nature), 2024
Data repository: Harvard Dataverse
"""
import numpy as np import pandas as pd import cv2 from pathlib import Path from typing import Dict, List, Optional, Tuple from dataclasses import dataclass import matplotlib.pyplot as plt
@dataclass
class DriverStateData:
    """Container for the driver-state signals of one recording.

    The field order is part of the generated constructor signature and must
    not be changed. The per-field notes describe how
    ``ManD1Dataset.load_participant`` fills each field; units and value
    ranges are not established in this file.
    """

    # Time axis: sample index divided by the loader's sampling rate.
    timestamp: np.ndarray

    # Eye tracking, read from EyeTracking.txt.
    perclos: np.ndarray
    pupil_diameter: np.ndarray
    gaze_x: np.ndarray
    gaze_y: np.ndarray
    head_position: np.ndarray  # HeadX / HeadY / HeadZ columns
    head_rotation: np.ndarray  # HeadRotX / HeadRotY / HeadRotZ columns

    # Empatica signals, read from PhysiologicalData/PPG_EDA.txt.
    hr_empatica: np.ndarray
    hrv: np.ndarray
    eda: np.ndarray
    scl: np.ndarray
    scr: np.ndarray
    temperature: np.ndarray

    # EEG / ECG signals, read from PhysiologicalData/EEG_ECG.txt.
    eeg: np.ndarray  # one column per EEG channel
    ecg: np.ndarray
    hr_biopac: np.ndarray

    # Pressure mats, read from SeatPressureSensor/SeatPressureDistribution.txt
    # and reshaped by the loader to (n_samples, 32, 32).
    seat_pressure: np.ndarray
    back_pressure: np.ndarray

    # Activity labels, read from ActivityLabels.txt.
    activity: np.ndarray
class ManD1Dataset:
    """Loader for the manD 1.0 dataset.

    ``root_path`` is expected to contain ``DataInfoSheet.xlsx`` and the two
    group folders ``withoutMS`` and ``withMS``. Participants are discovered
    from the ``P*.7z`` archives inside those folders, while the signal files
    are read from ``<group>/<participant>/<scenario>/DriverState``.

    Example:
        >>> dataset = ManD1Dataset('/path/to/manD1.0')
        >>> data = dataset.load_participant('P1', 'Sadness')
        >>> print(f"Duration: {len(data.timestamp) / 256:.1f} seconds")
    """

    def __init__(self, root_path: str):
        """Index the participants and read the data info sheet.

        Args:
            root_path: Root folder of the dataset.
        """
        self.root = Path(root_path)
        # Samples per second; used to build the time axis in load_participant.
        self.sampling_rate = 256
        self.participants = self._list_participants()
        self.data_info = pd.read_excel(self.root / 'DataInfoSheet.xlsx')

    def _list_participants(self) -> List[str]:
        """Return the stems of all ``P*.7z`` archives in both group folders.

        The archives are sorted by their full path before the stems are taken.
        """
        without_ms = list((self.root / 'withoutMS').glob('P*.7z'))
        with_ms = list((self.root / 'withMS').glob('P*.7z'))
        return [p.stem for p in sorted(without_ms + with_ms)]

    def _driver_state_dir(self, participant: str, scenario: str) -> Path:
        """Return the ``DriverState`` folder for a participant and scenario.

        A participant belongs to ``withoutMS`` when a matching ``P*.7z``
        archive exists there; every other participant is looked up under
        ``withMS``.
        """
        archives = (self.root / 'withoutMS').glob('P*.7z')
        in_without_ms = any(p.stem == participant for p in archives)
        group = 'withoutMS' if in_without_ms else 'withMS'
        return self.root / group / participant / scenario / 'DriverState'

    def load_participant(
        self,
        participant: str,
        scenario: str,
        data_types: Optional[List[str]] = None
    ) -> "DriverStateData":
        """Load one scenario of one participant.

        Args:
            participant: Participant ID (e.g. 'P1').
            scenario: Scenario name ('Familiarization', 'NoEmotion', 'Anger',
                'Surprise', 'Sadness', 'Fear').
            data_types: Data types to load, None meaning all. Currently not
                used: every data type is always loaded.

        Returns:
            DriverStateData: All signals in a single object. The time axis is
            derived from the number of eye-tracking rows and
            ``self.sampling_rate``.
        """
        base_path = self._driver_state_dir(participant, scenario)
        physio_path = base_path / 'PhysiologicalData'

        eye_data = self._load_eye_tracking(base_path / 'EyeTracking.txt')
        ppg_eda = self._load_ppg_eda(physio_path / 'PPG_EDA.txt')
        eeg_ecg = self._load_eeg_ecg(physio_path / 'EEG_ECG.txt')
        pressure = self._load_pressure(
            base_path / 'SeatPressureSensor' / 'SeatPressureDistribution.txt'
        )
        activity = self._load_activity(base_path / 'ActivityLabels.txt')

        return DriverStateData(
            timestamp=np.arange(len(eye_data['perclos'])) / self.sampling_rate,
            perclos=eye_data['perclos'],
            pupil_diameter=eye_data['pupil_diameter'],
            gaze_x=eye_data['gaze_x'],
            gaze_y=eye_data['gaze_y'],
            head_position=eye_data['head_position'],
            head_rotation=eye_data['head_rotation'],
            hr_empatica=ppg_eda['hr'],
            hrv=ppg_eda['hrv'],
            eda=ppg_eda['eda'],
            scl=ppg_eda['scl'],
            scr=ppg_eda['scr'],
            temperature=ppg_eda['temperature'],
            eeg=eeg_ecg['eeg'],
            ecg=eeg_ecg['ecg'],
            hr_biopac=eeg_ecg['hr'],
            seat_pressure=pressure['seat'],
            back_pressure=pressure['back'],
            activity=activity
        )

    def _load_eye_tracking(self, path: Path) -> dict:
        """Read the tab-separated eye-tracking file into a dict of arrays."""
        df = pd.read_csv(path, sep='\t')
        return {
            'perclos': df['PERCLOS'].values,
            'pupil_diameter': df['PupilDiameter'].values,
            'gaze_x': df['GazeX'].values,
            'gaze_y': df['GazeY'].values,
            'head_position': df[['HeadX', 'HeadY', 'HeadZ']].values,
            'head_rotation': df[['HeadRotX', 'HeadRotY', 'HeadRotZ']].values
        }

    def _load_ppg_eda(self, path: Path) -> dict:
        """Read the tab-separated PPG/EDA file into a dict of arrays.

        The 'Empatica/HRV' column is optional; when it is absent, 'hrv' is an
        all-zero array with one entry per row.
        """
        df = pd.read_csv(path, sep='\t')
        # The fallback is already an ndarray, so `.values` must only be taken
        # from the real column.
        if 'Empatica/HRV' in df.columns:
            hrv = df['Empatica/HRV'].values
        else:
            hrv = np.zeros(len(df))
        return {
            'hr': df['Empatica/HR'].values,
            'hrv': hrv,
            'eda': df['Empatica/EDA'].values,
            'scl': df['Empatica/SCL'].values,
            'scr': df['Empatica/SCR'].values,
            'temperature': df['Empatica/Temperature'].values
        }

    def _load_eeg_ecg(self, path: Path) -> dict:
        """Read the tab-separated EEG/ECG file into a dict of arrays.

        'eeg' has one column per channel, in the order listed below.
        """
        df = pd.read_csv(path, sep='\t')
        eeg_channels = ['Poz', 'Fz', 'Cz', 'C3', 'C4', 'F3', 'F4', 'P3', 'P4']
        return {
            'eeg': df[eeg_channels].values,
            'ecg': df['ECG'].values,
            'hr': df['BIOPAC/HR'].values
        }

    def _load_pressure(self, path: Path) -> dict:
        """Read the seat/back pressure file and reshape each row to 32 x 32.

        Returns:
            dict: 'seat' and 'back' arrays of shape (n_rows, 32, 32), built
            from the columns 'Seat_000'..'Seat_1023' and
            'Back_000'..'Back_1023'.
        """
        df = pd.read_csv(path, sep='\t')
        seat_cols = [f'Seat_{i:03d}' for i in range(1024)]
        seat = df[seat_cols].values.reshape(-1, 32, 32)
        back_cols = [f'Back_{i:03d}' for i in range(1024)]
        back = df[back_cols].values.reshape(-1, 32, 32)
        return {'seat': seat, 'back': back}

    def _load_activity(self, path: Path) -> np.ndarray:
        """Read the 'Activity' column of the tab-separated label file."""
        df = pd.read_csv(path, sep='\t')
        return df['Activity'].values

    def load_video(
        self,
        participant: str,
        scenario: str
    ) -> "cv2.VideoCapture":
        """Open the cropped face video of one participant and scenario.

        The folder is resolved the same way as in ``load_participant``, so
        participants stored under ``withMS`` are found as well.

        Returns:
            cv2.VideoCapture: OpenCV capture for ``Face_cropped.mp4``. Whether
            the file could actually be opened is not checked here.
        """
        base_path = self._driver_state_dir(participant, scenario)
        video_path = base_path / 'Face_cropped.mp4'
        return cv2.VideoCapture(str(video_path))
if __name__ == "__main__":
    # Demo: load one recording, print a short summary, and save a two-panel
    # plot of PERCLOS and Empatica heart rate over time.
    loader = ManD1Dataset('/path/to/manD1.0')
    sample = loader.load_participant('P1', 'Sadness')

    duration_s = len(sample.timestamp) / 256
    print(f"数据时长: {duration_s:.1f} 秒")

    perclos_lo = sample.perclos.min()
    perclos_hi = sample.perclos.max()
    print(f"PERCLOS 范围: {perclos_lo:.1f}% - {perclos_hi:.1f}%")

    # nanmin/nanmax skip NaN samples in the heart-rate signal.
    hr_lo = np.nanmin(sample.hr_empatica)
    hr_hi = np.nanmax(sample.hr_empatica)
    print(f"心率范围: {hr_lo:.1f} - {hr_hi:.1f} BPM")

    print(f"EEG 数据形状: {sample.eeg.shape}")
    print(f"座椅压力数据形状: {sample.seat_pressure.shape}")

    figure, (perclos_ax, hr_ax) = plt.subplots(
        2, 1, figsize=(12, 6), sharex=True
    )
    minutes = sample.timestamp / 60  # x axis in minutes

    # Upper panel: PERCLOS.
    perclos_ax.plot(minutes, sample.perclos, 'b-', linewidth=0.5)
    perclos_ax.set_ylabel('PERCLOS (%)')
    perclos_ax.set_title('驾驶员疲劳指标 (PERCLOS)')
    perclos_ax.grid(True, alpha=0.3)

    # Lower panel: heart rate; it carries the shared x label.
    hr_ax.plot(minutes, sample.hr_empatica, 'r-', linewidth=0.5)
    hr_ax.set_xlabel('时间 (分钟)')
    hr_ax.set_ylabel('心率 (BPM)')
    hr_ax.set_title('驾驶员心率')
    hr_ax.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig('driver_state_analysis.png', dpi=150)
    print("图表已保存: driver_state_analysis.png")
|