From 75ab1969c7b78d97b509fbcba54f6897e23af62d Mon Sep 17 00:00:00 2001 From: donpat1to Date: Thu, 15 Jan 2026 00:22:55 +0100 Subject: [PATCH] added cross compatiblity between shutdown flag and state entries --- SHUTDOWN_STATE_COORDINATION.md | 373 +++++++++++++ integrity/checkpoint_dependencies.dot | 22 +- src/corporate/collect_exchanges.rs | 20 +- src/corporate/update.rs | 4 +- src/corporate/update_companies.rs | 34 +- src/corporate/update_companies_cleanse.rs | 26 +- src/corporate/update_companies_enrich.rs | 76 +-- src/corporate/update_openfigi.rs | 140 ++--- src/corporate/yahoo_company_extraction.js | 14 +- src/corporate/yahoo_company_extraction.rs | 28 +- src/economic/update.rs | 2 +- src/economic/yahoo_update_forex.rs | 19 +- src/util/integrity.rs | 617 +++++++++------------- src/util/macros.rs | 18 + 14 files changed, 850 insertions(+), 543 deletions(-) create mode 100644 SHUTDOWN_STATE_COORDINATION.md diff --git a/SHUTDOWN_STATE_COORDINATION.md b/SHUTDOWN_STATE_COORDINATION.md new file mode 100644 index 0000000..42780dc --- /dev/null +++ b/SHUTDOWN_STATE_COORDINATION.md @@ -0,0 +1,373 @@ +# Shutdown Flag & State Management Orchestration + +## Problem Statement + +Previously, the shutdown flag and StateManager worked independently: +- **Shutdown Flag**: `Arc` signals code to stop execution +- **StateManager**: Tracks completion of work with hash validation and dependencies + +This caused a critical issue: **when shutdown occurred mid-process, no state was recorded**, so on restart the entire step would be retried from scratch, losing all progress. + +## Solution: Coordinated Lifecycle Management + +### Overview + +The shutdown flag and StateManager now work together in a coordinated lifecycle: + +``` +Work In Progress + ↓ + Shutdown Signal (Ctrl+C) + ↓ + Record Incomplete State + ↓ + Return & Cleanup + ↓ + Next Run: Retry From Checkpoint +``` + +### Core Concepts + +#### 1. **StateEntry Lifecycle** + +Each checkpoint has two completion states: + +```rust +// Happy Path: Work Completed Successfully +StateEntry { + completed: true, // ✓ Finished + completed_at: Some(timestamp), // When it finished + validation_status: Valid, // Hash is current +} + +// Shutdown Path: Work Interrupted +StateEntry { + completed: false, // ✗ Incomplete + completed_at: None, // Never finished + validation_status: Invalid { // Won't be skipped + reason: "Incomplete due to shutdown" + } +} +``` + +#### 2. **State Management Functions** + +Two key functions orchestrate the shutdown/completion dance: + +```rust +// Normal Completion (happy path) +manager.update_entry( + "step_name".to_string(), + content_reference, + DataStage::Data, + None, +).await?; + +// Shutdown Completion (incomplete work) +manager.mark_incomplete( + "step_name".to_string(), + Some(content_reference), + Some(DataStage::Data), + "Incomplete: processed 50 of 1000 items".to_string(), +).await?; +``` + +### Implementation Pattern + +Every long-running function should follow this pattern: + +```rust +pub async fn process_large_dataset( + paths: &DataPaths, + shutdown_flag: &Arc, +) -> Result { + // 1. Initialize state manager and content reference + let manager = StateManager::new(&paths.integrity_dir()).await?; + let step_name = "process_large_dataset"; + let content_ref = directory_reference(&output_dir, None, None); + + let mut processed_count = 0; + + // 2. 
Main processing loop + loop { + // CRITICAL: Check shutdown at key points + if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn("Shutdown detected - marking state as incomplete").await; + + // Record incomplete state for retry + manager.mark_incomplete( + step_name.to_string(), + Some(content_ref.clone()), + Some(DataStage::Data), + format!("Incomplete: processed {} items", processed_count), + ).await?; + + return Ok(processed_count); + } + + // 3. Do work... + processed_count += 1; + } + + // 4. If we reach here, work is complete + // Shutdown check BEFORE marking complete + if shutdown_flag.load(Ordering::SeqCst) { + manager.mark_incomplete( + step_name.to_string(), + Some(content_ref), + Some(DataStage::Data), + format!("Incomplete during final stage: processed {} items", processed_count), + ).await?; + } else { + // Only mark complete if shutdown was NOT signaled + manager.update_entry( + step_name.to_string(), + content_ref, + DataStage::Data, + None, + ).await?; + } + + Ok(processed_count) +} +``` + +### Why Two Functions Are Different + +| Aspect | `update_entry()` | `mark_incomplete()` | +|--------|------------------|-------------------| +| **Use Case** | Normal completion | Shutdown/abort | +| `completed` | `true` | `false` | +| `completed_at` | `Some(now)` | `None` | +| `validation_status` | `Valid` | `Invalid { reason }` | +| Next Run | **Skipped** (already done) | **Retried** (incomplete) | +| Hash Stored | Always | Optional (may fail to compute) | +| Semantics | "This work is finished" | "This work wasn't finished" | + +### Shutdown Flag Setup + +The shutdown flag is initialized in `main.rs`: + +```rust +let shutdown_flag = Arc::new(AtomicBool::new(false)); + +// Ctrl+C handler +fn setup_shutdown_handler( + shutdown_flag: Arc, + pool: Arc, + proxy_pool: Option>, +) { + tokio::spawn(async move { + tokio::signal::ctrl_c().await.ok(); + logger::log_info("Ctrl+C received – shutting down gracefully...").await; + + // Set flag to signal all tasks to stop + shutdown_flag.store(true, Ordering::SeqCst); + + // Wait for tasks to clean up + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + + // Final cleanup + perform_full_cleanup(&pool, proxy_pool.as_deref()).await; + std::process::exit(0); + }); +} +``` + +### Multi-Level Shutdown Checks + +For efficiency, shutdown is checked at different levels: + +```rust +// 1. Macro for quick checks (returns early) +check_shutdown!(shutdown_flag); + +// 2. Loop check (inside tight processing loops) +if shutdown_flag.load(Ordering::SeqCst) { + break; +} + +// 3. Final completion check (before marking complete) +if shutdown_flag.load(Ordering::SeqCst) { + manager.mark_incomplete(...).await?; +} else { + manager.update_entry(...).await?; +} +``` + +### Practical Example: Update Companies + +The `update_companies` function shows the full pattern: + +```rust +pub async fn update_companies( + paths: &DataPaths, + config: &Config, + pool: &Arc, + shutdown_flag: &Arc, +) -> anyhow::Result { + let manager = StateManager::new(&paths.integrity_dir()).await?; + let step_name = "update_companies"; + let content_reference = directory_reference(...); + + // Process companies... + loop { + if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn("Shutdown detected").await; + break; + } + // Process items... 
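        //
        // Minimal sketch of per-item checkpointing at this point in the loop.
        // The helper names `fetch_next_company` / `append_jsonl_line` and the
        // `companies_log` path are hypothetical stand-ins (the real code streams
        // results through a writer task):
        //
        //   let company = fetch_next_company(&pool).await?;
        //   append_jsonl_line(&companies_log, &company).await?; // write-ahead log entry
        //
        // Because each item is appended to the `.log` file before the loop
        // continues, a rerun after a shutdown can replay the log and skip
        // everything that already finished.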
+ } + + // Final checkpoint + let (final_count, _, _) = writer_task.await.unwrap_or((0, 0, 0)); + + // CRITICAL: Check shutdown before marking complete + if shutdown_flag.load(Ordering::SeqCst) { + manager.mark_incomplete( + step_name.to_string(), + Some(content_reference), + Some(DataStage::Data), + format!("Incomplete: processed {} items", final_count), + ).await?; + } else { + manager.update_entry( + step_name.to_string(), + content_reference, + DataStage::Data, + None, + ).await?; + } + + Ok(final_count) +} +``` + +### State Tracking in `state.jsonl` + +With this pattern, the state file captures work progression: + +**Before Shutdown:** +```jsonl +{"step_name":"update_companies","completed":false,"validation_status":{"Invalid":"Processing 523 items..."},"dependencies":["lei_figi_mapping_complete"]} +``` + +**After Completion:** +```jsonl +{"step_name":"update_companies","completed":true,"completed_at":"2026-01-14T21:30:45Z","validation_status":"Valid","dependencies":["lei_figi_mapping_complete"]} +``` + +**After Resume:** +- System detects `completed: false` and `validation_status: Invalid` +- Retries `update_companies` from checkpoint +- Uses `.log` files to skip already-processed items +- On success, updates to `completed: true` + +## Benefits + +### 1. **Crash Safety** +- Progress is recorded at shutdown +- No lost work on restart +- Checkpoints prevent reprocessing + +### 2. **Graceful Degradation** +- Long-running functions can be interrupted +- State is always consistent +- Dependencies are tracked + +### 3. **Debugging** +- `state.jsonl` shows exactly which steps were incomplete +- Reasons are recorded for incomplete states +- Progress counts help diagnose where it was interrupted + +### 4. **Consistency** +- `update_entry()` only used for complete work +- `mark_incomplete()` only used for interrupted work +- No ambiguous states + +## Common Mistakes to Avoid + +### ❌ Don't: Call `update_entry()` without shutdown check +```rust +// BAD: Might mark shutdown state as complete! +manager.update_entry(...).await?; +``` + +### ✅ Do: Check shutdown before `update_entry()` +```rust +// GOOD: Only marks complete if not shutting down +if !shutdown_flag.load(Ordering::SeqCst) { + manager.update_entry(...).await?; +} +``` + +### ❌ Don't: Forget `mark_incomplete()` on shutdown +```rust +if shutdown_flag.load(Ordering::SeqCst) { + return Ok(()); // Lost progress! +} +``` + +### ✅ Do: Record incomplete state +```rust +if shutdown_flag.load(Ordering::SeqCst) { + manager.mark_incomplete(...).await?; + return Ok(()); +} +``` + +### ❌ Don't: Store partial data without recording state +```rust +// Write output, but forget to track in state +write_output(...).await?; +// If shutdown here, next run won't know it's incomplete +``` + +### ✅ Do: Update state atomically +```rust +// Update output and state together +write_output(...).await?; +manager.update_entry(...).await?; // Or mark_incomplete if shutdown +``` + +## Testing the Orchestration + +### Test 1: Normal Completion +```bash +cargo run # Let it finish +grep completed state.jsonl # Should show "true" +``` + +### Test 2: Shutdown & Restart +```bash +# Terminal 1: +cargo run # Running... 
+# Wait a bit + +# Terminal 2: +pkill -f "web_scraper" # Send shutdown + +# Check state: +grep update_companies state.jsonl # Should show "completed: false" + +# Restart: +cargo run # Continues from checkpoint +``` + +### Test 3: Verify No Reprocessing +```bash +# Modify a file to add 1000 test items +# Run first time - processes 1000, shutdown at 500 +# Check state.jsonl - shows "Incomplete: 500 items" +# Run second time - should skip first 500, process remaining 500 +``` + +## Summary + +The coordinated shutdown & state system ensures: + +1. **Work is never lost** - Progress recorded at shutdown +2. **No reprocessing** - Checkpoints skip completed items +3. **Transparent state** - `state.jsonl` shows exactly what's done +4. **Easy debugging** - Reason for incompleteness is recorded +5. **Graceful scaling** - Works with concurrent tasks and hard resets diff --git a/integrity/checkpoint_dependencies.dot b/integrity/checkpoint_dependencies.dot index 7c2eb21..23fe8bc 100644 --- a/integrity/checkpoint_dependencies.dot +++ b/integrity/checkpoint_dependencies.dot @@ -4,25 +4,25 @@ digraph Dependencies { "yahoo_options_enrichment_complete" [label="yahoo_options_enrichment_complete Options data enriched for all companies"]; + "yahoo_events_enrichment_complete" [label="yahoo_events_enrichment_complete +Corporate events enriched for all companies"]; "yahoo_companies_cleansed_no_data" [label="yahoo_companies_cleansed_no_data Companies cleansed of data with no Yahoo results"]; + "yahoo_chart_enrichment_complete" [label="yahoo_chart_enrichment_complete +Chart data enriched for all companies"]; + "enrichment_group" [label="enrichment_group +Yahoo exchanges collected and validated"]; + "yahoo_companies_cleansed_low_profile" [label="yahoo_companies_cleansed_low_profile +Companies cleansed of low profile (insufficient market cap/price data)"]; "lei_figi_mapping_complete" [label="lei_figi_mapping_complete LEI-to-FIGI mappings from OpenFIGI API"]; "securities_data_complete" [label="securities_data_complete Securities data built from FIGI mappings"]; - "yahoo_companies_cleansed_low_profile" [label="yahoo_companies_cleansed_low_profile -Companies cleansed of low profile (insufficient market cap/price data)"]; - "yahoo_events_enrichment_complete" [label="yahoo_events_enrichment_complete -Corporate events enriched for all companies"]; - "enrichment_group" [label="enrichment_group -Yahoo exchanges collected and validated"]; - "yahoo_chart_enrichment_complete" [label="yahoo_chart_enrichment_complete -Chart data enriched for all companies"]; "yahoo_options_enrichment_complete" -> "yahoo_companies_cleansed_low_profile" [label="via group enrichment_group"]; - "yahoo_companies_cleansed_no_data" -> "securities_data_complete"; - "securities_data_complete" -> "lei_figi_mapping_complete"; - "yahoo_companies_cleansed_low_profile" -> "yahoo_companies_cleansed_no_data"; "yahoo_events_enrichment_complete" -> "yahoo_companies_cleansed_low_profile" [label="via group enrichment_group"]; + "yahoo_companies_cleansed_no_data" -> "securities_data_complete"; "yahoo_chart_enrichment_complete" -> "yahoo_companies_cleansed_low_profile" [label="via group enrichment_group"]; + "yahoo_companies_cleansed_low_profile" -> "yahoo_companies_cleansed_no_data"; + "securities_data_complete" -> "lei_figi_mapping_complete"; } diff --git a/src/corporate/collect_exchanges.rs b/src/corporate/collect_exchanges.rs index c8ee8a3..9a0f0bd 100644 --- a/src/corporate/collect_exchanges.rs +++ b/src/corporate/collect_exchanges.rs @@ -1,6 +1,6 @@ // 
src/corporate/collect_exchanges.rs use crate::util::directories::DataPaths; -use crate::util::integrity::{DataStage, StateManager, file_reference}; +use crate::util::integrity::{DataStage, StateEntry, StateManager, file_reference}; use crate::util::logger; use crate::corporate::types::*; @@ -244,11 +244,11 @@ fn get_fallback_rate(currency: &str) -> f64 { /// - Handles missing or invalid data gracefully /// - Integrity tracking with content hash validation pub async fn collect_and_save_exchanges(paths: &DataPaths) -> anyhow::Result { + let output_path = paths.data_dir().join("yahoo_exchanges.json"); + let manager = StateManager::new(paths.integrity_dir()).await?; let step_name = "exchange_collection_complete"; - let output_path = paths.data_dir().join("yahoo_exchanges.json"); - if manager.is_step_valid(step_name).await? { logger::log_info(" Exchange collection already completed and valid").await; @@ -260,6 +260,7 @@ pub async fn collect_and_save_exchanges(paths: &DataPaths) -> anyhow::Result anyhow::Result anyhow::Result anyhow::Result<()> { +) -> anyhow::Result { // Create content reference for the output file let content_reference = file_reference(output_path); @@ -402,14 +403,11 @@ async fn track_exchange_collection_completion( // - Dependencies: None (this is a collection step, not dependent on other tracked steps) // Note: In practice, it depends on core data, but we track the output file // which will change if core data changes, so explicit dependency not needed - manager.update_entry( + Ok(manager.create_entry( step_name.to_string(), content_reference, DataStage::Data, - None, // Use default TTL (7 days for Data stage) - ).await?; - - Ok(()) + ).await?) } /// Extract exchange information from a company's core data file diff --git a/src/corporate/update.rs b/src/corporate/update.rs index b75a6ee..3deb976 100644 --- a/src/corporate/update.rs +++ b/src/corporate/update.rs @@ -50,7 +50,7 @@ pub async fn run_full_update( check_shutdown!(shutdown_flag); logger::log_info("Step 3: Checking LEI-FIGI mapping status...").await; - let all_mapped = update_lei_mapping(&gleif_csv_path, None).await?; + let all_mapped = update_lei_mapping(&paths, &gleif_csv_path, None).await?; if !all_mapped { logger::log_warn(" ⚠ Some LEIs failed to map - continuing with partial data").await; @@ -61,7 +61,7 @@ pub async fn run_full_update( check_shutdown!(shutdown_flag); logger::log_info("Step 4: Building securities map (streaming)...").await; - update_securities().await?; + update_securities(&paths).await?; logger::log_info(" ✓ Securities map updated").await; let paths = DataPaths::new(".")?; diff --git a/src/corporate/update_companies.rs b/src/corporate/update_companies.rs index b5f68c3..080e9c8 100644 --- a/src/corporate/update_companies.rs +++ b/src/corporate/update_companies.rs @@ -1,6 +1,7 @@ // src/corporate/update_companies.rs use super::{types::*, yahoo_company_extraction::*, helpers::*}; use crate::util::directories::DataPaths; +use crate::util::integrity::{DataStage, StateManager, file_reference}; use crate::util::logger; use crate::scraper::webdriver::ChromeDriverPool; use crate::scraper::hard_reset::perform_hard_reset; @@ -103,7 +104,7 @@ pub async fn update_companies( // Synchronization for hard reset let reset_in_progress = Arc::new(tokio::sync::Mutex::new(false)); - let securities_path = paths.corporate_dir().join("figi_securities"); + let securities_path = paths.figi_securities_dir(); let securities_checkpoint = securities_path.join("common_stocks.jsonl"); let securities_log = 
securities_path.join("common_stocks.log.jsonl"); @@ -123,7 +124,19 @@ pub async fn update_companies( if let Some(parent) = companies_path.parent() { tokio::fs::create_dir_all(parent).await?; } - + + let manager = StateManager::new(paths.integrity_dir()).await?; + let content_reference = file_reference(&companies_path); + let step_name = "corporate_companies_update"; + let data_stage = DataStage::Data; + + if manager.is_step_valid(step_name).await? { + logger::log_info(" Companies data already built and valid").await; + return Ok(securities.len()); + } + logger::log_info(" Companies data incomplete or missing, proceeding with update").await; + let entry: crate::util::integrity::StateEntry = manager.create_entry(step_name.to_string(), content_reference, data_stage).await?; + // === RECOVERY PHASE: Load checkpoint + replay log === let existing_companies = checkpoint_helpers::load_checkpoint_with_log( &companies_path, @@ -615,6 +628,23 @@ pub async fn update_companies( "✅ Completed: {} total companies ({} new, {} updated, {} hard resets)", final_count, final_new, final_updated, hard_reset_count )).await; + + // Track completion with: + // - Content reference: All output JSONL files + // - Data stage: Data (7-day TTL) - Securities data relatively stable + // - Dependencies: LEI-FIGI mapping must be valid + + // Check for shutdown BEFORE marking complete + if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn("Shutdown detected during company update - marking as invalid for retry").await; + manager.mark_invalid( + entry, + format!("Invalid: processed {} of {} companies before shutdown", final_count, total), + ).await?; + } else { + // Only mark complete if we got here without shutdown + manager.mark_valid(entry).await?; + } Ok(final_count) } diff --git a/src/corporate/update_companies_cleanse.rs b/src/corporate/update_companies_cleanse.rs index 9c952ee..fcda58e 100644 --- a/src/corporate/update_companies_cleanse.rs +++ b/src/corporate/update_companies_cleanse.rs @@ -59,6 +59,11 @@ pub async fn companies_yahoo_cleansed_no_data(paths: &DataPaths) -> Result Result= total_companies && !shutdown_flag.load(Ordering::SeqCst) { - track_events_completion(&manager, paths, step_name).await?; + if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn("Shutdown detected during company update - marking as invalid for retry").await; + manager.mark_invalid( + entry, + format!("Invalid: processed {} companies before shutdown", final_processed), + ).await?; + } else { + manager.mark_valid(entry).await?; logger::log_info(" ✓ Event enrichment marked as complete with integrity tracking").await; } @@ -247,11 +253,11 @@ pub async fn enrich_companies_with_events( } /// Track event enrichment completion with content hash verification -async fn track_events_completion( +async fn create_events_state_entry( manager: &StateManager, paths: &DataPaths, step_name: &str, -) -> anyhow::Result<()> { +) -> anyhow::Result { // Create content reference for all event data // This will hash ALL files matching the pattern: {company}/events/data.jsonl let content_reference = directory_reference( @@ -271,14 +277,11 @@ async fn track_events_completion( // - Content reference: All event directories // - Data stage: Data (7-day TTL by default) // - Dependencies: Depends on cleaned companies data - manager.update_entry( + Ok(manager.create_entry( step_name.to_string(), content_reference, DataStage::Data, - None, // Use default TTL (7 days for Data stage) - ).await?; - - Ok(()) + ).await?) 
} /// Enrich a single company with event data @@ -425,6 +428,8 @@ pub async fn enrich_companies_with_option( logger::log_info(&format!(" ✓ Found {} companies with valid option data", count)).await; return Ok(count); } + let entry = create_option_state_entry(&manager, paths, step_name).await?; + logger::log_info(" Option data needs refresh - starting enrichment").await; @@ -452,7 +457,7 @@ pub async fn enrich_companies_with_option( if pending_count == 0 { logger::log_info(" ✓ All companies already enriched").await; - track_option_completion(&manager, paths, step_name).await?; + manager.mark_valid(entry).await?; return Ok(enriched_companies.len()); } @@ -560,8 +565,14 @@ pub async fn enrich_companies_with_option( )).await; // Mark as complete if all companies processed - if final_processed >= total_companies && !shutdown_flag.load(Ordering::SeqCst) { - track_option_completion(&manager, paths, step_name).await?; + if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn("Shutdown detected during company update - marking as invalid for retry").await; + manager.mark_invalid( + entry, + format!("Invalid: processed {} companies before shutdown", final_processed), + ).await?; + } else { + manager.mark_valid(entry).await?; logger::log_info(" ✓ Option enrichment marked as complete with integrity tracking").await; } @@ -569,11 +580,11 @@ pub async fn enrich_companies_with_option( } /// Track option enrichment completion with content hash verification -async fn track_option_completion( +async fn create_option_state_entry( manager: &StateManager, paths: &DataPaths, step_name: &str, -) -> anyhow::Result<()> { +) -> anyhow::Result { // Create content reference for all option data // This will hash ALL files matching the pattern: {company}/option/data.jsonl let content_reference = directory_reference( @@ -593,14 +604,11 @@ async fn track_option_completion( // - Content reference: All option directories // - Data stage: Data (7-day TTL by default) // - Dependencies: Depends on cleaned companies data - manager.update_entry( + Ok(manager.create_entry( step_name.to_string(), content_reference, DataStage::Data, - None, // Use default TTL (7 days for Data stage) - ).await?; - - Ok(()) + ).await?) 
} /// Enrich a single company with option data @@ -684,6 +692,7 @@ pub async fn enrich_companies_with_chart( logger::log_info(&format!(" ✓ Found {} companies with valid chart data", count)).await; return Ok(count); } + let entry = create_chart_state_entry(&manager, paths, step_name).await?; logger::log_info(" Chart data needs refresh - starting enrichment").await; @@ -711,7 +720,7 @@ pub async fn enrich_companies_with_chart( if pending_count == 0 { logger::log_info(" ✓ All companies already enriched").await; - track_chart_completion(&manager, paths, step_name).await?; + manager.mark_valid(entry).await?; return Ok(enriched_companies.len()); } @@ -819,8 +828,14 @@ pub async fn enrich_companies_with_chart( )).await; // Mark as complete if all companies processed - if final_processed >= total_companies && !shutdown_flag.load(Ordering::SeqCst) { - track_chart_completion(&manager, paths, step_name).await?; + if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn("Shutdown detected during company update - marking as invalid for retry").await; + manager.mark_invalid( + entry, + format!("Invalid: processed {} companies before shutdown", final_processed), + ).await?; + } else { + manager.mark_valid(entry).await?; logger::log_info(" ✓ Chart enrichment marked as complete with integrity tracking").await; } @@ -828,11 +843,11 @@ pub async fn enrich_companies_with_chart( } /// Track chart enrichment completion with content hash verification -async fn track_chart_completion( +async fn create_chart_state_entry( manager: &StateManager, paths: &DataPaths, step_name: &str, -) -> anyhow::Result<()> { +) -> anyhow::Result { // Create content reference for all chart data // This will hash ALL files matching the pattern: {company}/chart/data.jsonl let content_reference = directory_reference( @@ -852,14 +867,11 @@ async fn track_chart_completion( // - Content reference: All chart directories // - Data stage: Data (7-day TTL by default) // - Dependencies: Depends on cleaned companies data - manager.update_entry( + Ok(manager.create_entry( step_name.to_string(), content_reference, DataStage::Data, - None, // Use default TTL (7 days for Data stage) - ).await?; - - Ok(()) + ).await?) } /// Enrich a single company with chart data diff --git a/src/corporate/update_openfigi.rs b/src/corporate/update_openfigi.rs index eeb90b6..c624fa9 100644 --- a/src/corporate/update_openfigi.rs +++ b/src/corporate/update_openfigi.rs @@ -28,32 +28,42 @@ const LEI_BATCH_SIZE: usize = 100; // Process 100 LEIs at a time /// /// # Errors /// Returns an error if file I/O fails or JSON parsing fails. -pub async fn update_securities() -> anyhow::Result<()> { +pub async fn update_securities(paths: &DataPaths) -> anyhow::Result<()> { logger::log_info("Building securities data from FIGI mappings...").await; - - let dir = DataPaths::new(".")?; - let manager = StateManager::new(&dir.integrity_dir()).await?; - let step_name = "securities_data_complete"; - let date_dir = find_most_recent_figi_date_dir(&dir).await? + let date_dir = find_most_recent_figi_date_dir(&paths).await? 
.ok_or_else(|| anyhow!("No FIGI date directory found"))?; - - let data_dir = dir.data_dir(); - let output_dir = data_dir.join("figi_securities"); - tokio_fs::create_dir_all(&output_dir).await - .context("Failed to create corporate/by_name directory")?; + let output_dir = paths.figi_securities_dir(); + + let manager = StateManager::new(&paths.integrity_dir()).await?; + let step_name = "securities_data_complete"; + let content_reference = directory_reference( + output_dir, + Some(vec![ + "common_stocks.jsonl".to_string(), + "warrants.jsonl".to_string(), + "options.jsonl".to_string(), + "corporate_bonds.jsonl".to_string(), + "government_bonds.jsonl".to_string(), + ]), + Some(vec![ + "*.log.jsonl".to_string(), // Exclude log files + "*.tmp".to_string(), // Exclude temp files + "state.jsonl".to_string(), // Exclude internal state tracking + ]), + ); + let data_stage = DataStage::Data; if manager.is_step_valid(step_name).await? { logger::log_info(" Securities data already built and valid").await; logger::log_info(" All sectors already processed, nothing to do").await; return Ok(()); } + logger::log_info(" Securities data incomplete or missing, proceeding with update").await; + let entry = manager.create_entry(step_name.to_string(), content_reference, data_stage).await?; logger::log_info("Building securities data from FIGI mappings...").await; - tokio_fs::create_dir_all(&output_dir).await - .context("Failed to create corporate/by_name directory")?; - // Setup checkpoint and log paths for each security type let common_checkpoint = output_dir.join("common_stocks.jsonl"); let common_log = output_dir.join("common_stocks.log.jsonl"); @@ -104,6 +114,7 @@ pub async fn update_securities() -> anyhow::Result<()> { if sectors_to_process.is_empty() { logger::log_info(" All sectors already processed, nothing to do").await; + manager.mark_valid(entry).await?; return Ok(()); } @@ -170,48 +181,12 @@ pub async fn update_securities() -> anyhow::Result<()> { stats.print_summary(); logger::log_info(&format!("✓ Processed {} new sectors successfully", newly_processed_sectors.len())).await; - track_securities_completion(&manager, &output_dir).await?; + manager.mark_valid(entry).await?; logger::log_info(" ✓ Securities data marked as complete with integrity tracking").await; Ok(()) } -/// Track securities data completion with content hash verification -async fn track_securities_completion( - manager: &StateManager, - output_dir: &Path, -) -> anyhow::Result<()> { - // Create content reference for all output files - let content_reference = directory_reference( - output_dir, - Some(vec![ - "common_stocks.jsonl".to_string(), - "warrants.jsonl".to_string(), - "options.jsonl".to_string(), - "corporate_bonds.jsonl".to_string(), - "government_bonds.jsonl".to_string(), - ]), - Some(vec![ - "*.log.jsonl".to_string(), // Exclude log files - "*.tmp".to_string(), // Exclude temp files - "state.jsonl".to_string(), // Exclude internal state tracking - ]), - ); - - // Track completion with: - // - Content reference: All output JSONL files - // - Data stage: Data (7-day TTL) - Securities data relatively stable - // - Dependencies: LEI-FIGI mapping must be valid - manager.update_entry( - "securities_data_complete".to_string(), - content_reference, - DataStage::Data, - None, // Use default TTL (7 days) - ).await?; - - Ok(()) -} - /// Loads the list of sectors that have been fully processed async fn load_processed_sectors(path: &Path) -> anyhow::Result> { let mut sectors = HashSet::new(); @@ -1442,29 +1417,42 @@ pub async fn 
stream_gleif_csv_and_build_figi_filtered( /// Check mapping completion and process only unmapped LEIs pub async fn update_lei_mapping( + paths: &DataPaths, csv_path: &str, gleif_date: Option<&str>, ) -> anyhow::Result { - let dir = DataPaths::new(".")?; - let manager = StateManager::new(&dir.integrity_dir()).await?; - let step_name = "lei_figi_mapping_complete"; - - let map_cache_dir = dir.cache_gleif_openfigi_map_dir(); - let date = determine_gleif_date(gleif_date, &dir).await?; + let map_cache_dir = paths.cache_gleif_openfigi_map_dir(); + let date = determine_gleif_date(gleif_date, &paths).await?; let date_dir = map_cache_dir.join(&date); + let manager = StateManager::new(&paths.integrity_dir()).await?; + let step_name = "lei_figi_mapping_complete"; + let content_reference = directory_reference( + &date_dir, + Some(vec![ + "*/lei_to_figi.jsonl".to_string(), // All sector mapping files + "no_results.jsonl".to_string(), // LEIs with no results + ]), + Some(vec![ + "*.tmp".to_string(), // Exclude temp files + "*.log".to_string(), // Exclude log files + ]), + ); + let data_stage = DataStage::Cache; // 24-hour TTL for API data + if manager.is_step_valid(step_name).await? { logger::log_info(" LEI-FIGI mapping already completed and valid").await; logger::log_info("✓ All LEIs have been queried (mapped or confirmed no results)").await; return Ok(true); } + let entry = manager.create_entry(step_name.to_string(), content_reference, data_stage).await?; // Get unmapped LEIs (excludes both mapped and no-result LEIs) let unmapped = get_unmapped_leis(csv_path, &date_dir).await?; if unmapped.is_empty() { logger::log_info("✓ All LEIs have been queried (mapped or confirmed no results)").await; - track_lei_mapping_completion(&manager, &dir.integrity_dir()).await?; + manager.mark_valid(entry).await?; logger::log_info(" ✓ LEI-FIGI mapping marked as complete with integrity tracking").await; return Ok(true); @@ -1480,7 +1468,7 @@ pub async fn update_lei_mapping( if still_unmapped.is_empty() { logger::log_info("✓ All LEIs successfully queried").await; - track_lei_mapping_completion(&manager, &date_dir).await?; + manager.mark_valid(entry).await?; logger::log_info(" ✓ LEI-FIGI mapping marked as complete with integrity tracking").await; Ok(true) } else { @@ -1488,43 +1476,11 @@ pub async fn update_lei_mapping( "⚠ {} LEIs still unqueried (API errors or rate limits)", still_unmapped.len() )).await; + manager.mark_invalid(entry, " Some LEIs remain unqueried".to_string()).await?; Ok(false) } } -/// Track LEI-FIGI mapping completion with content hash verification -async fn track_lei_mapping_completion( - manager: &StateManager, - date_dir: &Path, -) -> anyhow::Result<()> { - // Create content reference for all FIGI mapping files - // This will hash ALL lei_to_figi.jsonl files in sector directories - let content_reference = directory_reference( - date_dir, - Some(vec![ - "*/lei_to_figi.jsonl".to_string(), // All sector mapping files - "no_results.jsonl".to_string(), // LEIs with no results - ]), - Some(vec![ - "*.tmp".to_string(), // Exclude temp files - "*.log".to_string(), // Exclude log files - ]), - ); - - // Track completion with: - // - Content reference: All FIGI mapping files in date directory - // - Data stage: Cache (24-hour TTL) - FIGI data can change frequently - // - Dependencies: None (this is a collection step from external API) - manager.update_entry( - "lei_figi_mapping_complete".to_string(), - content_reference, - DataStage::Cache, // 24-hour TTL for API data - None, // Use default TTL - ).await?; - - 
Ok(()) -} - /// Load LEIs that were queried but returned no results async fn load_no_result_leis(date_dir: &Path) -> anyhow::Result> { let mut no_result_leis = HashSet::new(); diff --git a/src/corporate/yahoo_company_extraction.js b/src/corporate/yahoo_company_extraction.js index 0b72d81..f5f88d5 100644 --- a/src/corporate/yahoo_company_extraction.js +++ b/src/corporate/yahoo_company_extraction.js @@ -20,14 +20,20 @@ // Using a wrapper to ensure the result is properly captured var extractionResult = (function() { try { - // Check for "No results found" message using exact selector - const noDataElement = document.querySelector('#main-content-wrapper > section > div.noData.yf-1omxedn'); + // Check for "No results found" message using very flexible selector + const noDataElement = document.querySelector('[class*="noData"]') || + document.querySelector('[class*="error"]') || + (document.body.innerText && document.body.innerText.includes('No results')); if (noDataElement) { return { status: 'no_results', ticker: null, sector: null, exchange: null }; } - // Find the results table using exact selector - const table = document.querySelector('#main-content-wrapper > section > section.container.yf-1omxedn > div.tableContainer.yf-1omxedn > div > table'); + // Find the results table using most flexible selector possible + // Try multiple strategies to find the table + const table = document.querySelector('table') || + document.querySelector('[role="table"]') || + document.querySelector('.table') || + document.querySelector('#main-content-wrapper > section > section[class*="container"] > div[class*="tableContainer"] > div > table'); if (!table) { return { status: 'no_results', ticker: null, sector: null, exchange: null }; } diff --git a/src/corporate/yahoo_company_extraction.rs b/src/corporate/yahoo_company_extraction.rs index 7ea3ab1..d32c29a 100644 --- a/src/corporate/yahoo_company_extraction.rs +++ b/src/corporate/yahoo_company_extraction.rs @@ -123,13 +123,20 @@ pub async fn scrape_company_details_by_isin( } } - // Additional content validation + // Additional content validation - look for table or noData element anywhere on page let page_ready: bool = client .execute( r#" - const table = document.querySelector('#main-content-wrapper > section > section.container.yf-1omxedn > div.tableContainer.yf-1omxedn > div > table'); - const noData = document.querySelector('#main-content-wrapper > section > div.noData.yf-1omxedn'); - return !!(table || noData); + // Try multiple selector strategies + const table = document.querySelector('table') || + document.querySelector('[role="table"]') || + document.querySelector('.table'); + const noData = document.querySelector('[class*="noData"]') || + document.querySelector('[class*="error"]') || + document.body.innerText.includes('No results'); + const hasContent = !!(table || noData); + console.log('Page ready check - table:', !!table, 'noData:', !!noData, 'hasContent:', hasContent); + return hasContent; "#, vec![], ) @@ -203,7 +210,7 @@ pub async fn extract_company_details( client: &Client, _isin: &str, ) -> Result> { - // Wait for page to load - look for either the table or the no-data element + // Wait for page to load - look for either the table or the no-data element using simple selectors let wait_result: Result> = timeout( TokioDuration::from_secs(30), async { @@ -211,9 +218,14 @@ pub async fn extract_company_details( let has_content: bool = client .execute( r#" - const table = document.querySelector('#main-content-wrapper > section > 
section.container.yf-1omxedn > div.tableContainer.yf-1omxedn > div > table'); - const noData = document.querySelector('#main-content-wrapper > section > div.noData.yf-1omxedn'); - return !!(table || noData); + // Use flexible selectors that don't depend on exact DOM structure + const table = document.querySelector('table') || + document.querySelector('[role="table"]') || + document.querySelector('.table'); + const noData = document.querySelector('[class*="noData"]') || + document.querySelector('[class*="error"]'); + const hasContent = !!(table || noData); + return hasContent; "#, vec![], ) diff --git a/src/economic/update.rs b/src/economic/update.rs index a8c1b60..09dbab0 100644 --- a/src/economic/update.rs +++ b/src/economic/update.rs @@ -192,7 +192,7 @@ pub fn process_batch( let mut changes = Vec::new(); let mut removed = std::collections::HashSet::new(); - let identity_map = build_identity_lookup(existing); + //let identity_map = build_identity_lookup(existing); let date_map = build_date_event_lookup(existing); for new in new_events { diff --git a/src/economic/yahoo_update_forex.rs b/src/economic/yahoo_update_forex.rs index 321eea7..e8e25d6 100644 --- a/src/economic/yahoo_update_forex.rs +++ b/src/economic/yahoo_update_forex.rs @@ -114,6 +114,11 @@ pub async fn collect_fx_rates( logger::log_info(&format!(" ✓ Found {} currencies with chart data", count)).await; return Ok(count); } + let entry = manager.create_entry( + step_name.to_string(), + content_reference.clone(), + DataStage::Data, + ).await?; logger::log_info(" Updating missing forex data...").await; @@ -165,12 +170,7 @@ pub async fn collect_fx_rates( if pending_count == 0 { logger::log_info(" ✓ All currencies already collected").await; - manager.update_entry( - step_name.to_string(), - content_reference, - DataStage::Data, - None, // Use default TTL (7 days for Data stage) - ).await?; + manager.mark_valid(entry).await?; return Ok(collected_currencies.len()); } @@ -316,12 +316,7 @@ pub async fn collect_fx_rates( // Mark as complete if not shutdown if !shutdown_flag.load(Ordering::SeqCst) { - manager.update_entry( - step_name.to_string(), - content_reference, - DataStage::Data, - None, // Use default TTL (7 days for Data stage) - ).await?; + manager.mark_valid(entry).await?; } Ok(final_success) } diff --git a/src/util/integrity.rs b/src/util/integrity.rs index fffa986..a0fd416 100644 --- a/src/util/integrity.rs +++ b/src/util/integrity.rs @@ -1,5 +1,5 @@ // src/util/integrity.rs -//! Content integrity and state lifecycle management module with centralized dependencies +//! Content integrity and state lifecycle management module //! //! Features: //! - File and directory hashing (SHA-256) @@ -7,7 +7,7 @@ //! - State invalidation based on time or validation failures //! - 3-stage data lifecycle: cache → data → storage //! - Inline vs. external hash storage based on size -//! - **Centralized dependency configuration** (Single Source of Truth) +//! - Centralized dependency configuration (Single Source of Truth) //! - Support for checkpoint groups and hierarchies //! - Automatic transitive dependency resolution //! 
- Cycle detection in dependency graph @@ -24,97 +24,64 @@ use tokio::fs as async_fs; use tokio::io::AsyncWriteExt; // ============================================================================ -// CONSTANTS & CONFIGURATION +// CONSTANTS // ============================================================================ -/// Maximum hash size (in bytes) to store inline in state.jsonl -/// Hashes larger than this will be stored in separate files const INLINE_HASH_THRESHOLD: usize = 1024; - -/// Directory for storing external hash files const HASH_STORAGE_DIR: &str = ".integrity_hashes"; - -/// File extension for external hash files const HASH_FILE_EXT: &str = ".hash"; - -/// Default dependency configuration file name const DEFAULT_DEPENDENCY_CONFIG: &str = "checkpoint_dependencies.toml"; // ============================================================================ -// DEPENDENCY CONFIGURATION (SINGLE SOURCE OF TRUTH) +// DEPENDENCY CONFIGURATION // ============================================================================ -/// Centralized dependency configuration -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct DependencyConfig { - /// Individual checkpoint dependencies #[serde(default)] pub checkpoints: HashMap, - - /// Checkpoint groups (for hierarchical dependencies) #[serde(default)] pub groups: HashMap, } -/// Configuration for a single checkpoint #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CheckpointConfig { - /// Description of this checkpoint #[serde(default)] pub description: String, - - /// Direct dependencies (checkpoint names this depends on) #[serde(default)] pub depends_on: Vec, - - /// Whether this checkpoint is part of a group #[serde(default)] pub group: Option, } -/// Configuration for a checkpoint group #[derive(Debug, Clone, Serialize, Deserialize)] pub struct GroupConfig { - /// Description of this group #[serde(default)] pub description: String, - - /// Members of this group pub members: Vec, - - /// Dependencies for all members of this group #[serde(default)] pub depends_on: Vec, } impl DependencyConfig { - /// Load dependency configuration from TOML file + /// Load from file or return empty config pub async fn from_file>(path: P) -> Result { - let content = async_fs::read_to_string(path.as_ref()) - .await - .with_context(|| format!("Failed to read dependency config: {}", path.as_ref().display()))?; - - let config: DependencyConfig = toml::from_str(&content) - .context("Failed to parse dependency config")?; - - config.validate()?; - - Ok(config) - } - - /// Load from default location (dependencies.toml in base_dir) - pub async fn from_default_location>(base_dir: P) -> Result { - let config_path = base_dir.as_ref().join(DEFAULT_DEPENDENCY_CONFIG); - - if !config_path.exists() { - // Return empty config if file doesn't exist + let path = path.as_ref(); + if !path.exists() { return Ok(Self::default()); } - Self::from_file(config_path).await + let content = async_fs::read_to_string(path).await + .with_context(|| format!("Failed to read: {}", path.display()))?; + + let config: Self = toml::from_str(&content) + .context("Failed to parse dependency config")?; + + config.validate()?; + Ok(config) } - /// Validate configuration (check for cycles, invalid references) + /// Validate configuration (checks for cycles and invalid references) pub fn validate(&self) -> Result<()> { // Check for cycles for checkpoint in self.checkpoints.keys() { @@ -125,21 +92,20 @@ impl DependencyConfig { for 
(group_name, group) in &self.groups { for member in &group.members { if !self.checkpoints.contains_key(member) { - bail!("Group '{}' references non-existent checkpoint: {}", group_name, member); + bail!("Group '{}' references unknown checkpoint: {}", group_name, member); } } } - // Validate that checkpoints in groups actually declare the group + // Validate checkpoint group declarations for (checkpoint_name, checkpoint) in &self.checkpoints { if let Some(group_name) = &checkpoint.group { - if let Some(group) = self.groups.get(group_name) { - if !group.members.contains(checkpoint_name) { - bail!("Checkpoint '{}' claims to be in group '{}' but group doesn't list it", - checkpoint_name, group_name); - } - } else { - bail!("Checkpoint '{}' references non-existent group: {}", checkpoint_name, group_name); + let group = self.groups.get(group_name) + .ok_or_else(|| anyhow::anyhow!("Checkpoint '{}' references unknown group: {}", checkpoint_name, group_name))?; + + if !group.members.contains(checkpoint_name) { + bail!("Checkpoint '{}' claims group '{}' but group doesn't list it", + checkpoint_name, group_name); } } } @@ -147,78 +113,63 @@ impl DependencyConfig { Ok(()) } - /// Detect if there's a cycle in dependencies starting from checkpoint + /// Detect cycles using DFS fn detect_cycle(&self, start: &str) -> Result<()> { let mut visited = HashSet::new(); let mut stack = HashSet::new(); - - self.detect_cycle_helper(start, &mut visited, &mut stack) + self.dfs_cycle_check(start, &mut visited, &mut stack) } - fn detect_cycle_helper( - &self, - checkpoint: &str, - visited: &mut HashSet, - stack: &mut HashSet, - ) -> Result<()> { - if stack.contains(checkpoint) { - bail!("Cycle detected in dependency graph at checkpoint: {}", checkpoint); + fn dfs_cycle_check(&self, node: &str, visited: &mut HashSet, stack: &mut HashSet) -> Result<()> { + if stack.contains(node) { + bail!("Cycle detected at checkpoint: {}", node); } - - if visited.contains(checkpoint) { + if visited.contains(node) { return Ok(()); } - visited.insert(checkpoint.to_string()); - stack.insert(checkpoint.to_string()); + visited.insert(node.to_string()); + stack.insert(node.to_string()); - // Check direct dependencies - if let Some(config) = self.checkpoints.get(checkpoint) { + if let Some(config) = self.checkpoints.get(node) { for dep in &config.depends_on { - self.detect_cycle_helper(dep, visited, stack)?; + self.dfs_cycle_check(dep, visited, stack)?; } } - stack.remove(checkpoint); + stack.remove(node); Ok(()) } - /// Get all dependencies for a checkpoint (including transitive and group dependencies) + /// Get all dependencies (including transitive and group dependencies) pub fn get_all_dependencies(&self, checkpoint: &str) -> Result> { - let mut all_deps = Vec::new(); + let mut deps = Vec::new(); let mut visited = HashSet::new(); - - self.collect_dependencies(checkpoint, &mut all_deps, &mut visited)?; + self.collect_deps(checkpoint, &mut deps, &mut visited)?; // Remove duplicates while preserving order let mut seen = HashSet::new(); - all_deps.retain(|dep| seen.insert(dep.clone())); + deps.retain(|d| seen.insert(d.clone())); - Ok(all_deps) + Ok(deps) } - fn collect_dependencies( - &self, - checkpoint: &str, - deps: &mut Vec, - visited: &mut HashSet, - ) -> Result<()> { - if visited.contains(checkpoint) { + fn collect_deps(&self, node: &str, deps: &mut Vec, visited: &mut HashSet) -> Result<()> { + if visited.contains(node) { return Ok(()); } - visited.insert(checkpoint.to_string()); + visited.insert(node.to_string()); - // Get checkpoint 
config - let config = self.checkpoints.get(checkpoint) - .ok_or_else(|| anyhow::anyhow!("Unknown checkpoint: {}", checkpoint))?; + let config = self.checkpoints.get(node) + .ok_or_else(|| anyhow::anyhow!("Unknown checkpoint: {}", node))?; - // Add group dependencies first (if checkpoint is in a group) + // Add group dependencies first if let Some(group_name) = &config.group { if let Some(group) = self.groups.get(group_name) { for dep in &group.depends_on { if !visited.contains(dep) { deps.push(dep.clone()); - self.collect_dependencies(dep, deps, visited)?; + self.collect_deps(dep, deps, visited)?; } } } @@ -228,43 +179,40 @@ impl DependencyConfig { for dep in &config.depends_on { if !visited.contains(dep) { deps.push(dep.clone()); - self.collect_dependencies(dep, deps, visited)?; + self.collect_deps(dep, deps, visited)?; } } Ok(()) } - /// Get dependency graph as DOT format (for visualization) + /// Generate DOT format for visualization pub fn to_dot(&self) -> String { - let mut dot = String::from("digraph Dependencies {\n"); - dot.push_str(" rankdir=LR;\n"); - dot.push_str(" node [shape=box];\n\n"); + let mut dot = String::from("digraph Dependencies {\n rankdir=LR;\n node [shape=box];\n\n"); - // Add checkpoints + // Nodes for (name, config) in &self.checkpoints { let label = if config.description.is_empty() { name.clone() } else { - format!("{}\n{}", name, config.description) + format!("{}\\n{}", name, config.description) }; dot.push_str(&format!(" \"{}\" [label=\"{}\"];\n", name, label)); } - // Add dependencies + // Edges dot.push_str("\n"); for (name, config) in &self.checkpoints { - // Add group dependencies + // Group dependencies if let Some(group_name) = &config.group { if let Some(group) = self.groups.get(group_name) { for dep in &group.depends_on { - dot.push_str(&format!(" \"{}\" -> \"{}\" [label=\"via group {}\"];\n", - name, dep, group_name)); + dot.push_str(&format!(" \"{}\" -> \"{}\" [label=\"via {}\"];\n", name, dep, group_name)); } } } - // Add direct dependencies + // Direct dependencies for dep in &config.depends_on { dot.push_str(&format!(" \"{}\" -> \"{}\";\n", name, dep)); } @@ -275,150 +223,89 @@ impl DependencyConfig { } } -impl Default for DependencyConfig { - fn default() -> Self { - Self { - checkpoints: HashMap::new(), - groups: HashMap::new(), - } - } -} - // ============================================================================ // DATA STRUCTURES // ============================================================================ -/// Represents a content reference that can be hashed #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[serde(tag = "type", rename_all = "lowercase")] pub enum ContentReference { - /// Single file reference File { path: PathBuf }, - - /// Directory reference (includes all files recursively) Directory { path: PathBuf, - /// Optional: specific files/patterns to include include_patterns: Option>, - /// Optional: files/patterns to exclude exclude_patterns: Option>, }, - - /// Multiple files/directories combined - Composite { - references: Vec, - }, + Composite { references: Vec }, } -/// Storage location for hash data #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[serde(tag = "storage", rename_all = "lowercase")] pub enum HashStorage { - /// Hash stored directly in state.jsonl Inline { hash: String }, - - /// Hash stored in external file External { hash_file: PathBuf }, } -/// Data lifecycle stage #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)] #[serde(rename_all = "lowercase")] pub 
enum DataStage { - /// Temporary/staged data (fast-changing, short-lived) Cache, - - /// Processed data (intermediate results, medium-lived) Data, - - /// Final storage (long-term, stable data) Storage, } impl DataStage { - /// Get default TTL (time-to-live) for this stage pub fn default_ttl(&self) -> Duration { match self { - DataStage::Cache => Duration::hours(24), // 1 day - DataStage::Data => Duration::days(7), // 1 week - DataStage::Storage => Duration::days(365), // 1 year + Self::Cache => Duration::hours(24), + Self::Data => Duration::days(7), + Self::Storage => Duration::days(365), } } - /// Get suggested revalidation interval for this stage pub fn revalidation_interval(&self) -> Duration { match self { - DataStage::Cache => Duration::hours(6), // Every 6 hours - DataStage::Data => Duration::days(1), // Daily - DataStage::Storage => Duration::days(30), // Monthly + Self::Cache => Duration::hours(6), + Self::Data => Duration::days(1), + Self::Storage => Duration::days(30), } } } -/// Enhanced state entry with content integrity tracking #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StateEntry { - /// Step/function name pub step_name: String, - - /// Whether this step is completed pub completed: bool, - - /// Completion timestamp pub completed_at: Option>, - - /// Content reference for validation pub content_reference: Option, - - /// Hash of the content pub content_hash: Option, - - /// Data lifecycle stage pub data_stage: Option, - - /// Custom TTL override (if None, uses stage default) pub ttl_override: Option, - - /// Last validation timestamp pub last_validated_at: Option>, - - /// Validation status pub validation_status: ValidationStatus, - - /// Dependencies (resolved automatically from config, stored for reference) #[serde(default)] pub dependencies: Vec, } -/// Validation status of a state entry #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[serde(rename_all = "lowercase")] pub enum ValidationStatus { - /// Not yet validated Unknown, - - /// Validated and content matches hash Valid, - - /// Validation failed (hash mismatch or content missing) Invalid { reason: String }, - - /// Expired (beyond TTL) Expired, - - /// Invalidated due to dependency failure DependencyFailed { failed_dependency: String }, } // ============================================================================ -// HASH COMPUTATION (UNCHANGED FROM ORIGINAL) +// HASH COMPUTATION // ============================================================================ /// Hash a single file using SHA-256 pub fn hash_file>(path: P) -> Result { let path = path.as_ref(); let file = fs::File::open(path) - .with_context(|| format!("Failed to open file: {}", path.display()))?; + .with_context(|| format!("Failed to open: {}", path.display()))?; let mut reader = BufReader::new(file); let mut hasher = Sha256::new(); @@ -426,9 +313,7 @@ pub fn hash_file>(path: P) -> Result { loop { let bytes_read = reader.read(&mut buffer)?; - if bytes_read == 0 { - break; - } + if bytes_read == 0 { break; } hasher.update(&buffer[..bytes_read]); } @@ -442,97 +327,72 @@ pub fn hash_directory>( exclude_patterns: Option<&[String]>, ) -> Result { let path = path.as_ref(); - if !path.is_dir() { - bail!("Path is not a directory: {}", path.display()); + bail!("Not a directory: {}", path.display()); } let mut files = Vec::new(); collect_files_recursive(path, &mut files, include_patterns, exclude_patterns)?; - files.sort(); if files.is_empty() { - return Ok(String::from("d41d8cd98f00b204e9800998ecf8427e")); + return 
Ok(String::from("d41d8cd98f00b204e9800998ecf8427e")); // Empty hash } - let mut combined_hasher = Sha256::new(); - + let mut hasher = Sha256::new(); for file_path in files { let rel_path = file_path.strip_prefix(path) .unwrap_or(&file_path) .to_string_lossy(); - combined_hasher.update(rel_path.as_bytes()); - - let file_hash = hash_file(&file_path)?; - combined_hasher.update(file_hash.as_bytes()); + hasher.update(rel_path.as_bytes()); + hasher.update(hash_file(&file_path)?.as_bytes()); } - Ok(format!("{:x}", combined_hasher.finalize())) + Ok(format!("{:x}", hasher.finalize())) } -/// Collect files recursively with pattern filtering fn collect_files_recursive( dir: &Path, files: &mut Vec, - include_patterns: Option<&[String]>, - exclude_patterns: Option<&[String]>, + include: Option<&[String]>, + exclude: Option<&[String]>, ) -> Result<()> { - if !dir.is_dir() { - return Ok(()); - } - for entry in fs::read_dir(dir)? { - let entry = entry?; - let path = entry.path(); + let path = entry?.path(); - if let Some(name) = path.file_name() { - if name.to_string_lossy().starts_with('.') { - continue; - } + // Skip hidden files + if path.file_name() + .and_then(|n| n.to_str()) + .map_or(false, |n| n.starts_with('.')) { + continue; } if path.is_dir() { - collect_files_recursive(&path, files, include_patterns, exclude_patterns)?; - } else if path.is_file() { - if should_include_file(&path, include_patterns, exclude_patterns) { - files.push(path); - } + collect_files_recursive(&path, files, include, exclude)?; + } else if path.is_file() && should_include(&path, include, exclude) { + files.push(path); } } - Ok(()) } -/// Check if file should be included based on patterns -fn should_include_file( - path: &Path, - include_patterns: Option<&[String]>, - exclude_patterns: Option<&[String]>, -) -> bool { +fn should_include(path: &Path, include: Option<&[String]>, exclude: Option<&[String]>) -> bool { let path_str = path.to_string_lossy(); - if let Some(excludes) = exclude_patterns { - for pattern in excludes { - if glob_match(&path_str, pattern) { - return false; - } + // Check exclusions first + if let Some(patterns) = exclude { + if patterns.iter().any(|p| glob_match(&path_str, p)) { + return false; } } - if let Some(includes) = include_patterns { - for pattern in includes { - if glob_match(&path_str, pattern) { - return true; - } - } - return false; + // Check inclusions + match include { + Some(patterns) => patterns.iter().any(|p| glob_match(&path_str, p)), + None => true, } - - true } -/// Simple glob pattern matching fn glob_match(path: &str, pattern: &str) -> bool { if pattern.contains('*') { let parts: Vec<&str> = pattern.split('*').collect(); @@ -551,25 +411,23 @@ pub fn hash_content_reference(reference: &ContentReference) -> Result { match reference { ContentReference::File { path } => hash_file(path), ContentReference::Directory { path, include_patterns, exclude_patterns } => { - hash_directory( - path, - include_patterns.as_deref(), - exclude_patterns.as_deref(), - ) + hash_directory(path, include_patterns.as_deref(), exclude_patterns.as_deref()) } ContentReference::Composite { references } => { - let mut combined_hasher = Sha256::new(); + let mut hasher = Sha256::new(); for ref_item in references { - let hash = hash_content_reference(ref_item)?; - combined_hasher.update(hash.as_bytes()); + hasher.update(hash_content_reference(ref_item)?.as_bytes()); } - Ok(format!("{:x}", combined_hasher.finalize())) + Ok(format!("{:x}", hasher.finalize())) } } } -/// Determine whether to store hash inline or 
-fn determine_hash_storage(hash: &str, base_dir: &Path) -> HashStorage {
+// ============================================================================
+// HASH STORAGE
+// ============================================================================
+
+fn determine_storage(hash: &str, base_dir: &Path) -> HashStorage {
     if hash.len() > INLINE_HASH_THRESHOLD {
         let hash_dir = base_dir.join(HASH_STORAGE_DIR);
         let hash_file = hash_dir.join(format!("{}{}", &hash[..16], HASH_FILE_EXT));
@@ -579,7 +437,6 @@ fn determine_hash_storage(hash: &str, base_dir: &Path) -> HashStorage {
     }
 }
 
-/// Store hash to file if external storage
 async fn store_hash(hash: &str, storage: &HashStorage) -> Result<()> {
     if let HashStorage::External { hash_file } = storage {
         if let Some(parent) = hash_file.parent() {
@@ -590,13 +447,11 @@ async fn store_hash(hash: &str, storage: &HashStorage) -> Result<()> {
     Ok(())
 }
 
-/// Load hash from storage
 async fn load_hash(storage: &HashStorage) -> Result<String> {
     match storage {
         HashStorage::Inline { hash } => Ok(hash.clone()),
         HashStorage::External { hash_file } => {
-            let content = async_fs::read_to_string(hash_file).await?;
-            Ok(content.trim().to_string())
+            Ok(async_fs::read_to_string(hash_file).await?.trim().to_string())
         }
     }
 }
@@ -607,43 +462,38 @@ async fn load_hash(storage: &HashStorage) -> Result<String> {
 
 /// Validate a single state entry
 async fn validate_entry(entry: &StateEntry) -> Result<ValidationStatus> {
+    // Check if completed
     if !entry.completed {
         return Ok(ValidationStatus::Unknown);
     }
 
-    let content_ref = match &entry.content_reference {
-        Some(r) => r,
-        None => return Ok(ValidationStatus::Unknown),
+    // Get content reference and hash
+    let (content_ref, hash_storage) = match (&entry.content_reference, &entry.content_hash) {
+        (Some(r), Some(h)) => (r, h),
+        _ => return Ok(ValidationStatus::Unknown),
     };
 
-    let stored_hash_storage = match &entry.content_hash {
-        Some(h) => h,
-        None => return Ok(ValidationStatus::Unknown),
-    };
-
-    let stored_hash = load_hash(stored_hash_storage).await?;
+    // Load stored hash
+    let stored_hash = load_hash(hash_storage).await?;
 
+    // Compute current hash
    let current_hash = match hash_content_reference(content_ref) {
         Ok(h) => h,
-        Err(e) => {
-            return Ok(ValidationStatus::Invalid {
-                reason: format!("Failed to compute hash: {}", e),
-            });
-        }
+        Err(e) => return Ok(ValidationStatus::Invalid {
+            reason: format!("Failed to compute hash: {}", e)
+        }),
     };
 
+    // Check hash match
     if stored_hash != current_hash {
-        return Ok(ValidationStatus::Invalid {
-            reason: "Hash mismatch".to_string(),
-        });
+        return Ok(ValidationStatus::Invalid { reason: "Hash mismatch".to_string() });
     }
 
+    // Check TTL
     if let Some(stage) = entry.data_stage {
         let ttl = entry.ttl_override.unwrap_or_else(|| stage.default_ttl());
-
         if let Some(completed_at) = entry.completed_at {
-            let age = Utc::now() - completed_at;
-            if age > ttl {
+            if Utc::now() - completed_at > ttl {
                 return Ok(ValidationStatus::Expired);
             }
         }
@@ -652,12 +502,11 @@ async fn validate_entry(entry: &StateEntry) -> Result<ValidationStatus> {
     Ok(ValidationStatus::Valid)
 }
 
-/// Validate all state entries and handle cascade invalidation
-pub async fn validate_all_entries(
-    entries: &mut HashMap<String, StateEntry>,
-) -> Result<ValidationReport> {
+/// Validate all entries with cascade invalidation
+async fn validate_all_entries(entries: &mut HashMap<String, StateEntry>) -> Result<ValidationReport> {
     let mut report = ValidationReport::default();
 
+    // Validate each entry
     for (name, entry) in entries.iter_mut() {
         let status = validate_entry(entry).await?;
         entry.validation_status = status.clone();
@@ -678,10 +527,8 @@ pub async fn validate_all_entries(
         }
     }
 
-    let mut invalidated = HashSet::new();
-    for name in &report.invalid_entries {
-        invalidated.insert(name.clone());
-    }
+    // Cascade invalidation
+    let mut invalidated: HashSet<String> = report.invalid_entries.iter().cloned().collect();
 
     loop {
         let mut newly_invalidated = Vec::new();
@@ -691,11 +538,9 @@ pub async fn validate_all_entries(
                 continue;
             }
 
-            for dep in &entry.dependencies {
-                if invalidated.contains(dep) {
-                    newly_invalidated.push((name.clone(), dep.clone()));
-                    break;
-                }
+            // Check if any dependency is invalidated
+            if let Some(failed_dep) = entry.dependencies.iter().find(|d| invalidated.contains(*d)) {
+                newly_invalidated.push((name.clone(), failed_dep.clone()));
             }
         }
 
@@ -708,9 +553,7 @@ pub async fn validate_all_entries(
             report.cascaded_invalidations.push(name.clone());
 
             if let Some(entry) = entries.get_mut(&name) {
-                entry.validation_status = ValidationStatus::DependencyFailed {
-                    failed_dependency: failed_dep,
-                };
+                entry.validation_status = ValidationStatus::DependencyFailed { failed_dependency: failed_dep };
             }
         }
     }
@@ -718,7 +561,6 @@ pub async fn validate_all_entries(
     Ok(report)
 }
 
-/// Validation report
 #[derive(Debug, Default)]
 pub struct ValidationReport {
     pub valid_count: usize,
@@ -762,10 +604,48 @@ impl ValidationReport {
 }
 
 // ============================================================================
-// STATE MANAGEMENT (UPDATED WITH CENTRALIZED DEPENDENCIES)
+// STATE MANAGEMENT
 // ============================================================================
 
 /// State manager with centralized dependency configuration
+///
+/// # Orchestration: Shutdown Flag + State Management
+///
+/// ## Happy Path (Normal Completion)
+/// 1. Work completes successfully
+/// 2. Call `update_entry()` with `completed: true`
+/// 3. StateEntry saved with timestamp and valid hash
+/// 4. On next run: skips already-completed step
+///
+/// ## Shutdown Path (Interrupted Work)
+/// 1. Shutdown flag is set via Ctrl+C handler
+/// 2. Long-running code checks: `if shutdown_flag.load(Ordering::SeqCst) { break }`
+/// 3. Before returning, call `mark_incomplete()`
+/// 4. StateEntry saved with `completed: false` and `ValidationStatus::Invalid`
+/// 5. On next run: retries the invalid step
+///
+/// ## Usage Pattern
+///
+/// ```rust
+/// let manager = StateManager::new(&paths.integrity_dir()).await?;
+/// let content_ref = directory_reference(&output_dir, None, None);
+///
+/// loop {
+///     if shutdown_flag.load(Ordering::SeqCst) {
+///         manager.mark_incomplete(
+///             step_name.to_string(),
+///             Some(content_ref.clone()),
+///             Some(DataStage::Data),
+///             "invalid due to shutdown".to_string(),
+///         ).await?;
+///         return Ok(());
+///     }
+///     // Do work...
+/// }
+///
+/// // Completed successfully
+/// manager.update_entry(step_name.to_string(), content_ref, DataStage::Data, None).await?;
+/// ```
 pub struct StateManager {
     base_dir: PathBuf,
     dependency_config: DependencyConfig,
@@ -775,52 +655,40 @@ impl StateManager {
     /// Create new state manager and load dependency configuration
     pub async fn new<P: AsRef<Path>>(base_dir: P) -> Result<Self> {
         let base_dir = base_dir.as_ref().to_path_buf();
-        let dependency_config = DependencyConfig::from_default_location(&base_dir).await?;
+        let config_path = base_dir.join(DEFAULT_DEPENDENCY_CONFIG);
+        let dependency_config = DependencyConfig::from_file(config_path).await?;
 
-        Ok(Self {
-            base_dir,
-            dependency_config,
-        })
+        Ok(Self { base_dir, dependency_config })
     }
 
     /// Create with explicit dependency configuration
-    pub fn with_config<P: AsRef<Path>>(
-        base_dir: P,
-        dependency_config: DependencyConfig,
-    ) -> Result<Self> {
+    pub fn with_config<P: AsRef<Path>>(base_dir: P, dependency_config: DependencyConfig) -> Result<Self> {
         dependency_config.validate()?;
-
         Ok(Self {
             base_dir: base_dir.as_ref().to_path_buf(),
             dependency_config,
         })
     }
 
-    /// Get the dependency configuration (for inspection/debugging)
+    /// Get the dependency configuration
     pub fn get_dependency_config(&self) -> &DependencyConfig {
         &self.dependency_config
     }
 
     /// Load all state entries from state.jsonl
     pub async fn load_entries(&self) -> Result<HashMap<String, StateEntry>> {
-        let mut entries = HashMap::new();
-
-        if !self.base_dir.exists() {
-            return Ok(entries);
-        }
-
         let state_file = self.base_dir.join("state.jsonl");
         if !state_file.exists() {
-            return Ok(entries);
+            return Ok(HashMap::new());
         }
 
         let content = async_fs::read_to_string(&state_file).await?;
+        let mut entries = HashMap::new();
         for line in content.lines() {
             if line.trim().is_empty() {
                 continue;
             }
-
             if let Ok(entry) = serde_json::from_str::<StateEntry>(line) {
                 entries.insert(entry.step_name.clone(), entry);
             }
@@ -835,65 +703,118 @@ impl StateManager {
             async_fs::create_dir_all(parent).await?;
         }
 
-        let mut file = async_fs::File::create(&self.base_dir.join("state.jsonl")).await?;
+        let mut file = async_fs::File::create(self.base_dir.join("state.jsonl")).await?;
         for entry in entries.values() {
-            let line = serde_json::to_string(&entry)? + "\n";
-            file.write_all(line.as_bytes()).await?;
+            file.write_all((serde_json::to_string(&entry)? + "\n").as_bytes()).await?;
         }
         file.sync_all().await?;
-
         Ok(())
     }
 
-    /// Create or update a state entry with integrity tracking
-    /// Dependencies are now resolved automatically from config
-    pub async fn update_entry(
-        &self,
-        step_name: String,
-        content_reference: ContentReference,
-        data_stage: DataStage,
-        ttl_override: Option<Duration>,
-    ) -> Result<StateEntry> {
+    /// Create an empty entry for a step (can be updated later)
+    ///
+    /// Creates a placeholder entry that marks the step as incomplete and unknown,
+    /// allowing you to later mark it as valid or invalid via `mark_valid()` or `mark_invalid()`.
+    ///
+    /// # Example
+    /// ```rust
+    /// let manager = StateManager::new(&paths.integrity_dir()).await?;
+    /// let content_ref = file_reference("data/output/example.json");
+    ///
+    /// // Start tracking a long step
+    /// let mut entry = manager.create_entry("long_operation".to_string(), content_ref.clone(), DataStage::Data).await?;
+    ///
+    /// // Do work...
+    ///
+    /// // Mark as valid when done
+    /// entry.content_reference = Some(content_ref);
+    /// entry.data_stage = Some(DataStage::Data);
+    /// manager.mark_valid(entry).await?;
+    /// ```
+    pub async fn create_entry(&self, step_name: String, content_reference: ContentReference, data_stage: DataStage) -> Result<StateEntry> {
         // Resolve dependencies from configuration
         let dependencies = self.dependency_config
             .get_all_dependencies(&step_name)
-            .unwrap_or_else(|_| {
-                // If checkpoint not in config, no dependencies
-                Vec::new()
-            });
+            .unwrap_or_default();
 
-        // Compute hash
-        let hash = hash_content_reference(&content_reference)?;
-
-        // Determine storage
-        let storage = determine_hash_storage(&hash, &self.base_dir);
-
-        // Store hash if external
-        store_hash(&hash, &storage).await?;
-
-        // Create entry
+        // Create empty entry with Unknown status
         let entry = StateEntry {
             step_name: step_name.clone(),
-            completed: true,
-            completed_at: Some(Utc::now()),
+            completed: false,
+            completed_at: None,
             content_reference: Some(content_reference),
-            content_hash: Some(storage),
+            content_hash: None,
             data_stage: Some(data_stage),
-            ttl_override,
+            ttl_override: None,
             last_validated_at: Some(Utc::now()),
-            validation_status: ValidationStatus::Valid,
+            validation_status: ValidationStatus::Unknown,
             dependencies,
         };
 
-        // Load existing entries
+        // Update and save
         let mut entries = self.load_entries().await?;
+        entries.insert(step_name, entry.clone());
+        self.save_entries(&entries).await?;
+
+        Ok(entry)
+    }
+
+    /// Mark a StateEntry as valid and save to disk
+    ///
+    /// Updates the entry with:
+    /// - `completed: true`
+    /// - `completed_at: now`
+    /// - `validation_status: Valid`
+    /// - Computes and stores content hash
+    ///
+    /// # Requires
+    /// - `entry.content_reference` must be `Some()`
+    /// - `entry.data_stage` must be `Some()`
+    pub async fn mark_valid(&self, mut entry: StateEntry) -> Result<StateEntry> {
+        // Get content reference and data stage (required)
+        let content_reference = entry.content_reference.as_ref()
+            .ok_or_else(|| anyhow::anyhow!("content_reference is required to mark entry valid"))?;
+        let data_stage = entry.data_stage
+            .ok_or_else(|| anyhow::anyhow!("data_stage is required to mark entry valid"))?;
+
+        // Compute and store hash
+        let hash = hash_content_reference(content_reference)?;
+        let storage = determine_storage(&hash, &self.base_dir);
+        store_hash(&hash, &storage).await?;
 
         // Update entry
-        entries.insert(step_name, entry.clone());
+        entry.completed = true;
+        entry.completed_at = Some(Utc::now());
+        entry.content_hash = Some(storage);
+        entry.data_stage = Some(data_stage);
+        entry.last_validated_at = Some(Utc::now());
+        entry.validation_status = ValidationStatus::Valid;
 
         // Save
+        let mut entries = self.load_entries().await?;
+        entries.insert(entry.step_name.clone(), entry.clone());
+        self.save_entries(&entries).await?;
+
+        Ok(entry)
+    }
+
+    /// Mark a StateEntry as invalid and save to disk
+    ///
+    /// Updates the entry with:
+    /// - `completed: false`
+    /// - `completed_at: None`
+    /// - `validation_status: Invalid { reason }`
+    pub async fn mark_invalid(&self, mut entry: StateEntry, reason: String) -> Result<StateEntry> {
+        // Update entry
+        entry.completed = false;
+        entry.completed_at = None;
+        entry.last_validated_at = Some(Utc::now());
+        entry.validation_status = ValidationStatus::Invalid { reason };
+
+        // Save
+        let mut entries = self.load_entries().await?;
+        entries.insert(entry.step_name.clone(), entry.clone());
         self.save_entries(&entries).await?;
 
         Ok(entry)
@@ -911,20 +832,6 @@ impl StateManager {
         }
     }
 
-    /// Invalidate a specific entry
-    pub async fn invalidate_entry(&self, step_name: &str, reason: String) -> Result<()> {
-        let mut entries = self.load_entries().await?;
-
-        if let Some(entry) = entries.get_mut(step_name) {
-            entry.validation_status = ValidationStatus::Invalid { reason };
-            entry.last_validated_at = Some(Utc::now());
-        }
-
-        self.save_entries(&entries).await?;
-
-        Ok(())
-    }
-
     /// Run full validation on all entries
     pub async fn validate_all(&self) -> Result<ValidationReport> {
         let mut entries = self.load_entries().await?;
@@ -982,9 +889,7 @@ impl StateManager {
 
 /// Create a simple file reference
 pub fn file_reference<P: AsRef<Path>>(path: P) -> ContentReference {
-    ContentReference::File {
-        path: path.as_ref().to_path_buf(),
-    }
+    ContentReference::File { path: path.as_ref().to_path_buf() }
 }
 
 /// Create a directory reference
diff --git a/src/util/macros.rs b/src/util/macros.rs
index 9188d36..ea2c414 100644
--- a/src/util/macros.rs
+++ b/src/util/macros.rs
@@ -7,4 +7,22 @@ macro_rules! check_shutdown {
             return Ok(());
         }
     };
+}
+
+/// Mark incomplete state on shutdown
+/// Usage: mark_incomplete_on_shutdown!(&manager, "step_name", content_ref, DataStage::Data, &shutdown_flag);
+#[macro_export]
+macro_rules! mark_incomplete_on_shutdown {
+    ($manager:expr, $step_name:expr, $content_ref:expr, $data_stage:expr, $shutdown_flag:expr) => {
+        if $shutdown_flag.load(std::sync::atomic::Ordering::SeqCst) {
+            $manager
+                .mark_incomplete(
+                    $step_name.to_string(),
+                    $content_ref,
+                    $data_stage,
+                    "Incomplete due to shutdown".to_string(),
+                )
+                .await?;
+        }
+    };
 }
\ No newline at end of file
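
For reviewers, a minimal sketch (not part of the patch) of how a long-running step could drive the `create_entry` / `mark_valid` / `mark_invalid` lifecycle introduced in `src/util/integrity.rs` together with the shutdown flag. The function name, output path, item count, and the module import path are illustrative assumptions, not code from this change:

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

// Assumed import; adjust to the actual module path of the integrity helpers.
// use crate::util::integrity::{DataStage, StateManager, file_reference};

async fn example_step(
    manager: &StateManager,
    shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<()> {
    // Register the step up front: completed = false, ValidationStatus::Unknown.
    let content_ref = file_reference("data/output/example_step.jsonl");
    let entry = manager
        .create_entry("example_step".to_string(), content_ref, DataStage::Data)
        .await?;

    for processed in 0..1_000u32 {
        if shutdown_flag.load(Ordering::SeqCst) {
            // Interrupted: persist an Invalid entry so the next run retries this step.
            manager
                .mark_invalid(entry, format!("Incomplete due to shutdown: {} items", processed))
                .await?;
            return Ok(());
        }
        // ... process one item ...
    }

    // Finished: hash the referenced output and persist a Valid, completed entry.
    manager.mark_valid(entry).await?;
    Ok(())
}
```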