diff --git a/Cargo.lock b/Cargo.lock
index 02d1bc1..d3006cb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2743,6 +2743,15 @@ dependencies = [
  "serde_core",
 ]
 
+[[package]]
+name = "serde_spanned"
+version = "1.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f8bbf91e5a4d6315eee45e704372590b30e260ee83af6639d64557f51b067776"
+dependencies = [
+ "serde_core",
+]
+
 [[package]]
 name = "serde_urlencoded"
 version = "0.7.1"
@@ -3213,10 +3222,25 @@ dependencies = [
 ]
 
 [[package]]
-name = "toml_datetime"
-version = "0.7.3"
+name = "toml"
+version = "0.9.11+spec-1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f2cdb639ebbc97961c51720f858597f7f24c4fc295327923af55b74c3c724533"
+checksum = "f3afc9a848309fe1aaffaed6e1546a7a14de1f935dc9d89d32afd9a44bab7c46"
+dependencies = [
+ "indexmap",
+ "serde_core",
+ "serde_spanned",
+ "toml_datetime",
+ "toml_parser",
+ "toml_writer",
+ "winnow",
+]
+
+[[package]]
+name = "toml_datetime"
+version = "0.7.5+spec-1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "92e1cfed4a3038bc5a127e35a2d360f145e1f4b971b551a2ba5fd7aedf7e1347"
 dependencies = [
  "serde_core",
 ]
@@ -3235,13 +3259,19 @@ dependencies = [
 
 [[package]]
 name = "toml_parser"
-version = "1.0.4"
+version = "1.0.6+spec-1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c0cbe268d35bdb4bb5a56a2de88d0ad0eb70af5384a99d648cd4b3d04039800e"
+checksum = "a3198b4b0a8e11f09dd03e133c0280504d0801269e9afa46362ffde1cbeebf44"
 dependencies = [
  "winnow",
 ]
 
+[[package]]
+name = "toml_writer"
+version = "1.0.6+spec-1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ab16f14aed21ee8bfd8ec22513f7287cd4a91aa92e44edfe2c17ddd004e92607"
+
 [[package]]
 name = "tower"
 version = "0.5.2"
@@ -3635,6 +3665,7 @@ dependencies = [
  "sha2",
  "tokio",
  "tokio-tungstenite 0.21.0",
+ "toml",
  "tracing",
  "tracing-subscriber",
  "url",
diff --git a/Cargo.toml b/Cargo.toml
index eeb1559..337f007 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -40,6 +40,7 @@ rand = "0.9.2"
 
 # Environment handling
 dotenvy = "0.15"
+toml = "0.9.8"
 
 # Date & time
 chrono = { version = "0.4", features = ["serde"] }
diff --git a/src/checkpoint_dependencies.toml b/src/checkpoint_dependencies.toml
new file mode 100644
index 0000000..bc46259
--- /dev/null
+++ b/src/checkpoint_dependencies.toml
@@ -0,0 +1,26 @@
+[checkpoints.lei_figi_mapping_complete]
+depends_on = []
+
+[checkpoints.securities_data_complete]
+depends_on = ["lei_figi_mapping_complete"]
+
+[checkpoints.exchange_collection_complete]
+depends_on = []
+
+[checkpoints.yahoo_companies_cleansed]
+depends_on = []
+
+[checkpoints.yahoo_fx_rate_collection_completed]
+depends_on = ["yahoo_companies_cleansed"]
+
+[groups.enrichment_group]
+members = ["yahoo_events_enrichment_complete", "yahoo_options_enrichment_complete"]
+depends_on = ["yahoo_companies_cleansed"]
+
+[checkpoints.yahoo_events_enrichment_complete]
+depends_on = []
+group = "enrichment_group"
+
+[checkpoints.yahoo_options_enrichment_complete]
+depends_on = []
+group = "enrichment_group"
diff --git a/src/config.rs b/src/config.rs
index 00af0a9..d6ac832 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -32,6 +32,23 @@ pub struct Config {
     pub proxy_instances_per_certificate: Option<usize>,
 }
 
+impl Default for Config {
+    fn default() -> Self {
+        Self {
+            economic_start_date: "2007-02-13".to_string(),
+            corporate_start_date: "2010-01-01".to_string(),
+            economic_lookahead_months: 3,
+            max_parallel_instances: default_max_parallel_instances(),
+            max_tasks_per_instance: 0,
+            max_requests_per_session: default_max_requests_per_session(),
+            min_request_interval_ms: default_min_request_interval_ms(),
+            max_retry_attempts: default_max_retry_attempts(),
+            enable_vpn_rotation: false,
+            proxy_instances_per_certificate: default_proxy_instances_per_certificate(),
+        }
+    }
+}
+
 fn default_enable_vpn_rotation() -> bool {
     false
 }
@@ -54,25 +71,6 @@ fn default_proxy_instances_per_certificate() -> Option<usize> {
     Some(1)
 }
 
-impl Default for Config {
-    fn default() -> Self {
-        Self {
-            economic_start_date: "2007-02-13".to_string(),
-            corporate_start_date: "2010-01-01".to_string(),
-            economic_lookahead_months: 3,
-            max_parallel_instances: default_max_parallel_instances(),
-            max_tasks_per_instance: 0,
-            max_requests_per_session: default_max_requests_per_session(),
-            min_request_interval_ms: default_min_request_interval_ms(),
-            max_retry_attempts: default_max_retry_attempts(),
-            enable_vpn_rotation: false,
-            proxy_instances_per_certificate: default_proxy_instances_per_certificate(),
-        }
-    }
-}
-
-
-
 impl Config {
     /// Loads configuration from environment variables using dotenvy.
     pub fn load() -> Result<Self> {
diff --git a/src/corporate/collect_exchanges.rs b/src/corporate/collect_exchanges.rs
index 7731761..6b5994c 100644
--- a/src/corporate/collect_exchanges.rs
+++ b/src/corporate/collect_exchanges.rs
@@ -245,7 +245,7 @@ fn get_fallback_rate(currency: &str) -> f64 {
 /// - Integrity tracking with content hash validation
 pub async fn collect_and_save_exchanges(paths: &DataPaths) -> anyhow::Result<usize> {
     let state_path = paths.data_dir().join("state.jsonl");
-    let manager = StateManager::new(&state_path, &paths.data_dir().to_path_buf());
+    let manager = StateManager::new(&state_path, &paths.data_dir().to_path_buf())?;
 
     let step_name = "exchange_collection_complete";
     let output_path = paths.data_dir().join("yahoo_exchanges.json");
@@ -407,7 +407,6 @@ async fn track_exchange_collection_completion(
         step_name.to_string(),
         content_reference,
         DataStage::Data,
-        vec![],  // No explicit dependencies - output file serves as verification
         None,  // Use default TTL (7 days for Data stage)
     ).await?;
diff --git a/src/corporate/update.rs b/src/corporate/update.rs
index 4e7c406..dbc9095 100644
--- a/src/corporate/update.rs
+++ b/src/corporate/update.rs
@@ -12,6 +12,7 @@ use crate::util::directories::DataPaths;
 use crate::util::logger;
 use crate::scraper::webdriver::ChromeDriverPool;
 use crate::scraper::yahoo::{YahooClientPool};
+use crate::scraper::openfigi::load_figi_type_lists;
 use std::result::Result::Ok;
 use std::sync::Arc;
diff --git a/src/corporate/update_companies_cleanse.rs b/src/corporate/update_companies_cleanse.rs
index 5d33fdb..b804f8a 100644
--- a/src/corporate/update_companies_cleanse.rs
+++ b/src/corporate/update_companies_cleanse.rs
@@ -47,7 +47,7 @@ pub async fn companies_yahoo_cleansed_no_data(paths: &DataPaths) -> Result<usize> {
     let state_path = paths.data_dir().join("state.jsonl");
-    let manager = StateManager::new(&state_path, &paths.data_dir().to_path_buf());
+    let manager = StateManager::new(&state_path, &paths.data_dir().to_path_buf())?;
diff --git a/src/corporate/update_companies_figi.rs b/src/corporate/update_companies_figi.rs
--- a/src/corporate/update_companies_figi.rs
+++ b/src/corporate/update_companies_figi.rs
@@ ... @@
-impl OpenFigiClient {
-    pub async fn new() -> anyhow::Result<Self> {
-        let api_key = dotenvy::var("OPENFIGI_API_KEY").ok();
-        let has_key = api_key.is_some();
-
-        let mut builder = HttpClient::builder()
-            .user_agent("Mozilla/5.0 (compatible; OpenFIGI-Rust/1.0)")
-            .timeout(Duration::from_secs(30));
-
-        if let Some(key) = &api_key {
-            let mut headers = HeaderMap::new();
-            headers.insert("X-OPENFIGI-APIKEY", HeaderValue::from_str(key)?);
-            builder = builder.default_headers(headers);
-        }
-
-        let client = builder.build().context("Failed to build HTTP client")?;
-        logger::log_info(&format!("OpenFIGI client: {}",
-            if has_key { "with API key" } else { "no key" })).await;
-
-        Ok(Self { client, has_key })
-    }
-
-    pub async fn map_isins_to_figi_infos(&self, isins: &[String]) -> anyhow::Result<Vec<FigiInfo>> {
-        if isins.is_empty() {
-            return Ok(vec![]);
-        }
-
-        let mut all_figi_infos = Vec::new();
-        let chunk_size = if self.has_key { 100 } else { 5 };
- let inter_sleep = if self.has_key { - Duration::from_millis(240) - } else { - Duration::from_millis(2400) - }; - - for chunk in isins.chunks(chunk_size) { - let jobs: Vec = chunk.iter() - .map(|isin| json!({ - "idType": "ID_ISIN", - "idValue": isin, - })) - .collect(); - - let mut retry_count = 0; - let max_retries = 5; - let mut backoff_ms = 1000u64; - - loop { - let resp_result = self.client - .post("https://api.openfigi.com/v3/mapping") - .header("Content-Type", "application/json") - .json(&jobs) - .send() - .await; - - let resp = match resp_result { - Ok(r) => r, - Err(e) => { - retry_count += 1; - if retry_count >= max_retries { - let err_msg = format!("Failed to send mapping request after {} retries: {}", max_retries, e); - logger::log_error(&err_msg).await; - return Err(anyhow!(err_msg)); - } - let warn_msg = format!("Transient error sending mapping request (attempt {}/{}): {}", retry_count, max_retries, e); - logger::log_warn(&warn_msg).await; - let retry_msg = format!(" Retrying in {}ms...", backoff_ms); - logger::log_info(&retry_msg).await; - sleep(Duration::from_millis(backoff_ms)).await; - backoff_ms = (backoff_ms * 2).min(60000); // Cap at 60s - continue; - } - }; - - let status = resp.status(); - let headers = resp.headers().clone(); - let body = resp.text().await?; - - if status == 429 { - let reset_sec = headers - .get("ratelimit-reset") - .and_then(|v| v.to_str().ok()) - .and_then(|s| s.parse::().ok()) - .unwrap_or(10); - sleep(Duration::from_secs(reset_sec.max(10))).await; - continue; - } else if !status.is_success() { - if status.is_server_error() && retry_count < max_retries { - retry_count += 1; - sleep(Duration::from_millis(backoff_ms)).await; - backoff_ms = (backoff_ms * 2).min(60000); - continue; - } - return Err(anyhow!("OpenFIGI error {}: {}", status, body)); - } - - let results: Vec = serde_json::from_str(&body)?; - - for (isin, result) in chunk.iter().zip(results) { - if let Some(data) = result["data"].as_array() { - for item in data { - if let Some(figi) = item["figi"].as_str() { - all_figi_infos.push(FigiInfo { - isin: isin.clone(), - figi: figi.to_string(), - name: item["name"].as_str().unwrap_or("").to_string(), - ticker: item["ticker"].as_str().unwrap_or("").to_string(), - exch_code: item["exchCode"].as_str().unwrap_or("").to_string(), - composite_figi: item["compositeFIGI"].as_str().unwrap_or("").to_string(), - security_type: item["securityType"].as_str().unwrap_or("").to_string(), - market_sector: item["marketSector"].as_str().unwrap_or("").to_string(), - share_class_figi: item["shareClassFIGI"].as_str().unwrap_or("").to_string(), - security_type2: item["securityType2"].as_str().unwrap_or("").to_string(), - security_description: item["securityDescription"].as_str().unwrap_or("").to_string(), - }); - } - } - } - } - - break; - } - - sleep(inter_sleep).await; - } - - Ok(all_figi_infos) - } -} - async fn process_and_save_figi_batch( client: &OpenFigiClient, lei_batch: &HashMap>, @@ -224,41 +86,6 @@ async fn append_lei_to_figi_jsonl(path: &Path, lei: &str, figis: &[FigiInfo]) -> Ok(()) } -/// Handles rate limit responses from the OpenFIGI API. -/// -/// If a 429 status is received, this function sleeps for the duration specified -/// in the `ratelimit-reset` header (or 10 seconds by default). -/// -/// # Arguments -/// * `resp` - The HTTP response to check. -/// -/// # Returns -/// Ok(()) if no rate limit, or after waiting for the reset period. -/// -/// # Errors -/// Returns an error if the response status indicates a non-rate-limit error. 
-async fn handle_rate_limit(resp: &reqwest::Response) -> anyhow::Result<()> { - let status = resp.status(); - - if status == 429 { - let headers = resp.headers(); - let reset_sec = headers - .get("ratelimit-reset") - .and_then(|v| v.to_str().ok()) - .and_then(|s| s.parse::().ok()) - .unwrap_or(10); - - logger::log_info(&format!(" Rate limited—waiting {}s", reset_sec)).await; - sleep(std::time::Duration::from_secs(reset_sec.max(10))).await; - - return Err(anyhow!("Rate limited, please retry")); - } else if status.is_client_error() || status.is_server_error() { - return Err(anyhow!("OpenFIGI API error: {}", status)); - } - - Ok(()) -} - /// Loads or builds securities data by streaming through FIGI mapping files. /// /// Implements abort-safe incremental persistence with checkpoints and replay logs. @@ -276,7 +103,7 @@ pub async fn update_securities(date_dir: &Path) -> anyhow::Result<()> { let dir = DataPaths::new(".")?; let state_path = dir.data_dir().join("state.jsonl"); - let manager = StateManager::new(&state_path, &dir.data_dir().to_path_buf()); + let manager = StateManager::new(&state_path, &dir.data_dir().to_path_buf())?; let step_name = "securities_data_complete"; let data_dir = dir.data_dir(); @@ -432,7 +259,6 @@ async fn track_securities_completion( "securities_data_complete".to_string(), content_reference, DataStage::Data, - vec!["lei_figi_mapping_complete".to_string()], // Depends on LEI mapping None, // Use default TTL (7 days) ).await?; @@ -1110,220 +936,6 @@ async fn setup_sector_directories( Ok(()) } -/// Loads all OpenFIGI mapping value lists (marketSecDes, micCode, securityType). -/// -/// This function fetches the available values for each mapping parameter from the OpenFIGI API -/// and caches them as JSON files in `data/openfigi/`. If the files already exist and are recent -/// (less than 30 days old), they are reused instead of re-fetching. -/// -/// # Returns -/// Ok(()) on success. -/// -/// # Errors -/// Returns an error if API requests fail, JSON parsing fails, or file I/O fails. 
-pub async fn load_figi_type_lists(paths: &DataPaths) -> anyhow::Result<()> { - logger::log_info("Loading OpenFIGI mapping value lists...").await; - - let state_path = paths.cache_dir().join("state.jsonl"); - let cache_openfigi_dir = paths.cache_openfigi_dir(); - tokio_fs::create_dir_all(cache_openfigi_dir).await - .context("Failed to create data/openfigi directory")?; - - /*if state_path.exists() { - let state_content = tokio::fs::read_to_string(&state_path).await?; - - for line in state_content.lines() { - if line.trim().is_empty() { - continue; - } - - if let Ok(state) = serde_json::from_str::(line) { - if state.get("yahoo_companies_cleansed_no_data").and_then(|v| v.as_bool()).unwrap_or(false) { - logger::log_info(" Yahoo companies cleansing already completed, reading existing file...").await; - - if output_path.exists() { - let output_content = tokio::fs::read_to_string(&output_path).await?; - let count = output_content.lines() - .filter(|line| !line.trim().is_empty()) - .count(); - - logger::log_info(&format!(" ✓ Found {} companies in companies_yahoo.jsonl", count)).await; - return Ok(count); - } else { - logger::log_warn(" State indicates completion but companies_yahoo.jsonl not found, re-running...").await; - break; - } - } - } - } - }*/ - - let client = OpenFigiClient::new().await?; - - // Fetch each type list - get_figi_market_sec_des(&client, cache_openfigi_dir).await?; - get_figi_mic_code(&client, cache_openfigi_dir).await?; - get_figi_security_type(&client, cache_openfigi_dir).await?; - - logger::log_info("OpenFIGI mapping value lists loaded successfully").await; - - - - Ok(()) -} - -/// Fetches and caches the list of valid marketSecDes values. -/// -/// # Arguments -/// * `client` - The OpenFIGI client instance. -/// * `cache_dir` - Directory to save the cached JSON file. -/// -/// # Returns -/// Ok(()) on success. -/// -/// # Errors -/// Returns an error if the API request fails or file I/O fails. -async fn get_figi_market_sec_des(client: &OpenFigiClient, cache_dir: &Path) -> anyhow::Result<()> { - let cache_file = cache_dir.join("marketSecDes.json"); - - // Check if cache exists and is recent (< 30 days old) - if should_use_cache(&cache_file).await? { - logger::log_info(" Using cached marketSecDes values").await; - return Ok(()); - } - - logger::log_info(" Fetching marketSecDes values from OpenFIGI API...").await; - - let resp = client.client - .get("https://api.openfigi.com/v3/mapping/values/marketSecDes") - .send() - .await - .context("Failed to fetch marketSecDes values")?; - - handle_rate_limit(&resp).await?; - - let values: Value = resp.json().await - .context("Failed to parse marketSecDes response")?; - - // Save to cache - let json_str = serde_json::to_string_pretty(&values)?; - tokio_fs::write(&cache_file, json_str).await - .context("Failed to write marketSecDes cache")?; - - logger::log_info(" ✓ Cached marketSecDes values").await; - - // Respect rate limits - sleep(Duration::from_millis(if client.has_key { 240 } else { 2400 })).await; - - Ok(()) -} - -/// Fetches and caches the list of valid micCode values. -/// -/// # Arguments -/// * `client` - The OpenFIGI client instance. -/// * `cache_dir` - Directory to save the cached JSON file. -/// -/// # Returns -/// Ok(()) on success. -/// -/// # Errors -/// Returns an error if the API request fails or file I/O fails. -async fn get_figi_mic_code(client: &OpenFigiClient, cache_dir: &Path) -> anyhow::Result<()> { - let cache_file = cache_dir.join("micCode.json"); - - if should_use_cache(&cache_file).await? 
{ - logger::log_info(" Using cached micCode values").await; - return Ok(()); - } - - logger::log_info(" Fetching micCode values from OpenFIGI API...").await; - - let resp = client.client - .get("https://api.openfigi.com/v3/mapping/values/micCode") - .send() - .await - .context("Failed to fetch micCode values")?; - - handle_rate_limit(&resp).await?; - - let values: Value = resp.json().await - .context("Failed to parse micCode response")?; - - let json_str = serde_json::to_string_pretty(&values)?; - tokio_fs::write(&cache_file, json_str).await - .context("Failed to write micCode cache")?; - - logger::log_info(" ✓ Cached micCode values").await; - - sleep(Duration::from_millis(if client.has_key { 240 } else { 2400 })).await; - - Ok(()) -} - -/// Checks if a cache file exists and is less than 30 days old. -/// -/// # Arguments -/// * `path` - Path to the cache file. -/// -/// # Returns -/// True if the cache should be used, false if it needs refreshing. -async fn should_use_cache(path: &Path) -> anyhow::Result { - if !path.exists() { - return Ok(false); - } - - let metadata = tokio_fs::metadata(path).await?; - let modified = metadata.modified()?; - let age = modified.elapsed().unwrap_or(std::time::Duration::from_secs(u64::MAX)); - - // Cache is valid for 30 days - Ok(age < std::time::Duration::from_secs(30 * 24 * 60 * 60)) -} - -/// Fetches and caches the list of valid securityType values. -/// -/// # Arguments -/// * `client` - The OpenFIGI client instance. -/// * `cache_dir` - Directory to save the cached JSON file. -/// -/// # Returns -/// Ok(()) on success. -/// -/// # Errors -/// Returns an error if the API request fails or file I/O fails. -async fn get_figi_security_type(client: &OpenFigiClient, cache_dir: &Path) -> anyhow::Result<()> { - let cache_file = cache_dir.join("securityType.json"); - - if should_use_cache(&cache_file).await? 
{
-        logger::log_info("  Using cached securityType values").await;
-        return Ok(());
-    }
-
-    logger::log_info("  Fetching securityType values from OpenFIGI API...").await;
-
-    let resp = client.client
-        .get("https://api.openfigi.com/v3/mapping/values/securityType")
-        .send()
-        .await
-        .context("Failed to fetch securityType values")?;
-
-    handle_rate_limit(&resp).await?;
-
-    let values: Value = resp.json().await
-        .context("Failed to parse securityType response")?;
-
-    let json_str = serde_json::to_string_pretty(&values)?;
-    tokio_fs::write(&cache_file, json_str).await
-        .context("Failed to write securityType cache")?;
-
-    logger::log_info("  ✓ Cached securityType values").await;
-
-    sleep(Duration::from_millis(if client.has_key { 240 } else { 2400 })).await;
-
-    Ok(())
-}
-
 #[derive(Debug)]
 pub struct MappingStats {
     pub total_leis: usize,
@@ -1621,7 +1233,7 @@ pub async fn update_lei_mapping(
 ) -> anyhow::Result<MappingStats> {
     let dir = DataPaths::new(".")?;
     let state_path = dir.cache_dir().join("state.jsonl");
-    let manager = StateManager::new(&state_path, &dir.cache_dir().to_path_buf());
+    let manager = StateManager::new(&state_path, &dir.cache_dir().to_path_buf())?;
 
     let step_name = "lei_figi_mapping_complete";
     let map_cache_dir = dir.cache_gleif_openfigi_map_dir();
@@ -1694,7 +1306,6 @@ async fn track_lei_mapping_completion(
         "lei_figi_mapping_complete".to_string(),
         content_reference,
         DataStage::Cache,  // 24-hour TTL for API data
-        vec![],  // No dependencies
         None,  // Use default TTL
     ).await?;
diff --git a/src/economic/yahoo_update_forex.rs b/src/economic/yahoo_update_forex.rs
index 862f638..6d4a994 100644
--- a/src/economic/yahoo_update_forex.rs
+++ b/src/economic/yahoo_update_forex.rs
@@ -94,7 +94,7 @@ pub async fn collect_fx_rates(
     let log_path = data_path.join("fx_rates_updates.log");
     let state_path = data_path.join("state.jsonl");
-    let manager = StateManager::new(&state_path, &data_path.to_path_buf());
+    let manager = StateManager::new(&state_path, &data_path.to_path_buf())?;
 
     let step_name = "yahoo_fx_rate_collection_completed";
     let content_reference = directory_reference(&output_path, Some(vec![
@@ -169,7 +169,6 @@ pub async fn collect_fx_rates(
             step_name.to_string(),
             content_reference,
             DataStage::Data,
-            vec!["yahoo_companies_cleansed".to_string()],  // Dependency
             None,  // Use default TTL (7 days for Data stage)
         ).await?;
         return Ok(collected_currencies.len());
@@ -321,7 +320,6 @@ pub async fn collect_fx_rates(
         step_name.to_string(),
         content_reference,
         DataStage::Data,
-        vec!["yahoo_companies_cleansed".to_string()],  // Dependency
         None,  // Use default TTL (7 days for Data stage)
     ).await?;
 }
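Note for reviewers: `StateManager::new` is now fallible because it loads and validates `checkpoint_dependencies.toml` at construction time. A minimal sketch of the call-site pattern this diff applies across the codebase (the paths are illustrative):

```rust
use web_scraper::util::directories::DataPaths;
use web_scraper::util::integrity::StateManager;

async fn open_state_manager() -> anyhow::Result<StateManager> {
    let paths = DataPaths::new(".")?;
    let state_path = paths.data_dir().join("state.jsonl");
    // `new` reads checkpoint_dependencies.toml from the base dir (falling back
    // to an empty config if the file is absent) and runs cycle/group validation,
    // so construction can fail and callers now propagate with `?`.
    StateManager::new(&state_path, &paths.data_dir().to_path_buf())
}
```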
diff --git a/src/main.rs b/src/main.rs
index 20d378d..a53c93f 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,3 +1,4 @@
 // src/main.rs - Cleaned up version with extracted helpers
+use web_scraper::util::integrity::StateManager;
 use web_scraper::{*, scraper, corporate};
 use crate::check_shutdown;
@@ -233,7 +234,7 @@ fn format_duration(duration: Duration) -> String {
     format!("{:02}::{:02}::{:02}::{:02}", days, hours, minutes, seconds)
 }
 
-pub async fn create_state_files(paths: &DataPaths) -> Result<()> {
+async fn create_state_files(paths: &DataPaths) -> Result<()> {
     let paths = (
         paths.data_dir().join("state.jsonl"),
         paths.cache_dir().join("state.jsonl"),
@@ -251,6 +252,14 @@ async fn create_state_files(paths: &DataPaths) -> Result<()> {
     Ok(())
 }
 
+fn visualize_checkpoint_dependencies(paths: &DataPaths) -> Result<()> {
+    let manager = StateManager::new(&paths.data_dir().join("state.jsonl"),
+        &paths.data_dir().to_path_buf())?;
+    manager.print_dependency_graph();
+    let dot = manager.get_dependency_config().to_dot();
+    std::fs::write(paths.logs_dir().join("checkpoint_dependencies.dot"), dot)?;
+    Ok(())
+}
+
 // ============================================================================
 // MAIN FUNCTION - Simplified with extracted helpers
 // ============================================================================
@@ -264,14 +273,13 @@ async fn main() -> Result<()> {
     start_docker_desktop().await;
     cleanup_all_proxy_containers().await.ok();
     create_state_files(&paths).await.ok();
+    visualize_checkpoint_dependencies(&paths).ok();
 
     let config = Config::load().unwrap_or_else(|_| {
         eprintln!("Using default configuration");
         Config::default()
     });
-
-
-    // Initialize monitoring
 
     let (monitoring_handle, _monitoring_task) = initialize_monitoring(&config, &paths).await?;
diff --git a/src/scraper/mod.rs b/src/scraper/mod.rs
index 4dbad6d..01cd5b9 100644
--- a/src/scraper/mod.rs
+++ b/src/scraper/mod.rs
@@ -3,3 +3,4 @@ pub mod docker_vpn_proxy;
 pub mod helpers;
 pub mod hard_reset;
 pub mod yahoo;
+pub mod openfigi;
\ No newline at end of file
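The relocated client keeps the same rate-limit behavior as before: batches of 100 mapping jobs with an API key, 5 without, plus exponential backoff on transient failures. A hedged usage sketch, written as if from inside the crate (field visibility of `FigiInfo` outside the crate isn't shown in this diff); the ISIN is illustrative:

```rust
use crate::scraper::openfigi::OpenFigiClient;

async fn lookup_example() -> anyhow::Result<()> {
    // Reads OPENFIGI_API_KEY from the environment if present.
    let client = OpenFigiClient::new().await?;
    let isins = vec!["US0378331005".to_string()]; // illustrative ISIN
    let infos = client.map_isins_to_figi_infos(&isins).await?;
    for info in infos {
        println!("{} -> {} ({})", info.isin, info.figi, info.ticker);
    }
    Ok(())
}
```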
diff --git a/src/scraper/openfigi.rs b/src/scraper/openfigi.rs
index e69de29..0c51c48 100644
--- a/src/scraper/openfigi.rs
+++ b/src/scraper/openfigi.rs
@@ -0,0 +1,370 @@
+// src/scraper/openfigi.rs - STREAMING VERSION
+// Key change: never load the entire GLEIF CSV or FIGI maps into memory
+
+use crate::util::directories::DataPaths;
+use crate::util::logger;
+use crate::corporate::types::*;
+use reqwest::Client as HttpClient;
+use reqwest::header::{HeaderMap, HeaderValue};
+use serde_json::{json, Value};
+use std::path::Path;
+use tokio::time::{sleep, Duration};
+use tokio::fs as tokio_fs;
+use anyhow::{Context, anyhow};
+
+#[derive(Clone)]
+pub struct OpenFigiClient {
+    pub client: HttpClient,
+    pub has_key: bool,
+}
+
+impl OpenFigiClient {
+    pub async fn new() -> anyhow::Result<Self> {
+        let api_key = dotenvy::var("OPENFIGI_API_KEY").ok();
+        let has_key = api_key.is_some();
+
+        let mut builder = HttpClient::builder()
+            .user_agent("Mozilla/5.0 (compatible; OpenFIGI-Rust/1.0)")
+            .timeout(Duration::from_secs(30));
+
+        if let Some(key) = &api_key {
+            let mut headers = HeaderMap::new();
+            headers.insert("X-OPENFIGI-APIKEY", HeaderValue::from_str(key)?);
+            builder = builder.default_headers(headers);
+        }
+
+        let client = builder.build().context("Failed to build HTTP client")?;
+        logger::log_info(&format!("OpenFIGI client: {}",
+            if has_key { "with API key" } else { "no key" })).await;
+
+        Ok(Self { client, has_key })
+    }
+
+    pub async fn map_isins_to_figi_infos(&self, isins: &[String]) -> anyhow::Result<Vec<FigiInfo>> {
+        if isins.is_empty() {
+            return Ok(vec![]);
+        }
+
+        let mut all_figi_infos = Vec::new();
+        let chunk_size = if self.has_key { 100 } else { 5 };
+        let inter_sleep = if self.has_key {
+            Duration::from_millis(240)
+        } else {
+            Duration::from_millis(2400)
+        };
+
+        for chunk in isins.chunks(chunk_size) {
+            let jobs: Vec<Value> = chunk.iter()
+                .map(|isin| json!({
+                    "idType": "ID_ISIN",
+                    "idValue": isin,
+                }))
+                .collect();
+
+            let mut retry_count = 0;
+            let max_retries = 5;
+            let mut backoff_ms = 1000u64;
+
+            loop {
+                let resp_result = self.client
+                    .post("https://api.openfigi.com/v3/mapping")
+                    .header("Content-Type", "application/json")
+                    .json(&jobs)
+                    .send()
+                    .await;
+
+                let resp = match resp_result {
+                    Ok(r) => r,
+                    Err(e) => {
+                        retry_count += 1;
+                        if retry_count >= max_retries {
+                            let err_msg = format!("Failed to send mapping request after {} retries: {}",
+                                max_retries, e);
+                            logger::log_error(&err_msg).await;
+                            return Err(anyhow!(err_msg));
+                        }
+                        let warn_msg = format!("Transient error sending mapping request (attempt {}/{}): {}",
+                            retry_count, max_retries, e);
+                        logger::log_warn(&warn_msg).await;
+                        let retry_msg = format!("  Retrying in {}ms...", backoff_ms);
+                        logger::log_info(&retry_msg).await;
+                        sleep(Duration::from_millis(backoff_ms)).await;
+                        backoff_ms = (backoff_ms * 2).min(60000); // Cap at 60s
+                        continue;
+                    }
+                };
+
+                let status = resp.status();
+                let headers = resp.headers().clone();
+                let body = resp.text().await?;
+
+                if status == 429 {
+                    let reset_sec = headers
+                        .get("ratelimit-reset")
+                        .and_then(|v| v.to_str().ok())
+                        .and_then(|s| s.parse::<u64>().ok())
+                        .unwrap_or(10);
+                    sleep(Duration::from_secs(reset_sec.max(10))).await;
+                    continue;
+                } else if !status.is_success() {
+                    if status.is_server_error() && retry_count < max_retries {
+                        retry_count += 1;
+                        sleep(Duration::from_millis(backoff_ms)).await;
+                        backoff_ms = (backoff_ms * 2).min(60000);
+                        continue;
+                    }
+                    return Err(anyhow!("OpenFIGI error {}: {}", status, body));
+                }
+
+                let results: Vec<Value> = serde_json::from_str(&body)?;
+
+                for (isin, result) in chunk.iter().zip(results) {
+                    if let Some(data) = result["data"].as_array() {
+                        for item in data {
+                            if let Some(figi) = item["figi"].as_str() {
+                                all_figi_infos.push(FigiInfo {
+                                    isin: isin.clone(),
+                                    figi: figi.to_string(),
+                                    name: item["name"].as_str().unwrap_or("").to_string(),
+                                    ticker: item["ticker"].as_str().unwrap_or("").to_string(),
+                                    exch_code: item["exchCode"].as_str().unwrap_or("").to_string(),
+                                    composite_figi: item["compositeFIGI"].as_str().unwrap_or("").to_string(),
+                                    security_type: item["securityType"].as_str().unwrap_or("").to_string(),
+                                    market_sector: item["marketSector"].as_str().unwrap_or("").to_string(),
+                                    share_class_figi: item["shareClassFIGI"].as_str().unwrap_or("").to_string(),
+                                    security_type2: item["securityType2"].as_str().unwrap_or("").to_string(),
+                                    security_description: item["securityDescription"].as_str().unwrap_or("").to_string(),
+                                });
+                            }
+                        }
+                    }
+                }
+
+                break;
+            }
+
+            sleep(inter_sleep).await;
+        }
+
+        Ok(all_figi_infos)
+    }
+}
+
+/// Fetches and caches the list of valid securityType values.
+///
+/// # Arguments
+/// * `client` - The OpenFIGI client instance.
+/// * `cache_dir` - Directory to save the cached JSON file.
+///
+/// # Returns
+/// Ok(()) on success.
+///
+/// # Errors
+/// Returns an error if the API request fails or file I/O fails.
+async fn get_figi_security_type(client: &OpenFigiClient, cache_dir: &Path) -> anyhow::Result<()> {
+    let cache_file = cache_dir.join("securityType.json");
+
+    if should_use_cache(&cache_file).await? {
+        logger::log_info("  Using cached securityType values").await;
+        return Ok(());
+    }
+
+    logger::log_info("  Fetching securityType values from OpenFIGI API...").await;
+
+    let resp = client.client
+        .get("https://api.openfigi.com/v3/mapping/values/securityType")
+        .send()
+        .await
+        .context("Failed to fetch securityType values")?;
+
+    handle_rate_limit(&resp).await?;
+
+    let values: Value = resp.json().await
+        .context("Failed to parse securityType response")?;
+
+    let json_str = serde_json::to_string_pretty(&values)?;
+    tokio_fs::write(&cache_file, json_str).await
+        .context("Failed to write securityType cache")?;
+
+    logger::log_info("  ✓ Cached securityType values").await;
+
+    sleep(Duration::from_millis(if client.has_key { 240 } else { 2400 })).await;
+
+    Ok(())
+}
+
+/// Loads all OpenFIGI mapping value lists (marketSecDes, micCode, securityType).
+///
+/// Fetches the available values for each mapping parameter from the OpenFIGI API
+/// and caches them as JSON files in the OpenFIGI cache directory. If the files
+/// already exist and are recent (less than 30 days old), they are reused instead
+/// of being re-fetched.
+///
+/// # Returns
+/// Ok(()) on success.
+///
+/// # Errors
+/// Returns an error if API requests fail, JSON parsing fails, or file I/O fails.
+pub async fn load_figi_type_lists(paths: &DataPaths) -> anyhow::Result<()> {
+    logger::log_info("Loading OpenFIGI mapping value lists...").await;
+
+    let cache_openfigi_dir = paths.cache_openfigi_dir();
+    tokio_fs::create_dir_all(cache_openfigi_dir).await
+        .context("Failed to create OpenFIGI cache directory")?;
+
+    let client = OpenFigiClient::new().await?;
+
+    // Fetch each type list
+    get_figi_market_sec_des(&client, cache_openfigi_dir).await?;
+    get_figi_mic_code(&client, cache_openfigi_dir).await?;
+    get_figi_security_type(&client, cache_openfigi_dir).await?;
+
+    logger::log_info("OpenFIGI mapping value lists loaded successfully").await;
+
+    Ok(())
+}
+
+/// Fetches and caches the list of valid marketSecDes values.
+///
+/// # Arguments
+/// * `client` - The OpenFIGI client instance.
+/// * `cache_dir` - Directory to save the cached JSON file.
+///
+/// # Returns
+/// Ok(()) on success.
+///
+/// # Errors
+/// Returns an error if the API request fails or file I/O fails.
+async fn get_figi_market_sec_des(client: &OpenFigiClient, cache_dir: &Path) -> anyhow::Result<()> {
+    let cache_file = cache_dir.join("marketSecDes.json");
+
+    // Check if cache exists and is recent (< 30 days old)
+    if should_use_cache(&cache_file).await? {
+        logger::log_info("  Using cached marketSecDes values").await;
+        return Ok(());
+    }
+
+    logger::log_info("  Fetching marketSecDes values from OpenFIGI API...").await;
+
+    let resp = client.client
+        .get("https://api.openfigi.com/v3/mapping/values/marketSecDes")
+        .send()
+        .await
+        .context("Failed to fetch marketSecDes values")?;
+
+    handle_rate_limit(&resp).await?;
+
+    let values: Value = resp.json().await
+        .context("Failed to parse marketSecDes response")?;
+
+    // Save to cache
+    let json_str = serde_json::to_string_pretty(&values)?;
+    tokio_fs::write(&cache_file, json_str).await
+        .context("Failed to write marketSecDes cache")?;
+
+    logger::log_info("  ✓ Cached marketSecDes values").await;
+
+    // Respect rate limits
+    sleep(Duration::from_millis(if client.has_key { 240 } else { 2400 })).await;
+
+    Ok(())
+}
+
+/// Fetches and caches the list of valid micCode values.
+///
+/// # Arguments
+/// * `client` - The OpenFIGI client instance.
+/// * `cache_dir` - Directory to save the cached JSON file.
+///
+/// # Returns
+/// Ok(()) on success.
+///
+/// # Errors
+/// Returns an error if the API request fails or file I/O fails.
+async fn get_figi_mic_code(client: &OpenFigiClient, cache_dir: &Path) -> anyhow::Result<()> {
+    let cache_file = cache_dir.join("micCode.json");
+
+    if should_use_cache(&cache_file).await? {
+        logger::log_info("  Using cached micCode values").await;
+        return Ok(());
+    }
+
+    logger::log_info("  Fetching micCode values from OpenFIGI API...").await;
+
+    let resp = client.client
+        .get("https://api.openfigi.com/v3/mapping/values/micCode")
+        .send()
+        .await
+        .context("Failed to fetch micCode values")?;
+
+    handle_rate_limit(&resp).await?;
+
+    let values: Value = resp.json().await
+        .context("Failed to parse micCode response")?;
+
+    let json_str = serde_json::to_string_pretty(&values)?;
+    tokio_fs::write(&cache_file, json_str).await
+        .context("Failed to write micCode cache")?;
+
+    logger::log_info("  ✓ Cached micCode values").await;
+
+    sleep(Duration::from_millis(if client.has_key { 240 } else { 2400 })).await;
+
+    Ok(())
+}
+
+/// Handles rate limit responses from the OpenFIGI API.
+///
+/// If a 429 status is received, this function sleeps for the duration specified
+/// in the `ratelimit-reset` header (or 10 seconds by default).
+///
+/// # Arguments
+/// * `resp` - The HTTP response to check.
+///
+/// # Returns
+/// Ok(()) if no rate limit was hit.
+///
+/// # Errors
+/// Returns an error after waiting out a rate limit (callers must retry), or if
+/// the response status indicates any other client or server error.
+async fn handle_rate_limit(resp: &reqwest::Response) -> anyhow::Result<()> {
+    let status = resp.status();
+
+    if status == 429 {
+        let headers = resp.headers();
+        let reset_sec = headers
+            .get("ratelimit-reset")
+            .and_then(|v| v.to_str().ok())
+            .and_then(|s| s.parse::<u64>().ok())
+            .unwrap_or(10);
+
+        logger::log_info(&format!("  Rate limited—waiting {}s", reset_sec)).await;
+        sleep(std::time::Duration::from_secs(reset_sec.max(10))).await;
+
+        return Err(anyhow!("Rate limited, please retry"));
+    } else if status.is_client_error() || status.is_server_error() {
+        return Err(anyhow!("OpenFIGI API error: {}", status));
+    }
+
+    Ok(())
+}
+
+/// Checks if a cache file exists and is less than 30 days old.
+///
+/// # Arguments
+/// * `path` - Path to the cache file.
+///
+/// # Returns
+/// True if the cache should be used, false if it needs refreshing.
+async fn should_use_cache(path: &Path) -> anyhow::Result<bool> {
+    if !path.exists() {
+        return Ok(false);
+    }
+
+    let metadata = tokio_fs::metadata(path).await?;
+    let modified = metadata.modified()?;
+    let age = modified.elapsed().unwrap_or(std::time::Duration::from_secs(u64::MAX));
+
+    // Cache is valid for 30 days
+    Ok(age < std::time::Duration::from_secs(30 * 24 * 60 * 60))
+}
\ No newline at end of file
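With openfigi.rs in place, the integrity module below becomes the single source of truth for checkpoint ordering. A sketch of how the shipped TOML resolves, using the `DependencyConfig` API added below (the in-repo path is used here for illustration; at runtime the file is loaded from the state file's base directory):

```rust
use web_scraper::util::integrity::DependencyConfig;

fn resolve_example() -> anyhow::Result<()> {
    // from_file runs validate(): cycle detection plus group-membership checks.
    let config = DependencyConfig::from_file("src/checkpoint_dependencies.toml")?;

    // Direct dependency declared in the TOML.
    let deps = config.get_all_dependencies("securities_data_complete")?;
    assert_eq!(deps, vec!["lei_figi_mapping_complete".to_string()]);

    // Group dependencies are folded in: members of enrichment_group inherit
    // the group's depends_on entries transitively.
    let deps = config.get_all_dependencies("yahoo_events_enrichment_complete")?;
    assert!(deps.contains(&"yahoo_companies_cleansed".to_string()));
    Ok(())
}
```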
diff --git a/src/util/integrity.rs b/src/util/integrity.rs
index b1441b8..99b0c40 100644
--- a/src/util/integrity.rs
+++ b/src/util/integrity.rs
@@ -1,5 +1,5 @@
 // src/util/integrity.rs
-//! Content integrity and state lifecycle management module
+//! Content integrity and state lifecycle management module with centralized dependencies
 //!
 //! Features:
 //! - File and directory hashing (SHA-256)
@@ -7,9 +7,12 @@
 //! - State invalidation based on time or validation failures
 //! - 3-stage data lifecycle: cache → data → storage
 //! - Inline vs. external hash storage based on size
-//! - Cascade invalidation when dependencies fail validation
+//! - **Centralized dependency configuration** (Single Source of Truth)
+//! - Support for checkpoint groups and hierarchies
+//! - Automatic transitive dependency resolution
+//! - Cycle detection in dependency graph
 
-use anyhow::{Context, Result};
+use anyhow::{Context, Result, bail};
 use chrono::{DateTime, Duration, Utc};
 use serde::{Deserialize, Serialize};
 use sha2::{Digest, Sha256};
@@ -34,6 +37,252 @@ const HASH_STORAGE_DIR: &str = ".integrity_hashes";
 /// File extension for external hash files
 const HASH_FILE_EXT: &str = ".hash";
 
+/// Default dependency configuration file name
+const DEFAULT_DEPENDENCY_CONFIG: &str = "checkpoint_dependencies.toml";
+
+// ============================================================================
+// DEPENDENCY CONFIGURATION (SINGLE SOURCE OF TRUTH)
+// ============================================================================
+
+/// Centralized dependency configuration
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct DependencyConfig {
+    /// Individual checkpoint dependencies
+    #[serde(default)]
+    pub checkpoints: HashMap<String, CheckpointConfig>,
+
+    /// Checkpoint groups (for hierarchical dependencies)
+    #[serde(default)]
+    pub groups: HashMap<String, GroupConfig>,
+}
+
+/// Configuration for a single checkpoint
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CheckpointConfig {
+    /// Description of this checkpoint
+    #[serde(default)]
+    pub description: String,
+
+    /// Direct dependencies (checkpoint names this depends on)
+    #[serde(default)]
+    pub depends_on: Vec<String>,
+
+    /// Group this checkpoint belongs to, if any
+    #[serde(default)]
+    pub group: Option<String>,
+}
+
+/// Configuration for a checkpoint group
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct GroupConfig {
+    /// Description of this group
+    #[serde(default)]
+    pub description: String,
+
+    /// Members of this group
+    pub members: Vec<String>,
+
+    /// Dependencies shared by all members of this group
+    #[serde(default)]
+    pub depends_on: Vec<String>,
+}
+
+impl DependencyConfig {
+    /// Load dependency configuration from a TOML file
+    pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
+        let content = fs::read_to_string(path.as_ref())
+            .with_context(|| format!("Failed to read dependency config: {}", path.as_ref().display()))?;
+
+        let config: DependencyConfig = toml::from_str(&content)
+            .context("Failed to parse dependency config")?;
+
+        config.validate()?;
+
+        Ok(config)
+    }
+
+    /// Load from the default location (checkpoint_dependencies.toml in base_dir)
+    pub fn from_default_location<P: AsRef<Path>>(base_dir: P) -> Result<Self> {
+        let config_path = base_dir.as_ref().join(DEFAULT_DEPENDENCY_CONFIG);
+
+        if !config_path.exists() {
+            // Return an empty config if the file doesn't exist
+            return Ok(Self::default());
+        }
+
+        Self::from_file(config_path)
+    }
+
+    /// Validate the configuration (check for cycles and invalid references)
+    pub fn validate(&self) -> Result<()> {
+        // Check for cycles
+        for checkpoint in self.checkpoints.keys() {
+            self.detect_cycle(checkpoint)?;
+        }
+
+        // Validate group memberships
+        for (group_name, group) in &self.groups {
+            for member in &group.members {
+                if !self.checkpoints.contains_key(member) {
+                    bail!("Group '{}' references non-existent checkpoint: {}", group_name, member);
+                }
+            }
+        }
+
+        // Validate that checkpoints in groups actually declare the group
+        for (checkpoint_name, checkpoint) in &self.checkpoints {
+            if let Some(group_name) = &checkpoint.group {
+                if let Some(group) = self.groups.get(group_name) {
+                    if !group.members.contains(checkpoint_name) {
+                        bail!("Checkpoint '{}' claims to be in group '{}' but the group doesn't list it",
+                            checkpoint_name, group_name);
+                    }
+                } else {
+                    bail!("Checkpoint '{}' references non-existent group: {}", checkpoint_name, group_name);
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Detect a cycle in the dependency graph starting from `start`
+    fn detect_cycle(&self, start: &str) -> Result<()> {
+        let mut visited = HashSet::new();
+        let mut stack = HashSet::new();
+
+        self.detect_cycle_helper(start, &mut visited, &mut stack)
+    }
+
+    fn detect_cycle_helper(
+        &self,
+        checkpoint: &str,
+        visited: &mut HashSet<String>,
+        stack: &mut HashSet<String>,
+    ) -> Result<()> {
+        if stack.contains(checkpoint) {
+            bail!("Cycle detected in dependency graph at checkpoint: {}", checkpoint);
+        }
+
+        if visited.contains(checkpoint) {
+            return Ok(());
+        }
+
+        visited.insert(checkpoint.to_string());
+        stack.insert(checkpoint.to_string());
+
+        // Check direct dependencies
+        if let Some(config) = self.checkpoints.get(checkpoint) {
+            for dep in &config.depends_on {
+                self.detect_cycle_helper(dep, visited, stack)?;
+            }
+        }
+
+        stack.remove(checkpoint);
+        Ok(())
+    }
+
+    /// Get all dependencies for a checkpoint (direct, group, and transitive)
+    pub fn get_all_dependencies(&self, checkpoint: &str) -> Result<Vec<String>> {
+        let mut all_deps = Vec::new();
+        let mut visited = HashSet::new();
+
+        self.collect_dependencies(checkpoint, &mut all_deps, &mut visited)?;
+
+        // Remove duplicates while preserving order
+        let mut seen = HashSet::new();
+        all_deps.retain(|dep| seen.insert(dep.clone()));
+
+        Ok(all_deps)
+    }
+
+    fn collect_dependencies(
+        &self,
+        checkpoint: &str,
+        deps: &mut Vec<String>,
+        visited: &mut HashSet<String>,
+    ) -> Result<()> {
+        if visited.contains(checkpoint) {
+            return Ok(());
+        }
+        visited.insert(checkpoint.to_string());
+
+        // Get the checkpoint config
+        let config = self.checkpoints.get(checkpoint)
+            .ok_or_else(|| anyhow::anyhow!("Unknown checkpoint: {}", checkpoint))?;
+
+        // Add group dependencies first (if the checkpoint is in a group)
+        if let Some(group_name) = &config.group {
+            if let Some(group) = self.groups.get(group_name) {
+                for dep in &group.depends_on {
+                    if !visited.contains(dep) {
+                        deps.push(dep.clone());
+                        self.collect_dependencies(dep, deps, visited)?;
+                    }
+                }
+            }
+        }
+
+        // Add direct dependencies
+        for dep in &config.depends_on {
+            if !visited.contains(dep) {
+                deps.push(dep.clone());
+                self.collect_dependencies(dep, deps, visited)?;
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Render the dependency graph in DOT format (for visualization)
+    pub fn to_dot(&self) -> String {
+        let mut dot = String::from("digraph Dependencies {\n");
+        dot.push_str("  rankdir=LR;\n");
+        dot.push_str("  node [shape=box];\n\n");
+
+        // Add checkpoints
+        for (name, config) in &self.checkpoints {
+            let label = if config.description.is_empty() {
+                name.clone()
+            } else {
+                // Emit a literal `\n` escape so DOT renders a line break
+                format!("{}\\n{}", name, config.description)
+            };
+            dot.push_str(&format!("  \"{}\" [label=\"{}\"];\n", name, label));
+        }
+
+        // Add dependency edges
+        dot.push_str("\n");
+        for (name, config) in &self.checkpoints {
+            // Group dependencies
+            if let Some(group_name) = &config.group {
+                if let Some(group) = self.groups.get(group_name) {
+                    for dep in &group.depends_on {
+                        dot.push_str(&format!("  \"{}\" -> \"{}\" [label=\"via group {}\"];\n",
+                            name, dep, group_name));
+                    }
+                }
+            }
+
+            // Direct dependencies
+            for dep in &config.depends_on {
+                dot.push_str(&format!("  \"{}\" -> \"{}\";\n", name, dep));
+            }
+        }
+
+        dot.push_str("}\n");
+        dot
+    }
+}
+
+impl Default for DependencyConfig {
+    fn default() -> Self {
+        Self {
+            checkpoints: HashMap::new(),
+            groups: HashMap::new(),
+        }
+    }
+}
+
 // ============================================================================
 // DATA STRUCTURES
 // ============================================================================
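To make the DOT output concrete, this is the shape `to_dot` emits for the shipped TOML, hand-derived from the implementation above (node lines trimmed); `config` is the value loaded in the earlier sketch:

```rust
let dot = config.to_dot();
assert!(dot.starts_with("digraph Dependencies {"));
// Expected edges include:
//   "securities_data_complete" -> "lei_figi_mapping_complete";
//   "yahoo_fx_rate_collection_completed" -> "yahoo_companies_cleansed";
//   "yahoo_events_enrichment_complete" -> "yahoo_companies_cleansed" [label="via group enrichment_group"];
// Render with Graphviz: dot -Tsvg checkpoint_dependencies.dot -o deps.svg
```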
@@ -135,7 +384,8 @@ pub struct StateEntry {
     /// Validation status
     pub validation_status: ValidationStatus,
 
-    /// Dependencies (other steps that must be valid for this to remain valid)
+    /// Dependencies (resolved automatically from config, stored for reference)
+    #[serde(default)]
     pub dependencies: Vec<String>,
 }
 
@@ -160,7 +410,7 @@ pub enum ValidationStatus {
 }
 
 // ============================================================================
-// HASH COMPUTATION
+// HASH COMPUTATION (UNCHANGED FROM ORIGINAL)
 // ============================================================================
 
 /// Hash a single file using SHA-256
@@ -185,7 +435,6 @@ pub fn hash_file<P: AsRef<Path>>(path: P) -> Result<String> {
 }
 
 /// Hash a directory recursively
-/// Returns a combined hash of all files in sorted order
 pub fn hash_directory<P: AsRef<Path>>(
     path: P,
     include_patterns: Option<&[String]>,
@@ -194,31 +443,26 @@ pub fn hash_directory<P: AsRef<Path>>(
     let path = path.as_ref();
 
     if !path.is_dir() {
-        anyhow::bail!("Path is not a directory: {}", path.display());
+        bail!("Path is not a directory: {}", path.display());
     }
 
-    // Collect all files recursively
     let mut files = Vec::new();
     collect_files_recursive(path, &mut files, include_patterns, exclude_patterns)?;
 
-    // Sort for deterministic hashing
     files.sort();
 
     if files.is_empty() {
-        return Ok(String::from("d41d8cd98f00b204e9800998ecf8427e")); // MD5 of empty string
+        return Ok(String::from("d41d8cd98f00b204e9800998ecf8427e")); // Sentinel hash for an empty directory
     }
 
-    // Hash all files and combine
     let mut combined_hasher = Sha256::new();
 
     for file_path in files {
-        // Include relative path in hash for structure awareness
         let rel_path = file_path.strip_prefix(path)
             .unwrap_or(&file_path)
             .to_string_lossy();
         combined_hasher.update(rel_path.as_bytes());
 
-        // Hash file content
         let file_hash = hash_file(&file_path)?;
         combined_hasher.update(file_hash.as_bytes());
     }
@@ -241,7 +485,6 @@ fn collect_files_recursive(
         let entry = entry?;
         let path = entry.path();
 
-        // Skip hidden files and directories
         if let Some(name) = path.file_name() {
             if name.to_string_lossy().starts_with('.') {
                 continue;
@@ -251,7 +494,6 @@ fn collect_files_recursive(
         if path.is_dir() {
             collect_files_recursive(&path, files, include_patterns, exclude_patterns)?;
         } else if path.is_file() {
-            // Apply pattern filters
             if should_include_file(&path, include_patterns, exclude_patterns) {
                 files.push(path);
             }
@@ -261,7 +503,7 @@ fn collect_files_recursive(
     Ok(())
 }
 
-/// Check if a file should be included based on patterns
+/// Check if file should be included based on patterns
 fn should_include_file(
     path: &Path,
     include_patterns: Option<&[String]>,
@@ -269,51 +511,44 @@ fn should_include_file(
 ) -> bool {
     let path_str = path.to_string_lossy();
 
-    // Check exclude patterns first
     if let Some(excludes) = exclude_patterns {
         for pattern in excludes {
-            if path_str.contains(pattern) || matches_glob(path, pattern) {
+            if glob_match(&path_str, pattern) {
                 return false;
             }
         }
     }
 
-    // Check include patterns
    if let Some(includes) = include_patterns {
         for pattern in includes {
-            if path_str.contains(pattern) || matches_glob(path, pattern) {
+            if glob_match(&path_str, pattern) {
                 return true;
             }
         }
-        return false; // If includes specified but no match
+        return false;
     }
 
-    true // Include by default
+    true
 }
 
-/// Simple glob pattern matching (supports * and ?)
-fn matches_glob(path: &Path, pattern: &str) -> bool {
-    let path_str = path.to_string_lossy();
-
-    // Convert glob to regex
-    let regex_pattern = pattern
-        .replace(".", "\\.")
-        .replace("*", ".*")
-        .replace("?", ".");
-
-    if let Ok(re) = regex::Regex::new(&format!("^{}$", regex_pattern)) {
-        re.is_match(&path_str)
-    } else {
-        false
-    }
-}
+/// Simple glob pattern matching (supports a single `*`; otherwise suffix match)
+fn glob_match(path: &str, pattern: &str) -> bool {
+    if pattern.contains('*') {
+        let parts: Vec<&str> = pattern.split('*').collect();
+        if parts.len() == 2 {
+            path.contains(parts[0]) && path.ends_with(parts[1])
+        } else {
+            false
+        }
+    } else {
+        path.ends_with(pattern)
+    }
+}
 
-/// Hash a content reference
+/// Hash content based on reference type
 pub fn hash_content_reference(reference: &ContentReference) -> Result<String> {
     match reference {
-        ContentReference::File { path } => {
-            hash_file(path)
-        }
+        ContentReference::File { path } => hash_file(path),
         ContentReference::Directory { path, include_patterns, exclude_patterns } => {
             hash_directory(
                 path,
@@ -323,75 +558,44 @@ pub fn hash_content_reference(reference: &ContentReference) -> Result<String> {
         }
         ContentReference::Composite { references } => {
             let mut combined_hasher = Sha256::new();
-
-            for reference in references {
-                let hash = hash_content_reference(reference)?;
+            for ref_item in references {
+                let hash = hash_content_reference(ref_item)?;
                 combined_hasher.update(hash.as_bytes());
             }
-
             Ok(format!("{:x}", combined_hasher.finalize()))
         }
     }
 }
 
-// ============================================================================
-// HASH STORAGE MANAGEMENT
-// ============================================================================
-
-/// Determine storage method based on hash size
-pub fn determine_hash_storage(hash: &str, base_dir: &Path) -> HashStorage {
-    if hash.len() <= INLINE_HASH_THRESHOLD {
-        HashStorage::Inline {
-            hash: hash.to_string(),
-        }
+/// Determine whether to store hash inline or externally
+fn determine_hash_storage(hash: &str, base_dir: &Path) -> HashStorage {
+    if hash.len() > INLINE_HASH_THRESHOLD {
+        let hash_dir = base_dir.join(HASH_STORAGE_DIR);
+        let hash_file = hash_dir.join(format!("{}{}", &hash[..16], HASH_FILE_EXT));
+        HashStorage::External { hash_file }
     } else {
-        let hash_id = Sha256::digest(hash.as_bytes());
-        let hash_filename = format!("{:x}{}", hash_id, HASH_FILE_EXT);
-
-        HashStorage::External {
-            hash_file: base_dir
-                .join(HASH_STORAGE_DIR)
-                .join(hash_filename),
-        }
+        HashStorage::Inline { hash: hash.to_string() }
     }
 }
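The regex-based matcher is gone; `glob_match` now supports at most one `*` and falls back to a suffix match (not a substring match) when the pattern has no wildcard. A sketch of the resulting semantics, written as a test in the same module since the function is private:

```rust
#[cfg(test)]
mod glob_tests {
    use super::glob_match;

    #[test]
    fn glob_match_semantics() {
        assert!(glob_match("data/securities/file.json", "*.json")); // "" prefix, ".json" suffix
        assert!(glob_match("logs/run.log", "logs/*.log"));          // prefix + suffix
        assert!(!glob_match("a/b/c.txt", "*b*"));                   // two '*': always false
        assert!(glob_match("state.jsonl", "state.jsonl"));          // no '*': ends_with
        assert!(!glob_match("data/state.jsonl", "state"));          // substring no longer matches
    }
}
```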
-/// Store hash externally if needed
-pub async fn store_hash(
-    hash: &str,
-    storage: &HashStorage,
-) -> Result<()> {
-    match storage {
-        HashStorage::Inline { .. } => {
-            // Nothing to do, hash is inline
-            Ok(())
-        }
-        HashStorage::External { hash_file } => {
-            // Create directory if needed
-            if let Some(parent) = hash_file.parent() {
-                async_fs::create_dir_all(parent).await?;
-            }
-
-            // Write hash to file
-            let mut file = async_fs::File::create(hash_file).await?;
-            file.write_all(hash.as_bytes()).await?;
-            file.flush().await?;
-
-            Ok(())
-        }
-    }
-}
+/// Store hash to file if external storage
+async fn store_hash(hash: &str, storage: &HashStorage) -> Result<()> {
+    if let HashStorage::External { hash_file } = storage {
+        if let Some(parent) = hash_file.parent() {
+            async_fs::create_dir_all(parent).await?;
+        }
+        async_fs::write(hash_file, hash.as_bytes()).await?;
+    }
+    Ok(())
+}
 
-/// Retrieve hash from storage
-pub async fn retrieve_hash(storage: &HashStorage) -> Result<String> {
-    match storage {
-        HashStorage::Inline { hash } => {
-            Ok(hash.clone())
-        }
-        HashStorage::External { hash_file } => {
-            async_fs::read_to_string(hash_file)
-                .await
-                .with_context(|| format!("Failed to read hash file: {}", hash_file.display()))
-        }
-    }
-}
+/// Load hash from storage
+async fn load_hash(storage: &HashStorage) -> Result<String> {
+    match storage {
+        HashStorage::Inline { hash } => Ok(hash.clone()),
+        HashStorage::External { hash_file } => {
+            let content = async_fs::read_to_string(hash_file).await?;
+            Ok(content.trim().to_string())
+        }
+    }
+}
@@ -400,52 +604,47 @@
 // VALIDATION
 // ============================================================================
 
-/// Validate a state entry's content against its hash
-pub async fn validate_entry(entry: &StateEntry) -> Result<ValidationStatus> {
-    // Check if completed
+/// Validate a single state entry
+async fn validate_entry(entry: &StateEntry) -> Result<ValidationStatus> {
     if !entry.completed {
         return Ok(ValidationStatus::Unknown);
     }
 
-    // Check TTL expiration
-    if let Some(completed_at) = entry.completed_at {
-        let ttl = entry.ttl_override
-            .or_else(|| entry.data_stage.map(|s| s.default_ttl()))
-            .unwrap_or_else(|| Duration::days(7));
-
-        let expiration = completed_at + ttl;
-        if Utc::now() > expiration {
-            return Ok(ValidationStatus::Expired);
-        }
-    }
+    let content_ref = match &entry.content_reference {
+        Some(r) => r,
+        None => return Ok(ValidationStatus::Unknown),
+    };
+
+    let stored_hash_storage = match &entry.content_hash {
+        Some(h) => h,
+        None => return Ok(ValidationStatus::Unknown),
+    };
+
+    let stored_hash = load_hash(stored_hash_storage).await?;
 
-    // Validate content hash if available
-    if let (Some(reference), Some(storage)) = (&entry.content_reference, &entry.content_hash) {
-        // Compute current hash
-        let current_hash = match hash_content_reference(reference) {
-            Ok(hash) => hash,
-            Err(e) => {
-                return Ok(ValidationStatus::Invalid {
-                    reason: format!("Failed to compute hash: {}", e),
-                });
-            }
-        };
+    let current_hash = match hash_content_reference(content_ref) {
+        Ok(h) => h,
+        Err(e) => {
+            return Ok(ValidationStatus::Invalid {
+                reason: format!("Failed to compute hash: {}", e),
+            });
+        }
+    };
 
-        // Retrieve stored hash
-        let stored_hash = match retrieve_hash(storage).await {
-            Ok(hash) => hash,
-            Err(e) => {
-                return Ok(ValidationStatus::Invalid {
-                    reason: format!("Failed to retrieve stored hash: {}", e),
-                });
-            }
-        };
+    if stored_hash != current_hash {
+        return Ok(ValidationStatus::Invalid {
+            reason: "Hash mismatch".to_string(),
+        });
+    }
 
-        // Compare hashes
-        if current_hash != stored_hash {
-            return Ok(ValidationStatus::Invalid {
-                reason: "Hash mismatch".to_string(),
-            });
+    if let Some(stage) = entry.data_stage {
+        let ttl = entry.ttl_override.unwrap_or_else(|| stage.default_ttl());
+
+        if let Some(completed_at) = entry.completed_at {
+            let age = Utc::now() - completed_at;
+            if age > ttl {
+                return Ok(ValidationStatus::Expired);
+            }
         }
     }
 
@@ -458,7 +657,6 @@ pub async fn validate_all_entries(
 ) -> Result<ValidationReport> {
     let mut report = ValidationReport::default();
 
-    // First pass: validate each entry independently
     for (name, entry) in entries.iter_mut() {
         let status = validate_entry(entry).await?;
         entry.validation_status = status.clone();
@@ -479,7 +677,6 @@ pub async fn validate_all_entries(
         }
     }
 
-    // Second pass: cascade invalidation based on dependencies
     let mut invalidated = HashSet::new();
     for name in &report.invalid_entries {
         invalidated.insert(name.clone());
@@ -493,7 +690,6 @@ pub async fn validate_all_entries(
             continue;
         }
 
-        // Check if any dependency is invalidated
         for dep in &entry.dependencies {
             if invalidated.contains(dep) {
                 newly_invalidated.push((name.clone(), dep.clone()));
@@ -565,21 +761,47 @@ impl ValidationReport {
 }
 
 // ============================================================================
-// STATE MANAGEMENT
+// STATE MANAGEMENT (UPDATED WITH CENTRALIZED DEPENDENCIES)
 // ============================================================================
 
-/// State manager for reading/writing state entries
+/// State manager with centralized dependency configuration
 pub struct StateManager {
     state_path: PathBuf,
     base_dir: PathBuf,
+    dependency_config: DependencyConfig,
 }
 
 impl StateManager {
-    pub fn new<P: AsRef<Path>>(state_path: P, base_dir: P) -> Self {
-        Self {
+    /// Create a new state manager, loading the dependency configuration
+    pub fn new<P: AsRef<Path>>(state_path: P, base_dir: P) -> Result<Self> {
+        let base_dir = base_dir.as_ref().to_path_buf();
+        let dependency_config = DependencyConfig::from_default_location(&base_dir)?;
+
+        Ok(Self {
+            state_path: state_path.as_ref().to_path_buf(),
+            base_dir,
+            dependency_config,
+        })
+    }
+
+    /// Create with an explicit dependency configuration
+    pub fn with_config<P: AsRef<Path>>(
+        state_path: P,
+        base_dir: P,
+        dependency_config: DependencyConfig,
+    ) -> Result<Self> {
+        dependency_config.validate()?;
+
+        Ok(Self {
             state_path: state_path.as_ref().to_path_buf(),
             base_dir: base_dir.as_ref().to_path_buf(),
-        }
+            dependency_config,
+        })
+    }
+
+    /// Get the dependency configuration (for inspection/debugging)
+    pub fn get_dependency_config(&self) -> &DependencyConfig {
+        &self.dependency_config
     }
 
     /// Load all state entries from state.jsonl
@@ -607,28 +829,39 @@ impl StateManager {
 
     /// Save all state entries to state.jsonl
     pub async fn save_entries(&self, entries: &HashMap<String, StateEntry>) -> Result<()> {
-        let mut lines = Vec::new();
-
-        for entry in entries.values() {
-            let json = serde_json::to_string(entry)?;
-            lines.push(json);
+        if let Some(parent) = self.state_path.parent() {
+            async_fs::create_dir_all(parent).await?;
         }
 
-        let content = lines.join("\n") + "\n";
-        async_fs::write(&self.state_path, content).await?;
+        let mut file = async_fs::File::create(&self.state_path).await?;
+
+        for entry in entries.values() {
+            let line = serde_json::to_string(entry)? + "\n";
+            file.write_all(line.as_bytes()).await?;
+        }
+
+        file.sync_all().await?;
         Ok(())
     }
 
     /// Create or update a state entry with integrity tracking
+    /// **UPDATED**: Dependencies are now resolved automatically from config
     pub async fn update_entry(
         &self,
         step_name: String,
         content_reference: ContentReference,
         data_stage: DataStage,
-        dependencies: Vec<String>,
         ttl_override: Option<Duration>,
     ) -> Result<StateEntry> {
+        // Resolve dependencies from the central configuration
+        let dependencies = self.dependency_config
+            .get_all_dependencies(&step_name)
+            .unwrap_or_else(|_| {
+                // Checkpoint not in config: no dependencies
+                Vec::new()
+            });
+
         // Compute hash
         let hash = hash_content_reference(&content_reference)?;
 
@@ -697,6 +930,48 @@ impl StateManager {
         self.save_entries(&entries).await?;
         Ok(report)
     }
+
+    /// Print dependency graph information
+    pub fn print_dependency_graph(&self) {
+        println!("=== Dependency Configuration ===");
+        println!("\nCheckpoints: {}", self.dependency_config.checkpoints.len());
+        println!("Groups: {}", self.dependency_config.groups.len());
+
+        println!("\n--- Checkpoints ---");
+        for (name, config) in &self.dependency_config.checkpoints {
+            println!("{}", name);
+            if !config.description.is_empty() {
+                println!("  Description: {}", config.description);
+            }
+            if let Some(group) = &config.group {
+                println!("  Group: {}", group);
+            }
+            if !config.depends_on.is_empty() {
+                println!("  Depends on: {}", config.depends_on.join(", "));
+            }
+
+            // Show resolved dependencies
+            if let Ok(all_deps) = self.dependency_config.get_all_dependencies(name) {
+                if !all_deps.is_empty() {
+                    println!("  Resolved (including transitive): {}", all_deps.join(", "));
+                }
+            }
+            println!();
+        }
+
+        println!("\n--- Groups ---");
+        for (name, group) in &self.dependency_config.groups {
+            println!("{}", name);
+            if !group.description.is_empty() {
+                println!("  Description: {}", group.description);
+            }
+            println!("  Members: {}", group.members.join(", "));
+            if !group.depends_on.is_empty() {
+                println!("  Group dependencies: {}", group.depends_on.join(", "));
+            }
+            println!();
+        }
+    }
 }
 
 // ============================================================================