MediaPipe 系列 04:Side Packet——静态配置数据的注入完整指南

前言:为什么需要 Side Packet?

4.1 IMS 系统的配置管理挑战

IMS DMS 系统需要管理大量配置参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
┌─────────────────────────────────────────────────────────────────────────┐
│ IMS DMS 配置管理挑战 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 问题:如何传递静态配置数据到所有 Calculator? │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 需要传递的配置: │ │
│ │ │ │
│ │ • 模型路径 │ │
│ │ - face_detection.tflite │ │
│ │ - face_landmark.tflite │ │
│ │ - iris_detection.tflite │ │
│ │ │ │
│ │ • 算法参数 │ │
│ │ - EAR 阈值: 0.2 │ │
│ │ - PERCLOS 窗口: 30 秒 │ │
│ │ - 头部姿态阈值: 30° │ │
│ │ │ │
│ │ • 平台配置 │ │
│ │ - GPU 后端: true/false │ │
│ │ - 线程数: 4 │ │
│ │ - 精度: FP16/INT8 │ │
│ │ │ │
│ │ • 系统状态 │ │
│ │ - 车速: 60 km/h │ │
│ │ - 转向角度: 0° │ │
│ │ - CAN 信号 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ 方案对比: │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Stream 方式(不推荐) │ │
│ │ ──P1──P2──P3──P4──▶ │ │
│ │ ↓ ↓ ↓ ↓ │ │
│ │ 每帧都要传递配置,浪费资源 │ │
│ │ │ │
│ │ Side Packet 方式(推荐) │ │
│ │ ══════════════════════════════════════════════════════▶│ │
│ │ ↓ │ │
│ │ 只在 Graph 启动时传递一次,所有 Calculator 共享 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘

4.2 Side Packet 的核心价值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
┌─────────────────────────────────────────────────────────────┐
│ Side Packet 核心价值 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Stream:时序数据流 │ │
│ │ │ │
│ │ 特征: │ │
│ │ • 每帧都有数据 │ │
│ │ • 时间戳递增 │ │
│ │ • 数据可能变化 │ │
│ │ • 需要同步多个 Stream │ │
│ │ │ │
│ │ 用途: │ │
│ │ • 视频帧 │ │
│ │ • 传感器数据 │ │
│ │ • 检测结果 │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Side Packet:静态配置 │ │
│ │ │ │
│ │ 特征: │ │
│ │ • Graph 生命周期内不变 │ │
│ │ • 无时间戳 │ │
│ │ • 单个值 │ │
│ │ • 可被多个 Calculator 访问 │ │
│ │ │ │
│ │ 用途: │ │
│ │ • 模型路径 │ │
│ │ • 算法参数 │ │
│ │ • 平台配置 │ │
│ │ • 系统状态 │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

五、Side Packet 概念详解

5.1 什么是 Side Packet?

Side Packet 是 MediaPipe 中用于传递静态配置数据的机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
┌─────────────────────────────────────────────────────────────┐
│ Side Packet 核心概念 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 定义: │
│ Side Packet 是 Graph 启动时传入的静态数据,在整个 Graph
│ 生命周期内保持不变。 │
│ │
│ vs Stream: │
│ ┌─────────────────────────────────────────────┐ │
│ │ Stream(数据流) │ │
│ │ • 每帧都有数据 │ │
│ │ • 时间戳递增 │ │
│ │ • 数据可能变化 │ │
│ │ • 需要时间同步 │ │
│ │ │ │
│ │ Side Packet(配置) │ │
│ │ • Graph 生命周期内不变 │ │
│ │ • 无时间戳 │ │
│ │ • 单个值 │ │
│ │ • 可被多个 Calculator 访问 │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

5.2 Side Packet vs Stream 完整对比

维度 Stream Side Packet
数据性质 时序数据 静态配置
时间戳 必须有
更新频率 每帧 Graph 生命周期内不变
数据类型 任意类型 任意类型
数据量 可能很大 通常较小
同步要求 多流需要同步 无需同步
生命周期 流的生命周期 Graph 的生命周期
访问次数 每帧都访问 只在 Open 时访问
典型用途 视频帧、检测结果 模型路径、算法参数

