1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
class UWBRadarPipeline:
    """UWB radar signal-processing pipeline for vital-sign estimation.

    ``process`` runs the stages in order: raw-signal extraction, high-pass
    "spatial gating", rapid-body-movement (RBM) detection, breathing /
    heartbeat separation, and spectral rate estimation.
    """

    def __init__(self, fs: float = 100.0):
        """Create a pipeline.

        Args:
            fs: Sampling rate in Hz of the per-frame signal. Defaults to
                100 Hz, the value that used to be hard-coded in every stage.
        """
        self.fs = float(fs)
        self.pipeline_stages = [
            "data_extraction",
            "spatial_gating",
            "rbm_detection",
            "source_separation",
            "vital_signs_estimation",
        ]

    def process(self, cir_data: np.ndarray) -> dict:
        """Run the complete pipeline.

        Args:
            cir_data: Channel Impulse Response data, indexed as
                ``[frame, bin]`` (2-D).

        Returns:
            Dict with keys ``"breathing_rate"``, ``"heart_rate"`` (both in
            cycles per minute) and ``"confidence"``.
        """
        raw_signal = self.extract_raw_signal(cir_data)
        gated_signal = self.spatial_gating(raw_signal)
        # The per-window RBM mask is not consumed by the later stages.
        clean_signal, _rbm_events = self.detect_rbm(gated_signal)
        breathing, heartbeat = self.source_separation(clean_signal)
        return self.estimate_vitals(breathing, heartbeat)

    def extract_raw_signal(self, cir_data) -> np.ndarray:
        """Pick, for every frame (row), the sample with the largest magnitude.

        The returned values are the original (signed) samples, not their
        absolute values.
        """
        cir_data = np.asarray(cir_data)
        peak_idx = np.argmax(np.abs(cir_data), axis=1)
        return cir_data[np.arange(len(cir_data)), peak_idx]

    def _sos_filter(self, data, cutoff, btype) -> np.ndarray:
        """Apply a 4th-order Butterworth filter in SOS form to ``data``."""
        # Imported under an alias on purpose: the public methods take a
        # parameter called ``signal``, which hides the scipy.signal module and
        # previously made ``signal.butter(...)`` fail with AttributeError.
        from scipy import signal as sp_signal

        sos = sp_signal.butter(4, cutoff, btype=btype, fs=self.fs, output='sos')
        return sp_signal.sosfilt(sos, data)

    def spatial_gating(self, signal) -> np.ndarray:
        """Remove static clutter with a 0.1 Hz high-pass filter."""
        return self._sos_filter(signal, 0.1, 'high')

    def detect_rbm(self, signal, window_size: int = 100, threshold: float = 3.0):
        """Detect rapid body movement (RBM) and blank the affected samples.

        The signal is scanned with half-overlapping windows. A window is
        flagged when its standard deviation exceeds ``threshold`` times the
        median standard deviation over all windows.

        Args:
            signal: 1-D array of samples.
            window_size: Window length in samples.
            threshold: Multiplier applied to the median window spread.

        Returns:
            ``(clean_signal, rbm_mask)``: a copy of ``signal`` with every
            flagged window set to zero, and a boolean array with one entry
            per window (``True`` = RBM detected).

        Raises:
            ValueError: If ``window_size`` is not positive.
        """
        if window_size <= 0:
            raise ValueError("window_size must be positive")

        signal = np.asarray(signal)
        clean_signal = signal.copy()
        n_samples = len(signal)

        # Too short for a single window: nothing can be assessed.
        if n_samples < window_size:
            return clean_signal, np.zeros(0, dtype=bool)

        hop = max(window_size // 2, 1)
        # "+ 1" so that the last full window is inspected as well.
        starts = np.arange(0, n_samples - window_size + 1, hop)
        spreads = np.array([np.std(signal[s:s + window_size]) for s in starts])
        rbm_mask = spreads > threshold * np.median(spreads)

        # Blank the whole flagged window, not just its first sample.
        for start in starts[rbm_mask]:
            clean_signal[start:start + window_size] = 0

        return clean_signal, rbm_mask

    def source_separation(self, signal):
        """Split the signal into breathing and heartbeat components.

        Returns:
            ``(breathing, heartbeat)``: the input band-passed to 0.2-0.8 Hz
            and to 1.0-2.0 Hz respectively.
        """
        breathing = self._sos_filter(signal, [0.2, 0.8], 'band')
        heartbeat = self._sos_filter(signal, [1.0, 2.0], 'band')
        return breathing, heartbeat

    def _dominant_bpm(self, component):
        """Return (peak frequency in cycles/minute, magnitude spectrum)."""
        freqs = np.fft.rfftfreq(len(component), 1.0 / self.fs)
        spectrum = np.abs(np.fft.rfft(component))
        return float(freqs[np.argmax(spectrum)] * 60), spectrum

    def estimate_vitals(self, breathing, heartbeat) -> dict:
        """Estimate breathing and heart rate from the separated components.

        Each rate is the frequency of the largest FFT magnitude bin,
        converted to cycles per minute. ``confidence`` is the share of the
        breathing spectrum held by its peak bin; it is 0.0 when the spectrum
        carries no energy (instead of NaN from a 0/0 division).
        """
        breath_bpm, spectrum_breath = self._dominant_bpm(breathing)
        heart_bpm, _ = self._dominant_bpm(heartbeat)

        total = np.sum(spectrum_breath)
        confidence = float(np.max(spectrum_breath) / total) if total > 0 else 0.0

        return {
            "breathing_rate": breath_bpm,
            "heart_rate": heart_bpm,
            "confidence": confidence,
        }
# Demo: build a synthetic CIR recording and push it through the pipeline.
if __name__ == "__main__":
    radar = UWBRadarPipeline()

    # 6000 time stamps spread evenly over [0, 60].
    t = np.linspace(0, 60, 6000)

    # Two sinusoids stand in for chest motion: 0.35 cycles and 1.5 cycles
    # per unit of t, plus a small Gaussian disturbance.
    resp_wave = np.sin(2 * np.pi * 0.35 * t)
    cardiac_wave = np.sin(2 * np.pi * 1.5 * t)
    jitter = np.random.randn(6000) * 0.1

    # Background: 6000 frames x 256 bins of low-level noise, with the
    # target motion injected into bin 128.
    frames = np.random.randn(6000, 256) * 0.1
    target_motion = (resp_wave + cardiac_wave * 0.3 + jitter) * 10
    frames[:, 128] += target_motion

    vitals = radar.process(frames)

    print("生命体征检测结果:")
    print(f" 呼吸率: {vitals['breathing_rate']:.1f} BPM")
    print(f" 心率: {vitals['heart_rate']:.1f} BPM")
    print(f" 置信度: {vitals['confidence']:.2f}")
|