MediaPipe 系列 17:数据聚合 Calculator——多流同步完整指南

前言:为什么需要多流同步?

17.1 多流同步的重要性

复杂感知流水线需要多个数据流的聚合:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
┌─────────────────────────────────────────────────────────────────────────┐
│ 多流同步的重要性 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 问题:如何聚合多个数据源? │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ IMS DMS 场景: │ │
│ │ │ │
│ │ 输入流: │ │
│ │ • IR 图像流(30 FPS) │ │
│ │ • 眼睛状态流(30 FPS) │ │
│ │ • 头部姿态流(30 FPS) │ │
│ │ • 眨眼频率流(统计值) │ │
│ │ • 车速流(CAN 总线) │ │
│ │ │ │
│ │ 需要同步聚合计算疲劳分数 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ 挑战: │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ • 不同流有不同的帧率和延迟 │ │
│ │ • 时间戳可能不完全对齐 │ │
│ │ • 某些流可能丢失数据 │ │
│ │ • 需要处理历史数据(时序分析) │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ 解决方案:多流同步 Calculator │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ • 时间戳同步:按时间戳匹配数据 │ │
│ │ • 滑动窗口:缓存历史数据 │ │
│ │ • 加权融合:多指标聚合 │ │
│ │ • 容错处理:缺失数据处理 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘

17.2 同步场景分类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
┌─────────────────────────────────────────────────────────────┐
│ 同步场景分类 │
├─────────────────────────────────────────────────────────────┤
│ │
1. 时间戳同步 │
│ ┌─────────────────────────────────────────────┐ │
│ │ Stream A: ──P1──P2──P3──P4──▶ │ │
│ │ │ │ │ │ │ │
│ │ t=0 t=1 t=2 t=3 │ │
│ │ │ │
│ │ Stream B: ──P1──P2──P3──P4──▶ │ │
│ │ │ │ │ │ │ │
│ │ t=0 t=1 t=2 t=3 │ │
│ │ │ │
│ │ 同步输出: ──(A1,B1)──(A2,B2)──▶ │ │
│ └─────────────────────────────────────────────┘ │
│ │
2. 屏障同步 │
│ ┌─────────────────────────────────────────────┐ │
│ │ 等待所有输入到达后才输出 │ │
│ │ │ │
│ │ Stream A: ──P1──────P2──▶ │ │
│ │ Stream B: ────P1──P2───▶ │ │
│ │ Stream C: ─P1──────P2───▶ │ │
│ │ │ │
│ │ 同步输出: ────(A1,B1,C1)────(A2,B2,C2)─▶ │ │
│ └─────────────────────────────────────────────┘ │
│ │
3. 异步处理 │
│ ┌─────────────────────────────────────────────┐ │
│ │ 任一输入到达就处理 │ │
│ │ │ │
│ │ Stream A: ──P1──P2──P3──▶ │ │
│ │ Stream B: ────P1────P2──▶ │ │
│ │ │ │
│ │ 输出: ──A1──B1──A2──A3──B2──▶ │ │
│ └─────────────────────────────────────────────┘ │
│ │
4. 滑动窗口 │
│ ┌─────────────────────────────────────────────┐ │
│ │ 缓存历史数据用于时序分析 │ │
│ │ │ │
│ │ 输入: ──P1──P2──P3──P4──P5──▶ │ │
│ │ │ │
│ │ 窗口: [P1] │ │
│ │ [P1,P2] │ │
│ │ [P1,P2,P3] │ │
│ │ [P2,P3,P4] │ │
│ │ [P3,P4,P5] │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

十八、MediaPipe 同步机制

18.1 时间戳系统

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// ========== MediaPipe 时间戳系统 ==========

#include "mediapipe/framework/timestamp.h"