5.3 Side Packet 的典型用途

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
┌─────────────────────────────────────────────────────────────┐
│ Side Packet 典型用途 │
├─────────────────────────────────────────────────────────────┤
│ │
1. 模型路径管理 │
│ ┌─────────────────────────────────────────────┐ │
│ │ face_detection_model = "/models/face.tflite"│ │
│ │ landmark_model = "/models/landmark.tflite" │ │
│ │ iris_model = "/models/iris.tflite" │ │
│ └─────────────────────────────────────────────┘ │
│ │
2. 算法参数配置 │
│ ┌─────────────────────────────────────────────┐ │
│ │ ear_threshold = 0.2 │ │
│ │ perclos_window = 30秒 │ │
│ │ head_pose_threshold = 30° │ │
│ │ nms_threshold = 0.45 │ │
│ └─────────────────────────────────────────────┘ │
│ │
3. 平台配置 │
│ ┌─────────────────────────────────────────────┐ │
│ │ use_gpu = true │ │
│ │ num_threads = 4 │ │
│ │ precision = "FP16" │ │
│ │ backend = "QNN" │ │
│ └─────────────────────────────────────────────┘ │
│ │
4. 系统状态输入 │
│ ┌─────────────────────────────────────────────┐ │
│ │ vehicle_speed = 60.0 │ │
│ │ steering_angle = 0.0 │ │
│ │ brake_pressure = 0.0 │ │
│ │ can_signal = {...} │ │
│ └─────────────────────────────────────────────┘ │
│ │
5. 资源共享 │
│ ┌─────────────────────────────────────────────┐ │
│ │ gpu_context = shared_ptr<GPUContext> │ │
│ │ model_cache = shared_ptr<ModelCache> │ │
│ │ logger = shared_ptr<Logger> │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

六、Side Packet 定义与使用

6.1 Graph 配置(pbtxt)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# ========== 定义 Side Packet 输入 ==========
input_side_packet: "MODEL_PATH:model_path"
input_side_packet: "CONFIG:config"
input_side_packet: "THRESHOLD:threshold"
input_side_packet: "NUM_THREADS:num_threads"

# ========== Calculator 使用 Side Packet ==========
node {
calculator: "FaceDetectionCalculator"
input_stream: "IMAGE:image"
input_side_packet: "MODEL_PATH:model_path"
input_side_packet: "THRESHOLD:threshold"
output_stream: "DETECTIONS:detections"
}

node {
calculator: "FilterCalculator"
input_stream: "DETECTIONS:detections"
input_side_packet: "THRESHOLD:threshold"
output_stream: "FILTERED:filtered"
}

node {
calculator: "PostProcessCalculator"
input_stream: "FILTERED:filtered"
input_side_packet: "NUM_THREADS:num_threads"
output_stream: "OUTPUT:result"
}

6.2 Calculator 接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// ========== Calculator 头文件 ==========
#ifndef MY_CALCULATOR_H_
#define MY_CALCULATOR_H_

#include "mediapipe/framework/calculator_framework.h"

