MediaPipe 系列 08:Input/Output Stream 与 Port 完整指南

前言:Port 机制的核心地位

8.1 Port 在数据流中的作用

Port 是 Calculator 与外部数据流的接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
┌─────────────────────────────────────────────────────────────────────────┐
Port 在数据流中的作用 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 问题:Calculator 如何与外部数据流交互? │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 挑战: │ │
│ │ │ │
│ │ • 如何接收输入数据? │ │
│ │ • 如何发送输出数据? │ │
│ │ • 如何处理多个输入输出? │ │
│ │ • 如何区分不同类型的数据? │ │
│ │ • 如何处理可选输入? │ │
│ │ • 如何动态调整端口? │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ 解决方案:Port 机制 │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Port = 端口 = Calculator 的输入输出接口 │ │
│ │ │ │
│ │ • Input Port:接收 Stream 数据 │ │
│ │ • Output Port:发送 Stream 数据 │ │
│ │ • Side Input Port:接收 Side Packet │ │
│ │ • Side Output Port:发送 Side Packet │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ 数据流动示意图: │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Stream A ──────┐ │ │
│ │ │ │ │
│ │ Stream B ──────┼──▶ [Calculator] ──▶ Stream D │ │
│ │ │ │ │ │
│ │ Stream C ──────┘ └──▶ Stream E │ │
│ │ │ │
│ │ 输入 Port: A, B, C │ │
│ │ 输出 Port: D, E │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘

8.2 Port 的核心特性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
┌─────────────────────────────────────────────────────────────┐
│ Port 核心特性 │
├─────────────────────────────────────────────────────────────┤
│ │
1. 类型安全 │
│ • 每个端口有明确的类型 │
│ • 编译时检查类型匹配 │
│ • 避免运行时类型错误 │
│ │
2. 灵活命名 │
│ • Tag 方式:可读性好(推荐) │
│ • Index 方式:简洁快速 │
│ • 混合方式:灵活组合 │
│ │
3. 可选性 │
│ • 支持可选输入输出 │
│ • 自动处理缺失数据 │
│ • 提供默认值机制 │
│ │
4. 动态性 │
│ • 支持动态端口数量 │
│ • 运行时决定端口类型 │
│ • 适应不同输入场景 │
│ │
└─────────────────────────────────────────────────────────────┘

九、Port 类型详解

9.1 Port 类型分类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
┌─────────────────────────────────────────────────────────────┐
│ Port 类型分类 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Input Port(输入端口) │ │
│ │ │ │
│ │ • 接收 Stream 数据 │ │
│ │ • 支持时间戳同步 │ │
│ │ • 可以有多个输入 │ │
│ │ │ │
│ │ 访问方式: │ │
│ │ cc->Inputs().Tag("NAME") │ │
│ │ cc->Inputs().Index(0) │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Output Port(输出端口) │ │
│ │ │ │
│ │ • 发送 Stream 数据 │ │
│ │ • 支持多路输出 │ │
│ │ • 可以有多个输出 │ │
│ │ │ │
│ │ 访问方式: │ │
│ │ cc->Outputs().Tag("NAME") │ │
│ │ cc->Outputs().Index(0) │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Side Input Port(静态输入端口) │ │
│ │ │ │
│ │ • 接收 Side Packet │ │
│ │ • 无时间戳 │ │
│ │ • Graph 生命周期内不变 │ │
│ │ │ │
│ │ 访问方式: │ │
│ │ cc->InputSidePackets().Tag("NAME") │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Side Output Port(静态输出端口) │ │
│ │ │ │
│ │ • 发送 Side Packet │ │
│ │ • 无时间戳 │ │
│ │ • 用于传递状态 │ │
│ │ │ │
│ │ 访问方式: │ │
│ │ cc->OutputSidePackets().Tag("NAME") │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

9.2 Port 类型系统

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// ========== 端口类型定义 ==========
// mediapipe/framework/calculator_contract.h

class PortBase {
public:
// 设置端口类型
template <typename T>
void Set();

// 设置任意类型
void SetAny();

// 设置与另一个端口相同类型
void SetSameAs(const PortBase* other);

// 设置为可选
void Optional();

// 获取端口类型
const TypeInfo& GetType() const;

// 检查是否为空
bool IsEmpty() const;

// 检查是否为可选
bool IsOptional() const;
};

