added mapping figi info onto common shares / warrants / options

This commit is contained in:
2025-12-04 21:03:55 +01:00
parent 787a08d6f1
commit b0a471ea84
13 changed files with 417 additions and 592 deletions

View File

@@ -209,7 +209,7 @@ impl OpenFigiClient {
/// Builds a LEI-to-FigiInfo map from the LEI-ISIN mapping, filtering for equities via OpenFIGI.
///
/// Attempts to load existing entries from "data/companies_by_lei/lei_to_figi.jsonl" (JSON Lines format,
/// Attempts to load existing entries from "data/corporate/by_lei/lei_to_figi.jsonl" (JSON Lines format,
/// one LEI entry per line: {"lei": "ABC", "figis": [FigiInfo...]}). For any missing LEIs (compared to
/// `lei_to_isins`), fetches their FigiInfos and appends to the .jsonl file incrementally.
///
@@ -231,7 +231,7 @@ impl OpenFigiClient {
/// Returns an error if file I/O fails, JSON serialization/deserialization fails,
/// or if OpenFIGI queries fail during fetching.
pub async fn build_lei_to_figi_infos(lei_to_isins: &HashMap<String, Vec<String>>) -> anyhow::Result<HashMap<String, Vec<FigiInfo>>> {
let data_dir = Path::new("data/companies_by_lei");
let data_dir = Path::new("data/corporate/by_lei");
tokio_fs::create_dir_all(data_dir).await.context("Failed to create data directory")?;
let path = data_dir.join("lei_to_figi.jsonl");
@@ -358,127 +358,305 @@ fn append_lei_to_figi_jsonl(path: &Path, lei: &str, figis: &[FigiInfo]) -> anyho
Ok(())
}
/// Loads or builds a HashMap of CompanyInfo objects indexed by company name.
/// Loads or builds HashMaps for companies, warrants, and options.
///
/// This function:
/// 1. Attempts to load existing companies from cache
/// 2. If cache exists, updates/extends it with new data from figi_to_lei
/// 3. If no cache exists, creates a new HashMap from scratch
/// 4. Saves the result back to cache
///
/// For existing entries (matched by name):
/// - Merges securities lists (deduplicates by FIGI)
/// - Updates primary_isin if the existing one is empty or not in the securities list
///
/// For new entries:
/// - Adds them to the HashMap
///
/// Companies with no FigiInfo data are skipped.
/// The resulting HashMap is saved to `data/companies_by_name/companies.json`.
/// 1. Attempts to load existing data from cache
/// 2. Processes new FIGI data and classifies by securityType:
/// - "Common Stock" → companies HashMap (grouped by ISIN)
/// - "Equity WRT" → warrants HashMap (parsed from name)
/// - "Equity Option" → options HashMap (parsed from name)
/// 3. Updates/extends existing entries
/// 4. Saves results to separate JSON files
///
/// # Arguments
/// * `figi_to_lei` - HashMap mapping LEI to Vec<FigiInfo>.
///
/// # Returns
/// A HashMap mapping company name to CompanyInfo.
/// A tuple of (companies, warrants, options) HashMaps.
///
/// # Errors
/// Returns an error if file I/O fails or JSON serialization fails.
pub async fn load_or_build_companies_by_name(
pub async fn load_or_build_all_securities(
figi_to_lei: &HashMap<String, Vec<FigiInfo>>
) -> anyhow::Result<HashMap<String, CompanyInfo>> {
// Try to load existing cache
let mut companies_by_name = match load_companies_by_name_internal().await? {
Some(existing) => {
println!("Loaded {} existing companies from cache", existing.len());
existing
},
None => {
println!("No existing cache found, creating new companies HashMap");
HashMap::new()
}
};
) -> anyhow::Result<(
HashMap<String, CompanyInfo>,
HashMap<String, HashMap<String, WarrantInfo>>,
HashMap<String, HashMap<String, OptionInfo>>
)> {
// Load existing data
let mut companies = load_from_cache("data/corporate/by_name/common_stocks.json").await?
.unwrap_or_else(HashMap::new);
let mut warrants = load_from_cache("data/corporate/by_name/warrants.json").await?
.unwrap_or_else(HashMap::new);
let mut options = load_from_cache("data/corporate/by_name/options.json").await?
.unwrap_or_else(HashMap::new);
let initial_count = companies_by_name.len();
let mut added_count = 0;
let mut updated_count = 0;
println!("Loaded existing data:");
println!(" - Companies: {}", companies.len());
println!(" - Warrants: {}", warrants.len());
println!(" - Options: {}", options.len());
let mut stats = ProcessingStats::new(companies.len(), warrants.len(), options.len());
println!("Processing {} LEI entries from FIGI data...", figi_to_lei.len());
for (lei, figi_infos) in figi_to_lei.iter() {
// Skip entries with no FigiInfo data
if figi_infos.is_empty() {
continue;
}
// Get company name from first FigiInfo entry
let name = figi_infos[0].name.clone();
if name.is_empty() {
continue;
// Group FigiInfos by security type
let mut common_stocks = Vec::new();
let mut warrant_securities = Vec::new();
let mut option_securities = Vec::new();
for figi_info in figi_infos {
match figi_info.securityType.as_str() {
"Common Stock" => common_stocks.push(figi_info.clone()),
"Equity WRT" => warrant_securities.push(figi_info.clone()),
"Equity Option" => option_securities.push(figi_info.clone()),
_ => {} // Ignore other types
}
}
// Check if company already exists
if let Some(existing_company) = companies_by_name.get_mut(&name) {
// Update existing entry
let merged_securities = merge_securities(&existing_company.securities, figi_infos);
let securities_added = merged_securities.len() - existing_company.securities.len();
if securities_added > 0 {
existing_company.securities = merged_securities;
// Update primary_isin if needed
if existing_company.primary_isin.is_empty() ||
!existing_company.securities.iter().any(|s| s.isin == existing_company.primary_isin) {
existing_company.primary_isin = existing_company.securities[0].isin.clone();
}
updated_count += 1;
}
} else {
// Add new entry
let primary_isin = figi_infos[0].isin.clone();
let securities = figi_infos.clone();
let company_info = CompanyInfo {
name: name.clone(),
primary_isin,
securities,
};
companies_by_name.insert(name, company_info);
added_count += 1;
// Process common stocks -> companies
if !common_stocks.is_empty() {
process_common_stocks(&mut companies, &common_stocks, &mut stats);
}
// Process warrants
if !warrant_securities.is_empty() {
process_warrants(&mut warrants, &warrant_securities, &mut stats);
}
// Process options
if !option_securities.is_empty() {
process_options(&mut options, &option_securities, &mut stats);
}
}
println!(" Companies statistics:");
println!(" - Initial: {}", initial_count);
println!(" - Added: {}", added_count);
println!(" - Updated: {}", updated_count);
println!(" - Total: {}", companies_by_name.len());
stats.print_summary(companies.len(), warrants.len(), options.len());
// Save to JSON
save_companies_by_name(&companies_by_name).await?;
// Save all three HashMaps
save_to_cache("data/corporate/by_name/common_stocks.json", &companies).await?;
save_to_cache("data/corporate/by_name/warrants.json", &warrants).await?;
save_to_cache("data/corporate/by_name/options.json", &options).await?;
Ok(companies_by_name)
Ok((companies, warrants, options))
}
/// Merges two lists of FigiInfo, deduplicating by FIGI.
///
/// # Arguments
/// * `existing` - Existing securities list
/// * `new_securities` - New securities to merge
///
/// # Returns
/// Merged and deduplicated list of FigiInfo
fn merge_securities(existing: &[FigiInfo], new_securities: &[FigiInfo]) -> Vec<FigiInfo> {
/// Statistics tracker for processing
#[derive(Debug)]
struct ProcessingStats {
    initial_companies: usize,
    initial_warrants: usize,
    initial_options: usize,
    companies_added: usize,
    companies_updated: usize,
    warrants_added: usize,
    warrants_updated: usize,
    options_added: usize,
    options_updated: usize,
}

impl ProcessingStats {
    /// Creates a tracker seeded with the entry counts loaded from cache;
    /// all added/updated counters start at zero.
    fn new(companies: usize, warrants: usize, options: usize) -> Self {
        ProcessingStats {
            initial_companies: companies,
            initial_warrants: warrants,
            initial_options: options,
            companies_added: 0,
            companies_updated: 0,
            warrants_added: 0,
            warrants_updated: 0,
            options_added: 0,
            options_updated: 0,
        }
    }

    /// Prints one section per security category (initial / added / updated / total).
    fn print_summary(&self, final_companies: usize, final_warrants: usize, final_options: usize) {
        // Local helper so the three sections share one formatting definition.
        fn section(label: &str, initial: usize, added: usize, updated: usize, total: usize) {
            println!("{}:", label);
            println!(" - Initial: {}", initial);
            println!(" - Added: {}", added);
            println!(" - Updated: {}", updated);
            println!(" - Total: {}", total);
        }
        println!("\n=== Processing Statistics ===");
        section("Companies", self.initial_companies, self.companies_added, self.companies_updated, final_companies);
        section("Warrants", self.initial_warrants, self.warrants_added, self.warrants_updated, final_warrants);
        section("Options", self.initial_options, self.options_added, self.options_updated, final_options);
    }
}
/// Process common stocks into companies HashMap.
///
/// The company name (taken from the first FigiInfo) is the key; securities are
/// stored grouped by ISIN. Existing companies are merged (dedup by FIGI via
/// `merge_figi_list`); unknown companies are inserted fresh.
fn process_common_stocks(
    companies: &mut HashMap<String, CompanyInfo>,
    figi_infos: &[FigiInfo],
    stats: &mut ProcessingStats,
) {
    let company_name = figi_infos[0].name.clone();
    if company_name.is_empty() {
        return;
    }

    // Group incoming securities by ISIN before merging.
    let incoming_groups = group_by_isin(figi_infos);

    match companies.get_mut(&company_name) {
        Some(company) => {
            // Merge each incoming ISIN group into the stored company.
            let mut changed = false;
            for (isin, incoming) in incoming_groups {
                match company.securities.get_mut(&isin) {
                    Some(current) => {
                        let merged = merge_figi_list(current, &incoming);
                        if merged.len() > current.len() {
                            *current = merged;
                            changed = true;
                        }
                    }
                    None => {
                        company.securities.insert(isin, incoming);
                        changed = true;
                    }
                }
            }

            // Repair the primary ISIN if it is empty or no longer present.
            let primary_invalid = company.primary_isin.is_empty()
                || !company.securities.contains_key(&company.primary_isin);
            if primary_invalid {
                if let Some(isin) = company.securities.keys().next() {
                    company.primary_isin = isin.clone();
                }
            }

            if changed {
                stats.companies_updated += 1;
            }
        }
        None => {
            // Brand-new company: first ISIN in the grouping becomes primary.
            let primary_isin = incoming_groups.keys().next().cloned().unwrap_or_default();
            companies.insert(company_name.clone(), CompanyInfo {
                name: company_name,
                primary_isin,
                securities: incoming_groups,
            });
            stats.companies_added += 1;
        }
    }
}
/// Process warrants into warrants HashMap.
///
/// Warrants are keyed by underlying company name, then by warrant type
/// ("put" / "call" / "unknown" as produced by `parse_warrant_name`), and the
/// FigiInfos inside each entry are grouped by ISIN and deduplicated by FIGI so
/// that re-processing cached data does not accumulate duplicates.
///
/// Stats: creating a new (underlying, type) entry counts as added; appending a
/// previously unseen FIGI to an existing entry counts as updated.
fn process_warrants(
    warrants: &mut HashMap<String, HashMap<String, WarrantInfo>>,
    warrant_securities: &[FigiInfo],
    stats: &mut ProcessingStats,
) {
    for figi in warrant_securities {
        // Extract underlying, issuer, and warrant type from the security name.
        let (underlying, issuer, warrant_type) = parse_warrant_name(&figi.name);
        if underlying.is_empty() {
            continue;
        }

        // Outer map: key by underlying company name.
        let underlying_map = warrants.entry(underlying.clone()).or_default();

        let is_new_entry = !underlying_map.contains_key(&warrant_type);
        // Inner map: key by warrant type; build the WarrantInfo lazily.
        let entry = underlying_map
            .entry(warrant_type.clone())
            .or_insert_with(|| WarrantInfo {
                underlying_company_name: underlying.clone(),
                issuer_company_name: issuer,
                warrant_type: warrant_type.clone(),
                warrants: HashMap::new(),
            });
        if is_new_entry {
            stats.warrants_added += 1;
        }

        // Group by ISIN; skip FIGIs we already hold to keep merges idempotent.
        let group = entry.warrants.entry(figi.isin.clone()).or_default();
        if group.iter().all(|f| f.figi != figi.figi) {
            group.push(figi.clone());
            if !is_new_entry {
                stats.warrants_updated += 1;
            }
        }
    }
}
/// Process options into options HashMap.
///
/// Options are keyed by underlying company name, then by option type
/// ("put" / "call" / "unknown" as produced by `parse_option_name`), and the
/// FigiInfos inside each entry are grouped by ISIN and deduplicated by FIGI so
/// that re-processing cached data does not accumulate duplicates.
///
/// Stats: creating a new (underlying, type) entry counts as added; appending a
/// previously unseen FIGI to an existing entry counts as updated.
fn process_options(
    options: &mut HashMap<String, HashMap<String, OptionInfo>>,
    option_securities: &[FigiInfo],
    stats: &mut ProcessingStats,
) {
    for figi in option_securities {
        // Extract underlying, issuer, and option type from the security name.
        let (underlying, issuer, option_type) = parse_option_name(&figi.name);
        if underlying.is_empty() {
            continue;
        }

        // Outer map: key by underlying company name.
        let underlying_map = options.entry(underlying.clone()).or_default();

        let is_new_entry = !underlying_map.contains_key(&option_type);
        // Inner map: key by option type; build the OptionInfo lazily.
        let entry = underlying_map
            .entry(option_type.clone())
            .or_insert_with(|| OptionInfo {
                underlying_company_name: underlying.clone(),
                issuer_company_name: issuer,
                option_type: option_type.clone(),
                options: HashMap::new(),
            });
        if is_new_entry {
            stats.options_added += 1;
        }

        // Group by ISIN; skip FIGIs we already hold to keep merges idempotent.
        let group = entry.options.entry(figi.isin.clone()).or_default();
        if group.iter().all(|f| f.figi != figi.figi) {
            group.push(figi.clone());
            if !is_new_entry {
                stats.options_updated += 1;
            }
        }
    }
}
/// Groups a FigiInfo list by ISIN, sorting each group by FIGI so that the
/// resulting map is deterministic regardless of input order.
fn group_by_isin(figi_infos: &[FigiInfo]) -> HashMap<String, Vec<FigiInfo>> {
    let mut by_isin: HashMap<String, Vec<FigiInfo>> = HashMap::new();
    for info in figi_infos.iter().cloned() {
        by_isin.entry(info.isin.clone()).or_default().push(info);
    }
    // Stable ordering within each ISIN bucket.
    for group in by_isin.values_mut() {
        group.sort_by(|lhs, rhs| lhs.figi.cmp(&rhs.figi));
    }
    by_isin
}
/// Merges two FigiInfo lists, deduplicating by FIGI
fn merge_figi_list(existing: &[FigiInfo], new_figis: &[FigiInfo]) -> Vec<FigiInfo> {
let mut merged = existing.to_vec();
let existing_figis: HashSet<String> = existing.iter()
.map(|f| f.figi.clone())
.collect();
for new_sec in new_securities {
if !existing_figis.contains(&new_sec.figi) {
merged.push(new_sec.clone());
for new_figi in new_figis {
if !existing_figis.contains(&new_figi.figi) {
merged.push(new_figi.clone());
}
}
@@ -488,49 +666,126 @@ fn merge_securities(existing: &[FigiInfo], new_securities: &[FigiInfo]) -> Vec<F
merged
}
/// Parse warrant name to extract underlying company, issuer, and warrant type.
///
/// Warrant names encode the issuer and type before the expiry code, e.g.:
/// - "VONTOBE-PW26 LEONARDO SPA" -> ("LEONARDO SPA", Some("VONTOBE"), "put")
/// - "BAYER H-CW25 L'OREAL"      -> ("L'OREAL", Some("BAYER H"), "call")
/// - "APPLE INC WARRANT"         -> ("APPLE INC WARRANT", None, "unknown")
///   (no recognizable code: the entire name is returned as the underlying)
fn parse_warrant_name(name: &str) -> (String, Option<String>, String) {
    // ASCII uppercasing keeps byte offsets identical to `name`, so indices
    // found in `upper` can safely slice the original string. This also makes
    // the split below case-insensitive, matching the type detection.
    let upper = name.to_ascii_uppercase();

    // Detect warrant type from the code (PW = put, CW = call).
    let warrant_type = if upper.contains("-PW") || upper.contains(" PW") {
        "put"
    } else if upper.contains("-CW") || upper.contains(" CW") {
        "call"
    } else {
        "unknown"
    }
    .to_string();

    // Split on the warrant code ("-PW26", "-CW25", ...): issuer before the
    // code, underlying after the first space that follows it.
    for code in ["-PW", "-CW"] {
        if let Some(pos) = upper.find(code) {
            let issuer = name[..pos].trim();
            let after_idx = name[pos..].find(' ').map(|i| pos + i + 1).unwrap_or(name.len());
            let underlying = if after_idx < name.len() {
                name[after_idx..].trim()
            } else {
                ""
            };
            return (
                underlying.to_string(),
                (!issuer.is_empty()).then(|| issuer.to_string()),
                warrant_type,
            );
        }
    }

    // Fallback: no code found — treat the entire name as the underlying.
    (name.to_string(), None, warrant_type)
}
/// Parse option name to extract underlying company, issuer, and option type.
///
/// Examples:
/// - "December 25 Calls on ALPHA GA" -> ("ALPHA GA", None, "call")
/// - "January 26 Puts on TESLA INC"  -> ("TESLA INC", None, "put")
///
/// The issuer is not encoded in option names, so it is always `None`.
fn parse_option_name(name: &str) -> (String, Option<String>, String) {
    // ASCII uppercasing keeps byte offsets identical to `name`; full
    // `to_uppercase()` can change the byte length for non-ASCII characters,
    // which would make the " ON " index below invalid for slicing `name`.
    let upper = name.to_ascii_uppercase();

    // Detect option type.
    let option_type = if upper.contains("CALL") {
        "call"
    } else if upper.contains("PUT") {
        "put"
    } else {
        "unknown"
    }
    .to_string();

    // Underlying follows the first " on " (case-insensitive).
    if let Some(pos) = upper.find(" ON ") {
        let underlying = name[pos + 4..].trim().to_string();
        return (underlying, None, option_type);
    }

    // Fallback: return entire name as the underlying.
    (name.to_string(), None, option_type)
}
/// Generic function to load from cache
async fn load_from_cache<T>(path: &str) -> anyhow::Result<Option<T>>
where
T: serde::de::DeserializeOwned,
{
let cache_file = Path::new(path);
if !cache_file.exists() {
return Ok(None);
}
let content = tokio_fs::read_to_string(cache_file).await
.context("Failed to read companies.json")?;
.context(format!("Failed to read {}", path))?;
let companies: HashMap<String, CompanyInfo> = serde_json::from_str(&content)
.context("Failed to parse companies.json")?;
let data: T = serde_json::from_str(&content)
.context(format!("Failed to parse {}", path))?;
Ok(Some(companies))
Ok(Some(data))
}
/// Saves the companies HashMap to cache.
///
/// # Arguments
/// * `companies` - The companies HashMap to save
///
/// # Errors
/// Returns an error if file I/O fails or JSON serialization fails.
async fn save_companies_by_name(companies: &HashMap<String, CompanyInfo>) -> anyhow::Result<()> {
let cache_dir = Path::new("data/companies_by_name");
/// Generic function to save to cache
async fn save_to_cache<T>(path: &str, data: &T) -> anyhow::Result<()>
where
T: serde::Serialize,
{
let cache_path = Path::new(path);
let cache_dir = cache_path.parent().context("Invalid cache path")?;
tokio_fs::create_dir_all(cache_dir).await
.context("Failed to create data/companies_by_name directory")?;
.context(format!("Failed to create directory for {}", path))?;
let cache_file = cache_dir.join("companies.json");
let json_str = serde_json::to_string_pretty(&companies)
.context("Failed to serialize companies to JSON")?;
let json_str = serde_json::to_string_pretty(data)
.context("Failed to serialize data")?;
tokio_fs::write(&cache_file, json_str).await
.context("Failed to write companies.json")?;
tokio_fs::write(cache_path, json_str).await
.context(format!("Failed to write {}", path))?;
println!(" ✓ Saved {} companies to {}", companies.len(), cache_file.display());
println!(" ✓ Saved to {}", path);
Ok(())
}

View File

@@ -670,8 +670,8 @@ pub async fn _fetch_latest_gleif_isin_lei_mapping_url(client: &Client) -> anyhow
pub async fn download_isin_lei_csv() -> anyhow::Result<Option<String>> {
let url = "https://mapping.gleif.org/api/v2/isin-lei/9315e3e3-305a-4e71-b062-46714740fa8d/download";
let zip_path = "data/isin_lei.zip";
let csv_path = "data/isin_lei.csv";
let zip_path = "data/gleif/isin_lei.zip";
let csv_path = "data/gleif/isin_lei.csv";
if let Err(e) = std::fs::create_dir_all("data") {
println!("Failed to create data directory: {e}");

View File

@@ -1,3 +1,5 @@
use std::collections::HashMap;
// src/corporate/types.rs
use serde::{Deserialize, Serialize};
@@ -75,11 +77,42 @@ pub struct CompanyMetadata {
/// # Attributes
/// * Name as primary key (for one institution) -> might have to be changed when the first FigiInfo comes in
/// * ISIN as the most liquid / preferred traded security (used for fallback)
/// * securities: Grouped by ISIN, filtered for Common Stock only
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompanyInfo{
pub name: String,
pub primary_isin: String,
pub securities: Vec<FigiInfo>,
pub securities: HashMap<String, Vec<FigiInfo>>, // ISIN -> Vec<FigiInfo>
}
/// Warrant Info
///
/// Information for warrant securities, derived by parsing the `name` field of a FigiInfo.
/// Example 1: "VONTOBE-PW26 LEONARDO SPA" —
///   put warrant issued by VONTOBEL on underlying company LEONARDO SPA.
/// Example 2: "BAYER H-CW25 L'OREAL" —
///   call warrant issued by BAYER on underlying company L'OREAL.
/// When the name carries only one company (issuer and underlying are the same),
/// `issuer_company_name` is left as `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WarrantInfo {
    pub underlying_company_name: String, // key in CompanyInfo, key for WarrantInfo
    pub issuer_company_name: Option<String>, // key in CompanyInfo
    pub warrant_type: String, // "put" or "call" ("unknown" when not parseable)
    pub warrants: HashMap<String, Vec<FigiInfo>>, // ISIN -> Vec<FigiInfo> (grouped by ISIN)
}
/// Option Info
///
/// Information for option securities, derived by parsing the `name` field of a FigiInfo.
/// Example: "December 25 Calls on ALPHA GA" —
///   call option (no issuer encoded) on underlying company ALPHA GA.
/// When the name carries only one company (issuer and underlying are the same),
/// `issuer_company_name` is left as `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptionInfo {
    pub underlying_company_name: String, // key in CompanyInfo, key for OptionInfo
    pub issuer_company_name: Option<String>, // key in CompanyInfo
    pub option_type: String, // "put" or "call" ("unknown" when not parseable)
    pub options: HashMap<String, Vec<FigiInfo>>, // ISIN -> Vec<FigiInfo> (grouped by ISIN)
}
#[derive(Debug, Clone, Serialize, Deserialize)]

View File

@@ -4,9 +4,8 @@ use crate::config::Config;
use crate::scraper::webdriver::ChromeDriverPool;
use chrono::Local;
use std::collections::{HashMap, HashSet};
use std::collections::{HashMap};
use std::sync::Arc;
use futures::{stream::{self, StreamExt}};
/// Main function: Full update for all companies (LEI-based) with optimized parallel execution.
///
@@ -54,8 +53,8 @@ pub async fn run_full_update(config: &Config, pool: &Arc<ChromeDriverPool>) -> a
};
// 4. Load or build companies
let mut companies = load_or_build_companies_by_name(&figi_to_lei).await?;
println!("Processing {} companies", companies.len());
let mut companies = load_or_build_all_securities(&figi_to_lei).await?;
println!("Processing {} companies", companies.0.len());
// 5. Load existing earnings events (for change detection)
let today = Local::now().format("%Y-%m-%d").to_string();
@@ -96,35 +95,6 @@ pub async fn run_full_update(config: &Config, pool: &Arc<ChromeDriverPool>) -> a
Ok(())
}
/// Assigns a LEI to each company by matching its FIGI-derived ISINs against the
/// LEI -> ISINs mapping. The first LEI whose ISIN list contains any of the
/// company's ISINs wins; companies with no match keep their existing `lei`.
///
/// NOTE(review): the inner scan iterates the whole `lei_to_isins` map for every
/// ISIN (O(companies * isins * leis)); an inverted ISIN -> LEI index would make
/// this a constant-time lookup per ISIN — confirm before relying on this for
/// large mappings.
async fn assign_leis_from_figi(
    companies: &mut [CompanyMetadata],
    lei_to_isins: &HashMap<String, Vec<String>>
) -> anyhow::Result<()> {
    for company in companies {
        // Collect the company's ISINs, deduplicated via a HashSet round-trip.
        let figi_infos = company.figi.as_ref().map_or(&[][..], |v| &v[..]);
        let isins: Vec<String> = figi_infos
            .iter()
            .map(|f| f.isin.clone())
            .collect::<HashSet<_>>()
            .into_iter()
            .collect();
        // Try to find LEI by any known ISIN
        for isin in &isins {
            for (lei, isins) in lei_to_isins.iter() {
                if isins.contains(isin) {
                    company.lei = lei.clone();
                    let name = figi_infos.first().map(|f| f.name.as_str()).unwrap_or("Unknown");
                    println!("Found real LEI {} for {}", lei, name);
                    break;
                }
            }
            // Stop at the first ISIN that yielded a LEI.
            if !company.lei.is_empty() { break; }
        }
    }
    Ok(())
}
/// Result of a company processing run: the event changes that were detected.
pub struct ProcessResult {
    pub changes: Vec<CompanyEventChange>,
}