1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
| import numpy as np from scipy.signal import butter, filtfilt, find_peaks from typing import Tuple, Optional
class MicromovementDetector: """ 微动检测器 检测儿童/宠物的呼吸和心跳微动信号 用于 CPD(儿童存在检测) """ def __init__(self, fs: int = 100): """ Args: fs: 雷达采样率(Hz) """ self.fs = fs self.breath_band = (0.1, 0.7) self.heart_band = (0.8, 3.0) self.breath_filter = self._design_bandpass(self.breath_band) self.heart_filter = self._design_bandpass(self.heart_band) def _design_bandpass(self, band: Tuple[float, float]) -> Tuple: """设计带通滤波器""" nyq = self.fs / 2 low = band[0] / nyq high = band[1] / nyq return butter(4, [low, high], btype='band') def detect( self, radar_signal: np.ndarray, duration: float = 5.0 ) -> dict: """ 检测微动信号 Args: radar_signal: 雷达信号(复数,包含相位信息) duration: 检测窗口时长(秒) Returns: { 'has_micromovement': bool, 'breathing_detected': bool, 'heartbeat_detected': bool, 'breathing_rate': float, # 次/分钟 'heartbeat_rate': float, # 次/分钟 'confidence': float } """ phase = np.angle(radar_signal) phase_unwrapped = np.unwrap(phase) breath_signal = filtfilt(self.breath_filter[0], self.breath_filter[1], phase_unwrapped) heart_signal = filtfilt(self.heart_filter[0], self.heart_filter[1], phase_unwrapped) breathing_detected, breathing_rate = self._detect_periodic( breath_signal, self.breath_band ) heartbeat_detected, heartbeat_rate = self._detect_periodic( heart_signal, self.heart_band ) has_micromovement = breathing_detected or heartbeat_detected confidence = self._calculate_confidence( breath_signal, heart_signal, breathing_detected, heartbeat_detected ) return { 'has_micromovement': has_micromovement, 'breathing_detected': breathing_detected, 'heartbeat_detected': heartbeat_detected, 'breathing_rate': breathing_rate, 'heartbeat_rate': heartbeat_rate, 'confidence': confidence } def _detect_periodic( self, signal: np.ndarray, freq_band: Tuple[float, float] ) -> Tuple[bool, float]: """ 检测周期性信号 Returns: (detected, rate) - rate 单位:次/分钟 """ n = len(signal) freq = np.fft.rfftfreq(n, 1.0 / self.fs) fft_mag = np.abs(np.fft.rfft(signal)) mask = (freq >= freq_band[0]) & (freq <= freq_band[1]) if not np.any(mask): return False, 0.0 freq_in_band = freq[mask] mag_in_band = fft_mag[mask] peak_idx = np.argmax(mag_in_band) peak_freq = freq_in_band[peak_idx] peak_mag = mag_in_band[peak_idx] avg_mag = np.mean(mag_in_band) snr = peak_mag / avg_mag if avg_mag > 0 else 0 detected = snr > 3.0 rate = peak_freq * 60.0 return detected, rate def _calculate_confidence( self, breath_signal: np.ndarray, heart_signal: np.ndarray, breathing_detected: bool, heartbeat_detected: bool ) -> float: """计算检测置信度""" breath_energy = np.var(breath_signal) heart_energy = np.var(heart_signal) base_confidence = 0.5 if breathing_detected: base_confidence += 0.3 if heartbeat_detected: base_confidence += 0.2 energy_factor = min(breath_energy + heart_energy, 1.0) confidence = min(base_confidence * (1 + energy_factor), 1.0) return confidence
if __name__ == "__main__": detector = MicromovementDetector(fs=100) t = np.arange(0, 10, 0.01) breath_component = 0.1 * np.sin(2 * np.pi * 0.3 * t) heart_component = 0.02 * np.sin(2 * np.pi * 1.5 * t) noise = 0.01 * np.random.randn(len(t)) phase = breath_component + heart_component + noise radar_signal = np.exp(1j * phase * 10) result = detector.detect(radar_signal) print("微动检测结果:") print(f" 检测到微动: {result['has_micromovement']}") print(f" 呼吸检测: {result['breathing_detected']} ({result['breathing_rate']:.1f} 次/分钟)") print(f" 心跳检测: {result['heartbeat_detected']} ({result['heartbeat_rate']:.1f} 次/分钟)") print(f" 置信度: {result['confidence']:.2f}")
|