updated cache saving

2025-12-07 14:49:25 +01:00
parent a6823dc938
commit 0f89c8c0ce
3 changed files with 260 additions and 78 deletions

cache/openfigi/INFO.md

@@ -0,0 +1,15 @@
# OpenFIGI Data
## Market Sector Description (marketSecDes)
| Code | Meaning |
| ---------- | --------------------------------------------------------- |
| **Comdty** | Commodity (e.g., oil, gold futures, physical commodities) |
| **Corp** | Corporate bond / corporate debt security |
| **Curncy** | Currency or FX pair (e.g., EURUSD) |
| **Equity** | Stocks / shares |
| **Govt** | Government bond (Treasuries, Bunds, Gilts, etc.) |
| **Index** | Market indices (S&P 500, DAX, NYSE Composite…) |
| **M-Mkt** | Money market instruments (commercial paper, CDs, T-bills) |
| **Mtge** | Mortgage-backed securities (MBS) |
| **Muni** | Municipal bonds (US state/local government debt) |
| **Pfd** | Preferred shares |
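
Every OpenFIGI mapping result carries one of the `marketSecDes` codes above. A minimal sketch of filtering results by sector; `FigiInfo` here is a reduced stand-in for the repo's struct, with field names taken from the diff below:

```rust
// Minimal sketch: filter OpenFIGI results by marketSecDes code.
// `FigiInfo` is a reduced stand-in for the repo's struct; only the
// fields needed for this example are included.
#[allow(non_snake_case)]
#[derive(Clone, Debug)]
struct FigiInfo {
    figi: String,
    marketSector: String, // one of the codes in the table above
}

/// Keeps only instruments in the given sector, e.g. "Equity" or "Govt".
fn filter_by_sector(infos: &[FigiInfo], sector: &str) -> Vec<FigiInfo> {
    infos.iter().filter(|i| i.marketSector == sector).cloned().collect()
}
```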


@@ -6,9 +6,7 @@ use reqwest::Client as HttpClient;
use reqwest::header::{HeaderMap, HeaderValue};
use serde_json::{json, Value};
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::path::Path;
use std::time::Instant;
use tokio::time::{sleep, Duration};
use tokio::fs as tokio_fs;
@@ -110,47 +108,76 @@ impl OpenFigiClient {
}))
.collect();
let resp = self.client
.post("https://api.openfigi.com/v3/mapping")
.header("Content-Type", "application/json")
.json(&jobs)
.send()
.await
.context("Failed to send mapping request")?;
// Retry logic with exponential backoff for transient failures
let mut retry_count = 0;
let max_retries = 5;
let mut backoff_ms = 1000u64;
let status = resp.status();
let headers = resp.headers().clone();
let body = resp.text().await.context("Failed to read response body")?;
loop {
let resp_result = self.client
.post("https://api.openfigi.com/v3/mapping")
.header("Content-Type", "application/json")
.json(&jobs)
.send()
.await;
if status.is_client_error() || status.is_server_error() {
if status == 429 {
let reset_sec = headers
.get("ratelimit-reset")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(10);
println!("Rate limited—backing off {}s", reset_sec);
sleep(Duration::from_secs(reset_sec.max(10))).await;
continue; // Retry the same chunk
} else if status == 401 {
return Err(anyhow!("Invalid OpenFIGI API key: {}", body));
} else if status == 413 {
return Err(anyhow!("Payload too large—reduce chunk size: {}", body));
let resp = match resp_result {
Ok(r) => r,
Err(e) => {
retry_count += 1;
if retry_count >= max_retries {
return Err(anyhow!("Failed to send mapping request after {} retries: {}", max_retries, e));
}
eprintln!("Transient error sending mapping request (attempt {}/{}): {}", retry_count, max_retries, e);
println!(" Retrying in {}ms...", backoff_ms);
sleep(Duration::from_millis(backoff_ms)).await;
backoff_ms = (backoff_ms * 2).min(60000); // Cap at 60s
continue;
}
};
let status = resp.status();
let headers = resp.headers().clone();
let body = resp.text().await.context("Failed to read response body")?;
if status.is_client_error() || status.is_server_error() {
if status == 429 {
let reset_sec = headers
.get("ratelimit-reset")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(10);
println!("Rate limited—backing off {}s", reset_sec);
sleep(Duration::from_secs(reset_sec.max(10))).await;
continue; // Retry the same chunk
} else if status == 401 {
return Err(anyhow!("Invalid OpenFIGI API key: {}", body));
} else if status == 413 {
return Err(anyhow!("Payload too large—reduce chunk size: {}", body));
} else if status.is_server_error() {
// Transient server error, retry with backoff
retry_count += 1;
if retry_count >= max_retries {
return Err(anyhow!("OpenFIGI server error {} after {} retries: {}", status, max_retries, body));
}
eprintln!("Server error {} (attempt {}/{}), retrying in {}ms...", status, retry_count, max_retries, backoff_ms);
sleep(Duration::from_millis(backoff_ms)).await;
backoff_ms = (backoff_ms * 2).min(60000);
continue;
}
return Err(anyhow!("OpenFIGI error {}: {}", status, body));
}
return Err(anyhow!("OpenFIGI error {}: {}", status, body));
}
let results: Vec<Value> = serde_json::from_str(&body)
.context("Failed to parse response JSON")?;
let results: Vec<Value> = serde_json::from_str(&body)
.context("Failed to parse response JSON")?;
for (isin, result) in chunk.iter().zip(results) {
if let Some(data) = result["data"].as_array() {
for item in data {
let sec_type = item["securityType"].as_str().unwrap_or("");
let market_sec = item["marketSector"].as_str().unwrap_or("");
if market_sec == "Equity" &&
(sec_type.contains("Stock") || sec_type.contains("Share") || sec_type.contains("Equity") ||
sec_type.contains("Common") || sec_type.contains("Preferred") || sec_type == "ADR" || sec_type == "GDR") {
for (isin, result) in chunk.iter().zip(results) {
if let Some(data) = result["data"].as_array() {
for item in data {
let sec_type = item["securityType"].as_str().unwrap_or("");
let market_sec = item["marketSector"].as_str().unwrap_or("");
// Capture all security types; let the caller filter by market sector if needed
let figi = match item["figi"].as_str() {
Some(f) => f.to_string(),
None => continue,
@@ -161,7 +188,7 @@ impl OpenFigiClient {
figi,
name: item["name"].as_str().unwrap_or("").to_string(),
ticker: item["ticker"].as_str().unwrap_or("").to_string(),
exch_code: item["micCode"].as_str().unwrap_or("").to_string(),
exch_code: item["exchCode"].as_str().unwrap_or("").to_string(),
compositeFIGI: item["compositeFIGI"].as_str().unwrap_or("").to_string(),
securityType: sec_type.to_string(),
marketSector: market_sec.to_string(),
@@ -174,6 +201,9 @@ impl OpenFigiClient {
}
}
}
// Successfully processed this chunk, break out of retry loop
break;
}
req_count += 1;
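
The loop above rebuilds the request on every attempt because a `reqwest` request builder is consumed by `send()` and cannot be re-sent. The retry shape itself is generic; below is a standalone sketch of the same doubling-and-capping backoff, with `with_backoff` as a hypothetical helper rather than anything in this commit:

```rust
use std::future::Future;
use std::time::Duration;
use tokio::time::sleep;

// Hypothetical helper mirroring the pattern above: retry a fallible
// async operation, doubling the delay each time and capping it at 60s.
async fn with_backoff<T, E, F, Fut>(max_retries: u32, mut op: F) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    let mut backoff_ms = 1_000u64;
    let mut attempt = 0;
    loop {
        match op().await {
            Ok(v) => return Ok(v),
            Err(e) => {
                attempt += 1;
                if attempt >= max_retries {
                    return Err(e); // retries exhausted, surface the error
                }
                sleep(Duration::from_millis(backoff_ms)).await;
                backoff_ms = (backoff_ms * 2).min(60_000); // cap at 60s
            }
        }
    }
}
```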
@@ -218,6 +248,54 @@ fn extract_gleif_date_from_filename(filename: &str) -> String {
filename.to_string()
}
/// Loads the list of market sectors from cache/openfigi/marketSecDes.json
///
/// # Returns
///
/// Vec of market sector strings (e.g., ["Comdty", "Corp", "Curncy", "Equity", ...])
/// If the file doesn't exist, returns a sensible default list; a file that
/// exists but can't be parsed is an error.
async fn load_market_sectors() -> anyhow::Result<Vec<String>> {
let dir = DataPaths::new(".")?;
let cache_file = dir.cache_openfigi_dir().join("marketSecDes.json");
if !cache_file.exists() {
// Return default if file doesn't exist
eprintln!("Warning: {} not found, using default sectors", cache_file.display());
return Ok(vec![
"Comdty".to_string(),
"Corp".to_string(),
"Curncy".to_string(),
"Equity".to_string(),
"Govt".to_string(),
"Index".to_string(),
"M-Mkt".to_string(),
"Mtge".to_string(),
"Muni".to_string(),
"Pfd".to_string(),
]);
}
let content = tokio_fs::read_to_string(&cache_file).await
.context("Failed to read marketSecDes.json")?;
let json: Value = serde_json::from_str(&content)
.context("Failed to parse marketSecDes.json")?;
let sectors: Vec<String> = json["values"]
.as_array()
.ok_or_else(|| anyhow!("'values' field not found in marketSecDes.json"))?
.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect();
if sectors.is_empty() {
return Err(anyhow!("No sectors found in marketSecDes.json"));
}
println!("Loaded {} market sectors from cache", sectors.len());
Ok(sectors)
}
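
`load_market_sectors` expects a top-level `values` array of strings. The real `marketSecDes.json` is not shown in this commit, so the shape below is an assumption, exercised with the same accessors the loader uses:

```rust
use serde_json::Value;

// Assumed shape of cache/openfigi/marketSecDes.json; the loader above
// reads json["values"] as an array of sector-code strings.
fn parse_example() -> anyhow::Result<Vec<String>> {
    let content = r#"{"values": ["Comdty", "Corp", "Curncy", "Equity", "Govt",
                                 "Index", "M-Mkt", "Mtge", "Muni", "Pfd"]}"#;
    let json: Value = serde_json::from_str(content)?;
    let sectors: Vec<String> = json["values"]
        .as_array()
        .map(|a| a.iter().filter_map(|v| v.as_str().map(String::from)).collect())
        .unwrap_or_default();
    Ok(sectors)
}
```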
/// Finds the most recent GLEIF CSV file in the cache/gleif directory.
///
/// Returns the extracted date in format "DDMMYYYY" from the filename.
@@ -254,20 +332,11 @@ async fn find_most_recent_gleif_date(gleif_cache_dir: &Path) -> anyhow::Result<O
Ok(Some(date))
}
/// Builds a LEI-to-FigiInfo map from the LEI-ISIN mapping, filtering for equities via OpenFIGI.
/// Builds a LEI-to-FigiInfo map with automatic retry on transient failures.
///
/// The mapping is stored in cache/glei_openfigi/{GLEIF_DATE}/lei_to_figi.jsonl, where GLEIF_DATE
/// is extracted from the GLEIF CSV filename (format: DDMMYYYY). If no specific date is provided,
/// the most recent GLEIF file in cache/gleif is used.
///
/// Attempts to load existing entries from the date-based jsonl file (JSON Lines format,
/// one LEI entry per line: {"lei": "ABC", "figis": [FigiInfo...]}). For any missing LEIs (compared to
/// `lei_to_isins`), fetches their FigiInfos and appends to the .jsonl file incrementally.
///
/// This design allows resumption after interruptions: on restart, already processed LEIs are skipped,
/// and only remaining ones are fetched. Processes LEIs in sorted order for deterministic behavior.
///
/// If no API key is present, skips building new entries and returns the loaded map (possibly partial).
/// This is a wrapper around build_lei_to_figi_infos_internal that handles transient errors
/// by automatically retrying after a delay if the mapping process fails. The mapping can
/// resume from where it left off since already-processed LEIs are saved incrementally.
///
/// # Arguments
///
@@ -280,9 +349,54 @@ async fn find_most_recent_gleif_date(gleif_cache_dir: &Path) -> anyhow::Result<O
///
/// # Errors
///
/// Returns an error if file I/O fails, JSON serialization/deserialization fails,
/// or if OpenFIGI queries fail during fetching.
/// Returns an error only for fatal conditions (file I/O failures, an invalid API key, etc.).
/// Transient errors are retried automatically.
pub async fn build_lei_to_figi_infos(lei_to_isins: &HashMap<String, Vec<String>>, gleif_date: Option<&str>) -> anyhow::Result<HashMap<String, Vec<FigiInfo>>> {
let mut retry_count = 0;
let max_retries = 3;
loop {
match build_lei_to_figi_infos_internal(lei_to_isins, gleif_date).await {
Ok(map) => {
if !map.is_empty() {
println!("✓ LEI→FIGI mapping completed successfully with {} entries", map.len());
}
return Ok(map);
}
Err(e) => {
let error_msg = e.to_string();
// Check if this is a fatal error or transient
let is_fatal = error_msg.contains("Invalid OpenFIGI API key")
|| error_msg.contains("No GLEIF CSV file found")
|| error_msg.contains("Failed to create");
if is_fatal {
eprintln!("Fatal error in LEI→FIGI mapping: {}", e);
return Err(e);
}
retry_count += 1;
if retry_count >= max_retries {
eprintln!("LEI→FIGI mapping failed after {} retries: {}", max_retries, e);
return Err(e);
}
let wait_secs = 60 * retry_count;
eprintln!("Transient error in LEI→FIGI mapping (attempt {}/{}): {}", retry_count, max_retries, e);
println!("Retrying mapping in {}s...", wait_secs);
sleep(Duration::from_secs(wait_secs as u64)).await;
}
}
}
}
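
A hypothetical call site for the wrapper; `load_lei_to_isins` is an assumed helper standing in for whatever produces the GLEIF map, and is not part of this commit:

```rust
use std::collections::HashMap;

// Hypothetical call site. Only `build_lei_to_figi_infos` is real here;
// `load_lei_to_isins` is an assumed stand-in.
async fn run_mapping() -> anyhow::Result<()> {
    let lei_to_isins: HashMap<String, Vec<String>> = load_lei_to_isins().await?;
    // `None` selects the most recent GLEIF date found in cache/gleif.
    let equity = build_lei_to_figi_infos(&lei_to_isins, None).await?;
    println!("mapped {} LEIs to equity FIGIs", equity.len());
    Ok(())
}
```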
/// Internal implementation of LEI-to-FigiInfo mapping.
///
/// This is the actual worker function that performs the mapping. It handles already-processed
/// LEIs gracefully but will fail on transient errors, which are caught and retried by the
/// wrapper function build_lei_to_figi_infos.
async fn build_lei_to_figi_infos_internal(lei_to_isins: &HashMap<String, Vec<String>>, gleif_date: Option<&str>) -> anyhow::Result<HashMap<String, Vec<FigiInfo>>> {
let dir = DataPaths::new(".")?;
let gleif_cache_dir = dir.cache_gleif_dir();
let map_cache_dir = dir.cache_gleif_openfigi_map_dir();
@@ -302,26 +416,46 @@ pub async fn build_lei_to_figi_infos(lei_to_isins: &HashMap<String, Vec<String>>
let date_dir = map_cache_dir.join(&date);
tokio_fs::create_dir_all(&date_dir).await.context("Failed to create date directory")?;
let path = date_dir.join("lei_to_figi.jsonl");
println!("Using LEI→FIGI mapping at: {}", path.display());
let mut lei_to_figis: HashMap<String, Vec<FigiInfo>> = load_lei_to_figi_jsonl(&path).await?;
// Load market sectors dynamically from cache
let sector_dirs = load_market_sectors().await?;
let mut sector_maps: HashMap<String, HashMap<String, Vec<FigiInfo>>> = HashMap::new();
for sector in &sector_dirs {
let sector_dir = date_dir.join(sector);
tokio_fs::create_dir_all(&sector_dir).await.context("Failed to create sector directory")?;
// Load existing mappings for this sector
let path = sector_dir.join("lei_to_figi.jsonl");
let lei_map = load_lei_to_figi_jsonl(&path).await?;
sector_maps.insert(sector.clone(), lei_map);
}
let client = OpenFigiClient::new()?;
if !client.has_key {
println!("No API key—using partial LEI→FIGI map with {} entries", lei_to_figis.len());
return Ok(lei_to_figis);
let total_entries: usize = sector_maps.values().map(|m| m.len()).sum();
println!("No API key—using partial LEI→FIGI maps with {} total entries", total_entries);
return Ok(sector_maps.get("Equity").cloned().unwrap_or_default());
}
// Sort LEIs for deterministic processing order
let mut leis: Vec<_> = lei_to_isins.keys().cloned().collect();
leis.sort();
let mut processed = lei_to_figis.len();
let mut processed = sector_maps.values().map(|m| m.len()).sum::<usize>();
let total = leis.len();
for lei in leis {
if lei_to_figis.contains_key(&lei) {
continue; // Skip already processed
// Check if LEI is already processed in any sector
let mut already_processed = false;
for sector_map in sector_maps.values() {
if sector_map.contains_key(&lei) {
already_processed = true;
break;
}
}
if already_processed {
continue;
}
let isins = match lei_to_isins.get(&lei) {
@@ -330,30 +464,63 @@ pub async fn build_lei_to_figi_infos(lei_to_isins: &HashMap<String, Vec<String>>
};
let unique_isins: Vec<_> = isins.iter().cloned().collect::<HashSet<_>>().into_iter().collect();
let equity_figi_infos = client.map_isins_to_figi_infos(&unique_isins).await?;
let all_figi_infos = client.map_isins_to_figi_infos(&unique_isins).await?;
let mut figis = equity_figi_infos;
if !figis.is_empty() {
figis.sort_by_key(|f| f.figi.clone());
figis.dedup_by_key(|f| f.figi.clone());
// Organize results by marketSector
let mut figis_by_sector: HashMap<String, Vec<FigiInfo>> = HashMap::new();
for figi_info in all_figi_infos {
let sector = figi_info.marketSector.clone();
if sector.is_empty() {
continue; // Skip if no sector
}
figis_by_sector.entry(sector).or_insert_with(Vec::new).push(figi_info);
}
// Append to .jsonl incrementally
append_lei_to_figi_jsonl(&path, &lei, &figis).await.context("Failed to append to JSONL")?;
// Save to appropriate sector files
for (sector, mut figis) in figis_by_sector {
if !figis.is_empty() {
figis.sort_by_key(|f| f.figi.clone());
figis.dedup_by_key(|f| f.figi.clone());
// Insert into in-memory map
lei_to_figis.insert(lei.clone(), figis);
// Save to sector's JSONL file
let sector_dir = date_dir.join(&sector);
let path = sector_dir.join("lei_to_figi.jsonl");
append_lei_to_figi_jsonl(&path, &lei, &figis).await.context("Failed to append to JSONL")?;
// Update in-memory sector map
if let Some(sector_map) = sector_maps.get_mut(&sector) {
sector_map.insert(lei.clone(), figis);
}
}
}
processed += 1;
if processed % 100 == 0 {
println!("Processed {}/{} LEIs → {} total equity FIGIs", processed, total, lei_to_figis.values().map(|v| v.len()).sum::<usize>());
let totals: Vec<String> = sector_dirs.iter().map(|s| {
let count = sector_maps.get(s).map(|m| m.len()).unwrap_or(0);
format!("{}:{}", s, count)
}).collect();
println!("Processed {}/{} LEIs → [{}]", processed, total, totals.join(", "));
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
println!("Completed LEI→FIGI map: {} mappings (equity-only)", lei_to_figis.len());
Ok(lei_to_figis)
// Print final summary
println!("\n=== LEI→FIGI Mapping Complete ===");
for sector in &sector_dirs {
if let Some(sector_map) = sector_maps.get(sector) {
let total_figis: usize = sector_map.values().map(|v| v.len()).sum();
if total_figis > 0 {
println!("{}: {} LEIs, {} FIGIs", sector, sector_map.len(), total_figis);
}
}
}
// Return Equity sector as the main result
Ok(sector_maps.get("Equity").cloned().unwrap_or_default())
}
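
The per-sector cache files use JSON Lines, one `{"lei": ..., "figis": [...]}` object per line; because each line is independent, a restart can re-read the file and skip LEIs already present, which is what makes the incremental append and resume-after-interrupt behavior cheap. A sketch of the append step under that format; the body of the real `append_lei_to_figi_jsonl` is not shown in this diff, so this is an assumption:

```rust
use serde_json::json;
use tokio::io::AsyncWriteExt;

// Sketch of the JSONL append described above: open in append mode and
// write one self-contained JSON object per line. Assumed, not the
// actual body of `append_lei_to_figi_jsonl`.
async fn append_entry(
    path: &std::path::Path,
    lei: &str,
    figis: &serde_json::Value,
) -> anyhow::Result<()> {
    let mut file = tokio::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(path)
        .await?;
    let line = json!({ "lei": lei, "figis": figis }).to_string();
    file.write_all(line.as_bytes()).await?;
    file.write_all(b"\n").await?;
    Ok(())
}
```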
/// Loads LEI-to-FigiInfo map from a JSON Lines file.


@@ -6,7 +6,7 @@ use crate::util::logger;
use tokio::fs;
use tokio::io::AsyncWriteExt;
use chrono::{Datelike, NaiveDate};
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::path::PathBuf;
pub async fn load_existing_events(paths: &DataPaths) -> anyhow::Result<HashMap<String, CompanyEvent>> {