diff --git a/src/corporate/atomic_writer.rs b/src/corporate/atomic_writer.rs deleted file mode 100644 index a6fe657..0000000 --- a/src/corporate/atomic_writer.rs +++ /dev/null @@ -1,346 +0,0 @@ -// src/corporate/atomic_writer.rs -// -// Atomic JSONL writer that prevents partial/corrupted results from being written - -use anyhow::Result; -use serde::Serialize; -use std::collections::HashMap; -use std::path::PathBuf; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; -use tokio::fs::{File, OpenOptions}; -use tokio::io::AsyncWriteExt; -use tokio::sync::mpsc; - -/// Command to write or validate data -#[derive(Debug)] -pub enum WriteCommand { - /// Stage a result for writing (held in memory until committed) - Stage { id: String, data: T }, - - /// Commit staged result to disk (atomic write) - Commit { id: String }, - - /// Rollback staged result (discard without writing) - Rollback { id: String }, - - /// Commit all pending staged results and flush - CommitAll, - - /// Shutdown writer gracefully (only commits valid staged results) - Shutdown, -} - -/// Result of a write operation -#[derive(Debug)] -pub struct WriteResult { - pub id: String, - pub success: bool, - pub error: Option, -} - -/// Atomic writer that prevents partial results from being written -pub struct AtomicJsonlWriter { - file: File, - staged: HashMap, - committed_count: usize, - rollback_count: usize, -} - -impl AtomicJsonlWriter { - pub async fn new(path: PathBuf) -> Result { - // Ensure parent directory exists - if let Some(parent) = path.parent() { - tokio::fs::create_dir_all(parent).await?; - } - - let file = OpenOptions::new() - .create(true) - .append(true) - .open(&path) - .await?; - - crate::util::logger::log_info(&format!( - "Atomic writer initialized: {:?}", - path - )).await; - - Ok(Self { - file, - staged: HashMap::new(), - committed_count: 0, - rollback_count: 0, - }) - } - - /// Stage data for writing (held in memory, not yet written) - pub async fn stage(&mut self, id: String, data: T) { - crate::util::logger::log_info(&format!( - "Staging result for: {} (total staged: {})", - id, - self.staged.len() + 1 - )).await; - - self.staged.insert(id, data); - } - - /// Commit a staged result to disk (atomic write) - pub async fn commit(&mut self, id: &str) -> Result<()> { - if let Some(data) = self.staged.remove(id) { - // Serialize to JSON - let json_line = serde_json::to_string(&data)?; - - // Write atomically (single syscall) - self.file.write_all(json_line.as_bytes()).await?; - self.file.write_all(b"\n").await?; - self.file.flush().await?; - - self.committed_count += 1; - - crate::util::logger::log_info(&format!( - "✓ Committed result for: {} (total committed: {})", - id, self.committed_count - )).await; - - Ok(()) - } else { - Err(anyhow::anyhow!("No staged result found for id: {}", id)) - } - } - - /// Rollback a staged result (discard without writing) - pub async fn rollback(&mut self, id: &str) { - if self.staged.remove(id).is_some() { - self.rollback_count += 1; - - crate::util::logger::log_warn(&format!( - "⚠ Rolled back result for: {} (total rollbacks: {})", - id, self.rollback_count - )).await; - } - } - - /// Commit all staged results - pub async fn commit_all(&mut self) -> Result { - let ids: Vec = self.staged.keys().cloned().collect(); - let mut committed = 0; - - for id in ids { - if let Ok(()) = self.commit(&id).await { - committed += 1; - } - } - - Ok(committed) - } - - /// Rollback all staged results (discard everything) - pub async fn rollback_all(&mut self) -> usize { - let count = 
self.staged.len(); - self.staged.clear(); - self.rollback_count += count; - - crate::util::logger::log_warn(&format!( - "⚠ Rolled back all {} staged results", - count - )).await; - - count - } - - /// Get statistics - pub fn stats(&self) -> WriterStats { - WriterStats { - staged_count: self.staged.len(), - committed_count: self.committed_count, - rollback_count: self.rollback_count, - } - } -} - -#[derive(Debug, Clone)] -pub struct WriterStats { - pub staged_count: usize, - pub committed_count: usize, - pub rollback_count: usize, -} - -/// Managed writer service that runs in its own task -pub struct AtomicWriterService { - rx: mpsc::UnboundedReceiver>, - writer: AtomicJsonlWriter, - shutdown_flag: Arc, -} - -impl AtomicWriterService { - pub async fn new( - path: PathBuf, - rx: mpsc::UnboundedReceiver>, - shutdown_flag: Arc, - ) -> Result { - let writer = AtomicJsonlWriter::new(path).await?; - - Ok(Self { - rx, - writer, - shutdown_flag, - }) - } - - /// Main service loop - pub async fn run(mut self) { - crate::util::logger::log_info("Atomic writer service started").await; - - while let Some(cmd) = self.rx.recv().await { - // Check for shutdown flag - if self.shutdown_flag.load(Ordering::SeqCst) { - crate::util::logger::log_warn( - "Shutdown detected - processing only Commit/Rollback commands" - ).await; - - // Only process commit/rollback commands during shutdown - match cmd { - WriteCommand::Commit { id } => { - if let Err(e) = self.writer.commit(&id).await { - crate::util::logger::log_error(&format!( - "Failed to commit {}: {}", - id, e - )).await; - } - } - WriteCommand::Rollback { id } => { - self.writer.rollback(&id).await; - } - WriteCommand::CommitAll => { - match self.writer.commit_all().await { - Ok(count) => { - crate::util::logger::log_info(&format!( - "Committed {} results during shutdown", - count - )).await; - } - Err(e) => { - crate::util::logger::log_error(&format!( - "Failed to commit all: {}", - e - )).await; - } - } - } - WriteCommand::Shutdown => break, - _ => { - // Ignore Stage commands during shutdown - crate::util::logger::log_warn( - "Ignoring new Stage command during shutdown" - ).await; - } - } - continue; - } - - // Normal operation - match cmd { - WriteCommand::Stage { id, data } => { - self.writer.stage(id, data).await; - } - WriteCommand::Commit { id } => { - if let Err(e) = self.writer.commit(&id).await { - crate::util::logger::log_error(&format!( - "Failed to commit {}: {}", - id, e - )).await; - } - } - WriteCommand::Rollback { id } => { - self.writer.rollback(&id).await; - } - WriteCommand::CommitAll => { - match self.writer.commit_all().await { - Ok(count) => { - crate::util::logger::log_info(&format!( - "Committed all {} staged results", - count - )).await; - } - Err(e) => { - crate::util::logger::log_error(&format!( - "Failed to commit all: {}", - e - )).await; - } - } - } - WriteCommand::Shutdown => break, - } - } - - // Final shutdown - rollback any remaining staged items - let stats = self.writer.stats(); - if stats.staged_count > 0 { - crate::util::logger::log_warn(&format!( - "⚠ Shutdown with {} uncommitted results - rolling back", - stats.staged_count - )).await; - - self.writer.rollback_all().await; - } - - crate::util::logger::log_info(&format!( - "Atomic writer service stopped. 
Final stats: {} committed, {} rolled back",
-            stats.committed_count,
-            stats.rollback_count
-        )).await;
-    }
-}
-
-/// Handle for sending write commands
-#[derive(Clone)]
-pub struct AtomicWriterHandle<T> {
-    tx: mpsc::UnboundedSender<WriteCommand<T>>,
-}
-
-impl<T> AtomicWriterHandle<T> {
-    pub fn new(tx: mpsc::UnboundedSender<WriteCommand<T>>) -> Self {
-        Self { tx }
-    }
-
-    /// Stage data for writing (does not write immediately)
-    pub fn stage(&self, id: String, data: T) {
-        let _ = self.tx.send(WriteCommand::Stage { id, data });
-    }
-
-    /// Commit staged data to disk
-    pub fn commit(&self, id: String) {
-        let _ = self.tx.send(WriteCommand::Commit { id });
-    }
-
-    /// Rollback staged data (discard)
-    pub fn rollback(&self, id: String) {
-        let _ = self.tx.send(WriteCommand::Rollback { id });
-    }
-
-    /// Commit all staged data
-    pub fn commit_all(&self) {
-        let _ = self.tx.send(WriteCommand::CommitAll);
-    }
-
-    /// Shutdown writer gracefully
-    pub fn shutdown(&self) {
-        let _ = self.tx.send(WriteCommand::Shutdown);
-    }
-}
-
-/// Create atomic writer service
-pub async fn create_atomic_writer<T: Serialize + Send + 'static>(
-    path: PathBuf,
-    shutdown_flag: Arc<AtomicBool>,
-) -> Result<(AtomicWriterHandle<T>, tokio::task::JoinHandle<()>)> {
-    let (tx, rx) = mpsc::unbounded_channel();
-
-    let service = AtomicWriterService::new(path, rx, shutdown_flag).await?;
-    let handle = tokio::spawn(async move {
-        service.run().await;
-    });
-
-    Ok((AtomicWriterHandle::new(tx), handle))
-}
\ No newline at end of file
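The staged-write handle retired by this deletion was driven roughly as follows (an illustrative reconstruction from the removed code; `ScrapeResult` is a placeholder for any `Serialize` payload type):

    // Sketch only: calling pattern of the removed staged-write API.
    let (handle, writer_task) =
        create_atomic_writer::<ScrapeResult>(out_path, shutdown_flag.clone()).await?;

    handle.stage("ACME Corp".to_string(), result); // held in memory only
    handle.commit("ACME Corp".to_string());        // appended to the JSONL file
    handle.shutdown();                             // uncommitted stages are rolled back
    writer_task.await?;

The checkpoint-plus-log module introduced below provides the same abort safety without buffering results in a dedicated writer task.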
diff --git a/src/corporate/checkpoint_helpers.rs b/src/corporate/checkpoint_helpers.rs
new file mode 100644
index 0000000..9ce2eed
--- /dev/null
+++ b/src/corporate/checkpoint_helpers.rs
@@ -0,0 +1,249 @@
+// src/corporate/checkpoint_helpers.rs
+//! Shared helpers for checkpoint-based recovery and logging
+//!
+//! This module extracts common patterns used across multiple update modules
+//! to reduce code duplication and improve maintainability.
+
+use super::types::CompanyCrossPlatformInfo;
+use crate::util::logger;
+use anyhow::Result;
+use std::collections::HashMap;
+use std::path::Path;
+use tokio::fs::{File, OpenOptions};
+use tokio::io::AsyncWriteExt;
+
+/// Load companies from checkpoint and replay log for recovery
+///
+/// This function implements the checkpoint + write-ahead log pattern:
+/// 1. Loads the main checkpoint file
+/// 2. Replays any pending updates from the log file
+/// 3. Returns the merged state
+pub async fn load_checkpoint_with_log<P1, P2>(
+    checkpoint_path: P1,
+    log_path: P2,
+    checkpoint_desc: &str,
+) -> Result<HashMap<String, CompanyCrossPlatformInfo>>
+where
+    P1: AsRef<Path>,
+    P2: AsRef<Path>,
+{
+    let checkpoint_path = checkpoint_path.as_ref();
+    let log_path = log_path.as_ref();
+
+    let mut companies: HashMap<String, CompanyCrossPlatformInfo> = HashMap::new();
+
+    // Load checkpoint if it exists
+    if checkpoint_path.exists() {
+        logger::log_info(&format!("Loading checkpoint from {}...", checkpoint_desc)).await;
+        let content = tokio::fs::read_to_string(checkpoint_path).await?;
+
+        for line in content.lines() {
+            if line.trim().is_empty() || !line.ends_with('}') {
+                continue; // Skip incomplete lines
+            }
+
+            match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
+                Ok(company) => {
+                    companies.insert(company.name.clone(), company);
+                }
+                Err(e) => {
+                    logger::log_warn(&format!("Skipping invalid checkpoint line: {}", e)).await;
+                }
+            }
+        }
+        logger::log_info(&format!("Loaded checkpoint with {} companies", companies.len())).await;
+    }
+
+    // Replay log if it exists
+    if log_path.exists() {
+        logger::log_info("Replaying update log...").await;
+        let log_content = tokio::fs::read_to_string(log_path).await?;
+        let mut replayed = 0;
+
+        for line in log_content.lines() {
+            if line.trim().is_empty() || !line.ends_with('}') {
+                continue; // Skip incomplete lines
+            }
+
+            match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
+                Ok(company) => {
+                    companies.insert(company.name.clone(), company);
+                    replayed += 1;
+                }
+                Err(e) => {
+                    logger::log_warn(&format!("Skipping invalid log line: {}", e)).await;
+                }
+            }
+        }
+
+        if replayed > 0 {
+            logger::log_info(&format!("Replayed {} updates from log", replayed)).await;
+        }
+    }
+
+    Ok(companies)
+}
+
+/// Consolidate log into checkpoint and clear log
+///
+/// Atomically writes all companies to a new checkpoint file and removes the log.
+/// Uses atomic rename to ensure crash safety.
+pub async fn consolidate_checkpoint<P1, P2>(
+    checkpoint_path: P1,
+    log_path: P2,
+    companies: &HashMap<String, CompanyCrossPlatformInfo>,
+) -> Result<()>
+where
+    P1: AsRef<Path>,
+    P2: AsRef<Path>,
+{
+    let checkpoint_path = checkpoint_path.as_ref();
+    let log_path = log_path.as_ref();
+
+    logger::log_info("Consolidating update log into checkpoint...").await;
+
+    let temp_checkpoint = checkpoint_path.with_extension("tmp");
+    let mut temp_file = File::create(&temp_checkpoint).await?;
+
+    for company in companies.values() {
+        let json_line = serde_json::to_string(company)?;
+        temp_file.write_all(json_line.as_bytes()).await?;
+        temp_file.write_all(b"\n").await?;
+    }
+
+    temp_file.flush().await?;
+    temp_file.sync_data().await?;
+    drop(temp_file);
+
+    tokio::fs::rename(&temp_checkpoint, checkpoint_path).await?;
+
+    // Remove log after successful consolidation
+    if log_path.exists() {
+        tokio::fs::remove_file(log_path).await.ok();
+    }
+
+    logger::log_info(&format!("✓ Consolidated {} companies", companies.len())).await;
+
+    Ok(())
+}
+
+/// Check if log file has content
+pub async fn log_has_content<P: AsRef<Path>>(log_path: P) -> bool {
+    if let Ok(metadata) = tokio::fs::metadata(log_path.as_ref()).await {
+        metadata.len() > 0
+    } else {
+        false
+    }
+}
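+
+// Example (illustrative sketch, not exercised by this module): a typical
+// recovery cycle built from the three helpers above. The paths are
+// assumptions for the sake of the example.
+//
+//     let checkpoint = paths.corporate_dir().join("companies.jsonl");
+//     let log = paths.corporate_dir().join("companies_update.log");
+//
+//     // 1. Recover: load the checkpoint, then replay the write-ahead log.
+//     let companies = load_checkpoint_with_log(&checkpoint, &log, "companies.jsonl").await?;
+//
+//     // 2. ... append new results to `log` as they are produced ...
+//
+//     // 3. Fold the log back into the checkpoint (temp file + atomic rename).
+//     if log_has_content(&log).await {
+//         consolidate_checkpoint(&checkpoint, &log, &companies).await?;
+//     }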
+
+/// Load enrichment progress from log file
+///
+/// Used by enrichment functions to track which companies have already been processed.
+/// Parses log entries with format: {"company_name": "...", "status": "enriched", ...}
+pub async fn load_enrichment_progress<P>(
+    log_path: P,
+) -> Result<std::collections::HashSet<String>>
+where
+    P: AsRef<Path>,
+{
+    let mut enriched_companies = std::collections::HashSet::new();
+
+    if !log_path.as_ref().exists() {
+        return Ok(enriched_companies);
+    }
+
+    logger::log_info("Loading enrichment progress from log...").await;
+    let log_content = tokio::fs::read_to_string(log_path.as_ref()).await?;
+
+    for line in log_content.lines() {
+        if line.trim().is_empty() || !line.ends_with('}') {
+            continue; // Skip incomplete lines
+        }
+
+        match serde_json::from_str::<serde_json::Value>(line) {
+            Ok(entry) => {
+                if let Some(name) = entry.get("company_name").and_then(|v| v.as_str()) {
+                    if entry.get("status").and_then(|v| v.as_str()) == Some("enriched") {
+                        enriched_companies.insert(name.to_string());
+                    }
+                }
+            }
+            Err(e) => {
+                logger::log_warn(&format!("Skipping invalid log line: {}", e)).await;
+            }
+        }
+    }
+
+    logger::log_info(&format!(
+        "Loaded {} enriched companies from log",
+        enriched_companies.len()
+    )).await;
+
+    Ok(enriched_companies)
+}
+
+/// Count enriched companies by checking for data files
+///
+/// Walks through the corporate directory and counts companies that have
+/// a data file in the specified subdirectory (e.g., "events", "options", "chart").
+pub async fn count_enriched_companies(
+    paths: &crate::util::directories::DataPaths,
+    data_type: &str,
+) -> Result<usize> {
+    let corporate_dir = paths.corporate_dir();
+
+    if !corporate_dir.exists() {
+        return Ok(0);
+    }
+
+    let mut count = 0;
+    let mut entries = tokio::fs::read_dir(&corporate_dir).await?;
+
+    while let Some(entry) = entries.next_entry().await? {
+        let path = entry.path();
+        if path.is_dir() {
+            let data_dir = path.join(data_type);
+            let data_file = data_dir.join("data.jsonl");
+
+            if data_file.exists() {
+                count += 1;
+            }
+        }
+    }
+
+    Ok(count)
+}
+
+/// Mark a processing step as complete in state file
+///
+/// Appends a completion marker to the state file with timestamp.
+/// Used to track which processing steps have been completed.
+pub async fn mark_step_complete<P: AsRef<Path>>(
+    state_path: P,
+    completion_key: &str,
+) -> Result<()> {
+    let completion_entry = serde_json::json!({
+        completion_key: true,
+        "completed_at": chrono::Utc::now().to_rfc3339(),
+    });
+
+    let mut state_file = OpenOptions::new()
+        .create(true)
+        .append(true)
+        .open(state_path.as_ref())
+        .await?;
+
+    let state_line = serde_json::to_string(&completion_entry)?;
+    state_file.write_all(state_line.as_bytes()).await?;
+    state_file.write_all(b"\n").await?;
+    state_file.flush().await?;
+    state_file.sync_all().await?;
+
+    Ok(())
+}
\ No newline at end of file
diff --git a/src/corporate/helpers.rs b/src/corporate/helpers.rs
index c643a7f..de2c031 100644
--- a/src/corporate/helpers.rs
+++ b/src/corporate/helpers.rs
@@ -80,4 +80,51 @@ pub fn random_range(min: u64, max: u64) -> u64 {
 pub fn choose_random<T: Clone>(items: &[T]) -> T {
     let mut rng = StdRng::from_rng(&mut rand::rng());
     items.choose(&mut rng).unwrap().clone()
+}
+
+/// Extract first valid Yahoo ticker from company
+pub fn extract_first_yahoo_ticker(company: &CompanyCrossPlatformInfo) -> Option<String> {
+    for tickers in company.isin_tickers_map.values() {
+        for ticker in tickers {
+            if ticker.starts_with("YAHOO:")
+                && ticker != "YAHOO:NO_RESULTS"
+                && ticker != "YAHOO:ERROR"
+            {
+                return Some(ticker.trim_start_matches("YAHOO:").to_string());
+            }
+        }
+    }
+    None
+}
+
+/// Sanitize company name for file system use
+pub fn sanitize_company_name(name: &str) -> String {
+    name.replace("/", "_")
+        .replace("\\", "_")
+        .replace(":", "_")
+        .replace("*", "_")
+        .replace("?", "_")
+        .replace("\"", "_")
+        .replace("<", "_")
+        .replace(">", "_")
+        .replace("|", "_")
+}
+
+/// Load companies from JSONL file
+pub async fn load_companies_from_jsonl(
+    path: &std::path::Path
+) -> anyhow::Result<Vec<CompanyCrossPlatformInfo>> {
+    let content = tokio::fs::read_to_string(path).await?;
+    let mut companies = Vec::new();
+
+    for line in content.lines() {
+        if line.trim().is_empty() {
+            continue;
+        }
+        if let Ok(company) = serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
+            companies.push(company);
+        }
+    }
+
+    Ok(companies)
}
\ No newline at end of file
diff --git a/src/corporate/mod.rs b/src/corporate/mod.rs
index 83e3d77..35736c5 100644
--- a/src/corporate/mod.rs
+++ b/src/corporate/mod.rs
@@ -7,7 +7,7 @@ pub mod aggregation;
 pub mod openfigi;
 pub mod yahoo_company_extraction;
 pub mod page_validation;
-pub mod atomic_writer;
+pub mod checkpoint_helpers;
 
 // Corporate update modules
 pub mod update;
diff --git a/src/corporate/update.rs b/src/corporate/update.rs
index b27201e..5144235 100644
--- a/src/corporate/update.rs
+++ b/src/corporate/update.rs
@@ -1,6 +1,7 @@
 // src/corporate/update.rs
 use super::{scraper::*, openfigi::*};
 use crate::config::Config;
+use crate::check_shutdown;
 use crate::corporate::update_companies::build_companies_jsonl_streaming_parallel;
 use crate::corporate::update_companies_cleanse::{companies_yahoo_cleansed_low_profile, companies_yahoo_cleansed_no_data};
 use crate::corporate::update_companies_enrich::enrich_companies_with_events;
@@ -26,6 +27,8 @@ pub async fn run_full_update(
     let paths = DataPaths::new(".")?;
 
+    check_shutdown!(shutdown_flag);
+
     logger::log_info("Step 1: Downloading GLEIF CSV...").await;
     let gleif_csv_path = match download_isin_lei_csv().await? 
{ Some(p) => { @@ -38,70 +41,49 @@ pub async fn run_full_update( } }; - if shutdown_flag.load(Ordering::SeqCst) { - logger::log_warn("Shutdown detected after GLEIF download").await; - return Ok(()); + check_shutdown!(shutdown_flag); + + logger::log_info("Step 2: Loading OpenFIGI metadata...").await; + load_figi_type_lists().await.ok(); + logger::log_info(" ✓ OpenFIGI metadata loaded").await; + + check_shutdown!(shutdown_flag); + + logger::log_info("Step 3: Checking LEI-FIGI mapping status...").await; + let all_mapped = ensure_all_leis_mapped(&gleif_csv_path, None).await?; + + if !all_mapped { + logger::log_warn(" ⚠ Some LEIs failed to map - continuing with partial data").await; + } else { + logger::log_info(" ✓ All LEIs successfully mapped").await; } - if !shutdown_flag.load(Ordering::SeqCst) { - logger::log_info("Step 2: Loading OpenFIGI metadata...").await; - load_figi_type_lists().await.ok(); - logger::log_info(" ✓ OpenFIGI metadata loaded").await; + check_shutdown!(shutdown_flag); + + logger::log_info("Step 4: Building securities map (streaming)...").await; + let date_dir = find_most_recent_figi_date_dir(&paths).await?; + + if let Some(date_dir) = date_dir { + logger::log_info(&format!(" Using FIGI data from: {:?}", date_dir)).await; + load_or_build_all_securities(&date_dir).await?; + logger::log_info(" ✓ Securities map updated").await; } else { - logger::log_warn("Shutdown detected, skipping event index build").await; + logger::log_warn(" ✗ No FIGI data directory found").await; } - if !shutdown_flag.load(Ordering::SeqCst) { - logger::log_info("Step 2: Loading OpenFIGI metadata...").await; - load_figi_type_lists().await.ok(); - logger::log_info(" ✓ OpenFIGI metadata loaded").await; - } else { - logger::log_warn("Shutdown detected, skipping event index build").await; - } + check_shutdown!(shutdown_flag); - if !shutdown_flag.load(Ordering::SeqCst) { - logger::log_info("Step 3: Checking LEI-FIGI mapping status...").await; - let all_mapped = ensure_all_leis_mapped(&gleif_csv_path, None).await?; - - if !all_mapped { - logger::log_warn(" ⚠ Some LEIs failed to map - continuing with partial data").await; - } else { - logger::log_info(" ✓ All LEIs successfully mapped").await; - } - } else { - logger::log_warn("Shutdown detected, skipping event index build").await; - } + logger::log_info("Step 5: Building companies.jsonl with parallel processing and validation...").await; + let count = build_companies_jsonl_streaming_parallel(&paths, pool, shutdown_flag, config, &None).await?; + logger::log_info(&format!(" ✓ Saved {} companies", count)).await; - if !shutdown_flag.load(Ordering::SeqCst) { - logger::log_info("Step 4: Building securities map (streaming)...").await; - let date_dir = find_most_recent_figi_date_dir(&paths).await?; - - if let Some(date_dir) = date_dir { - logger::log_info(&format!(" Using FIGI data from: {:?}", date_dir)).await; - load_or_build_all_securities(&date_dir).await?; - logger::log_info(" ✓ Securities map updated").await; - } else { - logger::log_warn(" ✗ No FIGI data directory found").await; - } - } else { - logger::log_warn("Shutdown detected, skipping event index build").await; - } + check_shutdown!(shutdown_flag); - if !shutdown_flag.load(Ordering::SeqCst) { - logger::log_info("Step 5: Building companies.jsonl with parallel processing and validation...").await; - let count = build_companies_jsonl_streaming_parallel(&paths, pool, shutdown_flag, config, &None).await?; - logger::log_info(&format!(" ✓ Saved {} companies", count)).await; - } else { - logger::log_warn("Shutdown 
detected, skipping event index build").await; - } + logger::log_info("Step 6: Cleansing companies with missing essential data...").await; + let cleansed_count = companies_yahoo_cleansed_no_data(&paths).await?; + logger::log_info(&format!(" ✓ {} companies found on Yahoo ready for further use in companies_yahoo.jsonl", cleansed_count)).await; - if !shutdown_flag.load(Ordering::SeqCst) { - logger::log_info("Step 6: Cleansing companies with missing essential data...").await; - let cleansed_count = companies_yahoo_cleansed_no_data(&paths).await?; - logger::log_info(&format!(" ✓ {} companies found on Yahoo ready for further use in companies_yahoo.jsonl", cleansed_count)).await; - } else { - logger::log_warn("Shutdown detected, skipping event index build").await; - } + check_shutdown!(shutdown_flag); let proxy_pool = pool.get_proxy_pool() .ok_or_else(|| anyhow::anyhow!("ChromeDriverPool must be created with VPN proxy rotation enabled"))?; @@ -110,60 +92,41 @@ pub async fn run_full_update( let yahoo_pool = Arc::new(YahooClientPool::new(proxy_pool, config, None).await?); logger::log_info(&format!("✓ YahooClientPool ready with {} clients", yahoo_pool.num_clients().await)).await; + check_shutdown!(shutdown_flag); - if !shutdown_flag.load(Ordering::SeqCst) { - logger::log_info("Step 7: Cleansing companies with too low profile (with abort-safe persistence)...").await; - let cleansed_count = companies_yahoo_cleansed_low_profile(&paths, config, yahoo_pool.clone(), shutdown_flag).await?; - logger::log_info(&format!(" ✓ {} companies with sufficient profile ready for analytics", cleansed_count)).await; - } else { - logger::log_warn("Shutdown detected, skipping event index build").await; - } + logger::log_info("Step 7: Cleansing companies with too low profile (with abort-safe persistence)...").await; + let cleansed_count = companies_yahoo_cleansed_low_profile(&paths, config, yahoo_pool.clone(), shutdown_flag).await?; + logger::log_info(&format!(" ✓ {} companies with sufficient profile ready for analytics", cleansed_count)).await; - if !shutdown_flag.load(Ordering::SeqCst) { - logger::log_info("Step 8: Enriching companies with Yahoo Events (with abort-safe persistence)...").await; - let enriched_count = enrich_companies_with_events(&paths, config, yahoo_pool.clone(), shutdown_flag).await?; - logger::log_info(&format!(" ✓ {} companies enriched with event data", enriched_count)).await; - } else { - logger::log_warn("Shutdown detected, skipping event index build").await; - } + check_shutdown!(shutdown_flag); + + logger::log_info("Step 8: Enriching companies with Yahoo Events (with abort-safe persistence)...").await; + let enriched_count = enrich_companies_with_events(&paths, config, yahoo_pool.clone(), shutdown_flag).await?; + logger::log_info(&format!(" ✓ {} companies enriched with event data", enriched_count)).await; - if !shutdown_flag.load(Ordering::SeqCst) { - logger::log_info("Step 9: Enriching companies with Yahoo Options (with abort-safe persistence)...").await; - let options_count = enrich_companies_with_options(&paths, config, yahoo_pool.clone(), shutdown_flag).await?; - logger::log_info(&format!(" ✓ {} companies enriched with options data", options_count)).await; - } else { - logger::log_warn("Shutdown detected, skipping event index build").await; - } + check_shutdown!(shutdown_flag); - if !shutdown_flag.load(Ordering::SeqCst) { - logger::log_info("Step 10: Enriching companies with Yahoo Chart (with abort-safe persistence)...").await; - let chart_count = enrich_companies_with_chart(&paths, config, 
yahoo_pool.clone(), shutdown_flag).await?; - logger::log_info(&format!(" ✓ {} companies enriched with chart data", chart_count)).await; - } else { - logger::log_warn("Shutdown detected, skipping event index build").await; - } + logger::log_info("Step 9: Enriching companies with Yahoo Options (with abort-safe persistence)...").await; + let options_count = enrich_companies_with_options(&paths, config, yahoo_pool.clone(), shutdown_flag).await?; + logger::log_info(&format!(" ✓ {} companies enriched with options data", options_count)).await; - if !shutdown_flag.load(Ordering::SeqCst) { - logger::log_info("Step 11: Collecting FX rates...").await; - - let proxy_pool = pool.get_proxy_pool() - .ok_or_else(|| anyhow::anyhow!("ChromeDriverPool must have proxy rotation"))?; - - let yahoo_pool = Arc::new(YahooClientPool::new(proxy_pool, config, None).await?); - - let fx_count = collect_fx_rates(&paths, config, yahoo_pool.clone(), shutdown_flag).await?; - logger::log_info(&format!(" ✓ Collected {} FX rates", fx_count)).await; - } else { - logger::log_warn("Shutdown detected, skipping FX rates collection").await; - } + check_shutdown!(shutdown_flag); - if !shutdown_flag.load(Ordering::SeqCst) { - logger::log_info("Step 12: Collecting exchange information...").await; - let exchange_count = collect_and_save_exchanges(&paths).await?; - logger::log_info(&format!(" ✓ Collected {} exchanges", exchange_count)).await; - } else { - logger::log_warn("Shutdown detected, skipping exchange collection").await; - } + logger::log_info("Step 10: Enriching companies with Yahoo Chart (with abort-safe persistence)...").await; + let chart_count = enrich_companies_with_chart(&paths, config, yahoo_pool.clone(), shutdown_flag).await?; + logger::log_info(&format!(" ✓ {} companies enriched with chart data", chart_count)).await; + + check_shutdown!(shutdown_flag); + + logger::log_info("Step 11: Collecting FX rates...").await; + let fx_count = collect_fx_rates(&paths, config, yahoo_pool.clone(), shutdown_flag).await?; + logger::log_info(&format!(" ✓ Collected {} FX rates", fx_count)).await; + + check_shutdown!(shutdown_flag); + + logger::log_info("Step 12: Collecting exchange information...").await; + let exchange_count = collect_and_save_exchanges(&paths).await?; + logger::log_info(&format!(" ✓ Collected {} exchanges", exchange_count)).await; logger::log_info("=== Corporate update complete === ").await; Ok(()) diff --git a/src/corporate/update_companies.rs b/src/corporate/update_companies.rs index 81c6384..d79b170 100644 --- a/src/corporate/update_companies.rs +++ b/src/corporate/update_companies.rs @@ -4,6 +4,7 @@ use crate::util::directories::DataPaths; use crate::util::logger; use crate::scraper::webdriver::ChromeDriverPool; use crate::scraper::hard_reset::perform_hard_reset; +use crate::corporate::checkpoint_helpers; use crate::config::Config; use tokio::sync::mpsc; @@ -120,56 +121,11 @@ pub async fn build_companies_jsonl_streaming_parallel( } // === RECOVERY PHASE: Load checkpoint + replay log === - let mut existing_companies: HashMap = HashMap::new(); - let mut processed_names: std::collections::HashSet = std::collections::HashSet::new(); - - if companies_path.exists() { - logger::log_info("Loading checkpoint from companies.jsonl...").await; - let existing_content = tokio::fs::read_to_string(&companies_path).await?; - - for line in existing_content.lines() { - if line.trim().is_empty() || !line.ends_with('}') { - continue; // Skip incomplete lines - } - - match serde_json::from_str::(line) { - Ok(company) => { - 
processed_names.insert(company.name.clone());
-                    existing_companies.insert(company.name.clone(), company);
-                }
-                Err(e) => {
-                    logger::log_warn(&format!("Skipping invalid checkpoint line: {}", e)).await;
-                }
-            }
-        }
-        logger::log_info(&format!("Loaded checkpoint with {} companies", existing_companies.len())).await;
-    }
-
-    if log_path.exists() {
-        logger::log_info("Replaying update log...").await;
-        let log_content = tokio::fs::read_to_string(&log_path).await?;
-        let mut replayed = 0;
-
-        for line in log_content.lines() {
-            if line.trim().is_empty() || !line.ends_with('}') {
-                continue; // Skip incomplete lines
-            }
-
-            match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
-                Ok(company) => {
-                    processed_names.insert(company.name.clone());
-                    existing_companies.insert(company.name.clone(), company);
-                    replayed += 1;
-                }
-                Err(e) => {
-                    logger::log_warn(&format!("Skipping invalid log line: {}", e)).await;
-                }
-            }
-        }
-        if replayed > 0 {
-            logger::log_info(&format!("Replayed {} updates from log", replayed)).await;
-        }
-    }
+    let existing_companies = checkpoint_helpers::load_checkpoint_with_log(
+        &companies_path,
+        &log_path,
+        "companies.jsonl"
+    ).await?;
+    // Rebuild the processed-name set that the removed replay loop used to populate
+    let mut processed_names: std::collections::HashSet<String> =
+        existing_companies.keys().cloned().collect();
 
     // === SETUP LOG WRITER TASK ===
     let (write_tx, mut write_rx) = mpsc::channel(1000);
diff --git a/src/corporate/update_companies_cleanse.rs b/src/corporate/update_companies_cleanse.rs
index d4b1c93..a495761 100644
--- a/src/corporate/update_companies_cleanse.rs
+++ b/src/corporate/update_companies_cleanse.rs
@@ -1,6 +1,7 @@
 // src/corporate/update_companies_cleanse.rs
 use super::{helpers::*, types::*};
 use crate::config::Config;
+use crate::corporate::checkpoint_helpers;
 use crate::util::directories::DataPaths;
 use crate::util::logger;
 use crate::scraper::yahoo::{YahooClientPool, QuoteSummaryModule};
@@ -271,29 +272,8 @@ pub async fn companies_yahoo_cleansed_low_profile(
         logger::log_info(" ✓ All companies already processed").await;
 
         // Consolidate log into checkpoint before exiting
-        if log_path.exists() {
-            let log_metadata = tokio::fs::metadata(&log_path).await.ok();
-            if log_metadata.map(|m| m.len() > 0).unwrap_or(false) {
-                logger::log_info(" Consolidating update log into checkpoint...").await;
-
-                let temp_checkpoint = checkpoint_path.with_extension("tmp");
-                let mut temp_file = File::create(&temp_checkpoint).await?;
-
-                for company in existing_companies.values() {
-                    let json_line = serde_json::to_string(company)?;
-                    temp_file.write_all(json_line.as_bytes()).await?;
-                    temp_file.write_all(b"\n").await?;
-                }
-
-                temp_file.flush().await?;
-                temp_file.sync_data().await?;
-                drop(temp_file);
-
-                tokio::fs::rename(&temp_checkpoint, &checkpoint_path).await?;
-                tokio::fs::remove_file(&log_path).await.ok();
-
-                logger::log_info(&format!(" ✓ Consolidated {} companies", existing_companies.len())).await;
-            }
+        if checkpoint_helpers::log_has_content(&log_path).await {
+            checkpoint_helpers::consolidate_checkpoint(&checkpoint_path, &log_path, &existing_companies).await?;
         }
 
         return Ok(existing_companies.len());
@@ -851,37 +831,6 @@ fn is_transient_error(error: &str) -> bool {
     true
 }
 
-/// Load companies from JSONL file
-async fn load_companies_from_jsonl(path: &std::path::Path) -> anyhow::Result<Vec<CompanyCrossPlatformInfo>> {
-    let content = tokio::fs::read_to_string(path).await?;
-    let mut companies = Vec::new();
-
-    for line in content.lines() {
-        if line.trim().is_empty() {
-            continue;
-        }
-        if let Ok(company) = serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
-            companies.push(company);
-        }
-    }
-
-    Ok(companies)
-}
-
-fn extract_first_yahoo_ticker(company: &CompanyCrossPlatformInfo) -> Option<String> {
-    for tickers in 
company.isin_tickers_map.values() { - for ticker in tickers { - if ticker.starts_with("YAHOO:") - && ticker != "YAHOO:NO_RESULTS" - && ticker != "YAHOO:ERROR" - { - return Some(ticker.trim_start_matches("YAHOO:").to_string()); - } - } - } - None -} - fn extract_market_cap(summary: &crate::scraper::yahoo::QuoteSummary) -> f64 { let price_module = match summary.modules.get("price") { Some(m) => m, @@ -946,16 +895,7 @@ async fn save_company_core_data( ) -> anyhow::Result<()> { use tokio::fs; - let safe_name = company_name - .replace("/", "_") - .replace("\\", "_") - .replace(":", "_") - .replace("*", "_") - .replace("?", "_") - .replace("\"", "_") - .replace("<", "_") - .replace(">", "_") - .replace("|", "_"); + let safe_name = sanitize_company_name(company_name); let company_dir = paths.corporate_dir().join(&safe_name).join("core"); fs::create_dir_all(&company_dir).await?; diff --git a/src/corporate/update_companies_enrich.rs b/src/corporate/update_companies_enrich.rs index 9675de0..8d5783b 100644 --- a/src/corporate/update_companies_enrich.rs +++ b/src/corporate/update_companies_enrich.rs @@ -1,6 +1,7 @@ // src/corporate/update_companies_enrich_events.rs -use super::{types::*}; +use super::{types::*, helpers::*}; use crate::config::Config; +use crate::corporate::checkpoint_helpers; use crate::util::directories::DataPaths; use crate::util::logger; use crate::scraper::yahoo::{YahooClientPool, QuoteSummaryModule}; @@ -71,7 +72,7 @@ pub async fn enrich_companies_with_events( logger::log_info(" Yahoo events enrichment already completed").await; // Count enriched companies - let count = count_enriched_companies(paths).await?; + let count = checkpoint_helpers::count_enriched_companies(paths, "events").await?; logger::log_info(&format!(" ✓ Found {} companies with event data", count)).await; return Ok(count); } @@ -80,32 +81,7 @@ pub async fn enrich_companies_with_events( } // === RECOVERY PHASE: Track enriched companies === - let mut enriched_companies: HashSet = HashSet::new(); - - if log_path.exists() { - logger::log_info("Loading enrichment progress from log...").await; - let log_content = tokio::fs::read_to_string(&log_path).await?; - - for line in log_content.lines() { - if line.trim().is_empty() || !line.ends_with('}') { - continue; // Skip incomplete lines - } - - match serde_json::from_str::(line) { - Ok(entry) => { - if let Some(name) = entry.get("company_name").and_then(|v| v.as_str()) { - if entry.get("status").and_then(|v| v.as_str()) == Some("enriched") { - enriched_companies.insert(name.to_string()); - } - } - } - Err(e) => { - logger::log_warn(&format!("Skipping invalid log line: {}", e)).await; - } - } - } - logger::log_info(&format!("Loaded {} enriched companies from log", enriched_companies.len())).await; - } + let enriched_companies: HashSet = checkpoint_helpers::load_enrichment_progress(&log_path).await?; // Load all companies from input logger::log_info("Loading companies from companies_yahoo_cleaned.jsonl...").await; @@ -128,7 +104,7 @@ pub async fn enrich_companies_with_events( if pending_count == 0 { logger::log_info(" ✓ All companies already enriched").await; - mark_enrichment_complete(&state_path).await?; + checkpoint_helpers::mark_step_complete(&state_path, "yahoo_events_enrichment_complete").await?; return Ok(enriched_companies.len()); } @@ -287,7 +263,7 @@ pub async fn enrich_companies_with_events( // Mark as complete if all companies processed if final_processed >= total_companies && !shutdown_flag.load(Ordering::SeqCst) { - 
mark_enrichment_complete(&state_path).await?; + checkpoint_helpers::mark_step_complete(&state_path, "yahoo_events_enrichment_complete").await?; logger::log_info(" ✓ Event enrichment marked as complete").await; } @@ -441,99 +417,6 @@ async fn save_company_event_data( Ok(()) } -/// Extract first valid Yahoo ticker from company -fn extract_first_yahoo_ticker(company: &CompanyCrossPlatformInfo) -> Option { - for tickers in company.isin_tickers_map.values() { - for ticker in tickers { - if ticker.starts_with("YAHOO:") - && ticker != "YAHOO:NO_RESULTS" - && ticker != "YAHOO:ERROR" - { - return Some(ticker.trim_start_matches("YAHOO:").to_string()); - } - } - } - None -} - -/// Sanitize company name for file system -fn sanitize_company_name(name: &str) -> String { - name.replace("/", "_") - .replace("\\", "_") - .replace(":", "_") - .replace("*", "_") - .replace("?", "_") - .replace("\"", "_") - .replace("<", "_") - .replace(">", "_") - .replace("|", "_") -} - -/// Load companies from JSONL file -async fn load_companies_from_jsonl(path: &std::path::Path) -> anyhow::Result> { - let content = tokio::fs::read_to_string(path).await?; - let mut companies = Vec::new(); - - for line in content.lines() { - if line.trim().is_empty() { - continue; - } - if let Ok(company) = serde_json::from_str::(line) { - companies.push(company); - } - } - - Ok(companies) -} - -/// Count enriched companies (companies with event data) -async fn count_enriched_companies(paths: &DataPaths) -> anyhow::Result { - let corporate_dir = paths.corporate_dir(); - - if !corporate_dir.exists() { - return Ok(0); - } - - let mut count = 0; - let mut entries = tokio::fs::read_dir(&corporate_dir).await?; - - while let Some(entry) = entries.next_entry().await? { - let path = entry.path(); - if path.is_dir() { - let events_dir = path.join("events"); - let events_file = events_dir.join("data.jsonl"); - - if events_file.exists() { - count += 1; - } - } - } - - Ok(count) -} - -/// Mark enrichment as complete in state file -async fn mark_enrichment_complete(state_path: &std::path::Path) -> anyhow::Result<()> { - let enrichment_complete = json!({ - "yahoo_events_enrichment_complete": true, - "completed_at": Utc::now().to_rfc3339(), - }); - - let mut state_file = OpenOptions::new() - .create(true) - .append(true) - .open(state_path) - .await?; - - let state_line = serde_json::to_string(&enrichment_complete)?; - state_file.write_all(state_line.as_bytes()).await?; - state_file.write_all(b"\n").await?; - state_file.flush().await?; - state_file.sync_all().await?; - - Ok(()) -} - /// Log command enum enum LogCommand { Write(serde_json::Value), diff --git a/src/corporate/update_companies_enrich_options_chart.rs b/src/corporate/update_companies_enrich_options_chart.rs index 355e9c6..29f3e00 100644 --- a/src/corporate/update_companies_enrich_options_chart.rs +++ b/src/corporate/update_companies_enrich_options_chart.rs @@ -1,6 +1,7 @@ // src/corporate/update_companies_enrich_options_chart.rs -use super::{types::*}; +use super::{types::*, helpers::*}; use crate::config::Config; +use crate::corporate::checkpoint_helpers; use crate::util::directories::DataPaths; use crate::util::logger; use crate::scraper::yahoo::{YahooClientPool}; @@ -70,7 +71,7 @@ pub async fn enrich_companies_with_options( logger::log_info(" Yahoo options enrichment already completed").await; // Count enriched companies - let count = count_enriched_companies(paths, "options").await?; + let count = checkpoint_helpers::count_enriched_companies(paths, "options").await?; 
logger::log_info(&format!(" ✓ Found {} companies with options data", count)).await; return Ok(count); } @@ -79,32 +80,7 @@ pub async fn enrich_companies_with_options( } // === RECOVERY PHASE: Track enriched companies === - let mut enriched_companies: HashSet = HashSet::new(); - - if log_path.exists() { - logger::log_info("Loading options enrichment progress from log...").await; - let log_content = tokio::fs::read_to_string(&log_path).await?; - - for line in log_content.lines() { - if line.trim().is_empty() || !line.ends_with('}') { - continue; // Skip incomplete lines - } - - match serde_json::from_str::(line) { - Ok(entry) => { - if let Some(name) = entry.get("company_name").and_then(|v| v.as_str()) { - if entry.get("status").and_then(|v| v.as_str()) == Some("enriched") { - enriched_companies.insert(name.to_string()); - } - } - } - Err(e) => { - logger::log_warn(&format!("Skipping invalid log line: {}", e)).await; - } - } - } - logger::log_info(&format!("Loaded {} enriched companies from log", enriched_companies.len())).await; - } + let enriched_companies = checkpoint_helpers::load_enrichment_progress(&log_path).await?; // Load all companies from input logger::log_info("Loading companies from companies_yahoo_cleaned.jsonl...").await; @@ -127,7 +103,7 @@ pub async fn enrich_companies_with_options( if pending_count == 0 { logger::log_info(" ✓ All companies already enriched with options data").await; - mark_enrichment_complete(&state_path, "yahoo_options_enrichment_complete").await?; + checkpoint_helpers::mark_step_complete(&state_path, "yahoo_options_enrichment_complete").await?; return Ok(enriched_companies.len()); } @@ -275,7 +251,7 @@ pub async fn enrich_companies_with_options( // Mark as complete if no shutdown if !shutdown_flag.load(Ordering::SeqCst) { - mark_enrichment_complete(&state_path, "yahoo_options_enrichment_complete").await?; + checkpoint_helpers::mark_step_complete(&state_path, "yahoo_options_enrichment_complete").await?; } Ok(final_success) @@ -335,7 +311,7 @@ pub async fn enrich_companies_with_chart( logger::log_info(" Yahoo chart enrichment already completed").await; // Count enriched companies - let count = count_enriched_companies(paths, "chart").await?; + let count = checkpoint_helpers::count_enriched_companies(paths, "chart").await?; logger::log_info(&format!(" ✓ Found {} companies with chart data", count)).await; return Ok(count); } @@ -392,7 +368,7 @@ pub async fn enrich_companies_with_chart( if pending_count == 0 { logger::log_info(" ✓ All companies already enriched with chart data").await; - mark_enrichment_complete(&state_path, "yahoo_chart_enrichment_complete").await?; + checkpoint_helpers::mark_step_complete(&state_path, "yahoo_chart_enrichment_complete").await?; return Ok(enriched_companies.len()); } @@ -540,7 +516,7 @@ pub async fn enrich_companies_with_chart( // Mark as complete if no shutdown if !shutdown_flag.load(Ordering::SeqCst) { - mark_enrichment_complete(&state_path, "yahoo_chart_enrichment_complete").await?; + checkpoint_helpers::mark_step_complete(&state_path, "yahoo_chart_enrichment_complete").await?; } Ok(final_success) @@ -700,99 +676,6 @@ async fn save_company_data( Ok(()) } -/// Extract first valid Yahoo ticker from company -fn extract_first_yahoo_ticker(company: &CompanyCrossPlatformInfo) -> Option { - for tickers in company.isin_tickers_map.values() { - for ticker in tickers { - if ticker.starts_with("YAHOO:") - && ticker != "YAHOO:NO_RESULTS" - && ticker != "YAHOO:ERROR" - { - return Some(ticker.trim_start_matches("YAHOO:").to_string()); - 
} - } - } - None -} - -/// Sanitize company name for file system -fn sanitize_company_name(name: &str) -> String { - name.replace("/", "_") - .replace("\\", "_") - .replace(":", "_") - .replace("*", "_") - .replace("?", "_") - .replace("\"", "_") - .replace("<", "_") - .replace(">", "_") - .replace("|", "_") -} - -/// Load companies from JSONL file -async fn load_companies_from_jsonl(path: &std::path::Path) -> anyhow::Result> { - let content = tokio::fs::read_to_string(path).await?; - let mut companies = Vec::new(); - - for line in content.lines() { - if line.trim().is_empty() { - continue; - } - if let Ok(company) = serde_json::from_str::(line) { - companies.push(company); - } - } - - Ok(companies) -} - -/// Count enriched companies (companies with specific data type) -async fn count_enriched_companies(paths: &DataPaths, data_type: &str) -> anyhow::Result { - let corporate_dir = paths.corporate_dir(); - - if !corporate_dir.exists() { - return Ok(0); - } - - let mut count = 0; - let mut entries = tokio::fs::read_dir(&corporate_dir).await?; - - while let Some(entry) = entries.next_entry().await? { - let path = entry.path(); - if path.is_dir() { - let data_dir = path.join(data_type); - let data_file = data_dir.join("data.jsonl"); - - if data_file.exists() { - count += 1; - } - } - } - - Ok(count) -} - -/// Mark enrichment as complete in state file -async fn mark_enrichment_complete(state_path: &std::path::Path, key: &str) -> anyhow::Result<()> { - let enrichment_complete = json!({ - key: true, - "completed_at": Utc::now().to_rfc3339(), - }); - - let mut state_file = OpenOptions::new() - .create(true) - .append(true) - .open(state_path) - .await?; - - let state_line = serde_json::to_string(&enrichment_complete)?; - state_file.write_all(state_line.as_bytes()).await?; - state_file.write_all(b"\n").await?; - state_file.flush().await?; - state_file.sync_all().await?; - - Ok(()) -} - /// Log command enum enum LogCommand { Write(serde_json::Value), diff --git a/src/economic/update.rs b/src/economic/update.rs index 6c09d56..a8c1b60 100644 --- a/src/economic/update.rs +++ b/src/economic/update.rs @@ -1,8 +1,9 @@ // src/economic/update.rs use super::{scraper::*, storage::*, helpers::*, types::*}; +use crate::check_shutdown; use crate::{config::Config, scraper::webdriver::{ScrapeTask, ChromeDriverPool}, util::directories::DataPaths, util::logger}; use chrono::{Local}; -use std::sync::{Arc, atomic::{AtomicBool, Ordering}}; +use std::sync::{Arc, atomic::{AtomicBool}}; use std::collections::HashMap; /// Runs the full update for economic data using streaming to minimize memory usage @@ -16,6 +17,8 @@ pub async fn run_full_update(config: &Config, pool: &Arc, shut logger::log_info("=== Economic Update ===").await; + check_shutdown!(shutdown_flag); + // Step 1: Build lightweight index instead of loading all events logger::log_info("Step 1: Building event index...").await; let chunks = scan_existing_chunks(&paths).await?; @@ -23,10 +26,7 @@ pub async fn run_full_update(config: &Config, pool: &Arc, shut logger::log_info(&format!(" Economic Update: Indexed {} events from {} chunks", event_index.len(), chunks.len())).await; - if shutdown_flag.load(Ordering::SeqCst) { - logger::log_warn("Shutdown detected after GLEIF download").await; - return Ok(()); - } + check_shutdown!(shutdown_flag); // Step 2: Determine start date let start_date = if event_index.is_empty() { @@ -54,20 +54,14 @@ pub async fn run_full_update(config: &Config, pool: &Arc, shut } }; - if shutdown_flag.load(Ordering::SeqCst) { - 
logger::log_warn("Shutdown detected after GLEIF download").await; - return Ok(()); - } + check_shutdown!(shutdown_flag); // Step 3: Scrape new events in batches logger::log_info(&format!("Step 3: Scraping events from {} → {}", start_date, end_date)).await; let new_events = scrape_all_economic_events(&start_date, &end_date, pool).await?; logger::log_info(&format!(" Scraped {} new events", new_events.len())).await; - if shutdown_flag.load(Ordering::SeqCst) { - logger::log_warn("Shutdown detected after GLEIF download").await; - return Ok(()); - } + check_shutdown!(shutdown_flag); // Step 4: Process events in streaming fashion logger::log_info(&format!("Step 4: Detecting changes")).await; @@ -79,10 +73,7 @@ pub async fn run_full_update(config: &Config, pool: &Arc, shut logger::log_info(" Changes saved successfully").await; } - if shutdown_flag.load(Ordering::SeqCst) { - logger::log_warn("Shutdown detected after GLEIF download").await; - return Ok(()); - } + check_shutdown!(shutdown_flag); // Step 5: Save consolidated events logger::log_info(&format!("Step 5: Saving {} total events to chunks", updated_events.len())).await; diff --git a/src/lib.rs b/src/lib.rs index d136605..d2cd5d4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,6 +15,7 @@ pub use monitoring::{init_monitoring, ConfigSnapshot, MonitoringEvent}; pub use config::Config; pub use scraper::webdriver::{ChromeDriverPool, ChromeInstance, ScrapeTask}; pub use util::logger; +pub use util::macros; pub use scraper::yahoo::{ YahooClient, YahooClientPool, QuoteSummaryModule, QuoteSummary, ChartData, OptionsData, SearchResult diff --git a/src/main.rs b/src/main.rs index 3966ae9..c05f0a8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -266,7 +266,7 @@ async fn main() -> Result<()> { cleanup_all_proxy_containers().await.ok(); } - // ✅ ADDED: Final force-kill to ensure no leaks + // Final force-kill to ensure no leaks #[cfg(target_os = "windows")] { logger::log_info("Final cleanup: force-killing any remaining Chrome processes...").await; diff --git a/src/util/macros.rs b/src/util/macros.rs new file mode 100644 index 0000000..4e59b51 --- /dev/null +++ b/src/util/macros.rs @@ -0,0 +1,10 @@ +// src/macros.rs +#[macro_export] +macro_rules! check_shutdown { + ($shutdown_flag:expr) => { + if $shutdown_flag.load(std::sync::atomic::Ordering::SeqCst) { + logger::log_warn("Shutdown detected, stopping update").await; + return Ok(()); + } + }; +} diff --git a/src/util/mod.rs b/src/util/mod.rs index 696c249..5ddd052 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -1,4 +1,5 @@ // src/util/mod.rs pub mod logger; pub mod directories; -pub mod opnv; \ No newline at end of file +pub mod opnv; +pub mod macros; \ No newline at end of file