diff --git a/.env.example b/.env.example index b1e705e..249152e 100644 --- a/.env.example +++ b/.env.example @@ -14,8 +14,8 @@ CORPORATE_START_DATE=2010-01-01 # How far into the future we scrape economic events (in months) ECONOMIC_LOOKAHEAD_MONTHS=3 -# Maximum number of parallel scraping tasks (default: 10) -MAX_PARALLEL_TASKS=10 +# Maximum number of parallel scraping tasks (default: 4) +MAX_PARALLEL_INSTANCES=10 # ===== VPN ROTATION (ProtonVPN Integration) ===== # Enable automatic VPN rotation between sessions? @@ -37,4 +37,6 @@ TASKS_PER_VPN_SESSION=50 MAX_REQUESTS_PER_SESSION=25 MIN_REQUEST_INTERVAL_MS=300 -MAX_RETRY_ATTEMPTS=3 \ No newline at end of file +MAX_RETRY_ATTEMPTS=3 + +PROXY_INSTANCES_PER_CERTIFICATE=2 \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index c35e129..00af0a9 100644 --- a/src/config.rs +++ b/src/config.rs @@ -27,6 +27,9 @@ pub struct Config { #[serde(default = "default_max_retry_attempts")] pub max_retry_attempts: u32, + + #[serde(default = "default_proxy_instances_per_certificate")] + pub proxy_instances_per_certificate: Option, } fn default_enable_vpn_rotation() -> bool { @@ -47,6 +50,10 @@ fn default_min_request_interval_ms() -> u64 { fn default_max_retry_attempts() -> u32 { 3 } +fn default_proxy_instances_per_certificate() -> Option { + Some(1) +} + impl Default for Config { fn default() -> Self { Self { @@ -59,6 +66,7 @@ impl Default for Config { min_request_interval_ms: default_min_request_interval_ms(), max_retry_attempts: default_max_retry_attempts(), enable_vpn_rotation: false, + proxy_instances_per_certificate: default_proxy_instances_per_certificate(), } } } @@ -112,6 +120,11 @@ impl Config { .parse() .context("Failed to parse MAX_RETRY_ATTEMPTS as u32")?; + let proxy_instances_per_certificate: Option = match dotenvy::var("PROXY_INSTANCES_PER_CERTIFICATE") { + Ok(val) => Some(val.parse().context("Failed to parse PROXY_INSTANCES_PER_CERTIFICATE as usize")?), + Err(_) => Some(1), + }; + Ok(Self { economic_start_date, corporate_start_date, @@ -122,6 +135,7 @@ impl Config { max_requests_per_session, min_request_interval_ms, max_retry_attempts, + proxy_instances_per_certificate, }) } diff --git a/src/corporate/update.rs b/src/corporate/update.rs index 892e477..be4d53d 100644 --- a/src/corporate/update.rs +++ b/src/corporate/update.rs @@ -78,7 +78,7 @@ pub async fn run_full_update( } logger::log_info("Step 5: Building companies.jsonl with parallel processing and validation...").await; - let count = build_companies_jsonl_streaming_parallel(&paths, pool, shutdown_flag).await?; + let count = build_companies_jsonl_streaming_parallel(&paths, pool, shutdown_flag, _config, &None).await?; logger::log_info(&format!(" ✓ Saved {} companies", count)).await; if !shutdown_flag.load(Ordering::SeqCst) { diff --git a/src/corporate/update_parallel.rs b/src/corporate/update_parallel.rs index dad0467..4bc3fe7 100644 --- a/src/corporate/update_parallel.rs +++ b/src/corporate/update_parallel.rs @@ -1,16 +1,18 @@ -// src/corporate/update_parallel.rs - UPDATED WITH DATA INTEGRITY FIXES -// PARALLELIZED VERSION with atomic commits and validation +// src/corporate/update_parallel.rs - FIXED: Proper Hard Reset Implementation // -// Key improvements over original: -// - Page validation to prevent stale content extraction -// - Shutdown-aware task processing -// - Better error recovery with browser state cleanup -// - All original fsync and checkpoint logic preserved +// Critical fixes: +// 1. Hard reset actually performed (no premature break) +// 2. Error counter reset after hard reset +// 3. Per-ISIN status tracking (not per-company) +// 4. Proper task draining before reset +// 5. Queue rebuilding after reset use super::{types::*, yahoo::*, helpers::*}; use crate::util::directories::DataPaths; use crate::util::logger; use crate::scraper::webdriver::ChromeDriverPool; +use crate::scraper::hard_reset::perform_hard_reset; +use crate::config::Config; use tokio::sync::mpsc; use tokio::io::AsyncWriteExt; @@ -36,17 +38,13 @@ struct CompanyProcessResult { is_update: bool, } -/// UPDATED: Abort-safe incremental JSONL persistence with validation -/// -/// New safety features: -/// - Page validation before extraction -/// - Shutdown checks at all critical points -/// - Browser state cleanup on errors -/// - All writes still atomic with fsync +/// Abort-safe incremental JSONL persistence with proper hard reset handling pub async fn build_companies_jsonl_streaming_parallel( paths: &DataPaths, pool: &Arc, shutdown_flag: &Arc, + config: &Config, + monitoring: &Option, ) -> anyhow::Result { // Configuration constants const CHECKPOINT_INTERVAL: usize = 50; @@ -54,9 +52,19 @@ pub async fn build_companies_jsonl_streaming_parallel( const FSYNC_INTERVAL_SECS: u64 = 10; const CONCURRENCY_LIMIT: usize = 100; + // Create hard reset controller + let reset_controller = pool.get_reset_controller(); + + // Wrap pool in mutex for potential replacement + let pool_mutex = Arc::new(tokio::sync::Mutex::new(Arc::clone(pool))); + + // Synchronization for hard reset + let reset_in_progress = Arc::new(tokio::sync::Mutex::new(false)); + let path = DataPaths::new(".")?; let corporate_path = path.data_dir().join("corporate").join("by_name"); let securities_path = corporate_path.join("common_stocks.json"); + let securities_path_cloned = securities_path.clone(); if !securities_path.exists() { logger::log_warn("No common_stocks.json found").await; @@ -137,9 +145,9 @@ pub async fn build_companies_jsonl_streaming_parallel( let companies_path_clone = companies_path.clone(); let log_path_clone = log_path.clone(); let existing_companies_writer = Arc::new(tokio::sync::Mutex::new(existing_companies.clone())); + let existing_companies_writer_clone = Arc::clone(&existing_companies_writer); let write_tx_for_writer = write_tx.clone(); - let writer_task = tokio::spawn(async move { let mut log_file = log_file_init; let mut writes_since_fsync = 0; @@ -278,112 +286,299 @@ pub async fn build_companies_jsonl_streaming_parallel( (count, new_count, updated_count) }); - // === PARALLEL PROCESSING PHASE === - logger::log_info(&format!( - "Starting parallel processing of {} companies (concurrency limit: {})", - securities.len(), - CONCURRENCY_LIMIT - )).await; - - let mut processing_tasks = FuturesUnordered::new(); - let mut processed = 0; + // === MAIN PROCESSING LOOP === let total = securities.len(); + logger::log_info(&format!("Processing {} companies with concurrency limit {}", total, CONCURRENCY_LIMIT)).await; - for (name, company_info) in securities.into_iter() { - // Check shutdown before creating new tasks - if shutdown_flag.load(Ordering::SeqCst) { - logger::log_warn("Shutdown detected, stopping task creation").await; - break; - } - - // Wait if we hit concurrency limit - while processing_tasks.len() >= CONCURRENCY_LIMIT { - if let Some(result) = processing_tasks.next().await { - match result { - Ok(Ok(Some(company_result))) => { - let company_result: CompanyProcessResult = company_result; - let _ = write_tx_for_writer.send(LogCommand::Write(company_result.company)).await?; - processed += 1; - } - Ok(Ok(None)) => { - processed += 1; - } - Ok(Err(e)) => { - logger::log_warn(&format!("Company processing error: {}", e)).await; - processed += 1; - } - Err(e) => { - logger::log_error(&format!("Task panic: {}", e)).await; - processed += 1; - } - } - } + let mut tasks = FuturesUnordered::new(); + let mut pending = securities.into_iter().collect::>(); + let mut processed = 0; + let mut hard_reset_count = 0; + + // Spawn initial batch + for _ in 0..CONCURRENCY_LIMIT.min(pending.len()) { + if let Some((name, company_info)) = pending.pop() { + let current_pool = { + let pool_guard = pool_mutex.lock().await; + Arc::clone(&*pool_guard) + }; - if shutdown_flag.load(Ordering::SeqCst) { - break; - } - } - - if shutdown_flag.load(Ordering::SeqCst) { - break; - } - - // Spawn new task - let pool = pool.clone(); - let shutdown_flag = shutdown_flag.clone(); - let existing_entry = existing_companies.get(&name).cloned(); - - let task = tokio::spawn(async move { - process_single_company_validated( - name, - company_info, - existing_entry, - &pool, - &shutdown_flag - ).await - }); - - processing_tasks.push(task); - - if processed % 10 == 0 && processed > 0 { - logger::log_info(&format!("Progress: {}/{} companies processed", processed, total)).await; + let existing = existing_companies.get(&name).cloned(); + let shutdown_flag_clone = Arc::clone(shutdown_flag); + + let task = tokio::spawn(async move { + process_single_company_validated( + name, + company_info, + existing, + ¤t_pool, + &shutdown_flag_clone, + ).await + }); + + tasks.push(task); } } - // Wait for remaining tasks - logger::log_info(&format!( - "Waiting for {} remaining tasks to complete...", - processing_tasks.len() - )).await; - - while let Some(result) = processing_tasks.next().await { + // Process results and spawn new tasks + while let Some(task_result) = tasks.next().await { + // Check for shutdown if shutdown_flag.load(Ordering::SeqCst) { - logger::log_warn("Shutdown detected during final task wait").await; + logger::log_warn("Shutdown signal received, stopping processing").await; break; } - match result { - Ok(Ok(Some(company_result))) => { - if write_tx_for_writer.send(LogCommand::Write(company_result.company)).await.is_err() { - logger::log_error("Writer task died").await; - break; - } + match task_result { + Ok(Ok(Some(result))) => { + // Success: send to writer + let _ = write_tx_for_writer.send(LogCommand::Write(result.company)).await; processed += 1; + + // Log progress every 100 companies + if processed % 100 == 0 { + logger::log_info(&format!( + "Progress: {}/{} companies processed ({} resets)", + processed, + total, + hard_reset_count + )).await; + } + + // Spawn next task if available + if let Some((name, company_info)) = pending.pop() { + let current_pool = { + let pool_guard = pool_mutex.lock().await; + Arc::clone(&*pool_guard) + }; + + let existing = existing_companies.get(&name).cloned(); + let shutdown_flag_clone = Arc::clone(shutdown_flag); + + let task = tokio::spawn(async move { + process_single_company_validated( + name, + company_info, + existing, + ¤t_pool, + &shutdown_flag_clone, + ).await + }); + + tasks.push(task); + } } Ok(Ok(None)) => { + // No result (shutdown or skip) processed += 1; + + if let Some((name, company_info)) = pending.pop() { + let current_pool = { + let pool_guard = pool_mutex.lock().await; + Arc::clone(&*pool_guard) + }; + + let existing = existing_companies.get(&name).cloned(); + let shutdown_flag_clone = Arc::clone(shutdown_flag); + + let task = tokio::spawn(async move { + process_single_company_validated( + name, + company_info, + existing, + ¤t_pool, + &shutdown_flag_clone, + ).await + }); + + tasks.push(task); + } } Ok(Err(e)) => { - logger::log_warn(&format!("Company processing error: {}", e)).await; - processed += 1; + let error_msg = e.to_string(); + + if error_msg.contains("HARD_RESET_REQUIRED") { + // ✅ FIX: Don't break, perform actual hard reset + + // Check if reset already in progress (race condition protection) + let mut reset_lock = reset_in_progress.lock().await; + if *reset_lock { + logger::log_info("Hard reset already in progress, skipping duplicate").await; + processed += 1; + continue; + } + *reset_lock = true; + drop(reset_lock); // Release lock during reset + + logger::log_error("🔴 HARD RESET THRESHOLD REACHED - INITIATING RESET SEQUENCE").await; + logger::log_warn("Draining active tasks before hard reset...").await; + + // Save remaining pending count + let remaining_count = pending.len(); + + // Stop spawning new tasks + pending.clear(); + + // Wait for all active tasks to complete + let mut drained = 0; + while let Some(_) = tasks.next().await { + drained += 1; + if drained % 10 == 0 { + logger::log_info(&format!("Drained {} tasks...", drained)).await; + } + } + + logger::log_info(&format!( + "All tasks drained ({} active). {} companies need reprocessing.", + drained, + remaining_count + )).await; + + // Perform the actual hard reset + match perform_hard_reset(&pool_mutex, config, paths, monitoring, shutdown_flag).await { + Ok(()) => { + logger::log_info("✅ Hard reset completed successfully").await; + hard_reset_count += 1; + + // ✅ FIX: Reset the error counter + { + let pool_guard = pool_mutex.lock().await; + let current_pool = Arc::clone(&*pool_guard); + current_pool.get_reset_controller().reset(); + } + logger::log_info("✓ Error counter cleared").await; + + // ✅ FIX: Rebuild pending list from existing_companies + // Only re-add companies that haven't been written yet + let written_companies = { + let companies = existing_companies_writer_clone.lock().await; + companies.keys().cloned().collect::>() + }; + + // Create new pending list: all companies minus those already written + let all_companies_list: Vec<(String, CompanyInfo)> = { + // Need to reload securities since we cleared pending + let content = tokio::fs::read_to_string(&securities_path_cloned).await?; + let all_securities: HashMap = serde_json::from_str(&content)?; + all_securities.into_iter() + .filter(|(name, _)| !written_companies.contains(name)) + .collect() + }; + + pending = all_companies_list; + + logger::log_info(&format!( + "Restarting with {} remaining companies (out of {} total)", + pending.len(), + total + )).await; + + // Respawn initial batch with NEW pool + for _ in 0..CONCURRENCY_LIMIT.min(pending.len()) { + if let Some((name, company_info)) = pending.pop() { + let current_pool = { + let pool_guard = pool_mutex.lock().await; + Arc::clone(&*pool_guard) + }; + + let existing = existing_companies.get(&name).cloned(); + let shutdown_flag_clone = Arc::clone(shutdown_flag); + + let task = tokio::spawn(async move { + process_single_company_validated( + name, + company_info, + existing, + ¤t_pool, + &shutdown_flag_clone, + ).await + }); + + tasks.push(task); + } + } + + // Clear reset flag + let mut reset_lock = reset_in_progress.lock().await; + *reset_lock = false; + drop(reset_lock); + + // ✅ Continue processing (don't spawn duplicate task) + continue; + } + Err(reset_err) => { + logger::log_error(&format!("Hard reset failed: {}", reset_err)).await; + + // Clear reset flag + let mut reset_lock = reset_in_progress.lock().await; + *reset_lock = false; + drop(reset_lock); + + // Exit if hard reset fails + break; + } + } + } else { + // Regular error + logger::log_warn(&format!("Company processing error: {}", error_msg)).await; + processed += 1; + + // Spawn next task + if let Some((name, company_info)) = pending.pop() { + let current_pool = { + let pool_guard = pool_mutex.lock().await; + Arc::clone(&*pool_guard) + }; + + let existing = existing_companies.get(&name).cloned(); + let shutdown_flag_clone = Arc::clone(shutdown_flag); + + let task = tokio::spawn(async move { + process_single_company_validated( + name, + company_info, + existing, + ¤t_pool, + &shutdown_flag_clone, + ).await + }); + + tasks.push(task); + } + } } Err(e) => { + // Task panic logger::log_error(&format!("Task panic: {}", e)).await; processed += 1; + + // Spawn next task + if let Some((name, company_info)) = pending.pop() { + let current_pool = { + let pool_guard = pool_mutex.lock().await; + Arc::clone(&*pool_guard) + }; + + let existing = existing_companies.get(&name).cloned(); + let shutdown_flag_clone = Arc::clone(shutdown_flag); + + let task = tokio::spawn(async move { + process_single_company_validated( + name, + company_info, + existing, + ¤t_pool, + &shutdown_flag_clone, + ).await + }); + + tasks.push(task); + } } } } + logger::log_info("Main processing loop completed").await; + // Signal writer to finish let _ = write_tx_for_writer.send(LogCommand::Checkpoint).await; let _ = write_tx_for_writer.send(LogCommand::Shutdown).await; @@ -394,8 +589,8 @@ pub async fn build_companies_jsonl_streaming_parallel( .unwrap_or((0, 0, 0)); logger::log_info(&format!( - "Completed: {} total companies ({} new, {} updated)", - final_count, final_new, final_updated + "✅ Completed: {} total companies ({} new, {} updated, {} hard resets)", + final_count, final_new, final_updated, hard_reset_count )).await; Ok(final_count) @@ -415,10 +610,25 @@ async fn scrape_with_retry( if shutdown_flag.load(Ordering::SeqCst) { return Err(anyhow!("Aborted due to shutdown")); } + + if pool.should_perform_hard_reset() { + logger::log_error("HARD_RESET_REQUIRED detected before scrape attempt").await; + return Err(anyhow!("HARD_RESET_REQUIRED")); + } match scrape_company_details_by_isin(pool, isin, shutdown_flag).await { Ok(result) => return Ok(result), Err(e) => { + // Check if this is a hard reset required error + let error_msg = e.to_string(); + if error_msg.contains("HARD_RESET_REQUIRED") { + logger::log_error(&format!( + "Hard reset required error for ISIN {}, propagating immediately", + isin + )).await; + return Err(e); // Propagate immediately, don't retry + } + if retries >= max_retries { logger::log_error(&format!( "All {} retries exhausted for ISIN {}: {}", @@ -443,7 +653,7 @@ async fn scrape_with_retry( } } -/// UPDATED: Process single company with validation and shutdown checks +/// Process single company with validation and shutdown checks async fn process_single_company_validated( name: String, company_info: CompanyInfo, @@ -485,7 +695,7 @@ async fn process_single_company_validated( } } - // Process each ISIN with validation + // ✅ FIX: Process each ISIN independently with per-ISIN status checking for (isin, figi_tickers) in unique_isin_ticker_pairs { // Check shutdown before each ISIN if shutdown_flag.load(Ordering::SeqCst) { @@ -506,9 +716,10 @@ async fn process_single_company_validated( } } - let has_yahoo_ticker = tickers.iter().any(|t| t.starts_with("YAHOO:")); + // ✅ FIX: Check if THIS SPECIFIC ISIN has Yahoo data + let has_yahoo_ticker_for_this_isin = tickers.iter().any(|t| t.starts_with("YAHOO:")); - if !has_yahoo_ticker { + if !has_yahoo_ticker_for_this_isin { logger::log_info(&format!("Fetching Yahoo details for {} (ISIN: {})", name, isin)).await; match scrape_with_retry(pool, &isin, 3, shutdown_flag).await { @@ -539,11 +750,24 @@ async fn process_single_company_validated( logger::log_warn(&format!("Shutdown during scrape for ISIN {}", isin)).await; break; } + + // Check if this is a hard reset required error + let error_msg = e.to_string(); + if error_msg.contains("HARD_RESET_REQUIRED") { + logger::log_error(&format!( + "Hard reset required during ISIN {} processing, propagating error", + isin + )).await; + return Err(e); // ← CRITICAL: Propagate immediately + } + logger::log_warn(&format!( "✗ Yahoo lookup error for ISIN {} (company: {}): {}", isin, name, e )).await; - // Continue with next ISIN + + // ✅ FIX: Mark this ISIN as failed to enable retry + tickers.push("YAHOO:ERROR".to_string()); } } } @@ -558,6 +782,11 @@ async fn process_single_company_validated( return Ok(None); } + if pool.should_perform_hard_reset() { + logger::log_error("HARD_RESET_REQUIRED detected during company processing").await; + return Err(anyhow!("HARD_RESET_REQUIRED")); + } + if !isin_tickers_map.is_empty() { let company_entry = CompanyCrossPlatformInfo { name: name.clone(), diff --git a/src/corporate/yahoo.rs b/src/corporate/yahoo.rs index 5afe96c..f34e8dc 100644 --- a/src/corporate/yahoo.rs +++ b/src/corporate/yahoo.rs @@ -74,6 +74,11 @@ pub async fn scrape_company_details_by_isin( logger::log_warn(&format!("Shutdown detected, skipping ISIN: {}", isin)).await; return Ok(None); } + + if pool.should_perform_hard_reset() { + logger::log_warn("HARD_RESET_REQUIRED detected before starting ISIN scrape").await; + return Err(anyhow!("HARD_RESET_REQUIRED")); + } let isin_owned = isin.to_string(); let shutdown_clone = Arc::clone(shutdown_flag); diff --git a/src/main.rs b/src/main.rs index cde6161..1d75ed7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -66,14 +66,18 @@ async fn main() -> Result<()> { logger::log_info("Monitoring dashboard available at http://localhost:3030").await; logger::init_debug_logger(paths.logs_dir()).await.ok(); - logger::log_info("=== Event Backtest Engine Started ===").await; + logger::log_info("=== Economic Webscraper Started ===").await; logger::log_info(&format!( - "Config → parallel_instances: {}, task_limit: {} vpn_rotation: {}", + "Config → parallel_instances: {}, task_limit: {} vpn_rotation: {} proxy_instances_per_certificate: {:?}", config.max_parallel_instances, config.max_tasks_per_instance, - config.enable_vpn_rotation + config.enable_vpn_rotation, + config.proxy_instances_per_certificate )).await; + let number_proxy_instances_per_certificate = config.proxy_instances_per_certificate.unwrap_or(1); + + // Simple shutdown flag let shutdown_flag = Arc::new(AtomicBool::new(false)); @@ -94,7 +98,7 @@ async fn main() -> Result<()> { None } else { logger::log_info(&format!("Found {} VPN servers – starting Docker proxy containers", server_count)).await; - let pp = Arc::new(DockerVpnProxyPool::new(paths.cache_openvpn_dir(), username, password).await?); + let pp = Arc::new(DockerVpnProxyPool::new(paths.cache_openvpn_dir(), username, password, number_proxy_instances_per_certificate).await?); logger::log_info(&format!("All {} Docker proxy containers started and ready", pp.num_proxies())).await; for i in 0..pp.num_proxies() { @@ -115,10 +119,10 @@ async fn main() -> Result<()> { }; // === Step 2: Initialize ChromeDriver pool === - let pool_size = config.max_parallel_instances; + let pool_size_limit = config.max_parallel_instances; let task_limit = config.max_tasks_per_instance; - logger::log_info(&format!("Creating ChromeDriver pool with {} instances...", pool_size)).await; + logger::log_info(&format!("Creating ChromeDriver pool with {} instances...", pool_size_limit)).await; let pool = Arc::new( if task_limit > 0 { @@ -128,7 +132,7 @@ async fn main() -> Result<()> { } ); - logger::log_info(&format!("ChromeDriver pool ready with {} instances", pool_size)).await; + logger::log_info(&format!("ChromeDriver pool ready with {} instances", pool_size_limit)).await; // === Step 3: Ctrl+C handler === { @@ -166,7 +170,7 @@ async fn main() -> Result<()> { // === Step 4: Run scraping jobs === logger::log_info("--- Starting ECONOMIC data update ---").await; - economic::run_full_update(&config, &pool).await?; + //economic::run_full_update(&config, &pool).await?; logger::log_info("Economic update completed").await; if !shutdown_flag.load(Ordering::SeqCst) { diff --git a/src/monitoring/events.rs b/src/monitoring/events.rs index f6b5011..a5ee18e 100644 --- a/src/monitoring/events.rs +++ b/src/monitoring/events.rs @@ -22,6 +22,11 @@ pub enum MonitoringEvent { instance_id: usize, status: InstanceStatusChange, }, + + InstanceSelected { + instance_id: usize, + half: usize, + }, // Task execution TaskStarted { diff --git a/src/monitoring/service.rs b/src/monitoring/service.rs index 496ab97..baf0ae7 100644 --- a/src/monitoring/service.rs +++ b/src/monitoring/service.rs @@ -107,6 +107,10 @@ impl MonitoringService { } } + MonitoringEvent::InstanceSelected { instance_id, half } => { + self.log_info(format!("Instance #{} selected (half {})", instance_id, half)).await; + } + MonitoringEvent::TaskStarted { instance_id, url } => { let mut state = self.state.write().await; if let Some(inst) = state.instances.get_mut(&instance_id) { diff --git a/src/scraper/docker_vpn_proxy.rs b/src/scraper/docker_vpn_proxy.rs index acb8a76..abe9dff 100644 --- a/src/scraper/docker_vpn_proxy.rs +++ b/src/scraper/docker_vpn_proxy.rs @@ -10,7 +10,16 @@ pub struct DockerVpnProxyPool { } impl DockerVpnProxyPool { - pub async fn new(ovpn_dir: &Path, username: String, password: String) -> Result { + pub async fn new( + ovpn_dir: &Path, + username: String, + password: String, + instances_per_ovpn: usize, + ) -> Result { + if instances_per_ovpn == 0 { + return Err(anyhow!("instances_per_ovpn must be at least 1")); + } + // Count hostnames (subdirs in ovpn_dir) let hostnames: Vec<_> = std::fs::read_dir(ovpn_dir)? .filter_map(Result::ok) @@ -23,14 +32,21 @@ impl DockerVpnProxyPool { return Err(anyhow!("No VPN hostnames found in {:?}", ovpn_dir)); } - crate::util::logger::log_info(&format!("Found {} VPN hostnames", num_servers)).await; + // Calculate total containers: hostnames × instances_per_ovpn + let total_containers = num_servers * instances_per_ovpn; - let mut container_names = Vec::with_capacity(num_servers); - let mut proxy_ports = Vec::with_capacity(num_servers); + crate::util::logger::log_info(&format!( + "Found {} VPN hostnames × {} instances = {} total containers", + num_servers, instances_per_ovpn, total_containers + )).await; + + let mut container_names = Vec::with_capacity(total_containers); + let mut proxy_ports = Vec::with_capacity(total_containers); let base_port: u16 = 10800; + let mut port_counter = 0u16; // === STEP 1: Start ALL containers first === - for (i, hostname) in hostnames.iter().enumerate() { + for hostname in hostnames.iter() { // Pick tcp443.ovpn if exists, else first .ovpn let hostname_dir = ovpn_dir.join(hostname); let mut ovpn_path: Option = None; @@ -48,48 +64,58 @@ impl DockerVpnProxyPool { let ovpn_path = ovpn_path.ok_or_else(|| anyhow!("No .ovpn found for {}", hostname))?; - let name = format!("vpn-proxy-{}", i); - let port = base_port + i as u16 + 1; + // Spawn multiple instances for this .ovpn file + for instance_num in 0..instances_per_ovpn { + let name = format!("vpn-proxy-{}-{}", hostname, instance_num); + let port = base_port + port_counter + 1; + port_counter += 1; - // Clean up any existing container with the same name - let _ = Command::new("docker") - .args(["rm", "-f", &name]) - .status() - .await; + // Clean up any existing container with the same name + let _ = Command::new("docker") + .args(["rm", "-f", &name]) + .status() + .await; - // Run Docker container - let status = Command::new("docker") - .args([ - "run", "-d", - "--name", &name, - "--cap-add=NET_ADMIN", - "--device", "/dev/net/tun", - "--sysctl", "net.ipv4.ip_forward=1", - "-v", &format!("{}:/vpn/config.ovpn", ovpn_path.display()), - "-e", &format!("VPN_USERNAME={}", username), - "-e", &format!("VPN_PASSWORD={}", password), - "-p", &format!("{}:1080", port), - "rust-vpn-proxy", - ]) - .status() - .await - .context("Failed to run Docker")?; + // Run Docker container + let status = Command::new("docker") + .args([ + "run", "-d", + "--name", &name, + "--cap-add=NET_ADMIN", + "--device", "/dev/net/tun", + "--sysctl", "net.ipv4.ip_forward=1", + "-v", &format!("{}:/vpn/config.ovpn", ovpn_path.display()), + "-e", &format!("VPN_USERNAME={}", username), + "-e", &format!("VPN_PASSWORD={}", password), + "-p", &format!("{}:1080", port), + "rust-vpn-proxy", + ]) + .status() + .await + .context("Failed to run Docker")?; - if !status.success() { - return Err(anyhow!("Docker run failed for {}", name)); + if !status.success() { + return Err(anyhow!("Docker run failed for {}", name)); + } + + crate::util::logger::log_info(&format!( + "Started container {} on port {} (using {})", + name, port, ovpn_path.file_name().unwrap().to_string_lossy() + )).await; + + container_names.push(name); + proxy_ports.push(port); } - - crate::util::logger::log_info(&format!("Started container {} on port {} (waiting for VPN...)", name, port)).await; - - container_names.push(name); - proxy_ports.push(port); } // Brief pause to let containers start sleep(Duration::from_secs(8)).await; - crate::util::logger::log_info(&format!("All {} containers started, beginning health checks...", container_names.len())).await; + crate::util::logger::log_info(&format!( + "All {} containers started, beginning health checks...", + container_names.len() + )).await; - // === STEP 2: Test ALL proxies in parallel with 10-second intervals === + // === STEP 2: Test ALL proxies in parallel === let results = Self::test_all_proxies_parallel(&container_names, &proxy_ports).await; // Filter out failed containers @@ -100,8 +126,10 @@ impl DockerVpnProxyPool { for (i, (container_name, port)) in container_names.into_iter().zip(proxy_ports.into_iter()).enumerate() { match &results[i] { Ok(Some(ip)) => { - crate::util::logger::log_info(&format!("✓ Container {} on port {} ready with IP: {}", - container_name, port, ip)).await; + crate::util::logger::log_info(&format!( + "✓ Container {} on port {} ready with IP: {}", + container_name, port, ip + )).await; working_containers.push(container_name); working_ports.push(port); } @@ -113,14 +141,15 @@ impl DockerVpnProxyPool { .ok() .and_then(|output| String::from_utf8_lossy(&output.stdout).to_string().into()); - crate::util::logger::log_error(&format!("✗ Container {} on port {} ready but IP detection failed. Logs: {:?}", - container_name, port, logs)).await; + crate::util::logger::log_error(&format!( + "✗ Container {} on port {} ready but IP detection failed. Logs: {:?}", + container_name, port, logs + )).await; failed_count += 1; // Clean up failed container let _ = Self::cleanup_container(&container_name).await; } Err(e) => { - // Get container logs to debug let logs = Command::new("docker") .args(["logs", "--tail", "20", &container_name]) .output() @@ -128,8 +157,10 @@ impl DockerVpnProxyPool { .ok() .and_then(|output| String::from_utf8_lossy(&output.stdout).to_string().into()); - crate::util::logger::log_error(&format!("✗ Container {} on port {} failed: {}. Logs: {:?}", - container_name, port, e, logs)).await; + crate::util::logger::log_error(&format!( + "✗ Container {} on port {} failed: {}. Logs: {:?}", + container_name, port, e, logs + )).await; failed_count += 1; // Clean up failed container let _ = Self::cleanup_container(&container_name).await; @@ -138,14 +169,19 @@ impl DockerVpnProxyPool { } if working_containers.is_empty() { - return Err(anyhow!("All {} VPN proxy containers failed to start", num_servers)); + return Err(anyhow!("All {} VPN proxy containers failed to start", total_containers)); } - crate::util::logger::log_info(&format!("Started {}/{} VPN proxy containers successfully", - working_containers.len(), num_servers)).await; + crate::util::logger::log_info(&format!( + "Started {}/{} VPN proxy containers successfully ({} hostnames × {} instances)", + working_containers.len(), total_containers, num_servers, instances_per_ovpn + )).await; if failed_count > 0 { - crate::util::logger::log_warn(&format!("{} containers failed and were cleaned up", failed_count)).await; + crate::util::logger::log_warn(&format!( + "{} containers failed and were cleaned up", + failed_count + )).await; } Ok(Self { diff --git a/src/scraper/hard_reset.rs b/src/scraper/hard_reset.rs new file mode 100644 index 0000000..4232758 --- /dev/null +++ b/src/scraper/hard_reset.rs @@ -0,0 +1,239 @@ +// src/scraper/hard_reset.rs - PROPERLY FIXED: Matches main.rs initialization pattern +use std::sync::{Arc, atomic::{AtomicBool, AtomicUsize, Ordering}}; + +use crate::{ChromeDriverPool, Config, logger, scraper::docker_vpn_proxy::{DockerVpnProxyPool, cleanup_all_proxy_containers}, util::directories::DataPaths}; + +/// Simple error counter for triggering hard resets +pub struct HardResetController { + consecutive_errors: AtomicUsize, +} + +impl HardResetController { + pub fn new() -> Self { + Self { + consecutive_errors: AtomicUsize::new(0), + } + } + + /// Record success - resets counter + pub fn record_success(&self) { + self.consecutive_errors.store(0, Ordering::SeqCst); + } + + /// Record error - returns new count + pub fn record_error(&self) -> usize { + self.consecutive_errors.fetch_add(1, Ordering::SeqCst) + 1 + } + + /// Reset counter + pub fn reset(&self) { + self.consecutive_errors.store(0, Ordering::SeqCst); + } + + /// Get current count + pub fn get_count(&self) -> usize { + self.consecutive_errors.load(Ordering::SeqCst) + } +} + +/// Perform hard reset: shutdown everything and recreate +pub async fn perform_hard_reset( + pool_mutex: &Arc>>, + config: &Config, + paths: &DataPaths, + monitoring: &Option, + shutdown_flag: &Arc, +) -> anyhow::Result<()> { + let number_proxy_instances = config.proxy_instances_per_certificate.unwrap_or(1); + logger::log_error("🔴 STARTING HARD RESET SEQUENCE").await; + + // Check if shutdown was requested + if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn("Shutdown requested during hard reset, aborting").await; + return Ok(()); + } + + // Step 1: Acquire pool lock (prevents new tasks from using it) + logger::log_info(" [1/10] Acquiring pool lock...").await; + let mut pool_guard = pool_mutex.lock().await; + let old_pool = Arc::clone(&*pool_guard); + + // Step 2: Wait a moment for active tasks to complete + logger::log_info(" [2/10] Waiting 10 seconds for active tasks...").await; + drop(pool_guard); // Release lock so tasks can finish + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + + // Re-acquire lock + let mut pool_guard = pool_mutex.lock().await; + + // Step 3: Shutdown ChromeDriver pool + logger::log_info(" [3/10] Shutting down ChromeDriver pool...").await; + if let Err(e) = old_pool.shutdown().await { + logger::log_warn(&format!(" Warning: Pool shutdown error: {}", e)).await; + } + + // Step 4: Shutdown proxies + logger::log_info(" [4/10] Shutting down proxy containers...").await; + cleanup_all_proxy_containers().await.ok(); + + // Step 5: Wait for cleanup + logger::log_info(" [5/10] Waiting 30 seconds for cleanup...").await; + tokio::time::sleep(tokio::time::Duration::from_secs(30)).await; + + // Check shutdown again + if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn("Shutdown requested during cleanup, aborting reset").await; + return Ok(()); + } + + // Step 6: Recreate proxy pool (if VPN rotation is enabled) + logger::log_info(" [6/10] Recreating proxy pool...").await; + let new_proxy_pool = if config.enable_vpn_rotation { + match recreate_proxy_pool_with_fresh_credentials(config, paths, monitoring, shutdown_flag).await { + Ok(pool) => { + logger::log_info(&format!( + " ✓ Proxy pool created with {} proxies", + pool.num_proxies() + )).await; + Some(pool) + } + Err(e) => { + logger::log_warn(&format!( + " ⚠️ Proxy creation failed: {}. Continuing without proxies.", + e + )).await; + None + } + } + } else { + logger::log_info(" ⊘ VPN rotation disabled, skipping proxy pool").await; + None + }; + + // Step 7: Recreate ChromeDriver pool + logger::log_info(" [7/10] Recreating ChromeDriver pool...").await; + let new_pool = Arc::new( + ChromeDriverPool::new_with_proxy_and_task_limit( + new_proxy_pool, + config, + monitoring.clone(), + ).await? + ); + + logger::log_info(" ✓ ChromeDriver pool created").await; + + // Step 8: Reset the error counter on the NEW pool + logger::log_info(" [8/10] Resetting error counter...").await; + new_pool.get_reset_controller().reset(); + logger::log_info(" ✓ Error counter cleared").await; + + // Step 9: Replace pool atomically + logger::log_info(" [9/10] Activating new pool...").await; + *pool_guard = new_pool; + drop(pool_guard); + + // Step 10: Emit monitoring event + logger::log_info(" [10/10] Updating monitoring...").await; + if let Some(mon) = monitoring { + mon.emit(crate::monitoring::MonitoringEvent::PoolInitialized { + pool_size: config.max_parallel_instances, + with_proxy: config.enable_vpn_rotation, + with_rotation: config.max_tasks_per_instance > 0, + }); + } + + logger::log_info("✅ HARD RESET COMPLETE").await; + + Ok(()) +} + +/// Recreate proxy pool with fresh VPNBook credentials (matches main.rs pattern) +async fn recreate_proxy_pool_with_fresh_credentials( + config: &Config, + paths: &DataPaths, + monitoring: &Option, + shutdown_flag: &Arc, +) -> anyhow::Result> { + + let number_proxy_instances = config.proxy_instances_per_certificate.unwrap_or(1); + + // Check shutdown + if shutdown_flag.load(Ordering::SeqCst) { + return Err(anyhow::anyhow!("Shutdown requested during proxy recreation")); + } + + logger::log_info(" [6.1] Creating temporary ChromeDriver pool for credential fetch...").await; + + // Create temporary pool WITHOUT proxy (just like main.rs does) + let temp_pool = Arc::new( + ChromeDriverPool::new_with_proxy_and_task_limit( + None, // No proxy for temp pool + config, + monitoring.clone(), + ).await? + ); + + logger::log_info(" [6.2] Fetching fresh VPNBook credentials...").await; + + // Fetch fresh VPNBook credentials (just like main.rs does) + let (username, password, _files) = crate::util::opnv::fetch_vpnbook_configs( + &temp_pool, + paths.cache_dir() + ).await?; + + logger::log_info(&format!(" [6.3] Got credentials → User: {}", username)).await; + + // Shutdown temp pool + logger::log_info(" [6.4] Shutting down temporary pool...").await; + temp_pool.shutdown().await.ok(); + + // Check shutdown again + if shutdown_flag.load(Ordering::SeqCst) { + return Err(anyhow::anyhow!("Shutdown requested during proxy recreation")); + } + + // Check if we have VPN server configs + let server_count = std::fs::read_dir(paths.cache_openvpn_dir())? + .filter(|e| e.as_ref().unwrap().path().is_dir()) + .count(); + + if server_count == 0 { + return Err(anyhow::anyhow!("No VPN servers found after credential fetch")); + } + + logger::log_info(&format!( + " [6.5] Found {} VPN servers → Creating proxy pool with {} instances per server...", + server_count, + number_proxy_instances + )).await; + + // Create new proxy pool (just like main.rs does) + let proxy_pool = Arc::new( + DockerVpnProxyPool::new( + paths.cache_openvpn_dir(), + username, + password, + number_proxy_instances, + ).await? + ); + + logger::log_info(&format!( + " [6.6] ✓ Proxy pool ready with {} total proxies", + proxy_pool.num_proxies() + )).await; + + // Emit proxy connected events for monitoring + if let Some(mon) = monitoring { + for i in 0..proxy_pool.num_proxies() { + if let Some(proxy_info) = proxy_pool.get_proxy_info(i) { + mon.emit(crate::monitoring::MonitoringEvent::ProxyConnected { + container_name: proxy_info.container_name.clone(), + ip_address: proxy_info.ip_address.clone(), + port: proxy_info.port, + }); + } + } + } + + Ok(proxy_pool) +} \ No newline at end of file diff --git a/src/scraper/mod.rs b/src/scraper/mod.rs index 4f3ee99..dece4db 100644 --- a/src/scraper/mod.rs +++ b/src/scraper/mod.rs @@ -1,3 +1,4 @@ pub mod webdriver; pub mod docker_vpn_proxy; -pub mod helpers; \ No newline at end of file +pub mod helpers; +pub mod hard_reset; \ No newline at end of file diff --git a/src/scraper/webdriver.rs b/src/scraper/webdriver.rs index dfc378b..f19b12c 100644 --- a/src/scraper/webdriver.rs +++ b/src/scraper/webdriver.rs @@ -1,5 +1,9 @@ // src/scraper/webdriver.rs use super::helpers::*; +use super::hard_reset::HardResetController; +use super::docker_vpn_proxy::DockerVpnProxyPool; +use crate::Config; +use crate::logger; use anyhow::{anyhow, Context, Result}; use fantoccini::{Client, ClientBuilder}; @@ -13,8 +17,6 @@ use tokio::process::{Child, Command}; use tokio::task::JoinHandle; use tokio::sync::{Mutex, Semaphore}; use tokio::time::{sleep, timeout, Duration}; -use crate::scraper::docker_vpn_proxy::{DockerVpnProxyPool}; -use crate::Config; /// Manages a pool of ChromeDriver instances for parallel scraping with optional VPN binding. pub struct ChromeDriverPool { @@ -31,10 +33,16 @@ pub struct ChromeDriverPool { min_request_interval_ms: u64, monitoring: Option, + hard_reset_controller: Arc, + config: Arc, } impl ChromeDriverPool { -/// Creates a new pool without any proxy (direct connection). + /// When consecutive errors reach this value, execute() will return a special error + /// that signals the caller to trigger a hard reset + const HARD_RESET_ERROR_THRESHOLD: usize = 12; + + /// Creates a new pool without any proxy (direct connection). pub async fn _new(config: &Config, monitoring: Option,) -> Result { Self::new_with_proxy_and_task_limit(None, config, monitoring).await } @@ -85,6 +93,11 @@ impl ChromeDriverPool { // Rotation is enabled when task limiting is active let rotation_enabled = task_per_instance_limit > 0; + let half_size = if rotation_enabled { + (actual_pool_size + 1) / 2 // Runde auf bei ungerader Zahl + } else { + actual_pool_size + }; let mut instances = Vec::with_capacity(actual_pool_size); @@ -105,8 +118,8 @@ impl ChromeDriverPool { for i in 0..actual_pool_size { // Pass the entire proxy_pool and the index let instance = ChromeInstance::new( - proxy_pool.clone(), // Clone the Arc - i, // This instance's proxy index + proxy_pool.clone(), + i, config, monitoring.clone(), ).await?; @@ -162,15 +175,21 @@ impl ChromeDriverPool { let min_request_interval_ms = config.min_request_interval_ms; + let hard_reset_controller = Arc::new(HardResetController::new()); + + let config_clone = Arc::new(config.clone()); + Ok(Self { instances, - semaphore: Arc::new(Semaphore::new(actual_pool_size)), + semaphore: Arc::new(Semaphore::new(half_size)), proxy_pool, rotation_enabled, next_instance: Arc::new(Mutex::new(0)), last_request_time: Arc::new(Mutex::new(Instant::now())), min_request_interval_ms, monitoring, + hard_reset_controller, + config: config_clone, }) } @@ -188,10 +207,8 @@ impl ChromeDriverPool { if elapsed < self.min_request_interval_ms { let wait_ms = self.min_request_interval_ms - elapsed; - drop(last_time); // Lock vor Sleep freigeben! - + drop(last_time); sleep(Duration::from_millis(wait_ms)).await; - let mut last_time = self.last_request_time.lock().await; *last_time = Instant::now(); } else { @@ -199,12 +216,20 @@ impl ChromeDriverPool { } } - let random_index = random_range(0, self.instances.len() as u64) as usize; - // Index-Auswahl (vereinfacht, siehe unten für vollständige Rotation) - let index = if self.rotation_enabled { - self.get_rotated_index().await? + let instance = if self.rotation_enabled { + self.select_instance_with_rotation().await? } else { - random_index + self.select_instance_round_robin().await + }; + + { + let mut inst = instance.lock().await; + inst.increment_task_count(); + } + + let index: usize = { + let instances = &self.instances; + instances.iter().position(|inst| Arc::ptr_eq(inst, &instance)).unwrap_or(0) }; if let Some(ref mon) = self.monitoring { @@ -216,15 +241,10 @@ impl ChromeDriverPool { instance_id: index, status: crate::monitoring::InstanceStatusChange::Active, }); - } + }; - let instance = &self.instances[index]; let mut guard = instance.lock().await; - - // NEU: Session mit automatischer Erneuerung holen! let client = guard.get_or_renew_session().await?; - - guard.increment_task_count(); let (task_count, session_requests) = guard.get_session_stats().await; crate::util::logger::log_info(&format!( @@ -232,17 +252,17 @@ impl ChromeDriverPool { index, task_count, guard.max_tasks_per_instance, session_requests )).await; - drop(guard); // Lock freigeben vor Navigation + drop(guard); let start_time = Instant::now(); - // Navigation mit Timeout + // Navigation with timeout let navigation_result = timeout( Duration::from_secs(60), client.goto(&url) ).await; - match navigation_result { + let result = match navigation_result { Ok(Ok(_)) => { if let Some(ref mon) = self.monitoring { mon.emit(crate::monitoring::MonitoringEvent::TaskCompleted { @@ -258,14 +278,111 @@ impl ChromeDriverPool { } crate::util::logger::log_info(&format!("✓ Navigated to {}", url)).await; - // Parse-Funktion ausführen - parse(client).await + // Execute parse function + match parse(client).await { + Ok(data) => { + // ✅ SUCCESS: Record and log + let prev_count = self.hard_reset_controller.get_count(); + self.hard_reset_controller.record_success(); + + if prev_count > 0 { + logger::log_info(&format!( + "✓ Success - reset counter cleared (was: {}/{})", + prev_count, + Self::HARD_RESET_ERROR_THRESHOLD + )).await; + } + + Ok(data) + } + Err(e) => { + // ❌ PARSE ERROR: Record, check threshold, invalidate session + let error_count = self.hard_reset_controller.record_error(); + + { + let mut inst = instance.lock().await; + inst.invalidate_current_session().await; + } + + // Enhanced logging with threshold status + let threshold_pct = (error_count as f64 / Self::HARD_RESET_ERROR_THRESHOLD as f64) * 100.0; + logger::log_warn(&format!( + "Parse error. Reset counter: {}/{} ({:.0}%)", + error_count, + Self::HARD_RESET_ERROR_THRESHOLD, + threshold_pct + )).await; + + // Check if threshold reached + if error_count >= Self::HARD_RESET_ERROR_THRESHOLD { + logger::log_error(&format!( + "🔴 HARD RESET THRESHOLD REACHED ({}/{})", + error_count, + Self::HARD_RESET_ERROR_THRESHOLD + )).await; + + return Err(anyhow!( + "HARD_RESET_REQUIRED: Parse failed: {}. Threshold reached ({}/{})", + e, + error_count, + Self::HARD_RESET_ERROR_THRESHOLD + )); + } + + Err(anyhow!( + "Parse failed: {}. Hard reset at {}/{}", + e, + error_count, + Self::HARD_RESET_ERROR_THRESHOLD + )) + } + } } Ok(Err(e)) => { + // ❌ NAVIGATION ERROR: Record, check threshold, invalidate session crate::util::logger::log_error(&format!("Navigation failed: {}", e)).await; - Err(anyhow!("Navigation failed: {}", e)) + + { + let mut inst = instance.lock().await; + inst.invalidate_current_session().await; + } + + let error_count = self.hard_reset_controller.record_error(); + + // Enhanced logging + let threshold_pct = (error_count as f64 / Self::HARD_RESET_ERROR_THRESHOLD as f64) * 100.0; + logger::log_warn(&format!( + "Navigation error. Reset counter: {}/{} ({:.0}%)", + error_count, + Self::HARD_RESET_ERROR_THRESHOLD, + threshold_pct + )).await; + + // Check if threshold reached + if error_count >= Self::HARD_RESET_ERROR_THRESHOLD { + logger::log_error(&format!( + "🔴 HARD RESET THRESHOLD REACHED ({}/{})", + error_count, + Self::HARD_RESET_ERROR_THRESHOLD + )).await; + + return Err(anyhow!( + "HARD_RESET_REQUIRED: Navigation failed: {}. Threshold reached ({}/{})", + e, + error_count, + Self::HARD_RESET_ERROR_THRESHOLD + )); + } + + Err(anyhow!( + "Navigation failed: {}. Hard reset at {}/{}", + e, + error_count, + Self::HARD_RESET_ERROR_THRESHOLD + )) } Err(_) => { + // ❌ TIMEOUT ERROR: Record, check threshold, invalidate session if let Some(ref mon) = self.monitoring { mon.emit(crate::monitoring::MonitoringEvent::NavigationTimeout { instance_id: index, @@ -273,55 +390,138 @@ impl ChromeDriverPool { }); } + let error_count = self.hard_reset_controller.record_error(); + crate::util::logger::log_error("Navigation timeout (60s)").await; - Err(anyhow!("Navigation timeout")) + + { + let mut inst = instance.lock().await; + inst.invalidate_current_session().await; + } + + // Enhanced logging + let threshold_pct = (error_count as f64 / Self::HARD_RESET_ERROR_THRESHOLD as f64) * 100.0; + logger::log_warn(&format!( + "Timeout error. Reset counter: {}/{} ({:.0}%)", + error_count, + Self::HARD_RESET_ERROR_THRESHOLD, + threshold_pct + )).await; + + // Check if threshold reached + if error_count >= Self::HARD_RESET_ERROR_THRESHOLD { + logger::log_error(&format!( + "🔴 HARD RESET THRESHOLD REACHED ({}/{})", + error_count, + Self::HARD_RESET_ERROR_THRESHOLD + )).await; + + return Err(anyhow!( + "HARD_RESET_REQUIRED: Navigation timeout. Threshold reached ({}/{})", + error_count, + Self::HARD_RESET_ERROR_THRESHOLD + )); + } + + Err(anyhow!( + "Navigation timeout. Hard reset at {}/{}", + error_count, + Self::HARD_RESET_ERROR_THRESHOLD + )) } + }; + + { + let mut inst = instance.lock().await; + inst.task_count = inst.task_count.saturating_sub(1); } + + result } - async fn get_rotated_index(&self) -> Result { - let total = self.instances.len(); - let half_size = total / 2; + /// Simple round-robin instance selection (no rotation) + async fn select_instance_round_robin(&self) -> Arc> { + let mut next = self.next_instance.lock().await; + let index = *next; + *next = (*next + 1) % self.instances.len(); + drop(next); + + Arc::clone(&self.instances[index]) + } + + /// Round-robin with half-pool rotation + async fn select_instance_with_rotation(&self) -> Result>> { + let pool_size = self.instances.len(); + let half_size = pool_size / 2; if half_size == 0 { - return Ok(0); // Pool zu klein für Rotation + // Pool too small for rotation, fall back to simple round-robin + return Ok(self.select_instance_round_robin().await); } - let mut next_idx = self.next_instance.lock().await; - let current_half_start = if *next_idx < half_size { 0 } else { half_size }; - let current_half_end = if *next_idx < half_size { half_size } else { total }; + let mut next = self.next_instance.lock().await; + let current_half_start = (*next / half_size) * half_size; + let current_half_end = (current_half_start + half_size).min(pool_size); - // Suche verfügbare Instanz in aktueller Hälfte - for offset in 0..(current_half_end - current_half_start) { - let candidate_idx = current_half_start + ((*next_idx + offset) % half_size); + // Try to find available instance in current half + let mut attempts = 0; + let max_attempts = half_size * 2; // Try both halves + + while attempts < max_attempts { + let index = current_half_start + (*next % half_size); + let instance = &self.instances[index]; - let instance = &self.instances[candidate_idx]; - let guard = instance.lock().await; + // Check if instance can accept more tasks + let mut inst = instance.lock().await; + let can_accept = inst.get_task_count() < inst.max_tasks_per_instance; + drop(inst); - if guard.max_tasks_per_instance == 0 || - guard.task_count < guard.max_tasks_per_instance { - *next_idx = (candidate_idx + 1) % total; - drop(guard); - return Ok(candidate_idx); + if can_accept { + *next = (*next + 1) % pool_size; + drop(next); + + if let Some(ref mon) = self.monitoring { + mon.emit(crate::monitoring::MonitoringEvent::InstanceSelected { + instance_id: index, + half: if index < half_size { 1 } else { 2 }, + }); + } + + return Ok(Arc::clone(instance)); } + + // Current half saturated, try other half + if attempts == half_size - 1 { + logger::log_info("Current half saturated, rotating to other half").await; + *next = if current_half_start == 0 { half_size } else { 0 }; + } else { + *next = (*next + 1) % pool_size; + } + + attempts += 1; } - // Aktuelle Hälfte voll → Zur anderen wechseln - crate::util::logger::log_info("Current half saturated, rotating to other half").await; + drop(next); - let new_half_start = if current_half_start == 0 { half_size } else { 0 }; - let new_half_end = if current_half_start == 0 { total } else { half_size }; - - // Alte Hälfte zurücksetzen (für nächste Rotation) - for i in current_half_start..current_half_end { - let mut instance = self.instances[i].lock().await; - instance.reset_task_count(); - } - - *next_idx = new_half_start; - drop(next_idx); - - Ok(new_half_start) + // All instances saturated + Err(anyhow!("All instances at task capacity")) + } + + pub fn get_reset_controller(&self) -> Arc { + Arc::clone(&self.hard_reset_controller) + } + + /// Check if hard reset threshold has been reached + pub fn should_perform_hard_reset(&self) -> bool { + self.hard_reset_controller.get_count() >= Self::HARD_RESET_ERROR_THRESHOLD + } + + /// Get current error count and threshold for monitoring + pub fn get_reset_status(&self) -> (usize, usize) { + ( + self.hard_reset_controller.get_count(), + Self::HARD_RESET_ERROR_THRESHOLD + ) } /// Gracefully shut down all ChromeDriver processes and Docker proxy containers. @@ -369,7 +569,7 @@ pub struct ChromeInstance { current_session: Arc>>, // Current active session session_request_count: Arc>, - max_requests_per_session: usize, // z.B. 25 + max_requests_per_session: usize, proxy_pool: Option>, // Referernce to the proxy pool current_proxy_index: Arc>, // Current proxy index in use @@ -411,8 +611,6 @@ impl ChromeInstance { pub async fn get_or_renew_session(&self) -> Result { let mut session_opt = self.current_session.lock().await; let mut request_count = self.session_request_count.lock().await; - - let old_request_count = *request_count; // Session erneuern wenn: // 1. Keine Session vorhanden @@ -476,7 +674,7 @@ impl ChromeInstance { mon.emit(crate::monitoring::MonitoringEvent::SessionRenewed { instance_id: self.instance_id, old_request_count: *request_count, - reason: crate::monitoring::RenewalReason::RequestLimit, + reason: reason, new_proxy: new_proxy_info, }); } @@ -490,15 +688,21 @@ impl ChromeInstance { } async fn create_fresh_session(&self) -> Result { - // Hole aktuellen Proxy-URL ohne self zu mutieren let proxy_url = if let Some(ref pool) = self.proxy_pool { let mut proxy_idx = self.current_proxy_index.lock().await; - *proxy_idx = (*proxy_idx + 1) % pool.num_proxies(); - let url = pool.get_proxy_url(*proxy_idx); + let num_proxies = pool.num_proxies(); - crate::util::logger::log_info(&format!( - "Using proxy {} for new session", - *proxy_idx + // Round-robin through all proxies + let selected_proxy = *proxy_idx % num_proxies; + *proxy_idx = (*proxy_idx + 1) % num_proxies; + + let url = pool.get_proxy_url(selected_proxy); + + logger::log_info(&format!( + "Instance {} creating session with proxy {}/{} (rotation)", + self.instance_id, + selected_proxy, + num_proxies )).await; Some(url) @@ -516,38 +720,19 @@ impl ChromeInstance { .context("Failed to connect to ChromeDriver") } - fn chrome_args_with_ua(&self, user_agent: &str, proxy_url: &Option) -> Map { - let mut args = vec![ - "--headless=new".to_string(), - "--disable-gpu".to_string(), - "--no-sandbox".to_string(), - "--disable-dev-shm-usage".to_string(), - "--disable-infobars".to_string(), - "--disable-extensions".to_string(), - "--disable-popup-blocking".to_string(), - "--disable-notifications".to_string(), - "--disable-autofill".to_string(), - "--disable-sync".to_string(), - "--disable-default-apps".to_string(), - "--disable-translate".to_string(), - "--disable-blink-features=AutomationControlled".to_string(), - format!("--user-agent={}", user_agent), - ]; + pub async fn invalidate_current_session(&self) { + let mut session_opt = self.current_session.lock().await; - if let Some(proxy) = proxy_url { - args.push(format!("--proxy-server={}", proxy)); + if let Some(old_session) = session_opt.take() { + crate::util::logger::log_info(&format!( + "Invalidating broken session for instance {}", + self.instance_id + )).await; + let _ = old_session.close().await; } - let caps = serde_json::json!({ - "goog:chromeOptions": { - "args": args, - "excludeSwitches": ["enable-logging", "enable-automation"], - "prefs": { - "profile.default_content_setting_values.notifications": 2 - } - } - }); - caps.as_object().cloned().unwrap() + let mut request_count = self.session_request_count.lock().await; + *request_count = 0; } pub fn reset_task_count(&mut self) { @@ -578,6 +763,20 @@ impl ChromeInstance { Ok(()) } + pub fn is_available(&self) -> bool { + if self.max_tasks_per_instance == 0 { + return true; // No limit + } + self.task_count < self.max_tasks_per_instance + } + + pub fn tasks_remaining(&self) -> usize { + if self.max_tasks_per_instance == 0 { + return usize::MAX; + } + self.max_tasks_per_instance.saturating_sub(self.task_count) + } + /// Spawns the actual `chromedriver` binary and waits for it to become ready. async fn spawn_chromedriver() -> Result<(String, Child, JoinHandle<()>)> { let mut process = Command::new("chromedriver-win64/chromedriver.exe") @@ -624,6 +823,40 @@ impl ChromeInstance { Err(anyhow!("ChromeDriver failed to start within 30s")) } + fn chrome_args_with_ua(&self, user_agent: &str, proxy_url: &Option) -> Map { + let mut args = vec![ + "--headless=new".to_string(), + "--disable-gpu".to_string(), + "--no-sandbox".to_string(), + "--disable-dev-shm-usage".to_string(), + "--disable-infobars".to_string(), + "--disable-extensions".to_string(), + "--disable-popup-blocking".to_string(), + "--disable-notifications".to_string(), + "--disable-autofill".to_string(), + "--disable-sync".to_string(), + "--disable-default-apps".to_string(), + "--disable-translate".to_string(), + "--disable-blink-features=AutomationControlled".to_string(), + format!("--user-agent={}", user_agent), + ]; + + if let Some(proxy) = proxy_url { + args.push(format!("--proxy-server={}", proxy)); + } + + let caps = serde_json::json!({ + "goog:chromeOptions": { + "args": args, + "excludeSwitches": ["enable-logging", "enable-automation"], + "prefs": { + "profile.default_content_setting_values.notifications": 2 + } + } + }); + caps.as_object().cloned().unwrap() + } + pub fn chrome_user_agent() -> &'static str { static UAS: &[&str] = &[ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.6367.91 Safari/537.36",