MediaPipe 系列 24:性能优化 Calculator——零拷贝设计完整指南

前言:为什么需要性能优化?

24.1 性能优化的重要性

MediaPipe Pipeline 需要高效处理实时视频流:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
┌─────────────────────────────────────────────────────────────────────────┐
│ 性能优化的重要性 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 问题:如何实现高帧率、低延迟的视频处理? │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ IMS DMS 实时处理需求: │ │
│ │ │ │
│ │ • 输入:30 FPS 视频流 │ │
│ │ • 处理:人脸检测 + 关键点 + 疲劳分析 │ │
│ │ • 输出:实时告警 + CAN 发送 │ │
│ │ │ │
│ │ 挑战: │ │
│ │ • 每帧需要处理多个步骤 │ │
│ │ • 图像数据量大(1080p ~ 6MB) │ │
│ │ • 多个 Calculator 需要传递数据 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ 解决方案:性能优化 Calculator │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ 1. 零拷贝设计:避免数据复制 │ │
│ │ 2. 内存池:复用内存 │ │
│ │ 3. 原地处理:减少中间拷贝 │ │
│ │ 4. 异步处理:提高吞吐量 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘

24.2 性能指标

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
┌─────────────────────────────────────────────────────────────┐
│ 性能指标 │
├─────────────────────────────────────────────────────────────┤
│ │
1. 延迟 │
│ ┌─────────────────────────────────────────────┐ │
│ │ • 单帧处理延迟 │ │
│ │ • 端到端延迟 │ │
│ │ • 目标:< 33ms(30 FPS) │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │
2. 吞吐量 │
│ ┌─────────────────────────────────────────────┐ │
│ │ • FPS(每秒帧数) │ │
│ │ • 每秒处理帧数 │ │
│ │ • 目标:≥ 30 FPS │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │
3. 内存使用 │
│ ┌─────────────────────────────────────────────┐ │
│ │ • 峰值内存 │ │
│ │ • 平均内存 │ │
│ │ • 目标:< 500MB │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │
4. CPU 占用 │
│ ┌─────────────────────────────────────────────┐ │
│ │ • CPU 使用率 │ │
│ │ • 单核占用 │ │
│ │ • 目标:< 50% │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

二十五、零拷贝原理

25.1 传统方式 vs 零拷贝

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
┌─────────────────────────────────────────────────────────────┐
│ 传统方式(多次拷贝) │
├─────────────────────────────────────────────────────────────┤
│ │
│ Calculator A Calculator B │
│ ┌──────────────────────┐ ┌──────────────────────┐ │
│ │ │ │ │ │
│ │ Data (10MB) │ │ Data (10MB) │ │
│ │ │ │ │ │
│ │ ──copy──▶ │ │ │ │
│ │ (拷贝 10MB) │ │ │ │
│ │ │ │ │ │
│ └──────────────────────┘ └──────────────────────┘ │
│ │
│ 如果有 10 个 Calculator: │
│ 内存: 10 × 10MB = 100MB │
│ CPU: 10 × 拷贝开销 │
│ 延迟: 拷贝延迟 × 10 │
│ │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│ 零拷贝方式(共享数据) │
├─────────────────────────────────────────────────────────────┤
│ │
│ Calculator A Calculator B │
│ ┌──────────────────────┐ ┌──────────────────────┐ │
│ │ │ │ │ │
│ │ Ptr ──────────────┼──┼─▶ Data (10MB) │ │
│ │ (shared_ptr) │ │ │ │
│ │ │ │ │ │
│ │ 引用计数 +1 │ │ 引用计数 +1 │ │
│ │ │ │ │ │
│ └──────────────────────┘ └──────────────────────┘ │
│ │
│ 所有 Calculator 共享同一份数据: │
│ 内存: 1 × 10MB = 10MB │
│ CPU: 0(无拷贝) │
│ 延迟: 0(无拷贝) │
│ │
└─────────────────────────────────────────────────────────────┘

