From ac1345798d0da877b981341c93f15b106c3b8921 Mon Sep 17 00:00:00 2001 From: donpat1to Date: Sat, 10 Jan 2026 18:42:39 +0100 Subject: [PATCH] added integrity check to cleanse functions --- src/corporate/checkpoint_helpers.rs | 32 ---- src/corporate/update_companies_cleanse.rs | 152 +++++++----------- .../update_companies_enrich_option_chart.rs | 41 ++--- src/lib.rs | 2 +- 4 files changed, 74 insertions(+), 153 deletions(-) diff --git a/src/corporate/checkpoint_helpers.rs b/src/corporate/checkpoint_helpers.rs index 9e27424..034ef64 100644 --- a/src/corporate/checkpoint_helpers.rs +++ b/src/corporate/checkpoint_helpers.rs @@ -8,8 +8,6 @@ use super::types::CompanyCrossPlatformInfo; use crate::util::logger; use std::collections::HashMap; use std::path::{Path}; -use chrono::{DateTime, Duration, Utc}; -use serde::{Deserialize, Serialize}; use tokio::fs::{File}; use tokio::io::{AsyncWriteExt}; use anyhow::Result; @@ -214,34 +212,4 @@ pub async fn count_enriched_companies( } Ok(count) -} - -/// Mark a processing step as complete in state file -/// -/// Appends a completion marker to the state file with timestamp. -/// Used to track which processing steps have been completed. -pub async fn mark_step_complete>( - state_path: P, - completion_key: &str, -) -> Result<()> { - use tokio::fs::OpenOptions; - - let completion_entry = serde_json::json!({ - completion_key: true, - "completed_at": chrono::Utc::now().to_rfc3339(), - }); - - let mut state_file = OpenOptions::new() - .create(true) - .append(true) - .open(state_path.as_ref()) - .await?; - - let state_line = serde_json::to_string(&completion_entry)?; - state_file.write_all(state_line.as_bytes()).await?; - state_file.write_all(b"\n").await?; - state_file.flush().await?; - state_file.sync_all().await?; - - Ok(()) } \ No newline at end of file diff --git a/src/corporate/update_companies_cleanse.rs b/src/corporate/update_companies_cleanse.rs index 4d828a2..5d33fdb 100644 --- a/src/corporate/update_companies_cleanse.rs +++ b/src/corporate/update_companies_cleanse.rs @@ -3,6 +3,7 @@ use super::{helpers::*, types::*}; use crate::config::Config; use crate::corporate::checkpoint_helpers; use crate::util::directories::DataPaths; +use crate::util::integrity::{DataStage, StateManager, file_reference}; use crate::util::logger; use crate::scraper::yahoo::{YahooClientPool, QuoteSummaryModule}; @@ -14,7 +15,6 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use tokio::fs::{File, OpenOptions}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use futures::stream::{FuturesUnordered, StreamExt}; -use serde_json::json; use tokio::sync::mpsc; /// Result of processing a single company @@ -33,11 +33,6 @@ enum LogCommand { Shutdown, } -/// Result from processing a single company with priority -struct CompanyTaskResult { - company: CompanyCrossPlatformInfo, - result: CompanyProcessResult, -} /// Cleansing function to remove companies with missing essential yahoo data for integrity pub async fn companies_yahoo_cleansed_no_data(paths: &DataPaths) -> Result { @@ -51,36 +46,23 @@ pub async fn companies_yahoo_cleansed_no_data(paths: &DataPaths) -> Result(line) { - if state.get("yahoo_companies_cleansed_no_data").and_then(|v| v.as_bool()).unwrap_or(false) { - logger::log_info(" Yahoo companies cleansing already completed, reading existing file...").await; - - if output_path.exists() { - let output_content = tokio::fs::read_to_string(&output_path).await?; - let count = output_content.lines() - .filter(|line| !line.trim().is_empty()) - .count(); - - logger::log_info(&format!(" ✓ Found {} companies in companies_yahoo.jsonl", count)).await; - return Ok(count); - } else { - logger::log_warn(" State indicates completion but companies_yahoo.jsonl not found, re-running...").await; - break; - } - } - } - } + logger::log_info(&format!(" ✓ Found {} companies in companies_yahoo.jsonl", count)).await; + return Ok(count); } + logger::log_info(" Cleansing companies with missing Yahoo data...").await; + logger::log_info(&format!(" Reading from: {:?}", input_path)).await; logger::log_info(&format!(" Writing to: {:?}", output_path)).await; @@ -141,18 +123,17 @@ pub async fn companies_yahoo_cleansed_no_data(paths: &DataPaths) -> Result { + Ok(Ok(_)) => { // Success - spawn next task if let Some(company) = pending.pop() { spawn_validation_task( @@ -524,25 +521,6 @@ pub async fn companies_yahoo_cleansed_low_profile( ); } } - Ok(Ok(None)) => { - // Filtered or failed - spawn next task - if let Some(company) = pending.pop() { - spawn_validation_task( - company, - &yahoo_pool, - &paths, - &write_tx, - shutdown_flag, - &processed, - &valid_count, - &filtered_low_cap, - &filtered_no_price, - &failed_count, - total, - &mut tasks, - ); - } - } Ok(Err(e)) => { // Processing error logger::log_error(&format!("Company processing error: {}", e)).await; @@ -646,24 +624,17 @@ pub async fn companies_yahoo_cleansed_low_profile( // Shutdown Yahoo pool yahoo_pool.shutdown().await?; - // Write completion milestone to state.jsonl - let state_path = data_path.join("state.jsonl"); - let yahoo_low_profile = json!({ - "yahoo_companies_cleansed_low_profile": true, - "completed_at": chrono::Utc::now().to_rfc3339(), - }); - - let mut state_file = OpenOptions::new() - .create(true) - .append(true) - .open(&state_path) - .await?; - let state_line = serde_json::to_string(&yahoo_low_profile)?; - state_file.write_all(state_line.as_bytes()).await?; - state_file.write_all(b"\n").await?; - state_file.flush().await?; - - logger::log_info(&format!(" ✓ State milestone saved to: {:?}", state_path)).await; + // Track completion with: + // - Content reference: All event directories + // - Data stage: Data (7-day TTL by default) + // - Dependencies: Depends on cleaned companies data + manager.update_entry( + step_name.to_string(), + content_reference, + DataStage::Data, + vec!["yahoo_companies_cleansed".to_string()], // Dependency + None, // Use default TTL (7 days for Data stage) + ).await?; Ok(final_count) } @@ -681,7 +652,7 @@ fn spawn_validation_task( filtered_no_price: &Arc, failed_count: &Arc, total: usize, - tasks: &mut FuturesUnordered>>>, + tasks: &mut FuturesUnordered>>>, ) { let yahoo_pool_clone = Arc::clone(yahoo_pool); let paths_clone = Arc::clone(paths); @@ -705,36 +676,29 @@ fn spawn_validation_task( &*paths_clone, ).await; - let task_result = match result { + match result { CompanyProcessResult::Valid(validated_company) => { // Send to writer - let _ = write_tx_clone.send(LogCommand::Write(validated_company.clone())).await; + let _ = write_tx_clone.send(LogCommand::Write(validated_company)).await; valid_count_clone.fetch_add(1, Ordering::SeqCst); - Some(CompanyTaskResult { - company: validated_company.clone(), - result: CompanyProcessResult::Valid(validated_company), - }) } CompanyProcessResult::FilteredLowCap { name, market_cap } => { filtered_low_cap_clone.fetch_add(1, Ordering::SeqCst); if filtered_low_cap_clone.load(Ordering::SeqCst) <= 10 { logger::log_info(&format!(" Filtered {} - low market cap: {:.0} EUR", name, market_cap)).await; } - None } CompanyProcessResult::FilteredNoPrice { name } => { filtered_no_price_clone.fetch_add(1, Ordering::SeqCst); if filtered_no_price_clone.load(Ordering::SeqCst) <= 10 { logger::log_info(&format!(" Filtered {} - no recent price data", name)).await; } - None } CompanyProcessResult::Failed { company: failed_company, error, is_transient: _ } => { failed_count_clone.fetch_add(1, Ordering::SeqCst); logger::log_warn(&format!(" Failed to process '{}': {}", failed_company.name, error)).await; - None } - }; + } // Progress reporting let current = processed_clone.fetch_add(1, Ordering::SeqCst) + 1; @@ -749,7 +713,7 @@ fn spawn_validation_task( )).await; } - Ok(task_result) + Ok(None::<()>) }); tasks.push(task); diff --git a/src/corporate/update_companies_enrich_option_chart.rs b/src/corporate/update_companies_enrich_option_chart.rs index ed20abf..726fc98 100644 --- a/src/corporate/update_companies_enrich_option_chart.rs +++ b/src/corporate/update_companies_enrich_option_chart.rs @@ -65,8 +65,6 @@ pub async fn enrich_companies_with_option( if manager.is_step_valid(step_name).await? { logger::log_info(" Yahoo option enrichment already completed and valid").await; - - // Count enriched companies let count = checkpoint_helpers::count_enriched_companies(paths, "option").await?; logger::log_info(&format!(" ✓ Found {} companies with valid option data", count)).await; return Ok(count); @@ -292,29 +290,19 @@ pub async fn enrich_companies_with_chart( logger::log_warn(" companies_yahoo_cleaned.jsonl not found, skipping chart enrichment").await; return Ok(0); } - - // Check if already completed - if state_path.exists() { - let state_content = tokio::fs::read_to_string(&state_path).await?; - - for line in state_content.lines() { - if line.trim().is_empty() { - continue; - } - - if let Ok(state) = serde_json::from_str::(line) { - if state.get("yahoo_chart_enrichment_complete").and_then(|v| v.as_bool()).unwrap_or(false) { - logger::log_info(" Yahoo chart enrichment already completed").await; - - // Count enriched companies - let count = checkpoint_helpers::count_enriched_companies(paths, "chart").await?; - logger::log_info(&format!(" ✓ Found {} companies with chart data", count)).await; - return Ok(count); - } - } - } + + let manager = StateManager::new(&state_path, &data_path.to_path_buf()); + let step_name = "yahoo_chart_enrichment_complete"; + + if manager.is_step_valid(step_name).await? { + logger::log_info(" Yahoo chart enrichment already completed and valid").await; + let count = checkpoint_helpers::count_enriched_companies(paths, "chart").await?; + logger::log_info(&format!(" ✓ Found {} companies with valid chart data", count)).await; + return Ok(count); } + logger::log_info(" Chart data needs refresh - starting enrichment").await; + // === RECOVERY PHASE: Track enriched companies === let mut enriched_companies: HashSet = HashSet::new(); @@ -363,8 +351,8 @@ pub async fn enrich_companies_with_chart( )).await; if pending_count == 0 { - logger::log_info(" ✓ All companies already enriched with chart data").await; - checkpoint_helpers::mark_step_complete(&state_path, "yahoo_chart_enrichment_complete").await?; + logger::log_info(" ✓ All companies already enriched").await; + track_chart_completion(&manager, paths, step_name).await?; return Ok(enriched_companies.len()); } @@ -512,7 +500,8 @@ pub async fn enrich_companies_with_chart( // Mark as complete if no shutdown if !shutdown_flag.load(Ordering::SeqCst) { - checkpoint_helpers::mark_step_complete(&state_path, "yahoo_chart_enrichment_complete").await?; + track_chart_completion(&manager, paths, step_name).await?; + logger::log_info(" ✓ Chart enrichment marked as complete with integrity tracking").await; } Ok(final_success) diff --git a/src/lib.rs b/src/lib.rs index d2cd5d4..8ea1791 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,5 +18,5 @@ pub use util::logger; pub use util::macros; pub use scraper::yahoo::{ YahooClient, YahooClientPool, QuoteSummaryModule, QuoteSummary, ChartData, - OptionsData, SearchResult + OptionData, SearchResult };