1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
| import numpy as np from scipy import signal from scipy.fft import fft, fftfreq
class CPDRadarDetector:
    """60 GHz radar child-presence-detection (CPD) detector.

    Bundles the range/Doppler front end for raw ADC frames with a
    vital-sign estimator that looks for breathing and heartbeat peaks in a
    slow-time series.

    Attributes:
        frame_rate: Sampling rate of the slow-time series; used as ``fs`` for
            filter design and PSD estimation.
        fft_size: FFT length for the range/Doppler transforms and the minimum
            FFT length for PSD estimation.
        breath_band: ``(low, high)`` breathing band, same unit as
            ``frame_rate`` (Hz when ``frame_rate`` is frames per second).
        heartbeat_band: ``(low, high)`` heartbeat band, same unit.
    """

    def __init__(self, frame_rate=30, fft_size=4096):
        self.frame_rate = frame_rate
        self.fft_size = fft_size
        self.breath_band = (0.1, 0.5)
        self.heartbeat_band = (1.0, 2.0)

    def process_point_cloud(self, adc_data):
        """Run range FFT, Doppler FFT, CFAR detection and beamforming.

        Args:
            adc_data: Raw ADC data, documented by the original author as
                ``(n_chirps, n_rx, n_samples)``.

        Returns:
            Whatever ``_digital_beamforming`` returns. The original docstring
            described this as a point cloud ``(n_points, 4)`` of
            ``[x, y, z, velocity]`` -- TODO confirm against that helper.

        NOTE(review): ``_cfar_detection`` and ``_digital_beamforming`` are not
        defined in the visible part of this class; they must be provided
        elsewhere (e.g. by a subclass) or this method raises AttributeError.
        """
        # Range FFT along the last axis (fast-time samples).
        range_fft = fft(adc_data, axis=-1, n=self.fft_size)
        # Doppler FFT along the first axis (chirps); both transforms are
        # zero-padded or truncated to ``fft_size`` bins.
        doppler_fft = fft(range_fft, axis=0, n=self.fft_size)

        detected_points = self._cfar_detection(doppler_fft)
        angles = self._digital_beamforming(detected_points)
        return angles

    def detect_vital_signs(self, range_bin, time_series):
        """Estimate breathing and heartbeat rates from a slow-time series.

        Args:
            range_bin: Range bin of the target. Currently unused; kept for
                interface compatibility.
            time_series: 1-D time series (phase or amplitude) sampled at
                ``frame_rate``.

        Returns:
            dict with keys ``'breathing_rate'`` and ``'heartbeat_rate'`` (peak
            frequency inside the respective band), ``'breathing_detected'``,
            ``'heartbeat_detected'``, ``'confidence'`` (breathing-band energy
            divided by total detrended energy, 0.0 when the detrended signal
            carries no energy) and ``'is_child_present'``
            (``confidence > 0.3``).

        Raises:
            ValueError: Propagated from SciPy when the series is too short
                for zero-phase filtering or a band edge is invalid for
                ``frame_rate``.
        """
        detrended = signal.detrend(time_series)

        breath_filtered = self._bandpass_filter(
            detrended, self.breath_band[0], self.breath_band[1]
        )
        heart_filtered = self._bandpass_filter(
            detrended, self.heartbeat_band[0], self.heartbeat_band[1]
        )

        breath_freq = self._find_peak_frequency(
            self._compute_psd(breath_filtered), self.breath_band
        )
        heart_freq = self._find_peak_frequency(
            self._compute_psd(heart_filtered), self.heartbeat_band
        )

        total_energy = np.sum(np.abs(detrended) ** 2)
        breath_energy = np.sum(np.abs(breath_filtered) ** 2)
        # A flat input has zero energy after detrending; report zero
        # confidence instead of dividing 0 by 0 and returning NaN.
        confidence = breath_energy / total_energy if total_energy > 0 else 0.0

        return {
            'breathing_rate': breath_freq,
            'heartbeat_rate': heart_freq,
            # NOTE(review): with the default bands any in-band peak is > 0,
            # so these flags only tell whether the band contained PSD bins.
            'breathing_detected': breath_freq > 0,
            'heartbeat_detected': heart_freq > 0,
            'confidence': confidence,
            'is_child_present': confidence > 0.3,
        }

    def _bandpass_filter(self, data, low, high):
        """Apply a zero-phase 4th-order Butterworth band-pass filter.

        Args:
            data: Signal to filter (filtered along the last axis).
            low: Lower band edge, same unit as ``frame_rate``.
            high: Upper band edge; must be below ``frame_rate / 2``.

        Returns:
            The filtered signal, same shape as ``data``.
        """
        # Second-order sections stay numerically well-conditioned for narrow,
        # low-frequency bands where the (b, a) polynomial form can lose
        # precision.
        sos = signal.butter(
            4, [low, high], btype='band', fs=self.frame_rate, output='sos'
        )
        return signal.sosfiltfilt(sos, data)

    def _compute_psd(self, data):
        """Compute the periodogram power spectral density of ``data``.

        Returns:
            ``(freqs, psd)`` as returned by ``scipy.signal.periodogram``.
        """
        data = np.asarray(data)
        n_samples = data.shape[-1] if data.ndim else 0
        # periodogram() truncates the input when nfft is shorter than the
        # signal; never go below the signal length so no samples are dropped.
        nfft = max(self.fft_size, n_samples)
        freqs, psd = signal.periodogram(data, fs=self.frame_rate, nfft=nfft)
        return freqs, psd

    def _find_peak_frequency(self, psd_tuple, freq_band):
        """Return the frequency of the strongest PSD bin inside ``freq_band``.

        Args:
            psd_tuple: ``(freqs, psd)`` pair as produced by ``_compute_psd``.
            freq_band: Inclusive ``(low, high)`` frequency limits.

        Returns:
            The peak frequency, or 0.0 when no frequency bin falls inside
            the band.
        """
        freqs, psd = psd_tuple
        in_band = (freqs >= freq_band[0]) & (freqs <= freq_band[1])
        if not np.any(in_band):
            return 0.0
        band_freqs = freqs[in_band]
        return band_freqs[np.argmax(psd[in_band])]
|