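Preface: Why do timestamps and stream synchronization matter?
3.1 Challenges of real-time IMS processing
Time-synchronization requirements of an IMS DMS system: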
```text
IMS DMS time-synchronization challenge

Problem: sensor data sources are not synchronized

  IR camera (30 FPS)
  Timestamps: 0, 33 ms, 66 ms, 100 ms, ...
        |
        |  timestamps do not line up
        v
  CAN bus (10 Hz)
  Timestamps: 0, 100 ms, 200 ms, 300 ms, ...

Requirement: one time base, all streams aligned

  The fatigue-detection algorithm needs, at the same time:
    - IR image frames (visual information)
    - vehicle speed (motion state)
    - timestamps (synchronization reference)

  Constraints:
    - an image frame and its speed sample must refer to the same instant
    - latency < 50 ms (regulatory requirement)
    - timestamps strictly increasing, no duplicates
```
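To make the mismatch concrete, here is a minimal sketch of one common alignment approach: for each camera frame, pick the CAN sample closest in time and reject the match if the gap exceeds a tolerance. This is plain standard-library C++, not MediaPipe code; the function name `NearestSampleIndex` and the tolerance value are assumptions chosen for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <vector>

// Returns the index of the sample whose timestamp is closest to `frame_us`,
// or -1 if no sample lies within `tolerance_us`. All times are microseconds.
// Hypothetical helper for illustration only.
int NearestSampleIndex(const std::vector<int64_t>& sample_us, int64_t frame_us,
                       int64_t tolerance_us) {
  int best = -1;
  int64_t best_gap = tolerance_us + 1;  // Anything above the tolerance loses.
  for (std::size_t i = 0; i < sample_us.size(); ++i) {
    const int64_t gap = std::llabs(sample_us[i] - frame_us);
    if (gap < best_gap) {
      best_gap = gap;
      best = static_cast<int>(i);
    }
  }
  return best;
}
```

With CAN samples at 0, 100000, 200000 and 300000 microseconds and a 50 ms tolerance, the frame at 33333 matches sample 0, while the frame at 66666 matches sample 1 because the first sample is already more than 50 ms away.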
MediaPipe provides time synchronization through Packets and Streams:
```text
MediaPipe time-synchronization building blocks

  Packet: unit of data
    - carries a timestamp
    - carries a payload of any type
    - is passed along without copying the payload
        |
        v
  Stream: channel of data
    - ordered sequence of Packets (ascending timestamps)
    - timestamp-based alignment
    - supports multi-stream synchronization
        |
        v
  Calculator: processing node
    - receives Packets from several Streams
    - processes its inputs synchronized by timestamp
    - writes results to new Streams
```
4. Packet in detail
4.1 Packet core structure
A Packet is an immutable unit of data that holds a timestamp and a payload:
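```cpp
// Simplified outline of mediapipe::Packet (mediapipe/framework/packet.h).
// Packet itself is not a template: the payload type is named at the call
// site, for example packet.Get<T>().
class Packet {
 public:
  Packet();  // Creates an empty packet with Timestamp::Unset().

  // Returns a copy that shares the payload but carries the given timestamp.
  Packet At(class Timestamp timestamp) const;

  // Read-only access to the payload; T must match the stored type.
  template <typename T>
  const T& Get() const;

  class Timestamp Timestamp() const;
  bool IsEmpty() const;

  // Returns OK if the packet holds a payload of type T.
  template <typename T>
  absl::Status ValidateAsType() const;

 private:
  std::shared_ptr<const packet_internal::HolderBase> holder_;  // Type-erased payload.
  class Timestamp timestamp_;
};

// Free functions that create packets.
template <typename T, typename... Args>
Packet MakePacket(Args&&... args);    // Constructs a T inside the packet.
template <typename T>
Packet Adopt(const T* ptr);           // Takes ownership of ptr.
template <typename T>
Packet PointToForeign(const T* ptr);  // Does not take ownership of ptr.
```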
Packet internal layout:
```text
Packet holding a std::string

  timestamp_
    - type: Timestamp
    - value: 1000000 (1 second, in microseconds)

  payload (shared_ptr)
    - points to a heap-allocated std::string("hello")
    - reference count: 1
    - copying the Packet shares the payload instead of copying it

Timestamp examples, from earliest to latest:
  Timestamp::Unset()        no timestamp assigned
  Timestamp::PreStream()    before all regular timestamps
  Timestamp(0)              0
  Timestamp(1000000)        1 second
  Timestamp(2000000)        2 seconds
  Timestamp::PostStream()   after all regular timestamps

The special timestamps are sentinel values at the extremes of the int64
range; they are not small negative numbers.
```
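The layout above can be mimicked with a few lines of standard C++. The sketch below is a toy model, not MediaPipe's implementation: `ToyPacket` and `MakeToyPacket` are hypothetical names, and the timestamp is a bare `int64_t` in microseconds. It shows why re-stamping a packet is cheap: `At()` copies only the `shared_ptr`.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>

// Toy stand-in for a packet: an immutable, shared payload plus a timestamp.
template <typename T>
struct ToyPacket {
  std::shared_ptr<const T> payload;
  int64_t timestamp_us = 0;

  // Returns a copy with a new timestamp; the payload is shared, not copied.
  ToyPacket At(int64_t new_timestamp_us) const {
    ToyPacket copy = *this;
    copy.timestamp_us = new_timestamp_us;
    return copy;
  }
};

// Moves `value` into a freshly allocated, read-only payload.
template <typename T>
ToyPacket<T> MakeToyPacket(T value) {
  ToyPacket<T> packet;
  packet.payload = std::make_shared<const T>(std::move(value));
  return packet;
}
```

After `auto b = MakeToyPacket<std::string>("hello").At(1000000);` the string exists once in memory no matter how many copies of the packet are made, and each copy can carry its own timestamp.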
4.2 Creating Packets
```cpp
#include <memory>
#include <string>
#include <vector>

#include "mediapipe/framework/packet.h"
#include "mediapipe/framework/timestamp.h"

using mediapipe::Adopt;
using mediapipe::MakePacket;
using mediapipe::PointToForeign;
using mediapipe::Timestamp;

// 1. Basic types.
auto str_packet = MakePacket<std::string>("hello");
auto int_packet = MakePacket<int>(42);
auto float_packet = MakePacket<float>(3.14f);

// 2. Complex types (the payload is constructed from the argument).
cv::Mat image = cv::imread("test.jpg");
auto mat_packet = MakePacket<cv::Mat>(image);

std::vector<int> vec = {1, 2, 3, 4, 5};
auto vec_packet = MakePacket<std::vector<int>>(vec);

// 3. Attach a timestamp (in microseconds).
auto packet1 = MakePacket<int>(42).At(Timestamp(1000000));
auto packet2 = MakePacket<cv::Mat>(image).At(Timestamp(2000000));

// 4. Adopt: the packet takes ownership of a heap pointer.
//    MyData is a user-defined struct; do not delete raw_ptr afterwards.
MyData* raw_ptr = new MyData{1, 2, 3};
auto adopt_packet = Adopt(raw_ptr).At(Timestamp(0));

// 5. PointToForeign: the packet does not own the data,
//    so `data` must outlive every copy of the packet.
MyData data{1, 2, 3};
auto foreign_packet = PointToForeign(&data).At(Timestamp(0));

// 6. From existing variables (the value is copied into the packet).
int value = 42;
auto packet3 = MakePacket<int>(value).At(Timestamp(1000000));

std::string str = "world";
auto packet4 = MakePacket<std::string>(str).At(Timestamp(2000000));
```
4.3 Accessing and manipulating Packets
```cpp
#include <string>

#include "mediapipe/framework/packet.h"
#include "mediapipe/framework/port/logging.h"
#include "mediapipe/framework/port/opencv_core_inc.h"
#include "mediapipe/framework/timestamp.h"

using mediapipe::MakePacket;
using mediapipe::Packet;
using mediapipe::Timestamp;
using mediapipe::TimestampDiff;

void PacketAccessExamples() {
  // 1. Read the payload through a read-only reference.
  Packet str_packet = MakePacket<std::string>("hello");
  const std::string& str = str_packet.Get<std::string>();

  // Payloads are immutable: to change the data, copy it and build a new packet.
  std::string str_copy = str + " world";
  Packet updated_packet = MakePacket<std::string>(str_copy);

  // 2. Read the timestamp.
  Packet int_packet = MakePacket<int>(42).At(Timestamp(1000000));
  Timestamp ts = int_packet.Timestamp();
  int64_t us = ts.Value();  // Microseconds.
  bool has_ts = ts != Timestamp::Unset();

  // 3. Check for emptiness. A default-constructed Packet is empty;
  //    MakePacket<int>() would hold a value-initialized int instead.
  Packet empty_packet;
  Packet valid_packet = MakePacket<int>(42);
  if (empty_packet.IsEmpty()) { /* no payload */ }
  if (!valid_packet.IsEmpty()) { /* payload present */ }

  // 4. Validate the payload type before reading it.
  if (str_packet.ValidateAsType<std::string>().ok()) {
    LOG(INFO) << "Valid string: " << str_packet.Get<std::string>();
  } else {
    LOG(ERROR) << "Type mismatch!";
  }
  absl::Status status = str_packet.ValidateAsType<cv::Mat>();
  if (!status.ok()) { /* the payload is not a cv::Mat */ }

  // 5. Compare timestamps with the usual operators.
  Timestamp ts1(1000000);
  Timestamp ts2(2000000);
  if (ts1 < ts2) { LOG(INFO) << "ts1 < ts2"; }
  if (ts1 == ts2) { LOG(INFO) << "ts1 == ts2"; }
  if (ts1 > ts2) { LOG(INFO) << "ts1 > ts2"; }

  // 6. Timestamp arithmetic goes through TimestampDiff (microseconds).
  Timestamp ts_plus = ts + TimestampDiff(500000);
  Timestamp ts_minus = ts - TimestampDiff(200000);

  // 7. Special timestamps.
  Timestamp t0 = Timestamp::PreStream();
  Timestamp t1 = Timestamp::PostStream();
  Timestamp t2 = Timestamp::Unset();
}
```
4.4 The timestamp system in detail
An outline of the Timestamp class:
```cpp
// Simplified outline of mediapipe::Timestamp (mediapipe/framework/timestamp.h).
// Regular timestamps are int64 counts of microseconds.
class Timestamp {
 public:
  Timestamp();                        // Same as Timestamp::Unset().
  explicit Timestamp(int64_t value);  // Microseconds.

  // Special values, listed from earliest to latest.
  static Timestamp Unset();       // No timestamp assigned.
  static Timestamp Unstarted();
  static Timestamp PreStream();   // Header-like data, before all regular timestamps.
  static Timestamp Min();         // Smallest regular timestamp.
  static Timestamp Max();         // Largest regular timestamp.
  static Timestamp PostStream();  // Summary data, after all regular timestamps.
  static Timestamp OneOverPostStream();
  static Timestamp Done();

  int64_t Value() const;          // Raw microsecond value.
  double Seconds() const;
  int64_t Microseconds() const;

  bool IsSpecialValue() const;    // True for the sentinels above.
  bool IsRangeValue() const;      // True for values in Min()..Max().
  bool IsAllowedInStream() const;
  std::string DebugString() const;

  // Comparison uses the usual operators: ==, !=, <, <=, >, >=.
  // Arithmetic uses TimestampDiff: ts + TimestampDiff(us), ts - TimestampDiff(us).
};

// Usage.
Timestamp ts0 = Timestamp::PreStream();
Timestamp ts1 = Timestamp(0);
Timestamp ts2 = Timestamp(1000000);   // 1 second
Timestamp ts3 = Timestamp(2000000);   // 2 seconds
Timestamp ts4 = Timestamp::PostStream();
Timestamp ts5 = Timestamp::Unset();

// Inspect whether a timestamp is a regular value or a special one.
LOG(INFO) << "ts0 special: " << ts0.IsSpecialValue();
LOG(INFO) << "ts1 special: " << ts1.IsSpecialValue();
LOG(INFO) << "ts4: " << ts4.DebugString();

// Comparison.
if (ts1 < ts2) { LOG(INFO) << "ts1 < ts2"; }

// Arithmetic.
Timestamp ts6 = ts2 + TimestampDiff(500000);  // 1.5 seconds
Timestamp ts7 = ts2 - TimestampDiff(500000);  // 0.5 seconds
```
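Typical timestamp usage:
```cpp
// 1. Stream start: PreStream marks header-like data that precedes all
//    regular timestamps.
Timestamp start = Timestamp::PreStream();
auto header_packet = MakePacket<int>(0).At(start);

// 2. Stream end: PostStream follows all regular timestamps. Closing the
//    input stream and waiting lets the graph finish.
Timestamp end = Timestamp::PostStream();
MP_RETURN_IF_ERROR(graph.CloseInputStream("input"));
MP_RETURN_IF_ERROR(graph.WaitUntilDone());

// 3. Unset: a default-constructed packet is empty and has no timestamp.
Timestamp unset = Timestamp::Unset();
mediapipe::Packet empty_packet;

// 4. Regular timestamps, in microseconds (one packet per second here).
for (int i = 0; i < 10; ++i) {
  auto packet = MakePacket<int>(i).At(Timestamp(i * 1000000));
  MP_RETURN_IF_ERROR(graph.AddPacketToInputStream("input", packet));
}

// 5. Video frames at 30 FPS: one frame every 33333 microseconds.
auto frame1 = MakePacket<cv::Mat>(img1).At(Timestamp(0));
auto frame2 = MakePacket<cv::Mat>(img2).At(Timestamp(33333));
auto frame3 = MakePacket<cv::Mat>(img3).At(Timestamp(66666));
```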
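Frame timestamps are easy to get wrong by a factor of a hundred, because the unit is microseconds rather than milliseconds. A small helper keeps the arithmetic in one place. The name `FrameTimestampUs` is hypothetical; the sketch assumes a constant frame rate and integer division that rounds down.

```cpp
#include <cassert>
#include <cstdint>

// Microsecond timestamp of frame `index` at `fps` frames per second.
// Computed from the index each time, so rounding errors do not accumulate
// the way they would when repeatedly adding a rounded frame interval.
int64_t FrameTimestampUs(int64_t index, int64_t fps) {
  return index * 1000000 / fps;
}
```

At 30 FPS, frames 0 through 3 land on 0, 33333, 66666 and 100000 microseconds, and frame 30 lands exactly on 1000000.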
5. Stream in detail
5.1 Stream core concepts
A Stream is an ordered sequence of Packets, sorted by timestamp:
```text
Stream: ──P1──P2──P3──P4──P5──P6──P7──P8──P9──P10──▶
          t=0 t=1 t=2 t=3 t=4 t=5 t=6 t=7 t=8 t=9

Properties:
  1. Ordered: Packets appear in ascending timestamp order
  2. Monotonic: timestamps must strictly increase (no repeats)
  3. Synchronized: a Calculator's input Streams are aligned by timestamp
  4. Back-pressure: input is throttled when processing falls behind
```
5.2 How Streams connect
Calculators are connected through Streams:
```pbtxt
# ========== Stream connection example ==========
# Calculator A writes to stream_a
node {
  calculator: "CalculatorA"
  output_stream: "OUTPUT:stream_a"
}

# Calculator B reads from stream_a
node {
  calculator: "CalculatorB"
  input_stream: "INPUT:stream_a"
}
```
Connection rules:
```text
Calculator A
  output_stream: "OUTPUT:stream_a"
        |
        |  Stream A
        v
Calculator B
  input_stream: "INPUT:stream_a"

Connection rules:
  1. Streams connect by name: the name after the colon (stream_a) must be
     identical on the output side and the input side
  2. The tag before the colon (OUTPUT, INPUT) is local to each Calculator
     and does not have to match across Calculators
  3. Without a tag, the entry is just the stream name, and the Calculator
     addresses the stream by index
```
5.3 Stream tags in detail
Use tags to tell multiple inputs and outputs apart:
```pbtxt
# ========== Multiple input Streams ==========
node {
  calculator: "MultiInputCalculator"
  input_stream: "IMAGE:image_stream"        # Tag: IMAGE, Name: image_stream
  input_stream: "AUDIO:audio_stream"        # Tag: AUDIO, Name: audio_stream
  input_stream: "SENSOR:sensor_stream"      # Tag: SENSOR, Name: sensor_stream
  output_stream: "RESULT:result_stream"     # Tag: RESULT, Name: result_stream
  output_stream: "METRICS:metrics_stream"   # Tag: METRICS, Name: metrics_stream
}

# ========== Multiple output Streams ==========
node {
  calculator: "MultiOutputCalculator"
  input_stream: "input"
  output_stream: "DETECTIONS:detections_stream"   # Tag: DETECTIONS
  output_stream: "LANDMARKS:landmarks_stream"     # Tag: LANDMARKS
  output_stream: "POSE:pose_stream"               # Tag: POSE
}
```
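The `TAG:name` convention used in these entries is simple enough to illustrate with a few lines of string handling. The sketch below is not MediaPipe's parser: `StreamSpec` and `ParseStreamSpec` are hypothetical names, and it handles only the two forms shown in this article (`TAG:name` and a bare `name`), ignoring any richer forms the real syntax may accept.

```cpp
#include <cassert>
#include <string>

// The two parts of a stream entry as written in a graph config.
struct StreamSpec {
  std::string tag;   // Empty when the entry has no tag.
  std::string name;  // The stream name used to connect nodes.
};

// Splits "TAG:name" at the first ':'; an entry without ':' is all name.
StreamSpec ParseStreamSpec(const std::string& entry) {
  const std::string::size_type colon = entry.find(':');
  if (colon == std::string::npos) return {"", entry};
  return {entry.substr(0, colon), entry.substr(colon + 1)};
}
```

For example, `ParseStreamSpec("IMAGE:image_stream")` yields the tag `IMAGE` and the name `image_stream`, while `ParseStreamSpec("input")` yields an empty tag.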
Accessing Streams from C++:
```cpp
// Declare the streams in GetContract. Streams are addressed either by tag or
// by index; use the form that matches the node's pbtxt entry.
static absl::Status GetContract(CalculatorContract* cc) {
  // Inputs by tag.
  cc->Inputs().Tag("IMAGE").Set<cv::Mat>();
  cc->Inputs().Tag("AUDIO").Set<std::vector<float>>();
  cc->Inputs().Tag("SENSOR").Set<SensorData>();
  // Untagged inputs would be declared by index instead:
  //   cc->Inputs().Index(0).Set<cv::Mat>();
  //   cc->Inputs().Index(1).Set<std::vector<float>>();
  //   cc->Inputs().Index(2).Set<SensorData>();

  // Outputs by tag.
  cc->Outputs().Tag("DETECTIONS").Set<std::vector<Detection>>();
  cc->Outputs().Tag("CONFIDENCES").Set<std::vector<float>>();
  cc->Outputs().Tag("BBOXES").Set<std::vector<BoundingBox>>();
  // Untagged outputs would be declared by index instead:
  //   cc->Outputs().Index(0).Set<std::vector<Detection>>();
  return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
  // An input can be empty at this timestamp, so check before reading.
  if (cc->Inputs().Tag("IMAGE").IsEmpty() ||
      cc->Inputs().Tag("AUDIO").IsEmpty()) {
    return absl::OkStatus();
  }
  const cv::Mat& image = cc->Inputs().Tag("IMAGE").Get<cv::Mat>();
  const auto& audio = cc->Inputs().Tag("AUDIO").Get<std::vector<float>>();

  // Placeholder results; a real calculator computes these from the inputs.
  std::vector<Detection> detections;
  std::vector<float> confidences;
  std::vector<BoundingBox> bboxes;

  // Emit every output at the timestamp of the current input set.
  cc->Outputs().Tag("DETECTIONS").AddPacket(
      MakePacket<std::vector<Detection>>(detections)
          .At(cc->InputTimestamp()));
  cc->Outputs().Tag("CONFIDENCES").AddPacket(
      MakePacket<std::vector<float>>(confidences)
          .At(cc->InputTimestamp()));
  cc->Outputs().Tag("BBOXES").AddPacket(
      MakePacket<std::vector<BoundingBox>>(bboxes)
          .At(cc->InputTimestamp()));
  return absl::OkStatus();
}
```
6. Stream synchronization
6.1 How synchronization works
When a Calculator has several input Streams, they are synchronized by timestamp by default:
```text
Multi-stream synchronization

Stream A: ──P1──P2──P3──P4──P5──P6──P7──P8──P9──P10──▶
            t=0 t=1 t=2 t=3 t=4 t=5 t=6 t=7 t=8 t=9

Stream B: ──P1──P2──P3──P4──P5──P6──P7──P8──P9──P10──▶
            t=0 t=1 t=2 t=3 t=4 t=5 t=6 t=7 t=8 t=9

Calculator processing flow:

  1. Receive P1(A) + P1(B)
     - timestamp t=0, both Streams have data
     - Process(t=0) is called
        |
        v
  2. Receive P2(A) + P2(B)
     - timestamp t=1, both Streams have data
     - Process(t=1) is called
        |
        v
       ...

Process(t) is called once timestamp t is settled on every input Stream:
each Stream has either delivered a Packet at t or guaranteed that none
will arrive at t. In this example both Streams carry a Packet at every
timestamp, so every call sees both inputs.
```
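The alignment rule can be sketched for the simplest case, where both streams are finite and fully known in advance. The model below is a simplification under that assumption: a real scheduler works incrementally and has to wait until a timestamp is settled on every stream. `InputSet` and `AlignStreams` are hypothetical names, and the code is plain standard-library C++.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// One synchronized set of inputs, i.e. what a single Process() call would see.
struct InputSet {
  int64_t timestamp_us;
  bool has_a;  // Stream A carries a packet at this timestamp.
  bool has_b;  // Stream B carries a packet at this timestamp.
};

// Merges two ascending timestamp lists into one ascending list of input sets.
// A stream without a packet at some timestamp is marked as empty in that set.
std::vector<InputSet> AlignStreams(const std::vector<int64_t>& a,
                                   const std::vector<int64_t>& b) {
  std::vector<InputSet> sets;
  std::size_t i = 0, j = 0;
  while (i < a.size() || j < b.size()) {
    const bool a_left = i < a.size();
    const bool b_left = j < b.size();
    if (a_left && b_left && a[i] == b[j]) {
      sets.push_back({a[i], true, true});  // Both streams share this timestamp.
      ++i;
      ++j;
    } else if (!b_left || (a_left && a[i] < b[j])) {
      sets.push_back({a[i], true, false});  // Only A has a packet here.
      ++i;
    } else {
      sets.push_back({b[j], false, true});  // Only B has a packet here.
      ++j;
    }
  }
  return sets;
}
```

Feeding it a 30 FPS camera stream (0, 33333, 66666, 100000) and a 10 Hz CAN stream (0, 100000) produces four input sets, of which only the first and the last contain both inputs. This is why a calculator that needs every input has to check for empty inputs inside `Process`.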
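6.2 Synchronization strategies
MediaPipe selects the synchronization strategy through a node's input stream handler:
| Handler | Description | When to use |
| --- | --- | --- |
| DefaultInputStreamHandler | Strict timestamp synchronization across all inputs (default) | Inputs must be timestamp-aligned |
| ImmediateInputStreamHandler | No synchronization; packets are delivered as they arrive | Inputs do not need alignment |
| SyncSetInputStreamHandler | Synchronization within configured sets of inputs | Multi-input fusion |
| BarrierInputStreamHandler | Waits for one packet on every input, regardless of timestamp | Batch processing |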
Configuring the synchronization strategy:
```pbtxt
# ========== Strict synchronization (default) ==========
node {
  calculator: "MyCalculator"
  input_stream: "input1"
  input_stream: "input2"
  input_stream: "input3"
  input_stream_handler {
    input_stream_handler: "DefaultInputStreamHandler"
  }
}

# ========== No synchronization ==========
node {
  calculator: "AsyncCalculator"
  input_stream: "input1"
  input_stream: "input2"
  input_stream_handler {
    input_stream_handler: "ImmediateInputStreamHandler"
  }
}

# ========== Barrier synchronization ==========
node {
  calculator: "BarrierCalculator"
  input_stream: "input1"
  input_stream: "input2"
  input_stream: "input3"
  input_stream_handler {
    input_stream_handler: "BarrierInputStreamHandler"
  }
}
```
6.3 Multi-stream fusion example
```cpp
// Fuses image, audio and sensor inputs that share a timestamp.
// SensorData and MergedData are user-defined types.
class MultiInputCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    cc->Inputs().Tag("IMAGE").Set<cv::Mat>();
    cc->Inputs().Tag("AUDIO").Set<std::vector<float>>();
    cc->Inputs().Tag("SENSOR").Set<SensorData>();
    cc->Outputs().Tag("OUTPUT").Set<MergedData>();
    return absl::OkStatus();
  }

  absl::Status Process(CalculatorContext* cc) override {
    // Any input may be empty at this timestamp.
    bool has_image = !cc->Inputs().Tag("IMAGE").IsEmpty();
    bool has_audio = !cc->Inputs().Tag("AUDIO").IsEmpty();
    bool has_sensor = !cc->Inputs().Tag("SENSOR").IsEmpty();

    // Fuse only when all three inputs are present.
    if (has_image && has_audio && has_sensor) {
      const cv::Mat& image = cc->Inputs().Tag("IMAGE").Get<cv::Mat>();
      const auto& audio =
          cc->Inputs().Tag("AUDIO").Get<std::vector<float>>();
      const SensorData& sensor =
          cc->Inputs().Tag("SENSOR").Get<SensorData>();

      MergedData merged;
      merged.image = image;
      merged.audio = audio;
      merged.sensor = sensor;

      cc->Outputs().Tag("OUTPUT").AddPacket(
          MakePacket<MergedData>(merged).At(cc->InputTimestamp()));
    }
    return absl::OkStatus();
  }
};

REGISTER_CALCULATOR(MultiInputCalculator);
```
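pbtxt configuration:
```pbtxt
node {
  calculator: "MultiInputCalculator"
  input_stream: "IMAGE:image"
  input_stream: "AUDIO:audio"
  input_stream: "SENSOR:sensor"
  output_stream: "OUTPUT:merged"
  input_stream_handler {
    input_stream_handler: "SyncSetInputStreamHandler"
    options {
      [mediapipe.SyncSetInputStreamHandlerOptions.ext] {
        # All three inputs form one set and are synchronized together.
        sync_set {
          tag_index: "IMAGE"
          tag_index: "AUDIO"
          tag_index: "SENSOR"
        }
      }
    }
  }
}
```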
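7. Packet lifetime and memory management
7.1 Reference counting
A Packet manages its payload with a std::shared_ptr:
```cpp
#include <memory>
#include <vector>

// Create a packet; the payload's reference count is 1.
auto packet1 = MakePacket<std::vector<int>>(std::vector<int>{1, 2, 3});

// Copying a packet copies the shared_ptr, not the vector.
auto packet2 = packet1;  // Reference count: 2
auto packet3 = packet1;  // Reference count: 3

// Every copy refers to the same vector in memory.
const std::vector<int>& v1 = packet1.Get<std::vector<int>>();
const std::vector<int>& v2 = packet2.Get<std::vector<int>>();
LOG(INFO) << (&v1 == &v2);  // Same object, so the addresses are equal.
LOG(INFO) << v2[0];
LOG(INFO) << v2[2];

// The vector is freed when the last packet that refers to it is destroyed.
```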
7.2 Zero-copy design
Passing a Packet along does not copy its data:
```cpp
class ZeroCopyCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    cc->Inputs().Tag("INPUT").Set<cv::Mat>();
    cc->Outputs().Tag("OUTPUT").Set<cv::Mat>();
    return absl::OkStatus();
  }

  absl::Status Process(CalculatorContext* cc) override {
    // Forward the input packet as-is: Value() returns the Packet itself, so
    // the output shares the payload and keeps the input timestamp.
    cc->Outputs().Tag("OUTPUT").AddPacket(
        cc->Inputs().Tag("INPUT").Value());
    return absl::OkStatus();
  }
};

REGISTER_CALCULATOR(ZeroCopyCalculator);
```
Benefits of zero copy:
```text
Copying approach:
  Input Packet (100 MB)
    - copied into the Output Packet (100 MB)
    - total memory: 200 MB
        |
        v
Zero-copy approach:
  Input Packet (100 MB)
    - shared_ptr points to the data
  Output Packet
    - shared_ptr points to the same data
    - total memory: 100 MB + shared_ptr overhead

Gains in this illustrative example:
  - memory use cut by 50%
  - copy time drops from about 100 ms to effectively 0 ms
  - matters most for large images and video frames
```
7.3 Notes on modifying data
Packet data is read-only; copy it before modifying it:
```cpp
auto packet = MakePacket<cv::Mat>(image);
const cv::Mat& mat = packet.Get<cv::Mat>();

// Wrong: writing through the const reference does not compile.
// mat.at<uint8_t>(0, 0) = 255;

// Wrong: const_cast compiles, but it changes data that every copy of the
// packet shares, so other calculators see the modification too.
// cv::Mat& mat_mut = const_cast<cv::Mat&>(mat);
// mat_mut.at<uint8_t>(0, 0) = 255;

// Right: clone the data, modify the clone, and wrap it in a new packet
// that keeps the original timestamp.
cv::Mat mat_copy = mat.clone();
mat_copy.at<uint8_t>(0, 0) = 255;
auto new_packet = MakePacket<cv::Mat>(mat_copy).At(packet.Timestamp());

// The same pattern in compact form.
cv::Mat output = packet.Get<cv::Mat>().clone();
output.at<uint8_t>(0, 0) = 255;
auto output_packet = MakePacket<cv::Mat>(output).At(packet.Timestamp());
```
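The copy-before-modify rule is independent of OpenCV. As a minimal sketch under that framing, the function below applies it to a shared, read-only byte buffer using only the standard library; `Pixels` and `WithFirstPixel` are hypothetical names chosen for illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <vector>

using Pixels = std::vector<uint8_t>;

// Returns a new shared buffer whose first element is set to `value`.
// The input buffer is left untouched for every other reader.
std::shared_ptr<const Pixels> WithFirstPixel(
    const std::shared_ptr<const Pixels>& input, uint8_t value) {
  auto copy = std::make_shared<Pixels>(*input);  // Deep copy of the data.
  if (!copy->empty()) (*copy)[0] = value;        // Modify only the copy.
  return copy;  // Converts to shared_ptr<const Pixels>: read-only again.
}
```

The caller ends up with two independent buffers: the original, still seen unchanged by everyone who holds it, and the modified copy, which can be handed downstream.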
8. Hands-on: temporal-analysis Calculators
8.1 Sliding-window Calculator
Requirement: buffer the most recent N frames for temporal analysis:
```cpp
// sliding_window_calculator.h
#ifndef SLIDING_WINDOW_CALCULATOR_H_
#define SLIDING_WINDOW_CALCULATOR_H_

#include <algorithm>
#include <cstddef>
#include <deque>
#include <vector>

#include "mediapipe/framework/calculator_framework.h"

// SlidingWindowOptions belongs in its own .proto file, not in C++ source:
//   message SlidingWindowOptions {
//     optional int32 window_size = 1 [default = 10];
//     optional int32 step_size = 2 [default = 1];
//   }

namespace mediapipe {

template <typename T>
class SlidingWindowCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    cc->Inputs().Index(0).Set<T>();
    cc->Outputs().Index(0).Set<std::deque<T>>();
    cc->Options<SlidingWindowOptions>();
    return absl::OkStatus();
  }

  absl::Status Open(CalculatorContext* cc) override {
    window_size_ = cc->Options<SlidingWindowOptions>().window_size();
    // Clamp to 1 so the modulo in Process() never divides by zero.
    step_size_ = std::max(1, cc->Options<SlidingWindowOptions>().step_size());
    buffer_.clear();
    frame_count_ = 0;
    LOG(INFO) << "SlidingWindowCalculator initialized: "
              << "window_size=" << window_size_
              << ", step_size=" << step_size_;
    return absl::OkStatus();
  }

  absl::Status Process(CalculatorContext* cc) override {
    // Append the new item and drop the oldest ones beyond the window size.
    const T& new_data = cc->Inputs().Index(0).Get<T>();
    buffer_.push_back(new_data);
    while (buffer_.size() > static_cast<std::size_t>(window_size_)) {
      buffer_.pop_front();
    }
    // Emit the window on every step_size-th frame.
    if (frame_count_ % step_size_ == 0) {
      cc->Outputs().Index(0).AddPacket(
          MakePacket<std::deque<T>>(buffer_).At(cc->InputTimestamp()));
    }
    ++frame_count_;
    return absl::OkStatus();
  }

  absl::Status Close(CalculatorContext* cc) override {
    // Flush whatever is still buffered when the stream ends.
    if (!buffer_.empty()) {
      cc->Outputs().Index(0).AddPacket(
          MakePacket<std::deque<T>>(buffer_).At(Timestamp::PostStream()));
    }
    LOG(INFO) << "SlidingWindowCalculator closed, buffered frames: "
              << buffer_.size();
    return absl::OkStatus();
  }

 private:
  std::deque<T> buffer_;
  int window_size_ = 10;
  int step_size_ = 1;
  int frame_count_ = 0;
};

// Register each instantiation under a plain name through a typedef.
typedef SlidingWindowCalculator<float> SlidingWindowFloatCalculator;
REGISTER_CALCULATOR(SlidingWindowFloatCalculator);
typedef SlidingWindowCalculator<cv::Mat> SlidingWindowMatCalculator;
REGISTER_CALCULATOR(SlidingWindowMatCalculator);
typedef SlidingWindowCalculator<std::vector<int>>
    SlidingWindowIntVectorCalculator;
REGISTER_CALCULATOR(SlidingWindowIntVectorCalculator);

}  // namespace mediapipe

#endif  // SLIDING_WINDOW_CALCULATOR_H_
```
BUILD (Bazel):
```python
cc_library(
    name = "sliding_window_calculator",
    srcs = ["sliding_window_calculator.cc"],
    hdrs = ["sliding_window_calculator.h"],
    deps = [
        "//mediapipe/framework:calculator_framework",
        "//mediapipe/framework:calculator_options_cc_proto",
    ],
    alwayslink = 1,
)
```
8.2 Usage example
pbtxt configuration:
```pbtxt
# sliding_window_graph.pbtxt

# ========== Graph inputs and outputs ==========
input_stream: "input_frame"
output_stream: "frame_buffer"

# ========== Sliding-window Calculator ==========
node {
  # Name under which the cv::Mat instantiation is registered.
  calculator: "SlidingWindowMatCalculator"
  input_stream: "input_frame"
  output_stream: "frame_buffer"
  options {
    [SlidingWindowOptions.ext] {
      window_size: 5
      step_size: 1
    }
  }
}

# ========== Usage example: frame-rate statistics ==========
# Input: 30 FPS video
# Output: per-frame frame-rate statistics

# Contents of frame_buffer with window_size 5:
# t=0: [frame_0]
# t=1: [frame_0, frame_1]
# t=2: [frame_0, frame_1, frame_2]
# t=3: [frame_0, frame_1, frame_2, frame_3]
# t=4: [frame_0, frame_1, frame_2, frame_3, frame_4]
# t=5: [frame_1, frame_2, frame_3, frame_4, frame_5]
# ...
```
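The buffer behaviour in the listing above can be checked without building a graph. The class below is a minimal standard-library sketch of the same windowing logic, with no MediaPipe dependency; `SlidingWindow` is a hypothetical name, and the step-size handling is left out to keep the example short.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Keeps the most recent `window_size` items.
template <typename T>
class SlidingWindow {
 public:
  explicit SlidingWindow(std::size_t window_size)
      : window_size_(window_size) {}

  // Appends an item and drops the oldest ones beyond the window size.
  void Push(const T& item) {
    buffer_.push_back(item);
    while (buffer_.size() > window_size_) buffer_.pop_front();
  }

  // Read-only view of the current window, oldest item first.
  const std::deque<T>& Contents() const { return buffer_; }

 private:
  std::size_t window_size_;
  std::deque<T> buffer_;
};
```

With a window of 5, pushing frames 0 through 4 fills the buffer, and pushing frame 5 evicts frame 0, which matches the `t=5` row of the listing.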
8.3 Temporal-analysis Calculator
Computing the PERCLOS fatigue metric:
```cpp
// perclos_calculator.h
#ifndef PERCLOS_CALCULATOR_H_
#define PERCLOS_CALCULATOR_H_

#include <cstdint>
#include <deque>
#include <vector>

#include "mediapipe/framework/calculator_framework.h"

// PERCLOSOptions belongs in its own .proto file, not in C++ source:
//   message PERCLOSOptions {
//     optional int32 window_seconds = 1 [default = 30];
//     optional float ear_threshold = 2 [default = 0.2];
//   }

namespace mediapipe {

class PERCLOSCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    cc->Inputs().Tag("EAR").Set<float>();  // Eye aspect ratio per frame.
    cc->Outputs().Tag("PERCLOS").Set<float>();
    cc->Outputs().Tag("PERCLOS_WINDOW").Set<std::vector<float>>();
    cc->Options<PERCLOSOptions>();
    return absl::OkStatus();
  }

  absl::Status Open(CalculatorContext* cc) override {
    const auto& options = cc->Options<PERCLOSOptions>();
    window_seconds_ = options.window_seconds();
    ear_threshold_ = options.ear_threshold();
    ear_buffer_.clear();
    timestamp_buffer_.clear();
    LOG(INFO) << "PERCLOSCalculator initialized: "
              << "window_seconds=" << window_seconds_
              << ", ear_threshold=" << ear_threshold_;
    return absl::OkStatus();
  }

  absl::Status Process(CalculatorContext* cc) override {
    float ear = cc->Inputs().Tag("EAR").Get<float>();
    int64_t us = cc->InputTimestamp().Value();
    ear_buffer_.push_back(ear);
    timestamp_buffer_.push_back(us);

    // Evict samples that fall before the start of the time window.
    const int64_t window_start_us =
        us - static_cast<int64_t>(window_seconds_) * 1000000;
    while (!timestamp_buffer_.empty() &&
           timestamp_buffer_.front() < window_start_us) {
      ear_buffer_.pop_front();
      timestamp_buffer_.pop_front();
    }

    float perclos = CalculatePERCLOS();
    cc->Outputs().Tag("PERCLOS").AddPacket(
        MakePacket<float>(perclos).At(cc->InputTimestamp()));
    // std::vector cannot be built from a deque directly; use the iterator range.
    cc->Outputs().Tag("PERCLOS_WINDOW").AddPacket(
        MakePacket<std::vector<float>>(ear_buffer_.begin(), ear_buffer_.end())
            .At(cc->InputTimestamp()));
    return absl::OkStatus();
  }

  absl::Status Close(CalculatorContext* cc) override {
    // Emit a final summary for the last window.
    if (!ear_buffer_.empty()) {
      cc->Outputs().Tag("PERCLOS").AddPacket(
          MakePacket<float>(CalculatePERCLOS()).At(Timestamp::PostStream()));
      cc->Outputs().Tag("PERCLOS_WINDOW").AddPacket(
          MakePacket<std::vector<float>>(ear_buffer_.begin(), ear_buffer_.end())
              .At(Timestamp::PostStream()));
    }
    LOG(INFO) << "PERCLOSCalculator closed, window size: "
              << ear_buffer_.size();
    return absl::OkStatus();
  }

 private:
  // Fraction of buffered frames whose EAR is below the threshold (eyes closed).
  float CalculatePERCLOS() {
    if (ear_buffer_.empty()) {
      return 0.0f;
    }
    int count = 0;
    for (float ear : ear_buffer_) {
      if (ear < ear_threshold_) {
        count++;
      }
    }
    return static_cast<float>(count) / ear_buffer_.size();
  }

  std::deque<float> ear_buffer_;
  std::deque<int64_t> timestamp_buffer_;
  int window_seconds_ = 30;
  float ear_threshold_ = 0.2f;
};

REGISTER_CALCULATOR(PERCLOSCalculator);

}  // namespace mediapipe

#endif  // PERCLOS_CALCULATOR_H_
```
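The windowing arithmetic is the part of this calculator most worth testing in isolation. The class below is a minimal sketch of the same time-windowed ratio in standard-library C++, with no MediaPipe dependency; `PerclosWindow` is a hypothetical name, timestamps are plain microsecond counts, and the window length is assumed to be non-negative.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <utility>

// Time-windowed PERCLOS over (timestamp_us, ear) samples.
class PerclosWindow {
 public:
  PerclosWindow(int64_t window_us, float ear_threshold)
      : window_us_(window_us), ear_threshold_(ear_threshold) {}

  // Adds a sample, evicts samples older than the window, and returns the
  // fraction of remaining samples whose EAR is below the threshold.
  float Add(int64_t timestamp_us, float ear) {
    samples_.emplace_back(timestamp_us, ear);
    const int64_t window_start_us = timestamp_us - window_us_;
    while (!samples_.empty() && samples_.front().first < window_start_us) {
      samples_.pop_front();
    }
    int closed = 0;
    for (const auto& sample : samples_) {
      if (sample.second < ear_threshold_) ++closed;
    }
    return static_cast<float>(closed) / static_cast<float>(samples_.size());
  }

 private:
  int64_t window_us_;
  float ear_threshold_;
  std::deque<std::pair<int64_t, float>> samples_;
};
```

With a one-second window and a threshold of 0.2, a sample sitting exactly on the window boundary is kept, and samples older than that are evicted before the ratio is computed.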
9. Common problems and debugging
9.1 Non-increasing timestamps
```cpp
// Wrong: the second timestamp is earlier than the first, so the call fails.
MP_RETURN_IF_ERROR(graph.AddPacketToInputStream(
    "input", MakePacket<int>(1).At(Timestamp(1000000))));
MP_RETURN_IF_ERROR(graph.AddPacketToInputStream(
    "input", MakePacket<int>(2).At(Timestamp(500000))));

// Right: timestamps strictly increase.
MP_RETURN_IF_ERROR(graph.AddPacketToInputStream(
    "input", MakePacket<int>(1).At(Timestamp(1000000))));
MP_RETURN_IF_ERROR(graph.AddPacketToInputStream(
    "input", MakePacket<int>(2).At(Timestamp(2000000))));
```
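When timestamps come from an external clock, it can help to validate them before they reach the graph. The guard below is a minimal sketch in standard-library C++; `MonotonicGuard` is a hypothetical helper, not part of MediaPipe, and it only tracks a single stream.

```cpp
#include <cassert>
#include <cstdint>

// Tracks the last accepted timestamp and rejects anything not strictly later.
class MonotonicGuard {
 public:
  // Returns true and records `timestamp_us` if it is strictly greater than
  // the previously accepted timestamp; returns false otherwise.
  bool Accept(int64_t timestamp_us) {
    if (has_last_ && timestamp_us <= last_us_) return false;
    last_us_ = timestamp_us;
    has_last_ = true;
    return true;
  }

 private:
  bool has_last_ = false;
  int64_t last_us_ = 0;
};
```

A caller would check `guard.Accept(ts)` before building the packet and either drop the sample or log it when the check fails, instead of letting the graph return an error.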
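9.2 Stream-synchronization deadlock
```pbtxt
# Problem: with the default synchronized handler, MyCalculator waits until
# input_a and input_b have both reached the same timestamp. If one stream
# stops producing packets, the node stalls.

# Fix 1: deliver packets as they arrive, without synchronization.
node {
  calculator: "MyCalculator"
  input_stream: "A:input_a"
  input_stream: "B:input_b"
  input_stream_handler {
    input_stream_handler: "ImmediateInputStreamHandler"
  }
}

# Fix 2: put each input in its own sync set so neither waits for the other.
node {
  calculator: "MyCalculator"
  input_stream: "A:input_a"
  input_stream: "B:input_b"
  input_stream_handler {
    input_stream_handler: "SyncSetInputStreamHandler"
    options {
      [mediapipe.SyncSetInputStreamHandlerOptions.ext] {
        sync_set { tag_index: "A" }
        sync_set { tag_index: "B" }
      }
    }
  }
}
```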
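9.3 Memory leaks
```cpp
// Adopt() hands ownership of the raw pointer to the packet, which frees the
// object when the last copy of the packet goes away. Do not delete it yourself.
auto packet = Adopt(new MyData());

// MakePacket copies *data into the packet; the shared_ptr keeps owning the
// original object.
auto data = std::make_shared<MyData>();
auto packet2 = MakePacket<MyData>(*data);

// Leak: a raw `new` inside Process() with no matching delete.
absl::Status Process(CalculatorContext* cc) override {
  cv::Mat* image = new cv::Mat();  // Never freed.
  return absl::OkStatus();
}

// Better: let a smart pointer own the object.
absl::Status Process(CalculatorContext* cc) override {
  auto image = std::make_shared<cv::Mat>();  // Freed automatically.
  return absl::OkStatus();
}

// Release long-lived members in Close(); image_ is a smart-pointer member.
absl::Status Close(CalculatorContext* cc) override {
  image_.reset();
  return absl::OkStatus();
}
```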
9.4 Debugging tips
```cpp
// 1. Log at the appropriate severity.
LOG(INFO) << "Processing frame at timestamp: " << cc->InputTimestamp();
LOG(WARNING) << "Empty input detected";
LOG(ERROR) << "Calculation failed";

// 2. Count Process() calls.
static int process_count = 0;
process_count++;
LOG(INFO) << "Processed " << process_count << " frames";

// 3. Measure processing time.
auto start = std::chrono::high_resolution_clock::now();
// ... the work being measured ...
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
    end - start).count();
LOG(INFO) << "Process time: " << duration << "us";

// 4. Show intermediate images.
cv::imshow("Debug", image);
cv::waitKey(1);

// 5. Inspect a packet.
LOG(INFO) << "Packet timestamp: " << packet.Timestamp();
LOG(INFO) << "Packet is empty: " << packet.IsEmpty();
LOG(INFO) << "Packet holds cv::Mat: " << packet.ValidateAsType<cv::Mat>().ok();
```
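10. Summary
| Concept | Description | Key API |
| --- | --- | --- |
| Packet | Unit of data | MakePacket<T>(), packet.Get<T>() |
| Timestamp | Time reference | Timestamp(v), packet.Timestamp() |
| Stream | Data channel | input_stream, output_stream |
| Synchronization | Multi-stream alignment | SyncSetInputStreamHandler |
| Zero copy | Passing packets without copying the payload | cc->Inputs().Tag(...).Value() |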
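Next up
MediaPipe series 04: Side Packets, injecting static configuration data
A closer look at the Side Packet mechanism, configuration injection, and static resource management.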
Series progress: 3/55
Last updated: 2026-03-12