Compare commits
23 Commits
v0.1.16 ... enhancement

SHA1
a095444222
5e7bc3df54
1c7a169956
c7bce926e9
711083daa0
06cec6ff9f
a7cae5e93f
66428863e6
b35cac0dbe
bb55b46c34
76d54cb433
4dc8c56a5c
c81040b16b
68c307e258
5d00f072d8
9072e253ec
063ad113d7
097be0f555
c7b8e24a54
fb1d016b36
89d58e9696
45c6e8180c
c395885bbd
@@ -3,10 +3,8 @@ name: Rust Cross-Platform Build
on:
workflow_dispatch:
push:
branches: [ "development", "main", "feature/*", "bugfix/*", "enhancement/*" ]
branches: [ "development", "main", "staging" ]
tags: [ "v*.*.*" ]
pull_request:
branches: [ "development", "main" ]

env:
REGISTRY: git.triggermeelmo.com
@@ -76,44 +74,6 @@ jobs:
|
||||
# working-directory: ${{ needs.detect-project.outputs.project-dir }}
|
||||
# run: cargo clippy -- -D warnings
|
||||
|
||||
set-tag:
|
||||
name: Set Tag Name
|
||||
runs-on: ubuntu-latest
|
||||
outputs:
|
||||
tag_name: ${{ steps.set_tag.outputs.tag_name }}
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Determine next semantic version tag
|
||||
id: set_tag
|
||||
run: |
|
||||
git fetch --tags
|
||||
|
||||
# Find latest tag matching vX.Y.Z
|
||||
latest_tag=$(git tag --list 'v*.*.*' --sort=-v:refname | head -n 1)
|
||||
if [[ -z "$latest_tag" ]]; then
|
||||
major=0
|
||||
minor=0
|
||||
patch=0
|
||||
else
|
||||
version="${latest_tag#v}"
|
||||
IFS='.' read -r major minor patch <<< "$version"
|
||||
fi
|
||||
|
||||
if [[ "${GITHUB_REF}" == "refs/heads/main" ]]; then
|
||||
major=$((major + 1))
|
||||
minor=0
|
||||
patch=0
|
||||
elif [[ "${GITHUB_REF}" == "refs/heads/development" ]]; then
|
||||
minor=$((minor + 1))
|
||||
patch=0
|
||||
else
|
||||
patch=$((patch + 1))
|
||||
fi
|
||||
|
||||
new_tag="v${major}.${minor}.${patch}"
|
||||
echo "tag_name=${new_tag}" >> $GITHUB_OUTPUT
|
||||
|
||||
# audit:
|
||||
# name: Security Audit
|
||||
# needs: [detect-project]
|
||||
@@ -189,14 +149,66 @@ jobs:
|
||||
path: |
|
||||
${{ needs.detect-project.outputs.project-dir }}/target/${{ matrix.target }}/release/${{ needs.detect-project.outputs.project-name }}${{ matrix.os == 'windows' && '.exe' || '' }}
|
||||
|
||||
set-tag:
|
||||
name: Set Tag Name
|
||||
needs: [detect-project, build]
|
||||
#if: ${{ !failure() && !cancelled() && github.event_name != 'pull_request' }}
|
||||
runs-on: ubuntu-latest
|
||||
outputs:
|
||||
tag_name: ${{ steps.set_tag.outputs.tag_name }}
|
||||
should_tag: ${{ steps.set_tag.outputs.should_tag }}
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Determine next semantic version tag
|
||||
id: set_tag
|
||||
run: |
|
||||
git fetch --tags
|
||||
|
||||
# Find latest tag matching vX.Y.Z
|
||||
latest_tag=$(git tag --list 'v*.*.*' --sort=-v:refname | head -n 1)
|
||||
if [[ -z "$latest_tag" ]]; then
|
||||
major=0
|
||||
minor=0
|
||||
patch=0
|
||||
else
|
||||
version="${latest_tag#v}"
|
||||
IFS='.' read -r major minor patch <<< "$version"
|
||||
fi
|
||||
|
||||
if [[ "${GITHUB_REF}" == "refs/heads/main" ]]; then
|
||||
major=$((major + 1))
|
||||
minor=0
|
||||
patch=0
|
||||
new_tag="v${major}.${minor}.${patch}"
|
||||
echo "tag_name=${new_tag}" >> $GITHUB_OUTPUT
|
||||
echo "should_tag=true" >> $GITHUB_OUTPUT
|
||||
echo "Creating new major version tag: ${new_tag}"
|
||||
|
||||
elif [[ "${GITHUB_REF}" == "refs/heads/development" ]]; then
|
||||
minor=$((minor + 1))
|
||||
patch=0
|
||||
new_tag="v${major}.${minor}.${patch}"
|
||||
echo "tag_name=${new_tag}" >> $GITHUB_OUTPUT
|
||||
echo "should_tag=true" >> $GITHUB_OUTPUT
|
||||
echo "Creating new minor version tag: ${new_tag}"
|
||||
|
||||
elif [[ "${GITHUB_REF}" == "refs/heads/staging" ]]; then
|
||||
patch=$((patch + 1))
|
||||
new_tag="v${major}.${minor}.${patch}"
|
||||
echo "tag_name=${new_tag}" >> $GITHUB_OUTPUT
|
||||
echo "should_tag=true" >> $GITHUB_OUTPUT
|
||||
echo "Creating new patch version tag: ${new_tag}"
|
||||
fi
|
||||
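In short, the rewritten step above bumps the major version on main, the minor on development, and the patch on staging; any other ref produces no tag (should_tag stays unset). A minimal Rust sketch of that same rule, for reference only (the workflow itself does this in shell):

```rust
/// Sketch of the tag-bump rule in the set-tag step above.
/// Assumes tags of the form "vX.Y.Z"; returns None when the branch should not tag.
fn next_tag(latest: Option<&str>, branch: &str) -> Option<String> {
    // Parse "vX.Y.Z", defaulting to 0.0.0 when no tag exists yet.
    let (mut major, mut minor, mut patch) = match latest {
        Some(tag) => {
            let mut parts = tag.trim_start_matches('v').splitn(3, '.');
            (
                parts.next()?.parse::<u64>().ok()?,
                parts.next()?.parse::<u64>().ok()?,
                parts.next()?.parse::<u64>().ok()?,
            )
        }
        None => (0, 0, 0),
    };
    match branch {
        "main" => { major += 1; minor = 0; patch = 0; }
        "development" => { minor += 1; patch = 0; }
        "staging" => { patch += 1; }
        _ => return None, // other refs do not produce a tag
    }
    Some(format!("v{major}.{minor}.{patch}"))
}

// e.g. next_tag(Some("v0.1.16"), "development") == Some("v0.2.0".to_string())
```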
|
||||
docker-build:
|
||||
name: Build and Push Docker Image
|
||||
needs: [detect-project, build, set-tag]
|
||||
#if: |
|
||||
# always() &&
|
||||
# needs.detect-project.result == 'success' &&
|
||||
# needs.build.result == 'success' &&
|
||||
# github.event_name != 'pull_request'
|
||||
if: |
|
||||
needs.detect-project.result == 'success' &&
|
||||
needs.build.result == 'success' &&
|
||||
needs.set-tag.outputs.should_tag == 'true' &&
|
||||
github.event_name != 'pull_request'
|
||||
runs-on: ubuntu-latest
|
||||
environment: production
|
||||
steps:
|
||||
@@ -248,10 +260,10 @@ jobs:
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
run: |
|
||||
echo "Creating new tag: ${{ needs.set-tag.outputs.tag_name }}"
|
||||
git tag ${{ needs.set-tag.outputs.tag_name }}
|
||||
git push origin ${{ needs.set-tag.outputs.tag_name }}
|
||||
|
||||
|
||||
summary:
|
||||
name: Workflow Summary
|
||||
needs: [test, build, docker-build]
|
@@ -12,10 +12,12 @@
|
||||
/// These functions are called from the main agent loop and background tasks. All network operations are asynchronous and robust to transient failures.
|
||||
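The "robust to transient failures" behaviour referred to above comes down to a retry loop: attempt the request, log the failure, wait ten seconds, try again. A generic sketch of that pattern, assuming tokio; the real functions below inline the loop rather than call a helper like this:

```rust
use std::time::Duration;
use tokio::time::sleep;

/// Illustrative retry wrapper mirroring the loop used by the registration and
/// broadcast functions in this module: retry forever, pausing 10s between attempts.
async fn retry_forever<T, E, F, Fut>(mut attempt: F) -> T
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
    E: std::fmt::Display,
{
    loop {
        match attempt().await {
            Ok(value) => return value,
            Err(err) => println!("⚠️ Request failed: {} (will retry in 10 seconds)", err),
        }
        sleep(Duration::from_secs(10)).await;
    }
}
```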
use std::time::Duration;
|
||||
|
||||
use crate::docker::container;
|
||||
use crate::docker::serverclientcomm::handle_server_message;
|
||||
use crate::hardware::HardwareInfo;
|
||||
use crate::models::{
|
||||
Acknowledgment, HeartbeatDto, IdResponse, MetricDto, RegistrationDto, ServerMessage,
|
||||
Acknowledgment, DockerMetricDto, DockerRegistrationDto, HeartbeatDto, IdResponse, MetricDto,
|
||||
RegistrationDto, ServerMessage,
|
||||
};
|
||||
|
||||
use anyhow::Result;
|
||||
@@ -39,7 +41,7 @@ use bollard::Docker;
|
||||
/// Returns an error if unable to register after repeated attempts.
|
||||
pub async fn register_with_server(
|
||||
base_url: &str,
|
||||
) -> Result<(i32, String), Box<dyn Error + Send + Sync>> {
|
||||
) -> Result<(u16, String), Box<dyn Error + Send + Sync>> {
|
||||
// First get local IP
|
||||
let ip = local_ip_address::local_ip()?.to_string();
|
||||
println!("Local IP address detected: {}", ip);
|
||||
@@ -103,7 +105,7 @@ pub async fn register_with_server(
|
||||
async fn get_server_id_by_ip(
|
||||
base_url: &str,
|
||||
ip: &str,
|
||||
) -> Result<(i32, String), Box<dyn Error + Send + Sync>> {
|
||||
) -> Result<(u16, String), Box<dyn Error + Send + Sync>> {
|
||||
let client = Client::builder()
|
||||
.danger_accept_invalid_certs(true)
|
||||
.build()?;
|
||||
@@ -151,6 +153,50 @@ async fn get_server_id_by_ip(
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn broadcast_docker_containers(
|
||||
base_url: &str,
|
||||
server_id: u16,
|
||||
container_dto: &mut DockerRegistrationDto,
|
||||
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
// First get local IP
|
||||
println!("Preparing to broadcast docker containers...");
|
||||
// Create HTTP client for registration
|
||||
let client = Client::builder()
|
||||
.danger_accept_invalid_certs(true)
|
||||
.build()?;
|
||||
|
||||
// Prepare registration data
|
||||
let container_dto = container_dto;
|
||||
container_dto.server_id = server_id;
|
||||
|
||||
// Try to register (will retry on failure)
|
||||
loop {
|
||||
println!("Attempting to broadcast containers...");
|
||||
let url = format!("{}/monitoring/service-discovery", base_url);
|
||||
match client.post(&url).json(&container_dto).send().await {
|
||||
Ok(resp) if resp.status().is_success() => {
|
||||
println!(
|
||||
"✅ Successfully broadcasted following docker container: {:?}",
|
||||
container_dto
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
Ok(resp) => {
|
||||
let status = resp.status();
|
||||
let text = resp.text().await.unwrap_or_default();
|
||||
println!(
|
||||
"⚠️ Broadcasting failed ({}): {} (will retry in 10 seconds)",
|
||||
status, text
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
println!("⚠️ Broadcasting failed: {} (will retry in 10 seconds)", err);
|
||||
}
|
||||
}
|
||||
sleep(Duration::from_secs(10)).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Periodically sends heartbeat signals to the backend server to indicate agent liveness.
|
||||
///
|
||||
/// This function runs in a background task and will retry on network errors.
|
||||
@@ -308,7 +354,7 @@ pub async fn listening_to_server(
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Result<(), Box<dyn Error + Send + Sync>>` - Ok if acknowledgment is sent successfully.
|
||||
async fn send_acknowledgment(
|
||||
pub async fn send_acknowledgment(
|
||||
client: &reqwest::Client,
|
||||
base_url: &str,
|
||||
message_id: &str,
|
||||
@@ -339,3 +385,23 @@ async fn send_acknowledgment(
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn send_docker_metrics(
|
||||
base_url: &str,
|
||||
docker_metrics: &DockerMetricDto,
|
||||
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
let client = Client::new();
|
||||
let url = format!("{}/monitoring/docker-metric", base_url);
|
||||
println!("Docker Metrics: {:?}", docker_metrics);
|
||||
|
||||
match client.post(&url).json(&docker_metrics).send().await {
|
||||
Ok(res) => println!(
|
||||
"✅ Sent docker metrics for server {} | Status: {}",
|
||||
docker_metrics.server_id,
|
||||
res.status()
|
||||
),
|
||||
Err(err) => eprintln!("❌ Failed to send docker metrics: {}", err),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@@ -4,7 +4,7 @@
|
||||
//!
|
||||
use crate::docker::stats;
|
||||
use crate::docker::stats::{ContainerCpuInfo, ContainerNetworkInfo};
|
||||
use crate::models::{DockerContainerDto, DockerContainerRegistrationDto};
|
||||
use crate::models::DockerContainer;
|
||||
|
||||
use bollard::query_parameters::{
|
||||
CreateImageOptions, ListContainersOptions, RestartContainerOptions,
|
||||
@@ -20,7 +20,7 @@ use std::error::Error;
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Vec<DockerContainer>` - Vector of Docker container info.
|
||||
pub async fn get_available_containers(docker: &Docker) -> Vec<DockerContainerDto> {
|
||||
pub async fn get_available_containers(docker: &Docker) -> Vec<DockerContainer> {
|
||||
println!("=== DOCKER CONTAINER LIST ===");
|
||||
|
||||
let options = Some(ListContainersOptions {
|
||||
@@ -51,29 +51,10 @@ pub async fn get_available_containers(docker: &Docker) -> Vec<DockerContainerDto
|
||||
.map(|img| img.to_string())
|
||||
.unwrap_or_else(|| "unknown".to_string());
|
||||
|
||||
/*let status = container
|
||||
.status
|
||||
.as_ref()
|
||||
.map(|s| match s.to_lowercase().as_str() {
|
||||
s if s.contains("up") || s.contains("running") => "running".to_string(),
|
||||
s if s.contains("exited") || s.contains("stopped") => {
|
||||
"stopped".to_string()
|
||||
}
|
||||
_ => s.to_string(),
|
||||
})
|
||||
.unwrap_or_else(|| "unknown".to_string());
|
||||
|
||||
println!(
|
||||
" - ID: {}, Image: {}, Name: {}",
|
||||
short_id,
|
||||
container.image.unwrap(),
|
||||
name
|
||||
);*/
|
||||
|
||||
Some(DockerContainerDto {
|
||||
Some(DockerContainer {
|
||||
id: short_id.to_string(),
|
||||
image,
|
||||
name: name,
|
||||
image: Some(image),
|
||||
name: Some(name),
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
@@ -191,20 +172,21 @@ pub async fn get_network_stats(
|
||||
docker: &Docker,
|
||||
container_id: &str,
|
||||
) -> Result<ContainerNetworkInfo, Box<dyn Error + Send + Sync>> {
|
||||
let (_, net_info) = stats::get_single_container_stats(docker, container_id).await?;
|
||||
let (_, net_info, _) = stats::get_single_container_stats(docker, container_id).await?;
|
||||
|
||||
if let Some(net_info) = net_info {
|
||||
Ok(net_info)
|
||||
} else {
|
||||
// Return default network info if not found
|
||||
println!("No network info found for container {}", container_id);
|
||||
Ok(ContainerNetworkInfo {
|
||||
container_id: container_id.to_string(),
|
||||
rx_bytes: 0,
|
||||
tx_bytes: 0,
|
||||
rx_packets: 0,
|
||||
tx_packets: 0,
|
||||
rx_errors: 0,
|
||||
tx_errors: 0,
|
||||
container_id: Some(container_id.to_string()),
|
||||
rx_bytes: None,
|
||||
tx_bytes: None,
|
||||
rx_packets: None,
|
||||
tx_packets: None,
|
||||
rx_errors: None,
|
||||
tx_errors: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -214,18 +196,19 @@ pub async fn get_cpu_stats(
|
||||
docker: &Docker,
|
||||
container_id: &str,
|
||||
) -> Result<ContainerCpuInfo, Box<dyn Error + Send + Sync>> {
|
||||
let (cpu_info, _) = stats::get_single_container_stats(docker, container_id).await?;
|
||||
let (cpu_info, _, _) = stats::get_single_container_stats(docker, container_id).await?;
|
||||
|
||||
if let Some(cpu_info) = cpu_info {
|
||||
Ok(cpu_info)
|
||||
} else {
|
||||
// Return default CPU info if not found
|
||||
println!("No CPU info found for container {}", container_id);
|
||||
Ok(ContainerCpuInfo {
|
||||
container_id: container_id.to_string(),
|
||||
cpu_usage_percent: 0.0,
|
||||
system_cpu_usage: 0,
|
||||
container_cpu_usage: 0,
|
||||
online_cpus: 1,
|
||||
container_id: Some(container_id.to_string()),
|
||||
cpu_usage_percent: None,
|
||||
system_cpu_usage: None,
|
||||
container_cpu_usage: None,
|
||||
online_cpus: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@@ -11,8 +11,11 @@ pub mod container;
|
||||
pub mod serverclientcomm;
|
||||
pub mod stats;
|
||||
|
||||
use crate::models::{DockerContainerDto, DockerContainerMetricDto};
|
||||
use bollard::{query_parameters::InspectContainerOptions, Docker};
|
||||
use crate::models::{
|
||||
DockerCollectMetricDto, DockerContainer, DockerContainerCpuDto, DockerContainerInfo,
|
||||
DockerContainerNetworkDto, DockerContainerRamDto, DockerMetricDto, DockerRegistrationDto,
|
||||
};
|
||||
use bollard::Docker;
|
||||
use std::error::Error;
|
||||
|
||||
/// Main Docker manager that holds the Docker client and provides all operations
|
||||
@@ -49,14 +52,14 @@ impl DockerManager {
|
||||
/// Finds the Docker container running the agent by image name
|
||||
pub async fn get_client_container(
|
||||
&self,
|
||||
) -> Result<Option<DockerContainerDto>, Box<dyn Error + Send + Sync>> {
|
||||
) -> Result<Option<DockerContainer>, Box<dyn Error + Send + Sync>> {
|
||||
let containers = container::get_available_containers(&self.docker).await;
|
||||
let client_image = "watcher-agent";
|
||||
|
||||
Ok(containers
|
||||
.into_iter()
|
||||
.find(|c| c.image == client_image)
|
||||
.map(|container| DockerContainerDto {
|
||||
.find(|c| c.clone().image.unwrap().contains(client_image))
|
||||
.map(|container| DockerContainer {
|
||||
id: container.id,
|
||||
image: container.image,
|
||||
name: container.name,
|
||||
@@ -66,13 +69,20 @@ impl DockerManager {
|
||||
/// Gets the current client version (image name) if running in Docker
|
||||
pub async fn get_client_version(&self) -> String {
|
||||
match self.get_client_container().await {
|
||||
Ok(Some(container)) => container.image,
|
||||
Ok(Some(container)) => container
|
||||
.image
|
||||
.clone()
|
||||
.unwrap()
|
||||
.split(':')
|
||||
.next()
|
||||
.unwrap_or("unknown")
|
||||
.to_string(),
|
||||
Ok(None) => {
|
||||
eprintln!("Warning: No WatcherAgent container found");
|
||||
println!("Warning: No WatcherAgent container found");
|
||||
"unknown".to_string()
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("Warning: Could not get current image version: {}", e);
|
||||
println!("Warning: Could not get current image version: {}", e);
|
||||
"unknown".to_string()
|
||||
}
|
||||
}
|
||||
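The version lookup above takes the container's image reference and keeps only what precedes the first ':', dropping the tag. A small illustration with hypothetical image names:

```rust
// "watcher-agent:v0.1.16"                -> "watcher-agent"
// "git.example.com/watcher-agent:latest" -> "git.example.com/watcher-agent"
fn image_repository(image: &str) -> &str {
    // Everything up to the first ':'. Note that a registry with an explicit
    // port (e.g. "registry:5000/app") would also be cut at that ':'.
    image.split(':').next().unwrap_or("unknown")
}

// image_repository("watcher-agent:v0.1.16") == "watcher-agent"
```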
@@ -87,14 +97,14 @@ impl DockerManager {
|
||||
}
|
||||
|
||||
/// Gets all available containers as DTOs for registration
|
||||
pub async fn get_containers_for_registration(
|
||||
pub async fn get_containers(
|
||||
&self,
|
||||
) -> Result<Vec<DockerContainerDto>, Box<dyn Error + Send + Sync>> {
|
||||
) -> Result<Vec<DockerContainer>, Box<dyn Error + Send + Sync>> {
|
||||
let containers = container::get_available_containers(&self.docker).await;
|
||||
|
||||
Ok(containers
|
||||
.into_iter()
|
||||
.map(|container| DockerContainerDto {
|
||||
.map(|container| DockerContainer {
|
||||
id: container.id,
|
||||
image: container.image,
|
||||
name: container.name,
|
||||
@@ -102,61 +112,6 @@ impl DockerManager {
|
||||
.collect())
|
||||
}
|
||||
|
||||
/// Gets container metrics for all containers
|
||||
pub async fn get_container_metrics(
|
||||
&self,
|
||||
) -> Result<Vec<DockerContainerMetricDto>, Box<dyn Error + Send + Sync>> {
|
||||
let containers = container::get_available_containers(&self.docker).await;
|
||||
let mut metrics = Vec::new();
|
||||
|
||||
for container in containers {
|
||||
// Get network stats (you'll need to implement this in container.rs)
|
||||
let network_stats = container::get_network_stats(&self.docker, &container.id).await?;
|
||||
// Get CPU stats (you'll need to implement this in container.rs)
|
||||
let cpu_stats = container::get_cpu_stats(&self.docker, &container.id).await?;
|
||||
|
||||
// Get current status by inspecting the container
|
||||
let status = match self
|
||||
.docker
|
||||
.inspect_container(&container.id, None::<InspectContainerOptions>)
|
||||
.await
|
||||
{
|
||||
Ok(container_info) => {
|
||||
// Extract status from container state and convert to string
|
||||
container_info
|
||||
.state
|
||||
.and_then(|state| state.status)
|
||||
.map(|status_enum| {
|
||||
match status_enum {
|
||||
bollard::models::ContainerStateStatusEnum::CREATED => "created",
|
||||
bollard::models::ContainerStateStatusEnum::RUNNING => "running",
|
||||
bollard::models::ContainerStateStatusEnum::PAUSED => "paused",
|
||||
bollard::models::ContainerStateStatusEnum::RESTARTING => {
|
||||
"restarting"
|
||||
}
|
||||
bollard::models::ContainerStateStatusEnum::REMOVING => "removing",
|
||||
bollard::models::ContainerStateStatusEnum::EXITED => "exited",
|
||||
bollard::models::ContainerStateStatusEnum::DEAD => "dead",
|
||||
bollard::secret::ContainerStateStatusEnum::EMPTY => todo!(),
|
||||
}
|
||||
.to_string()
|
||||
})
|
||||
.unwrap_or_else(|| "unknown".to_string())
|
||||
}
|
||||
Err(_) => "unknown".to_string(),
|
||||
};
|
||||
|
||||
metrics.push(DockerContainerMetricDto {
|
||||
id: container.id,
|
||||
status: status,
|
||||
network: network_stats,
|
||||
cpu: cpu_stats,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(metrics)
|
||||
}
|
||||
|
||||
/// Gets the number of running containers
|
||||
pub async fn get_container_count(&self) -> Result<usize, Box<dyn Error + Send + Sync>> {
|
||||
let containers = container::get_available_containers(&self.docker).await;
|
||||
@@ -171,21 +126,98 @@ impl DockerManager {
|
||||
container::restart_container(&self.docker, container_id).await
|
||||
}
|
||||
|
||||
/// Gets total network statistics across all containers
|
||||
pub async fn get_total_network_stats(
|
||||
/// Collects Docker metrics for all containers
|
||||
pub async fn collect_metrics(&self) -> Result<DockerMetricDto, Box<dyn Error + Send + Sync>> {
|
||||
let containers = self.get_containers().await?;
|
||||
let (cpu_stats, net_stats, mem_stats) = stats::get_container_stats(&self.docker).await?;
|
||||
|
||||
let container_infos_total: Vec<_> = containers
|
||||
.into_iter()
|
||||
.map(|container| {
|
||||
let cpu = cpu_stats
|
||||
.iter()
|
||||
.find(|c| c.container_id == Some(container.id.clone()))
|
||||
.cloned();
|
||||
let network = net_stats
|
||||
.iter()
|
||||
.find(|n| n.container_id == Some(container.id.clone()))
|
||||
.cloned();
|
||||
let ram = mem_stats
|
||||
.iter()
|
||||
.find(|m| m.container_id == Some(container.id.clone()))
|
||||
.cloned();
|
||||
|
||||
DockerContainerInfo {
|
||||
container: Some(container),
|
||||
status: None, // Status can be fetched if needed
|
||||
cpu,
|
||||
network,
|
||||
ram,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
let container_infos: Vec<DockerCollectMetricDto> = container_infos_total
|
||||
.into_iter()
|
||||
.map(|info| DockerCollectMetricDto {
|
||||
id: Some(info.container.unwrap().id).unwrap_or("".to_string()),
|
||||
cpu: info
|
||||
.cpu
|
||||
.unwrap()
|
||||
.cpu_usage_percent
|
||||
.map(|load| DockerContainerCpuDto {
|
||||
cpu_load: Some(load),
|
||||
})
|
||||
.unwrap_or(DockerContainerCpuDto { cpu_load: None }),
|
||||
ram: info
|
||||
.ram
|
||||
.unwrap()
|
||||
.memory_usage_percent
|
||||
.map(|load| DockerContainerRamDto {
|
||||
cpu_load: Some(load),
|
||||
})
|
||||
.unwrap_or(DockerContainerRamDto { cpu_load: None }),
|
||||
network: DockerContainerNetworkDto {
|
||||
net_in: info
|
||||
.network
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.rx_bytes
|
||||
.map(|bytes| bytes as f64)
|
||||
.or(Some(0.0)),
|
||||
net_out: info
|
||||
.network
|
||||
.unwrap()
|
||||
.tx_bytes
|
||||
.map(|bytes| bytes as f64)
|
||||
.or(Some(0.0)),
|
||||
},
|
||||
})
|
||||
.collect();
|
||||
let dto = DockerMetricDto {
|
||||
server_id: 0,
|
||||
containers: serde_json::to_string(&container_infos)?,
|
||||
};
|
||||
|
||||
Ok(dto)
|
||||
}
|
||||
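collect_metrics above matches each container to its CPU, network and RAM entries with a linear find on container_id. For reference, the same join could be keyed through a HashMap; this is only an illustrative alternative, not what the committed code does:

```rust
use std::collections::HashMap;

/// Illustrative alternative to the per-container linear `find` above.
/// Indexes the CPU stats (ContainerCpuInfo from stats/mod.rs) by container id once,
/// so each container lookup becomes a hash-map get instead of a scan.
fn index_cpu_stats(cpu_stats: &[ContainerCpuInfo]) -> HashMap<String, ContainerCpuInfo> {
    cpu_stats
        .iter()
        .filter_map(|c| c.container_id.clone().map(|id| (id, c.clone())))
        .collect()
}

// usage sketch: let cpu = index_cpu_stats(&cpu_stats).get(&container.id).cloned();
```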
|
||||
pub async fn create_registration_dto(
|
||||
&self,
|
||||
) -> Result<(u64, u64), Box<dyn Error + Send + Sync>> {
|
||||
let metrics = self.get_container_metrics().await?;
|
||||
) -> Result<DockerRegistrationDto, Box<dyn Error + Send + Sync>> {
|
||||
let containers = self.get_containers().await?;
|
||||
let dto = DockerRegistrationDto {
|
||||
server_id: 0,
|
||||
//container_count,
|
||||
containers: serde_json::to_string(&containers)?,
|
||||
};
|
||||
|
||||
let net_in_total: u64 = metrics.iter().map(|m| m.network.rx_bytes).sum();
|
||||
let net_out_total: u64 = metrics.iter().map(|m| m.network.tx_bytes).sum();
|
||||
|
||||
Ok((net_in_total, net_out_total))
|
||||
Ok(dto)
|
||||
}
|
||||
}
|
||||
|
||||
// Keep these as utility functions if needed, but they should use DockerManager internally
|
||||
impl DockerContainerDto {
|
||||
impl DockerContainer {
|
||||
/// Returns the container ID
|
||||
pub fn id(&self) -> &str {
|
||||
&self.id
|
||||
@@ -193,11 +225,11 @@ impl DockerContainerDto {
|
||||
|
||||
/// Returns the image name
|
||||
pub fn image(&self) -> &str {
|
||||
&self.image
|
||||
&self.image.as_deref().unwrap_or("unknown")
|
||||
}
|
||||
|
||||
/// Returns the container name
|
||||
pub fn name(&self) -> &str {
|
||||
&self.name
|
||||
&self.name.as_deref().unwrap_or("unknown")
|
||||
}
|
||||
}
|
||||
|
@@ -5,7 +5,7 @@
|
||||
use crate::models::ServerMessage;
|
||||
|
||||
use super::container::{restart_container, update_docker_image};
|
||||
use bollard::query_parameters::{CreateImageOptions, RestartContainerOptions};
|
||||
//use bollard::query_parameters::{CreateImageOptions, RestartContainerOptions};
|
||||
use bollard::Docker;
|
||||
use std::error::Error;
|
||||
|
||||
@@ -40,7 +40,7 @@ pub async fn handle_server_message(
|
||||
if let Some(image_name) = msg.data.get("image").and_then(|v| v.as_str()) {
|
||||
println!("Received restart command for image: {}", image_name);
|
||||
// Call your update_docker_image function here
|
||||
update_docker_image(docker, image_name).await?;
|
||||
restart_container(docker, image_name).await?;
|
||||
Ok(())
|
||||
} else {
|
||||
Err("Missing image name in update message".into())
|
||||
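For the restart branch above to fire, the incoming ServerMessage must carry a string "image" entry in its data map. A hypothetical payload that would satisfy that check (everything beyond the "image" key is an assumption):

```rust
use serde_json::json;

// Hypothetical data payload: all the handler branch above needs is a string
// "image" entry inside msg.data; any other fields are assumptions.
fn example_restart_data() -> serde_json::Value {
    json!({ "image": "watcher-agent:latest" })
}

// example_restart_data().get("image").and_then(|v| v.as_str()) == Some("watcher-agent:latest")
```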
|
@@ -1,206 +0,0 @@
|
||||
use bollard::query_parameters::{ListContainersOptions, StatsOptions};
|
||||
use bollard::Docker;
|
||||
use futures_util::stream::TryStreamExt;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::error::Error;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct ContainerCpuInfo {
|
||||
pub container_id: String,
|
||||
pub cpu_usage_percent: f64,
|
||||
pub system_cpu_usage: u64,
|
||||
pub container_cpu_usage: u64,
|
||||
pub online_cpus: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct ContainerNetworkInfo {
|
||||
pub container_id: String,
|
||||
pub rx_bytes: u64,
|
||||
pub tx_bytes: u64,
|
||||
pub rx_packets: u64,
|
||||
pub tx_packets: u64,
|
||||
pub rx_errors: u64,
|
||||
pub tx_errors: u64,
|
||||
}
|
||||
|
||||
/// Get container statistics for all containers using an existing Docker client
|
||||
pub async fn get_container_stats(
|
||||
docker: &Docker,
|
||||
) -> Result<(Vec<ContainerCpuInfo>, Vec<ContainerNetworkInfo>), Box<dyn Error + Send + Sync>> {
|
||||
let containers = docker
|
||||
.list_containers(Some(ListContainersOptions {
|
||||
all: true,
|
||||
..Default::default()
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let mut cpu_infos = Vec::new();
|
||||
let mut net_infos = Vec::new();
|
||||
|
||||
for container in containers {
|
||||
let id = container.id.unwrap_or_default();
|
||||
|
||||
// Skip if no ID
|
||||
if id.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut stats_stream = docker.stats(
|
||||
&id,
|
||||
Some(StatsOptions {
|
||||
stream: false,
|
||||
one_shot: true,
|
||||
}),
|
||||
);
|
||||
|
||||
if let Some(stats) = stats_stream.try_next().await? {
|
||||
// CPU Info
|
||||
if let (Some(cpu_stats), Some(precpu_stats)) = (&stats.cpu_stats, &stats.precpu_stats) {
|
||||
if let (Some(cpu_usage), Some(pre_cpu_usage)) =
|
||||
(&cpu_stats.cpu_usage, &precpu_stats.cpu_usage)
|
||||
{
|
||||
let cpu_delta = cpu_usage
|
||||
.total_usage
|
||||
.unwrap_or(0)
|
||||
.saturating_sub(pre_cpu_usage.total_usage.unwrap_or(0));
|
||||
|
||||
let system_delta = cpu_stats
|
||||
.system_cpu_usage
|
||||
.unwrap_or(0)
|
||||
.saturating_sub(precpu_stats.system_cpu_usage.unwrap_or(0));
|
||||
|
||||
let online_cpus = cpu_stats.online_cpus.unwrap_or(1);
|
||||
|
||||
let cpu_percent = if system_delta > 0 && online_cpus > 0 {
|
||||
(cpu_delta as f64 / system_delta as f64) * online_cpus as f64 * 100.0
|
||||
} else {
|
||||
0.0
|
||||
};
|
||||
|
||||
cpu_infos.push(ContainerCpuInfo {
|
||||
container_id: id.clone(),
|
||||
cpu_usage_percent: cpu_percent,
|
||||
system_cpu_usage: cpu_stats.system_cpu_usage.unwrap_or(0),
|
||||
container_cpu_usage: cpu_usage.total_usage.unwrap_or(0),
|
||||
online_cpus,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Network Info
|
||||
if let Some(networks) = stats.networks {
|
||||
for (_name, net) in networks {
|
||||
net_infos.push(ContainerNetworkInfo {
|
||||
container_id: id.clone(),
|
||||
rx_bytes: net.rx_bytes.unwrap(),
|
||||
tx_bytes: net.tx_bytes.unwrap(),
|
||||
rx_packets: net.rx_packets.unwrap(),
|
||||
tx_packets: net.tx_packets.unwrap(),
|
||||
rx_errors: net.rx_errors.unwrap(),
|
||||
tx_errors: net.tx_errors.unwrap(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok((cpu_infos, net_infos))
|
||||
}
|
||||
|
||||
/// Get container statistics for a specific container
|
||||
pub async fn get_single_container_stats(
|
||||
docker: &Docker,
|
||||
container_id: &str,
|
||||
) -> Result<(Option<ContainerCpuInfo>, Option<ContainerNetworkInfo>), Box<dyn Error + Send + Sync>>
|
||||
{
|
||||
let mut stats_stream = docker.stats(
|
||||
container_id,
|
||||
Some(StatsOptions {
|
||||
stream: false,
|
||||
one_shot: true,
|
||||
}),
|
||||
);
|
||||
|
||||
if let Some(stats) = stats_stream.try_next().await? {
|
||||
let mut cpu_info = None;
|
||||
let mut net_info = None;
|
||||
|
||||
// CPU Info
|
||||
if let (Some(cpu_stats), Some(precpu_stats)) = (&stats.cpu_stats, &stats.precpu_stats) {
|
||||
if let (Some(cpu_usage), Some(pre_cpu_usage)) =
|
||||
(&cpu_stats.cpu_usage, &precpu_stats.cpu_usage)
|
||||
{
|
||||
let cpu_delta = cpu_usage
|
||||
.total_usage
|
||||
.unwrap_or(0)
|
||||
.saturating_sub(pre_cpu_usage.total_usage.unwrap_or(0));
|
||||
|
||||
let system_delta = cpu_stats
|
||||
.system_cpu_usage
|
||||
.unwrap_or(0)
|
||||
.saturating_sub(precpu_stats.system_cpu_usage.unwrap_or(0));
|
||||
|
||||
let online_cpus = cpu_stats.online_cpus.unwrap_or(1);
|
||||
|
||||
let cpu_percent = if system_delta > 0 && online_cpus > 0 {
|
||||
(cpu_delta as f64 / system_delta as f64) * online_cpus as f64 * 100.0
|
||||
} else {
|
||||
0.0
|
||||
};
|
||||
|
||||
cpu_info = Some(ContainerCpuInfo {
|
||||
container_id: container_id.to_string(),
|
||||
cpu_usage_percent: cpu_percent,
|
||||
system_cpu_usage: cpu_stats.system_cpu_usage.unwrap_or(0),
|
||||
container_cpu_usage: cpu_usage.total_usage.unwrap_or(0),
|
||||
online_cpus,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Network Info
|
||||
if let Some(networks) = stats.networks {
|
||||
// Take the first network interface (usually eth0)
|
||||
if let Some((_name, net)) = networks.into_iter().next() {
|
||||
net_info = Some(ContainerNetworkInfo {
|
||||
container_id: container_id.to_string(),
|
||||
rx_bytes: net.rx_bytes.unwrap(),
|
||||
tx_bytes: net.tx_bytes.unwrap(),
|
||||
rx_packets: net.rx_packets.unwrap(),
|
||||
tx_packets: net.tx_packets.unwrap(),
|
||||
rx_errors: net.rx_errors.unwrap(),
|
||||
tx_errors: net.tx_errors.unwrap(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Ok((cpu_info, net_info))
|
||||
} else {
|
||||
Ok((None, None))
|
||||
}
|
||||
}
|
||||
|
||||
/// Get total network statistics across all containers
|
||||
pub async fn get_total_network_stats(
|
||||
docker: &Docker,
|
||||
) -> Result<(u64, u64), Box<dyn Error + Send + Sync>> {
|
||||
let (_, net_infos) = get_container_stats(docker).await?;
|
||||
|
||||
let total_rx: u64 = net_infos.iter().map(|net| net.rx_bytes).sum();
|
||||
let total_tx: u64 = net_infos.iter().map(|net| net.tx_bytes).sum();
|
||||
|
||||
Ok((total_rx, total_tx))
|
||||
}
|
||||
|
||||
/// Get average CPU usage across all containers
|
||||
pub async fn get_average_cpu_usage(docker: &Docker) -> Result<f64, Box<dyn Error + Send + Sync>> {
|
||||
let (cpu_infos, _) = get_container_stats(docker).await?;
|
||||
|
||||
if cpu_infos.is_empty() {
|
||||
return Ok(0.0);
|
||||
}
|
||||
|
||||
let total_cpu: f64 = cpu_infos.iter().map(|cpu| cpu.cpu_usage_percent).sum();
|
||||
Ok(total_cpu / cpu_infos.len() as f64)
|
||||
}
|
WatcherAgent/src/docker/stats/cpu.rs (new file, 99 lines)
@@ -0,0 +1,99 @@
|
||||
use super::ContainerCpuInfo;
|
||||
use bollard::query_parameters::{ListContainersOptions, StatsOptions};
|
||||
use bollard::Docker;
|
||||
use futures_util::stream::TryStreamExt;
|
||||
use std::error::Error;
|
||||
|
||||
/// Get CPU statistics for all containers
|
||||
pub async fn get_all_containers_cpu_stats(
|
||||
docker: &Docker,
|
||||
) -> Result<Vec<ContainerCpuInfo>, Box<dyn Error + Send + Sync>> {
|
||||
let containers = docker
|
||||
.list_containers(Some(ListContainersOptions {
|
||||
all: true,
|
||||
..Default::default()
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let mut cpu_infos = Vec::new();
|
||||
|
||||
for container in containers {
|
||||
let id = container.id.unwrap_or_default();
|
||||
|
||||
// Skip if no ID
|
||||
if id.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(cpu_info) = get_single_container_cpu_stats(docker, &id).await? {
|
||||
cpu_infos.push(cpu_info);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(cpu_infos)
|
||||
}
|
||||
|
||||
/// Get CPU statistics for a specific container
|
||||
pub async fn get_single_container_cpu_stats(
|
||||
docker: &Docker,
|
||||
container_id: &str,
|
||||
) -> Result<Option<ContainerCpuInfo>, Box<dyn Error + Send + Sync>> {
|
||||
let mut stats_stream = docker.stats(
|
||||
container_id,
|
||||
Some(StatsOptions {
|
||||
stream: false,
|
||||
one_shot: true,
|
||||
}),
|
||||
);
|
||||
|
||||
if let Some(stats) = stats_stream.try_next().await? {
|
||||
if let (Some(cpu_stats), Some(precpu_stats)) = (&stats.cpu_stats, &stats.precpu_stats) {
|
||||
if let (Some(cpu_usage), Some(pre_cpu_usage)) =
|
||||
(&cpu_stats.cpu_usage, &precpu_stats.cpu_usage)
|
||||
{
|
||||
let cpu_delta = cpu_usage
|
||||
.total_usage
|
||||
.unwrap_or(0)
|
||||
.saturating_sub(pre_cpu_usage.total_usage.unwrap_or(0));
|
||||
|
||||
let system_delta = cpu_stats
|
||||
.system_cpu_usage
|
||||
.unwrap_or(0)
|
||||
.saturating_sub(precpu_stats.system_cpu_usage.unwrap_or(0));
|
||||
|
||||
let online_cpus = cpu_stats.online_cpus.unwrap_or(1);
|
||||
|
||||
let cpu_percent = if system_delta > 0 && online_cpus > 0 {
|
||||
(cpu_delta as f64 / system_delta as f64) * online_cpus as f64 * 100.0
|
||||
} else {
|
||||
0.0
|
||||
};
|
||||
|
||||
return Ok(Some(ContainerCpuInfo {
|
||||
container_id: Some(container_id.to_string()),
|
||||
cpu_usage_percent: Some(cpu_percent),
|
||||
system_cpu_usage: Some(cpu_stats.system_cpu_usage.unwrap_or(0)),
|
||||
container_cpu_usage: Some(cpu_usage.total_usage.unwrap_or(0)),
|
||||
online_cpus: Some(online_cpus),
|
||||
}));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
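The percentage computed above follows the usual Docker stats formula: the container's CPU-time delta divided by the system-wide delta, scaled by the number of online CPUs. A worked example with made-up numbers:

```rust
/// Same formula as in get_single_container_cpu_stats above, pulled out for illustration.
fn cpu_percent(cpu_delta: u64, system_delta: u64, online_cpus: u32) -> f64 {
    if system_delta > 0 && online_cpus > 0 {
        (cpu_delta as f64 / system_delta as f64) * online_cpus as f64 * 100.0
    } else {
        0.0
    }
}

// e.g. 250 ms of container CPU time against a 2 s system delta on 4 CPUs:
// (250_000_000 / 2_000_000_000) * 4 * 100 = 50.0 %
// cpu_percent(250_000_000, 2_000_000_000, 4) == 50.0
```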
|
||||
/// Get average CPU usage across all containers
|
||||
pub async fn get_average_cpu_usage(docker: &Docker) -> Result<f64, Box<dyn Error + Send + Sync>> {
|
||||
let cpu_infos = get_all_containers_cpu_stats(docker).await?;
|
||||
|
||||
if cpu_infos.is_empty() {
|
||||
return Ok(0.0);
|
||||
}
|
||||
|
||||
let total_cpu: f64 = cpu_infos
|
||||
.iter()
|
||||
.map(|cpu| cpu.cpu_usage_percent.unwrap())
|
||||
.sum();
|
||||
Ok(total_cpu / cpu_infos.len() as f64)
|
||||
}
|
WatcherAgent/src/docker/stats/mod.rs (new file, 90 lines)
@@ -0,0 +1,90 @@
|
||||
pub mod cpu;
|
||||
pub mod network;
|
||||
pub mod ram;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct ContainerCpuInfo {
|
||||
pub container_id: Option<String>,
|
||||
pub cpu_usage_percent: Option<f64>,
|
||||
pub system_cpu_usage: Option<u64>,
|
||||
pub container_cpu_usage: Option<u64>,
|
||||
pub online_cpus: Option<u32>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct ContainerNetworkInfo {
|
||||
pub container_id: Option<String>,
|
||||
pub rx_bytes: Option<u64>,
|
||||
pub tx_bytes: Option<u64>,
|
||||
pub rx_packets: Option<u64>,
|
||||
pub tx_packets: Option<u64>,
|
||||
pub rx_errors: Option<u64>,
|
||||
pub tx_errors: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct ContainerMemoryInfo {
|
||||
pub container_id: Option<String>,
|
||||
pub memory_usage: Option<u64>,
|
||||
pub memory_limit: Option<u64>,
|
||||
pub memory_usage_percent: Option<f64>,
|
||||
}
|
||||
|
||||
use bollard::Docker;
|
||||
use std::error::Error;
|
||||
|
||||
/// Get container statistics for all containers using an existing Docker client
|
||||
pub async fn get_container_stats(
|
||||
docker: &Docker,
|
||||
) -> Result<
|
||||
(
|
||||
Vec<ContainerCpuInfo>,
|
||||
Vec<ContainerNetworkInfo>,
|
||||
Vec<ContainerMemoryInfo>,
|
||||
),
|
||||
Box<dyn Error + Send + Sync>,
|
||||
> {
|
||||
let cpu_infos = cpu::get_all_containers_cpu_stats(docker).await?;
|
||||
let net_infos = network::get_all_containers_network_stats(docker).await?;
|
||||
let mem_infos = ram::get_all_containers_memory_stats(docker).await?;
|
||||
|
||||
Ok((cpu_infos, net_infos, mem_infos))
|
||||
}
|
||||
|
||||
/// Get container statistics for a specific container
|
||||
pub async fn get_single_container_stats(
|
||||
docker: &Docker,
|
||||
container_id: &str,
|
||||
) -> Result<
|
||||
(
|
||||
Option<ContainerCpuInfo>,
|
||||
Option<ContainerNetworkInfo>,
|
||||
Option<ContainerMemoryInfo>,
|
||||
),
|
||||
Box<dyn Error + Send + Sync>,
|
||||
> {
|
||||
let cpu_info = cpu::get_single_container_cpu_stats(docker, container_id).await?;
|
||||
let net_info = network::get_single_container_network_stats(docker, container_id).await?;
|
||||
let mem_info = ram::get_single_container_memory_stats(docker, container_id).await?;
|
||||
|
||||
Ok((cpu_info, net_info, mem_info))
|
||||
}
|
||||
|
||||
/// Get total network statistics across all containers
|
||||
pub async fn get_total_network_stats(
|
||||
docker: &Docker,
|
||||
) -> Result<(u64, u64), Box<dyn Error + Send + Sync>> {
|
||||
network::get_total_network_stats(docker).await
|
||||
}
|
||||
|
||||
/// Get average CPU usage across all containers
|
||||
pub async fn get_average_cpu_usage(docker: &Docker) -> Result<f64, Box<dyn Error + Send + Sync>> {
|
||||
cpu::get_average_cpu_usage(docker).await
|
||||
}
|
||||
|
||||
/// Get total memory usage across all containers
|
||||
pub async fn get_total_memory_usage(docker: &Docker) -> Result<u64, Box<dyn Error + Send + Sync>> {
|
||||
ram::get_total_memory_usage(docker).await
|
||||
}
|
WatcherAgent/src/docker/stats/network.rs (new file, 79 lines)
@@ -0,0 +1,79 @@
|
||||
use super::ContainerNetworkInfo;
|
||||
use bollard::query_parameters::{ListContainersOptions, StatsOptions};
|
||||
use bollard::Docker;
|
||||
use futures_util::stream::TryStreamExt;
|
||||
use std::error::Error;
|
||||
|
||||
/// Get network statistics for all containers
|
||||
pub async fn get_all_containers_network_stats(
|
||||
docker: &Docker,
|
||||
) -> Result<Vec<ContainerNetworkInfo>, Box<dyn Error + Send + Sync>> {
|
||||
let containers = docker
|
||||
.list_containers(Some(ListContainersOptions {
|
||||
all: true,
|
||||
..Default::default()
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let mut net_infos = Vec::new();
|
||||
|
||||
for container in containers {
|
||||
let id = container.id.unwrap_or_default();
|
||||
|
||||
// Skip if no ID
|
||||
if id.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(net_info) = get_single_container_network_stats(docker, &id).await? {
|
||||
net_infos.push(net_info);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(net_infos)
|
||||
}
|
||||
|
||||
/// Get network statistics for a specific container
|
||||
pub async fn get_single_container_network_stats(
|
||||
docker: &Docker,
|
||||
container_id: &str,
|
||||
) -> Result<Option<ContainerNetworkInfo>, Box<dyn Error + Send + Sync>> {
|
||||
let mut stats_stream = docker.stats(
|
||||
container_id,
|
||||
Some(StatsOptions {
|
||||
stream: false,
|
||||
one_shot: true,
|
||||
}),
|
||||
);
|
||||
|
||||
if let Some(stats) = stats_stream.try_next().await? {
|
||||
if let Some(networks) = stats.networks {
|
||||
// Take the first network interface (usually eth0)
|
||||
if let Some((_name, net)) = networks.into_iter().next() {
|
||||
return Ok(Some(ContainerNetworkInfo {
|
||||
container_id: Some(container_id.to_string()),
|
||||
rx_bytes: net.rx_bytes,
|
||||
tx_bytes: net.tx_bytes,
|
||||
rx_packets: net.rx_packets,
|
||||
tx_packets: net.tx_packets,
|
||||
rx_errors: net.rx_errors,
|
||||
tx_errors: net.tx_errors,
|
||||
}));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Get total network statistics across all containers
|
||||
pub async fn get_total_network_stats(
|
||||
docker: &Docker,
|
||||
) -> Result<(u64, u64), Box<dyn Error + Send + Sync>> {
|
||||
let net_infos = get_all_containers_network_stats(docker).await?;
|
||||
|
||||
let total_rx: u64 = net_infos.iter().map(|net| net.rx_bytes.unwrap()).sum();
|
||||
let total_tx: u64 = net_infos.iter().map(|net| net.tx_bytes.unwrap()).sum();
|
||||
|
||||
Ok((total_rx, total_tx))
|
||||
}
|
WatcherAgent/src/docker/stats/ram.rs (new file, 77 lines)
@@ -0,0 +1,77 @@
|
||||
use super::ContainerMemoryInfo;
|
||||
use bollard::query_parameters::{ListContainersOptions, StatsOptions};
|
||||
use bollard::Docker;
|
||||
use futures_util::stream::TryStreamExt;
|
||||
use std::error::Error;
|
||||
|
||||
/// Get memory statistics for all containers
|
||||
pub async fn get_all_containers_memory_stats(
|
||||
docker: &Docker,
|
||||
) -> Result<Vec<ContainerMemoryInfo>, Box<dyn Error + Send + Sync>> {
|
||||
let containers = docker
|
||||
.list_containers(Some(ListContainersOptions {
|
||||
all: true,
|
||||
..Default::default()
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let mut mem_infos = Vec::new();
|
||||
|
||||
for container in containers {
|
||||
let id = container.id.unwrap_or_default();
|
||||
|
||||
// Skip if no ID
|
||||
if id.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(mem_info) = get_single_container_memory_stats(docker, &id).await? {
|
||||
mem_infos.push(mem_info);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(mem_infos)
|
||||
}
|
||||
|
||||
/// Get memory statistics for a specific container
|
||||
pub async fn get_single_container_memory_stats(
|
||||
docker: &Docker,
|
||||
container_id: &str,
|
||||
) -> Result<Option<ContainerMemoryInfo>, Box<dyn Error + Send + Sync>> {
|
||||
let mut stats_stream = docker.stats(
|
||||
container_id,
|
||||
Some(StatsOptions {
|
||||
stream: false,
|
||||
one_shot: true,
|
||||
}),
|
||||
);
|
||||
|
||||
if let Some(stats) = stats_stream.try_next().await? {
|
||||
if let Some(memory_stats) = &stats.memory_stats {
|
||||
let memory_usage = memory_stats.usage.unwrap_or(0);
|
||||
let memory_limit = memory_stats.limit.unwrap_or(1); // Avoid division by zero
|
||||
|
||||
let memory_usage_percent = if memory_limit > 0 {
|
||||
(memory_usage as f64 / memory_limit as f64) * 100.0
|
||||
} else {
|
||||
0.0
|
||||
};
|
||||
|
||||
return Ok(Some(ContainerMemoryInfo {
|
||||
container_id: Some(container_id.to_string()),
|
||||
memory_usage: Some(memory_usage),
|
||||
memory_limit: Some(memory_limit),
|
||||
memory_usage_percent: Some(memory_usage_percent),
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
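Memory usage is reported analogously: usage over limit, times 100, with a guard against a zero limit. A minimal sketch of just that calculation:

```rust
/// Same calculation as get_single_container_memory_stats above: usage / limit * 100,
/// guarding against a zero limit.
fn memory_percent(usage: u64, limit: u64) -> f64 {
    if limit > 0 {
        (usage as f64 / limit as f64) * 100.0
    } else {
        0.0
    }
}

// e.g. 512 MiB used of a 2 GiB limit:
// memory_percent(512 * 1024 * 1024, 2 * 1024 * 1024 * 1024) == 25.0
```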
|
||||
/// Get total memory usage across all containers
|
||||
pub async fn get_total_memory_usage(docker: &Docker) -> Result<u64, Box<dyn Error + Send + Sync>> {
|
||||
let mem_infos = get_all_containers_memory_stats(docker).await?;
|
||||
let total_memory: u64 = mem_infos.iter().map(|mem| mem.memory_usage.unwrap()).sum();
|
||||
Ok(total_memory)
|
||||
}
|
@@ -34,6 +34,7 @@ pub mod models;
|
||||
use bollard::Docker;
|
||||
use std::env;
|
||||
use std::error::Error;
|
||||
use std::sync::Arc;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
/// Awaits a spawned asynchronous task and flattens its nested `Result` type.
|
||||
@@ -93,7 +94,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
Ok((id, ip)) => {
|
||||
println!("Registered with server. ID: {}, IP: {}", id, ip);
|
||||
(id, ip)
|
||||
},
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("Fehler bei der Registrierung am Server: {e}");
|
||||
return Err(e);
|
||||
@@ -111,9 +112,22 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
};
|
||||
println!("Client Version: {}", client_version);
|
||||
|
||||
// Prepare Docker registration DTO
|
||||
let container_dto = if let Some(ref docker_manager) = docker_manager {
|
||||
docker_manager.create_registration_dto().await?
|
||||
} else {
|
||||
models::DockerRegistrationDto {
|
||||
server_id: 0,
|
||||
//container_count: 0, --- IGNORE ---
|
||||
containers: "[]".to_string(),
|
||||
}
|
||||
};
|
||||
let _ =
|
||||
api::broadcast_docker_containers(server_url, server_id, &mut container_dto.clone()).await?;
|
||||
|
||||
// Start background tasks
|
||||
// Start server listening for commands (only if Docker is available)
|
||||
let listening_handle = if let Some(docker_manager) = docker_manager {
|
||||
let listening_handle = if let Some(ref docker_manager) = docker_manager {
|
||||
tokio::spawn({
|
||||
let docker = docker_manager.docker.clone();
|
||||
let server_url = server_url.to_string();
|
||||
@@ -136,8 +150,9 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
let metrics_handle = tokio::spawn({
|
||||
let ip = ip.clone();
|
||||
let server_url = server_url.to_string();
|
||||
let docker_manager = docker_manager.as_ref().cloned().unwrap();
|
||||
async move {
|
||||
let mut collector = metrics::Collector::new(server_id, ip);
|
||||
let mut collector = metrics::Collector::new(server_id, ip, docker_manager);
|
||||
collector.run(&server_url).await
|
||||
}
|
||||
});
|
||||
|
@@ -13,10 +13,11 @@ use std::error::Error;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::api;
|
||||
use crate::docker::DockerManager;
|
||||
//use crate::docker::DockerInfo;
|
||||
use crate::hardware::network::NetworkMonitor;
|
||||
use crate::hardware::HardwareInfo;
|
||||
use crate::models::MetricDto;
|
||||
use crate::models::{DockerMetricDto, MetricDto};
|
||||
|
||||
/// Main orchestrator for hardware and network metric collection and reporting.
|
||||
///
|
||||
@@ -27,8 +28,9 @@ use crate::models::MetricDto;
|
||||
/// - `server_id`: Unique server ID assigned by the backend.
|
||||
/// - `ip_address`: IP address of the agent.
|
||||
pub struct Collector {
|
||||
docker_manager: DockerManager,
|
||||
network_monitor: NetworkMonitor,
|
||||
server_id: i32,
|
||||
server_id: u16,
|
||||
ip_address: String,
|
||||
}
|
||||
|
||||
@@ -41,8 +43,9 @@ impl Collector {
|
||||
///
|
||||
/// # Returns
|
||||
/// A new `Collector` ready to collect and report metrics.
|
||||
pub fn new(server_id: i32, ip_address: String) -> Self {
|
||||
pub fn new(server_id: u16, ip_address: String, docker_manager: DockerManager) -> Self {
|
||||
Self {
|
||||
docker_manager,
|
||||
network_monitor: NetworkMonitor::new(),
|
||||
server_id,
|
||||
ip_address,
|
||||
@@ -72,7 +75,16 @@ impl Collector {
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let docker_metrics = match self.docker_collect().await {
|
||||
Ok(metrics) => metrics,
|
||||
Err(e) => {
|
||||
eprintln!("Error collecting docker metrics: {}", e);
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
api::send_metrics(base_url, &metrics).await?;
|
||||
api::send_docker_metrics(base_url, &docker_metrics).await?;
|
||||
tokio::time::sleep(Duration::from_secs(20)).await;
|
||||
}
|
||||
}
|
||||
@@ -112,4 +124,14 @@ impl Collector {
|
||||
net_tx: hardware.network.tx_rate.unwrap_or_default(),
|
||||
})
|
||||
}
|
||||
|
||||
/// NOTE: This is a compilation-safe stub. Implement the Docker collection using your
|
||||
/// DockerManager API and container helpers when available.
|
||||
pub async fn docker_collect(&self) -> Result<DockerMetricDto, Box<dyn Error + Send + Sync>> {
|
||||
let metrics = self.docker_manager.collect_metrics().await?;
|
||||
Ok(DockerMetricDto {
|
||||
server_id: self.server_id,
|
||||
containers: metrics.containers,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@@ -25,7 +25,7 @@ use serde::{Deserialize, Serialize};
|
||||
#[derive(Serialize, Debug)]
|
||||
pub struct RegistrationDto {
|
||||
#[serde(rename = "id")]
|
||||
pub server_id: i32,
|
||||
pub server_id: u16,
|
||||
#[serde(rename = "ipAddress")]
|
||||
pub ip_address: String,
|
||||
#[serde(rename = "cpuType")]
|
||||
@@ -59,7 +59,7 @@ pub struct RegistrationDto {
|
||||
#[derive(Serialize, Debug)]
|
||||
pub struct MetricDto {
|
||||
#[serde(rename = "serverId")]
|
||||
pub server_id: i32,
|
||||
pub server_id: u16,
|
||||
#[serde(rename = "ipAddress")]
|
||||
pub ip_address: String,
|
||||
#[serde(rename = "cpu_Load")]
|
||||
@@ -116,7 +116,7 @@ pub struct DiskInfoDetailed {
|
||||
/// - `ip_address`: IPv4 or IPv6 address (string)
|
||||
#[derive(Deserialize)]
|
||||
pub struct IdResponse {
|
||||
pub id: i32,
|
||||
pub id: u16,
|
||||
#[serde(rename = "ipAddress")]
|
||||
pub ip_address: String,
|
||||
}
|
||||
@@ -186,22 +186,80 @@ pub struct Acknowledgment {
|
||||
/// - `_net_out`: Network transmit rate in **bytes per second (B/s)**
|
||||
/// - `_cpu_load`: CPU usage as a percentage (**0.0–100.0**)
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
pub struct DockerContainerRegistrationDto {
|
||||
pub server_id: u32,
|
||||
pub containers: Vec<DockerContainerDto>,
|
||||
pub struct DockerRegistrationDto {
|
||||
/// Unique server identifier (integer)
|
||||
pub server_id: u16,
|
||||
/// Number of currently running containers
|
||||
// pub container_count: usize, --- IGNORE ---
|
||||
/// json stringified array of DockerContainer
|
||||
///
|
||||
/// ## Json Example
|
||||
/// json format: [{"id":"234dsf234","image":"nginx:latest","name":"webserver"},...]
|
||||
///
|
||||
/// ## Fields
|
||||
/// id: unique container ID (first 12 hex digits)
|
||||
/// image: docker image name
|
||||
/// name: container name
|
||||
pub containers: String, // Vec<DockerContainer>,
|
||||
}
|
||||
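Since containers is carried as a JSON string rather than a typed array, producing it is a single serde_json::to_string over the container list, which is how create_registration_dto fills the field. A sketch of building the documented payload (the container values are made up):

```rust
// Mirrors how create_registration_dto fills the field. DockerContainer is the
// struct declared further down in this file; the values here are hypothetical.
fn example_containers_json() -> Result<String, serde_json::Error> {
    let containers = vec![DockerContainer {
        id: "234dsf234".to_string(),
        image: Some("nginx:latest".to_string()),
        name: Some("webserver".to_string()),
    }];
    // Produces: [{"id":"234dsf234","image":"nginx:latest","name":"webserver"}]
    serde_json::to_string(&containers)
}
```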
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
pub struct DockerContainerDto {
|
||||
pub id: String,
|
||||
pub image: String,
|
||||
pub name: String,
|
||||
pub struct DockerMetricDto {
|
||||
pub server_id: u16,
|
||||
/// json stringified array of DockerContainer
|
||||
///
|
||||
/// ## Json Example
|
||||
/// json format: [{"id":"234dsf234","status":"running","image":"nginx:latest","name":"webserver","network":{"net_in":1024,"net_out":2048},"cpu":{"cpu_load":12.5},"ram":{"ram_load":10.0}},...]
|
||||
///
|
||||
/// ## Fields
|
||||
/// id: unique container ID (first 12 hex digits)
|
||||
/// status: "running";"stopped";others
|
||||
/// image: docker image name
|
||||
/// name: container name
|
||||
/// network: network stats
|
||||
/// cpu: cpu stats
|
||||
/// ram: ram stats
|
||||
pub containers: String, // Vec<DockerContainerInfo>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
pub struct DockerContainerMetricDto {
|
||||
|
||||
pub struct DockerCollectMetricDto {
|
||||
pub id: String,
|
||||
pub status: String, // "running";"stopped";others
|
||||
pub network: stats::ContainerNetworkInfo,
|
||||
pub cpu: stats::ContainerCpuInfo,
|
||||
pub cpu: DockerContainerCpuDto,
|
||||
pub ram: DockerContainerRamDto,
|
||||
pub network: DockerContainerNetworkDto,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
pub struct DockerContainerCpuDto {
|
||||
pub cpu_load: Option<f64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
pub struct DockerContainerRamDto {
|
||||
pub cpu_load: Option<f64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
|
||||
pub struct DockerContainerNetworkDto {
|
||||
pub net_in: Option<f64>,
|
||||
pub net_out: Option<f64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
pub struct DockerContainerInfo {
|
||||
pub container: Option<DockerContainer>,
|
||||
pub status: Option<String>, // "running";"stopped";others
|
||||
pub network: Option<stats::ContainerNetworkInfo>,
|
||||
pub cpu: Option<stats::ContainerCpuInfo>,
|
||||
pub ram: Option<stats::ContainerMemoryInfo>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
pub struct DockerContainer {
|
||||
pub id: String,
|
||||
pub image: Option<String>,
|
||||
pub name: Option<String>,
|
||||
}
|
||||
|