diff --git a/Cargo.lock b/Cargo.lock index d3006cb..93bad74 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3508,6 +3508,7 @@ version = "1.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2f87b8aa10b915a06587d0dec516c282ff295b475d94abf425d62b57710070a2" dependencies = [ + "getrandom 0.3.4", "js-sys", "wasm-bindgen", ] @@ -3670,6 +3671,7 @@ dependencies = [ "tracing-subscriber", "url", "urlencoding", + "uuid", "walkdir", "yfinance-rs", "zip", diff --git a/Cargo.toml b/Cargo.toml index 337f007..f77152b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,4 +65,5 @@ tokio-tungstenite = "0.21" # For WebSocket support #tempfile = "3.24.0" # data integrity -sha2 = "0.10.9" \ No newline at end of file +sha2 = "0.10.9" +uuid = { version = "1.0", features = ["v4", "v7"] } \ No newline at end of file diff --git a/src/corporate/types.rs b/src/corporate/types.rs index 73aa649..16c1cbb 100644 --- a/src/corporate/types.rs +++ b/src/corporate/types.rs @@ -149,6 +149,7 @@ pub struct BondDetails { /// ticker: "WTFC 4.3 01/12/26 0003" #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CorporateBondData { + pub company_id: String, // key - company id issuing the bond pub underlying_company_name: String, // key - company name issuing the bond pub bonds: HashMap>, // ISIN -> Vec (grouped by ISIN) #[serde(skip_serializing_if = "HashMap::is_empty", default)] diff --git a/src/corporate/update_openfigi.rs b/src/corporate/update_openfigi.rs index 3255e24..fca515c 100644 --- a/src/corporate/update_openfigi.rs +++ b/src/corporate/update_openfigi.rs @@ -429,6 +429,11 @@ async fn create_checkpoint(checkpoint_path: &Path, log_path: &Path) -> anyhow::R } /// Streams through a lei_to_figi.jsonl file and processes entries in batches with fsync +/// Streams through a lei_to_figi.jsonl file and processes entries in batches with fsync +/// +/// Two-phase processing: +/// Phase 1: Process all common stocks first to generate company UUIDs +/// Phase 2: Process dependent securities (warrants, options, corporate bonds) using company UUIDs async fn process_lei_figi_file_batched( input_path: &Path, common_log_path: &Path, @@ -449,11 +454,11 @@ async fn process_lei_figi_file_batched( let batch_size = 100; let mut processed_count = 0; + // === PHASE 1: Process common stocks and build company_id mapping === + logger::log_info(" Phase 1: Processing common stocks...").await; + let mut common_batch: Vec = Vec::new(); - let mut warrants_batch: Vec = Vec::new(); - let mut options_batch: Vec = Vec::new(); - let mut corporate_bonds_batch: Vec = Vec::new(); - let mut government_bonds_batch: Vec = Vec::new(); + let mut company_id_map: HashMap = HashMap::new(); // company_name -> company_id for (line_num, line) in content.lines().enumerate() { if line.trim().is_empty() { @@ -471,54 +476,24 @@ async fn process_lei_figi_file_batched( } // Group by security type - let (common_stocks, warrant_securities, option_securities, corporate_bonds_securities, government_bonds_securities) = - group_securities(&figis); + let (common_stocks, _, _, _, _) = group_securities(&figis); - // Collect entries for batching and update existing keys + // Process common stocks if !common_stocks.is_empty() { - if let Some(entry) = prepare_common_stock_entry(&common_stocks, existing_companies) { + if let Some(mut entry) = prepare_common_stock_entry(&common_stocks, existing_companies) { + // Generate UUID for company if not already done + if !company_id_map.contains_key(&entry.name) { + let company_id = uuid::Uuid::new_v4().to_string(); + company_id_map.insert(entry.name.clone(), company_id.clone()); + entry.id = company_id; + } + // Add to existing set immediately to prevent duplicates in same run existing_companies.insert(entry.name.clone()); common_batch.push(entry); } } - if !warrant_securities.is_empty() { - for entry in prepare_warrant_entries(&warrant_securities, existing_warrants) { - // Add to existing set immediately - let key = entry.company_id.clone(); - existing_warrants.insert(key); - warrants_batch.push(entry); - } - } - - if !option_securities.is_empty() { - for entry in prepare_option_entries(&option_securities, existing_options) { - // Add to existing set immediately - let key = entry.company_name.clone(); - existing_options.insert(key); - options_batch.push(entry); - } - } - - if !corporate_bonds_securities.is_empty() { - for entry in prepare_corporate_bond_entries(&corporate_bonds_securities, existing_corporate_bonds) { - // Use underlying_company_name as the key (not issuer_company_name) - let key = entry.underlying_company_name.clone(); - existing_corporate_bonds.insert(key); - corporate_bonds_batch.push(entry); - } - } - - if !government_bonds_securities.is_empty() { - for entry in prepare_government_bond_entries(&government_bonds_securities, existing_government_bonds) { - // Use issuer_name as the key (not issuer_country_name) - let key = entry.issuer_name.clone(); - existing_government_bonds.insert(key); - government_bonds_batch.push(entry); - } - } - // Write batches when they reach size limit if common_batch.len() >= batch_size { write_batch_with_fsync(common_log_path, &common_batch).await?; @@ -526,6 +501,82 @@ async fn process_lei_figi_file_batched( common_batch.clear(); } + processed_count += 1; + if processed_count % 1000 == 0 { + logger::log_info(&format!(" Phase 1: Processed {} LEI entries...", processed_count)).await; + } + } + + // Write remaining common stocks batch + if !common_batch.is_empty() { + write_batch_with_fsync(common_log_path, &common_batch).await?; + stats.companies_added += common_batch.len(); + } + + logger::log_info(&format!(" Phase 1 complete: Generated {} company UUIDs", company_id_map.len())).await; + + // === PHASE 2: Process dependent securities using company_id mapping === + logger::log_info(" Phase 2: Processing warrants, options, and corporate bonds...").await; + + let mut warrants_batch: Vec = Vec::new(); + let mut options_batch: Vec = Vec::new(); + let mut corporate_bonds_batch: Vec = Vec::new(); + let mut government_bonds_batch: Vec = Vec::new(); + + processed_count = 0; + + for (line_num, line) in content.lines().enumerate() { + if line.trim().is_empty() { + continue; + } + + let entry: Value = serde_json::from_str(line) + .context(format!("Failed to parse JSON on line {}", line_num + 1))?; + + let figis: Vec = serde_json::from_value(entry["figis"].clone()) + .context("Invalid 'figis' field")?; + + if figis.is_empty() { + continue; + } + + // Group by security type + let (_, warrant_securities, option_securities, corporate_bonds_securities, government_bonds_securities) = + group_securities(&figis); + + if !warrant_securities.is_empty() { + for entry in prepare_warrant_entries(&warrant_securities, existing_warrants, &company_id_map) { + let key = entry.company_id.clone(); + existing_warrants.insert(key); + warrants_batch.push(entry); + } + } + + if !option_securities.is_empty() { + for entry in prepare_option_entries(&option_securities, existing_options, &company_id_map) { + let key = entry.company_name.clone(); + existing_options.insert(key); + options_batch.push(entry); + } + } + + if !corporate_bonds_securities.is_empty() { + for entry in prepare_corporate_bond_entries(&corporate_bonds_securities, existing_corporate_bonds, &company_id_map) { + let key = entry.underlying_company_name.clone(); + existing_corporate_bonds.insert(key); + corporate_bonds_batch.push(entry); + } + } + + if !government_bonds_securities.is_empty() { + for entry in prepare_government_bond_entries(&government_bonds_securities, existing_government_bonds) { + let key = entry.issuer_name.clone(); + existing_government_bonds.insert(key); + government_bonds_batch.push(entry); + } + } + + // Write batches when they reach size limit if warrants_batch.len() >= batch_size { write_batch_with_fsync(warrants_log_path, &warrants_batch).await?; stats.warrants_added += warrants_batch.len(); @@ -552,16 +603,11 @@ async fn process_lei_figi_file_batched( processed_count += 1; if processed_count % 1000 == 0 { - logger::log_info(&format!(" Processed {} LEI entries...", processed_count)).await; + logger::log_info(&format!(" Phase 2: Processed {} LEI entries...", processed_count)).await; } } // Write remaining batches - if !common_batch.is_empty() { - write_batch_with_fsync(common_log_path, &common_batch).await?; - stats.companies_added += common_batch.len(); - } - if !warrants_batch.is_empty() { write_batch_with_fsync(warrants_log_path, &warrants_batch).await?; stats.warrants_added += warrants_batch.len(); @@ -582,6 +628,7 @@ async fn process_lei_figi_file_batched( stats.government_bonds_added += government_bonds_batch.len(); } + logger::log_info(" Phase 2 complete").await; Ok(()) } @@ -639,9 +686,14 @@ fn prepare_common_stock_entry( } /// Prepares warrant entries for batching +/// Prepares warrant entries for batching +/// +/// Groups warrant contracts by underlying company, using company_id from the company_id_map +/// if the company exists, otherwise generates a new ID for the warrant. fn prepare_warrant_entries( warrant_securities: &[FigiData], existing_keys: &HashSet, + company_id_map: &HashMap, ) -> Vec { let mut entries = Vec::new(); @@ -666,10 +718,10 @@ fn prepare_warrant_entries( continue; } - let company_id = format!("warrant_{}", std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_nanos()); + // Use company_id from map if company exists, otherwise generate new ID for warrant + let company_id = company_id_map.get(&underlying_company) + .cloned() + .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); let mut warrants_by_type: HashMap = HashMap::new(); @@ -715,9 +767,14 @@ fn prepare_warrant_entries( /// /// # Returns /// Vector of OptionData entries, one per unique underlying company +/// Prepares option entries for batching +/// +/// Groups option contracts by underlying company, using company_id from the company_id_map +/// if the company exists, otherwise generates a new ID for the option. fn prepare_option_entries( option_securities: &[FigiData], existing_keys: &HashSet, + company_id_map: &HashMap, ) -> Vec { let mut entries = Vec::new(); @@ -742,6 +799,11 @@ fn prepare_option_entries( continue; } + // Use company_id from map if company exists, otherwise generate new ID for option + let company_id = company_id_map.get(&underlying_company) + .cloned() + .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); + // Build OptionContracts and extract strikes/expirations let mut option_contracts: HashMap, Vec)> = HashMap::new(); let mut all_strikes: std::collections::HashSet = std::collections::HashSet::new(); @@ -795,7 +857,7 @@ fn prepare_option_entries( .collect::>(); let option_data = OptionData { - company_id: underlying_company.clone(), + company_id, company_name: underlying_company.clone(), expiration_dates, strikes, @@ -820,9 +882,14 @@ fn prepare_option_entries( /// /// # Returns /// Vector of CorporateBondInfo entries, one per unique issuer +/// Prepares corporate bond entries for batching +/// +/// Groups corporate bonds by issuer (underlying_company_name), using company_id from the company_id_map +/// if the company exists, otherwise generates a new ID for the bond. fn prepare_corporate_bond_entries( corporate_bond_securities: &[FigiData], existing_keys: &HashSet, + company_id_map: &HashMap, ) -> Vec { let mut entries = Vec::new(); @@ -845,6 +912,11 @@ fn prepare_corporate_bond_entries( continue; } + // Use company_id from map if company exists, otherwise generate new ID for bond + let company_id = company_id_map.get(&issuer) + .cloned() + .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); + // Group by ISIN let bonds_by_isin = group_figis_by_isin(&figis); @@ -859,6 +931,7 @@ fn prepare_corporate_bond_entries( } let bond_info = CorporateBondData { + company_id, underlying_company_name: issuer.clone(), bonds: bonds_by_isin, bond_details: bond_details_map,