added integrity check to cleanse functions

This commit is contained in:
2026-01-10 18:42:39 +01:00
parent 766eb803f1
commit ac1345798d
4 changed files with 74 additions and 153 deletions

View File

@@ -8,8 +8,6 @@ use super::types::CompanyCrossPlatformInfo;
use crate::util::logger; use crate::util::logger;
use std::collections::HashMap; use std::collections::HashMap;
use std::path::{Path}; use std::path::{Path};
use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use tokio::fs::{File}; use tokio::fs::{File};
use tokio::io::{AsyncWriteExt}; use tokio::io::{AsyncWriteExt};
use anyhow::Result; use anyhow::Result;
@@ -214,34 +212,4 @@ pub async fn count_enriched_companies(
} }
Ok(count) Ok(count)
}
/// Mark a processing step as complete in state file
///
/// Appends a completion marker to the state file with timestamp.
/// Used to track which processing steps have been completed.
pub async fn mark_step_complete<P: AsRef<Path>>(
state_path: P,
completion_key: &str,
) -> Result<()> {
use tokio::fs::OpenOptions;
let completion_entry = serde_json::json!({
completion_key: true,
"completed_at": chrono::Utc::now().to_rfc3339(),
});
let mut state_file = OpenOptions::new()
.create(true)
.append(true)
.open(state_path.as_ref())
.await?;
let state_line = serde_json::to_string(&completion_entry)?;
state_file.write_all(state_line.as_bytes()).await?;
state_file.write_all(b"\n").await?;
state_file.flush().await?;
state_file.sync_all().await?;
Ok(())
} }

View File