25.2 Packet 引用计数机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// ========== Packet 内部实现(简化)==========
template <typename T>
class Packet {
public:
// ========== 构造函数 ==========
Packet() : timestamp_(Timestamp::Unset()) {}

Packet(T value, Timestamp timestamp)
: timestamp_(timestamp) {
// 使用 shared_ptr,引用计数从 1 开始
payload_ = std::make_shared<T>(std::move(value));
}

// ========== 复制 Packet ==========
Packet(const Packet& other)
: timestamp_(other.timestamp_),
payload_(other.payload_) {
// payload_ 共享,引用计数 +1
// 无数据拷贝
}

// ========== 移动 Packet ==========
Packet(Packet&& other) noexcept
: timestamp_(other.timestamp_),
payload_(std::move(other.payload_)) {
// 移动 payload_ 指针,引用计数不变
}

// ========== 获取数据 ==========
const T& Get() const {
return *payload_;
}

private:
Timestamp timestamp_;
std::shared_ptr<T> payload_; // 引用计数
};

// ========== 使用示例 ==========
Packet<std::vector<float>> packet1 = MakePacket<std::vector<float>>(data);
// payload_.use_count() = 1

Packet<std::vector<float>> packet2 = packet1;
// payload_.use_count() = 2(共享数据)

Packet<std::vector<float>> packet3 = std::move(packet2);
// payload_.use_count() = 2(packet3 持有,packet1 释放)

// 当所有引用消失时(use_count = 0),数据自动销毁

二十六、零拷贝实现

26.1 直接传递 Packet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// ========== 零拷贝实现 1:直接传递 Packet ==========
class PassThroughCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Index(0).SetAny();
cc->Outputs().Index(0).SetSameAs(&cc->Inputs().Index(0));
return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
// 直接传递 Packet,零拷贝
cc->Outputs().Index(0).AddPacket(cc->Inputs().Index(0).Value());

return absl::OkStatus();
}
};

REGISTER_CALCULATOR(PassThroughCalculator);

26.2 使用 Lambda 持有数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// ========== 零拷贝实现 2:Lambda 持有数据 ==========
#include "mediapipe/framework/formats/image_frame.h"
#include "mediapipe/framework/formats/image_frame_opencv.h"
#include <opencv2/opencv.hpp>

class ImageProcessorCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Tag("IMAGE").Set<ImageFrame>();
cc->Outputs().Tag("IMAGE").Set<ImageFrame>();
return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
if (cc->Inputs().Tag("IMAGE").IsEmpty()) {
return absl::OkStatus();
}

// ========== 1. 获取输入图像 ==========
const ImageFrame& input = cc->Inputs().Tag("IMAGE").Get<ImageFrame>();
cv::Mat input_mat = formats::MatView(&input);

// ========== 2. 处理(原地处理)==========
cv::Mat output_mat;
cv::resize(input_mat, output_mat, cv::Size(320, 240));

// ========== 3. 零拷贝输出 ==========
// Lambda 持有 output_mat 的所有权
auto output_frame = absl::make_unique<ImageFrame>(
input.Format(),
output_mat.cols,
output_mat.rows,
output_mat.step,
output_mat.data, // 数据指针
[output_mat](uint8_t*) mutable {
output_mat.release(); // Lambda 持有,避免拷贝
});

// ========== 4. 输出(所有权转移)==========
cc->Outputs().Tag("IMAGE").Add(output_frame.release(), cc->InputTimestamp());

return absl::OkStatus();
}
};

REGISTER_CALCULATOR(ImageProcessorCalculator);

26.3 共享指针传递

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// ========== 零拷贝实现 3:共享指针传递 ==========
class SharedDataCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Tag("DATA").Set<std::vector<float>>();
cc->Outputs().Tag("SHARED").Set<std::shared_ptr<std::vector<float>>>();
cc->Outputs().Tag("COPY").Set<std::vector<float>>();
return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
// ========== 1. 获取共享指针 ==========
auto shared_data = cc->Inputs().Tag("DATA").Get<std::shared_ptr<std::vector<float>>>();