// ========== 输入端口 ==========
class InputPort : public PortBase {
public:
// 获取输入数据
template <typename T>
const T& Get() const;

// 获取 Packet
const Packet& Value() const;

// 检查是否为空
bool IsEmpty() const;

// 获取时间戳
Timestamp Timestamp() const;
};

// ========== 输出端口 ==========
class OutputPort : public PortBase {
public:
// 添加 Packet
void AddPacket(const Packet& packet);

// 添加数据(简化接口)
template <typename T>
void Add(const T& value, Timestamp timestamp);

// 设置输出 Packet 数量
void SetHeader(const Packet& header);
};

十、Tag 方式详解(推荐)

10.1 Tag 方式的优势

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
┌─────────────────────────────────────────────────────────────┐
Tag 方式的优势 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 可读性 │
│ • 端口名称清晰明了 │
│ • 代码更易理解 │
│ • 维护更方便 │
│ │
2. 灵活性 │
│ • 可以添加/删除端口而不影响其他端口 │
│ • 端口顺序无关 │
│ • 更容易重构 │
│ │
3. 错误检测 │
│ • 编译时检查 Tag 是否存在 │
│ • 运行时检查类型匹配 │
│ • 更早发现问题 │
│ │
│ 示例: │
│ ┌─────────────────────────────────────────────┐ │
│ │ cc->Inputs().Tag("IMAGE").Set<ImageFrame>(); │ │
│ │ cc->Inputs().Tag("ROI").Set<Rect>(); │ │
│ │ │ │
│ │ 清晰明了:IMAGE 对应图像,ROI 对应区域 │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

10.2 Tag 命名规范

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// ========== Tag 命名规范 ==========

// 1. 使用大写字母和下划线
cc->Inputs().Tag("IMAGE").Set<ImageFrame>(); // ✅
cc->Inputs().Tag("DETECTION_RESULT").Set<Detection>(); // ✅

// 2. 避免使用缩写(除非是通用缩写)
cc->Inputs().Tag("IMAGE").Set<ImageFrame>(); // ✅
cc->Inputs().Tag("IMG").Set<ImageFrame>(); // ❌ 不清晰

// 3. 使用语义化名称
cc->Inputs().Tag("LEFT_EYE").Set<Landmark>(); // ✅
cc->Inputs().Tag("RIGHT_EYE").Set<Landmark>(); // ✅

// 4. 保持一致性
// 同一类型的端口使用相同的 Tag
cc->Inputs().Tag("IMAGE").Set<ImageFrame>(); // ✅
cc->Outputs().Tag("IMAGE").Set<ImageFrame>(); // ✅ 相同 Tag

// ========== Graph 配置中的 Tag ==========
// pbtxt 中的 Tag 必须与 C++ 代码中的 Tag 匹配

// C++ 定义
cc->Inputs().Tag("IMAGE").Set<ImageFrame>();

// pbtxt 配置
node {
calculator: "MyCalculator"
input_stream: "IMAGE:video_stream" # Tag: IMAGE, Stream: video_stream
output_stream: "OUTPUT:result" # Tag: OUTPUT, Stream: result
}

10.3 Tag 方式完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// ========== Tag 方式完整示例 ==========

// ========== Calculator 头文件 ==========
#ifndef MY_CALCULATOR_H_
#define MY_CALCULATOR_H_

#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/framework/formats/image_frame.h"

