// src/corporate/update_companies_cleanse.rs

use super::{helpers::*, types::*};
use crate::config::Config;
use crate::corporate::checkpoint_helpers;
use crate::util::directories::DataPaths;
use crate::util::integrity::{DataStage, StateManager, file_reference};
use crate::util::logger;
use crate::scraper::yahoo::{YahooClientPool, QuoteSummaryModule};

use std::result::Result::Ok;
use chrono::Utc;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::sync::mpsc;

/// Result of processing a single company
#[derive(Debug, Clone)]
pub enum CompanyProcessResult {
    Valid(CompanyData),
    FilteredLowCap { name: String, market_cap: f64 },
    FilteredNoPrice { name: String },
    Failed { company: CompanyData, error: String, is_transient: bool },
}

/// Represents a write command to be serialized through the log writer
enum LogCommand {
    Write(CompanyData),
    Checkpoint,
    Shutdown,
}
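
// Illustrative sketch (hypothetical, not called anywhere in this module): the
// command enum above exists so that many concurrent tasks funnel all writes
// through a single writer task over an mpsc channel. Exactly one consumer owns
// the file handle, so lines can never interleave. A minimal standalone version
// of the pattern:
#[allow(dead_code)]
async fn single_writer_pattern_sketch() {
    let (tx, mut rx) = mpsc::channel::<String>(100);
    let writer = tokio::spawn(async move {
        // Sole owner of the output; producers only ever clone `tx`.
        while let Some(line) = rx.recv().await {
            println!("{}", line); // stand-in for file.write_all + fsync
        }
    });
    tx.send("hello".to_string()).await.ok();
    drop(tx); // closing all senders ends the writer loop
    writer.await.ok();
}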
/// Remove companies that are missing essential Yahoo data, so downstream
/// steps only operate on records with at least one usable Yahoo ticker.
pub async fn companies_yahoo_cleansed_no_data(paths: &DataPaths) -> anyhow::Result<usize> {
    let data_path = paths.data_dir();
    let input_path = data_path.join("companies.jsonl");
    let output_path = data_path.join("companies_yahoo.jsonl");

    if !input_path.exists() {
        logger::log_warn("companies.jsonl not found, skipping cleansing").await;
        return Ok(0);
    }

    let manager = StateManager::new(paths.integrity_dir()).await?;
    let step_name = "yahoo_companies_cleansed_no_data";
    let content_reference = file_reference(&output_path);

    if manager.is_step_valid(step_name).await? {
        let output_content = tokio::fs::read_to_string(&output_path).await?;
        let count = output_content.lines()
            .filter(|line| !line.trim().is_empty())
            .count();
        logger::log_info(&format!(" ✓ Found {} companies in companies_yahoo.jsonl", count)).await;
        return Ok(count);
    }

    logger::log_info(" Cleansing companies with missing Yahoo data...").await;
    logger::log_info(&format!(" Reading from: {:?}", input_path)).await;
    logger::log_info(&format!(" Writing to: {:?}", output_path)).await;

    let file = File::open(&input_path).await?;
    let reader = BufReader::new(file);
    let mut lines = reader.lines();

    let mut output_file = File::create(&output_path).await?;
    let mut valid_count = 0;
    let mut removed_count = 0;
    let mut total_count = 0;

    while let Some(line) = lines.next_line().await? {
        if line.trim().is_empty() {
            continue;
        }
        total_count += 1;

        let company: CompanyData = match serde_json::from_str(&line) {
            Ok(c) => c,
            Err(e) => {
                logger::log_warn(&format!(" Failed to parse company on line {}: {}", total_count, e)).await;
                continue;
            }
        };

        // Keep the company only if at least one ticker is a real Yahoo hit:
        // prefix "YAHOO:" but not one of the sentinel values written by
        // earlier pipeline stages.
        let has_valid_yahoo = company.isin_tickers_map
            .as_ref()
            .map(|map| {
                map.values()
                    .flatten()
                    .any(|ticker| {
                        ticker.starts_with("YAHOO:")
                            && ticker != "YAHOO:NO_RESULTS"
                            && ticker != "YAHOO:ERROR"
                    })
            })
            .unwrap_or(false);

        if has_valid_yahoo {
            let json_line = serde_json::to_string(&company)?;
            output_file.write_all(json_line.as_bytes()).await?;
            output_file.write_all(b"\n").await?;
            valid_count += 1;
        } else {
            removed_count += 1;
            if removed_count <= 5 {
                logger::log_info(&format!(" Removed company '{}' (no valid Yahoo ticker)", company.name)).await;
            }
        }

        if total_count % 1000 == 0 {
            logger::log_info(&format!(" Processed {} companies...", total_count)).await;
        }
    }

    output_file.flush().await?;

    logger::log_info(&format!(
        " ✓ Cleansing complete: {} total → {} valid, {} removed",
        total_count, valid_count, removed_count
    )).await;

    // Track completion with:
    // - Content reference: the cleansed output file
    // - Data stage: Data (7-day TTL by default)
    manager.update_entry(
        step_name.to_string(),
        content_reference,
        DataStage::Data,
        None, // Use default TTL (7 days for Data stage)
    ).await?;

    Ok(valid_count)
}
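
// Hedged sketch (hypothetical helper, not wired into the loop above): the
// sentinel check is small enough to factor out and unit-test on its own.
// "YAHOO:NO_RESULTS" and "YAHOO:ERROR" are the placeholder tickers referenced
// by the filter above.
#[allow(dead_code)]
fn is_valid_yahoo_ticker(ticker: &str) -> bool {
    ticker.starts_with("YAHOO:")
        && ticker != "YAHOO:NO_RESULTS"
        && ticker != "YAHOO:ERROR"
}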
/// Yahoo Low Profile Cleansing WITH ABORT-SAFE INCREMENTAL PERSISTENCE
///
/// # Features
/// - Graceful shutdown (abort-safe)
/// - Task panic isolation (tasks fail independently)
/// - Crash-safe persistence (checkpoint + log with fsync)
/// - Smart skip logic (only process incomplete data)
/// - Uses a pending queue instead of a retry mechanism
/// - Reuses companies_updates.log for persistence
///
/// # Persistence Strategy
/// - Checkpoint: companies_yahoo_cleaned.jsonl (atomic state)
/// - Log: companies_updates.log (append-only updates)
/// - On restart: load checkpoint + replay log
/// - Periodic checkpoints (every 50 companies)
/// - Batched fsync (every 10 writes or 10 seconds)
pub async fn companies_yahoo_cleansed_low_profile(
    paths: &DataPaths,
    _config: &Config,
    yahoo_pool: Arc<YahooClientPool>,
    shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<usize> {
    // Configuration constants
    const CHECKPOINT_INTERVAL: usize = 50;
    const FSYNC_BATCH_SIZE: usize = 10;
    const FSYNC_INTERVAL_SECS: u64 = 10;
    const CONCURRENCY_LIMIT: usize = 50; // Limit parallel validation tasks

    let data_path = paths.data_dir();

    // File paths (reusing companies_updates.log)
    let input_path = data_path.join("companies_yahoo.jsonl");
    let checkpoint_path = data_path.join("companies_yahoo_cleaned.jsonl");
    let log_path = data_path.join("companies_updates.log");

    // Check input exists
    if !input_path.exists() {
        logger::log_warn(" companies_yahoo.jsonl not found, skipping low profile cleansing").await;
        return Ok(0);
    }

    let manager = StateManager::new(paths.integrity_dir()).await?;
    let step_name = "yahoo_companies_cleansed_low_profile";
    let content_reference = file_reference(&checkpoint_path);

    if manager.is_step_valid(step_name).await? {
        let checkpoint_content = tokio::fs::read_to_string(&checkpoint_path).await?;
        let count = checkpoint_content.lines()
            .filter(|line| !line.trim().is_empty())
            .count();
        logger::log_info(&format!(" ✓ Found {} companies in companies_yahoo_cleaned.jsonl", count)).await;
        return Ok(count);
    }

    logger::log_info(" Cleansing companies with low Yahoo profile...").await;

    // === RECOVERY PHASE: Load checkpoint + replay log ===
    let mut existing_companies: HashMap<String, CompanyData> = HashMap::new();
    // Kept for parity with the recovery bookkeeping; only existing_companies
    // drives the skip logic below.
    let mut processed_names: std::collections::HashSet<String> = std::collections::HashSet::new();

    if checkpoint_path.exists() {
        logger::log_info("Loading checkpoint from companies_yahoo_cleaned.jsonl...").await;
        let checkpoint_content = tokio::fs::read_to_string(&checkpoint_path).await?;
        for line in checkpoint_content.lines() {
            if line.trim().is_empty() || !line.ends_with('}') {
                continue; // Skip incomplete (torn) lines
            }
            match serde_json::from_str::<CompanyData>(line) {
                Ok(company) => {
                    processed_names.insert(company.name.clone());
                    existing_companies.insert(company.name.clone(), company);
                }
                Err(e) => {
                    logger::log_warn(&format!("Skipping invalid checkpoint line: {}", e)).await;
                }
            }
        }
        logger::log_info(&format!("Loaded checkpoint with {} companies", existing_companies.len())).await;
    }

    if log_path.exists() {
        logger::log_info("Replaying update log...").await;
        let log_content = tokio::fs::read_to_string(&log_path).await?;
        let mut replayed = 0;
        for line in log_content.lines() {
            if line.trim().is_empty() || !line.ends_with('}') {
                continue; // Skip incomplete (torn) lines
            }
            match serde_json::from_str::<CompanyData>(line) {
                Ok(company) => {
                    processed_names.insert(company.name.clone());
                    existing_companies.insert(company.name.clone(), company);
                    replayed += 1;
                }
                Err(e) => {
                    logger::log_warn(&format!("Skipping invalid log line: {}", e)).await;
                }
            }
        }
        if replayed > 0 {
            logger::log_info(&format!("Replayed {} updates from log", replayed)).await;
        }
    }

    // === LOAD INPUT COMPANIES ===
    logger::log_info(&format!("Loading companies from: {:?}", input_path)).await;
    let input_companies = load_companies_from_jsonl(&input_path).await?;
    logger::log_info(&format!("Loaded {} companies from input", input_companies.len())).await;

    // === BUILD PENDING LIST (smart skip logic) ===
    let mut pending: Vec<CompanyData> = input_companies
        .into_iter()
        .filter(|company| company_needs_processing(company, &existing_companies))
        .collect();

    logger::log_info(&format!(
        "Initial scan: {} companies need processing ({} already complete)",
        pending.len(),
        existing_companies.len()
    )).await;

    // === CONSOLIDATE LOG BEFORE EARLY EXIT ===
    if pending.is_empty() {
        logger::log_info(" ✓ All companies already processed").await;
        // Consolidate log into checkpoint before exiting
        if checkpoint_helpers::log_has_content(&log_path).await {
            checkpoint_helpers::consolidate_checkpoint(&checkpoint_path, &log_path, &existing_companies).await?;
        }
        return Ok(existing_companies.len());
    }
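
    // Hedged aside: the replay loops above guard against torn writes with a
    // cheap heuristic -- a JSONL line that was cut off mid-write cannot end
    // in '}'. A standalone version of that guard (hypothetical, unused here):
    #[allow(dead_code)]
    fn looks_complete_jsonl_line(line: &str) -> bool {
        !line.trim().is_empty() && line.ends_with('}')
    }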

    // === SETUP LOG WRITER TASK ===
    let (write_tx, mut write_rx) = mpsc::channel::<LogCommand>(1000);

    let log_file_init = OpenOptions::new()
        .create(true)
        .append(true)
        .open(&log_path)
        .await?;

    let checkpoint_path_clone = checkpoint_path.clone();
    let log_path_clone = log_path.clone();
    let existing_companies_writer = Arc::new(tokio::sync::Mutex::new(existing_companies.clone()));
    let existing_companies_writer_for_task = Arc::clone(&existing_companies_writer);
    let write_tx_for_writer = write_tx.clone();

    let writer_task = tokio::spawn(async move {
        let mut log_file = log_file_init;
        let mut writes_since_fsync = 0;
        let mut last_fsync = std::time::Instant::now();
        let mut updates_since_checkpoint = 0;
        let mut count = 0;
        let mut new_count = 0;
        let mut updated_count = 0;

        while let Some(cmd) = write_rx.recv().await {
            match cmd {
                LogCommand::Write(company) => {
                    // Write to log; serializing a struct we just deserialized
                    // should not fail, hence the unwrap.
                    let line = serde_json::to_string(&company).unwrap();
                    if let Err(e) = log_file.write_all(line.as_bytes()).await {
                        logger::log_error(&format!("Failed to write to log: {}", e)).await;
                        break;
                    }
                    if let Err(e) = log_file.write_all(b"\n").await {
                        logger::log_error(&format!("Failed to write newline: {}", e)).await;
                        break;
                    }
                    writes_since_fsync += 1;
                    updates_since_checkpoint += 1;
                    count += 1;

                    // Update in-memory state
                    let mut existing_companies = existing_companies_writer_for_task.lock().await;
                    let is_update = existing_companies.contains_key(&company.name);
                    existing_companies.insert(company.name.clone(), company);
                    drop(existing_companies);

                    if is_update {
                        updated_count += 1;
                    } else {
                        new_count += 1;
                    }

                    // Batched + time-based fsync
                    let should_fsync = writes_since_fsync >= FSYNC_BATCH_SIZE
                        || last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS;
                    if should_fsync {
                        if let Err(e) = log_file.flush().await {
                            logger::log_error(&format!("Failed to flush: {}", e)).await;
                            break;
                        }
                        if let Err(e) = log_file.sync_data().await {
                            logger::log_error(&format!("Failed to fsync: {}", e)).await;
                            break;
                        }
                        writes_since_fsync = 0;
                        last_fsync = std::time::Instant::now();
                    }
                }
                LogCommand::Checkpoint => {
                    if let Err(e) = log_file.flush().await {
                        logger::log_error(&format!("Failed to flush before checkpoint: {}", e)).await;
                        break;
                    }
                    if let Err(e) = log_file.sync_data().await {
                        logger::log_error(&format!("Failed to fsync before checkpoint: {}", e)).await;
                        break;
                    }

                    let existing_companies = existing_companies_writer_for_task.lock().await;
                    let companies_vec: Vec<_> = existing_companies.values().cloned().collect();
                    drop(existing_companies);

                    // Atomic checkpoint: write a temp file, fsync it, then
                    // rename over the old checkpoint so readers never observe
                    // a partially written file.
                    let temp_path = checkpoint_path_clone.with_extension("tmp");
                    match tokio::fs::File::create(&temp_path).await {
                        Ok(mut temp_file) => {
                            let mut checkpoint_ok = true;
                            for company in &companies_vec {
                                if let Ok(line) = serde_json::to_string(company) {
                                    if temp_file.write_all(line.as_bytes()).await.is_err()
                                        || temp_file.write_all(b"\n").await.is_err()
                                    {
                                        checkpoint_ok = false;
                                        break;
                                    }
                                }
                            }
                            if checkpoint_ok {
                                if temp_file.flush().await.is_ok() && temp_file.sync_data().await.is_ok() {
                                    drop(temp_file);
                                    if tokio::fs::rename(&temp_path, &checkpoint_path_clone).await.is_ok() {
                                        if tokio::fs::remove_file(&log_path_clone).await.is_ok() {
                                            logger::log_info(&format!(
                                                "✓ Checkpoint created ({} companies), log cleared",
                                                companies_vec.len()
                                            )).await;
                                            if let Ok(new_log) = OpenOptions::new()
                                                .create(true)
                                                .append(true)
                                                .open(&log_path_clone)
                                                .await
                                            {
                                                log_file = new_log;
                                            }
                                        }
                                    }
                                }
                            }
                        }
                        Err(e) => {
                            logger::log_error(&format!("Failed to create checkpoint temp file: {}", e)).await;
                        }
                    }
                    updates_since_checkpoint = 0;
                }
                LogCommand::Shutdown => {
                    logger::log_info("Writer shutting down...").await;
                    break;
                }
            }

            // Periodic checkpoint trigger. Use try_send: this task is the sole
            // consumer of the channel, so an awaited send on a full queue here
            // would deadlock against itself. Reset the counter on enqueue so a
            // single trigger is not re-sent on every subsequent write.
            if updates_since_checkpoint >= CHECKPOINT_INTERVAL {
                let _ = write_tx_for_writer.try_send(LogCommand::Checkpoint);
                updates_since_checkpoint = 0;
            }
        }

        // Final fsync
        let _ = log_file.flush().await;
        let _ = log_file.sync_data().await;
        logger::log_info(&format!(
            "Writer finished: {} total ({} new, {} updated)",
            count, new_count, updated_count
        )).await;

        (count, new_count, updated_count)
    });
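
    // Hedged aside: the checkpoint arm above follows the classic crash-safe
    // recipe -- write temp file, flush + fsync, rename over the target. A
    // minimal standalone version (hypothetical, not used by this pipeline):
    #[allow(dead_code)]
    async fn atomic_write_sketch(target: &std::path::Path, bytes: &[u8]) -> std::io::Result<()> {
        let tmp = target.with_extension("tmp");
        let mut f = tokio::fs::File::create(&tmp).await?;
        f.write_all(bytes).await?;
        f.flush().await?;
        f.sync_data().await?; // data is durable before the rename commits it
        drop(f);
        tokio::fs::rename(&tmp, target).await // rename is atomic on POSIX
    }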

    // Wrap paths in Arc for safe sharing across tasks
    let paths = Arc::new((*paths).clone());

    // === MAIN PROCESSING LOOP WITH TASK PANIC ISOLATION ===
    let total = pending.len();
    let mut tasks = FuturesUnordered::new();

    // Counters
    let processed = Arc::new(AtomicUsize::new(0));
    let valid_count = Arc::new(AtomicUsize::new(0));
    let filtered_low_cap = Arc::new(AtomicUsize::new(0));
    let filtered_no_price = Arc::new(AtomicUsize::new(0));
    let failed_count = Arc::new(AtomicUsize::new(0));

    // Spawn initial batch
    for _ in 0..CONCURRENCY_LIMIT.min(pending.len()) {
        if let Some(company) = pending.pop() {
            spawn_validation_task(
                company, &yahoo_pool, &paths, &write_tx, shutdown_flag,
                &processed, &valid_count, &filtered_low_cap, &filtered_no_price,
                &failed_count, total, &mut tasks,
            );
        }
    }

    // Process results and spawn new tasks (with task panic isolation).
    // Every completion refills the pool with one new task, regardless of
    // whether the previous task succeeded, errored, or panicked.
    while let Some(task_result) = tasks.next().await {
        // Check for shutdown
        if shutdown_flag.load(Ordering::SeqCst) {
            logger::log_warn("Shutdown signal received, stopping processing").await;
            break;
        }

        match task_result {
            Ok(Ok(_)) => {
                // Success
            }
            Ok(Err(e)) => {
                // Processing error
                logger::log_error(&format!("Company processing error: {}", e)).await;
            }
            Err(e) => {
                // Task panic (isolated - doesn't crash the entire process)
                logger::log_error(&format!("Task panic: {}", e)).await;
            }
        }

        if let Some(company) = pending.pop() {
            spawn_validation_task(
                company, &yahoo_pool, &paths, &write_tx, shutdown_flag,
                &processed, &valid_count, &filtered_low_cap, &filtered_no_price,
                &failed_count, total, &mut tasks,
            );
        }
    }

    logger::log_info("Main processing loop completed").await;

    // Signal writer to finish
    let _ = write_tx.send(LogCommand::Checkpoint).await;
    let _ = write_tx.send(LogCommand::Shutdown).await;
    drop(write_tx);

    // Wait for writer to finish
    let (final_count, final_new, final_updated) = writer_task.await
        .unwrap_or((0, 0, 0));

    let final_valid = valid_count.load(Ordering::SeqCst);
    let final_filtered_low_cap = filtered_low_cap.load(Ordering::SeqCst);
    let final_filtered_no_price = filtered_no_price.load(Ordering::SeqCst);
    let final_failed = failed_count.load(Ordering::SeqCst);

    logger::log_info(&format!(
        "✅ Completed: {} total companies ({} new, {} updated)",
        final_count, final_new, final_updated
    )).await;
    logger::log_info(&format!(
        " Valid: {}, Filtered (low cap): {}, Filtered (no price): {}, Failed: {}",
        final_valid, final_filtered_low_cap, final_filtered_no_price, final_failed
    )).await;

    // === VERIFY AND RECREATE FINAL OUTPUT ===
    logger::log_info("Verifying final output integrity...").await;

    let final_companies_map = existing_companies_writer.lock().await;
    let expected_count = final_companies_map.len();

    // Always write a final consolidated checkpoint (temp file + atomic rename)
    let temp_checkpoint = checkpoint_path.with_extension("tmp");
    let mut temp_file = File::create(&temp_checkpoint).await?;
    for company in final_companies_map.values() {
        let json_line = serde_json::to_string(company)?;
        temp_file.write_all(json_line.as_bytes()).await?;
        temp_file.write_all(b"\n").await?;
    }
    temp_file.flush().await?;
    temp_file.sync_data().await?;
    drop(temp_file);
    tokio::fs::rename(&temp_checkpoint, &checkpoint_path).await?;
    drop(final_companies_map);

    // Clear log since everything is in the checkpoint
    if log_path.exists() {
        tokio::fs::remove_file(&log_path).await.ok();
    }

    logger::log_info(&format!("✓ Final output: {} companies in {:?}",
        expected_count, checkpoint_path)).await;

    // Shutdown Yahoo pool
    yahoo_pool.shutdown().await?;

    // Track completion with:
    // - Content reference: the consolidated checkpoint file
    // - Data stage: Data (7-day TTL by default)
    manager.update_entry(
        step_name.to_string(),
        content_reference,
        DataStage::Data,
        None, // Use default TTL (7 days for Data stage)
    ).await?;

    Ok(final_count)
}
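
// Hedged aside: the refill loop above implements bounded concurrency with
// FuturesUnordered -- seed up to LIMIT tasks, then spawn one replacement per
// completion. A minimal standalone sketch of the pattern (hypothetical names):
#[allow(dead_code)]
async fn bounded_concurrency_sketch(mut work: Vec<u64>, limit: usize) {
    let mut in_flight = FuturesUnordered::new();
    for _ in 0..limit.min(work.len()) {
        if let Some(item) = work.pop() {
            in_flight.push(tokio::spawn(async move { item * 2 }));
        }
    }
    while let Some(_finished) = in_flight.next().await {
        // One slot freed up: top the pool back up to `limit`.
        if let Some(item) = work.pop() {
            in_flight.push(tokio::spawn(async move { item * 2 }));
        }
    }
}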

/// Helper function to spawn a validation task (reduces code duplication)
fn spawn_validation_task(
    company: CompanyData,
    yahoo_pool: &Arc<YahooClientPool>,
    paths: &Arc<DataPaths>,
    write_tx: &mpsc::Sender<LogCommand>,
    shutdown_flag: &Arc<AtomicBool>,
    processed: &Arc<AtomicUsize>,
    valid_count: &Arc<AtomicUsize>,
    filtered_low_cap: &Arc<AtomicUsize>,
    filtered_no_price: &Arc<AtomicUsize>,
    failed_count: &Arc<AtomicUsize>,
    total: usize,
    tasks: &mut FuturesUnordered<tokio::task::JoinHandle<anyhow::Result<Option<()>>>>,
) {
    let yahoo_pool_clone = Arc::clone(yahoo_pool);
    let paths_clone = Arc::clone(paths);
    let shutdown_flag_clone = Arc::clone(shutdown_flag);
    let write_tx_clone = write_tx.clone();
    let processed_clone = Arc::clone(processed);
    let valid_count_clone = Arc::clone(valid_count);
    let filtered_low_cap_clone = Arc::clone(filtered_low_cap);
    let filtered_no_price_clone = Arc::clone(filtered_no_price);
    let failed_count_clone = Arc::clone(failed_count);

    let task = tokio::spawn(async move {
        // Check shutdown at start
        if shutdown_flag_clone.load(Ordering::SeqCst) {
            return Ok::<_, anyhow::Error>(None);
        }

        let result = process_company_with_validation(
            &company,
            &yahoo_pool_clone,
            &*paths_clone,
        ).await;

        match result {
            CompanyProcessResult::Valid(validated_company) => {
                // Send to writer
                let _ = write_tx_clone.send(LogCommand::Write(validated_company)).await;
                valid_count_clone.fetch_add(1, Ordering::SeqCst);
            }
            CompanyProcessResult::FilteredLowCap { name, market_cap } => {
                filtered_low_cap_clone.fetch_add(1, Ordering::SeqCst);
                if filtered_low_cap_clone.load(Ordering::SeqCst) <= 10 {
                    logger::log_info(&format!(" Filtered {} - low market cap: {:.0} EUR", name, market_cap)).await;
                }
            }
            CompanyProcessResult::FilteredNoPrice { name } => {
                filtered_no_price_clone.fetch_add(1, Ordering::SeqCst);
                if filtered_no_price_clone.load(Ordering::SeqCst) <= 10 {
                    logger::log_info(&format!(" Filtered {} - no recent price data", name)).await;
                }
            }
            CompanyProcessResult::Failed { company: failed_company, error, is_transient: _ } => {
                failed_count_clone.fetch_add(1, Ordering::SeqCst);
                logger::log_warn(&format!(" Failed to process '{}': {}", failed_company.name, error)).await;
            }
        }

        // Progress reporting
        let current = processed_clone.fetch_add(1, Ordering::SeqCst) + 1;
        if current % 100 == 0 {
            logger::log_info(&format!(
                "Progress: {}/{} ({} valid, {} low cap, {} no price, {} failed)",
                current,
                total,
                valid_count_clone.load(Ordering::SeqCst),
                filtered_low_cap_clone.load(Ordering::SeqCst),
                filtered_no_price_clone.load(Ordering::SeqCst),
                failed_count_clone.load(Ordering::SeqCst)
            )).await;
        }

        Ok(None::<()>)
    });

    tasks.push(task);
}
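
// Hedged aside: tokio::spawn provides the panic isolation the main loop
// relies on -- a panicking task resolves its JoinHandle to Err(JoinError)
// instead of unwinding into the caller. Minimal demonstration (hypothetical):
#[allow(dead_code)]
async fn panic_isolation_sketch() {
    let handle: tokio::task::JoinHandle<()> = tokio::spawn(async {
        panic!("this panic stays inside the task");
    });
    // The caller observes the panic as a JoinError instead of crashing.
    assert!(handle.await.is_err());
}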

/// Process a single company with full error categorization
async fn process_company_with_validation(
    company: &CompanyData,
    yahoo_pool: &Arc<YahooClientPool>,
    paths: &DataPaths,
) -> CompanyProcessResult {
    // Extract Yahoo ticker
    let ticker = match extract_first_yahoo_ticker(company) {
        Some(t) => t,
        None => {
            return CompanyProcessResult::Failed {
                company: company.clone(),
                error: "No valid Yahoo ticker found".to_string(),
                is_transient: false, // Permanent - no ticker means no data
            };
        }
    };

    // Fetch core modules from Yahoo
    let summary = match yahoo_pool.get_quote_summary(
        &ticker,
        &QuoteSummaryModule::core_modules(),
    ).await {
        Ok(s) => s,
        Err(e) => {
            let error_msg = e.to_string();
            let is_transient = is_transient_error(&error_msg);
            return CompanyProcessResult::Failed {
                company: company.clone(),
                error: format!("API error fetching summary: {}", error_msg),
                is_transient,
            };
        }
    };

    // Validate market cap (threshold: 100M EUR)
    let market_cap = extract_market_cap(&summary);
    if market_cap < 100_000_000.0 {
        return CompanyProcessResult::FilteredLowCap {
            name: company.name.clone(),
            market_cap,
        };
    }

    // Validate recent price activity
    let has_recent_price = match check_recent_price_activity(yahoo_pool, &ticker).await {
        Ok(has) => has,
        Err(e) => {
            let error_msg = e.to_string();
            let is_transient = is_transient_error(&error_msg);
            return CompanyProcessResult::Failed {
                company: company.clone(),
                error: format!("API error fetching price history: {}", error_msg),
                is_transient,
            };
        }
    };

    if !has_recent_price {
        return CompanyProcessResult::FilteredNoPrice {
            name: company.name.clone(),
        };
    }

    // Save core data (non-fatal on failure: the company is still valid)
    if let Err(e) = save_company_core_data(paths, &company.name, &summary).await {
        logger::log_warn(&format!(
            " Failed to save core data for {}: {}",
            company.name, e
        )).await;
    }

    CompanyProcessResult::Valid(company.clone())
}

/// Determine if an error is transient (should retry) or permanent (skip)
fn is_transient_error(error: &str) -> bool {
    let error_lower = error.to_lowercase();

    // Transient errors (network, rate limiting, timeouts)
    let transient_patterns = [
        "timeout", "timed out", "connection", "network",
        "rate limit", "too many requests", "429", "503", "502", "500",
        "temporarily", "unavailable",
    ];
    for pattern in &transient_patterns {
        if error_lower.contains(pattern) {
            return true;
        }
    }

    // Permanent errors (invalid ticker, no data, parsing errors)
    let permanent_patterns = [
        "404", "not found", "invalid", "no data", "parse error",
        "400", "401", "403",
    ];
    for pattern in &permanent_patterns {
        if error_lower.contains(pattern) {
            return false;
        }
    }

    // Default: treat unknown errors as transient (safer to retry)
    true
}

fn extract_market_cap(summary: &crate::scraper::yahoo::QuoteSummary) -> f64 {
    let price_module = match summary.modules.get("price") {
        Some(m) => m,
        None => return 0.0,
    };

    let market_cap_raw = price_module
        .get("marketCap")
        .and_then(|v| v.get("raw"))
        .and_then(|v| v.as_f64())
        .unwrap_or(0.0);

    let currency = price_module
        .get("currency")
        .and_then(|v| v.as_str())
        .unwrap_or("USD");

    // Static, approximate FX rates; unknown currencies fall back to the USD rate.
    match currency {
        "EUR" => market_cap_raw,
        "USD" => market_cap_raw * 0.92,
        "GBP" => market_cap_raw * 1.17,
        "JPY" => market_cap_raw * 0.0061,
        "CHF" => market_cap_raw * 1.05,
        _ => market_cap_raw * 0.92,
    }
}
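
// Hedged aside: extract_market_cap above expects the Yahoo "price" module to
// carry a {"marketCap": {"raw": ...}, "currency": "..."} shape, inferred from
// its accessor chain. A minimal illustration with serde_json; the literal
// values (and the "fmt" field) are made up:
#[allow(dead_code)]
fn market_cap_shape_example() -> serde_json::Value {
    serde_json::json!({
        "price": {
            "marketCap": { "raw": 2.5e9, "fmt": "2.5B" },
            "currency": "USD"
        }
    })
}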

async fn check_recent_price_activity(
    yahoo_pool: &Arc<YahooClientPool>,
    ticker: &str,
) -> anyhow::Result<bool> {
    let now = Utc::now().timestamp();
    let one_year_ago = now - (365 * 24 * 60 * 60);
    let sixty_days_ago = now - (60 * 24 * 60 * 60);

    let chart_data = yahoo_pool.get_chart_data(
        ticker,
        "1d",
        sixty_days_ago,
        now,
    ).await?;

    if chart_data.quotes.is_empty() {
        return Ok(false);
    }

    // Note: because the fetch window covers only the last 60 days, any
    // returned quote already satisfies the one-year bound; the comparison is
    // kept as a defensive guard against out-of-range data.
    let most_recent_timestamp = chart_data.quotes
        .iter()
        .map(|q| q.timestamp)
        .max()
        .unwrap_or(0);

    Ok(most_recent_timestamp >= one_year_ago)
}

async fn save_company_core_data(
    paths: &DataPaths,
    company_name: &str,
    summary: &crate::scraper::yahoo::QuoteSummary,
) -> anyhow::Result<()> {
    use tokio::fs;

    let safe_name = sanitize_company_name(company_name);
    let company_dir = paths.corporate_dir().join(&safe_name).join("core");
    fs::create_dir_all(&company_dir).await?;

    let data_path = company_dir.join("data.jsonl");
    let json_line = serde_json::to_string(summary)?;

    let mut file = fs::File::create(&data_path).await?;
    file.write_all(json_line.as_bytes()).await?;
    file.write_all(b"\n").await?;
    file.flush().await?;

    Ok(())
}

/// Check if a company needs processing (validation check)
fn company_needs_processing(
    company: &CompanyData,
    existing_companies: &HashMap<String, CompanyData>,
) -> bool {
    // If the company already exists in the cleaned output, skip it
    !existing_companies.contains_key(&company.name)
}
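
// Hedged sketch: a few self-contained tests over the pure string helpers.
// These need no fixtures or network access. is_transient_error is part of
// this module; is_valid_yahoo_ticker is the hypothetical helper added above.
#[cfg(test)]
mod cleanse_tests {
    use super::*;

    #[test]
    fn transient_errors_are_retryable() {
        assert!(is_transient_error("connection reset by peer"));
        assert!(is_transient_error("HTTP 429 Too Many Requests"));
        assert!(!is_transient_error("HTTP 404 Not Found"));
        // Unknown errors default to transient (safer to retry).
        assert!(is_transient_error("something unexpected"));
    }

    #[test]
    fn sentinel_tickers_are_rejected() {
        assert!(is_valid_yahoo_ticker("YAHOO:AAPL"));
        assert!(!is_valid_yahoo_ticker("YAHOO:NO_RESULTS"));
        assert!(!is_valid_yahoo_ticker("YAHOO:ERROR"));
        assert!(!is_valid_yahoo_ticker("ISIN:US0378331005"));
    }
}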