7 Commits

Author SHA1 Message Date
8bac357dc6 added debugging for docker image
All checks were successful
Rust Cross-Platform Build / Detect Rust Project (push) Successful in 5s
Rust Cross-Platform Build / Set Tag Name (push) Successful in 5s
Rust Cross-Platform Build / Run Tests (push) Successful in 1m10s
Rust Cross-Platform Build / Build (x86_64-unknown-linux-gnu) (push) Successful in 3m9s
Rust Cross-Platform Build / Build (x86_64-pc-windows-gnu) (push) Successful in 3m54s
Rust Cross-Platform Build / Build and Push Docker Image (push) Successful in 2m12s
Rust Cross-Platform Build / Workflow Summary (push) Successful in 1s
Rust Cross-Platform Build / Create Tag (push) Successful in 4s
2025-09-28 00:36:52 +02:00
7154c01f7a added search for docker image in different locations
All checks were successful
Rust Cross-Platform Build / Detect Rust Project (push) Successful in 4s
Rust Cross-Platform Build / Set Tag Name (push) Successful in 5s
Rust Cross-Platform Build / Run Tests (push) Successful in 1m6s
Rust Cross-Platform Build / Build (x86_64-unknown-linux-gnu) (push) Successful in 2m45s
Rust Cross-Platform Build / Build (x86_64-pc-windows-gnu) (push) Successful in 3m30s
Rust Cross-Platform Build / Build and Push Docker Image (push) Successful in 2m19s
Rust Cross-Platform Build / Workflow Summary (push) Successful in 2s
Rust Cross-Platform Build / Create Tag (push) Successful in 5s
2025-09-28 00:24:47 +02:00
813bf4e407 cleaned up disk information
All checks were successful
Rust Cross-Platform Build / Detect Rust Project (push) Successful in 5s
Rust Cross-Platform Build / Set Tag Name (push) Successful in 5s
Rust Cross-Platform Build / Run Tests (push) Successful in 1m4s
Rust Cross-Platform Build / Build (x86_64-unknown-linux-gnu) (push) Successful in 2m37s
Rust Cross-Platform Build / Build (x86_64-pc-windows-gnu) (push) Successful in 3m21s
Rust Cross-Platform Build / Build and Push Docker Image (push) Successful in 1m55s
Rust Cross-Platform Build / Workflow Summary (push) Successful in 2s
Rust Cross-Platform Build / Create Tag (push) Successful in 4s
2025-09-27 23:48:37 +02:00
bf6f89c954 added error handling for client version print 2025-09-27 23:48:02 +02:00
dbe87fedb6 added error handling for client version print 2025-09-27 21:36:59 +02:00
67b24b33aa added inital server communcation task 2025-09-27 21:34:30 +02:00
67ebbdaa19 added image screening for current container 2025-09-26 23:36:09 +02:00
7 changed files with 429 additions and 138 deletions

View File

@@ -19,6 +19,8 @@ nvml-wrapper = "0.11"
nvml-wrapper-sys = "0.9.0"
anyhow = "1.0.98"
regex = "1.11.3"
# Docker .env loading
# config = "0.13"

View File