namespace mediapipe {

class MyCalculator : public CalculatorBase {
public:
// ========== GetContract:定义端口 ==========
static absl::Status GetContract(CalculatorContract* cc) {
// 输入端口(使用 Tag)
cc->Inputs().Tag("IMAGE").Set<ImageFrame>();
cc->Inputs().Tag("ROI").Set<Rect>();
cc->Inputs().Tag("CONFIG").Set<Config>();

// 可选输入
cc->Inputs().Tag("DEBUG").Set<DebugInfo>().Optional();

// 输出端口(使用 Tag)
cc->Outputs().Tag("DETECTIONS").Set<std::vector<Detection>>();
cc->Outputs().Tag("LANDMARKS").Set<std::vector<Landmark>>();
cc->Outputs().Tag("SCORES").Set<std::vector<float>>();

// 可选输出
cc->Outputs().Tag("DEBUG_IMAGE").Set<ImageFrame>().Optional();

// Side Packet
cc->InputSidePackets().Tag("MODEL_PATH").Set<std::string>();

// Options
cc->Options<MyCalculatorOptions>();

return absl::OkStatus();
}

// ========== Open:初始化 ==========
absl::Status Open(CalculatorContext* cc) override {
const auto& options = cc->Options<MyCalculatorOptions>();
threshold_ = options.threshold();

// 获取 Side Packet
model_path_ = cc->InputSidePackets().Tag("MODEL_PATH").Get<std::string>();

// 加载模型
MP_RETURN_IF_ERROR(LoadModel(model_path_));

return absl::OkStatus();
}

// ========== Process:处理数据 ==========
absl::Status Process(CalculatorContext* cc) override {
// 检查必需输入
if (cc->Inputs().Tag("IMAGE").IsEmpty()) {
return absl::OkStatus();
}

// 获取输入数据
const ImageFrame& image = cc->Inputs().Tag("IMAGE").Get<ImageFrame>();

// 检查可选输入
Rect roi;
if (!cc->Inputs().Tag("ROI").IsEmpty()) {
roi = cc->Inputs().Tag("ROI").Get<Rect>();
} else {
roi = Rect(0, 0, image.Width(), image.Height()); // 默认值
}

// 获取配置
Config config;
if (!cc->Inputs().Tag("CONFIG").IsEmpty()) {
config = cc->Inputs().Tag("CONFIG").Get<Config>();
}

// 检查调试信息
if (!cc->Inputs().Tag("DEBUG").IsEmpty()) {
const auto& debug = cc->Inputs().Tag("DEBUG").Get<DebugInfo>();
// 使用调试信息
}

// 处理数据
auto [detections, landmarks, scores] = ProcessImage(image, roi, config);

// 输出结果
cc->Outputs().Tag("DETECTIONS").AddPacket(
MakePacket<std::vector<Detection>>(detections).At(cc->InputTimestamp()));

cc->Outputs().Tag("LANDMARKS").AddPacket(
MakePacket<std::vector<Landmark>>(landmarks).At(cc->InputTimestamp()));

cc->Outputs().Tag("SCORES").AddPacket(
MakePacket<std::vector<float>>(scores).At(cc->InputTimestamp()));

// 可选输出
if (cc->Outputs().HasTag("DEBUG_IMAGE") && enable_debug_) {
ImageFrame debug_image = CreateDebugImage(image, detections);
cc->Outputs().Tag("DEBUG_IMAGE").AddPacket(
MakePacket<ImageFrame>(debug_image).At(cc->InputTimestamp()));
}

return absl::OkStatus();
}

private:
float threshold_;
std::string model_path_;
bool enable_debug_ = false;

absl::Status LoadModel(const std::string& path);
std::tuple<std::vector<Detection>, std::vector<Landmark>, std::vector<float>>
ProcessImage(const ImageFrame& image, const Rect& roi, const Config& config);
};

REGISTER_CALCULATOR(MyCalculator);

} // namespace mediapipe

#endif // MY_CALCULATOR_H_

Graph 配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# my_calculator_graph.pbtxt

# 输入输出定义
input_stream: "VIDEO:video_stream"
input_stream: "ROI:roi_stream"
input_stream: "CONFIG:config_stream"
output_stream: "DETECTIONS:detections"
output_stream: "LANDMARKS:landmarks"

# Side Packet
input_side_packet: "MODEL_PATH:model_path"

# Calculator 节点
node {
calculator: "MyCalculator"
input_stream: "IMAGE:video_stream"
input_stream: "ROI:roi_stream"
input_stream: "CONFIG:config_stream"
input_side_packet: "MODEL_PATH:model_path"
output_stream: "DETECTIONS:detections"
output_stream: "LANDMARKS:landmarks"
output_stream: "SCORES:scores"

options {
[mediapipe.MyCalculatorOptions.ext] {
threshold: 0.5
}
}
}

十一、Index 方式详解

11.1 Index 方式的特点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
┌─────────────────────────────────────────────────────────────┐
│ Index 方式的特点 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 优点: │
│ • 简洁快速 │
│ • 无需定义 Tag
│ • 适用于端口数量固定且少的场景 │
│ │
│ 缺点: │
│ • 可读性差 │
│ • 顺序敏感(添加/删除端口影响索引) │
│ • 容易出错(索引混淆) │
│ │
│ 适用场景: │
│ • 端口数量少(1-2 个) │
│ • 端口顺序固定 │
│ • 快速原型开发 │
│ │
└─────────────────────────────────────────────────────────────┘

