// src/corporate/update_openfigi.rs - STREAMING VERSION
use super::types::*;
use super::helpers::{find_most_recent_figi_date_dir, determine_gleif_date};
use super::bond_processing::*;
use super::option_processing::*;
use crate::util::directories::DataPaths;
use crate::util::integrity::{DataStage, StateManager, directory_reference};
use crate::util::logger;
use crate::scraper::openfigi::OpenFigiClient;
use serde_json::{json, Value};
use std::collections::{HashMap, HashSet};
use std::path::Path;
use tokio::fs as tokio_fs;
use tokio::io::AsyncWriteExt;
use anyhow::{Context, anyhow};
const LEI_BATCH_SIZE: usize = 100; // Process 100 LEIs at a time
/// Loads or builds securities data by streaming through FIGI mapping files.
///
/// Implements abort-safe incremental persistence with checkpoints and replay logs.
/// The most recent FIGI date directory (e.g., cache/gleif_openfigi_map/24112025/)
/// is discovered automatically, so the function takes no arguments.
///
/// # Returns
/// `Ok(())` on success.
///
/// # Errors
/// Returns an error if file I/O or JSON parsing fails.
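///
/// Output is written to `<data dir>/figi_securities/` as five JSONL checkpoint
/// files (`common_stocks.jsonl`, `warrants.jsonl`, `options.jsonl`,
/// `corporate_bonds.jsonl`, `government_bonds.jsonl`), per-type `*.log.jsonl`
/// replay logs, and a `state.jsonl` sector tracker.
///
/// # Example
///
/// A minimal invocation sketch (crate paths and async runtime assumed):
///
/// ```ignore
/// // Safe to re-run after an abort: already-processed sectors are skipped and
/// // replay logs are folded back into the checkpoints.
/// update_securities().await?;
/// ```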
pub async fn update_securities() -> anyhow::Result<()> {
logger::log_info("Building securities data from FIGI mappings...").await;
let dir = DataPaths::new(".")?;
let manager = StateManager::new(&dir.integrity_dir()).await?;
let step_name = "securities_data_complete";
let date_dir = find_most_recent_figi_date_dir(&dir).await?
.ok_or_else(|| anyhow!("No FIGI date directory found"))?;
let data_dir = dir.data_dir();
let output_dir = data_dir.join("figi_securities");
tokio_fs::create_dir_all(&output_dir).await
.context("Failed to create figi_securities output directory")?;
if manager.is_step_valid(step_name).await? {
logger::log_info(" Securities data already built and valid").await;
return Ok(());
}
// Setup checkpoint and log paths for each security type
let common_checkpoint = output_dir.join("common_stocks.jsonl");
let common_log = output_dir.join("common_stocks.log.jsonl");
let warrants_checkpoint = output_dir.join("warrants.jsonl");
let warrants_log = output_dir.join("warrants.log.jsonl");
let options_checkpoint = output_dir.join("options.jsonl");
let options_log = output_dir.join("options.log.jsonl");
let corporate_bonds_checkpoint = output_dir.join("corporate_bonds.jsonl");
let corporate_bonds_log = output_dir.join("corporate_bonds.log.jsonl");
let government_bonds_checkpoint = output_dir.join("government_bonds.jsonl");
let government_bonds_log = output_dir.join("government_bonds.log.jsonl");
// Track which sectors have been fully processed
let processed_sectors_file = output_dir.join("state.jsonl");
let processed_sectors = load_processed_sectors(&processed_sectors_file).await?;
logger::log_info(&format!(" Already processed {} sectors", processed_sectors.len())).await;
// Collect sectors to process
let mut sectors_to_process = Vec::new();
let mut entries = tokio_fs::read_dir(date_dir).await
.context("Failed to read date directory")?;
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if !path.is_dir() {
continue;
}
let sector_name = path.file_name()
.and_then(|n| n.to_str())
.map(|s| s.to_string())
.unwrap_or_else(|| "unknown".to_string());
let lei_figi_file = path.join("lei_to_figi.jsonl");
if !lei_figi_file.exists() {
continue;
}
// Skip if already processed
if processed_sectors.contains(&sector_name) {
logger::log_info(&format!(" Skipping already processed sector: {}", sector_name)).await;
continue;
}
sectors_to_process.push((sector_name, lei_figi_file));
}
if sectors_to_process.is_empty() {
logger::log_info(" All sectors already processed, nothing to do").await;
return Ok(());
}
// Load checkpoints and replay logs - these are MUTABLE now
let mut existing_companies = load_checkpoint_and_replay(&common_checkpoint, &common_log, "name").await?;
let mut existing_warrants = load_checkpoint_and_replay_nested(&warrants_checkpoint, &warrants_log).await?;
let mut existing_options = load_checkpoint_and_replay_nested(&options_checkpoint, &options_log).await?;
let mut existing_corporate_bonds = load_checkpoint_and_replay_nested(&corporate_bonds_checkpoint, &corporate_bonds_log).await?;
let mut existing_government_bonds = load_checkpoint_and_replay_nested(&government_bonds_checkpoint, &government_bonds_log).await?;
logger::log_info(&format!(" Existing entries - Companies: {}, Warrants: {}, Options: {}, Corporate Bonds: {}, Government Bonds: {}",
existing_companies.len(), existing_warrants.len(), existing_options.len(), existing_corporate_bonds.len(), existing_government_bonds.len())).await;
// Process statistics
let mut stats = StreamingStats::new(
existing_companies.len(),
existing_warrants.len(),
existing_options.len(),
existing_corporate_bonds.len(),
existing_government_bonds.len()
);
logger::log_info(&format!(" Found {} sectors to process", sectors_to_process.len())).await;
// Process each sector
let mut newly_processed_sectors = Vec::new();
for (sector_name, lei_figi_file) in sectors_to_process {
logger::log_info(&format!(" Processing sector: {}", sector_name)).await;
// Stream through the lei_to_figi.jsonl file with batched writes
process_lei_figi_file_batched(
&lei_figi_file,
&common_log,
&warrants_log,
&options_log,
&corporate_bonds_log,
&government_bonds_log,
&mut existing_companies,
&mut existing_warrants,
&mut existing_options,
&mut existing_corporate_bonds,
&mut existing_government_bonds,
&mut stats,
).await?;
// Mark sector as processed
newly_processed_sectors.push(sector_name.clone());
// Append to processed sectors file immediately for crash safety
append_processed_sector(&processed_sectors_file, &sector_name).await?;
}
// Create checkpoints after all processing
if !newly_processed_sectors.is_empty() {
logger::log_info("Creating checkpoints...").await;
create_checkpoint(&common_checkpoint, &common_log).await?;
create_checkpoint(&warrants_checkpoint, &warrants_log).await?;
create_checkpoint(&options_checkpoint, &options_log).await?;
create_checkpoint(&corporate_bonds_checkpoint, &corporate_bonds_log).await?;
create_checkpoint(&government_bonds_checkpoint, &government_bonds_log).await?;
}
stats.print_summary();
logger::log_info(&format!("✓ Processed {} new sectors successfully", newly_processed_sectors.len())).await;
track_securities_completion(&manager, &output_dir).await?;
logger::log_info(" ✓ Securities data marked as complete with integrity tracking").await;
Ok(())
}
/// Track securities data completion with content hash verification
async fn track_securities_completion(
manager: &StateManager,
output_dir: &Path,
) -> anyhow::Result<()> {
// Create content reference for all output files
let content_reference = directory_reference(
output_dir,
Some(vec![
"common_stocks.jsonl".to_string(),
"warrants.jsonl".to_string(),
"options.jsonl".to_string(),
"corporate_bonds.jsonl".to_string(),
"government_bonds.jsonl".to_string(),
]),
Some(vec![
"*.log.jsonl".to_string(), // Exclude log files
"*.tmp".to_string(), // Exclude temp files
"state.jsonl".to_string(), // Exclude internal state tracking
]),
);
// Track completion with:
// - Content reference: All output JSONL files
// - Data stage: Data (7-day TTL) - Securities data relatively stable
// - Dependencies: LEI-FIGI mapping must be valid
manager.update_entry(
"securities_data_complete".to_string(),
content_reference,
DataStage::Data,
None, // Use default TTL (7 days)
).await?;
Ok(())
}
/// Loads the list of sectors that have been fully processed
async fn load_processed_sectors(path: &Path) -> anyhow::Result<HashSet<String>> {
let mut sectors = HashSet::new();
if !path.exists() {
return Ok(sectors);
}
let content = tokio_fs::read_to_string(path).await
.context("Failed to read processed sectors file")?;
for (line_num, line) in content.lines().enumerate() {
if line.trim().is_empty() || !line.ends_with('}') {
continue; // Skip incomplete lines
}
match serde_json::from_str::<Value>(line) {
Ok(entry) => {
if let Some(sector) = entry["sector"].as_str() {
sectors.insert(sector.to_string());
}
}
Err(e) => {
logger::log_warn(&format!(
"Skipping invalid processed sector line {}: {}",
line_num + 1, e
)).await;
}
}
}
Ok(sectors)
}
/// Appends a sector name to the processed sectors JSONL file with fsync
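///
/// Each record is one JSON line, e.g. (values illustrative):
/// `{"sector":"Equity","completed_at":"2025-01-01T00:00:00+00:00"}`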
async fn append_processed_sector(path: &Path, sector_name: &str) -> anyhow::Result<()> {
use std::fs::OpenOptions;
use std::io::Write;
let entry = json!({
"sector": sector_name,
"completed_at": chrono::Utc::now().to_rfc3339(),
});
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(path)
.context("Failed to open processed sectors file")?;
let line = serde_json::to_string(&entry)
.context("Failed to serialize sector entry")? + "\n";
file.write_all(line.as_bytes())?;
// Ensure durability
file.sync_data()
.context("Failed to fsync processed sectors file")?;
Ok(())
}
/// Loads checkpoint and replays log, returning set of existing keys
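///
/// The checkpoint is read first, then the append-only log is replayed over it.
/// Lines that do not end in `}` are treated as torn writes from an aborted run
/// and skipped.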
async fn load_checkpoint_and_replay(
checkpoint_path: &Path,
log_path: &Path,
key_field: &str,
) -> anyhow::Result<HashSet<String>> {
let mut keys = HashSet::new();
// Load checkpoint if it exists
if checkpoint_path.exists() {
let content = tokio_fs::read_to_string(checkpoint_path).await
.context("Failed to read checkpoint")?;
for line in content.lines() {
if line.trim().is_empty() || !line.ends_with('}') {
continue; // Skip incomplete lines
}
if let Ok(entry) = serde_json::from_str::<Value>(line) {
if let Some(key) = entry[key_field].as_str() {
keys.insert(key.to_string());
}
}
}
}
// Replay log if it exists
if log_path.exists() {
let content = tokio_fs::read_to_string(log_path).await
.context("Failed to read log")?;
for line in content.lines() {
if line.trim().is_empty() || !line.ends_with('}') {
continue; // Skip incomplete lines
}
if let Ok(entry) = serde_json::from_str::<Value>(line) {
if let Some(key) = entry[key_field].as_str() {
keys.insert(key.to_string());
}
}
}
}
Ok(keys)
}
/// Loads checkpoint and replays log for nested structures (warrants/options)
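///
/// Keys combine the underlying company and the contract type as
/// `"{underlying_company_name}::{warrant_type|option_type}"`, e.g.
/// `"ACME CORP::call"` (example value illustrative).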
async fn load_checkpoint_and_replay_nested(
checkpoint_path: &Path,
log_path: &Path,
) -> anyhow::Result<HashSet<String>> {
let mut keys = HashSet::new();
// Load checkpoint if it exists
if checkpoint_path.exists() {
let content = tokio_fs::read_to_string(checkpoint_path).await
.context("Failed to read checkpoint")?;
for line in content.lines() {
if line.trim().is_empty() || !line.ends_with('}') {
continue;
}
if let Ok(entry) = serde_json::from_str::<Value>(line) {
let underlying = entry["underlying_company_name"].as_str().unwrap_or("");
let type_field = if entry.get("warrant_type").is_some() {
entry["warrant_type"].as_str().unwrap_or("")
} else {
entry["option_type"].as_str().unwrap_or("")
};
if !underlying.is_empty() && !type_field.is_empty() {
keys.insert(format!("{}::{}", underlying, type_field));
}
}
}
}
// Replay log if it exists
if log_path.exists() {
let content = tokio_fs::read_to_string(log_path).await
.context("Failed to read log")?;
for line in content.lines() {
if line.trim().is_empty() || !line.ends_with('}') {
continue;
}
if let Ok(entry) = serde_json::from_str::<Value>(line) {
let underlying = entry["underlying_company_name"].as_str().unwrap_or("");
let type_field = if entry.get("warrant_type").is_some() {
entry["warrant_type"].as_str().unwrap_or("")
} else {
entry["option_type"].as_str().unwrap_or("")
};
if !underlying.is_empty() && !type_field.is_empty() {
keys.insert(format!("{}::{}", underlying, type_field));
}
}
}
}
Ok(keys)
}
/// Creates a checkpoint by atomically folding committed log lines into the checkpoint file
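///
/// Only fully committed lines (ending in `}`) are kept. The temporary file is
/// fsynced and renamed over the checkpoint before the log is removed, so an
/// abort between those two steps leaves the log to be replayed on the next run
/// rather than losing data.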
async fn create_checkpoint(checkpoint_path: &Path, log_path: &Path) -> anyhow::Result<()> {
if !log_path.exists() {
return Ok(());
}
// Read all committed lines from log
let content = tokio_fs::read_to_string(log_path).await
.context("Failed to read log for checkpoint")?;
let committed_lines: Vec<&str> = content
.lines()
.filter(|line| !line.trim().is_empty() && line.ends_with('}'))
.collect();
if committed_lines.is_empty() {
return Ok(());
}
// Preserve committed lines already in the checkpoint so earlier runs are not discarded
let existing = if checkpoint_path.exists() {
tokio_fs::read_to_string(checkpoint_path).await
.context("Failed to read existing checkpoint")?
} else {
String::new()
};
// Write existing checkpoint lines plus new log lines to a temporary file
let tmp_path = checkpoint_path.with_extension("tmp");
let mut tmp_file = std::fs::File::create(&tmp_path)
.context("Failed to create temp checkpoint")?;
use std::io::Write;
for line in existing.lines().filter(|l| !l.trim().is_empty() && l.ends_with('}')) {
writeln!(tmp_file, "{}", line)?;
}
for line in committed_lines {
writeln!(tmp_file, "{}", line)?;
}
// Ensure data is flushed to disk
tmp_file.sync_data()
.context("Failed to sync temp checkpoint")?;
drop(tmp_file);
// Atomic rename
tokio_fs::rename(&tmp_path, checkpoint_path).await
.context("Failed to rename checkpoint")?;
// Clear log after successful checkpoint
tokio_fs::remove_file(log_path).await
.context("Failed to remove log after checkpoint")?;
Ok(())
}
/// Streams through a lei_to_figi.jsonl file and processes entries in batches with fsync
async fn process_lei_figi_file_batched(
input_path: &Path,
common_log_path: &Path,
warrants_log_path: &Path,
options_log_path: &Path,
corporate_bonds_log_path: &Path,
government_bonds_log_path: &Path,
existing_companies: &mut HashSet<String>,
existing_warrants: &mut HashSet<String>,
existing_options: &mut HashSet<String>,
existing_corporate_bonds: &mut HashSet<String>,
existing_government_bonds: &mut HashSet<String>,
stats: &mut StreamingStats,
) -> anyhow::Result<()> {
let content = tokio_fs::read_to_string(input_path).await
.context("Failed to read lei_to_figi.jsonl")?;
let batch_size = 100;
let mut processed_count = 0;
let mut common_batch: Vec<CompanyData> = Vec::new();
let mut warrants_batch: Vec<WarrantData> = Vec::new();
let mut options_batch: Vec<OptionData> = Vec::new();
let mut corporate_bonds_batch: Vec<CorporateBondData> = Vec::new();
let mut government_bonds_batch: Vec<GovernmentBondData> = Vec::new();
for (line_num, line) in content.lines().enumerate() {
if line.trim().is_empty() {
continue;
}
let entry: Value = serde_json::from_str(line)
.context(format!("Failed to parse JSON on line {}", line_num + 1))?;
let figis: Vec<FigiData> = serde_json::from_value(entry["figis"].clone())
.context("Invalid 'figis' field")?;
if figis.is_empty() {
continue;
}
// Group by security type
let (common_stocks, warrant_securities, option_securities, corporate_bonds_securities, government_bonds_securities) =
group_securities(&figis);
// Collect entries for batching and update existing keys
if !common_stocks.is_empty() {
if let Some(entry) = prepare_common_stock_entry(&common_stocks, existing_companies) {
// Add to existing set immediately to prevent duplicates in same run
existing_companies.insert(entry.name.clone());
common_batch.push(entry);
}
}
if !warrant_securities.is_empty() {
for entry in prepare_warrant_entries(&warrant_securities, existing_warrants) {
// Key by underlying company name so dedup matches the check in prepare_warrant_entries
let key = entry.company_name.clone();
existing_warrants.insert(key);
warrants_batch.push(entry);
}
}
if !option_securities.is_empty() {
for entry in prepare_option_entries(&option_securities, existing_options) {
// Add to existing set immediately
let key = entry.company_name.clone();
existing_options.insert(key);
options_batch.push(entry);
}
}
if !corporate_bonds_securities.is_empty() {
for entry in prepare_corporate_bond_entries(&corporate_bonds_securities, existing_corporate_bonds) {
// Use underlying_company_name as the key (not issuer_company_name)
let key = entry.underlying_company_name.clone();
existing_corporate_bonds.insert(key);
corporate_bonds_batch.push(entry);
}
}
if !government_bonds_securities.is_empty() {
for entry in prepare_government_bond_entries(&government_bonds_securities, existing_government_bonds) {
// Use issuer_name as the key (not issuer_country_name)
let key = entry.issuer_name.clone();
existing_government_bonds.insert(key);
government_bonds_batch.push(entry);
}
}
// Write batches when they reach size limit
if common_batch.len() >= batch_size {
write_batch_with_fsync(common_log_path, &common_batch).await?;
stats.companies_added += common_batch.len();
common_batch.clear();
}
if warrants_batch.len() >= batch_size {
write_batch_with_fsync(warrants_log_path, &warrants_batch).await?;
stats.warrants_added += warrants_batch.len();
warrants_batch.clear();
}
if options_batch.len() >= batch_size {
write_batch_with_fsync(options_log_path, &options_batch).await?;
stats.options_added += options_batch.len();
options_batch.clear();
}
if corporate_bonds_batch.len() >= batch_size {
write_batch_with_fsync(corporate_bonds_log_path, &corporate_bonds_batch).await?;
stats.corporate_bonds_added += corporate_bonds_batch.len();
corporate_bonds_batch.clear();
}
if government_bonds_batch.len() >= batch_size {
write_batch_with_fsync(government_bonds_log_path, &government_bonds_batch).await?;
stats.government_bonds_added += government_bonds_batch.len();
government_bonds_batch.clear();
}
processed_count += 1;
if processed_count % 1000 == 0 {
logger::log_info(&format!(" Processed {} LEI entries...", processed_count)).await;
}
}
// Write remaining batches
if !common_batch.is_empty() {
write_batch_with_fsync(common_log_path, &common_batch).await?;
stats.companies_added += common_batch.len();
}
if !warrants_batch.is_empty() {
write_batch_with_fsync(warrants_log_path, &warrants_batch).await?;
stats.warrants_added += warrants_batch.len();
}
if !options_batch.is_empty() {
write_batch_with_fsync(options_log_path, &options_batch).await?;
stats.options_added += options_batch.len();
}
if !corporate_bonds_batch.is_empty() {
write_batch_with_fsync(corporate_bonds_log_path, &corporate_bonds_batch).await?;
stats.corporate_bonds_added += corporate_bonds_batch.len();
}
if !government_bonds_batch.is_empty() {
write_batch_with_fsync(government_bonds_log_path, &government_bonds_batch).await?;
stats.government_bonds_added += government_bonds_batch.len();
}
Ok(())
}
/// Writes a batch of entries to log with fsync
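///
/// Each batch is a single append followed by `sync_data`, so after a crash the
/// log holds at most one torn trailing line, which the replay loaders skip via
/// their `ends_with('}')` check.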
async fn write_batch_with_fsync<T: serde::Serialize>(
log_path: &Path,
entries: &[T],
) -> anyhow::Result<()> {
use std::fs::OpenOptions;
use std::io::Write;
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(log_path)
.context("Failed to open log file")?;
for entry in entries {
let line = serde_json::to_string(entry)
.context("Failed to serialize entry")?;
writeln!(file, "{}", line)?;
}
// Critical: fsync to ensure durability
file.sync_data()
.context("Failed to fsync log file")?;
Ok(())
}
/// Prepares a common stock entry if it doesn't exist
fn prepare_common_stock_entry(
figi_infos: &[FigiData],
existing_keys: &HashSet<String>,
) -> Option<CompanyData> {
let name = figi_infos[0].name.clone();
if name.is_empty() || existing_keys.contains(&name) {
return None;
}
let grouped_by_isin = group_figis_by_isin(figi_infos);
let primary_isin = grouped_by_isin.keys().next().cloned().unwrap_or_default();
let id = format!("company_{}", std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos());
Some(CompanyData {
id,
name,
primary_isin,
securities: grouped_by_isin,
yahoo_company_data: None,
})
}
/// Prepares warrant entries for batching
fn prepare_warrant_entries(
warrant_securities: &[FigiData],
existing_keys: &HashSet<String>,
) -> Vec<WarrantData> {
let mut entries = Vec::new();
// Group by underlying company
let mut grouped: HashMap<String, Vec<(String, FigiData)>> = HashMap::new();
for figi in warrant_securities {
let (underlying, _issuer, warrant_type) = parse_warrant_name(&figi.name);
if underlying.is_empty() {
continue;
}
grouped.entry(underlying.clone())
.or_default()
.push((warrant_type, figi.clone()));
}
// Create WarrantData for each underlying company
for (underlying_company, contracts) in grouped {
if existing_keys.contains(&underlying_company) {
continue;
}
let company_id = format!("warrant_{}", std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos());
let mut warrants_by_type: HashMap<String, WarrantDetails> = HashMap::new();
for (warrant_type, figi) in contracts {
let (_, issuer, _) = parse_warrant_name(&figi.name);
let warrant_detail = WarrantDetails {
company_id: company_id.clone(),
company_name: underlying_company.clone(),
issuer_company_name: issuer,
warrant_type: warrant_type.clone(),
warrants: {
let mut map = HashMap::new();
map.insert(figi.isin.clone(), vec![figi.clone()]);
map
},
};
let key = format!("{}_{}", underlying_company, warrant_type);
warrants_by_type.insert(key, warrant_detail);
}
let warrant_info = WarrantData {
company_id,
company_name: underlying_company.clone(),
warrants: warrants_by_type,
};
entries.push(warrant_info);
}
entries
}
/// Prepares option entries for batching
///
/// Groups option contracts by underlying company, extracts strike prices and expiration dates,
/// and builds OptionChain structures organizing calls and puts by expiration date.
///
/// # Arguments
/// * `option_securities` - List of FigiData objects for option contracts
/// * `existing_keys` - Set of already-processed keys (format: "company_name")
///
/// # Returns
/// Vector of OptionData entries, one per unique underlying company
fn prepare_option_entries(
option_securities: &[FigiData],
existing_keys: &HashSet<String>,
) -> Vec<OptionData> {
let mut entries = Vec::new();
// Group by underlying company
let mut grouped: HashMap<String, Vec<(String, FigiData)>> = HashMap::new();
for figi in option_securities {
let (underlying, _issuer, option_type) = parse_option_name(&figi.name);
if underlying.is_empty() {
continue;
}
grouped.entry(underlying.clone())
.or_default()
.push((option_type, figi.clone()));
}
// Create OptionData for each underlying company
for (underlying_company, contracts) in grouped {
if existing_keys.contains(&underlying_company) {
continue;
}
// Build OptionContracts and extract strikes/expirations
let mut option_contracts: HashMap<i64, (Vec<OptionContract>, Vec<OptionContract>)> = HashMap::new();
let mut all_strikes: std::collections::HashSet<u64> = std::collections::HashSet::new();
for (option_type, figi) in contracts {
// Parse strike price from ticker if available
let strike = parse_strike_from_ticker(&figi.ticker).unwrap_or(0.0);
let expiration = parse_expiration_from_ticker(&figi.ticker).unwrap_or(0);
if strike > 0.0 && expiration > 0 {
all_strikes.insert((strike * 100.0) as u64);
let contract = OptionContract {
strike,
last_price: None,
bid: None,
ask: None,
volume: None,
open_interest: None,
implied_volatility: None,
};
let entry = option_contracts.entry(expiration).or_insert((Vec::new(), Vec::new()));
match option_type.as_str() {
"call" => entry.0.push(contract),
"put" => entry.1.push(contract),
_ => {}
}
}
}
// Build OptionChains from contracts
let mut option_chains = Vec::new();
let mut expiration_dates = Vec::new();
for (expiration, (calls, puts)) in option_contracts {
expiration_dates.push(expiration);
option_chains.push(OptionChain {
expiration_date: expiration,
calls,
puts,
});
}
expiration_dates.sort();
option_chains.sort_by_key(|oc| oc.expiration_date);
let strikes: Vec<f64> = all_strikes
.iter()
.map(|s| *s as f64 / 100.0)
.collect::<Vec<_>>();
let option_data = OptionData {
company_id: underlying_company.clone(),
company_name: underlying_company.clone(),
expiration_dates,
strikes,
option: option_chains,
timestamp: chrono::Utc::now().timestamp(),
};
entries.push(option_data);
}
entries
}
/// Prepares corporate bond entries for batching
///
/// Groups corporate bonds by issuer (underlying_company_name), extracting key bond details
/// like coupon rate, maturity date, and tenor from the ticker/description for each ISIN.
///
/// # Arguments
/// * `corporate_bond_securities` - List of FigiData entries for corporate bonds
/// * `existing_keys` - Set of already-processed keys (format: "company_name")
///
/// # Returns
/// Vector of CorporateBondData entries, one per unique issuer
fn prepare_corporate_bond_entries(
corporate_bond_securities: &[FigiData],
existing_keys: &HashSet<String>,
) -> Vec<CorporateBondData> {
let mut entries = Vec::new();
// Group bonds by issuer (company name)
let mut grouped: HashMap<String, Vec<FigiData>> = HashMap::new();
for figi in corporate_bond_securities {
let issuer = figi.name.clone();
if issuer.is_empty() {
continue;
}
grouped.entry(issuer).or_default().push(figi.clone());
}
// Create entries for each unique issuer
for (issuer, figis) in grouped {
// Check if this issuer already exists
if existing_keys.contains(&issuer) {
continue;
}
// Group by ISIN
let bonds_by_isin = group_figis_by_isin(&figis);
// Parse bond details for each ISIN
let mut bond_details_map: HashMap<String, BondDetails> = HashMap::new();
for (isin, isin_figis) in &bonds_by_isin {
if let Some(first_figi) = isin_figis.first() {
let details = parse_bond_details(&first_figi.ticker, &first_figi.security_description);
bond_details_map.insert(isin.clone(), details);
}
}
let bond_info = CorporateBondData {
underlying_company_name: issuer.clone(),
bonds: bonds_by_isin,
bond_details: bond_details_map,
};
entries.push(bond_info);
}
entries
}
/// Prepares government bond entries for batching
///
/// Groups government bonds by issuer (country/entity), extracting key bond
/// details like coupon rate, maturity date, and tenor from the ticker/description for each ISIN.
/// Also classifies the government issuer type (sovereign, municipal, agency, etc.)
///
/// # Arguments
/// * `government_bond_securities` - List of FigiData entries for government bonds
/// * `existing_keys` - Set of already-processed keys (format: "issuer_name")
///
/// # Returns
/// Vector of GovernmentBondData entries, one per unique issuer
fn prepare_government_bond_entries(
government_bond_securities: &[FigiData],
existing_keys: &HashSet<String>,
) -> Vec<GovernmentBondData> {
let mut entries = Vec::new();
// Group bonds by issuer (country/entity name)
let mut grouped: HashMap<String, Vec<FigiData>> = HashMap::new();
for figi in government_bond_securities {
let issuer = figi.name.clone();
if issuer.is_empty() {
continue;
}
grouped.entry(issuer).or_default().push(figi.clone());
}
// Create entries for each unique issuer
for (issuer, figis) in grouped {
// Check if this issuer already exists
if existing_keys.contains(&issuer) {
continue;
}
// Classify the government issuer type
let issuer_type = classify_government_issuer(&issuer);
// Group by ISIN
let bonds_by_isin = group_figis_by_isin(&figis);
// Parse bond details for each ISIN
let mut bond_details_map: HashMap<String, BondDetails> = HashMap::new();
for (isin, isin_figis) in &bonds_by_isin {
if let Some(first_figi) = isin_figis.first() {
let details = parse_bond_details(&first_figi.ticker, &first_figi.security_description);
bond_details_map.insert(isin.clone(), details);
}
}
let bond_info = GovernmentBondData {
issuer_name: issuer.clone(),
issuer_type,
bonds: bonds_by_isin,
bond_details: bond_details_map,
};
entries.push(bond_info);
}
entries
}
/// Groups FigiData entries by security type
fn group_securities(figis: &[FigiData]) -> (Vec<FigiData>, Vec<FigiData>, Vec<FigiData>, Vec<FigiData>, Vec<FigiData>) {
let mut common_stocks:Vec<FigiData> = Vec::new();
let mut warrants:Vec<FigiData> = Vec::new();
let mut options:Vec<FigiData> = Vec::new();
let mut corporate_bonds:Vec<FigiData> = Vec::new();
let mut government_bonds:Vec<FigiData> = Vec::new();
for figi in figis {
match figi.security_type.as_str() {
"Common Stock" => common_stocks.push(figi.clone()),
"Equity WRT" => warrants.push(figi.clone()),
"Equity Option" => options.push(figi.clone()),
_ => {}
}
match figi.security_type2.as_str() {
"Corp" => corporate_bonds.push(figi.clone()),
"Govt" => government_bonds.push(figi.clone()),
_ => {}
}
}
(common_stocks, warrants, options, corporate_bonds, government_bonds)
}
/// Groups FigiData entries by ISIN
fn group_figis_by_isin(figi_infos: &[FigiData]) -> HashMap<String, Vec<FigiData>> {
let mut grouped: HashMap<String, Vec<FigiData>> = HashMap::new();
for figi_info in figi_infos {
grouped.entry(figi_info.isin.clone())
.or_default()
.push(figi_info.clone());
}
for figis in grouped.values_mut() {
figis.sort_by(|a, b| a.figi.cmp(&b.figi));
}
grouped
}
/// Parse warrant name to extract underlying company, issuer, and warrant type
///
/// Examples:
/// - "VONTOBE-PW26 LEONARDO SPA" -> ("LEONARDO SPA", Some("VONTOBEL"), "put")
/// - "BAYER H-CW25 L'OREAL" -> ("L'OREAL", Some("BAYER H"), "call")
/// - "APPLE INC WARRANT" -> ("APPLE INC", None, "unknown")
fn parse_warrant_name(name: &str) -> (String, Option<String>, String) {
let name_upper = name.to_uppercase();
// Try to detect warrant type from code (PW=put, CW=call)
let warrant_type = if name_upper.contains("-PW") || name_upper.contains(" PW") {
"put".to_string()
} else if name_upper.contains("-CW") || name_upper.contains(" CW") {
"call".to_string()
} else {
"unknown".to_string()
};
// Try to split by warrant code pattern (e.g., "-PW26", "-CW25")
if let Some(pos) = name.find("-PW") {
let before = name[..pos].trim();
let after_idx = name[pos..].find(' ').map(|i| pos + i + 1).unwrap_or(name.len());
let after = if after_idx < name.len() {
name[after_idx..].trim()
} else {
""
};
return (
after.to_string(),
if !before.is_empty() { Some(before.to_string()) } else { None },
warrant_type,
);
}
if let Some(pos) = name.find("-CW") {
let before = name[..pos].trim();
let after_idx = name[pos..].find(' ').map(|i| pos + i + 1).unwrap_or(name.len());
let after = if after_idx < name.len() {
name[after_idx..].trim()
} else {
""
};
return (
after.to_string(),
if !before.is_empty() { Some(before.to_string()) } else { None },
warrant_type,
);
}
// Fallback: return entire name as underlying
(name.to_string(), None, warrant_type)
}
/// Statistics tracker for streaming processing
#[derive(Debug)]
struct StreamingStats {
initial_companies: usize,
initial_warrants: usize,
initial_options: usize,
initial_corporate_bonds: usize,
initial_government_bonds: usize,
companies_added: usize,
warrants_added: usize,
options_added: usize,
corporate_bonds_added: usize,
government_bonds_added: usize,
}
impl StreamingStats {
fn new(companies: usize, warrants: usize, options: usize, corporate_bonds: usize, government_bonds: usize) -> Self {
Self {
initial_companies: companies,
initial_warrants: warrants,
initial_options: options,
initial_corporate_bonds: corporate_bonds,
initial_government_bonds: government_bonds,
companies_added: 0,
warrants_added: 0,
options_added: 0,
corporate_bonds_added: 0,
government_bonds_added: 0,
}
}
fn print_summary(&self) {
println!("\n=== Processing Statistics ===");
println!("Companies:");
println!(" - Initial: {}", self.initial_companies);
println!(" - Added: {}", self.companies_added);
println!(" - Total: {}", self.initial_companies + self.companies_added);
println!("Warrants:");
println!(" - Initial: {}", self.initial_warrants);
println!(" - Added: {}", self.warrants_added);
println!(" - Total: {}", self.initial_warrants + self.warrants_added);
println!("Options:");
println!(" - Initial: {}", self.initial_options);
println!(" - Added: {}", self.options_added);
println!(" - Total: {}", self.initial_options + self.options_added);
println!("Corporate Bonds:");
println!(" - Initial: {}", self.initial_corporate_bonds);
println!(" - Added: {}", self.corporate_bonds_added);
println!(" - Total: {}", self.initial_corporate_bonds + self.corporate_bonds_added);
println!("Government Bonds:");
println!(" - Initial: {}", self.initial_government_bonds);
println!(" - Added: {}", self.government_bonds_added);
println!(" - Total: {}", self.initial_government_bonds + self.government_bonds_added);
}
}
async fn load_market_sectors() -> anyhow::Result<Vec<String>> {
let dir = DataPaths::new(".")?;
let cache_file = dir.cache_openfigi_dir().join("marketSecDes.json");
if !cache_file.exists() {
return Ok(vec![
"Comdty".to_string(),
"Corp".to_string(),
"Equity".to_string(),
"Govt".to_string(),
]);
}
let content = tokio_fs::read_to_string(&cache_file).await?;
let json: Value = serde_json::from_str(&content)?;
let sectors: Vec<String> = json["values"]
.as_array()
.ok_or_else(|| anyhow!("No values"))?
.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect();
Ok(sectors)
}
async fn setup_sector_directories(
date_dir: &Path,
sector_dirs: &[String],
) -> anyhow::Result<()> {
let uncategorized_dir = date_dir.join("uncategorized");
tokio_fs::create_dir_all(&uncategorized_dir).await?;
for sector in sector_dirs {
let sector_dir = date_dir.join(sector);
tokio_fs::create_dir_all(&sector_dir).await?;
}
Ok(())
}
#[derive(Debug)]
pub struct MappingStats {
pub total_leis: usize,
pub mapped_leis: usize,
pub no_result_leis: usize,
pub unqueried_leis: usize,
pub mapping_percentage: f64,
pub queried_percentage: f64,
pub by_sector: HashMap<String, usize>,
}
/// Get detailed statistics about LEI-FIGI mapping status
pub async fn get_mapping_stats(
csv_path: &str,
gleif_date: Option<&str>,
) -> anyhow::Result<MappingStats> {
let dir = DataPaths::new(".")?;
let map_cache_dir = dir.cache_gleif_openfigi_map_dir();
let date = determine_gleif_date(gleif_date, &dir).await?;
let date_dir = map_cache_dir.join(&date);
let all_leis = get_all_leis_from_gleif(csv_path).await?;
let mapped_leis = load_existing_mapped_leis(&date_dir).await?;
let no_result_leis = load_no_result_leis(&date_dir).await?;
let total = all_leis.len();
let mapped = mapped_leis.len();
let no_results = no_result_leis.len();
let queried = mapped + no_results;
let unqueried = total.saturating_sub(queried);
let mapping_percentage = if total > 0 {
(mapped as f64 / total as f64) * 100.0
} else {
0.0
};
let queried_percentage = if total > 0 {
(queried as f64 / total as f64) * 100.0
} else {
0.0
};
// Count by sector
let mut by_sector = HashMap::new();
if date_dir.exists() {
let mut entries = tokio_fs::read_dir(&date_dir).await?;
while let Some(entry) = entries.next_entry().await? {
let sector_path = entry.path();
if !sector_path.is_dir() {
continue;
}
let sector_name = sector_path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("unknown")
.to_string();
let jsonl_path = sector_path.join("lei_to_figi.jsonl");
if !jsonl_path.exists() {
continue;
}
let content = tokio_fs::read_to_string(&jsonl_path).await?;
let count = content.lines().filter(|l| !l.trim().is_empty()).count();
by_sector.insert(sector_name, count);
}
}
Ok(MappingStats {
total_leis: total,
mapped_leis: mapped,
no_result_leis: no_results,
unqueried_leis: unqueried,
mapping_percentage,
queried_percentage,
by_sector,
})
}
/// Quick check whether mapping is complete (true once every LEI has been queried, i.e. mapped or confirmed no-result)
pub async fn is_mapping_complete(csv_path: &str) -> anyhow::Result<bool> {
let dir = DataPaths::new(".")?;
let map_cache_dir = dir.cache_gleif_openfigi_map_dir();
let date = determine_gleif_date(None, &dir).await?;
let date_dir = map_cache_dir.join(&date);
let unmapped = get_unmapped_leis(csv_path, &date_dir).await?;
Ok(unmapped.is_empty())
}
/// Load all LEIs that have already been mapped from existing JSONL files
async fn load_existing_mapped_leis(date_dir: &Path) -> anyhow::Result<HashSet<String>> {
let mut mapped_leis = HashSet::new();
if !date_dir.exists() {
return Ok(mapped_leis);
}
// Read all sector directories
let mut entries = tokio_fs::read_dir(date_dir).await?;
while let Some(entry) = entries.next_entry().await? {
let sector_path = entry.path();
if !sector_path.is_dir() {
continue;
}
let jsonl_path = sector_path.join("lei_to_figi.jsonl");
if !jsonl_path.exists() {
continue;
}
// Read JSONL file line by line
let content = tokio_fs::read_to_string(&jsonl_path).await?;
for line in content.lines() {
if line.trim().is_empty() {
continue;
}
if let Ok(entry) = serde_json::from_str::<Value>(line) {
if let Some(lei) = entry["lei"].as_str() {
mapped_leis.insert(lei.to_string());
}
}
}
}
if !mapped_leis.is_empty() {
logger::log_info(&format!("Found {} already mapped LEIs", mapped_leis.len())).await;
}
Ok(mapped_leis)
}
/// Read the GLEIF CSV and return the set of all LEIs it contains
async fn get_all_leis_from_gleif(csv_path: &str) -> anyhow::Result<HashSet<String>> {
let content = tokio::fs::read_to_string(csv_path)
.await
.context(format!("Failed to read GLEIF CSV file: {}", csv_path))?;
let mut all_leis = HashSet::new();
for (idx, line) in content.lines().enumerate() {
if idx == 0 {
continue; // Skip header
}
let parts: Vec<&str> = line.split(',').collect();
if parts.len() < 2 {
continue;
}
let lei = parts[0].trim().trim_matches('"').to_string();
if !lei.is_empty() {
all_leis.insert(lei);
}
}
logger::log_info(&format!("Found {} total LEIs in GLEIF CSV", all_leis.len())).await;
Ok(all_leis)
}
/// Get unmapped LEIs by comparing GLEIF CSV with existing mappings
async fn get_unmapped_leis(
csv_path: &str,
date_dir: &Path,
) -> anyhow::Result<HashSet<String>> {
let all_leis = get_all_leis_from_gleif(csv_path).await?;
let mapped_leis = load_existing_mapped_leis(date_dir).await?;
let no_result_leis = load_no_result_leis(date_dir).await?;
// Calculate truly unmapped: all - (mapped + no_results)
let queried_leis: HashSet<String> = mapped_leis
.union(&no_result_leis)
.cloned()
.collect();
let unmapped: HashSet<String> = all_leis
.difference(&queried_leis)
.cloned()
.collect();
let total = all_leis.len();
let mapped = mapped_leis.len();
let no_results = no_result_leis.len();
let unqueried = unmapped.len();
logger::log_info(&format!(
"LEI Status: Total={}, Mapped={}, No Results={}, Unqueried={}",
total, mapped, no_results, unqueried
)).await;
Ok(unmapped)
}
/// Streams the GLEIF CSV and builds LEI-to-FIGI mappings, optionally restricted to the given set of LEIs
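///
/// The CSV is expected to carry the LEI in the first column and the ISIN in the
/// second, with a header row that is skipped, e.g. (values illustrative):
///
/// `"LEI","ISIN"`
/// `"EXAMPLELEI0000000001","US0000000001"`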
pub async fn stream_gleif_csv_and_build_figi_filtered(
csv_path: &str,
gleif_date: Option<&str>,
filter_leis: Option<&HashSet<String>>,
) -> anyhow::Result<()> {
logger::log_info(&format!("Streaming GLEIF CSV: {}", csv_path)).await;
let content = tokio::fs::read_to_string(csv_path)
.await
.context(format!("Failed to read GLEIF CSV file: {}", csv_path))?;
let client = OpenFigiClient::new().await?;
if !client.has_key {
logger::log_warn("No API key - skipping FIGI mapping").await;
return Ok(());
}
let dir = DataPaths::new(".")?;
let map_cache_dir = dir.cache_gleif_openfigi_map_dir();
let date = determine_gleif_date(gleif_date, &dir).await?;
let date_dir = map_cache_dir.join(&date);
tokio_fs::create_dir_all(&date_dir).await?;
let sector_dirs = load_market_sectors().await?;
setup_sector_directories(&date_dir, &sector_dirs).await?;
let mut lei_batch: HashMap<String, Vec<String>> = HashMap::new();
let mut processed_leis = 0;
let mut skipped_leis = 0;
for (idx, line) in content.lines().enumerate() {
if idx == 0 { continue; }
let parts: Vec<&str> = line.split(',').collect();
if parts.len() < 2 { continue; }
let lei = parts[0].trim().trim_matches('"').to_string();
let isin = parts[1].trim().trim_matches('"').to_string();
if lei.is_empty() || isin.is_empty() {
continue;
}
// Apply filter if provided
if let Some(filter) = filter_leis {
if !filter.contains(&lei) {
skipped_leis += 1;
continue;
}
}
lei_batch.entry(lei).or_default().push(isin);
// Process batch when full
if lei_batch.len() >= LEI_BATCH_SIZE {
process_and_save_figi_batch(&client, &lei_batch, &date_dir).await?;
processed_leis += lei_batch.len();
if processed_leis % 1000 == 0 {
logger::log_info(&format!("Queried {} LEIs...", processed_leis)).await;
}
lei_batch.clear();
tokio::task::yield_now().await;
}
}
// Process remaining
if !lei_batch.is_empty() {
process_and_save_figi_batch(&client, &lei_batch, &date_dir).await?;
processed_leis += lei_batch.len();
}
logger::log_info(&format!(
"✓ Queried {} LEIs, skipped {} already processed",
processed_leis,
skipped_leis
)).await;
Ok(())
}
/// Check mapping completion and process only unmapped LEIs
pub async fn update_lei_mapping(
csv_path: &str,
gleif_date: Option<&str>,
) -> anyhow::Result<bool> {
let dir = DataPaths::new(".")?;
let manager = StateManager::new(&dir.integrity_dir()).await?;
let step_name = "lei_figi_mapping_complete";
let map_cache_dir = dir.cache_gleif_openfigi_map_dir();
let date = determine_gleif_date(gleif_date, &dir).await?;
let date_dir = map_cache_dir.join(&date);
if manager.is_step_valid(step_name).await? {
logger::log_info(" LEI-FIGI mapping already completed and valid").await;
logger::log_info("✓ All LEIs have been queried (mapped or confirmed no results)").await;
return Ok(true);
}
// Get unmapped LEIs (excludes both mapped and no-result LEIs)
let unmapped = get_unmapped_leis(csv_path, &date_dir).await?;
if unmapped.is_empty() {
logger::log_info("✓ All LEIs have been queried (mapped or confirmed no results)").await;
track_lei_mapping_completion(&manager, &date_dir).await?;
logger::log_info(" ✓ LEI-FIGI mapping marked as complete with integrity tracking").await;
return Ok(true);
}
logger::log_info(&format!("Found {} LEIs that need querying - starting mapping...", unmapped.len())).await;
// Process only unmapped LEIs
stream_gleif_csv_and_build_figi_filtered(csv_path, gleif_date, Some(&unmapped)).await?;
// Verify completion
let still_unmapped = get_unmapped_leis(csv_path, &date_dir).await?;
if still_unmapped.is_empty() {
logger::log_info("✓ All LEIs successfully queried").await;
track_lei_mapping_completion(&manager, &date_dir).await?;
logger::log_info(" ✓ LEI-FIGI mapping marked as complete with integrity tracking").await;
Ok(true)
} else {
logger::log_warn(&format!(
"{} LEIs still unqueried (API errors or rate limits)",
still_unmapped.len()
)).await;
Ok(false)
}
}
/// Track LEI-FIGI mapping completion with content hash verification
async fn track_lei_mapping_completion(
manager: &StateManager,
date_dir: &Path,
) -> anyhow::Result<()> {
// Create content reference for all FIGI mapping files
// This will hash ALL lei_to_figi.jsonl files in sector directories
let content_reference = directory_reference(
date_dir,
Some(vec![
"*/lei_to_figi.jsonl".to_string(), // All sector mapping files
"no_results.jsonl".to_string(), // LEIs with no results
]),
Some(vec![
"*.tmp".to_string(), // Exclude temp files
"*.log".to_string(), // Exclude log files
]),
);
// Track completion with:
// - Content reference: All FIGI mapping files in date directory
// - Data stage: Cache (24-hour TTL) - FIGI data can change frequently
// - Dependencies: None (this is a collection step from external API)
manager.update_entry(
"lei_figi_mapping_complete".to_string(),
content_reference,
DataStage::Cache, // 24-hour TTL for API data
None, // Use default TTL
).await?;
Ok(())
}
/// Load LEIs that were queried but returned no results
async fn load_no_result_leis(date_dir: &Path) -> anyhow::Result<HashSet<String>> {
let mut no_result_leis = HashSet::new();
let no_results_path = date_dir.join("no_results.jsonl");
if !no_results_path.exists() {
return Ok(no_result_leis);
}
let content = tokio_fs::read_to_string(&no_results_path).await?;
for line in content.lines() {
if line.trim().is_empty() {
continue;
}
if let Ok(entry) = serde_json::from_str::<Value>(line) {
if let Some(lei) = entry["lei"].as_str() {
no_result_leis.insert(lei.to_string());
}
}
}
if !no_result_leis.is_empty() {
logger::log_info(&format!(
"Found {} LEIs previously queried with no FIGI results",
no_result_leis.len()
)).await;
}
Ok(no_result_leis)
}
/// Save LEI that was queried but returned no results
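///
/// Each record is one JSON line, e.g. (values illustrative):
/// `{"lei":"EXAMPLELEI0000000001","isins":["US0000000001"],"queried_at":"2025-01-01 00:00:00"}`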
async fn append_no_result_lei(date_dir: &Path, lei: &str, isins: &[String]) -> anyhow::Result<()> {
let no_results_path = date_dir.join("no_results.jsonl");
let entry = json!({
"lei": lei,
"isins": isins,
"queried_at": chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string(),
});
let line = serde_json::to_string(&entry)? + "\n";
let mut file = tokio_fs::OpenOptions::new()
.create(true)
.append(true)
.open(&no_results_path)
.await?;
file.write_all(line.as_bytes()).await?;
Ok(())
}
async fn process_and_save_figi_batch(
client: &OpenFigiClient,
lei_batch: &HashMap<String, Vec<String>>,
date_dir: &Path,
) -> anyhow::Result<()> {
for (lei, isins) in lei_batch {
let unique_isins: Vec<_> = isins.iter()
.cloned()
.collect::<HashSet<_>>()
.into_iter()
.collect();
let figi_infos = client.map_isins_to_figi_infos(&unique_isins).await?;
if figi_infos.is_empty() {
// No FIGIs found - save to no_results.jsonl to avoid re-querying
append_no_result_lei(date_dir, lei, &unique_isins).await?;
continue;
}
// Save FIGIs by sector as before
save_figi_infos_by_sector(lei, &figi_infos, date_dir).await?;
}
Ok(())
}
async fn save_figi_infos_by_sector(
lei: &str,
figi_infos: &[FigiData],
date_dir: &Path,
) -> anyhow::Result<()> {
let mut by_sector: HashMap<String, Vec<FigiData>> = HashMap::new();
for figi_info in figi_infos {
let sector = if figi_info.market_sector.is_empty() {
"uncategorized".to_string()
} else {
figi_info.market_sector.clone()
};
by_sector.entry(sector).or_default().push(figi_info.clone());
}
for (sector, figis) in by_sector {
let sector_dir = date_dir.join(&sector);
let path = sector_dir.join("lei_to_figi.jsonl");
append_lei_to_figi_jsonl(&path, lei, &figis).await?;
}
Ok(())
}
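/// Appends a single JSON line of the form `{"lei": "...", "figis": [...]}` to a
/// sector's `lei_to_figi.jsonl` file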
async fn append_lei_to_figi_jsonl(path: &Path, lei: &str, figis: &[FigiData]) -> anyhow::Result<()> {
let entry = json!({
"lei": lei,
"figis": figis,
});
let line = serde_json::to_string(&entry)? + "\n";
let mut file = tokio_fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)
.await?;
file.write_all(line.as_bytes()).await?;
Ok(())
}
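// A minimal sanity check for the warrant-name parser; the input strings are
// illustrative and mirror the patterns documented on `parse_warrant_name`.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_warrant_name_extracts_underlying_issuer_and_type() {
        // A "-PW" code marks a put; text before the code is the issuer, after it the underlying
        let (underlying, issuer, kind) = parse_warrant_name("VONTOBE-PW26 LEONARDO SPA");
        assert_eq!(underlying, "LEONARDO SPA");
        assert_eq!(issuer.as_deref(), Some("VONTOBE"));
        assert_eq!(kind, "put");

        // Without a -PW/-CW code the whole name is returned as the underlying
        let (underlying, issuer, kind) = parse_warrant_name("APPLE INC WARRANT");
        assert_eq!(underlying, "APPLE INC WARRANT");
        assert!(issuer.is_none());
        assert_eq!(kind, "unknown");
    }
}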