前言:为什么需要理解 Graph 和 Calculator?
MediaPipe 的本质:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| ┌─────────────────────────────────────────────────────────────┐ │ MediaPipe 核心抽象 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ Graph:感知流水线架构 │ │ │ │ │ │ │ │ 定义数据如何流动、如何处理、如何输出 │ │ │ │ │ │ │ │ ┌───────────────────────────────────┐ │ │ │ │ │ │ │ │ │ │ │ node { │ │ │ │ │ │ calculator: "FaceMesh" │ │ │ │ │ │ input_stream: "video" │ │ │ │ │ │ output_stream: "landmarks" │ │ │ │ │ │ } │ │ │ │ │ │ │ │ │ │ │ └───────────────────────────────────┘ │ │ │ │ │ │ │ │ ┌───────────────────────────────────┐ │ │ │ │ │ node { │ │ │ │ │ │ calculator: "GazeEst" │ │ │ │ │ │ input_stream: "landmarks" │ │ │ │ │ │ output_stream: "gaze" │ │ │ │ │ │ } │ │ │ │ │ └───────────────────────────────────┘ │ │ │ │ │ │ │ └─────────────────────────────────────────────┘ │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ Calculator:计算单元 │ │ │ │ │ │ │ │ • 输入:Packet 时间序列 │ │ │ │ • 处理:模型推理、数据转换 │ │ │ │ • 输出:Packet 时间序列 │ │ │ │ • 生命周期:Open → Process → Close │ │ │ │ │ │ │ └─────────────────────────────────────────────┘ │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ Stream:数据流通道 │ │ │ │ │ │ │ │ • Packet 的有序序列 │ │ │ │ • 自动时间同步 │ │ │ │ • 支持背压(Backpressure) │ │ │ │ │ │ │ └─────────────────────────────────────────────┘ │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ Packet:数据包 │ │ │ │ │ │ │ │ • 携带时间戳 │ │ │ │ • 任意类型(图像、矩阵、结构体) │ │ │ │ • 零拷贝传递 │ │ │ │ │ │ │ └─────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘
|
2.2 Graph 与 Calculator 的关系
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| ┌─────────────────────────────────────────────────────────────┐ │ Graph 与 Calculator 的关系 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Graph = 配置文件(pbtxt)+ 执行引擎 │ │ Calculator = C++ 代码实现 │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ Graph(配置层) │ │ │ │ │ │ │ │ input_stream: "video" │ │ │ │ node { │ │ │ │ calculator: "FaceMesh" │ │ │ │ input_stream: "video" │ │ │ │ output_stream: "landmarks" │ │ │ │ } │ │ │ │ output_stream: "landmarks" │ │ │ │ │ │ │ └─────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────┐ │ │ │ Calculator(实现层) │ │ │ │ │ │ │ │ class FaceMeshCalculator { │ │ │ │ GetContract() │ │ Open() │ │ Process() │ │ Close() │ │ }; │ │ │ │ │ │ │ └─────────────────────────────────────────────┘ │ │ │ │ 编译时:Graph 配置 → Calculator 实现 │ │ 运行时:Calculator 实例 → 处理数据 │ │ │ └─────────────────────────────────────────────────────────────┘
|
三、Graph:感知流水线配置
3.1 Graph 概念
Graph 是 MediaPipe 的核心架构概念:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| ┌─────────────────────────────────────────────────────────────┐ │ Graph:有向无环图 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Input ──▶ Calculator A ──▶ Calculator B ──▶ Output │ │ │ │ │ └────▶ Calculator C ──▶ Output │ │ │ │ │ └────▶ Calculator D ──▶ Calculator E ──▶ Output │ │ │ 特征: │ │ 1. 有向:数据单向流动 │ │ 2. 无环:不会循环引用 │ │ 3. 模块化:每个 Calculator 是独立单元 │ │ 4. 可组合:任意复杂度 │ │ │ └─────────────────────────────────────────────────────────────┘
|
3.2 pbtxt 语法详解
Graph 配置文件使用 Protobuf 文本格式(.pbtxt):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| # ========== 基本语法 ========== # 注释:# 或 # 空行:忽略 # 缩进:2 空格或 4 空格
# ========== 输入流定义 ========== input_stream: "input_video" input_stream: "input_audio" input_stream: "input_sensor"
# ========== 输出流定义 ========== output_stream: "output_video" output_stream: "output_detections" output_stream: "output_audio"
# ========== 静态输入(Side Packet)========== input_side_packet: "model_path" input_side_packet: "config_file" input_side_packet: "lookup_table"
# ========== Calculator 节点 ========== node { calculator: "FlowLimiterCalculator" input_stream: "input_video" input_stream: "detections" output_stream: "throttled_video" options { [mediapipe.FlowLimiterCalculatorOptions.ext] { max_in_flight: 1 max_in_queue: 1 } } }
# ========== 线程池配置(可选)========== executor { name: "gpu_executor" type: "ThreadPool" num_threads: 4 }
# ========== 延迟配置(可选)========== delay_input_packets { seconds: 0.1 }
|
3.3 pbtxt 语法示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| # ========== 输入输出 ========== input_stream: "IMAGE:input_frame" # 格式:input_stream: "TAG:name" input_stream: "DETECTIONS:faces" output_stream: "IMAGE:output_frame" output_stream: "DETECTIONS:faces"
# ========== Calculator 节点 ========== node { calculator: "FaceDetectionCalculator" input_stream: "IMAGE:input_frame" input_stream: "DETECTIONS:faces" output_stream: "IMAGE:output_frame" output_stream: "DETECTIONS:faces" options { [mediapipe.FaceDetectionOptions.ext] { model_path: "/models/face_detection.tflite" confidence_threshold: 0.7 max_num_detections: 10 } } }
# ========== Stream 标签 ========== # Tag 用于区分多个输入输出 node { calculator: "MultiInputCalculator" input_stream: "IMAGE:video" # Tag: IMAGE input_stream: "AUDIO:audio" # Tag: AUDIO input_stream: "SENSOR:accel" # Tag: SENSOR output_stream: "OUTPUT:combined" # Tag: OUTPUT }
# ========== Options 扩展 ========== # Calculator 可以扩展 CalculatorOptions node { calculator: "MyCalculator" input_stream: "input" output_stream: "output" options { [mediapipe.MyCalculatorOptions.ext] { param1: "value1" param2: 123 param3: true repeated_param: "a" repeated_param: "b" } } }
|
3.4 常见 Graph 结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| # ========== 简单流水线 ========== input_stream: "input" output_stream: "output"
node { calculator: "PassThroughCalculator" input_stream: "input" output_stream: "output" }
# ========== 条件分支 ========== input_stream: "input" output_stream: "output"
node { calculator: "GateCalculator" input_stream: "input" output_stream: "output" options { [mediapipe.GateCalculatorOptions.ext] { condition: "input.size() > 10" } } }
# ========== 多路聚合 ========== input_stream: "input_a" input_stream: "input_b" input_stream: "input_c" output_stream: "output"
node { calculator: "MergeCalculator" input_stream: "input_a" input_stream: "input_b" input_stream: "input_c" output_stream: "output" }
# ========== 时序处理 ========== input_stream: "input" output_stream: "output"
node { calculator: "SlidingWindowCalculator" input_stream: "input" output_stream: "window_output" options { [mediapipe.SlidingWindowOptions.ext] { window_size: 30 # 30 帧窗口 step_size: 1 # 每帧步进 } } }
|
四、Calculator:计算节点详解
4.1 Calculator 生命周期
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| ┌─────────────────────────────────────────────────────────────┐ │ Calculator 生命周期 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Graph 启动 │ │ │ │ │ ▼ │ │ ┌─────────────┐ │ │ │ GetContract │ ← 只调用一次(静态验证) │ │ │ │ 定义输入输出端口类型、数量 │ │ │ 返回值: │ - cc->Inputs().Tag("NAME").Set<T>() │ │ │ absl::OkStatus│ - cc->Outputs().Tag("NAME").Set<T>() │ │ └─────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────┐ │ │ │ Open │ ← 每个实例调用一次(初始化) │ │ │ │ - 读取 Options │ │ │ 返回值: │ - 分配内存、加载模型 │ │ │ absl::OkStatus│ - 初始化资源 │ │ └─────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────┐ │ │ │ Process │ ← 每个输入 Packet 调用一次(处理) │ │ │ │ - 检查输入是否可用 │ │ │ 返回值: │ - 获取输入数据 │ │ │ absl::OkStatus│ - 执行计算逻辑 │ │ │ │ - 输出结果 │ │ │ 调用次数: │ - cc->Inputs().Tag(...).Get<T>() │ │ │ N 次 │ - cc->Outputs().Tag(...).AddPacket(...) │ │ └─────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────┐ │ │ │ Close │ ← Graph 关闭时调用一次(清理) │ │ │ │ - 释放模型、内存 │ │ │ 返回值: │ - 关闭文件、连接 │ │ │ absl::OkStatus│ - 清理资源 │ │ └─────────────┘ │ │ │ │ │ ▼ │ │ Graph 关闭 │ │ │ └─────────────────────────────────────────────────────────────┘
|
4.2 Calculator 接口详解
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| class CalculatorBase { public: virtual ~CalculatorBase() = default;
static absl::Status GetContract(CalculatorContract* cc); virtual absl::Status Open(CalculatorContext* cc); virtual absl::Status Process(CalculatorContext* cc) = 0; virtual absl::Status Close(CalculatorContext* cc); };
|
4.3 Calculator 开发完整示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
| #ifndef MEDIAPIPE_CALCULATORS_MY_MY_CALCULATOR_H_ #define MEDIAPIPE_CALCULATORS_MY_MY_CALCULATOR_H_
#include "mediapipe/framework/calculator_framework.h" #include "mediapipe/calculators/my/my_calculator_options.pb.h"
namespace mediapipe {
class MyCalculator : public CalculatorBase { public: static absl::Status GetContract(CalculatorContract* cc) { cc->Inputs().Tag("INPUT").Set<cv::Mat>(); cc->Inputs().Tag("CONFIG").Set<std::string>(); cc->Outputs().Tag("OUTPUT").Set<std::vector<Detection>>(); cc->Outputs().Tag("METRICS").Set<MyMetrics>(); cc->Options<MyCalculatorOptions>(); return absl::OkStatus(); }
absl::Status Open(CalculatorContext* cc) override { const auto& options = cc->Options<MyCalculatorOptions>(); threshold_ = options.threshold(); max_detections_ = options.max_detections(); enable_logging_ = options.enable_logging(); detections_buffer_.resize(max_detections_); cv::namedWindow("Debug Window", cv::WINDOW_NORMAL); LOG(INFO) << "MyCalculator initialized: threshold=" << threshold_ << ", max_detections=" << max_detections_; return absl::OkStatus(); }
absl::Status Process(CalculatorContext* cc) override { if (cc->Inputs().Tag("INPUT").IsEmpty()) { return absl::OkStatus(); } const cv::Mat& input = cc->Inputs().Tag("INPUT").Get<cv::Mat>(); if (input.empty()) { LOG(WARNING) << "Empty input frame received"; return absl::OkStatus(); } auto start_time = std::chrono::high_resolution_clock::now(); std::vector<Detection> detections = DetectObjects(input); auto end_time = std::chrono::high_resolution_clock::now(); auto duration = std::chrono::duration_cast<std::chrono::milliseconds>( end_time - start_time).count(); cc->Outputs().Tag("OUTPUT").AddPacket( MakePacket<std::vector<Detection>>(detections) .At(cc->InputTimestamp())); MyMetrics metrics; metrics.process_time_ms = duration; metrics.detection_count = detections.size(); metrics.input_size = input.size(); cc->Outputs().Tag("METRICS").AddPacket( MakePacket<MyMetrics>(metrics).At(cc->InputTimestamp())); if (enable_logging_) { LOG(INFO) << "Processed frame at timestamp=" << cc->InputTimestamp() << ", detections=" << detections.size() << ", time=" << duration << "ms"; } process_count_++; return absl::OkStatus(); }
absl::Status Close(CalculatorContext* cc) override { cv::destroyWindow("Debug Window"); LOG(INFO) << "MyCalculator closed after processing " << process_count_ << " frames"; return absl::OkStatus(); }
private: float threshold_ = 0.5f; int max_detections_ = 10; bool enable_logging_ = false; int process_count_ = 0; std::vector<Detection> detections_buffer_; std::vector<Detection> DetectObjects(const cv::Mat& frame) { std::vector<Detection> detections; return detections; } };
REGISTER_CALCULATOR(MyCalculator);
}
#endif
|
五、Stream:数据流机制
5.1 Stream 概念
Stream 是连接 Calculator 的数据通道:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| ┌─────────────────────────────────────────────────────────────┐ │ Stream:数据流 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Calculator A ──┬── Stream ──▶ Calculator B │ │ │ │ │ ├── Stream ──▶ Calculator C │ │ │ │ │ └── Stream ──▶ Calculator D │ │ │ │ 特征: │ │ 1. 有序:Packet 按时间戳顺序排列 │ │ 2. 同步:自动时间戳对齐 │ │ 3. 背压:处理速度慢时自动阻塞输入 │ │ 4. 零拷贝:共享数据所有权 │ │ │ └─────────────────────────────────────────────────────────────┘
|
5.2 Stream 标签(Tag)详解
使用 Tag 区分多个输入输出:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| # ========== 多输入示例 ========== node { calculator: "MultiInputCalculator" input_stream: "IMAGE:video" # Tag: IMAGE input_stream: "AUDIO:audio" # Tag: AUDIO input_stream: "SENSOR:accel" # Tag: SENSOR input_stream: "SENSOR:gyro" # Tag: GYRO output_stream: "OUTPUT:combined" # Tag: OUTPUT output_stream: "OUTPUT:separate" # Tag: SEPARATE }
# ========== 多输出示例 ========== node { calculator: "MultiOutputCalculator" input_stream: "input" output_stream: "DETECTIONS:faces" # Tag: DETECTIONS output_stream: "LANDMARKS:landmarks" # Tag: LANDMARKS output_stream: "POSE:pose" # Tag: POSE }
|
5.3 C++ 中访问 Stream
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
const cv::Mat& image = cc->Inputs().Tag("IMAGE").Get<cv::Mat>();
const cv::Mat& image = cc->Inputs().Index(0).Get<cv::Mat>();
if (cc->Inputs().Tag("IMAGE").IsEmpty()) { return absl::OkStatus(); }
cc->Outputs().Tag("OUTPUT").AddPacket( MakePacket<std::vector<Detection>>(detections) .At(cc->InputTimestamp()));
cc->Outputs().Index(0).AddPacket( MakePacket<std::vector<Detection>>(detections) .At(cc->InputTimestamp()));
cc->Outputs().Tag("DETECTIONS").AddPacket(detection_packet); cc->Outputs().Tag("CONFIDENCES").AddPacket(confidence_packet); cc->Outputs().Tag("BBOXES").AddPacket(bbox_packet);
|
5.4 Stream 传递示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| class MultiInputCalculator : public CalculatorBase { public: static absl::Status GetContract(CalculatorContract* cc) { cc->Inputs().Tag("IMAGE").Set<cv::Mat>(); cc->Inputs().Tag("AUDIO").Set<std::vector<float>>(); cc->Inputs().Tag("SENSOR").Set<SensorData>(); cc->Outputs().Tag("OUTPUT_A").Set<std::string>(); cc->Outputs().Tag("OUTPUT_B").Set<int>(); return absl::OkStatus(); }
absl::Status Process(CalculatorContext* cc) override { if (!cc->Inputs().Tag("IMAGE").IsEmpty()) { const cv::Mat& image = cc->Inputs().Tag("IMAGE").Get<cv::Mat>(); } if (!cc->Inputs().Tag("AUDIO").IsEmpty()) { const auto& audio = cc->Inputs().Tag("AUDIO").Get<std::vector<float>>(); } cc->Outputs().Tag("OUTPUT_A").AddPacket( MakePacket<std::string>("result").At(cc->InputTimestamp())); cc->Outputs().Tag("OUTPUT_B").AddPacket( MakePacket<int>(42).At(cc->InputTimestamp())); return absl::OkStatus(); } };
REGISTER_CALCULATOR(MultiInputCalculator);
|
六、Packet:数据包详解
6.1 Packet 概念
Packet 是 Stream 传输的数据单元:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| ┌─────────────────────────────────────────────────────────────┐ │ Packet:数据包 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ Packet 结构 │ │ │ │ │ │ │ │ ┌───────────────────────────────────┐ │ │ │ │ │ Payload (数据负载) │ │ │ │ │ │ • 任意 C++ 类型 │ │ │ │ │ │ • std::shared_ptr 共享所有权 │ │ │ │ │ │ • 零拷贝传递 │ │ │ │ │ └───────────────────────────────────┘ │ │ │ │ │ │ │ │ ┌───────────────────────────────────┐ │ │ │ │ │ Timestamp (时间戳) │ │ │ │ │ │ • 微秒精度(int64_t) │ │ │ │ │ │ • 单调递增 │ │ │ │ │ │ • 用于时间同步 │ │ │ │ │ └───────────────────────────────────┘ │ │ │ │ │ │ │ └─────────────────────────────────────────────┘ │ │ │ │ 示例: │ │ ┌─────────────────────────────────────────────┐ │ │ │ Packet<std::string> │ │ │ │ • payload: "hello" │ │ │ │ • timestamp: 1000000 (1 秒) │ │ │ └─────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘
|
6.2 Packet 创建
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| auto packet1 = MakePacket<std::string>("hello").At(Timestamp(1000000));
std::string value = "world"; auto packet2 = MakePacket<std::string>(value).At(cc->InputTimestamp());
auto packet3 = Adopt(new MyData()).At(Timestamp(0));
auto empty_packet = MakePacket<int>().At(Timestamp(0));
|
6.3 Packet 访问
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
const std::string& data = packet.Get<std::string>();
if (packet.ValidateAsType<std::string>().ok()) { const std::string& data = packet.Get<std::string>(); }
if (!packet.IsEmpty()) { }
Timestamp ts = packet.Timestamp(); int64_t us = ts.Value();
Timestamp ts1 = Timestamp(1000000); Timestamp ts2 = Timestamp(2000000);
if (ts1.IsEarlierThan(ts2)) { LOG(INFO) << "ts1 < ts2"; }
Timestamp ts3 = ts1.Plus(500000); Timestamp ts4 = ts2.Minus(1000000);
|
6.4 Packet 常见操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
cc->Outputs().Tag("DETECTIONS").AddPacket( MakePacket<std::vector<BoundingBox>>(bbox_list) .At(cc->InputTimestamp()));
cc->Outputs().Tag("IMAGE").AddPacket( MakePacket<cv::Mat>(image).At(cc->InputTimestamp()));
cc->Outputs().Tag("METRICS").AddPacket( MakePacket<PerformanceMetrics>(metrics) .At(cc->InputTimestamp()));
cc->Outputs().Tag("DETECTIONS").AddPacket(detection_packet); cc->Outputs().Tag("CONFIDENCES").AddPacket(confidence_packet); cc->Outputs().Tag("BBOXES").AddPacket(bbox_packet);
for (const auto& detection : detections) { cc->Outputs().Tag("DETECTIONS").AddPacket( MakePacket<Detection>(detection).At(cc->InputTimestamp())); }
cc->Outputs().Tag("OUTPUT_A").AddPacket(packet1); cc->Outputs().Tag("OUTPUT_B").AddPacket(packet2);
|
七、Graph 执行流程
7.1 完整执行流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| ┌─────────────────────────────────────────────────────────────┐ │ Graph 执行流程 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 1. Graph 初始化 │ │ └─ 解析 pbtxt 配置文件 │ │ └─ 创建 Calculator 实例 │ │ └─ 调用 GetContract() 验证端口 │ │ │ │ 2. Graph 启动 │ │ └─ 调用 Open() 初始化每个 Calculator │ │ └─ 启动 Executor 线程池 │ │ └─ 准备接收输入 Packet │ │ │ │ 3. 接收输入 Packet │ │ └─ AddPacketToInputStream("input", packet) │ │ └─ MediaPipe 自动路由到对应 Calculator │ │ │ │ 4. Calculator 处理 │ │ └─ 调用 Process() │ │ └─ 获取输入 Packet │ │ └─ 执行计算逻辑 │ │ └─ 输出结果 Packet │ │ └─ MediaPipe 自动路由到下游 Calculator │ │ └─ 重复步骤 3-4 │ │ │ │ 5. 关闭输入 Stream │ │ └─ CloseInputStream("input") │ │ └─ 等待所有 Calculator 处理完成 │ │ │ │ 6. 关闭 Graph │ │ └─ 调用 Close() 清理资源 │ │ └─ 释放所有 Calculator 实例 │ │ └─ 停止 Executor 线程池 │ │ │ └─────────────────────────────────────────────────────────────┘
|
7.2 C++ 运行 Graph 完整示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
| #include "mediapipe/framework/calculator_framework.h" #include "mediapipe/framework/port/parse_text_proto.h" #include "mediapipe/framework/port/status.h" #include <opencv2/opencv.hpp> #include <iostream>
message MyCalculatorOptions { extend mediapipe.CalculatorOptions { optional MyCalculatorOptions ext = 1000000; } optional float threshold = 1 [default = 0.5]; }
input_stream: "input" output_stream: "output"
node { calculator: "PassThroughCalculator" input_stream: "input" output_stream: "output" }
int main() { mediapipe::CalculatorGraphConfig config = mediapipe::ParseTextProtoOrDie<mediapipe::CalculatorGraphConfig>(R"( input_stream: "input" output_stream: "output" node { calculator: "PassThroughCalculator" input_stream: "input" output_stream: "output" } )");
mediapipe::CalculatorGraph graph; MP_RETURN_IF_ERROR(graph.Initialize(config));
MP_RETURN_IF_ERROR(graph.ObserveOutputStream( "output", [](const mediapipe::Packet& packet) { LOG(INFO) << "Received output at timestamp: " << packet.Timestamp(); return absl::OkStatus(); }));
MP_RETURN_IF_ERROR(graph.StartRun({}));
for (int i = 0; i < 10; ++i) { auto packet = MakePacket<int>(i * 10).At(mediapipe::Timestamp(i * 1000000)); MP_RETURN_IF_ERROR(graph.AddPacketToInputStream( "input", packet));
}
MP_RETURN_IF_ERROR(graph.CloseInputStream("input"));
MP_RETURN_IF_ERROR(graph.WaitUntilDone());
LOG(INFO) << "Graph execution completed successfully";
return 0; }
|
八、常见错误与调试
8.1 输入为空错误
1 2 3 4 5 6 7 8 9
| const cv::Mat& image = cc->Inputs().Tag("IMAGE").Get<cv::Mat>();
if (cc->Inputs().Tag("IMAGE").IsEmpty()) { LOG(WARNING) << "Input is empty, skipping"; return absl::OkStatus(); } const cv::Mat& image = cc->Inputs().Tag("IMAGE").Get<cv::Mat>();
|
8.2 时间戳错误
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| MP_RETURN_IF_ERROR(graph.AddPacketToInputStream( "input", MakePacket<int>(1).At(Timestamp(1000000))));
MP_RETURN_IF_ERROR(graph.AddPacketToInputStream( "input", MakePacket<int>(2).At(Timestamp(500000))));
MP_RETURN_IF_ERROR(graph.AddPacketToInputStream( "input", MakePacket<int>(1).At(Timestamp(1000000))));
MP_RETURN_IF_ERROR(graph.AddPacketToInputStream( "input", MakePacket<int>(2).At(Timestamp(2000000))));
|
8.3 忘记注册 Calculator
1 2 3 4 5 6 7 8 9 10 11
| class MyCalculator : public CalculatorBase { };
class MyCalculator : public CalculatorBase { };
REGISTER_CALCULATOR(MyCalculator);
|
8.4 Options 类型错误
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| node { calculator: "MyCalculator" input_stream: "input" output_stream: "output" options { [mediapipe.MyCalculatorOptions.ext] { threshold: 0.5 } } }
message MyCalculatorOptions { extend CalculatorOptions { optional MyCalculatorOptions ext = 1000000; } required float threshold = 1; optional string name = 2; }
|
8.5 调试技巧
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| LOG(INFO) << "Processing frame at timestamp: " << cc->InputTimestamp(); LOG(WARNING) << "Empty input detected"; LOG(ERROR) << "Calculation failed";
cv::imshow("Debug", image); cv::waitKey(1);
static int process_count = 0; process_count++; LOG(INFO) << "Processed " << process_count << " frames";
auto start = std::chrono::high_resolution_clock::now();
auto end = std::chrono::high_resolution_clock::now(); auto duration = std::chrono::duration_cast<std::chrono::microseconds>( end - start).count(); LOG(INFO) << "Process time: " << duration << "us";
|
九、实战:完整 Graph 示例
9.1 pbtxt 配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
| # dms_graph.pbtxt # IMS DMS 完整流水线配置
# ========== 输入输出 ========== input_stream: "IR_IMAGE:ir_frame" input_stream: "VEHICLE_SPEED:speed" input_stream: "TIMESTAMP:timestamp" output_stream: "DMS_RESULT:dms_output" output_stream: "ALERT:alert"
# ========== 1. 人脸检测 ========== node { calculator: "FaceDetectionCalculator" input_stream: "IMAGE:ir_frame" output_stream: "FACE_DETECTIONS:faces" options { [mediapipe.FaceDetectionOptions.ext] { model_path: "/models/face_detection.tflite" confidence_threshold: 0.7 max_num_detections: 10 } } }
# ========== 2. 人脸关键点 ========== node { calculator: "FaceMeshCalculator" input_stream: "IMAGE:ir_frame" input_stream: "FACE_DETECTIONS:faces" output_stream: "FACE_LANDMARKS:landmarks" options { [mediapipe.FaceMeshOptions.ext] { model_path: "/models/face_landmark.tflite" num_faces: 1 enable_iris: true } } }
# ========== 3. EAR 计算 ========== node { calculator: "ARCalculator" input_stream: "FACE_LANDMARKS:landmarks" output_stream: "EAR:left_ear" output_stream: "EAR:right_ear" output_stream: "EAR:avg_ear" options { [mediapipe.ARCalculatorOptions.ext] { ear_threshold: 0.2 } } }
# ========== 4. PERCLOS 计算 ========== node { calculator: "PERCLOSCalculator" input_stream: "EAR:avg_ear" input_stream: "TIMESTAMP:timestamp" output_stream: "PERCLOS:perclos" output_stream: "PERCLOS_WINDOW:window_stats" options { [mediapipe.PERCLOSOptions.ext] { window_seconds: 30 ear_threshold: 0.2 } } }
# ========== 5. 头部姿态估计 ========== node { calculator: "HeadPoseCalculator" input_stream: "FACE_LANDMARKS:landmarks" output_stream: "HEAD_POSE:head_pose" options { [mediapipe.HeadPoseOptions.ext] { model_path: "/models/head_pose.tflite" } } }
# ========== 6. 疲劳评分计算 ========== node { calculator: "FatigueScoreCalculator" input_stream: "EAR:avg_ear" input_stream: "PERCLOS:perclos" input_stream: "HEAD_POSE:head_pose" input_stream: "VEHICLE_SPEED:speed" output_stream: "FATIGUE_SCORE:fatigue_score" output_stream: "FATIGUE_LEVEL:fatigue_level" options { [mediapipe.FatigueOptions.ext] { perclos_weight: 0.5 ear_weight: 0.3 head_pose_weight: 0.2 perclos_low: 15.0 perclos_high: 30.0 ear_low: 0.2 ear_high: 0.25 speed_factor_enabled: true speed_threshold: 80.0 } } }
# ========== 7. 告警触发 ========== node { calculator: "FatigueAlertCalculator" input_stream: "FATIGUE_LEVEL:fatigue_level" input_stream: "TIMESTAMP:timestamp" output_stream: "ALERT:alert" options { [mediapipe.FatigueAlertOptions.ext] { level_1_threshold: 1 level_2_threshold: 2 level_3_threshold: 3 alert_cooldown_seconds: 30 } } }
# ========== 线程池配置 ========== executor { name: "cpu_executor" type: "ThreadPool" num_threads: 4 }
|
十、总结
| 概念 |
说明 |
关键点 |
| Graph |
感知流水线 |
pbtxt 配置、DAG 结构 |
| Calculator |
计算节点 |
Open/Process/Close 生命周期 |
| Stream |
数据流 |
Packet 序列、时间戳同步 |
| Packet |
数据包 |
Timestamp + Payload、零拷贝 |
下篇预告
MediaPipe 系列 03:Packet 与 Stream——时间序列数据的流动
深入讲解 Packet 传递机制、时间戳对齐、背压控制。
参考资料
- Google AI Edge. MediaPipe Framework Documentation
- Lugaresi et al. (2019). MediaPipe: A Framework for Building Perception Pipelines. arXiv:1906.08172
- LearnOpenCV. MediaPipe: The Ultimate Guide to Video Processing
系列进度: 2/55
更新时间: 2026-03-12