diff --git a/fx_rates.json b/fx_rates.json new file mode 100644 index 0000000..bfdd282 --- /dev/null +++ b/fx_rates.json @@ -0,0 +1,26 @@ +{ + "CAD": [ + 1.4110342881332016, + "2025-11-24" + ], + "GBp": [ + 0.7637668983426259, + "2025-11-24" + ], + "CNY": [ + 7.102272727272727, + "2025-11-24" + ], + "HKD": [ + 7.782101167315175, + "2025-11-24" + ], + "EUR": [ + 0.8681309141418526, + "2025-11-24" + ], + "JPY": [ + 0.0064, + "2025-11-24" + ] +} \ No newline at end of file diff --git a/src/corporate/aggregation.rs b/src/corporate/aggregation.rs index 0849a5d..67393c0 100644 --- a/src/corporate/aggregation.rs +++ b/src/corporate/aggregation.rs @@ -6,7 +6,7 @@ use std::collections::HashMap; #[derive(Debug)] struct DayData { - sources: Vec, + sources: Vec<(CompanyPrice, String)>, // (price, source_ticker) total_volume: u64, vwap: f64, open: f64, @@ -15,6 +15,7 @@ struct DayData { close: f64, } +/// Aggregate price data from multiple exchanges, converting all to USD pub async fn aggregate_best_price_data(isin: &str) -> anyhow::Result<()> { let company_dir = get_company_dir(isin); @@ -24,74 +25,103 @@ pub async fn aggregate_best_price_data(isin: &str) -> anyhow::Result<()> { continue; } - let mut all_prices: Vec = Vec::new(); + let mut all_prices: Vec<(CompanyPrice, String)> = Vec::new(); let mut by_date_time: HashMap = HashMap::new(); - // Load all sources + // Load all sources with their ticker names let mut entries = tokio::fs::read_dir(&source_dir).await?; + let mut source_count = 0; + let mut sources_used = std::collections::HashSet::new(); + while let Some(entry) = entries.next_entry().await? 
{ - let source_dir = entry.path(); - if !source_dir.is_dir() { continue; } - let prices_path = source_dir.join("prices.json"); + let source_dir_path = entry.path(); + if !source_dir_path.is_dir() { continue; } + + let source_ticker = source_dir_path + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or("unknown") + .to_string(); + + let prices_path = source_dir_path.join("prices.json"); if !prices_path.exists() { continue; } let content = tokio::fs::read_to_string(&prices_path).await?; let mut prices: Vec<CompanyPrice> = serde_json::from_str(&content)?; - all_prices.append(&mut prices); + + if !prices.is_empty() { + sources_used.insert(source_ticker.clone()); + source_count += 1; + } + + for price in prices { + all_prices.push((price, source_ticker.clone())); + } } if all_prices.is_empty() { continue; } + println!(" Aggregating from {} exchanges: {}", + sources_used.len(), + sources_used.iter() + .map(|s| s.as_str()) + .collect::<Vec<_>>() + .join(", ") + ); + // Group by date + time (for 5min) or just date - for p in all_prices { + for (p, source) in all_prices { let key = if timeframe == &"5min" && !p.time.is_empty() { format!("{}_{}", p.date, p.time) } else { p.date.clone() }; + // Convert to USD immediately + let usd_rate = super::fx::get_usd_rate(&p.currency).await.unwrap_or(1.0); + + let mut p_usd = p.clone(); + p_usd.open *= usd_rate; + p_usd.high *= usd_rate; + p_usd.low *= usd_rate; + p_usd.close *= usd_rate; + p_usd.adj_close *= usd_rate; + p_usd.currency = "USD".to_string(); + let entry = by_date_time.entry(key.clone()).or_insert(DayData { sources: vec![], total_volume: 0, vwap: 0.0, - open: p.open, - high: p.high, - low: p.low, - close: p.close, + open: p_usd.open, + high: p_usd.high, + low: p_usd.low, + close: p_usd.close, }); let volume = p.volume.max(1); // avoid div0 - let vwap_contrib = p.close * volume as f64; + let vwap_contrib = p_usd.close * volume as f64; - entry.sources.push(p.clone()); + entry.sources.push((p_usd.clone(), source)); entry.total_volume += 
volume; entry.vwap += vwap_contrib; // Use first open, last close, max high, min low if entry.sources.len() == 1 { - entry.open = p.open; + entry.open = p_usd.open; } - entry.close = p.close; - entry.high = entry.high.max(p.high); - entry.low = entry.low.min(p.low); + entry.close = p_usd.close; + entry.high = entry.high.max(p_usd.high); + entry.low = entry.low.min(p_usd.low); } - // Convert to USD + finalize + // Finalize aggregated data let mut aggregated: Vec<CompanyPrice> = Vec::new(); - let by_date_time_len: usize = by_date_time.len(); for (key, data) in by_date_time { let vwap = data.vwap / data.total_volume as f64; - // Pick currency from highest volume source - let best_source = data.sources.iter() - .max_by_key(|p| p.volume) - .unwrap(); - - let usd_rate = super::fx::get_usd_rate(&best_source.currency).await.unwrap_or(1.0); - let (date, time) = if key.contains('_') { let parts: Vec<&str> = key.split('_').collect(); (parts[0].to_string(), parts[1].to_string()) @@ -99,17 +129,23 @@ pub async fn aggregate_best_price_data(isin: &str) -> anyhow::Result<()> { (key, "".to_string()) }; + // Track which exchange contributed most volume + let best_source = data.sources.iter() + .max_by_key(|(p, _)| p.volume) + .map(|(_, src)| src.clone()) + .unwrap_or_else(|| "unknown".to_string()); + aggregated.push(CompanyPrice { - ticker: isin.to_string(), // use ISIN as unified ticker + ticker: format!("{}@agg", isin), // Mark as aggregated date, time, - open: data.open * usd_rate, - high: data.high * usd_rate, - low: data.low * usd_rate, - close: data.close * usd_rate, - adj_close: vwap * usd_rate, + open: data.open, + high: data.high, + low: data.low, + close: data.close, + adj_close: vwap, volume: data.total_volume, - currency: "USD".to_string(), // aggregated to USD + currency: "USD".to_string(), }); } @@ -120,8 +156,39 @@ pub async fn aggregate_best_price_data(isin: &str) -> anyhow::Result<()> { fs::create_dir_all(&agg_dir).await?; let path = agg_dir.join("prices.json"); fs::write(&path, 
serde_json::to_string_pretty(&aggregated)?).await?; - println!(" Aggregated {} {} bars → {} sources", aggregated.len(), timeframe, by_date_time_len); + + // Save aggregation metadata + let meta = AggregationMetadata { + isin: isin.to_string(), + timeframe: timeframe.to_string(), + sources: sources_used.into_iter().collect(), + total_bars: aggregated.len(), + date_range: ( + aggregated.first().map(|p| p.date.clone()).unwrap_or_default(), + aggregated.last().map(|p| p.date.clone()).unwrap_or_default(), + ), + aggregated_at: chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string(), + }; + + let meta_path = agg_dir.join("metadata.json"); + fs::write(&meta_path, serde_json::to_string_pretty(&meta)?).await?; + + println!(" āœ“ {} {} bars from {} sources (USD)", + aggregated.len(), + timeframe, + source_count + ); } Ok(()) +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +struct AggregationMetadata { + isin: String, + timeframe: String, + sources: Vec<String>, + total_bars: usize, + date_range: (String, String), + aggregated_at: String, } \ No newline at end of file diff --git a/src/corporate/scraper.rs b/src/corporate/scraper.rs index e904a31..686aef7 100644 --- a/src/corporate/scraper.rs +++ b/src/corporate/scraper.rs @@ -1,16 +1,173 @@ // src/corporate/scraper.rs -use super::types::{CompanyEvent, CompanyPrice}; +use super::types::{CompanyEvent, CompanyPrice, TickerInfo}; use fantoccini::{Client, Locator}; use scraper::{Html, Selector}; use chrono::{DateTime, Duration, NaiveDate, Timelike, Utc}; use tokio::time::{sleep, Duration as TokioDuration}; use reqwest::Client as HttpClient; use serde_json::Value; -//use yfinance_rs::{YfClient, Ticker, Range, Interval, HistoryBuilder}; -//use yfinance_rs::core::conversions::money_to_f64; const USER_AGENT: &str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"; +/// Discover all exchanges where this ISIN trades by querying Yahoo Finance +pub async fn discover_available_exchanges(isin: &str, known_ticker: 
&str) -> anyhow::Result<Vec<TickerInfo>> { + println!(" Discovering exchanges for ISIN {}", isin); + + let mut discovered_tickers = Vec::new(); + + // Try the primary ticker first + if let Ok(info) = check_ticker_exists(known_ticker).await { + discovered_tickers.push(info); + } + + // Search for ISIN directly on Yahoo to find other listings + let search_url = format!( + "https://query2.finance.yahoo.com/v1/finance/search?q={}&quotesCount=20&newsCount=0", + isin + ); + + match HttpClient::new() + .get(&search_url) + .header("User-Agent", USER_AGENT) + .send() + .await + { + Ok(resp) => { + if let Ok(json) = resp.json::<Value>().await { + if let Some(quotes) = json["quotes"].as_array() { + for quote in quotes { + if let Some(symbol) = quote["symbol"].as_str() { + // Skip if already found + if discovered_tickers.iter().any(|t| t.ticker == symbol) { + continue; + } + + // Validate this ticker actually works + if let Ok(info) = check_ticker_exists(symbol).await { + discovered_tickers.push(info); + } + + sleep(TokioDuration::from_millis(100)).await; + } + } + } + } + } + Err(e) => println!(" Search API error: {}", e), + } + + // Also try common exchange suffixes for the base ticker + if let Some(base) = known_ticker.split('.').next() { + let suffixes = vec![ + "", // US + ".L", // London + ".DE", // Frankfurt/XETRA + ".PA", // Paris + ".AS", // Amsterdam + ".MI", // Milan + ".SW", // Switzerland + ".T", // Tokyo + ".HK", // Hong Kong + ".SS", // Shanghai + ".SZ", // Shenzhen + ".TO", // Toronto + ".AX", // Australia + ".SA", // Brazil + ".MC", // Madrid + ".BO", // Bombay + ".NS", // National Stock Exchange India + ]; + + for suffix in suffixes { + let test_ticker = format!("{}{}", base, suffix); + + // Skip if already found + if discovered_tickers.iter().any(|t| t.ticker == test_ticker) { + continue; + } + + if let Ok(info) = check_ticker_exists(&test_ticker).await { + discovered_tickers.push(info); + sleep(TokioDuration::from_millis(100)).await; + } + } + } + + println!(" Found {} tradable 
exchanges", discovered_tickers.len()); + Ok(discovered_tickers) +} + +/// Check if a ticker exists and get its exchange/currency info +async fn check_ticker_exists(ticker: &str) -> anyhow::Result<TickerInfo> { + let url = format!( + "https://query1.finance.yahoo.com/v8/finance/chart/{}?range=1d&interval=1d", + ticker + ); + + let json: Value = HttpClient::new() + .get(&url) + .header("User-Agent", USER_AGENT) + .timeout(std::time::Duration::from_secs(5)) + .send() + .await? + .json() + .await?; + + // Check if we got valid data + let result = &json["chart"]["result"]; + if result.is_null() || result.as_array().map(|a| a.is_empty()).unwrap_or(true) { + return Err(anyhow::anyhow!("No data for ticker {}", ticker)); + } + + let meta = &result[0]["meta"]; + + let exchange_name = meta["exchangeName"].as_str().unwrap_or("UNKNOWN"); + let exchange_mic = exchange_name_to_mic(exchange_name); + let currency = meta["currency"].as_str().unwrap_or("USD").to_string(); + + // Check if this ticker has actual price data + let has_data = meta["regularMarketPrice"].is_number() + || result[0]["timestamp"].as_array().map(|a| !a.is_empty()).unwrap_or(false); + + if !has_data { + return Err(anyhow::anyhow!("Ticker {} exists but has no price data", ticker)); + } + + Ok(TickerInfo { + ticker: ticker.to_string(), + exchange_mic, + currency, + primary: false, // Will be set separately + }) +} + +/// Convert Yahoo's exchange name to MIC code (best effort) +fn exchange_name_to_mic(name: &str) -> String { + match name { + "NMS" | "NasdaqGS" | "NASDAQ" => "XNAS", + "NYQ" | "NYSE" => "XNYS", + "LSE" | "London" => "XLON", + "FRA" | "Frankfurt" | "GER" | "XETRA" => "XFRA", + "PAR" | "Paris" => "XPAR", + "AMS" | "Amsterdam" => "XAMS", + "MIL" | "Milan" => "XMIL", + "JPX" | "Tokyo" => "XJPX", + "HKG" | "Hong Kong" => "XHKG", + "SHH" | "Shanghai" => "XSHG", + "SHZ" | "Shenzhen" => "XSHE", + "TOR" | "Toronto" => "XTSE", + "ASX" | "Australia" => "XASX", + "SAU" | "Saudi" => "XSAU", + "SWX" | "Switzerland" => "XSWX", 
+ "BSE" | "Bombay" => "XBSE", + "NSE" | "NSI" => "XNSE", + "TAI" | "Taiwan" => "XTAI", + "SAO" | "Sao Paulo" => "BVMF", + "MCE" | "Madrid" => "XMAD", + _ => name, // Fallback to name itself + }.to_string() +} + pub async fn dismiss_yahoo_consent(client: &Client) -> anyhow::Result<()> { let script = r#" (() => { @@ -34,14 +191,10 @@ pub async fn dismiss_yahoo_consent(client: &Client) -> anyhow::Result<()> { } pub async fn fetch_earnings_history(client: &Client, ticker: &str) -> anyhow::Result> { - // Navigate to Yahoo Earnings Calendar for the ticker - // offset=0&size=100 to get up to 100 entries - // offset up to 99 loading older entries if needed let url = format!("https://finance.yahoo.com/calendar/earnings?symbol={}&offset=0&size=100", ticker); client.goto(&url).await?; dismiss_yahoo_consent(client).await?; - // Load all by clicking "Show More" if present (unchanged) loop { match client.find(Locator::XPath(r#"//button[contains(text(), 'Show More')]"#)).await { Ok(btn) => { @@ -61,9 +214,9 @@ pub async fn fetch_earnings_history(client: &Client, ticker: &str) -> anyhow::Re let cols: Vec = row.select(&Selector::parse("td").unwrap()) .map(|td| td.text().collect::>().join(" ").trim().to_string()) .collect(); - if cols.len() < 6 { continue; } // Updated to match current 6-column structure + if cols.len() < 6 { continue; } - let full_date = &cols[2]; // Now Earnings Date + let full_date = &cols[2]; let parts: Vec<&str> = full_date.split(" at ").collect(); let raw_date = parts[0].trim(); let time_str = if parts.len() > 1 { parts[1].trim() } else { "" }; @@ -73,8 +226,8 @@ pub async fn fetch_earnings_history(client: &Client, ticker: &str) -> anyhow::Re Err(_) => continue, }; - let eps_forecast = parse_float(&cols[3]); // EPS Estimate - let eps_actual = if cols[4] == "-" { None } else { parse_float(&cols[4]) }; // Reported EPS + let eps_forecast = parse_float(&cols[3]); + let eps_actual = if cols[4] == "-" { None } else { parse_float(&cols[4]) }; let surprise_pct = if 
let (Some(f), Some(a)) = (eps_forecast, eps_actual) { if f.abs() > 0.001 { Some((a - f) / f.abs() * 100.0) } else { None } @@ -105,7 +258,6 @@ pub async fn fetch_earnings_history(client: &Client, ticker: &str) -> anyhow::Re Ok(events) } -// Helper: Yahoo returns prices as strings like "$123.45" or null fn parse_price(v: Option<&Value>) -> f64 { v.and_then(|x| x.as_str()) .and_then(|s| s.replace('$', "").replace(',', "").parse::<f64>().ok()) @@ -126,13 +278,13 @@ pub async fn fetch_daily_price_history( end_str: &str, ) -> anyhow::Result<Vec<CompanyPrice>> { let start = NaiveDate::parse_from_str(start_str, "%Y-%m-%d")?; - let end = NaiveDate::parse_from_str(end_str, "%Y-%m-%d")? + Duration::days(1); // inclusive + let end = NaiveDate::parse_from_str(end_str, "%Y-%m-%d")? + Duration::days(1); let mut all_prices = Vec::new(); let mut current = start; while current < end { - let chunk_end = current + Duration::days(730); // 2-year chunks = safe + let chunk_end = current + Duration::days(730); let actual_end = chunk_end.min(end); let period1 = current.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp(); @@ -146,7 +298,7 @@ pub async fn fetch_daily_price_history( let json: Value = HttpClient::new() .get(&url) - .header("User-Agent", "Mozilla/5.0") + .header("User-Agent", USER_AGENT) .send() .await? 
.json() @@ -155,12 +307,15 @@ pub async fn fetch_daily_price_history( let result = &json["chart"]["result"][0]; let timestamps = result["timestamp"].as_array().ok_or_else(|| anyhow::anyhow!("No timestamps"))?; let quote = &result["indicators"]["quote"][0]; + let meta = &result["meta"]; + let currency = meta["currency"].as_str().unwrap_or("USD").to_string(); let opens = quote["open"].as_array(); let highs = quote["high"].as_array(); let lows = quote["low"].as_array(); let closes = quote["close"].as_array(); - let adj_closes = result["meta"]["adjClose"].as_array().or_else(|| quote["close"].as_array()); // fallback + let adj_closes = result["indicators"]["adjclose"][0]["adjclose"].as_array() + .or_else(|| closes); let volumes = quote["volume"].as_array(); for (i, ts_val) in timestamps.iter().enumerate() { @@ -182,14 +337,14 @@ pub async fn fetch_daily_price_history( all_prices.push(CompanyPrice { ticker: ticker.to_string(), date: date_str, - time: "".to_string(), // Empty for daily + time: "".to_string(), open, high, low, close, adj_close, volume, - currency: "USD".to_string(), // Assuming USD for now + currency: currency.clone(), }); } @@ -209,8 +364,8 @@ pub async fn fetch_price_history_5min( _start: &str, _end: &str, ) -> anyhow::Result<Vec<CompanyPrice>> { -let now = Utc::now().timestamp(); - let period1 = now - 5184000; // 60 days ago + let now = Utc::now().timestamp(); + let period1 = now - 5184000; let period2 = now; let url = format!( @@ -219,7 +374,7 @@ let now = Utc::now().timestamp(); let json: Value = HttpClient::new() .get(&url) - .header("User-Agent", "Mozilla/5.0") + .header("User-Agent", USER_AGENT) .send() .await? 
.json() @@ -228,6 +383,8 @@ let now = Utc::now().timestamp(); let result = &json["chart"]["result"][0]; let timestamps = result["timestamp"].as_array().ok_or_else(|| anyhow::anyhow!("No timestamps"))?; let quote = &result["indicators"]["quote"][0]; + let meta = &result["meta"]; + let currency = meta["currency"].as_str().unwrap_or("USD").to_string(); let mut prices = Vec::new(); @@ -246,14 +403,14 @@ let now = Utc::now().timestamp(); prices.push(CompanyPrice { ticker: ticker.to_string(), date: date_str, - time: time_str, // Full time for 5min intraday + time: time_str, open, high, low, close, - adj_close: close, // intraday usually not adjusted + adj_close: close, volume, - currency: "USD".to_string(), // Assuming USD for now + currency: currency.clone(), }); } diff --git a/src/corporate/storage.rs b/src/corporate/storage.rs index 923cb44..5aa8aac 100644 --- a/src/corporate/storage.rs +++ b/src/corporate/storage.rs @@ -17,7 +17,7 @@ pub async fn load_existing_events() -> anyhow::Result = serde_json::from_str(&content)?; for event in events { @@ -33,7 +33,6 @@ pub async fn save_optimized_events(events: HashMap) -> any let dir = std::path::Path::new("corporate_events"); fs::create_dir_all(dir).await?; - // Delete old files let mut entries = fs::read_dir(dir).await?; while let Some(entry) = entries.next_entry().await? 
{ let path = entry.path(); @@ -91,10 +90,7 @@ pub async fn save_prices_for_ticker(ticker: &str, timeframe: &str, mut prices: V let company_dir = base_dir.join(ticker.replace(".", "_")); let timeframe_dir = company_dir.join(timeframe); - // Ensure directories exist fs::create_dir_all(&timeframe_dir).await?; - - // File name includes timeframe (e.g., prices.json) let path = timeframe_dir.join("prices.json"); prices.sort_by_key(|p| (p.date.clone(), p.time.clone())); @@ -178,6 +174,7 @@ pub async fn save_prices_by_source( Ok(()) } +/// Update available_exchanges.json with fetch results pub async fn update_available_exchange( isin: &str, ticker: &str, @@ -186,21 +183,64 @@ pub async fn update_available_exchange( has_5min: bool, ) -> anyhow::Result<()> { let mut exchanges = load_available_exchanges(isin).await?; - let today = chrono::Local::now().format("%Y-%m-%d").to_string(); if let Some(entry) = exchanges.iter_mut().find(|e| e.ticker == ticker) { - entry.has_daily |= has_daily; - entry.has_5min |= has_5min; - entry.last_successful_fetch = Some(today); + // Update existing entry + entry.record_success(has_daily, has_5min); } else { - exchanges.push(AvailableExchange { - exchange_mic: exchange_mic.to_string(), - ticker: ticker.to_string(), - has_daily, - has_5min, - last_successful_fetch: Some(today), - }); + // Create new entry - need to get currency from somewhere + // Try to infer from the ticker or use a default + let currency = infer_currency_from_ticker(ticker); + let mut new_entry = AvailableExchange::new( + ticker.to_string(), + exchange_mic.to_string(), + currency, + ); + new_entry.record_success(has_daily, has_5min); + exchanges.push(new_entry); } save_available_exchanges(isin, exchanges).await +} + +/// Add a newly discovered exchange before fetching +pub async fn add_discovered_exchange( + isin: &str, + ticker_info: &TickerInfo, +) -> anyhow::Result<()> { + let mut exchanges = load_available_exchanges(isin).await?; + + // Only add if not already present + if 
!exchanges.iter().any(|e| e.ticker == ticker_info.ticker) { + let new_entry = AvailableExchange::new( + ticker_info.ticker.clone(), + ticker_info.exchange_mic.clone(), + ticker_info.currency.clone(), + ); + exchanges.push(new_entry); + save_available_exchanges(isin, exchanges).await?; + } + + Ok(()) +} + +/// Infer currency from ticker suffix +fn infer_currency_from_ticker(ticker: &str) -> String { + if ticker.ends_with(".L") { return "GBP".to_string(); } + if ticker.ends_with(".PA") { return "EUR".to_string(); } + if ticker.ends_with(".DE") { return "EUR".to_string(); } + if ticker.ends_with(".AS") { return "EUR".to_string(); } + if ticker.ends_with(".MI") { return "EUR".to_string(); } + if ticker.ends_with(".SW") { return "CHF".to_string(); } + if ticker.ends_with(".T") { return "JPY".to_string(); } + if ticker.ends_with(".HK") { return "HKD".to_string(); } + if ticker.ends_with(".SS") { return "CNY".to_string(); } + if ticker.ends_with(".SZ") { return "CNY".to_string(); } + if ticker.ends_with(".TO") { return "CAD".to_string(); } + if ticker.ends_with(".AX") { return "AUD".to_string(); } + if ticker.ends_with(".SA") { return "BRL".to_string(); } + if ticker.ends_with(".MC") { return "EUR".to_string(); } + if ticker.ends_with(".BO") || ticker.ends_with(".NS") { return "INR".to_string(); } + + "USD".to_string() // Default } \ No newline at end of file diff --git a/src/corporate/types.rs b/src/corporate/types.rs index 96ccc36..9d3c091 100644 --- a/src/corporate/types.rs +++ b/src/corporate/types.rs @@ -33,7 +33,7 @@ pub struct CompanyPrice { pub struct CompanyEventChange { pub ticker: String, pub date: String, - pub field_changed: String, // "time", "eps_forecast", "eps_actual", "new_event" + pub field_changed: String, // "time", "eps_forecast", "eps_actual", "new_event" pub old_value: String, pub new_value: String, pub detected_at: String, @@ -61,4 +61,34 @@ pub struct AvailableExchange { pub has_daily: bool, pub has_5min: bool, pub last_successful_fetch: Option, 
// YYYY-MM-DD + #[serde(default)] + pub currency: String, + #[serde(default)] + pub discovered_at: Option<String>, // When this exchange was first discovered + #[serde(default)] + pub fetch_count: u32, // How many times successfully fetched +} + +impl AvailableExchange { + pub fn new(ticker: String, exchange_mic: String, currency: String) -> Self { + Self { + exchange_mic, + ticker, + has_daily: false, + has_5min: false, + last_successful_fetch: None, + currency, + discovered_at: Some(chrono::Local::now().format("%Y-%m-%d").to_string()), + fetch_count: 0, + } + } + + pub fn record_success(&mut self, has_daily: bool, has_5min: bool) { + let today = chrono::Local::now().format("%Y-%m-%d").to_string(); + + self.has_daily |= has_daily; + self.has_5min |= has_5min; + self.last_successful_fetch = Some(today); + self.fetch_count += 1; + } } \ No newline at end of file diff --git a/src/corporate/update.rs b/src/corporate/update.rs index a6efc75..df68de3 100644 --- a/src/corporate/update.rs +++ b/src/corporate/update.rs @@ -1,7 +1,6 @@ // src/corporate/update.rs use super::{scraper::*, storage::*, helpers::*, types::*, aggregation::aggregate_best_price_data}; use crate::config::Config; -use yfinance_rs::{Range, Interval}; use chrono::Local; use std::collections::HashMap; @@ -20,26 +19,67 @@ pub async fn run_full_update(client: &fantoccini::Client, config: &Config) -> an ensure_company_dirs(&company.isin).await?; save_company_metadata(&company).await?; - for ticker_info in &company.tickers { + // === STEP 1: Discover all available exchanges === + let mut all_tickers = company.tickers.clone(); + + // Try to discover additional exchanges using the primary ticker + if let Some(primary_ticker) = company.tickers.iter().find(|t| t.primary) { + println!(" šŸ” Discovering additional exchanges..."); + match discover_available_exchanges(&company.isin, &primary_ticker.ticker).await { + Ok(discovered) => { + // Merge discovered tickers with existing ones + for disc in discovered { + if 
!all_tickers.iter().any(|t| t.ticker == disc.ticker) { + println!(" āœ“ Found new exchange: {} ({})", disc.ticker, disc.exchange_mic); + all_tickers.push(disc); + } + } + } + Err(e) => println!(" ⚠ Discovery failed: {}", e), + } + } + + // Update metadata with newly discovered tickers + if all_tickers.len() > company.tickers.len() { + let updated_company = CompanyMetadata { + isin: company.isin.clone(), + name: company.name.clone(), + tickers: all_tickers.clone(), + }; + save_company_metadata(&updated_company).await?; + println!(" šŸ“ Updated metadata with {} total tickers", all_tickers.len()); + } + + // === STEP 2: Fetch data from all available exchanges === + for ticker_info in &all_tickers { let ticker = &ticker_info.ticker; - println!(" → Trying ticker: {ticker} ({})", ticker_info.exchange_mic); + println!(" → Trying ticker: {} ({})", ticker, ticker_info.exchange_mic); let mut daily_success = false; let mut intraday_success = false; - // Earnings - if let Ok(new_events) = fetch_earnings_history(client, ticker).await { - let result = process_batch(&new_events, &mut existing_events, &today); - save_changes(&result.changes).await?; - println!(" {} earnings events", new_events.len()); + // Earnings (only from primary ticker to avoid duplicates) + if ticker_info.primary { + if let Ok(new_events) = fetch_earnings_history(client, ticker).await { + let result = process_batch(&new_events, &mut existing_events, &today); + save_changes(&result.changes).await?; + println!(" āœ“ {} earnings events", new_events.len()); + } } // Daily prices - if let Ok(prices) = fetch_daily_price_history(ticker, &config.corporate_start_date, &today).await { - if !prices.is_empty() { - save_prices_by_source(&company.isin, ticker, "daily", prices).await?; - daily_success = true; + match fetch_daily_price_history(ticker, &config.corporate_start_date, &today).await { + Ok(prices) => { + if !prices.is_empty() { + save_prices_by_source(&company.isin, ticker, "daily", prices.clone()).await?; + 
daily_success = true; + println!(" āœ“ Saved {} daily bars ({} currency)", + prices.len(), + prices.first().map(|p| p.currency.as_str()).unwrap_or("?") + ); + } } + Err(e) => println!(" āœ— Daily fetch failed: {}", e), } // 5-minute prices (last 60 days) @@ -47,34 +87,41 @@ pub async fn run_full_update(client: &fantoccini::Client, config: &Config) -> an .format("%Y-%m-%d") .to_string(); - if let Ok(prices) = fetch_price_history_5min(ticker, &sixty_days_ago, &today).await { - if !prices.is_empty() { - save_prices_by_source(&company.isin, ticker, "5min", prices.clone()).await?; - intraday_success = true; - println!(" Saved {} 5min bars from {ticker}", prices.len()); + match fetch_price_history_5min(ticker, &sixty_days_ago, &today).await { + Ok(prices) => { + if !prices.is_empty() { + save_prices_by_source(&company.isin, ticker, "5min", prices.clone()).await?; + intraday_success = true; + println!(" āœ“ Saved {} 5min bars", prices.len()); + } } + Err(e) => println!(" āœ— 5min fetch failed: {}", e), } - // Record success - update_available_exchange( - &company.isin, - ticker, - &ticker_info.exchange_mic, - daily_success, - intraday_success, - ).await?; - - aggregate_best_price_data(&company.isin).await?; + // Record success in available_exchanges.json + if daily_success || intraday_success { + update_available_exchange( + &company.isin, + ticker, + &ticker_info.exchange_mic, + daily_success, + intraday_success, + ).await?; + } tokio::time::sleep(tokio::time::Duration::from_millis(800)).await; } - // Optional: run aggregation after all sources - // aggregate_best_price_data(&company.isin).await?; + // === STEP 3: Aggregate prices from all sources === + println!(" šŸ“Š Aggregating multi-exchange data with FX conversion..."); + match aggregate_best_price_data(&company.isin).await { + Ok(_) => println!(" āœ“ Aggregation complete"), + Err(e) => println!(" ⚠ Aggregation warning: {}", e), + } } save_optimized_events(existing_events).await?; - println!("Corporate update 
complete (ISIN-based)"); + println!("\nāœ… Corporate update complete (ISIN-based)"); Ok(()) }