MediaPipe 系列 07:Calculator 开发规范——从接口到实现完整指南

前言:Calculator 开发的重要性

7.1 Calculator 的核心地位

Calculator 是 MediaPipe 框架的核心组件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
┌─────────────────────────────────────────────────────────────────────────┐
│ Calculator 的核心地位 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ Graph(感知流水线) │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ node { calculator: "CalculatorA" } │ │
│ │ node { calculator: "CalculatorB" } │ │
│ │ node { calculator: "CalculatorC" } │ │
│ │ ... │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Calculator(计算节点) │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ • 封装具体算法逻辑 │ │
│ │ • 定义输入输出端口 │ │
│ │ • 管理计算资源 │ │
│ │ • 处理错误和异常 │ │
│ │ • 输出计算结果 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ IMS DMS 系统中的 Calculator 示例: │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ • FaceDetectionCalculator:人脸检测 │ │
│ │ • FaceMeshCalculator:人脸关键点 │ │
│ │ • IrisDetectionCalculator:虹膜检测 │ │
│ │ • HeadPoseCalculator:头部姿态 │ │
│ │ • ARCalculator:EAR 计算 │ │
│ │ • PERCLOSCalculator:PERCLOS 计算 │ │
│ │ • FatigueScoreCalculator:疲劳评分 │ │
│ │ • DistractionScoreCalculator:分心评分 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘

7.2 开发规范的重要性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
┌─────────────────────────────────────────────────────────────┐
│ 开发规范的重要性 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 问题:如何开发高质量、可维护的 Calculator? │
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ 规范化的好处 │ │
│ │ │ │
│ │ 1. 可读性:代码清晰易懂 │ │
│ │ 2. 可维护性:修改和扩展方便 │ │
│ │ 3. 可复用性:跨项目复用 │ │
│ │ 4. 稳定性:错误处理完善 │ │
│ │ 5. 性能:资源管理高效 │ │
│ │ 6. 调试:问题定位快速 │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ 本篇内容: │
│ ┌─────────────────────────────────────────────┐ │
│ │ │ │
│ │ • Calculator 接口详解 │ │
│ │ • 生命周期管理 │ │
│ │ • 输入输出定义 │ │
│ │ • 数据访问最佳实践 │ │
│ │ • 配置选项管理 │ │
│ │ • 错误处理规范 │ │
│ │ • 性能优化技巧 │ │
│ │ • 调试方法 │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

八、Calculator 接口详解

8.1 CalculatorBase 基类

CalculatorBase 是所有 Calculator 的基类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// mediapipe/framework/calculator_base.h

class CalculatorBase {
public:
virtual ~CalculatorBase() = default;

// ========== 静态方法:定义接口 ==========
// 在 Graph 初始化时调用,验证配置正确性
// 只调用一次,每个 Calculator 实例调用一次
static absl::Status GetContract(CalculatorContract* cc);

// ========== 生命周期方法 ==========

// Open:初始化
// 在 Graph 启动时调用一次
// 返回值:absl::OkStatus() 成功,其他值失败
// 常用操作:
// - 读取 Options:cc->Options<MyOptions>()
// - 分配内存:new T()
// - 加载模型:TfLiteModel::Load()
// - 初始化资源:cv::Mat(), std::vector<>()
virtual absl::Status Open(CalculatorContext* cc);

// Process:处理数据
// 每个输入 Packet 调用一次
// 返回值:absl::OkStatus() 成功,其他值失败
// 常用操作:
// - 检查输入:cc->Inputs().Tag("NAME").IsEmpty()
// - 获取输入:cc->Inputs().Tag("NAME").Get<T>()
// - 输出结果:cc->Outputs().Tag("NAME").AddPacket(...)
virtual absl::Status Process(CalculatorContext* cc) = 0;

// Close:清理
// 在 Graph 关闭时调用一次
// 返回值:absl::OkStatus()
// 常用操作:
// - 释放模型:model_->Close()
// - 释放内存:delete ptr
// - 关闭文件:file_.close()
virtual absl::Status Close(CalculatorContext* cc);

// ========== 流处理方法(可选)==========
// 用于自定义流处理逻辑
virtual absl::Status ProcessInputStreams(CalculatorContext* cc);

protected:
// ========== 受保护方法 ==========
// 获取 Calculator 的配置
template <typename OptionsType>
const OptionsType& Options() const;

// 获取输入时间戳
Timestamp InputTimestamp() const;
};

