added bond extraction from figi

This commit is contained in:
2026-01-12 15:58:06 +01:00
parent 659757482d
commit c0c9bc0ed9
4 changed files with 231 additions and 174 deletions

View File

@@ -1,15 +1,14 @@
// src/corporate/update_openfigi.rs - STREAMING VERSION
// Key changes: Never load entire GLEIF CSV or FIGI maps into memory
use super::types::*;
use super::bond_processing::*;
use crate::util::directories::DataPaths;
use crate::util::integrity::{DataStage, StateManager, directory_reference};
use crate::util::logger;
use crate::scraper::openfigi::{OpenFigiClient};
use super::types::*;
use serde_json::{json, Value};
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::io::{BufRead, BufReader};
use tokio::fs as tokio_fs;
use tokio::io::AsyncWriteExt;
use anyhow::{Context, anyhow};
@@ -106,9 +105,7 @@ pub async fn update_securities(date_dir: &Path) -> anyhow::Result<()> {
let step_name = "securities_data_complete";
let data_dir = dir.data_dir();
let corporate_data_dir = data_dir.join("corporate");
let economic_data_dir = data_dir.join("economic");
let output_dir = data_dir.join("by_name");
let output_dir = data_dir.join("figi_securities");
tokio_fs::create_dir_all(&output_dir).await
.context("Failed to create corporate/by_name directory")?;
@@ -209,9 +206,13 @@ pub async fn update_securities(date_dir: &Path) -> anyhow::Result<()> {
&common_log,
&warrants_log,
&options_log,
&corporate_bonds_log,
&government_bonds_log,
&mut existing_companies,
&mut existing_warrants,
&mut existing_options,
&mut existing_corporate_bonds,
&mut existing_government_bonds,
&mut stats,
).await?;
@@ -228,6 +229,8 @@ pub async fn update_securities(date_dir: &Path) -> anyhow::Result<()> {
create_checkpoint(&common_checkpoint, &common_log).await?;
create_checkpoint(&warrants_checkpoint, &warrants_log).await?;
create_checkpoint(&options_checkpoint, &options_log).await?;
create_checkpoint(&corporate_bonds_checkpoint, &corporate_bonds_log).await?;
create_checkpoint(&government_bonds_checkpoint, &government_bonds_log).await?;
}
stats.print_summary();
@@ -251,6 +254,8 @@ async fn track_securities_completion(
"common_stocks.jsonl".to_string(),
"warrants.jsonl".to_string(),
"options.jsonl".to_string(),
"corporate_bonds.jsonl".to_string(),
"government_bonds.jsonl".to_string(),
]),
Some(vec![
"*.log.jsonl".to_string(), // Exclude log files
@@ -495,9 +500,13 @@ async fn process_lei_figi_file_batched(
common_log_path: &Path,
warrants_log_path: &Path,
options_log_path: &Path,
corporate_bonds_log_path: &Path,
government_bonds_log_path: &Path,
existing_companies: &mut HashSet<String>,
existing_warrants: &mut HashSet<String>,
existing_options: &mut HashSet<String>,
existing_corporate_bonds: &mut HashSet<String>,
existing_government_bonds: &mut HashSet<String>,
stats: &mut StreamingStats,
) -> anyhow::Result<()> {
let content = tokio_fs::read_to_string(input_path).await
@@ -506,9 +515,11 @@ async fn process_lei_figi_file_batched(
let batch_size = 100;
let mut processed_count = 0;
let mut common_batch = Vec::new();
let mut warrants_batch = Vec::new();
let mut options_batch = Vec::new();
let mut common_batch: Vec<CompanyInfo> = Vec::new();
let mut warrants_batch: Vec<WarrantInfo> = Vec::new();
let mut options_batch: Vec<OptionInfo> = Vec::new();
let mut corporate_bonds_batch: Vec<CorporateBondInfo> = Vec::new();
let mut government_bonds_batch: Vec<GovernmentBondInfo> = Vec::new();
for (line_num, line) in content.lines().enumerate() {
if line.trim().is_empty() {
@@ -526,7 +537,7 @@ async fn process_lei_figi_file_batched(
}
// Group by security type
let (common_stocks, warrant_securities, option_securities) =
let (common_stocks, warrant_securities, option_securities, corporate_bonds_securities, government_bonds_securities) =
group_by_security_type(&figis);
// Collect entries for batching and update existing keys
@@ -555,6 +566,24 @@ async fn process_lei_figi_file_batched(
options_batch.push(entry);
}
}
if !corporate_bonds_securities.is_empty() {
for entry in prepare_corporate_bond_entries(&corporate_bonds_securities, existing_corporate_bonds) {
// Use underlying_company_name as the key (not issuer_company_name)
let key = entry.underlying_company_name.clone();
existing_corporate_bonds.insert(key);
corporate_bonds_batch.push(entry);
}
}
if !government_bonds_securities.is_empty() {
for entry in prepare_government_bond_entries(&government_bonds_securities, existing_government_bonds) {
// Use issuer_name as the key (not issuer_country_name)
let key = entry.issuer_name.clone();
existing_government_bonds.insert(key);
government_bonds_batch.push(entry);
}
}
// Write batches when they reach size limit
if common_batch.len() >= batch_size {
@@ -574,6 +603,18 @@ async fn process_lei_figi_file_batched(
stats.options_added += options_batch.len();
options_batch.clear();
}
if corporate_bonds_batch.len() >= batch_size {
write_batch_with_fsync(corporate_bonds_log_path, &corporate_bonds_batch).await?;
stats.corporate_bonds_added += corporate_bonds_batch.len();
corporate_bonds_batch.clear();
}
if government_bonds_batch.len() >= batch_size {
write_batch_with_fsync(government_bonds_log_path, &government_bonds_batch).await?;
stats.government_bonds_added += government_bonds_batch.len();
government_bonds_batch.clear();
}
processed_count += 1;
if processed_count % 1000 == 0 {
@@ -596,6 +637,16 @@ async fn process_lei_figi_file_batched(
write_batch_with_fsync(options_log_path, &options_batch).await?;
stats.options_added += options_batch.len();
}
if !corporate_bonds_batch.is_empty() {
write_batch_with_fsync(corporate_bonds_log_path, &corporate_bonds_batch).await?;
stats.corporate_bonds_added += corporate_bonds_batch.len();
}
if !government_bonds_batch.is_empty() {
write_batch_with_fsync(government_bonds_log_path, &government_bonds_batch).await?;
stats.government_bonds_added += government_bonds_batch.len();
}
Ok(())
}
@@ -719,11 +770,140 @@ fn prepare_option_entries(
entries
}
/// Prepares corporate bond entries for batching
///
/// Groups corporate bonds by issuer (underlying_company_name), extracting key bond details
/// like coupon rate, maturity date, and tenor from the ticker/description for each ISIN.
///
/// # Arguments
/// * `corporate_bond_securities` - List of FigiInfo objects for corporate bonds
/// * `existing_keys` - Set of already-processed keys (format: "company_name")
///
/// # Returns
/// Vector of CorporateBondInfo entries, one per unique issuer
fn prepare_corporate_bond_entries(
corporate_bond_securities: &[FigiInfo],
existing_keys: &HashSet<String>,
) -> Vec<CorporateBondInfo> {
let mut entries = Vec::new();
// Group bonds by issuer (company name)
let mut grouped: HashMap<String, Vec<FigiInfo>> = HashMap::new();
for figi in corporate_bond_securities {
let issuer = figi.name.clone();
if issuer.is_empty() {
continue;
}
grouped.entry(issuer).or_default().push(figi.clone());
}
// Create entries for each unique issuer
for (issuer, figis) in grouped {
// Check if this issuer already exists
if existing_keys.contains(&issuer) {
continue;
}
// Group by ISIN
let bonds_by_isin = group_figis_by_isin(&figis);
// Parse bond details for each ISIN
let mut bond_details_map: HashMap<String, BondDetails> = HashMap::new();
for (isin, isin_figis) in &bonds_by_isin {
if let Some(first_figi) = isin_figis.first() {
let details = parse_bond_details(&first_figi.ticker, &first_figi.security_description);
bond_details_map.insert(isin.clone(), details);
}
}
let bond_info = CorporateBondInfo {
underlying_company_name: issuer.clone(),
bonds: bonds_by_isin,
bond_details: bond_details_map,
};
entries.push(bond_info);
}
entries
}
/// Prepares government bond entries for batching
///
/// Groups government bonds by issuer (country/entity), extracting key bond
/// details like coupon rate, maturity date, and tenor from the ticker/description for each ISIN.
/// Also classifies the government issuer type (sovereign, municipal, agency, etc.)
///
/// # Arguments
/// * `government_bond_securities` - List of FigiInfo objects for government bonds
/// * `existing_keys` - Set of already-processed keys (format: "issuer_name")
///
/// # Returns
/// Vector of GovernmentBondInfo entries, one per unique issuer
fn prepare_government_bond_entries(
government_bond_securities: &[FigiInfo],
existing_keys: &HashSet<String>,
) -> Vec<GovernmentBondInfo> {
let mut entries = Vec::new();
// Group bonds by issuer (country/entity name)
let mut grouped: HashMap<String, Vec<FigiInfo>> = HashMap::new();
for figi in government_bond_securities {
let issuer = figi.name.clone();
if issuer.is_empty() {
continue;
}
grouped.entry(issuer).or_default().push(figi.clone());
}
// Create entries for each unique issuer
for (issuer, figis) in grouped {
// Check if this issuer already exists
if existing_keys.contains(&issuer) {
continue;
}
// Classify the government issuer type
let issuer_type = classify_government_issuer(&issuer);
// Group by ISIN
let bonds_by_isin = group_figis_by_isin(&figis);
// Parse bond details for each ISIN
let mut bond_details_map: HashMap<String, BondDetails> = HashMap::new();
for (isin, isin_figis) in &bonds_by_isin {
if let Some(first_figi) = isin_figis.first() {
let details = parse_bond_details(&first_figi.ticker, &first_figi.security_description);
bond_details_map.insert(isin.clone(), details);
}
}
let bond_info = GovernmentBondInfo {
issuer_name: issuer.clone(),
issuer_type,
bonds: bonds_by_isin,
bond_details: bond_details_map,
};
entries.push(bond_info);
}
entries
}
/// Groups FigiInfo list by security type
fn group_by_security_type(figis: &[FigiInfo]) -> (Vec<FigiInfo>, Vec<FigiInfo>, Vec<FigiInfo>) {
let mut common_stocks = Vec::new();
let mut warrants = Vec::new();
let mut options = Vec::new();
fn group_by_security_type(figis: &[FigiInfo]) -> (Vec<FigiInfo>, Vec<FigiInfo>, Vec<FigiInfo>, Vec<FigiInfo>, Vec<FigiInfo>) {
let mut common_stocks:Vec<FigiInfo> = Vec::new();
let mut warrants:Vec<FigiInfo> = Vec::new();
let mut options:Vec<FigiInfo> = Vec::new();
let mut corporate_bonds:Vec<FigiInfo> = Vec::new();
let mut government_bonds:Vec<FigiInfo> = Vec::new();
for figi in figis {
match figi.security_type.as_str() {
@@ -732,9 +912,14 @@ fn group_by_security_type(figis: &[FigiInfo]) -> (Vec<FigiInfo>, Vec<FigiInfo>,
"Equity Option" => options.push(figi.clone()),
_ => {}
}
match figi.security_type2.as_str() {
"Corp" => corporate_bonds.push(figi.clone()),
"Govt" => government_bonds.push(figi.clone()),
_ => {}
}
}
(common_stocks, warrants, options)
(common_stocks, warrants, options, corporate_bonds, government_bonds)
}
/// Groups FigiInfo by ISIN
@@ -1193,7 +1378,6 @@ pub async fn stream_gleif_csv_and_build_figi_filtered(
setup_sector_directories(&date_dir, &sector_dirs).await?;
let mut lei_batch: HashMap<String, Vec<String>> = HashMap::new();
let mut line_count = 0;
let mut processed_leis = 0;
let mut skipped_leis = 0;
@@ -1219,7 +1403,6 @@ pub async fn stream_gleif_csv_and_build_figi_filtered(
}
lei_batch.entry(lei).or_default().push(isin);
line_count += 1;
// Process batch when full
if lei_batch.len() >= LEI_BATCH_SIZE {