MediaPipe 系列 20:多路输出 Calculator——一个输入多个输出完整指南

前言:为什么需要多路输出?

20.1 多路输出的重要性

复杂感知流水线需要将数据分发到多个下游处理分支:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
┌─────────────────────────────────────────────────────────────────────────┐
│ 多路输出的重要性 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 问题:如何将一个处理结果分发到多个下游? │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ IMS DMS 场景: │ │
│ │ │ │
│ │ 人脸检测结果需要同时: │ │
│ │ • 渲染显示(可视化分支) │ │
│ │ • 关键点检测(处理分支) │ │
│ │ • 日志记录(存储分支) │ │
│ │ • CAN 发送(通信分支) │ │
│ │ │ │
│ │ 一个输入 → 多个输出 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ 解决方案:多路输出 Calculator │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ ┌──▶ Output 1(检测框) │ │
│ │ Input ──▶ Calculator ─┼──▶ Output 2(关键点) │ │
│ │ └──▶ Output 3(分数) │ │
│ │ │ │
│ │ 优点: │ │
│ │ • 减少重复计算 │ │
│ │ • 简化 Graph 结构 │ │
│ │ • 提高性能 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘

20.2 应用场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
┌─────────────────────────────────────────────────────────────┐
│ 多路输出应用场景 │
├─────────────────────────────────────────────────────────────┤
│ │
1. 检测结果分发 │
│ ┌─────────────────────────────────────────────┐ │
│ │ 输入:图像 │ │
│ │ 输出:检测框 + 关键点 + 分数 │ │
│ │ │ │
│ │ ┌──▶ 检测框 → 渲染器 │ │
│ │ Input ─┼──▶ 关键点 → 姿态分析 │ │
│ │ └──▶ 分数 → 过滤器 │ │
│ └─────────────────────────────────────────────┘ │
│ │
2. 疲劳分析结果 │
│ ┌─────────────────────────────────────────────┐ │
│ │ 输入:眼睛状态 │ │
│ │ 输出:眨眼频率 + PERCLOS + 疲劳等级 │ │
│ │ │ │
│ │ ┌──▶ 眨眼频率 → 统计 │ │
│ │ Input ─┼──▶ PERCLOS → 疲劳判断 │ │
│ │ └──▶ 疲劳等级 → 告警 │ │
│ └─────────────────────────────────────────────┘ │
│ │
3. 结果输出 │
│ ┌─────────────────────────────────────────────┐ │
│ │ 输入:检测结果 │ │
│ │ 输出:可视化 + JSON + CAN + 日志 │ │
│ │ │ │
│ │ ┌──▶ 可视化 → 显示器 │ │
│ │ ┌──▶ JSON → 上位机 │ │
│ │ Input ─┼──▶ CAN → 车辆网络 │ │
│ │ └──▶ 日志 → 存储系统 │ │
│ └─────────────────────────────────────────────┘ │
│ │
4. 多尺度处理 │
│ ┌─────────────────────────────────────────────┐ │
│ │ 输入:图像 │ │
│ │ 输出:原图 + 缩放图1 + 缩放图2 │ │
│ │ │ │
│ │ ┌──▶ 原图 → 精细处理 │ │
│ │ Input ─┼──▶ 缩放图1 → 中等处理 │ │
│ │ └──▶ 缩放图2 → 快速处理 │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

二十一、多输出端口定义

21.1 使用 Tag

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// multi_output_calculator.h
#ifndef MEDIAPIPE_CALCULATORS_CORE_MULTI_OUTPUT_CALCULATOR_H_
#define MEDIAPIPE_CALCULATORS_CORE_MULTI_OUTPUT_CALCULATOR_H_

#include "mediapipe/framework/calculator_framework.h"

namespace mediapipe {

// ========== 多路输出 Calculator ==========
class MultiOutputCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
// ========== 输入 ==========
cc->Inputs().Tag("IMAGE").Set<ImageFrame>();

// ========== 多个输出(使用 Tag)==========
cc->Outputs().Tag("DETECTIONS").Set<std::vector<Detection>>();
cc->Outputs().Tag("LANDMARKS").Set<std::vector<Landmark>>();
cc->Outputs().Tag("SCORES").Set<std::vector<float>>();
cc->Outputs().Tag("FEATURES").Set<std::vector<float>>();

return absl::OkStatus();
}

