MediaPipe 系列 03:Packet 与 Stream——时间序列数据的流动完整指南

前言:为什么时间戳和流同步至关重要?

3.1 IMS 实时处理的挑战

IMS DMS 系统对时间同步的要求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
┌─────────────────────────────────────────────────────────────────────────┐
│ IMS DMS 时间同步挑战 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 问题:多传感器数据来源不同步 │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ IR Camera (30 FPS) │ │
│ │ Timestamp: 0, 33ms, 66ms, 100ms... │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ │ 时间戳不同步 │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ CAN 总线 (10 Hz) │ │
│ │ Timestamp: 0, 100ms, 200ms, 300ms... │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ 需求:统一时间基准,多流对齐 │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 疲劳检测算法需要同时使用: │ │
│ │ • IR 图像帧(视觉信息) │ │
│ │ • 车速数据(运动状态) │ │
│ │ • 时间戳(同步基准) │ │
│ │ │ │
│ │ 要求: │ │
│ │ • 视觉帧和车速数据必须对应同一时刻 │ │
│ │ • 延迟 < 50ms(法规要求) │ │
│ │ • 时间戳单调递增,无重复 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘

3.2 MediaPipe 的解决方案

MediaPipe 通过 Packet 和 Stream 提供时间同步机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
┌─────────────────────────────────────────────────────────────┐
│ MediaPipe 时间同步机制 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Packet:数据包 │ │
│ │ │ │
│ │ • 携带时间戳(单调递增) │ │
│ │ • 携带数据负载(任意类型) │ │
│ │ • 零拷贝传递 │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Stream:数据流通道 │ │
│ │ │ │
│ │ • Packet 的有序序列(按时间戳排序) │ │
│ │ • 自动时间戳对齐 │ │
│ │ • 支持多流同步 │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Calculator:计算节点 │ │
│ │ │ │
│ │ • 接收多个 Stream 的 Packet │ │
│ │ • 按时间戳同步处理 │ │
│ │ • 输出到新的 Stream │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

四、Packet:数据包详解

4.1 Packet 核心结构

Packet 是不可变的数据包,包含时间戳和数据负载:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// mediapipe/framework/packet.h
template <typename T>
class Packet {
public:
// ========== 创建 Packet ==========
// MakePacket:创建新 Packet(所有权转移)
static Packet MakePacket(const T& value);
static Packet MakePacket(T&& value);

// At:指定时间戳
Packet At(Timestamp timestamp) const;

// Adopt:转移所有权(原始指针)
static Packet Adopt(T* ptr);

// PointToForeign:不拥有所有权(引用)
static Packet PointToForeign(const T* ptr);

// ========== 获取数据 ==========
const T& Get() const;
T& GetMutable();

// ========== 时间戳操作 ==========
Timestamp Timestamp() const;
bool HasTimestamp() const;

// ========== 空值检查 ==========
bool IsEmpty() const;

// ========== 类型验证 ==========
template <typename U>
absl::Status ValidateAsType() const;

private:
Timestamp timestamp_; // 时间戳
std::shared_ptr<T> payload_; // 数据负载(共享指针)
};

Packet 内部结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
┌─────────────────────────────────────────────────────────────┐
│ Packet 内部结构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Packet<std::string> │ │
│ │ │ │
│ │ ┌───────────────────────────────────┐ │ │
│ │ │ timestamp_ │ │ │
│ │ │ • 类型:Timestamp │ │ │
│ │ │ • 值:1000000 (1 秒) │ │ │
│ │ │ • 单调递增 │ │ │
│ │ └───────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌───────────────────────────────────┐ │ │
│ │ │ payload_ (shared_ptr) │ │ │
│ │ │ • 类型:shared_ptr<string> │ │ │
│ │ │ • 指向:new std::string("hello")│ │ │
│ │ │ • 引用计数:1 │ │ │
│ │ │ • 零拷贝传递 │ │ │
│ │ └───────────────────────────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ 时间戳示例: │
│ ┌─────────────────────────────────────────────┐ │
│ │ Timestamp::PreStream() = -1 (流开始前) │ │
│ │ Timestamp(0) = 0 │ │
│ │ Timestamp(1000000) = 1 秒 │ │
│ │ Timestamp(2000000) = 2 秒 │ │
│ │ Timestamp::PostStream() = -2 (流结束后) │ │
│ │ Timestamp::Unset() = -3 (未设置) │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

4.2 Packet 创建方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// ========== 方式 1:MakePacket(推荐)==========
// 自动管理内存,推荐使用
#include "mediapipe/framework/packet.h"

// 创建基本类型
auto str_packet = MakePacket<std::string>("hello");
auto int_packet = MakePacket<int>(42);
auto float_packet = MakePacket<float>(3.14);

// 创建复杂类型(自动拷贝)
cv::Mat image = cv::imread("test.jpg");
auto mat_packet = MakePacket<cv::Mat>(image); // 拷贝构造

// 创建容器类型
std::vector<int> vec = {1, 2, 3, 4, 5};
auto vec_packet = MakePacket<std::vector<int>>(vec); // 拷贝

