From 9cfcae84eae9eb6526204112fc97359032d13d62 Mon Sep 17 00:00:00 2001 From: donpat1to Date: Mon, 24 Nov 2025 17:19:36 +0100 Subject: [PATCH] added function aggregating multiple ticker data --- src/corporate/aggregation.rs | 127 +++++++++++++++++++++++++++++ src/corporate/fx.rs | 51 ++++++++++++ src/corporate/helpers.rs | 8 ++ src/corporate/mod.rs | 2 + src/corporate/scraper.rs | 17 ++-- src/corporate/storage.rs | 119 +++++++++++++++++++++++++-- src/corporate/types.rs | 26 ++++++ src/corporate/update.rs | 88 +++++++++++++------- src/data/companies.json | 41 ++++++++++ {data => src/data}/continents.json | 0 {data => src/data}/countries.json | 0 {data => src/data}/exchanges.json | 0 src/data/index.txt | 6 ++ src/main.rs | 2 +- 14 files changed, 443 insertions(+), 44 deletions(-) create mode 100644 src/corporate/aggregation.rs create mode 100644 src/corporate/fx.rs create mode 100644 src/data/companies.json rename {data => src/data}/continents.json (100%) rename {data => src/data}/countries.json (100%) rename {data => src/data}/exchanges.json (100%) create mode 100644 src/data/index.txt diff --git a/src/corporate/aggregation.rs b/src/corporate/aggregation.rs new file mode 100644 index 0000000..0849a5d --- /dev/null +++ b/src/corporate/aggregation.rs @@ -0,0 +1,127 @@ +// src/corporate/aggregation.rs +use super::types::CompanyPrice; +use super::storage::*; +use tokio::fs; +use std::collections::HashMap; + +#[derive(Debug)] +struct DayData { + sources: Vec, + total_volume: u64, + vwap: f64, + open: f64, + high: f64, + low: f64, + close: f64, +} + +pub async fn aggregate_best_price_data(isin: &str) -> anyhow::Result<()> { + let company_dir = get_company_dir(isin); + + for timeframe in ["daily", "5min"].iter() { + let source_dir = company_dir.join(timeframe); + if !source_dir.exists() { + continue; + } + + let mut all_prices: Vec = Vec::new(); + let mut by_date_time: HashMap = HashMap::new(); + + // Load all sources + let mut entries = 
tokio::fs::read_dir(&source_dir).await?; + while let Some(entry) = entries.next_entry().await? { + let source_dir = entry.path(); + if !source_dir.is_dir() { continue; } + let prices_path = source_dir.join("prices.json"); + if !prices_path.exists() { continue; } + + let content = tokio::fs::read_to_string(&prices_path).await?; + let mut prices: Vec = serde_json::from_str(&content)?; + all_prices.append(&mut prices); + } + + if all_prices.is_empty() { + continue; + } + + // Group by date + time (for 5min) or just date + for p in all_prices { + let key = if timeframe == &"5min" && !p.time.is_empty() { + format!("{}_{}", p.date, p.time) + } else { + p.date.clone() + }; + + let entry = by_date_time.entry(key.clone()).or_insert(DayData { + sources: vec![], + total_volume: 0, + vwap: 0.0, + open: p.open, + high: p.high, + low: p.low, + close: p.close, + }); + + let volume = p.volume.max(1); // avoid div0 + let vwap_contrib = p.close * volume as f64; + + entry.sources.push(p.clone()); + entry.total_volume += volume; + entry.vwap += vwap_contrib; + + // Use first open, last close, max high, min low + if entry.sources.len() == 1 { + entry.open = p.open; + } + entry.close = p.close; + entry.high = entry.high.max(p.high); + entry.low = entry.low.min(p.low); + } + + // Convert to USD + finalize + let mut aggregated: Vec = Vec::new(); + let by_date_time_len: usize = by_date_time.len(); + + for (key, data) in by_date_time { + let vwap = data.vwap / data.total_volume as f64; + + // Pick currency from highest volume source + let best_source = data.sources.iter() + .max_by_key(|p| p.volume) + .unwrap(); + + let usd_rate = super::fx::get_usd_rate(&best_source.currency).await.unwrap_or(1.0); + + let (date, time) = if key.contains('_') { + let parts: Vec<&str> = key.split('_').collect(); + (parts[0].to_string(), parts[1].to_string()) + } else { + (key, "".to_string()) + }; + + aggregated.push(CompanyPrice { + ticker: isin.to_string(), // use ISIN as unified ticker + date, + time, + 
open: data.open * usd_rate, + high: data.high * usd_rate, + low: data.low * usd_rate, + close: data.close * usd_rate, + adj_close: vwap * usd_rate, + volume: data.total_volume, + currency: "USD".to_string(), // aggregated to USD + }); + } + + aggregated.sort_by_key(|p| (p.date.clone(), p.time.clone())); + + // Save aggregated result + let agg_dir = company_dir.join("aggregated").join(timeframe); + fs::create_dir_all(&agg_dir).await?; + let path = agg_dir.join("prices.json"); + fs::write(&path, serde_json::to_string_pretty(&aggregated)?).await?; + println!(" Aggregated {} {} bars → {} sources", aggregated.len(), timeframe, by_date_time_len); + } + + Ok(()) +} \ No newline at end of file diff --git a/src/corporate/fx.rs b/src/corporate/fx.rs new file mode 100644 index 0000000..801ce7a --- /dev/null +++ b/src/corporate/fx.rs @@ -0,0 +1,51 @@ +// src/corporate/fx.rs +use std::collections::HashMap; +use reqwest; +use serde_json::Value; +use tokio::fs; +use std::path::Path; + +static FX_CACHE_PATH: &str = "fx_rates.json"; + +pub async fn get_usd_rate(currency: &str) -> anyhow::Result { + if currency == "USD" { + return Ok(1.0); + } + + let mut cache: HashMap = if Path::new(FX_CACHE_PATH).exists() { + let content = fs::read_to_string(FX_CACHE_PATH).await?; + serde_json::from_str(&content).unwrap_or_default() + } else { + HashMap::new() + }; + + let today = chrono::Local::now().format("%Y-%m-%d").to_string(); + if let Some((rate, date)) = cache.get(currency) { + if date == &today { + return Ok(*rate); + } + } + + let symbol = format!("{}USD=X", currency); + let url = format!("https://query1.finance.yahoo.com/v8/finance/chart/{}?range=1d&interval=1d", symbol); + + let json: Value = reqwest::Client::new() + .get(&url) + .header("User-Agent", "Mozilla/5.0") + .send() + .await? 
+ .json() + .await?; + + let close = json["chart"]["result"][0]["meta"]["regularMarketPrice"] + .as_f64() + .or_else(|| json["chart"]["result"][0]["indicators"]["quote"][0]["close"][0].as_f64()) + .unwrap_or(1.0); + + let rate = close; // {CUR}USD=X already quotes USD per 1 unit of {CUR}; no inversion needed for any currency + + cache.insert(currency.to_string(), (rate, today.clone())); + let _ = fs::write(FX_CACHE_PATH, serde_json::to_string_pretty(&cache)?).await; + + Ok(rate) +} \ No newline at end of file diff --git a/src/corporate/helpers.rs b/src/corporate/helpers.rs index 9d9933c..fdde549 100644 --- a/src/corporate/helpers.rs +++ b/src/corporate/helpers.rs @@ -49,4 +49,12 @@ pub fn detect_changes(old: &CompanyEvent, new: &CompanyEvent, today: &str) -> Ve // Add similar for revenue if applicable changes +} + +pub fn price_key(p: &CompanyPrice) -> String { + if p.time.is_empty() { + format!("{}|{}", p.ticker, p.date) + } else { + format!("{}|{}|{}", p.ticker, p.date, p.time) + } } \ No newline at end of file diff --git a/src/corporate/mod.rs b/src/corporate/mod.rs index b762e48..56726c3 100644 --- a/src/corporate/mod.rs +++ b/src/corporate/mod.rs @@ -4,6 +4,8 @@ pub mod scraper; pub mod storage; pub mod update; pub mod helpers; +pub mod aggregation; +pub mod fx; pub use types::*; pub use update::run_full_update; \ No newline at end of file diff --git a/src/corporate/scraper.rs b/src/corporate/scraper.rs index afc60e7..e904a31 100644 --- a/src/corporate/scraper.rs +++ b/src/corporate/scraper.rs @@ -6,8 +6,8 @@ use chrono::{DateTime, Duration, NaiveDate, Timelike, Utc}; use tokio::time::{sleep, Duration as TokioDuration}; use reqwest::Client as HttpClient; use serde_json::Value; -use yfinance_rs::{YfClient, Ticker, Range, Interval, HistoryBuilder}; -use yfinance_rs::core::conversions::money_to_f64; +//use yfinance_rs::{YfClient, Ticker, Range, Interval, HistoryBuilder}; +//use yfinance_rs::core::conversions::money_to_f64; const USER_AGENT: &str = "Mozilla/5.0 (Windows
NT 10.0; Win64; x64) AppleWebKit/537.36"; @@ -182,21 +182,23 @@ pub async fn fetch_daily_price_history( all_prices.push(CompanyPrice { ticker: ticker.to_string(), date: date_str, + time: "".to_string(), // Empty for daily open, high, low, close, adj_close, volume, + currency: "USD".to_string(), // Assuming USD for now }); } - sleep(TokioDuration::from_millis(200)); + sleep(TokioDuration::from_millis(200)).await; current = actual_end; } - all_prices.sort_by_key(|p| p.date.clone()); - all_prices.dedup_by_key(|p| p.date.clone()); + all_prices.sort_by_key(|p| (p.date.clone(), p.time.clone())); + all_prices.dedup_by(|a, b| a.date == b.date && a.time == b.time); println!(" Got {} daily bars for {ticker}", all_prices.len()); Ok(all_prices) @@ -233,6 +235,7 @@ let now = Utc::now().timestamp(); let ts = ts_val.as_i64().unwrap_or(0); let dt: DateTime = DateTime::from_timestamp(ts, 0).unwrap_or_default(); let date_str = dt.format("%Y-%m-%d").to_string(); + let time_str = dt.format("%H:%M:%S").to_string(); let open = parse_price(quote["open"].as_array().and_then(|a| a.get(i))); let high = parse_price(quote["high"].as_array().and_then(|a| a.get(i))); @@ -243,16 +246,18 @@ let now = Utc::now().timestamp(); prices.push(CompanyPrice { ticker: ticker.to_string(), date: date_str, + time: time_str, // Full time for 5min intraday open, high, low, close, adj_close: close, // intraday usually not adjusted volume, + currency: "USD".to_string(), // Assuming USD for now }); } - prices.sort_by_key(|p| p.date.clone()); + prices.sort_by_key(|p| (p.date.clone(), p.time.clone())); Ok(prices) } diff --git a/src/corporate/storage.rs b/src/corporate/storage.rs index d4cce33..923cb44 100644 --- a/src/corporate/storage.rs +++ b/src/corporate/storage.rs @@ -1,9 +1,9 @@ // src/corporate/storage.rs -use super::types::{CompanyEvent, CompanyPrice, CompanyEventChange}; -use super::helpers::*; +use super::{types::*, helpers::*}; use tokio::fs; use chrono::{Datelike, NaiveDate}; use 
std::collections::HashMap; +use std::path::{Path, PathBuf}; pub async fn load_existing_events() -> anyhow::Result> { let mut map = HashMap::new(); @@ -87,13 +87,120 @@ pub async fn save_changes(changes: &[CompanyEventChange]) -> anyhow::Result<()> } pub async fn save_prices_for_ticker(ticker: &str, timeframe: &str, mut prices: Vec) -> anyhow::Result<()> { - let dir = std::path::Path::new("corporate_prices"); - fs::create_dir_all(dir).await?; - let path = dir.join(format!("{}_{}.json", ticker.replace(".", "_"), timeframe)); + let base_dir = Path::new("corporate_prices"); + let company_dir = base_dir.join(ticker.replace(".", "_")); + let timeframe_dir = company_dir.join(timeframe); - prices.sort_by_key(|p| p.date.clone()); + // Ensure directories exist + fs::create_dir_all(&timeframe_dir).await?; + + // File name includes timeframe (e.g., prices.json) + let path = timeframe_dir.join("prices.json"); + + prices.sort_by_key(|p| (p.date.clone(), p.time.clone())); let json = serde_json::to_string_pretty(&prices)?; fs::write(&path, json).await?; Ok(()) +} + +pub async fn load_companies() -> Result, anyhow::Error> { + let path = Path::new("src/data/companies.json"); + if !path.exists() { + println!("Missing companies.json file at src/data/companies.json"); + return Ok(vec![]); + } + let content = fs::read_to_string(path).await?; + let companies: Vec = serde_json::from_str(&content)?; + Ok(companies) +} + +pub fn get_company_dir(isin: &str) -> PathBuf { + PathBuf::from("corporate_prices").join(isin) +} + +pub async fn ensure_company_dirs(isin: &str) -> anyhow::Result<()> { + let base = get_company_dir(isin); + let paths = [ + base.clone(), + base.join("5min"), + base.join("daily"), + base.join("aggregated").join("5min"), + base.join("aggregated").join("daily"), + ]; + for p in paths { + fs::create_dir_all(&p).await?; + } + Ok(()) +} + +pub async fn save_company_metadata(company: &CompanyMetadata) -> anyhow::Result<()> { + let dir = get_company_dir(&company.isin); + 
fs::create_dir_all(&dir).await?; + let path = dir.join("metadata.json"); + fs::write(path, serde_json::to_string_pretty(company)?).await?; + Ok(()) +} + +pub async fn save_available_exchanges(isin: &str, exchanges: Vec) -> anyhow::Result<()> { + let dir = get_company_dir(isin); + fs::create_dir_all(&dir).await?; + let path = dir.join("available_exchanges.json"); + fs::write(&path, serde_json::to_string_pretty(&exchanges)?).await?; + Ok(()) +} + +pub async fn load_available_exchanges(isin: &str) -> anyhow::Result> { + let path = get_company_dir(isin).join("available_exchanges.json"); + if path.exists() { + let content = fs::read_to_string(&path).await?; + Ok(serde_json::from_str(&content)?) + } else { + Ok(vec![]) + } +} + +pub async fn save_prices_by_source( + isin: &str, + source_ticker: &str, + timeframe: &str, + prices: Vec, +) -> anyhow::Result<()> { + let source_safe = source_ticker.replace(".", "_").replace("/", "_"); + let dir = get_company_dir(isin).join(timeframe).join(&source_safe); + fs::create_dir_all(&dir).await?; + let path = dir.join("prices.json"); + + let mut prices = prices; + prices.sort_by_key(|p| (p.date.clone(), p.time.clone())); + + fs::write(&path, serde_json::to_string_pretty(&prices)?).await?; + Ok(()) +} + +pub async fn update_available_exchange( + isin: &str, + ticker: &str, + exchange_mic: &str, + has_daily: bool, + has_5min: bool, +) -> anyhow::Result<()> { + let mut exchanges = load_available_exchanges(isin).await?; + let today = chrono::Local::now().format("%Y-%m-%d").to_string(); + + if let Some(entry) = exchanges.iter_mut().find(|e| e.ticker == ticker) { + entry.has_daily |= has_daily; + entry.has_5min |= has_5min; + entry.last_successful_fetch = Some(today); + } else { + exchanges.push(AvailableExchange { + exchange_mic: exchange_mic.to_string(), + ticker: ticker.to_string(), + has_daily, + has_5min, + last_successful_fetch: Some(today), + }); + } + + save_available_exchanges(isin, exchanges).await } \ No newline at end of file 
diff --git a/src/corporate/types.rs b/src/corporate/types.rs index 855dfb5..96ccc36 100644 --- a/src/corporate/types.rs +++ b/src/corporate/types.rs @@ -19,12 +19,14 @@ pub struct CompanyEvent { pub struct CompanyPrice { pub ticker: String, pub date: String, // YYYY-MM-DD + pub time: String, // HH:MM:SS for intraday, "" for daily pub open: f64, pub high: f64, pub low: f64, pub close: f64, pub adj_close: f64, pub volume: u64, + pub currency: String, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -35,4 +37,28 @@ pub struct CompanyEventChange { pub old_value: String, pub new_value: String, pub detected_at: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TickerInfo { + pub ticker: String, + pub exchange_mic: String, + pub currency: String, + pub primary: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CompanyMetadata { + pub isin: String, + pub name: String, + pub tickers: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AvailableExchange { + pub exchange_mic: String, + pub ticker: String, + pub has_daily: bool, + pub has_5min: bool, + pub last_successful_fetch: Option, // YYYY-MM-DD } \ No newline at end of file diff --git a/src/corporate/update.rs b/src/corporate/update.rs index 1505532..a6efc75 100644 --- a/src/corporate/update.rs +++ b/src/corporate/update.rs @@ -1,54 +1,80 @@ // src/corporate/update.rs -use super::{scraper::*, storage::*, helpers::*, types::*}; +use super::{scraper::*, storage::*, helpers::*, types::*, aggregation::aggregate_best_price_data}; use crate::config::Config; use yfinance_rs::{Range, Interval}; use chrono::Local; use std::collections::HashMap; -pub async fn run_full_update(client: &fantoccini::Client, tickers: Vec, config: &Config) -> anyhow::Result<()> { - println!("Updating {} tickers (prices from {})", tickers.len(), config.corporate_start_date); +pub async fn run_full_update(client: &fantoccini::Client, config: &Config) -> anyhow::Result<()> { + 
println!("Starting company-centric corporate update (ISIN-based)"); + let companies = load_companies().await?; let today = chrono::Local::now().format("%Y-%m-%d").to_string(); - let mut existing = load_existing_events().await?; + let mut existing_events = load_existing_events().await?; - for ticker in &tickers { - print!(" → {:6} ", ticker); + for company in companies { + println!("\nProcessing company: {} ({})", company.name, company.isin); - if let Ok(new_events) = fetch_earnings_history(client, ticker).await { - let result = process_batch(&new_events, &mut existing, &today); - save_changes(&result.changes).await?; - println!("{} earnings, {} changes", new_events.len(), result.changes.len()); - } + ensure_company_dirs(&company.isin).await?; + save_company_metadata(&company).await?; - // DAILY – full history - if let Ok(prices) = fetch_daily_price_history(ticker, &config.corporate_start_date, &today).await { - save_prices_for_ticker(ticker, "daily", prices).await?; - } + for ticker_info in &company.tickers { + let ticker = &ticker_info.ticker; + println!(" → Trying ticker: {ticker} ({})", ticker_info.exchange_mic); - tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + let mut daily_success = false; + let mut intraday_success = false; - // 5-MINUTE – only last 60 days (Yahoo limit for intraday) - let sixty_days_ago = (chrono::Local::now() - chrono::Duration::days(60)) - .format("%Y-%m-%d") - .to_string(); - - if let Ok(prices) = fetch_price_history_5min(ticker, &sixty_days_ago, &today).await { - if !prices.is_empty() { - save_prices_for_ticker(ticker, "5min", prices.clone()).await?; - println!(" Saved {} 5min bars for {ticker}", prices.len()); - } else { - println!(" No 5min data available for {ticker} (market closed? 
retry later)"); + // Earnings + if let Ok(new_events) = fetch_earnings_history(client, ticker).await { + let result = process_batch(&new_events, &mut existing_events, &today); + save_changes(&result.changes).await?; + println!(" {} earnings events", new_events.len()); } - } else { - println!(" 5min fetch failed for {ticker} (rate limit? try again)"); + + // Daily prices + if let Ok(prices) = fetch_daily_price_history(ticker, &config.corporate_start_date, &today).await { + if !prices.is_empty() { + save_prices_by_source(&company.isin, ticker, "daily", prices).await?; + daily_success = true; + } + } + + // 5-minute prices (last 60 days) + let sixty_days_ago = (chrono::Local::now() - chrono::Duration::days(60)) + .format("%Y-%m-%d") + .to_string(); + + if let Ok(prices) = fetch_price_history_5min(ticker, &sixty_days_ago, &today).await { + if !prices.is_empty() { + save_prices_by_source(&company.isin, ticker, "5min", prices.clone()).await?; + intraday_success = true; + println!(" Saved {} 5min bars from {ticker}", prices.len()); + } + } + + // Record success + update_available_exchange( + &company.isin, + ticker, + &ticker_info.exchange_mic, + daily_success, + intraday_success, + ).await?; + + aggregate_best_price_data(&company.isin).await?; + + tokio::time::sleep(tokio::time::Duration::from_millis(800)).await; } - tokio::time::sleep(tokio::time::Duration::from_millis(250)).await; + // Optional: run aggregation after all sources + // aggregate_best_price_data(&company.isin).await?; } - save_optimized_events(existing).await?; + save_optimized_events(existing_events).await?; + println!("Corporate update complete (ISIN-based)"); Ok(()) } diff --git a/src/data/companies.json b/src/data/companies.json new file mode 100644 index 0000000..53b064f --- /dev/null +++ b/src/data/companies.json @@ -0,0 +1,41 @@ +[ + { + "isin": "US46625H1005", + "name": "JPMorgan Chase & Co.", + "tickers": [ + { "ticker": "JPM", "exchange_mic": "XNYS", "currency": "USD", "primary": true }, + { 
"ticker": "JPM-PC", "exchange_mic": "XNYS", "currency": "USD", "primary": false } + ] + }, + { + "isin": "US5949181045", + "name": "Microsoft Corporation", + "tickers": [ + { "ticker": "MSFT", "exchange_mic": "XNAS", "currency": "USD", "primary": true } + ] + }, + { + "isin": "CNE000001P37", + "name": "Industrial and Commercial Bank of China", + "tickers": [ + { "ticker": "601398.SS", "exchange_mic": "XSHG", "currency": "CNY", "primary": true }, + { "ticker": "1398.HK", "exchange_mic": "XHKG", "currency": "HKD", "primary": false } + ] + }, + { + "isin": "JP3702200000", + "name": "Toyota Motor Corporation", + "tickers": [ + { "ticker": "7203.T", "exchange_mic": "XJPX", "currency": "JPY", "primary": true }, + { "ticker": "TM", "exchange_mic": "XNYS", "currency": "USD", "primary": false } + ] + }, + { + "isin": "HK0000069689", + "name": "Tencent Holdings Limited", + "tickers": [ + { "ticker": "0700.HK", "exchange_mic": "XHKG", "currency": "HKD", "primary": true }, + { "ticker": "TCEHY", "exchange_mic": "OTCM", "currency": "USD", "primary": false } + ] + } +] \ No newline at end of file diff --git a/data/continents.json b/src/data/continents.json similarity index 100% rename from data/continents.json rename to src/data/continents.json diff --git a/data/countries.json b/src/data/countries.json similarity index 100% rename from data/countries.json rename to src/data/countries.json diff --git a/data/exchanges.json b/src/data/exchanges.json similarity index 100% rename from data/exchanges.json rename to src/data/exchanges.json diff --git a/src/data/index.txt b/src/data/index.txt new file mode 100644 index 0000000..547e36e --- /dev/null +++ b/src/data/index.txt @@ -0,0 +1,6 @@ +data/* + +companies.json +continents.json +countries.json +exchanges.json \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 3977876..4575a08 100644 --- a/src/main.rs +++ b/src/main.rs @@ -40,7 +40,7 @@ async fn main() -> anyhow::Result<()> { // === Corporate Earnings Update === 
println!("\nUpdating Corporate Earnings"); let tickers = config::get_tickers(); - corporate::run_full_update(&client, tickers, &config).await?; + corporate::run_full_update(&client, &config).await?; // === Cleanup === client.close().await?;