Files
watcheragent/WatcherAgent/src/api.rs

365 lines
13 KiB
Rust

//! # API Module
//!
//! This module provides all HTTP communication between WatcherAgent and the backend server.
//!
//! ## Responsibilities
//! - **Registration:** Registers the agent with the backend and retrieves its server ID and IP address.
//! - **Heartbeat:** Periodically sends heartbeat signals to indicate liveness.
//! - **Metrics Reporting:** Sends collected hardware and network metrics to the backend.
//! - **Command Listening:** Polls for and executes remote commands from the backend (e.g., update image, restart container).
//!
//! ## Usage
//! These functions are called from the main agent loop and background tasks. All network operations are asynchronous. The long-running loops (registration, heartbeat, command polling) log transient failures and retry; metric uploads are best-effort and only log failures.
use std::time::Duration;
use crate::docker::serverclientcomm::handle_server_message;
use crate::hardware::HardwareInfo;
use crate::models::{
Acknowledgment, HeartbeatDto, IdResponse, MetricDto, RegistrationDto, ServerMessage,
};
use anyhow::Result;
use reqwest::{Client, StatusCode};
use std::error::Error;
use tokio::time::sleep;
use bollard::Docker;
/// Registers this agent with the backend server and returns its server ID and registered IP address.
///
/// The local IP address is detected first and resolved to a server ID via
/// `get_server_id_by_ip`. Local hardware information is then collected and posted to
/// `{base_url}/monitoring/hardware-info`. The POST is repeated every 10 seconds until the
/// backend answers with a success status.
///
/// # Arguments
/// * `base_url` - The base URL of the backend server (e.g., `https://server.example.com`).
///
/// # Returns
/// * `Result<(i32, String), Box<dyn Error + Send + Sync>>` - Tuple of server ID and registered IP address on success.
///
/// # Errors
/// Returns an error if the local IP address cannot be determined, the server ID lookup
/// fails, the HTTP client cannot be built, or hardware collection fails.
/// Failed registration requests are never surfaced as errors: they are logged and retried
/// indefinitely, so this function does not return until registration succeeds.
pub async fn register_with_server(
    base_url: &str,
) -> Result<(i32, String), Box<dyn Error + Send + Sync>> {
    // First get local IP
    let ip = local_ip_address::local_ip()?.to_string();
    println!("Local IP address detected: {}", ip);

    // Get server ID from backend (this will retry until successful)
    let (server_id, registered_ip) = get_server_id_by_ip(base_url, &ip).await?;

    // NOTE(review): certificate validation is disabled for this client, presumably to
    // allow self-signed backend certificates - confirm this is intended in production.
    let client = Client::builder()
        .danger_accept_invalid_certs(true)
        .build()?;

    // Collect hardware info
    let hardware = HardwareInfo::collect().await?;

    // Prepare registration data; missing hardware details fall back to their defaults.
    let registration = RegistrationDto {
        server_id,
        ip_address: registered_ip.clone(),
        cpu_type: hardware.cpu.name.clone().unwrap_or_default(),
        cpu_cores: hardware.cpu.cores.unwrap_or_default(),
        gpu_type: hardware.gpu.name.clone().unwrap_or_default(),
        ram_size: hardware.memory.total_size.unwrap_or_default(),
    };

    // The endpoint does not change between attempts, so build it once.
    let url = format!("{}/monitoring/hardware-info", base_url);

    // Try to register (will retry on failure)
    loop {
        println!("Attempting to register with server...");
        match client.post(&url).json(&registration).send().await {
            Ok(resp) if resp.status().is_success() => {
                println!("✅ Successfully registered with server.");
                return Ok((server_id, registered_ip));
            }
            Ok(resp) => {
                let status = resp.status();
                // The body is only used for logging; an unreadable body must not abort the retry.
                let text = resp.text().await.unwrap_or_default();
                println!(
                    "⚠️ Registration failed ({}): {} (will retry in 10 seconds)",
                    status, text
                );
            }
            Err(err) => {
                println!("⚠️ Registration error: {} (will retry in 10 seconds)", err);
            }
        }
        sleep(Duration::from_secs(10)).await;
    }
}
/// Looks up the server ID for the given IP address from the backend server.
///
/// Issues `GET {base_url}/monitoring/register?ipAddress={ip}` and retries every 10 seconds
/// while the request fails at the transport level or the backend answers with a
/// non-success status (including `404 Not Found`).
///
/// # Arguments
/// * `base_url` - The base URL of the backend server.
/// * `ip` - The local IP address to look up.
///
/// # Returns
/// * `Result<(i32, String), Box<dyn Error + Send + Sync>>` - Tuple of server ID and registered IP address.
///
/// # Errors
/// Returns an error if the HTTP client cannot be built, or if a success response is
/// received whose body cannot be read or cannot be parsed as an `IdResponse`.
async fn get_server_id_by_ip(
    base_url: &str,
    ip: &str,
) -> Result<(i32, String), Box<dyn Error + Send + Sync>> {
    // NOTE(review): certificate validation is disabled for this client, presumably to
    // allow self-signed backend certificates - confirm this is intended in production.
    let client = Client::builder()
        .danger_accept_invalid_certs(true)
        .build()?;
    let url = format!("{}/monitoring/register?ipAddress={}", base_url, ip);

    loop {
        println!("Attempting to fetch server ID for IP {}...", ip);
        match client.get(&url).send().await {
            Ok(resp) if resp.status().is_success() => {
                let text = resp.text().await?;
                println!("Raw response: {}", text); // Debug output
                let id_resp: IdResponse = serde_json::from_str(&text).map_err(|e| {
                    println!("Failed to parse response: {}", e);
                    e
                })?;
                println!(
                    "✅ Received ID {} for IP {}",
                    id_resp.id, id_resp.ip_address
                );
                return Ok((id_resp.id, id_resp.ip_address));
            }
            Ok(resp) if resp.status() == StatusCode::NOT_FOUND => {
                println!(
                    "❌ Server with IP {} not found in database (will retry in 10 seconds)",
                    ip
                );
            }
            Ok(resp) => {
                let status = resp.status();
                // The error body is only needed for the log line; failing to read it
                // must not abort the retry loop, so fall back to an empty string.
                let body = resp.text().await.unwrap_or_default();
                println!("⚠️ Server responded with status: {} - {}", status, body);
            }
            Err(err) => {
                println!("⚠️ Request failed: {} (will retry in 10 seconds)", err);
            }
        }
        // Every path that reaches this point is a failed attempt: wait before retrying.
        sleep(Duration::from_secs(10)).await;
    }
}
/// Sends a heartbeat to the backend server every 20 seconds to signal agent liveness.
///
/// Each heartbeat is a `POST {base_url}/heartbeat/receive` carrying the agent's IP address.
/// Failed requests and non-success responses are written to stderr and do not stop the
/// loop; the next heartbeat is attempted after the regular interval.
///
/// # Arguments
/// * `base_url` - The base URL of the backend server.
/// * `ip` - The IP address of the agent.
///
/// # Returns
/// * `Result<(), Box<dyn Error + Send + Sync>>` - The loop never terminates on its own, so
///   the only value ever returned is an `Err` when the HTTP client cannot be built.
pub async fn heartbeat_loop(base_url: &str, ip: &str) -> Result<(), Box<dyn Error + Send + Sync>> {
    let http = Client::builder()
        .danger_accept_invalid_certs(true)
        .build()?;
    let endpoint = format!("{}/heartbeat/receive", base_url);
    let interval = Duration::from_secs(20);
    // The payload is identical for every beat, so it is built once up front.
    let beat = HeartbeatDto {
        ip_address: ip.to_string(),
    };

    loop {
        // Only the status code is needed from a response; the body is discarded.
        let outcome = http
            .post(&endpoint)
            .json(&beat)
            .send()
            .await
            .map(|reply| reply.status());

        match outcome {
            Ok(code) if code.is_success() => println!("✅ Heartbeat sent successfully."),
            Ok(code) => eprintln!("Server responded with status: {}", code),
            Err(cause) => eprintln!("Heartbeat error: {}", cause),
        }

        sleep(interval).await;
    }
}
/// Sends collected hardware and network metrics to the backend server.
///
/// Performs a single `POST {base_url}/monitoring/metric` with the metrics serialized as
/// JSON. Delivery is best-effort: the outcome is logged and no retry is attempted here.
///
/// # Arguments
/// * `base_url` - The base URL of the backend server.
/// * `metrics` - The metrics data to send (see [`MetricDto`]).
///
/// # Returns
/// * `Result<(), Box<dyn Error + Send + Sync>>` - Always `Ok(())`; transport failures and
///   non-success HTTP statuses are reported on stderr instead of being returned.
pub async fn send_metrics(
    base_url: &str,
    metrics: &MetricDto,
) -> Result<(), Box<dyn Error + Send + Sync>> {
    // NOTE(review): unlike the registration and heartbeat clients, this client keeps
    // certificate validation enabled - confirm whether that difference is intended.
    let client = Client::new();
    let url = format!("{}/monitoring/metric", base_url);
    println!("Metrics: {:?}", metrics);

    match client.post(&url).json(metrics).send().await {
        // Only a 2xx answer means the backend accepted the metrics.
        Ok(res) if res.status().is_success() => println!(
            "✅ Sent metrics for server {} | Status: {}",
            metrics.server_id,
            res.status()
        ),
        // A response arrived but the backend rejected the request (4xx/5xx, ...).
        Ok(res) => eprintln!(
            "⚠️ Metrics for server {} were not accepted | Status: {}",
            metrics.server_id,
            res.status()
        ),
        Err(err) => eprintln!("❌ Failed to send metrics: {}", err),
    }
    Ok(())
}
/// Polls the backend server for remote commands and executes them.
///
/// Every 5 seconds this issues `GET {base_url}/api/message`:
/// - `204 No Content` means there is no pending command.
/// - Any other success status is parsed as a [`ServerMessage`], acknowledged as
///   `"received"`, executed via `handle_server_message`, and then acknowledged again with
///   `"success"` or `"error"`.
/// - Any other status, a transport failure, or an unparsable body is logged to stderr.
///
/// No failure stops the loop; polling continues after the regular interval.
///
/// # Arguments
/// * `docker` - Reference to a Bollard Docker client.
/// * `base_url` - The base URL of the backend server.
///
/// # Returns
/// * `Result<(), Box<dyn Error + Send + Sync>>` - The loop never terminates on its own, so
///   in practice this function does not return.
pub async fn listening_to_server(
    docker: &Docker,
    base_url: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> {
    let url = format!("{}/api/message", base_url);
    let client = reqwest::Client::new();

    loop {
        // Get message from server
        match client.get(&url).send().await {
            Ok(response) => {
                let status = response.status();
                // 204 is itself a 2xx status, so it has to be tested BEFORE the generic
                // success check; otherwise the empty body would be fed to the JSON
                // parser and reported as a parse failure on every idle poll.
                if status == reqwest::StatusCode::NO_CONTENT {
                    // No new messages, continue polling
                    println!("No new messages from server");
                } else if status.is_success() {
                    match response.json::<ServerMessage>().await {
                        Ok(msg) => acknowledge_and_execute(docker, &client, base_url, msg).await,
                        Err(e) => eprintln!("Failed to parse message: {}", e),
                    }
                } else {
                    eprintln!("Server returned error status: {}", status);
                }
            }
            Err(e) => {
                eprintln!("Failed to reach server: {}", e);
            }
        }

        // Poll every 5 seconds (or use WebSocket for real-time)
        sleep(Duration::from_secs(5)).await;
    }
}

/// Acknowledges receipt of `msg`, executes it, and reports the execution result.
///
/// Acknowledgment failures are logged to stderr and never prevent the command from
/// being executed or the result from being reported.
async fn acknowledge_and_execute(
    docker: &Docker,
    client: &reqwest::Client,
    base_url: &str,
    msg: ServerMessage,
) {
    // Only the ID is needed after the message is handed over to the handler, so keep
    // a copy of it instead of cloning the whole message.
    let message_id = msg.message_id.clone();

    // Acknowledge receipt immediately
    if let Err(e) = send_acknowledgment(
        client,
        base_url,
        &message_id,
        "received",
        "Message received successfully",
    )
    .await
    {
        eprintln!("Failed to send receipt acknowledgment: {}", e);
    }

    // Handle the message
    let result = handle_server_message(docker, msg).await;

    // Send execution result acknowledgment
    let (status, details) = match result {
        Ok(_) => ("success", "Message executed successfully".to_string()),
        Err(e) => ("error", format!("Execution failed: {}", e)),
    };
    if let Err(e) = send_acknowledgment(client, base_url, &message_id, status, &details).await {
        eprintln!("Failed to send execution acknowledgment: {}", e);
    }
}
/// Posts an acknowledgment for a command message to `{base_url}/api/acknowledge`.
///
/// Used by [`listening_to_server`] to report both receipt and execution status of a
/// command. The server's answer is only logged: a success status prints a confirmation
/// to stdout, any other status prints a warning to stderr.
///
/// # Arguments
/// * `client` - Reference to a reqwest HTTP client.
/// * `base_url` - The base URL of the backend server.
/// * `message_id` - The ID of the message being acknowledged.
/// * `status` - Status string (e.g., "received", "success", "error").
/// * `details` - Additional details about the acknowledgment.
///
/// # Returns
/// * `Result<(), Box<dyn Error + Send + Sync>>` - `Ok(())` whenever a response was
///   received, regardless of its HTTP status; `Err` only if the request itself failed.
pub async fn send_acknowledgment(
    client: &reqwest::Client,
    base_url: &str,
    message_id: &str,
    status: &str,
    details: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> {
    let payload = Acknowledgment {
        message_id: message_id.to_owned(),
        status: status.to_owned(),
        details: details.to_owned(),
    };
    let endpoint = format!("{}/api/acknowledge", base_url);

    // Only the status code of the reply is of interest.
    let reply_status = client
        .post(&endpoint)
        .json(&payload)
        .send()
        .await?
        .status();

    if !reply_status.is_success() {
        eprintln!(
            "Server returned error for acknowledgment: {}",
            reply_status
        );
        return Ok(());
    }

    println!(
        "Acknowledgment sent successfully for message {}",
        message_id
    );
    Ok(())
}
/// Sends Docker metrics to the backend server.
///
/// Performs a single `POST {base_url}/monitoring/docker-metric` with the metrics
/// serialized as JSON. Delivery is best-effort: the outcome is logged and no retry is
/// attempted here.
///
/// # Arguments
/// * `base_url` - The base URL of the backend server.
/// * `docker_metrics` - The metrics data to send (see [`MetricDto`]).
///
/// # Returns
/// * `Result<(), Box<dyn Error + Send + Sync>>` - Always `Ok(())`; transport failures and
///   non-success HTTP statuses are reported on stderr instead of being returned.
pub async fn send_docker_metrics(
    base_url: &str,
    docker_metrics: &MetricDto,
) -> Result<(), Box<dyn Error + Send + Sync>> {
    // NOTE(review): unlike the registration and heartbeat clients, this client keeps
    // certificate validation enabled - confirm whether that difference is intended.
    let client = Client::new();
    let url = format!("{}/monitoring/docker-metric", base_url);
    println!("Metrics: {:?}", docker_metrics);

    match client.post(&url).json(docker_metrics).send().await {
        // Only a 2xx answer means the backend accepted the metrics.
        Ok(res) if res.status().is_success() => println!(
            "✅ Sent metrics for server {} | Status: {}",
            docker_metrics.server_id,
            res.status()
        ),
        // A response arrived but the backend rejected the request (4xx/5xx, ...).
        Ok(res) => eprintln!(
            "⚠️ Docker metrics for server {} were not accepted | Status: {}",
            docker_metrics.server_id,
            res.status()
        ),
        Err(err) => eprintln!("❌ Failed to send metrics: {}", err),
    }
    Ok(())
}
/// Placeholder for broadcasting Docker container information.
///
/// Currently a no-op: the body is empty, so the returned future resolves to `()` on its
/// first poll. Reserved for a future implementation.
pub async fn broadcast_docker_containers() {}