// ========== 方式 2:MakePacket + 时间戳 ==========
// 必须指定时间戳
auto packet1 = MakePacket<int>(42).At(Timestamp(1000000));
auto packet2 = MakePacket<cv::Mat>(image).At(Timestamp(2000000));

// ========== 方式 3:Adopt(转移所有权)==========
// 适用于:已经管理内存,转移所有权给 Packet
// ⚠️ 谨慎使用,必须确保数据不会提前释放

#include <memory>

// 创建原始指针
MyData* raw_ptr = new MyData{1, 2, 3};

// 转移所有权给 Packet
auto adopt_packet = Adopt(raw_ptr).At(Timestamp(0));

// raw_ptr 现在由 Packet 管理
// 当 Packet 析构时,MyData 也会被释放

// ========== 方式 4:PointToForeign(不拥有所有权)==========
// 适用于:数据由外部管理,Packet 只是引用
// ⚠️ 谨慎使用,必须确保数据在 Packet 生命周期内有效

MyData data{1, 2, 3};
auto foreign_packet = PointToForeign(&data).At(Timestamp(0));

// data 的生命周期必须 >= foreign_packet
// 否则会导致悬空指针

// ========== 方式 5:从已有值创建 ==========
int value = 42;
auto packet3 = MakePacket<int>(value).At(Timestamp(1000000));

std::string str = "world";
auto packet4 = MakePacket<std::string>(str).At(Timestamp(2000000));

4.3 Packet 访问与操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// ========== 获取数据 ==========
#include "mediapipe/framework/packet.h"

// MakePacket 创建的 Packet
auto str_packet = MakePacket<std::string>("hello");

// 获取数据(const 引用)
const std::string& str = str_packet.Get<std::string>();
// str 是 const,不能修改

// 获取可修改的数据(不推荐,因为数据是共享的)
std::string& str_mut = str_packet.GetMutable<std::string>();
// ⚠️ 修改会影响其他持有同一数据的 Packet

// ========== 获取时间戳 ==========
auto packet = MakePacket<int>(42).At(Timestamp(1000000));

Timestamp ts = packet.Timestamp();
int64_t us = ts.Value(); // 返回原始值:1000000

bool has_ts = packet.HasTimestamp(); // true

// ========== 空值检查 ==========
auto empty_packet = MakePacket<int>(); // 没有数据,只有时间戳
auto valid_packet = MakePacket<int>(42);

if (empty_packet.IsEmpty()) {
// Packet 是空的
}

if (!valid_packet.IsEmpty()) {
// Packet 有数据
}

// ========== 类型验证 ==========
auto packet = MakePacket<std::string>("hello");

// 验证类型是否正确
if (packet.ValidateAsType<std::string>().ok()) {
const std::string& str = packet.Get<std::string>();
LOG(INFO) << "Valid string: " << str;
} else {
LOG(ERROR) << "Type mismatch!";
}

// 验证失败会返回错误状态
absl::Status status = packet.ValidateAsType<cv::Mat>();
if (!status.ok()) {
// 类型不匹配
}

// ========== 时间戳比较 ==========
Timestamp ts1 = Timestamp(1000000);
Timestamp ts2 = Timestamp(2000000);

if (ts1.IsEarlierThan(ts2)) {
LOG(INFO) << "ts1 < ts2";
}

if (ts1.IsSameAs(ts2)) {
LOG(INFO) << "ts1 == ts2";
}

if (!ts1.IsEarlierThan(ts2) && !ts1.IsSameAs(ts2)) {
LOG(INFO) << "ts1 > ts2";
}

// ========== 时间戳运算 ==========
Timestamp ts = Timestamp(1000000);

Timestamp ts_plus = ts.Plus(500000); // +0.5 秒
Timestamp ts_minus = ts.Minus(200000); // -0.2 秒

Timestamp ts_abs = ts.Abs(); // 绝对值

// ========== 时间戳类型 ==========
Timestamp::PreStream() // -1(流开始前)
Timestamp::PostStream() // -2(流结束后)
Timestamp::Unset() // -3(未设置)

Timestamp t0 = Timestamp::PreStream();
Timestamp t1 = Timestamp::PostStream();

4.4 时间戳系统详解

Timestamp 的完整类型系统:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// mediapipe/framework/timestamp.h

// ========== 时间戳类型 ==========
enum class TimestampType {
PRE_STREAM, // -1:流开始前
NORMAL, // 0:普通时间戳
POST_STREAM, // -2:流结束后
UNSET, // -3:未设置
};