namespace mediapipe {

// ========== Timestamp 类 ==========
class Timestamp {
public:
// 构造
explicit Timestamp(int64 value);

// 特殊时间戳
static Timestamp Unset(); // 未设置
static Timestamp Unstarted(); // 未开始
static Timestamp PreStream(); // 流开始前
static Timestamp Min(); // 最小值
static Timestamp Max(); // 最大值
static Timestamp PostStream(); // 流结束后
static Timestamp OneOverPost(); // 结束后一个单位
static Timestamp Done(); // 完成

// 时间戳值
int64 Value() const;

// 比较操作
bool operator<(const Timestamp& other) const;
bool operator<=(const Timestamp& other) const;
bool operator==(const Timestamp& other) const;

// 转换为微秒(如果时间戳单位是微秒)
int64 Microseconds() const;

// 常用时间戳
static constexpr int64 kTimestampUnset = -2;
static constexpr int64 kTimestampUnstarted = -1;
static constexpr int64 kTimestampPreStream = -1;
static constexpr int64 kTimestampMin = std::numeric_limits<int64>::min();
static constexpr int64 kTimestampMax = std::numeric_limits<int64>::max();
static constexpr int64 kTimestampPostStream = std::numeric_limits<int64>::max() - 1;
};

} // namespace mediapipe

// ========== 时间戳使用示例 ==========

// 创建时间戳
Timestamp t1(1000); // 时间戳值 1000

// 获取 Packet 时间戳
const Packet& packet = cc->Inputs().Tag("INPUT").Value();
Timestamp ts = packet.Timestamp();

// 使用时间戳创建 Packet
auto output_packet = MakePacket<Data>(data).At(Timestamp(1000));

18.2 默认同步机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
┌─────────────────────────────────────────────────────────────┐
│ 默认同步机制 │
├─────────────────────────────────────────────────────────────┤
│ │
│ MediaPipe 默认使用 SyncSetInputStreamHandler │
│ │
│ 工作原理: │
│ ┌─────────────────────────────────────────────┐ │
│ │ │ │
│ │ 1. 输入 Packet 到达 Input Stream │ │
│ │ │ │
│ │ 2. 检查所有输入流是否有相同时间戳的 Packet │ │
│ │ │ │
│ │ 3. 如果都有 → 调用 Process() │ │
│ │ 如果没有 → 缓存等待 │ │
│ │ │ │
│ │ 4. Process() 中可以访问所有输入 │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ 示例: │
│ ┌─────────────────────────────────────────────┐ │
│ │ Stream A: ──P1(t=0)──P2(t=1)──P3(t=2)─▶ │ │
│ │ │ │
│ │ Stream B: ──P1(t=0)──────P3(t=2)─────▶ │ │
│ │ (P2 丢失) │ │
│ │ │ │
│ │ 调用 Process: │ │
│ │ t=0: (A1, B1) ✓ │ │
│ │ t=1: 等待 B1...(缓存 A1) │ │
│ │ t=2: (A2 等待), (A3, B3) ✓ │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

十九、Input Stream Handler 详解

19.1 SyncSetInputStreamHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# ========== SyncSetInputStreamHandler 配置 ==========

# 默认同步策略(按时间戳匹配)
node {
calculator: "MergeCalculator"
input_stream: "A:stream_a"
input_stream: "B:stream_b"
output_stream: "OUTPUT:merged"
input_stream_handler {
input_stream_handler: "SyncSetInputStreamHandler"
options {
[mediapipe.SyncSetInputStreamHandlerOptions.ext] {
# 同步集合配置
sync_set {
tag_indices: "A"
tag_indices: "B"
}

# 是否允许缺失输入
allow_missing_inputs: false

# 缓存大小限制
max_queue_size: 100
}
}
}
}

# ========== 多同步集合 ==========
# 将输入分组,每组独立同步
node {
calculator: "MultiGroupCalculator"
input_stream: "A:stream_a"
input_stream: "B:stream_b"
input_stream: "C:stream_c"
input_stream: "D:stream_d"
output_stream: "OUTPUT:output"
input_stream_handler {
input_stream_handler: "SyncSetInputStreamHandler"
options {
[mediapipe.SyncSetInputStreamHandlerOptions.ext] {
# 集合 1:A 和 B 同步
sync_set {
tag_indices: "A"
tag_indices: "B"
}
# 集合 2:C 和 D 同步
sync_set {
tag_indices: "C"
tag_indices: "D"
}
}
}
}
}

