implement vpn pool

This commit is contained in:
2025-12-11 23:18:04 +01:00
parent 470f0922ed
commit 1bda78897b
14 changed files with 703 additions and 2680 deletions

View File

@@ -1,4 +1,5 @@
// src/main.rs
mod config;
mod corporate;
mod economic;
@@ -7,163 +8,162 @@ mod scraper;
use anyhow::Result;
use config::Config;
use scraper::docker_vpn_proxy::{DockerVpnProxyPool, cleanup_all_proxy_containers};
use scraper::webdriver::ChromeDriverPool;
use scraper::vpn_manager::VpnPool;
use util::directories::DataPaths;
use util::{logger, opnv};
use std::sync::Arc;
/// The entry point of the application.
///
/// This function loads the configuration, optionally initializes a VPN pool,
/// initializes a shared ChromeDriver pool bound to the VPN pool (if enabled),
/// and sequentially runs the full updates for corporate and economic data.
///
/// If VPN rotation is enabled:
/// 1. Fetches latest VPNBook OpenVPN configurations
/// 2. Creates a VPN pool and connects all VPN instances
/// 3. Binds each ChromeDriver instance to a different VPN for IP rotation
/// 4. Performs periodic health checks to reconnect unhealthy VPN instances
///
/// # Errors
///
/// Returns an error if configuration loading fails, pool initialization fails,
/// VPN fetching fails (if enabled), or if either update function encounters an issue
/// (e.g., network errors, scraping failures, or chromedriver spawn failures).
/// Application entry point
// src/main.rs
// ... existing imports ...
// NOTE(review): the body below appears to interleave the pre-change and
// post-change lines of a commit diff (e.g. both a `vpn_pool`/`VpnPool` path and
// a `proxy_pool`/`DockerVpnProxyPool` path are present, several log lines are
// duplicated, and the braces do not balance). As shown it is not a single
// coherent version of `main` — confirm against the real src/main.rs before
// relying on any one line.
#[tokio::main]
async fn main() -> Result<()> {
// Called before anything else; any error it returns is discarded by `.ok()`.
cleanup_all_proxy_containers().await.ok();
// Load configuration from .env
let config = Config::load().map_err(|err| {
// NOTE(review): both a println! and an eprintln! variant of this message
// appear here — presumably the old and new form of the same line.
println!("Failed to load Config .env: {}", err);
eprintln!("Failed to load config: {}", err);
err
})?;
// Initialize paths
// Initialize paths and logger
let paths = DataPaths::new(".")?;
// Initialize logger
logger::init_debug_logger(paths.logs_dir()).await.map_err(|e| {
anyhow::anyhow!("Logger initialization failed: {}", e)
})?;
logger::log_info("=== Application started ===").await;
logger::log_info(&format!("Config: economic_start_date={}, corporate_start_date={}, lookahead_months={}, max_parallel_instances={}, enable_vpn_rotation={}, max_tasks_per_instance={}",
config.economic_start_date, config.corporate_start_date, config.economic_lookahead_months, config.max_parallel_instances, config.enable_vpn_rotation, config.max_tasks_per_instance)).await;
// Initialize VPN pool if enabled
let vpn_pool = if config.enable_vpn_rotation {
logger::log_info("=== VPN Rotation Enabled ===").await;
logger::log_info("--- Fetching latest VPNBook OpenVPN configurations ---").await;
// A one-instance ChromeDriverPool is created inline just for this fetch.
let (username, password, _files) =
util::opnv::fetch_vpnbook_configs(&Arc::new(ChromeDriverPool::new(1).await?), paths.cache_dir()).await?;
let amount_of_openvpn_servers = _files.len();
logger::log_info(&format!("✓ Fetched VPN credentials - Username: {}", username)).await;
// Create VPN pool
let openvpn_dir = paths.cache_dir().join("openvpn");
logger::log_info("--- Initializing VPN Pool ---").await;
let vp = Arc::new(VpnPool::new(
&openvpn_dir,
username,
password,
true, // enable rotation
config.tasks_per_vpn_session,
amount_of_openvpn_servers,
).await?);
// Connect all VPN instances (gracefully handles failures)
logger::log_info("--- Connecting to VPN servers ---").await;
// A connect_all() error is logged as a warning and turned into `None`,
// so the run continues without a VPN pool instead of aborting.
match vp.connect_all().await {
Ok(()) => {
logger::log_info("✓ VPN initialization complete").await;
Some(vp)
}
Err(e) => {
logger::log_warn(&format!(
"⚠ VPN initialization failed: {}. Continuing without VPN.",
e
)).await;
None
}
}
} else {
None
};
// Initialize the shared ChromeDriver pool with VPN pool
let pool_size = config.max_parallel_instances;
let max_tasks_per_instance = config.max_tasks_per_instance;
// NOTE(review): second init_debug_logger call, this time with the result
// ignored via `.ok()` — presumably the post-change replacement for the
// error-propagating call further up.
logger::init_debug_logger(paths.logs_dir()).await.ok();
logger::log_info("=== Event Backtest Engine Started ===").await;
logger::log_info(&format!(
"Initializing ChromeDriver pool with size: {}{}",
pool_size,
if max_tasks_per_instance > 0 { &format!(" (max {} tasks/instance)", max_tasks_per_instance) } else { "" }
"Config → parallel_instances: {}, task_limit: {} vpn_rotation: {}",
config.max_parallel_instances,
config.max_tasks_per_instance,
config.enable_vpn_rotation
)).await;
let pool = Arc::new(
if max_tasks_per_instance > 0 {
ChromeDriverPool::new_with_vpn_and_task_limit(pool_size, vpn_pool.clone(), max_tasks_per_instance).await?
} else if vpn_pool.is_some() {
ChromeDriverPool::new_with_vpn(pool_size, vpn_pool.clone()).await?
// === Step 1: Fetch fresh VPNBook credentials and .ovpn files (if rotation enabled) ===
let proxy_pool: Option<Arc<DockerVpnProxyPool>> = if config.enable_vpn_rotation {
logger::log_info("VPN Rotation Enabled — Fetching latest VPNBook configs").await;
// We only need 1 Chrome instance to scrape vpnbook.com (no proxy yet)
let temp_pool = Arc::new(ChromeDriverPool::new(1).await?);
let (username, password, _files) = opnv::fetch_vpnbook_configs(&temp_pool, paths.cache_dir()).await?;
logger::log_info(&format!("VPNBook credentials → User: {}", username)).await;
// Count how many distinct servers (subfolders) we have in cache/openvpn/
// NOTE(review): read_dir yields io::Result<DirEntry>; the `.unwrap()` in the
// filter panics if any entry fails to read. Also a blocking std::fs call
// inside an async fn.
let server_count = std::fs::read_dir(paths.cache_openvpn_dir())?
.filter(|e| e.as_ref().unwrap().path().is_dir())
.count();
// With no server directories, warn and fall back to `None` (no proxy pool).
if server_count == 0 {
logger::log_warn("No VPN servers found — continuing without VPN").await;
None
} else {
ChromeDriverPool::new(pool_size).await?
}
);
logger::log_info("✓ ChromeDriver pool initialized successfully").await;
logger::log_info(&format!("Found {} VPN servers — starting Docker proxy containers", server_count)).await;
// Spawn background Ctrl-C handler to gracefully shutdown pool and VPNs
{
let pool_for_signal = Arc::clone(&pool);
let vpn_for_signal = vpn_pool.clone();
tokio::spawn(async move {
// In this variant a failure to wait for Ctrl-C is logged and the task
// returns without shutting anything down.
if let Err(e) = tokio::signal::ctrl_c().await {
let _ = util::logger::log_error(&format!("Ctrl-C handler failed to install: {}", e)).await;
return;
}
let pp = Arc::new(
DockerVpnProxyPool::new(paths.cache_openvpn_dir(), username, password).await?
);
let _ = util::logger::log_info("Ctrl-C received — initiating graceful shutdown").await;
if let Err(e) = pool_for_signal.shutdown().await {
let _ = util::logger::log_warn(&format!("Error shutting down ChromeDriver pool: {}", e)).await;
}
if let Some(vp) = vpn_for_signal {
if let Err(e) = vp.disconnect_all().await {
let _ = util::logger::log_warn(&format!("Error disconnecting VPNs: {}", e)).await;
// Verify all proxies are working before proceeding
logger::log_info("Verifying all proxy connections...").await;
// Each proxy index is tested in turn; a failure is logged and recorded in
// `all_working` but does not abort the loop or the run.
let mut all_working = true;
for i in 0..pp.num_proxies() {
match pp.test_proxy_connection(i).await {
Ok(ip) => {
logger::log_info(&format!(" Proxy {}: working with IP: {}", i + 1, ip)).await;
}
Err(e) => {
logger::log_error(&format!(" Proxy {}: FAILED - {}", i + 1, e)).await;
all_working = false;
}
}
}
let _ = util::logger::log_info("Graceful shutdown complete (from Ctrl-C)").await;
// Exit the process now that cleanup is done
if !all_working {
logger::log_warn("Some proxies failed, but continuing with working ones...").await;
} else {
logger::log_info("All proxies verified and ready!").await;
}
// NOTE(review): this message is logged even when `all_working` is false.
logger::log_info(&format!("All {} Docker proxy containers started and ready", pp.num_proxies())).await;
Some(pp)
}
} else {
logger::log_info("VPN rotation disabled — using direct connection").await;
None
};
// === Step 2: Initialize the main ChromeDriver pool (with proxy if enabled) ===
let pool_size = config.max_parallel_instances;
let task_limit = config.max_tasks_per_instance;
logger::log_info(&format!("Creating ChromeDriver pool with {} instances...", pool_size)).await;
// A positive task limit selects the task-limited constructor; both
// constructors receive the optional proxy pool.
let pool = Arc::new(
if task_limit > 0 {
ChromeDriverPool::new_with_proxy_and_task_limit(pool_size, proxy_pool.clone(), task_limit).await?
} else {
ChromeDriverPool::new_with_proxy(pool_size, proxy_pool.clone()).await?
}
);
logger::log_info(&format!("ChromeDriver pool ready with {} instances", pool_size)).await;
// === Step 3: Graceful Ctrl+C handler ===
{
let pool_clone = Arc::clone(&pool);
let proxy_clone = proxy_pool.clone();
tokio::spawn(async move {
// NOTE(review): `.ok()` discards a ctrl_c() error, so if waiting for the
// signal fails the task falls straight through to shutdown and exit(0).
tokio::signal::ctrl_c().await.ok();
logger::log_info("Ctrl+C received — shutting down gracefully...").await;
// Explicitly deref the Arc to call shutdown() on the underlying ChromeDriverPool
if let Err(e) = (&*pool_clone).shutdown().await {
logger::log_error(&format!("Error during pool shutdown: {}", e)).await;
}
if let Some(pp) = proxy_clone {
if let Err(e) = pp.shutdown().await {
logger::log_warn(&format!("Failed to stop Docker containers: {}", e)).await;
} else {
logger::log_info("All Docker VPN containers stopped").await;
}
}
let _ = cleanup_all_proxy_containers().await;
// Terminates the whole process with status 0 from inside the spawned task;
// the rest of `main` does not run after this.
std::process::exit(0);
});
}
// Run economic update first, passing the shared pool
logger::log_info("--- Starting economic data update ---").await;
// === Step 4: Run the actual scraping jobs ===
logger::log_info("--- Starting ECONOMIC data update ---").await;
// An error from either update is propagated with `?`, which returns from
// `main` before the Step 5 cleanup below is reached.
economic::run_full_update(&config, &pool).await?;
logger::log_info("Economic data update completed").await;
logger::log_info("Economic update completed").await;
// Then run corporate update, passing the shared pool
logger::log_info("--- Starting corporate data update ---").await;
logger::log_info("--- Starting CORPORATE data update ---").await;
corporate::run_full_update(&config, &pool).await?;
logger::log_info("Corporate data update completed").await;
logger::log_info("Corporate update completed").await;
// Shutdown ChromeDriver pool before disconnecting VPNs so instances can
// cleanly terminate any network-bound processes.
logger::log_info("--- Shutting down ChromeDriver pool ---").await;
// === Step 5: Final cleanup ===
logger::log_info("Shutting down ChromeDriver pool...").await;
pool.shutdown().await?;
logger::log_info("✓ ChromeDriver pool shutdown complete").await;
// Disconnect all VPN instances if enabled
if let Some(vp) = vpn_pool {
logger::log_info("--- Disconnecting VPN instances ---").await;
vp.disconnect_all().await?;
if let Some(pp) = proxy_pool {
logger::log_info("Stopping Docker VPN proxy containers...").await;
pp.shutdown().await?;
// Final best-effort cleanup call after the proxy pool has been shut down; its
// result is discarded with `.ok()`. (Original note: clean up any leftover
// containers from previous runs.)
cleanup_all_proxy_containers().await.ok();
}
logger::log_info("=== Application completed successfully ===").await;
logger::log_info("=== Application finished successfully ===").await;
Ok(())
}
}
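/*
Observed runtime failure (recorded output; root cause not identified here):
memory allocation of 4294967296 bytes failed   (4294967296 bytes = 4 GiB)
error: process didn't exit successfully: `target\debug\event_backtest_engine.exe` (exit code: 0xc0000409, STATUS_STACK_BUFFER_OVERRUN)
*/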