// src/corporate/update.rs - UPDATED WITH DATA INTEGRITY FIXES

use super::{scraper::*, storage::*, helpers::*, types::*, openfigi::*, yahoo::*};
use crate::config::Config;
use crate::corporate::update_parallel::build_companies_jsonl_streaming_parallel;
use crate::util::directories::DataPaths;
use crate::util::logger;
use crate::scraper::webdriver::ChromeDriverPool;
use chrono::Local;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

/// UPDATED: Main corporate update entry point with shutdown awareness
pub async fn run_full_update(
    _config: &Config,
    pool: &Arc<ChromeDriverPool>,
    shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<()> {
    logger::log_info("=== Corporate Update (STREAMING MODE WITH DATA INTEGRITY) ===").await;

    let paths = DataPaths::new(".")?;

    logger::log_info("Step 1: Downloading GLEIF CSV...").await;
    let gleif_csv_path = match download_isin_lei_csv().await? {
        Some(p) => {
            logger::log_info(&format!(" ✓ GLEIF CSV at: {}", p)).await;
            p
        }
        None => {
            logger::log_warn(" ✗ Could not obtain GLEIF CSV").await;
            return Ok(());
        }
    };

    if shutdown_flag.load(Ordering::SeqCst) {
        logger::log_warn("Shutdown detected after GLEIF download").await;
        return Ok(());
    }

    logger::log_info("Step 2: Loading OpenFIGI metadata...").await;
    load_figi_type_lists().await.ok();
    logger::log_info(" ✓ OpenFIGI metadata loaded").await;

    if shutdown_flag.load(Ordering::SeqCst) {
        logger::log_warn("Shutdown detected after OpenFIGI load").await;
        return Ok(());
    }

    logger::log_info("Step 3: Checking LEI-FIGI mapping status...").await;
    let all_mapped = ensure_all_leis_mapped(&gleif_csv_path, None).await?;
    if !all_mapped {
        logger::log_warn(" ⚠ Some LEIs failed to map - continuing with partial data").await;
    } else {
        logger::log_info(" ✓ All LEIs successfully mapped").await;
    }

    if shutdown_flag.load(Ordering::SeqCst) {
        logger::log_warn("Shutdown detected after LEI-FIGI mapping").await;
        return Ok(());
    }

    logger::log_info("Step 4: Building securities map (streaming)...").await;
    let date_dir = find_most_recent_figi_date_dir(&paths).await?;
    if let Some(date_dir) = date_dir {
        logger::log_info(&format!(" Using FIGI data from: {:?}", date_dir)).await;
        build_securities_from_figi_streaming(&date_dir).await?;
        logger::log_info(" ✓ Securities map updated").await;
    } else {
        logger::log_warn(" ✗ No FIGI data directory found").await;
    }

    if shutdown_flag.load(Ordering::SeqCst) {
        logger::log_warn("Shutdown detected after securities map build").await;
        return Ok(());
    }

    logger::log_info("Step 5: Building companies.jsonl with parallel processing and validation...").await;
    let count = build_companies_jsonl_streaming_parallel(&paths, pool, shutdown_flag).await?;
    logger::log_info(&format!(" ✓ Saved {} companies", count)).await;

    if !shutdown_flag.load(Ordering::SeqCst) {
        logger::log_info("Step 6: Processing events (using index)...").await;
        let _event_index = build_event_index(&paths).await?;
        logger::log_info(" ✓ Event index built").await;
    } else {
        logger::log_warn("Shutdown detected, skipping event index build").await;
    }

    logger::log_info("✓ Corporate update complete").await;
    Ok(())
}

/// UPDATED: Serial version with validation (kept for compatibility/debugging)
///
/// This is the non-parallel version that processes companies sequentially.
/// Updated with the same validation and shutdown checks as the parallel version.
///
/// Use this for:
/// - Debugging issues with specific companies
/// - Environments where parallel processing isn't desired
/// - Testing validation logic without concurrency complexity
async fn build_companies_jsonl_streaming_serial(
    paths: &DataPaths,
    pool: &Arc<ChromeDriverPool>,
    shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<usize> {
    // Configuration constants
    const CHECKPOINT_INTERVAL: usize = 50;
    const FSYNC_BATCH_SIZE: usize = 10;
    const FSYNC_INTERVAL_SECS: u64 = 10;

    let path = DataPaths::new(".")?;
    let corporate_path = path.data_dir().join("corporate").join("by_name");
    let securities_path = corporate_path.join("common_stocks.json");

    if !securities_path.exists() {
        logger::log_warn("No common_stocks.json found").await;
        return Ok(0);
    }

    let content = tokio::fs::read_to_string(securities_path).await?;
    let securities: HashMap<String, CompanyInfo> = serde_json::from_str(&content)?;

    let companies_path = paths.data_dir().join("companies.jsonl");
    let log_path = paths.data_dir().join("companies_updates.log");
    if let Some(parent) = companies_path.parent() {
        tokio::fs::create_dir_all(parent).await?;
    }

    // === RECOVERY PHASE: Load checkpoint + replay log ===
    let mut existing_companies: HashMap<String, CompanyCrossPlatformInfo> = HashMap::new();
    let mut processed_names: std::collections::HashSet<String> = std::collections::HashSet::new();

    if companies_path.exists() {
        logger::log_info("Loading checkpoint from companies.jsonl...").await;
        let existing_content = tokio::fs::read_to_string(&companies_path).await?;
        for line in existing_content.lines() {
            if line.trim().is_empty() {
                continue;
            }
            match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
                Ok(company) => {
                    processed_names.insert(company.name.clone());
                    existing_companies.insert(company.name.clone(), company);
                }
                Err(e) => {
                    logger::log_warn(&format!("Skipping invalid checkpoint line: {}", e)).await;
                }
            }
        }
        logger::log_info(&format!("Loaded checkpoint with {} companies", existing_companies.len())).await;
    }

    if log_path.exists() {
        logger::log_info("Replaying update log...").await;
        let log_content = tokio::fs::read_to_string(&log_path).await?;
        let mut replayed = 0;
        for line in log_content.lines() {
            if line.trim().is_empty() {
                continue;
            }
            match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
                Ok(company) => {
                    processed_names.insert(company.name.clone());
                    existing_companies.insert(company.name.clone(), company);
                    replayed += 1;
                }
                Err(e) => {
                    logger::log_warn(&format!("Skipping invalid log line: {}", e)).await;
                }
            }
        }
        if replayed > 0 {
            logger::log_info(&format!("Replayed {} updates from log", replayed)).await;
        }
    }

    // === OPEN LOG FILE ===
    use tokio::fs::OpenOptions;
    use tokio::io::AsyncWriteExt;

    let mut log_file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(&log_path)
        .await?;

    let mut writes_since_fsync = 0;
    let mut last_fsync = std::time::Instant::now();
    let mut updates_since_checkpoint = 0;
    let mut count = 0;
    let mut new_count = 0;
    let mut updated_count = 0;

    logger::log_info(&format!("Processing {} companies sequentially...", securities.len())).await;

    // === PROCESS COMPANIES SEQUENTIALLY ===
    for (name, company_info) in securities.clone() {
        // Check shutdown before each company
        if shutdown_flag.load(Ordering::SeqCst) {
            logger::log_warn(&format!(
                "Shutdown detected at company: {} (progress: {}/{})",
                name, count, securities.len()
            )).await;
            break;
        }

        let existing_entry = existing_companies.get(&name).cloned();
        let is_update = existing_entry.is_some();

        // Process company with validation
        match process_single_company_serial(
            name.clone(),
            company_info,
            existing_entry,
            pool,
            shutdown_flag,
        ).await {
            Ok(Some(company_entry)) => {
                // Write to log
                let line =
                    serde_json::to_string(&company_entry)?;
                log_file.write_all(line.as_bytes()).await?;
                log_file.write_all(b"\n").await?;
                writes_since_fsync += 1;

                // Batched + time-based fsync
                let should_fsync = writes_since_fsync >= FSYNC_BATCH_SIZE
                    || last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS;
                if should_fsync {
                    log_file.flush().await?;
                    log_file.sync_data().await?;
                    writes_since_fsync = 0;
                    last_fsync = std::time::Instant::now();
                }

                // Update in-memory state
                processed_names.insert(name.clone());
                existing_companies.insert(name.clone(), company_entry);
                count += 1;
                updates_since_checkpoint += 1;
                if is_update {
                    updated_count += 1;
                } else {
                    new_count += 1;
                }

                // Periodic checkpoint
                if updates_since_checkpoint >= CHECKPOINT_INTERVAL {
                    if writes_since_fsync > 0 {
                        log_file.flush().await?;
                        log_file.sync_data().await?;
                        writes_since_fsync = 0;
                        last_fsync = std::time::Instant::now();
                    }

                    logger::log_info(&format!("Creating checkpoint at {} companies...", count)).await;

                    let checkpoint_tmp = companies_path.with_extension("jsonl.tmp");
                    let mut checkpoint_file = tokio::fs::File::create(&checkpoint_tmp).await?;
                    for company in existing_companies.values() {
                        let line = serde_json::to_string(company)?;
                        checkpoint_file.write_all(line.as_bytes()).await?;
                        checkpoint_file.write_all(b"\n").await?;
                    }
                    checkpoint_file.flush().await?;
                    checkpoint_file.sync_all().await?;
                    drop(checkpoint_file);

                    tokio::fs::rename(&checkpoint_tmp, &companies_path).await?;

                    drop(log_file);
                    tokio::fs::remove_file(&log_path).await.ok();
                    log_file = OpenOptions::new()
                        .create(true)
                        .append(true)
                        .open(&log_path)
                        .await?;

                    updates_since_checkpoint = 0;
                    logger::log_info("✓ Checkpoint created and log cleared").await;
                }

                if count % 10 == 0 {
                    logger::log_info(&format!(
                        "Progress: {} companies ({} new, {} updated)",
                        count, new_count, updated_count
                    )).await;
                }
            }
            Ok(None) => {
                // Company had no ISINs or was skipped
                logger::log_info(&format!("Skipped company: {} (no ISINs)", name)).await;
            }
            Err(e) => {
                logger::log_warn(&format!("Error processing company {}: {}", name, e)).await;
            }
        }

        // Time-based fsync
        if writes_since_fsync > 0 && last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS {
            log_file.flush().await?;
            log_file.sync_data().await?;
            writes_since_fsync = 0;
            last_fsync = std::time::Instant::now();
        }
    }

    // === FSYNC PENDING WRITES ===
    if writes_since_fsync > 0 {
        logger::log_info(&format!("Fsyncing {} pending writes...", writes_since_fsync)).await;
        log_file.flush().await?;
        log_file.sync_data().await?;
        logger::log_info("✓ Pending writes saved").await;
    }

    // === FINAL CHECKPOINT ===
    if !shutdown_flag.load(Ordering::SeqCst) && updates_since_checkpoint > 0 {
        logger::log_info("Creating final checkpoint...").await;

        let checkpoint_tmp = companies_path.with_extension("jsonl.tmp");
        let mut checkpoint_file = tokio::fs::File::create(&checkpoint_tmp).await?;
        for company in existing_companies.values() {
            let line = serde_json::to_string(company)?;
            checkpoint_file.write_all(line.as_bytes()).await?;
            checkpoint_file.write_all(b"\n").await?;
        }
        checkpoint_file.flush().await?;
        checkpoint_file.sync_all().await?;
        drop(checkpoint_file);

        tokio::fs::rename(&checkpoint_tmp, &companies_path).await?;

        drop(log_file);
        tokio::fs::remove_file(&log_path).await.ok();

        logger::log_info("✓ Final checkpoint created").await;
    }

    logger::log_info(&format!(
        "Completed: {} total companies ({} new, {} updated)",
        count, new_count, updated_count
    )).await;

    Ok(count)
}

/// UPDATED: Process single company serially with validation
async fn process_single_company_serial(
    name: String,
    company_info: CompanyInfo,
    existing_entry: Option<CompanyCrossPlatformInfo>,
    pool: &Arc<ChromeDriverPool>,
    shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<Option<CompanyCrossPlatformInfo>> {
    // Check shutdown at start
    if shutdown_flag.load(Ordering::SeqCst) {
        return Ok(None);
    }

    let mut isin_tickers_map: HashMap<String, Vec<String>> = existing_entry
        .as_ref()
        .map(|e| e.isin_tickers_map.clone())
        .unwrap_or_default();
    let mut sector = existing_entry.as_ref().and_then(|e| e.sector.clone());
    let mut exchange = existing_entry.as_ref().and_then(|e| e.exchange.clone());

    // Collect unique ISIN-ticker pairs
    let mut unique_isin_ticker_pairs: HashMap<String, Vec<String>> = HashMap::new();
    for figi_infos in company_info.securities.values() {
        for figi_info in figi_infos {
            if !figi_info.isin.is_empty() {
                let tickers = unique_isin_ticker_pairs
                    .entry(figi_info.isin.clone())
                    .or_insert_with(Vec::new);
                if !figi_info.ticker.is_empty() && !tickers.contains(&figi_info.ticker) {
                    tickers.push(figi_info.ticker.clone());
                }
            }
        }
    }

    // Process each ISIN with validation
    for (isin, figi_tickers) in unique_isin_ticker_pairs {
        // Check shutdown before each ISIN
        if shutdown_flag.load(Ordering::SeqCst) {
            return Ok(None);
        }

        let tickers = isin_tickers_map
            .entry(isin.clone())
            .or_insert_with(Vec::new);
        for figi_ticker in figi_tickers {
            if !tickers.contains(&figi_ticker) {
                tickers.push(figi_ticker);
            }
        }

        let has_yahoo_ticker = tickers.iter().any(|t| t.starts_with("YAHOO:"));
        if !has_yahoo_ticker {
            logger::log_info(&format!("Fetching Yahoo details for {} (ISIN: {})", name, isin)).await;

            // Use validated scraping with retry
            match scrape_with_retry_serial(pool, &isin, 3, shutdown_flag).await {
                Ok(Some(details)) => {
                    logger::log_info(&format!(
                        "✓ Found Yahoo ticker {} for ISIN {} (company: {})",
                        details.ticker, isin, name
                    )).await;
                    tickers.push(format!("YAHOO:{}", details.ticker));
                    if sector.is_none() && details.sector.is_some() {
                        sector = details.sector.clone();
                    }
                    if exchange.is_none() && details.exchange.is_some() {
                        exchange = details.exchange.clone();
                    }
                },
                Ok(None) => {
                    logger::log_warn(&format!("◯ No search results for ISIN {} (company: {})", isin, name)).await;
                    tickers.push("YAHOO:NO_RESULTS".to_string());
                },
                Err(e) => {
                    if shutdown_flag.load(Ordering::SeqCst) {
                        return Ok(None);
                    }
                    logger::log_warn(&format!(
                        "✗ Yahoo lookup error for ISIN {} (company: {}): {}",
                        isin, name, e
                    )).await;
                }
            }
        }
    }

    // Final shutdown check
    if shutdown_flag.load(Ordering::SeqCst) {
        return Ok(None);
    }

    if !isin_tickers_map.is_empty() {
        Ok(Some(CompanyCrossPlatformInfo {
            name,
            isin_tickers_map,
            sector,
            exchange,
        }))
    } else {
        Ok(None)
    }
}

/// UPDATED: Scrape with retry for serial processing
///
/// (`CompanyDetails` below is assumed as the payload type returned by
/// `scrape_company_details_by_isin`; it exposes `ticker`, `sector`, and `exchange`.)
async fn scrape_with_retry_serial(
    pool: &Arc<ChromeDriverPool>,
    isin: &str,
    max_retries: u32,
    shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<Option<CompanyDetails>> {
    let mut retries = 0;
    loop {
        if shutdown_flag.load(Ordering::SeqCst) {
            return Err(anyhow::anyhow!("Aborted due to shutdown"));
        }

        match scrape_company_details_by_isin(pool, isin, shutdown_flag).await {
            Ok(result) => return Ok(result),
            Err(e) => {
                if retries >= max_retries {
                    return Err(e);
                }

                // Exponential backoff (1s, 2s, 4s, ...) plus 0-500ms of jitter
                let backoff_ms = 1000 * 2u64.pow(retries);
                let jitter_ms = random_range(0, 500);
                let total_delay = backoff_ms + jitter_ms;

                logger::log_warn(&format!(
                    "Retry {}/{} for ISIN {} after {}ms: {}",
                    retries + 1, max_retries, isin, total_delay, e
                )).await;

                tokio::time::sleep(tokio::time::Duration::from_millis(total_delay)).await;
                retries += 1;
            }
        }
    }
}

/// Finds the most recent YYYYMMDD-named directory under the GLEIF/OpenFIGI map cache.
async fn find_most_recent_figi_date_dir(paths: &DataPaths) -> anyhow::Result<Option<PathBuf>> {
    let map_cache_dir = paths.cache_gleif_openfigi_map_dir();
    if !map_cache_dir.exists() {
        return Ok(None);
    }

    let mut entries = tokio::fs::read_dir(&map_cache_dir).await?;
    let mut dates = Vec::new();
    while let Some(entry) = entries.next_entry().await? {
        let path = entry.path();
        if path.is_dir() {
            if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
                if name.len() == 8 && name.chars().all(|c| c.is_numeric()) {
                    dates.push((name.to_string(), path));
                }
            }
        }
    }

    if dates.is_empty() {
        return Ok(None);
    }

    // Sort descending so the newest YYYYMMDD directory comes first
    dates.sort_by(|a, b| b.0.cmp(&a.0));
    Ok(Some(dates[0].1.clone()))
}

pub struct ProcessResult {
    pub changes: Vec<CompanyEventChange>,
}

/// Merges a batch of freshly scraped events into the existing event map and
/// records any detected changes. (Map keys are assumed to be the `String`
/// produced by `event_key`.)
pub fn process_batch(
    new_events: &[CompanyEvent],
    existing: &mut HashMap<String, CompanyEvent>,
    today: &str,
) -> ProcessResult {
    let mut changes = Vec::new();

    for new in new_events {
        let key = event_key(new);

        // Exact key match: diff against the stored event and replace it.
        if let Some(old) = existing.get(&key) {
            changes.extend(detect_changes(old, new, today));
            existing.insert(key, new.clone());
            continue;
        }

        // No exact match: look for an event with the same ticker and date stored
        // under a different key (e.g. its time changed, which changed its key).
        let date_key = format!("{}|{}", new.ticker, new.date);
        let mut found_old = None;
        for (k, e) in existing.iter() {
            if format!("{}|{}", e.ticker, e.date) == date_key && k != &key {
                found_old = Some((k.clone(), e.clone()));
                break;
            }
        }

        if let Some((old_key, old_event)) = found_old {
            // Only report time changes for events dated after today.
            if new.date.as_str() > today {
                changes.push(CompanyEventChange {
                    ticker: new.ticker.clone(),
                    date: new.date.clone(),
                    field_changed: "time".to_string(),
                    old_value: old_event.time.clone(),
                    new_value: new.time.clone(),
                    detected_at: Local::now().format("%Y-%m-%d %H:%M:%S").to_string(),
                });
            }
            existing.remove(&old_key);
        }

        existing.insert(key, new.clone());
    }

    ProcessResult { changes }
}
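
// A minimal, hedged sketch (not part of the original module): the checkpoint +
// write-ahead-log recovery in build_companies_jsonl_streaming_serial follows one
// rule: checkpoint lines load first, then log lines replay over them, so the log
// wins for any name present in both, and unparseable lines are skipped. The test
// below illustrates that rule with plain "key=value" strings instead of the
// crate's company types; the `recover` helper and module name are illustrative only.
#[cfg(test)]
mod recovery_sketch_tests {
    use std::collections::HashMap;

    // Replays `log_lines` over `checkpoint_lines`, skipping malformed lines,
    // mirroring how the serial builder rebuilds its in-memory state on startup.
    fn recover(checkpoint_lines: &[&str], log_lines: &[&str]) -> HashMap<String, String> {
        let mut state = HashMap::new();
        for line in checkpoint_lines.iter().chain(log_lines.iter()) {
            if let Some((k, v)) = line.split_once('=') {
                state.insert(k.to_string(), v.to_string());
            }
        }
        state
    }

    #[test]
    fn log_entries_override_checkpoint_entries() {
        let state = recover(&["acme=old", "globex=kept"], &["acme=new", "not a record"]);
        assert_eq!(state.get("acme").map(String::as_str), Some("new"));
        assert_eq!(state.get("globex").map(String::as_str), Some("kept"));
        assert_eq!(state.len(), 2);
    }
}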