diff --git a/logs/checkpoint_dependencies.dot b/logs/checkpoint_dependencies.dot new file mode 100644 index 0000000..ef63945 --- /dev/null +++ b/logs/checkpoint_dependencies.dot @@ -0,0 +1,6 @@ +digraph Dependencies { + rankdir=LR; + node [shape=box]; + + +} diff --git a/src/checkpoint_dependencies.toml b/src/checkpoint_dependencies.toml index bc46259..5668e57 100644 --- a/src/checkpoint_dependencies.toml +++ b/src/checkpoint_dependencies.toml @@ -1,13 +1,57 @@ -[checkpoints.lei_figi_mapping_complete] +# checkpoint_dependencies.toml - Complete configuration + +# ============================================================================ +# COLLECTION STAGE (No dependencies) +# ============================================================================ + +[checkpoints.exchange_collection_complete] +description = "Yahoo exchanges collected and validated" depends_on = [] -[checkpoints.securities_data_complete] -depends_on = ["lei_figi_mapping_complete"] +[checkpoints.lei_figi_mapping_complete] +description = "LEI-to-FIGI mappings from OpenFIGI API" +depends_on = [] + +# ============================================================================ +# CLEANSING STAGE (Depends on collection) +# ============================================================================ + +[checkpoints.yahoo_companies_cleansed] +description = "Company data cleansed and validated" +depends_on = ["exchange_collection_complete"] + +# ============================================================================ +# ENRICHMENT GROUP (All depend on cleansed companies) +# ============================================================================ [groups.enrichment_group] -members = ["yahoo_events_enrichment_complete", "yahoo_options_enrichment_complete"] +description = "Yahoo Finance enrichment functions" +members = [ + "yahoo_events_enrichment_complete", + "yahoo_options_enrichment_complete", + "yahoo_chart_enrichment_complete" +] depends_on = ["yahoo_companies_cleansed"] [checkpoints.yahoo_events_enrichment_complete] +description = "Corporate events enriched for all companies" depends_on = [] -group = "enrichment_group" \ No newline at end of file +group = "enrichment_group" + +[checkpoints.yahoo_options_enrichment_complete] +description = "Options data enriched for all companies" +depends_on = [] +group = "enrichment_group" + +[checkpoints.yahoo_chart_enrichment_complete] +description = "Chart data enriched for all companies" +depends_on = [] +group = "enrichment_group" + +# ============================================================================ +# SECURITIES PROCESSING (Depends on LEI mapping) +# ============================================================================ + +[checkpoints.securities_data_complete] +description = "Securities data built from FIGI mappings" +depends_on = ["lei_figi_mapping_complete"] \ No newline at end of file diff --git a/src/corporate/mod.rs b/src/corporate/mod.rs index 9b398f4..ff810df 100644 --- a/src/corporate/mod.rs +++ b/src/corporate/mod.rs @@ -14,7 +14,6 @@ pub mod update; pub mod update_companies; pub mod update_companies_cleanse; pub mod update_companies_enrich; -pub mod update_companies_enrich_option_chart; pub mod collect_exchanges; diff --git a/src/corporate/update.rs b/src/corporate/update.rs index dbc9095..b08bec5 100644 --- a/src/corporate/update.rs +++ b/src/corporate/update.rs @@ -4,8 +4,7 @@ use crate::config::Config; use crate::check_shutdown; use crate::corporate::update_companies::build_companies_jsonl_streaming_parallel; use crate::corporate::update_companies_cleanse::{companies_yahoo_cleansed_low_profile, companies_yahoo_cleansed_no_data}; -use crate::corporate::update_companies_enrich::enrich_companies_with_events; -use crate::corporate::update_companies_enrich_option_chart::{enrich_companies_with_option, enrich_companies_with_chart}; +use crate::corporate::update_companies_enrich::{enrich_companies_with_events, enrich_companies_with_chart, enrich_companies_with_option}; use crate::corporate::collect_exchanges::collect_and_save_exchanges; use crate::economic::yahoo_update_forex::collect_fx_rates; use crate::util::directories::DataPaths; diff --git a/src/corporate/update_companies_enrich.rs b/src/corporate/update_companies_enrich.rs index 2b9687d..101886c 100644 --- a/src/corporate/update_companies_enrich.rs +++ b/src/corporate/update_companies_enrich.rs @@ -1,4 +1,4 @@ -// src/corporate/update_companies_enrich_events.rs - WITH INTEGRITY MODULE +// src/corporate/update_companies_enrich.rs - MERGED VERSION WITH GENERIC ENRICHMENT use super::{types::*, helpers::*}; use crate::config::Config; use crate::corporate::checkpoint_helpers; @@ -8,7 +8,7 @@ use crate::util::logger; use crate::scraper::yahoo::{YahooClientPool, QuoteSummaryModule}; use std::result::Result::Ok; -use chrono::Utc; +use chrono::{TimeZone, Utc}; use std::collections::{HashSet}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; @@ -17,6 +17,27 @@ use tokio::io::{AsyncWriteExt}; use futures::stream::{FuturesUnordered, StreamExt}; use serde_json::json; use tokio::sync::mpsc; +use std::future::Future; +use std::pin::Pin; + +/// Log command enum (shared across all enrichment types) +enum LogCommand { + Write(serde_json::Value), + Checkpoint, + Shutdown, +} + +/// Type alias for enrichment function +type EnrichmentFn = Arc< + dyn Fn(CompanyCrossPlatformInfo, Arc, DataPaths) + -> Pin> + Send>> + + Send + + Sync +>; + +// ============================================================================ +// EVENTS ENRICHMENT +// ============================================================================ /// Yahoo Event enrichment per corporate company /// @@ -104,6 +125,16 @@ pub async fn enrich_companies_with_events( // === PROCESSING PHASE: Enrich companies with events === + // Create enrichment function + let enrichment_fn: EnrichmentFn = Arc::new(move |company, pool, paths| { + let company = company.clone(); + let pool = Arc::clone(&pool); + let paths = paths.clone(); + Box::pin(async move { + enrich_company_with_events(&company, &pool, &paths).await + }) + }); + // Shared counters let processed_count = Arc::new(AtomicUsize::new(enriched_companies.len())); let success_count = Arc::new(AtomicUsize::new(enriched_companies.len())); @@ -113,64 +144,14 @@ pub async fn enrich_companies_with_events( let (log_tx, mut log_rx) = mpsc::channel::(1000); // Spawn log writer task - let log_writer_handle = { - let log_path = log_path.clone(); - let processed_count = Arc::clone(&processed_count); - let total_companies = total_companies; - - tokio::spawn(async move { - let mut log_file = OpenOptions::new() - .create(true) - .append(true) - .open(&log_path) - .await - .expect("Failed to open log file"); - - let mut write_count = 0; - let mut last_fsync = tokio::time::Instant::now(); - - while let Some(cmd) = log_rx.recv().await { - match cmd { - LogCommand::Write(entry) => { - let json_line = serde_json::to_string(&entry).expect("Serialization failed"); - log_file.write_all(json_line.as_bytes()).await.expect("Write failed"); - log_file.write_all(b"\n").await.expect("Write failed"); - - write_count += 1; - - // Batched fsync - if write_count >= FSYNC_BATCH_SIZE - || last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS - { - log_file.flush().await.expect("Flush failed"); - log_file.sync_all().await.expect("Fsync failed"); - write_count = 0; - last_fsync = tokio::time::Instant::now(); - } - } - LogCommand::Checkpoint => { - // Force fsync on checkpoint - log_file.flush().await.expect("Flush failed"); - log_file.sync_all().await.expect("Fsync failed"); - write_count = 0; - last_fsync = tokio::time::Instant::now(); - - let current = processed_count.load(Ordering::SeqCst); - logger::log_info(&format!( - " Checkpoint: {}/{} companies processed", - current, total_companies - )).await; - } - LogCommand::Shutdown => { - // Final fsync before shutdown - log_file.flush().await.expect("Flush failed"); - log_file.sync_all().await.expect("Fsync failed"); - break; - } - } - } - }) - }; + let log_writer_handle = spawn_log_writer( + log_path, + log_rx, + Arc::clone(&processed_count), + total_companies, + FSYNC_BATCH_SIZE, + FSYNC_INTERVAL_SECS, + ); // Process companies concurrently with task panic isolation let mut tasks = FuturesUnordered::new(); @@ -190,6 +171,7 @@ pub async fn enrich_companies_with_events( log_tx.clone(), Arc::clone(&semaphore), Arc::clone(shutdown_flag), + Arc::clone(&enrichment_fn), ); tasks.push(task); } @@ -212,11 +194,17 @@ pub async fn enrich_companies_with_events( // Check for shutdown if shutdown_flag.load(Ordering::SeqCst) { - logger::log_warn("Shutdown detected, stopping new enrichment tasks...").await; + logger::log_warn("Shutdown signal received, stopping event enrichment").await; break; } - // Spawn next task + // Checkpoint periodically + checkpoint_counter += 1; + if checkpoint_counter % CHECKPOINT_INTERVAL == 0 { + let _ = log_tx.send(LogCommand::Checkpoint).await; + } + + // Spawn next task if available if let Some(company) = pending_iter.next() { let task = spawn_enrichment_task( company, @@ -228,18 +216,13 @@ pub async fn enrich_companies_with_events( log_tx.clone(), Arc::clone(&semaphore), Arc::clone(shutdown_flag), + Arc::clone(&enrichment_fn), ); tasks.push(task); } - - // Periodic checkpoint - checkpoint_counter += 1; - if checkpoint_counter % CHECKPOINT_INTERVAL == 0 { - let _ = log_tx.send(LogCommand::Checkpoint).await; - } } - // Shutdown log writer + // Signal log writer to shutdown let _ = log_tx.send(LogCommand::Shutdown).await; drop(log_tx); @@ -299,63 +282,6 @@ async fn track_events_completion( Ok(()) } -/// Spawn a single enrichment task with panic isolation -fn spawn_enrichment_task( - company: CompanyCrossPlatformInfo, - yahoo_pool: Arc, - paths: DataPaths, - processed_count: Arc, - success_count: Arc, - failed_count: Arc, - log_tx: mpsc::Sender, - semaphore: Arc, - shutdown_flag: Arc, -) -> tokio::task::JoinHandle<()> { - tokio::spawn(async move { - // Acquire semaphore permit - let _permit = semaphore.acquire().await.expect("Semaphore closed"); - - // Check shutdown before processing - if shutdown_flag.load(Ordering::SeqCst) { - return; - } - - // Process company - let result = enrich_company_with_events( - &company, - &yahoo_pool, - &paths, - ).await; - - // Update counters - processed_count.fetch_add(1, Ordering::SeqCst); - - let status = match result { - Ok(_) => { - success_count.fetch_add(1, Ordering::SeqCst); - "enriched" - } - Err(e) => { - failed_count.fetch_add(1, Ordering::SeqCst); - logger::log_warn(&format!( - " Failed to enrich {}: {}", - company.name, e - )).await; - "failed" - } - }; - - // Log result - let log_entry = json!({ - "company_name": company.name, - "status": status, - "timestamp": Utc::now().to_rfc3339(), - }); - - let _ = log_tx.send(LogCommand::Write(log_entry)).await; - }) -} - /// Enrich a single company with event data async fn enrich_company_with_events( company: &CompanyCrossPlatformInfo, @@ -446,9 +372,690 @@ async fn save_company_event_data( Ok(()) } -/// Log command enum -enum LogCommand { - Write(serde_json::Value), - Checkpoint, - Shutdown, +// ============================================================================ +// OPTION ENRICHMENT +// ============================================================================ + +/// Yahoo Option enrichment per corporate company +/// +/// # Features +/// - Graceful shutdown (abort-safe) +/// - Task panic isolation (tasks fail independently) +/// - Crash-safe persistence (checkpoint + log with fsync) +/// - Smart skip logic (only process incomplete data) +/// - Uses pending queue instead of retry mechanism +/// - Content integrity validation with hash tracking +/// +/// # Persistence Strategy +/// - Checkpoint: companies_yahoo_cleaned.jsonl (atomic state) +/// - Log: companies_option_updates.log (append-only updates) +/// - On restart: Load checkpoint + replay log +/// - Periodic checkpoints (every 50 companies) +/// - Batched fsync (every 10 writes or 10 seconds) +/// - Hash validation of all option data directories +pub async fn enrich_companies_with_option( + paths: &DataPaths, + _config: &Config, + yahoo_pool: Arc, + shutdown_flag: &Arc, +) -> anyhow::Result { + // Configuration constants + const CHECKPOINT_INTERVAL: usize = 50; + const FSYNC_BATCH_SIZE: usize = 10; + const FSYNC_INTERVAL_SECS: u64 = 10; + const CONCURRENCY_LIMIT: usize = 50; + + let data_path = paths.data_dir(); + + // File paths + let input_path = data_path.join("companies_yahoo_cleaned.jsonl"); + let log_path = data_path.join("companies_option_updates.log"); + let state_path = data_path.join("state.jsonl"); + + // Check input exists + if !input_path.exists() { + logger::log_warn(" companies_yahoo_cleaned.jsonl not found, skipping option enrichment").await; + return Ok(0); + } + + let manager = StateManager::new(&state_path, &data_path.to_path_buf())?; + let step_name = "yahoo_option_enrichment_complete"; + + if manager.is_step_valid(step_name).await? { + logger::log_info(" Yahoo option enrichment already completed and valid").await; + let count = checkpoint_helpers::count_enriched_companies(paths, "option").await?; + logger::log_info(&format!(" ✓ Found {} companies with valid option data", count)).await; + return Ok(count); + } + + logger::log_info(" Option data needs refresh - starting enrichment").await; + + // === RECOVERY PHASE: Track enriched companies === + let enriched_companies = checkpoint_helpers::load_enrichment_progress(&log_path).await?; + + // Load all companies from input + logger::log_info("Loading companies from companies_yahoo_cleaned.jsonl...").await; + let companies = load_companies_from_jsonl(&input_path).await?; + let total_companies = companies.len(); + logger::log_info(&format!("Found {} companies to process", total_companies)).await; + + // Filter companies that need enrichment + let pending_companies: Vec = companies + .into_iter() + .filter(|company| !enriched_companies.contains(&company.name)) + .collect(); + + let pending_count = pending_companies.len(); + logger::log_info(&format!( + " {} already enriched, {} pending", + enriched_companies.len(), + pending_count + )).await; + + if pending_count == 0 { + logger::log_info(" ✓ All companies already enriched").await; + track_option_completion(&manager, paths, step_name).await?; + return Ok(enriched_companies.len()); + } + + // === PROCESSING PHASE: Enrich companies with option === + + // Create enrichment function + let enrichment_fn: EnrichmentFn = Arc::new(move |company, pool, paths| { + let company = company.clone(); + let pool = Arc::clone(&pool); + let paths = paths.clone(); + Box::pin(async move { + enrich_company_with_option(&company, &pool, &paths).await + }) + }); + + // Shared counters + let processed_count = Arc::new(AtomicUsize::new(enriched_companies.len())); + let success_count = Arc::new(AtomicUsize::new(enriched_companies.len())); + let failed_count = Arc::new(AtomicUsize::new(0)); + + // Log writer channel with batching and fsync + let (log_tx, mut log_rx) = mpsc::channel::(1000); + + // Spawn log writer task + let log_writer_handle = spawn_log_writer( + log_path, + log_rx, + Arc::clone(&processed_count), + total_companies, + FSYNC_BATCH_SIZE, + FSYNC_INTERVAL_SECS, + ); + + // Process companies concurrently with task panic isolation + let mut tasks = FuturesUnordered::new(); + let mut pending_iter = pending_companies.into_iter(); + let semaphore = Arc::new(tokio::sync::Semaphore::new(CONCURRENCY_LIMIT)); + + // Initial batch of tasks + for _ in 0..CONCURRENCY_LIMIT.min(pending_count) { + if let Some(company) = pending_iter.next() { + let task = spawn_enrichment_task( + company, + Arc::clone(&yahoo_pool), + paths.clone(), + Arc::clone(&processed_count), + Arc::clone(&success_count), + Arc::clone(&failed_count), + log_tx.clone(), + Arc::clone(&semaphore), + Arc::clone(shutdown_flag), + Arc::clone(&enrichment_fn), + ); + tasks.push(task); + } + } + + // Process tasks as they complete and spawn new ones + let mut checkpoint_counter = 0; + while let Some(_result) = tasks.next().await { + // Check for shutdown + if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn("Shutdown signal received, stopping option enrichment").await; + break; + } + + // Checkpoint periodically + checkpoint_counter += 1; + if checkpoint_counter % CHECKPOINT_INTERVAL == 0 { + let _ = log_tx.send(LogCommand::Checkpoint).await; + } + + // Spawn next task if available + if let Some(company) = pending_iter.next() { + let task = spawn_enrichment_task( + company, + Arc::clone(&yahoo_pool), + paths.clone(), + Arc::clone(&processed_count), + Arc::clone(&success_count), + Arc::clone(&failed_count), + log_tx.clone(), + Arc::clone(&semaphore), + Arc::clone(shutdown_flag), + Arc::clone(&enrichment_fn), + ); + tasks.push(task); + } + } + + // Signal log writer to shutdown + let _ = log_tx.send(LogCommand::Shutdown).await; + drop(log_tx); + + // Wait for log writer to finish + let _ = log_writer_handle.await; + + let final_processed = processed_count.load(Ordering::SeqCst); + let final_success = success_count.load(Ordering::SeqCst); + let final_failed = failed_count.load(Ordering::SeqCst); + + logger::log_info(&format!( + " Option enrichment summary: {} total, {} success, {} failed", + final_processed, final_success, final_failed + )).await; + + // Mark as complete if all companies processed + if final_processed >= total_companies && !shutdown_flag.load(Ordering::SeqCst) { + track_option_completion(&manager, paths, step_name).await?; + logger::log_info(" ✓ Option enrichment marked as complete with integrity tracking").await; + } + + Ok(final_success) +} + +/// Track option enrichment completion with content hash verification +async fn track_option_completion( + manager: &StateManager, + paths: &DataPaths, + step_name: &str, +) -> anyhow::Result<()> { + // Create content reference for all option data + // This will hash ALL files matching the pattern: {company}/option/data.jsonl + let content_reference = directory_reference( + paths.corporate_dir(), + Some(vec![ + "*/option/*.jsonl".to_string(), // Main pattern for option data + "*/option/data.jsonl".to_string(), // Specific pattern (more precise) + ]), + Some(vec![ + "*.log".to_string(), // Exclude log files + "*.tmp".to_string(), // Exclude temp files + "*.bak".to_string(), // Exclude backup files + ]), + ); + + // Track completion with: + // - Content reference: All option directories + // - Data stage: Data (7-day TTL by default) + // - Dependencies: Depends on cleaned companies data + manager.update_entry( + step_name.to_string(), + content_reference, + DataStage::Data, + None, // Use default TTL (7 days for Data stage) + ).await?; + + Ok(()) +} + +/// Enrich a single company with option data +async fn enrich_company_with_option( + company: &CompanyCrossPlatformInfo, + yahoo_pool: &Arc, + paths: &DataPaths, +) -> anyhow::Result<()> { + let ticker = match extract_first_yahoo_ticker(company) { + Some(t) => t, + None => { + return Err(anyhow::anyhow!("No valid Yahoo ticker found")); + } + }; + + // Get option data for all available expiration dates + let option_data = yahoo_pool.get_option_data(&ticker, None).await?; + + // Only save if we got meaningful data + if option_data.option.is_empty() { + return Err(anyhow::anyhow!("No option data available")); + } + + // Save the option data + save_company_data(paths, &company.name, &option_data, "option").await?; + + Ok(()) +} + +// ============================================================================ +// CHART ENRICHMENT +// ============================================================================ + +/// Yahoo Chart enrichment per corporate company +/// +/// # Features +/// - Graceful shutdown (abort-safe) +/// - Task panic isolation (tasks fail independently) +/// - Crash-safe persistence (checkpoint + log with fsync) +/// - Smart skip logic (only process incomplete data) +/// - Uses pending queue instead of retry mechanism +/// - Content integrity validation with hash tracking +/// +/// # Persistence Strategy +/// - Checkpoint: companies_yahoo_cleaned.jsonl (atomic state) +/// - Log: companies_chart_updates.log (append-only updates) +/// - On restart: Load checkpoint + replay log +/// - Periodic checkpoints (every 50 companies) +/// - Batched fsync (every 10 writes or 10 seconds) +/// - Hash validation of all chart data directories +pub async fn enrich_companies_with_chart( + paths: &DataPaths, + _config: &Config, + yahoo_pool: Arc, + shutdown_flag: &Arc, +) -> anyhow::Result { + // Configuration constants + const CHECKPOINT_INTERVAL: usize = 50; + const FSYNC_BATCH_SIZE: usize = 10; + const FSYNC_INTERVAL_SECS: u64 = 10; + const CONCURRENCY_LIMIT: usize = 50; + + let data_path = paths.data_dir(); + + // File paths + let input_path = data_path.join("companies_yahoo_cleaned.jsonl"); + let log_path = data_path.join("companies_chart_updates.log"); + let state_path = data_path.join("state.jsonl"); + + // Check input exists + if !input_path.exists() { + logger::log_warn(" companies_yahoo_cleaned.jsonl not found, skipping chart enrichment").await; + return Ok(0); + } + + let manager = StateManager::new(&state_path, &data_path.to_path_buf())?; + let step_name = "yahoo_chart_enrichment_complete"; + + if manager.is_step_valid(step_name).await? { + logger::log_info(" Yahoo chart enrichment already completed and valid").await; + let count = checkpoint_helpers::count_enriched_companies(paths, "chart").await?; + logger::log_info(&format!(" ✓ Found {} companies with valid chart data", count)).await; + return Ok(count); + } + + logger::log_info(" Chart data needs refresh - starting enrichment").await; + + // === RECOVERY PHASE: Track enriched companies === + let enriched_companies = checkpoint_helpers::load_enrichment_progress(&log_path).await?; + + // Load all companies from input + logger::log_info("Loading companies from companies_yahoo_cleaned.jsonl...").await; + let companies = load_companies_from_jsonl(&input_path).await?; + let total_companies = companies.len(); + logger::log_info(&format!("Found {} companies to process", total_companies)).await; + + // Filter companies that need enrichment + let pending_companies: Vec = companies + .into_iter() + .filter(|company| !enriched_companies.contains(&company.name)) + .collect(); + + let pending_count = pending_companies.len(); + logger::log_info(&format!( + " {} already enriched, {} pending", + enriched_companies.len(), + pending_count + )).await; + + if pending_count == 0 { + logger::log_info(" ✓ All companies already enriched").await; + track_chart_completion(&manager, paths, step_name).await?; + return Ok(enriched_companies.len()); + } + + // === PROCESSING PHASE: Enrich companies with chart === + + // Create enrichment function + let enrichment_fn: EnrichmentFn = Arc::new(move |company, pool, paths| { + let company = company.clone(); + let pool = Arc::clone(&pool); + let paths = paths.clone(); + Box::pin(async move { + enrich_company_with_chart(&company, &pool, &paths).await + }) + }); + + // Shared counters + let processed_count = Arc::new(AtomicUsize::new(enriched_companies.len())); + let success_count = Arc::new(AtomicUsize::new(enriched_companies.len())); + let failed_count = Arc::new(AtomicUsize::new(0)); + + // Log writer channel with batching and fsync + let (log_tx, mut log_rx) = mpsc::channel::(1000); + + // Spawn log writer task + let log_writer_handle = spawn_log_writer( + log_path, + log_rx, + Arc::clone(&processed_count), + total_companies, + FSYNC_BATCH_SIZE, + FSYNC_INTERVAL_SECS, + ); + + // Process companies concurrently with task panic isolation + let mut tasks = FuturesUnordered::new(); + let mut pending_iter = pending_companies.into_iter(); + let semaphore = Arc::new(tokio::sync::Semaphore::new(CONCURRENCY_LIMIT)); + + // Initial batch of tasks + for _ in 0..CONCURRENCY_LIMIT.min(pending_count) { + if let Some(company) = pending_iter.next() { + let task = spawn_enrichment_task( + company, + Arc::clone(&yahoo_pool), + paths.clone(), + Arc::clone(&processed_count), + Arc::clone(&success_count), + Arc::clone(&failed_count), + log_tx.clone(), + Arc::clone(&semaphore), + Arc::clone(shutdown_flag), + Arc::clone(&enrichment_fn), + ); + tasks.push(task); + } + } + + // Process tasks as they complete and spawn new ones + let mut checkpoint_counter = 0; + while let Some(_result) = tasks.next().await { + // Check for shutdown + if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn("Shutdown signal received, stopping chart enrichment").await; + break; + } + + // Checkpoint periodically + checkpoint_counter += 1; + if checkpoint_counter % CHECKPOINT_INTERVAL == 0 { + let _ = log_tx.send(LogCommand::Checkpoint).await; + } + + // Spawn next task if available + if let Some(company) = pending_iter.next() { + let task = spawn_enrichment_task( + company, + Arc::clone(&yahoo_pool), + paths.clone(), + Arc::clone(&processed_count), + Arc::clone(&success_count), + Arc::clone(&failed_count), + log_tx.clone(), + Arc::clone(&semaphore), + Arc::clone(shutdown_flag), + Arc::clone(&enrichment_fn), + ); + tasks.push(task); + } + } + + // Signal log writer to shutdown + let _ = log_tx.send(LogCommand::Shutdown).await; + drop(log_tx); + + // Wait for log writer to finish + let _ = log_writer_handle.await; + + let final_processed = processed_count.load(Ordering::SeqCst); + let final_success = success_count.load(Ordering::SeqCst); + let final_failed = failed_count.load(Ordering::SeqCst); + + logger::log_info(&format!( + " Chart enrichment summary: {} total, {} success, {} failed", + final_processed, final_success, final_failed + )).await; + + // Mark as complete if all companies processed + if final_processed >= total_companies && !shutdown_flag.load(Ordering::SeqCst) { + track_chart_completion(&manager, paths, step_name).await?; + logger::log_info(" ✓ Chart enrichment marked as complete with integrity tracking").await; + } + + Ok(final_success) +} + +/// Track chart enrichment completion with content hash verification +async fn track_chart_completion( + manager: &StateManager, + paths: &DataPaths, + step_name: &str, +) -> anyhow::Result<()> { + // Create content reference for all chart data + // This will hash ALL files matching the pattern: {company}/chart/data.jsonl + let content_reference = directory_reference( + paths.corporate_dir(), + Some(vec![ + "*/chart/*.jsonl".to_string(), // Main pattern for chart data + "*/chart/data.jsonl".to_string(), // Specific pattern (more precise) + ]), + Some(vec![ + "*.log".to_string(), // Exclude log files + "*.tmp".to_string(), // Exclude temp files + "*.bak".to_string(), // Exclude backup files + ]), + ); + + // Track completion with: + // - Content reference: All chart directories + // - Data stage: Data (7-day TTL by default) + // - Dependencies: Depends on cleaned companies data + manager.update_entry( + step_name.to_string(), + content_reference, + DataStage::Data, + None, // Use default TTL (7 days for Data stage) + ).await?; + + Ok(()) +} + +/// Enrich a single company with chart data +async fn enrich_company_with_chart( + company: &CompanyCrossPlatformInfo, + yahoo_pool: &Arc, + paths: &DataPaths, +) -> anyhow::Result<()> { + let ticker = match extract_first_yahoo_ticker(company) { + Some(t) => t, + None => { + return Err(anyhow::anyhow!("No valid Yahoo ticker found")); + } + }; + + // Get 1 year of daily chart data + let now = chrono::Utc::now().timestamp(); + let start = chrono::Utc + .with_ymd_and_hms(2000, 1, 1, 0, 0, 0) + .unwrap() + .timestamp(); + + let chart_data = yahoo_pool.get_chart_data(&ticker, "1d", start, now).await?; + + // Only save if we got meaningful data + if chart_data.quotes.is_empty() { + return Err(anyhow::anyhow!("No chart data available")); + } + + // Save the chart data + save_company_data(paths, &company.name, &chart_data, "chart").await?; + + Ok(()) +} + +/// Save data to company directory (generic version) +async fn save_company_data( + paths: &DataPaths, + company_name: &str, + data: &T, + data_type: &str, +) -> anyhow::Result<()> { + use tokio::fs; + + let safe_name = sanitize_company_name(company_name); + + let company_dir = paths.corporate_dir().join(&safe_name).join(data_type); + fs::create_dir_all(&company_dir).await?; + + let data_path = company_dir.join("data.jsonl"); + let json_line = serde_json::to_string(data)?; + + let mut file = fs::File::create(&data_path).await?; + file.write_all(json_line.as_bytes()).await?; + file.write_all(b"\n").await?; + file.flush().await?; + file.sync_all().await?; // Ensure data is persisted + + Ok(()) +} + +// ============================================================================ +// GENERIC SHARED FUNCTIONS +// ============================================================================ + +/// Spawn log writer task (shared across all enrichment types) +fn spawn_log_writer( + log_path: std::path::PathBuf, + mut log_rx: mpsc::Receiver, + processed_count: Arc, + total_companies: usize, + fsync_batch_size: usize, + fsync_interval_secs: u64, +) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + let mut log_file = OpenOptions::new() + .create(true) + .append(true) + .open(&log_path) + .await + .expect("Failed to open log file"); + + let mut write_count = 0; + let mut last_fsync = tokio::time::Instant::now(); + + while let Some(cmd) = log_rx.recv().await { + match cmd { + LogCommand::Write(entry) => { + let json_line = serde_json::to_string(&entry).expect("Serialization failed"); + log_file.write_all(json_line.as_bytes()).await.expect("Write failed"); + log_file.write_all(b"\n").await.expect("Write failed"); + + write_count += 1; + + // Batched fsync + if write_count >= fsync_batch_size + || last_fsync.elapsed().as_secs() >= fsync_interval_secs + { + log_file.flush().await.expect("Flush failed"); + log_file.sync_all().await.expect("Fsync failed"); + write_count = 0; + last_fsync = tokio::time::Instant::now(); + } + } + LogCommand::Checkpoint => { + // Force fsync on checkpoint + log_file.flush().await.expect("Flush failed"); + log_file.sync_all().await.expect("Fsync failed"); + write_count = 0; + last_fsync = tokio::time::Instant::now(); + + let current = processed_count.load(Ordering::SeqCst); + logger::log_info(&format!( + " Checkpoint: {}/{} companies processed", + current, total_companies + )).await; + } + LogCommand::Shutdown => { + // Final fsync before shutdown + log_file.flush().await.expect("Flush failed"); + log_file.sync_all().await.expect("Fsync failed"); + break; + } + } + } + }) +} + +/// Spawn a single enrichment task with panic isolation (GENERIC VERSION) +/// +/// This generic version accepts an enrichment function as a parameter, +/// allowing it to be reused for events, options, charts, or any other enrichment type. +/// +/// # Parameters +/// - `company`: The company to enrich +/// - `yahoo_pool`: Yahoo API client pool +/// - `paths`: Data paths +/// - `processed_count`: Counter for processed companies +/// - `success_count`: Counter for successful enrichments +/// - `failed_count`: Counter for failed enrichments +/// - `log_tx`: Channel to send log commands +/// - `semaphore`: Semaphore for concurrency control +/// - `shutdown_flag`: Flag to signal shutdown +/// - `enrichment_fn`: The specific enrichment function to call (events, option, chart, etc.) +fn spawn_enrichment_task( + company: CompanyCrossPlatformInfo, + yahoo_pool: Arc, + paths: DataPaths, + processed_count: Arc, + success_count: Arc, + failed_count: Arc, + log_tx: mpsc::Sender, + semaphore: Arc, + shutdown_flag: Arc, + enrichment_fn: EnrichmentFn, +) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + // Acquire semaphore permit + let _permit = semaphore.acquire().await.expect("Semaphore closed"); + + // Check shutdown before processing + if shutdown_flag.load(Ordering::SeqCst) { + return; + } + + // Call the enrichment function (this is where the type-specific logic happens) + let result = enrichment_fn(company.clone(), Arc::clone(&yahoo_pool), paths).await; + + // Update counters + processed_count.fetch_add(1, Ordering::SeqCst); + + let status = match result { + Ok(_) => { + success_count.fetch_add(1, Ordering::SeqCst); + "enriched" + } + Err(e) => { + failed_count.fetch_add(1, Ordering::SeqCst); + logger::log_warn(&format!( + " Failed to enrich {}: {}", + company.name, e + )).await; + "failed" + } + }; + + // Log result + let log_entry = json!({ + "company_name": company.name, + "status": status, + "timestamp": Utc::now().to_rfc3339(), + }); + + let _ = log_tx.send(LogCommand::Write(log_entry)).await; + }) } \ No newline at end of file diff --git a/src/corporate/update_companies_enrich_option_chart.rs b/src/corporate/update_companies_enrich_option_chart.rs deleted file mode 100644 index 5a76bca..0000000 --- a/src/corporate/update_companies_enrich_option_chart.rs +++ /dev/null @@ -1,739 +0,0 @@ -// src/corporate/update_companies_enrich_option_chart.rs -use super::{types::*, helpers::*}; -use crate::config::Config; -use crate::corporate::checkpoint_helpers; -use crate::util::directories::DataPaths; -use crate::util::integrity::{DataStage, StateManager, directory_reference}; -use crate::util::logger; -use crate::scraper::yahoo::{YahooClientPool}; - -use std::result::Result::Ok; -use chrono::{TimeZone, Utc}; -use std::collections::{HashSet}; -use std::sync::Arc; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use tokio::fs::{OpenOptions}; -use tokio::io::{AsyncWriteExt}; -use futures::stream::{FuturesUnordered, StreamExt}; -use serde_json::json; -use tokio::sync::mpsc; - -/// Yahoo Option enrichment per corporate company -/// -/// # Features -/// - Graceful shutdown (abort-safe) -/// - Task panic isolation (tasks fail independently) -/// - Crash-safe persistence (checkpoint + log with fsync) -/// - Smart skip logic (only process incomplete data) -/// - Uses pending queue instead of retry mechanism -/// - Content integrity validation with hash tracking -/// -/// # Persistence Strategy -/// - Checkpoint: companies_yahoo_cleaned.jsonl (atomic state) -/// - Log: companies_option_updates.log (append-only updates) -/// - On restart: Load checkpoint + replay log -/// - Periodic checkpoints (every 50 companies) -/// - Batched fsync (every 10 writes or 10 seconds) -/// - Hash validation of all option data directories -pub async fn enrich_companies_with_option( - paths: &DataPaths, - _config: &Config, - yahoo_pool: Arc, - shutdown_flag: &Arc, -) -> anyhow::Result { - // Configuration constants - const CHECKPOINT_INTERVAL: usize = 50; - const FSYNC_BATCH_SIZE: usize = 10; - const FSYNC_INTERVAL_SECS: u64 = 10; - const CONCURRENCY_LIMIT: usize = 50; - - let data_path = paths.data_dir(); - - // File paths - let input_path = data_path.join("companies_yahoo_cleaned.jsonl"); - let log_path = data_path.join("companies_option_updates.log"); - let state_path = data_path.join("state.jsonl"); - - // Check input exists - if !input_path.exists() { - logger::log_warn(" companies_yahoo_cleaned.jsonl not found, skipping option enrichment").await; - return Ok(0); - } - - let manager = StateManager::new(&state_path, &data_path.to_path_buf())?; - let step_name = "yahoo_option_enrichment_complete"; - - if manager.is_step_valid(step_name).await? { - logger::log_info(" Yahoo option enrichment already completed and valid").await; - let count = checkpoint_helpers::count_enriched_companies(paths, "option").await?; - logger::log_info(&format!(" ✓ Found {} companies with valid option data", count)).await; - return Ok(count); - } - - logger::log_info(" Option data needs refresh - starting enrichment").await; - - // === RECOVERY PHASE: Track enriched companies === - let enriched_companies = checkpoint_helpers::load_enrichment_progress(&log_path).await?; - - // Load all companies from input - logger::log_info("Loading companies from companies_yahoo_cleaned.jsonl...").await; - let companies = load_companies_from_jsonl(&input_path).await?; - let total_companies = companies.len(); - logger::log_info(&format!("Found {} companies to process", total_companies)).await; - - // Filter companies that need enrichment - let pending_companies: Vec = companies - .into_iter() - .filter(|company| !enriched_companies.contains(&company.name)) - .collect(); - - let pending_count = pending_companies.len(); - logger::log_info(&format!( - " {} already enriched, {} pending", - enriched_companies.len(), - pending_count - )).await; - - if pending_count == 0 { - logger::log_info(" ✓ All companies already enriched").await; - track_option_completion(&manager, paths, step_name).await?; - return Ok(enriched_companies.len()); - } - - // === PROCESSING PHASE: Enrich companies with option === - - // Shared counters - let processed_count = Arc::new(AtomicUsize::new(enriched_companies.len())); - let success_count = Arc::new(AtomicUsize::new(enriched_companies.len())); - let failed_count = Arc::new(AtomicUsize::new(0)); - - // Log writer channel with batching and fsync - let (log_tx, mut log_rx) = mpsc::channel::(1000); - - // Spawn log writer task - let log_writer_handle = { - let log_path = log_path.clone(); - let processed_count = Arc::clone(&processed_count); - let total_companies = total_companies; - - tokio::spawn(async move { - let mut log_file = OpenOptions::new() - .create(true) - .append(true) - .open(&log_path) - .await - .expect("Failed to open log file"); - - let mut write_count = 0; - let mut last_fsync = tokio::time::Instant::now(); - - while let Some(cmd) = log_rx.recv().await { - match cmd { - LogCommand::Write(entry) => { - let json_line = serde_json::to_string(&entry).expect("Serialization failed"); - log_file.write_all(json_line.as_bytes()).await.expect("Write failed"); - log_file.write_all(b"\n").await.expect("Write failed"); - - write_count += 1; - - // Batched fsync - if write_count >= FSYNC_BATCH_SIZE - || last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS - { - log_file.flush().await.expect("Flush failed"); - log_file.sync_all().await.expect("Fsync failed"); - write_count = 0; - last_fsync = tokio::time::Instant::now(); - } - } - LogCommand::Checkpoint => { - // Force fsync on checkpoint - log_file.flush().await.expect("Flush failed"); - log_file.sync_all().await.expect("Fsync failed"); - write_count = 0; - last_fsync = tokio::time::Instant::now(); - - let current = processed_count.load(Ordering::SeqCst); - logger::log_info(&format!( - " Checkpoint: {}/{} companies processed", - current, total_companies - )).await; - } - LogCommand::Shutdown => { - // Final fsync before shutdown - log_file.flush().await.expect("Flush failed"); - log_file.sync_all().await.expect("Fsync failed"); - break; - } - } - } - }) - }; - - // Process companies concurrently with task panic isolation - let mut tasks = FuturesUnordered::new(); - let mut pending_iter = pending_companies.into_iter(); - let semaphore = Arc::new(tokio::sync::Semaphore::new(CONCURRENCY_LIMIT)); - - // Initial batch of tasks - for _ in 0..CONCURRENCY_LIMIT.min(pending_count) { - if let Some(company) = pending_iter.next() { - let task = spawn_enrichment_task( - company, - Arc::clone(&yahoo_pool), - paths.clone(), - Arc::clone(&processed_count), - Arc::clone(&success_count), - Arc::clone(&failed_count), - log_tx.clone(), - Arc::clone(&semaphore), - Arc::clone(shutdown_flag), - EnrichmentType::Option, - ); - tasks.push(task); - } - } - - // Process tasks as they complete and spawn new ones - let mut checkpoint_counter = 0; - while let Some(_result) = tasks.next().await { - // Check for shutdown - if shutdown_flag.load(Ordering::SeqCst) { - logger::log_warn("Shutdown signal received, stopping option enrichment").await; - break; - } - - // Checkpoint periodically - checkpoint_counter += 1; - if checkpoint_counter % CHECKPOINT_INTERVAL == 0 { - let _ = log_tx.send(LogCommand::Checkpoint).await; - } - - // Spawn next task if available - if let Some(company) = pending_iter.next() { - let task = spawn_enrichment_task( - company, - Arc::clone(&yahoo_pool), - paths.clone(), - Arc::clone(&processed_count), - Arc::clone(&success_count), - Arc::clone(&failed_count), - log_tx.clone(), - Arc::clone(&semaphore), - Arc::clone(shutdown_flag), - EnrichmentType::Option, - ); - tasks.push(task); - } - } - - // Final checkpoint and shutdown - let _ = log_tx.send(LogCommand::Checkpoint).await; - let _ = log_tx.send(LogCommand::Shutdown).await; - - // Wait for log writer to finish - let _ = log_writer_handle.await; - - let final_success = success_count.load(Ordering::SeqCst); - let final_failed = failed_count.load(Ordering::SeqCst); - - logger::log_info(&format!( - " Option enrichment: {} succeeded, {} failed", - final_success, final_failed - )).await; - - // Mark as complete if no shutdown - if !shutdown_flag.load(Ordering::SeqCst) { - track_option_completion(&manager, paths, step_name).await?; - logger::log_info(" ✓ Option enrichment marked as complete with integrity tracking").await; - } - - Ok(final_success) -} - -/// Yahoo Chart enrichment per corporate company -/// -/// # Features -/// - Graceful shutdown (abort-safe) -/// - Task panic isolation (tasks fail independently) -/// - Crash-safe persistence (checkpoint + log with fsync) -/// - Smart skip logic (only process incomplete data) -/// - Uses pending queue instead of retry mechanism -/// -/// # Persistence Strategy -/// - Checkpoint: companies_yahoo_cleaned.jsonl (atomic state) -/// - Log: companies_chart_updates.log (append-only updates) -/// - On restart: Load checkpoint + replay log -/// - Periodic checkpoints (every 50 companies) -/// - Batched fsync (every 10 writes or 10 seconds) -pub async fn enrich_companies_with_chart( - paths: &DataPaths, - _config: &Config, - yahoo_pool: Arc, - shutdown_flag: &Arc, -) -> anyhow::Result { - // Configuration constants - const CHECKPOINT_INTERVAL: usize = 50; - const FSYNC_BATCH_SIZE: usize = 10; - const FSYNC_INTERVAL_SECS: u64 = 10; - const CONCURRENCY_LIMIT: usize = 50; - - let data_path = paths.data_dir(); - - // File paths - let input_path = data_path.join("companies_yahoo_cleaned.jsonl"); - let log_path = data_path.join("companies_chart_updates.log"); - let state_path = data_path.join("state.jsonl"); - - // Check input exists - if !input_path.exists() { - logger::log_warn(" companies_yahoo_cleaned.jsonl not found, skipping chart enrichment").await; - return Ok(0); - } - - let manager = StateManager::new(&state_path, &data_path.to_path_buf())?; - let step_name = "yahoo_chart_enrichment_complete"; - - if manager.is_step_valid(step_name).await? { - logger::log_info(" Yahoo chart enrichment already completed and valid").await; - let count = checkpoint_helpers::count_enriched_companies(paths, "chart").await?; - logger::log_info(&format!(" ✓ Found {} companies with valid chart data", count)).await; - return Ok(count); - } - - logger::log_info(" Chart data needs refresh - starting enrichment").await; - - // === RECOVERY PHASE: Track enriched companies === - let mut enriched_companies: HashSet = HashSet::new(); - - if log_path.exists() { - logger::log_info("Loading chart enrichment progress from log...").await; - let log_content = tokio::fs::read_to_string(&log_path).await?; - - for line in log_content.lines() { - if line.trim().is_empty() || !line.ends_with('}') { - continue; // Skip incomplete lines - } - - match serde_json::from_str::(line) { - Ok(entry) => { - if let Some(name) = entry.get("company_name").and_then(|v| v.as_str()) { - if entry.get("status").and_then(|v| v.as_str()) == Some("enriched") { - enriched_companies.insert(name.to_string()); - } - } - } - Err(e) => { - logger::log_warn(&format!("Skipping invalid log line: {}", e)).await; - } - } - } - logger::log_info(&format!("Loaded {} enriched companies from log", enriched_companies.len())).await; - } - - // Load all companies from input - logger::log_info("Loading companies from companies_yahoo_cleaned.jsonl...").await; - let companies = load_companies_from_jsonl(&input_path).await?; - let total_companies = companies.len(); - logger::log_info(&format!("Found {} companies to process", total_companies)).await; - - // Filter companies that need enrichment - let pending_companies: Vec = companies - .into_iter() - .filter(|company| !enriched_companies.contains(&company.name)) - .collect(); - - let pending_count = pending_companies.len(); - logger::log_info(&format!( - " {} already enriched, {} pending", - enriched_companies.len(), - pending_count - )).await; - - if pending_count == 0 { - logger::log_info(" ✓ All companies already enriched").await; - track_chart_completion(&manager, paths, step_name).await?; - return Ok(enriched_companies.len()); - } - - // === PROCESSING PHASE: Enrich companies with chart === - - // Shared counters - let processed_count = Arc::new(AtomicUsize::new(enriched_companies.len())); - let success_count = Arc::new(AtomicUsize::new(enriched_companies.len())); - let failed_count = Arc::new(AtomicUsize::new(0)); - - // Log writer channel with batching and fsync - let (log_tx, mut log_rx) = mpsc::channel::(1000); - - // Spawn log writer task - let log_writer_handle = { - let log_path = log_path.clone(); - let processed_count = Arc::clone(&processed_count); - let total_companies = total_companies; - - tokio::spawn(async move { - let mut log_file = OpenOptions::new() - .create(true) - .append(true) - .open(&log_path) - .await - .expect("Failed to open log file"); - - let mut write_count = 0; - let mut last_fsync = tokio::time::Instant::now(); - - while let Some(cmd) = log_rx.recv().await { - match cmd { - LogCommand::Write(entry) => { - let json_line = serde_json::to_string(&entry).expect("Serialization failed"); - log_file.write_all(json_line.as_bytes()).await.expect("Write failed"); - log_file.write_all(b"\n").await.expect("Write failed"); - - write_count += 1; - - // Batched fsync - if write_count >= FSYNC_BATCH_SIZE - || last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS - { - log_file.flush().await.expect("Flush failed"); - log_file.sync_all().await.expect("Fsync failed"); - write_count = 0; - last_fsync = tokio::time::Instant::now(); - } - } - LogCommand::Checkpoint => { - // Force fsync on checkpoint - log_file.flush().await.expect("Flush failed"); - log_file.sync_all().await.expect("Fsync failed"); - write_count = 0; - last_fsync = tokio::time::Instant::now(); - - let current = processed_count.load(Ordering::SeqCst); - logger::log_info(&format!( - " Checkpoint: {}/{} companies processed", - current, total_companies - )).await; - } - LogCommand::Shutdown => { - // Final fsync before shutdown - log_file.flush().await.expect("Flush failed"); - log_file.sync_all().await.expect("Fsync failed"); - break; - } - } - } - }) - }; - - // Process companies concurrently with task panic isolation - let mut tasks = FuturesUnordered::new(); - let mut pending_iter = pending_companies.into_iter(); - let semaphore = Arc::new(tokio::sync::Semaphore::new(CONCURRENCY_LIMIT)); - - // Initial batch of tasks - for _ in 0..CONCURRENCY_LIMIT.min(pending_count) { - if let Some(company) = pending_iter.next() { - let task = spawn_enrichment_task( - company, - Arc::clone(&yahoo_pool), - paths.clone(), - Arc::clone(&processed_count), - Arc::clone(&success_count), - Arc::clone(&failed_count), - log_tx.clone(), - Arc::clone(&semaphore), - Arc::clone(shutdown_flag), - EnrichmentType::Chart, - ); - tasks.push(task); - } - } - - // Process tasks as they complete and spawn new ones - let mut checkpoint_counter = 0; - while let Some(_result) = tasks.next().await { - // Check for shutdown - if shutdown_flag.load(Ordering::SeqCst) { - logger::log_warn("Shutdown signal received, stopping chart enrichment").await; - break; - } - - // Checkpoint periodically - checkpoint_counter += 1; - if checkpoint_counter % CHECKPOINT_INTERVAL == 0 { - let _ = log_tx.send(LogCommand::Checkpoint).await; - } - - // Spawn next task if available - if let Some(company) = pending_iter.next() { - let task = spawn_enrichment_task( - company, - Arc::clone(&yahoo_pool), - paths.clone(), - Arc::clone(&processed_count), - Arc::clone(&success_count), - Arc::clone(&failed_count), - log_tx.clone(), - Arc::clone(&semaphore), - Arc::clone(shutdown_flag), - EnrichmentType::Chart, - ); - tasks.push(task); - } - } - - // Final checkpoint and shutdown - let _ = log_tx.send(LogCommand::Checkpoint).await; - let _ = log_tx.send(LogCommand::Shutdown).await; - - // Wait for log writer to finish - let _ = log_writer_handle.await; - - let final_success = success_count.load(Ordering::SeqCst); - let final_failed = failed_count.load(Ordering::SeqCst); - - logger::log_info(&format!( - " Chart enrichment: {} succeeded, {} failed", - final_success, final_failed - )).await; - - // Mark as complete if no shutdown - if !shutdown_flag.load(Ordering::SeqCst) { - track_chart_completion(&manager, paths, step_name).await?; - logger::log_info(" ✓ Chart enrichment marked as complete with integrity tracking").await; - } - - Ok(final_success) -} - -/// Type of enrichment being performed -#[derive(Clone, Copy)] -enum EnrichmentType { - Option, - Chart, -} - -/// Spawn an enrichment task with panic isolation -fn spawn_enrichment_task( - company: CompanyCrossPlatformInfo, - yahoo_pool: Arc, - paths: DataPaths, - processed_count: Arc, - success_count: Arc, - failed_count: Arc, - log_tx: mpsc::Sender, - semaphore: Arc, - shutdown_flag: Arc, - enrichment_type: EnrichmentType, -) -> tokio::task::JoinHandle<()> { - tokio::spawn(async move { - // Acquire semaphore permit - let _permit = semaphore.acquire().await.expect("Semaphore closed"); - - // Check shutdown before processing - if shutdown_flag.load(Ordering::SeqCst) { - return; - } - - // Perform enrichment (panic-isolated) - let result = match enrichment_type { - EnrichmentType::Option => { - enrich_company_with_option(&company, &yahoo_pool, &paths).await - } - EnrichmentType::Chart => { - enrich_company_with_chart(&company, &yahoo_pool, &paths).await - } - }; - - // Update counters - processed_count.fetch_add(1, Ordering::SeqCst); - - let status = match result { - Ok(_) => { - success_count.fetch_add(1, Ordering::SeqCst); - "enriched" - } - Err(e) => { - failed_count.fetch_add(1, Ordering::SeqCst); - logger::log_warn(&format!( - " Failed to enrich {}: {}", - company.name, e - )).await; - "failed" - } - }; - - // Log result - let log_entry = json!({ - "company_name": company.name, - "status": status, - "timestamp": Utc::now().to_rfc3339(), - }); - - let _ = log_tx.send(LogCommand::Write(log_entry)).await; - }) -} - -/// Enrich a single company with option data -async fn enrich_company_with_option( - company: &CompanyCrossPlatformInfo, - yahoo_pool: &Arc, - paths: &DataPaths, -) -> anyhow::Result<()> { - let ticker = match extract_first_yahoo_ticker(company) { - Some(t) => t, - None => { - return Err(anyhow::anyhow!("No valid Yahoo ticker found")); - } - }; - - // Get option data for all available expiration dates - let option_data = yahoo_pool.get_option_data(&ticker, None).await?; - - // Only save if we got meaningful data - if option_data.option.is_empty() { - return Err(anyhow::anyhow!("No option data available")); - } - - // Save the option data - save_company_data(paths, &company.name, &option_data, "option").await?; - - Ok(()) -} - -/// Enrich a single company with chart data -async fn enrich_company_with_chart( - company: &CompanyCrossPlatformInfo, - yahoo_pool: &Arc, - paths: &DataPaths, -) -> anyhow::Result<()> { - let ticker = match extract_first_yahoo_ticker(company) { - Some(t) => t, - None => { - return Err(anyhow::anyhow!("No valid Yahoo ticker found")); - } - }; - - // Get 1 year of daily chart data - let now = chrono::Utc::now().timestamp(); - let start = chrono::Utc - .with_ymd_and_hms(2000, 1, 1, 0, 0, 0) - .unwrap() - .timestamp(); - - let chart_data = yahoo_pool.get_chart_data(&ticker, "1d", start, now).await?; - - // Only save if we got meaningful data - if chart_data.quotes.is_empty() { - return Err(anyhow::anyhow!("No chart data available")); - } - - // Save the chart data - save_company_data(paths, &company.name, &chart_data, "chart").await?; - - Ok(()) -} - -/// Save data to company directory -async fn save_company_data( - paths: &DataPaths, - company_name: &str, - data: &T, - data_type: &str, -) -> anyhow::Result<()> { - use tokio::fs; - - let safe_name = sanitize_company_name(company_name); - - let company_dir = paths.corporate_dir().join(&safe_name).join(data_type); - fs::create_dir_all(&company_dir).await?; - - let data_path = company_dir.join("data.jsonl"); - let json_line = serde_json::to_string(data)?; - - let mut file = fs::File::create(&data_path).await?; - file.write_all(json_line.as_bytes()).await?; - file.write_all(b"\n").await?; - file.flush().await?; - file.sync_all().await?; // Ensure data is persisted - - Ok(()) -} - -/// Log command enum -enum LogCommand { - Write(serde_json::Value), - Checkpoint, - Shutdown, -} - -/// Track option enrichment completion with content hash verification -async fn track_option_completion( - manager: &StateManager, - paths: &DataPaths, - step_name: &str, -) -> anyhow::Result<()> { - // Create content reference for all option data - // This will hash ALL files matching the pattern: {company}/option/data.jsonl - let content_reference = directory_reference( - paths.corporate_dir(), - Some(vec![ - "*/option/*.jsonl".to_string(), // Main pattern for option data - "*/option/data.jsonl".to_string(), // Specific pattern (more precise) - ]), - Some(vec![ - "*.log".to_string(), // Exclude log files - "*.tmp".to_string(), // Exclude temp files - "*.bak".to_string(), // Exclude backup files - ]), - ); - - // Track completion with: - // - Content reference: All option directories - // - Data stage: Data (7-day TTL by default) - // - Dependencies: Depends on cleaned companies data - manager.update_entry( - step_name.to_string(), - content_reference, - DataStage::Data, - None, // Use default TTL (7 days for Data stage) - ).await?; - - Ok(()) -} - -/// Track chart enrichment completion with content hash verification -async fn track_chart_completion( - manager: &StateManager, - paths: &DataPaths, - step_name: &str, -) -> anyhow::Result<()> { - // Create content reference for all chart data - // This will hash ALL files matching the pattern: {company}/chart/data.jsonl - let content_reference = directory_reference( - paths.corporate_dir(), - Some(vec![ - "*/chart/*.jsonl".to_string(), // Main pattern for chart data - "*/chart/data.jsonl".to_string(), // Specific pattern (more precise) - ]), - Some(vec![ - "*.log".to_string(), // Exclude log files - "*.tmp".to_string(), // Exclude temp files - "*.bak".to_string(), // Exclude backup files - ]), - ); - - // Track completion with: - // - Content reference: All chart directories - // - Data stage: Data (7-day TTL by default) - // - Dependencies: Depends on cleaned companies data - manager.update_entry( - step_name.to_string(), - content_reference, - DataStage::Data, - None, // Use default TTL (7 days for Data stage) - ).await?; - - Ok(()) -} \ No newline at end of file diff --git a/src/economic/scraper.rs b/src/economic/scraper.rs index 7feaebf..b6e5c15 100644 --- a/src/economic/scraper.rs +++ b/src/economic/scraper.rs @@ -8,7 +8,30 @@ const EXTRACTION_JS: &str = include_str!("extraction_script.js"); pub async fn goto_and_prepare(client: &Client) -> anyhow::Result<()> { client.goto("https://www.finanzen.net/termine/wirtschaftsdaten/").await?; + dismiss_overlays(client).await?; + Ok(()) +} +pub async fn dismiss_overlays(client: &Client) -> anyhow::Result<()> { + for _ in 0..10 { + let removed: bool = client + .execute( + r#"(() => { + const iframe = document.querySelector('iframe[title="Contentpass First Layer"]'); + if (iframe && iframe.parentNode) { + iframe.parentNode.removeChild(iframe); + return true; + } + return false; + })()"#, + vec![], + ) + .await? + .as_bool() + .unwrap_or(false); + if removed { break; } + sleep(Duration::from_millis(500)).await; + } Ok(()) } diff --git a/src/main.rs b/src/main.rs index a53c93f..b3dc936 100644 --- a/src/main.rs +++ b/src/main.rs @@ -252,12 +252,28 @@ async fn create_state_files(paths: &DataPaths) -> Result<()> { Ok(()) } -fn visualize_checkpoint_dependencies(paths: &DataPaths) -> Result<()> { - let manager = StateManager::new(&paths.data_dir().join("state.jsonl"), &paths.data_dir().to_path_buf())?; - manager.print_dependency_graph(); - let dot = manager.get_dependency_config().to_dot(); - std::fs::write(paths.logs_dir().join("checkpoint_dependencies.dot"), dot)?; - Ok(()) +async fn visualize_checkpoint_dependencies(paths: &DataPaths) -> Result<()> { + // Add more detailed error handling + match StateManager::new( + &paths.data_dir().join("state.jsonl"), + &paths.data_dir().to_path_buf() + ) { + Ok(manager) => { + logger::log_info("✓ Dependency configuration loaded successfully").await; + manager.print_dependency_graph(); + + let dot = manager.get_dependency_config().to_dot(); + let dot_path = paths.logs_dir().join("checkpoint_dependencies.dot"); + std::fs::write(&dot_path, dot)?; + + logger::log_info(&format!("✓ DOT file written to: {}", dot_path.display())).await; + Ok(()) + } + Err(e) => { + logger::log_error(&format!("✗ Failed to load dependency config: {}", e)).await; + Err(e) + } + } } // ============================================================================ @@ -273,7 +289,7 @@ async fn main() -> Result<()> { start_docker_desktop().await; cleanup_all_proxy_containers().await.ok(); create_state_files(&paths).await.ok(); - visualize_checkpoint_dependencies(&paths).ok(); + visualize_checkpoint_dependencies(&paths).await.ok(); let config = Config::load().unwrap_or_else(|_| { eprintln!("Using default configuration");