namespace mediapipe {

class MyCalculator : public CalculatorBase {
public:
// GetContract:定义输入输出和 Side Packet
static absl::Status GetContract(CalculatorContract* cc) {
// ========== Stream 定义 ==========
// 输入 Stream
cc->Inputs().Tag("IMAGE").Set<cv::Mat>();
cc->Inputs().Tag("CONFIG").Set<RuntimeConfig>();

// 输出 Stream
cc->Outputs().Tag("OUTPUT").Set<Result>();

// ========== Side Packet 定义 ==========
// 输入 Side Packet
cc->InputSidePackets().Tag("MODEL_PATH").Set<std::string>();
cc->InputSidePackets().Tag("THRESHOLD").Set<float>();
cc->InputSidePackets().Tag("NUM_THREADS").Set<int>();

// 输出 Side Packet(可选)
cc->OutputSidePackets().Tag("MODEL").Set<ModelData>();

return absl::OkStatus();
}

// Open:初始化,获取 Side Packet
absl::Status Open(CalculatorContext* cc) override {
// ========== 获取 Side Packet ==========
// 在 Open 中获取 Side Packet,每个 Calculator 实例调用一次
model_path_ = cc->InputSidePackets().Tag("MODEL_PATH").Get<std::string>();
threshold_ = cc->InputSidePackets().Tag("THRESHOLD").Get<float>();
num_threads_ = cc->InputSidePackets().Tag("NUM_THREADS").Get<int>();

// ========== 加载模型 ==========
// 使用 Side Packet 中的配置
MP_RETURN_IF_ERROR(LoadModel(model_path_, num_threads_));

// ========== 初始化资源 ==========
// 只在 Open 中初始化一次
logger_ = std::make_shared<Logger>("MyCalculator");

LOG(INFO) << "MyCalculator initialized: "
<< "model_path=" << model_path_
<< ", threshold=" << threshold_
<< ", num_threads=" << num_threads_;

return absl::OkStatus();
}

// Process:处理数据
absl::Status Process(CalculatorContext* cc) override {
// ========== 检查输入是否可用 ==========
if (cc->Inputs().Tag("IMAGE").IsEmpty()) {
return absl::OkStatus();
}

// ========== 获取输入数据 ==========
const cv::Mat& image = cc->Inputs().Tag("IMAGE").Get<cv::Mat>();

// ========== 使用 Side Packet 中的配置 ==========
// 配置在 Open 中已加载,Process 中直接使用
Result result = Inference(image, threshold_);

// ========== 输出结果 ==========
cc->Outputs().Tag("OUTPUT").AddPacket(
MakePacket<Result>(result).At(cc->InputTimestamp()));

return absl::OkStatus();
}

// Close:清理资源
absl::Status Close(CalculatorContext* cc) override {
// 释放模型
if (interpreter_) {
interpreter_->Reset();
}

LOG(INFO) << "MyCalculator closed";

return absl::OkStatus();
}

private:
// ========== 运行时配置 ==========
std::string model_path_;
float threshold_;
int num_threads_;

// ========== 资源 ==========
std::shared_ptr<tflite::Interpreter> interpreter_;
std::shared_ptr<Logger> logger_;

// ========== 方法 ==========
absl::Status LoadModel(const std::string& path, int threads);
Result Inference(const cv::Mat& image, float threshold);
};

// ========== 注册 Calculator ==========
REGISTER_CALCULATOR(MyCalculator);

} // namespace mediapipe

#endif // MY_CALCULATOR_H_

七、Side Packet 传递机制

7.1 外部传入 Side Packet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include "mediapipe/framework/calculator_framework.h"
#include <map>

int main() {
// ========== 1. 初始化 Graph ==========
mediapipe::CalculatorGraph graph;
MP_RETURN_IF_ERROR(graph.Initialize(config));

// ========== 2. 准备 Side Packet ==========
std::map<std::string, mediapipe::Packet> side_packets;

// 添加 Side Packet
side_packets["model_path"] = MakePacket<std::string>(
"/path/to/face_detection.tflite");

side_packets["threshold"] = MakePacket<float>(0.6f);

side_packets["num_threads"] = MakePacket<int>(4);

side_packets["gpu_context"] = MakePacket<std::shared_ptr<GPUContext>>(
gpu_context_);

side_packets["vehicle_speed"] = MakePacket<float>(60.0f);

// ========== 3. 启动 Graph 并传入 Side Packet ==========
MP_RETURN_IF_ERROR(graph.StartRun(side_packets));

// ========== 4. 发送 Stream 数据 ==========
cv::VideoCapture cap(0);
cv::Mat frame;
int frame_id = 0;

while (cap.read(frame)) {
MP_RETURN_IF_ERROR(graph.AddPacketToInputStream(
"image",
MakePacket<cv::Mat>(frame.clone()).At(Timestamp(frame_id++))));
}

// ========== 5. 关闭 Graph ==========
MP_RETURN_IF_ERROR(graph.CloseInputStream("image"));
MP_RETURN_IF_ERROR(graph.WaitUntilDone());

return 0;
}

