diff --git a/src/corporate/aggregation.rs b/src/corporate/aggregation.rs
index 555a920..4648e6b 100644
--- a/src/corporate/aggregation.rs
+++ b/src/corporate/aggregation.rs
@@ -48,7 +48,7 @@ pub async fn aggregate_best_price_data(paths: &DataPaths, lei: &str) -> anyhow::
         if !prices_path.exists() {
             continue;
         }
         let content = tokio::fs::read_to_string(&prices_path).await?;
-        let mut prices: Vec = serde_json::from_str(&content)?;
+        let prices: Vec = serde_json::from_str(&content)?;
         if !prices.is_empty() {
             sources_used.insert(source_ticker.clone());
@@ -80,8 +80,8 @@ pub async fn aggregate_best_price_data(paths: &DataPaths, lei: &str) -> anyhow::
                 p.date.clone()
             };
 
-            // Convert to USD immediately
-            let usd_rate = super::fx::get_usd_rate(&p.currency).await.unwrap_or(1.0);
+            // Convert to USD immediately (DUMMY placeholder rate)
+            let usd_rate = 0.1;
 
             let mut p_usd = p.clone();
             p_usd.open *= usd_rate;
diff --git a/src/corporate/fx.rs b/src/corporate/fx.rs
deleted file mode 100644
index 801ce7a..0000000
--- a/src/corporate/fx.rs
+++ /dev/null
@@ -1,51 +0,0 @@
-// src/corporate/fx.rs
-use std::collections::HashMap;
-use reqwest;
-use serde_json::Value;
-use tokio::fs;
-use std::path::Path;
-
-static FX_CACHE_PATH: &str = "fx_rates.json";
-
-pub async fn get_usd_rate(currency: &str) -> anyhow::Result<f64> {
-    if currency == "USD" {
-        return Ok(1.0);
-    }
-
-    let mut cache: HashMap<String, (f64, String)> = if Path::new(FX_CACHE_PATH).exists() {
-        let content = fs::read_to_string(FX_CACHE_PATH).await?;
-        serde_json::from_str(&content).unwrap_or_default()
-    } else {
-        HashMap::new()
-    };
-
-    let today = chrono::Local::now().format("%Y-%m-%d").to_string();
-    if let Some((rate, date)) = cache.get(currency) {
-        if date == &today {
-            return Ok(*rate);
-        }
-    }
-
-    let symbol = format!("{}USD=X", currency);
-    let url = format!("https://query1.finance.yahoo.com/v8/finance/chart/{}?range=1d&interval=1d", symbol);
-
-    let json: Value = reqwest::Client::new()
-        .get(&url)
-        .header("User-Agent", "Mozilla/5.0")
-        .send()
-        .await?
-        .json()
-        .await?;
-
-    let close = json["chart"]["result"][0]["meta"]["regularMarketPrice"]
-        .as_f64()
-        .or_else(|| json["chart"]["result"][0]["indicators"]["quote"][0]["close"][0].as_f64())
-        .unwrap_or(1.0);
-
-    let rate = if currency == "JPY" || currency == "KRW" { close } else { 1.0 / close }; // inverse pairs
-
-    cache.insert(currency.to_string(), (rate, today.clone()));
-    let _ = fs::write(FX_CACHE_PATH, serde_json::to_string_pretty(&cache)?).await;
-
-    Ok(rate)
-}
\ No newline at end of file
diff --git a/src/corporate/helpers.rs b/src/corporate/helpers.rs
index ba00548..c643a7f 100644
--- a/src/corporate/helpers.rs
+++ b/src/corporate/helpers.rs
@@ -1,7 +1,6 @@
 // src/corporate/helpers.rs
 use super::types::*;
 use chrono::{Local, NaiveDate};
-use std::collections::{HashMap, HashSet};
 use rand::rngs::StdRng;
 use rand::prelude::{Rng, SeedableRng, IndexedRandom};
 