8.2 CalculatorContract 类

CalculatorContract 用于定义 Calculator 的接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// mediapipe/framework/calculator_contract.h

class CalculatorContract {
public:
// ========== 输入端口 ==========
InputStreamManager& Inputs();

// 添加输入端口(Tag 方式)
InputStreamHandler& Tag(const std::string& tag);

// 添加输入端口(Index 方式)
InputStreamHandler& Index(int index);

// ========== 输出端口 ==========
OutputStreamManager& Outputs();

// 添加输出端口(Tag 方式)
OutputStreamHandler& Tag(const std::string& tag);

// 添加输出端口(Index 方式)
OutputStreamHandler& Index(int index);

// ========== Side Packet ==========
SidePacketManager& InputSidePackets();
SidePacketManager& OutputSidePackets();

// ========== Options ==========
template <typename OptionsType>
const OptionsType& Options() const;

// ========== 输入流处理器 ==========
void SetInputStreamHandler(const std::string& handler_type);
};

// ========== 端口类型设置 ==========
class InputStreamHandler {
public:
// 设置输入类型
template <typename T>
void Set();

// 设置任意类型
void SetAny();

// 与另一个端口类型相同
void SetSameAs(const InputStreamHandler* other);
};

class OutputStreamHandler {
public:
// 设置输出类型
template <typename T>
void Set();

// 设置任意类型
void SetAny();

// 与另一个端口类型相同
void SetSameAs(const OutputStreamHandler* other);
};

8.3 CalculatorContext 类

CalculatorContext 提供运行时上下文:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// mediapipe/framework/calculator_context.h

class CalculatorContext {
public:
// ========== 输入输出端口 ==========
// 获取输入端口
InputStreamShard& Inputs();

// 获取输出端口
OutputStreamShard& Outputs();

// 获取 Side Packet
SidePacketShard& InputSidePackets();
SidePacketShard& OutputSidePackets();

// ========== Options ==========
// 获取配置选项
template <typename OptionsType>
const OptionsType& Options() const;

// ========== 时间戳 ==========
// 获取输入时间戳
Timestamp InputTimestamp() const;

// ========== Graph 状态 ==========
// 获取 Graph 运行状态
GraphStatus GetGraphStatus() const;

// ========== 计数器(性能分析)==========
// 获取计数器
Counter* GetCounter(const std::string& name);
};

// ========== 输入流 Shard ==========
class InputStreamShard {
public:
// 检查是否为空
bool IsEmpty() const;

// 获取数据
template <typename T>
const T& Get() const;

// 获取 Packet
const Packet& Value() const;

// 通过 Tag 访问
InputStreamShard& Tag(const std::string& tag);

// 通过 Index 访问
InputStreamShard& Index(int index);
};

// ========== 输出流 Shard ==========
class OutputStreamShard {
public:
// 添加 Packet
void AddPacket(const Packet& packet);

// 添加数据(简化接口)
template <typename T>
void Add(const T& value, Timestamp timestamp);

// 通过 Tag 访问
OutputStreamShard& Tag(const std::string& tag);

// 通过 Index 访问
OutputStreamShard& Index(int index);
};

九、Calculator 生命周期详解

9.1 生命周期流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
┌─────────────────────────────────────────────────────────────┐
│ Calculator 生命周期 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Graph 启动 │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ GetContract │ ← 只调用一次(静态验证) │
│ │ │ 定义输入输出端口类型、数量 │
│ │ 返回值: │ - cc->Inputs().Tag("NAME").Set<T>() │
│ │ absl::OkStatus│ - cc->Outputs().Tag("NAME").Set<T>() │
│ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Open │ ← 每个实例调用一次(初始化) │
│ │ │ - 读取 Options │
│ │ 返回值: │ - 分配内存、加载模型 │
│ │ absl::OkStatus│ - 初始化资源 │
│ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Process │ ← 每个输入 Packet 调用一次(处理) │
│ │ │ - 检查输入是否可用 │
│ │ 返回值: │ - 获取输入数据 │
│ │ absl::OkStatus│ - 执行计算逻辑 │
│ │ │ - 输出结果 │
│ │ 调用次数: │ - cc->Inputs().Tag(...).Get<T>() │
│ │ N 次 │ - cc->Outputs().Tag(...).AddPacket(...) │
│ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Close │ ← Graph 关闭时调用一次(清理) │
│ │ │ - 释放模型、内存 │
│ │ 返回值: │ - 关闭文件、连接 │
│ │ absl::OkStatus│ - 清理资源 │
│ └─────────────┘ │
│ │ │
│ ▼ │
│ Graph 关闭 │
│ │
└─────────────────────────────────────────────────────────────┘

