From b04b9382dc78337dffcb9650ca38085ee91ac15b Mon Sep 17 00:00:00 2001 From: donpat1to Date: Tue, 29 Jul 2025 11:15:38 +0200 Subject: [PATCH] merged registration heartbeat metrics --- WatcherAgent/Cargo.toml | 13 ++ WatcherAgent/src/main.rs | 389 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 400 insertions(+), 2 deletions(-) diff --git a/WatcherAgent/Cargo.toml b/WatcherAgent/Cargo.toml index a55dae7..21c6af9 100644 --- a/WatcherAgent/Cargo.toml +++ b/WatcherAgent/Cargo.toml @@ -2,5 +2,18 @@ name = "WatcherAgent" version = "0.1.0" edition = "2021" +authors = ["Patrick Mahnke-Hartmann "] +description = "A client heartbeat sender for CS monitoring" +license = "MIT" [dependencies] +serde = { version = "1.0", features = ["derive"] } +serde_json = "1" +tokio = { version = "1.37", features = ["full"] } +local-ip-address = "0.5" +reqwest = { version = "0.11", default-features = false, features = ["json", "blocking", "cookies", "rustls-tls"] } +serde_json = "1.0" +sysinfo = "0.30" +metrics = "0.24.2" +chrono = "0.4" +nvml-wrapper = "0.10" \ No newline at end of file diff --git a/WatcherAgent/src/main.rs b/WatcherAgent/src/main.rs index e7a11a9..df7deed 100644 --- a/WatcherAgent/src/main.rs +++ b/WatcherAgent/src/main.rs @@ -1,3 +1,388 @@ -fn main() { - println!("Hello, world!"); +use chrono::Utc; +use nvml_wrapper::Nvml; +use reqwest::Client; +use serde::{Deserialize, Serialize}; +use std::{error::Error, fs, process::Command, time::Duration}; +use sysinfo::{ + CpuExt, CpuRefreshKind, DiskExt, MacAddr, MemoryRefreshKind, RefreshKind, System, SystemExt, +}; +use tokio::time::{interval, sleep}; + +// Shared data structures +#[derive(Serialize, Debug)] +struct Metric { + timestamp: String, + server_id: i32, + cpu_load: f32, + cpu_temp: f32, + gpu_load: f32, + gpu_temp: f32, + gpu_vram_size: u32, + gpu_vram_usage: u32, + ram_load: f32, + ram_size: u32, + disk_size: u32, + disk_usage: f32, + disk_temp: f32, + net_in: u64, + net_out: u64, +} + +#[derive(Serialize, Debug)] +struct RegistrationDto { + #[serde(rename = "id")] + id: i32, + #[serde(rename = "ipAddress")] + ip_address: String, + #[serde(rename = "cpuType")] + cpu_type: String, + #[serde(rename = "cpuCores")] + cpu_cores: i32, + #[serde(rename = "gpuType")] + gpu_type: String, + #[serde(rename = "ramSize")] + ram_size: f64, +} + +#[derive(Deserialize)] +struct IdResponse { + id: i32, + ip_address: String, +} + +#[derive(Serialize)] +struct HeartbeatPayload { + #[serde(rename = "IpAddress")] + ip_address: String, +} + +// Hardware info collection +struct HardwareInfo { + cpu_type: String, + cpu_cores: i32, + gpu_type: String, + ram_size: f64, + ip_address: String, +} + +impl HardwareInfo { + async fn collect() -> Result> { + let mut sys = System::new_with_specifics( + RefreshKind::new() + .with_cpu(CpuRefreshKind::everything()) + .with_memory(MemoryRefreshKind::everything()), + ); + + sys.refresh_cpu(); + sys.refresh_memory(); + + let cpus = sys.cpus(); + let cpu_type = cpus + .get(0) + .map(|c| c.brand().to_string()) + .unwrap_or("Unknown CPU".to_string()); + let cpu_cores = cpus.len() as i32; + let ram_gb = (sys.total_memory() as f64) / 1024.0 / 1024.0; + let gpu_type = Self::detect_gpu_name(); + let ip_address = local_ip_address::local_ip()?.to_string(); + + Ok(Self { + cpu_type, + cpu_cores, + gpu_type, + ram_size: ram_gb, + ip_address, + }) + } + + fn detect_gpu_name() -> String { + Self::try_nvml_gpu_name() + .or_else(Self::fallback_gpu_name) + .unwrap_or_else(|| "Unknown GPU".to_string()) + } + + fn try_nvml_gpu_name() -> Option { + let nvml = Nvml::init().ok()?; + let device = nvml.device_by_index(0).ok()?; + device.name().ok().map(|s| s.to_string()) + } + + fn fallback_gpu_name() -> Option { + #[cfg(target_os = "linux")] + { + let output = std::process::Command::new("lshw") + .args(&["-C", "display"]) + .output() + .ok()?; + Some( + String::from_utf8_lossy(&output.stdout) + .lines() + .find(|l| l.contains("product:")) + .map(|l| l.trim().replace("product:", "").trim().to_string()) + .unwrap_or("Unknown GPU".to_string()), + ) + } + + #[cfg(target_os = "windows")] + { + let output = std::process::Command::new("wmic") + .args(&["path", "win32_VideoController", "get", "name"]) + .output() + .ok()?; + Some( + String::from_utf8_lossy(&output.stdout) + .lines() + .nth(1) + .map(|s| s.trim().to_string()) + .unwrap_or("Unknown GPU".to_string()), + ) + } + } +} + +// Registration module +async fn register_with_server(base_url: &str) -> Result<(i32, String), Box> { + let client = Client::builder() + .danger_accept_invalid_certs(true) + .build()?; + + // First get server ID + let url = format!("{}/server-id", base_url); + let (id, ip_address) = loop { + println!("Attempting to fetch server ID..."); + match client.get(&url).send().await { + Ok(resp) if resp.status().is_success() => { + let id_resp: IdResponse = resp.json().await?; + println!( + "✅ Received ID {} for IP {}", + id_resp.id, id_resp.ip_address + ); + break (id_resp.id, id_resp.ip_address); + } + Ok(resp) => println!("⚠️ Server responded with status: {}", resp.status()), + Err(err) => println!("❌ Request failed: {}", err), + } + sleep(Duration::from_secs(3)).await; + }; + + // Then register hardware info + let hardware = HardwareInfo::collect().await?; + let registration = RegistrationDto { + id, + ip_address: ip_address.clone(), + cpu_type: hardware.cpu_type, + cpu_cores: hardware.cpu_cores, + gpu_type: hardware.gpu_type, + ram_size: hardware.ram_size, + }; + + let url = format!("{}/monitoring/register", base_url); + let resp = client.post(&url).json(®istration).send().await?; + + if resp.status().is_success() { + println!("Successfully registered with server."); + } else { + let text = resp.text().await?; + println!("Registration failed: {}", text); + } + + Ok((id, ip_address)) +} + +// Heartbeat module +async fn heartbeat_loop(base_url: &str, ip: &str) -> Result<(), Box> { + let client = Client::builder() + .danger_accept_invalid_certs(true) + .build()?; + let url = format!("{}/heartbeat/receive", base_url); + + loop { + let payload = HeartbeatPayload { + ip_address: ip.to_string(), + }; + + match client.post(&url).json(&payload).send().await { + Ok(res) if res.status().is_success() => { + println!("Heartbeat sent successfully."); + } + Ok(res) => eprintln!("Server responded with status: {}", res.status()), + Err(e) => eprintln!("Heartbeat error: {}", e), + } + + sleep(Duration::from_secs(20)).await; + } +} + +// Metrics module +struct MetricsCollector { + sys: System, + nvml: Option, + server_id: i32, +} + +impl MetricsCollector { + fn new(server_id: i32) -> Self { + Self { + sys: System::new_all(), + nvml: Nvml::init().ok(), + server_id, + } + } + + async fn collect_and_send_loop(&mut self, base_url: &str) -> Result<(), Box> { + let client = Client::new(); + let url = format!("{}/metric/receive", base_url); + let mut interval = interval(Duration::from_secs(30)); + + loop { + interval.tick().await; + let metric = self.collect_metrics(); + + match client.post(&url).json(&metric).send().await { + Ok(res) => println!( + "✅ Sent metrics: {} @ {} | Status: {}", + metric.server_id, + metric.timestamp, + res.status() + ), + Err(err) => eprintln!("❌ Failed to send metrics: {}", err), + } + } + } + + fn collect_metrics(&mut self) -> Metric { + self.sys.refresh_all(); + + // CPU + let cpu_load = self.sys.global_cpu_info().cpu_usage(); + let cpu_temp = get_cpu_temp().unwrap_or(0.0); + + // RAM + let total_memory = self.sys.total_memory(); + let used_memory = self.sys.used_memory(); + let ram_load = (used_memory as f32 / total_memory as f32) * 100.0; + + // Disk + let disk = self.sys.disks().first(); + let (disk_size, disk_used) = if let Some(d) = disk { + let total = d.total_space(); + let available = d.available_space(); + ( + total / 1024 / 1024, + (total - available) as f32 / total as f32 * 100.0, + ) + } else { + (0, 0.0) + }; + + // GPU (NVIDIA) + let (gpu_temp, gpu_load, vram_used, vram_total) = if let Some(nvml) = &self.nvml { + if let Ok(device) = nvml.device_by_index(0) { + let temp = device + .temperature(nvml_wrapper::enum_wrappers::device::TemperatureSensor::Gpu) + .unwrap_or(0) as f32; + let load = device + .utilization_rates() + .map(|u| u.gpu as f32) + .unwrap_or(0.0); + let mem = device.memory_info().ok(); + let used = mem.clone().map(|m| m.used / 1024 / 1024).unwrap_or(0); + let total = mem.map(|m| m.total / 1024 / 1024).unwrap_or(0); + (temp, load, used as u32, total as u32) + } else { + (0.0, 0.0, 0, 0) + } + } else { + (0.0, 0.0, 0, 0) + }; + + // Network + let (net_in, net_out) = get_network_traffic().unwrap_or((0, 0)); + + Metric { + timestamp: Utc::now().to_rfc3339(), + server_id: self.server_id, + cpu_load, + cpu_temp, + gpu_load, + gpu_temp, + gpu_vram_size: vram_total, + gpu_vram_usage: vram_used, + ram_load, + ram_size: (total_memory as f32 / 1024.0) as u32, + disk_size: disk_size as u32, + disk_usage: disk_used, + disk_temp: 0.0, + net_in, + net_out, + } + } +} + +// Helper functions +fn get_cpu_temp() -> Option { + let output = Command::new("sensors").output().ok()?; + let stdout = String::from_utf8_lossy(&output.stdout); + for line in stdout.lines() { + if line.to_lowercase().contains("package id") || line.to_lowercase().contains("cpu temp") { + if let Some(temp_str) = line.split_whitespace().find(|s| s.contains("°C")) { + let number: String = temp_str + .chars() + .filter(|c| c.is_digit(10) || *c == '.') + .collect(); + return number.parse::().ok(); + } + } + } + None +} + +fn get_network_traffic() -> Option<(u64, u64)> { + let content = fs::read_to_string("/proc/net/dev").ok()?; + let mut rx_total = 0u64; + let mut tx_total = 0u64; + + for line in content.lines().skip(2) { + let parts: Vec<&str> = line.split_whitespace().collect(); + if parts.len() < 17 { + continue; + } + if parts[0].contains("lo:") { + continue; + } + rx_total += parts[1].parse::().ok()?; + tx_total += parts[9].parse::().ok()?; + } + Some((rx_total, tx_total)) +} + +// Main agent +#[tokio::main] +async fn main() -> Result<(), Box> { + let server_base_url = "http://localhost:5258"; + + // Registration phase + println!("Starting registration process..."); + let (server_id, ip_address) = register_with_server(server_base_url).await?; + + // Start heartbeat in background + let heartbeat_handle = tokio::spawn({ + let ip = ip_address.clone(); + async move { + if let Err(e) = heartbeat_loop(server_base_url, &ip).await { + eprintln!("Heartbeat loop failed: {}", e); + } + } + }); + + // Start metrics collection + println!("Starting metrics collection..."); + let mut metrics_collector = MetricsCollector::new(server_id); + metrics_collector + .collect_and_send_loop(server_base_url) + .await?; + + // This line is theoretically unreachable because both loops are infinite + heartbeat_handle.await?; + Ok(()) }