7.2 Side Packet 命名规范

1
2
3
4
5
6
7
8
9
10
11
12
13
// ========== 命名规范 ==========
// Side Packet 名称通常使用下划线分隔
side_packets["model_path"] = MakePacket<std::string>("/models/face.tflite");
side_packets["detection_threshold"] = MakePacket<float>(0.6f);
side_packets["num_threads"] = MakePacket<int>(4);
side_packets["gpu_backend"] = MakePacket<std::string>("QNN");
side_packets["vehicle_speed"] = MakePacket<float>(60.0f);

// ========== Tag 命名规范 ==========
// 在 GetContract 中使用 Tag 命名
cc->InputSidePackets().Tag("MODEL_PATH").Set<std::string>();
cc->InputSidePackets().Tag("THRESHOLD").Set<float>();
cc->InputSidePackets().Tag("NUM_THREADS").Set<int>();

7.3 检查 Side Packet 是否存在

1
2
3
4
5
6
7
8
9
10
11
12
absl::Status Open(CalculatorContext* cc) override {
// 检查 Side Packet 是否存在
if (cc->InputSidePackets().HasTag("OPTIONAL_CONFIG")) {
auto config = cc->InputSidePackets().Tag("OPTIONAL_CONFIG").Get<Config>();
// 使用配置
} else {
// 使用默认值
auto config = GetDefaultConfig();
}

return absl::OkStatus();
}

八、Calculator 生成 Side Packet

8.1 输出 Side Packet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# ========== Calculator A 输出 Side Packet ==========
node {
calculator: "ModelLoaderCalculator"
output_side_packet: "MODEL:model"
output_side_packet: "MODEL_INFO:model_info"
}

# ========== Calculator B 使用 ==========
node {
calculator: "InferenceCalculator"
input_stream: "IMAGE:image"
input_side_packet: "MODEL:model"
input_side_packet: "MODEL_INFO:model_info"
output_stream: "OUTPUT:result"
}

8.2 实现 ModelLoader Calculator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// model_loader_calculator.h
#ifndef MODEL_LOADER_CALCULATOR_H_
#define MODEL_LOADER_CALCULATOR_H_

#include "mediapipe/framework/calculator_framework.h"
#include "tensorflow/lite/interpreter.h"

namespace mediapipe {

class ModelLoaderCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
// 输入:模型路径(从 Stream 或 Side Packet 传入)
cc->Inputs().Tag("MODEL_PATH").Set<std::string>();

// 输出:模型和模型信息(Side Packet)
cc->OutputSidePackets().Tag("MODEL").Set<ModelData>();
cc->OutputSidePackets().Tag("MODEL_INFO").Set<ModelInfo>();

return absl::OkStatus();
}

absl::Status Open(CalculatorContext* cc) override {
// 只执行一次
return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
// 获取模型路径
const std::string& model_path = cc->Inputs().Tag("MODEL_PATH").Get<std::string>();

// 加载模型
auto model_data = LoadModel(model_path);
auto model_info = GetModelInfo(model_path);

// 输出 Side Packet
cc->OutputSidePackets().Tag("MODEL").Set(
MakePacket<ModelData>(model_data));
cc->OutputSidePackets().Tag("MODEL_INFO").Set(
MakePacket<ModelInfo>(model_info));

// 只输出一次,后续帧跳过
return absl::Status(absl::StatusCode::kCancelled, "Model loaded");
}

private:
ModelData LoadModel(const std::string& path);
ModelInfo GetModelInfo(const std::string& path);
};

REGISTER_CALCULATOR(ModelLoaderCalculator);

} // namespace mediapipe

#endif // MODEL_LOADER_CALCULATOR_H_

九、Side Packet 应用场景详解

9.1 模型路径配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// ========== 场景:多个模型路径配置 ==========
int main() {
std::map<std::string, mediapipe::Packet> side_packets;

// 不同任务使用不同模型
side_packets["face_detection_model"] =
MakePacket<std::string>("/models/face_detection.tflite");

side_packets["face_landmark_model"] =
MakePacket<std::string>("/models/face_landmark.tflite");

side_packets["iris_detection_model"] =
MakePacket<std::string>("/models/iris_detection.tflite");

side_packets["hand_tracking_model"] =
MakePacket<std::string>("/models/hand_tracking.tflite");

MP_RETURN_IF_ERROR(graph.StartRun(side_packets));

// ...
}

