diff --git a/src/corporate/checkpoint_helpers.rs b/src/corporate/checkpoint_helpers.rs index 4e8004d..10374a9 100644 --- a/src/corporate/checkpoint_helpers.rs +++ b/src/corporate/checkpoint_helpers.rs @@ -4,7 +4,7 @@ //! This module extracts common patterns used across multiple update modules //! to reduce code duplication and improve maintainability. -use super::types::CompanyCrossPlatformData; +use super::types::CompanyData; use crate::util::logger; use std::collections::HashMap; use std::path::{Path}; @@ -22,7 +22,7 @@ pub async fn load_checkpoint_with_log( checkpoint_path: P1, log_path: P2, checkpoint_desc: &str, -) -> Result> +) -> Result> where P1: AsRef, P2: AsRef, @@ -30,7 +30,7 @@ where let checkpoint_path = checkpoint_path.as_ref(); let log_path = log_path.as_ref(); - let mut companies: HashMap = HashMap::new(); + let mut companies: HashMap = HashMap::new(); // Load checkpoint if it exists if checkpoint_path.exists() { @@ -42,7 +42,7 @@ where continue; // Skip incomplete lines } - match serde_json::from_str::(line) { + match serde_json::from_str::(line) { Ok(company) => { companies.insert(company.name.clone(), company); } @@ -65,7 +65,7 @@ where continue; // Skip incomplete lines } - match serde_json::from_str::(line) { + match serde_json::from_str::(line) { Ok(company) => { companies.insert(company.name.clone(), company); replayed += 1; @@ -91,7 +91,7 @@ where pub async fn consolidate_checkpoint( checkpoint_path: P1, log_path: P2, - companies: &HashMap, + companies: &HashMap, ) -> Result<()> where P1: AsRef, diff --git a/src/corporate/helpers.rs b/src/corporate/helpers.rs index 24c8b3d..b251176 100644 --- a/src/corporate/helpers.rs +++ b/src/corporate/helpers.rs @@ -79,14 +79,16 @@ pub fn choose_random(items: &[T]) -> T { } /// Extract first valid Yahoo ticker from company -pub fn extract_first_yahoo_ticker(company: &CompanyCrossPlatformData) -> Option { - for tickers in company.isin_tickers_map.values() { - for ticker in tickers { - if ticker.starts_with("YAHOO:") - && ticker != "YAHOO:NO_RESULTS" - && ticker != "YAHOO:ERROR" - { - return Some(ticker.trim_start_matches("YAHOO:").to_string()); +pub fn extract_first_yahoo_ticker(company: &CompanyData) -> Option { + if let Some(isin_tickers_map) = &company.isin_tickers_map { + for tickers in isin_tickers_map.values() { + for ticker in tickers { + if ticker.starts_with("YAHOO:") + && ticker != "YAHOO:NO_RESULTS" + && ticker != "YAHOO:ERROR" + { + return Some(ticker.trim_start_matches("YAHOO:").to_string()); + } } } } @@ -109,7 +111,7 @@ pub fn sanitize_company_name(name: &str) -> String { /// Load companies from JSONL file pub async fn load_companies_from_jsonl( path: &std::path::Path -) -> anyhow::Result> { +) -> anyhow::Result> { let content = tokio::fs::read_to_string(path).await?; let mut companies = Vec::new(); @@ -117,7 +119,7 @@ pub async fn load_companies_from_jsonl( if line.trim().is_empty() { continue; } - if let Ok(company) = serde_json::from_str::(line) { + if let Ok(company) = serde_json::from_str::(line) { companies.push(company); } } diff --git a/src/corporate/types.rs b/src/corporate/types.rs index e3df470..6d4e4f4 100644 --- a/src/corporate/types.rs +++ b/src/corporate/types.rs @@ -52,12 +52,14 @@ pub struct FigiData { /// * Name as primary key (for one institution) -> might have to changed when first FigiInfo is coming in /// * ISIN as the most liquid / preferred traded security (used for fallback) /// * securities: Grouped by ISIN, filtered for Common Stock only +/// * isin_tickers_map: Map of ISINs to their associated tickers across platforms #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CompanyData{ pub name: String, pub primary_isin: String, pub securities: HashMap>, // ISIN -> Vec pub yahoo_company_data: Option>, + pub isin_tickers_map: Option>>, // ISIN -> Tickers } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -67,14 +69,6 @@ pub struct YahooCompanyData { pub exchange: Option, } -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct CompanyCrossPlatformData { - pub name: String, - pub isin_tickers_map: HashMap>, // ISIN -> Tickers - pub sector: Option, - pub exchange: Option, -} - #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WarrantData { pub company_name: String, // key in CompanyData diff --git a/src/corporate/update_companies.rs b/src/corporate/update_companies.rs index cef62a3..b5f68c3 100644 --- a/src/corporate/update_companies.rs +++ b/src/corporate/update_companies.rs @@ -20,14 +20,14 @@ use anyhow::{anyhow, Result}; /// Represents a write command to be serialized through the log writer enum LogCommand { - Write(CompanyCrossPlatformData), + Write(CompanyData), Checkpoint, Shutdown, } /// Result from processing a single company struct CompanyProcessResult { - company: CompanyCrossPlatformData, + company: CompanyData, is_update: bool, } @@ -36,7 +36,7 @@ struct CompanyProcessResult { fn company_needs_processing( company_name: &str, company_info: &CompanyData, - existing_companies: &HashMap, + existing_companies: &HashMap, ) -> bool { // If company not in existing data at all, definitely needs processing let Some(existing_entry) = existing_companies.get(company_name) else { @@ -56,20 +56,25 @@ fn company_needs_processing( // Check each required ISIN for isin in required_isins { // Check if this ISIN exists in the company's ticker map - if let Some(tickers) = existing_entry.isin_tickers_map.get(&isin) { - // Check if this ISIN has valid Yahoo data - let has_valid_yahoo = tickers.iter().any(|t| { - t.starts_with("YAHOO:") && - t != "YAHOO:ERROR" //&& // Error marker means needs retry - //t != "YAHOO:NO_RESULTS" // This is actually valid (legitimately not found) - }); - - // If no valid Yahoo data for this ISIN, company needs processing - if !has_valid_yahoo { + if let Some(map) = &existing_entry.isin_tickers_map { + if let Some(tickers) = map.get(&isin) { + // Check if this ISIN has valid Yahoo data + let has_valid_yahoo = tickers.iter().any(|t| { + t.starts_with("YAHOO:") && + t != "YAHOO:ERROR" //&& // Error marker means needs retry + //t != "YAHOO:NO_RESULTS" // This is actually valid (legitimately not found) + }); + + // If no valid Yahoo data for this ISIN, company needs processing + if !has_valid_yahoo { + return true; + } + } else { + // ISIN not in map at all, needs processing return true; } } else { - // ISIN not in map at all, needs processing + // No isin_tickers_map at all, needs processing return true; } } @@ -731,7 +736,7 @@ async fn scrape_with_retry( async fn process_single_company_validated( name: String, company_info: CompanyData, - existing_entry: Option, + existing_entry: Option, pool: &Arc, shutdown_flag: &Arc, ) -> anyhow::Result> { @@ -746,12 +751,9 @@ async fn process_single_company_validated( let mut isin_tickers_map: HashMap> = existing_entry .as_ref() - .map(|e| e.isin_tickers_map.clone()) + .and_then(|e| e.isin_tickers_map.clone()) .unwrap_or_default(); - let mut sector = existing_entry.as_ref().and_then(|e| e.sector.clone()); - let mut exchange = existing_entry.as_ref().and_then(|e| e.exchange.clone()); - // Collect unique ISIN-ticker pairs let mut unique_isin_ticker_pairs: HashMap> = HashMap::new(); @@ -808,16 +810,6 @@ async fn process_single_company_validated( )).await; tickers.push(format!("YAHOO:{}", details.ticker)); - - if sector.is_none() && details.sector.is_some() { - sector = details.sector.clone(); - logger::log_info(&format!(" Sector: {}", details.sector.as_ref().unwrap())).await; - } - - if exchange.is_none() && details.exchange.is_some() { - exchange = details.exchange.clone(); - logger::log_info(&format!(" Exchange: {}", details.exchange.as_ref().unwrap())).await; - } }, Ok(None) => { logger::log_warn(&format!("◯ No search results for ISIN {} (company: {})", isin, name)).await; @@ -866,11 +858,12 @@ async fn process_single_company_validated( } if !isin_tickers_map.is_empty() { - let company_entry = CompanyCrossPlatformData { + let company_entry = CompanyData { name: name.clone(), - isin_tickers_map, - sector, - exchange, + primary_isin: company_info.primary_isin.clone(), + securities: company_info.securities.clone(), + yahoo_company_data: company_info.yahoo_company_data.clone(), + isin_tickers_map: Some(isin_tickers_map), }; Ok(Some(CompanyProcessResult { diff --git a/src/corporate/update_companies_cleanse.rs b/src/corporate/update_companies_cleanse.rs index 29a58be..9c952ee 100644 --- a/src/corporate/update_companies_cleanse.rs +++ b/src/corporate/update_companies_cleanse.rs @@ -20,15 +20,15 @@ use tokio::sync::mpsc; /// Result of processing a single company #[derive(Debug, Clone)] pub enum CompanyProcessResult { - Valid(CompanyCrossPlatformData), + Valid(CompanyData), FilteredLowCap { name: String, market_cap: f64 }, FilteredNoPrice { name: String }, - Failed { company: CompanyCrossPlatformData, error: String, is_transient: bool }, + Failed { company: CompanyData, error: String, is_transient: bool }, } /// Represents a write command to be serialized through the log writer enum LogCommand { - Write(CompanyCrossPlatformData), + Write(CompanyData), Checkpoint, Shutdown, } @@ -81,7 +81,7 @@ pub async fn companies_yahoo_cleansed_no_data(paths: &DataPaths) -> Result c, Err(e) => { logger::log_warn(&format!(" Failed to parse company on line {}: {}", total_count, e)).await; @@ -90,13 +90,17 @@ pub async fn companies_yahoo_cleansed_no_data(paths: &DataPaths) -> Result = HashMap::new(); + let mut existing_companies: HashMap = HashMap::new(); let mut processed_names: std::collections::HashSet = std::collections::HashSet::new(); if checkpoint_path.exists() { @@ -206,7 +210,7 @@ pub async fn companies_yahoo_cleansed_low_profile( continue; // Skip incomplete lines } - match serde_json::from_str::(line) { + match serde_json::from_str::(line) { Ok(company) => { processed_names.insert(company.name.clone()); existing_companies.insert(company.name.clone(), company); @@ -229,7 +233,7 @@ pub async fn companies_yahoo_cleansed_low_profile( continue; // Skip incomplete lines } - match serde_json::from_str::(line) { + match serde_json::from_str::(line) { Ok(company) => { processed_names.insert(company.name.clone()); existing_companies.insert(company.name.clone(), company); @@ -251,7 +255,7 @@ pub async fn companies_yahoo_cleansed_low_profile( logger::log_info(&format!("Loaded {} companies from input", input_companies.len())).await; // === BUILD PENDING LIST (smart skip logic) === - let mut pending: Vec = input_companies + let mut pending: Vec = input_companies .into_iter() .filter(|company| company_needs_processing(company, &existing_companies)) .collect(); @@ -608,7 +612,7 @@ pub async fn companies_yahoo_cleansed_low_profile( /// Helper function to spawn a validation task (reduces code duplication) fn spawn_validation_task( - company: CompanyCrossPlatformData, + company: CompanyData, yahoo_pool: &Arc, paths: &Arc, write_tx: &mpsc::Sender, @@ -688,7 +692,7 @@ fn spawn_validation_task( /// Process a single company with full error categorization async fn process_company_with_validation( - company: &CompanyCrossPlatformData, + company: &CompanyData, yahoo_pool: &Arc, paths: &DataPaths, ) -> CompanyProcessResult { @@ -897,8 +901,8 @@ async fn save_company_core_data( /// Check if a company needs processing (validation check) fn company_needs_processing( - company: &CompanyCrossPlatformData, - existing_companies: &HashMap, + company: &CompanyData, + existing_companies: &HashMap, ) -> bool { // If company exists in cleaned output, skip it !existing_companies.contains_key(&company.name) diff --git a/src/corporate/update_companies_enrich.rs b/src/corporate/update_companies_enrich.rs index f6d3773..6673ac8 100644 --- a/src/corporate/update_companies_enrich.rs +++ b/src/corporate/update_companies_enrich.rs @@ -29,7 +29,7 @@ enum LogCommand { /// Type alias for enrichment function type EnrichmentFn = Arc< - dyn Fn(CompanyCrossPlatformData, Arc, DataPaths) + dyn Fn(CompanyData, Arc, DataPaths) -> Pin> + Send>> + Send + Sync @@ -104,7 +104,7 @@ pub async fn enrich_companies_with_events( logger::log_info(&format!("Found {} companies to process", total_companies)).await; // Filter companies that need enrichment - let pending_companies: Vec = companies + let pending_companies: Vec = companies .into_iter() .filter(|company| !enriched_companies.contains(&company.name)) .collect(); @@ -283,7 +283,7 @@ async fn track_events_completion( /// Enrich a single company with event data async fn enrich_company_with_events( - company: &CompanyCrossPlatformData, + company: &CompanyData, yahoo_pool: &Arc, paths: &DataPaths, ) -> anyhow::Result<()> { @@ -438,7 +438,7 @@ pub async fn enrich_companies_with_option( logger::log_info(&format!("Found {} companies to process", total_companies)).await; // Filter companies that need enrichment - let pending_companies: Vec = companies + let pending_companies: Vec = companies .into_iter() .filter(|company| !enriched_companies.contains(&company.name)) .collect(); @@ -605,7 +605,7 @@ async fn track_option_completion( /// Enrich a single company with option data async fn enrich_company_with_option( - company: &CompanyCrossPlatformData, + company: &CompanyData, yahoo_pool: &Arc, paths: &DataPaths, ) -> anyhow::Result<()> { @@ -697,7 +697,7 @@ pub async fn enrich_companies_with_chart( logger::log_info(&format!("Found {} companies to process", total_companies)).await; // Filter companies that need enrichment - let pending_companies: Vec = companies + let pending_companies: Vec = companies .into_iter() .filter(|company| !enriched_companies.contains(&company.name)) .collect(); @@ -864,7 +864,7 @@ async fn track_chart_completion( /// Enrich a single company with chart data async fn enrich_company_with_chart( - company: &CompanyCrossPlatformData, + company: &CompanyData, yahoo_pool: &Arc, paths: &DataPaths, ) -> anyhow::Result<()> { @@ -1005,7 +1005,7 @@ fn spawn_log_writer( /// - `shutdown_flag`: Flag to signal shutdown /// - `enrichment_fn`: The specific enrichment function to call (events, option, chart, etc.) fn spawn_enrichment_task( - company: CompanyCrossPlatformData, + company: CompanyData, yahoo_pool: Arc, paths: DataPaths, processed_count: Arc, diff --git a/src/corporate/update_openfigi.rs b/src/corporate/update_openfigi.rs index c1535da..eeb90b6 100644 --- a/src/corporate/update_openfigi.rs +++ b/src/corporate/update_openfigi.rs @@ -655,6 +655,7 @@ fn prepare_common_stock_entry( primary_isin, securities: grouped_by_isin, yahoo_company_data: None, + isin_tickers_map: None, }) } diff --git a/src/corporate/yahoo_company_extraction.rs b/src/corporate/yahoo_company_extraction.rs index 0e56e79..7ea3ab1 100644 --- a/src/corporate/yahoo_company_extraction.rs +++ b/src/corporate/yahoo_company_extraction.rs @@ -303,9 +303,11 @@ pub async fn get_all_tickers_from_companies_jsonl(paths: &DataPaths) -> anyhow:: let content = tokio::fs::read_to_string(companies_file).await?; let mut tickers = Vec::new(); for line in content.lines() { - let company: CompanyCrossPlatformData = serde_json::from_str(line)?; - for (_isin, ticker_vec) in company.isin_tickers_map { - tickers.extend(ticker_vec); + let company: CompanyData = serde_json::from_str(line)?; + if let Some(isin_tickers_map) = company.isin_tickers_map { + for (_isin, ticker_vec) in isin_tickers_map { + tickers.extend(ticker_vec); + } } } Ok(tickers)