diff --git a/src/corporate/collect_exchanges.rs b/src/corporate/collect_exchanges.rs index d6290b8..7731761 100644 --- a/src/corporate/collect_exchanges.rs +++ b/src/corporate/collect_exchanges.rs @@ -1,5 +1,6 @@ // src/corporate/collect_exchanges.rs use crate::util::directories::DataPaths; +use crate::util::integrity::{DataStage, StateManager, file_reference}; use crate::util::logger; use crate::scraper::yahoo::ChartData; @@ -238,10 +239,28 @@ fn get_fallback_rate(currency: &str) -> f64 { /// - Extracts exchange data from core/data.jsonl /// - Groups companies by exchange /// - Sums up market caps for each exchange -/// - **NEW**: Converts all market caps to USD using FX rates +/// - Converts all market caps to USD using FX rates /// - Saves consolidated mapping to data/yahoo_exchanges.json /// - Handles missing or invalid data gracefully +/// - Integrity tracking with content hash validation pub async fn collect_and_save_exchanges(paths: &DataPaths) -> anyhow::Result { + let state_path = paths.data_dir().join("state.jsonl"); + let manager = StateManager::new(&state_path, &paths.data_dir().to_path_buf()); + let step_name = "exchange_collection_complete"; + + let output_path = paths.data_dir().join("yahoo_exchanges.json"); + + if manager.is_step_valid(step_name).await? { + logger::log_info(" Exchange collection already completed and valid").await; + + // Load and count exchanges + if output_path.exists() { + let content = fs::read_to_string(&output_path).await?; + let exchanges: HashMap = serde_json::from_str(&content)?; + logger::log_info(&format!(" ✓ Found {} valid exchanges", exchanges.len())).await; + return Ok(exchanges.len()); + } + } logger::log_info("Collecting exchange information from company directories...").await; let corporate_dir = paths.corporate_dir(); @@ -353,13 +372,15 @@ pub async fn collect_and_save_exchanges(paths: &DataPaths) -> anyhow::Result anyhow::Result anyhow::Result<()> { + // Create content reference for the output file + let content_reference = file_reference(output_path); + + // Track completion with: + // - Content reference: The yahoo_exchanges.json file + // - Data stage: Data (7-day TTL by default) + // - Dependencies: None (this is a collection step, not dependent on other tracked steps) + // Note: In practice, it depends on core data, but we track the output file + // which will change if core data changes, so explicit dependency not needed + manager.update_entry( + step_name.to_string(), + content_reference, + DataStage::Data, + vec![], // No explicit dependencies - output file serves as verification + None, // Use default TTL (7 days for Data stage) + ).await?; + + Ok(()) +} + /// Extract exchange information from a company's core data file async fn extract_exchange_info( core_data_path: &std::path::Path, diff --git a/src/corporate/update.rs b/src/corporate/update.rs index a339e93..4dcb8f5 100644 --- a/src/corporate/update.rs +++ b/src/corporate/update.rs @@ -7,7 +7,7 @@ use crate::corporate::update_companies_cleanse::{companies_yahoo_cleansed_low_pr use crate::corporate::update_companies_enrich::enrich_companies_with_events; use crate::corporate::update_companies_enrich_option_chart::{enrich_companies_with_option, enrich_companies_with_chart}; use crate::corporate::collect_exchanges::collect_and_save_exchanges; -use crate::economic::update_forex::collect_fx_rates; +use crate::economic::yahoo_update_forex::collect_fx_rates; use crate::util::directories::DataPaths; use crate::util::logger; use crate::scraper::webdriver::ChromeDriverPool; diff --git a/src/economic/mod.rs b/src/economic/mod.rs index 13a4bcb..f561d29 100644 --- a/src/economic/mod.rs +++ b/src/economic/mod.rs @@ -5,6 +5,6 @@ pub mod storage; pub mod helpers; pub mod update; -pub mod update_forex; +pub mod yahoo_update_forex; pub use update::run_full_update; \ No newline at end of file diff --git a/src/economic/update_forex.rs b/src/economic/yahoo_update_forex.rs similarity index 89% rename from src/economic/update_forex.rs rename to src/economic/yahoo_update_forex.rs index 821e1b6..862f638 100644 --- a/src/economic/update_forex.rs +++ b/src/economic/yahoo_update_forex.rs @@ -1,6 +1,7 @@ // src/forex/update_forex.rs use crate::config::Config; use crate::util::directories::DataPaths; +use crate::util::integrity::{DataStage, StateManager, directory_reference}; use crate::util::logger; use crate::scraper::yahoo::{YahooClientPool, ChartData}; @@ -89,32 +90,33 @@ pub async fn collect_fx_rates( let data_path = paths.data_dir(); // File paths - let checkpoint_path = data_path.join("fx_rates_collected.jsonl"); + let output_path = data_path.join("economic").join("currency"); let log_path = data_path.join("fx_rates_updates.log"); let state_path = data_path.join("state.jsonl"); - // Check if already completed (check state file) - if state_path.exists() { - let state_content = tokio::fs::read_to_string(&state_path).await?; - - for line in state_content.lines() { - if line.trim().is_empty() { - continue; - } - - if let Ok(state) = serde_json::from_str::(line) { - if state.get("fx_rates_collection_complete").and_then(|v| v.as_bool()).unwrap_or(false) { - logger::log_info(" FX rates collection already completed").await; - - // Count collected currencies - let count = count_collected_currencies(paths).await?; - logger::log_info(&format!(" ✓ Found {} currencies with chart data", count)).await; - return Ok(count); - } - } - } + let manager = StateManager::new(&state_path, &data_path.to_path_buf()); + let step_name = "yahoo_fx_rate_collection_completed"; + let content_reference = directory_reference(&output_path, + Some(vec![ + "*/chart/*.jsonl".to_string(), // Main pattern for events data + "*/chart/data.jsonl".to_string(), // Specific pattern (more precise) + ]), + Some(vec![ + "*.log".to_string(), // Exclude log files + "*.tmp".to_string(), // Exclude temp files + "*.bak".to_string(), // Exclude backup files + ]), + ); + + if manager.is_step_valid(step_name).await? { + logger::log_info(" FX rates collection already completed").await; + let count = count_collected_currencies(paths).await?; + logger::log_info(&format!(" ✓ Found {} currencies with chart data", count)).await; + return Ok(count); } + logger::log_info(" Updating missing forex data...").await; + // === RECOVERY PHASE: Track collected currencies === let mut collected_currencies: HashSet = HashSet::new(); @@ -163,7 +165,13 @@ pub async fn collect_fx_rates( if pending_count == 0 { logger::log_info(" ✓ All currencies already collected").await; - mark_collection_complete(&state_path).await?; + manager.update_entry( + step_name.to_string(), + content_reference, + DataStage::Data, + vec!["yahoo_companies_cleansed".to_string()], // Dependency + None, // Use default TTL (7 days for Data stage) + ).await?; return Ok(collected_currencies.len()); } @@ -309,9 +317,14 @@ pub async fn collect_fx_rates( // Mark as complete if not shutdown if !shutdown_flag.load(Ordering::SeqCst) { - mark_collection_complete(&state_path).await?; - } - + manager.update_entry( + step_name.to_string(), + content_reference, + DataStage::Data, + vec!["yahoo_companies_cleansed".to_string()], // Dependency + None, // Use default TTL (7 days for Data stage) + ).await?; + } Ok(final_success) } @@ -463,28 +476,6 @@ async fn count_collected_currencies(paths: &DataPaths) -> anyhow::Result Ok(count) } -/// Mark collection as complete in state file -async fn mark_collection_complete(state_path: &std::path::Path) -> anyhow::Result<()> { - let collection_complete = json!({ - "fx_rates_collection_complete": true, - "completed_at": Utc::now().to_rfc3339(), - }); - - let mut state_file = OpenOptions::new() - .create(true) - .append(true) - .open(state_path) - .await?; - - let state_line = serde_json::to_string(&collection_complete)?; - state_file.write_all(state_line.as_bytes()).await?; - state_file.write_all(b"\n").await?; - state_file.flush().await?; - state_file.sync_all().await?; - - Ok(()) -} - /// Log command enum enum LogCommand { Write(serde_json::Value),