前言:为什么需要多流同步?

17.1 多流同步的重要性

复杂的感知流水线需要聚合多个数据流:
```text
多流同步的重要性
────────────────────────────────────────
问题:如何聚合多个数据源?

  IMS DMS 场景 —— 输入流:
    • IR 图像流(30 FPS)
    • 眼睛状态流(30 FPS)
    • 头部姿态流(30 FPS)
    • 眨眼频率流(统计值)
    • 车速流(CAN 总线)
  需要把这些流同步聚合,才能计算疲劳分数。

挑战:
    • 不同流有不同的帧率和延迟
    • 时间戳可能不完全对齐
    • 某些流可能丢失数据
    • 需要处理历史数据(时序分析)

解决方案:多流同步 Calculator
    • 时间戳同步:按时间戳匹配数据
    • 滑动窗口:缓存历史数据
    • 加权融合:多指标聚合
    • 容错处理:处理缺失数据
```
17.2 同步场景分类

```text
1. 时间戳同步:按时间戳一一匹配
   Stream A:  ──P1──P2──P3──P4──▶
   Stream B:  ──P1──P2──P3──P4──▶
              t=0 t=1 t=2 t=3
   同步输出:  ──(A1,B1)──(A2,B2)──(A3,B3)──(A4,B4)──▶

2. 屏障同步:等待所有输入到达后才输出
   Stream A:  ──P1──────P2──▶
   Stream B:  ────P1──P2───▶
   Stream C:  ─P1──────P2───▶
   同步输出:  ────(A1,B1,C1)────(A2,B2,C2)─▶

3. 异步处理:任一输入到达就处理
   Stream A:  ──P1──P2──P3──▶
   Stream B:  ────P1────P2──▶
   输出:      ──A1──B1──A2──A3──B2──▶

4. 滑动窗口:缓存历史数据用于时序分析(窗口大小 = 3)
   输入:      ──P1──P2──P3──P4──P5──▶
   窗口:      [P1]
              [P1,P2]
              [P1,P2,P3]
              [P2,P3,P4]
              [P3,P4,P5]
```
// 18.1 时间戳系统 (Timestamp system)
//
// Simplified excerpt of the Timestamp interface from
// "mediapipe/framework/timestamp.h", shown for illustration. It is written to
// be self-contained; do not compile it together with the real header, which
// already defines mediapipe::Timestamp.
#include <cstdint>
#include <limits>

namespace mediapipe {

// A packet timestamp. Ordinary timestamps lie in [Min(), Max()]; the special
// values sit strictly outside that range, in this order:
//   Unset < Unstarted < PreStream < Min <= (ordinary) <= Max
//         < PostStream < OneOverPostStream < Done
class Timestamp {
 public:
  // MediaPipe spells this integer type `int64` (an alias of std::int64_t).
  explicit Timestamp(std::int64_t value);

  static Timestamp Unset();
  static Timestamp Unstarted();
  static Timestamp PreStream();
  static Timestamp Min();
  static Timestamp Max();
  static Timestamp PostStream();
  // NOTE(review): the real header names this OneOverPostStream().
  static Timestamp OneOverPost();
  static Timestamp Done();

  std::int64_t Value() const;
  bool operator<(const Timestamp& other) const;
  bool operator<=(const Timestamp& other) const;
  bool operator==(const Timestamp& other) const;
  // Interprets the value as microseconds (MediaPipe's timestamp convention).
  std::int64_t Microseconds() const;

  // Raw values backing the special timestamps. They must be pairwise distinct
  // and ordered as documented above; in particular none may coincide with an
  // ordinary timestamp such as -1, and Min must sort after PreStream.
  static constexpr std::int64_t kTimestampUnset =
      std::numeric_limits<std::int64_t>::min();
  static constexpr std::int64_t kTimestampUnstarted = kTimestampUnset + 1;
  static constexpr std::int64_t kTimestampPreStream = kTimestampUnset + 2;
  static constexpr std::int64_t kTimestampMin = kTimestampUnset + 3;
  static constexpr std::int64_t kTimestampMax =
      std::numeric_limits<std::int64_t>::max() - 3;
  static constexpr std::int64_t kTimestampPostStream = kTimestampMax + 1;
  static constexpr std::int64_t kTimestampOneOverPostStream =
      kTimestampMax + 2;
  static constexpr std::int64_t kTimestampDone =
      std::numeric_limits<std::int64_t>::max();
};

}  // namespace mediapipe

// Usage, inside a calculator's Process(CalculatorContext* cc):
//
//   Timestamp t1(1000);
//   const Packet& packet = cc->Inputs().Tag("INPUT").Value();
//   Timestamp ts = packet.Timestamp();
//   auto output_packet = MakePacket<Data>(data).At(Timestamp(1000));
18.2 默认同步机制

MediaPipe 节点默认使用 DefaultInputStreamHandler。它的行为等价于"所有输入流组成一个同步集合"的 SyncSetInputStreamHandler。

工作原理:

1. 输入 Packet 到达 Input Stream。
2. 对某个时间戳 t,逐一检查每个输入流:要么已有时间戳为 t 的 Packet,要么其时间戳边界(timestamp bound)已越过 t,也就是说可以确定该流在 t 不会再有数据。
3. 若所有输入流在 t 上都已"确定",就调用 Process()。在 t 上没有数据的流,本次调用中为空(IsEmpty() 返回 true)。若仍有输入流未确定,则缓存已到达的 Packet 并等待。
4. Process() 中可以访问该时间戳下的所有输入。

示例:

```text
Stream A: ──P1(t=0)──P2(t=1)──P3(t=2)─▶
Stream B: ──P1(t=0)───────────P3(t=2)─▶   (P2 丢失)

调用 Process:
  t=0:(A1, B1) ✓
  t=1:A2 已到达,但 B 在 t=1 尚未确定 → 缓存 A2 并等待;
       B 的 P3(t=2) 到达后,其时间戳边界越过 t=1,于是以 (A2, 空) 调用 Process
  t=2:(A3, B3) ✓
```
# ========== SyncSetInputStreamHandler configuration ==========
# Default-style synchronization: inputs A and B are matched by timestamp.
# (A single sync set covering every input behaves like the default
# DefaultInputStreamHandler.)
node {
  calculator: "MergeCalculator"
  input_stream: "A:stream_a"
  input_stream: "B:stream_b"
  output_stream: "OUTPUT:merged"
  input_stream_handler {
    input_stream_handler: "SyncSetInputStreamHandler"
    options {
      [mediapipe.SyncSetInputStreamHandlerOptions.ext] {
        # Sync-set members are listed with the repeated field `tag_index`,
        # one entry per stream, written as "TAG" or "TAG:index".
        sync_set {
          tag_index: "A"
          tag_index: "B"
        }
        # NOTE: SyncSetInputStreamHandlerOptions has no `allow_missing_inputs`
        # or `max_queue_size` fields. A stream with no packet at the current
        # timestamp simply shows up empty in Process(). Queue limits are set
        # at graph level (`max_queue_size` in CalculatorGraphConfig) or with a
        # FlowLimiterCalculator.
      }
    }
  }
}

# ========== Multiple sync sets ==========
# The inputs are split into groups, and each group is synchronized
# independently. A Process() call normally carries the packets of one ready
# sync set; the other set's inputs are empty in that call.
node {
  calculator: "MultiGroupCalculator"
  input_stream: "A:stream_a"
  input_stream: "B:stream_b"
  input_stream: "C:stream_c"
  input_stream: "D:stream_d"
  output_stream: "OUTPUT:output"
  input_stream_handler {
    input_stream_handler: "SyncSetInputStreamHandler"
    options {
      [mediapipe.SyncSetInputStreamHandlerOptions.ext] {
        # Set 1: A and B are synchronized with each other.
        sync_set {
          tag_index: "A"
          tag_index: "B"
        }
        # Set 2: C and D are synchronized with each other.
        sync_set {
          tag_index: "C"
          tag_index: "D"
        }
      }
    }
  }
}
# ========== BarrierInputStreamHandler configuration ==========
# Barrier synchronization: Process() runs only once every input stream has a
# packet available.
node {
  calculator: "AggregateCalculator"
  input_stream: "A:stream_a"
  input_stream: "B:stream_b"
  input_stream: "C:stream_c"
  output_stream: "OUTPUT:aggregated"
  input_stream_handler {
    input_stream_handler: "BarrierInputStreamHandler"
  }
}
# How it works:
# Stream A: ──P1──────P2──▶
# Stream B: ────P1──P2───▶
# Stream C: ─P1──────P2───▶
#
# Output:   ────(A1,B1,C1)────(A2,B2,C2)─▶
# Nothing is emitted until every stream has data.
# NOTE(review): the barrier is about packet availability, not timestamp
# equality; the grouped packets need not share a timestamp. Confirm against
# barrier_input_stream_handler for the MediaPipe version in use.
# ========== "Async" processing: ImmediateInputStreamHandler ==========
# Process any input as soon as it arrives.
# MediaPipe does not ship a handler named "AsyncInputStreamHandler"; a graph
# that names it fails to initialize because the handler is not registered.
# The handler that invokes Process() as soon as any input stream has a packet,
# without aligning it to the other streams, is ImmediateInputStreamHandler.
node {
  calculator: "AsyncCalculator"
  input_stream: "A:stream_a"
  input_stream: "B:stream_b"
  output_stream: "OUTPUT:output"
  input_stream_handler {
    input_stream_handler: "ImmediateInputStreamHandler"
  }
}
# How it works:
# Stream A: ──P1──P2──P3──▶
# Stream B: ────P1────P2──▶
#
# Output:   ──A1──B1──A2──A3──B2──▶
# Each input is processed as soon as it arrives. Typically only the stream(s)
# that just delivered a packet are non-empty in a given Process() call.
# ========== ImmediateInputStreamHandler configuration ==========
# Immediate processing: Process() is triggered as soon as a packet arrives,
# without waiting for, or aligning with, any other input stream.
node {
  calculator: "ImmediateCalculator"
  input_stream: "INPUT:input"
  output_stream: "OUTPUT:output"
  input_stream_handler {
    input_stream_handler: "ImmediateInputStreamHandler"
  }
}
# Suited to latency-sensitive nodes (e.g. a single input with strict real-time
# requirements) whose inputs do not need timestamp alignment.
二十、滑动窗口聚合 Calculator 20.1 基本实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 #ifndef MEDIAPIPE_CALCULATORS_CORE_SLIDING_WINDOW_CALCULATOR_H_ #define MEDIAPIPE_CALCULATORS_CORE_SLIDING_WINDOW_CALCULATOR_H_ #include "mediapipe/framework/calculator_framework.h" #include <deque> namespace mediapipe {template <typename T>class SlidingWindowCalculator : public CalculatorBase { public : static absl::Status GetContract (CalculatorContract* cc) { cc->Inputs ().Index (0 ).Set <T>(); cc->Outputs ().Index (0 ).Set<std::deque<T>>(); cc->Options <SlidingWindowOptions>(); return absl::OkStatus (); } absl::Status Open (CalculatorContext* cc) override { const auto & options = cc->Options <SlidingWindowOptions>(); window_size_ = options.window_size (); emit_only_if_full_ = options.emit_only_if_full (); LOG (INFO) << "SlidingWindowCalculator initialized: " << "window_size=" << window_size_; return absl::OkStatus (); } absl::Status Process (CalculatorContext* cc) override { if (cc->Inputs ().Index (0 ).IsEmpty ()) { return absl::OkStatus (); } const T& data = cc->Inputs ().Index (0 ).Get <T>(); buffer_.push_back (data); while (buffer_.size () > window_size_) { buffer_.pop_front (); } if (emit_only_if_full_ && buffer_.size () < window_size_) { return absl::OkStatus (); } cc->Outputs ().Index (0 ).AddPacket ( MakePacket<std::deque<T>>(buffer_).At (cc->InputTimestamp ())); return absl::OkStatus (); } private : std::deque<T> buffer_; int window_size_ = 10 ; bool emit_only_if_full_ = false ; };REGISTER_CALCULATOR (SlidingWindowCalculator);extern template class SlidingWindowCalculator <float >;extern template class SlidingWindowCalculator <int >;extern template class SlidingWindowCalculator <Detection>; } #endif
# 20.2 Graph 配置 (Graph configuration)
# ========== Sliding-window graph configuration ==========
# A class template cannot be registered as "SlidingWindowCalculator<float>".
# Each instantiation must be registered under its own alias in a .cc file,
# e.g.
#   using SlidingWindowFloatCalculator = SlidingWindowCalculator<float>;
#   REGISTER_CALCULATOR(SlidingWindowFloatCalculator);
# and is referenced here by that alias.

# History buffer for per-frame EAR values.
node {
  calculator: "SlidingWindowFloatCalculator"
  input_stream: "ear_values"
  output_stream: "ear_history"
  options {
    [mediapipe.SlidingWindowOptions.ext] {
      window_size: 30  # 30 frames (about 1 s, assuming a 30 FPS input)
      emit_only_if_full: false
    }
  }
}

# History buffer for face detections.
# NOTE(review): SlidingWindowCalculator<Detection> expects one Detection per
# packet; face-detection streams commonly carry std::vector<Detection>.
# Confirm the type of "face_detections".
node {
  calculator: "SlidingWindowDetectionCalculator"
  input_stream: "face_detections"
  output_stream: "detection_history"
  options {
    [mediapipe.SlidingWindowOptions.ext] {
      window_size: 10
      emit_only_if_full: true
    }
  }
}
二十一、多流融合 Calculator 21.1 加权融合 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 #ifndef MEDIAPIPE_CALCULATORS_FUSION_MULTI_STREAM_FUSION_CALCULATOR_H_ #define MEDIAPIPE_CALCULATORS_FUSION_MULTI_STREAM_FUSION_CALCULATOR_H_ #include "mediapipe/framework/calculator_framework.h" namespace mediapipe {class MultiStreamFusionCalculator : public CalculatorBase { public : static absl::Status GetContract (CalculatorContract* cc) { for (int i = 0 ; i < cc->Inputs ().NumEntries (); ++i) { cc->Inputs ().Index (i).Set <float >(); } cc->Outputs ().Index (0 ).Set <float >(); cc->Options <MultiStreamFusionOptions>(); return absl::OkStatus (); } absl::Status Open (CalculatorContext* cc) override { const auto & options = cc->Options <MultiStreamFusionOptions>(); fusion_method_ = options.fusion_method (); threshold_ = options.threshold (); for (const auto & w : options.weights ()) { weights_.push_back (w); } if (weights_.empty ()) { int num_inputs = cc->Inputs ().NumEntries (); float uniform_weight = 1.0f / num_inputs; for (int i = 0 ; i < num_inputs; ++i) { weights_.push_back (uniform_weight); } } LOG (INFO) << "MultiStreamFusionCalculator initialized: " << "method=" << fusion_method_ << ", weights=" << weights_.size (); return absl::OkStatus (); } absl::Status Process (CalculatorContext* cc) override { std::vector<float > inputs; std::vector<float > valid_weights; for (int i = 0 ; i < cc->Inputs ().NumEntries (); ++i) { if (!cc->Inputs ().Index (i).IsEmpty ()) { inputs.push_back (cc->Inputs ().Index (i).Get <float >()); if (i < weights_.size ()) { 
valid_weights.push_back (weights_[i]); } else { valid_weights.push_back (1.0f ); } } } if (inputs.empty ()) { return absl::OkStatus (); } float fused_value = 0.0f ; switch (fusion_method_) { case MultiStreamFusionOptions::WEIGHTED_AVERAGE: fused_value = WeightedAverage (inputs, valid_weights); break ; case MultiStreamFusionOptions::MAX: fused_value = *std::max_element (inputs.begin (), inputs.end ()); break ; case MultiStreamFusionOptions::MIN: fused_value = *std::min_element (inputs.begin (), inputs.end ()); break ; case MultiStreamFusionOptions::SUM: fused_value = std::accumulate (inputs.begin (), inputs.end (), 0.0f ); break ; case MultiStreamFusionOptions::PRODUCT: fused_value = 1.0f ; for (float v : inputs) { fused_value *= v; } break ; } cc->Outputs ().Index (0 ).AddPacket ( MakePacket <float >(fused_value).At (cc->InputTimestamp ())); return absl::OkStatus (); } private : MultiStreamFusionOptions::FusionMethod fusion_method_ = MultiStreamFusionOptions::WEIGHTED_AVERAGE; std::vector<float > weights_; float threshold_ = 0.5f ; float WeightedAverage (const std::vector<float >& values, const std::vector<float >& weights) { float sum = 0.0f ; float weight_sum = 0.0f ; for (size_t i = 0 ; i < values.size (); ++i) { sum += values[i] * weights[i]; weight_sum += weights[i]; } return weight_sum > 0 ? sum / weight_sum : 0.0f ; } };REGISTER_CALCULATOR (MultiStreamFusionCalculator); } #endif
二十二、IMS 实战:疲劳检测聚合 22.1 完整实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 #ifndef MEDIAPIPE_CALCULATORS_IMS_FATIGUE_AGGREGATOR_CALCULATOR_H_ #define MEDIAPIPE_CALCULATORS_IMS_FATIGUE_AGGREGATOR_CALCULATOR_H_ #include "mediapipe/framework/calculator_framework.h" #include <deque> namespace mediapipe {class FatigueAggregatorCalculator : public CalculatorBase { public : static absl::Status GetContract (CalculatorContract* cc) { cc->Inputs ().Tag ("EYE_STATE" ).Set <EyeState>(); cc->Inputs ().Tag ("YAWN_STATE" ).Set <YawnState>(); cc->Inputs ().Tag ("HEAD_POSE" ).Set <HeadPose>(); cc->Inputs ().Tag ("BLINK_RATE" ).Set <float >(); cc->Outputs ().Tag ("FATIGUE_SCORE" ).Set <float >(); cc->Outputs ().Tag ("ALERT" ).Set <bool >(); cc->Outputs ().Tag ("DETAILS" ).Set <FatigueDetails>(); cc->Options <FatigueAggregatorOptions>(); return absl::OkStatus (); } absl::Status Open (CalculatorContext* cc) override { const auto & options = cc->Options <FatigueAggregatorOptions>(); eye_weight_ = options.eye_weight (); yawn_weight_ = options.yawn_weight (); pose_weight_ = options.pose_weight (); blink_weight_ = options.blink_weight (); fatigue_threshold_ = 
options.fatigue_threshold (); history_size_ = options.history_size (); ear_threshold_ = options.ear_threshold (); fatigue_history_.resize (history_size_, 0.0f ); ear_history_.resize (options.perclos_window (), 0.0f ); LOG (INFO) << "FatigueAggregatorCalculator initialized: " << "weights=(" << eye_weight_ << "," << yawn_weight_ << "," << pose_weight_ << "," << blink_weight_ << ")" << ", threshold=" << fatigue_threshold_; return absl::OkStatus (); } absl::Status Process (CalculatorContext* cc) override { float eye_score = ComputeEyeScore (cc); float yawn_score = ComputeYawnScore (cc); float pose_score = ComputePoseScore (cc); float blink_score = ComputeBlinkScore (cc); float raw_fatigue = eye_weight_ * eye_score + yawn_weight_ * yawn_score + pose_weight_ * pose_score + blink_weight_ * blink_score; raw_fatigue = std::max (0.0f , std::min (1.0f , raw_fatigue)); fatigue_history_.push_back (raw_fatigue); fatigue_history_.pop_front (); float smoothed = SmoothScore (fatigue_history_); bool is_fatigued = smoothed > fatigue_threshold_; FatigueDetails details; details.set_eye_score (eye_score); details.set_yawn_score (yawn_score); details.set_pose_score (pose_score); details.set_blink_score (blink_score); details.set_raw_fatigue (raw_fatigue); details.set_smoothed_fatigue (smoothed); details.set_is_fatigued (is_fatigued); float perclos = ComputePERCLOS (); details.set_perclos (perclos); cc->Outputs ().Tag ("FATIGUE_SCORE" ).AddPacket ( MakePacket <float >(smoothed).At (cc->InputTimestamp ())); cc->Outputs ().Tag ("ALERT" ).AddPacket ( MakePacket <bool >(is_fatigued).At (cc->InputTimestamp ())); cc->Outputs ().Tag ("DETAILS" ).AddPacket ( MakePacket <FatigueDetails>(details).At (cc->InputTimestamp ())); LOG (INFO) << "Fatigue score: " << smoothed << ", alert=" << is_fatigued << ", PERCLOS=" << perclos; return absl::OkStatus (); } private : float eye_weight_ = 0.4f ; float yawn_weight_ = 0.2f ; float pose_weight_ = 0.2f ; float blink_weight_ = 0.2f ; float fatigue_threshold_ = 
0.7f ; int history_size_ = 30 ; float ear_threshold_ = 0.2f ; std::deque<float > fatigue_history_; std::deque<float > ear_history_; float ComputeEyeScore (CalculatorContext* cc) { if (cc->Inputs ().Tag ("EYE_STATE" ).IsEmpty ()) { return 0.0f ; } const EyeState& eye = cc->Inputs ().Tag ("EYE_STATE" ).Get <EyeState>(); float ear_avg = (eye.ear_left () + eye.ear_right ()) / 2.0f ; ear_history_.push_back (ear_avg); ear_history_.pop_front (); if (ear_avg < ear_threshold_) { return 1.0f ; } else { return 0.0f ; } } float ComputeYawnScore (CalculatorContext* cc) { if (cc->Inputs ().Tag ("YAWN_STATE" ).IsEmpty ()) { return 0.0f ; } const YawnState& yawn = cc->Inputs ().Tag ("YAWN_STATE" ).Get <YawnState>(); return yawn.is_yawning () ? 1.0f : 0.0f ; } float ComputePoseScore (CalculatorContext* cc) { if (cc->Inputs ().Tag ("HEAD_POSE" ).IsEmpty ()) { return 0.0f ; } const HeadPose& pose = cc->Inputs ().Tag ("HEAD_POSE" ).Get <HeadPose>(); float pitch_score = std::abs (pose.pitch ()) > 30.0f ? 1.0f : 0.0f ; float yaw_score = std::abs (pose.yaw ()) > 45.0f ? 1.0f : 0.0f ; return std::max (pitch_score, yaw_score); } float ComputeBlinkScore (CalculatorContext* cc) { if (cc->Inputs ().Tag ("BLINK_RATE" ).IsEmpty ()) { return 0.0f ; } float blink_rate = cc->Inputs ().Tag ("BLINK_RATE" ).Get <float >(); if (blink_rate > 30.0f || blink_rate < 5.0f ) { return 1.0f ; } else { return 0.0f ; } } float SmoothScore (const std::deque<float >& history) { if (history.empty ()) return 0.0f ; float alpha = 0.1f ; float smoothed = history.front (); for (size_t i = 1 ; i < history.size (); ++i) { smoothed = alpha * history[i] + (1.0f - alpha) * smoothed; } return smoothed; } float ComputePERCLOS () { if (ear_history_.empty ()) return 0.0f ; int closed_count = 0 ; for (float ear : ear_history_) { if (ear < ear_threshold_) { closed_count++; } } return static_cast <float >(closed_count) / ear_history_.size (); } };REGISTER_CALCULATOR (FatigueAggregatorCalculator); } #endif
# 22.2 Graph 配置 (Graph configuration)
# ims_fatigue_detection_graph.pbtxt

input_stream: "IR_IMAGE:ir_image"
output_stream: "FATIGUE_SCORE:fatigue_score"
output_stream: "ALERT:alert"

# ========== Eye-state detection ==========
node {
  calculator: "EyeStateCalculator"
  input_stream: "IMAGE:ir_image"
  output_stream: "EYE_STATE:eye_state"
}

# ========== Yawn detection ==========
node {
  calculator: "YawnDetectionCalculator"
  input_stream: "IMAGE:ir_image"
  output_stream: "YAWN_STATE:yawn_state"
}

# ========== Head-pose estimation ==========
node {
  calculator: "HeadPoseCalculator"
  input_stream: "IMAGE:ir_image"
  output_stream: "HEAD_POSE:head_pose"
}

# ========== Blink-rate computation ==========
node {
  calculator: "BlinkRateCalculator"
  input_stream: "EYE_STATE:eye_state"
  output_stream: "BLINK_RATE:blink_rate"
}

# ========== Fatigue aggregation ==========
node {
  calculator: "FatigueAggregatorCalculator"
  input_stream: "EYE_STATE:eye_state"
  input_stream: "YAWN_STATE:yawn_state"
  input_stream: "HEAD_POSE:head_pose"
  input_stream: "BLINK_RATE:blink_rate"
  output_stream: "FATIGUE_SCORE:fatigue_score"
  output_stream: "ALERT:alert"
  output_stream: "DETAILS:details"
  # With no sync_set options, all inputs fall into one implicit sync set,
  # which is the same as the default handler. Process() runs once per
  # timestamp, and a cue with no packet at that timestamp arrives empty.
  input_stream_handler {
    input_stream_handler: "SyncSetInputStreamHandler"
  }
  options {
    [mediapipe.FatigueAggregatorOptions.ext] {
      eye_weight: 0.4
      yawn_weight: 0.2
      pose_weight: 0.2
      blink_weight: 0.2
      fatigue_threshold: 0.7
      history_size: 30    # frames of raw scores used for smoothing
      ear_threshold: 0.2
      perclos_window: 60  # frames of EAR values used for PERCLOS
    }
  }
}
二十三、总结
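| 策略 | Handler | 说明 | 适用场景 |
|------|---------|------|----------|
| 时间戳同步 | SyncSet | 按时间戳匹配 | 多输入融合 |
| 屏障同步 | Barrier | 等待所有输入 | 聚合计算 |
| 异步处理 | Async(MediaPipe 中没有名为 AsyncInputStreamHandler 的 Handler,实际使用 ImmediateInputStreamHandler) | 立即处理 | 实时响应 |
| 立即处理 | Immediate | 不等待其他输入流 | 单输入高实时 |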
下篇预告 MediaPipe 系列 18:滑动窗口 Calculator——时序数据处理
深入讲解时序数据处理、状态机实现、事件检测。
参考资料
Google AI Edge. MediaPipe Input Stream Handlers
Google AI Edge. SyncSet Handler
Google AI Edge. Timestamp System
系列进度:17/55 | 更新时间:2026-03-12