@@ -1,16 +1,15 @@
use crate::serverclientcomm::handle_server_message;
use std::time::Duration;
use crate::hardware::HardwareInfo;
use crate::models::{HeartbeatDto, IdResponse, MetricDto, RegistrationDto};
use crate::models::{HeartbeatDto, IdResponse, MetricDto, RegistrationDto, ServerMessage, Acknowledgment};
use crate::serverclientcomm::handle_server_message;
use anyhow::Result;
use reqwest::{Client, StatusCode};
use std::error::Error;
use tokio::time::sleep;
use bollard::Docker;
use crate::models::ServerMessage;
pub async fn register_with_server(
base_url: &str,
@@ -159,24 +158,84 @@ pub async fn send_metrics(
Ok(())
}
pub async fn listening_to_server(docker: &Docker, base_url: &str) -> Result<(), Box<dyn Error + Send + Sync>>{
pub async fn listening_to_server(docker: &Docker, base_url: &str) -> Result<(), Box<dyn Error + Send + Sync>> {
let url = format!("{}/api/message", base_url);
loop {
// Replace with your server endpoint
let resp = reqwest::get(&url)
.await;
let client = reqwest::Client::new();
if let Ok(resp) = resp {
if let Ok(msg) = resp.json::<ServerMessage>().await {
handle_server_message(docker, msg).await;
} else {
eprintln!("Failed to parse message");
loop {
// Get message from server
let resp = client.get(&url).send().await;
match resp {
Ok(response) => {
if response.status().is_success() {
match response.json::<ServerMessage>().await {
Ok(msg) => {
// Acknowledge receipt immediately
if let Err(e) = send_acknowledgment(&client, base_url, &msg.message_id, "received", "Message received successfully").await {
eprintln!("Failed to send receipt acknowledgment: {}", e);
}
// Handle the message
let result = handle_server_message(docker, msg.clone()).await;
// Send execution result acknowledgment
let (status, details) = match result {
Ok(_) => ("success", "Message executed successfully".to_string()),
Err(e) => ("error", format!("Execution failed: {}", e)),
};
if let Err(e) = send_acknowledgment(&client, base_url, &msg.message_id, status, &details).await {
eprintln!("Failed to send execution acknowledgment: {}", e);
}
}
Err(e) => {
eprintln!("Failed to parse message: {}", e);
}
}
} else if response.status() == reqwest::StatusCode::NO_CONTENT {
// No new messages, continue polling
println!("No new messages from server");
} else {
eprintln!("Server returned error status: {}", response.status());
}
}
Err(e) => {
eprintln!("Failed to reach server: {}", e);
}
} else {
eprintln!("Failed to reach server");
}
// Poll every 5 seconds (or use WebSocket for real-time)
sleep(Duration::from_secs(5)).await;
}
}
async fn send_acknowledgment(
client: &reqwest::Client,
base_url: &str,
message_id: &str,
status: &str,
details: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let ack_url = format!("{}/api/acknowledge", base_url);
let acknowledgment = Acknowledgment {
message_id: message_id.to_string(),
status: status.to_string(),
details: details.to_string(),
};
let response = client
.post(&ack_url)
.json(&acknowledgment)
.send()
.await?;
if response.status().is_success() {
println!("Acknowledgment sent successfully for message {}", message_id);
} else {
eprintln!("Server returned error for acknowledgment: {}", response.status());
}
Ok(())
}

View File

@@ -1,69 +1,55 @@
use std::error::Error;
use crate::models::DiskInfoDetailed;
use std::error::Error;
use anyhow::Result;
use sysinfo::DiskUsage;
use sysinfo::{Component, Components, Disk, Disks, System};
use sysinfo::{Component, Components, Disk, Disks};
use serde::Serialize;
#[derive(Debug)]
#[derive(Serialize, Debug)]
pub struct DiskInfo {
pub total: Option<f64>,
pub used: Option<f64>,
pub free: Option<f64>,
pub total_size: Option<f64>,
pub total_used: Option<f64>,
pub total_available: Option<f64>,
pub total_usage: Option<f64>,
pub detailed_info: Vec<DiskInfoDetailed>,
}
pub async fn get_disk_info() -> Result<DiskInfo> {
pub async fn get_disk_info() -> Result<DiskInfo, Box<dyn std::error::Error + Send + Sync>> {
let disks = Disks::new_with_refreshed_list();
let _disk_types = [
sysinfo::DiskKind::HDD,
sysinfo::DiskKind::SSD,
sysinfo::DiskKind::Unknown(0),
];
let (_, _, _, _) = get_disk_utitlization().unwrap();
let mut total = 0;
let mut used = 0;
let mut detailed_info = Vec::new();
// Collect detailed disk information
for disk in disks.list() {
if disk.total_space() > 100 * 1024 * 1024 {
// > 100MB
total += disk.total_space();
used += disk.total_space() - disk.available_space();
}
}
Ok(DiskInfo {
total: Some(total as f64),
used: Some(used as f64),
free: Some((total - used) as f64),
})
}
pub fn get_disk_utitlization() -> Result<(f64, f64, f64, f64), Box<dyn Error>> {
let mut sys = System::new();
sys.refresh_all();
let mut count = 0;
let mut total_size = 0u64;
let mut total_used = 0u64;
let mut total_available = 0u64;
let disks = Disks::new_with_refreshed_list();
for disk in disks.list() {
// Only print disks with known kind
if disk.kind() == sysinfo::DiskKind::Unknown(0) {
continue;
}
let disk_used = disk.total_space() - disk.available_space();
detailed_info.push(DiskInfoDetailed {
disk_name: disk.name().to_string_lossy().into_owned(),
disk_kind: format!("{:?}", disk.kind()),
disk_total_space: disk.total_space() as f64,
disk_available_space: disk.available_space() as f64,
disk_used_space: disk_used as f64,
disk_mount_point: disk.mount_point().to_string_lossy().into_owned(),
component_disk_label: String::new(),
component_disk_temperature: 0.0,
});
println!(
"Disk_Name: {:?}:\n---- Disk_Kind: {},\n---- Total: {},\n---- Available: {},\n---- Used: {}, \n---- Mount_Point: {:?}",
disk.name(),
disk.kind(),
disk.total_space(),
disk.available_space(),
disk.total_space() - disk.available_space(),
disk_used,
disk.mount_point()
);
}
// Get component temperatures
let components = Components::new_with_refreshed_list();
for component in &components {
if let Some(temperature) = component.temperature() {
@@ -72,59 +58,97 @@ pub fn get_disk_utitlization() -> Result<(f64, f64, f64, f64), Box<dyn Error>> {
component.label(),
temperature
);
// Update detailed info with temperature data if it matches a disk component
for disk_info in &mut detailed_info {
if component.label().contains(&disk_info.disk_name) {
disk_info.component_disk_label = component.label().to_string();
disk_info.component_disk_temperature = temperature;
}
}
}
}
// Berechnungen
let total_size = if count > 0 {
total_size as f64 // in Bytes
// Calculate totals (only disks > 100MB)
let (total_size, total_used, total_available) = calculate_disk_totals(&disks);
let (total_size, total_used, total_available, total_usage) = if total_size > 0.0 {
(total_size, total_used, total_available, (total_used / total_size) * 100.0)
} else {
// Fallback: Versuche df unter Linux
println!("Fallback: Using 'df' command to get disk info.");
#[cfg(target_os = "linux")]
{
use std::process::Command;
if let Ok(output) = Command::new("df")
.arg("-B1")
.arg("--output=size,used")
.output()
{
let stdout = String::from_utf8_lossy(&output.stdout);
for line in stdout.lines().skip(1) {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() == 2 {
if let (Ok(size), Ok(used)) =
(parts[0].parse::<u64>(), parts[1].parse::<u64>())
{
total_size += size;
total_used += used;
count += 1;
}
}
}
total_size as f64 // in Bytes
} else {
0.0
}
}
#[cfg(not(target_os = "linux"))]
{
0.0
match get_disk_info_fallback() {
Ok(fallback_data) => fallback_data,
Err(_) => (0.0, 0.0, 0.0, 0.0), // Default values if fallback fails
}
};
let usage = if total_size > 0.0 {
Ok(DiskInfo {
total_size: if total_size > 0.0 { Some(total_size) } else { None },
total_used: if total_used > 0.0 { Some(total_used) } else { None },
total_available: if total_available > 0.0 { Some(total_available) } else { None },
total_usage: if total_usage > 0.0 { Some(total_usage) } else { None },
detailed_info,
})
}
fn calculate_disk_totals(disks: &Disks) -> (f64, f64, f64) {
let mut total_size = 0u64;
let mut total_used = 0u64;
let mut total_available = 0u64;
for disk in disks.list() {
if disk.total_space() > 100 * 1024 * 1024 { // > 100MB
total_size += disk.total_space();
total_available += disk.available_space();
total_used += disk.total_space() - disk.available_space();
}
}
(total_size as f64, total_used as f64, total_available as f64)
}
#[cfg(target_os = "linux")]
fn get_disk_info_fallback() -> Result<(f64, f64, f64, f64), Box<dyn Error + Send + Sync>> {
use std::process::Command;
let output = Command::new("df")
.arg("-B1")
.arg("--output=size,used,avail")
.output()?;
let stdout = String::from_utf8_lossy(&output.stdout);
let mut total_size = 0u64;
let mut total_used = 0u64;
let mut total_available = 0u64;
let mut count = 0;
for line in stdout.lines().skip(1) {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() >= 3 {
if let (Ok(size), Ok(used), Ok(avail)) = (
parts[0].parse::<u64>(),
parts[1].parse::<u64>(),
parts[2].parse::<u64>(),
) {
total_size += size;
total_used += used;
total_available += avail;
count += 1;
}
}
}
let usage = if total_size > 0 {
(total_used as f64 / total_size as f64) * 100.0
} else {
0.0
};
Ok((
total_size,
total_used as f64,
total_available as f64,
usage as f64,
)) // Disk-Temp bleibt 0.0 ohne spezielle Hardware
Ok((total_size as f64, total_used as f64, total_available as f64, usage))
}
#[cfg(not(target_os = "linux"))]
fn get_disk_info_fallback() -> Result<(f64, f64, f64, f64), Box<dyn Error + Send + Sync>> {
Ok((0.0, 0.0, 0.0, 0.0))
}
pub fn _get_disk_temp_for_component(component: &Component) -> Option<f64> {

View File

@@ -13,6 +13,9 @@ use std::marker::Send;
use std::marker::Sync;
use std::result::Result;
use tokio::task::JoinHandle;
use bollard::Docker;
use crate::serverclientcomm::{get_current_image};
async fn flatten<T>(
handle: JoinHandle<Result<T, Box<dyn Error + Send + Sync>>>,
@@ -26,14 +29,30 @@ async fn flatten<T>(
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let args: Vec<String> = env::args().collect();
// Initialize Docker client
let docker = Docker::connect_with_local_defaults()
.map_err(|e| format!("Failed to connect to Docker: {}", e))?;
// Get current image version
let client_version = match get_current_image(&docker).await {
Ok(Some(version)) => version,
Ok(None) => {
eprintln!("Warning: No image version found");
"unknown".to_string()
}
Err(e) => {
eprintln!("Warning: Could not get current image version: {}", e);
"unknown".to_string()
}
};
println!("Client Version: {}", client_version);
let args: Vec<String> = env::args().collect();
// args[0] is the binary name, args[1] is the first actual argument
if args.len() < 2 {
eprintln!("Usage: {} <server-url>", args[0]);
return Err("Missing server URL argument".into());
}
let server_url = &args[1];
println!("Server URL: {:?}", server_url);
@@ -47,6 +66,13 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
};
// Start background tasks
// Start server listening for commands
let listening_handle = tokio::spawn({
let docker = docker.clone();
let server_url = server_url.to_string();
async move { api::listening_to_server(&docker, &server_url).await }
});
// Start heartbeat in background
let heartbeat_handle = tokio::spawn({
let ip = ip.clone();
@@ -65,14 +91,16 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
}
});
// Warte auf beide Tasks und prüfe explizit auf Fehler
let (heartbeat_handle, metrics_handle) =
tokio::try_join!(flatten(heartbeat_handle), flatten(metrics_handle))?;
// Wait for all tasks and check for errors
let (listening_result, heartbeat_result, metrics_result) = tokio::try_join!(
flatten(listening_handle),
flatten(heartbeat_handle),
flatten(metrics_handle)
)?;
let (heartbeat, metrics) = (heartbeat_handle, metrics_handle);
println!(
"All tasks completed successfully: {:?}, {:?}.",
heartbeat, metrics
"All tasks completed: listening={:?}, heartbeat={:?}, metrics={:?}",
listening_result, heartbeat_result, metrics_result
);
println!("All tasks completed successfully.");

View File

@@ -62,8 +62,8 @@ impl Collector {
gpu_vram_usage: hardware.gpu.vram_used.unwrap_or_default(),
ram_load: hardware.memory.used.unwrap_or_default(),
ram_size: hardware.memory.total.unwrap_or_default(),
disk_size: hardware.disk.total.unwrap_or_default(),
disk_usage: hardware.disk.used.unwrap_or_default(),
disk_size: hardware.disk.total_size.unwrap_or_default(),
disk_usage: hardware.disk.total_used.unwrap_or_default(),
disk_temp: 0.0, // not supported
net_rx: hardware.network.rx_rate.unwrap_or_default(),
net_tx: hardware.network.tx_rate.unwrap_or_default(),

View File

@@ -51,6 +51,18 @@ pub struct MetricDto {
pub net_tx: f64,
}
#[derive(Serialize, Debug)]
pub struct DiskInfoDetailed {
pub disk_name: String,
pub disk_kind: String,
pub disk_total_space: f64,
pub disk_available_space: f64,
pub disk_used_space: f64,
pub disk_mount_point: String,
pub component_disk_label: String,
pub component_disk_temperature: f32,
}
#[derive(Deserialize)]
pub struct IdResponse {
pub id: i32,
@@ -73,11 +85,17 @@ pub struct HardwareDto {
pub ip_address: String,
}
#[derive(Debug, Deserialize)]
#[serde(tag = "command", content = "data")]
pub enum ServerMessage {
Update(String),
Restart,
#[serde(other)]
Unknown,
#[derive(Debug, Deserialize, Clone)]
pub struct ServerMessage {
// Define your message structure here
pub message_type: String,
pub data: serde_json::Value,
pub message_id: String, // Add an ID for acknowledgment
}
#[derive(Debug, Serialize, Clone)]
pub struct Acknowledgment {
pub message_id: String,
pub status: String, // "success" or "error"
pub details: String,
}

View File

@@ -1,27 +1,46 @@
use crate::models::ServerMessage;
use crate::models::{ServerMessage};
use std::error::Error;
use bollard::Docker;
use bollard::query_parameters::CreateImageOptions;
use bollard::query_parameters::RestartContainerOptions;
use bollard::query_parameters::{CreateImageOptions, RestartContainerOptions, InspectContainerOptions};
use futures_util::StreamExt;
pub fn parse_message(raw: &str) -> ServerMessage {
match raw {
"restart" => ServerMessage::Restart,
msg if msg.starts_with("update:") => ServerMessage::Update(msg[7..].to_string()),
_ => ServerMessage::Unknown,
pub async fn handle_server_message(docker: &Docker, msg: ServerMessage) -> Result<(), Box<dyn Error + Send + Sync>> {
let msg = msg.clone();
println!("Handling server message: {:?}", msg);
// Handle different message types
match msg.message_type.as_str() {
"update_image" => {
if let Some(image_name) = msg.data.get("image").and_then(|v| v.as_str()) {
println!("Received update command for image: {}", image_name);
// Call your update_docker_image function here
update_docker_image(docker, image_name).await?;
Ok(())
} else {
Err("Missing image name in update message".into())
}
}
"restart_container" => {
println!("Received restart container command");
// Call your restart_container function here
restart_container(docker).await?;
Ok(())
}
"stop_agent" => {
println!("Received stop agent command");
// Implement graceful shutdown
std::process::exit(0);
}
_ => {
eprintln!("Unknown message type: {}", msg.message_type);
Err(format!("Unknown message type: {}", msg.message_type).into())
}
}
}
pub async fn handle_server_message(docker: &Docker, msg: ServerMessage) {
match msg {
ServerMessage::Update(version) => update_docker_image(docker, &version).await,
ServerMessage::Restart => restart_container(docker).await,
ServerMessage::Unknown => eprintln!("Unknown message"),
}
}
pub async fn update_docker_image(docker: &Docker, image: &str) {
pub async fn update_docker_image(docker: &Docker, image: &str) -> Result<(), Box<dyn Error + Send + Sync>> {
println!("Updating to {}", image);
// 1. Pull new image
@@ -50,10 +69,149 @@ pub async fn update_docker_image(docker: &Docker, image: &str) {
}
// 2. Restart the current container
restart_container(docker).await;
let _ = restart_container(docker).await;
Ok(())
}
pub async fn restart_container(docker: &Docker) {
pub async fn get_current_image(docker: &Docker) -> Result<Option<String>, Box<dyn Error + Send + Sync>> {
// Get the current container ID from /proc/self/cgroup
let container_id = match std::fs::read_to_string("/proc/self/cgroup") {
Ok(content) => {
content
.lines()
.find_map(|line| {
// Look for any line that might contain container information
if line.contains("docker") || line.contains("crio") || line.contains("containerd") || line.contains("kubepods") {
// Extract the container ID from the line
extract_container_id(line)
} else {
None
}
})
}
Err(e) => {
eprintln!("Error reading cgroup file: {}", e);
return Ok(None);
}
};
// Debug: Print what we found
println!("Container ID search result: {:?}", container_id);
let container_id = match container_id {
Some(id) if !id.is_empty() => {
println!("Found container ID: '{}'", id);
id
}
_ => {
// Debug the cgroup file content
if let Ok(content) = std::fs::read_to_string("/proc/self/cgroup") {
eprintln!("Cgroup file content for debugging:");
for (i, line) in content.lines().enumerate() {
eprintln!("Line {}: {}", i, line);
}
}
eprintln!("Could not find valid container ID in cgroup");
return Ok(None);
}
};
// Clean up the container ID - remove any non-hex characters
let clean_container_id: String = container_id
.chars()
.filter(|c| c.is_ascii_hexdigit())
.collect();
// Validate the container ID (should be 64 characters for full ID, but short ID might work too)
if clean_container_id.is_empty() {
eprintln!("Container ID contains no hex characters: '{}'", container_id);
return Ok(None);
}
println!("Using container ID: '{}' (cleaned: '{}')", container_id, clean_container_id);
// Try with both the original and cleaned ID
let ids_to_try = vec![&clean_container_id, &container_id];
for id in ids_to_try {
println!("Attempting to inspect container with ID: '{}'", id);
match docker.inspect_container(id, None::<InspectContainerOptions>).await {
Ok(container_info) => {
if let Some(config) = container_info.config {
if let Some(image) = config.image {
println!("Successfully found image: {}", image);
return Ok(Some(image));
}
}
eprintln!("Container inspected but no image found in config");
return Ok(Some("unknown".to_string()));
}
Err(e) => {
eprintln!("Error inspecting container with ID '{}': {}", id, e);
// Continue to try the next ID
}
}
}
eprintln!("All attempts to inspect container failed");
Ok(None)
}
fn extract_container_id(line: &str) -> Option<String> {
let parts: Vec<&str> = line.split('/').collect();
if let Some(last_part) = parts.last() {
let last_part = last_part.trim();
println!("Processing cgroup line part: '{}'", last_part);
// Common patterns for container IDs in cgroups:
// Pattern 1: Full container ID (64-character hex)
if last_part.len() == 64 && last_part.chars().all(|c| c.is_ascii_hexdigit()) {
println!("Found full container ID: {}", last_part);
return Some(last_part.to_string());
}
// Pattern 2: docker-<short_id>.scope
if last_part.starts_with("docker-") && last_part.ends_with(".scope") {
let id = last_part
.trim_start_matches("docker-")
.trim_end_matches(".scope")
.to_string();
println!("Found docker scope ID: {}", id);
return Some(id);
}
// Pattern 3: <short_id> (12-character hex or similar)
if last_part.chars().all(|c| c.is_ascii_hexdigit()) && last_part.len() >= 12 {
println!("Found hex ID: {}", last_part);
return Some(last_part.to_string());
}
// Pattern 4: Try to extract ID from any string that looks like it contains a hex ID
if let Some(caps) = regex::Regex::new(r"[a-fA-F0-9]{12,64}")
.ok()
.and_then(|re| re.find(last_part))
{
let id = caps.as_str().to_string();
println!("Extracted ID via regex: {}", id);
return Some(id);
}
// Pattern 5: If nothing else matches, try the last part as-is
if !last_part.is_empty() {
println!("Using last part as ID: {}", last_part);
return Some(last_part.to_string());
}
}
None
}
pub async fn restart_container(docker: &Docker) -> Result<(), Box<dyn Error + Send + Sync>> {
if let Ok(container_id) = std::env::var("HOSTNAME") {
println!("Restarting container {}", container_id);
if let Err(e) = docker.restart_container(&container_id, Some(RestartContainerOptions { signal: None, t: Some(0) }))
@@ -64,4 +222,6 @@ pub async fn restart_container(docker: &Docker) {
} else {
eprintln!("No container ID found (HOSTNAME not set?)");
}
Ok(())
}