1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
| import torch import torch.nn as nn import numpy as np
class TakeoverReadinessPredictor(nn.Module):
    """Takeover-readiness prediction model.

    Pipeline: per-timestep linear embedding -> stacked bidirectional LSTM ->
    additive attention pooling over time -> small MLP with three outputs.
    """

    def __init__(self,
                 input_dim: int = 14,
                 hidden_dim: int = 128,
                 num_layers: int = 2,
                 dropout: float = 0.2):
        """Build the network.

        Args:
            input_dim: Number of features per timestep.
            hidden_dim: Embedding width and per-direction LSTM hidden size.
            num_layers: Number of stacked BiLSTM layers.
            dropout: Dropout probability used in the embedding, between LSTM
                layers (only when ``num_layers > 1``) and in the output head.
        """
        super().__init__()

        # Both LSTM directions are concatenated, so downstream layers see 2x.
        bi_dim = hidden_dim * 2
        # nn.LSTM only applies dropout *between* layers; pass 0 for one layer.
        inter_layer_dropout = dropout if num_layers > 1 else 0

        # Per-timestep projection of raw features into the hidden space.
        embedding_layers = [
            nn.Linear(input_dim, hidden_dim),
            nn.LayerNorm(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
        ]
        self.input_embedding = nn.Sequential(*embedding_layers)

        self.bilstm = nn.LSTM(
            input_size=hidden_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
            dropout=inter_layer_dropout,
        )

        # One scalar score per timestep, normalised across dim=1 (time).
        attention_layers = [
            nn.Linear(bi_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 1),
            nn.Softmax(dim=1),
        ]
        self.attention = nn.Sequential(*attention_layers)

        # Output head: three raw values, squashed per-output in forward().
        head_layers = [
            nn.Linear(bi_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, 3),
        ]
        self.predictor = nn.Sequential(*head_layers)

    def forward(self, x: torch.Tensor) -> dict:
        """Run the model on a batch of feature sequences.

        Args:
            x: (B, T, D) temporal feature sequence, where B is the batch
                size, T the number of timesteps (history window) and D the
                feature dimension.

        Returns:
            A dict with:
                'readiness_score': (B, 1) sigmoid output in [0, 1].
                'predicted_takeover_time': (B, 1) predicted takeover time
                    (seconds); computed as ReLU(raw) + 1.0, so never below 1.
                'predicted_quality': (B, 1) sigmoid output in [0, 1].
                'attention_weights': (B, T, 1) softmax weights over time.
        """
        # The values are unused, but the unpack itself rejects any input that
        # is not exactly 3-D (raises ValueError), so it is kept on purpose.
        _batch, _steps, _feat = x.shape

        hidden_seq, _ = self.bilstm(self.input_embedding(x))
        attn_weights = self.attention(hidden_seq)

        # Attention-weighted sum over the time axis -> (B, 2 * hidden_dim).
        context = (attn_weights * hidden_seq).sum(dim=1)

        # Three (B, 1) columns, in the order readiness / time / quality.
        raw_readiness, raw_time, raw_quality = self.predictor(context).split(1, dim=1)

        return {
            'readiness_score': torch.sigmoid(raw_readiness),
            'predicted_takeover_time': torch.relu(raw_time) + 1.0,
            'predicted_quality': torch.sigmoid(raw_quality),
            'attention_weights': attn_weights,
        }
class TakeoverDecisionSystem:
    """Takeover decision system.

    Keeps a rolling window of the most recent feature vectors, runs the
    readiness predictor on the full window, and maps its outputs onto a
    recommended action string.
    """

    def __init__(self,
                 model_path: str,
                 readiness_threshold: float = 0.7,
                 min_takeover_time: float = 5.0,
                 warning_threshold: float = 0.5):
        """Load the predictor weights and set the decision thresholds.

        Args:
            model_path: Path to a saved ``state_dict`` for
                ``TakeoverReadinessPredictor`` (default hyper-parameters).
            readiness_threshold: Minimum readiness score for an immediate
                handover.
            min_takeover_time: Largest predicted takeover time that still
                permits an immediate handover (compared with ``<=``).
            warning_threshold: Minimum readiness score for an extended
                warning; anything lower yields an emergency stop.
        """
        self.model = TakeoverReadinessPredictor()
        # Inference below always runs on CPU tensors, so map the checkpoint to
        # CPU; without this a checkpoint saved from a CUDA device cannot be
        # loaded on a CPU-only host.
        # NOTE(security): torch.load unpickles the file. Only load checkpoints
        # from a trusted source (consider weights_only=True where the installed
        # torch version supports it).
        state_dict = torch.load(model_path, map_location="cpu")
        self.model.load_state_dict(state_dict)
        self.model.eval()

        self.readiness_threshold = readiness_threshold
        self.min_takeover_time = min_takeover_time
        self.warning_threshold = warning_threshold

        self.history = []
        self.history_window = 30

    # The annotation is a string so that defining this class does not require
    # TakeoverReadinessFeatures to be resolvable at import time.
    def update(self, features: "TakeoverReadinessFeatures") -> dict:
        """Append one observation and return the current decision.

        Args:
            features: Object exposing ``to_feature_vector()``; the returned
                vector is appended to the rolling history.

        Returns:
            A dict with:
                'is_ready': bool
                'readiness_score': float
                'predicted_takeover_time': float
                'predicted_quality': float
                'recommended_action': str, one of 'MONITOR',
                    'IMMEDIATE_HANDOVER', 'EXTENDED_WARNING',
                    'EMERGENCY_STOP'.
            Until ``history_window`` observations have been collected the
            result is a fixed 'MONITOR' placeholder and the model is not run.
        """
        self.history.append(features.to_feature_vector())

        # Trim down to the window size (not just by one element) so the window
        # is re-established even if history_window was lowered at runtime.
        overflow = len(self.history) - self.history_window
        if overflow > 0:
            del self.history[:overflow]

        if len(self.history) < self.history_window:
            # Warm-up: same keys as the normal result so callers can index
            # the dict uniformly.
            return {
                'is_ready': False,
                'readiness_score': 0.0,
                'predicted_takeover_time': float('inf'),
                'predicted_quality': 0.0,
                'recommended_action': 'MONITOR',
            }

        # (T, D) history -> (1, T, D) single-sample batch.
        x = torch.tensor(np.array(self.history), dtype=torch.float32).unsqueeze(0)
        with torch.no_grad():
            predictions = self.model(x)

        readiness = predictions['readiness_score'].item()
        takeover_time = predictions['predicted_takeover_time'].item()
        quality = predictions['predicted_quality'].item()

        is_ready = (readiness >= self.readiness_threshold
                    and takeover_time <= self.min_takeover_time)
        if is_ready:
            action = 'IMMEDIATE_HANDOVER'
        elif readiness >= self.warning_threshold:
            action = 'EXTENDED_WARNING'
        else:
            action = 'EMERGENCY_STOP'

        return {
            'is_ready': is_ready,
            'readiness_score': readiness,
            'predicted_takeover_time': takeover_time,
            'predicted_quality': quality,
            'recommended_action': action,
        }
|