// ========== 2. 零拷贝输出(共享指针)==========
cc->Outputs().Tag("SHARED").AddPacket(
MakePacket<std::shared_ptr<std::vector<float>>>(shared_data)
.At(cc->InputTimestamp()));

// ========== 3. 拷贝输出(数据拷贝)==========
std::vector<float> copy_data(*shared_data); // 拷贝
cc->Outputs().Tag("COPY").AddPacket(
MakePacket<std::vector<float>>(copy_data).At(cc->InputTimestamp()));

return absl::OkStatus();
}
};

REGISTER_CALCULATOR(SharedDataCalculator);

二十七、内存池优化

27.1 基础内存池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// ========== 基础内存池 ==========
template <typename T>
class MemoryPool {
public:
explicit MemoryPool(size_t pool_size) {
for (size_t i = 0; i < pool_size; ++i) {
pool_.push(std::make_unique<T>());
}
}

// ========== 获取对象 ==========
std::unique_ptr<T> Acquire() {
if (pool_.empty()) {
return std::make_unique<T>();
}
auto obj = std::move(pool_.front());
pool_.pop();
return obj;
}

// ========== 归还对象 ==========
void Release(std::unique_ptr<T> obj) {
pool_.push(std::move(obj));
}

private:
std::queue<std::unique_ptr<T>> pool_;
};

// ========== 使用内存池 ==========
class ImageBufferPoolCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Tag("INPUT").Set<cv::Mat>();
cc->Outputs().Tag("OUTPUT").Set<cv::Mat>();

cc->Options<ImageBufferPoolOptions>();
return absl::OkStatus();
}

absl::Status Open(CalculatorContext* cc) override {
const auto& options = cc->Options<ImageBufferPoolOptions>();
pool_size_ = options.pool_size();
buffer_size_ = options.buffer_size();

// 初始化内存池
for (int i = 0; i < pool_size_; ++i) {
pool_.push(std::make_unique<cv::Mat>(buffer_size_, CV_8UC3));
}

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
const cv::Mat& input = cc->Inputs().Tag("INPUT").Get<cv::Mat>();

// ========== 从池中获取缓冲区 ==========
std::unique_ptr<cv::Mat> buffer;
if (pool_.empty()) {
buffer = std::make_unique<cv::Mat>(buffer_size_, CV_8UC3);
} else {
buffer = std::move(pool_.front());
pool_.pop();
}

// ========== 处理 ==========
input.copyTo(*buffer);
// ...

// ========== 归还缓冲区 ==========
pool_.push(std::move(buffer));

return absl::OkStatus();
}

private:
int pool_size_ = 10;
int buffer_size_ = 1920 * 1080 * 3;
std::queue<std::unique_ptr<cv::Mat>> pool_;
};

二十八、避免不必要的拷贝

28.1 错误示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// ========== 错误示例 1:重复拷贝 ==========
class BadCalculator : public CalculatorBase {
public:
absl::Status Process(CalculatorContext* cc) override {
// 错误:每次都 clone
const cv::Mat& input = cc->Inputs().Tag("IMAGE").Get<cv::Mat>();
cv::Mat output1 = input.clone();
cv::Mat output2 = output1.clone();
cv::Mat output3 = output2.clone();

cc->Outputs().Tag("OUTPUT").AddPacket(
MakePacket<cv::Mat>(output3).At(cc->InputTimestamp()));

return absl::OkStatus();
}
};

// 内存:3 × 6MB = 18MB
// CPU:3 × 拷贝开销

// ========== 错误示例 2:频繁分配 ==========
class BadCalculator2 : public CalculatorBase {
public:
absl::Status Process(CalculatorContext* cc) override {
// 错误:每次循环都分配
for (int i = 0; i < 100; ++i) {
std::vector<float> temp(1000); // 分配 100 × 1000 = 100KB
// ...
}

return absl::OkStatus();
}
};

// CPU:100 次分配/释放

