added event enrichment
@@ -2,15 +2,18 @@
 pub mod types;
 pub mod scraper;
 pub mod storage;
-pub mod update;
 pub mod helpers;
 pub mod aggregation;
 pub mod fx;
 pub mod openfigi;
 pub mod yahoo;
-pub mod update_companies;
-pub mod update_companies_cleanse;
 pub mod page_validation;
 pub mod atomic_writer;
 
+// Corporate update modules
+pub mod update;
+pub mod update_companies;
+pub mod update_companies_cleanse;
+pub mod update_companies_enrich;
+
 pub use update::run_full_update;
@@ -3,6 +3,7 @@ use super::{scraper::*, storage::*, openfigi::*};
 use crate::config::Config;
 use crate::corporate::update_companies::build_companies_jsonl_streaming_parallel;
 use crate::corporate::update_companies_cleanse::{companies_yahoo_cleansed_low_profile, companies_yahoo_cleansed_no_data};
+use crate::corporate::update_companies_enrich::enrich_companies_with_events;
 use crate::util::directories::DataPaths;
 use crate::util::logger;
 use crate::scraper::webdriver::ChromeDriverPool;
@@ -87,16 +88,16 @@ pub async fn run_full_update(
         return Ok(());
     }
 
-    logger::log_info("Step 6: Cleansing up companies with missing essential data...").await;
+    logger::log_info("Step 6: Cleansing companies with missing essential data...").await;
     let cleansed_count = companies_yahoo_cleansed_no_data(&paths).await?;
     logger::log_info(&format!(" ✓ {} companies found on Yahoo ready for further use in companies_yahoo.jsonl", cleansed_count)).await;
 
     if shutdown_flag.load(Ordering::SeqCst) {
-        logger::log_warn("Shutdown detected after companies.jsonl build").await;
+        logger::log_warn("Shutdown detected after no-data cleansing").await;
         return Ok(());
     }
 
-    logger::log_info("Step 7: Cleansing up companies with too low profile (with abort-safe persistence)...").await;
+    logger::log_info("Step 7: Cleansing companies with too low profile (with abort-safe persistence)...").await;
     let proxy_pool = pool.get_proxy_pool()
         .ok_or_else(|| anyhow::anyhow!("ChromeDriverPool must be created with VPN proxy rotation enabled"))?;
 
@@ -104,11 +105,25 @@ pub async fn run_full_update(
     let yahoo_pool = Arc::new(YahooClientPool::new(proxy_pool, config, None).await?);
     logger::log_info(&format!("✓ YahooClientPool ready with {} clients", yahoo_pool.num_clients().await)).await;
 
-    let cleansed_count = companies_yahoo_cleansed_low_profile(&paths, config, yahoo_pool, shutdown_flag).await?;
+    let cleansed_count = companies_yahoo_cleansed_low_profile(&paths, config, yahoo_pool.clone(), shutdown_flag).await?;
     logger::log_info(&format!(" ✓ {} companies with sufficient profile ready for analytics", cleansed_count)).await;
 
+    if shutdown_flag.load(Ordering::SeqCst) {
+        logger::log_warn("Shutdown detected after low-profile cleansing").await;
+        return Ok(());
+    }
+
+    logger::log_info("Step 8: Enriching companies with Yahoo Events (with abort-safe persistence)...").await;
+    let enriched_count = enrich_companies_with_events(&paths, config, yahoo_pool.clone(), shutdown_flag).await?;
+    logger::log_info(&format!(" ✓ {} companies enriched with event data", enriched_count)).await;
+
+    if shutdown_flag.load(Ordering::SeqCst) {
+        logger::log_warn("Shutdown detected after event enrichment").await;
+        return Ok(());
+    }
+
     if !shutdown_flag.load(Ordering::SeqCst) {
-        logger::log_info("Step 8: Processing events (using index)...").await;
+        logger::log_info("Step 9: Processing events (using index)...").await;
         let _event_index = build_event_index(&paths).await?;
         logger::log_info(" ✓ Event index built").await;
     } else {
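Note on the `yahoo_pool.clone()` change above: the pool is shared through an `Arc`, so the clone is a cheap reference-count bump that lets the low-profile cleansing step and the new enrichment step reuse the same clients. A minimal sketch of the pattern (the `Pool` type and the two functions are stand-ins, not the real API):

    use std::sync::Arc;

    struct Pool; // stand-in for the real YahooClientPool

    fn cleanse(_pool: Arc<Pool>) {}
    fn enrich(_pool: Arc<Pool>) {}

    fn main() {
        let pool = Arc::new(Pool);
        cleanse(Arc::clone(&pool)); // refcount bump, same underlying clients
        enrich(pool);               // the last consumer can take the handle by value
    }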
@@ -174,7 +174,7 @@ pub async fn companies_yahoo_cleansed_no_data(paths: &DataPaths) -> Result<usize
 /// - Batched fsync (every 10 writes or 10 seconds)
 pub async fn companies_yahoo_cleansed_low_profile(
     paths: &DataPaths,
-    config: &Config,
+    _config: &Config,
     yahoo_pool: Arc<YahooClientPool>,
     shutdown_flag: &Arc<AtomicBool>,
 ) -> anyhow::Result<usize> {
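Note: renaming the parameter to `_config` keeps the function signature and every call site unchanged while telling the compiler the value is intentionally unused, which silences the `unused_variables` lint.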
@@ -1,22 +1,22 @@
-// src/corporate/update_companies_enrich.rs
-use super::{helpers::*, types::*};
+// src/corporate/update_companies_enrich_events.rs
+use super::{types::*};
 use crate::config::Config;
 use crate::util::directories::DataPaths;
 use crate::util::logger;
 use crate::scraper::yahoo::{YahooClientPool, QuoteSummaryModule};
 
 use std::result::Result::Ok;
-use chrono::{Local, Utc};
-use std::collections::HashMap;
+use chrono::Utc;
+use std::collections::{HashSet};
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use tokio::fs::{File, OpenOptions};
-use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
+use tokio::fs::{OpenOptions};
+use tokio::io::{AsyncWriteExt};
 use futures::stream::{FuturesUnordered, StreamExt};
 use serde_json::json;
 use tokio::sync::mpsc;
 
-/// Yahoo enriching data per corporate
+/// Yahoo Event enrichment per corporate company
 ///
 /// # Features
 /// - Graceful shutdown (abort-safe)
@@ -28,7 +28,515 @@ use tokio::sync::mpsc;
 ///
 /// # Persistence Strategy
 /// - Checkpoint: companies_yahoo_cleaned.jsonl (atomic state)
-/// - Log: companies_update.log (append-only updates)
+/// - Log: companies_events_updates.log (append-only updates)
 /// - On restart: Load checkpoint + replay log
 /// - Periodic checkpoints (every 50 companies)
 /// - Batched fsync (every 10 writes or 10 seconds)
+pub async fn enrich_companies_with_events(
+    paths: &DataPaths,
+    _config: &Config,
+    yahoo_pool: Arc<YahooClientPool>,
+    shutdown_flag: &Arc<AtomicBool>,
+) -> anyhow::Result<usize> {
+    // Configuration constants
+    const CHECKPOINT_INTERVAL: usize = 50;
+    const FSYNC_BATCH_SIZE: usize = 10;
+    const FSYNC_INTERVAL_SECS: u64 = 10;
+    const CONCURRENCY_LIMIT: usize = 50; // Limit parallel enrichment tasks
+
+    let data_path = paths.data_dir();
+
+    // File paths
+    let input_path = data_path.join("companies_yahoo_cleaned.jsonl");
+    let log_path = data_path.join("companies_events_updates.log");
+    let state_path = data_path.join("state.jsonl");
+
+    // Check input exists
+    if !input_path.exists() {
+        logger::log_warn(" companies_yahoo_cleaned.jsonl not found, skipping event enrichment").await;
+        return Ok(0);
+    }
+
+    // Check if already completed
+    if state_path.exists() {
+        let state_content = tokio::fs::read_to_string(&state_path).await?;
+
+        for line in state_content.lines() {
+            if line.trim().is_empty() {
+                continue;
+            }
+
+            if let Ok(state) = serde_json::from_str::<serde_json::Value>(line) {
+                if state.get("yahoo_events_enrichment_complete").and_then(|v| v.as_bool()).unwrap_or(false) {
+                    logger::log_info(" Yahoo events enrichment already completed").await;
+
+                    // Count enriched companies
+                    let count = count_enriched_companies(paths).await?;
+                    logger::log_info(&format!(" ✓ Found {} companies with event data", count)).await;
+                    return Ok(count);
+                }
+            }
+        }
+    }
+
+    // === RECOVERY PHASE: Track enriched companies ===
+    let mut enriched_companies: HashSet<String> = HashSet::new();
+
+    if log_path.exists() {
+        logger::log_info("Loading enrichment progress from log...").await;
+        let log_content = tokio::fs::read_to_string(&log_path).await?;
+
+        for line in log_content.lines() {
+            if line.trim().is_empty() || !line.ends_with('}') {
+                continue; // Skip incomplete lines
+            }
+
+            match serde_json::from_str::<serde_json::Value>(line) {
+                Ok(entry) => {
+                    if let Some(name) = entry.get("company_name").and_then(|v| v.as_str()) {
+                        if entry.get("status").and_then(|v| v.as_str()) == Some("enriched") {
+                            enriched_companies.insert(name.to_string());
+                        }
+                    }
+                }
+                Err(e) => {
+                    logger::log_warn(&format!("Skipping invalid log line: {}", e)).await;
+                }
+            }
+        }
+        logger::log_info(&format!("Loaded {} enriched companies from log", enriched_companies.len())).await;
+    }
+
+    // Load all companies from input
+    logger::log_info("Loading companies from companies_yahoo_cleaned.jsonl...").await;
+    let companies = load_companies_from_jsonl(&input_path).await?;
+    let total_companies = companies.len();
+    logger::log_info(&format!("Found {} companies to process", total_companies)).await;
+
+    // Filter companies that need enrichment
+    let pending_companies: Vec<CompanyCrossPlatformInfo> = companies
+        .into_iter()
+        .filter(|company| !enriched_companies.contains(&company.name))
+        .collect();
+
+    let pending_count = pending_companies.len();
+    logger::log_info(&format!(
+        " {} already enriched, {} pending",
+        enriched_companies.len(),
+        pending_count
+    )).await;
+
+    if pending_count == 0 {
+        logger::log_info(" ✓ All companies already enriched").await;
+        mark_enrichment_complete(&state_path).await?;
+        return Ok(enriched_companies.len());
+    }
+
+    // === PROCESSING PHASE: Enrich companies with events ===
+
+    // Shared counters
+    let processed_count = Arc::new(AtomicUsize::new(enriched_companies.len()));
+    let success_count = Arc::new(AtomicUsize::new(enriched_companies.len()));
+    let failed_count = Arc::new(AtomicUsize::new(0));
+
+    // Log writer channel with batching and fsync
+    let (log_tx, mut log_rx) = mpsc::channel::<LogCommand>(1000);
+
+    // Spawn log writer task
+    let log_writer_handle = {
+        let log_path = log_path.clone();
+        let processed_count = Arc::clone(&processed_count);
+        let total_companies = total_companies;
+
+        tokio::spawn(async move {
+            let mut log_file = OpenOptions::new()
+                .create(true)
+                .append(true)
+                .open(&log_path)
+                .await
+                .expect("Failed to open log file");
+
+            let mut write_count = 0;
+            let mut last_fsync = tokio::time::Instant::now();
+
+            while let Some(cmd) = log_rx.recv().await {
+                match cmd {
+                    LogCommand::Write(entry) => {
+                        let json_line = serde_json::to_string(&entry).expect("Serialization failed");
+                        log_file.write_all(json_line.as_bytes()).await.expect("Write failed");
+                        log_file.write_all(b"\n").await.expect("Write failed");
+
+                        write_count += 1;
+
+                        // Batched fsync
+                        if write_count >= FSYNC_BATCH_SIZE
+                            || last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS
+                        {
+                            log_file.flush().await.expect("Flush failed");
+                            log_file.sync_all().await.expect("Fsync failed");
+                            write_count = 0;
+                            last_fsync = tokio::time::Instant::now();
+                        }
+                    }
+                    LogCommand::Checkpoint => {
+                        // Force fsync on checkpoint
+                        log_file.flush().await.expect("Flush failed");
+                        log_file.sync_all().await.expect("Fsync failed");
+                        write_count = 0;
+                        last_fsync = tokio::time::Instant::now();
+
+                        let current = processed_count.load(Ordering::SeqCst);
+                        logger::log_info(&format!(
+                            " Checkpoint: {}/{} companies processed",
+                            current, total_companies
+                        )).await;
+                    }
+                    LogCommand::Shutdown => {
+                        // Final fsync before shutdown
+                        log_file.flush().await.expect("Flush failed");
+                        log_file.sync_all().await.expect("Fsync failed");
+                        break;
+                    }
+                }
+            }
+        })
+    };
+
+    // Process companies concurrently with task panic isolation
+    let mut tasks = FuturesUnordered::new();
+    let mut pending_iter = pending_companies.into_iter();
+    let semaphore = Arc::new(tokio::sync::Semaphore::new(CONCURRENCY_LIMIT));
+
+    // Initial batch of tasks
+    for _ in 0..CONCURRENCY_LIMIT.min(pending_count) {
+        if let Some(company) = pending_iter.next() {
+            let task = spawn_enrichment_task(
+                company,
+                Arc::clone(&yahoo_pool),
+                paths.clone(),
+                Arc::clone(&processed_count),
+                Arc::clone(&success_count),
+                Arc::clone(&failed_count),
+                log_tx.clone(),
+                Arc::clone(&semaphore),
+                Arc::clone(shutdown_flag),
+            );
+            tasks.push(task);
+        }
+    }
+
+    // Process results and spawn new tasks
+    let mut checkpoint_counter = enriched_companies.len();
+
+    while let Some(result) = tasks.next().await {
+        // Handle task result (even if panicked)
+        match result {
+            Ok(_) => {
+                // Task completed successfully
+            }
+            Err(e) => {
+                logger::log_warn(&format!("Task panicked: {}", e)).await;
+                failed_count.fetch_add(1, Ordering::SeqCst);
+            }
+        }
+
+        // Check for shutdown
+        if shutdown_flag.load(Ordering::SeqCst) {
+            logger::log_warn("Shutdown detected, stopping new enrichment tasks...").await;
+            break;
+        }
+
+        // Spawn next task
+        if let Some(company) = pending_iter.next() {
+            let task = spawn_enrichment_task(
+                company,
+                Arc::clone(&yahoo_pool),
+                paths.clone(),
+                Arc::clone(&processed_count),
+                Arc::clone(&success_count),
+                Arc::clone(&failed_count),
+                log_tx.clone(),
+                Arc::clone(&semaphore),
+                Arc::clone(shutdown_flag),
+            );
+            tasks.push(task);
+        }
+
+        // Periodic checkpoint
+        checkpoint_counter += 1;
+        if checkpoint_counter % CHECKPOINT_INTERVAL == 0 {
+            let _ = log_tx.send(LogCommand::Checkpoint).await;
+        }
+    }
+
+    // Shutdown log writer
+    let _ = log_tx.send(LogCommand::Shutdown).await;
+    drop(log_tx);
+
+    // Wait for log writer to finish
+    let _ = log_writer_handle.await;
+
+    let final_processed = processed_count.load(Ordering::SeqCst);
+    let final_success = success_count.load(Ordering::SeqCst);
+    let final_failed = failed_count.load(Ordering::SeqCst);
+
+    logger::log_info(&format!(
+        " Event enrichment summary: {} total, {} success, {} failed",
+        final_processed, final_success, final_failed
+    )).await;
+
+    // Mark as complete if all companies processed
+    if final_processed >= total_companies && !shutdown_flag.load(Ordering::SeqCst) {
+        mark_enrichment_complete(&state_path).await?;
+        logger::log_info(" ✓ Event enrichment marked as complete").await;
+    }
+
+    Ok(final_success)
+}
+
+/// Spawn a single enrichment task with panic isolation
+fn spawn_enrichment_task(
+    company: CompanyCrossPlatformInfo,
+    yahoo_pool: Arc<YahooClientPool>,
+    paths: DataPaths,
+    processed_count: Arc<AtomicUsize>,
+    success_count: Arc<AtomicUsize>,
+    failed_count: Arc<AtomicUsize>,
+    log_tx: mpsc::Sender<LogCommand>,
+    semaphore: Arc<tokio::sync::Semaphore>,
+    shutdown_flag: Arc<AtomicBool>,
+) -> tokio::task::JoinHandle<()> {
+    tokio::spawn(async move {
+        // Acquire semaphore permit
+        let _permit = semaphore.acquire().await.expect("Semaphore closed");
+
+        // Check shutdown before processing
+        if shutdown_flag.load(Ordering::SeqCst) {
+            return;
+        }
+
+        // Process company
+        let result = enrich_company_with_events(
+            &company,
+            &yahoo_pool,
+            &paths,
+        ).await;
+
+        // Update counters
+        processed_count.fetch_add(1, Ordering::SeqCst);
+
+        let status = match result {
+            Ok(_) => {
+                success_count.fetch_add(1, Ordering::SeqCst);
+                "enriched"
+            }
+            Err(e) => {
+                failed_count.fetch_add(1, Ordering::SeqCst);
+                logger::log_warn(&format!(
+                    " Failed to enrich {}: {}",
+                    company.name, e
+                )).await;
+                "failed"
+            }
+        };
+
+        // Log result
+        let log_entry = json!({
+            "company_name": company.name,
+            "status": status,
+            "timestamp": Utc::now().to_rfc3339(),
+        });
+
+        let _ = log_tx.send(LogCommand::Write(log_entry)).await;
+    })
+}
+
+/// Enrich a single company with event data
+async fn enrich_company_with_events(
+    company: &CompanyCrossPlatformInfo,
+    yahoo_pool: &Arc<YahooClientPool>,
+    paths: &DataPaths,
+) -> anyhow::Result<()> {
+    use std::collections::HashMap;
+
+    let ticker = match extract_first_yahoo_ticker(company) {
+        Some(t) => t,
+        None => {
+            return Err(anyhow::anyhow!("No valid Yahoo ticker found"));
+        }
+    };
+
+    // Combined summary to accumulate data from all available modules
+    let mut combined_modules: HashMap<String, serde_json::Value> = HashMap::new();
+    let timestamp = chrono::Utc::now().timestamp();
+
+    // Try each event module individually
+    let event_modules = QuoteSummaryModule::event_modules();
+
+    for module in event_modules {
+        match yahoo_pool.get_quote_summary(&ticker, &[module]).await {
+            Ok(summary) => {
+                // Merge this module's data into combined summary
+                for (key, value) in summary.modules {
+                    combined_modules.insert(key, value);
+                }
+            }
+            Err(e) => {
+                // Module not available - silently continue for expected errors
+                let err_str = e.to_string();
+                if err_str.contains("500") || err_str.contains("404") || err_str.contains("Not Found") {
+                    // Expected for securities without this data - continue silently
+                    continue;
+                } else {
+                    // Unexpected error - log but continue trying other modules
+                    logger::log_warn(&format!(
+                        " Unexpected error fetching event module for {}: {}",
+                        ticker, e
+                    )).await;
+                }
+            }
+        }
+    }
+
+    // Only save if we got at least some data
+    if combined_modules.is_empty() {
+        return Err(anyhow::anyhow!("No event data available for any module"));
+    }
+
+    // Create combined summary with all available modules
+    let combined_summary = crate::scraper::yahoo::QuoteSummary {
+        symbol: ticker.clone(),
+        modules: combined_modules,
+        timestamp,
+    };
+
+    // Save the combined event data
+    save_company_event_data(paths, &company.name, &combined_summary).await?;
+
+    Ok(())
+}
+
+/// Save event data to company directory
+async fn save_company_event_data(
+    paths: &DataPaths,
+    company_name: &str,
+    summary: &crate::scraper::yahoo::QuoteSummary,
+) -> anyhow::Result<()> {
+    use tokio::fs;
+
+    let safe_name = sanitize_company_name(company_name);
+
+    let company_dir = paths.corporate_dir().join(&safe_name).join("events");
+    fs::create_dir_all(&company_dir).await?;
+
+    let data_path = company_dir.join("data.jsonl");
+    let json_line = serde_json::to_string(summary)?;
+
+    let mut file = fs::File::create(&data_path).await?;
+    file.write_all(json_line.as_bytes()).await?;
+    file.write_all(b"\n").await?;
+    file.flush().await?;
+    file.sync_all().await?; // Ensure data is persisted
+
+    Ok(())
+}
+
+/// Extract first valid Yahoo ticker from company
+fn extract_first_yahoo_ticker(company: &CompanyCrossPlatformInfo) -> Option<String> {
+    for tickers in company.isin_tickers_map.values() {
+        for ticker in tickers {
+            if ticker.starts_with("YAHOO:")
+                && ticker != "YAHOO:NO_RESULTS"
+                && ticker != "YAHOO:ERROR"
+            {
+                return Some(ticker.trim_start_matches("YAHOO:").to_string());
+            }
+        }
+    }
+    None
+}
+
+/// Sanitize company name for file system
+fn sanitize_company_name(name: &str) -> String {
+    name.replace("/", "_")
+        .replace("\\", "_")
+        .replace(":", "_")
+        .replace("*", "_")
+        .replace("?", "_")
+        .replace("\"", "_")
+        .replace("<", "_")
+        .replace(">", "_")
+        .replace("|", "_")
+}
+
+/// Load companies from JSONL file
+async fn load_companies_from_jsonl(path: &std::path::Path) -> anyhow::Result<Vec<CompanyCrossPlatformInfo>> {
+    let content = tokio::fs::read_to_string(path).await?;
+    let mut companies = Vec::new();
+
+    for line in content.lines() {
+        if line.trim().is_empty() {
+            continue;
+        }
+        if let Ok(company) = serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
+            companies.push(company);
+        }
+    }
+
+    Ok(companies)
+}
+
+/// Count enriched companies (companies with event data)
+async fn count_enriched_companies(paths: &DataPaths) -> anyhow::Result<usize> {
+    let corporate_dir = paths.corporate_dir();
+
+    if !corporate_dir.exists() {
+        return Ok(0);
+    }
+
+    let mut count = 0;
+    let mut entries = tokio::fs::read_dir(&corporate_dir).await?;
+
+    while let Some(entry) = entries.next_entry().await? {
+        let path = entry.path();
+        if path.is_dir() {
+            let events_dir = path.join("events");
+            let events_file = events_dir.join("data.jsonl");
+
+            if events_file.exists() {
+                count += 1;
+            }
+        }
+    }
+
+    Ok(count)
+}
+
+/// Mark enrichment as complete in state file
+async fn mark_enrichment_complete(state_path: &std::path::Path) -> anyhow::Result<()> {
+    let enrichment_complete = json!({
+        "yahoo_events_enrichment_complete": true,
+        "completed_at": Utc::now().to_rfc3339(),
+    });
+
+    let mut state_file = OpenOptions::new()
+        .create(true)
+        .append(true)
+        .open(state_path)
+        .await?;
+
+    let state_line = serde_json::to_string(&enrichment_complete)?;
+    state_file.write_all(state_line.as_bytes()).await?;
+    state_file.write_all(b"\n").await?;
+    state_file.flush().await?;
+    state_file.sync_all().await?;
+
+    Ok(())
+}
+
+/// Log command enum
+enum LogCommand {
+    Write(serde_json::Value),
+    Checkpoint,
+    Shutdown,
+}
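Note: the module never shows what the append-only log actually contains, so here is a hypothetical excerpt of companies_events_updates.log as LogCommand::Write above would produce it (company names are invented). One JSON object per line; on restart the recovery phase replays the file and only lines with "status": "enriched" enter the HashSet, so failed companies are retried:

    {"company_name":"Acme Corp","status":"enriched","timestamp":"2025-01-01T12:00:00+00:00"}
    {"company_name":"Globex","status":"failed","timestamp":"2025-01-01T12:00:02+00:00"}

The `!line.ends_with('}')` guard in the recovery loop is what makes this abort-safe: a line truncated by a crash mid-write fails that check and is skipped instead of poisoning the replay.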
@@ -1,6 +1,6 @@
 // src/main.rs - FIXED: Proper temp pool cleanup
 
-use web_scraper::{*, scraper, economic, corporate};
+use web_scraper::{*, scraper, corporate};
 
 use anyhow::Result;
 use web_scraper::config::Config;
@@ -10,7 +10,7 @@ use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::time::{Duration, Instant};
 use tokio::sync::{Mutex, Semaphore, RwLock};
-use tokio::time::sleep;
+use tokio::time::{sleep, timeout};
 use reqwest::{Client, ClientBuilder};
 use reqwest::Proxy as ReqwestProxy;
 use std::result::Result::Ok;
@@ -154,9 +154,11 @@ impl QuoteSummaryModule {
 
     pub fn event_modules() -> Vec<Self> {
         vec![
-            Self::FinancialData,
             Self::CalendarEvents,
             Self::SecFilings,
+            Self::Earnings,
+            Self::EarningsHistory,
+            Self::UpgradeDowngradeHistory,
         ]
     }
 
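Note: FinancialData is dropped from event_modules() and three earnings-related modules are added. Because enrich_company_with_events (above) requests these modules one at a time rather than batched, a 404 on a module a given security simply lacks (common for non-US listings and SEC filings) only skips that module; data already merged from the other modules is kept.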
@@ -277,8 +279,8 @@ impl YahooClient {
 
         let client = ClientBuilder::new()
             .proxy(proxy)
-            .timeout(Duration::from_secs(90))
-            .connect_timeout(Duration::from_secs(30))
+            .timeout(Duration::from_secs(30)) // CHANGED: Reduced from 90s to 30s
+            .connect_timeout(Duration::from_secs(10)) // CHANGED: Reduced from 30s to 10s
             .pool_max_idle_per_host(2)
             .pool_idle_timeout(Duration::from_secs(60))
             .cookie_store(true)
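Note: the two settings bound different phases of a request: connect_timeout caps only TCP/TLS connection setup (now 10s), while timeout caps the entire request including reading the response body (now 30s). A minimal sketch of the same builder in isolation, using only the reqwest calls visible in the hunk:

    use std::time::Duration;

    fn build_client() -> reqwest::Result<reqwest::Client> {
        reqwest::ClientBuilder::new()
            .connect_timeout(Duration::from_secs(10)) // connection setup only
            .timeout(Duration::from_secs(30))         // whole request, body included
            .build()
    }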
@@ -314,49 +316,85 @@ impl YahooClient {
             is_marked_for_replacement: Arc::new(AtomicBool::new(false)),
         };
 
-        // Initialize crumb
-        yahoo_client.retry_initialize_crumb(&yahoo_client).await?;
-
-        logger::log_info(&format!(
-            " ✓ YahooClient[{}] initialized (max_tasks: {})",
-            client_id, max_tasks_per_client
-        )).await;
-
-        Ok(yahoo_client)
-    }
-
-    async fn retry_initialize_crumb(&self, client: &YahooClient) -> Result<()> {
-        let mut last_err = None;
-
-        for attempt in 1..=3 {
-            match client.initialize_crumb().await {
-                Ok(()) => return Ok(()),
-                Err(e) => {
-                    let error_str = e.to_string();
-                    last_err = Some(e);
-
-                    // If it's a permanent error, don't retry
-                    if error_str.contains("Invalid Cookie") && attempt >= 2 {
-                        logger::log_error(&format!(
-                            " YahooClient[{}] permanent cookie error, not retrying",
-                            self.client_id
-                        )).await;
-                        break;
-                    }
-
-                    if attempt < 3 {
-                        let delay_ms = 300 * attempt; // Exponential backoff
-                        logger::log_info(&format!(
-                            " YahooClient[{}] crumb attempt {}/3 failed, retrying in {}ms: {}",
-                            self.client_id, attempt, delay_ms, error_str
-                        )).await;
-                        sleep(Duration::from_millis(delay_ms)).await;
-                    }
-                }
-            }
-        }
-
-        Err(last_err.unwrap_or_else(|| anyhow!("Max retries exceeded")))
-    }
+        // NEW: Quick proxy health check before crumb initialization
+        logger::log_info(&format!(
+            " YahooClient[{}] testing proxy connectivity...",
+            client_id
+        )).await;
+
+        match timeout(
+            Duration::from_secs(5),
+            yahoo_client.client.get("https://finance.yahoo.com").send()
+        ).await {
+            Ok(Ok(_)) => {
+                logger::log_info(&format!(
+                    " ✓ YahooClient[{}] proxy is responsive",
+                    client_id
+                )).await;
+            }
+            Ok(Err(e)) => {
+                return Err(anyhow!(
+                    "Proxy connection failed for YahooClient[{}]: {}",
+                    client_id, e
+                ));
+            }
+            Err(_) => {
+                return Err(anyhow!(
+                    "Proxy connection timeout for YahooClient[{}]",
+                    client_id
+                ));
+            }
+        }
+
+        // Initialize crumb with timeout wrapper
+        match timeout(
+            Duration::from_secs(30), // NEW: 30 second timeout for entire initialization
+            yahoo_client.try_initialize_crumb(&yahoo_client)
+        ).await {
+            Ok(Ok(())) => {
+                logger::log_info(&format!(
+                    " ✓ YahooClient[{}] initialized (max_tasks: {})",
+                    client_id, max_tasks_per_client
+                )).await;
+                Ok(yahoo_client)
+            }
+            Ok(Err(e)) => {
+                Err(anyhow!(
+                    "Failed to initialize YahooClient[{}]: {}",
+                    client_id, e
+                ))
+            }
+            Err(_) => {
+                Err(anyhow!(
+                    "Timeout initializing YahooClient[{}] (>30s)",
+                    client_id
+                ))
+            }
+        }
+    }
+
+    async fn try_initialize_crumb(&self, client: &YahooClient) -> Result<()> {
+        match client.initialize_crumb().await {
+            Ok(()) => return Ok(()),
+            Err(e) => {
+                let error_str: String = e.to_string();
+
+                // If it's a permanent error, don't retry
+                if error_str.contains("Invalid Cookie") {
+                    logger::log_error(&format!(
+                        " YahooClient[{}] permanent cookie error, not retrying",
+                        self.client_id
+                    )).await;
+                }
+
+                logger::log_info(&format!(
+                    " YahooClient[{}] crumb attempt failed {}",
+                    self.client_id, error_str
+                )).await;
+
+                Err(anyhow!("Failed fetching crumb: {}", error_str))
+            }
+        }
+    }
 
     // Update the crumb fetching in the initialize_crumb function
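Note: wrapping a fallible future in tokio::time::timeout is why the new code matches on nested results: the outer layer reports the timeout, the inner one the operation's own error. A minimal self-contained sketch:

    use std::time::Duration;
    use tokio::time::timeout;

    async fn fallible() -> Result<u32, &'static str> {
        Ok(42)
    }

    #[tokio::main]
    async fn main() {
        match timeout(Duration::from_secs(5), fallible()).await {
            Ok(Ok(v)) => println!("value: {}", v),   // finished in time, succeeded
            Ok(Err(e)) => println!("failed: {}", e), // finished in time, failed
            Err(_) => println!("timed out"),         // future never finished
        }
    }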
@@ -366,15 +404,7 @@ impl YahooClient {
             self.client_id
         )).await;
 
-        // Step 1: Make multiple requests to establish a proper session
-        let mut cookies_established = false;
-
-        logger::log_info(&format!(
-            " YahooClient[{}] Session establishment",
-            self.client_id
-        )).await;
-
-        // Try different Yahoo domains to get valid cookies
+        // Step 1: Establish session with timeout
         let yahoo_domains = [
             "https://finance.yahoo.com",
             "https://www.yahoo.com",
@@ -382,41 +412,75 @@ impl YahooClient {
         ];
 
         for domain in yahoo_domains.iter() {
-            let _ = self.client
-                .get(*domain)
-                .header("User-Agent", Self::random_user_agent())
-                .header("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8")
-                .header("Accept-Language", "en-US,en;q=0.9")
-                .header("Accept-Encoding", "gzip, deflate")
-                .header("DNT", "1")
-                .header("Connection", "keep-alive")
-                .header("Upgrade-Insecure-Requests", "1")
-                .send()
-                .await
-                .ok();
+            // NEW: Add timeout and better error handling
+            match timeout(
+                Duration::from_secs(5),
+                self.client
+                    .get(*domain)
+                    .header("User-Agent", Self::random_user_agent())
+                    .header("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8")
+                    .header("Accept-Language", "en-US,en;q=0.9")
+                    .header("Accept-Encoding", "gzip, deflate")
+                    .header("DNT", "1")
+                    .header("Connection", "keep-alive")
+                    .header("Upgrade-Insecure-Requests", "1")
+                    .send()
+            ).await {
+                Ok(Ok(response)) if response.status().is_success() => {
+                    logger::log_info(&format!(
+                        " ✓ YahooClient[{}] session established via {}",
+                        self.client_id, domain
+                    )).await;
+                }
+                Ok(Ok(response)) => {
+                    logger::log_warn(&format!(
+                        " ⚠ YahooClient[{}] session request to {} returned {}",
+                        self.client_id, domain, response.status()
+                    )).await;
+                }
+                Ok(Err(e)) => {
+                    logger::log_warn(&format!(
+                        " ⚠ YahooClient[{}] failed to connect to {}: {}",
+                        self.client_id, domain, e
+                    )).await;
+                }
+                Err(_) => {
+                    logger::log_warn(&format!(
+                        " ⚠ YahooClient[{}] timeout connecting to {}",
+                        self.client_id, domain
+                    )).await;
+                }
+            }
+
             sleep(Duration::from_millis(500)).await;
         }
 
-        // Step 2: Now try to fetch the crumb with enhanced headers
-        let crumb_response = self.client
-            .get(YAHOO_CRUMB_URL)
-            .header("User-Agent", Self::random_user_agent())
-            .header("Accept", "*/*")
-            .header("Accept-Language", "en-US,en;q=0.9")
-            .header("Accept-Encoding", "gzip, deflate, br")
-            .header("Referer", "https://finance.yahoo.com/")
-            .header("Origin", "https://finance.yahoo.com")
-            .header("Sec-Fetch-Dest", "empty")
-            .header("Sec-Fetch-Mode", "cors")
-            .header("Sec-Fetch-Site", "same-site")
-            .header("Pragma", "no-cache")
-            .header("Cache-Control", "no-cache")
-            .send()
-            .await;
+        // Step 2: Fetch crumb with timeout
+        logger::log_info(&format!(
+            " YahooClient[{}] requesting crumb...",
+            self.client_id
+        )).await;
 
-        match crumb_response {
-            Ok(response) if response.status().is_success() => {
+        let crumb_result = timeout(
+            Duration::from_secs(30), // NEW: 30 second timeout for crumb fetch
+            self.client
+                .get(YAHOO_CRUMB_URL)
+                .header("User-Agent", Self::random_user_agent())
+                .header("Accept", "*/*")
+                .header("Accept-Language", "en-US,en;q=0.9")
+                .header("Accept-Encoding", "gzip, deflate, br")
+                .header("Referer", "https://finance.yahoo.com/")
+                .header("Origin", "https://finance.yahoo.com")
+                .header("Sec-Fetch-Dest", "empty")
+                .header("Sec-Fetch-Mode", "cors")
+                .header("Sec-Fetch-Site", "same-site")
+                .header("Pragma", "no-cache")
+                .header("Cache-Control", "no-cache")
+                .send()
+        ).await;
 
+        match crumb_result {
+            Ok(Ok(response)) if response.status().is_success() => {
                 let crumb_text = response.text().await?;
                 let crumb = crumb_text.trim().to_string();
@@ -434,38 +498,35 @@ impl YahooClient {
                     let mut refresh_guard = self.crumb_last_refresh.lock().await;
                     *refresh_guard = Some(Instant::now());
 
-                    cookies_established = true;
+                    return Ok(());
                 } else {
-                    logger::log_warn(&format!(
-                        " YahooClient[{}] got invalid crumb format",
-                        self.client_id
-                    )).await;
+                    return Err(anyhow!(
+                        "YahooClient[{}] got invalid crumb format: '{}'",
+                        self.client_id, crumb
+                    ));
                 }
             }
-            Ok(response) => {
+            Ok(Ok(response)) => {
                 let status = response.status();
-                let error_text = response.text().await?;
-                logger::log_warn(&format!(
-                    " YahooClient[{}] failed: HTTP {}: {}",
+                let error_text = response.text().await.unwrap_or_default();
+                return Err(anyhow!(
+                    "YahooClient[{}] crumb fetch failed: HTTP {} - {}",
                     self.client_id, status, &error_text[..error_text.len().min(100)]
-                )).await;
+                ));
             }
-            Err(e) => {
-                logger::log_warn(&format!(
-                    " YahooClient[{}] connection error: {}",
+            Ok(Err(e)) => {
+                return Err(anyhow!(
+                    "YahooClient[{}] crumb connection error: {}",
                     self.client_id, e
-                )).await;
+                ));
+            }
+            Err(_) => {
+                return Err(anyhow!(
+                    "YahooClient[{}] crumb fetch timeout (>30s)",
+                    self.client_id
+                ));
             }
         }
 
-        if !cookies_established {
-            return Err(anyhow!(
-                "Failed to initialize cookies/crumb for YahooClient[{}]",
-                self.client_id
-            ));
-        }
-
-        Ok(())
     }
 
     /// Get the current crumb (refresh if needed)
@@ -528,7 +589,15 @@ impl YahooClient {
     }
 
     /// Record a failure - return true if client should be replaced
-    pub async fn record_failure(&self, error_type: &str) -> bool {
+    pub async fn record_failure(&self, error_str: &str) -> bool {
+        // Don't count 404s as failures - these are expected when data doesn't exist
+        if error_str.contains("404") || error_str.contains("Not Found") {
+            return false; // Don't mark for replacement
+        }
+        // Server-side 500s likewise don't indicate a broken client
+        if error_str.contains("500") {
+            return false; // Don't mark for replacement
+        }
+
         let old_failures = self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
         let new_failures = old_failures + 1;
 
@@ -538,13 +607,13 @@ impl YahooClient {
         if should_replace {
             logger::log_warn(&format!(
                 " 🚨 YahooClient[{}] marked for replacement ({} consecutive failures, last error: {})",
-                self.client_id, new_failures, error_type
+                self.client_id, new_failures, error_str
             )).await;
             self.is_marked_for_replacement.store(true, Ordering::Relaxed);
         } else {
             logger::log_warn(&format!(
                 " ⚠ YahooClient[{}] failure {}/{}: {}",
-                self.client_id, new_failures, self.max_consecutive_failures, error_type
+                self.client_id, new_failures, self.max_consecutive_failures, error_str
             )).await;
         }
 
@@ -1064,28 +1133,46 @@ impl YahooClientPool {
             actual_pool_size,
             if rotation_enabled { "enabled" } else { "disabled" }
         )).await;
 
-        let mut clients = Vec::with_capacity(actual_pool_size);
+        logger::log_info(&format!(
+            "Initializing {} YahooClients in parallel...",
+            actual_pool_size
+        )).await;
+
+        let mut handles = Vec::new();
+
         for i in 0..actual_pool_size {
             let proxy_url = proxy_pool.get_proxy_url(i);
-            match YahooClient::new(
-                i,
-                proxy_url,
-                max_tasks_per_instance,
-                monitoring.clone(),
-            ).await {
-                Ok(client) => {
+            let monitoring_clone = monitoring.clone();
+
+            let handle = tokio::spawn(async move {
+                YahooClient::new(i, proxy_url, max_tasks_per_instance, monitoring_clone).await
+            });
+
+            handles.push(handle);
+        }
+
+        // Wait for all to complete
+        let results = futures::future::join_all(handles).await;
+
+        let mut clients = Vec::with_capacity(actual_pool_size);
+        for (i, result) in results.into_iter().enumerate() {
+            match result {
+                Ok(Ok(client)) => {
                     clients.push(Arc::new(client));
                     logger::log_info(&format!(" ✓ Client {} ready", i)).await;
                 }
-                Err(e) => {
+                Ok(Err(e)) => {
                     logger::log_warn(&format!(
                         " ✗ Failed to initialize Client {}: {} - skipping",
                         i, e
                     )).await;
-                    // Continue with next client instead of failing entire pool
+                }
+                Err(e) => {
+                    logger::log_warn(&format!(
+                        " ✗ Client {} task panicked: {} - skipping",
+                        i, e
+                    )).await;
                 }
             }
         }
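Note: the sequential init loop becomes spawn-then-join, so total startup time is bounded by the slowest proxy instead of the sum of all of them. The three match arms exist because each joined handle yields a nested result: the outer layer is the task join (its Err means the task panicked), the inner one is YahooClient::new's own Result. A minimal sketch of the shape, with a stand-in init function:

    use futures::future::join_all;

    async fn init(i: usize) -> Result<usize, String> {
        Ok(i) // stand-in for YahooClient::new
    }

    #[tokio::main]
    async fn main() {
        let handles: Vec<_> = (0..4).map(|i| tokio::spawn(init(i))).collect();
        for (i, joined) in join_all(handles).await.into_iter().enumerate() {
            match joined {
                Ok(Ok(client)) => println!("client {} ready: {}", i, client),
                Ok(Err(e)) => println!("client {} failed: {}", i, e),
                Err(e) => println!("client {} task panicked: {}", i, e),
            }
        }
    }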