added inital server communcation task

This commit is contained in:
2025-09-27 21:34:30 +02:00
parent 67ebbdaa19
commit 67b24b33aa
4 changed files with 165 additions and 52 deletions

View File

@@ -1,16 +1,15 @@
use crate::serverclientcomm::handle_server_message;
use std::time::Duration;
use crate::hardware::HardwareInfo;
use crate::models::{HeartbeatDto, IdResponse, MetricDto, RegistrationDto};
use crate::models::{HeartbeatDto, IdResponse, MetricDto, RegistrationDto, ServerMessage, Acknowledgment};
use crate::serverclientcomm::handle_server_message;
use anyhow::Result;
use reqwest::{Client, StatusCode};
use std::error::Error;
use tokio::time::sleep;
use bollard::Docker;
use crate::models::ServerMessage;
pub async fn register_with_server(
base_url: &str,
@@ -159,24 +158,84 @@ pub async fn send_metrics(
Ok(())
}
pub async fn listening_to_server(docker: &Docker, base_url: &str) -> Result<(), Box<dyn Error + Send + Sync>>{
pub async fn listening_to_server(docker: &Docker, base_url: &str) -> Result<(), Box<dyn Error + Send + Sync>> {
let url = format!("{}/api/message", base_url);
let client = reqwest::Client::new();
loop {
// Replace with your server endpoint
let resp = reqwest::get(&url)
.await;
// Get message from server
let resp = client.get(&url).send().await;
if let Ok(resp) = resp {
if let Ok(msg) = resp.json::<ServerMessage>().await {
handle_server_message(docker, msg).await;
} else {
eprintln!("Failed to parse message");
match resp {
Ok(response) => {
if response.status().is_success() {
match response.json::<ServerMessage>().await {
Ok(msg) => {
// Acknowledge receipt immediately
if let Err(e) = send_acknowledgment(&client, base_url, &msg.message_id, "received", "Message received successfully").await {
eprintln!("Failed to send receipt acknowledgment: {}", e);
}
// Handle the message
let result = handle_server_message(docker, msg.clone()).await;
// Send execution result acknowledgment
let (status, details) = match result {
Ok(_) => ("success", "Message executed successfully".to_string()),
Err(e) => ("error", format!("Execution failed: {}", e)),
};
if let Err(e) = send_acknowledgment(&client, base_url, &msg.message_id, status, &details).await {
eprintln!("Failed to send execution acknowledgment: {}", e);
}
}
Err(e) => {
eprintln!("Failed to parse message: {}", e);
}
}
} else if response.status() == reqwest::StatusCode::NO_CONTENT {
// No new messages, continue polling
println!("No new messages from server");
} else {
eprintln!("Server returned error status: {}", response.status());
}
}
Err(e) => {
eprintln!("Failed to reach server: {}", e);
}
} else {
eprintln!("Failed to reach server");
}
// Poll every 5 seconds (or use WebSocket for real-time)
sleep(Duration::from_secs(5)).await;
}
}
async fn send_acknowledgment(
client: &reqwest::Client,
base_url: &str,
message_id: &str,
status: &str,
details: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let ack_url = format!("{}/api/acknowledge", base_url);
let acknowledgment = Acknowledgment {
message_id: message_id.to_string(),
status: status.to_string(),
details: details.to_string(),
};
let response = client
.post(&ack_url)
.json(&acknowledgment)
.send()
.await?;
if response.status().is_success() {
println!("Acknowledgment sent successfully for message {}", message_id);
} else {
eprintln!("Server returned error for acknowledgment: {}", response.status());
}
Ok(())
}

View File

@@ -13,6 +13,9 @@ use std::marker::Send;
use std::marker::Sync;
use std::result::Result;
use tokio::task::JoinHandle;
use bollard::Docker;
use crate::serverclientcomm::{get_current_image};
async fn flatten<T>(
handle: JoinHandle<Result<T, Box<dyn Error + Send + Sync>>>,
@@ -26,14 +29,26 @@ async fn flatten<T>(
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let args: Vec<String> = env::args().collect();
// Initialize Docker client
let docker = Docker::connect_with_local_defaults()
.map_err(|e| format!("Failed to connect to Docker: {}", e))?;
// Get current image version
let client_version = match get_current_image(&docker).await {
Ok(version) => version.unwrap(),
Err(e) => {
eprintln!("Warning: Could not get current image version: {}", e);
"unknown".to_string()
}
};
println!("Client Version: {}", client_version);
let args: Vec<String> = env::args().collect();
// args[0] is the binary name, args[1] is the first actual argument
if args.len() < 2 {
eprintln!("Usage: {} <server-url>", args[0]);
return Err("Missing server URL argument".into());
}
let server_url = &args[1];
println!("Server URL: {:?}", server_url);
@@ -47,6 +62,13 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
};
// Start background tasks
// Start server listening for commands
let listening_handle = tokio::spawn({
let docker = docker.clone();
let server_url = server_url.to_string();
async move { api::listening_to_server(&docker, &server_url).await }
});
// Start heartbeat in background
let heartbeat_handle = tokio::spawn({
let ip = ip.clone();
@@ -65,14 +87,16 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
}
});
// Warte auf beide Tasks und prüfe explizit auf Fehler
let (heartbeat_handle, metrics_handle) =
tokio::try_join!(flatten(heartbeat_handle), flatten(metrics_handle))?;
// Wait for all tasks and check for errors
let (listening_result, heartbeat_result, metrics_result) = tokio::try_join!(
flatten(listening_handle),
flatten(heartbeat_handle),
flatten(metrics_handle)
)?;
let (heartbeat, metrics) = (heartbeat_handle, metrics_handle);
println!(
"All tasks completed successfully: {:?}, {:?}.",
heartbeat, metrics
"All tasks completed: listening={:?}, heartbeat={:?}, metrics={:?}",
listening_result, heartbeat_result, metrics_result
);
println!("All tasks completed successfully.");

View File

@@ -73,11 +73,17 @@ pub struct HardwareDto {
pub ip_address: String,
}
#[derive(Debug, Deserialize)]
#[serde(tag = "command", content = "data")]
pub enum ServerMessage {
Update(String),
Restart,
#[serde(other)]
Unknown,
#[derive(Debug, Deserialize, Clone)]
pub struct ServerMessage {
// Define your message structure here
pub message_type: String,
pub data: serde_json::Value,
pub message_id: String, // Add an ID for acknowledgment
}
#[derive(Debug, Serialize, Clone)]
pub struct Acknowledgment {
pub message_id: String,
pub status: String, // "success" or "error"
pub details: String,
}

View File

@@ -1,26 +1,46 @@
use crate::models::ServerMessage;
use crate::models::{ServerMessage};
use std::error::Error;
use bollard::Docker;
use bollard::query_parameters::{CreateImageOptions, RestartContainerOptions, InspectContainerOptions};
use futures_util::StreamExt;
pub fn parse_message(raw: &str) -> ServerMessage {
match raw {
"restart" => ServerMessage::Restart,
msg if msg.starts_with("update:") => ServerMessage::Update(msg[7..].to_string()),
_ => ServerMessage::Unknown,
pub async fn handle_server_message(docker: &Docker, msg: ServerMessage) -> Result<(), Box<dyn Error + Send + Sync>> {
let msg = msg.clone();
println!("Handling server message: {:?}", msg);
// Handle different message types
match msg.message_type.as_str() {
"update_image" => {
if let Some(image_name) = msg.data.get("image").and_then(|v| v.as_str()) {
println!("Received update command for image: {}", image_name);
// Call your update_docker_image function here
update_docker_image(docker, image_name).await?;
Ok(())
} else {
Err("Missing image name in update message".into())
}
}
"restart_container" => {
println!("Received restart container command");
// Call your restart_container function here
restart_container(docker).await?;
Ok(())
}
"stop_agent" => {
println!("Received stop agent command");
// Implement graceful shutdown
std::process::exit(0);
}
_ => {
eprintln!("Unknown message type: {}", msg.message_type);
Err(format!("Unknown message type: {}", msg.message_type).into())
}
}
}
pub async fn handle_server_message(docker: &Docker, msg: ServerMessage) {
match msg {
ServerMessage::Update(version) => update_docker_image(docker, &version).await,
ServerMessage::Restart => restart_container(docker).await,
ServerMessage::Unknown => eprintln!("Unknown message"),
}
}
pub async fn update_docker_image(docker: &Docker, image: &str) {
pub async fn update_docker_image(docker: &Docker, image: &str) -> Result<(), Box<dyn Error + Send + Sync>> {
println!("Updating to {}", image);
// 1. Pull new image
@@ -49,10 +69,12 @@ pub async fn update_docker_image(docker: &Docker, image: &str) {
}
// 2. Restart the current container
restart_container(docker).await;
let _ = restart_container(docker).await;
Ok(())
}
pub async fn get_current_image(docker: &Docker) -> Option<String> {
pub async fn get_current_image(docker: &Docker) -> Result<Option<String>, Box<dyn Error + Send + Sync>> {
// Get the current container ID from /proc/self/cgroup
let container_id = match std::fs::read_to_string("/proc/self/cgroup") {
Ok(content) => {
@@ -68,7 +90,7 @@ pub async fn get_current_image(docker: &Docker) -> Option<String> {
}
Err(e) => {
eprintln!("Error reading cgroup file: {}", e);
return None;
return Ok(None);
}
};
@@ -76,23 +98,23 @@ pub async fn get_current_image(docker: &Docker) -> Option<String> {
Some(id) => id,
None => {
eprintln!("Could not find container ID in cgroup");
return None;
return Ok(None);
}
};
// Inspect the current container to get its image
match docker.inspect_container(&container_id, None::<InspectContainerOptions>).await {
Ok(container_info) => {
container_info.config.map(|config| config.image.unwrap_or_else(|| "unknown".to_string()))
Ok(container_info.config.map(|config| config.image.unwrap_or_else(|| "unknown".to_string())))
}
Err(e) => {
eprintln!("Error inspecting container: {}", e);
None
Ok(None)
}
}
}
pub async fn restart_container(docker: &Docker) {
pub async fn restart_container(docker: &Docker) -> Result<(), Box<dyn Error + Send + Sync>> {
if let Ok(container_id) = std::env::var("HOSTNAME") {
println!("Restarting container {}", container_id);
if let Err(e) = docker.restart_container(&container_id, Some(RestartContainerOptions { signal: None, t: Some(0) }))
@@ -103,4 +125,6 @@ pub async fn restart_container(docker: &Docker) {
} else {
eprintln!("No container ID found (HOSTNAME not set?)");
}
Ok(())
}