// src/corporate/update_companies.rs use super::{types::*, yahoo_company_extraction::*, helpers::*}; use crate::util::directories::DataPaths; use crate::util::logger; use crate::scraper::webdriver::ChromeDriverPool; use crate::scraper::hard_reset::perform_hard_reset; use crate::corporate::checkpoint_helpers; use crate::config::Config; use tokio::sync::mpsc; use tokio::io::AsyncWriteExt; use tokio::fs::OpenOptions; use tokio::time::sleep; use std::collections::HashMap; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use futures::stream::{FuturesUnordered, StreamExt}; use anyhow::{anyhow, Result}; /// Represents a write command to be serialized through the log writer enum LogCommand { Write(CompanyCrossPlatformInfo), Checkpoint, Shutdown, } /// Result from processing a single company struct CompanyProcessResult { company: CompanyCrossPlatformInfo, is_update: bool, } /// Check if a company needs Yahoo data processing /// Returns true if company has incomplete data (needs processing) fn company_needs_processing( company_name: &str, company_info: &CompanyInfo, existing_companies: &HashMap, ) -> bool { // If company not in existing data at all, definitely needs processing let Some(existing_entry) = existing_companies.get(company_name) else { return true; }; // Collect all ISINs this company should have let mut required_isins = std::collections::HashSet::new(); for figi_infos in company_info.securities.values() { for figi_info in figi_infos { if !figi_info.isin.is_empty() { required_isins.insert(figi_info.isin.clone()); } } } // Check each required ISIN for isin in required_isins { // Check if this ISIN exists in the company's ticker map if let Some(tickers) = existing_entry.isin_tickers_map.get(&isin) { // Check if this ISIN has valid Yahoo data let has_valid_yahoo = tickers.iter().any(|t| { t.starts_with("YAHOO:") && t != "YAHOO:ERROR" //&& // Error marker means needs retry //t != "YAHOO:NO_RESULTS" // This is actually valid (legitimately not found) }); // If no valid Yahoo data for this ISIN, company needs processing if !has_valid_yahoo { return true; } } else { // ISIN not in map at all, needs processing return true; } } // All ISINs have valid Yahoo data, skip this company false } /// Abort-safe incremental JSONL persistence with proper hard reset handling pub async fn build_companies_jsonl_streaming_parallel( paths: &DataPaths, pool: &Arc, shutdown_flag: &Arc, config: &Config, monitoring: &Option, ) -> anyhow::Result { // Configuration constants const CHECKPOINT_INTERVAL: usize = 50; const FSYNC_BATCH_SIZE: usize = 10; const FSYNC_INTERVAL_SECS: u64 = 10; const CONCURRENCY_LIMIT: usize = 100; // Wrap pool in mutex for potential replacement let pool_mutex = Arc::new(tokio::sync::Mutex::new(Arc::clone(pool))); // Synchronization for hard reset let reset_in_progress = Arc::new(tokio::sync::Mutex::new(false)); let path = DataPaths::new(".")?; let corporate_path = path.data_dir().join("corporate").join("by_name"); let securities_checkpoint = corporate_path.join("common_stocks.jsonl"); let securities_log = corporate_path.join("common_stocks.log.jsonl"); if !securities_checkpoint.exists() { logger::log_warn("No common_stocks.jsonl found").await; return Ok(0); } // Load securities from checkpoint and replay log logger::log_info("Loading common stocks from JSONL checkpoint and log...").await; let securities = load_securities_from_jsonl(&securities_checkpoint, &securities_log).await?; logger::log_info(&format!("Loaded {} companies from common stocks", securities.len())).await; let companies_path = paths.data_dir().join("companies.jsonl"); let log_path = paths.data_dir().join("companies_updates.log"); if let Some(parent) = companies_path.parent() { tokio::fs::create_dir_all(parent).await?; } // === RECOVERY PHASE: Load checkpoint + replay log === let existing_companies = checkpoint_helpers::load_checkpoint_with_log( &companies_path, &log_path, "companies.jsonl" ).await?; // === SETUP LOG WRITER TASK === let (write_tx, mut write_rx) = mpsc::channel::(1000); let log_file_init = OpenOptions::new() .create(true) .append(true) .open(&log_path) .await?; let companies_path_clone = companies_path.clone(); let log_path_clone = log_path.clone(); let existing_companies_writer = Arc::new(tokio::sync::Mutex::new(existing_companies.clone())); // Clone the Arc for the writer task (Arc clone is cheap, just increments ref count) let existing_companies_writer_for_task = Arc::clone(&existing_companies_writer); let write_tx_for_writer = write_tx.clone(); let writer_task = tokio::spawn(async move { let mut log_file = log_file_init; let mut writes_since_fsync = 0; let mut last_fsync = std::time::Instant::now(); let mut updates_since_checkpoint = 0; let mut count = 0; let mut new_count = 0; let mut updated_count = 0; while let Some(cmd) = write_rx.recv().await { match cmd { LogCommand::Write(company) => { // Write to log let line = serde_json::to_string(&company).unwrap(); if let Err(e) = log_file.write_all(line.as_bytes()).await { logger::log_error(&format!("Failed to write to log: {}", e)).await; break; } if let Err(e) = log_file.write_all(b"\n").await { logger::log_error(&format!("Failed to write newline: {}", e)).await; break; } writes_since_fsync += 1; updates_since_checkpoint += 1; count += 1; // Update in-memory state let mut existing_companies = existing_companies_writer_for_task.lock().await; let is_update = existing_companies.contains_key(&company.name); existing_companies.insert(company.name.clone(), company); drop(existing_companies); if is_update { updated_count += 1; } else { new_count += 1; } // Batched + time-based fsync let should_fsync = writes_since_fsync >= FSYNC_BATCH_SIZE || last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS; if should_fsync { if let Err(e) = log_file.flush().await { logger::log_error(&format!("Failed to flush: {}", e)).await; break; } if let Err(e) = log_file.sync_data().await { logger::log_error(&format!("Failed to fsync: {}", e)).await; break; } writes_since_fsync = 0; last_fsync = std::time::Instant::now(); } } LogCommand::Checkpoint => { if let Err(e) = log_file.flush().await { logger::log_error(&format!("Failed to flush before checkpoint: {}", e)).await; break; } if let Err(e) = log_file.sync_data().await { logger::log_error(&format!("Failed to fsync before checkpoint: {}", e)).await; break; } let existing_companies = existing_companies_writer_for_task.lock().await; let companies_vec: Vec<_> = existing_companies.values().cloned().collect(); drop(existing_companies); let temp_path = companies_path_clone.with_extension("tmp"); match tokio::fs::File::create(&temp_path).await { Ok(mut temp_file) => { let mut checkpoint_ok = true; for company in &companies_vec { if let Ok(line) = serde_json::to_string(company) { if temp_file.write_all(line.as_bytes()).await.is_err() || temp_file.write_all(b"\n").await.is_err() { checkpoint_ok = false; break; } } } if checkpoint_ok { if temp_file.flush().await.is_ok() && temp_file.sync_data().await.is_ok() { drop(temp_file); if tokio::fs::rename(&temp_path, &companies_path_clone).await.is_ok() { if tokio::fs::remove_file(&log_path_clone).await.is_ok() { logger::log_info(&format!( "✓ Checkpoint created ({} companies), log cleared", companies_vec.len() )).await; if let Ok(new_log) = OpenOptions::new() .create(true) .append(true) .open(&log_path_clone) .await { log_file = new_log; } } } } } } Err(e) => { logger::log_error(&format!("Failed to create checkpoint temp file: {}", e)).await; } } updates_since_checkpoint = 0; } LogCommand::Shutdown => { logger::log_info("Writer shutting down...").await; break; } } // Periodic checkpoint trigger if updates_since_checkpoint >= CHECKPOINT_INTERVAL { let _ = write_tx.send(LogCommand::Checkpoint).await; } } // Final fsync let _ = log_file.flush().await; let _ = log_file.sync_data().await; logger::log_info(&format!( "Writer finished: {} total ({} new, {} updated)", count, new_count, updated_count )).await; (count, new_count, updated_count) }); // === MAIN PROCESSING LOOP === let total = securities.len(); logger::log_info(&format!("Processing {} companies with concurrency limit {}", total, CONCURRENCY_LIMIT)).await; let mut tasks = FuturesUnordered::new(); // Build initial pending list with proper filtering let mut pending: Vec<(String, CompanyInfo)> = securities.iter() .filter(|(name, info)| company_needs_processing(name, info, &existing_companies)) .map(|(name, info)| (name.clone(), info.clone())) .collect(); logger::log_info(&format!( "Initial scan: {} companies need processing ({} already complete)", pending.len(), total - pending.len() )).await; let mut processed = 0; let mut hard_reset_count = 0; // Spawn initial batch for _ in 0..CONCURRENCY_LIMIT.min(pending.len()) { if let Some((name, company_info)) = pending.pop() { let current_pool = { let pool_guard = pool_mutex.lock().await; Arc::clone(&*pool_guard) }; let existing = existing_companies.get(&name).cloned(); let shutdown_flag_clone = Arc::clone(shutdown_flag); let task = tokio::spawn(async move { process_single_company_validated( name, company_info, existing, ¤t_pool, &shutdown_flag_clone, ).await }); tasks.push(task); } } // Process results and spawn new tasks while let Some(task_result) = tasks.next().await { // Check for shutdown if shutdown_flag.load(Ordering::SeqCst) { logger::log_warn("Shutdown signal received, stopping processing").await; break; } match task_result { Ok(Ok(Some(result))) => { // Success: send to writer let _ = write_tx_for_writer.send(LogCommand::Write(result.company)).await; processed += 1; // Log progress every 100 companies if processed % 100 == 0 { logger::log_info(&format!( "Progress: {}/{} companies processed ({} resets)", processed, total, hard_reset_count )).await; } // Spawn next task if available if let Some((name, company_info)) = pending.pop() { let current_pool = { let pool_guard = pool_mutex.lock().await; Arc::clone(&*pool_guard) }; let existing = existing_companies.get(&name).cloned(); let shutdown_flag_clone = Arc::clone(shutdown_flag); let task = tokio::spawn(async move { process_single_company_validated( name, company_info, existing, ¤t_pool, &shutdown_flag_clone, ).await }); tasks.push(task); } } Ok(Ok(None)) => { // No result (shutdown or skip) processed += 1; if let Some((name, company_info)) = pending.pop() { let current_pool = { let pool_guard = pool_mutex.lock().await; Arc::clone(&*pool_guard) }; let existing = existing_companies.get(&name).cloned(); let shutdown_flag_clone = Arc::clone(shutdown_flag); let task = tokio::spawn(async move { process_single_company_validated( name, company_info, existing, ¤t_pool, &shutdown_flag_clone, ).await }); tasks.push(task); } } Ok(Err(e)) => { let error_msg = e.to_string(); if error_msg.contains("HARD_RESET_REQUIRED") { // Check if reset already in progress (race condition protection) let mut reset_lock = reset_in_progress.lock().await; if *reset_lock { logger::log_info("Hard reset already in progress, skipping duplicate").await; processed += 1; continue; } *reset_lock = true; drop(reset_lock); // Release lock during reset logger::log_error("🔴 HARD RESET THRESHOLD REACHED - INITIATING RESET SEQUENCE").await; logger::log_warn("Draining active tasks before hard reset...").await; // Save remaining pending count let remaining_count = pending.len(); // Stop spawning new tasks pending.clear(); // Wait for all active tasks to complete let mut drained = 0; while let Some(_) = tasks.next().await { drained += 1; if drained % 10 == 0 { logger::log_info(&format!("Drained {} tasks...", drained)).await; } } logger::log_info(&format!( "All tasks drained ({} active). {} companies need reprocessing.", drained, remaining_count )).await; // Perform the actual hard reset match perform_hard_reset(&pool_mutex, config, paths, monitoring, shutdown_flag).await { Ok(()) => { logger::log_info("✅ Hard reset completed successfully").await; hard_reset_count += 1; // Reset the error counter { let pool_guard = pool_mutex.lock().await; let current_pool = Arc::clone(&*pool_guard); current_pool.get_reset_controller().reset(); } logger::log_info("✓ Error counter cleared").await; // Rebuild pending list by checking which companies need processing logger::log_info("Rebuilding pending queue with proper Yahoo data checks...").await; // Get current state of written companies let current_existing = { let companies = existing_companies_writer.lock().await; companies.clone() }; // Reload all securities from disk (checkpoint + log) logger::log_info("Reloading securities from JSONL...").await; let all_securities = load_securities_from_jsonl(&securities_checkpoint, &securities_log).await?; logger::log_info(&format!("Reloaded {} companies", all_securities.len())).await; // Build pending list: only companies that need processing pending = all_securities.iter() .filter(|(name, info)| company_needs_processing(name, info, ¤t_existing)) .map(|(name, info)| (name.clone(), info.clone())) .collect(); logger::log_info(&format!( "Restarting with {} remaining companies (out of {} total)", pending.len(), total )).await; // Only continue if there's work to do if pending.is_empty() { logger::log_info("All companies have complete data, exiting").await; // Clear reset flag let mut reset_lock = reset_in_progress.lock().await; *reset_lock = false; drop(reset_lock); break; // Exit main loop } // Respawn initial batch with NEW pool for _ in 0..CONCURRENCY_LIMIT.min(pending.len()) { if let Some((name, company_info)) = pending.pop() { let current_pool = { let pool_guard = pool_mutex.lock().await; Arc::clone(&*pool_guard) }; let existing = existing_companies.get(&name).cloned(); let shutdown_flag_clone = Arc::clone(shutdown_flag); let task = tokio::spawn(async move { process_single_company_validated( name, company_info, existing, ¤t_pool, &shutdown_flag_clone, ).await }); tasks.push(task); } } // Clear reset flag let mut reset_lock = reset_in_progress.lock().await; *reset_lock = false; drop(reset_lock); // ✅ Continue processing (don't spawn duplicate task) continue; } Err(reset_err) => { logger::log_error(&format!("Hard reset failed: {}", reset_err)).await; // Clear reset flag let mut reset_lock = reset_in_progress.lock().await; *reset_lock = false; drop(reset_lock); // Exit if hard reset fails break; } } } else { // Regular error logger::log_warn(&format!("Company processing error: {}", error_msg)).await; processed += 1; // Spawn next task if let Some((name, company_info)) = pending.pop() { let current_pool = { let pool_guard = pool_mutex.lock().await; Arc::clone(&*pool_guard) }; let existing = existing_companies.get(&name).cloned(); let shutdown_flag_clone = Arc::clone(shutdown_flag); let task = tokio::spawn(async move { process_single_company_validated( name, company_info, existing, ¤t_pool, &shutdown_flag_clone, ).await }); tasks.push(task); } } } Err(e) => { // Task panic logger::log_error(&format!("Task panic: {}", e)).await; processed += 1; // Spawn next task if let Some((name, company_info)) = pending.pop() { let current_pool = { let pool_guard = pool_mutex.lock().await; Arc::clone(&*pool_guard) }; let existing = existing_companies.get(&name).cloned(); let shutdown_flag_clone = Arc::clone(shutdown_flag); let task = tokio::spawn(async move { process_single_company_validated( name, company_info, existing, ¤t_pool, &shutdown_flag_clone, ).await }); tasks.push(task); } } } } logger::log_info("Main processing loop completed").await; // Signal writer to finish let _ = write_tx_for_writer.send(LogCommand::Checkpoint).await; let _ = write_tx_for_writer.send(LogCommand::Shutdown).await; drop(write_tx_for_writer); // Wait for writer to finish let (final_count, final_new, final_updated) = writer_task.await .unwrap_or((0, 0, 0)); logger::log_info(&format!( "✅ Completed: {} total companies ({} new, {} updated, {} hard resets)", final_count, final_new, final_updated, hard_reset_count )).await; Ok(final_count) } /// Loads CompanyInfo securities from checkpoint and log JSONL files async fn load_securities_from_jsonl( checkpoint_path: &std::path::Path, log_path: &std::path::Path, ) -> anyhow::Result> { let mut securities: HashMap = HashMap::new(); // Load checkpoint if checkpoint_path.exists() { let content = tokio::fs::read_to_string(checkpoint_path).await?; for (line_num, line) in content.lines().enumerate() { if line.trim().is_empty() || !line.ends_with('}') { continue; // Skip incomplete lines } match serde_json::from_str::(line) { Ok(company_info) => { securities.insert(company_info.name.clone(), company_info); } Err(e) => { logger::log_warn(&format!( "Skipping invalid line {} in checkpoint: {}", line_num + 1, e )).await; } } } } // Replay log (overwrites checkpoint entries if they exist) if log_path.exists() { let content = tokio::fs::read_to_string(log_path).await?; for (line_num, line) in content.lines().enumerate() { if line.trim().is_empty() || !line.ends_with('}') { continue; // Skip incomplete lines } match serde_json::from_str::(line) { Ok(company_info) => { securities.insert(company_info.name.clone(), company_info); } Err(e) => { logger::log_warn(&format!( "Skipping invalid line {} in log: {}", line_num + 1, e )).await; } } } } Ok(securities) } /// Scrape with retry, validation, and shutdown awareness async fn scrape_with_retry( pool: &Arc, isin: &str, max_retries: u32, shutdown_flag: &Arc, ) -> Result> { let mut retries = 0; loop { // Check shutdown before each attempt if shutdown_flag.load(Ordering::SeqCst) { return Err(anyhow!("Aborted due to shutdown")); } if pool.should_perform_hard_reset() { logger::log_error("HARD_RESET_REQUIRED detected before scrape attempt").await; return Err(anyhow!("HARD_RESET_REQUIRED")); } match scrape_company_details_by_isin(pool, isin, shutdown_flag).await { Ok(result) => return Ok(result), Err(e) => { // Check if this is a hard reset required error let error_msg = e.to_string(); if error_msg.contains("HARD_RESET_REQUIRED") { logger::log_error(&format!( "Hard reset required error for ISIN {}, propagating immediately", isin )).await; return Err(e); // Propagate immediately, don't retry } if retries >= max_retries { logger::log_error(&format!( "All {} retries exhausted for ISIN {}: {}", max_retries, isin, e )).await; return Err(e); } let backoff_ms = 1000 * 2u64.pow(retries); let jitter_ms = random_range(0, 500); let total_delay = backoff_ms + jitter_ms; logger::log_warn(&format!( "Retry {}/{} for ISIN {} after {}ms: {}", retries + 1, max_retries, isin, total_delay, e )).await; sleep(Duration::from_millis(total_delay)).await; retries += 1; } } } } /// Process single company with validation and shutdown checks async fn process_single_company_validated( name: String, company_info: CompanyInfo, existing_entry: Option, pool: &Arc, shutdown_flag: &Arc, ) -> anyhow::Result> { // Check shutdown at start if shutdown_flag.load(Ordering::SeqCst) { logger::log_warn(&format!("Shutdown detected, skipping company: {}", name)).await; return Ok(None); } let is_update = existing_entry.is_some(); let mut isin_tickers_map: HashMap> = existing_entry .as_ref() .map(|e| e.isin_tickers_map.clone()) .unwrap_or_default(); let mut sector = existing_entry.as_ref().and_then(|e| e.sector.clone()); let mut exchange = existing_entry.as_ref().and_then(|e| e.exchange.clone()); // Collect unique ISIN-ticker pairs let mut unique_isin_ticker_pairs: HashMap> = HashMap::new(); for figi_infos in company_info.securities.values() { for figi_info in figi_infos { if !figi_info.isin.is_empty() { let tickers = unique_isin_ticker_pairs .entry(figi_info.isin.clone()) .or_insert_with(Vec::new); if !figi_info.ticker.is_empty() && !tickers.contains(&figi_info.ticker) { tickers.push(figi_info.ticker.clone()); } } } } // Process each ISIN independently with per-ISIN status checking for (isin, figi_tickers) in unique_isin_ticker_pairs { // Check shutdown before each ISIN if shutdown_flag.load(Ordering::SeqCst) { logger::log_warn(&format!( "Shutdown detected while processing company: {}", name )).await; break; } let tickers = isin_tickers_map .entry(isin.clone()) .or_insert_with(Vec::new); for figi_ticker in figi_tickers { if !tickers.contains(&figi_ticker) { tickers.push(figi_ticker); } } // Check if THIS SPECIFIC ISIN has valid Yahoo data (not ERROR) let has_valid_yahoo = tickers.iter().any(|t| { t.starts_with("YAHOO:") && t != "YAHOO:ERROR" // Note: YAHOO:NO_RESULTS is valid (legitimately not found) }); if !has_valid_yahoo { logger::log_info(&format!("Fetching Yahoo details for {} (ISIN: {})", name, isin)).await; tickers.retain(|t| !t.starts_with("YAHOO:")); match scrape_with_retry(pool, &isin, 3, shutdown_flag).await { Ok(Some(details)) => { logger::log_info(&format!( "✓ Found Yahoo ticker {} for ISIN {} (company: {})", details.ticker, isin, name )).await; tickers.push(format!("YAHOO:{}", details.ticker)); if sector.is_none() && details.sector.is_some() { sector = details.sector.clone(); logger::log_info(&format!(" Sector: {}", details.sector.as_ref().unwrap())).await; } if exchange.is_none() && details.exchange.is_some() { exchange = details.exchange.clone(); logger::log_info(&format!(" Exchange: {}", details.exchange.as_ref().unwrap())).await; } }, Ok(None) => { logger::log_warn(&format!("◯ No search results for ISIN {} (company: {})", isin, name)).await; tickers.push("YAHOO:NO_RESULTS".to_string()); }, Err(e) => { if shutdown_flag.load(Ordering::SeqCst) { logger::log_warn(&format!("Shutdown during scrape for ISIN {}", isin)).await; break; } // Check if this is a hard reset required error let error_msg = e.to_string(); if error_msg.contains("HARD_RESET_REQUIRED") { logger::log_error(&format!( "Hard reset required during ISIN {} processing, propagating error", isin )).await; return Err(e); // ← CRITICAL: Propagate immediately } logger::log_warn(&format!( "✗ Yahoo lookup error for ISIN {} (company: {}): {}", isin, name, e )).await; // Mark this ISIN as failed to enable retry tickers.push("YAHOO:ERROR".to_string()); } } } } // Final shutdown check before returning result if shutdown_flag.load(Ordering::SeqCst) { logger::log_warn(&format!( "Shutdown detected, discarding incomplete result for: {}", name )).await; return Ok(None); } if pool.should_perform_hard_reset() { logger::log_error("HARD_RESET_REQUIRED detected during company processing").await; return Err(anyhow!("HARD_RESET_REQUIRED")); } if !isin_tickers_map.is_empty() { let company_entry = CompanyCrossPlatformInfo { name: name.clone(), isin_tickers_map, sector, exchange, }; Ok(Some(CompanyProcessResult { company: company_entry, is_update, })) } else { logger::log_warn(&format!("No ISINs found for company: {}", name)).await; Ok(None) } }