capabable spawning multiple openvpn instances

This commit is contained in:
2025-12-11 00:36:46 +01:00
parent c9da56e8e9
commit 470f0922ed
17 changed files with 3000 additions and 104 deletions

View File

@@ -8,52 +8,104 @@ use std::process::Stdio;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child, Command};
use tokio::task::JoinHandle;
use tokio::sync::{Mutex, Semaphore};
use tokio::time::{sleep, timeout, Duration};
/// Manages a pool of ChromeDriver instances for parallel scraping.
///
/// This struct maintains multiple ChromeDriver processes and allows controlled
/// concurrent access via a semaphore. Instances are reused across tasks to avoid
/// the overhead of spawning new processes.
use super::vpn_manager::{VpnInstance, VpnPool};
#[cfg(target_os = "windows")]
use super::forcebindip::ForceBindIpManager;
/// Manages a pool of ChromeDriver instances for parallel scraping with optional VPN binding.
pub struct ChromeDriverPool {
instances: Vec<Arc<Mutex<ChromeInstance>>>,
semaphore: Arc<Semaphore>,
tasks_per_instance: usize,
vpn_pool: Option<Arc<VpnPool>>,
#[cfg(target_os = "windows")]
forcebindip: Option<Arc<ForceBindIpManager>>,
}
impl ChromeDriverPool {
/// Creates a new pool with the specified number of ChromeDriver instances.
///
/// # Arguments
/// * `pool_size` - Number of concurrent ChromeDriver instances to maintain
/// Creates a new pool with the specified number of ChromeDriver instances (no VPN).
pub async fn new(pool_size: usize) -> Result<Self> {
Self::new_with_vpn_and_task_limit(pool_size, None, 0).await
}
/// Creates a new ChromeDriver pool with task-per-instance tracking.
pub async fn new_with_task_limit(
pool_size: usize,
max_tasks_per_instance: usize,
) -> Result<Self> {
Self::new_with_vpn_and_task_limit(pool_size, None, max_tasks_per_instance).await
}
/// Creates a new pool with VPN support.
pub async fn new_with_vpn(
pool_size: usize,
vpn_pool: Option<Arc<VpnPool>>,
) -> Result<Self> {
Self::new_with_vpn_and_task_limit(pool_size, vpn_pool, 0).await
}
/// Creates a new pool with VPN support and task-per-instance limits.
pub async fn new_with_vpn_and_task_limit(
pool_size: usize,
vpn_pool: Option<Arc<VpnPool>>,
max_tasks_per_instance: usize,
) -> Result<Self> {
let mut instances = Vec::with_capacity(pool_size);
println!(
"Initializing ChromeDriver pool with {} instances...",
pool_size
);
for i in 0..pool_size {
match ChromeInstance::new().await {
Ok(instance) => {
println!(" ✓ Instance {} ready", i + 1);
instances.push(Arc::new(Mutex::new(instance)));
#[cfg(target_os = "windows")]
let forcebindip = if vpn_pool.is_some() {
match ForceBindIpManager::new() {
Ok(manager) => {
crate::util::logger::log_info("✓ ForceBindIP manager initialized").await;
Some(Arc::new(manager))
}
Err(e) => {
eprintln!(" ✗ Failed to create instance {}: {}", i + 1, e);
// Clean up already created instances
drop(instances);
return Err(e);
crate::util::logger::log_warn(&format!(
"⚠ ForceBindIP not available: {}. Proceeding without IP binding.",
e
)).await;
None
}
}
} else {
None
};
crate::util::logger::log_info(&format!(
"Initializing ChromeDriver pool with {} instances{}{}...",
pool_size,
if vpn_pool.is_some() { " (VPN-enabled)" } else { "" },
if max_tasks_per_instance > 0 { &format!(" (max {} tasks/instance)", max_tasks_per_instance) } else { "" }
)).await;
for i in 0..pool_size {
// If VPN pool exists, acquire a VPN instance for this ChromeDriver
let vpn_instance = if let Some(ref vp) = vpn_pool {
Some(vp.acquire().await?)
} else {
None
};
#[cfg(target_os = "windows")]
let instance = ChromeInstance::new_with_task_limit(vpn_instance, forcebindip.clone(), max_tasks_per_instance).await?;
#[cfg(not(target_os = "windows"))]
let instance = ChromeInstance::new_with_task_limit(vpn_instance, max_tasks_per_instance).await?;
crate::util::logger::log_info(&format!(" ✓ Instance {} ready", i + 1)).await;
instances.push(Arc::new(Mutex::new(instance)));
}
Ok(Self {
instances,
semaphore: Arc::new(Semaphore::new(pool_size)),
tasks_per_instance: 0,
vpn_pool,
#[cfg(target_os = "windows")]
forcebindip,
})
}
@@ -72,9 +124,31 @@ impl ChromeDriverPool {
.map_err(|_| anyhow!("Semaphore closed"))?;
// Find an available instance (round-robin or first available)
let instance = self.instances[0].clone(); // Simple: use first, could be round-robin
let instance = self.instances[0].clone();
let mut guard = instance.lock().await;
// Track task count
guard.increment_task_count();
// Get VPN info before creating session
let vpn_info = if let Some(ref vpn) = guard.vpn_instance {
let vpn_guard = vpn.lock().await;
Some(format!("{} ({})",
vpn_guard.hostname(),
vpn_guard.external_ip().unwrap_or("unknown")))
} else {
None
};
// Log task count if limit is set
if guard.max_tasks_per_instance > 0 {
crate::util::logger::log_info(&format!(
"Instance task count: {}/{}",
guard.get_task_count(),
guard.max_tasks_per_instance
)).await;
}
// Create a new session for this task
let client = guard.new_session().await?;
@@ -82,40 +156,137 @@ impl ChromeDriverPool {
drop(guard);
// Navigate and parse
if let Some(ref info) = vpn_info {
crate::util::logger::log_info(&format!("Scraping {} via VPN: {}", url, info)).await;
}
client.goto(&url).await.context("Failed to navigate")?;
let result = timeout(Duration::from_secs(60), parse(client))
.await
.context("Parse function timed out after 60s")??;
// Handle VPN rotation if needed
if let Some(ref vpn_pool) = self.vpn_pool {
let mut guard = instance.lock().await;
if let Some(ref vpn) = guard.vpn_instance {
vpn_pool.rotate_if_needed(vpn.clone()).await?;
guard.reset_task_count(); // Reset task count on VPN rotation
}
}
Ok(result)
}
pub fn get_number_of_instances(&self) -> usize {
self.instances.len()
}
/// Returns whether VPN is enabled for this pool
pub fn is_vpn_enabled(&self) -> bool {
self.vpn_pool.is_some()
}
/// Gracefully shutdown all ChromeDriver instances in the pool.
pub async fn shutdown(&self) -> Result<()> {
crate::util::logger::log_info("Shutting down ChromeDriverPool instances...").await;
for inst in &self.instances {
crate::util::logger::log_info("Shutting down a ChromeDriver instance...").await;
let mut guard = inst.lock().await;
if let Err(e) = guard.shutdown().await {
crate::util::logger::log_warn(&format!("Error shutting down instance: {}", e)).await;
}
}
crate::util::logger::log_info("All ChromeDriver instances shut down").await;
Ok(())
}
}
/// Represents a single instance of chromedriver process.
/// Represents a single instance of chromedriver process, optionally bound to a VPN.
pub struct ChromeInstance {
process: Child,
base_url: String,
vpn_instance: Option<Arc<Mutex<VpnInstance>>>,
task_count: usize,
max_tasks_per_instance: usize,
// Optional join handle for background stderr logging task
stderr_log: Option<JoinHandle<()>>,
}
impl ChromeInstance {
/// Creates a new ChromeInstance by spawning chromedriver with random port.
///
/// This spawns `chromedriver --port=0` to avoid port conflicts, reads stdout to extract
/// the listening address, and waits for the success message. If timeout occurs or
/// spawning fails, returns an error with context.
///
/// # Errors
///
/// Returns an error if chromedriver fails to spawn (e.g., not in PATH, version mismatch),
/// if the process exits early, or if the address/success message isn't found within 30s.
pub async fn new() -> Result<Self> {
let mut command = Command::new("chromedriver-win64/chromedriver.exe");
/// Creates a new ChromeInstance, optionally bound to a VPN IP.
#[cfg(target_os = "windows")]
pub async fn new(
vpn_instance: Option<Arc<Mutex<VpnInstance>>>,
forcebindip: Option<Arc<ForceBindIpManager>>,
) -> Result<Self> {
Self::new_with_task_limit(vpn_instance, forcebindip, 0).await
}
/// Creates a new ChromeInstance with task-per-instance limit, bound to a VPN IP if provided.
#[cfg(target_os = "windows")]
pub async fn new_with_task_limit(
vpn_instance: Option<Arc<Mutex<VpnInstance>>>,
forcebindip: Option<Arc<ForceBindIpManager>>,
max_tasks_per_instance: usize,
) -> Result<Self> {
let bind_ip = if let Some(ref vpn) = vpn_instance {
let vpn_guard = vpn.lock().await;
vpn_guard.external_ip().map(|s| s.to_string())
} else {
None
};
let mut command = if let (Some(ip), Some(fb)) = (&bind_ip, &forcebindip) {
// Use ForceBindIP to bind ChromeDriver to specific VPN IP
crate::util::logger::log_info(&format!("Binding ChromeDriver to VPN IP: {}", ip)).await;
let mut std_cmd = fb.create_bound_command(
ip,
std::path::Path::new("chromedriver-win64/chromedriver.exe"),
&["--port=0"],
);
Command::from(std_cmd)
} else {
let mut cmd = Command::new("chromedriver-win64/chromedriver.exe");
cmd.arg("--port=0");
cmd
};
command.stdout(Stdio::piped()).stderr(Stdio::piped());
let mut process = command
.spawn()
.context("Failed to spawn chromedriver. Ensure it's installed and in PATH.")?;
let (base_url, stderr_handle) = Self::wait_for_chromedriver_start(&mut process).await?;
Ok(Self {
process,
base_url,
vpn_instance,
task_count: 0,
max_tasks_per_instance,
stderr_log: stderr_handle,
})
}
/// Creates a new ChromeInstance on non-Windows platforms (no ForceBindIP support).
#[cfg(not(target_os = "windows"))]
pub async fn new(vpn_instance: Option<Arc<Mutex<VpnInstance>>>) -> Result<Self> {
Self::new_with_task_limit(vpn_instance, 0).await
}
/// Creates a new ChromeInstance on non-Windows platforms with task-per-instance limit.
#[cfg(not(target_os = "windows"))]
pub async fn new_with_task_limit(vpn_instance: Option<Arc<Mutex<VpnInstance>>>, max_tasks_per_instance: usize) -> Result<Self> {
if vpn_instance.is_some() {
crate::util::logger::log_warn(
"⚠ VPN binding requested but ForceBindIP is not available on this platform"
).await;
}
let mut command = Command::new("chromedriver");
command
.arg("--port=0") // Use random available port to support pooling
.arg("--port=0")
.stdout(Stdio::piped())
.stderr(Stdio::piped());
@@ -123,20 +294,38 @@ impl ChromeInstance {
.spawn()
.context("Failed to spawn chromedriver. Ensure it's installed and in PATH.")?;
let (base_url, stderr_handle) = Self::wait_for_chromedriver_start(&mut process).await?;
Ok(Self {
process,
base_url,
vpn_instance,
task_count: 0,
max_tasks_per_instance,
stderr_log: stderr_handle,
})
}
/// Waits for ChromeDriver to start and extracts the listening address.
async fn wait_for_chromedriver_start(process: &mut Child) -> Result<(String, Option<JoinHandle<()>>)> {
let mut stdout =
BufReader::new(process.stdout.take().context("Failed to capture stdout")?).lines();
let mut stderr =
BufReader::new(process.stderr.take().context("Failed to capture stderr")?).lines();
let stderr_reader = process.stderr.take().context("Failed to capture stderr")?;
let start_time = std::time::Instant::now();
let mut address: Option<String> = None;
let mut success = false;
// Log stderr in background for debugging
tokio::spawn(async move {
while let Ok(Some(line)) = stderr.next_line().await {
eprintln!("ChromeDriver stderr: {}", line);
// Log stderr in background for debugging and return the JoinHandle so we can
// abort/await it during shutdown.
let stderr_handle: JoinHandle<()> = tokio::spawn(async move {
let mut stderr_lines = BufReader::new(stderr_reader).lines();
while let Ok(Some(line)) = stderr_lines.next_line().await {
let trimmed = line.trim();
if !trimmed.is_empty() {
crate::util::logger::log_info(&format!("ChromeDriver stderr: {}", trimmed)).await;
}
}
});
@@ -152,10 +341,7 @@ impl ChromeInstance {
}
if let (Some(addr), true) = (&address, success) {
return Ok(Self {
process,
base_url: addr.clone(),
});
return Ok((addr.clone(), Some(stderr_handle)));
}
}
@@ -164,11 +350,13 @@ impl ChromeInstance {
// Cleanup on failure
let _ = process.kill().await;
Err(anyhow!("Timeout: ChromeDriver did not start within 30 seconds. Check version match with Chrome browser and system resources."))
// If we timed out, abort stderr logging task
stderr_handle.abort();
let _ = stderr_handle.await;
Err(anyhow!("Timeout: ChromeDriver did not start within 30 seconds"))
}
/// Creates a new browser session (client) from this ChromeDriver instance.
/// Each session is independent and can be closed without affecting the driver.
pub async fn new_session(&self) -> Result<Client> {
ClientBuilder::native()
.capabilities(Self::chrome_args())
@@ -177,11 +365,47 @@ impl ChromeInstance {
.context("Failed to create new session")
}
/// Increments task counter and returns whether limit has been reached
pub fn increment_task_count(&mut self) -> bool {
if self.max_tasks_per_instance > 0 {
self.task_count += 1;
self.task_count >= self.max_tasks_per_instance
} else {
false
}
}
/// Resets task counter (called when VPN is rotated)
pub fn reset_task_count(&mut self) {
self.task_count = 0;
}
/// Returns current task count for this instance
pub fn get_task_count(&self) -> usize {
self.task_count
}
/// Gracefully shutdown the chromedriver process and background log tasks.
pub async fn shutdown(&mut self) -> Result<()> {
// Abort and await stderr logging task if present
if let Some(handle) = self.stderr_log.take() {
handle.abort();
let _ = handle.await;
}
// Try to terminate the child process
let _ = self.process.start_kill();
// Await the process to ensure resources are released
let _ = self.process.wait().await;
Ok(())
}
fn chrome_args() -> Map<String, Value> {
let args = serde_json::json!({
"goog:chromeOptions": {
"args": [
"--headless=new",
"--headless",
"--disable-gpu",
"--no-sandbox",
"--disable-dev-shm-usage",
@@ -191,13 +415,14 @@ impl ChromeInstance {
"--disable-notifications",
"--disable-logging",
"--disable-autofill",
"--disable-features=TranslateUI,OptimizationGuideModelDownloading",
"--disable-sync",
"--disable-default-apps",
"--disable-translate",
"--window-size=1920,1080",
"--disable-blink-features=AutomationControlled",
"--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
"--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
],
"excludeSwitches": ["enable-logging", "enable-automation"],
"useAutomationExtension": false,
"prefs": {
"profile.default_content_setting_values.notifications": 2
}
@@ -209,10 +434,6 @@ impl ChromeInstance {
}
}
/// Parses the ChromeDriver address from a log line.
///
/// Looks for the "Starting ChromeDriver ... on port XXXX" line and extracts the port.
/// Returns `Some("http://localhost:XXXX")` if found, else `None`.
fn parse_chromedriver_address(line: &str) -> Option<String> {
if line.contains("Starting ChromeDriver") {
if let Some(port_str) = line.split("on port ").nth(1) {
@@ -223,7 +444,6 @@ fn parse_chromedriver_address(line: &str) -> Option<String> {
}
}
}
// Fallback for other formats (e.g., explicit port mentions)
for word in line.split_whitespace() {
if let Ok(port) = word.trim_matches(|c: char| !c.is_numeric()).parse::<u16>() {
if port > 1024 && port < 65535 && line.to_lowercase().contains("port") {
@@ -236,14 +456,13 @@ fn parse_chromedriver_address(line: &str) -> Option<String> {
impl Drop for ChromeInstance {
fn drop(&mut self) {
// Signal child to terminate. Do NOT block here; shutdown should be
// performed with the async `shutdown()` method when possible.
let _ = self.process.start_kill();
std::thread::sleep(std::time::Duration::from_millis(100));
}
}
/// Simplified task execution - now uses the pool pattern.
///
/// For backwards compatibility with existing code.
/// Simplified task execution - uses the pool pattern.
pub struct ScrapeTask<T> {
url: String,
parse: Box<
@@ -263,7 +482,6 @@ impl<T: Send + 'static> ScrapeTask<T> {
}
}
/// Executes using a provided pool (more efficient for multiple tasks).
pub async fn execute_with_pool(self, pool: &ChromeDriverPool) -> Result<T> {
let url = self.url;
let parse = self.parse;
@@ -271,4 +489,4 @@ impl<T: Send + 'static> ScrapeTask<T> {
pool.execute(url, move |client| async move { (parse)(client).await })
.await
}
}
}