working api calls

This commit is contained in:
2025-12-02 17:10:34 +01:00
parent de875a3ebe
commit 95fd9ca141
6 changed files with 1104 additions and 323 deletions

View File

@@ -4,8 +4,13 @@ use reqwest::Client as HttpClient;
use reqwest::header::{HeaderMap, HeaderValue};
use serde_json::{json, Value};
use std::collections::{HashMap, HashSet};
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::Path;
use std::time::Instant;
use tokio::time::{sleep, Duration};
use anyhow::Context;
use tokio::fs as tokio_fs;
use anyhow::{Context, anyhow};
#[derive(Clone)]
pub struct OpenFigiClient {
@@ -15,6 +20,13 @@ pub struct OpenFigiClient {
}
impl OpenFigiClient {
/// Creates a new OpenFIGI client, optionally with an API key.
///
/// Loads the API key from the `OPENFIGI_API_KEY` environment variable if present.
///
/// # Errors
///
/// Returns an error if the HTTP client cannot be built or if the API key header is invalid.
pub fn new() -> anyhow::Result<Self> {
let api_key = dotenvy::var("OPENFIGI_API_KEY").ok();
let has_key = api_key.is_some();
@@ -39,19 +51,60 @@ impl OpenFigiClient {
Ok(Self { client, api_key, has_key })
}
/// Batch-map ISINs to FIGI, filtering equities only
pub async fn map_isins_to_figi(&self, isins: &[String]) -> anyhow::Result<Vec<String>> {
if isins.is_empty() { return Ok(vec![]); }
/// Maps a batch of ISINs to FigiInfo structs, filtering for equities only.
///
/// Batches requests according to rate limits (100 jobs/req with key, 5 without).
/// Optimizes inter-request delays to approach the rate limit without exceeding it:
/// - With key: ~240ms sleep per request (to sustain ~4 req/sec or 250 req/min).
/// - Without key: 2.4s sleep (to sustain 25 req/min).
/// Handles 429 rate limits with header-based backoff.
/// Collects detailed FigiInfo from responses, using `exchCode` as proxy for `mic_code`.
///
/// # Arguments
///
/// * `isins` - Slice of ISIN strings to map (deduplicated internally if needed).
///
/// # Returns
///
/// A vector of `FigiInfo` structs for equity instruments.
///
/// # Errors
///
/// Returns an error on HTTP failures, JSON parsing issues, invalid API keys,
/// or repeated rate limit violations after backoff.
///
/// # Examples
///
/// ```no_run
/// # use anyhow::Result;
/// # async fn example(client: &OpenFigiClient) -> Result<()> {
/// let isins = vec!["US0378331005".to_string(), "US5949181045".to_string()];
/// let figis = client.map_isins_to_figi_infos(&isins).await?;
/// # Ok(())
/// # }
/// ```
pub async fn map_isins_to_figi_infos(&self, isins: &[String]) -> anyhow::Result<Vec<FigiInfo>> {
if isins.is_empty() {
return Ok(vec![]);
}
let mut all_figis = Vec::new();
let mut all_figi_infos = Vec::new();
let chunk_size = if self.has_key { 100 } else { 5 };
let inter_sleep = if self.has_key {
Duration::from_millis(240) // ~4.16 req/sec (250/min)
} else {
Duration::from_millis(2400) // 25/min
};
let start_time = Instant::now();
let mut req_count = 0;
for chunk in isins.chunks(chunk_size) {
let jobs: Vec<Value> = chunk.iter()
.map(|isin| json!({
"idType": "ID_ISIN",
"idValue": isin,
"marketSecDes": "Equity", // Pre-filter to equities
"marketSecDes": "Equity",
}))
.collect();
@@ -60,36 +113,35 @@ impl OpenFigiClient {
.header("Content-Type", "application/json")
.json(&jobs)
.send()
.await?;
.await
.context("Failed to send mapping request")?;
let status = resp.status();
let headers = resp.headers().clone();
let body = resp.text().await.unwrap_or_default();
let body = resp.text().await.context("Failed to read response body")?;
if status.is_client_error() || status.is_server_error() {
if status == 401 {
return Err(anyhow::anyhow!("Invalid OpenFIGI API key: {}", body));
} else if status == 413 {
return Err(anyhow::anyhow!("Payload too large—reduce chunk size: {}", body));
} else if status == 429 {
let reset = headers
if status == 429 {
let reset_sec = headers
.get("ratelimit-reset")
.and_then(|v| v.to_str().ok())
.unwrap_or("10")
.parse::<u64>()
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(10);
println!("Rate limited—backing off {}s", reset);
sleep(Duration::from_secs(reset.max(10))).await;
continue;
println!("Rate limited—backing off {}s", reset_sec);
sleep(Duration::from_secs(reset_sec.max(10))).await;
continue; // Retry the same chunk
} else if status == 401 {
return Err(anyhow!("Invalid OpenFIGI API key: {}", body));
} else if status == 413 {
return Err(anyhow!("Payload too large—reduce chunk size: {}", body));
}
return Err(anyhow::anyhow!("OpenFIGI error {}: {}", status, body));
return Err(anyhow!("OpenFIGI error {}: {}", status, body));
}
// Parse the JSON from the *body string*
let results: Vec<Value> = serde_json::from_str(&body)?;
for (job, result) in chunk.iter().zip(results) {
let results: Vec<Value> = serde_json::from_str(&body)
.context("Failed to parse response JSON")?;
for (isin, result) in chunk.iter().zip(results) {
if let Some(data) = result["data"].as_array() {
for item in data {
let sec_type = item["securityType"].as_str().unwrap_or("");
@@ -97,76 +149,347 @@ impl OpenFigiClient {
if market_sec == "Equity" &&
(sec_type.contains("Stock") || sec_type.contains("Share") || sec_type.contains("Equity") ||
sec_type.contains("Common") || sec_type.contains("Preferred") || sec_type == "ADR" || sec_type == "GDR") {
if let Some(figi) = item["figi"].as_str() {
all_figis.push(figi.to_string());
}
let figi = match item["figi"].as_str() {
Some(f) => f.to_string(),
None => continue,
};
let figi_info = FigiInfo {
isin: isin.clone(),
figi,
name: item["name"].as_str().unwrap_or("").to_string(),
ticker: item["ticker"].as_str().unwrap_or("").to_string(),
mic_code: item["exchCode"].as_str().unwrap_or("").to_string(),
currency: item["currency"].as_str().unwrap_or("").to_string(),
compositeFIGI: item["compositeFIGI"].as_str().unwrap_or("").to_string(),
securityType: sec_type.to_string(),
marketSector: market_sec.to_string(),
shareClassFIGI: item["shareClassFIGI"].as_str().unwrap_or("").to_string(),
securityType2: item["securityType2"].as_str().unwrap_or("").to_string(),
securityDescription: item["securityDescription"].as_str().unwrap_or("").to_string(),
};
all_figi_infos.push(figi_info);
}
}
}
}
// Rate limit respect: 6s between requests with key
if self.has_key {
sleep(Duration::from_secs(6)).await;
req_count += 1;
if req_count % 25 == 0 {
// Optional: Enforce 6-sec window for bursts
let elapsed = start_time.elapsed();
if self.has_key {
if elapsed < Duration::from_secs(6) {
sleep(Duration::from_secs(6) - elapsed).await;
}
} else {
if elapsed < Duration::from_secs(6) {
sleep(Duration::from_secs(60) - elapsed).await;
}
}
} else {
sleep(Duration::from_millis(500)).await; // Slower without key
sleep(inter_sleep).await;
}
}
all_figis.dedup(); // Unique FIGIs per LEI
Ok(all_figis)
Ok(all_figi_infos)
}
/// Checks if the client has an API key configured.
pub fn has_key(&self) -> bool {
self.has_key
}
/// Returns a reference to the underlying HTTP client.
pub fn get_figi_client(&self) -> &HttpClient {
&self.client
}
}
/// Build FIGI → LEI map from CSV, filtering equities via OpenFIGI
pub async fn build_figi_to_lei_map(lei_to_isins: &HashMap<String, Vec<String>>) -> anyhow::Result<HashMap<String, String>> {
/// Builds a LEI-to-FigiInfo map from the LEI-ISIN mapping, filtering for equities via OpenFIGI.
///
/// Attempts to load existing entries from "data/companies_by_lei/lei_to_figi.jsonl" (JSON Lines format,
/// one LEI entry per line: {"lei": "ABC", "figis": [FigiInfo...]}). For any missing LEIs (compared to
/// `lei_to_isins`), fetches their FigiInfos and appends to the .jsonl file incrementally.
///
/// This design allows resumption after interruptions: on restart, already processed LEIs are skipped,
/// and only remaining ones are fetched. Processes LEIs in sorted order for deterministic behavior.
///
/// If no API key is present, skips building new entries and returns the loaded map (possibly partial).
///
/// # Arguments
///
/// * `lei_to_isins` - HashMap of LEI to Vec<ISIN> (used for fetching missing entries).
///
/// # Returns
///
/// The complete (or partial if interrupted) HashMap<LEI, Vec<FigiInfo>>.
///
/// # Errors
///
/// Returns an error if file I/O fails, JSON serialization/deserialization fails,
/// or if OpenFIGI queries fail during fetching.
pub async fn build_lei_to_figi_infos(lei_to_isins: &HashMap<String, Vec<String>>) -> anyhow::Result<HashMap<String, Vec<FigiInfo>>> {
let data_dir = Path::new("data/companies_by_lei");
tokio_fs::create_dir_all(data_dir).await.context("Failed to create data directory")?;
let path = data_dir.join("lei_to_figi.jsonl");
let mut lei_to_figis: HashMap<String, Vec<FigiInfo>> = load_lei_to_figi_jsonl(&path)?;
let client = OpenFigiClient::new()?;
if !client.has_key {
println!("No API key—skipping FIGI mapping (using empty map)");
return Ok(HashMap::new());
println!("No API key—using partial LEI→FIGI map with {} entries", lei_to_figis.len());
return Ok(lei_to_figis);
}
let mut figi_to_lei: HashMap<String, String> = HashMap::new();
let mut processed = 0;
// Sort LEIs for deterministic processing order
let mut leis: Vec<_> = lei_to_isins.keys().cloned().collect();
leis.sort();
for (lei, isins) in lei_to_isins {
let unique_isins: Vec<_> = isins.iter().cloned().collect::<HashSet<_>>().into_iter().collect();
let equity_figis = client.map_isins_to_figi(&unique_isins).await?;
let mut processed = lei_to_figis.len();
let total = leis.len();
for figi in equity_figis {
figi_to_lei.insert(figi, lei.clone());
for lei in leis {
if lei_to_figis.contains_key(&lei) {
continue; // Skip already processed
}
let isins = match lei_to_isins.get(&lei) {
Some(i) => i,
None => continue,
};
let unique_isins: Vec<_> = isins.iter().cloned().collect::<HashSet<_>>().into_iter().collect();
let equity_figi_infos = client.map_isins_to_figi_infos(&unique_isins).await?;
let mut figis = equity_figi_infos;
if !figis.is_empty() {
figis.sort_by_key(|f| f.figi.clone());
figis.dedup_by_key(|f| f.figi.clone());
}
// Append to .jsonl incrementally
append_lei_to_figi_jsonl(&path, &lei, &figis).context("Failed to append to JSONL")?;
// Insert into in-memory map
lei_to_figis.insert(lei.clone(), figis);
processed += 1;
if processed % 100 == 0 {
println!("Processed {} LEIs → {} total equity FIGIs", processed, figi_to_lei.len());
println!("Processed {}/{} LEIs → {} total equity FIGIs", processed, total, lei_to_figis.values().map(|v| v.len()).sum::<usize>());
}
// Throttle per-LEI (heavy LEIs have 100s of ISINs)
sleep(Duration::from_millis(100)).await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
// Save full map
let data_dir = std::path::Path::new("data");
tokio::fs::create_dir_all(data_dir).await?;
tokio::fs::write("data/figi_to_lei.json", serde_json::to_string_pretty(&figi_to_lei)?).await?;
println!("Built FIGI→LEI map: {} mappings (equity-only)", figi_to_lei.len());
Ok(figi_to_lei)
println!("Completed LEI→FIGI map: {} mappings (equity-only)", lei_to_figis.len());
Ok(lei_to_figis)
}
/// Load/build companies using FIGI as key (enriched with LEI via map)
pub async fn load_or_build_companies_figi(
/// Loads or builds the LEI → `Vec<FigiInfo>` map, filtering for equities via OpenFIGI.
///
/// Existing entries are read from `data/lei_to_figi.jsonl` (JSON Lines, one
/// `{"lei": "...", "figis": [...]}` object per line). Every LEI in `lei_to_isins` that is
/// missing from that file is resolved through
/// [`OpenFigiClient::map_isins_to_figi_infos`] and appended to the file immediately, so a
/// restarted run skips LEIs that were already processed.
///
/// NOTE(review): `build_lei_to_figi_infos` and the companies cache use the
/// `data/companies_by_lei/` directory, while this function reads and writes directly under
/// `data/` — confirm which location is intended before moving any files.
///
/// LEIs are processed in sorted order, and the ISINs of each LEI are sorted and
/// de-duplicated before the request, so the request sequence is deterministic.
///
/// If the client has no API key, nothing is fetched and the loaded map is returned as-is
/// (it may be partial or empty).
///
/// # Arguments
///
/// * `lei_to_isins` - LEI → ISINs; determines which LEIs must be present in the result.
///
/// # Returns
///
/// The loaded entries plus all newly fetched ones. LEIs without any equity FIGI map to an
/// empty vector. Entries that exist in the file but not in `lei_to_isins` are kept.
///
/// # Errors
///
/// Fails if the data directory cannot be created, the JSONL file cannot be read, parsed or
/// appended to, the client cannot be constructed, or an OpenFIGI request fails.
pub async fn load_or_build_lei_to_figi_infos(lei_to_isins: &HashMap<String, Vec<String>>) -> anyhow::Result<HashMap<String, Vec<FigiInfo>>> {
    let data_dir = Path::new("data");
    tokio_fs::create_dir_all(data_dir).await.context("Failed to create data directory")?;
    let path = data_dir.join("lei_to_figi.jsonl");
    let mut lei_to_figis: HashMap<String, Vec<FigiInfo>> = load_lei_to_figi_jsonl(&path)?;

    let client = OpenFigiClient::new()?;
    if !client.has_key {
        println!("No API key—using partial LEI→FIGI map with {} entries", lei_to_figis.len());
        return Ok(lei_to_figis);
    }

    // Only LEIs missing from the loaded file need a request; sort them for a
    // deterministic processing order.
    let mut pending: Vec<(&String, &Vec<String>)> = lei_to_isins
        .iter()
        .filter(|(lei, _)| !lei_to_figis.contains_key(lei.as_str()))
        .collect();
    pending.sort_unstable_by(|a, b| a.0.cmp(b.0));

    // Progress is counted against the requested LEIs only, so cached entries that are not
    // part of `lei_to_isins` cannot push `processed` above `total`.
    let total = lei_to_isins.len();
    let mut processed = total - pending.len();

    for (lei, isins) in pending {
        // Sort + dedup instead of a HashSet round-trip: same set, stable order.
        let mut unique_isins = isins.clone();
        unique_isins.sort_unstable();
        unique_isins.dedup();

        let mut figis = client.map_isins_to_figi_infos(&unique_isins).await?;
        // Several ISINs of one LEI can resolve to the same FIGI; keep the first of each.
        figis.sort_by(|a, b| a.figi.cmp(&b.figi));
        figis.dedup_by(|a, b| a.figi == b.figi);

        // Persist before updating the in-memory map so a crash never loses a fetched LEI.
        append_lei_to_figi_jsonl(&path, lei, &figis)?;
        lei_to_figis.insert(lei.clone(), figis);

        processed += 1;
        if processed % 100 == 0 {
            println!("Processed {}/{} LEIs → {} total equity FIGIs", processed, total, lei_to_figis.values().map(|v| v.len()).sum::<usize>());
        }

        // Small pause between LEIs on top of the per-request pacing done by the client.
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    println!("Completed LEI→FIGI map: {} mappings (equity-only)", lei_to_figis.len());
    Ok(lei_to_figis)
}
/// Loads LEI-to-FigiInfo map from a JSON Lines file.
///
/// Each line is expected to be a JSON object: {"lei": "ABC", "figis": [FigiInfo...]}
///
/// # Arguments
///
/// * `path` - Path to the .jsonl file.
///
/// # Returns
///
/// The loaded HashMap<LEI, Vec<FigiInfo>>.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or if any line fails to parse as JSON.
fn load_lei_to_figi_jsonl(path: &Path) -> anyhow::Result<HashMap<String, Vec<FigiInfo>>> {
let mut map = HashMap::new();
if !path.exists() {
return Ok(map);
}
let file = File::open(path).context("Failed to open JSONL file for reading")?;
let reader = BufReader::new(file);
for (line_num, line) in reader.lines().enumerate() {
let line = line.context(format!("Failed to read line {}", line_num + 1))?;
if line.trim().is_empty() {
continue;
}
let entry: Value = serde_json::from_str(&line).context(format!("Failed to parse JSON on line {}", line_num + 1))?;
let lei = entry["lei"].as_str().context("Missing 'lei' field")?.to_string();
let figis: Vec<FigiInfo> = serde_json::from_value(entry["figis"].clone()).context("Invalid 'figis' field")?;
map.insert(lei, figis);
}
println!("Loaded LEI→FIGI map with {} entries from {}", map.len(), path.display());
Ok(map)
}
/// Writes one `{"lei": ..., "figis": [...]}` record as a new line at the end of a JSON
/// Lines file, creating the file if it does not exist yet.
///
/// # Arguments
///
/// * `path` - Path to the `.jsonl` file.
/// * `lei` - LEI stored under the `"lei"` key.
/// * `figis` - FigiInfo records stored under the `"figis"` key.
///
/// # Errors
///
/// Returns an error if the file cannot be opened in append mode, if the record cannot be
/// serialized, or if the write fails.
fn append_lei_to_figi_jsonl(path: &Path, lei: &str, figis: &[FigiInfo]) -> anyhow::Result<()> {
    let mut options = OpenOptions::new();
    options.append(true).create(true);
    let mut handle = options
        .open(path)
        .context("Failed to open JSONL file for append")?;

    let record = json!({ "lei": lei, "figis": figis });
    let mut payload = serde_json::to_string(&record).context("Failed to serialize entry")?;
    payload.push('\n');

    // The record and its trailing newline go out in a single `write_all` call.
    handle
        .write_all(payload.as_bytes())
        .context("Failed to write to JSONL file")
}
/// Loads or builds the list of `CompanyMetadata`, keyed by LEI.
///
/// If `data/companies_by_lei/companies_lei.json` exists, it is parsed, sorted by LEI and
/// returned as-is; `lei_to_isins` is not consulted in that case. Otherwise the
/// LEI-to-FigiInfo map is obtained via [`load_or_build_lei_to_figi_infos`], one
/// `CompanyMetadata` is created per key of `lei_to_isins`, and the result is written to the
/// cache file.
///
/// A LEI with no FigiInfos (missing from the map or mapped to an empty list) gets
/// `figi: None`.
///
/// # Arguments
///
/// * `lei_to_isins` - Mapping of LEI to associated ISINs.
/// * `figi_to_lei` - Not read by this function; kept so existing callers keep compiling.
///
/// # Returns
///
/// A vector of `CompanyMetadata` structs, sorted by LEI.
///
/// # Errors
///
/// Returns an error if directory creation or file I/O fails, if the cache cannot be parsed
/// or serialized, or if building the LEI-to-FigiInfo map fails.
pub async fn load_or_build_companies_lei(
    lei_to_isins: &HashMap<String, Vec<String>>,
    figi_to_lei: &HashMap<String, String>,
) -> anyhow::Result<Vec<CompanyMetadata>> {
    // Unused; consumed explicitly so the signature stays stable without a warning.
    let _ = figi_to_lei;

    // NOTE(review): nothing in this function touches this directory any more — it looks
    // like a leftover from the FIGI-keyed layout. Kept in case other code relies on it.
    let data_dir = std::path::Path::new("data/companies_by_figi");
    tokio::fs::create_dir_all(data_dir).await?;

    let cache_path = Path::new("data/companies_by_lei/companies_lei.json");
    if cache_path.exists() {
        let content = tokio_fs::read_to_string(cache_path).await.context("Failed to read companies cache")?;
        let mut companies: Vec<CompanyMetadata> = serde_json::from_str(&content).context("Failed to parse companies JSON")?;
        companies.sort_by(|a, b| a.lei.cmp(&b.lei));
        println!("Loaded {} LEI-keyed companies from cache.", companies.len());
        return Ok(companies);
    }

    // Build or load the LEI-to-FigiInfo map (with incremental persistence)
    let mut lei_to_figi = load_or_build_lei_to_figi_infos(lei_to_isins).await?;

    // One company per requested LEI. The FIGI list is moved out of the map (no clone),
    // and an empty list is normalised to `None`.
    let mut companies: Vec<CompanyMetadata> = lei_to_isins
        .keys()
        .map(|lei| CompanyMetadata {
            lei: lei.clone(),
            figi: lei_to_figi.remove(lei).filter(|figis| !figis.is_empty()),
        })
        .collect();
    companies.sort_by(|a, b| a.lei.cmp(&b.lei));

    // The cache lives in `data/companies_by_lei/`, so that directory (not just `data/`)
    // must exist before the write; otherwise a fresh checkout fails here.
    if let Some(cache_dir) = cache_path.parent() {
        tokio_fs::create_dir_all(cache_dir).await.context("Failed to create data directory")?;
    }
    tokio_fs::write(cache_path, serde_json::to_string_pretty(&companies)?).await.context("Failed to write companies cache")?;

    println!("Built and cached {} LEI-keyed companies.", companies.len());
    Ok(companies)
}