Compare commits
7 Commits
1c7a169956
...
v0.1.27
| Author | SHA1 | Date | |
|---|---|---|---|
| dc4c23f9d9 | |||
| 3182d57539 | |||
| 8c1ef7f9f6 | |||
| 16020eea50 | |||
| 432a798210 | |||
| a095444222 | |||
| 5e7bc3df54 |
@@ -12,7 +12,6 @@
|
|||||||
/// These functions are called from the main agent loop and background tasks. All network operations are asynchronous and robust to transient failures.
|
/// These functions are called from the main agent loop and background tasks. All network operations are asynchronous and robust to transient failures.
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use crate::docker::container;
|
|
||||||
use crate::docker::serverclientcomm::handle_server_message;
|
use crate::docker::serverclientcomm::handle_server_message;
|
||||||
use crate::hardware::HardwareInfo;
|
use crate::hardware::HardwareInfo;
|
||||||
use crate::models::{
|
use crate::models::{
|
||||||
@@ -153,11 +152,46 @@ async fn get_server_id_by_ip(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Broadcasts Docker container information to the monitoring server for service discovery.
|
||||||
|
///
|
||||||
|
/// This function sends the current Docker container configuration to the server
|
||||||
|
/// to register available containers and enable service monitoring. It will
|
||||||
|
/// continuously retry until successful, making it suitable for initial
|
||||||
|
/// registration scenarios.
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
///
|
||||||
|
/// * `base_url` - The base URL of the monitoring server API (e.g., "https://monitoring.example.com")
|
||||||
|
/// * `server_id` - The ID of the server to associate the containers with
|
||||||
|
/// * `container_dto` - Mutable reference to Docker container information for broadcast
|
||||||
|
///
|
||||||
|
/// # Returns
|
||||||
|
///
|
||||||
|
/// * `Ok(())` - When container information is successfully broadcasted to the server
|
||||||
|
/// * `Err(Box<dyn Error + Send + Sync>)` - If an unrecoverable error occurs (though the function typically retries on transient failures)
|
||||||
|
///
|
||||||
|
/// # Behavior
|
||||||
|
///
|
||||||
|
/// This function operates in a retry loop with the following characteristics:
|
||||||
|
///
|
||||||
|
/// - **Retry Logic**: Attempts broadcast every 10 seconds until successful
|
||||||
|
/// - **Mutation**: Modifies the `container_dto` to set the `server_id` before sending
|
||||||
|
/// - **TLS**: Accepts invalid TLS certificates for development environments
|
||||||
|
/// - **Logging**: Provides detailed console output about broadcast attempts and results
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// This function may return an error in the following cases:
|
||||||
|
///
|
||||||
|
/// * **HTTP Client Creation**: Failed to create HTTP client with TLS configuration
|
||||||
|
/// * **Network Issues**: Persistent connection failures to the backend server
|
||||||
|
/// * **Server Errors**: Backend returns non-success HTTP status codes repeatedly
|
||||||
|
/// * **JSON Serialization**: Cannot serialize container data (should be rare with proper DTOs)
|
||||||
pub async fn broadcast_docker_containers(
|
pub async fn broadcast_docker_containers(
|
||||||
base_url: &str,
|
base_url: &str,
|
||||||
server_id: u16,
|
server_id: u16,
|
||||||
container_dto: &mut DockerRegistrationDto,
|
container_dto: &DockerRegistrationDto,
|
||||||
) -> Result<(u16, String), Box<dyn Error + Send + Sync>> {
|
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||||
// First get local IP
|
// First get local IP
|
||||||
println!("Preparing to broadcast docker containers...");
|
println!("Preparing to broadcast docker containers...");
|
||||||
// Create HTTP client for registration
|
// Create HTTP client for registration
|
||||||
@@ -166,8 +200,8 @@ pub async fn broadcast_docker_containers(
|
|||||||
.build()?;
|
.build()?;
|
||||||
|
|
||||||
// Prepare registration data
|
// Prepare registration data
|
||||||
let container_dto = container_dto;
|
let mut broadcast_data = container_dto.clone();
|
||||||
container_dto.server_id = server_id;
|
broadcast_data.server_id = server_id;
|
||||||
|
|
||||||
// Try to register (will retry on failure)
|
// Try to register (will retry on failure)
|
||||||
loop {
|
loop {
|
||||||
@@ -175,7 +209,11 @@ pub async fn broadcast_docker_containers(
|
|||||||
let url = format!("{}/monitoring/service-discovery", base_url);
|
let url = format!("{}/monitoring/service-discovery", base_url);
|
||||||
match client.post(&url).json(&container_dto).send().await {
|
match client.post(&url).json(&container_dto).send().await {
|
||||||
Ok(resp) if resp.status().is_success() => {
|
Ok(resp) if resp.status().is_success() => {
|
||||||
println!("✅ Successfully broadcasted docker container.");
|
println!(
|
||||||
|
"✅ Successfully broadcasted following docker container: {:?}",
|
||||||
|
container_dto
|
||||||
|
);
|
||||||
|
return Ok(());
|
||||||
}
|
}
|
||||||
Ok(resp) => {
|
Ok(resp) => {
|
||||||
let status = resp.status();
|
let status = resp.status();
|
||||||
|
|||||||
@@ -159,43 +159,58 @@ impl DockerManager {
|
|||||||
|
|
||||||
let container_infos: Vec<DockerCollectMetricDto> = container_infos_total
|
let container_infos: Vec<DockerCollectMetricDto> = container_infos_total
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|info| DockerCollectMetricDto {
|
.filter_map(|info| {
|
||||||
id: Some(info.container.unwrap().id).unwrap_or("".to_string()),
|
// Safely handle container extraction
|
||||||
cpu: info
|
let container = match info.container {
|
||||||
.cpu
|
Some(c) => c,
|
||||||
.unwrap()
|
None => {
|
||||||
.cpu_usage_percent
|
eprintln!("Warning: Container info missing container data, skipping");
|
||||||
.map(|load| DockerContainerCpuDto {
|
return None;
|
||||||
cpu_load: Some(load),
|
}
|
||||||
})
|
};
|
||||||
.unwrap_or(DockerContainerCpuDto { cpu_load: None }),
|
|
||||||
ram: info
|
// Safely handle CPU data with defaults
|
||||||
.ram
|
let cpu_dto = if let Some(cpu) = info.cpu {
|
||||||
.unwrap()
|
DockerContainerCpuDto {
|
||||||
.memory_usage_percent
|
cpu_load: cpu.cpu_usage_percent,
|
||||||
.map(|load| DockerContainerRamDto {
|
}
|
||||||
cpu_load: Some(load),
|
} else {
|
||||||
})
|
DockerContainerCpuDto { cpu_load: None }
|
||||||
.unwrap_or(DockerContainerRamDto { cpu_load: None }),
|
};
|
||||||
network: DockerContainerNetworkDto {
|
|
||||||
net_in: info
|
// Safely handle RAM data with defaults
|
||||||
.network
|
let ram_dto = if let Some(ram) = info.ram {
|
||||||
.as_ref()
|
DockerContainerRamDto {
|
||||||
.unwrap()
|
ram_load: ram.memory_usage_percent,
|
||||||
.rx_bytes
|
}
|
||||||
.map(|bytes| bytes as f64)
|
} else {
|
||||||
.or(Some(0.0)),
|
DockerContainerRamDto { ram_load: None }
|
||||||
net_out: info
|
};
|
||||||
.network
|
|
||||||
.unwrap()
|
// Safely handle network data with defaults
|
||||||
.tx_bytes
|
let network_dto = if let Some(net) = info.network {
|
||||||
.map(|bytes| bytes as f64)
|
DockerContainerNetworkDto {
|
||||||
.or(Some(0.0)),
|
net_in: net.rx_bytes.map(|bytes| bytes as f64).or(Some(0.0)),
|
||||||
},
|
net_out: net.tx_bytes.map(|bytes| bytes as f64).or(Some(0.0)),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
DockerContainerNetworkDto {
|
||||||
|
net_in: Some(0.0),
|
||||||
|
net_out: Some(0.0),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Some(DockerCollectMetricDto {
|
||||||
|
id: container.id,
|
||||||
|
cpu: cpu_dto,
|
||||||
|
ram: ram_dto,
|
||||||
|
network: network_dto,
|
||||||
|
})
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let dto = DockerMetricDto {
|
let dto = DockerMetricDto {
|
||||||
server_id: 0,
|
server_id: 0, // This should be set by the caller
|
||||||
containers: serde_json::to_string(&container_infos)?,
|
containers: serde_json::to_string(&container_infos)?,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -207,9 +222,9 @@ impl DockerManager {
|
|||||||
) -> Result<DockerRegistrationDto, Box<dyn Error + Send + Sync>> {
|
) -> Result<DockerRegistrationDto, Box<dyn Error + Send + Sync>> {
|
||||||
let containers = self.get_containers().await?;
|
let containers = self.get_containers().await?;
|
||||||
let dto = DockerRegistrationDto {
|
let dto = DockerRegistrationDto {
|
||||||
server_id: 0,
|
server_id: 0, // This will be set by the caller
|
||||||
//container_count,
|
containers: serde_json::to_string(&containers)
|
||||||
containers: serde_json::to_string(&containers)?,
|
.unwrap_or_else(|_| "[]".to_string()), // Fallback to empty array
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(dto)
|
Ok(dto)
|
||||||
|
|||||||
@@ -31,10 +31,8 @@ pub mod hardware;
|
|||||||
pub mod metrics;
|
pub mod metrics;
|
||||||
pub mod models;
|
pub mod models;
|
||||||
|
|
||||||
use bollard::Docker;
|
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use std::sync::Arc;
|
|
||||||
use tokio::task::JoinHandle;
|
use tokio::task::JoinHandle;
|
||||||
|
|
||||||
/// Awaits a spawned asynchronous task and flattens its nested `Result` type.
|
/// Awaits a spawned asynchronous task and flattens its nested `Result` type.
|
||||||
@@ -153,7 +151,13 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
|
|||||||
let docker_manager = docker_manager.as_ref().cloned().unwrap();
|
let docker_manager = docker_manager.as_ref().cloned().unwrap();
|
||||||
async move {
|
async move {
|
||||||
let mut collector = metrics::Collector::new(server_id, ip, docker_manager);
|
let mut collector = metrics::Collector::new(server_id, ip, docker_manager);
|
||||||
collector.run(&server_url).await
|
if let Err(e) = collector.run(&server_url).await {
|
||||||
|
eprintln!("Metrics collection error: {}", e);
|
||||||
|
// Don't panic, just return the error
|
||||||
|
Err(e)
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -182,9 +182,6 @@ pub struct Acknowledgment {
|
|||||||
/// - `image`: Docker image name (string)
|
/// - `image`: Docker image name (string)
|
||||||
/// - `Name`: Container name (string)
|
/// - `Name`: Container name (string)
|
||||||
/// - `Status`: Container status ("running", "stopped", etc.)
|
/// - `Status`: Container status ("running", "stopped", etc.)
|
||||||
/// - `_net_in`: Network receive rate in **bytes per second (B/s)**
|
|
||||||
/// - `_net_out`: Network transmit rate in **bytes per second (B/s)**
|
|
||||||
/// - `_cpu_load`: CPU usage as a percentage (**0.0–100.0**)
|
|
||||||
#[derive(Debug, Serialize, Clone)]
|
#[derive(Debug, Serialize, Clone)]
|
||||||
pub struct DockerRegistrationDto {
|
pub struct DockerRegistrationDto {
|
||||||
/// Unique server identifier (integer)
|
/// Unique server identifier (integer)
|
||||||
@@ -238,7 +235,7 @@ pub struct DockerContainerCpuDto {
|
|||||||
|
|
||||||
#[derive(Debug, Serialize, Clone)]
|
#[derive(Debug, Serialize, Clone)]
|
||||||
pub struct DockerContainerRamDto {
|
pub struct DockerContainerRamDto {
|
||||||
pub cpu_load: Option<f64>,
|
pub ram_load: Option<f64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Clone)]
|
#[derive(Debug, Serialize, Clone)]
|
||||||
|
|||||||
Reference in New Issue
Block a user