19.2 BarrierInputStreamHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# ========== BarrierInputStreamHandler 配置 ==========

# 屏障同步:等待所有输入到达
node {
calculator: "AggregateCalculator"
input_stream: "A:stream_a"
input_stream: "B:stream_b"
input_stream: "C:stream_c"
output_stream: "OUTPUT:aggregated"
input_stream_handler {
input_stream_handler: "BarrierInputStreamHandler"
}
}

# 工作原理:
# Stream A: ──P1──────P2──▶
# Stream B: ────P1──P2───▶
# Stream C: ─P1──────P2───▶
#
# 输出: ────(A1,B1,C1)────(A2,B2,C2)─▶
# 必须等所有流都有数据才输出

19.3 AsyncInputStreamHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# ========== AsyncInputStreamHandler 配置 ==========

# 异步处理:任一输入到达就处理
node {
calculator: "AsyncCalculator"
input_stream: "A:stream_a"
input_stream: "B:stream_b"
output_stream: "OUTPUT:output"
input_stream_handler {
input_stream_handler: "AsyncInputStreamHandler"
}
}

# 工作原理:
# Stream A: ──P1──P2──P3──▶
# Stream B: ────P1────P2──▶
#
# 输出: ──A1──B1──A2──A3──B2──▶
# 每个输入到达立即处理

19.4 ImmediateInputStreamHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
# ========== ImmediateInputStreamHandler 配置 ==========

# 立即处理:不缓存,立即触发
node {
calculator: "ImmediateCalculator"
input_stream: "INPUT:input"
output_stream: "OUTPUT:output"
input_stream_handler {
input_stream_handler: "ImmediateInputStreamHandler"
}
}

# 适用于单输入、实时性要求高的场景

二十、滑动窗口聚合 Calculator

20.1 基本实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// sliding_window_calculator.h
#ifndef MEDIAPIPE_CALCULATORS_CORE_SLIDING_WINDOW_CALCULATOR_H_
#define MEDIAPIPE_CALCULATORS_CORE_SLIDING_WINDOW_CALCULATOR_H_

#include "mediapipe/framework/calculator_framework.h"
#include <deque>

namespace mediapipe {

// ========== Proto Options ==========
/*
syntax = "proto3";
package mediapipe;

message SlidingWindowOptions {
optional int32 window_size = 1 [default = 10];
optional bool emit_only_if_full = 2 [default = false];
optional bool timestamp_anchor = 3 [default = LATEST]; // LATEST, EARLIEST, CENTER
}
*/

// ========== 滑动窗口 Calculator ==========
template <typename T>
class SlidingWindowCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Index(0).Set<T>();
cc->Outputs().Index(0).Set<std::deque<T>>();
cc->Options<SlidingWindowOptions>();
return absl::OkStatus();
}

absl::Status Open(CalculatorContext* cc) override {
const auto& options = cc->Options<SlidingWindowOptions>();
window_size_ = options.window_size();
emit_only_if_full_ = options.emit_only_if_full();

LOG(INFO) << "SlidingWindowCalculator initialized: "
<< "window_size=" << window_size_;

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
if (cc->Inputs().Index(0).IsEmpty()) {
return absl::OkStatus();
}

// 获取输入数据
const T& data = cc->Inputs().Index(0).Get<T>();

// 添加到窗口
buffer_.push_back(data);

// 保持窗口大小
while (buffer_.size() > window_size_) {
buffer_.pop_front();
}

// 是否输出
if (emit_only_if_full_ && buffer_.size() < window_size_) {
return absl::OkStatus();
}

// 输出窗口数据
cc->Outputs().Index(0).AddPacket(
MakePacket<std::deque<T>>(buffer_).At(cc->InputTimestamp()));

return absl::OkStatus();
}

private:
std::deque<T> buffer_;
int window_size_ = 10;
bool emit_only_if_full_ = false;
};

REGISTER_CALCULATOR(SlidingWindowCalculator);