9.2 GetContract 详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// ========== GetContract:定义接口 ==========
static absl::Status GetContract(CalculatorContract* cc) {
// ========== 1. 定义输入端口 ==========
// Tag 方式(推荐)
cc->Inputs().Tag("IMAGE").Set<ImageFrame>();
cc->Inputs().Tag("CONFIG").Set<Config>();
cc->Inputs().Tag("ROI").Set<Rect>();

// Index 方式
cc->Inputs().Index(0).Set<ImageFrame>();
cc->Inputs().Index(1).Set<Config>();

// 任意类型
cc->Inputs().Tag("DATA").SetAny();

// 与另一个端口类型相同
cc->Outputs().Tag("OUTPUT").SetSameAs(&cc->Inputs().Tag("IMAGE"));

// ========== 2. 定义输出端口 ==========
cc->Outputs().Tag("DETECTIONS").Set<std::vector<Detection>>();
cc->Outputs().Tag("LANDMARKS").Set<std::vector<Landmark>>();
cc->Outputs().Tag("SCORES").Set<std::vector<float>>();

// ========== 3. 定义 Side Packet ==========
cc->InputSidePackets().Tag("MODEL_PATH").Set<std::string>();
cc->InputSidePackets().Tag("CONFIG").Set<Config>();

cc->OutputSidePackets().Tag("STATUS").Set<Status>();

// ========== 4. 声明 Options ==========
// 这会验证 Graph 配置中的 options 是否正确
cc->Options<MyCalculatorOptions>();

// ========== 5. 设置输入流处理器(可选)==========
cc->SetInputStreamHandler("SyncSetInputStreamHandler");

return absl::OkStatus();
}

9.3 Open 详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// ========== Open:初始化 ==========
absl::Status Open(CalculatorContext* cc) override {
// ========== 1. 读取 Options ==========
const auto& options = cc->Options<MyCalculatorOptions>();

// 基本类型
threshold_ = options.threshold();
max_results_ = options.max_results();
enable_debug_ = options.enable_debug();

// 重复字段
for (const auto& label : options.labels()) {
labels_.push_back(label);
}

// 嵌套消息
const auto& model_config = options.model_config();
model_path_ = model_config.path();
input_size_ = model_config.input_size();

// ========== 2. 获取 Side Packet ==========
// 必需的 Side Packet
model_path_ = cc->InputSidePackets().Tag("MODEL_PATH").Get<std::string>();

// 可选的 Side Packet
if (cc->InputSidePackets().HasTag("OPTIONAL_CONFIG")) {
const auto& config = cc->InputSidePackets().Tag("OPTIONAL_CONFIG").Get<Config>();
// 使用配置
}

// ========== 3. 初始化资源 ==========
// 加载模型
MP_RETURN_IF_ERROR(LoadModel(model_path_));

// 分配内存
input_buffer_.resize(input_size_);
output_buffer_.resize(output_size_);

// 初始化 OpenCV
cv::setNumThreads(num_threads_);

// ========== 4. 日志输出 ==========
LOG(INFO) << "MyCalculator initialized: "
<< "threshold=" << threshold_
<< ", max_results=" << max_results_
<< ", model_path=" << model_path_;

return absl::OkStatus();
}

