From 0f89c8c0ce0ad8a9b0eecb9c096414a61b93b2ee Mon Sep 17 00:00:00 2001 From: donpat1to Date: Sun, 7 Dec 2025 14:49:25 +0100 Subject: [PATCH] updated cache saving --- cache/openfigi/INFO.md | 15 ++ src/corporate/openfigi.rs | 321 +++++++++++++++++++++++++++++--------- src/corporate/storage.rs | 2 +- 3 files changed, 260 insertions(+), 78 deletions(-) create mode 100644 cache/openfigi/INFO.md diff --git a/cache/openfigi/INFO.md b/cache/openfigi/INFO.md new file mode 100644 index 0000000..02ff346 --- /dev/null +++ b/cache/openfigi/INFO.md @@ -0,0 +1,15 @@ +# Openfigi Data + +## Market Security Description +| Code | Meaning | +| ---------- | --------------------------------------------------------- | +| **Comdty** | Commodity (e.g., oil, gold futures, physical commodities) | +| **Corp** | Corporate bond / corporate debt security | +| **Curncy** | Currency or FX pair (e.g., EURUSD) | +| **Equity** | Stocks / shares | +| **Govt** | Government bond (Treasuries, Bunds, Gilts, etc.) | +| **Index** | Market indices (S&P 500, DAX, NYSE Composite…) | +| **M-Mkt** | Money market instruments (commercial paper, CDs, T-bills) | +| **Mtge** | Mortgage-backed securities (MBS) | +| **Muni** | Municipal bonds (US state/local government debt) | +| **Pfd** | Preferred shares | \ No newline at end of file diff --git a/src/corporate/openfigi.rs b/src/corporate/openfigi.rs index 69bd810..5119e22 100644 --- a/src/corporate/openfigi.rs +++ b/src/corporate/openfigi.rs @@ -6,9 +6,7 @@ use reqwest::Client as HttpClient; use reqwest::header::{HeaderMap, HeaderValue}; use serde_json::{json, Value}; use std::collections::{HashMap, HashSet}; -use std::fs::File; -use std::io::Write; -use std::path::{Path, PathBuf}; +use std::path::{Path}; use std::time::Instant; use tokio::time::{sleep, Duration}; use tokio::fs as tokio_fs; @@ -110,47 +108,76 @@ impl OpenFigiClient { })) .collect(); - let resp = self.client - .post("https://api.openfigi.com/v3/mapping") - .header("Content-Type", "application/json") 
- .json(&jobs) - .send() - .await - .context("Failed to send mapping request")?; + // Retry logic with exponential backoff for transient failures + let mut retry_count = 0; + let max_retries = 5; + let mut backoff_ms = 1000u64; + + loop { + let resp_result = self.client + .post("https://api.openfigi.com/v3/mapping") + .header("Content-Type", "application/json") + .json(&jobs) + .send() + .await; - let status = resp.status(); - let headers = resp.headers().clone(); - let body = resp.text().await.context("Failed to read response body")?; + let resp = match resp_result { + Ok(r) => r, + Err(e) => { + retry_count += 1; + if retry_count >= max_retries { + return Err(anyhow!("Failed to send mapping request after {} retries: {}", max_retries, e)); + } + eprintln!("Transient error sending mapping request (attempt {}/{}): {}", retry_count, max_retries, e); + println!(" Retrying in {}ms...", backoff_ms); + sleep(Duration::from_millis(backoff_ms)).await; + backoff_ms = (backoff_ms * 2).min(60000); // Cap at 60s + continue; + } + }; - if status.is_client_error() || status.is_server_error() { - if status == 429 { - let reset_sec = headers - .get("ratelimit-reset") - .and_then(|v| v.to_str().ok()) - .and_then(|s| s.parse::<u64>().ok()) - .unwrap_or(10); - println!("Rate limited—backing off {}s", reset_sec); - sleep(Duration::from_secs(reset_sec.max(10))).await; - continue; // Retry the same chunk - } else if status == 401 { - return Err(anyhow!("Invalid OpenFIGI API key: {}", body)); - } else if status == 413 { - return Err(anyhow!("Payload too large—reduce chunk size: {}", body)); + let status = resp.status(); + let headers = resp.headers().clone(); + let body = resp.text().await.context("Failed to read response body")?; + + if status.is_client_error() || status.is_server_error() { + if status == 429 { + let reset_sec = headers + .get("ratelimit-reset") + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::<u64>().ok()) + .unwrap_or(10); + println!("Rate limited—backing off {}s", 
reset_sec); + sleep(Duration::from_secs(reset_sec.max(10))).await; + continue; // Retry the same chunk + } else if status == 401 { + return Err(anyhow!("Invalid OpenFIGI API key: {}", body)); + } else if status == 413 { + return Err(anyhow!("Payload too large—reduce chunk size: {}", body)); + } else if status.is_server_error() { + // Transient server error, retry with backoff + retry_count += 1; + if retry_count >= max_retries { + return Err(anyhow!("OpenFIGI server error {} after {} retries: {}", status, max_retries, body)); + } + eprintln!("Server error {} (attempt {}/{}), retrying in {}ms...", status, retry_count, max_retries, backoff_ms); + sleep(Duration::from_millis(backoff_ms)).await; + backoff_ms = (backoff_ms * 2).min(60000); + continue; + } + return Err(anyhow!("OpenFIGI error {}: {}", status, body)); } - let results: Vec<Value> = serde_json::from_str(&body) - .context("Failed to parse response JSON")?; + let results: Vec<Value> = serde_json::from_str(&body) + .context("Failed to parse response JSON")?; - for (isin, result) in chunk.iter().zip(results) { - if let Some(data) = result["data"].as_array() { - for item in data { - let sec_type = item["securityType"].as_str().unwrap_or(""); - let market_sec = item["marketSector"].as_str().unwrap_or(""); - if market_sec == "Equity" && - (sec_type.contains("Stock") || sec_type.contains("Share") || sec_type.contains("Equity") || - sec_type.contains("Common") || sec_type.contains("Preferred") || sec_type == "ADR" || sec_type == "GDR") { + for (isin, result) in chunk.iter().zip(results) { + if let Some(data) = result["data"].as_array() { + for item in data { + let sec_type = item["securityType"].as_str().unwrap_or(""); + let market_sec = item["marketSector"].as_str().unwrap_or(""); + + // Capture all security types, let caller filter by market sector if needed let figi = match item["figi"].as_str() { Some(f) => f.to_string(), None => continue, }; @@ -161,7 +188,7 @@ 
impl OpenFigiClient { figi, name: item["name"].as_str().unwrap_or("").to_string(), ticker: item["ticker"].as_str().unwrap_or("").to_string(), - exch_code: item["micCode"].as_str().unwrap_or("").to_string(), + exch_code: item["exchCode"].as_str().unwrap_or("").to_string(), compositeFIGI: item["compositeFIGI"].as_str().unwrap_or("").to_string(), securityType: sec_type.to_string(), marketSector: market_sec.to_string(), @@ -174,6 +201,9 @@ impl OpenFigiClient { } } } + + // Successfully processed this chunk, break out of retry loop + break; } req_count += 1; @@ -218,6 +248,54 @@ fn extract_gleif_date_from_filename(filename: &str) -> String { filename.to_string() } +/// Loads the list of market sectors from cache/openfigi/marketSecDes.json +/// +/// # Returns +/// +/// Vec of market sector strings (e.g., ["Comdty", "Corp", "Curncy", "Equity", ...]) +/// If the file doesn't exist or can't be parsed, returns a sensible default list. +async fn load_market_sectors() -> anyhow::Result<Vec<String>> { + let dir = DataPaths::new(".")?; + let cache_file = dir.cache_openfigi_dir().join("marketSecDes.json"); + + if !cache_file.exists() { + // Return default if file doesn't exist + eprintln!("Warning: {} not found, using default sectors", cache_file.display()); + return Ok(vec![ + "Comdty".to_string(), + "Corp".to_string(), + "Curncy".to_string(), + "Equity".to_string(), + "Govt".to_string(), + "Index".to_string(), + "M-Mkt".to_string(), + "Mtge".to_string(), + "Muni".to_string(), + "Pfd".to_string(), + ]); + } + + let content = tokio_fs::read_to_string(&cache_file).await + .context("Failed to read marketSecDes.json")?; + + let json: Value = serde_json::from_str(&content) + .context("Failed to parse marketSecDes.json")?; + + let sectors: Vec<String> = json["values"] + .as_array() + .ok_or_else(|| anyhow!("'values' field not found in marketSecDes.json"))? 
 + .iter() + .filter_map(|v| v.as_str().map(|s| s.to_string())) + .collect(); + + if sectors.is_empty() { + return Err(anyhow!("No sectors found in marketSecDes.json")); + } + + println!("Loaded {} market sectors from cache", sectors.len()); + Ok(sectors) +} + /// Finds the most recent GLEIF CSV file in the cache/gleif directory. /// /// Returns the extracted date in format "DDMMYYYY" from the filename. @@ -254,20 +332,11 @@ async fn find_most_recent_gleif_date(gleif_cache_dir: &Path) -> anyhow::Result<String> pub async fn build_lei_to_figi_infos(lei_to_isins: &HashMap<String, Vec<String>>, gleif_date: Option<&str>) -> anyhow::Result<HashMap<String, Vec<FigiInfo>>> { + let mut retry_count = 0; + let max_retries = 3; + + loop { + match build_lei_to_figi_infos_internal(lei_to_isins, gleif_date).await { + Ok(map) => { + if !map.is_empty() { + println!("✓ LEI→FIGI mapping completed successfully with {} entries", map.len()); + } + return Ok(map); + } + Err(e) => { + let error_msg = e.to_string(); + + // Check if this is a fatal error or transient + let is_fatal = error_msg.contains("Invalid OpenFIGI API key") + || error_msg.contains("No GLEIF CSV file found") + || error_msg.contains("Failed to create"); + + if is_fatal { + eprintln!("Fatal error in LEI→FIGI mapping: {}", e); + return Err(e); + } + + retry_count += 1; + if retry_count >= max_retries { + eprintln!("LEI→FIGI mapping failed after {} retries: {}", max_retries, e); + return Err(e); + } + + let wait_secs = 60 * retry_count; + eprintln!("Transient error in LEI→FIGI mapping (attempt {}/{}): {}", retry_count, max_retries, e); + println!("Retrying mapping in {}s...", wait_secs); + sleep(Duration::from_secs(wait_secs as u64)).await; + } + } + } +} + +/// Internal implementation of LEI-to-FigiInfo mapping. +/// +/// This is the actual worker function that performs the mapping. It handles already-processed +/// LEIs gracefully but will fail on transient errors, which are caught and retried by the +/// wrapper function build_lei_to_figi_infos. 