// ========== Timestamp 类 ==========
class Timestamp {
public:
// ========== 构造函数 ==========
Timestamp(); // 默认构造,类型 = UNSET
explicit Timestamp(int64_t value); // 普通时间戳
static Timestamp PreStream(); // -1
static Timestamp PostStream(); // -2
static Timestamp Unset(); // -3

// ========== 获取值 ==========
int64_t Value() const; // 返回原始值

// ========== 类型检查 ==========
TimestampType Type() const;
bool IsPreStream() const;
bool IsPostStream() const;
bool IsUnset() const;
bool IsNormal() const;
bool IsValid() const; // 不是 PreStream/PostStream/UnSet

// ========== 比较 ==========
bool IsEarlierThan(const Timestamp& other) const;
bool IsLaterThan(const Timestamp& other) const;
bool IsSameAs(const Timestamp& other) const;
bool IsBetween(const Timestamp& start, const Timestamp& end) const;

// ========== 运算 ==========
Timestamp Plus(int64_t offset_us) const;
Timestamp Minus(int64_t offset_us) const;
Timestamp Abs() const;

// ========== 常量 ==========
static constexpr int64_t kTimestampUnset = -3;
static constexpr int64_t kTimestampPreStream = -1;
static constexpr int64_t kTimestampPostStream = -2;
};

// ========== 使用示例 ==========
Timestamp ts0 = Timestamp::PreStream(); // -1
Timestamp ts1 = Timestamp(0); // 0
Timestamp ts2 = Timestamp(1000000); // 1 秒
Timestamp ts3 = Timestamp(2000000); // 2 秒
Timestamp ts4 = Timestamp::PostStream(); // -2
Timestamp ts5 = Timestamp::Unset(); // -3

LOG(INFO) << "ts0 type: " << ts0.Type(); // PRE_STREAM
LOG(INFO) << "ts1 type: " << ts1.Type(); // NORMAL
LOG(INFO) << "ts4 type: " << ts4.Type(); // POST_STREAM

// 时间戳比较
if (ts1.IsEarlierThan(ts2)) {
LOG(INFO) << "ts1 < ts2";
}

// 时间戳运算
Timestamp ts6 = ts2.Plus(500000); // 1.5 秒
Timestamp ts7 = ts2.Minus(500000); // 0.5 秒

时间戳的使用场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// ========== 场景 1:流开始前 ==========
Timestamp start = Timestamp::PreStream();

// 通常用于初始化
auto packet = MakePacket<int>(0).At(start);

// ========== 场景 2:流结束后 ==========
Timestamp end = Timestamp::PostStream();

// 用于标记流结束
graph.CloseInputStream("input");
graph.WaitUntilDone();

// ========== 场景 3:未设置 ==========
Timestamp unset = Timestamp::Unset();

// 用于标记无效数据
auto empty_packet = MakePacket<int>().At(unset);

// ========== 场景 4:普通时间戳 ==========
// 每个输入 Packet 必须有单调递增的时间戳
for (int i = 0; i < 10; ++i) {
auto packet = MakePacket<int>(i).At(Timestamp(i * 1000000));
graph.AddPacketToInputStream("input", packet);
}

// 帧率计算示例
// 30 FPS → 1 帧 = 33333 微秒(33.33ms)
auto frame1 = MakePacket<cv::Mat>(img1).At(Timestamp(0));
auto frame2 = MakePacket<cv::Mat>(img2).At(Timestamp(3333333));
auto frame3 = MakePacket<cv::Mat>(img3).At(Timestamp(6666666));

五、Stream:数据流通道详解

5.1 Stream 核心概念

Stream 是 Packet 的有序序列,按时间戳排序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
┌─────────────────────────────────────────────────────────────┐
│ Stream:数据流 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Stream: ──P1──P2──P3──P4──P5──P6──P7──P8──P9──P10──▶ │
│ │ │ │ │ │ │ │ │ │ │ │
t=0 t=1 t=2 t=3 t=4 t=5 t=6 t=7 t=8 t=9 │
│ │
│ 特征: │
│ 1. 有序:Packet 按时间戳升序排列 │
│ 2. 单调:时间戳必须单调递增(不能重复) │
│ 3. 同步:多个 Stream 自动时间戳对齐 │
│ 4. 背压:处理慢时自动阻塞输入 │
│ │
└─────────────────────────────────────────────────────────────┘

5.2 Stream 连接机制

Calculator 通过 Stream 连接:

1
2
3
4
5
6
7
8
9
10
11
12
# ========== Stream 连接示例 ==========
# Calculator A 输出到 Stream A
node {
calculator: "CalculatorA"
output_stream: "OUTPUT:stream_a"
}

# Calculator B 从 Stream A 输入
node {
calculator: "CalculatorB"
input_stream: "INPUT:stream_a"
}

连接规则:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
┌─────────────────────────────────────────────────────────────┐
Stream 连接规则 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Calculator A │
│ ┌─────────────────────────────────────────────┐ │
│ │ │ │
│ │ output_stream: "OUTPUT:stream_a" │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │ │
│ │ Stream A │
│ │ │
│ ▼ │
│ Calculator B │
│ ┌─────────────────────────────────────────────┐ │
│ │ │ │
│ │ input_stream: "INPUT:stream_a" │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ 连接规则: │
1. 输出 Stream 名称 + 输入 Stream 名称必须相同(Tag) │
2. 输出 Stream 名称 + 输入 Stream 名称必须相同(Name) │
3. 如果不指定 Tag,默认使用 Name 连接 │
│ │
└─────────────────────────────────────────────────────────────┘

