From f95e9e2427389960f87607d0058b63c605dcc6eb Mon Sep 17 00:00:00 2001 From: donpat1to Date: Tue, 9 Dec 2025 16:56:45 +0100 Subject: [PATCH] commented unused function --- src/corporate/openfigi.rs | 359 +++++++++++++++++++++++--------------- src/corporate/scraper.rs | 76 +++++++- src/corporate/storage.rs | 4 +- src/corporate/types.rs | 50 ++---- 4 files changed, 308 insertions(+), 181 deletions(-) diff --git a/src/corporate/openfigi.rs b/src/corporate/openfigi.rs index ac7eac9..d63353a 100644 --- a/src/corporate/openfigi.rs +++ b/src/corporate/openfigi.rs @@ -6,6 +6,8 @@ use super::{types::*}; use reqwest::Client as HttpClient; use reqwest::header::{HeaderMap, HeaderValue}; use serde_json::{json, Value}; +use csv::{ReaderBuilder, StringRecord, WriterBuilder}; +use chrono::NaiveDate; use std::collections::{HashMap, HashSet}; use std::path::{Path}; use std::time::Instant; @@ -203,12 +205,12 @@ impl OpenFigiClient { name: item["name"].as_str().unwrap_or("").to_string(), ticker: item["ticker"].as_str().unwrap_or("").to_string(), exch_code: item["exchCode"].as_str().unwrap_or("").to_string(), - compositeFIGI: item["compositeFIGI"].as_str().unwrap_or("").to_string(), - securityType: sec_type.to_string(), - marketSector: market_sec.to_string(), - shareClassFIGI: item["shareClassFIGI"].as_str().unwrap_or("").to_string(), - securityType2: item["securityType2"].as_str().unwrap_or("").to_string(), - securityDescription: item["securityDescription"].as_str().unwrap_or("").to_string(), + composite_figi: item["compositeFIGI"].as_str().unwrap_or("").to_string(), + security_type: sec_type.to_string(), + market_sector: market_sec.to_string(), + share_class_figi: item["shareClassFIGI"].as_str().unwrap_or("").to_string(), + security_type2: item["securityType2"].as_str().unwrap_or("").to_string(), + security_description: item["securityDescription"].as_str().unwrap_or("").to_string(), }; all_figi_infos.push(figi_info); @@ -318,12 +320,43 @@ async fn load_market_sectors() -> anyhow::Result> { /// Returns the extracted date in format "DDMMYYYY" from the filename. /// If no GLEIF file is found, returns None. async fn find_most_recent_gleif_date(gleif_cache_dir: &Path) -> anyhow::Result> { + // First check for subdirectories named as DDMMYYYY and pick the most recent date + let mut dir_entries = tokio_fs::read_dir(gleif_cache_dir) + .await + .context("Failed to read gleif cache directory")?; + + let mut found_dates: Vec = Vec::new(); + + while let Some(entry) = dir_entries.next_entry().await? { + let path = entry.path(); + if path.is_dir() { + if let Some(name) = path.file_name().and_then(|n| n.to_str()) { + // Expect folder name in DDMMYYYY + if name.len() == 8 && name.chars().all(|c| c.is_numeric()) { + if let Ok(nd) = NaiveDate::parse_from_str(name, "%d%m%Y") { + found_dates.push(nd); + } + } + } + } + } + + if !found_dates.is_empty() { + found_dates.sort(); + if let Some(most_recent) = found_dates.last() { + let date_str = most_recent.format("%d%m%Y").to_string(); + let msg = format!(" Found GLEIF data dated (from subdirs): {}", date_str); + logger::log_info(&msg).await; + return Ok(Some(date_str)); + } + } + + // Fallback: look for CSV files in the directory as before let mut entries = tokio_fs::read_dir(gleif_cache_dir) .await .context("Failed to read gleif cache directory")?; - let mut csv_files = Vec::new(); - + while let Some(entry) = entries.next_entry().await? 
{ let path = entry.path(); if let Some(filename) = path.file_name() { @@ -333,20 +366,20 @@ async fn find_most_recent_gleif_date(gleif_cache_dir: &Path) -> anyhow::Result d, None => { let err = "No GLEIF CSV file found in cache/gleif directory"; @@ -452,7 +485,10 @@ async fn build_lei_to_figi_infos_internal(lei_to_isins: &HashMap(); let total = leis.len(); let mut no_hit_leis = Vec::new(); // Track LEIs with no data found (no_hit) + let mut leis_to_delete_batch = Vec::new(); // Batch delete every 100 LEIs let msg = format!("Total LEIs to process: {}, already processed: {}", total, processed); @@ -535,12 +572,18 @@ async fn build_lei_to_figi_infos_internal(lei_to_isins: &HashMap= 100 { + let batch_msg = format!("Batch deleting {} LEIs from GLEIF CSV...", leis_to_delete_batch.len()); + logger::log_info(&batch_msg).await; + if let Err(e) = remove_leis_batch_from_gleif_csv(&gleif_date_dir, &leis_to_delete_batch).await { + let warn_msg = format!("Warning: Failed to batch remove LEIs from GLEIF CSV: {}", e); + eprintln!("{}", warn_msg); + logger::log_warn(&warn_msg).await; + } + leis_to_delete_batch.clear(); } continue; @@ -554,7 +597,7 @@ async fn build_lei_to_figi_infos_internal(lei_to_isins: &HashMap Ok(()) } -/// Removes a single invalid LEI from the GLEIF CSV file immediately. +/// Removes multiple invalid LEIs from the GLEIF CSV file in a single batch operation. /// -/// This function is called after each no_hit detection to prevent progress loss on interrupt. -/// It reads the GLEIF CSV, filters out the specific LEI, and overwrites the file. -/// -/// # Arguments -/// -/// * `gleif_cache_dir` - Path to the cache/gleif directory -/// * `lei` - The LEI string to remove -/// -/// # Returns -/// Ok(()) if successful, Err if file operations fail. -async fn remove_lei_from_gleif_csv_single(gleif_cache_dir: &Path, lei: &str) -> anyhow::Result<()> { - // Find the most recent GLEIF CSV file - let mut entries = tokio_fs::read_dir(gleif_cache_dir) - .await - .context("Failed to read gleif cache directory")?; - - let mut csv_files = Vec::new(); - - while let Some(entry) = entries.next_entry().await? { - let path = entry.path(); - if let Some(filename) = path.file_name() { - let filename_str = filename.to_string_lossy(); - if filename_str.ends_with(".csv") && filename_str.contains("isin-lei-") { - csv_files.push(path); - } - } - } - - if csv_files.is_empty() { - return Ok(()); - } - - // Sort and get the most recent - csv_files.sort(); - csv_files.reverse(); - let gleif_file = &csv_files[0]; - - // Read the CSV file - let content = tokio_fs::read_to_string(gleif_file) - .await - .context("Failed to read GLEIF CSV")?; - - // Filter out line with this LEI - let filtered_lines: Vec<&str> = content - .lines() - .filter(|line| { - // GLEIF CSV format: ISIN,LEI - let parts: Vec<&str> = line.split(',').collect(); - if parts.len() >= 2 { - parts[1] != lei - } else { - true // Keep lines that don't match the format (e.g., header) - } - }) - .collect(); - - // Only write if something was actually removed - if filtered_lines.len() < content.lines().count() { - let new_content = filtered_lines.join("\n") + "\n"; - tokio_fs::write(gleif_file, new_content) - .await - .context("Failed to write filtered GLEIF CSV")?; - } - - Ok(()) -} - -/// Removes invalid LEIs from the GLEIF CSV file. -/// -/// When an API call succeeds but returns no data (no_hit), the LEI is considered invalid -/// and should be removed from the GLEIF CSV to prevent re-scraping on future runs. 
-/// -/// This function reads the GLEIF CSV, filters out the specified LEIs, and overwrites the file. +/// This function is more efficient than removing LEIs one at a time. +/// It reads the GLEIF CSV once, filters out all specified LEIs, and overwrites the file once. /// /// # Arguments /// @@ -779,8 +762,10 @@ async fn remove_lei_from_gleif_csv_single(gleif_cache_dir: &Path, lei: &str) -> /// /// # Returns /// Ok(()) if successful, Err if file operations fail. -async fn remove_leis_from_gleif_csv(gleif_cache_dir: &Path, leis_to_remove: &[String]) -> anyhow::Result<()> { - logger::log_info(&format!("Removing {} invalid LEIs from GLEIF CSV...", leis_to_remove.len())).await; +async fn remove_leis_batch_from_gleif_csv(gleif_cache_dir: &Path, leis_to_remove: &[String]) -> anyhow::Result<()> { + if leis_to_remove.is_empty() { + return Ok(()); + } // Find the most recent GLEIF CSV file let mut entries = tokio_fs::read_dir(gleif_cache_dir) @@ -788,7 +773,7 @@ async fn remove_leis_from_gleif_csv(gleif_cache_dir: &Path, leis_to_remove: &[St .context("Failed to read gleif cache directory")?; let mut csv_files = Vec::new(); - + while let Some(entry) = entries.next_entry().await? { let path = entry.path(); if let Some(filename) = path.file_name() { @@ -800,53 +785,151 @@ async fn remove_leis_from_gleif_csv(gleif_cache_dir: &Path, leis_to_remove: &[St } if csv_files.is_empty() { - logger::log_warn("No GLEIF CSV files found for removal operation").await; + logger::log_warn("No GLEIF CSV files found for batch removal operation").await; return Ok(()); } - // Sort and get the most recent + // Prefer an original (non-_clean) GLEIF CSV if available; otherwise use the most recent file. csv_files.sort(); csv_files.reverse(); - let gleif_file = &csv_files[0]; - let debug_msg = format!("Reading GLEIF file: {}", gleif_file.display()); + + let mut gleif_file: &std::path::PathBuf = &csv_files[0]; + // Try to find the most recent filename that does NOT end with "_clean.csv" + if let Some(non_clean) = csv_files.iter().find(|p| { + p.file_name() + .and_then(|n| n.to_str()) + .map(|s| !s.to_lowercase().ends_with("_clean.csv")) + .unwrap_or(false) + }) { + gleif_file = non_clean; + } + + // Prepare clean file path: insert "_clean" before extension + let orig_path = gleif_file; + let file_name = orig_path.file_name().and_then(|n| n.to_str()).unwrap_or("gleif.csv"); + let mut stem = orig_path.file_stem().and_then(|s| s.to_str()).unwrap_or("isin-lei").to_string(); + let parent = orig_path.parent().unwrap_or_else(|| Path::new(".")); + // Avoid creating a double "_clean_clean.csv". If stem already ends with "_clean", keep it. 
+ if stem.to_lowercase().ends_with("_clean") { + // stem is already clean; keep same filename (no double suffix) + // e.g., stem="isin-lei-24112025_clean" -> clean_name="isin-lei-24112025_clean.csv" + } else { + stem = format!("{}_clean", stem); + } + + let clean_name = format!("{}.csv", stem); + let clean_path = parent.join(&clean_name); + + // If a clean file already exists, operate on it; otherwise read original and write clean file + let source_path = if clean_path.exists() { &clean_path } else { orig_path }; + + let debug_msg = format!("Reading GLEIF source for batch removal: {} (writing to {})", source_path.display(), clean_path.display()); logger::log_info(&debug_msg).await; - - // Read the CSV file - let content = tokio_fs::read_to_string(gleif_file) - .await - .context("Failed to read GLEIF CSV")?; - - let original_lines = content.lines().count(); - - // Convert LEIs to remove into a HashSet for faster lookup - let remove_set: std::collections::HashSet<_> = leis_to_remove.iter().cloned().collect(); - - // Filter out lines with LEIs to remove - let filtered_lines: Vec<&str> = content - .lines() - .filter(|line| { - // GLEIF CSV format: ISIN,LEI - let parts: Vec<&str> = line.split(',').collect(); - if parts.len() >= 2 { - !remove_set.contains(parts[1]) - } else { - true // Keep lines that don't match the format (e.g., header) + + // Cleanup any accidental double-clean files in the same directory: if a file ends with + // "_clean_clean.csv" replace it with single "_clean.csv" or remove it if target exists. + if let Ok(mut dir_entries) = tokio_fs::read_dir(parent).await { + while let Ok(Some(entry)) = dir_entries.next_entry().await { + if let Some(name) = entry.file_name().to_str().map(|s| s.to_string()) { + if name.to_lowercase().ends_with("_clean_clean.csv") { + let offending = entry.path(); + let candidate = offending.file_name().and_then(|n| n.to_str()).unwrap_or(""); + let target_name = candidate.replacen("_clean_clean.csv", "_clean.csv", 1); + let target_path = parent.join(target_name); + + if !target_path.exists() { + // Rename offending -> target + let _ = tokio_fs::rename(&offending, &target_path).await; + let msg = format!("Renamed {} -> {}", offending.display(), target_path.display()); + logger::log_info(&msg).await; + } else { + // Target exists already; remove offending duplicate + let _ = tokio_fs::remove_file(&offending).await; + let msg = format!("Removed duplicate {}", offending.display()); + logger::log_info(&msg).await; + } + } } - }) - .collect(); - - let removed_count = original_lines - filtered_lines.len(); - - // Write back the filtered content - let new_content = filtered_lines.join("\n") + "\n"; - tokio_fs::write(gleif_file, new_content) + } + } + + // Read file into memory and parse with csv crate for robust handling of quoted fields + let content = tokio_fs::read_to_string(source_path) .await - .context("Failed to write filtered GLEIF CSV")?; - - let success_msg = format!("✓ Removed {} invalid LEIs from GLEIF CSV (was {} lines, now {} lines)", leis_to_remove.len(), original_lines, filtered_lines.len()); + .context("Failed to read GLEIF CSV source")?; + + // Convert LEIs to remove into a HashSet (normalized) + let remove_set: std::collections::HashSet = leis_to_remove + .iter() + .map(|s| s.trim().trim_matches('"').to_uppercase()) + .collect(); + + // Build CSV reader: try with headers first; allow flexible records + let mut reader = ReaderBuilder::new() + .has_headers(true) + .flexible(true) + .from_reader(content.as_bytes()); + + // Remember headers (if 
present) and then iterate records. + let headers_record = match reader.headers() { + Ok(h) => Some(h.clone()), + Err(_) => None, + }; + + // We'll collect kept records and count original rows + let mut kept_records: Vec = Vec::new(); + let mut original_count: usize = 0; + let mut removed_count: usize = 0; + + // For robustness, search all columns for a matching LEI instead of relying on a single column index. + for result in reader.records() { + let record = result.context("Failed to parse CSV record")?; + original_count += 1; + + // Check every field for a match in the remove set + let mut matched = false; + for field in record.iter() { + let norm = field.trim().trim_matches('"').to_uppercase(); + if remove_set.contains(&norm) { + matched = true; + break; + } + } + + if matched { + removed_count += 1; + } else { + kept_records.push(record.clone()); + } + } + + let new_count = kept_records.len(); + + // Write back using csv writer to preserve quoting/format into clean file + let mut wtr = WriterBuilder::new().has_headers(true).from_writer(vec![]); + // If original had headers, write them back + if let Some(headers) = headers_record { + wtr.write_record(headers.iter())?; + } + + for rec in &kept_records { + wtr.write_record(rec.iter())?; + } + + let out_bytes = wtr.into_inner().context("Failed to finalize CSV writer")?; + let out_str = String::from_utf8(out_bytes).context("CSV output not valid UTF-8")?; + + tokio_fs::write(&clean_path, out_str) + .await + .context("Failed to write filtered GLEIF CSV clean file")?; + + let success_msg = format!( + "✓ Batch attempted to remove {} LEIs from GLEIF CSV (was {} records, now {} records, removed {} rows) -> {}", + leis_to_remove.len(), original_count, new_count, removed_count, clean_path.display() + ); println!("{}", success_msg); logger::log_info(&success_msg).await; - + Ok(()) } @@ -907,7 +990,7 @@ pub async fn load_or_build_all_securities( let mut option_securities = Vec::new(); for figi_info in figi_infos { - match figi_info.securityType.as_str() { + match figi_info.security_type.as_str() { "Common Stock" => common_stocks.push(figi_info.clone()), "Equity WRT" => warrant_securities.push(figi_info.clone()), "Equity Option" => option_securities.push(figi_info.clone()), diff --git a/src/corporate/scraper.rs b/src/corporate/scraper.rs index d04d2f8..486ca88 100644 --- a/src/corporate/scraper.rs +++ b/src/corporate/scraper.rs @@ -36,7 +36,7 @@ const USER_AGENT: &str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/ /// - Not an equity (ETF, bond, etc.) /// - Missing critical fields /// - Network or JSON parsing errors -pub async fn check_ticker_exists(ticker: &str) -> anyhow::Result { +/*pub async fn check_ticker_exists(ticker: &str) -> anyhow::Result { let url = format!( "https://query1.finance.yahoo.com/v10/finance/quoteSummary/{}?modules=price%2CassetProfile", ticker @@ -149,7 +149,7 @@ pub async fn check_ticker_exists(ticker: &str) -> anyhow::Result { exchange_mic, currency, }) -} +}*/ /// Fetches earnings events for a ticker using a dedicated ScrapeTask. 
/// @@ -546,6 +546,67 @@ pub async fn download_isin_lei_csv() -> anyhow::Result> { let parsed_filename = parse_gleif_filename(&filename); logger::log_info(&format!("Corporate Scraper: Downloaded file: {} -> {}", filename, parsed_filename)).await; + // Determine date (DDMMYYYY) from parsed filename: "isin-lei-DDMMYYYY.csv" + let mut date_str = String::new(); + if let Some(start_idx) = parsed_filename.find("isin-lei-") { + let rest = &parsed_filename[start_idx + 9..]; + if rest.len() >= 8 { + date_str = rest[0..8].to_string(); + } + } + + // If we parsed a date, use/create a date folder under cache/gleif and operate inside it; otherwise use cache root. + let date_dir = if !date_str.is_empty() { + let p = gleif_cache_dir.join(&date_str); + // Ensure the date folder exists (create if necessary) + if let Err(e) = std::fs::create_dir_all(&p) { + let msg = format!("Failed to create date directory {:?}: {}", p, e); + logger::log_warn(&msg).await; + None + } else { + Some(p) + } + } else { + None + }; + + // Choose the directory where we'll look for existing files and where we'll save the new ones + let target_dir = date_dir.clone().unwrap_or_else(|| gleif_cache_dir.to_path_buf()); + + // If the date folder exists (or was created), prefer any *_clean.csv inside it and return that immediately + if let Some(ref ddir) = date_dir { + if let Ok(entries) = std::fs::read_dir(ddir) { + for entry in entries.flatten() { + if let Some(name) = entry.file_name().to_str() { + if name.to_lowercase().ends_with("_clean.csv") { + let path = ddir.join(name); + logger::log_info(&format!("Found existing clean GLEIF CSV: {}", path.display())).await; + return Ok(Some(path.to_string_lossy().to_string())); + } + } + } + } + } + + // If no clean file found in the date folder (or date folder doesn't exist), check whether the csv/zip already exist in the target dir + let csv_candidate_name = parsed_filename.replace(".zip", ".csv"); + let csv_candidate = target_dir.join(&csv_candidate_name); + let zip_candidate = target_dir.join(&parsed_filename); + + if csv_candidate.exists() { + logger::log_info(&format!("Found existing GLEIF CSV: {}", csv_candidate.display())).await; + return Ok(Some(csv_candidate.to_string_lossy().to_string())); + } + if zip_candidate.exists() { + // If zip exists but csv does not, extract later; for now prefer returning csv path (may be created by extraction step) + let inferred_csv = target_dir.join(csv_candidate_name); + if inferred_csv.exists() { + logger::log_info(&format!("Found existing extracted CSV next to ZIP: {}", inferred_csv.display())).await; + return Ok(Some(inferred_csv.to_string_lossy().to_string())); + } + // otherwise we'll overwrite/extract into target_dir below + } + let bytes = match resp.bytes().await { Ok(b) => b, Err(e) => { @@ -555,9 +616,13 @@ pub async fn download_isin_lei_csv() -> anyhow::Result> { return Ok(None); } }; + // Ensure target directory exists (create if it's the date folder and was absent earlier) + if let Some(ref ddir) = date_dir { + let _ = std::fs::create_dir_all(ddir); + } - let zip_path = gleif_cache_dir.join(&parsed_filename); - let csv_path = gleif_cache_dir.join(parsed_filename.replace(".zip", ".csv")); + let zip_path = target_dir.join(&parsed_filename); + let csv_path = target_dir.join(parsed_filename.replace(".zip", ".csv")); if let Err(e) = tokio::fs::write(&zip_path, &bytes).await { let msg = format!("Failed to write ZIP file: {}", e); @@ -616,19 +681,16 @@ pub async fn download_isin_lei_csv() -> anyhow::Result> { if let Err(e) = 
csv_file.read_to_end(&mut csv_bytes) { let msg = format!("Failed to extract CSV: {}", e); logger::log_error(&msg).await; - println!("{}", msg); return Ok(None); } if let Err(e) = tokio::fs::write(&csv_path, &csv_bytes).await { let msg = format!("Failed to save CSV file: {}", e); logger::log_error(&msg).await; - println!("{}", msg); return Ok(None); } let msg = format!("✓ ISIN/LEI CSV extracted: {:?}", csv_path); - println!("{}", msg); logger::log_info(&msg).await; Ok(Some(csv_path.to_string_lossy().to_string())) diff --git a/src/corporate/storage.rs b/src/corporate/storage.rs index 71fc198..40c5063 100644 --- a/src/corporate/storage.rs +++ b/src/corporate/storage.rs @@ -178,7 +178,7 @@ pub async fn save_prices_by_source( } /// Update available_exchanges.json with fetch results -pub async fn update_available_exchange( +/*pub async fn update_available_exchange( paths: &DataPaths, isin: &str, ticker: &str, @@ -205,7 +205,7 @@ pub async fn update_available_exchange( } save_available_exchanges(paths, isin, exchanges).await -} +}*/ /// Infer currency from ticker suffix fn infer_currency_from_ticker(ticker: &str) -> String { diff --git a/src/corporate/types.rs b/src/corporate/types.rs index d1fca4e..b08460c 100644 --- a/src/corporate/types.rs +++ b/src/corporate/types.rs @@ -53,12 +53,18 @@ pub struct FigiInfo { pub name: String, pub ticker: String, pub exch_code: String, - pub compositeFIGI: String, - pub securityType: String, - pub marketSector: String, - pub shareClassFIGI: String, - pub securityType2: String, - pub securityDescription: String, + #[serde(rename = "compositeFIGI")] + pub composite_figi: String, + #[serde(rename = "securityType")] + pub security_type: String, + #[serde(rename = "marketSector")] + pub market_sector: String, + #[serde(rename = "shareClassFIGI")] + pub share_class_figi: String, + #[serde(rename = "securityType2")] + pub security_type2: String, + #[serde(rename = "securityDescription")] + pub security_description: String, } /// Company Info @@ -77,11 +83,11 @@ pub struct CompanyInfo{ /// # Attributes /// * lei: Structuring the companies by legal dependencies [LEI -> Vec] /// * figi: metadata with ISIN as key -#[derive(Debug, Clone, Serialize, Deserialize)] +/*#[derive(Debug, Clone, Serialize, Deserialize)] pub struct CompanyMetadata { pub lei: String, pub figi: Option>, -} +}*/ /// Warrant Info /// @@ -112,13 +118,13 @@ pub struct OptionInfo { pub options: HashMap>, // ISIN -> Vec (grouped by ISIN) } -#[derive(Debug, Clone, Serialize, Deserialize)] +/*#[derive(Debug, Clone, Serialize, Deserialize)] pub struct PrimaryInfo { pub isin: String, pub name: String, pub exchange_mic: String, pub currency: String, -} +}*/ #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AvailableExchange { @@ -133,28 +139,4 @@ pub struct AvailableExchange { pub discovered_at: Option, // When this exchange was first discovered #[serde(default)] pub fetch_count: u32, // How many times successfully fetched -} - -impl AvailableExchange { - pub fn new(ticker: String, exchange_mic: String, currency: String) -> Self { - Self { - exchange_mic, - ticker, - has_daily: false, - has_5min: false, - last_successful_fetch: None, - currency, - discovered_at: Some(chrono::Local::now().format("%Y-%m-%d").to_string()), - fetch_count: 0, - } - } - - pub fn record_success(&mut self, has_daily: bool, has_5min: bool) { - let today = chrono::Local::now().format("%Y-%m-%d").to_string(); - - self.has_daily |= has_daily; - self.has_5min |= has_5min; - self.last_successful_fetch = Some(today); - 
self.fetch_count += 1; - } } \ No newline at end of file
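
Note on the FigiInfo change in src/corporate/types.rs: the fields move to snake_case while the #[serde(rename = "...")] attributes keep the serialized keys in OpenFIGI's camelCase, so JSON written before the rename still deserializes. A minimal standalone sketch of that pattern, assuming FigiInfo keeps its Serialize/Deserialize derives; the trimmed struct, payload, and values below are illustrative only:

use serde::{Deserialize, Serialize};

// Trimmed illustration of the rename pattern used for FigiInfo:
// Rust fields are snake_case, serialized keys stay camelCase.
#[derive(Debug, Serialize, Deserialize)]
struct FigiInfoSketch {
    figi: String,
    #[serde(rename = "compositeFIGI")]
    composite_figi: String,
    #[serde(rename = "securityType")]
    security_type: String,
}

fn main() -> Result<(), serde_json::Error> {
    // Hypothetical payload shaped like an OpenFIGI mapping entry.
    let json = r#"{"figi":"BBG000BLNNH6","compositeFIGI":"BBG000BLNNH6","securityType":"Common Stock"}"#;
    let parsed: FigiInfoSketch = serde_json::from_str(json)?;
    assert_eq!(parsed.security_type, "Common Stock");

    // Serializing restores the original camelCase keys.
    let out = serde_json::to_string(&parsed)?;
    assert!(out.contains("\"compositeFIGI\""));
    Ok(())
}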
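
find_most_recent_gleif_date now prefers cache/gleif subdirectories named DDMMYYYY and picks the newest by parsing the names with chrono, falling back to the old isin-lei-*.csv filename scan. A minimal sketch of just the selection step; the helper name most_recent_ddmmyyyy and the sample folder names are hypothetical:

use chrono::NaiveDate;

/// Pick the most recent DDMMYYYY-named entry from a list of folder names.
/// Non-numeric or invalid dates are skipped, mirroring the parse-and-filter idea.
fn most_recent_ddmmyyyy(names: &[&str]) -> Option<String> {
    let mut dates: Vec<NaiveDate> = names
        .iter()
        .copied()
        .filter(|n| n.len() == 8 && n.chars().all(|c| c.is_ascii_digit()))
        .filter_map(|n| NaiveDate::parse_from_str(n, "%d%m%Y").ok())
        .collect();
    dates.sort();
    dates.last().map(|d| d.format("%d%m%Y").to_string())
}

fn main() {
    // Hypothetical cache/gleif sub-folders; "31022025" fails date validation.
    let names = ["24112025", "01122025", "notadate", "31022025"];
    assert_eq!(most_recent_ddmmyyyy(&names), Some("01122025".to_string()));
}

Sorting NaiveDate values rather than the raw strings is what makes "01122025" (1 Dec) rank above "24112025" (24 Nov); a plain string sort would get that wrong.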
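
The batch removal itself reads the GLEIF CSV once with the csv crate, drops every record containing a flagged LEI, and writes the survivors in one pass, which is the point of flushing deletes every 100 no-hit LEIs instead of rewriting the file per LEI. A simplified in-memory sketch of that filter, using the same normalisation (trim, strip quotes, uppercase) as the patch; filter_leis and the sample rows are illustrative:

use std::collections::HashSet;
use csv::{ReaderBuilder, StringRecord, WriterBuilder};

/// Filter an ISIN,LEI CSV, dropping any row that contains an LEI from `remove`.
/// In-memory sketch only; file I/O and the "_clean" naming from the patch are left out.
fn filter_leis(input: &str, remove: &HashSet<String>) -> Result<String, Box<dyn std::error::Error>> {
    let mut reader = ReaderBuilder::new()
        .has_headers(true)
        .flexible(true)
        .from_reader(input.as_bytes());
    let headers = reader.headers()?.clone();

    let mut out = Vec::new();
    {
        let mut writer = WriterBuilder::new().from_writer(&mut out);
        writer.write_record(headers.iter())?;

        for row in reader.records() {
            let record: StringRecord = row?;
            // Normalise each field before comparing, as the patch does.
            let dropped = record
                .iter()
                .any(|field| remove.contains(&field.trim().trim_matches('"').to_uppercase()));
            if !dropped {
                writer.write_record(record.iter())?;
            }
        }
        writer.flush()?;
    }

    Ok(String::from_utf8(out)?)
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical GLEIF-style rows.
    let csv = "ISIN,LEI\nUS0000000001,LEI_KEEP\nUS0000000002,LEI_DROP\n";
    let remove: HashSet<String> = ["LEI_DROP".to_string()].into_iter().collect();
    let cleaned = filter_leis(csv, &remove)?;
    assert!(!cleaned.contains("LEI_DROP"));
    Ok(())
}

Because the batch is flushed every 100 no-hit LEIs, an interrupted run loses at most the deletions queued since the last flush.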
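
The filtered output goes to a sibling *_clean.csv rather than overwriting the original, and the code guards against producing a doubled *_clean_clean.csv suffix. The naming rule in isolation, with a hypothetical helper clean_path_for and example paths:

use std::path::{Path, PathBuf};

/// Derive the "<stem>_clean.csv" sibling for a GLEIF CSV, without doubling the
/// suffix when the input is already a clean file. Sketch of the naming rule only.
fn clean_path_for(original: &Path) -> PathBuf {
    let stem = original
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("isin-lei");
    let parent = original.parent().unwrap_or_else(|| Path::new("."));
    if stem.to_lowercase().ends_with("_clean") {
        parent.join(format!("{stem}.csv"))
    } else {
        parent.join(format!("{stem}_clean.csv"))
    }
}

fn main() {
    assert_eq!(
        clean_path_for(Path::new("cache/gleif/24112025/isin-lei-24112025.csv")),
        PathBuf::from("cache/gleif/24112025/isin-lei-24112025_clean.csv")
    );
    // Already-clean inputs keep their name instead of gaining a second suffix.
    assert_eq!(
        clean_path_for(Path::new("cache/gleif/24112025/isin-lei-24112025_clean.csv")),
        PathBuf::from("cache/gleif/24112025/isin-lei-24112025_clean.csv")
    );
}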