// ========== 模板实例化 ==========
extern template class SlidingWindowCalculator<float>;
extern template class SlidingWindowCalculator<int>;
extern template class SlidingWindowCalculator<Detection>;

} // namespace mediapipe

#endif // MEDIAPIPE_CALCULATORS_CORE_SLIDING_WINDOW_CALCULATOR_H_

20.2 Graph 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# ========== 滑动窗口 Graph 配置 ==========

# 历史数据缓存
node {
calculator: "SlidingWindowCalculator<float>"
input_stream: "ear_values"
output_stream: "ear_history"
options {
[mediapipe.SlidingWindowOptions.ext] {
window_size: 30 # 30 帧(1秒)
emit_only_if_full: false
}
}
}

# 检测历史缓存
node {
calculator: "SlidingWindowCalculator<Detection>"
input_stream: "face_detections"
output_stream: "detection_history"
options {
[mediapipe.SlidingWindowOptions.ext] {
window_size: 10
emit_only_if_full: true
}
}
}

二十一、多流融合 Calculator

21.1 加权融合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// multi_stream_fusion_calculator.h
#ifndef MEDIAPIPE_CALCULATORS_FUSION_MULTI_STREAM_FUSION_CALCULATOR_H_
#define MEDIAPIPE_CALCULATORS_FUSION_MULTI_STREAM_FUSION_CALCULATOR_H_

#include "mediapipe/framework/calculator_framework.h"

namespace mediapipe {

// ========== Proto Options ==========
/*
syntax = "proto3";
package mediapipe;

message MultiStreamFusionOptions {
enum FusionMethod {
WEIGHTED_AVERAGE = 0;
MAX = 1;
MIN = 2;
SUM = 3;
PRODUCT = 4;
}
optional FusionMethod fusion_method = 1 [default = WEIGHTED_AVERAGE];
repeated float weights = 2; # 各输入权重
optional float threshold = 3 [default = 0.5];
}
*/

// ========== 多流融合 Calculator ==========
class MultiStreamFusionCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
// 可变数量输入
for (int i = 0; i < cc->Inputs().NumEntries(); ++i) {
cc->Inputs().Index(i).Set<float>();
}
cc->Outputs().Index(0).Set<float>();
cc->Options<MultiStreamFusionOptions>();
return absl::OkStatus();
}

absl::Status Open(CalculatorContext* cc) override {
const auto& options = cc->Options<MultiStreamFusionOptions>();

fusion_method_ = options.fusion_method();
threshold_ = options.threshold();

// 读取权重
for (const auto& w : options.weights()) {
weights_.push_back(w);
}

// 如果没有指定权重,使用均匀权重
if (weights_.empty()) {
int num_inputs = cc->Inputs().NumEntries();
float uniform_weight = 1.0f / num_inputs;
for (int i = 0; i < num_inputs; ++i) {
weights_.push_back(uniform_weight);
}
}

LOG(INFO) << "MultiStreamFusionCalculator initialized: "
<< "method=" << fusion_method_
<< ", weights=" << weights_.size();

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
// 收集所有输入
std::vector<float> inputs;
std::vector<float> valid_weights;

for (int i = 0; i < cc->Inputs().NumEntries(); ++i) {
if (!cc->Inputs().Index(i).IsEmpty()) {
inputs.push_back(cc->Inputs().Index(i).Get<float>());
if (i < weights_.size()) {
valid_weights.push_back(weights_[i]);
} else {
valid_weights.push_back(1.0f); // 默认权重
}
}
}

if (inputs.empty()) {
return absl::OkStatus();
}

// 融合
float fused_value = 0.0f;

switch (fusion_method_) {
case MultiStreamFusionOptions::WEIGHTED_AVERAGE:
fused_value = WeightedAverage(inputs, valid_weights);
break;

case MultiStreamFusionOptions::MAX:
fused_value = *std::max_element(inputs.begin(), inputs.end());
break;

case MultiStreamFusionOptions::MIN:
fused_value = *std::min_element(inputs.begin(), inputs.end());
break;

case MultiStreamFusionOptions::SUM:
fused_value = std::accumulate(inputs.begin(), inputs.end(), 0.0f);
break;

case MultiStreamFusionOptions::PRODUCT:
fused_value = 1.0f;
for (float v : inputs) {
fused_value *= v;
}
break;
}

// 输出
cc->Outputs().Index(0).AddPacket(
MakePacket<float>(fused_value).At(cc->InputTimestamp()));

return absl::OkStatus();
}

