From b366f366e6b243ea403a15c06b59484a733c96ed Mon Sep 17 00:00:00 2001 From: donpat1to Date: Fri, 19 Dec 2025 14:12:56 +0100 Subject: [PATCH] added atomic writer action for ctr c abort --- Cargo.lock | 212 ++++++++-- Cargo.toml | 8 +- src/config.rs | 65 +--- src/corporate/atomic_writer.rs | 346 +++++++++++++++++ src/corporate/helpers.rs | 14 + src/corporate/mod.rs | 3 +- src/corporate/page_validation.rs | 180 +++++++++ src/corporate/update.rs | 493 ++++++++++++----------- src/corporate/update_parallel.rs | 423 ++++++++++---------- src/corporate/yahoo.rs | 184 +++++---- src/economic/scraper.rs | 2 +- src/economic/storage.rs | 2 +- src/lib.rs | 6 +- src/main.rs | 63 ++- src/monitoring/dashboard.html | 644 +++++++++++++++++++++++++++++++ src/monitoring/events.rs | 129 +++++++ src/monitoring/logger.rs | 103 +++++ src/monitoring/metrics.rs | 252 ++++++++++++ src/monitoring/mod.rs | 78 ++++ src/monitoring/service.rs | 341 ++++++++++++++++ src/monitoring/webserver.rs | 77 ++++ src/scraper/docker_vpn_proxy.rs | 21 +- src/scraper/helpers.rs | 14 + src/scraper/mod.rs | 3 +- src/scraper/webdriver.rs | 236 +++++++---- src/util/logger.rs | 84 +++- 26 files changed, 3317 insertions(+), 666 deletions(-) create mode 100644 src/corporate/atomic_writer.rs create mode 100644 src/corporate/page_validation.rs create mode 100644 src/monitoring/dashboard.html create mode 100644 src/monitoring/events.rs create mode 100644 src/monitoring/logger.rs create mode 100644 src/monitoring/metrics.rs create mode 100644 src/monitoring/mod.rs create mode 100644 src/monitoring/service.rs create mode 100644 src/monitoring/webserver.rs create mode 100644 src/scraper/helpers.rs diff --git a/Cargo.lock b/Cargo.lock index bc603b2..6e09974 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -110,6 +110,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-trait" +version = "0.1.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.110", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -122,6 +133,64 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core", + "base64 0.22.1", + "bytes", + "futures-util", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "hyper 1.8.1", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sha1", + "sync_wrapper", + "tokio", + "tokio-tungstenite 0.24.0", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "base64" version = "0.21.7" @@ -660,34 +729,6 @@ dependencies = [ "windows-sys 0.61.2", ] -[[package]] -name = "event_backtest_engine" -version = "0.1.0" -dependencies = [ - "anyhow", - "chrono", - "csv", - "dotenvy", - "fantoccini", - "flate2", - "futures", - "once_cell", - "rand 0.9.2", - "rayon", - "regex", - "reqwest", - "scraper", - "serde", - "serde_json", - "tokio", - "tracing", - "tracing-subscriber", - "url", - "walkdir", - "yfinance-rs", - "zip", -] - [[package]] name = "fantoccini" version = "0.20.0" @@ -1099,6 +1140,7 @@ dependencies = [ "http 1.3.1", "http-body 1.0.1", "httparse", + "httpdate", "itoa", "pin-project-lite", "pin-utils", @@ -1522,6 +1564,12 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "memchr" version = "2.7.6" @@ -2684,6 +2732,17 @@ dependencies = [ "serde_core", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457" +dependencies = [ + "itoa", + "serde", + "serde_core", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -3100,6 +3159,30 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.21.0", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.24.0", +] + [[package]] name = "tokio-tungstenite" version = "0.28.0" @@ -3113,7 +3196,7 @@ dependencies = [ "rustls-pki-types", "tokio", "tokio-rustls 0.26.4", - "tungstenite", + "tungstenite 0.28.0", ] [[package]] @@ -3172,6 +3255,7 @@ dependencies = [ "tokio", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -3210,6 +3294,7 @@ version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -3271,6 +3356,43 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.3.1", + "httparse", + "log", + "rand 0.8.5", + "sha1", + "thiserror 1.0.69", + "url", + "utf-8", +] + +[[package]] +name = "tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.3.1", + "httparse", + "log", + "rand 0.8.5", + "sha1", + "thiserror 1.0.69", + "utf-8", +] + [[package]] name = "tungstenite" version = "0.28.0" @@ -3484,6 +3606,36 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web_scraper" +version = "0.1.0" +dependencies = [ + "anyhow", + "axum", + "chrono", + "csv", + "dotenvy", + "fantoccini", + "flate2", + "futures", + "once_cell", + "rand 0.9.2", + "rayon", + "regex", + "reqwest", + "scraper", + "serde", + "serde_json", + "tokio", + "tokio-tungstenite 0.21.0", + "tracing", + "tracing-subscriber", + "url", + "walkdir", + "yfinance-rs", + "zip", +] + [[package]] name = "webdriver" version = "0.50.0" @@ -3798,7 +3950,7 @@ dependencies = [ "serde_json", "thiserror 2.0.17", "tokio", - "tokio-tungstenite", + "tokio-tungstenite 0.28.0", "url", ] diff --git a/Cargo.toml b/Cargo.toml index 104b69d..e5afa63 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "event_backtest_engine" +name = "web_scraper" version = "0.1.0" edition = "2024" authors = ["Your Name "] @@ -53,4 +53,8 @@ once_cell = "1.21.3" # Parallel processing (for batch tickers) futures = "0.3" -rayon = "1.10" # optional: for parallel price downloads \ No newline at end of file +rayon = "1.10" # optional: for parallel price downloads + +# Web server for dashboard +axum = { version = "0.7", features = ["ws"] } +tokio-tungstenite = "0.21" # For WebSocket support \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index d9acc83..c35e129 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,6 +1,5 @@ // src/config.rs - FIXED VERSION -use std::sync::{Arc, atomic::{AtomicUsize, Ordering}}; use anyhow::{Context, Result}; use chrono::{self}; use serde::{Deserialize, Serialize}; @@ -16,7 +15,7 @@ pub struct Config { pub max_tasks_per_instance: usize, - #[serde(default)] + #[serde(default = "default_enable_vpn_rotation")] pub enable_vpn_rotation: bool, // IMPROVEMENT: Reduzierte Defaults für weniger aggressive Scraping @@ -30,16 +29,20 @@ pub struct Config { pub max_retry_attempts: u32, } +fn default_enable_vpn_rotation() -> bool { + false +} + fn default_max_parallel_instances() -> usize { - 4 // Reduziert von 10 auf 4 + 4 } fn default_max_requests_per_session() -> usize { - 10 // Reduziert von 25 auf 10 + 10 } fn default_min_request_interval_ms() -> u64 { - 1200 // Erhöht von 300 auf 1200 + 1200 } fn default_max_retry_attempts() -> u32 { 3 } @@ -60,59 +63,7 @@ impl Default for Config { } } -pub struct PoolMetrics { - pub total_requests: Arc, - pub successful_requests: Arc, - pub failed_requests: Arc, - pub session_renewals: Arc, - pub rotation_events: Arc, - pub retries: Arc, - - // IMPROVEMENT: Neue Metriken für besseres Monitoring - pub navigation_timeouts: Arc, - pub bot_detection_hits: Arc, - pub proxy_failures: Arc, -} -impl PoolMetrics { - pub fn new() -> Self { - Self { - total_requests: Arc::new(AtomicUsize::new(0)), - successful_requests: Arc::new(AtomicUsize::new(0)), - failed_requests: Arc::new(AtomicUsize::new(0)), - session_renewals: Arc::new(AtomicUsize::new(0)), - rotation_events: Arc::new(AtomicUsize::new(0)), - retries: Arc::new(AtomicUsize::new(0)), - navigation_timeouts: Arc::new(AtomicUsize::new(0)), - bot_detection_hits: Arc::new(AtomicUsize::new(0)), - proxy_failures: Arc::new(AtomicUsize::new(0)), - } - } - - pub async fn log_stats(&self) { - let total = self.total_requests.load(Ordering::Relaxed); - let success = self.successful_requests.load(Ordering::Relaxed); - // FIX: Prefix unused variable with underscore - let _failed = self.failed_requests.load(Ordering::Relaxed); - let renewals = self.session_renewals.load(Ordering::Relaxed); - let rotations = self.rotation_events.load(Ordering::Relaxed); - let retries = self.retries.load(Ordering::Relaxed); - let timeouts = self.navigation_timeouts.load(Ordering::Relaxed); - let bot_hits = self.bot_detection_hits.load(Ordering::Relaxed); - let proxy_fails = self.proxy_failures.load(Ordering::Relaxed); - - let success_rate = if total > 0 { - (success as f64 / total as f64) * 100.0 - } else { - 0.0 - }; - - crate::util::logger::log_info(&format!( - "Pool Metrics: {} total requests, {:.1}% success rate, {} renewals, {} rotations, {} retries, {} timeouts, {} bot detections, {} proxy failures", - total, success_rate, renewals, rotations, retries, timeouts, bot_hits, proxy_fails - )).await; - } -} impl Config { /// Loads configuration from environment variables using dotenvy. diff --git a/src/corporate/atomic_writer.rs b/src/corporate/atomic_writer.rs new file mode 100644 index 0000000..a6fe657 --- /dev/null +++ b/src/corporate/atomic_writer.rs @@ -0,0 +1,346 @@ +// src/corporate/atomic_writer.rs +// +// Atomic JSONL writer that prevents partial/corrupted results from being written + +use anyhow::Result; +use serde::Serialize; +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use tokio::fs::{File, OpenOptions}; +use tokio::io::AsyncWriteExt; +use tokio::sync::mpsc; + +/// Command to write or validate data +#[derive(Debug)] +pub enum WriteCommand { + /// Stage a result for writing (held in memory until committed) + Stage { id: String, data: T }, + + /// Commit staged result to disk (atomic write) + Commit { id: String }, + + /// Rollback staged result (discard without writing) + Rollback { id: String }, + + /// Commit all pending staged results and flush + CommitAll, + + /// Shutdown writer gracefully (only commits valid staged results) + Shutdown, +} + +/// Result of a write operation +#[derive(Debug)] +pub struct WriteResult { + pub id: String, + pub success: bool, + pub error: Option, +} + +/// Atomic writer that prevents partial results from being written +pub struct AtomicJsonlWriter { + file: File, + staged: HashMap, + committed_count: usize, + rollback_count: usize, +} + +impl AtomicJsonlWriter { + pub async fn new(path: PathBuf) -> Result { + // Ensure parent directory exists + if let Some(parent) = path.parent() { + tokio::fs::create_dir_all(parent).await?; + } + + let file = OpenOptions::new() + .create(true) + .append(true) + .open(&path) + .await?; + + crate::util::logger::log_info(&format!( + "Atomic writer initialized: {:?}", + path + )).await; + + Ok(Self { + file, + staged: HashMap::new(), + committed_count: 0, + rollback_count: 0, + }) + } + + /// Stage data for writing (held in memory, not yet written) + pub async fn stage(&mut self, id: String, data: T) { + crate::util::logger::log_info(&format!( + "Staging result for: {} (total staged: {})", + id, + self.staged.len() + 1 + )).await; + + self.staged.insert(id, data); + } + + /// Commit a staged result to disk (atomic write) + pub async fn commit(&mut self, id: &str) -> Result<()> { + if let Some(data) = self.staged.remove(id) { + // Serialize to JSON + let json_line = serde_json::to_string(&data)?; + + // Write atomically (single syscall) + self.file.write_all(json_line.as_bytes()).await?; + self.file.write_all(b"\n").await?; + self.file.flush().await?; + + self.committed_count += 1; + + crate::util::logger::log_info(&format!( + "✓ Committed result for: {} (total committed: {})", + id, self.committed_count + )).await; + + Ok(()) + } else { + Err(anyhow::anyhow!("No staged result found for id: {}", id)) + } + } + + /// Rollback a staged result (discard without writing) + pub async fn rollback(&mut self, id: &str) { + if self.staged.remove(id).is_some() { + self.rollback_count += 1; + + crate::util::logger::log_warn(&format!( + "⚠ Rolled back result for: {} (total rollbacks: {})", + id, self.rollback_count + )).await; + } + } + + /// Commit all staged results + pub async fn commit_all(&mut self) -> Result { + let ids: Vec = self.staged.keys().cloned().collect(); + let mut committed = 0; + + for id in ids { + if let Ok(()) = self.commit(&id).await { + committed += 1; + } + } + + Ok(committed) + } + + /// Rollback all staged results (discard everything) + pub async fn rollback_all(&mut self) -> usize { + let count = self.staged.len(); + self.staged.clear(); + self.rollback_count += count; + + crate::util::logger::log_warn(&format!( + "⚠ Rolled back all {} staged results", + count + )).await; + + count + } + + /// Get statistics + pub fn stats(&self) -> WriterStats { + WriterStats { + staged_count: self.staged.len(), + committed_count: self.committed_count, + rollback_count: self.rollback_count, + } + } +} + +#[derive(Debug, Clone)] +pub struct WriterStats { + pub staged_count: usize, + pub committed_count: usize, + pub rollback_count: usize, +} + +/// Managed writer service that runs in its own task +pub struct AtomicWriterService { + rx: mpsc::UnboundedReceiver>, + writer: AtomicJsonlWriter, + shutdown_flag: Arc, +} + +impl AtomicWriterService { + pub async fn new( + path: PathBuf, + rx: mpsc::UnboundedReceiver>, + shutdown_flag: Arc, + ) -> Result { + let writer = AtomicJsonlWriter::new(path).await?; + + Ok(Self { + rx, + writer, + shutdown_flag, + }) + } + + /// Main service loop + pub async fn run(mut self) { + crate::util::logger::log_info("Atomic writer service started").await; + + while let Some(cmd) = self.rx.recv().await { + // Check for shutdown flag + if self.shutdown_flag.load(Ordering::SeqCst) { + crate::util::logger::log_warn( + "Shutdown detected - processing only Commit/Rollback commands" + ).await; + + // Only process commit/rollback commands during shutdown + match cmd { + WriteCommand::Commit { id } => { + if let Err(e) = self.writer.commit(&id).await { + crate::util::logger::log_error(&format!( + "Failed to commit {}: {}", + id, e + )).await; + } + } + WriteCommand::Rollback { id } => { + self.writer.rollback(&id).await; + } + WriteCommand::CommitAll => { + match self.writer.commit_all().await { + Ok(count) => { + crate::util::logger::log_info(&format!( + "Committed {} results during shutdown", + count + )).await; + } + Err(e) => { + crate::util::logger::log_error(&format!( + "Failed to commit all: {}", + e + )).await; + } + } + } + WriteCommand::Shutdown => break, + _ => { + // Ignore Stage commands during shutdown + crate::util::logger::log_warn( + "Ignoring new Stage command during shutdown" + ).await; + } + } + continue; + } + + // Normal operation + match cmd { + WriteCommand::Stage { id, data } => { + self.writer.stage(id, data).await; + } + WriteCommand::Commit { id } => { + if let Err(e) = self.writer.commit(&id).await { + crate::util::logger::log_error(&format!( + "Failed to commit {}: {}", + id, e + )).await; + } + } + WriteCommand::Rollback { id } => { + self.writer.rollback(&id).await; + } + WriteCommand::CommitAll => { + match self.writer.commit_all().await { + Ok(count) => { + crate::util::logger::log_info(&format!( + "Committed all {} staged results", + count + )).await; + } + Err(e) => { + crate::util::logger::log_error(&format!( + "Failed to commit all: {}", + e + )).await; + } + } + } + WriteCommand::Shutdown => break, + } + } + + // Final shutdown - rollback any remaining staged items + let stats = self.writer.stats(); + if stats.staged_count > 0 { + crate::util::logger::log_warn(&format!( + "⚠ Shutdown with {} uncommitted results - rolling back", + stats.staged_count + )).await; + + self.writer.rollback_all().await; + } + + crate::util::logger::log_info(&format!( + "Atomic writer service stopped. Final stats: {} committed, {} rolled back", + stats.committed_count, + stats.rollback_count + )).await; + } +} + +/// Handle for sending write commands +#[derive(Clone)] +pub struct AtomicWriterHandle { + tx: mpsc::UnboundedSender>, +} + +impl AtomicWriterHandle { + pub fn new(tx: mpsc::UnboundedSender>) -> Self { + Self { tx } + } + + /// Stage data for writing (does not write immediately) + pub fn stage(&self, id: String, data: T) { + let _ = self.tx.send(WriteCommand::Stage { id, data }); + } + + /// Commit staged data to disk + pub fn commit(&self, id: String) { + let _ = self.tx.send(WriteCommand::Commit { id }); + } + + /// Rollback staged data (discard) + pub fn rollback(&self, id: String) { + let _ = self.tx.send(WriteCommand::Rollback { id }); + } + + /// Commit all staged data + pub fn commit_all(&self) { + let _ = self.tx.send(WriteCommand::CommitAll); + } + + /// Shutdown writer gracefully + pub fn shutdown(&self) { + let _ = self.tx.send(WriteCommand::Shutdown); + } +} + +/// Create atomic writer service +pub async fn create_atomic_writer( + path: PathBuf, + shutdown_flag: Arc, +) -> Result<(AtomicWriterHandle, tokio::task::JoinHandle<()>)> { + let (tx, rx) = mpsc::unbounded_channel(); + + let service = AtomicWriterService::new(path, rx, shutdown_flag).await?; + let handle = tokio::spawn(async move { + service.run().await; + }); + + Ok((AtomicWriterHandle::new(tx), handle)) +} \ No newline at end of file diff --git a/src/corporate/helpers.rs b/src/corporate/helpers.rs index 066d437..ba00548 100644 --- a/src/corporate/helpers.rs +++ b/src/corporate/helpers.rs @@ -2,6 +2,8 @@ use super::types::*; use chrono::{Local, NaiveDate}; use std::collections::{HashMap, HashSet}; +use rand::rngs::StdRng; +use rand::prelude::{Rng, SeedableRng, IndexedRandom}; pub fn event_key(e: &CompanyEvent) -> String { format!("{}|{}|{}", e.ticker, e.date, e.time) @@ -67,4 +69,16 @@ pub fn parse_yahoo_date(s: &str) -> anyhow::Result { NaiveDate::parse_from_str(s, "%B %d, %Y") .or_else(|_| NaiveDate::parse_from_str(s, "%b %d, %Y")) .map_err(|_| anyhow::anyhow!("Bad date: {s}")) +} + +/// Send-safe random range +pub fn random_range(min: u64, max: u64) -> u64 { + let mut rng = StdRng::from_rng(&mut rand::rng()); + rng.gen_range(min..max) +} + +/// Send-safe random choice +pub fn choose_random(items: &[T]) -> T { + let mut rng = StdRng::from_rng(&mut rand::rng()); + items.choose(&mut rng).unwrap().clone() } \ No newline at end of file diff --git a/src/corporate/mod.rs b/src/corporate/mod.rs index 703fe2e..829a268 100644 --- a/src/corporate/mod.rs +++ b/src/corporate/mod.rs @@ -9,6 +9,7 @@ pub mod fx; pub mod openfigi; pub mod yahoo; pub mod update_parallel; - +pub mod page_validation; +pub mod atomic_writer; pub use update::run_full_update; \ No newline at end of file diff --git a/src/corporate/page_validation.rs b/src/corporate/page_validation.rs new file mode 100644 index 0000000..27d7c43 --- /dev/null +++ b/src/corporate/page_validation.rs @@ -0,0 +1,180 @@ +// src/corporate/page_validation.rs +// +// Utilities to ensure page state is correct before extraction + +use anyhow::{anyhow, Result}; +use fantoccini::Client; +use tokio::time::{sleep, Duration}; + +/// Validates that the browser navigated to the expected URL +/// +/// This prevents extracting data from a stale page when navigation fails silently +pub async fn verify_navigation( + client: &Client, + expected_url_fragment: &str, + max_attempts: u32, +) -> Result<()> { + for attempt in 1..=max_attempts { + let current_url = client.current_url().await?; + let current = current_url.as_str(); + + if current.contains(expected_url_fragment) { + crate::util::logger::log_info(&format!( + "✓ Navigation verified: {} (attempt {})", + current, attempt + )).await; + return Ok(()); + } + + if attempt < max_attempts { + crate::util::logger::log_warn(&format!( + "Navigation mismatch (attempt {}): expected '{}', got '{}'. Retrying...", + attempt, expected_url_fragment, current + )).await; + sleep(Duration::from_millis(500)).await; + } + } + + let current_url = client.current_url().await?; + Err(anyhow!( + "Navigation verification failed: expected URL containing '{}', but got '{}'", + expected_url_fragment, + current_url.as_str() + )) +} + +/// Clears browser state by navigating to a blank page +/// +/// Use this when a navigation fails or times out to ensure clean slate +pub async fn clear_browser_state(client: &Client) -> Result<()> { + crate::util::logger::log_info("Clearing browser state with about:blank").await; + + // Navigate to blank page to clear any stale content + client.goto("about:blank").await?; + + // Brief wait to ensure page clears + sleep(Duration::from_millis(200)).await; + + Ok(()) +} + +/// Validates that expected content exists on the page before extraction +/// +/// This adds an extra safety check that the page actually loaded +pub async fn verify_page_content( + client: &Client, + content_checks: Vec, +) -> Result<()> { + for check in content_checks { + match check { + ContentCheck::ElementExists(selector) => { + let exists: bool = client + .execute( + &format!( + "return !!document.querySelector('{}');", + selector.replace("'", "\\'") + ), + vec![], + ) + .await? + .as_bool() + .unwrap_or(false); + + if !exists { + return Err(anyhow!( + "Expected element '{}' not found on page", + selector + )); + } + } + ContentCheck::TextContains(text) => { + let page_text: String = client + .execute("return document.body.innerText;", vec![]) + .await? + .as_str() + .unwrap_or("") + .to_string(); + + if !page_text.contains(&text) { + return Err(anyhow!( + "Expected text '{}' not found on page", + text + )); + } + } + } + } + + Ok(()) +} + +#[derive(Debug, Clone)] +pub enum ContentCheck { + /// Verify that a CSS selector exists + ElementExists(String), + /// Verify that page body contains text + TextContains(String), +} + +/// Safe navigation wrapper that validates and clears state on failure +pub async fn navigate_with_validation( + client: &Client, + url: &str, + expected_url_fragment: &str, + timeout_secs: u64, +) -> Result<()> { + use tokio::time::timeout; + + // Attempt navigation with timeout + let nav_result = timeout( + Duration::from_secs(timeout_secs), + client.goto(url) + ).await; + + match nav_result { + Ok(Ok(_)) => { + // Navigation succeeded, verify we're on correct page + verify_navigation(client, expected_url_fragment, 3).await?; + Ok(()) + } + Ok(Err(e)) => { + // Navigation failed - clear state before returning error + crate::util::logger::log_error(&format!( + "Navigation failed: {}. Clearing browser state...", + e + )).await; + clear_browser_state(client).await.ok(); // Best effort + Err(anyhow!("Navigation failed: {}", e)) + } + Err(_) => { + // Navigation timed out - clear state before returning error + crate::util::logger::log_error(&format!( + "Navigation timeout after {}s. Clearing browser state...", + timeout_secs + )).await; + clear_browser_state(client).await.ok(); // Best effort + Err(anyhow!("Navigation timeout")) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_content_check_variants() { + let check1 = ContentCheck::ElementExists("table".to_string()); + let check2 = ContentCheck::TextContains("Yahoo Finance".to_string()); + + match check1 { + ContentCheck::ElementExists(sel) => assert_eq!(sel, "table"), + _ => panic!("Wrong variant"), + } + + match check2 { + ContentCheck::TextContains(text) => assert_eq!(text, "Yahoo Finance"), + _ => panic!("Wrong variant"), + } + } +} \ No newline at end of file diff --git a/src/corporate/update.rs b/src/corporate/update.rs index 7684978..0c23ddf 100644 --- a/src/corporate/update.rs +++ b/src/corporate/update.rs @@ -1,4 +1,4 @@ -// src/corporate/update.rs - ABORT-SAFE VERSION WITH JSONL LOG +// src/corporate/update.rs - UPDATED WITH DATA INTEGRITY FIXES use super::{scraper::*, storage::*, helpers::*, types::*, openfigi::*, yahoo::*}; use crate::config::Config; use crate::corporate::update_parallel::build_companies_jsonl_streaming_parallel; @@ -11,12 +11,13 @@ use std::collections::HashMap; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; +/// UPDATED: Main corporate update entry point with shutdown awareness pub async fn run_full_update( _config: &Config, pool: &Arc, shutdown_flag: &Arc, ) -> anyhow::Result<()> { - logger::log_info("=== Corporate Update (STREAMING MODE) ===").await; + logger::log_info("=== Corporate Update (STREAMING MODE WITH DATA INTEGRITY) ===").await; let paths = DataPaths::new(".")?; @@ -33,6 +34,7 @@ pub async fn run_full_update( }; if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn("Shutdown detected after GLEIF download").await; return Ok(()); } @@ -41,6 +43,7 @@ pub async fn run_full_update( logger::log_info(" ✓ OpenFIGI metadata loaded").await; if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn("Shutdown detected after OpenFIGI load").await; return Ok(()); } @@ -54,6 +57,7 @@ pub async fn run_full_update( } if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn("Shutdown detected after LEI-FIGI mapping").await; return Ok(()); } @@ -69,10 +73,11 @@ pub async fn run_full_update( } if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn("Shutdown detected after securities map build").await; return Ok(()); } - logger::log_info("Step 5: Building companies.jsonl (streaming with abort-safe persistence)...").await; + logger::log_info("Step 5: Building companies.jsonl with parallel processing and validation...").await; let count = build_companies_jsonl_streaming_parallel(&paths, pool, shutdown_flag).await?; logger::log_info(&format!(" ✓ Saved {} companies", count)).await; @@ -80,40 +85,32 @@ pub async fn run_full_update( logger::log_info("Step 6: Processing events (using index)...").await; let _event_index = build_event_index(&paths).await?; logger::log_info(" ✓ Event index built").await; + } else { + logger::log_warn("Shutdown detected, skipping event index build").await; } logger::log_info("✓ Corporate update complete").await; Ok(()) } -/// Abort-safe incremental JSONL persistence with atomic checkpoints +/// UPDATED: Serial version with validation (kept for compatibility/debugging) /// -/// Implements the data_updating_rule.md specification: -/// - Append-only JSONL log for all updates -/// - Batched fsync for performance (configurable batch size) -/// - Time-based fsync for safety (max 10 seconds without fsync) -/// - Atomic checkpoints via temp file + rename -/// - Crash recovery by loading checkpoint + replaying log -/// - Partial lines automatically ignored by .lines() iterator +/// This is the non-parallel version that processes companies sequentially. +/// Updated with same validation and shutdown checks as parallel version. /// -/// # Error Handling & Crash Safety -/// -/// If any write or fsync fails: -/// - Function returns error immediately -/// - Partial line may be in OS buffer but not fsynced -/// - On next startup, .lines() will either: -/// a) Skip partial line (if no \n written) -/// b) Fail to parse malformed JSON (logged and skipped) -/// - No data corruption, at most last batch entries lost -async fn build_companies_jsonl_streaming( +/// Use this for: +/// - Debugging issues with specific companies +/// - Environments where parallel processing isn't desired +/// - Testing validation logic without concurrency complexity +async fn build_companies_jsonl_streaming_serial( paths: &DataPaths, pool: &Arc, shutdown_flag: &Arc, ) -> anyhow::Result { // Configuration constants - const CHECKPOINT_INTERVAL: usize = 50; // Create checkpoint every N updates - const FSYNC_BATCH_SIZE: usize = 10; // fsync every N writes for performance - const FSYNC_INTERVAL_SECS: u64 = 10; // Also fsync every N seconds for safety + const CHECKPOINT_INTERVAL: usize = 50; + const FSYNC_BATCH_SIZE: usize = 10; + const FSYNC_INTERVAL_SECS: u64 = 10; let path = DataPaths::new(".")?; let corporate_path = path.data_dir().join("corporate").join("by_name"); @@ -134,7 +131,7 @@ async fn build_companies_jsonl_streaming( tokio::fs::create_dir_all(parent).await?; } - // === RECOVERY PHASE 1: Load last checkpoint === + // === RECOVERY PHASE: Load checkpoint + replay log === let mut existing_companies: HashMap = HashMap::new(); let mut processed_names: std::collections::HashSet = std::collections::HashSet::new(); @@ -142,8 +139,6 @@ async fn build_companies_jsonl_streaming( logger::log_info("Loading checkpoint from companies.jsonl...").await; let existing_content = tokio::fs::read_to_string(&companies_path).await?; - // Note: .lines() only returns complete lines terminated with \n - // Partial lines (incomplete writes from crashes) are automatically skipped for line in existing_content.lines() { if line.trim().is_empty() { continue; @@ -155,7 +150,6 @@ async fn build_companies_jsonl_streaming( existing_companies.insert(company.name.clone(), company); } Err(e) => { - // This catches both malformed JSON and partial lines logger::log_warn(&format!("Skipping invalid checkpoint line: {}", e)).await; } } @@ -163,14 +157,11 @@ async fn build_companies_jsonl_streaming( logger::log_info(&format!("Loaded checkpoint with {} companies", existing_companies.len())).await; } - // === RECOVERY PHASE 2: Replay log after checkpoint === if log_path.exists() { logger::log_info("Replaying update log...").await; let log_content = tokio::fs::read_to_string(&log_path).await?; let mut replayed = 0; - // Note: .lines() only returns complete lines terminated with \n - // Partial lines from crashes are automatically skipped for line in log_content.lines() { if line.trim().is_empty() { continue; @@ -183,7 +174,6 @@ async fn build_companies_jsonl_streaming( replayed += 1; } Err(e) => { - // This catches both malformed JSON and partial lines logger::log_warn(&format!("Skipping invalid log line: {}", e)).await; } } @@ -193,225 +183,143 @@ async fn build_companies_jsonl_streaming( } } - // === APPEND-ONLY LOG: Open in append mode with O_APPEND semantics === + // === OPEN LOG FILE === use tokio::fs::OpenOptions; + use tokio::io::AsyncWriteExt; + let mut log_file = OpenOptions::new() .create(true) - .append(true) // O_APPEND - atomic append operations + .append(true) .open(&log_path) .await?; - let mut count = existing_companies.len(); - let mut updated_count = 0; - let mut new_count = 0; - let mut updates_since_checkpoint = 0; - - // Batched fsync tracking for performance let mut writes_since_fsync = 0; let mut last_fsync = std::time::Instant::now(); + let mut updates_since_checkpoint = 0; + let mut count = 0; + let mut new_count = 0; + let mut updated_count = 0; - use tokio::io::AsyncWriteExt; + logger::log_info(&format!("Processing {} companies sequentially...", securities.len())).await; - for (name, company_info) in securities.iter() { + // === PROCESS COMPANIES SEQUENTIALLY === + for (name, company_info) in securities.clone() { + // Check shutdown before each company if shutdown_flag.load(Ordering::SeqCst) { - logger::log_info("Shutdown requested - stopping company processing").await; + logger::log_warn(&format!( + "Shutdown detected at company: {} (progress: {}/{})", + name, count, count + securities.len() + )).await; break; } - - // Skip if already processed (from checkpoint or log replay) - if processed_names.contains(name) { - continue; - } - - let existing_entry = existing_companies.get(name).cloned(); + + let existing_entry = existing_companies.get(&name).cloned(); let is_update = existing_entry.is_some(); - let mut isin_tickers_map: HashMap> = - existing_entry - .as_ref() - .map(|e| e.isin_tickers_map.clone()) - .unwrap_or_default(); - - let mut sector = existing_entry.as_ref().and_then(|e| e.sector.clone()); - let mut exchange = existing_entry.as_ref().and_then(|e| e.exchange.clone()); - - let mut unique_isin_ticker_pairs: HashMap> = HashMap::new(); - - for figi_infos in company_info.securities.values() { - for figi_info in figi_infos { - if !figi_info.isin.is_empty() { - let tickers = unique_isin_ticker_pairs - .entry(figi_info.isin.clone()) - .or_insert_with(Vec::new); - - if !figi_info.ticker.is_empty() && !tickers.contains(&figi_info.ticker) { - tickers.push(figi_info.ticker.clone()); - } - } - } - } - - for (isin, figi_tickers) in unique_isin_ticker_pairs { - if shutdown_flag.load(Ordering::SeqCst) { - break; - } - - let tickers = isin_tickers_map - .entry(isin.clone()) - .or_insert_with(Vec::new); - - for figi_ticker in figi_tickers { - if !tickers.contains(&figi_ticker) { - tickers.push(figi_ticker); - } - } - - let has_yahoo_ticker = tickers.iter().any(|t| t.starts_with("YAHOO:")); - - if !has_yahoo_ticker && !shutdown_flag.load(Ordering::SeqCst) { - logger::log_info(&format!("Fetching Yahoo details for {} (ISIN: {})", name, isin)).await; + // Process company with validation + match process_single_company_serial( + name.clone(), + company_info, + existing_entry, + pool, + shutdown_flag, + ).await { + Ok(Some(company_entry)) => { + // Write to log + let line = serde_json::to_string(&company_entry)?; + log_file.write_all(line.as_bytes()).await?; + log_file.write_all(b"\n").await?; - match scrape_company_details_by_isin(pool, &isin).await { - Ok(Some(details)) => { - logger::log_info(&format!("✓ Found Yahoo ticker {} for ISIN {}", details.ticker, isin)).await; - - tickers.push(format!("YAHOO:{}", details.ticker)); - - if sector.is_none() && details.sector.is_some() { - sector = details.sector.clone(); - logger::log_info(&format!(" Sector: {}", details.sector.as_ref().unwrap())).await; - } - - if exchange.is_none() && details.exchange.is_some() { - exchange = details.exchange.clone(); - logger::log_info(&format!(" Exchange: {}", details.exchange.as_ref().unwrap())).await; - } - }, - Ok(None) => { - logger::log_warn(&format!("◯ No search results for ISIN {}", isin)).await; - tickers.push("YAHOO:NO_RESULTS".to_string()); - }, - Err(e) => { - if shutdown_flag.load(Ordering::SeqCst) { - break; - } - logger::log_warn(&format!("✗ Yahoo lookup error for ISIN {}: {}", isin, e)).await; - } - } - } - } - - if shutdown_flag.load(Ordering::SeqCst) { - break; - } - - if !isin_tickers_map.is_empty() { - let company_entry = CompanyCrossPlatformInfo { - name: name.clone(), - isin_tickers_map, - sector, - exchange, - }; - - // === APPEND-ONLY: Write single-line JSON with batched fsync === - // Write guarantees the line is either fully written or not at all - let line = serde_json::to_string(&company_entry)?; - log_file.write_all(line.as_bytes()).await?; - log_file.write_all(b"\n").await?; - writes_since_fsync += 1; - - // Batched fsync for performance + time-based fsync for safety - // fsync if: batch size reached OR time interval exceeded - let should_fsync = writes_since_fsync >= FSYNC_BATCH_SIZE - || last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS; - - if should_fsync { - log_file.flush().await?; - // Critical: fsync to ensure durability before considering writes successful - // This prevents data loss on power failure or kernel panic - log_file.sync_data().await?; - writes_since_fsync = 0; - last_fsync = std::time::Instant::now(); - } - - // Update in-memory state ONLY after write (fsync happens in batches) - // This is safe because we fsync before checkpoints and at end of processing - processed_names.insert(name.clone()); - existing_companies.insert(name.clone(), company_entry); - - count += 1; - updates_since_checkpoint += 1; - - if is_update { - updated_count += 1; - } else { - new_count += 1; - } - - // === ATOMIC CHECKPOINT: Periodically create checkpoint === - // This reduces recovery time by snapshotting current state - if updates_since_checkpoint >= CHECKPOINT_INTERVAL { - // Ensure any pending writes are fsynced before checkpoint - if writes_since_fsync > 0 { + writes_since_fsync += 1; + + // Batched + time-based fsync + let should_fsync = writes_since_fsync >= FSYNC_BATCH_SIZE + || last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS; + + if should_fsync { log_file.flush().await?; log_file.sync_data().await?; writes_since_fsync = 0; last_fsync = std::time::Instant::now(); } - logger::log_info(&format!("Creating checkpoint at {} companies...", count)).await; + // Update in-memory state + processed_names.insert(name.clone()); + existing_companies.insert(name.clone(), company_entry); - let checkpoint_tmp = companies_path.with_extension("jsonl.tmp"); - let mut checkpoint_file = tokio::fs::File::create(&checkpoint_tmp).await?; + count += 1; + updates_since_checkpoint += 1; - // Write all current state to temporary checkpoint file - for company in existing_companies.values() { - let line = serde_json::to_string(company)?; - checkpoint_file.write_all(line.as_bytes()).await?; - checkpoint_file.write_all(b"\n").await?; + if is_update { + updated_count += 1; + } else { + new_count += 1; } - checkpoint_file.flush().await?; - checkpoint_file.sync_all().await?; - drop(checkpoint_file); + // Periodic checkpoint + if updates_since_checkpoint >= CHECKPOINT_INTERVAL { + if writes_since_fsync > 0 { + log_file.flush().await?; + log_file.sync_data().await?; + writes_since_fsync = 0; + last_fsync = std::time::Instant::now(); + } + + logger::log_info(&format!("Creating checkpoint at {} companies...", count)).await; + + let checkpoint_tmp = companies_path.with_extension("jsonl.tmp"); + let mut checkpoint_file = tokio::fs::File::create(&checkpoint_tmp).await?; + + for company in existing_companies.values() { + let line = serde_json::to_string(company)?; + checkpoint_file.write_all(line.as_bytes()).await?; + checkpoint_file.write_all(b"\n").await?; + } + + checkpoint_file.flush().await?; + checkpoint_file.sync_all().await?; + drop(checkpoint_file); + + tokio::fs::rename(&checkpoint_tmp, &companies_path).await?; + + drop(log_file); + tokio::fs::remove_file(&log_path).await.ok(); + log_file = OpenOptions::new() + .create(true) + .append(true) + .open(&log_path) + .await?; + + updates_since_checkpoint = 0; + logger::log_info("✓ Checkpoint created and log cleared").await; + } - // Atomic rename - this is the commit point - // After this succeeds, the checkpoint is visible - tokio::fs::rename(&checkpoint_tmp, &companies_path).await?; - - // Clear log after successful checkpoint - // Any entries before this point are now captured in the checkpoint - drop(log_file); - tokio::fs::remove_file(&log_path).await.ok(); - log_file = OpenOptions::new() - .create(true) - .append(true) - .open(&log_path) - .await?; - - updates_since_checkpoint = 0; - logger::log_info("✓ Checkpoint created and log cleared").await; + if count % 10 == 0 { + logger::log_info(&format!( + "Progress: {} companies ({} new, {} updated)", + count, new_count, updated_count + )).await; + } } - - if count % 10 == 0 { - logger::log_info(&format!("Progress: {} companies ({} new, {} updated)", count, new_count, updated_count)).await; - tokio::task::yield_now().await; + Ok(None) => { + // Company had no ISINs or was skipped + logger::log_info(&format!("Skipped company: {} (no ISINs)", name)).await; + } + Err(e) => { + logger::log_warn(&format!("Error processing company {}: {}", name, e)).await; } } - // Time-based fsync: Even if this company didn't result in a write, - // fsync any pending writes if enough time has passed - // This reduces data loss window during long Yahoo lookup operations + // Time-based fsync if writes_since_fsync > 0 && last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS { log_file.flush().await?; log_file.sync_data().await?; writes_since_fsync = 0; last_fsync = std::time::Instant::now(); - logger::log_info("Time-based fsync completed").await; } } - // === FSYNC PENDING WRITES: Even if shutdown requested, save what we have === + // === FSYNC PENDING WRITES === if writes_since_fsync > 0 { logger::log_info(&format!("Fsyncing {} pending writes...", writes_since_fsync)).await; log_file.flush().await?; @@ -419,9 +327,7 @@ async fn build_companies_jsonl_streaming( logger::log_info("✓ Pending writes saved").await; } - // === FINAL CHECKPOINT: Write complete final state === - // This ensures we don't need to replay the log on next startup - // (Pending writes were already fsynced above) + // === FINAL CHECKPOINT === if !shutdown_flag.load(Ordering::SeqCst) && updates_since_checkpoint > 0 { logger::log_info("Creating final checkpoint...").await; @@ -438,21 +344,172 @@ async fn build_companies_jsonl_streaming( checkpoint_file.sync_all().await?; drop(checkpoint_file); - // Atomic rename makes final checkpoint visible tokio::fs::rename(&checkpoint_tmp, &companies_path).await?; - // Clean up log drop(log_file); tokio::fs::remove_file(&log_path).await.ok(); logger::log_info("✓ Final checkpoint created").await; } - logger::log_info(&format!("Completed: {} total companies ({} new, {} updated)", count, new_count, updated_count)).await; + logger::log_info(&format!( + "Completed: {} total companies ({} new, {} updated)", + count, new_count, updated_count + )).await; Ok(count) } +/// UPDATED: Process single company serially with validation +async fn process_single_company_serial( + name: String, + company_info: CompanyInfo, + existing_entry: Option, + pool: &Arc, + shutdown_flag: &Arc, +) -> anyhow::Result> { + // Check shutdown at start + if shutdown_flag.load(Ordering::SeqCst) { + return Ok(None); + } + + let mut isin_tickers_map: HashMap> = + existing_entry + .as_ref() + .map(|e| e.isin_tickers_map.clone()) + .unwrap_or_default(); + + let mut sector = existing_entry.as_ref().and_then(|e| e.sector.clone()); + let mut exchange = existing_entry.as_ref().and_then(|e| e.exchange.clone()); + + // Collect unique ISIN-ticker pairs + let mut unique_isin_ticker_pairs: HashMap> = HashMap::new(); + + for figi_infos in company_info.securities.values() { + for figi_info in figi_infos { + if !figi_info.isin.is_empty() { + let tickers = unique_isin_ticker_pairs + .entry(figi_info.isin.clone()) + .or_insert_with(Vec::new); + + if !figi_info.ticker.is_empty() && !tickers.contains(&figi_info.ticker) { + tickers.push(figi_info.ticker.clone()); + } + } + } + } + + // Process each ISIN with validation + for (isin, figi_tickers) in unique_isin_ticker_pairs { + // Check shutdown before each ISIN + if shutdown_flag.load(Ordering::SeqCst) { + return Ok(None); + } + + let tickers = isin_tickers_map + .entry(isin.clone()) + .or_insert_with(Vec::new); + + for figi_ticker in figi_tickers { + if !tickers.contains(&figi_ticker) { + tickers.push(figi_ticker); + } + } + + let has_yahoo_ticker = tickers.iter().any(|t| t.starts_with("YAHOO:")); + + if !has_yahoo_ticker { + logger::log_info(&format!("Fetching Yahoo details for {} (ISIN: {})", name, isin)).await; + + // Use validated scraping with retry + match scrape_with_retry_serial(pool, &isin, 3, shutdown_flag).await { + Ok(Some(details)) => { + logger::log_info(&format!( + "✓ Found Yahoo ticker {} for ISIN {} (company: {})", + details.ticker, isin, name + )).await; + + tickers.push(format!("YAHOO:{}", details.ticker)); + + if sector.is_none() && details.sector.is_some() { + sector = details.sector.clone(); + } + + if exchange.is_none() && details.exchange.is_some() { + exchange = details.exchange.clone(); + } + }, + Ok(None) => { + logger::log_warn(&format!("◯ No search results for ISIN {} (company: {})", isin, name)).await; + tickers.push("YAHOO:NO_RESULTS".to_string()); + }, + Err(e) => { + if shutdown_flag.load(Ordering::SeqCst) { + return Ok(None); + } + logger::log_warn(&format!( + "✗ Yahoo lookup error for ISIN {} (company: {}): {}", + isin, name, e + )).await; + } + } + } + } + + // Final shutdown check + if shutdown_flag.load(Ordering::SeqCst) { + return Ok(None); + } + + if !isin_tickers_map.is_empty() { + Ok(Some(CompanyCrossPlatformInfo { + name, + isin_tickers_map, + sector, + exchange, + })) + } else { + Ok(None) + } +} + +/// UPDATED: Scrape with retry for serial processing +async fn scrape_with_retry_serial( + pool: &Arc, + isin: &str, + max_retries: u32, + shutdown_flag: &Arc, +) -> anyhow::Result> { + let mut retries = 0; + + loop { + if shutdown_flag.load(Ordering::SeqCst) { + return Err(anyhow::anyhow!("Aborted due to shutdown")); + } + + match scrape_company_details_by_isin(pool, isin, shutdown_flag).await { + Ok(result) => return Ok(result), + Err(e) => { + if retries >= max_retries { + return Err(e); + } + + let backoff_ms = 1000 * 2u64.pow(retries); + let jitter_ms = random_range(0, 500); + let total_delay = backoff_ms + jitter_ms; + + logger::log_warn(&format!( + "Retry {}/{} for ISIN {} after {}ms: {}", + retries + 1, max_retries, isin, total_delay, e + )).await; + + tokio::time::sleep(tokio::time::Duration::from_millis(total_delay)).await; + retries += 1; + } + } + } +} + async fn find_most_recent_figi_date_dir(paths: &DataPaths) -> anyhow::Result> { let map_cache_dir = paths.cache_gleif_openfigi_map_dir(); diff --git a/src/corporate/update_parallel.rs b/src/corporate/update_parallel.rs index 78eaf30..4dfbaea 100644 --- a/src/corporate/update_parallel.rs +++ b/src/corporate/update_parallel.rs @@ -1,13 +1,13 @@ -// src/corporate/update_parallel.rs -// PARALLELIZED VERSION of build_companies_jsonl_streaming +// src/corporate/update_parallel.rs - UPDATED WITH DATA INTEGRITY FIXES +// PARALLELIZED VERSION with atomic commits and validation // -// Key improvements: -// - Processes multiple companies concurrently using the ChromeDriverPool -// - Maintains data safety with serialized log writes via channel -// - Respects pool size limits via semaphore -// - All fsync and checkpoint logic preserved +// Key improvements over original: +// - Page validation to prevent stale content extraction +// - Shutdown-aware task processing +// - Better error recovery with browser state cleanup +// - All original fsync and checkpoint logic preserved -use super::{types::*, yahoo::*}; +use super::{types::*, yahoo::*, helpers::*}; use crate::util::directories::DataPaths; use crate::util::logger; use crate::scraper::webdriver::ChromeDriverPool; @@ -24,7 +24,6 @@ use std::time::Duration; use futures::stream::{FuturesUnordered, StreamExt}; use anyhow::{anyhow, Context, Result}; - /// Represents a write command to be serialized through the log writer enum LogCommand { Write(CompanyCrossPlatformInfo), @@ -38,25 +37,13 @@ struct CompanyProcessResult { is_update: bool, } -/// Abort-safe incremental JSONL persistence with atomic checkpoints (PARALLELIZED) +/// UPDATED: Abort-safe incremental JSONL persistence with validation /// -/// Implements the data_updating_rule.md specification with concurrent processing: -/// - Append-only JSONL log for all updates -/// - Batched fsync for performance (configurable batch size) -/// - Time-based fsync for safety (max 10 seconds without fsync) -/// - Atomic checkpoints via temp file + rename -/// - Crash recovery by loading checkpoint + replaying log -/// - Partial lines automatically ignored by .lines() iterator -/// - PARALLEL processing of companies using ChromeDriverPool -/// - Serialized log writes for data safety -/// -/// # Parallelization Strategy -/// -/// - Multiple companies processed concurrently (limited by pool size) -/// - Each company's Yahoo lookups happen in parallel -/// - Log writes are serialized through a channel -/// - Pool's semaphore naturally limits concurrency -/// - All fsync and checkpoint logic preserved +/// New safety features: +/// - Page validation before extraction +/// - Shutdown checks at all critical points +/// - Browser state cleanup on errors +/// - All writes still atomic with fsync pub async fn build_companies_jsonl_streaming_parallel( paths: &DataPaths, pool: &Arc, @@ -66,7 +53,7 @@ pub async fn build_companies_jsonl_streaming_parallel( const CHECKPOINT_INTERVAL: usize = 50; const FSYNC_BATCH_SIZE: usize = 10; const FSYNC_INTERVAL_SECS: u64 = 10; - const CONCURRENCY_LIMIT: usize = 100; // Max companies processing at once + const CONCURRENCY_LIMIT: usize = 100; let path = DataPaths::new(".")?; let corporate_path = path.data_dir().join("corporate").join("by_name"); @@ -140,7 +127,6 @@ pub async fn build_companies_jsonl_streaming_parallel( } // === SETUP LOG WRITER TASK === - // This task serializes all log writes to maintain data safety let (write_tx, mut write_rx) = mpsc::channel::(1000); let log_file_init = OpenOptions::new() @@ -153,8 +139,10 @@ pub async fn build_companies_jsonl_streaming_parallel( let log_path_clone = log_path.clone(); let existing_companies_writer = Arc::new(tokio::sync::Mutex::new(existing_companies.clone())); + let write_tx_for_writer = write_tx.clone(); + let writer_task = tokio::spawn(async move { - let mut log_file = log_file_init; // Move into the task + let mut log_file = log_file_init; let mut writes_since_fsync = 0; let mut last_fsync = std::time::Instant::now(); let mut updates_since_checkpoint = 0; @@ -208,187 +196,127 @@ pub async fn build_companies_jsonl_streaming_parallel( writes_since_fsync = 0; last_fsync = std::time::Instant::now(); } - - // Periodic checkpoint - if updates_since_checkpoint >= CHECKPOINT_INTERVAL { - // Fsync pending writes before checkpoint - if writes_since_fsync > 0 { - let _ = log_file.flush().await; - let _ = log_file.sync_data().await; - writes_since_fsync = 0; - last_fsync = std::time::Instant::now(); - } - - logger::log_info(&format!("Creating checkpoint at {} companies...", count)).await; - - let checkpoint_tmp = companies_path_clone.with_extension("jsonl.tmp"); - let mut checkpoint_file = match tokio::fs::File::create(&checkpoint_tmp).await { - Ok(f) => f, - Err(e) => { - logger::log_error(&format!("Failed to create checkpoint: {}", e)).await; - break; - } - }; - - let existing_companies = existing_companies_writer.lock().await; - for company in existing_companies.values() { - let line = serde_json::to_string(company).unwrap(); - let _ = checkpoint_file.write_all(line.as_bytes()).await; - let _ = checkpoint_file.write_all(b"\n").await; - } - drop(existing_companies); - - let _ = checkpoint_file.flush().await; - let _ = checkpoint_file.sync_all().await; - drop(checkpoint_file); - - let _ = tokio::fs::rename(&checkpoint_tmp, &companies_path_clone).await; - - // Clear log and reopen - drop(log_file); - let _ = tokio::fs::remove_file(&log_path_clone).await; - - // Reopen log file - match OpenOptions::new() - .create(true) - .append(true) - .open(&log_path_clone) - .await { - Ok(new_file) => { - log_file = new_file; - updates_since_checkpoint = 0; - logger::log_info("✓ Checkpoint created and log cleared").await; - } - Err(e) => { - logger::log_error(&format!("Failed to reopen log: {}", e)).await; - break; - } - } - } - - if count % 10 == 0 { - logger::log_info(&format!("Progress: {} companies ({} new, {} updated)", count, new_count, updated_count)).await; - } - }, + } LogCommand::Checkpoint => { - // Force checkpoint - this is the final checkpoint before shutdown - if writes_since_fsync > 0 { - let _ = log_file.flush().await; - let _ = log_file.sync_data().await; + if let Err(e) = log_file.flush().await { + logger::log_error(&format!("Failed to flush before checkpoint: {}", e)).await; + break; + } + if let Err(e) = log_file.sync_data().await { + logger::log_error(&format!("Failed to fsync before checkpoint: {}", e)).await; + break; } - logger::log_info("Creating final checkpoint...").await; - let checkpoint_tmp = companies_path_clone.with_extension("jsonl.tmp"); - if let Ok(mut checkpoint_file) = tokio::fs::File::create(&checkpoint_tmp).await { - let existing_companies = existing_companies_writer.lock().await; - for company in existing_companies.values() { - let line = serde_json::to_string(company).unwrap(); - let _ = checkpoint_file.write_all(line.as_bytes()).await; - let _ = checkpoint_file.write_all(b"\n").await; + let existing_companies = existing_companies_writer.lock().await; + let companies_vec: Vec<_> = existing_companies.values().cloned().collect(); + drop(existing_companies); + + let temp_path = companies_path_clone.with_extension("tmp"); + match tokio::fs::File::create(&temp_path).await { + Ok(mut temp_file) => { + let mut checkpoint_ok = true; + for company in &companies_vec { + if let Ok(line) = serde_json::to_string(company) { + if temp_file.write_all(line.as_bytes()).await.is_err() || + temp_file.write_all(b"\n").await.is_err() { + checkpoint_ok = false; + break; + } + } + } + + if checkpoint_ok { + if temp_file.flush().await.is_ok() && + temp_file.sync_data().await.is_ok() { + drop(temp_file); + + if tokio::fs::rename(&temp_path, &companies_path_clone).await.is_ok() { + if tokio::fs::remove_file(&log_path_clone).await.is_ok() { + logger::log_info(&format!( + "✓ Checkpoint created ({} companies), log cleared", + companies_vec.len() + )).await; + + if let Ok(new_log) = OpenOptions::new() + .create(true) + .append(true) + .open(&log_path_clone) + .await { + log_file = new_log; + } + } + } + } + } + } + Err(e) => { + logger::log_error(&format!("Failed to create checkpoint temp file: {}", e)).await; } - drop(existing_companies); - - let _ = checkpoint_file.flush().await; - let _ = checkpoint_file.sync_all().await; - drop(checkpoint_file); - let _ = tokio::fs::rename(&checkpoint_tmp, &companies_path_clone).await; - - // Clean up log file after final checkpoint - drop(log_file); - let _ = tokio::fs::remove_file(&log_path_clone).await; - - logger::log_info("✓ Final checkpoint created").await; } - // After final checkpoint, exit the loop - break; - }, + updates_since_checkpoint = 0; + } LogCommand::Shutdown => { - // Fsync any pending writes before exit - if writes_since_fsync > 0 { - logger::log_info(&format!("Fsyncing {} pending writes...", writes_since_fsync)).await; - let _ = log_file.flush().await; - let _ = log_file.sync_data().await; - } + logger::log_info("Writer shutting down...").await; break; } } + + // Periodic checkpoint trigger + if updates_since_checkpoint >= CHECKPOINT_INTERVAL { + let _ = write_tx.send(LogCommand::Checkpoint).await; + } } + // Final fsync + let _ = log_file.flush().await; + let _ = log_file.sync_data().await; + + logger::log_info(&format!( + "Writer finished: {} total ({} new, {} updated)", + count, new_count, updated_count + )).await; + (count, new_count, updated_count) }); - // === PARALLEL COMPANY PROCESSING === - logger::log_info(&format!("Processing companies in parallel (max {} concurrent, pool size: {})", - CONCURRENCY_LIMIT, pool.get_number_of_instances())).await; - - let pool = pool.clone(); - let shutdown_flag = shutdown_flag.clone(); + // === PARALLEL PROCESSING PHASE === + logger::log_info(&format!( + "Starting parallel processing of {} companies (concurrency limit: {})", + securities.len(), + CONCURRENCY_LIMIT + )).await; let mut processing_tasks = FuturesUnordered::new(); - let mut pending_companies = Vec::new(); - - // Collect companies to process - for (name, company_info) in securities.iter() { - if processed_names.contains(name) { - continue; - } - pending_companies.push((name.clone(), company_info.clone())); - } - - logger::log_info(&format!("Found {} companies to process", pending_companies.len())).await; - - // Process companies in chunks to limit memory usage - let chunk_size = CONCURRENCY_LIMIT; let mut processed = 0; + let total = securities.len(); - for chunk in pending_companies.chunks(chunk_size) { + for (name, company_info) in securities.into_iter() { + // Check shutdown before creating new tasks if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn("Shutdown detected, stopping task creation").await; break; } - // Launch tasks for this chunk - for (name, company_info) in chunk { - let name = name.clone(); - let company_info = company_info.clone(); - let pool = pool.clone(); - let shutdown_flag = shutdown_flag.clone(); - let existing_entry = existing_companies.get(&name).cloned(); - - let task = tokio::spawn(async move { - process_single_company( - name, - company_info, - existing_entry, - &pool, - &shutdown_flag - ).await - }); - - processing_tasks.push(task); - } - - // Wait for chunk to complete - while let Some(result) = processing_tasks.next().await { - match result { - Ok(Ok(Some(company_result))) => { - // Send to writer - if write_tx.send(LogCommand::Write(company_result.company)).await.is_err() { - logger::log_error("Writer task died, stopping processing").await; - break; + // Wait if we hit concurrency limit + while processing_tasks.len() >= CONCURRENCY_LIMIT { + if let Some(result) = processing_tasks.next().await { + match result { + Ok(Ok(Some(company_result))) => { + let company_result: CompanyProcessResult = company_result; + let _ = write_tx_for_writer.send(LogCommand::Write(company_result.company)).await?; + processed += 1; + } + Ok(Ok(None)) => { + processed += 1; + } + Ok(Err(e)) => { + logger::log_warn(&format!("Company processing error: {}", e)).await; + processed += 1; + } + Err(e) => { + logger::log_error(&format!("Task panic: {}", e)).await; + processed += 1; } - processed += 1; - } - Ok(Ok(None)) => { - // Company had no ISINs or was skipped - processed += 1; - } - Ok(Err(e)) => { - logger::log_warn(&format!("Company processing error: {}", e)).await; - processed += 1; - } - Err(e) => { - logger::log_error(&format!("Task panic: {}", e)).await; - processed += 1; } } @@ -400,11 +328,67 @@ pub async fn build_companies_jsonl_streaming_parallel( if shutdown_flag.load(Ordering::SeqCst) { break; } + + // Spawn new task + let pool = pool.clone(); + let shutdown_flag = shutdown_flag.clone(); + let existing_entry = existing_companies.get(&name).cloned(); + + let task = tokio::spawn(async move { + process_single_company_validated( + name, + company_info, + existing_entry, + &pool, + &shutdown_flag + ).await + }); + + processing_tasks.push(task); + + if processed % 10 == 0 && processed > 0 { + logger::log_info(&format!("Progress: {}/{} companies processed", processed, total)).await; + } + } + + // Wait for remaining tasks + logger::log_info(&format!( + "Waiting for {} remaining tasks to complete...", + processing_tasks.len() + )).await; + + while let Some(result) = processing_tasks.next().await { + if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn("Shutdown detected during final task wait").await; + break; + } + + match result { + Ok(Ok(Some(company_result))) => { + if write_tx_for_writer.send(LogCommand::Write(company_result.company)).await.is_err() { + logger::log_error("Writer task died").await; + break; + } + processed += 1; + } + Ok(Ok(None)) => { + processed += 1; + } + Ok(Err(e)) => { + logger::log_warn(&format!("Company processing error: {}", e)).await; + processed += 1; + } + Err(e) => { + logger::log_error(&format!("Task panic: {}", e)).await; + processed += 1; + } + } } // Signal writer to finish - let _ = write_tx.send(LogCommand::Shutdown).await; - drop(write_tx); + let _ = write_tx_for_writer.send(LogCommand::Checkpoint).await; + let _ = write_tx_for_writer.send(LogCommand::Shutdown).await; + drop(write_tx_for_writer); // Wait for writer to finish let (final_count, final_new, final_updated) = writer_task.await @@ -418,23 +402,34 @@ pub async fn build_companies_jsonl_streaming_parallel( Ok(final_count) } +/// Scrape with retry, validation, and shutdown awareness async fn scrape_with_retry( pool: &Arc, isin: &str, max_retries: u32, + shutdown_flag: &Arc, ) -> Result> { let mut retries = 0; loop { - match scrape_company_details_by_isin(pool, isin).await { + // Check shutdown before each attempt + if shutdown_flag.load(Ordering::SeqCst) { + return Err(anyhow!("Aborted due to shutdown")); + } + + match scrape_company_details_by_isin(pool, isin, shutdown_flag).await { Ok(result) => return Ok(result), Err(e) => { if retries >= max_retries { + logger::log_error(&format!( + "All {} retries exhausted for ISIN {}: {}", + max_retries, isin, e + )).await; return Err(e); } - let backoff_ms = 1000 * 2u64.pow(retries); // 1s, 2s, 4s, 8s - let jitter_ms = rand::rng().random_range(0..500); // +0-500ms Jitter + let backoff_ms = 1000 * 2u64.pow(retries); + let jitter_ms = random_range(0, 500); let total_delay = backoff_ms + jitter_ms; logger::log_warn(&format!( @@ -449,14 +444,20 @@ async fn scrape_with_retry( } } -/// Process a single company: fetch Yahoo data for its ISINs -async fn process_single_company( +/// UPDATED: Process single company with validation and shutdown checks +async fn process_single_company_validated( name: String, company_info: CompanyInfo, existing_entry: Option, pool: &Arc, shutdown_flag: &Arc, ) -> anyhow::Result> { + // Check shutdown at start + if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn(&format!("Shutdown detected, skipping company: {}", name)).await; + return Ok(None); + } + let is_update = existing_entry.is_some(); let mut isin_tickers_map: HashMap> = @@ -485,9 +486,14 @@ async fn process_single_company( } } - // Process each ISIN (these Yahoo lookups will happen in parallel across companies) + // Process each ISIN with validation for (isin, figi_tickers) in unique_isin_ticker_pairs { + // Check shutdown before each ISIN if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn(&format!( + "Shutdown detected while processing company: {}", + name + )).await; break; } @@ -503,11 +509,15 @@ async fn process_single_company( let has_yahoo_ticker = tickers.iter().any(|t| t.starts_with("YAHOO:")); - if !has_yahoo_ticker && !shutdown_flag.load(Ordering::SeqCst) { + if !has_yahoo_ticker { logger::log_info(&format!("Fetching Yahoo details for {} (ISIN: {})", name, isin)).await; - match scrape_with_retry(pool, &isin, 3).await { + + match scrape_with_retry(pool, &isin, 3, shutdown_flag).await { Ok(Some(details)) => { - logger::log_info(&format!("✓ Found Yahoo ticker {} for ISIN {}", details.ticker, isin)).await; + logger::log_info(&format!( + "✓ Found Yahoo ticker {} for ISIN {} (company: {})", + details.ticker, isin, name + )).await; tickers.push(format!("YAHOO:{}", details.ticker)); @@ -522,20 +532,30 @@ async fn process_single_company( } }, Ok(None) => { - logger::log_warn(&format!("◯ No search results for ISIN {}", isin)).await; + logger::log_warn(&format!("◯ No search results for ISIN {} (company: {})", isin, name)).await; tickers.push("YAHOO:NO_RESULTS".to_string()); }, Err(e) => { if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn(&format!("Shutdown during scrape for ISIN {}", isin)).await; break; } - logger::log_warn(&format!("✗ Yahoo lookup error for ISIN {}: {}", isin, e)).await; + logger::log_warn(&format!( + "✗ Yahoo lookup error for ISIN {} (company: {}): {}", + isin, name, e + )).await; + // Continue with next ISIN } } } } + // Final shutdown check before returning result if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn(&format!( + "Shutdown detected, discarding incomplete result for: {}", + name + )).await; return Ok(None); } @@ -552,6 +572,7 @@ async fn process_single_company( is_update, })) } else { + logger::log_warn(&format!("No ISINs found for company: {}", name)).await; Ok(None) } } \ No newline at end of file diff --git a/src/corporate/yahoo.rs b/src/corporate/yahoo.rs index 28c71c0..5afe96c 100644 --- a/src/corporate/yahoo.rs +++ b/src/corporate/yahoo.rs @@ -1,18 +1,15 @@ -// src/corporate/yahoo.rs -use super::{types::*, helpers::*}; +// src/corporate/yahoo.rs - UPDATED WITH DATA INTEGRITY FIXES +use super::{types::*, helpers::*, page_validation::*}; use crate::{scraper::webdriver::*, util::{directories::DataPaths}}; -use event_backtest_engine::logger; +use crate::logger; use fantoccini::{Client, Locator}; use rand::Rng; use serde::{Deserialize, Serialize}; use tokio::time::{Duration as TokioDuration, sleep, timeout}; -use std::{sync::Arc}; +use std::{sync::Arc, sync::atomic::{AtomicBool, Ordering}}; use anyhow::{anyhow, Result}; const YAHOO_COMPANY_EXTRACTION_JS: &str = include_str!("yahoo_company_extraction.js"); -/// Mapping existing - -/// getting historical stock price data daily (xxxx - 2025) and hourly (last 30 days) #[derive(Debug, Clone, Serialize, Deserialize)] pub enum YahooTickerResult { @@ -66,29 +63,137 @@ impl YahooTickerResult { } } +/// UPDATED: Scrape company details with full validation and shutdown support pub async fn scrape_company_details_by_isin( pool: &Arc, isin: &str, + shutdown_flag: &Arc, ) -> anyhow::Result> { - let isin = isin.to_string(); - pool.execute(format!("https://finance.yahoo.com/lookup/?s={}", isin), move |client| { - let isin = isin.clone(); + // Check shutdown before starting + if shutdown_flag.load(Ordering::SeqCst) { + logger::log_warn(&format!("Shutdown detected, skipping ISIN: {}", isin)).await; + return Ok(None); + } + + let isin_owned = isin.to_string(); + let shutdown_clone = Arc::clone(shutdown_flag); + let url = format!("https://finance.yahoo.com/lookup/?s={}", isin); + + pool.execute(url.clone(), move |client| { + let isin = isin_owned.clone(); + let shutdown = shutdown_clone.clone(); + Box::pin(async move { - // Random Delay between 800-1500ms + // Check shutdown during task execution + if shutdown.load(Ordering::SeqCst) { + return Err(anyhow!("Task aborted due to shutdown")); + } + + // Random delay let delay = rand::rng().random_range(800..1500); sleep(TokioDuration::from_millis(delay)).await; + // Reject cookies reject_yahoo_cookies(&client).await?; - // Random Delay + // Check shutdown again + if shutdown.load(Ordering::SeqCst) { + return Err(anyhow!("Task aborted due to shutdown")); + } + + // CRITICAL: Validate navigation succeeded + let expected_fragment = format!("lookup/?s={}", isin); + match verify_navigation(&client, &expected_fragment, 5).await { + Ok(_) => { + logger::log_info(&format!("✓ Navigation validated for ISIN: {}", isin)).await; + } + Err(e) => { + logger::log_error(&format!( + "Navigation verification failed for ISIN {}: {}", + isin, e + )).await; + // Clear browser state before returning error + clear_browser_state(&client).await.ok(); + return Err(e); + } + } + + // Additional content validation + let page_ready: bool = client + .execute( + r#" + const table = document.querySelector('#main-content-wrapper > section > section.container.yf-1omxedn > div.tableContainer.yf-1omxedn > div > table'); + const noData = document.querySelector('#main-content-wrapper > section > div.noData.yf-1omxedn'); + return !!(table || noData); + "#, + vec![], + ) + .await? + .as_bool() + .unwrap_or(false); + + if !page_ready { + logger::log_error(&format!( + "Page content not ready for ISIN {} - neither table nor no-data element found", + isin + )).await; + clear_browser_state(&client).await.ok(); + return Err(anyhow!("Page content not ready")); + } + + logger::log_info(&format!("✓ Page content validated for ISIN: {}", isin)).await; + + // Check shutdown before extraction + if shutdown.load(Ordering::SeqCst) { + return Err(anyhow!("Task aborted due to shutdown")); + } + + // Random delay before extraction let delay = rand::rng().random_range(800..1500); sleep(TokioDuration::from_millis(delay)).await; - extract_company_details(&client, &isin).await + // Now safe to extract + extract_company_details_validated(&client, &isin).await }) }).await } +/// UPDATED: Extract with additional URL validation +async fn extract_company_details_validated( + client: &Client, + isin: &str, +) -> Result> { + // Double-check URL is still correct before extraction + let current_url = client.current_url().await?; + if !current_url.as_str().contains(isin) { + logger::log_error(&format!( + "URL mismatch before extraction: expected ISIN '{}' in URL, got '{}'", + isin, + current_url.as_str() + )).await; + clear_browser_state(client).await.ok(); + return Err(anyhow!("URL mismatch - possible stale page")); + } + + // Run extraction + let result = extract_company_details(client, isin).await?; + + // Validate extraction result + if let Some(ref details) = result { + logger::log_info(&format!( + "✓ Extracted ticker '{}' for ISIN {} (sector: {:?}, exchange: {:?})", + details.ticker, isin, details.sector, details.exchange + )).await; + } else { + logger::log_info(&format!( + "No ticker found for ISIN {} (legitimately not found)", + isin + )).await; + } + + Ok(result) +} + pub async fn extract_company_details( client: &Client, _isin: &str, @@ -153,17 +258,13 @@ pub async fn extract_company_details( // Parse the JSON result let extraction: ExtractionResult = serde_json::from_value(result.clone()) .map_err(|e| { - // Log the problematic result value for debugging let result_str = serde_json::to_string_pretty(&result).unwrap_or_else(|_| format!("{:?}", result)); anyhow!("Failed to parse extraction result: {}. Raw result: {}", e, result_str) })?; match extraction.status.as_str() { "found" => { - // Ticker is guaranteed to be present when status is "found" - // Sector and exchange are optional if let Some(ticker) = extraction.ticker { - // Log metadata if available if let Some(ref metadata) = extraction.metadata { logger::log_info(&format!( "Selected row {} with {} valid fields out of {} total rows", @@ -179,13 +280,11 @@ pub async fn extract_company_details( exchange: extraction.exchange, })) } else { - // This shouldn't happen if JS script is working correctly Err(anyhow!("Status 'found' but no ticker present")) } }, "no_results" => Ok(None), "error" => { - // Error status means ticker was not found or extraction failed let error_msg = extraction.error_message.unwrap_or_else(|| "Unknown error".to_string()); Err(anyhow!("JavaScript extraction error: {}", error_msg)) }, @@ -207,19 +306,6 @@ pub async fn get_all_tickers_from_companies_jsonl(paths: &DataPaths) -> anyhow:: Ok(tickers) } -/// Fetches earnings events for a ticker using a dedicated ScrapeTask. -/// -/// This function creates and executes a ScrapeTask to navigate to the Yahoo Finance earnings calendar, -/// reject cookies, and extract the events. -/// -/// # Arguments -/// * `ticker` - The stock ticker symbol. -/// -/// # Returns -/// A vector of CompanyEvent structs on success. -/// -/// # Errors -/// Returns an error if the task execution fails, e.g., chromedriver spawn or navigation issues. pub async fn fetch_earnings_with_pool( pool: &Arc, ticker: &str, @@ -238,40 +324,6 @@ pub async fn fetch_earnings_with_pool( }).await } -/// Extracts earnings events from the currently loaded Yahoo Finance earnings calendar page. -/// -/// This function assumes the client is already navigated to the correct URL (e.g., -/// https://finance.yahoo.com/calendar/earnings?symbol={ticker}) and cookies are handled. -/// -/// It waits for the earnings table, extracts rows, parses cells into CompanyEvent structs, -/// and handles date parsing, float parsing, and optional fields. -/// -/// # Arguments -/// * `client` - The fantoccini Client with the page loaded. -/// * `ticker` - The stock ticker symbol for the events. -/// -/// # Returns -/// A vector of CompanyEvent on success. -/// -/// # Errors -/// Returns an error if: -/// - Table or elements not found. -/// - Date or float parsing fails. -/// - WebDriver operations fail. -/// -/// # Examples -/// -/// ```no_run -/// use fantoccini::Client; -/// use crate::corporate::scraper::extract_earnings; -/// -/// #[tokio::main] -/// async fn main() -> Result<()> { -/// // Assume client is set up and navigated -/// let events = extract_earnings(&client, "AAPL").await?; -/// Ok(()) -/// } -/// ``` pub async fn extract_earnings_events(client: &Client, ticker: &str) -> Result> { // Wait for the table to load let table = client diff --git a/src/economic/scraper.rs b/src/economic/scraper.rs index dfb0280..7feaebf 100644 --- a/src/economic/scraper.rs +++ b/src/economic/scraper.rs @@ -1,6 +1,6 @@ // src/economic/scraper.rs use super::types::{EconomicEvent}; -use event_backtest_engine::logger; +use crate::logger; use fantoccini::Client; use tokio::time::{sleep, Duration}; diff --git a/src/economic/storage.rs b/src/economic/storage.rs index c42d09f..7ff1908 100644 --- a/src/economic/storage.rs +++ b/src/economic/storage.rs @@ -84,7 +84,7 @@ pub async fn load_events_in_batches( Ok(all_events.into_iter()) } -/// NEW: Build a lightweight index instead of loading all events +/// Build a lightweight index instead of loading all events #[derive(Debug, Clone)] pub struct EventIndex { pub key: String, diff --git a/src/lib.rs b/src/lib.rs index 94734b1..91fe582 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,10 +6,12 @@ pub mod config; pub mod scraper; pub mod util; +pub mod monitoring; +pub mod economic; +pub mod corporate; // Re-export commonly used types for convenience +pub use monitoring::{init_monitoring, ConfigSnapshot, MonitoringEvent}; pub use config::Config; pub use scraper::webdriver::{ChromeDriverPool, ChromeInstance, ScrapeTask}; -pub use util::directories::DataPaths; pub use util::logger; -pub use util::opnv; \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index aeb49b2..cde6161 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,9 @@ // src/main.rs -mod config; -mod corporate; -mod economic; -mod util; -mod scraper; +use web_scraper::{*, scraper, economic, corporate}; use anyhow::Result; -use config::Config; +use web_scraper::config::Config; use scraper::docker_vpn_proxy::{DockerVpnProxyPool, cleanup_all_proxy_containers}; use scraper::webdriver::ChromeDriverPool; use util::directories::DataPaths; @@ -34,13 +30,41 @@ async fn main() -> Result<()> { cleanup_all_proxy_containers().await.ok(); - - let config = Config::load().map_err(|err| { - eprintln!("Failed to load config: {}", err); - err - })?; + let config = match Config::load() { + Ok(cfg) => cfg, + Err(_) => { + eprintln!("Using default configuration"); + Config::default() + } + }; let paths = DataPaths::new(".")?; + + // Initialize monitoring system + let config_snapshot = ConfigSnapshot { + max_parallel_instances: config.max_parallel_instances, + max_tasks_per_instance: config.max_tasks_per_instance, + enable_vpn_rotation: config.enable_vpn_rotation, + max_requests_per_session: config.max_requests_per_session, + min_request_interval_ms: config.min_request_interval_ms, + max_retry_attempts: config.max_retry_attempts, + }; + + let (monitoring_handle, _monitoring_task) = init_monitoring( + config_snapshot, + paths.logs_dir().to_path_buf(), + 3030, // Dashboard port + ).await?; + + // Emit pool initialization event + monitoring_handle.emit(monitoring::MonitoringEvent::PoolInitialized { + pool_size: config.max_parallel_instances, + with_proxy: config.enable_vpn_rotation, + with_rotation: config.max_tasks_per_instance > 0, + }); + + logger::log_info("Monitoring dashboard available at http://localhost:3030").await; + logger::init_debug_logger(paths.logs_dir()).await.ok(); logger::log_info("=== Event Backtest Engine Started ===").await; logger::log_info(&format!( @@ -56,7 +80,8 @@ async fn main() -> Result<()> { // === Step 1: Fetch VPNBook configs === let proxy_pool: Option> = if config.enable_vpn_rotation { logger::log_info("VPN Rotation Enabled – Fetching latest VPNBook configs").await; - let temp_pool = Arc::new(ChromeDriverPool::new_with_proxy_and_task_limit(config.max_parallel_instances, None, 1).await?); + let temp_pool = Arc::new(ChromeDriverPool::new_with_proxy_and_task_limit(None, &config, Some(monitoring_handle.clone())).await?); + let (username, password, _files) = opnv::fetch_vpnbook_configs(&temp_pool, paths.cache_dir()).await?; logger::log_info(&format!("VPNBook credentials → User: {}", username)).await; @@ -72,6 +97,16 @@ async fn main() -> Result<()> { let pp = Arc::new(DockerVpnProxyPool::new(paths.cache_openvpn_dir(), username, password).await?); logger::log_info(&format!("All {} Docker proxy containers started and ready", pp.num_proxies())).await; + for i in 0..pp.num_proxies() { + if let Some(proxy_info) = pp.get_proxy_info(i) { + monitoring_handle.emit(monitoring::MonitoringEvent::ProxyConnected { + container_name: proxy_info.container_name.clone(), + ip_address: proxy_info.ip_address.clone(), + port: proxy_info.port, + }); + } + } + Some(pp) } } else { @@ -87,9 +122,9 @@ async fn main() -> Result<()> { let pool = Arc::new( if task_limit > 0 { - ChromeDriverPool::new_with_proxy_and_task_limit(pool_size, proxy_pool.clone(), task_limit).await? + ChromeDriverPool::new_with_proxy_and_task_limit(proxy_pool.clone(), &config, Some(monitoring_handle.clone())).await? } else { - ChromeDriverPool::new_with_proxy(pool_size, proxy_pool.clone()).await? + ChromeDriverPool::new_with_proxy_and_task_limit(proxy_pool.clone(), &config, Some(monitoring_handle.clone())).await? } ); diff --git a/src/monitoring/dashboard.html b/src/monitoring/dashboard.html new file mode 100644 index 0000000..ad62db4 --- /dev/null +++ b/src/monitoring/dashboard.html @@ -0,0 +1,644 @@ + + + + + + Scraper Monitoring Dashboard + + + +
+ Connecting... +
+ +
+

🚀 Scraper Monitoring Dashboard

+
Uptime: Loading...
+
+ + +
+
⚙️ CONFIGURATION
+
+
+ + +
+
🔧 POOL STATUS
+
+
+ + +
+
📊 GLOBAL METRICS
+
+
+ + +
+
📝 RECENT LOGS
+
+
+ + + + \ No newline at end of file diff --git a/src/monitoring/events.rs b/src/monitoring/events.rs new file mode 100644 index 0000000..f6b5011 --- /dev/null +++ b/src/monitoring/events.rs @@ -0,0 +1,129 @@ +// src/monitoring/events.rs +use super::metrics::ProxyInfo; + +/// Events emitted by the scraper system +#[derive(Debug, Clone)] +pub enum MonitoringEvent { + // Pool initialization + PoolInitialized { + pool_size: usize, + with_proxy: bool, + with_rotation: bool, + }, + + // Instance lifecycle + InstanceCreated { + instance_id: usize, + max_tasks: usize, + proxy: Option, + }, + + InstanceStatusChanged { + instance_id: usize, + status: InstanceStatusChange, + }, + + // Task execution + TaskStarted { + instance_id: usize, + url: String, + }, + + TaskCompleted { + instance_id: usize, + success: bool, + duration_ms: u64, + error: Option, + }, + + NavigationTimeout { + instance_id: usize, + url: String, + }, + + BotDetectionTriggered { + instance_id: usize, + url: String, + }, + + // Session management + SessionStarted { + instance_id: usize, + proxy: Option, + }, + + SessionRenewed { + instance_id: usize, + old_request_count: usize, + reason: RenewalReason, + new_proxy: Option, + }, + + SessionRequestIncremented { + instance_id: usize, + new_count: usize, + }, + + // Proxy events + ProxyConnected { + container_name: String, + ip_address: String, + port: u16, + }, + + ProxyFailed { + container_name: String, + error: String, + }, + + ProxyRotated { + instance_id: usize, + old_proxy: Option, + new_proxy: String, + }, + + // Pool rotation events + RotationTriggered { + reason: String, + }, + + // Logging + LogMessage { + level: LogLevel, + message: String, + }, +} + +#[derive(Debug, Clone)] +pub enum InstanceStatusChange { + Idle, + Active, + Renewing, + Error(String), +} + +#[derive(Debug, Clone)] +pub enum RenewalReason { + TaskLimit, + RequestLimit, + Error, + Manual, +} + +#[derive(Debug, Clone)] +pub enum LogLevel { + Info, + Warn, + Error, +} + +impl std::fmt::Display for RenewalReason { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RenewalReason::TaskLimit => write!(f, "task_limit"), + RenewalReason::RequestLimit => write!(f, "request_limit"), + RenewalReason::Error => write!(f, "error"), + RenewalReason::Manual => write!(f, "manual"), + } + } +} \ No newline at end of file diff --git a/src/monitoring/logger.rs b/src/monitoring/logger.rs new file mode 100644 index 0000000..358c538 --- /dev/null +++ b/src/monitoring/logger.rs @@ -0,0 +1,103 @@ +// src/monitoring/logger.rs +use super::metrics::SessionSummary; +use chrono::Local; +use std::path::PathBuf; +use tokio::fs::OpenOptions; +use tokio::io::AsyncWriteExt; +use tokio::sync::Mutex; + +/// Logs session summaries to JSONL files +pub struct SessionLogger { + log_dir: PathBuf, + file: Mutex>, +} + +impl SessionLogger { + pub fn new(log_dir: PathBuf) -> Self { + Self { + log_dir, + file: Mutex::new(None), + } + } + + /// Log a completed session summary + pub async fn log_session(&self, summary: &SessionSummary) { + if let Err(e) = self.write_session(summary).await { + eprintln!("Failed to log session: {}", e); + } + } + + async fn write_session(&self, summary: &SessionSummary) -> anyhow::Result<()> { + let mut file_guard = self.file.lock().await; + + // Open file if not already open + if file_guard.is_none() { + let filename = format!( + "sessions_{}.jsonl", + Local::now().format("%Y%m%d") + ); + let filepath = self.log_dir.join(filename); + + tokio::fs::create_dir_all(&self.log_dir).await?; + + let file = OpenOptions::new() + .create(true) + .append(true) + .open(&filepath) + .await?; + + *file_guard = Some(file); + } + + if let Some(file) = file_guard.as_mut() { + let json_line = serde_json::to_string(summary)?; + file.write_all(json_line.as_bytes()).await?; + file.write_all(b"\n").await?; + file.flush().await?; + } + + Ok(()) + } +} + +/// Logs metrics snapshots periodically +pub struct MetricsLogger { + log_dir: PathBuf, +} + +impl MetricsLogger { + pub fn new(log_dir: PathBuf) -> Self { + Self { log_dir } + } + + /// Log a metrics snapshot + pub async fn log_metrics(&self, state: &super::metrics::DashboardState) -> anyhow::Result<()> { + let filename = format!( + "metrics_{}.jsonl", + Local::now().format("%Y%m%d") + ); + let filepath = self.log_dir.join(filename); + + tokio::fs::create_dir_all(&self.log_dir).await?; + + let mut file = OpenOptions::new() + .create(true) + .append(true) + .open(&filepath) + .await?; + + let snapshot = serde_json::json!({ + "timestamp": Local::now().format("%Y-%m-%d %H:%M:%S").to_string(), + "global": state.global, + "instance_count": state.instances.len(), + "proxy_count": state.proxies.len(), + }); + + let json_line = serde_json::to_string(&snapshot)?; + file.write_all(json_line.as_bytes()).await?; + file.write_all(b"\n").await?; + file.flush().await?; + + Ok(()) + } +} \ No newline at end of file diff --git a/src/monitoring/metrics.rs b/src/monitoring/metrics.rs new file mode 100644 index 0000000..da7afec --- /dev/null +++ b/src/monitoring/metrics.rs @@ -0,0 +1,252 @@ +// src/monitoring/metrics.rs +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::time::Instant; + +/// Complete dashboard state sent to web clients +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DashboardState { + pub config: ConfigSnapshot, + pub instances: Vec, + pub proxies: Vec, + pub global: GlobalMetrics, + pub logs: Vec, +} + +/// Snapshot of configuration settings +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ConfigSnapshot { + pub max_parallel_instances: usize, + pub max_tasks_per_instance: usize, + pub enable_vpn_rotation: bool, + pub max_requests_per_session: usize, + pub min_request_interval_ms: u64, + pub max_retry_attempts: u32, +} + +/// Metrics for a single ChromeDriver instance +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct InstanceMetrics { + pub id: usize, + pub status: InstanceStatus, + pub current_task: Option, + pub tasks_current_session: usize, + pub tasks_max: usize, + pub session_requests: usize, + pub total_requests: usize, + pub success_count: usize, + pub failure_count: usize, + pub connected_proxy: Option, + pub last_activity: String, // Timestamp +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "lowercase")] +pub enum InstanceStatus { + Idle, + Active, + Renewing, + Error, +} + +/// Information about a proxy connection +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ProxyInfo { + pub container_name: String, + pub ip_address: String, + pub port: u16, + pub status: ProxyStatus, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "lowercase")] +pub enum ProxyStatus { + Connected, + Disconnected, +} + +/// Metrics for a proxy +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ProxyMetrics { + pub container_name: String, + pub ip_address: String, + pub port: u16, + pub status: ProxyStatus, + pub instances_using: Vec, +} + +/// Global pool metrics +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GlobalMetrics { + pub total_requests: usize, + pub successful_requests: usize, + pub failed_requests: usize, + pub success_rate: f64, + pub session_renewals: usize, + pub rotation_events: usize, + pub navigation_timeouts: usize, + pub bot_detection_hits: usize, + pub proxy_failures: usize, + pub uptime_seconds: u64, +} + +/// Log entry for display in dashboard +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LogEntry { + pub timestamp: String, + pub level: LogLevel, + pub message: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum LogLevel { + Info, + Warn, + Error, +} + +/// Internal state tracked by monitoring service +#[derive(Debug, Clone)] +pub struct MonitoringState { + pub instances: HashMap, + pub proxies: HashMap, + pub global: GlobalState, + pub start_time: Instant, +} + +#[derive(Debug, Clone)] +pub struct InstanceState { + pub id: usize, + pub status: InstanceStatus, + pub current_task: Option, + pub tasks_current_session: usize, + pub tasks_max: usize, + pub session_requests: usize, + pub total_requests: usize, + pub success_count: usize, + pub failure_count: usize, + pub connected_proxy: Option, + pub last_activity: Instant, +} + +#[derive(Debug, Clone)] +pub struct ProxyState { + pub container_name: String, + pub ip_address: String, + pub port: u16, + pub status: ProxyStatus, + pub instances_using: Vec, +} + +#[derive(Debug, Clone)] +pub struct GlobalState { + pub total_requests: usize, + pub successful_requests: usize, + pub failed_requests: usize, + pub session_renewals: usize, + pub rotation_events: usize, + pub navigation_timeouts: usize, + pub bot_detection_hits: usize, + pub proxy_failures: usize, +} + +impl MonitoringState { + pub fn new() -> Self { + Self { + instances: HashMap::new(), + proxies: HashMap::new(), + global: GlobalState { + total_requests: 0, + successful_requests: 0, + failed_requests: 0, + session_renewals: 0, + rotation_events: 0, + navigation_timeouts: 0, + bot_detection_hits: 0, + proxy_failures: 0, + }, + start_time: Instant::now(), + } + } + + /// Convert internal state to dashboard state for web clients + pub fn to_dashboard_state(&self, config: ConfigSnapshot, logs: Vec) -> DashboardState { + let instances: Vec = self + .instances + .values() + .map(|inst| InstanceMetrics { + id: inst.id, + status: inst.status.clone(), + current_task: inst.current_task.clone(), + tasks_current_session: inst.tasks_current_session, + tasks_max: inst.tasks_max, + session_requests: inst.session_requests, + total_requests: inst.total_requests, + success_count: inst.success_count, + failure_count: inst.failure_count, + connected_proxy: inst.connected_proxy.clone(), + last_activity: format_timestamp(inst.last_activity), + }) + .collect(); + + let proxies: Vec = self + .proxies + .values() + .map(|proxy| ProxyMetrics { + container_name: proxy.container_name.clone(), + ip_address: proxy.ip_address.clone(), + port: proxy.port, + status: proxy.status.clone(), + instances_using: proxy.instances_using.clone(), + }) + .collect(); + + let success_rate = if self.global.total_requests > 0 { + (self.global.successful_requests as f64 / self.global.total_requests as f64) * 100.0 + } else { + 0.0 + }; + + let global = GlobalMetrics { + total_requests: self.global.total_requests, + successful_requests: self.global.successful_requests, + failed_requests: self.global.failed_requests, + success_rate, + session_renewals: self.global.session_renewals, + rotation_events: self.global.rotation_events, + navigation_timeouts: self.global.navigation_timeouts, + bot_detection_hits: self.global.bot_detection_hits, + proxy_failures: self.global.proxy_failures, + uptime_seconds: self.start_time.elapsed().as_secs(), + }; + + DashboardState { + config, + instances, + proxies, + global, + logs, + } + } +} + +fn format_timestamp(instant: Instant) -> String { + use chrono::Local; + // This is a placeholder - in real impl we'd track actual wall-clock time + Local::now().format("%H:%M:%S").to_string() +} + +/// Session completion summary for logging +#[derive(Debug, Clone, Serialize)] +pub struct SessionSummary { + pub instance_id: usize, + pub session_start: String, + pub session_end: String, + pub duration_seconds: u64, + pub total_requests: usize, + pub successful_requests: usize, + pub failed_requests: usize, + pub proxy_info: Option, + pub renewal_reason: String, // "task_limit", "request_limit", "error" +} \ No newline at end of file diff --git a/src/monitoring/mod.rs b/src/monitoring/mod.rs new file mode 100644 index 0000000..5824eb7 --- /dev/null +++ b/src/monitoring/mod.rs @@ -0,0 +1,78 @@ +// src/monitoring/mod.rs +//! Monitoring system for tracking scraper performance and health +//! +//! This module provides: +//! - Real-time metrics collection +//! - Web-based dashboard +//! - Session logging +//! - Minimal performance overhead + +pub mod metrics; +pub mod events; +pub mod service; +pub mod webserver; +pub mod logger; + +pub use events::{MonitoringEvent,RenewalReason, InstanceStatusChange}; +pub use metrics::{ConfigSnapshot, ProxyInfo, ProxyStatus}; +pub use service::{MonitoringService, MonitoringHandle}; +pub use webserver::WebServer; + +use std::path::PathBuf; +use std::sync::Arc; +use tokio::sync::{mpsc, RwLock}; + +/// Initialize the complete monitoring system +pub async fn init_monitoring( + config_snapshot: ConfigSnapshot, + log_dir: PathBuf, + dashboard_port: u16, +) -> anyhow::Result<(MonitoringHandle, tokio::task::JoinHandle<()>)> { + // Create channel for events + let (tx, rx) = mpsc::unbounded_channel(); + + // Create monitoring service + let service = MonitoringService::new(config_snapshot, rx, log_dir); + let service_arc = Arc::new(RwLock::new(service)); + + // Start monitoring service task + let service_clone = Arc::clone(&service_arc); + let monitoring_task = tokio::spawn(async move { + println!("🚀 MONITORING TASK STARTED!"); + // Take ownership of the service + let mut service = { + let mut guard = service_clone.write().await; + std::mem::replace( + &mut *guard, + MonitoringService::new( + ConfigSnapshot { + max_parallel_instances: 0, + max_tasks_per_instance: 0, + enable_vpn_rotation: false, + max_requests_per_session: 0, + min_request_interval_ms: 0, + max_retry_attempts: 0, + }, + mpsc::unbounded_channel().1, + PathBuf::new(), + ), + ) + }; + + println!("✅ ABOUT TO RUN SERVICE!"); + service.run().await; + }); + + // Start web server + let webserver = WebServer::new(Arc::clone(&service_arc), dashboard_port); + tokio::spawn(async move { + if let Err(e) = webserver.run().await { + eprintln!("Web server error: {}", e); + } + }); + + // Create handle for emitting events + let handle = MonitoringHandle::new(tx); + + Ok((handle, monitoring_task)) +} \ No newline at end of file diff --git a/src/monitoring/service.rs b/src/monitoring/service.rs new file mode 100644 index 0000000..496ab97 --- /dev/null +++ b/src/monitoring/service.rs @@ -0,0 +1,341 @@ +// src/monitoring/service.rs +use super::events::*; +use super::metrics::*; +use super::logger::SessionLogger; +use std::collections::VecDeque; +use std::sync::Arc; +use std::time::Instant; +use tokio::sync::{mpsc, RwLock}; +use chrono::Local; + +const MAX_LOGS: usize = 100; + +/// Monitoring service that collects events and maintains state +pub struct MonitoringService { + state: Arc>, + config: ConfigSnapshot, + logs: Arc>>, + session_logger: Arc, + event_rx: mpsc::UnboundedReceiver, +} + +impl MonitoringService { + pub fn new( + config: ConfigSnapshot, + event_rx: mpsc::UnboundedReceiver, + log_dir: std::path::PathBuf, + ) -> Self { + Self { + state: Arc::new(RwLock::new(MonitoringState::new())), + config, + logs: Arc::new(RwLock::new(VecDeque::with_capacity(MAX_LOGS))), + session_logger: Arc::new(SessionLogger::new(log_dir)), + event_rx, + } + } + + /// Get current dashboard state for web clients + pub async fn get_dashboard_state(&self) -> DashboardState { + let state = self.state.read().await; + let logs = self.logs.read().await; + state.to_dashboard_state( + self.config.clone(), + logs.iter().cloned().collect(), + ) + } + + /// Main event processing loop + pub async fn run(mut self) { + while let Some(event) = self.event_rx.recv().await { + self.process_event(event).await; + } + } + + async fn process_event(&self, event: MonitoringEvent) { + match event { + MonitoringEvent::PoolInitialized { pool_size, with_proxy, with_rotation } => { + self.log_info(format!( + "Pool initialized: {} instances, proxy={}, rotation={}", + pool_size, with_proxy, with_rotation + )).await; + } + + MonitoringEvent::InstanceCreated { instance_id, max_tasks, proxy } => { + let mut state = self.state.write().await; + state.instances.insert( + instance_id, + InstanceState { + id: instance_id, + status: InstanceStatus::Idle, + current_task: None, + tasks_current_session: 0, + tasks_max: max_tasks, + session_requests: 0, + total_requests: 0, + success_count: 0, + failure_count: 0, + connected_proxy: proxy.clone(), + last_activity: Instant::now(), + }, + ); + + if let Some(proxy_info) = proxy { + state.proxies.entry(proxy_info.container_name.clone()).or_insert_with(|| { + ProxyState { + container_name: proxy_info.container_name.clone(), + ip_address: proxy_info.ip_address.clone(), + port: proxy_info.port, + status: ProxyStatus::Connected, + instances_using: vec![instance_id], + } + }).instances_using.push(instance_id); + } + + self.log_info(format!("Instance #{} created", instance_id)).await; + } + + MonitoringEvent::InstanceStatusChanged { instance_id, status } => { + let mut state = self.state.write().await; + if let Some(inst) = state.instances.get_mut(&instance_id) { + inst.status = match status { + InstanceStatusChange::Idle => InstanceStatus::Idle, + InstanceStatusChange::Active => InstanceStatus::Active, + InstanceStatusChange::Renewing => InstanceStatus::Renewing, + InstanceStatusChange::Error(_) => InstanceStatus::Error, + }; + inst.last_activity = Instant::now(); + } + } + + MonitoringEvent::TaskStarted { instance_id, url } => { + let mut state = self.state.write().await; + if let Some(inst) = state.instances.get_mut(&instance_id) { + inst.status = InstanceStatus::Active; + inst.current_task = Some(url.clone()); + inst.last_activity = Instant::now(); + } + state.global.total_requests += 1; + + self.log_info(format!("Instance #{} started task: {}", instance_id, url)).await; + } + + MonitoringEvent::TaskCompleted { instance_id, success, duration_ms, error } => { + let mut state = self.state.write().await; + if let Some(inst) = state.instances.get_mut(&instance_id) { + inst.current_task = None; + inst.status = InstanceStatus::Idle; + inst.total_requests += 1; + inst.last_activity = Instant::now(); + + if success { + inst.success_count += 1; + state.global.successful_requests += 1; + } else { + inst.failure_count += 1; + state.global.failed_requests += 1; + } + } + + if success { + self.log_info(format!( + "Instance #{} completed task in {}ms", + instance_id, duration_ms + )).await; + } else { + self.log_error(format!( + "Instance #{} failed task: {}", + instance_id, + error.unwrap_or_else(|| "unknown error".to_string()) + )).await; + } + } + + MonitoringEvent::NavigationTimeout { instance_id, url } => { + let mut state = self.state.write().await; + state.global.navigation_timeouts += 1; + + self.log_warn(format!( + "Instance #{} navigation timeout: {}", + instance_id, url + )).await; + } + + MonitoringEvent::BotDetectionTriggered { instance_id, url } => { + let mut state = self.state.write().await; + state.global.bot_detection_hits += 1; + + self.log_warn(format!( + "Instance #{} bot detection triggered: {}", + instance_id, url + )).await; + } + + MonitoringEvent::SessionStarted { instance_id, proxy } => { + let mut state = self.state.write().await; + if let Some(inst) = state.instances.get_mut(&instance_id) { + inst.session_requests = 0; + inst.tasks_current_session = 0; + inst.connected_proxy = proxy; + inst.last_activity = Instant::now(); + } + + self.log_info(format!("Instance #{} started new session", instance_id)).await; + } + + MonitoringEvent::SessionRenewed { instance_id, old_request_count, reason, new_proxy } => { + // Log the completed session + let session_summary = { + let state = self.state.read().await; + if let Some(inst) = state.instances.get(&instance_id) { + Some(SessionSummary { + instance_id, + session_start: "N/A".to_string(), // We'd need to track this + session_end: Local::now().format("%Y-%m-%d %H:%M:%S").to_string(), + duration_seconds: 0, // We'd need to track session start time + total_requests: old_request_count, + successful_requests: inst.success_count, + failed_requests: inst.failure_count, + proxy_info: inst.connected_proxy.clone(), + renewal_reason: reason.to_string(), + }) + } else { + None + } + }; + + if let Some(summary) = session_summary { + self.session_logger.log_session(&summary).await; + } + + // Update state for new session + let mut state = self.state.write().await; + if let Some(inst) = state.instances.get_mut(&instance_id) { + inst.session_requests = 0; + inst.tasks_current_session = 0; + inst.connected_proxy = new_proxy; + inst.last_activity = Instant::now(); + } + state.global.session_renewals += 1; + + self.log_info(format!( + "Instance #{} renewed session (reason: {}, {} requests)", + instance_id, reason, old_request_count + )).await; + } + + MonitoringEvent::SessionRequestIncremented { instance_id, new_count } => { + let mut state = self.state.write().await; + if let Some(inst) = state.instances.get_mut(&instance_id) { + inst.session_requests = new_count; + inst.last_activity = Instant::now(); + } + } + + MonitoringEvent::ProxyConnected { container_name, ip_address, port } => { + let mut state = self.state.write().await; + state.proxies.insert( + container_name.clone(), + ProxyState { + container_name: container_name.clone(), + ip_address: ip_address.clone(), + port, + status: ProxyStatus::Connected, + instances_using: vec![], + }, + ); + + self.log_info(format!( + "Proxy {} connected: {}:{}", + container_name, ip_address, port + )).await; + } + + MonitoringEvent::ProxyFailed { container_name, error } => { + let mut state = self.state.write().await; + if let Some(proxy) = state.proxies.get_mut(&container_name) { + proxy.status = ProxyStatus::Disconnected; + } + state.global.proxy_failures += 1; + + self.log_error(format!( + "Proxy {} failed: {}", + container_name, error + )).await; + } + + MonitoringEvent::ProxyRotated { instance_id, old_proxy, new_proxy } => { + self.log_info(format!( + "Instance #{} rotated proxy: {} -> {}", + instance_id, + old_proxy.unwrap_or_else(|| "none".to_string()), + new_proxy + )).await; + } + + MonitoringEvent::RotationTriggered { reason } => { + let mut state = self.state.write().await; + state.global.rotation_events += 1; + + self.log_info(format!("Pool rotation triggered: {}", reason)).await; + } + + MonitoringEvent::LogMessage { level, message } => { + match level { + crate::monitoring::events::LogLevel::Info => self.log_info(message).await, + crate::monitoring::events::LogLevel::Warn => self.log_warn(message).await, + crate::monitoring::events::LogLevel::Error => self.log_error(message).await, + } + } + } + } + + async fn log_info(&self, message: String) { + self.add_log(LogEntry { + timestamp: Local::now().format("%H:%M:%S").to_string(), + level: super::metrics::LogLevel::Info, + message, + }).await; + } + + async fn log_warn(&self, message: String) { + self.add_log(LogEntry { + timestamp: Local::now().format("%H:%M:%S").to_string(), + level: super::metrics::LogLevel::Warn, + message, + }).await; + } + + async fn log_error(&self, message: String) { + self.add_log(LogEntry { + timestamp: Local::now().format("%H:%M:%S").to_string(), + level: super::metrics::LogLevel::Error, + message, + }).await; + } + + async fn add_log(&self, entry: LogEntry) { + let mut logs = self.logs.write().await; + if logs.len() >= MAX_LOGS { + logs.pop_front(); + } + logs.push_back(entry); + } +} + +/// Handle for emitting monitoring events +#[derive(Clone)] +pub struct MonitoringHandle { + tx: mpsc::UnboundedSender, +} + +impl MonitoringHandle { + pub fn new(tx: mpsc::UnboundedSender) -> Self { + Self { tx } + } + + /// Emit a monitoring event (non-blocking) + pub fn emit(&self, event: MonitoringEvent) { + // Ignore send errors (monitoring should never block application) + let _ = self.tx.send(event); + } +} \ No newline at end of file diff --git a/src/monitoring/webserver.rs b/src/monitoring/webserver.rs new file mode 100644 index 0000000..dad7fb3 --- /dev/null +++ b/src/monitoring/webserver.rs @@ -0,0 +1,77 @@ +// src/monitoring/webserver.rs +use super::service::MonitoringService; +use axum::{ + extract::{ + ws::{Message, WebSocket, WebSocketUpgrade}, + State, + }, + response::{Html, IntoResponse, Response}, + routing::get, + Router, +}; +use std::sync::Arc; +use tokio::sync::RwLock; +use tokio::time::{interval, Duration}; + +const UPDATE_INTERVAL_MS: u64 = 1000; // 1 second updates + +pub struct WebServer { + service: Arc>, + port: u16, +} + +impl WebServer { + pub fn new(service: Arc>, port: u16) -> Self { + Self { service, port } + } + + pub async fn run(self) -> anyhow::Result<()> { + let app = Router::new() + .route("/", get(dashboard_handler)) + .route("/ws", get(websocket_handler)) + .with_state(self.service); + + let addr = format!("0.0.0.0:{}", self.port); + println!("📊 Dashboard available at: http://localhost:{}", self.port); + + let listener = tokio::net::TcpListener::bind(&addr).await?; + axum::serve(listener, app).await?; + + Ok(()) + } +} + +async fn dashboard_handler() -> impl IntoResponse { + Html(include_str!("dashboard.html")) +} + +async fn websocket_handler( + ws: WebSocketUpgrade, + State(service): State>>, +) -> Response { + ws.on_upgrade(|socket| handle_socket(socket, service)) +} + +async fn handle_socket(mut socket: WebSocket, service: Arc>) { + let mut ticker = interval(Duration::from_millis(UPDATE_INTERVAL_MS)); + + loop { + ticker.tick().await; + + let service_guard = service.read().await; + let state = service_guard.get_dashboard_state().await; + drop(service_guard); + + match serde_json::to_string(&state) { + Ok(json) => { + if socket.send(Message::Text(json)).await.is_err() { + break; // Client disconnected + } + } + Err(e) => { + eprintln!("Failed to serialize dashboard state: {}", e); + break; + } + } + } +} \ No newline at end of file diff --git a/src/scraper/docker_vpn_proxy.rs b/src/scraper/docker_vpn_proxy.rs index 4d68cfb..acb8a76 100644 --- a/src/scraper/docker_vpn_proxy.rs +++ b/src/scraper/docker_vpn_proxy.rs @@ -342,6 +342,25 @@ impl DockerVpnProxyPool { } Ok(()) } + + /// Get ProxyInfo for monitoring dashboard + pub fn get_proxy_info(&self, index: usize) -> Option { + if index >= self.container_names.len() { + return None; + } + + Some(crate::monitoring::ProxyInfo { + container_name: self.container_names[index].clone(), + ip_address: "127.0.0.1".to_string(), // SOCKS5 proxy on localhost + port: self.proxy_ports[index], + status: crate::monitoring::ProxyStatus::Connected, + }) + } + + /// Get container name by index + pub fn get_container_name(&self, index: usize) -> Option { + self.container_names.get(index).cloned() + } } pub async fn cleanup_all_proxy_containers() -> Result<()> { @@ -393,4 +412,4 @@ pub async fn cleanup_all_proxy_containers() -> Result<()> { } Ok(()) -} +} \ No newline at end of file diff --git a/src/scraper/helpers.rs b/src/scraper/helpers.rs new file mode 100644 index 0000000..34365ac --- /dev/null +++ b/src/scraper/helpers.rs @@ -0,0 +1,14 @@ +use rand::rngs::StdRng; +use rand::prelude::{Rng, SeedableRng, IndexedRandom}; + +/// Send-safe random range +pub fn random_range(min: u64, max: u64) -> u64 { + let mut rng = StdRng::from_rng(&mut rand::rng()); + rng.random_range(min..max) +} + +/// Send-safe random choice +pub fn choose_random(items: &[T]) -> T { + let mut rng = StdRng::from_rng(&mut rand::rng()); + items.choose(&mut rng).unwrap().clone() +} \ No newline at end of file diff --git a/src/scraper/mod.rs b/src/scraper/mod.rs index 1c0f399..4f3ee99 100644 --- a/src/scraper/mod.rs +++ b/src/scraper/mod.rs @@ -1,2 +1,3 @@ pub mod webdriver; -pub mod docker_vpn_proxy; \ No newline at end of file +pub mod docker_vpn_proxy; +pub mod helpers; \ No newline at end of file diff --git a/src/scraper/webdriver.rs b/src/scraper/webdriver.rs index 97e201f..dfc378b 100644 --- a/src/scraper/webdriver.rs +++ b/src/scraper/webdriver.rs @@ -1,10 +1,8 @@ // src/scraper/webdriver.rs +use super::helpers::*; use anyhow::{anyhow, Context, Result}; use fantoccini::{Client, ClientBuilder}; -use rand::seq::{IndexedRandom}; -use rand::rngs::ThreadRng; -use rand::Rng; // for the RNG trait use serde_json::{Map, Value}; use std::pin::Pin; use std::process::Stdio; @@ -16,6 +14,7 @@ use tokio::task::JoinHandle; use tokio::sync::{Mutex, Semaphore}; use tokio::time::{sleep, timeout, Duration}; use crate::scraper::docker_vpn_proxy::{DockerVpnProxyPool}; +use crate::Config; /// Manages a pool of ChromeDriver instances for parallel scraping with optional VPN binding. pub struct ChromeDriverPool { @@ -30,25 +29,28 @@ pub struct ChromeDriverPool { last_request_time: Arc>, min_request_interval_ms: u64, + + monitoring: Option, } impl ChromeDriverPool { /// Creates a new pool without any proxy (direct connection). - pub async fn _new(pool_size: usize) -> Result { - Self::new_with_proxy_and_task_limit(pool_size, None, 0).await + pub async fn _new(config: &Config, monitoring: Option,) -> Result { + Self::new_with_proxy_and_task_limit(None, config, monitoring).await } /// Creates a new pool with task-per-instance limit but no proxy. - pub async fn _new_with_task_limit(pool_size: usize, max_tasks_per_instance: usize) -> Result { - Self::new_with_proxy_and_task_limit(pool_size, None, max_tasks_per_instance).await + pub async fn _new_with_task_limit(config: &Config, monitoring: Option,) -> Result { + Self::new_with_proxy_and_task_limit(None, config, monitoring).await } /// Creates a new pool where each Chrome instance uses a different SOCKS5 proxy from the Docker pool. pub async fn new_with_proxy( - pool_size: usize, proxy_pool: Option>, + config: &Config, + monitoring: Option, ) -> Result { - Self::new_with_proxy_and_task_limit(pool_size, proxy_pool, 0).await + Self::new_with_proxy_and_task_limit(proxy_pool, config, monitoring).await } /// Full constructor: supports proxy + task limiting + rotation. @@ -62,10 +64,13 @@ impl ChromeDriverPool { /// /// Uses the minimum of these constraints to determine actual pool size. pub async fn new_with_proxy_and_task_limit( - pool_size_limit: usize, proxy_pool: Option>, - max_tasks_per_instance: usize, + config: &Config, + monitoring: Option, ) -> Result { + let pool_size_limit = config.max_parallel_instances; + let task_per_instance_limit = config.max_tasks_per_instance; + // Determine actual pool size based on available resources let actual_pool_size = if let Some(ref pp) = proxy_pool { let available_proxies = pp.num_proxies(); @@ -79,7 +84,7 @@ impl ChromeDriverPool { } // Rotation is enabled when task limiting is active - let rotation_enabled = max_tasks_per_instance > 0; + let rotation_enabled = task_per_instance_limit > 0; let mut instances = Vec::with_capacity(actual_pool_size); @@ -102,13 +107,61 @@ impl ChromeDriverPool { let instance = ChromeInstance::new( proxy_pool.clone(), // Clone the Arc i, // This instance's proxy index - max_tasks_per_instance + config, + monitoring.clone(), ).await?; crate::util::logger::log_info(&format!(" Instance {} ready", i + 1)).await; instances.push(Arc::new(Mutex::new(instance))); } + // Emit instance created events + for (i, instance) in instances.iter().enumerate() { + if let Some(ref mon) = monitoring { + let guard = instance.lock().await; + + // Extract proxy info if available + let proxy_info = if let Some(ref pp) = proxy_pool { + pp.get_proxy_info(i % pp.num_proxies()) + } else { + guard.proxy_url.as_ref().and_then(|url| { + // Parse proxy URL manually if no pool + // Format: socks5://localhost:10801 + if let Some(port_str) = url.split(':').last() { + if let Ok(port) = port_str.parse::() { + return Some(crate::monitoring::ProxyInfo { + container_name: format!("proxy-{}", i), + ip_address: "127.0.0.1".to_string(), + port, + status: crate::monitoring::ProxyStatus::Connected, + }); + } + } + None + }) + }; + + mon.emit(crate::monitoring::MonitoringEvent::InstanceCreated { + instance_id: i, + max_tasks: guard.max_tasks_per_instance, + proxy: proxy_info.clone(), // ✅ Now includes actual proxy info + }); + + // Also emit ProxyConnected event if proxy exists + if let Some(ref proxy) = proxy_info { + mon.emit(crate::monitoring::MonitoringEvent::ProxyConnected { + container_name: proxy.container_name.clone(), + ip_address: proxy.ip_address.clone(), + port: proxy.port, + }); + } + + drop(guard); + } + } + + let min_request_interval_ms = config.min_request_interval_ms; + Ok(Self { instances, semaphore: Arc::new(Semaphore::new(actual_pool_size)), @@ -116,7 +169,8 @@ impl ChromeDriverPool { rotation_enabled, next_instance: Arc::new(Mutex::new(0)), last_request_time: Arc::new(Mutex::new(Instant::now())), - min_request_interval_ms: 300, + min_request_interval_ms, + monitoring, }) } @@ -145,13 +199,25 @@ impl ChromeDriverPool { } } + let random_index = random_range(0, self.instances.len() as u64) as usize; // Index-Auswahl (vereinfacht, siehe unten für vollständige Rotation) let index = if self.rotation_enabled { self.get_rotated_index().await? } else { - rand::rng().random_range(0..self.instances.len()) + random_index }; + if let Some(ref mon) = self.monitoring { + mon.emit(crate::monitoring::MonitoringEvent::TaskStarted { + instance_id: index, + url: url.clone(), + }); + mon.emit(crate::monitoring::MonitoringEvent::InstanceStatusChanged { + instance_id: index, + status: crate::monitoring::InstanceStatusChange::Active, + }); + } + let instance = &self.instances[index]; let mut guard = instance.lock().await; @@ -168,6 +234,8 @@ impl ChromeDriverPool { drop(guard); // Lock freigeben vor Navigation + let start_time = Instant::now(); + // Navigation mit Timeout let navigation_result = timeout( Duration::from_secs(60), @@ -176,8 +244,20 @@ impl ChromeDriverPool { match navigation_result { Ok(Ok(_)) => { + if let Some(ref mon) = self.monitoring { + mon.emit(crate::monitoring::MonitoringEvent::TaskCompleted { + instance_id: index, + success: navigation_result.is_ok(), + duration_ms: start_time.elapsed().as_millis() as u64, + error: navigation_result.as_ref().err().map(|e| e.to_string()), + }); + mon.emit(crate::monitoring::MonitoringEvent::InstanceStatusChanged { + instance_id: index, + status: crate::monitoring::InstanceStatusChange::Idle, + }); + } crate::util::logger::log_info(&format!("✓ Navigated to {}", url)).await; - + // Parse-Funktion ausführen parse(client).await } @@ -186,6 +266,13 @@ impl ChromeDriverPool { Err(anyhow!("Navigation failed: {}", e)) } Err(_) => { + if let Some(ref mon) = self.monitoring { + mon.emit(crate::monitoring::MonitoringEvent::NavigationTimeout { + instance_id: index, + url: url.clone(), + }); + } + crate::util::logger::log_error("Navigation timeout (60s)").await; Err(anyhow!("Navigation timeout")) } @@ -285,18 +372,21 @@ pub struct ChromeInstance { max_requests_per_session: usize, // z.B. 25 proxy_pool: Option>, // Referernce to the proxy pool - current_proxy_index: Arc>, // Current proxy index in use + current_proxy_index: Arc>, // Current proxy index in use + + instance_id: usize, + monitoring: Option, } impl ChromeInstance { - pub async fn new( - proxy_pool: Option>, - initial_proxy_index: usize, - max_tasks_per_instance: usize) -> Result { + pub async fn new(proxy_pool: Option>, instance_id: usize, config: &Config, monitoring: Option) -> Result { let (base_url, process, stderr_handle) = Self::spawn_chromedriver().await?; // Get proxy URL if proxy pool is provided - let proxy_url = proxy_pool.as_ref().map(|pp| pp.get_proxy_url(initial_proxy_index)); + let proxy_url = proxy_pool.as_ref().map(|pp| pp.get_proxy_url(instance_id)); + + let max_tasks_per_instance = config.max_tasks_per_instance; + let max_requests_per_session = config.max_requests_per_session; Ok(Self { base_url, @@ -308,16 +398,21 @@ impl ChromeInstance { current_session: Arc::new(Mutex::new(None)), session_request_count: Arc::new(Mutex::new(0)), - max_requests_per_session: 25, // Konfigurierbar machen! + max_requests_per_session, proxy_pool, - current_proxy_index: Arc::new(Mutex::new(initial_proxy_index)), + current_proxy_index: Arc::new(Mutex::new(instance_id)), + + instance_id, + monitoring, }) } pub async fn get_or_renew_session(&self) -> Result { let mut session_opt = self.current_session.lock().await; let mut request_count = self.session_request_count.lock().await; + + let old_request_count = *request_count; // Session erneuern wenn: // 1. Keine Session vorhanden @@ -325,12 +420,20 @@ impl ChromeInstance { let needs_renewal = session_opt.is_none() || *request_count >= self.max_requests_per_session; if needs_renewal { + if let Some(ref mon) = self.monitoring { + mon.emit(crate::monitoring::MonitoringEvent::InstanceStatusChanged { + instance_id: self.instance_id, + status: crate::monitoring::InstanceStatusChange::Renewing, + }); + } + // Alte Session schließen if let Some(old_session) = session_opt.take() { crate::util::logger::log_info("Closing old session").await; let _ = old_session.close().await; // Kurze Pause zwischen Sessions - sleep(Duration::from_millis(rand::rng().random_range(500..1000))).await; + let random_delay = random_range(500, 1000); + sleep(Duration::from_millis(random_delay)).await; } // Neue Session mit frischem User-Agent erstellen @@ -342,6 +445,41 @@ impl ChromeInstance { let new_session = self.create_fresh_session().await?; *session_opt = Some(new_session.clone()); *request_count = 0; + + if let Some(ref mon) = self.monitoring { + let reason = if *request_count >= self.max_requests_per_session { + crate::monitoring::RenewalReason::RequestLimit + } else { + crate::monitoring::RenewalReason::TaskLimit + }; + + // Get updated proxy info + let new_proxy_info = if let Some(ref pp) = self.proxy_pool { + let proxy_idx = *self.current_proxy_index.lock().await; + pp.get_proxy_info(proxy_idx) + } else { + self.proxy_url.as_ref().and_then(|url| { + if let Some(port_str) = url.split(':').last() { + if let Ok(port) = port_str.parse::() { + return Some(crate::monitoring::ProxyInfo { + container_name: format!("proxy-{}", self.instance_id), + ip_address: "127.0.0.1".to_string(), + port, + status: crate::monitoring::ProxyStatus::Connected, + }); + } + } + None + }) + }; + + mon.emit(crate::monitoring::MonitoringEvent::SessionRenewed { + instance_id: self.instance_id, + old_request_count: *request_count, + reason: crate::monitoring::RenewalReason::RequestLimit, + new_proxy: new_proxy_info, + }); + } Ok(new_session) } else { @@ -412,11 +550,6 @@ impl ChromeInstance { caps.as_object().cloned().unwrap() } - pub async fn new_session(&self) -> Result { - // Für Backward-Compatibility, aber sollte get_or_renew_session() nutzen! - self.create_fresh_session().await - } - pub fn reset_task_count(&mut self) { self.task_count = 0; } @@ -491,52 +624,15 @@ impl ChromeInstance { Err(anyhow!("ChromeDriver failed to start within 30s")) } - fn chrome_args(&self) -> Map { - let user_agent = Self::chrome_user_agent(); - let mut args = vec![ - "--headless=new".to_string(), - "--disable-gpu".to_string(), - "--no-sandbox".to_string(), - "--disable-dev-shm-usage".to_string(), - "--disable-infobars".to_string(), - "--disable-extensions".to_string(), - "--disable-popup-blocking".to_string(), - "--disable-notifications".to_string(), - //"--disable-logging".to_string(), - "--disable-autofill".to_string(), - "--disable-sync".to_string(), - "--disable-default-apps".to_string(), - "--disable-translate".to_string(), - //"--window-size=1920,1080".to_string(), - "--disable-blink-features=AutomationControlled".to_string(), - format!("--user-agent={}", user_agent), - ]; - if let Some(ref proxy) = self.proxy_url { - let proxy = proxy.clone(); - let proxy_formatted = format!("--proxy-server={}", proxy); - args.push(proxy_formatted); - } - let caps = serde_json::json!({ - "goog:chromeOptions": { - "args": args, - "excludeSwitches": ["enable-logging", "enable-automation"], - "prefs": { - "profile.default_content_setting_values.notifications": 2 - } - } - }); - caps.as_object().cloned().unwrap() - } - pub fn chrome_user_agent() -> &'static str { static UAS: &[&str] = &[ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.6367.91 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.6312.122 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.6261.129 Safari/537.36", + "Mozilla/5.0 (Windows NT 11.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.6367.91 Safari/537.36", ]; - - let mut rng = ThreadRng::default(); // non-deprecated RNG - *UAS.choose(&mut rng).unwrap() + let random_user_agent = choose_random(UAS); + random_user_agent } } diff --git a/src/util/logger.rs b/src/util/logger.rs index 122b34b..1142519 100644 --- a/src/util/logger.rs +++ b/src/util/logger.rs @@ -5,6 +5,8 @@ use tokio::sync::Mutex; use std::fs::{self, OpenOptions}; use std::io::Write; use std::path::PathBuf; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; static LOGGER: Lazy>> = Lazy::new(|| Mutex::new(None)); @@ -75,4 +77,84 @@ pub async fn log_warn(msg: &str) { pub async fn log_error(msg: &str) { log_detailed("ERROR", msg).await; -} \ No newline at end of file +} + +struct PoolLogger { + file: std::fs::File, + log_path: PathBuf, +} + +impl PoolLogger { + fn new(log_dir: &std::path::Path) -> std::io::Result { + fs::create_dir_all(log_dir)?; + let filename = format!("webdriver_{}.log", Local::now().format("%Y%m%d_%H%M%S")); + let log_path = log_dir.join(&filename); + let file = OpenOptions::new() + .create(true) + .append(true) + .open(&log_path)?; + Ok(Self { file, log_path }) + } + + async fn log(&mut self, msg: &str) { + let line = format!("[{}] {}\n", Local::now().format("%H:%M:%S"), msg); + let _ = self.file.write_all(line.as_bytes()); + let _ = self.file.flush(); + println!("{}", line.trim_end()); + } +} + +pub struct PoolMetrics { + pub total_requests: Arc, + pub successful_requests: Arc, + pub failed_requests: Arc, + pub session_renewals: Arc, + pub rotation_events: Arc, + pub retries: Arc, + + // IMPROVEMENT: Neue Metriken für besseres Monitoring + pub navigation_timeouts: Arc, + pub bot_detection_hits: Arc, + pub proxy_failures: Arc, +} + +impl PoolMetrics { + pub fn new() -> Self { + Self { + total_requests: Arc::new(AtomicUsize::new(0)), + successful_requests: Arc::new(AtomicUsize::new(0)), + failed_requests: Arc::new(AtomicUsize::new(0)), + session_renewals: Arc::new(AtomicUsize::new(0)), + rotation_events: Arc::new(AtomicUsize::new(0)), + retries: Arc::new(AtomicUsize::new(0)), + navigation_timeouts: Arc::new(AtomicUsize::new(0)), + bot_detection_hits: Arc::new(AtomicUsize::new(0)), + proxy_failures: Arc::new(AtomicUsize::new(0)), + } + } + + pub async fn log_stats(&self) { + let total = self.total_requests.load(Ordering::Relaxed); + let success = self.successful_requests.load(Ordering::Relaxed); + // FIX: Prefix unused variable with underscore + let _failed = self.failed_requests.load(Ordering::Relaxed); + let renewals = self.session_renewals.load(Ordering::Relaxed); + let rotations = self.rotation_events.load(Ordering::Relaxed); + let retries = self.retries.load(Ordering::Relaxed); + let timeouts = self.navigation_timeouts.load(Ordering::Relaxed); + let bot_hits = self.bot_detection_hits.load(Ordering::Relaxed); + let proxy_fails = self.proxy_failures.load(Ordering::Relaxed); + + let success_rate = if total > 0 { + (success as f64 / total as f64) * 100.0 + } else { + 0.0 + }; + + crate::util::logger::log_info(&format!( + "Pool Metrics: {} total requests, {:.1}% success rate, {} renewals, {} rotations, {} retries, {} timeouts, {} bot detections, {} proxy failures", + total, success_rate, renewals, rotations, retries, timeouts, bot_hits, proxy_fails + )).await; + } +} +