+async fn build_lei_to_figi_infos_internal(lei_to_isins: &HashMap<String, Vec<String>>, gleif_date: Option<&str>) -> anyhow::Result<HashMap<String, Vec<FigiInfo>>> { let dir = DataPaths::new(".")?; let gleif_cache_dir = dir.cache_gleif_dir(); let map_cache_dir = dir.cache_gleif_openfigi_map_dir(); @@ -302,26 +416,46 @@ pub async fn build_lei_to_figi_infos(lei_to_isins: &HashMap<String, Vec<String>> let date_dir = map_cache_dir.join(&date); tokio_fs::create_dir_all(&date_dir).await.context("Failed to create date directory")?; - let path = date_dir.join("lei_to_figi.jsonl"); - println!("Using LEI→FIGI mapping at: {}", path.display()); - let mut lei_to_figis: HashMap<String, Vec<FigiInfo>> = load_lei_to_figi_jsonl(&path).await?; + // Load market sectors dynamically from cache + let sector_dirs = load_market_sectors().await?; + let mut sector_maps: HashMap<String, HashMap<String, Vec<FigiInfo>>> = HashMap::new(); + + for sector in &sector_dirs { + let sector_dir = date_dir.join(sector); + tokio_fs::create_dir_all(&sector_dir).await.context("Failed to create sector directory")?; + + // Load existing mappings for this sector + let path = sector_dir.join("lei_to_figi.jsonl"); + let lei_map = load_lei_to_figi_jsonl(&path).await?; + sector_maps.insert(sector.clone(), lei_map); + } let client = OpenFigiClient::new()?; if !client.has_key { - println!("No API key—using partial LEI→FIGI map with {} entries", lei_to_figis.len()); - return Ok(lei_to_figis); + let total_entries: usize = sector_maps.values().map(|m| m.len()).sum(); + println!("No API key—using partial LEI→FIGI maps with {} total entries", total_entries); + return Ok(sector_maps.get("Equity").cloned().unwrap_or_default()); } // Sort LEIs for deterministic processing order let mut leis: Vec<_> = lei_to_isins.keys().cloned().collect(); leis.sort(); - let mut processed = lei_to_figis.len(); + let mut processed = sector_maps.values().map(|m| m.len()).sum::<usize>(); let total = leis.len(); for lei in leis { - if lei_to_figis.contains_key(&lei) { - continue; // Skip already processed + // Check if LEI is already processed in any sector + let mut already_processed 
= false; + for sector_map in sector_maps.values() { + if sector_map.contains_key(&lei) { + already_processed = true; + break; + } + } + + if already_processed { + continue; } let isins = match lei_to_isins.get(&lei) { @@ -330,30 +464,63 @@ pub async fn build_lei_to_figi_infos(lei_to_isins: &HashMap<String, Vec<String>> }; let unique_isins: Vec<_> = isins.iter().cloned().collect::<HashSet<_>>().into_iter().collect(); - let equity_figi_infos = client.map_isins_to_figi_infos(&unique_isins).await?; + let all_figi_infos = client.map_isins_to_figi_infos(&unique_isins).await?; - let mut figis = equity_figi_infos; - if !figis.is_empty() { - figis.sort_by_key(|f| f.figi.clone()); - figis.dedup_by_key(|f| f.figi.clone()); + // Organize results by marketSector + let mut figis_by_sector: HashMap<String, Vec<FigiInfo>> = HashMap::new(); + + for figi_info in all_figi_infos { + let sector = figi_info.marketSector.clone(); + if sector.is_empty() { + continue; // Skip if no sector + } + + figis_by_sector.entry(sector).or_insert_with(Vec::new).push(figi_info); } - // Append to .jsonl incrementally - append_lei_to_figi_jsonl(&path, &lei, &figis).await.context("Failed to append to JSONL")?; - - // Insert into in-memory map - lei_to_figis.insert(lei.clone(), figis); + // Save to appropriate sector files + for (sector, mut figis) in figis_by_sector { + if !figis.is_empty() { + figis.sort_by_key(|f| f.figi.clone()); + figis.dedup_by_key(|f| f.figi.clone()); + + // Save to sector's JSONL file + let sector_dir = date_dir.join(&sector); + let path = sector_dir.join("lei_to_figi.jsonl"); + append_lei_to_figi_jsonl(&path, &lei, &figis).await.context("Failed to append to JSONL")?; + + // Update in-memory sector map + if let Some(sector_map) = sector_maps.get_mut(&sector) { + sector_map.insert(lei.clone(), figis); + } + } + } processed += 1; if processed % 100 == 0 { - println!("Processed {}/{} LEIs → {} total equity FIGIs", processed, total, lei_to_figis.values().map(|v| v.len()).sum::<usize>()); + let totals: Vec<String> = sector_dirs.iter().map(|s| { + let count = 
sector_maps.get(s).map(|m| m.len()).unwrap_or(0); + format!("{}:{}", s, count) + }).collect(); + println!("Processed {}/{} LEIs → [{}]", processed, total, totals.join(", ")); } tokio::time::sleep(Duration::from_millis(100)).await; } - - println!("Completed LEI→FIGI map: {} mappings (equity-only)", lei_to_figis.len()); - Ok(lei_to_figis) + + // Print final summary + println!("\n=== LEI→FIGI Mapping Complete ==="); + for sector in &sector_dirs { + if let Some(sector_map) = sector_maps.get(sector) { + let total_figis: usize = sector_map.values().map(|v| v.len()).sum(); + if total_figis > 0 { + println!("{}: {} LEIs, {} FIGIs", sector, sector_map.len(), total_figis); + } + } + } + + // Return Equity sector as the main result + Ok(sector_maps.get("Equity").cloned().unwrap_or_default()) } /// Loads LEI-to-FigiInfo map from a JSON Lines file. diff --git a/src/corporate/storage.rs b/src/corporate/storage.rs index d0bdb38..bc623ed 100644 --- a/src/corporate/storage.rs +++ b/src/corporate/storage.rs @@ -6,7 +6,7 @@ use crate::util::logger; use tokio::fs; use tokio::io::AsyncWriteExt; use chrono::{Datelike, NaiveDate}; -use std::collections::{HashMap, HashSet}; +use std::collections::{HashMap}; use std::path::{PathBuf}; pub async fn load_existing_events(paths: &DataPaths) -> anyhow::Result> {