9.4 Process 详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// ========== Process:处理数据 ==========
absl::Status Process(CalculatorContext* cc) override {
// ========== 1. 检查输入是否可用 ==========
// 检查必需输入
if (cc->Inputs().Tag("IMAGE").IsEmpty()) {
// 输入为空,跳过处理
return absl::OkStatus();
}

// 检查可选输入
bool has_config = !cc->Inputs().Tag("CONFIG").IsEmpty();
bool has_roi = !cc->Inputs().Tag("ROI").IsEmpty();

// ========== 2. 获取输入数据 ==========
const ImageFrame& image = cc->Inputs().Tag("IMAGE").Get<ImageFrame>();

// 可选输入
Rect roi;
if (has_roi) {
roi = cc->Inputs().Tag("ROI").Get<Rect>();
}

// ========== 3. 验证输入数据 ==========
// 检查图像尺寸
if (image.Width() == 0 || image.Height() == 0) {
return absl::InvalidArgumentError("Invalid image size");
}

// 检查图像格式
if (image.Format() != ImageFormat::SRGB) {
return absl::InvalidArgumentError("Unsupported image format");
}

// ========== 4. 执行计算 ==========
auto start_time = std::chrono::high_resolution_clock::now();

std::vector<Detection> detections;
MP_RETURN_IF_ERROR(DetectObjects(image, &detections));

auto end_time = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
end_time - start_time).count();

// ========== 5. 输出结果 ==========
// 输出检测结果
cc->Outputs().Tag("DETECTIONS").AddPacket(
MakePacket<std::vector<Detection>>(detections).At(cc->InputTimestamp()));

// 输出性能指标
PerformanceMetrics metrics;
metrics.process_time_us = duration;
metrics.detection_count = detections.size();
cc->Outputs().Tag("METRICS").AddPacket(
MakePacket<PerformanceMetrics>(metrics).At(cc->InputTimestamp()));

// ========== 6. 日志输出(可选)==========
if (enable_debug_) {
LOG(INFO) << "Processed frame at timestamp=" << cc->InputTimestamp()
<< ", detections=" << detections.size()
<< ", time=" << duration << "us";
}

// ========== 7. 更新统计 ==========
process_count_++;

return absl::OkStatus();
}

9.5 Close 详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// ========== Close:清理资源 ==========
absl::Status Close(CalculatorContext* cc) override {
// ========== 1. 释放模型 ==========
if (model_) {
model_->Close();
model_.reset();
}

// ========== 2. 释放内存 ==========
input_buffer_.clear();
output_buffer_.clear();
input_buffer_.shrink_to_fit();
output_buffer_.shrink_to_fit();

// ========== 3. 关闭文件 ==========
if (log_file_.is_open()) {
log_file_.close();
}

// ========== 4. 关闭连接 ==========
if (socket_.is_open()) {
socket_.close();
}

// ========== 5. 输出统计 ==========
LOG(INFO) << "MyCalculator closed after processing " << process_count_
<< " frames";

// ========== 6. 输出 Side Packet(可选)==========
if (cc->OutputSidePackets().HasTag("STATUS")) {
Status status;
status.process_count = process_count_;
status.success = true;
cc->OutputSidePackets().Tag("STATUS").Set(
MakePacket<Status>(status));
}

return absl::OkStatus();
}

十、Calculator 完整实现示例

10.1 图像处理 Calculator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
// image_processor_calculator.h
#ifndef MEDIAPIPE_CALCULATORS_IMAGE_IMAGE_PROCESSOR_CALCULATOR_H_
#define MEDIAPIPE_CALCULATORS_IMAGE_IMAGE_PROCESSOR_CALCULATOR_H_

#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/framework/formats/image_frame.h"
#include "mediapipe/framework/port/opencv_imgproc.h"
#include "mediapipe/framework/port/opencv_imgcodecs.h"
#include "mediapipe/framework/port/logging.h"
#include "mediapipe/framework/port/ret_check.h"
#include "mediapipe/framework/port/status.h"

