diff --git a/.gitignore b/.gitignore index 13e1600..6d7845a 100644 --- a/.gitignore +++ b/.gitignore @@ -34,6 +34,7 @@ target/ **/*.zip **/*.log **/*.ovpn +**/*.tmp #/economic_events* #/economic_event_changes* diff --git a/check.txt b/check.txt deleted file mode 100644 index 7b36f52..0000000 Binary files a/check.txt and /dev/null differ diff --git a/src/corporate/mod.rs b/src/corporate/mod.rs index d456ee8..9cce5cc 100644 --- a/src/corporate/mod.rs +++ b/src/corporate/mod.rs @@ -7,5 +7,6 @@ pub mod helpers; pub mod aggregation; pub mod fx; pub mod openfigi; +pub mod yahoo; pub use update::run_full_update; \ No newline at end of file diff --git a/src/corporate/openfigi.rs b/src/corporate/openfigi.rs index a2c5722..db02c04 100644 --- a/src/corporate/openfigi.rs +++ b/src/corporate/openfigi.rs @@ -1,21 +1,21 @@ -// src/corporate/openfigi.rs -use super::{types::*}; +// src/corporate/openfigi.rs - STREAMING VERSION +// Key changes: Never load entire GLEIF CSV or FIGI maps into memory + use crate::util::directories::DataPaths; use crate::util::logger; - +use super::types::*; use reqwest::Client as HttpClient; use reqwest::header::{HeaderMap, HeaderValue}; use serde_json::{json, Value}; -use csv::{ReaderBuilder, StringRecord, WriterBuilder}; -use chrono::NaiveDate; use std::collections::{HashMap, HashSet}; -use std::path::{Path}; -use std::time::Instant; +use std::path::Path; +use std::io::{BufRead, BufReader}; use tokio::time::{sleep, Duration}; use tokio::fs as tokio_fs; use tokio::io::AsyncWriteExt; use anyhow::{Context, anyhow}; -use std::io::BufRead; + +const LEI_BATCH_SIZE: usize = 100; // Process 100 LEIs at a time #[derive(Clone)] pub struct OpenFigiClient { @@ -24,13 +24,6 @@ pub struct OpenFigiClient { } impl OpenFigiClient { - /// Creates a new OpenFIGI client, optionally with an API key. - /// - /// Loads the API key from the `OPENFIGI_API_KEY` environment variable if present. - /// - /// # Errors - /// - /// Returns an error if the HTTP client cannot be built or if the API key header is invalid. pub async fn new() -> anyhow::Result { let api_key = dotenvy::var("OPENFIGI_API_KEY").ok(); let has_key = api_key.is_some(); @@ -46,48 +39,12 @@ impl OpenFigiClient { } let client = builder.build().context("Failed to build HTTP client")?; - - let msg = format!( - "OpenFIGI client initialized: {}", - if has_key { "with API key" } else { "no key (limited mode)" } - ); - logger::log_info(&msg).await; + logger::log_info(&format!("OpenFIGI client: {}", + if has_key { "with API key" } else { "no key" })).await; Ok(Self { client, has_key }) } - /// Maps a batch of ISINs to FigiInfo structs, filtering for equities only. - /// - /// Batches requests according to rate limits (100 jobs/req with key, 5 without). - /// Optimizes inter-request delays to approach the rate limit without exceeding it: - /// - With key: ~240ms sleep per request (to sustain ~4 req/sec or 250 req/min). - /// - Without key: 2.4s sleep (to sustain 25 req/min). - /// Handles 429 rate limits with header-based backoff. - /// Collects detailed FigiInfo from responses, using `exchCode` as proxy for `mic_code`. - /// - /// # Arguments - /// - /// * `isins` - Slice of ISIN strings to map (deduplicated internally if needed). - /// - /// # Returns - /// - /// A vector of `FigiInfo` structs for equity instruments. - /// - /// # Errors - /// - /// Returns an error on HTTP failures, JSON parsing issues, invalid API keys, - /// or repeated rate limit violations after backoff. 
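Note on the pacing constants kept in the new code: the 240 ms / 2.4 s sleeps follow directly from the per-minute budgets described in the removed comment above (roughly 250 requests/min with an API key, 25 requests/min without). A minimal standalone sketch of that arithmetic, using only the figures quoted there:

use std::time::Duration;

/// Inter-request spacing needed to stay under a requests-per-minute budget.
fn pacing(requests_per_minute: u64) -> Duration {
    Duration::from_millis(60_000 / requests_per_minute)
}

fn main() {
    assert_eq!(pacing(250), Duration::from_millis(240));  // with API key
    assert_eq!(pacing(25), Duration::from_millis(2_400)); // without key
}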
- /// - /// # Examples - /// - /// ```no_run - /// # use anyhow::Result; - /// # async fn example(client: &OpenFigiClient) -> Result<()> { - /// let isins = vec!["US0378331005".to_string(), "US5949181045".to_string()]; - /// let figis = client.map_isins_to_figi_infos(&isins).await?; - /// # Ok(()) - /// # } - /// ``` pub async fn map_isins_to_figi_infos(&self, isins: &[String]) -> anyhow::Result> { if isins.is_empty() { return Ok(vec![]); @@ -96,24 +53,19 @@ impl OpenFigiClient { let mut all_figi_infos = Vec::new(); let chunk_size = if self.has_key { 100 } else { 5 }; let inter_sleep = if self.has_key { - Duration::from_millis(240) // ~4.16 req/sec (250/min) + Duration::from_millis(240) } else { - Duration::from_millis(2400) // 25/min + Duration::from_millis(2400) }; - let start_time = Instant::now(); - let mut req_count = 0; - for chunk in isins.chunks(chunk_size) { let jobs: Vec = chunk.iter() .map(|isin| json!({ "idType": "ID_ISIN", "idValue": isin, - //"marketSecDes": "Equity", })) .collect(); - // Retry logic with exponential backoff for transient failures let mut retry_count = 0; let max_retries = 5; let mut backoff_ms = 1000u64; @@ -136,10 +88,8 @@ impl OpenFigiClient { return Err(anyhow!(err_msg)); } let warn_msg = format!("Transient error sending mapping request (attempt {}/{}): {}", retry_count, max_retries, e); - eprintln!("{}", warn_msg); logger::log_warn(&warn_msg).await; let retry_msg = format!(" Retrying in {}ms...", backoff_ms); - println!("{}", retry_msg); logger::log_info(&retry_msg).await; sleep(Duration::from_millis(backoff_ms)).await; backoff_ms = (backoff_ms * 2).min(60000); // Cap at 60s @@ -149,35 +99,19 @@ impl OpenFigiClient { let status = resp.status(); let headers = resp.headers().clone(); - let body = resp.text().await.context("Failed to read response body")?; + let body = resp.text().await?; - if status.is_client_error() || status.is_server_error() { - if status == 429 { - let reset_sec = headers - .get("ratelimit-reset") - .and_then(|v| v.to_str().ok()) - .and_then(|s| s.parse::().ok()) - .unwrap_or(10); - let rate_msg = format!("Rate limited—backing off {}s", reset_sec); - println!("{}", rate_msg); - logger::log_warn(&rate_msg).await; - sleep(Duration::from_secs(reset_sec.max(10))).await; - continue; // Retry the same chunk - } else if status == 401 { - return Err(anyhow!("Invalid OpenFIGI API key: {}", body)); - } else if status == 413 { - return Err(anyhow!("Payload too large—reduce chunk size: {}", body)); - } else if status.is_server_error() { - // Transient server error, retry with backoff + if status == 429 { + let reset_sec = headers + .get("ratelimit-reset") + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::().ok()) + .unwrap_or(10); + sleep(Duration::from_secs(reset_sec.max(10))).await; + continue; + } else if !status.is_success() { + if status.is_server_error() && retry_count < max_retries { retry_count += 1; - if retry_count >= max_retries { - let err_msg = format!("OpenFIGI server error {} after {} retries: {}", status, max_retries, body); - logger::log_error(&err_msg).await; - return Err(anyhow!(err_msg)); - } - let warn_msg = format!("Server error {} (attempt {}/{}), retrying in {}ms...", status, retry_count, max_retries, backoff_ms); - eprintln!("{}", warn_msg); - logger::log_warn(&warn_msg).await; sleep(Duration::from_millis(backoff_ms)).await; backoff_ms = (backoff_ms * 2).min(60000); continue; @@ -185,1282 +119,162 @@ impl OpenFigiClient { return Err(anyhow!("OpenFIGI error {}: {}", status, body)); } - let results: Vec = 
serde_json::from_str(&body) - .context("Failed to parse response JSON")?; + let results: Vec = serde_json::from_str(&body)?; for (isin, result) in chunk.iter().zip(results) { if let Some(data) = result["data"].as_array() { for item in data { - let sec_type = item["securityType"].as_str().unwrap_or(""); - let market_sec = item["marketSector"].as_str().unwrap_or(""); - - // Capture all security types, let caller filter by market sector if needed - let figi = match item["figi"].as_str() { - Some(f) => f.to_string(), - None => continue, - }; - - let figi_info = FigiInfo { - isin: isin.clone(), - figi, - name: item["name"].as_str().unwrap_or("").to_string(), - ticker: item["ticker"].as_str().unwrap_or("").to_string(), - exch_code: item["exchCode"].as_str().unwrap_or("").to_string(), - composite_figi: item["compositeFIGI"].as_str().unwrap_or("").to_string(), - security_type: sec_type.to_string(), - market_sector: market_sec.to_string(), - share_class_figi: item["shareClassFIGI"].as_str().unwrap_or("").to_string(), - security_type2: item["securityType2"].as_str().unwrap_or("").to_string(), - security_description: item["securityDescription"].as_str().unwrap_or("").to_string(), - }; - - all_figi_infos.push(figi_info); + if let Some(figi) = item["figi"].as_str() { + all_figi_infos.push(FigiInfo { + isin: isin.clone(), + figi: figi.to_string(), + name: item["name"].as_str().unwrap_or("").to_string(), + ticker: item["ticker"].as_str().unwrap_or("").to_string(), + exch_code: item["exchCode"].as_str().unwrap_or("").to_string(), + composite_figi: item["compositeFIGI"].as_str().unwrap_or("").to_string(), + security_type: item["securityType"].as_str().unwrap_or("").to_string(), + market_sector: item["marketSector"].as_str().unwrap_or("").to_string(), + share_class_figi: item["shareClassFIGI"].as_str().unwrap_or("").to_string(), + security_type2: item["securityType2"].as_str().unwrap_or("").to_string(), + security_description: item["securityDescription"].as_str().unwrap_or("").to_string(), + }); + } } } } - // Successfully processed this chunk, break out of retry loop break; } - req_count += 1; - if req_count % 25 == 0 { - // Optional: Enforce 6-sec window for bursts - let elapsed = start_time.elapsed(); - if self.has_key { - if elapsed < Duration::from_secs(6) { - sleep(Duration::from_secs(6) - elapsed).await; - } - } else { - if elapsed < Duration::from_secs(6) { - sleep(Duration::from_secs(60) - elapsed).await; - } - } - } else { - sleep(inter_sleep).await; - } + sleep(inter_sleep).await; } Ok(all_figi_infos) } } -/// Extracts the date from a GLEIF CSV filename in the format "isin-lei-DDMMYYYY.csv". -/// -/// # Arguments -/// -/// * `filename` - The GLEIF CSV filename (e.g., "isin-lei-24112025.csv") -/// -/// # Returns -/// -/// A string in the format "DDMMYYYY" (e.g., "24112025") if successfully parsed, otherwise the original filename. -fn extract_gleif_date_from_filename(filename: &str) -> String { - // Pattern: isin-lei-DDMMYYYY.csv - if let Some(start_idx) = filename.find("isin-lei-") { - let rest = &filename[start_idx + 9..]; // Skip "isin-lei-" - if rest.len() >= 8 && rest[0..8].chars().all(|c| c.is_numeric()) { - return rest[0..8].to_string(); - } - } - filename.to_string() -} - -/// Loads the list of market sectors from cache/openfigi/marketSecDes.json -/// -/// # Returns -/// -/// Vec of market sector strings (e.g., ["Comdty", "Corp", "Curncy", "Equity", ...]) -/// If the file doesn't exist or can't be parsed, returns a sensible default list. 
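Note for context on load_market_sectors (the removed version follows; a simplified version is re-added later in this patch): judging from the parsing code, the cached marketSecDes.json is assumed to look like {"values": ["Comdty", "Corp", ...]}. A small self-contained sketch of that extraction with serde_json:

use serde_json::Value;

fn sectors_from_json(content: &str) -> anyhow::Result<Vec<String>> {
    let json: Value = serde_json::from_str(content)?;
    let sectors = json["values"]
        .as_array()
        .ok_or_else(|| anyhow::anyhow!("'values' field not found"))?
        .iter()
        .filter_map(|v| v.as_str().map(|s| s.to_string()))
        .collect();
    Ok(sectors)
}

fn main() -> anyhow::Result<()> {
    let sample = r#"{"values": ["Comdty", "Corp", "Equity", "Govt"]}"#;
    assert_eq!(sectors_from_json(sample)?, vec!["Comdty", "Corp", "Equity", "Govt"]);
    Ok(())
}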
-async fn load_market_sectors() -> anyhow::Result> { - let dir = DataPaths::new(".")?; - let cache_file = dir.cache_openfigi_dir().join("marketSecDes.json"); - - if !cache_file.exists() { - // Return default if file doesn't exist - let warn_msg = format!("Warning: {} not found, using default sectors", cache_file.display()); - eprintln!("{}", warn_msg); - logger::log_warn(&warn_msg).await; - return Ok(vec![ - "Comdty".to_string(), - "Corp".to_string(), - "Curncy".to_string(), - "Equity".to_string(), - "Govt".to_string(), - "Index".to_string(), - "M-Mkt".to_string(), - "Mtge".to_string(), - "Muni".to_string(), - "Pfd".to_string(), - ]); - } - - let content = tokio_fs::read_to_string(&cache_file).await - .context("Failed to read marketSecDes.json")?; - - let json: Value = serde_json::from_str(&content) - .context("Failed to parse marketSecDes.json")?; - - let sectors: Vec = json["values"] - .as_array() - .ok_or_else(|| anyhow!("'values' field not found in marketSecDes.json"))? - .iter() - .filter_map(|v| v.as_str().map(|s| s.to_string())) - .collect(); - - if sectors.is_empty() { - return Err(anyhow!("No sectors found in marketSecDes.json")); - } - - let msg = format!("Loaded {} market sectors from cache", sectors.len()); - logger::log_info(&msg).await; - Ok(sectors) -} - -/// Finds the most recent GLEIF CSV file in the cache/gleif directory. -/// -/// Returns the extracted date in format "DDMMYYYY" from the filename. -/// If no GLEIF file is found, returns None. -async fn find_most_recent_gleif_date(gleif_cache_dir: &Path) -> anyhow::Result> { - // First check for subdirectories named as DDMMYYYY and pick the most recent date - let mut dir_entries = tokio_fs::read_dir(gleif_cache_dir) - .await - .context("Failed to read gleif cache directory")?; - - let mut found_dates: Vec = Vec::new(); - - while let Some(entry) = dir_entries.next_entry().await? { - let path = entry.path(); - if path.is_dir() { - if let Some(name) = path.file_name().and_then(|n| n.to_str()) { - // Expect folder name in DDMMYYYY - if name.len() == 8 && name.chars().all(|c| c.is_numeric()) { - if let Ok(nd) = NaiveDate::parse_from_str(name, "%d%m%Y") { - found_dates.push(nd); - } - } - } - } - } - - if !found_dates.is_empty() { - found_dates.sort(); - if let Some(most_recent) = found_dates.last() { - let date_str = most_recent.format("%d%m%Y").to_string(); - let msg = format!(" Found GLEIF data dated (from subdirs): {}", date_str); - logger::log_info(&msg).await; - return Ok(Some(date_str)); - } - } - - // Fallback: look for CSV files in the directory as before - let mut entries = tokio_fs::read_dir(gleif_cache_dir) - .await - .context("Failed to read gleif cache directory")?; - let mut csv_files = Vec::new(); - - while let Some(entry) = entries.next_entry().await? { - let path = entry.path(); - if let Some(filename) = path.file_name() { - let filename_str = filename.to_string_lossy(); - if filename_str.ends_with(".csv") && filename_str.contains("isin-lei-") { - csv_files.push(filename_str.to_string()); - } - } - } - - if csv_files.is_empty() { - return Ok(None); - } - - // Sort files in reverse order (most recent first) based on date in filename - csv_files.sort(); - csv_files.reverse(); - - let most_recent = &csv_files[0]; - let date = extract_gleif_date_from_filename(most_recent); - - let msg = format!(" Found GLEIF data dated: {}", date); - - logger::log_info(&msg).await; - Ok(Some(date)) -} - -/// Builds a LEI-to-FigiInfo map with automatic retry on transient failures. 
-/// -/// This is a wrapper around build_lei_to_figi_infos_internal that handles transient errors -/// by automatically retrying after a delay if the mapping process fails. The mapping can -/// resume from where it left off since already-processed LEIs are saved incrementally. -/// -/// # Arguments -/// -/// * `lei_to_isins` - HashMap of LEI to Vec (used for fetching missing entries). -/// * `gleif_date` - Optional date in format "DDMMYYYY". If None, uses the most recent GLEIF file. -/// -/// # Returns -/// -/// The complete (or partial if interrupted) HashMap>. -/// -/// # Errors -/// -/// Returns an error only on fatal errors (file I/O, invalid API key, etc.). -/// Transient errors are retried automatically. -pub async fn build_lei_to_figi_infos(lei_to_isins: &HashMap>, gleif_date: Option<&str>) -> anyhow::Result>> { - let mut retry_count = 0; - let max_retries = 3; - - loop { - match build_lei_to_figi_infos_internal(lei_to_isins, gleif_date).await { - Ok(map) => { - if !map.is_empty() { - let msg = format!("✓ LEI→FIGI mapping completed successfully with {} entries", map.len()); - - logger::log_info(&msg).await; - } - return Ok(map); - } - Err(e) => { - let error_msg = e.to_string(); - - // Check if this is a fatal error or transient - let is_fatal = error_msg.contains("Invalid OpenFIGI API key") - || error_msg.contains("No GLEIF CSV file found") - || error_msg.contains("Failed to create"); - - if is_fatal { - let err = format!("Fatal error in LEI→FIGI mapping: {}", e); - eprintln!("{}", err); - logger::log_error(&err).await; - return Err(e); - } - - retry_count += 1; - if retry_count >= max_retries { - let err = format!("LEI→FIGI mapping failed after {} retries: {}", max_retries, e); - eprintln!("{}", err); - logger::log_error(&err).await; - return Err(e); - } - - let wait_secs = 60 * retry_count; - let warn_msg = format!("Transient error in LEI→FIGI mapping (attempt {}/{}): {}", retry_count, max_retries, e); - eprintln!("{}", warn_msg); - logger::log_warn(&warn_msg).await; - let retry_msg = format!("Retrying mapping in {}s...", wait_secs); - println!("{}", retry_msg); - logger::log_info(&retry_msg).await; - sleep(Duration::from_secs(wait_secs as u64)).await; - } - } - } -} - -/// Internal implementation of LEI-to-FigiInfo mapping. -/// -/// This is the actual worker function that performs the mapping. It handles already-processed -/// LEIs gracefully but will fail on transient errors, which are caught and retried by the -/// wrapper function build_lei_to_figi_infos. -/// -/// Tracks three outcomes: -/// 1. Hit with marketSector: saved to sector-specific folder -/// 2. Hit without marketSector: saved to "uncategorized" folder -/// 3. No_hit (empty results): LEI marked for removal from GLEIF CSV -async fn build_lei_to_figi_infos_internal(lei_to_isins: &HashMap>, gleif_date: Option<&str>) -> anyhow::Result>> { - let dir = DataPaths::new(".")?; - let gleif_cache_dir = dir.cache_gleif_dir(); - let map_cache_dir = dir.cache_gleif_openfigi_map_dir(); - - // Determine the GLEIF date to use - let date = if let Some(d) = gleif_date { - let msg = format!("Using provided GLEIF date: {}", d); - logger::log_info(&msg).await; - d.to_string() - } else { - // Find the most recent GLEIF file - logger::log_info("Searching for most recent GLEIF file...").await; - match find_most_recent_gleif_date(&gleif_cache_dir).await? 
{ - Some(d) => d, - None => { - let err = "No GLEIF CSV file found in cache/gleif directory"; - logger::log_error(err).await; - return Err(anyhow!(err)); - }, - } - }; - - // Creat date-based subdirectory in the gleif cache - let gleif_date_dir = gleif_cache_dir.join(&date); - - // Create date-based subdirectory in the mapping cache - let msg = format!("Creating date directory for: {}", date); - logger::log_info(&msg).await; - let date_dir = map_cache_dir.join(&date); - tokio_fs::create_dir_all(&date_dir).await.context("Failed to create date directory")?; - - // Load market sectors dynamically from cache - logger::log_info("Loading market sectors...").await; - let sector_dirs = load_market_sectors().await?; - let mut sector_maps: HashMap>> = HashMap::new(); - - // Create uncategorized folder - let msg = format!("Creating {} sector directories...", sector_dirs.len()); - logger::log_info(&msg).await; - let uncategorized_dir = date_dir.join("uncategorized"); - tokio_fs::create_dir_all(&uncategorized_dir).await.context("Failed to create uncategorized directory")?; - let uncategorized_path = uncategorized_dir.join("lei_to_figi.jsonl"); - let uncategorized_map = load_lei_to_figi_jsonl(&uncategorized_path).await?; - sector_maps.insert("uncategorized".to_string(), uncategorized_map); - - for sector in §or_dirs { - let sector_dir = date_dir.join(sector); - tokio_fs::create_dir_all(§or_dir).await.context("Failed to create sector directory")?; +async fn process_and_save_figi_batch( + client: &OpenFigiClient, + lei_batch: &HashMap>, + date_dir: &Path, +) -> anyhow::Result<()> { + for (lei, isins) in lei_batch { + let unique_isins: Vec<_> = isins.iter() + .cloned() + .collect::>() + .into_iter() + .collect(); - // Load existing mappings for this sector - let path = sector_dir.join("lei_to_figi.jsonl"); - let lei_map = load_lei_to_figi_jsonl(&path).await?; - sector_maps.insert(sector.clone(), lei_map); - } - - let client = OpenFigiClient::new().await?; - if !client.has_key { - let total_entries: usize = sector_maps.values().map(|m| m.len()).sum(); - let msg = format!("No API key—using partial LEI→FIGI maps with {} total entries", total_entries); + let figi_infos = client.map_isins_to_figi_infos(&unique_isins).await?; - logger::log_warn(&msg).await; - return Ok(sector_maps.get("Equity").cloned().unwrap_or_default()); - } - - // Sort LEIs for deterministic processing order - logger::log_info("Starting LEI→FIGI mapping process...").await; - let mut leis: Vec<_> = lei_to_isins.keys().cloned().collect(); - leis.sort(); - - let mut processed = sector_maps.values().map(|m| m.len()).sum::(); - let total = leis.len(); - let mut no_hit_leis = Vec::new(); // Track LEIs with no data found (no_hit) - let mut leis_to_delete_batch = Vec::new(); // Batch delete every 100 LEIs - - let msg = format!("Total LEIs to process: {}, already processed: {}", total, processed); - - logger::log_info(&msg).await; - - for lei in leis { - // Check if LEI is already processed in any sector (including uncategorized) - let mut already_processed = false; - for sector_map in sector_maps.values() { - if sector_map.contains_key(&lei) { - already_processed = true; - break; - } - } - - if already_processed { + if figi_infos.is_empty() { + // No FIGIs found - save to no_results.jsonl to avoid re-querying + append_no_result_lei(date_dir, lei, &unique_isins).await?; continue; } + + // Save FIGIs by sector as before + save_figi_infos_by_sector(lei, &figi_infos, date_dir).await?; + } + + Ok(()) +} - let isins = match lei_to_isins.get(&lei) { - 
Some(i) => i, - None => continue, +async fn save_figi_infos_by_sector( + lei: &str, + figi_infos: &[FigiInfo], + date_dir: &Path, +) -> anyhow::Result<()> { + let mut by_sector: HashMap> = HashMap::new(); + + for figi_info in figi_infos { + let sector = if figi_info.market_sector.is_empty() { + "uncategorized".to_string() + } else { + figi_info.market_sector.clone() }; - - let unique_isins: Vec<_> = isins.iter().cloned().collect::>().into_iter().collect(); - let debug_msg = format!("Processing LEI {} with {} ISINs...", lei, unique_isins.len()); - logger::log_info(&debug_msg).await; - - let all_figi_infos = client.map_isins_to_figi_infos(&unique_isins).await?; - - // Case 1: no_hit - API succeeded but returned no data - if all_figi_infos.is_empty() { - let no_hit_msg = format!(" no_hit: LEI {} returned no FIGIs", lei); - logger::log_warn(&no_hit_msg).await; - no_hit_leis.push(lei.clone()); - leis_to_delete_batch.push(lei.clone()); - - // Delete every 100 no_hit LEIs to prevent progress loss on interrupt - if leis_to_delete_batch.len() >= 100 { - let batch_msg = format!("Batch deleting {} LEIs from GLEIF CSV...", leis_to_delete_batch.len()); - logger::log_info(&batch_msg).await; - if let Err(e) = remove_leis_batch_from_gleif_csv(&gleif_date_dir, &leis_to_delete_batch).await { - let warn_msg = format!("Warning: Failed to batch remove LEIs from GLEIF CSV: {}", e); - eprintln!("{}", warn_msg); - logger::log_warn(&warn_msg).await; - } - leis_to_delete_batch.clear(); - } - - continue; - } - - let hit_msg = format!(" hit: LEI {} found {} FIGIs", lei, all_figi_infos.len()); - logger::log_info(&hit_msg).await; - - // Organize results by marketSector - let mut figis_by_sector: HashMap> = HashMap::new(); - let mut uncategorized_figis = Vec::new(); - - for figi_info in all_figi_infos { - let sector = figi_info.market_sector.clone(); - - if sector.is_empty() { - // Case 2: Hit but no marketSecDes - save to uncategorized - uncategorized_figis.push(figi_info); - } else { - // Case 1: Hit with marketSector - organize by sector - figis_by_sector.entry(sector).or_insert_with(Vec::new).push(figi_info); - } - } - - // Save uncategorized FIGIs if any - if !uncategorized_figis.is_empty() { - uncategorized_figis.sort_by_key(|f| f.figi.clone()); - uncategorized_figis.dedup_by_key(|f| f.figi.clone()); - - append_lei_to_figi_jsonl(&uncategorized_path, &lei, &uncategorized_figis).await - .context("Failed to append to uncategorized JSONL")?; - - if let Some(uncategorized_map) = sector_maps.get_mut("uncategorized") { - uncategorized_map.insert(lei.clone(), uncategorized_figis); - } - } - - // Save to appropriate sector files - for (sector, mut figis) in figis_by_sector { - if !figis.is_empty() { - figis.sort_by_key(|f| f.figi.clone()); - figis.dedup_by_key(|f| f.figi.clone()); - - // Save to sector's JSONL file - let sector_dir = date_dir.join(§or); - let path = sector_dir.join("lei_to_figi.jsonl"); - append_lei_to_figi_jsonl(&path, &lei, &figis).await.context("Failed to append to JSONL")?; - - // Update in-memory sector map - if let Some(sector_map) = sector_maps.get_mut(§or) { - sector_map.insert(lei.clone(), figis); - } - } - } - - processed += 1; - if processed % 100 == 0 { - let totals: Vec = sector_dirs.iter().map(|s| { - let count = sector_maps.get(s).map(|m| m.len()).unwrap_or(0); - format!("{}:{}", s, count) - }).collect(); - let progress_msg = format!("Processed {}/{} LEIs → [{}] no_hit: {}", processed, total, totals.join(", "), no_hit_leis.len()); - println!("{}", progress_msg); - 
logger::log_info(&progress_msg).await; - } - - tokio::time::sleep(Duration::from_millis(100)).await; + by_sector.entry(sector).or_default().push(figi_info.clone()); } - // Delete any remaining LEIs in the batch - if !leis_to_delete_batch.is_empty() { - let batch_msg = format!("Final batch: Deleting {} LEIs from GLEIF CSV...", leis_to_delete_batch.len()); - logger::log_info(&batch_msg).await; - if let Err(e) = remove_leis_batch_from_gleif_csv(gleif_cache_dir, &leis_to_delete_batch).await { - let warn_msg = format!("Warning: Failed to delete final batch from GLEIF CSV: {}", e); - eprintln!("{}", warn_msg); - logger::log_warn(&warn_msg).await; - } + for (sector, figis) in by_sector { + let sector_dir = date_dir.join(§or); + let path = sector_dir.join("lei_to_figi.jsonl"); + append_lei_to_figi_jsonl(&path, lei, &figis).await?; } - // Log final summary for no_hit LEIs (they've already been removed incrementally) - if !no_hit_leis.is_empty() { - let no_hit_summary = format!("no_hit (removed in batches from GLEIF): {} LEIs", no_hit_leis.len()); - println!("{}", no_hit_summary); - logger::log_info(&no_hit_summary).await; - } - - // Return Equity sector as the main result - Ok(sector_maps.get("Equity").cloned().unwrap_or_default()) + Ok(()) } -/// Loads LEI-to-FigiInfo map from a JSON Lines file. -/// -/// Each line is expected to be a JSON object: {"lei": "ABC", "figis": [FigiInfo...]} -/// -/// # Arguments -/// -/// * `path` - Path to the .jsonl file. -/// -/// # Returns -/// -/// The loaded HashMap>. -/// -/// # Errors -/// -/// Returns an error if the file cannot be opened or if any line fails to parse as JSON. -async fn load_lei_to_figi_jsonl(path: &Path) -> anyhow::Result>> { - let mut map = HashMap::new(); - - if !path.exists() { - return Ok(map); - } - - let content = tokio_fs::read_to_string(path).await.context("Failed to read JSONL file")?; - - for (line_num, line) in content.lines().enumerate() { - if line.trim().is_empty() { - continue; - } - - let entry: Value = serde_json::from_str(&line).context(format!("Failed to parse JSON on line {}", line_num + 1))?; - let lei = entry["lei"].as_str().context("Missing 'lei' field")?.to_string(); - let figis: Vec = serde_json::from_value(entry["figis"].clone()).context("Invalid 'figis' field")?; - - map.insert(lei, figis); - } - - let msg = format!("Loaded LEI→FIGI map with {} entries from {}", map.len(), path.display()); - - logger::log_info(&msg).await; - Ok(map) -} - -/// Appends a single LEI entry to the JSON Lines file. -/// -/// # Arguments -/// -/// * `path` - Path to the .jsonl file. -/// * `lei` - The LEI key. -/// * `figis` - The Vec for this LEI. -/// -/// # Errors -/// -/// Returns an error if the file cannot be opened for append or if serialization fails. async fn append_lei_to_figi_jsonl(path: &Path, lei: &str, figis: &[FigiInfo]) -> anyhow::Result<()> { let entry = json!({ "lei": lei, "figis": figis, }); - let line = serde_json::to_string(&entry).context("Failed to serialize entry")? + "\n"; + let line = serde_json::to_string(&entry)? + "\n"; let mut file = tokio_fs::OpenOptions::new() .create(true) .append(true) .open(path) - .await - .context("Failed to open JSONL file for append")?; + .await?; - file.write_all(line.as_bytes()) - .await - .context("Failed to write to JSONL file")?; - + file.write_all(line.as_bytes()).await?; Ok(()) } -/// Removes multiple invalid LEIs from the GLEIF CSV file in a single batch operation. -/// -/// This function is more efficient than removing LEIs one at a time. 
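Note on the JSONL layout written by append_lei_to_figi_jsonl above: each call appends one self-contained object per line, which is what lets build_securities_from_figi_streaming, added just below, re-read the file line by line instead of deserializing one large map. A small round-trip illustration using serde_json::Value in place of the project's FigiInfo type (the identifiers are placeholders):

use serde_json::{json, Value};

fn main() -> anyhow::Result<()> {
    // Writing side: one JSON object per line.
    let line = serde_json::to_string(&json!({
        "lei": "5299001EXAMPLE000000",
        "figis": [{ "figi": "BBG000000000", "ticker": "EXAMPLE" }],
    }))? + "\n";

    // Reading side: each line parses independently of the rest of the file.
    let entry: Value = serde_json::from_str(line.trim_end())?;
    assert_eq!(entry["lei"], "5299001EXAMPLE000000");
    assert_eq!(entry["figis"][0]["ticker"], "EXAMPLE");
    Ok(())
}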
-/// It reads the GLEIF CSV once, filters out all specified LEIs, and overwrites the file once. -/// -/// # Arguments -/// -/// * `gleif_cache_dir` - Path to the cache/gleif directory -/// * `leis_to_remove` - Vec of LEI strings to remove -/// -/// # Returns -/// Ok(()) if successful, Err if file operations fail. -async fn remove_leis_batch_from_gleif_csv(gleif_cache_dir: &Path, leis_to_remove: &[String]) -> anyhow::Result<()> { - if leis_to_remove.is_empty() { +/// STREAMING: Build securities without loading everything into memory +pub async fn build_securities_from_figi_streaming( + date_dir: &Path, +) -> anyhow::Result<()> { + logger::log_info("Building securities (streaming mode)...").await; + + // Load existing incrementally + let mut commons = load_from_cache_if_exists::>( + "data/corporate/by_name/common_stocks.json" + ).await?; + + let equity_file = date_dir.join("Equity").join("lei_to_figi.jsonl"); + + if !equity_file.exists() { + logger::log_warn("No Equity FIGI file found").await; return Ok(()); } - // Find the most recent GLEIF CSV file - let mut entries = tokio_fs::read_dir(gleif_cache_dir) - .await - .context("Failed to read gleif cache directory")?; + let content = tokio_fs::read_to_string(&equity_file).await?; + let mut processed = 0; + let mut stats = ProcessingStats::new(commons.len(), 0, 0); - let mut csv_files = Vec::new(); - - while let Some(entry) = entries.next_entry().await? { - let path = entry.path(); - if let Some(filename) = path.file_name() { - let filename_str = filename.to_string_lossy(); - if filename_str.ends_with(".csv") && filename_str.contains("isin-lei-") { - csv_files.push(path); - } - } - } - - if csv_files.is_empty() { - logger::log_warn("No GLEIF CSV files found for batch removal operation").await; - return Ok(()); - } - - // Prefer an original (non-_clean) GLEIF CSV if available; otherwise use the most recent file. - csv_files.sort(); - csv_files.reverse(); - - let mut gleif_file: &std::path::PathBuf = &csv_files[0]; - // Try to find the most recent filename that does NOT end with "_clean.csv" - if let Some(non_clean) = csv_files.iter().find(|p| { - p.file_name() - .and_then(|n| n.to_str()) - .map(|s| !s.to_lowercase().ends_with("_clean.csv")) - .unwrap_or(false) - }) { - gleif_file = non_clean; - } - - // Prepare clean file path: insert "_clean" before extension - let orig_path = gleif_file; - let file_name = orig_path.file_name().and_then(|n| n.to_str()).unwrap_or("gleif.csv"); - let mut stem = orig_path.file_stem().and_then(|s| s.to_str()).unwrap_or("isin-lei").to_string(); - let parent = orig_path.parent().unwrap_or_else(|| Path::new(".")); - // Avoid creating a double "_clean_clean.csv". If stem already ends with "_clean", keep it. 
- if stem.to_lowercase().ends_with("_clean") { - // stem is already clean; keep same filename (no double suffix) - // e.g., stem="isin-lei-24112025_clean" -> clean_name="isin-lei-24112025_clean.csv" - } else { - stem = format!("{}_clean", stem); - } - - let clean_name = format!("{}.csv", stem); - let clean_path = parent.join(&clean_name); - - // If a clean file already exists, operate on it; otherwise read original and write clean file - let source_path = if clean_path.exists() { &clean_path } else { orig_path }; - - let debug_msg = format!("Reading GLEIF source for batch removal: {} (writing to {})", source_path.display(), clean_path.display()); - logger::log_info(&debug_msg).await; - - // Cleanup any accidental double-clean files in the same directory: if a file ends with - // "_clean_clean.csv" replace it with single "_clean.csv" or remove it if target exists. - if let Ok(mut dir_entries) = tokio_fs::read_dir(parent).await { - while let Ok(Some(entry)) = dir_entries.next_entry().await { - if let Some(name) = entry.file_name().to_str().map(|s| s.to_string()) { - if name.to_lowercase().ends_with("_clean_clean.csv") { - let offending = entry.path(); - let candidate = offending.file_name().and_then(|n| n.to_str()).unwrap_or(""); - let target_name = candidate.replacen("_clean_clean.csv", "_clean.csv", 1); - let target_path = parent.join(target_name); - - if !target_path.exists() { - // Rename offending -> target - let _ = tokio_fs::rename(&offending, &target_path).await; - let msg = format!("Renamed {} -> {}", offending.display(), target_path.display()); - logger::log_info(&msg).await; - } else { - // Target exists already; remove offending duplicate - let _ = tokio_fs::remove_file(&offending).await; - let msg = format!("Removed duplicate {}", offending.display()); - logger::log_info(&msg).await; - } - } - } - } - } - - // Read file into memory and parse with csv crate for robust handling of quoted fields - let content = tokio_fs::read_to_string(source_path) - .await - .context("Failed to read GLEIF CSV source")?; - - // Convert LEIs to remove into a HashSet (normalized) - let remove_set: std::collections::HashSet = leis_to_remove - .iter() - .map(|s| s.trim().trim_matches('"').to_uppercase()) - .collect(); - - // Build CSV reader: try with headers first; allow flexible records - let mut reader = ReaderBuilder::new() - .has_headers(true) - .flexible(true) - .from_reader(content.as_bytes()); - - // Remember headers (if present) and then iterate records. - let headers_record = match reader.headers() { - Ok(h) => Some(h.clone()), - Err(_) => None, - }; - - // We'll collect kept records and count original rows - let mut kept_records: Vec = Vec::new(); - let mut original_count: usize = 0; - let mut removed_count: usize = 0; - - // For robustness, search all columns for a matching LEI instead of relying on a single column index. 
- for result in reader.records() { - let record = result.context("Failed to parse CSV record")?; - original_count += 1; - - // Check every field for a match in the remove set - let mut matched = false; - for field in record.iter() { - let norm = field.trim().trim_matches('"').to_uppercase(); - if remove_set.contains(&norm) { - matched = true; - break; - } - } - - if matched { - removed_count += 1; - } else { - kept_records.push(record.clone()); - } - } - - let new_count = kept_records.len(); - - // Write back using csv writer to preserve quoting/format into clean file - let mut wtr = WriterBuilder::new().has_headers(true).from_writer(vec![]); - // If original had headers, write them back - if let Some(headers) = headers_record { - wtr.write_record(headers.iter())?; - } - - for rec in &kept_records { - wtr.write_record(rec.iter())?; - } - - let out_bytes = wtr.into_inner().context("Failed to finalize CSV writer")?; - let out_str = String::from_utf8(out_bytes).context("CSV output not valid UTF-8")?; - - tokio_fs::write(&clean_path, out_str) - .await - .context("Failed to write filtered GLEIF CSV clean file")?; - - let success_msg = format!( - "✓ Batch attempted to remove {} LEIs from GLEIF CSV (was {} records, now {} records, removed {} rows) -> {}", - leis_to_remove.len(), original_count, new_count, removed_count, clean_path.display() - ); - println!("{}", success_msg); - logger::log_info(&success_msg).await; - - Ok(()) -} - -/// Statistics tracker for processing -#[derive(Debug)] -struct ProcessingStats { - initial_companies: usize, - initial_warrants: usize, - initial_options: usize, - companies_added: usize, - companies_updated: usize, - warrants_added: usize, - warrants_updated: usize, - options_added: usize, - options_updated: usize, -} - -impl ProcessingStats { - fn new(companies: usize, warrants: usize, options: usize) -> Self { - Self { - initial_companies: companies, - initial_warrants: warrants, - initial_options: options, - companies_added: 0, - companies_updated: 0, - warrants_added: 0, - warrants_updated: 0, - options_added: 0, - options_updated: 0, - } - } - - fn print_summary(&self, final_companies: usize, final_warrants: usize, final_options: usize) { - println!("\n=== Processing Statistics ==="); - println!("Companies:"); - println!(" - Initial: {}", self.initial_companies); - println!(" - Added: {}", self.companies_added); - println!(" - Updated: {}", self.companies_updated); - println!(" - Total: {}", final_companies); - println!("Warrants:"); - println!(" - Initial: {}", self.initial_warrants); - println!(" - Added: {}", self.warrants_added); - println!(" - Updated: {}", self.warrants_updated); - println!(" - Total: {}", final_warrants); - println!("Options:"); - println!(" - Initial: {}", self.initial_options); - println!(" - Added: {}", self.options_added); - println!(" - Updated: {}", self.options_updated); - println!(" - Total: {}", final_options); - } -} - -/// Process common stocks into companies HashMap -fn process_common_stocks( - companies: &mut HashMap, - figi_infos: &[FigiInfo], - stats: &mut ProcessingStats, -) { - let name = figi_infos[0].name.clone(); - if name.is_empty() { - return; - } - - // Group by ISIN - let grouped_by_isin = group_by_isin(figi_infos); - - if let Some(existing) = companies.get_mut(&name) { - // Update existing company - let mut updated = false; - for (isin, new_figis) in grouped_by_isin { - if let Some(existing_figis) = existing.securities.get_mut(&isin) { - let merged = merge_figi_list(existing_figis, &new_figis); - if merged.len() > 
existing_figis.len() { - *existing_figis = merged; - updated = true; - } - } else { - existing.securities.insert(isin.clone(), new_figis); - updated = true; - } - } - - // Update primary ISIN if needed - if existing.primary_isin.is_empty() || !existing.securities.contains_key(&existing.primary_isin) { - if let Some(first_isin) = existing.securities.keys().next() { - existing.primary_isin = first_isin.clone(); - } - } - - if updated { - stats.companies_updated += 1; - } - } else { - // Add new company - let primary_isin = grouped_by_isin.keys().next().cloned().unwrap_or_default(); - - companies.insert(name.clone(), CompanyInfo { - name, - primary_isin, - securities: grouped_by_isin, - }); - - stats.companies_added += 1; - } -} - -/// Process warrants into warrants HashMap -fn process_warrants( - warrants: &mut HashMap>, - warrant_securities: &[FigiInfo], - stats: &mut ProcessingStats, // Assuming Stats is a struct; adjust based on actual type if it's a HashMap or other -) { - for figi in warrant_securities.iter() { - // Parse the name to extract underlying, issuer, and warrant_type - // (Assuming a parse_warrant_name function exists; this is not changed) - let (underlying, issuer, warrant_type) = parse_warrant_name(&figi.name); - - if underlying.is_empty() { + for line in content.lines() { + if line.trim().is_empty() { continue; } - - // Outer map: key by underlying - let underlying_map = warrants - .entry(underlying.clone()) - .or_insert_with(HashMap::new); - - // Inner map: key by warrant_type - let entry = underlying_map.entry(warrant_type.clone()).or_insert(WarrantInfo { - underlying_company_name: underlying.clone(), - issuer_company_name: issuer, - warrant_type: warrant_type.clone(), - warrants: HashMap::new(), - }); - - // Group by ISIN as before - entry - .warrants - .entry(figi.isin.clone()) - .or_insert_with(Vec::new) - .push(figi.clone()); - - // Update stats (assuming stats has a 'warrants' field; adjust if needed) - stats.warrants_added += 1; - } -} - -/// Process options into options HashMap -fn process_options( - options: &mut HashMap>, - option_securities: &[FigiInfo], - stats: &mut ProcessingStats, // Assuming Stats is a struct; adjust based on actual type if it's a HashMap or other -) { - for figi in option_securities.iter() { - // Parse the name to extract underlying, issuer, and option_type - // (Assuming a parse_option_name function exists; this is not changed) - let (underlying, issuer, option_type) = parse_option_name(&figi.name); - - if underlying.is_empty() { - continue; + + let entry: Value = serde_json::from_str(line)?; + let figi_infos: Vec = serde_json::from_value(entry["figis"].clone())?; + + // Process only common stocks + let common_stocks: Vec<_> = figi_infos.iter() + .filter(|f| f.security_type == "Common Stock") + .cloned() + .collect(); + + if !common_stocks.is_empty() { + process_common_stocks(&mut commons, &common_stocks, &mut stats); } - - // Outer map: key by underlying - let underlying_map = options - .entry(underlying.clone()) - .or_insert_with(HashMap::new); - - // Inner map: key by option_type - let entry = underlying_map.entry(option_type.clone()).or_insert(OptionInfo { - underlying_company_name: underlying.clone(), - issuer_company_name: issuer, - option_type: option_type.clone(), - options: HashMap::new(), - }); - - // Group by ISIN as before - entry - .options - .entry(figi.isin.clone()) - .or_insert_with(Vec::new) - .push(figi.clone()); - - // Update stats (assuming stats has an 'options' field; adjust if needed) - stats.options_added += 1; - 
} -} - -/// Groups FigiInfo list by ISIN -fn group_by_isin(figi_infos: &[FigiInfo]) -> HashMap> { - let mut grouped: HashMap> = HashMap::new(); - - for figi_info in figi_infos { - grouped.entry(figi_info.isin.clone()) - .or_insert_with(Vec::new) - .push(figi_info.clone()); - } - - // Sort each group by FIGI for consistency - for figis in grouped.values_mut() { - figis.sort_by(|a, b| a.figi.cmp(&b.figi)); - } - - grouped -} - -/// Merges two FigiInfo lists, deduplicating by FIGI -fn merge_figi_list(existing: &[FigiInfo], new_figis: &[FigiInfo]) -> Vec { - let mut merged = existing.to_vec(); - let existing_figis: HashSet = existing.iter() - .map(|f| f.figi.clone()) - .collect(); - - for new_figi in new_figis { - if !existing_figis.contains(&new_figi.figi) { - merged.push(new_figi.clone()); + + processed += 1; + if processed % 100 == 0 { + tokio::task::yield_now().await; } } - // Sort by FIGI for consistency - merged.sort_by(|a, b| a.figi.cmp(&b.figi)); - - merged -} - -/// Parse warrant name to extract underlying company, issuer, and warrant type -/// -/// Examples: -/// - "VONTOBE-PW26 LEONARDO SPA" -> ("LEONARDO SPA", Some("VONTOBEL"), "put") -/// - "BAYER H-CW25 L'OREAL" -> ("L'OREAL", Some("BAYER H"), "call") -/// - "APPLE INC WARRANT" -> ("APPLE INC", None, "unknown") -fn parse_warrant_name(name: &str) -> (String, Option, String) { - let name_upper = name.to_uppercase(); - - // Try to detect warrant type from code (PW=put, CW=call) - let warrant_type = if name_upper.contains("-PW") || name_upper.contains(" PW") { - "put".to_string() - } else if name_upper.contains("-CW") || name_upper.contains(" CW") { - "call".to_string() - } else { - "unknown".to_string() - }; - - // Try to split by warrant code pattern (e.g., "-PW26", "-CW25") - if let Some(pos) = name.find("-PW") { - let before = name[..pos].trim(); - let after_idx = name[pos..].find(' ').map(|i| pos + i + 1).unwrap_or(name.len()); - let after = if after_idx < name.len() { - name[after_idx..].trim() - } else { - "" - }; - - return ( - after.to_string(), - if !before.is_empty() { Some(before.to_string()) } else { None }, - warrant_type, - ); - } - - if let Some(pos) = name.find("-CW") { - let before = name[..pos].trim(); - let after_idx = name[pos..].find(' ').map(|i| pos + i + 1).unwrap_or(name.len()); - let after = if after_idx < name.len() { - name[after_idx..].trim() - } else { - "" - }; - - return ( - after.to_string(), - if !before.is_empty() { Some(before.to_string()) } else { None }, - warrant_type, - ); - } - - // Fallback: return entire name as underlying - (name.to_string(), None, warrant_type) -} - -/// Parse option name to extract underlying company, issuer, and option type -/// -/// Examples: -/// - "December 25 Calls on ALPHA GA" -> ("ALPHA GA", None, "call") -/// - "January 26 Puts on TESLA INC" -> ("TESLA INC", None, "put") -fn parse_option_name(name: &str) -> (String, Option, String) { - let name_upper = name.to_uppercase(); - - // Detect option type - let option_type = if name_upper.contains("CALL") { - "call".to_string() - } else if name_upper.contains("PUT") { - "put".to_string() - } else { - "unknown".to_string() - }; - - // Try to extract underlying after "on" - if let Some(pos) = name_upper.find(" ON ") { - let underlying = name[pos + 4..].trim().to_string(); - return (underlying, None, option_type); - } - - // Fallback: return entire name - (name.to_string(), None, option_type) -} - -/// Generic function to load from cache -async fn load_from_cache(path: &str) -> anyhow::Result> -where - T: 
serde::de::DeserializeOwned, -{ - let cache_file = Path::new(path); - - if !cache_file.exists() { - return Ok(None); - } - - let content = tokio_fs::read_to_string(cache_file).await - .context(format!("Failed to read {}", path))?; - - let data: T = serde_json::from_str(&content) - .context(format!("Failed to parse {}", path))?; - - Ok(Some(data)) -} - -/// Generic function to save to cache -async fn save_to_cache(path: &str, data: &T) -> anyhow::Result<()> -where - T: serde::Serialize, -{ - let cache_path = Path::new(path); - let cache_dir = cache_path.parent().context("Invalid cache path")?; - - tokio_fs::create_dir_all(cache_dir).await - .context(format!("Failed to create directory for {}", path))?; - - let json_str = serde_json::to_string_pretty(data) - .context("Failed to serialize data")?; - - tokio_fs::write(cache_path, json_str).await - .context(format!("Failed to write {}", path))?; - - println!(" ✓ Saved to {}", path); + logger::log_info(&format!("Processed {} FIGI entries", processed)).await; + save_to_cache("data/corporate/by_name/common_stocks.json", &commons).await?; Ok(()) } -/// Loads all OpenFIGI mapping value lists (marketSecDes, micCode, securityType). -/// -/// This function fetches the available values for each mapping parameter from the OpenFIGI API -/// and caches them as JSON files in `data/openfigi/`. If the files already exist and are recent -/// (less than 30 days old), they are reused instead of re-fetching. -/// -/// # Returns -/// Ok(()) on success. -/// -/// # Errors -/// Returns an error if API requests fail, JSON parsing fails, or file I/O fails. -pub async fn load_figi_type_lists() -> anyhow::Result<()> { - println!("Loading OpenFIGI mapping value lists..."); - - let client = OpenFigiClient::new().await?; - - // Create cache directory - let dir = DataPaths::new(".")?; - let cache_dir = dir.cache_openfigi_dir(); - tokio_fs::create_dir_all(cache_dir).await - .context("Failed to create data/openfigi directory")?; - - // Fetch each type list - get_figi_market_sec_des(&client, cache_dir).await?; - get_figi_mic_code(&client, cache_dir).await?; - get_figi_security_type(&client, cache_dir).await?; - - println!("OpenFIGI mapping value lists loaded successfully"); - Ok(()) -} - -/// Fetches and caches the list of valid marketSecDes values. -/// -/// # Arguments -/// * `client` - The OpenFIGI client instance. -/// * `cache_dir` - Directory to save the cached JSON file. -/// -/// # Returns -/// Ok(()) on success. -/// -/// # Errors -/// Returns an error if the API request fails or file I/O fails. -async fn get_figi_market_sec_des(client: &OpenFigiClient, cache_dir: &Path) -> anyhow::Result<()> { - let cache_file = cache_dir.join("marketSecDes.json"); - - // Check if cache exists and is recent (< 30 days old) - if should_use_cache(&cache_file).await? 
{ - println!(" Using cached marketSecDes values"); - return Ok(()); - } - - println!(" Fetching marketSecDes values from OpenFIGI API..."); - - let resp = client.client - .get("https://api.openfigi.com/v3/mapping/values/marketSecDes") - .send() - .await - .context("Failed to fetch marketSecDes values")?; - - handle_rate_limit(&resp).await?; - - let values: Value = resp.json().await - .context("Failed to parse marketSecDes response")?; - - // Save to cache - let json_str = serde_json::to_string_pretty(&values)?; - tokio_fs::write(&cache_file, json_str).await - .context("Failed to write marketSecDes cache")?; - - println!(" ✓ Cached marketSecDes values"); - - // Respect rate limits - sleep(Duration::from_millis(if client.has_key { 240 } else { 2400 })).await; - - Ok(()) -} - -/// Fetches and caches the list of valid micCode values. -/// -/// # Arguments -/// * `client` - The OpenFIGI client instance. -/// * `cache_dir` - Directory to save the cached JSON file. -/// -/// # Returns -/// Ok(()) on success. -/// -/// # Errors -/// Returns an error if the API request fails or file I/O fails. -async fn get_figi_mic_code(client: &OpenFigiClient, cache_dir: &Path) -> anyhow::Result<()> { - let cache_file = cache_dir.join("micCode.json"); - - if should_use_cache(&cache_file).await? { - println!(" Using cached micCode values"); - return Ok(()); - } - - println!(" Fetching micCode values from OpenFIGI API..."); - - let resp = client.client - .get("https://api.openfigi.com/v3/mapping/values/micCode") - .send() - .await - .context("Failed to fetch micCode values")?; - - handle_rate_limit(&resp).await?; - - let values: Value = resp.json().await - .context("Failed to parse micCode response")?; - - let json_str = serde_json::to_string_pretty(&values)?; - tokio_fs::write(&cache_file, json_str).await - .context("Failed to write micCode cache")?; - - println!(" ✓ Cached micCode values"); - - sleep(Duration::from_millis(if client.has_key { 240 } else { 2400 })).await; - - Ok(()) -} - -/// Fetches and caches the list of valid securityType values. -/// -/// # Arguments -/// * `client` - The OpenFIGI client instance. -/// * `cache_dir` - Directory to save the cached JSON file. -/// -/// # Returns -/// Ok(()) on success. -/// -/// # Errors -/// Returns an error if the API request fails or file I/O fails. -async fn get_figi_security_type(client: &OpenFigiClient, cache_dir: &Path) -> anyhow::Result<()> { - let cache_file = cache_dir.join("securityType.json"); - - if should_use_cache(&cache_file).await? { - println!(" Using cached securityType values"); - return Ok(()); - } - - println!(" Fetching securityType values from OpenFIGI API..."); - - let resp = client.client - .get("https://api.openfigi.com/v3/mapping/values/securityType") - .send() - .await - .context("Failed to fetch securityType values")?; - - handle_rate_limit(&resp).await?; - - let values: Value = resp.json().await - .context("Failed to parse securityType response")?; - - let json_str = serde_json::to_string_pretty(&values)?; - tokio_fs::write(&cache_file, json_str).await - .context("Failed to write securityType cache")?; - - println!(" ✓ Cached securityType values"); - - sleep(Duration::from_millis(if client.has_key { 240 } else { 2400 })).await; - - Ok(()) -} - -/// Checks if a cache file exists and is less than 30 days old. -/// -/// # Arguments -/// * `path` - Path to the cache file. -/// -/// # Returns -/// True if the cache should be used, false if it needs refreshing. 
-async fn should_use_cache(path: &Path) -> anyhow::Result { - if !path.exists() { - return Ok(false); - } - - let metadata = tokio_fs::metadata(path).await?; - let modified = metadata.modified()?; - let age = modified.elapsed().unwrap_or(std::time::Duration::from_secs(u64::MAX)); - - // Cache is valid for 30 days - Ok(age < std::time::Duration::from_secs(30 * 24 * 60 * 60)) -} - /// Handles rate limit responses from the OpenFIGI API. /// /// If a 429 status is received, this function sleeps for the duration specified @@ -1485,7 +299,7 @@ async fn handle_rate_limit(resp: &reqwest::Response) -> anyhow::Result<()> { .and_then(|s| s.parse::().ok()) .unwrap_or(10); - println!(" Rate limited—waiting {}s", reset_sec); + logger::log_info(&format!(" Rate limited—waiting {}s", reset_sec)).await; sleep(std::time::Duration::from_secs(reset_sec.max(10))).await; return Err(anyhow!("Rate limited, please retry")); @@ -1496,59 +310,654 @@ async fn handle_rate_limit(resp: &reqwest::Response) -> anyhow::Result<()> { Ok(()) } -pub async fn stream_gleif_csv( - csv_path: &str, - mut callback: F -) -> anyhow::Result -where - F: FnMut(String, String) -> anyhow::Result<()>, -{ - logger::log_info(&format!("Streaming GLEIF CSV: {}", csv_path)).await; +fn process_common_stocks( + companies: &mut HashMap, + figi_infos: &[FigiInfo], + stats: &mut ProcessingStats, +) { + let name = figi_infos[0].name.clone(); + if name.is_empty() { + return; + } - let file = std::fs::File::open(csv_path) - .context("Failed to open GLEIF CSV")?; + let grouped_by_isin = group_by_isin(figi_infos); - let reader = std::io::BufReader::new(file); - let mut count = 0; - - for (idx, line) in reader.lines().enumerate() { - let line = line.context("Failed to read line")?; + if let Some(existing) = companies.get_mut(&name) { + let mut updated = false; + for (isin, new_figis) in grouped_by_isin { + if let Some(existing_figis) = existing.securities.get_mut(&isin) { + let merged = merge_figi_list(existing_figis, &new_figis); + if merged.len() > existing_figis.len() { + *existing_figis = merged; + updated = true; + } + } else { + existing.securities.insert(isin.clone(), new_figis); + updated = true; + } + } - // Skip header - if idx == 0 { + if existing.primary_isin.is_empty() { + if let Some(first_isin) = existing.securities.keys().next() { + existing.primary_isin = first_isin.clone(); + } + } + + if updated { + stats.companies_updated += 1; + } + } else { + let primary_isin = grouped_by_isin.keys().next().cloned().unwrap_or_default(); + + companies.insert(name.clone(), CompanyInfo { + name, + primary_isin, + securities: grouped_by_isin, + }); + + stats.companies_added += 1; + } +} + +fn group_by_isin(figi_infos: &[FigiInfo]) -> HashMap> { + let mut grouped: HashMap> = HashMap::new(); + + for figi_info in figi_infos { + grouped.entry(figi_info.isin.clone()) + .or_insert_with(Vec::new) + .push(figi_info.clone()); + } + + for figis in grouped.values_mut() { + figis.sort_by(|a, b| a.figi.cmp(&b.figi)); + } + + grouped +} + +fn merge_figi_list(existing: &[FigiInfo], new_figis: &[FigiInfo]) -> Vec { + let mut merged = existing.to_vec(); + let existing_figis: HashSet = existing.iter() + .map(|f| f.figi.clone()) + .collect(); + + for new_figi in new_figis { + if !existing_figis.contains(&new_figi.figi) { + merged.push(new_figi.clone()); + } + } + + merged.sort_by(|a, b| a.figi.cmp(&b.figi)); + merged +} + +#[derive(Debug)] +struct ProcessingStats { + initial_companies: usize, + companies_added: usize, + companies_updated: usize, +} + +impl ProcessingStats { 
+ fn new(companies: usize, _warrants: usize, _options: usize) -> Self { + Self { + initial_companies: companies, + companies_added: 0, + companies_updated: 0, + } + } +} + +async fn load_from_cache_if_exists(path: &str) -> anyhow::Result +where + T: serde::de::DeserializeOwned + Default, +{ + let cache_file = Path::new(path); + + if !cache_file.exists() { + return Ok(T::default()); + } + + let content = tokio_fs::read_to_string(cache_file).await?; + Ok(serde_json::from_str(&content)?) +} + +async fn save_to_cache(path: &str, data: &T) -> anyhow::Result<()> +where + T: serde::Serialize, +{ + let cache_path = Path::new(path); + let cache_dir = cache_path.parent().context("Invalid path")?; + + tokio_fs::create_dir_all(cache_dir).await?; + let json_str = serde_json::to_string_pretty(data)?; + tokio_fs::write(cache_path, json_str).await?; + + Ok(()) +} + +async fn load_market_sectors() -> anyhow::Result> { + let dir = DataPaths::new(".")?; + let cache_file = dir.cache_openfigi_dir().join("marketSecDes.json"); + + if !cache_file.exists() { + return Ok(vec![ + "Comdty".to_string(), + "Corp".to_string(), + "Equity".to_string(), + "Govt".to_string(), + ]); + } + + let content = tokio_fs::read_to_string(&cache_file).await?; + let json: Value = serde_json::from_str(&content)?; + + let sectors: Vec = json["values"] + .as_array() + .ok_or_else(|| anyhow!("No values"))? + .iter() + .filter_map(|v| v.as_str().map(|s| s.to_string())) + .collect(); + + Ok(sectors) +} + +async fn determine_gleif_date( + gleif_date: Option<&str>, + paths: &DataPaths, +) -> anyhow::Result { + if let Some(d) = gleif_date { + return Ok(d.to_string()); + } + + let gleif_dir = paths.cache_gleif_dir(); + let mut entries = tokio_fs::read_dir(gleif_dir).await?; + let mut dates = Vec::new(); + + while let Some(entry) = entries.next_entry().await? { + let path = entry.path(); + if path.is_dir() { + if let Some(name) = path.file_name().and_then(|n| n.to_str()) { + if name.len() == 8 && name.chars().all(|c| c.is_numeric()) { + dates.push(name.to_string()); + } + } + } + } + + dates.sort(); + dates.last().cloned().ok_or_else(|| anyhow!("No GLEIF date found")) +} + +async fn setup_sector_directories( + date_dir: &Path, + sector_dirs: &[String], +) -> anyhow::Result<()> { + let uncategorized_dir = date_dir.join("uncategorized"); + tokio_fs::create_dir_all(&uncategorized_dir).await?; + + for sector in sector_dirs { + let sector_dir = date_dir.join(sector); + tokio_fs::create_dir_all(§or_dir).await?; + } + + Ok(()) +} + +/// Loads all OpenFIGI mapping value lists (marketSecDes, micCode, securityType). +/// +/// This function fetches the available values for each mapping parameter from the OpenFIGI API +/// and caches them as JSON files in `data/openfigi/`. If the files already exist and are recent +/// (less than 30 days old), they are reused instead of re-fetching. +/// +/// # Returns +/// Ok(()) on success. +/// +/// # Errors +/// Returns an error if API requests fail, JSON parsing fails, or file I/O fails. 
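One thing worth flagging in determine_gleif_date above: the cache/gleif folder names are day-first (DDMMYYYY), so the plain dates.sort() orders them lexicographically rather than chronologically, whereas the removed find_most_recent_gleif_date (earlier in this patch) parsed them with chrono before comparing. A small illustration of the difference, with made-up folder names:

use chrono::NaiveDate;

fn main() {
    // cache/gleif subdirectories are named DDMMYYYY.
    let mut dates = vec!["24112025".to_string(), "01122025".to_string()];

    // Lexicographic sort, as in determine_gleif_date, picks 24 Nov 2025 ...
    dates.sort();
    assert_eq!(dates.last().unwrap(), "24112025");

    // ... while parsing as dates, as the removed code did, picks 1 Dec 2025.
    let latest = dates
        .iter()
        .filter_map(|d| NaiveDate::parse_from_str(d, "%d%m%Y").ok())
        .max()
        .unwrap();
    assert_eq!(latest, NaiveDate::from_ymd_opt(2025, 12, 1).unwrap());
}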
+pub async fn load_figi_type_lists() -> anyhow::Result<()> { + logger::log_info("Loading OpenFIGI mapping value lists...").await; + + let client = OpenFigiClient::new().await?; + + // Create cache directory + let dir = DataPaths::new(".")?; + let cache_dir = dir.cache_openfigi_dir(); + tokio_fs::create_dir_all(cache_dir).await + .context("Failed to create data/openfigi directory")?; + + // Fetch each type list + get_figi_market_sec_des(&client, cache_dir).await?; + get_figi_mic_code(&client, cache_dir).await?; + get_figi_security_type(&client, cache_dir).await?; + + logger::log_info("OpenFIGI mapping value lists loaded successfully").await; + Ok(()) +} + +/// Fetches and caches the list of valid marketSecDes values. +/// +/// # Arguments +/// * `client` - The OpenFIGI client instance. +/// * `cache_dir` - Directory to save the cached JSON file. +/// +/// # Returns +/// Ok(()) on success. +/// +/// # Errors +/// Returns an error if the API request fails or file I/O fails. +async fn get_figi_market_sec_des(client: &OpenFigiClient, cache_dir: &Path) -> anyhow::Result<()> { + let cache_file = cache_dir.join("marketSecDes.json"); + + // Check if cache exists and is recent (< 30 days old) + if should_use_cache(&cache_file).await? { + logger::log_info(" Using cached marketSecDes values").await; + return Ok(()); + } + + logger::log_info(" Fetching marketSecDes values from OpenFIGI API...").await; + + let resp = client.client + .get("https://api.openfigi.com/v3/mapping/values/marketSecDes") + .send() + .await + .context("Failed to fetch marketSecDes values")?; + + handle_rate_limit(&resp).await?; + + let values: Value = resp.json().await + .context("Failed to parse marketSecDes response")?; + + // Save to cache + let json_str = serde_json::to_string_pretty(&values)?; + tokio_fs::write(&cache_file, json_str).await + .context("Failed to write marketSecDes cache")?; + + logger::log_info(" ✓ Cached marketSecDes values").await; + + // Respect rate limits + sleep(Duration::from_millis(if client.has_key { 240 } else { 2400 })).await; + + Ok(()) +} + +/// Fetches and caches the list of valid micCode values. +/// +/// # Arguments +/// * `client` - The OpenFIGI client instance. +/// * `cache_dir` - Directory to save the cached JSON file. +/// +/// # Returns +/// Ok(()) on success. +/// +/// # Errors +/// Returns an error if the API request fails or file I/O fails. +async fn get_figi_mic_code(client: &OpenFigiClient, cache_dir: &Path) -> anyhow::Result<()> { + let cache_file = cache_dir.join("micCode.json"); + + if should_use_cache(&cache_file).await? { + logger::log_info(" Using cached micCode values").await; + return Ok(()); + } + + logger::log_info(" Fetching micCode values from OpenFIGI API...").await; + + let resp = client.client + .get("https://api.openfigi.com/v3/mapping/values/micCode") + .send() + .await + .context("Failed to fetch micCode values")?; + + handle_rate_limit(&resp).await?; + + let values: Value = resp.json().await + .context("Failed to parse micCode response")?; + + let json_str = serde_json::to_string_pretty(&values)?; + tokio_fs::write(&cache_file, json_str).await + .context("Failed to write micCode cache")?; + + logger::log_info(" ✓ Cached micCode values").await; + + sleep(Duration::from_millis(if client.has_key { 240 } else { 2400 })).await; + + Ok(()) +} + +/// Checks if a cache file exists and is less than 30 days old. +/// +/// # Arguments +/// * `path` - Path to the cache file. +/// +/// # Returns +/// True if the cache should be used, false if it needs refreshing. 
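+///
+/// # Examples
+///
+/// A minimal sketch of the intended freshness check; the file name is illustrative
+/// only, any cached OpenFIGI value list is handled the same way:
+///
+/// ```no_run
+/// # use std::path::Path;
+/// # async fn example() -> anyhow::Result<()> {
+/// if !should_use_cache(Path::new("data/openfigi/micCode.json")).await? {
+///     // Missing or older than 30 days: re-fetch and overwrite the cache file.
+/// }
+/// # Ok(())
+/// # }
+/// ```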
+async fn should_use_cache(path: &Path) -> anyhow::Result<bool> {
+    if !path.exists() {
+        return Ok(false);
+    }
+
+    let metadata = tokio_fs::metadata(path).await?;
+    let modified = metadata.modified()?;
+    let age = modified.elapsed().unwrap_or(std::time::Duration::from_secs(u64::MAX));
+
+    // Cache is valid for 30 days
+    Ok(age < std::time::Duration::from_secs(30 * 24 * 60 * 60))
+}
+
+/// Fetches and caches the list of valid securityType values.
+///
+/// # Arguments
+/// * `client` - The OpenFIGI client instance.
+/// * `cache_dir` - Directory to save the cached JSON file.
+///
+/// # Returns
+/// Ok(()) on success.
+///
+/// # Errors
+/// Returns an error if the API request fails or file I/O fails.
+async fn get_figi_security_type(client: &OpenFigiClient, cache_dir: &Path) -> anyhow::Result<()> {
+    let cache_file = cache_dir.join("securityType.json");
+
+    if should_use_cache(&cache_file).await? {
+        logger::log_info(" Using cached securityType values").await;
+        return Ok(());
+    }
+
+    logger::log_info(" Fetching securityType values from OpenFIGI API...").await;
+
+    let resp = client.client
+        .get("https://api.openfigi.com/v3/mapping/values/securityType")
+        .send()
+        .await
+        .context("Failed to fetch securityType values")?;
+
+    handle_rate_limit(&resp).await?;
+
+    let values: Value = resp.json().await
+        .context("Failed to parse securityType response")?;
+
+    let json_str = serde_json::to_string_pretty(&values)?;
+    tokio_fs::write(&cache_file, json_str).await
+        .context("Failed to write securityType cache")?;
+
+    logger::log_info(" ✓ Cached securityType values").await;
+
+    sleep(Duration::from_millis(if client.has_key { 240 } else { 2400 })).await;
+
+    Ok(())
+}
+
+#[derive(Debug)]
+pub struct MappingStats {
+    pub total_leis: usize,
+    pub mapped_leis: usize,
+    pub no_result_leis: usize,
+    pub unqueried_leis: usize,
+    pub mapping_percentage: f64,
+    pub queried_percentage: f64,
+    pub by_sector: HashMap<String, usize>,
+}
+
+/// Get detailed statistics about LEI-FIGI mapping status
+pub async fn get_mapping_stats(
+    csv_path: &str,
+    gleif_date: Option<&str>,
+) -> anyhow::Result<MappingStats> {
+    let dir = DataPaths::new(".")?;
+    let map_cache_dir = dir.cache_gleif_openfigi_map_dir();
+
+    let date = determine_gleif_date(gleif_date, &dir).await?;
+    let date_dir = map_cache_dir.join(&date);
+
+    let all_leis = get_all_leis_from_gleif(csv_path).await?;
+    let mapped_leis = load_existing_mapped_leis(&date_dir).await?;
+    let no_result_leis = load_no_result_leis(&date_dir).await?;
+
+    let total = all_leis.len();
+    let mapped = mapped_leis.len();
+    let no_results = no_result_leis.len();
+    let queried = mapped + no_results;
+    let unqueried = total.saturating_sub(queried);
+
+    let mapping_percentage = if total > 0 {
+        (mapped as f64 / total as f64) * 100.0
+    } else {
+        0.0
+    };
+
+    let queried_percentage = if total > 0 {
+        (queried as f64 / total as f64) * 100.0
+    } else {
+        0.0
+    };
+
+    // Count by sector
+    let mut by_sector = HashMap::new();
+
+    if date_dir.exists() {
+        let mut entries = tokio_fs::read_dir(&date_dir).await?;
+
+        while let Some(entry) = entries.next_entry().await? {
+            let sector_path = entry.path();
+            if !sector_path.is_dir() {
+                continue;
+            }
+
+            let sector_name = sector_path
+                .file_name()
+                .and_then(|n| n.to_str())
+                .unwrap_or("unknown")
+                .to_string();
+
+            let jsonl_path = sector_path.join("lei_to_figi.jsonl");
+            if !jsonl_path.exists() {
+                continue;
+            }
+
+            let content = tokio_fs::read_to_string(&jsonl_path).await?;
+            let count = content.lines().filter(|l| !l.trim().is_empty()).count();
+            by_sector.insert(sector_name, count);
+        }
+    }
+
+    Ok(MappingStats {
+        total_leis: total,
+        mapped_leis: mapped,
+        no_result_leis: no_results,
+        unqueried_leis: unqueried,
+        mapping_percentage,
+        queried_percentage,
+        by_sector,
+    })
+}
+
+/// Print mapping statistics to console and logs
+pub async fn print_mapping_stats(csv_path: &str) -> anyhow::Result<()> {
+    logger::log_info("=== LEI-FIGI Mapping Status ===").await;
+
+    let stats = get_mapping_stats(csv_path, None).await?;
+
+    logger::log_info(&format!(
+        "Total LEIs: {}",
+        stats.total_leis
+    )).await;
+
+    logger::log_info(&format!(
+        "├─ Mapped (with FIGI): {} ({:.2}%)",
+        stats.mapped_leis,
+        stats.mapping_percentage
+    )).await;
+
+    logger::log_info(&format!(
+        "├─ No Results (queried, no FIGI): {} ({:.2}%)",
+        stats.no_result_leis,
+        (stats.no_result_leis as f64 / stats.total_leis as f64) * 100.0
+    )).await;
+
+    logger::log_info(&format!(
+        "└─ Not Queried Yet: {} ({:.2}%)",
+        stats.unqueried_leis,
+        (stats.unqueried_leis as f64 / stats.total_leis as f64) * 100.0
+    )).await;
+
+    logger::log_info(&format!(
+        "\nQuery Coverage: {:.2}% ({} / {})",
+        stats.queried_percentage,
+        stats.mapped_leis + stats.no_result_leis,
+        stats.total_leis
+    )).await;
+
+    if !stats.by_sector.is_empty() {
+        logger::log_info("\nMapped LEIs by sector:").await;
+        let mut sectors: Vec<_> = stats.by_sector.iter().collect();
+        sectors.sort_by(|a, b| b.1.cmp(a.1)); // Sort by count descending
+
+        for (sector, count) in sectors {
+            logger::log_info(&format!("  {}: {}", sector, count)).await;
+        }
+    }
+
+    logger::log_info("==============================").await;
+
+    Ok(())
+}
+
+/// Quick check if mapping is complete (returns true if all mapped)
+pub async fn is_mapping_complete(csv_path: &str) -> anyhow::Result<bool> {
+    let dir = DataPaths::new(".")?;
+    let map_cache_dir = dir.cache_gleif_openfigi_map_dir();
+
+    let date = determine_gleif_date(None, &dir).await?;
+    let date_dir = map_cache_dir.join(&date);
+
+    let unmapped = get_unmapped_leis(csv_path, &date_dir).await?;
+    Ok(unmapped.is_empty())
+}
+
+/// Load all LEIs that have already been mapped from existing JSONL files
+async fn load_existing_mapped_leis(date_dir: &Path) -> anyhow::Result<HashSet<String>> {
+    let mut mapped_leis = HashSet::new();
+
+    if !date_dir.exists() {
+        return Ok(mapped_leis);
+    }
+
+    // Read all sector directories
+    let mut entries = tokio_fs::read_dir(date_dir).await?;
+
+    while let Some(entry) = entries.next_entry().await? {
+        let sector_path = entry.path();
+        if !sector_path.is_dir() {
             continue;
         }
 
-        // Parse CSV line
+        let jsonl_path = sector_path.join("lei_to_figi.jsonl");
+        if !jsonl_path.exists() {
+            continue;
+        }
+
+        // Read JSONL file line by line
+        let content = tokio_fs::read_to_string(&jsonl_path).await?;
+        for line in content.lines() {
+            if line.trim().is_empty() {
+                continue;
+            }
+
+            if let Ok(entry) = serde_json::from_str::<Value>(line) {
+                if let Some(lei) = entry["lei"].as_str() {
+                    mapped_leis.insert(lei.to_string());
+                }
+            }
+        }
+    }
+
+    if !mapped_leis.is_empty() {
+        logger::log_info(&format!("Found {} already mapped LEIs", mapped_leis.len())).await;
+    }
+
+    Ok(mapped_leis)
+}
+
+/// Read GLEIF CSV and return all LEIs (without loading entire file into memory)
+async fn get_all_leis_from_gleif(csv_path: &str) -> anyhow::Result<HashSet<String>> {
+    let file = std::fs::File::open(csv_path)?;
+    let reader = BufReader::new(file);
+
+    let mut all_leis = HashSet::new();
+
+    for (idx, line) in reader.lines().enumerate() {
+        if idx == 0 {
+            continue; // Skip header
+        }
+
+        let line = line?;
         let parts: Vec<&str> = line.split(',').collect();
+        if parts.len() < 2 { continue; }
         let lei = parts[0].trim().trim_matches('"').to_string();
-        let isin = parts[1].trim().trim_matches('"').to_string();
-        if !lei.is_empty() && !isin.is_empty() {
-            callback(lei, isin)?;
-            count += 1;
-        }
-
-        // Yield periodically
-        if count % 10000 == 0 {
-            tokio::task::yield_now().await;
+        if !lei.is_empty() {
+            all_leis.insert(lei);
         }
     }
 
-    logger::log_info(&format!("Streamed {} LEI-ISIN pairs", count)).await;
-    Ok(count)
+    logger::log_info(&format!("Found {} total LEIs in GLEIF CSV", all_leis.len())).await;
+    Ok(all_leis)
 }
 
-/// Process FIGI mappings in batches instead of all at once
-pub async fn process_figi_mappings_streaming(
-    lei_to_isins_stream: impl Iterator<Item = (String, Vec<String>)>,
+/// Get unmapped LEIs by comparing GLEIF CSV with existing mappings
+async fn get_unmapped_leis(
+    csv_path: &str,
+    date_dir: &Path,
+) -> anyhow::Result<HashSet<String>> {
+    let all_leis = get_all_leis_from_gleif(csv_path).await?;
+    let mapped_leis = load_existing_mapped_leis(date_dir).await?;
+    let no_result_leis = load_no_result_leis(date_dir).await?;
+
+    // Calculate truly unmapped: all - (mapped + no_results)
+    let queried_leis: HashSet<String> = mapped_leis
+        .union(&no_result_leis)
+        .cloned()
+        .collect();
+
+    let unmapped: HashSet<String> = all_leis
+        .difference(&queried_leis)
+        .cloned()
+        .collect();
+
+    let total = all_leis.len();
+    let mapped = mapped_leis.len();
+    let no_results = no_result_leis.len();
+    let unqueried = unmapped.len();
+
+    logger::log_info(&format!(
+        "LEI Status: Total={}, Mapped={}, No Results={}, Unqueried={}",
+        total, mapped, no_results, unqueried
+    )).await;
+
+    Ok(unmapped)
+}
+
+/// Modified version that only processes specified LEIs
+pub async fn stream_gleif_csv_and_build_figi_filtered(
+    csv_path: &str,
     gleif_date: Option<&str>,
-    batch_size: usize,
+    filter_leis: Option<&HashSet<String>>,
 ) -> anyhow::Result<()> {
+    logger::log_info(&format!("Streaming GLEIF CSV: {}", csv_path)).await;
+
+    let file = std::fs::File::open(csv_path)?;
+    let reader = BufReader::new(file);
+
+    let client = OpenFigiClient::new().await?;
+    if !client.has_key {
+        logger::log_warn("No API key - skipping FIGI mapping").await;
+        return Ok(());
+    }
+
     let dir = DataPaths::new(".")?;
     let map_cache_dir = dir.cache_gleif_openfigi_map_dir();
 
@@ -1556,236 +965,158 @@ pub async fn process_figi_mappings_streaming(
     let date_dir = map_cache_dir.join(&date);
     tokio_fs::create_dir_all(&date_dir).await?;
 
-    // Setup sector 
directories let sector_dirs = load_market_sectors().await?; setup_sector_directories(&date_dir, §or_dirs).await?; - let client = OpenFigiClient::new().await?; - if !client.has_key { - logger::log_warn("No API key - limited FIGI mapping").await; - return Ok(()); - } + let mut lei_batch: HashMap> = HashMap::new(); + let mut line_count = 0; + let mut processed_leis = 0; + let mut skipped_leis = 0; - // Process in batches - let mut batch = Vec::new(); - let mut processed = 0; - - for (lei, isins) in lei_to_isins_stream { - batch.push((lei, isins)); + for (idx, line) in reader.lines().enumerate() { + let line = line?; - if batch.len() >= batch_size { - process_figi_batch(&client, &batch, &date_dir, §or_dirs).await?; - processed += batch.len(); + if idx == 0 { continue; } + + let parts: Vec<&str> = line.split(',').collect(); + if parts.len() < 2 { continue; } + + let lei = parts[0].trim().trim_matches('"').to_string(); + let isin = parts[1].trim().trim_matches('"').to_string(); + + if lei.is_empty() || isin.is_empty() { + continue; + } + + // Apply filter if provided + if let Some(filter) = filter_leis { + if !filter.contains(&lei) { + skipped_leis += 1; + continue; + } + } + + lei_batch.entry(lei).or_default().push(isin); + line_count += 1; + + // Process batch when full + if lei_batch.len() >= LEI_BATCH_SIZE { + process_and_save_figi_batch(&client, &lei_batch, &date_dir).await?; + processed_leis += lei_batch.len(); - logger::log_info(&format!("Processed {} LEIs so far...", processed)).await; - batch.clear(); + if processed_leis % 1000 == 0 { + logger::log_info(&format!("Queried {} LEIs...", processed_leis)).await; + } - // Yield to prevent blocking + lei_batch.clear(); tokio::task::yield_now().await; } } // Process remaining - if !batch.is_empty() { - process_figi_batch(&client, &batch, &date_dir, §or_dirs).await?; - processed += batch.len(); + if !lei_batch.is_empty() { + process_and_save_figi_batch(&client, &lei_batch, &date_dir).await?; + processed_leis += lei_batch.len(); } - logger::log_info(&format!("Total processed: {} LEIs", processed)).await; + logger::log_info(&format!( + "✓ Queried {} LEIs, skipped {} already processed", + processed_leis, + skipped_leis + )).await; + Ok(()) } -async fn process_figi_batch( - client: &OpenFigiClient, - batch: &[(String, Vec)], - date_dir: &Path, - sector_dirs: &[String], -) -> anyhow::Result<()> { - for (lei, isins) in batch { - let unique_isins: Vec<_> = isins.iter() - .cloned() - .collect::>() - .into_iter() - .collect(); - - let figi_infos = client.map_isins_to_figi_infos(&unique_isins).await?; - - if figi_infos.is_empty() { +/// Check mapping completion and process only unmapped LEIs +pub async fn ensure_all_leis_mapped( + csv_path: &str, + gleif_date: Option<&str>, +) -> anyhow::Result { + let dir = DataPaths::new(".")?; + let map_cache_dir = dir.cache_gleif_openfigi_map_dir(); + + let date = determine_gleif_date(gleif_date, &dir).await?; + let date_dir = map_cache_dir.join(&date); + + // Get unmapped LEIs (excludes both mapped and no-result LEIs) + let unmapped = get_unmapped_leis(csv_path, &date_dir).await?; + + if unmapped.is_empty() { + logger::log_info("✓ All LEIs have been queried (mapped or confirmed no results)").await; + return Ok(true); + } + + logger::log_info(&format!("Found {} LEIs that need querying - starting mapping...", unmapped.len())).await; + + // Process only unmapped LEIs + stream_gleif_csv_and_build_figi_filtered(csv_path, gleif_date, Some(&unmapped)).await?; + + // Verify completion + let still_unmapped = 
get_unmapped_leis(csv_path, &date_dir).await?; + + if still_unmapped.is_empty() { + logger::log_info("✓ All LEIs successfully queried").await; + Ok(true) + } else { + logger::log_warn(&format!( + "⚠ {} LEIs still unqueried (API errors or rate limits)", + still_unmapped.len() + )).await; + Ok(false) + } +} + +/// Load LEIs that were queried but returned no results +async fn load_no_result_leis(date_dir: &Path) -> anyhow::Result> { + let mut no_result_leis = HashSet::new(); + + let no_results_path = date_dir.join("no_results.jsonl"); + if !no_results_path.exists() { + return Ok(no_result_leis); + } + + let content = tokio_fs::read_to_string(&no_results_path).await?; + for line in content.lines() { + if line.trim().is_empty() { continue; } - // Save to appropriate sector files - save_figi_infos_by_sector(lei, &figi_infos, date_dir, sector_dirs).await?; - } - - Ok(()) -} - -async fn save_figi_infos_by_sector( - lei: &str, - figi_infos: &[FigiInfo], - date_dir: &Path, - _sector_dirs: &[String], -) -> anyhow::Result<()> { - let mut by_sector: HashMap> = HashMap::new(); - - for figi_info in figi_infos { - let sector = if figi_info.market_sector.is_empty() { - "uncategorized".to_string() - } else { - figi_info.market_sector.clone() - }; - - by_sector.entry(sector).or_default().push(figi_info.clone()); - } - - // Save to sector files - for (sector, figis) in by_sector { - let sector_dir = date_dir.join(§or); - let path = sector_dir.join("lei_to_figi.jsonl"); - append_lei_to_figi_jsonl(&path, lei, &figis).await?; - } - - Ok(()) -} - -/// Modified load_or_build_all_securities to process in streaming fashion -pub async fn load_or_build_all_securities_streaming( - date_dir: &Path, -) -> anyhow::Result<( - HashMap, - HashMap>, - HashMap> -)> { - let mut commons = HashMap::new(); - let mut warrants = HashMap::new(); - let mut options = HashMap::new(); - - // Load existing data - commons = load_from_cache("data/corporate/by_name/common_stocks.json") - .await? - .unwrap_or_default(); - warrants = load_from_cache("data/corporate/by_name/warrants.json") - .await? - .unwrap_or_default(); - options = load_from_cache("data/corporate/by_name/options.json") - .await? 
- .unwrap_or_default(); - - println!("Loaded existing data:"); - println!(" - Companies: {}", commons.len()); - println!(" - Warrants: {}", warrants.len()); - println!(" - Options: {}", options.len()); - - let mut stats = ProcessingStats::new(commons.len(), warrants.len(), options.len()); - - // Stream through JSONL files in date_dir - let equity_file = date_dir.join("Equity").join("lei_to_figi.jsonl"); - - if equity_file.exists() { - logger::log_info(&format!("Streaming FIGIs from {:?}", equity_file)).await; - - let content = tokio_fs::read_to_string(&equity_file).await?; - let mut processed = 0; - - for line in content.lines() { - if line.trim().is_empty() { - continue; - } - - let entry: serde_json::Value = serde_json::from_str(line)?; - let _lei = entry["lei"].as_str().unwrap_or(""); - let figi_infos: Vec = serde_json::from_value( - entry["figis"].clone() - )?; - - // Process this batch - process_figi_infos_batch( - &figi_infos, - &mut commons, - &mut warrants, - &mut options, - &mut stats - ); - - processed += 1; - if processed % 100 == 0 { - tokio::task::yield_now().await; + if let Ok(entry) = serde_json::from_str::(line) { + if let Some(lei) = entry["lei"].as_str() { + no_result_leis.insert(lei.to_string()); } } } - stats.print_summary(commons.len(), warrants.len(), options.len()); + if !no_result_leis.is_empty() { + logger::log_info(&format!( + "Found {} LEIs previously queried with no FIGI results", + no_result_leis.len() + )).await; + } - // Save incrementally - save_to_cache("data/corporate/by_name/common_stocks.json", &commons).await?; - save_to_cache("data/corporate/by_name/warrants.json", &warrants).await?; - save_to_cache("data/corporate/by_name/options.json", &options).await?; - - Ok((commons, warrants, options)) + Ok(no_result_leis) } -fn process_figi_infos_batch( - figi_infos: &[FigiInfo], - commons: &mut HashMap, - warrants: &mut HashMap>, - options: &mut HashMap>, - stats: &mut ProcessingStats, -) { - let mut common_stocks = Vec::new(); - let mut warrant_securities = Vec::new(); - let mut option_securities = Vec::new(); +/// Save LEI that was queried but returned no results +async fn append_no_result_lei(date_dir: &Path, lei: &str, isins: &[String]) -> anyhow::Result<()> { + let no_results_path = date_dir.join("no_results.jsonl"); - for figi_info in figi_infos { - match figi_info.security_type.as_str() { - "Common Stock" => common_stocks.push(figi_info.clone()), - "Equity WRT" => warrant_securities.push(figi_info.clone()), - "Equity Option" => option_securities.push(figi_info.clone()), - _ => {} - } - } - - if !common_stocks.is_empty() { - process_common_stocks(commons, &common_stocks, stats); - } - - if !warrant_securities.is_empty() { - process_warrants(warrants, &warrant_securities, stats); - } - - if !option_securities.is_empty() { - process_options(options, &option_securities, stats); - } -} + let entry = json!({ + "lei": lei, + "isins": isins, + "queried_at": chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string(), + }); -// Helper functions -async fn determine_gleif_date( - gleif_date: Option<&str>, - paths: &DataPaths, -) -> anyhow::Result { - if let Some(d) = gleif_date { - Ok(d.to_string()) - } else { - match find_most_recent_gleif_date(paths.cache_gleif_dir()).await? 
{ - Some(d) => Ok(d), - None => Err(anyhow!("No GLEIF CSV file found")), - } - } -} - -async fn setup_sector_directories( - date_dir: &Path, - sector_dirs: &[String], -) -> anyhow::Result<()> { - // Create uncategorized folder - let uncategorized_dir = date_dir.join("uncategorized"); - tokio_fs::create_dir_all(&uncategorized_dir).await?; + let line = serde_json::to_string(&entry)? + "\n"; - // Create sector folders - for sector in sector_dirs { - let sector_dir = date_dir.join(sector); - tokio_fs::create_dir_all(§or_dir).await?; - } + let mut file = tokio_fs::OpenOptions::new() + .create(true) + .append(true) + .open(&no_results_path) + .await?; + file.write_all(line.as_bytes()).await?; Ok(()) } \ No newline at end of file diff --git a/src/corporate/scraper.rs b/src/corporate/scraper.rs index aa163dd..b34d681 100644 --- a/src/corporate/scraper.rs +++ b/src/corporate/scraper.rs @@ -1,318 +1,19 @@ // src/corporate/scraper.rs -use super::{types::*, helpers::*, openfigi::*}; +use super::{types::*}; //use crate::corporate::openfigi::OpenFigiClient; use crate::{scraper::webdriver::*, util::directories::DataPaths, util::logger}; -use fantoccini::{Client, Locator}; +use fantoccini::{Client}; use scraper::{Html, Selector}; use chrono::{DateTime, Duration, NaiveDate, Utc}; use tokio::{time::{Duration as TokioDuration, sleep}}; use reqwest::Client as HttpClient; use serde_json::{json, Value}; use zip::ZipArchive; -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap}; use std::io::{Read}; -use anyhow::{anyhow, Result}; const USER_AGENT: &str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"; -/// Check if a ticker exists on Yahoo Finance and return core metadata. -/// -/// This function calls the public Yahoo Finance quoteSummary endpoint and extracts: -/// - ISIN (when available) -/// - Company name -/// - Exchange MIC code -/// - Trading currency -/// -/// It strictly filters to only accept **equity** securities. -/// -/// # Arguments -/// * `ticker` - The ticker symbol to validate (e.g., "AAPL", "7203.T", "BMW.DE") -/// -/// # Returns -/// `Ok(PrimaryInfo)` on success, `Err` if ticker doesn't exist, is not equity, or data is malformed. -/// -/// # Errors -/// - Ticker not found -/// - Not an equity (ETF, bond, etc.) 
-/// - Missing critical fields -/// - Network or JSON parsing errors -/*pub async fn check_ticker_exists(ticker: &str) -> anyhow::Result { - let url = format!( - "https://query1.finance.yahoo.com/v10/finance/quoteSummary/{}?modules=price%2CassetProfile", - ticker - ); - - let resp = match HttpClient::new() - .get(&url) - .header("User-Agent", USER_AGENT) - .send() - .await - { - Ok(resp) => resp, - Err(err) => { - return Err(anyhow::anyhow!( - "Failed to reach Yahoo Finance for ticker {}: {}", - ticker, - err - )); - } - }; - - if !resp.status().is_success() { - return Err(anyhow::anyhow!("Yahoo returned HTTP {} for ticker {}", resp.status(), ticker)); - } - - let json: Value = match resp - .json() - .await { - Ok(resp) => resp, - Err(err) => { - return Err(anyhow::anyhow!( - "Failed to parse JSON response from Yahoo Finance {}: {}", - ticker, - err - )); - } - }; - - let result_array = json["quoteSummary"]["result"] - .as_array() - .ok_or_else(|| anyhow::anyhow!("Missing 'quoteSummary.result' in response"))?; - - if result_array.is_empty() || result_array[0].is_null() { - return Err(anyhow::anyhow!("No quote data returned for ticker {}", ticker)); - } - - let quote = &result_array[0]["price"]; - let profile = &result_array[0]["assetProfile"]; - - // === 1. Must be EQUITY === - let quote_type = quote["quoteType"] - .as_str() - .unwrap_or("") - .to_ascii_uppercase(); - - if quote_type != "EQUITY" { - println!(" → Skipping {} (quoteType: {})", ticker, quote_type); - return Err(anyhow::anyhow!("Not an equity security: {}", quote_type)); - } - - // === 2. Extract basic info === - let long_name = quote["longName"] - .as_str() - .or_else(|| quote["shortName"].as_str()) - .unwrap_or(ticker) - .trim() - .to_string(); - - let currency = quote["currency"] - .as_str() - .unwrap_or("USD") - .to_string(); - - let exchange_mic = quote["exchange"] - .as_str() - .unwrap_or("") - .to_string(); - - if exchange_mic.is_empty() { - return Err(anyhow::anyhow!("Missing exchange MIC for ticker {}", ticker)); - } - - // === 3. Extract ISIN (from assetProfile if available) === - let isin = profile["isin"] - .as_str() - .and_then(|s| if s.len() == 12 && s.chars().all(|c| c.is_ascii_alphanumeric()) { Some(s) } else { None }) - .unwrap_or("") - .to_ascii_uppercase(); - - // === 4. Final sanity check: reject obvious debt securities === - let name_upper = long_name.to_ascii_uppercase(); - if name_upper.contains(" BOND") || - name_upper.contains(" NOTE") || - name_upper.contains(" DEBENTURE") || - name_upper.contains(" PREFERRED") && !name_upper.contains(" STOCK") { - return Err(anyhow::anyhow!("Security name suggests debt instrument: {}", long_name)); - } - - println!( - " → Valid equity: {} | {} | {} | ISIN: {}", - ticker, - long_name, - exchange_mic, - if isin.is_empty() { "N/A" } else { &isin } - ); - - Ok(PrimaryInfo { - isin, - name: long_name, - exchange_mic, - currency, - }) -}*/ - -/// Fetches earnings events for a ticker using a dedicated ScrapeTask. -/// -/// This function creates and executes a ScrapeTask to navigate to the Yahoo Finance earnings calendar, -/// reject cookies, and extract the events. -/// -/// # Arguments -/// * `ticker` - The stock ticker symbol. -/// -/// # Returns -/// A vector of CompanyEvent structs on success. -/// -/// # Errors -/// Returns an error if the task execution fails, e.g., chromedriver spawn or navigation issues. 
-pub async fn fetch_earnings_with_pool( - ticker: &str, - pool: &Arc, -) -> anyhow::Result> { - let ticker = ticker.to_string(); - let url = format!("https://finance.yahoo.com/calendar/earnings?symbol={}", ticker); - - let ticker_cloned = ticker.clone(); - - pool.execute(url, move |client| { - let ticker = ticker_cloned.clone(); - Box::pin(async move { - reject_yahoo_cookies(&client).await?; - extract_earnings_events(&client, &ticker).await - }) - }).await -} - -/// Extracts earnings events from the currently loaded Yahoo Finance earnings calendar page. -/// -/// This function assumes the client is already navigated to the correct URL (e.g., -/// https://finance.yahoo.com/calendar/earnings?symbol={ticker}) and cookies are handled. -/// -/// It waits for the earnings table, extracts rows, parses cells into CompanyEvent structs, -/// and handles date parsing, float parsing, and optional fields. -/// -/// # Arguments -/// * `client` - The fantoccini Client with the page loaded. -/// * `ticker` - The stock ticker symbol for the events. -/// -/// # Returns -/// A vector of CompanyEvent on success. -/// -/// # Errors -/// Returns an error if: -/// - Table or elements not found. -/// - Date or float parsing fails. -/// - WebDriver operations fail. -/// -/// # Examples -/// -/// ```no_run -/// use fantoccini::Client; -/// use crate::corporate::scraper::extract_earnings; -/// -/// #[tokio::main] -/// async fn main() -> Result<()> { -/// // Assume client is set up and navigated -/// let events = extract_earnings(&client, "AAPL").await?; -/// Ok(()) -/// } -/// ``` -pub async fn extract_earnings_events(client: &Client, ticker: &str) -> Result> { - // Wait for the table to load - let table = client - .wait() - .for_element(Locator::Css(r#"table[data-test="cal-table"]"#)) - .await - .map_err(|e| anyhow!("Failed to find earnings table: {}", e))?; - - // Find all rows in tbody - let rows = table - .find_all(Locator::Css("tbody tr")) - .await - .map_err(|e| anyhow!("Failed to find table rows: {}", e))?; - - let mut events = Vec::with_capacity(rows.len()); - - for row in rows { - let cells = row - .find_all(Locator::Css("td")) - .await - .map_err(|e| anyhow!("Failed to find cells in row: {}", e))?; - - if cells.len() < 5 { - continue; // Skip incomplete rows - } - - // Extract and parse date - let date_str = cells[0] - .text() - .await - .map_err(|e| anyhow!("Failed to get date text: {}", e))?; - let date = parse_yahoo_date(&date_str) - .map_err(|e| anyhow!("Failed to parse date '{}': {}", date_str, e))? - .format("%Y-%m-%d") - .to_string(); - - // Extract time, replace "Time Not Supplied" with empty - let time = cells[1] - .text() - .await - .map_err(|e| anyhow!("Failed to get time text: {}", e))? 
- .replace("Time Not Supplied", ""); - - // Extract period - let period = cells[2] - .text() - .await - .map_err(|e| anyhow!("Failed to get period text: {}", e))?; - - // Parse EPS forecast - let eps_forecast_str = cells[3] - .text() - .await - .map_err(|e| anyhow!("Failed to get EPS forecast text: {}", e))?; - let eps_forecast = parse_float(&eps_forecast_str); - - // Parse EPS actual - let eps_actual_str = cells[4] - .text() - .await - .map_err(|e| anyhow!("Failed to get EPS actual text: {}", e))?; - let eps_actual = parse_float(&eps_actual_str); - - // Parse surprise % if available - let surprise_pct = if cells.len() > 5 { - let surprise_str = cells[5] - .text() - .await - .map_err(|e| anyhow!("Failed to get surprise text: {}", e))?; - parse_float(&surprise_str) - } else { - None - }; - - events.push(CompanyEvent { - ticker: ticker.to_string(), - date, - time, - period, - eps_forecast, - eps_actual, - revenue_forecast: None, - revenue_actual: None, - surprise_pct, - source: "Yahoo".to_string(), - }); - } - - if events.is_empty() { - eprintln!("Warning: No earnings events extracted for ticker {}", ticker); - } else { - println!("Extracted {} earnings events for {}", events.len(), ticker); - } - - Ok(events) -} - fn parse_price(v: Option<&Value>) -> f64 { v.and_then(|x| x.as_str()) .and_then(|s| s.replace('$', "").replace(',', "").parse::().ok()) @@ -490,20 +191,17 @@ pub async fn _fetch_latest_gleif_isin_lei_mapping_url(client: &Client) -> anyhow pub async fn download_isin_lei_csv() -> anyhow::Result> { let url = "https://mapping.gleif.org/api/v2/isin-lei/9315e3e3-305a-4e71-b062-46714740fa8d/download"; - // Initialize DataPaths and create cache/gleif directory let paths = DataPaths::new(".")?; let gleif_cache_dir = paths.cache_gleif_dir(); if let Err(e) = std::fs::create_dir_all(&gleif_cache_dir) { let msg = format!("Failed to create cache/gleif directory: {}", e); logger::log_error(&msg).await; - println!("{}", msg); return Ok(None); } - logger::log_info("Corporate Scraper: Downloading ISIN/LEI mapping from GLEIF...").await; + logger::log_info("Downloading ISIN/LEI mapping from GLEIF...").await; - // Download ZIP and get the filename from Content-Disposition header let client = match reqwest::Client::builder() .user_agent(USER_AGENT) .timeout(std::time::Duration::from_secs(30)) @@ -511,9 +209,7 @@ pub async fn download_isin_lei_csv() -> anyhow::Result> { { Ok(c) => c, Err(e) => { - let msg = format!("Failed to create HTTP client: {}", e); - logger::log_error(&msg).await; - println!("{}", msg); + logger::log_error(&format!("Failed to create HTTP client: {}", e)).await; return Ok(None); } }; @@ -521,20 +217,15 @@ pub async fn download_isin_lei_csv() -> anyhow::Result> { let resp = match client.get(url).send().await { Ok(r) if r.status().is_success() => r, Ok(resp) => { - let msg = format!("Server returned HTTP {}", resp.status()); - logger::log_error(&msg).await; - println!("{}", msg); + logger::log_error(&format!("Server returned HTTP {}", resp.status())).await; return Ok(None); } Err(e) => { - let msg = format!("Failed to download ISIN/LEI ZIP: {}", e); - logger::log_error(&msg).await; - println!("{}", msg); + logger::log_error(&format!("Failed to download: {}", e)).await; return Ok(None); } }; - // Extract filename from Content-Disposition header or use default let filename = resp .headers() .get("content-disposition") @@ -542,11 +233,10 @@ pub async fn download_isin_lei_csv() -> anyhow::Result> { .and_then(|s| s.split("filename=").nth(1).map(|f| f.trim_matches('"').to_string())) 
.unwrap_or_else(|| "isin_lei.zip".to_string()); - // Parse timestamp from filename and convert to DDMMYYYY format let parsed_filename = parse_gleif_filename(&filename); - logger::log_info(&format!("Corporate Scraper: Downloaded file: {} -> {}", filename, parsed_filename)).await; + logger::log_info(&format!("Downloaded: {} -> {}", filename, parsed_filename)).await; - // Determine date (DDMMYYYY) from parsed filename: "isin-lei-DDMMYYYY.csv" + // Extract date from filename let mut date_str = String::new(); if let Some(start_idx) = parsed_filename.find("isin-lei-") { let rest = &parsed_filename[start_idx + 9..]; @@ -555,13 +245,10 @@ pub async fn download_isin_lei_csv() -> anyhow::Result> { } } - // If we parsed a date, use/create a date folder under cache/gleif and operate inside it; otherwise use cache root. let date_dir = if !date_str.is_empty() { let p = gleif_cache_dir.join(&date_str); - // Ensure the date folder exists (create if necessary) if let Err(e) = std::fs::create_dir_all(&p) { - let msg = format!("Failed to create date directory {:?}: {}", p, e); - logger::log_warn(&msg).await; + logger::log_warn(&format!("Failed to create date directory: {}", e)).await; None } else { Some(p) @@ -570,17 +257,16 @@ pub async fn download_isin_lei_csv() -> anyhow::Result> { None }; - // Choose the directory where we'll look for existing files and where we'll save the new ones let target_dir = date_dir.clone().unwrap_or_else(|| gleif_cache_dir.to_path_buf()); - // If the date folder exists (or was created), prefer any *_clean.csv inside it and return that immediately + // Check for existing clean CSV if let Some(ref ddir) = date_dir { if let Ok(entries) = std::fs::read_dir(ddir) { for entry in entries.flatten() { if let Some(name) = entry.file_name().to_str() { if name.to_lowercase().ends_with("_clean.csv") { let path = ddir.join(name); - logger::log_info(&format!("Found existing clean GLEIF CSV: {}", path.display())).await; + logger::log_info(&format!("Found existing clean CSV: {}", path.display())).await; return Ok(Some(path.to_string_lossy().to_string())); } } @@ -588,71 +274,42 @@ pub async fn download_isin_lei_csv() -> anyhow::Result> { } } - // If no clean file found in the date folder (or date folder doesn't exist), check whether the csv/zip already exist in the target dir - let csv_candidate_name = parsed_filename.replace(".zip", ".csv"); - let csv_candidate = target_dir.join(&csv_candidate_name); - let zip_candidate = target_dir.join(&parsed_filename); - + let csv_candidate = target_dir.join(parsed_filename.replace(".zip", ".csv")); if csv_candidate.exists() { - logger::log_info(&format!("Found existing GLEIF CSV: {}", csv_candidate.display())).await; + logger::log_info(&format!("Found existing CSV: {}", csv_candidate.display())).await; return Ok(Some(csv_candidate.to_string_lossy().to_string())); } - if zip_candidate.exists() { - // If zip exists but csv does not, extract later; for now prefer returning csv path (may be created by extraction step) - let inferred_csv = target_dir.join(csv_candidate_name); - if inferred_csv.exists() { - logger::log_info(&format!("Found existing extracted CSV next to ZIP: {}", inferred_csv.display())).await; - return Ok(Some(inferred_csv.to_string_lossy().to_string())); - } - // otherwise we'll overwrite/extract into target_dir below - } let bytes = match resp.bytes().await { Ok(b) => b, Err(e) => { - let msg = format!("Failed to read ZIP bytes: {}", e); - logger::log_error(&msg).await; - println!("{}", msg); + logger::log_error(&format!("Failed to read 
bytes: {}", e)).await; return Ok(None); } }; - // Ensure target directory exists (create if it's the date folder and was absent earlier) - if let Some(ref ddir) = date_dir { - let _ = std::fs::create_dir_all(ddir); - } let zip_path = target_dir.join(&parsed_filename); let csv_path = target_dir.join(parsed_filename.replace(".zip", ".csv")); if let Err(e) = tokio::fs::write(&zip_path, &bytes).await { - let msg = format!("Failed to write ZIP file: {}", e); - logger::log_error(&msg).await; - println!("{}", msg); + logger::log_error(&format!("Failed to write ZIP: {}", e)).await; return Ok(None); } - logger::log_info(&format!("Corporate Scraper: Saved ZIP to {:?}", zip_path)).await; - // Extract CSV - let archive = match std::fs::File::open(&zip_path) - .map(ZipArchive::new) - { + // Extract CSV from ZIP + let archive = match std::fs::File::open(&zip_path).map(ZipArchive::new) { Ok(Ok(a)) => a, Ok(Err(e)) => { - let msg = format!("Invalid ZIP: {}", e); - logger::log_error(&msg).await; - println!("{}", msg); + logger::log_error(&format!("Invalid ZIP: {}", e)).await; return Ok(None); } Err(e) => { - let msg = format!("Cannot open ZIP file: {}", e); - logger::log_error(&msg).await; - println!("{}", msg); + logger::log_error(&format!("Cannot open ZIP: {}", e)).await; return Ok(None); } }; let mut archive = archive; - let idx = match (0..archive.len()).find(|&i| { archive.by_index(i) .map(|f| f.name().ends_with(".csv")) @@ -660,9 +317,7 @@ pub async fn download_isin_lei_csv() -> anyhow::Result> { }) { Some(i) => i, None => { - let msg = "ZIP did not contain a CSV file"; - logger::log_error(msg).await; - println!("{}", msg); + logger::log_error("ZIP contains no CSV").await; return Ok(None); } }; @@ -670,43 +325,32 @@ pub async fn download_isin_lei_csv() -> anyhow::Result> { let mut csv_file = match archive.by_index(idx) { Ok(f) => f, Err(e) => { - let msg = format!("Failed to read CSV entry: {}", e); - logger::log_error(&msg).await; - println!("{}", msg); + logger::log_error(&format!("Failed to read CSV: {}", e)).await; return Ok(None); } }; let mut csv_bytes = Vec::new(); if let Err(e) = csv_file.read_to_end(&mut csv_bytes) { - let msg = format!("Failed to extract CSV: {}", e); - logger::log_error(&msg).await; + logger::log_error(&format!("Failed to extract: {}", e)).await; return Ok(None); } if let Err(e) = tokio::fs::write(&csv_path, &csv_bytes).await { - let msg = format!("Failed to save CSV file: {}", e); - logger::log_error(&msg).await; + logger::log_error(&format!("Failed to save CSV: {}", e)).await; return Ok(None); } - let msg = format!("✓ ISIN/LEI CSV extracted: {:?}", csv_path); - logger::log_info(&msg).await; - + logger::log_info(&format!("✓ CSV extracted: {:?}", csv_path)).await; Ok(Some(csv_path.to_string_lossy().to_string())) } -/// Parse GLEIF filename and convert timestamp to DDMMYYYY format -/// Example: "isin-lei-20251124T080254.csv" -> "isin-lei-24112025.csv" fn parse_gleif_filename(filename: &str) -> String { - // Try to find pattern: isin-lei-YYYYMMDDTHHMMSS.zip/csv if let Some(start_idx) = filename.find("isin-lei-") { - let rest = &filename[start_idx + 9..]; // After "isin-lei-" + let rest = &filename[start_idx + 9..]; - // Extract the 8 digits (YYYYMMDD) if rest.len() >= 8 && rest[0..8].chars().all(|c| c.is_numeric()) { let date_part = &rest[0..8]; - // date_part is YYYYMMDD, convert to DDMMYYYY if date_part.len() == 8 { let year = &date_part[0..4]; let month = &date_part[4..6]; @@ -717,11 +361,9 @@ fn parse_gleif_filename(filename: &str) -> String { } } - // Fallback: return 
original filename if parsing fails filename.to_string() } - pub async fn load_isin_lei_csv() -> anyhow::Result>> { // 1. Download + extract the CSV (this is now async) let csv_path = match download_isin_lei_csv().await? { @@ -769,30 +411,4 @@ pub async fn load_isin_lei_csv() -> anyhow::Result>> ); Ok(map) -} - -pub async fn reject_yahoo_cookies(client: &Client) -> anyhow::Result<()> { - for _ in 0..10 { - let clicked: bool = client - .execute( - r#"(() => { - const btn = document.querySelector('#consent-page .reject-all'); - if (btn) { - btn.click(); - return true; - } - return false; - })()"#, - vec![], - ) - .await? - .as_bool() - .unwrap_or(false); - - if clicked { break; } - sleep(TokioDuration::from_millis(500)).await; - } - - println!("Rejected Yahoo cookies if button existed"); - Ok(()) } \ No newline at end of file diff --git a/src/corporate/storage.rs b/src/corporate/storage.rs index 37741c2..5b10b92 100644 --- a/src/corporate/storage.rs +++ b/src/corporate/storage.rs @@ -6,49 +6,12 @@ use crate::util::logger; use tokio::fs; use tokio::io::AsyncWriteExt; use chrono::{Datelike, NaiveDate}; -use std::collections::{HashMap}; +use std::collections::HashMap; use std::path::{PathBuf, Path}; -const BATCH_SIZE: usize = 500; // Process 500 events at a time +const BATCH_SIZE: usize = 500; -/// Load events in streaming fashion to avoid memory buildup -pub async fn load_existing_events_streaming( - paths: &DataPaths, - callback: impl Fn(CompanyEvent) -> anyhow::Result<()> -) -> anyhow::Result { - let dir = paths.corporate_events_dir(); - if !dir.exists() { - logger::log_info("Corporate Storage: No existing events directory found").await; - return Ok(0); - } - - let mut total = 0; - let mut entries = fs::read_dir(dir).await?; - - while let Some(entry) = entries.next_entry().await? { - let path = entry.path(); - if path.extension().and_then(|s| s.to_str()) == Some("json") { - let name = path.file_name().and_then(|n| n.to_str()).unwrap_or(""); - if name.starts_with("events_") && name.len() == 17 { - let content = fs::read_to_string(&path).await?; - let events: Vec = serde_json::from_str(&content)?; - - for event in events { - callback(event)?; - total += 1; - } - - // Yield to prevent blocking - tokio::task::yield_now().await; - } - } - } - - logger::log_info(&format!("Corporate Storage: Streamed {} events", total)).await; - Ok(total) -} - -/// Build lightweight index of events instead of loading everything +/// Lightweight index entry - only metadata, no full event data #[derive(Debug, Clone)] pub struct EventIndex { pub key: String, @@ -57,9 +20,11 @@ pub struct EventIndex { pub file_path: PathBuf, } +/// Build index of all events without loading them into memory pub async fn build_event_index(paths: &DataPaths) -> anyhow::Result> { let dir = paths.corporate_events_dir(); if !dir.exists() { + logger::log_info("Corporate Storage: No events directory found").await; return Ok(Vec::new()); } @@ -90,7 +55,7 @@ pub async fn build_event_index(paths: &DataPaths) -> anyhow::Result( + paths: &DataPaths, + mut callback: F +) -> anyhow::Result +where + F: FnMut(CompanyEvent) -> anyhow::Result<()>, +{ + let dir = paths.corporate_events_dir(); + if !dir.exists() { + return Ok(0); + } + + let mut total = 0; + let mut entries = fs::read_dir(dir).await?; + + while let Some(entry) = entries.next_entry().await? 
{ + let path = entry.path(); + if path.extension().and_then(|s| s.to_str()) == Some("json") { + let name = path.file_name().and_then(|n| n.to_str()).unwrap_or(""); + if name.starts_with("events_") { + let content = fs::read_to_string(&path).await?; + let events: Vec = serde_json::from_str(&content)?; + + for event in events { + callback(event)?; + total += 1; + } + + tokio::task::yield_now().await; + } + } + } + + logger::log_info(&format!("Corporate Storage: Streamed {} events", total)).await; + Ok(total) +} + +/// Save events organized by month (accepts Vec, not HashMap) pub async fn save_optimized_events( paths: &DataPaths, - events: Vec // Changed from HashMap to Vec + events: Vec ) -> anyhow::Result<()> { let dir = paths.corporate_events_dir(); fs::create_dir_all(dir).await?; @@ -124,16 +128,14 @@ pub async fn save_optimized_events( removed_count += 1; } } - logger::log_info(&format!("Corporate Storage: Removed {} old event files", removed_count)).await; + logger::log_info(&format!("Corporate Storage: Removed {} old files", removed_count)).await; let total_events = events.len(); let mut sorted = events; sorted.sort_by(|a, b| { - a.ticker.cmp(&b.ticker) - .then(a.date.cmp(&b.date)) + a.ticker.cmp(&b.ticker).then(a.date.cmp(&b.date)) }); - // Process in batches to avoid memory buildup let mut by_month: HashMap> = HashMap::new(); for chunk in sorted.chunks(BATCH_SIZE) { @@ -146,27 +148,28 @@ pub async fn save_optimized_events( tokio::task::yield_now().await; } - let total_months = by_month.len(); for (month, list) in by_month { let path = dir.join(format!("events_{}.json", month)); fs::write(&path, serde_json::to_string_pretty(&list)?).await?; - logger::log_info(&format!("Corporate Storage: Saved {} events for month {}", list.len(), month)).await; + logger::log_info(&format!("Saved {} events for month {}", list.len(), month)).await; } - logger::log_info(&format!("Corporate Storage: Saved {} total events in {} month files", total_events, total_months)).await; + logger::log_info(&format!("Saved {} total events", total_events)).await; Ok(()) } -pub async fn save_changes(paths: &DataPaths, changes: &[CompanyEventChange]) -> anyhow::Result<()> { +pub async fn save_changes( + paths: &DataPaths, + changes: &[CompanyEventChange] +) -> anyhow::Result<()> { if changes.is_empty() { logger::log_info("Corporate Storage: No changes to save").await; return Ok(()); } + let dir = paths.corporate_changes_dir(); fs::create_dir_all(dir).await?; - logger::log_info(&format!("Corporate Storage: Saving {} changes", changes.len())).await; - let mut by_month: HashMap> = HashMap::new(); for c in changes { if let Ok(d) = NaiveDate::parse_from_str(&c.date, "%Y-%m-%d") { @@ -180,12 +183,13 @@ pub async fn save_changes(paths: &DataPaths, changes: &[CompanyEventChange]) -> let mut all = if path.exists() { let s = fs::read_to_string(&path).await?; serde_json::from_str(&s).unwrap_or_default() - } else { vec![] }; + } else { + vec![] + }; all.extend(list.clone()); fs::write(&path, serde_json::to_string_pretty(&all)?).await?; - logger::log_info(&format!("Corporate Storage: Saved {} changes for month {}", list.len(), month)).await; } - logger::log_info("Corporate Storage: All changes saved successfully").await; + Ok(()) } @@ -203,9 +207,7 @@ pub async fn save_prices_for_ticker( let path = timeframe_dir.join("prices.json"); prices.sort_by_key(|p| (p.date.clone(), p.time.clone())); - - let json = serde_json::to_string_pretty(&prices)?; - fs::write(&path, json).await?; + fs::write(&path, 
serde_json::to_string_pretty(&prices)?).await?; Ok(()) } @@ -240,7 +242,10 @@ pub async fn save_available_exchanges( Ok(()) } -pub async fn load_available_exchanges(paths: &DataPaths, lei: &str) -> anyhow::Result> { +pub async fn load_available_exchanges( + paths: &DataPaths, + lei: &str +) -> anyhow::Result> { let path = get_company_dir(paths, lei).join("available_exchanges.json"); if path.exists() { let content = fs::read_to_string(&path).await?; @@ -267,15 +272,13 @@ pub async fn save_prices_by_source( Ok(()) } -/// Saves companies data to a JSONL file in streaming fashion +/// Stream companies to JSONL incrementally pub async fn save_companies_to_jsonl_streaming( paths: &DataPaths, - companies: &HashMap>, -) -> anyhow::Result<()> { + companies_iter: impl Iterator)>, +) -> anyhow::Result { let file_path = paths.data_dir().join("companies.jsonl"); - logger::log_info(&format!("Corporate Storage: Saving {} companies to JSONL", companies.len())).await; - if let Some(parent) = file_path.parent() { tokio::fs::create_dir_all(parent).await?; } @@ -283,32 +286,33 @@ pub async fn save_companies_to_jsonl_streaming( let mut file = tokio::fs::File::create(&file_path).await?; let mut count = 0; - // Process in batches - for (name, securities) in companies.iter() { + for (name, securities) in companies_iter { let line = serde_json::json!({ "name": name, "securities": securities }); + file.write_all(line.to_string().as_bytes()).await?; file.write_all(b"\n").await?; - count += 1; + if count % 100 == 0 { tokio::task::yield_now().await; } } - let msg = format!("✓ Saved {} companies to {:?}", companies.len(), file_path); - println!("{}", msg); - logger::log_info(&msg).await; - Ok(()) + logger::log_info(&format!("Saved {} companies to JSONL", count)).await; + Ok(count) } -/// Load companies from JSONL in streaming fashion -pub async fn load_companies_from_jsonl_streaming( +/// Stream read companies from JSONL +pub async fn stream_companies_from_jsonl( path: &Path, - callback: impl Fn(String, HashMap) -> anyhow::Result<()> -) -> anyhow::Result { + mut callback: F +) -> anyhow::Result +where + F: FnMut(String, HashMap) -> anyhow::Result<()>, +{ if !path.exists() { return Ok(0); } diff --git a/src/corporate/types.rs b/src/corporate/types.rs index b08460c..6c01c1f 100644 --- a/src/corporate/types.rs +++ b/src/corporate/types.rs @@ -79,15 +79,11 @@ pub struct CompanyInfo{ pub securities: HashMap>, // ISIN -> Vec } -/// Company Meta Data -/// # Attributes -/// * lei: Structuring the companies by legal dependencies [LEI -> Vec] -/// * figi: metadata with ISIN as key -/*#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct CompanyMetadata { - pub lei: String, - pub figi: Option>, -}*/ +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CompanyCrossPlatformInfo { + pub name: String, + pub isin_tickers_map: HashMap>, // ISIN -> Tickers +} /// Warrant Info /// @@ -118,14 +114,6 @@ pub struct OptionInfo { pub options: HashMap>, // ISIN -> Vec (grouped by ISIN) } -/*#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct PrimaryInfo { - pub isin: String, - pub name: String, - pub exchange_mic: String, - pub currency: String, -}*/ - #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AvailableExchange { pub exchange_mic: String, diff --git a/src/corporate/update.rs b/src/corporate/update.rs index 2c88666..eb8729f 100644 --- a/src/corporate/update.rs +++ b/src/corporate/update.rs @@ -1,170 +1,274 @@ -// src/corporate/update.rs -use super::{scraper::*, storage::*, helpers::*, types::*, 
aggregation::*, openfigi::*}; +// src/corporate/update.rs - COMPLETE STREAMING VERSION + +use super::{scraper::*, storage::*, helpers::*, types::*, openfigi::*, yahoo::*}; use crate::config::Config; use crate::util::directories::DataPaths; use crate::util::logger; use crate::scraper::webdriver::ChromeDriverPool; use chrono::Local; -use std::collections::{HashMap}; +use std::collections::HashMap; use std::sync::Arc; -/// Main function: Full update for all companies with streaming to minimize memory usage -pub async fn run_full_update(config: &Config, pool: &Arc) -> anyhow::Result<()> { - let msg = "=== Starting LEI-based corporate full update (STREAMING) ==="; - println!("{}", msg); - logger::log_info(msg).await; +/// Main update function - fully streaming, minimal memory usage +pub async fn run_full_update(_config: &Config, pool: &Arc) -> anyhow::Result<()> { + logger::log_info("=== Corporate Update (STREAMING MODE) ===").await; let paths = DataPaths::new(".")?; - // Step 1: Download/locate GLEIF CSV (don't load into memory yet) - logger::log_info("Corporate Update: Downloading/locating GLEIF CSV...").await; + // Step 1: Download GLEIF CSV (don't load into memory) + logger::log_info("Step 1: Downloading GLEIF CSV...").await; let gleif_csv_path = match download_isin_lei_csv().await? { Some(p) => { - logger::log_info(&format!("Corporate Update: GLEIF CSV at: {}", p)).await; + logger::log_info(&format!(" ✓ GLEIF CSV at: {}", p)).await; p } None => { - logger::log_warn("Corporate Update: Could not obtain GLEIF CSV, continuing with limited data").await; + logger::log_warn(" ✗ Could not obtain GLEIF CSV").await; return Ok(()); } }; // Step 2: Load OpenFIGI type lists (small, cached) - logger::log_info("Corporate Update: Loading OpenFIGI type lists...").await; - if let Err(e) = load_figi_type_lists().await { - logger::log_warn(&format!("Could not load OpenFIGI type lists: {}", e)).await; + logger::log_info("Step 2: Loading OpenFIGI metadata...").await; + load_figi_type_lists().await.ok(); + logger::log_info(" ✓ OpenFIGI metadata loaded").await; + + // Step 3: Check mapping status and process only unmapped LEIs + logger::log_info("Step 3: Checking LEI-FIGI mapping status...").await; + + let all_mapped = ensure_all_leis_mapped(&gleif_csv_path, None).await?; + + if !all_mapped { + logger::log_warn(" ⚠ Some LEIs failed to map - continuing with partial data").await; + } else { + logger::log_info(" ✓ All LEIs successfully mapped").await; } - // Step 3: Process GLEIF → FIGI mapping in streaming fashion - logger::log_info("Corporate Update: Building FIGI mappings (streaming)...").await; + // Step 4: Build securities from FIGI data (streaming) + logger::log_info("Step 4: Building securities map (streaming)...").await; + let date_dir = find_most_recent_figi_date_dir(&paths).await?; - // Build LEI→ISINs map by streaming the CSV - let mut lei_to_isins: HashMap> = HashMap::new(); - let mut lei_batch = Vec::new(); - const LEI_BATCH_SIZE: usize = 1000; - - stream_gleif_csv(&gleif_csv_path, |lei, isin| { - lei_to_isins.entry(lei.clone()).or_default().push(isin); - lei_batch.push(lei); - - // Process in batches - if lei_batch.len() >= LEI_BATCH_SIZE { - lei_batch.clear(); - } - - Ok(()) - }).await?; - - logger::log_info(&format!("Corporate Update: Collected {} LEIs", lei_to_isins.len())).await; - - // Step 4: Build FIGI mappings in batches (process and save incrementally) - logger::log_info("Corporate Update: Processing FIGI mappings in batches...").await; - let figi_result = 
build_lei_to_figi_infos(&lei_to_isins, None).await; - - // Don't keep the full result in memory - it's already saved to JSONL files - drop(figi_result); - drop(lei_to_isins); // Release this too - - logger::log_info("Corporate Update: FIGI mappings saved to cache").await; - - // Step 5: Load or build securities (streaming from JSONL files) - logger::log_info("Corporate Update: Building securities map (streaming)...").await; - - let dir = DataPaths::new(".")?; - let map_cache_dir = dir.cache_gleif_openfigi_map_dir(); - - // Find the most recent date directory - let date_dir = find_most_recent_date_dir(&map_cache_dir).await?; - - let (common_stocks, _warrants, _options) = if let Some(date_dir) = date_dir { - logger::log_info(&format!("Using FIGI data from: {:?}", date_dir)).await; - load_or_build_all_securities_streaming(&date_dir).await? + if let Some(date_dir) = date_dir { + logger::log_info(&format!(" Using FIGI data from: {:?}", date_dir)).await; + build_securities_from_figi_streaming(&date_dir).await?; + logger::log_info(" ✓ Securities map updated").await; } else { - logger::log_warn("No FIGI date directory found, using empty maps").await; - (HashMap::new(), HashMap::new(), HashMap::new()) - }; + logger::log_warn(" ✗ No FIGI data directory found").await; + } - logger::log_info(&format!("Corporate Update: Processing {} companies", common_stocks.len())).await; + // Step 5: Build companies JSONL (streaming from securities) + logger::log_info("Step 5: Building companies.jsonl (streaming)...").await; + let count = build_companies_jsonl_streaming(&paths, pool).await?; + logger::log_info(&format!(" ✓ Saved {} companies", count)).await; - // Step 6: Convert to simplified companies map and save incrementally - logger::log_info("Corporate Update: Building companies JSONL (streaming)...").await; + // Step 6: Process events (using index, not full load) + logger::log_info("Step 6: Processing events (using index)...").await; + let _event_index = build_event_index(&paths).await?; + logger::log_info(" ✓ Event index built").await; + + logger::log_info("✓ Corporate update complete").await; + Ok(()) +} + +/// Stream companies.jsonl creation from securities cache - INCREMENTAL MODE +async fn build_companies_jsonl_streaming(paths: &DataPaths, pool: &Arc) -> anyhow::Result { + let path = DataPaths::new(".")?; + let corporate_path = path.data_dir().join("corporate").join("by_name"); + let securities_path = corporate_path.join("common_stocks.json"); + + if !securities_path.exists() { + logger::log_warn("No common_stocks.json found").await; + return Ok(0); + } + + // Load securities + let content = tokio::fs::read_to_string(securities_path).await?; + let securities: HashMap = serde_json::from_str(&content)?; let companies_path = paths.data_dir().join("companies.jsonl"); - // Create file and write incrementally if let Some(parent) = companies_path.parent() { tokio::fs::create_dir_all(parent).await?; } - let mut file = tokio::fs::File::create(&companies_path).await?; - let mut processed = 0; + // Load existing companies into a map + let mut existing_companies: HashMap = HashMap::new(); - for (name, company_info) in common_stocks.iter() { - let mut isin_ticker_pairs: HashMap = HashMap::new(); + if companies_path.exists() { + logger::log_info("Loading existing companies.jsonl...").await; + let existing_content = tokio::fs::read_to_string(&companies_path).await?; + for line in existing_content.lines() { + if line.trim().is_empty() { + continue; + } + match serde_json::from_str::(line) { + Ok(company) => { + 
existing_companies.insert(company.name.clone(), company); + } + Err(e) => { + logger::log_warn(&format!("Failed to parse existing company line: {}", e)).await; + } + } + } + logger::log_info(&format!("Loaded {} existing companies", existing_companies.len())).await; + } + + // Create temporary file for atomic write + let temp_path = companies_path.with_extension("jsonl.tmp"); + let mut file = tokio::fs::File::create(&temp_path).await?; + let mut count = 0; + let mut updated_count = 0; + let mut new_count = 0; + + use tokio::io::AsyncWriteExt; + + for (name, company_info) in securities.iter() { + // Check if we already have this company + let existing_entry = existing_companies.remove(name); + let is_update = existing_entry.is_some(); + + // Start with existing ISIN-ticker map or create new one + let mut isin_tickers_map: HashMap> = + existing_entry + .map(|e| e.isin_tickers_map) + .unwrap_or_default(); + + // Step 1: Extract unique ISIN-ticker pairs from FigiInfo + let mut unique_isin_ticker_pairs: HashMap> = HashMap::new(); for figi_infos in company_info.securities.values() { for figi_info in figi_infos { - if !figi_info.isin.is_empty() && !figi_info.ticker.is_empty() { - isin_ticker_pairs.insert(figi_info.isin.clone(), figi_info.ticker.clone()); + if !figi_info.isin.is_empty() { + let tickers = unique_isin_ticker_pairs + .entry(figi_info.isin.clone()) + .or_insert_with(Vec::new); + + // Add FIGI ticker if present and not duplicate + if !figi_info.ticker.is_empty() && !tickers.contains(&figi_info.ticker) { + tickers.push(figi_info.ticker.clone()); + } } } } - if !isin_ticker_pairs.is_empty() { - use tokio::io::AsyncWriteExt; + // Step 2: Merge FIGI tickers into main map + for (isin, figi_tickers) in unique_isin_ticker_pairs { + let tickers = isin_tickers_map + .entry(isin.clone()) + .or_insert_with(Vec::new); - let line = serde_json::json!({ - "name": name, - "securities": isin_ticker_pairs - }); + // Add FIGI tickers that aren't already present + for figi_ticker in figi_tickers { + if !tickers.contains(&figi_ticker) { + tickers.push(figi_ticker); + } + } - file.write_all(line.to_string().as_bytes()).await?; + // Step 3: Check if we need to fetch Yahoo ticker for this ISIN + let has_yahoo_ticker = tickers.iter().any(|t| t.starts_with("YAHOO:")); + + if !has_yahoo_ticker { + logger::log_info(&format!("Fetching Yahoo ticker for {} (ISIN: {})", name, isin)).await; + let yahoo_result = scrape_ticker_by_isin(pool, &isin).await; + + match yahoo_result { + Ok(result) => { + let log_msg = match &result { + YahooTickerResult::Found(ticker) => + format!("✓ Found Yahoo ticker {} for ISIN {}", ticker, isin), + YahooTickerResult::NoResults => + format!("○ No search results for ISIN {}", isin), + YahooTickerResult::NotFound => + format!("○ Empty ticker result for ISIN {}", isin), + YahooTickerResult::AmbiguousResults => + format!("⚠ Ambiguous results for ISIN {}", isin), + }; + + if result.is_found() { + logger::log_info(&log_msg).await; + } else { + logger::log_warn(&log_msg).await; + } + + tickers.push(result.to_tagged_string()); + }, + Err(e) => { + logger::log_warn(&format!("✗ Yahoo lookup error for ISIN {}: {}", isin, e)).await; + tickers.push("YAHOO:ERROR".to_string()); + } + } + } else { + logger::log_warn(&format!("Skipping Yahoo lookup for {} ISIN {} - already has Yahoo data", name, isin)).await; + } + } + + // Only write if we have ticker data + if !isin_tickers_map.is_empty() { + let company_entry = CompanyCrossPlatformInfo { + name: name.clone(), + isin_tickers_map, + }; + + let line = 
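// Illustration only: a hypothetical helper (not part of the codebase) showing how a
// consumer could classify the entries written above, where OpenFIGI tickers are stored
// raw and Yahoo lookups are stored tagged via YahooTickerResult::to_tagged_string().
fn split_tagged_ticker(entry: &str) -> (&'static str, &str) {
    match entry.split_once(':') {
        Some(("YAHOO", value)) => ("yahoo", value), // "AAPL", "NOT_FOUND", "ERROR", ...
        _ => ("figi", entry),                       // untagged entries came from OpenFIGI
    }
}
// e.g. split_tagged_ticker("YAHOO:NOT_FOUND") == ("yahoo", "NOT_FOUND")
//      split_tagged_ticker("AAPL")            == ("figi", "AAPL")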
serde_json::to_string(&company_entry)?; + + file.write_all(line.as_bytes()).await?; file.write_all(b"\n").await?; - processed += 1; - // Yield periodically - if processed % 100 == 0 { + // Flush after each write for crash safety + file.flush().await?; + + count += 1; + if is_update { + updated_count += 1; + } else { + new_count += 1; + } + + if count % 10 == 0 { + logger::log_info(&format!("Progress: {} companies ({} new, {} updated)", count, new_count, updated_count)).await; tokio::task::yield_now().await; - logger::log_info(&format!("Saved {} companies so far...", processed)).await; } } } - logger::log_info(&format!("Corporate Update: Saved {} companies to JSONL", processed)).await; - - // Step 7: Process events in streaming fashion - logger::log_info("Corporate Update: Processing events (streaming)...").await; + // Write any remaining existing companies that weren't in securities + for (_name, company) in existing_companies { + let line = serde_json::to_string(&company)?; + file.write_all(line.as_bytes()).await?; + file.write_all(b"\n").await?; + file.flush().await?; + count += 1; + logger::log_warn(&format!("Preserved existing company: {}", _name)).await; + } - let event_index = build_event_index(&paths).await?; - logger::log_info(&format!("Corporate Update: Built index of {} events", event_index.len())).await; + // Ensure all data is written + file.sync_all().await?; + drop(file); - // For now, we just maintain the index - // In a full implementation, you'd stream through tickers and update events + // Atomic rename: replace old file with new one + tokio::fs::rename(&temp_path, &companies_path).await?; - // Step 8: Save any updates - logger::log_info("Corporate Update: Finalizing...").await; - - let msg = "✓ Corporate update complete (streaming)"; - println!("{}", msg); - logger::log_info(msg).await; - Ok(()) + logger::log_info(&format!("✓ Completed: {} total companies ({} new, {} updated)", count, new_count, updated_count)).await; + + Ok(count) } -/// Helper to find the most recent date directory in the FIGI cache -async fn find_most_recent_date_dir(map_cache_dir: &std::path::Path) -> anyhow::Result> { +/// Find most recent FIGI date directory +async fn find_most_recent_figi_date_dir(paths: &DataPaths) -> anyhow::Result> { + let map_cache_dir = paths.cache_gleif_openfigi_map_dir(); + if !map_cache_dir.exists() { return Ok(None); } - let mut entries = tokio::fs::read_dir(map_cache_dir).await?; + let mut entries = tokio::fs::read_dir(&map_cache_dir).await?; let mut dates = Vec::new(); while let Some(entry) = entries.next_entry().await? 
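// Illustration only: the cache directories are named DDMMYYYY (see the length-8 digit
// check below), and a plain lexicographic sort on that form compares day digits first.
// One hedged sketch of a calendar-ordered comparison is to reorder the name into
// YYYYMMDD before sorting:
fn ddmmyyyy_sort_key(name: &str) -> Option<String> {
    if name.len() != 8 || !name.chars().all(|c| c.is_ascii_digit()) {
        return None;
    }
    let (day, rest) = name.split_at(2);
    let (month, year) = rest.split_at(2);
    Some(format!("{}{}{}", year, month, day)) // "31122024" -> "20241231"
}
// dates.sort_by_key(|(name, _)| std::cmp::Reverse(ddmmyyyy_sort_key(name)));
// would then put the newest directory first, assuming the DDMMYYYY convention holds.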
{ let path = entry.path(); if path.is_dir() { if let Some(name) = path.file_name().and_then(|n| n.to_str()) { - // Date format: DDMMYYYY if name.len() == 8 && name.chars().all(|c| c.is_numeric()) { dates.push((name.to_string(), path)); } @@ -176,67 +280,16 @@ async fn find_most_recent_date_dir(map_cache_dir: &std::path::Path) -> anyhow::R return Ok(None); } - // Sort by date (DDMMYYYY format) - dates.sort_by(|a, b| b.0.cmp(&a.0)); // Descending order - + dates.sort_by(|a, b| b.0.cmp(&a.0)); Ok(Some(dates[0].1.clone())) } + + pub struct ProcessResult { pub changes: Vec, } -/// Process events in batches to avoid memory buildup -pub async fn process_events_streaming( - index: &[EventIndex], - new_events: &[CompanyEvent], - today: &str, -) -> anyhow::Result<(Vec, Vec)> { - let mut all_changes = Vec::new(); - let mut final_events: HashMap = HashMap::new(); - - // Step 1: Load existing events in batches using the index - logger::log_info("Loading existing events in batches...").await; - - let mut loaded_files = std::collections::HashSet::new(); - - for entry in index { - if loaded_files.contains(&entry.file_path) { - continue; - } - - let content = tokio::fs::read_to_string(&entry.file_path).await?; - let events: Vec = serde_json::from_str(&content)?; - - for e in events { - final_events.insert(event_key(&e), e); - } - - loaded_files.insert(entry.file_path.clone()); - - if final_events.len() % 1000 == 0 { - logger::log_info(&format!("Loaded {} events so far...", final_events.len())).await; - tokio::task::yield_now().await; - } - } - - logger::log_info(&format!("Loaded {} existing events", final_events.len())).await; - - // Step 2: Process new events in batches - for (idx, batch) in new_events.chunks(500).enumerate() { - logger::log_info(&format!("Processing batch {} ({} events)", idx + 1, batch.len())).await; - - let batch_result = process_batch(batch, &mut final_events, today); - all_changes.extend(batch_result.changes); - - tokio::task::yield_now().await; - } - - let events_vec: Vec = final_events.into_values().collect(); - - Ok((all_changes, events_vec)) -} - pub fn process_batch( new_events: &[CompanyEvent], existing: &mut HashMap, @@ -253,7 +306,6 @@ pub fn process_batch( continue; } - // Check for time change on same date let date_key = format!("{}|{}", new.ticker, new.date); let mut found_old = None; for (k, e) in existing.iter() { diff --git a/src/corporate/yahoo.rs b/src/corporate/yahoo.rs new file mode 100644 index 0000000..fbe1a61 --- /dev/null +++ b/src/corporate/yahoo.rs @@ -0,0 +1,312 @@ +// src/corporate/yahoo.rs +use super::{types::*, helpers::*}; +use crate::{scraper::webdriver::*, util::{directories::DataPaths}}; +use event_backtest_engine::logger; +use fantoccini::{Client, Locator}; +use serde::{Deserialize, Serialize}; +use tokio::{time::{Duration as TokioDuration, sleep}}; +use std::{sync::Arc}; +use anyhow::{anyhow, Result}; + +/// Mapping existing + +/// getting historical stock price data daily (xxxx - 2025) and hourly (last 30 days) + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum YahooTickerResult { + Found(String), + NotFound, + NoResults, + AmbiguousResults, +} + +impl YahooTickerResult { + pub fn to_tagged_string(&self) -> String { + match self { + YahooTickerResult::Found(ticker) => format!("YAHOO:{}", ticker), + YahooTickerResult::NotFound => "YAHOO:NOT_FOUND".to_string(), + YahooTickerResult::NoResults => "YAHOO:NO_RESULTS".to_string(), + YahooTickerResult::AmbiguousResults => "YAHOO:AMBIGUOUS".to_string(), + } + } + + pub fn is_found(&self) -> bool { 
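        // Note: is_found() is true only for the Found variant. The non-found variants
        // (NotFound, NoResults, AmbiguousResults) are still persisted through
        // to_tagged_string(), so a later run sees an existing "YAHOO:" entry for that
        // ISIN and skips re-querying Yahoo.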
+ matches!(self, YahooTickerResult::Found(_)) + } + + pub fn get_ticker(&self) -> Option<&str> { + match self { + YahooTickerResult::Found(ticker) => Some(ticker), + _ => None, + } + } +} + +pub async fn scrape_ticker_by_isin( + pool: &Arc, + isin: &str, +) -> anyhow::Result { + let isin = isin.to_string(); + pool.execute(format!("https://finance.yahoo.com/lookup?s={}", isin), move |client| { + let isin = isin.clone(); + Box::pin(async move { + sleep(TokioDuration::from_millis(1000)).await; + reject_yahoo_cookies(&client).await?; + sleep(TokioDuration::from_millis(1000)).await; + extract_ticker_by_isin(&client, &isin).await + }) + }).await +} + +pub async fn extract_ticker_by_isin( + client: &Client, + _isin: &str, +) -> Result { + //let search_url = format!("https://finance.yahoo.com/lookup?s={}", isin); + + // Check for "No results found" message + if client.find(Locator::Css(".noData")).await.is_ok() { + return Ok(YahooTickerResult::NoResults); + } + + // Wait for results table + let table = match client + .wait() + .for_element(Locator::Css("table[data-test='lookup-table']")) + .await + { + Ok(t) => t, + Err(_) => return Ok(YahooTickerResult::NoResults), + }; + + // Find first row + let first_row = match table + .find(Locator::Css("tbody tr")) + .await + { + Ok(row) => row, + Err(_) => return Ok(YahooTickerResult::NoResults), + }; + + // Extract ticker from first cell + let ticker_cell = first_row + .find(Locator::Css("td:nth-child(1)")) + .await + .map_err(|e| anyhow!("Failed to find ticker cell: {}", e))?; + + let ticker = ticker_cell + .text() + .await + .map_err(|e| anyhow!("Failed to get ticker text: {}", e))? + .trim() + .to_string(); + + if ticker.is_empty() { + Ok(YahooTickerResult::NotFound) + } else { + Ok(YahooTickerResult::Found(ticker)) + } +} + +pub async fn get_all_tickers_from_companies_jsonl(paths: &DataPaths) -> anyhow::Result> { + let corporate_path = paths.data_dir().join("corporate").join("by_name"); + let companies_file = corporate_path.join("companies.jsonl"); + let content = tokio::fs::read_to_string(companies_file).await?; + let mut tickers = Vec::new(); + for line in content.lines() { + let company: CompanyCrossPlatformInfo = serde_json::from_str(line)?; + for (_isin, ticker_vec) in company.isin_tickers_map { + tickers.extend(ticker_vec); + } + } + Ok(tickers) +} + +/// Fetches earnings events for a ticker using a dedicated ScrapeTask. +/// +/// This function creates and executes a ScrapeTask to navigate to the Yahoo Finance earnings calendar, +/// reject cookies, and extract the events. +/// +/// # Arguments +/// * `ticker` - The stock ticker symbol. +/// +/// # Returns +/// A vector of CompanyEvent structs on success. +/// +/// # Errors +/// Returns an error if the task execution fails, e.g., chromedriver spawn or navigation issues. +pub async fn fetch_earnings_with_pool( + pool: &Arc, + ticker: &str, +) -> anyhow::Result> { + let ticker = ticker.to_string(); + let url = format!("https://finance.yahoo.com/calendar/earnings?symbol={}", ticker); + + let ticker_cloned = ticker.clone(); + + pool.execute(url, move |client| { + let ticker = ticker_cloned.clone(); + Box::pin(async move { + reject_yahoo_cookies(&client).await?; + extract_earnings_events(&client, &ticker).await + }) + }).await +} + +/// Extracts earnings events from the currently loaded Yahoo Finance earnings calendar page. 
+/// +/// This function assumes the client is already navigated to the correct URL (e.g., +/// https://finance.yahoo.com/calendar/earnings?symbol={ticker}) and cookies are handled. +/// +/// It waits for the earnings table, extracts rows, parses cells into CompanyEvent structs, +/// and handles date parsing, float parsing, and optional fields. +/// +/// # Arguments +/// * `client` - The fantoccini Client with the page loaded. +/// * `ticker` - The stock ticker symbol for the events. +/// +/// # Returns +/// A vector of CompanyEvent on success. +/// +/// # Errors +/// Returns an error if: +/// - Table or elements not found. +/// - Date or float parsing fails. +/// - WebDriver operations fail. +/// +/// # Examples +/// +/// ```no_run +/// use fantoccini::Client; +/// use crate::corporate::scraper::extract_earnings; +/// +/// #[tokio::main] +/// async fn main() -> Result<()> { +/// // Assume client is set up and navigated +/// let events = extract_earnings(&client, "AAPL").await?; +/// Ok(()) +/// } +/// ``` +pub async fn extract_earnings_events(client: &Client, ticker: &str) -> Result> { + // Wait for the table to load + let table = client + .wait() + .for_element(Locator::Css(r#"table[data-test="cal-table"]"#)) + .await + .map_err(|e| anyhow!("Failed to find earnings table: {}", e))?; + + // Find all rows in tbody + let rows = table + .find_all(Locator::Css("tbody tr")) + .await + .map_err(|e| anyhow!("Failed to find table rows: {}", e))?; + + let mut events = Vec::with_capacity(rows.len()); + + for row in rows { + let cells = row + .find_all(Locator::Css("td")) + .await + .map_err(|e| anyhow!("Failed to find cells in row: {}", e))?; + + if cells.len() < 5 { + continue; // Skip incomplete rows + } + + // Extract and parse date + let date_str = cells[0] + .text() + .await + .map_err(|e| anyhow!("Failed to get date text: {}", e))?; + let date = parse_yahoo_date(&date_str) + .map_err(|e| anyhow!("Failed to parse date '{}': {}", date_str, e))? + .format("%Y-%m-%d") + .to_string(); + + // Extract time, replace "Time Not Supplied" with empty + let time = cells[1] + .text() + .await + .map_err(|e| anyhow!("Failed to get time text: {}", e))? 
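            // Yahoo prints the literal text "Time Not Supplied" when no announcement
            // time is published; the replacement below leaves CompanyEvent.time as an
            // empty string, so downstream code can treat "" as "time unknown" rather
            // than seeing a fake value.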
+ .replace("Time Not Supplied", ""); + + // Extract period + let period = cells[2] + .text() + .await + .map_err(|e| anyhow!("Failed to get period text: {}", e))?; + + // Parse EPS forecast + let eps_forecast_str = cells[3] + .text() + .await + .map_err(|e| anyhow!("Failed to get EPS forecast text: {}", e))?; + let eps_forecast = parse_float(&eps_forecast_str); + + // Parse EPS actual + let eps_actual_str = cells[4] + .text() + .await + .map_err(|e| anyhow!("Failed to get EPS actual text: {}", e))?; + let eps_actual = parse_float(&eps_actual_str); + + // Parse surprise % if available + let surprise_pct = if cells.len() > 5 { + let surprise_str = cells[5] + .text() + .await + .map_err(|e| anyhow!("Failed to get surprise text: {}", e))?; + parse_float(&surprise_str) + } else { + None + }; + + events.push(CompanyEvent { + ticker: ticker.to_string(), + date, + time, + period, + eps_forecast, + eps_actual, + revenue_forecast: None, + revenue_actual: None, + surprise_pct, + source: "Yahoo".to_string(), + }); + } + + if events.is_empty() { + eprintln!("Warning: No earnings events extracted for ticker {}", ticker); + } else { + println!("Extracted {} earnings events for {}", events.len(), ticker); + } + + Ok(events) +} + +/// Rejecting Yahoo Cookies +async fn reject_yahoo_cookies(client: &Client) -> anyhow::Result<()> { + for _ in 0..10 { + let clicked: bool = client + .execute( + r#"(() => { + const btn = document.querySelector('#consent-page .reject-all'); + if (btn) { + btn.click(); + return true; + } + return false; + })()"#, + vec![], + ) + .await? + .as_bool() + .unwrap_or(false); + + if clicked { break; } + sleep(TokioDuration::from_millis(500)).await; + } + + logger::log_info("Rejected Yahoo cookies if button existed").await; + Ok(()) +} \ No newline at end of file diff --git a/src/economic/storage.rs b/src/economic/storage.rs index 23ad19b..c42d09f 100644 --- a/src/economic/storage.rs +++ b/src/economic/storage.rs @@ -116,7 +116,7 @@ pub async fn build_event_index(chunks: &[ChunkInfo]) -> anyhow::Result anyhow::Result> { // Find which chunk contains this event let entry = index.iter().find(|e| e.key == key); diff --git a/src/main.rs b/src/main.rs index 2bf40cd..a6465e3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,11 +14,6 @@ use util::directories::DataPaths; use util::{logger, opnv}; use std::sync::Arc; -/// Application entry point -// src/main.rs - -// ... existing imports ... - #[tokio::main] async fn main() -> Result<()> { cleanup_all_proxy_containers().await.ok(); @@ -138,7 +133,7 @@ async fn main() -> Result<()> { std::process::exit(0); }); } - + // === Step 4: Run the actual scraping jobs === logger::log_info("--- Starting ECONOMIC data update ---").await; economic::run_full_update(&config, &pool).await?; @@ -161,9 +156,4 @@ async fn main() -> Result<()> { logger::log_info("=== Application finished successfully ===").await; Ok(()) -} - -/* -memory allocation of 4294967296 bytes failed -error: process didn't exit successfully: `target\debug\event_backtest_engine.exe` (exit code: 0xc0000409, STATUS_STACK_BUFFER_OVERRUN) -*/ \ No newline at end of file +} \ No newline at end of file diff --git a/test/vpn_integration_tests.rs b/test/vpn_integration_tests.rs deleted file mode 100644 index 5fc2b76..0000000 --- a/test/vpn_integration_tests.rs +++ /dev/null @@ -1,379 +0,0 @@ -// tests/vpn_integration_tests.rs -//! 
Integration tests for VPN rotation system - -#[cfg(test)] -mod vpn_tests { - use event_backtest_engine::{ - scraper::{ - webdriver::ChromeDriverPool, - vpn_manager::{VpnInstance, VpnPool}, - }, - util::{directories::DataPaths, opnv}, - }; - use std::path::PathBuf; - use std::sync::Arc; - - /// Helper to create a test VPN instance without connecting - fn create_test_vpn_instance() -> VpnInstance { - VpnInstance::new( - PathBuf::from("test.ovpn"), - "testuser".to_string(), - "testpass".to_string(), - ) - .expect("Failed to create test VPN instance") - } - - #[test] - fn test_vpn_instance_creation() { - let vpn = create_test_vpn_instance(); - assert_eq!(vpn.hostname(), "test"); - assert!(!vpn.is_healthy()); - assert!(vpn.external_ip().is_none()); - } - - #[test] - fn test_vpn_task_counting() { - let mut vpn = create_test_vpn_instance(); - - // Should not rotate initially - assert!(!vpn.increment_task_count(10)); - - // Increment tasks - for i in 1..10 { - assert!(!vpn.increment_task_count(10), "Should not rotate at task {}", i); - } - - // Should rotate at threshold - assert!(vpn.increment_task_count(10), "Should rotate at task 10"); - - // Reset and verify - vpn.reset_task_count(); - assert!(!vpn.increment_task_count(10), "Should not rotate after reset"); - } - - #[test] - fn test_vpn_task_counting_zero_threshold() { - let mut vpn = create_test_vpn_instance(); - - // With threshold=0, should never auto-rotate - for _ in 0..100 { - assert!(!vpn.increment_task_count(0)); - } - } - - #[tokio::test] - async fn test_chromedriver_pool_creation_no_vpn() { - let result = ChromeDriverPool::new(2).await; - - match result { - Ok(pool) => { - assert_eq!(pool.get_number_of_instances(), 2); - assert!(!pool.is_vpn_enabled()); - } - Err(e) => { - eprintln!("ChromeDriver pool creation failed (expected if chromedriver not installed): {}", e); - } - } - } - - #[test] - fn test_data_paths_creation() { - let paths = DataPaths::new("./test_data").expect("Failed to create paths"); - - assert!(paths.data_dir().exists()); - assert!(paths.cache_dir().exists()); - assert!(paths.logs_dir().exists()); - assert!(paths.cache_openvpn_dir().exists()); - - // Cleanup - let _ = std::fs::remove_dir_all("./test_data"); - } - - #[tokio::test] - #[ignore] // This test requires actual network access and VPNBook availability - async fn test_fetch_vpnbook_configs() { - let paths = DataPaths::new(".").expect("Failed to create paths"); - - // This test requires a ChromeDriver pool - let pool_result = ChromeDriverPool::new(1).await; - if pool_result.is_err() { - eprintln!("Skipping VPNBook fetch test: ChromeDriver not available"); - return; - } - - let pool = Arc::new(pool_result.unwrap()); - - let result = opnv::fetch_vpnbook_configs(&pool, paths.cache_dir()).await; - - match result { - Ok((username, password, files)) => { - assert!(!username.is_empty(), "Username should not be empty"); - assert!(!password.is_empty(), "Password should not be empty"); - assert!(!files.is_empty(), "Should fetch at least one config file"); - - println!("Fetched {} VPN configs", files.len()); - for file in &files { - assert!(file.exists(), "Config file should exist: {:?}", file); - assert_eq!(file.extension().and_then(|s| s.to_str()), Some("ovpn")); - } - } - Err(e) => { - eprintln!("VPNBook fetch failed (may be temporary): {}", e); - } - } - } - - #[tokio::test] - #[ignore] // Requires actual VPN configs and OpenVPN installation - async fn test_vpn_pool_creation() { - let paths = DataPaths::new(".").expect("Failed to create paths"); - - // First fetch 
configs - let pool_result = ChromeDriverPool::new(1).await; - if pool_result.is_err() { - eprintln!("Skipping VPN pool test: ChromeDriver not available"); - return; - } - - let temp_pool = Arc::new(pool_result.unwrap()); - let fetch_result = opnv::fetch_vpnbook_configs(&temp_pool, paths.cache_dir()).await; - - if fetch_result.is_err() { - eprintln!("Skipping VPN pool test: Could not fetch configs"); - return; - } - - let (username, password, _) = fetch_result.unwrap(); - - // Create VPN pool - let vpn_pool_result = VpnPool::new( - paths.cache_openvpn_dir(), - username, - password, - false, - 0, - ).await; - - match vpn_pool_result { - Ok(vpn_pool) => { - assert!(vpn_pool.len() > 0, "VPN pool should have at least one instance"); - println!("Created VPN pool with {} instances", vpn_pool.len()); - } - Err(e) => { - eprintln!("VPN pool creation failed: {}", e); - } - } - } - - #[tokio::test] - #[ignore] // Full integration test - requires all components - async fn test_full_vpn_integration() { - let paths = DataPaths::new(".").expect("Failed to create paths"); - - // Step 1: Create temp ChromeDriver pool for fetching - let temp_pool = match ChromeDriverPool::new(1).await { - Ok(p) => Arc::new(p), - Err(e) => { - eprintln!("Skipping integration test: ChromeDriver not available - {}", e); - return; - } - }; - - // Step 2: Fetch VPNBook configs - let (username, password, files) = match opnv::fetch_vpnbook_configs( - &temp_pool, - paths.cache_dir() - ).await { - Ok(result) => result, - Err(e) => { - eprintln!("Skipping integration test: Config fetch failed - {}", e); - return; - } - }; - - assert!(!files.is_empty(), "Should have fetched configs"); - - // Step 3: Create VPN pool - let vpn_pool = match VpnPool::new( - paths.cache_openvpn_dir(), - username, - password, - true, - 5, - ).await { - Ok(pool) => Arc::new(pool), - Err(e) => { - eprintln!("Skipping integration test: VPN pool creation failed - {}", e); - return; - } - }; - - // Step 4: Connect one VPN - let vpn_instance = vpn_pool.acquire().await.expect("Failed to acquire VPN"); - let connect_result = { - let mut vpn = vpn_instance.lock().await; - vpn.connect().await - }; - - match connect_result { - Ok(_) => { - let vpn = vpn_instance.lock().await; - println!("✓ VPN connected: {} ({})", - vpn.hostname(), - vpn.external_ip().unwrap_or("unknown") - ); - assert!(vpn.is_healthy()); - assert!(vpn.external_ip().is_some()); - } - Err(e) => { - eprintln!("VPN connection failed: {}", e); - } - } - - // Step 5: Create ChromeDriver pool with VPN - let driver_pool_result = ChromeDriverPool::new_with_vpn( - 1, - Some(vpn_pool.clone()) - ).await; - - match driver_pool_result { - Ok(driver_pool) => { - assert!(driver_pool.is_vpn_enabled()); - println!("✓ ChromeDriver pool created with VPN binding"); - } - Err(e) => { - eprintln!("ChromeDriver pool creation failed: {}", e); - } - } - - // Step 6: Cleanup - vpn_pool.disconnect_all().await.expect("Failed to disconnect VPNs"); - println!("✓ Integration test complete"); - } - - #[test] - fn test_hostname_extraction() { - // Test the hostname extraction logic - let test_cases = vec![ - ("test/ca149.vpnbook.com/config.ovpn", "ca149.vpnbook.com"), - ("test/us1.vpnbook.com/config.ovpn", "us1.vpnbook.com"), - ("test/de4.vpnbook.com/config.ovpn", "de4.vpnbook.com"), - ]; - - for (path, expected_hostname) in test_cases { - let pb = PathBuf::from(path); - let hostname = pb.parent() - .and_then(|p| p.file_name()) - .and_then(|n| n.to_str()) - .unwrap_or("unknown"); - - assert_eq!(hostname, expected_hostname); - } - } - 
- #[cfg(target_os = "windows")] - #[test] - fn test_forcebindip_manager_creation() { - use event_backtest_engine::ForceBindIpManager; - - match ForceBindIpManager::new() { - Ok(manager) => { - println!("✓ ForceBindIP found at: {:?}", manager.path()); - assert!(manager.path().exists()); - } - Err(e) => { - eprintln!("ForceBindIP not found (expected in dev): {}", e); - } - } - } - - #[cfg(target_os = "windows")] - #[test] - fn test_forcebindip_command_creation() { - use event_backtest_engine::ForceBindIpManager; - use std::path::Path; - - if let Ok(manager) = ForceBindIpManager::new() { - let cmd = manager.create_bound_command( - "192.168.1.100", - Path::new("test.exe"), - &["--arg1", "value1"], - ); - - let cmd_str = format!("{:?}", cmd); - assert!(cmd_str.contains("192.168.1.100")); - assert!(cmd_str.contains("test.exe")); - println!("✓ ForceBindIP command created successfully"); - } - } - - #[test] - fn test_config_defaults() { - use event_backtest_engine::Config; - - let config = Config::default(); - assert_eq!(config.economic_start_date, "2007-02-13"); - assert_eq!(config.corporate_start_date, "2010-01-01"); - assert_eq!(config.economic_lookahead_months, 3); - assert_eq!(config.max_parallel_instances, 10); - assert!(!config.enable_vpn_rotation); - assert_eq!(config.tasks_per_vpn_session, 0); - } -} - -#[cfg(test)] -mod benchmark_tests { - use super::*; - - #[tokio::test] - #[ignore] // Performance test - async fn benchmark_vpn_rotation_overhead() { - use std::time::Instant; - - // This test measures the overhead of VPN rotation - let start = Instant::now(); - - // Simulate rotation cycle - // 1. Disconnect (instant) - // 2. Wait 2 seconds - // 3. Connect (5-10 seconds) - // 4. Verify IP (1-2 seconds) - - tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; - - let elapsed = start.elapsed(); - println!("Rotation cycle took: {:?}", elapsed); - - // Typical rotation should complete in under 15 seconds - assert!(elapsed.as_secs() < 15); - } - - #[tokio::test] - #[ignore] // Performance test - async fn benchmark_parallel_scraping() { - // This test measures throughput with different parallelism levels - // Results help tune MAX_PARALLEL_INSTANCES - - let configs = vec![1, 2, 3, 5, 10]; - - for &pool_size in &configs { - println!("Testing with {} parallel instances...", pool_size); - - // Would need actual scraping implementation here - // For now, just verify pool creation time - let start = std::time::Instant::now(); - - let pool_result = event_backtest_engine::ChromeDriverPool::new(pool_size).await; - - if let Ok(_pool) = pool_result { - let elapsed = start.elapsed(); - println!(" Pool initialization: {:?}", elapsed); - - // Pool creation should be fast (< 5 seconds per instance) - assert!(elapsed.as_secs() < pool_size as u64 * 5); - } else { - eprintln!(" Skipped - ChromeDriver not available"); - } - } - } -} \ No newline at end of file