diff --git a/Cargo.lock b/Cargo.lock index 6e09974..12e15ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3454,6 +3454,12 @@ dependencies = [ "serde", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf-8" version = "0.7.6" @@ -3631,6 +3637,7 @@ dependencies = [ "tracing", "tracing-subscriber", "url", + "urlencoding", "walkdir", "yfinance-rs", "zip", diff --git a/Cargo.toml b/Cargo.toml index e5afa63..8ef5d50 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,11 +17,12 @@ categories = ["finance", "data-structures", "asynchronous"] tokio = { version = "1.38", features = ["full"] } # Web scraping & HTTP -reqwest = { version = "0.12", features = ["json", "gzip", "brotli", "deflate", "blocking"] } +reqwest = { version = "0.12", features = ["json", "gzip", "brotli", "deflate", "blocking", "socks", "cookies"] } scraper = "0.19" # HTML parsing for Yahoo earnings pages fantoccini = { version = "0.20", features = ["rustls-tls"] } # Headless Chrome for finanzen.net yfinance-rs = "0.7.2" url = "2.5.7" +urlencoding = "2.1" # Serialization serde = { version = "1.0", features = ["derive"] } diff --git a/src/corporate/update.rs b/src/corporate/update.rs index 3d0fe24..2940ea7 100644 --- a/src/corporate/update.rs +++ b/src/corporate/update.rs @@ -1,15 +1,55 @@ -// src/corporate/update.rs - UPDATED WITH DATA INTEGRITY FIXES -use super::{scraper::*, storage::*, helpers::*, types::*, openfigi::*}; +// src/corporate/update.rs - WITH ABORT-SAFE INCREMENTAL PERSISTENCE +use super::{scraper::*, storage::*, helpers::*, types::*, openfigi::*, atomic_writer::*}; use crate::config::Config; use crate::corporate::update_parallel::build_companies_jsonl_streaming_parallel; use crate::util::directories::DataPaths; use crate::util::logger; use crate::scraper::webdriver::ChromeDriverPool; +use crate::scraper::yahoo::{YahooClientPool, QuoteSummaryModule}; -use chrono::Local; +use std::result::Result::Ok; +use chrono::{Local, Utc}; use std::collections::HashMap; use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use tokio::fs::{File, OpenOptions}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use futures::stream::{FuturesUnordered, StreamExt}; +use serde_json::json; +use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc; +use std::time::Duration; + +/// Result of processing a single company +#[derive(Debug, Clone)] +pub enum CompanyProcessResult { + Valid(CompanyCrossPlatformInfo), + FilteredLowCap { name: String, market_cap: f64 }, + FilteredNoPrice { name: String }, + Failed { company: CompanyCrossPlatformInfo, error: String, is_transient: bool }, +} + +/// Represents a write command to be serialized through the log writer +enum LogCommand { + Write(CompanyCrossPlatformInfo), + Checkpoint, + Shutdown, +} + +/// Result from processing a single company with priority +struct CompanyTaskResult { + company: CompanyCrossPlatformInfo, + result: CompanyProcessResult, +} + +/// Check if a company needs processing (validation check) +fn company_needs_processing( + company: &CompanyCrossPlatformInfo, + existing_companies: &HashMap, +) -> bool { + // If company exists in cleaned output, skip it + !existing_companies.contains_key(&company.name) +} /// Main corporate update entry point with shutdown awareness pub async fn run_full_update( @@ -87,47 +127,46 @@ pub async fn run_full_update( } logger::log_info("Step 6: Cleansing up companies with missing essential data...").await; - let cleansed_count = companies_yahoo_jsonl(&paths).await?; + let cleansed_count = companies_yahoo_cleansed_no_data(&paths).await?; logger::log_info(&format!(" ✓ {} companies found on Yahoo ready for further use in companies_yahoo.jsonl", cleansed_count)).await; + if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn("Shutdown detected after companies.jsonl build").await; + return Ok(()); + } + + logger::log_info("Step 7: Cleansing up companies with too low profile (with abort-safe persistence)...").await; + let proxy_pool = pool.get_proxy_pool() + .ok_or_else(|| anyhow::anyhow!("ChromeDriverPool must be created with VPN proxy rotation enabled"))?; + + let cleansed_count = companies_yahoo_cleansed_low_profile(&paths, _config, proxy_pool, shutdown_flag).await?; + logger::log_info(&format!(" ✓ {} companies with sufficient profile ready for analytics", cleansed_count)).await; + if !shutdown_flag.load(Ordering::SeqCst) { - logger::log_info("Step 7: Processing events (using index)...").await; + logger::log_info("Step 8: Processing events (using index)...").await; let _event_index = build_event_index(&paths).await?; logger::log_info(" ✓ Event index built").await; } else { logger::log_warn("Shutdown detected, skipping event index build").await; } - logger::log_info("✓ Corporate update complete").await; + logger::log_info("✅ Corporate update complete").await; Ok(()) } - /// Cleansing function to remove companies with missing essential yahoo data for integrity -/// Has to contain a ticker with 'YAHOO:'; Entries with 'YAHOO:NO_RESULTS' and 'YAHOO:ERROR' are removed -/// The rest stays unchanged -/// -/// Uses state.jsonl to track completion and avoid re-running the cleansing operation -/// The '.jsonl' will be saved in the same path but 'companies_yahoo.jsonl' -/// Only execute when 'companies.jsonl' is present -pub async fn companies_yahoo_jsonl(paths: &DataPaths) -> anyhow::Result { - use tokio::fs::File; - use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; - use serde_json::json; - +pub async fn companies_yahoo_cleansed_no_data(paths: &DataPaths) -> Result { let data_path = paths.data_dir(); let input_path = data_path.join("companies.jsonl"); let output_path = data_path.join("companies_yahoo.jsonl"); let state_path = data_path.join("state.jsonl"); - // Check if input file exists if !input_path.exists() { logger::log_warn("companies.jsonl not found, skipping cleansing").await; return Ok(0); } - // Check if state file exists and cleansing was already completed if state_path.exists() { let state_content = tokio::fs::read_to_string(&state_path).await?; @@ -137,10 +176,9 @@ pub async fn companies_yahoo_jsonl(paths: &DataPaths) -> anyhow::Result { } if let Ok(state) = serde_json::from_str::(line) { - if state.get("yahoo_companies").and_then(|v| v.as_bool()).unwrap_or(false) { + if state.get("yahoo_companies_cleansed_no_data").and_then(|v| v.as_bool()).unwrap_or(false) { logger::log_info(" Yahoo companies cleansing already completed, reading existing file...").await; - // Count lines in existing output file if output_path.exists() { let output_content = tokio::fs::read_to_string(&output_path).await?; let count = output_content.lines() @@ -185,8 +223,6 @@ pub async fn companies_yahoo_jsonl(paths: &DataPaths) -> anyhow::Result { } }; - // Check if company has at least one valid YAHOO ticker - // Valid means: starts with "YAHOO:" but is NOT "YAHOO:NO_RESULTS" or "YAHOO:ERROR" let has_valid_yahoo = company.isin_tickers_map .values() .flatten() @@ -197,7 +233,6 @@ pub async fn companies_yahoo_jsonl(paths: &DataPaths) -> anyhow::Result { }); if has_valid_yahoo { - // Write the company to the filtered output let json_line = serde_json::to_string(&company)?; output_file.write_all(json_line.as_bytes()).await?; output_file.write_all(b"\n").await?; @@ -205,12 +240,10 @@ pub async fn companies_yahoo_jsonl(paths: &DataPaths) -> anyhow::Result { } else { removed_count += 1; if removed_count <= 5 { - // Log first few removals for debugging logger::log_info(&format!(" Removed company '{}' (no valid Yahoo ticker)", company.name)).await; } } - // Progress indicator for large files if total_count % 1000 == 0 { logger::log_info(&format!(" Processed {} companies...", total_count)).await; } @@ -223,9 +256,8 @@ pub async fn companies_yahoo_jsonl(paths: &DataPaths) -> anyhow::Result { total_count, valid_count, removed_count )).await; - // Write state file to mark completion let yahoo_companies = json!({ - "yahoo_companies": true, + "yahoo_companies_cleansed_no_data": true, "completed_at": chrono::Utc::now().to_rfc3339(), }); @@ -240,6 +272,768 @@ pub async fn companies_yahoo_jsonl(paths: &DataPaths) -> anyhow::Result { Ok(valid_count) } +/// Yahoo Low Profile Cleansing WITH ABORT-SAFE INCREMENTAL PERSISTENCE +/// +/// # Features +/// - ✅ Graceful shutdown (abort-safe) +/// - ✅ Task panic isolation (tasks fail independently) +/// - ✅ Crash-safe persistence (checkpoint + log with fsync) +/// - ✅ Smart skip logic (only process incomplete data) +/// - Uses pending queue instead of retry mechanism +/// - Reuses companies_update.log for persistence +/// +/// # Persistence Strategy +/// - Checkpoint: companies_yahoo_cleaned.jsonl (atomic state) +/// - Log: companies_update.log (append-only updates) +/// - On restart: Load checkpoint + replay log +/// - Periodic checkpoints (every 50 companies) +/// - Batched fsync (every 10 writes or 10 seconds) +pub async fn companies_yahoo_cleansed_low_profile( + paths: &DataPaths, + config: &Config, + proxy_pool: Arc, + shutdown_flag: &Arc, +) -> anyhow::Result { + // Configuration constants + const CHECKPOINT_INTERVAL: usize = 50; + const FSYNC_BATCH_SIZE: usize = 10; + const FSYNC_INTERVAL_SECS: u64 = 10; + const CONCURRENCY_LIMIT: usize = 50; // Limit parallel validation tasks + + let data_path = paths.data_dir(); + + // File paths (reusing companies_update.log) + let input_path = data_path.join("companies_yahoo.jsonl"); + let checkpoint_path = data_path.join("companies_yahoo_cleaned.jsonl"); + let log_path = data_path.join("companies_update.log"); + + // Check input exists + if !input_path.exists() { + logger::log_warn(" companies_yahoo.jsonl not found, skipping low profile cleansing").await; + return Ok(0); + } + + // === RECOVERY PHASE: Load checkpoint + replay log === + let mut existing_companies: HashMap = HashMap::new(); + let mut processed_names: std::collections::HashSet = std::collections::HashSet::new(); + + if checkpoint_path.exists() { + logger::log_info("Loading checkpoint from companies_yahoo_cleaned.jsonl...").await; + let checkpoint_content = tokio::fs::read_to_string(&checkpoint_path).await?; + + for line in checkpoint_content.lines() { + if line.trim().is_empty() || !line.ends_with('}') { + continue; // Skip incomplete lines + } + + match serde_json::from_str::(line) { + Ok(company) => { + processed_names.insert(company.name.clone()); + existing_companies.insert(company.name.clone(), company); + } + Err(e) => { + logger::log_warn(&format!("Skipping invalid checkpoint line: {}", e)).await; + } + } + } + logger::log_info(&format!("Loaded checkpoint with {} companies", existing_companies.len())).await; + } + + if log_path.exists() { + logger::log_info("Replaying update log...").await; + let log_content = tokio::fs::read_to_string(&log_path).await?; + let mut replayed = 0; + + for line in log_content.lines() { + if line.trim().is_empty() || !line.ends_with('}') { + continue; // Skip incomplete lines + } + + match serde_json::from_str::(line) { + Ok(company) => { + processed_names.insert(company.name.clone()); + existing_companies.insert(company.name.clone(), company); + replayed += 1; + } + Err(e) => { + logger::log_warn(&format!("Skipping invalid log line: {}", e)).await; + } + } + } + if replayed > 0 { + logger::log_info(&format!("Replayed {} updates from log", replayed)).await; + } + } + + // === LOAD INPUT COMPANIES === + logger::log_info(&format!("Loading companies from: {:?}", input_path)).await; + let input_companies = load_companies_from_jsonl(&input_path).await?; + logger::log_info(&format!("Loaded {} companies from input", input_companies.len())).await; + + // === BUILD PENDING LIST (smart skip logic) === + let mut pending: Vec = input_companies + .into_iter() + .filter(|company| company_needs_processing(company, &existing_companies)) + .collect(); + + logger::log_info(&format!( + "Initial scan: {} companies need processing ({} already complete)", + pending.len(), + existing_companies.len() + )).await; + + if pending.is_empty() { + logger::log_info(" ✓ All companies already processed").await; + return Ok(existing_companies.len()); + } + + // === SETUP LOG WRITER TASK === + let (write_tx, mut write_rx) = mpsc::channel::(1000); + + let log_file_init = OpenOptions::new() + .create(true) + .append(true) + .open(&log_path) + .await?; + + let checkpoint_path_clone = checkpoint_path.clone(); + let log_path_clone = log_path.clone(); + let existing_companies_writer = Arc::new(tokio::sync::Mutex::new(existing_companies.clone())); + let existing_companies_writer_for_task = Arc::clone(&existing_companies_writer); + + let write_tx_for_writer = write_tx.clone(); + let writer_task = tokio::spawn(async move { + let mut log_file = log_file_init; + let mut writes_since_fsync = 0; + let mut last_fsync = std::time::Instant::now(); + let mut updates_since_checkpoint = 0; + let mut count = 0; + let mut new_count = 0; + let mut updated_count = 0; + + while let Some(cmd) = write_rx.recv().await { + match cmd { + LogCommand::Write(company) => { + // Write to log + let line = serde_json::to_string(&company).unwrap(); + if let Err(e) = log_file.write_all(line.as_bytes()).await { + logger::log_error(&format!("Failed to write to log: {}", e)).await; + break; + } + if let Err(e) = log_file.write_all(b"\n").await { + logger::log_error(&format!("Failed to write newline: {}", e)).await; + break; + } + + writes_since_fsync += 1; + updates_since_checkpoint += 1; + count += 1; + + // Update in-memory state + let mut existing_companies = existing_companies_writer_for_task.lock().await; + let is_update = existing_companies.contains_key(&company.name); + existing_companies.insert(company.name.clone(), company); + drop(existing_companies); + + if is_update { + updated_count += 1; + } else { + new_count += 1; + } + + // Batched + time-based fsync + let should_fsync = writes_since_fsync >= FSYNC_BATCH_SIZE + || last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS; + + if should_fsync { + if let Err(e) = log_file.flush().await { + logger::log_error(&format!("Failed to flush: {}", e)).await; + break; + } + if let Err(e) = log_file.sync_data().await { + logger::log_error(&format!("Failed to fsync: {}", e)).await; + break; + } + writes_since_fsync = 0; + last_fsync = std::time::Instant::now(); + } + } + LogCommand::Checkpoint => { + if let Err(e) = log_file.flush().await { + logger::log_error(&format!("Failed to flush before checkpoint: {}", e)).await; + break; + } + if let Err(e) = log_file.sync_data().await { + logger::log_error(&format!("Failed to fsync before checkpoint: {}", e)).await; + break; + } + + let existing_companies = existing_companies_writer_for_task.lock().await; + let companies_vec: Vec<_> = existing_companies.values().cloned().collect(); + drop(existing_companies); + + let temp_path = checkpoint_path_clone.with_extension("tmp"); + match tokio::fs::File::create(&temp_path).await { + Ok(mut temp_file) => { + let mut checkpoint_ok = true; + for company in &companies_vec { + if let Ok(line) = serde_json::to_string(company) { + if temp_file.write_all(line.as_bytes()).await.is_err() || + temp_file.write_all(b"\n").await.is_err() { + checkpoint_ok = false; + break; + } + } + } + + if checkpoint_ok { + if temp_file.flush().await.is_ok() && + temp_file.sync_data().await.is_ok() { + drop(temp_file); + + if tokio::fs::rename(&temp_path, &checkpoint_path_clone).await.is_ok() { + if tokio::fs::remove_file(&log_path_clone).await.is_ok() { + logger::log_info(&format!( + "✓ Checkpoint created ({} companies), log cleared", + companies_vec.len() + )).await; + + if let Ok(new_log) = OpenOptions::new() + .create(true) + .append(true) + .open(&log_path_clone) + .await { + log_file = new_log; + } + } + } + } + } + } + Err(e) => { + logger::log_error(&format!("Failed to create checkpoint temp file: {}", e)).await; + } + } + updates_since_checkpoint = 0; + } + LogCommand::Shutdown => { + logger::log_info("Writer shutting down...").await; + break; + } + } + + // Periodic checkpoint trigger + if updates_since_checkpoint >= CHECKPOINT_INTERVAL { + let _ = write_tx_for_writer.send(LogCommand::Checkpoint).await; + } + } + + // Final fsync + let _ = log_file.flush().await; + let _ = log_file.sync_data().await; + + logger::log_info(&format!( + "Writer finished: {} total ({} new, {} updated)", + count, new_count, updated_count + )).await; + + (count, new_count, updated_count) + }); + + // === CREATE YAHOO CLIENT POOL === + logger::log_info("Creating YahooClientPool with proxy rotation...").await; + let yahoo_pool = Arc::new(YahooClientPool::new(proxy_pool, config, None).await?); + logger::log_info(&format!("✓ YahooClientPool ready with {} clients", yahoo_pool.num_clients().await)).await; + + // Wrap paths in Arc for safe sharing across tasks + let paths = Arc::new((*paths).clone()); + + // === MAIN PROCESSING LOOP WITH TASK PANIC ISOLATION === + let total = pending.len(); + let mut tasks = FuturesUnordered::new(); + + // Counters + let processed = Arc::new(AtomicUsize::new(0)); + let valid_count = Arc::new(AtomicUsize::new(0)); + let filtered_low_cap = Arc::new(AtomicUsize::new(0)); + let filtered_no_price = Arc::new(AtomicUsize::new(0)); + let failed_count = Arc::new(AtomicUsize::new(0)); + + // Spawn initial batch + for _ in 0..CONCURRENCY_LIMIT.min(pending.len()) { + if let Some(company) = pending.pop() { + spawn_validation_task( + company, + &yahoo_pool, + &paths, + &write_tx, + shutdown_flag, + &processed, + &valid_count, + &filtered_low_cap, + &filtered_no_price, + &failed_count, + total, + &mut tasks, + ); + } + } + + // Process results and spawn new tasks (with task panic isolation) + while let Some(task_result) = tasks.next().await { + // Check for shutdown + if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn("Shutdown signal received, stopping processing").await; + break; + } + + match task_result { + Ok(Ok(Some(_result))) => { + // Success - spawn next task + if let Some(company) = pending.pop() { + spawn_validation_task( + company, + &yahoo_pool, + &paths, + &write_tx, + shutdown_flag, + &processed, + &valid_count, + &filtered_low_cap, + &filtered_no_price, + &failed_count, + total, + &mut tasks, + ); + } + } + Ok(Ok(None)) => { + // Filtered or failed - spawn next task + if let Some(company) = pending.pop() { + spawn_validation_task( + company, + &yahoo_pool, + &paths, + &write_tx, + shutdown_flag, + &processed, + &valid_count, + &filtered_low_cap, + &filtered_no_price, + &failed_count, + total, + &mut tasks, + ); + } + } + Ok(Err(e)) => { + // Processing error + logger::log_error(&format!("Company processing error: {}", e)).await; + + if let Some(company) = pending.pop() { + spawn_validation_task( + company, + &yahoo_pool, + &paths, + &write_tx, + shutdown_flag, + &processed, + &valid_count, + &filtered_low_cap, + &filtered_no_price, + &failed_count, + total, + &mut tasks, + ); + } + } + Err(e) => { + // Task panic (isolated - doesn't crash entire process) + logger::log_error(&format!("Task panic: {}", e)).await; + + if let Some(company) = pending.pop() { + spawn_validation_task( + company, + &yahoo_pool, + &paths, + &write_tx, + shutdown_flag, + &processed, + &valid_count, + &filtered_low_cap, + &filtered_no_price, + &failed_count, + total, + &mut tasks, + ); + } + } + } + } + + logger::log_info("Main processing loop completed").await; + + // Signal writer to finish + let _ = write_tx.send(LogCommand::Checkpoint).await; + let _ = write_tx.send(LogCommand::Shutdown).await; + drop(write_tx); + + // Wait for writer to finish + let (final_count, final_new, final_updated) = writer_task.await + .unwrap_or((0, 0, 0)); + + let final_valid = valid_count.load(Ordering::SeqCst); + let final_filtered_low_cap = filtered_low_cap.load(Ordering::SeqCst); + let final_filtered_no_price = filtered_no_price.load(Ordering::SeqCst); + let final_failed = failed_count.load(Ordering::SeqCst); + + logger::log_info(&format!( + "✅ Completed: {} total companies ({} new, {} updated)", + final_count, final_new, final_updated + )).await; + logger::log_info(&format!( + " Valid: {}, Filtered (low cap): {}, Filtered (no price): {}, Failed: {}", + final_valid, final_filtered_low_cap, final_filtered_no_price, final_failed + )).await; + + // Shutdown Yahoo pool + yahoo_pool.shutdown().await?; + + Ok(final_valid) +} + +/// Helper function to spawn a validation task (reduces code duplication) +fn spawn_validation_task( + company: CompanyCrossPlatformInfo, + yahoo_pool: &Arc, + paths: &Arc, + write_tx: &mpsc::Sender, + shutdown_flag: &Arc, + processed: &Arc, + valid_count: &Arc, + filtered_low_cap: &Arc, + filtered_no_price: &Arc, + failed_count: &Arc, + total: usize, + tasks: &mut FuturesUnordered>>>, +) { + let yahoo_pool_clone = Arc::clone(yahoo_pool); + let paths_clone = Arc::clone(paths); + let shutdown_flag_clone = Arc::clone(shutdown_flag); + let write_tx_clone = write_tx.clone(); + let processed_clone = Arc::clone(processed); + let valid_count_clone = Arc::clone(valid_count); + let filtered_low_cap_clone = Arc::clone(filtered_low_cap); + let filtered_no_price_clone = Arc::clone(filtered_no_price); + let failed_count_clone = Arc::clone(failed_count); + + let task = tokio::spawn(async move { + // Check shutdown at start + if shutdown_flag_clone.load(Ordering::SeqCst) { + return Ok::<_, anyhow::Error>(None); + } + + let result = process_company_with_validation( + &company, + &yahoo_pool_clone, + &*paths_clone, + ).await; + + let task_result = match result { + CompanyProcessResult::Valid(validated_company) => { + // Send to writer + let _ = write_tx_clone.send(LogCommand::Write(validated_company.clone())).await; + valid_count_clone.fetch_add(1, Ordering::SeqCst); + Some(CompanyTaskResult { + company: validated_company.clone(), + result: CompanyProcessResult::Valid(validated_company), + }) + } + CompanyProcessResult::FilteredLowCap { name, market_cap } => { + filtered_low_cap_clone.fetch_add(1, Ordering::SeqCst); + if filtered_low_cap_clone.load(Ordering::SeqCst) <= 10 { + logger::log_info(&format!(" Filtered {} - low market cap: {:.0} EUR", name, market_cap)).await; + } + None + } + CompanyProcessResult::FilteredNoPrice { name } => { + filtered_no_price_clone.fetch_add(1, Ordering::SeqCst); + if filtered_no_price_clone.load(Ordering::SeqCst) <= 10 { + logger::log_info(&format!(" Filtered {} - no recent price data", name)).await; + } + None + } + CompanyProcessResult::Failed { company: failed_company, error, is_transient: _ } => { + failed_count_clone.fetch_add(1, Ordering::SeqCst); + logger::log_warn(&format!(" Failed to process '{}': {}", failed_company.name, error)).await; + None + } + }; + + // Progress reporting + let current = processed_clone.fetch_add(1, Ordering::SeqCst) + 1; + if current % 100 == 0 { + logger::log_info(&format!( + "Progress: {}/{} ({} valid, {} low cap, {} no price, {} failed)", + current, total, + valid_count_clone.load(Ordering::SeqCst), + filtered_low_cap_clone.load(Ordering::SeqCst), + filtered_no_price_clone.load(Ordering::SeqCst), + failed_count_clone.load(Ordering::SeqCst) + )).await; + } + + Ok(task_result) + }); + + tasks.push(task); +} + +/// Process a single company with full error categorization +async fn process_company_with_validation( + company: &CompanyCrossPlatformInfo, + yahoo_pool: &Arc, + paths: &DataPaths, +) -> CompanyProcessResult { + // Extract Yahoo ticker + let ticker = match extract_first_yahoo_ticker(company) { + Some(t) => t, + None => { + return CompanyProcessResult::Failed { + company: company.clone(), + error: "No valid Yahoo ticker found".to_string(), + is_transient: false, // Permanent - no ticker means no data + }; + } + }; + + // Fetch core modules from Yahoo + let summary = match yahoo_pool.get_quote_summary( + &ticker, + &QuoteSummaryModule::core_modules(), + ).await { + Ok(s) => s, + Err(e) => { + let error_msg = e.to_string(); + let is_transient = is_transient_error(&error_msg); + + return CompanyProcessResult::Failed { + company: company.clone(), + error: format!("API error fetching summary: {}", error_msg), + is_transient, + }; + } + }; + + // Validate market cap + let market_cap = extract_market_cap(&summary); + if market_cap < 1_000_000.0 { + return CompanyProcessResult::FilteredLowCap { + name: company.name.clone(), + market_cap, + }; + } + + // Validate recent price activity + let has_recent_price = match check_recent_price_activity(yahoo_pool, &ticker).await { + Ok(has) => has, + Err(e) => { + let error_msg = e.to_string(); + let is_transient = is_transient_error(&error_msg); + + return CompanyProcessResult::Failed { + company: company.clone(), + error: format!("API error fetching price history: {}", error_msg), + is_transient, + }; + } + }; + + if !has_recent_price { + return CompanyProcessResult::FilteredNoPrice { + name: company.name.clone(), + }; + } + + // Save core data + if let Err(e) = save_company_core_data(paths, &company.name, &summary).await { + logger::log_warn(&format!( + " Failed to save core data for {}: {}", + company.name, e + )).await; + } + + CompanyProcessResult::Valid(company.clone()) +} + +/// Determine if an error is transient (should retry) or permanent (skip) +fn is_transient_error(error: &str) -> bool { + let error_lower = error.to_lowercase(); + + // Transient errors (network, rate limiting, timeouts) + let transient_patterns = [ + "timeout", + "timed out", + "connection", + "network", + "rate limit", + "too many requests", + "429", + "503", + "502", + "500", + "temporarily", + "unavailable", + ]; + + for pattern in &transient_patterns { + if error_lower.contains(pattern) { + return true; + } + } + + // Permanent errors (invalid ticker, no data, parsing errors) + let permanent_patterns = [ + "404", + "not found", + "invalid", + "no data", + "parse error", + "400", + "401", + "403", + ]; + + for pattern in &permanent_patterns { + if error_lower.contains(pattern) { + return false; + } + } + + // Default: treat unknown errors as transient (safer to retry) + true +} + +/// Load companies from JSONL file +async fn load_companies_from_jsonl(path: &std::path::Path) -> anyhow::Result> { + let content = tokio::fs::read_to_string(path).await?; + let mut companies = Vec::new(); + + for line in content.lines() { + if line.trim().is_empty() { + continue; + } + if let Ok(company) = serde_json::from_str::(line) { + companies.push(company); + } + } + + Ok(companies) +} + +fn extract_first_yahoo_ticker(company: &CompanyCrossPlatformInfo) -> Option { + for tickers in company.isin_tickers_map.values() { + for ticker in tickers { + if ticker.starts_with("YAHOO:") + && ticker != "YAHOO:NO_RESULTS" + && ticker != "YAHOO:ERROR" + { + return Some(ticker.trim_start_matches("YAHOO:").to_string()); + } + } + } + None +} + +fn extract_market_cap(summary: &crate::scraper::yahoo::QuoteSummary) -> f64 { + let price_module = match summary.modules.get("price") { + Some(m) => m, + None => return 0.0, + }; + + let market_cap_raw = price_module + .get("marketCap") + .and_then(|v| v.get("raw")) + .and_then(|v| v.as_f64()) + .unwrap_or(0.0); + + let currency = price_module + .get("currency") + .and_then(|v| v.as_str()) + .unwrap_or("USD"); + + let market_cap_eur = match currency { + "EUR" => market_cap_raw, + "USD" => market_cap_raw * 0.92, + "GBP" => market_cap_raw * 1.17, + "JPY" => market_cap_raw * 0.0061, + "CHF" => market_cap_raw * 1.05, + _ => market_cap_raw * 0.92, + }; + + market_cap_eur +} + +async fn check_recent_price_activity( + yahoo_pool: &Arc, + ticker: &str, +) -> anyhow::Result { + let now = Utc::now().timestamp(); + let one_year_ago = now - (365 * 24 * 60 * 60); + let sixty_days_ago = now - (60 * 24 * 60 * 60); + + let chart_data = yahoo_pool.get_chart_data( + ticker, + "1d", + sixty_days_ago, + now, + ).await?; + + if chart_data.quotes.is_empty() { + return Ok(false); + } + + let most_recent_timestamp = chart_data.quotes + .iter() + .map(|q| q.timestamp) + .max() + .unwrap_or(0); + + Ok(most_recent_timestamp >= one_year_ago) +} + +async fn save_company_core_data( + paths: &DataPaths, + company_name: &str, + summary: &crate::scraper::yahoo::QuoteSummary, +) -> anyhow::Result<()> { + use tokio::fs; + + let safe_name = company_name + .replace("/", "_") + .replace("\\", "_") + .replace(":", "_") + .replace("*", "_") + .replace("?", "_") + .replace("\"", "_") + .replace("<", "_") + .replace(">", "_") + .replace("|", "_"); + + let company_dir = paths.corporate_dir().join(&safe_name).join("core"); + fs::create_dir_all(&company_dir).await?; + + let data_path = company_dir.join("data.jsonl"); + let json_line = serde_json::to_string(summary)?; + + let mut file = fs::File::create(&data_path).await?; + file.write_all(json_line.as_bytes()).await?; + file.write_all(b"\n").await?; + file.flush().await?; + + Ok(()) +} + async fn find_most_recent_figi_date_dir(paths: &DataPaths) -> anyhow::Result> { let map_cache_dir = paths.cache_gleif_openfigi_map_dir(); diff --git a/src/corporate/yahoo.rs b/src/corporate/yahoo.rs index f34e8dc..fa82531 100644 --- a/src/corporate/yahoo.rs +++ b/src/corporate/yahoo.rs @@ -316,7 +316,7 @@ pub async fn fetch_earnings_with_pool( ticker: &str, ) -> anyhow::Result> { let ticker = ticker.to_string(); - let url = format!("https://finance.yahoo.com/calendar/earnings?symbol={}", ticker); + let url = format!("https://finance.yahoo.com/calendar/earnings?symbol={}&offset=0&size=100", ticker); let ticker_cloned = ticker.clone(); diff --git a/src/lib.rs b/src/lib.rs index 91fe582..d136605 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,3 +15,7 @@ pub use monitoring::{init_monitoring, ConfigSnapshot, MonitoringEvent}; pub use config::Config; pub use scraper::webdriver::{ChromeDriverPool, ChromeInstance, ScrapeTask}; pub use util::logger; +pub use scraper::yahoo::{ + YahooClient, YahooClientPool, QuoteSummaryModule, QuoteSummary, ChartData, + OptionsData, SearchResult +}; diff --git a/src/monitoring/dashboard.html b/src/monitoring/dashboard.html index ad62db4..8704c29 100644 --- a/src/monitoring/dashboard.html +++ b/src/monitoring/dashboard.html @@ -250,6 +250,35 @@ text-transform: uppercase; } + /* Yahoo Stats */ + .yahoo-stats-grid { + display: grid; + grid-template-columns: repeat(auto-fit, minmax(180px, 1fr)); + gap: 12px; + margin-top: 10px; + } + + .yahoo-stat-box { + background: #2a3a4a; + padding: 15px; + border-radius: 5px; + text-align: center; + border-left: 4px solid #FF9800; + } + + .yahoo-stat-value { + font-size: 28px; + font-weight: bold; + color: #FF9800; + margin-bottom: 5px; + } + + .yahoo-stat-label { + font-size: 11px; + color: #aaa; + text-transform: uppercase; + } + /* Logs */ .log-container { max-height: 300px; @@ -339,6 +368,31 @@ .pulse { animation: pulse 2s infinite; } + + /* Yahoo Client Box */ + .yahoo-client-box { + background: #2a3a4a; + border: 2px solid #FF9800; + border-radius: 5px; + padding: 12px; + display: flex; + gap: 0; + overflow: hidden; + } + + .yahoo-client-side { + flex: 1; + padding: 12px; + } + + .yahoo-client-side.left { + background: #3a4a5a; + border-right: 1px solid #555; + } + + .yahoo-client-side.right { + background: #2a3a4a; + } @@ -363,6 +417,13 @@
+ +
+
📈 YAHOO API METRICS
+
+
+
+
📊 GLOBAL METRICS
@@ -432,6 +493,8 @@ updateConfig(state.config); updateInstances(state.instances); updateGlobalStats(state.global); + updateYahooStats(state.global); + updateYahooClients(state.yahoo_clients); updateLogs(state.logs); } @@ -480,6 +543,10 @@ ? ((inst.success_count / inst.total_requests) * 100).toFixed(1) : '0.0'; + const yahooSuccessRate = inst.yahoo_requests > 0 + ? ((inst.yahoo_success / inst.yahoo_requests) * 100).toFixed(1) + : '0.0'; + return `
@@ -511,6 +578,16 @@ ${successRate}%
+
+ Yahoo Requests + ${inst.yahoo_requests} +
+
+ Yahoo Rate + + ${yahooSuccessRate}% + +
Last Activity ${inst.last_activity} @@ -556,6 +633,115 @@ }).join(''); } + function updateYahooStats(global) { + const container = document.getElementById('yahoo-stats'); + const yahooSuccessRate = global.total_yahoo_requests > 0 + ? ((global.successful_yahoo_requests / global.total_yahoo_requests) * 100).toFixed(1) + : '0.0'; + + container.innerHTML = ` +
+
${global.total_yahoo_requests || 0}
+
Total Requests
+
+
+
${yahooSuccessRate}%
+
Success Rate
+
+
+
${global.successful_yahoo_requests || 0}
+
Successful
+
+
+
${global.failed_yahoo_requests || 0}
+
Failed
+
+
+
${global.yahoo_client_count || 0}
+
Active Clients
+
+
+
${global.yahoo_batch_requests || 0}
+
Batch Requests
+
+
+
${global.yahoo_session_renewals || 0}
+
Session Renewals
+
+ `; + } + + function updateYahooClients(yahooClients) { + const container = document.getElementById('yahoo-clients'); + if (!yahooClients || yahooClients.length === 0) { + container.innerHTML = '
No Yahoo clients available
'; + return; + } + + container.innerHTML = yahooClients.map(client => { + const successRate = client.requests_total > 0 + ? ((client.requests_successful / client.requests_total) * 100).toFixed(1) + : '0.0'; + + return ` +
+
+
+ 📊 Yahoo Client #${client.instance_id} + ${client.has_proxy ? '🔗' : '🌐'} +
+
+ Total Requests + ${client.requests_total} +
+
+ Success / Fail + ${client.requests_successful} / ${client.requests_failed} +
+
+ Success Rate + + ${successRate}% + +
+
+ Current / Max + + ${client.current_requests} / ${client.max_requests} + +
+
+ Last Activity + ${client.last_activity} +
+
+
+ ${client.proxy_info ? ` +
🔗 ${client.proxy_info.container_name}
+
+ IP Address + ${client.proxy_info.ip_address} +
+
+ Port + ${client.proxy_info.port} +
+
+ Status + ${client.proxy_info.status} +
+ ` : ` +
+ ${client.has_proxy ? '⚠️' : '🌐'}
+ ${client.has_proxy ? 'Proxy Not Connected' : 'Direct Connection'} +
+ `} +
+
+ `; + }).join(''); + } + function updateGlobalStats(global) { const container = document.getElementById('global-stats'); diff --git a/src/monitoring/events.rs b/src/monitoring/events.rs index a5ee18e..a8b1b26 100644 --- a/src/monitoring/events.rs +++ b/src/monitoring/events.rs @@ -92,6 +92,45 @@ pub enum MonitoringEvent { reason: String, }, + // Yahoo API events + YahooRequestStarted { + instance_id: usize, + endpoint: String, + symbol: Option, + }, + + YahooRequestCompleted { + instance_id: usize, + success: bool, + duration_ms: u64, + error: Option, + }, + + YahooBatchRequestStarted { + count: usize, + symbols: Vec, + endpoint: String, + }, + + YahooBatchRequestCompleted { + successful: usize, + failed: usize, + total: usize, + duration_ms: u64, + }, + + YahooClientCreated { + instance_id: usize, + has_proxy: bool, + max_requests: u32, + }, + + YahooClientReset { + instance_id: usize, + previous_requests: u32, + reason: String, + }, + // Logging LogMessage { level: LogLevel, diff --git a/src/monitoring/metrics.rs b/src/monitoring/metrics.rs index da7afec..cb44134 100644 --- a/src/monitoring/metrics.rs +++ b/src/monitoring/metrics.rs @@ -9,6 +9,7 @@ pub struct DashboardState { pub config: ConfigSnapshot, pub instances: Vec, pub proxies: Vec, + pub yahoo_clients: Vec, pub global: GlobalMetrics, pub logs: Vec, } @@ -38,6 +39,14 @@ pub struct InstanceMetrics { pub failure_count: usize, pub connected_proxy: Option, pub last_activity: String, // Timestamp + pub yahoo_requests: usize, + pub yahoo_success: usize, + pub yahoo_failures: usize, + pub yahoo_success_rate: f64, + pub yahoo_current_requests: u32, + pub yahoo_max_requests: u32, + pub yahoo_last_endpoint: Option, + pub yahoo_last_symbol: Option, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] @@ -75,6 +84,20 @@ pub struct ProxyMetrics { pub instances_using: Vec, } +/// Metrics for a Yahoo client +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct YahooClientMetrics { + pub instance_id: usize, + pub requests_total: usize, + pub requests_successful: usize, + pub requests_failed: usize, + pub current_requests: u32, + pub max_requests: u32, + pub has_proxy: bool, + pub last_activity: String, + pub proxy_info: Option, +} + /// Global pool metrics #[derive(Debug, Clone, Serialize, Deserialize)] pub struct GlobalMetrics { @@ -88,6 +111,13 @@ pub struct GlobalMetrics { pub bot_detection_hits: usize, pub proxy_failures: usize, pub uptime_seconds: u64, + pub total_yahoo_requests: usize, + pub successful_yahoo_requests: usize, + pub failed_yahoo_requests: usize, + pub yahoo_success_rate: f64, + pub yahoo_batch_requests: usize, + pub yahoo_session_renewals: usize, + pub yahoo_client_count: usize, } /// Log entry for display in dashboard @@ -111,6 +141,7 @@ pub enum LogLevel { pub struct MonitoringState { pub instances: HashMap, pub proxies: HashMap, + pub yahoo_clients: HashMap, pub global: GlobalState, pub start_time: Instant, } @@ -128,6 +159,13 @@ pub struct InstanceState { pub failure_count: usize, pub connected_proxy: Option, pub last_activity: Instant, + pub yahoo_requests: usize, + pub yahoo_success: usize, + pub yahoo_failures: usize, + pub yahoo_current_requests: u32, + pub yahoo_max_requests: u32, + pub yahoo_last_endpoint: Option, + pub yahoo_last_symbol: Option, } #[derive(Debug, Clone)] @@ -139,6 +177,19 @@ pub struct ProxyState { pub instances_using: Vec, } +#[derive(Debug, Clone)] +pub struct YahooClientState { + pub instance_id: usize, + pub requests_total: usize, + pub requests_successful: usize, + pub requests_failed: usize, + pub current_requests: u32, + pub max_requests: u32, + pub has_proxy: bool, + pub last_activity: Instant, + pub proxy_info: Option, +} + #[derive(Debug, Clone)] pub struct GlobalState { pub total_requests: usize, @@ -149,6 +200,12 @@ pub struct GlobalState { pub navigation_timeouts: usize, pub bot_detection_hits: usize, pub proxy_failures: usize, + pub total_yahoo_requests: usize, + pub successful_yahoo_requests: usize, + pub failed_yahoo_requests: usize, + pub yahoo_batch_requests: usize, + pub yahoo_session_renewals: usize, + pub yahoo_client_count: usize, } impl MonitoringState { @@ -156,6 +213,7 @@ impl MonitoringState { Self { instances: HashMap::new(), proxies: HashMap::new(), + yahoo_clients: HashMap::new(), global: GlobalState { total_requests: 0, successful_requests: 0, @@ -165,6 +223,12 @@ impl MonitoringState { navigation_timeouts: 0, bot_detection_hits: 0, proxy_failures: 0, + total_yahoo_requests: 0, + successful_yahoo_requests: 0, + failed_yahoo_requests: 0, + yahoo_batch_requests: 0, + yahoo_session_renewals: 0, + yahoo_client_count: 0, }, start_time: Instant::now(), } @@ -175,18 +239,34 @@ impl MonitoringState { let instances: Vec = self .instances .values() - .map(|inst| InstanceMetrics { - id: inst.id, - status: inst.status.clone(), - current_task: inst.current_task.clone(), - tasks_current_session: inst.tasks_current_session, - tasks_max: inst.tasks_max, - session_requests: inst.session_requests, - total_requests: inst.total_requests, - success_count: inst.success_count, - failure_count: inst.failure_count, - connected_proxy: inst.connected_proxy.clone(), - last_activity: format_timestamp(inst.last_activity), + .map(|inst| { + let yahoo_success_rate = if inst.yahoo_success + inst.yahoo_failures > 0 { + (inst.yahoo_success as f64 / (inst.yahoo_success + inst.yahoo_failures) as f64) * 100.0 + } else { + 0.0 + }; + + InstanceMetrics { + id: inst.id, + status: inst.status.clone(), + current_task: inst.current_task.clone(), + tasks_current_session: inst.tasks_current_session, + tasks_max: inst.tasks_max, + session_requests: inst.session_requests, + total_requests: inst.total_requests, + success_count: inst.success_count, + failure_count: inst.failure_count, + connected_proxy: inst.connected_proxy.clone(), + last_activity: format_timestamp(inst.last_activity), + yahoo_requests: inst.yahoo_requests, + yahoo_success: inst.yahoo_success, + yahoo_failures: inst.yahoo_failures, + yahoo_success_rate, + yahoo_current_requests: inst.yahoo_current_requests, + yahoo_max_requests: inst.yahoo_max_requests, + yahoo_last_endpoint: inst.yahoo_last_endpoint.clone(), + yahoo_last_symbol: inst.yahoo_last_symbol.clone(), + } }) .collect(); @@ -202,12 +282,34 @@ impl MonitoringState { }) .collect(); + let yahoo_clients: Vec = self + .yahoo_clients + .values() + .map(|client| YahooClientMetrics { + instance_id: client.instance_id, + requests_total: client.requests_total, + requests_successful: client.requests_successful, + requests_failed: client.requests_failed, + current_requests: client.current_requests, + max_requests: client.max_requests, + has_proxy: client.has_proxy, + last_activity: format_timestamp(client.last_activity), + proxy_info: client.proxy_info.clone(), + }) + .collect(); + let success_rate = if self.global.total_requests > 0 { (self.global.successful_requests as f64 / self.global.total_requests as f64) * 100.0 } else { 0.0 }; + let yahoo_success_rate = if self.global.total_yahoo_requests > 0 { + (self.global.successful_yahoo_requests as f64 / self.global.total_yahoo_requests as f64) * 100.0 + } else { + 0.0 + }; + let global = GlobalMetrics { total_requests: self.global.total_requests, successful_requests: self.global.successful_requests, @@ -219,12 +321,20 @@ impl MonitoringState { bot_detection_hits: self.global.bot_detection_hits, proxy_failures: self.global.proxy_failures, uptime_seconds: self.start_time.elapsed().as_secs(), + total_yahoo_requests: self.global.total_yahoo_requests, + successful_yahoo_requests: self.global.successful_yahoo_requests, + failed_yahoo_requests: self.global.failed_yahoo_requests, + yahoo_success_rate, + yahoo_batch_requests: self.global.yahoo_batch_requests, + yahoo_session_renewals: self.global.yahoo_session_renewals, + yahoo_client_count: self.global.yahoo_client_count, }; DashboardState { config, instances, proxies, + yahoo_clients, global, logs, } @@ -233,7 +343,6 @@ impl MonitoringState { fn format_timestamp(instant: Instant) -> String { use chrono::Local; - // This is a placeholder - in real impl we'd track actual wall-clock time Local::now().format("%H:%M:%S").to_string() } diff --git a/src/monitoring/service.rs b/src/monitoring/service.rs index baf0ae7..d5d3591 100644 --- a/src/monitoring/service.rs +++ b/src/monitoring/service.rs @@ -76,6 +76,13 @@ impl MonitoringService { failure_count: 0, connected_proxy: proxy.clone(), last_activity: Instant::now(), + yahoo_requests: 0, + yahoo_success: 0, + yahoo_failures: 0, + yahoo_current_requests: 0, + yahoo_max_requests: 0, + yahoo_last_endpoint: None, + yahoo_last_symbol: None, }, ); @@ -193,9 +200,9 @@ impl MonitoringService { if let Some(inst) = state.instances.get(&instance_id) { Some(SessionSummary { instance_id, - session_start: "N/A".to_string(), // We'd need to track this + session_start: "N/A".to_string(), session_end: Local::now().format("%Y-%m-%d %H:%M:%S").to_string(), - duration_seconds: 0, // We'd need to track session start time + duration_seconds: 0, total_requests: old_request_count, successful_requests: inst.success_count, failed_requests: inst.failure_count, @@ -283,6 +290,154 @@ impl MonitoringService { self.log_info(format!("Pool rotation triggered: {}", reason)).await; } + // Yahoo API Events + MonitoringEvent::YahooRequestStarted { instance_id, endpoint, symbol } => { + let mut state = self.state.write().await; + + // Update global Yahoo stats + state.global.total_yahoo_requests += 1; + + // Update instance stats + if let Some(inst) = state.instances.get_mut(&instance_id) { + inst.yahoo_requests += 1; + inst.yahoo_current_requests += 1; + inst.yahoo_last_endpoint = Some(endpoint.clone()); + inst.yahoo_last_symbol = symbol.clone(); + inst.last_activity = Instant::now(); + } + + // Update Yahoo client stats + if let Some(client) = state.yahoo_clients.get_mut(&instance_id) { + client.requests_total += 1; + client.current_requests += 1; + client.last_activity = Instant::now(); + } + + self.log_info(format!( + "YahooClient[{}] started request: {} {}", + instance_id, + endpoint, + symbol.unwrap_or_else(|| "search".to_string()) + )).await; + } + + MonitoringEvent::YahooRequestCompleted { instance_id, success, duration_ms, error } => { + let mut state = self.state.write().await; + + // Update global Yahoo stats + if success { + state.global.successful_yahoo_requests += 1; + } else { + state.global.failed_yahoo_requests += 1; + } + + // Update instance stats + if let Some(inst) = state.instances.get_mut(&instance_id) { + inst.yahoo_current_requests = inst.yahoo_current_requests.saturating_sub(1); + if success { + inst.yahoo_success += 1; + } else { + inst.yahoo_failures += 1; + } + inst.last_activity = Instant::now(); + } + + // Update Yahoo client stats + if let Some(client) = state.yahoo_clients.get_mut(&instance_id) { + client.current_requests = client.current_requests.saturating_sub(1); + if success { + client.requests_successful += 1; + } else { + client.requests_failed += 1; + } + client.last_activity = Instant::now(); + } + + if success { + self.log_info(format!( + "YahooClient[{}] completed request in {}ms", + instance_id, duration_ms + )).await; + } else { + self.log_error(format!( + "YahooClient[{}] failed request in {}ms: {}", + instance_id, + duration_ms, + error.unwrap_or_else(|| "unknown error".to_string()) + )).await; + } + } + + MonitoringEvent::YahooBatchRequestStarted { count, symbols, endpoint } => { + let mut state = self.state.write().await; + state.global.yahoo_batch_requests += 1; + + self.log_info(format!( + "Yahoo batch request started: {} symbols, endpoint: {}", + count, endpoint + )).await; + + if !symbols.is_empty() { + self.log_debug(format!( + "Batch symbols: {}", + symbols.join(", ") + )).await; + } + } + + MonitoringEvent::YahooBatchRequestCompleted { successful, failed, total, duration_ms } => { + let success_rate = if total > 0 { + (successful as f64 / total as f64) * 100.0 + } else { + 0.0 + }; + + self.log_info(format!( + "Yahoo batch completed: {}/{} successful ({:.1}%) in {}ms", + successful, total, success_rate, duration_ms + )).await; + } + + MonitoringEvent::YahooClientCreated { instance_id, has_proxy, max_requests } => { + let mut state = self.state.write().await; + state.global.yahoo_client_count += 1; + + state.yahoo_clients.insert( + instance_id, + YahooClientState { + instance_id, + requests_total: 0, + requests_successful: 0, + requests_failed: 0, + current_requests: 0, + max_requests, + has_proxy, + last_activity: Instant::now(), + proxy_info: None, + }, + ); + + self.log_info(format!( + "YahooClient[{}] created (proxy: {}, max requests: {})", + instance_id, has_proxy, max_requests + )).await; + } + + MonitoringEvent::YahooClientReset { instance_id, previous_requests, reason } => { + let mut state = self.state.write().await; + state.global.yahoo_session_renewals += 1; + + if let Some(client) = state.yahoo_clients.get_mut(&instance_id) { + client.current_requests = 0; + client.last_activity = Instant::now(); + } + + self.log_info(format!( + "YahooClient[{}] reset (had {} requests, reason: {})", + instance_id, previous_requests, reason + )).await; + } + MonitoringEvent::LogMessage { level, message } => { match level { crate::monitoring::events::LogLevel::Info => self.log_info(message).await, @@ -317,6 +472,17 @@ impl MonitoringService { }).await; } + async fn log_debug(&self, message: String) { + // Only log debug if DEBUG_LOGGING is enabled + if std::env::var("DEBUG_LOGGING").is_ok() { + self.add_log(LogEntry { + timestamp: Local::now().format("%H:%M:%S").to_string(), + level: super::metrics::LogLevel::Info, + message: format!("[DEBUG] {}", message), + }).await; + } + } + async fn add_log(&self, entry: LogEntry) { let mut logs = self.logs.write().await; if logs.len() >= MAX_LOGS { diff --git a/src/scraper/docker_vpn_proxy.rs b/src/scraper/docker_vpn_proxy.rs index abe9dff..d9b598e 100644 --- a/src/scraper/docker_vpn_proxy.rs +++ b/src/scraper/docker_vpn_proxy.rs @@ -355,7 +355,7 @@ impl DockerVpnProxyPool { pub fn get_proxy_url(&self, index: usize) -> String { let port = self.proxy_ports[index % self.proxy_ports.len()]; - format!("socks5://localhost:{}", port) + format!("socks5h://localhost:{}", port) } pub fn num_proxies(&self) -> usize { diff --git a/src/scraper/mod.rs b/src/scraper/mod.rs index dece4db..4dbad6d 100644 --- a/src/scraper/mod.rs +++ b/src/scraper/mod.rs @@ -1,4 +1,5 @@ pub mod webdriver; pub mod docker_vpn_proxy; pub mod helpers; -pub mod hard_reset; \ No newline at end of file +pub mod hard_reset; +pub mod yahoo; diff --git a/src/scraper/webdriver.rs b/src/scraper/webdriver.rs index f32906d..ca69469 100644 --- a/src/scraper/webdriver.rs +++ b/src/scraper/webdriver.rs @@ -582,6 +582,9 @@ impl ChromeDriverPool { self.instances.len() } } + pub fn get_proxy_pool(&self) -> Option> { + self.proxy_pool.clone() + } } /// Represents a single instance of chromedriver process, optionally bound to a VPN. diff --git a/src/scraper/yahoo.rs b/src/scraper/yahoo.rs new file mode 100644 index 0000000..2756cca --- /dev/null +++ b/src/scraper/yahoo.rs @@ -0,0 +1,1349 @@ +// src/scraper/yahoo.rs - WITH COOKIE AND CRUMB AUTHENTICATION + CLIENT REPLACEMENT +use super::docker_vpn_proxy::DockerVpnProxyPool; +use crate::config::Config; +use crate::util::logger; +use anyhow::{Context, Result, anyhow}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::time::{Duration, Instant}; +use tokio::sync::{Mutex, Semaphore, RwLock}; +use tokio::time::sleep; +use reqwest::{Client, ClientBuilder, retry}; +use reqwest::Proxy as ReqwestProxy; +use std::result::Result::Ok; + +// Yahoo API Endpoints +const YAHOO_QUOTE_SUMMARY: &str = "https://query2.finance.yahoo.com/v10/finance/quoteSummary/"; +const YAHOO_CHART_DATA: &str = "https://query2.finance.yahoo.com/v8/finance/chart/"; +const YAHOO_OPTIONS_DATA: &str = "https://query1.finance.yahoo.com/v7/finance/options/"; +const YAHOO_SEARCH: &str = "https://query2.finance.yahoo.com/v1/finance/search"; +const YAHOO_HOMEPAGE: &str = "https://finance.yahoo.com"; +const YAHOO_CRUMB_URL: &str = "https://query2.finance.yahoo.com/v1/test/getcrumb"; + +// QuoteSummary Modules +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum QuoteSummaryModule { + AssetProfile, + SummaryProfile, + SummaryDetail, + EsgScores, + Price, + IncomeStatementHistory, + IncomeStatementHistoryQuarterly, + BalanceSheetHistory, + BalanceSheetHistoryQuarterly, + CashflowStatementHistory, + CashflowStatementHistoryQuarterly, + DefaultKeyStatistics, + FinancialData, + CalendarEvents, + SecFilings, + RecommendationTrend, + UpgradeDowngradeHistory, + InstitutionOwnership, + FundOwnership, + MajorDirectHolders, + MajorHoldersBreakdown, + InsiderTransactions, + InsiderHolders, + NetSharePurchaseActivity, + Earnings, + EarningsHistory, + EarningsTrend, + IndustryTrend, + IndexTrend, + SectorTrend, +} + +impl QuoteSummaryModule { + fn as_str(&self) -> &'static str { + match self { + Self::AssetProfile => "assetProfile", + Self::SummaryProfile => "summaryProfile", + Self::SummaryDetail => "summaryDetail", + Self::EsgScores => "esgScores", + Self::Price => "price", + Self::IncomeStatementHistory => "incomeStatementHistory", + Self::IncomeStatementHistoryQuarterly => "incomeStatementHistoryQuarterly", + Self::BalanceSheetHistory => "balanceSheetHistory", + Self::BalanceSheetHistoryQuarterly => "balanceSheetHistoryQuarterly", + Self::CashflowStatementHistory => "cashflowStatementHistory", + Self::CashflowStatementHistoryQuarterly => "cashflowStatementHistoryQuarterly", + Self::DefaultKeyStatistics => "defaultKeyStatistics", + Self::FinancialData => "financialData", + Self::CalendarEvents => "calendarEvents", + Self::SecFilings => "secFilings", + Self::RecommendationTrend => "recommendationTrend", + Self::UpgradeDowngradeHistory => "upgradeDowngradeHistory", + Self::InstitutionOwnership => "institutionOwnership", + Self::FundOwnership => "fundOwnership", + Self::MajorDirectHolders => "majorDirectHolders", + Self::MajorHoldersBreakdown => "majorHoldersBreakdown", + Self::InsiderTransactions => "insiderTransactions", + Self::InsiderHolders => "insiderHolders", + Self::NetSharePurchaseActivity => "netSharePurchaseActivity", + Self::Earnings => "earnings", + Self::EarningsHistory => "earningsHistory", + Self::EarningsTrend => "earningsTrend", + Self::IndustryTrend => "industryTrend", + Self::IndexTrend => "indexTrend", + Self::SectorTrend => "sectorTrend", + } + } + + pub fn all_modules() -> Vec { + vec![ + Self::AssetProfile, + Self::SummaryProfile, + Self::SummaryDetail, + Self::EsgScores, + Self::Price, + Self::IncomeStatementHistory, + Self::IncomeStatementHistoryQuarterly, + Self::BalanceSheetHistory, + Self::BalanceSheetHistoryQuarterly, + Self::CashflowStatementHistory, + Self::CashflowStatementHistoryQuarterly, + Self::DefaultKeyStatistics, + Self::FinancialData, + Self::CalendarEvents, + Self::SecFilings, + Self::RecommendationTrend, + Self::UpgradeDowngradeHistory, + Self::InstitutionOwnership, + Self::FundOwnership, + Self::MajorDirectHolders, + Self::MajorHoldersBreakdown, + Self::InsiderTransactions, + Self::InsiderHolders, + Self::NetSharePurchaseActivity, + Self::Earnings, + Self::EarningsHistory, + Self::EarningsTrend, + Self::IndustryTrend, + Self::IndexTrend, + Self::SectorTrend, + ] + } + + pub fn core_modules() -> Vec { + vec![ + Self::Price, + Self::SummaryDetail, + Self::DefaultKeyStatistics, + Self::FinancialData, + ] + } +} + +// Response structures +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct QuoteSummary { + pub symbol: String, + pub modules: HashMap, + pub timestamp: i64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChartData { + pub symbol: String, + pub quotes: Vec, + pub timestamp: i64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Quote { + pub timestamp: i64, + pub open: Option, + pub high: Option, + pub low: Option, + pub close: Option, + pub volume: Option, + pub adjusted_close: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OptionsData { + pub symbol: String, + pub expiration_dates: Vec, + pub strikes: Vec, + pub options: Vec, + pub timestamp: i64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OptionChain { + pub expiration_date: i64, + pub calls: Vec, + pub puts: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OptionContract { + pub strike: f64, + pub last_price: Option, + pub bid: Option, + pub ask: Option, + pub volume: Option, + pub open_interest: Option, + pub implied_volatility: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SearchResult { + pub symbol: String, + pub name: String, + pub exchange: String, + pub asset_type: String, +} + +/// YahooClient - Individual client with unique proxy and cookie/crumb management +pub struct YahooClient { + client: Client, + client_id: usize, + proxy_url: String, + task_count: Arc>, + max_tasks_per_client: usize, + request_count: Arc>, + monitoring: Option, + crumb: Arc>>, // Cached crumb + crumb_last_refresh: Arc>>, // Track when crumb was last refreshed + consecutive_failures: Arc, + max_consecutive_failures: usize, // Default: 3 + is_marked_for_replacement: Arc, +} + +impl YahooClient { + /// Create a new YahooClient with mandatory proxy assignment and cookie/crumb authentication + pub async fn new( + client_id: usize, + proxy_url: String, + max_tasks_per_client: usize, + monitoring: Option, + ) -> Result { + logger::log_info(&format!( + "Creating YahooClient[{}] with proxy: {}", + client_id, proxy_url + )).await; + + // Configure reqwest client with SOCKS5 proxy and cookie support + let proxy = ReqwestProxy::all(&proxy_url) + .context("Failed to create proxy configuration")?; + + let client = ClientBuilder::new() + .proxy(proxy) + .timeout(Duration::from_secs(90)) + .connect_timeout(Duration::from_secs(30)) + .pool_max_idle_per_host(2) + .pool_idle_timeout(Duration::from_secs(60)) + .cookie_store(true) + .user_agent(Self::random_user_agent()) + .default_headers({ + let mut headers = reqwest::header::HeaderMap::new(); + headers.insert("Accept", "application/json, text/plain, */*".parse().unwrap()); + headers.insert("Accept-Language", "en-US,en;q=0.9".parse().unwrap()); + headers.insert("Accept-Encoding", "gzip, deflate, br".parse().unwrap()); + headers.insert("Connection", "keep-alive".parse().unwrap()); + headers.insert("Referer", "https://finance.yahoo.com/".parse().unwrap()); + headers.insert("Origin", "https://finance.yahoo.com".parse().unwrap()); + headers.insert("Sec-Fetch-Dest", "empty".parse().unwrap()); + headers.insert("Sec-Fetch-Mode", "cors".parse().unwrap()); + headers.insert("Sec-Fetch-Site", "same-site".parse().unwrap()); + headers + }) + .build() + .context("Failed to build HTTP client")?; + + let yahoo_client = Self { + client, + client_id, + proxy_url, + task_count: Arc::new(Mutex::new(0)), + max_tasks_per_client, + request_count: Arc::new(Mutex::new(0)), + monitoring, + crumb: Arc::new(Mutex::new(None)), + crumb_last_refresh: Arc::new(Mutex::new(None)), + consecutive_failures: Arc::new(AtomicUsize::new(0)), + max_consecutive_failures: 3, + is_marked_for_replacement: Arc::new(AtomicBool::new(false)), + }; + + // Initialize crumb + yahoo_client.retry_initialize_crumb(&yahoo_client).await?; + + logger::log_info(&format!( + " ✓ YahooClient[{}] initialized (max_tasks: {})", + client_id, max_tasks_per_client + )).await; + + Ok(yahoo_client) + } + + async fn retry_initialize_crumb(&self, client: &YahooClient) -> Result<()> { + let mut last_err = None; + + for attempt in 1..=3 { + match client.initialize_crumb().await { + Ok(()) => return Ok(()), + Err(e) => { + let error_str = e.to_string(); + last_err = Some(e); + + // If it's a permanent error, don't retry + if error_str.contains("Invalid Cookie") && attempt >= 2 { + logger::log_error(&format!( + " YahooClient[{}] permanent cookie error, not retrying", + self.client_id + )).await; + break; + } + + if attempt < 3 { + let delay_ms = 300 * attempt; // Exponential backoff + logger::log_info(&format!( + " YahooClient[{}] crumb attempt {}/3 failed, retrying in {}ms: {}", + self.client_id, attempt, delay_ms, error_str + )).await; + sleep(Duration::from_millis(delay_ms)).await; + } + } + } + } + + Err(last_err.unwrap_or_else(|| anyhow!("Max retries exceeded"))) + } + + // Update the crumb fetching in the initialize_crumb function + async fn initialize_crumb(&self) -> Result<()> { + logger::log_info(&format!( + " YahooClient[{}] fetching cookies and crumb...", + self.client_id + )).await; + + let mut cookies_established = false; + + // Step 1: Visit Yahoo Finance homepage to establish session and get cookies + let homepage_response = self.client + .get(YAHOO_HOMEPAGE) + .header("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8") + .header("Accept-Language", "en-US,en;q=0.9") + .header("Accept-Encoding", "gzip, deflate, br") + .header("Connection", "keep-alive") + .header("Upgrade-Insecure-Requests", "1") + .header("Sec-Fetch-Dest", "document") + .header("Sec-Fetch-Mode", "navigate") + .header("Sec-Fetch-Site", "none") + .header("Sec-Fetch-User", "?1") + .header("Cache-Control", "max-age=0") + .send() + .await + .context("Failed to fetch Yahoo Finance homepage")?; + + if !homepage_response.status().is_success() { + return Err(anyhow!( + "Failed to load Yahoo Finance homepage: HTTP {}", + homepage_response.status() + )); + } + + // Step 2: Now try to fetch the crumb with enhanced headers + let crumb_response = self.client + .get(YAHOO_CRUMB_URL) + .header("Accept", "*/*") + .header("Accept-Language", "en-US,en;q=0.9") + .header("Accept-Encoding", "gzip, deflate, br") + .header("Connection", "keep-alive") + .header("Referer", "https://finance.yahoo.com/") + .header("Sec-Fetch-Dest", "script") + .header("Sec-Fetch-Mode", "no-cors") + .header("Sec-Fetch-Site", "same-site") + .header("Origin", "https://finance.yahoo.com") + .send() + .await; + + match crumb_response { + Ok(response) if response.status().is_success() => { + let crumb_text = response.text().await?; + let crumb = crumb_text.trim().to_string(); + + if !crumb.is_empty() && crumb != "null" && !crumb.contains("