5.3 Stream 标签(Tag)详解

使用 Tag 区分多个输入输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# ========== 多输入 Stream ==========
node {
calculator: "MultiInputCalculator"
input_stream: "IMAGE:image_stream" # Tag: IMAGE, Name: image_stream
input_stream: "AUDIO:audio_stream" # Tag: AUDIO, Name: audio_stream
input_stream: "SENSOR:sensor_stream" # Tag: SENSOR, Name: sensor_stream
output_stream: "RESULT:result_stream" # Tag: RESULT, Name: result_stream
output_stream: "METRICS:metrics_stream" # Tag: METRICS, Name: metrics_stream
}

# ========== 多输出 Stream ==========
node {
calculator: "MultiOutputCalculator"
input_stream: "input"
output_stream: "DETECTIONS:detections_stream" # Tag: DETECTIONS
output_stream: "LANDMARKS:landmarks_stream" # Tag: LANDMARKS
output_stream: "POSE:pose_stream" # Tag: POSE
}

C++ 中访问 Stream:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// ========== 访问输入 Stream ==========
static absl::Status GetContract(CalculatorContract* cc) {
// Tag 方式
cc->Inputs().Tag("IMAGE").Set<cv::Mat>();
cc->Inputs().Tag("AUDIO").Set<std::vector<float>>();
cc->Inputs().Tag("SENSOR").Set<SensorData>();

// Index 方式
cc->Inputs().Index(0).Set<cv::Mat>();
cc->Inputs().Index(1).Set<std::vector<float>>();
cc->Inputs().Index(2).Set<SensorData>();

return absl::OkStatus();
}

// ========== 访问输出 Stream ==========
static absl::Status GetContract(CalculatorContract* cc) {
// Tag 方式
cc->Outputs().Tag("DETECTIONS").Set<std::vector<Detection>>();
cc->Outputs().Tag("CONFIDENCES").Set<std::vector<float>>();
cc->Outputs().Tag("BBOXES").Set<std::vector<BoundingBox>>();

// Index 方式
cc->Outputs().Index(0).Set<std::vector<Detection>>();
cc->Outputs().Index(1).Set<std::vector<float>>();
cc->Outputs().Index(2).Set<std::vector<BoundingBox>>();

return absl::OkStatus();
}

// ========== 使用示例 ==========
absl::Status Process(CalculatorContext* cc) override {
// 获取输入
const cv::Mat& image = cc->Inputs().Tag("IMAGE").Get<cv::Mat>();
const auto& audio = cc->Inputs().Tag("AUDIO").Get<std::vector<float>>();

// 检查输入是否为空
if (cc->Inputs().Tag("IMAGE").IsEmpty()) {
return absl::OkStatus();
}

// 输出结果
cc->Outputs().Tag("DETECTIONS").AddPacket(
MakePacket<std::vector<Detection>>(detections)
.At(cc->InputTimestamp()));

cc->Outputs().Tag("CONFIDENCES").AddPacket(
MakePacket<std::vector<float>>(confidences)
.At(cc->InputTimestamp()));

cc->Outputs().Tag("BBOXES").AddPacket(
MakePacket<std::vector<BoundingBox>>(bboxes)
.At(cc->InputTimestamp()));

return absl::OkStatus();
}

六、Stream 同步机制

6.1 同步机制原理

当 Calculator 有多个输入 Stream 时,默认按时间戳同步:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
┌─────────────────────────────────────────────────────────────┐
│ 多流同步机制 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Stream A: ──P1──P2──P3──P4──P5──P6──P7──P8──P9──P10──▶ │
│ │ │ │ │ │ │ │ │ │ │ │
t=0 t=1 t=2 t=3 t=4 t=5 t=6 t=7 t=8 t=9 │
│ │
│ Stream B: ──P1──P2──P3──P4──P5──P6──P7──P8──P9──P10──▶ │
│ │ │ │ │ │ │ │ │ │ │ │
t=0 t=1 t=2 t=3 t=4 t=5 t=6 t=7 t=8 t=9 │
│ │
│ Calculator 处理流程: │
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ 1. 接收 P1(A) + P1(B) │ │
│ │ └─ 时间戳 t=0,两个 Stream 都有数据 │ │
│ │ └─ Process(t=0) 被调用 │ │
│ └─────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────┐ │
│ │ 2. 接收 P2(A) + P2(B) │ │
│ │ └─ 时间戳 t=1,两个 Stream 都有数据 │ │
│ │ └─ Process(t=1) 被调用 │ │
│ └─────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
... │
│ │
│ Process(t) 只在所有输入 Stream 都有 Packet(t) 时调用 │
│ │
└─────────────────────────────────────────────────────────────┘

6.2 同步策略

MediaPipe 提供多种同步策略:

策略 说明 适用场景
sync 严格同步(默认) 输入必须时间戳对齐
async 异步 不需要同步
sync_set 集合同步 多输入融合
sync_barrier 屏障同步 批量处理

