From c01b47000f48b08d83041e2fd8302864420b661a Mon Sep 17 00:00:00 2001
From: donpat1to
Date: Fri, 19 Dec 2025 16:58:22 +0100
Subject: [PATCH] removed serial data scraping for Yahoo tickers

---
 src/corporate/update.rs          | 417 -------------------------------
 src/corporate/update_parallel.rs |   3 +-
 2 files changed, 1 insertion(+), 419 deletions(-)

diff --git a/src/corporate/update.rs b/src/corporate/update.rs
index 0c23ddf..892e477 100644
--- a/src/corporate/update.rs
+++ b/src/corporate/update.rs
@@ -93,423 +93,6 @@ pub async fn run_full_update(
     Ok(())
 }
 
-/// UPDATED: Serial version with validation (kept for compatibility/debugging)
-///
-/// This is the non-parallel version that processes companies sequentially.
-/// Updated with same validation and shutdown checks as parallel version.
-///
-/// Use this for:
-/// - Debugging issues with specific companies
-/// - Environments where parallel processing isn't desired
-/// - Testing validation logic without concurrency complexity
-async fn build_companies_jsonl_streaming_serial(
-    paths: &DataPaths,
-    pool: &Arc<ChromeDriverPool>,
-    shutdown_flag: &Arc<AtomicBool>,
-) -> anyhow::Result<usize> {
-    // Configuration constants
-    const CHECKPOINT_INTERVAL: usize = 50;
-    const FSYNC_BATCH_SIZE: usize = 10;
-    const FSYNC_INTERVAL_SECS: u64 = 10;
-
-    let path = DataPaths::new(".")?;
-    let corporate_path = path.data_dir().join("corporate").join("by_name");
-    let securities_path = corporate_path.join("common_stocks.json");
-
-    if !securities_path.exists() {
-        logger::log_warn("No common_stocks.json found").await;
-        return Ok(0);
-    }
-
-    let content = tokio::fs::read_to_string(securities_path).await?;
-    let securities: HashMap<String, CompanyInfo> = serde_json::from_str(&content)?;
-
-    let companies_path = paths.data_dir().join("companies.jsonl");
-    let log_path = paths.data_dir().join("companies_updates.log");
-
-    if let Some(parent) = companies_path.parent() {
-        tokio::fs::create_dir_all(parent).await?;
-    }
-
-    // === RECOVERY PHASE: Load checkpoint + replay log ===
-    let mut existing_companies: HashMap<String, CompanyCrossPlatformInfo> = HashMap::new();
-    let mut processed_names: std::collections::HashSet<String> = std::collections::HashSet::new();
-
-    if companies_path.exists() {
-        logger::log_info("Loading checkpoint from companies.jsonl...").await;
-        let existing_content = tokio::fs::read_to_string(&companies_path).await?;
-
-        for line in existing_content.lines() {
-            if line.trim().is_empty() {
-                continue;
-            }
-
-            match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
-                Ok(company) => {
-                    processed_names.insert(company.name.clone());
-                    existing_companies.insert(company.name.clone(), company);
-                }
-                Err(e) => {
-                    logger::log_warn(&format!("Skipping invalid checkpoint line: {}", e)).await;
-                }
-            }
-        }
-        logger::log_info(&format!("Loaded checkpoint with {} companies", existing_companies.len())).await;
-    }
-
-    if log_path.exists() {
-        logger::log_info("Replaying update log...").await;
-        let log_content = tokio::fs::read_to_string(&log_path).await?;
-        let mut replayed = 0;
-
-        for line in log_content.lines() {
-            if line.trim().is_empty() {
-                continue;
-            }
-
-            match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
-                Ok(company) => {
-                    processed_names.insert(company.name.clone());
-                    existing_companies.insert(company.name.clone(), company);
-                    replayed += 1;
-                }
-                Err(e) => {
-                    logger::log_warn(&format!("Skipping invalid log line: {}", e)).await;
-                }
-            }
-        }
-        if replayed > 0 {
-            logger::log_info(&format!("Replayed {} updates from log", replayed)).await;
-        }
-    }
-
-    // === OPEN LOG FILE ===
-    use tokio::fs::OpenOptions;
-    use tokio::io::AsyncWriteExt;
-
-    let mut log_file = OpenOptions::new()
-        .create(true)
-        .append(true)
-        .open(&log_path)
-        .await?;
-
-    let mut writes_since_fsync = 0;
-    let mut last_fsync = std::time::Instant::now();
-    let mut updates_since_checkpoint = 0;
-    let mut count = 0;
-    let mut new_count = 0;
-    let mut updated_count = 0;
-
-    logger::log_info(&format!("Processing {} companies sequentially...", securities.len())).await;
-
-    // === PROCESS COMPANIES SEQUENTIALLY ===
-    for (name, company_info) in securities.clone() {
-        // Check shutdown before each company
-        if shutdown_flag.load(Ordering::SeqCst) {
-            logger::log_warn(&format!(
-                "Shutdown detected at company: {} (progress: {}/{})",
-                name, count, count + securities.len()
-            )).await;
-            break;
-        }
-
-        let existing_entry = existing_companies.get(&name).cloned();
-        let is_update = existing_entry.is_some();
-
-        // Process company with validation
-        match process_single_company_serial(
-            name.clone(),
-            company_info,
-            existing_entry,
-            pool,
-            shutdown_flag,
-        ).await {
-            Ok(Some(company_entry)) => {
-                // Write to log
-                let line = serde_json::to_string(&company_entry)?;
-                log_file.write_all(line.as_bytes()).await?;
-                log_file.write_all(b"\n").await?;
-
-                writes_since_fsync += 1;
-
-                // Batched + time-based fsync
-                let should_fsync = writes_since_fsync >= FSYNC_BATCH_SIZE
-                    || last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS;
-
-                if should_fsync {
-                    log_file.flush().await?;
-                    log_file.sync_data().await?;
-                    writes_since_fsync = 0;
-                    last_fsync = std::time::Instant::now();
-                }
-
-                // Update in-memory state
-                processed_names.insert(name.clone());
-                existing_companies.insert(name.clone(), company_entry);
-
-                count += 1;
-                updates_since_checkpoint += 1;
-
-                if is_update {
-                    updated_count += 1;
-                } else {
-                    new_count += 1;
-                }
-
-                // Periodic checkpoint
-                if updates_since_checkpoint >= CHECKPOINT_INTERVAL {
-                    if writes_since_fsync > 0 {
-                        log_file.flush().await?;
-                        log_file.sync_data().await?;
-                        writes_since_fsync = 0;
-                        last_fsync = std::time::Instant::now();
-                    }
-
-                    logger::log_info(&format!("Creating checkpoint at {} companies...", count)).await;
-
-                    let checkpoint_tmp = companies_path.with_extension("jsonl.tmp");
-                    let mut checkpoint_file = tokio::fs::File::create(&checkpoint_tmp).await?;
-
-                    for company in existing_companies.values() {
-                        let line = serde_json::to_string(company)?;
-                        checkpoint_file.write_all(line.as_bytes()).await?;
-                        checkpoint_file.write_all(b"\n").await?;
-                    }
-
-                    checkpoint_file.flush().await?;
-                    checkpoint_file.sync_all().await?;
-                    drop(checkpoint_file);
-
-                    tokio::fs::rename(&checkpoint_tmp, &companies_path).await?;
-
-                    drop(log_file);
-                    tokio::fs::remove_file(&log_path).await.ok();
-                    log_file = OpenOptions::new()
-                        .create(true)
-                        .append(true)
-                        .open(&log_path)
-                        .await?;
-
-                    updates_since_checkpoint = 0;
-                    logger::log_info("✓ Checkpoint created and log cleared").await;
-                }
-
-                if count % 10 == 0 {
-                    logger::log_info(&format!(
-                        "Progress: {} companies ({} new, {} updated)",
-                        count, new_count, updated_count
-                    )).await;
-                }
-            }
-            Ok(None) => {
-                // Company had no ISINs or was skipped
-                logger::log_info(&format!("Skipped company: {} (no ISINs)", name)).await;
-            }
-            Err(e) => {
-                logger::log_warn(&format!("Error processing company {}: {}", name, e)).await;
-            }
-        }
-
-        // Time-based fsync
-        if writes_since_fsync > 0 && last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS {
-            log_file.flush().await?;
-            log_file.sync_data().await?;
-            writes_since_fsync = 0;
-            last_fsync = std::time::Instant::now();
-        }
-    }
-
-    // === FSYNC PENDING WRITES ===
-    if writes_since_fsync > 0 {
-        logger::log_info(&format!("Fsyncing {} pending writes...", writes_since_fsync)).await;
-        log_file.flush().await?;
-        log_file.sync_data().await?;
-        logger::log_info("✓ Pending writes saved").await;
-    }
-
-    // === FINAL CHECKPOINT ===
-    if !shutdown_flag.load(Ordering::SeqCst) && updates_since_checkpoint > 0 {
-        logger::log_info("Creating final checkpoint...").await;
-
-        let checkpoint_tmp = companies_path.with_extension("jsonl.tmp");
-        let mut checkpoint_file = tokio::fs::File::create(&checkpoint_tmp).await?;
-
-        for company in existing_companies.values() {
-            let line = serde_json::to_string(company)?;
-            checkpoint_file.write_all(line.as_bytes()).await?;
-            checkpoint_file.write_all(b"\n").await?;
-        }
-
-        checkpoint_file.flush().await?;
-        checkpoint_file.sync_all().await?;
-        drop(checkpoint_file);
-
-        tokio::fs::rename(&checkpoint_tmp, &companies_path).await?;
-
-        drop(log_file);
-        tokio::fs::remove_file(&log_path).await.ok();
-
-        logger::log_info("✓ Final checkpoint created").await;
-    }
-
-    logger::log_info(&format!(
-        "Completed: {} total companies ({} new, {} updated)",
-        count, new_count, updated_count
-    )).await;
-
-    Ok(count)
-}
-
-/// UPDATED: Process single company serially with validation
-async fn process_single_company_serial(
-    name: String,
-    company_info: CompanyInfo,
-    existing_entry: Option<CompanyCrossPlatformInfo>,
-    pool: &Arc<ChromeDriverPool>,
-    shutdown_flag: &Arc<AtomicBool>,
-) -> anyhow::Result<Option<CompanyCrossPlatformInfo>> {
-    // Check shutdown at start
-    if shutdown_flag.load(Ordering::SeqCst) {
-        return Ok(None);
-    }
-
-    let mut isin_tickers_map: HashMap<String, Vec<String>> =
-        existing_entry
-            .as_ref()
-            .map(|e| e.isin_tickers_map.clone())
-            .unwrap_or_default();
-
-    let mut sector = existing_entry.as_ref().and_then(|e| e.sector.clone());
-    let mut exchange = existing_entry.as_ref().and_then(|e| e.exchange.clone());
-
-    // Collect unique ISIN-ticker pairs
-    let mut unique_isin_ticker_pairs: HashMap<String, Vec<String>> = HashMap::new();
-
-    for figi_infos in company_info.securities.values() {
-        for figi_info in figi_infos {
-            if !figi_info.isin.is_empty() {
-                let tickers = unique_isin_ticker_pairs
-                    .entry(figi_info.isin.clone())
-                    .or_insert_with(Vec::new);
-
-                if !figi_info.ticker.is_empty() && !tickers.contains(&figi_info.ticker) {
-                    tickers.push(figi_info.ticker.clone());
-                }
-            }
-        }
-    }
-
-    // Process each ISIN with validation
-    for (isin, figi_tickers) in unique_isin_ticker_pairs {
-        // Check shutdown before each ISIN
-        if shutdown_flag.load(Ordering::SeqCst) {
-            return Ok(None);
-        }
-
-        let tickers = isin_tickers_map
-            .entry(isin.clone())
-            .or_insert_with(Vec::new);
-
-        for figi_ticker in figi_tickers {
-            if !tickers.contains(&figi_ticker) {
-                tickers.push(figi_ticker);
-            }
-        }
-
-        let has_yahoo_ticker = tickers.iter().any(|t| t.starts_with("YAHOO:"));
-
-        if !has_yahoo_ticker {
-            logger::log_info(&format!("Fetching Yahoo details for {} (ISIN: {})", name, isin)).await;
-
-            // Use validated scraping with retry
-            match scrape_with_retry_serial(pool, &isin, 3, shutdown_flag).await {
-                Ok(Some(details)) => {
-                    logger::log_info(&format!(
-                        "✓ Found Yahoo ticker {} for ISIN {} (company: {})",
-                        details.ticker, isin, name
-                    )).await;
-
-                    tickers.push(format!("YAHOO:{}", details.ticker));
-
-                    if sector.is_none() && details.sector.is_some() {
-                        sector = details.sector.clone();
-                    }
-
-                    if exchange.is_none() && details.exchange.is_some() {
-                        exchange = details.exchange.clone();
-                    }
-                },
-                Ok(None) => {
-                    logger::log_warn(&format!("◯ No search results for ISIN {} (company: {})", isin, name)).await;
-                    tickers.push("YAHOO:NO_RESULTS".to_string());
-                },
-                Err(e) => {
-                    if shutdown_flag.load(Ordering::SeqCst) {
-                        return Ok(None);
-                    }
-                    logger::log_warn(&format!(
-                        "✗ Yahoo lookup error for ISIN {} (company: {}): {}",
-                        isin, name, e
-                    )).await;
-                }
-            }
-        }
-    }
-
-    // Final shutdown check
-    if shutdown_flag.load(Ordering::SeqCst) {
-        return Ok(None);
-    }
-
-    if !isin_tickers_map.is_empty() {
-        Ok(Some(CompanyCrossPlatformInfo {
-            name,
-            isin_tickers_map,
-            sector,
-            exchange,
-        }))
-    } else {
-        Ok(None)
-    }
-}
-
-/// UPDATED: Scrape with retry for serial processing
-async fn scrape_with_retry_serial(
-    pool: &Arc<ChromeDriverPool>,
-    isin: &str,
-    max_retries: u32,
-    shutdown_flag: &Arc<AtomicBool>,
-) -> anyhow::Result> {
-    let mut retries = 0;
-
-    loop {
-        if shutdown_flag.load(Ordering::SeqCst) {
-            return Err(anyhow::anyhow!("Aborted due to shutdown"));
-        }
-
-        match scrape_company_details_by_isin(pool, isin, shutdown_flag).await {
-            Ok(result) => return Ok(result),
-            Err(e) => {
-                if retries >= max_retries {
-                    return Err(e);
-                }
-
-                let backoff_ms = 1000 * 2u64.pow(retries);
-                let jitter_ms = random_range(0, 500);
-                let total_delay = backoff_ms + jitter_ms;
-
-                logger::log_warn(&format!(
-                    "Retry {}/{} for ISIN {} after {}ms: {}",
-                    retries + 1, max_retries, isin, total_delay, e
-                )).await;
-
-                tokio::time::sleep(tokio::time::Duration::from_millis(total_delay)).await;
-                retries += 1;
-            }
-        }
-    }
-}
-
 async fn find_most_recent_figi_date_dir(paths: &DataPaths) -> anyhow::Result<Option<PathBuf>> {
     let map_cache_dir = paths.cache_gleif_openfigi_map_dir();

diff --git a/src/corporate/update_parallel.rs b/src/corporate/update_parallel.rs
index 4dfbaea..dad0467 100644
--- a/src/corporate/update_parallel.rs
+++ b/src/corporate/update_parallel.rs
@@ -12,7 +12,6 @@ use crate::util::directories::DataPaths;
 use crate::util::logger;
 use crate::scraper::webdriver::ChromeDriverPool;
 
-use rand::Rng;
 use tokio::sync::mpsc;
 use tokio::io::AsyncWriteExt;
 use tokio::fs::OpenOptions;
@@ -22,7 +21,7 @@ use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::Duration;
 use futures::stream::{FuturesUnordered, StreamExt};
-use anyhow::{anyhow, Context, Result};
+use anyhow::{anyhow, Result};
 
 /// Represents a write command to be serialized through the log writer
 enum LogCommand {