// src/corporate/update.rs - WITH ABORT-SAFE INCREMENTAL PERSISTENCE
use super::{scraper::*, storage::*, helpers::*, types::*, openfigi::*, atomic_writer::*};
use crate::config::Config;
use crate::corporate::update_parallel::build_companies_jsonl_streaming_parallel;
use crate::util::directories::DataPaths;
use crate::util::logger;
use crate::scraper::webdriver::ChromeDriverPool;
use crate::scraper::yahoo::{YahooClientPool, QuoteSummaryModule};

use std::result::Result::Ok;
use chrono::{Local, Utc};
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use futures::stream::{FuturesUnordered, StreamExt};
use serde_json::json;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use std::time::Duration;

/// Result of processing a single company
#[derive(Debug, Clone)]
pub enum CompanyProcessResult {
    Valid(CompanyCrossPlatformInfo),
    FilteredLowCap { name: String, market_cap: f64 },
    FilteredNoPrice { name: String },
    Failed { company: CompanyCrossPlatformInfo, error: String, is_transient: bool },
}

/// Represents a write command to be serialized through the log writer
enum LogCommand {
    Write(CompanyCrossPlatformInfo),
    Checkpoint,
    Shutdown,
}

/// Result from processing a single company with priority
struct CompanyTaskResult {
    company: CompanyCrossPlatformInfo,
    result: CompanyProcessResult,
}

/// Check if a company needs processing (validation check)
fn company_needs_processing(
    company: &CompanyCrossPlatformInfo,
    existing_companies: &HashMap<String, CompanyCrossPlatformInfo>,
) -> bool {
    // If company exists in cleaned output, skip it
    !existing_companies.contains_key(&company.name)
}
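// A minimal illustration of how the `CompanyProcessResult` variants above can be
// consumed, e.g. for logging or metrics. `result_label` is a hypothetical helper
// sketched here for clarity; it is not referenced elsewhere in this module.
#[allow(dead_code)]
fn result_label(result: &CompanyProcessResult) -> &'static str {
    match result {
        CompanyProcessResult::Valid(_) => "valid",
        CompanyProcessResult::FilteredLowCap { .. } => "filtered_low_cap",
        CompanyProcessResult::FilteredNoPrice { .. } => "filtered_no_price",
        CompanyProcessResult::Failed { is_transient: true, .. } => "failed_transient",
        CompanyProcessResult::Failed { .. } => "failed_permanent",
    }
}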
/// Main corporate update entry point with shutdown awareness
pub async fn run_full_update(
    _config: &Config,
    pool: &Arc<ChromeDriverPool>,
    shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<()> {
    logger::log_info("=== Corporate Update (STREAMING MODE WITH DATA INTEGRITY) ===").await;

    let paths = DataPaths::new(".")?;

    logger::log_info("Step 1: Downloading GLEIF CSV...").await;
    let gleif_csv_path = match download_isin_lei_csv().await? {
        Some(p) => {
            logger::log_info(&format!(" ✓ GLEIF CSV at: {}", p)).await;
            p
        }
        None => {
            logger::log_warn(" ✗ Could not obtain GLEIF CSV").await;
            return Ok(());
        }
    };

    if shutdown_flag.load(Ordering::SeqCst) {
        logger::log_warn("Shutdown detected after GLEIF download").await;
        return Ok(());
    }

    logger::log_info("Step 2: Loading OpenFIGI metadata...").await;
    load_figi_type_lists().await.ok();
    logger::log_info(" ✓ OpenFIGI metadata loaded").await;

    if shutdown_flag.load(Ordering::SeqCst) {
        logger::log_warn("Shutdown detected after OpenFIGI load").await;
        return Ok(());
    }

    logger::log_info("Step 3: Checking LEI-FIGI mapping status...").await;
    let all_mapped = ensure_all_leis_mapped(&gleif_csv_path, None).await?;
    if !all_mapped {
        logger::log_warn(" ⚠ Some LEIs failed to map - continuing with partial data").await;
    } else {
        logger::log_info(" ✓ All LEIs successfully mapped").await;
    }

    if shutdown_flag.load(Ordering::SeqCst) {
        logger::log_warn("Shutdown detected after LEI-FIGI mapping").await;
        return Ok(());
    }

    logger::log_info("Step 4: Building securities map (streaming)...").await;
    let date_dir = find_most_recent_figi_date_dir(&paths).await?;
    if let Some(date_dir) = date_dir {
        logger::log_info(&format!(" Using FIGI data from: {:?}", date_dir)).await;
        load_or_build_all_securities(&date_dir).await?;
        logger::log_info(" ✓ Securities map updated").await;
    } else {
        logger::log_warn(" ✗ No FIGI data directory found").await;
    }

    if shutdown_flag.load(Ordering::SeqCst) {
        logger::log_warn("Shutdown detected after securities map build").await;
        return Ok(());
    }

    logger::log_info("Step 5: Building companies.jsonl with parallel processing and validation...").await;
    let count = build_companies_jsonl_streaming_parallel(&paths, pool, shutdown_flag, _config, &None).await?;
    logger::log_info(&format!(" ✓ Saved {} companies", count)).await;

    if shutdown_flag.load(Ordering::SeqCst) {
        logger::log_warn("Shutdown detected after companies.jsonl build").await;
        return Ok(());
    }

    logger::log_info("Step 6: Cleaning up companies with missing essential data...").await;
    let cleansed_count = companies_yahoo_cleansed_no_data(&paths).await?;
    logger::log_info(&format!(
        " ✓ {} companies found on Yahoo ready for further use in companies_yahoo.jsonl",
        cleansed_count
    )).await;

    if shutdown_flag.load(Ordering::SeqCst) {
        logger::log_warn("Shutdown detected after Yahoo data cleansing").await;
        return Ok(());
    }

    logger::log_info("Step 7: Cleaning up companies with too low a profile (with abort-safe persistence)...").await;
    let proxy_pool = pool.get_proxy_pool()
        .ok_or_else(|| anyhow::anyhow!("ChromeDriverPool must be created with VPN proxy rotation enabled"))?;
    let cleansed_count = companies_yahoo_cleansed_low_profile(&paths, _config, proxy_pool, shutdown_flag).await?;
    logger::log_info(&format!(
        " ✓ {} companies with sufficient profile ready for analytics",
        cleansed_count
    )).await;

    if !shutdown_flag.load(Ordering::SeqCst) {
        logger::log_info("Step 8: Processing events (using index)...").await;
        let _event_index = build_event_index(&paths).await?;
        logger::log_info(" ✓ Event index built").await;
    } else {
        logger::log_warn("Shutdown detected, skipping event index build").await;
    }

    logger::log_info("✅ Corporate update complete").await;
    Ok(())
}
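// How the `shutdown_flag` consumed above is typically wired up by the caller
// (an assumed setup, shown as a sketch only; the actual binary may install its
// own signal handling elsewhere):
//
//     let shutdown_flag = Arc::new(AtomicBool::new(false));
//     let flag = Arc::clone(&shutdown_flag);
//     tokio::spawn(async move {
//         if tokio::signal::ctrl_c().await.is_ok() {
//             flag.store(true, Ordering::SeqCst);
//         }
//     });
//
// Every step boundary in `run_full_update` then checks the flag and returns
// early, which is what makes the incremental persistence below abort-safe.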
/// Cleansing function to remove companies with missing essential Yahoo data, for integrity
pub async fn companies_yahoo_cleansed_no_data(paths: &DataPaths) -> anyhow::Result<usize> {
    let data_path = paths.data_dir();
    let input_path = data_path.join("companies.jsonl");
    let output_path = data_path.join("companies_yahoo.jsonl");
    let state_path = data_path.join("state.jsonl");

    if !input_path.exists() {
        logger::log_warn("companies.jsonl not found, skipping cleansing").await;
        return Ok(0);
    }

    // If a previous run already completed this step, reuse the existing output.
    if state_path.exists() {
        let state_content = tokio::fs::read_to_string(&state_path).await?;
        for line in state_content.lines() {
            if line.trim().is_empty() {
                continue;
            }
            if let Ok(state) = serde_json::from_str::<serde_json::Value>(line) {
                if state.get("yahoo_companies_cleansed_no_data").and_then(|v| v.as_bool()).unwrap_or(false) {
                    logger::log_info(" Yahoo companies cleansing already completed, reading existing file...").await;
                    if output_path.exists() {
                        let output_content = tokio::fs::read_to_string(&output_path).await?;
                        let count = output_content.lines()
                            .filter(|line| !line.trim().is_empty())
                            .count();
                        logger::log_info(&format!(" ✓ Found {} companies in companies_yahoo.jsonl", count)).await;
                        return Ok(count);
                    } else {
                        logger::log_warn(" State indicates completion but companies_yahoo.jsonl not found, re-running...").await;
                        break;
                    }
                }
            }
        }
    }

    logger::log_info(&format!(" Reading from: {:?}", input_path)).await;
    logger::log_info(&format!(" Writing to: {:?}", output_path)).await;

    let file = File::open(&input_path).await?;
    let reader = BufReader::new(file);
    let mut lines = reader.lines();

    let mut output_file = File::create(&output_path).await?;
    let mut valid_count = 0;
    let mut removed_count = 0;
    let mut total_count = 0;

    while let Some(line) = lines.next_line().await? {
        if line.trim().is_empty() {
            continue;
        }
        total_count += 1;

        let company: CompanyCrossPlatformInfo = match serde_json::from_str(&line) {
            Ok(c) => c,
            Err(e) => {
                logger::log_warn(&format!(" Failed to parse company on line {}: {}", total_count, e)).await;
                continue;
            }
        };

        // Keep only companies that carry at least one real Yahoo ticker (not a sentinel value).
        let has_valid_yahoo = company.isin_tickers_map
            .values()
            .flatten()
            .any(|ticker| {
                ticker.starts_with("YAHOO:")
                    && ticker != "YAHOO:NO_RESULTS"
                    && ticker != "YAHOO:ERROR"
            });

        if has_valid_yahoo {
            let json_line = serde_json::to_string(&company)?;
            output_file.write_all(json_line.as_bytes()).await?;
            output_file.write_all(b"\n").await?;
            valid_count += 1;
        } else {
            removed_count += 1;
            if removed_count <= 5 {
                logger::log_info(&format!(" Removed company '{}' (no valid Yahoo ticker)", company.name)).await;
            }
        }

        if total_count % 1000 == 0 {
            logger::log_info(&format!(" Processed {} companies...", total_count)).await;
        }
    }

    output_file.flush().await?;

    logger::log_info(&format!(
        " ✓ Cleansing complete: {} total → {} valid, {} removed",
        total_count, valid_count, removed_count
    )).await;

    // Record completion so re-runs can skip this step.
    let yahoo_companies = json!({
        "yahoo_companies_cleansed_no_data": true,
        "completed_at": chrono::Utc::now().to_rfc3339(),
    });
    let mut state_file = File::create(&state_path).await?;
    let state_line = serde_json::to_string(&yahoo_companies)?;
    state_file.write_all(state_line.as_bytes()).await?;
    state_file.write_all(b"\n").await?;
    state_file.flush().await?;
    logger::log_info(&format!(" ✓ State file created at: {:?}", state_path)).await;

    Ok(valid_count)
}
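// The completion marker written above is one JSON object per line in state.jsonl.
// A minimal sketch of reading such a flag back (a hypothetical helper, not used
// by this module; the inline check above performs the same lookup):
#[allow(dead_code)]
async fn state_flag_set(state_path: &std::path::Path, flag: &str) -> anyhow::Result<bool> {
    if !state_path.exists() {
        return Ok(false);
    }
    let content = tokio::fs::read_to_string(state_path).await?;
    Ok(content.lines().filter(|l| !l.trim().is_empty()).any(|line| {
        serde_json::from_str::<serde_json::Value>(line)
            .map(|v| v.get(flag).and_then(|b| b.as_bool()).unwrap_or(false))
            .unwrap_or(false)
    }))
}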
/// Yahoo Low Profile Cleansing WITH ABORT-SAFE INCREMENTAL PERSISTENCE
///
/// # Features
/// - ✅ Graceful shutdown (abort-safe)
/// - ✅ Task panic isolation (tasks fail independently)
/// - ✅ Crash-safe persistence (checkpoint + log with fsync)
/// - ✅ Smart skip logic (only process incomplete data)
/// - Uses pending queue instead of retry mechanism
/// - Reuses companies_update.log for persistence
///
/// # Persistence Strategy
/// - Checkpoint: companies_yahoo_cleaned.jsonl (atomic state)
/// - Log: companies_update.log (append-only updates)
/// - On restart: Load checkpoint + replay log
/// - Periodic checkpoints (every 50 companies)
/// - Batched fsync (every 10 writes or 10 seconds)
pub async fn companies_yahoo_cleansed_low_profile(
    paths: &DataPaths,
    config: &Config,
    proxy_pool: Arc<ProxyPool>,
    shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<usize> {
    // Configuration constants
    const CHECKPOINT_INTERVAL: usize = 50;
    const FSYNC_BATCH_SIZE: usize = 10;
    const FSYNC_INTERVAL_SECS: u64 = 10;
    const CONCURRENCY_LIMIT: usize = 50; // Limit parallel validation tasks

    let data_path = paths.data_dir();

    // File paths (reusing companies_update.log)
    let input_path = data_path.join("companies_yahoo.jsonl");
    let checkpoint_path = data_path.join("companies_yahoo_cleaned.jsonl");
    let log_path = data_path.join("companies_update.log");

    // Check input exists
    if !input_path.exists() {
        logger::log_warn(" companies_yahoo.jsonl not found, skipping low profile cleansing").await;
        return Ok(0);
    }

    // === RECOVERY PHASE: Load checkpoint + replay log ===
    let mut existing_companies: HashMap<String, CompanyCrossPlatformInfo> = HashMap::new();
    let mut processed_names: std::collections::HashSet<String> = std::collections::HashSet::new();

    if checkpoint_path.exists() {
        logger::log_info("Loading checkpoint from companies_yahoo_cleaned.jsonl...").await;
        let checkpoint_content = tokio::fs::read_to_string(&checkpoint_path).await?;
        for line in checkpoint_content.lines() {
            if line.trim().is_empty() || !line.ends_with('}') {
                continue; // Skip incomplete lines
            }
            match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
                Ok(company) => {
                    processed_names.insert(company.name.clone());
                    existing_companies.insert(company.name.clone(), company);
                }
                Err(e) => {
                    logger::log_warn(&format!("Skipping invalid checkpoint line: {}", e)).await;
                }
            }
        }
        logger::log_info(&format!("Loaded checkpoint with {} companies", existing_companies.len())).await;
    }

    if log_path.exists() {
        logger::log_info("Replaying update log...").await;
        let log_content = tokio::fs::read_to_string(&log_path).await?;
        let mut replayed = 0;
        for line in log_content.lines() {
            if line.trim().is_empty() || !line.ends_with('}') {
                continue; // Skip incomplete lines
            }
            match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
                Ok(company) => {
                    processed_names.insert(company.name.clone());
                    existing_companies.insert(company.name.clone(), company);
                    replayed += 1;
                }
                Err(e) => {
                    logger::log_warn(&format!("Skipping invalid log line: {}", e)).await;
                }
            }
        }
        if replayed > 0 {
            logger::log_info(&format!("Replayed {} updates from log", replayed)).await;
        }
    }

    // === LOAD INPUT COMPANIES ===
    logger::log_info(&format!("Loading companies from: {:?}", input_path)).await;
    let input_companies = load_companies_from_jsonl(&input_path).await?;
    logger::log_info(&format!("Loaded {} companies from input", input_companies.len())).await;

    // === BUILD PENDING LIST (smart skip logic) ===
    let mut pending: Vec<CompanyCrossPlatformInfo> = input_companies
        .into_iter()
        .filter(|company| company_needs_processing(company, &existing_companies))
        .collect();

    logger::log_info(&format!(
        "Initial scan: {} companies need processing ({} already complete)",
        pending.len(),
        existing_companies.len()
    )).await;

    if pending.is_empty() {
        logger::log_info(" ✓ All companies already processed").await;
        return Ok(existing_companies.len());
    }

    // === SETUP LOG WRITER TASK ===
    let (write_tx, mut write_rx) = mpsc::channel::<LogCommand>(1000);

    let log_file_init = OpenOptions::new()
        .create(true)
        .append(true)
        .open(&log_path)
        .await?;

    let checkpoint_path_clone = checkpoint_path.clone();
    let log_path_clone = log_path.clone();
    let existing_companies_writer = Arc::new(tokio::sync::Mutex::new(existing_companies.clone()));
    let existing_companies_writer_for_task = Arc::clone(&existing_companies_writer);
    let write_tx_for_writer = write_tx.clone();
    let writer_task = tokio::spawn(async move {
        let mut log_file = log_file_init;
        let mut writes_since_fsync = 0;
        let mut last_fsync = std::time::Instant::now();
        let mut updates_since_checkpoint = 0;
        let mut count = 0;
        let mut new_count = 0;
        let mut updated_count = 0;

        while let Some(cmd) = write_rx.recv().await {
            match cmd {
                LogCommand::Write(company) => {
                    // Write to log
                    let line = serde_json::to_string(&company).unwrap();
                    if let Err(e) = log_file.write_all(line.as_bytes()).await {
                        logger::log_error(&format!("Failed to write to log: {}", e)).await;
                        break;
                    }
                    if let Err(e) = log_file.write_all(b"\n").await {
                        logger::log_error(&format!("Failed to write newline: {}", e)).await;
                        break;
                    }
                    writes_since_fsync += 1;
                    updates_since_checkpoint += 1;
                    count += 1;

                    // Update in-memory state
                    let mut existing_companies = existing_companies_writer_for_task.lock().await;
                    let is_update = existing_companies.contains_key(&company.name);
                    existing_companies.insert(company.name.clone(), company);
                    drop(existing_companies);

                    if is_update {
                        updated_count += 1;
                    } else {
                        new_count += 1;
                    }

                    // Batched + time-based fsync
                    let should_fsync = writes_since_fsync >= FSYNC_BATCH_SIZE
                        || last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS;
                    if should_fsync {
                        if let Err(e) = log_file.flush().await {
                            logger::log_error(&format!("Failed to flush: {}", e)).await;
                            break;
                        }
                        if let Err(e) = log_file.sync_data().await {
                            logger::log_error(&format!("Failed to fsync: {}", e)).await;
                            break;
                        }
                        writes_since_fsync = 0;
                        last_fsync = std::time::Instant::now();
                    }
                }
                LogCommand::Checkpoint => {
                    // Make sure everything already in the log is durable before snapshotting.
                    if let Err(e) = log_file.flush().await {
                        logger::log_error(&format!("Failed to flush before checkpoint: {}", e)).await;
                        break;
                    }
                    if let Err(e) = log_file.sync_data().await {
                        logger::log_error(&format!("Failed to fsync before checkpoint: {}", e)).await;
                        break;
                    }

                    let existing_companies = existing_companies_writer_for_task.lock().await;
                    let companies_vec: Vec<_> = existing_companies.values().cloned().collect();
                    drop(existing_companies);

                    // Write the checkpoint to a temp file, fsync it, then atomically rename it over the old one.
                    let temp_path = checkpoint_path_clone.with_extension("tmp");
                    match tokio::fs::File::create(&temp_path).await {
                        Ok(mut temp_file) => {
                            let mut checkpoint_ok = true;
                            for company in &companies_vec {
                                if let Ok(line) = serde_json::to_string(company) {
                                    if temp_file.write_all(line.as_bytes()).await.is_err()
                                        || temp_file.write_all(b"\n").await.is_err()
                                    {
                                        checkpoint_ok = false;
                                        break;
                                    }
                                }
                            }
                            if checkpoint_ok {
                                if temp_file.flush().await.is_ok() && temp_file.sync_data().await.is_ok() {
                                    drop(temp_file);
                                    if tokio::fs::rename(&temp_path, &checkpoint_path_clone).await.is_ok() {
                                        // The checkpoint now contains everything, so the log can start fresh.
                                        if tokio::fs::remove_file(&log_path_clone).await.is_ok() {
                                            logger::log_info(&format!(
                                                "✓ Checkpoint created ({} companies), log cleared",
                                                companies_vec.len()
                                            )).await;
                                            if let Ok(new_log) = OpenOptions::new()
                                                .create(true)
                                                .append(true)
                                                .open(&log_path_clone)
                                                .await
                                            {
                                                log_file = new_log;
                                            }
                                        }
                                    }
                                }
                            }
                        }
                        Err(e) => {
                            logger::log_error(&format!("Failed to create checkpoint temp file: {}", e)).await;
                        }
                    }
                    updates_since_checkpoint = 0;
                }
                LogCommand::Shutdown => {
                    logger::log_info("Writer shutting down...").await;
                    break;
                }
            }

            // Periodic checkpoint trigger (try_send so the writer never blocks on its own, possibly full, channel)
            if updates_since_checkpoint >= CHECKPOINT_INTERVAL {
                let _ = write_tx_for_writer.try_send(LogCommand::Checkpoint);
            }
        }

        // Final fsync
        let _ = log_file.flush().await;
        let _ = log_file.sync_data().await;

        logger::log_info(&format!(
            "Writer finished: {} total ({} new, {} updated)",
            count, new_count, updated_count
        )).await;

        (count, new_count, updated_count)
    });
logger::log_info("Creating YahooClientPool with proxy rotation...").await; let yahoo_pool = Arc::new(YahooClientPool::new(proxy_pool, config, None).await?); logger::log_info(&format!("✓ YahooClientPool ready with {} clients", yahoo_pool.num_clients().await)).await; // Wrap paths in Arc for safe sharing across tasks let paths = Arc::new((*paths).clone()); // === MAIN PROCESSING LOOP WITH TASK PANIC ISOLATION === let total = pending.len(); let mut tasks = FuturesUnordered::new(); // Counters let processed = Arc::new(AtomicUsize::new(0)); let valid_count = Arc::new(AtomicUsize::new(0)); let filtered_low_cap = Arc::new(AtomicUsize::new(0)); let filtered_no_price = Arc::new(AtomicUsize::new(0)); let failed_count = Arc::new(AtomicUsize::new(0)); // Spawn initial batch for _ in 0..CONCURRENCY_LIMIT.min(pending.len()) { if let Some(company) = pending.pop() { spawn_validation_task( company, &yahoo_pool, &paths, &write_tx, shutdown_flag, &processed, &valid_count, &filtered_low_cap, &filtered_no_price, &failed_count, total, &mut tasks, ); } } // Process results and spawn new tasks (with task panic isolation) while let Some(task_result) = tasks.next().await { // Check for shutdown if shutdown_flag.load(Ordering::SeqCst) { logger::log_warn("Shutdown signal received, stopping processing").await; break; } match task_result { Ok(Ok(Some(_result))) => { // Success - spawn next task if let Some(company) = pending.pop() { spawn_validation_task( company, &yahoo_pool, &paths, &write_tx, shutdown_flag, &processed, &valid_count, &filtered_low_cap, &filtered_no_price, &failed_count, total, &mut tasks, ); } } Ok(Ok(None)) => { // Filtered or failed - spawn next task if let Some(company) = pending.pop() { spawn_validation_task( company, &yahoo_pool, &paths, &write_tx, shutdown_flag, &processed, &valid_count, &filtered_low_cap, &filtered_no_price, &failed_count, total, &mut tasks, ); } } Ok(Err(e)) => { // Processing error logger::log_error(&format!("Company processing error: {}", e)).await; if let Some(company) = pending.pop() { spawn_validation_task( company, &yahoo_pool, &paths, &write_tx, shutdown_flag, &processed, &valid_count, &filtered_low_cap, &filtered_no_price, &failed_count, total, &mut tasks, ); } } Err(e) => { // Task panic (isolated - doesn't crash entire process) logger::log_error(&format!("Task panic: {}", e)).await; if let Some(company) = pending.pop() { spawn_validation_task( company, &yahoo_pool, &paths, &write_tx, shutdown_flag, &processed, &valid_count, &filtered_low_cap, &filtered_no_price, &failed_count, total, &mut tasks, ); } } } } logger::log_info("Main processing loop completed").await; // Signal writer to finish let _ = write_tx.send(LogCommand::Checkpoint).await; let _ = write_tx.send(LogCommand::Shutdown).await; drop(write_tx); // Wait for writer to finish let (final_count, final_new, final_updated) = writer_task.await .unwrap_or((0, 0, 0)); let final_valid = valid_count.load(Ordering::SeqCst); let final_filtered_low_cap = filtered_low_cap.load(Ordering::SeqCst); let final_filtered_no_price = filtered_no_price.load(Ordering::SeqCst); let final_failed = failed_count.load(Ordering::SeqCst); logger::log_info(&format!( "✅ Completed: {} total companies ({} new, {} updated)", final_count, final_new, final_updated )).await; logger::log_info(&format!( " Valid: {}, Filtered (low cap): {}, Filtered (no price): {}, Failed: {}", final_valid, final_filtered_low_cap, final_filtered_no_price, final_failed )).await; // Shutdown Yahoo pool yahoo_pool.shutdown().await?; Ok(final_valid) } /// Helper 
/// Helper function to spawn a validation task (reduces code duplication)
fn spawn_validation_task(
    company: CompanyCrossPlatformInfo,
    yahoo_pool: &Arc<YahooClientPool>,
    paths: &Arc<DataPaths>,
    write_tx: &mpsc::Sender<LogCommand>,
    shutdown_flag: &Arc<AtomicBool>,
    processed: &Arc<AtomicUsize>,
    valid_count: &Arc<AtomicUsize>,
    filtered_low_cap: &Arc<AtomicUsize>,
    filtered_no_price: &Arc<AtomicUsize>,
    failed_count: &Arc<AtomicUsize>,
    total: usize,
    tasks: &mut FuturesUnordered<tokio::task::JoinHandle<anyhow::Result<Option<CompanyTaskResult>>>>,
) {
    let yahoo_pool_clone = Arc::clone(yahoo_pool);
    let paths_clone = Arc::clone(paths);
    let shutdown_flag_clone = Arc::clone(shutdown_flag);
    let write_tx_clone = write_tx.clone();
    let processed_clone = Arc::clone(processed);
    let valid_count_clone = Arc::clone(valid_count);
    let filtered_low_cap_clone = Arc::clone(filtered_low_cap);
    let filtered_no_price_clone = Arc::clone(filtered_no_price);
    let failed_count_clone = Arc::clone(failed_count);

    let task = tokio::spawn(async move {
        // Check shutdown at start
        if shutdown_flag_clone.load(Ordering::SeqCst) {
            return Ok::<_, anyhow::Error>(None);
        }

        let result = process_company_with_validation(
            &company,
            &yahoo_pool_clone,
            &*paths_clone,
        ).await;

        let task_result = match result {
            CompanyProcessResult::Valid(validated_company) => {
                // Send to writer
                let _ = write_tx_clone.send(LogCommand::Write(validated_company.clone())).await;
                valid_count_clone.fetch_add(1, Ordering::SeqCst);
                Some(CompanyTaskResult {
                    company: validated_company.clone(),
                    result: CompanyProcessResult::Valid(validated_company),
                })
            }
            CompanyProcessResult::FilteredLowCap { name, market_cap } => {
                filtered_low_cap_clone.fetch_add(1, Ordering::SeqCst);
                if filtered_low_cap_clone.load(Ordering::SeqCst) <= 10 {
                    logger::log_info(&format!(" Filtered {} - low market cap: {:.0} EUR", name, market_cap)).await;
                }
                None
            }
            CompanyProcessResult::FilteredNoPrice { name } => {
                filtered_no_price_clone.fetch_add(1, Ordering::SeqCst);
                if filtered_no_price_clone.load(Ordering::SeqCst) <= 10 {
                    logger::log_info(&format!(" Filtered {} - no recent price data", name)).await;
                }
                None
            }
            CompanyProcessResult::Failed { company: failed_company, error, is_transient: _ } => {
                failed_count_clone.fetch_add(1, Ordering::SeqCst);
                logger::log_warn(&format!(" Failed to process '{}': {}", failed_company.name, error)).await;
                None
            }
        };

        // Progress reporting
        let current = processed_clone.fetch_add(1, Ordering::SeqCst) + 1;
        if current % 100 == 0 {
            logger::log_info(&format!(
                "Progress: {}/{} ({} valid, {} low cap, {} no price, {} failed)",
                current,
                total,
                valid_count_clone.load(Ordering::SeqCst),
                filtered_low_cap_clone.load(Ordering::SeqCst),
                filtered_no_price_clone.load(Ordering::SeqCst),
                failed_count_clone.load(Ordering::SeqCst)
            )).await;
        }

        Ok(task_result)
    });

    tasks.push(task);
}
extract_market_cap(&summary); if market_cap < 1_000_000.0 { return CompanyProcessResult::FilteredLowCap { name: company.name.clone(), market_cap, }; } // Validate recent price activity let has_recent_price = match check_recent_price_activity(yahoo_pool, &ticker).await { Ok(has) => has, Err(e) => { let error_msg = e.to_string(); let is_transient = is_transient_error(&error_msg); return CompanyProcessResult::Failed { company: company.clone(), error: format!("API error fetching price history: {}", error_msg), is_transient, }; } }; if !has_recent_price { return CompanyProcessResult::FilteredNoPrice { name: company.name.clone(), }; } // Save core data if let Err(e) = save_company_core_data(paths, &company.name, &summary).await { logger::log_warn(&format!( " Failed to save core data for {}: {}", company.name, e )).await; } CompanyProcessResult::Valid(company.clone()) } /// Determine if an error is transient (should retry) or permanent (skip) fn is_transient_error(error: &str) -> bool { let error_lower = error.to_lowercase(); // Transient errors (network, rate limiting, timeouts) let transient_patterns = [ "timeout", "timed out", "connection", "network", "rate limit", "too many requests", "429", "503", "502", "500", "temporarily", "unavailable", ]; for pattern in &transient_patterns { if error_lower.contains(pattern) { return true; } } // Permanent errors (invalid ticker, no data, parsing errors) let permanent_patterns = [ "404", "not found", "invalid", "no data", "parse error", "400", "401", "403", ]; for pattern in &permanent_patterns { if error_lower.contains(pattern) { return false; } } // Default: treat unknown errors as transient (safer to retry) true } /// Load companies from JSONL file async fn load_companies_from_jsonl(path: &std::path::Path) -> anyhow::Result> { let content = tokio::fs::read_to_string(path).await?; let mut companies = Vec::new(); for line in content.lines() { if line.trim().is_empty() { continue; } if let Ok(company) = serde_json::from_str::(line) { companies.push(company); } } Ok(companies) } fn extract_first_yahoo_ticker(company: &CompanyCrossPlatformInfo) -> Option { for tickers in company.isin_tickers_map.values() { for ticker in tickers { if ticker.starts_with("YAHOO:") && ticker != "YAHOO:NO_RESULTS" && ticker != "YAHOO:ERROR" { return Some(ticker.trim_start_matches("YAHOO:").to_string()); } } } None } fn extract_market_cap(summary: &crate::scraper::yahoo::QuoteSummary) -> f64 { let price_module = match summary.modules.get("price") { Some(m) => m, None => return 0.0, }; let market_cap_raw = price_module .get("marketCap") .and_then(|v| v.get("raw")) .and_then(|v| v.as_f64()) .unwrap_or(0.0); let currency = price_module .get("currency") .and_then(|v| v.as_str()) .unwrap_or("USD"); let market_cap_eur = match currency { "EUR" => market_cap_raw, "USD" => market_cap_raw * 0.92, "GBP" => market_cap_raw * 1.17, "JPY" => market_cap_raw * 0.0061, "CHF" => market_cap_raw * 1.05, _ => market_cap_raw * 0.92, }; market_cap_eur } async fn check_recent_price_activity( yahoo_pool: &Arc, ticker: &str, ) -> anyhow::Result { let now = Utc::now().timestamp(); let one_year_ago = now - (365 * 24 * 60 * 60); let sixty_days_ago = now - (60 * 24 * 60 * 60); let chart_data = yahoo_pool.get_chart_data( ticker, "1d", sixty_days_ago, now, ).await?; if chart_data.quotes.is_empty() { return Ok(false); } let most_recent_timestamp = chart_data.quotes .iter() .map(|q| q.timestamp) .max() .unwrap_or(0); Ok(most_recent_timestamp >= one_year_ago) } async fn save_company_core_data( paths: &DataPaths, 
async fn save_company_core_data(
    paths: &DataPaths,
    company_name: &str,
    summary: &crate::scraper::yahoo::QuoteSummary,
) -> anyhow::Result<()> {
    use tokio::fs;

    // Sanitize the company name so it is safe to use as a directory name.
    let safe_name = company_name
        .replace("/", "_")
        .replace("\\", "_")
        .replace(":", "_")
        .replace("*", "_")
        .replace("?", "_")
        .replace("\"", "_")
        .replace("<", "_")
        .replace(">", "_")
        .replace("|", "_");

    let company_dir = paths.corporate_dir().join(&safe_name).join("core");
    fs::create_dir_all(&company_dir).await?;

    let data_path = company_dir.join("data.jsonl");
    let json_line = serde_json::to_string(summary)?;
    let mut file = fs::File::create(&data_path).await?;
    file.write_all(json_line.as_bytes()).await?;
    file.write_all(b"\n").await?;
    file.flush().await?;

    Ok(())
}

/// Find the most recent YYYYMMDD-named directory in the FIGI map cache.
async fn find_most_recent_figi_date_dir(paths: &DataPaths) -> anyhow::Result<Option<std::path::PathBuf>> {
    let map_cache_dir = paths.cache_gleif_openfigi_map_dir();
    if !map_cache_dir.exists() {
        return Ok(None);
    }

    let mut entries = tokio::fs::read_dir(&map_cache_dir).await?;
    let mut dates = Vec::new();

    while let Some(entry) = entries.next_entry().await? {
        let path = entry.path();
        if path.is_dir() {
            if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
                if name.len() == 8 && name.chars().all(|c| c.is_numeric()) {
                    dates.push((name.to_string(), path));
                }
            }
        }
    }

    if dates.is_empty() {
        return Ok(None);
    }

    // Sort descending; lexicographic order matches chronological order for YYYYMMDD names.
    dates.sort_by(|a, b| b.0.cmp(&a.0));
    Ok(Some(dates[0].1.clone()))
}

pub struct ProcessResult {
    pub changes: Vec<CompanyEventChange>,
}

pub fn process_batch(
    new_events: &[CompanyEvent],
    existing: &mut HashMap<String, CompanyEvent>,
    today: &str,
) -> ProcessResult {
    let mut changes = Vec::new();

    for new in new_events {
        let key = event_key(new);

        // Exact key match: diff the two versions and replace the stored event.
        if let Some(old) = existing.get(&key) {
            changes.extend(detect_changes(old, new, today));
            existing.insert(key, new.clone());
            continue;
        }

        // No exact match: look for an event with the same ticker and date stored under a different key.
        let date_key = format!("{}|{}", new.ticker, new.date);
        let mut found_old = None;
        for (k, e) in existing.iter() {
            if format!("{}|{}", e.ticker, e.date) == date_key && k != &key {
                found_old = Some((k.clone(), e.clone()));
                break;
            }
        }

        if let Some((old_key, old_event)) = found_old {
            // Only report time changes for future events.
            if new.date.as_str() > today {
                changes.push(CompanyEventChange {
                    ticker: new.ticker.clone(),
                    date: new.date.clone(),
                    field_changed: "time".to_string(),
                    old_value: old_event.time.clone(),
                    new_value: new.time.clone(),
                    detected_at: Local::now().format("%Y-%m-%d %H:%M:%S").to_string(),
                });
            }
            existing.remove(&old_key);
        }

        existing.insert(key, new.clone());
    }

    ProcessResult { changes }
}
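#[cfg(test)]
mod tests {
    use super::*;

    // Sanity checks for the error classifier above; these exercise only the
    // string-matching logic defined in this module.
    #[test]
    fn transient_errors_are_detected() {
        assert!(is_transient_error("Connection timed out"));
        assert!(is_transient_error("HTTP 429 Too Many Requests"));
        assert!(is_transient_error("service temporarily unavailable"));
        // Unknown errors default to transient so they get retried.
        assert!(is_transient_error("something unexpected happened"));
    }

    #[test]
    fn permanent_errors_are_detected() {
        assert!(!is_transient_error("404 Not Found"));
        assert!(!is_transient_error("Invalid ticker symbol"));
        assert!(!is_transient_error("HTTP 403 Forbidden"));
    }
}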