Compare commits
31 Commits
Author | SHA1 | Date | |
---|---|---|---|
bb55b46c34 | |||
76d54cb433 | |||
4dc8c56a5c | |||
c81040b16b | |||
68c307e258 | |||
5d00f072d8 | |||
9072e253ec | |||
063ad113d7 | |||
097be0f555 | |||
c7b8e24a54 | |||
fb1d016b36 | |||
89d58e9696 | |||
45c6e8180c | |||
c395885bbd | |||
75494e2a71 | |||
8e6e291f6a | |||
bfeb43f38a | |||
2f5e2391f7 | |||
820952089a | |||
c745125f20 | |||
758fa7608f | |||
ee6b947f29 | |||
18dd1ef528 | |||
8fa7866cc2 | |||
238ad87119 | |||
d584e21fd9 | |||
ac79a2e0b7 | |||
1798c1270b | |||
97d1019b69 | |||
e30ceb75d9 | |||
4681e0c694 |
@@ -3,10 +3,8 @@ name: Rust Cross-Platform Build
|
|||||||
on:
|
on:
|
||||||
workflow_dispatch:
|
workflow_dispatch:
|
||||||
push:
|
push:
|
||||||
branches: [ "development", "main", "feature/*", "bugfix/*", "enhancement/*" ]
|
branches: [ "development", "main", "staging" ]
|
||||||
tags: [ "v*.*.*" ]
|
tags: [ "v*.*.*" ]
|
||||||
pull_request:
|
|
||||||
branches: [ "development", "main" ]
|
|
||||||
|
|
||||||
env:
|
env:
|
||||||
REGISTRY: git.triggermeelmo.com
|
REGISTRY: git.triggermeelmo.com
|
||||||
@@ -76,44 +74,6 @@ jobs:
|
|||||||
# working-directory: ${{ needs.detect-project.outputs.project-dir }}
|
# working-directory: ${{ needs.detect-project.outputs.project-dir }}
|
||||||
# run: cargo clippy -- -D warnings
|
# run: cargo clippy -- -D warnings
|
||||||
|
|
||||||
set-tag:
|
|
||||||
name: Set Tag Name
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
outputs:
|
|
||||||
tag_name: ${{ steps.set_tag.outputs.tag_name }}
|
|
||||||
steps:
|
|
||||||
- uses: actions/checkout@v4
|
|
||||||
|
|
||||||
- name: Determine next semantic version tag
|
|
||||||
id: set_tag
|
|
||||||
run: |
|
|
||||||
git fetch --tags
|
|
||||||
|
|
||||||
# Find latest tag matching vX.Y.Z
|
|
||||||
latest_tag=$(git tag --list 'v*.*.*' --sort=-v:refname | head -n 1)
|
|
||||||
if [[ -z "$latest_tag" ]]; then
|
|
||||||
major=0
|
|
||||||
minor=0
|
|
||||||
patch=0
|
|
||||||
else
|
|
||||||
version="${latest_tag#v}"
|
|
||||||
IFS='.' read -r major minor patch <<< "$version"
|
|
||||||
fi
|
|
||||||
|
|
||||||
if [[ "${GITHUB_REF}" == "refs/heads/main" ]]; then
|
|
||||||
major=$((major + 1))
|
|
||||||
minor=0
|
|
||||||
patch=0
|
|
||||||
elif [[ "${GITHUB_REF}" == "refs/heads/development" ]]; then
|
|
||||||
minor=$((minor + 1))
|
|
||||||
patch=0
|
|
||||||
else
|
|
||||||
patch=$((patch + 1))
|
|
||||||
fi
|
|
||||||
|
|
||||||
new_tag="v${major}.${minor}.${patch}"
|
|
||||||
echo "tag_name=${new_tag}" >> $GITHUB_OUTPUT
|
|
||||||
|
|
||||||
# audit:
|
# audit:
|
||||||
# name: Security Audit
|
# name: Security Audit
|
||||||
# needs: [detect-project]
|
# needs: [detect-project]
|
||||||
@@ -189,13 +149,65 @@ jobs:
|
|||||||
path: |
|
path: |
|
||||||
${{ needs.detect-project.outputs.project-dir }}/target/${{ matrix.target }}/release/${{ needs.detect-project.outputs.project-name }}${{ matrix.os == 'windows' && '.exe' || '' }}
|
${{ needs.detect-project.outputs.project-dir }}/target/${{ matrix.target }}/release/${{ needs.detect-project.outputs.project-name }}${{ matrix.os == 'windows' && '.exe' || '' }}
|
||||||
|
|
||||||
|
set-tag:
|
||||||
|
name: Set Tag Name
|
||||||
|
needs: [detect-project, build]
|
||||||
|
#if: ${{ !failure() && !cancelled() && github.event_name != 'pull_request' }}
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
outputs:
|
||||||
|
tag_name: ${{ steps.set_tag.outputs.tag_name }}
|
||||||
|
should_tag: ${{ steps.set_tag.outputs.should_tag }}
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v4
|
||||||
|
|
||||||
|
- name: Determine next semantic version tag
|
||||||
|
id: set_tag
|
||||||
|
run: |
|
||||||
|
git fetch --tags
|
||||||
|
|
||||||
|
# Find latest tag matching vX.Y.Z
|
||||||
|
latest_tag=$(git tag --list 'v*.*.*' --sort=-v:refname | head -n 1)
|
||||||
|
if [[ -z "$latest_tag" ]]; then
|
||||||
|
major=0
|
||||||
|
minor=0
|
||||||
|
patch=0
|
||||||
|
else
|
||||||
|
version="${latest_tag#v}"
|
||||||
|
IFS='.' read -r major minor patch <<< "$version"
|
||||||
|
fi
|
||||||
|
|
||||||
|
if [[ "${GITHUB_REF}" == "refs/heads/main" ]]; then
|
||||||
|
major=$((major + 1))
|
||||||
|
minor=0
|
||||||
|
patch=0
|
||||||
|
new_tag="v${major}.${minor}.${patch}"
|
||||||
|
echo "tag_name=${new_tag}" >> $GITHUB_OUTPUT
|
||||||
|
echo "should_tag=true" >> $GITHUB_OUTPUT
|
||||||
|
echo "Creating new major version tag: ${new_tag}"
|
||||||
|
|
||||||
|
elif [[ "${GITHUB_REF}" == "refs/heads/development" ]]; then
|
||||||
|
minor=$((minor + 1))
|
||||||
|
patch=0
|
||||||
|
new_tag="v${major}.${minor}.${patch}"
|
||||||
|
echo "tag_name=${new_tag}" >> $GITHUB_OUTPUT
|
||||||
|
echo "should_tag=true" >> $GITHUB_OUTPUT
|
||||||
|
echo "Creating new minor version tag: ${new_tag}"
|
||||||
|
|
||||||
|
elif [[ "${GITHUB_REF}" == "refs/heads/staging" ]]; then
|
||||||
|
patch=$((patch + 1))
|
||||||
|
new_tag="v${major}.${minor}.${patch}"
|
||||||
|
echo "tag_name=${new_tag}" >> $GITHUB_OUTPUT
|
||||||
|
echo "should_tag=true" >> $GITHUB_OUTPUT
|
||||||
|
echo "Creating new patch version tag: ${new_tag}"
|
||||||
|
fi
|
||||||
|
|
||||||
docker-build:
|
docker-build:
|
||||||
name: Build and Push Docker Image
|
name: Build and Push Docker Image
|
||||||
needs: [detect-project, build, set-tag]
|
needs: [detect-project, build, set-tag]
|
||||||
if: |
|
if: |
|
||||||
always() &&
|
|
||||||
needs.detect-project.result == 'success' &&
|
needs.detect-project.result == 'success' &&
|
||||||
needs.build.result == 'success' &&
|
needs.build.result == 'success' &&
|
||||||
|
needs.set-tag.outputs.should_tag == 'true' &&
|
||||||
github.event_name != 'pull_request'
|
github.event_name != 'pull_request'
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
environment: production
|
environment: production
|
||||||
@@ -233,9 +245,6 @@ jobs:
|
|||||||
tag:
|
tag:
|
||||||
name: Create Tag
|
name: Create Tag
|
||||||
needs: [docker-build, build, set-tag]
|
needs: [docker-build, build, set-tag]
|
||||||
if: |
|
|
||||||
github.event_name == 'push' &&
|
|
||||||
needs.docker-build.result == 'success'
|
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v4
|
- uses: actions/checkout@v4
|
||||||
@@ -251,13 +260,13 @@ jobs:
|
|||||||
env:
|
env:
|
||||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||||
run: |
|
run: |
|
||||||
|
echo "Creating new tag: ${{ needs.set-tag.outputs.tag_name }}"
|
||||||
git tag ${{ needs.set-tag.outputs.tag_name }}
|
git tag ${{ needs.set-tag.outputs.tag_name }}
|
||||||
git push origin ${{ needs.set-tag.outputs.tag_name }}
|
git push origin ${{ needs.set-tag.outputs.tag_name }}
|
||||||
|
|
||||||
|
|
||||||
summary:
|
summary:
|
||||||
name: Workflow Summary
|
name: Workflow Summary
|
||||||
needs: [test, audit, build, docker-build]
|
needs: [test, build, docker-build]
|
||||||
if: always()
|
if: always()
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
steps:
|
steps:
|
@@ -1,5 +1,3 @@
|
|||||||
|
|
||||||
|
|
||||||
/// # API Module
|
/// # API Module
|
||||||
///
|
///
|
||||||
/// This module provides all HTTP communication between WatcherAgent and the backend server.
|
/// This module provides all HTTP communication between WatcherAgent and the backend server.
|
||||||
@@ -14,9 +12,11 @@
|
|||||||
/// These functions are called from the main agent loop and background tasks. All network operations are asynchronous and robust to transient failures.
|
/// These functions are called from the main agent loop and background tasks. All network operations are asynchronous and robust to transient failures.
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use crate::hardware::HardwareInfo;
|
|
||||||
use crate::models::{HeartbeatDto, IdResponse, MetricDto, RegistrationDto, ServerMessage, Acknowledgment};
|
|
||||||
use crate::docker::serverclientcomm::handle_server_message;
|
use crate::docker::serverclientcomm::handle_server_message;
|
||||||
|
use crate::hardware::HardwareInfo;
|
||||||
|
use crate::models::{
|
||||||
|
Acknowledgment, HeartbeatDto, IdResponse, MetricDto, RegistrationDto, ServerMessage,
|
||||||
|
};
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use reqwest::{Client, StatusCode};
|
use reqwest::{Client, StatusCode};
|
||||||
@@ -57,7 +57,7 @@ pub async fn register_with_server(
|
|||||||
|
|
||||||
// Prepare registration data
|
// Prepare registration data
|
||||||
let registration = RegistrationDto {
|
let registration = RegistrationDto {
|
||||||
id: server_id,
|
server_id: server_id,
|
||||||
ip_address: registered_ip.clone(),
|
ip_address: registered_ip.clone(),
|
||||||
cpu_type: hardware.cpu.name.clone().unwrap_or_default(),
|
cpu_type: hardware.cpu.name.clone().unwrap_or_default(),
|
||||||
cpu_cores: (hardware.cpu.cores).unwrap_or_default(),
|
cpu_cores: (hardware.cpu.cores).unwrap_or_default(),
|
||||||
@@ -68,7 +68,7 @@ pub async fn register_with_server(
|
|||||||
// Try to register (will retry on failure)
|
// Try to register (will retry on failure)
|
||||||
loop {
|
loop {
|
||||||
println!("Attempting to register with server...");
|
println!("Attempting to register with server...");
|
||||||
let url = format!("{}/monitoring/register-agent-by-id", base_url);
|
let url = format!("{}/monitoring/hardware-info", base_url);
|
||||||
match client.post(&url).json(®istration).send().await {
|
match client.post(&url).json(®istration).send().await {
|
||||||
Ok(resp) if resp.status().is_success() => {
|
Ok(resp) if resp.status().is_success() => {
|
||||||
println!("✅ Successfully registered with server.");
|
println!("✅ Successfully registered with server.");
|
||||||
@@ -108,7 +108,7 @@ async fn get_server_id_by_ip(
|
|||||||
.danger_accept_invalid_certs(true)
|
.danger_accept_invalid_certs(true)
|
||||||
.build()?;
|
.build()?;
|
||||||
|
|
||||||
let url = format!("{}/monitoring/server-id-by-ip?ipAddress={}", base_url, ip);
|
let url = format!("{}/monitoring/register?ipAddress={}", base_url, ip);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
println!("Attempting to fetch server ID for IP {}...", ip);
|
println!("Attempting to fetch server ID for IP {}...", ip);
|
||||||
@@ -224,10 +224,13 @@ pub async fn send_metrics(
|
|||||||
///
|
///
|
||||||
/// # Returns
|
/// # Returns
|
||||||
/// * `Result<(), Box<dyn Error + Send + Sync>>` - Ok if commands are handled successfully.
|
/// * `Result<(), Box<dyn Error + Send + Sync>>` - Ok if commands are handled successfully.
|
||||||
pub async fn listening_to_server(docker: &Docker, base_url: &str) -> Result<(), Box<dyn Error + Send + Sync>> {
|
pub async fn listening_to_server(
|
||||||
|
docker: &Docker,
|
||||||
|
base_url: &str,
|
||||||
|
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||||
let url = format!("{}/api/message", base_url);
|
let url = format!("{}/api/message", base_url);
|
||||||
let client = reqwest::Client::new();
|
let client = reqwest::Client::new();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
// Get message from server
|
// Get message from server
|
||||||
let resp = client.get(&url).send().await;
|
let resp = client.get(&url).send().await;
|
||||||
@@ -238,20 +241,36 @@ pub async fn listening_to_server(docker: &Docker, base_url: &str) -> Result<(),
|
|||||||
match response.json::<ServerMessage>().await {
|
match response.json::<ServerMessage>().await {
|
||||||
Ok(msg) => {
|
Ok(msg) => {
|
||||||
// Acknowledge receipt immediately
|
// Acknowledge receipt immediately
|
||||||
if let Err(e) = send_acknowledgment(&client, base_url, &msg.message_id, "received", "Message received successfully").await {
|
if let Err(e) = send_acknowledgment(
|
||||||
|
&client,
|
||||||
|
base_url,
|
||||||
|
&msg.message_id,
|
||||||
|
"received",
|
||||||
|
"Message received successfully",
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
eprintln!("Failed to send receipt acknowledgment: {}", e);
|
eprintln!("Failed to send receipt acknowledgment: {}", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle the message
|
// Handle the message
|
||||||
let result = handle_server_message(docker, msg.clone()).await;
|
let result = handle_server_message(docker, msg.clone()).await;
|
||||||
|
|
||||||
// Send execution result acknowledgment
|
// Send execution result acknowledgment
|
||||||
let (status, details) = match result {
|
let (status, details) = match result {
|
||||||
Ok(_) => ("success", "Message executed successfully".to_string()),
|
Ok(_) => ("success", "Message executed successfully".to_string()),
|
||||||
Err(e) => ("error", format!("Execution failed: {}", e)),
|
Err(e) => ("error", format!("Execution failed: {}", e)),
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Err(e) = send_acknowledgment(&client, base_url, &msg.message_id, status, &details).await {
|
if let Err(e) = send_acknowledgment(
|
||||||
|
&client,
|
||||||
|
base_url,
|
||||||
|
&msg.message_id,
|
||||||
|
status,
|
||||||
|
&details,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
eprintln!("Failed to send execution acknowledgment: {}", e);
|
eprintln!("Failed to send execution acknowledgment: {}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -297,24 +316,26 @@ async fn send_acknowledgment(
|
|||||||
details: &str,
|
details: &str,
|
||||||
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||||
let ack_url = format!("{}/api/acknowledge", base_url);
|
let ack_url = format!("{}/api/acknowledge", base_url);
|
||||||
|
|
||||||
let acknowledgment = Acknowledgment {
|
let acknowledgment = Acknowledgment {
|
||||||
message_id: message_id.to_string(),
|
message_id: message_id.to_string(),
|
||||||
status: status.to_string(),
|
status: status.to_string(),
|
||||||
details: details.to_string(),
|
details: details.to_string(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let response = client
|
let response = client.post(&ack_url).json(&acknowledgment).send().await?;
|
||||||
.post(&ack_url)
|
|
||||||
.json(&acknowledgment)
|
|
||||||
.send()
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
if response.status().is_success() {
|
if response.status().is_success() {
|
||||||
println!("Acknowledgment sent successfully for message {}", message_id);
|
println!(
|
||||||
|
"Acknowledgment sent successfully for message {}",
|
||||||
|
message_id
|
||||||
|
);
|
||||||
} else {
|
} else {
|
||||||
eprintln!("Server returned error for acknowledgment: {}", response.status());
|
eprintln!(
|
||||||
|
"Server returned error for acknowledgment: {}",
|
||||||
|
response.status()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@@ -1,15 +1,17 @@
|
|||||||
|
|
||||||
//! Docker container utilities for WatcherAgent
|
//! Docker container utilities for WatcherAgent
|
||||||
//!
|
//!
|
||||||
//! Provides functions to list and process Docker containers using the Bollard library.
|
//! Provides functions to list and process Docker containers using the Bollard library.
|
||||||
//!
|
//!
|
||||||
use crate::models::DockerContainer;
|
use crate::docker::stats;
|
||||||
|
use crate::docker::stats::{ContainerCpuInfo, ContainerNetworkInfo};
|
||||||
|
use crate::models::{DockerContainerDto, DockerContainerRegistrationDto};
|
||||||
|
|
||||||
use bollard::query_parameters::{ListContainersOptions};
|
use bollard::query_parameters::{
|
||||||
|
CreateImageOptions, ListContainersOptions, RestartContainerOptions,
|
||||||
|
};
|
||||||
use bollard::Docker;
|
use bollard::Docker;
|
||||||
|
use futures_util::StreamExt;
|
||||||
|
use std::error::Error;
|
||||||
|
|
||||||
|
|
||||||
/// Returns a list of available Docker containers.
|
/// Returns a list of available Docker containers.
|
||||||
///
|
///
|
||||||
@@ -18,54 +20,60 @@ use bollard::Docker;
|
|||||||
///
|
///
|
||||||
/// # Returns
|
/// # Returns
|
||||||
/// * `Vec<DockerContainer>` - Vector of Docker container info.
|
/// * `Vec<DockerContainer>` - Vector of Docker container info.
|
||||||
pub async fn get_available_container(docker: &Docker) -> Vec<DockerContainer> {
|
pub async fn get_available_containers(docker: &Docker) -> Vec<DockerContainerDto> {
|
||||||
println!("=== DOCKER CONTAINER LIST ===");
|
println!("=== DOCKER CONTAINER LIST ===");
|
||||||
|
|
||||||
let options = Some(ListContainersOptions {
|
let options = Some(ListContainersOptions {
|
||||||
all: true,
|
all: true,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
});
|
});
|
||||||
|
|
||||||
let containers_list = match docker.list_containers(options).await {
|
let containers_list = match docker.list_containers(options).await {
|
||||||
Ok(containers) => {
|
Ok(containers) => {
|
||||||
println!("Available containers ({}):", containers.len());
|
println!("Available containers ({}):", containers.len());
|
||||||
containers.into_iter()
|
containers
|
||||||
|
.into_iter()
|
||||||
.filter_map(|container| {
|
.filter_map(|container| {
|
||||||
container.id.as_ref()?; // Skip if no ID
|
container.id.as_ref()?; // Skip if no ID
|
||||||
|
|
||||||
let id = container.id?;
|
|
||||||
let short_string_id = if id.len() > 12 { &id[..12] } else { &id };
|
|
||||||
let short_id: u32 = short_string_id.trim().parse().unwrap();
|
|
||||||
|
|
||||||
let name = container.names
|
let id = container.id?;
|
||||||
|
let short_id = if id.len() > 12 { &id[..12] } else { &id };
|
||||||
|
|
||||||
|
let name = container
|
||||||
|
.names
|
||||||
.and_then(|names| names.into_iter().next())
|
.and_then(|names| names.into_iter().next())
|
||||||
.map(|name| name.trim_start_matches('/').to_string())
|
.map(|name| name.trim_start_matches('/').to_string())
|
||||||
.unwrap_or_else(|| "unknown".to_string());
|
.unwrap_or_else(|| "unknown".to_string());
|
||||||
|
|
||||||
let image = container.image
|
let image = container
|
||||||
|
.image
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.map(|img| img.to_string())
|
.map(|img| img.to_string())
|
||||||
.unwrap_or_else(|| "unknown".to_string());
|
.unwrap_or_else(|| "unknown".to_string());
|
||||||
|
|
||||||
let status = container.status
|
/*let status = container
|
||||||
.as_ref()
|
.status
|
||||||
.map(|s| match s.to_lowercase().as_str() {
|
.as_ref()
|
||||||
s if s.contains("up") || s.contains("running") => "running".to_string(),
|
.map(|s| match s.to_lowercase().as_str() {
|
||||||
s if s.contains("exited") || s.contains("stopped") => "stopped".to_string(),
|
s if s.contains("up") || s.contains("running") => "running".to_string(),
|
||||||
_ => s.to_string(),
|
s if s.contains("exited") || s.contains("stopped") => {
|
||||||
})
|
"stopped".to_string()
|
||||||
.unwrap_or_else(|| "unknown".to_string());
|
}
|
||||||
|
_ => s.to_string(),
|
||||||
println!(" - ID: {}, Image: {:?}, Name: {}", short_id, container.image, name);
|
})
|
||||||
|
.unwrap_or_else(|| "unknown".to_string());
|
||||||
Some(DockerContainer {
|
|
||||||
ID: short_id,
|
println!(
|
||||||
|
" - ID: {}, Image: {}, Name: {}",
|
||||||
|
short_id,
|
||||||
|
container.image.unwrap(),
|
||||||
|
name
|
||||||
|
);*/
|
||||||
|
|
||||||
|
Some(DockerContainerDto {
|
||||||
|
id: short_id.to_string(),
|
||||||
image,
|
image,
|
||||||
Name: name,
|
name: name,
|
||||||
Status: status,
|
|
||||||
_net_in: 0.0,
|
|
||||||
_net_out: 0.0,
|
|
||||||
_cpu_load: 0.0,
|
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
.collect()
|
.collect()
|
||||||
@@ -75,10 +83,96 @@ pub async fn get_available_container(docker: &Docker) -> Vec<DockerContainer> {
|
|||||||
Vec::new()
|
Vec::new()
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
containers_list
|
containers_list
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Pulls a new Docker image and restarts the current container.
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
/// * `docker` - Reference to a Bollard Docker client.
|
||||||
|
/// * `image` - The name of the Docker image to pull.
|
||||||
|
///
|
||||||
|
/// # Returns
|
||||||
|
/// * `Result<(), Box<dyn Error + Send + Sync>>` - Ok if updated successfully, error otherwise.
|
||||||
|
pub async fn update_docker_image(
|
||||||
|
docker: &Docker,
|
||||||
|
image: &str,
|
||||||
|
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||||
|
println!("Updating to {}", image);
|
||||||
|
|
||||||
|
// 1. Pull new image
|
||||||
|
let mut stream = docker.create_image(
|
||||||
|
Some(CreateImageOptions {
|
||||||
|
from_image: Some(image.to_string()),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Use the stream with proper trait bounds
|
||||||
|
while let Some(result) = StreamExt::next(&mut stream).await {
|
||||||
|
match result {
|
||||||
|
Ok(progress) => {
|
||||||
|
if let Some(status) = progress.status {
|
||||||
|
println!("Pull status: {}", status);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!("Error pulling image: {}", e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Restart the current container
|
||||||
|
let options = Some(ListContainersOptions {
|
||||||
|
all: true,
|
||||||
|
..Default::default()
|
||||||
|
});
|
||||||
|
let container_id = docker
|
||||||
|
.list_containers(options)
|
||||||
|
.await?
|
||||||
|
.into_iter()
|
||||||
|
.find_map(|c| {
|
||||||
|
c.image
|
||||||
|
.as_ref()
|
||||||
|
.and_then(|img| if img == image { c.id } else { None })
|
||||||
|
});
|
||||||
|
let _ = restart_container(docker, &container_id.unwrap()).await;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Restarts the agent's own Docker container.
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
/// * `docker` - Reference to a Bollard Docker client.
|
||||||
|
///
|
||||||
|
/// # Returns
|
||||||
|
/// * `Result<(), Box<dyn Error + Send + Sync>>` - Ok if restarted successfully, error otherwise.
|
||||||
|
pub async fn restart_container(
|
||||||
|
docker: &Docker,
|
||||||
|
container_id: &str,
|
||||||
|
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||||
|
println!("Restarting container {}", container_id);
|
||||||
|
if let Err(e) = docker
|
||||||
|
.restart_container(
|
||||||
|
&container_id.to_string(),
|
||||||
|
Some(RestartContainerOptions {
|
||||||
|
signal: None,
|
||||||
|
t: Some(0),
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
eprintln!("Failed to restart container: {}", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
/// Extracts a Docker container ID from a string line.
|
/// Extracts a Docker container ID from a string line.
|
||||||
///
|
///
|
||||||
@@ -90,4 +184,48 @@ pub async fn get_available_container(docker: &Docker) -> Vec<DockerContainer> {
|
|||||||
pub fn extract_client_container_id(line: &str) -> Option<String> {
|
pub fn extract_client_container_id(line: &str) -> Option<String> {
|
||||||
// ...existing code...
|
// ...existing code...
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
/// Gets network statistics for a specific container
|
||||||
|
pub async fn get_network_stats(
|
||||||
|
docker: &Docker,
|
||||||
|
container_id: &str,
|
||||||
|
) -> Result<ContainerNetworkInfo, Box<dyn Error + Send + Sync>> {
|
||||||
|
let (_, net_info) = stats::get_single_container_stats(docker, container_id).await?;
|
||||||
|
|
||||||
|
if let Some(net_info) = net_info {
|
||||||
|
Ok(net_info)
|
||||||
|
} else {
|
||||||
|
// Return default network info if not found
|
||||||
|
Ok(ContainerNetworkInfo {
|
||||||
|
container_id: container_id.to_string(),
|
||||||
|
rx_bytes: 0,
|
||||||
|
tx_bytes: 0,
|
||||||
|
rx_packets: 0,
|
||||||
|
tx_packets: 0,
|
||||||
|
rx_errors: 0,
|
||||||
|
tx_errors: 0,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gets CPU statistics for a specific container
|
||||||
|
pub async fn get_cpu_stats(
|
||||||
|
docker: &Docker,
|
||||||
|
container_id: &str,
|
||||||
|
) -> Result<ContainerCpuInfo, Box<dyn Error + Send + Sync>> {
|
||||||
|
let (cpu_info, _) = stats::get_single_container_stats(docker, container_id).await?;
|
||||||
|
|
||||||
|
if let Some(cpu_info) = cpu_info {
|
||||||
|
Ok(cpu_info)
|
||||||
|
} else {
|
||||||
|
// Return default CPU info if not found
|
||||||
|
Ok(ContainerCpuInfo {
|
||||||
|
container_id: container_id.to_string(),
|
||||||
|
cpu_usage_percent: 0.0,
|
||||||
|
system_cpu_usage: 0,
|
||||||
|
container_cpu_usage: 0,
|
||||||
|
online_cpus: 1,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@@ -1,4 +1,3 @@
|
|||||||
|
|
||||||
/// # Docker Module
|
/// # Docker Module
|
||||||
///
|
///
|
||||||
/// This module provides Docker integration for WatcherAgent, including container enumeration, statistics, and lifecycle management.
|
/// This module provides Docker integration for WatcherAgent, including container enumeration, statistics, and lifecycle management.
|
||||||
@@ -10,71 +9,195 @@
|
|||||||
///
|
///
|
||||||
pub mod container;
|
pub mod container;
|
||||||
pub mod serverclientcomm;
|
pub mod serverclientcomm;
|
||||||
|
pub mod stats;
|
||||||
|
|
||||||
|
use crate::models::{DockerContainerDto, DockerContainerMetricDto};
|
||||||
|
use bollard::{query_parameters::InspectContainerOptions, Docker};
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use crate::models::DockerContainer;
|
|
||||||
|
|
||||||
|
/// Main Docker manager that holds the Docker client and provides all operations
|
||||||
/// Aggregated Docker statistics for all managed containers.
|
|
||||||
///
|
|
||||||
/// # Fields
|
|
||||||
/// - `number`: Number of running containers (optional)
|
|
||||||
/// - `net_in_total`: Total network receive rate in **bytes per second (B/s)** (optional)
|
|
||||||
/// - `net_out_total`: Total network transmit rate in **bytes per second (B/s)** (optional)
|
|
||||||
/// - `dockers`: List of [`DockerContainer`] statistics (optional)
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct DockerInfo {
|
pub struct DockerManager {
|
||||||
pub number: Option<u16>,
|
pub docker: Docker,
|
||||||
pub net_in_total: Option<f64>,
|
|
||||||
pub net_out_total: Option<f64>,
|
|
||||||
pub dockers: Option<Vec<DockerContainer>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Default for DockerManager {
|
||||||
impl DockerInfo {
|
fn default() -> Self {
|
||||||
/// Collects Docker statistics for all managed containers.
|
Self {
|
||||||
///
|
docker: Docker::connect_with_local_defaults()
|
||||||
/// # Returns
|
.unwrap_or_else(|e| panic!("Failed to create default Docker connection: {}", e)),
|
||||||
/// * `Result<DockerInfo, Box<dyn Error + Send + Sync>>` - Aggregated Docker statistics or error if collection fails.
|
}
|
||||||
pub async fn collect() -> Result<Self, Box<dyn Error + Send + Sync>> {
|
|
||||||
Ok(Self { number: None, net_in_total: None, net_out_total: None, dockers: None })
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl DockerManager {
|
||||||
|
/// Creates a new DockerManager instance
|
||||||
|
pub fn new() -> Result<Self, Box<dyn Error + Send + Sync>> {
|
||||||
|
let docker = Docker::connect_with_local_defaults()
|
||||||
|
.map_err(|e| format!("Failed to connect to Docker: {}", e))?;
|
||||||
|
|
||||||
impl DockerContainer {
|
Ok(Self { docker })
|
||||||
/*
|
|
||||||
/// Restarts the specified Docker container by ID.
|
|
||||||
///
|
|
||||||
/// # Arguments
|
|
||||||
/// * `docker` - Reference to a Bollard Docker client
|
|
||||||
///
|
|
||||||
/// # Returns
|
|
||||||
/// * `Result<(), Box<dyn Error + Send + Sync>>` - Ok if restarted successfully, error otherwise.
|
|
||||||
pub async fn restart_container(docker: &Docker) -> Result<(), Box<dyn Error + Send + Sync>> {
|
|
||||||
// ...existing code...
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
/// Returns the container ID for a given [`DockerContainer`].
|
|
||||||
///
|
|
||||||
/// # Arguments
|
|
||||||
/// * `container` - Reference to a [`DockerContainer`]
|
|
||||||
///
|
|
||||||
/// # Returns
|
|
||||||
/// * `Result<u32, Box<dyn Error + Send + Sync>>` - Container ID as integer.
|
|
||||||
pub async fn get_docker_container_id(container: DockerContainer) -> Result<u32, Box<dyn Error + Send + Sync>> {
|
|
||||||
Ok(container.ID)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the image name for a given [`DockerContainer`].
|
/// Creates a DockerManager instance with optional Docker connection
|
||||||
///
|
pub fn new_optional() -> Option<Self> {
|
||||||
/// # Arguments
|
Docker::connect_with_local_defaults()
|
||||||
/// * `container` - Reference to a [`DockerContainer`]
|
.map(|docker| Self { docker })
|
||||||
///
|
.ok()
|
||||||
/// # Returns
|
|
||||||
/// * `Result<String, Box<dyn Error + Send + Sync>>` - Image name as string.
|
|
||||||
pub async fn get_docker_container_image(container: DockerContainer) -> Result<String, Box<dyn Error + Send + Sync>> {
|
|
||||||
Ok(container.image)
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
/// Finds the Docker container running the agent by image name
|
||||||
|
pub async fn get_client_container(
|
||||||
|
&self,
|
||||||
|
) -> Result<Option<DockerContainerDto>, Box<dyn Error + Send + Sync>> {
|
||||||
|
let containers = container::get_available_containers(&self.docker).await;
|
||||||
|
let client_image = "watcher-agent";
|
||||||
|
|
||||||
|
Ok(containers
|
||||||
|
.into_iter()
|
||||||
|
.find(|c| c.image.contains(client_image))
|
||||||
|
.map(|container| DockerContainerDto {
|
||||||
|
id: container.id,
|
||||||
|
image: container.image,
|
||||||
|
name: container.name,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gets the current client version (image name) if running in Docker
|
||||||
|
pub async fn get_client_version(&self) -> String {
|
||||||
|
match self.get_client_container().await {
|
||||||
|
Ok(Some(container)) => container.image.split(':').next().unwrap_or("unknown").to_string(),
|
||||||
|
Ok(None) => {
|
||||||
|
println!("Warning: No WatcherAgent container found");
|
||||||
|
"unknown".to_string()
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
println!("Warning: Could not get current image version: {}", e);
|
||||||
|
"unknown".to_string()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Checks if Docker is available and the agent is running in a container
|
||||||
|
pub async fn is_dockerized(&self) -> bool {
|
||||||
|
self.get_client_container()
|
||||||
|
.await
|
||||||
|
.map(|c| c.is_some())
|
||||||
|
.unwrap_or(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gets all available containers as DTOs for registration
|
||||||
|
pub async fn get_containers_for_registration(
|
||||||
|
&self,
|
||||||
|
) -> Result<Vec<DockerContainerDto>, Box<dyn Error + Send + Sync>> {
|
||||||
|
let containers = container::get_available_containers(&self.docker).await;
|
||||||
|
|
||||||
|
Ok(containers
|
||||||
|
.into_iter()
|
||||||
|
.map(|container| DockerContainerDto {
|
||||||
|
id: container.id,
|
||||||
|
image: container.image,
|
||||||
|
name: container.name,
|
||||||
|
})
|
||||||
|
.collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gets container metrics for all containers
|
||||||
|
pub async fn get_container_metrics(
|
||||||
|
&self,
|
||||||
|
) -> Result<Vec<DockerContainerMetricDto>, Box<dyn Error + Send + Sync>> {
|
||||||
|
let containers = container::get_available_containers(&self.docker).await;
|
||||||
|
let mut metrics = Vec::new();
|
||||||
|
|
||||||
|
for container in containers {
|
||||||
|
// Get network stats (you'll need to implement this in container.rs)
|
||||||
|
let network_stats = container::get_network_stats(&self.docker, &container.id).await?;
|
||||||
|
// Get CPU stats (you'll need to implement this in container.rs)
|
||||||
|
let cpu_stats = container::get_cpu_stats(&self.docker, &container.id).await?;
|
||||||
|
|
||||||
|
// Get current status by inspecting the container
|
||||||
|
let status = match self
|
||||||
|
.docker
|
||||||
|
.inspect_container(&container.id, None::<InspectContainerOptions>)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(container_info) => {
|
||||||
|
// Extract status from container state and convert to string
|
||||||
|
container_info
|
||||||
|
.state
|
||||||
|
.and_then(|state| state.status)
|
||||||
|
.map(|status_enum| {
|
||||||
|
match status_enum {
|
||||||
|
bollard::models::ContainerStateStatusEnum::CREATED => "created",
|
||||||
|
bollard::models::ContainerStateStatusEnum::RUNNING => "running",
|
||||||
|
bollard::models::ContainerStateStatusEnum::PAUSED => "paused",
|
||||||
|
bollard::models::ContainerStateStatusEnum::RESTARTING => {
|
||||||
|
"restarting"
|
||||||
|
}
|
||||||
|
bollard::models::ContainerStateStatusEnum::REMOVING => "removing",
|
||||||
|
bollard::models::ContainerStateStatusEnum::EXITED => "exited",
|
||||||
|
bollard::models::ContainerStateStatusEnum::DEAD => "dead",
|
||||||
|
bollard::secret::ContainerStateStatusEnum::EMPTY => todo!(),
|
||||||
|
}
|
||||||
|
.to_string()
|
||||||
|
})
|
||||||
|
.unwrap_or_else(|| "unknown".to_string())
|
||||||
|
}
|
||||||
|
Err(_) => "unknown".to_string(),
|
||||||
|
};
|
||||||
|
|
||||||
|
metrics.push(DockerContainerMetricDto {
|
||||||
|
id: container.id,
|
||||||
|
status: status,
|
||||||
|
network: network_stats,
|
||||||
|
cpu: cpu_stats,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(metrics)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gets the number of running containers
|
||||||
|
pub async fn get_container_count(&self) -> Result<usize, Box<dyn Error + Send + Sync>> {
|
||||||
|
let containers = container::get_available_containers(&self.docker).await;
|
||||||
|
Ok(containers.len())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Restarts a specific container by ID
|
||||||
|
pub async fn restart_container(
|
||||||
|
&self,
|
||||||
|
container_id: &str,
|
||||||
|
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||||
|
container::restart_container(&self.docker, container_id).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gets total network statistics across all containers
|
||||||
|
pub async fn get_total_network_stats(
|
||||||
|
&self,
|
||||||
|
) -> Result<(u64, u64), Box<dyn Error + Send + Sync>> {
|
||||||
|
let metrics = self.get_container_metrics().await?;
|
||||||
|
|
||||||
|
let net_in_total: u64 = metrics.iter().map(|m| m.network.rx_bytes).sum();
|
||||||
|
let net_out_total: u64 = metrics.iter().map(|m| m.network.tx_bytes).sum();
|
||||||
|
|
||||||
|
Ok((net_in_total, net_out_total))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Keep these as utility functions if needed, but they should use DockerManager internally
|
||||||
|
impl DockerContainerDto {
|
||||||
|
/// Returns the container ID
|
||||||
|
pub fn id(&self) -> &str {
|
||||||
|
&self.id
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the image name
|
||||||
|
pub fn image(&self) -> &str {
|
||||||
|
&self.image
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the container name
|
||||||
|
pub fn name(&self) -> &str {
|
||||||
|
&self.name
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@@ -1,15 +1,13 @@
|
|||||||
|
|
||||||
//! Server-client communication utilities for WatcherAgent
|
//! Server-client communication utilities for WatcherAgent
|
||||||
//!
|
//!
|
||||||
//! Handles server commands, Docker image updates, and container management using the Bollard library.
|
//! Handles server commands, Docker image updates, and container management using the Bollard library.
|
||||||
//!
|
//!
|
||||||
use crate::models::{DockerContainer, ServerMessage};
|
use crate::models::ServerMessage;
|
||||||
use crate::docker::container::{get_available_container};
|
|
||||||
|
|
||||||
use std::error::Error;
|
use super::container::{restart_container, update_docker_image};
|
||||||
use bollard::Docker;
|
|
||||||
use bollard::query_parameters::{CreateImageOptions, RestartContainerOptions};
|
use bollard::query_parameters::{CreateImageOptions, RestartContainerOptions};
|
||||||
use futures_util::StreamExt;
|
use bollard::Docker;
|
||||||
|
use std::error::Error;
|
||||||
|
|
||||||
/// Handles a message from the backend server and dispatches the appropriate action.
|
/// Handles a message from the backend server and dispatches the appropriate action.
|
||||||
///
|
///
|
||||||
@@ -19,7 +17,10 @@ use futures_util::StreamExt;
|
|||||||
///
|
///
|
||||||
/// # Returns
|
/// # Returns
|
||||||
/// * `Result<(), Box<dyn Error + Send + Sync>>` - Ok if handled successfully, error otherwise.
|
/// * `Result<(), Box<dyn Error + Send + Sync>>` - Ok if handled successfully, error otherwise.
|
||||||
pub async fn handle_server_message(docker: &Docker, msg: ServerMessage) -> Result<(), Box<dyn Error + Send + Sync>> {
|
pub async fn handle_server_message(
|
||||||
|
docker: &Docker,
|
||||||
|
msg: ServerMessage,
|
||||||
|
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||||
let msg = msg.clone();
|
let msg = msg.clone();
|
||||||
println!("Handling server message: {:?}", msg);
|
println!("Handling server message: {:?}", msg);
|
||||||
|
|
||||||
@@ -36,10 +37,14 @@ pub async fn handle_server_message(docker: &Docker, msg: ServerMessage) -> Resul
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
"restart_container" => {
|
"restart_container" => {
|
||||||
println!("Received restart container command");
|
if let Some(image_name) = msg.data.get("image").and_then(|v| v.as_str()) {
|
||||||
// Call your restart_container function here
|
println!("Received restart command for image: {}", image_name);
|
||||||
restart_container(docker).await?;
|
// Call your update_docker_image function here
|
||||||
Ok(())
|
update_docker_image(docker, image_name).await?;
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err("Missing image name in update message".into())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
"stop_agent" => {
|
"stop_agent" => {
|
||||||
println!("Received stop agent command");
|
println!("Received stop agent command");
|
||||||
@@ -52,87 +57,3 @@ pub async fn handle_server_message(docker: &Docker, msg: ServerMessage) -> Resul
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Pulls a new Docker image and restarts the current container.
|
|
||||||
///
|
|
||||||
/// # Arguments
|
|
||||||
/// * `docker` - Reference to a Bollard Docker client.
|
|
||||||
/// * `image` - The name of the Docker image to pull.
|
|
||||||
///
|
|
||||||
/// # Returns
|
|
||||||
/// * `Result<(), Box<dyn Error + Send + Sync>>` - Ok if updated successfully, error otherwise.
|
|
||||||
pub async fn update_docker_image(docker: &Docker, image: &str) -> Result<(), Box<dyn Error + Send + Sync>> {
|
|
||||||
println!("Updating to {}", image);
|
|
||||||
|
|
||||||
// 1. Pull new image
|
|
||||||
let mut stream = docker.create_image(
|
|
||||||
Some(CreateImageOptions {
|
|
||||||
from_image: Some(image.to_string()),
|
|
||||||
..Default::default()
|
|
||||||
}),
|
|
||||||
None,
|
|
||||||
None,
|
|
||||||
);
|
|
||||||
|
|
||||||
// Use the stream with proper trait bounds
|
|
||||||
while let Some(result) = StreamExt::next(&mut stream).await {
|
|
||||||
match result {
|
|
||||||
Ok(progress) => {
|
|
||||||
if let Some(status) = progress.status {
|
|
||||||
println!("Pull status: {}", status);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
eprintln!("Error pulling image: {}", e);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 2. Restart the current container
|
|
||||||
let _ = restart_container(docker).await;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Finds the Docker container running the agent by image name.
|
|
||||||
///
|
|
||||||
/// # Arguments
|
|
||||||
/// * `docker` - Reference to a Bollard Docker client.
|
|
||||||
///
|
|
||||||
/// # Returns
|
|
||||||
/// * `Result<Option<DockerContainer>, Box<dyn Error + Send + Sync>>` - The agent's container info if found.
|
|
||||||
pub async fn get_client_container(docker: &Docker) -> Result<Option<DockerContainer>, Box<dyn Error + Send + Sync>> {
|
|
||||||
let containers = get_available_container(docker).await;
|
|
||||||
let client_image = "watcher-agent";
|
|
||||||
|
|
||||||
// Find container with the specific image
|
|
||||||
if let Some(container) = containers.iter().find(|c| c.image == client_image) {
|
|
||||||
Ok(Some(container.clone()))
|
|
||||||
} else {
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Restarts the agent's own Docker container.
|
|
||||||
///
|
|
||||||
/// # Arguments
|
|
||||||
/// * `docker` - Reference to a Bollard Docker client.
|
|
||||||
///
|
|
||||||
/// # Returns
|
|
||||||
/// * `Result<(), Box<dyn Error + Send + Sync>>` - Ok if restarted successfully, error otherwise.
|
|
||||||
pub async fn restart_container(docker: &Docker) -> Result<(), Box<dyn Error + Send + Sync>> {
|
|
||||||
if let Ok(Some(container)) = get_client_container(docker).await {
|
|
||||||
let container_id = container.clone().ID;
|
|
||||||
println!("Restarting container {}", container_id);
|
|
||||||
if let Err(e) = docker.restart_container(&container_id.to_string(), Some(RestartContainerOptions { signal: None, t: Some(0) }))
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
eprintln!("Failed to restart container: {}", e);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
eprintln!("No container ID found (HOSTNAME not set?)");
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
206
WatcherAgent/src/docker/stats.rs
Normal file
206
WatcherAgent/src/docker/stats.rs
Normal file
@@ -0,0 +1,206 @@
|
|||||||
|
use bollard::query_parameters::{ListContainersOptions, StatsOptions};
|
||||||
|
use bollard::Docker;
|
||||||
|
use futures_util::stream::TryStreamExt;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::error::Error;
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||||
|
pub struct ContainerCpuInfo {
|
||||||
|
pub container_id: String,
|
||||||
|
pub cpu_usage_percent: f64,
|
||||||
|
pub system_cpu_usage: u64,
|
||||||
|
pub container_cpu_usage: u64,
|
||||||
|
pub online_cpus: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||||
|
pub struct ContainerNetworkInfo {
|
||||||
|
pub container_id: String,
|
||||||
|
pub rx_bytes: u64,
|
||||||
|
pub tx_bytes: u64,
|
||||||
|
pub rx_packets: u64,
|
||||||
|
pub tx_packets: u64,
|
||||||
|
pub rx_errors: u64,
|
||||||
|
pub tx_errors: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get container statistics for all containers using an existing Docker client
|
||||||
|
pub async fn get_container_stats(
|
||||||
|
docker: &Docker,
|
||||||
|
) -> Result<(Vec<ContainerCpuInfo>, Vec<ContainerNetworkInfo>), Box<dyn Error + Send + Sync>> {
|
||||||
|
let containers = docker
|
||||||
|
.list_containers(Some(ListContainersOptions {
|
||||||
|
all: true,
|
||||||
|
..Default::default()
|
||||||
|
}))
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let mut cpu_infos = Vec::new();
|
||||||
|
let mut net_infos = Vec::new();
|
||||||
|
|
||||||
|
for container in containers {
|
||||||
|
let id = container.id.unwrap_or_default();
|
||||||
|
|
||||||
|
// Skip if no ID
|
||||||
|
if id.is_empty() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut stats_stream = docker.stats(
|
||||||
|
&id,
|
||||||
|
Some(StatsOptions {
|
||||||
|
stream: false,
|
||||||
|
one_shot: true,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
if let Some(stats) = stats_stream.try_next().await? {
|
||||||
|
// CPU Info
|
||||||
|
if let (Some(cpu_stats), Some(precpu_stats)) = (&stats.cpu_stats, &stats.precpu_stats) {
|
||||||
|
if let (Some(cpu_usage), Some(pre_cpu_usage)) =
|
||||||
|
(&cpu_stats.cpu_usage, &precpu_stats.cpu_usage)
|
||||||
|
{
|
||||||
|
let cpu_delta = cpu_usage
|
||||||
|
.total_usage
|
||||||
|
.unwrap_or(0)
|
||||||
|
.saturating_sub(pre_cpu_usage.total_usage.unwrap_or(0));
|
||||||
|
|
||||||
|
let system_delta = cpu_stats
|
||||||
|
.system_cpu_usage
|
||||||
|
.unwrap_or(0)
|
||||||
|
.saturating_sub(precpu_stats.system_cpu_usage.unwrap_or(0));
|
||||||
|
|
||||||
|
let online_cpus = cpu_stats.online_cpus.unwrap_or(1);
|
||||||
|
|
||||||
|
let cpu_percent = if system_delta > 0 && online_cpus > 0 {
|
||||||
|
(cpu_delta as f64 / system_delta as f64) * online_cpus as f64 * 100.0
|
||||||
|
} else {
|
||||||
|
0.0
|
||||||
|
};
|
||||||
|
|
||||||
|
cpu_infos.push(ContainerCpuInfo {
|
||||||
|
container_id: id.clone(),
|
||||||
|
cpu_usage_percent: cpu_percent,
|
||||||
|
system_cpu_usage: cpu_stats.system_cpu_usage.unwrap_or(0),
|
||||||
|
container_cpu_usage: cpu_usage.total_usage.unwrap_or(0),
|
||||||
|
online_cpus,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Network Info
|
||||||
|
if let Some(networks) = stats.networks {
|
||||||
|
for (_name, net) in networks {
|
||||||
|
net_infos.push(ContainerNetworkInfo {
|
||||||
|
container_id: id.clone(),
|
||||||
|
rx_bytes: net.rx_bytes.unwrap(),
|
||||||
|
tx_bytes: net.tx_bytes.unwrap(),
|
||||||
|
rx_packets: net.rx_packets.unwrap(),
|
||||||
|
tx_packets: net.tx_packets.unwrap(),
|
||||||
|
rx_errors: net.rx_errors.unwrap(),
|
||||||
|
tx_errors: net.tx_errors.unwrap(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok((cpu_infos, net_infos))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get container statistics for a specific container
|
||||||
|
pub async fn get_single_container_stats(
|
||||||
|
docker: &Docker,
|
||||||
|
container_id: &str,
|
||||||
|
) -> Result<(Option<ContainerCpuInfo>, Option<ContainerNetworkInfo>), Box<dyn Error + Send + Sync>>
|
||||||
|
{
|
||||||
|
let mut stats_stream = docker.stats(
|
||||||
|
container_id,
|
||||||
|
Some(StatsOptions {
|
||||||
|
stream: false,
|
||||||
|
one_shot: true,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
if let Some(stats) = stats_stream.try_next().await? {
|
||||||
|
let mut cpu_info = None;
|
||||||
|
let mut net_info = None;
|
||||||
|
|
||||||
|
// CPU Info
|
||||||
|
if let (Some(cpu_stats), Some(precpu_stats)) = (&stats.cpu_stats, &stats.precpu_stats) {
|
||||||
|
if let (Some(cpu_usage), Some(pre_cpu_usage)) =
|
||||||
|
(&cpu_stats.cpu_usage, &precpu_stats.cpu_usage)
|
||||||
|
{
|
||||||
|
let cpu_delta = cpu_usage
|
||||||
|
.total_usage
|
||||||
|
.unwrap_or(0)
|
||||||
|
.saturating_sub(pre_cpu_usage.total_usage.unwrap_or(0));
|
||||||
|
|
||||||
|
let system_delta = cpu_stats
|
||||||
|
.system_cpu_usage
|
||||||
|
.unwrap_or(0)
|
||||||
|
.saturating_sub(precpu_stats.system_cpu_usage.unwrap_or(0));
|
||||||
|
|
||||||
|
let online_cpus = cpu_stats.online_cpus.unwrap_or(1);
|
||||||
|
|
||||||
|
let cpu_percent = if system_delta > 0 && online_cpus > 0 {
|
||||||
|
(cpu_delta as f64 / system_delta as f64) * online_cpus as f64 * 100.0
|
||||||
|
} else {
|
||||||
|
0.0
|
||||||
|
};
|
||||||
|
|
||||||
|
cpu_info = Some(ContainerCpuInfo {
|
||||||
|
container_id: container_id.to_string(),
|
||||||
|
cpu_usage_percent: cpu_percent,
|
||||||
|
system_cpu_usage: cpu_stats.system_cpu_usage.unwrap_or(0),
|
||||||
|
container_cpu_usage: cpu_usage.total_usage.unwrap_or(0),
|
||||||
|
online_cpus,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Network Info
|
||||||
|
if let Some(networks) = stats.networks {
|
||||||
|
// Take the first network interface (usually eth0)
|
||||||
|
if let Some((_name, net)) = networks.into_iter().next() {
|
||||||
|
net_info = Some(ContainerNetworkInfo {
|
||||||
|
container_id: container_id.to_string(),
|
||||||
|
rx_bytes: net.rx_bytes.unwrap(),
|
||||||
|
tx_bytes: net.tx_bytes.unwrap(),
|
||||||
|
rx_packets: net.rx_packets.unwrap(),
|
||||||
|
tx_packets: net.tx_packets.unwrap(),
|
||||||
|
rx_errors: net.rx_errors.unwrap(),
|
||||||
|
tx_errors: net.tx_errors.unwrap(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok((cpu_info, net_info))
|
||||||
|
} else {
|
||||||
|
Ok((None, None))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get total network statistics across all containers
|
||||||
|
pub async fn get_total_network_stats(
|
||||||
|
docker: &Docker,
|
||||||
|
) -> Result<(u64, u64), Box<dyn Error + Send + Sync>> {
|
||||||
|
let (_, net_infos) = get_container_stats(docker).await?;
|
||||||
|
|
||||||
|
let total_rx: u64 = net_infos.iter().map(|net| net.rx_bytes).sum();
|
||||||
|
let total_tx: u64 = net_infos.iter().map(|net| net.tx_bytes).sum();
|
||||||
|
|
||||||
|
Ok((total_rx, total_tx))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get average CPU usage across all containers
|
||||||
|
pub async fn get_average_cpu_usage(docker: &Docker) -> Result<f64, Box<dyn Error + Send + Sync>> {
|
||||||
|
let (cpu_infos, _) = get_container_stats(docker).await?;
|
||||||
|
|
||||||
|
if cpu_infos.is_empty() {
|
||||||
|
return Ok(0.0);
|
||||||
|
}
|
||||||
|
|
||||||
|
let total_cpu: f64 = cpu_infos.iter().map(|cpu| cpu.cpu_usage_percent).sum();
|
||||||
|
Ok(total_cpu / cpu_infos.len() as f64)
|
||||||
|
}
|
@@ -1,4 +1,3 @@
|
|||||||
|
|
||||||
/// # WatcherAgent
|
/// # WatcherAgent
|
||||||
///
|
///
|
||||||
/// **WatcherAgent** is a cross-platform system monitoring agent written in Rust.
|
/// **WatcherAgent** is a cross-platform system monitoring agent written in Rust.
|
||||||
@@ -26,18 +25,16 @@
|
|||||||
/// ```
|
/// ```
|
||||||
///
|
///
|
||||||
/// The agent will register itself, start collecting metrics, and listen for remote commands.
|
/// The agent will register itself, start collecting metrics, and listen for remote commands.
|
||||||
|
|
||||||
pub mod api;
|
pub mod api;
|
||||||
|
pub mod docker;
|
||||||
pub mod hardware;
|
pub mod hardware;
|
||||||
pub mod metrics;
|
pub mod metrics;
|
||||||
pub mod models;
|
pub mod models;
|
||||||
pub mod docker;
|
|
||||||
use tokio::task::JoinHandle;
|
|
||||||
use bollard::Docker;
|
use bollard::Docker;
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
|
use tokio::task::JoinHandle;
|
||||||
|
|
||||||
|
|
||||||
/// Awaits a spawned asynchronous task and flattens its nested `Result` type.
|
/// Awaits a spawned asynchronous task and flattens its nested `Result` type.
|
||||||
///
|
///
|
||||||
@@ -82,26 +79,8 @@ async fn flatten<T>(
|
|||||||
/// Returns an error if registration or any background task fails, or if required arguments are missing.
|
/// Returns an error if registration or any background task fails, or if required arguments are missing.
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
|
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||||
// Initialize Docker client
|
// Parse command-line arguments
|
||||||
let docker = Docker::connect_with_local_defaults()
|
|
||||||
.map_err(|e| format!("Failed to connect to Docker: {}", e))?;
|
|
||||||
|
|
||||||
// Get current image version
|
|
||||||
let client_version = match docker::serverclientcomm::get_client_container(&docker).await {
|
|
||||||
Ok(Some(version)) => version.image,
|
|
||||||
Ok(None) => {
|
|
||||||
eprintln!("Warning: No image version found");
|
|
||||||
"unknown".to_string()
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
eprintln!("Warning: Could not get current image version: {}", e);
|
|
||||||
"unknown".to_string()
|
|
||||||
}
|
|
||||||
};
|
|
||||||
println!("Client Version: {}", client_version);
|
|
||||||
|
|
||||||
let args: Vec<String> = env::args().collect();
|
let args: Vec<String> = env::args().collect();
|
||||||
// args[0] is the binary name, args[1] is the first actual argument
|
|
||||||
if args.len() < 2 {
|
if args.len() < 2 {
|
||||||
eprintln!("Usage: {} <server-url>", args[0]);
|
eprintln!("Usage: {} <server-url>", args[0]);
|
||||||
return Err("Missing server URL argument".into());
|
return Err("Missing server URL argument".into());
|
||||||
@@ -111,20 +90,39 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
|
|||||||
|
|
||||||
// Registration with backend server
|
// Registration with backend server
|
||||||
let (server_id, ip) = match api::register_with_server(&server_url).await {
|
let (server_id, ip) = match api::register_with_server(&server_url).await {
|
||||||
Ok((id, ip)) => (id, ip),
|
Ok((id, ip)) => {
|
||||||
|
println!("Registered with server. ID: {}, IP: {}", id, ip);
|
||||||
|
(id, ip)
|
||||||
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
eprintln!("Fehler bei der Registrierung am Server: {e}");
|
eprintln!("Fehler bei der Registrierung am Server: {e}");
|
||||||
return Err(e);
|
return Err(e);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Initialize Docker (optional - agent can run without Docker)
|
||||||
|
let docker_manager = docker::DockerManager::new_optional();
|
||||||
|
|
||||||
|
// Get current image version
|
||||||
|
let client_version = if let Some(ref docker_manager) = docker_manager {
|
||||||
|
docker_manager.get_client_version().await
|
||||||
|
} else {
|
||||||
|
"unknown".to_string()
|
||||||
|
};
|
||||||
|
println!("Client Version: {}", client_version);
|
||||||
|
|
||||||
// Start background tasks
|
// Start background tasks
|
||||||
// Start server listening for commands
|
// Start server listening for commands (only if Docker is available)
|
||||||
let listening_handle = tokio::spawn({
|
let listening_handle = if let Some(docker_manager) = docker_manager {
|
||||||
let docker = docker.clone();
|
tokio::spawn({
|
||||||
let server_url = server_url.to_string();
|
let docker = docker_manager.docker.clone();
|
||||||
async move { api::listening_to_server(&docker, &server_url).await }
|
let server_url = server_url.to_string();
|
||||||
});
|
async move { api::listening_to_server(&docker, &server_url).await }
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
println!("Docker not available, skipping server command listener.");
|
||||||
|
tokio::spawn(async { Ok(()) }) // Dummy task
|
||||||
|
};
|
||||||
|
|
||||||
// Start heartbeat in background
|
// Start heartbeat in background
|
||||||
let heartbeat_handle = tokio::spawn({
|
let heartbeat_handle = tokio::spawn({
|
||||||
|
@@ -1,5 +1,3 @@
|
|||||||
|
|
||||||
|
|
||||||
/// # Metrics Module
|
/// # Metrics Module
|
||||||
///
|
///
|
||||||
/// This module orchestrates the collection and reporting of hardware and network metrics for WatcherAgent.
|
/// This module orchestrates the collection and reporting of hardware and network metrics for WatcherAgent.
|
||||||
@@ -15,11 +13,11 @@ use std::error::Error;
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use crate::api;
|
use crate::api;
|
||||||
|
//use crate::docker::DockerInfo;
|
||||||
use crate::hardware::network::NetworkMonitor;
|
use crate::hardware::network::NetworkMonitor;
|
||||||
use crate::hardware::HardwareInfo;
|
use crate::hardware::HardwareInfo;
|
||||||
use crate::models::MetricDto;
|
use crate::models::MetricDto;
|
||||||
|
|
||||||
|
|
||||||
/// Main orchestrator for hardware and network metric collection and reporting.
|
/// Main orchestrator for hardware and network metric collection and reporting.
|
||||||
///
|
///
|
||||||
/// The `Collector` struct manages the state required to collect metrics and send them to the backend server. It maintains a network monitor for bandwidth tracking, the agent's server ID, and its IP address.
|
/// The `Collector` struct manages the state required to collect metrics and send them to the backend server. It maintains a network monitor for bandwidth tracking, the agent's server ID, and its IP address.
|
||||||
@@ -34,7 +32,6 @@ pub struct Collector {
|
|||||||
ip_address: String,
|
ip_address: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
impl Collector {
|
impl Collector {
|
||||||
/// Creates a new `Collector` instance for metric collection and reporting.
|
/// Creates a new `Collector` instance for metric collection and reporting.
|
||||||
///
|
///
|
||||||
@@ -109,7 +106,7 @@ impl Collector {
|
|||||||
ram_load: hardware.memory.current_load.unwrap_or_default(),
|
ram_load: hardware.memory.current_load.unwrap_or_default(),
|
||||||
ram_size: hardware.memory.total_size.unwrap_or_default(),
|
ram_size: hardware.memory.total_size.unwrap_or_default(),
|
||||||
disk_size: hardware.disk.total_size.unwrap_or_default(),
|
disk_size: hardware.disk.total_size.unwrap_or_default(),
|
||||||
disk_usage: hardware.disk.total_used.unwrap_or_default(),
|
disk_usage: hardware.disk.total_usage.unwrap_or_default(),
|
||||||
disk_temp: 0.0, // not supported
|
disk_temp: 0.0, // not supported
|
||||||
net_rx: hardware.network.rx_rate.unwrap_or_default(),
|
net_rx: hardware.network.rx_rate.unwrap_or_default(),
|
||||||
net_tx: hardware.network.tx_rate.unwrap_or_default(),
|
net_tx: hardware.network.tx_rate.unwrap_or_default(),
|
||||||
|
@@ -1,5 +1,3 @@
|
|||||||
|
|
||||||
|
|
||||||
/// # Models Module
|
/// # Models Module
|
||||||
///
|
///
|
||||||
/// This module defines all data structures (DTOs) used for communication between WatcherAgent and the backend server, as well as hardware metrics and Docker container info.
|
/// This module defines all data structures (DTOs) used for communication between WatcherAgent and the backend server, as well as hardware metrics and Docker container info.
|
||||||
@@ -11,6 +9,8 @@
|
|||||||
///
|
///
|
||||||
/// ## Usage
|
/// ## Usage
|
||||||
/// These types are serialized/deserialized for HTTP communication and used throughout the agent for data exchange.
|
/// These types are serialized/deserialized for HTTP communication and used throughout the agent for data exchange.
|
||||||
|
use crate::docker::stats;
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
/// Registration data sent to the backend server.
|
/// Registration data sent to the backend server.
|
||||||
@@ -25,7 +25,7 @@ use serde::{Deserialize, Serialize};
|
|||||||
#[derive(Serialize, Debug)]
|
#[derive(Serialize, Debug)]
|
||||||
pub struct RegistrationDto {
|
pub struct RegistrationDto {
|
||||||
#[serde(rename = "id")]
|
#[serde(rename = "id")]
|
||||||
pub id: i32,
|
pub server_id: i32,
|
||||||
#[serde(rename = "ipAddress")]
|
#[serde(rename = "ipAddress")]
|
||||||
pub ip_address: String,
|
pub ip_address: String,
|
||||||
#[serde(rename = "cpuType")]
|
#[serde(rename = "cpuType")]
|
||||||
@@ -186,12 +186,22 @@ pub struct Acknowledgment {
|
|||||||
/// - `_net_out`: Network transmit rate in **bytes per second (B/s)**
|
/// - `_net_out`: Network transmit rate in **bytes per second (B/s)**
|
||||||
/// - `_cpu_load`: CPU usage as a percentage (**0.0–100.0**)
|
/// - `_cpu_load`: CPU usage as a percentage (**0.0–100.0**)
|
||||||
#[derive(Debug, Serialize, Clone)]
|
#[derive(Debug, Serialize, Clone)]
|
||||||
pub struct DockerContainer {
|
pub struct DockerContainerRegistrationDto {
|
||||||
pub ID: u32,
|
pub server_id: u32,
|
||||||
|
pub containers: Vec<DockerContainerDto>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Clone)]
|
||||||
|
pub struct DockerContainerDto {
|
||||||
|
pub id: String,
|
||||||
pub image: String,
|
pub image: String,
|
||||||
pub Name: String,
|
pub name: String,
|
||||||
pub Status: String, // "running";"stopped";others
|
}
|
||||||
pub _net_in: f64,
|
|
||||||
pub _net_out: f64,
|
#[derive(Debug, Serialize, Clone)]
|
||||||
pub _cpu_load: f64,
|
pub struct DockerContainerMetricDto {
|
||||||
}
|
pub id: String,
|
||||||
|
pub status: String, // "running";"stopped";others
|
||||||
|
pub network: stats::ContainerNetworkInfo,
|
||||||
|
pub cpu: stats::ContainerCpuInfo,
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user