added integrity check to openfigi functions

This commit is contained in:
2026-01-11 00:06:25 +01:00
parent 6f05dc8c99
commit 04f4b0d0c4
3 changed files with 107 additions and 7 deletions

View File

@@ -2,6 +2,7 @@
// Key changes: Never load entire GLEIF CSV or FIGI maps into memory
use crate::util::directories::DataPaths;
use crate::util::integrity::{DataStage, StateManager, directory_reference};
use crate::util::logger;
use super::types::*;
use reqwest::Client as HttpClient;
@@ -270,13 +271,28 @@ async fn handle_rate_limit(resp: &reqwest::Response) -> anyhow::Result<()> {
///
/// # Errors
/// Returns an error if file I/O fails or JSON parsing fails.
pub async fn load_or_build_all_securities(date_dir: &Path) -> anyhow::Result<()> {
pub async fn update_securities(date_dir: &Path) -> anyhow::Result<()> {
logger::log_info("Building securities data from FIGI mappings...").await;
let dir = DataPaths::new(".")?;
let state_path = dir.data_dir().join("state.jsonl");
let manager = StateManager::new(&state_path, &dir.data_dir().to_path_buf());
let step_name = "securities_data_complete";
let data_dir = dir.data_dir();
let corporate_data_dir = data_dir.join("corporate");
let output_dir = corporate_data_dir.join("by_name");
tokio_fs::create_dir_all(&output_dir).await
.context("Failed to create corporate/by_name directory")?;
if manager.is_step_valid(step_name).await? {
logger::log_info(" Securities data already built and valid").await;
logger::log_info(" All sectors already processed, nothing to do").await;
return Ok(());
}
logger::log_info("Building securities data from FIGI mappings...").await;
tokio_fs::create_dir_all(&output_dir).await
.context("Failed to create corporate/by_name directory")?;
@@ -382,6 +398,44 @@ pub async fn load_or_build_all_securities(date_dir: &Path) -> anyhow::Result<()>
stats.print_summary();
logger::log_info(&format!("✓ Processed {} new sectors successfully", newly_processed_sectors.len())).await;
track_securities_completion(&manager, &output_dir).await?;
logger::log_info(" ✓ Securities data marked as complete with integrity tracking").await;
Ok(())
}
/// Track securities data completion with content hash verification
async fn track_securities_completion(
manager: &StateManager,
output_dir: &Path,
) -> anyhow::Result<()> {
// Create content reference for all output files
let content_reference = directory_reference(
output_dir,
Some(vec![
"common_stocks.jsonl".to_string(),
"warrants.jsonl".to_string(),
"options.jsonl".to_string(),
]),
Some(vec![
"*.log.jsonl".to_string(), // Exclude log files
"*.tmp".to_string(), // Exclude temp files
"state.jsonl".to_string(), // Exclude internal state tracking
]),
);
// Track completion with:
// - Content reference: All output JSONL files
// - Data stage: Data (7-day TTL) - Securities data relatively stable
// - Dependencies: LEI-FIGI mapping must be valid
manager.update_entry(
"securities_data_complete".to_string(),
content_reference,
DataStage::Data,
vec!["lei_figi_mapping_complete".to_string()], // Depends on LEI mapping
None, // Use default TTL (7 days)
).await?;
Ok(())
}
@@ -1561,21 +1615,33 @@ pub async fn stream_gleif_csv_and_build_figi_filtered(
}
/// Check mapping completion and process only unmapped LEIs
pub async fn ensure_all_leis_mapped(
pub async fn update_lei_mapping(
csv_path: &str,
gleif_date: Option<&str>,
) -> anyhow::Result<bool> {
let dir = DataPaths::new(".")?;
let map_cache_dir = dir.cache_gleif_openfigi_map_dir();
let state_path = dir.cache_dir().join("state.jsonl");
let manager = StateManager::new(&state_path, &dir.cache_dir().to_path_buf());
let step_name = "lei_figi_mapping_complete";
let map_cache_dir = dir.cache_gleif_openfigi_map_dir();
let date = determine_gleif_date(gleif_date, &dir).await?;
let date_dir = map_cache_dir.join(&date);
if manager.is_step_valid(step_name).await? {
logger::log_info(" LEI-FIGI mapping already completed and valid").await;
logger::log_info("✓ All LEIs have been queried (mapped or confirmed no results)").await;
return Ok(true);
}
// Get unmapped LEIs (excludes both mapped and no-result LEIs)
let unmapped = get_unmapped_leis(csv_path, &date_dir).await?;
if unmapped.is_empty() {
logger::log_info("✓ All LEIs have been queried (mapped or confirmed no results)").await;
track_lei_mapping_completion(&manager, &date_dir).await?;
logger::log_info(" ✓ LEI-FIGI mapping marked as complete with integrity tracking").await;
return Ok(true);
}
@@ -1589,6 +1655,8 @@ pub async fn ensure_all_leis_mapped(
if still_unmapped.is_empty() {
logger::log_info("✓ All LEIs successfully queried").await;
track_lei_mapping_completion(&manager, &date_dir).await?;
logger::log_info(" ✓ LEI-FIGI mapping marked as complete with integrity tracking").await;
Ok(true)
} else {
logger::log_warn(&format!(
@@ -1599,6 +1667,40 @@ pub async fn ensure_all_leis_mapped(
}
}
/// Track LEI-FIGI mapping completion with content hash verification
async fn track_lei_mapping_completion(
manager: &StateManager,
date_dir: &Path,
) -> anyhow::Result<()> {
// Create content reference for all FIGI mapping files
// This will hash ALL lei_to_figi.jsonl files in sector directories
let content_reference = directory_reference(
date_dir,
Some(vec![
"*/lei_to_figi.jsonl".to_string(), // All sector mapping files
"no_results.jsonl".to_string(), // LEIs with no results
]),
Some(vec![
"*.tmp".to_string(), // Exclude temp files
"*.log".to_string(), // Exclude log files
]),
);
// Track completion with:
// - Content reference: All FIGI mapping files in date directory
// - Data stage: Cache (24-hour TTL) - FIGI data can change frequently
// - Dependencies: None (this is a collection step from external API)
manager.update_entry(
"lei_figi_mapping_complete".to_string(),
content_reference,
DataStage::Cache, // 24-hour TTL for API data
vec![], // No dependencies
None, // Use default TTL
).await?;
Ok(())
}
/// Load LEIs that were queried but returned no results
async fn load_no_result_leis(date_dir: &Path) -> anyhow::Result<HashSet<String>> {
let mut no_result_leis = HashSet::new();

View File

@@ -50,7 +50,7 @@ pub async fn run_full_update(
check_shutdown!(shutdown_flag);
logger::log_info("Step 3: Checking LEI-FIGI mapping status...").await;
let all_mapped = ensure_all_leis_mapped(&gleif_csv_path, None).await?;
let all_mapped = update_lei_mapping(&gleif_csv_path, None).await?;
if !all_mapped {
logger::log_warn(" ⚠ Some LEIs failed to map - continuing with partial data").await;
@@ -65,7 +65,7 @@ pub async fn run_full_update(
if let Some(date_dir) = date_dir {
logger::log_info(&format!(" Using FIGI data from: {:?}", date_dir)).await;
load_or_build_all_securities(&date_dir).await?;
update_securities(&date_dir).await?;
logger::log_info(" ✓ Securities map updated").await;
} else {
logger::log_warn(" ✗ No FIGI data directory found").await;

View File

@@ -98,9 +98,7 @@ pub async fn enrich_companies_with_events(
if pending_count == 0 {
logger::log_info(" ✓ All companies already enriched").await;
track_events_completion(&manager, paths, step_name).await?;
return Ok(enriched_companies.len());
}