// src/corporate/update.rs

use super::{scraper::*, storage::*, helpers::*, types::*, aggregation::*, openfigi::*};
use crate::config::Config;
use crate::util::directories::DataPaths;
use crate::util::logger;
use crate::webdriver::webdriver::ChromeDriverPool;
use chrono::Local;
use std::collections::HashMap;
use std::sync::Arc;

/// Main function: full update for all companies (LEI-based) with optimized parallel execution.
///
/// This function coordinates the entire update process:
/// - Loads GLEIF mappings
/// - Builds the FIGI ↔ LEI map
/// - Loads existing events
/// - Processes each company: discovers exchanges via FIGI, fetches prices & earnings, aggregates data
/// - Uses the provided shared ChromeDriver pool for efficient parallel scraping
/// - Saves optimized events
///
/// # Arguments
/// * `config` - The application configuration.
/// * `pool` - Shared pool of ChromeDriver instances for scraping.
///
/// # Errors
/// Returns an error if any step in the update process fails.
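///
/// # Example
///
/// A minimal calling sketch, marked `ignore` because the constructors shown
/// here (`Config::default()`, `ChromeDriverPool::new(4)`) are placeholders;
/// substitute the crate's real initializers:
///
/// ```ignore
/// let config = Config::default();
/// let pool = Arc::new(ChromeDriverPool::new(4).await?);
/// run_full_update(&config, &pool).await?;
/// ```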
pub async fn run_full_update(config: &Config, pool: &Arc<ChromeDriverPool>) -> anyhow::Result<()> {
    let msg = "=== Starting LEI-based corporate full update ===";
    println!("{}", msg);
    logger::log_info(msg).await;

    // Initialize paths
    let paths = DataPaths::new(".")?;

    // 1. Load fresh GLEIF ISIN ↔ LEI mapping
    logger::log_info("Corporate Update: Loading GLEIF ISIN ↔ LEI mapping...").await;
    let lei_to_isins: HashMap<String, Vec<String>> = match load_isin_lei_csv().await {
        Ok(map) => {
            let msg = format!("Corporate Update: Loaded GLEIF mapping with {} LEI entries", map.len());
            println!("{}", msg);
            logger::log_info(&msg).await;
            map
        }
        Err(e) => {
            let msg = format!("Corporate Update: Warning - Could not load GLEIF ISIN↔LEI mapping: {}", e);
            eprintln!("{}", msg);
            logger::log_warn(&msg).await;
            HashMap::new()
        }
    };

    // 2. Load OpenFIGI mapping value lists (cached)
    logger::log_info("Corporate Update: Loading OpenFIGI type lists...").await;
    if let Err(e) = load_figi_type_lists().await {
        let msg = format!("Corporate Update: Warning - Could not load OpenFIGI type lists: {}", e);
        eprintln!("{}", msg);
        logger::log_warn(&msg).await;
    }
    logger::log_info("Corporate Update: OpenFIGI type lists loaded").await;

    // 3. Build FIGI → LEI map
    logger::log_info("Corporate Update: Building FIGI → LEI map...").await;
    let figi_to_lei = match build_lei_to_figi_infos(&lei_to_isins, None).await {
        Ok(map) => {
            let msg = format!("Corporate Update: Built FIGI map with {} entries", map.len());
            println!("{}", msg);
            logger::log_info(&msg).await;
            map
        }
        Err(e) => {
            let msg = format!("Corporate Update: Warning - Could not build FIGI→LEI map: {}", e);
            eprintln!("{}", msg);
            logger::log_warn(&msg).await;
            HashMap::new()
        }
    };

    // 4. Load or build companies
    logger::log_info("Corporate Update: Loading/building company securities...").await;
    let securities = load_or_build_all_securities(&figi_to_lei).await?;
    let msg = format!("Corporate Update: Processing {} companies", securities.0.len());
    println!("{}", msg);
    logger::log_info(&msg).await;

    // Company name → (ISIN → ticker); unique pairs only
    let companies: HashMap<String, HashMap<String, String>> = securities.0
        .iter()
        .fold(HashMap::new(), |mut acc, security| {
            let mut isin_ticker_pairs: HashMap<String, String> = HashMap::new();

            // Collect all unique ISIN-ticker pairs
            for figi_infos in security.1.securities.values() {
                for figi_info in figi_infos {
                    if !figi_info.isin.is_empty() && !figi_info.ticker.is_empty() {
                        isin_ticker_pairs.insert(figi_info.isin.clone(), figi_info.ticker.clone());
                    }
                }
            }

            // Only add companies that have at least one pair
            if !isin_ticker_pairs.is_empty() {
                acc.insert(security.1.name.clone(), isin_ticker_pairs);
            }
            acc
        });

    logger::log_info(&format!("Corporate Update: Saving {} companies to JSONL", companies.len())).await;
    save_companies_to_jsonl(&paths, &companies).await.expect("Failed to save companies list");
    logger::log_info("Corporate Update: Companies saved successfully").await;

    // 5. Load existing earnings events (for change detection)
    logger::log_info("Corporate Update: Loading existing events...").await;
    let existing_events = match load_existing_events(&paths).await {
        Ok(events) => {
            let msg = format!("Corporate Update: Loaded {} existing events", events.len());
            println!("{}", msg);
            logger::log_info(&msg).await;
            events
        }
        Err(e) => {
            let msg = format!("Corporate Update: Warning - Could not load existing events: {}", e);
            eprintln!("{}", msg);
            logger::log_warn(&msg).await;
            HashMap::new()
        }
    };

    // 6. Use the provided shared pool (no need to create a new one)
    let pool_size = pool.get_number_of_instances(); // size from the shared pool
    logger::log_info(&format!("Corporate Update: Using pool size: {}", pool_size)).await;

    // Process companies in parallel using the shared pool.
    // NOTE: disabled for now — `&mut existing_events` cannot be captured by
    // several concurrent tasks at once; `process_company_data` needs to return
    // its results so they can be merged sequentially afterwards before this
    // block can be re-enabled (see the helper sketch after this function).
    /*let results: Vec<_> = stream::iter(companies.into_iter())
        .map(|company| {
            let pool_clone = pool.clone();
            async move { process_company_data(&company, &pool_clone, &mut existing_events).await }
        })
        .buffer_unordered(pool_size)
        .collect()
        .await;

    // Handle results (e.g., collect changes)
    let mut all_changes = Vec::new();
    for result in results {
        if let Ok(ProcessResult { changes }) = result {
            all_changes.extend(changes);
        }
    }*/

    logger::log_info(&format!("Corporate Update: Saving {} events to optimized storage", existing_events.len())).await;
    save_optimized_events(&paths, existing_events).await?;
    logger::log_info("Corporate Update: Events saved successfully").await;
    //save_changes(&all_changes).await?;

    let msg = "✓ Corporate update complete";
    println!("{}", msg);
    logger::log_info(msg).await;

    Ok(())
}
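// A minimal sketch (not wired in) of the fan-out pattern the disabled block
// above aims for: run one future per company, at most `concurrency` at a time,
// and hand the results back for sequential merging. It assumes the `futures`
// crate is a dependency; the `task` closure is a stand-in for the real
// `process_company_data` call.
#[allow(dead_code)]
async fn for_each_company_buffered<F, Fut>(
    companies: HashMap<String, HashMap<String, String>>,
    concurrency: usize,
    task: F,
) -> Vec<anyhow::Result<ProcessResult>>
where
    F: Fn((String, HashMap<String, String>)) -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<ProcessResult>>,
{
    use futures::stream::{self, StreamExt};

    stream::iter(companies.into_iter())
        .map(task)
        .buffer_unordered(concurrency)
        .collect()
        .await
}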
pub struct ProcessResult {
    pub changes: Vec<CompanyEventChange>,
}

/// Merges a batch of freshly scraped events into `existing`, recording any
/// detected changes. An event on the same ticker and date whose key differs
/// is treated as a "time" change when the event still lies in the future.
pub fn process_batch(
    new_events: &[CompanyEvent],
    existing: &mut HashMap<String, CompanyEvent>,
    today: &str,
) -> ProcessResult {
    let mut changes = Vec::new();

    for new in new_events {
        let key = event_key(new);

        // Exact key match: diff against the stored version and replace it.
        if let Some(old) = existing.get(&key) {
            changes.extend(detect_changes(old, new, today));
            existing.insert(key, new.clone());
            continue;
        }

        // Otherwise, check for a time change on the same ticker and date.
        let date_key = format!("{}|{}", new.ticker, new.date);
        let mut found_old = None;
        for (k, e) in existing.iter() {
            if format!("{}|{}", e.ticker, e.date) == date_key && k != &key {
                found_old = Some((k.clone(), e.clone()));
                break;
            }
        }

        if let Some((old_key, old_event)) = found_old {
            // Only record the change for future events.
            if new.date.as_str() > today {
                changes.push(CompanyEventChange {
                    ticker: new.ticker.clone(),
                    date: new.date.clone(),
                    field_changed: "time".to_string(),
                    old_value: old_event.time.clone(),
                    new_value: new.time.clone(),
                    detected_at: Local::now().format("%Y-%m-%d %H:%M:%S").to_string(),
                });
            }
            existing.remove(&old_key);
        }

        existing.insert(key, new.clone());
    }

    ProcessResult { changes }
}
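// A hedged test sketch for the time-change branch of `process_batch`. It
// assumes `CompanyEvent` derives `Clone` and `Default` and has owned `String`
// fields `ticker`/`date`/`time`, and that `event_key` incorporates the event
// time (so a rescheduled event maps to a new key). Adjust to the real types
// if those assumptions do not hold.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn reschedule_on_future_date_is_recorded_as_time_change() {
        let old = CompanyEvent {
            ticker: "ACME".to_string(),
            date: "2099-01-01".to_string(),
            time: "08:00".to_string(),
            ..Default::default() // assumption: Default is derived
        };
        let new = CompanyEvent { time: "10:00".to_string(), ..old.clone() };

        let mut existing = HashMap::new();
        existing.insert(event_key(&old), old);

        let result = process_batch(&[new], &mut existing, "2024-01-01");

        // One "time" change should be recorded and the stale entry replaced.
        assert_eq!(result.changes.len(), 1);
        assert_eq!(result.changes[0].field_changed, "time");
        assert_eq!(existing.len(), 1);
    }
}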