diff --git a/SHUTDOWN_STATE_COORDINATION.md b/SHUTDOWN_STATE_COORDINATION.md
deleted file mode 100644
index 42780dc..0000000
--- a/SHUTDOWN_STATE_COORDINATION.md
+++ /dev/null
@@ -1,373 +0,0 @@

# Shutdown Flag & State Management Orchestration

## Problem Statement

Previously, the shutdown flag and StateManager worked independently:

- **Shutdown Flag**: an `Arc<AtomicBool>` that signals running tasks to stop execution
- **StateManager**: tracks completion of work with hash validation and dependencies

This caused a critical issue: **when shutdown occurred mid-process, no state was recorded**, so on restart the entire step was retried from scratch, losing all progress.

## Solution: Coordinated Lifecycle Management

### Overview

The shutdown flag and StateManager now work together in a coordinated lifecycle:

```
Work In Progress
        ↓
Shutdown Signal (Ctrl+C)
        ↓
Record Incomplete State
        ↓
Return & Cleanup
        ↓
Next Run: Retry From Checkpoint
```

### Core Concepts

#### 1. **StateEntry Lifecycle**

Each checkpoint has two completion states:

```rust
// Happy Path: Work Completed Successfully
StateEntry {
    completed: true,                 // ✓ Finished
    completed_at: Some(timestamp),   // When it finished
    validation_status: Valid,        // Hash is current
}

// Shutdown Path: Work Interrupted
StateEntry {
    completed: false,                // ✗ Incomplete
    completed_at: None,              // Never finished
    validation_status: Invalid {     // Won't be skipped on the next run
        reason: "Incomplete due to shutdown"
    }
}
```

#### 2. **State Management Functions**

Two key functions coordinate completion and shutdown:

```rust
// Normal Completion (happy path)
manager.update_entry(
    "step_name".to_string(),
    content_reference,
    DataStage::Data,
    None,
).await?;

// Shutdown Completion (incomplete work)
manager.mark_incomplete(
    "step_name".to_string(),
    Some(content_reference),
    Some(DataStage::Data),
    "Incomplete: processed 50 of 1000 items".to_string(),
).await?;
```
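For reference, the fields these two functions write correspond to a state entry shaped roughly like the sketch below. This is a minimal reconstruction from the JSON and snippets in this document, not the real definition: the struct/enum names, the `dependencies` field, and the use of `chrono` timestamps are assumptions.

```rust
use chrono::{DateTime, Utc};

/// Sketch of the per-step state record (assumed shape).
#[derive(Debug, Clone)]
pub struct StateEntry {
    pub step_name: String,
    /// `true` only when `update_entry()` ran without a shutdown signal.
    pub completed: bool,
    /// Set on successful completion; `None` for interrupted work.
    pub completed_at: Option<DateTime<Utc>>,
    /// Steps that must be valid before this one can run (assumed field).
    pub dependencies: Vec<String>,
    /// Decides whether the next run skips or retries this step.
    pub validation_status: ValidationStatus,
}

/// Sketch of the validation outcome attached to each entry.
#[derive(Debug, Clone)]
pub enum ValidationStatus {
    /// Content hash matches; the step can be skipped.
    Valid,
    /// The step must be retried; `reason` says why
    /// (e.g. "Incomplete due to shutdown").
    Invalid { reason: String },
}
```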
### Implementation Pattern

Every long-running function should follow this pattern:

```rust
pub async fn process_large_dataset(
    paths: &DataPaths,
    shutdown_flag: &Arc<AtomicBool>,
) -> Result<usize> {
    // 1. Initialize state manager and content reference
    let manager = StateManager::new(&paths.integrity_dir()).await?;
    let step_name = "process_large_dataset";
    let content_ref = directory_reference(&output_dir, None, None);

    let mut processed_count = 0;

    // 2. Main processing loop
    loop {
        // CRITICAL: Check shutdown at key points
        if shutdown_flag.load(Ordering::SeqCst) {
            logger::log_warn("Shutdown detected - marking state as incomplete").await;

            // Record incomplete state for retry
            manager.mark_incomplete(
                step_name.to_string(),
                Some(content_ref.clone()),
                Some(DataStage::Data),
                format!("Incomplete: processed {} items", processed_count),
            ).await?;

            return Ok(processed_count);
        }

        // 3. Do work on the next item...
        processed_count += 1;

        // ...and break out of the loop once every item has been processed.
    }

    // 4. If we reach here, work is complete
    // Shutdown check BEFORE marking complete
    if shutdown_flag.load(Ordering::SeqCst) {
        manager.mark_incomplete(
            step_name.to_string(),
            Some(content_ref),
            Some(DataStage::Data),
            format!("Incomplete during final stage: processed {} items", processed_count),
        ).await?;
    } else {
        // Only mark complete if shutdown was NOT signaled
        manager.update_entry(
            step_name.to_string(),
            content_ref,
            DataStage::Data,
            None,
        ).await?;
    }

    Ok(processed_count)
}
```

### Why Two Functions Are Different

| Aspect | `update_entry()` | `mark_incomplete()` |
|--------|------------------|---------------------|
| **Use Case** | Normal completion | Shutdown/abort |
| `completed` | `true` | `false` |
| `completed_at` | `Some(now)` | `None` |
| `validation_status` | `Valid` | `Invalid { reason }` |
| Next Run | **Skipped** (already done) | **Retried** (incomplete) |
| Hash Stored | Always | Optional (may fail to compute) |
| Semantics | "This work is finished" | "This work wasn't finished" |

### Shutdown Flag Setup

The shutdown flag is initialized in `main.rs`:

```rust
let shutdown_flag = Arc::new(AtomicBool::new(false));

// Ctrl+C handler
fn setup_shutdown_handler(
    shutdown_flag: Arc<AtomicBool>,
    pool: Arc<...>,
    proxy_pool: Option<Arc<...>>,
) {
    tokio::spawn(async move {
        tokio::signal::ctrl_c().await.ok();
        logger::log_info("Ctrl+C received – shutting down gracefully...").await;

        // Set flag to signal all tasks to stop
        shutdown_flag.store(true, Ordering::SeqCst);

        // Wait for tasks to clean up
        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;

        // Final cleanup
        perform_full_cleanup(&pool, proxy_pool.as_deref()).await;
        std::process::exit(0);
    });
}
```

### Multi-Level Shutdown Checks

For efficiency, shutdown is checked at different levels:

```rust
// 1. Macro for quick checks (returns early)
check_shutdown!(shutdown_flag);

// 2. Loop check (inside tight processing loops)
if shutdown_flag.load(Ordering::SeqCst) {
    break;
}

// 3. Final completion check (before marking complete)
if shutdown_flag.load(Ordering::SeqCst) {
    manager.mark_incomplete(...).await?;
} else {
    manager.update_entry(...).await?;
}
```
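The `check_shutdown!` macro used at level 1 above is not shown in this document. A minimal sketch of one way such a macro could be defined follows; the real macro may also log or record incomplete state before returning, and the `T: Default` early-return value is purely an assumption of this sketch.

```rust
/// Sketch only: early-return when shutdown has been signalled.
/// Assumes the surrounding function returns `Result<T>` with `T: Default`;
/// the real macro in the codebase may behave differently.
macro_rules! check_shutdown {
    ($flag:expr) => {
        if $flag.load(::std::sync::atomic::Ordering::SeqCst) {
            // Bail out early; the caller is expected to record
            // incomplete state where that matters.
            return Ok(Default::default());
        }
    };
}
```

Using the fully qualified `Ordering` path keeps the macro usable without extra imports at the call site.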
### Practical Example: Update Companies

The `update_companies` function shows the full pattern:

```rust
pub async fn update_companies(
    paths: &DataPaths,
    config: &Config,
    pool: &Arc<...>,
    shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<usize> {
    let manager = StateManager::new(&paths.integrity_dir()).await?;
    let step_name = "update_companies";
    let content_reference = directory_reference(...);

    // Process companies...
    loop {
        if shutdown_flag.load(Ordering::SeqCst) {
            logger::log_warn("Shutdown detected").await;
            break;
        }
        // Process items...
    }

    // Final checkpoint: wait for the background writer task
    // (spawned earlier in the full function, omitted here)
    let (final_count, _, _) = writer_task.await.unwrap_or((0, 0, 0));

    // CRITICAL: Check shutdown before marking complete
    if shutdown_flag.load(Ordering::SeqCst) {
        manager.mark_incomplete(
            step_name.to_string(),
            Some(content_reference),
            Some(DataStage::Data),
            format!("Incomplete: processed {} items", final_count),
        ).await?;
    } else {
        manager.update_entry(
            step_name.to_string(),
            content_reference,
            DataStage::Data,
            None,
        ).await?;
    }

    Ok(final_count)
}
```

### State Tracking in `state.jsonl`

With this pattern, the state file captures work progression:

**Before Shutdown:**
```jsonl
{"step_name":"update_companies","completed":false,"validation_status":{"Invalid":"Processing 523 items..."},"dependencies":["lei_figi_mapping_complete"]}
```

**After Completion:**
```jsonl
{"step_name":"update_companies","completed":true,"completed_at":"2026-01-14T21:30:45Z","validation_status":"Valid","dependencies":["lei_figi_mapping_complete"]}
```

**After Resume:**
- The system detects `completed: false` and `validation_status: Invalid`
- It retries `update_companies` from the checkpoint
- It uses the `.log` files to skip already-processed items
- On success, it updates the entry to `completed: true`

## Benefits

### 1. **Crash Safety**
- Progress is recorded at shutdown
- No work is lost on restart
- Checkpoints prevent reprocessing

### 2. **Graceful Degradation**
- Long-running functions can be interrupted
- State is always consistent
- Dependencies are tracked

### 3. **Debugging**
- `state.jsonl` shows exactly which steps were incomplete
- A reason is recorded for every incomplete state
- Progress counts help diagnose where processing was interrupted

### 4. **Consistency**
- `update_entry()` is only used for complete work
- `mark_incomplete()` is only used for interrupted work
- There are no ambiguous states

## Common Mistakes to Avoid

### ❌ Don't: Call `update_entry()` without a shutdown check
```rust
// BAD: Might mark a shut-down step as complete!
manager.update_entry(...).await?;
```

### ✅ Do: Check shutdown before `update_entry()`
```rust
// GOOD: Only marks complete if not shutting down
if !shutdown_flag.load(Ordering::SeqCst) {
    manager.update_entry(...).await?;
}
```

### ❌ Don't: Forget `mark_incomplete()` on shutdown
```rust
if shutdown_flag.load(Ordering::SeqCst) {
    return Ok(()); // Lost progress!
}
```

### ✅ Do: Record incomplete state
```rust
if shutdown_flag.load(Ordering::SeqCst) {
    manager.mark_incomplete(...).await?;
    return Ok(());
}
```

### ❌ Don't: Store partial data without recording state
```rust
// Write output, but forget to track it in state
write_output(...).await?;
// If shutdown happens here, the next run won't know the step is incomplete
```

### ✅ Do: Update output and state together
```rust
// Update output and state together
write_output(...).await?;
manager.update_entry(...).await?; // Or mark_incomplete() if shutting down
```
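A simple way to make the ✅ patterns above the default is to route the final decision through one helper. The sketch below is not from the codebase: `finalize_step` is a hypothetical function, and the `ContentReference` type name is assumed (the document only shows `directory_reference(...)` producing such a value).

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical helper (not in the codebase): chooses between
/// `update_entry()` and `mark_incomplete()` in one place, so callers
/// cannot accidentally mark an interrupted step as complete.
async fn finalize_step(
    manager: &StateManager,
    shutdown_flag: &Arc<AtomicBool>,
    step_name: &str,
    content_ref: ContentReference, // assumed type name
    processed: usize,
) -> anyhow::Result<()> {
    if shutdown_flag.load(Ordering::SeqCst) {
        // Interrupted: record incomplete state so the next run retries.
        manager.mark_incomplete(
            step_name.to_string(),
            Some(content_ref),
            Some(DataStage::Data),
            format!("Incomplete: processed {} items", processed),
        ).await?;
    } else {
        // Clean finish: record completion so the next run skips this step.
        manager.update_entry(
            step_name.to_string(),
            content_ref,
            DataStage::Data,
            None,
        ).await?;
    }
    Ok(())
}
```

With a helper like this, both `process_large_dataset` and `update_companies` would end in a single `finalize_step(...)` call instead of repeating the if/else.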
## Testing the Orchestration

### Test 1: Normal Completion
```bash
cargo run                    # Let it finish
grep completed state.jsonl   # Finished steps should show "completed":true
```

### Test 2: Shutdown & Restart
```bash
# Terminal 1:
cargo run                    # Running...
# Wait a bit

# Terminal 2:
pkill -INT -f "web_scraper"  # Send SIGINT (equivalent to Ctrl+C)

# Check state:
grep update_companies state.jsonl   # Should show "completed":false

# Restart:
cargo run                    # Continues from the checkpoint
```

### Test 3: Verify No Reprocessing
```bash
# Modify a file to add 1000 test items
# First run: process the items, interrupt at ~500 (Ctrl+C)
# Check state.jsonl - it shows "Incomplete: processed 500 items"
# Second run: should skip the first 500 and process the remaining 500
```

## Summary

The coordinated shutdown & state system ensures:

1. **Work is never lost** - progress is recorded at shutdown
2. **No reprocessing** - checkpoints skip completed items
3. **Transparent state** - `state.jsonl` shows exactly what is done
4. **Easy debugging** - the reason for incompleteness is recorded
5. **Graceful scaling** - works with concurrent tasks and hard resets
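## Appendix: Simulating Shutdown in a Test

Test 2 can also be exercised without sending a real signal by flipping the flag directly. The sketch below is illustrative only: it reuses `process_large_dataset`, `DataPaths`, and the `integrity_dir()`/`state.jsonl` layout described above, while the `DataPaths::new` constructor and the exact state-file path are assumptions.

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

#[tokio::test]
async fn shutdown_marks_step_incomplete() -> anyhow::Result<()> {
    // Assumed constructor; point it at a throwaway directory.
    let paths = DataPaths::new("target/test-data")?;
    let shutdown_flag = Arc::new(AtomicBool::new(false));

    // Flip the flag shortly after the step starts, simulating Ctrl+C.
    let flag = Arc::clone(&shutdown_flag);
    tokio::spawn(async move {
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        flag.store(true, Ordering::SeqCst);
    });

    // The step should return early instead of running to completion.
    let processed = process_large_dataset(&paths, &shutdown_flag).await?;
    println!("interrupted after {processed} items");

    // The state file should record the step as incomplete so that the
    // next run retries it from the checkpoint.
    let state_path = paths.integrity_dir().join("state.jsonl"); // assumed location
    let state = tokio::fs::read_to_string(state_path).await?;
    assert!(state.contains(r#""step_name":"process_large_dataset""#));
    assert!(state.contains(r#""completed":false"#));
    Ok(())
}
```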