// ========== 错误示例 3:拷贝 Packet 数据 ==========
class BadCalculator3 : public CalculatorBase {
public:
absl::Status Process(CalculatorContext* cc) override {
// 错误:直接拷贝数据
const Packet<std::vector<float>>& input = cc->Inputs().Index(0).Get<Packet<std::vector<float>>>();
std::vector<float> data = input.Get(); // 拷贝数据

// 处理...

cc->Outputs().Index(0).AddPacket(
MakePacket<std::vector<float>>(data).At(cc->InputTimestamp()));

return absl::OkStatus();
}
};

28.2 正确示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// ========== 正确示例 1:原地处理 ==========
class GoodCalculator : public CalculatorBase {
public:
absl::Status Process(CalculatorContext* cc) override {
// 正确:只 clone 一次
const cv::Mat& input = cc->Inputs().Tag("IMAGE").Get<cv::Mat>();
cv::Mat output = input.clone(); // 只一次拷贝

// 原地处理
cv::cvtColor(output, output, cv::COLOR_BGR2GRAY);
cv::blur(output, output, cv::Size(3, 3));

cc->Outputs().Tag("OUTPUT").AddPacket(
MakePacket<cv::Mat>(output).At(cc->InputTimestamp()));

return absl::OkStatus();
}
};

// 内存:1 × 6MB = 6MB
// CPU:1 × 拷贝开销

// ========== 正确示例 2:复用对象 ==========
class GoodCalculator2 : public CalculatorBase {
public:
absl::Status Open(CalculatorContext* cc) override {
// 预分配
temp_.reserve(1000);
return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
// 正确:复用对象,只 clear
temp_.clear(); // 不重新分配
for (int i = 0; i < 100; ++i) {
temp_.push_back(i);
}

return absl::OkStatus();
}

private:
std::vector<float> temp_;
};

// CPU:0 次分配

// ========== 正确示例 3:零拷贝 Packet ==========
class GoodCalculator3 : public CalculatorBase {
public:
absl::Status Process(CalculatorContext* cc) override {
// 正确:直接传递 Packet
cc->Outputs().Index(0).AddPacket(cc->Inputs().Index(0).Value());

return absl::OkStatus();
}
};

// 内存:0 额外拷贝
// CPU:0

二十九、性能对比

29.1 拷贝 vs 零拷贝

方式 内存分配 CPU 占用 延迟 适用场景
拷贝 N × 数据大小 简单场景
零拷贝 1 × 数据大小 实时处理
共享指针 1 × 数据大小 多输出共享

29.2 实测数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
┌─────────────────────────────────────────────────────────────┐
│ 实测性能对比 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 测试场景:处理 1080p 图像,10 个 Calculator │
│ │
│ 方式 1:传统拷贝 │
│ ┌─────────────────────────────────────────────┐ │
│ │ 内存: ~300MB │ │
│ │ 延迟: 50ms │ │
│ │ CPU: 高 │ │
│ │ FPS: 20 FPS │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ 方式 2:零拷贝 │
│ ┌─────────────────────────────────────────────┐ │
│ │ 内存: ~30MB │ │
│ │ 延迟: 15ms │ │
│ │ CPU: 低 │ │
│ │ FPS: 60 FPS │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ 提升效果: │
│ • 内存: 10x 减少 │
│ • 延迟: 3x 降低 │
│ • FPS: 3x 提升 │
│ │
└─────────────────────────────────────────────────────────────┘

三十、IMS 实战优化

30.1 图像处理流水线优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// ========== 优化前:每步拷贝 ==========
class BadImagePipeline : public CalculatorBase {
public:
absl::Status Process(CalculatorContext* cc) override {
// 步骤 1
cv::Mat step1 = input.clone();
cv::cvtColor(step1, step1, cv::COLOR_BGR2GRAY);

// 步骤 2
cv::Mat step2 = step1.clone();
cv::blur(step2, step2, cv::Size(3, 3));

// 步骤 3
cv::Mat step3 = step2.clone();
cv::threshold(step3, step3, 100, 255, cv::THRESH_BINARY);

// 步骤 4
cv::Mat step4 = step3.clone();
cv::findContours(step4, contours, cv::RETR_EXTERNAL, cv::CHAIN_APPROX_SIMPLE);

cc->Outputs().Index(0).AddPacket(
MakePacket<cv::Mat>(step4).At(cc->InputTimestamp()));

return absl::OkStatus();
}
};