配置同步策略:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# ========== 严格同步 ==========
node {
calculator: "MyCalculator"
input_stream: "input1"
input_stream: "input2"
input_stream: "input3"

input_stream_handler {
input_stream_handler: "SyncSetInputStreamHandler"
}
}

# ========== 异步 ==========
node {
calculator: "AsyncCalculator"
input_stream: "input1"
input_stream: "input2"

input_stream_handler {
input_stream_handler: "AsyncInputStreamHandler"
}
}

# ========== 屏障同步 ==========
node {
calculator: "BarrierCalculator"
input_stream: "input1"
input_stream: "input2"
input_stream: "input3"

input_stream_handler {
input_stream_handler: "SyncBarrierInputStreamHandler"
}
}

6.3 多流融合示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// ========== 多输入融合 Calculator ==========
class MultiInputCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
// 定义多个输入
cc->Inputs().Tag("IMAGE").Set<cv::Mat>();
cc->Inputs().Tag("AUDIO").Set<std::vector<float>>();
cc->Inputs().Tag("SENSOR").Set<SensorData>();

// 定义输出
cc->Outputs().Tag("OUTPUT").Set<MergedData>();

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
// ========== 检查输入是否可用 ==========
bool has_image = !cc->Inputs().Tag("IMAGE").IsEmpty();
bool has_audio = !cc->Inputs().Tag("AUDIO").IsEmpty();
bool has_sensor = !cc->Inputs().Tag("SENSOR").IsEmpty();

// ========== 如果所有输入都有数据,则处理 ==========
if (has_image && has_audio && has_sensor) {
// 获取所有输入
const cv::Mat& image = cc->Inputs().Tag("IMAGE").Get<cv::Mat>();
const auto& audio = cc->Inputs().Tag("AUDIO").Get<std::vector<float>>();
const SensorData& sensor = cc->Inputs().Tag("SENSOR").Get<SensorData>();

// 融合数据
MergedData merged;
merged.image = image;
merged.audio = audio;
merged.sensor = sensor;

// 输出结果
cc->Outputs().Tag("OUTPUT").AddPacket(
MakePacket<MergedData>(merged).At(cc->InputTimestamp()));
}

// ========== 如果有输入但没有所有输入,则等待 ==========
// MediaPipe 会自动缓存,直到所有输入都有数据

return absl::OkStatus();
}
};

REGISTER_CALCULATOR(MultiInputCalculator);

pbtxt 配置:

1
2
3
4
5
6
7
8
9
node {
calculator: "MultiInputCalculator"
input_stream: "IMAGE:image"
input_stream: "AUDIO:audio"
input_stream: "SENSOR:sensor"
input_stream_handler {
input_stream_handler: "SyncSetInputStreamHandler"
}
}

七、Packet 生命周期与内存管理

7.1 引用计数机制

Packet 使用 std::shared_ptr 管理内存:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// ========== 引用计数示例 ==========
#include <memory>

// 创建 Packet(引用计数 = 1)
auto packet1 = MakePacket<std::vector<int>>({1, 2, 3});

// 复制 Packet(引用计数 = 2)
auto packet2 = packet1; // 共享同一份数据

// 再次复制(引用计数 = 3)
auto packet3 = packet1; // 仍然是同一份数据

// 当所有引用消失时,数据才被释放
// packet1, packet2, packet3 都析构后,vector<int> 才被释放

// ========== 修改数据的影响 ==========
auto packet1 = MakePacket<std::vector<int>>({1, 2, 3});
auto packet2 = packet1; // 共享数据

// 修改 packet1
packet1.GetMutable<std::vector<int>>()->push_back(4);

// packet2 也会被修改
LOG(INFO) << packet2.Get<std::vector<int>>()[0]; // 输出:1
LOG(INFO) << packet2.Get<std::vector<int>>()[3]; // 输出:4

// ⚠️ 这是因为 shared_ptr 指向同一份数据

7.2 零拷贝设计

Packet 传递时不会复制数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// ========== 零拷贝传递示例 ==========
class ZeroCopyCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Tag("INPUT").Set<cv::Mat>();
cc->Outputs().Tag("OUTPUT").Set<cv::Mat>();
return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
// ========== 错误:直接传递(数据被拷贝)==========
// cc->Outputs().Tag("OUTPUT").AddPacket(
// cc->Inputs().Tag("INPUT").Get<cv::Mat>()); // 拷贝!

// ========== 正确:直接传递(零拷贝)==========
cc->Outputs().Tag("OUTPUT").AddPacket(
cc->Inputs().Tag("INPUT").Value()); // 零拷贝,共享所有权

return absl::OkStatus();
}
};

REGISTER_CALCULATOR(ZeroCopyCalculator);

