Added a function that aggregates price data from multiple tickers

This commit is contained in:
2025-11-24 17:19:36 +01:00
parent 7b680f960f
commit 9cfcae84ea
14 changed files with 443 additions and 44 deletions

View File

@@ -0,0 +1,127 @@
// src/corporate/aggregation.rs
use super::types::CompanyPrice;
use super::storage::*;
use tokio::fs;
use std::collections::HashMap;
/// Accumulator for all source bars that share one (date[, time]) key.
#[derive(Debug)]
struct DayData {
    sources: Vec<CompanyPrice>, // every source bar merged into this slot
    total_volume: u64,          // summed volume across sources (each bar counted as at least 1)
    vwap: f64,                  // running Σ(close·volume); divided by total_volume when finalized
    open: f64,                  // open of the first source bar seen
    high: f64,                  // max high across sources
    low: f64,                   // min low across sources
    close: f64,                 // close of the last source bar seen
}
/// Merge all per-source price files for `isin` into a single USD series per
/// timeframe and write it to
/// `corporate_prices/<isin>/aggregated/<timeframe>/prices.json`.
///
/// For each (date[, time]) bar the sources are combined as: first open,
/// last close, max high, min low, summed volume, and a volume-weighted
/// average close stored in `adj_close`.
///
/// BUG FIX: bars are converted to USD *before* aggregation. The previous
/// version summed raw prices from sources quoted in different currencies
/// (e.g. an HKD and a USD listing) and applied a single FX rate afterwards,
/// corrupting VWAP/high/low for multi-currency companies. FX rates are now
/// also looked up once per distinct currency instead of once per bar.
pub async fn aggregate_best_price_data(isin: &str) -> anyhow::Result<()> {
    let company_dir = get_company_dir(isin);
    for timeframe in ["daily", "5min"] {
        let source_dir = company_dir.join(timeframe);
        if !source_dir.exists() {
            continue;
        }

        // Collect every <source>/prices.json under this timeframe.
        let mut all_prices: Vec<CompanyPrice> = Vec::new();
        let mut entries = fs::read_dir(&source_dir).await?;
        while let Some(entry) = entries.next_entry().await? {
            let dir = entry.path();
            if !dir.is_dir() {
                continue;
            }
            let prices_path = dir.join("prices.json");
            if !prices_path.exists() {
                continue;
            }
            let content = fs::read_to_string(&prices_path).await?;
            let mut prices: Vec<CompanyPrice> = serde_json::from_str(&content)?;
            all_prices.append(&mut prices);
        }
        if all_prices.is_empty() {
            continue;
        }

        // Convert every bar to USD first, one FX lookup per distinct currency.
        let mut fx_rates: HashMap<String, f64> = HashMap::new();
        for p in all_prices.iter_mut() {
            let rate = match fx_rates.get(&p.currency) {
                Some(r) => *r,
                None => {
                    // Fall back to 1.0 on FX failure rather than aborting the
                    // whole aggregation run (matches the previous best-effort
                    // behavior).
                    let r = super::fx::get_usd_rate(&p.currency).await.unwrap_or(1.0);
                    fx_rates.insert(p.currency.clone(), r);
                    r
                }
            };
            p.open *= rate;
            p.high *= rate;
            p.low *= rate;
            p.close *= rate;
            p.adj_close *= rate;
            p.currency = "USD".to_string();
        }

        // Group by date, plus time for intraday bars.
        let mut by_date_time: HashMap<String, DayData> = HashMap::new();
        for p in all_prices {
            let key = if timeframe == "5min" && !p.time.is_empty() {
                format!("{}_{}", p.date, p.time)
            } else {
                p.date.clone()
            };
            let slot = by_date_time.entry(key).or_insert(DayData {
                sources: vec![],
                total_volume: 0,
                vwap: 0.0,
                open: p.open, // first source seen provides the open
                high: p.high,
                low: p.low,
                close: p.close,
            });
            let volume = p.volume.max(1); // avoid div-by-zero for zero-volume bars
            slot.vwap += p.close * volume as f64; // Σ(close·volume); divided out below
            slot.total_volume += volume;
            slot.close = p.close; // last source seen provides the close
            slot.high = slot.high.max(p.high);
            slot.low = slot.low.min(p.low);
            slot.sources.push(p);
        }

        // Finalize each bar and recover (date, time) from the grouping key.
        let bar_count = by_date_time.len();
        let mut aggregated: Vec<CompanyPrice> = Vec::with_capacity(bar_count);
        for (key, data) in by_date_time {
            let vwap = data.vwap / data.total_volume as f64;
            let (date, time) = match key.split_once('_') {
                Some((d, t)) => (d.to_string(), t.to_string()),
                None => (key, String::new()),
            };
            aggregated.push(CompanyPrice {
                ticker: isin.to_string(), // ISIN serves as the unified ticker
                date,
                time,
                open: data.open,
                high: data.high,
                low: data.low,
                close: data.close,
                adj_close: vwap, // VWAP stored in adj_close for the aggregate
                volume: data.total_volume,
                currency: "USD".to_string(),
            });
        }
        aggregated.sort_by_key(|p| (p.date.clone(), p.time.clone()));

        // Persist the aggregated series.
        let agg_dir = company_dir.join("aggregated").join(timeframe);
        fs::create_dir_all(&agg_dir).await?;
        let path = agg_dir.join("prices.json");
        fs::write(&path, serde_json::to_string_pretty(&aggregated)?).await?;
        println!("  Aggregated {} {} bars for {}", bar_count, timeframe, isin);
    }
    Ok(())
}

