From 766eb803f17badcf887a50398d19e77c80ce3b9d Mon Sep 17 00:00:00 2001 From: donpat1to Date: Sat, 10 Jan 2026 17:40:16 +0100 Subject: [PATCH] added integrity check to enrichment functions --- Cargo.lock | 9 +- Cargo.toml | 8 +- src/corporate/mod.rs | 2 +- src/corporate/update.rs | 4 +- src/corporate/update_companies_enrich.rs | 84 +- ...> update_companies_enrich_option_chart.rs} | 156 ++-- src/scraper/yahoo.rs | 52 +- src/util/integrity.rs | 729 ++++++++++++++++++ src/util/mod.rs | 3 +- 9 files changed, 942 insertions(+), 105 deletions(-) rename src/corporate/{update_companies_enrich_options_chart.rs => update_companies_enrich_option_chart.rs} (82%) create mode 100644 src/util/integrity.rs diff --git a/Cargo.lock b/Cargo.lock index 12e15ec..02d1bc1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2465,9 +2465,9 @@ checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" [[package]] name = "rustix" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e" +checksum = "146c9e247ccc180c1f61615433868c99f3de3ae256a30a43b49f67c2d9171f34" dependencies = [ "bitflags", "errno", @@ -2974,9 +2974,9 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" [[package]] name = "tempfile" -version = "3.23.0" +version = "3.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d31c77bdf42a745371d260a26ca7163f1e0924b64afa0b688e61b5a9fa02f16" +checksum = "655da9c7eb6305c55742045d5a8d2037996d61d8de95806335c7c86ce0f82e9c" dependencies = [ "fastrand", "getrandom 0.3.4", @@ -3632,6 +3632,7 @@ dependencies = [ "scraper", "serde", "serde_json", + "sha2", "tokio", "tokio-tungstenite 0.21.0", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 8ef5d50..eeb1559 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,4 +58,10 @@ rayon = "1.10" # optional: for parallel price downloads # Web server for dashboard axum = { version = "0.7", features = ["ws"] } -tokio-tungstenite = "0.21" # For WebSocket support \ No newline at end of file +tokio-tungstenite = "0.21" # For WebSocket support + +# tests +#tempfile = "3.24.0" + +# data integrity +sha2 = "0.10.9" \ No newline at end of file diff --git a/src/corporate/mod.rs b/src/corporate/mod.rs index 35736c5..212bcc5 100644 --- a/src/corporate/mod.rs +++ b/src/corporate/mod.rs @@ -14,7 +14,7 @@ pub mod update; pub mod update_companies; pub mod update_companies_cleanse; pub mod update_companies_enrich; -pub mod update_companies_enrich_options_chart; +pub mod update_companies_enrich_option_chart; pub mod collect_exchanges; diff --git a/src/corporate/update.rs b/src/corporate/update.rs index e4f7b51..a339e93 100644 --- a/src/corporate/update.rs +++ b/src/corporate/update.rs @@ -5,7 +5,7 @@ use crate::check_shutdown; use crate::corporate::update_companies::build_companies_jsonl_streaming_parallel; use crate::corporate::update_companies_cleanse::{companies_yahoo_cleansed_low_profile, companies_yahoo_cleansed_no_data}; use crate::corporate::update_companies_enrich::enrich_companies_with_events; -use crate::corporate::update_companies_enrich_options_chart::{enrich_companies_with_options, enrich_companies_with_chart}; +use crate::corporate::update_companies_enrich_option_chart::{enrich_companies_with_option, enrich_companies_with_chart}; use crate::corporate::collect_exchanges::collect_and_save_exchanges; use crate::economic::update_forex::collect_fx_rates; use crate::util::directories::DataPaths; 
@@ -107,7 +107,7 @@ pub async fn run_full_update( check_shutdown!(shutdown_flag); logger::log_info("Step 9: Enriching companies with Yahoo Options (with abort-safe persistence)...").await; - let options_count = enrich_companies_with_options(&paths, config, yahoo_pool.clone(), shutdown_flag).await?; + let options_count = enrich_companies_with_option(&paths, config, yahoo_pool.clone(), shutdown_flag).await?; logger::log_info(&format!(" ✓ {} companies enriched with options data", options_count)).await; check_shutdown!(shutdown_flag); diff --git a/src/corporate/update_companies_enrich.rs b/src/corporate/update_companies_enrich.rs index 8d5783b..98e8778 100644 --- a/src/corporate/update_companies_enrich.rs +++ b/src/corporate/update_companies_enrich.rs @@ -1,8 +1,9 @@ -// src/corporate/update_companies_enrich_events.rs +// src/corporate/update_companies_enrich_events.rs - WITH INTEGRITY MODULE use super::{types::*, helpers::*}; use crate::config::Config; use crate::corporate::checkpoint_helpers; use crate::util::directories::DataPaths; +use crate::util::integrity::{StateManager, directory_reference, DataStage}; use crate::util::logger; use crate::scraper::yahoo::{YahooClientPool, QuoteSummaryModule}; @@ -25,7 +26,7 @@ use tokio::sync::mpsc; /// - Crash-safe persistence (checkpoint + log with fsync) /// - Smart skip logic (only process incomplete data) /// - Uses pending queue instead of retry mechanism -/// - Reuses companies_update.log for persistence +/// - Content integrity validation with hash tracking /// /// # Persistence Strategy /// - Checkpoint: companies_yahoo_cleaned.jsonl (atomic state) @@ -33,6 +34,7 @@ use tokio::sync::mpsc; /// - On restart: Load checkpoint + replay log /// - Periodic checkpoints (every 50 companies) /// - Batched fsync (every 10 writes or 10 seconds) +/// - Hash validation of all event data directories pub async fn enrich_companies_with_events( paths: &DataPaths, _config: &Config, @@ -43,7 +45,7 @@ pub async fn enrich_companies_with_events( const CHECKPOINT_INTERVAL: usize = 50; const FSYNC_BATCH_SIZE: usize = 10; const FSYNC_INTERVAL_SECS: u64 = 10; - const CONCURRENCY_LIMIT: usize = 50; // Limit parallel enrichment tasks + const CONCURRENCY_LIMIT: usize = 50; let data_path = paths.data_dir(); @@ -57,29 +59,21 @@ pub async fn enrich_companies_with_events( logger::log_warn(" companies_yahoo_cleaned.jsonl not found, skipping event enrichment").await; return Ok(0); } - - // Check if already completed - if state_path.exists() { - let state_content = tokio::fs::read_to_string(&state_path).await?; + + let manager = StateManager::new(&state_path, &data_path.to_path_buf()); + let step_name = "yahoo_events_enrichment_complete"; + + if manager.is_step_valid(step_name).await? 
{ + logger::log_info(" Yahoo events enrichment already completed and valid").await; - for line in state_content.lines() { - if line.trim().is_empty() { - continue; - } - - if let Ok(state) = serde_json::from_str::<serde_json::Value>(line) { - if state.get("yahoo_events_enrichment_complete").and_then(|v| v.as_bool()).unwrap_or(false) { - logger::log_info(" Yahoo events enrichment already completed").await; - - // Count enriched companies - let count = checkpoint_helpers::count_enriched_companies(paths, "events").await?; - logger::log_info(&format!(" ✓ Found {} companies with event data", count)).await; - return Ok(count); - } - } - } + + // Count enriched companies + let count = checkpoint_helpers::count_enriched_companies(paths, "events").await?; + logger::log_info(&format!(" ✓ Found {} companies with valid event data", count)).await; + return Ok(count); } + logger::log_info(" Event data needs refresh - starting enrichment").await; + // === RECOVERY PHASE: Track enriched companies === let enriched_companies: HashSet<String> = checkpoint_helpers::load_enrichment_progress(&log_path).await?; @@ -104,7 +98,9 @@ pub async fn enrich_companies_with_events( if pending_count == 0 { logger::log_info(" ✓ All companies already enriched").await; - checkpoint_helpers::mark_step_complete(&state_path, "yahoo_events_enrichment_complete").await?; + + track_events_completion(&manager, paths, step_name).await?; + return Ok(enriched_companies.len()); } @@ -263,13 +259,49 @@ pub async fn enrich_companies_with_events( // Mark as complete if all companies processed if final_processed >= total_companies && !shutdown_flag.load(Ordering::SeqCst) { - checkpoint_helpers::mark_step_complete(&state_path, "yahoo_events_enrichment_complete").await?; - logger::log_info(" ✓ Event enrichment marked as complete").await; + track_events_completion(&manager, paths, step_name).await?; + logger::log_info(" ✓ Event enrichment marked as complete with integrity tracking").await; } Ok(final_success) } +/// Track event enrichment completion with content hash verification +async fn track_events_completion( + manager: &StateManager, + paths: &DataPaths, + step_name: &str, +) -> anyhow::Result<()> { + // Create content reference for all event data + // This will hash ALL files matching the pattern: {company}/events/data.jsonl + let content_reference = directory_reference( + paths.corporate_dir(), + Some(vec![ + "*/events/*.jsonl".to_string(), // Main pattern for events data + "*/events/data.jsonl".to_string(), // Specific pattern (more precise) + ]), + Some(vec![ + "*.log".to_string(), // Exclude log files + "*.tmp".to_string(), // Exclude temp files + "*.bak".to_string(), // Exclude backup files + ]), + ); + + // Track completion with: + // - Content reference: All event directories + // - Data stage: Data (7-day TTL by default) + // - Dependencies: Depends on cleaned companies data + manager.update_entry( + step_name.to_string(), + content_reference, + DataStage::Data, + vec!["yahoo_companies_cleansed".to_string()], // Dependency + None, // Use default TTL (7 days for Data stage) + ).await?; + + Ok(()) +} + /// Spawn a single enrichment task with panic isolation fn spawn_enrichment_task( company: CompanyCrossPlatformInfo, diff --git a/src/corporate/update_companies_enrich_options_chart.rs b/src/corporate/update_companies_enrich_option_chart.rs similarity index 82% rename from src/corporate/update_companies_enrich_options_chart.rs rename to src/corporate/update_companies_enrich_option_chart.rs index 29f3e00..ed20abf 100644 ---
a/src/corporate/update_companies_enrich_options_chart.rs +++ b/src/corporate/update_companies_enrich_option_chart.rs @@ -1,8 +1,9 @@ -// src/corporate/update_companies_enrich_options_chart.rs +// src/corporate/update_companies_enrich_option_chart.rs use super::{types::*, helpers::*}; use crate::config::Config; use crate::corporate::checkpoint_helpers; use crate::util::directories::DataPaths; +use crate::util::integrity::{DataStage, StateManager, directory_reference}; use crate::util::logger; use crate::scraper::yahoo::{YahooClientPool}; @@ -17,7 +18,7 @@ use futures::stream::{FuturesUnordered, StreamExt}; use serde_json::json; use tokio::sync::mpsc; -/// Yahoo Options enrichment per corporate company +/// Yahoo Option enrichment per corporate company /// /// # Features /// - Graceful shutdown (abort-safe) @@ -25,14 +26,16 @@ use tokio::sync::mpsc; /// - Crash-safe persistence (checkpoint + log with fsync) /// - Smart skip logic (only process incomplete data) /// - Uses pending queue instead of retry mechanism +/// - Content integrity validation with hash tracking /// /// # Persistence Strategy /// - Checkpoint: companies_yahoo_cleaned.jsonl (atomic state) -/// - Log: companies_options_updates.log (append-only updates) +/// - Log: companies_option_updates.log (append-only updates) /// - On restart: Load checkpoint + replay log /// - Periodic checkpoints (every 50 companies) /// - Batched fsync (every 10 writes or 10 seconds) -pub async fn enrich_companies_with_options( +/// - Hash validation of all option data directories +pub async fn enrich_companies_with_option( paths: &DataPaths, _config: &Config, yahoo_pool: Arc<YahooClientPool>, @@ -48,37 +51,29 @@ // File paths let input_path = data_path.join("companies_yahoo_cleaned.jsonl"); - let log_path = data_path.join("companies_options_updates.log"); + let log_path = data_path.join("companies_option_updates.log"); let state_path = data_path.join("state.jsonl"); // Check input exists if !input_path.exists() { - logger::log_warn(" companies_yahoo_cleaned.jsonl not found, skipping options enrichment").await; + logger::log_warn(" companies_yahoo_cleaned.jsonl not found, skipping option enrichment").await; return Ok(0); } - // Check if already completed - if state_path.exists() { - let state_content = tokio::fs::read_to_string(&state_path).await?; + let manager = StateManager::new(&state_path, &data_path.to_path_buf()); + let step_name = "yahoo_option_enrichment_complete"; + + if manager.is_step_valid(step_name).await?
{ + logger::log_info(" Yahoo option enrichment already completed and valid").await; - for line in state_content.lines() { - if line.trim().is_empty() { - continue; - } - - if let Ok(state) = serde_json::from_str::<serde_json::Value>(line) { - if state.get("yahoo_options_enrichment_complete").and_then(|v| v.as_bool()).unwrap_or(false) { - logger::log_info(" Yahoo options enrichment already completed").await; - - // Count enriched companies - let count = checkpoint_helpers::count_enriched_companies(paths, "options").await?; - logger::log_info(&format!(" ✓ Found {} companies with options data", count)).await; - return Ok(count); - } - } - } + + // Count enriched companies + let count = checkpoint_helpers::count_enriched_companies(paths, "option").await?; + logger::log_info(&format!(" ✓ Found {} companies with valid option data", count)).await; + return Ok(count); } + logger::log_info(" Option data needs refresh - starting enrichment").await; + // === RECOVERY PHASE: Track enriched companies === let enriched_companies = checkpoint_helpers::load_enrichment_progress(&log_path).await?; @@ -102,12 +97,12 @@ pub async fn enrich_companies_with_options( )).await; if pending_count == 0 { - logger::log_info(" ✓ All companies already enriched with options data").await; - checkpoint_helpers::mark_step_complete(&state_path, "yahoo_options_enrichment_complete").await?; + logger::log_info(" ✓ All companies already enriched").await; + track_option_completion(&manager, paths, step_name).await?; return Ok(enriched_companies.len()); } - // === PROCESSING PHASE: Enrich companies with options === + // === PROCESSING PHASE: Enrich companies with option === // Shared counters let processed_count = Arc::new(AtomicUsize::new(enriched_companies.len())); @@ -195,7 +190,7 @@ pub async fn enrich_companies_with_options( log_tx.clone(), Arc::clone(&semaphore), Arc::clone(shutdown_flag), - EnrichmentType::Options, + EnrichmentType::Option, ); tasks.push(task); } @@ -206,7 +201,7 @@ pub async fn enrich_companies_with_options( while let Some(_result) = tasks.next().await { // Check for shutdown if shutdown_flag.load(Ordering::SeqCst) { - logger::log_warn("Shutdown signal received, stopping options enrichment").await; + logger::log_warn("Shutdown signal received, stopping option enrichment").await; break; } @@ -228,7 +223,7 @@ pub async fn enrich_companies_with_options( log_tx.clone(), Arc::clone(&semaphore), Arc::clone(shutdown_flag), - EnrichmentType::Options, + EnrichmentType::Option, ); tasks.push(task); } @@ -245,13 +240,14 @@ pub async fn enrich_companies_with_options( let final_failed = failed_count.load(Ordering::SeqCst); logger::log_info(&format!( - " Options enrichment: {} succeeded, {} failed", + " Option enrichment: {} succeeded, {} failed", final_success, final_failed )).await; // Mark as complete if no shutdown if !shutdown_flag.load(Ordering::SeqCst) { - checkpoint_helpers::mark_step_complete(&state_path, "yahoo_options_enrichment_complete").await?; + track_option_completion(&manager, paths, step_name).await?; + logger::log_info(" ✓ Option enrichment marked as complete with integrity tracking").await; } Ok(final_success) @@ -525,7 +521,7 @@ pub async fn enrich_companies_with_chart( /// Type of enrichment being performed #[derive(Clone, Copy)] enum EnrichmentType { - Options, + Option, Chart, } @@ -553,8 +549,8 @@ fn spawn_enrichment_task( // Perform enrichment (panic-isolated) let result = match enrichment_type { - EnrichmentType::Options => { - enrich_company_with_options(&company, &yahoo_pool, &paths).await +
EnrichmentType::Option => { + enrich_company_with_option(&company, &yahoo_pool, &paths).await } EnrichmentType::Chart => { enrich_company_with_chart(&company, &yahoo_pool, &paths).await @@ -590,8 +586,8 @@ fn spawn_enrichment_task( }) } -/// Enrich a single company with options data -async fn enrich_company_with_options( +/// Enrich a single company with option data +async fn enrich_company_with_option( company: &CompanyCrossPlatformInfo, yahoo_pool: &Arc<YahooClientPool>, paths: &DataPaths, @@ -603,16 +599,16 @@ async fn enrich_company_with_options( } }; - // Get options data for all available expiration dates - let options_data = yahoo_pool.get_options_data(&ticker, None).await?; + // Get option data for all available expiration dates + let option_data = yahoo_pool.get_option_data(&ticker, None).await?; // Only save if we got meaningful data - if options_data.options.is_empty() { - return Err(anyhow::anyhow!("No options data available")); + if option_data.option.is_empty() { + return Err(anyhow::anyhow!("No option data available")); } - // Save the options data - save_company_data(paths, &company.name, &options_data, "options").await?; + // Save the option data + save_company_data(paths, &company.name, &option_data, "option").await?; Ok(()) } @@ -681,4 +677,76 @@ enum LogCommand { Write(serde_json::Value), Checkpoint, Shutdown, +} + +/// Track option enrichment completion with content hash verification +async fn track_option_completion( + manager: &StateManager, + paths: &DataPaths, + step_name: &str, +) -> anyhow::Result<()> { + // Create content reference for all option data + // This will hash ALL files matching the pattern: {company}/option/data.jsonl + let content_reference = directory_reference( + paths.corporate_dir(), + Some(vec![ + "*/option/*.jsonl".to_string(), // Main pattern for option data + "*/option/data.jsonl".to_string(), // Specific pattern (more precise) + ]), + Some(vec![ + "*.log".to_string(), // Exclude log files + "*.tmp".to_string(), // Exclude temp files + "*.bak".to_string(), // Exclude backup files + ]), + ); + + // Track completion with: + // - Content reference: All option directories + // - Data stage: Data (7-day TTL by default) + // - Dependencies: Depends on cleaned companies data + manager.update_entry( + step_name.to_string(), + content_reference, + DataStage::Data, + vec!["yahoo_companies_cleansed".to_string()], // Dependency + None, // Use default TTL (7 days for Data stage) + ).await?; + + Ok(()) +} + +/// Track chart enrichment completion with content hash verification +async fn track_chart_completion( + manager: &StateManager, + paths: &DataPaths, + step_name: &str, +) -> anyhow::Result<()> { + // Create content reference for all chart data + // This will hash ALL files matching the pattern: {company}/chart/data.jsonl + let content_reference = directory_reference( + paths.corporate_dir(), + Some(vec![ + "*/chart/*.jsonl".to_string(), // Main pattern for chart data + "*/chart/data.jsonl".to_string(), // Specific pattern (more precise) + ]), + Some(vec![ + "*.log".to_string(), // Exclude log files + "*.tmp".to_string(), // Exclude temp files + "*.bak".to_string(), // Exclude backup files + ]), + ); + + // Track completion with: + // - Content reference: All chart directories + // - Data stage: Data (7-day TTL by default) + // - Dependencies: Depends on cleaned companies data + manager.update_entry( + step_name.to_string(), + content_reference, + DataStage::Data, + vec!["yahoo_companies_cleansed".to_string()], // Dependency + None, // Use default TTL (7 days for Data
stage) + ).await?; + + Ok(()) } \ No newline at end of file diff --git a/src/scraper/yahoo.rs b/src/scraper/yahoo.rs index 4a95465..5e77a6d 100644 --- a/src/scraper/yahoo.rs +++ b/src/scraper/yahoo.rs @@ -18,7 +18,7 @@ use std::result::Result::Ok; // Yahoo API Endpoints const YAHOO_QUOTE_SUMMARY: &str = "https://query2.finance.yahoo.com/v10/finance/quoteSummary/"; const YAHOO_CHART_DATA: &str = "https://query2.finance.yahoo.com/v8/finance/chart/"; -const YAHOO_OPTIONS_DATA: &str = "https://query1.finance.yahoo.com/v7/finance/options/"; +const YAHOO_OPTION_DATA: &str = "https://query1.finance.yahoo.com/v7/finance/options/"; const YAHOO_SEARCH: &str = "https://query2.finance.yahoo.com/v1/finance/search"; const _YAHOO_HOMEPAGE: &str = "https://finance.yahoo.com"; const YAHOO_CRUMB_URL: &str = "https://query2.finance.yahoo.com/v1/test/getcrumb"; @@ -210,11 +210,11 @@ pub struct Quote { } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct OptionsData { +pub struct OptionData { pub symbol: String, pub expiration_dates: Vec<i64>, pub strikes: Vec<f64>, - pub options: Vec<OptionChain>, + pub option: Vec<OptionChain>, pub timestamp: i64, } @@ -873,41 +873,41 @@ impl YahooClient { }) } - /// Get options data for a symbol - pub async fn get_options_data( + /// Get option data for a symbol + pub async fn get_option_data( &self, symbol: &str, date: Option<i64>, - ) -> Result<OptionsData> { + ) -> Result<OptionData> { self.increment_request_count().await; // Try request with current crumb - let result = self.get_options_data_internal(symbol, date).await; + let result = self.get_option_data_internal(symbol, date).await; // If unauthorized, refresh crumb and retry once if let Err(ref e) = result { let error_msg = e.to_string(); if error_msg.contains("401") || error_msg.contains("Unauthorized") { self.handle_unauthorized_error(&error_msg).await?; - return self.get_options_data_internal(symbol, date).await; + return self.get_option_data_internal(symbol, date).await; } } result } - /// Internal method to fetch options data with crumb - async fn get_options_data_internal( + /// Internal method to fetch option data with crumb + async fn get_option_data_internal( &self, symbol: &str, date: Option<i64>, - ) -> Result<OptionsData> { + ) -> Result<OptionData> { let crumb = self.get_crumb().await?; let url = if let Some(d) = date { format!( "{}{}?date={}&crumb={}", - YAHOO_OPTIONS_DATA, + YAHOO_OPTION_DATA, symbol, d, urlencoding::encode(&crumb) @@ -915,7 +915,7 @@ impl YahooClient { } else { format!( "{}{}?crumb={}", - YAHOO_OPTIONS_DATA, + YAHOO_OPTION_DATA, symbol, urlencoding::encode(&crumb) ) }; @@ -935,27 +935,27 @@ impl YahooClient { let json: Value = response.json().await?; - // Parse options data + // Parse option data let result = &json["optionChain"]["result"]; if result.is_null() || !result.is_array() || result.as_array().unwrap().is_empty() { - return Err(anyhow!("No options data for symbol: {}", symbol)); + return Err(anyhow!("No option data for symbol: {}", symbol)); } - let options = &result[0]; - let expiration_dates = options["expirationDates"] + let option = &result[0]; + let expiration_dates = option["expirationDates"] .as_array() .map(|arr| arr.iter().filter_map(|v| v.as_i64()).collect()) .unwrap_or_default(); - let strikes = options["strikes"] + let strikes = option["strikes"] .as_array() .map(|arr| arr.iter().filter_map(|v| v.as_f64()).collect()) .unwrap_or_default(); let mut option_chains = Vec::new(); - if let Some(options_array) = options["options"].as_array() { - for opt in options_array { + if let Some(option_array) = option["options"].as_array() { + for opt in option_array {
let exp_date = opt["expirationDate"].as_i64().unwrap_or(0); let calls = Self::parse_option_contracts(&opt["calls"]); @@ -969,11 +969,11 @@ impl YahooClient { } } - Ok(OptionsData { + Ok(OptionData { symbol: symbol.to_string(), expiration_dates, strikes, - options: option_chains, + option: option_chains, timestamp: chrono::Utc::now().timestamp(), }) } @@ -1471,16 +1471,16 @@ impl YahooClientPool { }).await } - /// Get options data for a symbol - pub async fn get_options_data( + /// Get option data for a symbol + pub async fn get_option_data( &self, symbol: &str, date: Option<i64>, - ) -> Result<OptionsData> { + ) -> Result<OptionData> { let symbol = symbol.to_string(); self.execute(move |client| async move { - client.get_options_data(&symbol, date).await + client.get_option_data(&symbol, date).await }).await } diff --git a/src/util/integrity.rs b/src/util/integrity.rs new file mode 100644 index 0000000..b1441b8 --- /dev/null +++ b/src/util/integrity.rs @@ -0,0 +1,729 @@ +// src/util/integrity.rs +//! Content integrity and state lifecycle management module +//! +//! Features: +//! - File and directory hashing (SHA-256) +//! - Hash validation against content references +//! - State invalidation based on time or validation failures +//! - 3-stage data lifecycle: cache → data → storage +//! - Inline vs. external hash storage based on size +//! - Cascade invalidation when dependencies fail validation + +use anyhow::{Context, Result}; +use chrono::{DateTime, Duration, Utc}; +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; +use std::collections::{HashMap, HashSet}; +use std::fs; +use std::io::{BufReader, Read}; +use std::path::{Path, PathBuf}; +use tokio::fs as async_fs; +use tokio::io::AsyncWriteExt; + +// ============================================================================ +// CONSTANTS & CONFIGURATION +// ============================================================================ + +/// Maximum hash size (in bytes) to store inline in state.jsonl +/// Hashes larger than this will be stored in separate files +const INLINE_HASH_THRESHOLD: usize = 1024; + +/// Directory for storing external hash files +const HASH_STORAGE_DIR: &str = ".integrity_hashes"; + +/// File extension for external hash files +const HASH_FILE_EXT: &str = ".hash"; + +// ============================================================================ +// DATA STRUCTURES +// ============================================================================ + +/// Represents a content reference that can be hashed +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(tag = "type", rename_all = "lowercase")] +pub enum ContentReference { + /// Single file reference + File { path: PathBuf }, + + /// Directory reference (includes all files recursively) + Directory { + path: PathBuf, + /// Optional: specific files/patterns to include + include_patterns: Option<Vec<String>>, + /// Optional: files/patterns to exclude + exclude_patterns: Option<Vec<String>>, + }, + + /// Multiple files/directories combined + Composite { + references: Vec<ContentReference>, + }, +} + +/// Storage location for hash data +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(tag = "storage", rename_all = "lowercase")] +pub enum HashStorage { + /// Hash stored directly in state.jsonl + Inline { hash: String }, + + /// Hash stored in external file + External { hash_file: PathBuf }, +} + +/// Data lifecycle stage +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)] +#[serde(rename_all = "lowercase")] +pub enum DataStage { + /// Temporary/staged data
(fast-changing, short-lived) + Cache, + + /// Processed data (intermediate results, medium-lived) + Data, + + /// Final storage (long-term, stable data) + Storage, +} + +impl DataStage { + /// Get default TTL (time-to-live) for this stage + pub fn default_ttl(&self) -> Duration { + match self { + DataStage::Cache => Duration::hours(24), // 1 day + DataStage::Data => Duration::days(7), // 1 week + DataStage::Storage => Duration::days(365), // 1 year + } + } + + /// Get suggested revalidation interval for this stage + pub fn revalidation_interval(&self) -> Duration { + match self { + DataStage::Cache => Duration::hours(6), // Every 6 hours + DataStage::Data => Duration::days(1), // Daily + DataStage::Storage => Duration::days(30), // Monthly + } + } +} + +/// Enhanced state entry with content integrity tracking +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StateEntry { + /// Step/function name + pub step_name: String, + + /// Whether this step is completed + pub completed: bool, + + /// Completion timestamp + pub completed_at: Option<DateTime<Utc>>, + + /// Content reference for validation + pub content_reference: Option<ContentReference>, + + /// Hash of the content + pub content_hash: Option<HashStorage>, + + /// Data lifecycle stage + pub data_stage: Option<DataStage>, + + /// Custom TTL override (if None, uses stage default) + pub ttl_override: Option<Duration>, + + /// Last validation timestamp + pub last_validated_at: Option<DateTime<Utc>>, + + /// Validation status + pub validation_status: ValidationStatus, + + /// Dependencies (other steps that must be valid for this to remain valid) + pub dependencies: Vec<String>, +} + +/// Validation status of a state entry +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "lowercase")] +pub enum ValidationStatus { + /// Not yet validated + Unknown, + + /// Validated and content matches hash + Valid, + + /// Validation failed (hash mismatch or content missing) + Invalid { reason: String }, + + /// Expired (beyond TTL) + Expired, + + /// Invalidated due to dependency failure + DependencyFailed { failed_dependency: String }, +} + +// ============================================================================ +// HASH COMPUTATION +// ============================================================================ + +/// Hash a single file using SHA-256 +pub fn hash_file<P: AsRef<Path>>(path: P) -> Result<String> { + let path = path.as_ref(); + let file = fs::File::open(path) + .with_context(|| format!("Failed to open file: {}", path.display()))?; + + let mut reader = BufReader::new(file); + let mut hasher = Sha256::new(); + let mut buffer = [0u8; 8192]; + + loop { + let bytes_read = reader.read(&mut buffer)?; + if bytes_read == 0 { + break; + } + hasher.update(&buffer[..bytes_read]); + } + + Ok(format!("{:x}", hasher.finalize())) +} + +/// Hash a directory recursively +/// Returns a combined hash of all files in sorted order +pub fn hash_directory<P: AsRef<Path>>( + path: P, + include_patterns: Option<&[String]>, + exclude_patterns: Option<&[String]>, +) -> Result<String> { + let path = path.as_ref(); + + if !path.is_dir() { + anyhow::bail!("Path is not a directory: {}", path.display()); + } + + // Collect all files recursively + let mut files = Vec::new(); + collect_files_recursive(path, &mut files, include_patterns, exclude_patterns)?; + + // Sort for deterministic hashing + files.sort(); + + if files.is_empty() { + return Ok(format!("{:x}", Sha256::new().finalize())); // SHA-256 of empty input, consistent with the rest of the module + } + + // Hash all files and combine + let mut combined_hasher = Sha256::new(); + + for file_path in files { + // Include relative path in
hash for structure awareness + let rel_path = file_path.strip_prefix(path) + .unwrap_or(&file_path) + .to_string_lossy(); + combined_hasher.update(rel_path.as_bytes()); + + // Hash file content + let file_hash = hash_file(&file_path)?; + combined_hasher.update(file_hash.as_bytes()); + } + + Ok(format!("{:x}", combined_hasher.finalize())) +} + +/// Collect files recursively with pattern filtering +fn collect_files_recursive( + dir: &Path, + files: &mut Vec<PathBuf>, + include_patterns: Option<&[String]>, + exclude_patterns: Option<&[String]>, +) -> Result<()> { + if !dir.is_dir() { + return Ok(()); + } + + for entry in fs::read_dir(dir)? { + let entry = entry?; + let path = entry.path(); + + // Skip hidden files and directories + if let Some(name) = path.file_name() { + if name.to_string_lossy().starts_with('.') { + continue; + } + } + + if path.is_dir() { + collect_files_recursive(&path, files, include_patterns, exclude_patterns)?; + } else if path.is_file() { + // Apply pattern filters + if should_include_file(&path, include_patterns, exclude_patterns) { + files.push(path); + } + } + } + + Ok(()) +} + +/// Check if a file should be included based on patterns +fn should_include_file( + path: &Path, + include_patterns: Option<&[String]>, + exclude_patterns: Option<&[String]>, +) -> bool { + let path_str = path.to_string_lossy(); + + // Check exclude patterns first + if let Some(excludes) = exclude_patterns { + for pattern in excludes { + if path_str.contains(pattern) || matches_glob(path, pattern) { + return false; + } + } + } + + // Check include patterns + if let Some(includes) = include_patterns { + for pattern in includes { + if path_str.contains(pattern) || matches_glob(path, pattern) { + return true; + } + } + return false; // If includes specified but no match + } + + true // Include by default +} + +/// Simple glob pattern matching (supports * and ?)
+fn matches_glob(path: &Path, pattern: &str) -> bool { + let path_str = path.to_string_lossy(); + + // Convert glob to regex + let regex_pattern = pattern + .replace(".", "\\.") + .replace("*", ".*") + .replace("?", "."); + + if let Ok(re) = regex::Regex::new(&format!("^{}$", regex_pattern)) { + re.is_match(&path_str) + } else { + false + } +} + +/// Hash a content reference +pub fn hash_content_reference(reference: &ContentReference) -> Result<String> { + match reference { + ContentReference::File { path } => { + hash_file(path) + } + ContentReference::Directory { path, include_patterns, exclude_patterns } => { + hash_directory( + path, + include_patterns.as_deref(), + exclude_patterns.as_deref(), + ) + } + ContentReference::Composite { references } => { + let mut combined_hasher = Sha256::new(); + + for reference in references { + let hash = hash_content_reference(reference)?; + combined_hasher.update(hash.as_bytes()); + } + + Ok(format!("{:x}", combined_hasher.finalize())) + } + } +} + +// ============================================================================ +// HASH STORAGE MANAGEMENT +// ============================================================================ + +/// Determine storage method based on hash size +pub fn determine_hash_storage(hash: &str, base_dir: &Path) -> HashStorage { + if hash.len() <= INLINE_HASH_THRESHOLD { + HashStorage::Inline { + hash: hash.to_string(), + } + } else { + let hash_id = Sha256::digest(hash.as_bytes()); + let hash_filename = format!("{:x}{}", hash_id, HASH_FILE_EXT); + + HashStorage::External { + hash_file: base_dir + .join(HASH_STORAGE_DIR) + .join(hash_filename), + } + } +} + +/// Store hash externally if needed +pub async fn store_hash( + hash: &str, + storage: &HashStorage, +) -> Result<()> { + match storage { + HashStorage::Inline { ..
} => { + // Nothing to do, hash is inline + Ok(()) + } + HashStorage::External { hash_file } => { + // Create directory if needed + if let Some(parent) = hash_file.parent() { + async_fs::create_dir_all(parent).await?; + } + + // Write hash to file + let mut file = async_fs::File::create(hash_file).await?; + file.write_all(hash.as_bytes()).await?; + file.flush().await?; + + Ok(()) + } + } +} + +/// Retrieve hash from storage +pub async fn retrieve_hash(storage: &HashStorage) -> Result<String> { + match storage { + HashStorage::Inline { hash } => { + Ok(hash.clone()) + } + HashStorage::External { hash_file } => { + async_fs::read_to_string(hash_file) + .await + .with_context(|| format!("Failed to read hash file: {}", hash_file.display())) + } + } +} + +// ============================================================================ +// VALIDATION +// ============================================================================ + +/// Validate a state entry's content against its hash +pub async fn validate_entry(entry: &StateEntry) -> Result<ValidationStatus> { + // Check if completed + if !entry.completed { + return Ok(ValidationStatus::Unknown); + } + + // Check TTL expiration + if let Some(completed_at) = entry.completed_at { + let ttl = entry.ttl_override + .or_else(|| entry.data_stage.map(|s| s.default_ttl())) + .unwrap_or_else(|| Duration::days(7)); + + let expiration = completed_at + ttl; + if Utc::now() > expiration { + return Ok(ValidationStatus::Expired); + } + } + + // Validate content hash if available + if let (Some(reference), Some(storage)) = (&entry.content_reference, &entry.content_hash) { + // Compute current hash + let current_hash = match hash_content_reference(reference) { + Ok(hash) => hash, + Err(e) => { + return Ok(ValidationStatus::Invalid { + reason: format!("Failed to compute hash: {}", e), + }); + } + }; + + // Retrieve stored hash + let stored_hash = match retrieve_hash(storage).await { + Ok(hash) => hash, + Err(e) => { + return Ok(ValidationStatus::Invalid { + reason: format!("Failed to retrieve stored hash: {}", e), + }); + } + }; + + // Compare hashes + if current_hash != stored_hash { + return Ok(ValidationStatus::Invalid { + reason: "Hash mismatch".to_string(), + }); + } + } + + Ok(ValidationStatus::Valid) +} + +/// Validate all state entries and handle cascade invalidation +pub async fn validate_all_entries( + entries: &mut HashMap<String, StateEntry>, +) -> Result<ValidationReport> { + let mut report = ValidationReport::default(); + + // First pass: validate each entry independently + for (name, entry) in entries.iter_mut() { + let status = validate_entry(entry).await?; + entry.validation_status = status.clone(); + entry.last_validated_at = Some(Utc::now()); + + match status { + ValidationStatus::Valid => report.valid_count += 1, + ValidationStatus::Invalid { .. } => { + report.invalid_count += 1; + report.invalid_entries.push(name.clone()); + } + ValidationStatus::Expired => { + report.expired_count += 1; + report.expired_entries.push(name.clone()); + } + ValidationStatus::Unknown => report.unknown_count += 1, + ValidationStatus::DependencyFailed { ..
} => {} + } + } + + // Second pass: cascade invalidation based on dependencies + let mut invalidated = HashSet::new(); + for name in &report.invalid_entries { + invalidated.insert(name.clone()); + } + + loop { + let mut newly_invalidated = Vec::new(); + + for (name, entry) in entries.iter() { + if invalidated.contains(name) { + continue; + } + + // Check if any dependency is invalidated + for dep in &entry.dependencies { + if invalidated.contains(dep) { + newly_invalidated.push((name.clone(), dep.clone())); + break; + } + } + } + + if newly_invalidated.is_empty() { + break; + } + + for (name, failed_dep) in newly_invalidated { + invalidated.insert(name.clone()); + report.cascaded_invalidations.push(name.clone()); + + if let Some(entry) = entries.get_mut(&name) { + entry.validation_status = ValidationStatus::DependencyFailed { + failed_dependency: failed_dep, + }; + } + } + } + + Ok(report) +} + +/// Validation report +#[derive(Debug, Default)] +pub struct ValidationReport { + pub valid_count: usize, + pub invalid_count: usize, + pub expired_count: usize, + pub unknown_count: usize, + pub invalid_entries: Vec<String>, + pub expired_entries: Vec<String>, + pub cascaded_invalidations: Vec<String>, +} + +impl ValidationReport { + pub fn print_summary(&self) { + println!("=== Validation Report ==="); + println!("Valid: {}", self.valid_count); + println!("Invalid: {}", self.invalid_count); + println!("Expired: {}", self.expired_count); + println!("Unknown: {}", self.unknown_count); + + if !self.invalid_entries.is_empty() { + println!("\nInvalid entries:"); + for entry in &self.invalid_entries { + println!(" - {}", entry); + } + } + + if !self.expired_entries.is_empty() { + println!("\nExpired entries:"); + for entry in &self.expired_entries { + println!(" - {}", entry); + } + } + + if !self.cascaded_invalidations.is_empty() { + println!("\nCascaded invalidations:"); + for entry in &self.cascaded_invalidations { + println!(" - {}", entry); + } + } + } +} + +// ============================================================================ +// STATE MANAGEMENT +// ============================================================================ + +/// State manager for reading/writing state entries +pub struct StateManager { + state_path: PathBuf, + base_dir: PathBuf, +} + +impl StateManager { + pub fn new<P: AsRef<Path>>(state_path: P, base_dir: P) -> Self { + Self { + state_path: state_path.as_ref().to_path_buf(), + base_dir: base_dir.as_ref().to_path_buf(), + } + } + + /// Load all state entries from state.jsonl + pub async fn load_entries(&self) -> Result<HashMap<String, StateEntry>> { + let mut entries = HashMap::new(); + + if !self.state_path.exists() { + return Ok(entries); + } + + let content = async_fs::read_to_string(&self.state_path).await?; + + for line in content.lines() { + if line.trim().is_empty() { + continue; + } + + if let Ok(entry) = serde_json::from_str::<StateEntry>(line) { + entries.insert(entry.step_name.clone(), entry); + } + } + + Ok(entries) + } + + /// Save all state entries to state.jsonl + pub async fn save_entries(&self, entries: &HashMap<String, StateEntry>) -> Result<()> { + let mut lines = Vec::new(); + + for entry in entries.values() { + let json = serde_json::to_string(entry)?; + lines.push(json); + } + + let content = lines.join("\n") + "\n"; + async_fs::write(&self.state_path, content).await?; + + Ok(()) + } + + /// Create or update a state entry with integrity tracking + pub async fn update_entry( + &self, + step_name: String, + content_reference: ContentReference, + data_stage: DataStage, + dependencies: Vec<String>, + ttl_override: Option<Duration>, + ) -> Result<StateEntry> { + // Compute
hash + let hash = hash_content_reference(&content_reference)?; + + // Determine storage + let storage = determine_hash_storage(&hash, &self.base_dir); + + // Store hash if external + store_hash(&hash, &storage).await?; + + // Create entry + let entry = StateEntry { + step_name: step_name.clone(), + completed: true, + completed_at: Some(Utc::now()), + content_reference: Some(content_reference), + content_hash: Some(storage), + data_stage: Some(data_stage), + ttl_override, + last_validated_at: Some(Utc::now()), + validation_status: ValidationStatus::Valid, + dependencies, + }; + + // Load existing entries + let mut entries = self.load_entries().await?; + + // Update entry + entries.insert(step_name, entry.clone()); + + // Save + self.save_entries(&entries).await?; + + Ok(entry) + } + + /// Check if a step is valid and completed + pub async fn is_step_valid(&self, step_name: &str) -> Result<bool> { + let entries = self.load_entries().await?; + + if let Some(entry) = entries.get(step_name) { + let status = validate_entry(entry).await?; + Ok(matches!(status, ValidationStatus::Valid)) + } else { + Ok(false) + } + } + + /// Invalidate a specific entry + pub async fn invalidate_entry(&self, step_name: &str, reason: String) -> Result<()> { + let mut entries = self.load_entries().await?; + + if let Some(entry) = entries.get_mut(step_name) { + entry.validation_status = ValidationStatus::Invalid { reason }; + entry.last_validated_at = Some(Utc::now()); + } + + self.save_entries(&entries).await?; + + Ok(()) + } + + /// Run full validation on all entries + pub async fn validate_all(&self) -> Result<ValidationReport> { + let mut entries = self.load_entries().await?; + let report = validate_all_entries(&mut entries).await?; + self.save_entries(&entries).await?; + Ok(report) + } +} + +// ============================================================================ +// HELPER FUNCTIONS +// ============================================================================ + +/// Create a simple file reference +pub fn file_reference<P: AsRef<Path>>(path: P) -> ContentReference { + ContentReference::File { + path: path.as_ref().to_path_buf(), + } +} + +/// Create a directory reference +pub fn directory_reference<P: AsRef<Path>>( + path: P, + include_patterns: Option<Vec<String>>, + exclude_patterns: Option<Vec<String>>, +) -> ContentReference { + ContentReference::Directory { + path: path.as_ref().to_path_buf(), + include_patterns, + exclude_patterns, + } +} + +/// Create a composite reference +pub fn composite_reference(references: Vec<ContentReference>) -> ContentReference { + ContentReference::Composite { references } +} \ No newline at end of file diff --git a/src/util/mod.rs b/src/util/mod.rs index 5ddd052..0d92ffa 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -2,4 +2,5 @@ pub mod logger; pub mod directories; pub mod opnv; -pub mod macros; \ No newline at end of file +pub mod macros; +pub mod integrity; \ No newline at end of file
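
Usage sketch for review (illustrative only, not part of the diff): how a pipeline step adopts the integrity API added in src/util/integrity.rs. The step name "example_step_complete" and the "*/example/*.jsonl" pattern are placeholders; StateManager::new, is_step_valid, directory_reference, update_entry, DataStage, and the "yahoo_companies_cleansed" dependency mirror the track_events_completion helper in this patch.

    use crate::util::directories::DataPaths;
    use crate::util::integrity::{DataStage, StateManager, directory_reference};

    async fn example_step(paths: &DataPaths) -> anyhow::Result<()> {
        let data_path = paths.data_dir();
        let state_path = data_path.join("state.jsonl");
        let manager = StateManager::new(&state_path, &data_path.to_path_buf());

        // Skip the step when the stored hash still matches the on-disk
        // content and the entry is within its stage TTL.
        if manager.is_step_valid("example_step_complete").await? {
            return Ok(());
        }

        // ... produce this step's output files here ...

        // Hash the produced files and record completion. The entry expires
        // after the Data-stage TTL (7 days) and is cascade-invalidated if
        // "yahoo_companies_cleansed" later fails validation.
        let reference = directory_reference(
            paths.corporate_dir(),
            Some(vec!["*/example/*.jsonl".to_string()]), // placeholder pattern
            Some(vec!["*.tmp".to_string(), "*.log".to_string()]),
        );
        manager.update_entry(
            "example_step_complete".to_string(),
            reference,
            DataStage::Data,
            vec!["yahoo_companies_cleansed".to_string()],
            None, // use the stage default TTL
        ).await?;

        Ok(())
    }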