absl::Status Open(CalculatorContext* cc) override {
LOG(INFO) << "MultiOutputCalculator initialized";
return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
if (cc->Inputs().Tag("IMAGE").IsEmpty()) {
return absl::OkStatus();
}

const ImageFrame& image = cc->Inputs().Tag("IMAGE").Get<ImageFrame>();

// ========== 1. 处理图像 ==========
auto [detections, landmarks, scores, features] = ProcessImage(image);

// ========== 2. 多路输出 ==========
Timestamp ts = cc->InputTimestamp();

// 输出检测框
cc->Outputs().Tag("DETECTIONS").AddPacket(
MakePacket<std::vector<Detection>>(detections).At(ts));

// 输出关键点
cc->Outputs().Tag("LANDMARKS").AddPacket(
MakePacket<std::vector<Landmark>>(landmarks).At(ts));

// 输出分数
cc->Outputs().Tag("SCORES").AddPacket(
MakePacket<std::vector<float>>(scores).At(ts));

// 输出特征
cc->Outputs().Tag("FEATURES").AddPacket(
MakePacket<std::vector<float>>(features).At(ts));

return absl::OkStatus();
}

private:
std::tuple<std::vector<Detection>, std::vector<Landmark>,
std::vector<float>, std::vector<float>>
ProcessImage(const ImageFrame& image);
};

REGISTER_CALCULATOR(MultiOutputCalculator);

} // namespace mediapipe

#endif // MEDIAPIPE_CALCULATORS_CORE_MULTI_OUTPUT_CALCULATOR_H_

21.2 使用 Index

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 使用 Index 定义多输出
class MultiOutputByIndexCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
// 输入
cc->Inputs().Index(0).Set<ImageFrame>();

// 多个输出(使用 Index)
cc->Outputs().Index(0).Set<std::vector<Detection>>();
cc->Outputs().Index(1).Set<std::vector<Landmark>>();
cc->Outputs().Index(2).Set<std::vector<float>>();

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
const ImageFrame& image = cc->Inputs().Index(0).Get<ImageFrame>();
Timestamp ts = cc->InputTimestamp();

auto [detections, landmarks, scores] = ProcessImage(image);

cc->Outputs().Index(0).AddPacket(
MakePacket<std::vector<Detection>>(detections).At(ts));
cc->Outputs().Index(1).AddPacket(
MakePacket<std::vector<Landmark>>(landmarks).At(ts));
cc->Outputs().Index(2).AddPacket(
MakePacket<std::vector<float>>(scores).At(ts));

return absl::OkStatus();
}
};

21.3 Graph 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 使用 Tag 的配置
node {
calculator: "MultiOutputCalculator"
input_stream: "IMAGE:image"
output_stream: "DETECTIONS:detections"
output_stream: "LANDMARKS:landmarks"
output_stream: "SCORES:scores"
output_stream: "FEATURES:features"
}

# 使用 Index 的配置
node {
calculator: "MultiOutputByIndexCalculator"
input_stream: "image"
output_stream: "detections"
output_stream: "landmarks"
output_stream: "scores"
}

# 分发到不同分支
node {
calculator: "DetectionRenderer"
input_stream: "DETECTIONS:detections"
output_stream: "RENDERED:rendered"
}

node {
calculator: "LandmarkAnalyzer"
input_stream: "LANDMARKS:landmarks"
output_stream: "ANALYSIS:analysis"
}

node {
calculator: "ScoreFilter"
input_stream: "SCORES:scores"
output_stream: "FILTERED:filtered"
}

二十二、分发策略

22.1 广播(相同数据)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// ========== 广播分发:所有输出相同数据 ==========

class BroadcastCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Tag("INPUT").Set<Data>();