11.2 Index 方式示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// ========== Index 方式示例 ==========

static absl::Status GetContract(CalculatorContract* cc) {
// 输入端口(使用索引)
cc->Inputs().Index(0).Set<ImageFrame>(); // 第一个输入
cc->Inputs().Index(1).Set<Rect>(); // 第二个输入
cc->Inputs().Index(2).Set<Config>(); // 第三个输入

// 输出端口(使用索引)
cc->Outputs().Index(0).Set<std::vector<Detection>>(); // 第一个输出
cc->Outputs().Index(1).Set<std::vector<Landmark>>(); // 第二个输出

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
// 检查输入
if (cc->Inputs().Index(0).IsEmpty()) {
return absl::OkStatus();
}

// 获取输入数据
const ImageFrame& image = cc->Inputs().Index(0).Get<ImageFrame>();
const Rect& roi = cc->Inputs().Index(1).Get<Rect>();
const Config& config = cc->Inputs().Index(2).Get<Config>();

// 处理数据
auto [detections, landmarks] = ProcessImage(image, roi, config);

// 输出结果
cc->Outputs().Index(0).AddPacket(
MakePacket<std::vector<Detection>>(detections).At(cc->InputTimestamp()));
cc->Outputs().Index(1).AddPacket(
MakePacket<std::vector<Landmark>>(landmarks).At(cc->InputTimestamp()));

return absl::OkStatus();
}

Graph 配置:

1
2
3
4
5
6
7
8
9
# Index 方式 Graph 配置
node {
calculator: "MyCalculator"
input_stream: "video_stream" # Index 0
input_stream: "roi_stream" # Index 1
input_stream: "config_stream" # Index 2
output_stream: "detections" # Index 0
output_stream: "landmarks" # Index 1
}

十二、混合方式

12.1 Tag + Index 混合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// ========== 混合方式示例 ==========

static absl::Status GetContract(CalculatorContract* cc) {
// 主要输入使用 Tag(推荐)
cc->Inputs().Tag("IMAGE").Set<ImageFrame>();

// 其他输入使用 Index
cc->Inputs().Index(1).Set<Rect>();

// 输出使用 Tag
cc->Outputs().Tag("OUTPUT").Set<Result>();

return absl::OkStatus();
}

12.2 可选输入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// ========== 可选输入示例 ==========

static absl::Status GetContract(CalculatorContract* cc) {
// 必需输入
cc->Inputs().Tag("IMAGE").Set<ImageFrame>();

// 可选输入
cc->Inputs().Tag("ROI").Set<Rect>().Optional();
cc->Inputs().Tag("CONFIG").Set<Config>().Optional();

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
// 获取必需输入
const ImageFrame& image = cc->Inputs().Tag("IMAGE").Get<ImageFrame>();

// 处理可选输入
Rect roi;
if (!cc->Inputs().Tag("ROI").IsEmpty()) {
roi = cc->Inputs().Tag("ROI").Get<Rect>();
} else {
// 使用默认值
roi = Rect(0, 0, image.Width(), image.Height());
}

Config config;
if (!cc->Inputs().Tag("CONFIG").IsEmpty()) {
config = cc->Inputs().Tag("CONFIG").Get<Config>();
} else {
// 使用默认配置
config = GetDefaultConfig();
}

return absl::OkStatus();
}

十三、高级特性

13.1 动态端口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// ========== 动态端口示例 ==========

static absl::Status GetContract(CalculatorContract* cc) {
// 根据配置动态添加端口
const auto& options = cc->Options<MyCalculatorOptions>();

// 动态输入端口
for (int i = 0; i < options.num_inputs(); ++i) {
cc->Inputs().Index(i).Set<ImageFrame>();
}

// 动态输出端口
for (int i = 0; i < options.num_outputs(); ++i) {
cc->Outputs().Index(i).Set<Result>();
}

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
// 遍历所有输入
for (int i = 0; i < cc->Inputs().NumEntries(); ++i) {
if (!cc->Inputs().Index(i).IsEmpty()) {
const ImageFrame& image = cc->Inputs().Index(i).Get<ImageFrame>();
// 处理每个输入
}
}

return absl::OkStatus();
}

