merged registration heartbeat metrics

This commit is contained in:
2025-07-29 11:15:38 +02:00
parent 76f16d863f
commit b04b9382dc
2 changed files with 400 additions and 2 deletions

View File

@@ -2,5 +2,18 @@
name = "WatcherAgent" name = "WatcherAgent"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition = "2021"
authors = ["Patrick Mahnke-Hartmann <https://github.com/donpat1to>"]
description = "A client heartbeat sender for CS monitoring"
license = "MIT"
[dependencies] [dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
tokio = { version = "1.37", features = ["full"] }
local-ip-address = "0.5"
reqwest = { version = "0.11", default-features = false, features = ["json", "blocking", "cookies", "rustls-tls"] }
serde_json = "1.0"
sysinfo = "0.30"
metrics = "0.24.2"
chrono = "0.4"
nvml-wrapper = "0.10"

View File

@@ -1,3 +1,388 @@
fn main() { use chrono::Utc;
println!("Hello, world!"); use nvml_wrapper::Nvml;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::{error::Error, fs, process::Command, time::Duration};
use sysinfo::{
CpuExt, CpuRefreshKind, DiskExt, MacAddr, MemoryRefreshKind, RefreshKind, System, SystemExt,
};
use tokio::time::{interval, sleep};
// Shared data structures
#[derive(Serialize, Debug)]
struct Metric {
timestamp: String,
server_id: i32,
cpu_load: f32,
cpu_temp: f32,
gpu_load: f32,
gpu_temp: f32,
gpu_vram_size: u32,
gpu_vram_usage: u32,
ram_load: f32,
ram_size: u32,
disk_size: u32,
disk_usage: f32,
disk_temp: f32,
net_in: u64,
net_out: u64,
}
#[derive(Serialize, Debug)]
struct RegistrationDto {
#[serde(rename = "id")]
id: i32,
#[serde(rename = "ipAddress")]
ip_address: String,
#[serde(rename = "cpuType")]
cpu_type: String,
#[serde(rename = "cpuCores")]
cpu_cores: i32,
#[serde(rename = "gpuType")]
gpu_type: String,
#[serde(rename = "ramSize")]
ram_size: f64,
}
#[derive(Deserialize)]
struct IdResponse {
id: i32,
ip_address: String,
}
#[derive(Serialize)]
struct HeartbeatPayload {
#[serde(rename = "IpAddress")]
ip_address: String,
}
// Hardware info collection
struct HardwareInfo {
cpu_type: String,
cpu_cores: i32,
gpu_type: String,
ram_size: f64,
ip_address: String,
}
impl HardwareInfo {
async fn collect() -> Result<Self, Box<dyn Error>> {
let mut sys = System::new_with_specifics(
RefreshKind::new()
.with_cpu(CpuRefreshKind::everything())
.with_memory(MemoryRefreshKind::everything()),
);
sys.refresh_cpu();
sys.refresh_memory();
let cpus = sys.cpus();
let cpu_type = cpus
.get(0)
.map(|c| c.brand().to_string())
.unwrap_or("Unknown CPU".to_string());
let cpu_cores = cpus.len() as i32;
let ram_gb = (sys.total_memory() as f64) / 1024.0 / 1024.0;
let gpu_type = Self::detect_gpu_name();
let ip_address = local_ip_address::local_ip()?.to_string();
Ok(Self {
cpu_type,
cpu_cores,
gpu_type,
ram_size: ram_gb,
ip_address,
})
}
fn detect_gpu_name() -> String {
Self::try_nvml_gpu_name()
.or_else(Self::fallback_gpu_name)
.unwrap_or_else(|| "Unknown GPU".to_string())
}
fn try_nvml_gpu_name() -> Option<String> {
let nvml = Nvml::init().ok()?;
let device = nvml.device_by_index(0).ok()?;
device.name().ok().map(|s| s.to_string())
}
fn fallback_gpu_name() -> Option<String> {
#[cfg(target_os = "linux")]
{
let output = std::process::Command::new("lshw")
.args(&["-C", "display"])
.output()
.ok()?;
Some(
String::from_utf8_lossy(&output.stdout)
.lines()
.find(|l| l.contains("product:"))
.map(|l| l.trim().replace("product:", "").trim().to_string())
.unwrap_or("Unknown GPU".to_string()),
)
}
#[cfg(target_os = "windows")]
{
let output = std::process::Command::new("wmic")
.args(&["path", "win32_VideoController", "get", "name"])
.output()
.ok()?;
Some(
String::from_utf8_lossy(&output.stdout)
.lines()
.nth(1)
.map(|s| s.trim().to_string())
.unwrap_or("Unknown GPU".to_string()),
)
}
}
}
// Registration module
async fn register_with_server(base_url: &str) -> Result<(i32, String), Box<dyn Error>> {
let client = Client::builder()
.danger_accept_invalid_certs(true)
.build()?;
// First get server ID
let url = format!("{}/server-id", base_url);
let (id, ip_address) = loop {
println!("Attempting to fetch server ID...");
match client.get(&url).send().await {
Ok(resp) if resp.status().is_success() => {
let id_resp: IdResponse = resp.json().await?;
println!(
"✅ Received ID {} for IP {}",
id_resp.id, id_resp.ip_address
);
break (id_resp.id, id_resp.ip_address);
}
Ok(resp) => println!("⚠️ Server responded with status: {}", resp.status()),
Err(err) => println!("❌ Request failed: {}", err),
}
sleep(Duration::from_secs(3)).await;
};
// Then register hardware info
let hardware = HardwareInfo::collect().await?;
let registration = RegistrationDto {
id,
ip_address: ip_address.clone(),
cpu_type: hardware.cpu_type,
cpu_cores: hardware.cpu_cores,
gpu_type: hardware.gpu_type,
ram_size: hardware.ram_size,
};
let url = format!("{}/monitoring/register", base_url);
let resp = client.post(&url).json(&registration).send().await?;
if resp.status().is_success() {
println!("Successfully registered with server.");
} else {
let text = resp.text().await?;
println!("Registration failed: {}", text);
}
Ok((id, ip_address))
}
// Heartbeat module
async fn heartbeat_loop(base_url: &str, ip: &str) -> Result<(), Box<dyn Error>> {
let client = Client::builder()
.danger_accept_invalid_certs(true)
.build()?;
let url = format!("{}/heartbeat/receive", base_url);
loop {
let payload = HeartbeatPayload {
ip_address: ip.to_string(),
};
match client.post(&url).json(&payload).send().await {
Ok(res) if res.status().is_success() => {
println!("Heartbeat sent successfully.");
}
Ok(res) => eprintln!("Server responded with status: {}", res.status()),
Err(e) => eprintln!("Heartbeat error: {}", e),
}
sleep(Duration::from_secs(20)).await;
}
}
// Metrics module
struct MetricsCollector {
sys: System,
nvml: Option<Nvml>,
server_id: i32,
}
impl MetricsCollector {
fn new(server_id: i32) -> Self {
Self {
sys: System::new_all(),
nvml: Nvml::init().ok(),
server_id,
}
}
async fn collect_and_send_loop(&mut self, base_url: &str) -> Result<(), Box<dyn Error>> {
let client = Client::new();
let url = format!("{}/metric/receive", base_url);
let mut interval = interval(Duration::from_secs(30));
loop {
interval.tick().await;
let metric = self.collect_metrics();
match client.post(&url).json(&metric).send().await {
Ok(res) => println!(
"✅ Sent metrics: {} @ {} | Status: {}",
metric.server_id,
metric.timestamp,
res.status()
),
Err(err) => eprintln!("❌ Failed to send metrics: {}", err),
}
}
}
fn collect_metrics(&mut self) -> Metric {
self.sys.refresh_all();
// CPU
let cpu_load = self.sys.global_cpu_info().cpu_usage();
let cpu_temp = get_cpu_temp().unwrap_or(0.0);
// RAM
let total_memory = self.sys.total_memory();
let used_memory = self.sys.used_memory();
let ram_load = (used_memory as f32 / total_memory as f32) * 100.0;
// Disk
let disk = self.sys.disks().first();
let (disk_size, disk_used) = if let Some(d) = disk {
let total = d.total_space();
let available = d.available_space();
(
total / 1024 / 1024,
(total - available) as f32 / total as f32 * 100.0,
)
} else {
(0, 0.0)
};
// GPU (NVIDIA)
let (gpu_temp, gpu_load, vram_used, vram_total) = if let Some(nvml) = &self.nvml {
if let Ok(device) = nvml.device_by_index(0) {
let temp = device
.temperature(nvml_wrapper::enum_wrappers::device::TemperatureSensor::Gpu)
.unwrap_or(0) as f32;
let load = device
.utilization_rates()
.map(|u| u.gpu as f32)
.unwrap_or(0.0);
let mem = device.memory_info().ok();
let used = mem.clone().map(|m| m.used / 1024 / 1024).unwrap_or(0);
let total = mem.map(|m| m.total / 1024 / 1024).unwrap_or(0);
(temp, load, used as u32, total as u32)
} else {
(0.0, 0.0, 0, 0)
}
} else {
(0.0, 0.0, 0, 0)
};
// Network
let (net_in, net_out) = get_network_traffic().unwrap_or((0, 0));
Metric {
timestamp: Utc::now().to_rfc3339(),
server_id: self.server_id,
cpu_load,
cpu_temp,
gpu_load,
gpu_temp,
gpu_vram_size: vram_total,
gpu_vram_usage: vram_used,
ram_load,
ram_size: (total_memory as f32 / 1024.0) as u32,
disk_size: disk_size as u32,
disk_usage: disk_used,
disk_temp: 0.0,
net_in,
net_out,
}
}
}
// Helper functions
fn get_cpu_temp() -> Option<f32> {
let output = Command::new("sensors").output().ok()?;
let stdout = String::from_utf8_lossy(&output.stdout);
for line in stdout.lines() {
if line.to_lowercase().contains("package id") || line.to_lowercase().contains("cpu temp") {
if let Some(temp_str) = line.split_whitespace().find(|s| s.contains("°C")) {
let number: String = temp_str
.chars()
.filter(|c| c.is_digit(10) || *c == '.')
.collect();
return number.parse::<f32>().ok();
}
}
}
None
}
fn get_network_traffic() -> Option<(u64, u64)> {
let content = fs::read_to_string("/proc/net/dev").ok()?;
let mut rx_total = 0u64;
let mut tx_total = 0u64;
for line in content.lines().skip(2) {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() < 17 {
continue;
}
if parts[0].contains("lo:") {
continue;
}
rx_total += parts[1].parse::<u64>().ok()?;
tx_total += parts[9].parse::<u64>().ok()?;
}
Some((rx_total, tx_total))
}
// Main agent
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let server_base_url = "http://localhost:5258";
// Registration phase
println!("Starting registration process...");
let (server_id, ip_address) = register_with_server(server_base_url).await?;
// Start heartbeat in background
let heartbeat_handle = tokio::spawn({
let ip = ip_address.clone();
async move {
if let Err(e) = heartbeat_loop(server_base_url, &ip).await {
eprintln!("Heartbeat loop failed: {}", e);
}
}
});
// Start metrics collection
println!("Starting metrics collection...");
let mut metrics_collector = MetricsCollector::new(server_id);
metrics_collector
.collect_and_send_loop(server_base_url)
.await?;
// This line is theoretically unreachable because both loops are infinite
heartbeat_handle.await?;
Ok(())
} }