// 多个输出
for (int i = 0; i < cc->Outputs().NumEntries(); ++i) {
cc->Outputs().Index(i).Set<Data>();
}

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
if (cc->Inputs().Tag("INPUT").IsEmpty()) {
return absl::OkStatus();
}

const Data& data = cc->Inputs().Tag("INPUT").Get<Data>();

// 创建一次 Packet
auto packet = MakePacket<Data>(data).At(cc->InputTimestamp());

// 广播到所有输出
for (int i = 0; i < cc->Outputs().NumEntries(); ++i) {
cc->Outputs().Index(i).AddPacket(packet);
}

return absl::OkStatus();
}
};

22.2 拆分(不同数据)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// ========== 拆分分发:不同输出不同数据 ==========

class SplitterCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Tag("INPUT").Set<CombinedData>();

cc->Outputs().Tag("PART_A").Set<DataA>();
cc->Outputs().Tag("PART_B").Set<DataB>();
cc->Outputs().Tag("PART_C").Set<DataC>();

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
if (cc->Inputs().Tag("INPUT").IsEmpty()) {
return absl::OkStatus();
}

const CombinedData& combined =
cc->Inputs().Tag("INPUT").Get<CombinedData>();

Timestamp ts = cc->InputTimestamp();

// 拆分到不同输出
cc->Outputs().Tag("PART_A").AddPacket(
MakePacket<DataA>(combined.part_a()).At(ts));

cc->Outputs().Tag("PART_B").AddPacket(
MakePacket<DataB>(combined.part_b()).At(ts));

cc->Outputs().Tag("PART_C").AddPacket(
MakePacket<DataC>(combined.part_c()).At(ts));

return absl::OkStatus();
}
};

22.3 条件分发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// ========== 条件分发:根据条件选择输出 ==========

class ConditionalDistributorCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Tag("DATA").Set<Data>();

cc->Outputs().Tag("OUTPUT_A").Set<DataA>();
cc->Outputs().Tag("OUTPUT_B").Set<DataB>();
cc->Outputs().Tag("OUTPUT_C").Set<DataC>();

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
if (cc->Inputs().Tag("DATA").IsEmpty()) {
return absl::OkStatus();
}

const Data& data = cc->Inputs().Tag("DATA").Get<Data>();
Timestamp ts = cc->InputTimestamp();

// 根据条件分发
if (data.HasA()) {
cc->Outputs().Tag("OUTPUT_A").AddPacket(
MakePacket<DataA>(data.GetA()).At(ts));
}

if (data.HasB()) {
cc->Outputs().Tag("OUTPUT_B").AddPacket(
MakePacket<DataB>(data.GetB()).At(ts));
}

if (data.HasC()) {
cc->Outputs().Tag("OUTPUT_C").AddPacket(
MakePacket<DataC>(data.GetC()).At(ts));
}

return absl::OkStatus();
}
};

二十三、可选输出

23.1 动态输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// ========== 可选输出:某些输出可能没有数据 ==========

class DynamicOutputCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Tag("DETECTIONS").Set<std::vector<Detection>>();

// 定义可选输出
cc->Outputs().Tag("FACES").Set<std::vector<Detection>>().Optional();
cc->Outputs().Tag("OBJECTS").Set<std::vector<Detection>>().Optional();
cc->Outputs().Tag("PERSONS").Set<std::vector<Detection>>().Optional();

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
if (cc->Inputs().Tag("DETECTIONS").IsEmpty()) {
return absl::OkStatus();
}

const auto& all_detections =
cc->Inputs().Tag("DETECTIONS").Get<std::vector<Detection>>();

Timestamp ts = cc->InputTimestamp();

// 分类检测
std::vector<Detection> faces;
std::vector<Detection> objects;
std::vector<Detection> persons;

for (const auto& det : all_detections) {
if (det.label() == "face") {
faces.push_back(det);
} else if (det.label() == "person") {
persons.push_back(det);
} else {
objects.push_back(det);
}
}

// 只在有数据时输出
if (!faces.empty()) {
cc->Outputs().Tag("FACES").AddPacket(
MakePacket<std::vector<Detection>>(faces).At(ts));
}

