From 95fd9ca1414d494edd8304e6617bbc59b69927cd Mon Sep 17 00:00:00 2001 From: donpat1to Date: Tue, 2 Dec 2025 17:10:34 +0100 Subject: [PATCH] working api calls --- src/corporate/openfigi.rs | 445 ++++++++++++++++++++++++++++++++------ src/corporate/scraper.rs | 433 +++++++++++++++++++++++++------------ src/corporate/storage.rs | 20 +- src/corporate/types.rs | 38 +++- src/corporate/update.rs | 272 +++++++++++++---------- src/scraper/webdriver.rs | 219 +++++++++++++++++++ 6 files changed, 1104 insertions(+), 323 deletions(-) create mode 100644 src/scraper/webdriver.rs diff --git a/src/corporate/openfigi.rs b/src/corporate/openfigi.rs index 10f2047..80f1dc6 100644 --- a/src/corporate/openfigi.rs +++ b/src/corporate/openfigi.rs @@ -4,8 +4,13 @@ use reqwest::Client as HttpClient; use reqwest::header::{HeaderMap, HeaderValue}; use serde_json::{json, Value}; use std::collections::{HashMap, HashSet}; +use std::fs::{File, OpenOptions}; +use std::io::{BufRead, BufReader, Write}; +use std::path::Path; +use std::time::Instant; use tokio::time::{sleep, Duration}; -use anyhow::Context; +use tokio::fs as tokio_fs; +use anyhow::{Context, anyhow}; #[derive(Clone)] pub struct OpenFigiClient { @@ -15,6 +20,13 @@ pub struct OpenFigiClient { } impl OpenFigiClient { + /// Creates a new OpenFIGI client, optionally with an API key. + /// + /// Loads the API key from the `OPENFIGI_API_KEY` environment variable if present. + /// + /// # Errors + /// + /// Returns an error if the HTTP client cannot be built or if the API key header is invalid. pub fn new() -> anyhow::Result { let api_key = dotenvy::var("OPENFIGI_API_KEY").ok(); let has_key = api_key.is_some(); @@ -39,19 +51,60 @@ impl OpenFigiClient { Ok(Self { client, api_key, has_key }) } - /// Batch-map ISINs to FIGI, filtering equities only - pub async fn map_isins_to_figi(&self, isins: &[String]) -> anyhow::Result> { - if isins.is_empty() { return Ok(vec![]); } + /// Maps a batch of ISINs to FigiInfo structs, filtering for equities only. 
+ /// + /// Batches requests according to rate limits (100 jobs/req with key, 5 without). + /// Optimizes inter-request delays to approach the rate limit without exceeding it: + /// - With key: ~240ms sleep per request (to sustain ~4 req/sec or 250 req/min). + /// - Without key: 2.4s sleep (to sustain 25 req/min). + /// Handles 429 rate limits with header-based backoff. + /// Collects detailed FigiInfo from responses, using `exchCode` as proxy for `mic_code`. + /// + /// # Arguments + /// + /// * `isins` - Slice of ISIN strings to map (deduplicated internally if needed). + /// + /// # Returns + /// + /// A vector of `FigiInfo` structs for equity instruments. + /// + /// # Errors + /// + /// Returns an error on HTTP failures, JSON parsing issues, invalid API keys, + /// or repeated rate limit violations after backoff. + /// + /// # Examples + /// + /// ```no_run + /// # use anyhow::Result; + /// # async fn example(client: &OpenFigiClient) -> Result<()> { + /// let isins = vec!["US0378331005".to_string(), "US5949181045".to_string()]; + /// let figis = client.map_isins_to_figi_infos(&isins).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn map_isins_to_figi_infos(&self, isins: &[String]) -> anyhow::Result> { + if isins.is_empty() { + return Ok(vec![]); + } - let mut all_figis = Vec::new(); + let mut all_figi_infos = Vec::new(); let chunk_size = if self.has_key { 100 } else { 5 }; + let inter_sleep = if self.has_key { + Duration::from_millis(240) // ~4.16 req/sec (250/min) + } else { + Duration::from_millis(2400) // 25/min + }; + + let start_time = Instant::now(); + let mut req_count = 0; for chunk in isins.chunks(chunk_size) { let jobs: Vec = chunk.iter() .map(|isin| json!({ "idType": "ID_ISIN", "idValue": isin, - "marketSecDes": "Equity", // Pre-filter to equities + "marketSecDes": "Equity", })) .collect(); @@ -60,36 +113,35 @@ impl OpenFigiClient { .header("Content-Type", "application/json") .json(&jobs) .send() - .await?; + .await + .context("Failed to send 
mapping request")?; let status = resp.status(); let headers = resp.headers().clone(); - let body = resp.text().await.unwrap_or_default(); + let body = resp.text().await.context("Failed to read response body")?; if status.is_client_error() || status.is_server_error() { - if status == 401 { - return Err(anyhow::anyhow!("Invalid OpenFIGI API key: {}", body)); - } else if status == 413 { - return Err(anyhow::anyhow!("Payload too large—reduce chunk size: {}", body)); - } else if status == 429 { - let reset = headers + if status == 429 { + let reset_sec = headers .get("ratelimit-reset") .and_then(|v| v.to_str().ok()) - .unwrap_or("10") - .parse::() + .and_then(|s| s.parse::().ok()) .unwrap_or(10); - - println!("Rate limited—backing off {}s", reset); - sleep(Duration::from_secs(reset.max(10))).await; - continue; + println!("Rate limited—backing off {}s", reset_sec); + sleep(Duration::from_secs(reset_sec.max(10))).await; + continue; // Retry the same chunk + } else if status == 401 { + return Err(anyhow!("Invalid OpenFIGI API key: {}", body)); + } else if status == 413 { + return Err(anyhow!("Payload too large—reduce chunk size: {}", body)); } - - return Err(anyhow::anyhow!("OpenFIGI error {}: {}", status, body)); + return Err(anyhow!("OpenFIGI error {}: {}", status, body)); } - // JSON aus dem *Body-String* parsen - let results: Vec = serde_json::from_str(&body)?; - for (job, result) in chunk.iter().zip(results) { + let results: Vec = serde_json::from_str(&body) + .context("Failed to parse response JSON")?; + + for (isin, result) in chunk.iter().zip(results) { if let Some(data) = result["data"].as_array() { for item in data { let sec_type = item["securityType"].as_str().unwrap_or(""); @@ -97,76 +149,347 @@ impl OpenFigiClient { if market_sec == "Equity" && (sec_type.contains("Stock") || sec_type.contains("Share") || sec_type.contains("Equity") || sec_type.contains("Common") || sec_type.contains("Preferred") || sec_type == "ADR" || sec_type == "GDR") { - if let Some(figi) 
= item["figi"].as_str() { - all_figis.push(figi.to_string()); - } + let figi = match item["figi"].as_str() { + Some(f) => f.to_string(), + None => continue, + }; + + let figi_info = FigiInfo { + isin: isin.clone(), + figi, + name: item["name"].as_str().unwrap_or("").to_string(), + ticker: item["ticker"].as_str().unwrap_or("").to_string(), + mic_code: item["exchCode"].as_str().unwrap_or("").to_string(), + currency: item["currency"].as_str().unwrap_or("").to_string(), + compositeFIGI: item["compositeFIGI"].as_str().unwrap_or("").to_string(), + securityType: sec_type.to_string(), + marketSector: market_sec.to_string(), + shareClassFIGI: item["shareClassFIGI"].as_str().unwrap_or("").to_string(), + securityType2: item["securityType2"].as_str().unwrap_or("").to_string(), + securityDescription: item["securityDescription"].as_str().unwrap_or("").to_string(), + }; + + all_figi_infos.push(figi_info); } } } } - // Rate limit respect: 6s between requests with key - if self.has_key { - sleep(Duration::from_secs(6)).await; + req_count += 1; + if req_count % 25 == 0 { + // Optional: Enforce 6-sec window for bursts + let elapsed = start_time.elapsed(); + if self.has_key { + if elapsed < Duration::from_secs(6) { + sleep(Duration::from_secs(6) - elapsed).await; + } + } else { + if elapsed < Duration::from_secs(6) { + sleep(Duration::from_secs(60) - elapsed).await; + } + } } else { - sleep(Duration::from_millis(500)).await; // Slower without key + sleep(inter_sleep).await; } } - all_figis.dedup(); // Unique FIGIs per LEI - Ok(all_figis) + Ok(all_figi_infos) + } + + /// Checks if the client has an API key configured. + pub fn has_key(&self) -> bool { + self.has_key + } + + /// Returns a reference to the underlying HTTP client. 
+ pub fn get_figi_client(&self) -> &HttpClient { + &self.client } } -/// Build FIGI → LEI map from CSV, filtering equities via OpenFIGI -pub async fn build_figi_to_lei_map(lei_to_isins: &HashMap>) -> anyhow::Result> { +/// Builds a LEI-to-FigiInfo map from the LEI-ISIN mapping, filtering for equities via OpenFIGI. +/// +/// Attempts to load existing entries from "data/companies_by_lei/lei_to_figi.jsonl" (JSON Lines format, +/// one LEI entry per line: {"lei": "ABC", "figis": [FigiInfo...]}). For any missing LEIs (compared to +/// `lei_to_isins`), fetches their FigiInfos and appends to the .jsonl file incrementally. +/// +/// This design allows resumption after interruptions: on restart, already processed LEIs are skipped, +/// and only remaining ones are fetched. Processes LEIs in sorted order for deterministic behavior. +/// +/// If no API key is present, skips building new entries and returns the loaded map (possibly partial). +/// +/// # Arguments +/// +/// * `lei_to_isins` - HashMap of LEI to Vec (used for fetching missing entries). +/// +/// # Returns +/// +/// The complete (or partial if interrupted) HashMap>. +/// +/// # Errors +/// +/// Returns an error if file I/O fails, JSON serialization/deserialization fails, +/// or if OpenFIGI queries fail during fetching. 
+pub async fn build_lei_to_figi_infos(lei_to_isins: &HashMap>) -> anyhow::Result>> { + let data_dir = Path::new("data/companies_by_lei"); + tokio_fs::create_dir_all(data_dir).await.context("Failed to create data directory")?; + + let path = data_dir.join("lei_to_figi.jsonl"); + let mut lei_to_figis: HashMap> = load_lei_to_figi_jsonl(&path)?; + let client = OpenFigiClient::new()?; if !client.has_key { - println!("No API key—skipping FIGI mapping (using empty map)"); - return Ok(HashMap::new()); + println!("No API key—using partial LEI→FIGI map with {} entries", lei_to_figis.len()); + return Ok(lei_to_figis); } - let mut figi_to_lei: HashMap = HashMap::new(); - let mut processed = 0; + // Sort LEIs for deterministic processing order + let mut leis: Vec<_> = lei_to_isins.keys().cloned().collect(); + leis.sort(); - for (lei, isins) in lei_to_isins { - let unique_isins: Vec<_> = isins.iter().cloned().collect::>().into_iter().collect(); - let equity_figis = client.map_isins_to_figi(&unique_isins).await?; + let mut processed = lei_to_figis.len(); + let total = leis.len(); - for figi in equity_figis { - figi_to_lei.insert(figi, lei.clone()); + for lei in leis { + if lei_to_figis.contains_key(&lei) { + continue; // Skip already processed } + let isins = match lei_to_isins.get(&lei) { + Some(i) => i, + None => continue, + }; + + let unique_isins: Vec<_> = isins.iter().cloned().collect::>().into_iter().collect(); + let equity_figi_infos = client.map_isins_to_figi_infos(&unique_isins).await?; + + let mut figis = equity_figi_infos; + if !figis.is_empty() { + figis.sort_by_key(|f| f.figi.clone()); + figis.dedup_by_key(|f| f.figi.clone()); + } + + // Append to .jsonl incrementally + append_lei_to_figi_jsonl(&path, &lei, &figis).context("Failed to append to JSONL")?; + + // Insert into in-memory map + lei_to_figis.insert(lei.clone(), figis); + processed += 1; if processed % 100 == 0 { - println!("Processed {} LEIs → {} total equity FIGIs", processed, figi_to_lei.len()); + 
println!("Processed {}/{} LEIs → {} total equity FIGIs", processed, total, lei_to_figis.values().map(|v| v.len()).sum::()); } - // Throttle per-LEI (heavy LEIs have 100s of ISINs) - sleep(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(100)).await; } - // Save full map - let data_dir = std::path::Path::new("data"); - tokio::fs::create_dir_all(data_dir).await?; - tokio::fs::write("data/figi_to_lei.json", serde_json::to_string_pretty(&figi_to_lei)?).await?; - - println!("Built FIGI→LEI map: {} mappings (equity-only)", figi_to_lei.len()); - Ok(figi_to_lei) + println!("Completed LEI→FIGI map: {} mappings (equity-only)", lei_to_figis.len()); + Ok(lei_to_figis) } -/// Load/build companies using FIGI as key (enriched with LEI via map) -pub async fn load_or_build_companies_figi( +/// Loads or builds the LEI-to-FigiInfo map, filtering for equities via OpenFIGI. +/// +/// Attempts to load from "data/companies_by_lei/lei_to_figi.jsonl" (JSON Lines format, one LEI entry per line). +/// For any missing LEIs (compared to `lei_to_isins`), fetches their FigiInfos and appends +/// to the .jsonl file incrementally. This allows resumption after interruptions: on restart, +/// already processed LEIs are skipped, and only missing ones are fetched. +/// +/// If no API key is present, skips building and returns the loaded map (possibly partial). +/// +/// # Arguments +/// +/// * `lei_to_isins` - HashMap of LEI to Vec (used for building missing entries). +/// +/// # Returns +/// +/// The complete (or partial if interrupted) HashMap>. +/// +/// # Errors +/// +/// Returns an error if file I/O fails, JSON serialization/deserialization fails, +/// or if OpenFIGI queries fail during building. 
+pub async fn load_or_build_lei_to_figi_infos(lei_to_isins: &HashMap>) -> anyhow::Result>> { + let data_dir = Path::new("data"); + tokio_fs::create_dir_all(data_dir).await.context("Failed to create data directory")?; + + let path = data_dir.join("lei_to_figi.jsonl"); + let mut lei_to_figis: HashMap> = load_lei_to_figi_jsonl(&path)?; + + let client = OpenFigiClient::new()?; + if !client.has_key { + println!("No API key—using partial LEI→FIGI map with {} entries", lei_to_figis.len()); + return Ok(lei_to_figis); + } + + // Sort LEIs for deterministic processing order + let mut leis: Vec<_> = lei_to_isins.keys().cloned().collect(); + leis.sort(); + + let mut processed = lei_to_figis.len(); + let total = leis.len(); + + for lei in leis { + if lei_to_figis.contains_key(&lei) { + continue; // Skip already processed + } + + let isins = match lei_to_isins.get(&lei) { + Some(i) => i, + None => continue, + }; + + let unique_isins: Vec<_> = isins.iter().cloned().collect::>().into_iter().collect(); + let equity_figi_infos = client.map_isins_to_figi_infos(&unique_isins).await?; + + let mut figis = equity_figi_infos; + if !figis.is_empty() { + figis.sort_by_key(|f| f.figi.clone()); + figis.dedup_by_key(|f| f.figi.clone()); + } + + // Append to .jsonl + append_lei_to_figi_jsonl(&path, &lei, &figis)?; + + // Insert into in-memory map (optional, but useful for return value) + lei_to_figis.insert(lei.clone(), figis); + + processed += 1; + if processed % 100 == 0 { + println!("Processed {}/{} LEIs → {} total equity FIGIs", processed, total, lei_to_figis.values().map(|v| v.len()).sum::()); + } + + tokio::time::sleep(Duration::from_millis(100)).await; + } + + println!("Completed LEI→FIGI map: {} mappings (equity-only)", lei_to_figis.len()); + Ok(lei_to_figis) +} + +/// Loads LEI-to-FigiInfo map from a JSON Lines file. +/// +/// Each line is expected to be a JSON object: {"lei": "ABC", "figis": [FigiInfo...]} +/// +/// # Arguments +/// +/// * `path` - Path to the .jsonl file. 
+/// +/// # Returns +/// +/// The loaded HashMap>. +/// +/// # Errors +/// +/// Returns an error if the file cannot be opened or if any line fails to parse as JSON. +fn load_lei_to_figi_jsonl(path: &Path) -> anyhow::Result>> { + let mut map = HashMap::new(); + + if !path.exists() { + return Ok(map); + } + + let file = File::open(path).context("Failed to open JSONL file for reading")?; + let reader = BufReader::new(file); + + for (line_num, line) in reader.lines().enumerate() { + let line = line.context(format!("Failed to read line {}", line_num + 1))?; + if line.trim().is_empty() { + continue; + } + + let entry: Value = serde_json::from_str(&line).context(format!("Failed to parse JSON on line {}", line_num + 1))?; + let lei = entry["lei"].as_str().context("Missing 'lei' field")?.to_string(); + let figis: Vec = serde_json::from_value(entry["figis"].clone()).context("Invalid 'figis' field")?; + + map.insert(lei, figis); + } + + println!("Loaded LEI→FIGI map with {} entries from {}", map.len(), path.display()); + Ok(map) +} + +/// Appends a single LEI entry to the JSON Lines file. +/// +/// # Arguments +/// +/// * `path` - Path to the .jsonl file. +/// * `lei` - The LEI key. +/// * `figis` - The Vec for this LEI. +/// +/// # Errors +/// +/// Returns an error if the file cannot be opened for append or if serialization fails. +fn append_lei_to_figi_jsonl(path: &Path, lei: &str, figis: &[FigiInfo]) -> anyhow::Result<()> { + let mut file = OpenOptions::new() + .create(true) + .append(true) + .open(path) + .context("Failed to open JSONL file for append")?; + + let entry = json!({ + "lei": lei, + "figis": figis, + }); + + let line = serde_json::to_string(&entry).context("Failed to serialize entry")? + "\n"; + file.write_all(line.as_bytes()).context("Failed to write to JSONL file")?; + + Ok(()) +} + +/// Loads or builds a list of CompanyMetadata using LEI as the primary key. 
+/// +/// Attempts to load pre-built company metadata from "data/companies_by_lei/companies_lei.json". +/// If the cache does not exist, builds the metadata by first obtaining the LEI-to-FigiInfo map +/// (loading or fetching via OpenFIGI if necessary), then constructs CompanyMetadata for each LEI. +/// +/// Only includes LEIs that have associated ISINs from the input map. If no FigiInfos are available +/// for a LEI (e.g., no equity listings), the `figi` field will be `None`. +/// +/// # Arguments +/// +/// * `lei_to_isins` - Mapping of LEI to associated ISINs (used for building the FigiInfo map if needed). +/// +/// # Returns +/// +/// A vector of `CompanyMetadata` structs, sorted by LEI. +/// +/// # Errors +/// +/// Returns an error if file I/O fails, JSON serialization/deserialization fails, +/// or if building the LEI-to-FigiInfo map encounters issues (e.g., API errors). +pub async fn load_or_build_companies_lei( lei_to_isins: &HashMap>, - figi_to_lei: &HashMap, ) -> anyhow::Result> { - let data_dir = std::path::Path::new("data/companies_by_figi"); - tokio::fs::create_dir_all(data_dir).await?; + let cache_path = Path::new("data/companies_by_lei/companies_lei.json"); + if cache_path.exists() { + let content = tokio_fs::read_to_string(cache_path).await.context("Failed to read companies cache")?; + let mut companies: Vec = serde_json::from_str(&content).context("Failed to parse companies JSON")?; + companies.sort_by_key(|c| c.lei.clone()); + println!("Loaded {} LEI-keyed companies from cache.", companies.len()); + return Ok(companies); + } + + // Build or load the LEI-to-FigiInfo map (with incremental persistence) + let lei_to_figi = load_or_build_lei_to_figi_infos(lei_to_isins).await?; + + // Build companies from all LEIs in lei_to_isins (even if no FigiInfos) let mut companies = Vec::new(); + for lei in lei_to_isins.keys() { + let figis = lei_to_figi.get(lei).cloned(); + companies.push(CompanyMetadata { + lei: lei.clone(), + figi: figis.and_then(|v| if 
v.is_empty() { None } else { Some(v) }), + }); + } + companies.sort_by_key(|c| c.lei.clone()); + // Cache the result + let data_dir = Path::new("data"); + tokio_fs::create_dir_all(data_dir).await.context("Failed to create data directory")?; + tokio_fs::write(cache_path, serde_json::to_string_pretty(&companies)?).await.context("Failed to write companies cache")?; - println!("Built {} FIGI-keyed companies.", companies.len()); + println!("Built and cached {} LEI-keyed companies.", companies.len()); Ok(companies) } \ No newline at end of file diff --git a/src/corporate/scraper.rs b/src/corporate/scraper.rs index 0b5b11e..1501112 100644 --- a/src/corporate/scraper.rs +++ b/src/corporate/scraper.rs @@ -1,3 +1,5 @@ +use crate::corporate::openfigi::OpenFigiClient; + // src/corporate/scraper.rs use super::{types::*, helpers::*}; use csv::ReaderBuilder; @@ -6,7 +8,7 @@ use scraper::{Html, Selector}; use chrono::{DateTime, Duration, NaiveDate, Timelike, Utc}; use tokio::{time::{Duration as TokioDuration, sleep}}; use reqwest::Client as HttpClient; -use serde_json::Value; +use serde_json::{json, Value}; use zip::ZipArchive; use std::fs::File; use std::{collections::HashMap}; @@ -14,15 +16,25 @@ use std::io::{Read, BufReader}; const USER_AGENT: &str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"; -/// Discover all exchanges where this ISIN trades by querying Yahoo Finance -pub async fn discover_available_exchanges(isin: &str, known_ticker: &str) -> anyhow::Result> { +/// Discover all exchanges where this ISIN trades by querying Yahoo Finance and enriching with OpenFIGI API calls. +/// +/// # Arguments +/// * `isin` - The ISIN to search for. +/// * `known_ticker` - A known ticker symbol for fallback or initial check. +/// +/// # Returns +/// A vector of FigiInfo structs containing enriched data from API calls. +/// +/// # Errors +/// Returns an error if HTTP requests fail, JSON parsing fails, or OpenFIGI API responds with an error. 
+pub async fn discover_available_exchanges(isin: &str, known_ticker: &str) -> anyhow::Result> { println!(" Discovering exchanges for ISIN {}", isin); - let mut discovered_tickers = Vec::new(); + let mut potential: Vec<(String, PrimaryInfo)> = Vec::new(); // Try the primary ticker first if let Ok(info) = check_ticker_exists(known_ticker).await { - discovered_tickers.push(info); + potential.push((known_ticker.to_string(), info)); } // Search for ISIN directly on Yahoo to find other listings @@ -31,149 +43,267 @@ pub async fn discover_available_exchanges(isin: &str, known_ticker: &str) -> any isin ); - match HttpClient::new() + let resp = HttpClient::new() .get(&search_url) .header("User-Agent", USER_AGENT) .send() + .await?; + + let json = resp.json::().await?; + + if let Some(quotes) = json["quotes"].as_array() { + for quote in quotes { + // First: filter by quoteType directly from search results (faster rejection) + let quote_type = quote["quoteType"].as_str().unwrap_or(""); + if quote_type.to_uppercase() != "EQUITY" { + continue; // Skip bonds, ETFs, mutual funds, options, etc. 
+ } + + if let Some(symbol) = quote["symbol"].as_str() { + // Avoid duplicates + if potential.iter().any(|(s, _)| s == symbol) { + continue; + } + + // Double-check with full quote data (some search results are misleading) + if let Ok(info) = check_ticker_exists(symbol).await { + potential.push((symbol.to_string(), info)); + } + } + } + } + + if potential.is_empty() { + return Ok(vec![]); + } + + // Enrich with OpenFIGI API + let client = OpenFigiClient::new()?; + + let mut discovered_figis = Vec::new(); + + if !client.has_key() { + // Fallback without API key - create FigiInfo with default/empty fields + for (symbol, info) in potential { + println!(" Found equity listing: {} on {} ({}) - no FIGI (fallback mode)", symbol, info.exchange_mic, info.currency); + let figi_info = FigiInfo { + isin: info.isin, + figi: String::new(), + name: info.name, + ticker: symbol, + mic_code: info.exchange_mic, + currency: info.currency, + compositeFIGI: String::new(), + securityType: String::new(), + marketSector: String::new(), + shareClassFIGI: String::new(), + securityType2: String::new(), + securityDescription: String::new(), + }; + discovered_figis.push(figi_info); + } + return Ok(discovered_figis); + } + + // With API key, batch the mapping requests + let chunk_size = 100; + for chunk in potential.chunks(chunk_size) { + let mut jobs = vec![]; + for (symbol, info) in chunk { + jobs.push(json!({ + "idType": "TICKER", + "idValue": symbol, + "micCode": info.exchange_mic, + "marketSecDes": "Equity", + })); + } + + let resp = client.get_figi_client() + .post("https://api.openfigi.com/v3/mapping") + .header("Content-Type", "application/json") + .json(&jobs) + .send() + .await?; + + if !resp.status().is_success() { + return Err(anyhow::anyhow!("OpenFIGI mapping failed with status: {}", resp.status())); + } + + let parsed: Vec = resp.json().await?; + + for (i, item) in parsed.iter().enumerate() { + let (symbol, info) = &chunk[i]; + if let Some(data) = item["data"].as_array() { + if let 
Some(entry) = data.first() { + let market_sec = entry["marketSector"].as_str().unwrap_or(""); + if market_sec != "Equity" { + continue; + } + println!(" Found equity listing: {} on {} ({}) - FIGI: {}", symbol, info.exchange_mic, info.currency, entry["figi"]); + let figi_info = FigiInfo { + isin: info.isin.clone(), + figi: entry["figi"].as_str().unwrap_or("").to_string(), + name: entry["name"].as_str().unwrap_or(&info.name).to_string(), + ticker: symbol.clone(), + mic_code: info.exchange_mic.clone(), + currency: info.currency.clone(), + compositeFIGI: entry["compositeFIGI"].as_str().unwrap_or("").to_string(), + securityType: entry["securityType"].as_str().unwrap_or("").to_string(), + marketSector: market_sec.to_string(), + shareClassFIGI: entry["shareClassFIGI"].as_str().unwrap_or("").to_string(), + securityType2: entry["securityType2"].as_str().unwrap_or("").to_string(), + securityDescription: entry["securityDescription"].as_str().unwrap_or("").to_string(), + }; + discovered_figis.push(figi_info); + } else { + println!(" No data returned for ticker {} on MIC {}", symbol, info.exchange_mic); + } + } else if let Some(error) = item["error"].as_str() { + println!(" OpenFIGI error for ticker {}: {}", symbol, error); + } + } + + // Respect rate limit (6 seconds between requests with key) + sleep(TokioDuration::from_secs(6)).await; + } + + Ok(discovered_figis) +} + +/// Check if a ticker exists on Yahoo Finance and return core metadata. +/// +/// This function calls the public Yahoo Finance quoteSummary endpoint and extracts: +/// - ISIN (when available) +/// - Company name +/// - Exchange MIC code +/// - Trading currency +/// +/// It strictly filters to only accept **equity** securities. +/// +/// # Arguments +/// * `ticker` - The ticker symbol to validate (e.g., "AAPL", "7203.T", "BMW.DE") +/// +/// # Returns +/// `Ok(PrimaryInfo)` on success, `Err` if ticker doesn't exist, is not equity, or data is malformed. 
+/// +/// # Errors +/// - Ticker not found +/// - Not an equity (ETF, bond, etc.) +/// - Missing critical fields +/// - Network or JSON parsing errors +pub async fn check_ticker_exists(ticker: &str) -> anyhow::Result { + let url = format!( + "https://query1.finance.yahoo.com/v10/finance/quoteSummary/{}?modules=price%2CassetProfile", + ticker + ); + + let resp = match HttpClient::new() + .get(&url) + .header("User-Agent", USER_AGENT) + .send() .await { - Ok(resp) => { - if let Ok(json) = resp.json::().await { - if let Some(quotes) = json["quotes"].as_array() { - for quote in quotes { - // First: filter by quoteType directly from search results (faster rejection) - let quote_type = quote["quoteType"].as_str().unwrap_or(""); - if quote_type.to_uppercase() != "EQUITY" { - continue; // Skip bonds, ETFs, mutual funds, options, etc. - } - - if let Some(symbol) = quote["symbol"].as_str() { - // Avoid duplicates - if discovered_tickers.iter().any(|t: &TickerInfo| t.ticker == symbol) { - continue; - } - - // Double-check with full quote data (some search results are misleading) - match check_ticker_exists(symbol).await { - Ok(info) => { - println!(" Found equity listing: {} on {} ({})", - symbol, info.exchange_mic, info.currency); - discovered_tickers.push(info); - } - Err(e) => { - // Most common: it's not actually equity or not tradable - // println!(" Rejected {}: {}", symbol, e); - continue; - } - } - - // Be respectful to Yahoo - sleep(TokioDuration::from_millis(120)).await; - } - } - } - } + Ok(resp) => resp, + Err(err) => { + return Err(anyhow::anyhow!( + "Failed to reach Yahoo Finance for ticker {}: {}", + ticker, + err + )); } - Err(e) => println!(" Search API error: {}", e), - } - - // Also try common exchange suffixes for the base ticker - if let Some(base) = known_ticker.split('.').next() { - let suffixes = vec![ - "", // US - ".L", // London - ".DE", // Frankfurt/XETRA - ".PA", // Paris - ".AS", // Amsterdam - ".MI", // Milan - ".SW", // Switzerland - ".T", // 
Tokyo - ".HK", // Hong Kong - ".SS", // Shanghai - ".SZ", // Shenzhen - ".TO", // Toronto - ".AX", // Australia - ".SA", // Brazil - ".MC", // Madrid - ".BO", // Bombay - ".NS", // National Stock Exchange India - ]; - - for suffix in suffixes { - let test_ticker = format!("{}{}", base, suffix); - - // Skip if already found - if discovered_tickers.iter().any(|t| t.ticker == test_ticker) { - continue; - } - - if let Ok(info) = check_ticker_exists(&test_ticker).await { - discovered_tickers.push(info); - sleep(TokioDuration::from_millis(100)).await; - } - } - } - - println!(" Found {} tradable exchanges", discovered_tickers.len()); - Ok(discovered_tickers) -} + }; -/// Check if a ticker exists and get its exchange/currency info -async fn check_ticker_exists(ticker: &str) -> anyhow::Result { - let url = format!( - "https://query1.finance.yahoo.com/v10/finance/quoteSummary/{}?modules=price", - ticker + if !resp.status().is_success() { + return Err(anyhow::anyhow!("Yahoo returned HTTP {} for ticker {}", resp.status(), ticker)); + } + + let json: Value = match resp + .json() + .await { + Ok(resp) => resp, + Err(err) => { + return Err(anyhow::anyhow!( + "Failed to parse JSON response from Yahoo Finance {}: {}", + ticker, + err + )); + } + }; + + let result_array = json["quoteSummary"]["result"] + .as_array() + .ok_or_else(|| anyhow::anyhow!("Missing 'quoteSummary.result' in response"))?; + + if result_array.is_empty() || result_array[0].is_null() { + return Err(anyhow::anyhow!("No quote data returned for ticker {}", ticker)); + } + + let quote = &result_array[0]["price"]; + let profile = &result_array[0]["assetProfile"]; + + // === 1. Must be EQUITY === + let quote_type = quote["quoteType"] + .as_str() + .unwrap_or("") + .to_ascii_uppercase(); + + if quote_type != "EQUITY" { + println!(" → Skipping {} (quoteType: {})", ticker, quote_type); + return Err(anyhow::anyhow!("Not an equity security: {}", quote_type)); + } + + // === 2. 
Extract basic info === + let long_name = quote["longName"] + .as_str() + .or_else(|| quote["shortName"].as_str()) + .unwrap_or(ticker) + .trim() + .to_string(); + + let currency = quote["currency"] + .as_str() + .unwrap_or("USD") + .to_string(); + + let exchange_mic = quote["exchange"] + .as_str() + .unwrap_or("") + .to_string(); + + if exchange_mic.is_empty() { + return Err(anyhow::anyhow!("Missing exchange MIC for ticker {}", ticker)); + } + + // === 3. Extract ISIN (from assetProfile if available) === + let isin = profile["isin"] + .as_str() + .and_then(|s| if s.len() == 12 && s.chars().all(|c| c.is_ascii_alphanumeric()) { Some(s) } else { None }) + .unwrap_or("") + .to_ascii_uppercase(); + + // === 4. Final sanity check: reject obvious debt securities === + let name_upper = long_name.to_ascii_uppercase(); + if name_upper.contains(" BOND") || + name_upper.contains(" NOTE") || + name_upper.contains(" DEBENTURE") || + name_upper.contains(" PREFERRED") && !name_upper.contains(" STOCK") { + return Err(anyhow::anyhow!("Security name suggests debt instrument: {}", long_name)); + } + + println!( + " → Valid equity: {} | {} | {} | ISIN: {}", + ticker, + long_name, + exchange_mic, + if isin.is_empty() { "N/A" } else { &isin } ); - let resp = HttpClient::new() - .get(&url) - .header("User-Agent", USER_AGENT) - .send() - .await?; - - let json: Value = resp.json().await?; - - if let Some(result) = json["quoteSummary"]["result"].as_array() { - if result.is_empty() { - return Err(anyhow::anyhow!("No quote data for {}", ticker)); - } - - let quote = &result[0]["price"]; - - // CRITICAL: Only accept EQUITY securities - let quote_type = quote["quoteType"] - .as_str() - .unwrap_or("") - .to_uppercase(); - - if quote_type != "EQUITY" { - // Optional: debug what was filtered - println!(" → Skipping {} (quoteType: {})", ticker, quote_type); - return Err(anyhow::anyhow!("Not an equity: {}", quote_type)); - } - - let exchange = quote["exchange"].as_str().unwrap_or(""); - let currency 
= quote["currency"].as_str().unwrap_or("USD"); - let short_name = quote["shortName"].as_str().unwrap_or(""); - - // Optional: extra sanity — make sure it's not a bond masquerading as equity - if short_name.to_uppercase().contains("BOND") || - short_name.to_uppercase().contains("NOTE") || - short_name.to_uppercase().contains("DEBENTURE") { - return Err(anyhow::anyhow!("Name suggests debt security")); - } - - if !exchange.is_empty() { - return Ok(TickerInfo { - ticker: ticker.to_string(), - exchange_mic: exchange.to_string(), - currency: currency.to_string(), - primary: false, - }); - } - } - - Err(anyhow::anyhow!("Invalid or missing data for {}", ticker)) + Ok(PrimaryInfo { + isin, + name: long_name, + exchange_mic, + currency, + }) } /// Convert Yahoo's exchange name to MIC code (best effort) @@ -225,6 +355,31 @@ pub async fn dismiss_yahoo_consent(client: &Client) -> anyhow::Result<()> { Ok(()) } +/// Fetches earnings events for a ticker using a dedicated ScrapeTask. +/// +/// This function creates and executes a ScrapeTask to navigate to the Yahoo Finance earnings calendar, +/// reject cookies, and extract the events. +/// +/// # Arguments +/// * `ticker` - The stock ticker symbol. +/// +/// # Returns +/// A vector of CompanyEvent structs on success. +/// +/// # Errors +/// Returns an error if the task execution fails, e.g., chromedriver spawn or navigation issues. 
+pub async fn get_earnings_events_task(ticker: &str) -> anyhow::Result<Vec<CompanyEvent>> {
+    let url = format!("https://finance.yahoo.com/calendar/earnings?symbol={}", ticker);
+    let task: ScrapeTask<Vec<CompanyEvent>> = ScrapeTask::new(
+        url,
+        |client| Box::pin(async move {
+            reject_yahoo_cookies(client).await?;
+            extract_earnings(client).await // Assuming extract_earnings is an async fn that uses client
+        }),
+    );
+    task.execute().await
+}
+
 pub async fn fetch_earnings_history(client: &Client, ticker: &str) -> anyhow::Result<Vec<CompanyEvent>> {
     let url = format!("https://finance.yahoo.com/calendar/earnings?symbol={}&offset=0&size=100", ticker);
     client.goto(&url).await?;
diff --git a/src/corporate/storage.rs b/src/corporate/storage.rs
index cffd930..6425246 100644
--- a/src/corporate/storage.rs
+++ b/src/corporate/storage.rs
@@ -210,18 +210,28 @@ pub async fn update_available_exchange(
 }
 
 /// Add a newly discovered exchange before fetching
+///
+/// # Arguments
+/// * `isin` - The ISIN associated with the exchange.
+/// * `figi_info` - The FigiInfo containing ticker, mic_code, and currency.
+///
+/// # Returns
+/// Ok(()) on success.
+///
+/// # Errors
+/// Returns an error if loading or saving available exchanges fails.
pub async fn add_discovered_exchange( isin: &str, - ticker_info: &TickerInfo, + figi_info: &FigiInfo, ) -> anyhow::Result<()> { let mut exchanges = load_available_exchanges(isin).await?; // Only add if not already present - if !exchanges.iter().any(|e| e.ticker == ticker_info.ticker) { + if !exchanges.iter().any(|e| e.ticker == figi_info.ticker && e.exchange_mic == figi_info.mic_code) { let new_entry = AvailableExchange::new( - ticker_info.ticker.clone(), - ticker_info.exchange_mic.clone(), - ticker_info.currency.clone(), + figi_info.ticker.clone(), + figi_info.mic_code.clone(), + figi_info.currency.clone(), ); exchanges.push(new_entry); save_available_exchanges(isin, exchanges).await?; diff --git a/src/corporate/types.rs b/src/corporate/types.rs index a6bd565..5387575 100644 --- a/src/corporate/types.rs +++ b/src/corporate/types.rs @@ -39,21 +39,47 @@ pub struct CompanyEventChange { pub detected_at: String, } +/// Figi Info based on API calls [https://www.openfigi.com/] +/// # Attributes +/// isin: ISIN belonging to this legal entity from lei +/// +/// # Comments +/// Use Mapping the Object List onto Figi Properties #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct TickerInfo { +pub struct FigiInfo { + pub isin: String, + pub figi: String, + pub name: String, pub ticker: String, - pub exchange_mic: String, + pub mic_code: String, pub currency: String, - pub isin: String, // ISIN belonging to this legal entity (primary + ADR + GDR) + pub compositeFIGI: String, + pub securityType: String, + pub marketSector: String, + pub shareClassFIGI: String, + pub securityType2: String, + pub securityDescription: String, } +/// Company Meta Data +/// # Attributes +/// * lei: Structuring the companies by legal dependencies [LEI -> Vec] +/// * figi: metadata with ISIN as key #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CompanyMetadata { pub lei: String, - pub figi: Option, + pub figi: Option>, +} + +/// Company Info +/// # Attributes +/// * Name as primary 
key (for one institution) -> might have to be changed when first FigiInfo is coming in
+/// * ISIN as the most liquid / preferred traded security (used for fallback)
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CompanyInfo{
     pub name: String,
-    pub primary_isin: String, // The most liquid / preferred one (used for folder fallback)
-    pub tickers: Vec<TickerInfo>,
+    pub primary_isin: String,
+    pub securities: Vec<FigiInfo>,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
diff --git a/src/corporate/update.rs b/src/corporate/update.rs
index 9567a18..486e637 100644
--- a/src/corporate/update.rs
+++ b/src/corporate/update.rs
@@ -3,184 +3,232 @@ use super::{scraper::*, storage::*, helpers::*, types::*, aggregation::*, openfi
 use crate::config::Config;
 use chrono::Local;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 
-pub async fn run_full_update(client: &fantoccini::Client, config: &Config) -> anyhow::Result<()> {
-    println!("Starting LEI-based corporate update");
+/// Hauptfunktion: Vollständiger Update-Durchlauf für alle Unternehmen (LEI-basiert)
+///
+/// Diese Funktion koordiniert den gesamten Update-Prozess:
+/// - Lädt GLEIF-Mappings
+/// - Baut FIGI-LEI-Map
+/// - Lädt bestehende Events
+/// - Verarbeitet jede Company: Ergänzt ISINs (abgeleitet aus FIGI), entdeckt Exchanges via FIGI,
+///   holt Prices & Earnings, aggregiert Daten
+/// - Speichert optimierte Events
+///
+/// # Arguments
+/// * `config` - Konfiguration mit Startdaten etc.
+///
+/// # Returns
+/// `Ok(())` bei Erfolg, sonst `anyhow::Error` mit Kontext.
+///
+/// # Errors
+/// - Mapping-Laden fehlschlägt (Warning, fährt mit leer fort)
+/// - Company-Laden/Bauen fehlschlägt
+/// - Directory Creation oder Speichern fehlschlägt
+/// - Discovery/Fetch/Aggregation pro Company fehlschlägt (fortgesetzt bei Fehlern, mit Log)
+pub async fn run_full_update(config: &Config) -> anyhow::Result<()> {
+    println!("=== Starting LEI-based corporate full update ===");
 
-    // 1. 
Download fresh GLEIF ISIN↔LEI mapping on every run + // 1. Frisches GLEIF ISIN ↔ LEI Mapping laden (jeder Lauf neu) let lei_to_isins: HashMap> = match load_isin_lei_csv().await { Ok(map) => map, Err(e) => { - println!("Warning: Failed to load ISIN↔LEI mapping: {}", e); + eprintln!("Warning: Could not load GLEIF ISIN↔LEI mapping: {}", e); HashMap::new() } }; - let figi_to_lei: HashMap = match build_figi_to_lei_map(&lei_to_isins).await { + // 2. FIGI → LEI Map (optional, nur mit API-Key sinnvoll) + let figi_to_lei= match build_lei_to_figi_infos(&lei_to_isins).await { Ok(map) => map, Err(e) => { - println!("Warning: Failed to build FIGI→LEI map: {}", e); + eprintln!("Warning: Could not build FIGI→LEI map: {}", e); HashMap::new() } }; - let today = chrono::Local::now().format("%Y-%m-%d").to_string(); - let mut existing_events = load_existing_events().await?; - - let mut companies: Vec = match load_or_build_companies_figi(&lei_to_isins, &figi_to_lei).await { - Ok(comps) => comps, + // 3. Bestehende Earnings-Events laden (für Change-Detection) + let today = Local::now().format("%Y-%m-%d").to_string(); + let mut existing_events = match load_existing_events().await { + Ok(events) => events, Err(e) => { - println!("Error loading/building company metadata: {}", e); - return Err(e); + eprintln!("Warning: Could not load existing events: {}", e); + HashMap::new() } - }; // Vec with lei, isins, tickers + }; - for mut company in companies { - println!("\nProcessing company: {} (LEI: {})", company.name, company.lei); + // 4. Unternehmen laden / neu aufbauen (LEI + FIGI-Infos) + let mut companies: Vec = load_or_build_companies_lei(&lei_to_isins).await?; - // === Enrich with ALL ISINs known to GLEIF (includes ADRs, GDRs, etc.) 
=== - if let Some(all_isins) = lei_to_isins.get(&company.lei) { - let mut seen = company.isins.iter().cloned().collect::>(); - for isin in all_isins { - if !seen.contains(isin) { - company.isins.push(isin.clone()); - seen.insert(isin.clone()); + // 4.1 LEIs anreichern (falls missing, über bekannte ISINs aus FIGI suchen) + //enrich_companies_with_leis(&mut companies, &lei_to_isins).await?; + + // 5. Haupt-Loop: Jedes Unternehmen verarbeiten + for company in companies.iter_mut() { + let lei = &company.lei; + let figi_infos = company.figi.as_ref().map_or(&[][..], |v| &v[..]); + let name = figi_infos.first().map(|f| f.name.as_str()).unwrap_or("Unknown"); + println!("\nProcessing company: {} (LEI: {})", name, lei); + + // --- 5.1 Alle bekannten ISINs aus GLEIF ergänzen --- + let mut all_isins = lei_to_isins.get(lei).cloned().unwrap_or_default(); + let figi_isins: Vec = figi_infos.iter().map(|f| f.isin.clone()).collect::>().into_iter().collect(); + all_isins.extend(figi_isins); + all_isins.sort(); + all_isins.dedup(); // Unique ISINs + + // --- 5.2 Verzeichnisstruktur anlegen & Metadaten speichern --- + ensure_company_dirs(lei).await?; + save_company_metadata(company).await?; + + // --- 5.3 FIGI-Infos ermitteln (falls noch nicht vorhanden) --- + let figi_infos = company.figi.get_or_insert_with(Vec::new); + if figi_infos.is_empty() { + println!(" No FIGI data yet → discovering exchanges via first known ISIN"); + let first_isin = all_isins.first().cloned().unwrap_or_default(); + if !first_isin.is_empty() { + match discover_available_exchanges(&first_isin, "").await { + Ok(discovered) => { + figi_infos.extend(discovered); + println!(" Discovered {} exchange(s) for first ISIN", figi_infos.len()); + } + Err(e) => eprintln!(" Discovery failed for first ISIN: {}", e), } } + } else { + println!(" {} exchange(s) already known", figi_infos.len()); } - // Ensure company directory exists (now uses LEI) - //let figi_dir = format!("data/companies_by_figi/{}/", company.primary_figi); - 
ensure_company_dirs(&company.lei).await?; - save_company_metadata(&company).await?; - - // === STEP 1: Discover additional exchanges using each known ISIN === - let mut all_tickers = company.tickers.clone(); - - if let Some(primary_ticker) = company.tickers.iter().find(|t| t.primary) { - println!(" Discovering additional exchanges across {} ISIN(s)...", company.isins.len()); - - for isin in &company.isins { - println!(" → Checking ISIN: {}", isin); - match discover_available_exchanges(isin, &primary_ticker.ticker).await { - Ok(discovered) => { - if discovered.is_empty() { - println!(" – No new exchanges found for {}", isin); - } else { - for disc in discovered { - if !all_tickers.iter().any(|t| t.ticker == disc.ticker && t.exchange_mic == disc.exchange_mic) { - println!(" New equity listing → {} ({}) via ISIN {}", - disc.ticker, disc.exchange_mic, isin); - all_tickers.push(disc); - } - } + // --- 5.4 Weitere Exchanges über alle ISINs suchen --- + let mut new_discovered = 0; + for isin in &all_isins { + if figi_infos.iter().any(|f| f.isin == *isin) { + continue; // Schon bekannt + } + println!(" Discovering additional exchanges for ISIN {}", isin); + match discover_available_exchanges(isin, "").await { + Ok(mut found) => { + for info in found.drain(..) 
{ + if !figi_infos.iter().any(|f| f.ticker == info.ticker && f.mic_code == info.mic_code) { + figi_infos.push(info); + new_discovered += 1; } } - Err(e) => println!(" Discovery failed for {}: {}", isin, e), } - tokio::time::sleep(tokio::time::Duration::from_millis(300)).await; + Err(e) => eprintln!(" Discovery failed for {}: {}", isin, e), + } + } + if new_discovered > 0 { + println!(" +{} new exchange(s) discovered and added", new_discovered); + } + + // --- 5.5 AvailableExchange-Einträge anlegen (für Preis-Downloads) --- + for figi in figi_infos.iter() { + if let Err(e) = add_discovered_exchange(&figi.isin, figi).await { + eprintln!(" Failed to record exchange {}: {}", figi.ticker, e); } } - // Save updated metadata if we found new listings - if all_tickers.len() > company.tickers.len() { - company.tickers = all_tickers.clone(); - save_company_metadata(&company).await?; - println!(" Updated metadata: {} total tickers", all_tickers.len()); - } - - // === STEP 2: Fetch data from ALL available tickers === - for ticker_info in &all_tickers { - let ticker = &ticker_info.ticker; - println!(" → Fetching: {} ({})", ticker, ticker_info.exchange_mic); - + // --- 5.6 Preisdaten von allen Exchanges holen --- + println!(" Fetching price data from {} exchange(s)...", figi_infos.len()); + let primary_isin = figi_infos.first().map(|f| f.isin.clone()).unwrap_or_default(); + for figi in figi_infos.iter() { + let ticker = &figi.ticker; + let mic = &figi.mic_code; + let is_primary = figi.isin == primary_isin; let mut daily_success = false; let mut intraday_success = false; // Earnings: only fetch from primary ticker to avoid duplicates - if ticker_info.primary { - if let Ok(new_events) = fetch_earnings_history(client, ticker).await { - let result = process_batch(&new_events, &mut existing_events, &today); - save_changes(&result.changes).await?; - println!(" Earnings events: {}", new_events.len()); + if is_primary { + match fetch_earnings_history(client, ticker).await { + 
Ok(new_events) => { + let result = process_batch(&new_events, &mut existing_events, &today); + save_changes(&result.changes).await?; + println!(" Earnings events: {}", new_events.len()); + } + Err(e) => eprintln!(" Failed to fetch earnings for {}: {}", ticker, e), } } // Daily prices - if let Ok(prices) = fetch_daily_price_history(ticker, &config.corporate_start_date, &today).await { - if !prices.is_empty() { - save_prices_by_source(&company.lei, ticker, "daily", prices).await?; - daily_success = true; + match fetch_daily_price_history(ticker, &config.corporate_start_date, &today).await { + Ok(prices) => { + if !prices.is_empty() { + save_prices_by_source(lei, ticker, "daily", prices).await?; + daily_success = true; + } } + Err(e) => eprintln!(" Failed to fetch daily prices for {}: {}", ticker, e), } // 5-minute intraday (last 60 days) - let sixty_days_ago = (chrono::Local::now() - chrono::Duration::days(60)) + let sixty_days_ago = (Local::now() - chrono::Duration::days(60)) .format("%Y-%m-%d") .to_string(); - - if let Ok(prices) = fetch_price_history_5min(ticker, &sixty_days_ago, &today).await { - if !prices.is_empty() { - save_prices_by_source(&company.lei, ticker, "5min", prices).await?; - intraday_success = true; + match fetch_price_history_5min(ticker, &sixty_days_ago, &today).await { + Ok(prices) => { + if !prices.is_empty() { + save_prices_by_source(lei, ticker, "5min", prices).await?; + intraday_success = true; + } } + Err(e) => eprintln!(" Failed to fetch 5min prices for {}: {}", ticker, e), } // Update available_exchanges.json (now under LEI folder) - update_available_exchange( - &company.lei, - ticker, - &ticker_info.exchange_mic, - daily_success, - intraday_success, - ).await?; + update_available_exchange(&figi.isin, ticker, mic, daily_success, intraday_success).await?; tokio::time::sleep(tokio::time::Duration::from_millis(800)).await; } - // === STEP 3: Aggregate all sources into unified USD prices === - println!(" Aggregating multi-source price data 
(FX-adjusted)..."); - if let Err(e) = aggregate_best_price_data(&company.lei).await { - println!(" Aggregation failed: {}", e); + // --- 5.7 Aggregation aller Quellen → einheitliche USD-Preise --- + println!(" Aggregating price data across all sources (FX-adjusted to USD)"); + if let Err(e) = aggregate_best_price_data(lei).await { + eprintln!(" Aggregation failed: {}", e); } else { - println!(" Aggregation complete"); + println!(" Aggregation completed successfully"); } + + // Metadaten erneut speichern (falls FIGIs hinzugefügt wurden) + save_company_metadata(company).await?; } - // Final save of optimized earnings events + // 6. Optimierte Earnings-Events final speichern save_optimized_events(existing_events).await?; - println!("\nCorporate update complete (LEI-based)"); + println!("\n=== Corporate full update completed successfully ==="); Ok(()) } -async fn enrich_companies_with_leis( +/// Companies mit LEIs anreichern +async fn _enrich_companies_with_leis( companies: &mut Vec, lei_to_isins: &HashMap>, -) { +) -> anyhow::Result<()> { for company in companies.iter_mut() { - if company.lei.is_empty() { - // Try to find LEI by any known ISIN - for isin in &company.isins { - for (lei, isins) in lei_to_isins { - if isins.contains(isin) { - company.lei = lei.clone(); - println!("Found real LEI {} for {}", lei, company.name); - break; - } - } - if !company.lei.is_empty() { break; } - } + if !company.lei.is_empty() { + continue; } - - // Fallback: generate fake LEI if still missing - if company.lei.is_empty() { - company.lei = format!("FAKE{:019}", rand::random::()); - println!("No real LEI found → using fake for {}", company.name); + + let figi_infos = company.figi.as_ref().map_or(&[][..], |v| &v[..]); + let isins: Vec = figi_infos.iter().map(|f| f.isin.clone()).collect::>().into_iter().collect(); + + // Try to find LEI by any known ISIN + for isin in &isins { + for (lei, isins) in lei_to_isins.iter() { + if isins.contains(isin) { + company.lei = lei.clone(); + let name 
= figi_infos.first().map(|f| f.name.as_str()).unwrap_or("Unknown"); + println!("Found real LEI {} for {}", lei, name); + break; + } + } + if !company.lei.is_empty() { break; } } } -} + Ok(()) +} pub struct ProcessResult { pub changes: Vec, diff --git a/src/scraper/webdriver.rs b/src/scraper/webdriver.rs new file mode 100644 index 0000000..a31d3c0 --- /dev/null +++ b/src/scraper/webdriver.rs @@ -0,0 +1,219 @@ +// src/scraper/webdriver.rs + +use anyhow::{anyhow, Context, Result}; +use fantoccini::{Client, ClientBuilder}; +use std::process::{Stdio}; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::process::{Child, Command}; +use tokio::time::{sleep, Duration}; +use std::pin::Pin; + +/// Represents a single instance of chromedriver process. +/// +/// This struct manages the lifecycle of a chromedriver process, starting it on a random available port +/// and providing a way to connect to it via fantoccini Client. Each instance is independent, allowing +/// for isolated scraping sessions without interference. +/// +/// # Examples +/// +/// ```no_run +/// use crate::scraper::webdriver::ChromeInstance; +/// +/// #[tokio::main] +/// async fn main() -> anyhow::Result<()> { +/// let instance = ChromeInstance::new().await?; +/// let client = instance.new_client().await?; +/// // Use client for scraping +/// client.close().await?; +/// Ok(()) +/// } +/// ``` +pub struct ChromeInstance { + process: Child, + url: String, +} + +impl ChromeInstance { + /// Creates a new ChromeInstance by spawning a chromedriver process on a random port. + /// + /// This function spawns chromedriver with `--port=0` to let it choose an available port, + /// reads the stdout to extract the listening URL, and returns the instance if successful. + /// + /// # Errors + /// + /// Returns an error if: + /// - chromedriver cannot be spawned (e.g., not found in PATH). + /// - Failed to read stdout or parse the listening URL within a reasonable time. 
+ pub async fn new() -> Result { + let mut child = Command::new("chromedriver") + .arg("--port=0") + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .context("Failed to spawn chromedriver process")?; + + let stdout = child.stdout.take().context("Failed to take stdout")?; + let mut reader = BufReader::new(stdout); + let mut line = String::new(); + + let start_time = std::time::Instant::now(); + let timeout = Duration::from_secs(10); + + loop { + if start_time.elapsed() > timeout { + let _ = child.kill().await; + return Err(anyhow!("Timeout waiting for chromedriver to start")); + } + + line.clear(); + if reader.read_line(&mut line).await.context("Failed to read line from stdout")? == 0 { + // EOF reached unexpectedly + let mut stderr_output = String::new(); + if let Some(mut stderr) = child.stderr.take() { + let mut stderr_reader = BufReader::new(&mut stderr); + let mut stderr_line = String::new(); + while stderr_reader.read_line(&mut stderr_line).await? > 0 { + stderr_output.push_str(&stderr_line); + stderr_line.clear(); + } + } + let _ = child.kill().await; + return Err(anyhow!("Chromedriver exited unexpectedly. Stderr: {}", stderr_output)); + } + + if let Some(url) = Self::extract_url(&line) { + return Ok(Self { + process: child, + url, + }); + } + + sleep(Duration::from_millis(100)).await; + } + } + + /// Extracts the listening URL from chromedriver's output line. + /// + /// Looks for lines like "Starting ChromeDriver ... port=XXXX" or "Listening on 127.0.0.1:XXXX". + /// Returns the full URL like "http://127.0.0.1:XXXX" if found. 
+ fn extract_url(line: &str) -> Option { + if line.contains("Listening on") || line.contains("port=") { + // Simple regex-like parsing; adjust based on actual output + let parts: Vec<&str> = line.split_whitespace().collect(); + for part in parts { + if part.starts_with("127.0.0.1:") || part.starts_with("localhost:") { + return Some(format!("http://{}", part)); + } else if part.starts_with("port=") { + let port = part.split('=').nth(1)?; + return Some(format!("http://localhost:{}", port)); + } + } + } + None + } + + /// Creates a new fantoccini Client connected to this chromedriver instance. + /// + /// # Errors + /// + /// Returns an error if connection to the WebDriver URL fails. + pub async fn new_client(&self) -> Result { + ClientBuilder::rustls() + .connect(&self.url) + .await + .context("Failed to connect to chromedriver instance") + } +} + +impl Drop for ChromeInstance { + fn drop(&mut self) { + // Attempt to kill the process synchronously; for async, caller should handle if needed + if let Ok(status) = self.process.try_wait() { + if status.is_none() { + let _ = self.process.start_kill(); + } + } + } +} + +/// Represents a scrape task that can be executed asynchronously. +/// +/// This struct encapsulates the URL to scrape and a parse function that processes the page +/// using the provided Client. The parse function is async and returns a user-defined type T. +/// +/// # Type Parameters +/// +/// * `T` - The type of data returned by the parse function. 
+/// +/// # Examples +/// +/// ```no_run +/// use crate::scraper::webdriver::ScrapeTask; +/// use fantoccini::Client; +/// use anyhow::Result; +/// use std::pin::Pin; +/// +/// async fn example_parse(_client: &Client) -> Result { +/// Ok("Parsed data".to_string()) +/// } +/// +/// #[tokio::main] +/// async fn main() -> Result<()> { +/// let task: ScrapeTask = ScrapeTask::new( +/// "https://example.com".to_string(), +/// |client| Box::pin(example_parse(client)), +/// ); +/// let result = task.execute().await?; +/// println!("{}", result); +/// Ok(()) +/// } +/// ``` +pub struct ScrapeTask { + url: String, + parse: Box Pin> + Send + 'static>> + Send + 'static>, +} + +impl ScrapeTask { + /// Creates a new ScrapeTask with the given URL and parse function. + /// + /// The parse function takes a &Client and returns a future resolving to Result. + pub fn new( + url: String, + parse: impl FnOnce(&Client) -> Pin> + Send>> + Send + 'static, + ) -> Self { + Self { + url, + parse: Box::new(parse), + } + } + + /// Executes the scrape task by starting a new ChromeInstance, connecting a client, + /// navigating to the URL, running the parse function, and cleaning up. + /// + /// This method ensures isolation by using a dedicated chromedriver instance per task. + /// + /// # Errors + /// + /// Returns an error if: + /// - Failed to start chromedriver instance. + /// - Failed to connect client or navigate to URL. + /// - Parse function returns an error. + /// - Failed to close the client or kill the process. + pub async fn execute(self) -> Result { + let instance = ChromeInstance::new().await.context("Failed to create ChromeInstance")?; + let client = instance.new_client().await.context("Failed to create client")?; + + client.goto(&self.url).await.context("Failed to navigate to URL")?; + + // Optional: Add common prep like rejecting cookies, waiting for elements, etc. + // This can be customized per task if needed. 
+ + let result = (self.parse)(&client).await; + + client.close().await.context("Failed to close client")?; + + // Instance drops here, killing the process + + result + } +} \ No newline at end of file