Added parallelized scraping instances for company Yahoo ticker seeding

2025-12-18 13:05:23 +01:00
parent d26e833d93
commit 9c66f0d361
7 changed files with 842 additions and 68 deletions


@@ -1,7 +1,7 @@
// src/corporate/update.rs - ABORT-SAFE VERSION WITH JSONL LOG
use super::{scraper::*, storage::*, helpers::*, types::*, openfigi::*, yahoo::*};
use crate::config::Config;
use crate::corporate::update_parallel::build_companies_jsonl_streaming_parallel;
use crate::util::directories::DataPaths;
use crate::util::logger;
use crate::scraper::webdriver::ChromeDriverPool;
@@ -73,7 +73,7 @@ pub async fn run_full_update(
}
logger::log_info("Step 5: Building companies.jsonl (streaming with abort-safe persistence)...").await;
let count = build_companies_jsonl_streaming(&paths, pool, shutdown_flag).await?;
let count = build_companies_jsonl_streaming_parallel(&paths, pool, shutdown_flag).await?;
logger::log_info(&format!(" ✓ Saved {} companies", count)).await;
if !shutdown_flag.load(Ordering::SeqCst) {
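
The parallel builder itself lives in `src/corporate/update_parallel.rs`, which this hunk only imports and calls. A minimal sketch of the fan-out pattern such a function implies, bounding concurrency to the ChromeDriver pool size with a semaphore, using hypothetical names (`seed_tickers_parallel`, `lookup_company`) that are not part of this commit:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Hypothetical sketch of the fan-out: names and signatures are illustrative,
// not the actual contents of update_parallel.rs.
async fn seed_tickers_parallel(
    names: Vec<String>,
    max_parallel: usize, // typically the ChromeDriver pool size
) -> Vec<(String, Option<String>)> {
    let sem = Arc::new(Semaphore::new(max_parallel));
    let mut handles = Vec::with_capacity(names.len());
    for name in names {
        let sem = Arc::clone(&sem);
        handles.push(tokio::spawn(async move {
            // Hold one permit for the duration of a scraping session.
            let _permit = sem.acquire_owned().await.expect("semaphore closed");
            let ticker = lookup_company(&name).await;
            (name, ticker)
        }));
    }
    let mut results = Vec::with_capacity(handles.len());
    for handle in handles {
        results.push(handle.await.expect("worker task panicked"));
    }
    results
}

// Hypothetical stand-in for the Yahoo ticker lookup against a pooled driver.
async fn lookup_company(_name: &str) -> Option<String> {
    None
}
```

Because the JSONL log below assumes a single appender, the real implementation presumably funnels worker results back to one writer task rather than letting workers append concurrently.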
@@ -90,15 +90,31 @@ pub async fn run_full_update(
///
/// Implements the data_updating_rule.md specification:
/// - Append-only JSONL log for all updates
/// - fsync after each write batch
/// - Batched fsync for performance (configurable batch size)
/// - Time-based fsync for safety (max 10 seconds without fsync)
/// - Atomic checkpoints via temp file + rename
/// - Crash recovery by loading checkpoint + replaying log
/// - Partial lines ignored during recovery
/// - Partial lines rejected during recovery (they fail JSON parsing)
///
/// # Error Handling & Crash Safety
///
/// If any write or fsync fails:
/// - Function returns error immediately
/// - Partial line may be in OS buffer but not fsynced
/// - On next startup, the partial line is still yielded by .lines()
///   (str::lines() does not require a trailing \n), but it fails
///   JSON parsing and is logged and skipped
/// - No data corruption; at most the last unsynced batch of entries is lost
async fn build_companies_jsonl_streaming(
paths: &DataPaths,
pool: &Arc<ChromeDriverPool>,
shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<usize> {
// Configuration constants
const CHECKPOINT_INTERVAL: usize = 50; // Create checkpoint every N updates
const FSYNC_BATCH_SIZE: usize = 10; // fsync every N writes for performance
const FSYNC_INTERVAL_SECS: u64 = 10; // Also fsync every N seconds for safety
let path = DataPaths::new(".")?;
let corporate_path = path.data_dir().join("corporate").join("by_name");
let securities_path = corporate_path.join("common_stocks.json");
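
The durability rules in the doc comment above reduce to a small write-path recipe: append one JSON record per line, count buffered writes, and fsync when either the batch size or the time budget is exhausted. A self-contained sketch under those rules (illustrative function, not the commit's code):

```rust
use std::time::{Duration, Instant};
use tokio::io::AsyncWriteExt;

const FSYNC_BATCH_SIZE: usize = 10;
const FSYNC_INTERVAL: Duration = Duration::from_secs(10);

// Illustrative write path: one JSON record per line, fsync per batch or interval.
async fn append_records(
    path: &std::path::Path,
    records: &[serde_json::Value],
) -> anyhow::Result<()> {
    let mut file = tokio::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(path)
        .await?;
    let mut pending = 0usize;
    let mut last_fsync = Instant::now();
    for record in records {
        // The trailing \n terminates the record; recovery treats anything
        // after the last \n as a partial write.
        let mut line = serde_json::to_vec(record)?;
        line.push(b'\n');
        file.write_all(&line).await?;
        pending += 1;
        if pending >= FSYNC_BATCH_SIZE || last_fsync.elapsed() >= FSYNC_INTERVAL {
            file.flush().await?;
            file.sync_data().await?; // durability point for everything written so far
            pending = 0;
            last_fsync = Instant::now();
        }
    }
    if pending > 0 {
        file.flush().await?;
        file.sync_data().await?;
    }
    Ok(())
}
```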
@@ -125,23 +141,22 @@ async fn build_companies_jsonl_streaming(
if companies_path.exists() {
logger::log_info("Loading checkpoint from companies.jsonl...").await;
let existing_content = tokio::fs::read_to_string(&companies_path).await?;
// Note: .lines() also yields a trailing partial line that lacks a \n;
// partial writes from crashes are rejected below when JSON parsing fails
for line in existing_content.lines() {
if line.trim().is_empty() {
continue;
}
// Only process complete lines (ending with proper JSON closing brace)
// This ensures we don't process partial writes from crashed processes
if !line.ends_with('}') {
logger::log_warn(&format!("Skipping incomplete checkpoint line: {}", &line[..line.len().min(50)])).await;
continue;
}
match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
Ok(company) => {
processed_names.insert(company.name.clone());
existing_companies.insert(company.name.clone(), company);
}
Err(e) => {
logger::log_warn(&format!("Failed to parse checkpoint line: {}", e)).await;
// This catches both malformed JSON and partial lines
logger::log_warn(&format!("Skipping invalid checkpoint line: {}", e)).await;
}
}
}
@@ -153,16 +168,14 @@ async fn build_companies_jsonl_streaming(
logger::log_info("Replaying update log...").await;
let log_content = tokio::fs::read_to_string(&log_path).await?;
let mut replayed = 0;
// Note: .lines() also yields a trailing partial line that lacks a \n;
// partial writes from crashes are rejected below when JSON parsing fails
for line in log_content.lines() {
if line.trim().is_empty() {
continue;
}
// Only replay complete lines (crash-safe: incomplete lines are ignored)
// A line is considered complete only if it ends with '\n' and parses as valid JSON
if !line.ends_with('}') {
logger::log_warn(&format!("Skipping incomplete log line: {}", &line[..line.len().min(50)])).await;
continue;
}
match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
Ok(company) => {
processed_names.insert(company.name.clone());
@@ -170,7 +183,8 @@ async fn build_companies_jsonl_streaming(
replayed += 1;
}
Err(e) => {
logger::log_warn(&format!("Failed to parse log line: {}", e)).await;
// This catches both malformed JSON and partial lines
logger::log_warn(&format!("Skipping invalid log line: {}", e)).await;
}
}
}
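
Both recovery loops lean on the same property: a partially written trailing record never reaches the in-memory state, because it fails to parse. Note that Rust's `str::lines()` does yield a final segment even without a trailing `\n`, so it is the JSON parse, not the line iterator, that rejects partial writes. A small self-contained demonstration:

```rust
fn main() {
    // One complete record followed by a partial write from a crash:
    let content = "{\"name\":\"Acme Corp\"}\n{\"name\":\"Beta I";
    // str::lines() still yields the partial trailing segment...
    assert_eq!(content.lines().count(), 2);
    // ...but it fails JSON parsing, so recovery logs and skips it.
    let recovered: Vec<serde_json::Value> = content
        .lines()
        .filter(|line| !line.trim().is_empty())
        .filter_map(|line| serde_json::from_str(line).ok())
        .collect();
    assert_eq!(recovered.len(), 1);
}
```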
@@ -190,9 +204,12 @@ async fn build_companies_jsonl_streaming(
let mut count = existing_companies.len();
let mut updated_count = 0;
let mut new_count = 0;
let checkpoint_interval = 50; // Create atomic checkpoint every 50 updates
let mut updates_since_checkpoint = 0;
// Batched fsync tracking for performance
let mut writes_since_fsync = 0;
let mut last_fsync = std::time::Instant::now();
use tokio::io::AsyncWriteExt;
for (name, company_info) in securities.iter() {
@@ -296,18 +313,29 @@ async fn build_companies_jsonl_streaming(
exchange,
};
// === APPEND-ONLY: Write single-line JSON with fsync ===
// This guarantees the line is either fully written or not at all
// === APPEND-ONLY: Write single-line JSON with batched fsync ===
// Each record is one line, so recovery either reads it whole or rejects the partial tail
let line = serde_json::to_string(&company_entry)?;
log_file.write_all(line.as_bytes()).await?;
log_file.write_all(b"\n").await?;
log_file.flush().await?;
writes_since_fsync += 1;
// Critical: fsync to ensure durability before considering write successful
// This prevents data loss on power failure or kernel panic
log_file.sync_data().await?;
// Batched fsync for performance + time-based fsync for safety
// fsync if: batch size reached OR time interval exceeded
let should_fsync = writes_since_fsync >= FSYNC_BATCH_SIZE
|| last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS;
// Update in-memory state ONLY after successful fsync
if should_fsync {
log_file.flush().await?;
// Critical: fsync to ensure durability before considering writes successful
// This prevents data loss on power failure or kernel panic
log_file.sync_data().await?;
writes_since_fsync = 0;
last_fsync = std::time::Instant::now();
}
// Update in-memory state ONLY after write (fsync happens in batches)
// This is safe because we fsync before checkpoints and at end of processing
processed_names.insert(name.clone());
existing_companies.insert(name.clone(), company_entry);
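
With `FSYNC_BATCH_SIZE = 10` and `FSYNC_INTERVAL_SECS = 10`, a crash loses at most the last unsynced batch (fewer than 10 entries) or roughly ten seconds of writes, whichever bound is hit first. The flush/sync/reset triple also now appears in several places (batch path, pre-checkpoint, time-based, shutdown), so one possible refactor, sketched here as a suggestion rather than taken from the commit, is a small helper:

```rust
use std::time::{Duration, Instant};
use tokio::io::AsyncWriteExt;

/// Hypothetical helper consolidating the batched/time-based fsync policy.
struct FsyncBatcher {
    batch_size: usize,
    interval: Duration,
    pending: usize,
    last_fsync: Instant,
}

impl FsyncBatcher {
    fn new(batch_size: usize, interval: Duration) -> Self {
        Self { batch_size, interval, pending: 0, last_fsync: Instant::now() }
    }

    /// Record one buffered write; fsync when the batch or time budget is spent.
    async fn wrote(&mut self, file: &mut tokio::fs::File) -> std::io::Result<()> {
        self.pending += 1;
        if self.pending >= self.batch_size || self.last_fsync.elapsed() >= self.interval {
            self.force(file).await?;
        }
        Ok(())
    }

    /// Unconditionally flush + fsync pending writes (before checkpoints/shutdown).
    async fn force(&mut self, file: &mut tokio::fs::File) -> std::io::Result<()> {
        if self.pending > 0 {
            file.flush().await?;
            file.sync_data().await?; // durability point
            self.pending = 0;
            self.last_fsync = Instant::now();
        }
        Ok(())
    }
}
```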
@@ -322,7 +350,15 @@ async fn build_companies_jsonl_streaming(
// === ATOMIC CHECKPOINT: Periodically create checkpoint ===
// This reduces recovery time by snapshotting current state
if updates_since_checkpoint >= checkpoint_interval {
if updates_since_checkpoint >= CHECKPOINT_INTERVAL {
// Ensure any pending writes are fsynced before checkpoint
if writes_since_fsync > 0 {
log_file.flush().await?;
log_file.sync_data().await?;
writes_since_fsync = 0;
last_fsync = std::time::Instant::now();
}
logger::log_info(&format!("Creating checkpoint at {} companies...", count)).await;
let checkpoint_tmp = companies_path.with_extension("jsonl.tmp");
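
The hunk cuts off right after `checkpoint_tmp` is derived. The doc comment promises atomic checkpoints via temp file + rename; a minimal sketch of that pattern, using `serde_json::Value` in place of `CompanyCrossPlatformInfo` and assuming the checkpoint is a full rewrite of `companies.jsonl`:

```rust
use tokio::io::AsyncWriteExt;

/// Sketch: write the full state to a temp file, fsync it, then atomically
/// rename it over the live checkpoint. Readers never observe a partial file.
async fn write_checkpoint(
    companies_path: &std::path::Path,
    state: &std::collections::HashMap<String, serde_json::Value>,
) -> anyhow::Result<()> {
    let tmp = companies_path.with_extension("jsonl.tmp");
    let mut file = tokio::fs::File::create(&tmp).await?;
    for entry in state.values() {
        file.write_all(serde_json::to_string(entry)?.as_bytes()).await?;
        file.write_all(b"\n").await?;
    }
    file.flush().await?;
    file.sync_data().await?; // make contents durable before the rename
    drop(file);
    // rename(2) is atomic on POSIX filesystems: the checkpoint is either
    // the old complete file or the new complete file, never a mixture.
    tokio::fs::rename(&tmp, companies_path).await?;
    Ok(())
}
```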
@@ -362,10 +398,30 @@ async fn build_companies_jsonl_streaming(
tokio::task::yield_now().await;
}
}
// Time-based fsync: Even if this company didn't result in a write,
// fsync any pending writes if enough time has passed
// This reduces data loss window during long Yahoo lookup operations
if writes_since_fsync > 0 && last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS {
log_file.flush().await?;
log_file.sync_data().await?;
writes_since_fsync = 0;
last_fsync = std::time::Instant::now();
logger::log_info("Time-based fsync completed").await;
}
}
// === FSYNC PENDING WRITES: Even if shutdown requested, save what we have ===
if writes_since_fsync > 0 {
logger::log_info(&format!("Fsyncing {} pending writes...", writes_since_fsync)).await;
log_file.flush().await?;
log_file.sync_data().await?;
logger::log_info("✓ Pending writes saved").await;
}
// === FINAL CHECKPOINT: Write complete final state ===
// This ensures we don't need to replay the log on next startup
// (Pending writes were already fsynced above)
if !shutdown_flag.load(Ordering::SeqCst) && updates_since_checkpoint > 0 {
logger::log_info("Creating final checkpoint...").await;