if (!objects.empty()) {
cc->Outputs().Tag("OBJECTS").AddPacket(
MakePacket<std::vector<Detection>>(objects).At(ts));
}

if (!persons.empty()) {
cc->Outputs().Tag("PERSONS").AddPacket(
MakePacket<std::vector<Detection>>(persons).At(ts));
}

return absl::OkStatus();
}
};

23.2 Graph 处理可选输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 可选输出的 Graph

# 分类器
node {
calculator: "DynamicOutputCalculator"
input_stream: "DETECTIONS:detections"
output_stream: "FACES:faces"
output_stream: "OBJECTS:objects"
output_stream: "PERSONS:persons"
}

# 下游 Calculator 需要处理空输入
node {
calculator: "FaceProcessor"
input_stream: "FACES:faces"
output_stream: "RESULT:face_result"
# 当 faces 为空时,输出为空
}

# 使用 Gate 控制
node {
calculator: "PresenceCalculator"
input_stream: "faces"
output_stream: "HAS_FACES:has_faces"
}

node {
calculator: "GateCalculator"
input_stream: "faces"
input_stream: "ALLOW:has_faces"
output_stream: "valid_faces"
}

二十四、IMS 实战:DMS 多路输出

24.1 完整实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
// dms_output_calculator.h
#ifndef MEDIAPIPE_CALCULATORS_IMS_DMS_OUTPUT_CALCULATOR_H_
#define MEDIAPIPE_CALCULATORS_IMS_DMS_OUTPUT_CALCULATOR_H_

#include "mediapipe/framework/calculator_framework.h"

namespace mediapipe {

// ========== Proto Options ==========
/*
syntax = "proto3";
package mediapipe;

message DMSOutputOptions {
optional bool enable_visualization = 1 [default = true];
optional bool enable_json = 2 [default = true];
optional bool enable_can = 3 [default = true];
optional bool enable_log = 4 [default = true];
optional float alert_threshold = 5 [default = 0.7];
}
*/

// ========== 告警信息 ==========
message AlertInfo {
int64 timestamp_ms = 1;
bool is_fatigued = 2;
bool is_distracted = 3;
int32 gaze_zone = 4;
float fatigue_score = 5;
float distraction_score = 6;
int32 alert_level = 7; // 0=无, 1=轻度, 2=中度, 3=重度
string alert_message = 8;
}

// ========== 可视化数据 ==========
message VizData {
float fatigue_score = 1;
float distraction_score = 2;
int32 gaze_zone = 3;
HeadPose head_pose = 4;
EyeState eye_state = 5;
repeated Detection detections = 6;
repeated Landmark landmarks = 7;
}

// ========== CAN 消息 ==========
message CANMessage {
uint32 can_id = 1;
uint32 dlc = 2; // 数据长度
bytes data = 3; // 最多 8 字节
}

// ========== DMS 多路输出 Calculator ==========
class DMSOutputCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
// ========== 输入 ==========
cc->Inputs().Tag("FATIGUE_SCORE").Set<float>().Optional();
cc->Inputs().Tag("DISTRACTION_SCORE").Set<float>().Optional();
cc->Inputs().Tag("GAZE_ZONE").Set<int>().Optional();
cc->Inputs().Tag("HEAD_POSE").Set<HeadPose>().Optional();
cc->Inputs().Tag("EYE_STATE").Set<EyeState>().Optional();
cc->Inputs().Tag("DETECTIONS").Set<std::vector<Detection>>().Optional();
cc->Inputs().Tag("LANDMARKS").Set<std::vector<Landmark>>().Optional();

// ========== 多路输出 ==========
cc->Outputs().Tag("ALERT").Set<AlertInfo>();
cc->Outputs().Tag("VISUALIZATION").Set<VizData>();
cc->Outputs().Tag("JSON").Set<std::string>();
cc->Outputs().Tag("CAN_MESSAGE").Set<CANMessage>();
cc->Outputs().Tag("DEBUG").Set<std::string>();

cc->Options<DMSOutputOptions>();
return absl::OkStatus();
}

