From 17207161442fd4b32f13749502b8f70112941ba4 Mon Sep 17 00:00:00 2001 From: donpat1to Date: Thu, 8 Jan 2026 00:35:10 +0100 Subject: [PATCH] added event enrichment --- src/corporate/mod.rs | 9 +- src/corporate/update.rs | 25 +- src/corporate/update_companies_cleanse.rs | 2 +- src/corporate/update_companies_enrich.rs | 526 +++++++++++++++++++++- src/main.rs | 2 +- src/scraper/yahoo.rs | 325 ++++++++----- 6 files changed, 751 insertions(+), 138 deletions(-) diff --git a/src/corporate/mod.rs b/src/corporate/mod.rs index 38648ea..5a74594 100644 --- a/src/corporate/mod.rs +++ b/src/corporate/mod.rs @@ -2,15 +2,18 @@ pub mod types; pub mod scraper; pub mod storage; -pub mod update; pub mod helpers; pub mod aggregation; pub mod fx; pub mod openfigi; pub mod yahoo; -pub mod update_companies; -pub mod update_companies_cleanse; pub mod page_validation; pub mod atomic_writer; +// Corporate update modules +pub mod update; +pub mod update_companies; +pub mod update_companies_cleanse; +pub mod update_companies_enrich; + pub use update::run_full_update; \ No newline at end of file diff --git a/src/corporate/update.rs b/src/corporate/update.rs index 2fbcb63..68c4de7 100644 --- a/src/corporate/update.rs +++ b/src/corporate/update.rs @@ -3,6 +3,7 @@ use super::{scraper::*, storage::*, openfigi::*}; use crate::config::Config; use crate::corporate::update_companies::build_companies_jsonl_streaming_parallel; use crate::corporate::update_companies_cleanse::{companies_yahoo_cleansed_low_profile, companies_yahoo_cleansed_no_data}; +use crate::corporate::update_companies_enrich::enrich_companies_with_events; use crate::util::directories::DataPaths; use crate::util::logger; use crate::scraper::webdriver::ChromeDriverPool; @@ -87,16 +88,16 @@ pub async fn run_full_update( return Ok(()); } - logger::log_info("Step 6: Cleansing up companies with missing essential data...").await; + logger::log_info("Step 6: Cleansing companies with missing essential data...").await; let cleansed_count = companies_yahoo_cleansed_no_data(&paths).await?; logger::log_info(&format!(" ✓ {} companies found on Yahoo ready for further use in companies_yahoo.jsonl", cleansed_count)).await; if shutdown_flag.load(Ordering::SeqCst) { - logger::log_warn("Shutdown detected after companies.jsonl build").await; + logger::log_warn("Shutdown detected after no-data cleansing").await; return Ok(()); } - logger::log_info("Step 7: Cleansing up companies with too low profile (with abort-safe persistence)...").await; + logger::log_info("Step 7: Cleansing companies with too low profile (with abort-safe persistence)...").await; let proxy_pool = pool.get_proxy_pool() .ok_or_else(|| anyhow::anyhow!("ChromeDriverPool must be created with VPN proxy rotation enabled"))?; @@ -104,11 +105,25 @@ pub async fn run_full_update( let yahoo_pool = Arc::new(YahooClientPool::new(proxy_pool, config, None).await?); logger::log_info(&format!("✓ YahooClientPool ready with {} clients", yahoo_pool.num_clients().await)).await; - let cleansed_count = companies_yahoo_cleansed_low_profile(&paths, config, yahoo_pool, shutdown_flag).await?; + let cleansed_count = companies_yahoo_cleansed_low_profile(&paths, config, yahoo_pool.clone(), shutdown_flag).await?; logger::log_info(&format!(" ✓ {} companies with sufficient profile ready for analytics", cleansed_count)).await; + if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn("Shutdown detected after low-profile cleansing").await; + return Ok(()); + } + + logger::log_info("Step 8: Enriching companies with Yahoo Events (with abort-safe persistence)...").await; + let enriched_count = enrich_companies_with_events(&paths, config, yahoo_pool.clone(), shutdown_flag).await?; + logger::log_info(&format!(" ✓ {} companies enriched with event data", enriched_count)).await; + + if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn("Shutdown detected after event enrichment").await; + return Ok(()); + } + if !shutdown_flag.load(Ordering::SeqCst) { - logger::log_info("Step 8: Processing events (using index)...").await; + logger::log_info("Step 9: Processing events (using index)...").await; let _event_index = build_event_index(&paths).await?; logger::log_info(" ✓ Event index built").await; } else { diff --git a/src/corporate/update_companies_cleanse.rs b/src/corporate/update_companies_cleanse.rs index 6e2afe2..7d6df77 100644 --- a/src/corporate/update_companies_cleanse.rs +++ b/src/corporate/update_companies_cleanse.rs @@ -174,7 +174,7 @@ pub async fn companies_yahoo_cleansed_no_data(paths: &DataPaths) -> Result, shutdown_flag: &Arc, ) -> anyhow::Result { diff --git a/src/corporate/update_companies_enrich.rs b/src/corporate/update_companies_enrich.rs index 884f7b7..9675de0 100644 --- a/src/corporate/update_companies_enrich.rs +++ b/src/corporate/update_companies_enrich.rs @@ -1,22 +1,22 @@ -// src/corporate/update_companies_enrich.rs -use super::{helpers::*, types::*}; +// src/corporate/update_companies_enrich_events.rs +use super::{types::*}; use crate::config::Config; use crate::util::directories::DataPaths; use crate::util::logger; use crate::scraper::yahoo::{YahooClientPool, QuoteSummaryModule}; use std::result::Result::Ok; -use chrono::{Local, Utc}; -use std::collections::HashMap; +use chrono::Utc; +use std::collections::{HashSet}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use tokio::fs::{File, OpenOptions}; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::fs::{OpenOptions}; +use tokio::io::{AsyncWriteExt}; use futures::stream::{FuturesUnordered, StreamExt}; use serde_json::json; use tokio::sync::mpsc; -/// Yahoo enriching data per corporate +/// Yahoo Event enrichment per corporate company /// /// # Features /// - Graceful shutdown (abort-safe) @@ -28,7 +28,515 @@ use tokio::sync::mpsc; /// /// # Persistence Strategy /// - Checkpoint: companies_yahoo_cleaned.jsonl (atomic state) -/// - Log: companies_update.log (append-only updates) +/// - Log: companies_events_updates.log (append-only updates) /// - On restart: Load checkpoint + replay log /// - Periodic checkpoints (every 50 companies) -/// - Batched fsync (every 10 writes or 10 seconds) \ No newline at end of file +/// - Batched fsync (every 10 writes or 10 seconds) +pub async fn enrich_companies_with_events( + paths: &DataPaths, + _config: &Config, + yahoo_pool: Arc, + shutdown_flag: &Arc, +) -> anyhow::Result { + // Configuration constants + const CHECKPOINT_INTERVAL: usize = 50; + const FSYNC_BATCH_SIZE: usize = 10; + const FSYNC_INTERVAL_SECS: u64 = 10; + const CONCURRENCY_LIMIT: usize = 50; // Limit parallel enrichment tasks + + let data_path = paths.data_dir(); + + // File paths + let input_path = data_path.join("companies_yahoo_cleaned.jsonl"); + let log_path = data_path.join("companies_events_updates.log"); + let state_path = data_path.join("state.jsonl"); + + // Check input exists + if !input_path.exists() { + logger::log_warn(" companies_yahoo_cleaned.jsonl not found, skipping event enrichment").await; + return Ok(0); + } + + // Check if already completed + if state_path.exists() { + let state_content = tokio::fs::read_to_string(&state_path).await?; + + for line in state_content.lines() { + if line.trim().is_empty() { + continue; + } + + if let Ok(state) = serde_json::from_str::(line) { + if state.get("yahoo_events_enrichment_complete").and_then(|v| v.as_bool()).unwrap_or(false) { + logger::log_info(" Yahoo events enrichment already completed").await; + + // Count enriched companies + let count = count_enriched_companies(paths).await?; + logger::log_info(&format!(" ✓ Found {} companies with event data", count)).await; + return Ok(count); + } + } + } + } + + // === RECOVERY PHASE: Track enriched companies === + let mut enriched_companies: HashSet = HashSet::new(); + + if log_path.exists() { + logger::log_info("Loading enrichment progress from log...").await; + let log_content = tokio::fs::read_to_string(&log_path).await?; + + for line in log_content.lines() { + if line.trim().is_empty() || !line.ends_with('}') { + continue; // Skip incomplete lines + } + + match serde_json::from_str::(line) { + Ok(entry) => { + if let Some(name) = entry.get("company_name").and_then(|v| v.as_str()) { + if entry.get("status").and_then(|v| v.as_str()) == Some("enriched") { + enriched_companies.insert(name.to_string()); + } + } + } + Err(e) => { + logger::log_warn(&format!("Skipping invalid log line: {}", e)).await; + } + } + } + logger::log_info(&format!("Loaded {} enriched companies from log", enriched_companies.len())).await; + } + + // Load all companies from input + logger::log_info("Loading companies from companies_yahoo_cleaned.jsonl...").await; + let companies = load_companies_from_jsonl(&input_path).await?; + let total_companies = companies.len(); + logger::log_info(&format!("Found {} companies to process", total_companies)).await; + + // Filter companies that need enrichment + let pending_companies: Vec = companies + .into_iter() + .filter(|company| !enriched_companies.contains(&company.name)) + .collect(); + + let pending_count = pending_companies.len(); + logger::log_info(&format!( + " {} already enriched, {} pending", + enriched_companies.len(), + pending_count + )).await; + + if pending_count == 0 { + logger::log_info(" ✓ All companies already enriched").await; + mark_enrichment_complete(&state_path).await?; + return Ok(enriched_companies.len()); + } + + // === PROCESSING PHASE: Enrich companies with events === + + // Shared counters + let processed_count = Arc::new(AtomicUsize::new(enriched_companies.len())); + let success_count = Arc::new(AtomicUsize::new(enriched_companies.len())); + let failed_count = Arc::new(AtomicUsize::new(0)); + + // Log writer channel with batching and fsync + let (log_tx, mut log_rx) = mpsc::channel::(1000); + + // Spawn log writer task + let log_writer_handle = { + let log_path = log_path.clone(); + let processed_count = Arc::clone(&processed_count); + let total_companies = total_companies; + + tokio::spawn(async move { + let mut log_file = OpenOptions::new() + .create(true) + .append(true) + .open(&log_path) + .await + .expect("Failed to open log file"); + + let mut write_count = 0; + let mut last_fsync = tokio::time::Instant::now(); + + while let Some(cmd) = log_rx.recv().await { + match cmd { + LogCommand::Write(entry) => { + let json_line = serde_json::to_string(&entry).expect("Serialization failed"); + log_file.write_all(json_line.as_bytes()).await.expect("Write failed"); + log_file.write_all(b"\n").await.expect("Write failed"); + + write_count += 1; + + // Batched fsync + if write_count >= FSYNC_BATCH_SIZE + || last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS + { + log_file.flush().await.expect("Flush failed"); + log_file.sync_all().await.expect("Fsync failed"); + write_count = 0; + last_fsync = tokio::time::Instant::now(); + } + } + LogCommand::Checkpoint => { + // Force fsync on checkpoint + log_file.flush().await.expect("Flush failed"); + log_file.sync_all().await.expect("Fsync failed"); + write_count = 0; + last_fsync = tokio::time::Instant::now(); + + let current = processed_count.load(Ordering::SeqCst); + logger::log_info(&format!( + " Checkpoint: {}/{} companies processed", + current, total_companies + )).await; + } + LogCommand::Shutdown => { + // Final fsync before shutdown + log_file.flush().await.expect("Flush failed"); + log_file.sync_all().await.expect("Fsync failed"); + break; + } + } + } + }) + }; + + // Process companies concurrently with task panic isolation + let mut tasks = FuturesUnordered::new(); + let mut pending_iter = pending_companies.into_iter(); + let semaphore = Arc::new(tokio::sync::Semaphore::new(CONCURRENCY_LIMIT)); + + // Initial batch of tasks + for _ in 0..CONCURRENCY_LIMIT.min(pending_count) { + if let Some(company) = pending_iter.next() { + let task = spawn_enrichment_task( + company, + Arc::clone(&yahoo_pool), + paths.clone(), + Arc::clone(&processed_count), + Arc::clone(&success_count), + Arc::clone(&failed_count), + log_tx.clone(), + Arc::clone(&semaphore), + Arc::clone(shutdown_flag), + ); + tasks.push(task); + } + } + + // Process results and spawn new tasks + let mut checkpoint_counter = enriched_companies.len(); + + while let Some(result) = tasks.next().await { + // Handle task result (even if panicked) + match result { + Ok(_) => { + // Task completed successfully + } + Err(e) => { + logger::log_warn(&format!("Task panicked: {}", e)).await; + failed_count.fetch_add(1, Ordering::SeqCst); + } + } + + // Check for shutdown + if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn("Shutdown detected, stopping new enrichment tasks...").await; + break; + } + + // Spawn next task + if let Some(company) = pending_iter.next() { + let task = spawn_enrichment_task( + company, + Arc::clone(&yahoo_pool), + paths.clone(), + Arc::clone(&processed_count), + Arc::clone(&success_count), + Arc::clone(&failed_count), + log_tx.clone(), + Arc::clone(&semaphore), + Arc::clone(shutdown_flag), + ); + tasks.push(task); + } + + // Periodic checkpoint + checkpoint_counter += 1; + if checkpoint_counter % CHECKPOINT_INTERVAL == 0 { + let _ = log_tx.send(LogCommand::Checkpoint).await; + } + } + + // Shutdown log writer + let _ = log_tx.send(LogCommand::Shutdown).await; + drop(log_tx); + + // Wait for log writer to finish + let _ = log_writer_handle.await; + + let final_processed = processed_count.load(Ordering::SeqCst); + let final_success = success_count.load(Ordering::SeqCst); + let final_failed = failed_count.load(Ordering::SeqCst); + + logger::log_info(&format!( + " Event enrichment summary: {} total, {} success, {} failed", + final_processed, final_success, final_failed + )).await; + + // Mark as complete if all companies processed + if final_processed >= total_companies && !shutdown_flag.load(Ordering::SeqCst) { + mark_enrichment_complete(&state_path).await?; + logger::log_info(" ✓ Event enrichment marked as complete").await; + } + + Ok(final_success) +} + +/// Spawn a single enrichment task with panic isolation +fn spawn_enrichment_task( + company: CompanyCrossPlatformInfo, + yahoo_pool: Arc, + paths: DataPaths, + processed_count: Arc, + success_count: Arc, + failed_count: Arc, + log_tx: mpsc::Sender, + semaphore: Arc, + shutdown_flag: Arc, +) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + // Acquire semaphore permit + let _permit = semaphore.acquire().await.expect("Semaphore closed"); + + // Check shutdown before processing + if shutdown_flag.load(Ordering::SeqCst) { + return; + } + + // Process company + let result = enrich_company_with_events( + &company, + &yahoo_pool, + &paths, + ).await; + + // Update counters + processed_count.fetch_add(1, Ordering::SeqCst); + + let status = match result { + Ok(_) => { + success_count.fetch_add(1, Ordering::SeqCst); + "enriched" + } + Err(e) => { + failed_count.fetch_add(1, Ordering::SeqCst); + logger::log_warn(&format!( + " Failed to enrich {}: {}", + company.name, e + )).await; + "failed" + } + }; + + // Log result + let log_entry = json!({ + "company_name": company.name, + "status": status, + "timestamp": Utc::now().to_rfc3339(), + }); + + let _ = log_tx.send(LogCommand::Write(log_entry)).await; + }) +} + +/// Enrich a single company with event data +async fn enrich_company_with_events( + company: &CompanyCrossPlatformInfo, + yahoo_pool: &Arc, + paths: &DataPaths, +) -> anyhow::Result<()> { + use std::collections::HashMap; + + let ticker = match extract_first_yahoo_ticker(company) { + Some(t) => t, + None => { + return Err(anyhow::anyhow!("No valid Yahoo ticker found")); + } + }; + + // Combined summary to accumulate data from all available modules + let mut combined_modules: HashMap = HashMap::new(); + let timestamp = chrono::Utc::now().timestamp(); + + // Try each event module individually + let event_modules = QuoteSummaryModule::event_modules(); + + for module in event_modules { + match yahoo_pool.get_quote_summary(&ticker, &[module]).await { + Ok(summary) => { + // Merge this module's data into combined summary + for (key, value) in summary.modules { + combined_modules.insert(key, value); + } + } + Err(e) => { + // Module not available - silently continue for expected errors + let err_str = e.to_string(); + if err_str.contains("500") || err_str.contains("404") || err_str.contains("Not Found") { + // Expected for securities without this data - continue silently + continue; + } else { + // Unexpected error - log but continue trying other modules + logger::log_warn(&format!( + " Unexpected error fetching event module for {}: {}", + ticker, e + )).await; + } + } + } + } + + // Only save if we got at least some data + if combined_modules.is_empty() { + return Err(anyhow::anyhow!("No event data available for any module")); + } + + // Create combined summary with all available modules + let combined_summary = crate::scraper::yahoo::QuoteSummary { + symbol: ticker.clone(), + modules: combined_modules, + timestamp, + }; + + // Save the combined event data + save_company_event_data(paths, &company.name, &combined_summary).await?; + + Ok(()) +} + +/// Save event data to company directory +async fn save_company_event_data( + paths: &DataPaths, + company_name: &str, + summary: &crate::scraper::yahoo::QuoteSummary, +) -> anyhow::Result<()> { + use tokio::fs; + + let safe_name = sanitize_company_name(company_name); + + let company_dir = paths.corporate_dir().join(&safe_name).join("events"); + fs::create_dir_all(&company_dir).await?; + + let data_path = company_dir.join("data.jsonl"); + let json_line = serde_json::to_string(summary)?; + + let mut file = fs::File::create(&data_path).await?; + file.write_all(json_line.as_bytes()).await?; + file.write_all(b"\n").await?; + file.flush().await?; + file.sync_all().await?; // Ensure data is persisted + + Ok(()) +} + +/// Extract first valid Yahoo ticker from company +fn extract_first_yahoo_ticker(company: &CompanyCrossPlatformInfo) -> Option { + for tickers in company.isin_tickers_map.values() { + for ticker in tickers { + if ticker.starts_with("YAHOO:") + && ticker != "YAHOO:NO_RESULTS" + && ticker != "YAHOO:ERROR" + { + return Some(ticker.trim_start_matches("YAHOO:").to_string()); + } + } + } + None +} + +/// Sanitize company name for file system +fn sanitize_company_name(name: &str) -> String { + name.replace("/", "_") + .replace("\\", "_") + .replace(":", "_") + .replace("*", "_") + .replace("?", "_") + .replace("\"", "_") + .replace("<", "_") + .replace(">", "_") + .replace("|", "_") +} + +/// Load companies from JSONL file +async fn load_companies_from_jsonl(path: &std::path::Path) -> anyhow::Result> { + let content = tokio::fs::read_to_string(path).await?; + let mut companies = Vec::new(); + + for line in content.lines() { + if line.trim().is_empty() { + continue; + } + if let Ok(company) = serde_json::from_str::(line) { + companies.push(company); + } + } + + Ok(companies) +} + +/// Count enriched companies (companies with event data) +async fn count_enriched_companies(paths: &DataPaths) -> anyhow::Result { + let corporate_dir = paths.corporate_dir(); + + if !corporate_dir.exists() { + return Ok(0); + } + + let mut count = 0; + let mut entries = tokio::fs::read_dir(&corporate_dir).await?; + + while let Some(entry) = entries.next_entry().await? { + let path = entry.path(); + if path.is_dir() { + let events_dir = path.join("events"); + let events_file = events_dir.join("data.jsonl"); + + if events_file.exists() { + count += 1; + } + } + } + + Ok(count) +} + +/// Mark enrichment as complete in state file +async fn mark_enrichment_complete(state_path: &std::path::Path) -> anyhow::Result<()> { + let enrichment_complete = json!({ + "yahoo_events_enrichment_complete": true, + "completed_at": Utc::now().to_rfc3339(), + }); + + let mut state_file = OpenOptions::new() + .create(true) + .append(true) + .open(state_path) + .await?; + + let state_line = serde_json::to_string(&enrichment_complete)?; + state_file.write_all(state_line.as_bytes()).await?; + state_file.write_all(b"\n").await?; + state_file.flush().await?; + state_file.sync_all().await?; + + Ok(()) +} + +/// Log command enum +enum LogCommand { + Write(serde_json::Value), + Checkpoint, + Shutdown, +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index a52cb96..4dee183 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ // src/main.rs - FIXED: Proper temp pool cleanup -use web_scraper::{*, scraper, economic, corporate}; +use web_scraper::{*, scraper, corporate}; use anyhow::Result; use web_scraper::config::Config; diff --git a/src/scraper/yahoo.rs b/src/scraper/yahoo.rs index 5b790ea..8f7cf0f 100644 --- a/src/scraper/yahoo.rs +++ b/src/scraper/yahoo.rs @@ -10,7 +10,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::time::{Duration, Instant}; use tokio::sync::{Mutex, Semaphore, RwLock}; -use tokio::time::sleep; +use tokio::time::{sleep, timeout}; use reqwest::{Client, ClientBuilder}; use reqwest::Proxy as ReqwestProxy; use std::result::Result::Ok; @@ -154,9 +154,11 @@ impl QuoteSummaryModule { pub fn event_modules() -> Vec { vec![ - Self::FinancialData, Self::CalendarEvents, Self::SecFilings, + Self::Earnings, + Self::EarningsHistory, + Self::UpgradeDowngradeHistory, ] } @@ -277,8 +279,8 @@ impl YahooClient { let client = ClientBuilder::new() .proxy(proxy) - .timeout(Duration::from_secs(90)) - .connect_timeout(Duration::from_secs(30)) + .timeout(Duration::from_secs(30)) // CHANGED: Reduced from 90s to 30s + .connect_timeout(Duration::from_secs(10)) // CHANGED: Reduced from 30s to 10s .pool_max_idle_per_host(2) .pool_idle_timeout(Duration::from_secs(60)) .cookie_store(true) @@ -314,49 +316,85 @@ impl YahooClient { is_marked_for_replacement: Arc::new(AtomicBool::new(false)), }; - // Initialize crumb - yahoo_client.retry_initialize_crumb(&yahoo_client).await?; - + // NEW: Quick proxy health check before crumb initialization logger::log_info(&format!( - " ✓ YahooClient[{}] initialized (max_tasks: {})", - client_id, max_tasks_per_client + " YahooClient[{}] testing proxy connectivity...", + client_id )).await; - - Ok(yahoo_client) - } - - async fn retry_initialize_crumb(&self, client: &YahooClient) -> Result<()> { - let mut last_err = None; - for attempt in 1..=3 { - match client.initialize_crumb().await { - Ok(()) => return Ok(()), - Err(e) => { - let error_str = e.to_string(); - last_err = Some(e); - - // If it's a permanent error, don't retry - if error_str.contains("Invalid Cookie") && attempt >= 2 { - logger::log_error(&format!( - " YahooClient[{}] permanent cookie error, not retrying", - self.client_id - )).await; - break; - } - - if attempt < 3 { - let delay_ms = 300 * attempt; // Exponential backoff - logger::log_info(&format!( - " YahooClient[{}] crumb attempt {}/3 failed, retrying in {}ms: {}", - self.client_id, attempt, delay_ms, error_str - )).await; - sleep(Duration::from_millis(delay_ms)).await; - } - } + match timeout( + Duration::from_secs(5), + yahoo_client.client.get("https://finance.yahoo.com").send() + ).await { + Ok(Ok(_)) => { + logger::log_info(&format!( + " ✓ YahooClient[{}] proxy is responsive", + client_id + )).await; + } + Ok(Err(e)) => { + return Err(anyhow!( + "Proxy connection failed for YahooClient[{}]: {}", + client_id, e + )); + } + Err(_) => { + return Err(anyhow!( + "Proxy connection timeout for YahooClient[{}]", + client_id + )); + } + } + + // Initialize crumb with timeout wrapper + match timeout( + Duration::from_secs(30), // NEW: 30 second timeout for entire initialization + yahoo_client.try_initialize_crumb(&yahoo_client) + ).await { + Ok(Ok(())) => { + logger::log_info(&format!( + " ✓ YahooClient[{}] initialized (max_tasks: {})", + client_id, max_tasks_per_client + )).await; + Ok(yahoo_client) + } + Ok(Err(e)) => { + Err(anyhow!( + "Failed to initialize YahooClient[{}]: {}", + client_id, e + )) + } + Err(_) => { + Err(anyhow!( + "Timeout initializing YahooClient[{}] (>30s)", + client_id + )) + } + } + } + + async fn try_initialize_crumb(&self, client: &YahooClient) -> Result<()> { + match client.initialize_crumb().await { + Ok(()) => return Ok(()), + Err(e) => { + let error_str: String = e.to_string(); + + // If it's a permanent error, don't retry + if error_str.contains("Invalid Cookie") { + logger::log_error(&format!( + " YahooClient[{}] permanent cookie error, not retrying", + self.client_id + )).await; + } + + logger::log_info(&format!( + " YahooClient[{}] crumb attempt failed {}", + self.client_id, error_str + )).await; + + Err(anyhow!("Failed fetching crumb: {}", error_str)) } } - - Err(last_err.unwrap_or_else(|| anyhow!("Max retries exceeded"))) } // Update the crumb fetching in the initialize_crumb function @@ -366,15 +404,7 @@ impl YahooClient { self.client_id )).await; - // Step 1: Make multiple requests to establish a proper session - let mut cookies_established = false; - - logger::log_info(&format!( - " YahooClient[{}] Session establishment", - self.client_id - )).await; - - // Try different Yahoo domains to get valid cookies + // Step 1: Establish session with timeout let yahoo_domains = [ "https://finance.yahoo.com", "https://www.yahoo.com", @@ -382,41 +412,75 @@ impl YahooClient { ]; for domain in yahoo_domains.iter() { - let _ = self.client - .get(*domain) - .header("User-Agent", Self::random_user_agent()) - .header("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8") - .header("Accept-Language", "en-US,en;q=0.9") - .header("Accept-Encoding", "gzip, deflate") - .header("DNT", "1") - .header("Connection", "keep-alive") - .header("Upgrade-Insecure-Requests", "1") - .send() - .await - .ok(); + // NEW: Add timeout and better error handling + match timeout( + Duration::from_secs(5), + self.client + .get(*domain) + .header("User-Agent", Self::random_user_agent()) + .header("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8") + .header("Accept-Language", "en-US,en;q=0.9") + .header("Accept-Encoding", "gzip, deflate") + .header("DNT", "1") + .header("Connection", "keep-alive") + .header("Upgrade-Insecure-Requests", "1") + .send() + ).await { + Ok(Ok(response)) if response.status().is_success() => { + logger::log_info(&format!( + " ✓ YahooClient[{}] session established via {}", + self.client_id, domain + )).await; + } + Ok(Ok(response)) => { + logger::log_warn(&format!( + " ⚠ YahooClient[{}] session request to {} returned {}", + self.client_id, domain, response.status() + )).await; + } + Ok(Err(e)) => { + logger::log_warn(&format!( + " ⚠ YahooClient[{}] failed to connect to {}: {}", + self.client_id, domain, e + )).await; + } + Err(_) => { + logger::log_warn(&format!( + " ⚠ YahooClient[{}] timeout connecting to {}", + self.client_id, domain + )).await; + } + } sleep(Duration::from_millis(500)).await; } - // Step 2: Now try to fetch the crumb with enhanced headers - let crumb_response = self.client - .get(YAHOO_CRUMB_URL) - .header("User-Agent", Self::random_user_agent()) - .header("Accept", "*/*") - .header("Accept-Language", "en-US,en;q=0.9") - .header("Accept-Encoding", "gzip, deflate, br") - .header("Referer", "https://finance.yahoo.com/") - .header("Origin", "https://finance.yahoo.com") - .header("Sec-Fetch-Dest", "empty") - .header("Sec-Fetch-Mode", "cors") - .header("Sec-Fetch-Site", "same-site") - .header("Pragma", "no-cache") - .header("Cache-Control", "no-cache") - .send() - .await; + // Step 2: Fetch crumb with timeout + logger::log_info(&format!( + " YahooClient[{}] requesting crumb...", + self.client_id + )).await; - match crumb_response { - Ok(response) if response.status().is_success() => { + let crumb_result = timeout( + Duration::from_secs(30), // NEW: 30 second timeout for crumb fetch + self.client + .get(YAHOO_CRUMB_URL) + .header("User-Agent", Self::random_user_agent()) + .header("Accept", "*/*") + .header("Accept-Language", "en-US,en;q=0.9") + .header("Accept-Encoding", "gzip, deflate, br") + .header("Referer", "https://finance.yahoo.com/") + .header("Origin", "https://finance.yahoo.com") + .header("Sec-Fetch-Dest", "empty") + .header("Sec-Fetch-Mode", "cors") + .header("Sec-Fetch-Site", "same-site") + .header("Pragma", "no-cache") + .header("Cache-Control", "no-cache") + .send() + ).await; + + match crumb_result { + Ok(Ok(response)) if response.status().is_success() => { let crumb_text = response.text().await?; let crumb = crumb_text.trim().to_string(); @@ -434,38 +498,35 @@ impl YahooClient { let mut refresh_guard = self.crumb_last_refresh.lock().await; *refresh_guard = Some(Instant::now()); - cookies_established = true; + return Ok(()); } else { - logger::log_warn(&format!( - " YahooClient[{}] got invalid crumb format", - self.client_id - )).await; + return Err(anyhow!( + "YahooClient[{}] got invalid crumb format: '{}'", + self.client_id, crumb + )); } } - Ok(response) => { + Ok(Ok(response)) => { let status = response.status(); - let error_text = response.text().await?; - logger::log_warn(&format!( - " YahooClient[{}] failed: HTTP {}: {}", + let error_text = response.text().await.unwrap_or_default(); + return Err(anyhow!( + "YahooClient[{}] crumb fetch failed: HTTP {} - {}", self.client_id, status, &error_text[..error_text.len().min(100)] - )).await; + )); } - Err(e) => { - logger::log_warn(&format!( - " YahooClient[{}] connection error: {}", + Ok(Err(e)) => { + return Err(anyhow!( + "YahooClient[{}] crumb connection error: {}", self.client_id, e - )).await; + )); + } + Err(_) => { + return Err(anyhow!( + "YahooClient[{}] crumb fetch timeout (>10s)", + self.client_id + )); } } - - if !cookies_established { - return Err(anyhow!( - "Failed to initialize cookies/crumb for YahooClient[{}]", - self.client_id - )); - } - - Ok(()) } /// Get the current crumb (refresh if needed) @@ -528,7 +589,15 @@ impl YahooClient { } /// Record a failure - return true if client should be replaced - pub async fn record_failure(&self, error_type: &str) -> bool { + pub async fn record_failure(&self, error_str: &str) -> bool { + // Don't count 404 as failures - these are expected when data doesn't exist + if error_str.contains("404") || error_str.contains("Not Found") { + return false; // Don't mark for replacement + } + if error_str.contains("500") || error_str.contains("Not Found") { + return false; // Don't mark for replacement + } + let old_failures = self.consecutive_failures.fetch_add(1, Ordering::Relaxed); let new_failures = old_failures + 1; @@ -538,13 +607,13 @@ impl YahooClient { if should_replace { logger::log_warn(&format!( " 🚨 YahooClient[{}] marked for replacement ({} consecutive failures, last error: {})", - self.client_id, new_failures, error_type + self.client_id, new_failures, error_str )).await; self.is_marked_for_replacement.store(true, Ordering::Relaxed); } else { logger::log_warn(&format!( " ⚠ YahooClient[{}] failure {}/{}: {}", - self.client_id, new_failures, self.max_consecutive_failures, error_type + self.client_id, new_failures, self.max_consecutive_failures, error_str )).await; } @@ -1064,28 +1133,46 @@ impl YahooClientPool { actual_pool_size, if rotation_enabled { "enabled" } else { "disabled" } )).await; - - let mut clients = Vec::with_capacity(actual_pool_size); - + + logger::log_info(&format!( + "Initializing {} YahooClients in parallel...", + actual_pool_size + )).await; + + let mut handles = Vec::new(); + for i in 0..actual_pool_size { let proxy_url = proxy_pool.get_proxy_url(i); + let monitoring_clone = monitoring.clone(); - match YahooClient::new( - i, - proxy_url, - max_tasks_per_instance, - monitoring.clone(), - ).await { - Ok(client) => { + let handle = tokio::spawn(async move { + YahooClient::new(i, proxy_url, max_tasks_per_instance, monitoring_clone).await + }); + + handles.push(handle); + } + + // Wait for all to complete + let results = futures::future::join_all(handles).await; + + let mut clients = Vec::with_capacity(actual_pool_size); + for (i, result) in results.into_iter().enumerate() { + match result { + Ok(Ok(client)) => { clients.push(Arc::new(client)); logger::log_info(&format!(" ✓ Client {} ready", i)).await; } - Err(e) => { + Ok(Err(e)) => { logger::log_warn(&format!( " ✗ Failed to initialize Client {}: {} - skipping", i, e )).await; - // Continue with next client instead of failing entire pool + } + Err(e) => { + logger::log_warn(&format!( + " ✗ Client {} task panicked: {} - skipping", + i, e + )).await; } } }