private:
MultiStreamFusionOptions::FusionMethod fusion_method_ =
MultiStreamFusionOptions::WEIGHTED_AVERAGE;
std::vector<float> weights_;
float threshold_ = 0.5f;

float WeightedAverage(const std::vector<float>& values,
const std::vector<float>& weights) {
float sum = 0.0f;
float weight_sum = 0.0f;

for (size_t i = 0; i < values.size(); ++i) {
sum += values[i] * weights[i];
weight_sum += weights[i];
}

return weight_sum > 0 ? sum / weight_sum : 0.0f;
}
};

REGISTER_CALCULATOR(MultiStreamFusionCalculator);

} // namespace mediapipe

#endif // MEDIAPIPE_CALCULATORS_FUSION_MULTI_STREAM_FUSION_CALCULATOR_H_

二十二、IMS 实战:疲劳检测聚合

22.1 完整实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
// fatigue_aggregator_calculator.h
#ifndef MEDIAPIPE_CALCULATORS_IMS_FATIGUE_AGGREGATOR_CALCULATOR_H_
#define MEDIAPIPE_CALCULATORS_IMS_FATIGUE_AGGREGATOR_CALCULATOR_H_

#include "mediapipe/framework/calculator_framework.h"
#include <deque>

namespace mediapipe {

// ========== Proto Options ==========
/*
syntax = "proto3";
package mediapipe;

message FatigueAggregatorOptions {
// 各指标权重
optional float eye_weight = 1 [default = 0.4];
optional float yawn_weight = 2 [default = 0.2];
optional float pose_weight = 3 [default = 0.2];
optional float blink_weight = 4 [default = 0.2];

// 阈值
optional float fatigue_threshold = 5 [default = 0.7];
optional int32 history_size = 6 [default = 30];

// EAR 阈值
optional float ear_threshold = 7 [default = 0.2];

// PERCLOS 参数
optional float perclos_window = 8 [default = 60]; # 60 帧 = 2 秒
optional float perclos_threshold = 9 [default = 0.15];
}
*/

// ========== 疲劳聚合 Calculator ==========
class FatigueAggregatorCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Tag("EYE_STATE").Set<EyeState>();
cc->Inputs().Tag("YAWN_STATE").Set<YawnState>();
cc->Inputs().Tag("HEAD_POSE").Set<HeadPose>();
cc->Inputs().Tag("BLINK_RATE").Set<float>();

cc->Outputs().Tag("FATIGUE_SCORE").Set<float>();
cc->Outputs().Tag("ALERT").Set<bool>();
cc->Outputs().Tag("DETAILS").Set<FatigueDetails>();

cc->Options<FatigueAggregatorOptions>();
return absl::OkStatus();
}