51
src/corporate/fx.rs Normal file
View File

@@ -0,0 +1,51 @@
// src/corporate/fx.rs
use std::collections::HashMap;
use reqwest;
use serde_json::Value;
use tokio::fs;
use std::path::Path;
static FX_CACHE_PATH: &str = "fx_rates.json";
/// Return the multiplier that converts one unit of `currency` into USD.
///
/// Rates come from the Yahoo Finance chart endpoint for `{CUR}USD=X` and
/// are cached on disk in `fx_rates.json` (keyed by currency with the date
/// of the fetch); a cached rate is reused for the rest of the calendar day.
///
/// # Errors
/// Fails on network/deserialization errors or when the response contains
/// no positive price (previously this silently cached rate 1.0).
pub async fn get_usd_rate(currency: &str) -> anyhow::Result<f64> {
    if currency == "USD" {
        return Ok(1.0);
    }
    // Cache format: { "EUR": [1.08, "2025-11-24"], ... }. A corrupt cache
    // file is treated as empty rather than failing the lookup.
    let mut cache: HashMap<String, (f64, String)> = if Path::new(FX_CACHE_PATH).exists() {
        let content = fs::read_to_string(FX_CACHE_PATH).await?;
        serde_json::from_str(&content).unwrap_or_default()
    } else {
        HashMap::new()
    };
    let today = chrono::Local::now().format("%Y-%m-%d").to_string();
    if let Some((rate, date)) = cache.get(currency) {
        if date == &today {
            return Ok(*rate);
        }
    }
    let symbol = format!("{}USD=X", currency);
    let url = format!("https://query1.finance.yahoo.com/v8/finance/chart/{}?range=1d&interval=1d", symbol);
    let json: Value = reqwest::Client::new()
        .get(&url)
        .header("User-Agent", "Mozilla/5.0")
        .send()
        .await?
        .json()
        .await?;
    let close = json["chart"]["result"][0]["meta"]["regularMarketPrice"]
        .as_f64()
        .or_else(|| json["chart"]["result"][0]["indicators"]["quote"][0]["close"][0].as_f64())
        .filter(|c| *c > 0.0)
        .ok_or_else(|| anyhow::anyhow!("no FX quote returned for {symbol}"))?;
    // BUG FIX: "{CUR}USD=X" is quoted as USD per one unit of CUR, so the
    // close already IS the multiply-to-USD rate for *every* currency. The
    // previous version returned 1/close for everything except JPY/KRW,
    // which inverted the rate (e.g. EUR→USD became USD→EUR).
    let rate = close;
    cache.insert(currency.to_string(), (rate, today));
    // Best-effort cache write: a failed write must not fail the lookup.
    let _ = fs::write(FX_CACHE_PATH, serde_json::to_string_pretty(&cache)?).await;
    Ok(rate)
}

