diff --git a/src/corporate/mod.rs b/src/corporate/mod.rs
index 829a268..38648ea 100644
--- a/src/corporate/mod.rs
+++ b/src/corporate/mod.rs
@@ -8,7 +8,8 @@ pub mod aggregation;
 pub mod fx;
 pub mod openfigi;
 pub mod yahoo;
-pub mod update_parallel;
+pub mod update_companies;
+pub mod update_companies_cleanse;
 pub mod page_validation;
 pub mod atomic_writer;
diff --git a/src/corporate/update.rs b/src/corporate/update.rs
index 2940ea7..2fbcb63 100644
--- a/src/corporate/update.rs
+++ b/src/corporate/update.rs
@@ -1,59 +1,20 @@
-// src/corporate/update.rs - WITH ABORT-SAFE INCREMENTAL PERSISTENCE
-use super::{scraper::*, storage::*, helpers::*, types::*, openfigi::*, atomic_writer::*};
+// src/corporate/update.rs
+use super::{scraper::*, storage::*, openfigi::*};
 use crate::config::Config;
-use crate::corporate::update_parallel::build_companies_jsonl_streaming_parallel;
+use crate::corporate::update_companies::build_companies_jsonl_streaming_parallel;
+use crate::corporate::update_companies_cleanse::{companies_yahoo_cleansed_low_profile, companies_yahoo_cleansed_no_data};
 use crate::util::directories::DataPaths;
 use crate::util::logger;
 use crate::scraper::webdriver::ChromeDriverPool;
-use crate::scraper::yahoo::{YahooClientPool, QuoteSummaryModule};
+use crate::scraper::yahoo::{YahooClientPool};
 use std::result::Result::Ok;
-use chrono::{Local, Utc};
-use std::collections::HashMap;
 use std::sync::Arc;
