From ea128f6187c5544336bb4dca24c7d7527e7100b6 Mon Sep 17 00:00:00 2001 From: donpat1to Date: Thu, 8 Jan 2026 11:35:25 +0100 Subject: [PATCH] added options chart enrichment --- src/corporate/mod.rs | 1 + src/corporate/update.rs | 21 +- .../update_companies_enrich_options_chart.rs | 798 ++++++++++++++++++ src/corporate/yahoo.rs | 2 +- 4 files changed, 820 insertions(+), 2 deletions(-) create mode 100644 src/corporate/update_companies_enrich_options_chart.rs diff --git a/src/corporate/mod.rs b/src/corporate/mod.rs index 5a74594..2c770fd 100644 --- a/src/corporate/mod.rs +++ b/src/corporate/mod.rs @@ -15,5 +15,6 @@ pub mod update; pub mod update_companies; pub mod update_companies_cleanse; pub mod update_companies_enrich; +pub mod update_companies_enrich_options_chart; pub use update::run_full_update; \ No newline at end of file diff --git a/src/corporate/update.rs b/src/corporate/update.rs index 68c4de7..0880789 100644 --- a/src/corporate/update.rs +++ b/src/corporate/update.rs @@ -4,6 +4,7 @@ use crate::config::Config; use crate::corporate::update_companies::build_companies_jsonl_streaming_parallel; use crate::corporate::update_companies_cleanse::{companies_yahoo_cleansed_low_profile, companies_yahoo_cleansed_no_data}; use crate::corporate::update_companies_enrich::enrich_companies_with_events; +use crate::corporate::update_companies_enrich_options_chart::{enrich_companies_with_options, enrich_companies_with_chart}; use crate::util::directories::DataPaths; use crate::util::logger; use crate::scraper::webdriver::ChromeDriverPool; @@ -122,8 +123,26 @@ pub async fn run_full_update( return Ok(()); } + logger::log_info("Step 9: Enriching companies with Yahoo Options (with abort-safe persistence)...").await; + let options_count = enrich_companies_with_options(&paths, config, yahoo_pool.clone(), shutdown_flag).await?; + logger::log_info(&format!(" ✓ {} companies enriched with options data", options_count)).await; + + if shutdown_flag.load(Ordering::SeqCst) { + 
logger::log_warn("Shutdown detected after options enrichment").await; + return Ok(()); + } + + logger::log_info("Step 10: Enriching companies with Yahoo Chart (with abort-safe persistence)...").await; + let chart_count = enrich_companies_with_chart(&paths, config, yahoo_pool.clone(), shutdown_flag).await?; + logger::log_info(&format!(" ✓ {} companies enriched with chart data", chart_count)).await; + + if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn("Shutdown detected after chart enrichment").await; + return Ok(()); + } + if !shutdown_flag.load(Ordering::SeqCst) { - logger::log_info("Step 9: Processing events (using index)...").await; + logger::log_info("Step 11: Processing events (using index)...").await; let _event_index = build_event_index(&paths).await?; logger::log_info(" ✓ Event index built").await; } else { diff --git a/src/corporate/update_companies_enrich_options_chart.rs b/src/corporate/update_companies_enrich_options_chart.rs new file mode 100644 index 0000000..aaac8fe --- /dev/null +++ b/src/corporate/update_companies_enrich_options_chart.rs @@ -0,0 +1,798 @@ +// src/corporate/update_companies_enrich_options_chart.rs +use super::{types::*}; +use crate::config::Config; +use crate::util::directories::DataPaths; +use crate::util::logger; +use crate::scraper::yahoo::{YahooClientPool}; + +use std::result::Result::Ok; +use chrono::Utc; +use std::collections::{HashSet}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use tokio::fs::{OpenOptions}; +use tokio::io::{AsyncWriteExt}; +use futures::stream::{FuturesUnordered, StreamExt}; +use serde_json::json; +use tokio::sync::mpsc; + +/// Enrich every company listed in companies_yahoo_cleaned.jsonl with Yahoo options data +/// +/// # Features +/// - Graceful shutdown (abort-safe) +/// - Task panic isolation (tasks fail independently) +/// - Crash-safe persistence (checkpoint + log with fsync) +/// - Smart skip logic (companies already logged as "enriched" are not re-fetched) +/// - Uses pending queue instead of retry mechanism +/// 
+/// # Persistence Strategy +/// - Input: companies_yahoo_cleaned.jsonl (only read here, never rewritten) +/// - Log: companies_options_updates.log (append-only updates) +/// - On restart: reload the input file and replay the log to skip enriched companies +/// - Periodic checkpoints (forced fsync of the log every 50 completed tasks) +/// - Batched fsync (every 10 writes or 10 seconds) +pub async fn enrich_companies_with_options( + paths: &DataPaths, + _config: &Config, + yahoo_pool: Arc, + shutdown_flag: &Arc, +) -> anyhow::Result { + // Configuration constants + const CHECKPOINT_INTERVAL: usize = 50; + const FSYNC_BATCH_SIZE: usize = 10; + const FSYNC_INTERVAL_SECS: u64 = 10; + const CONCURRENCY_LIMIT: usize = 50; + + let data_path = paths.data_dir(); + + // File paths + let input_path = data_path.join("companies_yahoo_cleaned.jsonl"); + let log_path = data_path.join("companies_options_updates.log"); + let state_path = data_path.join("state.jsonl"); + + // Check input exists + if !input_path.exists() { + logger::log_warn(" companies_yahoo_cleaned.jsonl not found, skipping options enrichment").await; + return Ok(0); + } + + // Check if already completed + if state_path.exists() { + let state_content = tokio::fs::read_to_string(&state_path).await?; + + for line in state_content.lines() { + if line.trim().is_empty() { + continue; + } + + if let Ok(state) = serde_json::from_str::(line) { + if state.get("yahoo_options_enrichment_complete").and_then(|v| v.as_bool()).unwrap_or(false) { + logger::log_info(" Yahoo options enrichment already completed").await; + + // Count enriched companies + let count = count_enriched_companies(paths, "options").await?; + logger::log_info(&format!(" ✓ Found {} companies with options data", count)).await; + return Ok(count); + } + } + } + } + + // === RECOVERY PHASE: Track enriched companies === + let mut enriched_companies: HashSet = HashSet::new(); + + if log_path.exists() { + logger::log_info("Loading options enrichment progress from log...").await; + let log_content = tokio::fs::read_to_string(&log_path).await?; + + 
for line in log_content.lines() { + if line.trim().is_empty() || !line.ends_with('}') { + continue; // Skip incomplete lines + } + + match serde_json::from_str::(line) { + Ok(entry) => { + if let Some(name) = entry.get("company_name").and_then(|v| v.as_str()) { + if entry.get("status").and_then(|v| v.as_str()) == Some("enriched") { + enriched_companies.insert(name.to_string()); + } + } + } + Err(e) => { + logger::log_warn(&format!("Skipping invalid log line: {}", e)).await; + } + } + } + logger::log_info(&format!("Loaded {} enriched companies from log", enriched_companies.len())).await; + } + + // Load all companies from input + logger::log_info("Loading companies from companies_yahoo_cleaned.jsonl...").await; + let companies = load_companies_from_jsonl(&input_path).await?; + let total_companies = companies.len(); + logger::log_info(&format!("Found {} companies to process", total_companies)).await; + + // Filter companies that need enrichment + let pending_companies: Vec = companies + .into_iter() + .filter(|company| !enriched_companies.contains(&company.name)) + .collect(); + + let pending_count = pending_companies.len(); + logger::log_info(&format!( + " {} already enriched, {} pending", + enriched_companies.len(), + pending_count + )).await; + + if pending_count == 0 { + logger::log_info(" ✓ All companies already enriched with options data").await; + mark_enrichment_complete(&state_path, "yahoo_options_enrichment_complete").await?; + return Ok(enriched_companies.len()); + } + + // === PROCESSING PHASE: Enrich companies with options === + + // Shared counters (processed and success start at the already-enriched count) + let processed_count = Arc::new(AtomicUsize::new(enriched_companies.len())); + let success_count = Arc::new(AtomicUsize::new(enriched_companies.len())); + let failed_count = Arc::new(AtomicUsize::new(0)); + + // Log writer channel with batching and fsync + let (log_tx, mut log_rx) = mpsc::channel::(1000); + + // Spawn log writer task + let log_writer_handle = { + let log_path = log_path.clone(); + let 
processed_count = Arc::clone(&processed_count); + let total_companies = total_companies; + + tokio::spawn(async move { + let mut log_file = OpenOptions::new() + .create(true) + .append(true) + .open(&log_path) + .await + .expect("Failed to open log file"); + + let mut write_count = 0; + let mut last_fsync = tokio::time::Instant::now(); + + while let Some(cmd) = log_rx.recv().await { + match cmd { + LogCommand::Write(entry) => { + let json_line = serde_json::to_string(&entry).expect("Serialization failed"); + log_file.write_all(json_line.as_bytes()).await.expect("Write failed"); + log_file.write_all(b"\n").await.expect("Write failed"); + + write_count += 1; + + // Batched fsync + if write_count >= FSYNC_BATCH_SIZE + || last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS + { + log_file.flush().await.expect("Flush failed"); + log_file.sync_all().await.expect("Fsync failed"); + write_count = 0; + last_fsync = tokio::time::Instant::now(); + } + } + LogCommand::Checkpoint => { + // Force fsync on checkpoint + log_file.flush().await.expect("Flush failed"); + log_file.sync_all().await.expect("Fsync failed"); + write_count = 0; + last_fsync = tokio::time::Instant::now(); + + let current = processed_count.load(Ordering::SeqCst); + logger::log_info(&format!( + " Checkpoint: {}/{} companies processed", + current, total_companies + )).await; + } + LogCommand::Shutdown => { + // Final fsync before shutdown + log_file.flush().await.expect("Flush failed"); + log_file.sync_all().await.expect("Fsync failed"); + break; + } + } + } + }) + }; + + // Process companies concurrently with task panic isolation + let mut tasks = FuturesUnordered::new(); + let mut pending_iter = pending_companies.into_iter(); + let semaphore = Arc::new(tokio::sync::Semaphore::new(CONCURRENCY_LIMIT)); + + // Initial batch of tasks + for _ in 0..CONCURRENCY_LIMIT.min(pending_count) { + if let Some(company) = pending_iter.next() { + let task = spawn_enrichment_task( + company, + Arc::clone(&yahoo_pool), + 
paths.clone(), + Arc::clone(&processed_count), + Arc::clone(&success_count), + Arc::clone(&failed_count), + log_tx.clone(), + Arc::clone(&semaphore), + Arc::clone(shutdown_flag), + EnrichmentType::Options, + ); + tasks.push(task); + } + } + + // Process tasks as they complete and spawn new ones + let mut checkpoint_counter = 0; + while let Some(_result) = tasks.next().await { + // Check for shutdown (NOTE(review): breaking here drops the remaining JoinHandles; in tokio that detaches the tasks, it does not cancel them) + if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn("Shutdown signal received, stopping options enrichment").await; + break; + } + + // Checkpoint periodically + checkpoint_counter += 1; + if checkpoint_counter % CHECKPOINT_INTERVAL == 0 { + let _ = log_tx.send(LogCommand::Checkpoint).await; + } + + // Spawn next task if available + if let Some(company) = pending_iter.next() { + let task = spawn_enrichment_task( + company, + Arc::clone(&yahoo_pool), + paths.clone(), + Arc::clone(&processed_count), + Arc::clone(&success_count), + Arc::clone(&failed_count), + log_tx.clone(), + Arc::clone(&semaphore), + Arc::clone(shutdown_flag), + EnrichmentType::Options, + ); + tasks.push(task); + } + } + + // Final checkpoint and shutdown + let _ = log_tx.send(LogCommand::Checkpoint).await; + let _ = log_tx.send(LogCommand::Shutdown).await; + + // Wait for log writer to finish (NOTE(review): a JoinError caused by a panic inside the writer task is discarded here) + let _ = log_writer_handle.await; + + let final_success = success_count.load(Ordering::SeqCst); + let final_failed = failed_count.load(Ordering::SeqCst); + + logger::log_info(&format!( + " Options enrichment: {} succeeded, {} failed", + final_success, final_failed + )).await; + + // Mark as complete if no shutdown (NOTE(review): the flag is also written when some companies failed, so later runs return early without retrying them) + if !shutdown_flag.load(Ordering::SeqCst) { + mark_enrichment_complete(&state_path, "yahoo_options_enrichment_complete").await?; + } + + Ok(final_success) +} + +/// Yahoo Chart enrichment per corporate company +/// +/// # Features +/// - Graceful shutdown (abort-safe) +/// - Task panic isolation (tasks fail independently) +/// - Crash-safe persistence (checkpoint + log with fsync) +/// - 
Smart skip logic (only process incomplete data) +/// - Uses pending queue instead of retry mechanism +/// +/// # Persistence Strategy +/// - Input: companies_yahoo_cleaned.jsonl (only read here, never rewritten) +/// - Log: companies_chart_updates.log (append-only updates) +/// - On restart: reload the input file and replay the log to skip enriched companies +/// - Periodic checkpoints (forced fsync of the log every 50 completed tasks) +/// - Batched fsync (every 10 writes or 10 seconds) +pub async fn enrich_companies_with_chart( + paths: &DataPaths, + _config: &Config, + yahoo_pool: Arc, + shutdown_flag: &Arc, +) -> anyhow::Result { + // Configuration constants + const CHECKPOINT_INTERVAL: usize = 50; + const FSYNC_BATCH_SIZE: usize = 10; + const FSYNC_INTERVAL_SECS: u64 = 10; + const CONCURRENCY_LIMIT: usize = 50; + + let data_path = paths.data_dir(); + + // File paths + let input_path = data_path.join("companies_yahoo_cleaned.jsonl"); + let log_path = data_path.join("companies_chart_updates.log"); + let state_path = data_path.join("state.jsonl"); + + // Check input exists + if !input_path.exists() { + logger::log_warn(" companies_yahoo_cleaned.jsonl not found, skipping chart enrichment").await; + return Ok(0); + } + + // Check if already completed + if state_path.exists() { + let state_content = tokio::fs::read_to_string(&state_path).await?; + + for line in state_content.lines() { + if line.trim().is_empty() { + continue; + } + + if let Ok(state) = serde_json::from_str::(line) { + if state.get("yahoo_chart_enrichment_complete").and_then(|v| v.as_bool()).unwrap_or(false) { + logger::log_info(" Yahoo chart enrichment already completed").await; + + // Count enriched companies + let count = count_enriched_companies(paths, "chart").await?; + logger::log_info(&format!(" ✓ Found {} companies with chart data", count)).await; + return Ok(count); + } + } + } + } + + // === RECOVERY PHASE: Track enriched companies === + let mut enriched_companies: HashSet = HashSet::new(); + + if log_path.exists() { + logger::log_info("Loading chart enrichment progress 
from log...").await; + let log_content = tokio::fs::read_to_string(&log_path).await?; + + for line in log_content.lines() { + if line.trim().is_empty() || !line.ends_with('}') { + continue; // Skip incomplete lines + } + + match serde_json::from_str::(line) { + Ok(entry) => { + if let Some(name) = entry.get("company_name").and_then(|v| v.as_str()) { + if entry.get("status").and_then(|v| v.as_str()) == Some("enriched") { + enriched_companies.insert(name.to_string()); + } + } + } + Err(e) => { + logger::log_warn(&format!("Skipping invalid log line: {}", e)).await; + } + } + } + logger::log_info(&format!("Loaded {} enriched companies from log", enriched_companies.len())).await; + } + + // Load all companies from input + logger::log_info("Loading companies from companies_yahoo_cleaned.jsonl...").await; + let companies = load_companies_from_jsonl(&input_path).await?; + let total_companies = companies.len(); + logger::log_info(&format!("Found {} companies to process", total_companies)).await; + + // Filter companies that need enrichment + let pending_companies: Vec = companies + .into_iter() + .filter(|company| !enriched_companies.contains(&company.name)) + .collect(); + + let pending_count = pending_companies.len(); + logger::log_info(&format!( + " {} already enriched, {} pending", + enriched_companies.len(), + pending_count + )).await; + + if pending_count == 0 { + logger::log_info(" ✓ All companies already enriched with chart data").await; + mark_enrichment_complete(&state_path, "yahoo_chart_enrichment_complete").await?; + return Ok(enriched_companies.len()); + } + + // === PROCESSING PHASE: Enrich companies with chart === + + // Shared counters (processed and success start at the already-enriched count) + let processed_count = Arc::new(AtomicUsize::new(enriched_companies.len())); + let success_count = Arc::new(AtomicUsize::new(enriched_companies.len())); + let failed_count = Arc::new(AtomicUsize::new(0)); + + // Log writer channel with batching and fsync + let (log_tx, mut log_rx) = mpsc::channel::(1000); + + // Spawn log 
writer task + let log_writer_handle = { + let log_path = log_path.clone(); + let processed_count = Arc::clone(&processed_count); + let total_companies = total_companies; + + tokio::spawn(async move { + let mut log_file = OpenOptions::new() + .create(true) + .append(true) + .open(&log_path) + .await + .expect("Failed to open log file"); + + let mut write_count = 0; + let mut last_fsync = tokio::time::Instant::now(); + + while let Some(cmd) = log_rx.recv().await { + match cmd { + LogCommand::Write(entry) => { + let json_line = serde_json::to_string(&entry).expect("Serialization failed"); + log_file.write_all(json_line.as_bytes()).await.expect("Write failed"); + log_file.write_all(b"\n").await.expect("Write failed"); + + write_count += 1; + + // Batched fsync + if write_count >= FSYNC_BATCH_SIZE + || last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS + { + log_file.flush().await.expect("Flush failed"); + log_file.sync_all().await.expect("Fsync failed"); + write_count = 0; + last_fsync = tokio::time::Instant::now(); + } + } + LogCommand::Checkpoint => { + // Force fsync on checkpoint + log_file.flush().await.expect("Flush failed"); + log_file.sync_all().await.expect("Fsync failed"); + write_count = 0; + last_fsync = tokio::time::Instant::now(); + + let current = processed_count.load(Ordering::SeqCst); + logger::log_info(&format!( + " Checkpoint: {}/{} companies processed", + current, total_companies + )).await; + } + LogCommand::Shutdown => { + // Final fsync before shutdown + log_file.flush().await.expect("Flush failed"); + log_file.sync_all().await.expect("Fsync failed"); + break; + } + } + } + }) + }; + + // Process companies concurrently with task panic isolation + let mut tasks = FuturesUnordered::new(); + let mut pending_iter = pending_companies.into_iter(); + let semaphore = Arc::new(tokio::sync::Semaphore::new(CONCURRENCY_LIMIT)); + + // Initial batch of tasks + for _ in 0..CONCURRENCY_LIMIT.min(pending_count) { + if let Some(company) = pending_iter.next() { 
+ let task = spawn_enrichment_task( + company, + Arc::clone(&yahoo_pool), + paths.clone(), + Arc::clone(&processed_count), + Arc::clone(&success_count), + Arc::clone(&failed_count), + log_tx.clone(), + Arc::clone(&semaphore), + Arc::clone(shutdown_flag), + EnrichmentType::Chart, + ); + tasks.push(task); + } + } + + // Process tasks as they complete and spawn new ones + let mut checkpoint_counter = 0; + while let Some(_result) = tasks.next().await { + // Check for shutdown (NOTE(review): breaking here drops the remaining JoinHandles; in tokio that detaches the tasks, it does not cancel them) + if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn("Shutdown signal received, stopping chart enrichment").await; + break; + } + + // Checkpoint periodically + checkpoint_counter += 1; + if checkpoint_counter % CHECKPOINT_INTERVAL == 0 { + let _ = log_tx.send(LogCommand::Checkpoint).await; + } + + // Spawn next task if available + if let Some(company) = pending_iter.next() { + let task = spawn_enrichment_task( + company, + Arc::clone(&yahoo_pool), + paths.clone(), + Arc::clone(&processed_count), + Arc::clone(&success_count), + Arc::clone(&failed_count), + log_tx.clone(), + Arc::clone(&semaphore), + Arc::clone(shutdown_flag), + EnrichmentType::Chart, + ); + tasks.push(task); + } + } + + // Final checkpoint and shutdown + let _ = log_tx.send(LogCommand::Checkpoint).await; + let _ = log_tx.send(LogCommand::Shutdown).await; + + // Wait for log writer to finish (NOTE(review): a JoinError caused by a panic inside the writer task is discarded here) + let _ = log_writer_handle.await; + + let final_success = success_count.load(Ordering::SeqCst); + let final_failed = failed_count.load(Ordering::SeqCst); + + logger::log_info(&format!( + " Chart enrichment: {} succeeded, {} failed", + final_success, final_failed + )).await; + + // Mark as complete if no shutdown (NOTE(review): the flag is also written when some companies failed, so later runs return early without retrying them) + if !shutdown_flag.load(Ordering::SeqCst) { + mark_enrichment_complete(&state_path, "yahoo_chart_enrichment_complete").await?; + } + + Ok(final_success) +} + +/// Selects which Yahoo dataset an enrichment task fetches (options or chart) +#[derive(Clone, Copy)] +enum EnrichmentType { + Options, + Chart, +} + +/// Spawn a tokio task that enriches one company, updates the shared counters and sends the outcome to the log writer +fn 
spawn_enrichment_task( + company: CompanyCrossPlatformInfo, + yahoo_pool: Arc, + paths: DataPaths, + processed_count: Arc, + success_count: Arc, + failed_count: Arc, + log_tx: mpsc::Sender, + semaphore: Arc, + shutdown_flag: Arc, + enrichment_type: EnrichmentType, +) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + // Acquire semaphore permit (held until this task returns) + let _permit = semaphore.acquire().await.expect("Semaphore closed"); + + // Check shutdown before processing (returns without counting or logging the company) + if shutdown_flag.load(Ordering::SeqCst) { + return; + } + + // Perform enrichment (panic-isolated) + let result = match enrichment_type { + EnrichmentType::Options => { + enrich_company_with_options(&company, &yahoo_pool, &paths).await + } + EnrichmentType::Chart => { + enrich_company_with_chart(&company, &yahoo_pool, &paths).await + } + }; + + // Update counters + processed_count.fetch_add(1, Ordering::SeqCst); + + let status = match result { + Ok(_) => { + success_count.fetch_add(1, Ordering::SeqCst); + "enriched" + } + Err(e) => { + failed_count.fetch_add(1, Ordering::SeqCst); + logger::log_warn(&format!( + " Failed to enrich {}: {}", + company.name, e + )).await; + "failed" + } + }; + + // Log result (a send error, e.g. writer already stopped, is ignored) + let log_entry = json!({ + "company_name": company.name, + "status": status, + "timestamp": Utc::now().to_rfc3339(), + }); + + let _ = log_tx.send(LogCommand::Write(log_entry)).await; + }) +} + +/// Enrich a single company with options data; errors if no usable ticker or no options are returned +async fn enrich_company_with_options( + company: &CompanyCrossPlatformInfo, + yahoo_pool: &Arc, + paths: &DataPaths, +) -> anyhow::Result<()> { + let ticker = match extract_first_yahoo_ticker(company) { + Some(t) => t, + None => { + return Err(anyhow::anyhow!("No valid Yahoo ticker found")); + } + }; + + // Get options data (second argument is None: presumably all expiration dates - TODO confirm against get_options_data) + let options_data = yahoo_pool.get_options_data(&ticker, None).await?; + + // Only save if we got meaningful data + if options_data.options.is_empty() { + return Err(anyhow::anyhow!("No options data 
available")); + } + + // Save the options data + save_company_data(paths, &company.name, &options_data, "options").await?; + + Ok(()) +} + +/// Enrich a single company with chart data; errors if no usable ticker or no quotes are returned +async fn enrich_company_with_chart( + company: &CompanyCrossPlatformInfo, + yahoo_pool: &Arc, + paths: &DataPaths, +) -> anyhow::Result<()> { + let ticker = match extract_first_yahoo_ticker(company) { + Some(t) => t, + None => { + return Err(anyhow::anyhow!("No valid Yahoo ticker found")); + } + }; + + // Get 25 years of daily ("1d") chart data; the window is approximated as 25 * 365 days before now + let now = chrono::Utc::now().timestamp(); + let twenty_five_years_ago = now - (25 * 365 * 24 * 60 * 60); + + let chart_data = yahoo_pool.get_chart_data(&ticker, "1d", twenty_five_years_ago, now).await?; + + // Only save if we got meaningful data + if chart_data.quotes.is_empty() { + return Err(anyhow::anyhow!("No chart data available")); + } + + // Save the chart data + save_company_data(paths, &company.name, &chart_data, "chart").await?; + + Ok(()) +} + +/// Serialize data as one JSON line into data.jsonl under corporate_dir / sanitized company name / data_type (existing file is overwritten) +async fn save_company_data( + paths: &DataPaths, + company_name: &str, + data: &T, + data_type: &str, +) -> anyhow::Result<()> { + use tokio::fs; + + let safe_name = sanitize_company_name(company_name); + + let company_dir = paths.corporate_dir().join(&safe_name).join(data_type); + fs::create_dir_all(&company_dir).await?; + + let data_path = company_dir.join("data.jsonl"); + let json_line = serde_json::to_string(data)?; + + let mut file = fs::File::create(&data_path).await?; + file.write_all(json_line.as_bytes()).await?; + file.write_all(b"\n").await?; + file.flush().await?; + file.sync_all().await?; // Ensure data is persisted + + Ok(()) +} + +/// Return the first ticker that starts with "YAHOO:" (prefix removed), ignoring the NO_RESULTS and ERROR markers +fn extract_first_yahoo_ticker(company: &CompanyCrossPlatformInfo) -> Option { + for tickers in company.isin_tickers_map.values() { + for ticker in tickers { + if ticker.starts_with("YAHOO:") + && ticker != "YAHOO:NO_RESULTS" + && ticker != "YAHOO:ERROR" + { + return 
Some(ticker.trim_start_matches("YAHOO:").to_string()); + } + } + } + None +} + +/// Replace characters that are unsafe in file names (slash, backslash, colon, asterisk, question mark, double quote, angle brackets, pipe) with '_' +fn sanitize_company_name(name: &str) -> String { + name.replace("/", "_") + .replace("\\", "_") + .replace(":", "_") + .replace("*", "_") + .replace("?", "_") + .replace("\"", "_") + .replace("<", "_") + .replace(">", "_") + .replace("|", "_") +} + +/// Load companies from a JSONL file; blank lines and lines that fail to parse are skipped silently +async fn load_companies_from_jsonl(path: &std::path::Path) -> anyhow::Result> { + let content = tokio::fs::read_to_string(path).await?; + let mut companies = Vec::new(); + + for line in content.lines() { + if line.trim().is_empty() { + continue; + } + if let Ok(company) = serde_json::from_str::(line) { + companies.push(company); + } + } + + Ok(companies) +} + +/// Count the directories directly under corporate_dir that contain a data.jsonl file inside their data_type subdirectory +async fn count_enriched_companies(paths: &DataPaths, data_type: &str) -> anyhow::Result { + let corporate_dir = paths.corporate_dir(); + + if !corporate_dir.exists() { + return Ok(0); + } + + let mut count = 0; + let mut entries = tokio::fs::read_dir(&corporate_dir).await?; + + while let Some(entry) = entries.next_entry().await? 
{ + let path = entry.path(); + if path.is_dir() { + let data_dir = path.join(data_type); + let data_file = data_dir.join("data.jsonl"); + + if data_file.exists() { + count += 1; + } + } + } + + Ok(count) +} + +/// Append a JSON line with key set to true and a completed_at RFC 3339 timestamp to the state file, then flush and fsync it +async fn mark_enrichment_complete(state_path: &std::path::Path, key: &str) -> anyhow::Result<()> { + let enrichment_complete = json!({ + key: true, + "completed_at": Utc::now().to_rfc3339(), + }); + + let mut state_file = OpenOptions::new() + .create(true) + .append(true) + .open(state_path) + .await?; + + let state_line = serde_json::to_string(&enrichment_complete)?; + state_file.write_all(state_line.as_bytes()).await?; + state_file.write_all(b"\n").await?; + state_file.flush().await?; + state_file.sync_all().await?; + + Ok(()) +} + +/// Commands handled by the log writer task: append an entry, force an fsync checkpoint, or do a final fsync and stop +enum LogCommand { + Write(serde_json::Value), + Checkpoint, + Shutdown, +} \ No newline at end of file diff --git a/src/corporate/yahoo.rs b/src/corporate/yahoo.rs index 9acb454..2fdbbcd 100644 --- a/src/corporate/yahoo.rs +++ b/src/corporate/yahoo.rs @@ -63,7 +63,7 @@ impl YahooTickerResult { } } -/// UPDATED: Scrape company details with full validation and shutdown support +/// Scrape company details with full validation and shutdown support pub async fn scrape_company_details_by_isin( pool: &Arc, isin: &str,