9.2 运行时参数配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// ========== 场景:动态调整算法参数 ==========
int main() {
std::map<std::string, mediapipe::Packet> side_packets;

// 疲劳检测参数
side_packets["ear_threshold"] = MakePacket<float>(0.2f);
side_packets["perclos_window"] = MakePacket<int>(30);
side_packets["head_pose_threshold"] = MakePacket<float>(30.0f);

// 分心检测参数
side_packets["gaze_zone_threshold"] = MakePacket<float>(0.5f);
side_packets["look_away_duration"] = MakePacket<int>(5);

// 危险行为检测参数
side_packets["yawn_threshold"] = MakePacket<float>(0.8f);
side_packets["head_bob_threshold"] = MakePacket<float>(0.5f);

MP_RETURN_IF_ERROR(graph.StartRun(side_packets));

// ...
}

9.3 平台配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// ========== 场景:平台配置 ==========
int main() {
std::map<std::string, mediapipe::Packet> side_packets;

// GPU 配置
side_packets["use_gpu"] = MakePacket<bool>(true);
side_packets["gpu_backend"] = MakePacket<std::string>("QNN");
side_packets["gpu_device_id"] = MakePacket<int>(0);

// 线程配置
side_packets["num_threads"] = MakePacket<int>(4);
side_packets["thread_affinity"] = MakePacket<bool>(true);

// 精度配置
side_packets["precision"] = MakePacket<std::string>("FP16");
side_packets["quantization"] = MakePacket<bool>(true);

// 性能配置
side_packets["max_batch_size"] = MakePacket<int>(1);
side_packets["enable_profiling"] = MakePacket<bool>(false);

MP_RETURN_IF_ERROR(graph.StartRun(side_packets));

// ...
}

9.4 系统状态输入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// ========== 场景:CAN 信号和系统状态 ==========
int main() {
std::map<std::string, mediapipe::Packet> side_packets;

// 车辆状态
side_packets["vehicle_speed"] = MakePacket<float>(60.0f); // km/h
side_packets["steering_angle"] = MakePacket<float>(0.0f); // degrees
side_packets["brake_pressure"] = MakePacket<float>(0.0f); // kPa
side_packets["accelerator_position"] = MakePacket<float>(0.0f); // %

// 转向状态
side_packets["steering_direction"] = MakePacket<int>(0); // 0: straight, 1: left, 2: right
side_packets["lane_offset"] = MakePacket<float>(0.0f); // meters

// 车道线信息
side_packets["lane_left"] = MakePacket<cv::Mat>();
side_packets["lane_right"] = MakePacket<cv::Mat>();

MP_RETURN_IF_ERROR(graph.StartRun(side_packets));

// ...
}

十、实战:参数化推理 Calculator

10.1 完整实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
// configurable_inference_calculator.h
#ifndef CONFIGURABLE_INFERENCE_CALCULATOR_H_
#define CONFIGURABLE_INFERENCE_CALCULATOR_H_

#include "mediapipe/framework/calculator_framework.h"
#include "tensorflow/lite/interpreter.h"

namespace mediapipe {

// ========== 配置 Options ==========
message InferenceOptions {
optional float threshold = 1 [default = 0.5];
optional int32 num_threads = 2 [default = 4];
optional bool use_gpu = 2 [default = false];
optional string backend = 3 [default = "CPU"];
optional string precision = 4 [default = "FP32"];
}

// ========== 推理 Calculator ==========
class ConfigurableInferenceCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
// Stream
cc->Inputs().Tag("IMAGE").Set<cv::Mat>();
cc->Outputs().Tag("OUTPUT").Set<std::vector<Detection>>();

// Side Packet 配置
cc->InputSidePackets().Tag("MODEL_PATH").Set<std::string>();
cc->InputSidePackets().Tag("THRESHOLD").Set<float>();
cc->InputSidePackets().Tag("NUM_THREADS").Set<int>();
cc->InputSidePackets().Tag("USE_GPU").Set<bool>();
cc->InputSidePackets().Tag("BACKEND").Set<std::string>();

return absl::OkStatus();
}

