前言:Port 机制的核心地位 8.1 Port 在数据流中的作用 Port 是 Calculator 与外部数据流的接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 ┌─────────────────────────────────────────────────────────────────────────┐ │ Port 在数据流中的作用 │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ 问题:Calculator 如何与外部数据流交互? │ │ │ │ ┌─────────────────────────────────────────────────────────┐ │ │ │ 挑战: │ │ │ │ │ │ │ │ • 如何接收输入数据? │ │ │ │ • 如何发送输出数据? │ │ │ │ • 如何处理多个输入输出? │ │ │ │ • 如何区分不同类型的数据? │ │ │ │ • 如何处理可选输入? │ │ │ │ • 如何动态调整端口? │ │ │ │ │ │ │ └─────────────────────────────────────────────────────────┘ │ │ │ │ 解决方案:Port 机制 │ │ ┌─────────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ Port = 端口 = Calculator 的输入输出接口 │ │ │ │ │ │ │ │ • Input Port :接收 Stream 数据 │ │ │ │ • Output Port :发送 Stream 数据 │ │ │ │ • Side Input Port :接收 Side Packet │ │ │ │ • Side Output Port :发送 Side Packet │ │ │ │ │ │ │ └─────────────────────────────────────────────────────────┘ │ │ │ │ 数据流动示意图: │ │ ┌─────────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ Stream A ──────┐ │ │ │ │ │ │ │ │ │ Stream B ──────┼──▶ [Calculator] ──▶ Stream D │ │ │ │ │ │ │ │ │ │ Stream C ──────┘ └──▶ Stream E │ │ │ │ │ │ │ │ 输入 Port : A, B, C │ │ │ │ 输出 Port : D, E │ │ │ │ │ │ │ └─────────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────┘
8.2 Port 的核心特性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 ┌─────────────────────────────────────────────────────────────┐ │ Port 核心特性 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 1 . 类型安全 │ │ • 每个端口有明确的类型 │ │ • 编译时检查类型匹配 │ │ • 避免运行时类型错误 │ │ │ │ 2 . 灵活命名 │ │ • Tag 方式:可读性好(推荐) │ │ • Index 方式:简洁快速 │ │ • 混合方式:灵活组合 │ │ │ │ 3 . 可选性 │ │ • 支持可选输入输出 │ │ • 自动处理缺失数据 │ │ • 提供默认值机制 │ │ │ │ 4 . 动态性 │ │ • 支持动态端口数量 │ │ • 运行时决定端口类型 │ │ • 适应不同输入场景 │ │ │ └─────────────────────────────────────────────────────────────┘
九、Port 类型详解 9.1 Port 类型分类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 ┌─────────────────────────────────────────────────────────────┐ │ Port 类型分类 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ Input Port(输入端口) │ │ │ │ │ │ │ │ • 接收 Stream 数据 │ │ │ │ • 支持时间戳同步 │ │ │ │ • 可以有多个输入 │ │ │ │ │ │ │ │ 访问方式: │ │ │ │ cc->Inputs ().Tag ("NAME") │ │ │ │ cc->Inputs ().Index (0 ) │ │ │ │ │ │ │ └─────────────────────────────────────────────┘ │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ Output Port(输出端口) │ │ │ │ │ │ │ │ • 发送 Stream 数据 │ │ │ │ • 支持多路输出 │ │ │ │ • 可以有多个输出 │ │ │ │ │ │ │ │ 访问方式: │ │ │ │ cc->Outputs ().Tag ("NAME") │ │ │ │ cc->Outputs ().Index (0 ) │ │ │ │ │ │ │ └─────────────────────────────────────────────┘ │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ Side Input Port(静态输入端口) │ │ │ │ │ │ │ │ • 接收 Side Packet │ │ │ │ • 无时间戳 │ │ │ │ • Graph 生命周期内不变 │ │ │ │ │ │ │ │ 访问方式: │ │ │ │ cc->InputSidePackets ().Tag ("NAME") │ │ │ │ │ │ │ └─────────────────────────────────────────────┘ │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ Side Output Port(静态输出端口) │ │ │ │ │ │ │ │ • 发送 Side Packet │ │ │ │ • 无时间戳 │ │ │ │ • 用于传递状态 │ │ │ │ │ │ │ │ 访问方式: │ │ │ │ cc->OutputSidePackets ().Tag ("NAME") │ │ │ │ │ │ │ └─────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘
9.2 Port 类型系统 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 class PortBase { public : template <typename T> void Set () ; void SetAny () ; void SetSameAs (const PortBase* other) ; void Optional () ; const TypeInfo& GetType () const ; bool IsEmpty () const ; bool IsOptional () const ; };class InputPort : public PortBase { public : template <typename T> const T& Get () const ; const Packet& Value () const ; bool IsEmpty () const ; Timestamp Timestamp () const ; };class OutputPort : public PortBase { public : void AddPacket (const Packet& packet) ; template <typename T> void Add (const T& value, Timestamp timestamp) ; void SetHeader (const Packet& header) ; };
十、Tag 方式详解(推荐) 10.1 Tag 方式的优势 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 ┌─────────────────────────────────────────────────────────────┐ │ Tag 方式的优势 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 1 . 可读性 │ │ • 端口名称清晰明了 │ │ • 代码更易理解 │ │ • 维护更方便 │ │ │ │ 2 . 灵活性 │ │ • 可以添加/删除端口而不影响其他端口 │ │ • 端口顺序无关 │ │ • 更容易重构 │ │ │ │ 3 . 错误检测 │ │ • 编译时检查 Tag 是否存在 │ │ • 运行时检查类型匹配 │ │ • 更早发现问题 │ │ │ │ 示例: │ │ ┌─────────────────────────────────────────────┐ │ │ │ cc- >Inputs().Tag ("IMAGE" ).Set<ImageFrame> (); │ │ │ │ cc->Inputs().Tag ("ROI" ).Set<Rect> (); │ │ │ │ │ │ │ │ 清晰明了:IMAGE 对应图像,ROI 对应区域 │ │ │ └─────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘
10.2 Tag 命名规范 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 cc->Inputs ().Tag ("IMAGE" ).Set <ImageFrame>(); cc->Inputs ().Tag ("DETECTION_RESULT" ).Set <Detection>(); cc->Inputs ().Tag ("IMAGE" ).Set <ImageFrame>(); cc->Inputs ().Tag ("IMG" ).Set <ImageFrame>(); cc->Inputs ().Tag ("LEFT_EYE" ).Set <Landmark>(); cc->Inputs ().Tag ("RIGHT_EYE" ).Set <Landmark>(); cc->Inputs ().Tag ("IMAGE" ).Set <ImageFrame>(); cc->Outputs ().Tag ("IMAGE" ).Set <ImageFrame>(); cc->Inputs ().Tag ("IMAGE" ).Set <ImageFrame>(); node { calculator: "MyCalculator" input_stream: "IMAGE:video_stream" # Tag: IMAGE, Stream: video_stream output_stream: "OUTPUT:result" # Tag: OUTPUT, Stream: result }
10.3 Tag 方式完整示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 #ifndef MY_CALCULATOR_H_ #define MY_CALCULATOR_H_ #include "mediapipe/framework/calculator_framework.h" #include "mediapipe/framework/formats/image_frame.h" namespace mediapipe {class MyCalculator : public CalculatorBase { public : static absl::Status GetContract (CalculatorContract* cc) { cc->Inputs ().Tag ("IMAGE" ).Set <ImageFrame>(); cc->Inputs ().Tag ("ROI" ).Set <Rect>(); cc->Inputs ().Tag ("CONFIG" ).Set <Config>(); cc->Inputs ().Tag ("DEBUG" ).Set <DebugInfo>().Optional (); cc->Outputs ().Tag ("DETECTIONS" ).Set<std::vector<Detection>>(); cc->Outputs ().Tag ("LANDMARKS" ).Set<std::vector<Landmark>>(); cc->Outputs ().Tag ("SCORES" ).Set<std::vector<float >>(); cc->Outputs ().Tag ("DEBUG_IMAGE" ).Set <ImageFrame>().Optional (); cc->InputSidePackets ().Tag ("MODEL_PATH" ).Set <std::string>(); cc->Options <MyCalculatorOptions>(); return absl::OkStatus (); } absl::Status Open (CalculatorContext* cc) override { const auto & options = cc->Options <MyCalculatorOptions>(); threshold_ = options.threshold (); model_path_ = cc->InputSidePackets ().Tag ("MODEL_PATH" ).Get <std::string>(); MP_RETURN_IF_ERROR (LoadModel (model_path_)); return absl::OkStatus (); } absl::Status Process (CalculatorContext* cc) override { if (cc->Inputs ().Tag ("IMAGE" ).IsEmpty ()) { return absl::OkStatus (); } const ImageFrame& image = cc->Inputs ().Tag ("IMAGE" ).Get <ImageFrame>(); Rect roi; if (!cc->Inputs ().Tag ("ROI" ).IsEmpty ()) { roi = cc->Inputs ().Tag ("ROI" ).Get <Rect>(); } else { roi = Rect (0 , 0 , image.Width (), image.Height ()); } Config config; if (!cc->Inputs ().Tag ("CONFIG" ).IsEmpty ()) { config = cc->Inputs ().Tag ("CONFIG" ).Get <Config>(); } if (!cc->Inputs ().Tag ("DEBUG" ).IsEmpty ()) { const auto & debug = cc->Inputs ().Tag ("DEBUG" ).Get <DebugInfo>(); } auto [detections, landmarks, scores] = ProcessImage (image, roi, config); cc->Outputs ().Tag ("DETECTIONS" ).AddPacket ( MakePacket<std::vector<Detection>>(detections).At (cc->InputTimestamp ())); cc->Outputs ().Tag ("LANDMARKS" ).AddPacket ( MakePacket<std::vector<Landmark>>(landmarks).At (cc->InputTimestamp ())); cc->Outputs ().Tag ("SCORES" ).AddPacket ( MakePacket<std::vector<float >>(scores).At (cc->InputTimestamp ())); if (cc->Outputs ().HasTag ("DEBUG_IMAGE" ) && enable_debug_) { ImageFrame debug_image = CreateDebugImage (image, detections); cc->Outputs ().Tag ("DEBUG_IMAGE" ).AddPacket ( MakePacket <ImageFrame>(debug_image).At (cc->InputTimestamp ())); } return absl::OkStatus (); } private : float threshold_; std::string model_path_; bool enable_debug_ = false ; absl::Status LoadModel (const std::string& path) ; std::tuple<std::vector<Detection>, std::vector<Landmark>, std::vector<float >> ProcessImage (const ImageFrame& image, const Rect& roi, const Config& config); };REGISTER_CALCULATOR (MyCalculator); } #endif
Graph 配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 # my_calculator_graph.pbtxt # 输入输出定义 input_stream: "VIDEO:video_stream" input_stream: "ROI:roi_stream" input_stream: "CONFIG:config_stream" output_stream: "DETECTIONS:detections" output_stream: "LANDMARKS:landmarks" # Side Packet input_side_packet: "MODEL_PATH:model_path" # Calculator 节点 node { calculator: "MyCalculator" input_stream: "IMAGE:video_stream" input_stream: "ROI:roi_stream" input_stream: "CONFIG:config_stream" input_side_packet: "MODEL_PATH:model_path" output_stream: "DETECTIONS:detections" output_stream: "LANDMARKS:landmarks" output_stream: "SCORES:scores" options { [mediapipe.MyCalculatorOptions.ext] { threshold: 0.5 } } }
十一、Index 方式详解 11.1 Index 方式的特点 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 ┌─────────────────────────────────────────────────────────────┐ │ Index 方式的特点 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 优点: │ │ • 简洁快速 │ │ • 无需定义 Tag │ │ • 适用于端口数量固定且少的场景 │ │ │ │ 缺点: │ │ • 可读性差 │ │ • 顺序敏感(添加/删除端口影响索引) │ │ • 容易出错(索引混淆) │ │ │ │ 适用场景: │ │ • 端口数量少(1-2 个) │ │ • 端口顺序固定 │ │ • 快速原型开发 │ │ │ └─────────────────────────────────────────────────────────────┘
11.2 Index 方式示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 static absl::Status GetContract (CalculatorContract* cc) { cc->Inputs ().Index (0 ).Set <ImageFrame>(); cc->Inputs ().Index (1 ).Set <Rect>(); cc->Inputs ().Index (2 ).Set <Config>(); cc->Outputs ().Index (0 ).Set<std::vector<Detection>>(); cc->Outputs ().Index (1 ).Set<std::vector<Landmark>>(); return absl::OkStatus (); }absl::Status Process (CalculatorContext* cc) override { if (cc->Inputs ().Index (0 ).IsEmpty ()) { return absl::OkStatus (); } const ImageFrame& image = cc->Inputs ().Index (0 ).Get <ImageFrame>(); const Rect& roi = cc->Inputs ().Index (1 ).Get <Rect>(); const Config& config = cc->Inputs ().Index (2 ).Get <Config>(); auto [detections, landmarks] = ProcessImage (image, roi, config); cc->Outputs ().Index (0 ).AddPacket ( MakePacket<std::vector<Detection>>(detections).At (cc->InputTimestamp ())); cc->Outputs ().Index (1 ).AddPacket ( MakePacket<std::vector<Landmark>>(landmarks).At (cc->InputTimestamp ())); return absl::OkStatus (); }
Graph 配置:
1 2 3 4 5 6 7 8 9 # Index 方式 Graph 配置 node { calculator: "MyCalculator" input_stream: "video_stream" # Index 0 input_stream: "roi_stream" # Index 1 input_stream: "config_stream" # Index 2 output_stream: "detections" # Index 0 output_stream: "landmarks" # Index 1 }
十二、混合方式 12.1 Tag + Index 混合 1 2 3 4 5 6 7 8 9 10 11 12 13 14 static absl::Status GetContract (CalculatorContract* cc) { cc->Inputs ().Tag ("IMAGE" ).Set <ImageFrame>(); cc->Inputs ().Index (1 ).Set <Rect>(); cc->Outputs ().Tag ("OUTPUT" ).Set <Result>(); return absl::OkStatus (); }
12.2 可选输入 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 static absl::Status GetContract (CalculatorContract* cc) { cc->Inputs ().Tag ("IMAGE" ).Set <ImageFrame>(); cc->Inputs ().Tag ("ROI" ).Set <Rect>().Optional (); cc->Inputs ().Tag ("CONFIG" ).Set <Config>().Optional (); return absl::OkStatus (); }absl::Status Process (CalculatorContext* cc) override { const ImageFrame& image = cc->Inputs ().Tag ("IMAGE" ).Get <ImageFrame>(); Rect roi; if (!cc->Inputs ().Tag ("ROI" ).IsEmpty ()) { roi = cc->Inputs ().Tag ("ROI" ).Get <Rect>(); } else { roi = Rect (0 , 0 , image.Width (), image.Height ()); } Config config; if (!cc->Inputs ().Tag ("CONFIG" ).IsEmpty ()) { config = cc->Inputs ().Tag ("CONFIG" ).Get <Config>(); } else { config = GetDefaultConfig (); } return absl::OkStatus (); }
十三、高级特性 13.1 动态端口 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 static absl::Status GetContract (CalculatorContract* cc) { const auto & options = cc->Options <MyCalculatorOptions>(); for (int i = 0 ; i < options.num_inputs (); ++i) { cc->Inputs ().Index (i).Set <ImageFrame>(); } for (int i = 0 ; i < options.num_outputs (); ++i) { cc->Outputs ().Index (i).Set <Result>(); } return absl::OkStatus (); }absl::Status Process (CalculatorContext* cc) override { for (int i = 0 ; i < cc->Inputs ().NumEntries (); ++i) { if (!cc->Inputs ().Index (i).IsEmpty ()) { const ImageFrame& image = cc->Inputs ().Index (i).Get <ImageFrame>(); } } return absl::OkStatus (); }
13.2 类型推断 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 static absl::Status GetContract (CalculatorContract* cc) { cc->Inputs ().Tag ("INPUT" ).SetAny (); cc->Outputs ().Tag ("OUTPUT" ).SetSameAs (&cc->Inputs ().Tag ("INPUT" )); return absl::OkStatus (); }absl::Status Process (CalculatorContext* cc) override { const Packet& input = cc->Inputs ().Tag ("INPUT" ).Value (); if (input.ValidateAsType <ImageFrame>().ok ()) { const ImageFrame& image = input.Get <ImageFrame>(); } else if (input.ValidateAsType <AudioFrame>().ok ()) { const AudioFrame& audio = input.Get <AudioFrame>(); } return absl::OkStatus (); }
13.3 输出延迟 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 class BufferCalculator : public CalculatorBase { public : static absl::Status GetContract (CalculatorContract* cc) { cc->Inputs ().Tag ("INPUT" ).Set <Data>(); cc->Outputs ().Tag ("OUTPUT" ).Set<std::vector<Data>>(); return absl::OkStatus (); } absl::Status Process (CalculatorContext* cc) override { const Data& data = cc->Inputs ().Tag ("INPUT" ).Get <Data>(); buffer_.push_back (data); if (buffer_.size () >= buffer_size_) { cc->Outputs ().Tag ("OUTPUT" ).AddPacket ( MakePacket<std::vector<Data>>(buffer_).At (cc->InputTimestamp ())); buffer_.clear (); } return absl::OkStatus (); } private : std::vector<Data> buffer_; int buffer_size_ = 10 ; };
十四、实战:多输入融合 Calculator 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 #ifndef MEDIAPIPE_CALCULATORS_FUSION_FUSION_CALCULATOR_H_ #define MEDIAPIPE_CALCULATORS_FUSION_FUSION_CALCULATOR_H_ #include "mediapipe/framework/calculator_framework.h" #include "mediapipe/framework/formats/image_frame.h" namespace mediapipe {class MultiModalFusionCalculator : public CalculatorBase { public : static absl::Status GetContract (CalculatorContract* cc) { cc->Inputs ().Tag ("IMAGE" ).Set <ImageFrame>(); cc->Inputs ().Tag ("AUDIO" ).Set <AudioFrame>(); cc->Inputs ().Tag ("SENSOR" ).Set <SensorData>(); cc->Inputs ().Tag ("CONTEXT" ).Set <Context>().Optional (); cc->Outputs ().Tag ("RESULT" ).Set <FusionResult>(); cc->Outputs ().Tag ("CONFIDENCE" ).Set <float >(); cc->InputSidePackets ().Tag ("CONFIG" ).Set <FusionConfig>(); cc->Options <FusionOptions>(); cc->SetInputStreamHandler ("SyncSetInputStreamHandler" ); return absl::OkStatus (); } absl::Status Open (CalculatorContext* cc) override { const auto & options = cc->Options <FusionOptions>(); fusion_weights_ = {options.image_weight (), options.audio_weight (), options.sensor_weight ()}; const auto & config = cc->InputSidePackets ().Tag ("CONFIG" ).Get <FusionConfig>(); threshold_ = config.threshold (); LOG (INFO) << "MultiModalFusionCalculator initialized" ; return absl::OkStatus (); } absl::Status Process (CalculatorContext* cc) override { if (cc->Inputs ().Tag ("IMAGE" ).IsEmpty () || cc->Inputs ().Tag ("AUDIO" ).IsEmpty () || cc->Inputs ().Tag ("SENSOR" ).IsEmpty ()) { return absl::OkStatus (); } const ImageFrame& image = cc->Inputs ().Tag ("IMAGE" ).Get <ImageFrame>(); const AudioFrame& audio = cc->Inputs ().Tag ("AUDIO" ).Get <AudioFrame>(); const SensorData& sensor = cc->Inputs ().Tag ("SENSOR" ).Get <SensorData>(); Context context; if (!cc->Inputs ().Tag ("CONTEXT" ).IsEmpty ()) { context = cc->Inputs ().Tag ("CONTEXT" ).Get <Context>(); } auto image_features = ExtractImageFeatures (image); auto audio_features = ExtractAudioFeatures (audio); auto sensor_features = ExtractSensorFeatures (sensor); FusionResult result = Fuse ( image_features, audio_features, sensor_features, fusion_weights_, context); float confidence = ComputeConfidence (result); cc->Outputs ().Tag ("RESULT" ).AddPacket ( MakePacket <FusionResult>(result).At (cc->InputTimestamp ())); cc->Outputs ().Tag ("CONFIDENCE" ).AddPacket ( MakePacket <float >(confidence).At (cc->InputTimestamp ())); LOG (INFO) << "Fusion result: confidence=" << confidence; return absl::OkStatus (); } private : std::vector<float > fusion_weights_; float threshold_; std::vector<float > ExtractImageFeatures (const ImageFrame& image) ; std::vector<float > ExtractAudioFeatures (const AudioFrame& audio) ; std::vector<float > ExtractSensorFeatures (const SensorData& sensor) ; FusionResult Fuse (const std::vector<float >& image_features, const std::vector<float >& audio_features, const std::vector<float >& sensor_features, const std::vector<float >& weights, const Context& context) ; float ComputeConfidence (const FusionResult& result) ; };REGISTER_CALCULATOR (MultiModalFusionCalculator); } #endif
Graph 配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 # fusion_graph.pbtxt input_stream: "IMAGE:image_stream" input_stream: "AUDIO:audio_stream" input_stream: "SENSOR:sensor_stream" input_stream: "CONTEXT:context_stream" output_stream: "RESULT:result" output_stream: "CONFIDENCE:confidence" input_side_packet: "CONFIG:fusion_config" node { calculator: "MultiModalFusionCalculator" input_stream: "IMAGE:image_stream" input_stream: "AUDIO:audio_stream" input_stream: "SENSOR:sensor_stream" input_stream: "CONTEXT:context_stream" input_side_packet: "CONFIG:fusion_config" output_stream: "RESULT:result" output_stream: "CONFIDENCE:confidence" input_stream_handler { input_stream_handler: "SyncSetInputStreamHandler" } options { [mediapipe.FusionOptions.ext] { image_weight: 0.5 audio_weight: 0.3 sensor_weight: 0.2 } } }
十五、调试技巧 15.1 检查端口状态 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 absl::Status Process (CalculatorContext* cc) override { LOG (INFO) << "Input port status:" ; for (int i = 0 ; i < cc->Inputs ().NumEntries (); ++i) { LOG (INFO) << " Index " << i << ": " << (cc->Inputs ().Index (i).IsEmpty () ? "empty" : "has data" ); } if (cc->Inputs ().HasTag ("IMAGE" )) { LOG (INFO) << "IMAGE port exists: " << (cc->Inputs ().Tag ("IMAGE" ).IsEmpty () ? "empty" : "has data" ); } return absl::OkStatus (); }
15.2 类型验证 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 absl::Status Process (CalculatorContext* cc) override { const Packet& packet = cc->Inputs ().Tag ("INPUT" ).Value (); LOG (INFO) << "Packet type: " << packet.DebugString (); if (packet.ValidateAsType <ImageFrame>().ok ()) { LOG (INFO) << "Packet is ImageFrame" ; } else if (packet.ValidateAsType <AudioFrame>().ok ()) { LOG (INFO) << "Packet is AudioFrame" ; } else { LOG (WARNING) << "Unknown packet type" ; } return absl::OkStatus (); }
十六、总结
方式
优点
缺点
适用场景
Tag
可读性好、易维护
需要定义 Tag
推荐使用
Index
简洁、无需 Tag
可读性差、顺序敏感
端口少且固定
混合
灵活
可能混淆
特殊需求
推荐: 优先使用 Tag 方式,保持代码清晰易维护。
下篇预告 MediaPipe 系列 09:Calculator Options——参数化配置
深入讲解 Calculator Options 定义、使用、Proto 文件编写。
参考资料
Google AI Edge. MediaPipe Calculator Contract
Google AI Edge. Input/Output Streams
系列进度: 8/55更新时间: 2026-03-12