diff --git a/Cargo.lock b/Cargo.lock index 7933033..c5e75cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -684,6 +684,7 @@ dependencies = [ "tracing", "tracing-subscriber", "url", + "windows-service", "yfinance-rs", "zip", ] @@ -3524,6 +3525,12 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "widestring" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72069c3113ab32ab29e5584db3c6ec55d416895e60715417b5b883a357c3e471" + [[package]] name = "windows-core" version = "0.62.2" @@ -3585,6 +3592,17 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-service" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "193cae8e647981c35bc947fdd57ba7928b1fa0d4a79305f6dd2dc55221ac35ac" +dependencies = [ + "bitflags", + "widestring", + "windows-sys 0.59.0", +] + [[package]] name = "windows-strings" version = "0.5.1" @@ -3603,6 +3621,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.60.2" diff --git a/Cargo.toml b/Cargo.toml index b9359ba..493d7ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "event_backtest_engine" version = "0.1.0" -edition = "2021" +edition = "2024" authors = ["Your Name "] description = "High-impact economic & corporate earnings data collector for short-event backtesting (overnight/weekend gaps)" license = "MIT OR Apache-2.0" @@ -30,9 +30,12 @@ csv = "1.3" zip = "6.0.0" flate2 = "1.1.5" -# +# Formatting regex = "1.12.2" +# Windows features +windows-service = "0.8.0" + # Generating rand = "0.9.2" diff --git a/check.txt b/check.txt new file mode 100644 index 0000000..7b36f52 Binary files /dev/null and b/check.txt differ diff --git a/event_backtest_engine.exe b/event_backtest_engine.exe new file mode 100644 index 0000000..1dedcb7 Binary files /dev/null and b/event_backtest_engine.exe differ diff --git a/install_tap_adapters.ps1 b/install_tap_adapters.ps1 new file mode 100644 index 0000000..629eb4d --- /dev/null +++ b/install_tap_adapters.ps1 @@ -0,0 +1,53 @@ +# Auto-generated TAP adapter installation script +# Requires Administrator privileges + +$ErrorActionPreference = "Stop" + +# Check if running as Administrator +$currentPrincipal = New-Object Security.Principal.WindowsPrincipal([Security.Principal.WindowsIdentity]::GetCurrent()) +$isAdmin = $currentPrincipal.IsInRole([Security.Principal.WindowsBuiltInRole]::Administrator) + +if (-not $isAdmin) { + Write-Host "ERROR: This script must be run as Administrator!" -ForegroundColor Red + Write-Host "Right-click PowerShell and select 'Run as Administrator'" -ForegroundColor Yellow + exit 1 +} + +Write-Host "Installing additional TAP adapters..." -ForegroundColor Cyan + +$tapctlPath = "C:\Program Files\OpenVPN\bin\tapctl.exe" + +if (-not (Test-Path $tapctlPath)) { + Write-Host "ERROR: OpenVPN not found at: $tapctlPath" -ForegroundColor Red + Write-Host "Please install OpenVPN from: https://openvpn.net/community-downloads/" -ForegroundColor Yellow + exit 1 +} + +$existingCount = 10 +$targetCount = 10 + +for ($i = ($existingCount + 1); $i -le $targetCount; $i++) { + Write-Host "Creating TAP adapter #$i..." -ForegroundColor Yellow + + try { + & $tapctlPath create --name "OpenVPN-TAP-$i" + + if ($LASTEXITCODE -eq 0) { + Write-Host " ✓ Created OpenVPN-TAP-$i" -ForegroundColor Green + } else { + Write-Host " ⚠ Failed to create adapter (exit code: $LASTEXITCODE)" -ForegroundColor Red + } + } catch { + Write-Host " ✗ Error: $_" -ForegroundColor Red + } + + Start-Sleep -Milliseconds 500 +} + +Write-Host "`n✓ TAP adapter installation complete!" -ForegroundColor Green +Write-Host "Verifying installation..." -ForegroundColor Cyan + +$finalCount = (Get-NetAdapter | Where-Object { $_.InterfaceDescription -like "*TAP*" }).Count +Write-Host "Total TAP adapters now: $finalCount" -ForegroundColor Cyan + +exit 0 diff --git a/src/config.rs b/src/config.rs index c681eec..16d3639 100644 --- a/src/config.rs +++ b/src/config.rs @@ -22,12 +22,6 @@ pub struct Config { #[serde(default)] pub enable_vpn_rotation: bool, - /// Comma-separated list of VPN servers/country codes to rotate through. - /// Example: "US-Free#1,UK-Free#1,JP-Free#1" or "US,JP,DE" - /// If empty, VPN rotation is disabled. - #[serde(default)] - pub vpn_servers: String, - /// Number of tasks per session before rotating VPN /// If set to 0, rotates VPN between economic and corporate phases #[serde(default = "default_tasks_per_session")] @@ -51,7 +45,6 @@ impl Default for Config { max_parallel_instances: default_max_parallel_instances(), max_tasks_per_instance: 0, enable_vpn_rotation: false, - vpn_servers: String::new(), tasks_per_vpn_session: default_tasks_per_session(), } } @@ -100,9 +93,6 @@ impl Config { .parse::() .context("Failed to parse ENABLE_VPN_ROTATION as bool")?; - let vpn_servers = dotenvy::var("VPN_SERVERS") - .unwrap_or_else(|_| String::new()); - let tasks_per_vpn_session: usize = dotenvy::var("TASKS_PER_VPN_SESSION") .unwrap_or_else(|_| "0".to_string()) .parse() @@ -115,7 +105,6 @@ impl Config { max_parallel_instances, max_tasks_per_instance, enable_vpn_rotation, - vpn_servers, tasks_per_vpn_session, }) } diff --git a/src/lib.rs b/src/lib.rs index 30f8f0f..0553bbd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,4 +5,15 @@ pub mod config; pub mod scraper; -pub mod util; \ No newline at end of file +pub mod util; + +// Re-export commonly used types for convenience +pub use config::Config; +pub use scraper::webdriver::{ChromeDriverPool, ChromeInstance, ScrapeTask}; +pub use scraper::vpn_manager::{VpnInstance, VpnPool}; +pub use util::directories::DataPaths; +pub use util::logger; +pub use util::opnv; + +#[cfg(target_os = "windows")] +pub use scraper::forcebindip::ForceBindIpManager; \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 5d135e4..4a03e4a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,23 +8,28 @@ mod scraper; use anyhow::Result; use config::Config; use scraper::webdriver::ChromeDriverPool; +use scraper::vpn_manager::VpnPool; use util::directories::DataPaths; use util::{logger, opnv}; use std::sync::Arc; /// The entry point of the application. /// -/// This function loads the configuration, initializes a shared ChromeDriver pool, -/// fetches the latest VPNBook OpenVPN configurations if VPN rotation is enabled, +/// This function loads the configuration, optionally initializes a VPN pool, +/// initializes a shared ChromeDriver pool bound to the VPN pool (if enabled), /// and sequentially runs the full updates for corporate and economic data. -/// Sequential execution helps prevent resource exhaustion from concurrent -/// chromedriver instances and avoids spamming the target websites with too many requests. +/// +/// If VPN rotation is enabled: +/// 1. Fetches latest VPNBook OpenVPN configurations +/// 2. Creates a VPN pool and connects all VPN instances +/// 3. Binds each ChromeDriver instance to a different VPN for IP rotation +/// 4. Performs periodic health checks to reconnect unhealthy VPN instances /// /// # Errors /// /// Returns an error if configuration loading fails, pool initialization fails, /// VPN fetching fails (if enabled), or if either update function encounters an issue -/// (e.g., network errors, scraping failures, or chromedriver spawn failures like "program not found"). +/// (e.g., network errors, scraping failures, or chromedriver spawn failures). #[tokio::main] async fn main() -> Result<()> { let config = Config::load().map_err(|err| { @@ -41,27 +46,100 @@ async fn main() -> Result<()> { })?; logger::log_info("=== Application started ===").await; - logger::log_info(&format!("Config: economic_start_date={}, corporate_start_date={}, lookahead_months={}, max_parallel_instances={}, enable_vpn_rotation={}", - config.economic_start_date, config.corporate_start_date, config.economic_lookahead_months, config.max_parallel_instances, config.enable_vpn_rotation)).await; + logger::log_info(&format!("Config: economic_start_date={}, corporate_start_date={}, lookahead_months={}, max_parallel_instances={}, enable_vpn_rotation={}, max_tasks_per_instance={}", + config.economic_start_date, config.corporate_start_date, config.economic_lookahead_months, config.max_parallel_instances, config.enable_vpn_rotation, config.max_tasks_per_instance)).await; - // Initialize the shared ChromeDriver pool once + // Initialize VPN pool if enabled + let vpn_pool = if config.enable_vpn_rotation { + logger::log_info("=== VPN Rotation Enabled ===").await; + logger::log_info("--- Fetching latest VPNBook OpenVPN configurations ---").await; + + let (username, password, _files) = + util::opnv::fetch_vpnbook_configs(&Arc::new(ChromeDriverPool::new(1).await?), paths.cache_dir()).await?; + + let amount_of_openvpn_servers = _files.len(); + + logger::log_info(&format!("✓ Fetched VPN credentials - Username: {}", username)).await; + + // Create VPN pool + let openvpn_dir = paths.cache_dir().join("openvpn"); + logger::log_info("--- Initializing VPN Pool ---").await; + let vp = Arc::new(VpnPool::new( + &openvpn_dir, + username, + password, + true, // enable rotation + config.tasks_per_vpn_session, + amount_of_openvpn_servers, + ).await?); + + // Connect all VPN instances (gracefully handles failures) + logger::log_info("--- Connecting to VPN servers ---").await; + match vp.connect_all().await { + Ok(()) => { + logger::log_info("✓ VPN initialization complete").await; + Some(vp) + } + Err(e) => { + logger::log_warn(&format!( + "⚠ VPN initialization failed: {}. Continuing without VPN.", + e + )).await; + None + } + } + } else { + None + }; + + // Initialize the shared ChromeDriver pool with VPN pool let pool_size = config.max_parallel_instances; - logger::log_info(&format!("Initializing ChromeDriver pool with size: {}", pool_size)).await; + let max_tasks_per_instance = config.max_tasks_per_instance; + + logger::log_info(&format!( + "Initializing ChromeDriver pool with size: {}{}", + pool_size, + if max_tasks_per_instance > 0 { &format!(" (max {} tasks/instance)", max_tasks_per_instance) } else { "" } + )).await; + + let pool = Arc::new( + if max_tasks_per_instance > 0 { + ChromeDriverPool::new_with_vpn_and_task_limit(pool_size, vpn_pool.clone(), max_tasks_per_instance).await? + } else if vpn_pool.is_some() { + ChromeDriverPool::new_with_vpn(pool_size, vpn_pool.clone()).await? + } else { + ChromeDriverPool::new(pool_size).await? + } + ); - let pool = Arc::new(ChromeDriverPool::new(pool_size).await?); logger::log_info("✓ ChromeDriver pool initialized successfully").await; - // Fetch VPNBook configs if VPN rotation is enabled - if config.enable_vpn_rotation { - logger::log_info("--- Fetching latest VPNBook OpenVPN configurations ---").await; - let (username, password, files) = - util::opnv::fetch_vpnbook_configs(&pool, paths.cache_dir()).await?; - logger::log_info(&format!("Fetched VPN username: {}, password: {}", username, password)).await; - for file in &files { - logger::log_info(&format!("Extracted OVPN: {:?}", file)).await; - } - // Optionally, store username/password for rotation use (e.g., in a file or global state) - // For now, just log them; extend as needed for rotation integration + // Spawn background Ctrl-C handler to gracefully shutdown pool and VPNs + { + let pool_for_signal = Arc::clone(&pool); + let vpn_for_signal = vpn_pool.clone(); + tokio::spawn(async move { + if let Err(e) = tokio::signal::ctrl_c().await { + let _ = util::logger::log_error(&format!("Ctrl-C handler failed to install: {}", e)).await; + return; + } + + let _ = util::logger::log_info("Ctrl-C received — initiating graceful shutdown").await; + + if let Err(e) = pool_for_signal.shutdown().await { + let _ = util::logger::log_warn(&format!("Error shutting down ChromeDriver pool: {}", e)).await; + } + + if let Some(vp) = vpn_for_signal { + if let Err(e) = vp.disconnect_all().await { + let _ = util::logger::log_warn(&format!("Error disconnecting VPNs: {}", e)).await; + } + } + + let _ = util::logger::log_info("Graceful shutdown complete (from Ctrl-C)").await; + // Exit the process now that cleanup is done + std::process::exit(0); + }); } // Run economic update first, passing the shared pool @@ -74,6 +152,18 @@ async fn main() -> Result<()> { corporate::run_full_update(&config, &pool).await?; logger::log_info("✓ Corporate data update completed").await; + // Shutdown ChromeDriver pool before disconnecting VPNs so instances can + // cleanly terminate any network-bound processes. + logger::log_info("--- Shutting down ChromeDriver pool ---").await; + pool.shutdown().await?; + logger::log_info("✓ ChromeDriver pool shutdown complete").await; + + // Disconnect all VPN instances if enabled + if let Some(vp) = vpn_pool { + logger::log_info("--- Disconnecting VPN instances ---").await; + vp.disconnect_all().await?; + } + logger::log_info("=== Application completed successfully ===").await; Ok(()) } \ No newline at end of file diff --git a/src/scraper/create_tapctls.sh b/src/scraper/create_tapctls.sh new file mode 100644 index 0000000..daa6359 --- /dev/null +++ b/src/scraper/create_tapctls.sh @@ -0,0 +1,7 @@ +# Als Administrator ausführen +cd "C:\Program Files\OpenVPN\bin" + +# 10 TAP-Adapter hinzufügen +for ($i=2; $i -le 10; $i++) { + .\tapctl.exe create --name "OpenVPN-TAP-$i" +} \ No newline at end of file diff --git a/src/scraper/forcebindip.rs b/src/scraper/forcebindip.rs new file mode 100644 index 0000000..6b135eb --- /dev/null +++ b/src/scraper/forcebindip.rs @@ -0,0 +1,163 @@ +// src/scraper/forcebindip.rs + +use anyhow::{anyhow, Context, Result}; +use std::path::{Path, PathBuf}; +use std::process::Command; + +/// Manages ForceBindIP integration for binding processes to specific IP addresses +pub struct ForceBindIpManager { + forcebindip_path: PathBuf, +} + +impl ForceBindIpManager { + /// Creates a new ForceBindIP manager + /// + /// On Windows, looks for ForceBindIP.exe in common locations or PATH + /// On other platforms, returns an error as ForceBindIP is Windows-only + pub fn new() -> Result { + #[cfg(target_os = "windows")] + { + let possible_paths = vec![ + PathBuf::from("ForceBindIP.exe"), + PathBuf::from("tools/ForceBindIP.exe"), + PathBuf::from("C:/Program Files/ForceBindIP/ForceBindIP.exe"), + PathBuf::from("C:/Program Files (x86)/ForceBindIP/ForceBindIP.exe"), + ]; + + for path in possible_paths { + if path.exists() { + return Ok(Self { + forcebindip_path: path, + }); + } + } + + // Try to find in PATH + if let Ok(output) = Command::new("where").arg("ForceBindIP.exe").output() { + if output.status.success() { + let path_str = String::from_utf8_lossy(&output.stdout); + let path = PathBuf::from(path_str.trim()); + if path.exists() { + return Ok(Self { + forcebindip_path: path, + }); + } + } + } + + Err(anyhow!( + "ForceBindIP.exe not found. Please download from http://r1ch.net/projects/forcebindip \ + and place it in the project directory or add to PATH" + )) + } + + #[cfg(not(target_os = "windows"))] + { + Err(anyhow!( + "ForceBindIP is only available on Windows. For Linux/macOS, consider using \ + network namespaces or other routing mechanisms" + )) + } + } + + /// Creates a command that will run the given program bound to the specified IP + /// + /// # Arguments + /// * `bind_ip` - The IP address to bind to + /// * `program` - Path to the program to execute + /// * `args` - Arguments to pass to the program + /// + /// # Returns + /// A configured Command ready to be spawned + pub fn create_bound_command( + &self, + bind_ip: &str, + program: &Path, + args: &[&str], + ) -> Command { + let mut cmd = Command::new(&self.forcebindip_path); + + // ForceBindIP syntax: ForceBindIP.exe [IP] [program] [args...] + cmd.arg(bind_ip) + .arg(program); + + for arg in args { + cmd.arg(arg); + } + + cmd + } + + /// Verifies that ForceBindIP is working by testing with a simple command + pub async fn verify_installation(&self) -> Result<()> { + #[cfg(target_os = "windows")] + { + // Test by running a simple command + let output = Command::new(&self.forcebindip_path) + .arg("0.0.0.0") + .arg("cmd.exe") + .arg("/c") + .arg("echo test") + .output() + .context("Failed to execute ForceBindIP verification")?; + + if !output.status.success() { + return Err(anyhow!( + "ForceBindIP verification failed. stderr: {}", + String::from_utf8_lossy(&output.stderr) + )); + } + + Ok(()) + } + + #[cfg(not(target_os = "windows"))] + { + Err(anyhow!("ForceBindIP verification not available on non-Windows platforms")) + } + } + + /// Returns the path to the ForceBindIP executable + pub fn path(&self) -> &Path { + &self.forcebindip_path + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + #[cfg(target_os = "windows")] + fn test_forcebindip_manager_creation() { + // This test will only pass if ForceBindIP is actually installed + // In CI/CD, you might want to skip this or mock it + match ForceBindIpManager::new() { + Ok(manager) => { + println!("ForceBindIP found at: {:?}", manager.path()); + } + Err(e) => { + println!("ForceBindIP not found (expected in dev environments): {}", e); + } + } + } + + #[test] + fn test_command_creation() { + #[cfg(target_os = "windows")] + { + if let Ok(manager) = ForceBindIpManager::new() { + let cmd = manager.create_bound_command( + "192.168.1.1", + Path::new("test.exe"), + &["--arg1", "--arg2"], + ); + + // Verify the command is constructed correctly + let cmd_str = format!("{:?}", cmd); + assert!(cmd_str.contains("192.168.1.1")); + assert!(cmd_str.contains("test.exe")); + } + } + } +} \ No newline at end of file diff --git a/src/scraper/install_tap_adapter.ps1 b/src/scraper/install_tap_adapter.ps1 new file mode 100644 index 0000000..f2bf468 --- /dev/null +++ b/src/scraper/install_tap_adapter.ps1 @@ -0,0 +1,135 @@ +# install_tap_adapters.ps1 +# Installs additional TAP-Windows adapters for parallel OpenVPN connections +# MUST BE RUN AS ADMINISTRATOR + +$ErrorActionPreference = "Stop" + +Write-Host "========================================" -ForegroundColor Cyan +Write-Host "TAP Adapter Installation Script" -ForegroundColor Cyan +Write-Host "========================================" -ForegroundColor Cyan +Write-Host "" + +# Check if running as Administrator +$currentPrincipal = New-Object Security.Principal.WindowsPrincipal([Security.Principal.WindowsIdentity]::GetCurrent()) +$isAdmin = $currentPrincipal.IsInRole([Security.Principal.WindowsBuiltInRole]::Administrator) + +if (-not $isAdmin) { + Write-Host "ERROR: This script must be run as Administrator!" -ForegroundColor Red + Write-Host "" + Write-Host "To run as Administrator:" -ForegroundColor Yellow + Write-Host " 1. Right-click PowerShell" -ForegroundColor Yellow + Write-Host " 2. Select 'Run as Administrator'" -ForegroundColor Yellow + Write-Host " 3. Run this script again" -ForegroundColor Yellow + Write-Host "" + Read-Host "Press Enter to exit" + exit 1 +} + +Write-Host "✓ Running with Administrator privileges" -ForegroundColor Green +Write-Host "" + +# Check for OpenVPN installation +$tapctlPath = "C:\Program Files\OpenVPN\bin\tapctl.exe" + +if (-not (Test-Path $tapctlPath)) { + Write-Host "ERROR: OpenVPN not found!" -ForegroundColor Red + Write-Host "" + Write-Host "Expected location: $tapctlPath" -ForegroundColor Yellow + Write-Host "" + Write-Host "Please install OpenVPN from:" -ForegroundColor Yellow + Write-Host "https://openvpn.net/community-downloads/" -ForegroundColor Cyan + Write-Host "" + Read-Host "Press Enter to exit" + exit 1 +} + +Write-Host "✓ OpenVPN found at: $tapctlPath" -ForegroundColor Green +Write-Host "" + +# Count existing TAP adapters +Write-Host "Checking existing TAP adapters..." -ForegroundColor Cyan +$existingAdapters = Get-NetAdapter | Where-Object { $_.InterfaceDescription -like "*TAP*" } +$existingCount = $existingAdapters.Count + +Write-Host " Found $existingCount existing TAP adapter(s)" -ForegroundColor Yellow + +if ($existingCount -ge 10) { + Write-Host "" + Write-Host "✓ You already have $existingCount TAP adapters (sufficient)" -ForegroundColor Green + Write-Host "" + Read-Host "Press Enter to exit" + exit 0 +} + +Write-Host "" +Write-Host "Installing additional TAP adapters..." -ForegroundColor Cyan +Write-Host " Target: 10 total adapters" -ForegroundColor Yellow +Write-Host " To install: $(10 - $existingCount) adapters" -ForegroundColor Yellow +Write-Host "" + +$targetCount = 10 +$successCount = 0 +$failCount = 0 + +for ($i = ($existingCount + 1); $i -le $targetCount; $i++) { + $adapterName = "OpenVPN-TAP-$i" + Write-Host "[$i/$targetCount] Creating $adapterName..." -ForegroundColor Cyan + + try { + $output = & $tapctlPath create --name $adapterName 2>&1 + + if ($LASTEXITCODE -eq 0) { + Write-Host " ✓ Successfully created $adapterName" -ForegroundColor Green + $successCount++ + } else { + Write-Host " ⚠ Failed to create $adapterName (exit code: $LASTEXITCODE)" -ForegroundColor Red + Write-Host " Output: $output" -ForegroundColor Gray + $failCount++ + } + } catch { + Write-Host " ✗ Error creating $adapterName : $_" -ForegroundColor Red + $failCount++ + } + + # Small delay to prevent resource conflicts + Start-Sleep -Milliseconds 500 +} + +Write-Host "" +Write-Host "========================================" -ForegroundColor Cyan +Write-Host "Installation Summary" -ForegroundColor Cyan +Write-Host "========================================" -ForegroundColor Cyan +Write-Host " Successfully created: $successCount adapter(s)" -ForegroundColor Green +Write-Host " Failed: $failCount adapter(s)" -ForegroundColor $(if ($failCount -gt 0) { "Red" } else { "Gray" }) +Write-Host "" + +# Verify final count +Write-Host "Verifying installation..." -ForegroundColor Cyan +Start-Sleep -Seconds 2 + +$finalAdapters = Get-NetAdapter | Where-Object { $_.InterfaceDescription -like "*TAP*" } +$finalCount = $finalAdapters.Count + +Write-Host "" +Write-Host "Total TAP adapters now: $finalCount" -ForegroundColor $(if ($finalCount -ge 10) { "Green" } else { "Yellow" }) +Write-Host "" + +if ($finalCount -ge 10) { + Write-Host "✓ Installation complete! You now have sufficient TAP adapters." -ForegroundColor Green + Write-Host " You can now run up to $(($finalCount * 3/4)) VPN connections in parallel." -ForegroundColor Cyan +} elseif ($finalCount -gt $existingCount) { + Write-Host "⚠ Partial success. Added $(($finalCount - $existingCount)) adapter(s)." -ForegroundColor Yellow + Write-Host " You can run up to $(($finalCount * 3/4)) VPN connections in parallel." -ForegroundColor Cyan + Write-Host " Consider running this script again if you need more." -ForegroundColor Yellow +} else { + Write-Host "✗ No adapters were added. Check error messages above." -ForegroundColor Red +} + +Write-Host "" +Write-Host "Adapter List:" -ForegroundColor Cyan +$finalAdapters | ForEach-Object { + Write-Host " • $($_.Name) ($($_.InterfaceDescription))" -ForegroundColor Gray +} + +Write-Host "" +Read-Host "Press Enter to exit" \ No newline at end of file diff --git a/src/scraper/mod.rs b/src/scraper/mod.rs index 3bd0fd2..d60afcc 100644 --- a/src/scraper/mod.rs +++ b/src/scraper/mod.rs @@ -1 +1,5 @@ pub mod webdriver; +pub mod vpn_manager; + +#[cfg(target_os = "windows")] +pub mod forcebindip; \ No newline at end of file diff --git a/src/scraper/vpn_manager.rs b/src/scraper/vpn_manager.rs new file mode 100644 index 0000000..f1eb9b5 --- /dev/null +++ b/src/scraper/vpn_manager.rs @@ -0,0 +1,1422 @@ +// src/scraper/vpn_manager.rs + +use anyhow::{Context, Result, anyhow}; +use fantoccini::client; +use serde_json; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use tokio::process::{Child, Command}; +use tokio::sync::Mutex; +use tokio::time::{sleep, timeout, Duration}; +use std::process::Stdio; +use reqwest::Client; +use tokio::fs as tokio_fs; +use std::time::{SystemTime, UNIX_EPOCH}; + +use crate::logger; + +/// Represents a single OpenVPN connection with its associated state +pub struct VpnInstance { + /// The OpenVPN process handle + process: Option, + /// Path to the .ovpn configuration file + config_path: PathBuf, + /// The external IP address assigned by this VPN + external_ip: Option, + /// Baseline (pre-VPN) external IP for verification + baseline_ip: Option, + /// Hostname derived from the config file (e.g., "ca149.vpnbook.com") + hostname: String, + /// Number of tasks completed in the current session + tasks_completed: usize, + /// VPN credentials + username: String, + password: String, + /// Health status + is_healthy: bool, + /// Path to credentials file created for this instance (if any) + cred_path: Option, + /// Path to temporary modified .ovpn file for this instance (if any) + temp_config_path: Option, + /// Path to OpenVPN log file created for this instance (if any) + log_path: Option, +} + +impl VpnInstance { + /// Creates a new VPN instance without starting the connection + pub fn new( + config_path: PathBuf, + username: String, + password: String, + baseline_ip: Option, + ) -> Result { + // Use the file stem (filename without extension) as the hostname/identifier. + // This avoids using the parent directory name which can be the same + // for many configs and cause collisions when copying into config-auto. + let hostname = config_path + .file_stem() + .and_then(|n| n.to_str()) + .unwrap_or("unknown") + .to_string(); + + Ok(Self { + process: None, + config_path, + external_ip: None, + baseline_ip, + hostname, + tasks_completed: 0, + username, + password, + is_healthy: false, + cred_path: None, + temp_config_path: None, + log_path: None, + }) + } + + /// Starts the OpenVPN connection and detects the assigned IP + pub async fn connect(&mut self) -> Result<()> { + crate::util::logger::log_info(&format!("Starting VPN connection for {}", self.hostname)).await; + // Create fixed config first + //let fixed_config_path = self.create_fixed_config().await + // .context("Failed to create fixed OpenVPN config")?; + + // Store the temp config path so we can clean it up later + self.temp_config_path = Some(self.config_path.clone()); + let cred_file = self.create_credentials_file().await?; + self.cred_path = Some(cred_file.clone()); + + // Use baseline IP supplied when pool was created, otherwise detect once + let baseline_ip = if self.baseline_ip.is_some() { + self.baseline_ip.clone() + } else { + let b = self.detect_external_ip().await.ok().flatten(); + if let Some(ref ip) = b { + crate::util::logger::log_info(&format!("Baseline IP (before VPN): {}", ip)).await; + } + b + }; + self.baseline_ip = baseline_ip; + + #[cfg(target_os = "windows")] + { + // Windows: Spawn individual openvpn.exe process for this instance + let temp_dir = std::env::temp_dir(); + let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos(); + let instance_config = temp_dir.join(format!("ovpn_{}_{}.ovpn", self.hostname, nanos)); + let log_path = temp_dir.join(format!("openvpn_{}_{}.log", self.hostname, nanos)); + + // Copy and modify the .ovpn file to include auth-user-pass (async) + let ovpn_content = tokio_fs::read_to_string(&self.config_path).await?; + let mut modified_content = ovpn_content; + + // Remove existing auth-user-pass lines if present + modified_content = modified_content + .lines() + .filter(|line| !line.trim().starts_with("auth-user-pass")) + .collect::>() + .join("\n"); + + // Add auth-user-pass pointing to credentials file at the end + modified_content.push('\n'); + let cred_path_str = cred_file.to_string_lossy().replace('\\', "/"); + modified_content.push_str(&format!("auth-user-pass \"{}\"\n", cred_path_str)); + + tokio_fs::write(&instance_config, modified_content).await?; + self.temp_config_path = Some(instance_config.clone()); + self.log_path = Some(log_path.clone()); + + // Verify OpenVPN executable exists + let openvpn_exe = r"C:\Program Files\OpenVPN\bin\openvpn.exe"; + if !std::path::Path::new(openvpn_exe).exists() { + return Err(anyhow!( + "OpenVPN executable not found at: {}\nPlease install OpenVPN from: https://openvpn.net/community-downloads/", + openvpn_exe + )); + } + + // Spawn openvpn.exe process directly + let mut cmd = Command::new(openvpn_exe); + cmd.arg("--config") + .arg(&instance_config) + .arg("--log") + .arg(&log_path) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + + crate::util::logger::log_info(&format!( + "Spawning OpenVPN: {} --config {} --log {}", + openvpn_exe, + instance_config.display(), + log_path.display() + )).await; + + let mut process = cmd + .spawn() + .context("Failed to spawn openvpn.exe. Ensure OpenVPN is installed.")?; + + // Log OpenVPN stdout in background + if let Some(stdout) = process.stdout.take() { + let hostname = self.hostname.clone(); + tokio::spawn(async move { + use tokio::io::AsyncBufReadExt; + let mut reader = tokio::io::BufReader::new(stdout).lines(); + while let Ok(Some(line)) = reader.next_line().await { + crate::util::logger::log_info(&format!("OpenVPN-OUT [{}]: {}", hostname, line)).await; + } + }); + } + + // Store process handle + self.process = Some(process); + + // Wait for OpenVPN to initialize and verify connection + // Initial wait: OpenVPN takes ~8-10 seconds typically + crate::util::logger::log_info(&format!( + "Waiting for OpenVPN initialization ({})", + self.hostname + )).await; + + tokio::time::sleep(Duration::from_secs(40)).await; + + for i in 0..3 { + if self.scan_openvpn_logs().await? { + break; + } + crate::util::logger::log_info("OpenVPN not initialized yet, waiting 10 seconds...").await; + tokio::time::sleep(Duration::from_secs(10)).await; + } + + // Now run comprehensive verification with retries built-in + match self.verify_vpn_connection().await { + Ok(true) => { + self.is_healthy = true; + crate::util::logger::log_info(&format!( + "✓ VPN {} connected successfully with IP: {}", + self.hostname, + self.external_ip.as_ref().unwrap_or(&"unknown".to_string()) + )).await; + return Ok(()); + } + Ok(false) => { + // Verification failed - kill process + if let Some(mut p) = self.process.take() { + let _ = p.kill().await; + } + return Err(anyhow!( + "VPN connection verification failed for {}", + self.hostname + )); + } + Err(e) => { + // Error during verification + if let Some(mut p) = self.process.take() { + let _ = p.kill().await; + } + return Err(anyhow!( + "VPN verification error for {}: {}", + self.hostname, + e + )); + } + } + } + + #[cfg(not(target_os = "windows"))] + { + // Non-Windows implementation + let mut cmd = Command::new("openvpn"); + cmd.arg("--config") + .arg(&self.config_path) + .arg("--auth-user-pass") + .arg(&cred_file) + .arg("--daemon") + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + + match cmd.spawn() { + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + return Err(anyhow!( + "OpenVPN not found. Please install it:\n \ + Linux: sudo apt-get install openvpn\n \ + macOS: brew install openvpn\n \ + Error: {}", e + )); + } + Err(e) => { + return Err(anyhow!("Failed to spawn OpenVPN: {}", e)); + } + Ok(mut process) => { + // Capture output for monitoring + if let Some(stdout) = process.stdout.take() { + let hostname = self.hostname.clone(); + tokio::spawn(async move { + use tokio::io::AsyncBufReadExt; + let mut reader = tokio::io::BufReader::new(stdout).lines(); + while let Ok(Some(line)) = reader.next_line().await { + crate::util::logger::log_info(&format!("[VPN-{}] {}", hostname, line)).await; + } + }); + } + + self.process = Some(process); + } + } + + // Wait for connection and verify (Unix) + sleep(Duration::from_secs(10)).await; + + match self.verify_vpn_connection().await { + Ok(true) => { + self.is_healthy = true; + crate::util::logger::log_info(&format!( + "✓ VPN {} connected successfully", + self.hostname + )).await; + return Ok(()); + } + Ok(false) => { + self.disconnect().await?; + return Err(anyhow!( + "Failed to verify VPN connection for {}", + self.hostname + )); + } + Err(e) => { + self.disconnect().await?; + return Err(e); + } + } + } + } + + async fn verify_vpn_connection(&mut self) -> Result { + logger::log_info(&format!("Verifying VPN connection for {}", self.hostname)).await; + + // 1. Quick process check + if self.process.is_none() { + logger::log_warn("Process check failed - no process").await; + return Ok(false); + } + + // 2. Check logs for success markers (with retries for file buffering) + let mut log_success = false; + for attempt in 1..=5 { + tokio::time::sleep(Duration::from_secs(2)).await; // Wait for log flush + + match self.scan_openvpn_logs().await { + Ok(true) => { + log_success = true; + logger::log_info(&format!( + "✓ Log verification passed (attempt {})", + attempt + )).await; + break; + } + Ok(false) => { + if attempt < 5 { + logger::log_info(&format!( + "Log verification pending (attempt {}/5)...", + attempt + )).await; + } + } + Err(e) => { + logger::log_warn(&format!("Log read error: {}", e)).await; + } + } + } + + if !log_success { + logger::log_warn("Log verification failed after 5 attempts").await; + return Ok(false); + } + + // 3. Wait for routes to be applied (happens ~2 seconds after "Initialization Sequence Completed") + logger::log_info("Waiting for routes to stabilize...").await; + tokio::time::sleep(Duration::from_secs(3)).await; + + // 4. Network route verification (with retries) + let mut route_ok = false; + for attempt in 1..=3 { + if self.check_routing_table().await.unwrap_or(false) { + route_ok = true; + logger::log_info(&format!( + "✓ Route verification passed (attempt {})", + attempt + )).await; + break; + } + + if attempt < 3 { + logger::log_info(&format!( + "Route verification pending (attempt {}/3)...", + attempt + )).await; + tokio::time::sleep(Duration::from_secs(2)).await; + } + } + + if !route_ok { + logger::log_warn("Route verification failed - routes may not be established").await; + // Don't fail yet - some VPNs work without perfect routing table state + } + + // 5. External IP change verification (with retries and longer timeout) + logger::log_info("Verifying IP change...").await; + for attempt in 1..=3 { + match timeout(Duration::from_secs(8), self.verify_ip_change()).await { + Ok(Ok(true)) => { + let ip = self.external_ip.as_ref().unwrap(); + logger::log_info(&format!( + "✓ VPN connection verified - New IP: {} (attempt {})", + ip, attempt + )).await; + return Ok(true); + } + Ok(Ok(false)) => { + if attempt < 3 { + logger::log_info(&format!( + "IP verification pending (attempt {}/3)...", + attempt + )).await; + tokio::time::sleep(Duration::from_secs(2)).await; + } + } + Ok(Err(e)) => { + logger::log_warn(&format!("IP check error: {}", e)).await; + } + Err(_) => { + logger::log_warn("IP check timeout").await; + } + } + } + + logger::log_error(&format!( + "✗ VPN connection verification failed for {} - IP unchanged after 3 attempts", + self.hostname + )).await; + Ok(false) + } + + // Improved routing table check with better Windows detection + async fn check_routing_table(&self) -> Result { + #[cfg(target_os = "windows")] + { + use std::process::Command; + + let output = Command::new("cmd") + .args(&["/C", "route", "print", "0.0.0.0"]) + .output() + .context("Failed to execute route print")?; + + if !output.status.success() { + return Ok(false); + } + + let output_str = String::from_utf8_lossy(&output.stdout); + + // Check for VPN-related routing entries + // Look for: TAP adapter in routes, or 10.x.x.x destinations (common VPN subnets) + let has_vpn_routes = output_str.contains("TAP") + || output_str.contains("10.8.0") // Common OpenVPN subnet + || output_str.contains("10.9.0") // Another common subnet + || output_str.to_lowercase().contains("openvpn"); + + // Also check for 0.0.0.0/128.0.0.0 split routing (redirect-gateway) + let has_redirect = output_str.contains("128.0.0.0") + && output_str.matches("0.0.0.0").count() >= 2; + + let result = has_vpn_routes || has_redirect; + + if result { + logger::log_info("✓ Routing table shows VPN routes").await; + } else { + logger::log_warn("✗ No VPN routes detected in routing table").await; + logger::log_info(&format!("Route output preview: {}", + &output_str.lines().take(10).collect::>().join("\n") + )).await; + } + + Ok(result) + } + + #[cfg(not(target_os = "windows"))] + { + // For non-Windows, just check if we can detect IP change + // (routing is less critical to verify on Unix) + Ok(true) + } + } + + // Improved log scanning with better error handling + async fn scan_openvpn_logs(&self) -> Result { + #[cfg(target_os = "windows")] + { + if let Some(ref log_path) = self.log_path { + // Try to read the file, but handle "not found" gracefully + let content = match tokio::fs::read_to_string(log_path).await { + Ok(c) => c, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + // Log file doesn't exist yet + return Ok(false); + } + Err(e) => { + return Err(anyhow::anyhow!("Failed to read log file: {}", e)); + } + }; + + // Success marker + if content.contains("Initialization Sequence Completed") { + // Check for fatal errors that might have occurred after success + if content.contains("Exiting due to fatal error") { + return Ok(false); + } + + // Check for excessive retries/errors + let error_count = content.matches("TLS Error").count() + + content.matches("Connection reset").count() + + content.matches("SIGTERM").count() + + content.matches("Restart pause").count(); + + if error_count > 3 { + logger::log_warn(&format!( + "Logs show {} errors despite initialization complete", + error_count + )).await; + return Ok(false); + } + + return Ok(true); + } + + // Check for immediate fatal errors + if content.contains("Cannot open TUN/TAP dev") + || content.contains("Exiting due to fatal error") + || content.contains("AUTH: Received control message: AUTH_FAILED") { + return Ok(false); + } + } + } + + Ok(false) + } + + // Improved IP verification with better error handling + async fn verify_ip_change(&mut self) -> Result { + //let detected_ip = self.detect_external_ip().await?; + + let new_ip = self + .detect_external_ip() + .await + .context("Failed to query external IP after VPN connection")? + .with_context(|| "VPN connected, but no external IP received – tunnel broken?")?; + + // Compare with baseline + if let Some(baseline) = &self.baseline_ip { + if &new_ip == baseline { + logger::log_warn(&format!( + "IP unchanged from baseline: {} (VPN may not be routing traffic)", + baseline + )).await; + return Ok(false); + } + + logger::log_info(&format!( + "IP changed: {} → {} ✓", + baseline, new_ip + )).await; + } else { + logger::log_info(&format!("Detected IP: {}", new_ip)).await; + } + + self.external_ip = Some(new_ip); + Ok(true) + } + + /// Creates a fixed version of the OpenVPN config to work with modern OpenVPN + async fn create_fixed_config(&self) -> Result { + // Read the original config + let content = tokio::fs::read_to_string(&self.config_path) + .await + .context("Failed to read OpenVPN config")?; + + // Create a temporary file for the fixed config + let temp_dir = std::env::temp_dir(); + let temp_config_path = temp_dir.join(format!("fixed_{}.ovpn", + self.hostname.replace(|c: char| !c.is_alphanumeric(), "_"))); + + let mut fixed_lines = Vec::new(); + let mut has_data_ciphers = false; + let mut has_compression_setting = false; + + // Process each line + for line in content.lines() { + let trimmed = line.trim(); + + if trimmed.starts_with("cipher ") { + // Skip old cipher line, we'll add data-ciphers later if needed + continue; + } else if trimmed.starts_with("data-ciphers") { + has_data_ciphers = true; + fixed_lines.push(line.to_string()); + } else if trimmed.contains("allow-compression") { + has_compression_setting = true; + fixed_lines.push(line.to_string()); + } else if trimmed.starts_with(";") || trimmed.starts_with("#") { + // Keep comments + fixed_lines.push(line.to_string()); + } else if !trimmed.is_empty() { + fixed_lines.push(line.to_string()); + } + } + + // Add modern cipher suite if missing + if !has_data_ciphers { + fixed_lines.push("data-ciphers AES-256-GCM:AES-128-GCM:CHACHA20-POLY1305".to_string()); + } + + // Add compression setting if missing + if !has_compression_setting { + fixed_lines.push("allow-compression no".to_string()); + } + + // Remove any duplicate 'auth-user-pass' lines (we'll add our own later) + fixed_lines.retain(|line| !line.trim().starts_with("auth-user-pass")); + + // Write the fixed config + let fixed_content = fixed_lines.join("\n"); + tokio::fs::write(&temp_config_path, fixed_content) + .await + .context("Failed to write fixed OpenVPN config")?; + + Ok(temp_config_path) + } + + + /// Disconnects the VPN connection + pub async fn disconnect(&mut self) -> Result<()> { + #[cfg(target_os = "windows")] + { + // Kill the openvpn.exe process if it exists + if let Some(mut process) = self.process.take() { + crate::util::logger::log_info(&format!("Disconnecting VPN {}", self.hostname)).await; + let _ = process.kill().await; + + // Clean up temp config file and log file if present + if let Some(ref p) = self.temp_config_path { + let _ = tokio_fs::remove_file(p).await; + } + if let Some(ref lp) = self.log_path { + let _ = tokio_fs::remove_file(lp).await; + } + } + + // Remove credential file if present + if let Some(ref cred) = self.cred_path { + let _ = tokio_fs::remove_file(cred).await; + } + + self.external_ip = None; + self.is_healthy = false; + self.tasks_completed = 0; + Ok(()) + } + #[cfg(not(target_os = "windows"))] + { + if let Some(mut process) = self.process.take() { + crate::util::logger::log_info(&format!("Disconnecting VPN {}", self.hostname)).await; + process.kill().await.context("Failed to kill OpenVPN process")?; + self.external_ip = None; + self.is_healthy = false; + self.tasks_completed = 0; + } + // Remove credentials file if present + if let Some(ref cred) = self.cred_path { + let _ = tokio_fs::remove_file(cred).await; + } + Ok(()) + } + } + + /// Reconnects the VPN (disconnect + connect) + pub async fn reconnect(&mut self) -> Result<()> { + crate::util::logger::log_info(&format!("Reconnecting VPN {}", self.hostname)).await; + self.disconnect().await?; + sleep(Duration::from_secs(10)).await; // Brief delay between disconnect and reconnect + self.connect().await + } + + /// Ermittelt die aktuelle externe IPv4-Adresse über ipify.org. + /// + /// Nutzt einen wiederverwendbaren reqwest-Client aus dem Pool (vermutlich vorhandenen) + /// shared Client-Pool oder erstellt einen mit sinnvollen Defaults. + /// + /// # Returns + /// - `Ok(Some(ip))` bei erfolgreicher Erkennung + /// - `Ok(None)` wenn der Server antwortet, aber kein gültiger IP-String kommt + /// - `Err(_)` bei Netzwerk-, Timeout- oder Parse-Fehlern (wird mit Kontext angereichert) + async fn detect_external_ip(&self) -> anyhow::Result> { + // Empfohlen: Nutze einen shared Client (z. B. aus Arc) + // Falls du keinen hast, ist das hier immer noch besser als jedes Mal neu bauen: + static CLIENT: once_cell::sync::Lazy = once_cell::sync::Lazy::new(|| { + reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(8)) + .user_agent("my-scraper/1.0") + .build() + .expect("Failed to build static reqwest client") + }); + + let resp = CLIENT + .get("https://api.ipify.org?format=json") + .send() + .await + .context("Failed to reach ipify.org – no internet or DNS issue")?; + + // 4xx/5xx → Fehler + let resp = resp + .error_for_status() + .context("ipify.org returned error status")?; + + #[derive(serde::Deserialize)] + struct IpResponse { + ip: String, + } + + let ip_obj: IpResponse = resp + .json() + .await + .context("Failed to parse JSON from ipify.org")?; + + // Einfache Validierung: sieht nach IPv4 aus? + if ip_obj.ip.contains('.') && !ip_obj.ip.is_empty() { + Ok(Some(ip_obj.ip)) + } else { + Ok(None) + } + } + + /// Performs a simple health check without reconnection + pub async fn health_check(&mut self) -> Result { + if self.process.is_none() { + self.is_healthy = false; + return Ok(false); + } + + match self.detect_external_ip().await { + Ok(Some(_)) => { + self.is_healthy = true; + Ok(true) + } + _ => { + self.is_healthy = false; + Ok(false) + } + } + } + + /// Performs a health check and automatically reconnects if failed + pub async fn health_check_with_reconnect(&mut self) -> Result { + if self.process.is_none() { + self.is_healthy = false; + return Ok(false); + } + + // Try to detect IP again + match self.detect_external_ip().await { + Ok(Some(ip)) => { + if let Some(ref current_ip) = self.external_ip { + if &ip != current_ip { + crate::util::logger::log_warn(&format!( + "VPN {} IP changed unexpectedly: {} -> {}", + self.hostname, current_ip, ip + )).await; + self.external_ip = Some(ip); + } + } + self.is_healthy = true; + Ok(true) + } + _ => { + crate::util::logger::log_warn(&format!( + "Health check failed for VPN {}, attempting reconnect...", + self.hostname + )).await; + self.is_healthy = false; + + // Attempt automatic reconnection + match self.reconnect().await { + Ok(()) => { + crate::util::logger::log_info(&format!( + "✓ VPN {} reconnected successfully", + self.hostname + )).await; + Ok(true) + } + Err(e) => { + crate::util::logger::log_error(&format!( + "✗ VPN {} reconnection failed: {}", + self.hostname, e + )).await; + Ok(false) + } + } + } + } + } + + /// Creates a temporary credentials file for OpenVPN authentication + async fn create_credentials_file(&self) -> Result { + let temp_dir = std::env::temp_dir(); + let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos(); + let cred_path = temp_dir.join(format!("vpn_creds_{}_{}.txt", self.hostname, nanos)); + + let content = format!("{}\n{}\n", self.username, self.password); + tokio::fs::write(&cred_path, content) + .await + .context("Failed to write credentials file")?; + + Ok(cred_path) + } + + /// Increments the task counter and returns whether rotation is needed + pub fn increment_task_count(&mut self, tasks_per_session: usize) -> bool { + self.tasks_completed += 1; + if tasks_per_session > 0 && self.tasks_completed >= tasks_per_session { + true + } else { + false + } + } + + /// Returns the external IP if connected + pub fn external_ip(&self) -> Option<&str> { + self.external_ip.as_deref() + } + + /// Returns the hostname + pub fn hostname(&self) -> &str { + &self.hostname + } + + /// Returns whether the VPN is healthy + pub fn is_healthy(&self) -> bool { + self.is_healthy + } + + /// Resets the task counter + pub fn reset_task_count(&mut self) { + self.tasks_completed = 0; + } +} + + +impl Drop for VpnInstance { + fn drop(&mut self) { + if let Some(mut process) = self.process.take() { + let _ = process.start_kill(); + } + } +} + +/// Manages a pool of VPN instances for rotation +pub struct VpnPool { + instances: Vec>>, + current_index: Arc>, + enable_rotation: bool, + tasks_per_session: usize, +} + +impl VpnPool { + /// Creates a new VPN pool from .ovpn configuration files + /// Automatically ensures sufficient TAP adapters are installed + pub async fn new( + ovpn_dir: &Path, + username: String, + password: String, + enable_rotation: bool, + tasks_per_session: usize, + amount_of_openvpn_servers: usize, + ) -> Result { + // STEP 1: Ensure we have enough TAP adapters (auto-install if needed) + #[cfg(target_os = "windows")] + { + crate::util::logger::log_info("=== TAP Adapter Check ===").await; + let _ = Self::ensure_tap_adapters(amount_of_openvpn_servers).await?; + crate::util::logger::log_info("=== TAP Adapter Check Complete ===").await; + } + + // STEP 2: Continue with normal VPN pool initialization + let mut ovpn_files = Vec::new(); + + // Recursively find all .ovpn files + let mut entries = tokio::fs::read_dir(ovpn_dir).await?; + while let Some(entry) = entries.next_entry().await? { + let path = entry.path(); + + if path.is_dir() { + let mut sub_entries = tokio::fs::read_dir(&path).await?; + while let Some(sub_entry) = sub_entries.next_entry().await? { + let sub_path = sub_entry.path(); + if sub_path.extension().and_then(|s| s.to_str()) == Some("ovpn") + && is_tcp_443_config(&sub_path) { + ovpn_files.push(sub_path); + } + } + } else if path.extension().and_then(|s| s.to_str()) == Some("ovpn") + && is_tcp_443_config(&path) { + ovpn_files.push(path); + } + } + + // Deduplicate configs by server + let mut unique = Vec::new(); + use std::collections::HashSet; + let mut seen: HashSet = HashSet::new(); + for cfg in ovpn_files { + match tokio_fs::read_to_string(&cfg).await { + Ok(contents) => { + let mut server_key = None; + for line in contents.lines() { + let l = line.trim(); + if l.starts_with('#') || l.is_empty() { continue; } + if l.starts_with("remote ") { + let parts: Vec<&str> = l.split_whitespace().collect(); + if parts.len() >= 2 { + let host = parts[1]; + let port = if parts.len() >= 3 { parts[2] } else { "" }; + let key = if port.is_empty() { + host.to_string() + } else { + format!("{}:{}", host, port) + }; + server_key = Some(key); + break; + } + } + } + let key = server_key.unwrap_or_else(|| { + cfg.file_stem() + .and_then(|s| s.to_str()) + .unwrap_or("unknown") + .to_string() + }); + if !seen.contains(&key) { + seen.insert(key); + unique.push(cfg); + } + } + Err(_) => { + unique.push(cfg); + } + } + } + ovpn_files = unique; + + if ovpn_files.is_empty() { + return Err(anyhow!("No .ovpn files found in {:?}", ovpn_dir)); + } + + crate::util::logger::log_info(&format!( + "Found {} OpenVPN configurations", + ovpn_files.len() + )).await; + + // Build shared HTTP client + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(5)) + .pool_max_idle_per_host(8) + .build() + .context("Failed to build HTTP client for IP detection")?; + + // Detect baseline IP + let baseline_ip: Option = match client + .get("https://api.ipify.org?format=json") + .send() + .await + { + Ok(resp) => { + if resp.status().is_success() { + if let Ok(json) = resp.json::().await { + json.get("ip") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) + } else { None } + } else { None } + } + Err(_) => None, + }; + + if let Some(ref ip) = baseline_ip { + crate::util::logger::log_info(&format!("Baseline IP for pool: {}", ip)).await; + } + + // Create VPN instances + let mut instances = Vec::new(); + for config_path in ovpn_files { + let instance = VpnInstance::new( + config_path, + username.clone(), + password.clone(), + baseline_ip.clone(), + )?; + instances.push(Arc::new(Mutex::new(instance))); + } + + Ok(Self { + instances, + current_index: Arc::new(Mutex::new(0)), + enable_rotation, + tasks_per_session, + }) + } + + /// Connects all VPN instances in parallel (much faster than sequential) + pub async fn connect_all(&self) -> Result<()> { + crate::util::logger::log_info("Connecting all VPN instances in batches...").await; + + let start_time = std::time::Instant::now(); + + // Auto-detect optimal batch size based on available TAP adapters + let batch_size = Self::detect_optimal_batch_size().await; + + let mut connected = 0; + let mut failed = 0; + + // Process instances in batches + for (batch_num, chunk) in self.instances.chunks(batch_size).enumerate() { + crate::util::logger::log_info(&format!( + "Starting batch {}/{} ({} VPNs)...", + batch_num + 1, + (self.instances.len() + batch_size - 1) / batch_size, + chunk.len() + )).await; + + // Spawn parallel tasks for this batch + let mut tasks = Vec::new(); + for (i, instance) in chunk.iter().enumerate() { + let instance_clone = Arc::clone(instance); + let global_index = batch_num * batch_size + i; + + let task = tokio::spawn(async move { + let mut inst = instance_clone.lock().await; + let hostname = inst.hostname().to_string(); + + match inst.connect().await { + Ok(_) => { + let ip = inst.external_ip().unwrap_or("unknown"); + crate::util::logger::log_info(&format!( + "✓ VPN {} ({}) connected with IP: {}", + global_index + 1, hostname, ip + )).await; + Ok(()) + } + Err(e) => { + crate::util::logger::log_warn(&format!( + "⚠ Failed to connect VPN {} ({}): {}", + global_index + 1, hostname, e + )).await; + Err(e) + } + } + }); + tasks.push(task); + } + + // Wait for this batch to complete + let results = futures::future::join_all(tasks).await; + + for result in results { + match result { + Ok(Ok(_)) => connected += 1, + Ok(Err(_)) | Err(_) => failed += 1, + } + } + + // Small delay between batches to let TAP adapters stabilize + if batch_num < (self.instances.len() + batch_size - 1) / batch_size - 1 { + tokio::time::sleep(Duration::from_secs(2)).await; + } + } + + let elapsed = start_time.elapsed(); + + if connected == 0 && failed > 0 { + crate::util::logger::log_error(&format!( + "✗ FATAL: All {} VPN connection attempts failed in {:.1}s. Make sure OpenVPN is installed:\n Windows: https://openvpn.net/community-downloads/\n Linux: sudo apt-get install openvpn\n macOS: brew install openvpn", + failed, elapsed.as_secs_f64() + )).await; + return Err(anyhow!("Failed to connect any VPN instances. OpenVPN may not be installed.")); + } else if failed > 0 { + crate::util::logger::log_warn(&format!( + "⚠ Connected {} VPN instances successfully, {} failed in {:.1}s (will use available instances)", + connected, failed, elapsed.as_secs_f64() + )).await; + } else { + crate::util::logger::log_info(&format!( + "✓ All {} VPN instances connected successfully in {:.1}s", + connected, elapsed.as_secs_f64() + )).await; + } + + Ok(()) + } + + /// Gets the next available VPN instance (round-robin) + pub async fn acquire(&self) -> Result>> { + let mut index = self.current_index.lock().await; + let instance = self.instances[*index % self.instances.len()].clone(); + *index += 1; + Ok(instance) + } + + /// Rotates a VPN instance if rotation is enabled and threshold is met + pub async fn rotate_if_needed(&self, instance: Arc>) -> Result<()> { + if !self.enable_rotation { + return Ok(()); + } + + let mut inst = instance.lock().await; + if inst.increment_task_count(self.tasks_per_session) { + crate::util::logger::log_info(&format!( + "Task threshold reached for VPN {}, rotating...", + inst.hostname() + )).await; + + let old_ip = inst.external_ip().map(|s| s.to_string()); + inst.reconnect().await?; + let new_ip = inst.external_ip().map(|s| s.to_string()); + + match (old_ip, new_ip) { + (Some(old), Some(new)) if old != new => { + crate::util::logger::log_info(&format!( + "✓ VPN {} rotated: {} -> {}", + inst.hostname(), old, new + )).await; + } + (Some(old), Some(new)) if old == new => { + crate::util::logger::log_warn(&format!( + "⚠ VPN {} reconnected but IP unchanged: {}", + inst.hostname(), old + )).await; + } + _ => { + crate::util::logger::log_error(&format!( + "✗ VPN {} rotation verification failed", + inst.hostname() + )).await; + } + } + + inst.reset_task_count(); + } + + Ok(()) + } + + /// Performs health checks on all VPN instances with automatic reconnection + pub async fn health_check_all_with_reconnect(&self) -> Result<()> { + for instance in &self.instances { + let mut inst = instance.lock().await; + let _ = inst.health_check_with_reconnect().await; + } + Ok(()) + } + + /// Returns the number of VPN instances + pub fn len(&self) -> usize { + self.instances.len() + } + + /// Disconnects all VPN instances + pub async fn disconnect_all(&self) -> Result<()> { + crate::util::logger::log_info("Disconnecting all VPN instances...").await; + for instance in &self.instances { + let mut inst = instance.lock().await; + inst.disconnect().await?; + } + Ok(()) + } + + /// Amount of taps adapters currently installed + /// One VPN connection requires one TAP adapter + pub async fn ensure_tap_adapters(amount_of_openvpn_servers: usize) -> Result { + #[cfg(target_os = "windows")] + { + use std::fs; + + crate::util::logger::log_info("Checking TAP adapter availability...").await; + + let tap_count = Self::count_tap_adapters().await?; + + if tap_count >= amount_of_openvpn_servers { + crate::util::logger::log_info(&format!( + "✓ Found {} TAP adapters - sufficient for parallel VPN connections", + tap_count + )).await; + return Ok(tap_count); + } + + crate::util::logger::log_warn(&format!( + "⚠ Only {} TAP adapter(s) found. Installing {} more for optimal performance...", + tap_count, + amount_of_openvpn_servers - tap_count + )).await; + + // Create PowerShell script + let script_content = format!( + r#"# Auto-generated TAP adapter installation script +# Requires Administrator privileges + +$ErrorActionPreference = "Stop" + +# Check if running as Administrator +$currentPrincipal = New-Object Security.Principal.WindowsPrincipal([Security.Principal.WindowsIdentity]::GetCurrent()) +$isAdmin = $currentPrincipal.IsInRole([Security.Principal.WindowsBuiltInRole]::Administrator) + +if (-not $isAdmin) {{ + Write-Host "ERROR: This script must be run as Administrator!" -ForegroundColor Red + Write-Host "Right-click PowerShell and select 'Run as Administrator'" -ForegroundColor Yellow + exit 1 +}} + +Write-Host "Installing additional TAP adapters..." -ForegroundColor Cyan + +$tapctlPath = "C:\Program Files\OpenVPN\bin\tapctl.exe" + +if (-not (Test-Path $tapctlPath)) {{ + Write-Host "ERROR: OpenVPN not found at: $tapctlPath" -ForegroundColor Red + Write-Host "Please install OpenVPN from: https://openvpn.net/community-downloads/" -ForegroundColor Yellow + exit 1 +}} + +$existingCount = {} +$targetCount = 10 + +for ($i = ($existingCount + 1); $i -le $targetCount; $i++) {{ + Write-Host "Creating TAP adapter #$i..." -ForegroundColor Yellow + + try {{ + & $tapctlPath create --name "OpenVPN-TAP-$i" + + if ($LASTEXITCODE -eq 0) {{ + Write-Host " ✓ Created OpenVPN-TAP-$i" -ForegroundColor Green + }} else {{ + Write-Host " ⚠ Failed to create adapter (exit code: $LASTEXITCODE)" -ForegroundColor Red + }} + }} catch {{ + Write-Host " ✗ Error: $_" -ForegroundColor Red + }} + + Start-Sleep -Milliseconds 500 +}} + +Write-Host "`n✓ TAP adapter installation complete!" -ForegroundColor Green +Write-Host "Verifying installation..." -ForegroundColor Cyan + +$finalCount = (Get-NetAdapter | Where-Object {{ $_.InterfaceDescription -like "*TAP*" }}).Count +Write-Host "Total TAP adapters now: $finalCount" -ForegroundColor Cyan + +exit 0 +"#, + tap_count + ); + + let script_path = std::env::current_dir()?.join("install_tap_adapters.ps1"); + + // Write script to file + fs::write(&script_path, script_content) + .context("Failed to create PowerShell script")?; + + crate::util::logger::log_info(&format!( + "Created installation script: {:?}", + script_path + )).await; + + // Execute PowerShell script as Administrator + crate::util::logger::log_info( + "Executing TAP adapter installation (requires Administrator)..." + ).await; + + let output = Command::new("powershell") + .args(&[ + "-ExecutionPolicy", "Bypass", + "-NoProfile", + "-File", script_path.to_str().unwrap() + ]) + .output().await; + + match output { + Ok(result) => { + let stdout = String::from_utf8_lossy(&result.stdout); + let stderr = String::from_utf8_lossy(&result.stderr); + + if !stdout.is_empty() { + crate::util::logger::log_info(&format!("Script output:\n{}", stdout)).await; + } + + if result.status.success() { + crate::util::logger::log_info( + "✓ TAP adapters installed successfully" + ).await; + + // Recount adapters + tokio::time::sleep(Duration::from_secs(2)).await; + let new_count = Self::count_tap_adapters().await?; + + crate::util::logger::log_info(&format!( + "✓ Total TAP adapters now: {}", + new_count + )).await; + + return Ok(new_count); + } else { + let error_msg = if stderr.contains("Administrator") { + "Administrator privileges required. Please run as Administrator." + } else if !stderr.is_empty() { + &stderr + } else { + "Unknown error during installation" + }; + + crate::util::logger::log_error(&format!( + "✗ TAP adapter installation failed: {}", + error_msg + )).await; + + crate::util::logger::log_warn( + "Continuing with existing adapters. VPN connections will be sequential." + ).await; + + return Ok(tap_count); + } + } + Err(e) => { + crate::util::logger::log_error(&format!( + "Failed to execute installation script: {}", + e + )).await; + + crate::util::logger::log_info(&format!( + "You can manually run: {:?}", + script_path + )).await; + + return Ok(tap_count); + } + } + } + + #[cfg(not(target_os = "windows"))] + { + // Non-Windows: TAP adapters not needed + Ok(10) + } + } + + /// Counts existing TAP adapters on the system + async fn count_tap_adapters() -> Result { + #[cfg(target_os = "windows")] + { + let output = Command::new("ipconfig") + .arg("/all") + .output().await + .context("Failed to execute ipconfig")?; + + let output_str = String::from_utf8_lossy(&output.stdout); + + let count = output_str + .lines() + .filter(|line| { + let lower = line.to_lowercase(); + (lower.contains("tap") && lower.contains("adapter")) + || lower.contains("tap-windows") + }) + .count() / 2; // Two lines per adapter + + Ok(count) + } + + #[cfg(not(target_os = "windows"))] + { + Ok(10) // Not applicable on non-Windows + } + } + + async fn detect_optimal_batch_size() -> usize { + #[cfg(target_os = "windows")] + { + use std::process::Command; + + // Try to detect TAP adapters via ipconfig + match Command::new("ipconfig") + .arg("/all") + .output() + { + Ok(output) => { + let output_str = String::from_utf8_lossy(&output.stdout); + + // Count lines containing "TAP" adapter references + // Two lines per tap adapter typically + let tap_count = output_str + .lines() + .filter(|line| { + let lower = line.to_lowercase(); + (lower.contains("tap") && lower.contains("adapter")) + || lower.contains("tap-windows") + }) + .count() / 2; + + if tap_count == 0 { + crate::util::logger::log_warn( + "⚠ No TAP adapters detected! VPN connections will likely fail." + ).await; + return 1; + } else if tap_count == 1 { + crate::util::logger::log_warn(&format!( + "⚠ Only 1 TAP adapter found. VPNs will connect sequentially.\n\ + For parallel connections, install more TAP adapters:\n\ + Run as Admin: cd 'C:\\Program Files\\OpenVPN\\bin' && .\\tapctl.exe create --name 'OpenVPN-TAP-2'" + )).await; + return 1; + } else { + crate::util::logger::log_info(&format!( + "✓ Found {} TAP adapters - enabling parallel VPN connections", + tap_count + )).await; + + // Use 75% of available adapters to leave some headroom + let optimal = ((tap_count as f32 * 0.75).ceil() as usize).max(1); + return optimal.min(6); // Cap at 6 for stability + } + } + Err(e) => { + crate::util::logger::log_warn(&format!( + "⚠ Failed to detect TAP adapters: {}. Using sequential connection.", + e + )).await; + return 1; + } + } + } + + #[cfg(not(target_os = "windows"))] + { + // Linux/macOS: TAP adapters aren't a limitation + // Can run more in parallel + return 4; + } + } +} + +fn is_tcp_443_config(path: &Path) -> bool { + // Get filename as string + let filename = match path.file_name().and_then(|n| n.to_str()) { + Some(name) => name.to_lowercase(), + None => return false, + }; + + // Check if contains both "tcp" and "443" + filename.contains("tcp") && filename.contains("443") +} \ No newline at end of file diff --git a/src/scraper/vpn_rotation_system.md b/src/scraper/vpn_rotation_system.md new file mode 100644 index 0000000..1173e29 --- /dev/null +++ b/src/scraper/vpn_rotation_system.md @@ -0,0 +1,397 @@ +# VPN Rotation System - Setup Checklist + +## 🚀 Quick Setup (5 Minutes) + +Follow these steps to get your VPN rotation system up and running: + +### ✅ Step 1: Install OpenVPN + +**Windows:** +```powershell +# Download installer +# https://openvpn.net/community-downloads/ + +# Install to default location +# Add to PATH: C:\Program Files\OpenVPN\bin + +# Verify installation +openvpn --version +``` + +**Linux (Ubuntu/Debian):** +```bash +sudo apt-get update +sudo apt-get install openvpn +openvpn --version +``` + +**macOS:** +```bash +brew install openvpn +openvpn --version +``` + +### ✅ Step 2: Install ForceBindIP (Windows Only) + +```powershell +# Download from: http://r1ch.net/projects/forcebindip + +# Extract ForceBindIP.exe and place in one of: +# Option 1: Project root +.\ForceBindIP.exe + +# Option 2: Tools directory +.\tools\ForceBindIP.exe + +# Option 3: Add to PATH +C:\Program Files\ForceBindIP\ForceBindIP.exe + +# Verify installation +ForceBindIP.exe +``` + +**Linux/macOS Users:** +- ForceBindIP is Windows-only +- Use network namespaces (Linux) or alternative routing +- See documentation for workarounds + +### ✅ Step 3: Update Cargo.toml + +Add these dependencies if not already present: + +```toml +[dependencies] +anyhow = "1.0" +tokio = { version = "1.0", features = ["full"] } +fantoccini = "0.19" +reqwest = { version = "0.11", features = ["blocking"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +chrono = "0.4" +once_cell = "1.19" +dotenvy = "0.15" +url = "2.5" +zip = "0.6" +``` + +### ✅ Step 4: Configure Environment + +Create or update `.env` file in project root: + +```bash +# Required: Date ranges +ECONOMIC_START_DATE=2007-02-13 +CORPORATE_START_DATE=2010-01-01 +ECONOMIC_LOOKAHEAD_MONTHS=3 + +# Required: Parallelism +MAX_PARALLEL_INSTANCES=5 +MAX_TASKS_PER_INSTANCE=0 + +# VPN Configuration +ENABLE_VPN_ROTATION=true +TASKS_PER_VPN_SESSION=50 +``` + +**Configuration Presets:** + +**Conservative (Recommended for first run):** +```bash +MAX_PARALLEL_INSTANCES=3 +TASKS_PER_VPN_SESSION=100 +``` + +**Balanced:** +```bash +MAX_PARALLEL_INSTANCES=5 +TASKS_PER_VPN_SESSION=50 +``` + +**Aggressive (Use with caution):** +```bash +MAX_PARALLEL_INSTANCES=10 +TASKS_PER_VPN_SESSION=25 +``` + +### ✅ Step 5: Add VPN Module Files + +Copy these files to your project: + +``` +src/ +├── scraper/ +│ ├── mod.rs (update with: pub mod vpn_manager; pub mod forcebindip;) +│ ├── vpn_manager.rs (new file - from artifact) +│ ├── forcebindip.rs (new file - from artifact) +│ └── webdriver.rs (replace with VPN-enabled version) +├── util/ +│ ├── mod.rs (already includes opnv) +│ ├── opnv.rs (already present) +│ └── ... +├── main.rs (replace with VPN-enabled version) +└── lib.rs (update to expose VPN modules) +``` + +### ✅ Step 6: Verify Directory Structure + +Ensure these directories exist (will be auto-created): + +``` +project/ +├── cache/ +│ ├── openvpn/ (VPN configs stored here) +│ └── temp_vpn_zips/ (temporary, auto-cleaned) +├── logs/ (application logs) +├── data/ +│ ├── economic/ +│ └── corporate/ +└── chromedriver-win64/ + └── chromedriver.exe +``` + +### ✅ Step 7: Test Installation + +**Test 1: OpenVPN** +```bash +openvpn --version +# Should output version info +``` + +**Test 2: ForceBindIP (Windows)** +```powershell +ForceBindIP.exe 127.0.0.1 cmd.exe /c echo test +# Should output: test +``` + +**Test 3: Build Project** +```bash +cargo build --release +# Should compile without errors +``` + +**Test 4: Dry Run (No VPN)** +```bash +# Temporarily disable VPN +# Set in .env: ENABLE_VPN_ROTATION=false + +cargo run --release +# Should initialize ChromeDriver pool and run +``` + +### ✅ Step 8: First VPN-Enabled Run + +```bash +# Enable VPN in .env +ENABLE_VPN_ROTATION=true +TASKS_PER_VPN_SESSION=0 # Start with phase rotation only + +# Run application +cargo run --release + +# Watch logs +tail -f logs/backtest_*.log +``` + +**Expected Output:** +``` +[HH:MM:SS] [INFO] === Application started === +[HH:MM:SS] [INFO] === VPN Rotation Enabled === +[HH:MM:SS] [INFO] --- Fetching latest VPNBook OpenVPN configurations --- +[HH:MM:SS] [INFO] ✓ Fetched VPN credentials - Username: vpnbook +[HH:MM:SS] [INFO] ✓ Downloaded 6 .ovpn configuration files +[HH:MM:SS] [INFO] --- Initializing VPN Pool --- +[HH:MM:SS] [INFO] Found 6 OpenVPN configurations +[HH:MM:SS] [INFO] --- Connecting to VPN servers --- +[HH:MM:SS] [INFO] Starting VPN connection for ca149.vpnbook.com +[HH:MM:SS] [INFO] ✓ VPN ca149.vpnbook.com connected with IP: 142.4.217.133 +... +[HH:MM:SS] [INFO] ✓ ChromeDriver pool initialized successfully +``` + +## 🎯 Common Issues and Solutions + +### Issue: "openvpn: command not found" +```bash +# Windows: Add to PATH +setx PATH "%PATH%;C:\Program Files\OpenVPN\bin" + +# Linux: Install package +sudo apt-get install openvpn + +# Verify +which openvpn +``` + +### Issue: "ForceBindIP.exe not found" +```powershell +# Place in project root +curl -o ForceBindIP.exe http://r1ch.net/projects/forcebindip/ForceBindIP.exe + +# Or add to PATH +setx PATH "%PATH%;C:\path\to\ForceBindIP" +``` + +### Issue: VPN Connection Timeout +```bash +# Try different config file +# VPNBook offers multiple servers/protocols +# Look in cache/openvpn/ after first fetch + +# Files named like: +# - vpnbook-ca149-tcp80.ovpn (TCP port 80 - most compatible) +# - vpnbook-ca149-tcp443.ovpn (TCP port 443 - works through most firewalls) +# - vpnbook-ca149-udp53.ovpn (UDP port 53 - faster but may be blocked) + +# Check firewall settings +# - Allow OpenVPN.exe through Windows Firewall +# - Allow outbound connections on ports 80, 443, 53, 1194 +``` + +### Issue: "Failed to spawn chromedriver" +```bash +# Verify chromedriver path +ls chromedriver-win64/chromedriver.exe + +# Check Chrome/ChromeDriver version match +chromedriver.exe --version +# Chrome version should be compatible + +# Update ChromeDriver if needed +# Download from: https://chromedriver.chromium.org/ +``` + +### Issue: "Semaphore closed" +```bash +# Reduce parallelism in .env +MAX_PARALLEL_INSTANCES=3 + +# Or increase system resources +# Check Task Manager / Activity Monitor +``` + +## 📊 Performance Tuning + +### Optimize for Speed +```bash +MAX_PARALLEL_INSTANCES=10 +TASKS_PER_VPN_SESSION=100 +# More instances, less frequent rotation +# Risk: More aggressive, may hit rate limits +``` + +### Optimize for Stealth +```bash +MAX_PARALLEL_INSTANCES=2 +TASKS_PER_VPN_SESSION=10 +# Fewer instances, frequent rotation +# Risk: Slower, but more IP diversity +``` + +### Optimize for Stability +```bash +MAX_PARALLEL_INSTANCES=5 +TASKS_PER_VPN_SESSION=50 +# Balanced approach (recommended) +``` + +## 🔍 Monitoring and Logs + +### Key Log Files +``` +logs/ +└── backtest_YYYYMMDD_HHMMSS.log +``` + +### Important Log Patterns + +**Successful VPN Connection:** +``` +[INFO] ✓ VPN ca149.vpnbook.com connected with IP: 142.4.217.133 +``` + +**VPN Rotation:** +``` +[INFO] ✓ VPN ca149.vpnbook.com rotated: 142.4.217.133 -> 142.4.217.201 +``` + +**Health Issues:** +``` +[WARN] ⚠ Health check failed for VPN us1.vpnbook.com +[INFO] Attempting to reconnect unhealthy VPN: us1.vpnbook.com +``` + +**Binding ChromeDriver:** +``` +[INFO] Binding ChromeDriver to VPN IP: 142.4.217.133 +``` + +### Monitor Real-Time +```bash +# Linux/macOS +tail -f logs/backtest_*.log + +# Windows PowerShell +Get-Content logs\backtest_*.log -Wait -Tail 50 +``` + +### Search Logs +```bash +# Count successful connections +grep "connected with IP" logs/*.log | wc -l + +# Find errors +grep ERROR logs/*.log + +# Track rotations +grep "rotated:" logs/*.log + +# Find failed tasks +grep "failed" logs/*.log +``` + +## 🚦 Next Steps + +1. **✅ Complete Setup**: Verify all checkboxes above +2. **🧪 Test Run**: Run with `TASKS_PER_VPN_SESSION=0` first +3. **📊 Monitor**: Watch logs during first run +4. **⚙️ Tune**: Adjust configuration based on results +5. **🔄 Iterate**: Increase parallelism gradually +6. **📈 Scale**: Once stable, increase to production levels + +## 📚 Additional Resources + +- **VPNBook Website**: https://www.vpnbook.com/freevpn +- **OpenVPN Docs**: https://openvpn.net/community-resources/ +- **ForceBindIP**: http://r1ch.net/projects/forcebindip +- **ChromeDriver**: https://chromedriver.chromium.org/ + +## 🆘 Getting Help + +If you encounter issues: + +1. **Check Prerequisites**: Verify all software is installed +2. **Review Logs**: Look in `logs/` directory +3. **Test Components**: Test OpenVPN and ForceBindIP independently +4. **Simplify**: Start with `ENABLE_VPN_ROTATION=false` +5. **Document Error**: Note exact error message and context + +## 🎉 Success Criteria + +You're ready to proceed when you see: + +``` +✓ OpenVPN installed and in PATH +✓ ForceBindIP.exe accessible (Windows) +✓ Project compiles successfully +✓ VPN configurations fetched +✓ All VPNs connected +✓ ChromeDriver pool initialized +✓ First scraping task completed +``` + +**Congratulations! Your VPN rotation system is now operational.** + +--- + +*Last Updated: December 2024* +*Version: 1.0* \ No newline at end of file diff --git a/src/scraper/webdriver.rs b/src/scraper/webdriver.rs index 3037b01..f0e7a38 100644 --- a/src/scraper/webdriver.rs +++ b/src/scraper/webdriver.rs @@ -8,52 +8,104 @@ use std::process::Stdio; use std::sync::Arc; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::{Child, Command}; +use tokio::task::JoinHandle; use tokio::sync::{Mutex, Semaphore}; use tokio::time::{sleep, timeout, Duration}; -/// Manages a pool of ChromeDriver instances for parallel scraping. -/// -/// This struct maintains multiple ChromeDriver processes and allows controlled -/// concurrent access via a semaphore. Instances are reused across tasks to avoid -/// the overhead of spawning new processes. +use super::vpn_manager::{VpnInstance, VpnPool}; + +#[cfg(target_os = "windows")] +use super::forcebindip::ForceBindIpManager; + +/// Manages a pool of ChromeDriver instances for parallel scraping with optional VPN binding. pub struct ChromeDriverPool { instances: Vec>>, semaphore: Arc, - tasks_per_instance: usize, + vpn_pool: Option>, + #[cfg(target_os = "windows")] + forcebindip: Option>, } impl ChromeDriverPool { - /// Creates a new pool with the specified number of ChromeDriver instances. - /// - /// # Arguments - /// * `pool_size` - Number of concurrent ChromeDriver instances to maintain + /// Creates a new pool with the specified number of ChromeDriver instances (no VPN). pub async fn new(pool_size: usize) -> Result { + Self::new_with_vpn_and_task_limit(pool_size, None, 0).await + } + + /// Creates a new ChromeDriver pool with task-per-instance tracking. + pub async fn new_with_task_limit( + pool_size: usize, + max_tasks_per_instance: usize, + ) -> Result { + Self::new_with_vpn_and_task_limit(pool_size, None, max_tasks_per_instance).await + } + + /// Creates a new pool with VPN support. + pub async fn new_with_vpn( + pool_size: usize, + vpn_pool: Option>, + ) -> Result { + Self::new_with_vpn_and_task_limit(pool_size, vpn_pool, 0).await + } + + /// Creates a new pool with VPN support and task-per-instance limits. + pub async fn new_with_vpn_and_task_limit( + pool_size: usize, + vpn_pool: Option>, + max_tasks_per_instance: usize, + ) -> Result { let mut instances = Vec::with_capacity(pool_size); - println!( - "Initializing ChromeDriver pool with {} instances...", - pool_size - ); - - for i in 0..pool_size { - match ChromeInstance::new().await { - Ok(instance) => { - println!(" ✓ Instance {} ready", i + 1); - instances.push(Arc::new(Mutex::new(instance))); + #[cfg(target_os = "windows")] + let forcebindip = if vpn_pool.is_some() { + match ForceBindIpManager::new() { + Ok(manager) => { + crate::util::logger::log_info("✓ ForceBindIP manager initialized").await; + Some(Arc::new(manager)) } Err(e) => { - eprintln!(" ✗ Failed to create instance {}: {}", i + 1, e); - // Clean up already created instances - drop(instances); - return Err(e); + crate::util::logger::log_warn(&format!( + "⚠ ForceBindIP not available: {}. Proceeding without IP binding.", + e + )).await; + None } } + } else { + None + }; + + crate::util::logger::log_info(&format!( + "Initializing ChromeDriver pool with {} instances{}{}...", + pool_size, + if vpn_pool.is_some() { " (VPN-enabled)" } else { "" }, + if max_tasks_per_instance > 0 { &format!(" (max {} tasks/instance)", max_tasks_per_instance) } else { "" } + )).await; + + for i in 0..pool_size { + // If VPN pool exists, acquire a VPN instance for this ChromeDriver + let vpn_instance = if let Some(ref vp) = vpn_pool { + Some(vp.acquire().await?) + } else { + None + }; + + #[cfg(target_os = "windows")] + let instance = ChromeInstance::new_with_task_limit(vpn_instance, forcebindip.clone(), max_tasks_per_instance).await?; + + #[cfg(not(target_os = "windows"))] + let instance = ChromeInstance::new_with_task_limit(vpn_instance, max_tasks_per_instance).await?; + + crate::util::logger::log_info(&format!(" ✓ Instance {} ready", i + 1)).await; + instances.push(Arc::new(Mutex::new(instance))); } Ok(Self { instances, semaphore: Arc::new(Semaphore::new(pool_size)), - tasks_per_instance: 0, + vpn_pool, + #[cfg(target_os = "windows")] + forcebindip, }) } @@ -72,9 +124,31 @@ impl ChromeDriverPool { .map_err(|_| anyhow!("Semaphore closed"))?; // Find an available instance (round-robin or first available) - let instance = self.instances[0].clone(); // Simple: use first, could be round-robin + let instance = self.instances[0].clone(); let mut guard = instance.lock().await; + // Track task count + guard.increment_task_count(); + + // Get VPN info before creating session + let vpn_info = if let Some(ref vpn) = guard.vpn_instance { + let vpn_guard = vpn.lock().await; + Some(format!("{} ({})", + vpn_guard.hostname(), + vpn_guard.external_ip().unwrap_or("unknown"))) + } else { + None + }; + + // Log task count if limit is set + if guard.max_tasks_per_instance > 0 { + crate::util::logger::log_info(&format!( + "Instance task count: {}/{}", + guard.get_task_count(), + guard.max_tasks_per_instance + )).await; + } + // Create a new session for this task let client = guard.new_session().await?; @@ -82,40 +156,137 @@ impl ChromeDriverPool { drop(guard); // Navigate and parse + if let Some(ref info) = vpn_info { + crate::util::logger::log_info(&format!("Scraping {} via VPN: {}", url, info)).await; + } + client.goto(&url).await.context("Failed to navigate")?; let result = timeout(Duration::from_secs(60), parse(client)) .await .context("Parse function timed out after 60s")??; + // Handle VPN rotation if needed + if let Some(ref vpn_pool) = self.vpn_pool { + let mut guard = instance.lock().await; + if let Some(ref vpn) = guard.vpn_instance { + vpn_pool.rotate_if_needed(vpn.clone()).await?; + guard.reset_task_count(); // Reset task count on VPN rotation + } + } + Ok(result) } pub fn get_number_of_instances(&self) -> usize { self.instances.len() } + + /// Returns whether VPN is enabled for this pool + pub fn is_vpn_enabled(&self) -> bool { + self.vpn_pool.is_some() + } + + /// Gracefully shutdown all ChromeDriver instances in the pool. + pub async fn shutdown(&self) -> Result<()> { + crate::util::logger::log_info("Shutting down ChromeDriverPool instances...").await; + for inst in &self.instances { + crate::util::logger::log_info("Shutting down a ChromeDriver instance...").await; + let mut guard = inst.lock().await; + if let Err(e) = guard.shutdown().await { + crate::util::logger::log_warn(&format!("Error shutting down instance: {}", e)).await; + } + } + crate::util::logger::log_info("All ChromeDriver instances shut down").await; + Ok(()) + } } -/// Represents a single instance of chromedriver process. +/// Represents a single instance of chromedriver process, optionally bound to a VPN. pub struct ChromeInstance { process: Child, base_url: String, + vpn_instance: Option>>, + task_count: usize, + max_tasks_per_instance: usize, + // Optional join handle for background stderr logging task + stderr_log: Option>, } impl ChromeInstance { - /// Creates a new ChromeInstance by spawning chromedriver with random port. - /// - /// This spawns `chromedriver --port=0` to avoid port conflicts, reads stdout to extract - /// the listening address, and waits for the success message. If timeout occurs or - /// spawning fails, returns an error with context. - /// - /// # Errors - /// - /// Returns an error if chromedriver fails to spawn (e.g., not in PATH, version mismatch), - /// if the process exits early, or if the address/success message isn't found within 30s. - pub async fn new() -> Result { - let mut command = Command::new("chromedriver-win64/chromedriver.exe"); + /// Creates a new ChromeInstance, optionally bound to a VPN IP. + #[cfg(target_os = "windows")] + pub async fn new( + vpn_instance: Option>>, + forcebindip: Option>, + ) -> Result { + Self::new_with_task_limit(vpn_instance, forcebindip, 0).await + } + + /// Creates a new ChromeInstance with task-per-instance limit, bound to a VPN IP if provided. + #[cfg(target_os = "windows")] + pub async fn new_with_task_limit( + vpn_instance: Option>>, + forcebindip: Option>, + max_tasks_per_instance: usize, + ) -> Result { + let bind_ip = if let Some(ref vpn) = vpn_instance { + let vpn_guard = vpn.lock().await; + vpn_guard.external_ip().map(|s| s.to_string()) + } else { + None + }; + + let mut command = if let (Some(ip), Some(fb)) = (&bind_ip, &forcebindip) { + // Use ForceBindIP to bind ChromeDriver to specific VPN IP + crate::util::logger::log_info(&format!("Binding ChromeDriver to VPN IP: {}", ip)).await; + let mut std_cmd = fb.create_bound_command( + ip, + std::path::Path::new("chromedriver-win64/chromedriver.exe"), + &["--port=0"], + ); + Command::from(std_cmd) + } else { + let mut cmd = Command::new("chromedriver-win64/chromedriver.exe"); + cmd.arg("--port=0"); + cmd + }; + + command.stdout(Stdio::piped()).stderr(Stdio::piped()); + + let mut process = command + .spawn() + .context("Failed to spawn chromedriver. Ensure it's installed and in PATH.")?; + + let (base_url, stderr_handle) = Self::wait_for_chromedriver_start(&mut process).await?; + + Ok(Self { + process, + base_url, + vpn_instance, + task_count: 0, + max_tasks_per_instance, + stderr_log: stderr_handle, + }) + } + + /// Creates a new ChromeInstance on non-Windows platforms (no ForceBindIP support). + #[cfg(not(target_os = "windows"))] + pub async fn new(vpn_instance: Option>>) -> Result { + Self::new_with_task_limit(vpn_instance, 0).await + } + + /// Creates a new ChromeInstance on non-Windows platforms with task-per-instance limit. + #[cfg(not(target_os = "windows"))] + pub async fn new_with_task_limit(vpn_instance: Option>>, max_tasks_per_instance: usize) -> Result { + if vpn_instance.is_some() { + crate::util::logger::log_warn( + "⚠ VPN binding requested but ForceBindIP is not available on this platform" + ).await; + } + + let mut command = Command::new("chromedriver"); command - .arg("--port=0") // Use random available port to support pooling + .arg("--port=0") .stdout(Stdio::piped()) .stderr(Stdio::piped()); @@ -123,20 +294,38 @@ impl ChromeInstance { .spawn() .context("Failed to spawn chromedriver. Ensure it's installed and in PATH.")?; + let (base_url, stderr_handle) = Self::wait_for_chromedriver_start(&mut process).await?; + + Ok(Self { + process, + base_url, + vpn_instance, + task_count: 0, + max_tasks_per_instance, + stderr_log: stderr_handle, + }) + } + + /// Waits for ChromeDriver to start and extracts the listening address. + async fn wait_for_chromedriver_start(process: &mut Child) -> Result<(String, Option>)> { let mut stdout = BufReader::new(process.stdout.take().context("Failed to capture stdout")?).lines(); - let mut stderr = - BufReader::new(process.stderr.take().context("Failed to capture stderr")?).lines(); + let stderr_reader = process.stderr.take().context("Failed to capture stderr")?; let start_time = std::time::Instant::now(); let mut address: Option = None; let mut success = false; - // Log stderr in background for debugging - tokio::spawn(async move { - while let Ok(Some(line)) = stderr.next_line().await { - eprintln!("ChromeDriver stderr: {}", line); + // Log stderr in background for debugging and return the JoinHandle so we can + // abort/await it during shutdown. + let stderr_handle: JoinHandle<()> = tokio::spawn(async move { + let mut stderr_lines = BufReader::new(stderr_reader).lines(); + while let Ok(Some(line)) = stderr_lines.next_line().await { + let trimmed = line.trim(); + if !trimmed.is_empty() { + crate::util::logger::log_info(&format!("ChromeDriver stderr: {}", trimmed)).await; + } } }); @@ -152,10 +341,7 @@ impl ChromeInstance { } if let (Some(addr), true) = (&address, success) { - return Ok(Self { - process, - base_url: addr.clone(), - }); + return Ok((addr.clone(), Some(stderr_handle))); } } @@ -164,11 +350,13 @@ impl ChromeInstance { // Cleanup on failure let _ = process.kill().await; - Err(anyhow!("Timeout: ChromeDriver did not start within 30 seconds. Check version match with Chrome browser and system resources.")) + // If we timed out, abort stderr logging task + stderr_handle.abort(); + let _ = stderr_handle.await; + Err(anyhow!("Timeout: ChromeDriver did not start within 30 seconds")) } /// Creates a new browser session (client) from this ChromeDriver instance. - /// Each session is independent and can be closed without affecting the driver. pub async fn new_session(&self) -> Result { ClientBuilder::native() .capabilities(Self::chrome_args()) @@ -177,11 +365,47 @@ impl ChromeInstance { .context("Failed to create new session") } + /// Increments task counter and returns whether limit has been reached + pub fn increment_task_count(&mut self) -> bool { + if self.max_tasks_per_instance > 0 { + self.task_count += 1; + self.task_count >= self.max_tasks_per_instance + } else { + false + } + } + + /// Resets task counter (called when VPN is rotated) + pub fn reset_task_count(&mut self) { + self.task_count = 0; + } + + /// Returns current task count for this instance + pub fn get_task_count(&self) -> usize { + self.task_count + } + + /// Gracefully shutdown the chromedriver process and background log tasks. + pub async fn shutdown(&mut self) -> Result<()> { + // Abort and await stderr logging task if present + if let Some(handle) = self.stderr_log.take() { + handle.abort(); + let _ = handle.await; + } + + // Try to terminate the child process + let _ = self.process.start_kill(); + // Await the process to ensure resources are released + let _ = self.process.wait().await; + + Ok(()) + } + fn chrome_args() -> Map { let args = serde_json::json!({ "goog:chromeOptions": { "args": [ - "--headless=new", + "--headless", "--disable-gpu", "--no-sandbox", "--disable-dev-shm-usage", @@ -191,13 +415,14 @@ impl ChromeInstance { "--disable-notifications", "--disable-logging", "--disable-autofill", - "--disable-features=TranslateUI,OptimizationGuideModelDownloading", + "--disable-sync", + "--disable-default-apps", + "--disable-translate", "--window-size=1920,1080", "--disable-blink-features=AutomationControlled", - "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" + "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" ], "excludeSwitches": ["enable-logging", "enable-automation"], - "useAutomationExtension": false, "prefs": { "profile.default_content_setting_values.notifications": 2 } @@ -209,10 +434,6 @@ impl ChromeInstance { } } -/// Parses the ChromeDriver address from a log line. -/// -/// Looks for the "Starting ChromeDriver ... on port XXXX" line and extracts the port. -/// Returns `Some("http://localhost:XXXX")` if found, else `None`. fn parse_chromedriver_address(line: &str) -> Option { if line.contains("Starting ChromeDriver") { if let Some(port_str) = line.split("on port ").nth(1) { @@ -223,7 +444,6 @@ fn parse_chromedriver_address(line: &str) -> Option { } } } - // Fallback for other formats (e.g., explicit port mentions) for word in line.split_whitespace() { if let Ok(port) = word.trim_matches(|c: char| !c.is_numeric()).parse::() { if port > 1024 && port < 65535 && line.to_lowercase().contains("port") { @@ -236,14 +456,13 @@ fn parse_chromedriver_address(line: &str) -> Option { impl Drop for ChromeInstance { fn drop(&mut self) { + // Signal child to terminate. Do NOT block here; shutdown should be + // performed with the async `shutdown()` method when possible. let _ = self.process.start_kill(); - std::thread::sleep(std::time::Duration::from_millis(100)); } } -/// Simplified task execution - now uses the pool pattern. -/// -/// For backwards compatibility with existing code. +/// Simplified task execution - uses the pool pattern. pub struct ScrapeTask { url: String, parse: Box< @@ -263,7 +482,6 @@ impl ScrapeTask { } } - /// Executes using a provided pool (more efficient for multiple tasks). pub async fn execute_with_pool(self, pool: &ChromeDriverPool) -> Result { let url = self.url; let parse = self.parse; @@ -271,4 +489,4 @@ impl ScrapeTask { pool.execute(url, move |client| async move { (parse)(client).await }) .await } -} +} \ No newline at end of file diff --git a/src/util/directories.rs b/src/util/directories.rs index e81abdf..07a3d8e 100644 --- a/src/util/directories.rs +++ b/src/util/directories.rs @@ -1,8 +1,6 @@ use std::path::{Path, PathBuf}; use std::fs; -use crate::util::opnv; - /// Central configuration for all data paths pub struct DataPaths { base_dir: PathBuf, diff --git a/test/vpn_integration_tests.rs b/test/vpn_integration_tests.rs new file mode 100644 index 0000000..5fc2b76 --- /dev/null +++ b/test/vpn_integration_tests.rs @@ -0,0 +1,379 @@ +// tests/vpn_integration_tests.rs +//! Integration tests for VPN rotation system + +#[cfg(test)] +mod vpn_tests { + use event_backtest_engine::{ + scraper::{ + webdriver::ChromeDriverPool, + vpn_manager::{VpnInstance, VpnPool}, + }, + util::{directories::DataPaths, opnv}, + }; + use std::path::PathBuf; + use std::sync::Arc; + + /// Helper to create a test VPN instance without connecting + fn create_test_vpn_instance() -> VpnInstance { + VpnInstance::new( + PathBuf::from("test.ovpn"), + "testuser".to_string(), + "testpass".to_string(), + ) + .expect("Failed to create test VPN instance") + } + + #[test] + fn test_vpn_instance_creation() { + let vpn = create_test_vpn_instance(); + assert_eq!(vpn.hostname(), "test"); + assert!(!vpn.is_healthy()); + assert!(vpn.external_ip().is_none()); + } + + #[test] + fn test_vpn_task_counting() { + let mut vpn = create_test_vpn_instance(); + + // Should not rotate initially + assert!(!vpn.increment_task_count(10)); + + // Increment tasks + for i in 1..10 { + assert!(!vpn.increment_task_count(10), "Should not rotate at task {}", i); + } + + // Should rotate at threshold + assert!(vpn.increment_task_count(10), "Should rotate at task 10"); + + // Reset and verify + vpn.reset_task_count(); + assert!(!vpn.increment_task_count(10), "Should not rotate after reset"); + } + + #[test] + fn test_vpn_task_counting_zero_threshold() { + let mut vpn = create_test_vpn_instance(); + + // With threshold=0, should never auto-rotate + for _ in 0..100 { + assert!(!vpn.increment_task_count(0)); + } + } + + #[tokio::test] + async fn test_chromedriver_pool_creation_no_vpn() { + let result = ChromeDriverPool::new(2).await; + + match result { + Ok(pool) => { + assert_eq!(pool.get_number_of_instances(), 2); + assert!(!pool.is_vpn_enabled()); + } + Err(e) => { + eprintln!("ChromeDriver pool creation failed (expected if chromedriver not installed): {}", e); + } + } + } + + #[test] + fn test_data_paths_creation() { + let paths = DataPaths::new("./test_data").expect("Failed to create paths"); + + assert!(paths.data_dir().exists()); + assert!(paths.cache_dir().exists()); + assert!(paths.logs_dir().exists()); + assert!(paths.cache_openvpn_dir().exists()); + + // Cleanup + let _ = std::fs::remove_dir_all("./test_data"); + } + + #[tokio::test] + #[ignore] // This test requires actual network access and VPNBook availability + async fn test_fetch_vpnbook_configs() { + let paths = DataPaths::new(".").expect("Failed to create paths"); + + // This test requires a ChromeDriver pool + let pool_result = ChromeDriverPool::new(1).await; + if pool_result.is_err() { + eprintln!("Skipping VPNBook fetch test: ChromeDriver not available"); + return; + } + + let pool = Arc::new(pool_result.unwrap()); + + let result = opnv::fetch_vpnbook_configs(&pool, paths.cache_dir()).await; + + match result { + Ok((username, password, files)) => { + assert!(!username.is_empty(), "Username should not be empty"); + assert!(!password.is_empty(), "Password should not be empty"); + assert!(!files.is_empty(), "Should fetch at least one config file"); + + println!("Fetched {} VPN configs", files.len()); + for file in &files { + assert!(file.exists(), "Config file should exist: {:?}", file); + assert_eq!(file.extension().and_then(|s| s.to_str()), Some("ovpn")); + } + } + Err(e) => { + eprintln!("VPNBook fetch failed (may be temporary): {}", e); + } + } + } + + #[tokio::test] + #[ignore] // Requires actual VPN configs and OpenVPN installation + async fn test_vpn_pool_creation() { + let paths = DataPaths::new(".").expect("Failed to create paths"); + + // First fetch configs + let pool_result = ChromeDriverPool::new(1).await; + if pool_result.is_err() { + eprintln!("Skipping VPN pool test: ChromeDriver not available"); + return; + } + + let temp_pool = Arc::new(pool_result.unwrap()); + let fetch_result = opnv::fetch_vpnbook_configs(&temp_pool, paths.cache_dir()).await; + + if fetch_result.is_err() { + eprintln!("Skipping VPN pool test: Could not fetch configs"); + return; + } + + let (username, password, _) = fetch_result.unwrap(); + + // Create VPN pool + let vpn_pool_result = VpnPool::new( + paths.cache_openvpn_dir(), + username, + password, + false, + 0, + ).await; + + match vpn_pool_result { + Ok(vpn_pool) => { + assert!(vpn_pool.len() > 0, "VPN pool should have at least one instance"); + println!("Created VPN pool with {} instances", vpn_pool.len()); + } + Err(e) => { + eprintln!("VPN pool creation failed: {}", e); + } + } + } + + #[tokio::test] + #[ignore] // Full integration test - requires all components + async fn test_full_vpn_integration() { + let paths = DataPaths::new(".").expect("Failed to create paths"); + + // Step 1: Create temp ChromeDriver pool for fetching + let temp_pool = match ChromeDriverPool::new(1).await { + Ok(p) => Arc::new(p), + Err(e) => { + eprintln!("Skipping integration test: ChromeDriver not available - {}", e); + return; + } + }; + + // Step 2: Fetch VPNBook configs + let (username, password, files) = match opnv::fetch_vpnbook_configs( + &temp_pool, + paths.cache_dir() + ).await { + Ok(result) => result, + Err(e) => { + eprintln!("Skipping integration test: Config fetch failed - {}", e); + return; + } + }; + + assert!(!files.is_empty(), "Should have fetched configs"); + + // Step 3: Create VPN pool + let vpn_pool = match VpnPool::new( + paths.cache_openvpn_dir(), + username, + password, + true, + 5, + ).await { + Ok(pool) => Arc::new(pool), + Err(e) => { + eprintln!("Skipping integration test: VPN pool creation failed - {}", e); + return; + } + }; + + // Step 4: Connect one VPN + let vpn_instance = vpn_pool.acquire().await.expect("Failed to acquire VPN"); + let connect_result = { + let mut vpn = vpn_instance.lock().await; + vpn.connect().await + }; + + match connect_result { + Ok(_) => { + let vpn = vpn_instance.lock().await; + println!("✓ VPN connected: {} ({})", + vpn.hostname(), + vpn.external_ip().unwrap_or("unknown") + ); + assert!(vpn.is_healthy()); + assert!(vpn.external_ip().is_some()); + } + Err(e) => { + eprintln!("VPN connection failed: {}", e); + } + } + + // Step 5: Create ChromeDriver pool with VPN + let driver_pool_result = ChromeDriverPool::new_with_vpn( + 1, + Some(vpn_pool.clone()) + ).await; + + match driver_pool_result { + Ok(driver_pool) => { + assert!(driver_pool.is_vpn_enabled()); + println!("✓ ChromeDriver pool created with VPN binding"); + } + Err(e) => { + eprintln!("ChromeDriver pool creation failed: {}", e); + } + } + + // Step 6: Cleanup + vpn_pool.disconnect_all().await.expect("Failed to disconnect VPNs"); + println!("✓ Integration test complete"); + } + + #[test] + fn test_hostname_extraction() { + // Test the hostname extraction logic + let test_cases = vec![ + ("test/ca149.vpnbook.com/config.ovpn", "ca149.vpnbook.com"), + ("test/us1.vpnbook.com/config.ovpn", "us1.vpnbook.com"), + ("test/de4.vpnbook.com/config.ovpn", "de4.vpnbook.com"), + ]; + + for (path, expected_hostname) in test_cases { + let pb = PathBuf::from(path); + let hostname = pb.parent() + .and_then(|p| p.file_name()) + .and_then(|n| n.to_str()) + .unwrap_or("unknown"); + + assert_eq!(hostname, expected_hostname); + } + } + + #[cfg(target_os = "windows")] + #[test] + fn test_forcebindip_manager_creation() { + use event_backtest_engine::ForceBindIpManager; + + match ForceBindIpManager::new() { + Ok(manager) => { + println!("✓ ForceBindIP found at: {:?}", manager.path()); + assert!(manager.path().exists()); + } + Err(e) => { + eprintln!("ForceBindIP not found (expected in dev): {}", e); + } + } + } + + #[cfg(target_os = "windows")] + #[test] + fn test_forcebindip_command_creation() { + use event_backtest_engine::ForceBindIpManager; + use std::path::Path; + + if let Ok(manager) = ForceBindIpManager::new() { + let cmd = manager.create_bound_command( + "192.168.1.100", + Path::new("test.exe"), + &["--arg1", "value1"], + ); + + let cmd_str = format!("{:?}", cmd); + assert!(cmd_str.contains("192.168.1.100")); + assert!(cmd_str.contains("test.exe")); + println!("✓ ForceBindIP command created successfully"); + } + } + + #[test] + fn test_config_defaults() { + use event_backtest_engine::Config; + + let config = Config::default(); + assert_eq!(config.economic_start_date, "2007-02-13"); + assert_eq!(config.corporate_start_date, "2010-01-01"); + assert_eq!(config.economic_lookahead_months, 3); + assert_eq!(config.max_parallel_instances, 10); + assert!(!config.enable_vpn_rotation); + assert_eq!(config.tasks_per_vpn_session, 0); + } +} + +#[cfg(test)] +mod benchmark_tests { + use super::*; + + #[tokio::test] + #[ignore] // Performance test + async fn benchmark_vpn_rotation_overhead() { + use std::time::Instant; + + // This test measures the overhead of VPN rotation + let start = Instant::now(); + + // Simulate rotation cycle + // 1. Disconnect (instant) + // 2. Wait 2 seconds + // 3. Connect (5-10 seconds) + // 4. Verify IP (1-2 seconds) + + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + + let elapsed = start.elapsed(); + println!("Rotation cycle took: {:?}", elapsed); + + // Typical rotation should complete in under 15 seconds + assert!(elapsed.as_secs() < 15); + } + + #[tokio::test] + #[ignore] // Performance test + async fn benchmark_parallel_scraping() { + // This test measures throughput with different parallelism levels + // Results help tune MAX_PARALLEL_INSTANCES + + let configs = vec![1, 2, 3, 5, 10]; + + for &pool_size in &configs { + println!("Testing with {} parallel instances...", pool_size); + + // Would need actual scraping implementation here + // For now, just verify pool creation time + let start = std::time::Instant::now(); + + let pool_result = event_backtest_engine::ChromeDriverPool::new(pool_size).await; + + if let Ok(_pool) = pool_result { + let elapsed = start.elapsed(); + println!(" Pool initialization: {:?}", elapsed); + + // Pool creation should be fast (< 5 seconds per instance) + assert!(elapsed.as_secs() < pool_size as u64 * 5); + } else { + eprintln!(" Skipped - ChromeDriver not available"); + } + } + } +} \ No newline at end of file