diff --git a/Cargo.lock b/Cargo.lock index 2e2087c..144f41e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -678,6 +678,7 @@ dependencies = [ "serde", "serde_json", "tokio", + "toml", "tracing", "tracing-subscriber", "yfinance-rs", @@ -2671,6 +2672,15 @@ dependencies = [ "serde_core", ] +[[package]] +name = "serde_spanned" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e24345aa0fe688594e73770a5f6d1b216508b4f93484c0026d521acd30134392" +dependencies = [ + "serde_core", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -3116,6 +3126,21 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0dc8b1fb61449e27716ec0e1bdf0f6b8f3e8f6b05391e8497b8b6d7804ea6d8" +dependencies = [ + "indexmap", + "serde_core", + "serde_spanned", + "toml_datetime", + "toml_parser", + "toml_writer", + "winnow", +] + [[package]] name = "toml_datetime" version = "0.7.3" @@ -3146,6 +3171,12 @@ dependencies = [ "winnow", ] +[[package]] +name = "toml_writer" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df8b2b54733674ad286d16267dcfc7a71ed5c776e4ac7aa3c3e2561f7c637bf2" + [[package]] name = "tower" version = "0.5.2" diff --git a/Cargo.toml b/Cargo.toml index 3a29d96..ca5ac52 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ rand = "0.9.2" # Environment handling dotenvy = "0.15" +toml = "0.9.8" # Date & time chrono = { version = "0.4", features = ["serde"] } diff --git a/src/config.rs b/src/config.rs index 86d78a1..15a566d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,14 +1,23 @@ -// src/config.rs -#[derive(Debug, Clone)] +use anyhow::{Context, Result}; +use chrono::{self, Duration}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct Config { // Economic calendar start (usually the earliest available on finanzen.net) pub economic_start_date: String, // e.g. "2007-02-13" - // Corporate earnings & price history start pub corporate_start_date: String, // e.g. "2000-01-01" or "2010-01-01" - // How far into the future we scrape economic events pub economic_lookahead_months: u32, // default: 3 + /// Maximum number of parallel scraping tasks (default: 10). + /// This limits concurrency to protect system load and prevent website spamming. + #[serde(default = "default_max_parallel")] + pub max_parallel_tasks: usize, +} + +fn default_max_parallel() -> usize { + 10 } impl Default for Config { @@ -17,11 +26,52 @@ impl Default for Config { economic_start_date: "2007-02-13".to_string(), corporate_start_date: "2010-01-01".to_string(), economic_lookahead_months: 3, + max_parallel_tasks: default_max_parallel(), } } } impl Config { + /// Loads the configuration from environment variables using dotenvy. + /// + /// This function loads a `.env` file if present (via `dotenvy::dotenv()`), + /// then retrieves each configuration value from environment variables. + /// If a variable is missing, it falls back to the default value. + /// Variable names are uppercase with underscores (e.g., ECONOMIC_START_DATE). + /// + /// # Returns + /// The loaded Config on success. + /// + /// # Errors + /// Returns an error if parsing fails (e.g., invalid integer for lookahead months). 
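For reference, an illustrative `.env` as read by the loader below; every variable is optional, and the values shown are the defaults hard-coded in `Config::load()`:

```text
# All entries optional; omitted variables fall back to these defaults
ECONOMIC_START_DATE=2007-02-13
CORPORATE_START_DATE=2010-01-01
ECONOMIC_LOOKAHEAD_MONTHS=3
MAX_PARALLEL_TASKS=10
```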
+    pub fn load() -> Result<Self> {
+        // Load the .env file if present; a missing file is fine, so the result is ignored.
+        let _ = dotenvy::dotenv();
+
+        let economic_start_date = dotenvy::var("ECONOMIC_START_DATE")
+            .unwrap_or_else(|_| "2007-02-13".to_string());
+
+        let corporate_start_date = dotenvy::var("CORPORATE_START_DATE")
+            .unwrap_or_else(|_| "2010-01-01".to_string());
+
+        let economic_lookahead_months: u32 = dotenvy::var("ECONOMIC_LOOKAHEAD_MONTHS")
+            .unwrap_or_else(|_| "3".to_string())
+            .parse()
+            .context("Failed to parse ECONOMIC_LOOKAHEAD_MONTHS as u32")?;
+
+        let max_parallel_tasks: usize = dotenvy::var("MAX_PARALLEL_TASKS")
+            .unwrap_or_else(|_| "10".to_string())
+            .parse()
+            .context("Failed to parse MAX_PARALLEL_TASKS as usize")?;
+
+        Ok(Self {
+            economic_start_date,
+            corporate_start_date,
+            economic_lookahead_months,
+            max_parallel_tasks,
+        })
+    }
+
     pub fn target_end_date(&self) -> String {
         let now = chrono::Local::now().naive_local().date();
         let future = now + chrono::Duration::days(30 * self.economic_lookahead_months as i64);
diff --git a/src/corporate/mod.rs b/src/corporate/mod.rs
index f8a47eb..d456ee8 100644
--- a/src/corporate/mod.rs
+++ b/src/corporate/mod.rs
@@ -8,5 +8,4 @@ pub mod aggregation;
 pub mod fx;
 pub mod openfigi;
 
-pub use types::*;
 pub use update::run_full_update;
\ No newline at end of file
diff --git a/src/corporate/openfigi.rs b/src/corporate/openfigi.rs
index 80f1dc6..20e98fe 100644
--- a/src/corporate/openfigi.rs
+++ b/src/corporate/openfigi.rs
@@ -287,84 +287,6 @@ pub async fn build_lei_to_figi_infos(lei_to_isins: &HashMap<String, Vec<String>
     Ok(lei_to_figis)
 }
 
-/// Loads or builds the LEI-to-FigiInfo map, filtering for equities via OpenFIGI.
-///
-/// Attempts to load from "data/companies_by_lei/lei_to_figi.jsonl" (JSON Lines format, one LEI entry per line).
-/// For any missing LEIs (compared to `lei_to_isins`), fetches their FigiInfos and appends
-/// to the .jsonl file incrementally. This allows resumption after interruptions: on restart,
-/// already processed LEIs are skipped, and only missing ones are fetched.
-///
-/// If no API key is present, skips building and returns the loaded map (possibly partial).
-///
-/// # Arguments
-///
-/// * `lei_to_isins` - HashMap of LEI to Vec<String> (used for building missing entries).
-///
-/// # Returns
-///
-/// The complete (or partial if interrupted) HashMap<String, Vec<FigiInfo>>.
-///
-/// # Errors
-///
-/// Returns an error if file I/O fails, JSON serialization/deserialization fails,
-/// or if OpenFIGI queries fail during building.
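The JSON Lines cache described above (one `{"lei": ..., "figis": [...]}` object per line, appended incrementally so an interrupted run can resume) is worth a minimal sketch. `LeiRecord` and its `Vec<String>` payload are simplified stand-ins for the real `FigiInfo` entries handled by `load_lei_to_figi_jsonl` and `append_lei_to_figi_jsonl`:

```rust
use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::Path;

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct LeiRecord {
    lei: String,
    figis: Vec<String>, // the real cache stores Vec<FigiInfo> here
}

/// Load every line of the .jsonl file into a map; a missing file just means an empty cache.
fn load_jsonl(path: &Path) -> anyhow::Result<HashMap<String, Vec<String>>> {
    let mut map = HashMap::new();
    if !path.exists() {
        return Ok(map);
    }
    for line in BufReader::new(File::open(path)?).lines() {
        let rec: LeiRecord = serde_json::from_str(&line?)?;
        map.insert(rec.lei, rec.figis);
    }
    Ok(map)
}

/// Append one record; a self-contained JSON object per line is what makes the cache resumable.
fn append_jsonl(path: &Path, lei: &str, figis: &[String]) -> anyhow::Result<()> {
    let rec = LeiRecord { lei: lei.to_string(), figis: figis.to_vec() };
    let mut file = OpenOptions::new().create(true).append(true).open(path)?;
    writeln!(file, "{}", serde_json::to_string(&rec)?)?;
    Ok(())
}
```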
-pub async fn load_or_build_lei_to_figi_infos(lei_to_isins: &HashMap>) -> anyhow::Result>> { - let data_dir = Path::new("data"); - tokio_fs::create_dir_all(data_dir).await.context("Failed to create data directory")?; - - let path = data_dir.join("lei_to_figi.jsonl"); - let mut lei_to_figis: HashMap> = load_lei_to_figi_jsonl(&path)?; - - let client = OpenFigiClient::new()?; - if !client.has_key { - println!("No API key—using partial LEI→FIGI map with {} entries", lei_to_figis.len()); - return Ok(lei_to_figis); - } - - // Sort LEIs for deterministic processing order - let mut leis: Vec<_> = lei_to_isins.keys().cloned().collect(); - leis.sort(); - - let mut processed = lei_to_figis.len(); - let total = leis.len(); - - for lei in leis { - if lei_to_figis.contains_key(&lei) { - continue; // Skip already processed - } - - let isins = match lei_to_isins.get(&lei) { - Some(i) => i, - None => continue, - }; - - let unique_isins: Vec<_> = isins.iter().cloned().collect::>().into_iter().collect(); - let equity_figi_infos = client.map_isins_to_figi_infos(&unique_isins).await?; - - let mut figis = equity_figi_infos; - if !figis.is_empty() { - figis.sort_by_key(|f| f.figi.clone()); - figis.dedup_by_key(|f| f.figi.clone()); - } - - // Append to .jsonl - append_lei_to_figi_jsonl(&path, &lei, &figis)?; - - // Insert into in-memory map (optional, but useful for return value) - lei_to_figis.insert(lei.clone(), figis); - - processed += 1; - if processed % 100 == 0 { - println!("Processed {}/{} LEIs → {} total equity FIGIs", processed, total, lei_to_figis.values().map(|v| v.len()).sum::()); - } - - tokio::time::sleep(Duration::from_millis(100)).await; - } - - println!("Completed LEI→FIGI map: {} mappings (equity-only)", lei_to_figis.len()); - Ok(lei_to_figis) -} - /// Loads LEI-to-FigiInfo map from a JSON Lines file. /// /// Each line is expected to be a JSON object: {"lei": "ABC", "figis": [FigiInfo...]} @@ -436,60 +358,396 @@ fn append_lei_to_figi_jsonl(path: &Path, lei: &str, figis: &[FigiInfo]) -> anyho Ok(()) } -/// Loads or builds a list of CompanyMetadata using LEI as the primary key. +/// Loads or builds a HashMap of CompanyInfo objects indexed by company name. /// -/// Attempts to load pre-built company metadata from "data/companies_by_lei/companies_lei.json". -/// If the cache does not exist, builds the metadata by first obtaining the LEI-to-FigiInfo map -/// (loading or fetching via OpenFIGI if necessary), then constructs CompanyMetadata for each LEI. +/// This function: +/// 1. Attempts to load existing companies from cache +/// 2. If cache exists, updates/extends it with new data from figi_to_lei +/// 3. If no cache exists, creates a new HashMap from scratch +/// 4. Saves the result back to cache /// -/// Only includes LEIs that have associated ISINs from the input map. If no FigiInfos are available -/// for a LEI (e.g., no equity listings), the `figi` field will be `None`. +/// For existing entries (matched by name): +/// - Merges securities lists (deduplicates by FIGI) +/// - Updates primary_isin if the existing one is empty or not in the securities list +/// +/// For new entries: +/// - Adds them to the HashMap +/// +/// Companies with no FigiInfo data are skipped. +/// The resulting HashMap is saved to `data/companies_by_name/companies.json`. /// /// # Arguments -/// -/// * `lei_to_isins` - Mapping of LEI to associated ISINs (used for building the FigiInfo map if needed). +/// * `figi_to_lei` - HashMap mapping LEI to Vec. 
/// /// # Returns -/// -/// A vector of `CompanyMetadata` structs, sorted by LEI. +/// A HashMap mapping company name to CompanyInfo. /// /// # Errors +/// Returns an error if file I/O fails or JSON serialization fails. +pub async fn load_or_build_companies_by_name( + figi_to_lei: &HashMap> +) -> anyhow::Result> { + // Try to load existing cache + let mut companies_by_name = match load_companies_by_name_internal().await? { + Some(existing) => { + println!("Loaded {} existing companies from cache", existing.len()); + existing + }, + None => { + println!("No existing cache found, creating new companies HashMap"); + HashMap::new() + } + }; + + let initial_count = companies_by_name.len(); + let mut added_count = 0; + let mut updated_count = 0; + + println!("Processing {} LEI entries from FIGI data...", figi_to_lei.len()); + + for (lei, figi_infos) in figi_to_lei.iter() { + // Skip entries with no FigiInfo data + if figi_infos.is_empty() { + continue; + } + + // Get company name from first FigiInfo entry + let name = figi_infos[0].name.clone(); + if name.is_empty() { + continue; + } + + // Check if company already exists + if let Some(existing_company) = companies_by_name.get_mut(&name) { + // Update existing entry + let merged_securities = merge_securities(&existing_company.securities, figi_infos); + let securities_added = merged_securities.len() - existing_company.securities.len(); + + if securities_added > 0 { + existing_company.securities = merged_securities; + + // Update primary_isin if needed + if existing_company.primary_isin.is_empty() || + !existing_company.securities.iter().any(|s| s.isin == existing_company.primary_isin) { + existing_company.primary_isin = existing_company.securities[0].isin.clone(); + } + + updated_count += 1; + } + } else { + // Add new entry + let primary_isin = figi_infos[0].isin.clone(); + let securities = figi_infos.clone(); + + let company_info = CompanyInfo { + name: name.clone(), + primary_isin, + securities, + }; + + companies_by_name.insert(name, company_info); + added_count += 1; + } + } + + println!(" Companies statistics:"); + println!(" - Initial: {}", initial_count); + println!(" - Added: {}", added_count); + println!(" - Updated: {}", updated_count); + println!(" - Total: {}", companies_by_name.len()); + + // Save to JSON + save_companies_by_name(&companies_by_name).await?; + + Ok(companies_by_name) +} + +/// Merges two lists of FigiInfo, deduplicating by FIGI. /// -/// Returns an error if file I/O fails, JSON serialization/deserialization fails, -/// or if building the LEI-to-FigiInfo map encounters issues (e.g., API errors). 
-pub async fn load_or_build_companies_lei( - lei_to_isins: &HashMap>, -) -> anyhow::Result> { - let cache_path = Path::new("data/companies_by_lei/companies_lei.json"); - - if cache_path.exists() { - let content = tokio_fs::read_to_string(cache_path).await.context("Failed to read companies cache")?; - let mut companies: Vec = serde_json::from_str(&content).context("Failed to parse companies JSON")?; - companies.sort_by_key(|c| c.lei.clone()); - println!("Loaded {} LEI-keyed companies from cache.", companies.len()); - return Ok(companies); +/// # Arguments +/// * `existing` - Existing securities list +/// * `new_securities` - New securities to merge +/// +/// # Returns +/// Merged and deduplicated list of FigiInfo +fn merge_securities(existing: &[FigiInfo], new_securities: &[FigiInfo]) -> Vec { + let mut merged = existing.to_vec(); + let existing_figis: HashSet = existing.iter() + .map(|f| f.figi.clone()) + .collect(); + + for new_sec in new_securities { + if !existing_figis.contains(&new_sec.figi) { + merged.push(new_sec.clone()); + } } + + // Sort by FIGI for consistency + merged.sort_by(|a, b| a.figi.cmp(&b.figi)); + + merged +} - // Build or load the LEI-to-FigiInfo map (with incremental persistence) - let lei_to_figi = load_or_build_lei_to_figi_infos(lei_to_isins).await?; - - // Build companies from all LEIs in lei_to_isins (even if no FigiInfos) - let mut companies = Vec::new(); - for lei in lei_to_isins.keys() { - let figis = lei_to_figi.get(lei).cloned(); - companies.push(CompanyMetadata { - lei: lei.clone(), - figi: figis.and_then(|v| if v.is_empty() { None } else { Some(v) }), - }); +/// Internal function to load the companies HashMap from cache. +/// +/// # Returns +/// Some(HashMap) if the cache file exists and is valid, None otherwise. +/// +/// # Errors +/// Returns an error if file I/O fails or JSON parsing fails. +async fn load_companies_by_name_internal() -> anyhow::Result>> { + let cache_file = Path::new("data/companies_by_name/companies.json"); + + if !cache_file.exists() { + return Ok(None); } + + let content = tokio_fs::read_to_string(cache_file).await + .context("Failed to read companies.json")?; + + let companies: HashMap = serde_json::from_str(&content) + .context("Failed to parse companies.json")?; + + Ok(Some(companies)) +} - companies.sort_by_key(|c| c.lei.clone()); +/// Saves the companies HashMap to cache. +/// +/// # Arguments +/// * `companies` - The companies HashMap to save +/// +/// # Errors +/// Returns an error if file I/O fails or JSON serialization fails. +async fn save_companies_by_name(companies: &HashMap) -> anyhow::Result<()> { + let cache_dir = Path::new("data/companies_by_name"); + tokio_fs::create_dir_all(cache_dir).await + .context("Failed to create data/companies_by_name directory")?; + + let cache_file = cache_dir.join("companies.json"); + let json_str = serde_json::to_string_pretty(&companies) + .context("Failed to serialize companies to JSON")?; + + tokio_fs::write(&cache_file, json_str).await + .context("Failed to write companies.json")?; + + println!(" ✓ Saved {} companies to {}", companies.len(), cache_file.display()); + + Ok(()) +} - // Cache the result - let data_dir = Path::new("data"); - tokio_fs::create_dir_all(data_dir).await.context("Failed to create data directory")?; - tokio_fs::write(cache_path, serde_json::to_string_pretty(&companies)?).await.context("Failed to write companies cache")?; +/// Loads all OpenFIGI mapping value lists (marketSecDes, micCode, securityType). 
+/// +/// This function fetches the available values for each mapping parameter from the OpenFIGI API +/// and caches them as JSON files in `data/openfigi/`. If the files already exist and are recent +/// (less than 30 days old), they are reused instead of re-fetching. +/// +/// # Returns +/// Ok(()) on success. +/// +/// # Errors +/// Returns an error if API requests fail, JSON parsing fails, or file I/O fails. +pub async fn load_figi_type_lists() -> anyhow::Result<()> { + println!("Loading OpenFIGI mapping value lists..."); + + let client = OpenFigiClient::new()?; + + // Create cache directory + let cache_dir = Path::new("data/openfigi"); + tokio_fs::create_dir_all(cache_dir).await + .context("Failed to create data/openfigi directory")?; + + // Fetch each type list + get_figi_market_sec_des(&client, cache_dir).await?; + get_figi_mic_code(&client, cache_dir).await?; + get_figi_security_type(&client, cache_dir).await?; + + println!("OpenFIGI mapping value lists loaded successfully"); + Ok(()) +} - println!("Built and cached {} LEI-keyed companies.", companies.len()); - Ok(companies) +/// Fetches and caches the list of valid marketSecDes values. +/// +/// # Arguments +/// * `client` - The OpenFIGI client instance. +/// * `cache_dir` - Directory to save the cached JSON file. +/// +/// # Returns +/// Ok(()) on success. +/// +/// # Errors +/// Returns an error if the API request fails or file I/O fails. +async fn get_figi_market_sec_des(client: &OpenFigiClient, cache_dir: &Path) -> anyhow::Result<()> { + let cache_file = cache_dir.join("marketSecDes.json"); + + // Check if cache exists and is recent (< 30 days old) + if should_use_cache(&cache_file).await? { + println!(" Using cached marketSecDes values"); + return Ok(()); + } + + println!(" Fetching marketSecDes values from OpenFIGI API..."); + + let resp = client.client + .get("https://api.openfigi.com/v3/mapping/values/marketSecDes") + .send() + .await + .context("Failed to fetch marketSecDes values")?; + + handle_rate_limit(&resp).await?; + + let values: Value = resp.json().await + .context("Failed to parse marketSecDes response")?; + + // Save to cache + let json_str = serde_json::to_string_pretty(&values)?; + tokio_fs::write(&cache_file, json_str).await + .context("Failed to write marketSecDes cache")?; + + println!(" ✓ Cached marketSecDes values"); + + // Respect rate limits + sleep(Duration::from_millis(if client.has_key { 240 } else { 2400 })).await; + + Ok(()) +} + +/// Fetches and caches the list of valid micCode values. +/// +/// # Arguments +/// * `client` - The OpenFIGI client instance. +/// * `cache_dir` - Directory to save the cached JSON file. +/// +/// # Returns +/// Ok(()) on success. +/// +/// # Errors +/// Returns an error if the API request fails or file I/O fails. +async fn get_figi_mic_code(client: &OpenFigiClient, cache_dir: &Path) -> anyhow::Result<()> { + let cache_file = cache_dir.join("micCode.json"); + + if should_use_cache(&cache_file).await? 
{ + println!(" Using cached micCode values"); + return Ok(()); + } + + println!(" Fetching micCode values from OpenFIGI API..."); + + let resp = client.client + .get("https://api.openfigi.com/v3/mapping/values/micCode") + .send() + .await + .context("Failed to fetch micCode values")?; + + handle_rate_limit(&resp).await?; + + let values: Value = resp.json().await + .context("Failed to parse micCode response")?; + + let json_str = serde_json::to_string_pretty(&values)?; + tokio_fs::write(&cache_file, json_str).await + .context("Failed to write micCode cache")?; + + println!(" ✓ Cached micCode values"); + + sleep(Duration::from_millis(if client.has_key { 240 } else { 2400 })).await; + + Ok(()) +} + +/// Fetches and caches the list of valid securityType values. +/// +/// # Arguments +/// * `client` - The OpenFIGI client instance. +/// * `cache_dir` - Directory to save the cached JSON file. +/// +/// # Returns +/// Ok(()) on success. +/// +/// # Errors +/// Returns an error if the API request fails or file I/O fails. +async fn get_figi_security_type(client: &OpenFigiClient, cache_dir: &Path) -> anyhow::Result<()> { + let cache_file = cache_dir.join("securityType.json"); + + if should_use_cache(&cache_file).await? { + println!(" Using cached securityType values"); + return Ok(()); + } + + println!(" Fetching securityType values from OpenFIGI API..."); + + let resp = client.client + .get("https://api.openfigi.com/v3/mapping/values/securityType") + .send() + .await + .context("Failed to fetch securityType values")?; + + handle_rate_limit(&resp).await?; + + let values: Value = resp.json().await + .context("Failed to parse securityType response")?; + + let json_str = serde_json::to_string_pretty(&values)?; + tokio_fs::write(&cache_file, json_str).await + .context("Failed to write securityType cache")?; + + println!(" ✓ Cached securityType values"); + + sleep(Duration::from_millis(if client.has_key { 240 } else { 2400 })).await; + + Ok(()) +} + +/// Checks if a cache file exists and is less than 30 days old. +/// +/// # Arguments +/// * `path` - Path to the cache file. +/// +/// # Returns +/// True if the cache should be used, false if it needs refreshing. +async fn should_use_cache(path: &Path) -> anyhow::Result { + if !path.exists() { + return Ok(false); + } + + let metadata = tokio_fs::metadata(path).await?; + let modified = metadata.modified()?; + let age = modified.elapsed().unwrap_or(std::time::Duration::from_secs(u64::MAX)); + + // Cache is valid for 30 days + Ok(age < std::time::Duration::from_secs(30 * 24 * 60 * 60)) +} + +/// Handles rate limit responses from the OpenFIGI API. +/// +/// If a 429 status is received, this function sleeps for the duration specified +/// in the `ratelimit-reset` header (or 10 seconds by default). +/// +/// # Arguments +/// * `resp` - The HTTP response to check. +/// +/// # Returns +/// Ok(()) if no rate limit, or after waiting for the reset period. +/// +/// # Errors +/// Returns an error if the response status indicates a non-rate-limit error. 
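Note that on HTTP 429 the helper below sleeps for the reset window and then still returns an error, so the surrounding request fails unless the caller re-issues it. A hypothetical retry wrapper, sketch only (`fetch_with_retry` is not part of this diff):

```rust
async fn fetch_with_retry(client: &reqwest::Client, url: &str) -> anyhow::Result<serde_json::Value> {
    for attempt in 1..=3 {
        let resp = client.get(url).send().await?;
        match handle_rate_limit(&resp).await {
            // Not rate limited: consume the body and return it.
            Ok(()) => return Ok(resp.json().await?),
            // Rate limited or another API error: retry (the helper already slept on 429).
            Err(e) if attempt < 3 => eprintln!("  attempt {} failed: {}", attempt, e),
            Err(e) => return Err(e),
        }
    }
    unreachable!("the loop always returns on the final attempt")
}
```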
+async fn handle_rate_limit(resp: &reqwest::Response) -> anyhow::Result<()> { + let status = resp.status(); + + if status == 429 { + let headers = resp.headers(); + let reset_sec = headers + .get("ratelimit-reset") + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::().ok()) + .unwrap_or(10); + + println!(" Rate limited—waiting {}s", reset_sec); + sleep(std::time::Duration::from_secs(reset_sec.max(10))).await; + + return Err(anyhow!("Rate limited, please retry")); + } else if status.is_client_error() || status.is_server_error() { + return Err(anyhow!("OpenFIGI API error: {}", status)); + } + + Ok(()) } \ No newline at end of file diff --git a/src/corporate/scraper.rs b/src/corporate/scraper.rs index 1501112..a0d2b8c 100644 --- a/src/corporate/scraper.rs +++ b/src/corporate/scraper.rs @@ -1,18 +1,17 @@ -use crate::corporate::openfigi::OpenFigiClient; - // src/corporate/scraper.rs -use super::{types::*, helpers::*}; -use csv::ReaderBuilder; +use super::{types::*, helpers::*, openfigi::*}; +//use crate::corporate::openfigi::OpenFigiClient; +use crate::{scraper::webdriver::*}; use fantoccini::{Client, Locator}; use scraper::{Html, Selector}; -use chrono::{DateTime, Duration, NaiveDate, Timelike, Utc}; +use chrono::{DateTime, Duration, NaiveDate, Utc}; use tokio::{time::{Duration as TokioDuration, sleep}}; use reqwest::Client as HttpClient; use serde_json::{json, Value}; use zip::ZipArchive; -use std::fs::File; -use std::{collections::HashMap}; -use std::io::{Read, BufReader}; +use std::{collections::HashMap, sync::Arc}; +use std::io::{Read}; +use anyhow::{anyhow, Result}; const USER_AGENT: &str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"; @@ -333,28 +332,6 @@ fn exchange_name_to_mic(name: &str) -> String { }.to_string() } -pub async fn dismiss_yahoo_consent(client: &Client) -> anyhow::Result<()> { - let script = r#" - (() => { - const agree = document.querySelector('button[name="agree"]'); - if (agree) { - agree.click(); - return true; - } - return false; - })() - "#; - - for _ in 0..10 { - let done: bool = client.execute(script, vec![]).await?.as_bool().unwrap_or(false); - if done { - break; - } - sleep(TokioDuration::from_millis(500)).await; - } - Ok(()) -} - /// Fetches earnings events for a ticker using a dedicated ScrapeTask. /// /// This function creates and executes a ScrapeTask to navigate to the Yahoo Finance earnings calendar, @@ -368,74 +345,137 @@ pub async fn dismiss_yahoo_consent(client: &Client) -> anyhow::Result<()> { /// /// # Errors /// Returns an error if the task execution fails, e.g., chromedriver spawn or navigation issues. 
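A hypothetical call site for the pool-based replacement defined below; in the real flow the pool is created once in `main` and shared, so constructing one here is only for illustration:

```rust
use std::sync::Arc;

async fn earnings_for(ticker: &str) -> anyhow::Result<()> {
    // Sketch only: the pool size is arbitrary here.
    let pool = Arc::new(ChromeDriverPool::new(2).await?);
    let events = fetch_earnings_with_pool(ticker, &pool).await?;
    println!("{} earnings rows for {}", events.len(), ticker);
    Ok(())
}
```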
-pub async fn get_earnings_events_task(ticker: &str) -> anyhow::Result> { +pub async fn fetch_earnings_with_pool( + ticker: &str, + pool: &Arc, +) -> anyhow::Result> { + let ticker = ticker.to_string(); let url = format!("https://finance.yahoo.com/calendar/earnings?symbol={}", ticker); - let task: ScrapeTask> = ScrapeTask::new( - url, - |client| Box::pin(async move { - reject_yahoo_cookies(client).await?; - extract_earnings(client).await // Assuming extract_earnings is an async fn that uses client - }), - ); - task.execute().await + + let ticker_cloned = ticker.clone(); + + pool.execute(url, move |client| { + let ticker = ticker_cloned.clone(); + Box::pin(async move { + reject_yahoo_cookies(&client).await?; + extract_earnings_events(&client, &ticker).await + }) + }).await } -pub async fn fetch_earnings_history(client: &Client, ticker: &str) -> anyhow::Result> { - let url = format!("https://finance.yahoo.com/calendar/earnings?symbol={}&offset=0&size=100", ticker); - client.goto(&url).await?; - dismiss_yahoo_consent(client).await?; +/// Extracts earnings events from the currently loaded Yahoo Finance earnings calendar page. +/// +/// This function assumes the client is already navigated to the correct URL (e.g., +/// https://finance.yahoo.com/calendar/earnings?symbol={ticker}) and cookies are handled. +/// +/// It waits for the earnings table, extracts rows, parses cells into CompanyEvent structs, +/// and handles date parsing, float parsing, and optional fields. +/// +/// # Arguments +/// * `client` - The fantoccini Client with the page loaded. +/// * `ticker` - The stock ticker symbol for the events. +/// +/// # Returns +/// A vector of CompanyEvent on success. +/// +/// # Errors +/// Returns an error if: +/// - Table or elements not found. +/// - Date or float parsing fails. +/// - WebDriver operations fail. +/// +/// # Examples +/// +/// ```no_run +/// use fantoccini::Client; +/// use crate::corporate::scraper::extract_earnings; +/// +/// #[tokio::main] +/// async fn main() -> Result<()> { +/// // Assume client is set up and navigated +/// let events = extract_earnings(&client, "AAPL").await?; +/// Ok(()) +/// } +/// ``` +pub async fn extract_earnings_events(client: &Client, ticker: &str) -> Result> { + // Wait for the table to load + let table = client + .wait() + .for_element(Locator::Css(r#"table[data-test="cal-table"]"#)) + .await + .map_err(|e| anyhow!("Failed to find earnings table: {}", e))?; - loop { - match client.find(Locator::XPath(r#"//button[contains(text(), 'Show More')]"#)).await { - Ok(btn) => { - btn.click().await?; - sleep(TokioDuration::from_secs(2)).await; - } - Err(_) => break, + // Find all rows in tbody + let rows = table + .find_all(Locator::Css("tbody tr")) + .await + .map_err(|e| anyhow!("Failed to find table rows: {}", e))?; + + let mut events = Vec::with_capacity(rows.len()); + + for row in rows { + let cells = row + .find_all(Locator::Css("td")) + .await + .map_err(|e| anyhow!("Failed to find cells in row: {}", e))?; + + if cells.len() < 5 { + continue; // Skip incomplete rows } - } - let html = client.source().await?; - let document = Html::parse_document(&html); - let row_sel = Selector::parse("table tbody tr").unwrap(); - let mut events = Vec::new(); + // Extract and parse date + let date_str = cells[0] + .text() + .await + .map_err(|e| anyhow!("Failed to get date text: {}", e))?; + let date = parse_yahoo_date(&date_str) + .map_err(|e| anyhow!("Failed to parse date '{}': {}", date_str, e))? 
+ .format("%Y-%m-%d") + .to_string(); - for row in document.select(&row_sel) { - let cols: Vec = row.select(&Selector::parse("td").unwrap()) - .map(|td| td.text().collect::>().join(" ").trim().to_string()) - .collect(); - if cols.len() < 6 { continue; } + // Extract time, replace "Time Not Supplied" with empty + let time = cells[1] + .text() + .await + .map_err(|e| anyhow!("Failed to get time text: {}", e))? + .replace("Time Not Supplied", ""); - let full_date = &cols[2]; - let parts: Vec<&str> = full_date.split(" at ").collect(); - let raw_date = parts[0].trim(); - let time_str = if parts.len() > 1 { parts[1].trim() } else { "" }; + // Extract period + let period = cells[2] + .text() + .await + .map_err(|e| anyhow!("Failed to get period text: {}", e))?; - let date = match parse_yahoo_date(raw_date) { - Ok(d) => d, - Err(_) => continue, - }; + // Parse EPS forecast + let eps_forecast_str = cells[3] + .text() + .await + .map_err(|e| anyhow!("Failed to get EPS forecast text: {}", e))?; + let eps_forecast = parse_float(&eps_forecast_str); - let eps_forecast = parse_float(&cols[3]); - let eps_actual = if cols[4] == "-" { None } else { parse_float(&cols[4]) }; + // Parse EPS actual + let eps_actual_str = cells[4] + .text() + .await + .map_err(|e| anyhow!("Failed to get EPS actual text: {}", e))?; + let eps_actual = parse_float(&eps_actual_str); - let surprise_pct = if let (Some(f), Some(a)) = (eps_forecast, eps_actual) { - if f.abs() > 0.001 { Some((a - f) / f.abs() * 100.0) } else { None } - } else { None }; - - let time = if time_str.contains("PM") { - "AMC".to_string() - } else if time_str.contains("AM") { - "BMO".to_string() + // Parse surprise % if available + let surprise_pct = if cells.len() > 5 { + let surprise_str = cells[5] + .text() + .await + .map_err(|e| anyhow!("Failed to get surprise text: {}", e))?; + parse_float(&surprise_str) } else { - "".to_string() + None }; events.push(CompanyEvent { ticker: ticker.to_string(), - date: date.format("%Y-%m-%d").to_string(), + date, time, - period: "".to_string(), + period, eps_forecast, eps_actual, revenue_forecast: None, @@ -445,6 +485,12 @@ pub async fn fetch_earnings_history(client: &Client, ticker: &str) -> anyhow::Re }); } + if events.is_empty() { + eprintln!("Warning: No earnings events extracted for ticker {}", ticker); + } else { + println!("Extracted {} earnings events for {}", events.len(), ticker); + } + Ok(events) } @@ -768,57 +814,6 @@ pub async fn load_isin_lei_csv() -> anyhow::Result>> Ok(map) } -pub async fn get_primary_isin_and_name( - client: &Client, // Pass your existing Selenium client - ticker: &str, -) -> anyhow::Result { - // Navigate to the actual quote page (always works) - let quote_url = format!("https://finance.yahoo.com/quote/{}", ticker); - client.goto("e_url).await?; - - // Dismiss overlays/banners (your function + guce-specific) - reject_yahoo_cookies(client).await?; - - // Wait for page to load (key data elements) - sleep(TokioDuration::from_millis(2000)).await; - - // Get page HTML and parse - let html = client.source().await?; - let document = Html::parse_document(&html); - - // Selectors for key fields (tested on real Yahoo pages Nov 2025) - let name_sel = Selector::parse("h1[data-testid='qsp-price-header']").unwrap_or_else(|_| Selector::parse("h1").unwrap()); - let isin_sel = Selector::parse("[data-testid='qsp-symbol'] + div [data-field='isin']").unwrap_or_else(|_| Selector::parse("[data-field='isin']").unwrap()); - let exchange_sel = Selector::parse("[data-testid='qsp-market'] span").unwrap_or_else(|_| 
Selector::parse(".TopNav__Exchange").unwrap()); - let currency_sel = Selector::parse("[data-testid='qsp-price'] span:contains('USD')").unwrap_or_else(|_| Selector::parse(".TopNav__Currency").unwrap()); // Adjust for dynamic - - let name_elem = document.select(&name_sel).next().map(|e| e.text().collect::().trim().to_string()); - let isin_elem = document.select(&isin_sel).next().map(|e| e.text().collect::().trim().to_uppercase()); - let exchange_elem = document.select(&exchange_sel).next().map(|e| e.text().collect::().trim().to_string()); - let currency_elem = document.select(¤cy_sel).next().map(|e| e.text().collect::().trim().to_string()); - - let name = name_elem.unwrap_or_else(|| ticker.to_string()); - let isin = isin_elem.unwrap_or_default(); - let exchange_mic = exchange_elem.unwrap_or_default(); - let currency = currency_elem.unwrap_or_else(|| "USD".to_string()); - - // Validate ISIN - let valid_isin = if isin.len() == 12 && isin.chars().all(|c| c.is_alphanumeric()) { - isin - } else { - "".to_string() - }; - - println!(" → Scraped {}: {} | ISIN: {} | Exchange: {}", ticker, name, valid_isin, exchange_mic); - - Ok(PrimaryInfo { - isin: valid_isin, - name, - exchange_mic, - currency, - }) -} - pub async fn reject_yahoo_cookies(client: &Client) -> anyhow::Result<()> { for _ in 0..10 { let clicked: bool = client diff --git a/src/corporate/storage.rs b/src/corporate/storage.rs index 6425246..ec1610e 100644 --- a/src/corporate/storage.rs +++ b/src/corporate/storage.rs @@ -1,5 +1,5 @@ // src/corporate/storage.rs -use super::{types::*, helpers::*, scraper::get_primary_isin_and_name}; +use super::{types::*, helpers::*}; use crate::config; use tokio::fs; @@ -102,17 +102,6 @@ pub async fn save_prices_for_ticker(ticker: &str, timeframe: &str, mut prices: V Ok(()) } -pub async fn _load_companies() -> Result, anyhow::Error> { - let path = Path::new("src/data/companies.json"); - if !path.exists() { - println!("Missing companies.json file at src/data/companies.json"); - return Ok(vec![]); - } - let content = fs::read_to_string(path).await?; - let companies: Vec = serde_json::from_str(&content)?; - Ok(companies) -} - pub fn get_company_dir(lei: &str) -> PathBuf { PathBuf::from("corporate_prices").join(lei) } @@ -132,20 +121,6 @@ pub async fn ensure_company_dirs(isin: &str) -> anyhow::Result<()> { Ok(()) } -pub async fn save_company_metadata(company: &CompanyMetadata) -> anyhow::Result<()> { - let dir = get_company_dir(&company.lei); - fs::create_dir_all(&dir).await?; - let path = dir.join("metadata.json"); - fs::write(&path, serde_json::to_string_pretty(company)?).await?; - Ok(()) -} - -pub async fn load_company_metadata(lei: &str) -> anyhow::Result { - let path = get_company_dir(lei).join("metadata.json"); - let content = fs::read_to_string(path).await?; - Ok(serde_json::from_str(&content)?) 
-} - pub async fn save_available_exchanges(isin: &str, exchanges: Vec) -> anyhow::Result<()> { let dir = get_company_dir(isin); fs::create_dir_all(&dir).await?; diff --git a/src/corporate/update.rs b/src/corporate/update.rs index 486e637..e202cf7 100644 --- a/src/corporate/update.rs +++ b/src/corporate/update.rs @@ -1,35 +1,33 @@ // src/corporate/update.rs use super::{scraper::*, storage::*, helpers::*, types::*, aggregation::*, openfigi::*}; use crate::config::Config; +use crate::scraper::webdriver::ChromeDriverPool; use chrono::Local; use std::collections::{HashMap, HashSet}; +use std::sync::Arc; +use futures::{stream::{self, StreamExt}}; -/// Hauptfunktion: Vollständiger Update-Durchlauf für alle Unternehmen (LEI-basiert) +/// Main function: Full update for all companies (LEI-based) with optimized parallel execution. /// -/// Diese Funktion koordiniert den gesamten Update-Prozess: -/// - Lädt GLEIF-Mappings -/// - Baut FIGI-LEI-Map -/// - Lädt bestehende Events -/// - Verarbeitet jede Company: Ergänzt ISINs (abgeleitet aus FIGI), entdeckt Exchanges via FIGI, -/// holt Prices & Earnings, aggregiert Daten -/// - Speichert optimierte Events +/// This function coordinates the entire update process: +/// - Loads GLEIF mappings +/// - Builds FIGI-LEI map +/// - Loads existing events +/// - Processes each company: discovers exchanges via FIGI, fetches prices & earnings, aggregates data +/// - Uses the provided shared ChromeDriver pool for efficient parallel scraping +/// - Saves optimized events /// /// # Arguments -/// * `config` - Konfiguration mit Startdaten etc. -/// -/// # Returns -/// `Ok(())` bei Erfolg, sonst `anyhow::Error` mit Kontext. +/// * `config` - The application configuration. +/// * `pool` - Shared pool of ChromeDriver instances for scraping. /// /// # Errors -/// - Mapping-Laden fehlschlägt (Warning, fährt mit leer fort) -/// - Company-Laden/Bauen fehlschlägt -/// - Directory Creation oder Speichern fehlschlägt -/// - Discovery/Fetch/Aggregation pro Company fehlschlägt (fortgesetzt bei Fehlern, mit Log) -pub async fn run_full_update(config: &Config) -> anyhow::Result<()> { +/// Returns an error if any step in the update process fails. +pub async fn run_full_update(config: &Config, pool: &Arc) -> anyhow::Result<()> { println!("=== Starting LEI-based corporate full update ==="); - // 1. Frisches GLEIF ISIN ↔ LEI Mapping laden (jeder Lauf neu) + // 1. Load fresh GLEIF ISIN ↔ LEI mapping let lei_to_isins: HashMap> = match load_isin_lei_csv().await { Ok(map) => map, Err(e) => { @@ -38,8 +36,16 @@ pub async fn run_full_update(config: &Config) -> anyhow::Result<()> { } }; - // 2. FIGI → LEI Map (optional, nur mit API-Key sinnvoll) - let figi_to_lei= match build_lei_to_figi_infos(&lei_to_isins).await { + // 2. Load OpenFIGI mapping value lists (cached) + if let Err(e) = load_figi_type_lists().await { + eprintln!("Warning: Could not load OpenFIGI type lists: {}", e); + } + + // 3. Build FIGI → LEI map + // # Attributes + // * lei: Structuring the companies by legal dependencies [LEI -> Vec] + // * figi: metadata with ISIN as key + let figi_to_lei:HashMap> = match build_lei_to_figi_infos(&lei_to_isins).await { Ok(map) => map, Err(e) => { eprintln!("Warning: Could not build FIGI→LEI map: {}", e); @@ -47,7 +53,11 @@ pub async fn run_full_update(config: &Config) -> anyhow::Result<()> { } }; - // 3. Bestehende Earnings-Events laden (für Change-Detection) + // 4. 
Load or build companies + let mut companies = load_or_build_companies_by_name(&figi_to_lei).await?; + println!("Processing {} companies", companies.len()); + + // 5. Load existing earnings events (for change detection) let today = Local::now().format("%Y-%m-%d").to_string(); let mut existing_events = match load_existing_events().await { Ok(events) => events, @@ -57,162 +67,47 @@ pub async fn run_full_update(config: &Config) -> anyhow::Result<()> { } }; - // 4. Unternehmen laden / neu aufbauen (LEI + FIGI-Infos) - let mut companies: Vec = load_or_build_companies_lei(&lei_to_isins).await?; + // 5. Use the provided pool (no need to create a new one) + let pool_size = pool.get_number_of_instances(); // Use the size from the shared pool - // 4.1 LEIs anreichern (falls missing, über bekannte ISINs aus FIGI suchen) - //enrich_companies_with_leis(&mut companies, &lei_to_isins).await?; - - // 5. Haupt-Loop: Jedes Unternehmen verarbeiten - for company in companies.iter_mut() { - let lei = &company.lei; - let figi_infos = company.figi.as_ref().map_or(&[][..], |v| &v[..]); - let name = figi_infos.first().map(|f| f.name.as_str()).unwrap_or("Unknown"); - println!("\nProcessing company: {} (LEI: {})", name, lei); - - // --- 5.1 Alle bekannten ISINs aus GLEIF ergänzen --- - let mut all_isins = lei_to_isins.get(lei).cloned().unwrap_or_default(); - let figi_isins: Vec = figi_infos.iter().map(|f| f.isin.clone()).collect::>().into_iter().collect(); - all_isins.extend(figi_isins); - all_isins.sort(); - all_isins.dedup(); // Unique ISINs - - // --- 5.2 Verzeichnisstruktur anlegen & Metadaten speichern --- - ensure_company_dirs(lei).await?; - save_company_metadata(company).await?; - - // --- 5.3 FIGI-Infos ermitteln (falls noch nicht vorhanden) --- - let figi_infos = company.figi.get_or_insert_with(Vec::new); - if figi_infos.is_empty() { - println!(" No FIGI data yet → discovering exchanges via first known ISIN"); - let first_isin = all_isins.first().cloned().unwrap_or_default(); - if !first_isin.is_empty() { - match discover_available_exchanges(&first_isin, "").await { - Ok(discovered) => { - figi_infos.extend(discovered); - println!(" Discovered {} exchange(s) for first ISIN", figi_infos.len()); - } - Err(e) => eprintln!(" Discovery failed for first ISIN: {}", e), - } + // Process companies in parallel using the shared pool + /*let results: Vec<_> = stream::iter(companies.into_iter()) + .map(|company| { + let pool_clone = pool.clone(); + async move { + process_company_data(&company, &pool_clone, &mut existing_events).await } - } else { - println!(" {} exchange(s) already known", figi_infos.len()); + }) + .buffer_unordered(pool_size) + .collect().await; + + // Handle results (e.g., collect changes) + let mut all_changes = Vec::new(); + for result in results { + if let Ok(ProcessResult { changes }) = result { + all_changes.extend(changes); } + }*/ - // --- 5.4 Weitere Exchanges über alle ISINs suchen --- - let mut new_discovered = 0; - for isin in &all_isins { - if figi_infos.iter().any(|f| f.isin == *isin) { - continue; // Schon bekannt - } - println!(" Discovering additional exchanges for ISIN {}", isin); - match discover_available_exchanges(isin, "").await { - Ok(mut found) => { - for info in found.drain(..) 
{ - if !figi_infos.iter().any(|f| f.ticker == info.ticker && f.mic_code == info.mic_code) { - figi_infos.push(info); - new_discovered += 1; - } - } - } - Err(e) => eprintln!(" Discovery failed for {}: {}", isin, e), - } - } - if new_discovered > 0 { - println!(" +{} new exchange(s) discovered and added", new_discovered); - } - - // --- 5.5 AvailableExchange-Einträge anlegen (für Preis-Downloads) --- - for figi in figi_infos.iter() { - if let Err(e) = add_discovered_exchange(&figi.isin, figi).await { - eprintln!(" Failed to record exchange {}: {}", figi.ticker, e); - } - } - - // --- 5.6 Preisdaten von allen Exchanges holen --- - println!(" Fetching price data from {} exchange(s)...", figi_infos.len()); - let primary_isin = figi_infos.first().map(|f| f.isin.clone()).unwrap_or_default(); - for figi in figi_infos.iter() { - let ticker = &figi.ticker; - let mic = &figi.mic_code; - let is_primary = figi.isin == primary_isin; - let mut daily_success = false; - let mut intraday_success = false; - - // Earnings: only fetch from primary ticker to avoid duplicates - if is_primary { - match fetch_earnings_history(client, ticker).await { - Ok(new_events) => { - let result = process_batch(&new_events, &mut existing_events, &today); - save_changes(&result.changes).await?; - println!(" Earnings events: {}", new_events.len()); - } - Err(e) => eprintln!(" Failed to fetch earnings for {}: {}", ticker, e), - } - } - - // Daily prices - match fetch_daily_price_history(ticker, &config.corporate_start_date, &today).await { - Ok(prices) => { - if !prices.is_empty() { - save_prices_by_source(lei, ticker, "daily", prices).await?; - daily_success = true; - } - } - Err(e) => eprintln!(" Failed to fetch daily prices for {}: {}", ticker, e), - } - - // 5-minute intraday (last 60 days) - let sixty_days_ago = (Local::now() - chrono::Duration::days(60)) - .format("%Y-%m-%d") - .to_string(); - match fetch_price_history_5min(ticker, &sixty_days_ago, &today).await { - Ok(prices) => { - if !prices.is_empty() { - save_prices_by_source(lei, ticker, "5min", prices).await?; - intraday_success = true; - } - } - Err(e) => eprintln!(" Failed to fetch 5min prices for {}: {}", ticker, e), - } - - // Update available_exchanges.json (now under LEI folder) - update_available_exchange(&figi.isin, ticker, mic, daily_success, intraday_success).await?; - - tokio::time::sleep(tokio::time::Duration::from_millis(800)).await; - } - - // --- 5.7 Aggregation aller Quellen → einheitliche USD-Preise --- - println!(" Aggregating price data across all sources (FX-adjusted to USD)"); - if let Err(e) = aggregate_best_price_data(lei).await { - eprintln!(" Aggregation failed: {}", e); - } else { - println!(" Aggregation completed successfully"); - } - - // Metadaten erneut speichern (falls FIGIs hinzugefügt wurden) - save_company_metadata(company).await?; - } - - // 6. 
Optimierte Earnings-Events final speichern save_optimized_events(existing_events).await?; - println!("\n=== Corporate full update completed successfully ==="); + //save_changes(&all_changes).await?; + //println!("Corporate update complete — {} changes detected", all_changes.len()); Ok(()) } -/// Companies mit LEIs anreichern -async fn _enrich_companies_with_leis( - companies: &mut Vec, - lei_to_isins: &HashMap>, +async fn assign_leis_from_figi( + companies: &mut [CompanyMetadata], + lei_to_isins: &HashMap> ) -> anyhow::Result<()> { - for company in companies.iter_mut() { - if !company.lei.is_empty() { - continue; - } - + for company in companies { let figi_infos = company.figi.as_ref().map_or(&[][..], |v| &v[..]); - let isins: Vec = figi_infos.iter().map(|f| f.isin.clone()).collect::>().into_iter().collect(); + let isins: Vec = figi_infos + .iter() + .map(|f| f.isin.clone()) + .collect::>() + .into_iter() + .collect(); // Try to find LEI by any known ISIN for isin in &isins { @@ -228,7 +123,7 @@ async fn _enrich_companies_with_leis( } } Ok(()) -} +} pub struct ProcessResult { pub changes: Vec, diff --git a/src/economic/scraper.rs b/src/economic/scraper.rs index e21b064..cac7a40 100644 --- a/src/economic/scraper.rs +++ b/src/economic/scraper.rs @@ -8,7 +8,7 @@ const EXTRACTION_JS: &str = include_str!("extraction_script.js"); pub async fn goto_and_prepare(client: &Client) -> anyhow::Result<()> { client.goto("https://www.finanzen.net/termine/wirtschaftsdaten/").await?; - dismiss_overlays(client).await?; + //dismiss_overlays(client).await?; if let Ok(tab) = client.find(fantoccini::Locator::Css(r#"div[data-sg-tab-item="teletrader-dates-three-stars"]"#)).await { tab.click().await?; @@ -18,7 +18,7 @@ pub async fn goto_and_prepare(client: &Client) -> anyhow::Result<()> { Ok(()) } -pub async fn dismiss_overlays(client: &Client) -> anyhow::Result<()> { +/*pub async fn dismiss_overlays(client: &Client) -> anyhow::Result<()> { for _ in 0..10 { let removed: bool = client .execute( @@ -39,7 +39,7 @@ pub async fn dismiss_overlays(client: &Client) -> anyhow::Result<()> { sleep(Duration::from_millis(500)).await; } Ok(()) -} +}*/ pub async fn set_date_range(client: &Client, start: &str, end: &str) -> anyhow::Result<()> { let script = format!( diff --git a/src/economic/update.rs b/src/economic/update.rs index bae5c6d..8ca6a3d 100644 --- a/src/economic/update.rs +++ b/src/economic/update.rs @@ -1,9 +1,19 @@ // src/economic/update.rs use super::{scraper::*, storage::*, helpers::*, types::*}; -use crate::config::Config; +use crate::{config::Config, scraper::webdriver::ScrapeTask}; +use crate::scraper::webdriver::ChromeDriverPool; use chrono::{Local, NaiveDate}; +use std::sync::Arc; -pub async fn run_full_update(client: &fantoccini::Client, config: &Config) -> anyhow::Result<()> { +/// Runs the full update for economic data, using the provided ChromeDriver pool. +/// +/// # Arguments +/// * `config` - The application configuration. +/// * `pool` - Shared pool of ChromeDriver instances for scraping. +/// +/// # Errors +/// Returns an error if scraping, loading, or saving fails. 
+pub async fn run_full_update(config: &Config, pool: &Arc) -> anyhow::Result<()> { let today_str = chrono::Local::now().date_naive().format("%Y-%m-%d").to_string(); let end_date = config.target_end_date(); @@ -26,36 +36,68 @@ pub async fn run_full_update(client: &fantoccini::Client, config: &Config) -> an println!("Scraping economic events: {} → {}", start_date, end_date); - let mut current = start_date; - let mut total_changes = 0; + // Pass the pool to the scraping function + let new_events_all = scrape_all_economic_events(&start_date, &end_date, pool).await?; - while current <= end_date { - set_date_range(client, ¤t, &end_date).await?; - tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; - - let new_events = extract_events(client).await?; - if new_events.is_empty() { break; } - - let result = process_batch(&new_events, &mut events, &today_str); - total_changes += result.changes.len(); - save_changes(&result.changes).await?; - - let next = new_events.iter() - .filter_map(|e| chrono::NaiveDate::parse_from_str(&e.date, "%Y-%m-%d").ok()) - .max() - .and_then(|d| d.succ_opt()) - .map(|d| d.format("%Y-%m-%d").to_string()) - .unwrap_or(end_date.clone()); - - if next > end_date { break; } - current = next; - } + // Process all at once or in batches + let result = process_batch(&new_events_all, &mut events, &today_str); + let total_changes = result.changes.len(); + save_changes(&result.changes).await?; save_optimized_chunks(events).await?; println!("Economic update complete — {} changes detected", total_changes); Ok(()) } +/// Scrapes all economic events from start to end date using a dedicated ScrapeTask with the provided pool. +/// +/// This function creates a ScrapeTask to navigate to the Finanzen.net page, prepare it, +/// and then loop through date ranges to extract events. +/// +/// # Arguments +/// * `start` - Start date in YYYY-MM-DD. +/// * `end` - End date in YYYY-MM-DD. +/// * `pool` - Shared pool of ChromeDriver instances. +/// +/// # Returns +/// A vector of all extracted EconomicEvent structs. +/// +/// # Errors +/// Returns an error if task execution fails or extraction issues occur. 
+pub async fn scrape_all_economic_events(start: &str, end: &str, pool: &Arc) -> anyhow::Result> { + let url = "https://www.finanzen.net/termine/wirtschaftsdaten/".to_string(); + let start_clone = start.to_string(); + let end_clone = end.to_string(); + + let task = ScrapeTask::new(url, move |client| async move { + goto_and_prepare(&client).await?; + let mut all_events = Vec::new(); + let mut current = start_clone; + + while current <= end_clone { + set_date_range(&client, ¤t, &end_clone).await?; + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + let new_events = extract_events(&client).await?; + if new_events.is_empty() { break; } + all_events.extend(new_events.clone()); + + let next = new_events.iter() + .filter_map(|e| chrono::NaiveDate::parse_from_str(&e.date, "%Y-%m-%d").ok()) + .max() + .and_then(|d| d.succ_opt()) + .map(|d| d.format("%Y-%m-%d").to_string()) + .unwrap_or(end_clone.clone()); + + if next > end_clone { break; } + current = next; + } + Ok(all_events) + }); + + // Use the pool for execution + task.execute_with_pool(pool).await +} + pub fn process_batch( new_events: &[EconomicEvent], existing: &mut std::collections::HashMap, diff --git a/src/main.rs b/src/main.rs index fcbdf45..f6b6d13 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,69 +3,41 @@ mod economic; mod corporate; mod config; mod util; +mod scraper; -use fantoccini::{ClientBuilder}; -use serde_json::{Map, Value}; -use tokio::signal; +use anyhow::Result; +use config::Config; +use scraper::webdriver::ChromeDriverPool; +use std::sync::Arc; +/// The entry point of the application. +/// +/// This function loads the configuration, initializes a shared ChromeDriver pool, +/// and sequentially runs the full updates for corporate and economic data. +/// Sequential execution helps prevent resource exhaustion from concurrent +/// chromedriver instances and avoids spamming the target websites with too many requests. +/// +/// # Errors +/// +/// Returns an error if configuration loading fails, pool initialization fails, +/// or if either update function encounters an issue (e.g., network errors, +/// scraping failures, or chromedriver spawn failures like "program not found"). 
#[tokio::main] -async fn main() -> anyhow::Result<()> { - // === Ensure data directories exist === - util::ensure_data_dirs().await?; +async fn main() -> Result<()> { + let config = Config::load().map_err(|err| { + println!("Failed to load Config .env: {}", err); + err + })?; - // === Load configuration === - let config = config::Config::default(); + // Initialize the shared ChromeDriver pool once + let pool_size = config.max_parallel_tasks; + let pool = Arc::new(ChromeDriverPool::new(pool_size).await?); - // === Start ChromeDriver === - let mut child = std::process::Command::new("chromedriver-win64/chromedriver.exe") - .args(["--port=9515"]) // Level 3 = minimal logs - .spawn()?; + // Run economic update first, passing the shared pool + economic::run_full_update(&config, &pool).await?; - // Build capabilities to hide infobar + enable full rendering - let port = 9515; - let caps_value = serde_json::json!({ - "goog:chromeOptions": { - "args": [ - //"--headless", - "--disable-gpu", - "--disable-notifications", - "--disable-popup-blocking", - "--disable-blink-features=AutomationControlled" - ], - "excludeSwitches": ["enable-automation"] - } - }); + // Then run corporate update, passing the shared pool + corporate::run_full_update(&config, &pool).await?; - let caps_map: Map = caps_value.as_object() - .expect("Capabilities should be a JSON object") - .clone(); - - let mut client = ClientBuilder::native() - .capabilities(caps_map) - .connect(&format!("http://localhost:{}", port)) - .await?; - - // Graceful shutdown - let client_clone = client.clone(); - tokio::spawn(async move { - signal::ctrl_c().await.unwrap(); - client_clone.close().await.ok(); - std::process::exit(0); - }); - - // === Economic Calendar Update === - println!("Updating Economic Calendar (High Impact Only)"); - economic::goto_and_prepare(&client).await?; - economic::run_full_update(&client, &config).await?; - - // === Corporate Earnings Update === - println!("\nUpdating Corporate Earnings"); - corporate::run_full_update(&client, &config).await?; - - // === Cleanup === - client.close().await?; - child.kill()?; - - println!("\nAll data updated successfully!"); Ok(()) } \ No newline at end of file diff --git a/src/scraper/mod.rs b/src/scraper/mod.rs new file mode 100644 index 0000000..3bd0fd2 --- /dev/null +++ b/src/scraper/mod.rs @@ -0,0 +1 @@ +pub mod webdriver; diff --git a/src/scraper/webdriver.rs b/src/scraper/webdriver.rs index a31d3c0..f71bdd4 100644 --- a/src/scraper/webdriver.rs +++ b/src/scraper/webdriver.rs @@ -2,218 +2,268 @@ use anyhow::{anyhow, Context, Result}; use fantoccini::{Client, ClientBuilder}; -use std::process::{Stdio}; +use serde_json::{Map, Value}; +use std::process::Stdio; +use std::sync::Arc; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::{Child, Command}; -use tokio::time::{sleep, Duration}; +use tokio::sync::{Mutex, Semaphore}; +use tokio::time::{Duration, sleep, timeout}; use std::pin::Pin; +/// Manages a pool of ChromeDriver instances for parallel scraping. +/// +/// This struct maintains multiple ChromeDriver processes and allows controlled +/// concurrent access via a semaphore. Instances are reused across tasks to avoid +/// the overhead of spawning new processes. +pub struct ChromeDriverPool { + instances: Vec>>, + semaphore: Arc, +} + +impl ChromeDriverPool { + /// Creates a new pool with the specified number of ChromeDriver instances. 
+ /// + /// # Arguments + /// * `pool_size` - Number of concurrent ChromeDriver instances to maintain + pub async fn new(pool_size: usize) -> Result { + let mut instances = Vec::with_capacity(pool_size); + + println!("Initializing ChromeDriver pool with {} instances...", pool_size); + + for i in 0..pool_size { + match ChromeInstance::new().await { + Ok(instance) => { + println!(" ✓ Instance {} ready", i + 1); + instances.push(Arc::new(Mutex::new(instance))); + } + Err(e) => { + eprintln!(" ✗ Failed to create instance {}: {}", i + 1, e); + // Clean up already created instances + drop(instances); + return Err(e); + } + } + } + + Ok(Self { + instances, + semaphore: Arc::new(Semaphore::new(pool_size)), + }) + } + + /// Executes a scrape task using an available instance from the pool. + pub async fn execute(&self, url: String, parse: F) -> Result + where + T: Send + 'static, + F: FnOnce(Client) -> Fut + Send + 'static, + Fut: std::future::Future> + Send + 'static, + { + // Acquire semaphore permit + let _permit = self.semaphore.acquire().await + .map_err(|_| anyhow!("Semaphore closed"))?; + + // Find an available instance (round-robin or first available) + let instance = self.instances[0].clone(); // Simple: use first, could be round-robin + let mut guard = instance.lock().await; + + // Create a new session for this task + let client = guard.new_session().await?; + + // Release lock while we do the actual scraping + drop(guard); + + // Navigate and parse + client.goto(&url).await.context("Failed to navigate")?; + let result = timeout(Duration::from_secs(60), parse(client)) + .await + .context("Parse function timed out after 60s")??; + + Ok(result) + } + + pub fn get_number_of_instances (&self) -> usize { + self.instances.len() + } +} + /// Represents a single instance of chromedriver process. -/// -/// This struct manages the lifecycle of a chromedriver process, starting it on a random available port -/// and providing a way to connect to it via fantoccini Client. Each instance is independent, allowing -/// for isolated scraping sessions without interference. -/// -/// # Examples -/// -/// ```no_run -/// use crate::scraper::webdriver::ChromeInstance; -/// -/// #[tokio::main] -/// async fn main() -> anyhow::Result<()> { -/// let instance = ChromeInstance::new().await?; -/// let client = instance.new_client().await?; -/// // Use client for scraping -/// client.close().await?; -/// Ok(()) -/// } -/// ``` pub struct ChromeInstance { process: Child, - url: String, + base_url: String, } impl ChromeInstance { - /// Creates a new ChromeInstance by spawning a chromedriver process on a random port. - /// - /// This function spawns chromedriver with `--port=0` to let it choose an available port, - /// reads the stdout to extract the listening URL, and returns the instance if successful. - /// +/// Creates a new ChromeInstance by spawning chromedriver with random port. + /// + /// This spawns `chromedriver --port=0` to avoid port conflicts, reads stdout to extract + /// the listening address, and waits for the success message. If timeout occurs or + /// spawning fails, returns an error with context. + /// /// # Errors - /// - /// Returns an error if: - /// - chromedriver cannot be spawned (e.g., not found in PATH). - /// - Failed to read stdout or parse the listening URL within a reasonable time. + /// + /// Returns an error if chromedriver fails to spawn (e.g., not in PATH, version mismatch), + /// if the process exits early, or if the address/success message isn't found within 30s. 
     pub async fn new() -> Result<Self> {
-        let mut child = Command::new("chromedriver")
-            .arg("--port=0")
+        let mut command = Command::new("chromedriver-win64/chromedriver.exe");
+        command
+            .arg("--port=0") // Use a random available port to support pooling
             .stdout(Stdio::piped())
-            .stderr(Stdio::piped())
-            .spawn()
-            .context("Failed to spawn chromedriver process")?;
+            .stderr(Stdio::piped());
 
-        let stdout = child.stdout.take().context("Failed to take stdout")?;
-        let mut reader = BufReader::new(stdout);
-        let mut line = String::new();
+        let mut process = command
+            .spawn()
+            .context("Failed to spawn chromedriver. Ensure chromedriver-win64/chromedriver.exe is present and executable.")?;
+
+        let mut stdout = BufReader::new(
+            process.stdout.take().context("Failed to capture stdout")?
+        ).lines();
+
+        let mut stderr = BufReader::new(
+            process.stderr.take().context("Failed to capture stderr")?
+        ).lines();
 
         let start_time = std::time::Instant::now();
-        let timeout = Duration::from_secs(10);
+        let mut address: Option<String> = None;
+        let mut success = false;
 
-        loop {
-            if start_time.elapsed() > timeout {
-                let _ = child.kill().await;
-                return Err(anyhow!("Timeout waiting for chromedriver to start"));
+        // Log stderr in the background for debugging
+        tokio::spawn(async move {
+            while let Ok(Some(line)) = stderr.next_line().await {
+                eprintln!("ChromeDriver stderr: {}", line);
             }
+        });
 
-            line.clear();
-            if reader.read_line(&mut line).await.context("Failed to read line from stdout")? == 0 {
-                // EOF reached unexpectedly
-                let mut stderr_output = String::new();
-                if let Some(mut stderr) = child.stderr.take() {
-                    let mut stderr_reader = BufReader::new(&mut stderr);
-                    let mut stderr_line = String::new();
-                    while stderr_reader.read_line(&mut stderr_line).await? > 0 {
-                        stderr_output.push_str(&stderr_line);
-                        stderr_line.clear();
-                    }
+        // Wait for the address and the success message (up to 30s)
+        while start_time.elapsed() < Duration::from_secs(30) {
+            if let Ok(Ok(Some(line))) =
+                timeout(Duration::from_secs(1), stdout.next_line()).await
+            {
+                if let Some(addr) = parse_chromedriver_address(&line) {
+                    address = Some(addr.to_string());
                 }
-                let _ = child.kill().await;
-                return Err(anyhow!("Chromedriver exited unexpectedly. Stderr: {}", stderr_output));
-            }
 
-            if let Some(url) = Self::extract_url(&line) {
-                return Ok(Self {
-                    process: child,
-                    url,
-                });
+                if line.contains("ChromeDriver was started successfully") {
+                    success = true;
+                }
+
+                if let (Some(addr), true) = (&address, success) {
+                    return Ok(Self {
+                        process,
+                        base_url: addr.clone(),
+                    });
+                }
             }
 
             sleep(Duration::from_millis(100)).await;
         }
+
+        // Cleanup on failure
+        let _ = process.kill().await;
+        Err(anyhow!("Timeout: ChromeDriver did not start within 30 seconds. Check that the ChromeDriver version matches the installed Chrome browser and that system resources are available."))
     }
 
-    /// Extracts the listening URL from chromedriver's output line.
-    ///
-    /// Looks for lines like "Starting ChromeDriver ... port=XXXX" or "Listening on 127.0.0.1:XXXX".
-    /// Returns the full URL like "http://127.0.0.1:XXXX" if found.
-    fn extract_url(line: &str) -> Option<String> {
-        if line.contains("Listening on") || line.contains("port=") {
-            // Simple regex-like parsing; adjust based on actual output
-            let parts: Vec<&str> = line.split_whitespace().collect();
-            for part in parts {
-                if part.starts_with("127.0.0.1:") || part.starts_with("localhost:") {
-                    return Some(format!("http://{}", part));
-                } else if part.starts_with("port=") {
-                    let port = part.split('=').nth(1)?;
+    /// Creates a new browser session (client) from this ChromeDriver instance.
+    /// Each session is independent and can be closed without affecting the driver.
+    pub async fn new_session(&self) -> Result<Client> {
+        ClientBuilder::native()
+            .capabilities(Self::chrome_args())
+            .connect(&self.base_url)
+            .await
+            .context("Failed to create new session")
+    }
+
+    fn chrome_args() -> Map<String, Value> {
+        let args = serde_json::json!({
+            "goog:chromeOptions": {
+                "args": [
+                    "--headless=new",
+                    "--disable-gpu",
+                    "--no-sandbox",
+                    "--disable-dev-shm-usage",
+                    "--disable-infobars",
+                    "--disable-extensions",
+                    "--disable-popup-blocking",
+                    "--disable-notifications",
+                    "--disable-logging",
+                    "--disable-autofill",
+                    "--disable-features=TranslateUI,OptimizationGuideModelDownloading",
+                    "--window-size=1920,1080",
+                    "--disable-blink-features=AutomationControlled",
+                    "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
+                ],
+                "excludeSwitches": ["enable-logging", "enable-automation"],
+                "useAutomationExtension": false,
+                "prefs": {
+                    "profile.default_content_setting_values.notifications": 2
+                }
+            }
+        });
+        args.as_object()
+            .expect("Capabilities should be a JSON object")
+            .clone()
+    }
+}
+
+/// Parses the ChromeDriver address from a log line.
+///
+/// Looks for the "Starting ChromeDriver ... on port XXXX" line and extracts the port.
+/// Returns `Some("http://localhost:XXXX")` if found, else `None`.
+fn parse_chromedriver_address(line: &str) -> Option<String> {
+    if line.contains("Starting ChromeDriver") {
+        if let Some(port_str) = line.split("on port ").nth(1) {
+            if let Some(port) = port_str.split_whitespace().next() {
+                if port.parse::<u16>().is_ok() {
                     return Some(format!("http://localhost:{}", port));
                 }
             }
         }
-        None
     }
-
-    /// Creates a new fantoccini Client connected to this chromedriver instance.
-    ///
-    /// # Errors
-    ///
-    /// Returns an error if connection to the WebDriver URL fails.
-    pub async fn new_client(&self) -> Result<Client> {
-        ClientBuilder::rustls()
-            .connect(&self.url)
-            .await
-            .context("Failed to connect to chromedriver instance")
+    // Fallback for other formats (e.g., explicit port mentions)
+    for word in line.split_whitespace() {
+        if let Ok(port) = word.trim_matches(|c: char| !c.is_numeric()).parse::<u16>() {
+            if port > 1024 && port < 65535 && line.to_lowercase().contains("port") {
+                return Some(format!("http://localhost:{}", port));
+            }
+        }
+    }
+    None
 }
 
 impl Drop for ChromeInstance {
     fn drop(&mut self) {
-        // Attempt to kill the process synchronously; for async, caller should handle if needed
-        if let Ok(status) = self.process.try_wait() {
-            if status.is_none() {
-                let _ = self.process.start_kill();
-            }
-        }
+        let _ = self.process.start_kill();
+        std::thread::sleep(std::time::Duration::from_millis(100));
    }
}

-/// Represents a scrape task that can be executed asynchronously.
+/// Simplified task execution - now uses the pool pattern.
 ///
-/// This struct encapsulates the URL to scrape and a parse function that processes the page
-/// using the provided Client. The parse function is async and returns a user-defined type T.
-///
-/// # Type Parameters
-///
-/// * `T` - The type of data returned by the parse function.
-///
-/// # Examples
-///
-/// ```no_run
-/// use crate::scraper::webdriver::ScrapeTask;
-/// use fantoccini::Client;
-/// use anyhow::Result;
-/// use std::pin::Pin;
-///
-/// async fn example_parse(_client: &Client) -> Result<String> {
-///     Ok("Parsed data".to_string())
-/// }
-///
-/// #[tokio::main]
-/// async fn main() -> Result<()> {
-///     let task: ScrapeTask<String> = ScrapeTask::new(
-///         "https://example.com".to_string(),
-///         |client| Box::pin(example_parse(client)),
-///     );
-///     let result = task.execute().await?;
-///     println!("{}", result);
-///     Ok(())
-/// }
-/// ```
+/// For backwards compatibility with existing code.
 pub struct ScrapeTask<T> {
     url: String,
-    parse: Box<dyn FnOnce(&Client) -> Pin<Box<dyn std::future::Future<Output = Result<T>> + Send + 'static>> + Send + 'static>,
+    parse: Box<dyn FnOnce(Client) -> Pin<Box<dyn std::future::Future<Output = Result<T>> + Send>> + Send>,
 }
 
 impl<T: Send + 'static> ScrapeTask<T> {
-    /// Creates a new ScrapeTask with the given URL and parse function.
-    ///
-    /// The parse function takes a &Client and returns a future resolving to Result<T>.
-    pub fn new(
-        url: String,
-        parse: impl FnOnce(&Client) -> Pin<Box<dyn std::future::Future<Output = Result<T>> + Send>> + Send + 'static,
-    ) -> Self {
+    pub fn new<F, Fut>(url: String, parse: F) -> Self
+    where
+        F: FnOnce(Client) -> Fut + Send + 'static,
+        Fut: std::future::Future<Output = Result<T>> + Send + 'static,
+    {
         Self {
             url,
-            parse: Box::new(parse),
+            parse: Box::new(move |client| Box::pin(parse(client))),
        }
    }

-    /// Executes the scrape task by starting a new ChromeInstance, connecting a client,
-    /// navigating to the URL, running the parse function, and cleaning up.
-    ///
-    /// This method ensures isolation by using a dedicated chromedriver instance per task.
-    ///
-    /// # Errors
-    ///
-    /// Returns an error if:
-    /// - Failed to start chromedriver instance.
-    /// - Failed to connect client or navigate to URL.
-    /// - Parse function returns an error.
-    /// - Failed to close the client or kill the process.
-    pub async fn execute(self) -> Result<T> {
-        let instance = ChromeInstance::new().await.context("Failed to create ChromeInstance")?;
-        let client = instance.new_client().await.context("Failed to create client")?;
-
-        client.goto(&self.url).await.context("Failed to navigate to URL")?;
-
-        // Optional: Add common prep like rejecting cookies, waiting for elements, etc.
-        // This can be customized per task if needed.
-
-        let result = (self.parse)(&client).await;
-
-        client.close().await.context("Failed to close client")?;
-
-        // Instance drops here, killing the process
-
-        result
+    /// Executes using a provided pool (more efficient for multiple tasks).
+    pub async fn execute_with_pool(self, pool: &ChromeDriverPool) -> Result<T> {
+        let url = self.url;
+        let parse = self.parse;
+
+        pool.execute(url, move |client| async move {
+            (parse)(client).await
+        }).await
    }
}
\ No newline at end of file
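
Reviewer note (not part of the patch): a minimal usage sketch of the new pool API, assuming the module layout introduced above (`crate::scraper::webdriver`) and the same fantoccini client API the patch already relies on. The pool size, URL, and closure body are illustrative only.

use std::sync::Arc;
use anyhow::Result;
use crate::scraper::webdriver::{ChromeDriverPool, ScrapeTask};

async fn example_usage() -> Result<()> {
    // Pool size would normally come from Config::load(); 4 is an arbitrary value for this sketch.
    let pool = Arc::new(ChromeDriverPool::new(4).await?);

    // Direct use of the pool: execute() navigates to the URL, then hands the session to the closure.
    let seen = pool
        .execute("https://example.com".to_string(), |client| async move {
            // Read something trivial from the page, then close the session so it is not leaked.
            let current = client.current_url().await?.to_string();
            client.close().await?;
            Ok(current)
        })
        .await?;
    println!("Scraped: {}", seen);

    // The same call routed through the ScrapeTask wrapper kept for backwards compatibility.
    let task = ScrapeTask::new("https://example.com".to_string(), |client| async move {
        let current = client.current_url().await?.to_string();
        client.close().await?;
        Ok(current)
    });
    let seen = task.execute_with_pool(&pool).await?;
    println!("Scraped again: {}", seen);

    Ok(())
}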