absl::Status Open(CalculatorContext* cc) override {
const auto& options = cc->Options<DMSOutputOptions>();

enable_visualization_ = options.enable_visualization();
enable_json_ = options.enable_json();
enable_can_ = options.enable_can();
enable_log_ = options.enable_log();
alert_threshold_ = options.alert_threshold();

LOG(INFO) << "DMSOutputCalculator initialized: "
<< "viz=" << enable_visualization_
<< ", json=" << enable_json_
<< ", can=" << enable_can_;

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
Timestamp ts = cc->InputTimestamp();

// ========== 1. 收集输入数据 ==========
float fatigue = GetOptionalInput<float>(cc, "FATIGUE_SCORE", 0.0f);
float distraction = GetOptionalInput<float>(cc, "DISTRACTION_SCORE", 0.0f);
int gaze_zone = GetOptionalInput<int>(cc, "GAZE_ZONE", 0);

HeadPose pose;
if (!cc->Inputs().Tag("HEAD_POSE").IsEmpty()) {
pose = cc->Inputs().Tag("HEAD_POSE").Get<HeadPose>();
}

EyeState eye;
if (!cc->Inputs().Tag("EYE_STATE").IsEmpty()) {
eye = cc->Inputs().Tag("EYE_STATE").Get<EyeState>();
}

std::vector<Detection> detections;
if (!cc->Inputs().Tag("DETECTIONS").IsEmpty()) {
detections = cc->Inputs().Tag("DETECTIONS").Get<std::vector<Detection>>();
}

std::vector<Landmark> landmarks;
if (!cc->Inputs().Tag("LANDMARKS").IsEmpty()) {
landmarks = cc->Inputs().Tag("LANDMARKS").Get<std::vector<Landmark>>();
}

// ========== 2. 生成告警信息 ==========
AlertInfo alert = GenerateAlert(fatigue, distraction, gaze_zone, ts);
cc->Outputs().Tag("ALERT").AddPacket(
MakePacket<AlertInfo>(alert).At(ts));

// ========== 3. 生成可视化数据 ==========
if (enable_visualization_) {
VizData viz;
viz.set_fatigue_score(fatigue);
viz.set_distraction_score(distraction);
viz.set_gaze_zone(gaze_zone);
*viz.mutable_head_pose() = pose;
*viz.mutable_eye_state() = eye;

for (const auto& det : detections) {
*viz.add_detections() = det;
}
for (const auto& lm : landmarks) {
*viz.add_landmarks() = lm;
}

cc->Outputs().Tag("VISUALIZATION").AddPacket(
MakePacket<VizData>(viz).At(ts));
}

// ========== 4. 生成 JSON 输出 ==========
if (enable_json_) {
std::string json = GenerateJSON(fatigue, distraction, gaze_zone,
pose, eye, ts);
cc->Outputs().Tag("JSON").AddPacket(
MakePacket<std::string>(json).At(ts));
}

// ========== 5. 生成 CAN 消息 ==========
if (enable_can_) {
CANMessage can_msg = GenerateCANMessage(alert);
cc->Outputs().Tag("CAN_MESSAGE").AddPacket(
MakePacket<CANMessage>(can_msg).At(ts));
}

// ========== 6. 生成调试信息 ==========
std::string debug = GenerateDebugInfo(fatigue, distraction, gaze_zone);
cc->Outputs().Tag("DEBUG").AddPacket(
MakePacket<std::string>(debug).At(ts));

return absl::OkStatus();
}

private:
// ========== 配置 ==========
bool enable_visualization_ = true;
bool enable_json_ = true;
bool enable_can_ = true;
bool enable_log_ = true;
float alert_threshold_ = 0.7f;

// ========== 辅助方法 ==========
template <typename T>
T GetOptionalInput(CalculatorContext* cc, const std::string& tag, T default_val) {
if (cc->Inputs().Tag(tag).IsEmpty()) {
return default_val;
}
return cc->Inputs().Tag(tag).Get<T>();
}

