1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
| """ DMS-ADAS 集成接口
定义 DMS 与 ADAS 的通信协议 """
from dataclasses import dataclass from enum import Enum import can
class InterventionCommand(Enum): """干预指令类型""" NONE = 0 WARNING_LEVEL_1 = 1 WARNING_LEVEL_2 = 2 DECELERATE = 3 PULL_OVER = 4 EMERGENCY_STOP = 5
@dataclass class DMSStatus: """DMS 状态消息""" driver_state: int confidence: float intervention_request: int timestamp: float
class DMSADASInterface: """ DMS-ADAS 通信接口 通过 CAN 总线通信 """ DMS_STATUS_ID = 0x100 ADAS_COMMAND_ID = 0x200 INTERVENTION_REQUEST_ID = 0x300 def __init__(self, can_channel='can0'): self.bus = can.interface.Bus(channel=can_channel, bustype='socketcan') def send_dms_status(self, status: DMSStatus): """ 发送 DMS 状态 8字节消息格式: - 字节 0: 驾驶员状态 - 字节 1-2: 置信度 (x1000) - 字节 3: 干预请求 - 字节 4-7: 时间戳 """ data = [ status.driver_state, int(status.confidence * 1000) >> 8, int(status.confidence * 1000) & 0xFF, status.intervention_request, int(status.timestamp) >> 24, int(status.timestamp) >> 16 & 0xFF, int(status.timestamp) >> 8 & 0xFF, int(status.timestamp) & 0xFF ] msg = can.Message( arbitration_id=self.DMS_STATUS_ID, data=data, is_extended_id=False ) self.bus.send(msg) def receive_adas_command(self) -> Dict: """ 接收 ADAS 命令 Returns: command: { 'intervention_type': int, 'target_speed': float, 'target_lane': int, 'deceleration': float } """ msg = self.bus.recv(timeout=0.1) if msg is None or msg.arbitration_id != self.ADAS_COMMAND_ID: return None return { 'intervention_type': msg.data[0], 'target_speed': msg.data[1] / 10.0, 'target_lane': msg.data[2], 'deceleration': msg.data[3] / 10.0 } def send_intervention_request(self, command: InterventionCommand): """ 发送干预请求 """ msg = can.Message( arbitration_id=self.INTERVENTION_REQUEST_ID, data=[command.value, 0, 0, 0, 0, 0, 0, 0], is_extended_id=False ) self.bus.send(msg)
class FunctionalSafetyMonitor: """ 功能安全监控 ASIL-B 级别监控 """ def __init__(self): self.watchdog_timeout = 0.1 self.last_update_time = None self.fault_detected = False def check_dms_availability(self, dms_status: DMSStatus) -> bool: """ 检查 DMS 可用性 """ if self.last_update_time is not None: elapsed = time.time() - self.last_update_time if elapsed > self.watchdog_timeout: self.fault_detected = True return False self.last_update_time = time.time() if dms_status.confidence < 0.5: return False return True def check_adas_availability(self, adas_command: Dict) -> bool: """ 检查 ADAS 可用性 """ if adas_command is None: return False return True def get_safe_state(self) -> InterventionCommand: """ 获取安全状态 故障时的降级策略 """ if self.fault_detected: return InterventionCommand.WARNING_LEVEL_1 return InterventionCommand.NONE
|