View File

@@ -50,3 +50,11 @@ pub fn detect_changes(old: &CompanyEvent, new: &CompanyEvent, today: &str) -> Ve
changes changes
} }
/// Unique storage key for a price bar: `"ticker|date"` for daily bars
/// (empty `time`) and `"ticker|date|time"` for intraday bars.
pub fn price_key(p: &CompanyPrice) -> String {
    let mut key = format!("{}|{}", p.ticker, p.date);
    if !p.time.is_empty() {
        key.push('|');
        key.push_str(&p.time);
    }
    key
}

View File

@@ -4,6 +4,8 @@ pub mod scraper;
pub mod storage; pub mod storage;
pub mod update; pub mod update;
pub mod helpers; pub mod helpers;
pub mod aggregation;
pub mod fx;
pub use types::*; pub use types::*;
pub use update::run_full_update; pub use update::run_full_update;

View File

@@ -6,8 +6,8 @@ use chrono::{DateTime, Duration, NaiveDate, Timelike, Utc};
use tokio::time::{sleep, Duration as TokioDuration}; use tokio::time::{sleep, Duration as TokioDuration};
use reqwest::Client as HttpClient; use reqwest::Client as HttpClient;
use serde_json::Value; use serde_json::Value;
use yfinance_rs::{YfClient, Ticker, Range, Interval, HistoryBuilder}; //use yfinance_rs::{YfClient, Ticker, Range, Interval, HistoryBuilder};
use yfinance_rs::core::conversions::money_to_f64; //use yfinance_rs::core::conversions::money_to_f64;
const USER_AGENT: &str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"; const USER_AGENT: &str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36";
@@ -182,21 +182,23 @@ pub async fn fetch_daily_price_history(
all_prices.push(CompanyPrice { all_prices.push(CompanyPrice {
ticker: ticker.to_string(), ticker: ticker.to_string(),
date: date_str, date: date_str,
time: "".to_string(), // Empty for daily
open, open,
high, high,
low, low,
close, close,
adj_close, adj_close,
volume, volume,
currency: "USD".to_string(), // Assuming USD for now
}); });
} }
sleep(TokioDuration::from_millis(200)); sleep(TokioDuration::from_millis(200)).await;
current = actual_end; current = actual_end;
} }
all_prices.sort_by_key(|p| p.date.clone()); all_prices.sort_by_key(|p| (p.date.clone(), p.time.clone()));
all_prices.dedup_by_key(|p| p.date.clone()); all_prices.dedup_by(|a, b| a.date == b.date && a.time == b.time);
println!(" Got {} daily bars for {ticker}", all_prices.len()); println!(" Got {} daily bars for {ticker}", all_prices.len());
Ok(all_prices) Ok(all_prices)
@@ -233,6 +235,7 @@ let now = Utc::now().timestamp();
let ts = ts_val.as_i64().unwrap_or(0); let ts = ts_val.as_i64().unwrap_or(0);
let dt: DateTime<Utc> = DateTime::from_timestamp(ts, 0).unwrap_or_default(); let dt: DateTime<Utc> = DateTime::from_timestamp(ts, 0).unwrap_or_default();
let date_str = dt.format("%Y-%m-%d").to_string(); let date_str = dt.format("%Y-%m-%d").to_string();
let time_str = dt.format("%H:%M:%S").to_string();
let open = parse_price(quote["open"].as_array().and_then(|a| a.get(i))); let open = parse_price(quote["open"].as_array().and_then(|a| a.get(i)));
let high = parse_price(quote["high"].as_array().and_then(|a| a.get(i))); let high = parse_price(quote["high"].as_array().and_then(|a| a.get(i)));
@@ -243,16 +246,18 @@ let now = Utc::now().timestamp();
prices.push(CompanyPrice { prices.push(CompanyPrice {
ticker: ticker.to_string(), ticker: ticker.to_string(),
date: date_str, date: date_str,
time: time_str, // Full time for 5min intraday
open, open,
high, high,
low, low,
close, close,
adj_close: close, // intraday usually not adjusted adj_close: close, // intraday usually not adjusted
volume, volume,
currency: "USD".to_string(), // Assuming USD for now
}); });
} }
prices.sort_by_key(|p| p.date.clone()); prices.sort_by_key(|p| (p.date.clone(), p.time.clone()));
Ok(prices) Ok(prices)
} }

