From bfeb43f38a6b277b10c774c74962d40719e4ad46 Mon Sep 17 00:00:00 2001 From: donpat1to Date: Fri, 3 Oct 2025 20:42:35 +0200 Subject: [PATCH] added docker management --- WatcherAgent/src/api.rs | 69 ++++-- WatcherAgent/src/docker/container.rs | 210 ++++++++++++++--- WatcherAgent/src/docker/mod.rs | 235 +++++++++++++++----- WatcherAgent/src/docker/serverclientcomm.rs | 111 ++------- WatcherAgent/src/docker/stats.rs | 206 +++++++++++++++++ WatcherAgent/src/main.rs | 57 +++-- WatcherAgent/src/metrics.rs | 5 +- WatcherAgent/src/models.rs | 30 ++- 8 files changed, 668 insertions(+), 255 deletions(-) create mode 100644 WatcherAgent/src/docker/stats.rs diff --git a/WatcherAgent/src/api.rs b/WatcherAgent/src/api.rs index 9041032..ae66900 100644 --- a/WatcherAgent/src/api.rs +++ b/WatcherAgent/src/api.rs @@ -1,5 +1,3 @@ - - /// # API Module /// /// This module provides all HTTP communication between WatcherAgent and the backend server. @@ -14,9 +12,11 @@ /// These functions are called from the main agent loop and background tasks. All network operations are asynchronous and robust to transient failures. use std::time::Duration; -use crate::hardware::HardwareInfo; -use crate::models::{HeartbeatDto, IdResponse, MetricDto, RegistrationDto, ServerMessage, Acknowledgment}; use crate::docker::serverclientcomm::handle_server_message; +use crate::hardware::HardwareInfo; +use crate::models::{ + Acknowledgment, HeartbeatDto, IdResponse, MetricDto, RegistrationDto, ServerMessage, +}; use anyhow::Result; use reqwest::{Client, StatusCode}; @@ -57,7 +57,7 @@ pub async fn register_with_server( // Prepare registration data let registration = RegistrationDto { - id: server_id, + server_id: server_id, ip_address: registered_ip.clone(), cpu_type: hardware.cpu.name.clone().unwrap_or_default(), cpu_cores: (hardware.cpu.cores).unwrap_or_default(), @@ -224,10 +224,13 @@ pub async fn send_metrics( /// /// # Returns /// * `Result<(), Box>` - Ok if commands are handled successfully. -pub async fn listening_to_server(docker: &Docker, base_url: &str) -> Result<(), Box> { +pub async fn listening_to_server( + docker: &Docker, + base_url: &str, +) -> Result<(), Box> { let url = format!("{}/api/message", base_url); let client = reqwest::Client::new(); - + loop { // Get message from server let resp = client.get(&url).send().await; @@ -238,20 +241,36 @@ pub async fn listening_to_server(docker: &Docker, base_url: &str) -> Result<(), match response.json::().await { Ok(msg) => { // Acknowledge receipt immediately - if let Err(e) = send_acknowledgment(&client, base_url, &msg.message_id, "received", "Message received successfully").await { + if let Err(e) = send_acknowledgment( + &client, + base_url, + &msg.message_id, + "received", + "Message received successfully", + ) + .await + { eprintln!("Failed to send receipt acknowledgment: {}", e); } - + // Handle the message let result = handle_server_message(docker, msg.clone()).await; - + // Send execution result acknowledgment let (status, details) = match result { Ok(_) => ("success", "Message executed successfully".to_string()), Err(e) => ("error", format!("Execution failed: {}", e)), }; - - if let Err(e) = send_acknowledgment(&client, base_url, &msg.message_id, status, &details).await { + + if let Err(e) = send_acknowledgment( + &client, + base_url, + &msg.message_id, + status, + &details, + ) + .await + { eprintln!("Failed to send execution acknowledgment: {}", e); } } @@ -297,24 +316,26 @@ async fn send_acknowledgment( details: &str, ) -> Result<(), Box> { let ack_url = format!("{}/api/acknowledge", base_url); - + let acknowledgment = Acknowledgment { message_id: message_id.to_string(), status: status.to_string(), details: details.to_string(), }; - - let response = client - .post(&ack_url) - .json(&acknowledgment) - .send() - .await?; - + + let response = client.post(&ack_url).json(&acknowledgment).send().await?; + if response.status().is_success() { - println!("Acknowledgment sent successfully for message {}", message_id); + println!( + "Acknowledgment sent successfully for message {}", + message_id + ); } else { - eprintln!("Server returned error for acknowledgment: {}", response.status()); + eprintln!( + "Server returned error for acknowledgment: {}", + response.status() + ); } - + Ok(()) -} \ No newline at end of file +} diff --git a/WatcherAgent/src/docker/container.rs b/WatcherAgent/src/docker/container.rs index 3430bdb..80eab2f 100644 --- a/WatcherAgent/src/docker/container.rs +++ b/WatcherAgent/src/docker/container.rs @@ -1,15 +1,17 @@ - //! Docker container utilities for WatcherAgent //! //! Provides functions to list and process Docker containers using the Bollard library. //! -use crate::models::DockerContainer; +use crate::docker::stats; +use crate::docker::stats::{ContainerCpuInfo, ContainerNetworkInfo}; +use crate::models::{DockerContainerDto, DockerContainerRegistrationDto}; -use bollard::query_parameters::{ListContainersOptions}; +use bollard::query_parameters::{ + CreateImageOptions, ListContainersOptions, RestartContainerOptions, +}; use bollard::Docker; - - - +use futures_util::StreamExt; +use std::error::Error; /// Returns a list of available Docker containers. /// @@ -18,54 +20,60 @@ use bollard::Docker; /// /// # Returns /// * `Vec` - Vector of Docker container info. -pub async fn get_available_container(docker: &Docker) -> Vec { +pub async fn get_available_containers(docker: &Docker) -> Vec { println!("=== DOCKER CONTAINER LIST ==="); - + let options = Some(ListContainersOptions { all: true, ..Default::default() }); - + let containers_list = match docker.list_containers(options).await { Ok(containers) => { println!("Available containers ({}):", containers.len()); - containers.into_iter() + containers + .into_iter() .filter_map(|container| { container.id.as_ref()?; // Skip if no ID - + let id = container.id?; let short_id = if id.len() > 12 { &id[..12] } else { &id }; - //let short_id: u32 = short_string_id.trim().parse().unwrap(); - let name = container.names + let name = container + .names .and_then(|names| names.into_iter().next()) .map(|name| name.trim_start_matches('/').to_string()) .unwrap_or_else(|| "unknown".to_string()); - - let image = container.image + + let image = container + .image .as_ref() .map(|img| img.to_string()) .unwrap_or_else(|| "unknown".to_string()); - - let status = container.status - .as_ref() - .map(|s| match s.to_lowercase().as_str() { - s if s.contains("up") || s.contains("running") => "running".to_string(), - s if s.contains("exited") || s.contains("stopped") => "stopped".to_string(), - _ => s.to_string(), - }) - .unwrap_or_else(|| "unknown".to_string()); - - println!(" - ID: {}, Image: {}, Name: {}", short_id, container.image.unwrap(), name); - - Some(DockerContainer { - ID: short_id.to_string(), + + /*let status = container + .status + .as_ref() + .map(|s| match s.to_lowercase().as_str() { + s if s.contains("up") || s.contains("running") => "running".to_string(), + s if s.contains("exited") || s.contains("stopped") => { + "stopped".to_string() + } + _ => s.to_string(), + }) + .unwrap_or_else(|| "unknown".to_string()); + + println!( + " - ID: {}, Image: {}, Name: {}", + short_id, + container.image.unwrap(), + name + );*/ + + Some(DockerContainerDto { + id: short_id.to_string(), image, - Name: name, - Status: status, - _net_in: 0.0, - _net_out: 0.0, - _cpu_load: 0.0, + name: name, }) }) .collect() @@ -75,10 +83,96 @@ pub async fn get_available_container(docker: &Docker) -> Vec { Vec::new() } }; - + containers_list } +/// Pulls a new Docker image and restarts the current container. +/// +/// # Arguments +/// * `docker` - Reference to a Bollard Docker client. +/// * `image` - The name of the Docker image to pull. +/// +/// # Returns +/// * `Result<(), Box>` - Ok if updated successfully, error otherwise. +pub async fn update_docker_image( + docker: &Docker, + image: &str, +) -> Result<(), Box> { + println!("Updating to {}", image); + + // 1. Pull new image + let mut stream = docker.create_image( + Some(CreateImageOptions { + from_image: Some(image.to_string()), + ..Default::default() + }), + None, + None, + ); + + // Use the stream with proper trait bounds + while let Some(result) = StreamExt::next(&mut stream).await { + match result { + Ok(progress) => { + if let Some(status) = progress.status { + println!("Pull status: {}", status); + } + } + Err(e) => { + eprintln!("Error pulling image: {}", e); + break; + } + } + } + + // 2. Restart the current container + let options = Some(ListContainersOptions { + all: true, + ..Default::default() + }); + let container_id = docker + .list_containers(options) + .await? + .into_iter() + .find_map(|c| { + c.image + .as_ref() + .and_then(|img| if img == image { c.id } else { None }) + }); + let _ = restart_container(docker, &container_id.unwrap()).await; + + Ok(()) +} + +/// Restarts the agent's own Docker container. +/// +/// # Arguments +/// * `docker` - Reference to a Bollard Docker client. +/// +/// # Returns +/// * `Result<(), Box>` - Ok if restarted successfully, error otherwise. +pub async fn restart_container( + docker: &Docker, + container_id: &str, +) -> Result<(), Box> { + println!("Restarting container {}", container_id); + if let Err(e) = docker + .restart_container( + &container_id.to_string(), + Some(RestartContainerOptions { + signal: None, + t: Some(0), + }), + ) + .await + { + eprintln!("Failed to restart container: {}", e); + } + + Ok(()) +} + /* /// Extracts a Docker container ID from a string line. /// @@ -90,4 +184,48 @@ pub async fn get_available_container(docker: &Docker) -> Vec { pub fn extract_client_container_id(line: &str) -> Option { // ...existing code... } -*/ \ No newline at end of file +*/ + +/// Gets network statistics for a specific container +pub async fn get_network_stats( + docker: &Docker, + container_id: &str, +) -> Result> { + let (_, net_info) = stats::get_single_container_stats(docker, container_id).await?; + + if let Some(net_info) = net_info { + Ok(net_info) + } else { + // Return default network info if not found + Ok(ContainerNetworkInfo { + container_id: container_id.to_string(), + rx_bytes: 0, + tx_bytes: 0, + rx_packets: 0, + tx_packets: 0, + rx_errors: 0, + tx_errors: 0, + }) + } +} + +/// Gets CPU statistics for a specific container +pub async fn get_cpu_stats( + docker: &Docker, + container_id: &str, +) -> Result> { + let (cpu_info, _) = stats::get_single_container_stats(docker, container_id).await?; + + if let Some(cpu_info) = cpu_info { + Ok(cpu_info) + } else { + // Return default CPU info if not found + Ok(ContainerCpuInfo { + container_id: container_id.to_string(), + cpu_usage_percent: 0.0, + system_cpu_usage: 0, + container_cpu_usage: 0, + online_cpus: 1, + }) + } +} diff --git a/WatcherAgent/src/docker/mod.rs b/WatcherAgent/src/docker/mod.rs index 6f54ea0..642e228 100644 --- a/WatcherAgent/src/docker/mod.rs +++ b/WatcherAgent/src/docker/mod.rs @@ -1,4 +1,3 @@ - /// # Docker Module /// /// This module provides Docker integration for WatcherAgent, including container enumeration, statistics, and lifecycle management. @@ -10,71 +9,195 @@ /// pub mod container; pub mod serverclientcomm; +pub mod stats; +use crate::models::{DockerContainerDto, DockerContainerMetricDto}; +use bollard::{query_parameters::InspectContainerOptions, Docker}; use std::error::Error; -use crate::models::DockerContainer; - -/// Aggregated Docker statistics for all managed containers. -/// -/// # Fields -/// - `number`: Number of running containers (optional) -/// - `net_in_total`: Total network receive rate in **bytes per second (B/s)** (optional) -/// - `net_out_total`: Total network transmit rate in **bytes per second (B/s)** (optional) -/// - `dockers`: List of [`DockerContainer`] statistics (optional) +/// Main Docker manager that holds the Docker client and provides all operations #[derive(Debug, Clone)] -pub struct DockerInfo { - pub number: Option, - pub net_in_total: Option, - pub net_out_total: Option, - pub dockers: Option>, +pub struct DockerManager { + pub docker: Docker, } - -impl DockerInfo { - /// Collects Docker statistics for all managed containers. - /// - /// # Returns - /// * `Result>` - Aggregated Docker statistics or error if collection fails. - pub async fn collect() -> Result> { - Ok(Self { number: None, net_in_total: None, net_out_total: None, dockers: None }) +impl Default for DockerManager { + fn default() -> Self { + Self { + docker: Docker::connect_with_local_defaults() + .unwrap_or_else(|e| panic!("Failed to create default Docker connection: {}", e)), + } } } +impl DockerManager { + /// Creates a new DockerManager instance + pub fn new() -> Result> { + let docker = Docker::connect_with_local_defaults() + .map_err(|e| format!("Failed to connect to Docker: {}", e))?; -impl DockerContainer { - /* - /// Restarts the specified Docker container by ID. - /// - /// # Arguments - /// * `docker` - Reference to a Bollard Docker client - /// - /// # Returns - /// * `Result<(), Box>` - Ok if restarted successfully, error otherwise. - pub async fn restart_container(docker: &Docker) -> Result<(), Box> { - // ...existing code... - } - */ - - /// Returns the container ID for a given [`DockerContainer`]. - /// - /// # Arguments - /// * `container` - Reference to a [`DockerContainer`] - /// - /// # Returns - /// * `Result>` - Container ID as integer. - pub async fn get_docker_container_id(container: DockerContainer) -> Result> { - Ok(container.ID) + Ok(Self { docker }) } - /// Returns the image name for a given [`DockerContainer`]. - /// - /// # Arguments - /// * `container` - Reference to a [`DockerContainer`] - /// - /// # Returns - /// * `Result>` - Image name as string. - pub async fn get_docker_container_image(container: DockerContainer) -> Result> { - Ok(container.image) + /// Creates a DockerManager instance with optional Docker connection + pub fn new_optional() -> Option { + Docker::connect_with_local_defaults() + .map(|docker| Self { docker }) + .ok() } -} \ No newline at end of file + + /// Finds the Docker container running the agent by image name + pub async fn get_client_container( + &self, + ) -> Result, Box> { + let containers = container::get_available_containers(&self.docker).await; + let client_image = "watcher-agent"; + + Ok(containers + .into_iter() + .find(|c| c.image == client_image) + .map(|container| DockerContainerDto { + id: container.id, + image: container.image, + name: container.name, + })) + } + + /// Gets the current client version (image name) if running in Docker + pub async fn get_client_version(&self) -> String { + match self.get_client_container().await { + Ok(Some(container)) => container.image, + Ok(None) => { + eprintln!("Warning: No WatcherAgent container found"); + "unknown".to_string() + } + Err(e) => { + eprintln!("Warning: Could not get current image version: {}", e); + "unknown".to_string() + } + } + } + + /// Checks if Docker is available and the agent is running in a container + pub async fn is_dockerized(&self) -> bool { + self.get_client_container() + .await + .map(|c| c.is_some()) + .unwrap_or(false) + } + + /// Gets all available containers as DTOs for registration + pub async fn get_containers_for_registration( + &self, + ) -> Result, Box> { + let containers = container::get_available_containers(&self.docker).await; + + Ok(containers + .into_iter() + .map(|container| DockerContainerDto { + id: container.id, + image: container.image, + name: container.name, + }) + .collect()) + } + + /// Gets container metrics for all containers + pub async fn get_container_metrics( + &self, + ) -> Result, Box> { + let containers = container::get_available_containers(&self.docker).await; + let mut metrics = Vec::new(); + + for container in containers { + // Get network stats (you'll need to implement this in container.rs) + let network_stats = container::get_network_stats(&self.docker, &container.id).await?; + // Get CPU stats (you'll need to implement this in container.rs) + let cpu_stats = container::get_cpu_stats(&self.docker, &container.id).await?; + + // Get current status by inspecting the container + let status = match self + .docker + .inspect_container(&container.id, None::) + .await + { + Ok(container_info) => { + // Extract status from container state and convert to string + container_info + .state + .and_then(|state| state.status) + .map(|status_enum| { + match status_enum { + bollard::models::ContainerStateStatusEnum::CREATED => "created", + bollard::models::ContainerStateStatusEnum::RUNNING => "running", + bollard::models::ContainerStateStatusEnum::PAUSED => "paused", + bollard::models::ContainerStateStatusEnum::RESTARTING => { + "restarting" + } + bollard::models::ContainerStateStatusEnum::REMOVING => "removing", + bollard::models::ContainerStateStatusEnum::EXITED => "exited", + bollard::models::ContainerStateStatusEnum::DEAD => "dead", + bollard::secret::ContainerStateStatusEnum::EMPTY => todo!(), + } + .to_string() + }) + .unwrap_or_else(|| "unknown".to_string()) + } + Err(_) => "unknown".to_string(), + }; + + metrics.push(DockerContainerMetricDto { + id: container.id, + status: status, + network: network_stats, + cpu: cpu_stats, + }); + } + + Ok(metrics) + } + + /// Gets the number of running containers + pub async fn get_container_count(&self) -> Result> { + let containers = container::get_available_containers(&self.docker).await; + Ok(containers.len()) + } + + /// Restarts a specific container by ID + pub async fn restart_container( + &self, + container_id: &str, + ) -> Result<(), Box> { + container::restart_container(&self.docker, container_id).await + } + + /// Gets total network statistics across all containers + pub async fn get_total_network_stats( + &self, + ) -> Result<(u64, u64), Box> { + let metrics = self.get_container_metrics().await?; + + let net_in_total: u64 = metrics.iter().map(|m| m.network.rx_bytes).sum(); + let net_out_total: u64 = metrics.iter().map(|m| m.network.tx_bytes).sum(); + + Ok((net_in_total, net_out_total)) + } +} + +// Keep these as utility functions if needed, but they should use DockerManager internally +impl DockerContainerDto { + /// Returns the container ID + pub fn id(&self) -> &str { + &self.id + } + + /// Returns the image name + pub fn image(&self) -> &str { + &self.image + } + + /// Returns the container name + pub fn name(&self) -> &str { + &self.name + } +} diff --git a/WatcherAgent/src/docker/serverclientcomm.rs b/WatcherAgent/src/docker/serverclientcomm.rs index 284453c..0620e55 100644 --- a/WatcherAgent/src/docker/serverclientcomm.rs +++ b/WatcherAgent/src/docker/serverclientcomm.rs @@ -1,15 +1,13 @@ - //! Server-client communication utilities for WatcherAgent //! //! Handles server commands, Docker image updates, and container management using the Bollard library. //! -use crate::models::{DockerContainer, ServerMessage}; -use crate::docker::container::{get_available_container}; +use crate::models::ServerMessage; -use std::error::Error; -use bollard::Docker; +use super::container::{restart_container, update_docker_image}; use bollard::query_parameters::{CreateImageOptions, RestartContainerOptions}; -use futures_util::StreamExt; +use bollard::Docker; +use std::error::Error; /// Handles a message from the backend server and dispatches the appropriate action. /// @@ -19,7 +17,10 @@ use futures_util::StreamExt; /// /// # Returns /// * `Result<(), Box>` - Ok if handled successfully, error otherwise. -pub async fn handle_server_message(docker: &Docker, msg: ServerMessage) -> Result<(), Box> { +pub async fn handle_server_message( + docker: &Docker, + msg: ServerMessage, +) -> Result<(), Box> { let msg = msg.clone(); println!("Handling server message: {:?}", msg); @@ -36,10 +37,14 @@ pub async fn handle_server_message(docker: &Docker, msg: ServerMessage) -> Resul } } "restart_container" => { - println!("Received restart container command"); - // Call your restart_container function here - restart_container(docker).await?; - Ok(()) + if let Some(image_name) = msg.data.get("image").and_then(|v| v.as_str()) { + println!("Received restart command for image: {}", image_name); + // Call your update_docker_image function here + update_docker_image(docker, image_name).await?; + Ok(()) + } else { + Err("Missing image name in update message".into()) + } } "stop_agent" => { println!("Received stop agent command"); @@ -52,87 +57,3 @@ pub async fn handle_server_message(docker: &Docker, msg: ServerMessage) -> Resul } } } - -/// Pulls a new Docker image and restarts the current container. -/// -/// # Arguments -/// * `docker` - Reference to a Bollard Docker client. -/// * `image` - The name of the Docker image to pull. -/// -/// # Returns -/// * `Result<(), Box>` - Ok if updated successfully, error otherwise. -pub async fn update_docker_image(docker: &Docker, image: &str) -> Result<(), Box> { - println!("Updating to {}", image); - - // 1. Pull new image - let mut stream = docker.create_image( - Some(CreateImageOptions { - from_image: Some(image.to_string()), - ..Default::default() - }), - None, - None, - ); - - // Use the stream with proper trait bounds - while let Some(result) = StreamExt::next(&mut stream).await { - match result { - Ok(progress) => { - if let Some(status) = progress.status { - println!("Pull status: {}", status); - } - } - Err(e) => { - eprintln!("Error pulling image: {}", e); - break; - } - } - } - - // 2. Restart the current container - let _ = restart_container(docker).await; - - Ok(()) -} - -/// Finds the Docker container running the agent by image name. -/// -/// # Arguments -/// * `docker` - Reference to a Bollard Docker client. -/// -/// # Returns -/// * `Result, Box>` - The agent's container info if found. -pub async fn get_client_container(docker: &Docker) -> Result, Box> { - let containers = get_available_container(docker).await; - let client_image = "watcher-agent"; - - // Find container with the specific image - if let Some(container) = containers.iter().find(|c| c.image == client_image) { - Ok(Some(container.clone())) - } else { - Ok(None) - } -} - -/// Restarts the agent's own Docker container. -/// -/// # Arguments -/// * `docker` - Reference to a Bollard Docker client. -/// -/// # Returns -/// * `Result<(), Box>` - Ok if restarted successfully, error otherwise. -pub async fn restart_container(docker: &Docker) -> Result<(), Box> { - if let Ok(Some(container)) = get_client_container(docker).await { - let container_id = container.clone().ID; - println!("Restarting container {}", container_id); - if let Err(e) = docker.restart_container(&container_id.to_string(), Some(RestartContainerOptions { signal: None, t: Some(0) })) - .await - { - eprintln!("Failed to restart container: {}", e); - } - } else { - eprintln!("No container ID found (HOSTNAME not set?)"); - } - - Ok(()) -} \ No newline at end of file diff --git a/WatcherAgent/src/docker/stats.rs b/WatcherAgent/src/docker/stats.rs new file mode 100644 index 0000000..ed87640 --- /dev/null +++ b/WatcherAgent/src/docker/stats.rs @@ -0,0 +1,206 @@ +use bollard::query_parameters::{ListContainersOptions, StatsOptions}; +use bollard::Docker; +use futures_util::stream::TryStreamExt; +use serde::{Deserialize, Serialize}; +use std::error::Error; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct ContainerCpuInfo { + pub container_id: String, + pub cpu_usage_percent: f64, + pub system_cpu_usage: u64, + pub container_cpu_usage: u64, + pub online_cpus: u32, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct ContainerNetworkInfo { + pub container_id: String, + pub rx_bytes: u64, + pub tx_bytes: u64, + pub rx_packets: u64, + pub tx_packets: u64, + pub rx_errors: u64, + pub tx_errors: u64, +} + +/// Get container statistics for all containers using an existing Docker client +pub async fn get_container_stats( + docker: &Docker, +) -> Result<(Vec, Vec), Box> { + let containers = docker + .list_containers(Some(ListContainersOptions { + all: true, + ..Default::default() + })) + .await?; + + let mut cpu_infos = Vec::new(); + let mut net_infos = Vec::new(); + + for container in containers { + let id = container.id.unwrap_or_default(); + + // Skip if no ID + if id.is_empty() { + continue; + } + + let mut stats_stream = docker.stats( + &id, + Some(StatsOptions { + stream: false, + one_shot: true, + }), + ); + + if let Some(stats) = stats_stream.try_next().await? { + // CPU Info + if let (Some(cpu_stats), Some(precpu_stats)) = (&stats.cpu_stats, &stats.precpu_stats) { + if let (Some(cpu_usage), Some(pre_cpu_usage)) = + (&cpu_stats.cpu_usage, &precpu_stats.cpu_usage) + { + let cpu_delta = cpu_usage + .total_usage + .unwrap_or(0) + .saturating_sub(pre_cpu_usage.total_usage.unwrap_or(0)); + + let system_delta = cpu_stats + .system_cpu_usage + .unwrap_or(0) + .saturating_sub(precpu_stats.system_cpu_usage.unwrap_or(0)); + + let online_cpus = cpu_stats.online_cpus.unwrap_or(1); + + let cpu_percent = if system_delta > 0 && online_cpus > 0 { + (cpu_delta as f64 / system_delta as f64) * online_cpus as f64 * 100.0 + } else { + 0.0 + }; + + cpu_infos.push(ContainerCpuInfo { + container_id: id.clone(), + cpu_usage_percent: cpu_percent, + system_cpu_usage: cpu_stats.system_cpu_usage.unwrap_or(0), + container_cpu_usage: cpu_usage.total_usage.unwrap_or(0), + online_cpus, + }); + } + } + + // Network Info + if let Some(networks) = stats.networks { + for (_name, net) in networks { + net_infos.push(ContainerNetworkInfo { + container_id: id.clone(), + rx_bytes: net.rx_bytes.unwrap(), + tx_bytes: net.tx_bytes.unwrap(), + rx_packets: net.rx_packets.unwrap(), + tx_packets: net.tx_packets.unwrap(), + rx_errors: net.rx_errors.unwrap(), + tx_errors: net.tx_errors.unwrap(), + }); + } + } + } + } + + Ok((cpu_infos, net_infos)) +} + +/// Get container statistics for a specific container +pub async fn get_single_container_stats( + docker: &Docker, + container_id: &str, +) -> Result<(Option, Option), Box> +{ + let mut stats_stream = docker.stats( + container_id, + Some(StatsOptions { + stream: false, + one_shot: true, + }), + ); + + if let Some(stats) = stats_stream.try_next().await? { + let mut cpu_info = None; + let mut net_info = None; + + // CPU Info + if let (Some(cpu_stats), Some(precpu_stats)) = (&stats.cpu_stats, &stats.precpu_stats) { + if let (Some(cpu_usage), Some(pre_cpu_usage)) = + (&cpu_stats.cpu_usage, &precpu_stats.cpu_usage) + { + let cpu_delta = cpu_usage + .total_usage + .unwrap_or(0) + .saturating_sub(pre_cpu_usage.total_usage.unwrap_or(0)); + + let system_delta = cpu_stats + .system_cpu_usage + .unwrap_or(0) + .saturating_sub(precpu_stats.system_cpu_usage.unwrap_or(0)); + + let online_cpus = cpu_stats.online_cpus.unwrap_or(1); + + let cpu_percent = if system_delta > 0 && online_cpus > 0 { + (cpu_delta as f64 / system_delta as f64) * online_cpus as f64 * 100.0 + } else { + 0.0 + }; + + cpu_info = Some(ContainerCpuInfo { + container_id: container_id.to_string(), + cpu_usage_percent: cpu_percent, + system_cpu_usage: cpu_stats.system_cpu_usage.unwrap_or(0), + container_cpu_usage: cpu_usage.total_usage.unwrap_or(0), + online_cpus, + }); + } + } + + // Network Info + if let Some(networks) = stats.networks { + // Take the first network interface (usually eth0) + if let Some((_name, net)) = networks.into_iter().next() { + net_info = Some(ContainerNetworkInfo { + container_id: container_id.to_string(), + rx_bytes: net.rx_bytes.unwrap(), + tx_bytes: net.tx_bytes.unwrap(), + rx_packets: net.rx_packets.unwrap(), + tx_packets: net.tx_packets.unwrap(), + rx_errors: net.rx_errors.unwrap(), + tx_errors: net.tx_errors.unwrap(), + }); + } + } + + Ok((cpu_info, net_info)) + } else { + Ok((None, None)) + } +} + +/// Get total network statistics across all containers +pub async fn get_total_network_stats( + docker: &Docker, +) -> Result<(u64, u64), Box> { + let (_, net_infos) = get_container_stats(docker).await?; + + let total_rx: u64 = net_infos.iter().map(|net| net.rx_bytes).sum(); + let total_tx: u64 = net_infos.iter().map(|net| net.tx_bytes).sum(); + + Ok((total_rx, total_tx)) +} + +/// Get average CPU usage across all containers +pub async fn get_average_cpu_usage(docker: &Docker) -> Result> { + let (cpu_infos, _) = get_container_stats(docker).await?; + + if cpu_infos.is_empty() { + return Ok(0.0); + } + + let total_cpu: f64 = cpu_infos.iter().map(|cpu| cpu.cpu_usage_percent).sum(); + Ok(total_cpu / cpu_infos.len() as f64) +} diff --git a/WatcherAgent/src/main.rs b/WatcherAgent/src/main.rs index db508a9..1550029 100644 --- a/WatcherAgent/src/main.rs +++ b/WatcherAgent/src/main.rs @@ -1,4 +1,3 @@ - /// # WatcherAgent /// /// **WatcherAgent** is a cross-platform system monitoring agent written in Rust. @@ -26,18 +25,16 @@ /// ``` /// /// The agent will register itself, start collecting metrics, and listen for remote commands. - pub mod api; +pub mod docker; pub mod hardware; pub mod metrics; pub mod models; -pub mod docker; -use tokio::task::JoinHandle; + use bollard::Docker; use std::env; use std::error::Error; - - +use tokio::task::JoinHandle; /// Awaits a spawned asynchronous task and flattens its nested `Result` type. /// @@ -82,26 +79,8 @@ async fn flatten( /// Returns an error if registration or any background task fails, or if required arguments are missing. #[tokio::main] async fn main() -> Result<(), Box> { - // Initialize Docker client - let docker = Docker::connect_with_local_defaults() - .map_err(|e| format!("Failed to connect to Docker: {}", e))?; - - // Get current image version - let client_version = match docker::serverclientcomm::get_client_container(&docker).await { - Ok(Some(version)) => version.image, - Ok(None) => { - eprintln!("Warning: No image version found"); - "unknown".to_string() - } - Err(e) => { - eprintln!("Warning: Could not get current image version: {}", e); - "unknown".to_string() - } - }; - println!("Client Version: {}", client_version); - + // Parse command-line arguments let args: Vec = env::args().collect(); - // args[0] is the binary name, args[1] is the first actual argument if args.len() < 2 { eprintln!("Usage: {} ", args[0]); return Err("Missing server URL argument".into()); @@ -118,13 +97,29 @@ async fn main() -> Result<(), Box> { } }; + // Initialize Docker (optional - agent can run without Docker) + let docker_manager = docker::DockerManager::new_optional(); + + // Get current image version + let client_version = if let Some(ref docker_manager) = docker_manager { + docker_manager.get_client_version().await + } else { + "unknown".to_string() + }; + println!("Client Version: {}", client_version); + // Start background tasks - // Start server listening for commands - let listening_handle = tokio::spawn({ - let docker = docker.clone(); - let server_url = server_url.to_string(); - async move { api::listening_to_server(&docker, &server_url).await } - }); + // Start server listening for commands (only if Docker is available) + let listening_handle = if let Some(docker_manager) = docker_manager { + tokio::spawn({ + let docker = docker_manager.docker.clone(); + let server_url = server_url.to_string(); + async move { api::listening_to_server(&docker, &server_url).await } + }) + } else { + println!("Docker not available, skipping server command listener."); + tokio::spawn(async { Ok(()) }) // Dummy task + }; // Start heartbeat in background let heartbeat_handle = tokio::spawn({ diff --git a/WatcherAgent/src/metrics.rs b/WatcherAgent/src/metrics.rs index 60d6b6c..22d8beb 100644 --- a/WatcherAgent/src/metrics.rs +++ b/WatcherAgent/src/metrics.rs @@ -1,5 +1,3 @@ - - /// # Metrics Module /// /// This module orchestrates the collection and reporting of hardware and network metrics for WatcherAgent. @@ -15,11 +13,11 @@ use std::error::Error; use std::time::Duration; use crate::api; +//use crate::docker::DockerInfo; use crate::hardware::network::NetworkMonitor; use crate::hardware::HardwareInfo; use crate::models::MetricDto; - /// Main orchestrator for hardware and network metric collection and reporting. /// /// The `Collector` struct manages the state required to collect metrics and send them to the backend server. It maintains a network monitor for bandwidth tracking, the agent's server ID, and its IP address. @@ -34,7 +32,6 @@ pub struct Collector { ip_address: String, } - impl Collector { /// Creates a new `Collector` instance for metric collection and reporting. /// diff --git a/WatcherAgent/src/models.rs b/WatcherAgent/src/models.rs index 1fb398d..34e92f5 100644 --- a/WatcherAgent/src/models.rs +++ b/WatcherAgent/src/models.rs @@ -9,6 +9,8 @@ /// /// ## Usage /// These types are serialized/deserialized for HTTP communication and used throughout the agent for data exchange. +use crate::docker::stats; + use serde::{Deserialize, Serialize}; /// Registration data sent to the backend server. @@ -23,7 +25,7 @@ use serde::{Deserialize, Serialize}; #[derive(Serialize, Debug)] pub struct RegistrationDto { #[serde(rename = "id")] - pub id: i32, + pub server_id: i32, #[serde(rename = "ipAddress")] pub ip_address: String, #[serde(rename = "cpuType")] @@ -184,12 +186,22 @@ pub struct Acknowledgment { /// - `_net_out`: Network transmit rate in **bytes per second (B/s)** /// - `_cpu_load`: CPU usage as a percentage (**0.0–100.0**) #[derive(Debug, Serialize, Clone)] -pub struct DockerContainer { - pub ID: String, - pub image: String, - pub Name: String, - pub Status: String, // "running";"stopped";others - pub _net_in: f64, - pub _net_out: f64, - pub _cpu_load: f64, +pub struct DockerContainerRegistrationDto { + pub server_id: u32, + pub containers: Vec, +} + +#[derive(Debug, Serialize, Clone)] +pub struct DockerContainerDto { + pub id: String, + pub image: String, + pub name: String, +} + +#[derive(Debug, Serialize, Clone)] +pub struct DockerContainerMetricDto { + pub id: String, + pub status: String, // "running";"stopped";others + pub network: stats::ContainerNetworkInfo, + pub cpu: stats::ContainerCpuInfo, }