namespace mediapipe {

// ========== Proto 定义 ==========
message ImageProcessorOptions {
optional int32 target_width = 1 [default = 0]; // 目标宽度(0 = 不缩放)
optional int32 target_height = 2 [default = 0]; // 目标高度(0 = 不缩放)
optional bool grayscale = 3 [default = false]; // 灰度转换
optional bool normalize = 4 [default = false]; // 归一化
optional bool enable_debug = 5 [default = false]; // 调试输出
optional float rotation_angle = 6 [default = 0.0]; // 旋转角度(度)
}

// ========== Calculator 类 ==========
class ImageProcessorCalculator : public CalculatorBase {
public:
// ========== GetContract:定义接口 ==========
static absl::Status GetContract(CalculatorContract* cc) {
// 输入端口
cc->Inputs().Tag("IMAGE").Set<ImageFrame>();

// 可选输入:ROI
if (cc->Inputs().HasTag("ROI")) {
cc->Inputs().Tag("ROI").Set<Rect>();
}

// 输出端口
cc->Outputs().Tag("IMAGE").Set<ImageFrame>();

// 性能指标输出(可选)
if (cc->Outputs().HasTag("METRICS")) {
cc->Outputs().Tag("METRICS").Set<PerformanceMetrics>();
}

// Options
cc->Options<ImageProcessorOptions>();

return absl::OkStatus();
}

// ========== Open:初始化 ==========
absl::Status Open(CalculatorContext* cc) override {
// 读取配置
const auto& options = cc->Options<ImageProcessorOptions>();

target_width_ = options.target_width();
target_height_ = options.target_height();
grayscale_ = options.grayscale();
normalize_ = options.normalize();
enable_debug_ = options.enable_debug();
rotation_angle_ = options.rotation_angle();

// 验证配置
if (target_width_ < 0 || target_height_ < 0) {
return absl::InvalidArgumentError("Invalid target size");
}

if (rotation_angle_ != 0.0f && rotation_angle_ != 90.0f &&
rotation_angle_ != 180.0f && rotation_angle_ != 270.0f) {
return absl::InvalidArgumentError("Invalid rotation angle");
}

LOG(INFO) << "ImageProcessorCalculator initialized: "
<< "target_size=" << target_width_ << "x" << target_height_
<< ", grayscale=" << grayscale_
<< ", normalize=" << normalize_
<< ", rotation=" << rotation_angle_;

return absl::OkStatus();
}

// ========== Process:处理数据 ==========
absl::Status Process(CalculatorContext* cc) override {
// 检查输入
if (cc->Inputs().Tag("IMAGE").IsEmpty()) {
return absl::OkStatus();
}

auto start_time = std::chrono::high_resolution_clock::now();

// 获取输入
const ImageFrame& input = cc->Inputs().Tag("IMAGE").Get<ImageFrame>();

// 转换为 OpenCV Mat
cv::Mat input_mat = formats::MatView(&input);
cv::Mat output_mat;

// ========== 1. 旋转 ==========
if (rotation_angle_ != 0.0f) {
RotateImage(input_mat, &output_mat, rotation_angle_);
} else {
output_mat = input_mat.clone();
}

// ========== 2. 缩放 ==========
if (target_width_ > 0 && target_height_ > 0) {
cv::resize(output_mat, output_mat,
cv::Size(target_width_, target_height_));
}

// ========== 3. 灰度转换 ==========
if (grayscale_) {
cv::cvtColor(output_mat, output_mat, cv::COLOR_RGB2GRAY);
cv::cvtColor(output_mat, output_mat, cv::COLOR_GRAY2RGB);
}

// ========== 4. 归一化 ==========
if (normalize_) {
output_mat.convertTo(output_mat, CV_32F, 1.0 / 255.0);
output_mat.convertTo(output_mat, CV_8U, 255.0);
}

// ========== 5. 创建输出 ImageFrame ==========
auto output_frame = absl::make_unique<ImageFrame>(
output_mat.channels() == 3 ? ImageFormat::SRGB : ImageFormat::GRAY8,
output_mat.cols, output_mat.rows, output_mat.step,
output_mat.data,
[output_mat](uint8*) mutable { output_mat.release(); });

cc->Outputs().Tag("IMAGE").Add(output_frame.release(), cc->InputTimestamp());

// ========== 6. 输出性能指标 ==========
if (cc->Outputs().HasTag("METRICS")) {
auto end_time = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
end_time - start_time).count();

PerformanceMetrics metrics;
metrics.process_time_us = duration;
metrics.input_size = cv::Size(input.Width(), input.Height());
metrics.output_size = cv::Size(output_mat.cols, output_mat.rows);

cc->Outputs().Tag("METRICS").AddPacket(
MakePacket<PerformanceMetrics>(metrics).At(cc->InputTimestamp()));
}

// ========== 7. 日志输出 ==========
if (enable_debug_) {
LOG(INFO) << "Processed image: "
<< input.Width() << "x" << input.Height()
<< " -> " << output_mat.cols << "x" << output_mat.rows;
}

process_count_++;

return absl::OkStatus();
}

// ========== Close:清理 ==========
absl::Status Close(CalculatorContext* cc) override {
LOG(INFO) << "ImageProcessorCalculator closed after processing "
<< process_count_ << " frames";
return absl::OkStatus();
}

private:
// ========== 配置参数 ==========
int target_width_ = 0;
int target_height_ = 0;
bool grayscale_ = false;
bool normalize_ = false;
bool enable_debug_ = false;
float rotation_angle_ = 0.0f;

// ========== 统计信息 ==========
int process_count_ = 0;

// ========== 辅助方法 ==========
void RotateImage(const cv::Mat& input, cv::Mat* output, float angle) {
if (angle == 90.0f) {
cv::rotate(input, *output, cv::ROTATE_90_CLOCKWISE);
} else if (angle == 180.0f) {
cv::rotate(input, *output, cv::ROTATE_180);
} else if (angle == 270.0f) {
cv::rotate(input, *output, cv::ROTATE_90_COUNTERCLOCKWISE);
} else {
// 任意角度旋转
cv::Point2f center(input.cols / 2.0f, input.rows / 2.0f);
cv::Mat rot_mat = cv::getRotationMatrix2D(center, angle, 1.0);
cv::warpAffine(input, *output, rot_mat, input.size());
}
}
};