AlertInfo GenerateAlert(float fatigue, float distraction, int gaze_zone,
Timestamp ts) {
AlertInfo alert;
alert.set_timestamp_ms(ts.Value() / 1000);
alert.set_fatigue_score(fatigue);
alert.set_distraction_score(distraction);
alert.set_gaze_zone(gaze_zone);

// 判断告警等级
int level = 0;
std::string message;

if (fatigue > alert_threshold_ && distraction > alert_threshold_) {
level = 3; // 重度
message = "严重疲劳分心,请立即停车休息!";
} else if (fatigue > alert_threshold_) {
level = 2; // 中度
message = "检测到疲劳,请注意休息";
} else if (distraction > alert_threshold_) {
level = 2; // 中度
message = "检测到分心,请集中注意力";
} else if (fatigue > alert_threshold_ * 0.7 ||
distraction > alert_threshold_ * 0.7) {
level = 1; // 轻度
message = "状态异常,请注意";
}

alert.set_is_fatigued(fatigue > alert_threshold_);
alert.set_is_distracted(distraction > alert_threshold_);
alert.set_alert_level(level);
alert.set_alert_message(message);

return alert;
}

std::string GenerateJSON(float fatigue, float distraction, int gaze_zone,
const HeadPose& pose, const EyeState& eye,
Timestamp ts) {
std::ostringstream oss;
oss << "{"
<< "\"timestamp\":" << ts.Value() / 1000 << ","
<< "\"fatigue\":" << fatigue << ","
<< "\"distraction\":" << distraction << ","
<< "\"gaze_zone\":" << gaze_zone << ","
<< "\"head_pose\":{"
<< "\"yaw\":" << pose.yaw() << ","
<< "\"pitch\":" << pose.pitch() << ","
<< "\"roll\":" << pose.roll() << "},"
<< "\"eye_state\":{"
<< "\"left_open\":" << eye.left_eye_open() << ","
<< "\"right_open\":" << eye.right_eye_open() << "}"
<< "}";
return oss.str();
}

CANMessage GenerateCANMessage(const AlertInfo& alert) {
CANMessage msg;
msg.set_can_id(0x18FFA027); // DMS 告警 ID
msg.set_dlc(8);

// 编码数据
uint8_t data[8] = {0};
data[0] = static_cast<uint8_t>(alert.alert_level());
data[1] = static_cast<uint8_t>(alert.is_fatigued() ? 1 : 0);
data[2] = static_cast<uint8_t>(alert.is_distracted() ? 1 : 0);
data[3] = static_cast<uint8_t>(alert.gaze_zone());
// 疲劳分数(0-100 映射到 0-255)
data[4] = static_cast<uint8_t>(alert.fatigue_score() * 255);
data[5] = static_cast<uint8_t>(alert.distraction_score() * 255);

msg.set_data(std::string(reinterpret_cast<char*>(data), 8));

return msg;
}

std::string GenerateDebugInfo(float fatigue, float distraction, int gaze_zone) {
std::ostringstream oss;
oss << "fatigue=" << std::fixed << std::setprecision(2) << fatigue
<< ", distraction=" << distraction
<< ", gaze_zone=" << gaze_zone;
return oss.str();
}
};

REGISTER_CALCULATOR(DMSOutputCalculator);

} // namespace mediapipe

#endif // MEDIAPIPE_CALCULATORS_IMS_DMS_OUTPUT_CALCULATOR_H_

24.2 Graph 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# ims_dms_output_graph.pbtxt

input_stream: "IR_IMAGE:ir_image"
output_stream: "ALERT:alert"
output_stream: "OUTPUT_IMAGE:output_image"

# ========== 前置处理 ==========
node {
calculator: "FaceDetector"
input_stream: "IMAGE:ir_image"
output_stream: "DETECTIONS:detections"
}

node {
calculator: "LandmarkDetector"
input_stream: "IMAGE:ir_image"
input_stream: "DETECTIONS:detections"
output_stream: "LANDMARKS:landmarks"
}

node {
calculator: "FatigueAnalyzer"
input_stream: "LANDMARKS:landmarks"
output_stream: "FATIGUE_SCORE:fatigue"
}

