From 9c66f0d361ebd83d80fe8ae48c4f2de98f093d37 Mon Sep 17 00:00:00 2001
From: donpat1to
Date: Thu, 18 Dec 2025 13:05:23 +0100
Subject: [PATCH] Add parallelized scraping instances for company Yahoo ticker
 seeding

---
 src/corporate/mod.rs                      |   2 +
 src/corporate/update.rs                   | 110 +++--
 src/corporate/update_parallel.rs          | 522 ++++++++++++++++++++++
 src/corporate/yahoo.rs                    |  93 +++-
 src/corporate/yahoo_company_extraction.js | 143 ++++--
 src/main.rs                               |  18 +-
 src/scraper/webdriver.rs                  |  22 +-
 7 files changed, 842 insertions(+), 68 deletions(-)
 create mode 100644 src/corporate/update_parallel.rs

diff --git a/src/corporate/mod.rs b/src/corporate/mod.rs
index 9cce5cc..703fe2e 100644
--- a/src/corporate/mod.rs
+++ b/src/corporate/mod.rs
@@ -8,5 +8,7 @@ pub mod aggregation;
 pub mod fx;
 pub mod openfigi;
 pub mod yahoo;
+pub mod update_parallel;
+
 pub use update::run_full_update;
\ No newline at end of file
diff --git a/src/corporate/update.rs b/src/corporate/update.rs
index ffd8dc9..7684978 100644
--- a/src/corporate/update.rs
+++ b/src/corporate/update.rs
@@ -1,7 +1,7 @@
 // src/corporate/update.rs - ABORT-SAFE VERSION WITH JSONL LOG
-
 use super::{scraper::*, storage::*, helpers::*, types::*, openfigi::*, yahoo::*};
 use crate::config::Config;
+use crate::corporate::update_parallel::build_companies_jsonl_streaming_parallel;
 use crate::util::directories::DataPaths;
 use crate::util::logger;
 use crate::scraper::webdriver::ChromeDriverPool;
@@ -73,7 +73,7 @@ pub async fn run_full_update(
     }
 
     logger::log_info("Step 5: Building companies.jsonl (streaming with abort-safe persistence)...").await;
-    let count = build_companies_jsonl_streaming(&paths, pool, shutdown_flag).await?;
+    let count = build_companies_jsonl_streaming_parallel(&paths, pool, shutdown_flag).await?;
     logger::log_info(&format!("  ✓ Saved {} companies", count)).await;
 
     if !shutdown_flag.load(Ordering::SeqCst) {
@@ -90,15 +90,31 @@
 ///
 /// Implements the data_updating_rule.md specification:
 /// - Append-only JSONL log for all updates
-/// - fsync after each write batch
+/// - Batched fsync for performance (configurable batch size)
+/// - Time-based fsync for safety (at most 10 seconds without an fsync)
 /// - Atomic checkpoints via temp file + rename
 /// - Crash recovery by loading checkpoint + replaying log
-/// - Partial lines ignored during recovery
+/// - Partial lines rejected by JSON parsing during recovery
+///
+/// # Error Handling & Crash Safety
+///
+/// If any write or fsync fails:
+/// - The function returns an error immediately
+/// - A partial line may sit in the OS buffer without being fsynced
+/// - On the next startup, recovery will either see no trailing fragment
+///   (if no bytes reached the file) or fail to parse the truncated JSON
+///   of the last line (which is logged and skipped)
+/// - No data corruption; at most the last unsynced batch of entries is lost
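+///
+/// # Log format
+///
+/// Each log and checkpoint line is one serialized `CompanyCrossPlatformInfo`.
+/// An illustrative line (values made up):
+///
+/// ```json
+/// {"name":"Example AG","isin_tickers_map":{"DE0000000000":["EXA","YAHOO:EXA.DE"]},"sector":"Technology","exchange":"XETRA"}
+/// ```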
 async fn build_companies_jsonl_streaming(
     paths: &DataPaths,
     pool: &Arc<ChromeDriverPool>,
     shutdown_flag: &Arc<AtomicBool>,
 ) -> anyhow::Result<usize> {
+    // Configuration constants
+    const CHECKPOINT_INTERVAL: usize = 50; // Create a checkpoint every N updates
+    const FSYNC_BATCH_SIZE: usize = 10; // fsync every N writes for performance
+    const FSYNC_INTERVAL_SECS: u64 = 10; // Also fsync every N seconds for safety
+
     let path = DataPaths::new(".")?;
     let corporate_path = path.data_dir().join("corporate").join("by_name");
     let securities_path = corporate_path.join("common_stocks.json");
@@ -125,23 +141,22 @@
     if companies_path.exists() {
         logger::log_info("Loading checkpoint from companies.jsonl...").await;
         let existing_content = tokio::fs::read_to_string(&companies_path).await?;
+
+        // Note: a trailing line truncated by a crash fails JSON parsing below
+        // and is skipped
         for line in existing_content.lines() {
             if line.trim().is_empty() {
                 continue;
             }
-            // Only process complete lines (ending with proper JSON closing brace)
-            // This ensures we don't process partial writes from crashed processes
-            if !line.ends_with('}') {
-                logger::log_warn(&format!("Skipping incomplete checkpoint line: {}", &line[..line.len().min(50)])).await;
-                continue;
-            }
+
             match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
                 Ok(company) => {
                     processed_names.insert(company.name.clone());
                     existing_companies.insert(company.name.clone(), company);
                 }
                 Err(e) => {
-                    logger::log_warn(&format!("Failed to parse checkpoint line: {}", e)).await;
+                    // This catches both malformed JSON and partial lines
+                    logger::log_warn(&format!("Skipping invalid checkpoint line: {}", e)).await;
                 }
             }
         }
@@ -153,16 +168,14 @@
         logger::log_info("Replaying update log...").await;
         let log_content = tokio::fs::read_to_string(&log_path).await?;
         let mut replayed = 0;
+
+        // Note: a trailing line truncated by a crash fails JSON parsing and is skipped
        for line in log_content.lines() {
             if line.trim().is_empty() {
                 continue;
             }
-            // Only replay complete lines (crash-safe: incomplete lines are ignored)
-            // A line is considered complete only if it ends with '\n' and valid JSON
-            if !line.ends_with('}') {
-                logger::log_warn(&format!("Skipping incomplete log line: {}", &line[..line.len().min(50)])).await;
-                continue;
-            }
+
             match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
@@ -170,7 +183,8 @@
                 Ok(company) => {
                     processed_names.insert(company.name.clone());
                     existing_companies.insert(company.name.clone(), company);
                     replayed += 1;
                 }
                 Err(e) => {
-                    logger::log_warn(&format!("Failed to parse log line: {}", e)).await;
+                    // This catches both malformed JSON and partial lines
+                    logger::log_warn(&format!("Skipping invalid log line: {}", e)).await;
                 }
             }
         }
     }
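+
+    // Illustrative: a line truncated mid-write is not valid JSON, so the
+    // recovery above rejects and skips it, e.g.
+    //   let truncated = r#"{"name":"Example AG","isin_tickers_map"#;
+    //   assert!(serde_json::from_str::<CompanyCrossPlatformInfo>(truncated).is_err());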
@@ -190,9 +204,12 @@
     let mut count = existing_companies.len();
     let mut updated_count = 0;
     let mut new_count = 0;
-    let checkpoint_interval = 50; // Create atomic checkpoint every 50 updates
     let mut updates_since_checkpoint = 0;
 
+    // Batched fsync tracking for performance
+    let mut writes_since_fsync = 0;
+    let mut last_fsync = std::time::Instant::now();
+
     use tokio::io::AsyncWriteExt;
 
     for (name, company_info) in securities.iter() {
@@ -296,18 +313,29 @@
                 exchange,
             };
 
-            // === APPEND-ONLY: Write single-line JSON with fsync ===
-            // This guarantees the line is either fully written or not at all
+            // === APPEND-ONLY: Write single-line JSON with batched fsync ===
+            // A write guarantees the line is either fully written or not at all
             let line = serde_json::to_string(&company_entry)?;
             log_file.write_all(line.as_bytes()).await?;
             log_file.write_all(b"\n").await?;
-            log_file.flush().await?;
+            writes_since_fsync += 1;
 
-            // Critical: fsync to ensure durability before considering write successful
-            // This prevents data loss on power failure or kernel panic
-            log_file.sync_data().await?;
+            // Batched fsync for performance, plus time-based fsync for safety:
+            // fsync if the batch size is reached OR the time interval has elapsed
+            let should_fsync = writes_since_fsync >= FSYNC_BATCH_SIZE
+                || last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS;
 
-            // Update in-memory state ONLY after successful fsync
+            if should_fsync {
+                log_file.flush().await?;
+                // Critical: fsync to ensure durability before considering writes successful
+                // This prevents data loss on power failure or kernel panic
+                log_file.sync_data().await?;
+                writes_since_fsync = 0;
+                last_fsync = std::time::Instant::now();
+            }
+
+            // Update in-memory state ONLY after the write (fsync happens in batches);
+            // this is safe because we fsync before checkpoints and at the end of processing
             processed_names.insert(name.clone());
             existing_companies.insert(name.clone(), company_entry);
@@ -322,7 +350,15 @@
             // === ATOMIC CHECKPOINT: Periodically create checkpoint ===
             // This reduces recovery time by snapshotting current state
-            if updates_since_checkpoint >= checkpoint_interval {
+            if updates_since_checkpoint >= CHECKPOINT_INTERVAL {
+                // Ensure any pending writes are fsynced before the checkpoint
+                if writes_since_fsync > 0 {
+                    log_file.flush().await?;
+                    log_file.sync_data().await?;
+                    writes_since_fsync = 0;
+                    last_fsync = std::time::Instant::now();
+                }
+
                 logger::log_info(&format!("Creating checkpoint at {} companies...", count)).await;
 
                 let checkpoint_tmp = companies_path.with_extension("jsonl.tmp");
@@ -362,10 +398,30 @@
                 tokio::task::yield_now().await;
             }
         }
+
+        // Time-based fsync: even if this company didn't result in a write,
+        // fsync any pending writes once enough time has passed.
+        // This shrinks the data-loss window during long Yahoo lookup operations.
+        if writes_since_fsync > 0 && last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS {
+            log_file.flush().await?;
+            log_file.sync_data().await?;
+            writes_since_fsync = 0;
+            last_fsync = std::time::Instant::now();
+            logger::log_info("Time-based fsync completed").await;
+        }
     }
 
+    // === FSYNC PENDING WRITES: Even if a shutdown was requested, save what we have ===
+    if writes_since_fsync > 0 {
+        logger::log_info(&format!("Fsyncing {} pending writes...", writes_since_fsync)).await;
+        log_file.flush().await?;
+        log_file.sync_data().await?;
+        logger::log_info("✓ Pending writes saved").await;
+    }
+
     // === FINAL CHECKPOINT: Write complete final state ===
     // This ensures we don't need to replay the log on next startup
+    // (Pending writes were already fsynced above)
     if !shutdown_flag.load(Ordering::SeqCst) && updates_since_checkpoint > 0 {
         logger::log_info("Creating final checkpoint...").await;
 
diff --git a/src/corporate/update_parallel.rs b/src/corporate/update_parallel.rs
new file mode 100644
index 0000000..f6c340a
--- /dev/null
+++ b/src/corporate/update_parallel.rs
@@ -0,0 +1,522 @@
+// src/corporate/update_parallel.rs
+// PARALLELIZED VERSION of build_companies_jsonl_streaming
+//
+// Key improvements:
+// - Processes multiple companies concurrently using the ChromeDriverPool
+// - Maintains data safety by serializing log writes through a channel
+// - Respects pool size limits via the pool's semaphore
+// - All fsync and checkpoint logic is preserved
+
+use super::{types::*, yahoo::*};
+use crate::util::directories::DataPaths;
+use crate::util::logger;
+use crate::scraper::webdriver::ChromeDriverPool;
+
+use tokio::sync::mpsc;
+use tokio::io::AsyncWriteExt;
+use tokio::fs::OpenOptions;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use futures::stream::{FuturesUnordered, StreamExt};
+
+/// Represents a write command to be serialized through the log writer
+enum LogCommand {
+    Write(CompanyCrossPlatformInfo),
+    Checkpoint,
+    Shutdown,
+}
+
+/// Result from processing a single company
+struct CompanyProcessResult {
+    company: CompanyCrossPlatformInfo,
+    is_update: bool,
+}
+
+/// Abort-safe incremental JSONL persistence with atomic checkpoints (PARALLELIZED)
+///
+/// Implements the data_updating_rule.md specification with concurrent processing:
+/// - Append-only JSONL log for all updates
+/// - Batched fsync for performance (configurable batch size)
+/// - Time-based fsync for safety (at most 10 seconds without an fsync)
+/// - Atomic checkpoints via temp file + rename
+/// - Crash recovery by loading checkpoint + replaying log
+/// - Partial lines rejected by JSON parsing during recovery
+/// - PARALLEL processing of companies using the ChromeDriverPool
+/// - Serialized log writes for data safety
+///
+/// # Parallelization Strategy
+///
+/// - Multiple companies are processed concurrently (limited by pool size)
+/// - Each company's Yahoo lookups run in parallel with other companies'
+/// - Log writes are serialized through a channel
+/// - The pool's semaphore naturally limits concurrency
+/// - All fsync and checkpoint logic is preserved
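+///
+/// A minimal sketch of the single-writer pattern used below (simplified,
+/// error handling omitted):
+///
+/// ```ignore
+/// let (tx, mut rx) = tokio::sync::mpsc::channel::<LogCommand>(1000);
+/// let writer = tokio::spawn(async move {
+///     // Sole owner of the log file: appends stay whole-line and ordered
+///     while let Some(cmd) = rx.recv().await {
+///         match cmd {
+///             LogCommand::Write(company) => { /* append line, batched fsync */ }
+///             LogCommand::Checkpoint => { /* snapshot + atomic rename */ }
+///             LogCommand::Shutdown => break,
+///         }
+///     }
+/// });
+/// // Workers clone `tx` and send LogCommand::Write from concurrent tasks
+/// ```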
+pub async fn build_companies_jsonl_streaming_parallel(
+    paths: &DataPaths,
+    pool: &Arc<ChromeDriverPool>,
+    shutdown_flag: &Arc<AtomicBool>,
+) -> anyhow::Result<usize> {
+    // Configuration constants
+    const CHECKPOINT_INTERVAL: usize = 50;
+    const FSYNC_BATCH_SIZE: usize = 10;
+    const FSYNC_INTERVAL_SECS: u64 = 10;
+    const CONCURRENCY_LIMIT: usize = 100; // Max companies processing at once
+
+    let path = DataPaths::new(".")?;
+    let corporate_path = path.data_dir().join("corporate").join("by_name");
+    let securities_path = corporate_path.join("common_stocks.json");
+
+    if !securities_path.exists() {
+        logger::log_warn("No common_stocks.json found").await;
+        return Ok(0);
+    }
+
+    let content = tokio::fs::read_to_string(securities_path).await?;
+    let securities: HashMap<String, CompanyInfo> = serde_json::from_str(&content)?;
+
+    let companies_path = paths.data_dir().join("companies.jsonl");
+    let log_path = paths.data_dir().join("companies_updates.log");
+
+    if let Some(parent) = companies_path.parent() {
+        tokio::fs::create_dir_all(parent).await?;
+    }
+
+    // === RECOVERY PHASE: Load checkpoint + replay log ===
+    let mut existing_companies: HashMap<String, CompanyCrossPlatformInfo> = HashMap::new();
+    let mut processed_names: std::collections::HashSet<String> = std::collections::HashSet::new();
+
+    if companies_path.exists() {
+        logger::log_info("Loading checkpoint from companies.jsonl...").await;
+        let existing_content = tokio::fs::read_to_string(&companies_path).await?;
+
+        for line in existing_content.lines() {
+            if line.trim().is_empty() {
+                continue;
+            }
+
+            match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
+                Ok(company) => {
+                    processed_names.insert(company.name.clone());
+                    existing_companies.insert(company.name.clone(), company);
+                }
+                Err(e) => {
+                    logger::log_warn(&format!("Skipping invalid checkpoint line: {}", e)).await;
+                }
+            }
+        }
+        logger::log_info(&format!("Loaded checkpoint with {} companies", existing_companies.len())).await;
+    }
+
+    if log_path.exists() {
+        logger::log_info("Replaying update log...").await;
+        let log_content = tokio::fs::read_to_string(&log_path).await?;
+        let mut replayed = 0;
+
+        for line in log_content.lines() {
+            if line.trim().is_empty() {
+                continue;
+            }
+
+            match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
+                Ok(company) => {
+                    processed_names.insert(company.name.clone());
+                    existing_companies.insert(company.name.clone(), company);
+                    replayed += 1;
+                }
+                Err(e) => {
+                    logger::log_warn(&format!("Skipping invalid log line: {}", e)).await;
+                }
+            }
+        }
+        if replayed > 0 {
+            logger::log_info(&format!("Replayed {} updates from log", replayed)).await;
+        }
+    }
+
+    // === SETUP LOG WRITER TASK ===
+    // This task serializes all log writes to maintain data safety
+    let (write_tx, mut write_rx) = mpsc::channel::<LogCommand>(1000);
+
+    let log_file_init = OpenOptions::new()
+        .create(true)
+        .append(true)
+        .open(&log_path)
+        .await?;
+
+    let companies_path_clone = companies_path.clone();
+    let log_path_clone = log_path.clone();
+    let existing_companies_writer = Arc::new(tokio::sync::Mutex::new(existing_companies.clone()));
+
+    let writer_task = tokio::spawn(async move {
+        let mut log_file = log_file_init; // Move into the task
+        let mut writes_since_fsync = 0;
+        let mut last_fsync = std::time::Instant::now();
+        let mut updates_since_checkpoint = 0;
+        // Start from the recovered state so the reported totals stay accurate
+        let mut count = existing_companies_writer.lock().await.len();
+        let mut new_count = 0;
+        let mut updated_count = 0;
+
+        while let Some(cmd) = write_rx.recv().await {
+            match cmd {
+                LogCommand::Write(company) => {
+                    // Write to the log
+                    let line = serde_json::to_string(&company).unwrap();
+                    if let Err(e) = log_file.write_all(line.as_bytes()).await {
+                        logger::log_error(&format!("Failed to write to log: {}", e)).await;
+                        break;
+                    }
+                    if let Err(e) = log_file.write_all(b"\n").await {
+                        logger::log_error(&format!("Failed to write newline: {}", e)).await;
+                        break;
+                    }
+
+                    writes_since_fsync += 1;
+                    updates_since_checkpoint += 1;
+                    count += 1;
+
+                    // Update in-memory state
+                    let mut existing_companies = existing_companies_writer.lock().await;
+                    let is_update = existing_companies.contains_key(&company.name);
+                    existing_companies.insert(company.name.clone(), company);
+                    drop(existing_companies);
+
+                    if is_update {
+                        updated_count += 1;
+                    } else {
+                        new_count += 1;
+                    }
+
+                    // Batched + time-based fsync
+                    let should_fsync = writes_since_fsync >= FSYNC_BATCH_SIZE
+                        || last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS;
+
+                    if should_fsync {
+                        if let Err(e) = log_file.flush().await {
+                            logger::log_error(&format!("Failed to flush: {}", e)).await;
+                            break;
+                        }
+                        if let Err(e) = log_file.sync_data().await {
+                            logger::log_error(&format!("Failed to fsync: {}", e)).await;
+                            break;
+                        }
+                        writes_since_fsync = 0;
+                        last_fsync = std::time::Instant::now();
+                    }
+
+                    // Periodic checkpoint
+                    if updates_since_checkpoint >= CHECKPOINT_INTERVAL {
+                        // Fsync pending writes before the checkpoint
+                        if writes_since_fsync > 0 {
+                            let _ = log_file.flush().await;
+                            let _ = log_file.sync_data().await;
+                            writes_since_fsync = 0;
+                            last_fsync = std::time::Instant::now();
+                        }
+
+                        logger::log_info(&format!("Creating checkpoint at {} companies...", count)).await;
+
+                        let checkpoint_tmp = companies_path_clone.with_extension("jsonl.tmp");
+                        let mut checkpoint_file = match tokio::fs::File::create(&checkpoint_tmp).await {
+                            Ok(f) => f,
+                            Err(e) => {
+                                logger::log_error(&format!("Failed to create checkpoint: {}", e)).await;
+                                break;
+                            }
+                        };
+
+                        let existing_companies = existing_companies_writer.lock().await;
+                        for company in existing_companies.values() {
+                            let line = serde_json::to_string(company).unwrap();
+                            let _ = checkpoint_file.write_all(line.as_bytes()).await;
+                            let _ = checkpoint_file.write_all(b"\n").await;
+                        }
+                        drop(existing_companies);
+
+                        let _ = checkpoint_file.flush().await;
+                        let _ = checkpoint_file.sync_all().await;
+                        drop(checkpoint_file);
+
+                        let _ = tokio::fs::rename(&checkpoint_tmp, &companies_path_clone).await;
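+                        // The rename is atomic on POSIX filesystems (tmp and target
+                        // live in the same directory), so a crash leaves either the
+                        // old or the new checkpoint, never a partial file.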
+
+                        // Clear the log and reopen it
+                        drop(log_file);
+                        let _ = tokio::fs::remove_file(&log_path_clone).await;
+
+                        // Reopen the log file
+                        match OpenOptions::new()
+                            .create(true)
+                            .append(true)
+                            .open(&log_path_clone)
+                            .await
+                        {
+                            Ok(new_file) => {
+                                log_file = new_file;
+                                updates_since_checkpoint = 0;
+                                logger::log_info("✓ Checkpoint created and log cleared").await;
+                            }
+                            Err(e) => {
+                                logger::log_error(&format!("Failed to reopen log: {}", e)).await;
+                                break;
+                            }
+                        }
+                    }
+
+                    if count % 10 == 0 {
+                        logger::log_info(&format!("Progress: {} companies ({} new, {} updated)", count, new_count, updated_count)).await;
+                    }
+                },
+                LogCommand::Checkpoint => {
+                    // Forced checkpoint - this is the final checkpoint before shutdown
+                    if writes_since_fsync > 0 {
+                        let _ = log_file.flush().await;
+                        let _ = log_file.sync_data().await;
+                    }
+
+                    logger::log_info("Creating final checkpoint...").await;
+                    let checkpoint_tmp = companies_path_clone.with_extension("jsonl.tmp");
+                    if let Ok(mut checkpoint_file) = tokio::fs::File::create(&checkpoint_tmp).await {
+                        let existing_companies = existing_companies_writer.lock().await;
+                        for company in existing_companies.values() {
+                            let line = serde_json::to_string(company).unwrap();
+                            let _ = checkpoint_file.write_all(line.as_bytes()).await;
+                            let _ = checkpoint_file.write_all(b"\n").await;
+                        }
+                        drop(existing_companies);
+
+                        let _ = checkpoint_file.flush().await;
+                        let _ = checkpoint_file.sync_all().await;
+                        drop(checkpoint_file);
+                        let _ = tokio::fs::rename(&checkpoint_tmp, &companies_path_clone).await;
+
+                        // Clean up the log file after the final checkpoint
+                        drop(log_file);
+                        let _ = tokio::fs::remove_file(&log_path_clone).await;
+
+                        logger::log_info("✓ Final checkpoint created").await;
+                    }
+                    // After the final checkpoint, exit the loop
+                    break;
+                },
+                LogCommand::Shutdown => {
+                    // Fsync any pending writes before exit
+                    if writes_since_fsync > 0 {
+                        logger::log_info(&format!("Fsyncing {} pending writes...", writes_since_fsync)).await;
+                        let _ = log_file.flush().await;
+                        let _ = log_file.sync_data().await;
+                    }
+                    break;
+                }
+            }
+        }
+
+        (count, new_count, updated_count)
+    });
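+
+    // Design note: a single task owns the file handle, so appends can never
+    // interleave mid-line - concurrent workers only enqueue complete
+    // LogCommand::Write entries, which keeps the JSONL stream parseable.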
+
+    // === PARALLEL COMPANY PROCESSING ===
+    logger::log_info(&format!("Processing companies in parallel (max {} concurrent, pool size: {})",
+        CONCURRENCY_LIMIT, pool.get_number_of_instances())).await;
+
+    let pool = pool.clone();
+    let shutdown_flag = shutdown_flag.clone();
+
+    let mut processing_tasks = FuturesUnordered::new();
+    let mut pending_companies = Vec::new();
+
+    // Collect companies to process
+    for (name, company_info) in securities.iter() {
+        if processed_names.contains(name) {
+            continue;
+        }
+        pending_companies.push((name.clone(), company_info.clone()));
+    }
+
+    logger::log_info(&format!("Found {} companies to process", pending_companies.len())).await;
+
+    // Process companies in chunks to limit memory usage
+    let chunk_size = CONCURRENCY_LIMIT;
+    let mut processed = 0;
+
+    for chunk in pending_companies.chunks(chunk_size) {
+        if shutdown_flag.load(Ordering::SeqCst) {
+            break;
+        }
+
+        // Launch tasks for this chunk
+        for (name, company_info) in chunk {
+            let name = name.clone();
+            let company_info = company_info.clone();
+            let pool = pool.clone();
+            let shutdown_flag = shutdown_flag.clone();
+            let existing_entry = existing_companies.get(&name).cloned();
+
+            let task = tokio::spawn(async move {
+                process_single_company(
+                    name,
+                    company_info,
+                    existing_entry,
+                    &pool,
+                    &shutdown_flag
+                ).await
+            });
+
+            processing_tasks.push(task);
+        }
+
+        // Wait for the chunk to complete
+        while let Some(result) = processing_tasks.next().await {
+            match result {
+                Ok(Ok(Some(company_result))) => {
+                    // Send to the writer
+                    if write_tx.send(LogCommand::Write(company_result.company)).await.is_err() {
+                        logger::log_error("Writer task died, stopping processing").await;
+                        break;
+                    }
+                    processed += 1;
+                }
+                Ok(Ok(None)) => {
+                    // Company had no ISINs or was skipped
+                    processed += 1;
+                }
+                Ok(Err(e)) => {
+                    logger::log_warn(&format!("Company processing error: {}", e)).await;
+                    processed += 1;
+                }
+                Err(e) => {
+                    logger::log_error(&format!("Task panicked: {}", e)).await;
+                    processed += 1;
+                }
+            }
+
+            if shutdown_flag.load(Ordering::SeqCst) {
+                break;
+            }
+        }
+
+        if shutdown_flag.load(Ordering::SeqCst) {
+            break;
+        }
+    }
+
+    logger::log_info(&format!("Dispatched {} companies this run", processed)).await;
+
+    // Signal the writer to finish: request a final checkpoint on clean
+    // completion, or just fsync pending writes when shutting down early
+    let final_cmd = if shutdown_flag.load(Ordering::SeqCst) {
+        LogCommand::Shutdown
+    } else {
+        LogCommand::Checkpoint
+    };
+    let _ = write_tx.send(final_cmd).await;
+    drop(write_tx);
+
+    // Wait for the writer to finish
+    let (final_count, final_new, final_updated) = writer_task.await
+        .unwrap_or((0, 0, 0));
+
+    logger::log_info(&format!(
+        "Completed: {} total companies ({} new, {} updated)",
+        final_count, final_new, final_updated
+    )).await;
+
+    Ok(final_count)
+}
+
+/// Process a single company: fetch Yahoo data for its ISINs
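+///
+/// Illustrative call (values made up):
+///
+/// ```ignore
+/// let result = process_single_company(
+///     "Example AG".to_string(),
+///     company_info,   // parsed from common_stocks.json
+///     None,           // no existing entry -> treated as new
+///     &pool,
+///     &shutdown_flag,
+/// ).await?;
+/// // Ok(Some(_)) -> entry with at least one ISIN, ready for LogCommand::Write
+/// // Ok(None)    -> no ISINs collected, or a shutdown was requested
+/// ```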
+async fn process_single_company(
+    name: String,
+    company_info: CompanyInfo,
+    existing_entry: Option<CompanyCrossPlatformInfo>,
+    pool: &Arc<ChromeDriverPool>,
+    shutdown_flag: &Arc<AtomicBool>,
+) -> anyhow::Result<Option<CompanyProcessResult>> {
+    let is_update = existing_entry.is_some();
+
+    let mut isin_tickers_map: HashMap<String, Vec<String>> =
+        existing_entry
+            .as_ref()
+            .map(|e| e.isin_tickers_map.clone())
+            .unwrap_or_default();
+
+    let mut sector = existing_entry.as_ref().and_then(|e| e.sector.clone());
+    let mut exchange = existing_entry.as_ref().and_then(|e| e.exchange.clone());
+
+    // Collect unique ISIN-ticker pairs
+    let mut unique_isin_ticker_pairs: HashMap<String, Vec<String>> = HashMap::new();
+
+    for figi_infos in company_info.securities.values() {
+        for figi_info in figi_infos {
+            if !figi_info.isin.is_empty() {
+                let tickers = unique_isin_ticker_pairs
+                    .entry(figi_info.isin.clone())
+                    .or_insert_with(Vec::new);
+
+                if !figi_info.ticker.is_empty() && !tickers.contains(&figi_info.ticker) {
+                    tickers.push(figi_info.ticker.clone());
+                }
+            }
+        }
+    }
+
+    // Process each ISIN (these Yahoo lookups run in parallel across companies)
+    for (isin, figi_tickers) in unique_isin_ticker_pairs {
+        if shutdown_flag.load(Ordering::SeqCst) {
+            break;
+        }
+
+        let tickers = isin_tickers_map
+            .entry(isin.clone())
+            .or_insert_with(Vec::new);
+
+        for figi_ticker in figi_tickers {
+            if !tickers.contains(&figi_ticker) {
+                tickers.push(figi_ticker);
+            }
+        }
+
+        let has_yahoo_ticker = tickers.iter().any(|t| t.starts_with("YAHOO:"));
+
+        if !has_yahoo_ticker && !shutdown_flag.load(Ordering::SeqCst) {
+            logger::log_info(&format!("Fetching Yahoo details for {} (ISIN: {})", name, isin)).await;
+
+            match scrape_company_details_by_isin(pool, &isin).await {
+                Ok(Some(details)) => {
+                    logger::log_info(&format!("✓ Found Yahoo ticker {} for ISIN {}", details.ticker, isin)).await;
+
+                    tickers.push(format!("YAHOO:{}", details.ticker));
+
+                    if sector.is_none() && details.sector.is_some() {
+                        sector = details.sector.clone();
+                        logger::log_info(&format!("  Sector: {}", details.sector.as_ref().unwrap())).await;
+                    }
+
+                    if exchange.is_none() && details.exchange.is_some() {
+                        exchange = details.exchange.clone();
+                        logger::log_info(&format!("  Exchange: {}", details.exchange.as_ref().unwrap())).await;
+                    }
+                },
+                Ok(None) => {
+                    logger::log_warn(&format!("◯ No search results for ISIN {}", isin)).await;
+                    tickers.push("YAHOO:NO_RESULTS".to_string());
+                },
+                Err(e) => {
+                    if shutdown_flag.load(Ordering::SeqCst) {
+                        break;
+                    }
+                    logger::log_warn(&format!("✗ Yahoo lookup error for ISIN {}: {}", isin, e)).await;
+                }
+            }
+        }
+    }
+
+    if shutdown_flag.load(Ordering::SeqCst) {
+        return Ok(None);
+    }
+
+    if !isin_tickers_map.is_empty() {
+        let company_entry = CompanyCrossPlatformInfo {
+            name: name.clone(),
+            isin_tickers_map,
+            sector,
+            exchange,
+        };
+
+        Ok(Some(CompanyProcessResult {
+            company: company_entry,
+            is_update,
+        }))
+    } else {
+        Ok(None)
+    }
+}
\ No newline at end of file
diff --git a/src/corporate/yahoo.rs b/src/corporate/yahoo.rs
index 19648ec..f34c0b2 100644
--- a/src/corporate/yahoo.rs
+++ b/src/corporate/yahoo.rs
@@ -4,7 +4,7 @@
 use crate::{scraper::webdriver::*, util::{directories::DataPaths}};
 use event_backtest_engine::logger;
 use fantoccini::{Client, Locator};
 use serde::{Deserialize, Serialize};
-use tokio::{time::{Duration as TokioDuration, sleep}};
+use tokio::time::{Duration as TokioDuration, sleep, timeout};
 use std::{sync::Arc};
 use anyhow::{anyhow, Result};
@@ -21,6 +21,16 @@ pub enum YahooTickerResult {
     AmbiguousResults,
 }
 
+#[derive(Debug, Deserialize)]
+pub struct ExtractionMetadata {
+    #[serde(rename = "selectedRowIndex")]
+    pub selected_row_index: usize,
+    #[serde(rename = "validFieldCount")]
+    pub valid_field_count: usize,
+    #[serde(rename = "totalRows")]
+    pub total_rows: usize,
+}
+
 #[derive(Debug, Deserialize)]
 pub struct ExtractionResult {
     status: String,
@@ -29,6 +39,8 @@ pub struct ExtractionResult {
     exchange: Option<String>,
     #[serde(default)]
     error_message: Option<String>,
+    #[serde(default)]
+    metadata: Option<ExtractionMetadata>,
 }
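+
+// Example payload produced by yahoo_company_extraction.js on success
+// (values illustrative):
+//
+// {"status":"found","ticker":"EXA.DE","sector":"Technology","exchange":"XETRA",
+//  "metadata":{"selectedRowIndex":0,"validFieldCount":3,"totalRows":4}}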
 
 impl YahooTickerResult {
@@ -73,28 +85,99 @@
 pub async fn extract_company_details(
     client: &Client,
     _isin: &str,
 ) -> Result<Option<YahooCompanyDetails>> {
+    // Wait for the page to load - look for either the results table or the no-data element
+    let wait_result: Result<Result<bool>> = timeout(
+        TokioDuration::from_secs(30),
+        async {
+            for _ in 0..60 {
+                let has_content: bool = client
+                    .execute(
+                        r#"
+                        const table = document.querySelector('#main-content-wrapper > section > section.container.yf-1omxedn > div.tableContainer.yf-1omxedn > div > table');
+                        const noData = document.querySelector('#main-content-wrapper > section > div.noData.yf-1omxedn');
+                        return !!(table || noData);
+                        "#,
+                        vec![],
+                    )
+                    .await
+                    .map_err(|e| anyhow!("Execute error: {}", e))?
+                    .as_bool()
+                    .unwrap_or(false);
+
+                if has_content {
+                    return Ok(true);
+                }
+
+                sleep(TokioDuration::from_millis(500)).await;
+            }
+            Ok(false)
+        },
+    )
+    .await
+    .map_err(|_| anyhow!("Timeout waiting for Yahoo Finance page to load"));
+
+    match wait_result {
+        Err(_) => {
+            return Err(anyhow!("Timeout waiting for Yahoo Finance page to load"));
+        },
+        Ok(Err(e)) => {
+            return Err(anyhow!("Error checking page content: {}", e));
+        },
+        Ok(Ok(false)) => {
+            logger::log_warn("Page content not found after waiting, attempting extraction anyway").await;
+        },
+        Ok(Ok(true)) => {
+            logger::log_info("Page content detected, proceeding with extraction").await;
+        }
+    }
+
     // Execute the JavaScript extraction script
     let result = client.execute(YAHOO_COMPANY_EXTRACTION_JS, vec![]).await?;
 
+    // Log the raw result for debugging
+    logger::log_info(&format!("JavaScript extraction raw result: {:?}", result)).await;
+
+    // Check whether the result is null
+    if result.is_null() {
+        return Err(anyhow!("JavaScript returned null - page may not be fully loaded or script failed"));
+    }
+
     // Parse the JSON result
-    let extraction: ExtractionResult = serde_json::from_value(result)
-        .map_err(|e| anyhow!("Failed to parse extraction result: {}", e))?;
+    let extraction: ExtractionResult = serde_json::from_value(result.clone())
+        .map_err(|e| {
+            // Log the problematic result value for debugging
+            let result_str = serde_json::to_string_pretty(&result).unwrap_or_else(|_| format!("{:?}", result));
+            anyhow!("Failed to parse extraction result: {}. Raw result: {}", e, result_str)
+        })?;
 
     match extraction.status.as_str() {
         "found" => {
+            // The ticker is guaranteed to be present when status is "found";
+            // sector and exchange are optional
             if let Some(ticker) = extraction.ticker {
+                // Log metadata if available
+                if let Some(ref metadata) = extraction.metadata {
+                    logger::log_info(&format!(
+                        "Selected row {} with {} valid fields out of {} total rows",
+                        metadata.selected_row_index,
+                        metadata.valid_field_count,
+                        metadata.total_rows
+                    )).await;
+                }
+
                 Ok(Some(YahooCompanyDetails {
                     ticker,
                     sector: extraction.sector,
                     exchange: extraction.exchange,
                 }))
             } else {
-                Ok(None)
+                // This should not happen if the JS script is working correctly
+                Err(anyhow!("Status 'found' but no ticker present"))
             }
         },
         "no_results" => Ok(None),
-        "not_found" => Ok(None),
         "error" => {
+            // Error status means the ticker was not found or extraction failed
             let error_msg = extraction.error_message.unwrap_or_else(|| "Unknown error".to_string());
             Err(anyhow!("JavaScript extraction error: {}", error_msg))
         },
diff --git a/src/corporate/yahoo_company_extraction.js b/src/corporate/yahoo_company_extraction.js
index af0a12e..082ce9f 100644
--- a/src/corporate/yahoo_company_extraction.js
+++ b/src/corporate/yahoo_company_extraction.js
@@ -1,61 +1,137 @@
 // yahoo_company_extraction.js
 // JavaScript extraction script for Yahoo Finance company details
 // Used to extract ticker, sector, and exchange from Yahoo Finance search results
+// Only the ticker is mandatory - sector and exchange are optional fields
 
-(function() {
+// Example selectors:
+// with results:
+// document.querySelector("#main-content-wrapper > section > section.container.yf-1omxedn > div.tableContainer.yf-1omxedn > div > table")
+// document.querySelector("#\\30 > td:nth-child(1) > span > div > a")
+// document.querySelector("#\\30 > td:nth-child(2) > span > div")
+// document.querySelector("#\\30 > td:nth-child(3) > span > div")
+// document.querySelector("#\\30 > td:nth-child(4) > span > div > a")
+// document.querySelector("#\\30 > td:nth-child(5) > span > div")
+// document.querySelector("#\\30 > td:nth-child(6) > span > div")
+// row with no result:
+// document.querySelector("#\\32 > td:nth-child(4) > span > p")
+// no results:
+// document.querySelector("#main-content-wrapper > section > div.noData.yf-1omxedn")
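+//
+// Note: "\30" is the CSS escape for the id "0" (an id selector must not start
+// with a raw digit), so #\\30, #\\32, ... address the result rows whose ids
+// are "0", "2", and so on.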
+
+// Using a wrapper to ensure the result is properly captured
+var extractionResult = (function() {
     try {
-        // Check for "No results found" message
-        const noDataElement = document.querySelector('.noData');
+        // Check for the "No results found" message using the exact selector
+        const noDataElement = document.querySelector('#main-content-wrapper > section > div.noData.yf-1omxedn');
         if (noDataElement) {
             return { status: 'no_results', ticker: null, sector: null, exchange: null };
         }
 
-        // Find the results table
-        const table = document.querySelector('table.markets-table');
+        // Find the results table using the exact selector
+        const table = document.querySelector('#main-content-wrapper > section > section.container.yf-1omxedn > div.tableContainer.yf-1omxedn > div > table');
         if (!table) {
             return { status: 'no_results', ticker: null, sector: null, exchange: null };
         }
 
-        // Find the first row in tbody
-        const firstRow = table.querySelector('tbody tr');
-        if (!firstRow) {
+        // Find all rows in tbody
+        const allRows = table.querySelectorAll('tbody tr');
+        if (!allRows || allRows.length === 0) {
             return { status: 'no_results', ticker: null, sector: null, exchange: null };
         }
 
-        // Extract ticker from first column (td:nth-child(1))
-        const tickerCell = firstRow.querySelector('td:nth-child(1)');
-        const ticker = tickerCell ? tickerCell.textContent.trim() : '';
-
-        if (!ticker) {
-            return { status: 'not_found', ticker: null, sector: null, exchange: null };
+        // Helper function to safely extract text content
+        function extractText(element) {
+            if (!element) return '';
+            const text = element.textContent.trim();
+            return text;
         }
 
-        // Extract sector from column 4 (td:nth-child(4) > span > div > a)
-        const sectorCell = firstRow.querySelector('td:nth-child(4) span div a');
-        let sector = sectorCell ? sectorCell.textContent.trim() : '';
-
-        // Normalize empty/invalid values to null
-        if (!sector || sector === '-' || sector === 'N/A') {
-            sector = null;
+        // Helper function to check whether a value is valid (not empty, not -, not N/A)
+        function isValidValue(value) {
+            if (!value) return false;
+            const normalized = value.trim().toLowerCase();
+            return normalized !== '' && normalized !== '-' && normalized !== 'n/a';
         }
 
-        // Extract exchange from column 6 (td:nth-child(6) > span)
-        const exchangeCell = firstRow.querySelector('td:nth-child(6) span');
-        let exchange = exchangeCell ? exchangeCell.textContent.trim() : '';
-
-        // Normalize empty/invalid values to null
-        if (!exchange || exchange === '-' || exchange === 'N/A') {
-            exchange = null;
+        // Helper function to extract and normalize data from a row
+        function extractRowData(row) {
+            // Extract ticker from column 1 (td:nth-child(1) > span > div > a)
+            const tickerElement = row.querySelector('td:nth-child(1) > span > div > a') ||
+                                  row.querySelector('td:nth-child(1)');
+            const tickerRaw = extractText(tickerElement);
+            const ticker = isValidValue(tickerRaw) ? tickerRaw : null;
+
+            // Extract sector from column 4 (td:nth-child(4) > span > div > a or td:nth-child(4) > span > div)
+            const sectorElement = row.querySelector('td:nth-child(4) > span > div > a') ||
+                                  row.querySelector('td:nth-child(4) > span > div') ||
+                                  row.querySelector('td:nth-child(4)');
+            const sectorRaw = extractText(sectorElement);
+            const sector = isValidValue(sectorRaw) ? sectorRaw : null;
+
+            // Extract exchange from column 6 (td:nth-child(6) > span > div)
+            const exchangeElement = row.querySelector('td:nth-child(6) > span > div') ||
+                                    row.querySelector('td:nth-child(6)');
+            const exchangeRaw = extractText(exchangeElement);
+            const exchange = isValidValue(exchangeRaw) ? exchangeRaw : null;
+
+            return { ticker, sector, exchange };
         }
 
+        // Helper function to count non-null fields (data completeness counter)
+        function countValidFields(data) {
+            let count = 0;
+            if (data.ticker) count++;
+            if (data.sector) count++;
+            if (data.exchange) count++;
+            return count;
+        }
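+
+        // Example: for per-row field counts [1, 3, 2], the second row (index 1)
+        // wins - it has the most complete data; ties keep the earlier row.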
+
+        // Extract data from all rows and pick the one with the most complete data
+        let bestRow = null;
+        let maxFieldCount = -1;
+        let rowIndex = 0;
+
+        for (const row of allRows) {
+            const data = extractRowData(row);
+            const fieldCount = countValidFields(data);
+
+            // Select the row with the most valid data, or the first row if tied
+            if (fieldCount > maxFieldCount) {
+                bestRow = data;
+                maxFieldCount = fieldCount;
+                bestRow.rowIndex = rowIndex;
+                bestRow.validFieldCount = fieldCount;
+            }
+
+            rowIndex++;
+        }
+
+        // The ticker is mandatory - return an error status if none was found
+        if (!bestRow || !bestRow.ticker) {
+            return {
+                status: 'error',
+                error_message: 'No ticker found in any row',
+                ticker: null,
+                sector: null,
+                exchange: null
+            };
+        }
+
+        // Return success with the ticker (mandatory) and optional sector/exchange.
+        // Include metadata about the selected row and its number of valid fields.
         return {
             status: 'found',
-            ticker: ticker,
-            sector: sector,
-            exchange: exchange
+            ticker: bestRow.ticker,
+            sector: bestRow.sector,
+            exchange: bestRow.exchange,
+            metadata: {
+                selectedRowIndex: bestRow.rowIndex,
+                validFieldCount: bestRow.validFieldCount,
+                totalRows: allRows.length
+            }
         };
     } catch (error) {
+        // Only catch unexpected errors during extraction
         return {
             status: 'error',
             error_message: error.toString(),
@@ -64,4 +140,7 @@
             exchange: null
         };
     }
-})();
\ No newline at end of file
+})();
+
+// Return the result explicitly
+return extractionResult;
\ No newline at end of file
diff --git a/src/main.rs b/src/main.rs
index 7711c78..aeb49b2 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -14,11 +14,27 @@ use util::directories::DataPaths;
 use util::{logger, opnv};
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
+use std::process::Command;
 
 #[tokio::main]
 async fn main() -> Result<()> {
+    // Start the Docker engine before any proxy-container work
+    let output = if cfg!(target_os = "windows") {
+        Command::new("cmd")
+            .args(["/C", "docker desktop start"])
+            .output()
+            .expect("failed to execute process")
+    } else {
+        // Placeholder on non-Windows hosts; replace with the platform's startup command
+        Command::new("sh")
+            .arg("-c")
+            .arg("echo hello")
+            .output()
+            .expect("failed to execute process")
+    };
+    let _start_docker_desktop = output.stdout;
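+
+    // "docker desktop start" returns before the engine is actually ready; a
+    // readiness poll would be a possible follow-up (illustrative sketch,
+    // assuming the docker CLI is on PATH):
+    //   while !Command::new("docker").args(["info"]).output()
+    //       .map(|o| o.status.success()).unwrap_or(false) {
+    //       std::thread::sleep(std::time::Duration::from_secs(1));
+    //   }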
+
+    cleanup_all_proxy_containers().await.ok();
+
     let config = Config::load().map_err(|err| {
         eprintln!("Failed to load config: {}", err);
         err
@@ -40,7 +56,7 @@
     // === Step 1: Fetch VPNBook configs ===
     let proxy_pool: Option<Arc<ChromeDriverPool>> = if config.enable_vpn_rotation {
         logger::log_info("VPN Rotation Enabled – Fetching latest VPNBook configs").await;
-        let temp_pool = Arc::new(ChromeDriverPool::new_with_proxy_and_task_limit(config.max_parallel_instances, None, config.max_tasks_per_instance).await?);
+        let temp_pool = Arc::new(ChromeDriverPool::new_with_proxy_and_task_limit(config.max_parallel_instances, None, 1).await?);
         let (username, password, _files) = opnv::fetch_vpnbook_configs(&temp_pool, paths.cache_dir()).await?;
 
         logger::log_info(&format!("VPNBook credentials → User: {}", username)).await;
diff --git a/src/scraper/webdriver.rs b/src/scraper/webdriver.rs
index 5180aae..0c1093e 100644
--- a/src/scraper/webdriver.rs
+++ b/src/scraper/webdriver.rs
@@ -2,6 +2,9 @@
 
 use anyhow::{anyhow, Context, Result};
 use fantoccini::{Client, ClientBuilder};
+use rand::seq::IndexedRandom; // `choose` on slices lives here in rand 0.9+
+use rand::rngs::ThreadRng;
 use serde_json::{Map, Value};
 use std::pin::Pin;
 use std::process::Stdio;
@@ -363,6 +366,7 @@ impl ChromeInstance {
     }
 
     fn chrome_args(&self) -> Map<String, Value> {
+        let user_agent = Self::chrome_user_agent();
         let mut args = vec![
             "--headless=new".to_string(),
             "--disable-gpu".to_string(),
@@ -372,14 +376,14 @@
             "--disable-extensions".to_string(),
             "--disable-popup-blocking".to_string(),
             "--disable-notifications".to_string(),
-            "--disable-logging".to_string(),
+            //"--disable-logging".to_string(),
             "--disable-autofill".to_string(),
             "--disable-sync".to_string(),
             "--disable-default-apps".to_string(),
             "--disable-translate".to_string(),
-            "--window-size=1920,1080".to_string(),
+            //"--window-size=1920,1080".to_string(),
             "--disable-blink-features=AutomationControlled".to_string(),
-            "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36".to_string()
+            format!("--user-agent={}", user_agent),
         ];
         if let Some(ref proxy) = self.proxy_url {
             let proxy = proxy.clone();
@@ -397,6 +401,18 @@
         });
         caps.as_object().cloned().unwrap()
     }
+
+    /// Picks a random current Chrome user agent so instances don't all share
+    /// one easily fingerprinted UA string
+    pub fn chrome_user_agent() -> &'static str {
+        static UAS: &[&str] = &[
+            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.6367.91 Safari/537.36",
+            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.6312.122 Safari/537.36",
+            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.6261.129 Safari/537.36",
+        ];
+
+        let mut rng = ThreadRng::default(); // non-deprecated thread-local RNG
+        *UAS.choose(&mut rng).unwrap()
+    }
 }
 
 fn parse_chromedriver_address(line: &str) -> Option<String> {