// 内存:4 × 6MB = 24MB
// 拷贝:4 次

// ========== 优化后:原地处理 ==========
class GoodImagePipeline : public CalculatorBase {
public:
absl::Status Process(CalculatorContext* cc) override {
// 原地处理,只一次 clone
cv::Mat output = input.clone();

// 步骤 1
cv::cvtColor(output, output, cv::COLOR_BGR2GRAY);

// 步骤 2
cv::blur(output, output, cv::Size(3, 3));

// 步骤 3
cv::threshold(output, output, 100, 255, cv::THRESH_BINARY);

// 步骤 4
cv::findContours(output, contours, cv::RETR_EXTERNAL, cv::CHAIN_APPROX_SIMPLE);

cc->Outputs().Index(0).AddPacket(
MakePacket<cv::Mat>(output).At(cc->InputTimestamp()));

return absl::OkStatus();
}
};

// 内存:1 × 6MB = 6MB
// 拷贝:1 次

30.2 多输出零拷贝

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// ========== IMS DMS 多输出优化 ==========
class DMSMultiOutputCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Tag("DETECTIONS").Set<std::vector<Detection>>();
cc->Outputs().Tag("RENDERED").Set<ImageFrame>();
cc->Outputs().Tag("JSON").Set<std::string>();
cc->Outputs().Tag("CAN").Set<CANFrame>();
return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
const auto& detections = cc->Inputs().Tag("DETECTIONS").Get<std::vector<Detection>>();
Timestamp ts = cc->InputTimestamp();

// ========== 1. 零拷贝:共享数据 ==========
auto shared_detections = std::make_shared<std::vector<Detection>>(detections);

// ========== 2. 渲染输出 ==========
auto rendered = RenderDetections(input, detections);
cc->Outputs().Tag("RENDERED").AddPacket(
MakePacket<ImageFrame>(rendered).At(ts));

// ========== 3. JSON 输出(数据拷贝)==========
std::string json = DetectionsToJSON(detections);
cc->Outputs().Tag("JSON").AddPacket(
MakePacket<std::string>(json).At(ts));

// ========== 4. CAN 输出(数据拷贝)==========
CANFrame can = DetectionsToCAN(detections);
cc->Outputs().Tag("CAN").AddPacket(
MakePacket<CANFrame>(can).At(ts));

return absl::OkStatus();
}

private:
ImageFrame RenderDetections(const ImageFrame& input,
const std::vector<Detection>& detections) {
cv::Mat image = formats::MatView(&input).clone();

// 渲染...

// 返回新的 ImageFrame(数据拷贝,因为需要修改)
return ImageFrame(...);
}
};

// 优化说明:
// - 渲染输出:需要修改图像,必须拷贝
// - JSON 输出:需要字符串化,必须拷贝
// - CAN 输出:需要格式化,必须拷贝
// - 检测数据:可以共享

三十一、总结

技术 说明 适用场景
Packet 传递 直接传递,不复制 简单传递
Lambda 持有 持有数据所有权 图像处理
内存池 预分配,复用 频繁分配
原地处理 避免中间拷贝 图像处理
共享指针 多输出共享 数据共享

下篇预告

MediaPipe 系列 25:错误处理 Calculator——异常恢复

深入讲解如何处理异常:错误检测、恢复策略、重试机制。


参考资料

  1. Google AI Edge. Performance Optimization
  2. Google AI Edge. Zero-Copy Design
  3. cppreference.com. std::shared_ptr

系列进度: 24/55
更新时间: 2026-03-12


MediaPipe 系列 24:性能优化 Calculator——零拷贝设计完整指南
https://dapalm.com/2026/03/13/MediaPipe系列24-性能优化Calculator:零拷贝设计/
作者
Mars
发布于
2026年3月13日
许可协议