diff --git a/.env.example b/.env.example
new file mode 100644
index 0000000..ffa621b
--- /dev/null
+++ b/.env.example
@@ -0,0 +1,48 @@
+# WebScraper Configuration File (.env)
+# ====================================
+# This file configures the behavior of the WebScraper application
+# Copy to .env and adjust values as needed
+
+# ===== ECONOMIC DATA =====
+# Start date for economic event scraping
+ECONOMIC_START_DATE=2007-02-13
+
+# How far into the future to look ahead for economic events (in months)
+ECONOMIC_LOOKAHEAD_MONTHS=3
+
+# ===== CORPORATE DATA =====
+# Start date for corporate earnings/data scraping
+CORPORATE_START_DATE=2010-01-01
+
+# ===== PERFORMANCE & CONCURRENCY =====
+# Maximum number of parallel ChromeDriver instances
+# Higher = more concurrent tasks, but higher resource usage
+MAX_PARALLEL_INSTANCES=3
+
+# Maximum tasks per ChromeDriver instance before recycling
+# 0 = unlimited (instance lives for entire application runtime)
+MAX_TASKS_PER_INSTANCE=0
+
+# ===== VPN ROTATION (ProtonVPN Integration) =====
+# Enable automatic VPN rotation between sessions?
+# If false, all traffic goes through the system without VPN tunneling
+ENABLE_VPN_ROTATION=false
+
+# Comma-separated list of ProtonVPN servers to rotate through
+# Examples:
+#   "US-Free#1,US-Free#2,UK-Free#1"
+#   "US,UK,JP,DE,NL"
+# NOTE: Must have ENABLE_VPN_ROTATION=true for this to take effect
+VPN_SERVERS=
+
+# Number of tasks per VPN session before rotating to a new server/IP
+# 0 = rotate between economic and corporate phases (one phase = one IP)
+# 5 = rotate every 5 tasks
+# NOTE: Must have ENABLE_VPN_ROTATION=true for this to take effect
+TASKS_PER_VPN_SESSION=0
+
+# ===== LOGGING =====
+# Set via RUST_LOG environment variable:
+#   RUST_LOG=info cargo run
+#   RUST_LOG=debug cargo run
+# Leave empty or unset for default logging level
diff --git a/Cargo.lock b/Cargo.lock
index abb37f2..7933033 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -674,6 +674,7 @@ dependencies = [
  "once_cell",
  "rand 0.9.2",
  "rayon",
+ "regex",
  "reqwest",
  "scraper",
  "serde",
@@ -682,6 +683,7 @@ dependencies = [
  "toml",
  "tracing",
  "tracing-subscriber",
+ "url",
  "yfinance-rs",
  "zip",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index 46d1256..b9359ba 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,6 +21,7 @@ reqwest = { version = "0.12", features = ["json", "gzip", "brotli", "deflate", "
 scraper = "0.19" # HTML parsing for Yahoo earnings pages
 fantoccini = { version = "0.20", features = ["rustls-tls"] } # Headless Chrome for finanzen.net
 yfinance-rs = "0.7.2"
+url = "2.5.7"
 
 # Serialization
 serde = { version = "1.0", features = ["derive"] }
@@ -29,6 +30,9 @@
 csv = "1.3"
 zip = "6.0.0"
 flate2 = "1.1.5"
 
+# Parsing
+regex = "1.12.2"
+
 # Generating
 rand = "0.9.2"
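The VPN variables above are consumed with the same `dotenvy` pattern the rest of the configuration uses. A minimal sketch of that parsing, mirroring src/config.rs below; the helper name is hypothetical, while the variable names and defaults come from the diff:

```rust
use anyhow::{Context, Result};

// Hypothetical helper mirroring how src/config.rs reads the VPN settings.
fn read_vpn_rotation_settings() -> Result<(bool, usize)> {
    // Missing variables fall back to the documented defaults.
    let enable_vpn_rotation: bool = dotenvy::var("ENABLE_VPN_ROTATION")
        .unwrap_or_else(|_| "false".to_string())
        .parse()
        .context("Failed to parse ENABLE_VPN_ROTATION as bool")?;
    let tasks_per_vpn_session: usize = dotenvy::var("TASKS_PER_VPN_SESSION")
        .unwrap_or_else(|_| "0".to_string())
        .parse()
        .context("Failed to parse TASKS_PER_VPN_SESSION as usize")?;
    Ok((enable_vpn_rotation, tasks_per_vpn_session))
}
```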
diff --git a/src/config.rs b/src/config.rs
index d71e94b..d30df1d 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -12,21 +12,53 @@ pub struct Config {
     pub economic_lookahead_months: u32, // default: 3
     /// Maximum number of parallel scraping tasks (default: 10).
    /// This limits concurrency to protect system load and prevent website spamming.
-    #[serde(default = "default_max_parallel")]
-    pub max_parallel_tasks: usize,
+    #[serde(default = "default_max_parallel_instances")]
+    pub max_parallel_instances: usize,
+
+    /// Maximum tasks per ChromeDriver instance before recycling (0 = unlimited).
+    #[serde(default)]
+    pub max_tasks_per_instance: usize,
+
+    /// VPN rotation configuration.
+    /// If set to "true", enables automatic VPN rotation between sessions.
+    #[serde(default)]
+    pub enable_vpn_rotation: bool,
+
+    /// Comma-separated list of VPN servers/country codes to rotate through.
+    /// Example: "US-Free#1,UK-Free#1,JP-Free#1" or "US,JP,DE"
+    /// If empty, VPN rotation is disabled.
+    #[serde(default)]
+    pub vpn_servers: String,
+
+    /// Number of tasks per session before rotating VPN.
+    /// If set to 0, rotates VPN between the economic and corporate phases.
+    #[serde(default = "default_tasks_per_session")]
+    pub tasks_per_vpn_session: usize,
 }
 
-fn default_max_parallel() -> usize {
+fn default_max_parallel_instances() -> usize {
     10
 }
 
+fn default_tasks_per_session() -> usize {
+    0 // 0 = rotate between economic/corporate
+}
+
+fn default_protonvpn_extension_id() -> String {
+    "ghmbeldphafepmbegfdlkpapadhbakde".to_string()
+}
+
 impl Default for Config {
     fn default() -> Self {
         Self {
             economic_start_date: "2007-02-13".to_string(),
             corporate_start_date: "2010-01-01".to_string(),
             economic_lookahead_months: 3,
-            max_parallel_tasks: default_max_parallel(),
+            max_parallel_instances: default_max_parallel_instances(),
+            max_tasks_per_instance: 0,
+            enable_vpn_rotation: false,
+            vpn_servers: String::new(),
+            tasks_per_vpn_session: default_tasks_per_session(),
         }
     }
 }
@@ -59,19 +91,54 @@
         .parse()
         .context("Failed to parse ECONOMIC_LOOKAHEAD_MONTHS as u32")?;
 
-        let max_parallel_tasks: usize = dotenvy::var("MAX_PARALLEL_TASKS")
+        let max_parallel_instances: usize = dotenvy::var("MAX_PARALLEL_INSTANCES")
             .unwrap_or_else(|_| "10".to_string())
             .parse()
-            .context("Failed to parse MAX_PARALLEL_TASKS as usize")?;
+            .context("Failed to parse MAX_PARALLEL_INSTANCES as usize")?;
+
+        let max_tasks_per_instance: usize = dotenvy::var("MAX_TASKS_PER_INSTANCE")
+            .unwrap_or_else(|_| "0".to_string())
+            .parse()
+            .context("Failed to parse MAX_TASKS_PER_INSTANCE as usize")?;
+
+        let enable_vpn_rotation = dotenvy::var("ENABLE_VPN_ROTATION")
+            .unwrap_or_else(|_| "false".to_string())
+            .parse::<bool>()
+            .context("Failed to parse ENABLE_VPN_ROTATION as bool")?;
+
+        let vpn_servers = dotenvy::var("VPN_SERVERS")
+            .unwrap_or_else(|_| String::new());
+
+        let tasks_per_vpn_session: usize = dotenvy::var("TASKS_PER_VPN_SESSION")
+            .unwrap_or_else(|_| "0".to_string())
+            .parse()
+            .context("Failed to parse TASKS_PER_VPN_SESSION as usize")?;
 
         Ok(Self {
             economic_start_date,
             corporate_start_date,
             economic_lookahead_months,
-            max_parallel_tasks,
+            max_parallel_instances,
+            max_tasks_per_instance,
+            enable_vpn_rotation,
+            vpn_servers,
+            tasks_per_vpn_session,
         })
     }
 
+    /// Get the list of VPN servers configured for rotation
+    pub fn get_vpn_servers(&self) -> Vec<String> {
+        if self.vpn_servers.is_empty() {
+            Vec::new()
+        } else {
+            self.vpn_servers
+                .split(',')
+                .map(|s| s.trim().to_string())
+                .filter(|s| !s.is_empty())
+                .collect()
+        }
+    }
+
     pub fn target_end_date(&self) -> String {
         let now = chrono::Local::now().naive_local().date();
         let future = now + chrono::Duration::days(30 * self.economic_lookahead_months as i64);
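`get_vpn_servers` trims whitespace and drops empty entries, so stray spaces or trailing commas in `VPN_SERVERS` are harmless. A small test sketch of that behavior; the field values are illustrative:

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn vpn_servers_are_trimmed_and_filtered() {
        let config = Config {
            vpn_servers: " US-Free#1, UK-Free#1,,JP-Free#1 ".to_string(),
            ..Config::default()
        };
        // Whitespace is trimmed and the empty entry between the commas is dropped.
        assert_eq!(
            config.get_vpn_servers(),
            vec!["US-Free#1", "UK-Free#1", "JP-Free#1"]
        );
    }
}
```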
diff --git a/src/corporate/scraper.rs b/src/corporate/scraper.rs
index 486ca88..aa163dd 100644
--- a/src/corporate/scraper.rs
+++ b/src/corporate/scraper.rs
@@ -1,7 +1,7 @@
 // src/corporate/scraper.rs
 use super::{types::*, helpers::*, openfigi::*};
 //use crate::corporate::openfigi::OpenFigiClient;
-use crate::{webdriver::webdriver::*, util::directories::DataPaths, util::logger};
+use crate::{scraper::webdriver::*, util::directories::DataPaths, util::logger};
 use fantoccini::{Client, Locator};
 use scraper::{Html, Selector};
 use chrono::{DateTime, Duration, NaiveDate, Utc};
diff --git a/src/corporate/update.rs b/src/corporate/update.rs
index 8e73b58..ea8b565 100644
--- a/src/corporate/update.rs
+++ b/src/corporate/update.rs
@@ -3,7 +3,7 @@ use super::{scraper::*, storage::*, helpers::*, types::*, aggregation::*, openfi
 use crate::config::Config;
 use crate::util::directories::DataPaths;
 use crate::util::logger;
-use crate::webdriver::webdriver::ChromeDriverPool;
+use crate::scraper::webdriver::ChromeDriverPool;
 use chrono::Local;
 use std::collections::{HashMap};
diff --git a/src/economic/update.rs b/src/economic/update.rs
index 0451c15..1197bef 100644
--- a/src/economic/update.rs
+++ b/src/economic/update.rs
@@ -1,6 +1,6 @@
 // src/economic/update.rs
 use super::{scraper::*, storage::*, helpers::*, types::*};
-use crate::{config::Config, webdriver::webdriver::{ScrapeTask, ChromeDriverPool}, util::directories::DataPaths, util::logger};
+use crate::{config::Config, scraper::webdriver::{ScrapeTask, ChromeDriverPool}, util::directories::DataPaths, util::logger};
 use chrono::{Local};
 use std::sync::Arc;
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..30f8f0f
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,8 @@
+// src/lib.rs
+//! Event Backtest Engine - Core Library
+//!
+//! Exposes the config, scraper, and util modules for use in examples and tests
+
+pub mod config;
+pub mod scraper;
+pub mod util;
\ No newline at end of file
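With the new library target in place, the crate can be exercised from integration tests without spawning a browser. A minimal smoke test, assuming the crate name `event_backtest_engine` used by the rustdoc example in src/util/opnv.rs below; the test file name is hypothetical:

```rust
// tests/config_smoke.rs (hypothetical test file)
use event_backtest_engine::config::Config;

#[test]
fn default_config_matches_documented_defaults() {
    let config = Config::default();
    // Values asserted here come straight from the Default impl in src/config.rs.
    assert_eq!(config.economic_start_date, "2007-02-13");
    assert_eq!(config.max_parallel_instances, 10);
    assert!(!config.enable_vpn_rotation);
}
```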
diff --git a/src/main.rs b/src/main.rs
index aa64fff..5d135e4 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,20 +1,21 @@
 // src/main.rs
-mod economic;
-mod corporate;
 mod config;
-mod webdriver;
+mod corporate;
+mod economic;
 mod util;
+mod scraper;
 
 use anyhow::Result;
 use config::Config;
-use webdriver::webdriver::ChromeDriverPool;
+use scraper::webdriver::ChromeDriverPool;
 use util::directories::DataPaths;
-use util::logger;
+use util::{logger, opnv};
 use std::sync::Arc;
 
 /// The entry point of the application.
 ///
 /// This function loads the configuration, initializes a shared ChromeDriver pool,
+/// fetches the latest VPNBook OpenVPN configurations if VPN rotation is enabled,
 /// and sequentially runs the full updates for corporate and economic data.
 /// Sequential execution helps prevent resource exhaustion from concurrent
 /// chromedriver instances and avoids spamming the target websites with too many requests.
@@ -22,8 +23,8 @@
 /// # Errors
 ///
 /// Returns an error if configuration loading fails, pool initialization fails,
-/// or if either update function encounters an issue (e.g., network errors,
-/// scraping failures, or chromedriver spawn failures like "program not found").
+/// VPN fetching fails (if enabled), or if either update function encounters an issue
+/// (e.g., network errors, scraping failures, or chromedriver spawn failures like "program not found").
 #[tokio::main]
 async fn main() -> Result<()> {
     let config = Config::load().map_err(|err| {
@@ -40,16 +41,29 @@
     })?;
 
     logger::log_info("=== Application started ===").await;
-    logger::log_info(&format!("Config: economic_start_date={}, corporate_start_date={}, lookahead_months={}, max_parallel_tasks={}",
-        config.economic_start_date, config.corporate_start_date, config.economic_lookahead_months, config.max_parallel_tasks)).await;
+    logger::log_info(&format!("Config: economic_start_date={}, corporate_start_date={}, lookahead_months={}, max_parallel_instances={}, enable_vpn_rotation={}",
+        config.economic_start_date, config.corporate_start_date, config.economic_lookahead_months, config.max_parallel_instances, config.enable_vpn_rotation)).await;
 
     // Initialize the shared ChromeDriver pool once
-    let pool_size = config.max_parallel_tasks;
+    let pool_size = config.max_parallel_instances;
     logger::log_info(&format!("Initializing ChromeDriver pool with size: {}", pool_size)).await;
     let pool = Arc::new(ChromeDriverPool::new(pool_size).await?);
     logger::log_info("✓ ChromeDriver pool initialized successfully").await;
 
+    // Fetch VPNBook configs if VPN rotation is enabled
+    if config.enable_vpn_rotation {
+        logger::log_info("--- Fetching latest VPNBook OpenVPN configurations ---").await;
+        let (username, password, files) =
+            opnv::fetch_vpnbook_configs(&pool, paths.cache_dir()).await?;
+        logger::log_info(&format!("Fetched VPN username: {}, password: {}", username, password)).await;
+        for file in &files {
+            logger::log_info(&format!("Extracted OVPN: {:?}", file)).await;
+        }
+        // Optionally, store username/password for rotation use (e.g., in a file or global state)
+        // For now, just log them; extend as needed for rotation integration
+    }
+
     // Run economic update first, passing the shared pool
     logger::log_info("--- Starting economic data update ---").await;
     economic::run_full_update(&config, &pool).await?;
 
diff --git a/src/webdriver/mod.rs b/src/scraper/mod.rs
similarity index 100%
rename from src/webdriver/mod.rs
rename to src/scraper/mod.rs
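The inline comment above leaves credential persistence open. One possible shape for it, kept out of the PR itself: the file name `credentials.txt` and the helper are assumptions, but the two-line layout matches what OpenVPN's `--auth-user-pass` option expects (username on the first line, password on the second):

```rust
use std::path::Path;
use tokio::io::AsyncWriteExt;

// Hypothetical helper: persist scraped credentials next to the extracted configs
// instead of only logging them.
async fn store_vpn_credentials(
    cache_dir: &Path,
    username: &str,
    password: &str,
) -> anyhow::Result<()> {
    let dir = cache_dir.join("openvpn");
    tokio::fs::create_dir_all(&dir).await?;
    let mut file = tokio::fs::File::create(dir.join("credentials.txt")).await?;
    // OpenVPN's --auth-user-pass file format: username on line 1, password on line 2.
    file.write_all(format!("{}\n{}\n", username, password).as_bytes())
        .await?;
    Ok(())
}
```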
diff --git a/src/webdriver/webdriver.rs b/src/scraper/webdriver.rs
similarity index 87%
rename from src/webdriver/webdriver.rs
rename to src/scraper/webdriver.rs
index f71bdd4..3037b01 100644
--- a/src/webdriver/webdriver.rs
+++ b/src/scraper/webdriver.rs
@@ -3,34 +3,38 @@
 use anyhow::{anyhow, Context, Result};
 use fantoccini::{Client, ClientBuilder};
 use serde_json::{Map, Value};
+use std::pin::Pin;
 use std::process::Stdio;
 use std::sync::Arc;
 use tokio::io::{AsyncBufReadExt, BufReader};
 use tokio::process::{Child, Command};
 use tokio::sync::{Mutex, Semaphore};
-use tokio::time::{Duration, sleep, timeout};
-use std::pin::Pin;
+use tokio::time::{sleep, timeout, Duration};
 
 /// Manages a pool of ChromeDriver instances for parallel scraping.
-///
+///
 /// This struct maintains multiple ChromeDriver processes and allows controlled
 /// concurrent access via a semaphore. Instances are reused across tasks to avoid
 /// the overhead of spawning new processes.
 pub struct ChromeDriverPool {
     instances: Vec<Arc<Mutex<ChromeInstance>>>,
     semaphore: Arc<Semaphore>,
+    tasks_per_instance: usize, // not yet wired to config.max_tasks_per_instance; 0 = never recycle
 }
 
 impl ChromeDriverPool {
     /// Creates a new pool with the specified number of ChromeDriver instances.
-    ///
+    ///
     /// # Arguments
     /// * `pool_size` - Number of concurrent ChromeDriver instances to maintain
     pub async fn new(pool_size: usize) -> Result<Self> {
         let mut instances = Vec::with_capacity(pool_size);
-
-        println!("Initializing ChromeDriver pool with {} instances...", pool_size);
-
+
+        println!(
+            "Initializing ChromeDriver pool with {} instances...",
+            pool_size
+        );
+
         for i in 0..pool_size {
             match ChromeInstance::new().await {
                 Ok(instance) => {
@@ -45,10 +49,11 @@
                 }
             }
         }
-
+
         Ok(Self {
             instances,
             semaphore: Arc::new(Semaphore::new(pool_size)),
+            tasks_per_instance: 0,
         })
     }
 
@@ -60,7 +65,10 @@
         Fut: std::future::Future<Output = Result<T>> + Send + 'static,
     {
         // Acquire semaphore permit
-        let _permit = self.semaphore.acquire().await
+        let _permit = self
+            .semaphore
+            .acquire()
+            .await
             .map_err(|_| anyhow!("Semaphore closed"))?;
 
         // Find an available instance (round-robin or first available)
@@ -69,7 +77,7 @@
 
         // Create a new session for this task
         let client = guard.new_session().await?;
-
+
         // Release lock while we do the actual scraping
         drop(guard);
 
@@ -82,8 +90,8 @@
         Ok(result)
     }
 
-    pub fn get_number_of_instances (&self) -> usize {
-        self.instances.len()
+    pub fn get_number_of_instances(&self) -> usize {
+        self.instances.len()
     }
 }
 
@@ -94,7 +102,7 @@ pub struct ChromeInstance {
 }
 
 impl ChromeInstance {
-/// Creates a new ChromeInstance by spawning chromedriver with random port.
+    /// Creates a new ChromeInstance by spawning chromedriver with random port.
     ///
     /// This spawns `chromedriver --port=0` to avoid port conflicts, reads stdout to extract
     /// the listening address, and waits for the success message. If timeout occurs or
@@ -107,20 +115,18 @@
     pub async fn new() -> Result<Self> {
         let mut command = Command::new("chromedriver-win64/chromedriver.exe");
         command
-            .arg("--port=0")  // Use random available port to support pooling
+            .arg("--port=0") // Use random available port to support pooling
             .stdout(Stdio::piped())
            .stderr(Stdio::piped());
 
         let mut process = command
             .spawn()
             .context("Failed to spawn chromedriver. Ensure it's installed and in PATH.")?;
 
-        let mut stdout = BufReader::new(
-            process.stdout.take().context("Failed to capture stdout")?
-        ).lines();
+        let mut stdout =
+            BufReader::new(process.stdout.take().context("Failed to capture stdout")?).lines();
 
-        let mut stderr = BufReader::new(
-            process.stderr.take().context("Failed to capture stderr")?
-        ).lines();
+        let mut stderr =
+            BufReader::new(process.stderr.take().context("Failed to capture stderr")?).lines();
 
         let start_time = std::time::Instant::now();
         let mut address: Option<String> = None;
@@ -136,9 +142,7 @@
         // Wait for address and success (up to 30s)
         while start_time.elapsed() < Duration::from_secs(30) {
-            if let Ok(Ok(Some(line))) =
-                timeout(Duration::from_secs(1), stdout.next_line()).await
-            {
+            if let Ok(Ok(Some(line))) = timeout(Duration::from_secs(1), stdout.next_line()).await {
                 if let Some(addr) = parse_chromedriver_address(&line) {
                     address = Some(addr.to_string());
                 }
@@ -200,8 +204,8 @@
             }
         });
         args.as_object()
-        .expect("Capabilities should be a JSON object")
-        .clone()
+            .expect("Capabilities should be a JSON object")
+            .clone()
     }
 }
 
@@ -238,11 +242,13 @@ impl Drop for ChromeInstance {
 }
 
 /// Simplified task execution - now uses the pool pattern.
-///
+///
 /// For backwards compatibility with existing code.
 pub struct ScrapeTask<T> {
     url: String,
-    parse: Box<dyn FnOnce(Client) -> Pin<Box<dyn std::future::Future<Output = Result<T>> + Send>> + Send>,
+    parse: Box<
+        dyn FnOnce(Client) -> Pin<Box<dyn std::future::Future<Output = Result<T>> + Send>> + Send,
+    >,
 }
 
 impl<T: Send + 'static> ScrapeTask<T> {
@@ -261,9 +267,8 @@
     pub async fn execute_with_pool(self, pool: &ChromeDriverPool) -> Result<T> {
         let url = self.url;
         let parse = self.parse;
-
-        pool.execute(url, move |client| async move {
-            (parse)(client).await
-        }).await
+
+        pool.execute(url, move |client| async move { (parse)(client).await })
+            .await
     }
-}
\ No newline at end of file
+}
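Seen from the call side, the reformatted `ScrapeTask` keeps its two-step shape: build a task with a parse closure, then run it through the pool, which manages the semaphore and session and hands the closure a `Client` already pointed at the URL. A minimal sketch (the URL and the length check are illustrative; the pattern is the one src/util/opnv.rs uses below):

```rust
use anyhow::Result;
use event_backtest_engine::scraper::webdriver::{ChromeDriverPool, ScrapeTask};

#[tokio::main]
async fn main() -> Result<()> {
    let pool = ChromeDriverPool::new(2).await?;
    // The closure only parses; navigation and session setup happen in the pool.
    let task = ScrapeTask::new("https://example.com".to_string(), |client| async move {
        let html = client.source().await?;
        Ok(html.len())
    });
    let size = task.execute_with_pool(&pool).await?;
    println!("Fetched {} bytes of HTML", size);
    Ok(())
}
```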
diff --git a/src/util/mod.rs b/src/util/mod.rs
index 55a6a02..696c249 100644
--- a/src/util/mod.rs
+++ b/src/util/mod.rs
@@ -1,3 +1,4 @@
 // src/util/mod.rs
 pub mod logger;
-pub mod directories;
\ No newline at end of file
+pub mod directories;
+pub mod opnv;
\ No newline at end of file
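One note before the opnv module below: its credential regexes anchor on the `<strong>` tag that VPNBook's markup is assumed to wrap the values in (that tag is a reconstruction, see the parsing comment in the module). A quick demonstration against a made-up snippet of that shape:

```rust
use regex::Regex;

fn main() {
    // Sample HTML in the assumed shape; the real page layout may differ.
    let sample = "Username: <strong>vpnbook</strong> Password: <strong>e83zu76</strong>";
    let user_re = Regex::new(r"Username:\s*<strong>\s*(\w+)").unwrap();
    let pass_re = Regex::new(r"Password:\s*<strong>\s*(\w+)").unwrap();
    let user = user_re.captures(sample).and_then(|c| c.get(1)).map(|m| m.as_str());
    let pass = pass_re.captures(sample).and_then(|c| c.get(1)).map(|m| m.as_str());
    assert_eq!(user, Some("vpnbook"));
    assert_eq!(pass, Some("e83zu76"));
}
```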
diff --git a/src/util/opnv.rs b/src/util/opnv.rs
new file mode 100644
index 0000000..fa5dff4
--- /dev/null
+++ b/src/util/opnv.rs
@@ -0,0 +1,278 @@
+// src/util/opnv.rs
+
+//! Module for fetching, downloading, and extracting OpenVPN configurations from VPNBook.
+//!
+//! This module provides functionality to scrape the VPNBook free VPN page using
+//! a headless browser, handle potential consent popups, extract current credentials,
+//! collect download URLs for OpenVPN ZIP files, download them, and then extract
+//! the .ovpn files into a structured directory: cache/openvpn/<hostname>/.
+//! It is designed to fetch the most recent data on every run, as credentials and
+//! server configurations change periodically.
+
+use anyhow::{anyhow, Context, Result};
+use fantoccini::{Client, Locator};
+use regex::Regex;
+use reqwest;
+use std::io::Read;
+use std::path::{Path, PathBuf};
+use tokio::fs::File;
+use tokio::io::AsyncWriteExt;
+use url::Url;
+use zip::ZipArchive;
+use crate::scraper::webdriver::{ChromeDriverPool, ScrapeTask};
+
+/// Fetches, downloads, and extracts the latest OpenVPN configurations from VPNBook.
+///
+/// This asynchronous function uses the provided `ChromeDriverPool` to scrape the
+/// VPNBook free VPN page. It dismisses any consent popup if present, extracts the
+/// current username and password, collects all OpenVPN ZIP download URLs, downloads
+/// the ZIP files temporarily, extracts the .ovpn files into the specified directory
+/// structure under `cache_dir`/openvpn/<hostname>/, and cleans up the ZIP files.
+///
+/// The directory structure is: cache/openvpn/<hostname>/, where <hostname>
+/// is derived from the ZIP filename (e.g., "ca149.vpnbook.com").
+///
+/// The function ensures fresh data is fetched each time it runs, making it suitable
+/// for periodic updates where credentials may change.
+///
+/// # Arguments
+///
+/// * `pool` - A reference to the `ChromeDriverPool` for managing browser instances.
+/// * `cache_dir` - The path to the base cache directory. The OpenVPN files will be saved
+///   under `cache_dir`/openvpn/<hostname>/.
+///
+/// # Returns
+///
+/// A `Result` containing a tuple with:
+/// - `String`: The scraped username.
+/// - `String`: The scraped password.
+/// - `Vec<PathBuf>`: Paths to the extracted .ovpn files.
+///
+/// # Errors
+///
+/// Returns an `anyhow::Error` if:
+/// - Navigation to the page fails.
+/// - The consent popup cannot be dismissed (if present).
+/// - Credentials cannot be parsed from the page.
+/// - Download URLs cannot be found or are invalid.
+/// - HTTP downloads fail or file writing errors occur.
+/// - ZIP extraction fails (e.g., invalid ZIP or I/O errors).
+///
+/// # Dependencies
+///
+/// This function requires the following crates (add to Cargo.toml if not present):
+/// - `anyhow` for error handling.
+/// - `fantoccini` for browser automation.
+/// - `regex` for parsing credentials from HTML.
+/// - `reqwest` (with `tokio` features) for HTTP downloads.
+/// - `tokio` for asynchronous file operations.
+/// - `url` for URL manipulation.
+/// - `zip` for ZIP extraction.
+///
+/// # Examples
+///
+/// ```no_run
+/// use anyhow::Result;
+/// use event_backtest_engine::util::opnv::fetch_vpnbook_configs;
+/// use event_backtest_engine::scraper::webdriver::ChromeDriverPool;
+/// use std::path::Path;
+///
+/// #[tokio::main]
+/// async fn main() -> Result<()> {
+///     let pool = ChromeDriverPool::new(1).await?;
+///     let (username, password, files) =
+///         fetch_vpnbook_configs(&pool, Path::new("./cache")).await?;
+///     println!("Username: {}, Password: {}", username, password);
+///     for file in files {
+///         println!("Extracted: {:?}", file);
+///     }
+///     Ok(())
+/// }
+/// ```
+pub async fn fetch_vpnbook_configs(
+    pool: &ChromeDriverPool,
+    cache_dir: &Path,
+) -> Result<(String, String, Vec<PathBuf>)> {
+    // Prepare the openvpn directory
+    let vpn_dir = cache_dir.join("openvpn");
+    tokio::fs::create_dir_all(&vpn_dir)
+        .await
+        .context("Failed to create openvpn directory")?;
+
+    // Temporary directory for ZIP downloads (under cache for consistency)
+    let temp_dir = cache_dir.join("temp_vpn_zips");
+    tokio::fs::create_dir_all(&temp_dir)
+        .await
+        .context("Failed to create temp directory")?;
+
+    let url = "https://www.vpnbook.com/freevpn".to_string();
+
+    // Define the scraping task
+    let task = ScrapeTask::new(url, |client: Client| async move {
+        // Attempt to dismiss consent popup if present
+        let consent_selector = r#"body > div.fc-consent-root > div.fc-dialog-container > div.fc-dialog.fc-choice-dialog > div.fc-footer-buttons-container > div.fc-footer-buttons > button.fc-button.fc-cta-do-not-consent.fc-secondary-button > p"#;
+        if let Ok(consent_elem) = client.find(Locator::Css(consent_selector)).await {
+            consent_elem
+                .click()
+                .await
+                .context("Failed to click consent dismissal button")?;
+            // Brief delay to allow popup to close
+            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+        }
+
+        // Get the full page source for parsing
+        let page_source = client
+            .source()
+            .await
+            .context("Failed to retrieve page source")?;
+
+        // Parse username and password using regex (assuming HTML structure like
+        // `Username: <strong>value</strong>`)
+        let user_re =
+            Regex::new(r"Username:\s*<strong>\s*(\w+)").context("Invalid regex for username")?;
+        let pass_re =
+            Regex::new(r"Password:\s*<strong>\s*(\w+)").context("Invalid regex for password")?;
+
+        let username = user_re
+            .captures(&page_source)
+            .and_then(|c| c.get(1))
+            .map(|m| m.as_str().to_string())
+            .ok_or_else(|| anyhow!("Username not found in page source"))?;
+
+        let password = pass_re
+            .captures(&page_source)
+            .and_then(|c| c.get(1))
+            .map(|m| m.as_str().to_string())
+            .ok_or_else(|| anyhow!("Password not found in page source"))?;
+
+        // Locate all download links for OpenVPN ZIP files
+        let links = client
+            .find_all(Locator::Css(r#"a[href^="/free-openvpn-account/"][download=""]"#))
+            .await
+            .context("Failed to find download links")?;
+
+        // Collect relative hrefs
+        let mut rel_urls = Vec::new();
+        for link in links {
+            if let Some(href) = link.attr("href").await.context("Failed to get href attribute")? {
+                rel_urls.push(href);
+            }
+        }
+
+        Ok::<(String, String, Vec<String>), anyhow::Error>((username, password, rel_urls))
+    });
+
+    // Execute the scraping task using the pool
+    let (username, password, rel_urls) = task.execute_with_pool(pool).await?;
+
+    // Base URL for resolving relative paths
+    let base_url = Url::parse("https://www.vpnbook.com/")?;
+
+    // Download each ZIP file to temp_dir
+    let mut zip_paths = Vec::new();
+    for rel in &rel_urls {
+        let full_url = base_url.join(rel).context("Failed to join URL")?;
+        let filename = rel
+            .split('/')
+            .last()
+            .ok_or_else(|| anyhow!("Invalid filename in URL"))?
+            .to_string();
+        let out_path = temp_dir.join(&filename);
+
+        // Perform HTTP GET request
+        let resp = reqwest::get(full_url.clone())
+            .await
+            .with_context(|| format!("Failed to send download request for {}", full_url))?;
+
+        if resp.status().is_success() {
+            let bytes = resp
+                .bytes()
+                .await
+                .context("Failed to read response bytes")?;
+
+            // Write to file asynchronously
+            let mut file = File::create(&out_path)
+                .await
+                .context("Failed to create output file")?;
+            file.write_all(&bytes)
+                .await
+                .context("Failed to write to file")?;
+
+            zip_paths.push(out_path);
+        } else {
+            return Err(anyhow!(
+                "Download failed with status: {} for URL: {}",
+                resp.status(),
+                full_url
+            ));
+        }
+    }
+
+    // Now extract .ovpn files from each ZIP
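+    // The zip crate's API is synchronous, so each archive below is processed
+    // inside tokio::task::spawn_blocking; the double `??` first unwraps the
+    // task's JoinError, then the extraction Result itself.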
+    let mut extracted_paths = Vec::new();
+    for zip_path in zip_paths {
+        let hostname = get_hostname_from_zip_filename(
+            zip_path.file_name().unwrap().to_str().unwrap(),
+        );
+        let hostname_dir = vpn_dir.join(&hostname);
+        tokio::fs::create_dir_all(&hostname_dir)
+            .await
+            .context("Failed to create hostname directory")?;
+
+        // Use spawn_blocking for sync ZIP operations
+        let zip_path_clone = zip_path.clone();
+        let hostname_dir_clone = hostname_dir.clone();
+        let extract_result = tokio::task::spawn_blocking(move || {
+            let file = std::fs::File::open(&zip_path_clone)
+                .with_context(|| format!("Failed to open ZIP file: {:?}", zip_path_clone))?;
+            let mut archive = ZipArchive::new(file)
+                .with_context(|| format!("Failed to read ZIP archive: {:?}", zip_path_clone))?;
+
+            let mut paths = Vec::new();
+            for i in 0..archive.len() {
+                let mut zip_file = archive.by_index(i)?;
+                if zip_file.name().ends_with(".ovpn") {
+                    // Assumes a flat archive: entry names are plain file names,
+                    // so joining them onto the hostname dir needs no parent dirs.
+                    let target_path = hostname_dir_clone.join(zip_file.name());
+                    let mut content = Vec::new();
+                    zip_file.read_to_end(&mut content)?;
+
+                    std::fs::write(&target_path, &content)
+                        .with_context(|| format!("Failed to write .ovpn file: {:?}", target_path))?;
+                    paths.push(target_path);
+                }
+            }
+            Ok::<Vec<PathBuf>, anyhow::Error>(paths)
+        })
+        .await
+        .context("Spawn blocking failed")??;
+
+        extracted_paths.extend(extract_result);
+
+        // Clean up the ZIP file after extraction
+        tokio::fs::remove_file(&zip_path)
+            .await
+            .context("Failed to remove temp ZIP file")?;
+    }
+
+    // Optional: Clean up temp_dir if empty
+    let _ = tokio::fs::remove_dir(&temp_dir).await;
+
+    Ok((username, password, extracted_paths))
+}
+
+/// Derives the hostname from the ZIP filename.
+///
+/// For example, "vpnbook-openvpn-ca149.zip" -> "ca149.vpnbook.com"
+///
+/// If the format doesn't match, returns "unknown.vpnbook.com".
+fn get_hostname_from_zip_filename(filename: &str) -> String {
+    if filename.starts_with("vpnbook-openvpn-") && filename.ends_with(".zip") {
+        let code = filename
+            .strip_prefix("vpnbook-openvpn-")
+            .unwrap()
+            .strip_suffix(".zip")
+            .unwrap();
+        format!("{}.vpnbook.com", code)
+    } else {
+        "unknown.vpnbook.com".to_string()
+    }
+}
\ No newline at end of file
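A closing unit-test sketch for the hostname helper, with the expected values taken directly from its doc comment:

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn derives_hostname_from_zip_filename() {
        assert_eq!(
            get_hostname_from_zip_filename("vpnbook-openvpn-ca149.zip"),
            "ca149.vpnbook.com"
        );
        // Anything outside the vpnbook-openvpn-*.zip pattern gets the fallback.
        assert_eq!(
            get_hostname_from_zip_filename("random.zip"),
            "unknown.vpnbook.com"
        );
    }
}
```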