diff --git a/data_updating_rule.md b/data_updating_rule.md
new file mode 100644
index 0000000..80193a6
--- /dev/null
+++ b/data_updating_rule.md
@@ -0,0 +1,25 @@
+# Abort-Safe Incremental JSONL Persistence Rule
+
+**Rule:** Persist state using an *append-only, fsync-backed JSONL log with atomic checkpoints*.
+
+**Requirements**
+- Write updates as **single-line JSON objects** (one logical mutation per line).
+- **Append only** (`O_APPEND`), never modify existing lines.
+- After each write batch, call **`fsync`** (or `File::sync_data`) before reporting success.
+- Treat a **line as committed only if it ends with `\n`**; ignore trailing partial lines on recovery.
+- Periodically create a **checkpoint**:
+  - Write full state to `state.tmp`
+  - `fsync`
+  - **Atomic rename** to `state.jsonl`
+- On startup:
+  - Load last checkpoint
+  - Replay log lines after it in order
+- On abort/panic/crash:
+  - No truncation
+  - Replay guarantees no data loss beyond last fsynced line
+
+**Outcome**
+- Crash/abort-safe
+- O(1) writes
+- Deterministic recovery
+- Minimal overhead
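Reviewer note: the rule file above is the contract the rest of this diff implements. For reference, here is a minimal std-only sketch of the same pattern. The file names (`updates.log`, `state.tmp`, `state.jsonl`) and the `Vec<String>` state representation are illustrative, not the crate's actual code; the real implementation lands in `src/corporate/update.rs` below.

```rust
use std::fs::{self, File, OpenOptions};
use std::io::Write;
use std::path::Path;

/// Append one mutation as a single JSON line; it counts as committed
/// only once the trailing '\n' is written and fsync has succeeded.
fn append_update(log: &mut File, json_line: &str) -> std::io::Result<()> {
    debug_assert!(!json_line.contains('\n'), "one logical mutation per line");
    log.write_all(json_line.as_bytes())?;
    log.write_all(b"\n")?;
    log.sync_data()?; // fsync before reporting success
    Ok(())
}

/// Atomic checkpoint: full state to state.tmp, fsync, then rename.
/// The rename is the commit point: a crash before it leaves the old
/// checkpoint plus the log; a crash after it leaves the new checkpoint.
fn write_checkpoint(dir: &Path, state_lines: &[String]) -> std::io::Result<()> {
    let tmp = dir.join("state.tmp");
    let mut f = File::create(&tmp)?;
    for line in state_lines {
        f.write_all(line.as_bytes())?;
        f.write_all(b"\n")?;
    }
    f.sync_all()?;
    fs::rename(tmp, dir.join("state.jsonl"))?; // atomic on POSIX filesystems
    Ok(())
}

/// Recovery helper: only '\n'-terminated lines are committed, so a
/// trailing partial line from an aborted write is dropped here.
fn committed_lines(content: &str) -> impl Iterator<Item = &str> {
    let end = content.rfind('\n').map_or(0, |i| i + 1);
    content[..end].split('\n').filter(|l| !l.trim().is_empty())
}

fn open_log(dir: &Path) -> std::io::Result<File> {
    OpenOptions::new().create(true).append(true).open(dir.join("updates.log"))
}
```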
diff --git a/src/config.rs b/src/config.rs
index 16d3639..e1b8864 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -21,21 +21,12 @@ pub struct Config {
     /// If set to "true", enables automatic VPN rotation between sessions
     #[serde(default)]
     pub enable_vpn_rotation: bool,
-
-    /// Number of tasks per session before rotating VPN
-    /// If set to 0, rotates VPN between economic and corporate phases
-    #[serde(default = "default_tasks_per_session")]
-    pub tasks_per_vpn_session: usize,
 }
 
 fn default_max_parallel_instances() -> usize {
     10
 }
 
-fn default_tasks_per_session() -> usize {
-    0 // 0 = rotate between economic/corporate
-}
-
 impl Default for Config {
     fn default() -> Self {
         Self {
@@ -45,7 +36,6 @@
             max_parallel_instances: default_max_parallel_instances(),
             max_tasks_per_instance: 0,
             enable_vpn_rotation: false,
-            tasks_per_vpn_session: default_tasks_per_session(),
         }
     }
 }
@@ -93,11 +83,6 @@ impl Config {
         .parse::<bool>()
         .context("Failed to parse ENABLE_VPN_ROTATION as bool")?;
 
-        let tasks_per_vpn_session: usize = dotenvy::var("TASKS_PER_VPN_SESSION")
-            .unwrap_or_else(|_| "0".to_string())
-            .parse()
-            .context("Failed to parse TASKS_PER_VPN_SESSION as usize")?;
-
         Ok(Self {
             economic_start_date,
             corporate_start_date,
@@ -105,7 +90,6 @@
             max_parallel_instances,
             max_tasks_per_instance,
             enable_vpn_rotation,
-            tasks_per_vpn_session,
         })
     }
 
diff --git a/src/corporate/types.rs b/src/corporate/types.rs
index 6c01c1f..2173893 100644
--- a/src/corporate/types.rs
+++ b/src/corporate/types.rs
@@ -79,10 +79,19 @@ pub struct CompanyInfo{
     pub securities: HashMap<String, Vec<FigiInfo>>, // ISIN -> Vec<FigiInfo>
 }
 
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct YahooCompanyDetails {
+    pub ticker: String,
+    pub sector: Option<String>,
+    pub exchange: Option<String>,
+}
+
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct CompanyCrossPlatformInfo {
     pub name: String,
     pub isin_tickers_map: HashMap<String, Vec<String>>, // ISIN -> Tickers
+    pub sector: Option<String>,
+    pub exchange: Option<String>,
 }
 
 /// Warrant Info
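Reviewer note: with the two new optional fields, one `companies.jsonl` record round-trips as below. The values are invented; this assumes only the serde derives shown above.

```rust
use std::collections::HashMap;

fn example_line(entry: &CompanyCrossPlatformInfo) -> serde_json::Result<String> {
    serde_json::to_string(entry) // single line, safe to append to the JSONL log
}

fn demo() -> serde_json::Result<()> {
    let entry = CompanyCrossPlatformInfo {
        name: "Example AG".to_string(),
        isin_tickers_map: HashMap::from([
            ("DE0000000000".to_string(), vec!["YAHOO:EXA.DE".to_string()]),
        ]),
        sector: Some("Technology".to_string()),
        exchange: Some("GER".to_string()),
    };
    let line = example_line(&entry)?;
    // {"name":"Example AG","isin_tickers_map":{"DE0000000000":["YAHOO:EXA.DE"]},"sector":"Technology","exchange":"GER"}
    assert!(!line.contains('\n'));
    Ok(())
}
```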
diff --git a/src/corporate/update.rs b/src/corporate/update.rs
index eb8729f..ffd8dc9 100644
--- a/src/corporate/update.rs
+++ b/src/corporate/update.rs
@@ -1,4 +1,4 @@
-// src/corporate/update.rs - COMPLETE STREAMING VERSION
+// src/corporate/update.rs - ABORT-SAFE VERSION WITH JSONL LOG
 
 use super::{scraper::*, storage::*, helpers::*, types::*, openfigi::*, yahoo::*};
 use crate::config::Config;
@@ -9,14 +9,17 @@ use crate::scraper::webdriver::ChromeDriverPool;
 use chrono::Local;
 use std::collections::HashMap;
 use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
 
-/// Main update function - fully streaming, minimal memory usage
-pub async fn run_full_update(_config: &Config, pool: &Arc<ChromeDriverPool>) -> anyhow::Result<()> {
+pub async fn run_full_update(
+    _config: &Config,
+    pool: &Arc<ChromeDriverPool>,
+    shutdown_flag: &Arc<AtomicBool>,
+) -> anyhow::Result<()> {
     logger::log_info("=== Corporate Update (STREAMING MODE) ===").await;
 
     let paths = DataPaths::new(".")?;
 
-    // Step 1: Download GLEIF CSV (don't load into memory)
     logger::log_info("Step 1: Downloading GLEIF CSV...").await;
     let gleif_csv_path = match download_isin_lei_csv().await? {
         Some(p) => {
@@ -29,14 +32,19 @@ pub async fn run_full_update(
         }
     };
 
-    // Step 2: Load OpenFIGI type lists (small, cached)
+    if shutdown_flag.load(Ordering::SeqCst) {
+        return Ok(());
+    }
+
     logger::log_info("Step 2: Loading OpenFIGI metadata...").await;
     load_figi_type_lists().await.ok();
     logger::log_info("  ✓ OpenFIGI metadata loaded").await;
 
-    // Step 3: Check mapping status and process only unmapped LEIs
+    if shutdown_flag.load(Ordering::SeqCst) {
+        return Ok(());
+    }
+
     logger::log_info("Step 3: Checking LEI-FIGI mapping status...").await;
-
     let all_mapped = ensure_all_leis_mapped(&gleif_csv_path, None).await?;
 
     if !all_mapped {
@@ -45,7 +53,10 @@ pub async fn run_full_update(
         logger::log_info("  ✓ All LEIs successfully mapped").await;
     }
 
-    // Step 4: Build securities from FIGI data (streaming)
+    if shutdown_flag.load(Ordering::SeqCst) {
+        return Ok(());
+    }
+
     logger::log_info("Step 4: Building securities map (streaming)...").await;
 
     let date_dir = find_most_recent_figi_date_dir(&paths).await?;
@@ -57,22 +68,37 @@ pub async fn run_full_update(
         logger::log_warn("  ✗ No FIGI data directory found").await;
     }
 
-    // Step 5: Build companies JSONL (streaming from securities)
-    logger::log_info("Step 5: Building companies.jsonl (streaming)...").await;
-    let count = build_companies_jsonl_streaming(&paths, pool).await?;
+    if shutdown_flag.load(Ordering::SeqCst) {
+        return Ok(());
+    }
+
+    logger::log_info("Step 5: Building companies.jsonl (streaming with abort-safe persistence)...").await;
+    let count = build_companies_jsonl_streaming(&paths, pool, shutdown_flag).await?;
     logger::log_info(&format!("  ✓ Saved {} companies", count)).await;
 
-    // Step 6: Process events (using index, not full load)
-    logger::log_info("Step 6: Processing events (using index)...").await;
-    let _event_index = build_event_index(&paths).await?;
-    logger::log_info("  ✓ Event index built").await;
+    if !shutdown_flag.load(Ordering::SeqCst) {
+        logger::log_info("Step 6: Processing events (using index)...").await;
+        let _event_index = build_event_index(&paths).await?;
+        logger::log_info("  ✓ Event index built").await;
+    }
 
     logger::log_info("✓ Corporate update complete").await;
     Ok(())
 }
 
-/// Stream companies.jsonl creation from securities cache - INCREMENTAL MODE
-async fn build_companies_jsonl_streaming(paths: &DataPaths, pool: &Arc<ChromeDriverPool>) -> anyhow::Result<usize> {
+/// Abort-safe incremental JSONL persistence with atomic checkpoints
+///
+/// Implements the data_updating_rule.md specification:
+/// - Append-only JSONL log for all updates
+/// - fsync after each write batch
+/// - Atomic checkpoints via temp file + rename
+/// - Crash recovery by loading checkpoint + replaying log
+/// - Partial lines ignored during recovery
+async fn build_companies_jsonl_streaming(
+    paths: &DataPaths,
+    pool: &Arc<ChromeDriverPool>,
+    shutdown_flag: &Arc<AtomicBool>,
+) -> anyhow::Result<usize> {
     let path = DataPaths::new(".")?;
     let corporate_path = path.data_dir().join("corporate").join("by_name");
     let securities_path = corporate_path.join("common_stocks.json");
@@ -82,59 +108,116 @@ async fn build_companies_jsonl_streaming(
     let content = tokio::fs::read_to_string(&securities_path).await?;
     let securities: HashMap<String, CompanyInfo> = serde_json::from_str(&content)?;
 
     let companies_path = paths.data_dir().join("companies.jsonl");
+    let log_path = paths.data_dir().join("companies_updates.log");
 
     if let Some(parent) = companies_path.parent() {
         tokio::fs::create_dir_all(parent).await?;
     }
 
-    // Load existing companies into a map
+    // === RECOVERY PHASE 1: Load last checkpoint ===
     let mut existing_companies: HashMap<String, CompanyCrossPlatformInfo> = HashMap::new();
+    let mut processed_names: std::collections::HashSet<String> = std::collections::HashSet::new();
 
     if companies_path.exists() {
-        logger::log_info("Loading existing companies.jsonl...").await;
+        logger::log_info("Loading checkpoint from companies.jsonl...").await;
         let existing_content = tokio::fs::read_to_string(&companies_path).await?;
         for line in existing_content.lines() {
             if line.trim().is_empty() {
                 continue;
             }
+            // Only process complete lines (ending with proper JSON closing brace)
+            // This ensures we don't process partial writes from crashed processes
+            if !line.ends_with('}') {
+                logger::log_warn(&format!("Skipping incomplete checkpoint line: {}", &line[..line.len().min(50)])).await;
+                continue;
+            }
             match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
                 Ok(company) => {
+                    processed_names.insert(company.name.clone());
                     existing_companies.insert(company.name.clone(), company);
                 }
                 Err(e) => {
-                    logger::log_warn(&format!("Failed to parse existing company line: {}", e)).await;
+                    logger::log_warn(&format!("Failed to parse checkpoint line: {}", e)).await;
                 }
             }
         }
-        logger::log_info(&format!("Loaded {} existing companies", existing_companies.len())).await;
+        logger::log_info(&format!("Loaded checkpoint with {} companies", existing_companies.len())).await;
     }
 
-    // Create temporary file for atomic write
-    let temp_path = companies_path.with_extension("jsonl.tmp");
-    let mut file = tokio::fs::File::create(&temp_path).await?;
-    let mut count = 0;
+    // === RECOVERY PHASE 2: Replay log after checkpoint ===
+    if log_path.exists() {
+        logger::log_info("Replaying update log...").await;
+        let log_content = tokio::fs::read_to_string(&log_path).await?;
+        let mut replayed = 0;
+        for line in log_content.lines() {
+            if line.trim().is_empty() {
+                continue;
+            }
+            // Only replay complete lines (crash-safe: incomplete lines are ignored)
+            // A line is considered complete only if it ends with '\n' and valid JSON
+            if !line.ends_with('}') {
+                logger::log_warn(&format!("Skipping incomplete log line: {}", &line[..line.len().min(50)])).await;
+                continue;
+            }
+            match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
+                Ok(company) => {
+                    processed_names.insert(company.name.clone());
+                    existing_companies.insert(company.name.clone(), company);
+                    replayed += 1;
+                }
+                Err(e) => {
+                    logger::log_warn(&format!("Failed to parse log line: {}", e)).await;
+                }
+            }
+        }
+        if replayed > 0 {
+            logger::log_info(&format!("Replayed {} updates from log", replayed)).await;
+        }
+    }
+
+    // === APPEND-ONLY LOG: Open in append mode with O_APPEND semantics ===
+    use tokio::fs::OpenOptions;
+    let mut log_file = OpenOptions::new()
+        .create(true)
+        .append(true) // O_APPEND - atomic append operations
+        .open(&log_path)
+        .await?;
 
+    let mut count = existing_companies.len();
     let mut updated_count = 0;
     let mut new_count = 0;
+    let checkpoint_interval = 50; // Create atomic checkpoint every 50 updates
+    let mut updates_since_checkpoint = 0;
 
     use tokio::io::AsyncWriteExt;
 
     for (name, company_info) in securities.iter() {
-        // Check if we already have this company
-        let existing_entry = existing_companies.remove(name);
+        if shutdown_flag.load(Ordering::SeqCst) {
+            logger::log_info("Shutdown requested - stopping company processing").await;
+            break;
+        }
+
+        // Skip if already processed (from checkpoint or log replay)
+        if processed_names.contains(name) {
+            continue;
+        }
+
+        let existing_entry = existing_companies.get(name).cloned();
         let is_update = existing_entry.is_some();
 
-        // Start with existing ISIN-ticker map or create new one
         let mut isin_tickers_map: HashMap<String, Vec<String>> = existing_entry
-            .map(|e| e.isin_tickers_map)
+            .as_ref()
+            .map(|e| e.isin_tickers_map.clone())
             .unwrap_or_default();
 
-        // Step 1: Extract unique ISIN-ticker pairs from FigiInfo
+        let mut sector = existing_entry.as_ref().and_then(|e| e.sector.clone());
+        let mut exchange = existing_entry.as_ref().and_then(|e| e.exchange.clone());
+
         let mut unique_isin_ticker_pairs: HashMap<String, Vec<String>> = HashMap::new();
 
         for figi_infos in company_info.securities.values() {
@@ -144,7 +227,6 @@ async fn build_companies_jsonl_streaming(
-            match scrape_ticker_by_isin(pool, &isin).await {
-                Ok(result) => {
-                    let log_msg = match &result {
-                        YahooTickerResult::Found(ticker) =>
-                            format!("✓ Found Yahoo ticker {} for ISIN {}", ticker, isin),
-                        YahooTickerResult::NoResults =>
-                            format!("○ No search results for ISIN {}", isin),
-                        YahooTickerResult::NotFound =>
-                            format!("○ Empty ticker result for ISIN {}", isin),
-                        YahooTickerResult::AmbiguousResults =>
-                            format!("⚠ Ambiguous results for ISIN {}", isin),
-                    };
+            match scrape_company_details_by_isin(pool, &isin).await {
+                Ok(Some(details)) => {
+                    logger::log_info(&format!("✓ Found Yahoo ticker {} for ISIN {}", details.ticker, isin)).await;
 
-                    if result.is_found() {
-                        logger::log_info(&log_msg).await;
-                    } else {
-                        logger::log_warn(&log_msg).await;
+                    tickers.push(format!("YAHOO:{}", details.ticker));
+
+                    if sector.is_none() && details.sector.is_some() {
+                        sector = details.sector.clone();
+                        logger::log_info(&format!("  Sector: {}", details.sector.as_ref().unwrap())).await;
                     }
-                    tickers.push(result.to_tagged_string());
+                    if exchange.is_none() && details.exchange.is_some() {
+                        exchange = details.exchange.clone();
+                        logger::log_info(&format!("  Exchange: {}", details.exchange.as_ref().unwrap())).await;
+                    }
+                },
+                Ok(None) => {
+                    logger::log_warn(&format!("◯ No search results for ISIN {}", isin)).await;
+                    tickers.push("YAHOO:NO_RESULTS".to_string());
                 },
                 Err(e) => {
+                    if shutdown_flag.load(Ordering::SeqCst) {
+                        break;
+                    }
                     logger::log_warn(&format!("✗ Yahoo lookup error for ISIN {}: {}", isin, e)).await;
-                    tickers.push("YAHOO:ERROR".to_string());
                 }
             }
-            } else {
-                logger::log_warn(&format!("Skipping Yahoo lookup for {} ISIN {} - already has Yahoo data", name, isin)).await;
             }
         }
 
-        // Only write if we have ticker data
+        if shutdown_flag.load(Ordering::SeqCst) {
+            break;
+        }
+
         if !isin_tickers_map.is_empty() {
             let company_entry = CompanyCrossPlatformInfo {
                 name: name.clone(),
                 isin_tickers_map,
+                sector,
+                exchange,
             };
 
+            // === APPEND-ONLY: Write single-line JSON with fsync ===
+            // This guarantees the line is either fully written or not at all
             let line = serde_json::to_string(&company_entry)?;
+            log_file.write_all(line.as_bytes()).await?;
+            log_file.write_all(b"\n").await?;
+            log_file.flush().await?;
 
-            file.write_all(line.as_bytes()).await?;
-            file.write_all(b"\n").await?;
+            // Critical: fsync to ensure durability before considering write successful
+            // This prevents data loss on power failure or kernel panic
+            log_file.sync_data().await?;
 
-            // Flush after each write for crash safety
-            file.flush().await?;
+            // Update in-memory state ONLY after successful fsync
+            processed_names.insert(name.clone());
+            existing_companies.insert(name.clone(), company_entry);
 
             count += 1;
+            updates_since_checkpoint += 1;
+
             if is_update {
                 updated_count += 1;
             } else {
                 new_count += 1;
             }
 
+            // === ATOMIC CHECKPOINT: Periodically create checkpoint ===
+            // This reduces recovery time by snapshotting current state
+            if updates_since_checkpoint >= checkpoint_interval {
+                logger::log_info(&format!("Creating checkpoint at {} companies...", count)).await;
+
+                let checkpoint_tmp = companies_path.with_extension("jsonl.tmp");
+                let mut checkpoint_file = tokio::fs::File::create(&checkpoint_tmp).await?;
+
+                // Write all current state to temporary checkpoint file
+                for company in existing_companies.values() {
+                    let line = serde_json::to_string(company)?;
+                    checkpoint_file.write_all(line.as_bytes()).await?;
+                    checkpoint_file.write_all(b"\n").await?;
+                }
+
+                checkpoint_file.flush().await?;
+                checkpoint_file.sync_all().await?;
+                drop(checkpoint_file);
+
+                // Atomic rename - this is the commit point
+                // After this succeeds, the checkpoint is visible
+                tokio::fs::rename(&checkpoint_tmp, &companies_path).await?;
+
+                // Clear log after successful checkpoint
+                // Any entries before this point are now captured in the checkpoint
+                drop(log_file);
+                tokio::fs::remove_file(&log_path).await.ok();
+                log_file = OpenOptions::new()
+                    .create(true)
+                    .append(true)
+                    .open(&log_path)
+                    .await?;
+
+                updates_since_checkpoint = 0;
+                logger::log_info("✓ Checkpoint created and log cleared").await;
+            }
+
             if count % 10 == 0 {
                 logger::log_info(&format!("Progress: {} companies ({} new, {} updated)", count, new_count, updated_count)).await;
                 tokio::task::yield_now().await;
@@ -232,29 +364,39 @@ async fn build_companies_jsonl_streaming(
             }
         }
     }
 
+    if updates_since_checkpoint > 0 {
+        logger::log_info("Creating final checkpoint...").await;
+
+        let checkpoint_tmp = companies_path.with_extension("jsonl.tmp");
+        let mut checkpoint_file = tokio::fs::File::create(&checkpoint_tmp).await?;
+
+        for company in existing_companies.values() {
+            let line = serde_json::to_string(company)?;
+            checkpoint_file.write_all(line.as_bytes()).await?;
+            checkpoint_file.write_all(b"\n").await?;
+        }
+
+        checkpoint_file.flush().await?;
+        checkpoint_file.sync_all().await?;
+        drop(checkpoint_file);
+
+        // Atomic rename makes final checkpoint visible
+        tokio::fs::rename(&checkpoint_tmp, &companies_path).await?;
+
+        // Clean up log
+        drop(log_file);
+        tokio::fs::remove_file(&log_path).await.ok();
+
+        logger::log_info("✓ Final checkpoint created").await;
+    }
 
-    // Ensure all data is written
-    file.sync_all().await?;
-    drop(file);
-
-    // Atomic rename: replace old file with new one
-    tokio::fs::rename(&temp_path, &companies_path).await?;
-
-    logger::log_info(&format!("✓ Completed: {} total companies ({} new, {} updated)", count, new_count, updated_count)).await;
+    logger::log_info(&format!("Completed: {} total companies ({} new, {} updated)", count, new_count, updated_count)).await;
 
     Ok(count)
 }
 
-/// Find most recent FIGI date directory
 async fn find_most_recent_figi_date_dir(paths: &DataPaths) -> anyhow::Result<Option<PathBuf>> {
     let map_cache_dir = paths.cache_gleif_openfigi_map_dir();
 
@@ -284,8 +426,6 @@ async fn find_most_recent_figi_date_dir(
 }
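Reviewer note: one subtlety in the recovery code above is that `lines()` still yields a trailing unterminated line, so the `ends_with('}')` guard is what actually enforces the committed-only-with-`\n` rule for these single-line JSON objects. A self-contained demo of that behavior (std only; the temp-file name is invented):

```rust
use std::fs;
use std::io::Write;

fn demo_partial_line_recovery() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("companies_updates_demo.log");
    let mut f = fs::File::create(&path)?;
    // Two committed lines, then a write that "crashed" before the closing '}' and '\n'.
    f.write_all(b"{\"name\":\"A\"}\n{\"name\":\"B\"}\n{\"name\":\"C")?;
    f.sync_data()?;

    let content = fs::read_to_string(&path)?;
    let committed: Vec<&str> = content
        .lines()
        .filter(|l| !l.trim().is_empty() && l.ends_with('}'))
        .collect();
    assert_eq!(committed, ["{\"name\":\"A\"}", "{\"name\":\"B\"}"]);

    fs::remove_file(&path)
}
```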
diff --git a/src/corporate/yahoo.rs b/src/corporate/yahoo.rs
index fbe1a61..19648ec 100644
--- a/src/corporate/yahoo.rs
+++ b/src/corporate/yahoo.rs
@@ -8,6 +8,7 @@ use tokio::{time::{Duration as TokioDuration, sleep}};
 use std::{sync::Arc};
 use anyhow::{anyhow, Result};
 
+const YAHOO_COMPANY_EXTRACTION_JS: &str = include_str!("yahoo_company_extraction.js");
 
 /// Mapping existing
 /// getting historical stock price data daily (xxxx - 2025) and hourly (last 30 days)
@@ -20,6 +21,16 @@ pub enum YahooTickerResult {
     AmbiguousResults,
 }
 
+#[derive(Debug, Deserialize)]
+pub struct ExtractionResult {
+    status: String,
+    ticker: Option<String>,
+    sector: Option<String>,
+    exchange: Option<String>,
+    #[serde(default)]
+    error_message: Option<String>,
+}
+
 impl YahooTickerResult {
     pub fn to_tagged_string(&self) -> String {
         match self {
@@ -42,69 +53,52 @@ impl YahooTickerResult {
     }
 }
 
-pub async fn scrape_ticker_by_isin(
+pub async fn scrape_company_details_by_isin(
     pool: &Arc<ChromeDriverPool>,
     isin: &str,
-) -> anyhow::Result<YahooTickerResult> {
+) -> anyhow::Result<Option<YahooCompanyDetails>> {
     let isin = isin.to_string();
 
-    pool.execute(format!("https://finance.yahoo.com/lookup?s={}", isin), move |client| {
+    pool.execute(format!("https://finance.yahoo.com/lookup/?s={}", isin), move |client| {
         let isin = isin.clone();
         Box::pin(async move {
             sleep(TokioDuration::from_millis(1000)).await;
             reject_yahoo_cookies(&client).await?;
             sleep(TokioDuration::from_millis(1000)).await;
-            extract_ticker_by_isin(&client, &isin).await
+            extract_company_details(&client, &isin).await
         })
     }).await
 }
 
-pub async fn extract_ticker_by_isin(
+pub async fn extract_company_details(
     client: &Client,
     _isin: &str,
-) -> Result<YahooTickerResult> {
-    //let search_url = format!("https://finance.yahoo.com/lookup?s={}", isin);
-
-    // Check for "No results found" message
-    if client.find(Locator::Css(".noData")).await.is_ok() {
-        return Ok(YahooTickerResult::NoResults);
-    }
-
-    // Wait for results table
-    let table = match client
-        .wait()
-        .for_element(Locator::Css("table[data-test='lookup-table']"))
-        .await
-    {
-        Ok(t) => t,
-        Err(_) => return Ok(YahooTickerResult::NoResults),
-    };
+) -> Result<Option<YahooCompanyDetails>> {
+    // Execute the JavaScript extraction script
+    let result = client.execute(YAHOO_COMPANY_EXTRACTION_JS, vec![]).await?;
 
-    // Find first row
-    let first_row = match table
-        .find(Locator::Css("tbody tr"))
-        .await
-    {
-        Ok(row) => row,
-        Err(_) => return Ok(YahooTickerResult::NoResults),
-    };
+    // Parse the JSON result
+    let extraction: ExtractionResult = serde_json::from_value(result)
+        .map_err(|e| anyhow!("Failed to parse extraction result: {}", e))?;
 
-    // Extract ticker from first cell
-    let ticker_cell = first_row
-        .find(Locator::Css("td:nth-child(1)"))
-        .await
-        .map_err(|e| anyhow!("Failed to find ticker cell: {}", e))?;
-
-    let ticker = ticker_cell
-        .text()
-        .await
-        .map_err(|e| anyhow!("Failed to get ticker text: {}", e))?
-        .trim()
-        .to_string();
-
-    if ticker.is_empty() {
-        Ok(YahooTickerResult::NotFound)
-    } else {
-        Ok(YahooTickerResult::Found(ticker))
+    match extraction.status.as_str() {
+        "found" => {
+            if let Some(ticker) = extraction.ticker {
+                Ok(Some(YahooCompanyDetails {
+                    ticker,
+                    sector: extraction.sector,
+                    exchange: extraction.exchange,
+                }))
+            } else {
+                Ok(None)
+            }
+        },
+        "no_results" => Ok(None),
+        "not_found" => Ok(None),
+        "error" => {
+            let error_msg = extraction.error_message.unwrap_or_else(|| "Unknown error".to_string());
+            Err(anyhow!("JavaScript extraction error: {}", error_msg))
+        },
+        _ => Ok(None),
     }
 }
 
@@ -276,9 +270,9 @@ pub async fn extract_earnings_events(client: &Client, ticker: &str) -> Result
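Reviewer note: `client.execute` hands back a `serde_json::Value`, so the whole browser/Rust boundary reduces to the status-string contract below. A standalone illustration (no WebDriver needed; field values invented):

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct ExtractionResult {
    status: String,
    ticker: Option<String>,
    sector: Option<String>,
    exchange: Option<String>,
    #[serde(default)]
    error_message: Option<String>,
}

fn demo() -> anyhow::Result<()> {
    let found: ExtractionResult = serde_json::from_value(serde_json::json!({
        "status": "found", "ticker": "EXA.DE", "sector": "Technology", "exchange": "GER"
    }))?;
    assert_eq!(found.status, "found");
    assert_eq!(found.ticker.as_deref(), Some("EXA.DE"));

    // error_message is #[serde(default)], so the script may omit it entirely.
    let empty: ExtractionResult = serde_json::from_value(serde_json::json!({
        "status": "no_results", "ticker": null, "sector": null, "exchange": null
    }))?;
    assert!(empty.error_message.is_none());
    Ok(())
}
```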
diff --git a/src/corporate/yahoo_company_extraction.js b/src/corporate/yahoo_company_extraction.js
new file mode 100644
--- /dev/null
+++ b/src/corporate/yahoo_company_extraction.js
+(() => {
+    try {
+        // Find the first row of the lookup results table
+        const firstRow = document.querySelector('table tbody tr');
+        if (!firstRow) {
+            return { status: 'no_results', ticker: null, sector: null, exchange: null };
+        }
+
+        // Extract ticker from column 1 (td:nth-child(1))
+        const tickerCell = firstRow.querySelector('td:nth-child(1)');
+        const ticker = tickerCell ? tickerCell.textContent.trim() : '';
+        if (!ticker) {
+            return { status: 'not_found', ticker: null, sector: null, exchange: null };
+        }
+
+        // Extract sector from column 4 (td:nth-child(4) > span > div > a)
+        const sectorCell = firstRow.querySelector('td:nth-child(4) span div a');
+        let sector = sectorCell ? sectorCell.textContent.trim() : '';
+
+        // Normalize empty/invalid values to null
+        if (!sector || sector === '-' || sector === 'N/A') {
+            sector = null;
+        }
+
+        // Extract exchange from column 6 (td:nth-child(6) > span)
+        const exchangeCell = firstRow.querySelector('td:nth-child(6) span');
+        let exchange = exchangeCell ? exchangeCell.textContent.trim() : '';
+
+        // Normalize empty/invalid values to null
+        if (!exchange || exchange === '-' || exchange === 'N/A') {
+            exchange = null;
+        }
+
+        return {
+            status: 'found',
+            ticker: ticker,
+            sector: sector,
+            exchange: exchange
+        };
+
+    } catch (error) {
+        return {
+            status: 'error',
+            error_message: error.toString(),
+            ticker: null,
+            sector: null,
+            exchange: null
+        };
+    }
+})();
\ No newline at end of file
diff --git a/src/economic/scraper.rs b/src/economic/scraper.rs
index c7c6d0c..dfb0280 100644
--- a/src/economic/scraper.rs
+++ b/src/economic/scraper.rs
@@ -1,5 +1,6 @@
 // src/economic/scraper.rs
 use super::types::{EconomicEvent};
+use event_backtest_engine::logger;
 use fantoccini::Client;
 use tokio::time::{sleep, Duration};
 
@@ -49,6 +50,6 @@ pub async fn extract_events(client: &Client) -> anyhow::Result<Vec<EconomicEvent>>
diff --git a/src/main.rs b/src/main.rs
--- a/src/main.rs
+++ b/src/main.rs
@@ ... @@ async fn main() -> Result<()> {
     cleanup_all_proxy_containers().await.ok();
 
-    // Load configuration from .env
     let config = Config::load().map_err(|err| {
         eprintln!("Failed to load config: {}", err);
         err
     })?;
 
-    // Initialize paths and logger
     let paths = DataPaths::new(".")?;
     logger::init_debug_logger(paths.logs_dir()).await.ok();
 
     logger::log_info("=== Event Backtest Engine Started ===").await;
@@ -35,61 +34,36 @@ async fn main() -> Result<()> {
         config.enable_vpn_rotation
     )).await;
 
-    // === Step 1: Fetch fresh VPNBook credentials and .ovpn files (if rotation enabled) ===
+    // Simple shutdown flag
+    let shutdown_flag = Arc::new(AtomicBool::new(false));
+
+    // === Step 1: Fetch VPNBook configs ===
     let proxy_pool: Option<Arc<DockerVpnProxyPool>> = if config.enable_vpn_rotation {
-        logger::log_info("VPN Rotation Enabled — Fetching latest VPNBook configs").await;
-
-        // We only need 1 Chrome instance to scrape vpnbook.com (no proxy yet)
-        let temp_pool = Arc::new(ChromeDriverPool::new(1).await?);
+        logger::log_info("VPN Rotation Enabled – Fetching latest VPNBook configs").await;
+        let temp_pool = Arc::new(ChromeDriverPool::new_with_proxy_and_task_limit(config.max_parallel_instances, None, config.max_tasks_per_instance).await?);
 
         let (username, password, _files) = opnv::fetch_vpnbook_configs(&temp_pool, paths.cache_dir()).await?;
         logger::log_info(&format!("VPNBook credentials → User: {}", username)).await;
 
-        // Count how many distinct servers (subfolders) we have in cache/openvpn/
         let server_count = std::fs::read_dir(paths.cache_openvpn_dir())?
            .filter(|e| e.as_ref().unwrap().path().is_dir())
            .count();
 
         if server_count == 0 {
-            logger::log_warn("No VPN servers found — continuing without VPN").await;
+            logger::log_warn("No VPN servers found – continuing without VPN").await;
             None
         } else {
-            logger::log_info(&format!("Found {} VPN servers — starting Docker proxy containers", server_count)).await;
-
-            let pp = Arc::new(
-                DockerVpnProxyPool::new(paths.cache_openvpn_dir(), username, password).await?
-            );
-
-            // Verify all proxies are working before proceeding
-            logger::log_info("Verifying all proxy connections...").await;
-            let mut all_working = true;
-            for i in 0..pp.num_proxies() {
-                match pp.test_proxy_connection(i).await {
-                    Ok(ip) => {
-                        logger::log_info(&format!("  Proxy {}: working with IP: {}", i + 1, ip)).await;
-                    }
-                    Err(e) => {
-                        logger::log_error(&format!("  Proxy {}: FAILED - {}", i + 1, e)).await;
-                        all_working = false;
-                    }
-                }
-            }
-
-            if !all_working {
-                logger::log_warn("Some proxies failed, but continuing with working ones...").await;
-            } else {
-                logger::log_info("All proxies verified and ready!").await;
-            }
+            logger::log_info(&format!("Found {} VPN servers – starting Docker proxy containers", server_count)).await;
+            let pp = Arc::new(DockerVpnProxyPool::new(paths.cache_openvpn_dir(), username, password).await?);
 
             logger::log_info(&format!("All {} Docker proxy containers started and ready", pp.num_proxies())).await;
             Some(pp)
         }
     } else {
-        logger::log_info("VPN rotation disabled — using direct connection").await;
+        logger::log_info("VPN rotation disabled – using direct connection").await;
         None
     };
 
-    // === Step 2: Initialize the main ChromeDriver pool (with proxy if enabled) ===
+    // === Step 2: Initialize ChromeDriver pool ===
     let pool_size = config.max_parallel_instances;
     let task_limit = config.max_tasks_per_instance;
 
@@ -105,17 +79,23 @@ async fn main() -> Result<()> {
 
     logger::log_info(&format!("ChromeDriver pool ready with {} instances", pool_size)).await;
 
-    // === Step 3: Graceful Ctrl+C handler ===
+    // === Step 3: Ctrl+C handler ===
     {
+        let shutdown_flag_clone = Arc::clone(&shutdown_flag);
         let pool_clone = Arc::clone(&pool);
         let proxy_clone = proxy_pool.clone();
 
         tokio::spawn(async move {
             tokio::signal::ctrl_c().await.ok();
+            logger::log_info("Ctrl+C received – shutting down gracefully...").await;
+
+            // Set flag first
+            shutdown_flag_clone.store(true, Ordering::SeqCst);
+
+            // Wait a bit for tasks to notice
+            tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
 
-            logger::log_info("Ctrl+C received — shutting down gracefully...").await;
-
-            // Now works: &*pool_clone derefs Arc → &ChromeDriverPool
+            // Cleanup
             if let Err(e) = (&*pool_clone).shutdown().await {
                 logger::log_error(&format!("Error during pool shutdown: {}", e)).await;
             }
@@ -129,31 +109,34 @@ async fn main() -> Result<()> {
             }
 
             let _ = cleanup_all_proxy_containers().await;
-
             std::process::exit(0);
         });
     }
 
-    // === Step 4: Run the actual scraping jobs ===
+    // === Step 4: Run scraping jobs ===
     logger::log_info("--- Starting ECONOMIC data update ---").await;
     economic::run_full_update(&config, &pool).await?;
     logger::log_info("Economic update completed").await;
 
-    logger::log_info("--- Starting CORPORATE data update ---").await;
-    corporate::run_full_update(&config, &pool).await?;
-    logger::log_info("Corporate update completed").await;
-
-    // === Step 5: Final cleanup ===
-    logger::log_info("Shutting down ChromeDriver pool...").await;
-    pool.shutdown().await?;
-
-    if let Some(pp) = proxy_pool {
-        logger::log_info("Stopping Docker VPN proxy containers...").await;
-        pp.shutdown().await?;
-        // CLEANUP ANY LEFTOVER CONTAINERS FROM PREVIOUS RUNS
-        cleanup_all_proxy_containers().await.ok();
+    if !shutdown_flag.load(Ordering::SeqCst) {
+        logger::log_info("--- Starting CORPORATE data update ---").await;
+        corporate::run_full_update(&config, &pool, &shutdown_flag).await?;
+        logger::log_info("Corporate update completed").await;
     }
 
-    logger::log_info("=== Application finished successfully ===").await;
+    // === Step 5: Final cleanup ===
+    if !shutdown_flag.load(Ordering::SeqCst) {
+        logger::log_info("Shutting down ChromeDriver pool...").await;
+        pool.shutdown().await?;
+
+        if let Some(pp) = proxy_pool {
+            logger::log_info("Stopping Docker VPN proxy containers...").await;
+            pp.shutdown().await?;
+            cleanup_all_proxy_containers().await.ok();
+        }
+
+        logger::log_info("=== Application finished successfully ===").await;
+    }
+
     Ok(())
 }
\ No newline at end of file
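Reviewer note: the graceful-shutdown change reduces to one `AtomicBool` shared between the Ctrl+C handler and the step loop; everything durable was already fsynced, so stopping at a step boundary loses nothing. A stripped-down model of the handshake (tokio with the `signal` feature assumed, as in this crate):

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

#[tokio::main]
async fn main() {
    let shutdown = Arc::new(AtomicBool::new(false));

    let flag = Arc::clone(&shutdown);
    tokio::spawn(async move {
        tokio::signal::ctrl_c().await.ok();
        flag.store(true, Ordering::SeqCst); // set first, then let tasks notice
    });

    for _step in 0..100 {
        if shutdown.load(Ordering::SeqCst) {
            break; // stop at a safe boundary; fsynced state is already durable
        }
        // ... one unit of work per iteration ...
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    }
}
```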
diff --git a/src/scraper/docker_vpn_proxy.rs b/src/scraper/docker_vpn_proxy.rs
index e2a4086..4d68cfb 100644
--- a/src/scraper/docker_vpn_proxy.rs
+++ b/src/scraper/docker_vpn_proxy.rs
@@ -106,10 +106,18 @@ impl DockerVpnProxyPool {
                     working_ports.push(port);
                 }
                 Ok(None) => {
-                    crate::util::logger::log_warn(&format!("✓ Container {} on port {} ready but IP detection failed",
-                        container_name, port)).await;
-                    working_containers.push(container_name);
-                    working_ports.push(port);
+                    let logs = Command::new("docker")
+                        .args(["logs", "--tail", "20", &container_name])
+                        .output()
+                        .await
+                        .ok()
+                        .and_then(|output| String::from_utf8_lossy(&output.stdout).to_string().into());
+
+                    crate::util::logger::log_error(&format!("✗ Container {} on port {} ready but IP detection failed. Logs: {:?}",
+                        container_name, port, logs)).await;
+                    failed_count += 1;
+                    // Clean up failed container
+                    let _ = Self::cleanup_container(&container_name).await;
                 }
                 Err(e) => {
                     // Get container logs to debug
@@ -309,25 +317,6 @@ impl DockerVpnProxyPool {
         true
     }
 
-    /// Test if a specific proxy is working
-    pub async fn test_proxy_connection(&self, index: usize) -> Result<String> {
-        let port = self.proxy_ports[index];
-        let proxy_url = format!("socks5://localhost:{}", port);
-
-        let client = reqwest::Client::builder()
-            .proxy(reqwest::Proxy::all(&proxy_url)?)
-            .timeout(Duration::from_secs(10))
-            .build()?;
-
-        let response = client.get("http://checkip.amazonaws.com")
-            .send()
-            .await?
-            .text()
-            .await?;
-
-        Ok(response.trim().to_string())
-    }
-
     pub fn get_proxy_url(&self, index: usize) -> String {
         let port = self.proxy_ports[index % self.proxy_ports.len()];
         format!("socks5://localhost:{}", port)
diff --git a/src/scraper/webdriver.rs b/src/scraper/webdriver.rs
index 0e61902..5180aae 100644
--- a/src/scraper/webdriver.rs
+++ b/src/scraper/webdriver.rs
@@ -19,11 +19,15 @@ pub struct ChromeDriverPool {
     semaphore: Arc<Semaphore>,
     /// Optional Docker-based proxy pool (one proxy per Chrome instance)
     proxy_pool: Option<Arc<DockerVpnProxyPool>>,
+    /// Whether rotation is enabled (uses half of instances at a time)
+    rotation_enabled: bool,
+    /// Index for round-robin instance selection (when rotation is enabled)
+    next_instance: Arc<Mutex<usize>>,
 }
 
 impl ChromeDriverPool {
     /// Creates a new pool without any proxy (direct connection).
-    pub async fn new(pool_size: usize) -> Result<Self> {
+    pub async fn _new(pool_size: usize) -> Result<Self> {
         Self::new_with_proxy_and_task_limit(pool_size, None, 0).await
     }
 
@@ -40,22 +44,53 @@ impl ChromeDriverPool {
         Self::new_with_proxy_and_task_limit(pool_size, proxy_pool, 0).await
     }
 
-    /// Full constructor: supports proxy + task limiting.
+    /// Full constructor: supports proxy + task limiting + rotation.
+    ///
+    /// When rotation is enabled, only half of the instances are used at once,
+    /// rotating to the other half when task limits are reached.
+    ///
+    /// The actual pool_size is constrained by:
+    /// - max_parallel_instances from config (pool_size_limit parameter)
+    /// - Available proxies from proxy_pool (if provided)
+    ///
+    /// Uses the minimum of these constraints to determine actual pool size.
     pub async fn new_with_proxy_and_task_limit(
-        pool_size: usize,
+        pool_size_limit: usize,
         proxy_pool: Option<Arc<DockerVpnProxyPool>>,
         max_tasks_per_instance: usize,
     ) -> Result<Self> {
-        let mut instances = Vec::with_capacity(pool_size);
+        // Determine actual pool size based on available resources
+        let actual_pool_size = if let Some(ref pp) = proxy_pool {
+            let available_proxies = pp.num_proxies();
+            pool_size_limit.min(available_proxies)
+        } else {
+            pool_size_limit
+        };
+
+        if actual_pool_size == 0 {
+            return Err(anyhow!("Pool size must be at least 1"));
+        }
+
+        // Rotation is enabled when task limiting is active
+        let rotation_enabled = max_tasks_per_instance > 0;
+
+        let mut instances = Vec::with_capacity(actual_pool_size);
 
         crate::util::logger::log_info(&format!(
-            "Initializing ChromeDriver pool with {} instances{}...",
-            pool_size,
-            if proxy_pool.is_some() { " (each using a unique Docker SOCKS5 proxy)" } else { "" }
+            "Initializing ChromeDriver pool with {} instances{}{}...",
+            actual_pool_size,
+            if proxy_pool.is_some() { " (each using a unique Docker SOCKS5 proxy)" } else { "" },
+            if rotation_enabled { " with rotation enabled" } else { "" }
         ))
         .await;
 
-        for i in 0..pool_size {
+        if rotation_enabled && actual_pool_size < 2 {
+            crate::util::logger::log_warn(
+                "Rotation enabled but pool has < 2 instances - rotation will be limited"
+            ).await;
+        }
+
+        for i in 0..actual_pool_size {
            let proxy_url = proxy_pool
                .as_ref()
                .map(|pp| pp.get_proxy_url(i));
@@ -68,12 +103,22 @@ impl ChromeDriverPool {
 
         Ok(Self {
             instances,
-            semaphore: Arc::new(Semaphore::new(pool_size)),
+            semaphore: Arc::new(Semaphore::new(actual_pool_size)),
             proxy_pool,
+            rotation_enabled,
+            next_instance: Arc::new(Mutex::new(0)),
         })
     }
 
     /// Execute a scraping task using an available instance from the pool.
+    ///
+    /// When rotation is enabled:
+    /// - Uses only half of the instances at a time
+    /// - Rotates to the other half when an instance reaches its task limit
+    /// - Cycles through instances in round-robin fashion within the active half
+    ///
+    /// When rotation is disabled:
+    /// - Uses all instances with random selection
     pub async fn execute<T, F>(&self, url: String, parse: F) -> Result<T>
     where
         T: Send + 'static,
         F: FnOnce(Client) -> Pin<Box<dyn Future<Output = Result<T>> + Send>> + Send + 'static,
@@ -82,8 +127,81 @@ impl ChromeDriverPool {
     {
         let _permit = self.semaphore.acquire().await.map_err(|_| anyhow!("Pool closed"))?;
 
-        // Round-robin selection
-        let index = rand::random_range(..self.instances.len());
+        let index = if self.rotation_enabled {
+            // Rotation mode: use only half of instances at a time
+            let total_instances = self.instances.len();
+            let half_size = (total_instances + 1) / 2; // Round up for odd numbers
+
+            let mut next_idx = self.next_instance.lock().await;
+            let base_idx = *next_idx;
+            let mut selected_idx = base_idx;
+            let mut found_in_current_half = false;
+
+            // Try to find an available instance in the current half
+            for offset in 0..half_size {
+                let candidate_idx = (base_idx + offset) % half_size;
+
+                // Check if this instance has reached its task limit
+                let instance = &self.instances[candidate_idx];
+                let guard = instance.lock().await;
+
+                if guard.max_tasks_per_instance == 0 ||
+                   guard.task_count < guard.max_tasks_per_instance {
+                    // This instance is available
+                    *next_idx = (candidate_idx + 1) % half_size;
+                    selected_idx = candidate_idx;
+                    found_in_current_half = true;
+                    drop(guard);
+                    break;
+                } else {
+                    drop(guard);
+                }
+            }
+
+            if !found_in_current_half {
+                // All instances in current half are at limit, switch to other half
+                crate::util::logger::log_info(
+                    "Current half saturated, rotating to other half of instances"
+                ).await;
+
+                let other_half_start = half_size;
+                let other_half_size = total_instances - half_size;
+
+                // Find available instance in other half
+                let mut found_in_other_half = false;
+                for offset in 0..other_half_size {
+                    let candidate_idx = other_half_start + offset;
+
+                    let instance = &self.instances[candidate_idx];
+                    let guard = instance.lock().await;
+
+                    if guard.max_tasks_per_instance == 0 ||
+                       guard.task_count < guard.max_tasks_per_instance {
+                        // Switch to this half for future requests
+                        *next_idx = offset;
+                        selected_idx = candidate_idx;
+                        found_in_other_half = true;
+                        drop(guard);
+                        break;
+                    } else {
+                        drop(guard);
+                    }
+                }
+
+                if !found_in_other_half {
+                    // All instances saturated - use round-robin anyway
+                    selected_idx = *next_idx % total_instances;
+                    *next_idx = (*next_idx + 1) % total_instances;
+                }
+            }
+
+            drop(next_idx);
+            selected_idx
+        } else {
+            // Non-rotation mode: random selection as before
+            rand::random_range(..self.instances.len())
+        };
+
         let instance = self.instances[index].clone();
 
         let mut guard = instance.lock().await;
@@ -91,7 +209,8 @@ impl ChromeDriverPool {
 
         if guard.max_tasks_per_instance > 0 {
             crate::util::logger::log_info(&format!(
-                "Instance task count: {}/{}",
+                "Instance {} task count: {}/{}",
+                index,
                 guard.get_task_count(),
                 guard.max_tasks_per_instance
             ))
@@ -130,6 +249,20 @@ impl ChromeDriverPool {
     pub fn get_number_of_instances(&self) -> usize {
         self.instances.len()
     }
+
+    /// Returns whether rotation is enabled
+    pub fn is_rotation_enabled(&self) -> bool {
+        self.rotation_enabled
+    }
+
+    /// Returns the size of each half when rotation is enabled
+    pub fn get_rotation_half_size(&self) -> usize {
+        if self.rotation_enabled {
+            (self.instances.len() + 1) / 2
+        } else {
+            self.instances.len()
+        }
+    }
 }
 
 /// Represents a single instance of chromedriver process, optionally bound to a VPN.
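Reviewer note: the half-rotation policy inside `execute` is easier to sanity-check as a pure function. The sketch below restates the same selection rules over plain task counts; it is a model of the logic above, not the pool's actual code.

```rust
/// Returns (selected index, next round-robin cursor). `limit == 0` means unlimited.
fn select_instance(task_counts: &[usize], limit: usize, next: usize) -> (usize, usize) {
    let total = task_counts.len();
    let half = (total + 1) / 2; // round up for odd pool sizes

    // 1. Prefer the currently active half, round-robin from `next`.
    for offset in 0..half {
        let idx = (next + offset) % half;
        if limit == 0 || task_counts[idx] < limit {
            return (idx, (idx + 1) % half);
        }
    }
    // 2. Active half saturated: fall back to the other half.
    for (offset, idx) in (half..total).enumerate() {
        if limit == 0 || task_counts[idx] < limit {
            return (idx, offset);
        }
    }
    // 3. Everything saturated: plain round-robin over the whole pool.
    (next % total, (next + 1) % total)
}

fn main() {
    // Four instances with a task limit of 2: the first half (0, 1) fills up,
    // then selection rotates into the second half (2, 3).
    assert_eq!(select_instance(&[0, 0, 0, 0], 2, 0), (0, 1));
    assert_eq!(select_instance(&[2, 1, 0, 0], 2, 1), (1, 0));
    assert_eq!(select_instance(&[2, 2, 0, 0], 2, 0), (2, 0));
    println!("selection policy ok");
}
```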