diff --git a/Cargo.lock b/Cargo.lock index c5e75cc..bc603b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -680,11 +680,10 @@ dependencies = [ "serde", "serde_json", "tokio", - "toml", "tracing", "tracing-subscriber", "url", - "windows-service", + "walkdir", "yfinance-rs", "zip", ] @@ -2531,6 +2530,15 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.28" @@ -2676,15 +2684,6 @@ dependencies = [ "serde_core", ] -[[package]] -name = "serde_spanned" -version = "1.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e24345aa0fe688594e73770a5f6d1b216508b4f93484c0026d521acd30134392" -dependencies = [ - "serde_core", -] - [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -3130,21 +3129,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "toml" -version = "0.9.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0dc8b1fb61449e27716ec0e1bdf0f6b8f3e8f6b05391e8497b8b6d7804ea6d8" -dependencies = [ - "indexmap", - "serde_core", - "serde_spanned", - "toml_datetime", - "toml_parser", - "toml_writer", - "winnow", -] - [[package]] name = "toml_datetime" version = "0.7.3" @@ -3175,12 +3159,6 @@ dependencies = [ "winnow", ] -[[package]] -name = "toml_writer" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df8b2b54733674ad286d16267dcfc7a71ed5c776e4ac7aa3c3e2561f7c637bf2" - [[package]] name = "tower" version = "0.5.2" @@ -3394,6 +3372,16 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -3526,10 +3514,13 @@ dependencies = [ ] [[package]] -name = "widestring" -version = "1.2.1" +name = "winapi-util" +version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72069c3113ab32ab29e5584db3c6ec55d416895e60715417b5b883a357c3e471" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.61.2", +] [[package]] name = "windows-core" @@ -3592,17 +3583,6 @@ dependencies = [ "windows-link", ] -[[package]] -name = "windows-service" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "193cae8e647981c35bc947fdd57ba7928b1fa0d4a79305f6dd2dc55221ac35ac" -dependencies = [ - "bitflags", - "widestring", - "windows-sys 0.59.0", -] - [[package]] name = "windows-strings" version = "0.5.1" @@ -3621,15 +3601,6 @@ dependencies = [ "windows-targets 0.52.6", ] -[[package]] -name = "windows-sys" -version = "0.59.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" -dependencies = [ - "windows-targets 0.52.6", -] - [[package]] name = "windows-sys" version = "0.60.2" diff --git a/Cargo.toml b/Cargo.toml index 493d7ed..104b69d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,16 +32,13 @@ flate2 = "1.1.5" # Formatting regex = "1.12.2" - -# Windows features -windows-service = "0.8.0" +walkdir = "2" # Generating rand = "0.9.2" # Environment handling dotenvy = "0.15" -toml = "0.9.8" # Date & time chrono = { version = "0.4", features = ["serde"] } diff --git a/event_backtest_engine.exe b/event_backtest_engine.exe index 1dedcb7..8376f59 100644 Binary files a/event_backtest_engine.exe and b/event_backtest_engine.exe differ diff --git a/install_tap_adapters.ps1 b/install_tap_adapters.ps1 deleted file mode 100644 index 629eb4d..0000000 --- a/install_tap_adapters.ps1 +++ /dev/null @@ -1,53 +0,0 @@ -# Auto-generated TAP adapter installation script -# Requires Administrator privileges - -$ErrorActionPreference = "Stop" - -# Check if running as Administrator -$currentPrincipal = New-Object Security.Principal.WindowsPrincipal([Security.Principal.WindowsIdentity]::GetCurrent()) -$isAdmin = $currentPrincipal.IsInRole([Security.Principal.WindowsBuiltInRole]::Administrator) - -if (-not $isAdmin) { - Write-Host "ERROR: This script must be run as Administrator!" -ForegroundColor Red - Write-Host "Right-click PowerShell and select 'Run as Administrator'" -ForegroundColor Yellow - exit 1 -} - -Write-Host "Installing additional TAP adapters..." -ForegroundColor Cyan - -$tapctlPath = "C:\Program Files\OpenVPN\bin\tapctl.exe" - -if (-not (Test-Path $tapctlPath)) { - Write-Host "ERROR: OpenVPN not found at: $tapctlPath" -ForegroundColor Red - Write-Host "Please install OpenVPN from: https://openvpn.net/community-downloads/" -ForegroundColor Yellow - exit 1 -} - -$existingCount = 10 -$targetCount = 10 - -for ($i = ($existingCount + 1); $i -le $targetCount; $i++) { - Write-Host "Creating TAP adapter #$i..." -ForegroundColor Yellow - - try { - & $tapctlPath create --name "OpenVPN-TAP-$i" - - if ($LASTEXITCODE -eq 0) { - Write-Host " ✓ Created OpenVPN-TAP-$i" -ForegroundColor Green - } else { - Write-Host " ⚠ Failed to create adapter (exit code: $LASTEXITCODE)" -ForegroundColor Red - } - } catch { - Write-Host " ✗ Error: $_" -ForegroundColor Red - } - - Start-Sleep -Milliseconds 500 -} - -Write-Host "`n✓ TAP adapter installation complete!" -ForegroundColor Green -Write-Host "Verifying installation..." -ForegroundColor Cyan - -$finalCount = (Get-NetAdapter | Where-Object { $_.InterfaceDescription -like "*TAP*" }).Count -Write-Host "Total TAP adapters now: $finalCount" -ForegroundColor Cyan - -exit 0 diff --git a/src/lib.rs b/src/lib.rs index 0553bbd..94734b1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,10 +10,6 @@ pub mod util; // Re-export commonly used types for convenience pub use config::Config; pub use scraper::webdriver::{ChromeDriverPool, ChromeInstance, ScrapeTask}; -pub use scraper::vpn_manager::{VpnInstance, VpnPool}; pub use util::directories::DataPaths; pub use util::logger; -pub use util::opnv; - -#[cfg(target_os = "windows")] -pub use scraper::forcebindip::ForceBindIpManager; \ No newline at end of file +pub use util::opnv; \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 4a03e4a..2bf40cd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,5 @@ // src/main.rs + mod config; mod corporate; mod economic; @@ -7,163 +8,162 @@ mod scraper; use anyhow::Result; use config::Config; +use scraper::docker_vpn_proxy::{DockerVpnProxyPool, cleanup_all_proxy_containers}; use scraper::webdriver::ChromeDriverPool; -use scraper::vpn_manager::VpnPool; use util::directories::DataPaths; use util::{logger, opnv}; use std::sync::Arc; -/// The entry point of the application. -/// -/// This function loads the configuration, optionally initializes a VPN pool, -/// initializes a shared ChromeDriver pool bound to the VPN pool (if enabled), -/// and sequentially runs the full updates for corporate and economic data. -/// -/// If VPN rotation is enabled: -/// 1. Fetches latest VPNBook OpenVPN configurations -/// 2. Creates a VPN pool and connects all VPN instances -/// 3. Binds each ChromeDriver instance to a different VPN for IP rotation -/// 4. Performs periodic health checks to reconnect unhealthy VPN instances -/// -/// # Errors -/// -/// Returns an error if configuration loading fails, pool initialization fails, -/// VPN fetching fails (if enabled), or if either update function encounters an issue -/// (e.g., network errors, scraping failures, or chromedriver spawn failures). +/// Application entry point +// src/main.rs + +// ... existing imports ... + #[tokio::main] async fn main() -> Result<()> { + cleanup_all_proxy_containers().await.ok(); + + // Load configuration from .env let config = Config::load().map_err(|err| { - println!("Failed to load Config .env: {}", err); + eprintln!("Failed to load config: {}", err); err })?; - // Initialize paths + // Initialize paths and logger let paths = DataPaths::new(".")?; - - // Initialize logger - logger::init_debug_logger(paths.logs_dir()).await.map_err(|e| { - anyhow::anyhow!("Logger initialization failed: {}", e) - })?; - - logger::log_info("=== Application started ===").await; - logger::log_info(&format!("Config: economic_start_date={}, corporate_start_date={}, lookahead_months={}, max_parallel_instances={}, enable_vpn_rotation={}, max_tasks_per_instance={}", - config.economic_start_date, config.corporate_start_date, config.economic_lookahead_months, config.max_parallel_instances, config.enable_vpn_rotation, config.max_tasks_per_instance)).await; - - // Initialize VPN pool if enabled - let vpn_pool = if config.enable_vpn_rotation { - logger::log_info("=== VPN Rotation Enabled ===").await; - logger::log_info("--- Fetching latest VPNBook OpenVPN configurations ---").await; - - let (username, password, _files) = - util::opnv::fetch_vpnbook_configs(&Arc::new(ChromeDriverPool::new(1).await?), paths.cache_dir()).await?; - - let amount_of_openvpn_servers = _files.len(); - - logger::log_info(&format!("✓ Fetched VPN credentials - Username: {}", username)).await; - - // Create VPN pool - let openvpn_dir = paths.cache_dir().join("openvpn"); - logger::log_info("--- Initializing VPN Pool ---").await; - let vp = Arc::new(VpnPool::new( - &openvpn_dir, - username, - password, - true, // enable rotation - config.tasks_per_vpn_session, - amount_of_openvpn_servers, - ).await?); - - // Connect all VPN instances (gracefully handles failures) - logger::log_info("--- Connecting to VPN servers ---").await; - match vp.connect_all().await { - Ok(()) => { - logger::log_info("✓ VPN initialization complete").await; - Some(vp) - } - Err(e) => { - logger::log_warn(&format!( - "⚠ VPN initialization failed: {}. Continuing without VPN.", - e - )).await; - None - } - } - } else { - None - }; - - // Initialize the shared ChromeDriver pool with VPN pool - let pool_size = config.max_parallel_instances; - let max_tasks_per_instance = config.max_tasks_per_instance; - + logger::init_debug_logger(paths.logs_dir()).await.ok(); + logger::log_info("=== Event Backtest Engine Started ===").await; logger::log_info(&format!( - "Initializing ChromeDriver pool with size: {}{}", - pool_size, - if max_tasks_per_instance > 0 { &format!(" (max {} tasks/instance)", max_tasks_per_instance) } else { "" } + "Config → parallel_instances: {}, task_limit: {} vpn_rotation: {}", + config.max_parallel_instances, + config.max_tasks_per_instance, + config.enable_vpn_rotation )).await; - - let pool = Arc::new( - if max_tasks_per_instance > 0 { - ChromeDriverPool::new_with_vpn_and_task_limit(pool_size, vpn_pool.clone(), max_tasks_per_instance).await? - } else if vpn_pool.is_some() { - ChromeDriverPool::new_with_vpn(pool_size, vpn_pool.clone()).await? + + // === Step 1: Fetch fresh VPNBook credentials and .ovpn files (if rotation enabled) === + let proxy_pool: Option> = if config.enable_vpn_rotation { + logger::log_info("VPN Rotation Enabled — Fetching latest VPNBook configs").await; + + // We only need 1 Chrome instance to scrape vpnbook.com (no proxy yet) + let temp_pool = Arc::new(ChromeDriverPool::new(1).await?); + let (username, password, _files) = opnv::fetch_vpnbook_configs(&temp_pool, paths.cache_dir()).await?; + + logger::log_info(&format!("VPNBook credentials → User: {}", username)).await; + + // Count how many distinct servers (subfolders) we have in cache/openvpn/ + let server_count = std::fs::read_dir(paths.cache_openvpn_dir())? + .filter(|e| e.as_ref().unwrap().path().is_dir()) + .count(); + + if server_count == 0 { + logger::log_warn("No VPN servers found — continuing without VPN").await; + None } else { - ChromeDriverPool::new(pool_size).await? - } - ); - - logger::log_info("✓ ChromeDriver pool initialized successfully").await; + logger::log_info(&format!("Found {} VPN servers — starting Docker proxy containers", server_count)).await; - // Spawn background Ctrl-C handler to gracefully shutdown pool and VPNs - { - let pool_for_signal = Arc::clone(&pool); - let vpn_for_signal = vpn_pool.clone(); - tokio::spawn(async move { - if let Err(e) = tokio::signal::ctrl_c().await { - let _ = util::logger::log_error(&format!("Ctrl-C handler failed to install: {}", e)).await; - return; - } + let pp = Arc::new( + DockerVpnProxyPool::new(paths.cache_openvpn_dir(), username, password).await? + ); - let _ = util::logger::log_info("Ctrl-C received — initiating graceful shutdown").await; - - if let Err(e) = pool_for_signal.shutdown().await { - let _ = util::logger::log_warn(&format!("Error shutting down ChromeDriver pool: {}", e)).await; - } - - if let Some(vp) = vpn_for_signal { - if let Err(e) = vp.disconnect_all().await { - let _ = util::logger::log_warn(&format!("Error disconnecting VPNs: {}", e)).await; + // Verify all proxies are working before proceeding + logger::log_info("Verifying all proxy connections...").await; + let mut all_working = true; + for i in 0..pp.num_proxies() { + match pp.test_proxy_connection(i).await { + Ok(ip) => { + logger::log_info(&format!(" Proxy {}: working with IP: {}", i + 1, ip)).await; + } + Err(e) => { + logger::log_error(&format!(" Proxy {}: FAILED - {}", i + 1, e)).await; + all_working = false; + } } } - let _ = util::logger::log_info("Graceful shutdown complete (from Ctrl-C)").await; - // Exit the process now that cleanup is done + if !all_working { + logger::log_warn("Some proxies failed, but continuing with working ones...").await; + } else { + logger::log_info("All proxies verified and ready!").await; + } + + logger::log_info(&format!("All {} Docker proxy containers started and ready", pp.num_proxies())).await; + Some(pp) + } + } else { + logger::log_info("VPN rotation disabled — using direct connection").await; + None + }; + + // === Step 2: Initialize the main ChromeDriver pool (with proxy if enabled) === + let pool_size = config.max_parallel_instances; + let task_limit = config.max_tasks_per_instance; + + logger::log_info(&format!("Creating ChromeDriver pool with {} instances...", pool_size)).await; + + let pool = Arc::new( + if task_limit > 0 { + ChromeDriverPool::new_with_proxy_and_task_limit(pool_size, proxy_pool.clone(), task_limit).await? + } else { + ChromeDriverPool::new_with_proxy(pool_size, proxy_pool.clone()).await? + } + ); + + logger::log_info(&format!("ChromeDriver pool ready with {} instances", pool_size)).await; + + // === Step 3: Graceful Ctrl+C handler === + { + let pool_clone = Arc::clone(&pool); + let proxy_clone = proxy_pool.clone(); + + tokio::spawn(async move { + tokio::signal::ctrl_c().await.ok(); + + logger::log_info("Ctrl+C received — shutting down gracefully...").await; + + // Now works: &*pool_clone derefs Arc → &ChromeDriverPool + if let Err(e) = (&*pool_clone).shutdown().await { + logger::log_error(&format!("Error during pool shutdown: {}", e)).await; + } + + if let Some(pp) = proxy_clone { + if let Err(e) = pp.shutdown().await { + logger::log_warn(&format!("Failed to stop Docker containers: {}", e)).await; + } else { + logger::log_info("All Docker VPN containers stopped").await; + } + } + + let _ = cleanup_all_proxy_containers().await; + std::process::exit(0); }); } - // Run economic update first, passing the shared pool - logger::log_info("--- Starting economic data update ---").await; + // === Step 4: Run the actual scraping jobs === + logger::log_info("--- Starting ECONOMIC data update ---").await; economic::run_full_update(&config, &pool).await?; - logger::log_info("✓ Economic data update completed").await; + logger::log_info("Economic update completed").await; - // Then run corporate update, passing the shared pool - logger::log_info("--- Starting corporate data update ---").await; + logger::log_info("--- Starting CORPORATE data update ---").await; corporate::run_full_update(&config, &pool).await?; - logger::log_info("✓ Corporate data update completed").await; + logger::log_info("Corporate update completed").await; - // Shutdown ChromeDriver pool before disconnecting VPNs so instances can - // cleanly terminate any network-bound processes. - logger::log_info("--- Shutting down ChromeDriver pool ---").await; + // === Step 5: Final cleanup === + logger::log_info("Shutting down ChromeDriver pool...").await; pool.shutdown().await?; - logger::log_info("✓ ChromeDriver pool shutdown complete").await; - // Disconnect all VPN instances if enabled - if let Some(vp) = vpn_pool { - logger::log_info("--- Disconnecting VPN instances ---").await; - vp.disconnect_all().await?; + if let Some(pp) = proxy_pool { + logger::log_info("Stopping Docker VPN proxy containers...").await; + pp.shutdown().await?; + // CLEANUP ANY LEFTOVER CONTAINERS FROM PREVIOUS RUNS + cleanup_all_proxy_containers().await.ok(); } - logger::log_info("=== Application completed successfully ===").await; + logger::log_info("=== Application finished successfully ===").await; Ok(()) -} \ No newline at end of file +} + +/* +memory allocation of 4294967296 bytes failed +error: process didn't exit successfully: `target\debug\event_backtest_engine.exe` (exit code: 0xc0000409, STATUS_STACK_BUFFER_OVERRUN) +*/ \ No newline at end of file diff --git a/src/scraper/create_tapctls.sh b/src/scraper/create_tapctls.sh deleted file mode 100644 index daa6359..0000000 --- a/src/scraper/create_tapctls.sh +++ /dev/null @@ -1,7 +0,0 @@ -# Als Administrator ausführen -cd "C:\Program Files\OpenVPN\bin" - -# 10 TAP-Adapter hinzufügen -for ($i=2; $i -le 10; $i++) { - .\tapctl.exe create --name "OpenVPN-TAP-$i" -} \ No newline at end of file diff --git a/src/scraper/docker_vpn_proxy.rs b/src/scraper/docker_vpn_proxy.rs new file mode 100644 index 0000000..05ef77a --- /dev/null +++ b/src/scraper/docker_vpn_proxy.rs @@ -0,0 +1,407 @@ +use anyhow::{anyhow, Context, Result}; +use futures::future::join_all; +use std::{path::{Path, PathBuf}, time::Duration}; +use tokio::{process::Command, time::{sleep}}; +use walkdir::WalkDir; + +pub struct DockerVpnProxyPool { + container_names: Vec, + proxy_ports: Vec, // e.g., [10801, 10802, ...] +} + +impl DockerVpnProxyPool { + pub async fn new(ovpn_dir: &Path, username: String, password: String) -> Result { + // Count hostnames (subdirs in ovpn_dir) + let hostnames: Vec<_> = std::fs::read_dir(ovpn_dir)? + .filter_map(Result::ok) + .filter(|e| e.path().is_dir()) + .map(|e| e.file_name().into_string().unwrap()) + .collect(); + + let num_servers = hostnames.len(); + if num_servers == 0 { + return Err(anyhow!("No VPN hostnames found in {:?}", ovpn_dir)); + } + + crate::util::logger::log_info(&format!("Found {} VPN hostnames", num_servers)).await; + + let mut container_names = Vec::with_capacity(num_servers); + let mut proxy_ports = Vec::with_capacity(num_servers); + let base_port: u16 = 10800; + + // === STEP 1: Start ALL containers first === + for (i, hostname) in hostnames.iter().enumerate() { + // Pick tcp443.ovpn if exists, else first .ovpn + let hostname_dir = ovpn_dir.join(hostname); + let mut ovpn_path: Option = None; + for entry in WalkDir::new(&hostname_dir).max_depth(1) { + let entry = entry?; + if entry.path().extension().map_or(false, |ext| ext == "ovpn") { + if entry.file_name().to_str().unwrap_or("").contains("tcp443") { + ovpn_path = Some(entry.path().to_path_buf()); + break; + } else if ovpn_path.is_none() { + ovpn_path = Some(entry.path().to_path_buf()); + } + } + } + + let ovpn_path = ovpn_path.ok_or_else(|| anyhow!("No .ovpn found for {}", hostname))?; + + let name = format!("vpn-proxy-{}", i); + let port = base_port + i as u16 + 1; + + // Clean up any existing container with the same name + let _ = Command::new("docker") + .args(["rm", "-f", &name]) + .status() + .await; + + // Run Docker container + let status = Command::new("docker") + .args([ + "run", "-d", + "--name", &name, + "--cap-add=NET_ADMIN", + "--device", "/dev/net/tun", + "--sysctl", "net.ipv4.ip_forward=1", + "-v", &format!("{}:/vpn/config.ovpn", ovpn_path.display()), + "-e", &format!("VPN_USERNAME={}", username), + "-e", &format!("VPN_PASSWORD={}", password), + "-p", &format!("{}:1080", port), + "rust-vpn-proxy", + ]) + .status() + .await + .context("Failed to run Docker")?; + + if !status.success() { + return Err(anyhow!("Docker run failed for {}", name)); + } + + crate::util::logger::log_info(&format!("Started container {} on port {} (waiting for VPN...)", name, port)).await; + + container_names.push(name); + proxy_ports.push(port); + } + + // Brief pause to let containers start + sleep(Duration::from_secs(8)).await; + crate::util::logger::log_info(&format!("All {} containers started, beginning health checks...", container_names.len())).await; + + // === STEP 2: Test ALL proxies in parallel with 10-second intervals === + let results = Self::test_all_proxies_parallel(&container_names, &proxy_ports).await; + + // Filter out failed containers + let mut working_containers = Vec::new(); + let mut working_ports = Vec::new(); + let mut failed_count = 0; + + for (i, (container_name, port)) in container_names.into_iter().zip(proxy_ports.into_iter()).enumerate() { + match &results[i] { + Ok(Some(ip)) => { + crate::util::logger::log_info(&format!("✓ Container {} on port {} ready with IP: {}", + container_name, port, ip)).await; + working_containers.push(container_name); + working_ports.push(port); + } + Ok(None) => { + crate::util::logger::log_warn(&format!("✓ Container {} on port {} ready but IP detection failed", + container_name, port)).await; + working_containers.push(container_name); + working_ports.push(port); + } + Err(e) => { + // Get container logs to debug + let logs = Command::new("docker") + .args(["logs", "--tail", "20", &container_name]) + .output() + .await + .ok() + .and_then(|output| String::from_utf8_lossy(&output.stdout).to_string().into()); + + crate::util::logger::log_error(&format!("✗ Container {} on port {} failed: {}. Logs: {:?}", + container_name, port, e, logs)).await; + failed_count += 1; + // Clean up failed container + let _ = Self::cleanup_container(&container_name).await; + } + } + } + + if working_containers.is_empty() { + return Err(anyhow!("All {} VPN proxy containers failed to start", num_servers)); + } + + crate::util::logger::log_info(&format!("Started {}/{} VPN proxy containers successfully", + working_containers.len(), num_servers)).await; + + if failed_count > 0 { + crate::util::logger::log_warn(&format!("{} containers failed and were cleaned up", failed_count)).await; + } + + Ok(Self { + container_names: working_containers, + proxy_ports: working_ports, + }) + } + + /// Test all proxies in parallel with 10-second intervals between tests + async fn test_all_proxies_parallel(container_names: &[String], proxy_ports: &[u16]) -> Vec>> { + let mut tasks = Vec::new(); + + for (i, (container_name, port)) in container_names.iter().zip(proxy_ports.iter()).enumerate() { + let name = container_name.clone(); + let port = *port; + + tasks.push(tokio::spawn(async move { + // Try up to 6 times with 10-second intervals (total 60 seconds) + for attempt in 1..=6 { + crate::util::logger::log_info(&format!("Testing proxy {} (port {}) - Attempt {}/6", + name, port, attempt)).await; + + match Self::test_single_proxy(port).await { + Ok(Some(ip)) => { + return Ok(Some(ip)); + } + Ok(None) => { + // Connection works but IP detection failed + return Ok(None); + } + Err(e) if attempt < 6 => { + crate::util::logger::log_info(&format!("Attempt {}/6 for {}: {} - retrying in 10s", + attempt, name, e)).await; + sleep(Duration::from_secs(10)).await; + } + Err(e) => { + return Err(anyhow!("Failed after 6 attempts: {}", e)); + } + } + } + Err(anyhow!("Unexpected exit from retry loop")) + })); + } + + // Wait for all tasks to complete + join_all(tasks) + .await + .into_iter() + .map(|result| match result { + Ok(inner) => inner, + Err(e) => Err(anyhow!("Task panicked: {}", e)), + }) + .collect() + } + + /// Test a single proxy connection + async fn test_single_proxy(port: u16) -> Result> { + use std::io::{Read, Write}; + use std::net::TcpStream; + use std::time::Duration as StdDuration; + + // First, test SOCKS5 handshake directly + crate::util::logger::log_info(&format!("Testing SOCKS5 handshake on port {}...", port)).await; + + // Use spawn_blocking for synchronous I/O + let test_result = tokio::task::spawn_blocking(move || { + // Connect to SOCKS5 proxy + let mut stream = match TcpStream::connect_timeout( + &format!("127.0.0.1:{}", port).parse().unwrap(), + StdDuration::from_secs(5) + ) { + Ok(stream) => stream, + Err(e) => return Err(anyhow!("Failed to connect: {}", e)), + }; + + // Send SOCKS5 greeting: version 5, 1 method (no auth) + let greeting: [u8; 3] = [0x05, 0x01, 0x00]; // SOCKS5, 1 method, no auth + if let Err(e) = stream.write_all(&greeting) { + return Err(anyhow!("Failed to send greeting: {}", e)); + } + + // Read response + let mut response = [0u8; 2]; + if let Err(e) = stream.read_exact(&mut response) { + return Err(anyhow!("Failed to read response: {}", e)); + } + + // Check response: should be [0x05, 0x00] for no auth required + if response[0] != 0x05 || response[1] != 0x00 { + return Err(anyhow!("Unexpected SOCKS5 response: {:?}", response)); + } + + Ok(()) + }).await; + + match test_result { + Ok(Ok(())) => { + crate::util::logger::log_info(&format!("✓ SOCKS5 proxy on port {} accepts connections", port)).await; + + // Try to get IP through proxy using curl (fallback method) + let curl_result = tokio::process::Command::new("curl") + .args([ + "-s", + "--socks5", &format!("localhost:{}", port), + "--max-time", "10", + "https://checkip.amazonaws.com" + ]) + .output() + .await; + + match curl_result { + Ok(output) if output.status.success() => { + let ip = String::from_utf8_lossy(&output.stdout).trim().to_string(); + if Self::is_valid_ip(&ip) { + crate::util::logger::log_info(&format!("✓ Got IP via proxy: {}", ip)).await; + return Ok(Some(ip)); + } else { + crate::util::logger::log_info(&format!("✓ Proxy works, invalid IP format: {}", ip)).await; + return Ok(None); + } + } + _ => { + // Proxy accepts connections but curl failed - still acceptable + crate::util::logger::log_info(&format!("✓ Proxy accepts connections (curl test failed)")).await; + return Ok(None); + } + } + } + Ok(Err(e)) => { + return Err(anyhow!("SOCKS5 test failed: {}", e)); + } + Err(e) => { + return Err(anyhow!("Task failed: {}", e)); + } + } + } + + /// Clean up a failed container + async fn cleanup_container(container_name: &str) -> Result<()> { + let _ = Command::new("docker") + .args(["stop", container_name]) + .status() + .await; + + let _ = Command::new("docker") + .args(["rm", container_name]) + .status() + .await; + + Ok(()) + } + + fn is_valid_ip(ip: &str) -> bool { + let parts: Vec<&str> = ip.split('.').collect(); + if parts.len() != 4 { + return false; + } + + for part in parts { + if let Ok(num) = part.parse::() { + if part != num.to_string() { + return false; + } + } else { + return false; + } + } + + true + } + + /// Test if a specific proxy is working + pub async fn test_proxy_connection(&self, index: usize) -> Result { + let port = self.proxy_ports[index]; + let proxy_url = format!("socks5://localhost:{}", port); + + let client = reqwest::Client::builder() + .proxy(reqwest::Proxy::all(&proxy_url)?) + .timeout(Duration::from_secs(10)) + .build()?; + + let response = client.get("http://checkip.amazonaws.com") + .send() + .await? + .text() + .await?; + + Ok(response.trim().to_string()) + } + + pub fn get_proxy_url(&self, index: usize) -> String { + let port = self.proxy_ports[index % self.proxy_ports.len()]; + format!("socks5://localhost:{}", port) + } + + pub fn num_proxies(&self) -> usize { + self.proxy_ports.len() + } + + pub async fn shutdown(&self) -> Result<()> { + crate::util::logger::log_info(&format!("Shutting down {} Docker proxy containers...", + self.container_names.len())).await; + + for name in &self.container_names { + let _ = Command::new("docker") + .args(["stop", name]) + .status() + .await; + let _ = Command::new("docker") + .args(["rm", name]) + .status() + .await; + } + Ok(()) + } +} + +pub async fn cleanup_all_proxy_containers() -> Result<()> { + // Step 1: List all container IDs that match our pattern + let output = Command::new("docker") + .args(["ps", "-a", "--format", "{{.ID}} {{.Names}} {{.Image}}"]) + .output() + .await?; + + let stdout = String::from_utf8_lossy(&output.stdout); + + let mut containers_to_kill = Vec::new(); + + for line in stdout.lines() { + let parts: Vec<&str> = line.split_whitespace().collect(); + if parts.len() >= 2 { + let name_or_id = parts[0]; + let name = parts[1]; + let image = if parts.len() >= 3 { parts[2] } else { "" }; + + // Match by name prefix OR by image name + if name.starts_with("vpn-proxy-") || image.contains("rust-vpn-proxy") { + containers_to_kill.push(name_or_id.to_string()); + } + } + } + + if containers_to_kill.is_empty() { + crate::util::logger::log_info("No old rust-vpn-proxy containers found").await; + return Ok(()); + } + + // Step 2: Kill and remove them all at once + let status = Command::new("docker") + .arg("rm") + .arg("-f") + .args(&containers_to_kill) + .status() + .await?; + + if status.success() { + crate::util::logger::log_info(&format!( + "Successfully removed {} old rust-vpn-proxy container(s)", + containers_to_kill.len() + )) + .await; + } else { + crate::util::logger::log_warn("Some containers may still remain (non-critical)").await; + } + + Ok(()) +} diff --git a/src/scraper/forcebindip.rs b/src/scraper/forcebindip.rs deleted file mode 100644 index 6b135eb..0000000 --- a/src/scraper/forcebindip.rs +++ /dev/null @@ -1,163 +0,0 @@ -// src/scraper/forcebindip.rs - -use anyhow::{anyhow, Context, Result}; -use std::path::{Path, PathBuf}; -use std::process::Command; - -/// Manages ForceBindIP integration for binding processes to specific IP addresses -pub struct ForceBindIpManager { - forcebindip_path: PathBuf, -} - -impl ForceBindIpManager { - /// Creates a new ForceBindIP manager - /// - /// On Windows, looks for ForceBindIP.exe in common locations or PATH - /// On other platforms, returns an error as ForceBindIP is Windows-only - pub fn new() -> Result { - #[cfg(target_os = "windows")] - { - let possible_paths = vec![ - PathBuf::from("ForceBindIP.exe"), - PathBuf::from("tools/ForceBindIP.exe"), - PathBuf::from("C:/Program Files/ForceBindIP/ForceBindIP.exe"), - PathBuf::from("C:/Program Files (x86)/ForceBindIP/ForceBindIP.exe"), - ]; - - for path in possible_paths { - if path.exists() { - return Ok(Self { - forcebindip_path: path, - }); - } - } - - // Try to find in PATH - if let Ok(output) = Command::new("where").arg("ForceBindIP.exe").output() { - if output.status.success() { - let path_str = String::from_utf8_lossy(&output.stdout); - let path = PathBuf::from(path_str.trim()); - if path.exists() { - return Ok(Self { - forcebindip_path: path, - }); - } - } - } - - Err(anyhow!( - "ForceBindIP.exe not found. Please download from http://r1ch.net/projects/forcebindip \ - and place it in the project directory or add to PATH" - )) - } - - #[cfg(not(target_os = "windows"))] - { - Err(anyhow!( - "ForceBindIP is only available on Windows. For Linux/macOS, consider using \ - network namespaces or other routing mechanisms" - )) - } - } - - /// Creates a command that will run the given program bound to the specified IP - /// - /// # Arguments - /// * `bind_ip` - The IP address to bind to - /// * `program` - Path to the program to execute - /// * `args` - Arguments to pass to the program - /// - /// # Returns - /// A configured Command ready to be spawned - pub fn create_bound_command( - &self, - bind_ip: &str, - program: &Path, - args: &[&str], - ) -> Command { - let mut cmd = Command::new(&self.forcebindip_path); - - // ForceBindIP syntax: ForceBindIP.exe [IP] [program] [args...] - cmd.arg(bind_ip) - .arg(program); - - for arg in args { - cmd.arg(arg); - } - - cmd - } - - /// Verifies that ForceBindIP is working by testing with a simple command - pub async fn verify_installation(&self) -> Result<()> { - #[cfg(target_os = "windows")] - { - // Test by running a simple command - let output = Command::new(&self.forcebindip_path) - .arg("0.0.0.0") - .arg("cmd.exe") - .arg("/c") - .arg("echo test") - .output() - .context("Failed to execute ForceBindIP verification")?; - - if !output.status.success() { - return Err(anyhow!( - "ForceBindIP verification failed. stderr: {}", - String::from_utf8_lossy(&output.stderr) - )); - } - - Ok(()) - } - - #[cfg(not(target_os = "windows"))] - { - Err(anyhow!("ForceBindIP verification not available on non-Windows platforms")) - } - } - - /// Returns the path to the ForceBindIP executable - pub fn path(&self) -> &Path { - &self.forcebindip_path - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - #[cfg(target_os = "windows")] - fn test_forcebindip_manager_creation() { - // This test will only pass if ForceBindIP is actually installed - // In CI/CD, you might want to skip this or mock it - match ForceBindIpManager::new() { - Ok(manager) => { - println!("ForceBindIP found at: {:?}", manager.path()); - } - Err(e) => { - println!("ForceBindIP not found (expected in dev environments): {}", e); - } - } - } - - #[test] - fn test_command_creation() { - #[cfg(target_os = "windows")] - { - if let Ok(manager) = ForceBindIpManager::new() { - let cmd = manager.create_bound_command( - "192.168.1.1", - Path::new("test.exe"), - &["--arg1", "--arg2"], - ); - - // Verify the command is constructed correctly - let cmd_str = format!("{:?}", cmd); - assert!(cmd_str.contains("192.168.1.1")); - assert!(cmd_str.contains("test.exe")); - } - } - } -} \ No newline at end of file diff --git a/src/scraper/install_tap_adapter.ps1 b/src/scraper/install_tap_adapter.ps1 deleted file mode 100644 index f2bf468..0000000 --- a/src/scraper/install_tap_adapter.ps1 +++ /dev/null @@ -1,135 +0,0 @@ -# install_tap_adapters.ps1 -# Installs additional TAP-Windows adapters for parallel OpenVPN connections -# MUST BE RUN AS ADMINISTRATOR - -$ErrorActionPreference = "Stop" - -Write-Host "========================================" -ForegroundColor Cyan -Write-Host "TAP Adapter Installation Script" -ForegroundColor Cyan -Write-Host "========================================" -ForegroundColor Cyan -Write-Host "" - -# Check if running as Administrator -$currentPrincipal = New-Object Security.Principal.WindowsPrincipal([Security.Principal.WindowsIdentity]::GetCurrent()) -$isAdmin = $currentPrincipal.IsInRole([Security.Principal.WindowsBuiltInRole]::Administrator) - -if (-not $isAdmin) { - Write-Host "ERROR: This script must be run as Administrator!" -ForegroundColor Red - Write-Host "" - Write-Host "To run as Administrator:" -ForegroundColor Yellow - Write-Host " 1. Right-click PowerShell" -ForegroundColor Yellow - Write-Host " 2. Select 'Run as Administrator'" -ForegroundColor Yellow - Write-Host " 3. Run this script again" -ForegroundColor Yellow - Write-Host "" - Read-Host "Press Enter to exit" - exit 1 -} - -Write-Host "✓ Running with Administrator privileges" -ForegroundColor Green -Write-Host "" - -# Check for OpenVPN installation -$tapctlPath = "C:\Program Files\OpenVPN\bin\tapctl.exe" - -if (-not (Test-Path $tapctlPath)) { - Write-Host "ERROR: OpenVPN not found!" -ForegroundColor Red - Write-Host "" - Write-Host "Expected location: $tapctlPath" -ForegroundColor Yellow - Write-Host "" - Write-Host "Please install OpenVPN from:" -ForegroundColor Yellow - Write-Host "https://openvpn.net/community-downloads/" -ForegroundColor Cyan - Write-Host "" - Read-Host "Press Enter to exit" - exit 1 -} - -Write-Host "✓ OpenVPN found at: $tapctlPath" -ForegroundColor Green -Write-Host "" - -# Count existing TAP adapters -Write-Host "Checking existing TAP adapters..." -ForegroundColor Cyan -$existingAdapters = Get-NetAdapter | Where-Object { $_.InterfaceDescription -like "*TAP*" } -$existingCount = $existingAdapters.Count - -Write-Host " Found $existingCount existing TAP adapter(s)" -ForegroundColor Yellow - -if ($existingCount -ge 10) { - Write-Host "" - Write-Host "✓ You already have $existingCount TAP adapters (sufficient)" -ForegroundColor Green - Write-Host "" - Read-Host "Press Enter to exit" - exit 0 -} - -Write-Host "" -Write-Host "Installing additional TAP adapters..." -ForegroundColor Cyan -Write-Host " Target: 10 total adapters" -ForegroundColor Yellow -Write-Host " To install: $(10 - $existingCount) adapters" -ForegroundColor Yellow -Write-Host "" - -$targetCount = 10 -$successCount = 0 -$failCount = 0 - -for ($i = ($existingCount + 1); $i -le $targetCount; $i++) { - $adapterName = "OpenVPN-TAP-$i" - Write-Host "[$i/$targetCount] Creating $adapterName..." -ForegroundColor Cyan - - try { - $output = & $tapctlPath create --name $adapterName 2>&1 - - if ($LASTEXITCODE -eq 0) { - Write-Host " ✓ Successfully created $adapterName" -ForegroundColor Green - $successCount++ - } else { - Write-Host " ⚠ Failed to create $adapterName (exit code: $LASTEXITCODE)" -ForegroundColor Red - Write-Host " Output: $output" -ForegroundColor Gray - $failCount++ - } - } catch { - Write-Host " ✗ Error creating $adapterName : $_" -ForegroundColor Red - $failCount++ - } - - # Small delay to prevent resource conflicts - Start-Sleep -Milliseconds 500 -} - -Write-Host "" -Write-Host "========================================" -ForegroundColor Cyan -Write-Host "Installation Summary" -ForegroundColor Cyan -Write-Host "========================================" -ForegroundColor Cyan -Write-Host " Successfully created: $successCount adapter(s)" -ForegroundColor Green -Write-Host " Failed: $failCount adapter(s)" -ForegroundColor $(if ($failCount -gt 0) { "Red" } else { "Gray" }) -Write-Host "" - -# Verify final count -Write-Host "Verifying installation..." -ForegroundColor Cyan -Start-Sleep -Seconds 2 - -$finalAdapters = Get-NetAdapter | Where-Object { $_.InterfaceDescription -like "*TAP*" } -$finalCount = $finalAdapters.Count - -Write-Host "" -Write-Host "Total TAP adapters now: $finalCount" -ForegroundColor $(if ($finalCount -ge 10) { "Green" } else { "Yellow" }) -Write-Host "" - -if ($finalCount -ge 10) { - Write-Host "✓ Installation complete! You now have sufficient TAP adapters." -ForegroundColor Green - Write-Host " You can now run up to $(($finalCount * 3/4)) VPN connections in parallel." -ForegroundColor Cyan -} elseif ($finalCount -gt $existingCount) { - Write-Host "⚠ Partial success. Added $(($finalCount - $existingCount)) adapter(s)." -ForegroundColor Yellow - Write-Host " You can run up to $(($finalCount * 3/4)) VPN connections in parallel." -ForegroundColor Cyan - Write-Host " Consider running this script again if you need more." -ForegroundColor Yellow -} else { - Write-Host "✗ No adapters were added. Check error messages above." -ForegroundColor Red -} - -Write-Host "" -Write-Host "Adapter List:" -ForegroundColor Cyan -$finalAdapters | ForEach-Object { - Write-Host " • $($_.Name) ($($_.InterfaceDescription))" -ForegroundColor Gray -} - -Write-Host "" -Read-Host "Press Enter to exit" \ No newline at end of file diff --git a/src/scraper/mod.rs b/src/scraper/mod.rs index d60afcc..1c0f399 100644 --- a/src/scraper/mod.rs +++ b/src/scraper/mod.rs @@ -1,5 +1,2 @@ pub mod webdriver; -pub mod vpn_manager; - -#[cfg(target_os = "windows")] -pub mod forcebindip; \ No newline at end of file +pub mod docker_vpn_proxy; \ No newline at end of file diff --git a/src/scraper/vpn_manager.rs b/src/scraper/vpn_manager.rs deleted file mode 100644 index f1eb9b5..0000000 --- a/src/scraper/vpn_manager.rs +++ /dev/null @@ -1,1422 +0,0 @@ -// src/scraper/vpn_manager.rs - -use anyhow::{Context, Result, anyhow}; -use fantoccini::client; -use serde_json; -use std::path::{Path, PathBuf}; -use std::sync::Arc; -use tokio::process::{Child, Command}; -use tokio::sync::Mutex; -use tokio::time::{sleep, timeout, Duration}; -use std::process::Stdio; -use reqwest::Client; -use tokio::fs as tokio_fs; -use std::time::{SystemTime, UNIX_EPOCH}; - -use crate::logger; - -/// Represents a single OpenVPN connection with its associated state -pub struct VpnInstance { - /// The OpenVPN process handle - process: Option, - /// Path to the .ovpn configuration file - config_path: PathBuf, - /// The external IP address assigned by this VPN - external_ip: Option, - /// Baseline (pre-VPN) external IP for verification - baseline_ip: Option, - /// Hostname derived from the config file (e.g., "ca149.vpnbook.com") - hostname: String, - /// Number of tasks completed in the current session - tasks_completed: usize, - /// VPN credentials - username: String, - password: String, - /// Health status - is_healthy: bool, - /// Path to credentials file created for this instance (if any) - cred_path: Option, - /// Path to temporary modified .ovpn file for this instance (if any) - temp_config_path: Option, - /// Path to OpenVPN log file created for this instance (if any) - log_path: Option, -} - -impl VpnInstance { - /// Creates a new VPN instance without starting the connection - pub fn new( - config_path: PathBuf, - username: String, - password: String, - baseline_ip: Option, - ) -> Result { - // Use the file stem (filename without extension) as the hostname/identifier. - // This avoids using the parent directory name which can be the same - // for many configs and cause collisions when copying into config-auto. - let hostname = config_path - .file_stem() - .and_then(|n| n.to_str()) - .unwrap_or("unknown") - .to_string(); - - Ok(Self { - process: None, - config_path, - external_ip: None, - baseline_ip, - hostname, - tasks_completed: 0, - username, - password, - is_healthy: false, - cred_path: None, - temp_config_path: None, - log_path: None, - }) - } - - /// Starts the OpenVPN connection and detects the assigned IP - pub async fn connect(&mut self) -> Result<()> { - crate::util::logger::log_info(&format!("Starting VPN connection for {}", self.hostname)).await; - // Create fixed config first - //let fixed_config_path = self.create_fixed_config().await - // .context("Failed to create fixed OpenVPN config")?; - - // Store the temp config path so we can clean it up later - self.temp_config_path = Some(self.config_path.clone()); - let cred_file = self.create_credentials_file().await?; - self.cred_path = Some(cred_file.clone()); - - // Use baseline IP supplied when pool was created, otherwise detect once - let baseline_ip = if self.baseline_ip.is_some() { - self.baseline_ip.clone() - } else { - let b = self.detect_external_ip().await.ok().flatten(); - if let Some(ref ip) = b { - crate::util::logger::log_info(&format!("Baseline IP (before VPN): {}", ip)).await; - } - b - }; - self.baseline_ip = baseline_ip; - - #[cfg(target_os = "windows")] - { - // Windows: Spawn individual openvpn.exe process for this instance - let temp_dir = std::env::temp_dir(); - let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos(); - let instance_config = temp_dir.join(format!("ovpn_{}_{}.ovpn", self.hostname, nanos)); - let log_path = temp_dir.join(format!("openvpn_{}_{}.log", self.hostname, nanos)); - - // Copy and modify the .ovpn file to include auth-user-pass (async) - let ovpn_content = tokio_fs::read_to_string(&self.config_path).await?; - let mut modified_content = ovpn_content; - - // Remove existing auth-user-pass lines if present - modified_content = modified_content - .lines() - .filter(|line| !line.trim().starts_with("auth-user-pass")) - .collect::>() - .join("\n"); - - // Add auth-user-pass pointing to credentials file at the end - modified_content.push('\n'); - let cred_path_str = cred_file.to_string_lossy().replace('\\', "/"); - modified_content.push_str(&format!("auth-user-pass \"{}\"\n", cred_path_str)); - - tokio_fs::write(&instance_config, modified_content).await?; - self.temp_config_path = Some(instance_config.clone()); - self.log_path = Some(log_path.clone()); - - // Verify OpenVPN executable exists - let openvpn_exe = r"C:\Program Files\OpenVPN\bin\openvpn.exe"; - if !std::path::Path::new(openvpn_exe).exists() { - return Err(anyhow!( - "OpenVPN executable not found at: {}\nPlease install OpenVPN from: https://openvpn.net/community-downloads/", - openvpn_exe - )); - } - - // Spawn openvpn.exe process directly - let mut cmd = Command::new(openvpn_exe); - cmd.arg("--config") - .arg(&instance_config) - .arg("--log") - .arg(&log_path) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()); - - crate::util::logger::log_info(&format!( - "Spawning OpenVPN: {} --config {} --log {}", - openvpn_exe, - instance_config.display(), - log_path.display() - )).await; - - let mut process = cmd - .spawn() - .context("Failed to spawn openvpn.exe. Ensure OpenVPN is installed.")?; - - // Log OpenVPN stdout in background - if let Some(stdout) = process.stdout.take() { - let hostname = self.hostname.clone(); - tokio::spawn(async move { - use tokio::io::AsyncBufReadExt; - let mut reader = tokio::io::BufReader::new(stdout).lines(); - while let Ok(Some(line)) = reader.next_line().await { - crate::util::logger::log_info(&format!("OpenVPN-OUT [{}]: {}", hostname, line)).await; - } - }); - } - - // Store process handle - self.process = Some(process); - - // Wait for OpenVPN to initialize and verify connection - // Initial wait: OpenVPN takes ~8-10 seconds typically - crate::util::logger::log_info(&format!( - "Waiting for OpenVPN initialization ({})", - self.hostname - )).await; - - tokio::time::sleep(Duration::from_secs(40)).await; - - for i in 0..3 { - if self.scan_openvpn_logs().await? { - break; - } - crate::util::logger::log_info("OpenVPN not initialized yet, waiting 10 seconds...").await; - tokio::time::sleep(Duration::from_secs(10)).await; - } - - // Now run comprehensive verification with retries built-in - match self.verify_vpn_connection().await { - Ok(true) => { - self.is_healthy = true; - crate::util::logger::log_info(&format!( - "✓ VPN {} connected successfully with IP: {}", - self.hostname, - self.external_ip.as_ref().unwrap_or(&"unknown".to_string()) - )).await; - return Ok(()); - } - Ok(false) => { - // Verification failed - kill process - if let Some(mut p) = self.process.take() { - let _ = p.kill().await; - } - return Err(anyhow!( - "VPN connection verification failed for {}", - self.hostname - )); - } - Err(e) => { - // Error during verification - if let Some(mut p) = self.process.take() { - let _ = p.kill().await; - } - return Err(anyhow!( - "VPN verification error for {}: {}", - self.hostname, - e - )); - } - } - } - - #[cfg(not(target_os = "windows"))] - { - // Non-Windows implementation - let mut cmd = Command::new("openvpn"); - cmd.arg("--config") - .arg(&self.config_path) - .arg("--auth-user-pass") - .arg(&cred_file) - .arg("--daemon") - .stdout(Stdio::piped()) - .stderr(Stdio::piped()); - - match cmd.spawn() { - Err(e) if e.kind() == std::io::ErrorKind::NotFound => { - return Err(anyhow!( - "OpenVPN not found. Please install it:\n \ - Linux: sudo apt-get install openvpn\n \ - macOS: brew install openvpn\n \ - Error: {}", e - )); - } - Err(e) => { - return Err(anyhow!("Failed to spawn OpenVPN: {}", e)); - } - Ok(mut process) => { - // Capture output for monitoring - if let Some(stdout) = process.stdout.take() { - let hostname = self.hostname.clone(); - tokio::spawn(async move { - use tokio::io::AsyncBufReadExt; - let mut reader = tokio::io::BufReader::new(stdout).lines(); - while let Ok(Some(line)) = reader.next_line().await { - crate::util::logger::log_info(&format!("[VPN-{}] {}", hostname, line)).await; - } - }); - } - - self.process = Some(process); - } - } - - // Wait for connection and verify (Unix) - sleep(Duration::from_secs(10)).await; - - match self.verify_vpn_connection().await { - Ok(true) => { - self.is_healthy = true; - crate::util::logger::log_info(&format!( - "✓ VPN {} connected successfully", - self.hostname - )).await; - return Ok(()); - } - Ok(false) => { - self.disconnect().await?; - return Err(anyhow!( - "Failed to verify VPN connection for {}", - self.hostname - )); - } - Err(e) => { - self.disconnect().await?; - return Err(e); - } - } - } - } - - async fn verify_vpn_connection(&mut self) -> Result { - logger::log_info(&format!("Verifying VPN connection for {}", self.hostname)).await; - - // 1. Quick process check - if self.process.is_none() { - logger::log_warn("Process check failed - no process").await; - return Ok(false); - } - - // 2. Check logs for success markers (with retries for file buffering) - let mut log_success = false; - for attempt in 1..=5 { - tokio::time::sleep(Duration::from_secs(2)).await; // Wait for log flush - - match self.scan_openvpn_logs().await { - Ok(true) => { - log_success = true; - logger::log_info(&format!( - "✓ Log verification passed (attempt {})", - attempt - )).await; - break; - } - Ok(false) => { - if attempt < 5 { - logger::log_info(&format!( - "Log verification pending (attempt {}/5)...", - attempt - )).await; - } - } - Err(e) => { - logger::log_warn(&format!("Log read error: {}", e)).await; - } - } - } - - if !log_success { - logger::log_warn("Log verification failed after 5 attempts").await; - return Ok(false); - } - - // 3. Wait for routes to be applied (happens ~2 seconds after "Initialization Sequence Completed") - logger::log_info("Waiting for routes to stabilize...").await; - tokio::time::sleep(Duration::from_secs(3)).await; - - // 4. Network route verification (with retries) - let mut route_ok = false; - for attempt in 1..=3 { - if self.check_routing_table().await.unwrap_or(false) { - route_ok = true; - logger::log_info(&format!( - "✓ Route verification passed (attempt {})", - attempt - )).await; - break; - } - - if attempt < 3 { - logger::log_info(&format!( - "Route verification pending (attempt {}/3)...", - attempt - )).await; - tokio::time::sleep(Duration::from_secs(2)).await; - } - } - - if !route_ok { - logger::log_warn("Route verification failed - routes may not be established").await; - // Don't fail yet - some VPNs work without perfect routing table state - } - - // 5. External IP change verification (with retries and longer timeout) - logger::log_info("Verifying IP change...").await; - for attempt in 1..=3 { - match timeout(Duration::from_secs(8), self.verify_ip_change()).await { - Ok(Ok(true)) => { - let ip = self.external_ip.as_ref().unwrap(); - logger::log_info(&format!( - "✓ VPN connection verified - New IP: {} (attempt {})", - ip, attempt - )).await; - return Ok(true); - } - Ok(Ok(false)) => { - if attempt < 3 { - logger::log_info(&format!( - "IP verification pending (attempt {}/3)...", - attempt - )).await; - tokio::time::sleep(Duration::from_secs(2)).await; - } - } - Ok(Err(e)) => { - logger::log_warn(&format!("IP check error: {}", e)).await; - } - Err(_) => { - logger::log_warn("IP check timeout").await; - } - } - } - - logger::log_error(&format!( - "✗ VPN connection verification failed for {} - IP unchanged after 3 attempts", - self.hostname - )).await; - Ok(false) - } - - // Improved routing table check with better Windows detection - async fn check_routing_table(&self) -> Result { - #[cfg(target_os = "windows")] - { - use std::process::Command; - - let output = Command::new("cmd") - .args(&["/C", "route", "print", "0.0.0.0"]) - .output() - .context("Failed to execute route print")?; - - if !output.status.success() { - return Ok(false); - } - - let output_str = String::from_utf8_lossy(&output.stdout); - - // Check for VPN-related routing entries - // Look for: TAP adapter in routes, or 10.x.x.x destinations (common VPN subnets) - let has_vpn_routes = output_str.contains("TAP") - || output_str.contains("10.8.0") // Common OpenVPN subnet - || output_str.contains("10.9.0") // Another common subnet - || output_str.to_lowercase().contains("openvpn"); - - // Also check for 0.0.0.0/128.0.0.0 split routing (redirect-gateway) - let has_redirect = output_str.contains("128.0.0.0") - && output_str.matches("0.0.0.0").count() >= 2; - - let result = has_vpn_routes || has_redirect; - - if result { - logger::log_info("✓ Routing table shows VPN routes").await; - } else { - logger::log_warn("✗ No VPN routes detected in routing table").await; - logger::log_info(&format!("Route output preview: {}", - &output_str.lines().take(10).collect::>().join("\n") - )).await; - } - - Ok(result) - } - - #[cfg(not(target_os = "windows"))] - { - // For non-Windows, just check if we can detect IP change - // (routing is less critical to verify on Unix) - Ok(true) - } - } - - // Improved log scanning with better error handling - async fn scan_openvpn_logs(&self) -> Result { - #[cfg(target_os = "windows")] - { - if let Some(ref log_path) = self.log_path { - // Try to read the file, but handle "not found" gracefully - let content = match tokio::fs::read_to_string(log_path).await { - Ok(c) => c, - Err(e) if e.kind() == std::io::ErrorKind::NotFound => { - // Log file doesn't exist yet - return Ok(false); - } - Err(e) => { - return Err(anyhow::anyhow!("Failed to read log file: {}", e)); - } - }; - - // Success marker - if content.contains("Initialization Sequence Completed") { - // Check for fatal errors that might have occurred after success - if content.contains("Exiting due to fatal error") { - return Ok(false); - } - - // Check for excessive retries/errors - let error_count = content.matches("TLS Error").count() - + content.matches("Connection reset").count() - + content.matches("SIGTERM").count() - + content.matches("Restart pause").count(); - - if error_count > 3 { - logger::log_warn(&format!( - "Logs show {} errors despite initialization complete", - error_count - )).await; - return Ok(false); - } - - return Ok(true); - } - - // Check for immediate fatal errors - if content.contains("Cannot open TUN/TAP dev") - || content.contains("Exiting due to fatal error") - || content.contains("AUTH: Received control message: AUTH_FAILED") { - return Ok(false); - } - } - } - - Ok(false) - } - - // Improved IP verification with better error handling - async fn verify_ip_change(&mut self) -> Result { - //let detected_ip = self.detect_external_ip().await?; - - let new_ip = self - .detect_external_ip() - .await - .context("Failed to query external IP after VPN connection")? - .with_context(|| "VPN connected, but no external IP received – tunnel broken?")?; - - // Compare with baseline - if let Some(baseline) = &self.baseline_ip { - if &new_ip == baseline { - logger::log_warn(&format!( - "IP unchanged from baseline: {} (VPN may not be routing traffic)", - baseline - )).await; - return Ok(false); - } - - logger::log_info(&format!( - "IP changed: {} → {} ✓", - baseline, new_ip - )).await; - } else { - logger::log_info(&format!("Detected IP: {}", new_ip)).await; - } - - self.external_ip = Some(new_ip); - Ok(true) - } - - /// Creates a fixed version of the OpenVPN config to work with modern OpenVPN - async fn create_fixed_config(&self) -> Result { - // Read the original config - let content = tokio::fs::read_to_string(&self.config_path) - .await - .context("Failed to read OpenVPN config")?; - - // Create a temporary file for the fixed config - let temp_dir = std::env::temp_dir(); - let temp_config_path = temp_dir.join(format!("fixed_{}.ovpn", - self.hostname.replace(|c: char| !c.is_alphanumeric(), "_"))); - - let mut fixed_lines = Vec::new(); - let mut has_data_ciphers = false; - let mut has_compression_setting = false; - - // Process each line - for line in content.lines() { - let trimmed = line.trim(); - - if trimmed.starts_with("cipher ") { - // Skip old cipher line, we'll add data-ciphers later if needed - continue; - } else if trimmed.starts_with("data-ciphers") { - has_data_ciphers = true; - fixed_lines.push(line.to_string()); - } else if trimmed.contains("allow-compression") { - has_compression_setting = true; - fixed_lines.push(line.to_string()); - } else if trimmed.starts_with(";") || trimmed.starts_with("#") { - // Keep comments - fixed_lines.push(line.to_string()); - } else if !trimmed.is_empty() { - fixed_lines.push(line.to_string()); - } - } - - // Add modern cipher suite if missing - if !has_data_ciphers { - fixed_lines.push("data-ciphers AES-256-GCM:AES-128-GCM:CHACHA20-POLY1305".to_string()); - } - - // Add compression setting if missing - if !has_compression_setting { - fixed_lines.push("allow-compression no".to_string()); - } - - // Remove any duplicate 'auth-user-pass' lines (we'll add our own later) - fixed_lines.retain(|line| !line.trim().starts_with("auth-user-pass")); - - // Write the fixed config - let fixed_content = fixed_lines.join("\n"); - tokio::fs::write(&temp_config_path, fixed_content) - .await - .context("Failed to write fixed OpenVPN config")?; - - Ok(temp_config_path) - } - - - /// Disconnects the VPN connection - pub async fn disconnect(&mut self) -> Result<()> { - #[cfg(target_os = "windows")] - { - // Kill the openvpn.exe process if it exists - if let Some(mut process) = self.process.take() { - crate::util::logger::log_info(&format!("Disconnecting VPN {}", self.hostname)).await; - let _ = process.kill().await; - - // Clean up temp config file and log file if present - if let Some(ref p) = self.temp_config_path { - let _ = tokio_fs::remove_file(p).await; - } - if let Some(ref lp) = self.log_path { - let _ = tokio_fs::remove_file(lp).await; - } - } - - // Remove credential file if present - if let Some(ref cred) = self.cred_path { - let _ = tokio_fs::remove_file(cred).await; - } - - self.external_ip = None; - self.is_healthy = false; - self.tasks_completed = 0; - Ok(()) - } - #[cfg(not(target_os = "windows"))] - { - if let Some(mut process) = self.process.take() { - crate::util::logger::log_info(&format!("Disconnecting VPN {}", self.hostname)).await; - process.kill().await.context("Failed to kill OpenVPN process")?; - self.external_ip = None; - self.is_healthy = false; - self.tasks_completed = 0; - } - // Remove credentials file if present - if let Some(ref cred) = self.cred_path { - let _ = tokio_fs::remove_file(cred).await; - } - Ok(()) - } - } - - /// Reconnects the VPN (disconnect + connect) - pub async fn reconnect(&mut self) -> Result<()> { - crate::util::logger::log_info(&format!("Reconnecting VPN {}", self.hostname)).await; - self.disconnect().await?; - sleep(Duration::from_secs(10)).await; // Brief delay between disconnect and reconnect - self.connect().await - } - - /// Ermittelt die aktuelle externe IPv4-Adresse über ipify.org. - /// - /// Nutzt einen wiederverwendbaren reqwest-Client aus dem Pool (vermutlich vorhandenen) - /// shared Client-Pool oder erstellt einen mit sinnvollen Defaults. - /// - /// # Returns - /// - `Ok(Some(ip))` bei erfolgreicher Erkennung - /// - `Ok(None)` wenn der Server antwortet, aber kein gültiger IP-String kommt - /// - `Err(_)` bei Netzwerk-, Timeout- oder Parse-Fehlern (wird mit Kontext angereichert) - async fn detect_external_ip(&self) -> anyhow::Result> { - // Empfohlen: Nutze einen shared Client (z. B. aus Arc) - // Falls du keinen hast, ist das hier immer noch besser als jedes Mal neu bauen: - static CLIENT: once_cell::sync::Lazy = once_cell::sync::Lazy::new(|| { - reqwest::Client::builder() - .timeout(std::time::Duration::from_secs(8)) - .user_agent("my-scraper/1.0") - .build() - .expect("Failed to build static reqwest client") - }); - - let resp = CLIENT - .get("https://api.ipify.org?format=json") - .send() - .await - .context("Failed to reach ipify.org – no internet or DNS issue")?; - - // 4xx/5xx → Fehler - let resp = resp - .error_for_status() - .context("ipify.org returned error status")?; - - #[derive(serde::Deserialize)] - struct IpResponse { - ip: String, - } - - let ip_obj: IpResponse = resp - .json() - .await - .context("Failed to parse JSON from ipify.org")?; - - // Einfache Validierung: sieht nach IPv4 aus? - if ip_obj.ip.contains('.') && !ip_obj.ip.is_empty() { - Ok(Some(ip_obj.ip)) - } else { - Ok(None) - } - } - - /// Performs a simple health check without reconnection - pub async fn health_check(&mut self) -> Result { - if self.process.is_none() { - self.is_healthy = false; - return Ok(false); - } - - match self.detect_external_ip().await { - Ok(Some(_)) => { - self.is_healthy = true; - Ok(true) - } - _ => { - self.is_healthy = false; - Ok(false) - } - } - } - - /// Performs a health check and automatically reconnects if failed - pub async fn health_check_with_reconnect(&mut self) -> Result { - if self.process.is_none() { - self.is_healthy = false; - return Ok(false); - } - - // Try to detect IP again - match self.detect_external_ip().await { - Ok(Some(ip)) => { - if let Some(ref current_ip) = self.external_ip { - if &ip != current_ip { - crate::util::logger::log_warn(&format!( - "VPN {} IP changed unexpectedly: {} -> {}", - self.hostname, current_ip, ip - )).await; - self.external_ip = Some(ip); - } - } - self.is_healthy = true; - Ok(true) - } - _ => { - crate::util::logger::log_warn(&format!( - "Health check failed for VPN {}, attempting reconnect...", - self.hostname - )).await; - self.is_healthy = false; - - // Attempt automatic reconnection - match self.reconnect().await { - Ok(()) => { - crate::util::logger::log_info(&format!( - "✓ VPN {} reconnected successfully", - self.hostname - )).await; - Ok(true) - } - Err(e) => { - crate::util::logger::log_error(&format!( - "✗ VPN {} reconnection failed: {}", - self.hostname, e - )).await; - Ok(false) - } - } - } - } - } - - /// Creates a temporary credentials file for OpenVPN authentication - async fn create_credentials_file(&self) -> Result { - let temp_dir = std::env::temp_dir(); - let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos(); - let cred_path = temp_dir.join(format!("vpn_creds_{}_{}.txt", self.hostname, nanos)); - - let content = format!("{}\n{}\n", self.username, self.password); - tokio::fs::write(&cred_path, content) - .await - .context("Failed to write credentials file")?; - - Ok(cred_path) - } - - /// Increments the task counter and returns whether rotation is needed - pub fn increment_task_count(&mut self, tasks_per_session: usize) -> bool { - self.tasks_completed += 1; - if tasks_per_session > 0 && self.tasks_completed >= tasks_per_session { - true - } else { - false - } - } - - /// Returns the external IP if connected - pub fn external_ip(&self) -> Option<&str> { - self.external_ip.as_deref() - } - - /// Returns the hostname - pub fn hostname(&self) -> &str { - &self.hostname - } - - /// Returns whether the VPN is healthy - pub fn is_healthy(&self) -> bool { - self.is_healthy - } - - /// Resets the task counter - pub fn reset_task_count(&mut self) { - self.tasks_completed = 0; - } -} - - -impl Drop for VpnInstance { - fn drop(&mut self) { - if let Some(mut process) = self.process.take() { - let _ = process.start_kill(); - } - } -} - -/// Manages a pool of VPN instances for rotation -pub struct VpnPool { - instances: Vec>>, - current_index: Arc>, - enable_rotation: bool, - tasks_per_session: usize, -} - -impl VpnPool { - /// Creates a new VPN pool from .ovpn configuration files - /// Automatically ensures sufficient TAP adapters are installed - pub async fn new( - ovpn_dir: &Path, - username: String, - password: String, - enable_rotation: bool, - tasks_per_session: usize, - amount_of_openvpn_servers: usize, - ) -> Result { - // STEP 1: Ensure we have enough TAP adapters (auto-install if needed) - #[cfg(target_os = "windows")] - { - crate::util::logger::log_info("=== TAP Adapter Check ===").await; - let _ = Self::ensure_tap_adapters(amount_of_openvpn_servers).await?; - crate::util::logger::log_info("=== TAP Adapter Check Complete ===").await; - } - - // STEP 2: Continue with normal VPN pool initialization - let mut ovpn_files = Vec::new(); - - // Recursively find all .ovpn files - let mut entries = tokio::fs::read_dir(ovpn_dir).await?; - while let Some(entry) = entries.next_entry().await? { - let path = entry.path(); - - if path.is_dir() { - let mut sub_entries = tokio::fs::read_dir(&path).await?; - while let Some(sub_entry) = sub_entries.next_entry().await? { - let sub_path = sub_entry.path(); - if sub_path.extension().and_then(|s| s.to_str()) == Some("ovpn") - && is_tcp_443_config(&sub_path) { - ovpn_files.push(sub_path); - } - } - } else if path.extension().and_then(|s| s.to_str()) == Some("ovpn") - && is_tcp_443_config(&path) { - ovpn_files.push(path); - } - } - - // Deduplicate configs by server - let mut unique = Vec::new(); - use std::collections::HashSet; - let mut seen: HashSet = HashSet::new(); - for cfg in ovpn_files { - match tokio_fs::read_to_string(&cfg).await { - Ok(contents) => { - let mut server_key = None; - for line in contents.lines() { - let l = line.trim(); - if l.starts_with('#') || l.is_empty() { continue; } - if l.starts_with("remote ") { - let parts: Vec<&str> = l.split_whitespace().collect(); - if parts.len() >= 2 { - let host = parts[1]; - let port = if parts.len() >= 3 { parts[2] } else { "" }; - let key = if port.is_empty() { - host.to_string() - } else { - format!("{}:{}", host, port) - }; - server_key = Some(key); - break; - } - } - } - let key = server_key.unwrap_or_else(|| { - cfg.file_stem() - .and_then(|s| s.to_str()) - .unwrap_or("unknown") - .to_string() - }); - if !seen.contains(&key) { - seen.insert(key); - unique.push(cfg); - } - } - Err(_) => { - unique.push(cfg); - } - } - } - ovpn_files = unique; - - if ovpn_files.is_empty() { - return Err(anyhow!("No .ovpn files found in {:?}", ovpn_dir)); - } - - crate::util::logger::log_info(&format!( - "Found {} OpenVPN configurations", - ovpn_files.len() - )).await; - - // Build shared HTTP client - let client = reqwest::Client::builder() - .timeout(Duration::from_secs(5)) - .pool_max_idle_per_host(8) - .build() - .context("Failed to build HTTP client for IP detection")?; - - // Detect baseline IP - let baseline_ip: Option = match client - .get("https://api.ipify.org?format=json") - .send() - .await - { - Ok(resp) => { - if resp.status().is_success() { - if let Ok(json) = resp.json::().await { - json.get("ip") - .and_then(|v| v.as_str()) - .map(|s| s.to_string()) - } else { None } - } else { None } - } - Err(_) => None, - }; - - if let Some(ref ip) = baseline_ip { - crate::util::logger::log_info(&format!("Baseline IP for pool: {}", ip)).await; - } - - // Create VPN instances - let mut instances = Vec::new(); - for config_path in ovpn_files { - let instance = VpnInstance::new( - config_path, - username.clone(), - password.clone(), - baseline_ip.clone(), - )?; - instances.push(Arc::new(Mutex::new(instance))); - } - - Ok(Self { - instances, - current_index: Arc::new(Mutex::new(0)), - enable_rotation, - tasks_per_session, - }) - } - - /// Connects all VPN instances in parallel (much faster than sequential) - pub async fn connect_all(&self) -> Result<()> { - crate::util::logger::log_info("Connecting all VPN instances in batches...").await; - - let start_time = std::time::Instant::now(); - - // Auto-detect optimal batch size based on available TAP adapters - let batch_size = Self::detect_optimal_batch_size().await; - - let mut connected = 0; - let mut failed = 0; - - // Process instances in batches - for (batch_num, chunk) in self.instances.chunks(batch_size).enumerate() { - crate::util::logger::log_info(&format!( - "Starting batch {}/{} ({} VPNs)...", - batch_num + 1, - (self.instances.len() + batch_size - 1) / batch_size, - chunk.len() - )).await; - - // Spawn parallel tasks for this batch - let mut tasks = Vec::new(); - for (i, instance) in chunk.iter().enumerate() { - let instance_clone = Arc::clone(instance); - let global_index = batch_num * batch_size + i; - - let task = tokio::spawn(async move { - let mut inst = instance_clone.lock().await; - let hostname = inst.hostname().to_string(); - - match inst.connect().await { - Ok(_) => { - let ip = inst.external_ip().unwrap_or("unknown"); - crate::util::logger::log_info(&format!( - "✓ VPN {} ({}) connected with IP: {}", - global_index + 1, hostname, ip - )).await; - Ok(()) - } - Err(e) => { - crate::util::logger::log_warn(&format!( - "⚠ Failed to connect VPN {} ({}): {}", - global_index + 1, hostname, e - )).await; - Err(e) - } - } - }); - tasks.push(task); - } - - // Wait for this batch to complete - let results = futures::future::join_all(tasks).await; - - for result in results { - match result { - Ok(Ok(_)) => connected += 1, - Ok(Err(_)) | Err(_) => failed += 1, - } - } - - // Small delay between batches to let TAP adapters stabilize - if batch_num < (self.instances.len() + batch_size - 1) / batch_size - 1 { - tokio::time::sleep(Duration::from_secs(2)).await; - } - } - - let elapsed = start_time.elapsed(); - - if connected == 0 && failed > 0 { - crate::util::logger::log_error(&format!( - "✗ FATAL: All {} VPN connection attempts failed in {:.1}s. Make sure OpenVPN is installed:\n Windows: https://openvpn.net/community-downloads/\n Linux: sudo apt-get install openvpn\n macOS: brew install openvpn", - failed, elapsed.as_secs_f64() - )).await; - return Err(anyhow!("Failed to connect any VPN instances. OpenVPN may not be installed.")); - } else if failed > 0 { - crate::util::logger::log_warn(&format!( - "⚠ Connected {} VPN instances successfully, {} failed in {:.1}s (will use available instances)", - connected, failed, elapsed.as_secs_f64() - )).await; - } else { - crate::util::logger::log_info(&format!( - "✓ All {} VPN instances connected successfully in {:.1}s", - connected, elapsed.as_secs_f64() - )).await; - } - - Ok(()) - } - - /// Gets the next available VPN instance (round-robin) - pub async fn acquire(&self) -> Result>> { - let mut index = self.current_index.lock().await; - let instance = self.instances[*index % self.instances.len()].clone(); - *index += 1; - Ok(instance) - } - - /// Rotates a VPN instance if rotation is enabled and threshold is met - pub async fn rotate_if_needed(&self, instance: Arc>) -> Result<()> { - if !self.enable_rotation { - return Ok(()); - } - - let mut inst = instance.lock().await; - if inst.increment_task_count(self.tasks_per_session) { - crate::util::logger::log_info(&format!( - "Task threshold reached for VPN {}, rotating...", - inst.hostname() - )).await; - - let old_ip = inst.external_ip().map(|s| s.to_string()); - inst.reconnect().await?; - let new_ip = inst.external_ip().map(|s| s.to_string()); - - match (old_ip, new_ip) { - (Some(old), Some(new)) if old != new => { - crate::util::logger::log_info(&format!( - "✓ VPN {} rotated: {} -> {}", - inst.hostname(), old, new - )).await; - } - (Some(old), Some(new)) if old == new => { - crate::util::logger::log_warn(&format!( - "⚠ VPN {} reconnected but IP unchanged: {}", - inst.hostname(), old - )).await; - } - _ => { - crate::util::logger::log_error(&format!( - "✗ VPN {} rotation verification failed", - inst.hostname() - )).await; - } - } - - inst.reset_task_count(); - } - - Ok(()) - } - - /// Performs health checks on all VPN instances with automatic reconnection - pub async fn health_check_all_with_reconnect(&self) -> Result<()> { - for instance in &self.instances { - let mut inst = instance.lock().await; - let _ = inst.health_check_with_reconnect().await; - } - Ok(()) - } - - /// Returns the number of VPN instances - pub fn len(&self) -> usize { - self.instances.len() - } - - /// Disconnects all VPN instances - pub async fn disconnect_all(&self) -> Result<()> { - crate::util::logger::log_info("Disconnecting all VPN instances...").await; - for instance in &self.instances { - let mut inst = instance.lock().await; - inst.disconnect().await?; - } - Ok(()) - } - - /// Amount of taps adapters currently installed - /// One VPN connection requires one TAP adapter - pub async fn ensure_tap_adapters(amount_of_openvpn_servers: usize) -> Result { - #[cfg(target_os = "windows")] - { - use std::fs; - - crate::util::logger::log_info("Checking TAP adapter availability...").await; - - let tap_count = Self::count_tap_adapters().await?; - - if tap_count >= amount_of_openvpn_servers { - crate::util::logger::log_info(&format!( - "✓ Found {} TAP adapters - sufficient for parallel VPN connections", - tap_count - )).await; - return Ok(tap_count); - } - - crate::util::logger::log_warn(&format!( - "⚠ Only {} TAP adapter(s) found. Installing {} more for optimal performance...", - tap_count, - amount_of_openvpn_servers - tap_count - )).await; - - // Create PowerShell script - let script_content = format!( - r#"# Auto-generated TAP adapter installation script -# Requires Administrator privileges - -$ErrorActionPreference = "Stop" - -# Check if running as Administrator -$currentPrincipal = New-Object Security.Principal.WindowsPrincipal([Security.Principal.WindowsIdentity]::GetCurrent()) -$isAdmin = $currentPrincipal.IsInRole([Security.Principal.WindowsBuiltInRole]::Administrator) - -if (-not $isAdmin) {{ - Write-Host "ERROR: This script must be run as Administrator!" -ForegroundColor Red - Write-Host "Right-click PowerShell and select 'Run as Administrator'" -ForegroundColor Yellow - exit 1 -}} - -Write-Host "Installing additional TAP adapters..." -ForegroundColor Cyan - -$tapctlPath = "C:\Program Files\OpenVPN\bin\tapctl.exe" - -if (-not (Test-Path $tapctlPath)) {{ - Write-Host "ERROR: OpenVPN not found at: $tapctlPath" -ForegroundColor Red - Write-Host "Please install OpenVPN from: https://openvpn.net/community-downloads/" -ForegroundColor Yellow - exit 1 -}} - -$existingCount = {} -$targetCount = 10 - -for ($i = ($existingCount + 1); $i -le $targetCount; $i++) {{ - Write-Host "Creating TAP adapter #$i..." -ForegroundColor Yellow - - try {{ - & $tapctlPath create --name "OpenVPN-TAP-$i" - - if ($LASTEXITCODE -eq 0) {{ - Write-Host " ✓ Created OpenVPN-TAP-$i" -ForegroundColor Green - }} else {{ - Write-Host " ⚠ Failed to create adapter (exit code: $LASTEXITCODE)" -ForegroundColor Red - }} - }} catch {{ - Write-Host " ✗ Error: $_" -ForegroundColor Red - }} - - Start-Sleep -Milliseconds 500 -}} - -Write-Host "`n✓ TAP adapter installation complete!" -ForegroundColor Green -Write-Host "Verifying installation..." -ForegroundColor Cyan - -$finalCount = (Get-NetAdapter | Where-Object {{ $_.InterfaceDescription -like "*TAP*" }}).Count -Write-Host "Total TAP adapters now: $finalCount" -ForegroundColor Cyan - -exit 0 -"#, - tap_count - ); - - let script_path = std::env::current_dir()?.join("install_tap_adapters.ps1"); - - // Write script to file - fs::write(&script_path, script_content) - .context("Failed to create PowerShell script")?; - - crate::util::logger::log_info(&format!( - "Created installation script: {:?}", - script_path - )).await; - - // Execute PowerShell script as Administrator - crate::util::logger::log_info( - "Executing TAP adapter installation (requires Administrator)..." - ).await; - - let output = Command::new("powershell") - .args(&[ - "-ExecutionPolicy", "Bypass", - "-NoProfile", - "-File", script_path.to_str().unwrap() - ]) - .output().await; - - match output { - Ok(result) => { - let stdout = String::from_utf8_lossy(&result.stdout); - let stderr = String::from_utf8_lossy(&result.stderr); - - if !stdout.is_empty() { - crate::util::logger::log_info(&format!("Script output:\n{}", stdout)).await; - } - - if result.status.success() { - crate::util::logger::log_info( - "✓ TAP adapters installed successfully" - ).await; - - // Recount adapters - tokio::time::sleep(Duration::from_secs(2)).await; - let new_count = Self::count_tap_adapters().await?; - - crate::util::logger::log_info(&format!( - "✓ Total TAP adapters now: {}", - new_count - )).await; - - return Ok(new_count); - } else { - let error_msg = if stderr.contains("Administrator") { - "Administrator privileges required. Please run as Administrator." - } else if !stderr.is_empty() { - &stderr - } else { - "Unknown error during installation" - }; - - crate::util::logger::log_error(&format!( - "✗ TAP adapter installation failed: {}", - error_msg - )).await; - - crate::util::logger::log_warn( - "Continuing with existing adapters. VPN connections will be sequential." - ).await; - - return Ok(tap_count); - } - } - Err(e) => { - crate::util::logger::log_error(&format!( - "Failed to execute installation script: {}", - e - )).await; - - crate::util::logger::log_info(&format!( - "You can manually run: {:?}", - script_path - )).await; - - return Ok(tap_count); - } - } - } - - #[cfg(not(target_os = "windows"))] - { - // Non-Windows: TAP adapters not needed - Ok(10) - } - } - - /// Counts existing TAP adapters on the system - async fn count_tap_adapters() -> Result { - #[cfg(target_os = "windows")] - { - let output = Command::new("ipconfig") - .arg("/all") - .output().await - .context("Failed to execute ipconfig")?; - - let output_str = String::from_utf8_lossy(&output.stdout); - - let count = output_str - .lines() - .filter(|line| { - let lower = line.to_lowercase(); - (lower.contains("tap") && lower.contains("adapter")) - || lower.contains("tap-windows") - }) - .count() / 2; // Two lines per adapter - - Ok(count) - } - - #[cfg(not(target_os = "windows"))] - { - Ok(10) // Not applicable on non-Windows - } - } - - async fn detect_optimal_batch_size() -> usize { - #[cfg(target_os = "windows")] - { - use std::process::Command; - - // Try to detect TAP adapters via ipconfig - match Command::new("ipconfig") - .arg("/all") - .output() - { - Ok(output) => { - let output_str = String::from_utf8_lossy(&output.stdout); - - // Count lines containing "TAP" adapter references - // Two lines per tap adapter typically - let tap_count = output_str - .lines() - .filter(|line| { - let lower = line.to_lowercase(); - (lower.contains("tap") && lower.contains("adapter")) - || lower.contains("tap-windows") - }) - .count() / 2; - - if tap_count == 0 { - crate::util::logger::log_warn( - "⚠ No TAP adapters detected! VPN connections will likely fail." - ).await; - return 1; - } else if tap_count == 1 { - crate::util::logger::log_warn(&format!( - "⚠ Only 1 TAP adapter found. VPNs will connect sequentially.\n\ - For parallel connections, install more TAP adapters:\n\ - Run as Admin: cd 'C:\\Program Files\\OpenVPN\\bin' && .\\tapctl.exe create --name 'OpenVPN-TAP-2'" - )).await; - return 1; - } else { - crate::util::logger::log_info(&format!( - "✓ Found {} TAP adapters - enabling parallel VPN connections", - tap_count - )).await; - - // Use 75% of available adapters to leave some headroom - let optimal = ((tap_count as f32 * 0.75).ceil() as usize).max(1); - return optimal.min(6); // Cap at 6 for stability - } - } - Err(e) => { - crate::util::logger::log_warn(&format!( - "⚠ Failed to detect TAP adapters: {}. Using sequential connection.", - e - )).await; - return 1; - } - } - } - - #[cfg(not(target_os = "windows"))] - { - // Linux/macOS: TAP adapters aren't a limitation - // Can run more in parallel - return 4; - } - } -} - -fn is_tcp_443_config(path: &Path) -> bool { - // Get filename as string - let filename = match path.file_name().and_then(|n| n.to_str()) { - Some(name) => name.to_lowercase(), - None => return false, - }; - - // Check if contains both "tcp" and "443" - filename.contains("tcp") && filename.contains("443") -} \ No newline at end of file diff --git a/src/scraper/vpn_rotation_system.md b/src/scraper/vpn_rotation_system.md deleted file mode 100644 index 1173e29..0000000 --- a/src/scraper/vpn_rotation_system.md +++ /dev/null @@ -1,397 +0,0 @@ -# VPN Rotation System - Setup Checklist - -## 🚀 Quick Setup (5 Minutes) - -Follow these steps to get your VPN rotation system up and running: - -### ✅ Step 1: Install OpenVPN - -**Windows:** -```powershell -# Download installer -# https://openvpn.net/community-downloads/ - -# Install to default location -# Add to PATH: C:\Program Files\OpenVPN\bin - -# Verify installation -openvpn --version -``` - -**Linux (Ubuntu/Debian):** -```bash -sudo apt-get update -sudo apt-get install openvpn -openvpn --version -``` - -**macOS:** -```bash -brew install openvpn -openvpn --version -``` - -### ✅ Step 2: Install ForceBindIP (Windows Only) - -```powershell -# Download from: http://r1ch.net/projects/forcebindip - -# Extract ForceBindIP.exe and place in one of: -# Option 1: Project root -.\ForceBindIP.exe - -# Option 2: Tools directory -.\tools\ForceBindIP.exe - -# Option 3: Add to PATH -C:\Program Files\ForceBindIP\ForceBindIP.exe - -# Verify installation -ForceBindIP.exe -``` - -**Linux/macOS Users:** -- ForceBindIP is Windows-only -- Use network namespaces (Linux) or alternative routing -- See documentation for workarounds - -### ✅ Step 3: Update Cargo.toml - -Add these dependencies if not already present: - -```toml -[dependencies] -anyhow = "1.0" -tokio = { version = "1.0", features = ["full"] } -fantoccini = "0.19" -reqwest = { version = "0.11", features = ["blocking"] } -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" -chrono = "0.4" -once_cell = "1.19" -dotenvy = "0.15" -url = "2.5" -zip = "0.6" -``` - -### ✅ Step 4: Configure Environment - -Create or update `.env` file in project root: - -```bash -# Required: Date ranges -ECONOMIC_START_DATE=2007-02-13 -CORPORATE_START_DATE=2010-01-01 -ECONOMIC_LOOKAHEAD_MONTHS=3 - -# Required: Parallelism -MAX_PARALLEL_INSTANCES=5 -MAX_TASKS_PER_INSTANCE=0 - -# VPN Configuration -ENABLE_VPN_ROTATION=true -TASKS_PER_VPN_SESSION=50 -``` - -**Configuration Presets:** - -**Conservative (Recommended for first run):** -```bash -MAX_PARALLEL_INSTANCES=3 -TASKS_PER_VPN_SESSION=100 -``` - -**Balanced:** -```bash -MAX_PARALLEL_INSTANCES=5 -TASKS_PER_VPN_SESSION=50 -``` - -**Aggressive (Use with caution):** -```bash -MAX_PARALLEL_INSTANCES=10 -TASKS_PER_VPN_SESSION=25 -``` - -### ✅ Step 5: Add VPN Module Files - -Copy these files to your project: - -``` -src/ -├── scraper/ -│ ├── mod.rs (update with: pub mod vpn_manager; pub mod forcebindip;) -│ ├── vpn_manager.rs (new file - from artifact) -│ ├── forcebindip.rs (new file - from artifact) -│ └── webdriver.rs (replace with VPN-enabled version) -├── util/ -│ ├── mod.rs (already includes opnv) -│ ├── opnv.rs (already present) -│ └── ... -├── main.rs (replace with VPN-enabled version) -└── lib.rs (update to expose VPN modules) -``` - -### ✅ Step 6: Verify Directory Structure - -Ensure these directories exist (will be auto-created): - -``` -project/ -├── cache/ -│ ├── openvpn/ (VPN configs stored here) -│ └── temp_vpn_zips/ (temporary, auto-cleaned) -├── logs/ (application logs) -├── data/ -│ ├── economic/ -│ └── corporate/ -└── chromedriver-win64/ - └── chromedriver.exe -``` - -### ✅ Step 7: Test Installation - -**Test 1: OpenVPN** -```bash -openvpn --version -# Should output version info -``` - -**Test 2: ForceBindIP (Windows)** -```powershell -ForceBindIP.exe 127.0.0.1 cmd.exe /c echo test -# Should output: test -``` - -**Test 3: Build Project** -```bash -cargo build --release -# Should compile without errors -``` - -**Test 4: Dry Run (No VPN)** -```bash -# Temporarily disable VPN -# Set in .env: ENABLE_VPN_ROTATION=false - -cargo run --release -# Should initialize ChromeDriver pool and run -``` - -### ✅ Step 8: First VPN-Enabled Run - -```bash -# Enable VPN in .env -ENABLE_VPN_ROTATION=true -TASKS_PER_VPN_SESSION=0 # Start with phase rotation only - -# Run application -cargo run --release - -# Watch logs -tail -f logs/backtest_*.log -``` - -**Expected Output:** -``` -[HH:MM:SS] [INFO] === Application started === -[HH:MM:SS] [INFO] === VPN Rotation Enabled === -[HH:MM:SS] [INFO] --- Fetching latest VPNBook OpenVPN configurations --- -[HH:MM:SS] [INFO] ✓ Fetched VPN credentials - Username: vpnbook -[HH:MM:SS] [INFO] ✓ Downloaded 6 .ovpn configuration files -[HH:MM:SS] [INFO] --- Initializing VPN Pool --- -[HH:MM:SS] [INFO] Found 6 OpenVPN configurations -[HH:MM:SS] [INFO] --- Connecting to VPN servers --- -[HH:MM:SS] [INFO] Starting VPN connection for ca149.vpnbook.com -[HH:MM:SS] [INFO] ✓ VPN ca149.vpnbook.com connected with IP: 142.4.217.133 -... -[HH:MM:SS] [INFO] ✓ ChromeDriver pool initialized successfully -``` - -## 🎯 Common Issues and Solutions - -### Issue: "openvpn: command not found" -```bash -# Windows: Add to PATH -setx PATH "%PATH%;C:\Program Files\OpenVPN\bin" - -# Linux: Install package -sudo apt-get install openvpn - -# Verify -which openvpn -``` - -### Issue: "ForceBindIP.exe not found" -```powershell -# Place in project root -curl -o ForceBindIP.exe http://r1ch.net/projects/forcebindip/ForceBindIP.exe - -# Or add to PATH -setx PATH "%PATH%;C:\path\to\ForceBindIP" -``` - -### Issue: VPN Connection Timeout -```bash -# Try different config file -# VPNBook offers multiple servers/protocols -# Look in cache/openvpn/ after first fetch - -# Files named like: -# - vpnbook-ca149-tcp80.ovpn (TCP port 80 - most compatible) -# - vpnbook-ca149-tcp443.ovpn (TCP port 443 - works through most firewalls) -# - vpnbook-ca149-udp53.ovpn (UDP port 53 - faster but may be blocked) - -# Check firewall settings -# - Allow OpenVPN.exe through Windows Firewall -# - Allow outbound connections on ports 80, 443, 53, 1194 -``` - -### Issue: "Failed to spawn chromedriver" -```bash -# Verify chromedriver path -ls chromedriver-win64/chromedriver.exe - -# Check Chrome/ChromeDriver version match -chromedriver.exe --version -# Chrome version should be compatible - -# Update ChromeDriver if needed -# Download from: https://chromedriver.chromium.org/ -``` - -### Issue: "Semaphore closed" -```bash -# Reduce parallelism in .env -MAX_PARALLEL_INSTANCES=3 - -# Or increase system resources -# Check Task Manager / Activity Monitor -``` - -## 📊 Performance Tuning - -### Optimize for Speed -```bash -MAX_PARALLEL_INSTANCES=10 -TASKS_PER_VPN_SESSION=100 -# More instances, less frequent rotation -# Risk: More aggressive, may hit rate limits -``` - -### Optimize for Stealth -```bash -MAX_PARALLEL_INSTANCES=2 -TASKS_PER_VPN_SESSION=10 -# Fewer instances, frequent rotation -# Risk: Slower, but more IP diversity -``` - -### Optimize for Stability -```bash -MAX_PARALLEL_INSTANCES=5 -TASKS_PER_VPN_SESSION=50 -# Balanced approach (recommended) -``` - -## 🔍 Monitoring and Logs - -### Key Log Files -``` -logs/ -└── backtest_YYYYMMDD_HHMMSS.log -``` - -### Important Log Patterns - -**Successful VPN Connection:** -``` -[INFO] ✓ VPN ca149.vpnbook.com connected with IP: 142.4.217.133 -``` - -**VPN Rotation:** -``` -[INFO] ✓ VPN ca149.vpnbook.com rotated: 142.4.217.133 -> 142.4.217.201 -``` - -**Health Issues:** -``` -[WARN] ⚠ Health check failed for VPN us1.vpnbook.com -[INFO] Attempting to reconnect unhealthy VPN: us1.vpnbook.com -``` - -**Binding ChromeDriver:** -``` -[INFO] Binding ChromeDriver to VPN IP: 142.4.217.133 -``` - -### Monitor Real-Time -```bash -# Linux/macOS -tail -f logs/backtest_*.log - -# Windows PowerShell -Get-Content logs\backtest_*.log -Wait -Tail 50 -``` - -### Search Logs -```bash -# Count successful connections -grep "connected with IP" logs/*.log | wc -l - -# Find errors -grep ERROR logs/*.log - -# Track rotations -grep "rotated:" logs/*.log - -# Find failed tasks -grep "failed" logs/*.log -``` - -## 🚦 Next Steps - -1. **✅ Complete Setup**: Verify all checkboxes above -2. **🧪 Test Run**: Run with `TASKS_PER_VPN_SESSION=0` first -3. **📊 Monitor**: Watch logs during first run -4. **⚙️ Tune**: Adjust configuration based on results -5. **🔄 Iterate**: Increase parallelism gradually -6. **📈 Scale**: Once stable, increase to production levels - -## 📚 Additional Resources - -- **VPNBook Website**: https://www.vpnbook.com/freevpn -- **OpenVPN Docs**: https://openvpn.net/community-resources/ -- **ForceBindIP**: http://r1ch.net/projects/forcebindip -- **ChromeDriver**: https://chromedriver.chromium.org/ - -## 🆘 Getting Help - -If you encounter issues: - -1. **Check Prerequisites**: Verify all software is installed -2. **Review Logs**: Look in `logs/` directory -3. **Test Components**: Test OpenVPN and ForceBindIP independently -4. **Simplify**: Start with `ENABLE_VPN_ROTATION=false` -5. **Document Error**: Note exact error message and context - -## 🎉 Success Criteria - -You're ready to proceed when you see: - -``` -✓ OpenVPN installed and in PATH -✓ ForceBindIP.exe accessible (Windows) -✓ Project compiles successfully -✓ VPN configurations fetched -✓ All VPNs connected -✓ ChromeDriver pool initialized -✓ First scraping task completed -``` - -**Congratulations! Your VPN rotation system is now operational.** - ---- - -*Last Updated: December 2024* -*Version: 1.0* \ No newline at end of file diff --git a/src/scraper/webdriver.rs b/src/scraper/webdriver.rs index f0e7a38..4f67bce 100644 --- a/src/scraper/webdriver.rs +++ b/src/scraper/webdriver.rs @@ -11,426 +11,258 @@ use tokio::process::{Child, Command}; use tokio::task::JoinHandle; use tokio::sync::{Mutex, Semaphore}; use tokio::time::{sleep, timeout, Duration}; - -use super::vpn_manager::{VpnInstance, VpnPool}; - -#[cfg(target_os = "windows")] -use super::forcebindip::ForceBindIpManager; +use crate::scraper::docker_vpn_proxy::{DockerVpnProxyPool}; /// Manages a pool of ChromeDriver instances for parallel scraping with optional VPN binding. pub struct ChromeDriverPool { instances: Vec>>, semaphore: Arc, - vpn_pool: Option>, - #[cfg(target_os = "windows")] - forcebindip: Option>, + /// Optional Docker-based proxy pool (one proxy per Chrome instance) + proxy_pool: Option>, } impl ChromeDriverPool { - /// Creates a new pool with the specified number of ChromeDriver instances (no VPN). +/// Creates a new pool without any proxy (direct connection). pub async fn new(pool_size: usize) -> Result { - Self::new_with_vpn_and_task_limit(pool_size, None, 0).await + Self::new_with_proxy_and_task_limit(pool_size, None, 0).await } - /// Creates a new ChromeDriver pool with task-per-instance tracking. - pub async fn new_with_task_limit( + /// Creates a new pool with task-per-instance limit but no proxy. + pub async fn new_with_task_limit(pool_size: usize, max_tasks_per_instance: usize) -> Result { + Self::new_with_proxy_and_task_limit(pool_size, None, max_tasks_per_instance).await + } + + /// Creates a new pool where each Chrome instance uses a different SOCKS5 proxy from the Docker pool. + pub async fn new_with_proxy( pool_size: usize, - max_tasks_per_instance: usize, + proxy_pool: Option>, ) -> Result { - Self::new_with_vpn_and_task_limit(pool_size, None, max_tasks_per_instance).await + Self::new_with_proxy_and_task_limit(pool_size, proxy_pool, 0).await } - /// Creates a new pool with VPN support. - pub async fn new_with_vpn( + /// Full constructor: supports proxy + task limiting. + pub async fn new_with_proxy_and_task_limit( pool_size: usize, - vpn_pool: Option>, - ) -> Result { - Self::new_with_vpn_and_task_limit(pool_size, vpn_pool, 0).await - } - - /// Creates a new pool with VPN support and task-per-instance limits. - pub async fn new_with_vpn_and_task_limit( - pool_size: usize, - vpn_pool: Option>, + proxy_pool: Option>, max_tasks_per_instance: usize, ) -> Result { let mut instances = Vec::with_capacity(pool_size); - #[cfg(target_os = "windows")] - let forcebindip = if vpn_pool.is_some() { - match ForceBindIpManager::new() { - Ok(manager) => { - crate::util::logger::log_info("✓ ForceBindIP manager initialized").await; - Some(Arc::new(manager)) - } - Err(e) => { - crate::util::logger::log_warn(&format!( - "⚠ ForceBindIP not available: {}. Proceeding without IP binding.", - e - )).await; - None - } - } - } else { - None - }; - crate::util::logger::log_info(&format!( - "Initializing ChromeDriver pool with {} instances{}{}...", + "Initializing ChromeDriver pool with {} instances{}...", pool_size, - if vpn_pool.is_some() { " (VPN-enabled)" } else { "" }, - if max_tasks_per_instance > 0 { &format!(" (max {} tasks/instance)", max_tasks_per_instance) } else { "" } - )).await; + if proxy_pool.is_some() { " (each using a unique Docker SOCKS5 proxy)" } else { "" } + )) + .await; for i in 0..pool_size { - // If VPN pool exists, acquire a VPN instance for this ChromeDriver - let vpn_instance = if let Some(ref vp) = vpn_pool { - Some(vp.acquire().await?) - } else { - None - }; + let proxy_url = proxy_pool + .as_ref() + .map(|pp| pp.get_proxy_url(i)); - #[cfg(target_os = "windows")] - let instance = ChromeInstance::new_with_task_limit(vpn_instance, forcebindip.clone(), max_tasks_per_instance).await?; - - #[cfg(not(target_os = "windows"))] - let instance = ChromeInstance::new_with_task_limit(vpn_instance, max_tasks_per_instance).await?; + let instance = ChromeInstance::new(proxy_url, max_tasks_per_instance).await?; - crate::util::logger::log_info(&format!(" ✓ Instance {} ready", i + 1)).await; + crate::util::logger::log_info(&format!(" Instance {} ready", i + 1)).await; instances.push(Arc::new(Mutex::new(instance))); } Ok(Self { instances, semaphore: Arc::new(Semaphore::new(pool_size)), - vpn_pool, - #[cfg(target_os = "windows")] - forcebindip, + proxy_pool, }) } - /// Executes a scrape task using an available instance from the pool. + /// Execute a scraping task using an available instance from the pool. pub async fn execute(&self, url: String, parse: F) -> Result where T: Send + 'static, F: FnOnce(Client) -> Fut + Send + 'static, - Fut: std::future::Future> + Send + 'static, + Fut: std::future::Future> + Send, { - // Acquire semaphore permit - let _permit = self - .semaphore - .acquire() - .await - .map_err(|_| anyhow!("Semaphore closed"))?; + let _permit = self.semaphore.acquire().await.map_err(|_| anyhow!("Pool closed"))?; - // Find an available instance (round-robin or first available) - let instance = self.instances[0].clone(); + // Round-robin selection + let index = rand::random_range(..self.instances.len()); + let instance = self.instances[index].clone(); let mut guard = instance.lock().await; - // Track task count guard.increment_task_count(); - // Get VPN info before creating session - let vpn_info = if let Some(ref vpn) = guard.vpn_instance { - let vpn_guard = vpn.lock().await; - Some(format!("{} ({})", - vpn_guard.hostname(), - vpn_guard.external_ip().unwrap_or("unknown"))) - } else { - None - }; - - // Log task count if limit is set if guard.max_tasks_per_instance > 0 { crate::util::logger::log_info(&format!( "Instance task count: {}/{}", guard.get_task_count(), guard.max_tasks_per_instance - )).await; + )) + .await; } - // Create a new session for this task let client = guard.new_session().await?; - // Release lock while we do the actual scraping - drop(guard); + drop(guard); // release lock early - // Navigate and parse - if let Some(ref info) = vpn_info { - crate::util::logger::log_info(&format!("Scraping {} via VPN: {}", url, info)).await; - } + crate::util::logger::log_info(&format!("Scraping {} ...", url)).await; + client.goto(&url).await.context("Navigation failed")?; - client.goto(&url).await.context("Failed to navigate")?; - let result = timeout(Duration::from_secs(60), parse(client)) + let result = timeout(Duration::from_secs(90), parse(client)) .await - .context("Parse function timed out after 60s")??; - - // Handle VPN rotation if needed - if let Some(ref vpn_pool) = self.vpn_pool { - let mut guard = instance.lock().await; - if let Some(ref vpn) = guard.vpn_instance { - vpn_pool.rotate_if_needed(vpn.clone()).await?; - guard.reset_task_count(); // Reset task count on VPN rotation - } - } + .context("Parse timeout")??; Ok(result) } + /// Gracefully shut down all ChromeDriver processes and Docker proxy containers. + pub async fn shutdown(&self) -> Result<()> { + for inst in &self.instances { + let mut guard = inst.lock().await; + guard.shutdown().await?; + } + + if let Some(pp) = &self.proxy_pool { + pp.shutdown().await?; + crate::util::logger::log_info("All Docker VPN proxy containers stopped").await; + } + + Ok(()) + } + pub fn get_number_of_instances(&self) -> usize { self.instances.len() } - - /// Returns whether VPN is enabled for this pool - pub fn is_vpn_enabled(&self) -> bool { - self.vpn_pool.is_some() - } - - /// Gracefully shutdown all ChromeDriver instances in the pool. - pub async fn shutdown(&self) -> Result<()> { - crate::util::logger::log_info("Shutting down ChromeDriverPool instances...").await; - for inst in &self.instances { - crate::util::logger::log_info("Shutting down a ChromeDriver instance...").await; - let mut guard = inst.lock().await; - if let Err(e) = guard.shutdown().await { - crate::util::logger::log_warn(&format!("Error shutting down instance: {}", e)).await; - } - } - crate::util::logger::log_info("All ChromeDriver instances shut down").await; - Ok(()) - } } /// Represents a single instance of chromedriver process, optionally bound to a VPN. pub struct ChromeInstance { - process: Child, base_url: String, - vpn_instance: Option>>, + process: Child, + stderr_log: Option>, task_count: usize, max_tasks_per_instance: usize, - // Optional join handle for background stderr logging task - stderr_log: Option>, + proxy_url: Option, } impl ChromeInstance { - /// Creates a new ChromeInstance, optionally bound to a VPN IP. - #[cfg(target_os = "windows")] - pub async fn new( - vpn_instance: Option>>, - forcebindip: Option>, - ) -> Result { - Self::new_with_task_limit(vpn_instance, forcebindip, 0).await - } - - /// Creates a new ChromeInstance with task-per-instance limit, bound to a VPN IP if provided. - #[cfg(target_os = "windows")] - pub async fn new_with_task_limit( - vpn_instance: Option>>, - forcebindip: Option>, - max_tasks_per_instance: usize, - ) -> Result { - let bind_ip = if let Some(ref vpn) = vpn_instance { - let vpn_guard = vpn.lock().await; - vpn_guard.external_ip().map(|s| s.to_string()) - } else { - None - }; - - let mut command = if let (Some(ip), Some(fb)) = (&bind_ip, &forcebindip) { - // Use ForceBindIP to bind ChromeDriver to specific VPN IP - crate::util::logger::log_info(&format!("Binding ChromeDriver to VPN IP: {}", ip)).await; - let mut std_cmd = fb.create_bound_command( - ip, - std::path::Path::new("chromedriver-win64/chromedriver.exe"), - &["--port=0"], - ); - Command::from(std_cmd) - } else { - let mut cmd = Command::new("chromedriver-win64/chromedriver.exe"); - cmd.arg("--port=0"); - cmd - }; - - command.stdout(Stdio::piped()).stderr(Stdio::piped()); - - let mut process = command - .spawn() - .context("Failed to spawn chromedriver. Ensure it's installed and in PATH.")?; - - let (base_url, stderr_handle) = Self::wait_for_chromedriver_start(&mut process).await?; + pub async fn new(proxy_url: Option, max_tasks_per_instance: usize) -> Result { + let (base_url, process, stderr_handle) = Self::spawn_chromedriver().await?; Ok(Self { - process, base_url, - vpn_instance, + process, + stderr_log: Some(stderr_handle), task_count: 0, max_tasks_per_instance, - stderr_log: stderr_handle, + proxy_url, }) } - /// Creates a new ChromeInstance on non-Windows platforms (no ForceBindIP support). - #[cfg(not(target_os = "windows"))] - pub async fn new(vpn_instance: Option>>) -> Result { - Self::new_with_task_limit(vpn_instance, 0).await - } - - /// Creates a new ChromeInstance on non-Windows platforms with task-per-instance limit. - #[cfg(not(target_os = "windows"))] - pub async fn new_with_task_limit(vpn_instance: Option>>, max_tasks_per_instance: usize) -> Result { - if vpn_instance.is_some() { - crate::util::logger::log_warn( - "⚠ VPN binding requested but ForceBindIP is not available on this platform" - ).await; - } - - let mut command = Command::new("chromedriver"); - command - .arg("--port=0") - .stdout(Stdio::piped()) - .stderr(Stdio::piped()); - - let mut process = command - .spawn() - .context("Failed to spawn chromedriver. Ensure it's installed and in PATH.")?; - - let (base_url, stderr_handle) = Self::wait_for_chromedriver_start(&mut process).await?; - - Ok(Self { - process, - base_url, - vpn_instance, - task_count: 0, - max_tasks_per_instance, - stderr_log: stderr_handle, - }) - } - - /// Waits for ChromeDriver to start and extracts the listening address. - async fn wait_for_chromedriver_start(process: &mut Child) -> Result<(String, Option>)> { - let mut stdout = - BufReader::new(process.stdout.take().context("Failed to capture stdout")?).lines(); - - let stderr_reader = process.stderr.take().context("Failed to capture stderr")?; - - let start_time = std::time::Instant::now(); - let mut address: Option = None; - let mut success = false; - - // Log stderr in background for debugging and return the JoinHandle so we can - // abort/await it during shutdown. - let stderr_handle: JoinHandle<()> = tokio::spawn(async move { - let mut stderr_lines = BufReader::new(stderr_reader).lines(); - while let Ok(Some(line)) = stderr_lines.next_line().await { - let trimmed = line.trim(); - if !trimmed.is_empty() { - crate::util::logger::log_info(&format!("ChromeDriver stderr: {}", trimmed)).await; - } - } - }); - - // Wait for address and success (up to 30s) - while start_time.elapsed() < Duration::from_secs(30) { - if let Ok(Ok(Some(line))) = timeout(Duration::from_secs(1), stdout.next_line()).await { - if let Some(addr) = parse_chromedriver_address(&line) { - address = Some(addr.to_string()); - } - - if line.contains("ChromeDriver was started successfully") { - success = true; - } - - if let (Some(addr), true) = (&address, success) { - return Ok((addr.clone(), Some(stderr_handle))); - } - } - - sleep(Duration::from_millis(100)).await; - } - - // Cleanup on failure - let _ = process.kill().await; - // If we timed out, abort stderr logging task - stderr_handle.abort(); - let _ = stderr_handle.await; - Err(anyhow!("Timeout: ChromeDriver did not start within 30 seconds")) - } - - /// Creates a new browser session (client) from this ChromeDriver instance. pub async fn new_session(&self) -> Result { ClientBuilder::native() - .capabilities(Self::chrome_args()) + .capabilities(self.chrome_args()) .connect(&self.base_url) .await - .context("Failed to create new session") + .context("Failed to connect to ChromeDriver") } - /// Increments task counter and returns whether limit has been reached - pub fn increment_task_count(&mut self) -> bool { - if self.max_tasks_per_instance > 0 { - self.task_count += 1; - self.task_count >= self.max_tasks_per_instance - } else { - false - } + pub fn increment_task_count(&mut self) { + self.task_count += 1; } - /// Resets task counter (called when VPN is rotated) - pub fn reset_task_count(&mut self) { - self.task_count = 0; - } - - /// Returns current task count for this instance pub fn get_task_count(&self) -> usize { self.task_count } - /// Gracefully shutdown the chromedriver process and background log tasks. pub async fn shutdown(&mut self) -> Result<()> { - // Abort and await stderr logging task if present if let Some(handle) = self.stderr_log.take() { handle.abort(); let _ = handle.await; } - // Try to terminate the child process let _ = self.process.start_kill(); - // Await the process to ensure resources are released let _ = self.process.wait().await; - Ok(()) } - fn chrome_args() -> Map { - let args = serde_json::json!({ + /// Spawns the actual `chromedriver` binary and waits for it to become ready. + async fn spawn_chromedriver() -> Result<(String, Child, JoinHandle<()>)> { + let mut process = Command::new("chromedriver-win64/chromedriver.exe") + .arg("--port=0") // let OS choose free port + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .context("Failed to start chromedriver. Is it in PATH?")?; + + let stdout = process.stdout.take().unwrap(); + let stderr = process.stderr.take().unwrap(); + + let stdout_reader = BufReader::new(stdout); + let mut stdout_lines = stdout_reader.lines(); + + let stderr_reader = BufReader::new(stderr); + let stderr_handle = tokio::spawn(async move { + let mut lines = stderr_reader.lines(); + while let Ok(Some(line)) = lines.next_line().await { + let t = line.trim(); + if !t.is_empty() { + let _ = crate::util::logger::log_info(&format!("ChromeDriver: {}", t)).await; + } + } + }); + + let start = tokio::time::Instant::now(); + let mut address: Option = None; + + while start.elapsed() < Duration::from_secs(30) { + if let Ok(Ok(Some(line))) = timeout(Duration::from_secs(1), stdout_lines.next_line()).await { + if let Some(addr) = parse_chromedriver_address(&line) { + address = Some(addr); + } + if line.contains("ChromeDriver was started successfully") && address.is_some() { + return Ok((address.unwrap(), process, stderr_handle)); + } + } + sleep(Duration::from_millis(100)).await; + } + + let _ = process.kill().await; + stderr_handle.abort(); + Err(anyhow!("ChromeDriver failed to start within 30s")) + } + + fn chrome_args(&self) -> Map { + let mut args = vec![ + "--headless=new".to_string(), + "--disable-gpu".to_string(), + "--no-sandbox".to_string(), + "--disable-dev-shm-usage".to_string(), + "--disable-infobars".to_string(), + "--disable-extensions".to_string(), + "--disable-popup-blocking".to_string(), + "--disable-notifications".to_string(), + "--disable-logging".to_string(), + "--disable-autofill".to_string(), + "--disable-sync".to_string(), + "--disable-default-apps".to_string(), + "--disable-translate".to_string(), + "--window-size=1920,1080".to_string(), + "--disable-blink-features=AutomationControlled".to_string(), + "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36".to_string() + ]; + if let Some(ref proxy) = self.proxy_url { + let proxy = proxy.clone(); + let proxy_formatted = format!("--proxy-server={}", proxy); + args.push(proxy_formatted); + } + let caps = serde_json::json!({ "goog:chromeOptions": { - "args": [ - "--headless", - "--disable-gpu", - "--no-sandbox", - "--disable-dev-shm-usage", - "--disable-infobars", - "--disable-extensions", - "--disable-popup-blocking", - "--disable-notifications", - "--disable-logging", - "--disable-autofill", - "--disable-sync", - "--disable-default-apps", - "--disable-translate", - "--window-size=1920,1080", - "--disable-blink-features=AutomationControlled", - "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" - ], + "args": args, "excludeSwitches": ["enable-logging", "enable-automation"], "prefs": { "profile.default_content_setting_values.notifications": 2 } } }); - args.as_object() - .expect("Capabilities should be a JSON object") - .clone() + caps.as_object().cloned().unwrap() } }