diff --git a/src/corporate/openfigi.rs b/src/corporate/openfigi.rs
index 5119e22..ac7eac9 100644
--- a/src/corporate/openfigi.rs
+++ b/src/corporate/openfigi.rs
@@ -1,4 +1,5 @@
 use crate::util::directories::DataPaths;
+use crate::util::logger;
 // src/corporate/openfigi.rs
 use super::{types::*};
@@ -27,7 +28,7 @@ impl OpenFigiClient {
     /// # Errors
     ///
     /// Returns an error if the HTTP client cannot be built or if the API key header is invalid.
-    pub fn new() -> anyhow::Result<Self> {
+    pub async fn new() -> anyhow::Result<Self> {
         let api_key = dotenvy::var("OPENFIGI_API_KEY").ok();
         let has_key = api_key.is_some();
@@ -43,10 +44,11 @@ impl OpenFigiClient {
         let client = builder.build().context("Failed to build HTTP client")?;
 
-        println!(
+        let msg = format!(
             "OpenFIGI client initialized: {}",
             if has_key { "with API key" } else { "no key (limited mode)" }
         );
+        logger::log_info(&msg).await;
 
         Ok(Self { client, has_key })
     }
@@ -126,10 +128,16 @@ impl OpenFigiClient {
             Err(e) => {
                 retry_count += 1;
                 if retry_count >= max_retries {
-                    return Err(anyhow!("Failed to send mapping request after {} retries: {}", max_retries, e));
+                    let err_msg = format!("Failed to send mapping request after {} retries: {}", max_retries, e);
+                    logger::log_error(&err_msg).await;
+                    return Err(anyhow!(err_msg));
                 }
-                eprintln!("Transient error sending mapping request (attempt {}/{}): {}", retry_count, max_retries, e);
-                println!(" Retrying in {}ms...", backoff_ms);
+                let warn_msg = format!("Transient error sending mapping request (attempt {}/{}): {}", retry_count, max_retries, e);
+                eprintln!("{}", warn_msg);
+                logger::log_warn(&warn_msg).await;
+                let retry_msg = format!(" Retrying in {}ms...", backoff_ms);
+                println!("{}", retry_msg);
+                logger::log_info(&retry_msg).await;
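+                // Exponential backoff: the delay doubles on every failed attempt
+                // and is capped at 60s by the `.min(60000)` below.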
                 sleep(Duration::from_millis(backoff_ms)).await;
                 backoff_ms = (backoff_ms * 2).min(60000); // Cap at 60s
                 continue;
@@ -147,7 +155,9 @@ impl OpenFigiClient {
                 .and_then(|v| v.to_str().ok())
                 .and_then(|s| s.parse::<u64>().ok())
                 .unwrap_or(10);
-            println!("Rate limited—backing off {}s", reset_sec);
+            let rate_msg = format!("Rate limited—backing off {}s", reset_sec);
+            println!("{}", rate_msg);
+            logger::log_warn(&rate_msg).await;
             sleep(Duration::from_secs(reset_sec.max(10))).await;
             continue; // Retry the same chunk
         } else if status == 401 {
@@ -158,9 +168,13 @@ impl OpenFigiClient {
             // Transient server error, retry with backoff
             retry_count += 1;
             if retry_count >= max_retries {
-                return Err(anyhow!("OpenFIGI server error {} after {} retries: {}", status, max_retries, body));
+                let err_msg = format!("OpenFIGI server error {} after {} retries: {}", status, max_retries, body);
+                logger::log_error(&err_msg).await;
+                return Err(anyhow!(err_msg));
             }
-            eprintln!("Server error {} (attempt {}/{}), retrying in {}ms...", status, retry_count, max_retries, backoff_ms);
+            let warn_msg = format!("Server error {} (attempt {}/{}), retrying in {}ms...", status, retry_count, max_retries, backoff_ms);
+            eprintln!("{}", warn_msg);
+            logger::log_warn(&warn_msg).await;
             sleep(Duration::from_millis(backoff_ms)).await;
             backoff_ms = (backoff_ms * 2).min(60000);
             continue;
@@ -260,7 +274,9 @@ async fn load_market_sectors() -> anyhow::Result<Vec<String>> {
 
     if !cache_file.exists() {
         // Return default if file doesn't exist
-        eprintln!("Warning: {} not found, using default sectors", cache_file.display());
+        let warn_msg = format!("Warning: {} not found, using default sectors", cache_file.display());
+        eprintln!("{}", warn_msg);
+        logger::log_warn(&warn_msg).await;
         return Ok(vec![
             "Comdty".to_string(),
             "Corp".to_string(),
@@ -292,7 +308,8 @@ async fn load_market_sectors() -> anyhow::Result<Vec<String>> {
         return Err(anyhow!("No sectors found in marketSecDes.json"));
     }
 
-    println!("Loaded {} market sectors from cache", sectors.len());
+    let msg = format!("Loaded {} market sectors from cache", sectors.len());
+    logger::log_info(&msg).await;
 
     Ok(sectors)
 }
@@ -328,7 +345,9 @@ async fn find_most_recent_gleif_date(gleif_cache_dir: &Path) -> anyhow::Result<Option<String>> {
         match build_lei_to_figi_infos_internal(lei_to_isins, gleif_date).await {
             Ok(map) => {
                 if !map.is_empty() {
-                    println!("✓ LEI→FIGI mapping completed successfully with {} entries", map.len());
+                    let msg = format!("✓ LEI→FIGI mapping completed successfully with {} entries", map.len());
+                    logger::log_info(&msg).await;
                 }
                 return Ok(map);
             }
@@ -372,19 +393,27 @@ pub async fn build_lei_to_figi_infos(lei_to_isins: &HashMap<String, Vec<String>>
                     || error_msg.contains("Failed to create");
 
                 if is_fatal {
-                    eprintln!("Fatal error in LEI→FIGI mapping: {}", e);
+                    let err = format!("Fatal error in LEI→FIGI mapping: {}", e);
+                    eprintln!("{}", err);
+                    logger::log_error(&err).await;
                     return Err(e);
                 }
 
                 retry_count += 1;
                 if retry_count >= max_retries {
-                    eprintln!("LEI→FIGI mapping failed after {} retries: {}", max_retries, e);
+                    let err = format!("LEI→FIGI mapping failed after {} retries: {}", max_retries, e);
+                    eprintln!("{}", err);
+                    logger::log_error(&err).await;
                     return Err(e);
                 }
 
                 let wait_secs = 60 * retry_count;
-                eprintln!("Transient error in LEI→FIGI mapping (attempt {}/{}): {}", retry_count, max_retries, e);
-                println!("Retrying mapping in {}s...", wait_secs);
+                let warn_msg = format!("Transient error in LEI→FIGI mapping (attempt {}/{}): {}", retry_count, max_retries, e);
+                eprintln!("{}", warn_msg);
+                logger::log_warn(&warn_msg).await;
+                let retry_msg = format!("Retrying mapping in {}s...", wait_secs);
+                println!("{}", retry_msg);
+                logger::log_info(&retry_msg).await;
                 sleep(Duration::from_secs(wait_secs as u64)).await;
             }
         }
@@ -396,6 +425,11 @@ pub async fn build_lei_to_figi_infos(lei_to_isins: &HashMap<String, Vec<String>>
 /// This is the actual worker function that performs the mapping. It handles already-processed
 /// LEIs gracefully but will fail on transient errors, which are caught and retried by the
 /// wrapper function build_lei_to_figi_infos.
+///
+/// Tracks three outcomes:
+/// 1. Hit with marketSector: saved to sector-specific folder
+/// 2. Hit without marketSector: saved to "uncategorized" folder
+/// 3. No_hit (empty results): LEI marked for removal from GLEIF CSV
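+///
+/// A sketch of how the three outcomes fall out of the code below (names match
+/// the surrounding function; illustrative, not extra behavior):
+///
+/// ```ignore
+/// let infos = client.map_isins_to_figi_infos(&unique_isins).await?;
+/// if infos.is_empty() {
+///     // 3. no_hit: remove the LEI from the GLEIF CSV
+/// } else {
+///     for info in infos {
+///         if info.marketSector.is_empty() {
+///             // 2. append to uncategorized/lei_to_figi.jsonl
+///         } else {
+///             // 1. append to <sector>/lei_to_figi.jsonl
+///         }
+///     }
+/// }
+/// ```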
 async fn build_lei_to_figi_infos_internal(lei_to_isins: &HashMap<String, Vec<String>>, gleif_date: Option<&str>) -> anyhow::Result<HashMap<String, Vec<FigiInfo>>> {
     let dir = DataPaths::new(".")?;
     let gleif_cache_dir = dir.cache_gleif_dir();
@@ -403,23 +437,42 @@ async fn build_lei_to_figi_infos_internal(lei_to_isins: &HashMap<String, Vec<String>>, gleif_date: Option<&str>)
             Some(d) => d,
-            None => return Err(anyhow!("No GLEIF CSV file found in cache/gleif directory")),
+            None => {
+                let err = "No GLEIF CSV file found in cache/gleif directory";
+                logger::log_error(err).await;
+                return Err(anyhow!(err));
+            },
         }
     };
 
     // Create date-based subdirectory in the mapping cache
+    let msg = format!("Creating date directory for: {}", date);
+    logger::log_info(&msg).await;
     let date_dir = map_cache_dir.join(&date);
     tokio_fs::create_dir_all(&date_dir).await.context("Failed to create date directory")?;
 
     // Load market sectors dynamically from cache
+    logger::log_info("Loading market sectors...").await;
     let sector_dirs = load_market_sectors().await?;
     let mut sector_maps: HashMap<String, HashMap<String, Vec<FigiInfo>>> = HashMap::new();
 
+    let msg = format!("Creating {} sector directories...", sector_dirs.len());
+    logger::log_info(&msg).await;
+
+    // Create uncategorized folder
+    let uncategorized_dir = date_dir.join("uncategorized");
+    tokio_fs::create_dir_all(&uncategorized_dir).await.context("Failed to create uncategorized directory")?;
+    let uncategorized_path = uncategorized_dir.join("lei_to_figi.jsonl");
+    let uncategorized_map = load_lei_to_figi_jsonl(&uncategorized_path).await?;
+    sector_maps.insert("uncategorized".to_string(), uncategorized_map);
+
     for sector in &sector_dirs {
         let sector_dir = date_dir.join(sector);
         tokio_fs::create_dir_all(&sector_dir).await.context("Failed to create sector directory")?;
@@ -430,22 +483,30 @@ async fn build_lei_to_figi_infos_internal(lei_to_isins: &HashMap<String, Vec<String>>, gleif_date: Option<&str>)
     let mut leis: Vec<String> = lei_to_isins.keys().cloned().collect();
     leis.sort();
 
     let mut processed = sector_maps.values().map(|m| m.len()).sum::<usize>();
     let total = leis.len();
+    let mut no_hit_leis = Vec::new(); // Track LEIs with no data found (no_hit)
+
+    let msg = format!("Total LEIs to process: {}, already processed: {}", total, processed);
+    logger::log_info(&msg).await;
 
     for lei in leis {
-        // Check if LEI is already processed in any sector
+        // Check if LEI is already processed in any sector (including uncategorized)
         let mut already_processed = false;
         for sector_map in sector_maps.values() {
             if sector_map.contains_key(&lei) {
@@ -464,18 +525,57 @@ async fn build_lei_to_figi_infos_internal(lei_to_isins: &HashMap<String, Vec<String>>, gleif_date: Option<&str>)
         let unique_isins: Vec<String> = isins.iter().cloned().collect::<std::collections::HashSet<_>>().into_iter().collect();
 
+        let debug_msg = format!("Processing LEI {} with {} ISINs...", lei, unique_isins.len());
+        logger::log_info(&debug_msg).await;
+
         let all_figi_infos = client.map_isins_to_figi_infos(&unique_isins).await?;
 
+        // Case 3: no_hit - API succeeded but returned no data
+        if all_figi_infos.is_empty() {
+            let no_hit_msg = format!(" no_hit: LEI {} returned no FIGIs", lei);
+            logger::log_warn(&no_hit_msg).await;
+            no_hit_leis.push(lei.clone());
+
+            // Remove immediately from GLEIF CSV to prevent progress loss on interrupt
+            if let Err(e) = remove_lei_from_gleif_csv_single(gleif_cache_dir, &lei).await {
+                let warn_msg = format!("Warning: Failed to remove LEI {} from GLEIF CSV: {}", lei, e);
+                eprintln!("{}", warn_msg);
+                logger::log_warn(&warn_msg).await;
+            }
+
+            continue;
+        }
+
+        let hit_msg = format!(" hit: LEI {} found {} FIGIs", lei, all_figi_infos.len());
+        logger::log_info(&hit_msg).await;
+
        // Organize results by marketSector
         let mut figis_by_sector: HashMap<String, Vec<FigiInfo>> = HashMap::new();
+        let mut uncategorized_figis = Vec::new();
 
         for figi_info in all_figi_infos {
             let sector = figi_info.marketSector.clone();
-            if sector.is_empty() {
-                continue; // Skip if no sector
-            }
-            figis_by_sector.entry(sector).or_insert_with(Vec::new).push(figi_info);
+            if sector.is_empty() {
+                // Case 2: hit but no marketSecDes - save to uncategorized
+                uncategorized_figis.push(figi_info);
+            } else {
+                // Case 1: hit with marketSector - organize by sector
+                figis_by_sector.entry(sector).or_insert_with(Vec::new).push(figi_info);
+            }
+        }
+
+        // Save uncategorized FIGIs if any
+        if !uncategorized_figis.is_empty() {
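+            // `dedup_by_key` only removes *consecutive* duplicates, so sort by
+            // FIGI first to make the dedup global.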
+            uncategorized_figis.sort_by_key(|f| f.figi.clone());
+            uncategorized_figis.dedup_by_key(|f| f.figi.clone());
+
+            append_lei_to_figi_jsonl(&uncategorized_path, &lei, &uncategorized_figis).await
+                .context("Failed to append to uncategorized JSONL")?;
+
+            if let Some(uncategorized_map) = sector_maps.get_mut("uncategorized") {
+                uncategorized_map.insert(lei.clone(), uncategorized_figis);
+            }
         }
 
         // Save to appropriate sector files
@@ -502,21 +602,19 @@ async fn build_lei_to_figi_infos_internal(lei_to_isins: &HashMap<String, Vec<String>>, gleif_date: Option<&str>)
-    for (sector, sector_map) in &sector_maps {
-        let total_figis: usize = sector_map.values().map(|v| v.len()).sum();
-        if total_figis > 0 {
-            println!("{}: {} LEIs, {} FIGIs", sector, sector_map.len(), total_figis);
-        }
-    }
+    // Log final summary for no_hit LEIs (they've already been removed incrementally)
+    if !no_hit_leis.is_empty() {
+        let no_hit_summary = format!("no_hit (removed incrementally from GLEIF): {} LEIs", no_hit_leis.len());
+        println!("{}", no_hit_summary);
+        logger::log_info(&no_hit_summary).await;
+    }
 
     // Return Equity sector as the main result
@@ -559,7 +657,9 @@ async fn load_lei_to_figi_jsonl(path: &Path) -> anyhow::Result<HashMap<String, Vec<FigiInfo>>> {
     Ok(())
 }
 
+/// Removes a single invalid LEI from the GLEIF CSV file immediately.
+///
+/// This function is called after each no_hit detection to prevent progress loss on interrupt.
+/// It reads the GLEIF CSV, filters out the specific LEI, and overwrites the file.
+///
+/// # Arguments
+///
+/// * `gleif_cache_dir` - Path to the cache/gleif directory
+/// * `lei` - The LEI string to remove
+///
+/// # Returns
+///
+/// Ok(()) if successful, Err if file operations fail.
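+///
+/// # Example
+///
+/// A minimal sketch of the call site (the directory and LEI below are
+/// illustrative placeholders):
+///
+/// ```ignore
+/// remove_lei_from_gleif_csv_single(Path::new("cache/gleif"), "EXAMPLELEI0000000000").await?;
+/// ```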
+async fn remove_lei_from_gleif_csv_single(gleif_cache_dir: &Path, lei: &str) -> anyhow::Result<()> {
+    // Find the most recent GLEIF CSV file
+    let mut entries = tokio_fs::read_dir(gleif_cache_dir)
+        .await
+        .context("Failed to read gleif cache directory")?;
+
+    let mut csv_files = Vec::new();
+
+    while let Some(entry) = entries.next_entry().await? {
+        let path = entry.path();
+        if let Some(filename) = path.file_name() {
+            let filename_str = filename.to_string_lossy();
+            if filename_str.ends_with(".csv") && filename_str.contains("isin-lei-") {
+                csv_files.push(path);
+            }
+        }
+    }
+
+    if csv_files.is_empty() {
+        return Ok(());
+    }
+
+    // Sort and take the most recent
+    csv_files.sort();
+    csv_files.reverse();
+    let gleif_file = &csv_files[0];
+
+    // Read the CSV file
+    let content = tokio_fs::read_to_string(gleif_file)
+        .await
+        .context("Failed to read GLEIF CSV")?;
+
+    // Filter out the line with this LEI
+    let filtered_lines: Vec<&str> = content
+        .lines()
+        .filter(|line| {
+            // GLEIF CSV format: ISIN,LEI
+            let parts: Vec<&str> = line.split(',').collect();
+            if parts.len() >= 2 {
+                parts[1] != lei
+            } else {
+                true // Keep lines that don't match the format (e.g., header)
+            }
+        })
+        .collect();
+
+    // Only write if something was actually removed
+    if filtered_lines.len() < content.lines().count() {
+        let new_content = filtered_lines.join("\n") + "\n";
+        tokio_fs::write(gleif_file, new_content)
+            .await
+            .context("Failed to write filtered GLEIF CSV")?;
+    }
+
+    Ok(())
+}
+
+/// Removes invalid LEIs from the GLEIF CSV file.
+///
+/// When an API call succeeds but returns no data (no_hit), the LEI is considered invalid
+/// and should be removed from the GLEIF CSV to prevent re-scraping on future runs.
+///
+/// This function reads the GLEIF CSV, filters out the specified LEIs, and overwrites the file.
+///
+/// # Arguments
+///
+/// * `gleif_cache_dir` - Path to the cache/gleif directory
+/// * `leis_to_remove` - Vec of LEI strings to remove
+///
+/// # Returns
+///
+/// Ok(()) if successful, Err if file operations fail.
+async fn remove_leis_from_gleif_csv(gleif_cache_dir: &Path, leis_to_remove: &[String]) -> anyhow::Result<()> {
+    logger::log_info(&format!("Removing {} invalid LEIs from GLEIF CSV...", leis_to_remove.len())).await;
+
+    // Find the most recent GLEIF CSV file
+    let mut entries = tokio_fs::read_dir(gleif_cache_dir)
+        .await
+        .context("Failed to read gleif cache directory")?;
+
+    let mut csv_files = Vec::new();
+
+    while let Some(entry) = entries.next_entry().await? {
+        let path = entry.path();
+        if let Some(filename) = path.file_name() {
+            let filename_str = filename.to_string_lossy();
+            if filename_str.ends_with(".csv") && filename_str.contains("isin-lei-") {
+                csv_files.push(path);
+            }
+        }
+    }
+
+    if csv_files.is_empty() {
+        logger::log_warn("No GLEIF CSV files found for removal operation").await;
+        return Ok(());
+    }
+
+    // Sort and take the most recent
+    csv_files.sort();
+    csv_files.reverse();
+    let gleif_file = &csv_files[0];
+    let debug_msg = format!("Reading GLEIF file: {}", gleif_file.display());
+    logger::log_info(&debug_msg).await;
+
+    // Read the CSV file
+    let content = tokio_fs::read_to_string(gleif_file)
+        .await
+        .context("Failed to read GLEIF CSV")?;
+
+    let original_lines = content.lines().count();
+
+    // Convert LEIs to remove into a HashSet for faster lookup
+    let remove_set: std::collections::HashSet<_> = leis_to_remove.iter().cloned().collect();
+
+    // Filter out lines with LEIs to remove
+    let filtered_lines: Vec<&str> = content
+        .lines()
+        .filter(|line| {
+            // GLEIF CSV format: ISIN,LEI
+            let parts: Vec<&str> = line.split(',').collect();
+            if parts.len() >= 2 {
+                !remove_set.contains(parts[1])
+            } else {
+                true // Keep lines that don't match the format (e.g., header)
+            }
+        })
+        .collect();
+
+    let removed_count = original_lines - filtered_lines.len();
+
+    // Write back the filtered content
+    let new_content = filtered_lines.join("\n") + "\n";
+    tokio_fs::write(gleif_file, new_content)
+        .await
+        .context("Failed to write filtered GLEIF CSV")?;
+
+    let success_msg = format!("✓ Removed {} invalid LEIs from GLEIF CSV (was {} lines, now {} lines)", removed_count, original_lines, filtered_lines.len());
+    println!("{}", success_msg);
+    logger::log_info(&success_msg).await;
+
+    Ok(())
+}
+
 /// Loads or builds HashMaps for companies, warrants, and options.
 ///
 /// This function:
@@ -1045,7 +1299,7 @@ where
 pub async fn load_figi_type_lists() -> anyhow::Result<()> {
     println!("Loading OpenFIGI mapping value lists...");
 
-    let client = OpenFigiClient::new()?;
+    let client = OpenFigiClient::new().await?;
 
     // Create cache directory
     let dir = DataPaths::new(".")?;
diff --git a/src/corporate/storage.rs b/src/corporate/storage.rs
index bc623ed..71fc198 100644
--- a/src/corporate/storage.rs
+++ b/src/corporate/storage.rs
@@ -238,7 +238,7 @@ fn infer_currency_from_ticker(ticker: &str) -> String {
 /// Returns an error if file operations or serialization fails.
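+///
+/// `companies` maps company name → (ISIN → ticker); a single entry might look
+/// like `{"ACME Corp": {"US0000000000": "ACME"}}` (illustrative values only).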
 pub async fn save_companies_to_jsonl(
     paths: &DataPaths,
-    companies: &HashMap<String, Vec<(String, String)>>,
+    companies: &HashMap<String, HashMap<String, String>>,
 ) -> anyhow::Result<()> {
     let file_path = paths.data_dir().join("companies.jsonl");
diff --git a/src/corporate/update.rs b/src/corporate/update.rs
index 9f13fc0..8e73b58 100644
--- a/src/corporate/update.rs
+++ b/src/corporate/update.rs
@@ -83,19 +83,25 @@ pub async fn run_full_update(config: &Config, pool: &Arc) -> a
     println!("{}", msg);
     logger::log_info(&msg).await;
 
-    // HashMap<Name, Vec<(ISIN, Ticker)>>
-    let companies: HashMap<String, Vec<(String, String)>> = securities.0
+    // HashMap<Name, HashMap<ISIN, Ticker>> - unique pairs only
+    let companies: HashMap<String, HashMap<String, String>> = securities.0
         .iter()
         .fold(HashMap::new(), |mut acc, security| {
-            let isin: Vec<String> = security.1.securities.values()
-                .flat_map(|figi_info| figi_info.iter().map(|x| x.isin.clone()))
-                .collect();
-            let ticker: Vec<String> = security.1.securities.values()
-                .flat_map(|figi_info| figi_info.iter().map(|x| x.ticker.clone()))
-                .collect();
-            acc.entry(security.1.name.clone())
-                .or_insert_with(Vec::new)
-                .push((isin.join(", "), ticker.join(", ")));
+            let mut isin_ticker_pairs: HashMap<String, String> = HashMap::new();
+
+            // Collect all unique ISIN-Ticker pairs
+            for figi_infos in security.1.securities.values() {
+                for figi_info in figi_infos {
+                    if !figi_info.isin.is_empty() && !figi_info.ticker.is_empty() {
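+                        // `HashMap::insert` overwrites on a duplicate ISIN, so each
+                        // ISIN keeps only the last ticker seen; this is what makes
+                        // the pairs unique.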
+                        isin_ticker_pairs.insert(figi_info.isin.clone(), figi_info.ticker.clone());
+                    }
+                }
+            }
+
+            // Only add if there are pairs
+            if !isin_ticker_pairs.is_empty() {
+                acc.insert(security.1.name.clone(), isin_ticker_pairs);
+            }
             acc
         });