fixed yahoo api calls for cleansing low profile data
This commit is contained in:
929
src/corporate/update_companies.rs
Normal file
929
src/corporate/update_companies.rs
Normal file
@@ -0,0 +1,929 @@
|
||||
// src/corporate/update_companies.rs
|
||||
use super::{types::*, yahoo::*, helpers::*};
|
||||
use crate::util::directories::DataPaths;
|
||||
use crate::util::logger;
|
||||
use crate::scraper::webdriver::ChromeDriverPool;
|
||||
use crate::scraper::hard_reset::perform_hard_reset;
|
||||
use crate::config::Config;
|
||||
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::fs::OpenOptions;
|
||||
use tokio::time::sleep;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::time::Duration;
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use anyhow::{anyhow, Result};
|
||||
|
||||
/// Represents a write command to be serialized through the log writer
|
||||
enum LogCommand {
|
||||
Write(CompanyCrossPlatformInfo),
|
||||
Checkpoint,
|
||||
Shutdown,
|
||||
}
|
||||
|
||||
/// Result from processing a single company
|
||||
struct CompanyProcessResult {
|
||||
company: CompanyCrossPlatformInfo,
|
||||
is_update: bool,
|
||||
}
|
||||
|
||||
/// Check if a company needs Yahoo data processing
|
||||
/// Returns true if company has incomplete data (needs processing)
|
||||
fn company_needs_processing(
|
||||
company_name: &str,
|
||||
company_info: &CompanyInfo,
|
||||
existing_companies: &HashMap<String, CompanyCrossPlatformInfo>,
|
||||
) -> bool {
|
||||
// If company not in existing data at all, definitely needs processing
|
||||
let Some(existing_entry) = existing_companies.get(company_name) else {
|
||||
return true;
|
||||
};
|
||||
|
||||
// Collect all ISINs this company should have
|
||||
let mut required_isins = std::collections::HashSet::new();
|
||||
for figi_infos in company_info.securities.values() {
|
||||
for figi_info in figi_infos {
|
||||
if !figi_info.isin.is_empty() {
|
||||
required_isins.insert(figi_info.isin.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check each required ISIN
|
||||
for isin in required_isins {
|
||||
// Check if this ISIN exists in the company's ticker map
|
||||
if let Some(tickers) = existing_entry.isin_tickers_map.get(&isin) {
|
||||
// Check if this ISIN has valid Yahoo data
|
||||
let has_valid_yahoo = tickers.iter().any(|t| {
|
||||
t.starts_with("YAHOO:") &&
|
||||
t != "YAHOO:ERROR" //&& // Error marker means needs retry
|
||||
//t != "YAHOO:NO_RESULTS" // This is actually valid (legitimately not found)
|
||||
});
|
||||
|
||||
// If no valid Yahoo data for this ISIN, company needs processing
|
||||
if !has_valid_yahoo {
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
// ISIN not in map at all, needs processing
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// All ISINs have valid Yahoo data, skip this company
|
||||
false
|
||||
}
|
||||
|
||||
/// Abort-safe incremental JSONL persistence with proper hard reset handling
|
||||
pub async fn build_companies_jsonl_streaming_parallel(
|
||||
paths: &DataPaths,
|
||||
pool: &Arc<ChromeDriverPool>,
|
||||
shutdown_flag: &Arc<AtomicBool>,
|
||||
config: &Config,
|
||||
monitoring: &Option<crate::monitoring::MonitoringHandle>,
|
||||
) -> anyhow::Result<usize> {
|
||||
// Configuration constants
|
||||
const CHECKPOINT_INTERVAL: usize = 50;
|
||||
const FSYNC_BATCH_SIZE: usize = 10;
|
||||
const FSYNC_INTERVAL_SECS: u64 = 10;
|
||||
const CONCURRENCY_LIMIT: usize = 100;
|
||||
|
||||
// Wrap pool in mutex for potential replacement
|
||||
let pool_mutex = Arc::new(tokio::sync::Mutex::new(Arc::clone(pool)));
|
||||
|
||||
// Synchronization for hard reset
|
||||
let reset_in_progress = Arc::new(tokio::sync::Mutex::new(false));
|
||||
|
||||
let path = DataPaths::new(".")?;
|
||||
let corporate_path = path.data_dir().join("corporate").join("by_name");
|
||||
let securities_checkpoint = corporate_path.join("common_stocks.jsonl");
|
||||
let securities_log = corporate_path.join("common_stocks.log.jsonl");
|
||||
|
||||
if !securities_checkpoint.exists() {
|
||||
logger::log_warn("No common_stocks.jsonl found").await;
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
// Load securities from checkpoint and replay log
|
||||
logger::log_info("Loading common stocks from JSONL checkpoint and log...").await;
|
||||
let securities = load_securities_from_jsonl(&securities_checkpoint, &securities_log).await?;
|
||||
logger::log_info(&format!("Loaded {} companies from common stocks", securities.len())).await;
|
||||
|
||||
let companies_path = paths.data_dir().join("companies.jsonl");
|
||||
let log_path = paths.data_dir().join("companies_updates.log");
|
||||
|
||||
if let Some(parent) = companies_path.parent() {
|
||||
tokio::fs::create_dir_all(parent).await?;
|
||||
}
|
||||
|
||||
// === RECOVERY PHASE: Load checkpoint + replay log ===
|
||||
let mut existing_companies: HashMap<String, CompanyCrossPlatformInfo> = HashMap::new();
|
||||
let mut processed_names: std::collections::HashSet<String> = std::collections::HashSet::new();
|
||||
|
||||
if companies_path.exists() {
|
||||
logger::log_info("Loading checkpoint from companies.jsonl...").await;
|
||||
let existing_content = tokio::fs::read_to_string(&companies_path).await?;
|
||||
|
||||
for line in existing_content.lines() {
|
||||
if line.trim().is_empty() || !line.ends_with('}') {
|
||||
continue; // Skip incomplete lines
|
||||
}
|
||||
|
||||
match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
|
||||
Ok(company) => {
|
||||
processed_names.insert(company.name.clone());
|
||||
existing_companies.insert(company.name.clone(), company);
|
||||
}
|
||||
Err(e) => {
|
||||
logger::log_warn(&format!("Skipping invalid checkpoint line: {}", e)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
logger::log_info(&format!("Loaded checkpoint with {} companies", existing_companies.len())).await;
|
||||
}
|
||||
|
||||
if log_path.exists() {
|
||||
logger::log_info("Replaying update log...").await;
|
||||
let log_content = tokio::fs::read_to_string(&log_path).await?;
|
||||
let mut replayed = 0;
|
||||
|
||||
for line in log_content.lines() {
|
||||
if line.trim().is_empty() || !line.ends_with('}') {
|
||||
continue; // Skip incomplete lines
|
||||
}
|
||||
|
||||
match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
|
||||
Ok(company) => {
|
||||
processed_names.insert(company.name.clone());
|
||||
existing_companies.insert(company.name.clone(), company);
|
||||
replayed += 1;
|
||||
}
|
||||
Err(e) => {
|
||||
logger::log_warn(&format!("Skipping invalid log line: {}", e)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
if replayed > 0 {
|
||||
logger::log_info(&format!("Replayed {} updates from log", replayed)).await;
|
||||
}
|
||||
}
|
||||
|
||||
// === SETUP LOG WRITER TASK ===
|
||||
let (write_tx, mut write_rx) = mpsc::channel::<LogCommand>(1000);
|
||||
|
||||
let log_file_init = OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(&log_path)
|
||||
.await?;
|
||||
|
||||
let companies_path_clone = companies_path.clone();
|
||||
let log_path_clone = log_path.clone();
|
||||
let existing_companies_writer = Arc::new(tokio::sync::Mutex::new(existing_companies.clone()));
|
||||
|
||||
// Clone the Arc for the writer task (Arc clone is cheap, just increments ref count)
|
||||
let existing_companies_writer_for_task = Arc::clone(&existing_companies_writer);
|
||||
|
||||
let write_tx_for_writer = write_tx.clone();
|
||||
let writer_task = tokio::spawn(async move {
|
||||
let mut log_file = log_file_init;
|
||||
let mut writes_since_fsync = 0;
|
||||
let mut last_fsync = std::time::Instant::now();
|
||||
let mut updates_since_checkpoint = 0;
|
||||
let mut count = 0;
|
||||
let mut new_count = 0;
|
||||
let mut updated_count = 0;
|
||||
|
||||
while let Some(cmd) = write_rx.recv().await {
|
||||
match cmd {
|
||||
LogCommand::Write(company) => {
|
||||
// Write to log
|
||||
let line = serde_json::to_string(&company).unwrap();
|
||||
if let Err(e) = log_file.write_all(line.as_bytes()).await {
|
||||
logger::log_error(&format!("Failed to write to log: {}", e)).await;
|
||||
break;
|
||||
}
|
||||
if let Err(e) = log_file.write_all(b"\n").await {
|
||||
logger::log_error(&format!("Failed to write newline: {}", e)).await;
|
||||
break;
|
||||
}
|
||||
|
||||
writes_since_fsync += 1;
|
||||
updates_since_checkpoint += 1;
|
||||
count += 1;
|
||||
|
||||
// Update in-memory state
|
||||
let mut existing_companies = existing_companies_writer_for_task.lock().await;
|
||||
let is_update = existing_companies.contains_key(&company.name);
|
||||
existing_companies.insert(company.name.clone(), company);
|
||||
drop(existing_companies);
|
||||
|
||||
if is_update {
|
||||
updated_count += 1;
|
||||
} else {
|
||||
new_count += 1;
|
||||
}
|
||||
|
||||
// Batched + time-based fsync
|
||||
let should_fsync = writes_since_fsync >= FSYNC_BATCH_SIZE
|
||||
|| last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS;
|
||||
|
||||
if should_fsync {
|
||||
if let Err(e) = log_file.flush().await {
|
||||
logger::log_error(&format!("Failed to flush: {}", e)).await;
|
||||
break;
|
||||
}
|
||||
if let Err(e) = log_file.sync_data().await {
|
||||
logger::log_error(&format!("Failed to fsync: {}", e)).await;
|
||||
break;
|
||||
}
|
||||
writes_since_fsync = 0;
|
||||
last_fsync = std::time::Instant::now();
|
||||
}
|
||||
}
|
||||
LogCommand::Checkpoint => {
|
||||
if let Err(e) = log_file.flush().await {
|
||||
logger::log_error(&format!("Failed to flush before checkpoint: {}", e)).await;
|
||||
break;
|
||||
}
|
||||
if let Err(e) = log_file.sync_data().await {
|
||||
logger::log_error(&format!("Failed to fsync before checkpoint: {}", e)).await;
|
||||
break;
|
||||
}
|
||||
|
||||
let existing_companies = existing_companies_writer_for_task.lock().await;
|
||||
let companies_vec: Vec<_> = existing_companies.values().cloned().collect();
|
||||
drop(existing_companies);
|
||||
|
||||
let temp_path = companies_path_clone.with_extension("tmp");
|
||||
match tokio::fs::File::create(&temp_path).await {
|
||||
Ok(mut temp_file) => {
|
||||
let mut checkpoint_ok = true;
|
||||
for company in &companies_vec {
|
||||
if let Ok(line) = serde_json::to_string(company) {
|
||||
if temp_file.write_all(line.as_bytes()).await.is_err() ||
|
||||
temp_file.write_all(b"\n").await.is_err() {
|
||||
checkpoint_ok = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if checkpoint_ok {
|
||||
if temp_file.flush().await.is_ok() &&
|
||||
temp_file.sync_data().await.is_ok() {
|
||||
drop(temp_file);
|
||||
|
||||
if tokio::fs::rename(&temp_path, &companies_path_clone).await.is_ok() {
|
||||
if tokio::fs::remove_file(&log_path_clone).await.is_ok() {
|
||||
logger::log_info(&format!(
|
||||
"✓ Checkpoint created ({} companies), log cleared",
|
||||
companies_vec.len()
|
||||
)).await;
|
||||
|
||||
if let Ok(new_log) = OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(&log_path_clone)
|
||||
.await {
|
||||
log_file = new_log;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
logger::log_error(&format!("Failed to create checkpoint temp file: {}", e)).await;
|
||||
}
|
||||
}
|
||||
updates_since_checkpoint = 0;
|
||||
}
|
||||
LogCommand::Shutdown => {
|
||||
logger::log_info("Writer shutting down...").await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Periodic checkpoint trigger
|
||||
if updates_since_checkpoint >= CHECKPOINT_INTERVAL {
|
||||
let _ = write_tx.send(LogCommand::Checkpoint).await;
|
||||
}
|
||||
}
|
||||
|
||||
// Final fsync
|
||||
let _ = log_file.flush().await;
|
||||
let _ = log_file.sync_data().await;
|
||||
|
||||
logger::log_info(&format!(
|
||||
"Writer finished: {} total ({} new, {} updated)",
|
||||
count, new_count, updated_count
|
||||
)).await;
|
||||
|
||||
(count, new_count, updated_count)
|
||||
});
|
||||
|
||||
// === MAIN PROCESSING LOOP ===
|
||||
let total = securities.len();
|
||||
logger::log_info(&format!("Processing {} companies with concurrency limit {}", total, CONCURRENCY_LIMIT)).await;
|
||||
|
||||
let mut tasks = FuturesUnordered::new();
|
||||
|
||||
// Build initial pending list with proper filtering
|
||||
let mut pending: Vec<(String, CompanyInfo)> = securities.iter()
|
||||
.filter(|(name, info)| company_needs_processing(name, info, &existing_companies))
|
||||
.map(|(name, info)| (name.clone(), info.clone()))
|
||||
.collect();
|
||||
|
||||
logger::log_info(&format!(
|
||||
"Initial scan: {} companies need processing ({} already complete)",
|
||||
pending.len(),
|
||||
total - pending.len()
|
||||
)).await;
|
||||
|
||||
let mut processed = 0;
|
||||
let mut hard_reset_count = 0;
|
||||
|
||||
// Spawn initial batch
|
||||
for _ in 0..CONCURRENCY_LIMIT.min(pending.len()) {
|
||||
if let Some((name, company_info)) = pending.pop() {
|
||||
let current_pool = {
|
||||
let pool_guard = pool_mutex.lock().await;
|
||||
Arc::clone(&*pool_guard)
|
||||
};
|
||||
|
||||
let existing = existing_companies.get(&name).cloned();
|
||||
let shutdown_flag_clone = Arc::clone(shutdown_flag);
|
||||
|
||||
let task = tokio::spawn(async move {
|
||||
process_single_company_validated(
|
||||
name,
|
||||
company_info,
|
||||
existing,
|
||||
¤t_pool,
|
||||
&shutdown_flag_clone,
|
||||
).await
|
||||
});
|
||||
|
||||
tasks.push(task);
|
||||
}
|
||||
}
|
||||
|
||||
// Process results and spawn new tasks
|
||||
while let Some(task_result) = tasks.next().await {
|
||||
// Check for shutdown
|
||||
if shutdown_flag.load(Ordering::SeqCst) {
|
||||
logger::log_warn("Shutdown signal received, stopping processing").await;
|
||||
break;
|
||||
}
|
||||
|
||||
match task_result {
|
||||
Ok(Ok(Some(result))) => {
|
||||
// Success: send to writer
|
||||
let _ = write_tx_for_writer.send(LogCommand::Write(result.company)).await;
|
||||
processed += 1;
|
||||
|
||||
// Log progress every 100 companies
|
||||
if processed % 100 == 0 {
|
||||
logger::log_info(&format!(
|
||||
"Progress: {}/{} companies processed ({} resets)",
|
||||
processed,
|
||||
total,
|
||||
hard_reset_count
|
||||
)).await;
|
||||
}
|
||||
|
||||
// Spawn next task if available
|
||||
if let Some((name, company_info)) = pending.pop() {
|
||||
let current_pool = {
|
||||
let pool_guard = pool_mutex.lock().await;
|
||||
Arc::clone(&*pool_guard)
|
||||
};
|
||||
|
||||
let existing = existing_companies.get(&name).cloned();
|
||||
let shutdown_flag_clone = Arc::clone(shutdown_flag);
|
||||
|
||||
let task = tokio::spawn(async move {
|
||||
process_single_company_validated(
|
||||
name,
|
||||
company_info,
|
||||
existing,
|
||||
¤t_pool,
|
||||
&shutdown_flag_clone,
|
||||
).await
|
||||
});
|
||||
|
||||
tasks.push(task);
|
||||
}
|
||||
}
|
||||
Ok(Ok(None)) => {
|
||||
// No result (shutdown or skip)
|
||||
processed += 1;
|
||||
|
||||
if let Some((name, company_info)) = pending.pop() {
|
||||
let current_pool = {
|
||||
let pool_guard = pool_mutex.lock().await;
|
||||
Arc::clone(&*pool_guard)
|
||||
};
|
||||
|
||||
let existing = existing_companies.get(&name).cloned();
|
||||
let shutdown_flag_clone = Arc::clone(shutdown_flag);
|
||||
|
||||
let task = tokio::spawn(async move {
|
||||
process_single_company_validated(
|
||||
name,
|
||||
company_info,
|
||||
existing,
|
||||
¤t_pool,
|
||||
&shutdown_flag_clone,
|
||||
).await
|
||||
});
|
||||
|
||||
tasks.push(task);
|
||||
}
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
let error_msg = e.to_string();
|
||||
|
||||
if error_msg.contains("HARD_RESET_REQUIRED") {
|
||||
// Check if reset already in progress (race condition protection)
|
||||
let mut reset_lock = reset_in_progress.lock().await;
|
||||
if *reset_lock {
|
||||
logger::log_info("Hard reset already in progress, skipping duplicate").await;
|
||||
processed += 1;
|
||||
continue;
|
||||
}
|
||||
*reset_lock = true;
|
||||
drop(reset_lock); // Release lock during reset
|
||||
|
||||
logger::log_error("🔴 HARD RESET THRESHOLD REACHED - INITIATING RESET SEQUENCE").await;
|
||||
logger::log_warn("Draining active tasks before hard reset...").await;
|
||||
|
||||
// Save remaining pending count
|
||||
let remaining_count = pending.len();
|
||||
|
||||
// Stop spawning new tasks
|
||||
pending.clear();
|
||||
|
||||
// Wait for all active tasks to complete
|
||||
let mut drained = 0;
|
||||
while let Some(_) = tasks.next().await {
|
||||
drained += 1;
|
||||
if drained % 10 == 0 {
|
||||
logger::log_info(&format!("Drained {} tasks...", drained)).await;
|
||||
}
|
||||
}
|
||||
|
||||
logger::log_info(&format!(
|
||||
"All tasks drained ({} active). {} companies need reprocessing.",
|
||||
drained,
|
||||
remaining_count
|
||||
)).await;
|
||||
|
||||
// Perform the actual hard reset
|
||||
match perform_hard_reset(&pool_mutex, config, paths, monitoring, shutdown_flag).await {
|
||||
Ok(()) => {
|
||||
logger::log_info("✅ Hard reset completed successfully").await;
|
||||
hard_reset_count += 1;
|
||||
|
||||
// Reset the error counter
|
||||
{
|
||||
let pool_guard = pool_mutex.lock().await;
|
||||
let current_pool = Arc::clone(&*pool_guard);
|
||||
current_pool.get_reset_controller().reset();
|
||||
}
|
||||
logger::log_info("✓ Error counter cleared").await;
|
||||
|
||||
// Rebuild pending list by checking which companies need processing
|
||||
logger::log_info("Rebuilding pending queue with proper Yahoo data checks...").await;
|
||||
|
||||
// Get current state of written companies
|
||||
let current_existing = {
|
||||
let companies = existing_companies_writer.lock().await;
|
||||
companies.clone()
|
||||
};
|
||||
|
||||
// Reload all securities from disk (checkpoint + log)
|
||||
logger::log_info("Reloading securities from JSONL...").await;
|
||||
let all_securities = load_securities_from_jsonl(&securities_checkpoint, &securities_log).await?;
|
||||
logger::log_info(&format!("Reloaded {} companies", all_securities.len())).await;
|
||||
|
||||
// Build pending list: only companies that need processing
|
||||
pending = all_securities.iter()
|
||||
.filter(|(name, info)| company_needs_processing(name, info, ¤t_existing))
|
||||
.map(|(name, info)| (name.clone(), info.clone()))
|
||||
.collect();
|
||||
|
||||
logger::log_info(&format!(
|
||||
"Restarting with {} remaining companies (out of {} total)",
|
||||
pending.len(),
|
||||
total
|
||||
)).await;
|
||||
|
||||
// Only continue if there's work to do
|
||||
if pending.is_empty() {
|
||||
logger::log_info("All companies have complete data, exiting").await;
|
||||
|
||||
// Clear reset flag
|
||||
let mut reset_lock = reset_in_progress.lock().await;
|
||||
*reset_lock = false;
|
||||
drop(reset_lock);
|
||||
|
||||
break; // Exit main loop
|
||||
}
|
||||
|
||||
// Respawn initial batch with NEW pool
|
||||
for _ in 0..CONCURRENCY_LIMIT.min(pending.len()) {
|
||||
if let Some((name, company_info)) = pending.pop() {
|
||||
let current_pool = {
|
||||
let pool_guard = pool_mutex.lock().await;
|
||||
Arc::clone(&*pool_guard)
|
||||
};
|
||||
|
||||
let existing = existing_companies.get(&name).cloned();
|
||||
let shutdown_flag_clone = Arc::clone(shutdown_flag);
|
||||
|
||||
let task = tokio::spawn(async move {
|
||||
process_single_company_validated(
|
||||
name,
|
||||
company_info,
|
||||
existing,
|
||||
¤t_pool,
|
||||
&shutdown_flag_clone,
|
||||
).await
|
||||
});
|
||||
|
||||
tasks.push(task);
|
||||
}
|
||||
}
|
||||
|
||||
// Clear reset flag
|
||||
let mut reset_lock = reset_in_progress.lock().await;
|
||||
*reset_lock = false;
|
||||
drop(reset_lock);
|
||||
|
||||
// ✅ Continue processing (don't spawn duplicate task)
|
||||
continue;
|
||||
}
|
||||
Err(reset_err) => {
|
||||
logger::log_error(&format!("Hard reset failed: {}", reset_err)).await;
|
||||
|
||||
// Clear reset flag
|
||||
let mut reset_lock = reset_in_progress.lock().await;
|
||||
*reset_lock = false;
|
||||
drop(reset_lock);
|
||||
|
||||
// Exit if hard reset fails
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Regular error
|
||||
logger::log_warn(&format!("Company processing error: {}", error_msg)).await;
|
||||
processed += 1;
|
||||
|
||||
// Spawn next task
|
||||
if let Some((name, company_info)) = pending.pop() {
|
||||
let current_pool = {
|
||||
let pool_guard = pool_mutex.lock().await;
|
||||
Arc::clone(&*pool_guard)
|
||||
};
|
||||
|
||||
let existing = existing_companies.get(&name).cloned();
|
||||
let shutdown_flag_clone = Arc::clone(shutdown_flag);
|
||||
|
||||
let task = tokio::spawn(async move {
|
||||
process_single_company_validated(
|
||||
name,
|
||||
company_info,
|
||||
existing,
|
||||
¤t_pool,
|
||||
&shutdown_flag_clone,
|
||||
).await
|
||||
});
|
||||
|
||||
tasks.push(task);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
// Task panic
|
||||
logger::log_error(&format!("Task panic: {}", e)).await;
|
||||
processed += 1;
|
||||
|
||||
// Spawn next task
|
||||
if let Some((name, company_info)) = pending.pop() {
|
||||
let current_pool = {
|
||||
let pool_guard = pool_mutex.lock().await;
|
||||
Arc::clone(&*pool_guard)
|
||||
};
|
||||
|
||||
let existing = existing_companies.get(&name).cloned();
|
||||
let shutdown_flag_clone = Arc::clone(shutdown_flag);
|
||||
|
||||
let task = tokio::spawn(async move {
|
||||
process_single_company_validated(
|
||||
name,
|
||||
company_info,
|
||||
existing,
|
||||
¤t_pool,
|
||||
&shutdown_flag_clone,
|
||||
).await
|
||||
});
|
||||
|
||||
tasks.push(task);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger::log_info("Main processing loop completed").await;
|
||||
|
||||
// Signal writer to finish
|
||||
let _ = write_tx_for_writer.send(LogCommand::Checkpoint).await;
|
||||
let _ = write_tx_for_writer.send(LogCommand::Shutdown).await;
|
||||
drop(write_tx_for_writer);
|
||||
|
||||
// Wait for writer to finish
|
||||
let (final_count, final_new, final_updated) = writer_task.await
|
||||
.unwrap_or((0, 0, 0));
|
||||
|
||||
logger::log_info(&format!(
|
||||
"✅ Completed: {} total companies ({} new, {} updated, {} hard resets)",
|
||||
final_count, final_new, final_updated, hard_reset_count
|
||||
)).await;
|
||||
|
||||
Ok(final_count)
|
||||
}
|
||||
|
||||
/// Loads CompanyInfo securities from checkpoint and log JSONL files
|
||||
async fn load_securities_from_jsonl(
|
||||
checkpoint_path: &std::path::Path,
|
||||
log_path: &std::path::Path,
|
||||
) -> anyhow::Result<HashMap<String, CompanyInfo>> {
|
||||
let mut securities: HashMap<String, CompanyInfo> = HashMap::new();
|
||||
|
||||
// Load checkpoint
|
||||
if checkpoint_path.exists() {
|
||||
let content = tokio::fs::read_to_string(checkpoint_path).await?;
|
||||
|
||||
for (line_num, line) in content.lines().enumerate() {
|
||||
if line.trim().is_empty() || !line.ends_with('}') {
|
||||
continue; // Skip incomplete lines
|
||||
}
|
||||
|
||||
match serde_json::from_str::<CompanyInfo>(line) {
|
||||
Ok(company_info) => {
|
||||
securities.insert(company_info.name.clone(), company_info);
|
||||
}
|
||||
Err(e) => {
|
||||
logger::log_warn(&format!(
|
||||
"Skipping invalid line {} in checkpoint: {}",
|
||||
line_num + 1, e
|
||||
)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Replay log (overwrites checkpoint entries if they exist)
|
||||
if log_path.exists() {
|
||||
let content = tokio::fs::read_to_string(log_path).await?;
|
||||
|
||||
for (line_num, line) in content.lines().enumerate() {
|
||||
if line.trim().is_empty() || !line.ends_with('}') {
|
||||
continue; // Skip incomplete lines
|
||||
}
|
||||
|
||||
match serde_json::from_str::<CompanyInfo>(line) {
|
||||
Ok(company_info) => {
|
||||
securities.insert(company_info.name.clone(), company_info);
|
||||
}
|
||||
Err(e) => {
|
||||
logger::log_warn(&format!(
|
||||
"Skipping invalid line {} in log: {}",
|
||||
line_num + 1, e
|
||||
)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(securities)
|
||||
}
|
||||
|
||||
/// Scrape with retry, validation, and shutdown awareness
|
||||
async fn scrape_with_retry(
|
||||
pool: &Arc<ChromeDriverPool>,
|
||||
isin: &str,
|
||||
max_retries: u32,
|
||||
shutdown_flag: &Arc<AtomicBool>,
|
||||
) -> Result<Option<YahooCompanyDetails>> {
|
||||
let mut retries = 0;
|
||||
|
||||
loop {
|
||||
// Check shutdown before each attempt
|
||||
if shutdown_flag.load(Ordering::SeqCst) {
|
||||
return Err(anyhow!("Aborted due to shutdown"));
|
||||
}
|
||||
|
||||
if pool.should_perform_hard_reset() {
|
||||
logger::log_error("HARD_RESET_REQUIRED detected before scrape attempt").await;
|
||||
return Err(anyhow!("HARD_RESET_REQUIRED"));
|
||||
}
|
||||
|
||||
match scrape_company_details_by_isin(pool, isin, shutdown_flag).await {
|
||||
Ok(result) => return Ok(result),
|
||||
Err(e) => {
|
||||
// Check if this is a hard reset required error
|
||||
let error_msg = e.to_string();
|
||||
if error_msg.contains("HARD_RESET_REQUIRED") {
|
||||
logger::log_error(&format!(
|
||||
"Hard reset required error for ISIN {}, propagating immediately",
|
||||
isin
|
||||
)).await;
|
||||
return Err(e); // Propagate immediately, don't retry
|
||||
}
|
||||
|
||||
if retries >= max_retries {
|
||||
logger::log_error(&format!(
|
||||
"All {} retries exhausted for ISIN {}: {}",
|
||||
max_retries, isin, e
|
||||
)).await;
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
let backoff_ms = 1000 * 2u64.pow(retries);
|
||||
let jitter_ms = random_range(0, 500);
|
||||
let total_delay = backoff_ms + jitter_ms;
|
||||
|
||||
logger::log_warn(&format!(
|
||||
"Retry {}/{} for ISIN {} after {}ms: {}",
|
||||
retries + 1, max_retries, isin, total_delay, e
|
||||
)).await;
|
||||
|
||||
sleep(Duration::from_millis(total_delay)).await;
|
||||
retries += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Process single company with validation and shutdown checks
|
||||
async fn process_single_company_validated(
|
||||
name: String,
|
||||
company_info: CompanyInfo,
|
||||
existing_entry: Option<CompanyCrossPlatformInfo>,
|
||||
pool: &Arc<ChromeDriverPool>,
|
||||
shutdown_flag: &Arc<AtomicBool>,
|
||||
) -> anyhow::Result<Option<CompanyProcessResult>> {
|
||||
// Check shutdown at start
|
||||
if shutdown_flag.load(Ordering::SeqCst) {
|
||||
logger::log_warn(&format!("Shutdown detected, skipping company: {}", name)).await;
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let is_update = existing_entry.is_some();
|
||||
|
||||
let mut isin_tickers_map: HashMap<String, Vec<String>> =
|
||||
existing_entry
|
||||
.as_ref()
|
||||
.map(|e| e.isin_tickers_map.clone())
|
||||
.unwrap_or_default();
|
||||
|
||||
let mut sector = existing_entry.as_ref().and_then(|e| e.sector.clone());
|
||||
let mut exchange = existing_entry.as_ref().and_then(|e| e.exchange.clone());
|
||||
|
||||
// Collect unique ISIN-ticker pairs
|
||||
let mut unique_isin_ticker_pairs: HashMap<String, Vec<String>> = HashMap::new();
|
||||
|
||||
for figi_infos in company_info.securities.values() {
|
||||
for figi_info in figi_infos {
|
||||
if !figi_info.isin.is_empty() {
|
||||
let tickers = unique_isin_ticker_pairs
|
||||
.entry(figi_info.isin.clone())
|
||||
.or_insert_with(Vec::new);
|
||||
|
||||
if !figi_info.ticker.is_empty() && !tickers.contains(&figi_info.ticker) {
|
||||
tickers.push(figi_info.ticker.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Process each ISIN independently with per-ISIN status checking
|
||||
for (isin, figi_tickers) in unique_isin_ticker_pairs {
|
||||
// Check shutdown before each ISIN
|
||||
if shutdown_flag.load(Ordering::SeqCst) {
|
||||
logger::log_warn(&format!(
|
||||
"Shutdown detected while processing company: {}",
|
||||
name
|
||||
)).await;
|
||||
break;
|
||||
}
|
||||
|
||||
let tickers = isin_tickers_map
|
||||
.entry(isin.clone())
|
||||
.or_insert_with(Vec::new);
|
||||
|
||||
for figi_ticker in figi_tickers {
|
||||
if !tickers.contains(&figi_ticker) {
|
||||
tickers.push(figi_ticker);
|
||||
}
|
||||
}
|
||||
|
||||
// Check if THIS SPECIFIC ISIN has valid Yahoo data (not ERROR)
|
||||
let has_valid_yahoo = tickers.iter().any(|t| {
|
||||
t.starts_with("YAHOO:") && t != "YAHOO:ERROR"
|
||||
// Note: YAHOO:NO_RESULTS is valid (legitimately not found)
|
||||
});
|
||||
|
||||
if !has_valid_yahoo {
|
||||
logger::log_info(&format!("Fetching Yahoo details for {} (ISIN: {})", name, isin)).await;
|
||||
tickers.retain(|t| !t.starts_with("YAHOO:"));
|
||||
|
||||
match scrape_with_retry(pool, &isin, 3, shutdown_flag).await {
|
||||
Ok(Some(details)) => {
|
||||
logger::log_info(&format!(
|
||||
"✓ Found Yahoo ticker {} for ISIN {} (company: {})",
|
||||
details.ticker, isin, name
|
||||
)).await;
|
||||
|
||||
tickers.push(format!("YAHOO:{}", details.ticker));
|
||||
|
||||
if sector.is_none() && details.sector.is_some() {
|
||||
sector = details.sector.clone();
|
||||
logger::log_info(&format!(" Sector: {}", details.sector.as_ref().unwrap())).await;
|
||||
}
|
||||
|
||||
if exchange.is_none() && details.exchange.is_some() {
|
||||
exchange = details.exchange.clone();
|
||||
logger::log_info(&format!(" Exchange: {}", details.exchange.as_ref().unwrap())).await;
|
||||
}
|
||||
},
|
||||
Ok(None) => {
|
||||
logger::log_warn(&format!("◯ No search results for ISIN {} (company: {})", isin, name)).await;
|
||||
tickers.push("YAHOO:NO_RESULTS".to_string());
|
||||
},
|
||||
Err(e) => {
|
||||
if shutdown_flag.load(Ordering::SeqCst) {
|
||||
logger::log_warn(&format!("Shutdown during scrape for ISIN {}", isin)).await;
|
||||
break;
|
||||
}
|
||||
|
||||
// Check if this is a hard reset required error
|
||||
let error_msg = e.to_string();
|
||||
if error_msg.contains("HARD_RESET_REQUIRED") {
|
||||
logger::log_error(&format!(
|
||||
"Hard reset required during ISIN {} processing, propagating error",
|
||||
isin
|
||||
)).await;
|
||||
return Err(e); // ← CRITICAL: Propagate immediately
|
||||
}
|
||||
|
||||
logger::log_warn(&format!(
|
||||
"✗ Yahoo lookup error for ISIN {} (company: {}): {}",
|
||||
isin, name, e
|
||||
)).await;
|
||||
|
||||
// Mark this ISIN as failed to enable retry
|
||||
tickers.push("YAHOO:ERROR".to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Final shutdown check before returning result
|
||||
if shutdown_flag.load(Ordering::SeqCst) {
|
||||
logger::log_warn(&format!(
|
||||
"Shutdown detected, discarding incomplete result for: {}",
|
||||
name
|
||||
)).await;
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
if pool.should_perform_hard_reset() {
|
||||
logger::log_error("HARD_RESET_REQUIRED detected during company processing").await;
|
||||
return Err(anyhow!("HARD_RESET_REQUIRED"));
|
||||
}
|
||||
|
||||
if !isin_tickers_map.is_empty() {
|
||||
let company_entry = CompanyCrossPlatformInfo {
|
||||
name: name.clone(),
|
||||
isin_tickers_map,
|
||||
sector,
|
||||
exchange,
|
||||
};
|
||||
|
||||
Ok(Some(CompanyProcessResult {
|
||||
company: company_entry,
|
||||
is_update,
|
||||
}))
|
||||
} else {
|
||||
logger::log_warn(&format!("No ISINs found for company: {}", name)).await;
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user