diff --git a/WatcherAgent/src/api.rs b/WatcherAgent/src/api.rs index 1c8df20..87234aa 100644 --- a/WatcherAgent/src/api.rs +++ b/WatcherAgent/src/api.rs @@ -7,7 +7,9 @@ use reqwest::{Client, StatusCode}; use std::error::Error; use tokio::time::{interval, sleep}; -pub async fn register_with_server(base_url: &str) -> Result<(i32, String), Box> { +pub async fn register_with_server( + base_url: &str, +) -> Result<(i32, String), Box> { // First get local IP let ip = local_ip_address::local_ip()?.to_string(); println!("Local IP address detected: {}", ip); @@ -58,7 +60,10 @@ pub async fn register_with_server(base_url: &str) -> Result<(i32, String), Box Result<(i32, String), Box> { +async fn get_server_id_by_ip( + base_url: &str, + ip: &str, +) -> Result<(i32, String), Box> { let client = Client::builder() .danger_accept_invalid_certs(true) .build()?; @@ -106,7 +111,7 @@ async fn get_server_id_by_ip(base_url: &str, ip: &str) -> Result<(i32, String), } } -pub async fn heartbeat_loop(base_url: &str, ip: &str) -> Result<(), Box> { +pub async fn heartbeat_loop(base_url: &str, ip: &str) -> Result<(), Box> { let client = Client::builder() .danger_accept_invalid_certs(true) .build()?; @@ -129,7 +134,10 @@ pub async fn heartbeat_loop(base_url: &str, ip: &str) -> Result<(), Box Result<(), Box> { +pub async fn send_metrics( + base_url: &str, + metrics: &MetricDto, +) -> Result<(), Box> { let client = Client::new(); let url = format!("{}/monitoring/metric", base_url); let mut interval = interval(Duration::from_secs(20)); @@ -137,6 +145,7 @@ pub async fn send_metrics(base_url: &str, metrics: &MetricDto) -> Result<(), Box loop { interval.tick().await; let metric = metrics; + eprintln!("Sending metrics: {:?}", metric); match client.post(&url).json(&metric).send().await { Ok(res) => println!( diff --git a/WatcherAgent/src/hardware/cpu.rs b/WatcherAgent/src/hardware/cpu.rs index 59684b2..73efa06 100644 --- a/WatcherAgent/src/hardware/cpu.rs +++ b/WatcherAgent/src/hardware/cpu.rs @@ -11,7 +11,7 @@ pub struct CpuInfo { pub current_temp: Option, } -pub async fn get_cpu_info() -> Result> { +pub async fn get_cpu_info() -> Result> { let mut sys = System::new(); sys.refresh_cpu_all(); @@ -28,13 +28,13 @@ pub async fn get_cpu_info() -> Result> { }) } -pub async fn get_cpu_load(sys: &mut System) -> Result> { +pub async fn get_cpu_load(sys: &mut System) -> Result> { sys.refresh_cpu_all(); tokio::task::yield_now().await; // Allow other tasks to run Ok(sys.global_cpu_usage() as f64) } -pub async fn get_cpu_temp() -> Result> { +pub async fn get_cpu_temp() -> Result> { println!("Attempting to get CPU temperature..."); #[cfg(target_os = "linux")] diff --git a/WatcherAgent/src/hardware/gpu.rs b/WatcherAgent/src/hardware/gpu.rs index 836f95a..9e667e6 100644 --- a/WatcherAgent/src/hardware/gpu.rs +++ b/WatcherAgent/src/hardware/gpu.rs @@ -11,7 +11,7 @@ pub struct GpuInfo { pub vram_used: Option, } -pub async fn get_gpu_info() -> Result> { +pub async fn get_gpu_info() -> Result> { match get_gpu_metrics() { Ok((gpu_temp, gpu_load, vram_used, vram_total)) => { let gpu_name = detect_gpu_name(); @@ -37,7 +37,7 @@ pub async fn get_gpu_info() -> Result> { } } -pub fn get_gpu_metrics() -> Result<(f64, f64, f64, f64), Box> { +pub fn get_gpu_metrics() -> Result<(f64, f64, f64, f64), Box> { let nvml = Nvml::init(); if let Ok(nvml) = nvml { if let Ok(device) = nvml.device_by_index(0) { diff --git a/WatcherAgent/src/hardware/memory.rs b/WatcherAgent/src/hardware/memory.rs index 35a71cb..758c57a 100644 --- a/WatcherAgent/src/hardware/memory.rs +++ b/WatcherAgent/src/hardware/memory.rs @@ -21,7 +21,7 @@ pub async fn get_memory_info() -> Result { }) } -pub fn _get_memory_usage(sys: &mut System) -> Result> { +pub fn _get_memory_usage(sys: &mut System) -> Result> { sys.refresh_memory(); Ok((sys.used_memory() as f64 / sys.total_memory() as f64) * 100.0) } diff --git a/WatcherAgent/src/hardware/mod.rs b/WatcherAgent/src/hardware/mod.rs index c8c3b34..0eba3c2 100644 --- a/WatcherAgent/src/hardware/mod.rs +++ b/WatcherAgent/src/hardware/mod.rs @@ -25,14 +25,24 @@ pub struct HardwareInfo { } impl HardwareInfo { - pub async fn collect() -> anyhow::Result> { + pub async fn collect() -> Result> { let mut network_monitor = network::NetworkMonitor::new(); Ok(Self { cpu: get_cpu_info().await?, gpu: get_gpu_info().await?, memory: get_memory_info().await?, disk: get_disk_info().await?, - network: get_network_info(&mut network_monitor).await?, + network: match get_network_info(&mut network_monitor).await { + Ok(info) => info, + Err(e) => { + eprintln!("Error collecting network info: {}", e); + network::NetworkInfo { + interfaces: None, + rx_rate: None, + tx_rate: None, + } + } + }, network_monitor, }) } diff --git a/WatcherAgent/src/main.rs b/WatcherAgent/src/main.rs index 3ace2f6..a990253 100644 --- a/WatcherAgent/src/main.rs +++ b/WatcherAgent/src/main.rs @@ -7,33 +7,66 @@ pub mod metrics; pub mod models; pub use crate::hardware::gpu; -use anyhow::Result; use std::error::Error; +use std::marker::Send; +use std::marker::Sync; +use std::result::Result; +use tokio::task::JoinHandle; + +async fn flatten( + handle: JoinHandle>>, +) -> Result> { + match handle.await { + Ok(Ok(result)) => Ok(result), + Ok(Err(err)) => Err(err), + Err(_err) => Err("handling failed".into()), + } +} #[tokio::main] -async fn main() -> Result<(), Box> { +async fn main() -> Result<(), Box> { let server_url = "http://localhost:5000"; // Registration - let (server_id, ip) = api::register_with_server(server_url).await?; + let (server_id, ip) = match api::register_with_server(server_url).await { + Ok((id, ip)) => (id, ip), + Err(e) => { + eprintln!("Fehler bei der Registrierung am Server: {e}"); + return Err(e.into()); + } + }; // Start background tasks // Start heartbeat in background let heartbeat_handle = tokio::spawn({ let ip = ip.clone(); - async move { - if let Err(e) = api::heartbeat_loop(server_url, &ip).await { - eprintln!("Heartbeat loop failed: {}", e); - } - } + let server_url = server_url.to_string(); + async move { api::heartbeat_loop(&server_url, &ip).await } }); - heartbeat_handle.await?; - // Main metrics loop println!("Starting metrics collection..."); - let mut collector = metrics::Collector::new(server_id, ip); - collector.run(server_url).await?; + let metrics_handle = tokio::spawn({ + let ip = ip.clone(); + let server_url = server_url.to_string(); + async move { + let mut collector = metrics::Collector::new(server_id, ip); + collector.run(&server_url).await + } + }); + + // Warte auf beide Tasks und prüfe explizit auf Fehler + let (heartbeat_handle, metrics_handle) = + tokio::try_join!(flatten(heartbeat_handle), flatten(metrics_handle))?; + + match (heartbeat_handle, metrics_handle) { + (heartbeat, metrics) => println!( + "All tasks completed successfully: {:?}, {:?}.", + heartbeat, metrics + ), + } + + println!("All tasks completed successfully."); Ok(()) } diff --git a/WatcherAgent/src/metrics.rs b/WatcherAgent/src/metrics.rs index 278ebf3..5ca541b 100644 --- a/WatcherAgent/src/metrics.rs +++ b/WatcherAgent/src/metrics.rs @@ -21,20 +21,33 @@ impl Collector { } } - pub async fn run(&mut self, base_url: &str) -> Result<(), Box> { + pub async fn run(&mut self, base_url: &str) -> Result<(), Box> { loop { println!( "[{}] Starting metrics collection...", chrono::Local::now().format("%H:%M:%S") ); - let metrics = self.collect().await?; + let metrics = match self.collect().await { + Ok(metrics) => metrics, + Err(e) => { + eprintln!("Error collecting metrics: {}", e); + tokio::time::sleep(Duration::from_secs(10)).await; + continue; + } + }; api::send_metrics(base_url, &metrics).await?; tokio::time::sleep(Duration::from_secs(20)).await; } } - pub async fn collect(&mut self) -> Result> { - let hardware = HardwareInfo::collect().await?; + pub async fn collect(&mut self) -> Result> { + let hardware = match HardwareInfo::collect().await { + Ok(hw) => hw, + Err(e) => { + eprintln!("Fehler beim Sammeln der Hardware-Infos: {e}"); + return Err(e.into()); + } + }; // Collect network usage let (_, _) = self.network_monitor.update_usage().unwrap_or((0.0, 0.0));