13.2 类型推断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// ========== 类型推断示例 ==========

static absl::Status GetContract(CalculatorContract* cc) {
// 接受任意类型
cc->Inputs().Tag("INPUT").SetAny();

// 输出与输入相同类型
cc->Outputs().Tag("OUTPUT").SetSameAs(&cc->Inputs().Tag("INPUT"));

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
// 使用类型推断处理数据
const Packet& input = cc->Inputs().Tag("INPUT").Value();

// 验证类型
if (input.ValidateAsType<ImageFrame>().ok()) {
const ImageFrame& image = input.Get<ImageFrame>();
// 处理图像
} else if (input.ValidateAsType<AudioFrame>().ok()) {
const AudioFrame& audio = input.Get<AudioFrame>();
// 处理音频
}

return absl::OkStatus();
}

13.3 输出延迟

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// ========== 输出延迟示例 ==========

class BufferCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Tag("INPUT").Set<Data>();
cc->Outputs().Tag("OUTPUT").Set<std::vector<Data>>();
return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
// 获取输入
const Data& data = cc->Inputs().Tag("INPUT").Get<Data>();

// 缓存数据
buffer_.push_back(data);

// 达到缓冲大小后输出
if (buffer_.size() >= buffer_size_) {
cc->Outputs().Tag("OUTPUT").AddPacket(
MakePacket<std::vector<Data>>(buffer_).At(cc->InputTimestamp()));
buffer_.clear();
}

return absl::OkStatus();
}

private:
std::vector<Data> buffer_;
int buffer_size_ = 10;
};

十四、实战:多输入融合 Calculator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// fusion_calculator.h
#ifndef MEDIAPIPE_CALCULATORS_FUSION_FUSION_CALCULATOR_H_
#define MEDIAPIPE_CALCULATORS_FUSION_FUSION_CALCULATOR_H_

#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/framework/formats/image_frame.h"

namespace mediapipe {

// ========== 融合 Calculator ==========
class MultiModalFusionCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
// 多模态输入
cc->Inputs().Tag("IMAGE").Set<ImageFrame>();
cc->Inputs().Tag("AUDIO").Set<AudioFrame>();
cc->Inputs().Tag("SENSOR").Set<SensorData>();

// 可选输入
cc->Inputs().Tag("CONTEXT").Set<Context>().Optional();

// 输出
cc->Outputs().Tag("RESULT").Set<FusionResult>();
cc->Outputs().Tag("CONFIDENCE").Set<float>();

// Side Packet
cc->InputSidePackets().Tag("CONFIG").Set<FusionConfig>();

// Options
cc->Options<FusionOptions>();

// 设置同步处理器
cc->SetInputStreamHandler("SyncSetInputStreamHandler");

return absl::OkStatus();
}

absl::Status Open(CalculatorContext* cc) override {
// 获取配置
const auto& options = cc->Options<FusionOptions>();
fusion_weights_ = {options.image_weight(),
options.audio_weight(),
options.sensor_weight()};

// 获取 Side Packet
const auto& config = cc->InputSidePackets().Tag("CONFIG").Get<FusionConfig>();
threshold_ = config.threshold();

LOG(INFO) << "MultiModalFusionCalculator initialized";

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
// 检查所有必需输入
if (cc->Inputs().Tag("IMAGE").IsEmpty() ||
cc->Inputs().Tag("AUDIO").IsEmpty() ||
cc->Inputs().Tag("SENSOR").IsEmpty()) {
return absl::OkStatus(); // 等待所有输入就绪
}

// 获取输入数据
const ImageFrame& image = cc->Inputs().Tag("IMAGE").Get<ImageFrame>();
const AudioFrame& audio = cc->Inputs().Tag("AUDIO").Get<AudioFrame>();
const SensorData& sensor = cc->Inputs().Tag("SENSOR").Get<SensorData>();

// 处理可选输入
Context context;
if (!cc->Inputs().Tag("CONTEXT").IsEmpty()) {
context = cc->Inputs().Tag("CONTEXT").Get<Context>();
}

// 特征提取
auto image_features = ExtractImageFeatures(image);
auto audio_features = ExtractAudioFeatures(audio);
auto sensor_features = ExtractSensorFeatures(sensor);

// 多模态融合
FusionResult result = Fuse(
image_features, audio_features, sensor_features,
fusion_weights_, context);

// 计算置信度
float confidence = ComputeConfidence(result);

// 输出结果
cc->Outputs().Tag("RESULT").AddPacket(
MakePacket<FusionResult>(result).At(cc->InputTimestamp()));

cc->Outputs().Tag("CONFIDENCE").AddPacket(
MakePacket<float>(confidence).At(cc->InputTimestamp()));

LOG(INFO) << "Fusion result: confidence=" << confidence;

return absl::OkStatus();
}

