removed serial data scraping for yahoo tickers

This commit is contained in:
2025-12-19 16:58:22 +01:00
parent 5e81959322
commit c01b47000f
2 changed files with 1 additions and 419 deletions

View File

@@ -93,423 +93,6 @@ pub async fn run_full_update(
Ok(()) Ok(())
} }
/// UPDATED: Serial version with validation (kept for compatibility/debugging)
///
/// This is the non-parallel version that processes companies sequentially.
/// Updated with same validation and shutdown checks as parallel version.
///
/// Use this for:
/// - Debugging issues with specific companies
/// - Environments where parallel processing isn't desired
/// - Testing validation logic without concurrency complexity
async fn build_companies_jsonl_streaming_serial(
paths: &DataPaths,
pool: &Arc<ChromeDriverPool>,
shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<usize> {
// Configuration constants
const CHECKPOINT_INTERVAL: usize = 50;
const FSYNC_BATCH_SIZE: usize = 10;
const FSYNC_INTERVAL_SECS: u64 = 10;
let path = DataPaths::new(".")?;
let corporate_path = path.data_dir().join("corporate").join("by_name");
let securities_path = corporate_path.join("common_stocks.json");
if !securities_path.exists() {
logger::log_warn("No common_stocks.json found").await;
return Ok(0);
}
let content = tokio::fs::read_to_string(securities_path).await?;
let securities: HashMap<String, CompanyInfo> = serde_json::from_str(&content)?;
let companies_path = paths.data_dir().join("companies.jsonl");
let log_path = paths.data_dir().join("companies_updates.log");
if let Some(parent) = companies_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
// === RECOVERY PHASE: Load checkpoint + replay log ===
let mut existing_companies: HashMap<String, CompanyCrossPlatformInfo> = HashMap::new();
let mut processed_names: std::collections::HashSet<String> = std::collections::HashSet::new();
if companies_path.exists() {
logger::log_info("Loading checkpoint from companies.jsonl...").await;
let existing_content = tokio::fs::read_to_string(&companies_path).await?;
for line in existing_content.lines() {
if line.trim().is_empty() {
continue;
}
match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
Ok(company) => {
processed_names.insert(company.name.clone());
existing_companies.insert(company.name.clone(), company);
}
Err(e) => {
logger::log_warn(&format!("Skipping invalid checkpoint line: {}", e)).await;
}
}
}
logger::log_info(&format!("Loaded checkpoint with {} companies", existing_companies.len())).await;
}
if log_path.exists() {
logger::log_info("Replaying update log...").await;
let log_content = tokio::fs::read_to_string(&log_path).await?;
let mut replayed = 0;
for line in log_content.lines() {
if line.trim().is_empty() {
continue;
}
match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
Ok(company) => {
processed_names.insert(company.name.clone());
existing_companies.insert(company.name.clone(), company);
replayed += 1;
}
Err(e) => {
logger::log_warn(&format!("Skipping invalid log line: {}", e)).await;
}
}
}
if replayed > 0 {
logger::log_info(&format!("Replayed {} updates from log", replayed)).await;
}
}
// === OPEN LOG FILE ===
use tokio::fs::OpenOptions;
use tokio::io::AsyncWriteExt;
let mut log_file = OpenOptions::new()
.create(true)
.append(true)
.open(&log_path)
.await?;
let mut writes_since_fsync = 0;
let mut last_fsync = std::time::Instant::now();
let mut updates_since_checkpoint = 0;
let mut count = 0;
let mut new_count = 0;
let mut updated_count = 0;
logger::log_info(&format!("Processing {} companies sequentially...", securities.len())).await;
// === PROCESS COMPANIES SEQUENTIALLY ===
for (name, company_info) in securities.clone() {
// Check shutdown before each company
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn(&format!(
"Shutdown detected at company: {} (progress: {}/{})",
name, count, count + securities.len()
)).await;
break;
}
let existing_entry = existing_companies.get(&name).cloned();
let is_update = existing_entry.is_some();
// Process company with validation
match process_single_company_serial(
name.clone(),
company_info,
existing_entry,
pool,
shutdown_flag,
).await {
Ok(Some(company_entry)) => {
// Write to log
let line = serde_json::to_string(&company_entry)?;
log_file.write_all(line.as_bytes()).await?;
log_file.write_all(b"\n").await?;
writes_since_fsync += 1;
// Batched + time-based fsync
let should_fsync = writes_since_fsync >= FSYNC_BATCH_SIZE
|| last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS;
if should_fsync {
log_file.flush().await?;
log_file.sync_data().await?;
writes_since_fsync = 0;
last_fsync = std::time::Instant::now();
}
// Update in-memory state
processed_names.insert(name.clone());
existing_companies.insert(name.clone(), company_entry);
count += 1;
updates_since_checkpoint += 1;
if is_update {
updated_count += 1;
} else {
new_count += 1;
}
// Periodic checkpoint
if updates_since_checkpoint >= CHECKPOINT_INTERVAL {
if writes_since_fsync > 0 {
log_file.flush().await?;
log_file.sync_data().await?;
writes_since_fsync = 0;
last_fsync = std::time::Instant::now();
}
logger::log_info(&format!("Creating checkpoint at {} companies...", count)).await;
let checkpoint_tmp = companies_path.with_extension("jsonl.tmp");
let mut checkpoint_file = tokio::fs::File::create(&checkpoint_tmp).await?;
for company in existing_companies.values() {
let line = serde_json::to_string(company)?;
checkpoint_file.write_all(line.as_bytes()).await?;
checkpoint_file.write_all(b"\n").await?;
}
checkpoint_file.flush().await?;
checkpoint_file.sync_all().await?;
drop(checkpoint_file);
tokio::fs::rename(&checkpoint_tmp, &companies_path).await?;
drop(log_file);
tokio::fs::remove_file(&log_path).await.ok();
log_file = OpenOptions::new()
.create(true)
.append(true)
.open(&log_path)
.await?;
updates_since_checkpoint = 0;
logger::log_info("✓ Checkpoint created and log cleared").await;
}
if count % 10 == 0 {
logger::log_info(&format!(
"Progress: {} companies ({} new, {} updated)",
count, new_count, updated_count
)).await;
}
}
Ok(None) => {
// Company had no ISINs or was skipped
logger::log_info(&format!("Skipped company: {} (no ISINs)", name)).await;
}
Err(e) => {
logger::log_warn(&format!("Error processing company {}: {}", name, e)).await;
}
}
// Time-based fsync
if writes_since_fsync > 0 && last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS {
log_file.flush().await?;
log_file.sync_data().await?;
writes_since_fsync = 0;
last_fsync = std::time::Instant::now();
}
}
// === FSYNC PENDING WRITES ===
if writes_since_fsync > 0 {
logger::log_info(&format!("Fsyncing {} pending writes...", writes_since_fsync)).await;
log_file.flush().await?;
log_file.sync_data().await?;
logger::log_info("✓ Pending writes saved").await;
}
// === FINAL CHECKPOINT ===
if !shutdown_flag.load(Ordering::SeqCst) && updates_since_checkpoint > 0 {
logger::log_info("Creating final checkpoint...").await;
let checkpoint_tmp = companies_path.with_extension("jsonl.tmp");
let mut checkpoint_file = tokio::fs::File::create(&checkpoint_tmp).await?;
for company in existing_companies.values() {
let line = serde_json::to_string(company)?;
checkpoint_file.write_all(line.as_bytes()).await?;
checkpoint_file.write_all(b"\n").await?;
}
checkpoint_file.flush().await?;
checkpoint_file.sync_all().await?;
drop(checkpoint_file);
tokio::fs::rename(&checkpoint_tmp, &companies_path).await?;
drop(log_file);
tokio::fs::remove_file(&log_path).await.ok();
logger::log_info("✓ Final checkpoint created").await;
}
logger::log_info(&format!(
"Completed: {} total companies ({} new, {} updated)",
count, new_count, updated_count
)).await;
Ok(count)
}
/// UPDATED: Process single company serially with validation
async fn process_single_company_serial(
name: String,
company_info: CompanyInfo,
existing_entry: Option<CompanyCrossPlatformInfo>,
pool: &Arc<ChromeDriverPool>,
shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<Option<CompanyCrossPlatformInfo>> {
// Check shutdown at start
if shutdown_flag.load(Ordering::SeqCst) {
return Ok(None);
}
let mut isin_tickers_map: HashMap<String, Vec<String>> =
existing_entry
.as_ref()
.map(|e| e.isin_tickers_map.clone())
.unwrap_or_default();
let mut sector = existing_entry.as_ref().and_then(|e| e.sector.clone());
let mut exchange = existing_entry.as_ref().and_then(|e| e.exchange.clone());
// Collect unique ISIN-ticker pairs
let mut unique_isin_ticker_pairs: HashMap<String, Vec<String>> = HashMap::new();
for figi_infos in company_info.securities.values() {
for figi_info in figi_infos {
if !figi_info.isin.is_empty() {
let tickers = unique_isin_ticker_pairs
.entry(figi_info.isin.clone())
.or_insert_with(Vec::new);
if !figi_info.ticker.is_empty() && !tickers.contains(&figi_info.ticker) {
tickers.push(figi_info.ticker.clone());
}
}
}
}
// Process each ISIN with validation
for (isin, figi_tickers) in unique_isin_ticker_pairs {
// Check shutdown before each ISIN
if shutdown_flag.load(Ordering::SeqCst) {
return Ok(None);
}
let tickers = isin_tickers_map
.entry(isin.clone())
.or_insert_with(Vec::new);
for figi_ticker in figi_tickers {
if !tickers.contains(&figi_ticker) {
tickers.push(figi_ticker);
}
}
let has_yahoo_ticker = tickers.iter().any(|t| t.starts_with("YAHOO:"));
if !has_yahoo_ticker {
logger::log_info(&format!("Fetching Yahoo details for {} (ISIN: {})", name, isin)).await;
// Use validated scraping with retry
match scrape_with_retry_serial(pool, &isin, 3, shutdown_flag).await {
Ok(Some(details)) => {
logger::log_info(&format!(
"✓ Found Yahoo ticker {} for ISIN {} (company: {})",
details.ticker, isin, name
)).await;
tickers.push(format!("YAHOO:{}", details.ticker));
if sector.is_none() && details.sector.is_some() {
sector = details.sector.clone();
}
if exchange.is_none() && details.exchange.is_some() {
exchange = details.exchange.clone();
}
},
Ok(None) => {
logger::log_warn(&format!("◯ No search results for ISIN {} (company: {})", isin, name)).await;
tickers.push("YAHOO:NO_RESULTS".to_string());
},
Err(e) => {
if shutdown_flag.load(Ordering::SeqCst) {
return Ok(None);
}
logger::log_warn(&format!(
"✗ Yahoo lookup error for ISIN {} (company: {}): {}",
isin, name, e
)).await;
}
}
}
}
// Final shutdown check
if shutdown_flag.load(Ordering::SeqCst) {
return Ok(None);
}
if !isin_tickers_map.is_empty() {
Ok(Some(CompanyCrossPlatformInfo {
name,
isin_tickers_map,
sector,
exchange,
}))
} else {
Ok(None)
}
}
/// UPDATED: Scrape with retry for serial processing
async fn scrape_with_retry_serial(
pool: &Arc<ChromeDriverPool>,
isin: &str,
max_retries: u32,
shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<Option<YahooCompanyDetails>> {
let mut retries = 0;
loop {
if shutdown_flag.load(Ordering::SeqCst) {
return Err(anyhow::anyhow!("Aborted due to shutdown"));
}
match scrape_company_details_by_isin(pool, isin, shutdown_flag).await {
Ok(result) => return Ok(result),
Err(e) => {
if retries >= max_retries {
return Err(e);
}
let backoff_ms = 1000 * 2u64.pow(retries);
let jitter_ms = random_range(0, 500);
let total_delay = backoff_ms + jitter_ms;
logger::log_warn(&format!(
"Retry {}/{} for ISIN {} after {}ms: {}",
retries + 1, max_retries, isin, total_delay, e
)).await;
tokio::time::sleep(tokio::time::Duration::from_millis(total_delay)).await;
retries += 1;
}
}
}
}
async fn find_most_recent_figi_date_dir(paths: &DataPaths) -> anyhow::Result<Option<std::path::PathBuf>> { async fn find_most_recent_figi_date_dir(paths: &DataPaths) -> anyhow::Result<Option<std::path::PathBuf>> {
let map_cache_dir = paths.cache_gleif_openfigi_map_dir(); let map_cache_dir = paths.cache_gleif_openfigi_map_dir();

View File

@@ -12,7 +12,6 @@ use crate::util::directories::DataPaths;
use crate::util::logger; use crate::util::logger;
use crate::scraper::webdriver::ChromeDriverPool; use crate::scraper::webdriver::ChromeDriverPool;
use rand::Rng;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::fs::OpenOptions; use tokio::fs::OpenOptions;
@@ -22,7 +21,7 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration; use std::time::Duration;
use futures::stream::{FuturesUnordered, StreamExt}; use futures::stream::{FuturesUnordered, StreamExt};
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Result};
/// Represents a write command to be serialized through the log writer /// Represents a write command to be serialized through the log writer
enum LogCommand { enum LogCommand {