added integrity check to enrichment functions

This commit is contained in:
2026-01-10 17:40:16 +01:00
parent 151c96e35f
commit 766eb803f1
9 changed files with 942 additions and 105 deletions

View File

@@ -14,7 +14,7 @@ pub mod update;
pub mod update_companies;
pub mod update_companies_cleanse;
pub mod update_companies_enrich;
pub mod update_companies_enrich_options_chart;
pub mod update_companies_enrich_option_chart;
pub mod collect_exchanges;

View File

@@ -5,7 +5,7 @@ use crate::check_shutdown;
use crate::corporate::update_companies::build_companies_jsonl_streaming_parallel;
use crate::corporate::update_companies_cleanse::{companies_yahoo_cleansed_low_profile, companies_yahoo_cleansed_no_data};
use crate::corporate::update_companies_enrich::enrich_companies_with_events;
use crate::corporate::update_companies_enrich_options_chart::{enrich_companies_with_options, enrich_companies_with_chart};
use crate::corporate::update_companies_enrich_option_chart::{enrich_companies_with_option, enrich_companies_with_chart};
use crate::corporate::collect_exchanges::collect_and_save_exchanges;
use crate::economic::update_forex::collect_fx_rates;
use crate::util::directories::DataPaths;
@@ -107,7 +107,7 @@ pub async fn run_full_update(
check_shutdown!(shutdown_flag);
logger::log_info("Step 9: Enriching companies with Yahoo Options (with abort-safe persistence)...").await;
let options_count = enrich_companies_with_options(&paths, config, yahoo_pool.clone(), shutdown_flag).await?;
let options_count = enrich_companies_with_option(&paths, config, yahoo_pool.clone(), shutdown_flag).await?;
logger::log_info(&format!("{} companies enriched with options data", options_count)).await;
check_shutdown!(shutdown_flag);

View File

@@ -1,8 +1,9 @@
// src/corporate/update_companies_enrich_events.rs
// src/corporate/update_companies_enrich_events.rs - WITH INTEGRITY MODULE
use super::{types::*, helpers::*};
use crate::config::Config;
use crate::corporate::checkpoint_helpers;
use crate::util::directories::DataPaths;
use crate::util::integrity::{StateManager, directory_reference, DataStage};
use crate::util::logger;
use crate::scraper::yahoo::{YahooClientPool, QuoteSummaryModule};
@@ -25,7 +26,7 @@ use tokio::sync::mpsc;
/// - Crash-safe persistence (checkpoint + log with fsync)
/// - Smart skip logic (only process incomplete data)
/// - Uses pending queue instead of retry mechanism
/// - Reuses companies_update.log for persistence
/// - Content integrity validation with hash tracking
///
/// # Persistence Strategy
/// - Checkpoint: companies_yahoo_cleaned.jsonl (atomic state)
@@ -33,6 +34,7 @@ use tokio::sync::mpsc;
/// - On restart: Load checkpoint + replay log
/// - Periodic checkpoints (every 50 companies)
/// - Batched fsync (every 10 writes or 10 seconds)
/// - Hash validation of all event data directories
pub async fn enrich_companies_with_events(
paths: &DataPaths,
_config: &Config,
@@ -43,7 +45,7 @@ pub async fn enrich_companies_with_events(
const CHECKPOINT_INTERVAL: usize = 50;
const FSYNC_BATCH_SIZE: usize = 10;
const FSYNC_INTERVAL_SECS: u64 = 10;
const CONCURRENCY_LIMIT: usize = 50; // Limit parallel enrichment tasks
const CONCURRENCY_LIMIT: usize = 50;
let data_path = paths.data_dir();
@@ -57,29 +59,21 @@ pub async fn enrich_companies_with_events(
logger::log_warn(" companies_yahoo_cleaned.jsonl not found, skipping event enrichment").await;
return Ok(0);
}
// Check if already completed
if state_path.exists() {
let state_content = tokio::fs::read_to_string(&state_path).await?;
let manager = StateManager::new(&state_path, &data_path.to_path_buf());
let step_name = "yahoo_events_enrichment_complete";
if manager.is_step_valid(step_name).await? {
logger::log_info(" Yahoo events enrichment already completed and valid").await;
for line in state_content.lines() {
if line.trim().is_empty() {
continue;
}
if let Ok(state) = serde_json::from_str::<serde_json::Value>(line) {
if state.get("yahoo_events_enrichment_complete").and_then(|v| v.as_bool()).unwrap_or(false) {
logger::log_info(" Yahoo events enrichment already completed").await;
// Count enriched companies
let count = checkpoint_helpers::count_enriched_companies(paths, "events").await?;
logger::log_info(&format!(" ✓ Found {} companies with event data", count)).await;
return Ok(count);
}
}
}
// Count enriched companies
let count = checkpoint_helpers::count_enriched_companies(paths, "events").await?;
logger::log_info(&format!(" ✓ Found {} companies with valid event data", count)).await;
return Ok(count);
}
logger::log_info(" Event data needs refresh - starting enrichment").await;
// === RECOVERY PHASE: Track enriched companies ===
let enriched_companies: HashSet<String> = checkpoint_helpers::load_enrichment_progress(&log_path).await?;
@@ -104,7 +98,9 @@ pub async fn enrich_companies_with_events(
if pending_count == 0 {
logger::log_info(" ✓ All companies already enriched").await;
checkpoint_helpers::mark_step_complete(&state_path, "yahoo_events_enrichment_complete").await?;
track_events_completion(&manager, paths, step_name).await?;
return Ok(enriched_companies.len());
}
@@ -263,13 +259,49 @@ pub async fn enrich_companies_with_events(
// Mark as complete if all companies processed
if final_processed >= total_companies && !shutdown_flag.load(Ordering::SeqCst) {
checkpoint_helpers::mark_step_complete(&state_path, "yahoo_events_enrichment_complete").await?;
logger::log_info(" ✓ Event enrichment marked as complete").await;
track_events_completion(&manager, paths, step_name).await?;
logger::log_info(" ✓ Event enrichment marked as complete with integrity tracking").await;
}
Ok(final_success)
}
/// Record the event-enrichment step as finished in the state manager.
///
/// A content reference rooted at the corporate directory is attached to the
/// entry so the step can be re-checked against the event data files later.
///
/// # Arguments
/// * `manager` - state manager whose `update_entry` receives the new entry
/// * `paths` - provides the corporate directory the reference is rooted at
/// * `step_name` - name under which the entry is stored
///
/// # Errors
/// Propagates any error returned by `update_entry`.
async fn track_events_completion(
    manager: &StateManager,
    paths: &DataPaths,
    step_name: &str,
) -> anyhow::Result<()> {
    // Include patterns for the per-company event files.
    // NOTE(review): the second pattern looks like a subset of the first;
    // both are kept so the stored reference stays unchanged - confirm whether
    // `directory_reference` needs the explicit `data.jsonl` entry.
    let include_patterns: Vec<String> = ["*/events/*.jsonl", "*/events/data.jsonl"]
        .iter()
        .map(|pattern| pattern.to_string())
        .collect();

    // Log, temp and backup files are excluded from the reference.
    let exclude_patterns: Vec<String> = ["*.log", "*.tmp", "*.bak"]
        .iter()
        .map(|pattern| pattern.to_string())
        .collect();

    let events_reference = directory_reference(
        paths.corporate_dir(),
        Some(include_patterns),
        Some(exclude_patterns),
    );

    // The step is registered as depending on the cleansed companies step.
    let depends_on = vec![String::from("yahoo_companies_cleansed")];

    // `None` leaves the TTL at the stage default (the original author's note
    // says 7 days for `DataStage::Data` - not verifiable from this file).
    manager
        .update_entry(
            String::from(step_name),
            events_reference,
            DataStage::Data,
            depends_on,
            None,
        )
        .await?;

    Ok(())
}
/// Spawn a single enrichment task with panic isolation
fn spawn_enrichment_task(
company: CompanyCrossPlatformInfo,

View File

@@ -1,8 +1,9 @@
// src/corporate/update_companies_enrich_options_chart.rs
// src/corporate/update_companies_enrich_option_chart.rs
use super::{types::*, helpers::*};
use crate::config::Config;
use crate::corporate::checkpoint_helpers;
use crate::util::directories::DataPaths;
use crate::util::integrity::{DataStage, StateManager, directory_reference};
use crate::util::logger;
use crate::scraper::yahoo::{YahooClientPool};
@@ -17,7 +18,7 @@ use futures::stream::{FuturesUnordered, StreamExt};
use serde_json::json;
use tokio::sync::mpsc;
/// Yahoo Options enrichment per corporate company
/// Yahoo Option enrichment per corporate company
///
/// # Features
/// - Graceful shutdown (abort-safe)
@@ -25,14 +26,16 @@ use tokio::sync::mpsc;
/// - Crash-safe persistence (checkpoint + log with fsync)
/// - Smart skip logic (only process incomplete data)
/// - Uses pending queue instead of retry mechanism
/// - Content integrity validation with hash tracking
///
/// # Persistence Strategy
/// - Checkpoint: companies_yahoo_cleaned.jsonl (atomic state)
/// - Log: companies_options_updates.log (append-only updates)
/// - Log: companies_option_updates.log (append-only updates)
/// - On restart: Load checkpoint + replay log
/// - Periodic checkpoints (every 50 companies)
/// - Batched fsync (every 10 writes or 10 seconds)
pub async fn enrich_companies_with_options(
/// - Hash validation of all option data directories
pub async fn enrich_companies_with_option(
paths: &DataPaths,
_config: &Config,
yahoo_pool: Arc<YahooClientPool>,
@@ -48,37 +51,29 @@ pub async fn enrich_companies_with_options(
// File paths
let input_path = data_path.join("companies_yahoo_cleaned.jsonl");
let log_path = data_path.join("companies_options_updates.log");
let log_path = data_path.join("companies_option_updates.log");
let state_path = data_path.join("state.jsonl");
// Check input exists
if !input_path.exists() {
logger::log_warn(" companies_yahoo_cleaned.jsonl not found, skipping options enrichment").await;
logger::log_warn(" companies_yahoo_cleaned.jsonl not found, skipping option enrichment").await;
return Ok(0);
}
// Check if already completed
if state_path.exists() {
let state_content = tokio::fs::read_to_string(&state_path).await?;
let manager = StateManager::new(&state_path, &data_path.to_path_buf());
let step_name = "yahoo_option_enrichment_complete";
if manager.is_step_valid(step_name).await? {
logger::log_info(" Yahoo option enrichment already completed and valid").await;
for line in state_content.lines() {
if line.trim().is_empty() {
continue;
}
if let Ok(state) = serde_json::from_str::<serde_json::Value>(line) {
if state.get("yahoo_options_enrichment_complete").and_then(|v| v.as_bool()).unwrap_or(false) {
logger::log_info(" Yahoo options enrichment already completed").await;
// Count enriched companies
let count = checkpoint_helpers::count_enriched_companies(paths, "options").await?;
logger::log_info(&format!(" ✓ Found {} companies with options data", count)).await;
return Ok(count);
}
}
}
// Count enriched companies
let count = checkpoint_helpers::count_enriched_companies(paths, "option").await?;
logger::log_info(&format!(" ✓ Found {} companies with valid option data", count)).await;
return Ok(count);
}
logger::log_info(" Option data needs refresh - starting enrichment").await;
// === RECOVERY PHASE: Track enriched companies ===
let enriched_companies = checkpoint_helpers::load_enrichment_progress(&log_path).await?;
@@ -102,12 +97,12 @@ pub async fn enrich_companies_with_options(
)).await;
if pending_count == 0 {
logger::log_info(" ✓ All companies already enriched with options data").await;
checkpoint_helpers::mark_step_complete(&state_path, "yahoo_options_enrichment_complete").await?;
logger::log_info(" ✓ All companies already enriched").await;
track_option_completion(&manager, paths, step_name).await?;
return Ok(enriched_companies.len());
}
// === PROCESSING PHASE: Enrich companies with options ===
// === PROCESSING PHASE: Enrich companies with option ===
// Shared counters
let processed_count = Arc::new(AtomicUsize::new(enriched_companies.len()));
@@ -195,7 +190,7 @@ pub async fn enrich_companies_with_options(
log_tx.clone(),
Arc::clone(&semaphore),
Arc::clone(shutdown_flag),
EnrichmentType::Options,
EnrichmentType::Option,
);
tasks.push(task);
}
@@ -206,7 +201,7 @@ pub async fn enrich_companies_with_options(
while let Some(_result) = tasks.next().await {
// Check for shutdown
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown signal received, stopping options enrichment").await;
logger::log_warn("Shutdown signal received, stopping option enrichment").await;
break;
}
@@ -228,7 +223,7 @@ pub async fn enrich_companies_with_options(
log_tx.clone(),
Arc::clone(&semaphore),
Arc::clone(shutdown_flag),
EnrichmentType::Options,
EnrichmentType::Option,
);
tasks.push(task);
}
@@ -245,13 +240,14 @@ pub async fn enrich_companies_with_options(
let final_failed = failed_count.load(Ordering::SeqCst);
logger::log_info(&format!(
" Options enrichment: {} succeeded, {} failed",
" Option enrichment: {} succeeded, {} failed",
final_success, final_failed
)).await;
// Mark as complete if no shutdown
if !shutdown_flag.load(Ordering::SeqCst) {
checkpoint_helpers::mark_step_complete(&state_path, "yahoo_options_enrichment_complete").await?;
track_option_completion(&manager, paths, step_name).await?;
logger::log_info(" ✓ Option enrichment marked as complete with integrity tracking").await;
}
Ok(final_success)
@@ -525,7 +521,7 @@ pub async fn enrich_companies_with_chart(
/// Type of enrichment being performed
#[derive(Clone, Copy)]
enum EnrichmentType {
Options,
Option,
Chart,
}
@@ -553,8 +549,8 @@ fn spawn_enrichment_task(
// Perform enrichment (panic-isolated)
let result = match enrichment_type {
EnrichmentType::Options => {
enrich_company_with_options(&company, &yahoo_pool, &paths).await
EnrichmentType::Option => {
enrich_company_with_option(&company, &yahoo_pool, &paths).await
}
EnrichmentType::Chart => {
enrich_company_with_chart(&company, &yahoo_pool, &paths).await
@@ -590,8 +586,8 @@ fn spawn_enrichment_task(
})
}
/// Enrich a single company with options data
async fn enrich_company_with_options(
/// Enrich a single company with option data
async fn enrich_company_with_option(
company: &CompanyCrossPlatformInfo,
yahoo_pool: &Arc<YahooClientPool>,
paths: &DataPaths,
@@ -603,16 +599,16 @@ async fn enrich_company_with_options(
}
};
// Get options data for all available expiration dates
let options_data = yahoo_pool.get_options_data(&ticker, None).await?;
// Get option data for all available expiration dates
let option_data = yahoo_pool.get_option_data(&ticker, None).await?;
// Only save if we got meaningful data
if options_data.options.is_empty() {
return Err(anyhow::anyhow!("No options data available"));
if option_data.option.is_empty() {
return Err(anyhow::anyhow!("No option data available"));
}
// Save the options data
save_company_data(paths, &company.name, &options_data, "options").await?;
// Save the option data
save_company_data(paths, &company.name, &option_data, "option").await?;
Ok(())
}
@@ -681,4 +677,76 @@ enum LogCommand {
Write(serde_json::Value),
Checkpoint,
Shutdown,
}
/// Record the option-enrichment step as finished in the state manager.
///
/// A content reference rooted at the corporate directory is attached to the
/// entry so the step can be re-checked against the option data files later.
///
/// # Arguments
/// * `manager` - state manager whose `update_entry` receives the new entry
/// * `paths` - provides the corporate directory the reference is rooted at
/// * `step_name` - name under which the entry is stored
///
/// # Errors
/// Propagates any error returned by `update_entry`.
async fn track_option_completion(
    manager: &StateManager,
    paths: &DataPaths,
    step_name: &str,
) -> anyhow::Result<()> {
    // Include patterns for the per-company option files.
    // NOTE(review): the second pattern looks like a subset of the first;
    // both are kept so the stored reference stays unchanged - confirm whether
    // `directory_reference` needs the explicit `data.jsonl` entry.
    let include_patterns: Vec<String> = ["*/option/*.jsonl", "*/option/data.jsonl"]
        .iter()
        .map(|pattern| pattern.to_string())
        .collect();

    // Log, temp and backup files are excluded from the reference.
    let exclude_patterns: Vec<String> = ["*.log", "*.tmp", "*.bak"]
        .iter()
        .map(|pattern| pattern.to_string())
        .collect();

    let option_reference = directory_reference(
        paths.corporate_dir(),
        Some(include_patterns),
        Some(exclude_patterns),
    );

    // The step is registered as depending on the cleansed companies step.
    let depends_on = vec![String::from("yahoo_companies_cleansed")];

    // `None` leaves the TTL at the stage default (the original author's note
    // says 7 days for `DataStage::Data` - not verifiable from this file).
    manager
        .update_entry(
            String::from(step_name),
            option_reference,
            DataStage::Data,
            depends_on,
            None,
        )
        .await?;

    Ok(())
}
/// Record the chart-enrichment step as finished in the state manager.
///
/// A content reference rooted at the corporate directory is attached to the
/// entry so the step can be re-checked against the chart data files later.
///
/// # Arguments
/// * `manager` - state manager whose `update_entry` receives the new entry
/// * `paths` - provides the corporate directory the reference is rooted at
/// * `step_name` - name under which the entry is stored
///
/// # Errors
/// Propagates any error returned by `update_entry`.
async fn track_chart_completion(
    manager: &StateManager,
    paths: &DataPaths,
    step_name: &str,
) -> anyhow::Result<()> {
    // Include patterns for the per-company chart files.
    // NOTE(review): the second pattern looks like a subset of the first;
    // both are kept so the stored reference stays unchanged - confirm whether
    // `directory_reference` needs the explicit `data.jsonl` entry.
    let include_patterns: Vec<String> = ["*/chart/*.jsonl", "*/chart/data.jsonl"]
        .iter()
        .map(|pattern| pattern.to_string())
        .collect();

    // Log, temp and backup files are excluded from the reference.
    let exclude_patterns: Vec<String> = ["*.log", "*.tmp", "*.bak"]
        .iter()
        .map(|pattern| pattern.to_string())
        .collect();

    let chart_reference = directory_reference(
        paths.corporate_dir(),
        Some(include_patterns),
        Some(exclude_patterns),
    );

    // The step is registered as depending on the cleansed companies step.
    let depends_on = vec![String::from("yahoo_companies_cleansed")];

    // `None` leaves the TTL at the stage default (the original author's note
    // says 7 days for `DataStage::Data` - not verifiable from this file).
    manager
        .update_entry(
            String::from(step_name),
            chart_reference,
            DataStage::Data,
            depends_on,
            None,
        )
        .await?;

    Ok(())
}