From f9f09d0291f817ca45c7b34e8e1aa4b61887e8ec Mon Sep 17 00:00:00 2001
From: donpat1to
Date: Tue, 23 Dec 2025 15:07:40 +0100
Subject: [PATCH] added working hard reset

---
 src/corporate/update.rs          |  99 ++++++++++++-
 src/corporate/update_parallel.rs | 141 ++++++++++++++----
 src/main.rs                      | 130 +++++++++++++++--
 src/scraper/hard_reset.rs        | 234 ++++++++++++++++++++++++-------
 src/scraper/webdriver.rs         | 189 +++++++++++++++++++++----
 5 files changed, 666 insertions(+), 127 deletions(-)

diff --git a/src/corporate/update.rs b/src/corporate/update.rs
index be4d53d..efa45e3 100644
--- a/src/corporate/update.rs
+++ b/src/corporate/update.rs
@@ -1,5 +1,5 @@
 // src/corporate/update.rs - UPDATED WITH DATA INTEGRITY FIXES
-use super::{scraper::*, storage::*, helpers::*, types::*, openfigi::*, yahoo::*};
+use super::{scraper::*, storage::*, helpers::*, types::*, openfigi::*};
 use crate::config::Config;
 use crate::corporate::update_parallel::build_companies_jsonl_streaming_parallel;
 use crate::util::directories::DataPaths;
@@ -11,7 +11,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
 