absl::Status Open(CalculatorContext* cc) override {
// 获取配置
model_path_ = cc->InputSidePackets().Tag("MODEL_PATH").Get<std::string>();
threshold_ = cc->InputSidePackets().Tag("THRESHOLD").Get<float>();
num_threads_ = cc->InputSidePackets().Tag("NUM_THREADS").Get<int>();
use_gpu_ = cc->InputSidePackets().Tag("USE_GPU").Get<bool>();
backend_ = cc->InputSidePackets().Tag("BACKEND").Get<std::string>();

// 加载模型
MP_RETURN_IF_ERROR(LoadModel(model_path_, num_threads_, use_gpu_, backend_));

LOG(INFO) << "ConfigurableInferenceCalculator initialized: "
<< "model_path=" << model_path_
<< ", threshold=" << threshold_
<< ", num_threads=" << num_threads_
<< ", use_gpu=" << use_gpu_
<< ", backend=" << backend_;

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
// 检查输入
if (cc->Inputs().Tag("IMAGE").IsEmpty()) {
return absl::OkStatus();
}

const cv::Mat& image = cc->Inputs().Tag("IMAGE").Get<cv::Mat>();

// 预处理
cv::Mat input_tensor = Preprocess(image);

// 推理
auto output = interpreter_->Invoke();

// 后处理
std::vector<Detection> detections = Postprocess(output, threshold_);

// 输出
cc->Outputs().Tag("OUTPUT").AddPacket(
MakePacket<std::vector<Detection>>(detections).At(cc->InputTimestamp()));

return absl::OkStatus();
}

absl::Status Close(CalculatorContext* cc) override {
if (interpreter_) {
interpreter_->Reset();
}
LOG(INFO) << "ConfigurableInferenceCalculator closed";
return absl::OkStatus();
}

private:
// ========== 配置 ==========
std::string model_path_;
float threshold_;
int num_threads_;
bool use_gpu_;
std::string backend_;

// ========== 资源 ==========
std::unique_ptr<tflite::Interpreter> interpreter_;

// ========== 方法 ==========
absl::Status LoadModel(const std::string& path, int threads,
bool use_gpu, const std::string& backend);
cv::Mat Preprocess(const cv::Mat& image);
std::vector<Detection> Postprocess(const float* output, float threshold);
};

REGISTER_CALCULATOR(ConfigurableInferenceCalculator);

} // namespace mediapipe

#endif // CONFIGURABLE_INFERENCE_CALCULATOR_H_

10.2 Graph 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# inference_graph.pbtxt
input_stream: "IMAGE:image"
output_stream: "DETECTIONS:detections"

input_side_packet: "MODEL_PATH:model_path"
input_side_packet: "THRESHOLD:threshold"
input_side_packet: "NUM_THREADS:num_threads"
input_side_packet: "USE_GPU:use_gpu"
input_side_packet: "BACKEND:backend"

node {
calculator: "ConfigurableInferenceCalculator"
input_stream: "IMAGE:image"
input_side_packet: "MODEL_PATH:model_path"
input_side_packet: "THRESHOLD:threshold"
input_side_packet: "NUM_THREADS:num_threads"
input_side_packet: "USE_GPU:use_gpu"
input_side_packet: "BACKEND:backend"
output_stream: "OUTPUT:detections"
}

10.3 使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// main.cpp
#include "mediapipe/framework/calculator_framework.h"
#include <map>