private:
std::vector<float> fusion_weights_;
float threshold_;

std::vector<float> ExtractImageFeatures(const ImageFrame& image);
std::vector<float> ExtractAudioFeatures(const AudioFrame& audio);
std::vector<float> ExtractSensorFeatures(const SensorData& sensor);
FusionResult Fuse(const std::vector<float>& image_features,
const std::vector<float>& audio_features,
const std::vector<float>& sensor_features,
const std::vector<float>& weights,
const Context& context);
float ComputeConfidence(const FusionResult& result);
};

REGISTER_CALCULATOR(MultiModalFusionCalculator);

} // namespace mediapipe

#endif // MEDIAPIPE_CALCULATORS_FUSION_FUSION_CALCULATOR_H_

Graph 配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# fusion_graph.pbtxt

input_stream: "IMAGE:image_stream"
input_stream: "AUDIO:audio_stream"
input_stream: "SENSOR:sensor_stream"
input_stream: "CONTEXT:context_stream"
output_stream: "RESULT:result"
output_stream: "CONFIDENCE:confidence"

input_side_packet: "CONFIG:fusion_config"

node {
calculator: "MultiModalFusionCalculator"
input_stream: "IMAGE:image_stream"
input_stream: "AUDIO:audio_stream"
input_stream: "SENSOR:sensor_stream"
input_stream: "CONTEXT:context_stream"
input_side_packet: "CONFIG:fusion_config"
output_stream: "RESULT:result"
output_stream: "CONFIDENCE:confidence"

input_stream_handler {
input_stream_handler: "SyncSetInputStreamHandler"
}

options {
[mediapipe.FusionOptions.ext] {
image_weight: 0.5
audio_weight: 0.3
sensor_weight: 0.2
}
}
}

十五、调试技巧

15.1 检查端口状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
absl::Status Process(CalculatorContext* cc) override {
// 打印所有输入状态
LOG(INFO) << "Input port status:";
for (int i = 0; i < cc->Inputs().NumEntries(); ++i) {
LOG(INFO) << " Index " << i << ": "
<< (cc->Inputs().Index(i).IsEmpty() ? "empty" : "has data");
}

// 检查特定端口
if (cc->Inputs().HasTag("IMAGE")) {
LOG(INFO) << "IMAGE port exists: "
<< (cc->Inputs().Tag("IMAGE").IsEmpty() ? "empty" : "has data");
}

return absl::OkStatus();
}

15.2 类型验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
absl::Status Process(CalculatorContext* cc) override {
const Packet& packet = cc->Inputs().Tag("INPUT").Value();

// 打印类型信息
LOG(INFO) << "Packet type: " << packet.DebugString();

// 验证类型
if (packet.ValidateAsType<ImageFrame>().ok()) {
LOG(INFO) << "Packet is ImageFrame";
} else if (packet.ValidateAsType<AudioFrame>().ok()) {
LOG(INFO) << "Packet is AudioFrame";
} else {
LOG(WARNING) << "Unknown packet type";
}

return absl::OkStatus();
}

十六、总结

方式 优点 缺点 适用场景
Tag 可读性好、易维护 需要定义 Tag 推荐使用
Index 简洁、无需 Tag 可读性差、顺序敏感 端口少且固定
混合 灵活 可能混淆 特殊需求

推荐: 优先使用 Tag 方式,保持代码清晰易维护。


下篇预告

MediaPipe 系列 09:Calculator Options——参数化配置

深入讲解 Calculator Options 定义、使用、Proto 文件编写。


参考资料

  1. Google AI Edge. MediaPipe Calculator Contract
  2. Google AI Edge. Input/Output Streams

系列进度: 8/55
更新时间: 2026-03-12


MediaPipe 系列 08:Input/Output Stream 与 Port 完整指南
https://dapalm.com/2026/03/12/MediaPipe系列08-Input-Output-Stream与Port/
作者
Mars
发布于
2026年3月12日
许可协议