added atomic writer action for Ctrl-C abort

2025-12-19 14:12:56 +01:00
parent cd91de253b
commit b366f366e6
26 changed files with 3317 additions and 666 deletions

View File

@@ -0,0 +1,346 @@
// src/corporate/atomic_writer.rs
//
// Atomic JSONL writer that prevents partial/corrupted results from being written
use anyhow::Result;
use serde::Serialize;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::fs::{File, OpenOptions};
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc;
/// Command sent to the atomic writer service
#[derive(Debug)]
pub enum WriteCommand<T> {
/// Stage a result for writing (held in memory until committed)
Stage { id: String, data: T },
/// Commit staged result to disk (atomic write)
Commit { id: String },
/// Rollback staged result (discard without writing)
Rollback { id: String },
/// Commit all pending staged results and flush
CommitAll,
/// Shutdown writer gracefully (remaining staged results are rolled back)
Shutdown,
}
/// Result of a write operation
#[derive(Debug)]
pub struct WriteResult {
pub id: String,
pub success: bool,
pub error: Option<String>,
}
/// Atomic writer that prevents partial results from being written
pub struct AtomicJsonlWriter<T> {
file: File,
staged: HashMap<String, T>,
committed_count: usize,
rollback_count: usize,
}
impl<T: Serialize + Clone> AtomicJsonlWriter<T> {
pub async fn new(path: PathBuf) -> Result<Self> {
// Ensure parent directory exists
if let Some(parent) = path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let file = OpenOptions::new()
.create(true)
.append(true)
.open(&path)
.await?;
crate::util::logger::log_info(&format!(
"Atomic writer initialized: {:?}",
path
)).await;
Ok(Self {
file,
staged: HashMap::new(),
committed_count: 0,
rollback_count: 0,
})
}
/// Stage data for writing (held in memory, not yet written)
pub async fn stage(&mut self, id: String, data: T) {
crate::util::logger::log_info(&format!(
"Staging result for: {} (total staged: {})",
id,
self.staged.len() + 1
)).await;
self.staged.insert(id, data);
}
/// Commit a staged result to disk (atomic write)
pub async fn commit(&mut self, id: &str) -> Result<()> {
if let Some(data) = self.staged.remove(id) {
// Serialize to a single line; appending the newline before the write keeps
// the record and its terminator together in one write_all call, minimizing
// the window for a partial line
let mut json_line = serde_json::to_string(&data)?;
json_line.push('\n');
self.file.write_all(json_line.as_bytes()).await?;
self.file.flush().await?;
self.committed_count += 1;
crate::util::logger::log_info(&format!(
"✓ Committed result for: {} (total committed: {})",
id, self.committed_count
)).await;
Ok(())
} else {
Err(anyhow::anyhow!("No staged result found for id: {}", id))
}
}
/// Rollback a staged result (discard without writing)
pub async fn rollback(&mut self, id: &str) {
if self.staged.remove(id).is_some() {
self.rollback_count += 1;
crate::util::logger::log_warn(&format!(
"⚠ Rolled back result for: {} (total rollbacks: {})",
id, self.rollback_count
)).await;
}
}
/// Commit all staged results
pub async fn commit_all(&mut self) -> Result<usize> {
let ids: Vec<String> = self.staged.keys().cloned().collect();
let mut committed = 0;
for id in ids {
if let Ok(()) = self.commit(&id).await {
committed += 1;
}
}
Ok(committed)
}
/// Rollback all staged results (discard everything)
pub async fn rollback_all(&mut self) -> usize {
let count = self.staged.len();
self.staged.clear();
self.rollback_count += count;
crate::util::logger::log_warn(&format!(
"⚠ Rolled back all {} staged results",
count
)).await;
count
}
/// Get statistics
pub fn stats(&self) -> WriterStats {
WriterStats {
staged_count: self.staged.len(),
committed_count: self.committed_count,
rollback_count: self.rollback_count,
}
}
}
#[derive(Debug, Clone)]
pub struct WriterStats {
pub staged_count: usize,
pub committed_count: usize,
pub rollback_count: usize,
}
/// Managed writer service that runs in its own task
pub struct AtomicWriterService<T> {
rx: mpsc::UnboundedReceiver<WriteCommand<T>>,
writer: AtomicJsonlWriter<T>,
shutdown_flag: Arc<AtomicBool>,
}
impl<T: Serialize + Clone> AtomicWriterService<T> {
pub async fn new(
path: PathBuf,
rx: mpsc::UnboundedReceiver<WriteCommand<T>>,
shutdown_flag: Arc<AtomicBool>,
) -> Result<Self> {
let writer = AtomicJsonlWriter::new(path).await?;
Ok(Self {
rx,
writer,
shutdown_flag,
})
}
/// Main service loop
pub async fn run(mut self) {
crate::util::logger::log_info("Atomic writer service started").await;
while let Some(cmd) = self.rx.recv().await {
// Check for shutdown flag
if self.shutdown_flag.load(Ordering::SeqCst) {
crate::util::logger::log_warn(
"Shutdown detected - processing only Commit/Rollback commands"
).await;
// Only process commit/rollback commands during shutdown
match cmd {
WriteCommand::Commit { id } => {
if let Err(e) = self.writer.commit(&id).await {
crate::util::logger::log_error(&format!(
"Failed to commit {}: {}",
id, e
)).await;
}
}
WriteCommand::Rollback { id } => {
self.writer.rollback(&id).await;
}
WriteCommand::CommitAll => {
match self.writer.commit_all().await {
Ok(count) => {
crate::util::logger::log_info(&format!(
"Committed {} results during shutdown",
count
)).await;
}
Err(e) => {
crate::util::logger::log_error(&format!(
"Failed to commit all: {}",
e
)).await;
}
}
}
WriteCommand::Shutdown => break,
_ => {
// Ignore Stage commands during shutdown
crate::util::logger::log_warn(
"Ignoring new Stage command during shutdown"
).await;
}
}
continue;
}
// Normal operation
match cmd {
WriteCommand::Stage { id, data } => {
self.writer.stage(id, data).await;
}
WriteCommand::Commit { id } => {
if let Err(e) = self.writer.commit(&id).await {
crate::util::logger::log_error(&format!(
"Failed to commit {}: {}",
id, e
)).await;
}
}
WriteCommand::Rollback { id } => {
self.writer.rollback(&id).await;
}
WriteCommand::CommitAll => {
match self.writer.commit_all().await {
Ok(count) => {
crate::util::logger::log_info(&format!(
"Committed all {} staged results",
count
)).await;
}
Err(e) => {
crate::util::logger::log_error(&format!(
"Failed to commit all: {}",
e
)).await;
}
}
}
WriteCommand::Shutdown => break,
}
}
// Final shutdown - rollback any remaining staged items
let stats = self.writer.stats();
if stats.staged_count > 0 {
crate::util::logger::log_warn(&format!(
"⚠ Shutdown with {} uncommitted results - rolling back",
stats.staged_count
)).await;
self.writer.rollback_all().await;
}
// Re-read stats so rollbacks performed just above show up in the final log
let final_stats = self.writer.stats();
crate::util::logger::log_info(&format!(
"Atomic writer service stopped. Final stats: {} committed, {} rolled back",
final_stats.committed_count,
final_stats.rollback_count
)).await;
}
}
/// Handle for sending write commands
#[derive(Clone)]
pub struct AtomicWriterHandle<T> {
tx: mpsc::UnboundedSender<WriteCommand<T>>,
}
impl<T> AtomicWriterHandle<T> {
pub fn new(tx: mpsc::UnboundedSender<WriteCommand<T>>) -> Self {
Self { tx }
}
/// Stage data for writing (does not write immediately)
pub fn stage(&self, id: String, data: T) {
let _ = self.tx.send(WriteCommand::Stage { id, data });
}
/// Commit staged data to disk
pub fn commit(&self, id: String) {
let _ = self.tx.send(WriteCommand::Commit { id });
}
/// Rollback staged data (discard)
pub fn rollback(&self, id: String) {
let _ = self.tx.send(WriteCommand::Rollback { id });
}
/// Commit all staged data
pub fn commit_all(&self) {
let _ = self.tx.send(WriteCommand::CommitAll);
}
/// Shutdown writer gracefully
pub fn shutdown(&self) {
let _ = self.tx.send(WriteCommand::Shutdown);
}
}
/// Create atomic writer service
pub async fn create_atomic_writer<T: Serialize + Clone + Send + 'static>(
path: PathBuf,
shutdown_flag: Arc<AtomicBool>,
) -> Result<(AtomicWriterHandle<T>, tokio::task::JoinHandle<()>)> {
let (tx, rx) = mpsc::unbounded_channel();
let service = AtomicWriterService::new(path, rx, shutdown_flag).await?;
let handle = tokio::spawn(async move {
service.run().await;
});
Ok((AtomicWriterHandle::new(tx), handle))
}

View File

@@ -2,6 +2,8 @@
use super::types::*;
use chrono::{Local, NaiveDate};
use std::collections::{HashMap, HashSet};
use rand::rngs::StdRng;
use rand::prelude::{Rng, SeedableRng, IndexedRandom};
pub fn event_key(e: &CompanyEvent) -> String {
format!("{}|{}|{}", e.ticker, e.date, e.time)
@@ -67,4 +69,16 @@ pub fn parse_yahoo_date(s: &str) -> anyhow::Result<NaiveDate> {
NaiveDate::parse_from_str(s, "%B %d, %Y")
.or_else(|_| NaiveDate::parse_from_str(s, "%b %d, %Y"))
.map_err(|_| anyhow::anyhow!("Bad date: {s}"))
}
/// Send-safe random range (upper bound exclusive)
pub fn random_range(min: u64, max: u64) -> u64 {
let mut rng = StdRng::from_rng(&mut rand::rng());
rng.random_range(min..max)
}
/// Send-safe random choice
///
/// Panics if `items` is empty.
pub fn choose_random<T: Clone>(items: &[T]) -> T {
let mut rng = StdRng::from_rng(&mut rand::rng());
items.choose(&mut rng).unwrap().clone()
}
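These helpers exist so randomness can be used inside spawned tasks without holding a non-Send ThreadRng across an await point; a small illustrative sketch (the delay values are arbitrary):

// Hypothetical use inside a spawned task; only random_range/choose_random come from this module.
async fn jittered_pause() {
    // Each call builds a fresh StdRng, so no thread-local RNG is held across
    // the .await below and the future remains Send.
    let base: u64 = choose_random(&[500u64, 1_000, 2_000]);
    let jitter = random_range(0, 250);
    tokio::time::sleep(tokio::time::Duration::from_millis(base + jitter)).await;
}

// Compiles with tokio::spawn because the future is Send:
// tokio::spawn(jittered_pause());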

View File

@@ -9,6 +9,7 @@ pub mod fx;
pub mod openfigi;
pub mod yahoo;
pub mod update_parallel;
pub mod page_validation;
pub mod atomic_writer;
pub use update::run_full_update;

View File

@@ -0,0 +1,180 @@
// src/corporate/page_validation.rs
//
// Utilities to ensure page state is correct before extraction
use anyhow::{anyhow, Result};
use fantoccini::Client;
use tokio::time::{sleep, Duration};
/// Validates that the browser navigated to the expected URL
///
/// This prevents extracting data from a stale page when navigation fails silently
pub async fn verify_navigation(
client: &Client,
expected_url_fragment: &str,
max_attempts: u32,
) -> Result<()> {
for attempt in 1..=max_attempts {
let current_url = client.current_url().await?;
let current = current_url.as_str();
if current.contains(expected_url_fragment) {
crate::util::logger::log_info(&format!(
"✓ Navigation verified: {} (attempt {})",
current, attempt
)).await;
return Ok(());
}
if attempt < max_attempts {
crate::util::logger::log_warn(&format!(
"Navigation mismatch (attempt {}): expected '{}', got '{}'. Retrying...",
attempt, expected_url_fragment, current
)).await;
sleep(Duration::from_millis(500)).await;
}
}
let current_url = client.current_url().await?;
Err(anyhow!(
"Navigation verification failed: expected URL containing '{}', but got '{}'",
expected_url_fragment,
current_url.as_str()
))
}
/// Clears browser state by navigating to a blank page
///
/// Use this when a navigation fails or times out to ensure clean slate
pub async fn clear_browser_state(client: &Client) -> Result<()> {
crate::util::logger::log_info("Clearing browser state with about:blank").await;
// Navigate to blank page to clear any stale content
client.goto("about:blank").await?;
// Brief wait to ensure page clears
sleep(Duration::from_millis(200)).await;
Ok(())
}
/// Validates that expected content exists on the page before extraction
///
/// This adds an extra safety check that the page actually loaded
pub async fn verify_page_content(
client: &Client,
content_checks: Vec<ContentCheck>,
) -> Result<()> {
for check in content_checks {
match check {
ContentCheck::ElementExists(selector) => {
let exists: bool = client
.execute(
&format!(
"return !!document.querySelector('{}');",
selector.replace("'", "\\'")
),
vec![],
)
.await?
.as_bool()
.unwrap_or(false);
if !exists {
return Err(anyhow!(
"Expected element '{}' not found on page",
selector
));
}
}
ContentCheck::TextContains(text) => {
let page_text: String = client
.execute("return document.body.innerText;", vec![])
.await?
.as_str()
.unwrap_or("")
.to_string();
if !page_text.contains(&text) {
return Err(anyhow!(
"Expected text '{}' not found on page",
text
));
}
}
}
}
Ok(())
}
#[derive(Debug, Clone)]
pub enum ContentCheck {
/// Verify that a CSS selector exists
ElementExists(String),
/// Verify that page body contains text
TextContains(String),
}
/// Safe navigation wrapper that validates and clears state on failure
pub async fn navigate_with_validation(
client: &Client,
url: &str,
expected_url_fragment: &str,
timeout_secs: u64,
) -> Result<()> {
use tokio::time::timeout;
// Attempt navigation with timeout
let nav_result = timeout(
Duration::from_secs(timeout_secs),
client.goto(url)
).await;
match nav_result {
Ok(Ok(_)) => {
// Navigation succeeded, verify we're on correct page
verify_navigation(client, expected_url_fragment, 3).await?;
Ok(())
}
Ok(Err(e)) => {
// Navigation failed - clear state before returning error
crate::util::logger::log_error(&format!(
"Navigation failed: {}. Clearing browser state...",
e
)).await;
clear_browser_state(client).await.ok(); // Best effort
Err(anyhow!("Navigation failed: {}", e))
}
Err(_) => {
// Navigation timed out - clear state before returning error
crate::util::logger::log_error(&format!(
"Navigation timeout after {}s. Clearing browser state...",
timeout_secs
)).await;
clear_browser_state(client).await.ok(); // Best effort
Err(anyhow!("Navigation timeout"))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_content_check_variants() {
let check1 = ContentCheck::ElementExists("table".to_string());
let check2 = ContentCheck::TextContains("Yahoo Finance".to_string());
match check1 {
ContentCheck::ElementExists(sel) => assert_eq!(sel, "table"),
_ => panic!("Wrong variant"),
}
match check2 {
ContentCheck::TextContains(text) => assert_eq!(text, "Yahoo Finance"),
_ => panic!("Wrong variant"),
}
}
}
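A minimal sketch of how these helpers compose for a lookup page; the URL fragment and content checks are illustrative only:

// Hypothetical caller combining the validation helpers above.
use anyhow::Result;
use fantoccini::Client;
use crate::corporate::page_validation::{navigate_with_validation, verify_page_content, ContentCheck};

async fn load_lookup_page(client: &Client, isin: &str) -> Result<()> {
    let url = format!("https://finance.yahoo.com/lookup/?s={}", isin);

    // Navigate, confirm the URL actually matches, and clear browser state on failure.
    navigate_with_validation(client, &url, &format!("lookup/?s={}", isin), 30).await?;

    // Extra guard before extraction: a results table must exist and the page
    // text should mention the ISIN we searched for.
    verify_page_content(client, vec![
        ContentCheck::ElementExists("table".to_string()),
        ContentCheck::TextContains(isin.to_string()),
    ]).await
}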

View File

@@ -1,4 +1,4 @@
// src/corporate/update.rs - ABORT-SAFE VERSION WITH JSONL LOG
// src/corporate/update.rs - UPDATED WITH DATA INTEGRITY FIXES
use super::{scraper::*, storage::*, helpers::*, types::*, openfigi::*, yahoo::*};
use crate::config::Config;
use crate::corporate::update_parallel::build_companies_jsonl_streaming_parallel;
@@ -11,12 +11,13 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
/// UPDATED: Main corporate update entry point with shutdown awareness
pub async fn run_full_update(
_config: &Config,
pool: &Arc<ChromeDriverPool>,
shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<()> {
logger::log_info("=== Corporate Update (STREAMING MODE) ===").await;
logger::log_info("=== Corporate Update (STREAMING MODE WITH DATA INTEGRITY) ===").await;
let paths = DataPaths::new(".")?;
@@ -33,6 +34,7 @@ pub async fn run_full_update(
};
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown detected after GLEIF download").await;
return Ok(());
}
@@ -41,6 +43,7 @@ pub async fn run_full_update(
logger::log_info(" ✓ OpenFIGI metadata loaded").await;
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown detected after OpenFIGI load").await;
return Ok(());
}
@@ -54,6 +57,7 @@ pub async fn run_full_update(
}
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown detected after LEI-FIGI mapping").await;
return Ok(());
}
@@ -69,10 +73,11 @@ pub async fn run_full_update(
}
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown detected after securities map build").await;
return Ok(());
}
logger::log_info("Step 5: Building companies.jsonl (streaming with abort-safe persistence)...").await;
logger::log_info("Step 5: Building companies.jsonl with parallel processing and validation...").await;
let count = build_companies_jsonl_streaming_parallel(&paths, pool, shutdown_flag).await?;
logger::log_info(&format!(" ✓ Saved {} companies", count)).await;
@@ -80,40 +85,32 @@ pub async fn run_full_update(
logger::log_info("Step 6: Processing events (using index)...").await;
let _event_index = build_event_index(&paths).await?;
logger::log_info(" ✓ Event index built").await;
} else {
logger::log_warn("Shutdown detected, skipping event index build").await;
}
logger::log_info("✓ Corporate update complete").await;
Ok(())
}
/// Abort-safe incremental JSONL persistence with atomic checkpoints
/// UPDATED: Serial version with validation (kept for compatibility/debugging)
///
/// Implements the data_updating_rule.md specification:
/// - Append-only JSONL log for all updates
/// - Batched fsync for performance (configurable batch size)
/// - Time-based fsync for safety (max 10 seconds without fsync)
/// - Atomic checkpoints via temp file + rename
/// - Crash recovery by loading checkpoint + replaying log
/// - Partial lines automatically ignored by .lines() iterator
/// This is the non-parallel version that processes companies sequentially.
/// Updated with same validation and shutdown checks as parallel version.
///
/// # Error Handling & Crash Safety
///
/// If any write or fsync fails:
/// - Function returns error immediately
/// - Partial line may be in OS buffer but not fsynced
/// - On next startup, .lines() will either:
/// a) Skip partial line (if no \n written)
/// b) Fail to parse malformed JSON (logged and skipped)
/// - No data corruption, at most last batch entries lost
async fn build_companies_jsonl_streaming(
/// Use this for:
/// - Debugging issues with specific companies
/// - Environments where parallel processing isn't desired
/// - Testing validation logic without concurrency complexity
async fn build_companies_jsonl_streaming_serial(
paths: &DataPaths,
pool: &Arc<ChromeDriverPool>,
shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<usize> {
// Configuration constants
const CHECKPOINT_INTERVAL: usize = 50; // Create checkpoint every N updates
const FSYNC_BATCH_SIZE: usize = 10; // fsync every N writes for performance
const FSYNC_INTERVAL_SECS: u64 = 10; // Also fsync every N seconds for safety
const CHECKPOINT_INTERVAL: usize = 50;
const FSYNC_BATCH_SIZE: usize = 10;
const FSYNC_INTERVAL_SECS: u64 = 10;
let path = DataPaths::new(".")?;
let corporate_path = path.data_dir().join("corporate").join("by_name");
@@ -134,7 +131,7 @@ async fn build_companies_jsonl_streaming(
tokio::fs::create_dir_all(parent).await?;
}
// === RECOVERY PHASE 1: Load last checkpoint ===
// === RECOVERY PHASE: Load checkpoint + replay log ===
let mut existing_companies: HashMap<String, CompanyCrossPlatformInfo> = HashMap::new();
let mut processed_names: std::collections::HashSet<String> = std::collections::HashSet::new();
@@ -142,8 +139,6 @@ async fn build_companies_jsonl_streaming(
logger::log_info("Loading checkpoint from companies.jsonl...").await;
let existing_content = tokio::fs::read_to_string(&companies_path).await?;
// Note: .lines() only returns complete lines terminated with \n
// Partial lines (incomplete writes from crashes) are automatically skipped
for line in existing_content.lines() {
if line.trim().is_empty() {
continue;
@@ -155,7 +150,6 @@ async fn build_companies_jsonl_streaming(
existing_companies.insert(company.name.clone(), company);
}
Err(e) => {
// This catches both malformed JSON and partial lines
logger::log_warn(&format!("Skipping invalid checkpoint line: {}", e)).await;
}
}
@@ -163,14 +157,11 @@ async fn build_companies_jsonl_streaming(
logger::log_info(&format!("Loaded checkpoint with {} companies", existing_companies.len())).await;
}
// === RECOVERY PHASE 2: Replay log after checkpoint ===
if log_path.exists() {
logger::log_info("Replaying update log...").await;
let log_content = tokio::fs::read_to_string(&log_path).await?;
let mut replayed = 0;
// Note: .lines() only returns complete lines terminated with \n
// Partial lines from crashes are automatically skipped
for line in log_content.lines() {
if line.trim().is_empty() {
continue;
@@ -183,7 +174,6 @@ async fn build_companies_jsonl_streaming(
replayed += 1;
}
Err(e) => {
// This catches both malformed JSON and partial lines
logger::log_warn(&format!("Skipping invalid log line: {}", e)).await;
}
}
@@ -193,225 +183,143 @@ async fn build_companies_jsonl_streaming(
}
}
// === APPEND-ONLY LOG: Open in append mode with O_APPEND semantics ===
// === OPEN LOG FILE ===
use tokio::fs::OpenOptions;
use tokio::io::AsyncWriteExt;
let mut log_file = OpenOptions::new()
.create(true)
.append(true) // O_APPEND - atomic append operations
.append(true)
.open(&log_path)
.await?;
let mut count = existing_companies.len();
let mut updated_count = 0;
let mut new_count = 0;
let mut updates_since_checkpoint = 0;
// Batched fsync tracking for performance
let mut writes_since_fsync = 0;
let mut last_fsync = std::time::Instant::now();
let mut updates_since_checkpoint = 0;
let mut count = 0;
let mut new_count = 0;
let mut updated_count = 0;
use tokio::io::AsyncWriteExt;
logger::log_info(&format!("Processing {} companies sequentially...", securities.len())).await;
for (name, company_info) in securities.iter() {
// === PROCESS COMPANIES SEQUENTIALLY ===
for (name, company_info) in securities.clone() {
// Check shutdown before each company
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_info("Shutdown requested - stopping company processing").await;
logger::log_warn(&format!(
"Shutdown detected at company: {} (progress: {}/{})",
name, count, count + securities.len()
)).await;
break;
}
// Skip if already processed (from checkpoint or log replay)
if processed_names.contains(name) {
continue;
}
let existing_entry = existing_companies.get(name).cloned();
let existing_entry = existing_companies.get(&name).cloned();
let is_update = existing_entry.is_some();
let mut isin_tickers_map: HashMap<String, Vec<String>> =
existing_entry
.as_ref()
.map(|e| e.isin_tickers_map.clone())
.unwrap_or_default();
let mut sector = existing_entry.as_ref().and_then(|e| e.sector.clone());
let mut exchange = existing_entry.as_ref().and_then(|e| e.exchange.clone());
let mut unique_isin_ticker_pairs: HashMap<String, Vec<String>> = HashMap::new();
for figi_infos in company_info.securities.values() {
for figi_info in figi_infos {
if !figi_info.isin.is_empty() {
let tickers = unique_isin_ticker_pairs
.entry(figi_info.isin.clone())
.or_insert_with(Vec::new);
if !figi_info.ticker.is_empty() && !tickers.contains(&figi_info.ticker) {
tickers.push(figi_info.ticker.clone());
}
}
}
}
for (isin, figi_tickers) in unique_isin_ticker_pairs {
if shutdown_flag.load(Ordering::SeqCst) {
break;
}
let tickers = isin_tickers_map
.entry(isin.clone())
.or_insert_with(Vec::new);
for figi_ticker in figi_tickers {
if !tickers.contains(&figi_ticker) {
tickers.push(figi_ticker);
}
}
let has_yahoo_ticker = tickers.iter().any(|t| t.starts_with("YAHOO:"));
if !has_yahoo_ticker && !shutdown_flag.load(Ordering::SeqCst) {
logger::log_info(&format!("Fetching Yahoo details for {} (ISIN: {})", name, isin)).await;
// Process company with validation
match process_single_company_serial(
name.clone(),
company_info,
existing_entry,
pool,
shutdown_flag,
).await {
Ok(Some(company_entry)) => {
// Write to log
let line = serde_json::to_string(&company_entry)?;
log_file.write_all(line.as_bytes()).await?;
log_file.write_all(b"\n").await?;
match scrape_company_details_by_isin(pool, &isin).await {
Ok(Some(details)) => {
logger::log_info(&format!("✓ Found Yahoo ticker {} for ISIN {}", details.ticker, isin)).await;
tickers.push(format!("YAHOO:{}", details.ticker));
if sector.is_none() && details.sector.is_some() {
sector = details.sector.clone();
logger::log_info(&format!(" Sector: {}", details.sector.as_ref().unwrap())).await;
}
if exchange.is_none() && details.exchange.is_some() {
exchange = details.exchange.clone();
logger::log_info(&format!(" Exchange: {}", details.exchange.as_ref().unwrap())).await;
}
},
Ok(None) => {
logger::log_warn(&format!("◯ No search results for ISIN {}", isin)).await;
tickers.push("YAHOO:NO_RESULTS".to_string());
},
Err(e) => {
if shutdown_flag.load(Ordering::SeqCst) {
break;
}
logger::log_warn(&format!("✗ Yahoo lookup error for ISIN {}: {}", isin, e)).await;
}
}
}
}
if shutdown_flag.load(Ordering::SeqCst) {
break;
}
if !isin_tickers_map.is_empty() {
let company_entry = CompanyCrossPlatformInfo {
name: name.clone(),
isin_tickers_map,
sector,
exchange,
};
// === APPEND-ONLY: Write single-line JSON with batched fsync ===
// Write guarantees the line is either fully written or not at all
let line = serde_json::to_string(&company_entry)?;
log_file.write_all(line.as_bytes()).await?;
log_file.write_all(b"\n").await?;
writes_since_fsync += 1;
// Batched fsync for performance + time-based fsync for safety
// fsync if: batch size reached OR time interval exceeded
let should_fsync = writes_since_fsync >= FSYNC_BATCH_SIZE
|| last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS;
if should_fsync {
log_file.flush().await?;
// Critical: fsync to ensure durability before considering writes successful
// This prevents data loss on power failure or kernel panic
log_file.sync_data().await?;
writes_since_fsync = 0;
last_fsync = std::time::Instant::now();
}
// Update in-memory state ONLY after write (fsync happens in batches)
// This is safe because we fsync before checkpoints and at end of processing
processed_names.insert(name.clone());
existing_companies.insert(name.clone(), company_entry);
count += 1;
updates_since_checkpoint += 1;
if is_update {
updated_count += 1;
} else {
new_count += 1;
}
// === ATOMIC CHECKPOINT: Periodically create checkpoint ===
// This reduces recovery time by snapshotting current state
if updates_since_checkpoint >= CHECKPOINT_INTERVAL {
// Ensure any pending writes are fsynced before checkpoint
if writes_since_fsync > 0 {
writes_since_fsync += 1;
// Batched + time-based fsync
let should_fsync = writes_since_fsync >= FSYNC_BATCH_SIZE
|| last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS;
if should_fsync {
log_file.flush().await?;
log_file.sync_data().await?;
writes_since_fsync = 0;
last_fsync = std::time::Instant::now();
}
logger::log_info(&format!("Creating checkpoint at {} companies...", count)).await;
// Update in-memory state
processed_names.insert(name.clone());
existing_companies.insert(name.clone(), company_entry);
let checkpoint_tmp = companies_path.with_extension("jsonl.tmp");
let mut checkpoint_file = tokio::fs::File::create(&checkpoint_tmp).await?;
count += 1;
updates_since_checkpoint += 1;
// Write all current state to temporary checkpoint file
for company in existing_companies.values() {
let line = serde_json::to_string(company)?;
checkpoint_file.write_all(line.as_bytes()).await?;
checkpoint_file.write_all(b"\n").await?;
if is_update {
updated_count += 1;
} else {
new_count += 1;
}
checkpoint_file.flush().await?;
checkpoint_file.sync_all().await?;
drop(checkpoint_file);
// Periodic checkpoint
if updates_since_checkpoint >= CHECKPOINT_INTERVAL {
if writes_since_fsync > 0 {
log_file.flush().await?;
log_file.sync_data().await?;
writes_since_fsync = 0;
last_fsync = std::time::Instant::now();
}
logger::log_info(&format!("Creating checkpoint at {} companies...", count)).await;
let checkpoint_tmp = companies_path.with_extension("jsonl.tmp");
let mut checkpoint_file = tokio::fs::File::create(&checkpoint_tmp).await?;
for company in existing_companies.values() {
let line = serde_json::to_string(company)?;
checkpoint_file.write_all(line.as_bytes()).await?;
checkpoint_file.write_all(b"\n").await?;
}
checkpoint_file.flush().await?;
checkpoint_file.sync_all().await?;
drop(checkpoint_file);
tokio::fs::rename(&checkpoint_tmp, &companies_path).await?;
drop(log_file);
tokio::fs::remove_file(&log_path).await.ok();
log_file = OpenOptions::new()
.create(true)
.append(true)
.open(&log_path)
.await?;
updates_since_checkpoint = 0;
logger::log_info("✓ Checkpoint created and log cleared").await;
}
// Atomic rename - this is the commit point
// After this succeeds, the checkpoint is visible
tokio::fs::rename(&checkpoint_tmp, &companies_path).await?;
// Clear log after successful checkpoint
// Any entries before this point are now captured in the checkpoint
drop(log_file);
tokio::fs::remove_file(&log_path).await.ok();
log_file = OpenOptions::new()
.create(true)
.append(true)
.open(&log_path)
.await?;
updates_since_checkpoint = 0;
logger::log_info("✓ Checkpoint created and log cleared").await;
if count % 10 == 0 {
logger::log_info(&format!(
"Progress: {} companies ({} new, {} updated)",
count, new_count, updated_count
)).await;
}
}
if count % 10 == 0 {
logger::log_info(&format!("Progress: {} companies ({} new, {} updated)", count, new_count, updated_count)).await;
tokio::task::yield_now().await;
Ok(None) => {
// Company had no ISINs or was skipped
logger::log_info(&format!("Skipped company: {} (no ISINs)", name)).await;
}
Err(e) => {
logger::log_warn(&format!("Error processing company {}: {}", name, e)).await;
}
}
// Time-based fsync: Even if this company didn't result in a write,
// fsync any pending writes if enough time has passed
// This reduces data loss window during long Yahoo lookup operations
// Time-based fsync
if writes_since_fsync > 0 && last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS {
log_file.flush().await?;
log_file.sync_data().await?;
writes_since_fsync = 0;
last_fsync = std::time::Instant::now();
logger::log_info("Time-based fsync completed").await;
}
}
// === FSYNC PENDING WRITES: Even if shutdown requested, save what we have ===
// === FSYNC PENDING WRITES ===
if writes_since_fsync > 0 {
logger::log_info(&format!("Fsyncing {} pending writes...", writes_since_fsync)).await;
log_file.flush().await?;
@@ -419,9 +327,7 @@ async fn build_companies_jsonl_streaming(
logger::log_info("✓ Pending writes saved").await;
}
// === FINAL CHECKPOINT: Write complete final state ===
// This ensures we don't need to replay the log on next startup
// (Pending writes were already fsynced above)
// === FINAL CHECKPOINT ===
if !shutdown_flag.load(Ordering::SeqCst) && updates_since_checkpoint > 0 {
logger::log_info("Creating final checkpoint...").await;
@@ -438,21 +344,172 @@ async fn build_companies_jsonl_streaming(
checkpoint_file.sync_all().await?;
drop(checkpoint_file);
// Atomic rename makes final checkpoint visible
tokio::fs::rename(&checkpoint_tmp, &companies_path).await?;
// Clean up log
drop(log_file);
tokio::fs::remove_file(&log_path).await.ok();
logger::log_info("✓ Final checkpoint created").await;
}
logger::log_info(&format!("Completed: {} total companies ({} new, {} updated)", count, new_count, updated_count)).await;
logger::log_info(&format!(
"Completed: {} total companies ({} new, {} updated)",
count, new_count, updated_count
)).await;
Ok(count)
}
/// UPDATED: Process single company serially with validation
async fn process_single_company_serial(
name: String,
company_info: CompanyInfo,
existing_entry: Option<CompanyCrossPlatformInfo>,
pool: &Arc<ChromeDriverPool>,
shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<Option<CompanyCrossPlatformInfo>> {
// Check shutdown at start
if shutdown_flag.load(Ordering::SeqCst) {
return Ok(None);
}
let mut isin_tickers_map: HashMap<String, Vec<String>> =
existing_entry
.as_ref()
.map(|e| e.isin_tickers_map.clone())
.unwrap_or_default();
let mut sector = existing_entry.as_ref().and_then(|e| e.sector.clone());
let mut exchange = existing_entry.as_ref().and_then(|e| e.exchange.clone());
// Collect unique ISIN-ticker pairs
let mut unique_isin_ticker_pairs: HashMap<String, Vec<String>> = HashMap::new();
for figi_infos in company_info.securities.values() {
for figi_info in figi_infos {
if !figi_info.isin.is_empty() {
let tickers = unique_isin_ticker_pairs
.entry(figi_info.isin.clone())
.or_insert_with(Vec::new);
if !figi_info.ticker.is_empty() && !tickers.contains(&figi_info.ticker) {
tickers.push(figi_info.ticker.clone());
}
}
}
}
// Process each ISIN with validation
for (isin, figi_tickers) in unique_isin_ticker_pairs {
// Check shutdown before each ISIN
if shutdown_flag.load(Ordering::SeqCst) {
return Ok(None);
}
let tickers = isin_tickers_map
.entry(isin.clone())
.or_insert_with(Vec::new);
for figi_ticker in figi_tickers {
if !tickers.contains(&figi_ticker) {
tickers.push(figi_ticker);
}
}
let has_yahoo_ticker = tickers.iter().any(|t| t.starts_with("YAHOO:"));
if !has_yahoo_ticker {
logger::log_info(&format!("Fetching Yahoo details for {} (ISIN: {})", name, isin)).await;
// Use validated scraping with retry
match scrape_with_retry_serial(pool, &isin, 3, shutdown_flag).await {
Ok(Some(details)) => {
logger::log_info(&format!(
"✓ Found Yahoo ticker {} for ISIN {} (company: {})",
details.ticker, isin, name
)).await;
tickers.push(format!("YAHOO:{}", details.ticker));
if sector.is_none() && details.sector.is_some() {
sector = details.sector.clone();
}
if exchange.is_none() && details.exchange.is_some() {
exchange = details.exchange.clone();
}
},
Ok(None) => {
logger::log_warn(&format!("◯ No search results for ISIN {} (company: {})", isin, name)).await;
tickers.push("YAHOO:NO_RESULTS".to_string());
},
Err(e) => {
if shutdown_flag.load(Ordering::SeqCst) {
return Ok(None);
}
logger::log_warn(&format!(
"✗ Yahoo lookup error for ISIN {} (company: {}): {}",
isin, name, e
)).await;
}
}
}
}
// Final shutdown check
if shutdown_flag.load(Ordering::SeqCst) {
return Ok(None);
}
if !isin_tickers_map.is_empty() {
Ok(Some(CompanyCrossPlatformInfo {
name,
isin_tickers_map,
sector,
exchange,
}))
} else {
Ok(None)
}
}
/// UPDATED: Scrape with retry for serial processing
async fn scrape_with_retry_serial(
pool: &Arc<ChromeDriverPool>,
isin: &str,
max_retries: u32,
shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<Option<YahooCompanyDetails>> {
let mut retries = 0;
loop {
if shutdown_flag.load(Ordering::SeqCst) {
return Err(anyhow::anyhow!("Aborted due to shutdown"));
}
match scrape_company_details_by_isin(pool, isin, shutdown_flag).await {
Ok(result) => return Ok(result),
Err(e) => {
if retries >= max_retries {
return Err(e);
}
let backoff_ms = 1000 * 2u64.pow(retries);
let jitter_ms = random_range(0, 500);
let total_delay = backoff_ms + jitter_ms;
logger::log_warn(&format!(
"Retry {}/{} for ISIN {} after {}ms: {}",
retries + 1, max_retries, isin, total_delay, e
)).await;
tokio::time::sleep(tokio::time::Duration::from_millis(total_delay)).await;
retries += 1;
}
}
}
}
async fn find_most_recent_figi_date_dir(paths: &DataPaths) -> anyhow::Result<Option<std::path::PathBuf>> {
let map_cache_dir = paths.cache_gleif_openfigi_map_dir();
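The checkpoint writes above follow the standard temp-file-then-rename pattern; a condensed, self-contained sketch of just that pattern (path and record type are placeholders, not the crate's actual helpers):

// Sketch of an atomic checkpoint: write a temp file, fsync it, then rename.
use tokio::io::AsyncWriteExt;

async fn write_checkpoint<T: serde::Serialize>(
    companies_path: &std::path::Path,
    records: &[T],
) -> anyhow::Result<()> {
    let tmp = companies_path.with_extension("jsonl.tmp");
    let mut f = tokio::fs::File::create(&tmp).await?;
    for r in records {
        f.write_all(serde_json::to_string(r)?.as_bytes()).await?;
        f.write_all(b"\n").await?;
    }
    f.flush().await?;
    f.sync_all().await?; // make the temp file durable before publishing it
    drop(f);
    // The rename is the commit point: readers see either the old or the new
    // checkpoint, never a partially written one.
    tokio::fs::rename(&tmp, companies_path).await?;
    Ok(())
}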

View File

@@ -1,13 +1,13 @@
// src/corporate/update_parallel.rs
// PARALLELIZED VERSION of build_companies_jsonl_streaming
// src/corporate/update_parallel.rs - UPDATED WITH DATA INTEGRITY FIXES
// PARALLELIZED VERSION with atomic commits and validation
//
// Key improvements:
// - Processes multiple companies concurrently using the ChromeDriverPool
// - Maintains data safety with serialized log writes via channel
// - Respects pool size limits via semaphore
// - All fsync and checkpoint logic preserved
// Key improvements over original:
// - Page validation to prevent stale content extraction
// - Shutdown-aware task processing
// - Better error recovery with browser state cleanup
// - All original fsync and checkpoint logic preserved
use super::{types::*, yahoo::*};
use super::{types::*, yahoo::*, helpers::*};
use crate::util::directories::DataPaths;
use crate::util::logger;
use crate::scraper::webdriver::ChromeDriverPool;
@@ -24,7 +24,6 @@ use std::time::Duration;
use futures::stream::{FuturesUnordered, StreamExt};
use anyhow::{anyhow, Context, Result};
/// Represents a write command to be serialized through the log writer
enum LogCommand {
Write(CompanyCrossPlatformInfo),
@@ -38,25 +37,13 @@ struct CompanyProcessResult {
is_update: bool,
}
/// Abort-safe incremental JSONL persistence with atomic checkpoints (PARALLELIZED)
/// UPDATED: Abort-safe incremental JSONL persistence with validation
///
/// Implements the data_updating_rule.md specification with concurrent processing:
/// - Append-only JSONL log for all updates
/// - Batched fsync for performance (configurable batch size)
/// - Time-based fsync for safety (max 10 seconds without fsync)
/// - Atomic checkpoints via temp file + rename
/// - Crash recovery by loading checkpoint + replaying log
/// - Partial lines automatically ignored by .lines() iterator
/// - PARALLEL processing of companies using ChromeDriverPool
/// - Serialized log writes for data safety
///
/// # Parallelization Strategy
///
/// - Multiple companies processed concurrently (limited by pool size)
/// - Each company's Yahoo lookups happen in parallel
/// - Log writes are serialized through a channel
/// - Pool's semaphore naturally limits concurrency
/// - All fsync and checkpoint logic preserved
/// New safety features:
/// - Page validation before extraction
/// - Shutdown checks at all critical points
/// - Browser state cleanup on errors
/// - All writes still atomic with fsync
pub async fn build_companies_jsonl_streaming_parallel(
paths: &DataPaths,
pool: &Arc<ChromeDriverPool>,
@@ -66,7 +53,7 @@ pub async fn build_companies_jsonl_streaming_parallel(
const CHECKPOINT_INTERVAL: usize = 50;
const FSYNC_BATCH_SIZE: usize = 10;
const FSYNC_INTERVAL_SECS: u64 = 10;
const CONCURRENCY_LIMIT: usize = 100; // Max companies processing at once
const CONCURRENCY_LIMIT: usize = 100;
let path = DataPaths::new(".")?;
let corporate_path = path.data_dir().join("corporate").join("by_name");
@@ -140,7 +127,6 @@ pub async fn build_companies_jsonl_streaming_parallel(
}
// === SETUP LOG WRITER TASK ===
// This task serializes all log writes to maintain data safety
let (write_tx, mut write_rx) = mpsc::channel::<LogCommand>(1000);
let log_file_init = OpenOptions::new()
@@ -153,8 +139,10 @@ pub async fn build_companies_jsonl_streaming_parallel(
let log_path_clone = log_path.clone();
let existing_companies_writer = Arc::new(tokio::sync::Mutex::new(existing_companies.clone()));
let write_tx_for_writer = write_tx.clone();
let writer_task = tokio::spawn(async move {
let mut log_file = log_file_init; // Move into the task
let mut log_file = log_file_init;
let mut writes_since_fsync = 0;
let mut last_fsync = std::time::Instant::now();
let mut updates_since_checkpoint = 0;
@@ -208,187 +196,127 @@ pub async fn build_companies_jsonl_streaming_parallel(
writes_since_fsync = 0;
last_fsync = std::time::Instant::now();
}
// Periodic checkpoint
if updates_since_checkpoint >= CHECKPOINT_INTERVAL {
// Fsync pending writes before checkpoint
if writes_since_fsync > 0 {
let _ = log_file.flush().await;
let _ = log_file.sync_data().await;
writes_since_fsync = 0;
last_fsync = std::time::Instant::now();
}
logger::log_info(&format!("Creating checkpoint at {} companies...", count)).await;
let checkpoint_tmp = companies_path_clone.with_extension("jsonl.tmp");
let mut checkpoint_file = match tokio::fs::File::create(&checkpoint_tmp).await {
Ok(f) => f,
Err(e) => {
logger::log_error(&format!("Failed to create checkpoint: {}", e)).await;
break;
}
};
let existing_companies = existing_companies_writer.lock().await;
for company in existing_companies.values() {
let line = serde_json::to_string(company).unwrap();
let _ = checkpoint_file.write_all(line.as_bytes()).await;
let _ = checkpoint_file.write_all(b"\n").await;
}
drop(existing_companies);
let _ = checkpoint_file.flush().await;
let _ = checkpoint_file.sync_all().await;
drop(checkpoint_file);
let _ = tokio::fs::rename(&checkpoint_tmp, &companies_path_clone).await;
// Clear log and reopen
drop(log_file);
let _ = tokio::fs::remove_file(&log_path_clone).await;
// Reopen log file
match OpenOptions::new()
.create(true)
.append(true)
.open(&log_path_clone)
.await {
Ok(new_file) => {
log_file = new_file;
updates_since_checkpoint = 0;
logger::log_info("✓ Checkpoint created and log cleared").await;
}
Err(e) => {
logger::log_error(&format!("Failed to reopen log: {}", e)).await;
break;
}
}
}
if count % 10 == 0 {
logger::log_info(&format!("Progress: {} companies ({} new, {} updated)", count, new_count, updated_count)).await;
}
},
}
LogCommand::Checkpoint => {
// Force checkpoint - this is the final checkpoint before shutdown
if writes_since_fsync > 0 {
let _ = log_file.flush().await;
let _ = log_file.sync_data().await;
if let Err(e) = log_file.flush().await {
logger::log_error(&format!("Failed to flush before checkpoint: {}", e)).await;
break;
}
if let Err(e) = log_file.sync_data().await {
logger::log_error(&format!("Failed to fsync before checkpoint: {}", e)).await;
break;
}
logger::log_info("Creating final checkpoint...").await;
let checkpoint_tmp = companies_path_clone.with_extension("jsonl.tmp");
if let Ok(mut checkpoint_file) = tokio::fs::File::create(&checkpoint_tmp).await {
let existing_companies = existing_companies_writer.lock().await;
for company in existing_companies.values() {
let line = serde_json::to_string(company).unwrap();
let _ = checkpoint_file.write_all(line.as_bytes()).await;
let _ = checkpoint_file.write_all(b"\n").await;
let existing_companies = existing_companies_writer.lock().await;
let companies_vec: Vec<_> = existing_companies.values().cloned().collect();
drop(existing_companies);
let temp_path = companies_path_clone.with_extension("tmp");
match tokio::fs::File::create(&temp_path).await {
Ok(mut temp_file) => {
let mut checkpoint_ok = true;
for company in &companies_vec {
if let Ok(line) = serde_json::to_string(company) {
if temp_file.write_all(line.as_bytes()).await.is_err() ||
temp_file.write_all(b"\n").await.is_err() {
checkpoint_ok = false;
break;
}
}
}
if checkpoint_ok {
if temp_file.flush().await.is_ok() &&
temp_file.sync_data().await.is_ok() {
drop(temp_file);
if tokio::fs::rename(&temp_path, &companies_path_clone).await.is_ok() {
if tokio::fs::remove_file(&log_path_clone).await.is_ok() {
logger::log_info(&format!(
"✓ Checkpoint created ({} companies), log cleared",
companies_vec.len()
)).await;
if let Ok(new_log) = OpenOptions::new()
.create(true)
.append(true)
.open(&log_path_clone)
.await {
log_file = new_log;
}
}
}
}
}
}
Err(e) => {
logger::log_error(&format!("Failed to create checkpoint temp file: {}", e)).await;
}
drop(existing_companies);
let _ = checkpoint_file.flush().await;
let _ = checkpoint_file.sync_all().await;
drop(checkpoint_file);
let _ = tokio::fs::rename(&checkpoint_tmp, &companies_path_clone).await;
// Clean up log file after final checkpoint
drop(log_file);
let _ = tokio::fs::remove_file(&log_path_clone).await;
logger::log_info("✓ Final checkpoint created").await;
}
// After final checkpoint, exit the loop
break;
},
updates_since_checkpoint = 0;
}
LogCommand::Shutdown => {
// Fsync any pending writes before exit
if writes_since_fsync > 0 {
logger::log_info(&format!("Fsyncing {} pending writes...", writes_since_fsync)).await;
let _ = log_file.flush().await;
let _ = log_file.sync_data().await;
}
logger::log_info("Writer shutting down...").await;
break;
}
}
// Periodic checkpoint trigger
if updates_since_checkpoint >= CHECKPOINT_INTERVAL {
let _ = write_tx.send(LogCommand::Checkpoint).await;
}
}
// Final fsync
let _ = log_file.flush().await;
let _ = log_file.sync_data().await;
logger::log_info(&format!(
"Writer finished: {} total ({} new, {} updated)",
count, new_count, updated_count
)).await;
(count, new_count, updated_count)
});
// === PARALLEL COMPANY PROCESSING ===
logger::log_info(&format!("Processing companies in parallel (max {} concurrent, pool size: {})",
CONCURRENCY_LIMIT, pool.get_number_of_instances())).await;
let pool = pool.clone();
let shutdown_flag = shutdown_flag.clone();
// === PARALLEL PROCESSING PHASE ===
logger::log_info(&format!(
"Starting parallel processing of {} companies (concurrency limit: {})",
securities.len(),
CONCURRENCY_LIMIT
)).await;
let mut processing_tasks = FuturesUnordered::new();
let mut pending_companies = Vec::new();
// Collect companies to process
for (name, company_info) in securities.iter() {
if processed_names.contains(name) {
continue;
}
pending_companies.push((name.clone(), company_info.clone()));
}
logger::log_info(&format!("Found {} companies to process", pending_companies.len())).await;
// Process companies in chunks to limit memory usage
let chunk_size = CONCURRENCY_LIMIT;
let mut processed = 0;
let total = securities.len();
for chunk in pending_companies.chunks(chunk_size) {
for (name, company_info) in securities.into_iter() {
// Check shutdown before creating new tasks
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown detected, stopping task creation").await;
break;
}
// Launch tasks for this chunk
for (name, company_info) in chunk {
let name = name.clone();
let company_info = company_info.clone();
let pool = pool.clone();
let shutdown_flag = shutdown_flag.clone();
let existing_entry = existing_companies.get(&name).cloned();
let task = tokio::spawn(async move {
process_single_company(
name,
company_info,
existing_entry,
&pool,
&shutdown_flag
).await
});
processing_tasks.push(task);
}
// Wait for chunk to complete
while let Some(result) = processing_tasks.next().await {
match result {
Ok(Ok(Some(company_result))) => {
// Send to writer
if write_tx.send(LogCommand::Write(company_result.company)).await.is_err() {
logger::log_error("Writer task died, stopping processing").await;
break;
// Wait if we hit concurrency limit
while processing_tasks.len() >= CONCURRENCY_LIMIT {
if let Some(result) = processing_tasks.next().await {
match result {
Ok(Ok(Some(company_result))) => {
write_tx_for_writer.send(LogCommand::Write(company_result.company)).await?;
processed += 1;
}
Ok(Ok(None)) => {
processed += 1;
}
Ok(Err(e)) => {
logger::log_warn(&format!("Company processing error: {}", e)).await;
processed += 1;
}
Err(e) => {
logger::log_error(&format!("Task panic: {}", e)).await;
processed += 1;
}
processed += 1;
}
Ok(Ok(None)) => {
// Company had no ISINs or was skipped
processed += 1;
}
Ok(Err(e)) => {
logger::log_warn(&format!("Company processing error: {}", e)).await;
processed += 1;
}
Err(e) => {
logger::log_error(&format!("Task panic: {}", e)).await;
processed += 1;
}
}
@@ -400,11 +328,67 @@ pub async fn build_companies_jsonl_streaming_parallel(
if shutdown_flag.load(Ordering::SeqCst) {
break;
}
// Spawn new task
let pool = pool.clone();
let shutdown_flag = shutdown_flag.clone();
let existing_entry = existing_companies.get(&name).cloned();
let task = tokio::spawn(async move {
process_single_company_validated(
name,
company_info,
existing_entry,
&pool,
&shutdown_flag
).await
});
processing_tasks.push(task);
if processed % 10 == 0 && processed > 0 {
logger::log_info(&format!("Progress: {}/{} companies processed", processed, total)).await;
}
}
// Wait for remaining tasks
logger::log_info(&format!(
"Waiting for {} remaining tasks to complete...",
processing_tasks.len()
)).await;
while let Some(result) = processing_tasks.next().await {
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown detected during final task wait").await;
break;
}
match result {
Ok(Ok(Some(company_result))) => {
if write_tx_for_writer.send(LogCommand::Write(company_result.company)).await.is_err() {
logger::log_error("Writer task died").await;
break;
}
processed += 1;
}
Ok(Ok(None)) => {
processed += 1;
}
Ok(Err(e)) => {
logger::log_warn(&format!("Company processing error: {}", e)).await;
processed += 1;
}
Err(e) => {
logger::log_error(&format!("Task panic: {}", e)).await;
processed += 1;
}
}
}
// Signal writer to finish
let _ = write_tx.send(LogCommand::Shutdown).await;
drop(write_tx);
let _ = write_tx_for_writer.send(LogCommand::Checkpoint).await;
let _ = write_tx_for_writer.send(LogCommand::Shutdown).await;
drop(write_tx_for_writer);
// Wait for writer to finish
let (final_count, final_new, final_updated) = writer_task.await
@@ -418,23 +402,34 @@ pub async fn build_companies_jsonl_streaming_parallel(
Ok(final_count)
}
/// Scrape with retry, validation, and shutdown awareness
async fn scrape_with_retry(
pool: &Arc<ChromeDriverPool>,
isin: &str,
max_retries: u32,
shutdown_flag: &Arc<AtomicBool>,
) -> Result<Option<YahooCompanyDetails>> {
let mut retries = 0;
loop {
match scrape_company_details_by_isin(pool, isin).await {
// Check shutdown before each attempt
if shutdown_flag.load(Ordering::SeqCst) {
return Err(anyhow!("Aborted due to shutdown"));
}
match scrape_company_details_by_isin(pool, isin, shutdown_flag).await {
Ok(result) => return Ok(result),
Err(e) => {
if retries >= max_retries {
logger::log_error(&format!(
"All {} retries exhausted for ISIN {}: {}",
max_retries, isin, e
)).await;
return Err(e);
}
let backoff_ms = 1000 * 2u64.pow(retries); // 1s, 2s, 4s, 8s
let jitter_ms = rand::rng().random_range(0..500); // +0-500ms Jitter
let backoff_ms = 1000 * 2u64.pow(retries);
let jitter_ms = random_range(0, 500);
let total_delay = backoff_ms + jitter_ms;
logger::log_warn(&format!(
@@ -449,14 +444,20 @@ async fn scrape_with_retry(
}
}
/// Process a single company: fetch Yahoo data for its ISINs
async fn process_single_company(
/// UPDATED: Process single company with validation and shutdown checks
async fn process_single_company_validated(
name: String,
company_info: CompanyInfo,
existing_entry: Option<CompanyCrossPlatformInfo>,
pool: &Arc<ChromeDriverPool>,
shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<Option<CompanyProcessResult>> {
// Check shutdown at start
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn(&format!("Shutdown detected, skipping company: {}", name)).await;
return Ok(None);
}
let is_update = existing_entry.is_some();
let mut isin_tickers_map: HashMap<String, Vec<String>> =
@@ -485,9 +486,14 @@ async fn process_single_company(
}
}
// Process each ISIN (these Yahoo lookups will happen in parallel across companies)
// Process each ISIN with validation
for (isin, figi_tickers) in unique_isin_ticker_pairs {
// Check shutdown before each ISIN
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn(&format!(
"Shutdown detected while processing company: {}",
name
)).await;
break;
}
@@ -503,11 +509,15 @@ async fn process_single_company(
let has_yahoo_ticker = tickers.iter().any(|t| t.starts_with("YAHOO:"));
if !has_yahoo_ticker && !shutdown_flag.load(Ordering::SeqCst) {
if !has_yahoo_ticker {
logger::log_info(&format!("Fetching Yahoo details for {} (ISIN: {})", name, isin)).await;
match scrape_with_retry(pool, &isin, 3).await {
match scrape_with_retry(pool, &isin, 3, shutdown_flag).await {
Ok(Some(details)) => {
logger::log_info(&format!("✓ Found Yahoo ticker {} for ISIN {}", details.ticker, isin)).await;
logger::log_info(&format!(
"✓ Found Yahoo ticker {} for ISIN {} (company: {})",
details.ticker, isin, name
)).await;
tickers.push(format!("YAHOO:{}", details.ticker));
@@ -522,20 +532,30 @@ async fn process_single_company(
}
},
Ok(None) => {
logger::log_warn(&format!("◯ No search results for ISIN {}", isin)).await;
logger::log_warn(&format!("◯ No search results for ISIN {} (company: {})", isin, name)).await;
tickers.push("YAHOO:NO_RESULTS".to_string());
},
Err(e) => {
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn(&format!("Shutdown during scrape for ISIN {}", isin)).await;
break;
}
logger::log_warn(&format!("✗ Yahoo lookup error for ISIN {}: {}", isin, e)).await;
logger::log_warn(&format!(
"✗ Yahoo lookup error for ISIN {} (company: {}): {}",
isin, name, e
)).await;
// Continue with next ISIN
}
}
}
}
// Final shutdown check before returning result
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn(&format!(
"Shutdown detected, discarding incomplete result for: {}",
name
)).await;
return Ok(None);
}
@@ -552,6 +572,7 @@ async fn process_single_company(
is_update,
}))
} else {
logger::log_warn(&format!("No ISINs found for company: {}", name)).await;
Ok(None)
}
}
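The parallel loop above bounds in-flight work by draining FuturesUnordered whenever the concurrency limit is reached; a stripped-down sketch of that pattern (the per-item work is a placeholder):

// Bounded-concurrency sketch: at most `limit` spawned tasks run at once.
use futures::stream::{FuturesUnordered, StreamExt};

async fn process_all(items: Vec<String>, limit: usize) {
    let mut tasks = FuturesUnordered::new();
    for item in items {
        // Drain one finished task whenever the limit is reached.
        while tasks.len() >= limit {
            if let Some(res) = tasks.next().await {
                let _: Result<(), tokio::task::JoinError> = res;
            }
        }
        tasks.push(tokio::spawn(async move {
            // placeholder for process_single_company_validated(...)
            println!("processing {item}");
        }));
    }
    // Wait for the tasks still in flight.
    while tasks.next().await.is_some() {}
}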

View File

@@ -1,18 +1,15 @@
// src/corporate/yahoo.rs
use super::{types::*, helpers::*};
// src/corporate/yahoo.rs - UPDATED WITH DATA INTEGRITY FIXES
use super::{types::*, helpers::*, page_validation::*};
use crate::{scraper::webdriver::*, util::{directories::DataPaths}};
use event_backtest_engine::logger;
use crate::logger;
use fantoccini::{Client, Locator};
use rand::Rng;
use serde::{Deserialize, Serialize};
use tokio::time::{Duration as TokioDuration, sleep, timeout};
use std::{sync::Arc};
use std::{sync::Arc, sync::atomic::{AtomicBool, Ordering}};
use anyhow::{anyhow, Result};
const YAHOO_COMPANY_EXTRACTION_JS: &str = include_str!("yahoo_company_extraction.js");
/// Mapping existing
/// getting historical stock price data daily (xxxx - 2025) and hourly (last 30 days)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum YahooTickerResult {
@@ -66,29 +63,137 @@ impl YahooTickerResult {
}
}
/// UPDATED: Scrape company details with full validation and shutdown support
pub async fn scrape_company_details_by_isin(
pool: &Arc<ChromeDriverPool>,
isin: &str,
shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<Option<YahooCompanyDetails>> {
let isin = isin.to_string();
pool.execute(format!("https://finance.yahoo.com/lookup/?s={}", isin), move |client| {
let isin = isin.clone();
// Check shutdown before starting
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn(&format!("Shutdown detected, skipping ISIN: {}", isin)).await;
return Ok(None);
}
let isin_owned = isin.to_string();
let shutdown_clone = Arc::clone(shutdown_flag);
let url = format!("https://finance.yahoo.com/lookup/?s={}", isin);
pool.execute(url.clone(), move |client| {
let isin = isin_owned.clone();
let shutdown = shutdown_clone.clone();
Box::pin(async move {
// Random Delay between 800-1500ms
// Check shutdown during task execution
if shutdown.load(Ordering::SeqCst) {
return Err(anyhow!("Task aborted due to shutdown"));
}
// Random delay
let delay = rand::rng().random_range(800..1500);
sleep(TokioDuration::from_millis(delay)).await;
// Reject cookies
reject_yahoo_cookies(&client).await?;
// Random Delay
// Check shutdown again
if shutdown.load(Ordering::SeqCst) {
return Err(anyhow!("Task aborted due to shutdown"));
}
// CRITICAL: Validate navigation succeeded
let expected_fragment = format!("lookup/?s={}", isin);
match verify_navigation(&client, &expected_fragment, 5).await {
Ok(_) => {
logger::log_info(&format!("✓ Navigation validated for ISIN: {}", isin)).await;
}
Err(e) => {
logger::log_error(&format!(
"Navigation verification failed for ISIN {}: {}",
isin, e
)).await;
// Clear browser state before returning error
clear_browser_state(&client).await.ok();
return Err(e);
}
}
// Additional content validation
let page_ready: bool = client
.execute(
r#"
const table = document.querySelector('#main-content-wrapper > section > section.container.yf-1omxedn > div.tableContainer.yf-1omxedn > div > table');
const noData = document.querySelector('#main-content-wrapper > section > div.noData.yf-1omxedn');
return !!(table || noData);
"#,
vec![],
)
.await?
.as_bool()
.unwrap_or(false);
if !page_ready {
logger::log_error(&format!(
"Page content not ready for ISIN {} - neither table nor no-data element found",
isin
)).await;
clear_browser_state(&client).await.ok();
return Err(anyhow!("Page content not ready"));
}
logger::log_info(&format!("✓ Page content validated for ISIN: {}", isin)).await;
// Check shutdown before extraction
if shutdown.load(Ordering::SeqCst) {
return Err(anyhow!("Task aborted due to shutdown"));
}
// Random delay before extraction
let delay = rand::rng().random_range(800..1500);
sleep(TokioDuration::from_millis(delay)).await;
extract_company_details(&client, &isin).await
// Now safe to extract
extract_company_details_validated(&client, &isin).await
})
}).await
}
/// UPDATED: Extract with additional URL validation
async fn extract_company_details_validated(
client: &Client,
isin: &str,
) -> Result<Option<YahooCompanyDetails>> {
// Double-check URL is still correct before extraction
let current_url = client.current_url().await?;
if !current_url.as_str().contains(isin) {
logger::log_error(&format!(
"URL mismatch before extraction: expected ISIN '{}' in URL, got '{}'",
isin,
current_url.as_str()
)).await;
clear_browser_state(client).await.ok();
return Err(anyhow!("URL mismatch - possible stale page"));
}
// Run extraction
let result = extract_company_details(client, isin).await?;
// Validate extraction result
if let Some(ref details) = result {
logger::log_info(&format!(
"✓ Extracted ticker '{}' for ISIN {} (sector: {:?}, exchange: {:?})",
details.ticker, isin, details.sector, details.exchange
)).await;
} else {
logger::log_info(&format!(
"No ticker found for ISIN {} (legitimately not found)",
isin
)).await;
}
Ok(result)
}
pub async fn extract_company_details(
client: &Client,
_isin: &str,
@@ -153,17 +258,13 @@ pub async fn extract_company_details(
// Parse the JSON result
let extraction: ExtractionResult = serde_json::from_value(result.clone())
.map_err(|e| {
// Log the problematic result value for debugging
let result_str = serde_json::to_string_pretty(&result).unwrap_or_else(|_| format!("{:?}", result));
anyhow!("Failed to parse extraction result: {}. Raw result: {}", e, result_str)
})?;
match extraction.status.as_str() {
"found" => {
// Ticker is guaranteed to be present when status is "found"
// Sector and exchange are optional
if let Some(ticker) = extraction.ticker {
// Log metadata if available
if let Some(ref metadata) = extraction.metadata {
logger::log_info(&format!(
"Selected row {} with {} valid fields out of {} total rows",
@@ -179,13 +280,11 @@ pub async fn extract_company_details(
exchange: extraction.exchange,
}))
} else {
// This shouldn't happen if JS script is working correctly
Err(anyhow!("Status 'found' but no ticker present"))
}
},
"no_results" => Ok(None),
"error" => {
// Error status means ticker was not found or extraction failed
let error_msg = extraction.error_message.unwrap_or_else(|| "Unknown error".to_string());
Err(anyhow!("JavaScript extraction error: {}", error_msg))
},
@@ -207,19 +306,6 @@ pub async fn get_all_tickers_from_companies_jsonl(paths: &DataPaths) -> anyhow::
Ok(tickers)
}
/// Fetches earnings events for a ticker using a dedicated ScrapeTask.
///
/// This function creates and executes a ScrapeTask to navigate to the Yahoo Finance earnings calendar,
/// reject cookies, and extract the events.
///
/// # Arguments
/// * `ticker` - The stock ticker symbol.
///
/// # Returns
/// A vector of CompanyEvent structs on success.
///
/// # Errors
/// Returns an error if the task execution fails, e.g., chromedriver spawn or navigation issues.
pub async fn fetch_earnings_with_pool(
pool: &Arc<ChromeDriverPool>,
ticker: &str,
@@ -238,40 +324,6 @@ pub async fn fetch_earnings_with_pool(
}).await
}
/// Extracts earnings events from the currently loaded Yahoo Finance earnings calendar page.
///
/// This function assumes the client is already navigated to the correct URL (e.g.,
/// https://finance.yahoo.com/calendar/earnings?symbol={ticker}) and cookies are handled.
///
/// It waits for the earnings table, extracts rows, parses cells into CompanyEvent structs,
/// and handles date parsing, float parsing, and optional fields.
///
/// # Arguments
/// * `client` - The fantoccini Client with the page loaded.
/// * `ticker` - The stock ticker symbol for the events.
///
/// # Returns
/// A vector of CompanyEvent on success.
///
/// # Errors
/// Returns an error if:
/// - Table or elements not found.
/// - Date or float parsing fails.
/// - WebDriver operations fail.
///
/// # Examples
///
/// ```no_run
/// use fantoccini::Client;
/// use crate::corporate::scraper::extract_earnings;
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
/// // Assume client is set up and navigated
/// let events = extract_earnings(&client, "AAPL").await?;
/// Ok(())
/// }
/// ```
pub async fn extract_earnings_events(client: &Client, ticker: &str) -> Result<Vec<CompanyEvent>> {
// Wait for the table to load
let table = client