From c86d828940545c885bd2bcc3c3e7ee3f3233739b Mon Sep 17 00:00:00 2001 From: donpat1to Date: Sat, 10 Jan 2026 00:30:42 +0100 Subject: [PATCH] cleaned up main --- src/corporate/checkpoint_helpers.rs | 10 +- src/corporate/update.rs | 2 +- src/corporate/update_companies_cleanse.rs | 98 ++-- src/main.rs | 522 ++++++++++++---------- src/util/macros.rs | 4 +- 5 files changed, 337 insertions(+), 299 deletions(-) diff --git a/src/corporate/checkpoint_helpers.rs b/src/corporate/checkpoint_helpers.rs index 9ce2eed..9e27424 100644 --- a/src/corporate/checkpoint_helpers.rs +++ b/src/corporate/checkpoint_helpers.rs @@ -7,14 +7,12 @@ use super::types::CompanyCrossPlatformInfo; use crate::util::logger; use std::collections::HashMap; -use std::path::{Path, PathBuf}; -use std::sync::Arc; -use std::sync::atomic::{AtomicUsize, Ordering}; -use serde::Serialize; -use tokio::fs::{File, OpenOptions}; +use std::path::{Path}; +use chrono::{DateTime, Duration, Utc}; +use serde::{Deserialize, Serialize}; +use tokio::fs::{File}; use tokio::io::{AsyncWriteExt}; use anyhow::Result; -use tokio::sync::mpsc; /// Load companies from checkpoint and replay log for recovery /// diff --git a/src/corporate/update.rs b/src/corporate/update.rs index 5144235..722dc42 100644 --- a/src/corporate/update.rs +++ b/src/corporate/update.rs @@ -15,7 +15,7 @@ use crate::scraper::yahoo::{YahooClientPool}; use std::result::Result::Ok; use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool}; /// Main corporate update entry point with shutdown awareness pub async fn run_full_update( diff --git a/src/corporate/update_companies_cleanse.rs b/src/corporate/update_companies_cleanse.rs index a495761..4d828a2 100644 --- a/src/corporate/update_companies_cleanse.rs +++ b/src/corporate/update_companies_cleanse.rs @@ -7,7 +7,7 @@ use crate::util::logger; use crate::scraper::yahoo::{YahooClientPool, QuoteSummaryModule}; use std::result::Result::Ok; -use chrono::{Local, Utc}; +use 
chrono::{Utc}; use std::collections::HashMap; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; @@ -191,12 +191,42 @@ pub async fn companies_yahoo_cleansed_low_profile( let input_path = data_path.join("companies_yahoo.jsonl"); let checkpoint_path = data_path.join("companies_yahoo_cleaned.jsonl"); let log_path = data_path.join("companies_updates.log"); + let state_path = data_path.join("state.jsonl"); // Check input exists if !input_path.exists() { logger::log_warn(" companies_yahoo.jsonl not found, skipping low profile cleansing").await; return Ok(0); } + + if state_path.exists() { + let state_content = tokio::fs::read_to_string(&state_path).await?; + + for line in state_content.lines() { + if line.trim().is_empty() { + continue; + } + + if let Ok(state) = serde_json::from_str::<serde_json::Value>(line) { + if state.get("yahoo_companies_cleansed_low_profile").and_then(|v| v.as_bool()).unwrap_or(false) { + logger::log_info(" Yahoo low profile cleansing already completed, reading existing file...").await; + + if checkpoint_path.exists() { + let checkpoint_content = tokio::fs::read_to_string(&checkpoint_path).await?; + let count = checkpoint_content.lines() + .filter(|line| !line.trim().is_empty()) + .count(); + + logger::log_info(&format!(" ✓ Found {} companies in companies_yahoo_cleaned.jsonl", count)).await; + return Ok(count); + } else { + logger::log_warn(" State indicates completion but companies_yahoo_cleaned.jsonl not found, re-running...").await; + break; + } + } + } + } + } // === RECOVERY PHASE: Load checkpoint + replay log === let mut existing_companies: HashMap = HashMap::new(); @@ -616,7 +646,26 @@ pub async fn companies_yahoo_cleansed_low_profile( // Shutdown Yahoo pool yahoo_pool.shutdown().await?; - Ok(final_valid) + // Write completion milestone to state.jsonl + let state_path = data_path.join("state.jsonl"); + let yahoo_low_profile = json!({ + "yahoo_companies_cleansed_low_profile": true, + "completed_at": chrono::Utc::now().to_rfc3339(), 
+ }); + + let mut state_file = OpenOptions::new() + .create(true) + .append(true) + .open(&state_path) + .await?; + let state_line = serde_json::to_string(&yahoo_low_profile)?; + state_file.write_all(state_line.as_bytes()).await?; + state_file.write_all(b"\n").await?; + state_file.flush().await?; + + logger::log_info(&format!(" ✓ State milestone saved to: {:?}", state_path)).await; + + Ok(final_count) } /// Helper function to spawn a validation task (reduces code duplication) @@ -911,54 +960,9 @@ async fn save_company_core_data( Ok(()) } -pub struct ProcessResult { - pub changes: Vec, -} -pub fn process_batch( - new_events: &[CompanyEvent], - existing: &mut HashMap, - today: &str, -) -> ProcessResult { - let mut changes = Vec::new(); - for new in new_events { - let key = event_key(new); - if let Some(old) = existing.get(&key) { - changes.extend(detect_changes(old, new, today)); - existing.insert(key, new.clone()); - continue; - } - - let date_key = format!("{}|{}", new.ticker, new.date); - let mut found_old = None; - for (k, e) in existing.iter() { - if format!("{}|{}", e.ticker, e.date) == date_key && k != &key { - found_old = Some((k.clone(), e.clone())); - break; - } - } - - if let Some((old_key, old_event)) = found_old { - if new.date.as_str() > today { - changes.push(CompanyEventChange { - ticker: new.ticker.clone(), - date: new.date.clone(), - field_changed: "time".to_string(), - old_value: old_event.time.clone(), - new_value: new.time.clone(), - detected_at: Local::now().format("%Y-%m-%d %H:%M:%S").to_string(), - }); - } - existing.remove(&old_key); - } - - existing.insert(key, new.clone()); - } - - ProcessResult { changes } -} /// Check if a company needs processing (validation check) fn company_needs_processing( diff --git a/src/main.rs b/src/main.rs index c05f0a8..20d378d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,46 +1,181 @@ -// src/main.rs - FIXED: Proper temp pool cleanup - +// src/main.rs - Cleaned up version with extracted helpers use 
web_scraper::{*, scraper, corporate}; - -use anyhow::Result; +use crate::check_shutdown; +use anyhow::{Result}; use web_scraper::config::Config; use scraper::docker_vpn_proxy::{DockerVpnProxyPool, cleanup_all_proxy_containers}; use scraper::webdriver::ChromeDriverPool; use util::directories::DataPaths; use util::{logger, opnv}; +use std::fs::{OpenOptions}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::process::Command; +use std::time::{Duration, Instant}; -#[tokio::main] -async fn main() -> Result<()> { - let output = if cfg!(target_os = "windows") { - Command::new("cmd") +// ============================================================================ +// HELPER FUNCTIONS - Extracted to reduce duplication +// ============================================================================ + +/// Start Docker Desktop on Windows +async fn start_docker_desktop() { + if cfg!(target_os = "windows") { + let _ = Command::new("cmd") .args(["/C", "docker desktop start"]) - .output() - .expect("failed to execute process") - } else { - Command::new("sh") - .arg("-c") - .arg("echo hello") - .output() - .expect("failed to execute process") - }; - let _start_docker_desktop = output.stdout; - - cleanup_all_proxy_containers().await.ok(); + .output(); + } +} - let config = match Config::load() { - Ok(cfg) => cfg, - Err(_) => { - eprintln!("Using default configuration"); - Config::default() +/// Shutdown ChromeDriver pool with error handling +async fn shutdown_chrome_pool(pool: &ChromeDriverPool) { + logger::log_info("Shutting down ChromeDriver pool...").await; + match pool.shutdown().await { + Ok(()) => logger::log_info("✓ ChromeDriver pool shut down successfully").await, + Err(e) => logger::log_error(&format!("✗ Pool shutdown error: {}", e)).await, + } +} + +/// Shutdown Docker VPN proxy pool with error handling +async fn shutdown_proxy_pool(proxy_pool: &DockerVpnProxyPool) { + logger::log_info("Stopping Docker VPN proxy containers...").await; + match 
proxy_pool.shutdown().await { + Ok(()) => logger::log_info("✓ All Docker VPN containers stopped").await, + Err(e) => logger::log_error(&format!("✗ Proxy shutdown error: {}", e)).await, + } +} + +/// Force-kill Chrome and ChromeDriver processes (Windows only) +#[cfg(target_os = "windows")] +async fn force_kill_chrome_processes() { + logger::log_info("Force-killing any remaining Chrome processes...").await; + let _ = tokio::process::Command::new("taskkill") + .args(["/F", "/IM", "chrome.exe"]) + .output() + .await; + let _ = tokio::process::Command::new("taskkill") + .args(["/F", "/IM", "chromedriver.exe"]) + .output() + .await; +} + +#[cfg(not(target_os = "windows"))] +async fn force_kill_chrome_processes() { + // No-op on non-Windows platforms +} + +/// Verify Chrome processes are cleaned up (Windows only) +#[cfg(target_os = "windows")] +async fn verify_chrome_cleanup() { + if let Ok(output) = tokio::process::Command::new("tasklist") + .args(["/FI", "IMAGENAME eq chrome.exe"]) + .output() + .await + { + let stdout = String::from_utf8_lossy(&output.stdout); + let chrome_count = stdout.lines().filter(|line| line.contains("chrome.exe")).count(); + + if chrome_count > 0 { + logger::log_warn(&format!("⚠️ {} Chrome processes still running after cleanup!", chrome_count)).await; + } else { + logger::log_info("✓ All Chrome processes cleaned up").await; } - }; + } +} - let paths = DataPaths::new(".")?; +#[cfg(not(target_os = "windows"))] +async fn verify_chrome_cleanup() { + // No-op on non-Windows platforms +} - // Initialize monitoring system +/// Complete cleanup sequence: shutdown pools, cleanup containers, kill processes +async fn perform_full_cleanup( + pool: &ChromeDriverPool, + proxy_pool: Option<&DockerVpnProxyPool>, +) { + shutdown_chrome_pool(pool).await; + + if let Some(pp) = proxy_pool { + shutdown_proxy_pool(pp).await; + cleanup_all_proxy_containers().await.ok(); + } + + force_kill_chrome_processes().await; +} + +/// Create temporary ChromeDriver pool, fetch 
VPN credentials, and cleanup +async fn fetch_vpn_credentials_with_temp_pool( + config: &Config, + paths: &DataPaths, + monitoring_handle: &monitoring::MonitoringHandle, +) -> Result<Option<Arc<DockerVpnProxyPool>>> { + logger::log_info("VPN Rotation Enabled – Fetching latest VPNBook configs").await; + + // Create temp pool + logger::log_info("Creating temporary ChromeDriver pool for VPN credential fetch...").await; + let temp_pool = Arc::new(ChromeDriverPool::new_with_proxy_and_task_limit( + None, + config, + Some(monitoring_handle.clone()) + ).await?); + + // Fetch credentials + logger::log_info("Fetching VPNBook credentials...").await; + let (username, password, _files) = opnv::fetch_vpnbook_configs(&temp_pool, paths.cache_dir()).await?; + logger::log_info(&format!("VPNBook credentials → User: {}", username)).await; + + // Cleanup temp pool + logger::log_info("Shutting down temporary pool...").await; + match temp_pool.shutdown().await { + Ok(()) => logger::log_info("✓ Temporary pool shut down successfully").await, + Err(e) => { + logger::log_error(&format!("✗ Temp pool shutdown error: {}", e)).await; + force_kill_chrome_processes().await; + } + } + + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + + // Count VPN servers and create proxy pool + let server_count = std::fs::read_dir(paths.cache_openvpn_dir())? 
+ .filter(|e| e.as_ref().unwrap().path().is_dir()) + .count(); + + if server_count == 0 { + logger::log_warn("No VPN servers found – continuing without VPN").await; + return Ok(None); + } + + logger::log_info(&format!("Found {} VPN servers – starting Docker proxy containers", server_count)).await; + + let number_proxy_instances = config.proxy_instances_per_certificate.unwrap_or(1); + let proxy_pool = Arc::new(DockerVpnProxyPool::new( + paths.cache_openvpn_dir(), + username, + password, + number_proxy_instances + ).await?); + + logger::log_info(&format!("All {} Docker proxy containers started and ready", proxy_pool.num_proxies())).await; + + // Emit proxy connection events + for i in 0..proxy_pool.num_proxies() { + if let Some(proxy_info) = proxy_pool.get_proxy_info(i) { + monitoring_handle.emit(monitoring::MonitoringEvent::ProxyConnected { + container_name: proxy_info.container_name.clone(), + ip_address: proxy_info.ip_address.clone(), + port: proxy_info.port, + }); + } + } + + Ok(Some(proxy_pool)) +} + +/// Initialize monitoring system +async fn initialize_monitoring( + config: &Config, + paths: &DataPaths, +) -> Result<(monitoring::MonitoringHandle, tokio::task::JoinHandle<()>)> { let config_snapshot = ConfigSnapshot { max_parallel_instances: config.max_parallel_instances, max_tasks_per_instance: config.max_tasks_per_instance, @@ -50,13 +185,12 @@ async fn main() -> Result<()> { max_retry_attempts: config.max_retry_attempts, }; - let (monitoring_handle, _monitoring_task) = init_monitoring( + let (monitoring_handle, monitoring_task) = init_monitoring( config_snapshot, paths.logs_dir().to_path_buf(), - 3030, // Dashboard port + 3030, ).await?; - // Emit pool initialization event monitoring_handle.emit(monitoring::MonitoringEvent::PoolInitialized { pool_size: config.max_parallel_instances, with_proxy: config.enable_vpn_rotation, @@ -64,242 +198,144 @@ async fn main() -> Result<()> { }); logger::log_info("Monitoring dashboard available at 
http://localhost:3030").await; + + Ok((monitoring_handle, monitoring_task)) +} +/// Setup Ctrl+C handler for graceful shutdown +fn setup_shutdown_handler( + shutdown_flag: Arc<AtomicBool>, + pool: Arc<ChromeDriverPool>, + proxy_pool: Option<Arc<DockerVpnProxyPool>>, +) { + tokio::spawn(async move { + tokio::signal::ctrl_c().await.ok(); + logger::log_info("Ctrl+C received – shutting down gracefully...").await; + + shutdown_flag.store(true, Ordering::SeqCst); + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + + perform_full_cleanup(&pool, proxy_pool.as_deref()).await; + + logger::log_info("Shutdown complete").await; + std::process::exit(0); + }); +} + +fn format_duration(duration: Duration) -> String { + let total_seconds = duration.as_secs(); + + let days = total_seconds / 86400; + let hours = (total_seconds % 86400) / 3600; + let minutes = (total_seconds % 3600) / 60; + let seconds = total_seconds % 60; + + format!("{:02}::{:02}::{:02}::{:02}", days, hours, minutes, seconds) +} + +pub async fn create_state_files(paths: &DataPaths) -> Result<()> { + let paths = ( + paths.data_dir().join("state.jsonl"), + paths.cache_dir().join("state.jsonl"), + ); + + // Use OpenOptions to create the file only if it doesn't exist + for path in &[&paths.0, &paths.1] { + OpenOptions::new() + .create(true) // Create if it doesn't exist + .write(true) // Ensure we can write to the file + .open(path)?; + logger::log_info(&format!("Checked or created file: {}", path.display())).await; + } + + Ok(()) +} + +// ============================================================================ +// MAIN FUNCTION - Simplified with extracted helpers +// ============================================================================ + +#[tokio::main] +async fn main() -> Result<()> { + // Initial setup + let start = Instant::now(); + let paths = DataPaths::new(".")?; + + start_docker_desktop().await; + cleanup_all_proxy_containers().await.ok(); + create_state_files(&paths).await.ok(); + + let config = Config::load().unwrap_or_else(|_| { + 
eprintln!("Using default configuration"); + Config::default() + }); + + + + // Initialize monitoring + let (monitoring_handle, _monitoring_task) = initialize_monitoring(&config, &paths).await?; + + // Initialize debug logger logger::init_debug_logger(paths.logs_dir()).await.ok(); logger::log_info("=== Economic Webscraper Started ===").await; logger::log_info(&format!( - "Config → parallel_instances: {}, task_limit: {} vpn_rotation: {} proxy_instances_per_certificate: {:?}", + "Config → parallel_instances: {}, task_limit: {}, vpn_rotation: {}, proxy_instances_per_certificate: {:?}", config.max_parallel_instances, config.max_tasks_per_instance, config.enable_vpn_rotation, config.proxy_instances_per_certificate )).await; - let number_proxy_instances_per_certificate = config.proxy_instances_per_certificate.unwrap_or(1); - - - // Simple shutdown flag let shutdown_flag = Arc::new(AtomicBool::new(false)); - // === Step 1: Fetch VPNBook configs === - let proxy_pool: Option> = if config.enable_vpn_rotation { - logger::log_info("VPN Rotation Enabled – Fetching latest VPNBook configs").await; - - // Create temp pool and ensure it's properly shut down - logger::log_info("Creating temporary ChromeDriver pool for VPN credential fetch...").await; - let temp_pool = Arc::new(ChromeDriverPool::new_with_proxy_and_task_limit( - None, - &config, - Some(monitoring_handle.clone()) - ).await?); - - logger::log_info("Fetching VPNBook credentials...").await; - let (username, password, _files) = opnv::fetch_vpnbook_configs(&temp_pool, paths.cache_dir()).await?; - logger::log_info(&format!("VPNBook credentials → User: {}", username)).await; - - // Properly shutdown temp pool with error handling - logger::log_info("Shutting down temporary pool...").await; - match temp_pool.shutdown().await { - Ok(()) => { - logger::log_info("✓ Temporary pool shut down successfully").await; - } - Err(e) => { - logger::log_error(&format!("✗ Temp pool shutdown error: {}", e)).await; - // Force-kill as backup - 
#[cfg(target_os = "windows")] - { - let _ = tokio::process::Command::new("taskkill") - .args(["/F", "/IM", "chrome.exe"]) - .output() - .await; - let _ = tokio::process::Command::new("taskkill") - .args(["/F", "/IM", "chromedriver.exe"]) - .output() - .await; - } - } - } - - // Wait a moment for cleanup - tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; - - let server_count = std::fs::read_dir(paths.cache_openvpn_dir())? - .filter(|e| e.as_ref().unwrap().path().is_dir()) - .count(); - - if server_count == 0 { - logger::log_warn("No VPN servers found – continuing without VPN").await; - None - } else { - logger::log_info(&format!("Found {} VPN servers – starting Docker proxy containers", server_count)).await; - let pp = Arc::new(DockerVpnProxyPool::new(paths.cache_openvpn_dir(), username, password, number_proxy_instances_per_certificate).await?); - - logger::log_info(&format!("All {} Docker proxy containers started and ready", pp.num_proxies())).await; - for i in 0..pp.num_proxies() { - if let Some(proxy_info) = pp.get_proxy_info(i) { - monitoring_handle.emit(monitoring::MonitoringEvent::ProxyConnected { - container_name: proxy_info.container_name.clone(), - ip_address: proxy_info.ip_address.clone(), - port: proxy_info.port, - }); - } - } - - Some(pp) - } + // Fetch VPN credentials and setup proxy pool if enabled + let proxy_pool = if config.enable_vpn_rotation { + fetch_vpn_credentials_with_temp_pool(&config, &paths, &monitoring_handle).await? 
} else { logger::log_info("VPN rotation disabled – using direct connection").await; None }; - // === Step 2: Initialize ChromeDriver pool === - let pool_size_limit = config.max_parallel_instances; - let task_limit = config.max_tasks_per_instance; - - logger::log_info(&format!("Creating ChromeDriver pool with {} instances...", pool_size_limit)).await; + // Create main ChromeDriver pool + logger::log_info(&format!("Creating ChromeDriver pool with {} instances...", config.max_parallel_instances)).await; - let pool = Arc::new( - if task_limit > 0 { - ChromeDriverPool::new_with_proxy_and_task_limit(proxy_pool.clone(), &config, Some(monitoring_handle.clone())).await? - } else { - ChromeDriverPool::new_with_proxy_and_task_limit(proxy_pool.clone(), &config, Some(monitoring_handle.clone())).await? - } + let pool = Arc::new(ChromeDriverPool::new_with_proxy_and_task_limit( + proxy_pool.clone(), + &config, + Some(monitoring_handle.clone()) + ).await?); + + logger::log_info(&format!("ChromeDriver pool ready with {} instances", config.max_parallel_instances)).await; + + // Setup Ctrl+C handler + setup_shutdown_handler( + Arc::clone(&shutdown_flag), + Arc::clone(&pool), + proxy_pool.clone(), ); - - logger::log_info(&format!("ChromeDriver pool ready with {} instances", pool_size_limit)).await; - - // === Step 3: Ctrl+C handler === - { - let shutdown_flag_clone = Arc::clone(&shutdown_flag); - let pool_clone = Arc::clone(&pool); - let proxy_clone = proxy_pool.clone(); - - tokio::spawn(async move { - tokio::signal::ctrl_c().await.ok(); - logger::log_info("Ctrl+C received – shutting down gracefully...").await; - - // Set flag first - shutdown_flag_clone.store(true, Ordering::SeqCst); - - // Wait a bit for tasks to notice - tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; - - // ✅ FIXED: Better error handling during shutdown - logger::log_info("Shutting down ChromeDriver pool...").await; - match (&*pool_clone).shutdown().await { - Ok(()) => { - logger::log_info("✓ 
ChromeDriver pool shut down successfully").await; - } - Err(e) => { - logger::log_error(&format!("✗ Pool shutdown error: {}", e)).await; - } - } - - if let Some(pp) = proxy_clone { - logger::log_info("Stopping Docker VPN proxy containers...").await; - match pp.shutdown().await { - Ok(()) => { - logger::log_info("✓ All Docker VPN containers stopped").await; - } - Err(e) => { - logger::log_error(&format!("✗ Proxy shutdown error: {}", e)).await; - } - } - } - - let _ = cleanup_all_proxy_containers().await; - - // ✅ ADDED: Force-kill any remaining Chrome/ChromeDriver processes - #[cfg(target_os = "windows")] - { - logger::log_info("Force-killing any remaining Chrome processes...").await; - let _ = tokio::process::Command::new("taskkill") - .args(["/F", "/IM", "chrome.exe"]) - .output() - .await; - let _ = tokio::process::Command::new("taskkill") - .args(["/F", "/IM", "chromedriver.exe"]) - .output() - .await; - } - - logger::log_info("Shutdown complete").await; - std::process::exit(0); - }); - } - // === Step 4: Run scraping jobs === - if !shutdown_flag.load(Ordering::SeqCst) { - logger::log_info("--- Starting ECONOMIC data update ---").await; - economic::run_full_update(&config, &pool, &shutdown_flag).await?; - logger::log_info("Economic update completed").await; - } + // Run scraping jobs + check_shutdown!(&shutdown_flag); - if !shutdown_flag.load(Ordering::SeqCst) { - logger::log_info("--- Starting CORPORATE data update ---").await; - corporate::run_full_update(&config, &pool, &shutdown_flag).await?; - logger::log_info("Corporate update completed").await; - } + logger::log_info("--- Starting ECONOMIC data update ---").await; + economic::run_full_update(&config, &pool, &shutdown_flag).await?; + logger::log_info("Economic update completed").await; - // === Step 5: Final cleanup === - if !shutdown_flag.load(Ordering::SeqCst) { - logger::log_info("Shutting down ChromeDriver pool...").await; - match pool.shutdown().await { - Ok(()) => { - logger::log_info("✓ ChromeDriver 
pool shut down successfully").await; - } - Err(e) => { - logger::log_error(&format!("✗ Pool shutdown error: {}", e)).await; - } - } + check_shutdown!(&shutdown_flag); - if let Some(pp) = proxy_pool { - logger::log_info("Stopping Docker VPN proxy containers...").await; - match pp.shutdown().await { - Ok(()) => { - logger::log_info("✓ All Docker VPN containers stopped").await; - } - Err(e) => { - logger::log_error(&format!("✗ Proxy shutdown error: {}", e)).await; - } - } - cleanup_all_proxy_containers().await.ok(); - } + logger::log_info("--- Starting CORPORATE data update ---").await; + corporate::run_full_update(&config, &pool, &shutdown_flag).await?; + logger::log_info("Corporate update completed").await; - // Final force-kill to ensure no leaks - #[cfg(target_os = "windows")] - { - logger::log_info("Final cleanup: force-killing any remaining Chrome processes...").await; - tokio::process::Command::new("taskkill") - .args(["/F", "/IM", "chrome.exe"]) - .output() - .await - .ok(); - tokio::process::Command::new("taskkill") - .args(["/F", "/IM", "chromedriver.exe"]) - .output() - .await - .ok(); - - // Verify cleanup - if let Ok(output) = tokio::process::Command::new("tasklist") - .args(["/FI", "IMAGENAME eq chrome.exe"]) - .output() - .await - { - let stdout = String::from_utf8_lossy(&output.stdout); - let chrome_count = stdout.lines().filter(|line| line.contains("chrome.exe")).count(); - - if chrome_count > 0 { - logger::log_warn(&format!("⚠️ {} Chrome processes still running after cleanup!", chrome_count)).await; - } else { - logger::log_info("✓ All Chrome processes cleaned up").await; - } - } - } + check_shutdown!(&shutdown_flag); + + // Final cleanup if not already shutting down + perform_full_cleanup(&pool, proxy_pool.as_deref()).await; + verify_chrome_cleanup().await; + + logger::log_info(&format!("=== Application finished after {} ===", format_duration(start.elapsed()))).await; + + logger::log_info("=== Application finished successfully ===").await; - 
logger::log_info("=== Application finished successfully ===").await; - } - Ok(()) } \ No newline at end of file diff --git a/src/util/macros.rs b/src/util/macros.rs index 4e59b51..9188d36 100644 --- a/src/util/macros.rs +++ b/src/util/macros.rs @@ -3,8 +3,8 @@ macro_rules! check_shutdown { ($shutdown_flag:expr) => { if $shutdown_flag.load(std::sync::atomic::Ordering::SeqCst) { - logger::log_warn("Shutdown detected, stopping update").await; + logger::log_warn("Shutdown detected, stopping processes").await; return Ok(()); } }; -} +} \ No newline at end of file