// src/corporate/storage.rs use super::{types::*, helpers::*}; use crate::util::directories::DataPaths; use crate::util::logger; use tokio::fs; use tokio::io::AsyncWriteExt; use chrono::{Datelike, NaiveDate}; use std::collections::{HashMap}; use std::path::{PathBuf}; pub async fn load_existing_events(paths: &DataPaths) -> anyhow::Result> { let mut map = HashMap::new(); let dir = paths.corporate_events_dir(); if !dir.exists() { logger::log_info("Corporate Storage: No existing events directory found").await; return Ok(map); } let mut entries = fs::read_dir(dir).await?; let mut loaded_count = 0; while let Some(entry) = entries.next_entry().await? { let path = entry.path(); if path.extension().and_then(|s| s.to_str()) == Some("json") { let name = path.file_name().and_then(|n| n.to_str()).unwrap_or(""); if name.starts_with("events_") && name.len() == 17 { let content = fs::read_to_string(&path).await?; let events: Vec = serde_json::from_str(&content)?; for event in events { map.insert(event_key(&event), event); } loaded_count += 1; } } } logger::log_info(&format!("Corporate Storage: Loaded {} events from {} files", map.len(), loaded_count)).await; Ok(map) } pub async fn save_optimized_events(paths: &DataPaths, events: HashMap) -> anyhow::Result<()> { let dir = paths.corporate_events_dir(); fs::create_dir_all(dir).await?; logger::log_info("Corporate Storage: Removing old event files...").await; let mut removed_count = 0; let mut entries = fs::read_dir(dir).await?; while let Some(entry) = entries.next_entry().await? { let path = entry.path(); let name = path.file_name().and_then(|n| n.to_str()).unwrap_or(""); if name.starts_with("events_") && path.extension().map(|e| e == "json").unwrap_or(false) { fs::remove_file(&path).await?; removed_count += 1; } } logger::log_info(&format!("Corporate Storage: Removed {} old event files", removed_count)).await; let total_events = events.len(); let mut sorted: Vec<_> = events.into_values().collect(); sorted.sort_by_key(|e| (e.ticker.clone(), e.date.clone())); let mut by_month: HashMap> = HashMap::new(); for e in sorted { if let Ok(d) = NaiveDate::parse_from_str(&e.date, "%Y-%m-%d") { let key = format!("{}-{:02}", d.year(), d.month()); by_month.entry(key).or_default().push(e); } } let total_months = by_month.len(); for (month, list) in by_month { let path = dir.join(format!("events_{}.json", month)); fs::write(&path, serde_json::to_string_pretty(&list)?).await?; logger::log_info(&format!("Corporate Storage: Saved {} events for month {}", list.len(), month)).await; } logger::log_info(&format!("Corporate Storage: Saved {} total events in {} month files", total_events, total_months)).await; Ok(()) } pub async fn save_changes(paths: &DataPaths, changes: &[CompanyEventChange]) -> anyhow::Result<()> { if changes.is_empty() { logger::log_info("Corporate Storage: No changes to save").await; return Ok(()); } let dir = paths.corporate_changes_dir(); fs::create_dir_all(dir).await?; logger::log_info(&format!("Corporate Storage: Saving {} changes", changes.len())).await; let mut by_month: HashMap> = HashMap::new(); for c in changes { if let Ok(d) = NaiveDate::parse_from_str(&c.date, "%Y-%m-%d") { let key = format!("{}-{:02}", d.year(), d.month()); by_month.entry(key).or_default().push(c.clone()); } } for (month, list) in by_month { let path = dir.join(format!("changes_{}.json", month)); let mut all = if path.exists() { let s = fs::read_to_string(&path).await?; serde_json::from_str(&s).unwrap_or_default() } else { vec![] }; all.extend(list.clone()); fs::write(&path, serde_json::to_string_pretty(&all)?).await?; logger::log_info(&format!("Corporate Storage: Saved {} changes for month {}", list.len(), month)).await; } logger::log_info("Corporate Storage: All changes saved successfully").await; Ok(()) } pub async fn save_prices_for_ticker(paths: &DataPaths, ticker: &str, timeframe: &str, mut prices: Vec) -> anyhow::Result<()> { let base_dir = paths.corporate_prices_dir(); let company_dir = base_dir.join(ticker.replace(".", "_")); let timeframe_dir = company_dir.join(timeframe); fs::create_dir_all(&timeframe_dir).await?; let path = timeframe_dir.join("prices.json"); prices.sort_by_key(|p| (p.date.clone(), p.time.clone())); let json = serde_json::to_string_pretty(&prices)?; fs::write(&path, json).await?; Ok(()) } pub fn get_company_dir(paths: &DataPaths, lei: &str) -> PathBuf { paths.corporate_prices_dir().join(lei) } pub async fn ensure_company_dirs(paths: &DataPaths, isin: &str) -> anyhow::Result<()> { let base = get_company_dir(paths, isin); let paths_to_create = [ base.clone(), base.join("5min"), base.join("daily"), base.join("aggregated").join("5min"), base.join("aggregated").join("daily"), ]; for p in paths_to_create { fs::create_dir_all(&p).await?; } Ok(()) } pub async fn save_available_exchanges(paths: &DataPaths, isin: &str, exchanges: Vec) -> anyhow::Result<()> { let dir = get_company_dir(paths, isin); fs::create_dir_all(&dir).await?; let path = dir.join("available_exchanges.json"); fs::write(&path, serde_json::to_string_pretty(&exchanges)?).await?; Ok(()) } pub async fn load_available_exchanges(paths: &DataPaths, lei: &str) -> anyhow::Result> { let path = get_company_dir(paths, lei).join("available_exchanges.json"); if path.exists() { let content = fs::read_to_string(&path).await?; Ok(serde_json::from_str(&content)?) } else { Ok(vec![]) } } pub async fn save_prices_by_source( paths: &DataPaths, lei: &str, source_ticker: &str, timeframe: &str, prices: Vec, ) -> anyhow::Result<()> { let source_safe = source_ticker.replace(".", "_").replace("/", "_"); let dir = get_company_dir(paths, lei).join(timeframe).join(&source_safe); fs::create_dir_all(&dir).await?; let path = dir.join("prices.json"); let mut prices = prices; prices.sort_by_key(|p| (p.date.clone(), p.time.clone())); fs::write(&path, serde_json::to_string_pretty(&prices)?).await?; Ok(()) } /// Update available_exchanges.json with fetch results /*pub async fn update_available_exchange( paths: &DataPaths, isin: &str, ticker: &str, exchange_mic: &str, has_daily: bool, has_5min: bool, ) -> anyhow::Result<()> { let mut exchanges = load_available_exchanges(paths, isin).await?; if let Some(entry) = exchanges.iter_mut().find(|e| e.ticker == ticker) { // Update existing entry entry.record_success(has_daily, has_5min); } else { // Create new entry - need to get currency from somewhere // Try to infer from the ticker or use a default let currency = infer_currency_from_ticker(ticker); let mut new_entry = AvailableExchange::new( ticker.to_string(), exchange_mic.to_string(), currency, ); new_entry.record_success(has_daily, has_5min); exchanges.push(new_entry); } save_available_exchanges(paths, isin, exchanges).await }*/ /// Infer currency from ticker suffix fn infer_currency_from_ticker(ticker: &str) -> String { if ticker.ends_with(".L") { return "GBP".to_string(); } if ticker.ends_with(".PA") { return "EUR".to_string(); } if ticker.ends_with(".DE") { return "EUR".to_string(); } if ticker.ends_with(".AS") { return "EUR".to_string(); } if ticker.ends_with(".MI") { return "EUR".to_string(); } if ticker.ends_with(".SW") { return "CHF".to_string(); } if ticker.ends_with(".T") { return "JPY".to_string(); } if ticker.ends_with(".HK") { return "HKD".to_string(); } if ticker.ends_with(".SS") { return "CNY".to_string(); } if ticker.ends_with(".SZ") { return "CNY".to_string(); } if ticker.ends_with(".TO") { return "CAD".to_string(); } if ticker.ends_with(".AX") { return "AUD".to_string(); } if ticker.ends_with(".SA") { return "BRL".to_string(); } if ticker.ends_with(".MC") { return "EUR".to_string(); } if ticker.ends_with(".BO") || ticker.ends_with(".NS") { return "INR".to_string(); } "USD".to_string() // Default } /// Saves companies data to a JSONL file. /// /// # Arguments /// * `paths` - Reference to DataPaths for directory management /// * `companies` - HashMap of company names to their securities (ISIN, Ticker pairs) /// /// # Errors /// Returns an error if file operations or serialization fails. pub async fn save_companies_to_jsonl( paths: &DataPaths, companies: &HashMap>, ) -> anyhow::Result<()> { let file_path = paths.data_dir().join("companies.jsonl"); logger::log_info(&format!("Corporate Storage: Saving {} companies to JSONL", companies.len())).await; // Create parent directory if it doesn't exist if let Some(parent) = file_path.parent() { tokio::fs::create_dir_all(parent).await?; } let mut file = tokio::fs::File::create(&file_path).await?; for (name, securities) in companies.iter() { let line = serde_json::json!({ "name": name, "securities": securities }); file.write_all(line.to_string().as_bytes()).await?; file.write_all(b"\n").await?; } let msg = format!("✓ Saved {} companies to {:?}", companies.len(), file_path); println!("{}", msg); logger::log_info(&msg).await; Ok(()) }