absl::Status Open(CalculatorContext* cc) override {
const auto& options = cc->Options<FatigueAggregatorOptions>();

// 读取权重
eye_weight_ = options.eye_weight();
yawn_weight_ = options.yawn_weight();
pose_weight_ = options.pose_weight();
blink_weight_ = options.blink_weight();

// 读取阈值
fatigue_threshold_ = options.fatigue_threshold();
history_size_ = options.history_size();
ear_threshold_ = options.ear_threshold();

// 初始化历史缓冲区
fatigue_history_.resize(history_size_, 0.0f);
ear_history_.resize(options.perclos_window(), 0.0f);

LOG(INFO) << "FatigueAggregatorCalculator initialized: "
<< "weights=(" << eye_weight_ << "," << yawn_weight_
<< "," << pose_weight_ << "," << blink_weight_ << ")"
<< ", threshold=" << fatigue_threshold_;

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
// ========== 1. 计算各指标分数 ==========
float eye_score = ComputeEyeScore(cc);
float yawn_score = ComputeYawnScore(cc);
float pose_score = ComputePoseScore(cc);
float blink_score = ComputeBlinkScore(cc);

// ========== 2. 加权融合 ==========
float raw_fatigue =
eye_weight_ * eye_score +
yawn_weight_ * yawn_score +
pose_weight_ * pose_score +
blink_weight_ * blink_score;

// 归一化到 [0, 1]
raw_fatigue = std::max(0.0f, std::min(1.0f, raw_fatigue));

// ========== 3. 更新历史 ==========
fatigue_history_.push_back(raw_fatigue);
fatigue_history_.pop_front();

// ========== 4. 平滑 ==========
float smoothed = SmoothScore(fatigue_history_);

// ========== 5. 判断疲劳 ==========
bool is_fatigued = smoothed > fatigue_threshold_;

// ========== 6. 创建详细信息 ==========
FatigueDetails details;
details.set_eye_score(eye_score);
details.set_yawn_score(yawn_score);
details.set_pose_score(pose_score);
details.set_blink_score(blink_score);
details.set_raw_fatigue(raw_fatigue);
details.set_smoothed_fatigue(smoothed);
details.set_is_fatigued(is_fatigued);

// 计算 PERCLOS
float perclos = ComputePERCLOS();
details.set_perclos(perclos);

// ========== 7. 输出 ==========
cc->Outputs().Tag("FATIGUE_SCORE").AddPacket(
MakePacket<float>(smoothed).At(cc->InputTimestamp()));

cc->Outputs().Tag("ALERT").AddPacket(
MakePacket<bool>(is_fatigued).At(cc->InputTimestamp()));

cc->Outputs().Tag("DETAILS").AddPacket(
MakePacket<FatigueDetails>(details).At(cc->InputTimestamp()));

LOG(INFO) << "Fatigue score: " << smoothed
<< ", alert=" << is_fatigued
<< ", PERCLOS=" << perclos;

return absl::OkStatus();
}

private:
// ========== 配置参数 ==========
float eye_weight_ = 0.4f;
float yawn_weight_ = 0.2f;
float pose_weight_ = 0.2f;
float blink_weight_ = 0.2f;

float fatigue_threshold_ = 0.7f;
int history_size_ = 30;
float ear_threshold_ = 0.2f;

// ========== 历史数据 ==========
std::deque<float> fatigue_history_;
std::deque<float> ear_history_;

// ========== 方法 ==========
float ComputeEyeScore(CalculatorContext* cc) {
if (cc->Inputs().Tag("EYE_STATE").IsEmpty()) {
return 0.0f;
}

const EyeState& eye = cc->Inputs().Tag("EYE_STATE").Get<EyeState>();

// 基于 EAR 计算分数
float ear_avg = (eye.ear_left() + eye.ear_right()) / 2.0f;

// 更新 EAR 历史
ear_history_.push_back(ear_avg);
ear_history_.pop_front();

// EAR 越低,分数越高
if (ear_avg < ear_threshold_) {
return 1.0f; // 闭眼
} else {
return 0.0f; // 睁眼
}
}

float ComputeYawnScore(CalculatorContext* cc) {
if (cc->Inputs().Tag("YAWN_STATE").IsEmpty()) {
return 0.0f;
}

const YawnState& yawn = cc->Inputs().Tag("YAWN_STATE").Get<YawnState>();

// 打哈欠分数
return yawn.is_yawning() ? 1.0f : 0.0f;
}

float ComputePoseScore(CalculatorContext* cc) {
if (cc->Inputs().Tag("HEAD_POSE").IsEmpty()) {
return 0.0f;
}

const HeadPose& pose = cc->Inputs().Tag("HEAD_POSE").Get<HeadPose>();

// 头部姿态异常分数
float pitch_score = std::abs(pose.pitch()) > 30.0f ? 1.0f : 0.0f;
float yaw_score = std::abs(pose.yaw()) > 45.0f ? 1.0f : 0.0f;

return std::max(pitch_score, yaw_score);
}

