diff --git a/src/corporate/collect_exchanges.rs b/src/corporate/collect_exchanges.rs
new file mode 100644
index 0000000..d6290b8
--- /dev/null
+++ b/src/corporate/collect_exchanges.rs
@@ -0,0 +1,677 @@
+// src/corporate/collect_exchanges.rs
+use crate::util::directories::DataPaths;
+use crate::util::logger;
+use crate::scraper::yahoo::ChartData;
+
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use tokio::fs;
+use tokio::io::AsyncWriteExt;
+
+/// Exchange information collected from company data
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ExchangeInfo {
+    #[serde(rename = "exchangeName")]
+    pub exchange_name: String,
+    pub currency: String,
+    #[serde(rename = "currencySymbol")]
+    pub currency_symbol: String,
+    #[serde(rename = "exchangeDataDelayedBy")]
+    pub exchange_data_delayed_by: i64,
+    #[serde(rename = "totalMarketCap")]
+    pub total_market_cap: u64,
+    #[serde(rename = "totalMarketCapUSD")]
+    pub total_market_cap_usd: f64, // NEW: Market cap converted to USD
+    pub companies: Vec<String>,
+}
+
+/// Extract exchange data from company core data
+#[derive(Debug, Deserialize)]
+struct CompanyCoreData {
+    modules: Option<CoreModules>,
+}
+
+#[derive(Debug, Deserialize)]
+struct CoreModules {
+    price: Option<PriceModule>,
+}
+
+#[derive(Debug, Deserialize)]
+struct PriceModule {
+    #[serde(rename = "exchangeName")]
+    exchange_name: Option<String>,
+    currency: Option<String>,
+    #[serde(rename = "currencySymbol")]
+    currency_symbol: Option<String>,
+    exchange: Option<String>,
+    #[serde(rename = "exchangeDataDelayedBy")]
+    exchange_data_delayed_by: Option<i64>,
+    #[serde(rename = "marketCap")]
+    market_cap: Option<MarketCapData>,
+}
+
+#[derive(Debug, Deserialize)]
+struct MarketCapData {
+    raw: Option<u64>,
+}
+
+/// Normalize currency code and get conversion factor
+/// Handles special cases like GBp (pence) and ZAc (cents)
+fn normalize_currency(currency: &str) -> (&str, f64) {
+    match currency {
+        "GBp" => ("GBP", 100.0), // British Pence -> Pounds (divide by 100)
+        "ZAc" => ("ZAR", 100.0), // South African Cents -> Rand (divide by 100)
+        _ => (currency, 1.0),    // No conversion needed
+    }
+}
+
+/// FX rate cache for currency conversion
+struct FxRateCache {
+    rates: HashMap<String, f64>,
+}
+
+impl FxRateCache {
+    /// Create new FX rate cache by loading all currency charts
+    async fn new(paths: &DataPaths) -> anyhow::Result<Self> {
+        let mut rates = HashMap::new();
+
+        // USD to USD is always 1.0
+        rates.insert("USD".to_string(), 1.0);
+
+        let currency_dir = paths.data_dir().join("economic").join("currency");
+
+        if !currency_dir.exists() {
+            logger::log_warn("  FX rates directory not found - will use default rates").await;
+            return Ok(Self { rates });
+        }
+
+        let mut entries = fs::read_dir(&currency_dir).await?;
+        let mut loaded_count = 0;
+
+        while let Some(entry) = entries.next_entry().await? {
+            let path = entry.path();
+            if !path.is_dir() {
+                continue;
+            }
+
+            let currency_code = match path.file_name().and_then(|n| n.to_str()) {
+                Some(code) => code.to_string(),
+                None => continue,
+            };
+
+            let chart_path = path.join("chart").join("data.jsonl");
+
+            if !chart_path.exists() {
+                continue;
+            }
+
+            // Load chart and get latest rate
+            match load_latest_fx_rate(&chart_path).await {
+                Ok(rate) => {
+                    rates.insert(currency_code.clone(), rate);
+                    loaded_count += 1;
+                }
+                Err(e) => {
+                    logger::log_warn(&format!(
+                        "  Failed to load FX rate for {}: {}",
+                        currency_code, e
+                    )).await;
+                }
+            }
+        }
+
+        logger::log_info(&format!("  ✓ Loaded {} FX rates", loaded_count)).await;
+
+        Ok(Self { rates })
+    }
+
+    /// Convert amount from given currency to USD
+    fn to_usd(&self, amount: u64, currency: &str) -> f64 {
+        // Normalize currency and get conversion factor
+        // e.g., GBp -> (GBP, 100.0), ZAc -> (ZAR, 100.0)
+        let (normalized_currency, factor) = normalize_currency(currency);
+
+        // First convert to base currency unit (e.g., pence to pounds)
+        let amount_in_base = amount as f64 / factor;
+
+        if normalized_currency == "USD" {
+            return amount_in_base;
+        }
+
+        // Get rate (currency units per USD)
+        // For USD/EUR = 0.92, this means 1 USD = 0.92 EUR
+        // To convert EUR to USD: EUR_amount / 0.92
+        match self.rates.get(normalized_currency) {
+            Some(&rate) if rate > 0.0 => {
+                amount_in_base / rate
+            }
+            _ => {
+                // Fallback: use approximate rates for common currencies
+                let fallback_rate = get_fallback_rate(normalized_currency);
+                amount_in_base / fallback_rate
+            }
+        }
+    }
+
+    /// Get rate for a currency (currency units per USD)
+    fn get_rate(&self, currency: &str) -> Option<f64> {
+        let (normalized_currency, _) = normalize_currency(currency);
+        self.rates.get(normalized_currency).copied()
+    }
+}
+
+/// Load latest FX rate from chart data
+async fn load_latest_fx_rate(chart_path: &std::path::Path) -> anyhow::Result<f64> {
+    let content = fs::read_to_string(chart_path).await?;
+
+    for line in content.lines() {
+        if line.trim().is_empty() {
+            continue;
+        }
+
+        let chart: ChartData = serde_json::from_str(line)?;
+
+        if chart.quotes.is_empty() {
+            return Err(anyhow::anyhow!("No quotes in chart data"));
+        }
+
+        // Get most recent quote with a close price
+        let latest_rate = chart.quotes
+            .iter()
+            .rev()
+            .find_map(|q| q.close)
+            .ok_or_else(|| anyhow::anyhow!("No valid close prices"))?;
+
+        return Ok(latest_rate);
+    }
+
+    Err(anyhow::anyhow!("No data in chart file"))
+}
+
+/// Fallback rates for common currencies (approximate, as of 2024)
+/// These are currency units per USD (same format as our FX data)
+fn get_fallback_rate(currency: &str) -> f64 {
+    match currency {
+        "USD" => 1.0,
+        "EUR" => 0.92,  // 1 USD = 0.92 EUR
+        "GBP" => 0.79,  // 1 USD = 0.79 GBP
+        "JPY" => 150.0, // 1 USD = 150 JPY
+        "CNY" | "RMB" => 7.2,
+        "CHF" => 0.88,
+        "AUD" => 1.52,
+        "CAD" => 1.36,
+        "HKD" => 7.8,
+        "SGD" => 1.34,
+        "SEK" => 10.5,
+        "NOK" => 10.8,
+        "DKK" => 6.9,
+        "PLN" => 4.0,
+        "CZK" => 23.0,
+        "TRY" => 32.0,
+        "ZAR" => 18.5,
+        "ILS" => 3.7,
+        "RON" => 4.6,
+        "KWD" => 0.31,
+        "TWD" => 31.5,
+        "ISK" => 138.0,
+        "NZD" => 1.65,
+        "MXN" => 17.0,
+        "BRL" => 5.0,
+        "INR" => 83.0,
+        "KRW" => 1320.0,
+        "THB" => 35.0,
+        "MYR" => 4.6,
+        "IDR" => 15700.0,
+        "PHP" => 56.0,
+        "VND" => 24500.0,
+        _ => {
+            // Default: assume parity with USD
+            1.0
+        }
+    }
+}
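// [Editor's note — illustrative, not part of the patch] A worked example of
// the conversion path above, assuming the fallback rate of 0.79 GBP per USD:
//
//   to_usd(500_000_000, "GBp")
//     normalize_currency("GBp")  -> ("GBP", 100.0)
//     amount_in_base = 500_000_000.0 / 100.0 = 5_000_000.0 GBP
//     USD = 5_000_000.0 / 0.79 ≈ 6_329_113.92
//
// Rates are stored as currency units per USD, so dividing (not multiplying)
// a local-currency amount by the rate yields USD.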
+/// Collect all exchanges from company directories and create yahoo_exchanges.json
+///
+/// # Features
+/// - Iterates through all company directories
+/// - Extracts exchange data from core/data.jsonl
+/// - Groups companies by exchange
+/// - Sums up market caps for each exchange
+/// - **NEW**: Converts all market caps to USD using FX rates
+/// - Saves consolidated mapping to data/yahoo_exchanges.json
+/// - Handles missing or invalid data gracefully
+pub async fn collect_and_save_exchanges(paths: &DataPaths) -> anyhow::Result<usize> {
+    logger::log_info("Collecting exchange information from company directories...").await;
+
+    let corporate_dir = paths.corporate_dir();
+
+    if !corporate_dir.exists() {
+        logger::log_warn("  Corporate directory does not exist").await;
+        return Ok(0);
+    }
+
+    // Load FX rates for currency conversion
+    logger::log_info("Loading FX rates for currency conversion...").await;
+    let fx_cache = FxRateCache::new(paths).await?;
+
+    // Map of exchange code -> ExchangeInfo
+    let mut exchanges: HashMap<String, ExchangeInfo> = HashMap::new();
+
+    let mut entries = fs::read_dir(&corporate_dir).await?;
+    let mut processed_count = 0;
+    let mut skipped_count = 0;
+
+    while let Some(entry) = entries.next_entry().await? {
+        let company_path = entry.path();
+
+        if !company_path.is_dir() {
+            continue;
+        }
+
+        let company_name = match company_path.file_name().and_then(|n| n.to_str()) {
+            Some(name) => name.to_string(),
+            None => {
+                skipped_count += 1;
+                continue;
+            }
+        };
+
+        // Read core/data.jsonl
+        let core_data_path = company_path.join("core").join("data.jsonl");
+
+        if !core_data_path.exists() {
+            skipped_count += 1;
+            continue;
+        }
+
+        // Parse core data
+        match extract_exchange_info(&core_data_path, &company_name).await {
+            Ok(Some((exchange_code, exchange_name, currency, currency_symbol, delay, market_cap))) => {
+                // Convert market cap to USD
+                let market_cap_usd = fx_cache.to_usd(market_cap, &currency);
+
+                // Add or update exchange entry
+                exchanges
+                    .entry(exchange_code.clone())
+                    .and_modify(|info| {
+                        // Add company to existing exchange and sum market caps
+                        info.companies.push(company_name.clone());
+                        info.total_market_cap = info.total_market_cap.saturating_add(market_cap);
+                        info.total_market_cap_usd += market_cap_usd;
+                    })
+                    .or_insert_with(|| {
+                        // Create new exchange entry
+                        ExchangeInfo {
+                            exchange_name,
+                            currency,
+                            currency_symbol,
+                            exchange_data_delayed_by: delay,
+                            total_market_cap: market_cap,
+                            total_market_cap_usd: market_cap_usd,
+                            companies: vec![company_name.clone()],
+                        }
+                    });
+
+                processed_count += 1;
+            }
+            Ok(None) => {
+                // No exchange data found
+                skipped_count += 1;
+            }
+            Err(e) => {
+                logger::log_warn(&format!(
+                    "  Failed to parse exchange data for {}: {}",
+                    company_name, e
+                )).await;
+                skipped_count += 1;
+            }
+        }
+
+        // Progress logging every 100 companies
+        if (processed_count + skipped_count) % 100 == 0 {
+            logger::log_info(&format!(
+                "  Progress: {} companies processed, {} skipped",
+                processed_count, skipped_count
+            )).await;
+        }
+    }
+
+    logger::log_info(&format!(
+        "  ✓ Collected data from {} companies ({} skipped)",
+        processed_count, skipped_count
+    )).await;
+
+    logger::log_info(&format!(
+        "  ✓ Found {} unique exchanges",
+        exchanges.len()
+    )).await;
+
+    // Sort companies within each exchange for consistency
+    for exchange_info in exchanges.values_mut() {
+        exchange_info.companies.sort();
+    }
+
+    // Save to yahoo_exchanges.json
+    let output_path = paths.data_dir().join("yahoo_exchanges.json");
+    save_exchanges_json(&output_path, &exchanges).await?;
+
+    logger::log_info(&format!(
+        "  ✓ Saved exchange mapping to {}",
+        output_path.display()
+    )).await;
+
+    // Print summary statistics
+    print_exchange_statistics(&exchanges, &fx_cache).await;
+
+    Ok(exchanges.len())
+}
+
+/// Extract exchange information from a company's core data file
+async fn extract_exchange_info(
+    core_data_path: &std::path::Path,
+    company_name: &str,
+) -> anyhow::Result<Option<(String, String, String, String, i64, u64)>> {
+    let content = fs::read_to_string(core_data_path).await?;
+
+    // Parse JSONL - should be single line
+    for line in content.lines() {
+        if line.trim().is_empty() {
+            continue;
+        }
+
+        match serde_json::from_str::<CompanyCoreData>(line) {
+            Ok(data) => {
+                // Extract from modules.price
+                let price_module = match data.modules.and_then(|m| m.price) {
+                    Some(p) => p,
+                    None => return Ok(None),
+                };
+
+                // Extract required fields
+                let exchange = match price_module.exchange {
+                    Some(e) if !e.is_empty() => e,
+                    _ => return Ok(None),
+                };
+
+                // Filter out invalid placeholder exchange codes
+                if exchange == "CCC" {
+                    return Ok(None);
+                }
+
+                let exchange_name = price_module.exchange_name.unwrap_or_else(|| exchange.clone());
+                let currency = price_module.currency.unwrap_or_else(|| "USD".to_string());
+                let currency_symbol = price_module.currency_symbol.unwrap_or_else(|| "$".to_string());
+                let delay = price_module.exchange_data_delayed_by.unwrap_or(0);
+                let market_cap = price_module
+                    .market_cap
+                    .and_then(|mc| mc.raw)
+                    .unwrap_or(0);
+
+                return Ok(Some((
+                    exchange,
+                    exchange_name,
+                    currency,
+                    currency_symbol,
+                    delay,
+                    market_cap,
+                )));
+            }
+            Err(e) => {
+                // Try to parse as generic JSON to check if exchange field exists in modules.price
+                if let Ok(json) = serde_json::from_str::<serde_json::Value>(line) {
+                    // Try to access modules.price.exchange
+                    if let Some(price) = json.get("modules").and_then(|m| m.get("price")) {
+                        if let Some(exchange) = price.get("exchange").and_then(|v| v.as_str()) {
+                            if !exchange.is_empty() && exchange != "CCC" {
+                                let exchange_name = price
+                                    .get("exchangeName")
+                                    .and_then(|v| v.as_str())
+                                    .unwrap_or(exchange)
+                                    .to_string();
+
+                                let currency = price
+                                    .get("currency")
+                                    .and_then(|v| v.as_str())
+                                    .unwrap_or("USD")
+                                    .to_string();
+
+                                let currency_symbol = price
+                                    .get("currencySymbol")
+                                    .and_then(|v| v.as_str())
+                                    .unwrap_or("$")
+                                    .to_string();
+
+                                let delay = price
+                                    .get("exchangeDataDelayedBy")
+                                    .and_then(|v| v.as_i64())
+                                    .unwrap_or(0);
+
+                                let market_cap = price
+                                    .get("marketCap")
+                                    .and_then(|mc| mc.get("raw"))
+                                    .and_then(|v| v.as_u64())
+                                    .unwrap_or(0);
+
+                                return Ok(Some((
+                                    exchange.to_string(),
+                                    exchange_name,
+                                    currency,
+                                    currency_symbol,
+                                    delay,
+                                    market_cap,
+                                )));
+                            }
+                        }
+                    }
+                }
+
+                return Err(anyhow::anyhow!(
+                    "Failed to parse core data for {}: {}",
+                    company_name,
+                    e
+                ));
+            }
+        }
+    }
+
+    Ok(None)
+}
+
+/// Save exchanges map to JSON file with fsync
+async fn save_exchanges_json(
+    path: &std::path::Path,
+    exchanges: &HashMap<String, ExchangeInfo>,
+) -> anyhow::Result<()> {
+    // Collect into a BTreeMap so the keys serialize in sorted order
+    // (collecting back into a HashMap would discard the ordering)
+    let sorted_exchanges: std::collections::BTreeMap<String, ExchangeInfo> = exchanges
+        .iter()
+        .map(|(k, v)| (k.clone(), v.clone()))
+        .collect();
+
+    // Serialize with pretty printing
+    let json_content = serde_json::to_string_pretty(&sorted_exchanges)?;
+
+    // Write to temporary file first (atomic write pattern)
+    let tmp_path = path.with_extension("json.tmp");
+    let mut file = fs::File::create(&tmp_path).await?;
+    file.write_all(json_content.as_bytes()).await?;
+    file.write_all(b"\n").await?;
+    file.flush().await?;
+    file.sync_all().await?;
+
+    // Atomic rename
+    fs::rename(&tmp_path, path).await?;
+
+    Ok(())
+}
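// [Editor's note — not part of the patch] Illustrative shape of one entry in
// data/yahoo_exchanges.json as serialized from ExchangeInfo (all values here
// are hypothetical):
//
//   "LSE": {
//     "exchangeName": "LSE",
//     "currency": "GBp",
//     "currencySymbol": "£",
//     "exchangeDataDelayedBy": 20,
//     "totalMarketCap": 123456789000,
//     "totalMarketCapUSD": 1562744164.55,
//     "companies": ["Acme Plc", "..."]
//   }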
+
+/// Format market cap as a human-readable string
+fn format_market_cap(market_cap: f64) -> String {
+    if market_cap >= 1_000_000_000_000.0 {
+        format!("{:.2}T", market_cap / 1_000_000_000_000.0)
+    } else if market_cap >= 1_000_000_000.0 {
+        format!("{:.2}B", market_cap / 1_000_000_000.0)
+    } else if market_cap >= 1_000_000.0 {
+        format!("{:.2}M", market_cap / 1_000_000.0)
+    } else if market_cap >= 1_000.0 {
+        format!("{:.2}K", market_cap / 1_000.0)
+    } else {
+        format!("{:.2}", market_cap)
+    }
+}
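// [Editor's note — not part of the patch] Expected behaviour, for reference:
//   format_market_cap(1_234_000_000_000.0) == "1.23T"
//   format_market_cap(56_700_000.0)        == "56.70M"
//   format_market_cap(999.0)               == "999.00"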
+
+/// Print statistics about collected exchanges
+async fn print_exchange_statistics(exchanges: &HashMap<String, ExchangeInfo>, fx_cache: &FxRateCache) {
+    logger::log_info("Exchange Statistics (sorted by USD market cap):").await;
+
+    // Sort by total market cap in USD (descending)
+    let mut exchange_list: Vec<_> = exchanges.iter().collect();
+    exchange_list.sort_by(|a, b| {
+        b.1.total_market_cap_usd
+            .partial_cmp(&a.1.total_market_cap_usd)
+            .unwrap_or(std::cmp::Ordering::Equal)
+    });
+
+    // Print top 20 exchanges by total market cap (USD)
+    logger::log_info("  Top 20 exchanges by total market cap (USD):").await;
+    for (i, (code, info)) in exchange_list.iter().take(20).enumerate() {
+        let (normalized_currency, factor) = normalize_currency(&info.currency);
+        let fx_rate = fx_cache.get_rate(&info.currency);
+
+        let fx_info = match fx_rate {
+            Some(rate) => {
+                if factor > 1.0 {
+                    // Show conversion for pence/cents
+                    format!(" (1 USD = {:.4} {}, {} {} = 1 {})",
+                        rate,
+                        normalized_currency,
+                        factor as i32,
+                        info.currency,
+                        normalized_currency)
+                } else {
+                    format!(" (1 USD = {:.4} {})", rate, info.currency)
+                }
+            }
+            None => format!(" (using fallback rate for {})", info.currency),
+        };
+
+        logger::log_info(&format!(
+            "  {}. {} ({}) - ${} USD ({}{} {}) - {} companies{}",
+            i + 1,
+            info.exchange_name,
+            code,
+            format_market_cap(info.total_market_cap_usd),
+            info.currency_symbol,
+            format_market_cap(info.total_market_cap as f64),
+            info.currency,
+            info.companies.len(),
+            if info.currency != "USD" { &fx_info } else { "" }
+        )).await;
+    }
+
+    // Count by currency
+    let mut currency_counts: HashMap<String, usize> = HashMap::new();
+    let mut currency_market_caps: HashMap<String, f64> = HashMap::new();
+    for info in exchanges.values() {
+        *currency_counts.entry(info.currency.clone()).or_insert(0) += info.companies.len();
+        *currency_market_caps.entry(info.currency.clone()).or_insert(0.0) += info.total_market_cap_usd;
+    }
+
+    let mut currencies: Vec<_> = currency_counts.iter().collect();
+    currencies.sort_by(|a, b| {
+        currency_market_caps.get(b.0)
+            .unwrap_or(&0.0)
+            .partial_cmp(currency_market_caps.get(a.0).unwrap_or(&0.0))
+            .unwrap_or(std::cmp::Ordering::Equal)
+    });
+
+    logger::log_info("  Market cap by currency (USD equivalent):").await;
+    for (currency, count) in currencies.iter().take(10) {
+        let market_cap_usd = currency_market_caps.get(*currency).unwrap_or(&0.0);
+        let (normalized_currency, factor) = normalize_currency(currency);
+        let fx_rate = fx_cache.get_rate(currency);
+
+        let fx_info = match fx_rate {
+            Some(rate) => {
+                if factor > 1.0 {
+                    format!(" (1 USD = {:.4} {}, {} {} = 1 {})",
+                        rate, normalized_currency, factor as i32, currency, normalized_currency)
+                } else {
+                    format!(" (1 USD = {:.4} {})", rate, currency)
+                }
+            }
+            None => " (fallback)".to_string(),
+        };
+
+        logger::log_info(&format!(
+            "  {}: {} companies, ${} USD{}",
+            currency,
+            count,
+            format_market_cap(*market_cap_usd),
+            if *currency != "USD" { &fx_info } else { "" }
+        )).await;
+    }
+
+    // Delay statistics
+    let delayed_exchanges: Vec<_> = exchanges
+        .iter()
+        .filter(|(_, info)| info.exchange_data_delayed_by > 0)
+        .collect();
+
+    if !delayed_exchanges.is_empty() {
+        logger::log_info(&format!(
+            "  Exchanges with data delay: {} (out of {})",
+            delayed_exchanges.len(),
+            exchanges.len()
+        )).await;
+    }
+
+    // Total market cap across all exchanges (in USD)
+    let total_market_cap_usd: f64 = exchanges.values()
+        .map(|info| info.total_market_cap_usd)
+        .sum();
+
+    logger::log_info(&format!(
+        "  Total market cap across all exchanges: ${} USD",
+        format_market_cap(total_market_cap_usd)
+    )).await;
+}
+
+/// Get exchange information for a specific exchange code
+pub async fn get_exchange_info(
+    paths: &DataPaths,
+    exchange_code: &str,
+) -> anyhow::Result<Option<ExchangeInfo>> {
+    let exchanges_path = paths.data_dir().join("yahoo_exchanges.json");
+
+    if !exchanges_path.exists() {
+        return Ok(None);
+    }
+
+    let content = fs::read_to_string(&exchanges_path).await?;
+    let exchanges: HashMap<String, ExchangeInfo> = serde_json::from_str(&content)?;
+
+    Ok(exchanges.get(exchange_code).cloned())
+}
+
+/// List all available exchanges
+pub async fn list_all_exchanges(paths: &DataPaths) -> anyhow::Result<Vec<(String, ExchangeInfo)>> {
+    let exchanges_path = paths.data_dir().join("yahoo_exchanges.json");
+
+    if !exchanges_path.exists() {
+        return Ok(Vec::new());
+    }
+
+    let content = fs::read_to_string(&exchanges_path).await?;
+    let exchanges: HashMap<String, ExchangeInfo> = serde_json::from_str(&content)?;
+
+    let mut exchange_list: Vec<_> = exchanges.into_iter().collect();
+    exchange_list.sort_by(|a, b| a.0.cmp(&b.0));
+
+    Ok(exchange_list)
+}
\ No newline at end of file
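[Editor's note — not part of the patch] A minimal caller sketch for the two lookup helpers above; "XETRA" is only a placeholder exchange code:

    // somewhere in async code, with a DataPaths in scope
    if let Some(info) = collect_exchanges::get_exchange_info(&paths, "XETRA").await? {
        println!("{} trades in {} ({} companies)", info.exchange_name, info.currency, info.companies.len());
    }
    for (code, info) in collect_exchanges::list_all_exchanges(&paths).await? {
        println!("{code}: ${:.0} USD total cap", info.total_market_cap_usd);
    }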
diff --git a/src/corporate/mod.rs b/src/corporate/mod.rs
index 2c770fd..21d16d0 100644
--- a/src/corporate/mod.rs
+++ b/src/corporate/mod.rs
@@ -17,4 +17,6 @@ pub mod update_companies_cleanse;
 pub mod update_companies_enrich;
 pub mod update_companies_enrich_options_chart;
 
+pub mod collect_exchanges;
+
 pub use update::run_full_update;
\ No newline at end of file
diff --git a/src/corporate/scraper.rs b/src/corporate/scraper.rs
index b34d681..156d2b9 100644
--- a/src/corporate/scraper.rs
+++ b/src/corporate/scraper.rs
@@ -1,179 +1,13 @@
 // src/corporate/scraper.rs
-use super::{types::*};
-//use crate::corporate::openfigi::OpenFigiClient;
-use crate::{scraper::webdriver::*, util::directories::DataPaths, util::logger};
+use crate::{util::directories::DataPaths, util::logger};
 use fantoccini::{Client};
 use scraper::{Html, Selector};
-use chrono::{DateTime, Duration, NaiveDate, Utc};
-use tokio::{time::{Duration as TokioDuration, sleep}};
-use reqwest::Client as HttpClient;
-use serde_json::{json, Value};
 use zip::ZipArchive;
 use std::{collections::HashMap};
 use std::io::{Read};
 
 const USER_AGENT: &str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36";
 
-fn parse_price(v: Option<&Value>) -> f64 {
-    v.and_then(|x| x.as_str())
-        .and_then(|s| s.replace('$', "").replace(',', "").parse::<f64>().ok())
-        .or_else(|| v.and_then(|x| x.as_f64()))
-        .unwrap_or(0.0)
-}
-
-fn parse_volume(v: Option<&Value>) -> u64 {
-    v.and_then(|x| x.as_str())
-        .and_then(|s| s.replace(',', "").parse::<u64>().ok())
-        .or_else(|| v.and_then(|x| x.as_u64()))
-        .unwrap_or(0)
-}
-
-pub async fn fetch_daily_price_history(
-    ticker: &str,
-    start_str: &str,
-    end_str: &str,
-) -> anyhow::Result<Vec<CompanyPrice>> {
-    let start = NaiveDate::parse_from_str(start_str, "%Y-%m-%d")?;
-    let end = NaiveDate::parse_from_str(end_str, "%Y-%m-%d")? + Duration::days(1);
-
-    let mut all_prices = Vec::new();
-    let mut current = start;
-
-    while current < end {
-        let chunk_end = current + Duration::days(730);
-        let actual_end = chunk_end.min(end);
-
-        let period1 = current.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp();
-        let period2 = actual_end.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp();
-
-        println!("    Fetching {ticker} {} → {}", current, actual_end - Duration::days(1));
-
-        let url = format!(
-            "https://query1.finance.yahoo.com/v8/finance/chart/{ticker}?period1={period1}&period2={period2}&interval=1d&includeAdjustedClose=true"
-        );
-
-        let json: Value = HttpClient::new()
-            .get(&url)
-            .header("User-Agent", USER_AGENT)
-            .send()
-            .await?
-            .json()
-            .await?;
-
-        let result = &json["chart"]["result"][0];
-        let timestamps = result["timestamp"].as_array().ok_or_else(|| anyhow::anyhow!("No timestamps"))?;
-        let quote = &result["indicators"]["quote"][0];
-        let meta = &result["meta"];
-        let currency = meta["currency"].as_str().unwrap_or("USD").to_string();
-
-        let opens = quote["open"].as_array();
-        let highs = quote["high"].as_array();
-        let lows = quote["low"].as_array();
-        let closes = quote["close"].as_array();
-        let adj_closes = result["indicators"]["adjclose"][0]["adjclose"].as_array()
-            .or_else(|| closes);
-        let volumes = quote["volume"].as_array();
-
-        for (i, ts_val) in timestamps.iter().enumerate() {
-            let ts = ts_val.as_i64().unwrap_or(0);
-            let dt: DateTime<Utc> = DateTime::from_timestamp(ts, 0).unwrap_or_default();
-            let date_str = dt.format("%Y-%m-%d").to_string();
-
-            if date_str < start_str.to_string() || date_str > end_str.to_string() {
-                continue;
-            }
-
-            let open = parse_price(opens.and_then(|a| a.get(i)));
-            let high = parse_price(highs.and_then(|a| a.get(i)));
-            let low = parse_price(lows.and_then(|a| a.get(i)));
-            let close = parse_price(closes.and_then(|a| a.get(i)));
-            let adj_close = parse_price(adj_closes.and_then(|a| a.get(i)));
-            let volume = parse_volume(volumes.and_then(|a| a.get(i)));
-
-            all_prices.push(CompanyPrice {
-                ticker: ticker.to_string(),
-                date: date_str,
-                time: "".to_string(),
-                open,
-                high,
-                low,
-                close,
-                adj_close,
-                volume,
-                currency: currency.clone(),
-            });
-        }
-
-        sleep(TokioDuration::from_millis(200)).await;
-        current = actual_end;
-    }
-
-    all_prices.sort_by_key(|p| (p.date.clone(), p.time.clone()));
-    all_prices.dedup_by(|a, b| a.date == b.date && a.time == b.time);
-
-    println!("    Got {} daily bars for {ticker}", all_prices.len());
-    Ok(all_prices)
-}
-
-pub async fn fetch_price_history_5min(
-    ticker: &str,
-    _start: &str,
-    _end: &str,
-) -> anyhow::Result<Vec<CompanyPrice>> {
-    let now = Utc::now().timestamp();
-    let period1 = now - 5184000;
-    let period2 = now;
-
-    let url = format!(
-        "https://query1.finance.yahoo.com/v8/finance/chart/{ticker}?period1={period1}&period2={period2}&interval=5m&includeAdjustedClose=true"
-    );
-
-    let json: Value = HttpClient::new()
-        .get(&url)
-        .header("User-Agent", USER_AGENT)
-        .send()
-        .await?
-        .json()
-        .await?;
-
-    let result = &json["chart"]["result"][0];
-    let timestamps = result["timestamp"].as_array().ok_or_else(|| anyhow::anyhow!("No timestamps"))?;
-    let quote = &result["indicators"]["quote"][0];
-    let meta = &result["meta"];
-    let currency = meta["currency"].as_str().unwrap_or("USD").to_string();
-
-    let mut prices = Vec::new();
-
-    for (i, ts_val) in timestamps.iter().enumerate() {
-        let ts = ts_val.as_i64().unwrap_or(0);
-        let dt: DateTime<Utc> = DateTime::from_timestamp(ts, 0).unwrap_or_default();
-        let date_str = dt.format("%Y-%m-%d").to_string();
-        let time_str = dt.format("%H:%M:%S").to_string();
-
-        let open = parse_price(quote["open"].as_array().and_then(|a| a.get(i)));
-        let high = parse_price(quote["high"].as_array().and_then(|a| a.get(i)));
-        let low = parse_price(quote["low"].as_array().and_then(|a| a.get(i)));
-        let close = parse_price(quote["close"].as_array().and_then(|a| a.get(i)));
-        let volume = parse_volume(quote["volume"].as_array().and_then(|a| a.get(i)));
-
-        prices.push(CompanyPrice {
-            ticker: ticker.to_string(),
-            date: date_str,
-            time: time_str,
-            open,
-            high,
-            low,
-            close,
-            adj_close: close,
-            volume,
-            currency: currency.clone(),
-        });
-    }
-
-    prices.sort_by_key(|p| (p.date.clone(), p.time.clone()));
-    Ok(prices)
-}
-
 /// Fetch the URL of the latest ISIN↔LEI mapping CSV from GLEIF
 /// Overengineered; we could just use the static URL, but this shows how to scrape if needed
 pub async fn _fetch_latest_gleif_isin_lei_mapping_url(client: &Client) -> anyhow::Result<String> {
diff --git a/src/corporate/update.rs b/src/corporate/update.rs
index 0880789..a620d6a 100644
--- a/src/corporate/update.rs
+++ b/src/corporate/update.rs
@@ -5,6 +5,8 @@ use crate::corporate::update_companies::build_companies_jsonl_streaming_parallel
 use crate::corporate::update_companies_cleanse::{companies_yahoo_cleansed_low_profile, companies_yahoo_cleansed_no_data};
 use crate::corporate::update_companies_enrich::enrich_companies_with_events;
 use crate::corporate::update_companies_enrich_options_chart::{enrich_companies_with_options, enrich_companies_with_chart};
+use crate::corporate::collect_exchanges::collect_and_save_exchanges;
+use crate::economic::update_forex::collect_fx_rates;
 use crate::util::directories::DataPaths;
 use crate::util::logger;
 use crate::scraper::webdriver::ChromeDriverPool;
@@ -40,105 +42,105 @@ pub async fn run_full_update(
         logger::log_warn("Shutdown detected after GLEIF download").await;
         return Ok(());
     }
-
-    logger::log_info("Step 2: Loading OpenFIGI metadata...").await;
-    load_figi_type_lists().await.ok();
-    logger::log_info("  ✓ OpenFIGI metadata loaded").await;
-
-    if shutdown_flag.load(Ordering::SeqCst) {
-        logger::log_warn("Shutdown detected after OpenFIGI load").await;
-        return Ok(());
-    }
-
-    logger::log_info("Step 3: Checking LEI-FIGI mapping status...").await;
-    let all_mapped = ensure_all_leis_mapped(&gleif_csv_path, None).await?;
-    if !all_mapped {
-        logger::log_warn("  ⚠ Some LEIs failed to map - continuing with partial data").await;
+
+    if !shutdown_flag.load(Ordering::SeqCst) {
+        logger::log_info("Step 2: Loading OpenFIGI metadata...").await;
+        load_figi_type_lists().await.ok();
+        logger::log_info("  ✓ OpenFIGI metadata loaded").await;
     } else {
-        logger::log_info("  ✓ All LEIs successfully mapped").await;
+        logger::log_warn("Shutdown detected, skipping OpenFIGI load").await;
     }
 
-    if shutdown_flag.load(Ordering::SeqCst) {
-        logger::log_warn("Shutdown detected after LEI-FIGI mapping").await;
-        return Ok(());
+    if !shutdown_flag.load(Ordering::SeqCst) {
+        logger::log_info("Step 3: Checking LEI-FIGI mapping status...").await;
+        let all_mapped = ensure_all_leis_mapped(&gleif_csv_path, None).await?;
+
+        if !all_mapped {
+            logger::log_warn("  ⚠ Some LEIs failed to map - continuing with partial data").await;
+        } else {
+            logger::log_info("  ✓ All LEIs successfully mapped").await;
+        }
+    } else {
+        logger::log_warn("Shutdown detected, skipping LEI-FIGI mapping check").await;
     }
 
-    logger::log_info("Step 4: Building securities map (streaming)...").await;
-    let date_dir = find_most_recent_figi_date_dir(&paths).await?;
-
-    if let Some(date_dir) = date_dir {
-        logger::log_info(&format!("  Using FIGI data from: {:?}", date_dir)).await;
-        load_or_build_all_securities(&date_dir).await?;
-        logger::log_info("  ✓ Securities map updated").await;
-    } else {
-        logger::log_warn("  ✗ No FIGI data directory found").await;
-    }
-
-    if shutdown_flag.load(Ordering::SeqCst) {
-        logger::log_warn("Shutdown detected after securities map build").await;
-        return Ok(());
+    if !shutdown_flag.load(Ordering::SeqCst) {
+        logger::log_info("Step 4: Building securities map (streaming)...").await;
+        let date_dir = find_most_recent_figi_date_dir(&paths).await?;
+
+        if let Some(date_dir) = date_dir {
+            logger::log_info(&format!("  Using FIGI data from: {:?}", date_dir)).await;
+            load_or_build_all_securities(&date_dir).await?;
+            logger::log_info("  ✓ Securities map updated").await;
+        } else {
+            logger::log_warn("  ✗ No FIGI data directory found").await;
+        }
+    } else {
+        logger::log_warn("Shutdown detected, skipping securities map build").await;
     }
 
-    logger::log_info("Step 5: Building companies.jsonl with parallel processing and validation...").await;
-    let count = build_companies_jsonl_streaming_parallel(&paths, pool, shutdown_flag, config, &None).await?;
-    logger::log_info(&format!("  ✓ Saved {} companies", count)).await;
-
-    if shutdown_flag.load(Ordering::SeqCst) {
-        logger::log_warn("Shutdown detected after companies.jsonl build").await;
-        return Ok(());
+    if !shutdown_flag.load(Ordering::SeqCst) {
+        logger::log_info("Step 5: Building companies.jsonl with parallel processing and validation...").await;
+        let count = build_companies_jsonl_streaming_parallel(&paths, pool, shutdown_flag, config, &None).await?;
+        logger::log_info(&format!("  ✓ Saved {} companies", count)).await;
+    } else {
+        logger::log_warn("Shutdown detected, skipping companies.jsonl build").await;
     }
 
-    logger::log_info("Step 6: Cleansing companies with missing essential data...").await;
-    let cleansed_count = companies_yahoo_cleansed_no_data(&paths).await?;
-    logger::log_info(&format!("  ✓ {} companies found on Yahoo ready for further use in companies_yahoo.jsonl", cleansed_count)).await;
-
-    if shutdown_flag.load(Ordering::SeqCst) {
-        logger::log_warn("Shutdown detected after no-data cleansing").await;
-        return Ok(());
+    if !shutdown_flag.load(Ordering::SeqCst) {
+        logger::log_info("Step 6: Cleansing companies with missing essential data...").await;
+        let cleansed_count = companies_yahoo_cleansed_no_data(&paths).await?;
+        logger::log_info(&format!("  ✓ {} companies found on Yahoo ready for further use in companies_yahoo.jsonl", cleansed_count)).await;
+    } else {
+        logger::log_warn("Shutdown detected, skipping no-data cleansing").await;
     }
logger::log_info("Step 7: Cleansing companies with too low profile (with abort-safe persistence)...").await; let proxy_pool = pool.get_proxy_pool() - .ok_or_else(|| anyhow::anyhow!("ChromeDriverPool must be created with VPN proxy rotation enabled"))?; + .ok_or_else(|| anyhow::anyhow!("ChromeDriverPool must be created with VPN proxy rotation enabled"))?; logger::log_info("Creating YahooClientPool with proxy rotation...").await; let yahoo_pool = Arc::new(YahooClientPool::new(proxy_pool, config, None).await?); logger::log_info(&format!("✓ YahooClientPool ready with {} clients", yahoo_pool.num_clients().await)).await; - let cleansed_count = companies_yahoo_cleansed_low_profile(&paths, config, yahoo_pool.clone(), shutdown_flag).await?; - logger::log_info(&format!(" ✓ {} companies with sufficient profile ready for analytics", cleansed_count)).await; - if shutdown_flag.load(Ordering::SeqCst) { - logger::log_warn("Shutdown detected after low-profile cleansing").await; - return Ok(()); + if !shutdown_flag.load(Ordering::SeqCst) { + logger::log_info("Step 7: Cleansing companies with too low profile (with abort-safe persistence)...").await; + let cleansed_count = companies_yahoo_cleansed_low_profile(&paths, config, yahoo_pool.clone(), shutdown_flag).await?; + logger::log_info(&format!(" ✓ {} companies with sufficient profile ready for analytics", cleansed_count)).await; + } else { + logger::log_warn("Shutdown detected, skipping event index build").await; } - logger::log_info("Step 8: Enriching companies with Yahoo Events (with abort-safe persistence)...").await; - let enriched_count = enrich_companies_with_events(&paths, config, yahoo_pool.clone(), shutdown_flag).await?; - logger::log_info(&format!(" ✓ {} companies enriched with event data", enriched_count)).await; - - if shutdown_flag.load(Ordering::SeqCst) { - logger::log_warn("Shutdown detected after event enrichment").await; - return Ok(()); + if !shutdown_flag.load(Ordering::SeqCst) { + logger::log_info("Step 8: Enriching companies with Yahoo Events (with abort-safe persistence)...").await; + let enriched_count = enrich_companies_with_events(&paths, config, yahoo_pool.clone(), shutdown_flag).await?; + logger::log_info(&format!(" ✓ {} companies enriched with event data", enriched_count)).await; + } else { + logger::log_warn("Shutdown detected, skipping event index build").await; } - logger::log_info("Step 9: Enriching companies with Yahoo Options (with abort-safe persistence)...").await; - let options_count = enrich_companies_with_options(&paths, config, yahoo_pool.clone(), shutdown_flag).await?; - logger::log_info(&format!(" ✓ {} companies enriched with options data", options_count)).await; - - if shutdown_flag.load(Ordering::SeqCst) { - logger::log_warn("Shutdown detected after options enrichment").await; - return Ok(()); + if !shutdown_flag.load(Ordering::SeqCst) { + logger::log_info("Step 9: Enriching companies with Yahoo Options (with abort-safe persistence)...").await; + let options_count = enrich_companies_with_options(&paths, config, yahoo_pool.clone(), shutdown_flag).await?; + logger::log_info(&format!(" ✓ {} companies enriched with options data", options_count)).await; + } else { + logger::log_warn("Shutdown detected, skipping event index build").await; } - logger::log_info("Step 10: Enriching companies with Yahoo Chart (with abort-safe persistence)...").await; - let chart_count = enrich_companies_with_chart(&paths, config, yahoo_pool.clone(), shutdown_flag).await?; - logger::log_info(&format!(" ✓ {} companies enriched with chart data", 
 
-    logger::log_info("Step 10: Enriching companies with Yahoo Chart (with abort-safe persistence)...").await;
-    let chart_count = enrich_companies_with_chart(&paths, config, yahoo_pool.clone(), shutdown_flag).await?;
-    logger::log_info(&format!("  ✓ {} companies enriched with chart data", chart_count)).await;
-
-    if shutdown_flag.load(Ordering::SeqCst) {
-        logger::log_warn("Shutdown detected after chart enrichment").await;
-        return Ok(());
+    if !shutdown_flag.load(Ordering::SeqCst) {
+        logger::log_info("Step 10: Enriching companies with Yahoo Chart (with abort-safe persistence)...").await;
+        let chart_count = enrich_companies_with_chart(&paths, config, yahoo_pool.clone(), shutdown_flag).await?;
+        logger::log_info(&format!("  ✓ {} companies enriched with chart data", chart_count)).await;
+    } else {
+        logger::log_warn("Shutdown detected, skipping chart enrichment").await;
     }
 
     if !shutdown_flag.load(Ordering::SeqCst) {
@@ -149,6 +151,28 @@ pub async fn run_full_update(
         logger::log_warn("Shutdown detected, skipping event index build").await;
     }
 
+    if !shutdown_flag.load(Ordering::SeqCst) {
+        logger::log_info("Step 12: Collecting FX rates...").await;
+
+        let proxy_pool = pool.get_proxy_pool()
+            .ok_or_else(|| anyhow::anyhow!("ChromeDriverPool must have proxy rotation"))?;
+
+        let yahoo_pool = Arc::new(YahooClientPool::new(proxy_pool, config, None).await?);
+
+        let fx_count = collect_fx_rates(&paths, config, yahoo_pool.clone(), shutdown_flag).await?;
+        logger::log_info(&format!("  ✓ Collected {} FX rates", fx_count)).await;
+    } else {
+        logger::log_warn("Shutdown detected, skipping FX rates collection").await;
+    }
+
+    if !shutdown_flag.load(Ordering::SeqCst) {
+        logger::log_info("Step 13: Collecting exchange information...").await;
+        let exchange_count = collect_and_save_exchanges(&paths).await?;
+        logger::log_info(&format!("  ✓ Collected {} exchanges", exchange_count)).await;
+    } else {
+        logger::log_warn("Shutdown detected, skipping exchange collection").await;
+    }
+
     logger::log_info("✅ Corporate update complete").await;
     Ok(())
 }
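[Editor's note — not part of the patch] Steps 2–13 above all repeat the same guard shape; a generic helper along these lines (hypothetical, names invented) could collapse the duplication:

    async fn run_step<F, Fut, T>(flag: &AtomicBool, skip_msg: &str, step: F) -> anyhow::Result<Option<T>>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = anyhow::Result<T>>,
    {
        if flag.load(Ordering::SeqCst) {
            logger::log_warn(&format!("Shutdown detected, skipping {}", skip_msg)).await;
            return Ok(None);
        }
        Ok(Some(step().await?))
    }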
diff --git a/src/corporate/update_companies_cleanse.rs b/src/corporate/update_companies_cleanse.rs
index 7d6df77..d4b1c93 100644
--- a/src/corporate/update_companies_cleanse.rs
+++ b/src/corporate/update_companies_cleanse.rs
@@ -266,8 +266,36 @@ pub async fn companies_yahoo_cleansed_low_profile(
         existing_companies.len()
     )).await;
 
+    // === CONSOLIDATE LOG BEFORE EARLY EXIT ===
     if pending.is_empty() {
         logger::log_info("  ✓ All companies already processed").await;
+
+        // Consolidate log into checkpoint before exiting
+        if log_path.exists() {
+            let log_metadata = tokio::fs::metadata(&log_path).await.ok();
+            if log_metadata.map(|m| m.len() > 0).unwrap_or(false) {
+                logger::log_info("  Consolidating update log into checkpoint...").await;
+
+                let temp_checkpoint = checkpoint_path.with_extension("tmp");
+                let mut temp_file = File::create(&temp_checkpoint).await?;
+
+                for company in existing_companies.values() {
+                    let json_line = serde_json::to_string(company)?;
+                    temp_file.write_all(json_line.as_bytes()).await?;
+                    temp_file.write_all(b"\n").await?;
+                }
+
+                temp_file.flush().await?;
+                temp_file.sync_data().await?;
+                drop(temp_file);
+
+                tokio::fs::rename(&temp_checkpoint, &checkpoint_path).await?;
+                tokio::fs::remove_file(&log_path).await.ok();
+
+                logger::log_info(&format!("  ✓ Consolidated {} companies", existing_companies.len())).await;
+            }
+        }
+
         return Ok(existing_companies.len());
     }
@@ -575,6 +603,36 @@ pub async fn companies_yahoo_cleansed_low_profile(
         final_valid, final_filtered_low_cap, final_filtered_no_price, final_failed
     )).await;
 
+    // === VERIFY AND RECREATE FINAL OUTPUT ===
+    logger::log_info("Verifying final output integrity...").await;
+
+    let final_companies_map = existing_companies_writer.lock().await;
+    let expected_count = final_companies_map.len();
+
+    // Always write final consolidated checkpoint
+    let temp_checkpoint = checkpoint_path.with_extension("tmp");
+    let mut temp_file = File::create(&temp_checkpoint).await?;
+
+    for company in final_companies_map.values() {
+        let json_line = serde_json::to_string(company)?;
+        temp_file.write_all(json_line.as_bytes()).await?;
+        temp_file.write_all(b"\n").await?;
+    }
+
+    temp_file.flush().await?;
+    temp_file.sync_data().await?;
+    drop(temp_file);
+
+    tokio::fs::rename(&temp_checkpoint, &checkpoint_path).await?;
+    drop(final_companies_map);
+
+    // Clear log since everything is in checkpoint
+    if log_path.exists() {
+        tokio::fs::remove_file(&log_path).await.ok();
+    }
+
+    logger::log_info(&format!("✓ Final output: {} companies in {:?}", expected_count, checkpoint_path)).await;
+
     // Shutdown Yahoo pool
     yahoo_pool.shutdown().await?;
@@ -706,7 +764,7 @@ async fn process_company_with_validation(
     // Validate market cap
     let market_cap = extract_market_cap(&summary);
-    if market_cap < 1_000_000.0 {
+    if market_cap < 100_000_000.0 {
         return CompanyProcessResult::FilteredLowCap {
             name: company.name.clone(),
             market_cap,
diff --git a/src/corporate/update_companies_enrich_options_chart.rs b/src/corporate/update_companies_enrich_options_chart.rs
index aaac8fe..355e9c6 100644
--- a/src/corporate/update_companies_enrich_options_chart.rs
+++ b/src/corporate/update_companies_enrich_options_chart.rs
@@ -6,7 +6,7 @@ use crate::util::logger;
 use crate::scraper::yahoo::{YahooClientPool};
 
 use std::result::Result::Ok;
-use chrono::Utc;
+use chrono::{TimeZone, Utc};
 use std::collections::{HashSet};
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
@@ -656,9 +656,12 @@ async fn enrich_company_with_chart(
-    // Get 1 year of daily chart data
+    // Get daily chart data from 2000-01-01 through today
     let now = chrono::Utc::now().timestamp();
-    let twenty_five_years_ago = now - (25 * 365 * 24 * 60 * 60);
-
-    let chart_data = yahoo_pool.get_chart_data(&ticker, "1d", twenty_five_years_ago, now).await?;
+    let start = chrono::Utc
+        .with_ymd_and_hms(2000, 1, 1, 0, 0, 0)
+        .unwrap()
+        .timestamp();
+
+    let chart_data = yahoo_pool.get_chart_data(&ticker, "1d", start, now).await?;
 
     // Only save if we got meaningful data
     if chart_data.quotes.is_empty() {
diff --git a/src/economic/mod.rs b/src/economic/mod.rs
index 1cc7bf3..13a4bcb 100644
--- a/src/economic/mod.rs
+++ b/src/economic/mod.rs
@@ -2,7 +2,9 @@ pub mod types;
 pub mod scraper;
 pub mod storage;
-pub mod update;
 pub mod helpers;
+pub mod update;
+pub mod update_forex;
+
 pub use update::run_full_update;
\ No newline at end of file
diff --git a/src/economic/update_forex.rs b/src/economic/update_forex.rs
new file mode 100644
index 0000000..54d746d
--- /dev/null
+++ b/src/economic/update_forex.rs
@@ -0,0 +1,493 @@
+// src/economic/update_forex.rs
+use crate::config::Config;
+use crate::util::directories::DataPaths;
+use crate::util::logger;
+use crate::scraper::yahoo::{YahooClientPool, ChartData};
+
+use std::result::Result::Ok;
+use chrono::{TimeZone, Utc};
+use std::collections::HashSet;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use tokio::fs::{OpenOptions};
+use tokio::io::{AsyncWriteExt};
+use futures::stream::{FuturesUnordered, StreamExt};
+use serde_json::json;
+use tokio::sync::mpsc;
+
+/// Currency information
+#[derive(Debug, Clone)]
+struct CurrencyPair {
+    code: String,         // e.g., "EUR", "JPY"
+    name: String,         // e.g., "Euro", "Japanese Yen"
+    yahoo_symbol: String, // e.g., "USDEUR=X", "USDJPY=X"
+}
+
+impl CurrencyPair {
+    fn new(code: &str, name: &str) -> Self {
+        Self {
+            code: code.to_string(),
+            name: name.to_string(),
+            yahoo_symbol: format!("USD{}=X", code),
+        }
+    }
+}
+
+/// Get list of currency pairs to fetch (USD as base currency)
+fn get_currency_pairs() -> Vec<CurrencyPair> {
+    vec![
+        CurrencyPair::new("EUR", "Euro"),
+        CurrencyPair::new("TRY", "Turkish Lira"),
+        CurrencyPair::new("CHF", "Swiss Franc"),
+        CurrencyPair::new("SEK", "Swedish Krona"),
+        CurrencyPair::new("TWD", "New Taiwan Dollar"),
+        CurrencyPair::new("AUD", "Australian Dollar"),
+        CurrencyPair::new("GBP", "British Pound"),      // Fixed: GBp -> GBP
+        CurrencyPair::new("NOK", "Norwegian Krone"),
+        CurrencyPair::new("CAD", "Canadian Dollar"),
+        CurrencyPair::new("CZK", "Czech Koruna"),
+        CurrencyPair::new("SGD", "Singapore Dollar"),
+        CurrencyPair::new("ISK", "Icelandic Króna"),
+        CurrencyPair::new("ZAR", "South African Rand"), // Fixed: ZAc -> ZAR
+        CurrencyPair::new("JPY", "Japanese Yen"),
+        CurrencyPair::new("PLN", "Polish Złoty"),
+        CurrencyPair::new("DKK", "Danish Krone"),
+        CurrencyPair::new("HKD", "Hong Kong Dollar"),
+        CurrencyPair::new("ILS", "Israeli Shekel"),     // Fixed: ILA -> ILS
+        CurrencyPair::new("RON", "Romanian Leu"),
+        CurrencyPair::new("KWD", "Kuwaiti Dinar"),      // Fixed: KWF -> KWD
+    ]
+}
+
+/// Yahoo Collect Foreign Exchange Charts WITH ABORT-SAFE INCREMENTAL PERSISTENCE
+///
+/// # Features
+/// - Graceful shutdown (abort-safe)
+/// - Task panic isolation (tasks fail independently)
+/// - Crash-safe persistence (checkpoint + log with fsync)
+/// - Smart skip logic (only process incomplete data)
+/// - Uses pending queue instead of retry mechanism
+///
+/// # Persistence Strategy
+/// - Checkpoint: fx_rates_collected.jsonl (atomic state)
+/// - Log: fx_rates_updates.log (append-only updates)
+/// - On restart: Load checkpoint + replay log
+/// - Periodic checkpoints (every 10 currencies)
+/// - Batched fsync (every 5 writes or 10 seconds)
+pub async fn collect_fx_rates(
+    paths: &DataPaths,
+    _config: &Config,
+    yahoo_pool: Arc<YahooClientPool>,
+    shutdown_flag: &Arc<AtomicBool>,
+) -> anyhow::Result<usize> {
+    // Configuration constants
+    const CHECKPOINT_INTERVAL: usize = 10;
+    const FSYNC_BATCH_SIZE: usize = 5;
+    const FSYNC_INTERVAL_SECS: u64 = 10;
+    const CONCURRENCY_LIMIT: usize = 10; // Limit parallel fetch tasks
+
+    let data_path = paths.data_dir();
+
+    // File paths
+    let checkpoint_path = data_path.join("fx_rates_collected.jsonl");
+    let log_path = data_path.join("fx_rates_updates.log");
+    let state_path = data_path.join("state.jsonl");
+
+    // Check if already completed (check state file)
+    if state_path.exists() {
+        let state_content = tokio::fs::read_to_string(&state_path).await?;
+
+        for line in state_content.lines() {
+            if line.trim().is_empty() {
+                continue;
+            }
+
+            if let Ok(state) = serde_json::from_str::<serde_json::Value>(line) {
+                if state.get("fx_rates_collection_complete").and_then(|v| v.as_bool()).unwrap_or(false) {
+                    logger::log_info("  FX rates collection already completed").await;
+
+                    // Count collected currencies
+                    let count = count_collected_currencies(paths).await?;
+                    logger::log_info(&format!("  ✓ Found {} currencies with chart data", count)).await;
+                    return Ok(count);
+                }
+            }
+        }
+    }
+
+    // === RECOVERY PHASE: Track collected currencies ===
+    let mut collected_currencies: HashSet<String> = HashSet::new();
+
+    if log_path.exists() {
+        logger::log_info("Loading FX rates collection progress from log...").await;
+        let log_content = tokio::fs::read_to_string(&log_path).await?;
+
+        for line in log_content.lines() {
+            if line.trim().is_empty() || !line.ends_with('}') {
+                continue; // Skip incomplete lines
+            }
+
+            match serde_json::from_str::<serde_json::Value>(line) {
+                Ok(entry) => {
+                    if let Some(code) = entry.get("currency_code").and_then(|v| v.as_str()) {
+                        if entry.get("status").and_then(|v| v.as_str()) == Some("collected") {
+                            collected_currencies.insert(code.to_string());
+                        }
+                    }
+                }
+                Err(e) => {
+                    logger::log_warn(&format!("Skipping invalid log line: {}", e)).await;
+                }
+            }
+        }
+        logger::log_info(&format!("Loaded {} collected currencies from log", collected_currencies.len())).await;
+    }
+
+    // Get all currency pairs
+    let currency_pairs = get_currency_pairs();
+    let total_currencies = currency_pairs.len();
+    logger::log_info(&format!("Found {} currency pairs to collect", total_currencies)).await;
+
+    // Filter currencies that need collection
+    let pending_pairs: Vec<CurrencyPair> = currency_pairs
+        .into_iter()
+        .filter(|pair| !collected_currencies.contains(&pair.code))
+        .collect();
+
+    let pending_count = pending_pairs.len();
+    logger::log_info(&format!(
+        "  {} already collected, {} pending",
+        collected_currencies.len(),
+        pending_count
+    )).await;
+
+    if pending_count == 0 {
+        logger::log_info("  ✓ All currencies already collected").await;
+        mark_collection_complete(&state_path).await?;
+        return Ok(collected_currencies.len());
+    }
+
+    // === PROCESSING PHASE: Collect FX rates ===
+
+    // Shared counters
+    let processed_count = Arc::new(AtomicUsize::new(collected_currencies.len()));
+    let success_count = Arc::new(AtomicUsize::new(collected_currencies.len()));
+    let failed_count = Arc::new(AtomicUsize::new(0));
+
+    // Log writer channel with batching and fsync
+    let (log_tx, mut log_rx) = mpsc::channel::<LogCommand>(1000);
+
+    // Spawn log writer task
+    let log_writer_handle = {
+        let log_path = log_path.clone();
+        let processed_count = Arc::clone(&processed_count);
+        let total_currencies = total_currencies;
+
+        tokio::spawn(async move {
+            let mut log_file = OpenOptions::new()
+                .create(true)
+                .append(true)
+                .open(&log_path)
+                .await
+                .expect("Failed to open log file");
+
+            let mut write_count = 0;
+            let mut last_fsync = tokio::time::Instant::now();
+
+            while let Some(cmd) = log_rx.recv().await {
+                match cmd {
+                    LogCommand::Write(entry) => {
+                        let json_line = serde_json::to_string(&entry).expect("Serialization failed");
+                        log_file.write_all(json_line.as_bytes()).await.expect("Write failed");
+                        log_file.write_all(b"\n").await.expect("Write failed");
+
+                        write_count += 1;
+
+                        // Batched fsync
+                        if write_count >= FSYNC_BATCH_SIZE
+                            || last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS
+                        {
+                            log_file.flush().await.expect("Flush failed");
+                            log_file.sync_all().await.expect("Fsync failed");
+                            write_count = 0;
+                            last_fsync = tokio::time::Instant::now();
+                        }
+                    }
+                    LogCommand::Checkpoint => {
+                        // Force fsync on checkpoint
+                        log_file.flush().await.expect("Flush failed");
+                        log_file.sync_all().await.expect("Fsync failed");
+                        write_count = 0;
+                        last_fsync = tokio::time::Instant::now();
+
+                        let current = processed_count.load(Ordering::SeqCst);
+                        logger::log_info(&format!(
+                            "  Checkpoint: {}/{} currencies processed",
+                            current, total_currencies
+                        )).await;
+                    }
+                    LogCommand::Shutdown => {
+                        // Final fsync before shutdown
+                        log_file.flush().await.expect("Flush failed");
+                        log_file.sync_all().await.expect("Fsync failed");
+                        break;
+                    }
+                }
+            }
+        })
+    };
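// [Editor's note — not part of the patch] Illustrative shape of one line in
// fx_rates_updates.log, as emitted by spawn_collection_task below (timestamp
// hypothetical):
//
//   {"currency_code":"EUR","currency_name":"Euro","yahoo_symbol":"USDEUR=X",
//    "status":"collected","timestamp":"2024-05-01T12:00:00+00:00"}
//
// On restart, only entries with status "collected" mark a currency as done,
// so failures are retried; the `!line.ends_with('}')` check in the recovery
// loop skips a torn final line left by a crash mid-write.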
+
+    // Process currencies concurrently with task panic isolation
+    let mut tasks = FuturesUnordered::new();
+    let mut pending_iter = pending_pairs.into_iter();
+    let semaphore = Arc::new(tokio::sync::Semaphore::new(CONCURRENCY_LIMIT));
+
+    // Initial batch of tasks
+    for _ in 0..CONCURRENCY_LIMIT.min(pending_count) {
+        if let Some(pair) = pending_iter.next() {
+            let task = spawn_collection_task(
+                pair,
+                Arc::clone(&yahoo_pool),
+                paths.clone(),
+                Arc::clone(&processed_count),
+                Arc::clone(&success_count),
+                Arc::clone(&failed_count),
+                log_tx.clone(),
+                Arc::clone(&semaphore),
+                Arc::clone(shutdown_flag),
+            );
+            tasks.push(task);
+        }
+    }
+
+    // Process tasks as they complete and spawn new ones
+    let mut checkpoint_counter = 0;
+    while let Some(_result) = tasks.next().await {
+        // Check for shutdown
+        if shutdown_flag.load(Ordering::SeqCst) {
+            logger::log_warn("Shutdown signal received, stopping FX collection").await;
+            break;
+        }
+
+        // Spawn new task if more pending
+        if let Some(pair) = pending_iter.next() {
+            let task = spawn_collection_task(
+                pair,
+                Arc::clone(&yahoo_pool),
+                paths.clone(),
+                Arc::clone(&processed_count),
+                Arc::clone(&success_count),
+                Arc::clone(&failed_count),
+                log_tx.clone(),
+                Arc::clone(&semaphore),
+                Arc::clone(shutdown_flag),
+            );
+            tasks.push(task);
+        }
+
+        // Periodic checkpoint
+        checkpoint_counter += 1;
+        if checkpoint_counter % CHECKPOINT_INTERVAL == 0 {
+            let _ = log_tx.send(LogCommand::Checkpoint).await;
+        }
+    }
+
+    // Signal shutdown to log writer
+    let _ = log_tx.send(LogCommand::Shutdown).await;
+
+    // Wait for log writer to finish
+    let _ = log_writer_handle.await;
+
+    // Final statistics
+    let final_success = success_count.load(Ordering::SeqCst);
+    let final_failed = failed_count.load(Ordering::SeqCst);
+
+    logger::log_info(&format!(
+        "  FX collection complete: {} succeeded, {} failed",
+        final_success, final_failed
+    )).await;
+
+    // Mark as complete if not shutdown
+    if !shutdown_flag.load(Ordering::SeqCst) {
+        mark_collection_complete(&state_path).await?;
+    }
+
+    Ok(final_success)
+}
+
+/// Spawn a collection task with panic isolation
+fn spawn_collection_task(
+    pair: CurrencyPair,
+    yahoo_pool: Arc<YahooClientPool>,
+    paths: DataPaths,
+    processed_count: Arc<AtomicUsize>,
+    success_count: Arc<AtomicUsize>,
+    failed_count: Arc<AtomicUsize>,
+    log_tx: mpsc::Sender<LogCommand>,
+    semaphore: Arc<tokio::sync::Semaphore>,
+    shutdown_flag: Arc<AtomicBool>,
+) -> tokio::task::JoinHandle<()> {
+    tokio::spawn(async move {
+        // Acquire semaphore permit
+        let _permit = semaphore.acquire().await.expect("Semaphore closed");
+
+        // Check shutdown before processing
+        if shutdown_flag.load(Ordering::SeqCst) {
+            return;
+        }
+
+        // Perform collection (panic-isolated)
+        let result = collect_currency_chart(&pair, &yahoo_pool, &paths).await;
+
+        // Update counters
+        processed_count.fetch_add(1, Ordering::SeqCst);
+
+        let status = match result {
+            Ok(_) => {
+                success_count.fetch_add(1, Ordering::SeqCst);
+                logger::log_info(&format!(
+                    "  ✓ Collected {} ({})",
+                    pair.code, pair.name
+                )).await;
+                "collected"
+            }
+            Err(e) => {
+                failed_count.fetch_add(1, Ordering::SeqCst);
+                logger::log_warn(&format!(
+                    "  ✗ Failed to collect {} ({}): {}",
+                    pair.code, pair.name, e
+                )).await;
+                "failed"
+            }
+        };
+
+        // Log result
+        let log_entry = json!({
+            "currency_code": pair.code,
+            "currency_name": pair.name,
+            "yahoo_symbol": pair.yahoo_symbol,
+            "status": status,
+            "timestamp": Utc::now().to_rfc3339(),
+        });
+
+        let _ = log_tx.send(LogCommand::Write(log_entry)).await;
+    })
+}
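// [Editor's note — not part of the patch] The panic isolation claimed above
// rests on tokio::spawn: a panic inside a spawned task is captured by its
// JoinHandle and surfaces as an Err when awaited, so the FuturesUnordered
// loop simply moves on instead of unwinding the whole collector. A minimal
// sketch of the mechanism:
//
//   let handle = tokio::spawn(async { panic!("one task dies") });
//   assert!(handle.await.is_err()); // the panic stays inside this handle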
+
+/// Collect chart data for a single currency pair
+async fn collect_currency_chart(
+    pair: &CurrencyPair,
+    yahoo_pool: &Arc<YahooClientPool>,
+    paths: &DataPaths,
+) -> anyhow::Result<()> {
+    // Get historical data from year 2000 to now
+    let now = Utc::now().timestamp();
+    let start_2000 = Utc
+        .with_ymd_and_hms(2000, 1, 1, 0, 0, 0)
+        .unwrap()
+        .timestamp();
+
+    // Fetch chart data from Yahoo
+    let chart_data = yahoo_pool.get_chart_data(
+        &pair.yahoo_symbol,
+        "1d", // Daily interval
+        start_2000,
+        now,
+    ).await?;
+
+    // Validate we got data
+    if chart_data.quotes.is_empty() {
+        return Err(anyhow::anyhow!(
+            "No chart data available for {} ({})",
+            pair.code,
+            pair.yahoo_symbol
+        ));
+    }
+
+    // Save chart data to currency directory
+    save_currency_chart(paths, &pair.code, &chart_data).await?;
+
+    Ok(())
+}
+
+/// Save currency chart data to filesystem
+async fn save_currency_chart(
+    paths: &DataPaths,
+    currency_code: &str,
+    chart_data: &ChartData,
+) -> anyhow::Result<()> {
+    use tokio::fs;
+
+    // Create directory structure: data/economic/currency/{code}/chart/
+    let economic_dir = paths.data_dir().join("economic");
+    let currency_dir = economic_dir.join("currency").join(currency_code);
+    let chart_dir = currency_dir.join("chart");
+
+    fs::create_dir_all(&chart_dir).await?;
+
+    // Write chart data to data.jsonl
+    let data_path = chart_dir.join("data.jsonl");
+    let json_line = serde_json::to_string(chart_data)?;
+
+    let mut file = fs::File::create(&data_path).await?;
+    file.write_all(json_line.as_bytes()).await?;
+    file.write_all(b"\n").await?;
+    file.flush().await?;
+    file.sync_all().await?; // Ensure data is persisted
+
+    Ok(())
+}
+
+/// Count collected currencies (currencies with chart data)
+async fn count_collected_currencies(paths: &DataPaths) -> anyhow::Result<usize> {
+    let currency_dir = paths.data_dir().join("economic").join("currency");
+
+    if !currency_dir.exists() {
+        return Ok(0);
+    }
+
+    let mut count = 0;
+    let mut entries = tokio::fs::read_dir(&currency_dir).await?;
+
+    while let Some(entry) = entries.next_entry().await? {
+        let path = entry.path();
+        if path.is_dir() {
+            let chart_file = path.join("chart").join("data.jsonl");
+
+            if chart_file.exists() {
+                count += 1;
+            }
+        }
+    }
+
+    Ok(count)
+}
+
+/// Mark collection as complete in state file
+async fn mark_collection_complete(state_path: &std::path::Path) -> anyhow::Result<()> {
+    let collection_complete = json!({
+        "fx_rates_collection_complete": true,
+        "completed_at": Utc::now().to_rfc3339(),
+    });
+
+    let mut state_file = OpenOptions::new()
+        .create(true)
+        .append(true)
+        .open(state_path)
+        .await?;
+
+    let state_line = serde_json::to_string(&collection_complete)?;
+    state_file.write_all(state_line.as_bytes()).await?;
+    state_file.write_all(b"\n").await?;
+    state_file.flush().await?;
+    state_file.sync_all().await?;
+
+    Ok(())
+}
+
+/// Log command enum
+enum LogCommand {
+    Write(serde_json::Value),
+    Checkpoint,
+    Shutdown,
+}
\ No newline at end of file
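[Editor's note — not part of the patch] On-disk layout produced by save_currency_chart, one file per currency (the codes are examples):

    data/economic/currency/EUR/chart/data.jsonl
    data/economic/currency/JPY/chart/data.jsonl

FxRateCache::new in collect_exchanges.rs walks exactly this tree, so the two modules must agree on this path convention.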
diff --git a/src/main.rs b/src/main.rs
index 4dee183..d97ba12 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -230,7 +230,7 @@ async fn main() -> Result<()> {
 
     // === Step 4: Run scraping jobs ===
     logger::log_info("--- Starting ECONOMIC data update ---").await;
-    //economic::run_full_update(&config, &pool).await?;
+    economic::run_full_update(&config, &pool).await?;
     logger::log_info("Economic update completed").await;
 
     if !shutdown_flag.load(Ordering::SeqCst) {
diff --git a/src/scraper/docker_vpn_proxy.rs b/src/scraper/docker_vpn_proxy.rs
index d9b598e..81fd859 100644
--- a/src/scraper/docker_vpn_proxy.rs
+++ b/src/scraper/docker_vpn_proxy.rs
@@ -1,12 +1,13 @@
 use anyhow::{anyhow, Context, Result};
 use futures::future::join_all;
-use std::{path::{Path, PathBuf}, time::Duration};
+use std::{collections::HashSet, path::{Path, PathBuf}, sync::{Arc, RwLock}, time::Duration};
 use tokio::{process::Command, time::{sleep}};
 use walkdir::WalkDir;
 
 pub struct DockerVpnProxyPool {
     container_names: Vec<String>,
     proxy_ports: Vec<u16>, // e.g., [10801, 10802, ...]
+    dead_proxies: Arc<RwLock<HashSet<usize>>>,
 }
 
 impl DockerVpnProxyPool {
@@ -187,6 +188,7 @@ impl DockerVpnProxyPool {
         Ok(Self {
             container_names: working_containers,
             proxy_ports: working_ports,
+            dead_proxies: Arc::new(RwLock::new(HashSet::new())),
         })
     }
 
@@ -397,6 +399,69 @@ impl DockerVpnProxyPool {
     pub fn get_container_name(&self, index: usize) -> Option<String> {
         self.container_names.get(index).cloned()
     }
+
+    // Get a healthy proxy URL (skips dead proxies)
+    pub async fn get_healthy_proxy_url(&self, start_index: usize) -> Option<(usize, String)> {
+        let dead = match self.dead_proxies.read() {
+            Ok(value) => value,
+            Err(_) => return None,
+        };
+        let total = self.proxy_ports.len();
+
+        // Try up to 'total' proxies starting from start_index
+        for attempt in 0..total {
+            let index = (start_index + attempt) % total;
+
+            // Skip if dead
+            if dead.contains(&index) {
+                continue;
+            }
+
+            let port = self.proxy_ports[index];
+            return Some((index, format!("socks5h://localhost:{}", port)));
+        }
+
+        None
+    }
+
+    // Mark a proxy as dead
+    pub async fn mark_proxy_dead(&self, index: usize) -> Option<bool> {
+        // Acquire lock, perform mutation, and get values for logging
+        let (port, remaining, total) = {
+            let mut dead = match self.dead_proxies.write() {
+                Ok(value) => value,
+                Err(_) => return None,
+            };
+            dead.insert(index);
+
+            let port = self.proxy_ports.get(index).copied().unwrap_or(0);
+            let remaining = self.proxy_ports.len() - dead.len();
+            let total = self.proxy_ports.len();
+
+            // Lock is automatically dropped here when the scope ends
+            (port, remaining, total)
+        };
+
+        // Now we can await without holding the lock
+        crate::util::logger::log_warn(&format!(
+            "⚠ Marked proxy {} (port {}) as DEAD ({}/{} proxies remaining)",
+            index,
+            port,
+            remaining,
+            total
+        )).await;
+
+        Some(true)
+    }
+
+    // Get count of healthy proxies
+    pub async fn num_healthy_proxies(&self) -> Option<usize> {
+        let dead = match self.dead_proxies.read() {
+            Ok(value) => value,
+            Err(_) => return None,
+        };
+        Some(self.proxy_ports.len() - dead.len())
+    }
 }
 
 pub async fn cleanup_all_proxy_containers() -> Result<()> {
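[Editor's note — not part of the patch] How the two new pool methods are meant to compose, as used by replace_marked_client below (index values illustrative; try_connect is hypothetical):

    // round-robin from a starting index, skipping proxies marked dead
    if let Some((idx, url)) = proxy_pool.get_healthy_proxy_url(3).await {
        match try_connect(&url).await {
            Ok(_) => { /* use proxy `idx` */ }
            Err(_) => { proxy_pool.mark_proxy_dead(idx).await; }
        }
    }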
diff --git a/src/scraper/webdriver.rs b/src/scraper/webdriver.rs
index 04e1eda..0a78292 100644
--- a/src/scraper/webdriver.rs
+++ b/src/scraper/webdriver.rs
@@ -281,7 +281,7 @@ impl ChromeDriverPool {
                     // Execute parse function
                     match parse(client).await {
                         Ok(data) => {
-                            // ✅ SUCCESS: Record and log
+                            // SUCCESS: Record and log
                             let prev_count = self.hard_reset_controller.get_count();
                             self.hard_reset_controller.record_success();
 
@@ -296,7 +296,7 @@ impl ChromeDriverPool {
                             Ok(data)
                         }
                         Err(e) => {
-                            // ❌ PARSE ERROR: Record, check threshold, invalidate session
+                            // PARSE ERROR: Record, check threshold, invalidate session
                             let error_count = self.hard_reset_controller.record_error();
 
                             {
diff --git a/src/scraper/yahoo.rs b/src/scraper/yahoo.rs
index 8f7cf0f..4a95465 100644
--- a/src/scraper/yahoo.rs
+++ b/src/scraper/yahoo.rs
@@ -1210,15 +1210,14 @@ impl YahooClientPool {
     }
 
     /// Replace a marked client with a new one using next available proxy (round-robin)
-    async fn replace_marked_client(&self, marked_index: usize) -> Result<()> {
-        let mut clients_write = self.clients.write().await;
+    async fn replace_marked_client(&self, index: usize) -> Result<()> {
+        let clients_write = self.clients.write().await;
 
-        // Double-check that the client is still marked
-        if !clients_write[marked_index].is_marked_for_replacement() {
-            return Ok(());
+        if index >= clients_write.len() {
+            return Err(anyhow!("Invalid client index: {}", index));
         }
 
-        let old_client = &clients_write[marked_index];
+        let old_client = &clients_write[index];
         let old_client_id = old_client.get_client_id();
         let old_proxy_url = old_client.get_proxy_url().to_string();
 
@@ -1227,59 +1226,81 @@ impl YahooClientPool {
             old_client_id, old_proxy_url
         )).await;
 
-        // Get next proxy with round-robin
+        drop(clients_write); // Release write lock during retry attempts
+
+        // NEW: Try multiple proxies with health checking
+        const MAX_RETRY_ATTEMPTS: usize = 5;
+
         let mut next_proxy = self.next_proxy_index.lock().await;
-        let proxy_index = *next_proxy % self.proxy_pool.num_proxies();
-        *next_proxy += 1;
-        drop(next_proxy);
+        let start_proxy_index = *next_proxy;
 
-        let new_proxy_url = self.proxy_pool.get_proxy_url(proxy_index);
-
-        // Create new client
-        match YahooClient::new(
-            old_client_id, // Keep same client ID
-            new_proxy_url.clone(),
-            self.max_tasks_per_client,
-            self.monitoring.clone(),
-        ).await {
-            Ok(new_client) => {
-                // Replace the client atomically
-                clients_write[marked_index] = Arc::new(new_client);
-
-                logger::log_info(&format!(
-                    "✓ Replaced YahooClient[{}] with new proxy: {}",
-                    old_client_id, new_proxy_url
-                )).await;
-
-                // Emit monitoring event
-                /*if let Some(ref mon) = self.monitoring {
-                    mon.emit(crate::monitoring::MonitoringEvent::YahooClientReplaced {
-                        client_id: old_client_id,
-                        old_proxy: old_proxy_url,
-                        new_proxy: new_proxy_url,
-                    });
-                }*/
-
-                Ok(())
-            }
-            Err(e) => {
-                logger::log_error(&format!(
-                    "✗ Failed to replace YahooClient[{}]: {}",
-                    old_client_id, e
-                )).await;
-
-                // Remove the broken client from the pool entirely
-                let mut clients = self.clients.write().await;
-                clients.retain(|c| c.client_id != old_client_id);
-
-                logger::log_warn(&format!(
-                    "⚠ Removed YahooClient[{}] from pool (now {} clients active)",
-                    old_client_id, clients.len()
-                )).await;
-
-                Err(e)
+        for attempt in 0..MAX_RETRY_ATTEMPTS {
+            let current_proxy_index = (start_proxy_index + attempt) % self.proxy_pool.num_proxies();
+
+            // NEW: Get healthy proxy URL (skips dead proxies)
+            let (proxy_index, new_proxy_url) = match self.proxy_pool.get_healthy_proxy_url(current_proxy_index).await {
+                Some(proxy_info) => proxy_info,
+                None => {
+                    logger::log_error("❌ No healthy proxies available!").await;
+                    drop(next_proxy);
+                    return Err(anyhow!("All proxies are dead"));
+                }
+            };
+
+            logger::log_info(&format!(
+                "  Attempt {}/{}: Trying proxy {} ({})",
+                attempt + 1, MAX_RETRY_ATTEMPTS, proxy_index, new_proxy_url
+            )).await;
+
+            // Try to create new client
+            match YahooClient::new(
+                old_client_id,
+                new_proxy_url.clone(),
+                self.max_tasks_per_client,
+                self.monitoring.clone(),
+            ).await {
+                Ok(new_client) => {
+                    // SUCCESS: Replace the client
+                    let mut clients_write = self.clients.write().await;
+                    clients_write[index] = Arc::new(new_client);
+
+                    // Update next_proxy_index for the next replacement
+                    *next_proxy = proxy_index + 1;
+                    drop(next_proxy);
+
+                    logger::log_info(&format!(
+                        "✓ Replaced YahooClient[{}] with proxy: {}",
+                        old_client_id, new_proxy_url
+                    )).await;
+
+                    return Ok(());
+                }
+                Err(e) => {
+                    logger::log_warn(&format!(
+                        "  ⚠ Proxy {} failed: {}",
+                        proxy_index, e
+                    )).await;
+
+                    // NEW: Mark this proxy as dead
+                    self.proxy_pool.mark_proxy_dead(proxy_index).await;
+
+                    // Continue to next proxy
+                    continue;
+                }
             }
         }
+
+        drop(next_proxy);
+
+        // FAILED after all attempts
+        logger::log_error(&format!(
+            "❌ Failed to replace YahooClient[{}] after {} attempts",
+            old_client_id, MAX_RETRY_ATTEMPTS
+        )).await;
+
+        // NEW: Instead of removing the client, keep it marked for replacement;
+        // the background task will retry later
+        Err(anyhow!("Failed to replace client after {} attempts", MAX_RETRY_ATTEMPTS))
     }
 
     /// Execute a task using an available client with round-robin selection
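A note on the locking discipline in `replace_marked_client` above: the early `drop(clients_write)` releases the pool's `tokio::sync::RwLock` before up to five `YahooClient::new(...).await` attempts run, so readers are never blocked behind slow proxy handshakes, and the write lock is re-acquired only for the final in-place swap. A minimal sketch of the same pattern, with a hypothetical `rebuild` standing in for the slow async work:

    use std::sync::Arc;
    use tokio::sync::RwLock;

    async fn replace_slot(slots: &RwLock<Vec<Arc<String>>>, index: usize) {
        // Snapshot under a short read lock; the guard drops at the end of the statement.
        let old = slots.read().await.get(index).cloned();

        if let Some(old) = old {
            // The slow async work runs with no lock held.
            let fresh = rebuild(&old).await;

            // Re-acquire briefly, only for the swap, and re-validate the index.
            let mut guard = slots.write().await;
            if index < guard.len() {
                guard[index] = Arc::new(fresh);
            }
        }
    }

The trade-off is that the collection can change between snapshot and swap, which is why the sketch re-checks the index after re-acquiring; the patched function can get away without that check because clients are no longer removed from the pool on failure.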
@@ -1293,32 +1314,64 @@ impl YahooClientPool {
         // Acquire semaphore permit (limits concurrency)
         let _permit = self.semaphore.acquire().await?;
 
-        // Select client with round-robin + availability check
-        let (client, client_index) = 'select: loop {
-            let clients_read = self.clients.read().await;
-            let mut next_idx = self.next_client.lock().await;
-            let start_idx = *next_idx;
-            let total = clients_read.len();
-            let mut checked = 0;
+        // NEW: Add timeout to client selection
+        const CLIENT_SELECTION_TIMEOUT_SECS: u64 = 30;
+
+        let selection_future = async {
+            // Select client with round-robin + availability check
+            Ok('select: loop {
+                let clients_read = self.clients.read().await;
+                let mut next_idx = self.next_client.lock().await;
+                let start_idx = *next_idx;
+                let total = clients_read.len();
+                let mut checked = 0;
 
-            // Try to find an available client
-            while checked < total {
-                let idx = *next_idx;
-                let client = &clients_read[idx];
+                // Try to find an available client
+                while checked < total {
+                    let idx = *next_idx;
+                    let client = &clients_read[idx];
 
-                // Move to next for next iteration
-                *next_idx = (*next_idx + 1) % total;
-                checked += 1;
+                    // Move to next for next iteration
+                    *next_idx = (*next_idx + 1) % total;
+                    checked += 1;
 
-                // Skip marked clients
-                if !client.is_marked_for_replacement() && client.is_available().await {
-                    break 'select (Arc::clone(client), idx);
+                    // Skip marked clients
+                    if !client.is_marked_for_replacement() && client.is_available().await {
+                        break 'select (Arc::clone(client), idx);
+                    }
                 }
-            }
 
-            drop(next_idx);
-            drop(clients_read);
-            sleep(Duration::from_millis(100)).await;
+                // NEW: Check if all clients are unavailable
+                let all_marked = clients_read.iter().all(|c| c.is_marked_for_replacement());
+
+                drop(next_idx);
+                drop(clients_read);
+
+                if all_marked {
+                    // All clients marked - this is a critical error
+                    return Err(anyhow!(
+                        "All {} clients are marked for replacement - pool critically degraded",
+                        total
+                    ));
+                }
+
+                sleep(Duration::from_millis(100)).await;
+            })
+        };
+
+        // NEW: Wrap in timeout to prevent infinite loop
+        let (client, client_index) = match timeout(
+            Duration::from_secs(CLIENT_SELECTION_TIMEOUT_SECS),
+            selection_future
+        ).await {
+            Ok(Ok(result)) => result,
+            Ok(Err(e)) => return Err(e),
+            Err(_) => {
+                return Err(anyhow!(
+                    "Timeout waiting for available client (>{}s) - pool is overloaded or degraded",
+                    CLIENT_SELECTION_TIMEOUT_SECS
+                ));
+            }
         };
 
         // Increment task count
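The timeout wrapper is the other half of the degraded-pool handling: selection now fails fast with a diagnosable error instead of spinning forever. The shape generalizes to any poll-until-ready loop; a minimal sketch assuming only tokio (`probe` is a hypothetical closure that returns `Some` once a slot frees up):

    use tokio::time::{sleep, timeout, Duration};

    async fn poll_with_deadline<T, F, Fut>(mut probe: F, secs: u64) -> anyhow::Result<T>
    where
        F: FnMut() -> Fut,
        Fut: std::future::Future<Output = Option<T>>,
    {
        timeout(Duration::from_secs(secs), async {
            loop {
                if let Some(value) = probe().await {
                    return value;
                }
                // Nothing available yet: back off briefly and re-check.
                sleep(Duration::from_millis(100)).await;
            }
        })
        .await
        .map_err(|_| anyhow::anyhow!("timed out after {}s waiting for a free slot", secs))
    }

Keeping the two failure modes distinct matters, as the patch does: `Ok(Err(e))` means the pool reported itself critically degraded, while `Err(_)` from `timeout` only means nothing freed up within the deadline.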
diff --git a/src/util/logger.rs b/src/util/logger.rs
index 54f085c..122b34b 100644
--- a/src/util/logger.rs
+++ b/src/util/logger.rs
@@ -5,8 +5,6 @@ use tokio::sync::Mutex;
 use std::fs::{self, OpenOptions};
 use std::io::Write;
 use std::path::PathBuf;
-use std::sync::Arc;
-use std::sync::atomic::{AtomicUsize, Ordering};
 
 static LOGGER: Lazy>> = Lazy::new(|| Mutex::new(None));
 
@@ -77,83 +75,4 @@ pub async fn log_warn(msg: &str) {
 
 pub async fn log_error(msg: &str) {
     log_detailed("ERROR", msg).await;
-}
-
-struct PoolLogger {
-    file: std::fs::File,
-    log_path: PathBuf,
-}
-
-impl PoolLogger {
-    fn new(log_dir: &std::path::Path) -> std::io::Result<Self> {
-        fs::create_dir_all(log_dir)?;
-        let filename = format!("webdriver_{}.log", Local::now().format("%Y%m%d_%H%M%S"));
-        let log_path = log_dir.join(&filename);
-        let file = OpenOptions::new()
-            .create(true)
-            .append(true)
-            .open(&log_path)?;
-        Ok(Self { file, log_path })
-    }
-
-    async fn log(&mut self, msg: &str) {
-        let line = format!("[{}] {}\n", Local::now().format("%H:%M:%S"), msg);
-        let _ = self.file.write_all(line.as_bytes());
-        let _ = self.file.flush();
-        println!("{}", line.trim_end());
-    }
-}
-
-pub struct PoolMetrics {
-    pub total_requests: Arc<AtomicUsize>,
-    pub successful_requests: Arc<AtomicUsize>,
-    pub failed_requests: Arc<AtomicUsize>,
-    pub session_renewals: Arc<AtomicUsize>,
-    pub rotation_events: Arc<AtomicUsize>,
-    pub retries: Arc<AtomicUsize>,
-
-    pub navigation_timeouts: Arc<AtomicUsize>,
-    pub bot_detection_hits: Arc<AtomicUsize>,
-    pub proxy_failures: Arc<AtomicUsize>,
-}
-
-impl PoolMetrics {
-    pub fn new() -> Self {
-        Self {
-            total_requests: Arc::new(AtomicUsize::new(0)),
-            successful_requests: Arc::new(AtomicUsize::new(0)),
-            failed_requests: Arc::new(AtomicUsize::new(0)),
-            session_renewals: Arc::new(AtomicUsize::new(0)),
-            rotation_events: Arc::new(AtomicUsize::new(0)),
-            retries: Arc::new(AtomicUsize::new(0)),
-            navigation_timeouts: Arc::new(AtomicUsize::new(0)),
-            bot_detection_hits: Arc::new(AtomicUsize::new(0)),
-            proxy_failures: Arc::new(AtomicUsize::new(0)),
-        }
-    }
-
-    pub async fn log_stats(&self) {
-        let total = self.total_requests.load(Ordering::Relaxed);
-        let success = self.successful_requests.load(Ordering::Relaxed);
-        // FIX: Prefix unused variable with underscore
-        let _failed = self.failed_requests.load(Ordering::Relaxed);
-        let renewals = self.session_renewals.load(Ordering::Relaxed);
-        let rotations = self.rotation_events.load(Ordering::Relaxed);
-        let retries = self.retries.load(Ordering::Relaxed);
-        let timeouts = self.navigation_timeouts.load(Ordering::Relaxed);
-        let bot_hits = self.bot_detection_hits.load(Ordering::Relaxed);
-        let proxy_fails = self.proxy_failures.load(Ordering::Relaxed);
-
-        let success_rate = if total > 0 {
-            (success as f64 / total as f64) * 100.0
-        } else {
-            0.0
-        };
-
-        crate::util::logger::log_info(&format!(
-            "Pool Metrics: {} total requests, {:.1}% success rate, {} renewals, {} rotations, {} retries, {} timeouts, {} bot detections, {} proxy failures",
-            total, success_rate, renewals, rotations, retries, timeouts, bot_hits, proxy_fails
-        )).await;
-    }
-}
-
+}
\ No newline at end of file