// src/scraper/webdriver.rs use anyhow::{anyhow, Context, Result}; use fantoccini::{Client, ClientBuilder}; use serde_json::{Map, Value}; use std::pin::Pin; use std::process::Stdio; use std::sync::Arc; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::{Child, Command}; use tokio::sync::{Mutex, Semaphore}; use tokio::time::{sleep, timeout, Duration}; /// Manages a pool of ChromeDriver instances for parallel scraping. /// /// This struct maintains multiple ChromeDriver processes and allows controlled /// concurrent access via a semaphore. Instances are reused across tasks to avoid /// the overhead of spawning new processes. pub struct ChromeDriverPool { instances: Vec>>, semaphore: Arc, tasks_per_instance: usize, } impl ChromeDriverPool { /// Creates a new pool with the specified number of ChromeDriver instances. /// /// # Arguments /// * `pool_size` - Number of concurrent ChromeDriver instances to maintain pub async fn new(pool_size: usize) -> Result { let mut instances = Vec::with_capacity(pool_size); println!( "Initializing ChromeDriver pool with {} instances...", pool_size ); for i in 0..pool_size { match ChromeInstance::new().await { Ok(instance) => { println!(" ✓ Instance {} ready", i + 1); instances.push(Arc::new(Mutex::new(instance))); } Err(e) => { eprintln!(" ✗ Failed to create instance {}: {}", i + 1, e); // Clean up already created instances drop(instances); return Err(e); } } } Ok(Self { instances, semaphore: Arc::new(Semaphore::new(pool_size)), tasks_per_instance: 0, }) } /// Executes a scrape task using an available instance from the pool. pub async fn execute(&self, url: String, parse: F) -> Result where T: Send + 'static, F: FnOnce(Client) -> Fut + Send + 'static, Fut: std::future::Future> + Send + 'static, { // Acquire semaphore permit let _permit = self .semaphore .acquire() .await .map_err(|_| anyhow!("Semaphore closed"))?; // Find an available instance (round-robin or first available) let instance = self.instances[0].clone(); // Simple: use first, could be round-robin let mut guard = instance.lock().await; // Create a new session for this task let client = guard.new_session().await?; // Release lock while we do the actual scraping drop(guard); // Navigate and parse client.goto(&url).await.context("Failed to navigate")?; let result = timeout(Duration::from_secs(60), parse(client)) .await .context("Parse function timed out after 60s")??; Ok(result) } pub fn get_number_of_instances(&self) -> usize { self.instances.len() } } /// Represents a single instance of chromedriver process. pub struct ChromeInstance { process: Child, base_url: String, } impl ChromeInstance { /// Creates a new ChromeInstance by spawning chromedriver with random port. /// /// This spawns `chromedriver --port=0` to avoid port conflicts, reads stdout to extract /// the listening address, and waits for the success message. If timeout occurs or /// spawning fails, returns an error with context. /// /// # Errors /// /// Returns an error if chromedriver fails to spawn (e.g., not in PATH, version mismatch), /// if the process exits early, or if the address/success message isn't found within 30s. pub async fn new() -> Result { let mut command = Command::new("chromedriver-win64/chromedriver.exe"); command .arg("--port=0") // Use random available port to support pooling .stdout(Stdio::piped()) .stderr(Stdio::piped()); let mut process = command .spawn() .context("Failed to spawn chromedriver. Ensure it's installed and in PATH.")?; let mut stdout = BufReader::new(process.stdout.take().context("Failed to capture stdout")?).lines(); let mut stderr = BufReader::new(process.stderr.take().context("Failed to capture stderr")?).lines(); let start_time = std::time::Instant::now(); let mut address: Option = None; let mut success = false; // Log stderr in background for debugging tokio::spawn(async move { while let Ok(Some(line)) = stderr.next_line().await { eprintln!("ChromeDriver stderr: {}", line); } }); // Wait for address and success (up to 30s) while start_time.elapsed() < Duration::from_secs(30) { if let Ok(Ok(Some(line))) = timeout(Duration::from_secs(1), stdout.next_line()).await { if let Some(addr) = parse_chromedriver_address(&line) { address = Some(addr.to_string()); } if line.contains("ChromeDriver was started successfully") { success = true; } if let (Some(addr), true) = (&address, success) { return Ok(Self { process, base_url: addr.clone(), }); } } sleep(Duration::from_millis(100)).await; } // Cleanup on failure let _ = process.kill().await; Err(anyhow!("Timeout: ChromeDriver did not start within 30 seconds. Check version match with Chrome browser and system resources.")) } /// Creates a new browser session (client) from this ChromeDriver instance. /// Each session is independent and can be closed without affecting the driver. pub async fn new_session(&self) -> Result { ClientBuilder::native() .capabilities(Self::chrome_args()) .connect(&self.base_url) .await .context("Failed to create new session") } fn chrome_args() -> Map { let args = serde_json::json!({ "goog:chromeOptions": { "args": [ "--headless=new", "--disable-gpu", "--no-sandbox", "--disable-dev-shm-usage", "--disable-infobars", "--disable-extensions", "--disable-popup-blocking", "--disable-notifications", "--disable-logging", "--disable-autofill", "--disable-features=TranslateUI,OptimizationGuideModelDownloading", "--window-size=1920,1080", "--disable-blink-features=AutomationControlled", "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" ], "excludeSwitches": ["enable-logging", "enable-automation"], "useAutomationExtension": false, "prefs": { "profile.default_content_setting_values.notifications": 2 } } }); args.as_object() .expect("Capabilities should be a JSON object") .clone() } } /// Parses the ChromeDriver address from a log line. /// /// Looks for the "Starting ChromeDriver ... on port XXXX" line and extracts the port. /// Returns `Some("http://localhost:XXXX")` if found, else `None`. fn parse_chromedriver_address(line: &str) -> Option { if line.contains("Starting ChromeDriver") { if let Some(port_str) = line.split("on port ").nth(1) { if let Some(port) = port_str.split_whitespace().next() { if port.parse::().is_ok() { return Some(format!("http://localhost:{}", port)); } } } } // Fallback for other formats (e.g., explicit port mentions) for word in line.split_whitespace() { if let Ok(port) = word.trim_matches(|c: char| !c.is_numeric()).parse::() { if port > 1024 && port < 65535 && line.to_lowercase().contains("port") { return Some(format!("http://localhost:{}", port)); } } } None } impl Drop for ChromeInstance { fn drop(&mut self) { let _ = self.process.start_kill(); std::thread::sleep(std::time::Duration::from_millis(100)); } } /// Simplified task execution - now uses the pool pattern. /// /// For backwards compatibility with existing code. pub struct ScrapeTask { url: String, parse: Box< dyn FnOnce(Client) -> Pin> + Send>> + Send, >, } impl ScrapeTask { pub fn new(url: String, parse: F) -> Self where F: FnOnce(Client) -> Fut + Send + 'static, Fut: std::future::Future> + Send + 'static, { Self { url, parse: Box::new(move |client| Box::pin(parse(client))), } } /// Executes using a provided pool (more efficient for multiple tasks). pub async fn execute_with_pool(self, pool: &ChromeDriverPool) -> Result { let url = self.url; let parse = self.parse; pool.execute(url, move |client| async move { (parse)(client).await }) .await } }