diff --git a/README.md b/README.md index 1bf780c..e6be00d 100644 --- a/README.md +++ b/README.md @@ -248,4 +248,8 @@ Der Scraper unterstützt 52 Länder und Regionen (siehe `countries.json`), darun ## chromedriver Download https://chromedriver.storage.googleapis.com/index.html -https://googlechromelabs.github.io/chrome-for-testing/ \ No newline at end of file +https://googlechromelabs.github.io/chrome-for-testing/ + +## Graphviz.org Download + +https://graphviz.org/download/ \ No newline at end of file diff --git a/logs/checkpoint_dependencies.dot b/integrity/checkpoint_dependencies.dot similarity index 89% rename from logs/checkpoint_dependencies.dot rename to integrity/checkpoint_dependencies.dot index b6b9339..6aec350 100644 --- a/logs/checkpoint_dependencies.dot +++ b/integrity/checkpoint_dependencies.dot @@ -2,24 +2,24 @@ digraph Dependencies { rankdir=LR; node [shape=box]; - "yahoo_companies_cleansed" [label="yahoo_companies_cleansed -Company data cleansed and validated"]; - "yahoo_events_enrichment_complete" [label="yahoo_events_enrichment_complete -Corporate events enriched for all companies"]; "yahoo_options_enrichment_complete" [label="yahoo_options_enrichment_complete Options data enriched for all companies"]; "lei_figi_mapping_complete" [label="lei_figi_mapping_complete LEI-to-FIGI mappings from OpenFIGI API"]; - "exchange_collection_complete" [label="exchange_collection_complete + "yahoo_chart_enrichment_complete" [label="yahoo_chart_enrichment_complete +Chart data enriched for all companies"]; + "enrichment_group" [label="enrichment_group Yahoo exchanges collected and validated"]; "securities_data_complete" [label="securities_data_complete Securities data built from FIGI mappings"]; - "yahoo_chart_enrichment_complete" [label="yahoo_chart_enrichment_complete -Chart data enriched for all companies"]; + "yahoo_companies_cleansed" [label="yahoo_companies_cleansed +Company data cleansed and validated"]; + "yahoo_events_enrichment_complete" 
[label="yahoo_events_enrichment_complete +Corporate events enriched for all companies"]; - "yahoo_companies_cleansed" -> "exchange_collection_complete"; - "yahoo_events_enrichment_complete" -> "yahoo_companies_cleansed" [label="via group enrichment_group"]; "yahoo_options_enrichment_complete" -> "yahoo_companies_cleansed" [label="via group enrichment_group"]; - "securities_data_complete" -> "lei_figi_mapping_complete"; "yahoo_chart_enrichment_complete" -> "yahoo_companies_cleansed" [label="via group enrichment_group"]; + "securities_data_complete" -> "lei_figi_mapping_complete"; + "yahoo_companies_cleansed" -> "securities_data_complete"; + "yahoo_events_enrichment_complete" -> "yahoo_companies_cleansed" [label="via group enrichment_group"]; } diff --git a/integrity/checkpoint_dependencies.toml b/integrity/checkpoint_dependencies.toml index 5668e57..8c506c8 100644 --- a/integrity/checkpoint_dependencies.toml +++ b/integrity/checkpoint_dependencies.toml @@ -4,21 +4,21 @@ # COLLECTION STAGE (No dependencies) # ============================================================================ -[checkpoints.exchange_collection_complete] -description = "Yahoo exchanges collected and validated" -depends_on = [] - [checkpoints.lei_figi_mapping_complete] description = "LEI-to-FIGI mappings from OpenFIGI API" depends_on = [] +[checkpoints.securities_data_complete] +description = "Securities data built from FIGI mappings" +depends_on = ["lei_figi_mapping_complete"] + # ============================================================================ # CLEANSING STAGE (Depends on collection) # ============================================================================ [checkpoints.yahoo_companies_cleansed] description = "Company data cleansed and validated" -depends_on = ["exchange_collection_complete"] +depends_on = ["securities_data_complete"] # ============================================================================ # ENRICHMENT GROUP (All depend on cleansed companies) @@ -52,6 
+52,6 @@ group = "enrichment_group" # SECURITIES PROCESSING (Depends on LEI mapping) # ============================================================================ -[checkpoints.securities_data_complete] -description = "Securities data built from FIGI mappings" -depends_on = ["lei_figi_mapping_complete"] \ No newline at end of file +[checkpoints.enrichment_group] +description = "Yahoo exchanges collected and validated" +depends_on = [] \ No newline at end of file diff --git a/src/corporate/bond_processing.rs b/src/corporate/bond_processing.rs new file mode 100644 index 0000000..78c3039 --- /dev/null +++ b/src/corporate/bond_processing.rs @@ -0,0 +1,398 @@ +// src/corporate/bond_processing.rs +// Bond-specific processing logic for corporate and government bonds + +use super::types::*; +use std::collections::HashMap; + +/// Parse bond details from ticker and security description +/// +/// Examples: +/// - "WTFC 4.3 01/12/26 0003" -> coupon: 4.3, maturity: 2026-01-12 +/// - "SLOVAK 1.5225 05/10/28 4Y" -> coupon: 1.5225, maturity: 2028-05-10 +/// - "SEK Float 06/30/34" -> floating rate, maturity: 2034-06-30 +/// - "GGB 0 10/15/42" -> zero coupon, maturity: 2042-10-15 +pub fn parse_bond_details(ticker: &str, security_description: &str) -> BondDetails { + let mut details = BondDetails { + coupon_rate: None, + maturity_date: None, + is_floating: false, + is_zero_coupon: false, + tenor_years: None, + series_identifier: None, + }; + + // Check for floating rate - look for "Float", " F ", "V0" patterns + if ticker.contains("Float") || ticker.contains(" F ") || ticker.contains(" V0 ") + || security_description.contains("Float") { + details.is_floating = true; + } + + // Parse coupon rate if not floating + if !details.is_floating { + if let Some(coupon) = extract_coupon_rate(ticker, security_description) { + details.coupon_rate = Some(coupon); + details.is_zero_coupon = coupon == 0.0; + } + } + + // Parse maturity date + if let Some(maturity) = extract_maturity_date(ticker, 
security_description) { + details.maturity_date = Some(maturity.clone()); + + // Calculate tenor (simplified - just extract year) + if let Some(year_str) = maturity.split('-').next() { + if let Ok(mat_year) = year_str.parse::<i32>() { + let current_year = 2026; // TODO: derive from the current date instead of hard-coding + let years_to_maturity = (mat_year - current_year).max(0) as u32; + details.tenor_years = Some(years_to_maturity); + } + } + } + + // Extract series identifier + details.series_identifier = extract_series_identifier(ticker); + + details +} + +/// Extract coupon rate from ticker/description +/// Handles: "4.3", "1.5225", "12 1/2" (fractional), "0" +fn extract_coupon_rate(ticker: &str, description: &str) -> Option<f64> { + let text = format!("{} {}", ticker, description); + + // Pattern 1: Fractional rates like "12 1/2" -> 12.5 + if let Some(frac_result) = parse_fractional_coupon(&text) { + return Some(frac_result); + } + + // Pattern 2: Decimal rates like "4.3" or "1.5225" + // Look for number followed by space and date pattern + let parts: Vec<&str> = text.split_whitespace().collect(); + for i in 0..parts.len() { + if let Ok(rate) = parts[i].parse::<f64>() { + // Sanity check: coupon rates are typically 0-20% + if rate >= 0.0 && rate <= 20.0 { + // Make sure it's before a date-like pattern + if i + 1 < parts.len() { + let next = parts[i + 1]; + if next.contains('/') || next.len() >= 8 { + return Some(rate); + } + } + } + } + } + + None +} + +/// Parse fractional coupon like "12 1/2" -> 12.5 +fn parse_fractional_coupon(text: &str) -> Option<f64> { + let parts: Vec<&str> = text.split_whitespace().collect(); + + for i in 0..parts.len().saturating_sub(1) { + // Check if current part is a number + if let Ok(whole) = parts[i].parse::<f64>() { + // Check if next part is a fraction like "1/2" + if let Some(slash_pos) = parts[i + 1].find('/') { + let frac_str = parts[i + 1]; + let num_str = &frac_str[..slash_pos]; + let den_str = &frac_str[slash_pos + 1..]; + + if let (Ok(num), Ok(den)) = (num_str.parse::<f64>(), 
den_str.parse::<f64>()) { + if den != 0.0 { + return Some(whole + num / den); + } + } + } + } + + None +} + +/// Extract maturity date from ticker/description +/// Handles: "01/12/26", "05/10/28", "06/30/2034" +fn extract_maturity_date(ticker: &str, description: &str) -> Option<String> { + let text = format!("{} {}", ticker, description); + + // Look for MM/DD/YY or MM/DD/YYYY patterns + let parts: Vec<&str> = text.split_whitespace().collect(); + + for part in parts { + if let Some(date) = parse_date_pattern(part) { + return Some(date); + } + } + + None +} + +/// Parse various date formats to YYYY-MM-DD +fn parse_date_pattern(s: &str) -> Option<String> { + let slash_count = s.matches('/').count(); + + if slash_count != 2 { + return None; + } + + let parts: Vec<&str> = s.split('/').collect(); + if parts.len() != 3 { + return None; + } + + let month = parts[0]; + let day = parts[1]; + let year_part = parts[2]; + + // Parse year - could be 2 or 4 digits + let year = if year_part.len() == 2 { + if let Ok(yy) = year_part.parse::<u32>() { + // Assume 20xx for values <= 50, 19xx for > 50 + if yy <= 50 { + format!("{}", 2000 + yy) + } else { + format!("{}", 1900 + yy) + } + } else { + return None; + } + } else if year_part.len() == 4 { + year_part.to_string() + } else { + return None; + }; + + // Validate month and day + if let (Ok(m), Ok(d)) = (month.parse::<u32>(), day.parse::<u32>()) { + if m >= 1 && m <= 12 && d >= 1 && d <= 31 { + return Some(format!("{}-{:02}-{:02}", year, m, d)); + } + } + + None +} + +/// Extract series identifier (tokens after the date) +/// Examples: "0003", "4Y", "144A", "REGS", "MTN", "PSI", "CD" +fn extract_series_identifier(ticker: &str) -> Option<String> { + let parts: Vec<&str> = ticker.split_whitespace().collect(); + + // Look for date pattern, then take what comes after + for i in 0..parts.len() { + if parts[i].contains('/') && parts[i].matches('/').count() == 2 { + // Found date, check if there's something after + if i + 1 < parts.len() { + return Some(parts[i + 
1].to_string()); + } + } + } + + None +} + +/// Classify government issuer type +pub fn classify_government_issuer(name: &str) -> String { + let name_lower = name.to_lowercase(); + + // Sovereign nations + if name_lower.contains("republic") + || name_lower.contains("kingdom") + || name_lower.contains("federal republic") + || name_lower.ends_with(" govt") + || name_lower.ends_with(" government") + || name_lower.contains("hellenic") // Greece + || name_lower.contains("slovak") { + return "sovereign".to_string(); + } + + // Municipalities (Norwegian communes, cities, etc.) + if name_lower.contains("kommune") + || name_lower.contains("municipality") + || name_lower.contains("city of") + || name_lower.contains("town of") + || name_lower.contains("county council") { + return "municipal".to_string(); + } + + // States/Provinces/Regions + if name_lower.contains("state of") + || name_lower.contains("province") + || name_lower.contains("region") + || name_lower.contains("county") { + return "state".to_string(); + } + + // Government agencies/entities + if name_lower.contains("export credit") + || name_lower.contains("development bank") + || name_lower.contains("housing") + || name_lower.contains("akademiska") + || name_lower.contains("byggdastofnun") { + return "agency".to_string(); + } + + "other".to_string() +} + +/// Process corporate bonds from FIGI data +/// Mirrors the pattern used for warrants/options +pub fn process_corporate_bonds( + figi_infos: &[FigiInfo], + existing_bonds: &mut HashMap<String, CorporateBondInfo>, +) -> usize { + let mut new_count = 0; + + // Group by issuer name + let mut by_issuer: HashMap<String, Vec<FigiInfo>> = HashMap::new(); + for figi in figi_infos { + by_issuer.entry(figi.name.clone()).or_default().push(figi.clone()); + } + + for (issuer_name, figis) in by_issuer { + let bond_info = existing_bonds + .entry(issuer_name.clone()) + .or_insert_with(|| CorporateBondInfo { + issuer_name: issuer_name.clone(), + bonds: HashMap::new(), + bond_details: HashMap::new(), + }); + + for figi in figis 
{ + // Group by ISIN + let isin_bonds = bond_info.bonds.entry(figi.isin.clone()).or_default(); + + // Check if this specific FIGI already exists + if !isin_bonds.iter().any(|f| f.figi == figi.figi) { + // Parse bond details + let details = parse_bond_details(&figi.ticker, &figi.security_description); + bond_info.bond_details.insert(figi.isin.clone(), details); + + isin_bonds.push(figi); + new_count += 1; + } + } + } + + new_count +} + +/// Process government bonds from FIGI data +/// Mirrors the pattern used for warrants/options +pub fn process_government_bonds( + figi_infos: &[FigiInfo], + existing_bonds: &mut HashMap<String, GovernmentBondInfo>, +) -> usize { + let mut new_count = 0; + + // Group by issuer name + let mut by_issuer: HashMap<String, Vec<FigiInfo>> = HashMap::new(); + for figi in figi_infos { + by_issuer.entry(figi.name.clone()).or_default().push(figi.clone()); + } + + for (issuer_name, figis) in by_issuer { + let issuer_type = classify_government_issuer(&issuer_name); + + let bond_info = existing_bonds + .entry(issuer_name.clone()) + .or_insert_with(|| GovernmentBondInfo { + issuer_name: issuer_name.clone(), + issuer_type: issuer_type.clone(), + bonds: HashMap::new(), + bond_details: HashMap::new(), + }); + + for figi in figis { + // Group by ISIN + let isin_bonds = bond_info.bonds.entry(figi.isin.clone()).or_default(); + + // Check if this specific FIGI already exists + if !isin_bonds.iter().any(|f| f.figi == figi.figi) { + // Parse bond details + let details = parse_bond_details(&figi.ticker, &figi.security_description); + bond_info.bond_details.insert(figi.isin.clone(), details); + + isin_bonds.push(figi); + new_count += 1; + } + } + } + + new_count +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_corporate_bond() { + let details = parse_bond_details( + "WTFC 4.3 01/12/26 0003", + "WTFC 4.3 01/12/26" + ); + + assert_eq!(details.coupon_rate, Some(4.3)); + assert_eq!(details.maturity_date, Some("2026-01-12".to_string())); + assert!(!details.is_floating); + 
assert!(!details.is_zero_coupon); + assert_eq!(details.series_identifier, Some("0003".to_string())); + } + + #[test] + fn test_parse_government_bond() { + let details = parse_bond_details( + "SLOVAK 1.5225 05/10/28 4Y", + "SLOVAK 1.5225 05/10/28" + ); + + assert_eq!(details.coupon_rate, Some(1.5225)); + assert_eq!(details.maturity_date, Some("2028-05-10".to_string())); + assert!(!details.is_floating); + assert_eq!(details.series_identifier, Some("4Y".to_string())); + } + + #[test] + fn test_parse_floating_rate() { + let details = parse_bond_details( + "SEK Float 06/30/34", + "SEK Float 06/30/34" + ); + + assert!(details.is_floating); + assert_eq!(details.maturity_date, Some("2034-06-30".to_string())); + assert_eq!(details.coupon_rate, None); + } + + #[test] + fn test_parse_fractional_coupon() { + let details = parse_bond_details( + "DANGCE 12 1/2 05/30/26 B", + "DANGCE 12 1/2 05/30/26" + ); + + assert_eq!(details.coupon_rate, Some(12.5)); + assert_eq!(details.maturity_date, Some("2026-05-30".to_string())); + } + + #[test] + fn test_parse_zero_coupon() { + let details = parse_bond_details( + "GGB 0 10/15/42", + "GGB 0 10/15/42" + ); + + assert_eq!(details.coupon_rate, Some(0.0)); + assert!(details.is_zero_coupon); + assert_eq!(details.maturity_date, Some("2042-10-15".to_string())); + } + + #[test] + fn test_classify_issuer_types() { + assert_eq!(classify_government_issuer("SLOVAK REPUBLIC"), "sovereign"); + assert_eq!(classify_government_issuer("ASNES KOMMUNE"), "municipal"); + assert_eq!(classify_government_issuer("SWEDISH EXPORT CREDIT"), "agency"); + assert_eq!(classify_government_issuer("REGION OCCITANIE"), "state"); + } +} \ No newline at end of file diff --git a/src/corporate/collect_exchanges.rs b/src/corporate/collect_exchanges.rs index 1415b6b..b6a4915 100644 --- a/src/corporate/collect_exchanges.rs +++ b/src/corporate/collect_exchanges.rs @@ -244,8 +244,7 @@ fn get_fallback_rate(currency: &str) -> f64 { /// - Handles missing or invalid data gracefully /// 
- Integrity tracking with content hash validation pub async fn collect_and_save_exchanges(paths: &DataPaths) -> anyhow::Result<usize> { - let state_path = paths.data_dir().join("state.jsonl"); - let manager = StateManager::new(paths.integrity_dir())?; + let manager = StateManager::new(paths.integrity_dir()).await?; let step_name = "exchange_collection_complete"; let output_path = paths.data_dir().join("yahoo_exchanges.json"); diff --git a/src/corporate/mod.rs b/src/corporate/mod.rs index ff810df..37675c5 100644 --- a/src/corporate/mod.rs +++ b/src/corporate/mod.rs @@ -16,5 +16,6 @@ pub mod update_companies_cleanse; pub mod update_companies_enrich; pub mod collect_exchanges; +pub mod bond_processing; pub use update::run_full_update; \ No newline at end of file diff --git a/src/corporate/types.rs b/src/corporate/types.rs index 2173893..a8c4bc0 100644 --- a/src/corporate/types.rs +++ b/src/corporate/types.rs @@ -69,7 +69,7 @@ pub struct FigiInfo { /// Company Info /// # Attributes -/// * Name as primary key (for one instition) -> might have to changed when first FigiInfo is coming in +/// * Name as primary key (for one institution) -> might have to be changed when first FigiInfo is coming in /// * ISIN as the most liquid / preferred traded security (used for fallback) /// * securities: Grouped by ISIN, filtered for Common Stock only #[derive(Debug, Clone, Serialize, Deserialize)] @@ -123,6 +123,48 @@ pub struct OptionInfo { pub options: HashMap<String, Vec<FigiInfo>>, // ISIN -> Vec<FigiInfo> (grouped by ISIN) } +/// Bond parsed details from ticker/description +/// +/// Parses bond information from ticker format: +/// Corporate: "WTFC 4.3 01/12/26 0003" +/// Government: "SLOVAK 1.5225 05/10/28 4Y" +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BondDetails { + pub coupon_rate: Option<f64>, // 4.3, 1.5225 + pub maturity_date: Option<String>, // "2026-01-12", "2028-05-10" + pub is_floating: bool, // true if "Float" in description + pub is_zero_coupon: bool, // true if coupon is 0 + pub tenor_years: Option<u32>, // 
Parsed from maturity or inferred + pub series_identifier: Option<String>, // "0003", "4Y", "144A", "REGS", etc. +} + +/// Corporate Bond Info +/// +/// Information for corporate bonds grouped by issuer +/// Example: "name": "LIBERTYVILLE BK & TRUST" +/// ticker: "WTFC 4.3 01/12/26 0003" +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CorporateBondInfo { + pub issuer_name: String, // key - company name issuing the bond + pub bonds: HashMap<String, Vec<FigiInfo>>, // ISIN -> Vec<FigiInfo> (grouped by ISIN) + #[serde(skip_serializing_if = "HashMap::is_empty", default)] + pub bond_details: HashMap<String, BondDetails>, // ISIN -> parsed bond details +} + +/// Government Bond Info +/// +/// Information for government bonds grouped by issuer (country/municipality) +/// Example: "name": "SLOVAK REPUBLIC" +/// ticker: "SLOVAK 1.5225 05/10/28 4Y" +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GovernmentBondInfo { + pub issuer_name: String, // key - government entity name + pub issuer_type: String, // "sovereign", "municipal", "state", "province", etc. 
+ pub bonds: HashMap<String, Vec<FigiInfo>>, // ISIN -> Vec<FigiInfo> (grouped by ISIN) + #[serde(skip_serializing_if = "HashMap::is_empty", default)] + pub bond_details: HashMap<String, BondDetails>, // ISIN -> parsed bond details +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AvailableExchange { pub exchange_mic: String, diff --git a/src/corporate/update_companies_cleanse.rs b/src/corporate/update_companies_cleanse.rs index c600b02..a8c2684 100644 --- a/src/corporate/update_companies_cleanse.rs +++ b/src/corporate/update_companies_cleanse.rs @@ -40,14 +40,13 @@ pub async fn companies_yahoo_cleansed_no_data(paths: &DataPaths) -> Result(line) { - if state.get("yahoo_companies_cleansed_low_profile").and_then(|v| v.as_bool()).unwrap_or(false) { - logger::log_info(" Yahoo low profile cleansing already completed, reading existing file...").await; - - if checkpoint_path.exists() { - let checkpoint_content = tokio::fs::read_to_string(&checkpoint_path).await?; - let count = checkpoint_content.lines() - .filter(|line| !line.trim().is_empty()) - .count(); - - logger::log_info(&format!(" ✓ Found {} companies in companies_yahoo_cleaned.jsonl", count)).await; - return Ok(count); - } else { - logger::log_warn(" State indicates completion but companies_yahoo_cleaned.jsonl not found, re-running...").await; - break; - } - } - } - } - } // === RECOVERY PHASE: Load checkpoint + replay log === let mut existing_companies: HashMap = HashMap::new(); diff --git a/src/corporate/update_companies_enrich.rs b/src/corporate/update_companies_enrich.rs index 4b16340..cf019cd 100644 --- a/src/corporate/update_companies_enrich.rs +++ b/src/corporate/update_companies_enrich.rs @@ -73,7 +73,6 @@ pub async fn enrich_companies_with_events( // File paths let input_path = data_path.join("companies_yahoo_cleaned.jsonl"); let log_path = data_path.join("companies_events_updates.log"); - let state_path = data_path.join("state.jsonl"); // Check input exists if !input_path.exists() { 
return Ok(0); } - let manager = StateManager::new(paths.integrity_dir())?; + let manager = StateManager::new(paths.integrity_dir()).await?; let step_name = "yahoo_events_enrichment_complete"; if manager.is_step_valid(step_name).await? { @@ -410,7 +409,6 @@ pub async fn enrich_companies_with_option( // File paths let input_path = data_path.join("companies_yahoo_cleaned.jsonl"); let log_path = data_path.join("companies_option_updates.log"); - let state_path = data_path.join("state.jsonl"); // Check input exists if !input_path.exists() { @@ -418,7 +416,7 @@ pub async fn enrich_companies_with_option( return Ok(0); } - let manager = StateManager::new(paths.integrity_dir())?; + let manager = StateManager::new(paths.integrity_dir()).await?; let step_name = "yahoo_option_enrichment_complete"; if manager.is_step_valid(step_name).await? { @@ -670,7 +668,6 @@ pub async fn enrich_companies_with_chart( // File paths let input_path = data_path.join("companies_yahoo_cleaned.jsonl"); let log_path = data_path.join("companies_chart_updates.log"); - let state_path = data_path.join("state.jsonl"); // Check input exists if !input_path.exists() { @@ -678,7 +675,7 @@ pub async fn enrich_companies_with_chart( return Ok(0); } - let manager = StateManager::new(paths.integrity_dir())?; + let manager = StateManager::new(paths.integrity_dir()).await?; let step_name = "yahoo_chart_enrichment_complete"; if manager.is_step_valid(step_name).await? 
{ diff --git a/src/corporate/update_openfigi.rs b/src/corporate/update_openfigi.rs index 39f5d27..0bdb047 100644 --- a/src/corporate/update_openfigi.rs +++ b/src/corporate/update_openfigi.rs @@ -102,13 +102,13 @@ pub async fn update_securities(date_dir: &Path) -> anyhow::Result<()> { logger::log_info("Building securities data from FIGI mappings...").await; let dir = DataPaths::new(".")?; - let state_path = dir.data_dir().join("state.jsonl"); - let manager = StateManager::new(&dir.integrity_dir())?; + let manager = StateManager::new(&dir.integrity_dir()).await?; let step_name = "securities_data_complete"; let data_dir = dir.data_dir(); let corporate_data_dir = data_dir.join("corporate"); - let output_dir = corporate_data_dir.join("by_name"); + let economic_data_dir = data_dir.join("economic"); + let output_dir = data_dir.join("by_name"); tokio_fs::create_dir_all(&output_dir).await .context("Failed to create corporate/by_name directory")?; @@ -130,6 +130,10 @@ pub async fn update_securities(date_dir: &Path) -> anyhow::Result<()> { let warrants_log = output_dir.join("warrants.log.jsonl"); let options_checkpoint = output_dir.join("options.jsonl"); let options_log = output_dir.join("options.log.jsonl"); + let corporate_bonds_checkpoint = output_dir.join("corporate_bonds.jsonl"); + let corporate_bonds_log = output_dir.join("corporate_bonds.log.jsonl"); + let government_bonds_checkpoint = output_dir.join("government_bonds.jsonl"); + let government_bonds_log = output_dir.join("government_bonds.log.jsonl"); // Track which sectors have been fully processed let processed_sectors_file = output_dir.join("state.jsonl"); @@ -176,15 +180,19 @@ pub async fn update_securities(date_dir: &Path) -> anyhow::Result<()> { let mut existing_companies = load_checkpoint_and_replay(&common_checkpoint, &common_log, "name").await?; let mut existing_warrants = load_checkpoint_and_replay_nested(&warrants_checkpoint, &warrants_log).await?; let mut existing_options = 
load_checkpoint_and_replay_nested(&options_checkpoint, &options_log).await?; + let mut existing_corporate_bonds = load_checkpoint_and_replay_nested(&corporate_bonds_checkpoint, &corporate_bonds_log).await?; + let mut existing_government_bonds = load_checkpoint_and_replay_nested(&government_bonds_checkpoint, &government_bonds_log).await?; - logger::log_info(&format!(" Existing entries - Companies: {}, Warrants: {}, Options: {}", - existing_companies.len(), existing_warrants.len(), existing_options.len())).await; + logger::log_info(&format!(" Existing entries - Companies: {}, Warrants: {}, Options: {}, Corporate Bonds: {}, Government Bonds: {}", + existing_companies.len(), existing_warrants.len(), existing_options.len(), existing_corporate_bonds.len(), existing_government_bonds.len())).await; // Process statistics let mut stats = StreamingStats::new( existing_companies.len(), existing_warrants.len(), - existing_options.len() + existing_options.len(), + existing_corporate_bonds.len(), + existing_government_bonds.len() ); logger::log_info(&format!(" Found {} sectors to process", sectors_to_process.len())).await; @@ -834,20 +842,29 @@ struct StreamingStats { initial_companies: usize, initial_warrants: usize, initial_options: usize, + initial_corporate_bonds: usize, + initial_government_bonds: usize, companies_added: usize, warrants_added: usize, options_added: usize, + corporate_bonds_added: usize, + government_bonds_added: usize, + } impl StreamingStats { - fn new(companies: usize, warrants: usize, options: usize) -> Self { + fn new(companies: usize, warrants: usize, options: usize, corporate_bonds: usize, government_bonds: usize) -> Self { Self { initial_companies: companies, initial_warrants: warrants, initial_options: options, + initial_corporate_bonds: corporate_bonds, + initial_government_bonds: government_bonds, companies_added: 0, warrants_added: 0, options_added: 0, + corporate_bonds_added: 0, + government_bonds_added: 0, } } @@ -865,6 +882,14 @@ impl 
StreamingStats { println!(" - Initial: {}", self.initial_options); println!(" - Added: {}", self.options_added); println!(" - Total: {}", self.initial_options + self.options_added); + println!("Corporate Bonds:"); + println!(" - Initial: {}", self.initial_corporate_bonds); + println!(" - Added: {}", self.corporate_bonds_added); + println!(" - Total: {}", self.initial_corporate_bonds + self.corporate_bonds_added); + println!("Government Bonds:"); + println!(" - Initial: {}", self.initial_government_bonds); + println!(" - Added: {}", self.government_bonds_added); + println!(" - Total: {}", self.initial_government_bonds + self.government_bonds_added); } } @@ -1078,17 +1103,17 @@ async fn load_existing_mapped_leis(date_dir: &Path) -> anyhow::Result anyhow::Result> { - let file = std::fs::File::open(csv_path)?; - let reader = BufReader::new(file); + let content = tokio::fs::read_to_string(csv_path) + .await + .context(format!("Failed to read GLEIF CSV file: {}", csv_path))?; let mut all_leis = HashSet::new(); - for (idx, line) in reader.lines().enumerate() { + for (idx, line) in content.lines().enumerate() { if idx == 0 { continue; // Skip header } - let line = line?; let parts: Vec<&str> = line.split(',').collect(); if parts.len() < 2 { @@ -1147,8 +1172,9 @@ pub async fn stream_gleif_csv_and_build_figi_filtered( ) -> anyhow::Result<()> { logger::log_info(&format!("Streaming GLEIF CSV: {}", csv_path)).await; - let file = std::fs::File::open(csv_path)?; - let reader = BufReader::new(file); + let content = tokio::fs::read_to_string(csv_path) + .await + .context(format!("Failed to read GLEIF CSV file: {}", csv_path))?; let client = OpenFigiClient::new().await?; if !client.has_key { @@ -1171,9 +1197,7 @@ pub async fn stream_gleif_csv_and_build_figi_filtered( let mut processed_leis = 0; let mut skipped_leis = 0; - for (idx, line) in reader.lines().enumerate() { - let line = line?; - + for (idx, line) in content.lines().enumerate() { if idx == 0 { continue; } let parts: 
Vec<&str> = line.split(',').collect(); @@ -1232,8 +1256,7 @@ pub async fn update_lei_mapping( gleif_date: Option<&str>, ) -> anyhow::Result { let dir = DataPaths::new(".")?; - let state_path = dir.cache_dir().join("state.jsonl"); - let manager = StateManager::new(&dir.integrity_dir())?; + let manager = StateManager::new(&dir.integrity_dir()).await?; let step_name = "lei_figi_mapping_complete"; let map_cache_dir = dir.cache_gleif_openfigi_map_dir(); @@ -1251,7 +1274,7 @@ pub async fn update_lei_mapping( if unmapped.is_empty() { logger::log_info("✓ All LEIs have been queried (mapped or confirmed no results)").await; - track_lei_mapping_completion(&manager, &date_dir).await?; + track_lei_mapping_completion(&manager, &dir.integrity_dir()).await?; logger::log_info(" ✓ LEI-FIGI mapping marked as complete with integrity tracking").await; return Ok(true); diff --git a/src/economic/yahoo_update_forex.rs b/src/economic/yahoo_update_forex.rs index 03c00e7..df0df67 100644 --- a/src/economic/yahoo_update_forex.rs +++ b/src/economic/yahoo_update_forex.rs @@ -92,9 +92,8 @@ pub async fn collect_fx_rates( // File paths let output_path = data_path.join("economic").join("currency"); let log_path = data_path.join("fx_rates_updates.log"); - let state_path = data_path.join("state.jsonl"); - let manager = StateManager::new(paths.integrity_dir())?; + let manager = StateManager::new(paths.integrity_dir()).await?; let step_name = "yahoo_fx_rate_collection_completed"; let content_reference = directory_reference(&output_path, Some(vec![ diff --git a/src/main.rs b/src/main.rs index 21c3878..4c89543 100644 --- a/src/main.rs +++ b/src/main.rs @@ -251,13 +251,13 @@ async fn visualize_checkpoint_dependencies(paths: &DataPaths) -> Result<()> { // Add more detailed error handling match StateManager::new( paths.integrity_dir(), - ) { + ).await { Ok(manager) => { logger::log_info("✓ Dependency configuration loaded successfully").await; manager.print_dependency_graph(); let dot = 
manager.get_dependency_config().to_dot(); - let dot_path = paths.logs_dir().join("checkpoint_dependencies.dot"); + let dot_path = paths.integrity_dir().join("checkpoint_dependencies.dot"); std::fs::write(&dot_path, dot)?; logger::log_info(&format!("✓ DOT file written to: {}", dot_path.display())).await; diff --git a/src/util/integrity.rs b/src/util/integrity.rs index 3accb9d..0613591 100644 --- a/src/util/integrity.rs +++ b/src/util/integrity.rs @@ -89,10 +89,11 @@ pub struct GroupConfig { impl DependencyConfig { /// Load dependency configuration from TOML file - pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> { - let content = fs::read_to_string(path.as_ref()) + pub async fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> { + let content = async_fs::read_to_string(path.as_ref()) + .await + .with_context(|| format!("Failed to read dependency config: {}", path.as_ref().display()))?; - + let config: DependencyConfig = toml::from_str(&content) .context("Failed to parse dependency config")?; @@ -102,7 +103,7 @@ impl DependencyConfig { } /// Load from default location (dependencies.toml in base_dir) - pub fn from_default_location<P: AsRef<Path>>(base_dir: P) -> Result<Self> { + pub async fn from_default_location<P: AsRef<Path>>(base_dir: P) -> Result<Self> { let config_path = base_dir.as_ref().join(DEFAULT_DEPENDENCY_CONFIG); if !config_path.exists() { @@ -110,7 +111,7 @@ impl DependencyConfig { return Ok(Self::default()); } - Self::from_file(config_path) + Self::from_file(config_path).await } /// Validate configuration (check for cycles, invalid references) @@ -772,9 +773,9 @@ pub struct StateManager { impl StateManager { /// Create new state manager and load dependency configuration - pub fn new<P: AsRef<Path>>(base_dir: P) -> Result<Self> { + pub async fn new<P: AsRef<Path>>(base_dir: P) -> Result<Self> { let base_dir = base_dir.as_ref().to_path_buf(); - let dependency_config = DependencyConfig::from_default_location(&base_dir)?; + let dependency_config = DependencyConfig::from_default_location(&base_dir).await?; Ok(Self { base_dir, @@ -808,7 +809,7 @@ impl StateManager { return 
Ok(entries); } - let content = async_fs::read_to_string(&self.base_dir).await?; + let content = async_fs::read_to_string(&self.base_dir.join("state.jsonl")).await?; for line in content.lines() { if line.trim().is_empty() { @@ -829,7 +830,7 @@ impl StateManager { async_fs::create_dir_all(parent).await?; } - let mut file = async_fs::File::create(&self.base_dir).await?; + let mut file = async_fs::File::create(&self.base_dir.join("state.jsonl")).await?; for entry in entries.values() { let line = serde_json::to_string(&entry)? + "\n";