// src/corporate/update_openfigi.rs - STREAMING VERSION
// Key changes: Never load entire GLEIF CSV or FIGI maps into memory
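//
// A minimal sketch of the intended call order (hypothetical caller, shown for
// orientation only; file paths and the GLEIF date argument are illustrative):
//
//     // 1. Map every LEI/ISIN pair from the GLEIF CSV to FIGIs (batched, resumable).
//     let complete = update_lei_mapping("cache/gleif/isin_lei.csv", None).await?;
//
//     // 2. Once mapping is complete, fold the per-sector lei_to_figi.jsonl files
//     //    into the by_name securities files (checkpoint + replay-log based).
//     if complete {
//         let date_dir = Path::new("cache/gleif_openfigi_map/24112025");
//         update_securities(date_dir).await?;
//     }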
use crate::util::directories::DataPaths;
use crate::util::integrity::{DataStage, StateManager, directory_reference};
use crate::util::logger;
use crate::scraper::openfigi::OpenFigiClient;
use super::types::*;
use serde_json::{json, Value};
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::io::{BufRead, BufReader};
use tokio::fs as tokio_fs;
use tokio::io::AsyncWriteExt;
use anyhow::{Context, anyhow};

const LEI_BATCH_SIZE: usize = 100; // Process 100 LEIs at a time
async fn process_and_save_figi_batch(
    client: &OpenFigiClient,
    lei_batch: &HashMap<String, Vec<String>>,
    date_dir: &Path,
) -> anyhow::Result<()> {
    for (lei, isins) in lei_batch {
        let unique_isins: Vec<_> = isins.iter()
            .cloned()
            .collect::<HashSet<_>>()
            .into_iter()
            .collect();

        let figi_infos = client.map_isins_to_figi_infos(&unique_isins).await?;

        if figi_infos.is_empty() {
            // No FIGIs found - save to no_results.jsonl to avoid re-querying
            append_no_result_lei(date_dir, lei, &unique_isins).await?;
            continue;
        }

        // Save FIGIs by sector as before
        save_figi_infos_by_sector(lei, &figi_infos, date_dir).await?;
    }

    Ok(())
}

async fn save_figi_infos_by_sector(
    lei: &str,
    figi_infos: &[FigiInfo],
    date_dir: &Path,
) -> anyhow::Result<()> {
    let mut by_sector: HashMap<String, Vec<FigiInfo>> = HashMap::new();

    for figi_info in figi_infos {
        let sector = if figi_info.market_sector.is_empty() {
            "uncategorized".to_string()
        } else {
            figi_info.market_sector.clone()
        };
        by_sector.entry(sector).or_default().push(figi_info.clone());
    }

    for (sector, figis) in by_sector {
        let sector_dir = date_dir.join(&sector);
        let path = sector_dir.join("lei_to_figi.jsonl");
        append_lei_to_figi_jsonl(&path, lei, &figis).await?;
    }

    Ok(())
}
async fn append_lei_to_figi_jsonl(path: &Path, lei: &str, figis: &[FigiInfo]) -> anyhow::Result<()> {
    let entry = json!({
        "lei": lei,
        "figis": figis,
    });

    let line = serde_json::to_string(&entry)? + "\n";

    let mut file = tokio_fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(path)
        .await?;

    file.write_all(line.as_bytes()).await?;
    Ok(())
}
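// Illustrative shape of one lei_to_figi.jsonl line written above (placeholder values;
// the exact serialized field set of FigiInfo depends on super::types and is assumed here):
//
//     {"lei":"<LEI>","figis":[{"isin":"<ISIN>","figi":"<FIGI>","name":"...","market_sector":"Equity","security_type":"Common Stock"}]}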
/// Loads or builds securities data by streaming through FIGI mapping files.
///
/// Implements abort-safe incremental persistence with checkpoints and replay logs.
///
/// # Arguments
/// * `date_dir` - Path to the date-specific mapping directory (e.g., cache/gleif_openfigi_map/24112025/)
///
/// # Returns
/// Ok(()) on success.
///
/// # Errors
/// Returns an error if file I/O fails or JSON parsing fails.
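///
/// # Example
///
/// A minimal usage sketch (not compiled as a doctest; the date path is illustrative):
///
/// ```ignore
/// let date_dir = std::path::Path::new("cache/gleif_openfigi_map/24112025");
/// update_securities(date_dir).await?;
/// ```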
pub async fn update_securities(date_dir: &Path) -> anyhow::Result<()> {
    logger::log_info("Building securities data from FIGI mappings...").await;

    let dir = DataPaths::new(".")?;
    let manager = StateManager::new(&dir.integrity_dir()).await?;
    let step_name = "securities_data_complete";

    let data_dir = dir.data_dir();
    let corporate_data_dir = data_dir.join("corporate");
    let economic_data_dir = data_dir.join("economic");
    let output_dir = data_dir.join("by_name");
    tokio_fs::create_dir_all(&output_dir).await
        .context("Failed to create corporate/by_name directory")?;

    if manager.is_step_valid(step_name).await? {
        logger::log_info(" Securities data already built and valid").await;
        logger::log_info(" All sectors already processed, nothing to do").await;
        return Ok(());
    }
    // Setup checkpoint and log paths for each security type
    let common_checkpoint = output_dir.join("common_stocks.jsonl");
    let common_log = output_dir.join("common_stocks.log.jsonl");
    let warrants_checkpoint = output_dir.join("warrants.jsonl");
    let warrants_log = output_dir.join("warrants.log.jsonl");
    let options_checkpoint = output_dir.join("options.jsonl");
    let options_log = output_dir.join("options.log.jsonl");
    let corporate_bonds_checkpoint = output_dir.join("corporate_bonds.jsonl");
    let corporate_bonds_log = output_dir.join("corporate_bonds.log.jsonl");
    let government_bonds_checkpoint = output_dir.join("government_bonds.jsonl");
    let government_bonds_log = output_dir.join("government_bonds.log.jsonl");

    // Track which sectors have been fully processed
    let processed_sectors_file = output_dir.join("state.jsonl");
    let processed_sectors = load_processed_sectors(&processed_sectors_file).await?;

    logger::log_info(&format!(" Already processed {} sectors", processed_sectors.len())).await;

    // Collect sectors to process
    let mut sectors_to_process = Vec::new();
    let mut entries = tokio_fs::read_dir(date_dir).await
        .context("Failed to read date directory")?;

    while let Some(entry) = entries.next_entry().await? {
        let path = entry.path();
        if !path.is_dir() {
            continue;
        }

        let sector_name = path.file_name()
            .and_then(|n| n.to_str())
            .map(|s| s.to_string())
            .unwrap_or_else(|| "unknown".to_string());

        let lei_figi_file = path.join("lei_to_figi.jsonl");
        if !lei_figi_file.exists() {
            continue;
        }

        // Skip if already processed
        if processed_sectors.contains(&sector_name) {
            logger::log_info(&format!(" Skipping already processed sector: {}", sector_name)).await;
            continue;
        }

        sectors_to_process.push((sector_name, lei_figi_file));
    }

    if sectors_to_process.is_empty() {
        logger::log_info(" All sectors already processed, nothing to do").await;
        return Ok(());
    }

    // Load checkpoints and replay logs - these are MUTABLE now
    let mut existing_companies = load_checkpoint_and_replay(&common_checkpoint, &common_log, "name").await?;
    let mut existing_warrants = load_checkpoint_and_replay_nested(&warrants_checkpoint, &warrants_log).await?;
    let mut existing_options = load_checkpoint_and_replay_nested(&options_checkpoint, &options_log).await?;
    let mut existing_corporate_bonds = load_checkpoint_and_replay_nested(&corporate_bonds_checkpoint, &corporate_bonds_log).await?;
    let mut existing_government_bonds = load_checkpoint_and_replay_nested(&government_bonds_checkpoint, &government_bonds_log).await?;

    logger::log_info(&format!(" Existing entries - Companies: {}, Warrants: {}, Options: {}, Corporate Bonds: {}, Government Bonds: {}",
        existing_companies.len(), existing_warrants.len(), existing_options.len(), existing_corporate_bonds.len(), existing_government_bonds.len())).await;

    // Process statistics
    let mut stats = StreamingStats::new(
        existing_companies.len(),
        existing_warrants.len(),
        existing_options.len(),
        existing_corporate_bonds.len(),
        existing_government_bonds.len(),
    );

    logger::log_info(&format!(" Found {} sectors to process", sectors_to_process.len())).await;

    // Process each sector
    let mut newly_processed_sectors = Vec::new();

    for (sector_name, lei_figi_file) in sectors_to_process {
        logger::log_info(&format!(" Processing sector: {}", sector_name)).await;

        // Stream through the lei_to_figi.jsonl file with batched writes
        process_lei_figi_file_batched(
            &lei_figi_file,
            &common_log,
            &warrants_log,
            &options_log,
            &mut existing_companies,
            &mut existing_warrants,
            &mut existing_options,
            &mut stats,
        ).await?;

        // Mark sector as processed
        newly_processed_sectors.push(sector_name.clone());

        // Append to processed sectors file immediately for crash safety
        append_processed_sector(&processed_sectors_file, &sector_name).await?;
    }

    // Create checkpoints after all processing
    if !newly_processed_sectors.is_empty() {
        logger::log_info("Creating checkpoints...").await;
        create_checkpoint(&common_checkpoint, &common_log).await?;
        create_checkpoint(&warrants_checkpoint, &warrants_log).await?;
        create_checkpoint(&options_checkpoint, &options_log).await?;
    }

    stats.print_summary();
    logger::log_info(&format!("✓ Processed {} new sectors successfully", newly_processed_sectors.len())).await;

    track_securities_completion(&manager, &output_dir).await?;
    logger::log_info(" ✓ Securities data marked as complete with integrity tracking").await;

    Ok(())
}
/// Track securities data completion with content hash verification
async fn track_securities_completion(
    manager: &StateManager,
    output_dir: &Path,
) -> anyhow::Result<()> {
    // Create content reference for all output files
    let content_reference = directory_reference(
        output_dir,
        Some(vec![
            "common_stocks.jsonl".to_string(),
            "warrants.jsonl".to_string(),
            "options.jsonl".to_string(),
        ]),
        Some(vec![
            "*.log.jsonl".to_string(), // Exclude log files
            "*.tmp".to_string(), // Exclude temp files
            "state.jsonl".to_string(), // Exclude internal state tracking
        ]),
    );

    // Track completion with:
    // - Content reference: All output JSONL files
    // - Data stage: Data (7-day TTL) - Securities data relatively stable
    // - Dependencies: LEI-FIGI mapping must be valid
    manager.update_entry(
        "securities_data_complete".to_string(),
        content_reference,
        DataStage::Data,
        None, // Use default TTL (7 days)
    ).await?;

    Ok(())
}
/// Loads the list of sectors that have been fully processed
async fn load_processed_sectors(path: &Path) -> anyhow::Result<HashSet<String>> {
    let mut sectors = HashSet::new();

    if !path.exists() {
        return Ok(sectors);
    }

    let content = tokio_fs::read_to_string(path).await
        .context("Failed to read processed sectors file")?;

    for (line_num, line) in content.lines().enumerate() {
        if line.trim().is_empty() || !line.ends_with('}') {
            continue; // Skip incomplete lines
        }

        match serde_json::from_str::<Value>(line) {
            Ok(entry) => {
                if let Some(sector) = entry["sector"].as_str() {
                    sectors.insert(sector.to_string());
                }
            }
            Err(e) => {
                logger::log_warn(&format!(
                    "Skipping invalid processed sector line {}: {}",
                    line_num + 1, e
                )).await;
            }
        }
    }

    Ok(sectors)
}
/// Appends a sector name to the processed sectors JSONL file with fsync
async fn append_processed_sector(path: &Path, sector_name: &str) -> anyhow::Result<()> {
    use std::fs::OpenOptions;
    use std::io::Write;

    let entry = json!({
        "sector": sector_name,
        "completed_at": chrono::Utc::now().to_rfc3339(),
    });

    let mut file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(path)
        .context("Failed to open processed sectors file")?;

    let line = serde_json::to_string(&entry)
        .context("Failed to serialize sector entry")? + "\n";

    file.write_all(line.as_bytes())?;

    // Ensure durability
    file.sync_data()
        .context("Failed to fsync processed sectors file")?;

    Ok(())
}
/// Loads checkpoint and replays log, returning set of existing keys
async fn load_checkpoint_and_replay(
    checkpoint_path: &Path,
    log_path: &Path,
    key_field: &str,
) -> anyhow::Result<HashSet<String>> {
    let mut keys = HashSet::new();

    // Load checkpoint if it exists
    if checkpoint_path.exists() {
        let content = tokio_fs::read_to_string(checkpoint_path).await
            .context("Failed to read checkpoint")?;

        for line in content.lines() {
            if line.trim().is_empty() || !line.ends_with('}') {
                continue; // Skip incomplete lines
            }

            if let Ok(entry) = serde_json::from_str::<Value>(line) {
                if let Some(key) = entry[key_field].as_str() {
                    keys.insert(key.to_string());
                }
            }
        }
    }

    // Replay log if it exists
    if log_path.exists() {
        let content = tokio_fs::read_to_string(log_path).await
            .context("Failed to read log")?;

        for line in content.lines() {
            if line.trim().is_empty() || !line.ends_with('}') {
                continue; // Skip incomplete lines
            }

            if let Ok(entry) = serde_json::from_str::<Value>(line) {
                if let Some(key) = entry[key_field].as_str() {
                    keys.insert(key.to_string());
                }
            }
        }
    }

    Ok(keys)
}
/// Loads checkpoint and replays log for nested structures (warrants/options)
async fn load_checkpoint_and_replay_nested(
    checkpoint_path: &Path,
    log_path: &Path,
) -> anyhow::Result<HashSet<String>> {
    let mut keys = HashSet::new();

    // Load checkpoint if it exists
    if checkpoint_path.exists() {
        let content = tokio_fs::read_to_string(checkpoint_path).await
            .context("Failed to read checkpoint")?;

        for line in content.lines() {
            if line.trim().is_empty() || !line.ends_with('}') {
                continue;
            }

            if let Ok(entry) = serde_json::from_str::<Value>(line) {
                let underlying = entry["underlying_company_name"].as_str().unwrap_or("");
                let type_field = if entry.get("warrant_type").is_some() {
                    entry["warrant_type"].as_str().unwrap_or("")
                } else {
                    entry["option_type"].as_str().unwrap_or("")
                };

                if !underlying.is_empty() && !type_field.is_empty() {
                    keys.insert(format!("{}::{}", underlying, type_field));
                }
            }
        }
    }

    // Replay log if it exists
    if log_path.exists() {
        let content = tokio_fs::read_to_string(log_path).await
            .context("Failed to read log")?;

        for line in content.lines() {
            if line.trim().is_empty() || !line.ends_with('}') {
                continue;
            }

            if let Ok(entry) = serde_json::from_str::<Value>(line) {
                let underlying = entry["underlying_company_name"].as_str().unwrap_or("");
                let type_field = if entry.get("warrant_type").is_some() {
                    entry["warrant_type"].as_str().unwrap_or("")
                } else {
                    entry["option_type"].as_str().unwrap_or("")
                };

                if !underlying.is_empty() && !type_field.is_empty() {
                    keys.insert(format!("{}::{}", underlying, type_field));
                }
            }
        }
    }

    Ok(keys)
}
/// Creates a checkpoint by copying log to checkpoint atomically
async fn create_checkpoint(checkpoint_path: &Path, log_path: &Path) -> anyhow::Result<()> {
    if !log_path.exists() {
        return Ok(());
    }

    // Read all committed lines from log
    let content = tokio_fs::read_to_string(log_path).await
        .context("Failed to read log for checkpoint")?;

    let committed_lines: Vec<&str> = content
        .lines()
        .filter(|line| !line.trim().is_empty() && line.ends_with('}'))
        .collect();

    if committed_lines.is_empty() {
        return Ok(());
    }

    // Write to temporary file
    let tmp_path = checkpoint_path.with_extension("tmp");
    let mut tmp_file = std::fs::File::create(&tmp_path)
        .context("Failed to create temp checkpoint")?;

    for line in committed_lines {
        use std::io::Write;
        writeln!(tmp_file, "{}", line)?;
    }

    // Ensure data is flushed to disk
    tmp_file.sync_data()
        .context("Failed to sync temp checkpoint")?;

    drop(tmp_file);

    // Atomic rename
    tokio_fs::rename(&tmp_path, checkpoint_path).await
        .context("Failed to rename checkpoint")?;

    // Clear log after successful checkpoint
    tokio_fs::remove_file(log_path).await
        .context("Failed to remove log after checkpoint")?;

    Ok(())
}
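// A minimal sketch of the write-ahead cycle the functions above implement
// (hypothetical caller; `entries`, `log_path` and `checkpoint_path` are placeholders):
//
//     // 1. Durable append of new records to the *.log.jsonl write-ahead log.
//     write_batch_with_fsync(&log_path, &entries).await?;
//
//     // 2. Promote the committed log lines into the *.jsonl checkpoint
//     //    (tmp file + atomic rename), then delete the log.
//     create_checkpoint(&checkpoint_path, &log_path).await?;
//
//     // 3. After a restart, rebuild the de-duplication key set from both files,
//     //    so entries appended after the last checkpoint still count as seen.
//     let keys = load_checkpoint_and_replay(&checkpoint_path, &log_path, "name").await?;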
/// Streams through a lei_to_figi.jsonl file and processes entries in batches with fsync
async fn process_lei_figi_file_batched(
    input_path: &Path,
    common_log_path: &Path,
    warrants_log_path: &Path,
    options_log_path: &Path,
    existing_companies: &mut HashSet<String>,
    existing_warrants: &mut HashSet<String>,
    existing_options: &mut HashSet<String>,
    stats: &mut StreamingStats,
) -> anyhow::Result<()> {
    let content = tokio_fs::read_to_string(input_path).await
        .context("Failed to read lei_to_figi.jsonl")?;

    let batch_size = 100;
    let mut processed_count = 0;

    let mut common_batch = Vec::new();
    let mut warrants_batch = Vec::new();
    let mut options_batch = Vec::new();

    for (line_num, line) in content.lines().enumerate() {
        if line.trim().is_empty() {
            continue;
        }

        let entry: Value = serde_json::from_str(line)
            .context(format!("Failed to parse JSON on line {}", line_num + 1))?;

        let figis: Vec<FigiInfo> = serde_json::from_value(entry["figis"].clone())
            .context("Invalid 'figis' field")?;

        if figis.is_empty() {
            continue;
        }

        // Group by security type
        let (common_stocks, warrant_securities, option_securities) =
            group_by_security_type(&figis);

        // Collect entries for batching and update existing keys
        if !common_stocks.is_empty() {
            if let Some(entry) = prepare_common_stock_entry(&common_stocks, existing_companies) {
                // Add to existing set immediately to prevent duplicates in same run
                existing_companies.insert(entry.name.clone());
                common_batch.push(entry);
            }
        }

        if !warrant_securities.is_empty() {
            for entry in prepare_warrant_entries(&warrant_securities, existing_warrants) {
                // Add to existing set immediately
                let key = format!("{}::{}", entry.underlying_company_name, entry.warrant_type);
                existing_warrants.insert(key);
                warrants_batch.push(entry);
            }
        }

        if !option_securities.is_empty() {
            for entry in prepare_option_entries(&option_securities, existing_options) {
                // Add to existing set immediately
                let key = format!("{}::{}", entry.underlying_company_name, entry.option_type);
                existing_options.insert(key);
                options_batch.push(entry);
            }
        }

        // Write batches when they reach size limit
        if common_batch.len() >= batch_size {
            write_batch_with_fsync(common_log_path, &common_batch).await?;
            stats.companies_added += common_batch.len();
            common_batch.clear();
        }

        if warrants_batch.len() >= batch_size {
            write_batch_with_fsync(warrants_log_path, &warrants_batch).await?;
            stats.warrants_added += warrants_batch.len();
            warrants_batch.clear();
        }

        if options_batch.len() >= batch_size {
            write_batch_with_fsync(options_log_path, &options_batch).await?;
            stats.options_added += options_batch.len();
            options_batch.clear();
        }

        processed_count += 1;
        if processed_count % 1000 == 0 {
            logger::log_info(&format!(" Processed {} LEI entries...", processed_count)).await;
        }
    }

    // Write remaining batches
    if !common_batch.is_empty() {
        write_batch_with_fsync(common_log_path, &common_batch).await?;
        stats.companies_added += common_batch.len();
    }

    if !warrants_batch.is_empty() {
        write_batch_with_fsync(warrants_log_path, &warrants_batch).await?;
        stats.warrants_added += warrants_batch.len();
    }

    if !options_batch.is_empty() {
        write_batch_with_fsync(options_log_path, &options_batch).await?;
        stats.options_added += options_batch.len();
    }

    Ok(())
}
/// Writes a batch of entries to log with fsync
async fn write_batch_with_fsync<T: serde::Serialize>(
    log_path: &Path,
    entries: &[T],
) -> anyhow::Result<()> {
    use std::fs::OpenOptions;
    use std::io::Write;

    let mut file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(log_path)
        .context("Failed to open log file")?;

    for entry in entries {
        let line = serde_json::to_string(entry)
            .context("Failed to serialize entry")?;
        writeln!(file, "{}", line)?;
    }

    // Critical: fsync to ensure durability
    file.sync_data()
        .context("Failed to fsync log file")?;

    Ok(())
}
/// Prepares a common stock entry if it doesn't exist
fn prepare_common_stock_entry(
    figi_infos: &[FigiInfo],
    existing_keys: &HashSet<String>,
) -> Option<CompanyInfo> {
    let name = figi_infos[0].name.clone();
    if name.is_empty() || existing_keys.contains(&name) {
        return None;
    }

    let grouped_by_isin = group_figis_by_isin(figi_infos);
    let primary_isin = grouped_by_isin.keys().next().cloned().unwrap_or_default();

    Some(CompanyInfo {
        name,
        primary_isin,
        securities: grouped_by_isin,
    })
}
/// Prepares warrant entries for batching
fn prepare_warrant_entries(
    warrant_securities: &[FigiInfo],
    existing_keys: &HashSet<String>,
) -> Vec<WarrantInfo> {
    let mut entries = Vec::new();

    for figi in warrant_securities {
        let (underlying, issuer, warrant_type) = parse_warrant_name(&figi.name);

        if underlying.is_empty() {
            continue;
        }

        let key = format!("{}::{}", underlying, warrant_type);
        if existing_keys.contains(&key) {
            continue;
        }

        let warrant_info = WarrantInfo {
            underlying_company_name: underlying.clone(),
            issuer_company_name: issuer,
            warrant_type: warrant_type.clone(),
            warrants: {
                let mut map = HashMap::new();
                map.insert(figi.isin.clone(), vec![figi.clone()]);
                map
            },
        };

        entries.push(warrant_info);
    }

    entries
}
/// Prepares option entries for batching
fn prepare_option_entries(
    option_securities: &[FigiInfo],
    existing_keys: &HashSet<String>,
) -> Vec<OptionInfo> {
    let mut entries = Vec::new();

    for figi in option_securities {
        let (underlying, issuer, option_type) = parse_option_name(&figi.name);

        if underlying.is_empty() {
            continue;
        }

        let key = format!("{}::{}", underlying, option_type);
        if existing_keys.contains(&key) {
            continue;
        }

        let option_info = OptionInfo {
            underlying_company_name: underlying.clone(),
            issuer_company_name: issuer,
            option_type: option_type.clone(),
            options: {
                let mut map = HashMap::new();
                map.insert(figi.isin.clone(), vec![figi.clone()]);
                map
            },
        };

        entries.push(option_info);
    }

    entries
}
/// Groups FigiInfo list by security type
fn group_by_security_type(figis: &[FigiInfo]) -> (Vec<FigiInfo>, Vec<FigiInfo>, Vec<FigiInfo>) {
    let mut common_stocks = Vec::new();
    let mut warrants = Vec::new();
    let mut options = Vec::new();

    for figi in figis {
        match figi.security_type.as_str() {
            "Common Stock" => common_stocks.push(figi.clone()),
            "Equity WRT" => warrants.push(figi.clone()),
            "Equity Option" => options.push(figi.clone()),
            _ => {}
        }
    }

    (common_stocks, warrants, options)
}
/// Groups FigiInfo by ISIN
fn group_figis_by_isin(figi_infos: &[FigiInfo]) -> HashMap<String, Vec<FigiInfo>> {
    let mut grouped: HashMap<String, Vec<FigiInfo>> = HashMap::new();

    for figi_info in figi_infos {
        grouped.entry(figi_info.isin.clone())
            .or_insert_with(Vec::new)
            .push(figi_info.clone());
    }

    for figis in grouped.values_mut() {
        figis.sort_by(|a, b| a.figi.cmp(&b.figi));
    }

    grouped
}
/// Parse warrant name to extract underlying company, issuer, and warrant type
///
/// Examples:
/// - "VONTOBE-PW26 LEONARDO SPA" -> ("LEONARDO SPA", Some("VONTOBE"), "put")
/// - "BAYER H-CW25 L'OREAL" -> ("L'OREAL", Some("BAYER H"), "call")
/// - "APPLE INC WARRANT" -> ("APPLE INC WARRANT", None, "unknown")
fn parse_warrant_name(name: &str) -> (String, Option<String>, String) {
    let name_upper = name.to_uppercase();

    // Try to detect warrant type from code (PW=put, CW=call)
    let warrant_type = if name_upper.contains("-PW") || name_upper.contains(" PW") {
        "put".to_string()
    } else if name_upper.contains("-CW") || name_upper.contains(" CW") {
        "call".to_string()
    } else {
        "unknown".to_string()
    };

    // Try to split by warrant code pattern (e.g., "-PW26", "-CW25")
    if let Some(pos) = name.find("-PW") {
        let before = name[..pos].trim();
        let after_idx = name[pos..].find(' ').map(|i| pos + i + 1).unwrap_or(name.len());
        let after = if after_idx < name.len() {
            name[after_idx..].trim()
        } else {
            ""
        };

        return (
            after.to_string(),
            if !before.is_empty() { Some(before.to_string()) } else { None },
            warrant_type,
        );
    }

    if let Some(pos) = name.find("-CW") {
        let before = name[..pos].trim();
        let after_idx = name[pos..].find(' ').map(|i| pos + i + 1).unwrap_or(name.len());
        let after = if after_idx < name.len() {
            name[after_idx..].trim()
        } else {
            ""
        };

        return (
            after.to_string(),
            if !before.is_empty() { Some(before.to_string()) } else { None },
            warrant_type,
        );
    }

    // Fallback: return entire name as underlying
    (name.to_string(), None, warrant_type)
}
/// Parse option name to extract underlying company, issuer, and option type
///
/// Examples:
/// - "December 25 Calls on ALPHA GA" -> ("ALPHA GA", None, "call")
/// - "January 26 Puts on TESLA INC" -> ("TESLA INC", None, "put")
fn parse_option_name(name: &str) -> (String, Option<String>, String) {
    let name_upper = name.to_uppercase();

    // Detect option type
    let option_type = if name_upper.contains("CALL") {
        "call".to_string()
    } else if name_upper.contains("PUT") {
        "put".to_string()
    } else {
        "unknown".to_string()
    };

    // Try to extract underlying after "on"
    if let Some(pos) = name_upper.find(" ON ") {
        let underlying = name[pos + 4..].trim().to_string();
        return (underlying, None, option_type);
    }

    // Fallback: return entire name
    (name.to_string(), None, option_type)
}
/// Statistics tracker for streaming processing
#[derive(Debug)]
struct StreamingStats {
    initial_companies: usize,
    initial_warrants: usize,
    initial_options: usize,
    initial_corporate_bonds: usize,
    initial_government_bonds: usize,
    companies_added: usize,
    warrants_added: usize,
    options_added: usize,
    corporate_bonds_added: usize,
    government_bonds_added: usize,
}

impl StreamingStats {
    fn new(companies: usize, warrants: usize, options: usize, corporate_bonds: usize, government_bonds: usize) -> Self {
        Self {
            initial_companies: companies,
            initial_warrants: warrants,
            initial_options: options,
            initial_corporate_bonds: corporate_bonds,
            initial_government_bonds: government_bonds,
            companies_added: 0,
            warrants_added: 0,
            options_added: 0,
            corporate_bonds_added: 0,
            government_bonds_added: 0,
        }
    }

    fn print_summary(&self) {
        println!("\n=== Processing Statistics ===");
        println!("Companies:");
        println!(" - Initial: {}", self.initial_companies);
        println!(" - Added: {}", self.companies_added);
        println!(" - Total: {}", self.initial_companies + self.companies_added);
        println!("Warrants:");
        println!(" - Initial: {}", self.initial_warrants);
        println!(" - Added: {}", self.warrants_added);
        println!(" - Total: {}", self.initial_warrants + self.warrants_added);
        println!("Options:");
        println!(" - Initial: {}", self.initial_options);
        println!(" - Added: {}", self.options_added);
        println!(" - Total: {}", self.initial_options + self.options_added);
        println!("Corporate Bonds:");
        println!(" - Initial: {}", self.initial_corporate_bonds);
        println!(" - Added: {}", self.corporate_bonds_added);
        println!(" - Total: {}", self.initial_corporate_bonds + self.corporate_bonds_added);
        println!("Government Bonds:");
        println!(" - Initial: {}", self.initial_government_bonds);
        println!(" - Added: {}", self.government_bonds_added);
        println!(" - Total: {}", self.initial_government_bonds + self.government_bonds_added);
    }
}
async fn load_market_sectors() -> anyhow::Result<Vec<String>> {
    let dir = DataPaths::new(".")?;
    let cache_file = dir.cache_openfigi_dir().join("marketSecDes.json");

    if !cache_file.exists() {
        return Ok(vec![
            "Comdty".to_string(),
            "Corp".to_string(),
            "Equity".to_string(),
            "Govt".to_string(),
        ]);
    }

    let content = tokio_fs::read_to_string(&cache_file).await?;
    let json: Value = serde_json::from_str(&content)?;

    let sectors: Vec<String> = json["values"]
        .as_array()
        .ok_or_else(|| anyhow!("No values"))?
        .iter()
        .filter_map(|v| v.as_str().map(|s| s.to_string()))
        .collect();

    Ok(sectors)
}
async fn determine_gleif_date(
    gleif_date: Option<&str>,
    paths: &DataPaths,
) -> anyhow::Result<String> {
    if let Some(d) = gleif_date {
        return Ok(d.to_string());
    }

    let gleif_dir = paths.cache_gleif_dir();
    let mut entries = tokio_fs::read_dir(gleif_dir).await?;
    let mut dates = Vec::new();

    while let Some(entry) = entries.next_entry().await? {
        let path = entry.path();
        if path.is_dir() {
            if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
                if name.len() == 8 && name.chars().all(|c| c.is_numeric()) {
                    dates.push(name.to_string());
                }
            }
        }
    }

    dates.sort();
    dates.last().cloned().ok_or_else(|| anyhow!("No GLEIF date found"))
}
async fn setup_sector_directories(
    date_dir: &Path,
    sector_dirs: &[String],
) -> anyhow::Result<()> {
    let uncategorized_dir = date_dir.join("uncategorized");
    tokio_fs::create_dir_all(&uncategorized_dir).await?;

    for sector in sector_dirs {
        let sector_dir = date_dir.join(sector);
        tokio_fs::create_dir_all(&sector_dir).await?;
    }

    Ok(())
}
#[derive(Debug)]
pub struct MappingStats {
    pub total_leis: usize,
    pub mapped_leis: usize,
    pub no_result_leis: usize,
    pub unqueried_leis: usize,
    pub mapping_percentage: f64,
    pub queried_percentage: f64,
    pub by_sector: HashMap<String, usize>,
}

/// Get detailed statistics about LEI-FIGI mapping status
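///
/// # Example
///
/// A minimal usage sketch (not compiled as a doctest; the CSV path is illustrative):
///
/// ```ignore
/// let stats = get_mapping_stats("cache/gleif/isin_lei.csv", None).await?;
/// println!("{:.1}% of {} LEIs mapped", stats.mapping_percentage, stats.total_leis);
/// ```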
pub async fn get_mapping_stats(
    csv_path: &str,
    gleif_date: Option<&str>,
) -> anyhow::Result<MappingStats> {
    let dir = DataPaths::new(".")?;
    let map_cache_dir = dir.cache_gleif_openfigi_map_dir();

    let date = determine_gleif_date(gleif_date, &dir).await?;
    let date_dir = map_cache_dir.join(&date);

    let all_leis = get_all_leis_from_gleif(csv_path).await?;
    let mapped_leis = load_existing_mapped_leis(&date_dir).await?;
    let no_result_leis = load_no_result_leis(&date_dir).await?;

    let total = all_leis.len();
    let mapped = mapped_leis.len();
    let no_results = no_result_leis.len();
    let queried = mapped + no_results;
    let unqueried = total.saturating_sub(queried);

    let mapping_percentage = if total > 0 {
        (mapped as f64 / total as f64) * 100.0
    } else {
        0.0
    };

    let queried_percentage = if total > 0 {
        (queried as f64 / total as f64) * 100.0
    } else {
        0.0
    };

    // Count by sector
    let mut by_sector = HashMap::new();

    if date_dir.exists() {
        let mut entries = tokio_fs::read_dir(&date_dir).await?;

        while let Some(entry) = entries.next_entry().await? {
            let sector_path = entry.path();
            if !sector_path.is_dir() {
                continue;
            }

            let sector_name = sector_path
                .file_name()
                .and_then(|n| n.to_str())
                .unwrap_or("unknown")
                .to_string();

            let jsonl_path = sector_path.join("lei_to_figi.jsonl");
            if !jsonl_path.exists() {
                continue;
            }

            let content = tokio_fs::read_to_string(&jsonl_path).await?;
            let count = content.lines().filter(|l| !l.trim().is_empty()).count();
            by_sector.insert(sector_name, count);
        }
    }

    Ok(MappingStats {
        total_leis: total,
        mapped_leis: mapped,
        no_result_leis: no_results,
        unqueried_leis: unqueried,
        mapping_percentage,
        queried_percentage,
        by_sector,
    })
}
/// Quick check if mapping is complete (returns true if all mapped)
pub async fn is_mapping_complete(csv_path: &str) -> anyhow::Result<bool> {
    let dir = DataPaths::new(".")?;
    let map_cache_dir = dir.cache_gleif_openfigi_map_dir();

    let date = determine_gleif_date(None, &dir).await?;
    let date_dir = map_cache_dir.join(&date);

    let unmapped = get_unmapped_leis(csv_path, &date_dir).await?;
    Ok(unmapped.is_empty())
}
/// Load all LEIs that have already been mapped from existing JSONL files
async fn load_existing_mapped_leis(date_dir: &Path) -> anyhow::Result<HashSet<String>> {
    let mut mapped_leis = HashSet::new();

    if !date_dir.exists() {
        return Ok(mapped_leis);
    }

    // Read all sector directories
    let mut entries = tokio_fs::read_dir(date_dir).await?;

    while let Some(entry) = entries.next_entry().await? {
        let sector_path = entry.path();
        if !sector_path.is_dir() {
            continue;
        }

        let jsonl_path = sector_path.join("lei_to_figi.jsonl");
        if !jsonl_path.exists() {
            continue;
        }

        // Read JSONL file line by line
        let content = tokio_fs::read_to_string(&jsonl_path).await?;
        for line in content.lines() {
            if line.trim().is_empty() {
                continue;
            }

            if let Ok(entry) = serde_json::from_str::<Value>(line) {
                if let Some(lei) = entry["lei"].as_str() {
                    mapped_leis.insert(lei.to_string());
                }
            }
        }
    }

    if !mapped_leis.is_empty() {
        logger::log_info(&format!("Found {} already mapped LEIs", mapped_leis.len())).await;
    }

    Ok(mapped_leis)
}
/// Read the GLEIF CSV and return the set of all LEIs it contains
async fn get_all_leis_from_gleif(csv_path: &str) -> anyhow::Result<HashSet<String>> {
    let content = tokio::fs::read_to_string(csv_path)
        .await
        .context(format!("Failed to read GLEIF CSV file: {}", csv_path))?;

    let mut all_leis = HashSet::new();

    for (idx, line) in content.lines().enumerate() {
        if idx == 0 {
            continue; // Skip header
        }

        let parts: Vec<&str> = line.split(',').collect();

        if parts.len() < 2 {
            continue;
        }

        let lei = parts[0].trim().trim_matches('"').to_string();

        if !lei.is_empty() {
            all_leis.insert(lei);
        }
    }

    logger::log_info(&format!("Found {} total LEIs in GLEIF CSV", all_leis.len())).await;
    Ok(all_leis)
}
/// Get unmapped LEIs by comparing GLEIF CSV with existing mappings
async fn get_unmapped_leis(
    csv_path: &str,
    date_dir: &Path,
) -> anyhow::Result<HashSet<String>> {
    let all_leis = get_all_leis_from_gleif(csv_path).await?;
    let mapped_leis = load_existing_mapped_leis(date_dir).await?;
    let no_result_leis = load_no_result_leis(date_dir).await?;

    // Calculate truly unmapped: all - (mapped + no_results)
    let queried_leis: HashSet<String> = mapped_leis
        .union(&no_result_leis)
        .cloned()
        .collect();

    let unmapped: HashSet<String> = all_leis
        .difference(&queried_leis)
        .cloned()
        .collect();

    let total = all_leis.len();
    let mapped = mapped_leis.len();
    let no_results = no_result_leis.len();
    let unqueried = unmapped.len();

    logger::log_info(&format!(
        "LEI Status: Total={}, Mapped={}, No Results={}, Unqueried={}",
        total, mapped, no_results, unqueried
    )).await;

    Ok(unmapped)
}
/// Modified version that only processes specified LEIs
pub async fn stream_gleif_csv_and_build_figi_filtered(
    csv_path: &str,
    gleif_date: Option<&str>,
    filter_leis: Option<&HashSet<String>>,
) -> anyhow::Result<()> {
    logger::log_info(&format!("Streaming GLEIF CSV: {}", csv_path)).await;

    let content = tokio::fs::read_to_string(csv_path)
        .await
        .context(format!("Failed to read GLEIF CSV file: {}", csv_path))?;

    let client = OpenFigiClient::new().await?;
    if !client.has_key {
        logger::log_warn("No API key - skipping FIGI mapping").await;
        return Ok(());
    }

    let dir = DataPaths::new(".")?;
    let map_cache_dir = dir.cache_gleif_openfigi_map_dir();

    let date = determine_gleif_date(gleif_date, &dir).await?;
    let date_dir = map_cache_dir.join(&date);
    tokio_fs::create_dir_all(&date_dir).await?;

    let sector_dirs = load_market_sectors().await?;
    setup_sector_directories(&date_dir, &sector_dirs).await?;

    let mut lei_batch: HashMap<String, Vec<String>> = HashMap::new();
    let mut line_count = 0;
    let mut processed_leis = 0;
    let mut skipped_leis = 0;

    for (idx, line) in content.lines().enumerate() {
        if idx == 0 { continue; }

        let parts: Vec<&str> = line.split(',').collect();
        if parts.len() < 2 { continue; }

        let lei = parts[0].trim().trim_matches('"').to_string();
        let isin = parts[1].trim().trim_matches('"').to_string();

        if lei.is_empty() || isin.is_empty() {
            continue;
        }

        // Apply filter if provided
        if let Some(filter) = filter_leis {
            if !filter.contains(&lei) {
                skipped_leis += 1;
                continue;
            }
        }

        lei_batch.entry(lei).or_default().push(isin);
        line_count += 1;

        // Process batch when full
        if lei_batch.len() >= LEI_BATCH_SIZE {
            process_and_save_figi_batch(&client, &lei_batch, &date_dir).await?;
            processed_leis += lei_batch.len();

            if processed_leis % 1000 == 0 {
                logger::log_info(&format!("Queried {} LEIs...", processed_leis)).await;
            }

            lei_batch.clear();
            tokio::task::yield_now().await;
        }
    }

    // Process remaining
    if !lei_batch.is_empty() {
        process_and_save_figi_batch(&client, &lei_batch, &date_dir).await?;
        processed_leis += lei_batch.len();
    }

    logger::log_info(&format!(
        "✓ Queried {} LEIs, skipped {} already processed",
        processed_leis,
        skipped_leis
    )).await;

    Ok(())
}
/// Check mapping completion and process only unmapped LEIs
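///
/// # Example
///
/// A minimal usage sketch (not compiled as a doctest; the CSV path is illustrative):
///
/// ```ignore
/// let complete = update_lei_mapping("cache/gleif/isin_lei.csv", None).await?;
/// if !complete {
///     // Some LEIs are still unqueried (API errors or rate limits); retry later.
/// }
/// ```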
pub async fn update_lei_mapping(
    csv_path: &str,
    gleif_date: Option<&str>,
) -> anyhow::Result<bool> {
    let dir = DataPaths::new(".")?;
    let manager = StateManager::new(&dir.integrity_dir()).await?;
    let step_name = "lei_figi_mapping_complete";

    let map_cache_dir = dir.cache_gleif_openfigi_map_dir();
    let date = determine_gleif_date(gleif_date, &dir).await?;
    let date_dir = map_cache_dir.join(&date);

    if manager.is_step_valid(step_name).await? {
        logger::log_info(" LEI-FIGI mapping already completed and valid").await;
        logger::log_info("✓ All LEIs have been queried (mapped or confirmed no results)").await;
        return Ok(true);
    }

    // Get unmapped LEIs (excludes both mapped and no-result LEIs)
    let unmapped = get_unmapped_leis(csv_path, &date_dir).await?;

    if unmapped.is_empty() {
        logger::log_info("✓ All LEIs have been queried (mapped or confirmed no results)").await;
        track_lei_mapping_completion(&manager, &date_dir).await?;
        logger::log_info(" ✓ LEI-FIGI mapping marked as complete with integrity tracking").await;

        return Ok(true);
    }

    logger::log_info(&format!("Found {} LEIs that need querying - starting mapping...", unmapped.len())).await;

    // Process only unmapped LEIs
    stream_gleif_csv_and_build_figi_filtered(csv_path, gleif_date, Some(&unmapped)).await?;

    // Verify completion
    let still_unmapped = get_unmapped_leis(csv_path, &date_dir).await?;

    if still_unmapped.is_empty() {
        logger::log_info("✓ All LEIs successfully queried").await;
        track_lei_mapping_completion(&manager, &date_dir).await?;
        logger::log_info(" ✓ LEI-FIGI mapping marked as complete with integrity tracking").await;
        Ok(true)
    } else {
        logger::log_warn(&format!(
            "⚠ {} LEIs still unqueried (API errors or rate limits)",
            still_unmapped.len()
        )).await;
        Ok(false)
    }
}
/// Track LEI-FIGI mapping completion with content hash verification
async fn track_lei_mapping_completion(
    manager: &StateManager,
    date_dir: &Path,
) -> anyhow::Result<()> {
    // Create content reference for all FIGI mapping files
    // This will hash ALL lei_to_figi.jsonl files in sector directories
    let content_reference = directory_reference(
        date_dir,
        Some(vec![
            "*/lei_to_figi.jsonl".to_string(), // All sector mapping files
            "no_results.jsonl".to_string(), // LEIs with no results
        ]),
        Some(vec![
            "*.tmp".to_string(), // Exclude temp files
            "*.log".to_string(), // Exclude log files
        ]),
    );

    // Track completion with:
    // - Content reference: All FIGI mapping files in date directory
    // - Data stage: Cache (24-hour TTL) - FIGI data can change frequently
    // - Dependencies: None (this is a collection step from external API)
    manager.update_entry(
        "lei_figi_mapping_complete".to_string(),
        content_reference,
        DataStage::Cache, // 24-hour TTL for API data
        None, // Use default TTL
    ).await?;

    Ok(())
}
/// Load LEIs that were queried but returned no results
async fn load_no_result_leis(date_dir: &Path) -> anyhow::Result<HashSet<String>> {
    let mut no_result_leis = HashSet::new();

    let no_results_path = date_dir.join("no_results.jsonl");
    if !no_results_path.exists() {
        return Ok(no_result_leis);
    }

    let content = tokio_fs::read_to_string(&no_results_path).await?;
    for line in content.lines() {
        if line.trim().is_empty() {
            continue;
        }

        if let Ok(entry) = serde_json::from_str::<Value>(line) {
            if let Some(lei) = entry["lei"].as_str() {
                no_result_leis.insert(lei.to_string());
            }
        }
    }

    if !no_result_leis.is_empty() {
        logger::log_info(&format!(
            "Found {} LEIs previously queried with no FIGI results",
            no_result_leis.len()
        )).await;
    }

    Ok(no_result_leis)
}
/// Save LEI that was queried but returned no results
async fn append_no_result_lei(date_dir: &Path, lei: &str, isins: &[String]) -> anyhow::Result<()> {
    let no_results_path = date_dir.join("no_results.jsonl");

    let entry = json!({
        "lei": lei,
        "isins": isins,
        "queried_at": chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string(),
    });

    let line = serde_json::to_string(&entry)? + "\n";

    let mut file = tokio_fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(&no_results_path)
        .await?;

    file.write_all(line.as_bytes()).await?;
    Ok(())
}
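// Illustrative shape of one no_results.jsonl line written above (placeholder values):
//
//     {"lei":"<LEI>","isins":["<ISIN>"],"queried_at":"YYYY-MM-DD HH:MM:SS"}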