running parallel tokio tasks for heartbeat and metrics

This commit is contained in:
2025-08-09 22:00:42 +02:00
parent a1bbbedd75
commit ac2fce75a0
7 changed files with 93 additions and 28 deletions

View File

@@ -7,7 +7,9 @@ use reqwest::{Client, StatusCode};
use std::error::Error; use std::error::Error;
use tokio::time::{interval, sleep}; use tokio::time::{interval, sleep};
pub async fn register_with_server(base_url: &str) -> Result<(i32, String), Box<dyn Error>> { pub async fn register_with_server(
base_url: &str,
) -> Result<(i32, String), Box<dyn Error + Send + Sync>> {
// First get local IP // First get local IP
let ip = local_ip_address::local_ip()?.to_string(); let ip = local_ip_address::local_ip()?.to_string();
println!("Local IP address detected: {}", ip); println!("Local IP address detected: {}", ip);
@@ -58,7 +60,10 @@ pub async fn register_with_server(base_url: &str) -> Result<(i32, String), Box<d
} }
} }
async fn get_server_id_by_ip(base_url: &str, ip: &str) -> Result<(i32, String), Box<dyn Error>> { async fn get_server_id_by_ip(
base_url: &str,
ip: &str,
) -> Result<(i32, String), Box<dyn Error + Send + Sync>> {
let client = Client::builder() let client = Client::builder()
.danger_accept_invalid_certs(true) .danger_accept_invalid_certs(true)
.build()?; .build()?;
@@ -106,7 +111,7 @@ async fn get_server_id_by_ip(base_url: &str, ip: &str) -> Result<(i32, String),
} }
} }
pub async fn heartbeat_loop(base_url: &str, ip: &str) -> Result<(), Box<dyn Error>> { pub async fn heartbeat_loop(base_url: &str, ip: &str) -> Result<(), Box<dyn Error + Send + Sync>> {
let client = Client::builder() let client = Client::builder()
.danger_accept_invalid_certs(true) .danger_accept_invalid_certs(true)
.build()?; .build()?;
@@ -129,7 +134,10 @@ pub async fn heartbeat_loop(base_url: &str, ip: &str) -> Result<(), Box<dyn Erro
} }
} }
pub async fn send_metrics(base_url: &str, metrics: &MetricDto) -> Result<(), Box<dyn Error>> { pub async fn send_metrics(
base_url: &str,
metrics: &MetricDto,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let client = Client::new(); let client = Client::new();
let url = format!("{}/monitoring/metric", base_url); let url = format!("{}/monitoring/metric", base_url);
let mut interval = interval(Duration::from_secs(20)); let mut interval = interval(Duration::from_secs(20));
@@ -137,6 +145,7 @@ pub async fn send_metrics(base_url: &str, metrics: &MetricDto) -> Result<(), Box
loop { loop {
interval.tick().await; interval.tick().await;
let metric = metrics; let metric = metrics;
eprintln!("Sending metrics: {:?}", metric);
match client.post(&url).json(&metric).send().await { match client.post(&url).json(&metric).send().await {
Ok(res) => println!( Ok(res) => println!(

View File

@@ -11,7 +11,7 @@ pub struct CpuInfo {
pub current_temp: Option<f64>, pub current_temp: Option<f64>,
} }
pub async fn get_cpu_info() -> Result<CpuInfo, Box<dyn Error>> { pub async fn get_cpu_info() -> Result<CpuInfo, Box<dyn Error + Send + Sync>> {
let mut sys = System::new(); let mut sys = System::new();
sys.refresh_cpu_all(); sys.refresh_cpu_all();
@@ -28,13 +28,13 @@ pub async fn get_cpu_info() -> Result<CpuInfo, Box<dyn Error>> {
}) })
} }
pub async fn get_cpu_load(sys: &mut System) -> Result<f64, Box<dyn Error>> { pub async fn get_cpu_load(sys: &mut System) -> Result<f64, Box<dyn Error + Send + Sync>> {
sys.refresh_cpu_all(); sys.refresh_cpu_all();
tokio::task::yield_now().await; // Allow other tasks to run tokio::task::yield_now().await; // Allow other tasks to run
Ok(sys.global_cpu_usage() as f64) Ok(sys.global_cpu_usage() as f64)
} }
pub async fn get_cpu_temp() -> Result<f64, Box<dyn Error>> { pub async fn get_cpu_temp() -> Result<f64, Box<dyn Error + Send + Sync>> {
println!("Attempting to get CPU temperature..."); println!("Attempting to get CPU temperature...");
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]

View File

@@ -11,7 +11,7 @@ pub struct GpuInfo {
pub vram_used: Option<f64>, pub vram_used: Option<f64>,
} }
pub async fn get_gpu_info() -> Result<GpuInfo, Box<dyn Error>> { pub async fn get_gpu_info() -> Result<GpuInfo, Box<dyn Error + Send + Sync>> {
match get_gpu_metrics() { match get_gpu_metrics() {
Ok((gpu_temp, gpu_load, vram_used, vram_total)) => { Ok((gpu_temp, gpu_load, vram_used, vram_total)) => {
let gpu_name = detect_gpu_name(); let gpu_name = detect_gpu_name();
@@ -37,7 +37,7 @@ pub async fn get_gpu_info() -> Result<GpuInfo, Box<dyn Error>> {
} }
} }
pub fn get_gpu_metrics() -> Result<(f64, f64, f64, f64), Box<dyn Error>> { pub fn get_gpu_metrics() -> Result<(f64, f64, f64, f64), Box<dyn Error + Send + Sync>> {
let nvml = Nvml::init(); let nvml = Nvml::init();
if let Ok(nvml) = nvml { if let Ok(nvml) = nvml {
if let Ok(device) = nvml.device_by_index(0) { if let Ok(device) = nvml.device_by_index(0) {

View File

@@ -21,7 +21,7 @@ pub async fn get_memory_info() -> Result<MemoryInfo> {
}) })
} }
pub fn _get_memory_usage(sys: &mut System) -> Result<f64, Box<dyn Error>> { pub fn _get_memory_usage(sys: &mut System) -> Result<f64, Box<dyn Error + Send + Sync>> {
sys.refresh_memory(); sys.refresh_memory();
Ok((sys.used_memory() as f64 / sys.total_memory() as f64) * 100.0) Ok((sys.used_memory() as f64 / sys.total_memory() as f64) * 100.0)
} }

View File

@@ -25,14 +25,24 @@ pub struct HardwareInfo {
} }
impl HardwareInfo { impl HardwareInfo {
pub async fn collect() -> anyhow::Result<Self, Box<dyn Error>> { pub async fn collect() -> Result<Self, Box<dyn Error + Send + Sync>> {
let mut network_monitor = network::NetworkMonitor::new(); let mut network_monitor = network::NetworkMonitor::new();
Ok(Self { Ok(Self {
cpu: get_cpu_info().await?, cpu: get_cpu_info().await?,
gpu: get_gpu_info().await?, gpu: get_gpu_info().await?,
memory: get_memory_info().await?, memory: get_memory_info().await?,
disk: get_disk_info().await?, disk: get_disk_info().await?,
network: get_network_info(&mut network_monitor).await?, network: match get_network_info(&mut network_monitor).await {
Ok(info) => info,
Err(e) => {
eprintln!("Error collecting network info: {}", e);
network::NetworkInfo {
interfaces: None,
rx_rate: None,
tx_rate: None,
}
}
},
network_monitor, network_monitor,
}) })
} }

View File

@@ -7,33 +7,66 @@ pub mod metrics;
pub mod models; pub mod models;
pub use crate::hardware::gpu; pub use crate::hardware::gpu;
use anyhow::Result;
use std::error::Error; use std::error::Error;
use std::marker::Send;
use std::marker::Sync;
use std::result::Result;
use tokio::task::JoinHandle;
/// Awaits a spawned task and collapses its nested result into a single `Result`.
///
/// `handle.await` yields `Result<Result<T, E>, JoinError>`; this helper merges
/// the two error layers so the handle can be used with `tokio::try_join!`.
///
/// # Errors
///
/// * Returns the task's own error unchanged when the task ran to completion
///   and returned `Err`.
/// * Returns the underlying `JoinError` (boxed) when the task could not be
///   joined, so the caller still sees *why* the join failed instead of an
///   opaque placeholder message.
async fn flatten<T>(
    handle: JoinHandle<Result<T, Box<dyn Error + Send + Sync>>>,
) -> Result<T, Box<dyn Error + Send + Sync>> {
    // `?` boxes the `JoinError` via `From`, preserving it as the error value;
    // the remaining inner `Result` is exactly the return type.
    handle.await?
}
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> { async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let server_url = "http://localhost:5000"; let server_url = "http://localhost:5000";
// Registration // Registration
let (server_id, ip) = api::register_with_server(server_url).await?; let (server_id, ip) = match api::register_with_server(server_url).await {
Ok((id, ip)) => (id, ip),
Err(e) => {
eprintln!("Fehler bei der Registrierung am Server: {e}");
return Err(e.into());
}
};
// Start background tasks // Start background tasks
// Start heartbeat in background // Start heartbeat in background
let heartbeat_handle = tokio::spawn({ let heartbeat_handle = tokio::spawn({
let ip = ip.clone(); let ip = ip.clone();
async move { let server_url = server_url.to_string();
if let Err(e) = api::heartbeat_loop(server_url, &ip).await { async move { api::heartbeat_loop(&server_url, &ip).await }
eprintln!("Heartbeat loop failed: {}", e);
}
}
}); });
heartbeat_handle.await?;
// Main metrics loop // Main metrics loop
println!("Starting metrics collection..."); println!("Starting metrics collection...");
let mut collector = metrics::Collector::new(server_id, ip); let metrics_handle = tokio::spawn({
collector.run(server_url).await?; let ip = ip.clone();
let server_url = server_url.to_string();
async move {
let mut collector = metrics::Collector::new(server_id, ip);
collector.run(&server_url).await
}
});
// Warte auf beide Tasks und prüfe explizit auf Fehler
let (heartbeat_handle, metrics_handle) =
tokio::try_join!(flatten(heartbeat_handle), flatten(metrics_handle))?;
match (heartbeat_handle, metrics_handle) {
(heartbeat, metrics) => println!(
"All tasks completed successfully: {:?}, {:?}.",
heartbeat, metrics
),
}
println!("All tasks completed successfully.");
Ok(()) Ok(())
} }

View File

@@ -21,20 +21,33 @@ impl Collector {
} }
} }
pub async fn run(&mut self, base_url: &str) -> Result<(), Box<dyn Error>> { pub async fn run(&mut self, base_url: &str) -> Result<(), Box<dyn Error + Send + Sync>> {
loop { loop {
println!( println!(
"[{}] Starting metrics collection...", "[{}] Starting metrics collection...",
chrono::Local::now().format("%H:%M:%S") chrono::Local::now().format("%H:%M:%S")
); );
let metrics = self.collect().await?; let metrics = match self.collect().await {
Ok(metrics) => metrics,
Err(e) => {
eprintln!("Error collecting metrics: {}", e);
tokio::time::sleep(Duration::from_secs(10)).await;
continue;
}
};
api::send_metrics(base_url, &metrics).await?; api::send_metrics(base_url, &metrics).await?;
tokio::time::sleep(Duration::from_secs(20)).await; tokio::time::sleep(Duration::from_secs(20)).await;
} }
} }
pub async fn collect(&mut self) -> Result<MetricDto, Box<dyn Error>> { pub async fn collect(&mut self) -> Result<MetricDto, Box<dyn Error + Send + Sync>> {
let hardware = HardwareInfo::collect().await?; let hardware = match HardwareInfo::collect().await {
Ok(hw) => hw,
Err(e) => {
eprintln!("Fehler beim Sammeln der Hardware-Infos: {e}");
return Err(e.into());
}
};
// Collect network usage // Collect network usage
let (_, _) = self.network_monitor.update_usage().unwrap_or((0.0, 0.0)); let (_, _) = self.network_monitor.update_usage().unwrap_or((0.0, 0.0));