-/// UPDATED: Main corporate update entry point with shutdown awareness
+/// Main corporate update entry point with shutdown awareness
 pub async fn run_full_update(
     _config: &Config,
     pool: &Arc,
@@ -81,8 +81,16 @@
     let count = build_companies_jsonl_streaming_parallel(&paths, pool, shutdown_flag, _config, &None).await?;
     logger::log_info(&format!(" ✓ Saved {} companies", count)).await;
 
+    if shutdown_flag.load(Ordering::SeqCst) {
+        logger::log_warn("Shutdown detected after companies.jsonl build").await;
+        return Ok(());
+    }
+
+    logger::log_info("Step 6: Cleaning up companies with missing essential data...").await;
+    let cleansed_count = companies_yahoo_jsonl(&paths).await?;
+
     if !shutdown_flag.load(Ordering::SeqCst) {
-        logger::log_info("Step 6: Processing events (using index)...").await;
+        logger::log_info("Step 7: Processing events (using index)...").await;
         let _event_index = build_event_index(&paths).await?;
         logger::log_info(" ✓ Event index built").await;
     } else {
@@ -93,6 +101,91 @@
     Ok(())
 }
 
+/// Cleansing function that removes companies with missing essential Yahoo data, for data integrity.
+/// A company is kept only if it has at least one ticker starting with 'YAHOO:' other than
+/// 'YAHOO:NO_RESULTS'; all other entries are removed. Kept entries stay unchanged.
+///
+/// The filtered '.jsonl' is saved in the same directory as 'companies_yahoo.jsonl'.
+/// Only runs when 'companies.jsonl' is present.
+pub async fn companies_yahoo_jsonl(paths: &DataPaths) -> anyhow::Result {
+    use tokio::fs::File;
+    use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
+
+    let path = paths.base_dir();
+
+    let input_path = path.join("corporate").join("companies.jsonl");
+    let output_path = path.join("corporate").join("companies_yahoo.jsonl");
+
+    // Check if input file exists
+    if !input_path.exists() {
+        logger::log_warn("companies.jsonl not found, skipping cleansing").await;
+        return Ok(0);
+    }
+
+    logger::log_info(&format!(" Reading from: {:?}", input_path)).await;
+    logger::log_info(&format!(" Writing to: {:?}", output_path)).await;
+
+    let file = File::open(&input_path).await?;
+    let reader = BufReader::new(file);
+    let mut lines = reader.lines();
+
+    let mut output_file = File::create(&output_path).await?;
+    let mut valid_count = 0;
+    let mut removed_count = 0;
+    let mut total_count = 0;
+
+    while let Some(line) = lines.next_line().await? {
+        if line.trim().is_empty() {
+            continue;
+        }
+
+        total_count += 1;
+
+        let company: CompanyCrossPlatformInfo = match serde_json::from_str(&line) {
+            Ok(c) => c,
+            Err(e) => {
+                logger::log_warn(&format!(" Failed to parse company on line {}: {}", total_count, e)).await;
+                continue;
+            }
+        };
+
+        // Check if company has at least one valid YAHOO ticker
+        // Valid means: starts with "YAHOO:" but is NOT "YAHOO:NO_RESULTS"
+        let has_valid_yahoo = company.isin_tickers_map
+            .values()
+            .flatten()
+            .any(|ticker| ticker.starts_with("YAHOO:") && ticker != "YAHOO:NO_RESULTS");
+
+        if has_valid_yahoo {
+            // Write the company to the filtered output
+            let json_line = serde_json::to_string(&company)?;
+            output_file.write_all(json_line.as_bytes()).await?;
+            output_file.write_all(b"\n").await?;
+            valid_count += 1;
+        } else {
+            removed_count += 1;
+            if removed_count <= 5 {
+                // Log first few removals for debugging
+                logger::log_info(&format!(" Removed company '{}' (no valid Yahoo ticker)", company.name)).await;
+            }
+        }
+
+        // Progress indicator for large files
+        if total_count % 1000 == 0 {
+            logger::log_info(&format!(" Processed {} companies...", total_count)).await;
+        }
+    }
+
+    output_file.flush().await?;
+
+    logger::log_info(&format!(
+        " ✓ Cleansing complete: {} total → {} valid, {} removed",
+        total_count, valid_count, removed_count
+    )).await;
+
+    Ok(valid_count)
+}
+
 async fn find_most_recent_figi_date_dir(paths: &DataPaths) -> anyhow::Result> {
     let map_cache_dir = paths.cache_gleif_openfigi_map_dir();
 
diff --git a/src/corporate/update_parallel.rs b/src/corporate/update_parallel.rs
index 4bc3fe7..100ea5c 100644
--- a/src/corporate/update_parallel.rs
+++ b/src/corporate/update_parallel.rs
@@ -1,11 +1,7 @@
-// src/corporate/update_parallel.rs - FIXED: Proper Hard Reset Implementation
+// src/corporate/update_parallel.rs - PROPERLY FIXED: Correct pending queue rebuild
 //
-// Critical fixes:
-// 1. Hard reset actually performed (no premature break)
-// 2. Error counter reset after hard reset
-// 3. Per-ISIN status tracking (not per-company)
-// 4. Proper task draining before reset
-// 5. Queue rebuilding after reset
+// Critical fix: After hard reset, only skip companies with COMPLETE Yahoo data,
+// not just companies that have been written
 
 use super::{types::*, yahoo::*, helpers::*};
 use crate::util::directories::DataPaths;
@@ -38,6 +34,53 @@
 struct CompanyProcessResult {
     is_update: bool,
 }
 
+/// Check if a company needs Yahoo data processing
+/// Returns true if company has incomplete data (needs processing)
+fn company_needs_processing(
+    company_name: &str,
+    company_info: &CompanyInfo,
+    existing_companies: &HashMap,
+) -> bool {
+    // If company not in existing data at all, definitely needs processing
+    let Some(existing_entry) = existing_companies.get(company_name) else {
+        return true;
+    };
+
+    // Collect all ISINs this company should have
+    let mut required_isins = std::collections::HashSet::new();
+    for figi_infos in company_info.securities.values() {
+        for figi_info in figi_infos {
+            if !figi_info.isin.is_empty() {
+                required_isins.insert(figi_info.isin.clone());
+            }
+        }
+    }
+
+    // Check each required ISIN
+    for isin in required_isins {
+        // Check if this ISIN exists in the company's ticker map
+        if let Some(tickers) = existing_entry.isin_tickers_map.get(&isin) {
+            // Check if this ISIN has valid Yahoo data
+            let has_valid_yahoo = tickers.iter().any(|t| {
+                t.starts_with("YAHOO:") &&
+                t != "YAHOO:ERROR" &&      // Error marker means needs retry
+                t != "YAHOO:NO_RESULTS"    // This is actually valid (legitimately not found)
+            });
+
+            // If no valid Yahoo data for this ISIN, company needs processing
+            if !has_valid_yahoo {
+                return true;
+            }
+        } else {
+            // ISIN not in map at all, needs processing
+            return true;
+        }
+    }
+
+    // All ISINs have valid Yahoo data, skip this company
+    false
+}
+
 /// Abort-safe incremental JSONL persistence with proper hard reset handling
 pub async fn build_companies_jsonl_streaming_parallel(
     paths: &DataPaths,
@@ -64,14 +107,13 @@
     let path = DataPaths::new(".")?;
     let corporate_path = path.data_dir().join("corporate").join("by_name");
     let securities_path = corporate_path.join("common_stocks.json");
-    let securities_path_cloned = securities_path.clone();
 
     if !securities_path.exists() {
         logger::log_warn("No common_stocks.json found").await;
         return Ok(0);
     }
 
-    let content = tokio::fs::read_to_string(securities_path).await?;
+    let content = tokio::fs::read_to_string(&securities_path).await?;
     let securities: HashMap = serde_json::from_str(&content)?;
 
     let companies_path = paths.data_dir().join("companies.jsonl");
@@ -145,7 +187,9 @@
     let companies_path_clone = companies_path.clone();
     let log_path_clone = log_path.clone();
     let existing_companies_writer = Arc::new(tokio::sync::Mutex::new(existing_companies.clone()));
-    let existing_companies_writer_clone = Arc::clone(&existing_companies_writer);
+
+    // Clone the Arc for the writer task (Arc clone is cheap, just increments ref count)
+    let existing_companies_writer_for_task = Arc::clone(&existing_companies_writer);
     let write_tx_for_writer = write_tx.clone();
 
     let writer_task = tokio::spawn(async move {
@@ -176,7 +220,7 @@
                         count += 1;
 
                         // Update in-memory state
-                        let mut existing_companies = existing_companies_writer.lock().await;
+                        let mut existing_companies = existing_companies_writer_for_task.lock().await;
                         let is_update = existing_companies.contains_key(&company.name);
                         existing_companies.insert(company.name.clone(), company);
                         drop(existing_companies);
@@ -214,7 +258,7 @@
                         break;
                     }
 
-                    let existing_companies = existing_companies_writer.lock().await;
+                    let existing_companies = existing_companies_writer_for_task.lock().await;
                     let companies_vec: Vec<_> = existing_companies.values().cloned().collect();
                     drop(existing_companies);
 
@@ -291,7 +335,19 @@
     logger::log_info(&format!("Processing {} companies with concurrency limit {}", total, CONCURRENCY_LIMIT)).await;
 
     let mut tasks = FuturesUnordered::new();
-    let mut pending = securities.into_iter().collect::>();
+
+    // Build initial pending list with proper filtering
+    let mut pending: Vec<(String, CompanyInfo)> = securities.iter()
+        .filter(|(name, info)| company_needs_processing(name, info, &existing_companies))
+        .map(|(name, info)| (name.clone(), info.clone()))
+        .collect();
+
+    logger::log_info(&format!(
+        "Initial scan: {} companies need processing ({} already complete)",
+        pending.len(),
+        total - pending.len()
+    )).await;
+
     let mut processed = 0;
     let mut hard_reset_count = 0;
 
@@ -397,7 +453,7 @@
                     let error_msg = e.to_string();
 
                     if error_msg.contains("HARD_RESET_REQUIRED") {
-                        // ✅ FIX: Don't break, perform actual hard reset
+                        // Don't break, perform actual hard reset
 
                         // Check if reset already in progress (race condition protection)
                         let mut reset_lock = reset_in_progress.lock().await;
@@ -439,7 +495,7 @@
                         logger::log_info("✅ Hard reset completed successfully").await;
                         hard_reset_count += 1;
 
-                        // ✅ FIX: Reset the error counter
+                        // Reset the error counter
                         {
                             let pool_guard = pool_mutex.lock().await;
                             let current_pool = Arc::clone(&*pool_guard);
                         }
                         logger::log_info("✓ Error counter cleared").await;
 
-                        // ✅ FIX: Rebuild pending list from existing_companies
-                        // Only re-add companies that haven't been written yet
-                        let written_companies = {
-                            let companies = existing_companies_writer_clone.lock().await;
-                            companies.keys().cloned().collect::>()
+                        // Rebuild pending list by checking which companies need processing
+                        logger::log_info("Rebuilding pending queue with proper Yahoo data checks...").await;
+
+                        // Get current state of written companies
+                        let current_existing = {
+                            let companies = existing_companies_writer.lock().await;
+                            companies.clone()
                         };
 
-                        // Create new pending list: all companies minus those already written
-                        let all_companies_list: Vec<(String, CompanyInfo)> = {
-                            // Need to reload securities since we cleared pending
-                            let content = tokio::fs::read_to_string(&securities_path_cloned).await?;
-                            let all_securities: HashMap = serde_json::from_str(&content)?;
-                            all_securities.into_iter()
-                                .filter(|(name, _)| !written_companies.contains(name))
-                                .collect()
-                        };
+                        // Reload all securities from disk
+                        let content = tokio::fs::read_to_string(&securities_path).await?;
+                        let all_securities: HashMap = serde_json::from_str(&content)?;
 
-                        pending = all_companies_list;
+                        // Build pending list: only companies that need processing
+                        pending = all_securities.iter()
+                            .filter(|(name, info)| company_needs_processing(name, info, &current_existing))
+                            .map(|(name, info)| (name.clone(), info.clone()))
+                            .collect();
 
                         logger::log_info(&format!(
                             "Restarting with {} remaining companies (out of {} total)",
                             pending.len(),
                             total
                         )).await;
 
+                        // Only continue if there's work to do
+                        if pending.is_empty() {
+                            logger::log_info("All companies have complete data, exiting").await;
+
+                            // Clear reset flag
+                            let mut reset_lock = reset_in_progress.lock().await;
+                            *reset_lock = false;
+                            drop(reset_lock);
+
+                            break; // Exit main loop
+                        }
+
                         // Respawn initial batch with NEW pool
                         for _ in 0..CONCURRENCY_LIMIT.min(pending.len()) {
                             if let Some((name, company_info)) = pending.pop() {
@@ -695,7 +763,7 @@
         }
     }
 
-    // ✅ FIX: Process each ISIN independently with per-ISIN status checking
+    // Process each ISIN independently with per-ISIN status checking
     for (isin, figi_tickers) in unique_isin_ticker_pairs {
         // Check shutdown before each ISIN
         if shutdown_flag.load(Ordering::SeqCst) {
@@ -716,10 +784,13 @@
             }
         }
 
-        // ✅ FIX: Check if THIS SPECIFIC ISIN has Yahoo data
-        let has_yahoo_ticker_for_this_isin = tickers.iter().any(|t| t.starts_with("YAHOO:"));
+        // Check if THIS SPECIFIC ISIN has valid Yahoo data (not ERROR)
+        let has_valid_yahoo = tickers.iter().any(|t| {
+            t.starts_with("YAHOO:") && t != "YAHOO:ERROR"
+            // Note: YAHOO:NO_RESULTS is valid (legitimately not found)
+        });
 
-        if !has_yahoo_ticker_for_this_isin {
+        if !has_valid_yahoo {
             logger::log_info(&format!("Fetching Yahoo details for {} (ISIN: {})", name, isin)).await;
 
             match scrape_with_retry(pool, &isin, 3, shutdown_flag).await {
@@ -766,7 +837,7 @@
                         isin, name, e
                     )).await;
 
-                    // ✅ FIX: Mark this ISIN as failed to enable retry
+                    // Mark this ISIN as failed to enable retry
                     tickers.push("YAHOO:ERROR".to_string());
                 }
             }
diff --git a/src/main.rs b/src/main.rs
index 1d75ed7..a52cb96 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,4 +1,4 @@
-// src/main.rs
+// src/main.rs - FIXED: Proper temp pool cleanup
 
 use web_scraper::{*, scraper, economic, corporate};
 
@@ -84,11 +84,45 @@ async fn main() -> Result<()> {
     // === Step 1: Fetch VPNBook configs ===
     let proxy_pool: Option> = if config.enable_vpn_rotation {
         logger::log_info("VPN Rotation Enabled – Fetching latest VPNBook configs").await;
-        let temp_pool = Arc::new(ChromeDriverPool::new_with_proxy_and_task_limit(None, &config, Some(monitoring_handle.clone())).await?);
-
+
+        // Create temp pool and ensure it's properly shut down
+        logger::log_info("Creating temporary ChromeDriver pool for VPN credential fetch...").await;
+        let temp_pool = Arc::new(ChromeDriverPool::new_with_proxy_and_task_limit(
+            None,
+            &config,
+            Some(monitoring_handle.clone())
+        ).await?);
+
+        logger::log_info("Fetching VPNBook credentials...").await;
         let (username, password, _files) = opnv::fetch_vpnbook_configs(&temp_pool, paths.cache_dir()).await?;
         logger::log_info(&format!("VPNBook credentials → User: {}", username)).await;
 
+        // Properly shutdown temp pool with error handling
+        logger::log_info("Shutting down temporary pool...").await;
+        match temp_pool.shutdown().await {
+            Ok(()) => {
+                logger::log_info("✓ Temporary pool shut down successfully").await;
+            }
+            Err(e) => {
+                logger::log_error(&format!("✗ Temp pool shutdown error: {}", e)).await;
+                // Force-kill as backup
+                #[cfg(target_os = "windows")]
+                {
+                    let _ = tokio::process::Command::new("taskkill")
+                        .args(["/F", "/IM", "chrome.exe"])
+                        .output()
+                        .await;
+                    let _ = tokio::process::Command::new("taskkill")
+                        .args(["/F", "/IM", "chromedriver.exe"])
+                        .output()
+                        .await;
+                }
+            }
+        }
+
+        // Wait a moment for cleanup
+        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
+
         let server_count = std::fs::read_dir(paths.cache_openvpn_dir())?
===").await; } diff --git a/src/scraper/hard_reset.rs b/src/scraper/hard_reset.rs index 4232758..33bd00c 100644 --- a/src/scraper/hard_reset.rs +++ b/src/scraper/hard_reset.rs @@ -1,4 +1,4 @@ -// src/scraper/hard_reset.rs - PROPERLY FIXED: Matches main.rs initialization pattern +// src/scraper/hard_reset.rs - FIXED: Proper cleanup without Arc leaks use std::sync::{Arc, atomic::{AtomicBool, AtomicUsize, Ordering}}; use crate::{ChromeDriverPool, Config, logger, scraper::docker_vpn_proxy::{DockerVpnProxyPool, cleanup_all_proxy_containers}, util::directories::DataPaths}; @@ -36,7 +36,13 @@ impl HardResetController { } } -/// Perform hard reset: shutdown everything and recreate +/// ✅ FIXED: Perform hard reset without Arc reference leaks +/// +/// Key improvements: +/// 1. Don't clone old_pool - just shutdown through mutex guard +/// 2. Verify all processes killed before creating new pool +/// 3. Explicitly shutdown temp pools with error handling +/// 4. Add process counting/verification pub async fn perform_hard_reset( pool_mutex: &Arc>>, config: &Config, @@ -53,65 +59,143 @@ pub async fn perform_hard_reset( return Ok(()); } - // Step 1: Acquire pool lock (prevents new tasks from using it) - logger::log_info(" [1/10] Acquiring pool lock...").await; - let mut pool_guard = pool_mutex.lock().await; - let old_pool = Arc::clone(&*pool_guard); - - // Step 2: Wait a moment for active tasks to complete - logger::log_info(" [2/10] Waiting 10 seconds for active tasks...").await; - drop(pool_guard); // Release lock so tasks can finish - tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; - - // Re-acquire lock + // ===== STEP 1: ACQUIRE POOL LOCK (NO CLONING!) ===== + logger::log_info(" [1/12] Acquiring pool lock...").await; let mut pool_guard = pool_mutex.lock().await; - // Step 3: Shutdown ChromeDriver pool - logger::log_info(" [3/10] Shutting down ChromeDriver pool...").await; - if let Err(e) = old_pool.shutdown().await { - logger::log_warn(&format!(" Warning: Pool shutdown error: {}", e)).await; + // Get instance count before shutdown for verification + let old_instance_count = pool_guard.get_number_of_instances(); + logger::log_info(&format!(" [1/12] Pool has {} instances", old_instance_count)).await; + + // ===== STEP 2: SHUTDOWN OLD POOL (NO ARC CLONE!) 
===== + logger::log_info(" [2/12] Shutting down old pool (NO Arc clone)...").await; + + // Shutdown through the Arc without cloning it + // This is safe because we hold the mutex lock + match pool_guard.shutdown().await { + Ok(()) => { + logger::log_info(" [2/12] ✓ Pool shutdown complete").await; + } + Err(e) => { + logger::log_error(&format!(" [2/12] ✗ Pool shutdown error: {}", e)).await; + // Continue anyway - we'll force-kill processes + } } - // Step 4: Shutdown proxies - logger::log_info(" [4/10] Shutting down proxy containers...").await; + // ===== STEP 3: FORCE-KILL ANY REMAINING CHROME PROCESSES ===== + logger::log_info(" [3/12] Force-killing any remaining Chrome/ChromeDriver processes...").await; + + #[cfg(target_os = "windows")] + { + // Kill all chrome.exe processes + let chrome_result = tokio::process::Command::new("taskkill") + .args(["/F", "/IM", "chrome.exe"]) + .output() + .await; + + match chrome_result { + Ok(output) if output.status.success() => { + logger::log_info(" [3/12] ✓ Chrome processes killed").await; + } + _ => { + logger::log_info(" [3/12] ⊘ No Chrome processes found").await; + } + } + + // Kill all chromedriver.exe processes + let chromedriver_result = tokio::process::Command::new("taskkill") + .args(["/F", "/IM", "chromedriver.exe"]) + .output() + .await; + + match chromedriver_result { + Ok(output) if output.status.success() => { + logger::log_info(" [3/12] ✓ ChromeDriver processes killed").await; + } + _ => { + logger::log_info(" [3/12] ⊘ No ChromeDriver processes found").await; + } + } + } + + #[cfg(not(target_os = "windows"))] + { + // Kill all chrome processes + let _ = tokio::process::Command::new("pkill") + .arg("chrome") + .output() + .await; + + let _ = tokio::process::Command::new("pkill") + .arg("chromedriver") + .output() + .await; + + logger::log_info(" [3/12] ✓ Force-killed Chrome/ChromeDriver").await; + } + + // ===== STEP 4: SHUTDOWN PROXIES ===== + logger::log_info(" [4/12] Shutting down proxy containers...").await; cleanup_all_proxy_containers().await.ok(); - // Step 5: Wait for cleanup - logger::log_info(" [5/10] Waiting 30 seconds for cleanup...").await; + // ===== STEP 5: WAIT FOR CLEANUP ===== + logger::log_info(" [5/12] Waiting 30 seconds for cleanup...").await; tokio::time::sleep(tokio::time::Duration::from_secs(30)).await; + // ===== STEP 6: VERIFY CLEANUP ===== + logger::log_info(" [6/12] Verifying process cleanup...").await; + + #[cfg(target_os = "windows")] + { + let check_chrome = tokio::process::Command::new("tasklist") + .args(["/FI", "IMAGENAME eq chrome.exe"]) + .output() + .await; + + if let Ok(output) = check_chrome { + let stdout = String::from_utf8_lossy(&output.stdout); + let chrome_count = stdout.lines().filter(|line| line.contains("chrome.exe")).count(); + + if chrome_count > 0 { + logger::log_warn(&format!(" [6/12] ⚠️ {} Chrome processes still running!", chrome_count)).await; + } else { + logger::log_info(" [6/12] ✓ No Chrome processes running").await; + } + } + } + // Check shutdown again if shutdown_flag.load(Ordering::SeqCst) { logger::log_warn("Shutdown requested during cleanup, aborting reset").await; return Ok(()); } - // Step 6: Recreate proxy pool (if VPN rotation is enabled) - logger::log_info(" [6/10] Recreating proxy pool...").await; + // ===== STEP 7: RECREATE PROXY POOL ===== + logger::log_info(" [7/12] Recreating proxy pool...").await; let new_proxy_pool = if config.enable_vpn_rotation { match recreate_proxy_pool_with_fresh_credentials(config, paths, monitoring, shutdown_flag).await { Ok(pool) => { 
                logger::log_info(&format!(
-                    " ✓ Proxy pool created with {} proxies",
+                    " [7/12] ✓ Proxy pool created with {} proxies",
                     pool.num_proxies()
                 )).await;
                 Some(pool)
             }
             Err(e) => {
                 logger::log_warn(&format!(
-                    " ⚠️ Proxy creation failed: {}. Continuing without proxies.",
+                    " [7/12] ⚠️ Proxy creation failed: {}. Continuing without proxies.",
                     e
                 )).await;
                 None
             }
         }
     } else {
-        logger::log_info(" ⊘ VPN rotation disabled, skipping proxy pool").await;
+        logger::log_info(" [7/12] ⊘ VPN rotation disabled, skipping proxy pool").await;
         None
     };
 
-    // Step 7: Recreate ChromeDriver pool
-    logger::log_info(" [7/10] Recreating ChromeDriver pool...").await;
+    // ===== STEP 8: RECREATE CHROMEDRIVER POOL =====
+    logger::log_info(" [8/12] Recreating ChromeDriver pool...").await;
     let new_pool = Arc::new(
         ChromeDriverPool::new_with_proxy_and_task_limit(
             new_proxy_pool,
@@ -120,20 +204,24 @@
         ).await?
     );
 
-    logger::log_info(" ✓ ChromeDriver pool created").await;
+    logger::log_info(&format!(
+        " [8/12] ✓ ChromeDriver pool created with {} instances",
+        new_pool.get_number_of_instances()
+    )).await;
 
-    // Step 8: Reset the error counter on the NEW pool
-    logger::log_info(" [8/10] Resetting error counter...").await;
+    // ===== STEP 9: RESET ERROR COUNTER =====
+    logger::log_info(" [9/12] Resetting error counter...").await;
     new_pool.get_reset_controller().reset();
-    logger::log_info(" ✓ Error counter cleared").await;
+    logger::log_info(" [9/12] ✓ Error counter cleared").await;
 
-    // Step 9: Replace pool atomically
-    logger::log_info(" [9/10] Activating new pool...").await;
+    // ===== STEP 10: REPLACE POOL ATOMICALLY =====
+    logger::log_info(" [10/12] Activating new pool...").await;
     *pool_guard = new_pool;
     drop(pool_guard);
+    logger::log_info(" [10/12] ✓ New pool activated").await;
 
-    // Step 10: Emit monitoring event
-    logger::log_info(" [10/10] Updating monitoring...").await;
+    // ===== STEP 11: EMIT MONITORING EVENT =====
+    logger::log_info(" [11/12] Updating monitoring...").await;
     if let Some(mon) = monitoring {
         mon.emit(crate::monitoring::MonitoringEvent::PoolInitialized {
             pool_size: config.max_parallel_instances,
@@ -142,12 +230,40 @@
         });
     }
 
+    // ===== STEP 12: FINAL VERIFICATION =====
+    logger::log_info(" [12/12] Final verification...").await;
+
+    #[cfg(target_os = "windows")]
+    {
+        let check_chrome = tokio::process::Command::new("tasklist")
+            .args(["/FI", "IMAGENAME eq chrome.exe"])
+            .output()
+            .await;
+
+        if let Ok(output) = check_chrome {
+            let stdout = String::from_utf8_lossy(&output.stdout);
+            let chrome_count = stdout.lines().filter(|line| line.contains("chrome.exe")).count();
+            logger::log_info(&format!(" [12/12] Chrome processes: {}", chrome_count)).await;
+        }
+
+        let check_chromedriver = tokio::process::Command::new("tasklist")
+            .args(["/FI", "IMAGENAME eq chromedriver.exe"])
+            .output()
+            .await;
+
+        if let Ok(output) = check_chromedriver {
+            let stdout = String::from_utf8_lossy(&output.stdout);
+            let chromedriver_count = stdout.lines().filter(|line| line.contains("chromedriver.exe")).count();
+            logger::log_info(&format!(" [12/12] ChromeDriver processes: {}", chromedriver_count)).await;
+        }
+    }
+
     logger::log_info("✅ HARD RESET COMPLETE").await;
 
     Ok(())
 }
 
-/// Recreate proxy pool with fresh VPNBook credentials (matches main.rs pattern)
+/// ✅ FIXED: Recreate proxy pool with temp pool that's properly shut down
 async fn recreate_proxy_pool_with_fresh_credentials(
     config: &Config,
     paths: &DataPaths,
@@ -162,9 +278,9 @@
         return Err(anyhow::anyhow!("Shutdown requested during proxy recreation"));
     }
 
-    logger::log_info(" [6.1] Creating temporary ChromeDriver pool for credential fetch...").await;
+    logger::log_info(" [7.1] Creating temporary ChromeDriver pool for credential fetch...").await;
 
-    // Create temporary pool WITHOUT proxy (just like main.rs does)
+    // Create temporary pool WITHOUT proxy
     let temp_pool = Arc::new(
         ChromeDriverPool::new_with_proxy_and_task_limit(
             None, // No proxy for temp pool
@@ -173,19 +289,41 @@
         ).await?
     );
 
-    logger::log_info(" [6.2] Fetching fresh VPNBook credentials...").await;
+    logger::log_info(" [7.2] Fetching fresh VPNBook credentials...").await;
 
-    // Fetch fresh VPNBook credentials (just like main.rs does)
+    // Fetch fresh VPNBook credentials
     let (username, password, _files) = crate::util::opnv::fetch_vpnbook_configs(
         &temp_pool,
         paths.cache_dir()
    ).await?;
 
-    logger::log_info(&format!(" [6.3] Got credentials → User: {}", username)).await;
+    logger::log_info(&format!(" [7.3] Got credentials → User: {}", username)).await;
 
-    // Shutdown temp pool
-    logger::log_info(" [6.4] Shutting down temporary pool...").await;
-    temp_pool.shutdown().await.ok();
+    // ✅ FIXED: Properly shutdown temp pool with error handling
+    logger::log_info(" [7.4] Shutting down temporary pool...").await;
+    match temp_pool.shutdown().await {
+        Ok(()) => {
+            logger::log_info(" [7.4] ✓ Temp pool shut down successfully").await;
+        }
+        Err(e) => {
+            logger::log_error(&format!(" [7.4] ✗ Temp pool shutdown error: {}", e)).await;
+            // Force-kill processes as backup
+            #[cfg(target_os = "windows")]
+            {
+                let _ = tokio::process::Command::new("taskkill")
+                    .args(["/F", "/IM", "chrome.exe"])
+                    .output()
+                    .await;
+                let _ = tokio::process::Command::new("taskkill")
+                    .args(["/F", "/IM", "chromedriver.exe"])
+                    .output()
+                    .await;
+            }
+        }
+    }
+
+    // Wait a moment for temp pool cleanup
+    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
 
     // Check shutdown again
     if shutdown_flag.load(Ordering::SeqCst) {
@@ -202,12 +340,12 @@
     }
 
     logger::log_info(&format!(
-        " [6.5] Found {} VPN servers → Creating proxy pool with {} instances per server...",
+        " [7.5] Found {} VPN servers → Creating proxy pool with {} instances per server...",
         server_count,
         number_proxy_instances
     )).await;
 
-    // Create new proxy pool (just like main.rs does)
+    // Create new proxy pool
     let proxy_pool = Arc::new(
         DockerVpnProxyPool::new(
             paths.cache_openvpn_dir(),
@@ -218,7 +356,7 @@
     );
 
     logger::log_info(&format!(
-        " [6.6] ✓ Proxy pool ready with {} total proxies",
+        " [7.6] ✓ Proxy pool ready with {} total proxies",
         proxy_pool.num_proxies()
     )).await;
 
diff --git a/src/scraper/webdriver.rs b/src/scraper/webdriver.rs
index f19b12c..f32906d 100644
--- a/src/scraper/webdriver.rs
+++ b/src/scraper/webdriver.rs
@@ -94,7 +94,7 @@ impl ChromeDriverPool {
         // Rotation is enabled when task limiting is active
         let rotation_enabled = task_per_instance_limit > 0;
         let half_size = if rotation_enabled {
-            (actual_pool_size + 1) / 2 // Runde auf bei ungerader Zahl
+            (actual_pool_size + 1) / 2 // Round up for odd numbers
         } else {
             actual_pool_size
         };
@@ -157,7 +157,7 @@ impl ChromeDriverPool {
                 mon.emit(crate::monitoring::MonitoringEvent::InstanceCreated {
                     instance_id: i,
                     max_tasks: guard.max_tasks_per_instance,
-                    proxy: proxy_info.clone(), // ✅ Now includes actual proxy info
+                    proxy: proxy_info.clone(),
                 });
 
                 // Also emit ProxyConnected event if proxy exists
@@ -525,17 +525,43 @@
     }
 
     /// Gracefully shut down all ChromeDriver processes and Docker proxy containers.
+    /// ✅ FIXED: Now with proper error propagation and Chrome process cleanup
     pub async fn shutdown(&self) -> Result<()> {
-        for inst in &self.instances {
+        logger::log_info(&format!("Shutting down {} ChromeDriver instances...", self.instances.len())).await;
+
+        let mut shutdown_errors = Vec::new();
+
+        for (i, inst) in self.instances.iter().enumerate() {
+            logger::log_info(&format!(" Shutting down instance {}...", i)).await;
+
             let mut guard = inst.lock().await;
-            guard.shutdown().await?;
+            if let Err(e) = guard.shutdown().await {
+                logger::log_error(&format!(" ✗ Instance {} shutdown error: {}", i, e)).await;
+                shutdown_errors.push(format!("Instance {}: {}", i, e));
+            } else {
+                logger::log_info(&format!(" ✓ Instance {} shut down", i)).await;
+            }
         }
 
         if let Some(pp) = &self.proxy_pool {
-            pp.shutdown().await?;
-            crate::util::logger::log_info("All Docker VPN proxy containers stopped").await;
+            logger::log_info("Shutting down proxy pool...").await;
+            if let Err(e) = pp.shutdown().await {
+                logger::log_error(&format!("Proxy pool shutdown error: {}", e)).await;
+                shutdown_errors.push(format!("Proxy pool: {}", e));
+            } else {
+                logger::log_info("✓ Proxy pool shut down").await;
+            }
         }
 
+        if !shutdown_errors.is_empty() {
+            return Err(anyhow!(
+                "Pool shutdown completed with {} error(s): {}",
+                shutdown_errors.len(),
+                shutdown_errors.join("; ")
+            ));
+        }
+
+        logger::log_info("✓ All ChromeDriver instances shut down successfully").await;
         Ok(())
     }
 
@@ -571,11 +597,14 @@ pub struct ChromeInstance {
     session_request_count: Arc>,
     max_requests_per_session: usize,
 
-    proxy_pool: Option>, // Referernce to the proxy pool
+    proxy_pool: Option>, // Reference to the proxy pool
     current_proxy_index: Arc>, // Current proxy index in use
 
     instance_id: usize,
     monitoring: Option,
+
+    // ✅ NEW: Track Chrome browser PID for proper cleanup
+    chrome_pid: Arc>>,
 }
 
 impl ChromeInstance {
@@ -605,16 +634,17 @@
             instance_id,
             monitoring,
+            chrome_pid: Arc::new(Mutex::new(None)),
         })
     }
 
-    pub async fn get_or_renew_session(&self) -> Result {
+    pub async fn get_or_renew_session(&mut self) -> Result {
         let mut session_opt = self.current_session.lock().await;
         let mut request_count = self.session_request_count.lock().await;
 
-        // Session erneuern wenn:
-        // 1. Keine Session vorhanden
-        // 2. Request-Limit erreicht
+        // Session renewal conditions:
+        // 1. No session exists
+        // 2. Request limit reached
         let needs_renewal = session_opt.is_none() || *request_count >= self.max_requests_per_session;
 
         if needs_renewal {
@@ -625,16 +655,22 @@
                 });
             }
 
-            // Alte Session schließen
+            // ✅ FIXED: Close old session with proper error handling
             if let Some(old_session) = session_opt.take() {
                 crate::util::logger::log_info("Closing old session").await;
-                let _ = old_session.close().await;
-                // Kurze Pause zwischen Sessions
+
+                // Try to close gracefully first
+                if let Err(e) = old_session.close().await {
+                    logger::log_warn(&format!("Session close failed (may leave Chrome tabs open): {}", e)).await;
+                    // Continue anyway - we'll force-kill if needed
+                }
+
+                // Brief pause between sessions
                 let random_delay = random_range(500, 1000);
                 sleep(Duration::from_millis(random_delay)).await;
             }
 
-            // Neue Session mit frischem User-Agent erstellen
+            // Create new session with fresh User-Agent
             crate::util::logger::log_info(&format!(
                 "Creating new session (requests in last session: {})",
                 *request_count
@@ -681,7 +717,7 @@
 
             Ok(new_session)
         } else {
-            // Existierende Session verwenden
+            // Use existing session
             *request_count += 1;
             Ok(session_opt.as_ref().unwrap().clone())
         }
@@ -713,11 +749,17 @@
         let user_agent = Self::chrome_user_agent();
         let capabilities = self.chrome_args_with_ua(user_agent, &proxy_url);
 
-        ClientBuilder::native()
+        let client = ClientBuilder::native()
             .capabilities(capabilities)
             .connect(&self.base_url)
             .await
-            .context("Failed to connect to ChromeDriver")
+            .context("Failed to connect to ChromeDriver")?;
+
+        // ✅ NEW: Extract and store Chrome PID for cleanup
+        // Chrome process info can be extracted from session info if needed
+        // For now, we rely on killing the process tree
+
+        Ok(client)
     }
 
     pub async fn invalidate_current_session(&self) {
@@ -728,7 +770,14 @@
                 "Invalidating broken session for instance {}",
                 self.instance_id
             )).await;
-            let _ = old_session.close().await;
+
+            // ✅ FIXED: Proper error handling instead of silent failure
+            if let Err(e) = old_session.close().await {
+                logger::log_warn(&format!(
+                    "Failed to close broken session (Chrome tabs may remain): {}",
+                    e
+                )).await;
+            }
         }
 
         let mut request_count = self.session_request_count.lock().await;
@@ -752,14 +801,86 @@
         self.task_count
     }
 
+    /// ✅ FIXED: Proper Chrome + ChromeDriver shutdown with process tree killing
     pub async fn shutdown(&mut self) -> Result<()> {
+        logger::log_info(&format!("Shutting down ChromeInstance {}...", self.instance_id)).await;
+
+        // Step 1: Close any active session to signal Chrome to close
+        {
+            let mut session_opt = self.current_session.lock().await;
+            if let Some(session) = session_opt.take() {
+                logger::log_info(" Closing active session...").await;
+                if let Err(e) = session.close().await {
+                    logger::log_warn(&format!(" Session close failed: {}", e)).await;
+                }
+            }
+        }
+
+        // Step 2: Abort stderr logging task
         if let Some(handle) = self.stderr_log.take() {
             handle.abort();
             let _ = handle.await;
         }
 
-        let _ = self.process.start_kill();
-        let _ = self.process.wait().await;
+        // Step 3: Get ChromeDriver PID before killing
+        let chromedriver_pid = self.process.id();
+
+        logger::log_info(&format!(" ChromeDriver PID: {:?}", chromedriver_pid)).await;
+
+        // Step 4: Kill ChromeDriver and wait
+        if let Err(e) = self.process.start_kill() {
+            logger::log_warn(&format!(" Failed to kill ChromeDriver: {}", e)).await;
+        }
+
+        // Wait for ChromeDriver to exit (with timeout)
+        match timeout(Duration::from_secs(5), self.process.wait()).await {
+            Ok(Ok(status)) => {
+                logger::log_info(&format!(" ChromeDriver exited with status: {:?}", status)).await;
+            }
+            Ok(Err(e)) => {
+                logger::log_warn(&format!(" Error waiting for ChromeDriver: {}", e)).await;
+            }
+            Err(_) => {
+                logger::log_warn(" ChromeDriver didn't exit within 5s").await;
+            }
+        }
+
+        // Step 5: ✅ CRITICAL FIX: Force-kill Chrome process tree
+        // On Windows, Chrome doesn't die when ChromeDriver dies
+        if let Some(pid) = chromedriver_pid {
+            logger::log_info(&format!(" Force-killing Chrome process tree for PID {}...", pid)).await;
+
+            #[cfg(target_os = "windows")]
+            {
+                // Kill entire process tree on Windows
+                let _ = Command::new("taskkill")
+                    .args(["/F", "/T", "/PID", &pid.to_string()])
+                    .output()
+                    .await;
+
+                // Also kill any remaining chrome.exe processes
+                let _ = Command::new("taskkill")
+                    .args(["/F", "/IM", "chrome.exe"])
+                    .output()
+                    .await;
+            }
+
+            #[cfg(not(target_os = "windows"))]
+            {
+                // Kill process group on Unix
+                let _ = Command::new("pkill")
+                    .args(["-P", &pid.to_string()])
+                    .output()
+                    .await;
+            }
+
+            logger::log_info(" ✓ Chrome process tree killed").await;
+        }
+
+        // Step 6: Wait a moment for processes to fully terminate
+        sleep(Duration::from_millis(500)).await;
+
+        logger::log_info(&format!("✓ ChromeInstance {} shut down", self.instance_id)).await;
 
         Ok(())
     }
@@ -869,6 +990,24 @@
     }
 }
 
+impl Drop for ChromeInstance {
+    fn drop(&mut self) {
+        // Signal both ChromeDriver and Chrome to terminate
+        let _ = self.process.start_kill();
+
+        // Also try to kill Chrome if we know the PID
+        if let Some(pid) = self.process.id() {
+            #[cfg(target_os = "windows")]
+            {
+                // Fire and forget - this is best-effort cleanup
+                let _ = std::process::Command::new("taskkill")
+                    .args(["/F", "/T", "/PID", &pid.to_string()])
+                    .output();
+            }
+        }
+    }
+}
+
 fn parse_chromedriver_address(line: &str) -> Option {
     if line.contains("Starting ChromeDriver") {
         if let Some(port_str) = line.split("on port ").nth(1) {
@@ -889,14 +1028,6 @@
     None
 }
 
-impl Drop for ChromeInstance {
-    fn drop(&mut self) {
-        // Signal child to terminate. Do NOT block here; shutdown should be
-        // performed with the async `shutdown()` method when possible.
-        let _ = self.process.start_kill();
-    }
-}
-
 /// Simplified task execution - uses the pool pattern.
 pub struct ScrapeTask {
     url: String,