MediaPipe 系列 21:外部数据注入 Calculator——传感器数据接入完整指南

前言:为什么需要外部数据注入?

21.1 外部数据接入的重要性

IMS 需要融合多种外部数据才能做出准确判断:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
┌─────────────────────────────────────────────────────────────────────────┐
│ 外部数据接入的重要性 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 问题:为什么仅靠视觉无法准确判断驾驶状态? │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ IMS/DMS 需要融合的传感器数据: │ │
│ │ │ │
│ │ • CAN 总线:车速、方向盘角度、转向灯、档位 │ │
│ │ • GPS:位置、速度、航向 │ │
│ │ • IMU:加速度、角速度(转弯检测) │ │
│ │ • 麦克风:语音识别、呼救检测 │ │
│ │ • 雷达:CPD 儿童存在检测 │ │
│ │ • 座椅传感器:乘员检测 │ │
│ │ │ │
│ │ 为什么需要融合? │ │
│ │ • 视线偏移 + 低速 → 不严重 │ │
│ │ • 视线偏移 + 高速 → 严重分心 │ │
│ │ • 闭眼 + 转弯 → 极度危险 │ │
│ │ • 打哈欠 + 高速 + 夜间 → 疲劳预警 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ 挑战: │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ • 不同传感器有不同的采样率 │ │
│ │ • 时间戳可能不对齐 │ │
│ │ • 数据格式各异 │ │
│ │ • 需要实时性 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ 解决方案:外部数据注入 Calculator │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ 1. 数据源 Calculator:从硬件读取数据 │ │
│ │ 2. 解析 Calculator:转换为标准格式 │ │
│ │ 3. 同步 Calculator:对齐时间戳 │ │
│ │ 4. 融合 Calculator:多源数据融合 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘

21.2 数据源分类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
┌─────────────────────────────────────────────────────────────┐
│ 数据源分类 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. CAN 总线 │
│ ┌─────────────────────────────────────────────┐ │
│ │ 数据:车速、方向盘、转向灯、档位 │ │
│ │ 频率:10-100 Hz │ │
│ │ 接口:Socket CAN、CAN 适配器 │ │
│ │ 协议:DBC 文件定义 │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ 2. GPS │
│ ┌─────────────────────────────────────────────┐ │
│ │ 数据:位置、速度、航向 │ │
│ │ 频率:1-10 Hz │ │
│ │ 接口:串口、USB │ │
│ │ 协议:NMEA 0183 │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ 3. IMU │
│ ┌─────────────────────────────────────────────┐ │
│ │ 数据:加速度、角速度、磁场 │ │
│ │ 频率:50-200 Hz │ │
│ │ 接口:I2C、SPI │ │
│ │ 协议:厂商定义 │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ 4. 音频 │
│ ┌─────────────────────────────────────────────┐ │
│ │ 数据:麦克风音频 │ │
│ │ 频率:16-48 kHz 采样 │ │
│ │ 接口:ALSA、PulseAudio │ │
│ │ 协议:PCM │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ 5. 雷达 │
│ ┌─────────────────────────────────────────────┐ │
│ │ 数据:距离、速度、角度(CPD) │ │
│ │ 频率:10-30 Hz │ │
│ │ 接口:串口、以太网 │ │
│ │ 协议:厂商定义 │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

二十二、CAN 总线数据注入

22.1 Socket CAN 基础

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// ========== Socket CAN 基础知识 ==========

// Linux 下 CAN 总线编程使用 Socket CAN

#include <linux/can.h>
#include <linux/can/raw.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <net/if.h>

// ========== CAN frame layout ==========
// Simplified view of one CAN frame: identifier, payload length, payload bytes.
// NOTE(review): <linux/can.h>, included above, presumably already declares
// `struct can_frame`; this copy looks illustrative only — confirm it is not
// meant to be compiled together with that header.
struct can_frame {
uint32_t can_id; // CAN identifier (11-bit standard or 29-bit extended)
uint8_t can_dlc; // payload length in bytes (0-8)
uint8_t data[8]; // payload, at most 8 bytes
};

// ========== 打开 CAN 接口 ==========
int OpenCANSocket(const std::string& interface) {
// 创建 Socket
int sock = socket(PF_CAN, SOCK_RAW, CAN_RAW);
if (sock < 0) {
LOG(ERROR) << "Failed to create CAN socket";
return -1;
}

// 获取接口索引
struct ifreq ifr;
strcpy(ifr.ifr_name, interface.c_str());
ioctl(sock, SIOCGIFINDEX, &ifr);

// 绑定到 CAN 接口
struct sockaddr_can addr;
memset(&addr, 0, sizeof(addr));
addr.can_family = AF_CAN;
addr.can_ifindex = ifr.ifr_ifindex;

if (bind(sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
LOG(ERROR) << "Failed to bind CAN socket";
close(sock);
return -1;
}

LOG(INFO) << "CAN socket opened on " << interface;
return sock;
}

// ========== Read one CAN frame ==========
// Reads a single frame from `sock` into `*frame`.
//
// @return true only when read() delivered exactly sizeof(can_frame) bytes;
//         false on a read error or on a short read.
bool ReadCANFrame(int sock, can_frame* frame) {
  const int received = read(sock, frame, sizeof(can_frame));
  return received >= 0 && static_cast<size_t>(received) == sizeof(can_frame);
}

// ========== Write one CAN frame ==========
// Writes `frame` to `sock` with a single write() call.
//
// @return true only when write() accepted exactly sizeof(can_frame) bytes;
//         false on a write error or on a partial write.
bool WriteCANFrame(int sock, const can_frame& frame) {
  const size_t frame_bytes = sizeof(can_frame);
  const int written = write(sock, &frame, frame_bytes);
  if (written < 0) {
    return false;
  }
  return static_cast<size_t>(written) == frame_bytes;
}

22.2 CAN 数据源 Calculator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
// can_data_source_calculator.h
#ifndef MEDIAPIPE_CALCULATORS_SENSORS_CAN_DATA_SOURCE_CALCULATOR_H_
#define MEDIAPIPE_CALCULATORS_SENSORS_CAN_DATA_SOURCE_CALCULATOR_H_

#include "mediapipe/framework/calculator_framework.h"
#include <linux/can.h>

namespace mediapipe {

// ========== Proto Options ==========
/*
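syntax = "proto2";  // proto2: explicit [default = ...] values and the ".ext" extension used by the graph configs are not available in proto3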
package mediapipe;

message CANSourceOptions {
optional string can_interface = 1 [default = "can0"];
optional int32 frame_rate = 2 [default = 50];
optional bool block_on_read = 3 [default = false];

// CAN ID 过滤
repeated uint32 filter_ids = 4;
}
*/

// ========== CAN frame message ==========
// Raw CAN frame: identifier, timestamp and payload bytes.
// NOTE(review): this is protobuf schema syntax shown inside a C++ header
// listing; it presumably belongs in its own .proto file — confirm.
message CANFrameMessage {
uint32 can_id = 1;
uint64 timestamp_us = 2;
bytes data = 3; // at most 8 bytes
}

// ========== Vehicle state message ==========
// Decoded vehicle signals together with the time they were captured.
// NOTE(review): this is protobuf schema syntax shown inside a C++ header
// listing; it presumably belongs in its own .proto file — confirm.
message VehicleState {
uint64 timestamp_us = 1;
float speed = 2; // km/h
float steering_angle = 3; // degrees
bool turn_signal_left = 4;
bool turn_signal_right = 5;
int32 gear = 6; // 0=P, 1=R, 2=N, 3=D
float engine_rpm = 7;
bool brake_pressed = 8;
float accelerator_position = 9; // 0-100%
}

// ========== CAN 数据源 Calculator ==========
class CANDataSourceCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Outputs().Tag("CAN_FRAME").Set<CANFrameMessage>();
cc->Outputs().Tag("VEHICLE_STATE").Set<VehicleState>();
cc->Options<CANSourceOptions>();
return absl::OkStatus();
}

absl::Status Open(CalculatorContext* cc) override {
const auto& options = cc->Options<CANSourceOptions>();

can_interface_ = options.can_interface();
frame_rate_ = options.frame_rate();
block_on_read_ = options.block_on_read();

for (const auto& id : options.filter_ids()) {
filter_ids_.insert(id);
}

// 打开 CAN 接口
can_socket_ = OpenCANSocket(can_interface_);
if (can_socket_ < 0) {
return absl::InternalError("Failed to open CAN socket");
}

LOG(INFO) << "CANDataSourceCalculator opened on " << can_interface_;

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
// ========== 读取 CAN 帧 ==========
can_frame frame;
if (!ReadCANFrame(can_socket_, &frame)) {
return absl::OkStatus();
}

// ========== 过滤 CAN ID ==========
if (!filter_ids_.empty() && filter_ids_.count(frame.can_id) == 0) {
return absl::OkStatus();
}

uint64_t timestamp_us = GetCurrentTimeUs();

// ========== 输出原始 CAN 帧 ==========
CANFrameMessage can_msg;
can_msg.set_can_id(frame.can_id);
can_msg.set_timestamp_us(timestamp_us);
can_msg.set_data(std::string(reinterpret_cast<char*>(frame.data),
frame.can_dlc));

cc->Outputs().Tag("CAN_FRAME").AddPacket(
MakePacket<CANFrameMessage>(can_msg).At(cc->InputTimestamp()));

// ========== 解析车辆状态 ==========
VehicleState state = ParseCANFrame(frame, timestamp_us);

cc->Outputs().Tag("VEHICLE_STATE").AddPacket(
MakePacket<VehicleState>(state).At(cc->InputTimestamp()));

return absl::OkStatus();
}

absl::Status Close(CalculatorContext* cc) override {
if (can_socket_ >= 0) {
close(can_socket_);
can_socket_ = -1;
}
return absl::OkStatus();
}

private:
std::string can_interface_ = "can0";
int frame_rate_ = 50;
bool block_on_read_ = false;
std::set<uint32_t> filter_ids_;
int can_socket_ = -1;

// ========== CAN ID 定义 ==========
// 实际项目中应从 DBC 文件解析
static constexpr uint32_t CAN_ID_SPEED = 0x123;
static constexpr uint32_t CAN_ID_STEERING = 0x124;
static constexpr uint32_t CAN_ID_SIGNALS = 0x125;
static constexpr uint32_t CAN_ID_GEAR = 0x126;
static constexpr uint32_t CAN_ID_RPM = 0x127;

int OpenCANSocket(const std::string& interface);
bool ReadCANFrame(int sock, can_frame* frame);

uint64_t GetCurrentTimeUs() {
auto now = std::chrono::high_resolution_clock::now();
auto duration = now.time_since_epoch();
return std::chrono::duration_cast<std::chrono::microseconds>(duration).count();
}

VehicleState ParseCANFrame(const can_frame& frame, uint64_t timestamp_us) {
VehicleState state;
state.set_timestamp_us(timestamp_us);

switch (frame.can_id) {
case CAN_ID_SPEED:
// 假设:字节 0-1 为速度,单位 0.1 km/h
state.set_speed(((frame.data[0] << 8) | frame.data[1]) * 0.1f);
break;

case CAN_ID_STEERING:
// 假设:字节 0-1 为方向盘角度,偏移 780 度
state.set_steering_angle(((frame.data[0] << 8) | frame.data[1]) * 0.1f - 780.0f);
break;

case CAN_ID_SIGNALS:
state.set_turn_signal_left(frame.data[0] & 0x01);
state.set_turn_signal_right(frame.data[0] & 0x02);
state.set_brake_pressed(frame.data[0] & 0x04);
break;

case CAN_ID_GEAR:
state.set_gear(frame.data[0] & 0x0F);
break;

case CAN_ID_RPM:
state.set_engine_rpm(((frame.data[0] << 8) | frame.data[1]));
break;
}

return state;
}
};

REGISTER_CALCULATOR(CANDataSourceCalculator);

} // namespace mediapipe

#endif // MEDIAPIPE_CALCULATORS_SENSORS_CAN_DATA_SOURCE_CALCULATOR_H_

22.3 Graph 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# can_data_source_graph.pbtxt
# Example graph: one CAN source node whose vehicle_state stream feeds a printer
# node and a publisher node.

# CAN data source: reads interface can0; filter_ids lists the five CAN IDs the
# calculator lets through.
node {
calculator: "CANDataSourceCalculator"
output_stream: "CAN_FRAME:can_frame"
output_stream: "VEHICLE_STATE:vehicle_state"
options {
[mediapipe.CANSourceOptions.ext] {
can_interface: "can0"
frame_rate: 50
filter_ids: 0x123
filter_ids: 0x124
filter_ids: 0x125
filter_ids: 0x126
filter_ids: 0x127
}
}
}

# Print the vehicle state.
node {
calculator: "VehicleStatePrinter"
input_stream: "STATE:vehicle_state"
}

# Forward the vehicle state to downstream processing.
node {
calculator: "VehicleStatePublisher"
input_stream: "STATE:vehicle_state"
output_stream: "PUBLISHED:published_state"
}

二十三、GPS 数据接入

23.1 GPS 数据源 Calculator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// gps_data_source_calculator.h
#ifndef MEDIAPIPE_CALCULATORS_SENSORS_GPS_DATA_SOURCE_CALCULATOR_H_
#define MEDIAPIPE_CALCULATORS_SENSORS_GPS_DATA_SOURCE_CALCULATOR_H_

#include "mediapipe/framework/calculator_framework.h"
#include <termios.h>

namespace mediapipe {

// ========== GPS data message ==========
// One GPS reading: position, motion and fix-quality fields.
// NOTE(review): this is protobuf schema syntax shown inside a C++ header
// listing; it presumably belongs in its own .proto file — confirm.
message GPSData {
uint64 timestamp_us = 1;
double latitude = 2; // latitude (degrees)
double longitude = 3; // longitude (degrees)
double altitude = 4; // altitude (metres)
float speed = 5; // speed (km/h)
float heading = 6; // heading (degrees)
float hdop = 7; // horizontal dilution of precision
int32 satellites = 8; // number of satellites
}

// ========== GPS 数据源 Calculator ==========
class GPSDataSourceCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Outputs().Tag("GPS_DATA").Set<GPSData>();
cc->Options<GPSSourceOptions>();
return absl::OkStatus();
}

absl::Status Open(CalculatorContext* cc) override {
const auto& options = cc->Options<GPSSourceOptions>();

serial_device_ = options.serial_device();
baud_rate_ = options.baud_rate();

// 打开串口
serial_fd_ = OpenSerialPort(serial_device_, baud_rate_);
if (serial_fd_ < 0) {
return absl::InternalError("Failed to open GPS serial port");
}

LOG(INFO) << "GPSDataSourceCalculator opened on " << serial_device_;

return absl::OkStatus();
}

absl::Status Process(CalculatorContext* cc) override {
// ========== 读取 NMEA 数据 ==========
std::string line;
if (!ReadLine(&line)) {
return absl::OkStatus();
}

// ========== 解析 NMEA ==========
GPSData gps_data = ParseNMEA(line);

if (gps_data.satellites() > 0) {
cc->Outputs().Tag("GPS_DATA").AddPacket(
MakePacket<GPSData>(gps_data).At(cc->InputTimestamp()));
}

return absl::OkStatus();
}

absl::Status Close(CalculatorContext* cc) override {
if (serial_fd_ >= 0) {
close(serial_fd_);
serial_fd_ = -1;
}
return absl::OkStatus();
}

private:
std::string serial_device_ = "/dev/ttyUSB0";
int baud_rate_ = 9600;
int serial_fd_ = -1;
std::string buffer_;

int OpenSerialPort(const std::string& device, int baud_rate);
bool ReadLine(std::string* line);

GPSData ParseNMEA(const std::string& nmea) {
GPSData gps;

// 解析 GPGGA(位置)
if (nmea.find("$GPGGA") == 0) {
ParseGPGGA(nmea, &gps);
}

// 解析 GPRMC(速度、航向)
if (nmea.find("$GPRMC") == 0) {
ParseGPRMC(nmea, &gps);
}

gps.set_timestamp_us(GetCurrentTimeUs());

return gps;
}

void ParseGPGGA(const std::string& nmea, GPSData* gps);
void ParseGPRMC(const std::string& nmea, GPSData* gps);

uint64_t GetCurrentTimeUs();
};

REGISTER_CALCULATOR(GPSDataSourceCalculator);

} // namespace mediapipe

#endif // MEDIAPIPE_CALCULATORS_SENSORS_GPS_DATA_SOURCE_CALCULATOR_H_

二十四、数据同步机制

24.1 时间戳同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
┌─────────────────────────────────────────────────────────────┐
│ 时间戳同步策略 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 问题:不同传感器有不同的采样率 │
│ │
│ 视频流: ──F1──F2──F3──F4──F5──▶ (30 FPS) │
│ │ │ │ │ │ │
│ t=0 t=33 t=66 t=99 t=132 ms │
│ │
│ CAN 总线:──C1────C2────C3────C4──▶ (50 Hz) │
│ │ │ │ │ │
│ t=0 t=20 t=40 t=60 t=80 ms │
│ │
│ 同步方案: │
│ ┌─────────────────────────────────────────────┐ │
│ │ 1. 最近邻插值 │ │
│ │ 找最近时间戳的 CAN 数据 │ │
│ │ │ │
│ │ F(t=33ms) + CAN(t=40ms) │ │
│ │ │ │
│ │ 2. 线性插值 │ │
│ │ 插值计算精确时刻的值 │ │
│ │ │ │
│ │ CAN(t=33ms) = CAN(20ms) + │ │
│ │ (33-20)/(40-20) * (CAN(40)-CAN(20)) │ │
│ │ │ │
│ │ 3. 缓存窗口 │ │
│ │ 维护历史数据窗口 │ │
│ │ │ │
│ │ [CAN(t=0,20,40,60...)] │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

24.2 同步 Calculator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
// sensor_sync_calculator.h
#ifndef MEDIAPIPE_CALCULATORS_SENSORS_SENSOR_SYNC_CALCULATOR_H_
#define MEDIAPIPE_CALCULATORS_SENSORS_SENSOR_SYNC_CALCULATOR_H_

#include "mediapipe/framework/calculator_framework.h"
#include <deque>

namespace mediapipe {

// ========== Synchronised state message ==========
// Vision results and vehicle signals aligned to one timestamp.
// NOTE(review): this is protobuf schema syntax shown inside a C++ header
// listing; it presumably belongs in its own .proto file — confirm.
message SyncedState {
uint64 timestamp_us = 1;

// Vision data
int32 gaze_zone = 10;
float fatigue_score = 11;

// Vehicle state
float speed = 20;
float steering_angle = 21;
bool turn_signal_left = 22;
bool turn_signal_right = 23;
}

// ========== Sensor synchronisation calculator ==========
// Buffers the latest gaze, fatigue and vehicle-state samples and, on every
// Process() call, emits one SyncedState aligned to the current input timestamp.
//
// Inputs:  GAZE_ZONE (int), FATIGUE (float), VEHICLE_STATE (VehicleState).
// Output:  SYNCED_STATE (SyncedState).
// Options (SensorSyncOptions, not defined in this listing):
//   max_latency_ms - samples older than this window are discarded.
//   interpolation  - true: linearly interpolate speed and steering angle;
//                    false: take the nearest vehicle sample unchanged.
//
// Buffered timestamps are the input timestamps of the packets, which the code
// treats as microseconds (the window is converted with a factor of 1000).
class SensorSyncCalculator : public CalculatorBase {
 public:
  // Declares the three input streams and the output stream.
  static absl::Status GetContract(CalculatorContract* cc) {
    cc->Inputs().Tag("GAZE_ZONE").Set<int>();
    cc->Inputs().Tag("FATIGUE").Set<float>();
    cc->Inputs().Tag("VEHICLE_STATE").Set<VehicleState>();

    cc->Outputs().Tag("SYNCED_STATE").Set<SyncedState>();

    cc->Options<SensorSyncOptions>();
    return absl::OkStatus();
  }

  // Copies the options into members.
  absl::Status Open(CalculatorContext* cc) override {
    const auto& options = cc->Options<SensorSyncOptions>();

    max_latency_ms_ = options.max_latency_ms();
    interpolation_ = options.interpolation();

    return absl::OkStatus();
  }

  // Buffers whatever inputs are present, drops expired samples and emits one
  // SyncedState stamped with the current input timestamp.
  absl::Status Process(CalculatorContext* cc) override {
    const uint64_t current_time = cc->InputTimestamp().Value();

    // ========== Buffer the incoming samples ==========
    if (!cc->Inputs().Tag("GAZE_ZONE").IsEmpty()) {
      int gaze = cc->Inputs().Tag("GAZE_ZONE").Get<int>();
      gaze_buffer_.push_back({current_time, gaze});
    }

    if (!cc->Inputs().Tag("FATIGUE").IsEmpty()) {
      float fatigue = cc->Inputs().Tag("FATIGUE").Get<float>();
      fatigue_buffer_.push_back({current_time, fatigue});
    }

    if (!cc->Inputs().Tag("VEHICLE_STATE").IsEmpty()) {
      const auto& state = cc->Inputs().Tag("VEHICLE_STATE").Get<VehicleState>();
      vehicle_buffer_.push_back({current_time, state});
    }

    // ========== Drop expired samples ==========
    // The subtraction is done on unsigned values: while current_time is still
    // smaller than the window it would wrap to a huge cutoff and discard every
    // buffered sample, so the cutoff is clamped at 0 instead.
    const uint64_t window_us =
        max_latency_ms_ > 0 ? static_cast<uint64_t>(max_latency_ms_) * 1000 : 0;
    const uint64_t cutoff_time =
        current_time > window_us ? current_time - window_us : 0;
    CleanupOldBuffer(gaze_buffer_, cutoff_time);
    CleanupOldBuffer(fatigue_buffer_, cutoff_time);
    CleanupOldBuffer(vehicle_buffer_, cutoff_time);

    // ========== Build the synchronised output ==========
    SyncedState synced;
    synced.set_timestamp_us(current_time);

    // Nearest-neighbour lookup for the vision results.
    synced.set_gaze_zone(FindNearest(gaze_buffer_, current_time));
    synced.set_fatigue_score(FindNearest(fatigue_buffer_, current_time));

    // Vehicle state: interpolated or nearest, as selected by the option.
    if (!vehicle_buffer_.empty()) {
      const VehicleState state =
          interpolation_ ? InterpolateVehicleState(current_time)
                         : FindNearest(vehicle_buffer_, current_time);
      synced.set_speed(state.speed());
      synced.set_steering_angle(state.steering_angle());
      synced.set_turn_signal_left(state.turn_signal_left());
      synced.set_turn_signal_right(state.turn_signal_right());
    }

    cc->Outputs().Tag("SYNCED_STATE").AddPacket(
        MakePacket<SyncedState>(synced).At(cc->InputTimestamp()));

    return absl::OkStatus();
  }

 private:
  int max_latency_ms_ = 100;
  bool interpolation_ = true;

  // A sample paired with the timestamp it was buffered at.
  template <typename T>
  struct TimedData {
    uint64_t timestamp;
    T data;
  };

  // Samples are appended in Process() order, so each buffer is ordered by
  // timestamp; the binary searches below depend on that.
  std::deque<TimedData<int>> gaze_buffer_;
  std::deque<TimedData<float>> fatigue_buffer_;
  std::deque<TimedData<VehicleState>> vehicle_buffer_;

  // Removes samples from the front of `buffer` whose timestamp is older than
  // `cutoff`.
  template <typename T>
  void CleanupOldBuffer(std::deque<TimedData<T>>& buffer, uint64_t cutoff) {
    while (!buffer.empty() && buffer.front().timestamp < cutoff) {
      buffer.pop_front();
    }
  }

  // Returns the sample whose timestamp is closest to `target`, preferring the
  // later one on a tie, or a default-constructed T when `buffer` is empty.
  template <typename T>
  T FindNearest(const std::deque<TimedData<T>>& buffer, uint64_t target) {
    if (buffer.empty()) return T();

    // Binary search for the first sample at or after the target.
    auto it = std::lower_bound(buffer.begin(), buffer.end(), target,
        [](const TimedData<T>& a, uint64_t t) { return a.timestamp < t; });

    if (it == buffer.end()) return buffer.back().data;
    if (it == buffer.begin()) return it->data;

    auto prev = it - 1;
    if (target - prev->timestamp < it->timestamp - target) {
      return prev->data;
    }
    return it->data;
  }

  // Returns the vehicle state at `target`. Speed and steering angle are
  // linearly interpolated between the two samples that bracket `target`; every
  // other field is copied from the nearer of those two samples, so discrete
  // signals such as the turn signals are preserved. Outside the buffered range
  // the first or last sample is returned unchanged.
  VehicleState InterpolateVehicleState(uint64_t target) {
    if (vehicle_buffer_.empty()) return VehicleState();
    if (vehicle_buffer_.size() == 1) return vehicle_buffer_.front().data;

    auto it = std::lower_bound(vehicle_buffer_.begin(), vehicle_buffer_.end(),
        target, [](const TimedData<VehicleState>& a, uint64_t t) {
          return a.timestamp < t;
        });

    if (it == vehicle_buffer_.end()) return vehicle_buffer_.back().data;
    if (it == vehicle_buffer_.begin()) return it->data;

    // Here prev->timestamp < target <= it->timestamp, so the divisor is > 0.
    auto prev = it - 1;
    const float t = static_cast<float>(target - prev->timestamp) /
                    (it->timestamp - prev->timestamp);

    const bool prev_is_nearer =
        target - prev->timestamp < it->timestamp - target;
    VehicleState result = prev_is_nearer ? prev->data : it->data;

    result.set_speed(prev->data.speed() +
                     t * (it->data.speed() - prev->data.speed()));
    result.set_steering_angle(
        prev->data.steering_angle() +
        t * (it->data.steering_angle() - prev->data.steering_angle()));

    return result;
  }
};

REGISTER_CALCULATOR(SensorSyncCalculator);

} // namespace mediapipe

#endif // MEDIAPIPE_CALCULATORS_SENSORS_SENSOR_SYNC_CALCULATOR_H_

二十五、IMS 实战:分心检测融合

25.1 完整 Graph

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# ims_distraction_with_vehicle_state.pbtxt
# Distraction-detection graph: vision results from the IR image are combined
# with the vehicle state read from the CAN bus.

input_stream: "IR_IMAGE:ir_image"
output_stream: "DISTRACTION_SCORE:distraction_score"
output_stream: "ALERT:alert"

# ========== CAN data source ==========
# Only the VEHICLE_STATE output is connected here; the calculator's CAN_FRAME
# output is left unused.
node {
calculator: "CANDataSourceCalculator"
output_stream: "VEHICLE_STATE:vehicle_state"
options {
[mediapipe.CANSourceOptions.ext] {
can_interface: "can0"
frame_rate: 50
}
}
}

# ========== Vision processing ==========
node {
calculator: "FaceDetector"
input_stream: "IMAGE:ir_image"
output_stream: "DETECTIONS:detections"
}

node {
calculator: "GazeDetectionCalculator"
input_stream: "IMAGE:ir_image"
input_stream: "DETECTIONS:detections"
output_stream: "GAZE_ZONE:gaze_zone"
}

node {
calculator: "FatigueCalculator"
input_stream: "IMAGE:ir_image"
input_stream: "DETECTIONS:detections"
output_stream: "FATIGUE_SCORE:fatigue"
}

# ========== Data synchronisation ==========
# Aligns gaze, fatigue and vehicle state into one synced_state stream.
node {
calculator: "SensorSyncCalculator"
input_stream: "GAZE_ZONE:gaze_zone"
input_stream: "FATIGUE:fatigue"
input_stream: "VEHICLE_STATE:vehicle_state"
output_stream: "SYNCED_STATE:synced_state"
options {
[mediapipe.SensorSyncOptions.ext] {
max_latency_ms: 100
interpolation: true
}
}
}

# ========== Distraction decision ==========
node {
calculator: "DistractionAnalyzerCalculator"
input_stream: "SYNCED_STATE:synced_state"
output_stream: "DISTRACTION_SCORE:distraction_score"
output_stream: "ALERT:alert"
}

25.2 分心判断 Calculator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// distraction_analyzer_calculator.h
// Turns one SyncedState into a distraction score and an alert flag.
//
// Input:   SYNCED_STATE (SyncedState).
// Outputs: DISTRACTION_SCORE (float, clamped to [0, 1]) and
//          ALERT (bool, true when the score exceeds 0.6).
//
// The score starts at 0.4 when the gaze zone is not GAZE_FORWARD and at 0
// otherwise, and is then scaled in a fixed order by the speed, steering,
// turn-signal and fatigue conditions below.
class DistractionAnalyzerCalculator : public CalculatorBase {
 public:
  // Declares the input stream and the two output streams.
  static absl::Status GetContract(CalculatorContract* cc) {
    cc->Inputs().Tag("SYNCED_STATE").Set<SyncedState>();
    cc->Outputs().Tag("DISTRACTION_SCORE").Set<float>();
    cc->Outputs().Tag("ALERT").Set<bool>();
    return absl::OkStatus();
  }

  // Emits nothing when SYNCED_STATE has no packet at this timestamp; otherwise
  // emits the score and the alert flag at the input timestamp.
  absl::Status Process(CalculatorContext* cc) override {
    const auto& synced_stream = cc->Inputs().Tag("SYNCED_STATE");
    if (synced_stream.IsEmpty()) {
      return absl::OkStatus();
    }
    const SyncedState& state = synced_stream.Get<SyncedState>();

    // 1. Gaze away from the road sets the base score.
    float score = (state.gaze_zone() != GAZE_FORWARD) ? 0.4f : 0.0f;

    // Multiplies the running score by `factor` when `applies` is true. The
    // calls below keep a fixed order so the floating-point result is stable.
    const auto scale_if = [&score](bool applies, float factor) {
      if (applies) {
        score *= factor;
      }
    };

    scale_if(state.speed() < 10, 0.5f);                        // 2. low speed: less severe
    scale_if(state.speed() > 80, 1.5f);                        // 3. high speed: more severe
    scale_if(std::abs(state.steering_angle()) > 30, 1.3f);     // 4. turning: more severe
    scale_if(state.turn_signal_left() || state.turn_signal_right(),
             0.7f);                                            // 5. signalling: more tolerant
    scale_if(state.fatigue_score() > 0.5, 1.5f);               // 6. fatigue: more dangerous

    // Clamp to [0, 1].
    if (!(score > 0.0f)) {
      score = 0.0f;
    }
    if (!(score < 1.0f)) {
      score = 1.0f;
    }

    // Publish both results at the input timestamp.
    const auto packet_time = cc->InputTimestamp();
    cc->Outputs().Tag("DISTRACTION_SCORE").AddPacket(
        MakePacket<float>(score).At(packet_time));

    const bool alert = score > 0.6f;
    cc->Outputs().Tag("ALERT").AddPacket(
        MakePacket<bool>(alert).At(packet_time));

    return absl::OkStatus();
  }
};

REGISTER_CALCULATOR(DistractionAnalyzerCalculator);

二十六、总结

| 数据源 | 接入方式 | 同步策略 |
| --- | --- | --- |
| CAN 总线 | Socket CAN | 最近邻/插值 |
| GPS | 串口 NMEA | 最近邻 |
| IMU | I2C/SPI | 插值 |
| 音频 | ALSA | 时间戳对齐 |
| 雷达 | 串口/以太网 | 最近邻 |

下篇预告

MediaPipe 系列 22:结果输出 Calculator——标准化输出格式

深入讲解如何输出标准化结果:JSON、Protobuf、可视化。


参考资料

  1. Linux Socket CAN. Documentation
  2. NMEA 0183. Standard
  3. Google AI Edge. External Data

系列进度: 21/55
更新时间: 2026-03-12


MediaPipe 系列 21:外部数据注入 Calculator——传感器数据接入完整指南
https://dapalm.com/2026/03/13/MediaPipe系列21-外部数据注入Calculator:传感器数据接入/
作者
Mars
发布于
2026年3月13日
许可协议