node {
calculator: "DistractionAnalyzer"
input_stream: "LANDMARKS:landmarks"
output_stream: "DISTRACTION_SCORE:distraction"
output_stream: "GAZE_ZONE:gaze_zone"
}

# ========== 多路输出 ==========
node {
calculator: "DMSOutputCalculator"
input_stream: "FATIGUE_SCORE:fatigue"
input_stream: "DISTRACTION_SCORE:distraction"
input_stream: "GAZE_ZONE:gaze_zone"
input_stream: "DETECTIONS:detections"
input_stream: "LANDMARKS:landmarks"
output_stream: "ALERT:alert"
output_stream: "VISUALIZATION:viz"
output_stream: "JSON:json"
output_stream: "CAN_MESSAGE:can_msg"
output_stream: "DEBUG:debug"
options {
[mediapipe.DMSOutputOptions.ext] {
enable_visualization: true
enable_json: true
enable_can: true
alert_threshold: 0.7
}
}
}

# ========== 可视化分支 ==========
node {
calculator: "OverlayRenderer"
input_stream: "IMAGE:ir_image"
input_stream: "DATA:viz"
output_stream: "OUTPUT_IMAGE:output_image"
}

# ========== CAN 发送分支 ==========
node {
calculator: "CANPublisher"
input_stream: "MESSAGE:can_msg"
}

# ========== 日志分支 ==========
node {
calculator: "JSONLogger"
input_stream: "JSON:json"
}

二十五、性能优化

25.1 零拷贝分发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// ========== 零拷贝分发 ==========
// 使用 shared_ptr 避免数据拷贝

class ZeroCopyDistributor : public CalculatorBase {
public:
absl::Status Process(CalculatorContext* cc) override {
const auto& data = cc->Inputs().Index(0).Get<std::shared_ptr<Data>>();

// 共享指针传递,无拷贝
auto packet = MakePacket<std::shared_ptr<Data>>(data).At(cc->InputTimestamp());

for (int i = 0; i < cc->Outputs().NumEntries(); ++i) {
cc->Outputs().Index(i).AddPacket(packet);
}

return absl::OkStatus();
}
};

25.2 延迟计算

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// ========== 延迟计算:只在需要时生成输出 ==========
class LazyOutputCalculator : public CalculatorBase {
public:
absl::Status Process(CalculatorContext* cc) override {
// 检查哪些输出有下游连接
if (cc->Outputs().HasTag("OUTPUT_A") &&
cc->Outputs().Tag("OUTPUT_A").IsConnected()) {
// 只有连接时才计算
auto result_a = ComputeExpensiveA();
cc->Outputs().Tag("OUTPUT_A").AddPacket(
MakePacket<DataA>(result_a).At(cc->InputTimestamp()));
}

if (cc->Outputs().HasTag("OUTPUT_B") &&
cc->Outputs().Tag("OUTPUT_B").IsConnected()) {
auto result_b = ComputeExpensiveB();
cc->Outputs().Tag("OUTPUT_B").AddPacket(
MakePacket<DataB>(result_b).At(cc->InputTimestamp()));
}

return absl::OkStatus();
}
};

二十六、总结

输出方式 说明 应用场景
广播 所有输出相同数据 日志、监控
拆分 不同输出不同数据 分支处理
条件输出 部分输出有数据 动态处理
可选输出 输出可能为空 灵活配置

下篇预告

MediaPipe 系列 21:外部数据注入 Calculator——传感器数据接入

深入讲解如何接入外部数据:传感器、CAN 总线、音频流。


参考资料

  1. Google AI Edge. Calculator Outputs
  2. Google AI Edge. Packet Distribution
  3. Google AI Edge. Zero-Copy Design

系列进度: 20/55
更新时间: 2026-03-12


MediaPipe 系列 20:多路输出 Calculator——一个输入多个输出完整指南
https://dapalm.com/2026/03/13/MediaPipe系列20-多路输出Calculator:一个输入多个输出/
作者
Mars
发布于
2026年3月13日
许可协议