1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
| import tflite_runtime.interpreter as tflite import numpy as np import cv2 import time
class EdgeDMSInference:
    """Edge DMS inference engine.

    Wraps a TFLite interpreter: resizes and (de)quantizes frames, runs the
    model, turns the output into per-class probabilities and feeds them to a
    ``TemporalDecisionHead`` (defined elsewhere in the project).

    ``inference_times`` collects the latency of every ``inference()`` call in
    milliseconds; it grows by one entry per call and is never trimmed here.
    """

    def __init__(self, model_path: str, num_threads: int = 4):
        """Load the model and cache its tensor metadata.

        Args:
            model_path: Path to the TFLite model file.
            num_threads: Number of CPU threads handed to the interpreter.
        """
        self.interpreter = tflite.Interpreter(
            model_path=model_path,
            num_threads=num_threads,
        )
        self.interpreter.allocate_tensors()

        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()

        self.input_shape = self.input_details[0]['shape']
        # Indices 1 and 2 of the input shape are used as height and width;
        # stored as plain ints so they can be passed on as a size tuple.
        self.input_height = int(self.input_shape[1])
        self.input_width = int(self.input_shape[2])

        # Quantization metadata is constant for a loaded model, so it is read
        # once here instead of on every frame.
        self.input_dtype = self.input_details[0]['dtype']
        self.input_scale, self.input_zero_point = self._quant_params(
            self.input_details[0]
        )
        self.output_scale, self.output_zero_point = self._quant_params(
            self.output_details[0]
        )

        self.class_names = [
            'normal',
            'phone_talk_left',
            'phone_talk_right',
            'phone_text_left',
            'phone_text_right',
            'eating',
            'drinking',
            'smoking',
            'look_left',
            'look_down',
            'look_right',
            'reaching_behind',
            'grooming',
            'control_panel',
            'yawning',
            'eyes_closed_sleep',
        ]

        self.temporal_head = TemporalDecisionHead()
        self.inference_times = []

    @staticmethod
    def _quant_params(tensor_details: dict) -> tuple:
        """Return ``(scale, zero_point)`` for a tensor, ``(0.0, 0)`` if absent.

        Args:
            tensor_details: One entry of the interpreter's input/output
                details; only its ``'quantization_parameters'`` is read.
        """
        params = tensor_details['quantization_parameters']
        scales = params['scales']
        zero_points = params['zero_points']
        # Explicit length checks: these are arrays, and they are empty when
        # the tensor carries no quantization, so ``[0]`` would raise.
        if len(scales) == 0 or len(zero_points) == 0:
            return 0.0, 0
        return float(scales[0]), int(zero_points[0])

    def _quantize(self, normalized: np.ndarray) -> np.ndarray:
        """Convert normalized float values to the model's input dtype.

        Args:
            normalized: Float array (``preprocess`` passes values in [0, 1]).

        Returns:
            For a non-integer input dtype, ``normalized`` cast to that dtype.
            Otherwise ``round(normalized / scale + zero_point)`` saturated to
            the dtype's range.

        Raises:
            ValueError: If the input dtype is an integer type but no
                quantization scale is available.
        """
        if not np.issubdtype(self.input_dtype, np.integer):
            return normalized.astype(self.input_dtype)
        if self.input_scale == 0:
            raise ValueError(
                "input tensor is integer-typed but has no quantization scale"
            )
        limits = np.iinfo(self.input_dtype)
        # Round instead of truncating, and clip instead of letting the cast
        # wrap around for out-of-range values.
        scaled = np.round(normalized / self.input_scale + self.input_zero_point)
        return np.clip(scaled, limits.min, limits.max).astype(self.input_dtype)

    def _dequantize(self, output: np.ndarray) -> np.ndarray:
        """Map a raw output tensor back to float32 values.

        Returns ``(output - zero_point) * scale``, or just the float32 cast of
        ``output`` when the output tensor has no quantization scale.
        """
        values = output.astype(np.float32)
        if self.output_scale == 0:
            return values
        return (values - self.output_zero_point) * self.output_scale

    def preprocess(self, frame: np.ndarray) -> np.ndarray:
        """Resize, normalize and quantize a frame into a model input batch.

        Args:
            frame: Image array accepted by ``cv2.resize``.

        Returns:
            Array in the model's input dtype with a leading batch axis of 1.
        """
        # NOTE(review): channels are passed through in the order received; no
        # colour-space conversion happens here - confirm the model expects that.
        resized = cv2.resize(frame, (self.input_width, self.input_height))
        normalized = resized.astype(np.float32) / 255.0
        return np.expand_dims(self._quantize(normalized), axis=0)

    def inference(self, frame: np.ndarray, frame_idx: int) -> dict:
        """Run the model on one frame and update the temporal decision head.

        Args:
            frame: Image array, forwarded to ``preprocess``.
            frame_idx: Frame index, forwarded to ``temporal_head.update``.

        Returns:
            {
                'behavior': str,             # class name with highest probability
                'confidence': float,         # probability of that class
                'probabilities': np.ndarray, # softmax over all classes
                'alert': ...,                # result of temporal_head.update
                                             # (originally documented as
                                             # Optional[Alert])
                'latency_ms': float          # preprocess + invoke + postprocess
            }
        """
        # perf_counter is monotonic, so the measured latency cannot be skewed
        # by wall-clock adjustments.
        start_time = time.perf_counter()

        input_data = self.preprocess(frame)
        self.interpreter.set_tensor(self.input_details[0]['index'], input_data)
        self.interpreter.invoke()
        output = self.interpreter.get_tensor(self.output_details[0]['index'])

        # NOTE(review): softmax is applied to the dequantized output, which
        # presumes the model emits logits - confirm against the exported graph.
        probabilities = self._softmax(self._dequantize(output)[0])

        latency_ms = (time.perf_counter() - start_time) * 1000
        self.inference_times.append(latency_ms)

        alert = self.temporal_head.update(frame_idx, probabilities, self.class_names)

        max_idx = int(np.argmax(probabilities))
        return {
            'behavior': self.class_names[max_idx],
            'confidence': float(probabilities[max_idx]),
            'probabilities': probabilities,
            'alert': alert,
            'latency_ms': latency_ms,
        }

    def _softmax(self, x: np.ndarray) -> np.ndarray:
        """Return the softmax of ``x`` (max-shifted for numerical stability)."""
        exp_x = np.exp(x - np.max(x))
        return exp_x / exp_x.sum()

    def get_stats(self) -> dict:
        """Return latency statistics over all recorded inferences.

        Returns:
            Dict with 'avg_latency_ms', 'max_latency_ms', 'min_latency_ms' and
            'fps' (1000 / average latency). All values are 0.0 when no
            inference has been recorded yet.
        """
        if not self.inference_times:
            # Nothing measured yet: np.max/np.min would raise on an empty list.
            return {
                'avg_latency_ms': 0.0,
                'max_latency_ms': 0.0,
                'min_latency_ms': 0.0,
                'fps': 0.0,
            }

        avg_latency = float(np.mean(self.inference_times))
        return {
            'avg_latency_ms': avg_latency,
            'max_latency_ms': float(np.max(self.inference_times)),
            'min_latency_ms': float(np.min(self.inference_times)),
            'fps': 1000 / avg_latency if avg_latency > 0 else float('inf'),
        }
|