// ========== 注册 Calculator ==========
REGISTER_CALCULATOR(ImageProcessorCalculator);

} // namespace mediapipe

#endif // MEDIAPIPE_CALCULATORS_IMAGE_IMAGE_PROCESSOR_CALCULATOR_H_

10.2 BUILD 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# mediapipe/calculators/image/BUILD

load("//mediapipe/framework/port:build_config.bzl", "mediapipe_cc_library")

# ========== 图像处理 Calculator ==========
cc_library(
name = "image_processor_calculator",
srcs = ["image_processor_calculator.cc"],
hdrs = ["image_processor_calculator.h"],
deps = [
"//mediapipe/framework:calculator_framework",
"//mediapipe/framework/formats:image_frame",
"//mediapipe/framework/port:opencv_imgproc",
"//mediapipe/framework/port:opencv_imgcodecs",
"//mediapipe/framework/port:logging",
"@com_google_absl//absl/memory",
],
alwayslink = 1,
visibility = ["//visibility:public"],
)

十一、错误处理规范

11.1 错误类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// ========== 常用错误类型 ==========
// 1. InvalidArgumentError:参数错误
return absl::InvalidArgumentError("Invalid image size");

// 2. NotFoundError:资源未找到
return absl::NotFoundError("Model file not found");

// 3. InternalError:内部错误
return absl::InternalError("Model inference failed");

// 4. UnavailableError:资源不可用
return absl::UnavailableError("GPU not available");

// 5. FailedPreconditionError:前置条件失败
return absl::FailedPreconditionError("Model not loaded");

11.2 错误处理最佳实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// ========== 1. 使用 MP_RETURN_IF_ERROR ==========
MP_RETURN_IF_ERROR(LoadModel(model_path_));
MP_RETURN_IF_ERROR(Inference(input, &output));

// ========== 2. 使用 RET_CHECK ==========
RET_CHECK(!image.empty()) << "Image is empty";
RET_CHECK(model_ != nullptr) << "Model not loaded";
RET_CHECK(threshold_ > 0.0f && threshold_ < 1.0f) << "Invalid threshold";

// ========== 3. 使用 CHECK_* 断言 ==========
CHECK_EQ(image.channels(), 3) << "Expected 3-channel image";
CHECK_GT(image.rows, 0) << "Image height must be positive";
CHECK_LT(threshold_, 1.0f) << "Threshold must be < 1.0";

