added cross-compatibility between shutdown flag and state entries

This commit is contained in:
2026-01-15 00:22:55 +01:00
parent f4b20f824d
commit 75ab1969c7
14 changed files with 850 additions and 543 deletions

View File

@@ -0,0 +1,373 @@
# Shutdown Flag & State Management Orchestration
## Problem Statement
Previously, the shutdown flag and StateManager worked independently:
- **Shutdown Flag**: `Arc<AtomicBool>` signals code to stop execution
- **StateManager**: Tracks completion of work with hash validation and dependencies
This caused a critical issue: **when shutdown occurred mid-process, no state was recorded**, so on restart the entire step would be retried from scratch, losing all progress.
## Solution: Coordinated Lifecycle Management
### Overview
The shutdown flag and StateManager now work together in a coordinated lifecycle:
```
Work In Progress
        ↓
Shutdown Signal (Ctrl+C)
        ↓
Record Incomplete State
        ↓
Return & Cleanup
        ↓
Next Run: Retry From Checkpoint
```
### Core Concepts
#### 1. **StateEntry Lifecycle**
Each checkpoint has two completion states:
```rust
// Happy Path: Work Completed Successfully
StateEntry {
completed: true, // ✓ Finished
completed_at: Some(timestamp), // When it finished
validation_status: Valid, // Hash is current
}
// Shutdown Path: Work Interrupted
StateEntry {
completed: false, // ✗ Incomplete
completed_at: None, // Never finished
validation_status: Invalid { // Won't be skipped
reason: "Incomplete due to shutdown"
}
}
```
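For reference, here is a minimal sketch of what `StateEntry` might look like, inferred from the fields above and the `state.jsonl` lines later in this document. The authoritative definition lives in `util::integrity` and may differ in serialization details:
```rust
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

/// Sketch of a checkpoint record as persisted to state.jsonl (inferred).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StateEntry {
    pub step_name: String,
    pub completed: bool,
    pub completed_at: Option<DateTime<Utc>>,
    pub validation_status: ValidationStatus,
    #[serde(default)]
    pub dependencies: Vec<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ValidationStatus {
    Valid,
    Invalid { reason: String },
}
```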
#### 2. **State Management Functions**
Two key functions orchestrate the shutdown/completion dance:
```rust
// Normal Completion (happy path)
manager.update_entry(
"step_name".to_string(),
content_reference,
DataStage::Data,
None,
).await?;
// Shutdown Completion (incomplete work)
manager.mark_incomplete(
"step_name".to_string(),
Some(content_reference),
Some(DataStage::Data),
"Incomplete: processed 50 of 1000 items".to_string(),
).await?;
```
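Inferred from these call sites, the two functions plausibly expose signatures along the following lines. The stub types and `unimplemented!` bodies are placeholders, not the real implementation:
```rust
use std::time::Duration;
use anyhow::Result;

// Stand-ins for the real types in util::integrity.
pub struct ContentReference;
pub enum DataStage { Cache, Data }
pub struct StateManager;

impl StateManager {
    /// Normal completion: completed = true, completed_at = now, Valid.
    pub async fn update_entry(
        &self,
        _step_name: String,
        _content: ContentReference,
        _stage: DataStage,
        _ttl_override: Option<Duration>,
    ) -> Result<()> {
        unimplemented!("real version hashes content and appends to state.jsonl")
    }

    /// Interrupted work: completed = false, Invalid { reason }.
    pub async fn mark_incomplete(
        &self,
        _step_name: String,
        _content: Option<ContentReference>,
        _stage: Option<DataStage>,
        _reason: String,
    ) -> Result<()> {
        unimplemented!("real version records the reason for the retry")
    }
}
```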
### Implementation Pattern
Every long-running function should follow this pattern:
```rust
pub async fn process_large_dataset(
paths: &DataPaths,
shutdown_flag: &Arc<AtomicBool>,
) -> Result<usize> {
    // 1. Initialize state manager and content reference
    let manager = StateManager::new(&paths.integrity_dir()).await?;
    let step_name = "process_large_dataset";
    let output_dir = paths.data_dir().join("large_dataset"); // illustrative output location
    let content_ref = directory_reference(&output_dir, None, None);
    let mut processed_count = 0;
    // 2. Main processing loop (iterating real work items; `load_work_items`
    //    is an illustrative stand-in for however the step enumerates work)
    for item in load_work_items(paths).await? {
// CRITICAL: Check shutdown at key points
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown detected - marking state as incomplete").await;
// Record incomplete state for retry
manager.mark_incomplete(
step_name.to_string(),
Some(content_ref.clone()),
Some(DataStage::Data),
format!("Incomplete: processed {} items", processed_count),
).await?;
return Ok(processed_count);
}
        // 3. Do work on `item` (`process_item` is likewise illustrative)...
        process_item(item).await?;
        processed_count += 1;
    }
// 4. If we reach here, work is complete
// Shutdown check BEFORE marking complete
if shutdown_flag.load(Ordering::SeqCst) {
manager.mark_incomplete(
step_name.to_string(),
Some(content_ref),
Some(DataStage::Data),
format!("Incomplete during final stage: processed {} items", processed_count),
).await?;
} else {
// Only mark complete if shutdown was NOT signaled
manager.update_entry(
step_name.to_string(),
content_ref,
DataStage::Data,
None,
).await?;
}
Ok(processed_count)
}
```
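A pipeline driver then threads one flag through every step. A sketch, under the assumption that each step records its own state the way `process_large_dataset` does above:
```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

async fn run_pipeline(paths: &DataPaths, shutdown_flag: Arc<AtomicBool>) -> anyhow::Result<()> {
    // The step checks the flag internally and records incomplete state itself,
    // so the driver only passes the flag along and stops scheduling new work.
    let count = process_large_dataset(paths, &shutdown_flag).await?;
    if shutdown_flag.load(Ordering::SeqCst) {
        return Ok(()); // incomplete state was already recorded inside the step
    }
    logger::log_info(&format!("Processed {} items", count)).await;
    Ok(())
}
```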
### Why Two Functions Are Different
| Aspect | `update_entry()` | `mark_incomplete()` |
|--------|------------------|-------------------|
| **Use Case** | Normal completion | Shutdown/abort |
| `completed` | `true` | `false` |
| `completed_at` | `Some(now)` | `None` |
| `validation_status` | `Valid` | `Invalid { reason }` |
| Next Run | **Skipped** (already done) | **Retried** (incomplete) |
| Hash Stored | Always | Optional (may fail to compute) |
| Semantics | "This work is finished" | "This work wasn't finished" |
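On the next run, this distinction drives the skip-or-retry decision at the top of each step. A sketch using the `is_step_valid` helper that appears throughout the diffs below:
```rust
// Skip the step entirely if a prior run finished it and the recorded
// content hash is still valid; anything marked incomplete falls through.
if manager.is_step_valid(step_name).await? {
    logger::log_info("  Step already completed and valid - skipping").await;
    return Ok(0); // a real step would return its cached count here
}
// completed == false or Invalid { .. } lands here: retry from checkpoint.
```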
### Shutdown Flag Setup
The shutdown flag is initialized in `main.rs`:
```rust
let shutdown_flag = Arc::new(AtomicBool::new(false));
// Ctrl+C handler
fn setup_shutdown_handler(
shutdown_flag: Arc<AtomicBool>,
pool: Arc<ChromeDriverPool>,
proxy_pool: Option<Arc<DockerVpnProxyPool>>,
) {
tokio::spawn(async move {
tokio::signal::ctrl_c().await.ok();
logger::log_info("Ctrl+C received shutting down gracefully...").await;
// Set flag to signal all tasks to stop
shutdown_flag.store(true, Ordering::SeqCst);
// Wait for tasks to clean up
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
// Final cleanup
perform_full_cleanup(&pool, proxy_pool.as_deref()).await;
std::process::exit(0);
});
}
```
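Wiring this into `main` might look like the following sketch (pool, proxy pool, and paths construction elided; `run_pipeline` is the hypothetical driver sketched earlier):
```rust
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // pool, proxy_pool, and paths are constructed earlier - elided here
    let shutdown_flag = Arc::new(AtomicBool::new(false));

    // The handler gets its own clones; the originals flow into the pipeline.
    setup_shutdown_handler(shutdown_flag.clone(), pool.clone(), proxy_pool.clone());

    run_pipeline(&paths, shutdown_flag).await
}
```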
### Multi-Level Shutdown Checks
For efficiency, shutdown is checked at different levels:
```rust
// 1. Macro for quick checks (returns early)
check_shutdown!(shutdown_flag);
// 2. Loop check (inside tight processing loops)
if shutdown_flag.load(Ordering::SeqCst) {
break;
}
// 3. Final completion check (before marking complete)
if shutdown_flag.load(Ordering::SeqCst) {
manager.mark_incomplete(...).await?;
} else {
manager.update_entry(...).await?;
}
```
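The `check_shutdown!` macro is defined in `util` (its closing lines appear in the last diff of this commit); reconstructed in full, it plausibly reads:
```rust
#[macro_export]
macro_rules! check_shutdown {
    ($flag:expr) => {
        if $flag.load(std::sync::atomic::Ordering::SeqCst) {
            return Ok(());
        }
    };
}
```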
### Practical Example: Update Companies
The `update_companies` function shows the full pattern:
```rust
pub async fn update_companies(
paths: &DataPaths,
config: &Config,
pool: &Arc<ChromeDriverPool>,
shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<usize> {
let manager = StateManager::new(&paths.integrity_dir()).await?;
let step_name = "update_companies";
let content_reference = directory_reference(...);
// Process companies...
loop {
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown detected").await;
break;
}
// Process items...
}
// Final checkpoint
let (final_count, _, _) = writer_task.await.unwrap_or((0, 0, 0));
// CRITICAL: Check shutdown before marking complete
if shutdown_flag.load(Ordering::SeqCst) {
manager.mark_incomplete(
step_name.to_string(),
Some(content_reference),
Some(DataStage::Data),
format!("Incomplete: processed {} items", final_count),
).await?;
} else {
manager.update_entry(
step_name.to_string(),
content_reference,
DataStage::Data,
None,
).await?;
}
Ok(final_count)
}
```
### State Tracking in `state.jsonl`
With this pattern, the state file captures work progression:
**Before Shutdown:**
```jsonl
{"step_name":"update_companies","completed":false,"validation_status":{"Invalid":"Processing 523 items..."},"dependencies":["lei_figi_mapping_complete"]}
```
**After Completion:**
```jsonl
{"step_name":"update_companies","completed":true,"completed_at":"2026-01-14T21:30:45Z","validation_status":"Valid","dependencies":["lei_figi_mapping_complete"]}
```
**After Resume:**
- System detects `completed: false` and `validation_status: Invalid`
- Retries `update_companies` from checkpoint
- Uses `.log` files to skip already-processed items
- On success, updates to `completed: true`
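Since `state.jsonl` is plain JSON Lines, incomplete steps can also be pulled out programmatically for debugging. A sketch against the line format shown above:
```rust
use serde_json::Value;

/// List step names whose recorded state is incomplete (debugging aid).
async fn incomplete_steps(state_path: &std::path::Path) -> anyhow::Result<Vec<String>> {
    let text = tokio::fs::read_to_string(state_path).await?;
    let mut steps = Vec::new();
    for line in text.lines().filter(|l| !l.trim().is_empty()) {
        let entry: Value = serde_json::from_str(line)?;
        if entry["completed"].as_bool() == Some(false) {
            if let Some(name) = entry["step_name"].as_str() {
                steps.push(name.to_string());
            }
        }
    }
    Ok(steps)
}
```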
## Benefits
### 1. **Crash Safety**
- Progress is recorded at shutdown
- No lost work on restart
- Checkpoints prevent reprocessing
### 2. **Graceful Degradation**
- Long-running functions can be interrupted
- State is always consistent
- Dependencies are tracked
### 3. **Debugging**
- `state.jsonl` shows exactly which steps were incomplete
- Reasons are recorded for incomplete states
- Progress counts help diagnose where processing stopped
### 4. **Consistency**
- `update_entry()` only used for complete work
- `mark_incomplete()` only used for interrupted work
- No ambiguous states
## Common Mistakes to Avoid
### ❌ Don't: Call `update_entry()` without shutdown check
```rust
// BAD: Might mark shutdown state as complete!
manager.update_entry(...).await?;
```
### ✅ Do: Check shutdown before `update_entry()`
```rust
// GOOD: Only marks complete if not shutting down
if !shutdown_flag.load(Ordering::SeqCst) {
manager.update_entry(...).await?;
}
```
### ❌ Don't: Forget `mark_incomplete()` on shutdown
```rust
if shutdown_flag.load(Ordering::SeqCst) {
return Ok(()); // Lost progress!
}
```
### ✅ Do: Record incomplete state
```rust
if shutdown_flag.load(Ordering::SeqCst) {
manager.mark_incomplete(...).await?;
return Ok(());
}
```
### ❌ Don't: Store partial data without recording state
```rust
// Write output, but forget to track in state
write_output(...).await?;
// If shutdown here, next run won't know it's incomplete
```
### ✅ Do: Update state atomically
```rust
// Update output and state together
write_output(...).await?;
manager.update_entry(...).await?; // Or mark_incomplete if shutdown
```
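One way to make the correct ending hard to get wrong is to funnel both outcomes through a single helper. A sketch (the helper name is illustrative, not part of the codebase):
```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

/// Record a step's final state: complete on the happy path,
/// incomplete with a reason if shutdown was signaled.
async fn finish_step(
    manager: &StateManager,
    step_name: &str,
    content_ref: ContentReference,
    shutdown_flag: &Arc<AtomicBool>,
    processed: usize,
) -> anyhow::Result<()> {
    if shutdown_flag.load(Ordering::SeqCst) {
        manager.mark_incomplete(
            step_name.to_string(),
            Some(content_ref),
            Some(DataStage::Data),
            format!("Incomplete: processed {} items", processed),
        ).await
    } else {
        manager.update_entry(
            step_name.to_string(),
            content_ref,
            DataStage::Data,
            None,
        ).await
    }
}
```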
## Testing the Orchestration
### Test 1: Normal Completion
```bash
cargo run # Let it finish
grep completed state.jsonl # Should show "true"
```
### Test 2: Shutdown & Restart
```bash
# Terminal 1:
cargo run # Running...
# Wait a bit
# Terminal 2:
pkill -INT -f "web_scraper"   # Send SIGINT - the handler listens for Ctrl+C, not SIGTERM
# Check state:
grep update_companies state.jsonl # Should show "completed: false"
# Restart:
cargo run # Continues from checkpoint
```
### Test 3: Verify No Reprocessing
```bash
# Modify a file to add 1000 test items
# Run first time - processes 1000, shutdown at 500
# Check state.jsonl - shows "Incomplete: 500 items"
# Run second time - should skip first 500, process remaining 500
```
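An automated variant of Test 2 can trip the flag directly instead of sending a signal. A sketch, assuming a disposable test directory and the `process_large_dataset` step from the pattern above:
```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

#[tokio::test]
async fn shutdown_records_incomplete_state() -> anyhow::Result<()> {
    let paths = DataPaths::new("./test_data")?;
    let shutdown_flag = Arc::new(AtomicBool::new(false));

    // Trip the flag up front so the step's first shutdown check fires.
    shutdown_flag.store(true, Ordering::SeqCst);
    process_large_dataset(&paths, &shutdown_flag).await?;

    // The step should be recorded as incomplete, not absent or valid.
    let manager = StateManager::new(&paths.integrity_dir()).await?;
    assert!(!manager.is_step_valid("process_large_dataset").await?);
    Ok(())
}
```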
## Summary
The coordinated shutdown & state system ensures:
1. **Work is never lost** - Progress recorded at shutdown
2. **No reprocessing** - Checkpoints skip completed items
3. **Transparent state** - `state.jsonl` shows exactly what's done
4. **Easy debugging** - Reason for incompleteness is recorded
5. **Graceful scaling** - Works with concurrent tasks and hard resets

View File

@@ -4,25 +4,25 @@ digraph Dependencies {
"yahoo_options_enrichment_complete" [label="yahoo_options_enrichment_complete
Options data enriched for all companies"];
"yahoo_events_enrichment_complete" [label="yahoo_events_enrichment_complete
Corporate events enriched for all companies"];
"yahoo_companies_cleansed_no_data" [label="yahoo_companies_cleansed_no_data
Companies cleansed of data with no Yahoo results"];
"yahoo_chart_enrichment_complete" [label="yahoo_chart_enrichment_complete
Chart data enriched for all companies"];
"enrichment_group" [label="enrichment_group
Yahoo exchanges collected and validated"];
"yahoo_companies_cleansed_low_profile" [label="yahoo_companies_cleansed_low_profile
Companies cleansed of low profile (insufficient market cap/price data)"];
"lei_figi_mapping_complete" [label="lei_figi_mapping_complete
LEI-to-FIGI mappings from OpenFIGI API"];
"securities_data_complete" [label="securities_data_complete
Securities data built from FIGI mappings"];
"yahoo_companies_cleansed_low_profile" [label="yahoo_companies_cleansed_low_profile
Companies cleansed of low profile (insufficient market cap/price data)"];
"yahoo_events_enrichment_complete" [label="yahoo_events_enrichment_complete
Corporate events enriched for all companies"];
"enrichment_group" [label="enrichment_group
Yahoo exchanges collected and validated"];
"yahoo_chart_enrichment_complete" [label="yahoo_chart_enrichment_complete
Chart data enriched for all companies"];
"yahoo_options_enrichment_complete" -> "yahoo_companies_cleansed_low_profile" [label="via group enrichment_group"];
"yahoo_companies_cleansed_no_data" -> "securities_data_complete";
"securities_data_complete" -> "lei_figi_mapping_complete";
"yahoo_companies_cleansed_low_profile" -> "yahoo_companies_cleansed_no_data";
"yahoo_events_enrichment_complete" -> "yahoo_companies_cleansed_low_profile" [label="via group enrichment_group"];
"yahoo_companies_cleansed_no_data" -> "securities_data_complete";
"yahoo_chart_enrichment_complete" -> "yahoo_companies_cleansed_low_profile" [label="via group enrichment_group"];
"yahoo_companies_cleansed_low_profile" -> "yahoo_companies_cleansed_no_data";
"securities_data_complete" -> "lei_figi_mapping_complete";
}

View File

@@ -1,6 +1,6 @@
// src/corporate/collect_exchanges.rs
use crate::util::directories::DataPaths;
use crate::util::integrity::{DataStage, StateManager, file_reference};
use crate::util::integrity::{DataStage, StateEntry, StateManager, file_reference};
use crate::util::logger;
use crate::corporate::types::*;
@@ -244,11 +244,11 @@ fn get_fallback_rate(currency: &str) -> f64 {
/// - Handles missing or invalid data gracefully
/// - Integrity tracking with content hash validation
pub async fn collect_and_save_exchanges(paths: &DataPaths) -> anyhow::Result<usize> {
let output_path = paths.data_dir().join("yahoo_exchanges.json");
let manager = StateManager::new(paths.integrity_dir()).await?;
let step_name = "exchange_collection_complete";
let output_path = paths.data_dir().join("yahoo_exchanges.json");
if manager.is_step_valid(step_name).await? {
logger::log_info(" Exchange collection already completed and valid").await;
@@ -260,6 +260,7 @@ pub async fn collect_and_save_exchanges(paths: &DataPaths) -> anyhow::Result<usi
return Ok(exchanges.len());
}
}
let entry = create_exchange_collection_state_entry(&manager, &output_path, step_name).await?;
logger::log_info("Collecting exchange information from company directories...").await;
let corporate_dir = paths.corporate_dir();
@@ -378,7 +379,7 @@ pub async fn collect_and_save_exchanges(paths: &DataPaths) -> anyhow::Result<usi
output_path.display()
)).await;
track_exchange_collection_completion(&manager, &output_path, step_name).await?;
manager.mark_valid(entry).await?;
logger::log_info(" ✓ Exchange collection marked as complete with integrity tracking").await;
// Print summary statistics
@@ -388,11 +389,11 @@ pub async fn collect_and_save_exchanges(paths: &DataPaths) -> anyhow::Result<usi
}
/// Track exchange collection completion with content hash verification
async fn track_exchange_collection_completion(
async fn create_exchange_collection_state_entry(
manager: &StateManager,
output_path: &std::path::Path,
step_name: &str,
) -> anyhow::Result<()> {
) -> anyhow::Result<StateEntry> {
// Create content reference for the output file
let content_reference = file_reference(output_path);
@@ -402,14 +403,11 @@ async fn track_exchange_collection_completion(
// - Dependencies: None (this is a collection step, not dependent on other tracked steps)
// Note: In practice, it depends on core data, but we track the output file
// which will change if core data changes, so explicit dependency not needed
manager.update_entry(
Ok(manager.create_entry(
step_name.to_string(),
content_reference,
DataStage::Data,
None, // Use default TTL (7 days for Data stage)
).await?;
Ok(())
).await?)
}
/// Extract exchange information from a company's core data file

View File

@@ -50,7 +50,7 @@ pub async fn run_full_update(
check_shutdown!(shutdown_flag);
logger::log_info("Step 3: Checking LEI-FIGI mapping status...").await;
let all_mapped = update_lei_mapping(&gleif_csv_path, None).await?;
let all_mapped = update_lei_mapping(&paths, &gleif_csv_path, None).await?;
if !all_mapped {
logger::log_warn(" ⚠ Some LEIs failed to map - continuing with partial data").await;
@@ -61,7 +61,7 @@ pub async fn run_full_update(
check_shutdown!(shutdown_flag);
logger::log_info("Step 4: Building securities map (streaming)...").await;
update_securities().await?;
update_securities(&paths).await?;
logger::log_info(" ✓ Securities map updated").await;
let paths = DataPaths::new(".")?;

View File

@@ -1,6 +1,7 @@
// src/corporate/update_companies.rs
use super::{types::*, yahoo_company_extraction::*, helpers::*};
use crate::util::directories::DataPaths;
use crate::util::integrity::{DataStage, StateManager, file_reference};
use crate::util::logger;
use crate::scraper::webdriver::ChromeDriverPool;
use crate::scraper::hard_reset::perform_hard_reset;
@@ -103,7 +104,7 @@ pub async fn update_companies(
// Synchronization for hard reset
let reset_in_progress = Arc::new(tokio::sync::Mutex::new(false));
let securities_path = paths.corporate_dir().join("figi_securities");
let securities_path = paths.figi_securities_dir();
let securities_checkpoint = securities_path.join("common_stocks.jsonl");
let securities_log = securities_path.join("common_stocks.log.jsonl");
@@ -123,7 +124,19 @@ pub async fn update_companies(
if let Some(parent) = companies_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let manager = StateManager::new(paths.integrity_dir()).await?;
let content_reference = file_reference(&companies_path);
let step_name = "corporate_companies_update";
let data_stage = DataStage::Data;
if manager.is_step_valid(step_name).await? {
logger::log_info(" Companies data already built and valid").await;
return Ok(securities.len());
}
logger::log_info(" Companies data incomplete or missing, proceeding with update").await;
let entry: crate::util::integrity::StateEntry = manager.create_entry(step_name.to_string(), content_reference, data_stage).await?;
// === RECOVERY PHASE: Load checkpoint + replay log ===
let existing_companies = checkpoint_helpers::load_checkpoint_with_log(
&companies_path,
@@ -615,6 +628,23 @@ pub async fn update_companies(
"✅ Completed: {} total companies ({} new, {} updated, {} hard resets)",
final_count, final_new, final_updated, hard_reset_count
)).await;
// Track completion with:
// - Content reference: All output JSONL files
// - Data stage: Data (7-day TTL) - Securities data relatively stable
// - Dependencies: LEI-FIGI mapping must be valid
// Check for shutdown BEFORE marking complete
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown detected during company update - marking as invalid for retry").await;
manager.mark_invalid(
entry,
format!("Invalid: processed {} of {} companies before shutdown", final_count, total),
).await?;
} else {
// Only mark complete if we got here without shutdown
manager.mark_valid(entry).await?;
}
Ok(final_count)
}

View File

@@ -59,6 +59,11 @@ pub async fn companies_yahoo_cleansed_no_data(paths: &DataPaths) -> Result<usize
logger::log_info(&format!(" ✓ Found {} companies in companies_yahoo.jsonl", count)).await;
return Ok(count);
}
let entry = manager.create_entry(
step_name.to_string(),
content_reference.clone(),
DataStage::Data,
).await?;
logger::log_info(" Cleansing companies with missing Yahoo data...").await;
@@ -130,12 +135,7 @@ pub async fn companies_yahoo_cleansed_no_data(paths: &DataPaths) -> Result<usize
// - Content reference: All event directories
// - Data stage: Data (7-day TTL by default)
// - Dependencies: Depends on cleaned companies data
manager.update_entry(
step_name.to_string(),
content_reference,
DataStage::Data,
None, // Use default TTL (7 days for Data stage)
).await?;
manager.mark_valid(entry).await?;
Ok(valid_count)
}
@@ -194,6 +194,11 @@ pub async fn companies_yahoo_cleansed_low_profile(
logger::log_info(&format!(" ✓ Found {} companies in companies_yahoo_cleaned.jsonl", count)).await;
return Ok(count);
}
let entry = manager.create_entry(
step_name.to_string(),
content_reference.clone(),
DataStage::Data,
).await?;
logger::log_info(" Cleansing companies with low Yahoo profile...").await;
@@ -600,12 +605,9 @@ pub async fn companies_yahoo_cleansed_low_profile(
// - Content reference: All event directories
// - Data stage: Data (7-day TTL by default)
// - Dependencies: Depends on cleaned companies data
manager.update_entry(
step_name.to_string(),
content_reference,
DataStage::Data,
None, // Use default TTL (7 days for Data stage)
).await?;
if !shutdown_flag.load(Ordering::SeqCst) {
manager.mark_valid(entry).await?;
}
Ok(final_count)
}

View File

@@ -3,7 +3,7 @@ use super::{types::*, helpers::*};
use crate::config::Config;
use crate::corporate::checkpoint_helpers;
use crate::util::directories::DataPaths;
use crate::util::integrity::{StateManager, directory_reference, DataStage};
use crate::util::integrity::{DataStage, StateEntry, StateManager, directory_reference};
use crate::util::logger;
use crate::scraper::yahoo::{YahooClientPool, QuoteSummaryModule};
@@ -82,6 +82,7 @@ pub async fn enrich_companies_with_events(
let manager = StateManager::new(paths.integrity_dir()).await?;
let step_name = "yahoo_events_enrichment_complete";
let entry = create_events_state_entry(&manager, paths, step_name).await?;
if manager.is_step_valid(step_name).await? {
logger::log_info(" Yahoo events enrichment already completed and valid").await;
@@ -118,7 +119,7 @@ pub async fn enrich_companies_with_events(
if pending_count == 0 {
logger::log_info(" ✓ All companies already enriched").await;
track_events_completion(&manager, paths, step_name).await?;
manager.mark_valid(entry).await?;
return Ok(enriched_companies.len());
}
@@ -237,9 +238,14 @@ pub async fn enrich_companies_with_events(
final_processed, final_success, final_failed
)).await;
// Mark as complete if all companies processed
if final_processed >= total_companies && !shutdown_flag.load(Ordering::SeqCst) {
track_events_completion(&manager, paths, step_name).await?;
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown detected during company update - marking as invalid for retry").await;
manager.mark_invalid(
entry,
format!("Invalid: processed {} companies before shutdown", final_processed),
).await?;
} else {
manager.mark_valid(entry).await?;
logger::log_info(" ✓ Event enrichment marked as complete with integrity tracking").await;
}
@@ -247,11 +253,11 @@ pub async fn enrich_companies_with_events(
}
/// Track event enrichment completion with content hash verification
async fn track_events_completion(
async fn create_events_state_entry(
manager: &StateManager,
paths: &DataPaths,
step_name: &str,
) -> anyhow::Result<()> {
) -> anyhow::Result<StateEntry> {
// Create content reference for all event data
// This will hash ALL files matching the pattern: {company}/events/data.jsonl
let content_reference = directory_reference(
@@ -271,14 +277,11 @@ async fn track_events_completion(
// - Content reference: All event directories
// - Data stage: Data (7-day TTL by default)
// - Dependencies: Depends on cleaned companies data
manager.update_entry(
Ok(manager.create_entry(
step_name.to_string(),
content_reference,
DataStage::Data,
None, // Use default TTL (7 days for Data stage)
).await?;
Ok(())
).await?)
}
/// Enrich a single company with event data
@@ -425,6 +428,8 @@ pub async fn enrich_companies_with_option(
logger::log_info(&format!(" ✓ Found {} companies with valid option data", count)).await;
return Ok(count);
}
let entry = create_option_state_entry(&manager, paths, step_name).await?;
logger::log_info(" Option data needs refresh - starting enrichment").await;
@@ -452,7 +457,7 @@ pub async fn enrich_companies_with_option(
if pending_count == 0 {
logger::log_info(" ✓ All companies already enriched").await;
track_option_completion(&manager, paths, step_name).await?;
manager.mark_valid(entry).await?;
return Ok(enriched_companies.len());
}
@@ -560,8 +565,14 @@ pub async fn enrich_companies_with_option(
)).await;
// Mark as complete if all companies processed
if final_processed >= total_companies && !shutdown_flag.load(Ordering::SeqCst) {
track_option_completion(&manager, paths, step_name).await?;
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown detected during company update - marking as invalid for retry").await;
manager.mark_invalid(
entry,
format!("Invalid: processed {} companies before shutdown", final_processed),
).await?;
} else {
manager.mark_valid(entry).await?;
logger::log_info(" ✓ Option enrichment marked as complete with integrity tracking").await;
}
@@ -569,11 +580,11 @@ pub async fn enrich_companies_with_option(
}
/// Track option enrichment completion with content hash verification
async fn track_option_completion(
async fn create_option_state_entry(
manager: &StateManager,
paths: &DataPaths,
step_name: &str,
) -> anyhow::Result<()> {
) -> anyhow::Result<StateEntry> {
// Create content reference for all option data
// This will hash ALL files matching the pattern: {company}/option/data.jsonl
let content_reference = directory_reference(
@@ -593,14 +604,11 @@ async fn track_option_completion(
// - Content reference: All option directories
// - Data stage: Data (7-day TTL by default)
// - Dependencies: Depends on cleaned companies data
manager.update_entry(
Ok(manager.create_entry(
step_name.to_string(),
content_reference,
DataStage::Data,
None, // Use default TTL (7 days for Data stage)
).await?;
Ok(())
).await?)
}
/// Enrich a single company with option data
@@ -684,6 +692,7 @@ pub async fn enrich_companies_with_chart(
logger::log_info(&format!(" ✓ Found {} companies with valid chart data", count)).await;
return Ok(count);
}
let entry = create_chart_state_entry(&manager, paths, step_name).await?;
logger::log_info(" Chart data needs refresh - starting enrichment").await;
@@ -711,7 +720,7 @@ pub async fn enrich_companies_with_chart(
if pending_count == 0 {
logger::log_info(" ✓ All companies already enriched").await;
track_chart_completion(&manager, paths, step_name).await?;
manager.mark_valid(entry).await?;
return Ok(enriched_companies.len());
}
@@ -819,8 +828,14 @@ pub async fn enrich_companies_with_chart(
)).await;
// Mark as complete if all companies processed
if final_processed >= total_companies && !shutdown_flag.load(Ordering::SeqCst) {
track_chart_completion(&manager, paths, step_name).await?;
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown detected during company update - marking as invalid for retry").await;
manager.mark_invalid(
entry,
format!("Invalid: processed {} companies before shutdown", final_processed),
).await?;
} else {
manager.mark_valid(entry).await?;
logger::log_info(" ✓ Chart enrichment marked as complete with integrity tracking").await;
}
@@ -828,11 +843,11 @@ pub async fn enrich_companies_with_chart(
}
/// Track chart enrichment completion with content hash verification
async fn track_chart_completion(
async fn create_chart_state_entry(
manager: &StateManager,
paths: &DataPaths,
step_name: &str,
) -> anyhow::Result<()> {
) -> anyhow::Result<StateEntry> {
// Create content reference for all chart data
// This will hash ALL files matching the pattern: {company}/chart/data.jsonl
let content_reference = directory_reference(
@@ -852,14 +867,11 @@ async fn track_chart_completion(
// - Content reference: All chart directories
// - Data stage: Data (7-day TTL by default)
// - Dependencies: Depends on cleaned companies data
manager.update_entry(
Ok(manager.create_entry(
step_name.to_string(),
content_reference,
DataStage::Data,
None, // Use default TTL (7 days for Data stage)
).await?;
Ok(())
).await?)
}
/// Enrich a single company with chart data

View File

@@ -28,32 +28,42 @@ const LEI_BATCH_SIZE: usize = 100; // Process 100 LEIs at a time
///
/// # Errors
/// Returns an error if file I/O fails or JSON parsing fails.
pub async fn update_securities() -> anyhow::Result<()> {
pub async fn update_securities(paths: &DataPaths) -> anyhow::Result<()> {
logger::log_info("Building securities data from FIGI mappings...").await;
let dir = DataPaths::new(".")?;
let manager = StateManager::new(&dir.integrity_dir()).await?;
let step_name = "securities_data_complete";
let date_dir = find_most_recent_figi_date_dir(&dir).await?
let date_dir = find_most_recent_figi_date_dir(&paths).await?
.ok_or_else(|| anyhow!("No FIGI date directory found"))?;
let data_dir = dir.data_dir();
let output_dir = data_dir.join("figi_securities");
tokio_fs::create_dir_all(&output_dir).await
.context("Failed to create corporate/by_name directory")?;
let output_dir = paths.figi_securities_dir();
let manager = StateManager::new(&paths.integrity_dir()).await?;
let step_name = "securities_data_complete";
let content_reference = directory_reference(
output_dir,
Some(vec![
"common_stocks.jsonl".to_string(),
"warrants.jsonl".to_string(),
"options.jsonl".to_string(),
"corporate_bonds.jsonl".to_string(),
"government_bonds.jsonl".to_string(),
]),
Some(vec![
"*.log.jsonl".to_string(), // Exclude log files
"*.tmp".to_string(), // Exclude temp files
"state.jsonl".to_string(), // Exclude internal state tracking
]),
);
let data_stage = DataStage::Data;
if manager.is_step_valid(step_name).await? {
logger::log_info(" Securities data already built and valid").await;
logger::log_info(" All sectors already processed, nothing to do").await;
return Ok(());
}
logger::log_info(" Securities data incomplete or missing, proceeding with update").await;
let entry = manager.create_entry(step_name.to_string(), content_reference, data_stage).await?;
logger::log_info("Building securities data from FIGI mappings...").await;
tokio_fs::create_dir_all(&output_dir).await
.context("Failed to create corporate/by_name directory")?;
// Setup checkpoint and log paths for each security type
let common_checkpoint = output_dir.join("common_stocks.jsonl");
let common_log = output_dir.join("common_stocks.log.jsonl");
@@ -104,6 +114,7 @@ pub async fn update_securities() -> anyhow::Result<()> {
if sectors_to_process.is_empty() {
logger::log_info(" All sectors already processed, nothing to do").await;
manager.mark_valid(entry).await?;
return Ok(());
}
@@ -170,48 +181,12 @@ pub async fn update_securities() -> anyhow::Result<()> {
stats.print_summary();
logger::log_info(&format!("✓ Processed {} new sectors successfully", newly_processed_sectors.len())).await;
track_securities_completion(&manager, &output_dir).await?;
manager.mark_valid(entry).await?;
logger::log_info(" ✓ Securities data marked as complete with integrity tracking").await;
Ok(())
}
/// Track securities data completion with content hash verification
async fn track_securities_completion(
manager: &StateManager,
output_dir: &Path,
) -> anyhow::Result<()> {
// Create content reference for all output files
let content_reference = directory_reference(
output_dir,
Some(vec![
"common_stocks.jsonl".to_string(),
"warrants.jsonl".to_string(),
"options.jsonl".to_string(),
"corporate_bonds.jsonl".to_string(),
"government_bonds.jsonl".to_string(),
]),
Some(vec![
"*.log.jsonl".to_string(), // Exclude log files
"*.tmp".to_string(), // Exclude temp files
"state.jsonl".to_string(), // Exclude internal state tracking
]),
);
// Track completion with:
// - Content reference: All output JSONL files
// - Data stage: Data (7-day TTL) - Securities data relatively stable
// - Dependencies: LEI-FIGI mapping must be valid
manager.update_entry(
"securities_data_complete".to_string(),
content_reference,
DataStage::Data,
None, // Use default TTL (7 days)
).await?;
Ok(())
}
/// Loads the list of sectors that have been fully processed
async fn load_processed_sectors(path: &Path) -> anyhow::Result<HashSet<String>> {
let mut sectors = HashSet::new();
@@ -1442,29 +1417,42 @@ pub async fn stream_gleif_csv_and_build_figi_filtered(
/// Check mapping completion and process only unmapped LEIs
pub async fn update_lei_mapping(
paths: &DataPaths,
csv_path: &str,
gleif_date: Option<&str>,
) -> anyhow::Result<bool> {
let dir = DataPaths::new(".")?;
let manager = StateManager::new(&dir.integrity_dir()).await?;
let step_name = "lei_figi_mapping_complete";
let map_cache_dir = dir.cache_gleif_openfigi_map_dir();
let date = determine_gleif_date(gleif_date, &dir).await?;
let map_cache_dir = paths.cache_gleif_openfigi_map_dir();
let date = determine_gleif_date(gleif_date, &paths).await?;
let date_dir = map_cache_dir.join(&date);
let manager = StateManager::new(&paths.integrity_dir()).await?;
let step_name = "lei_figi_mapping_complete";
let content_reference = directory_reference(
&date_dir,
Some(vec![
"*/lei_to_figi.jsonl".to_string(), // All sector mapping files
"no_results.jsonl".to_string(), // LEIs with no results
]),
Some(vec![
"*.tmp".to_string(), // Exclude temp files
"*.log".to_string(), // Exclude log files
]),
);
let data_stage = DataStage::Cache; // 24-hour TTL for API data
if manager.is_step_valid(step_name).await? {
logger::log_info(" LEI-FIGI mapping already completed and valid").await;
logger::log_info("✓ All LEIs have been queried (mapped or confirmed no results)").await;
return Ok(true);
}
let entry = manager.create_entry(step_name.to_string(), content_reference, data_stage).await?;
// Get unmapped LEIs (excludes both mapped and no-result LEIs)
let unmapped = get_unmapped_leis(csv_path, &date_dir).await?;
if unmapped.is_empty() {
logger::log_info("✓ All LEIs have been queried (mapped or confirmed no results)").await;
track_lei_mapping_completion(&manager, &dir.integrity_dir()).await?;
manager.mark_valid(entry).await?;
logger::log_info(" ✓ LEI-FIGI mapping marked as complete with integrity tracking").await;
return Ok(true);
@@ -1480,7 +1468,7 @@ pub async fn update_lei_mapping(
if still_unmapped.is_empty() {
logger::log_info("✓ All LEIs successfully queried").await;
track_lei_mapping_completion(&manager, &date_dir).await?;
manager.mark_valid(entry).await?;
logger::log_info(" ✓ LEI-FIGI mapping marked as complete with integrity tracking").await;
Ok(true)
} else {
@@ -1488,43 +1476,11 @@ pub async fn update_lei_mapping(
"{} LEIs still unqueried (API errors or rate limits)",
still_unmapped.len()
)).await;
manager.mark_invalid(entry, " Some LEIs remain unqueried".to_string()).await?;
Ok(false)
}
}
/// Track LEI-FIGI mapping completion with content hash verification
async fn track_lei_mapping_completion(
manager: &StateManager,
date_dir: &Path,
) -> anyhow::Result<()> {
// Create content reference for all FIGI mapping files
// This will hash ALL lei_to_figi.jsonl files in sector directories
let content_reference = directory_reference(
date_dir,
Some(vec![
"*/lei_to_figi.jsonl".to_string(), // All sector mapping files
"no_results.jsonl".to_string(), // LEIs with no results
]),
Some(vec![
"*.tmp".to_string(), // Exclude temp files
"*.log".to_string(), // Exclude log files
]),
);
// Track completion with:
// - Content reference: All FIGI mapping files in date directory
// - Data stage: Cache (24-hour TTL) - FIGI data can change frequently
// - Dependencies: None (this is a collection step from external API)
manager.update_entry(
"lei_figi_mapping_complete".to_string(),
content_reference,
DataStage::Cache, // 24-hour TTL for API data
None, // Use default TTL
).await?;
Ok(())
}
/// Load LEIs that were queried but returned no results
async fn load_no_result_leis(date_dir: &Path) -> anyhow::Result<HashSet<String>> {
let mut no_result_leis = HashSet::new();

View File

@@ -20,14 +20,20 @@
// Using a wrapper to ensure the result is properly captured
var extractionResult = (function() {
try {
// Check for "No results found" message using exact selector
const noDataElement = document.querySelector('#main-content-wrapper > section > div.noData.yf-1omxedn');
// Check for "No results found" message using very flexible selector
const noDataElement = document.querySelector('[class*="noData"]') ||
document.querySelector('[class*="error"]') ||
(document.body.innerText && document.body.innerText.includes('No results'));
if (noDataElement) {
return { status: 'no_results', ticker: null, sector: null, exchange: null };
}
// Find the results table using exact selector
const table = document.querySelector('#main-content-wrapper > section > section.container.yf-1omxedn > div.tableContainer.yf-1omxedn > div > table');
// Find the results table using most flexible selector possible
// Try multiple strategies to find the table
const table = document.querySelector('table') ||
document.querySelector('[role="table"]') ||
document.querySelector('.table') ||
document.querySelector('#main-content-wrapper > section > section[class*="container"] > div[class*="tableContainer"] > div > table');
if (!table) {
return { status: 'no_results', ticker: null, sector: null, exchange: null };
}

View File

@@ -123,13 +123,20 @@ pub async fn scrape_company_details_by_isin(
}
}
// Additional content validation
// Additional content validation - look for table or noData element anywhere on page
let page_ready: bool = client
.execute(
r#"
const table = document.querySelector('#main-content-wrapper > section > section.container.yf-1omxedn > div.tableContainer.yf-1omxedn > div > table');
const noData = document.querySelector('#main-content-wrapper > section > div.noData.yf-1omxedn');
return !!(table || noData);
// Try multiple selector strategies
const table = document.querySelector('table') ||
document.querySelector('[role="table"]') ||
document.querySelector('.table');
const noData = document.querySelector('[class*="noData"]') ||
document.querySelector('[class*="error"]') ||
document.body.innerText.includes('No results');
const hasContent = !!(table || noData);
console.log('Page ready check - table:', !!table, 'noData:', !!noData, 'hasContent:', hasContent);
return hasContent;
"#,
vec![],
)
@@ -203,7 +210,7 @@ pub async fn extract_company_details(
client: &Client,
_isin: &str,
) -> Result<Option<YahooCompanyData>> {
// Wait for page to load - look for either the table or the no-data element
// Wait for page to load - look for either the table or the no-data element using simple selectors
let wait_result: Result<Result<bool, anyhow::Error>> = timeout(
TokioDuration::from_secs(30),
async {
@@ -211,9 +218,14 @@ pub async fn extract_company_details(
let has_content: bool = client
.execute(
r#"
const table = document.querySelector('#main-content-wrapper > section > section.container.yf-1omxedn > div.tableContainer.yf-1omxedn > div > table');
const noData = document.querySelector('#main-content-wrapper > section > div.noData.yf-1omxedn');
return !!(table || noData);
// Use flexible selectors that don't depend on exact DOM structure
const table = document.querySelector('table') ||
document.querySelector('[role="table"]') ||
document.querySelector('.table');
const noData = document.querySelector('[class*="noData"]') ||
document.querySelector('[class*="error"]');
const hasContent = !!(table || noData);
return hasContent;
"#,
vec![],
)

View File

@@ -192,7 +192,7 @@ pub fn process_batch(
let mut changes = Vec::new();
let mut removed = std::collections::HashSet::new();
let identity_map = build_identity_lookup(existing);
//let identity_map = build_identity_lookup(existing);
let date_map = build_date_event_lookup(existing);
for new in new_events {

View File

@@ -114,6 +114,11 @@ pub async fn collect_fx_rates(
logger::log_info(&format!(" ✓ Found {} currencies with chart data", count)).await;
return Ok(count);
}
let entry = manager.create_entry(
step_name.to_string(),
content_reference.clone(),
DataStage::Data,
).await?;
logger::log_info(" Updating missing forex data...").await;
@@ -165,12 +170,7 @@ pub async fn collect_fx_rates(
if pending_count == 0 {
logger::log_info(" ✓ All currencies already collected").await;
manager.update_entry(
step_name.to_string(),
content_reference,
DataStage::Data,
None, // Use default TTL (7 days for Data stage)
).await?;
manager.mark_valid(entry).await?;
return Ok(collected_currencies.len());
}
@@ -316,12 +316,7 @@ pub async fn collect_fx_rates(
// Mark as complete if not shutdown
if !shutdown_flag.load(Ordering::SeqCst) {
manager.update_entry(
step_name.to_string(),
content_reference,
DataStage::Data,
None, // Use default TTL (7 days for Data stage)
).await?;
manager.mark_valid(entry).await?;
}
Ok(final_success)
}

File diff suppressed because it is too large

View File

@@ -7,4 +7,22 @@ macro_rules! check_shutdown {
return Ok(());
}
};
}
/// Mark incomplete state on shutdown
/// Usage: mark_incomplete_on_shutdown!(&manager, "step_name", content_ref, DataStage::Data, &shutdown_flag)?;
#[macro_export]
macro_rules! mark_incomplete_on_shutdown {
($manager:expr, $step_name:expr, $content_ref:expr, $data_stage:expr, $shutdown_flag:expr) => {
if $shutdown_flag.load(std::sync::atomic::Ordering::SeqCst) {
$manager
.mark_incomplete(
$step_name.to_string(),
$content_ref,
$data_stage,
"Incomplete due to shutdown".to_string(),
)
.await?;
}
};
}