Compare commits
13 Commits
a7cae5e93f
...
v0.1.29
| Author | SHA1 | Date | |
|---|---|---|---|
| a8ccb0521a | |||
| c90a276dca | |||
| dc4c23f9d9 | |||
| 3182d57539 | |||
| 8c1ef7f9f6 | |||
| 16020eea50 | |||
| 432a798210 | |||
| a095444222 | |||
| 5e7bc3df54 | |||
| 1c7a169956 | |||
| c7bce926e9 | |||
| 711083daa0 | |||
| 06cec6ff9f |
@@ -15,11 +15,13 @@ use std::time::Duration;
|
||||
use crate::docker::serverclientcomm::handle_server_message;
|
||||
use crate::hardware::HardwareInfo;
|
||||
use crate::models::{
|
||||
Acknowledgment, HeartbeatDto, IdResponse, MetricDto, RegistrationDto, ServerMessage, DockerMetricDto
|
||||
Acknowledgment, DockerContainer, DockerMetricDto, DockerRegistrationDto, HeartbeatDto,
|
||||
IdResponse, MetricDto, RegistrationDto, ServerMessage,
|
||||
};
|
||||
|
||||
use anyhow::Result;
|
||||
use reqwest::{Client, StatusCode};
|
||||
use serde::Serialize;
|
||||
use std::error::Error;
|
||||
use tokio::time::sleep;
|
||||
|
||||
@@ -151,6 +153,85 @@ async fn get_server_id_by_ip(
|
||||
}
|
||||
}
|
||||
|
||||
/// Broadcasts Docker container information to the monitoring server for service discovery.
|
||||
///
|
||||
/// This function sends the current Docker container configuration to the server
|
||||
/// to register available containers and enable service monitoring. It will
|
||||
/// continuously retry until successful, making it suitable for initial
|
||||
/// registration scenarios.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `base_url` - The base URL of the monitoring server API (e.g., "https://monitoring.example.com")
|
||||
/// * `server_id` - The ID of the server to associate the containers with
|
||||
/// * `container_dto` - Mutable reference to Docker container information for broadcast
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// * `Ok(())` - When container information is successfully broadcasted to the server
|
||||
/// * `Err(Box<dyn Error + Send + Sync>)` - If an unrecoverable error occurs (though the function typically retries on transient failures)
|
||||
///
|
||||
/// # Behavior
|
||||
///
|
||||
/// This function operates in a retry loop with the following characteristics:
|
||||
///
|
||||
/// - **Retry Logic**: Attempts broadcast every 10 seconds until successful
|
||||
/// - **Mutation**: Modifies the `container_dto` to set the `server_id` before sending
|
||||
/// - **TLS**: Accepts invalid TLS certificates for development environments
|
||||
/// - **Logging**: Provides detailed console output about broadcast attempts and results
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// This function may return an error in the following cases:
|
||||
///
|
||||
/// * **HTTP Client Creation**: Failed to create HTTP client with TLS configuration
|
||||
/// * **Network Issues**: Persistent connection failures to the backend server
|
||||
/// * **Server Errors**: Backend returns non-success HTTP status codes repeatedly
|
||||
/// * **JSON Serialization**: Cannot serialize container data (should be rare with proper DTOs)
|
||||
pub async fn broadcast_docker_containers(
|
||||
base_url: &str,
|
||||
server_id: u16,
|
||||
container_dto: &DockerRegistrationDto,
|
||||
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
// First get local IP
|
||||
println!("Preparing to broadcast docker containers...");
|
||||
// Create HTTP client for registration
|
||||
let client = Client::builder()
|
||||
.danger_accept_invalid_certs(true)
|
||||
.build()?;
|
||||
|
||||
// Prepare registration data
|
||||
let mut broadcast_data = container_dto.clone();
|
||||
broadcast_data.server_id = server_id;
|
||||
|
||||
// Try to register (will retry on failure)
|
||||
loop {
|
||||
println!("Attempting to broadcast containers...");
|
||||
let url = format!("{}/monitoring/service-discovery", base_url);
|
||||
match client.post(&url).json(&container_dto).send().await {
|
||||
Ok(resp) if resp.status().is_success() => {
|
||||
println!(
|
||||
"✅ Successfully broadcasted following docker container: {:?}",
|
||||
container_dto
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
Ok(resp) => {
|
||||
let status = resp.status();
|
||||
let text = resp.text().await.unwrap_or_default();
|
||||
println!(
|
||||
"⚠️ Broadcasting failed ({}): {} (will retry in 10 seconds)",
|
||||
status, text
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
println!("⚠️ Broadcasting failed: {} (will retry in 10 seconds)", err);
|
||||
}
|
||||
}
|
||||
sleep(Duration::from_secs(10)).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Periodically sends heartbeat signals to the backend server to indicate agent liveness.
|
||||
///
|
||||
/// This function runs in a background task and will retry on network errors.
|
||||
@@ -359,7 +440,3 @@ pub async fn send_docker_metrics(
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn broadcast_docker_containers() {
|
||||
// Placeholder for future implementation
|
||||
}
|
||||
@@ -4,7 +4,7 @@
|
||||
//!
|
||||
use crate::docker::stats;
|
||||
use crate::docker::stats::{ContainerCpuInfo, ContainerNetworkInfo};
|
||||
use crate::models::{DockerRegistrationDto, DockerMetricDto, DockerContainer};
|
||||
use crate::models::DockerContainer;
|
||||
|
||||
use bollard::query_parameters::{
|
||||
CreateImageOptions, ListContainersOptions, RestartContainerOptions,
|
||||
@@ -178,14 +178,15 @@ pub async fn get_network_stats(
|
||||
Ok(net_info)
|
||||
} else {
|
||||
// Return default network info if not found
|
||||
println!("No network info found for container {}", container_id);
|
||||
Ok(ContainerNetworkInfo {
|
||||
container_id: container_id.to_string(),
|
||||
rx_bytes: 0,
|
||||
tx_bytes: 0,
|
||||
rx_packets: 0,
|
||||
tx_packets: 0,
|
||||
rx_errors: 0,
|
||||
tx_errors: 0,
|
||||
container_id: Some(container_id.to_string()),
|
||||
rx_bytes: None,
|
||||
tx_bytes: None,
|
||||
rx_packets: None,
|
||||
tx_packets: None,
|
||||
rx_errors: None,
|
||||
tx_errors: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -201,12 +202,13 @@ pub async fn get_cpu_stats(
|
||||
Ok(cpu_info)
|
||||
} else {
|
||||
// Return default CPU info if not found
|
||||
println!("No CPU info found for container {}", container_id);
|
||||
Ok(ContainerCpuInfo {
|
||||
container_id: container_id.to_string(),
|
||||
cpu_usage_percent: 0.0,
|
||||
system_cpu_usage: 0,
|
||||
container_cpu_usage: 0,
|
||||
online_cpus: 1,
|
||||
container_id: Some(container_id.to_string()),
|
||||
cpu_usage_percent: None,
|
||||
system_cpu_usage: None,
|
||||
container_cpu_usage: None,
|
||||
online_cpus: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,8 +11,11 @@ pub mod container;
|
||||
pub mod serverclientcomm;
|
||||
pub mod stats;
|
||||
|
||||
use crate::models::{DockerRegistrationDto, DockerMetricDto, DockerContainer, DockerContainerInfo};
|
||||
use bollard::{query_parameters::InspectContainerOptions, Docker};
|
||||
use crate::models::{
|
||||
DockerCollectMetricDto, DockerContainer, DockerContainerCpuDto, DockerContainerInfo,
|
||||
DockerContainerNetworkDto, DockerContainerRamDto, DockerMetricDto, DockerRegistrationDto,
|
||||
};
|
||||
use bollard::Docker;
|
||||
use std::error::Error;
|
||||
|
||||
/// Main Docker manager that holds the Docker client and provides all operations
|
||||
@@ -60,14 +63,20 @@ impl DockerManager {
|
||||
id: container.id,
|
||||
image: container.image,
|
||||
name: container.name,
|
||||
|
||||
}))
|
||||
}
|
||||
|
||||
/// Gets the current client version (image name) if running in Docker
|
||||
pub async fn get_client_version(&self) -> String {
|
||||
match self.get_client_container().await {
|
||||
Ok(Some(container)) => container.image.clone().unwrap().split(':').next().unwrap_or("unknown").to_string(),
|
||||
Ok(Some(container)) => container
|
||||
.image
|
||||
.clone()
|
||||
.unwrap()
|
||||
.split(':')
|
||||
.next()
|
||||
.unwrap_or("unknown")
|
||||
.to_string(),
|
||||
Ok(None) => {
|
||||
println!("Warning: No WatcherAgent container found");
|
||||
"unknown".to_string()
|
||||
@@ -118,31 +127,86 @@ impl DockerManager {
|
||||
}
|
||||
|
||||
/// Collects Docker metrics for all containers
|
||||
pub async fn collect_metrics(
|
||||
&self,
|
||||
) -> Result<DockerMetricDto, Box<dyn Error + Send + Sync>> {
|
||||
pub async fn collect_metrics(&self) -> Result<DockerMetricDto, Box<dyn Error + Send + Sync>> {
|
||||
let containers = self.get_containers().await?;
|
||||
let (cpu_stats, net_stats, mem_stats) = stats::get_container_stats(&self.docker).await?;
|
||||
if let Some(first_container) = containers.first() {
|
||||
println!("Debug: Testing stats for container {}", first_container.id);
|
||||
let _ = self.debug_container_stats(&first_container.id).await;
|
||||
}
|
||||
|
||||
let container_infos: Vec<_> = containers
|
||||
// Get stats with proper error handling
|
||||
let stats_result = stats::get_container_stats(&self.docker).await;
|
||||
let (cpu_stats, net_stats, mem_stats) = match stats_result {
|
||||
Ok(stats) => stats,
|
||||
Err(e) => {
|
||||
eprintln!("Warning: Failed to get container stats: {}", e);
|
||||
// Return empty stats instead of failing completely
|
||||
(Vec::new(), Vec::new(), Vec::new())
|
||||
}
|
||||
};
|
||||
|
||||
println!(
|
||||
"Debug: Found {} containers, {} CPU stats, {} network stats, {} memory stats",
|
||||
containers.len(),
|
||||
cpu_stats.len(),
|
||||
net_stats.len(),
|
||||
mem_stats.len()
|
||||
);
|
||||
|
||||
let container_infos_total: Vec<_> = containers
|
||||
.into_iter()
|
||||
.map(|container| {
|
||||
// Use short ID for matching (first 12 chars)
|
||||
let container_short_id = if container.id.len() > 12 {
|
||||
&container.id[..12]
|
||||
} else {
|
||||
&container.id
|
||||
};
|
||||
|
||||
let cpu = cpu_stats
|
||||
.iter()
|
||||
.find(|c| c.container_id == container.id)
|
||||
.find(|c| {
|
||||
c.container_id
|
||||
.as_ref()
|
||||
.map(|id| id.starts_with(container_short_id))
|
||||
.unwrap_or(false)
|
||||
})
|
||||
.cloned();
|
||||
|
||||
let network = net_stats
|
||||
.iter()
|
||||
.find(|n| n.container_id == container.id)
|
||||
.find(|n| {
|
||||
n.container_id
|
||||
.as_ref()
|
||||
.map(|id| id.starts_with(container_short_id))
|
||||
.unwrap_or(false)
|
||||
})
|
||||
.cloned();
|
||||
|
||||
let ram = mem_stats
|
||||
.iter()
|
||||
.find(|m| m.container_id == container.id)
|
||||
.find(|m| {
|
||||
m.container_id
|
||||
.as_ref()
|
||||
.map(|id| id.starts_with(container_short_id))
|
||||
.unwrap_or(false)
|
||||
})
|
||||
.cloned();
|
||||
|
||||
// Debug output for this container
|
||||
if cpu.is_none() || network.is_none() || ram.is_none() {
|
||||
println!(
|
||||
"Debug: Container {} - CPU: {:?}, Network: {:?}, RAM: {:?}",
|
||||
container_short_id,
|
||||
cpu.is_some(),
|
||||
network.is_some(),
|
||||
ram.is_some()
|
||||
);
|
||||
}
|
||||
|
||||
DockerContainerInfo {
|
||||
container: Some(container),
|
||||
status: None, // Status can be fetched if needed
|
||||
status: None,
|
||||
cpu,
|
||||
network,
|
||||
ram,
|
||||
@@ -150,13 +214,112 @@ impl DockerManager {
|
||||
})
|
||||
.collect();
|
||||
|
||||
let container_infos: Vec<DockerCollectMetricDto> = container_infos_total
|
||||
.into_iter()
|
||||
.filter_map(|info| {
|
||||
let container = match info.container {
|
||||
Some(c) => c,
|
||||
None => {
|
||||
eprintln!("Warning: Container info missing container data, skipping");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
// Safely handle CPU data with defaults
|
||||
let cpu_dto = if let Some(cpu) = info.cpu {
|
||||
DockerContainerCpuDto {
|
||||
cpu_load: cpu.cpu_usage_percent,
|
||||
}
|
||||
} else {
|
||||
DockerContainerCpuDto { cpu_load: None }
|
||||
};
|
||||
|
||||
// Safely handle RAM data with defaults
|
||||
let ram_dto = if let Some(ram) = info.ram {
|
||||
DockerContainerRamDto {
|
||||
ram_load: ram.memory_usage_percent,
|
||||
}
|
||||
} else {
|
||||
DockerContainerRamDto { ram_load: None }
|
||||
};
|
||||
|
||||
// Safely handle network data with defaults
|
||||
let network_dto = if let Some(net) = info.network {
|
||||
DockerContainerNetworkDto {
|
||||
net_in: net.rx_bytes.map(|bytes| bytes as f64),
|
||||
net_out: net.tx_bytes.map(|bytes| bytes as f64),
|
||||
}
|
||||
} else {
|
||||
DockerContainerNetworkDto {
|
||||
net_in: None,
|
||||
net_out: None,
|
||||
}
|
||||
};
|
||||
|
||||
Some(DockerCollectMetricDto {
|
||||
id: container.id,
|
||||
cpu: cpu_dto,
|
||||
ram: ram_dto,
|
||||
network: network_dto,
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
let dto = DockerMetricDto {
|
||||
server_id: 0,
|
||||
server_id: 0, // This should be set by the caller
|
||||
containers: serde_json::to_string(&container_infos)?,
|
||||
};
|
||||
|
||||
Ok(dto)
|
||||
}
|
||||
|
||||
pub async fn create_registration_dto(
|
||||
&self,
|
||||
) -> Result<DockerRegistrationDto, Box<dyn Error + Send + Sync>> {
|
||||
let containers = self.get_containers().await?;
|
||||
let dto = DockerRegistrationDto {
|
||||
server_id: 0, // This will be set by the caller
|
||||
containers, // Fallback to empty array
|
||||
};
|
||||
|
||||
Ok(dto)
|
||||
}
|
||||
|
||||
/// Debug function to check stats collection for a specific container
|
||||
pub async fn debug_container_stats(
|
||||
&self,
|
||||
container_id: &str,
|
||||
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
println!("=== DEBUG STATS FOR CONTAINER {} ===", container_id);
|
||||
|
||||
let (cpu_info, net_info, mem_info) =
|
||||
stats::get_single_container_stats(&self.docker, container_id).await?;
|
||||
|
||||
println!("CPU Info: {:?}", cpu_info);
|
||||
println!("Network Info: {:?}", net_info);
|
||||
println!("Memory Info: {:?}", mem_info);
|
||||
|
||||
// Also try the individual stats functions
|
||||
println!("--- Individual CPU Stats ---");
|
||||
match stats::cpu::get_single_container_cpu_stats(&self.docker, container_id).await {
|
||||
Ok(cpu) => println!("CPU: {:?}", cpu),
|
||||
Err(e) => println!("CPU Error: {}", e),
|
||||
}
|
||||
|
||||
println!("--- Individual Network Stats ---");
|
||||
match stats::network::get_single_container_network_stats(&self.docker, container_id).await {
|
||||
Ok(net) => println!("Network: {:?}", net),
|
||||
Err(e) => println!("Network Error: {}", e),
|
||||
}
|
||||
|
||||
println!("--- Individual Memory Stats ---");
|
||||
match stats::ram::get_single_container_memory_stats(&self.docker, container_id).await {
|
||||
Ok(mem) => println!("Memory: {:?}", mem),
|
||||
Err(e) => println!("Memory Error: {}", e),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// Keep these as utility functions if needed, but they should use DockerManager internally
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
use crate::models::ServerMessage;
|
||||
|
||||
use super::container::{restart_container, update_docker_image};
|
||||
use bollard::query_parameters::{CreateImageOptions, RestartContainerOptions};
|
||||
//use bollard::query_parameters::{CreateImageOptions, RestartContainerOptions};
|
||||
use bollard::Docker;
|
||||
use std::error::Error;
|
||||
|
||||
@@ -40,7 +40,7 @@ pub async fn handle_server_message(
|
||||
if let Some(image_name) = msg.data.get("image").and_then(|v| v.as_str()) {
|
||||
println!("Received restart command for image: {}", image_name);
|
||||
// Call your update_docker_image function here
|
||||
update_docker_image(docker, image_name).await?;
|
||||
restart_container(docker, image_name).await?;
|
||||
Ok(())
|
||||
} else {
|
||||
Err("Missing image name in update message".into())
|
||||
|
||||
@@ -70,11 +70,11 @@ pub async fn get_single_container_cpu_stats(
|
||||
};
|
||||
|
||||
return Ok(Some(ContainerCpuInfo {
|
||||
container_id: container_id.to_string(),
|
||||
cpu_usage_percent: cpu_percent,
|
||||
system_cpu_usage: cpu_stats.system_cpu_usage.unwrap_or(0),
|
||||
container_cpu_usage: cpu_usage.total_usage.unwrap_or(0),
|
||||
online_cpus,
|
||||
container_id: Some(container_id.to_string()),
|
||||
cpu_usage_percent: Some(cpu_percent),
|
||||
system_cpu_usage: Some(cpu_stats.system_cpu_usage.unwrap_or(0)),
|
||||
container_cpu_usage: Some(cpu_usage.total_usage.unwrap_or(0)),
|
||||
online_cpus: Some(online_cpus),
|
||||
}));
|
||||
}
|
||||
}
|
||||
@@ -91,6 +91,9 @@ pub async fn get_average_cpu_usage(docker: &Docker) -> Result<f64, Box<dyn Error
|
||||
return Ok(0.0);
|
||||
}
|
||||
|
||||
let total_cpu: f64 = cpu_infos.iter().map(|cpu| cpu.cpu_usage_percent).sum();
|
||||
let total_cpu: f64 = cpu_infos
|
||||
.iter()
|
||||
.map(|cpu| cpu.cpu_usage_percent.unwrap())
|
||||
.sum();
|
||||
Ok(total_cpu / cpu_infos.len() as f64)
|
||||
}
|
||||
@@ -6,30 +6,30 @@ use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct ContainerCpuInfo {
|
||||
pub container_id: String,
|
||||
pub cpu_usage_percent: f64,
|
||||
pub system_cpu_usage: u64,
|
||||
pub container_cpu_usage: u64,
|
||||
pub online_cpus: u32,
|
||||
pub container_id: Option<String>,
|
||||
pub cpu_usage_percent: Option<f64>,
|
||||
pub system_cpu_usage: Option<u64>,
|
||||
pub container_cpu_usage: Option<u64>,
|
||||
pub online_cpus: Option<u32>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct ContainerNetworkInfo {
|
||||
pub container_id: String,
|
||||
pub rx_bytes: u64,
|
||||
pub tx_bytes: u64,
|
||||
pub rx_packets: u64,
|
||||
pub tx_packets: u64,
|
||||
pub rx_errors: u64,
|
||||
pub tx_errors: u64,
|
||||
pub container_id: Option<String>,
|
||||
pub rx_bytes: Option<u64>,
|
||||
pub tx_bytes: Option<u64>,
|
||||
pub rx_packets: Option<u64>,
|
||||
pub tx_packets: Option<u64>,
|
||||
pub rx_errors: Option<u64>,
|
||||
pub tx_errors: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct ContainerMemoryInfo {
|
||||
pub container_id: String,
|
||||
pub memory_usage: u64,
|
||||
pub memory_limit: u64,
|
||||
pub memory_usage_percent: f64,
|
||||
pub container_id: Option<String>,
|
||||
pub memory_usage: Option<u64>,
|
||||
pub memory_limit: Option<u64>,
|
||||
pub memory_usage_percent: Option<f64>,
|
||||
}
|
||||
|
||||
use bollard::Docker;
|
||||
@@ -38,7 +38,14 @@ use std::error::Error;
|
||||
/// Get container statistics for all containers using an existing Docker client
|
||||
pub async fn get_container_stats(
|
||||
docker: &Docker,
|
||||
) -> Result<(Vec<ContainerCpuInfo>, Vec<ContainerNetworkInfo>, Vec<ContainerMemoryInfo>), Box<dyn Error + Send + Sync>> {
|
||||
) -> Result<
|
||||
(
|
||||
Vec<ContainerCpuInfo>,
|
||||
Vec<ContainerNetworkInfo>,
|
||||
Vec<ContainerMemoryInfo>,
|
||||
),
|
||||
Box<dyn Error + Send + Sync>,
|
||||
> {
|
||||
let cpu_infos = cpu::get_all_containers_cpu_stats(docker).await?;
|
||||
let net_infos = network::get_all_containers_network_stats(docker).await?;
|
||||
let mem_infos = ram::get_all_containers_memory_stats(docker).await?;
|
||||
@@ -50,8 +57,14 @@ pub async fn get_container_stats(
|
||||
pub async fn get_single_container_stats(
|
||||
docker: &Docker,
|
||||
container_id: &str,
|
||||
) -> Result<(Option<ContainerCpuInfo>, Option<ContainerNetworkInfo>, Option<ContainerMemoryInfo>), Box<dyn Error + Send + Sync>>
|
||||
{
|
||||
) -> Result<
|
||||
(
|
||||
Option<ContainerCpuInfo>,
|
||||
Option<ContainerNetworkInfo>,
|
||||
Option<ContainerMemoryInfo>,
|
||||
),
|
||||
Box<dyn Error + Send + Sync>,
|
||||
> {
|
||||
let cpu_info = cpu::get_single_container_cpu_stats(docker, container_id).await?;
|
||||
let net_info = network::get_single_container_network_stats(docker, container_id).await?;
|
||||
let mem_info = ram::get_single_container_memory_stats(docker, container_id).await?;
|
||||
|
||||
@@ -51,13 +51,13 @@ pub async fn get_single_container_network_stats(
|
||||
// Take the first network interface (usually eth0)
|
||||
if let Some((_name, net)) = networks.into_iter().next() {
|
||||
return Ok(Some(ContainerNetworkInfo {
|
||||
container_id: container_id.to_string(),
|
||||
rx_bytes: net.rx_bytes.unwrap(),
|
||||
tx_bytes: net.tx_bytes.unwrap(),
|
||||
rx_packets: net.rx_packets.unwrap(),
|
||||
tx_packets: net.tx_packets.unwrap(),
|
||||
rx_errors: net.rx_errors.unwrap(),
|
||||
tx_errors: net.tx_errors.unwrap(),
|
||||
container_id: Some(container_id.to_string()),
|
||||
rx_bytes: net.rx_bytes,
|
||||
tx_bytes: net.tx_bytes,
|
||||
rx_packets: net.rx_packets,
|
||||
tx_packets: net.tx_packets,
|
||||
rx_errors: net.rx_errors,
|
||||
tx_errors: net.tx_errors,
|
||||
}));
|
||||
}
|
||||
}
|
||||
@@ -72,8 +72,8 @@ pub async fn get_total_network_stats(
|
||||
) -> Result<(u64, u64), Box<dyn Error + Send + Sync>> {
|
||||
let net_infos = get_all_containers_network_stats(docker).await?;
|
||||
|
||||
let total_rx: u64 = net_infos.iter().map(|net| net.rx_bytes).sum();
|
||||
let total_tx: u64 = net_infos.iter().map(|net| net.tx_bytes).sum();
|
||||
let total_rx: u64 = net_infos.iter().map(|net| net.rx_bytes.unwrap()).sum();
|
||||
let total_tx: u64 = net_infos.iter().map(|net| net.tx_bytes.unwrap()).sum();
|
||||
|
||||
Ok((total_rx, total_tx))
|
||||
}
|
||||
@@ -58,10 +58,10 @@ pub async fn get_single_container_memory_stats(
|
||||
};
|
||||
|
||||
return Ok(Some(ContainerMemoryInfo {
|
||||
container_id: container_id.to_string(),
|
||||
memory_usage,
|
||||
memory_limit,
|
||||
memory_usage_percent,
|
||||
container_id: Some(container_id.to_string()),
|
||||
memory_usage: Some(memory_usage),
|
||||
memory_limit: Some(memory_limit),
|
||||
memory_usage_percent: Some(memory_usage_percent),
|
||||
}));
|
||||
}
|
||||
}
|
||||
@@ -72,6 +72,6 @@ pub async fn get_single_container_memory_stats(
|
||||
/// Get total memory usage across all containers
|
||||
pub async fn get_total_memory_usage(docker: &Docker) -> Result<u64, Box<dyn Error + Send + Sync>> {
|
||||
let mem_infos = get_all_containers_memory_stats(docker).await?;
|
||||
let total_memory: u64 = mem_infos.iter().map(|mem| mem.memory_usage).sum();
|
||||
let total_memory: u64 = mem_infos.iter().map(|mem| mem.memory_usage.unwrap()).sum();
|
||||
Ok(total_memory)
|
||||
}
|
||||
@@ -31,10 +31,8 @@ pub mod hardware;
|
||||
pub mod metrics;
|
||||
pub mod models;
|
||||
|
||||
use bollard::Docker;
|
||||
use std::env;
|
||||
use std::error::Error;
|
||||
use std::sync::Arc;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
/// Awaits a spawned asynchronous task and flattens its nested `Result` type.
|
||||
@@ -94,7 +92,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
Ok((id, ip)) => {
|
||||
println!("Registered with server. ID: {}, IP: {}", id, ip);
|
||||
(id, ip)
|
||||
},
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("Fehler bei der Registrierung am Server: {e}");
|
||||
return Err(e);
|
||||
@@ -112,6 +110,19 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
};
|
||||
println!("Client Version: {}", client_version);
|
||||
|
||||
// Prepare Docker registration DTO
|
||||
let container_dto = if let Some(ref docker_manager) = docker_manager {
|
||||
docker_manager.create_registration_dto().await?
|
||||
} else {
|
||||
models::DockerRegistrationDto {
|
||||
server_id: 0,
|
||||
//container_count: 0, --- IGNORE ---
|
||||
containers: Vec::new(),
|
||||
}
|
||||
};
|
||||
let _ =
|
||||
api::broadcast_docker_containers(server_url, server_id, &mut container_dto.clone()).await?;
|
||||
|
||||
// Start background tasks
|
||||
// Start server listening for commands (only if Docker is available)
|
||||
let listening_handle = if let Some(ref docker_manager) = docker_manager {
|
||||
@@ -140,7 +151,13 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
let docker_manager = docker_manager.as_ref().cloned().unwrap();
|
||||
async move {
|
||||
let mut collector = metrics::Collector::new(server_id, ip, docker_manager);
|
||||
collector.run(&server_url).await
|
||||
if let Err(e) = collector.run(&server_url).await {
|
||||
eprintln!("Metrics collection error: {}", e);
|
||||
// Don't panic, just return the error
|
||||
Err(e)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
@@ -10,10 +10,9 @@
|
||||
/// ## Usage
|
||||
/// The [`Collector`] struct is instantiated in the main loop and runs as a background task, continuously collecting and reporting metrics.
|
||||
use std::error::Error;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::{api, docker};
|
||||
use crate::api;
|
||||
use crate::docker::DockerManager;
|
||||
//use crate::docker::DockerInfo;
|
||||
use crate::hardware::network::NetworkMonitor;
|
||||
@@ -129,10 +128,7 @@ impl Collector {
|
||||
/// NOTE: This is a compilation-safe stub. Implement the Docker collection using your
|
||||
/// DockerManager API and container helpers when available.
|
||||
pub async fn docker_collect(&self) -> Result<DockerMetricDto, Box<dyn Error + Send + Sync>> {
|
||||
let metrics = self
|
||||
.docker_manager
|
||||
.collect_metrics()
|
||||
.await?;
|
||||
let metrics = self.docker_manager.collect_metrics().await?;
|
||||
Ok(DockerMetricDto {
|
||||
server_id: self.server_id,
|
||||
containers: metrics.containers,
|
||||
|
||||
@@ -182,13 +182,13 @@ pub struct Acknowledgment {
|
||||
/// - `image`: Docker image name (string)
|
||||
/// - `Name`: Container name (string)
|
||||
/// - `Status`: Container status ("running", "stopped", etc.)
|
||||
/// - `_net_in`: Network receive rate in **bytes per second (B/s)**
|
||||
/// - `_net_out`: Network transmit rate in **bytes per second (B/s)**
|
||||
/// - `_cpu_load`: CPU usage as a percentage (**0.0–100.0**)
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
pub struct DockerRegistrationDto {
|
||||
/// Unique server identifier (integer)
|
||||
#[serde(rename = "Server_id")]
|
||||
pub server_id: u16,
|
||||
/// Number of currently running containers
|
||||
// pub container_count: usize, --- IGNORE ---
|
||||
/// json stringified array of DockerContainer
|
||||
///
|
||||
/// ## Json Example
|
||||
@@ -198,7 +198,8 @@ pub struct DockerRegistrationDto {
|
||||
/// id: unique container ID (first 12 hex digits)
|
||||
/// image: docker image name
|
||||
/// name: container name
|
||||
pub containers: String // Vec<DockerContainer>,
|
||||
#[serde(rename = "Containers")]
|
||||
pub containers: Vec<DockerContainer>, // Vec<DockerContainer>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
@@ -217,7 +218,33 @@ pub struct DockerMetricDto {
|
||||
/// network: network stats
|
||||
/// cpu: cpu stats
|
||||
/// ram: ram stats
|
||||
pub containers: String // Vec<DockerContainerInfo>,
|
||||
pub containers: String, // Vec<DockerContainerInfo>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
|
||||
pub struct DockerCollectMetricDto {
|
||||
pub id: String,
|
||||
pub cpu: DockerContainerCpuDto,
|
||||
pub ram: DockerContainerRamDto,
|
||||
pub network: DockerContainerNetworkDto,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
pub struct DockerContainerCpuDto {
|
||||
pub cpu_load: Option<f64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
pub struct DockerContainerRamDto {
|
||||
pub ram_load: Option<f64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
|
||||
pub struct DockerContainerNetworkDto {
|
||||
pub net_in: Option<f64>,
|
||||
pub net_out: Option<f64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
@@ -232,6 +259,8 @@ pub struct DockerContainerInfo {
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
pub struct DockerContainer {
|
||||
pub id: String,
|
||||
#[serde(default)]
|
||||
pub image: Option<String>,
|
||||
#[serde(default)]
|
||||
pub name: Option<String>,
|
||||
}
|
||||
Reference in New Issue
Block a user