int main() {
mediapipe::CalculatorGraph graph;

// 初始化 Graph
MP_RETURN_IF_ERROR(graph.Initialize(config));

// ========== 配置参数 ==========
std::map<std::string, mediapipe::Packet> side_packets;

// 模型路径
side_packets["model_path"] = MakePacket<std::string>(
"/models/face_detection.tflite");

// 检测阈值
side_packets["threshold"] = MakePacket<float>(0.6f);

// 线程数
side_packets["num_threads"] = MakePacket<int>(4);

// GPU 后端
side_packets["use_gpu"] = MakePacket<bool>(true);
side_packets["backend"] = MakePacket<std::string>("QNN");

// ========== 启动 Graph ==========
MP_RETURN_IF_ERROR(graph.StartRun(side_packets));

// ========== 处理视频 ==========
cv::VideoCapture cap(0);
cv::Mat frame;
int frame_id = 0;

while (cap.read(frame)) {
MP_RETURN_IF_ERROR(graph.AddPacketToInputStream(
"image",
MakePacket<cv::Mat>(frame.clone()).At(Timestamp(frame_id++))));
}

// ========== 关闭 Graph ==========
MP_RETURN_IF_ERROR(graph.CloseInputStream("image"));
MP_RETURN_IF_ERROR(graph.WaitUntilDone());

LOG(INFO) << "Graph execution completed";

return 0;
}

十一、常见问题与最佳实践

11.1 Side Packet 未设置

1
2
3
4
5
6
7
// ========== 错误:Side Packet 未传入 ==========
MP_RETURN_IF_ERROR(graph.StartRun({})); // 缺少 side_packets

// ========== 正确:传入 Side Packet ==========
std::map<std::string, mediapipe::Packet> side_packets;
side_packets["model_path"] = MakePacket<std::string>("/path/to/model.tflite");
MP_RETURN_IF_ERROR(graph.StartRun(side_packets));

11.2 修改 Side Packet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// ========== 错误:Side Packet 在运行期间不能修改 ==========
MP_RETURN_IF_ERROR(graph.StartRun(side_packets));
MP_RETURN_IF_ERROR(graph.AddPacketToInputStream(
"config_stream", MakePacket<float>(0.7f).At(Timestamp(0)))); // ❌

// ========== 正确:使用 Stream 传递可变参数 ==========
input_stream: "threshold_stream"

node {
calculator: "MyCalculator"
input_stream: "threshold_stream"
input_side_packet: "static_threshold"
}

// 或者使用 Packet Generator
node {
calculator: "DynamicThresholdCalculator"
output_side_packet: "THRESHOLD:threshold"
}

11.3 空 Side Packet

1
2
3
4
5
6
7
8
// 检查 Side Packet 是否存在
if (cc->InputSidePackets().HasTag("OPTIONAL")) {
auto value = cc->InputSidePackets().Tag("OPTIONAL").Get<int>();
// 使用配置
} else {
// 使用默认值
auto value = GetDefaultValue();
}

11.4 命名规范

1
2
3
4
5
6
7
8
9
// ========== 命名规范 ==========
// Side Packet 名称:使用下划线分隔
side_packets["model_path"] = MakePacket<std::string>("/models/face.tflite");
side_packets["detection_threshold"] = MakePacket<float>(0.6f);
side_packets["num_threads"] = MakePacket<int>(4);

// Tag 命名:使用大写字母和下划线
cc->InputSidePackets().Tag("MODEL_PATH").Set<std::string>();
cc->InputSidePackets().Tag("THRESHOLD").Set<float>();

十二、总结

要点 说明
用途 传递静态配置数据
生命周期 Graph 启动时设置,运行期间不变
获取时机 通常在 Open() 中获取
vs Stream 无时间戳,单值,不变

下篇预告

MediaPipe 系列 05:Graph 配置文件(pbtxt)详解

深入讲解 pbtxt 语法、Graph 配置最佳实践、命名规范。


参考资料

  1. Google AI Edge. MediaPipe Side Packet Documentation
  2. Google AI Edge. MediaPipe Calculator Contract
  3. Lugaresi et al. (2019). MediaPipe: A Framework for Building Perception Pipelines. arXiv:1906.08172

系列进度: 4/55
更新时间: 2026-03-12


MediaPipe 系列 04:Side Packet——静态配置数据的注入完整指南
https://dapalm.com/2026/03/12/MediaPipe系列04-Side-Packet:静态配置数据的注入/
作者
Mars
发布于
2026年3月12日
许可协议