@@ -74,7 +73,7 @@ pub fn parse_yahoo_date(s: &str) -> anyhow::Result<NaiveDate>
 /// Send-safe random range
 pub fn random_range(min: u64, max: u64) -> u64 {
     let mut rng = StdRng::from_rng(&mut rand::rng());
-    rng.gen_range(min..max)
+    rng.random_range(min..max)
 }
 
 /// Send-safe random choice
diff --git a/src/corporate/mod.rs b/src/corporate/mod.rs
index 21d16d0..83e3d77 100644
--- a/src/corporate/mod.rs
+++ b/src/corporate/mod.rs
@@ -4,9 +4,8 @@
 pub mod scraper;
 pub mod storage;
 pub mod helpers;
 pub mod aggregation;
-pub mod fx;
 pub mod openfigi;
-pub mod yahoo;
+pub mod yahoo_company_extraction;
 pub mod page_validation;
 pub mod atomic_writer;
diff --git a/src/corporate/storage.rs b/src/corporate/storage.rs
index 5b10b92..9edfd66 100644
--- a/src/corporate/storage.rs
+++ b/src/corporate/storage.rs
@@ -5,11 +5,9 @@
 use crate::util::logger;
 use tokio::fs;
 use tokio::io::AsyncWriteExt;
-use chrono::{Datelike, NaiveDate};
 use std::collections::HashMap;
 use std::path::{PathBuf, Path};
 
-const BATCH_SIZE: usize = 500;
 
 /// Lightweight index entry - only metadata, no full event data
 #[derive(Debug, Clone)]
