Store data from multiple exchanges for a single ISIN
@@ -6,7 +6,7 @@ use std::collections::HashMap;

 #[derive(Debug)]
 struct DayData {
-    sources: Vec<CompanyPrice>,
+    sources: Vec<(CompanyPrice, String)>, // (price, source_ticker)
     total_volume: u64,
     vwap: f64,
     open: f64,
@@ -15,6 +15,7 @@ struct DayData {
     close: f64,
 }

+/// Aggregate price data from multiple exchanges, converting all to USD
 pub async fn aggregate_best_price_data(isin: &str) -> anyhow::Result<()> {
     let company_dir = get_company_dir(isin);

@@ -24,74 +25,103 @@ pub async fn aggregate_best_price_data(isin: &str) -> anyhow::Result<()> {
             continue;
         }

-        let mut all_prices: Vec<CompanyPrice> = Vec::new();
+        let mut all_prices: Vec<(CompanyPrice, String)> = Vec::new();
         let mut by_date_time: HashMap<String, DayData> = HashMap::new();

-        // Load all sources
+        // Load all sources with their ticker names
         let mut entries = tokio::fs::read_dir(&source_dir).await?;
         let mut source_count = 0;
+        let mut sources_used = std::collections::HashSet::new();

         while let Some(entry) = entries.next_entry().await? {
-            let source_dir = entry.path();
-            if !source_dir.is_dir() { continue; }
-            let prices_path = source_dir.join("prices.json");
+            let source_dir_path = entry.path();
+            if !source_dir_path.is_dir() { continue; }
+
+            let source_ticker = source_dir_path
+                .file_name()
+                .and_then(|n| n.to_str())
+                .unwrap_or("unknown")
+                .to_string();
+
+            let prices_path = source_dir_path.join("prices.json");
             if !prices_path.exists() { continue; }

             let content = tokio::fs::read_to_string(&prices_path).await?;
-            let mut prices: Vec<CompanyPrice> = serde_json::from_str(&content)?;
-            all_prices.append(&mut prices);
+            let prices: Vec<CompanyPrice> = serde_json::from_str(&content)?;
+
+            if !prices.is_empty() {
+                sources_used.insert(source_ticker.clone());
+                source_count += 1;
+            }
+
+            for price in prices {
+                all_prices.push((price, source_ticker.clone()));
+            }
         }

         if all_prices.is_empty() {
             continue;
         }

+        println!(" Aggregating from {} exchanges: {}",
+            sources_used.len(),
+            sources_used.iter()
+                .map(|s| s.as_str())
+                .collect::<Vec<_>>()
+                .join(", ")
+        );
+
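For orientation, the loading loop above implies a per-exchange layout roughly like the following. This is inferred from the code alone; the diff never shows how `source_dir` is derived from `company_dir` and `timeframe`, and the ticker names are illustrative:

    source_dir/
        XETRA/prices.json   // one subdirectory per source ticker (names made up)
        NYSE/prices.json    // each file: a JSON array of CompanyPrice records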
         // Group by date + time (for 5min) or just date
-        for p in all_prices {
+        for (p, source) in all_prices {
             let key = if timeframe == &"5min" && !p.time.is_empty() {
                 format!("{}_{}", p.date, p.time)
             } else {
                 p.date.clone()
             };

+            // Convert to USD immediately
+            let usd_rate = super::fx::get_usd_rate(&p.currency).await.unwrap_or(1.0);
+
+            let mut p_usd = p.clone();
+            p_usd.open *= usd_rate;
+            p_usd.high *= usd_rate;
+            p_usd.low *= usd_rate;
+            p_usd.close *= usd_rate;
+            p_usd.adj_close *= usd_rate;
+            p_usd.currency = "USD".to_string();
+
             let entry = by_date_time.entry(key.clone()).or_insert(DayData {
                 sources: vec![],
                 total_volume: 0,
                 vwap: 0.0,
-                open: p.open,
-                high: p.high,
-                low: p.low,
-                close: p.close,
+                open: p_usd.open,
+                high: p_usd.high,
+                low: p_usd.low,
+                close: p_usd.close,
             });

             let volume = p.volume.max(1); // avoid div0
-            let vwap_contrib = p.close * volume as f64;
+            let vwap_contrib = p_usd.close * volume as f64;

-            entry.sources.push(p.clone());
+            entry.sources.push((p_usd.clone(), source));
             entry.total_volume += volume;
             entry.vwap += vwap_contrib;

             // Use first open, last close, max high, min low
             if entry.sources.len() == 1 {
-                entry.open = p.open;
+                entry.open = p_usd.open;
             }
-            entry.close = p.close;
-            entry.high = entry.high.max(p.high);
-            entry.low = entry.low.min(p.low);
+            entry.close = p_usd.close;
+            entry.high = entry.high.max(p_usd.high);
+            entry.low = entry.low.min(p_usd.low);
         }
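The USD conversion above calls a helper the diff does not include. Judging by the call site (`.await` followed by `.unwrap_or(1.0)`), it is an async function returning `Option<f64>` (or a `Result`); a minimal sketch under that assumption:

// Hypothetical sketch of super::fx::get_usd_rate; only the call site
// appears in the diff, so the signature and rate values are assumptions.
pub async fn get_usd_rate(currency: &str) -> Option<f64> {
    match currency {
        "USD" => Some(1.0),  // identity rate
        "EUR" => Some(1.08), // placeholder value, not a real quote
        "GBP" => Some(1.27), // placeholder value, not a real quote
        _ => None,           // caller falls back to 1.0 via unwrap_or
    }
}

Note that `unwrap_or(1.0)` silently treats any currency without a known rate as if it were already USD; logging a warning on the `None` path might be safer.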

-        // Convert to USD + finalize
+        // Finalize aggregated data
         let mut aggregated: Vec<CompanyPrice> = Vec::new();
-        let by_date_time_len: usize = by_date_time.len();

         for (key, data) in by_date_time {
             let vwap = data.vwap / data.total_volume as f64;

-            // Pick currency from highest volume source
-            let best_source = data.sources.iter()
-                .max_by_key(|p| p.volume)
-                .unwrap();
-
-            let usd_rate = super::fx::get_usd_rate(&best_source.currency).await.unwrap_or(1.0);
-
             let (date, time) = if key.contains('_') {
                 let parts: Vec<&str> = key.split('_').collect();
                 (parts[0].to_string(), parts[1].to_string())
@@ -99,17 +129,23 @@ pub async fn aggregate_best_price_data(isin: &str) -> anyhow::Result<()> {
                 (key, "".to_string())
             };

+            // Track which exchange contributed most volume
+            let best_source = data.sources.iter()
+                .max_by_key(|(p, _)| p.volume)
+                .map(|(_, src)| src.clone())
+                .unwrap_or_else(|| "unknown".to_string());
+
             aggregated.push(CompanyPrice {
-                ticker: isin.to_string(), // use ISIN as unified ticker
+                ticker: format!("{}@agg", isin), // Mark as aggregated
                 date,
                 time,
-                open: data.open * usd_rate,
-                high: data.high * usd_rate,
-                low: data.low * usd_rate,
-                close: data.close * usd_rate,
-                adj_close: vwap * usd_rate,
+                open: data.open,
+                high: data.high,
+                low: data.low,
+                close: data.close,
+                adj_close: vwap,
                 volume: data.total_volume,
-                currency: "USD".to_string(), // aggregated to USD
+                currency: "USD".to_string(),
             });
         }

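As a sanity check on the VWAP arithmetic: the grouping loop accumulates close × volume per bar, and the finalize step divides by total volume, so two same-day bars combine as in this self-contained snippet (prices and volumes are invented):

// Illustrative VWAP check; values are made up.
// Exchange A: close 10.00 on 1_000 shares; exchange B: close 10.20 on 3_000 shares.
fn main() {
    let bars = [(10.00_f64, 1_000_u64), (10.20, 3_000)];
    let total_volume: u64 = bars.iter().map(|(_, v)| v).sum();
    let vwap = bars.iter().map(|(c, v)| c * *v as f64).sum::<f64>() / total_volume as f64;
    assert_eq!(total_volume, 4_000);
    assert!((vwap - 10.15).abs() < 1e-9); // (10.00*1000 + 10.20*3000) / 4000 = 10.15
}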
@@ -120,8 +156,39 @@
         fs::create_dir_all(&agg_dir).await?;
         let path = agg_dir.join("prices.json");
         fs::write(&path, serde_json::to_string_pretty(&aggregated)?).await?;
-        println!(" Aggregated {} {} bars → {} sources", aggregated.len(), timeframe, by_date_time_len);
+
+        // Save aggregation metadata
+        let meta = AggregationMetadata {
+            isin: isin.to_string(),
+            timeframe: timeframe.to_string(),
+            sources: sources_used.into_iter().collect(),
+            total_bars: aggregated.len(),
+            date_range: (
+                aggregated.first().map(|p| p.date.clone()).unwrap_or_default(),
+                aggregated.last().map(|p| p.date.clone()).unwrap_or_default(),
+            ),
+            aggregated_at: chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string(),
+        };
+
+        let meta_path = agg_dir.join("metadata.json");
+        fs::write(&meta_path, serde_json::to_string_pretty(&meta)?).await?;
+
+        println!(" ✓ {} {} bars from {} sources (USD)",
+            aggregated.len(),
+            timeframe,
+            source_count
+        );
     }

     Ok(())
 }
+
+#[derive(Debug, serde::Serialize, serde::Deserialize)]
+struct AggregationMetadata {
+    isin: String,
+    timeframe: String,
+    sources: Vec<String>,
+    total_bars: usize,
+    date_range: (String, String),
+    aggregated_at: String,
+}
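The diff assumes a `CompanyPrice` type defined elsewhere in the crate. Reconstructing it from the fields used above gives roughly the following shape; the derives and field order are guesses, only the field names and rough types are visible in the diff:

// Reconstructed from usage in the diff -- not the crate's actual definition.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct CompanyPrice {
    ticker: String,
    date: String,      // e.g. "2024-01-15"
    time: String,      // empty for daily bars, populated for 5min bars
    open: f64,
    high: f64,
    low: f64,
    close: f64,
    adj_close: f64,    // repurposed to carry the VWAP in aggregated output
    volume: u64,
    currency: String,  // ISO code such as "EUR"; always "USD" after aggregation
}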
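Finally, a sketch of how the entry point might be driven. The runtime setup and the ISIN are illustrative; the diff only establishes the function's async signature, and tokio is assumed from the `tokio::fs` calls above:

// Hypothetical caller: aggregate_best_price_data is async and returns
// anyhow::Result<()>, so it needs an async runtime such as tokio.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Apple's ISIN, used purely as an example identifier.
    aggregate_best_price_data("US0378331005").await?;
    Ok(())
}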