// src/corporate/update_parallel.rs
// PARALLELIZED VERSION of build_companies_jsonl_streaming
//
// Key improvements:
// - Processes multiple companies concurrently using the ChromeDriverPool
// - Maintains data safety with serialized log writes via channel
// - Respects pool size limits via semaphore
// - All fsync and checkpoint logic preserved

use super::{types::*, yahoo::*};
use crate::util::directories::DataPaths;
use crate::util::logger;
use crate::scraper::webdriver::ChromeDriverPool;
use rand::Rng;
use tokio::sync::mpsc;
use tokio::io::AsyncWriteExt;
use tokio::fs::OpenOptions;
use tokio::time::sleep;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use futures::stream::{FuturesUnordered, StreamExt};
use anyhow::{anyhow, Context, Result};

/// Represents a write command to be serialized through the log writer
enum LogCommand {
    Write(CompanyCrossPlatformInfo),
    Checkpoint, // not currently sent; only Write and Shutdown are issued below
    Shutdown,
}

/// Result from processing a single company
struct CompanyProcessResult {
    company: CompanyCrossPlatformInfo,
    is_update: bool,
}

/// Abort-safe incremental JSONL persistence with atomic checkpoints (PARALLELIZED)
///
/// Implements the data_updating_rule.md specification with concurrent processing:
/// - Append-only JSONL log for all updates
/// - Batched fsync for performance (configurable batch size)
/// - Time-based fsync for safety (max 10 seconds without fsync)
/// - Atomic checkpoints via temp file + rename
/// - Crash recovery by loading checkpoint + replaying log
/// - Partial lines automatically ignored by .lines() iterator
/// - PARALLEL processing of companies using ChromeDriverPool
/// - Serialized log writes for data safety
///
/// # Parallelization Strategy
///
/// - Multiple companies processed concurrently (limited by pool size)
/// - Yahoo lookups run concurrently across companies (sequentially within one company)
/// - Log writes are serialized through a channel
/// - Pool's semaphore naturally limits concurrency
/// - All fsync and checkpoint logic preserved
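///
/// # Example
///
/// A minimal calling sketch. `ChromeDriverPool::new(4)` is an assumed constructor
/// used for illustration only; build the pool however the `webdriver` module
/// actually exposes it.
///
/// ```ignore
/// let paths = DataPaths::new(".")?;
/// let pool = Arc::new(ChromeDriverPool::new(4).await?); // assumed constructor
/// let shutdown_flag = Arc::new(AtomicBool::new(false));
/// let total = build_companies_jsonl_streaming_parallel(&paths, &pool, &shutdown_flag).await?;
/// println!("persisted {} companies", total);
/// ```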
pub async fn build_companies_jsonl_streaming_parallel(
    paths: &DataPaths,
    pool: &Arc<ChromeDriverPool>,
    shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<usize> {
    // Configuration constants
    const CHECKPOINT_INTERVAL: usize = 50;
    const FSYNC_BATCH_SIZE: usize = 10;
    const FSYNC_INTERVAL_SECS: u64 = 10;
    const CONCURRENCY_LIMIT: usize = 100; // Max companies processing at once

    let path = DataPaths::new(".")?;
    let corporate_path = path.data_dir().join("corporate").join("by_name");
    let securities_path = corporate_path.join("common_stocks.json");

    if !securities_path.exists() {
        logger::log_warn("No common_stocks.json found").await;
        return Ok(0);
    }

    let content = tokio::fs::read_to_string(securities_path).await?;
    let securities: HashMap<String, CompanyInfo> = serde_json::from_str(&content)?;

    let companies_path = paths.data_dir().join("companies.jsonl");
    let log_path = paths.data_dir().join("companies_updates.log");

    if let Some(parent) = companies_path.parent() {
        tokio::fs::create_dir_all(parent).await?;
    }

    // === RECOVERY PHASE: Load checkpoint + replay log ===
    let mut existing_companies: HashMap<String, CompanyCrossPlatformInfo> = HashMap::new();
    let mut processed_names: std::collections::HashSet<String> = std::collections::HashSet::new();

    if companies_path.exists() {
        logger::log_info("Loading checkpoint from companies.jsonl...").await;
        let existing_content = tokio::fs::read_to_string(&companies_path).await?;
        for line in existing_content.lines() {
            if line.trim().is_empty() {
                continue;
            }
            match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
                Ok(company) => {
                    processed_names.insert(company.name.clone());
                    existing_companies.insert(company.name.clone(), company);
                }
                Err(e) => {
                    logger::log_warn(&format!("Skipping invalid checkpoint line: {}", e)).await;
                }
            }
        }
        logger::log_info(&format!("Loaded checkpoint with {} companies", existing_companies.len())).await;
    }

    if log_path.exists() {
        logger::log_info("Replaying update log...").await;
        let log_content = tokio::fs::read_to_string(&log_path).await?;
        let mut replayed = 0;
        for line in log_content.lines() {
            if line.trim().is_empty() {
                continue;
            }
            match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
                Ok(company) => {
                    processed_names.insert(company.name.clone());
                    existing_companies.insert(company.name.clone(), company);
                    replayed += 1;
                }
                Err(e) => {
                    logger::log_warn(&format!("Skipping invalid log line: {}", e)).await;
                }
            }
        }
        if replayed > 0 {
            logger::log_info(&format!("Replayed {} updates from log", replayed)).await;
        }
    }
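
    // Both companies.jsonl and companies_updates.log hold one serde_json-encoded
    // CompanyCrossPlatformInfo per line. Illustrative line (the exact field names
    // and order depend on the type's serde attributes, so treat this as a sketch):
    //
    //   {"name":"Acme Corp","isin_tickers_map":{"US0000000001":["ACME","YAHOO:ACME"]},"sector":"Technology","exchange":"NMS"}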

    // === SETUP LOG WRITER TASK ===
    // This task serializes all log writes to maintain data safety
    let (write_tx, mut write_rx) = mpsc::channel::<LogCommand>(1000);

    let log_file_init = OpenOptions::new()
        .create(true)
        .append(true)
        .open(&log_path)
        .await?;

    let companies_path_clone = companies_path.clone();
    let log_path_clone = log_path.clone();
    let existing_companies_writer = Arc::new(tokio::sync::Mutex::new(existing_companies.clone()));

    let writer_task = tokio::spawn(async move {
        let mut log_file = log_file_init; // Move into the task
        let mut writes_since_fsync = 0;
        let mut last_fsync = std::time::Instant::now();
        let mut updates_since_checkpoint = 0;
        let mut count = 0;
        let mut new_count = 0;
        let mut updated_count = 0;

        while let Some(cmd) = write_rx.recv().await {
            match cmd {
                LogCommand::Write(company) => {
                    // Write to log
                    let line = serde_json::to_string(&company).unwrap();
                    if let Err(e) = log_file.write_all(line.as_bytes()).await {
                        logger::log_error(&format!("Failed to write to log: {}", e)).await;
                        break;
                    }
                    if let Err(e) = log_file.write_all(b"\n").await {
                        logger::log_error(&format!("Failed to write newline: {}", e)).await;
                        break;
                    }

                    writes_since_fsync += 1;
                    updates_since_checkpoint += 1;
                    count += 1;

                    // Update in-memory state
                    let mut existing_companies = existing_companies_writer.lock().await;
                    let is_update = existing_companies.contains_key(&company.name);
                    existing_companies.insert(company.name.clone(), company);
                    drop(existing_companies);

                    if is_update {
                        updated_count += 1;
                    } else {
                        new_count += 1;
                    }

                    // Batched + time-based fsync
                    let should_fsync = writes_since_fsync >= FSYNC_BATCH_SIZE
                        || last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS;

                    if should_fsync {
                        if let Err(e) = log_file.flush().await {
                            logger::log_error(&format!("Failed to flush: {}", e)).await;
                            break;
                        }
                        if let Err(e) = log_file.sync_data().await {
                            logger::log_error(&format!("Failed to fsync: {}", e)).await;
                            break;
                        }
                        writes_since_fsync = 0;
                        last_fsync = std::time::Instant::now();
                    }

                    // Periodic checkpoint
                    if updates_since_checkpoint >= CHECKPOINT_INTERVAL {
                        // Fsync pending writes before checkpoint
                        if writes_since_fsync > 0 {
                            let _ = log_file.flush().await;
                            let _ = log_file.sync_data().await;
                            writes_since_fsync = 0;
                            last_fsync = std::time::Instant::now();
                        }

                        logger::log_info(&format!("Creating checkpoint at {} companies...", count)).await;

                        let checkpoint_tmp = companies_path_clone.with_extension("jsonl.tmp");
                        let mut checkpoint_file = match tokio::fs::File::create(&checkpoint_tmp).await {
                            Ok(f) => f,
                            Err(e) => {
                                logger::log_error(&format!("Failed to create checkpoint: {}", e)).await;
                                break;
                            }
                        };

                        let existing_companies = existing_companies_writer.lock().await;
                        for company in existing_companies.values() {
                            let line = serde_json::to_string(company).unwrap();
                            let _ = checkpoint_file.write_all(line.as_bytes()).await;
                            let _ = checkpoint_file.write_all(b"\n").await;
                        }
                        drop(existing_companies);

                        let _ = checkpoint_file.flush().await;
                        let _ = checkpoint_file.sync_all().await;
                        drop(checkpoint_file);

                        let _ = tokio::fs::rename(&checkpoint_tmp, &companies_path_clone).await;

                        // Clear log and reopen
                        drop(log_file);
                        let _ = tokio::fs::remove_file(&log_path_clone).await;

                        // Reopen log file
                        match OpenOptions::new()
                            .create(true)
                            .append(true)
                            .open(&log_path_clone)
                            .await
                        {
                            Ok(new_file) => {
                                log_file = new_file;
                                updates_since_checkpoint = 0;
                                logger::log_info("✓ Checkpoint created and log cleared").await;
                            }
                            Err(e) => {
                                logger::log_error(&format!("Failed to reopen log: {}", e)).await;
                                break;
                            }
                        }
                    }

                    if count % 10 == 0 {
                        logger::log_info(&format!(
                            "Progress: {} companies ({} new, {} updated)",
                            count, new_count, updated_count
                        )).await;
                    }
                },
                LogCommand::Checkpoint => {
                    // Force checkpoint - this is the final checkpoint before shutdown
                    if writes_since_fsync > 0 {
                        let _ = log_file.flush().await;
                        let _ = log_file.sync_data().await;
                    }

                    logger::log_info("Creating final checkpoint...").await;

                    let checkpoint_tmp = companies_path_clone.with_extension("jsonl.tmp");
                    if let Ok(mut checkpoint_file) = tokio::fs::File::create(&checkpoint_tmp).await {
                        let existing_companies = existing_companies_writer.lock().await;
                        for company in existing_companies.values() {
                            let line = serde_json::to_string(company).unwrap();
                            let _ = checkpoint_file.write_all(line.as_bytes()).await;
                            let _ = checkpoint_file.write_all(b"\n").await;
                        }
                        drop(existing_companies);

                        let _ = checkpoint_file.flush().await;
                        let _ = checkpoint_file.sync_all().await;
                        drop(checkpoint_file);

                        let _ = tokio::fs::rename(&checkpoint_tmp, &companies_path_clone).await;

                        // Clean up log file after final checkpoint
                        drop(log_file);
                        let _ = tokio::fs::remove_file(&log_path_clone).await;

                        logger::log_info("✓ Final checkpoint created").await;
                    }

                    // After final checkpoint, exit the loop
                    break;
                },
                LogCommand::Shutdown => {
                    // Fsync any pending writes before exit
                    if writes_since_fsync > 0 {
                        logger::log_info(&format!("Fsyncing {} pending writes...", writes_since_fsync)).await;
                        let _ = log_file.flush().await;
                        let _ = log_file.sync_data().await;
                    }
                    break;
                }
            }
        }

        (count, new_count, updated_count)
    });
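
    // NOTE: from here on, `existing_companies` is the pre-run snapshot taken during
    // recovery; the writer task owns the live, mutex-guarded copy behind
    // `existing_companies_writer` and uses it for checkpoints. Reading the snapshot
    // for `existing_entry` below is safe because every name already present in it is
    // also in `processed_names` and is therefore never scheduled again in this run.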

    // === PARALLEL COMPANY PROCESSING ===
    logger::log_info(&format!(
        "Processing companies in parallel (max {} concurrent, pool size: {})",
        CONCURRENCY_LIMIT,
        pool.get_number_of_instances()
    )).await;

    let pool = pool.clone();
    let shutdown_flag = shutdown_flag.clone();

    let mut processing_tasks = FuturesUnordered::new();
    let mut pending_companies = Vec::new();

    // Collect companies to process
    for (name, company_info) in securities.iter() {
        if processed_names.contains(name) {
            continue;
        }
        pending_companies.push((name.clone(), company_info.clone()));
    }

    logger::log_info(&format!("Found {} companies to process", pending_companies.len())).await;

    // Process companies in chunks to limit memory usage
    let chunk_size = CONCURRENCY_LIMIT;
    let mut processed = 0;

    for chunk in pending_companies.chunks(chunk_size) {
        if shutdown_flag.load(Ordering::SeqCst) {
            break;
        }

        // Launch tasks for this chunk
        for (name, company_info) in chunk {
            let name = name.clone();
            let company_info = company_info.clone();
            let pool = pool.clone();
            let shutdown_flag = shutdown_flag.clone();
            let existing_entry = existing_companies.get(&name).cloned();

            let task = tokio::spawn(async move {
                process_single_company(
                    name,
                    company_info,
                    existing_entry,
                    &pool,
                    &shutdown_flag,
                ).await
            });

            processing_tasks.push(task);
        }

        // Wait for chunk to complete
        while let Some(result) = processing_tasks.next().await {
            match result {
                Ok(Ok(Some(company_result))) => {
                    // Send to writer
                    if write_tx.send(LogCommand::Write(company_result.company)).await.is_err() {
                        logger::log_error("Writer task died, stopping processing").await;
                        break;
                    }
                    processed += 1;
                }
                Ok(Ok(None)) => {
                    // Company had no ISINs or was skipped
                    processed += 1;
                }
                Ok(Err(e)) => {
                    logger::log_warn(&format!("Company processing error: {}", e)).await;
                    processed += 1;
                }
                Err(e) => {
                    logger::log_error(&format!("Task panic: {}", e)).await;
                    processed += 1;
                }
            }

            if shutdown_flag.load(Ordering::SeqCst) {
                break;
            }
        }

        if shutdown_flag.load(Ordering::SeqCst) {
            break;
        }
    }

    // Signal writer to finish
    let _ = write_tx.send(LogCommand::Shutdown).await;
    drop(write_tx);

    // Wait for writer to finish
    let (final_count, final_new, final_updated) = writer_task.await.unwrap_or((0, 0, 0));

    logger::log_info(&format!(
        "Completed: {} total companies ({} new, {} updated)",
        final_count, final_new, final_updated
    )).await;

    Ok(final_count)
}
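
/// Call `scrape_company_details_by_isin`, retrying failures with exponential
/// backoff plus jitter.
///
/// With `max_retries = 3` there are up to four attempts; the delays before
/// attempts 2, 3 and 4 are roughly 1.0-1.5 s, 2.0-2.5 s and 4.0-4.5 s
/// (base 1 s * 2^retries plus 0-500 ms of jitter). The error from the final
/// attempt is returned unchanged.
///
/// The `YahooCompanyDetails` name in the signature below is an assumed placeholder
/// for whatever details type `scrape_company_details_by_isin` actually returns.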
async fn scrape_with_retry(
    pool: &Arc<ChromeDriverPool>,
    isin: &str,
    max_retries: u32,
) -> Result<Option<YahooCompanyDetails>> { // item type name assumed, see doc comment above
    let mut retries = 0;

    loop {
        match scrape_company_details_by_isin(pool, isin).await {
            Ok(result) => return Ok(result),
            Err(e) => {
                if retries >= max_retries {
                    return Err(e);
                }

                let backoff_ms = 1000 * 2u64.pow(retries); // 1s, 2s, 4s, 8s
                let jitter_ms = rand::rng().random_range(0..500); // + 0-500ms jitter
                let total_delay = backoff_ms + jitter_ms;

                logger::log_warn(&format!(
                    "Retry {}/{} for ISIN {} after {}ms: {}",
                    retries + 1, max_retries, isin, total_delay, e
                )).await;

                sleep(Duration::from_millis(total_delay)).await;
                retries += 1;
            }
        }
    }
}

/// Process a single company: fetch Yahoo data for its ISINs
async fn process_single_company(
    name: String,
    company_info: CompanyInfo,
    existing_entry: Option<CompanyCrossPlatformInfo>,
    pool: &Arc<ChromeDriverPool>,
    shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<Option<CompanyProcessResult>> {
    let is_update = existing_entry.is_some();

    let mut isin_tickers_map: HashMap<String, Vec<String>> = existing_entry
        .as_ref()
        .map(|e| e.isin_tickers_map.clone())
        .unwrap_or_default();
    let mut sector = existing_entry.as_ref().and_then(|e| e.sector.clone());
    let mut exchange = existing_entry.as_ref().and_then(|e| e.exchange.clone());

    // Collect unique ISIN-ticker pairs
    let mut unique_isin_ticker_pairs: HashMap<String, Vec<String>> = HashMap::new();
    for figi_infos in company_info.securities.values() {
        for figi_info in figi_infos {
            if !figi_info.isin.is_empty() {
                let tickers = unique_isin_ticker_pairs
                    .entry(figi_info.isin.clone())
                    .or_insert_with(Vec::new);
                if !figi_info.ticker.is_empty() && !tickers.contains(&figi_info.ticker) {
                    tickers.push(figi_info.ticker.clone());
                }
            }
        }
    }

    // Process each ISIN (lookups run concurrently across companies, sequentially within this one)
    for (isin, figi_tickers) in unique_isin_ticker_pairs {
        if shutdown_flag.load(Ordering::SeqCst) {
            break;
        }

        let tickers = isin_tickers_map
            .entry(isin.clone())
            .or_insert_with(Vec::new);

        for figi_ticker in figi_tickers {
            if !tickers.contains(&figi_ticker) {
                tickers.push(figi_ticker);
            }
        }

        let has_yahoo_ticker = tickers.iter().any(|t| t.starts_with("YAHOO:"));

        if !has_yahoo_ticker && !shutdown_flag.load(Ordering::SeqCst) {
            logger::log_info(&format!("Fetching Yahoo details for {} (ISIN: {})", name, isin)).await;

            match scrape_with_retry(pool, &isin, 3).await {
                Ok(Some(details)) => {
                    logger::log_info(&format!("✓ Found Yahoo ticker {} for ISIN {}", details.ticker, isin)).await;
                    tickers.push(format!("YAHOO:{}", details.ticker));

                    if sector.is_none() && details.sector.is_some() {
                        sector = details.sector.clone();
                        logger::log_info(&format!(" Sector: {}", details.sector.as_ref().unwrap())).await;
                    }
                    if exchange.is_none() && details.exchange.is_some() {
                        exchange = details.exchange.clone();
                        logger::log_info(&format!(" Exchange: {}", details.exchange.as_ref().unwrap())).await;
                    }
                },
                Ok(None) => {
                    logger::log_warn(&format!("◯ No search results for ISIN {}", isin)).await;
                    tickers.push("YAHOO:NO_RESULTS".to_string());
                },
                Err(e) => {
                    if shutdown_flag.load(Ordering::SeqCst) {
                        break;
                    }
                    logger::log_warn(&format!("✗ Yahoo lookup error for ISIN {}: {}", isin, e)).await;
                }
            }
        }
    }

    if shutdown_flag.load(Ordering::SeqCst) {
        return Ok(None);
    }

    if !isin_tickers_map.is_empty() {
        let company_entry = CompanyCrossPlatformInfo {
            name: name.clone(),
            isin_tickers_map,
            sector,
            exchange,
        };

        Ok(Some(CompanyProcessResult {
            company: company_entry,
            is_update,
        }))
    } else {
        Ok(None)
    }
}
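
#[cfg(test)]
mod tests {
    use super::*;

    // Round-trip sketch for the JSONL line format used by the checkpoint and the
    // update log. Assumes CompanyCrossPlatformInfo derives Serialize/Deserialize
    // (it is already (de)serialized with serde_json above) and that its fields
    // match the struct literal in process_single_company.
    #[test]
    fn company_jsonl_line_round_trips() {
        let mut isin_tickers_map: HashMap<String, Vec<String>> = HashMap::new();
        isin_tickers_map.insert(
            "US0000000001".to_string(), // hypothetical ISIN
            vec!["ACME".to_string(), "YAHOO:ACME".to_string()],
        );

        let original = CompanyCrossPlatformInfo {
            name: "Acme Corp".to_string(),
            isin_tickers_map,
            sector: Some("Technology".to_string()),
            exchange: Some("NMS".to_string()),
        };

        // One JSONL line: a single JSON object, written followed by '\n'.
        let line = serde_json::to_string(&original).unwrap();
        let parsed: CompanyCrossPlatformInfo = serde_json::from_str(&line).unwrap();

        assert_eq!(parsed.name, original.name);
        assert_eq!(parsed.sector, original.sector);
        assert_eq!(parsed.exchange, original.exchange);
        assert_eq!(parsed.isin_tickers_map, original.isin_tickers_map);
    }
}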