// src/corporate/update_openfigi.rs - STREAMING VERSION

use super::types::*;
use super::helpers::{find_most_recent_figi_date_dir, determine_gleif_date};
use super::bond_processing::*;
use super::option_processing::*;
use crate::util::directories::DataPaths;
use crate::util::integrity::{DataStage, StateManager, directory_reference};
use crate::util::logger;
use crate::scraper::openfigi::OpenFigiClient;
use serde_json::{json, Value};
use std::collections::{HashMap, HashSet};
use std::path::Path;
use tokio::fs as tokio_fs;
use tokio::io::AsyncWriteExt;
use anyhow::{Context, anyhow};

const LEI_BATCH_SIZE: usize = 100; // Process 100 LEIs at a time

/// Loads or builds securities data by streaming through FIGI mapping files.
///
/// Implements abort-safe incremental persistence with checkpoints and replay logs.
///
/// # Arguments
/// * `paths` - Data path configuration; the most recent FIGI mapping date directory
///   (e.g. cache/gleif_openfigi_map/24112025/) is resolved from it.
///
/// # Returns
/// Ok(()) on success.
///
/// # Errors
/// Returns an error if file I/O fails or JSON parsing fails.
pub async fn update_securities(paths: &DataPaths) -> anyhow::Result<()> {
    logger::log_info("Building securities data from FIGI mappings...").await;

    let date_dir = find_most_recent_figi_date_dir(&paths).await?
        .ok_or_else(|| anyhow!("No FIGI date directory found"))?;

    let output_dir = paths.figi_securities_dir();

    let manager = StateManager::new(&paths.integrity_dir()).await?;
    let step_name = "securities_data_complete";

    let content_reference = directory_reference(
        output_dir,
        Some(vec![
            "common_stocks.jsonl".to_string(),
            "warrants.jsonl".to_string(),
            "options.jsonl".to_string(),
            "corporate_bonds.jsonl".to_string(),
            "government_bonds.jsonl".to_string(),
        ]),
        Some(vec![
            "*.log.jsonl".to_string(),  // Exclude log files
            "*.tmp".to_string(),        // Exclude temp files
            "state.jsonl".to_string(),  // Exclude internal state tracking
        ]),
    );
    let data_stage = DataStage::Data;

    if manager.is_step_valid(step_name).await? {
        logger::log_info(" Securities data already built and valid").await;
        logger::log_info(" All sectors already processed, nothing to do").await;
        return Ok(());
    }

    logger::log_info(" Securities data incomplete or missing, proceeding with update").await;

    let entry = manager.create_entry(step_name.to_string(), content_reference, data_stage).await?;

    // Setup checkpoint and log paths for each security type
    let common_checkpoint = output_dir.join("common_stocks.jsonl");
    let common_log = output_dir.join("common_stocks.log.jsonl");
    let warrants_checkpoint = output_dir.join("warrants.jsonl");
    let warrants_log = output_dir.join("warrants.log.jsonl");
    let options_checkpoint = output_dir.join("options.jsonl");
    let options_log = output_dir.join("options.log.jsonl");
    let corporate_bonds_checkpoint = output_dir.join("corporate_bonds.jsonl");
    let corporate_bonds_log = output_dir.join("corporate_bonds.log.jsonl");
    let government_bonds_checkpoint = output_dir.join("government_bonds.jsonl");
    let government_bonds_log = output_dir.join("government_bonds.log.jsonl");

    // Track which sectors have been fully processed
    let processed_sectors_file = output_dir.join("state.jsonl");
    let processed_sectors = load_processed_sectors(&processed_sectors_file).await?;

    logger::log_info(&format!(" Already processed {} sectors", processed_sectors.len())).await;

    // Collect sectors to process
    let mut sectors_to_process = Vec::new();
    let mut entries = tokio_fs::read_dir(date_dir).await
        .context("Failed to read date directory")?;

    while let Some(entry) = entries.next_entry().await? {
        let path = entry.path();
        if !path.is_dir() {
            continue;
        }

        let sector_name = path.file_name()
            .and_then(|n| n.to_str())
            .map(|s| s.to_string())
            .unwrap_or_else(|| "unknown".to_string());

        let lei_figi_file = path.join("lei_to_figi.jsonl");
        if !lei_figi_file.exists() {
            continue;
        }

        // Skip if already processed
        if processed_sectors.contains(&sector_name) {
            logger::log_info(&format!(" Skipping already processed sector: {}", sector_name)).await;
            continue;
        }

        sectors_to_process.push((sector_name, lei_figi_file));
    }

    if sectors_to_process.is_empty() {
        logger::log_info(" All sectors already processed, nothing to do").await;
        manager.mark_valid(entry).await?;
        return Ok(());
    }

    // Load checkpoints and replay logs - these are MUTABLE now
    let mut existing_companies = load_checkpoint_and_replay(&common_checkpoint, &common_log, "name").await?;
    let mut existing_warrants = load_checkpoint_and_replay_nested(&warrants_checkpoint, &warrants_log).await?;
    let mut existing_options = load_checkpoint_and_replay_nested(&options_checkpoint, &options_log).await?;
    let mut existing_corporate_bonds = load_checkpoint_and_replay_nested(&corporate_bonds_checkpoint, &corporate_bonds_log).await?;
    let mut existing_government_bonds = load_checkpoint_and_replay_nested(&government_bonds_checkpoint, &government_bonds_log).await?;

    logger::log_info(&format!(
        " Existing entries - Companies: {}, Warrants: {}, Options: {}, Corporate Bonds: {}, Government Bonds: {}",
        existing_companies.len(),
        existing_warrants.len(),
        existing_options.len(),
        existing_corporate_bonds.len(),
        existing_government_bonds.len()
    )).await;

    // Process statistics
    let mut stats = StreamingStats::new(
        existing_companies.len(),
        existing_warrants.len(),
        existing_options.len(),
        existing_corporate_bonds.len(),
        existing_government_bonds.len(),
    );

    logger::log_info(&format!(" Found {} sectors to process", sectors_to_process.len())).await;
    // Process each sector
    let mut newly_processed_sectors = Vec::new();

    for (sector_name, lei_figi_file) in sectors_to_process {
        logger::log_info(&format!(" Processing sector: {}", sector_name)).await;

        // Stream through the lei_to_figi.jsonl file with batched writes
        process_lei_figi_file_batched(
            &lei_figi_file,
            &common_log,
            &warrants_log,
            &options_log,
            &corporate_bonds_log,
            &government_bonds_log,
            &mut existing_companies,
            &mut existing_warrants,
            &mut existing_options,
            &mut existing_corporate_bonds,
            &mut existing_government_bonds,
            &mut stats,
        ).await?;

        // Mark sector as processed
        newly_processed_sectors.push(sector_name.clone());

        // Append to processed sectors file immediately for crash safety
        append_processed_sector(&processed_sectors_file, &sector_name).await?;
    }

    // Create checkpoints after all processing
    if !newly_processed_sectors.is_empty() {
        logger::log_info("Creating checkpoints...").await;
        create_checkpoint(&common_checkpoint, &common_log).await?;
        create_checkpoint(&warrants_checkpoint, &warrants_log).await?;
        create_checkpoint(&options_checkpoint, &options_log).await?;
        create_checkpoint(&corporate_bonds_checkpoint, &corporate_bonds_log).await?;
        create_checkpoint(&government_bonds_checkpoint, &government_bonds_log).await?;
    }

    stats.print_summary();
    logger::log_info(&format!("✓ Processed {} new sectors successfully", newly_processed_sectors.len())).await;

    manager.mark_valid(entry).await?;
    logger::log_info(" ✓ Securities data marked as complete with integrity tracking").await;

    Ok(())
}

/// Loads the list of sectors that have been fully processed
async fn load_processed_sectors(path: &Path) -> anyhow::Result<HashSet<String>> {
    let mut sectors = HashSet::new();

    if !path.exists() {
        return Ok(sectors);
    }

    let content = tokio_fs::read_to_string(path).await
        .context("Failed to read processed sectors file")?;

    for (line_num, line) in content.lines().enumerate() {
        if line.trim().is_empty() || !line.ends_with('}') {
            continue; // Skip incomplete lines
        }

        match serde_json::from_str::<Value>(line) {
            Ok(entry) => {
                if let Some(sector) = entry["sector"].as_str() {
                    sectors.insert(sector.to_string());
                }
            }
            Err(e) => {
                logger::log_warn(&format!(
                    "Skipping invalid processed sector line {}: {}",
                    line_num + 1,
                    e
                )).await;
            }
        }
    }

    Ok(sectors)
}

/// Appends a sector name to the processed sectors JSONL file with fsync
async fn append_processed_sector(path: &Path, sector_name: &str) -> anyhow::Result<()> {
    use std::fs::OpenOptions;
    use std::io::Write;

    let entry = json!({
        "sector": sector_name,
        "completed_at": chrono::Utc::now().to_rfc3339(),
    });

    let mut file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(path)
        .context("Failed to open processed sectors file")?;

    let line = serde_json::to_string(&entry)
        .context("Failed to serialize sector entry")? + "\n";

    file.write_all(line.as_bytes())?;

    // Ensure durability
    file.sync_data()
        .context("Failed to fsync processed sectors file")?;

    Ok(())
}
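// Illustrative shape of a state.jsonl record as written by `append_processed_sector`
// (values are made-up examples; the real values come from the sector name and the
// current UTC timestamp):
//
//     {"sector":"Equity","completed_at":"2025-11-24T18:30:00+00:00"}
//
// `load_processed_sectors` only accepts lines that end in '}', so a torn write from
// an aborted run is skipped instead of being treated as a completed sector.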
/// Generic function to load a checkpoint and replay its log with custom key extraction
///
/// This handles the common pattern of loading and merging checkpoint and log files,
/// with key extraction logic provided by a closure.
///
/// # Arguments
/// * `checkpoint_path` - Path to checkpoint file
/// * `log_path` - Path to log file
/// * `key_extractor` - Closure that extracts a key from a JSON entry
///
/// # Returns
/// HashSet of extracted keys
async fn load_checkpoint_and_replay_generic<F>(
    checkpoint_path: &Path,
    log_path: &Path,
    key_extractor: F,
) -> anyhow::Result<HashSet<String>>
where
    F: Fn(&Value) -> Option<String>,
{
    let mut keys = HashSet::new();

    // Load checkpoint if it exists
    if checkpoint_path.exists() {
        let content = tokio_fs::read_to_string(checkpoint_path).await
            .context("Failed to read checkpoint")?;

        for line in content.lines() {
            if line.trim().is_empty() || !line.ends_with('}') {
                continue;
            }
            if let Ok(entry) = serde_json::from_str::<Value>(line) {
                if let Some(key) = key_extractor(&entry) {
                    keys.insert(key);
                }
            }
        }
    }

    // Replay log if it exists
    if log_path.exists() {
        let content = tokio_fs::read_to_string(log_path).await
            .context("Failed to read log")?;

        for line in content.lines() {
            if line.trim().is_empty() || !line.ends_with('}') {
                continue;
            }
            if let Ok(entry) = serde_json::from_str::<Value>(line) {
                if let Some(key) = key_extractor(&entry) {
                    keys.insert(key);
                }
            }
        }
    }

    Ok(keys)
}

/// Loads checkpoint and replays log, returning the set of existing keys (simple field extraction)
async fn load_checkpoint_and_replay(
    checkpoint_path: &Path,
    log_path: &Path,
    key_field: &str,
) -> anyhow::Result<HashSet<String>> {
    load_checkpoint_and_replay_generic(checkpoint_path, log_path, |entry| {
        entry[key_field].as_str().map(|s| s.to_string())
    }).await
}

/// Loads checkpoint and replays log for nested structures (warrants/options)
async fn load_checkpoint_and_replay_nested(
    checkpoint_path: &Path,
    log_path: &Path,
) -> anyhow::Result<HashSet<String>> {
    load_checkpoint_and_replay_generic(checkpoint_path, log_path, |entry| {
        let underlying = entry["underlying_company_name"].as_str().unwrap_or("");
        let type_field = if entry.get("warrant_type").is_some() {
            entry["warrant_type"].as_str().unwrap_or("")
        } else {
            entry["option_type"].as_str().unwrap_or("")
        };
        if !underlying.is_empty() && !type_field.is_empty() {
            Some(format!("{}::{}", underlying, type_field))
        } else {
            None
        }
    }).await
}

/// Creates a checkpoint by folding the committed log lines into the checkpoint file atomically
async fn create_checkpoint(checkpoint_path: &Path, log_path: &Path) -> anyhow::Result<()> {
    if !log_path.exists() {
        return Ok(());
    }

    // Read all committed lines from log
    let content = tokio_fs::read_to_string(log_path).await
        .context("Failed to read log for checkpoint")?;

    let committed_lines: Vec<&str> = content
        .lines()
        .filter(|line| !line.trim().is_empty() && line.ends_with('}'))
        .collect();

    if committed_lines.is_empty() {
        return Ok(());
    }

    // Preserve entries from a previous checkpoint: the log only contains entries from
    // the current run, so rewriting the checkpoint from the log alone would drop them.
    let existing_content = if checkpoint_path.exists() {
        tokio_fs::read_to_string(checkpoint_path).await
            .context("Failed to read existing checkpoint")?
    } else {
        String::new()
    };

    // Write to temporary file
    let tmp_path = checkpoint_path.with_extension("tmp");
    let mut tmp_file = std::fs::File::create(&tmp_path)
        .context("Failed to create temp checkpoint")?;

    {
        use std::io::Write;
        let existing_lines = existing_content
            .lines()
            .filter(|line| !line.trim().is_empty() && line.ends_with('}'));
        for line in existing_lines.chain(committed_lines) {
            writeln!(tmp_file, "{}", line)?;
        }
    }

    // Ensure data is flushed to disk
    tmp_file.sync_data()
        .context("Failed to sync temp checkpoint")?;
    drop(tmp_file);

    // Atomic rename
    tokio_fs::rename(&tmp_path, checkpoint_path).await
        .context("Failed to rename checkpoint")?;

    // Clear log after successful checkpoint
    tokio_fs::remove_file(log_path).await
        .context("Failed to remove log after checkpoint")?;

    Ok(())
}
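// Sketch of the write-ahead lifecycle implemented by the helpers above (a summary of
// existing behavior, not additional behavior):
//
//   1. New entries are appended to `<name>.log.jsonl` in fsync'd batches
//      (`write_batch_with_fsync`).
//   2. On startup, `load_checkpoint_and_replay*` merges keys from `<name>.jsonl`
//      (the checkpoint) and `<name>.log.jsonl` (the replay log), so entries written
//      before an abort are not emitted twice.
//   3. Once all sectors are processed, `create_checkpoint` folds the log into the
//      checkpoint via temp file + atomic rename and deletes the log.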
/// Streams through a lei_to_figi.jsonl file and processes entries in batches with fsync
///
/// Two-phase processing:
/// Phase 1: Process all common stocks first so company entries exist
/// Phase 2: Process dependent securities (warrants, options, bonds) keyed to those companies
async fn process_lei_figi_file_batched(
    input_path: &Path,
    common_log_path: &Path,
    warrants_log_path: &Path,
    options_log_path: &Path,
    corporate_bonds_log_path: &Path,
    government_bonds_log_path: &Path,
    existing_companies: &mut HashSet<String>,
    existing_warrants: &mut HashSet<String>,
    existing_options: &mut HashSet<String>,
    existing_corporate_bonds: &mut HashSet<String>,
    existing_government_bonds: &mut HashSet<String>,
    stats: &mut StreamingStats,
) -> anyhow::Result<()> {
    let content = tokio_fs::read_to_string(input_path).await
        .context("Failed to read lei_to_figi.jsonl")?;

    let batch_size = 100;
    let mut processed_count = 0;

    // === PHASE 1: Process common stocks ===
    logger::log_info(" Phase 1: Processing common stocks...").await;

    let mut common_batch: Vec<CompanyData> = Vec::new();

    for (line_num, line) in content.lines().enumerate() {
        if line.trim().is_empty() {
            continue;
        }

        let entry: Value = serde_json::from_str(line)
            .context(format!("Failed to parse JSON on line {}", line_num + 1))?;

        let figis: Vec<FigiData> = serde_json::from_value(entry["figis"].clone())
            .context("Invalid 'figis' field")?;

        if figis.is_empty() {
            continue;
        }

        // Group by security type
        let (common_stocks, _, _, _, _) = group_securities(&figis);

        // Process common stocks
        if !common_stocks.is_empty() {
            if let Some(entry) = prepare_common_stock_entry(&common_stocks, existing_companies) {
                // Add to existing set immediately to prevent duplicates in same run
                existing_companies.insert(entry.name.clone());
                common_batch.push(entry);
            }
        }

        // Write batch when it reaches the size limit
        if common_batch.len() >= batch_size {
            write_batch_with_fsync(common_log_path, &common_batch).await?;
            stats.companies_added += common_batch.len();
            common_batch.clear();
        }

        processed_count += 1;
        if processed_count % 1000 == 0 {
            logger::log_info(&format!(" Phase 1: Processed {} LEI entries...", processed_count)).await;
        }
    }

    // Write remaining common stocks batch
    if !common_batch.is_empty() {
        write_batch_with_fsync(common_log_path, &common_batch).await?;
        stats.companies_added += common_batch.len();
    }

    logger::log_info(" Phase 1 complete").await;

    // === PHASE 2: Process dependent securities (warrants, options, bonds) ===
    logger::log_info(" Phase 2: Processing warrants, options, and corporate bonds...").await;

    let mut warrants_batch: Vec<WarrantData> = Vec::new();
    let mut options_batch: Vec<OptionData> = Vec::new();
    let mut corporate_bonds_batch: Vec<CorporateBondData> = Vec::new();
    let mut government_bonds_batch: Vec<GovernmentBondData> = Vec::new();

    processed_count = 0;

    for (line_num, line) in content.lines().enumerate() {
        if line.trim().is_empty() {
            continue;
        }

        let entry: Value = serde_json::from_str(line)
            .context(format!("Failed to parse JSON on line {}", line_num + 1))?;

        let figis: Vec<FigiData> = serde_json::from_value(entry["figis"].clone())
            .context("Invalid 'figis' field")?;

        if figis.is_empty() {
            continue;
        }

        // Group by security type
        let (_, warrant_securities, option_securities, corporate_bonds_securities, government_bonds_securities) =
            group_securities(&figis);

        if !warrant_securities.is_empty() {
            for entry in prepare_warrant_entries(&warrant_securities, existing_warrants) {
                let key = entry.company_name.clone();
                existing_warrants.insert(key);
                warrants_batch.push(entry);
            }
        }

        if !option_securities.is_empty() {
            for entry in prepare_option_entries(&option_securities, existing_options) {
                let key = entry.company_name.clone();
                existing_options.insert(key);
                options_batch.push(entry);
            }
        }
        if !corporate_bonds_securities.is_empty() {
            for entry in prepare_corporate_bond_entries(&corporate_bonds_securities, existing_corporate_bonds) {
                let key = entry.underlying_company_name.clone();
                existing_corporate_bonds.insert(key);
                corporate_bonds_batch.push(entry);
            }
        }

        if !government_bonds_securities.is_empty() {
            for entry in prepare_government_bond_entries(&government_bonds_securities, existing_government_bonds) {
                let key = entry.issuer_name.clone();
                existing_government_bonds.insert(key);
                government_bonds_batch.push(entry);
            }
        }

        // Write batches when they reach the size limit
        if warrants_batch.len() >= batch_size {
            write_batch_with_fsync(warrants_log_path, &warrants_batch).await?;
            stats.warrants_added += warrants_batch.len();
            warrants_batch.clear();
        }
        if options_batch.len() >= batch_size {
            write_batch_with_fsync(options_log_path, &options_batch).await?;
            stats.options_added += options_batch.len();
            options_batch.clear();
        }
        if corporate_bonds_batch.len() >= batch_size {
            write_batch_with_fsync(corporate_bonds_log_path, &corporate_bonds_batch).await?;
            stats.corporate_bonds_added += corporate_bonds_batch.len();
            corporate_bonds_batch.clear();
        }
        if government_bonds_batch.len() >= batch_size {
            write_batch_with_fsync(government_bonds_log_path, &government_bonds_batch).await?;
            stats.government_bonds_added += government_bonds_batch.len();
            government_bonds_batch.clear();
        }

        processed_count += 1;
        if processed_count % 1000 == 0 {
            logger::log_info(&format!(" Phase 2: Processed {} LEI entries...", processed_count)).await;
        }
    }

    // Write remaining batches
    if !warrants_batch.is_empty() {
        write_batch_with_fsync(warrants_log_path, &warrants_batch).await?;
        stats.warrants_added += warrants_batch.len();
    }
    if !options_batch.is_empty() {
        write_batch_with_fsync(options_log_path, &options_batch).await?;
        stats.options_added += options_batch.len();
    }
    if !corporate_bonds_batch.is_empty() {
        write_batch_with_fsync(corporate_bonds_log_path, &corporate_bonds_batch).await?;
        stats.corporate_bonds_added += corporate_bonds_batch.len();
    }
    if !government_bonds_batch.is_empty() {
        write_batch_with_fsync(government_bonds_log_path, &government_bonds_batch).await?;
        stats.government_bonds_added += government_bonds_batch.len();
    }

    logger::log_info(" Phase 2 complete").await;

    Ok(())
}

/// Writes a batch of serializable entries to a log file and fsyncs it
async fn write_batch_with_fsync<T: serde::Serialize>(
    log_path: &Path,
    entries: &[T],
) -> anyhow::Result<()> {
    use std::fs::OpenOptions;
    use std::io::Write;

    let mut file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(log_path)
        .context("Failed to open log file")?;

    for entry in entries {
        let line = serde_json::to_string(entry)
            .context("Failed to serialize entry")?;
        writeln!(file, "{}", line)?;
    }

    // Critical: fsync to ensure durability
    file.sync_data()
        .context("Failed to fsync log file")?;

    Ok(())
}

/// Prepares a common stock entry if it doesn't exist yet
fn prepare_common_stock_entry(
    figi_infos: &[FigiData],
    existing_keys: &HashSet<String>,
) -> Option<CompanyData> {
    let name = figi_infos[0].name.clone();
    if name.is_empty() || existing_keys.contains(&name) {
        return None;
    }

    let grouped_by_isin = group_figis_by_isin(figi_infos);
    let primary_isin = grouped_by_isin.keys().next().cloned().unwrap_or_default();

    Some(CompanyData {
        name,
        primary_isin,
        securities: grouped_by_isin,
        yahoo_company_data: None,
        isin_tickers_map: None,
    })
}
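// Deduplication keys used by the `prepare_*` helpers below: common stocks are keyed by
// company name, warrants and options by the underlying company name, corporate bonds by
// issuer (`underlying_company_name`), and government bonds by `issuer_name`. An entry
// whose key is already present in the corresponding `existing_*` set is skipped, which
// is what makes re-runs after a crash idempotent.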
/// Prepares warrant entries for batching
///
/// Groups warrant contracts by underlying company.
fn prepare_warrant_entries(
    warrant_securities: &[FigiData],
    existing_keys: &HashSet<String>,
) -> Vec<WarrantData> {
    let mut entries = Vec::new();

    // Group by underlying company
    let mut grouped: HashMap<String, Vec<(String, FigiData)>> = HashMap::new();

    for figi in warrant_securities {
        let (underlying, _issuer, warrant_type) = parse_warrant_name(&figi.name);
        if underlying.is_empty() {
            continue;
        }
        grouped.entry(underlying.clone())
            .or_default()
            .push((warrant_type, figi.clone()));
    }

    // Create WarrantData for each underlying company
    for (underlying_company, contracts) in grouped {
        if existing_keys.contains(&underlying_company) {
            continue;
        }

        let mut warrants_by_type: HashMap<String, WarrantDetails> = HashMap::new();

        for (warrant_type, figi) in contracts {
            let (_, issuer, _) = parse_warrant_name(&figi.name);

            let warrant_detail = WarrantDetails {
                company_name: underlying_company.clone(),
                issuer_company_name: issuer,
                warrant_type: warrant_type.clone(),
                warrants: {
                    let mut map = HashMap::new();
                    map.insert(figi.isin.clone(), vec![figi.clone()]);
                    map
                },
            };

            let key = format!("{}_{}", underlying_company, warrant_type);
            warrants_by_type.insert(key, warrant_detail);
        }

        let warrant_info = WarrantData {
            company_name: underlying_company.clone(),
            warrants: warrants_by_type,
        };

        entries.push(warrant_info);
    }

    entries
}

/// Prepares option entries for batching
///
/// Groups option contracts by underlying company, extracts strike prices and expiration dates,
/// and builds OptionChain structures organizing calls and puts by expiration date.
///
/// # Arguments
/// * `option_securities` - List of FigiData objects for option contracts
/// * `existing_keys` - Set of already-processed keys (format: "company_name")
///
/// # Returns
/// Vector of OptionData entries, one per unique underlying company
fn prepare_option_entries(
    option_securities: &[FigiData],
    existing_keys: &HashSet<String>,
) -> Vec<OptionData> {
    let mut entries = Vec::new();

    // Group by underlying company
    let mut grouped: HashMap<String, Vec<(String, FigiData)>> = HashMap::new();

    for figi in option_securities {
        let (underlying, _issuer, option_type) = parse_option_name(&figi.name);
        if underlying.is_empty() {
            continue;
        }
        grouped.entry(underlying.clone())
            .or_default()
            .push((option_type, figi.clone()));
    }

    // Create OptionData for each underlying company
    for (underlying_company, contracts) in grouped {
        if existing_keys.contains(&underlying_company) {
            continue;
        }

        // Build option contracts keyed by expiration and collect distinct strikes
        let mut option_contracts: HashMap<_, (Vec<OptionContract>, Vec<OptionContract>)> = HashMap::new();
        let mut all_strikes: HashSet<u64> = HashSet::new();

        for (option_type, figi) in contracts {
            // Parse strike price and expiration from the ticker if available
            let strike = parse_strike_from_ticker(&figi.ticker).unwrap_or(0.0);
            let expiration = parse_expiration_from_ticker(&figi.ticker).unwrap_or(0);

            if strike > 0.0 && expiration > 0 {
                // Strikes are deduplicated in integer cents to avoid hashing floats
                all_strikes.insert((strike * 100.0) as u64);

                let contract = OptionContract {
                    strike,
                    last_price: None,
                    bid: None,
                    ask: None,
                    volume: None,
                    open_interest: None,
                    implied_volatility: None,
                };

                let entry = option_contracts.entry(expiration).or_insert((Vec::new(), Vec::new()));
                match option_type.as_str() {
                    "call" => entry.0.push(contract),
                    "put" => entry.1.push(contract),
                    _ => {}
                }
            }
        }

        // Build OptionChains from contracts
        let mut option_chains = Vec::new();
        let mut expiration_dates = Vec::new();

        for (expiration, (calls, puts)) in option_contracts {
            expiration_dates.push(expiration);
            option_chains.push(OptionChain {
                expiration_date: expiration,
                calls,
                puts,
            });
        }

        expiration_dates.sort();
        option_chains.sort_by_key(|oc| oc.expiration_date);

        let strikes: Vec<f64> = all_strikes
            .iter()
            .map(|s| *s as f64 / 100.0)
            .collect();

        let option_data = OptionData {
            company_name: underlying_company.clone(),
            expiration_dates,
            strikes,
            option: option_chains,
            timestamp: chrono::Utc::now().timestamp(),
        };

        entries.push(option_data);
    }

    entries
}

/// Prepares corporate bond entries for batching
///
/// Groups corporate bonds by issuer (underlying_company_name), extracting key bond details
/// like coupon rate, maturity date, and tenor from the ticker/description for each ISIN.
///
/// # Arguments
/// * `corporate_bond_securities` - List of FigiData objects for corporate bonds
/// * `existing_keys` - Set of already-processed keys (format: "company_name")
///
/// # Returns
/// Vector of CorporateBondData entries, one per unique issuer
fn prepare_corporate_bond_entries(
    corporate_bond_securities: &[FigiData],
    existing_keys: &HashSet<String>,
) -> Vec<CorporateBondData> {
    let mut entries = Vec::new();

    // Group bonds by issuer (company name)
    let mut grouped: HashMap<String, Vec<FigiData>> = HashMap::new();

    for figi in corporate_bond_securities {
        let issuer = figi.name.clone();
        if issuer.is_empty() {
            continue;
        }
        grouped.entry(issuer).or_default().push(figi.clone());
    }

    // Create entries for each unique issuer
    for (issuer, figis) in grouped {
        // Check if this issuer already exists
        if existing_keys.contains(&issuer) {
            continue;
        }

        // Group by ISIN
        let bonds_by_isin = group_figis_by_isin(&figis);

        // Parse bond details for each ISIN
        let mut bond_details_map = HashMap::new();
        for (isin, isin_figis) in &bonds_by_isin {
            if let Some(first_figi) = isin_figis.first() {
                let details = parse_bond_details(&first_figi.ticker, &first_figi.security_description);
                bond_details_map.insert(isin.clone(), details);
            }
        }

        let bond_info = CorporateBondData {
            underlying_company_name: issuer.clone(),
            bonds: bonds_by_isin,
            bond_details: bond_details_map,
        };

        entries.push(bond_info);
    }

    entries
}

/// Prepares government bond entries for batching
///
/// Groups government bonds by issuer (country/entity), extracting key bond
/// details like coupon rate, maturity date, and tenor from the ticker/description for each ISIN.
/// Also classifies the government issuer type (sovereign, municipal, agency, etc.)
///
/// # Arguments
/// * `government_bond_securities` - List of FigiData objects for government bonds
/// * `existing_keys` - Set of already-processed keys (format: "issuer_name")
///
/// # Returns
/// Vector of GovernmentBondData entries, one per unique issuer
fn prepare_government_bond_entries(
    government_bond_securities: &[FigiData],
    existing_keys: &HashSet<String>,
) -> Vec<GovernmentBondData> {
    let mut entries = Vec::new();

    // Group bonds by issuer (country/entity name)
    let mut grouped: HashMap<String, Vec<FigiData>> = HashMap::new();

    for figi in government_bond_securities {
        let issuer = figi.name.clone();
        if issuer.is_empty() {
            continue;
        }
        grouped.entry(issuer).or_default().push(figi.clone());
    }

    // Create entries for each unique issuer
    for (issuer, figis) in grouped {
        // Check if this issuer already exists
        if existing_keys.contains(&issuer) {
            continue;
        }

        // Classify the government issuer type
        let issuer_type = classify_government_issuer(&issuer);

        // Group by ISIN
        let bonds_by_isin = group_figis_by_isin(&figis);

        // Parse bond details for each ISIN
        let mut bond_details_map = HashMap::new();
        for (isin, isin_figis) in &bonds_by_isin {
            if let Some(first_figi) = isin_figis.first() {
                let details = parse_bond_details(&first_figi.ticker, &first_figi.security_description);
                bond_details_map.insert(isin.clone(), details);
            }
        }

        let bond_info = GovernmentBondData {
            issuer_name: issuer.clone(),
            issuer_type,
            bonds: bonds_by_isin,
            bond_details: bond_details_map,
        };

        entries.push(bond_info);
    }

    entries
}
/// Groups a FigiData list by security type
///
/// Note: `security_type` and `security_type2` are matched independently, so a single
/// FIGI can be placed in more than one bucket.
fn group_securities(figis: &[FigiData]) -> (Vec<FigiData>, Vec<FigiData>, Vec<FigiData>, Vec<FigiData>, Vec<FigiData>) {
    let mut common_stocks: Vec<FigiData> = Vec::new();
    let mut warrants: Vec<FigiData> = Vec::new();
    let mut options: Vec<FigiData> = Vec::new();
    let mut corporate_bonds: Vec<FigiData> = Vec::new();
    let mut government_bonds: Vec<FigiData> = Vec::new();

    for figi in figis {
        match figi.security_type.as_str() {
            "Common Stock" => common_stocks.push(figi.clone()),
            "Equity WRT" => warrants.push(figi.clone()),
            "Equity Option" => options.push(figi.clone()),
            _ => {}
        }
        match figi.security_type2.as_str() {
            "Corp" => corporate_bonds.push(figi.clone()),
            "Govt" => government_bonds.push(figi.clone()),
            _ => {}
        }
    }

    (common_stocks, warrants, options, corporate_bonds, government_bonds)
}

/// Groups FigiData by ISIN
fn group_figis_by_isin(figi_infos: &[FigiData]) -> HashMap<String, Vec<FigiData>> {
    let mut grouped: HashMap<String, Vec<FigiData>> = HashMap::new();

    for figi_info in figi_infos {
        grouped.entry(figi_info.isin.clone())
            .or_insert_with(Vec::new)
            .push(figi_info.clone());
    }

    for figis in grouped.values_mut() {
        figis.sort_by(|a, b| a.figi.cmp(&b.figi));
    }

    grouped
}

/// Parse a warrant name to extract the underlying company, issuer, and warrant type
///
/// Examples:
/// - "VONTOBE-PW26 LEONARDO SPA" -> ("LEONARDO SPA", Some("VONTOBE"), "put")
/// - "BAYER H-CW25 L'OREAL" -> ("L'OREAL", Some("BAYER H"), "call")
/// - "APPLE INC WARRANT" -> ("APPLE INC WARRANT", None, "unknown")
fn parse_warrant_name(name: &str) -> (String, Option<String>, String) {
    let name_upper = name.to_uppercase();

    // Try to detect warrant type from code (PW = put, CW = call)
    let warrant_type = if name_upper.contains("-PW") || name_upper.contains(" PW") {
        "put".to_string()
    } else if name_upper.contains("-CW") || name_upper.contains(" CW") {
        "call".to_string()
    } else {
        "unknown".to_string()
    };

    // Try to split by warrant code pattern (e.g., "-PW26", "-CW25")
    if let Some(pos) = name.find("-PW") {
        let before = name[..pos].trim();
        let after_idx = name[pos..].find(' ').map(|i| pos + i + 1).unwrap_or(name.len());
        let after = if after_idx < name.len() { name[after_idx..].trim() } else { "" };
        return (
            after.to_string(),
            if !before.is_empty() { Some(before.to_string()) } else { None },
            warrant_type,
        );
    }

    if let Some(pos) = name.find("-CW") {
        let before = name[..pos].trim();
        let after_idx = name[pos..].find(' ').map(|i| pos + i + 1).unwrap_or(name.len());
        let after = if after_idx < name.len() { name[after_idx..].trim() } else { "" };
        return (
            after.to_string(),
            if !before.is_empty() { Some(before.to_string()) } else { None },
            warrant_type,
        );
    }

    // Fallback: return the entire name as the underlying
    (name.to_string(), None, warrant_type)
}

/// Statistics tracker for streaming processing
#[derive(Debug)]
struct StreamingStats {
    initial_companies: usize,
    initial_warrants: usize,
    initial_options: usize,
    initial_corporate_bonds: usize,
    initial_government_bonds: usize,
    companies_added: usize,
    warrants_added: usize,
    options_added: usize,
    corporate_bonds_added: usize,
    government_bonds_added: usize,
}

impl StreamingStats {
    fn new(
        companies: usize,
        warrants: usize,
        options: usize,
        corporate_bonds: usize,
        government_bonds: usize,
    ) -> Self {
        Self {
            initial_companies: companies,
            initial_warrants: warrants,
            initial_options: options,
            initial_corporate_bonds: corporate_bonds,
            initial_government_bonds: government_bonds,
            companies_added: 0,
            warrants_added: 0,
            options_added: 0,
            corporate_bonds_added: 0,
            government_bonds_added: 0,
        }
    }

    fn print_summary(&self) {
        println!("\n=== Processing Statistics ===");
        println!("Companies:");
        println!(" - Initial: {}", self.initial_companies);
        println!(" - Added: {}", self.companies_added);
        println!(" - Total: {}", self.initial_companies + self.companies_added);
        println!("Warrants:");
        println!(" - Initial: {}", self.initial_warrants);
        println!(" - Added: {}", self.warrants_added);
        println!(" - Total: {}", self.initial_warrants + self.warrants_added);
        println!("Options:");
        println!(" - Initial: {}", self.initial_options);
        println!(" - Added: {}", self.options_added);
        println!(" - Total: {}", self.initial_options + self.options_added);
        println!("Corporate Bonds:");
        println!(" - Initial: {}", self.initial_corporate_bonds);
        println!(" - Added: {}", self.corporate_bonds_added);
        println!(" - Total: {}", self.initial_corporate_bonds + self.corporate_bonds_added);
        println!("Government Bonds:");
        println!(" - Initial: {}", self.initial_government_bonds);
        println!(" - Added: {}", self.government_bonds_added);
        println!(" - Total: {}", self.initial_government_bonds + self.government_bonds_added);
    }
}
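// A minimal test sketch for `parse_warrant_name`; the expected tuples restate what the
// parsing rules above produce for the documented examples (assumes the usual cargo test
// setup, nothing beyond the standard library).
#[cfg(test)]
mod parse_warrant_name_tests {
    use super::parse_warrant_name;

    #[test]
    fn splits_on_put_warrant_code() {
        let (underlying, issuer, kind) = parse_warrant_name("VONTOBE-PW26 LEONARDO SPA");
        assert_eq!(underlying, "LEONARDO SPA");
        assert_eq!(issuer.as_deref(), Some("VONTOBE"));
        assert_eq!(kind, "put");
    }

    #[test]
    fn falls_back_to_whole_name_without_warrant_code() {
        let (underlying, issuer, kind) = parse_warrant_name("APPLE INC WARRANT");
        assert_eq!(underlying, "APPLE INC WARRANT");
        assert_eq!(issuer, None);
        assert_eq!(kind, "unknown");
    }
}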
async fn load_market_sectors() -> anyhow::Result<Vec<String>> {
    let dir = DataPaths::new(".")?;
    let cache_file = dir.cache_openfigi_dir().join("marketSecDes.json");

    if !cache_file.exists() {
        return Ok(vec![
            "Comdty".to_string(),
            "Corp".to_string(),
            "Equity".to_string(),
            "Govt".to_string(),
        ]);
    }

    let content = tokio_fs::read_to_string(&cache_file).await?;
    let json: Value = serde_json::from_str(&content)?;

    let sectors: Vec<String> = json["values"]
        .as_array()
        .ok_or_else(|| anyhow!("No values"))?
        .iter()
        .filter_map(|v| v.as_str().map(|s| s.to_string()))
        .collect();

    Ok(sectors)
}

async fn setup_sector_directories(
    date_dir: &Path,
    sector_dirs: &[String],
) -> anyhow::Result<()> {
    let uncategorized_dir = date_dir.join("uncategorized");
    tokio_fs::create_dir_all(&uncategorized_dir).await?;

    for sector in sector_dirs {
        let sector_dir = date_dir.join(sector);
        tokio_fs::create_dir_all(&sector_dir).await?;
    }

    Ok(())
}

#[derive(Debug)]
pub struct MappingStats {
    pub total_leis: usize,
    pub mapped_leis: usize,
    pub no_result_leis: usize,
    pub unqueried_leis: usize,
    pub mapping_percentage: f64,
    pub queried_percentage: f64,
    pub by_sector: HashMap<String, usize>,
}

/// Get detailed statistics about LEI-FIGI mapping status
pub async fn get_mapping_stats(
    csv_path: &str,
    gleif_date: Option<&str>,
) -> anyhow::Result<MappingStats> {
    let dir = DataPaths::new(".")?;
    let map_cache_dir = dir.cache_gleif_openfigi_map_dir();
    let date = determine_gleif_date(gleif_date, &dir).await?;
    let date_dir = map_cache_dir.join(&date);

    let all_leis = get_all_leis_from_gleif(csv_path).await?;
    let mapped_leis = load_existing_mapped_leis(&date_dir).await?;
    let no_result_leis = load_no_result_leis(&date_dir).await?;

    let total = all_leis.len();
    let mapped = mapped_leis.len();
    let no_results = no_result_leis.len();
    let queried = mapped + no_results;
    let unqueried = total.saturating_sub(queried);

    let mapping_percentage = if total > 0 {
        (mapped as f64 / total as f64) * 100.0
    } else {
        0.0
    };
    let queried_percentage = if total > 0 {
        (queried as f64 / total as f64) * 100.0
    } else {
        0.0
    };

    // Count entries by sector
    let mut by_sector = HashMap::new();
    if date_dir.exists() {
        let mut entries = tokio_fs::read_dir(&date_dir).await?;
        while let Some(entry) = entries.next_entry().await? {
            let sector_path = entry.path();
            if !sector_path.is_dir() {
                continue;
            }

            let sector_name = sector_path
                .file_name()
                .and_then(|n| n.to_str())
                .unwrap_or("unknown")
                .to_string();

            let jsonl_path = sector_path.join("lei_to_figi.jsonl");
            if !jsonl_path.exists() {
                continue;
            }

            let content = tokio_fs::read_to_string(&jsonl_path).await?;
            let count = content.lines().filter(|l| !l.trim().is_empty()).count();
            by_sector.insert(sector_name, count);
        }
    }

    Ok(MappingStats {
        total_leis: total,
        mapped_leis: mapped,
        no_result_leis: no_results,
        unqueried_leis: unqueried,
        mapping_percentage,
        queried_percentage,
        by_sector,
    })
}
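// Illustrative call site for `get_mapping_stats` (hypothetical CSV path, not part of
// this module's pipeline):
//
//     let stats = get_mapping_stats("cache/gleif/lei_isin.csv", None).await?;
//     println!(
//         "LEIs: {} total, {} mapped ({:.1}%), {} unqueried",
//         stats.total_leis, stats.mapped_leis, stats.mapping_percentage, stats.unqueried_leis
//     );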
/// Quick check if mapping is complete (returns true if all LEIs have been queried)
pub async fn is_mapping_complete(csv_path: &str) -> anyhow::Result<bool> {
    let dir = DataPaths::new(".")?;
    let map_cache_dir = dir.cache_gleif_openfigi_map_dir();
    let date = determine_gleif_date(None, &dir).await?;
    let date_dir = map_cache_dir.join(&date);

    let unmapped = get_unmapped_leis(csv_path, &date_dir).await?;
    Ok(unmapped.is_empty())
}

/// Load all LEIs that have already been mapped from existing JSONL files
async fn load_existing_mapped_leis(date_dir: &Path) -> anyhow::Result<HashSet<String>> {
    let mut mapped_leis = HashSet::new();

    if !date_dir.exists() {
        return Ok(mapped_leis);
    }

    // Read all sector directories
    let mut entries = tokio_fs::read_dir(date_dir).await?;
    while let Some(entry) = entries.next_entry().await? {
        let sector_path = entry.path();
        if !sector_path.is_dir() {
            continue;
        }

        let jsonl_path = sector_path.join("lei_to_figi.jsonl");
        if !jsonl_path.exists() {
            continue;
        }

        // Read JSONL file line by line
        let content = tokio_fs::read_to_string(&jsonl_path).await?;
        for line in content.lines() {
            if line.trim().is_empty() {
                continue;
            }
            if let Ok(entry) = serde_json::from_str::<Value>(line) {
                if let Some(lei) = entry["lei"].as_str() {
                    mapped_leis.insert(lei.to_string());
                }
            }
        }
    }

    if !mapped_leis.is_empty() {
        logger::log_info(&format!("Found {} already mapped LEIs", mapped_leis.len())).await;
    }

    Ok(mapped_leis)
}

/// Read the GLEIF CSV and return the set of all LEIs it contains
async fn get_all_leis_from_gleif(csv_path: &str) -> anyhow::Result<HashSet<String>> {
    let content = tokio::fs::read_to_string(csv_path)
        .await
        .context(format!("Failed to read GLEIF CSV file: {}", csv_path))?;

    let mut all_leis = HashSet::new();
    for (idx, line) in content.lines().enumerate() {
        if idx == 0 {
            continue; // Skip header
        }
        let parts: Vec<&str> = line.split(',').collect();
        if parts.len() < 2 {
            continue;
        }
        let lei = parts[0].trim().trim_matches('"').to_string();
        if !lei.is_empty() {
            all_leis.insert(lei);
        }
    }

    logger::log_info(&format!("Found {} total LEIs in GLEIF CSV", all_leis.len())).await;
    Ok(all_leis)
}

/// Get unmapped LEIs by comparing the GLEIF CSV with existing mappings
async fn get_unmapped_leis(
    csv_path: &str,
    date_dir: &Path,
) -> anyhow::Result<HashSet<String>> {
    let all_leis = get_all_leis_from_gleif(csv_path).await?;
    let mapped_leis = load_existing_mapped_leis(date_dir).await?;
    let no_result_leis = load_no_result_leis(date_dir).await?;

    // Calculate truly unmapped: all - (mapped + no_results)
    let queried_leis: HashSet<String> = mapped_leis
        .union(&no_result_leis)
        .cloned()
        .collect();

    let unmapped: HashSet<String> = all_leis
        .difference(&queried_leis)
        .cloned()
        .collect();

    let total = all_leis.len();
    let mapped = mapped_leis.len();
    let no_results = no_result_leis.len();
    let unqueried = unmapped.len();

    logger::log_info(&format!(
        "LEI Status: Total={}, Mapped={}, No Results={}, Unqueried={}",
        total, mapped, no_results, unqueried
    )).await;

    Ok(unmapped)
}
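// The set arithmetic above is: unmapped = all_leis - (mapped_leis ∪ no_result_leis).
// As a made-up example, with 1000 LEIs in the CSV, 700 mapped and 50 recorded in
// no_results.jsonl, 750 count as queried and 250 remain to be sent to OpenFIGI.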
/// Modified version of the GLEIF CSV streaming mapper that only processes the specified LEIs
pub async fn stream_gleif_csv_and_build_figi_filtered(
    csv_path: &str,
    gleif_date: Option<&str>,
    filter_leis: Option<&HashSet<String>>,
) -> anyhow::Result<()> {
    logger::log_info(&format!("Streaming GLEIF CSV: {}", csv_path)).await;

    let content = tokio::fs::read_to_string(csv_path)
        .await
        .context(format!("Failed to read GLEIF CSV file: {}", csv_path))?;

    let client = OpenFigiClient::new().await?;
    if !client.has_key {
        logger::log_warn("No API key - skipping FIGI mapping").await;
        return Ok(());
    }

    let dir = DataPaths::new(".")?;
    let map_cache_dir = dir.cache_gleif_openfigi_map_dir();
    let date = determine_gleif_date(gleif_date, &dir).await?;
    let date_dir = map_cache_dir.join(&date);
    tokio_fs::create_dir_all(&date_dir).await?;

    let sector_dirs = load_market_sectors().await?;
    setup_sector_directories(&date_dir, &sector_dirs).await?;

    let mut lei_batch: HashMap<String, Vec<String>> = HashMap::new();
    let mut processed_leis = 0;
    let mut skipped_leis = 0;

    for (idx, line) in content.lines().enumerate() {
        if idx == 0 {
            continue;
        }

        let parts: Vec<&str> = line.split(',').collect();
        if parts.len() < 2 {
            continue;
        }

        let lei = parts[0].trim().trim_matches('"').to_string();
        let isin = parts[1].trim().trim_matches('"').to_string();

        if lei.is_empty() || isin.is_empty() {
            continue;
        }

        // Apply filter if provided
        if let Some(filter) = filter_leis {
            if !filter.contains(&lei) {
                skipped_leis += 1;
                continue;
            }
        }

        lei_batch.entry(lei).or_default().push(isin);

        // Process batch when full
        if lei_batch.len() >= LEI_BATCH_SIZE {
            process_and_save_figi_batch(&client, &lei_batch, &date_dir).await?;
            processed_leis += lei_batch.len();

            if processed_leis % 1000 == 0 {
                logger::log_info(&format!("Queried {} LEIs...", processed_leis)).await;
            }

            lei_batch.clear();
            tokio::task::yield_now().await;
        }
    }

    // Process remaining
    if !lei_batch.is_empty() {
        process_and_save_figi_batch(&client, &lei_batch, &date_dir).await?;
        processed_leis += lei_batch.len();
    }

    logger::log_info(&format!(
        "✓ Queried {} LEIs, skipped {} already processed",
        processed_leis, skipped_leis
    )).await;

    Ok(())
}
/// Check mapping completion and process only unmapped LEIs
pub async fn update_lei_mapping(
    paths: &DataPaths,
    csv_path: &str,
    gleif_date: Option<&str>,
) -> anyhow::Result<bool> {
    let map_cache_dir = paths.cache_gleif_openfigi_map_dir();
    let date = determine_gleif_date(gleif_date, &paths).await?;
    let date_dir = map_cache_dir.join(&date);

    let manager = StateManager::new(&paths.integrity_dir()).await?;
    let step_name = "lei_figi_mapping_complete";

    let content_reference = directory_reference(
        &date_dir,
        Some(vec![
            "*/lei_to_figi.jsonl".to_string(),  // All sector mapping files
            "no_results.jsonl".to_string(),     // LEIs with no results
        ]),
        Some(vec![
            "*.tmp".to_string(),  // Exclude temp files
            "*.log".to_string(),  // Exclude log files
        ]),
    );
    let data_stage = DataStage::Cache; // 24-hour TTL for API data

    if manager.is_step_valid(step_name).await? {
        logger::log_info(" LEI-FIGI mapping already completed and valid").await;
        logger::log_info("✓ All LEIs have been queried (mapped or confirmed no results)").await;
        return Ok(true);
    }

    let entry = manager.create_entry(step_name.to_string(), content_reference, data_stage).await?;

    // Get unmapped LEIs (excludes both mapped and no-result LEIs)
    let unmapped = get_unmapped_leis(csv_path, &date_dir).await?;

    if unmapped.is_empty() {
        logger::log_info("✓ All LEIs have been queried (mapped or confirmed no results)").await;
        manager.mark_valid(entry).await?;
        logger::log_info(" ✓ LEI-FIGI mapping marked as complete with integrity tracking").await;
        return Ok(true);
    }

    logger::log_info(&format!("Found {} LEIs that need querying - starting mapping...", unmapped.len())).await;

    // Process only unmapped LEIs
    stream_gleif_csv_and_build_figi_filtered(csv_path, gleif_date, Some(&unmapped)).await?;

    // Verify completion
    let still_unmapped = get_unmapped_leis(csv_path, &date_dir).await?;

    if still_unmapped.is_empty() {
        logger::log_info("✓ All LEIs successfully queried").await;
        manager.mark_valid(entry).await?;
        logger::log_info(" ✓ LEI-FIGI mapping marked as complete with integrity tracking").await;
        Ok(true)
    } else {
        logger::log_warn(&format!(
            "⚠ {} LEIs still unqueried (API errors or rate limits)",
            still_unmapped.len()
        )).await;
        manager.mark_invalid(entry, "Some LEIs remain unqueried".to_string()).await?;
        Ok(false)
    }
}

/// Load LEIs that were queried but returned no results
async fn load_no_result_leis(date_dir: &Path) -> anyhow::Result<HashSet<String>> {
    let mut no_result_leis = HashSet::new();
    let no_results_path = date_dir.join("no_results.jsonl");

    if !no_results_path.exists() {
        return Ok(no_result_leis);
    }

    let content = tokio_fs::read_to_string(&no_results_path).await?;
    for line in content.lines() {
        if line.trim().is_empty() {
            continue;
        }
        if let Ok(entry) = serde_json::from_str::<Value>(line) {
            if let Some(lei) = entry["lei"].as_str() {
                no_result_leis.insert(lei.to_string());
            }
        }
    }

    if !no_result_leis.is_empty() {
        logger::log_info(&format!(
            "Found {} LEIs previously queried with no FIGI results",
            no_result_leis.len()
        )).await;
    }

    Ok(no_result_leis)
}
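// Illustrative record shapes (values are made-up; the fields mirror the `json!` calls
// in `append_no_result_lei` and `append_lei_to_figi_jsonl` below):
//
//   no_results.jsonl:
//     {"lei":"5299001EXAMPLE000000","isins":["US0000000000"],"queried_at":"2025-11-24 18:30:00"}
//
//   <sector>/lei_to_figi.jsonl:
//     {"lei":"5299001EXAMPLE000000","figis":[ ...serialized FigiData objects... ]}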
/// Save an LEI that was queried but returned no results
async fn append_no_result_lei(date_dir: &Path, lei: &str, isins: &[String]) -> anyhow::Result<()> {
    let no_results_path = date_dir.join("no_results.jsonl");

    let entry = json!({
        "lei": lei,
        "isins": isins,
        "queried_at": chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string(),
    });

    let line = serde_json::to_string(&entry)? + "\n";

    let mut file = tokio_fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(&no_results_path)
        .await?;

    file.write_all(line.as_bytes()).await?;

    Ok(())
}

async fn process_and_save_figi_batch(
    client: &OpenFigiClient,
    lei_batch: &HashMap<String, Vec<String>>,
    date_dir: &Path,
) -> anyhow::Result<()> {
    for (lei, isins) in lei_batch {
        // Deduplicate ISINs before querying
        let unique_isins: Vec<_> = isins.iter()
            .cloned()
            .collect::<HashSet<_>>()
            .into_iter()
            .collect();

        let figi_infos = client.map_isins_to_figi_infos(&unique_isins).await?;

        if figi_infos.is_empty() {
            // No FIGIs found - save to no_results.jsonl to avoid re-querying
            append_no_result_lei(date_dir, lei, &unique_isins).await?;
            continue;
        }

        // Save FIGIs by sector as before
        save_figi_infos_by_sector(lei, &figi_infos, date_dir).await?;
    }

    Ok(())
}

async fn save_figi_infos_by_sector(
    lei: &str,
    figi_infos: &[FigiData],
    date_dir: &Path,
) -> anyhow::Result<()> {
    let mut by_sector: HashMap<String, Vec<FigiData>> = HashMap::new();

    for figi_info in figi_infos {
        let sector = if figi_info.market_sector.is_empty() {
            "uncategorized".to_string()
        } else {
            figi_info.market_sector.clone()
        };
        by_sector.entry(sector).or_default().push(figi_info.clone());
    }

    for (sector, figis) in by_sector {
        let sector_dir = date_dir.join(&sector);
        let path = sector_dir.join("lei_to_figi.jsonl");
        append_lei_to_figi_jsonl(&path, lei, &figis).await?;
    }

    Ok(())
}

async fn append_lei_to_figi_jsonl(path: &Path, lei: &str, figis: &[FigiData]) -> anyhow::Result<()> {
    let entry = json!({
        "lei": lei,
        "figis": figis,
    });

    let line = serde_json::to_string(&entry)? + "\n";

    let mut file = tokio_fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(path)
        .await?;

    file.write_all(line.as_bytes()).await?;

    Ok(())
}