-use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use tokio::fs::{File, OpenOptions};
-use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
-use futures::stream::{FuturesUnordered, StreamExt};
-use serde_json::json;
-use serde::{Deserialize, Serialize};
-use tokio::sync::mpsc;
-use std::time::Duration;
-
-/// Result of processing a single company
-#[derive(Debug, Clone)]
-pub enum CompanyProcessResult {
-    Valid(CompanyCrossPlatformInfo),
-    FilteredLowCap { name: String, market_cap: f64 },
-    FilteredNoPrice { name: String },
-    Failed { company: CompanyCrossPlatformInfo, error: String, is_transient: bool },
-}
-
-/// Represents a write command to be serialized through the log writer
-enum LogCommand {
-    Write(CompanyCrossPlatformInfo),
-    Checkpoint,
-    Shutdown,
-}
-
-/// Result from processing a single company with priority
-struct CompanyTaskResult {
-    company: CompanyCrossPlatformInfo,
-    result: CompanyProcessResult,
-}
-
-/// Check if a company needs processing (validation check)
-fn company_needs_processing(
-    company: &CompanyCrossPlatformInfo,
-    existing_companies: &HashMap<String, CompanyCrossPlatformInfo>,
-) -> bool {
-    // If company exists in cleaned output, skip it
-    !existing_companies.contains_key(&company.name)
-}
+use std::sync::atomic::{AtomicBool, Ordering};
 
 /// Main corporate update entry point with shutdown awareness
 pub async fn run_full_update(
-    _config: &Config,
+    config: &Config,
     pool: &Arc<ChromeDriverPool>,
     shutdown_flag: &Arc<AtomicBool>,
 ) -> anyhow::Result<()> {
@@ -118,7 +79,7 @@ pub async fn run_full_update(
     }
 
     logger::log_info("Step 5: Building companies.jsonl with parallel processing and validation...").await;
-    let count = build_companies_jsonl_streaming_parallel(&paths, pool, shutdown_flag, _config, &None).await?;
+    let count = build_companies_jsonl_streaming_parallel(&paths, pool, shutdown_flag, config, &None).await?;
     logger::log_info(&format!("  ✓ Saved {} companies", count)).await;
 
     if shutdown_flag.load(Ordering::SeqCst) {
@@ -138,8 +99,12 @@ pub async fn run_full_update(
     logger::log_info("Step 7: Cleansing up companies with too low profile (with abort-safe persistence)...").await;
     let proxy_pool =
pool.get_proxy_pool() .ok_or_else(|| anyhow::anyhow!("ChromeDriverPool must be created with VPN proxy rotation enabled"))?; - - let cleansed_count = companies_yahoo_cleansed_low_profile(&paths, _config, proxy_pool, shutdown_flag).await?; + + logger::log_info("Creating YahooClientPool with proxy rotation...").await; + let yahoo_pool = Arc::new(YahooClientPool::new(proxy_pool, config, None).await?); + logger::log_info(&format!("✓ YahooClientPool ready with {} clients", yahoo_pool.num_clients().await)).await; + + let cleansed_count = companies_yahoo_cleansed_low_profile(&paths, config, yahoo_pool, shutdown_flag).await?; logger::log_info(&format!(" ✓ {} companies with sufficient profile ready for analytics", cleansed_count)).await; if !shutdown_flag.load(Ordering::SeqCst) { @@ -154,886 +119,6 @@ pub async fn run_full_update( Ok(()) } -/// Cleansing function to remove companies with missing essential yahoo data for integrity -pub async fn companies_yahoo_cleansed_no_data(paths: &DataPaths) -> Result { - let data_path = paths.data_dir(); - - let input_path = data_path.join("companies.jsonl"); - let output_path = data_path.join("companies_yahoo.jsonl"); - let state_path = data_path.join("state.jsonl"); - - if !input_path.exists() { - logger::log_warn("companies.jsonl not found, skipping cleansing").await; - return Ok(0); - } - - if state_path.exists() { - let state_content = tokio::fs::read_to_string(&state_path).await?; - - for line in state_content.lines() { - if line.trim().is_empty() { - continue; - } - - if let Ok(state) = serde_json::from_str::(line) { - if state.get("yahoo_companies_cleansed_no_data").and_then(|v| v.as_bool()).unwrap_or(false) { - logger::log_info(" Yahoo companies cleansing already completed, reading existing file...").await; - - if output_path.exists() { - let output_content = tokio::fs::read_to_string(&output_path).await?; - let count = output_content.lines() - .filter(|line| !line.trim().is_empty()) - .count(); - - logger::log_info(&format!(" ✓ Found {} companies in companies_yahoo.jsonl", count)).await; - return Ok(count); - } else { - logger::log_warn(" State indicates completion but companies_yahoo.jsonl not found, re-running...").await; - break; - } - } - } - } - } - - logger::log_info(&format!(" Reading from: {:?}", input_path)).await; - logger::log_info(&format!(" Writing to: {:?}", output_path)).await; - - let file = File::open(&input_path).await?; - let reader = BufReader::new(file); - let mut lines = reader.lines(); - - let mut output_file = File::create(&output_path).await?; - let mut valid_count = 0; - let mut removed_count = 0; - let mut total_count = 0; - - while let Some(line) = lines.next_line().await? 
{ - if line.trim().is_empty() { - continue; - } - - total_count += 1; - - let company: CompanyCrossPlatformInfo = match serde_json::from_str(&line) { - Ok(c) => c, - Err(e) => { - logger::log_warn(&format!(" Failed to parse company on line {}: {}", total_count, e)).await; - continue; - } - }; - - let has_valid_yahoo = company.isin_tickers_map - .values() - .flatten() - .any(|ticker| { - ticker.starts_with("YAHOO:") - && ticker != "YAHOO:NO_RESULTS" - && ticker != "YAHOO:ERROR" - }); - - if has_valid_yahoo { - let json_line = serde_json::to_string(&company)?; - output_file.write_all(json_line.as_bytes()).await?; - output_file.write_all(b"\n").await?; - valid_count += 1; - } else { - removed_count += 1; - if removed_count <= 5 { - logger::log_info(&format!(" Removed company '{}' (no valid Yahoo ticker)", company.name)).await; - } - } - - if total_count % 1000 == 0 { - logger::log_info(&format!(" Processed {} companies...", total_count)).await; - } - } - - output_file.flush().await?; - - logger::log_info(&format!( - " ✓ Cleansing complete: {} total → {} valid, {} removed", - total_count, valid_count, removed_count - )).await; - - let yahoo_companies = json!({ - "yahoo_companies_cleansed_no_data": true, - "completed_at": chrono::Utc::now().to_rfc3339(), - }); - - let mut state_file = File::create(&state_path).await?; - let state_line = serde_json::to_string(&yahoo_companies)?; - state_file.write_all(state_line.as_bytes()).await?; - state_file.write_all(b"\n").await?; - state_file.flush().await?; - - logger::log_info(&format!(" ✓ State file created at: {:?}", state_path)).await; - - Ok(valid_count) -} - -/// Yahoo Low Profile Cleansing WITH ABORT-SAFE INCREMENTAL PERSISTENCE -/// -/// # Features -/// - ✅ Graceful shutdown (abort-safe) -/// - ✅ Task panic isolation (tasks fail independently) -/// - ✅ Crash-safe persistence (checkpoint + log with fsync) -/// - ✅ Smart skip logic (only process incomplete data) -/// - Uses pending queue instead of retry mechanism -/// - Reuses companies_update.log for persistence -/// -/// # Persistence Strategy -/// - Checkpoint: companies_yahoo_cleaned.jsonl (atomic state) -/// - Log: companies_update.log (append-only updates) -/// - On restart: Load checkpoint + replay log -/// - Periodic checkpoints (every 50 companies) -/// - Batched fsync (every 10 writes or 10 seconds) -pub async fn companies_yahoo_cleansed_low_profile( - paths: &DataPaths, - config: &Config, - proxy_pool: Arc, - shutdown_flag: &Arc, -) -> anyhow::Result { - // Configuration constants - const CHECKPOINT_INTERVAL: usize = 50; - const FSYNC_BATCH_SIZE: usize = 10; - const FSYNC_INTERVAL_SECS: u64 = 10; - const CONCURRENCY_LIMIT: usize = 50; // Limit parallel validation tasks - - let data_path = paths.data_dir(); - - // File paths (reusing companies_update.log) - let input_path = data_path.join("companies_yahoo.jsonl"); - let checkpoint_path = data_path.join("companies_yahoo_cleaned.jsonl"); - let log_path = data_path.join("companies_update.log"); - - // Check input exists - if !input_path.exists() { - logger::log_warn(" companies_yahoo.jsonl not found, skipping low profile cleansing").await; - return Ok(0); - } - - // === RECOVERY PHASE: Load checkpoint + replay log === - let mut existing_companies: HashMap = HashMap::new(); - let mut processed_names: std::collections::HashSet = std::collections::HashSet::new(); - - if checkpoint_path.exists() { - logger::log_info("Loading checkpoint from companies_yahoo_cleaned.jsonl...").await; - let checkpoint_content = 
tokio::fs::read_to_string(&checkpoint_path).await?; - - for line in checkpoint_content.lines() { - if line.trim().is_empty() || !line.ends_with('}') { - continue; // Skip incomplete lines - } - - match serde_json::from_str::(line) { - Ok(company) => { - processed_names.insert(company.name.clone()); - existing_companies.insert(company.name.clone(), company); - } - Err(e) => { - logger::log_warn(&format!("Skipping invalid checkpoint line: {}", e)).await; - } - } - } - logger::log_info(&format!("Loaded checkpoint with {} companies", existing_companies.len())).await; - } - - if log_path.exists() { - logger::log_info("Replaying update log...").await; - let log_content = tokio::fs::read_to_string(&log_path).await?; - let mut replayed = 0; - - for line in log_content.lines() { - if line.trim().is_empty() || !line.ends_with('}') { - continue; // Skip incomplete lines - } - - match serde_json::from_str::(line) { - Ok(company) => { - processed_names.insert(company.name.clone()); - existing_companies.insert(company.name.clone(), company); - replayed += 1; - } - Err(e) => { - logger::log_warn(&format!("Skipping invalid log line: {}", e)).await; - } - } - } - if replayed > 0 { - logger::log_info(&format!("Replayed {} updates from log", replayed)).await; - } - } - - // === LOAD INPUT COMPANIES === - logger::log_info(&format!("Loading companies from: {:?}", input_path)).await; - let input_companies = load_companies_from_jsonl(&input_path).await?; - logger::log_info(&format!("Loaded {} companies from input", input_companies.len())).await; - - // === BUILD PENDING LIST (smart skip logic) === - let mut pending: Vec = input_companies - .into_iter() - .filter(|company| company_needs_processing(company, &existing_companies)) - .collect(); - - logger::log_info(&format!( - "Initial scan: {} companies need processing ({} already complete)", - pending.len(), - existing_companies.len() - )).await; - - if pending.is_empty() { - logger::log_info(" ✓ All companies already processed").await; - return Ok(existing_companies.len()); - } - - // === SETUP LOG WRITER TASK === - let (write_tx, mut write_rx) = mpsc::channel::(1000); - - let log_file_init = OpenOptions::new() - .create(true) - .append(true) - .open(&log_path) - .await?; - - let checkpoint_path_clone = checkpoint_path.clone(); - let log_path_clone = log_path.clone(); - let existing_companies_writer = Arc::new(tokio::sync::Mutex::new(existing_companies.clone())); - let existing_companies_writer_for_task = Arc::clone(&existing_companies_writer); - - let write_tx_for_writer = write_tx.clone(); - let writer_task = tokio::spawn(async move { - let mut log_file = log_file_init; - let mut writes_since_fsync = 0; - let mut last_fsync = std::time::Instant::now(); - let mut updates_since_checkpoint = 0; - let mut count = 0; - let mut new_count = 0; - let mut updated_count = 0; - - while let Some(cmd) = write_rx.recv().await { - match cmd { - LogCommand::Write(company) => { - // Write to log - let line = serde_json::to_string(&company).unwrap(); - if let Err(e) = log_file.write_all(line.as_bytes()).await { - logger::log_error(&format!("Failed to write to log: {}", e)).await; - break; - } - if let Err(e) = log_file.write_all(b"\n").await { - logger::log_error(&format!("Failed to write newline: {}", e)).await; - break; - } - - writes_since_fsync += 1; - updates_since_checkpoint += 1; - count += 1; - - // Update in-memory state - let mut existing_companies = existing_companies_writer_for_task.lock().await; - let is_update = existing_companies.contains_key(&company.name); - 
existing_companies.insert(company.name.clone(), company); - drop(existing_companies); - - if is_update { - updated_count += 1; - } else { - new_count += 1; - } - - // Batched + time-based fsync - let should_fsync = writes_since_fsync >= FSYNC_BATCH_SIZE - || last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS; - - if should_fsync { - if let Err(e) = log_file.flush().await { - logger::log_error(&format!("Failed to flush: {}", e)).await; - break; - } - if let Err(e) = log_file.sync_data().await { - logger::log_error(&format!("Failed to fsync: {}", e)).await; - break; - } - writes_since_fsync = 0; - last_fsync = std::time::Instant::now(); - } - } - LogCommand::Checkpoint => { - if let Err(e) = log_file.flush().await { - logger::log_error(&format!("Failed to flush before checkpoint: {}", e)).await; - break; - } - if let Err(e) = log_file.sync_data().await { - logger::log_error(&format!("Failed to fsync before checkpoint: {}", e)).await; - break; - } - - let existing_companies = existing_companies_writer_for_task.lock().await; - let companies_vec: Vec<_> = existing_companies.values().cloned().collect(); - drop(existing_companies); - - let temp_path = checkpoint_path_clone.with_extension("tmp"); - match tokio::fs::File::create(&temp_path).await { - Ok(mut temp_file) => { - let mut checkpoint_ok = true; - for company in &companies_vec { - if let Ok(line) = serde_json::to_string(company) { - if temp_file.write_all(line.as_bytes()).await.is_err() || - temp_file.write_all(b"\n").await.is_err() { - checkpoint_ok = false; - break; - } - } - } - - if checkpoint_ok { - if temp_file.flush().await.is_ok() && - temp_file.sync_data().await.is_ok() { - drop(temp_file); - - if tokio::fs::rename(&temp_path, &checkpoint_path_clone).await.is_ok() { - if tokio::fs::remove_file(&log_path_clone).await.is_ok() { - logger::log_info(&format!( - "✓ Checkpoint created ({} companies), log cleared", - companies_vec.len() - )).await; - - if let Ok(new_log) = OpenOptions::new() - .create(true) - .append(true) - .open(&log_path_clone) - .await { - log_file = new_log; - } - } - } - } - } - } - Err(e) => { - logger::log_error(&format!("Failed to create checkpoint temp file: {}", e)).await; - } - } - updates_since_checkpoint = 0; - } - LogCommand::Shutdown => { - logger::log_info("Writer shutting down...").await; - break; - } - } - - // Periodic checkpoint trigger - if updates_since_checkpoint >= CHECKPOINT_INTERVAL { - let _ = write_tx_for_writer.send(LogCommand::Checkpoint).await; - } - } - - // Final fsync - let _ = log_file.flush().await; - let _ = log_file.sync_data().await; - - logger::log_info(&format!( - "Writer finished: {} total ({} new, {} updated)", - count, new_count, updated_count - )).await; - - (count, new_count, updated_count) - }); - - // === CREATE YAHOO CLIENT POOL === - logger::log_info("Creating YahooClientPool with proxy rotation...").await; - let yahoo_pool = Arc::new(YahooClientPool::new(proxy_pool, config, None).await?); - logger::log_info(&format!("✓ YahooClientPool ready with {} clients", yahoo_pool.num_clients().await)).await; - - // Wrap paths in Arc for safe sharing across tasks - let paths = Arc::new((*paths).clone()); - - // === MAIN PROCESSING LOOP WITH TASK PANIC ISOLATION === - let total = pending.len(); - let mut tasks = FuturesUnordered::new(); - - // Counters - let processed = Arc::new(AtomicUsize::new(0)); - let valid_count = Arc::new(AtomicUsize::new(0)); - let filtered_low_cap = Arc::new(AtomicUsize::new(0)); - let filtered_no_price = Arc::new(AtomicUsize::new(0)); - let failed_count 
= Arc::new(AtomicUsize::new(0)); - - // Spawn initial batch - for _ in 0..CONCURRENCY_LIMIT.min(pending.len()) { - if let Some(company) = pending.pop() { - spawn_validation_task( - company, - &yahoo_pool, - &paths, - &write_tx, - shutdown_flag, - &processed, - &valid_count, - &filtered_low_cap, - &filtered_no_price, - &failed_count, - total, - &mut tasks, - ); - } - } - - // Process results and spawn new tasks (with task panic isolation) - while let Some(task_result) = tasks.next().await { - // Check for shutdown - if shutdown_flag.load(Ordering::SeqCst) { - logger::log_warn("Shutdown signal received, stopping processing").await; - break; - } - - match task_result { - Ok(Ok(Some(_result))) => { - // Success - spawn next task - if let Some(company) = pending.pop() { - spawn_validation_task( - company, - &yahoo_pool, - &paths, - &write_tx, - shutdown_flag, - &processed, - &valid_count, - &filtered_low_cap, - &filtered_no_price, - &failed_count, - total, - &mut tasks, - ); - } - } - Ok(Ok(None)) => { - // Filtered or failed - spawn next task - if let Some(company) = pending.pop() { - spawn_validation_task( - company, - &yahoo_pool, - &paths, - &write_tx, - shutdown_flag, - &processed, - &valid_count, - &filtered_low_cap, - &filtered_no_price, - &failed_count, - total, - &mut tasks, - ); - } - } - Ok(Err(e)) => { - // Processing error - logger::log_error(&format!("Company processing error: {}", e)).await; - - if let Some(company) = pending.pop() { - spawn_validation_task( - company, - &yahoo_pool, - &paths, - &write_tx, - shutdown_flag, - &processed, - &valid_count, - &filtered_low_cap, - &filtered_no_price, - &failed_count, - total, - &mut tasks, - ); - } - } - Err(e) => { - // Task panic (isolated - doesn't crash entire process) - logger::log_error(&format!("Task panic: {}", e)).await; - - if let Some(company) = pending.pop() { - spawn_validation_task( - company, - &yahoo_pool, - &paths, - &write_tx, - shutdown_flag, - &processed, - &valid_count, - &filtered_low_cap, - &filtered_no_price, - &failed_count, - total, - &mut tasks, - ); - } - } - } - } - - logger::log_info("Main processing loop completed").await; - - // Signal writer to finish - let _ = write_tx.send(LogCommand::Checkpoint).await; - let _ = write_tx.send(LogCommand::Shutdown).await; - drop(write_tx); - - // Wait for writer to finish - let (final_count, final_new, final_updated) = writer_task.await - .unwrap_or((0, 0, 0)); - - let final_valid = valid_count.load(Ordering::SeqCst); - let final_filtered_low_cap = filtered_low_cap.load(Ordering::SeqCst); - let final_filtered_no_price = filtered_no_price.load(Ordering::SeqCst); - let final_failed = failed_count.load(Ordering::SeqCst); - - logger::log_info(&format!( - "✅ Completed: {} total companies ({} new, {} updated)", - final_count, final_new, final_updated - )).await; - logger::log_info(&format!( - " Valid: {}, Filtered (low cap): {}, Filtered (no price): {}, Failed: {}", - final_valid, final_filtered_low_cap, final_filtered_no_price, final_failed - )).await; - - // Shutdown Yahoo pool - yahoo_pool.shutdown().await?; - - Ok(final_valid) -} - -/// Helper function to spawn a validation task (reduces code duplication) -fn spawn_validation_task( - company: CompanyCrossPlatformInfo, - yahoo_pool: &Arc, - paths: &Arc, - write_tx: &mpsc::Sender, - shutdown_flag: &Arc, - processed: &Arc, - valid_count: &Arc, - filtered_low_cap: &Arc, - filtered_no_price: &Arc, - failed_count: &Arc, - total: usize, - tasks: &mut FuturesUnordered>>>, -) { - let yahoo_pool_clone = Arc::clone(yahoo_pool); - 
let paths_clone = Arc::clone(paths); - let shutdown_flag_clone = Arc::clone(shutdown_flag); - let write_tx_clone = write_tx.clone(); - let processed_clone = Arc::clone(processed); - let valid_count_clone = Arc::clone(valid_count); - let filtered_low_cap_clone = Arc::clone(filtered_low_cap); - let filtered_no_price_clone = Arc::clone(filtered_no_price); - let failed_count_clone = Arc::clone(failed_count); - - let task = tokio::spawn(async move { - // Check shutdown at start - if shutdown_flag_clone.load(Ordering::SeqCst) { - return Ok::<_, anyhow::Error>(None); - } - - let result = process_company_with_validation( - &company, - &yahoo_pool_clone, - &*paths_clone, - ).await; - - let task_result = match result { - CompanyProcessResult::Valid(validated_company) => { - // Send to writer - let _ = write_tx_clone.send(LogCommand::Write(validated_company.clone())).await; - valid_count_clone.fetch_add(1, Ordering::SeqCst); - Some(CompanyTaskResult { - company: validated_company.clone(), - result: CompanyProcessResult::Valid(validated_company), - }) - } - CompanyProcessResult::FilteredLowCap { name, market_cap } => { - filtered_low_cap_clone.fetch_add(1, Ordering::SeqCst); - if filtered_low_cap_clone.load(Ordering::SeqCst) <= 10 { - logger::log_info(&format!(" Filtered {} - low market cap: {:.0} EUR", name, market_cap)).await; - } - None - } - CompanyProcessResult::FilteredNoPrice { name } => { - filtered_no_price_clone.fetch_add(1, Ordering::SeqCst); - if filtered_no_price_clone.load(Ordering::SeqCst) <= 10 { - logger::log_info(&format!(" Filtered {} - no recent price data", name)).await; - } - None - } - CompanyProcessResult::Failed { company: failed_company, error, is_transient: _ } => { - failed_count_clone.fetch_add(1, Ordering::SeqCst); - logger::log_warn(&format!(" Failed to process '{}': {}", failed_company.name, error)).await; - None - } - }; - - // Progress reporting - let current = processed_clone.fetch_add(1, Ordering::SeqCst) + 1; - if current % 100 == 0 { - logger::log_info(&format!( - "Progress: {}/{} ({} valid, {} low cap, {} no price, {} failed)", - current, total, - valid_count_clone.load(Ordering::SeqCst), - filtered_low_cap_clone.load(Ordering::SeqCst), - filtered_no_price_clone.load(Ordering::SeqCst), - failed_count_clone.load(Ordering::SeqCst) - )).await; - } - - Ok(task_result) - }); - - tasks.push(task); -} - -/// Process a single company with full error categorization -async fn process_company_with_validation( - company: &CompanyCrossPlatformInfo, - yahoo_pool: &Arc, - paths: &DataPaths, -) -> CompanyProcessResult { - // Extract Yahoo ticker - let ticker = match extract_first_yahoo_ticker(company) { - Some(t) => t, - None => { - return CompanyProcessResult::Failed { - company: company.clone(), - error: "No valid Yahoo ticker found".to_string(), - is_transient: false, // Permanent - no ticker means no data - }; - } - }; - - // Fetch core modules from Yahoo - let summary = match yahoo_pool.get_quote_summary( - &ticker, - &QuoteSummaryModule::core_modules(), - ).await { - Ok(s) => s, - Err(e) => { - let error_msg = e.to_string(); - let is_transient = is_transient_error(&error_msg); - - return CompanyProcessResult::Failed { - company: company.clone(), - error: format!("API error fetching summary: {}", error_msg), - is_transient, - }; - } - }; - - // Validate market cap - let market_cap = extract_market_cap(&summary); - if market_cap < 1_000_000.0 { - return CompanyProcessResult::FilteredLowCap { - name: company.name.clone(), - market_cap, - }; - } - - // Validate recent price 
activity - let has_recent_price = match check_recent_price_activity(yahoo_pool, &ticker).await { - Ok(has) => has, - Err(e) => { - let error_msg = e.to_string(); - let is_transient = is_transient_error(&error_msg); - - return CompanyProcessResult::Failed { - company: company.clone(), - error: format!("API error fetching price history: {}", error_msg), - is_transient, - }; - } - }; - - if !has_recent_price { - return CompanyProcessResult::FilteredNoPrice { - name: company.name.clone(), - }; - } - - // Save core data - if let Err(e) = save_company_core_data(paths, &company.name, &summary).await { - logger::log_warn(&format!( - " Failed to save core data for {}: {}", - company.name, e - )).await; - } - - CompanyProcessResult::Valid(company.clone()) -} - -/// Determine if an error is transient (should retry) or permanent (skip) -fn is_transient_error(error: &str) -> bool { - let error_lower = error.to_lowercase(); - - // Transient errors (network, rate limiting, timeouts) - let transient_patterns = [ - "timeout", - "timed out", - "connection", - "network", - "rate limit", - "too many requests", - "429", - "503", - "502", - "500", - "temporarily", - "unavailable", - ]; - - for pattern in &transient_patterns { - if error_lower.contains(pattern) { - return true; - } - } - - // Permanent errors (invalid ticker, no data, parsing errors) - let permanent_patterns = [ - "404", - "not found", - "invalid", - "no data", - "parse error", - "400", - "401", - "403", - ]; - - for pattern in &permanent_patterns { - if error_lower.contains(pattern) { - return false; - } - } - - // Default: treat unknown errors as transient (safer to retry) - true -} - -/// Load companies from JSONL file -async fn load_companies_from_jsonl(path: &std::path::Path) -> anyhow::Result> { - let content = tokio::fs::read_to_string(path).await?; - let mut companies = Vec::new(); - - for line in content.lines() { - if line.trim().is_empty() { - continue; - } - if let Ok(company) = serde_json::from_str::(line) { - companies.push(company); - } - } - - Ok(companies) -} - -fn extract_first_yahoo_ticker(company: &CompanyCrossPlatformInfo) -> Option { - for tickers in company.isin_tickers_map.values() { - for ticker in tickers { - if ticker.starts_with("YAHOO:") - && ticker != "YAHOO:NO_RESULTS" - && ticker != "YAHOO:ERROR" - { - return Some(ticker.trim_start_matches("YAHOO:").to_string()); - } - } - } - None -} - -fn extract_market_cap(summary: &crate::scraper::yahoo::QuoteSummary) -> f64 { - let price_module = match summary.modules.get("price") { - Some(m) => m, - None => return 0.0, - }; - - let market_cap_raw = price_module - .get("marketCap") - .and_then(|v| v.get("raw")) - .and_then(|v| v.as_f64()) - .unwrap_or(0.0); - - let currency = price_module - .get("currency") - .and_then(|v| v.as_str()) - .unwrap_or("USD"); - - let market_cap_eur = match currency { - "EUR" => market_cap_raw, - "USD" => market_cap_raw * 0.92, - "GBP" => market_cap_raw * 1.17, - "JPY" => market_cap_raw * 0.0061, - "CHF" => market_cap_raw * 1.05, - _ => market_cap_raw * 0.92, - }; - - market_cap_eur -} - -async fn check_recent_price_activity( - yahoo_pool: &Arc, - ticker: &str, -) -> anyhow::Result { - let now = Utc::now().timestamp(); - let one_year_ago = now - (365 * 24 * 60 * 60); - let sixty_days_ago = now - (60 * 24 * 60 * 60); - - let chart_data = yahoo_pool.get_chart_data( - ticker, - "1d", - sixty_days_ago, - now, - ).await?; - - if chart_data.quotes.is_empty() { - return Ok(false); - } - - let most_recent_timestamp = chart_data.quotes - .iter() - .map(|q| 
q.timestamp) - .max() - .unwrap_or(0); - - Ok(most_recent_timestamp >= one_year_ago) -} - -async fn save_company_core_data( - paths: &DataPaths, - company_name: &str, - summary: &crate::scraper::yahoo::QuoteSummary, -) -> anyhow::Result<()> { - use tokio::fs; - - let safe_name = company_name - .replace("/", "_") - .replace("\\", "_") - .replace(":", "_") - .replace("*", "_") - .replace("?", "_") - .replace("\"", "_") - .replace("<", "_") - .replace(">", "_") - .replace("|", "_"); - - let company_dir = paths.corporate_dir().join(&safe_name).join("core"); - fs::create_dir_all(&company_dir).await?; - - let data_path = company_dir.join("data.jsonl"); - let json_line = serde_json::to_string(summary)?; - - let mut file = fs::File::create(&data_path).await?; - file.write_all(json_line.as_bytes()).await?; - file.write_all(b"\n").await?; - file.flush().await?; - - Ok(()) -} - async fn find_most_recent_figi_date_dir(paths: &DataPaths) -> anyhow::Result> { let map_cache_dir = paths.cache_gleif_openfigi_map_dir(); @@ -1061,53 +146,4 @@ async fn find_most_recent_figi_date_dir(paths: &DataPaths) -> anyhow::Result, -} - -pub fn process_batch( - new_events: &[CompanyEvent], - existing: &mut HashMap, - today: &str, -) -> ProcessResult { - let mut changes = Vec::new(); - - for new in new_events { - let key = event_key(new); - - if let Some(old) = existing.get(&key) { - changes.extend(detect_changes(old, new, today)); - existing.insert(key, new.clone()); - continue; - } - - let date_key = format!("{}|{}", new.ticker, new.date); - let mut found_old = None; - for (k, e) in existing.iter() { - if format!("{}|{}", e.ticker, e.date) == date_key && k != &key { - found_old = Some((k.clone(), e.clone())); - break; - } - } - - if let Some((old_key, old_event)) = found_old { - if new.date.as_str() > today { - changes.push(CompanyEventChange { - ticker: new.ticker.clone(), - date: new.date.clone(), - field_changed: "time".to_string(), - old_value: old_event.time.clone(), - new_value: new.time.clone(), - detected_at: Local::now().format("%Y-%m-%d %H:%M:%S").to_string(), - }); - } - existing.remove(&old_key); - } - - existing.insert(key, new.clone()); - } - - ProcessResult { changes } } \ No newline at end of file diff --git a/src/corporate/update_parallel.rs b/src/corporate/update_companies.rs similarity index 99% rename from src/corporate/update_parallel.rs rename to src/corporate/update_companies.rs index 184ab34..5dd5e38 100644 --- a/src/corporate/update_parallel.rs +++ b/src/corporate/update_companies.rs @@ -1,8 +1,4 @@ -// src/corporate/update_parallel.rs - PROPERLY FIXED: Correct pending queue rebuild -// -// Critical fix: After hard reset, only skip companies with COMPLETE Yahoo data -// Not just companies that have been written - +// src/corporate/update_companies.rs use super::{types::*, yahoo::*, helpers::*}; use crate::util::directories::DataPaths; use crate::util::logger; diff --git a/src/corporate/update_companies_cleanse.rs b/src/corporate/update_companies_cleanse.rs new file mode 100644 index 0000000..6e2afe2 --- /dev/null +++ b/src/corporate/update_companies_cleanse.rs @@ -0,0 +1,972 @@ +// src/corporate/update_companies_cleanse.rs +use super::{helpers::*, types::*}; +use crate::config::Config; +use crate::util::directories::DataPaths; +use crate::util::logger; +use crate::scraper::yahoo::{YahooClientPool, QuoteSummaryModule}; + +use std::result::Result::Ok; +use chrono::{Local, Utc}; +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use 
tokio::fs::{File, OpenOptions};
+use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
+use futures::stream::{FuturesUnordered, StreamExt};
+use serde_json::json;
+use tokio::sync::mpsc;
+
+/// Result of processing a single company
+#[derive(Debug, Clone)]
+pub enum CompanyProcessResult {
+    Valid(CompanyCrossPlatformInfo),
+    FilteredLowCap { name: String, market_cap: f64 },
+    FilteredNoPrice { name: String },
+    Failed { company: CompanyCrossPlatformInfo, error: String, is_transient: bool },
+}
+
+/// Represents a write command to be serialized through the log writer
+enum LogCommand {
+    Write(CompanyCrossPlatformInfo),
+    Checkpoint,
+    Shutdown,
+}
+
+/// Result from processing a single company with priority
+struct CompanyTaskResult {
+    company: CompanyCrossPlatformInfo,
+    result: CompanyProcessResult,
+}
+
+/// Cleansing function to remove companies with missing essential yahoo data for integrity
+pub async fn companies_yahoo_cleansed_no_data(paths: &DataPaths) -> anyhow::Result<usize> {
+    let data_path = paths.data_dir();
+
+    let input_path = data_path.join("companies.jsonl");
+    let output_path = data_path.join("companies_yahoo.jsonl");
+    let state_path = data_path.join("state.jsonl");
+
+    if !input_path.exists() {
+        logger::log_warn("companies.jsonl not found, skipping cleansing").await;
+        return Ok(0);
+    }
+
+    if state_path.exists() {
+        let state_content = tokio::fs::read_to_string(&state_path).await?;
+
+        for line in state_content.lines() {
+            if line.trim().is_empty() {
+                continue;
+            }
+
+            if let Ok(state) = serde_json::from_str::<serde_json::Value>(line) {
+                if state.get("yahoo_companies_cleansed_no_data").and_then(|v| v.as_bool()).unwrap_or(false) {
+                    logger::log_info("  Yahoo companies cleansing already completed, reading existing file...").await;
+
+                    if output_path.exists() {
+                        let output_content = tokio::fs::read_to_string(&output_path).await?;
+                        let count = output_content.lines()
+                            .filter(|line| !line.trim().is_empty())
+                            .count();
+
+                        logger::log_info(&format!("  ✓ Found {} companies in companies_yahoo.jsonl", count)).await;
+                        return Ok(count);
+                    } else {
+                        logger::log_warn("  State indicates completion but companies_yahoo.jsonl not found, re-running...").await;
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    logger::log_info(&format!("  Reading from: {:?}", input_path)).await;
+    logger::log_info(&format!("  Writing to: {:?}", output_path)).await;
+
+    let file = File::open(&input_path).await?;
+    let reader = BufReader::new(file);
+    let mut lines = reader.lines();
+
+    let mut output_file = File::create(&output_path).await?;
+    let mut valid_count = 0;
+    let mut removed_count = 0;
+    let mut total_count = 0;
+
+    while let Some(line) = lines.next_line().await?
{ + if line.trim().is_empty() { + continue; + } + + total_count += 1; + + let company: CompanyCrossPlatformInfo = match serde_json::from_str(&line) { + Ok(c) => c, + Err(e) => { + logger::log_warn(&format!(" Failed to parse company on line {}: {}", total_count, e)).await; + continue; + } + }; + + let has_valid_yahoo = company.isin_tickers_map + .values() + .flatten() + .any(|ticker| { + ticker.starts_with("YAHOO:") + && ticker != "YAHOO:NO_RESULTS" + && ticker != "YAHOO:ERROR" + }); + + if has_valid_yahoo { + let json_line = serde_json::to_string(&company)?; + output_file.write_all(json_line.as_bytes()).await?; + output_file.write_all(b"\n").await?; + valid_count += 1; + } else { + removed_count += 1; + if removed_count <= 5 { + logger::log_info(&format!(" Removed company '{}' (no valid Yahoo ticker)", company.name)).await; + } + } + + if total_count % 1000 == 0 { + logger::log_info(&format!(" Processed {} companies...", total_count)).await; + } + } + + output_file.flush().await?; + + logger::log_info(&format!( + " ✓ Cleansing complete: {} total → {} valid, {} removed", + total_count, valid_count, removed_count + )).await; + + let yahoo_companies = json!({ + "yahoo_companies_cleansed_no_data": true, + "completed_at": chrono::Utc::now().to_rfc3339(), + }); + + let mut state_file = File::create(&state_path).await?; + let state_line = serde_json::to_string(&yahoo_companies)?; + state_file.write_all(state_line.as_bytes()).await?; + state_file.write_all(b"\n").await?; + state_file.flush().await?; + + logger::log_info(&format!(" ✓ State file created at: {:?}", state_path)).await; + + Ok(valid_count) +} + +/// Yahoo Low Profile Cleansing WITH ABORT-SAFE INCREMENTAL PERSISTENCE +/// +/// # Features +/// - Graceful shutdown (abort-safe) +/// - Task panic isolation (tasks fail independently) +/// - Crash-safe persistence (checkpoint + log with fsync) +/// - Smart skip logic (only process incomplete data) +/// - Uses pending queue instead of retry mechanism +/// - Reuses companies_update.log for persistence +/// +/// # Persistence Strategy +/// - Checkpoint: companies_yahoo_cleaned.jsonl (atomic state) +/// - Log: companies_update.log (append-only updates) +/// - On restart: Load checkpoint + replay log +/// - Periodic checkpoints (every 50 companies) +/// - Batched fsync (every 10 writes or 10 seconds) +pub async fn companies_yahoo_cleansed_low_profile( + paths: &DataPaths, + config: &Config, + yahoo_pool: Arc, + shutdown_flag: &Arc, +) -> anyhow::Result { + // Configuration constants + const CHECKPOINT_INTERVAL: usize = 50; + const FSYNC_BATCH_SIZE: usize = 10; + const FSYNC_INTERVAL_SECS: u64 = 10; + const CONCURRENCY_LIMIT: usize = 50; // Limit parallel validation tasks + + let data_path = paths.data_dir(); + + // File paths (reusing companies_update.log) + let input_path = data_path.join("companies_yahoo.jsonl"); + let checkpoint_path = data_path.join("companies_yahoo_cleaned.jsonl"); + let log_path = data_path.join("companies_updates.log"); + + // Check input exists + if !input_path.exists() { + logger::log_warn(" companies_yahoo.jsonl not found, skipping low profile cleansing").await; + return Ok(0); + } + + // === RECOVERY PHASE: Load checkpoint + replay log === + let mut existing_companies: HashMap = HashMap::new(); + let mut processed_names: std::collections::HashSet = std::collections::HashSet::new(); + + if checkpoint_path.exists() { + logger::log_info("Loading checkpoint from companies_yahoo_cleaned.jsonl...").await; + let checkpoint_content = 
tokio::fs::read_to_string(&checkpoint_path).await?; + + for line in checkpoint_content.lines() { + if line.trim().is_empty() || !line.ends_with('}') { + continue; // Skip incomplete lines + } + + match serde_json::from_str::(line) { + Ok(company) => { + processed_names.insert(company.name.clone()); + existing_companies.insert(company.name.clone(), company); + } + Err(e) => { + logger::log_warn(&format!("Skipping invalid checkpoint line: {}", e)).await; + } + } + } + logger::log_info(&format!("Loaded checkpoint with {} companies", existing_companies.len())).await; + } + + if log_path.exists() { + logger::log_info("Replaying update log...").await; + let log_content = tokio::fs::read_to_string(&log_path).await?; + let mut replayed = 0; + + for line in log_content.lines() { + if line.trim().is_empty() || !line.ends_with('}') { + continue; // Skip incomplete lines + } + + match serde_json::from_str::(line) { + Ok(company) => { + processed_names.insert(company.name.clone()); + existing_companies.insert(company.name.clone(), company); + replayed += 1; + } + Err(e) => { + logger::log_warn(&format!("Skipping invalid log line: {}", e)).await; + } + } + } + if replayed > 0 { + logger::log_info(&format!("Replayed {} updates from log", replayed)).await; + } + } + + // === LOAD INPUT COMPANIES === + logger::log_info(&format!("Loading companies from: {:?}", input_path)).await; + let input_companies = load_companies_from_jsonl(&input_path).await?; + logger::log_info(&format!("Loaded {} companies from input", input_companies.len())).await; + + // === BUILD PENDING LIST (smart skip logic) === + let mut pending: Vec = input_companies + .into_iter() + .filter(|company| company_needs_processing(company, &existing_companies)) + .collect(); + + logger::log_info(&format!( + "Initial scan: {} companies need processing ({} already complete)", + pending.len(), + existing_companies.len() + )).await; + + if pending.is_empty() { + logger::log_info(" ✓ All companies already processed").await; + return Ok(existing_companies.len()); + } + + // === SETUP LOG WRITER TASK === + let (write_tx, mut write_rx) = mpsc::channel::(1000); + + let log_file_init = OpenOptions::new() + .create(true) + .append(true) + .open(&log_path) + .await?; + + let checkpoint_path_clone = checkpoint_path.clone(); + let log_path_clone = log_path.clone(); + let existing_companies_writer = Arc::new(tokio::sync::Mutex::new(existing_companies.clone())); + let existing_companies_writer_for_task = Arc::clone(&existing_companies_writer); + + let write_tx_for_writer = write_tx.clone(); + let writer_task = tokio::spawn(async move { + let mut log_file = log_file_init; + let mut writes_since_fsync = 0; + let mut last_fsync = std::time::Instant::now(); + let mut updates_since_checkpoint = 0; + let mut count = 0; + let mut new_count = 0; + let mut updated_count = 0; + + while let Some(cmd) = write_rx.recv().await { + match cmd { + LogCommand::Write(company) => { + // Write to log + let line = serde_json::to_string(&company).unwrap(); + if let Err(e) = log_file.write_all(line.as_bytes()).await { + logger::log_error(&format!("Failed to write to log: {}", e)).await; + break; + } + if let Err(e) = log_file.write_all(b"\n").await { + logger::log_error(&format!("Failed to write newline: {}", e)).await; + break; + } + + writes_since_fsync += 1; + updates_since_checkpoint += 1; + count += 1; + + // Update in-memory state + let mut existing_companies = existing_companies_writer_for_task.lock().await; + let is_update = existing_companies.contains_key(&company.name); + 
existing_companies.insert(company.name.clone(), company); + drop(existing_companies); + + if is_update { + updated_count += 1; + } else { + new_count += 1; + } + + // Batched + time-based fsync + let should_fsync = writes_since_fsync >= FSYNC_BATCH_SIZE + || last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS; + + if should_fsync { + if let Err(e) = log_file.flush().await { + logger::log_error(&format!("Failed to flush: {}", e)).await; + break; + } + if let Err(e) = log_file.sync_data().await { + logger::log_error(&format!("Failed to fsync: {}", e)).await; + break; + } + writes_since_fsync = 0; + last_fsync = std::time::Instant::now(); + } + } + LogCommand::Checkpoint => { + if let Err(e) = log_file.flush().await { + logger::log_error(&format!("Failed to flush before checkpoint: {}", e)).await; + break; + } + if let Err(e) = log_file.sync_data().await { + logger::log_error(&format!("Failed to fsync before checkpoint: {}", e)).await; + break; + } + + let existing_companies = existing_companies_writer_for_task.lock().await; + let companies_vec: Vec<_> = existing_companies.values().cloned().collect(); + drop(existing_companies); + + let temp_path = checkpoint_path_clone.with_extension("tmp"); + match tokio::fs::File::create(&temp_path).await { + Ok(mut temp_file) => { + let mut checkpoint_ok = true; + for company in &companies_vec { + if let Ok(line) = serde_json::to_string(company) { + if temp_file.write_all(line.as_bytes()).await.is_err() || + temp_file.write_all(b"\n").await.is_err() { + checkpoint_ok = false; + break; + } + } + } + + if checkpoint_ok { + if temp_file.flush().await.is_ok() && + temp_file.sync_data().await.is_ok() { + drop(temp_file); + + if tokio::fs::rename(&temp_path, &checkpoint_path_clone).await.is_ok() { + if tokio::fs::remove_file(&log_path_clone).await.is_ok() { + logger::log_info(&format!( + "✓ Checkpoint created ({} companies), log cleared", + companies_vec.len() + )).await; + + if let Ok(new_log) = OpenOptions::new() + .create(true) + .append(true) + .open(&log_path_clone) + .await { + log_file = new_log; + } + } + } + } + } + } + Err(e) => { + logger::log_error(&format!("Failed to create checkpoint temp file: {}", e)).await; + } + } + updates_since_checkpoint = 0; + } + LogCommand::Shutdown => { + logger::log_info("Writer shutting down...").await; + break; + } + } + + // Periodic checkpoint trigger + if updates_since_checkpoint >= CHECKPOINT_INTERVAL { + let _ = write_tx_for_writer.send(LogCommand::Checkpoint).await; + } + } + + // Final fsync + let _ = log_file.flush().await; + let _ = log_file.sync_data().await; + + logger::log_info(&format!( + "Writer finished: {} total ({} new, {} updated)", + count, new_count, updated_count + )).await; + + (count, new_count, updated_count) + }); + + // Wrap paths in Arc for safe sharing across tasks + let paths = Arc::new((*paths).clone()); + + // === MAIN PROCESSING LOOP WITH TASK PANIC ISOLATION === + let total = pending.len(); + let mut tasks = FuturesUnordered::new(); + + // Counters + let processed = Arc::new(AtomicUsize::new(0)); + let valid_count = Arc::new(AtomicUsize::new(0)); + let filtered_low_cap = Arc::new(AtomicUsize::new(0)); + let filtered_no_price = Arc::new(AtomicUsize::new(0)); + let failed_count = Arc::new(AtomicUsize::new(0)); + + // Spawn initial batch + for _ in 0..CONCURRENCY_LIMIT.min(pending.len()) { + if let Some(company) = pending.pop() { + spawn_validation_task( + company, + &yahoo_pool, + &paths, + &write_tx, + shutdown_flag, + &processed, + &valid_count, + &filtered_low_cap, + 
&filtered_no_price, + &failed_count, + total, + &mut tasks, + ); + } + } + + // Process results and spawn new tasks (with task panic isolation) + while let Some(task_result) = tasks.next().await { + // Check for shutdown + if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn("Shutdown signal received, stopping processing").await; + break; + } + + match task_result { + Ok(Ok(Some(_result))) => { + // Success - spawn next task + if let Some(company) = pending.pop() { + spawn_validation_task( + company, + &yahoo_pool, + &paths, + &write_tx, + shutdown_flag, + &processed, + &valid_count, + &filtered_low_cap, + &filtered_no_price, + &failed_count, + total, + &mut tasks, + ); + } + } + Ok(Ok(None)) => { + // Filtered or failed - spawn next task + if let Some(company) = pending.pop() { + spawn_validation_task( + company, + &yahoo_pool, + &paths, + &write_tx, + shutdown_flag, + &processed, + &valid_count, + &filtered_low_cap, + &filtered_no_price, + &failed_count, + total, + &mut tasks, + ); + } + } + Ok(Err(e)) => { + // Processing error + logger::log_error(&format!("Company processing error: {}", e)).await; + + if let Some(company) = pending.pop() { + spawn_validation_task( + company, + &yahoo_pool, + &paths, + &write_tx, + shutdown_flag, + &processed, + &valid_count, + &filtered_low_cap, + &filtered_no_price, + &failed_count, + total, + &mut tasks, + ); + } + } + Err(e) => { + // Task panic (isolated - doesn't crash entire process) + logger::log_error(&format!("Task panic: {}", e)).await; + + if let Some(company) = pending.pop() { + spawn_validation_task( + company, + &yahoo_pool, + &paths, + &write_tx, + shutdown_flag, + &processed, + &valid_count, + &filtered_low_cap, + &filtered_no_price, + &failed_count, + total, + &mut tasks, + ); + } + } + } + } + + logger::log_info("Main processing loop completed").await; + + // Signal writer to finish + let _ = write_tx.send(LogCommand::Checkpoint).await; + let _ = write_tx.send(LogCommand::Shutdown).await; + drop(write_tx); + + // Wait for writer to finish + let (final_count, final_new, final_updated) = writer_task.await + .unwrap_or((0, 0, 0)); + + let final_valid = valid_count.load(Ordering::SeqCst); + let final_filtered_low_cap = filtered_low_cap.load(Ordering::SeqCst); + let final_filtered_no_price = filtered_no_price.load(Ordering::SeqCst); + let final_failed = failed_count.load(Ordering::SeqCst); + + logger::log_info(&format!( + "✅ Completed: {} total companies ({} new, {} updated)", + final_count, final_new, final_updated + )).await; + logger::log_info(&format!( + " Valid: {}, Filtered (low cap): {}, Filtered (no price): {}, Failed: {}", + final_valid, final_filtered_low_cap, final_filtered_no_price, final_failed + )).await; + + // Shutdown Yahoo pool + yahoo_pool.shutdown().await?; + + Ok(final_valid) +} + +/// Helper function to spawn a validation task (reduces code duplication) +fn spawn_validation_task( + company: CompanyCrossPlatformInfo, + yahoo_pool: &Arc, + paths: &Arc, + write_tx: &mpsc::Sender, + shutdown_flag: &Arc, + processed: &Arc, + valid_count: &Arc, + filtered_low_cap: &Arc, + filtered_no_price: &Arc, + failed_count: &Arc, + total: usize, + tasks: &mut FuturesUnordered>>>, +) { + let yahoo_pool_clone = Arc::clone(yahoo_pool); + let paths_clone = Arc::clone(paths); + let shutdown_flag_clone = Arc::clone(shutdown_flag); + let write_tx_clone = write_tx.clone(); + let processed_clone = Arc::clone(processed); + let valid_count_clone = Arc::clone(valid_count); + let filtered_low_cap_clone = Arc::clone(filtered_low_cap); + let 
filtered_no_price_clone = Arc::clone(filtered_no_price); + let failed_count_clone = Arc::clone(failed_count); + + let task = tokio::spawn(async move { + // Check shutdown at start + if shutdown_flag_clone.load(Ordering::SeqCst) { + return Ok::<_, anyhow::Error>(None); + } + + let result = process_company_with_validation( + &company, + &yahoo_pool_clone, + &*paths_clone, + ).await; + + let task_result = match result { + CompanyProcessResult::Valid(validated_company) => { + // Send to writer + let _ = write_tx_clone.send(LogCommand::Write(validated_company.clone())).await; + valid_count_clone.fetch_add(1, Ordering::SeqCst); + Some(CompanyTaskResult { + company: validated_company.clone(), + result: CompanyProcessResult::Valid(validated_company), + }) + } + CompanyProcessResult::FilteredLowCap { name, market_cap } => { + filtered_low_cap_clone.fetch_add(1, Ordering::SeqCst); + if filtered_low_cap_clone.load(Ordering::SeqCst) <= 10 { + logger::log_info(&format!(" Filtered {} - low market cap: {:.0} EUR", name, market_cap)).await; + } + None + } + CompanyProcessResult::FilteredNoPrice { name } => { + filtered_no_price_clone.fetch_add(1, Ordering::SeqCst); + if filtered_no_price_clone.load(Ordering::SeqCst) <= 10 { + logger::log_info(&format!(" Filtered {} - no recent price data", name)).await; + } + None + } + CompanyProcessResult::Failed { company: failed_company, error, is_transient: _ } => { + failed_count_clone.fetch_add(1, Ordering::SeqCst); + logger::log_warn(&format!(" Failed to process '{}': {}", failed_company.name, error)).await; + None + } + }; + + // Progress reporting + let current = processed_clone.fetch_add(1, Ordering::SeqCst) + 1; + if current % 100 == 0 { + logger::log_info(&format!( + "Progress: {}/{} ({} valid, {} low cap, {} no price, {} failed)", + current, total, + valid_count_clone.load(Ordering::SeqCst), + filtered_low_cap_clone.load(Ordering::SeqCst), + filtered_no_price_clone.load(Ordering::SeqCst), + failed_count_clone.load(Ordering::SeqCst) + )).await; + } + + Ok(task_result) + }); + + tasks.push(task); +} + +/// Process a single company with full error categorization +async fn process_company_with_validation( + company: &CompanyCrossPlatformInfo, + yahoo_pool: &Arc, + paths: &DataPaths, +) -> CompanyProcessResult { + // Extract Yahoo ticker + let ticker = match extract_first_yahoo_ticker(company) { + Some(t) => t, + None => { + return CompanyProcessResult::Failed { + company: company.clone(), + error: "No valid Yahoo ticker found".to_string(), + is_transient: false, // Permanent - no ticker means no data + }; + } + }; + + // Fetch core modules from Yahoo + let summary = match yahoo_pool.get_quote_summary( + &ticker, + &QuoteSummaryModule::core_modules(), + ).await { + Ok(s) => s, + Err(e) => { + let error_msg = e.to_string(); + let is_transient = is_transient_error(&error_msg); + + return CompanyProcessResult::Failed { + company: company.clone(), + error: format!("API error fetching summary: {}", error_msg), + is_transient, + }; + } + }; + + // Validate market cap + let market_cap = extract_market_cap(&summary); + if market_cap < 1_000_000.0 { + return CompanyProcessResult::FilteredLowCap { + name: company.name.clone(), + market_cap, + }; + } + + // Validate recent price activity + let has_recent_price = match check_recent_price_activity(yahoo_pool, &ticker).await { + Ok(has) => has, + Err(e) => { + let error_msg = e.to_string(); + let is_transient = is_transient_error(&error_msg); + + return CompanyProcessResult::Failed { + company: company.clone(), + error: 
format!("API error fetching price history: {}", error_msg), + is_transient, + }; + } + }; + + if !has_recent_price { + return CompanyProcessResult::FilteredNoPrice { + name: company.name.clone(), + }; + } + + // Save core data + if let Err(e) = save_company_core_data(paths, &company.name, &summary).await { + logger::log_warn(&format!( + " Failed to save core data for {}: {}", + company.name, e + )).await; + } + + CompanyProcessResult::Valid(company.clone()) +} + +/// Determine if an error is transient (should retry) or permanent (skip) +fn is_transient_error(error: &str) -> bool { + let error_lower = error.to_lowercase(); + + // Transient errors (network, rate limiting, timeouts) + let transient_patterns = [ + "timeout", + "timed out", + "connection", + "network", + "rate limit", + "too many requests", + "429", + "503", + "502", + "500", + "temporarily", + "unavailable", + ]; + + for pattern in &transient_patterns { + if error_lower.contains(pattern) { + return true; + } + } + + // Permanent errors (invalid ticker, no data, parsing errors) + let permanent_patterns = [ + "404", + "not found", + "invalid", + "no data", + "parse error", + "400", + "401", + "403", + ]; + + for pattern in &permanent_patterns { + if error_lower.contains(pattern) { + return false; + } + } + + // Default: treat unknown errors as transient (safer to retry) + true +} + +/// Load companies from JSONL file +async fn load_companies_from_jsonl(path: &std::path::Path) -> anyhow::Result> { + let content = tokio::fs::read_to_string(path).await?; + let mut companies = Vec::new(); + + for line in content.lines() { + if line.trim().is_empty() { + continue; + } + if let Ok(company) = serde_json::from_str::(line) { + companies.push(company); + } + } + + Ok(companies) +} + +fn extract_first_yahoo_ticker(company: &CompanyCrossPlatformInfo) -> Option { + for tickers in company.isin_tickers_map.values() { + for ticker in tickers { + if ticker.starts_with("YAHOO:") + && ticker != "YAHOO:NO_RESULTS" + && ticker != "YAHOO:ERROR" + { + return Some(ticker.trim_start_matches("YAHOO:").to_string()); + } + } + } + None +} + +fn extract_market_cap(summary: &crate::scraper::yahoo::QuoteSummary) -> f64 { + let price_module = match summary.modules.get("price") { + Some(m) => m, + None => return 0.0, + }; + + let market_cap_raw = price_module + .get("marketCap") + .and_then(|v| v.get("raw")) + .and_then(|v| v.as_f64()) + .unwrap_or(0.0); + + let currency = price_module + .get("currency") + .and_then(|v| v.as_str()) + .unwrap_or("USD"); + + let market_cap_eur = match currency { + "EUR" => market_cap_raw, + "USD" => market_cap_raw * 0.92, + "GBP" => market_cap_raw * 1.17, + "JPY" => market_cap_raw * 0.0061, + "CHF" => market_cap_raw * 1.05, + _ => market_cap_raw * 0.92, + }; + + market_cap_eur +} + +async fn check_recent_price_activity( + yahoo_pool: &Arc, + ticker: &str, +) -> anyhow::Result { + let now = Utc::now().timestamp(); + let one_year_ago = now - (365 * 24 * 60 * 60); + let sixty_days_ago = now - (60 * 24 * 60 * 60); + + let chart_data = yahoo_pool.get_chart_data( + ticker, + "1d", + sixty_days_ago, + now, + ).await?; + + if chart_data.quotes.is_empty() { + return Ok(false); + } + + let most_recent_timestamp = chart_data.quotes + .iter() + .map(|q| q.timestamp) + .max() + .unwrap_or(0); + + Ok(most_recent_timestamp >= one_year_ago) +} + +async fn save_company_core_data( + paths: &DataPaths, + company_name: &str, + summary: &crate::scraper::yahoo::QuoteSummary, +) -> anyhow::Result<()> { + use tokio::fs; + + let safe_name = company_name 
+ .replace("/", "_") + .replace("\\", "_") + .replace(":", "_") + .replace("*", "_") + .replace("?", "_") + .replace("\"", "_") + .replace("<", "_") + .replace(">", "_") + .replace("|", "_"); + + let company_dir = paths.corporate_dir().join(&safe_name).join("core"); + fs::create_dir_all(&company_dir).await?; + + let data_path = company_dir.join("data.jsonl"); + let json_line = serde_json::to_string(summary)?; + + let mut file = fs::File::create(&data_path).await?; + file.write_all(json_line.as_bytes()).await?; + file.write_all(b"\n").await?; + file.flush().await?; + + Ok(()) +} + +pub struct ProcessResult { + pub changes: Vec, +} + +pub fn process_batch( + new_events: &[CompanyEvent], + existing: &mut HashMap, + today: &str, +) -> ProcessResult { + let mut changes = Vec::new(); + + for new in new_events { + let key = event_key(new); + + if let Some(old) = existing.get(&key) { + changes.extend(detect_changes(old, new, today)); + existing.insert(key, new.clone()); + continue; + } + + let date_key = format!("{}|{}", new.ticker, new.date); + let mut found_old = None; + for (k, e) in existing.iter() { + if format!("{}|{}", e.ticker, e.date) == date_key && k != &key { + found_old = Some((k.clone(), e.clone())); + break; + } + } + + if let Some((old_key, old_event)) = found_old { + if new.date.as_str() > today { + changes.push(CompanyEventChange { + ticker: new.ticker.clone(), + date: new.date.clone(), + field_changed: "time".to_string(), + old_value: old_event.time.clone(), + new_value: new.time.clone(), + detected_at: Local::now().format("%Y-%m-%d %H:%M:%S").to_string(), + }); + } + existing.remove(&old_key); + } + + existing.insert(key, new.clone()); + } + + ProcessResult { changes } +} + +/// Check if a company needs processing (validation check) +fn company_needs_processing( + company: &CompanyCrossPlatformInfo, + existing_companies: &HashMap, +) -> bool { + // If company exists in cleaned output, skip it + !existing_companies.contains_key(&company.name) +} \ No newline at end of file diff --git a/src/corporate/update_companies_enrich.rs b/src/corporate/update_companies_enrich.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/corporate/yahoo.rs b/src/corporate/yahoo.rs index fa82531..9acb454 100644 --- a/src/corporate/yahoo.rs +++ b/src/corporate/yahoo.rs @@ -1,4 +1,4 @@ -// src/corporate/yahoo.rs - UPDATED WITH DATA INTEGRITY FIXES +// src/corporate/yahoo.rs use super::{types::*, helpers::*, page_validation::*}; use crate::{scraper::webdriver::*, util::{directories::DataPaths}}; use crate::logger; diff --git a/src/scraper/webdriver.rs b/src/scraper/webdriver.rs index ca69469..04e1eda 100644 --- a/src/scraper/webdriver.rs +++ b/src/scraper/webdriver.rs @@ -605,9 +605,6 @@ pub struct ChromeInstance { instance_id: usize, monitoring: Option, - - // ✅ NEW: Track Chrome browser PID for proper cleanup - chrome_pid: Arc>>, } impl ChromeInstance { @@ -637,7 +634,6 @@ impl ChromeInstance { instance_id, monitoring, - chrome_pid: Arc::new(Mutex::new(None)), }) } diff --git a/src/scraper/yahoo.rs b/src/scraper/yahoo.rs index 2756cca..5b790ea 100644 --- a/src/scraper/yahoo.rs +++ b/src/scraper/yahoo.rs @@ -11,7 +11,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::time::{Duration, Instant}; use tokio::sync::{Mutex, Semaphore, RwLock}; use tokio::time::sleep; -use reqwest::{Client, ClientBuilder, retry}; +use reqwest::{Client, ClientBuilder}; use reqwest::Proxy as ReqwestProxy; use std::result::Result::Ok; @@ -20,7 +20,7 @@ const YAHOO_QUOTE_SUMMARY: &str = 
"https://query2.finance.yahoo.com/v10/finance/ const YAHOO_CHART_DATA: &str = "https://query2.finance.yahoo.com/v8/finance/chart/"; const YAHOO_OPTIONS_DATA: &str = "https://query1.finance.yahoo.com/v7/finance/options/"; const YAHOO_SEARCH: &str = "https://query2.finance.yahoo.com/v1/finance/search"; -const YAHOO_HOMEPAGE: &str = "https://finance.yahoo.com"; +const _YAHOO_HOMEPAGE: &str = "https://finance.yahoo.com"; const YAHOO_CRUMB_URL: &str = "https://query2.finance.yahoo.com/v1/test/getcrumb"; // QuoteSummary Modules @@ -129,6 +129,48 @@ impl QuoteSummaryModule { ] } + pub fn financial_modules() -> Vec { + vec![ + Self::IncomeStatementHistory, + Self::IncomeStatementHistoryQuarterly, + Self::BalanceSheetHistory, + Self::BalanceSheetHistoryQuarterly, + Self::CashflowStatementHistory, + Self::CashflowStatementHistoryQuarterly, + ] + } + + pub fn ownership_modules() -> Vec { + vec![ + Self::InstitutionOwnership, + Self::FundOwnership, + Self::MajorDirectHolders, + Self::MajorHoldersBreakdown, + Self::InsiderTransactions, + Self::InsiderHolders, + Self::NetSharePurchaseActivity, + ] + } + + pub fn event_modules() -> Vec { + vec![ + Self::FinancialData, + Self::CalendarEvents, + Self::SecFilings, + ] + } + + pub fn sentiment_modules() -> Vec { + vec![ + Self::EarningsTrend, + Self::IndustryTrend, + Self::IndexTrend, + Self::SectorTrend, + Self::RecommendationTrend, + Self::UpgradeDowngradeHistory, + ] + } + pub fn core_modules() -> Vec { vec![ Self::Price, @@ -324,44 +366,52 @@ impl YahooClient { self.client_id )).await; + // Step 1: Make multiple requests to establish a proper session let mut cookies_established = false; - - // Step 1: Visit Yahoo Finance homepage to establish session and get cookies - let homepage_response = self.client - .get(YAHOO_HOMEPAGE) - .header("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8") - .header("Accept-Language", "en-US,en;q=0.9") - .header("Accept-Encoding", "gzip, deflate, br") - .header("Connection", "keep-alive") - .header("Upgrade-Insecure-Requests", "1") - .header("Sec-Fetch-Dest", "document") - .header("Sec-Fetch-Mode", "navigate") - .header("Sec-Fetch-Site", "none") - .header("Sec-Fetch-User", "?1") - .header("Cache-Control", "max-age=0") - .send() - .await - .context("Failed to fetch Yahoo Finance homepage")?; - - if !homepage_response.status().is_success() { - return Err(anyhow!( - "Failed to load Yahoo Finance homepage: HTTP {}", - homepage_response.status() - )); + + logger::log_info(&format!( + " YahooClient[{}] Session establishment", + self.client_id + )).await; + + // Try different Yahoo domains to get valid cookies + let yahoo_domains = [ + "https://finance.yahoo.com", + "https://www.yahoo.com", + "https://login.yahoo.com", + ]; + + for domain in yahoo_domains.iter() { + let _ = self.client + .get(*domain) + .header("User-Agent", Self::random_user_agent()) + .header("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8") + .header("Accept-Language", "en-US,en;q=0.9") + .header("Accept-Encoding", "gzip, deflate") + .header("DNT", "1") + .header("Connection", "keep-alive") + .header("Upgrade-Insecure-Requests", "1") + .send() + .await + .ok(); + + sleep(Duration::from_millis(500)).await; } // Step 2: Now try to fetch the crumb with enhanced headers let crumb_response = self.client .get(YAHOO_CRUMB_URL) + .header("User-Agent", Self::random_user_agent()) .header("Accept", "*/*") .header("Accept-Language", "en-US,en;q=0.9") .header("Accept-Encoding", "gzip, deflate, br") - .header("Connection", 
"keep-alive") .header("Referer", "https://finance.yahoo.com/") - .header("Sec-Fetch-Dest", "script") - .header("Sec-Fetch-Mode", "no-cors") - .header("Sec-Fetch-Site", "same-site") .header("Origin", "https://finance.yahoo.com") + .header("Sec-Fetch-Dest", "empty") + .header("Sec-Fetch-Mode", "cors") + .header("Sec-Fetch-Site", "same-site") + .header("Pragma", "no-cache") + .header("Cache-Control", "no-cache") .send() .await; @@ -387,7 +437,7 @@ impl YahooClient { cookies_established = true; } else { logger::log_warn(&format!( - " YahooClient[{}] got invalid crumb format", + " YahooClient[{}] got invalid crumb format", self.client_id )).await; } @@ -396,13 +446,13 @@ impl YahooClient { let status = response.status(); let error_text = response.text().await?; logger::log_warn(&format!( - " YahooClient[{}] failed: HTTP {}: {}", + " YahooClient[{}] failed: HTTP {}: {}", self.client_id, status, &error_text[..error_text.len().min(100)] )).await; } Err(e) => { logger::log_warn(&format!( - " YahooClient[{}] connection error: {}", + " YahooClient[{}] connection error: {}", self.client_id, e )).await; } @@ -410,7 +460,7 @@ impl YahooClient { if !cookies_established { return Err(anyhow!( - "Failed to initialize cookies/crumb for YahooClient[{}] after all attempts", + "Failed to initialize cookies/crumb for YahooClient[{}]", self.client_id )); } @@ -1020,23 +1070,45 @@ impl YahooClientPool { for i in 0..actual_pool_size { let proxy_url = proxy_pool.get_proxy_url(i); - let client = YahooClient::new( + match YahooClient::new( i, proxy_url, max_tasks_per_instance, monitoring.clone(), - ).await?; - - clients.push(Arc::new(client)); - - logger::log_info(&format!(" Client {} ready", i + 1)).await; + ).await { + Ok(client) => { + clients.push(Arc::new(client)); + logger::log_info(&format!(" ✓ Client {} ready", i)).await; + } + Err(e) => { + logger::log_warn(&format!( + " ✗ Failed to initialize Client {}: {} - skipping", + i, e + )).await; + // Continue with next client instead of failing entire pool + } + } + } + + if clients.is_empty() { + return Err(anyhow!( + "Failed to initialize any YahooClients - all {} proxy connections failed", + actual_pool_size + )); + } + + if clients.len() < actual_pool_size { + logger::log_warn(&format!( + "⚠ Pool initialized with {}/{} clients (some connections failed)", + clients.len(), actual_pool_size + )).await; + } else { + logger::log_info(&format!( + "✓ YahooClientPool initialized with {} clients (active: {})", + clients.len(), + if rotation_enabled { (clients.len() + 1) / 2 } else { clients.len() } + )).await; } - - logger::log_info(&format!( - "✓ YahooClientPool initialized with {} clients (active: {})", - actual_pool_size, - half_size - )).await; Ok(Self { proxy_pool, diff --git a/src/util/logger.rs b/src/util/logger.rs index 1142519..54f085c 100644 --- a/src/util/logger.rs +++ b/src/util/logger.rs @@ -112,7 +112,6 @@ pub struct PoolMetrics { pub rotation_events: Arc, pub retries: Arc, - // IMPROVEMENT: Neue Metriken für besseres Monitoring pub navigation_timeouts: Arc, pub bot_detection_hits: Arc, pub proxy_failures: Arc,