View File

@@ -1,9 +1,9 @@
// src/corporate/storage.rs // src/corporate/storage.rs
use super::types::{CompanyEvent, CompanyPrice, CompanyEventChange}; use super::{types::*, helpers::*};
use super::helpers::*;
use tokio::fs; use tokio::fs;
use chrono::{Datelike, NaiveDate}; use chrono::{Datelike, NaiveDate};
use std::collections::HashMap; use std::collections::HashMap;
use std::path::{Path, PathBuf};
pub async fn load_existing_events() -> anyhow::Result<HashMap<String, CompanyEvent>> { pub async fn load_existing_events() -> anyhow::Result<HashMap<String, CompanyEvent>> {
let mut map = HashMap::new(); let mut map = HashMap::new();
@@ -87,13 +87,120 @@ pub async fn save_changes(changes: &[CompanyEventChange]) -> anyhow::Result<()>
} }
pub async fn save_prices_for_ticker(ticker: &str, timeframe: &str, mut prices: Vec<CompanyPrice>) -> anyhow::Result<()> { pub async fn save_prices_for_ticker(ticker: &str, timeframe: &str, mut prices: Vec<CompanyPrice>) -> anyhow::Result<()> {
let dir = std::path::Path::new("corporate_prices"); let base_dir = Path::new("corporate_prices");
fs::create_dir_all(dir).await?; let company_dir = base_dir.join(ticker.replace(".", "_"));
let path = dir.join(format!("{}_{}.json", ticker.replace(".", "_"), timeframe)); let timeframe_dir = company_dir.join(timeframe);
prices.sort_by_key(|p| p.date.clone()); // Ensure directories exist
fs::create_dir_all(&timeframe_dir).await?;
// File name includes timeframe (e.g., prices.json)
let path = timeframe_dir.join("prices.json");
prices.sort_by_key(|p| (p.date.clone(), p.time.clone()));
let json = serde_json::to_string_pretty(&prices)?; let json = serde_json::to_string_pretty(&prices)?;
fs::write(&path, json).await?; fs::write(&path, json).await?;
Ok(()) Ok(())
} }
/// Load the company universe from `src/data/companies.json`.
///
/// Returns an empty list (after printing a warning) when the file does not
/// exist; propagates I/O and JSON errors otherwise.
pub async fn load_companies() -> Result<Vec<CompanyMetadata>, anyhow::Error> {
    let path = Path::new("src/data/companies.json");
    if path.exists() {
        let content = fs::read_to_string(path).await?;
        Ok(serde_json::from_str(&content)?)
    } else {
        println!("Missing companies.json file at src/data/companies.json");
        Ok(vec![])
    }
}
/// Root data directory for a company, keyed by its ISIN:
/// `corporate_prices/<isin>`.
pub fn get_company_dir(isin: &str) -> PathBuf {
    Path::new("corporate_prices").join(isin)
}
/// Create the on-disk layout for one company: per-timeframe source
/// directories plus the matching `aggregated` subdirectories.
/// `create_dir_all` makes this idempotent.
pub async fn ensure_company_dirs(isin: &str) -> anyhow::Result<()> {
    let base = get_company_dir(isin);
    let aggregated = base.join("aggregated");
    for dir in [
        base.join("5min"),
        base.join("daily"),
        aggregated.join("5min"),
        aggregated.join("daily"),
    ] {
        fs::create_dir_all(&dir).await?;
    }
    Ok(())
}
/// Write `metadata.json` into the company's ISIN directory, creating the
/// directory first if necessary.
pub async fn save_company_metadata(company: &CompanyMetadata) -> anyhow::Result<()> {
    let dir = get_company_dir(&company.isin);
    fs::create_dir_all(&dir).await?;
    let json = serde_json::to_string_pretty(company)?;
    fs::write(dir.join("metadata.json"), json).await?;
    Ok(())
}
/// Persist the full list of exchange listings known to serve data for
/// `isin` to `available_exchanges.json` in the company directory.
pub async fn save_available_exchanges(isin: &str, exchanges: Vec<AvailableExchange>) -> anyhow::Result<()> {
    let dir = get_company_dir(isin);
    fs::create_dir_all(&dir).await?;
    let json = serde_json::to_string_pretty(&exchanges)?;
    fs::write(dir.join("available_exchanges.json"), json).await?;
    Ok(())
}
/// Load the per-company exchange availability list. A missing file yields
/// an empty list rather than an error (the company may be new).
pub async fn load_available_exchanges(isin: &str) -> anyhow::Result<Vec<AvailableExchange>> {
    let path = get_company_dir(isin).join("available_exchanges.json");
    if !path.exists() {
        return Ok(Vec::new());
    }
    let content = fs::read_to_string(&path).await?;
    Ok(serde_json::from_str(&content)?)
}
/// Save one source ticker's bars, sorted by (date, time), to
/// `corporate_prices/<isin>/<timeframe>/<source>/prices.json`.
/// Dots and slashes in the ticker are replaced so it is filesystem-safe.
pub async fn save_prices_by_source(
    isin: &str,
    source_ticker: &str,
    timeframe: &str,
    mut prices: Vec<CompanyPrice>,
) -> anyhow::Result<()> {
    let source_safe = source_ticker.replace(".", "_").replace("/", "_");
    let dir = get_company_dir(isin).join(timeframe).join(&source_safe);
    fs::create_dir_all(&dir).await?;
    prices.sort_by_key(|p| (p.date.clone(), p.time.clone()));
    let json = serde_json::to_string_pretty(&prices)?;
    fs::write(dir.join("prices.json"), json).await?;
    Ok(())
}
/// Record fetch results for one (exchange, ticker) listing of a company,
/// merging into any existing `available_exchanges.json` entry.
///
/// BUG FIX: `last_successful_fetch` is only stamped when at least one
/// timeframe actually returned data. The previous version set it to today
/// unconditionally, so a listing whose fetches all failed still looked
/// freshly successful.
pub async fn update_available_exchange(
    isin: &str,
    ticker: &str,
    exchange_mic: &str,
    has_daily: bool,
    has_5min: bool,
) -> anyhow::Result<()> {
    let mut exchanges = load_available_exchanges(isin).await?;
    let fetched_any = has_daily || has_5min;
    let today = chrono::Local::now().format("%Y-%m-%d").to_string();
    if let Some(entry) = exchanges.iter_mut().find(|e| e.ticker == ticker) {
        // Availability flags are sticky: once a timeframe has worked, keep it.
        entry.has_daily |= has_daily;
        entry.has_5min |= has_5min;
        if fetched_any {
            entry.last_successful_fetch = Some(today);
        }
    } else {
        exchanges.push(AvailableExchange {
            exchange_mic: exchange_mic.to_string(),
            ticker: ticker.to_string(),
            has_daily,
            has_5min,
            last_successful_fetch: if fetched_any { Some(today) } else { None },
        });
    }
    save_available_exchanges(isin, exchanges).await
}

