import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import os
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, classification_report, f1_score

from model import AnomalyDetector, detect_anomaly


def generate_synthetic_data(num_samples, is_anomaly=False, input_dim=41):
    """Generate synthetic network-traffic feature vectors.

    Normal traffic is drawn from N(0, 0.5); anomalous traffic from
    N(2.0, 1.5), i.e. a shifted mean with higher variance.

    Args:
        num_samples: Number of rows to generate.
        is_anomaly: When True, sample from the anomalous distribution.
        input_dim: Number of features per sample.

    Returns:
        A float32 tensor of shape (num_samples, input_dim).
    """
    if not is_anomaly:
        # Normal traffic: Gaussian centered at 0 with small variance
        data = np.random.normal(loc=0.0, scale=0.5, size=(num_samples, input_dim))
    else:
        # Anomalous traffic: shifted mean and higher variance
        data = np.random.normal(loc=2.0, scale=1.5, size=(num_samples, input_dim))
    return torch.tensor(data, dtype=torch.float32)


def train_autoencoder(seed=None):
    """Train the NetGuard-AI autoencoder on synthetic normal traffic,
    evaluate it on a mixed normal/anomalous test set, and save the model
    weights plus a training-loss plot under ../models/.

    Args:
        seed: Optional int. When given, seeds numpy and torch so the run
            is reproducible. Default None preserves the prior
            (unseeded) behavior.
    """
    if seed is not None:
        np.random.seed(seed)
        torch.manual_seed(seed)

    print("Starting NetGuard-AI Model Training...")

    # Hyperparameters
    input_dim = 41
    batch_size = 64
    epochs = 20
    learning_rate = 1e-3

    # 1. Training data — normal traffic only: the autoencoder learns to
    # reconstruct normal patterns, so anomalies yield high reconstruction error.
    print("Generating synthetic normal traffic for training...")
    train_data = generate_synthetic_data(10000, is_anomaly=False, input_dim=input_dim)
    train_loader = DataLoader(
        TensorDataset(train_data, train_data),
        batch_size=batch_size,
        shuffle=True,
    )

    # 2. Model, loss, and optimizer
    model = AnomalyDetector(input_dim=input_dim)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # 3. Training loop
    model.train()
    loss_history = []
    for epoch in range(epochs):
        epoch_loss = 0.0
        for batch_x, _ in train_loader:
            optimizer.zero_grad()
            reconstructed = model(batch_x)
            loss = criterion(reconstructed, batch_x)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        avg_loss = epoch_loss / len(train_loader)
        loss_history.append(avg_loss)
        if (epoch + 1) % 5 == 0:
            print(f"Epoch [{epoch+1}/{epochs}], Loss: {avg_loss:.4f}")

    # 4. Save model weights
    os.makedirs('../models', exist_ok=True)
    model_path = '../models/autoencoder.pth'
    torch.save(model.state_dict(), model_path)
    print("Model saved to", model_path)

    # 5. Evaluation on mixed traffic (80% normal, 20% anomalous)
    print("\nEvaluating Model on Test Set (Mixed Traffic)...")
    model.eval()
    # Pass input_dim explicitly so test data always matches the model,
    # rather than relying on the default happening to be the same value.
    test_normal = generate_synthetic_data(800, is_anomaly=False, input_dim=input_dim)
    test_anomalous = generate_synthetic_data(200, is_anomaly=True, input_dim=input_dim)
    test_data = torch.cat([test_normal, test_anomalous])
    # Derive labels from the actual tensor sizes so labels and data can
    # never fall out of sync if the counts above change.
    true_labels = np.concatenate(
        [np.zeros(len(test_normal)), np.ones(len(test_anomalous))]
    )  # 0 = Normal, 1 = Anomaly

    # Inference needs no gradients; no_grad avoids building autograd graphs.
    with torch.no_grad():
        anomalies, scores = detect_anomaly(model, test_data, threshold=0.5)
    pred_labels = anomalies.numpy().astype(int)

    # Metrics
    print("\nClassification Report:")
    print(classification_report(true_labels, pred_labels, target_names=["Normal", "Anomaly"]))
    f1 = f1_score(true_labels, pred_labels)
    print(f"F1 Score: {f1:.4f}")

    # Save training-loss plot
    plt.figure(figsize=(10, 5))
    plt.plot(loss_history, label='Training Loss')
    plt.title('Autoencoder Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('MSE Loss')
    plt.legend()
    plot_path = '../models/training_loss.png'
    plt.savefig(plot_path)
    plt.close()  # release the figure so repeated calls don't accumulate memory
    # Report the path the file was actually written to.
    print("Training loss plot saved to", plot_path)


if __name__ == "__main__":
    train_autoencoder()