// ========== 4. 错误处理示例 ==========
absl::Status Process(CalculatorContext* cc) override {
// 检查输入
if (cc->Inputs().Tag("IMAGE").IsEmpty()) {
return absl::OkStatus(); // 空输入,跳过
}

const ImageFrame& image = cc->Inputs().Tag("IMAGE").Get<ImageFrame>();

// 验证输入
if (image.Width() == 0 || image.Height() == 0) {
return absl::InvalidArgumentError("Invalid image size");
}

// 检查模型
RET_CHECK(model_ != nullptr) << "Model not loaded";

// 执行计算
std::vector<Detection> detections;
MP_RETURN_IF_ERROR(DetectObjects(image, &detections));

// 输出结果
cc->Outputs().Tag("DETECTIONS").AddPacket(
MakePacket<std::vector<Detection>>(detections).At(cc->InputTimestamp()));

return absl::OkStatus();
}

十二、性能优化技巧

12.1 零拷贝设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// ========== 零拷贝传递 ==========
absl::Status Process(CalculatorContext* cc) override {
// 直接传递 Packet(零拷贝)
cc->Outputs().Tag("OUTPUT").AddPacket(cc->Inputs().Tag("INPUT").Value());

return absl::OkStatus();
}

// ========== 避免不必要的拷贝 ==========
absl::Status Process(CalculatorContext* cc) override {
const ImageFrame& input = cc->Inputs().Tag("IMAGE").Get<ImageFrame>();

// ❌ 错误:不必要的拷贝
// ImageFrame copy = input;

// ✅ 正确:使用引用
cv::Mat input_mat = formats::MatView(&input);

return absl::OkStatus();
}

12.2 内存预分配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
absl::Status Open(CalculatorContext* cc) override {
// 预分配内存
input_buffer_.reserve(1024);
output_buffer_.reserve(1024);
detections_.reserve(max_results_);

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
// 清空缓冲区(不释放内存)
detections_.clear();

// 使用预分配的内存
// ...

return absl::OkStatus();
}

12.3 并行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <omp.h>

absl::Status Process(CalculatorContext* cc) override {
const auto& inputs = cc->Inputs().Tag("DATA").Get<std::vector<Data>>();
std::vector<Result> results(inputs.size());

// 使用 OpenMP 并行处理
#pragma omp parallel for num_threads(num_threads_)
for (size_t i = 0; i < inputs.size(); ++i) {
results[i] = ProcessData(inputs[i]);
}

cc->Outputs().Tag("OUTPUT").AddPacket(
MakePacket<std::vector<Result>>(results).At(cc->InputTimestamp()));

return absl::OkStatus();
}

十三、调试技巧

13.1 日志输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include "mediapipe/framework/port/logging.h"

// ========== 日志级别 ==========
LOG(INFO) << "Information message";
LOG(WARNING) << "Warning message";
LOG(ERROR) << "Error message";
LOG(FATAL) << "Fatal message"; // 会终止程序

// ========== 条件日志 ==========
LOG_IF(INFO, process_count_ % 100 == 0)
<< "Processed " << process_count_ << " frames";

// ========== 详细日志 ==========
VLOG(1) << "Debug level 1";
VLOG(2) << "Debug level 2";

// ========== 使用 GLOG ==========
// GLOG_logtostderr=1 GLOG_v=2 ./my_app

13.2 性能分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include "mediapipe/framework/port/logging.h"
#include <chrono>

absl::Status Process(CalculatorContext* cc) override {
auto start = std::chrono::high_resolution_clock::now();

// ... 计算逻辑 ...

auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
end - start).count();

LOG(INFO) << "Process time: " << duration << "us";

return absl::OkStatus();
}

十四、总结

要点 说明
GetContract 定义输入输出端口类型
Open 初始化资源、读取配置
Process 处理数据、输出结果
Close 释放资源
alwayslink BUILD 文件必须设置
REGISTER_CALCULATOR 必须注册
错误处理 使用 MP_RETURN_IF_ERROR、RET_CHECK
性能优化 零拷贝、内存预分配、并行处理

下篇预告

MediaPipe 系列 08:Input/Output Stream 与 Port

深入讲解 Stream 连接机制、Port 命名规范、多流同步。


参考资料

  1. Google AI Edge. MediaPipe Calculator Framework
  2. Google AI Edge. Calculator Base
  3. Abseil. Status and Error Handling

系列进度: 7/55
更新时间: 2026-03-12


MediaPipe 系列 07:Calculator 开发规范——从接口到实现完整指南
https://dapalm.com/2026/03/12/MediaPipe系列07-Calculator开发规范:从接口到实现/
作者
Mars
发布于
2026年3月12日
许可协议