View File

@@ -19,12 +19,14 @@ pub struct CompanyEvent {
pub struct CompanyPrice { pub struct CompanyPrice {
pub ticker: String, pub ticker: String,
pub date: String, // YYYY-MM-DD pub date: String, // YYYY-MM-DD
pub time: String, // HH:MM:SS for intraday, "" for daily
pub open: f64, pub open: f64,
pub high: f64, pub high: f64,
pub low: f64, pub low: f64,
pub close: f64, pub close: f64,
pub adj_close: f64, pub adj_close: f64,
pub volume: u64, pub volume: u64,
pub currency: String,
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -36,3 +38,27 @@ pub struct CompanyEventChange {
pub new_value: String, pub new_value: String,
pub detected_at: String, pub detected_at: String,
} }
/// One exchange listing (ticker symbol) for a company.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TickerInfo {
    pub ticker: String,       // exchange-specific symbol, e.g. "7203.T"
    pub exchange_mic: String, // market identifier code, e.g. "XNYS" — see companies.json
    pub currency: String,     // quote currency for this listing, e.g. "HKD"
    pub primary: bool,        // true for the company's primary listing
}
/// A company identified by ISIN together with all of its known exchange
/// listings. Loaded from `src/data/companies.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompanyMetadata {
    pub isin: String,             // International Securities Identification Number
    pub name: String,             // human-readable company name
    pub tickers: Vec<TickerInfo>, // all known listings across exchanges
}
/// Tracks which timeframes have been fetched successfully for one
/// (exchange, ticker) listing of a company; persisted per company in
/// `available_exchanges.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AvailableExchange {
    pub exchange_mic: String,
    pub ticker: String,
    pub has_daily: bool,                       // a daily fetch returned data at least once
    pub has_5min: bool,                        // a 5-minute fetch returned data at least once
    pub last_successful_fetch: Option<String>, // YYYY-MM-DD
}