零拷贝的优势:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
┌─────────────────────────────────────────────────────────────┐
│ 零拷贝设计优势 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 传统拷贝方式: │
│ ┌─────────────────────────────────────────────┐ │
│ │ Input Packet (100MB) │ │
│ │ └─ 拷贝到 Output Packet (100MB) │ │
│ │ └─ 总内存:200MB │ │
│ └─────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ 零拷贝方式: │
│ ┌─────────────────────────────────────────────┐ │
│ │ Input Packet (100MB) │ │
│ │ └─ shared_ptr 指向同一数据 │ │
│ │ └─ Output Packet (100MB) │ │
│ │ └─ shared_ptr 指向同一数据 │ │
│ │ └─ 总内存:100MB + shared_ptr开销 │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ 性能提升: │
│ • 内存占用减少 50% │
│ • 拷贝时间从 100ms → 0ms │
│ • 适用于大图像、大视频帧 │
│ │
└─────────────────────────────────────────────────────────────┘

7.3 数据修改注意事项

Packet 数据是只读的,修改前必须拷贝:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// ========== 错误:直接修改 Packet 数据 ==========
auto packet = MakePacket<cv::Mat>(image);
const cv::Mat& mat = packet.Get<cv::Mat>();

// 修改数据
mat.at<uint8_t>(0, 0) = 255; // ❌ 编译错误!

// 或者如果编译通过(例如使用 const_cast),运行时崩溃
cv::Mat& mat_mut = const_cast<cv::Mat&>(mat);
mat_mut.at<uint8_t>(0, 0) = 255; // ❌ 崩溃!shared_ptr 会被破坏

// ========== 正确:修改前拷贝 ==========
auto packet = MakePacket<cv::Mat>(image);
const cv::Mat& mat = packet.Get<cv::Mat>();

// 拷贝后修改
cv::Mat mat_copy = mat.clone(); // 拷贝
mat_copy.at<uint8_t>(0, 0) = 255;

// 创建新的 Packet
auto new_packet = MakePacket<cv::Mat>(mat_copy).At(packet.Timestamp());

// ========== 使用 CopyFrom(推荐)==========
auto packet = MakePacket<cv::Mat>(image);
cv::Mat output = packet.Get<cv::Mat>().clone(); // 拷贝
output.at<uint8_t>(0, 0) = 255;

// ========== 使用 MakePacket(推荐)==========
auto packet = MakePacket<cv::Mat>(image);
cv::Mat output = packet.Get<cv::Mat>().clone();
output.at<uint8_t>(0, 0) = 255;
auto new_packet = MakePacket<cv::Mat>(output).At(packet.Timestamp());

八、实战:时序分析 Calculator

8.1 滑动窗口 Calculator

需求:缓存最近 N 帧数据,用于时序分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
// sliding_window_calculator.h
#ifndef SLIDING_WINDOW_CALCULATOR_H_
#define SLIDING_WINDOW_CALCULATOR_H_

#include "mediapipe/framework/calculator_framework.h"
#include <deque>
#include <memory>

namespace mediapipe {

// ========== 滑动窗口 Options ==========
message SlidingWindowOptions {
optional int32 window_size = 1 [default = 10];
optional int64_t step_size = 2 [default = 1]; // 步长(帧数)
}

// ========== 滑动窗口 Calculator ==========
template <typename T>
class SlidingWindowCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
// 定义输入输出
cc->Inputs().Index(0).Set<T>();
cc->Outputs().Index(0).Set<std::deque<T>>();

// 声明 Options
cc->Options<SlidingWindowOptions>();

return absl::OkStatus();
}

absl::Status Open(CalculatorContext* cc) override {
// 读取 Options
window_size_ = cc->Options<SlidingWindowOptions>().window_size();
step_size_ = cc->Options<SlidingWindowOptions>().step_size();

// 初始化窗口
buffer_.clear();

LOG(INFO) << "SlidingWindowCalculator initialized: "
<< "window_size=" << window_size_
<< ", step_size=" << step_size_;

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
// ========== 1. 添加新数据 ==========
const T& new_data = cc->Inputs().Index(0).Get<T>();
buffer_.push_back(new_data);

// ========== 2. 保持窗口大小 ==========
while (buffer_.size() > window_size_) {
buffer_.pop_front();
}

// ========== 3. 按步长输出(可选)==========
// 如果 step_size > 1,每隔 step_size 帧输出一次
if (step_size_ == 1 || (buffer_.size() - 1) % step_size_ == 0) {
// 输出当前窗口
cc->Outputs().Index(0).AddPacket(
MakePacket<std::deque<T>>(buffer_).At(cc->InputTimestamp()));
}

return absl::OkStatus();
}

absl::Status Close(CalculatorContext* cc) override {
// 输出剩余数据
if (!buffer_.empty()) {
cc->Outputs().Index(0).AddPacket(
MakePacket<std::deque<T>>(buffer_).At(Timestamp::PostStream()));
}

LOG(INFO) << "SlidingWindowCalculator closed, output frames: " << buffer_.size();

return absl::OkStatus();
}

private:
std::deque<T> buffer_;
int window_size_ = 10;
int step_size_ = 1;
};

// ========== 注册 Calculator ==========
REGISTER_CALCULATOR(SlidingWindowCalculator<float>);
REGISTER_CALCULATOR(SlidingWindowCalculator<cv::Mat>);
REGISTER_CALCULATOR(SlidingWindowCalculator<std::vector<int>>);

} // namespace mediapipe