@@ -3,6 +3,7 @@ use super::{helpers::*, types::*};
use crate::config::Config; use crate::config::Config;
use crate::corporate::checkpoint_helpers; use crate::corporate::checkpoint_helpers;
use crate::util::directories::DataPaths; use crate::util::directories::DataPaths;
use crate::util::integrity::{DataStage, StateManager, file_reference};
use crate::util::logger; use crate::util::logger;
use crate::scraper::yahoo::{YahooClientPool, QuoteSummaryModule}; use crate::scraper::yahoo::{YahooClientPool, QuoteSummaryModule};
@@ -14,7 +15,6 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tokio::fs::{File, OpenOptions}; use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use futures::stream::{FuturesUnordered, StreamExt}; use futures::stream::{FuturesUnordered, StreamExt};
use serde_json::json;
use tokio::sync::mpsc; use tokio::sync::mpsc;
/// Result of processing a single company /// Result of processing a single company
@@ -33,11 +33,6 @@ enum LogCommand {
Shutdown, Shutdown,
} }
/// Result from processing a single company with priority
struct CompanyTaskResult {
company: CompanyCrossPlatformInfo,
result: CompanyProcessResult,
}
/// Cleansing function to remove companies with missing essential yahoo data for integrity /// Cleansing function to remove companies with missing essential yahoo data for integrity
pub async fn companies_yahoo_cleansed_no_data(paths: &DataPaths) -> Result<usize, anyhow::Error> { pub async fn companies_yahoo_cleansed_no_data(paths: &DataPaths) -> Result<usize, anyhow::Error> {
@@ -51,36 +46,23 @@ pub async fn companies_yahoo_cleansed_no_data(paths: &DataPaths) -> Result<usize
logger::log_warn("companies.jsonl not found, skipping cleansing").await; logger::log_warn("companies.jsonl not found, skipping cleansing").await;
return Ok(0); return Ok(0);
} }
let manager = StateManager::new(&state_path, &data_path.to_path_buf());
let step_name = "yahoo_companies_cleansed_no_data";
let content_reference = file_reference(&output_path);
if state_path.exists() { if manager.is_step_valid(step_name).await? {
let state_content = tokio::fs::read_to_string(&state_path).await?; let output_content = tokio::fs::read_to_string(&output_path).await?;
let count = output_content.lines()
.filter(|line| !line.trim().is_empty())
.count();
for line in state_content.lines() { logger::log_info(&format!(" ✓ Found {} companies in companies_yahoo.jsonl", count)).await;
if line.trim().is_empty() { return Ok(count);
continue;
}
if let Ok(state) = serde_json::from_str::<serde_json::Value>(line) {
if state.get("yahoo_companies_cleansed_no_data").and_then(|v| v.as_bool()).unwrap_or(false) {
logger::log_info(" Yahoo companies cleansing already completed, reading existing file...").await;
if output_path.exists() {
let output_content = tokio::fs::read_to_string(&output_path).await?;
let count = output_content.lines()
.filter(|line| !line.trim().is_empty())
.count();
logger::log_info(&format!(" ✓ Found {} companies in companies_yahoo.jsonl", count)).await;
return Ok(count);
} else {
logger::log_warn(" State indicates completion but companies_yahoo.jsonl not found, re-running...").await;
break;
}
}
}
}
} }
logger::log_info(" Cleansing companies with missing Yahoo data...").await;
logger::log_info(&format!(" Reading from: {:?}", input_path)).await; logger::log_info(&format!(" Reading from: {:?}", input_path)).await;
logger::log_info(&format!(" Writing to: {:?}", output_path)).await; logger::log_info(&format!(" Writing to: {:?}", output_path)).await;
@@ -141,18 +123,17 @@ pub async fn companies_yahoo_cleansed_no_data(paths: &DataPaths) -> Result<usize
total_count, valid_count, removed_count total_count, valid_count, removed_count
)).await; )).await;
let yahoo_companies = json!({ // Track completion with:
"yahoo_companies_cleansed_no_data": true, // - Content reference: All event directories
"completed_at": chrono::Utc::now().to_rfc3339(), // - Data stage: Data (7-day TTL by default)
}); // - Dependencies: Depends on cleaned companies data
manager.update_entry(
let mut state_file = File::create(&state_path).await?; step_name.to_string(),
let state_line = serde_json::to_string(&yahoo_companies)?; content_reference,
state_file.write_all(state_line.as_bytes()).await?; DataStage::Data,
state_file.write_all(b"\n").await?; vec!["yahoo_companies_cleansed".to_string()], // Dependency
state_file.flush().await?; None, // Use default TTL (7 days for Data stage)
).await?;
logger::log_info(&format!(" ✓ State file created at: {:?}", state_path)).await;
Ok(valid_count) Ok(valid_count)
} }
@@ -199,6 +180,22 @@ pub async fn companies_yahoo_cleansed_low_profile(
return Ok(0); return Ok(0);
} }
let manager = StateManager::new(&state_path, &data_path.to_path_buf());
let step_name = "yahoo_companies_cleansed_no_data";
let content_reference = file_reference(&checkpoint_path);
if manager.is_step_valid(step_name).await? {
let checkpoint_content = tokio::fs::read_to_string(&checkpoint_path).await?;
let count = checkpoint_content.lines()
.filter(|line| !line.trim().is_empty())
.count();
logger::log_info(&format!(" ✓ Found {} companies in companies_yahoo_cleaned.jsonl", count)).await;
return Ok(count);
}
logger::log_info(" Cleansing companies with low Yahoo profile...").await;
if state_path.exists() { if state_path.exists() {
let state_content = tokio::fs::read_to_string(&state_path).await?; let state_content = tokio::fs::read_to_string(&state_path).await?;
@@ -505,7 +502,7 @@ pub async fn companies_yahoo_cleansed_low_profile(
} }
match task_result { match task_result {
Ok(Ok(Some(_result))) => { Ok(Ok(_)) => {
// Success - spawn next task // Success - spawn next task
if let Some(company) = pending.pop() { if let Some(company) = pending.pop() {
spawn_validation_task( spawn_validation_task(
@@ -524,25 +521,6 @@ pub async fn companies_yahoo_cleansed_low_profile(
); );
} }
} }
Ok(Ok(None)) => {
// Filtered or failed - spawn next task
if let Some(company) = pending.pop() {
spawn_validation_task(
company,
&yahoo_pool,
&paths,
&write_tx,
shutdown_flag,
&processed,
&valid_count,
&filtered_low_cap,
&filtered_no_price,
&failed_count,
total,
&mut tasks,
);
}
}
Ok(Err(e)) => { Ok(Err(e)) => {
// Processing error // Processing error
logger::log_error(&format!("Company processing error: {}", e)).await; logger::log_error(&format!("Company processing error: {}", e)).await;
@@ -646,24 +624,17 @@ pub async fn companies_yahoo_cleansed_low_profile(
// Shutdown Yahoo pool // Shutdown Yahoo pool
yahoo_pool.shutdown().await?; yahoo_pool.shutdown().await?;
// Write completion milestone to state.jsonl // Track completion with:
let state_path = data_path.join("state.jsonl"); // - Content reference: All event directories
let yahoo_low_profile = json!({ // - Data stage: Data (7-day TTL by default)
"yahoo_companies_cleansed_low_profile": true, // - Dependencies: Depends on cleaned companies data
"completed_at": chrono::Utc::now().to_rfc3339(), manager.update_entry(
}); step_name.to_string(),
content_reference,
let mut state_file = OpenOptions::new() DataStage::Data,
.create(true) vec!["yahoo_companies_cleansed".to_string()], // Dependency
.append(true) None, // Use default TTL (7 days for Data stage)
.open(&state_path) ).await?;
.await?;
let state_line = serde_json::to_string(&yahoo_low_profile)?;
state_file.write_all(state_line.as_bytes()).await?;
state_file.write_all(b"\n").await?;
state_file.flush().await?;
logger::log_info(&format!(" ✓ State milestone saved to: {:?}", state_path)).await;
Ok(final_count) Ok(final_count)
} }
@@ -681,7 +652,7 @@ fn spawn_validation_task(
filtered_no_price: &Arc<AtomicUsize>, filtered_no_price: &Arc<AtomicUsize>,
failed_count: &Arc<AtomicUsize>, failed_count: &Arc<AtomicUsize>,
total: usize, total: usize,
tasks: &mut FuturesUnordered<tokio::task::JoinHandle<anyhow::Result<Option<CompanyTaskResult>>>>, tasks: &mut FuturesUnordered<tokio::task::JoinHandle<anyhow::Result<Option<()>>>>,
) { ) {
let yahoo_pool_clone = Arc::clone(yahoo_pool); let yahoo_pool_clone = Arc::clone(yahoo_pool);
let paths_clone = Arc::clone(paths); let paths_clone = Arc::clone(paths);
@@ -705,36 +676,29 @@ fn spawn_validation_task(
&*paths_clone, &*paths_clone,
).await; ).await;
let task_result = match result { match result {
CompanyProcessResult::Valid(validated_company) => { CompanyProcessResult::Valid(validated_company) => {
// Send to writer // Send to writer
let _ = write_tx_clone.send(LogCommand::Write(validated_company.clone())).await; let _ = write_tx_clone.send(LogCommand::Write(validated_company)).await;
valid_count_clone.fetch_add(1, Ordering::SeqCst); valid_count_clone.fetch_add(1, Ordering::SeqCst);
Some(CompanyTaskResult {
company: validated_company.clone(),
result: CompanyProcessResult::Valid(validated_company),
})
} }
CompanyProcessResult::FilteredLowCap { name, market_cap } => { CompanyProcessResult::FilteredLowCap { name, market_cap } => {
filtered_low_cap_clone.fetch_add(1, Ordering::SeqCst); filtered_low_cap_clone.fetch_add(1, Ordering::SeqCst);
if filtered_low_cap_clone.load(Ordering::SeqCst) <= 10 { if filtered_low_cap_clone.load(Ordering::SeqCst) <= 10 {
logger::log_info(&format!(" Filtered {} - low market cap: {:.0} EUR", name, market_cap)).await; logger::log_info(&format!(" Filtered {} - low market cap: {:.0} EUR", name, market_cap)).await;
} }
None
} }
CompanyProcessResult::FilteredNoPrice { name } => { CompanyProcessResult::FilteredNoPrice { name } => {
filtered_no_price_clone.fetch_add(1, Ordering::SeqCst); filtered_no_price_clone.fetch_add(1, Ordering::SeqCst);
if filtered_no_price_clone.load(Ordering::SeqCst) <= 10 { if filtered_no_price_clone.load(Ordering::SeqCst) <= 10 {
logger::log_info(&format!(" Filtered {} - no recent price data", name)).await; logger::log_info(&format!(" Filtered {} - no recent price data", name)).await;
} }
None
} }
CompanyProcessResult::Failed { company: failed_company, error, is_transient: _ } => { CompanyProcessResult::Failed { company: failed_company, error, is_transient: _ } => {
failed_count_clone.fetch_add(1, Ordering::SeqCst); failed_count_clone.fetch_add(1, Ordering::SeqCst);
logger::log_warn(&format!(" Failed to process '{}': {}", failed_company.name, error)).await; logger::log_warn(&format!(" Failed to process '{}': {}", failed_company.name, error)).await;
None
} }
}; }
// Progress reporting // Progress reporting
let current = processed_clone.fetch_add(1, Ordering::SeqCst) + 1; let current = processed_clone.fetch_add(1, Ordering::SeqCst) + 1;
@@ -749,7 +713,7 @@ fn spawn_validation_task(
)).await; )).await;
} }
Ok(task_result) Ok(None::<()>)
}); });
tasks.push(task); tasks.push(task);