View File

@@ -1,54 +1,80 @@
// src/corporate/update.rs // src/corporate/update.rs
use super::{scraper::*, storage::*, helpers::*, types::*}; use super::{scraper::*, storage::*, helpers::*, types::*, aggregation::aggregate_best_price_data};
use crate::config::Config; use crate::config::Config;
use yfinance_rs::{Range, Interval}; use yfinance_rs::{Range, Interval};
use chrono::Local; use chrono::Local;
use std::collections::HashMap; use std::collections::HashMap;
pub async fn run_full_update(client: &fantoccini::Client, tickers: Vec<String>, config: &Config) -> anyhow::Result<()> { pub async fn run_full_update(client: &fantoccini::Client, config: &Config) -> anyhow::Result<()> {
println!("Updating {} tickers (prices from {})", tickers.len(), config.corporate_start_date); println!("Starting company-centric corporate update (ISIN-based)");
let companies = load_companies().await?;
let today = chrono::Local::now().format("%Y-%m-%d").to_string(); let today = chrono::Local::now().format("%Y-%m-%d").to_string();
let mut existing = load_existing_events().await?; let mut existing_events = load_existing_events().await?;
for ticker in &tickers { for company in companies {
print!("{:6} ", ticker); println!("\nProcessing company: {} ({})", company.name, company.isin);
if let Ok(new_events) = fetch_earnings_history(client, ticker).await { ensure_company_dirs(&company.isin).await?;
let result = process_batch(&new_events, &mut existing, &today); save_company_metadata(&company).await?;
save_changes(&result.changes).await?;
println!("{} earnings, {} changes", new_events.len(), result.changes.len());
}
// DAILY full history for ticker_info in &company.tickers {
if let Ok(prices) = fetch_daily_price_history(ticker, &config.corporate_start_date, &today).await { let ticker = &ticker_info.ticker;
save_prices_for_ticker(ticker, "daily", prices).await?; println!(" → Trying ticker: {ticker} ({})", ticker_info.exchange_mic);
}
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; let mut daily_success = false;
let mut intraday_success = false;
// 5-MINUTE only last 60 days (Yahoo limit for intraday) // Earnings
let sixty_days_ago = (chrono::Local::now() - chrono::Duration::days(60)) if let Ok(new_events) = fetch_earnings_history(client, ticker).await {
.format("%Y-%m-%d") let result = process_batch(&new_events, &mut existing_events, &today);
.to_string(); save_changes(&result.changes).await?;
println!(" {} earnings events", new_events.len());
if let Ok(prices) = fetch_price_history_5min(ticker, &sixty_days_ago, &today).await {
if !prices.is_empty() {
save_prices_for_ticker(ticker, "5min", prices.clone()).await?;
println!(" Saved {} 5min bars for {ticker}", prices.len());
} else {
println!(" No 5min data available for {ticker} (market closed? retry later)");
} }
} else {
println!(" 5min fetch failed for {ticker} (rate limit? try again)"); // Daily prices
if let Ok(prices) = fetch_daily_price_history(ticker, &config.corporate_start_date, &today).await {
if !prices.is_empty() {
save_prices_by_source(&company.isin, ticker, "daily", prices).await?;
daily_success = true;
}
}
// 5-minute prices (last 60 days)
let sixty_days_ago = (chrono::Local::now() - chrono::Duration::days(60))
.format("%Y-%m-%d")
.to_string();
if let Ok(prices) = fetch_price_history_5min(ticker, &sixty_days_ago, &today).await {
if !prices.is_empty() {
save_prices_by_source(&company.isin, ticker, "5min", prices.clone()).await?;
intraday_success = true;
println!(" Saved {} 5min bars from {ticker}", prices.len());
}
}
// Record success
update_available_exchange(
&company.isin,
ticker,
&ticker_info.exchange_mic,
daily_success,
intraday_success,
).await?;
aggregate_best_price_data(&company.isin).await?;
tokio::time::sleep(tokio::time::Duration::from_millis(800)).await;
} }
tokio::time::sleep(tokio::time::Duration::from_millis(250)).await; // Optional: run aggregation after all sources
// aggregate_best_price_data(&company.isin).await?;
} }
save_optimized_events(existing).await?; save_optimized_events(existing_events).await?;
println!("Corporate update complete (ISIN-based)");
Ok(()) Ok(())
} }

