From a6823dc938810087e9215c0f6c50b223a24f1a96 Mon Sep 17 00:00:00 2001 From: donpat1to Date: Fri, 5 Dec 2025 22:32:42 +0100 Subject: [PATCH] moved data capturing into cache folder --- .gitignore | 1 + src/corporate/openfigi.rs | 153 ++++++++++++++++++++++++++++++-------- src/corporate/scraper.rs | 98 +++++++++++++++++++----- src/corporate/update.rs | 2 +- src/economic/scraper.rs | 29 -------- src/util/directories.rs | 27 +++++++ 6 files changed, 230 insertions(+), 80 deletions(-) diff --git a/.gitignore b/.gitignore index 87a9f5f..b209807 100644 --- a/.gitignore +++ b/.gitignore @@ -32,6 +32,7 @@ target/ **/*.jsonl **/*.csv **/*.zip +**/*.log #/economic_events* #/economic_event_changes* diff --git a/src/corporate/openfigi.rs b/src/corporate/openfigi.rs index 5068954..69bd810 100644 --- a/src/corporate/openfigi.rs +++ b/src/corporate/openfigi.rs @@ -1,15 +1,18 @@ +use crate::util::directories::DataPaths; + // src/corporate/openfigi.rs use super::{types::*}; use reqwest::Client as HttpClient; use reqwest::header::{HeaderMap, HeaderValue}; use serde_json::{json, Value}; use std::collections::{HashMap, HashSet}; -use std::fs::{File, OpenOptions}; -use std::io::{BufRead, BufReader, Write}; -use std::path::Path; +use std::fs::File; +use std::io::Write; +use std::path::{Path, PathBuf}; use std::time::Instant; use tokio::time::{sleep, Duration}; use tokio::fs as tokio_fs; +use tokio::io::AsyncWriteExt; use anyhow::{Context, anyhow}; #[derive(Clone)] @@ -103,7 +106,7 @@ impl OpenFigiClient { .map(|isin| json!({ "idType": "ID_ISIN", "idValue": isin, - "marketSecDes": "Equity", + //"marketSecDes": "Equity", })) .collect(); @@ -195,9 +198,69 @@ impl OpenFigiClient { } } +/// Extracts the date from a GLEIF CSV filename in the format "isin-lei-DDMMYYYY.csv". +/// +/// # Arguments +/// +/// * `filename` - The GLEIF CSV filename (e.g., "isin-lei-24112025.csv") +/// +/// # Returns +/// +/// A string in the format "DDMMYYYY" (e.g., "24112025") if successfully parsed, otherwise the original filename. +fn extract_gleif_date_from_filename(filename: &str) -> String { + // Pattern: isin-lei-DDMMYYYY.csv + if let Some(start_idx) = filename.find("isin-lei-") { + let rest = &filename[start_idx + 9..]; // Skip "isin-lei-" + if rest.len() >= 8 && rest[0..8].chars().all(|c| c.is_numeric()) { + return rest[0..8].to_string(); + } + } + filename.to_string() +} + +/// Finds the most recent GLEIF CSV file in the cache/gleif directory. +/// +/// Returns the extracted date in format "DDMMYYYY" from the filename. +/// If no GLEIF file is found, returns None. +async fn find_most_recent_gleif_date(gleif_cache_dir: &Path) -> anyhow::Result> { + let mut entries = tokio_fs::read_dir(gleif_cache_dir) + .await + .context("Failed to read gleif cache directory")?; + + let mut csv_files = Vec::new(); + + while let Some(entry) = entries.next_entry().await? { + let path = entry.path(); + if let Some(filename) = path.file_name() { + let filename_str = filename.to_string_lossy(); + if filename_str.ends_with(".csv") && filename_str.contains("isin-lei-") { + csv_files.push(filename_str.to_string()); + } + } + } + + if csv_files.is_empty() { + return Ok(None); + } + + // Sort files in reverse order (most recent first) based on date in filename + csv_files.sort(); + csv_files.reverse(); + + let most_recent = &csv_files[0]; + let date = extract_gleif_date_from_filename(most_recent); + + println!(" Found GLEIF data dated: {}", date); + Ok(Some(date)) +} + /// Builds a LEI-to-FigiInfo map from the LEI-ISIN mapping, filtering for equities via OpenFIGI. /// -/// Attempts to load existing entries from "data/corporate/by_lei/lei_to_figi.jsonl" (JSON Lines format, +/// The mapping is stored in cache/glei_openfigi/{GLEIF_DATE}/lei_to_figi.jsonl, where GLEIF_DATE +/// is extracted from the GLEIF CSV filename (format: DDMMYYYY). If no specific date is provided, +/// the most recent GLEIF file in cache/gleif is used. +/// +/// Attempts to load existing entries from the date-based jsonl file (JSON Lines format, /// one LEI entry per line: {"lei": "ABC", "figis": [FigiInfo...]}). For any missing LEIs (compared to /// `lei_to_isins`), fetches their FigiInfos and appends to the .jsonl file incrementally. /// @@ -209,6 +272,7 @@ impl OpenFigiClient { /// # Arguments /// /// * `lei_to_isins` - HashMap of LEI to Vec (used for fetching missing entries). +/// * `gleif_date` - Optional date in format "DDMMYYYY". If None, uses the most recent GLEIF file. /// /// # Returns /// @@ -218,12 +282,29 @@ impl OpenFigiClient { /// /// Returns an error if file I/O fails, JSON serialization/deserialization fails, /// or if OpenFIGI queries fail during fetching. -pub async fn build_lei_to_figi_infos(lei_to_isins: &HashMap>) -> anyhow::Result>> { - let data_dir = Path::new("data/corporate/by_lei"); - tokio_fs::create_dir_all(data_dir).await.context("Failed to create data directory")?; - - let path = data_dir.join("lei_to_figi.jsonl"); - let mut lei_to_figis: HashMap> = load_lei_to_figi_jsonl(&path)?; +pub async fn build_lei_to_figi_infos(lei_to_isins: &HashMap>, gleif_date: Option<&str>) -> anyhow::Result>> { + let dir = DataPaths::new(".")?; + let gleif_cache_dir = dir.cache_gleif_dir(); + let map_cache_dir = dir.cache_gleif_openfigi_map_dir(); + + // Determine the GLEIF date to use + let date = if let Some(d) = gleif_date { + d.to_string() + } else { + // Find the most recent GLEIF file + match find_most_recent_gleif_date(gleif_cache_dir).await? { + Some(d) => d, + None => return Err(anyhow!("No GLEIF CSV file found in cache/gleif directory")), + } + }; + + // Create date-based subdirectory in the mapping cache + let date_dir = map_cache_dir.join(&date); + tokio_fs::create_dir_all(&date_dir).await.context("Failed to create date directory")?; + + let path = date_dir.join("lei_to_figi.jsonl"); + println!("Using LEI→FIGI mapping at: {}", path.display()); + let mut lei_to_figis: HashMap> = load_lei_to_figi_jsonl(&path).await?; let client = OpenFigiClient::new()?; if !client.has_key { @@ -258,7 +339,7 @@ pub async fn build_lei_to_figi_infos(lei_to_isins: &HashMap> } // Append to .jsonl incrementally - append_lei_to_figi_jsonl(&path, &lei, &figis).context("Failed to append to JSONL")?; + append_lei_to_figi_jsonl(&path, &lei, &figis).await.context("Failed to append to JSONL")?; // Insert into in-memory map lei_to_figis.insert(lei.clone(), figis); @@ -290,18 +371,16 @@ pub async fn build_lei_to_figi_infos(lei_to_isins: &HashMap> /// # Errors /// /// Returns an error if the file cannot be opened or if any line fails to parse as JSON. -fn load_lei_to_figi_jsonl(path: &Path) -> anyhow::Result>> { +async fn load_lei_to_figi_jsonl(path: &Path) -> anyhow::Result>> { let mut map = HashMap::new(); if !path.exists() { return Ok(map); } - let file = File::open(path).context("Failed to open JSONL file for reading")?; - let reader = BufReader::new(file); + let content = tokio_fs::read_to_string(path).await.context("Failed to read JSONL file")?; - for (line_num, line) in reader.lines().enumerate() { - let line = line.context(format!("Failed to read line {}", line_num + 1))?; + for (line_num, line) in content.lines().enumerate() { if line.trim().is_empty() { continue; } @@ -328,20 +407,24 @@ fn load_lei_to_figi_jsonl(path: &Path) -> anyhow::Result anyhow::Result<()> { - let mut file = OpenOptions::new() - .create(true) - .append(true) - .open(path) - .context("Failed to open JSONL file for append")?; - +async fn append_lei_to_figi_jsonl(path: &Path, lei: &str, figis: &[FigiInfo]) -> anyhow::Result<()> { let entry = json!({ "lei": lei, "figis": figis, }); let line = serde_json::to_string(&entry).context("Failed to serialize entry")? + "\n"; - file.write_all(line.as_bytes()).context("Failed to write to JSONL file")?; + + let mut file = tokio_fs::OpenOptions::new() + .create(true) + .append(true) + .open(path) + .await + .context("Failed to open JSONL file for append")?; + + file.write_all(line.as_bytes()) + .await + .context("Failed to write to JSONL file")?; Ok(()) } @@ -373,19 +456,22 @@ pub async fn load_or_build_all_securities( HashMap> )> { // Load existing data - let mut companies = load_from_cache("data/corporate/by_name/common_stocks.json").await? + let mut commons = load_from_cache("data/corporate/by_name/common_stocks.json").await? .unwrap_or_else(HashMap::new); let mut warrants = load_from_cache("data/corporate/by_name/warrants.json").await? .unwrap_or_else(HashMap::new); let mut options = load_from_cache("data/corporate/by_name/options.json").await? .unwrap_or_else(HashMap::new); + /*let mut preferred = load_from_cache("data/corporate/by_name/preferred.json").await? + .unwrap_or_else(HashMap::new);*/ + println!("Loaded existing data:"); - println!(" - Companies: {}", companies.len()); + println!(" - Companies: {}", commons.len()); println!(" - Warrants: {}", warrants.len()); println!(" - Options: {}", options.len()); - let mut stats = ProcessingStats::new(companies.len(), warrants.len(), options.len()); + let mut stats = ProcessingStats::new(commons.len(), warrants.len(), options.len()); println!("Processing {} LEI entries from FIGI data...", figi_to_lei.len()); @@ -410,7 +496,7 @@ pub async fn load_or_build_all_securities( // Process common stocks -> companies if !common_stocks.is_empty() { - process_common_stocks(&mut companies, &common_stocks, &mut stats); + process_common_stocks(&mut commons, &common_stocks, &mut stats); } // Process warrants @@ -424,14 +510,14 @@ pub async fn load_or_build_all_securities( } } - stats.print_summary(companies.len(), warrants.len(), options.len()); + stats.print_summary(commons.len(), warrants.len(), options.len()); // Save all three HashMaps - save_to_cache("data/corporate/by_name/common_stocks.json", &companies).await?; + save_to_cache("data/corporate/by_name/common_stocks.json", &commons).await?; save_to_cache("data/corporate/by_name/warrants.json", &warrants).await?; save_to_cache("data/corporate/by_name/options.json", &options).await?; - Ok((companies, warrants, options)) + Ok((commons, warrants, options)) } /// Statistics tracker for processing @@ -795,7 +881,8 @@ pub async fn load_figi_type_lists() -> anyhow::Result<()> { let client = OpenFigiClient::new()?; // Create cache directory - let cache_dir = Path::new("data/openfigi"); + let dir = DataPaths::new(".")?; + let cache_dir = dir.cache_openfigi_dir(); tokio_fs::create_dir_all(cache_dir).await .context("Failed to create data/openfigi directory")?; diff --git a/src/corporate/scraper.rs b/src/corporate/scraper.rs index 43af244..d04d2f8 100644 --- a/src/corporate/scraper.rs +++ b/src/corporate/scraper.rs @@ -1,7 +1,7 @@ // src/corporate/scraper.rs use super::{types::*, helpers::*, openfigi::*}; //use crate::corporate::openfigi::OpenFigiClient; -use crate::{webdriver::webdriver::*}; +use crate::{webdriver::webdriver::*, util::directories::DataPaths, util::logger}; use fantoccini::{Client, Locator}; use scraper::{Html, Selector}; use chrono::{DateTime, Duration, NaiveDate, Utc}; @@ -490,11 +490,19 @@ pub async fn _fetch_latest_gleif_isin_lei_mapping_url(client: &Client) -> anyhow pub async fn download_isin_lei_csv() -> anyhow::Result> { let url = "https://mapping.gleif.org/api/v2/isin-lei/9315e3e3-305a-4e71-b062-46714740fa8d/download"; - if let Err(e) = std::fs::create_dir_all("data/gleif") { - println!("Failed to create data directory: {e}"); + // Initialize DataPaths and create cache/gleif directory + let paths = DataPaths::new(".")?; + let gleif_cache_dir = paths.cache_gleif_dir(); + + if let Err(e) = std::fs::create_dir_all(&gleif_cache_dir) { + let msg = format!("Failed to create cache/gleif directory: {}", e); + logger::log_error(&msg).await; + println!("{}", msg); return Ok(None); } + logger::log_info("Corporate Scraper: Downloading ISIN/LEI mapping from GLEIF...").await; + // Download ZIP and get the filename from Content-Disposition header let client = match reqwest::Client::builder() .user_agent(USER_AGENT) @@ -503,7 +511,9 @@ pub async fn download_isin_lei_csv() -> anyhow::Result> { { Ok(c) => c, Err(e) => { - println!("Failed to create HTTP client: {e}"); + let msg = format!("Failed to create HTTP client: {}", e); + logger::log_error(&msg).await; + println!("{}", msg); return Ok(None); } }; @@ -511,11 +521,15 @@ pub async fn download_isin_lei_csv() -> anyhow::Result> { let resp = match client.get(url).send().await { Ok(r) if r.status().is_success() => r, Ok(resp) => { - println!("Server returned HTTP {}", resp.status()); + let msg = format!("Server returned HTTP {}", resp.status()); + logger::log_error(&msg).await; + println!("{}", msg); return Ok(None); } Err(e) => { - println!("Failed to download ISIN/LEI ZIP: {e}"); + let msg = format!("Failed to download ISIN/LEI ZIP: {}", e); + logger::log_error(&msg).await; + println!("{}", msg); return Ok(None); } }; @@ -528,21 +542,30 @@ pub async fn download_isin_lei_csv() -> anyhow::Result> { .and_then(|s| s.split("filename=").nth(1).map(|f| f.trim_matches('"').to_string())) .unwrap_or_else(|| "isin_lei.zip".to_string()); + // Parse timestamp from filename and convert to DDMMYYYY format + let parsed_filename = parse_gleif_filename(&filename); + logger::log_info(&format!("Corporate Scraper: Downloaded file: {} -> {}", filename, parsed_filename)).await; + let bytes = match resp.bytes().await { Ok(b) => b, Err(e) => { - println!("Failed to read ZIP bytes: {e}"); + let msg = format!("Failed to read ZIP bytes: {}", e); + logger::log_error(&msg).await; + println!("{}", msg); return Ok(None); } }; - let zip_path = format!("data/gleif/{}", filename); - let csv_path = format!("data/gleif/{}", filename.replace(".zip", ".csv")); + let zip_path = gleif_cache_dir.join(&parsed_filename); + let csv_path = gleif_cache_dir.join(parsed_filename.replace(".zip", ".csv")); if let Err(e) = tokio::fs::write(&zip_path, &bytes).await { - println!("Failed to write ZIP file: {e}"); + let msg = format!("Failed to write ZIP file: {}", e); + logger::log_error(&msg).await; + println!("{}", msg); return Ok(None); } + logger::log_info(&format!("Corporate Scraper: Saved ZIP to {:?}", zip_path)).await; // Extract CSV let archive = match std::fs::File::open(&zip_path) @@ -550,11 +573,15 @@ pub async fn download_isin_lei_csv() -> anyhow::Result> { { Ok(Ok(a)) => a, Ok(Err(e)) => { - println!("Invalid ZIP: {e}"); + let msg = format!("Invalid ZIP: {}", e); + logger::log_error(&msg).await; + println!("{}", msg); return Ok(None); } Err(e) => { - println!("Cannot open ZIP file: {e}"); + let msg = format!("Cannot open ZIP file: {}", e); + logger::log_error(&msg).await; + println!("{}", msg); return Ok(None); } }; @@ -568,7 +595,9 @@ pub async fn download_isin_lei_csv() -> anyhow::Result> { }) { Some(i) => i, None => { - println!("ZIP did not contain a CSV file"); + let msg = "ZIP did not contain a CSV file"; + logger::log_error(msg).await; + println!("{}", msg); return Ok(None); } }; @@ -576,23 +605,58 @@ pub async fn download_isin_lei_csv() -> anyhow::Result> { let mut csv_file = match archive.by_index(idx) { Ok(f) => f, Err(e) => { - println!("Failed to read CSV entry: {e}"); + let msg = format!("Failed to read CSV entry: {}", e); + logger::log_error(&msg).await; + println!("{}", msg); return Ok(None); } }; let mut csv_bytes = Vec::new(); if let Err(e) = csv_file.read_to_end(&mut csv_bytes) { - println!("Failed to extract CSV: {e}"); + let msg = format!("Failed to extract CSV: {}", e); + logger::log_error(&msg).await; + println!("{}", msg); return Ok(None); } if let Err(e) = tokio::fs::write(&csv_path, &csv_bytes).await { - println!("Failed to save CSV file: {e}"); + let msg = format!("Failed to save CSV file: {}", e); + logger::log_error(&msg).await; + println!("{}", msg); return Ok(None); } - Ok(Some(csv_path)) + let msg = format!("✓ ISIN/LEI CSV extracted: {:?}", csv_path); + println!("{}", msg); + logger::log_info(&msg).await; + + Ok(Some(csv_path.to_string_lossy().to_string())) +} + +/// Parse GLEIF filename and convert timestamp to DDMMYYYY format +/// Example: "isin-lei-20251124T080254.csv" -> "isin-lei-24112025.csv" +fn parse_gleif_filename(filename: &str) -> String { + // Try to find pattern: isin-lei-YYYYMMDDTHHMMSS.zip/csv + if let Some(start_idx) = filename.find("isin-lei-") { + let rest = &filename[start_idx + 9..]; // After "isin-lei-" + + // Extract the 8 digits (YYYYMMDD) + if rest.len() >= 8 && rest[0..8].chars().all(|c| c.is_numeric()) { + let date_part = &rest[0..8]; + // date_part is YYYYMMDD, convert to DDMMYYYY + if date_part.len() == 8 { + let year = &date_part[0..4]; + let month = &date_part[4..6]; + let day = &date_part[6..8]; + let extension = if filename.ends_with(".zip") { ".zip" } else { ".csv" }; + return format!("isin-lei-{}{}{}{}", day, month, year, extension); + } + } + } + + // Fallback: return original filename if parsing fails + filename.to_string() } diff --git a/src/corporate/update.rs b/src/corporate/update.rs index 035652f..9f13fc0 100644 --- a/src/corporate/update.rs +++ b/src/corporate/update.rs @@ -61,7 +61,7 @@ pub async fn run_full_update(config: &Config, pool: &Arc) -> a // 3. Build FIGI → LEI map logger::log_info("Corporate Update: Building FIGI → LEI map...").await; - let figi_to_lei:HashMap> = match build_lei_to_figi_infos(&lei_to_isins).await { + let figi_to_lei:HashMap> = match build_lei_to_figi_infos(&lei_to_isins, None).await { Ok(map) => { let msg = format!("Corporate Update: Built FIGI map with {} entries", map.len()); println!("{}", msg); diff --git a/src/economic/scraper.rs b/src/economic/scraper.rs index c442fb6..c7c6d0c 100644 --- a/src/economic/scraper.rs +++ b/src/economic/scraper.rs @@ -7,39 +7,10 @@ const EXTRACTION_JS: &str = include_str!("extraction_script.js"); pub async fn goto_and_prepare(client: &Client) -> anyhow::Result<()> { client.goto("https://www.finanzen.net/termine/wirtschaftsdaten/").await?; - //dismiss_overlays(client).await?; - /*if let Ok(tab) = client.find(fantoccini::Locator::Css(r#"div[data-sg-tab-item="teletrader-dates-three-stars"]"#)).await { - tab.click().await?; - println!("High importance tab selected"); - sleep(Duration::from_secs(2)).await; - }*/ Ok(()) } -/*pub async fn dismiss_overlays(client: &Client) -> anyhow::Result<()> { - for _ in 0..10 { - let removed: bool = client - .execute( - r#"(() => { - const iframe = document.querySelector('iframe[title="Contentpass First Layer"]'); - if (iframe && iframe.parentNode) { - iframe.parentNode.removeChild(iframe); - return true; - } - return false; - })()"#, - vec![], - ) - .await? - .as_bool() - .unwrap_or(false); - if removed { break; } - sleep(Duration::from_millis(500)).await; - } - Ok(()) -}*/ - pub async fn set_date_range(client: &Client, start: &str, end: &str) -> anyhow::Result<()> { let script = format!( r#" diff --git a/src/util/directories.rs b/src/util/directories.rs index 4928bf9..4296bb6 100644 --- a/src/util/directories.rs +++ b/src/util/directories.rs @@ -7,6 +7,10 @@ pub struct DataPaths { data_dir: PathBuf, cache_dir: PathBuf, logs_dir: PathBuf, + // Cache data subdirectories + cache_gleif_dir: PathBuf, + cache_openfigi_dir: PathBuf, + cache_gleif_openfigi_map_dir: PathBuf, // Economic data subdirectories economic_events_dir: PathBuf, economic_changes_dir: PathBuf, @@ -25,6 +29,11 @@ impl DataPaths { let cache_dir = base_dir.join("cache"); let logs_dir = base_dir.join("logs"); + // Cache subdirectories + let cache_gleif_dir = cache_dir.join("gleif"); + let cache_openfigi_dir = cache_dir.join("openfigi"); + let cache_gleif_openfigi_map_dir = cache_dir.join("glei_openfigi"); + // Economic subdirectories let economic_events_dir = data_dir.join("economic").join("events"); let economic_changes_dir = economic_events_dir.join("changes"); @@ -39,6 +48,9 @@ impl DataPaths { fs::create_dir_all(&data_dir)?; fs::create_dir_all(&cache_dir)?; fs::create_dir_all(&logs_dir)?; + fs::create_dir_all(&cache_gleif_dir)?; + fs::create_dir_all(&cache_openfigi_dir)?; + fs::create_dir_all(&cache_gleif_openfigi_map_dir)?; fs::create_dir_all(&economic_events_dir)?; fs::create_dir_all(&economic_changes_dir)?; fs::create_dir_all(&corporate_events_dir)?; @@ -50,6 +62,9 @@ impl DataPaths { data_dir, cache_dir, logs_dir, + cache_gleif_dir, + cache_openfigi_dir, + cache_gleif_openfigi_map_dir, economic_events_dir, economic_changes_dir, corporate_events_dir, @@ -74,6 +89,18 @@ impl DataPaths { &self.logs_dir } + pub fn cache_gleif_dir(&self) -> &Path { + &self.cache_gleif_dir + } + + pub fn cache_openfigi_dir(&self) -> &Path { + &self.cache_openfigi_dir + } + + pub fn cache_gleif_openfigi_map_dir(&self) -> &Path { + &self.cache_gleif_openfigi_map_dir + } + /// Get the economic events directory pub fn economic_events_dir(&self) -> &Path { &self.economic_events_dir