@@ -55,162 +53,6 @@ pub async fn build_event_index(paths: &DataPaths) -> anyhow::Result<Vec<EventIndexEntry>>
-
-pub async fn get_event_by_key(
-    index: &[EventIndexEntry],
-    key: &str,
-) -> anyhow::Result<Option<CompanyEvent>> {
-    let entry = index.iter().find(|e| e.key == key);
-
-    if let Some(entry) = entry {
-        let content = fs::read_to_string(&entry.file_path).await?;
-        let events: Vec<CompanyEvent> = serde_json::from_str(&content)?;
-        Ok(events.into_iter().find(|e| event_key(e) == key))
-    } else {
-        Ok(None)
-    }
-}
-
-/// Stream events file by file with callback
-pub async fn stream_events_with_callback<F>(
-    paths: &DataPaths,
-    mut callback: F
-) -> anyhow::Result<usize>
-where
-    F: FnMut(CompanyEvent) -> anyhow::Result<()>,
-{
-    let dir = paths.corporate_events_dir();
-    if !dir.exists() {
-        return Ok(0);
-    }
-
-    let mut total = 0;
-    let mut entries = fs::read_dir(dir).await?;
-
-    while let Some(entry) = entries.next_entry().await? {
-        let path = entry.path();
-        if path.extension().and_then(|s| s.to_str()) == Some("json") {
-            let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
-            if name.starts_with("events_") {
-                let content = fs::read_to_string(&path).await?;
-                let events: Vec<CompanyEvent> = serde_json::from_str(&content)?;
-
-                for event in events {
-                    callback(event)?;
-                    total += 1;
-                }
-
-                tokio::task::yield_now().await;
-            }
-        }
-    }
-
-    logger::log_info(&format!("Corporate Storage: Streamed {} events", total)).await;
-    Ok(total)
-}
-
-/// Save events organized by month (accepts Vec, not HashMap)
-pub async fn save_optimized_events(
-    paths: &DataPaths,
-    events: Vec<CompanyEvent>
-) -> anyhow::Result<()> {
-    let dir = paths.corporate_events_dir();
-    fs::create_dir_all(dir).await?;
-
-    logger::log_info("Corporate Storage: Removing old event files...").await;
-    let mut removed_count = 0;
-    let mut entries = fs::read_dir(dir).await?;
-    while let Some(entry) = entries.next_entry().await? {
-        let path = entry.path();
-        let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
-        if name.starts_with("events_") && path.extension().map(|e| e == "json").unwrap_or(false) {
-            fs::remove_file(&path).await?;
-            removed_count += 1;
-        }
-    }
-    logger::log_info(&format!("Corporate Storage: Removed {} old files", removed_count)).await;
-
-    let total_events = events.len();
-    let mut sorted = events;
-    sorted.sort_by(|a, b| {
-        a.ticker.cmp(&b.ticker).then(a.date.cmp(&b.date))
-    });
-
-    let mut by_month: HashMap<String, Vec<CompanyEvent>> = HashMap::new();
-
-    for chunk in sorted.chunks(BATCH_SIZE) {
-        for e in chunk {
-            if let Ok(d) = NaiveDate::parse_from_str(&e.date, "%Y-%m-%d") {
-                let key = format!("{}-{:02}", d.year(), d.month());
-                by_month.entry(key).or_default().push(e.clone());
-            }
-        }
-        tokio::task::yield_now().await;
-    }
-
-    for (month, list) in by_month {
-        let path = dir.join(format!("events_{}.json", month));
-        fs::write(&path, serde_json::to_string_pretty(&list)?).await?;
-        logger::log_info(&format!("Saved {} events for month {}", list.len(), month)).await;
-    }
-
-    logger::log_info(&format!("Saved {} total events", total_events)).await;
-    Ok(())
-}
-
-pub async fn save_changes(
-    paths: &DataPaths,
-    changes: &[CompanyEventChange]
-) -> anyhow::Result<()> {
-    if changes.is_empty() {
-        logger::log_info("Corporate Storage: No changes to save").await;
-        return Ok(());
-    }
-
-    let dir = paths.corporate_changes_dir();
-    fs::create_dir_all(dir).await?;
-
-    let mut by_month: HashMap<String, Vec<CompanyEventChange>> = HashMap::new();
-    for c in changes {
-        if let Ok(d) = NaiveDate::parse_from_str(&c.date, "%Y-%m-%d") {
-            let key = format!("{}-{:02}", d.year(), d.month());
-            by_month.entry(key).or_default().push(c.clone());
-        }
-    }
-
-    for (month, list) in by_month {
-        let path = dir.join(format!("changes_{}.json", month));
-        let mut all = if path.exists() {
-            let s = fs::read_to_string(&path).await?;
-            serde_json::from_str(&s).unwrap_or_default()
-        } else {
-            vec![]
-        };
-        all.extend(list.clone());
-        fs::write(&path, serde_json::to_string_pretty(&all)?).await?;
-    }
-
-    Ok(())
-}
-
-pub async fn save_prices_for_ticker(
-    paths: &DataPaths,
-    ticker: &str,
-    timeframe: &str,
-    mut prices: Vec
-) -> anyhow::Result<()> {
-    let base_dir = paths.corporate_prices_dir();
-    let company_dir = base_dir.join(ticker.replace(".", "_"));
-    let timeframe_dir = company_dir.join(timeframe);
-
-    fs::create_dir_all(&timeframe_dir).await?;
-    let path = timeframe_dir.join("prices.json");
-
-    prices.sort_by_key(|p| (p.date.clone(), p.time.clone()));
-    fs::write(&path, serde_json::to_string_pretty(&prices)?).await?;
-    Ok(())
-}
-
 pub fn get_company_dir(paths: &DataPaths, lei: &str) -> PathBuf {
     paths.corporate_prices_dir().join(lei)
 }
@@ -230,48 +72,6 @@ pub async fn ensure_company_dirs(paths: &DataPaths, isin: &str) -> anyhow::Resul
     Ok(())
 }
 