#endif // SLIDING_WINDOW_CALCULATOR_H_

CMakeLists.txt:

1
2
3
4
5
6
7
8
9
10
11
# ========== 添加滑动窗口 Calculator ==========
mediapipe_cc_library(
name = "sliding_window_calculator",
srcs = ["sliding_window_calculator.cc"],
hdrs = ["sliding_window_calculator.h"],
deps = [
"//mediapipe/framework:calculator_framework",
"//mediapipe/framework:calculator_options_cc_proto",
],
alwayslink = 1,
)

8.2 使用示例

pbtxt 配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# sliding_window_graph.pbtxt

# ========== 输入输出 ==========
input_stream: "input_frame"
output_stream: "frame_buffer"

# ========== 滑动窗口 Calculator ==========
node {
calculator: "SlidingWindowCalculator<cv::Mat>"
input_stream: "input_frame"
output_stream: "frame_buffer"
options {
[SlidingWindowOptions.ext] {
window_size: 5
step_size: 1
}
}
}

# ========== 使用示例:计算帧率 ==========
# 输入:30 FPS 视频
# 输出:每帧的帧率统计

# 假设 frame_buffer 包含 5
# t=0: [frame_0]
# t=1: [frame_0, frame_1]
# t=2: [frame_0, frame_1, frame_2]
# t=3: [frame_0, frame_1, frame_2, frame_3]
# t=4: [frame_0, frame_1, frame_2, frame_3, frame_4]
# t=5: [frame_1, frame_2, frame_3, frame_4, frame_5]
# ...

8.3 时序分析 Calculator

计算 PERCLOS 疲劳指标:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// perclos_calculator.h
#ifndef PERCLOS_CALCULATOR_H_
#define PERCLOS_CALCULATOR_H_

#include "mediapipe/framework/calculator_framework.h"
#include <deque>

namespace mediapipe {

// ========== PERCLOS Options ==========
message PERCLOSOptions {
optional int32 window_seconds = 1 [default = 30];
optional float ear_threshold = 2 [default = 0.2];
}

// ========== PERCLOS Calculator ==========
class PERCLOSCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
// 输入:EAR(眼纵横比)
cc->Inputs().Tag("EAR").Set<float>();

// 输出:PERCLOS 疲劳指标
cc->Outputs().Tag("PERCLOS").Set<float>();
cc->Outputs().Tag("PERCLOS_WINDOW").Set<std::vector<float>>();

cc->Options<PERCLOSOptions>();

return absl::OkStatus();
}

absl::Status Open(CalculatorContext* cc) override {
const auto& options = cc->Options<PERCLOSOptions>();
window_seconds_ = options.window_seconds();
ear_threshold_ = options.ear_threshold();

// 初始化窗口
ear_buffer_.clear();
timestamp_buffer_.clear();

LOG(INFO) << "PERCLOSCalculator initialized: "
<< "window_seconds=" << window_seconds_
<< ", ear_threshold=" << ear_threshold_;

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
// 获取输入
float ear = cc->Inputs().Tag("EAR").Get<float>();
Timestamp ts = cc->InputTimestamp();

// 转换为微秒
int64_t us = ts.Value();

// 添加到窗口
ear_buffer_.push_back(ear);
timestamp_buffer_.push_back(us);

// 移除过期的 EAR
while (!ear_buffer_.empty() && !timestamp_buffer_.empty()) {
int64_t oldest_us = timestamp_buffer_.front();
int64_t window_end_us = us - window_seconds_ * 1000000;

if (oldest_us < window_end_us) {
ear_buffer_.pop_front();
timestamp_buffer_.pop_front();
} else {
break;
}
}

// 计算 PERCLOS
float perclos = CalculatePERCLOS();

// 输出
cc->Outputs().Tag("PERCLOS").AddPacket(
MakePacket<float>(perclos).At(cc->InputTimestamp()));

cc->Outputs().Tag("PERCLOS_WINDOW").AddPacket(
MakePacket<std::vector<float>>(ear_buffer_).At(cc->InputTimestamp()));

return absl::OkStatus();
}

absl::Status Close(CalculatorContext* cc) override {
// 输出剩余数据
if (!ear_buffer_.empty()) {
cc->Outputs().Tag("PERCLOS").AddPacket(
MakePacket<float>(CalculatePERCLOS()).At(Timestamp::PostStream()));

cc->Outputs().Tag("PERCLOS_WINDOW").AddPacket(
MakePacket<std::vector<float>>(ear_buffer_).At(Timestamp::PostStream()));
}

LOG(INFO) << "PERCLOSCalculator closed, window size: " << ear_buffer_.size();

return absl::OkStatus();
}

private:
float CalculatePERCLOS() {
if (ear_buffer_.empty()) {
return 0.0f;
}

// PERCLOS = EAR < 阈值 的帧数 / 窗口总帧数
int count = 0;
for (float ear : ear_buffer_) {
if (ear < ear_threshold_) {
count++;
}
}

return static_cast<float>(count) / ear_buffer_.size();
}

