From 67b24b33aaa15773e58d252efb42e27bff2bc93f Mon Sep 17 00:00:00 2001 From: donpat1to Date: Sat, 27 Sep 2025 21:34:30 +0200 Subject: [PATCH] added inital server communcation task --- WatcherAgent/src/api.rs | 89 +++++++++++++++++++++++----- WatcherAgent/src/main.rs | 40 ++++++++++--- WatcherAgent/src/models.rs | 20 ++++--- WatcherAgent/src/serverclientcomm.rs | 68 ++++++++++++++------- 4 files changed, 165 insertions(+), 52 deletions(-) diff --git a/WatcherAgent/src/api.rs b/WatcherAgent/src/api.rs index f8a54be..a663041 100644 --- a/WatcherAgent/src/api.rs +++ b/WatcherAgent/src/api.rs @@ -1,16 +1,15 @@ -use crate::serverclientcomm::handle_server_message; - use std::time::Duration; use crate::hardware::HardwareInfo; -use crate::models::{HeartbeatDto, IdResponse, MetricDto, RegistrationDto}; +use crate::models::{HeartbeatDto, IdResponse, MetricDto, RegistrationDto, ServerMessage, Acknowledgment}; +use crate::serverclientcomm::handle_server_message; + use anyhow::Result; use reqwest::{Client, StatusCode}; use std::error::Error; use tokio::time::sleep; use bollard::Docker; -use crate::models::ServerMessage; pub async fn register_with_server( base_url: &str, @@ -159,24 +158,84 @@ pub async fn send_metrics( Ok(()) } -pub async fn listening_to_server(docker: &Docker, base_url: &str) -> Result<(), Box>{ +pub async fn listening_to_server(docker: &Docker, base_url: &str) -> Result<(), Box> { let url = format!("{}/api/message", base_url); + let client = reqwest::Client::new(); + loop { - // Replace with your server endpoint - let resp = reqwest::get(&url) - .await; + // Get message from server + let resp = client.get(&url).send().await; - if let Ok(resp) = resp { - if let Ok(msg) = resp.json::().await { - handle_server_message(docker, msg).await; - } else { - eprintln!("Failed to parse message"); + match resp { + Ok(response) => { + if response.status().is_success() { + match response.json::().await { + Ok(msg) => { + // Acknowledge receipt immediately + if let Err(e) = send_acknowledgment(&client, base_url, &msg.message_id, "received", "Message received successfully").await { + eprintln!("Failed to send receipt acknowledgment: {}", e); + } + + // Handle the message + let result = handle_server_message(docker, msg.clone()).await; + + // Send execution result acknowledgment + let (status, details) = match result { + Ok(_) => ("success", "Message executed successfully".to_string()), + Err(e) => ("error", format!("Execution failed: {}", e)), + }; + + if let Err(e) = send_acknowledgment(&client, base_url, &msg.message_id, status, &details).await { + eprintln!("Failed to send execution acknowledgment: {}", e); + } + } + Err(e) => { + eprintln!("Failed to parse message: {}", e); + } + } + } else if response.status() == reqwest::StatusCode::NO_CONTENT { + // No new messages, continue polling + println!("No new messages from server"); + } else { + eprintln!("Server returned error status: {}", response.status()); + } + } + Err(e) => { + eprintln!("Failed to reach server: {}", e); } - } else { - eprintln!("Failed to reach server"); } // Poll every 5 seconds (or use WebSocket for real-time) sleep(Duration::from_secs(5)).await; } +} + +async fn send_acknowledgment( + client: &reqwest::Client, + base_url: &str, + message_id: &str, + status: &str, + details: &str, +) -> Result<(), Box> { + let ack_url = format!("{}/api/acknowledge", base_url); + + let acknowledgment = Acknowledgment { + message_id: message_id.to_string(), + status: status.to_string(), + details: details.to_string(), + }; + + let response = client + .post(&ack_url) + .json(&acknowledgment) + .send() + .await?; + + if response.status().is_success() { + println!("Acknowledgment sent successfully for message {}", message_id); + } else { + eprintln!("Server returned error for acknowledgment: {}", response.status()); + } + + Ok(()) } \ No newline at end of file diff --git a/WatcherAgent/src/main.rs b/WatcherAgent/src/main.rs index b1a84a5..58d08d9 100644 --- a/WatcherAgent/src/main.rs +++ b/WatcherAgent/src/main.rs @@ -13,6 +13,9 @@ use std::marker::Send; use std::marker::Sync; use std::result::Result; use tokio::task::JoinHandle; +use bollard::Docker; + +use crate::serverclientcomm::{get_current_image}; async fn flatten( handle: JoinHandle>>, @@ -26,14 +29,26 @@ async fn flatten( #[tokio::main] async fn main() -> Result<(), Box> { - let args: Vec = env::args().collect(); + // Initialize Docker client + let docker = Docker::connect_with_local_defaults() + .map_err(|e| format!("Failed to connect to Docker: {}", e))?; + // Get current image version + let client_version = match get_current_image(&docker).await { + Ok(version) => version.unwrap(), + Err(e) => { + eprintln!("Warning: Could not get current image version: {}", e); + "unknown".to_string() + } + }; + println!("Client Version: {}", client_version); + + let args: Vec = env::args().collect(); // args[0] is the binary name, args[1] is the first actual argument if args.len() < 2 { eprintln!("Usage: {} ", args[0]); return Err("Missing server URL argument".into()); } - let server_url = &args[1]; println!("Server URL: {:?}", server_url); @@ -47,6 +62,13 @@ async fn main() -> Result<(), Box> { }; // Start background tasks + // Start server listening for commands + let listening_handle = tokio::spawn({ + let docker = docker.clone(); + let server_url = server_url.to_string(); + async move { api::listening_to_server(&docker, &server_url).await } + }); + // Start heartbeat in background let heartbeat_handle = tokio::spawn({ let ip = ip.clone(); @@ -65,14 +87,16 @@ async fn main() -> Result<(), Box> { } }); - // Warte auf beide Tasks und prüfe explizit auf Fehler - let (heartbeat_handle, metrics_handle) = - tokio::try_join!(flatten(heartbeat_handle), flatten(metrics_handle))?; + // Wait for all tasks and check for errors + let (listening_result, heartbeat_result, metrics_result) = tokio::try_join!( + flatten(listening_handle), + flatten(heartbeat_handle), + flatten(metrics_handle) + )?; - let (heartbeat, metrics) = (heartbeat_handle, metrics_handle); println!( - "All tasks completed successfully: {:?}, {:?}.", - heartbeat, metrics + "All tasks completed: listening={:?}, heartbeat={:?}, metrics={:?}", + listening_result, heartbeat_result, metrics_result ); println!("All tasks completed successfully."); diff --git a/WatcherAgent/src/models.rs b/WatcherAgent/src/models.rs index 1433fdd..2242069 100644 --- a/WatcherAgent/src/models.rs +++ b/WatcherAgent/src/models.rs @@ -73,11 +73,17 @@ pub struct HardwareDto { pub ip_address: String, } -#[derive(Debug, Deserialize)] -#[serde(tag = "command", content = "data")] -pub enum ServerMessage { - Update(String), - Restart, - #[serde(other)] - Unknown, +#[derive(Debug, Deserialize, Clone)] +pub struct ServerMessage { + // Define your message structure here + pub message_type: String, + pub data: serde_json::Value, + pub message_id: String, // Add an ID for acknowledgment +} + +#[derive(Debug, Serialize, Clone)] +pub struct Acknowledgment { + pub message_id: String, + pub status: String, // "success" or "error" + pub details: String, } \ No newline at end of file diff --git a/WatcherAgent/src/serverclientcomm.rs b/WatcherAgent/src/serverclientcomm.rs index 75ac14e..e7ccfa7 100644 --- a/WatcherAgent/src/serverclientcomm.rs +++ b/WatcherAgent/src/serverclientcomm.rs @@ -1,26 +1,46 @@ -use crate::models::ServerMessage; +use crate::models::{ServerMessage}; + +use std::error::Error; use bollard::Docker; use bollard::query_parameters::{CreateImageOptions, RestartContainerOptions, InspectContainerOptions}; use futures_util::StreamExt; -pub fn parse_message(raw: &str) -> ServerMessage { - match raw { - "restart" => ServerMessage::Restart, - msg if msg.starts_with("update:") => ServerMessage::Update(msg[7..].to_string()), - _ => ServerMessage::Unknown, +pub async fn handle_server_message(docker: &Docker, msg: ServerMessage) -> Result<(), Box> { + let msg = msg.clone(); + println!("Handling server message: {:?}", msg); + + // Handle different message types + match msg.message_type.as_str() { + "update_image" => { + if let Some(image_name) = msg.data.get("image").and_then(|v| v.as_str()) { + println!("Received update command for image: {}", image_name); + // Call your update_docker_image function here + update_docker_image(docker, image_name).await?; + Ok(()) + } else { + Err("Missing image name in update message".into()) + } + } + "restart_container" => { + println!("Received restart container command"); + // Call your restart_container function here + restart_container(docker).await?; + Ok(()) + } + "stop_agent" => { + println!("Received stop agent command"); + // Implement graceful shutdown + std::process::exit(0); + } + _ => { + eprintln!("Unknown message type: {}", msg.message_type); + Err(format!("Unknown message type: {}", msg.message_type).into()) + } } } -pub async fn handle_server_message(docker: &Docker, msg: ServerMessage) { - match msg { - ServerMessage::Update(version) => update_docker_image(docker, &version).await, - ServerMessage::Restart => restart_container(docker).await, - ServerMessage::Unknown => eprintln!("Unknown message"), - } -} - -pub async fn update_docker_image(docker: &Docker, image: &str) { +pub async fn update_docker_image(docker: &Docker, image: &str) -> Result<(), Box> { println!("Updating to {}", image); // 1. Pull new image @@ -49,10 +69,12 @@ pub async fn update_docker_image(docker: &Docker, image: &str) { } // 2. Restart the current container - restart_container(docker).await; + let _ = restart_container(docker).await; + + Ok(()) } -pub async fn get_current_image(docker: &Docker) -> Option { +pub async fn get_current_image(docker: &Docker) -> Result, Box> { // Get the current container ID from /proc/self/cgroup let container_id = match std::fs::read_to_string("/proc/self/cgroup") { Ok(content) => { @@ -68,7 +90,7 @@ pub async fn get_current_image(docker: &Docker) -> Option { } Err(e) => { eprintln!("Error reading cgroup file: {}", e); - return None; + return Ok(None); } }; @@ -76,23 +98,23 @@ pub async fn get_current_image(docker: &Docker) -> Option { Some(id) => id, None => { eprintln!("Could not find container ID in cgroup"); - return None; + return Ok(None); } }; // Inspect the current container to get its image match docker.inspect_container(&container_id, None::).await { Ok(container_info) => { - container_info.config.map(|config| config.image.unwrap_or_else(|| "unknown".to_string())) + Ok(container_info.config.map(|config| config.image.unwrap_or_else(|| "unknown".to_string()))) } Err(e) => { eprintln!("Error inspecting container: {}", e); - None + Ok(None) } } } -pub async fn restart_container(docker: &Docker) { +pub async fn restart_container(docker: &Docker) -> Result<(), Box> { if let Ok(container_id) = std::env::var("HOSTNAME") { println!("Restarting container {}", container_id); if let Err(e) = docker.restart_container(&container_id, Some(RestartContainerOptions { signal: None, t: Some(0) })) @@ -103,4 +125,6 @@ pub async fn restart_container(docker: &Docker) { } else { eprintln!("No container ID found (HOSTNAME not set?)"); } + + Ok(()) } \ No newline at end of file