-pub async fn save_available_exchanges(
-    paths: &DataPaths,
-    isin: &str,
-    exchanges: Vec
-) -> anyhow::Result<()> {
-    let dir = get_company_dir(paths, isin);
-    fs::create_dir_all(&dir).await?;
-    let path = dir.join("available_exchanges.json");
-    fs::write(&path, serde_json::to_string_pretty(&exchanges)?).await?;
-    Ok(())
-}
-
-pub async fn load_available_exchanges(
-    paths: &DataPaths,
-    lei: &str
-) -> anyhow::Result<Vec> {
-    let path = get_company_dir(paths, lei).join("available_exchanges.json");
-    if path.exists() {
-        let content = fs::read_to_string(&path).await?;
-        Ok(serde_json::from_str(&content)?)
-    } else {
-        Ok(vec![])
-    }
-}
-
-pub async fn save_prices_by_source(
-    paths: &DataPaths,
-    lei: &str,
-    source_ticker: &str,
-    timeframe: &str,
-    prices: Vec,
-) -> anyhow::Result<()> {
-    let source_safe = source_ticker.replace(".", "_").replace("/", "_");
-    let dir = get_company_dir(paths, lei).join(timeframe).join(&source_safe);
-    fs::create_dir_all(&dir).await?;
-    let path = dir.join("prices.json");
-    let mut prices = prices;
-    prices.sort_by_key(|p| (p.date.clone(), p.time.clone()));
-    fs::write(&path, serde_json::to_string_pretty(&prices)?).await?;
-    Ok(())
-}
-
 /// Stream companies to JSONL incrementally
 pub async fn save_companies_to_jsonl_streaming(
     paths: &DataPaths,
diff --git a/src/corporate/update.rs b/src/corporate/update.rs
index a620d6a..b27201e 100644
--- a/src/corporate/update.rs
+++ b/src/corporate/update.rs
@@ -1,5 +1,5 @@
 // src/corporate/update.rs
-use super::{scraper::*, storage::*, openfigi::*};
+use super::{scraper::*, openfigi::*};
 use crate::config::Config;
 use crate::corporate::update_companies::build_companies_jsonl_streaming_parallel;
 use crate::corporate::update_companies_cleanse::{companies_yahoo_cleansed_low_profile, companies_yahoo_cleansed_no_data};
@@ -22,7 +22,7 @@ pub async fn run_full_update(
     pool: &Arc<ChromeDriverPool>,
     shutdown_flag: &Arc<AtomicBool>,
 ) -> anyhow::Result<()> {
-    logger::log_info("=== Corporate Update (STREAMING MODE WITH DATA INTEGRITY) ===").await;
+    logger::log_info("=== Corporate Update ===").await;
 
     let paths = DataPaths::new(".")?;
 
@@ -42,7 +42,7 @@ pub async fn run_full_update(
         logger::log_warn("Shutdown detected after GLEIF download").await;
         return Ok(());
     }
-    
+
     if !shutdown_flag.load(Ordering::SeqCst) {
         logger::log_info("Step 2: Loading OpenFIGI metadata...").await;
         load_figi_type_lists().await.ok();
@@ -144,15 +144,7 @@ pub async fn run_full_update(
     }
 
     if !shutdown_flag.load(Ordering::SeqCst) {
-        logger::log_info("Step 11: Processing events (using index)...").await;
-        let _event_index = build_event_index(&paths).await?;
-        logger::log_info("  ✓ Event index built").await;
-    } else {
-        logger::log_warn("Shutdown detected, skipping event index build").await;
-    }
-
-    if !shutdown_flag.load(Ordering::SeqCst) {
-        logger::log_info("Step 12: Collecting FX rates...").await;
+        logger::log_info("Step 11: Collecting FX rates...").await;
         let proxy_pool = pool.get_proxy_pool()
             .ok_or_else(|| anyhow::anyhow!("ChromeDriverPool must have proxy rotation"))?;
 
@@ -166,14 +158,14 @@ pub async fn run_full_update(
     }
 
     if !shutdown_flag.load(Ordering::SeqCst) {
-        logger::log_info("Step 13: Collecting exchange information...").await;
+        logger::log_info("Step 12: Collecting exchange information...").await;
         let exchange_count = collect_and_save_exchanges(&paths).await?;
         logger::log_info(&format!("  ✓ Collected {} exchanges", exchange_count)).await;
     } else {
         logger::log_warn("Shutdown detected, skipping exchange collection").await;
     }
 
-    logger::log_info("✅ Corporate update complete").await;
+    logger::log_info("=== Corporate update complete ===").await;
 
     Ok(())
 }
diff --git a/src/corporate/update_companies.rs b/src/corporate/update_companies.rs
index 5dd5e38..81c6384 100644
--- a/src/corporate/update_companies.rs
+++ b/src/corporate/update_companies.rs
@@ -1,5 +1,5 @@
 // src/corporate/update_companies.rs
-use super::{types::*, yahoo::*, helpers::*};
+use super::{types::*, yahoo_company_extraction::*, helpers::*};
 use crate::util::directories::DataPaths;
 use crate::util::logger;
 use crate::scraper::webdriver::ChromeDriverPool;
diff --git a/src/corporate/yahoo.rs b/src/corporate/yahoo_company_extraction.rs
similarity index 100%
rename from src/corporate/yahoo.rs
rename to src/corporate/yahoo_company_extraction.rs
diff --git a/src/economic/update.rs b/src/economic/update.rs
index 315dd21..6c09d56 100644
--- a/src/economic/update.rs
+++ b/src/economic/update.rs
@@ -2,11 +2,11 @@
 use super::{scraper::*, storage::*, helpers::*, types::*};
 use crate::{config::Config, scraper::webdriver::{ScrapeTask, ChromeDriverPool}, util::directories::DataPaths, util::logger};
 use chrono::{Local};
-use std::sync::Arc;
+use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
 use std::collections::HashMap;
 
 /// Runs the full update for economic data using streaming to minimize memory usage
-pub async fn run_full_update(config: &Config, pool: &Arc<ChromeDriverPool>) -> anyhow::Result<()> {
+pub async fn run_full_update(config: &Config, pool: &Arc<ChromeDriverPool>, shutdown_flag: &Arc<AtomicBool>) -> anyhow::Result<()> {
     let paths = DataPaths::new(".")?;
 
     logger::log_info("Economic Update: Initializing...").await;
@@ -14,17 +14,23 @@
     let today_str = chrono::Local::now().date_naive().format("%Y-%m-%d").to_string();
     let end_date = config.target_end_date();
 
+    logger::log_info("=== Economic Update ===").await;
+
     // Step 1: Build lightweight index instead of loading all events
-    logger::log_info("Economic Update: Building event index...").await;
+    logger::log_info("Step 1: Building event index...").await;
     let chunks = scan_existing_chunks(&paths).await?;
     let event_index = build_event_index(&chunks).await?;
-
-    logger::log_info(&format!("Economic Update: Indexed {} events from {} chunks",
+    logger::log_info(&format!("  Indexed {} events from {} chunks",
         event_index.len(), chunks.len())).await;
 
+    if shutdown_flag.load(Ordering::SeqCst) {
+        logger::log_warn("Shutdown detected after building event index").await;
+        return Ok(());
+    }
+
     // Step 2: Determine start date
     let start_date = if event_index.is_empty() {
-        logger::log_warn("Economic Update: No existing events found, starting from config date").await;
+        logger::log_warn("Step 2: No existing events found, starting from config date").await;
         config.economic_start_date.clone()
     } else {
         // Find the latest date in the index
@@ -35,7 +41,7 @@ pub async fn run_full_update(config: &Config, pool: &Arc<ChromeDriverPool>) -> a
             .unwrap_or(today_str.clone());
 
         if max_date >= today_str {
-            logger::log_info("Economic Update: Events exist for today, starting from today").await;
+            logger::log_info("  Events exist for today, starting from today").await;
             today_str.clone()
         } else {
             let next = chrono::NaiveDate::parse_from_str(&max_date, "%Y-%m-%d")
@@ -43,34 +49,46 @@ pub async fn run_full_update(config: &Config, pool: &Arc<ChromeDriverPool>) -> a
             .and_then(|d| d.succ_opt())
             .map(|d| d.format("%Y-%m-%d").to_string())
             .unwrap_or(today_str.clone());
-            logger::log_info(&format!("Economic Update: Resuming from: {}", next)).await;
+            logger::log_info(&format!("  Resuming from: {}", next)).await;
             next
         }
     };
 
-    logger::log_info(&format!("Economic Update: Scraping events from {} → {}", start_date, end_date)).await;
+    if shutdown_flag.load(Ordering::SeqCst) {
+        logger::log_warn("Shutdown detected after determining start date").await;
+        return Ok(());
+    }
 
     // Step 3: Scrape new events in batches
+    logger::log_info(&format!("Step 3: Scraping events from {} → {}", start_date, end_date)).await;
     let new_events = scrape_all_economic_events(&start_date, &end_date, pool).await?;
-
-    logger::log_info(&format!("Economic Update: Scraped {} new events", new_events.len())).await;
+    logger::log_info(&format!("  Scraped {} new events", new_events.len())).await;
+
+    if shutdown_flag.load(Ordering::SeqCst) {
+        logger::log_warn("Shutdown detected after scraping events").await;
+        return Ok(());
+    }
 
     // Step 4: Process events in streaming fashion
+    logger::log_info("Step 4: Detecting changes...").await;
     let (changes, updated_events) = process_events_streaming(&chunks, &new_events, &today_str).await?;
-
-    logger::log_info(&format!("Economic Update: Detected {} changes", changes.len())).await;
-
+    logger::log_info(&format!("  Detected {} changes", changes.len())).await;
     if !changes.is_empty() {
-        logger::log_info(&format!("Economic Update: Saving {} changes to log", changes.len())).await;
+        logger::log_info(&format!("  Saving {} changes to log", changes.len())).await;
         save_changes(&paths, &changes).await?;
-        logger::log_info("Economic Update: Changes saved successfully").await;
+        logger::log_info("  Changes saved successfully").await;
+    }
+
+    if shutdown_flag.load(Ordering::SeqCst) {
+        logger::log_warn("Shutdown detected after saving changes").await;
+        return Ok(());
     }
 
     // Step 5: Save consolidated events
-    logger::log_info(&format!("Economic Update: Saving {} total events to chunks", updated_events.len())).await;
+    logger::log_info(&format!("Step 5: Saving {} total events to chunks", updated_events.len())).await;
     save_optimized_chunks(&paths, updated_events).await?;
-
-    logger::log_info(&format!("✓ Economic update complete — {} changes detected", changes.len())).await;
+    logger::log_info(&format!("  ✓ Economic update complete — {} changes detected", changes.len())).await;
+
     Ok(())
 }
diff --git a/src/economic/update_forex.rs b/src/economic/update_forex.rs
index 54d746d..821e1b6 100644
--- a/src/economic/update_forex.rs
+++ b/src/economic/update_forex.rs
@@ -1,4 +1,4 @@
-// src/forex/update_rates.rs
+// src/economic/update_forex.rs
 use crate::config.Config;
 use crate::util::directories::DataPaths;
 use crate::util::logger;
diff --git a/src/main.rs b/src/main.rs
index d97ba12..3966ae9 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -229,9 +229,11 @@ async fn main() -> Result<()> {
     }
 
     // === Step 4: Run scraping jobs ===
-    logger::log_info("--- Starting ECONOMIC data update ---").await;
-    economic::run_full_update(&config, &pool).await?;
-    logger::log_info("Economic update completed").await;
+    if !shutdown_flag.load(Ordering::SeqCst) {
+        logger::log_info("--- Starting ECONOMIC data update ---").await;
+        economic::run_full_update(&config, &pool, &shutdown_flag).await?;
+        logger::log_info("Economic update completed").await;
+    }
 
     if !shutdown_flag.load(Ordering::SeqCst) {
         logger::log_info("--- Starting CORPORATE data update ---").await;