std::deque<float> ear_buffer_;
std::deque<int64_t> timestamp_buffer_;
int window_seconds_ = 30;
float ear_threshold_ = 0.2;
};

REGISTER_CALCULATOR(PERCLOSCalculator);

} // namespace mediapipe

#endif // PERCLOS_CALCULATOR_H_

九、常见问题与调试

9.1 时间戳不递增

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// ========== 错误:时间戳不递增 ==========
MP_RETURN_IF_ERROR(graph.AddPacketToInputStream(
"input",
MakePacket<int>(1).At(Timestamp(1000000)))); // t=1s

MP_RETURN_IF_ERROR(graph.AddPacketToInputStream(
"input",
MakePacket<int>(2).At(Timestamp(500000)))); // t=0.5s ❌

// ========== 正确:时间戳必须单调递增 ==========
MP_RETURN_IF_ERROR(graph.AddPacketToInputStream(
"input",
MakePacket<int>(1).At(Timestamp(1000000)))); // t=1s

MP_RETURN_IF_ERROR(graph.AddPacketToInputStream(
"input",
MakePacket<int>(2).At(Timestamp(2000000)))); // t=2s ✅

9.2 流同步死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// ========== 问题:两个流时间戳不匹配,导致死锁 ==========
// Stream A: t=0, t=1, t=2, t=3
// Stream B: t=0, t=2, t=4
// Calculator 在 t=1 处等待 Stream B 的数据

// ========== 解决:使用异步输入处理器 ==========
node {
calculator: "MyCalculator"
input_stream: "input_a"
input_stream: "input_b"

input_stream_handler {
input_stream_handler: "AsyncInputStreamHandler"
}
}

// ========== 或者:使用同步处理器并设置超时 ==========
node {
calculator: "MyCalculator"
input_stream: "input_a"
input_stream: "input_b"

input_stream_handler {
input_stream_handler: "SyncSetInputStreamHandler"
timeout_seconds: 0.1 // 超时 100ms
}
}

9.3 内存泄漏

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// ========== 错误:Packet 持有原始指针 ==========
auto packet = Adopt(new MyData()); // 需要确保数据被正确释放

// 正确:使用 shared_ptr
auto data = std::make_shared<MyData>();
auto packet = MakePacket<MyData>(*data);

// ========== 错误:忘记释放资源 ==========
absl::Status Process(CalculatorContext* cc) override {
cv::Mat* image = new cv::Mat();
// ... 使用 image ...
// ❌ 忘记 delete image
return absl::OkStatus();
}

// 正确:使用 shared_ptr 或在 Close 中释放
absl::Status Process(CalculatorContext* cc) override {
auto image = std::make_shared<cv::Mat>();
// ... 使用 image ...
return absl::OkStatus();
}

// 或者
absl::Status Close(CalculatorContext* cc) override {
image_.reset(); // 释放
return absl::OkStatus();
}

9.4 调试技巧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// ========== 添加日志 ==========
LOG(INFO) << "Processing frame at timestamp: " << cc->InputTimestamp();
LOG(WARNING) << "Empty input detected";
LOG(ERROR) << "Calculation failed";

// ========== 统计信息 ==========
static int process_count = 0;
process_count++;
LOG(INFO) << "Processed " << process_count << " frames";

// ========== 性能分析 ==========
auto start = std::chrono::high_resolution_clock::now();
// ... 计算逻辑 ...
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
end - start).count();
LOG(INFO) << "Process time: " << duration << "us";

// ========== 调试输出 ==========
cv::imshow("Debug", image);
cv::waitKey(1);

// ========== Packet 信息 ==========
LOG(INFO) << "Packet timestamp: " << packet.Timestamp();
LOG(INFO) << "Packet is empty: " << packet.IsEmpty();
LOG(INFO) << "Packet type: " << packet.ValidateAsType<cv::Mat>().ok();

十、总结

概念 说明 关键 API
Packet 数据单元 MakePacket<T>(), packet.Get<T>()
Timestamp 时间戳 Timestamp(v), packet.Timestamp()
Stream 数据通道 input_stream, output_stream
同步 多流对齐 SyncSetInputStreamHandler
零拷贝 零拷贝传递 packet.Value()

下篇预告

MediaPipe 系列 04:Side Packet——静态配置数据的注入

深入讲解 Side Packet 机制、配置注入、静态资源管理。


参考资料

  1. Google AI Edge. MediaPipe Packet Documentation
  2. Google AI Edge. MediaPipe Timestamp Documentation
  3. Lugaresi et al. (2019). MediaPipe: A Framework for Building Perception Pipelines. arXiv:1906.08172

系列进度: 3/55
更新时间: 2026-03-12


MediaPipe 系列 03:Packet 与 Stream——时间序列数据的流动完整指南
https://dapalm.com/2026/03/12/MediaPipe系列03-Packet与Stream:时间序列数据的流动/
作者
Mars
发布于
2026年3月12日
许可协议