float ComputeBlinkScore(CalculatorContext* cc) {
if (cc->Inputs().Tag("BLINK_RATE").IsEmpty()) {
return 0.0f;
}

float blink_rate = cc->Inputs().Tag("BLINK_RATE").Get<float>();

// 眨眼频率异常分数
// 正常眨眼频率:15-20 次/分钟
if (blink_rate > 30.0f || blink_rate < 5.0f) {
return 1.0f;
} else {
return 0.0f;
}
}

float SmoothScore(const std::deque<float>& history) {
if (history.empty()) return 0.0f;

// 指数加权移动平均
float alpha = 0.1f;
float smoothed = history.front();

for (size_t i = 1; i < history.size(); ++i) {
smoothed = alpha * history[i] + (1.0f - alpha) * smoothed;
}

return smoothed;
}

float ComputePERCLOS() {
if (ear_history_.empty()) return 0.0f;

// 计算眼睛闭合时间比例
int closed_count = 0;
for (float ear : ear_history_) {
if (ear < ear_threshold_) {
closed_count++;
}
}

return static_cast<float>(closed_count) / ear_history_.size();
}
};

REGISTER_CALCULATOR(FatigueAggregatorCalculator);

} // namespace mediapipe

#endif // MEDIAPIPE_CALCULATORS_IMS_FATIGUE_AGGREGATOR_CALCULATOR_H_

22.2 Graph 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# ims_fatigue_detection_graph.pbtxt

input_stream: "IR_IMAGE:ir_image"
output_stream: "FATIGUE_SCORE:fatigue_score"
output_stream: "ALERT:alert"

# ========== 眼睛状态检测 ==========
node {
calculator: "EyeStateCalculator"
input_stream: "IMAGE:ir_image"
output_stream: "EYE_STATE:eye_state"
}

# ========== 打哈欠检测 ==========
node {
calculator: "YawnDetectionCalculator"
input_stream: "IMAGE:ir_image"
output_stream: "YAWN_STATE:yawn_state"
}

# ========== 头部姿态估计 ==========
node {
calculator: "HeadPoseCalculator"
input_stream: "IMAGE:ir_image"
output_stream: "HEAD_POSE:head_pose"
}

# ========== 眨眼频率计算 ==========
node {
calculator: "BlinkRateCalculator"
input_stream: "EYE_STATE:eye_state"
output_stream: "BLINK_RATE:blink_rate"
}

# ========== 疲劳指标聚合 ==========
node {
calculator: "FatigueAggregatorCalculator"
input_stream: "EYE_STATE:eye_state"
input_stream: "YAWN_STATE:yawn_state"
input_stream: "HEAD_POSE:head_pose"
input_stream: "BLINK_RATE:blink_rate"
output_stream: "FATIGUE_SCORE:fatigue_score"
output_stream: "ALERT:alert"
output_stream: "DETAILS:details"
input_stream_handler {
input_stream_handler: "SyncSetInputStreamHandler"
}
options {
[mediapipe.FatigueAggregatorOptions.ext] {
eye_weight: 0.4
yawn_weight: 0.2
pose_weight: 0.2
blink_weight: 0.2
fatigue_threshold: 0.7
history_size: 30
ear_threshold: 0.2
perclos_window: 60
}
}
}

二十三、总结

策略 Handler 说明 适用场景
时间戳同步 SyncSet 按时间戳匹配 多输入融合
屏障同步 Barrier 等待所有输入 聚合计算
异步处理 Async 立即处理 实时响应
立即处理 Immediate 不缓存 单输入高实时

下篇预告

MediaPipe 系列 18:滑动窗口 Calculator——时序数据处理

深入讲解时序数据处理、状态机实现、事件检测。


参考资料

  1. Google AI Edge. MediaPipe Input Stream Handlers
  2. Google AI Edge. SyncSet Handler
  3. Google AI Edge. Timestamp System

系列进度: 17/55
更新时间: 2026-03-12


MediaPipe 系列 17:数据聚合 Calculator——多流同步完整指南
https://dapalm.com/2026/03/13/MediaPipe系列17-数据聚合Calculator:多流同步/
作者
Mars
发布于
2026年3月13日
许可协议