Added hard reset for navigation timeouts occurring after 3 hours

2025-12-22 00:31:28 +01:00
parent c01b47000f
commit fb0876309f
12 changed files with 1036 additions and 264 deletions

View File

@@ -27,6 +27,9 @@ pub struct Config {
#[serde(default = "default_max_retry_attempts")]
pub max_retry_attempts: u32,
#[serde(default = "default_proxy_instances_per_certificate")]
pub proxy_instances_per_certificate: Option<usize>,
}
fn default_enable_vpn_rotation() -> bool {
@@ -47,6 +50,10 @@ fn default_min_request_interval_ms() -> u64 {
fn default_max_retry_attempts() -> u32 { 3 }
fn default_proxy_instances_per_certificate() -> Option<usize> {
Some(1)
}
impl Default for Config {
fn default() -> Self {
Self {
@@ -59,6 +66,7 @@ impl Default for Config {
min_request_interval_ms: default_min_request_interval_ms(),
max_retry_attempts: default_max_retry_attempts(),
enable_vpn_rotation: false,
proxy_instances_per_certificate: default_proxy_instances_per_certificate(),
}
}
}
@@ -112,6 +120,11 @@ impl Config {
.parse()
.context("Failed to parse MAX_RETRY_ATTEMPTS as u32")?;
let proxy_instances_per_certificate: Option<usize> = match dotenvy::var("PROXY_INSTANCES_PER_CERTIFICATE") {
Ok(val) => Some(val.parse().context("Failed to parse PROXY_INSTANCES_PER_CERTIFICATE as usize")?),
Err(_) => Some(1),
};
Ok(Self {
economic_start_date,
corporate_start_date,
@@ -122,6 +135,7 @@ impl Config {
max_requests_per_session,
min_request_interval_ms,
max_retry_attempts,
proxy_instances_per_certificate,
})
}
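For reference, the fallback behaviour of the new setting can be reproduced in isolation. The sketch below uses std::env instead of dotenvy; only the variable name is taken from the diff, everything else is illustrative:

use std::env;

// Mirrors Config::from_env above: an unset or unreadable variable falls back to 1,
// an unparsable value is reported as an error.
fn proxy_instances_per_certificate() -> Result<usize, String> {
    match env::var("PROXY_INSTANCES_PER_CERTIFICATE") {
        Ok(val) => val
            .parse::<usize>()
            .map_err(|e| format!("Failed to parse PROXY_INSTANCES_PER_CERTIFICATE as usize: {e}")),
        Err(_) => Ok(1),
    }
}

fn main() {
    // Prints Ok(1) when the variable is unset; with PROXY_INSTANCES_PER_CERTIFICATE=3 it prints Ok(3).
    println!("{:?}", proxy_instances_per_certificate());
}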

View File

@@ -78,7 +78,7 @@ pub async fn run_full_update(
}
logger::log_info("Step 5: Building companies.jsonl with parallel processing and validation...").await;
let count = build_companies_jsonl_streaming_parallel(&paths, pool, shutdown_flag).await?;
let count = build_companies_jsonl_streaming_parallel(&paths, pool, shutdown_flag, _config, &None).await?;
logger::log_info(&format!(" ✓ Saved {} companies", count)).await;
if !shutdown_flag.load(Ordering::SeqCst) {

View File

@@ -1,16 +1,18 @@
// src/corporate/update_parallel.rs - UPDATED WITH DATA INTEGRITY FIXES
// PARALLELIZED VERSION with atomic commits and validation
// src/corporate/update_parallel.rs - FIXED: Proper Hard Reset Implementation
//
// Key improvements over original:
// - Page validation to prevent stale content extraction
// - Shutdown-aware task processing
// - Better error recovery with browser state cleanup
// - All original fsync and checkpoint logic preserved
// Critical fixes:
// 1. Hard reset actually performed (no premature break)
// 2. Error counter reset after hard reset
// 3. Per-ISIN status tracking (not per-company)
// 4. Proper task draining before reset
// 5. Queue rebuilding after reset
use super::{types::*, yahoo::*, helpers::*};
use crate::util::directories::DataPaths;
use crate::util::logger;
use crate::scraper::webdriver::ChromeDriverPool;
use crate::scraper::hard_reset::perform_hard_reset;
use crate::config::Config;
use tokio::sync::mpsc;
use tokio::io::AsyncWriteExt;
@@ -36,17 +38,13 @@ struct CompanyProcessResult {
is_update: bool,
}
/// UPDATED: Abort-safe incremental JSONL persistence with validation
///
/// New safety features:
/// - Page validation before extraction
/// - Shutdown checks at all critical points
/// - Browser state cleanup on errors
/// - All writes still atomic with fsync
/// Abort-safe incremental JSONL persistence with proper hard reset handling
pub async fn build_companies_jsonl_streaming_parallel(
paths: &DataPaths,
pool: &Arc<ChromeDriverPool>,
shutdown_flag: &Arc<AtomicBool>,
config: &Config,
monitoring: &Option<crate::monitoring::MonitoringHandle>,
) -> anyhow::Result<usize> {
// Configuration constants
const CHECKPOINT_INTERVAL: usize = 50;
@@ -54,9 +52,19 @@ pub async fn build_companies_jsonl_streaming_parallel(
const FSYNC_INTERVAL_SECS: u64 = 10;
const CONCURRENCY_LIMIT: usize = 100;
// Create hard reset controller
let reset_controller = pool.get_reset_controller();
// Wrap pool in mutex for potential replacement
let pool_mutex = Arc::new(tokio::sync::Mutex::new(Arc::clone(pool)));
// Synchronization for hard reset
let reset_in_progress = Arc::new(tokio::sync::Mutex::new(false));
let path = DataPaths::new(".")?;
let corporate_path = path.data_dir().join("corporate").join("by_name");
let securities_path = corporate_path.join("common_stocks.json");
let securities_path_cloned = securities_path.clone();
if !securities_path.exists() {
logger::log_warn("No common_stocks.json found").await;
@@ -137,9 +145,9 @@ pub async fn build_companies_jsonl_streaming_parallel(
let companies_path_clone = companies_path.clone();
let log_path_clone = log_path.clone();
let existing_companies_writer = Arc::new(tokio::sync::Mutex::new(existing_companies.clone()));
let existing_companies_writer_clone = Arc::clone(&existing_companies_writer);
let write_tx_for_writer = write_tx.clone();
let writer_task = tokio::spawn(async move {
let mut log_file = log_file_init;
let mut writes_since_fsync = 0;
@@ -278,112 +286,299 @@ pub async fn build_companies_jsonl_streaming_parallel(
(count, new_count, updated_count)
});
// === PARALLEL PROCESSING PHASE ===
logger::log_info(&format!(
"Starting parallel processing of {} companies (concurrency limit: {})",
securities.len(),
CONCURRENCY_LIMIT
)).await;
let mut processing_tasks = FuturesUnordered::new();
let mut processed = 0;
// === MAIN PROCESSING LOOP ===
let total = securities.len();
logger::log_info(&format!("Processing {} companies with concurrency limit {}", total, CONCURRENCY_LIMIT)).await;
for (name, company_info) in securities.into_iter() {
// Check shutdown before creating new tasks
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown detected, stopping task creation").await;
break;
}
// Wait if we hit concurrency limit
while processing_tasks.len() >= CONCURRENCY_LIMIT {
if let Some(result) = processing_tasks.next().await {
match result {
Ok(Ok(Some(company_result))) => {
let company_result: CompanyProcessResult = company_result;
let _ = write_tx_for_writer.send(LogCommand::Write(company_result.company)).await?;
processed += 1;
}
Ok(Ok(None)) => {
processed += 1;
}
Ok(Err(e)) => {
logger::log_warn(&format!("Company processing error: {}", e)).await;
processed += 1;
}
Err(e) => {
logger::log_error(&format!("Task panic: {}", e)).await;
processed += 1;
}
}
}
let mut tasks = FuturesUnordered::new();
let mut pending = securities.into_iter().collect::<Vec<_>>();
let mut processed = 0;
let mut hard_reset_count = 0;
// Spawn initial batch
for _ in 0..CONCURRENCY_LIMIT.min(pending.len()) {
if let Some((name, company_info)) = pending.pop() {
let current_pool = {
let pool_guard = pool_mutex.lock().await;
Arc::clone(&*pool_guard)
};
if shutdown_flag.load(Ordering::SeqCst) {
break;
}
}
if shutdown_flag.load(Ordering::SeqCst) {
break;
}
// Spawn new task
let pool = pool.clone();
let shutdown_flag = shutdown_flag.clone();
let existing_entry = existing_companies.get(&name).cloned();
let task = tokio::spawn(async move {
process_single_company_validated(
name,
company_info,
existing_entry,
&pool,
&shutdown_flag
).await
});
processing_tasks.push(task);
if processed % 10 == 0 && processed > 0 {
logger::log_info(&format!("Progress: {}/{} companies processed", processed, total)).await;
let existing = existing_companies.get(&name).cloned();
let shutdown_flag_clone = Arc::clone(shutdown_flag);
let task = tokio::spawn(async move {
process_single_company_validated(
name,
company_info,
existing,
&current_pool,
&shutdown_flag_clone,
).await
});
tasks.push(task);
}
}
// Wait for remaining tasks
logger::log_info(&format!(
"Waiting for {} remaining tasks to complete...",
processing_tasks.len()
)).await;
while let Some(result) = processing_tasks.next().await {
// Process results and spawn new tasks
while let Some(task_result) = tasks.next().await {
// Check for shutdown
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown detected during final task wait").await;
logger::log_warn("Shutdown signal received, stopping processing").await;
break;
}
match result {
Ok(Ok(Some(company_result))) => {
if write_tx_for_writer.send(LogCommand::Write(company_result.company)).await.is_err() {
logger::log_error("Writer task died").await;
break;
}
match task_result {
Ok(Ok(Some(result))) => {
// Success: send to writer
let _ = write_tx_for_writer.send(LogCommand::Write(result.company)).await;
processed += 1;
// Log progress every 100 companies
if processed % 100 == 0 {
logger::log_info(&format!(
"Progress: {}/{} companies processed ({} resets)",
processed,
total,
hard_reset_count
)).await;
}
// Spawn next task if available
if let Some((name, company_info)) = pending.pop() {
let current_pool = {
let pool_guard = pool_mutex.lock().await;
Arc::clone(&*pool_guard)
};
let existing = existing_companies.get(&name).cloned();
let shutdown_flag_clone = Arc::clone(shutdown_flag);
let task = tokio::spawn(async move {
process_single_company_validated(
name,
company_info,
existing,
&current_pool,
&shutdown_flag_clone,
).await
});
tasks.push(task);
}
}
Ok(Ok(None)) => {
// No result (shutdown or skip)
processed += 1;
if let Some((name, company_info)) = pending.pop() {
let current_pool = {
let pool_guard = pool_mutex.lock().await;
Arc::clone(&*pool_guard)
};
let existing = existing_companies.get(&name).cloned();
let shutdown_flag_clone = Arc::clone(shutdown_flag);
let task = tokio::spawn(async move {
process_single_company_validated(
name,
company_info,
existing,
&current_pool,
&shutdown_flag_clone,
).await
});
tasks.push(task);
}
}
Ok(Err(e)) => {
logger::log_warn(&format!("Company processing error: {}", e)).await;
processed += 1;
let error_msg = e.to_string();
if error_msg.contains("HARD_RESET_REQUIRED") {
// ✅ FIX: Don't break, perform actual hard reset
// Check if reset already in progress (race condition protection)
let mut reset_lock = reset_in_progress.lock().await;
if *reset_lock {
logger::log_info("Hard reset already in progress, skipping duplicate").await;
processed += 1;
continue;
}
*reset_lock = true;
drop(reset_lock); // Release lock during reset
logger::log_error("🔴 HARD RESET THRESHOLD REACHED - INITIATING RESET SEQUENCE").await;
logger::log_warn("Draining active tasks before hard reset...").await;
// Save remaining pending count
let remaining_count = pending.len();
// Stop spawning new tasks
pending.clear();
// Wait for all active tasks to complete
let mut drained = 0;
while let Some(_) = tasks.next().await {
drained += 1;
if drained % 10 == 0 {
logger::log_info(&format!("Drained {} tasks...", drained)).await;
}
}
logger::log_info(&format!(
"All tasks drained ({} active). {} companies need reprocessing.",
drained,
remaining_count
)).await;
// Perform the actual hard reset
match perform_hard_reset(&pool_mutex, config, paths, monitoring, shutdown_flag).await {
Ok(()) => {
logger::log_info("✅ Hard reset completed successfully").await;
hard_reset_count += 1;
// ✅ FIX: Reset the error counter
{
let pool_guard = pool_mutex.lock().await;
let current_pool = Arc::clone(&*pool_guard);
current_pool.get_reset_controller().reset();
}
logger::log_info("✓ Error counter cleared").await;
// ✅ FIX: Rebuild pending list from existing_companies
// Only re-add companies that haven't been written yet
let written_companies = {
let companies = existing_companies_writer_clone.lock().await;
companies.keys().cloned().collect::<std::collections::HashSet<_>>()
};
// Create new pending list: all companies minus those already written
let all_companies_list: Vec<(String, CompanyInfo)> = {
// Need to reload securities since we cleared pending
let content = tokio::fs::read_to_string(&securities_path_cloned).await?;
let all_securities: HashMap<String, CompanyInfo> = serde_json::from_str(&content)?;
all_securities.into_iter()
.filter(|(name, _)| !written_companies.contains(name))
.collect()
};
pending = all_companies_list;
logger::log_info(&format!(
"Restarting with {} remaining companies (out of {} total)",
pending.len(),
total
)).await;
// Respawn initial batch with NEW pool
for _ in 0..CONCURRENCY_LIMIT.min(pending.len()) {
if let Some((name, company_info)) = pending.pop() {
let current_pool = {
let pool_guard = pool_mutex.lock().await;
Arc::clone(&*pool_guard)
};
let existing = existing_companies.get(&name).cloned();
let shutdown_flag_clone = Arc::clone(shutdown_flag);
let task = tokio::spawn(async move {
process_single_company_validated(
name,
company_info,
existing,
&current_pool,
&shutdown_flag_clone,
).await
});
tasks.push(task);
}
}
// Clear reset flag
let mut reset_lock = reset_in_progress.lock().await;
*reset_lock = false;
drop(reset_lock);
// ✅ Continue processing (don't spawn duplicate task)
continue;
}
Err(reset_err) => {
logger::log_error(&format!("Hard reset failed: {}", reset_err)).await;
// Clear reset flag
let mut reset_lock = reset_in_progress.lock().await;
*reset_lock = false;
drop(reset_lock);
// Exit if hard reset fails
break;
}
}
} else {
// Regular error
logger::log_warn(&format!("Company processing error: {}", error_msg)).await;
processed += 1;
// Spawn next task
if let Some((name, company_info)) = pending.pop() {
let current_pool = {
let pool_guard = pool_mutex.lock().await;
Arc::clone(&*pool_guard)
};
let existing = existing_companies.get(&name).cloned();
let shutdown_flag_clone = Arc::clone(shutdown_flag);
let task = tokio::spawn(async move {
process_single_company_validated(
name,
company_info,
existing,
&current_pool,
&shutdown_flag_clone,
).await
});
tasks.push(task);
}
}
}
Err(e) => {
// Task panic
logger::log_error(&format!("Task panic: {}", e)).await;
processed += 1;
// Spawn next task
if let Some((name, company_info)) = pending.pop() {
let current_pool = {
let pool_guard = pool_mutex.lock().await;
Arc::clone(&*pool_guard)
};
let existing = existing_companies.get(&name).cloned();
let shutdown_flag_clone = Arc::clone(shutdown_flag);
let task = tokio::spawn(async move {
process_single_company_validated(
name,
company_info,
existing,
&current_pool,
&shutdown_flag_clone,
).await
});
tasks.push(task);
}
}
}
}
logger::log_info("Main processing loop completed").await;
// Signal writer to finish
let _ = write_tx_for_writer.send(LogCommand::Checkpoint).await;
let _ = write_tx_for_writer.send(LogCommand::Shutdown).await;
@@ -394,8 +589,8 @@ pub async fn build_companies_jsonl_streaming_parallel(
.unwrap_or((0, 0, 0));
logger::log_info(&format!(
"Completed: {} total companies ({} new, {} updated)",
final_count, final_new, final_updated
"Completed: {} total companies ({} new, {} updated, {} hard resets)",
final_count, final_new, final_updated, hard_reset_count
)).await;
Ok(final_count)
@@ -415,10 +610,25 @@ async fn scrape_with_retry(
if shutdown_flag.load(Ordering::SeqCst) {
return Err(anyhow!("Aborted due to shutdown"));
}
if pool.should_perform_hard_reset() {
logger::log_error("HARD_RESET_REQUIRED detected before scrape attempt").await;
return Err(anyhow!("HARD_RESET_REQUIRED"));
}
match scrape_company_details_by_isin(pool, isin, shutdown_flag).await {
Ok(result) => return Ok(result),
Err(e) => {
// Check if this is a hard reset required error
let error_msg = e.to_string();
if error_msg.contains("HARD_RESET_REQUIRED") {
logger::log_error(&format!(
"Hard reset required error for ISIN {}, propagating immediately",
isin
)).await;
return Err(e); // Propagate immediately, don't retry
}
if retries >= max_retries {
logger::log_error(&format!(
"All {} retries exhausted for ISIN {}: {}",
@@ -443,7 +653,7 @@ async fn scrape_with_retry(
}
}
/// UPDATED: Process single company with validation and shutdown checks
/// Process single company with validation and shutdown checks
async fn process_single_company_validated(
name: String,
company_info: CompanyInfo,
@@ -485,7 +695,7 @@ async fn process_single_company_validated(
}
}
// Process each ISIN with validation
// ✅ FIX: Process each ISIN independently with per-ISIN status checking
for (isin, figi_tickers) in unique_isin_ticker_pairs {
// Check shutdown before each ISIN
if shutdown_flag.load(Ordering::SeqCst) {
@@ -506,9 +716,10 @@ async fn process_single_company_validated(
}
}
let has_yahoo_ticker = tickers.iter().any(|t| t.starts_with("YAHOO:"));
// ✅ FIX: Check if THIS SPECIFIC ISIN has Yahoo data
let has_yahoo_ticker_for_this_isin = tickers.iter().any(|t| t.starts_with("YAHOO:"));
if !has_yahoo_ticker {
if !has_yahoo_ticker_for_this_isin {
logger::log_info(&format!("Fetching Yahoo details for {} (ISIN: {})", name, isin)).await;
match scrape_with_retry(pool, &isin, 3, shutdown_flag).await {
@@ -539,11 +750,24 @@ async fn process_single_company_validated(
logger::log_warn(&format!("Shutdown during scrape for ISIN {}", isin)).await;
break;
}
// Check if this is a hard reset required error
let error_msg = e.to_string();
if error_msg.contains("HARD_RESET_REQUIRED") {
logger::log_error(&format!(
"Hard reset required during ISIN {} processing, propagating error",
isin
)).await;
return Err(e); // ← CRITICAL: Propagate immediately
}
logger::log_warn(&format!(
"✗ Yahoo lookup error for ISIN {} (company: {}): {}",
isin, name, e
)).await;
// Continue with next ISIN
// ✅ FIX: Mark this ISIN as failed to enable retry
tickers.push("YAHOO:ERROR".to_string());
}
}
}
@@ -558,6 +782,11 @@ async fn process_single_company_validated(
return Ok(None);
}
if pool.should_perform_hard_reset() {
logger::log_error("HARD_RESET_REQUIRED detected during company processing").await;
return Err(anyhow!("HARD_RESET_REQUIRED"));
}
if !isin_tickers_map.is_empty() {
let company_entry = CompanyCrossPlatformInfo {
name: name.clone(),

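The batch loop above distinguishes "hard reset needed" from ordinary failures purely by looking for the HARD_RESET_REQUIRED marker in the error text. A minimal, self-contained illustration of that sentinel-error convention with anyhow (the scrape/dispatch names are invented for the example; the real branch additionally drains in-flight tasks, runs perform_hard_reset and rebuilds the pending queue):

use anyhow::{anyhow, Result};

fn scrape(fail_hard: bool) -> Result<&'static str> {
    if fail_hard {
        // Same convention as ChromeDriverPool::execute: embed the marker in the message.
        return Err(anyhow!("HARD_RESET_REQUIRED: navigation timeout, threshold reached (12/12)"));
    }
    Ok("page body")
}

fn dispatch(fail_hard: bool) {
    match scrape(fail_hard) {
        Ok(body) => println!("parsed: {body}"),
        Err(e) if e.to_string().contains("HARD_RESET_REQUIRED") => {
            // update_parallel.rs reacts here: drain tasks, hard reset, re-queue the rest.
            println!("hard reset requested: {e}");
        }
        Err(e) => println!("ordinary error, retry or skip: {e}"),
    }
}

fn main() {
    dispatch(false);
    dispatch(true);
}

A typed error (for example an enum variant) would avoid the string matching, but the marker keeps the change confined to error messages.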
View File

@@ -74,6 +74,11 @@ pub async fn scrape_company_details_by_isin(
logger::log_warn(&format!("Shutdown detected, skipping ISIN: {}", isin)).await;
return Ok(None);
}
if pool.should_perform_hard_reset() {
logger::log_warn("HARD_RESET_REQUIRED detected before starting ISIN scrape").await;
return Err(anyhow!("HARD_RESET_REQUIRED"));
}
let isin_owned = isin.to_string();
let shutdown_clone = Arc::clone(shutdown_flag);

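The new pre-flight check means an ISIN scrape never even starts once the pool has crossed its error threshold. Stripped of the pool plumbing, the guard amounts to the following; the counter and threshold here are simplified stand-ins for the pool's HardResetController, and the names are illustrative:

use std::sync::atomic::{AtomicUsize, Ordering};

const HARD_RESET_ERROR_THRESHOLD: usize = 12;
static CONSECUTIVE_ERRORS: AtomicUsize = AtomicUsize::new(0);

fn should_perform_hard_reset() -> bool {
    CONSECUTIVE_ERRORS.load(Ordering::SeqCst) >= HARD_RESET_ERROR_THRESHOLD
}

fn scrape_company_details(isin: &str) -> Result<String, String> {
    if should_perform_hard_reset() {
        // Bail out before navigating; the caller propagates this and schedules the reset.
        return Err(format!("HARD_RESET_REQUIRED before scraping {isin}"));
    }
    Ok(format!("details for {isin}"))
}

fn main() {
    CONSECUTIVE_ERRORS.store(12, Ordering::SeqCst);
    println!("{:?}", scrape_company_details("US0378331005"));
}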
View File

@@ -66,14 +66,18 @@ async fn main() -> Result<()> {
logger::log_info("Monitoring dashboard available at http://localhost:3030").await;
logger::init_debug_logger(paths.logs_dir()).await.ok();
logger::log_info("=== Event Backtest Engine Started ===").await;
logger::log_info("=== Economic Webscraper Started ===").await;
logger::log_info(&format!(
"Config → parallel_instances: {}, task_limit: {} vpn_rotation: {}",
"Config → parallel_instances: {}, task_limit: {} vpn_rotation: {} proxy_instances_per_certificate: {:?}",
config.max_parallel_instances,
config.max_tasks_per_instance,
config.enable_vpn_rotation
config.enable_vpn_rotation,
config.proxy_instances_per_certificate
)).await;
let number_proxy_instances_per_certificate = config.proxy_instances_per_certificate.unwrap_or(1);
// Simple shutdown flag
let shutdown_flag = Arc::new(AtomicBool::new(false));
@@ -94,7 +98,7 @@ async fn main() -> Result<()> {
None
} else {
logger::log_info(&format!("Found {} VPN servers starting Docker proxy containers", server_count)).await;
let pp = Arc::new(DockerVpnProxyPool::new(paths.cache_openvpn_dir(), username, password).await?);
let pp = Arc::new(DockerVpnProxyPool::new(paths.cache_openvpn_dir(), username, password, number_proxy_instances_per_certificate).await?);
logger::log_info(&format!("All {} Docker proxy containers started and ready", pp.num_proxies())).await;
for i in 0..pp.num_proxies() {
@@ -115,10 +119,10 @@ async fn main() -> Result<()> {
};
// === Step 2: Initialize ChromeDriver pool ===
let pool_size = config.max_parallel_instances;
let pool_size_limit = config.max_parallel_instances;
let task_limit = config.max_tasks_per_instance;
logger::log_info(&format!("Creating ChromeDriver pool with {} instances...", pool_size)).await;
logger::log_info(&format!("Creating ChromeDriver pool with {} instances...", pool_size_limit)).await;
let pool = Arc::new(
if task_limit > 0 {
@@ -128,7 +132,7 @@ async fn main() -> Result<()> {
}
);
logger::log_info(&format!("ChromeDriver pool ready with {} instances", pool_size)).await;
logger::log_info(&format!("ChromeDriver pool ready with {} instances", pool_size_limit)).await;
// === Step 3: Ctrl+C handler ===
{
@@ -166,7 +170,7 @@ async fn main() -> Result<()> {
// === Step 4: Run scraping jobs ===
logger::log_info("--- Starting ECONOMIC data update ---").await;
economic::run_full_update(&config, &pool).await?;
//economic::run_full_update(&config, &pool).await?;
logger::log_info("Economic update completed").await;
if !shutdown_flag.load(Ordering::SeqCst) {

View File

@@ -22,6 +22,11 @@ pub enum MonitoringEvent {
instance_id: usize,
status: InstanceStatusChange,
},
InstanceSelected {
instance_id: usize,
half: usize,
},
// Task execution
TaskStarted {

View File

@@ -107,6 +107,10 @@ impl MonitoringService {
}
}
MonitoringEvent::InstanceSelected { instance_id, half } => {
self.log_info(format!("Instance #{} selected (half {})", instance_id, half)).await;
}
MonitoringEvent::TaskStarted { instance_id, url } => {
let mut state = self.state.write().await;
if let Some(inst) = state.instances.get_mut(&instance_id) {

View File

@@ -10,7 +10,16 @@ pub struct DockerVpnProxyPool {
}
impl DockerVpnProxyPool {
pub async fn new(ovpn_dir: &Path, username: String, password: String) -> Result<Self> {
pub async fn new(
ovpn_dir: &Path,
username: String,
password: String,
instances_per_ovpn: usize,
) -> Result<Self> {
if instances_per_ovpn == 0 {
return Err(anyhow!("instances_per_ovpn must be at least 1"));
}
// Count hostnames (subdirs in ovpn_dir)
let hostnames: Vec<_> = std::fs::read_dir(ovpn_dir)?
.filter_map(Result::ok)
@@ -23,14 +32,21 @@ impl DockerVpnProxyPool {
return Err(anyhow!("No VPN hostnames found in {:?}", ovpn_dir));
}
crate::util::logger::log_info(&format!("Found {} VPN hostnames", num_servers)).await;
// Calculate total containers: hostnames × instances_per_ovpn
let total_containers = num_servers * instances_per_ovpn;
let mut container_names = Vec::with_capacity(num_servers);
let mut proxy_ports = Vec::with_capacity(num_servers);
crate::util::logger::log_info(&format!(
"Found {} VPN hostnames × {} instances = {} total containers",
num_servers, instances_per_ovpn, total_containers
)).await;
let mut container_names = Vec::with_capacity(total_containers);
let mut proxy_ports = Vec::with_capacity(total_containers);
let base_port: u16 = 10800;
let mut port_counter = 0u16;
// === STEP 1: Start ALL containers first ===
for (i, hostname) in hostnames.iter().enumerate() {
for hostname in hostnames.iter() {
// Pick tcp443.ovpn if exists, else first .ovpn
let hostname_dir = ovpn_dir.join(hostname);
let mut ovpn_path: Option<PathBuf> = None;
@@ -48,48 +64,58 @@ impl DockerVpnProxyPool {
let ovpn_path = ovpn_path.ok_or_else(|| anyhow!("No .ovpn found for {}", hostname))?;
let name = format!("vpn-proxy-{}", i);
let port = base_port + i as u16 + 1;
// Spawn multiple instances for this .ovpn file
for instance_num in 0..instances_per_ovpn {
let name = format!("vpn-proxy-{}-{}", hostname, instance_num);
let port = base_port + port_counter + 1;
port_counter += 1;
// Clean up any existing container with the same name
let _ = Command::new("docker")
.args(["rm", "-f", &name])
.status()
.await;
// Clean up any existing container with the same name
let _ = Command::new("docker")
.args(["rm", "-f", &name])
.status()
.await;
// Run Docker container
let status = Command::new("docker")
.args([
"run", "-d",
"--name", &name,
"--cap-add=NET_ADMIN",
"--device", "/dev/net/tun",
"--sysctl", "net.ipv4.ip_forward=1",
"-v", &format!("{}:/vpn/config.ovpn", ovpn_path.display()),
"-e", &format!("VPN_USERNAME={}", username),
"-e", &format!("VPN_PASSWORD={}", password),
"-p", &format!("{}:1080", port),
"rust-vpn-proxy",
])
.status()
.await
.context("Failed to run Docker")?;
// Run Docker container
let status = Command::new("docker")
.args([
"run", "-d",
"--name", &name,
"--cap-add=NET_ADMIN",
"--device", "/dev/net/tun",
"--sysctl", "net.ipv4.ip_forward=1",
"-v", &format!("{}:/vpn/config.ovpn", ovpn_path.display()),
"-e", &format!("VPN_USERNAME={}", username),
"-e", &format!("VPN_PASSWORD={}", password),
"-p", &format!("{}:1080", port),
"rust-vpn-proxy",
])
.status()
.await
.context("Failed to run Docker")?;
if !status.success() {
return Err(anyhow!("Docker run failed for {}", name));
if !status.success() {
return Err(anyhow!("Docker run failed for {}", name));
}
crate::util::logger::log_info(&format!(
"Started container {} on port {} (using {})",
name, port, ovpn_path.file_name().unwrap().to_string_lossy()
)).await;
container_names.push(name);
proxy_ports.push(port);
}
crate::util::logger::log_info(&format!("Started container {} on port {} (waiting for VPN...)", name, port)).await;
container_names.push(name);
proxy_ports.push(port);
}
// Brief pause to let containers start
sleep(Duration::from_secs(8)).await;
crate::util::logger::log_info(&format!("All {} containers started, beginning health checks...", container_names.len())).await;
crate::util::logger::log_info(&format!(
"All {} containers started, beginning health checks...",
container_names.len()
)).await;
// === STEP 2: Test ALL proxies in parallel with 10-second intervals ===
// === STEP 2: Test ALL proxies in parallel ===
let results = Self::test_all_proxies_parallel(&container_names, &proxy_ports).await;
// Filter out failed containers
@@ -100,8 +126,10 @@ impl DockerVpnProxyPool {
for (i, (container_name, port)) in container_names.into_iter().zip(proxy_ports.into_iter()).enumerate() {
match &results[i] {
Ok(Some(ip)) => {
crate::util::logger::log_info(&format!("✓ Container {} on port {} ready with IP: {}",
container_name, port, ip)).await;
crate::util::logger::log_info(&format!(
"✓ Container {} on port {} ready with IP: {}",
container_name, port, ip
)).await;
working_containers.push(container_name);
working_ports.push(port);
}
@@ -113,14 +141,15 @@ impl DockerVpnProxyPool {
.ok()
.and_then(|output| String::from_utf8_lossy(&output.stdout).to_string().into());
crate::util::logger::log_error(&format!("✗ Container {} on port {} ready but IP detection failed. Logs: {:?}",
container_name, port, logs)).await;
crate::util::logger::log_error(&format!(
"✗ Container {} on port {} ready but IP detection failed. Logs: {:?}",
container_name, port, logs
)).await;
failed_count += 1;
// Clean up failed container
let _ = Self::cleanup_container(&container_name).await;
}
Err(e) => {
// Get container logs to debug
let logs = Command::new("docker")
.args(["logs", "--tail", "20", &container_name])
.output()
@@ -128,8 +157,10 @@ impl DockerVpnProxyPool {
.ok()
.and_then(|output| String::from_utf8_lossy(&output.stdout).to_string().into());
crate::util::logger::log_error(&format!("✗ Container {} on port {} failed: {}. Logs: {:?}",
container_name, port, e, logs)).await;
crate::util::logger::log_error(&format!(
"✗ Container {} on port {} failed: {}. Logs: {:?}",
container_name, port, e, logs
)).await;
failed_count += 1;
// Clean up failed container
let _ = Self::cleanup_container(&container_name).await;
@@ -138,14 +169,19 @@ impl DockerVpnProxyPool {
}
if working_containers.is_empty() {
return Err(anyhow!("All {} VPN proxy containers failed to start", num_servers));
return Err(anyhow!("All {} VPN proxy containers failed to start", total_containers));
}
crate::util::logger::log_info(&format!("Started {}/{} VPN proxy containers successfully",
working_containers.len(), num_servers)).await;
crate::util::logger::log_info(&format!(
"Started {}/{} VPN proxy containers successfully ({} hostnames × {} instances)",
working_containers.len(), total_containers, num_servers, instances_per_ovpn
)).await;
if failed_count > 0 {
crate::util::logger::log_warn(&format!("{} containers failed and were cleaned up", failed_count)).await;
crate::util::logger::log_warn(&format!(
"{} containers failed and were cleaned up",
failed_count
)).await;
}
Ok(Self {

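The container fan-out is simply hostnames × instances_per_ovpn, with host ports handed out sequentially above the base port. The numbering can be checked in a stand-alone sketch (the hostnames below are placeholders):

fn main() {
    let hostnames = ["de233", "us1", "ca149"]; // placeholder VPN hostnames
    let instances_per_ovpn: u16 = 2;
    let base_port: u16 = 10800;

    let mut port_counter: u16 = 0;
    for hostname in hostnames {
        for instance_num in 0..instances_per_ovpn {
            let name = format!("vpn-proxy-{hostname}-{instance_num}");
            let port = base_port + port_counter + 1; // 10801, 10802, ...
            port_counter += 1;
            println!("{name} -> host port {port}");
        }
    }
    // 3 hostnames × 2 instances = 6 containers on ports 10801..=10806.
}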
src/scraper/hard_reset.rs (new file, +239 lines)
View File

@@ -0,0 +1,239 @@
// src/scraper/hard_reset.rs - PROPERLY FIXED: Matches main.rs initialization pattern
use std::sync::{Arc, atomic::{AtomicBool, AtomicUsize, Ordering}};
use crate::{ChromeDriverPool, Config, logger, scraper::docker_vpn_proxy::{DockerVpnProxyPool, cleanup_all_proxy_containers}, util::directories::DataPaths};
/// Simple error counter for triggering hard resets
pub struct HardResetController {
consecutive_errors: AtomicUsize,
}
impl HardResetController {
pub fn new() -> Self {
Self {
consecutive_errors: AtomicUsize::new(0),
}
}
/// Record success - resets counter
pub fn record_success(&self) {
self.consecutive_errors.store(0, Ordering::SeqCst);
}
/// Record error - returns new count
pub fn record_error(&self) -> usize {
self.consecutive_errors.fetch_add(1, Ordering::SeqCst) + 1
}
/// Reset counter
pub fn reset(&self) {
self.consecutive_errors.store(0, Ordering::SeqCst);
}
/// Get current count
pub fn get_count(&self) -> usize {
self.consecutive_errors.load(Ordering::SeqCst)
}
}
/// Perform hard reset: shutdown everything and recreate
pub async fn perform_hard_reset(
pool_mutex: &Arc<tokio::sync::Mutex<Arc<ChromeDriverPool>>>,
config: &Config,
paths: &DataPaths,
monitoring: &Option<crate::monitoring::MonitoringHandle>,
shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<()> {
let number_proxy_instances = config.proxy_instances_per_certificate.unwrap_or(1);
logger::log_error("🔴 STARTING HARD RESET SEQUENCE").await;
// Check if shutdown was requested
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown requested during hard reset, aborting").await;
return Ok(());
}
// Step 1: Acquire pool lock (prevents new tasks from using it)
logger::log_info(" [1/10] Acquiring pool lock...").await;
let mut pool_guard = pool_mutex.lock().await;
let old_pool = Arc::clone(&*pool_guard);
// Step 2: Wait a moment for active tasks to complete
logger::log_info(" [2/10] Waiting 10 seconds for active tasks...").await;
drop(pool_guard); // Release lock so tasks can finish
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
// Re-acquire lock
let mut pool_guard = pool_mutex.lock().await;
// Step 3: Shutdown ChromeDriver pool
logger::log_info(" [3/10] Shutting down ChromeDriver pool...").await;
if let Err(e) = old_pool.shutdown().await {
logger::log_warn(&format!(" Warning: Pool shutdown error: {}", e)).await;
}
// Step 4: Shutdown proxies
logger::log_info(" [4/10] Shutting down proxy containers...").await;
cleanup_all_proxy_containers().await.ok();
// Step 5: Wait for cleanup
logger::log_info(" [5/10] Waiting 30 seconds for cleanup...").await;
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
// Check shutdown again
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown requested during cleanup, aborting reset").await;
return Ok(());
}
// Step 6: Recreate proxy pool (if VPN rotation is enabled)
logger::log_info(" [6/10] Recreating proxy pool...").await;
let new_proxy_pool = if config.enable_vpn_rotation {
match recreate_proxy_pool_with_fresh_credentials(config, paths, monitoring, shutdown_flag).await {
Ok(pool) => {
logger::log_info(&format!(
" ✓ Proxy pool created with {} proxies",
pool.num_proxies()
)).await;
Some(pool)
}
Err(e) => {
logger::log_warn(&format!(
" ⚠️ Proxy creation failed: {}. Continuing without proxies.",
e
)).await;
None
}
}
} else {
logger::log_info(" ⊘ VPN rotation disabled, skipping proxy pool").await;
None
};
// Step 7: Recreate ChromeDriver pool
logger::log_info(" [7/10] Recreating ChromeDriver pool...").await;
let new_pool = Arc::new(
ChromeDriverPool::new_with_proxy_and_task_limit(
new_proxy_pool,
config,
monitoring.clone(),
).await?
);
logger::log_info(" ✓ ChromeDriver pool created").await;
// Step 8: Reset the error counter on the NEW pool
logger::log_info(" [8/10] Resetting error counter...").await;
new_pool.get_reset_controller().reset();
logger::log_info(" ✓ Error counter cleared").await;
// Step 9: Replace pool atomically
logger::log_info(" [9/10] Activating new pool...").await;
*pool_guard = new_pool;
drop(pool_guard);
// Step 10: Emit monitoring event
logger::log_info(" [10/10] Updating monitoring...").await;
if let Some(mon) = monitoring {
mon.emit(crate::monitoring::MonitoringEvent::PoolInitialized {
pool_size: config.max_parallel_instances,
with_proxy: config.enable_vpn_rotation,
with_rotation: config.max_tasks_per_instance > 0,
});
}
logger::log_info("✅ HARD RESET COMPLETE").await;
Ok(())
}
/// Recreate proxy pool with fresh VPNBook credentials (matches main.rs pattern)
async fn recreate_proxy_pool_with_fresh_credentials(
config: &Config,
paths: &DataPaths,
monitoring: &Option<crate::monitoring::MonitoringHandle>,
shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<Arc<DockerVpnProxyPool>> {
let number_proxy_instances = config.proxy_instances_per_certificate.unwrap_or(1);
// Check shutdown
if shutdown_flag.load(Ordering::SeqCst) {
return Err(anyhow::anyhow!("Shutdown requested during proxy recreation"));
}
logger::log_info(" [6.1] Creating temporary ChromeDriver pool for credential fetch...").await;
// Create temporary pool WITHOUT proxy (just like main.rs does)
let temp_pool = Arc::new(
ChromeDriverPool::new_with_proxy_and_task_limit(
None, // No proxy for temp pool
config,
monitoring.clone(),
).await?
);
logger::log_info(" [6.2] Fetching fresh VPNBook credentials...").await;
// Fetch fresh VPNBook credentials (just like main.rs does)
let (username, password, _files) = crate::util::opnv::fetch_vpnbook_configs(
&temp_pool,
paths.cache_dir()
).await?;
logger::log_info(&format!(" [6.3] Got credentials → User: {}", username)).await;
// Shutdown temp pool
logger::log_info(" [6.4] Shutting down temporary pool...").await;
temp_pool.shutdown().await.ok();
// Check shutdown again
if shutdown_flag.load(Ordering::SeqCst) {
return Err(anyhow::anyhow!("Shutdown requested during proxy recreation"));
}
// Check if we have VPN server configs
let server_count = std::fs::read_dir(paths.cache_openvpn_dir())?
.filter(|e| e.as_ref().unwrap().path().is_dir())
.count();
if server_count == 0 {
return Err(anyhow::anyhow!("No VPN servers found after credential fetch"));
}
logger::log_info(&format!(
" [6.5] Found {} VPN servers → Creating proxy pool with {} instances per server...",
server_count,
number_proxy_instances
)).await;
// Create new proxy pool (just like main.rs does)
let proxy_pool = Arc::new(
DockerVpnProxyPool::new(
paths.cache_openvpn_dir(),
username,
password,
number_proxy_instances,
).await?
);
logger::log_info(&format!(
" [6.6] ✓ Proxy pool ready with {} total proxies",
proxy_pool.num_proxies()
)).await;
// Emit proxy connected events for monitoring
if let Some(mon) = monitoring {
for i in 0..proxy_pool.num_proxies() {
if let Some(proxy_info) = proxy_pool.get_proxy_info(i) {
mon.emit(crate::monitoring::MonitoringEvent::ProxyConnected {
container_name: proxy_info.container_name.clone(),
ip_address: proxy_info.ip_address.clone(),
port: proxy_info.port,
});
}
}
}
Ok(proxy_pool)
}
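The controller is nothing more than an atomic counter, so its interaction with the threshold check in webdriver.rs can be exercised in isolation. The copy below reproduces the struct from this file; the threshold of 12 matches HARD_RESET_ERROR_THRESHOLD, everything else is just a demo:

use std::sync::atomic::{AtomicUsize, Ordering};

struct HardResetController {
    consecutive_errors: AtomicUsize,
}

impl HardResetController {
    fn new() -> Self { Self { consecutive_errors: AtomicUsize::new(0) } }
    fn record_success(&self) { self.consecutive_errors.store(0, Ordering::SeqCst); }
    fn record_error(&self) -> usize { self.consecutive_errors.fetch_add(1, Ordering::SeqCst) + 1 }
    fn get_count(&self) -> usize { self.consecutive_errors.load(Ordering::SeqCst) }
}

const HARD_RESET_ERROR_THRESHOLD: usize = 12;

fn main() {
    let ctrl = HardResetController::new();

    // Eleven failures followed by one success: the streak is cleared, no reset.
    for _ in 0..11 { ctrl.record_error(); }
    ctrl.record_success();
    assert_eq!(ctrl.get_count(), 0);

    // Only twelve *consecutive* failures reach the threshold.
    let mut last = 0;
    for _ in 0..12 { last = ctrl.record_error(); }
    assert!(last >= HARD_RESET_ERROR_THRESHOLD);
    println!("hard reset required after {last} consecutive errors");
}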

View File

@@ -1,3 +1,4 @@
pub mod webdriver;
pub mod docker_vpn_proxy;
pub mod helpers;
pub mod hard_reset;

View File

@@ -1,5 +1,9 @@
// src/scraper/webdriver.rs
use super::helpers::*;
use super::hard_reset::HardResetController;
use super::docker_vpn_proxy::DockerVpnProxyPool;
use crate::Config;
use crate::logger;
use anyhow::{anyhow, Context, Result};
use fantoccini::{Client, ClientBuilder};
@@ -13,8 +17,6 @@ use tokio::process::{Child, Command};
use tokio::task::JoinHandle;
use tokio::sync::{Mutex, Semaphore};
use tokio::time::{sleep, timeout, Duration};
use crate::scraper::docker_vpn_proxy::{DockerVpnProxyPool};
use crate::Config;
/// Manages a pool of ChromeDriver instances for parallel scraping with optional VPN binding.
pub struct ChromeDriverPool {
@@ -31,10 +33,16 @@ pub struct ChromeDriverPool {
min_request_interval_ms: u64,
monitoring: Option<crate::monitoring::MonitoringHandle>,
hard_reset_controller: Arc<HardResetController>,
config: Arc<Config>,
}
impl ChromeDriverPool {
/// Creates a new pool without any proxy (direct connection).
/// When consecutive errors reach this value, execute() will return a special error
/// that signals the caller to trigger a hard reset
const HARD_RESET_ERROR_THRESHOLD: usize = 12;
/// Creates a new pool without any proxy (direct connection).
pub async fn _new(config: &Config, monitoring: Option<crate::monitoring::MonitoringHandle>,) -> Result<Self> {
Self::new_with_proxy_and_task_limit(None, config, monitoring).await
}
@@ -85,6 +93,11 @@ impl ChromeDriverPool {
// Rotation is enabled when task limiting is active
let rotation_enabled = task_per_instance_limit > 0;
let half_size = if rotation_enabled {
(actual_pool_size + 1) / 2 // Round up for odd pool sizes
} else {
actual_pool_size
};
let mut instances = Vec::with_capacity(actual_pool_size);
@@ -105,8 +118,8 @@ impl ChromeDriverPool {
for i in 0..actual_pool_size {
// Pass the entire proxy_pool and the index
let instance = ChromeInstance::new(
proxy_pool.clone(), // Clone the Arc
i, // This instance's proxy index
proxy_pool.clone(),
i,
config,
monitoring.clone(),
).await?;
@@ -162,15 +175,21 @@ impl ChromeDriverPool {
let min_request_interval_ms = config.min_request_interval_ms;
let hard_reset_controller = Arc::new(HardResetController::new());
let config_clone = Arc::new(config.clone());
Ok(Self {
instances,
semaphore: Arc::new(Semaphore::new(actual_pool_size)),
semaphore: Arc::new(Semaphore::new(half_size)),
proxy_pool,
rotation_enabled,
next_instance: Arc::new(Mutex::new(0)),
last_request_time: Arc::new(Mutex::new(Instant::now())),
min_request_interval_ms,
monitoring,
hard_reset_controller,
config: config_clone,
})
}
@@ -188,10 +207,8 @@ impl ChromeDriverPool {
if elapsed < self.min_request_interval_ms {
let wait_ms = self.min_request_interval_ms - elapsed;
drop(last_time); // Release the lock before sleeping!
drop(last_time);
sleep(Duration::from_millis(wait_ms)).await;
let mut last_time = self.last_request_time.lock().await;
*last_time = Instant::now();
} else {
@@ -199,12 +216,20 @@ impl ChromeDriverPool {
}
}
let random_index = random_range(0, self.instances.len() as u64) as usize;
// Instance selection (simplified; see below for the full rotation logic)
let index = if self.rotation_enabled {
self.get_rotated_index().await?
let instance = if self.rotation_enabled {
self.select_instance_with_rotation().await?
} else {
random_index
self.select_instance_round_robin().await
};
{
let mut inst = instance.lock().await;
inst.increment_task_count();
}
let index: usize = {
let instances = &self.instances;
instances.iter().position(|inst| Arc::ptr_eq(inst, &instance)).unwrap_or(0)
};
if let Some(ref mon) = self.monitoring {
@@ -216,15 +241,10 @@ impl ChromeDriverPool {
instance_id: index,
status: crate::monitoring::InstanceStatusChange::Active,
});
}
};
let instance = &self.instances[index];
let mut guard = instance.lock().await;
// NEW: Fetch the session with automatic renewal!
let client = guard.get_or_renew_session().await?;
guard.increment_task_count();
let (task_count, session_requests) = guard.get_session_stats().await;
crate::util::logger::log_info(&format!(
@@ -232,17 +252,17 @@ impl ChromeDriverPool {
index, task_count, guard.max_tasks_per_instance, session_requests
)).await;
drop(guard); // Release the lock before navigation
drop(guard);
let start_time = Instant::now();
// Navigation mit Timeout
// Navigation with timeout
let navigation_result = timeout(
Duration::from_secs(60),
client.goto(&url)
).await;
match navigation_result {
let result = match navigation_result {
Ok(Ok(_)) => {
if let Some(ref mon) = self.monitoring {
mon.emit(crate::monitoring::MonitoringEvent::TaskCompleted {
@@ -258,14 +278,111 @@ impl ChromeDriverPool {
}
crate::util::logger::log_info(&format!("✓ Navigated to {}", url)).await;
// Parse-Funktion ausführen
parse(client).await
// Execute parse function
match parse(client).await {
Ok(data) => {
// ✅ SUCCESS: Record and log
let prev_count = self.hard_reset_controller.get_count();
self.hard_reset_controller.record_success();
if prev_count > 0 {
logger::log_info(&format!(
"✓ Success - reset counter cleared (was: {}/{})",
prev_count,
Self::HARD_RESET_ERROR_THRESHOLD
)).await;
}
Ok(data)
}
Err(e) => {
// ❌ PARSE ERROR: Record, check threshold, invalidate session
let error_count = self.hard_reset_controller.record_error();
{
let mut inst = instance.lock().await;
inst.invalidate_current_session().await;
}
// Enhanced logging with threshold status
let threshold_pct = (error_count as f64 / Self::HARD_RESET_ERROR_THRESHOLD as f64) * 100.0;
logger::log_warn(&format!(
"Parse error. Reset counter: {}/{} ({:.0}%)",
error_count,
Self::HARD_RESET_ERROR_THRESHOLD,
threshold_pct
)).await;
// Check if threshold reached
if error_count >= Self::HARD_RESET_ERROR_THRESHOLD {
logger::log_error(&format!(
"🔴 HARD RESET THRESHOLD REACHED ({}/{})",
error_count,
Self::HARD_RESET_ERROR_THRESHOLD
)).await;
return Err(anyhow!(
"HARD_RESET_REQUIRED: Parse failed: {}. Threshold reached ({}/{})",
e,
error_count,
Self::HARD_RESET_ERROR_THRESHOLD
));
}
Err(anyhow!(
"Parse failed: {}. Hard reset at {}/{}",
e,
error_count,
Self::HARD_RESET_ERROR_THRESHOLD
))
}
}
}
Ok(Err(e)) => {
// ❌ NAVIGATION ERROR: Record, check threshold, invalidate session
crate::util::logger::log_error(&format!("Navigation failed: {}", e)).await;
Err(anyhow!("Navigation failed: {}", e))
{
let mut inst = instance.lock().await;
inst.invalidate_current_session().await;
}
let error_count = self.hard_reset_controller.record_error();
// Enhanced logging
let threshold_pct = (error_count as f64 / Self::HARD_RESET_ERROR_THRESHOLD as f64) * 100.0;
logger::log_warn(&format!(
"Navigation error. Reset counter: {}/{} ({:.0}%)",
error_count,
Self::HARD_RESET_ERROR_THRESHOLD,
threshold_pct
)).await;
// Check if threshold reached
if error_count >= Self::HARD_RESET_ERROR_THRESHOLD {
logger::log_error(&format!(
"🔴 HARD RESET THRESHOLD REACHED ({}/{})",
error_count,
Self::HARD_RESET_ERROR_THRESHOLD
)).await;
return Err(anyhow!(
"HARD_RESET_REQUIRED: Navigation failed: {}. Threshold reached ({}/{})",
e,
error_count,
Self::HARD_RESET_ERROR_THRESHOLD
));
}
Err(anyhow!(
"Navigation failed: {}. Hard reset at {}/{}",
e,
error_count,
Self::HARD_RESET_ERROR_THRESHOLD
))
}
Err(_) => {
// ❌ TIMEOUT ERROR: Record, check threshold, invalidate session
if let Some(ref mon) = self.monitoring {
mon.emit(crate::monitoring::MonitoringEvent::NavigationTimeout {
instance_id: index,
@@ -273,55 +390,138 @@ impl ChromeDriverPool {
});
}
let error_count = self.hard_reset_controller.record_error();
crate::util::logger::log_error("Navigation timeout (60s)").await;
Err(anyhow!("Navigation timeout"))
{
let mut inst = instance.lock().await;
inst.invalidate_current_session().await;
}
// Enhanced logging
let threshold_pct = (error_count as f64 / Self::HARD_RESET_ERROR_THRESHOLD as f64) * 100.0;
logger::log_warn(&format!(
"Timeout error. Reset counter: {}/{} ({:.0}%)",
error_count,
Self::HARD_RESET_ERROR_THRESHOLD,
threshold_pct
)).await;
// Check if threshold reached
if error_count >= Self::HARD_RESET_ERROR_THRESHOLD {
logger::log_error(&format!(
"🔴 HARD RESET THRESHOLD REACHED ({}/{})",
error_count,
Self::HARD_RESET_ERROR_THRESHOLD
)).await;
return Err(anyhow!(
"HARD_RESET_REQUIRED: Navigation timeout. Threshold reached ({}/{})",
error_count,
Self::HARD_RESET_ERROR_THRESHOLD
));
}
Err(anyhow!(
"Navigation timeout. Hard reset at {}/{}",
error_count,
Self::HARD_RESET_ERROR_THRESHOLD
))
}
};
{
let mut inst = instance.lock().await;
inst.task_count = inst.task_count.saturating_sub(1);
}
result
}
async fn get_rotated_index(&self) -> Result<usize> {
let total = self.instances.len();
let half_size = total / 2;
/// Simple round-robin instance selection (no rotation)
async fn select_instance_round_robin(&self) -> Arc<Mutex<ChromeInstance>> {
let mut next = self.next_instance.lock().await;
let index = *next;
*next = (*next + 1) % self.instances.len();
drop(next);
Arc::clone(&self.instances[index])
}
/// Round-robin with half-pool rotation
async fn select_instance_with_rotation(&self) -> Result<Arc<Mutex<ChromeInstance>>> {
let pool_size = self.instances.len();
let half_size = pool_size / 2;
if half_size == 0 {
return Ok(0); // Pool too small for rotation
// Pool too small for rotation, fall back to simple round-robin
return Ok(self.select_instance_round_robin().await);
}
let mut next_idx = self.next_instance.lock().await;
let current_half_start = if *next_idx < half_size { 0 } else { half_size };
let current_half_end = if *next_idx < half_size { half_size } else { total };
let mut next = self.next_instance.lock().await;
let current_half_start = (*next / half_size) * half_size;
let current_half_end = (current_half_start + half_size).min(pool_size);
// Look for an available instance in the current half
for offset in 0..(current_half_end - current_half_start) {
let candidate_idx = current_half_start + ((*next_idx + offset) % half_size);
// Try to find available instance in current half
let mut attempts = 0;
let max_attempts = half_size * 2; // Try both halves
while attempts < max_attempts {
let index = current_half_start + (*next % half_size);
let instance = &self.instances[index];
let instance = &self.instances[candidate_idx];
let guard = instance.lock().await;
// Check if instance can accept more tasks
let mut inst = instance.lock().await;
let can_accept = inst.get_task_count() < inst.max_tasks_per_instance;
drop(inst);
if guard.max_tasks_per_instance == 0 ||
guard.task_count < guard.max_tasks_per_instance {
*next_idx = (candidate_idx + 1) % total;
drop(guard);
return Ok(candidate_idx);
if can_accept {
*next = (*next + 1) % pool_size;
drop(next);
if let Some(ref mon) = self.monitoring {
mon.emit(crate::monitoring::MonitoringEvent::InstanceSelected {
instance_id: index,
half: if index < half_size { 1 } else { 2 },
});
}
return Ok(Arc::clone(instance));
}
// Current half saturated, try other half
if attempts == half_size - 1 {
logger::log_info("Current half saturated, rotating to other half").await;
*next = if current_half_start == 0 { half_size } else { 0 };
} else {
*next = (*next + 1) % pool_size;
}
attempts += 1;
}
// Current half full → switch to the other half
crate::util::logger::log_info("Current half saturated, rotating to other half").await;
drop(next);
let new_half_start = if current_half_start == 0 { half_size } else { 0 };
let new_half_end = if current_half_start == 0 { total } else { half_size };
// Reset the old half (for the next rotation)
for i in current_half_start..current_half_end {
let mut instance = self.instances[i].lock().await;
instance.reset_task_count();
}
*next_idx = new_half_start;
drop(next_idx);
Ok(new_half_start)
// All instances saturated
Err(anyhow!("All instances at task capacity"))
}
pub fn get_reset_controller(&self) -> Arc<HardResetController> {
Arc::clone(&self.hard_reset_controller)
}
/// Check if hard reset threshold has been reached
pub fn should_perform_hard_reset(&self) -> bool {
self.hard_reset_controller.get_count() >= Self::HARD_RESET_ERROR_THRESHOLD
}
/// Get current error count and threshold for monitoring
pub fn get_reset_status(&self) -> (usize, usize) {
(
self.hard_reset_controller.get_count(),
Self::HARD_RESET_ERROR_THRESHOLD
)
}
/// Gracefully shut down all ChromeDriver processes and Docker proxy containers.
@@ -369,7 +569,7 @@ pub struct ChromeInstance {
current_session: Arc<Mutex<Option<Client>>>, // Current active session
session_request_count: Arc<Mutex<usize>>,
max_requests_per_session: usize, // e.g. 25
max_requests_per_session: usize,
proxy_pool: Option<Arc<DockerVpnProxyPool>>, // Reference to the proxy pool
current_proxy_index: Arc<Mutex<usize>>, // Current proxy index in use
@@ -411,8 +611,6 @@ impl ChromeInstance {
pub async fn get_or_renew_session(&self) -> Result<Client> {
let mut session_opt = self.current_session.lock().await;
let mut request_count = self.session_request_count.lock().await;
let old_request_count = *request_count;
// Renew the session when:
// 1. No session exists
@@ -476,7 +674,7 @@ impl ChromeInstance {
mon.emit(crate::monitoring::MonitoringEvent::SessionRenewed {
instance_id: self.instance_id,
old_request_count: *request_count,
reason: crate::monitoring::RenewalReason::RequestLimit,
reason,
new_proxy: new_proxy_info,
});
}
@@ -490,15 +688,21 @@ impl ChromeInstance {
}
async fn create_fresh_session(&self) -> Result<Client> {
// Get the current proxy URL without mutating self
let proxy_url = if let Some(ref pool) = self.proxy_pool {
let mut proxy_idx = self.current_proxy_index.lock().await;
*proxy_idx = (*proxy_idx + 1) % pool.num_proxies();
let url = pool.get_proxy_url(*proxy_idx);
let num_proxies = pool.num_proxies();
crate::util::logger::log_info(&format!(
"Using proxy {} for new session",
*proxy_idx
// Round-robin through all proxies
let selected_proxy = *proxy_idx % num_proxies;
*proxy_idx = (*proxy_idx + 1) % num_proxies;
let url = pool.get_proxy_url(selected_proxy);
logger::log_info(&format!(
"Instance {} creating session with proxy {}/{} (rotation)",
self.instance_id,
selected_proxy,
num_proxies
)).await;
Some(url)
@@ -516,38 +720,19 @@ impl ChromeInstance {
.context("Failed to connect to ChromeDriver")
}
fn chrome_args_with_ua(&self, user_agent: &str, proxy_url: &Option<String>) -> Map<String, Value> {
let mut args = vec![
"--headless=new".to_string(),
"--disable-gpu".to_string(),
"--no-sandbox".to_string(),
"--disable-dev-shm-usage".to_string(),
"--disable-infobars".to_string(),
"--disable-extensions".to_string(),
"--disable-popup-blocking".to_string(),
"--disable-notifications".to_string(),
"--disable-autofill".to_string(),
"--disable-sync".to_string(),
"--disable-default-apps".to_string(),
"--disable-translate".to_string(),
"--disable-blink-features=AutomationControlled".to_string(),
format!("--user-agent={}", user_agent),
];
pub async fn invalidate_current_session(&self) {
let mut session_opt = self.current_session.lock().await;
if let Some(proxy) = proxy_url {
args.push(format!("--proxy-server={}", proxy));
if let Some(old_session) = session_opt.take() {
crate::util::logger::log_info(&format!(
"Invalidating broken session for instance {}",
self.instance_id
)).await;
let _ = old_session.close().await;
}
let caps = serde_json::json!({
"goog:chromeOptions": {
"args": args,
"excludeSwitches": ["enable-logging", "enable-automation"],
"prefs": {
"profile.default_content_setting_values.notifications": 2
}
}
});
caps.as_object().cloned().unwrap()
let mut request_count = self.session_request_count.lock().await;
*request_count = 0;
}
pub fn reset_task_count(&mut self) {
@@ -578,6 +763,20 @@ impl ChromeInstance {
Ok(())
}
pub fn is_available(&self) -> bool {
if self.max_tasks_per_instance == 0 {
return true; // No limit
}
self.task_count < self.max_tasks_per_instance
}
pub fn tasks_remaining(&self) -> usize {
if self.max_tasks_per_instance == 0 {
return usize::MAX;
}
self.max_tasks_per_instance.saturating_sub(self.task_count)
}
/// Spawns the actual `chromedriver` binary and waits for it to become ready.
async fn spawn_chromedriver() -> Result<(String, Child, JoinHandle<()>)> {
let mut process = Command::new("chromedriver-win64/chromedriver.exe")
@@ -624,6 +823,40 @@ impl ChromeInstance {
Err(anyhow!("ChromeDriver failed to start within 30s"))
}
fn chrome_args_with_ua(&self, user_agent: &str, proxy_url: &Option<String>) -> Map<String, Value> {
let mut args = vec![
"--headless=new".to_string(),
"--disable-gpu".to_string(),
"--no-sandbox".to_string(),
"--disable-dev-shm-usage".to_string(),
"--disable-infobars".to_string(),
"--disable-extensions".to_string(),
"--disable-popup-blocking".to_string(),
"--disable-notifications".to_string(),
"--disable-autofill".to_string(),
"--disable-sync".to_string(),
"--disable-default-apps".to_string(),
"--disable-translate".to_string(),
"--disable-blink-features=AutomationControlled".to_string(),
format!("--user-agent={}", user_agent),
];
if let Some(proxy) = proxy_url {
args.push(format!("--proxy-server={}", proxy));
}
let caps = serde_json::json!({
"goog:chromeOptions": {
"args": args,
"excludeSwitches": ["enable-logging", "enable-automation"],
"prefs": {
"profile.default_content_setting_values.notifications": 2
}
}
});
caps.as_object().cloned().unwrap()
}
pub fn chrome_user_agent() -> &'static str {
static UAS: &[&str] = &[
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.6367.91 Safari/537.36",