View File

@@ -65,8 +65,6 @@ pub async fn enrich_companies_with_option(
if manager.is_step_valid(step_name).await? { if manager.is_step_valid(step_name).await? {
logger::log_info(" Yahoo option enrichment already completed and valid").await; logger::log_info(" Yahoo option enrichment already completed and valid").await;
// Count enriched companies
let count = checkpoint_helpers::count_enriched_companies(paths, "option").await?; let count = checkpoint_helpers::count_enriched_companies(paths, "option").await?;
logger::log_info(&format!(" ✓ Found {} companies with valid option data", count)).await; logger::log_info(&format!(" ✓ Found {} companies with valid option data", count)).await;
return Ok(count); return Ok(count);
@@ -292,29 +290,19 @@ pub async fn enrich_companies_with_chart(
logger::log_warn(" companies_yahoo_cleaned.jsonl not found, skipping chart enrichment").await; logger::log_warn(" companies_yahoo_cleaned.jsonl not found, skipping chart enrichment").await;
return Ok(0); return Ok(0);
} }
// Check if already completed let manager = StateManager::new(&state_path, &data_path.to_path_buf());
if state_path.exists() { let step_name = "yahoo_chart_enrichment_complete";
let state_content = tokio::fs::read_to_string(&state_path).await?;
if manager.is_step_valid(step_name).await? {
for line in state_content.lines() { logger::log_info(" Yahoo chart enrichment already completed and valid").await;
if line.trim().is_empty() { let count = checkpoint_helpers::count_enriched_companies(paths, "chart").await?;
continue; logger::log_info(&format!(" ✓ Found {} companies with valid chart data", count)).await;
} return Ok(count);
if let Ok(state) = serde_json::from_str::<serde_json::Value>(line) {
if state.get("yahoo_chart_enrichment_complete").and_then(|v| v.as_bool()).unwrap_or(false) {
logger::log_info(" Yahoo chart enrichment already completed").await;
// Count enriched companies
let count = checkpoint_helpers::count_enriched_companies(paths, "chart").await?;
logger::log_info(&format!(" ✓ Found {} companies with chart data", count)).await;
return Ok(count);
}
}
}
} }
logger::log_info(" Chart data needs refresh - starting enrichment").await;
// === RECOVERY PHASE: Track enriched companies === // === RECOVERY PHASE: Track enriched companies ===
let mut enriched_companies: HashSet<String> = HashSet::new(); let mut enriched_companies: HashSet<String> = HashSet::new();
@@ -363,8 +351,8 @@ pub async fn enrich_companies_with_chart(
)).await; )).await;
if pending_count == 0 { if pending_count == 0 {
logger::log_info(" ✓ All companies already enriched with chart data").await; logger::log_info(" ✓ All companies already enriched").await;
checkpoint_helpers::mark_step_complete(&state_path, "yahoo_chart_enrichment_complete").await?; track_chart_completion(&manager, paths, step_name).await?;
return Ok(enriched_companies.len()); return Ok(enriched_companies.len());
} }
@@ -512,7 +500,8 @@ pub async fn enrich_companies_with_chart(
// Mark as complete if no shutdown // Mark as complete if no shutdown
if !shutdown_flag.load(Ordering::SeqCst) { if !shutdown_flag.load(Ordering::SeqCst) {
checkpoint_helpers::mark_step_complete(&state_path, "yahoo_chart_enrichment_complete").await?; track_chart_completion(&manager, paths, step_name).await?;
logger::log_info(" ✓ Chart enrichment marked as complete with integrity tracking").await;
} }
Ok(final_success) Ok(final_success)

View File

@@ -18,5 +18,5 @@ pub use util::logger;
pub use util::macros; pub use util::macros;
pub use scraper::yahoo::{ pub use scraper::yahoo::{
YahooClient, YahooClientPool, QuoteSummaryModule, QuoteSummary, ChartData, YahooClient, YahooClientPool, QuoteSummaryModule, QuoteSummary, ChartData,
OptionsData, SearchResult OptionData, SearchResult
}; };