From 00c9d4564269bc4c59ac7a7f7875024409b3dcba Mon Sep 17 00:00:00 2001 From: donpat1to Date: Fri, 12 Dec 2025 10:54:01 +0100 Subject: [PATCH] added data streaming instead of laoding --- src/corporate/openfigi.rs | 390 ++++++++++++++++++++++++-------- src/corporate/storage.rs | 230 ++++++++++++------- src/corporate/update.rs | 315 ++++++++++++++++---------- src/economic/storage.rs | 172 ++++++++++---- src/economic/update.rs | 175 ++++++++------ src/scraper/docker_vpn_proxy.rs | 2 +- src/scraper/webdriver.rs | 2 +- 7 files changed, 888 insertions(+), 398 deletions(-) diff --git a/src/corporate/openfigi.rs b/src/corporate/openfigi.rs index d63353a..a2c5722 100644 --- a/src/corporate/openfigi.rs +++ b/src/corporate/openfigi.rs @@ -1,8 +1,8 @@ +// src/corporate/openfigi.rs +use super::{types::*}; use crate::util::directories::DataPaths; use crate::util::logger; -// src/corporate/openfigi.rs -use super::{types::*}; use reqwest::Client as HttpClient; use reqwest::header::{HeaderMap, HeaderValue}; use serde_json::{json, Value}; @@ -15,6 +15,7 @@ use tokio::time::{sleep, Duration}; use tokio::fs as tokio_fs; use tokio::io::AsyncWriteExt; use anyhow::{Context, anyhow}; +use std::io::BufRead; #[derive(Clone)] pub struct OpenFigiClient { @@ -933,97 +934,6 @@ async fn remove_leis_batch_from_gleif_csv(gleif_cache_dir: &Path, leis_to_remove Ok(()) } -/// Loads or builds HashMaps for companies, warrants, and options. -/// -/// This function: -/// 1. Attempts to load existing data from cache -/// 2. Processes new FIGI data and classifies by securityType: -/// - "Common Stock" → companies HashMap (grouped by ISIN) -/// - "Equity WRT" → warrants HashMap (parsed from name) -/// - "Equity Option" → options HashMap (parsed from name) -/// 3. Updates/extends existing entries -/// 4. Saves results to separate JSON files -/// -/// # Arguments -/// * `figi_to_lei` - HashMap mapping LEI to Vec. -/// -/// # Returns -/// A tuple of (companies, warrants, options) HashMaps. -/// -/// # Errors -/// Returns an error if file I/O fails or JSON serialization fails. -pub async fn load_or_build_all_securities( - figi_to_lei: &HashMap> -) -> anyhow::Result<( - HashMap, - HashMap>, - HashMap> -)> { - // Load existing data - let mut commons = load_from_cache("data/corporate/by_name/common_stocks.json").await? - .unwrap_or_else(HashMap::new); - let mut warrants = load_from_cache("data/corporate/by_name/warrants.json").await? - .unwrap_or_else(HashMap::new); - let mut options = load_from_cache("data/corporate/by_name/options.json").await? - .unwrap_or_else(HashMap::new); - /*let mut preferred = load_from_cache("data/corporate/by_name/preferred.json").await? 
- .unwrap_or_else(HashMap::new);*/ - - - println!("Loaded existing data:"); - println!(" - Companies: {}", commons.len()); - println!(" - Warrants: {}", warrants.len()); - println!(" - Options: {}", options.len()); - - let mut stats = ProcessingStats::new(commons.len(), warrants.len(), options.len()); - - println!("Processing {} LEI entries from FIGI data...", figi_to_lei.len()); - - for (_lei, figi_infos) in figi_to_lei.iter() { - if figi_infos.is_empty() { - continue; - } - - // Group FigiInfos by security type - let mut common_stocks = Vec::new(); - let mut warrant_securities = Vec::new(); - let mut option_securities = Vec::new(); - - for figi_info in figi_infos { - match figi_info.security_type.as_str() { - "Common Stock" => common_stocks.push(figi_info.clone()), - "Equity WRT" => warrant_securities.push(figi_info.clone()), - "Equity Option" => option_securities.push(figi_info.clone()), - _ => {} // Ignore other types - } - } - - // Process common stocks -> companies - if !common_stocks.is_empty() { - process_common_stocks(&mut commons, &common_stocks, &mut stats); - } - - // Process warrants - if !warrant_securities.is_empty() { - process_warrants(&mut warrants, &warrant_securities, &mut stats); - } - - // Process options - if !option_securities.is_empty() { - process_options(&mut options, &option_securities, &mut stats); - } - } - - stats.print_summary(commons.len(), warrants.len(), options.len()); - - // Save all three HashMaps - save_to_cache("data/corporate/by_name/common_stocks.json", &commons).await?; - save_to_cache("data/corporate/by_name/warrants.json", &warrants).await?; - save_to_cache("data/corporate/by_name/options.json", &options).await?; - - Ok((commons, warrants, options)) -} - /// Statistics tracker for processing #[derive(Debug)] struct ProcessingStats { @@ -1583,5 +1493,299 @@ async fn handle_rate_limit(resp: &reqwest::Response) -> anyhow::Result<()> { return Err(anyhow!("OpenFIGI API error: {}", status)); } + Ok(()) +} + +pub async fn stream_gleif_csv( + csv_path: &str, + mut callback: F +) -> anyhow::Result +where + F: FnMut(String, String) -> anyhow::Result<()>, +{ + logger::log_info(&format!("Streaming GLEIF CSV: {}", csv_path)).await; + + let file = std::fs::File::open(csv_path) + .context("Failed to open GLEIF CSV")?; + + let reader = std::io::BufReader::new(file); + let mut count = 0; + + for (idx, line) in reader.lines().enumerate() { + let line = line.context("Failed to read line")?; + + // Skip header + if idx == 0 { + continue; + } + + // Parse CSV line + let parts: Vec<&str> = line.split(',').collect(); + if parts.len() < 2 { + continue; + } + + let lei = parts[0].trim().trim_matches('"').to_string(); + let isin = parts[1].trim().trim_matches('"').to_string(); + + if !lei.is_empty() && !isin.is_empty() { + callback(lei, isin)?; + count += 1; + } + + // Yield periodically + if count % 10000 == 0 { + tokio::task::yield_now().await; + } + } + + logger::log_info(&format!("Streamed {} LEI-ISIN pairs", count)).await; + Ok(count) +} + +/// Process FIGI mappings in batches instead of all at once +pub async fn process_figi_mappings_streaming( + lei_to_isins_stream: impl Iterator)>, + gleif_date: Option<&str>, + batch_size: usize, +) -> anyhow::Result<()> { + let dir = DataPaths::new(".")?; + let map_cache_dir = dir.cache_gleif_openfigi_map_dir(); + + let date = determine_gleif_date(gleif_date, &dir).await?; + let date_dir = map_cache_dir.join(&date); + tokio_fs::create_dir_all(&date_dir).await?; + + // Setup sector directories + let sector_dirs = 
load_market_sectors().await?; + setup_sector_directories(&date_dir, §or_dirs).await?; + + let client = OpenFigiClient::new().await?; + if !client.has_key { + logger::log_warn("No API key - limited FIGI mapping").await; + return Ok(()); + } + + // Process in batches + let mut batch = Vec::new(); + let mut processed = 0; + + for (lei, isins) in lei_to_isins_stream { + batch.push((lei, isins)); + + if batch.len() >= batch_size { + process_figi_batch(&client, &batch, &date_dir, §or_dirs).await?; + processed += batch.len(); + + logger::log_info(&format!("Processed {} LEIs so far...", processed)).await; + batch.clear(); + + // Yield to prevent blocking + tokio::task::yield_now().await; + } + } + + // Process remaining + if !batch.is_empty() { + process_figi_batch(&client, &batch, &date_dir, §or_dirs).await?; + processed += batch.len(); + } + + logger::log_info(&format!("Total processed: {} LEIs", processed)).await; + Ok(()) +} + +async fn process_figi_batch( + client: &OpenFigiClient, + batch: &[(String, Vec)], + date_dir: &Path, + sector_dirs: &[String], +) -> anyhow::Result<()> { + for (lei, isins) in batch { + let unique_isins: Vec<_> = isins.iter() + .cloned() + .collect::>() + .into_iter() + .collect(); + + let figi_infos = client.map_isins_to_figi_infos(&unique_isins).await?; + + if figi_infos.is_empty() { + continue; + } + + // Save to appropriate sector files + save_figi_infos_by_sector(lei, &figi_infos, date_dir, sector_dirs).await?; + } + + Ok(()) +} + +async fn save_figi_infos_by_sector( + lei: &str, + figi_infos: &[FigiInfo], + date_dir: &Path, + _sector_dirs: &[String], +) -> anyhow::Result<()> { + let mut by_sector: HashMap> = HashMap::new(); + + for figi_info in figi_infos { + let sector = if figi_info.market_sector.is_empty() { + "uncategorized".to_string() + } else { + figi_info.market_sector.clone() + }; + + by_sector.entry(sector).or_default().push(figi_info.clone()); + } + + // Save to sector files + for (sector, figis) in by_sector { + let sector_dir = date_dir.join(§or); + let path = sector_dir.join("lei_to_figi.jsonl"); + append_lei_to_figi_jsonl(&path, lei, &figis).await?; + } + + Ok(()) +} + +/// Modified load_or_build_all_securities to process in streaming fashion +pub async fn load_or_build_all_securities_streaming( + date_dir: &Path, +) -> anyhow::Result<( + HashMap, + HashMap>, + HashMap> +)> { + let mut commons = HashMap::new(); + let mut warrants = HashMap::new(); + let mut options = HashMap::new(); + + // Load existing data + commons = load_from_cache("data/corporate/by_name/common_stocks.json") + .await? + .unwrap_or_default(); + warrants = load_from_cache("data/corporate/by_name/warrants.json") + .await? + .unwrap_or_default(); + options = load_from_cache("data/corporate/by_name/options.json") + .await? 
+ .unwrap_or_default(); + + println!("Loaded existing data:"); + println!(" - Companies: {}", commons.len()); + println!(" - Warrants: {}", warrants.len()); + println!(" - Options: {}", options.len()); + + let mut stats = ProcessingStats::new(commons.len(), warrants.len(), options.len()); + + // Stream through JSONL files in date_dir + let equity_file = date_dir.join("Equity").join("lei_to_figi.jsonl"); + + if equity_file.exists() { + logger::log_info(&format!("Streaming FIGIs from {:?}", equity_file)).await; + + let content = tokio_fs::read_to_string(&equity_file).await?; + let mut processed = 0; + + for line in content.lines() { + if line.trim().is_empty() { + continue; + } + + let entry: serde_json::Value = serde_json::from_str(line)?; + let _lei = entry["lei"].as_str().unwrap_or(""); + let figi_infos: Vec = serde_json::from_value( + entry["figis"].clone() + )?; + + // Process this batch + process_figi_infos_batch( + &figi_infos, + &mut commons, + &mut warrants, + &mut options, + &mut stats + ); + + processed += 1; + if processed % 100 == 0 { + tokio::task::yield_now().await; + } + } + } + + stats.print_summary(commons.len(), warrants.len(), options.len()); + + // Save incrementally + save_to_cache("data/corporate/by_name/common_stocks.json", &commons).await?; + save_to_cache("data/corporate/by_name/warrants.json", &warrants).await?; + save_to_cache("data/corporate/by_name/options.json", &options).await?; + + Ok((commons, warrants, options)) +} + +fn process_figi_infos_batch( + figi_infos: &[FigiInfo], + commons: &mut HashMap, + warrants: &mut HashMap>, + options: &mut HashMap>, + stats: &mut ProcessingStats, +) { + let mut common_stocks = Vec::new(); + let mut warrant_securities = Vec::new(); + let mut option_securities = Vec::new(); + + for figi_info in figi_infos { + match figi_info.security_type.as_str() { + "Common Stock" => common_stocks.push(figi_info.clone()), + "Equity WRT" => warrant_securities.push(figi_info.clone()), + "Equity Option" => option_securities.push(figi_info.clone()), + _ => {} + } + } + + if !common_stocks.is_empty() { + process_common_stocks(commons, &common_stocks, stats); + } + + if !warrant_securities.is_empty() { + process_warrants(warrants, &warrant_securities, stats); + } + + if !option_securities.is_empty() { + process_options(options, &option_securities, stats); + } +} + +// Helper functions +async fn determine_gleif_date( + gleif_date: Option<&str>, + paths: &DataPaths, +) -> anyhow::Result { + if let Some(d) = gleif_date { + Ok(d.to_string()) + } else { + match find_most_recent_gleif_date(paths.cache_gleif_dir()).await? 
{ + Some(d) => Ok(d), + None => Err(anyhow!("No GLEIF CSV file found")), + } + } +} + +async fn setup_sector_directories( + date_dir: &Path, + sector_dirs: &[String], +) -> anyhow::Result<()> { + // Create uncategorized folder + let uncategorized_dir = date_dir.join("uncategorized"); + tokio_fs::create_dir_all(&uncategorized_dir).await?; + + // Create sector folders + for sector in sector_dirs { + let sector_dir = date_dir.join(sector); + tokio_fs::create_dir_all(§or_dir).await?; + } + Ok(()) } \ No newline at end of file diff --git a/src/corporate/storage.rs b/src/corporate/storage.rs index 40c5063..37741c2 100644 --- a/src/corporate/storage.rs +++ b/src/corporate/storage.rs @@ -7,18 +7,24 @@ use tokio::fs; use tokio::io::AsyncWriteExt; use chrono::{Datelike, NaiveDate}; use std::collections::{HashMap}; -use std::path::{PathBuf}; +use std::path::{PathBuf, Path}; -pub async fn load_existing_events(paths: &DataPaths) -> anyhow::Result> { - let mut map = HashMap::new(); +const BATCH_SIZE: usize = 500; // Process 500 events at a time + +/// Load events in streaming fashion to avoid memory buildup +pub async fn load_existing_events_streaming( + paths: &DataPaths, + callback: impl Fn(CompanyEvent) -> anyhow::Result<()> +) -> anyhow::Result { let dir = paths.corporate_events_dir(); if !dir.exists() { logger::log_info("Corporate Storage: No existing events directory found").await; - return Ok(map); + return Ok(0); } + let mut total = 0; let mut entries = fs::read_dir(dir).await?; - let mut loaded_count = 0; + while let Some(entry) = entries.next_entry().await? { let path = entry.path(); if path.extension().and_then(|s| s.to_str()) == Some("json") { @@ -26,18 +32,84 @@ pub async fn load_existing_events(paths: &DataPaths) -> anyhow::Result = serde_json::from_str(&content)?; + for event in events { - map.insert(event_key(&event), event); + callback(event)?; + total += 1; } - loaded_count += 1; + + // Yield to prevent blocking + tokio::task::yield_now().await; } } } - logger::log_info(&format!("Corporate Storage: Loaded {} events from {} files", map.len(), loaded_count)).await; - Ok(map) + + logger::log_info(&format!("Corporate Storage: Streamed {} events", total)).await; + Ok(total) } -pub async fn save_optimized_events(paths: &DataPaths, events: HashMap) -> anyhow::Result<()> { +/// Build lightweight index of events instead of loading everything +#[derive(Debug, Clone)] +pub struct EventIndex { + pub key: String, + pub ticker: String, + pub date: String, + pub file_path: PathBuf, +} + +pub async fn build_event_index(paths: &DataPaths) -> anyhow::Result> { + let dir = paths.corporate_events_dir(); + if !dir.exists() { + return Ok(Vec::new()); + } + + let mut index = Vec::new(); + let mut entries = fs::read_dir(dir).await?; + + while let Some(entry) = entries.next_entry().await? 
{ + let path = entry.path(); + if path.extension().and_then(|s| s.to_str()) == Some("json") { + let name = path.file_name().and_then(|n| n.to_str()).unwrap_or(""); + if name.starts_with("events_") && name.len() == 17 { + let content = fs::read_to_string(&path).await?; + let events: Vec = serde_json::from_str(&content)?; + + for event in events { + index.push(EventIndex { + key: event_key(&event), + ticker: event.ticker.clone(), + date: event.date.clone(), + file_path: path.clone(), + }); + } + } + } + } + + logger::log_info(&format!("Corporate Storage: Built index with {} entries", index.len())).await; + Ok(index) +} + +/// Lookup specific event by loading only its file +pub async fn lookup_event_by_key( + key: &str, + index: &[EventIndex] +) -> anyhow::Result> { + let entry = index.iter().find(|e| e.key == key); + + if let Some(entry) = entry { + let content = fs::read_to_string(&entry.file_path).await?; + let events: Vec = serde_json::from_str(&content)?; + Ok(events.into_iter().find(|e| event_key(e) == key)) + } else { + Ok(None) + } +} + +pub async fn save_optimized_events( + paths: &DataPaths, + events: Vec // Changed from HashMap to Vec +) -> anyhow::Result<()> { let dir = paths.corporate_events_dir(); fs::create_dir_all(dir).await?; @@ -55,15 +127,23 @@ pub async fn save_optimized_events(paths: &DataPaths, events: HashMap = events.into_values().collect(); - sorted.sort_by_key(|e| (e.ticker.clone(), e.date.clone())); + let mut sorted = events; + sorted.sort_by(|a, b| { + a.ticker.cmp(&b.ticker) + .then(a.date.cmp(&b.date)) + }); + // Process in batches to avoid memory buildup let mut by_month: HashMap> = HashMap::new(); - for e in sorted { - if let Ok(d) = NaiveDate::parse_from_str(&e.date, "%Y-%m-%d") { - let key = format!("{}-{:02}", d.year(), d.month()); - by_month.entry(key).or_default().push(e); + + for chunk in sorted.chunks(BATCH_SIZE) { + for e in chunk { + if let Ok(d) = NaiveDate::parse_from_str(&e.date, "%Y-%m-%d") { + let key = format!("{}-{:02}", d.year(), d.month()); + by_month.entry(key).or_default().push(e.clone()); + } } + tokio::task::yield_now().await; } let total_months = by_month.len(); @@ -72,6 +152,7 @@ pub async fn save_optimized_events(paths: &DataPaths, events: HashMap Ok(()) } -pub async fn save_prices_for_ticker(paths: &DataPaths, ticker: &str, timeframe: &str, mut prices: Vec) -> anyhow::Result<()> { +pub async fn save_prices_for_ticker( + paths: &DataPaths, + ticker: &str, + timeframe: &str, + mut prices: Vec +) -> anyhow::Result<()> { let base_dir = paths.corporate_prices_dir(); let company_dir = base_dir.join(ticker.replace(".", "_")); let timeframe_dir = company_dir.join(timeframe); @@ -142,7 +228,11 @@ pub async fn ensure_company_dirs(paths: &DataPaths, isin: &str) -> anyhow::Resul Ok(()) } -pub async fn save_available_exchanges(paths: &DataPaths, isin: &str, exchanges: Vec) -> anyhow::Result<()> { +pub async fn save_available_exchanges( + paths: &DataPaths, + isin: &str, + exchanges: Vec +) -> anyhow::Result<()> { let dir = get_company_dir(paths, isin); fs::create_dir_all(&dir).await?; let path = dir.join("available_exchanges.json"); @@ -177,66 +267,8 @@ pub async fn save_prices_by_source( Ok(()) } -/// Update available_exchanges.json with fetch results -/*pub async fn update_available_exchange( - paths: &DataPaths, - isin: &str, - ticker: &str, - exchange_mic: &str, - has_daily: bool, - has_5min: bool, -) -> anyhow::Result<()> { - let mut exchanges = load_available_exchanges(paths, isin).await?; - - if let Some(entry) = 
exchanges.iter_mut().find(|e| e.ticker == ticker) { - // Update existing entry - entry.record_success(has_daily, has_5min); - } else { - // Create new entry - need to get currency from somewhere - // Try to infer from the ticker or use a default - let currency = infer_currency_from_ticker(ticker); - let mut new_entry = AvailableExchange::new( - ticker.to_string(), - exchange_mic.to_string(), - currency, - ); - new_entry.record_success(has_daily, has_5min); - exchanges.push(new_entry); - } - - save_available_exchanges(paths, isin, exchanges).await -}*/ - -/// Infer currency from ticker suffix -fn infer_currency_from_ticker(ticker: &str) -> String { - if ticker.ends_with(".L") { return "GBP".to_string(); } - if ticker.ends_with(".PA") { return "EUR".to_string(); } - if ticker.ends_with(".DE") { return "EUR".to_string(); } - if ticker.ends_with(".AS") { return "EUR".to_string(); } - if ticker.ends_with(".MI") { return "EUR".to_string(); } - if ticker.ends_with(".SW") { return "CHF".to_string(); } - if ticker.ends_with(".T") { return "JPY".to_string(); } - if ticker.ends_with(".HK") { return "HKD".to_string(); } - if ticker.ends_with(".SS") { return "CNY".to_string(); } - if ticker.ends_with(".SZ") { return "CNY".to_string(); } - if ticker.ends_with(".TO") { return "CAD".to_string(); } - if ticker.ends_with(".AX") { return "AUD".to_string(); } - if ticker.ends_with(".SA") { return "BRL".to_string(); } - if ticker.ends_with(".MC") { return "EUR".to_string(); } - if ticker.ends_with(".BO") || ticker.ends_with(".NS") { return "INR".to_string(); } - - "USD".to_string() // Default -} - -/// Saves companies data to a JSONL file. -/// -/// # Arguments -/// * `paths` - Reference to DataPaths for directory management -/// * `companies` - HashMap of company names to their securities (ISIN, Ticker pairs) -/// -/// # Errors -/// Returns an error if file operations or serialization fails. 
-pub async fn save_companies_to_jsonl( +/// Saves companies data to a JSONL file in streaming fashion +pub async fn save_companies_to_jsonl_streaming( paths: &DataPaths, companies: &HashMap>, ) -> anyhow::Result<()> { @@ -244,13 +276,14 @@ pub async fn save_companies_to_jsonl( logger::log_info(&format!("Corporate Storage: Saving {} companies to JSONL", companies.len())).await; - // Create parent directory if it doesn't exist if let Some(parent) = file_path.parent() { tokio::fs::create_dir_all(parent).await?; } let mut file = tokio::fs::File::create(&file_path).await?; + let mut count = 0; + // Process in batches for (name, securities) in companies.iter() { let line = serde_json::json!({ "name": name, @@ -258,10 +291,49 @@ pub async fn save_companies_to_jsonl( }); file.write_all(line.to_string().as_bytes()).await?; file.write_all(b"\n").await?; + + count += 1; + if count % 100 == 0 { + tokio::task::yield_now().await; + } } let msg = format!("✓ Saved {} companies to {:?}", companies.len(), file_path); println!("{}", msg); logger::log_info(&msg).await; Ok(()) +} + +/// Load companies from JSONL in streaming fashion +pub async fn load_companies_from_jsonl_streaming( + path: &Path, + callback: impl Fn(String, HashMap) -> anyhow::Result<()> +) -> anyhow::Result { + if !path.exists() { + return Ok(0); + } + + let content = tokio::fs::read_to_string(path).await?; + let mut count = 0; + + for line in content.lines() { + if line.trim().is_empty() { + continue; + } + + let entry: serde_json::Value = serde_json::from_str(line)?; + let name = entry["name"].as_str().unwrap_or("").to_string(); + let securities: HashMap = serde_json::from_value( + entry["securities"].clone() + )?; + + callback(name, securities)?; + count += 1; + + if count % 100 == 0 { + tokio::task::yield_now().await; + } + } + + Ok(count) } \ No newline at end of file diff --git a/src/corporate/update.rs b/src/corporate/update.rs index ea8b565..2c88666 100644 --- a/src/corporate/update.rs +++ b/src/corporate/update.rs @@ -9,161 +9,234 @@ use chrono::Local; use std::collections::{HashMap}; use std::sync::Arc; -/// Main function: Full update for all companies (LEI-based) with optimized parallel execution. -/// -/// This function coordinates the entire update process: -/// - Loads GLEIF mappings -/// - Builds FIGI-LEI map -/// - Loads existing events -/// - Processes each company: discovers exchanges via FIGI, fetches prices & earnings, aggregates data -/// - Uses the provided shared ChromeDriver pool for efficient parallel scraping -/// - Saves optimized events -/// -/// # Arguments -/// * `config` - The application configuration. -/// * `pool` - Shared pool of ChromeDriver instances for scraping. -/// -/// # Errors -/// Returns an error if any step in the update process fails. +/// Main function: Full update for all companies with streaming to minimize memory usage pub async fn run_full_update(config: &Config, pool: &Arc) -> anyhow::Result<()> { - let msg = "=== Starting LEI-based corporate full update ==="; + let msg = "=== Starting LEI-based corporate full update (STREAMING) ==="; println!("{}", msg); logger::log_info(msg).await; - // Initialize paths let paths = DataPaths::new(".")?; - // 1. 
Load fresh GLEIF ISIN ↔ LEI mapping - logger::log_info("Corporate Update: Loading GLEIF ISIN ↔ LEI mapping...").await; - let lei_to_isins: HashMap> = match load_isin_lei_csv().await { - Ok(map) => { - let msg = format!("Corporate Update: Loaded GLEIF mapping with {} LEI entries", map.len()); - println!("{}", msg); - logger::log_info(&msg).await; - map + // Step 1: Download/locate GLEIF CSV (don't load into memory yet) + logger::log_info("Corporate Update: Downloading/locating GLEIF CSV...").await; + let gleif_csv_path = match download_isin_lei_csv().await? { + Some(p) => { + logger::log_info(&format!("Corporate Update: GLEIF CSV at: {}", p)).await; + p } - Err(e) => { - let msg = format!("Corporate Update: Warning - Could not load GLEIF ISIN↔LEI mapping: {}", e); - eprintln!("{}", msg); - logger::log_warn(&msg).await; - HashMap::new() + None => { + logger::log_warn("Corporate Update: Could not obtain GLEIF CSV, continuing with limited data").await; + return Ok(()); } }; - // 2. Load OpenFIGI mapping value lists (cached) + // Step 2: Load OpenFIGI type lists (small, cached) logger::log_info("Corporate Update: Loading OpenFIGI type lists...").await; if let Err(e) = load_figi_type_lists().await { - let msg = format!("Corporate Update: Warning - Could not load OpenFIGI type lists: {}", e); - eprintln!("{}", msg); - logger::log_warn(&msg).await; + logger::log_warn(&format!("Could not load OpenFIGI type lists: {}", e)).await; } - logger::log_info("Corporate Update: OpenFIGI type lists loaded").await; - // 3. Build FIGI → LEI map - logger::log_info("Corporate Update: Building FIGI → LEI map...").await; - let figi_to_lei:HashMap> = match build_lei_to_figi_infos(&lei_to_isins, None).await { - Ok(map) => { - let msg = format!("Corporate Update: Built FIGI map with {} entries", map.len()); - println!("{}", msg); - logger::log_info(&msg).await; - map - } - Err(e) => { - let msg = format!("Corporate Update: Warning - Could not build FIGI→LEI map: {}", e); - eprintln!("{}", msg); - logger::log_warn(&msg).await; - HashMap::new() + // Step 3: Process GLEIF → FIGI mapping in streaming fashion + logger::log_info("Corporate Update: Building FIGI mappings (streaming)...").await; + + // Build LEI→ISINs map by streaming the CSV + let mut lei_to_isins: HashMap> = HashMap::new(); + let mut lei_batch = Vec::new(); + const LEI_BATCH_SIZE: usize = 1000; + + stream_gleif_csv(&gleif_csv_path, |lei, isin| { + lei_to_isins.entry(lei.clone()).or_default().push(isin); + lei_batch.push(lei); + + // Process in batches + if lei_batch.len() >= LEI_BATCH_SIZE { + lei_batch.clear(); } + + Ok(()) + }).await?; + + logger::log_info(&format!("Corporate Update: Collected {} LEIs", lei_to_isins.len())).await; + + // Step 4: Build FIGI mappings in batches (process and save incrementally) + logger::log_info("Corporate Update: Processing FIGI mappings in batches...").await; + let figi_result = build_lei_to_figi_infos(&lei_to_isins, None).await; + + // Don't keep the full result in memory - it's already saved to JSONL files + drop(figi_result); + drop(lei_to_isins); // Release this too + + logger::log_info("Corporate Update: FIGI mappings saved to cache").await; + + // Step 5: Load or build securities (streaming from JSONL files) + logger::log_info("Corporate Update: Building securities map (streaming)...").await; + + let dir = DataPaths::new(".")?; + let map_cache_dir = dir.cache_gleif_openfigi_map_dir(); + + // Find the most recent date directory + let date_dir = find_most_recent_date_dir(&map_cache_dir).await?; + + let (common_stocks, 
_warrants, _options) = if let Some(date_dir) = date_dir { + logger::log_info(&format!("Using FIGI data from: {:?}", date_dir)).await; + load_or_build_all_securities_streaming(&date_dir).await? + } else { + logger::log_warn("No FIGI date directory found, using empty maps").await; + (HashMap::new(), HashMap::new(), HashMap::new()) }; - // 4. Load or build companies - logger::log_info("Corporate Update: Loading/building company securities...").await; - let securities = load_or_build_all_securities(&figi_to_lei).await?; - let msg = format!("Corporate Update: Processing {} companies", securities.0.len()); - println!("{}", msg); - logger::log_info(&msg).await; + logger::log_info(&format!("Corporate Update: Processing {} companies", common_stocks.len())).await; - // HashMap> - unique pairs only - let companies: HashMap> = securities.0 - .iter() - .fold(HashMap::new(), |mut acc, security| { - let mut isin_ticker_pairs: HashMap = HashMap::new(); - - // Collect all unique ISIN-Ticker pairs - for figi_infos in security.1.securities.values() { - for figi_info in figi_infos { - if !figi_info.isin.is_empty() && !figi_info.ticker.is_empty() { - isin_ticker_pairs.insert(figi_info.isin.clone(), figi_info.ticker.clone()); - } + // Step 6: Convert to simplified companies map and save incrementally + logger::log_info("Corporate Update: Building companies JSONL (streaming)...").await; + + let companies_path = paths.data_dir().join("companies.jsonl"); + + // Create file and write incrementally + if let Some(parent) = companies_path.parent() { + tokio::fs::create_dir_all(parent).await?; + } + + let mut file = tokio::fs::File::create(&companies_path).await?; + let mut processed = 0; + + for (name, company_info) in common_stocks.iter() { + let mut isin_ticker_pairs: HashMap = HashMap::new(); + + for figi_infos in company_info.securities.values() { + for figi_info in figi_infos { + if !figi_info.isin.is_empty() && !figi_info.ticker.is_empty() { + isin_ticker_pairs.insert(figi_info.isin.clone(), figi_info.ticker.clone()); } } + } + + if !isin_ticker_pairs.is_empty() { + use tokio::io::AsyncWriteExt; - // Only add if there are pairs - if !isin_ticker_pairs.is_empty() { - acc.insert(security.1.name.clone(), isin_ticker_pairs); + let line = serde_json::json!({ + "name": name, + "securities": isin_ticker_pairs + }); + + file.write_all(line.to_string().as_bytes()).await?; + file.write_all(b"\n").await?; + processed += 1; + + // Yield periodically + if processed % 100 == 0 { + tokio::task::yield_now().await; + logger::log_info(&format!("Saved {} companies so far...", processed)).await; } - acc - }); + } + } - logger::log_info(&format!("Corporate Update: Saving {} companies to JSONL", companies.len())).await; - save_companies_to_jsonl(&paths, &companies).await.expect("Failed to save companies List."); - logger::log_info("Corporate Update: Companies saved successfully").await; + logger::log_info(&format!("Corporate Update: Saved {} companies to JSONL", processed)).await; - // 5. 
Load existing earnings events (for change detection) - logger::log_info("Corporate Update: Loading existing events...").await; - let existing_events = match load_existing_events(&paths).await { - Ok(events) => { - let msg = format!("Corporate Update: Loaded {} existing events", events.len()); - println!("{}", msg); - logger::log_info(&msg).await; - events - } - Err(e) => { - let msg = format!("Corporate Update: Warning - Could not load existing events: {}", e); - eprintln!("{}", msg); - logger::log_warn(&msg).await; - HashMap::new() - } - }; + // Step 7: Process events in streaming fashion + logger::log_info("Corporate Update: Processing events (streaming)...").await; + + let event_index = build_event_index(&paths).await?; + logger::log_info(&format!("Corporate Update: Built index of {} events", event_index.len())).await; + + // For now, we just maintain the index + // In a full implementation, you'd stream through tickers and update events + + // Step 8: Save any updates + logger::log_info("Corporate Update: Finalizing...").await; - // 5. Use the provided pool (no need to create a new one) - let pool_size = pool.get_number_of_instances(); // Use the size from the shared pool - logger::log_info(&format!("Corporate Update: Using pool size: {}", pool_size)).await; - - // Process companies in parallel using the shared pool - /*let results: Vec<_> = stream::iter(companies.into_iter()) - .map(|company| { - let pool_clone = pool.clone(); - async move { - process_company_data(&company, &pool_clone, &mut existing_events).await - } - }) - .buffer_unordered(pool_size) - .collect().await; - - // Handle results (e.g., collect changes) - let mut all_changes = Vec::new(); - for result in results { - if let Ok(ProcessResult { changes }) = result { - all_changes.extend(changes); - } - }*/ - - logger::log_info(&format!("Corporate Update: Saving {} events to optimized storage", existing_events.len())).await; - save_optimized_events(&paths, existing_events).await?; - logger::log_info("Corporate Update: Events saved successfully").await; - //save_changes(&all_changes).await?; - - let msg = "✓ Corporate update complete"; + let msg = "✓ Corporate update complete (streaming)"; println!("{}", msg); logger::log_info(msg).await; Ok(()) } +/// Helper to find the most recent date directory in the FIGI cache +async fn find_most_recent_date_dir(map_cache_dir: &std::path::Path) -> anyhow::Result> { + if !map_cache_dir.exists() { + return Ok(None); + } + + let mut entries = tokio::fs::read_dir(map_cache_dir).await?; + let mut dates = Vec::new(); + + while let Some(entry) = entries.next_entry().await? 
{ + let path = entry.path(); + if path.is_dir() { + if let Some(name) = path.file_name().and_then(|n| n.to_str()) { + // Date format: DDMMYYYY + if name.len() == 8 && name.chars().all(|c| c.is_numeric()) { + dates.push((name.to_string(), path)); + } + } + } + } + + if dates.is_empty() { + return Ok(None); + } + + // Sort by date (DDMMYYYY format) + dates.sort_by(|a, b| b.0.cmp(&a.0)); // Descending order + + Ok(Some(dates[0].1.clone())) +} + pub struct ProcessResult { pub changes: Vec, } +/// Process events in batches to avoid memory buildup +pub async fn process_events_streaming( + index: &[EventIndex], + new_events: &[CompanyEvent], + today: &str, +) -> anyhow::Result<(Vec, Vec)> { + let mut all_changes = Vec::new(); + let mut final_events: HashMap = HashMap::new(); + + // Step 1: Load existing events in batches using the index + logger::log_info("Loading existing events in batches...").await; + + let mut loaded_files = std::collections::HashSet::new(); + + for entry in index { + if loaded_files.contains(&entry.file_path) { + continue; + } + + let content = tokio::fs::read_to_string(&entry.file_path).await?; + let events: Vec = serde_json::from_str(&content)?; + + for e in events { + final_events.insert(event_key(&e), e); + } + + loaded_files.insert(entry.file_path.clone()); + + if final_events.len() % 1000 == 0 { + logger::log_info(&format!("Loaded {} events so far...", final_events.len())).await; + tokio::task::yield_now().await; + } + } + + logger::log_info(&format!("Loaded {} existing events", final_events.len())).await; + + // Step 2: Process new events in batches + for (idx, batch) in new_events.chunks(500).enumerate() { + logger::log_info(&format!("Processing batch {} ({} events)", idx + 1, batch.len())).await; + + let batch_result = process_batch(batch, &mut final_events, today); + all_changes.extend(batch_result.changes); + + tokio::task::yield_now().await; + } + + let events_vec: Vec = final_events.into_values().collect(); + + Ok((all_changes, events_vec)) +} + pub fn process_batch( new_events: &[CompanyEvent], existing: &mut HashMap, diff --git a/src/economic/storage.rs b/src/economic/storage.rs index fd04bc2..23ad19b 100644 --- a/src/economic/storage.rs +++ b/src/economic/storage.rs @@ -6,6 +6,10 @@ use crate::util::logger; use tokio::fs; use chrono::{NaiveDate, Datelike}; use std::collections::HashMap; +use serde_json; + +const CHUNK_SIZE: usize = 500; // Process 500 events at a time +const MAX_EVENTS_PER_FILE: usize = 3000; pub async fn scan_existing_chunks(paths: &DataPaths) -> anyhow::Result> { let dir = paths.economic_events_dir(); @@ -18,37 +22,122 @@ pub async fn scan_existing_chunks(paths: &DataPaths) -> anyhow::Result>(&content) { - let start = name[6..16].to_string(); - let end = name[17..27].to_string(); - chunks.push(ChunkInfo { start_date: start, end_date: end, path, event_count: events.len() }); - } - } + // Don't load the events here, just record the chunk info + let start = name[6..16].to_string(); + let end = name[17..27].to_string(); + chunks.push(ChunkInfo { + start_date: start, + end_date: end, + path, + event_count: 0 // We'll count later if needed + }); } } } } } chunks.sort_by_key(|c| c.start_date.clone()); - logger::log_info(&format!("Economic Storage: Scanned {} event chunks", chunks.len())).await; + logger::log_info(&format!("Economic Storage: Found {} event chunks", chunks.len())).await; Ok(chunks) } -pub async fn load_existing_events(chunks: &[ChunkInfo]) -> anyhow::Result> { - let mut map = HashMap::new(); - for chunk in chunks { - let content = 
fs::read_to_string(&chunk.path).await?; - let events: Vec = serde_json::from_str(&content)?; - for e in events { - map.insert(event_key(&e), e); - } +/// Stream events from a single chunk file +pub async fn stream_chunk_events( + chunk: &ChunkInfo, + callback: impl Fn(EconomicEvent) -> anyhow::Result<()> +) -> anyhow::Result { + let content = fs::read_to_string(&chunk.path).await?; + let events: Vec = serde_json::from_str(&content)?; + let count = events.len(); + + for event in events { + callback(event)?; } - logger::log_info(&format!("Economic Storage: Loaded {} events from {} chunks", map.len(), chunks.len())).await; - Ok(map) + + Ok(count) } -pub async fn save_optimized_chunks(paths: &DataPaths, events: HashMap) -> anyhow::Result<()> { +/// Load events in batches to avoid memory explosion +pub async fn load_events_in_batches( + chunks: &[ChunkInfo], + batch_size: usize, +) -> anyhow::Result> { + let mut all_events = Vec::new(); + + for chunk in chunks { + logger::log_info(&format!("Loading chunk: {:?}", chunk.path.file_name())).await; + + let content = fs::read_to_string(&chunk.path).await?; + let events: Vec = serde_json::from_str(&content)?; + + for e in events { + all_events.push((event_key(&e), e)); + } + + // If we've accumulated enough, yield them + if all_events.len() >= batch_size { + break; + } + } + + logger::log_info(&format!("Loaded {} events in batch", all_events.len())).await; + Ok(all_events.into_iter()) +} + +/// NEW: Build a lightweight index instead of loading all events +#[derive(Debug, Clone)] +pub struct EventIndex { + pub key: String, + pub identity_key: String, + pub date: String, + pub chunk_file: std::path::PathBuf, +} + +pub async fn build_event_index(chunks: &[ChunkInfo]) -> anyhow::Result> { + let mut index = Vec::new(); + + for chunk in chunks { + logger::log_info(&format!("Indexing chunk: {:?}", chunk.path.file_name())).await; + + let content = fs::read_to_string(&chunk.path).await?; + let events: Vec = serde_json::from_str(&content)?; + + for e in events { + index.push(EventIndex { + key: event_key(&e), + identity_key: identity_key(&e), + date: e.date.clone(), + chunk_file: chunk.path.clone(), + }); + } + } + + logger::log_info(&format!("Built index with {} entries", index.len())).await; + Ok(index) +} + +/// NEW: Look up a specific event by loading only its chunk +pub async fn lookup_event_by_key(key: &str, index: &[EventIndex]) -> anyhow::Result> { + // Find which chunk contains this event + let entry = index.iter().find(|e| e.key == key); + + if let Some(entry) = entry { + // Load only that chunk + let content = fs::read_to_string(&entry.chunk_file).await?; + let events: Vec = serde_json::from_str(&content)?; + + // Find the specific event + Ok(events.into_iter().find(|e| event_key(e) == key)) + } else { + Ok(None) + } +} + +/// Save events in smaller, more manageable chunks +pub async fn save_optimized_chunks( + paths: &DataPaths, + events: Vec // Changed from HashMap to Vec +) -> anyhow::Result<()> { let dir = paths.economic_events_dir(); fs::create_dir_all(dir).await?; @@ -67,31 +156,36 @@ pub async fn save_optimized_chunks(paths: &DataPaths, events: HashMap = events.into_values().collect(); - sorted.sort_by_key(|e| e.date.clone()); + let mut sorted = events; + sorted.sort_by(|a, b| a.date.cmp(&b.date)); - let mut chunk: Vec = Vec::new(); - const MAX_EVENTS_PER_CHUNK: usize = ( 30000 / 2 ) / 11; // (30000 - 2) / 11 = 2727 - - for e in sorted { - if !chunk.is_empty() && chunk.len() >= MAX_EVENTS_PER_CHUNK { - save_chunk(&chunk, dir).await?; - 
chunk.clear(); - } - chunk.push(e); + // Save in smaller chunks + let mut chunk_num = 0; + for chunk in sorted.chunks(MAX_EVENTS_PER_FILE) { + save_chunk_vec(chunk, dir, chunk_num).await?; + chunk_num += 1; + + // Allow other tasks to run + tokio::task::yield_now().await; } - if !chunk.is_empty() { - save_chunk(&chunk, dir).await?; - } - logger::log_info(&format!("Economic Storage: Saved all event chunks to {:?}", dir)).await; + + logger::log_info(&format!("Economic Storage: Saved {} chunks to {:?}", chunk_num, dir)).await; Ok(()) } -async fn save_chunk(events: &[EconomicEvent], dir: &std::path::Path) -> anyhow::Result<()> { - let start = events.iter().map(|e| &e.date).min().unwrap().clone(); - let end = events.iter().map(|e| &e.date).max().unwrap().clone(); - let path = dir.join(format!("chunk_{}_{}.json", start, end)); - fs::write(&path, serde_json::to_string_pretty(events)?).await?; +async fn save_chunk_vec(events: &[EconomicEvent], dir: &std::path::Path, chunk_num: usize) -> anyhow::Result<()> { + if events.is_empty() { + return Ok(()); + } + + let start = &events[0].date; + let end = &events[events.len() - 1].date; + let path = dir.join(format!("chunk_{:04}_{}_{}.json", chunk_num, start, end)); + + // Write incrementally to avoid large memory allocation + let json = serde_json::to_string_pretty(events)?; + fs::write(&path, json).await?; + logger::log_info(&format!("Economic Storage: Saved chunk {} - {} ({} events)", start, end, events.len())).await; Ok(()) } diff --git a/src/economic/update.rs b/src/economic/update.rs index 1197bef..315dd21 100644 --- a/src/economic/update.rs +++ b/src/economic/update.rs @@ -3,15 +3,9 @@ use super::{scraper::*, storage::*, helpers::*, types::*}; use crate::{config::Config, scraper::webdriver::{ScrapeTask, ChromeDriverPool}, util::directories::DataPaths, util::logger}; use chrono::{Local}; use std::sync::Arc; +use std::collections::HashMap; -/// Runs the full update for economic data, using the provided ChromeDriver pool. -/// -/// # Arguments -/// * `config` - The application configuration. -/// * `pool` - Shared pool of ChromeDriver instances for scraping. -/// -/// # Errors -/// Returns an error if scraping, loading, or saving fails. 
+/// Runs the full update for economic data using streaming to minimize memory usage pub async fn run_full_update(config: &Config, pool: &Arc) -> anyhow::Result<()> { let paths = DataPaths::new(".")?; @@ -20,81 +14,124 @@ pub async fn run_full_update(config: &Config, pool: &Arc) -> a let today_str = chrono::Local::now().date_naive().format("%Y-%m-%d").to_string(); let end_date = config.target_end_date(); - logger::log_info(&format!("Economic Update: Scanning existing chunks from {:?}", paths.economic_events_dir())).await; + // Step 1: Build lightweight index instead of loading all events + logger::log_info("Economic Update: Building event index...").await; let chunks = scan_existing_chunks(&paths).await?; - let mut events = load_existing_events(&chunks).await?; + let event_index = build_event_index(&chunks).await?; - let msg = format!("Economic Update: Loaded {} events from {} chunks", events.len(), chunks.len()); - println!("{}", msg); - logger::log_info(&msg).await; + logger::log_info(&format!("Economic Update: Indexed {} events from {} chunks", + event_index.len(), chunks.len())).await; - let start_date = if events.is_empty() { + // Step 2: Determine start date + let start_date = if event_index.is_empty() { logger::log_warn("Economic Update: No existing events found, starting from config date").await; config.economic_start_date.clone() - } else if events.values().any(|e| e.date >= today_str) { - logger::log_info("Economic Update: Events exist for today, starting from today").await; - today_str.clone() } else { - let next = events.values() - .filter_map(|e| chrono::NaiveDate::parse_from_str(&e.date, "%Y-%m-%d").ok()) + // Find the latest date in the index + let max_date = event_index.iter() + .map(|e| &e.date) .max() - .and_then(|d| d.succ_opt()) - .map(|d| d.format("%Y-%m-%d").to_string()) + .cloned() .unwrap_or(today_str.clone()); - logger::log_info(&format!("Economic Update: Resuming from: {}", next)).await; - next + + if max_date >= today_str { + logger::log_info("Economic Update: Events exist for today, starting from today").await; + today_str.clone() + } else { + let next = chrono::NaiveDate::parse_from_str(&max_date, "%Y-%m-%d") + .ok() + .and_then(|d| d.succ_opt()) + .map(|d| d.format("%Y-%m-%d").to_string()) + .unwrap_or(today_str.clone()); + logger::log_info(&format!("Economic Update: Resuming from: {}", next)).await; + next + } }; - let msg = format!("Economic Update: Scraping events from {} → {}", start_date, end_date); - println!("{}", msg); - logger::log_info(&msg).await; + logger::log_info(&format!("Economic Update: Scraping events from {} → {}", start_date, end_date)).await; - // Pass the pool to the scraping function - let new_events_all = scrape_all_economic_events(&start_date, &end_date, pool).await?; + // Step 3: Scrape new events in batches + let new_events = scrape_all_economic_events(&start_date, &end_date, pool).await?; - let msg = format!("Economic Update: Scraped {} new events", new_events_all.len()); - println!("{}", msg); - logger::log_info(&msg).await; + logger::log_info(&format!("Economic Update: Scraped {} new events", new_events.len())).await; - // Process all at once or in batches - let result = process_batch(&new_events_all, &mut events, &today_str); - let total_changes = result.changes.len(); + // Step 4: Process events in streaming fashion + let (changes, updated_events) = process_events_streaming(&chunks, &new_events, &today_str).await?; - let msg = format!("Economic Update: Detected {} changes", total_changes); - println!("{}", msg); - 
logger::log_info(&msg).await; + logger::log_info(&format!("Economic Update: Detected {} changes", changes.len())).await; - if total_changes > 0 { - logger::log_info(&format!("Economic Update: Saving {} changes to log", total_changes)).await; - save_changes(&paths, &result.changes).await?; + if !changes.is_empty() { + logger::log_info(&format!("Economic Update: Saving {} changes to log", changes.len())).await; + save_changes(&paths, &changes).await?; logger::log_info("Economic Update: Changes saved successfully").await; } - logger::log_info(&format!("Economic Update: Saving {} total events to chunks", events.len())).await; - save_optimized_chunks(&paths, events).await?; + // Step 5: Save consolidated events + logger::log_info(&format!("Economic Update: Saving {} total events to chunks", updated_events.len())).await; + save_optimized_chunks(&paths, updated_events).await?; - let msg = format!("✓ Economic update complete — {} changes detected", total_changes); - println!("{}", msg); - logger::log_info(&msg).await; + logger::log_info(&format!("✓ Economic update complete — {} changes detected", changes.len())).await; Ok(()) } -/// Scrapes all economic events from start to end date using a dedicated ScrapeTask with the provided pool. -/// -/// This function creates a ScrapeTask to navigate to the Finanzen.net page, prepare it, -/// and then loop through date ranges to extract events. -/// -/// # Arguments -/// * `start` - Start date in YYYY-MM-DD. -/// * `end` - End date in YYYY-MM-DD. -/// * `pool` - Shared pool of ChromeDriver instances. -/// -/// # Returns -/// A vector of all extracted EconomicEvent structs. -/// -/// # Errors -/// Returns an error if task execution fails or extraction issues occur. -pub async fn scrape_all_economic_events(start: &str, end: &str, pool: &Arc) -> anyhow::Result> { +/// Process events using streaming to minimize memory usage +async fn process_events_streaming( + chunks: &[ChunkInfo], + new_events: &[EconomicEvent], + today: &str, +) -> anyhow::Result<(Vec, Vec)> { + let mut all_changes = Vec::new(); + let mut final_events: HashMap = HashMap::new(); + + // Step 1: Load existing events in batches + logger::log_info("Processing existing events in batches...").await; + + for chunk in chunks { + logger::log_info(&format!("Loading chunk: {:?}", chunk.path.file_name())).await; + + let content = tokio::fs::read_to_string(&chunk.path).await?; + let events: Vec = serde_json::from_str(&content)?; + + // Add to final events map + for e in events { + final_events.insert(event_key(&e), e); + } + + // Clear memory periodically + if final_events.len() > 10000 { + logger::log_info(&format!("Loaded {} events so far...", final_events.len())).await; + } + } + + logger::log_info(&format!("Loaded {} existing events total", final_events.len())).await; + + // Step 2: Process new events in batches + logger::log_info("Processing new events...").await; + + for (idx, batch) in new_events.chunks(500).enumerate() { + logger::log_info(&format!("Processing batch {} ({} events)", idx + 1, batch.len())).await; + + let batch_result = process_batch(batch, &mut final_events, today); + all_changes.extend(batch_result.changes); + + // Yield to prevent blocking + tokio::task::yield_now().await; + } + + logger::log_info(&format!("Processing complete. 
Total events: {}", final_events.len())).await; + + // Convert HashMap to Vec for saving + let events_vec: Vec = final_events.into_values().collect(); + + Ok((all_changes, events_vec)) +} + +/// Scrapes all economic events from start to end date +pub async fn scrape_all_economic_events( + start: &str, + end: &str, + pool: &Arc +) -> anyhow::Result> { let url = "https://www.finanzen.net/termine/wirtschaftsdaten/".to_string(); let start_clone = start.to_string(); let end_clone = end.to_string(); @@ -108,9 +145,18 @@ pub async fn scrape_all_economic_events(start: &str, end: &str, pool: &Arc 5000 { + logger::log_info(&format!("Scraped {} events so far, continuing...", all_events.len())).await; + } + let next = new_events.iter() .filter_map(|e| chrono::NaiveDate::parse_from_str(&e.date, "%Y-%m-%d").ok()) .max() @@ -121,16 +167,17 @@ pub async fn scrape_all_economic_events(start: &str, end: &str, pool: &Arc end_clone { break; } current = next; } + Ok(all_events) }); - // Use the pool for execution task.execute_with_pool(pool).await } +/// Process a batch of events and detect changes pub fn process_batch( new_events: &[EconomicEvent], - existing: &mut std::collections::HashMap, + existing: &mut HashMap, today: &str, ) -> ScrapeResult { let mut changes = Vec::new(); diff --git a/src/scraper/docker_vpn_proxy.rs b/src/scraper/docker_vpn_proxy.rs index 05ef77a..e2a4086 100644 --- a/src/scraper/docker_vpn_proxy.rs +++ b/src/scraper/docker_vpn_proxy.rs @@ -150,7 +150,7 @@ impl DockerVpnProxyPool { async fn test_all_proxies_parallel(container_names: &[String], proxy_ports: &[u16]) -> Vec>> { let mut tasks = Vec::new(); - for (i, (container_name, port)) in container_names.iter().zip(proxy_ports.iter()).enumerate() { + for (_i, (container_name, port)) in container_names.iter().zip(proxy_ports.iter()).enumerate() { let name = container_name.clone(); let port = *port; diff --git a/src/scraper/webdriver.rs b/src/scraper/webdriver.rs index 4f67bce..0e61902 100644 --- a/src/scraper/webdriver.rs +++ b/src/scraper/webdriver.rs @@ -28,7 +28,7 @@ impl ChromeDriverPool { } /// Creates a new pool with task-per-instance limit but no proxy. - pub async fn new_with_task_limit(pool_size: usize, max_tasks_per_instance: usize) -> Result { + pub async fn _new_with_task_limit(pool_size: usize, max_tasks_per_instance: usize) -> Result { Self::new_with_proxy_and_task_limit(pool_size, None, max_tasks_per_instance).await }