diff --git a/src/corporate/bond_processing.rs b/src/corporate/bond_processing.rs index 78c3039..4979ba6 100644 --- a/src/corporate/bond_processing.rs +++ b/src/corporate/bond_processing.rs @@ -2,7 +2,6 @@ // Bond-specific processing logic for corporate and government bonds use super::types::*; -use std::collections::HashMap; /// Parse bond details from ticker and security description /// @@ -235,164 +234,40 @@ pub fn classify_government_issuer(name: &str) -> String { "other".to_string() } -/// Process corporate bonds from FIGI data -/// Mirrors the pattern used for warrants/options -pub fn process_corporate_bonds( - figi_infos: &[FigiInfo], - existing_bonds: &mut HashMap, -) -> usize { - let mut new_count = 0; +/// Classify government bond type based on security_type +/// +/// Maps OpenFIGI security types to simplified bond categories for government bonds +/// +/// # Examples +/// - "DOMESTIC" -> "domestic" +/// - "GLOBAL" -> "global" +/// - "EURO NON-DOLLAR" -> "euro" +/// - "DOMESTIC MTN" -> "mtn" +pub fn classify_government_bond_type(security_type: &str) -> String { + let security_type_upper = security_type.to_uppercase(); - // Group by issuer name - let mut by_issuer: HashMap> = HashMap::new(); - for figi in figi_infos { - by_issuer.entry(figi.name.clone()).or_default().push(figi.clone()); + if security_type_upper.contains("GLOBAL") { + return "global".to_string(); } - for (issuer_name, figis) in by_issuer { - let bond_info = existing_bonds - .entry(issuer_name.clone()) - .or_insert_with(|| CorporateBondInfo { - issuer_name: issuer_name.clone(), - bonds: HashMap::new(), - bond_details: HashMap::new(), - }); - - for figi in figis { - // Group by ISIN - let isin_bonds = bond_info.bonds.entry(figi.isin.clone()).or_default(); - - // Check if this specific FIGI already exists - if !isin_bonds.iter().any(|f| f.figi == figi.figi) { - // Parse bond details - let details = parse_bond_details(&figi.ticker, &figi.security_description); - bond_info.bond_details.insert(figi.isin.clone(), details); - - isin_bonds.push(figi); - new_count += 1; - } + if security_type_upper.contains("EURO") { + if security_type_upper.contains("NON-DOLLAR") || !security_type_upper.contains("DOLLAR") { + return "euro".to_string(); } + return "eurodollar".to_string(); } - new_count + if security_type_upper.contains("YANKEE") { + return "yankee".to_string(); + } + + if security_type_upper.contains("MTN") { + return "mtn".to_string(); + } + + if security_type_upper.contains("DOMESTIC") { + return "domestic".to_string(); + } + + "other".to_string() } - -/// Process government bonds from FIGI data -/// Mirrors the pattern used for warrants/options -pub fn process_government_bonds( - figi_infos: &[FigiInfo], - existing_bonds: &mut HashMap, -) -> usize { - let mut new_count = 0; - - // Group by issuer name - let mut by_issuer: HashMap> = HashMap::new(); - for figi in figi_infos { - by_issuer.entry(figi.name.clone()).or_default().push(figi.clone()); - } - - for (issuer_name, figis) in by_issuer { - let issuer_type = classify_government_issuer(&issuer_name); - - let bond_info = existing_bonds - .entry(issuer_name.clone()) - .or_insert_with(|| GovernmentBondInfo { - issuer_name: issuer_name.clone(), - issuer_type: issuer_type.clone(), - bonds: HashMap::new(), - bond_details: HashMap::new(), - }); - - for figi in figis { - // Group by ISIN - let isin_bonds = bond_info.bonds.entry(figi.isin.clone()).or_default(); - - // Check if this specific FIGI already exists - if !isin_bonds.iter().any(|f| f.figi == figi.figi) { - // Parse bond details - let details = parse_bond_details(&figi.ticker, &figi.security_description); - bond_info.bond_details.insert(figi.isin.clone(), details); - - isin_bonds.push(figi); - new_count += 1; - } - } - } - - new_count -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_parse_corporate_bond() { - let details = parse_bond_details( - "WTFC 4.3 01/12/26 0003", - "WTFC 4.3 01/12/26" - ); - - assert_eq!(details.coupon_rate, Some(4.3)); - assert_eq!(details.maturity_date, Some("2026-01-12".to_string())); - assert!(!details.is_floating); - assert!(!details.is_zero_coupon); - assert_eq!(details.series_identifier, Some("0003".to_string())); - } - - #[test] - fn test_parse_government_bond() { - let details = parse_bond_details( - "SLOVAK 1.5225 05/10/28 4Y", - "SLOVAK 1.5225 05/10/28" - ); - - assert_eq!(details.coupon_rate, Some(1.5225)); - assert_eq!(details.maturity_date, Some("2028-05-10".to_string())); - assert!(!details.is_floating); - assert_eq!(details.series_identifier, Some("4Y".to_string())); - } - - #[test] - fn test_parse_floating_rate() { - let details = parse_bond_details( - "SEK Float 06/30/34", - "SEK Float 06/30/34" - ); - - assert!(details.is_floating); - assert_eq!(details.maturity_date, Some("2034-06-30".to_string())); - assert_eq!(details.coupon_rate, None); - } - - #[test] - fn test_parse_fractional_coupon() { - let details = parse_bond_details( - "DANGCE 12 1/2 05/30/26 B", - "DANGCE 12 1/2 05/30/26" - ); - - assert_eq!(details.coupon_rate, Some(12.5)); - assert_eq!(details.maturity_date, Some("2026-05-30".to_string())); - } - - #[test] - fn test_parse_zero_coupon() { - let details = parse_bond_details( - "GGB 0 10/15/42", - "GGB 0 10/15/42" - ); - - assert_eq!(details.coupon_rate, Some(0.0)); - assert!(details.is_zero_coupon); - assert_eq!(details.maturity_date, Some("2042-10-15".to_string())); - } - - #[test] - fn test_classify_issuer_types() { - assert_eq!(classify_government_issuer("SLOVAK REPUBLIC"), "sovereign"); - assert_eq!(classify_government_issuer("ASNES KOMMUNE"), "municipal"); - assert_eq!(classify_government_issuer("SWEDISH EXPORT CREDIT"), "agency"); - assert_eq!(classify_government_issuer("REGION OCCITANIE"), "state"); - } -} \ No newline at end of file diff --git a/src/corporate/types.rs b/src/corporate/types.rs index a8c4bc0..9d0667f 100644 --- a/src/corporate/types.rs +++ b/src/corporate/types.rs @@ -145,7 +145,7 @@ pub struct BondDetails { /// ticker: "WTFC 4.3 01/12/26 0003" #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CorporateBondInfo { - pub issuer_name: String, // key - company name issuing the bond + pub underlying_company_name: String, // key - company name issuing the bond pub bonds: HashMap>, // ISIN -> Vec (grouped by ISIN) #[serde(skip_serializing_if = "HashMap::is_empty", default)] pub bond_details: HashMap, // ISIN -> parsed bond details diff --git a/src/corporate/update_openfigi.rs b/src/corporate/update_openfigi.rs index 0bdb047..ff0d9b8 100644 --- a/src/corporate/update_openfigi.rs +++ b/src/corporate/update_openfigi.rs @@ -1,15 +1,14 @@ // src/corporate/update_openfigi.rs - STREAMING VERSION // Key changes: Never load entire GLEIF CSV or FIGI maps into memory - +use super::types::*; +use super::bond_processing::*; use crate::util::directories::DataPaths; use crate::util::integrity::{DataStage, StateManager, directory_reference}; use crate::util::logger; use crate::scraper::openfigi::{OpenFigiClient}; -use super::types::*; use serde_json::{json, Value}; use std::collections::{HashMap, HashSet}; use std::path::Path; -use std::io::{BufRead, BufReader}; use tokio::fs as tokio_fs; use tokio::io::AsyncWriteExt; use anyhow::{Context, anyhow}; @@ -106,9 +105,7 @@ pub async fn update_securities(date_dir: &Path) -> anyhow::Result<()> { let step_name = "securities_data_complete"; let data_dir = dir.data_dir(); - let corporate_data_dir = data_dir.join("corporate"); - let economic_data_dir = data_dir.join("economic"); - let output_dir = data_dir.join("by_name"); + let output_dir = data_dir.join("figi_securities"); tokio_fs::create_dir_all(&output_dir).await .context("Failed to create corporate/by_name directory")?; @@ -209,9 +206,13 @@ pub async fn update_securities(date_dir: &Path) -> anyhow::Result<()> { &common_log, &warrants_log, &options_log, + &corporate_bonds_log, + &government_bonds_log, &mut existing_companies, &mut existing_warrants, &mut existing_options, + &mut existing_corporate_bonds, + &mut existing_government_bonds, &mut stats, ).await?; @@ -228,6 +229,8 @@ pub async fn update_securities(date_dir: &Path) -> anyhow::Result<()> { create_checkpoint(&common_checkpoint, &common_log).await?; create_checkpoint(&warrants_checkpoint, &warrants_log).await?; create_checkpoint(&options_checkpoint, &options_log).await?; + create_checkpoint(&corporate_bonds_checkpoint, &corporate_bonds_log).await?; + create_checkpoint(&government_bonds_checkpoint, &government_bonds_log).await?; } stats.print_summary(); @@ -251,6 +254,8 @@ async fn track_securities_completion( "common_stocks.jsonl".to_string(), "warrants.jsonl".to_string(), "options.jsonl".to_string(), + "corporate_bonds.jsonl".to_string(), + "government_bonds.jsonl".to_string(), ]), Some(vec![ "*.log.jsonl".to_string(), // Exclude log files @@ -495,9 +500,13 @@ async fn process_lei_figi_file_batched( common_log_path: &Path, warrants_log_path: &Path, options_log_path: &Path, + corporate_bonds_log_path: &Path, + government_bonds_log_path: &Path, existing_companies: &mut HashSet, existing_warrants: &mut HashSet, existing_options: &mut HashSet, + existing_corporate_bonds: &mut HashSet, + existing_government_bonds: &mut HashSet, stats: &mut StreamingStats, ) -> anyhow::Result<()> { let content = tokio_fs::read_to_string(input_path).await @@ -506,9 +515,11 @@ async fn process_lei_figi_file_batched( let batch_size = 100; let mut processed_count = 0; - let mut common_batch = Vec::new(); - let mut warrants_batch = Vec::new(); - let mut options_batch = Vec::new(); + let mut common_batch: Vec = Vec::new(); + let mut warrants_batch: Vec = Vec::new(); + let mut options_batch: Vec = Vec::new(); + let mut corporate_bonds_batch: Vec = Vec::new(); + let mut government_bonds_batch: Vec = Vec::new(); for (line_num, line) in content.lines().enumerate() { if line.trim().is_empty() { @@ -526,7 +537,7 @@ async fn process_lei_figi_file_batched( } // Group by security type - let (common_stocks, warrant_securities, option_securities) = + let (common_stocks, warrant_securities, option_securities, corporate_bonds_securities, government_bonds_securities) = group_by_security_type(&figis); // Collect entries for batching and update existing keys @@ -555,6 +566,24 @@ async fn process_lei_figi_file_batched( options_batch.push(entry); } } + + if !corporate_bonds_securities.is_empty() { + for entry in prepare_corporate_bond_entries(&corporate_bonds_securities, existing_corporate_bonds) { + // Use underlying_company_name as the key (not issuer_company_name) + let key = entry.underlying_company_name.clone(); + existing_corporate_bonds.insert(key); + corporate_bonds_batch.push(entry); + } + } + + if !government_bonds_securities.is_empty() { + for entry in prepare_government_bond_entries(&government_bonds_securities, existing_government_bonds) { + // Use issuer_name as the key (not issuer_country_name) + let key = entry.issuer_name.clone(); + existing_government_bonds.insert(key); + government_bonds_batch.push(entry); + } + } // Write batches when they reach size limit if common_batch.len() >= batch_size { @@ -574,6 +603,18 @@ async fn process_lei_figi_file_batched( stats.options_added += options_batch.len(); options_batch.clear(); } + + if corporate_bonds_batch.len() >= batch_size { + write_batch_with_fsync(corporate_bonds_log_path, &corporate_bonds_batch).await?; + stats.corporate_bonds_added += corporate_bonds_batch.len(); + corporate_bonds_batch.clear(); + } + + if government_bonds_batch.len() >= batch_size { + write_batch_with_fsync(government_bonds_log_path, &government_bonds_batch).await?; + stats.government_bonds_added += government_bonds_batch.len(); + government_bonds_batch.clear(); + } processed_count += 1; if processed_count % 1000 == 0 { @@ -596,6 +637,16 @@ async fn process_lei_figi_file_batched( write_batch_with_fsync(options_log_path, &options_batch).await?; stats.options_added += options_batch.len(); } + + if !corporate_bonds_batch.is_empty() { + write_batch_with_fsync(corporate_bonds_log_path, &corporate_bonds_batch).await?; + stats.corporate_bonds_added += corporate_bonds_batch.len(); + } + + if !government_bonds_batch.is_empty() { + write_batch_with_fsync(government_bonds_log_path, &government_bonds_batch).await?; + stats.government_bonds_added += government_bonds_batch.len(); + } Ok(()) } @@ -719,11 +770,140 @@ fn prepare_option_entries( entries } +/// Prepares corporate bond entries for batching +/// +/// Groups corporate bonds by issuer (underlying_company_name), extracting key bond details +/// like coupon rate, maturity date, and tenor from the ticker/description for each ISIN. +/// +/// # Arguments +/// * `corporate_bond_securities` - List of FigiInfo objects for corporate bonds +/// * `existing_keys` - Set of already-processed keys (format: "company_name") +/// +/// # Returns +/// Vector of CorporateBondInfo entries, one per unique issuer +fn prepare_corporate_bond_entries( + corporate_bond_securities: &[FigiInfo], + existing_keys: &HashSet, +) -> Vec { + let mut entries = Vec::new(); + + // Group bonds by issuer (company name) + let mut grouped: HashMap> = HashMap::new(); + + for figi in corporate_bond_securities { + let issuer = figi.name.clone(); + if issuer.is_empty() { + continue; + } + + grouped.entry(issuer).or_default().push(figi.clone()); + } + + // Create entries for each unique issuer + for (issuer, figis) in grouped { + // Check if this issuer already exists + if existing_keys.contains(&issuer) { + continue; + } + + // Group by ISIN + let bonds_by_isin = group_figis_by_isin(&figis); + + // Parse bond details for each ISIN + let mut bond_details_map: HashMap = HashMap::new(); + + for (isin, isin_figis) in &bonds_by_isin { + if let Some(first_figi) = isin_figis.first() { + let details = parse_bond_details(&first_figi.ticker, &first_figi.security_description); + bond_details_map.insert(isin.clone(), details); + } + } + + let bond_info = CorporateBondInfo { + underlying_company_name: issuer.clone(), + bonds: bonds_by_isin, + bond_details: bond_details_map, + }; + + entries.push(bond_info); + } + + entries +} + +/// Prepares government bond entries for batching +/// +/// Groups government bonds by issuer (country/entity), extracting key bond +/// details like coupon rate, maturity date, and tenor from the ticker/description for each ISIN. +/// Also classifies the government issuer type (sovereign, municipal, agency, etc.) +/// +/// # Arguments +/// * `government_bond_securities` - List of FigiInfo objects for government bonds +/// * `existing_keys` - Set of already-processed keys (format: "issuer_name") +/// +/// # Returns +/// Vector of GovernmentBondInfo entries, one per unique issuer +fn prepare_government_bond_entries( + government_bond_securities: &[FigiInfo], + existing_keys: &HashSet, +) -> Vec { + let mut entries = Vec::new(); + + // Group bonds by issuer (country/entity name) + let mut grouped: HashMap> = HashMap::new(); + + for figi in government_bond_securities { + let issuer = figi.name.clone(); + if issuer.is_empty() { + continue; + } + + grouped.entry(issuer).or_default().push(figi.clone()); + } + + // Create entries for each unique issuer + for (issuer, figis) in grouped { + // Check if this issuer already exists + if existing_keys.contains(&issuer) { + continue; + } + + // Classify the government issuer type + let issuer_type = classify_government_issuer(&issuer); + + // Group by ISIN + let bonds_by_isin = group_figis_by_isin(&figis); + + // Parse bond details for each ISIN + let mut bond_details_map: HashMap = HashMap::new(); + + for (isin, isin_figis) in &bonds_by_isin { + if let Some(first_figi) = isin_figis.first() { + let details = parse_bond_details(&first_figi.ticker, &first_figi.security_description); + bond_details_map.insert(isin.clone(), details); + } + } + + let bond_info = GovernmentBondInfo { + issuer_name: issuer.clone(), + issuer_type, + bonds: bonds_by_isin, + bond_details: bond_details_map, + }; + + entries.push(bond_info); + } + + entries +} + /// Groups FigiInfo list by security type -fn group_by_security_type(figis: &[FigiInfo]) -> (Vec, Vec, Vec) { - let mut common_stocks = Vec::new(); - let mut warrants = Vec::new(); - let mut options = Vec::new(); +fn group_by_security_type(figis: &[FigiInfo]) -> (Vec, Vec, Vec, Vec, Vec) { + let mut common_stocks:Vec = Vec::new(); + let mut warrants:Vec = Vec::new(); + let mut options:Vec = Vec::new(); + let mut corporate_bonds:Vec = Vec::new(); + let mut government_bonds:Vec = Vec::new(); for figi in figis { match figi.security_type.as_str() { @@ -732,9 +912,14 @@ fn group_by_security_type(figis: &[FigiInfo]) -> (Vec, Vec, "Equity Option" => options.push(figi.clone()), _ => {} } + match figi.security_type2.as_str() { + "Corp" => corporate_bonds.push(figi.clone()), + "Govt" => government_bonds.push(figi.clone()), + _ => {} + } } - (common_stocks, warrants, options) + (common_stocks, warrants, options, corporate_bonds, government_bonds) } /// Groups FigiInfo by ISIN @@ -1193,7 +1378,6 @@ pub async fn stream_gleif_csv_and_build_figi_filtered( setup_sector_directories(&date_dir, §or_dirs).await?; let mut lei_batch: HashMap> = HashMap::new(); - let mut line_count = 0; let mut processed_leis = 0; let mut skipped_leis = 0; @@ -1219,7 +1403,6 @@ pub async fn stream_gleif_csv_and_build_figi_filtered( } lei_batch.entry(lei).or_default().push(isin); - line_count += 1; // Process batch when full if lei_batch.len() >= LEI_BATCH_SIZE { diff --git a/src/economic/storage.rs b/src/economic/storage.rs index 7ff1908..7bce313 100644 --- a/src/economic/storage.rs +++ b/src/economic/storage.rs @@ -8,7 +8,6 @@ use chrono::{NaiveDate, Datelike}; use std::collections::HashMap; use serde_json; -const CHUNK_SIZE: usize = 500; // Process 500 events at a time const MAX_EVENTS_PER_FILE: usize = 3000; pub async fn scan_existing_chunks(paths: &DataPaths) -> anyhow::Result> {