diff --git a/src/corporate/openfigi.rs b/src/corporate/openfigi.rs index 1d6f047..ad450f4 100644 --- a/src/corporate/openfigi.rs +++ b/src/corporate/openfigi.rs @@ -2,6 +2,7 @@ // Key changes: Never load entire GLEIF CSV or FIGI maps into memory use crate::util::directories::DataPaths; +use crate::util::integrity::{DataStage, StateManager, directory_reference}; use crate::util::logger; use super::types::*; use reqwest::Client as HttpClient; @@ -270,13 +271,28 @@ async fn handle_rate_limit(resp: &reqwest::Response) -> anyhow::Result<()> { /// /// # Errors /// Returns an error if file I/O fails or JSON parsing fails. -pub async fn load_or_build_all_securities(date_dir: &Path) -> anyhow::Result<()> { +pub async fn update_securities(date_dir: &Path) -> anyhow::Result<()> { logger::log_info("Building securities data from FIGI mappings...").await; let dir = DataPaths::new(".")?; + let state_path = dir.data_dir().join("state.jsonl"); + let manager = StateManager::new(&state_path, &dir.data_dir().to_path_buf()); + let step_name = "securities_data_complete"; + let data_dir = dir.data_dir(); let corporate_data_dir = data_dir.join("corporate"); let output_dir = corporate_data_dir.join("by_name"); + tokio_fs::create_dir_all(&output_dir).await + .context("Failed to create corporate/by_name directory")?; + + if manager.is_step_valid(step_name).await? { + logger::log_info(" Securities data already built and valid").await; + logger::log_info(" All sectors already processed, nothing to do").await; + return Ok(()); + } + + logger::log_info("Building securities data from FIGI mappings...").await; + tokio_fs::create_dir_all(&output_dir).await .context("Failed to create corporate/by_name directory")?; @@ -381,6 +397,44 @@ pub async fn load_or_build_all_securities(date_dir: &Path) -> anyhow::Result<()> stats.print_summary(); logger::log_info(&format!("✓ Processed {} new sectors successfully", newly_processed_sectors.len())).await; + + track_securities_completion(&manager, &output_dir).await?; + logger::log_info(" ✓ Securities data marked as complete with integrity tracking").await; + + Ok(()) +} + +/// Track securities data completion with content hash verification +async fn track_securities_completion( + manager: &StateManager, + output_dir: &Path, +) -> anyhow::Result<()> { + // Create content reference for all output files + let content_reference = directory_reference( + output_dir, + Some(vec![ + "common_stocks.jsonl".to_string(), + "warrants.jsonl".to_string(), + "options.jsonl".to_string(), + ]), + Some(vec![ + "*.log.jsonl".to_string(), // Exclude log files + "*.tmp".to_string(), // Exclude temp files + "state.jsonl".to_string(), // Exclude internal state tracking + ]), + ); + + // Track completion with: + // - Content reference: All output JSONL files + // - Data stage: Data (7-day TTL) - Securities data relatively stable + // - Dependencies: LEI-FIGI mapping must be valid + manager.update_entry( + "securities_data_complete".to_string(), + content_reference, + DataStage::Data, + vec!["lei_figi_mapping_complete".to_string()], // Depends on LEI mapping + None, // Use default TTL (7 days) + ).await?; Ok(()) } @@ -1561,21 +1615,33 @@ pub async fn stream_gleif_csv_and_build_figi_filtered( } /// Check mapping completion and process only unmapped LEIs -pub async fn ensure_all_leis_mapped( +pub async fn update_lei_mapping( csv_path: &str, gleif_date: Option<&str>, ) -> anyhow::Result { let dir = DataPaths::new(".")?; + let state_path = dir.cache_dir().join("state.jsonl"); + let manager = StateManager::new(&state_path, &dir.cache_dir().to_path_buf()); + let step_name = "lei_figi_mapping_complete"; + let map_cache_dir = dir.cache_gleif_openfigi_map_dir(); - let date = determine_gleif_date(gleif_date, &dir).await?; let date_dir = map_cache_dir.join(&date); + + if manager.is_step_valid(step_name).await? { + logger::log_info(" LEI-FIGI mapping already completed and valid").await; + logger::log_info("✓ All LEIs have been queried (mapped or confirmed no results)").await; + return Ok(true); + } // Get unmapped LEIs (excludes both mapped and no-result LEIs) let unmapped = get_unmapped_leis(csv_path, &date_dir).await?; if unmapped.is_empty() { logger::log_info("✓ All LEIs have been queried (mapped or confirmed no results)").await; + track_lei_mapping_completion(&manager, &date_dir).await?; + logger::log_info(" ✓ LEI-FIGI mapping marked as complete with integrity tracking").await; + return Ok(true); } @@ -1589,6 +1655,8 @@ pub async fn ensure_all_leis_mapped( if still_unmapped.is_empty() { logger::log_info("✓ All LEIs successfully queried").await; + track_lei_mapping_completion(&manager, &date_dir).await?; + logger::log_info(" ✓ LEI-FIGI mapping marked as complete with integrity tracking").await; Ok(true) } else { logger::log_warn(&format!( @@ -1599,6 +1667,40 @@ pub async fn ensure_all_leis_mapped( } } +/// Track LEI-FIGI mapping completion with content hash verification +async fn track_lei_mapping_completion( + manager: &StateManager, + date_dir: &Path, +) -> anyhow::Result<()> { + // Create content reference for all FIGI mapping files + // This will hash ALL lei_to_figi.jsonl files in sector directories + let content_reference = directory_reference( + date_dir, + Some(vec![ + "*/lei_to_figi.jsonl".to_string(), // All sector mapping files + "no_results.jsonl".to_string(), // LEIs with no results + ]), + Some(vec![ + "*.tmp".to_string(), // Exclude temp files + "*.log".to_string(), // Exclude log files + ]), + ); + + // Track completion with: + // - Content reference: All FIGI mapping files in date directory + // - Data stage: Cache (24-hour TTL) - FIGI data can change frequently + // - Dependencies: None (this is a collection step from external API) + manager.update_entry( + "lei_figi_mapping_complete".to_string(), + content_reference, + DataStage::Cache, // 24-hour TTL for API data + vec![], // No dependencies + None, // Use default TTL + ).await?; + + Ok(()) +} + /// Load LEIs that were queried but returned no results async fn load_no_result_leis(date_dir: &Path) -> anyhow::Result> { let mut no_result_leis = HashSet::new(); diff --git a/src/corporate/update.rs b/src/corporate/update.rs index 4dcb8f5..6201832 100644 --- a/src/corporate/update.rs +++ b/src/corporate/update.rs @@ -50,7 +50,7 @@ pub async fn run_full_update( check_shutdown!(shutdown_flag); logger::log_info("Step 3: Checking LEI-FIGI mapping status...").await; - let all_mapped = ensure_all_leis_mapped(&gleif_csv_path, None).await?; + let all_mapped = update_lei_mapping(&gleif_csv_path, None).await?; if !all_mapped { logger::log_warn(" ⚠ Some LEIs failed to map - continuing with partial data").await; @@ -65,7 +65,7 @@ pub async fn run_full_update( if let Some(date_dir) = date_dir { logger::log_info(&format!(" Using FIGI data from: {:?}", date_dir)).await; - load_or_build_all_securities(&date_dir).await?; + update_securities(&date_dir).await?; logger::log_info(" ✓ Securities map updated").await; } else { logger::log_warn(" ✗ No FIGI data directory found").await; diff --git a/src/corporate/update_companies_enrich.rs b/src/corporate/update_companies_enrich.rs index 98e8778..8d9f0ef 100644 --- a/src/corporate/update_companies_enrich.rs +++ b/src/corporate/update_companies_enrich.rs @@ -98,9 +98,7 @@ pub async fn enrich_companies_with_events( if pending_count == 0 { logger::log_info(" ✓ All companies already enriched").await; - track_events_completion(&manager, paths, step_name).await?; - return Ok(enriched_companies.len()); }