41
src/data/companies.json Normal file
View File

@@ -0,0 +1,41 @@
[
{
"isin": "US46625H1005",
"name": "JPMorgan Chase & Co.",
"tickers": [
{ "ticker": "JPM", "exchange_mic": "XNYS", "currency": "USD", "primary": true },
{ "ticker": "JPM-PC", "exchange_mic": "XNYS", "currency": "USD", "primary": false }
]
},
{
"isin": "US5949181045",
"name": "Microsoft Corporation",
"tickers": [
{ "ticker": "MSFT", "exchange_mic": "XNAS", "currency": "USD", "primary": true }
]
},
{
"isin": "CNE000001P37",
"name": "Industrial and Commercial Bank of China",
"tickers": [
{ "ticker": "601398.SS", "exchange_mic": "XSHG", "currency": "CNY", "primary": true },
{ "ticker": "1398.HK", "exchange_mic": "XHKG", "currency": "HKD", "primary": false }
]
},
{
"isin": "JP3702200000",
"name": "Toyota Motor Corporation",
"tickers": [
{ "ticker": "7203.T", "exchange_mic": "XJPX", "currency": "JPY", "primary": true },
{ "ticker": "TM", "exchange_mic": "XNYS", "currency": "USD", "primary": false }
]
},
{
"isin": "HK0000069689",
"name": "Tencent Holdings Limited",
"tickers": [
{ "ticker": "0700.HK", "exchange_mic": "XHKG", "currency": "HKD", "primary": true },
{ "ticker": "TCEHY", "exchange_mic": "OTCM", "currency": "USD", "primary": false }
]
}
]

6
src/data/index.txt Normal file
View File

@@ -0,0 +1,6 @@
data/*
companies.json
continents.json
countries.json
exchanges.json

View File

@@ -40,7 +40,7 @@ async fn main() -> anyhow::Result<()> {
// === Corporate Earnings Update === // === Corporate Earnings Update ===
println!("\nUpdating Corporate Earnings"); println!("\nUpdating Corporate Earnings");
let tickers = config::get_tickers(); let tickers = config::get_tickers();
corporate::run_full_update(&client, tickers, &config).await?; corporate::run_full_update(&client, &config).await?;
// === Cleanup === // === Cleanup ===
client.close().await?; client.close().await?;