Compare commits

...

2 Commits

| SHA1 | Message | Date |
| --- | --- | --- |
| d26e833d93 | added update_rule for incremental change | 2025-12-15 23:47:28 +01:00 |
| d744769138 | added companie mapping with yahoo tickers | 2025-12-14 16:48:02 +01:00 |
18 changed files with 1943 additions and 2702 deletions

1
.gitignore vendored
View File

@@ -34,6 +34,7 @@ target/
 **/*.zip
 **/*.log
 **/*.ovpn
+**/*.tmp
 
 #/economic_events*
 #/economic_event_changes*

BIN
check.txt

Binary file not shown.

25
data_updating_rule.md Normal file
View File

@@ -0,0 +1,25 @@
# Abort-Safe Incremental JSONL Persistence Rule
**Rule:** Persist state using an *append-only, fsync-backed JSONL log with atomic checkpoints*.
**Requirements**
- Write updates as **single-line JSON objects** (one logical mutation per line).
- **Append only** (`O_APPEND`), never modify existing lines.
- After each write batch, call **`fsync`** (or `File::sync_data`) before reporting success.
- Treat a **line as committed only if it ends with `\n`**; ignore trailing partial lines on recovery.
- Periodically create a **checkpoint**:
- Write full state to `state.tmp`
- `fsync`
- **Atomic rename** to `state.jsonl`
- On startup:
- Load last checkpoint
- Replay log lines after it in order
- On abort/panic/crash:
- No truncation
- Replay guarantees no data loss beyond last fsynced line
**Outcome**
- Crash/abort-safe
- O(1) writes
- Deterministic recovery
- Minimal overhead
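
The bullets above translate into a small amount of code. The following is a minimal, self-contained sketch of the rule, assuming `std::fs` plus `serde_json` and `anyhow`; the names `Record`, `append_update`, `write_checkpoint`, and `recover` are illustrative and not part of this repository:

```rust
use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::Path;

use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
struct Record {
    key: String,
    value: String,
}

/// Append one mutation as a single JSON line and fsync before reporting success.
fn append_update(log_path: &Path, rec: &Record) -> anyhow::Result<()> {
    let mut file = OpenOptions::new()
        .create(true)
        .append(true) // O_APPEND: existing bytes are never rewritten
        .open(log_path)?;
    let mut line = serde_json::to_string(rec)?;
    line.push('\n');
    file.write_all(line.as_bytes())?;
    file.sync_data()?; // durable before the write counts as committed
    Ok(())
}

/// Write the full state to a temp file, fsync it, then atomically rename it over the checkpoint.
fn write_checkpoint(state: &HashMap<String, Record>, checkpoint_path: &Path) -> anyhow::Result<()> {
    let tmp = checkpoint_path.with_extension("tmp");
    {
        let mut w = BufWriter::new(File::create(&tmp)?);
        for rec in state.values() {
            let line = serde_json::to_string(rec)?;
            w.write_all(line.as_bytes())?;
            w.write_all(b"\n")?;
        }
        w.flush()?;
        w.get_ref().sync_all()?;
    }
    fs::rename(&tmp, checkpoint_path)?; // atomic commit point
    Ok(())
}

/// Rebuild state on startup: load the checkpoint, then replay the log in order.
/// A trailing line without '\n' is a torn write and is ignored.
fn recover(checkpoint_path: &Path, log_path: &Path) -> HashMap<String, Record> {
    let mut state = HashMap::new();
    for path in [checkpoint_path, log_path] {
        let Ok(content) = fs::read_to_string(path) else { continue };
        for line in content.split_inclusive('\n') {
            if !line.ends_with('\n') {
                break; // partial line: never committed, skip it
            }
            if let Ok(rec) = serde_json::from_str::<Record>(line.trim_end()) {
                state.insert(rec.key.clone(), rec);
            }
        }
    }
    state
}
```

After a successful checkpoint the log can be truncated; this is the same pattern the `build_companies_jsonl_streaming` diff below applies to `companies_updates.log`.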

View File

@@ -21,21 +21,12 @@ pub struct Config {
     /// If set to "true", enables automatic VPN rotation between sessions
     #[serde(default)]
     pub enable_vpn_rotation: bool,
-    /// Number of tasks per session before rotating VPN
-    /// If set to 0, rotates VPN between economic and corporate phases
-    #[serde(default = "default_tasks_per_session")]
-    pub tasks_per_vpn_session: usize,
 }
 
 fn default_max_parallel_instances() -> usize {
     10
 }
 
-fn default_tasks_per_session() -> usize {
-    0 // 0 = rotate between economic/corporate
-}
 
 impl Default for Config {
     fn default() -> Self {
         Self {
@@ -45,7 +36,6 @@ impl Default for Config {
             max_parallel_instances: default_max_parallel_instances(),
             max_tasks_per_instance: 0,
             enable_vpn_rotation: false,
-            tasks_per_vpn_session: default_tasks_per_session(),
         }
     }
 }
@@ -93,11 +83,6 @@ impl Config {
             .parse::<bool>()
             .context("Failed to parse ENABLE_VPN_ROTATION as bool")?;
 
-        let tasks_per_vpn_session: usize = dotenvy::var("TASKS_PER_VPN_SESSION")
-            .unwrap_or_else(|_| "0".to_string())
-            .parse()
-            .context("Failed to parse TASKS_PER_VPN_SESSION as usize")?;
 
         Ok(Self {
             economic_start_date,
             corporate_start_date,
@@ -105,7 +90,6 @@ impl Config {
             max_parallel_instances,
             max_tasks_per_instance,
             enable_vpn_rotation,
-            tasks_per_vpn_session,
         })
     }

View File

@@ -7,5 +7,6 @@ pub mod helpers;
 pub mod aggregation;
 pub mod fx;
 pub mod openfigi;
+pub mod yahoo;
 pub use update::run_full_update;

File diff suppressed because it is too large.

View File

@@ -1,318 +1,19 @@
 // src/corporate/scraper.rs
-use super::{types::*, helpers::*, openfigi::*};
+use super::{types::*};
 //use crate::corporate::openfigi::OpenFigiClient;
 use crate::{scraper::webdriver::*, util::directories::DataPaths, util::logger};
-use fantoccini::{Client, Locator};
+use fantoccini::{Client};
 use scraper::{Html, Selector};
 use chrono::{DateTime, Duration, NaiveDate, Utc};
 use tokio::{time::{Duration as TokioDuration, sleep}};
 use reqwest::Client as HttpClient;
 use serde_json::{json, Value};
 use zip::ZipArchive;
-use std::{collections::HashMap, sync::Arc};
+use std::{collections::HashMap};
 use std::io::{Read};
-use anyhow::{anyhow, Result};
 
 const USER_AGENT: &str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36";
/// Check if a ticker exists on Yahoo Finance and return core metadata.
///
/// This function calls the public Yahoo Finance quoteSummary endpoint and extracts:
/// - ISIN (when available)
/// - Company name
/// - Exchange MIC code
/// - Trading currency
///
/// It strictly filters to only accept **equity** securities.
///
/// # Arguments
/// * `ticker` - The ticker symbol to validate (e.g., "AAPL", "7203.T", "BMW.DE")
///
/// # Returns
/// `Ok(PrimaryInfo)` on success, `Err` if ticker doesn't exist, is not equity, or data is malformed.
///
/// # Errors
/// - Ticker not found
/// - Not an equity (ETF, bond, etc.)
/// - Missing critical fields
/// - Network or JSON parsing errors
/*pub async fn check_ticker_exists(ticker: &str) -> anyhow::Result<PrimaryInfo> {
let url = format!(
"https://query1.finance.yahoo.com/v10/finance/quoteSummary/{}?modules=price%2CassetProfile",
ticker
);
let resp = match HttpClient::new()
.get(&url)
.header("User-Agent", USER_AGENT)
.send()
.await
{
Ok(resp) => resp,
Err(err) => {
return Err(anyhow::anyhow!(
"Failed to reach Yahoo Finance for ticker {}: {}",
ticker,
err
));
}
};
if !resp.status().is_success() {
return Err(anyhow::anyhow!("Yahoo returned HTTP {} for ticker {}", resp.status(), ticker));
}
let json: Value = match resp
.json()
.await {
Ok(resp) => resp,
Err(err) => {
return Err(anyhow::anyhow!(
"Failed to parse JSON response from Yahoo Finance {}: {}",
ticker,
err
));
}
};
let result_array = json["quoteSummary"]["result"]
.as_array()
.ok_or_else(|| anyhow::anyhow!("Missing 'quoteSummary.result' in response"))?;
if result_array.is_empty() || result_array[0].is_null() {
return Err(anyhow::anyhow!("No quote data returned for ticker {}", ticker));
}
let quote = &result_array[0]["price"];
let profile = &result_array[0]["assetProfile"];
// === 1. Must be EQUITY ===
let quote_type = quote["quoteType"]
.as_str()
.unwrap_or("")
.to_ascii_uppercase();
if quote_type != "EQUITY" {
println!(" → Skipping {} (quoteType: {})", ticker, quote_type);
return Err(anyhow::anyhow!("Not an equity security: {}", quote_type));
}
// === 2. Extract basic info ===
let long_name = quote["longName"]
.as_str()
.or_else(|| quote["shortName"].as_str())
.unwrap_or(ticker)
.trim()
.to_string();
let currency = quote["currency"]
.as_str()
.unwrap_or("USD")
.to_string();
let exchange_mic = quote["exchange"]
.as_str()
.unwrap_or("")
.to_string();
if exchange_mic.is_empty() {
return Err(anyhow::anyhow!("Missing exchange MIC for ticker {}", ticker));
}
// === 3. Extract ISIN (from assetProfile if available) ===
let isin = profile["isin"]
.as_str()
.and_then(|s| if s.len() == 12 && s.chars().all(|c| c.is_ascii_alphanumeric()) { Some(s) } else { None })
.unwrap_or("")
.to_ascii_uppercase();
// === 4. Final sanity check: reject obvious debt securities ===
let name_upper = long_name.to_ascii_uppercase();
if name_upper.contains(" BOND") ||
name_upper.contains(" NOTE") ||
name_upper.contains(" DEBENTURE") ||
name_upper.contains(" PREFERRED") && !name_upper.contains(" STOCK") {
return Err(anyhow::anyhow!("Security name suggests debt instrument: {}", long_name));
}
println!(
" → Valid equity: {} | {} | {} | ISIN: {}",
ticker,
long_name,
exchange_mic,
if isin.is_empty() { "N/A" } else { &isin }
);
Ok(PrimaryInfo {
isin,
name: long_name,
exchange_mic,
currency,
})
}*/
/// Fetches earnings events for a ticker using a dedicated ScrapeTask.
///
/// This function creates and executes a ScrapeTask to navigate to the Yahoo Finance earnings calendar,
/// reject cookies, and extract the events.
///
/// # Arguments
/// * `ticker` - The stock ticker symbol.
///
/// # Returns
/// A vector of CompanyEvent structs on success.
///
/// # Errors
/// Returns an error if the task execution fails, e.g., chromedriver spawn or navigation issues.
pub async fn fetch_earnings_with_pool(
ticker: &str,
pool: &Arc<ChromeDriverPool>,
) -> anyhow::Result<Vec<CompanyEvent>> {
let ticker = ticker.to_string();
let url = format!("https://finance.yahoo.com/calendar/earnings?symbol={}", ticker);
let ticker_cloned = ticker.clone();
pool.execute(url, move |client| {
let ticker = ticker_cloned.clone();
Box::pin(async move {
reject_yahoo_cookies(&client).await?;
extract_earnings_events(&client, &ticker).await
})
}).await
}
/// Extracts earnings events from the currently loaded Yahoo Finance earnings calendar page.
///
/// This function assumes the client is already navigated to the correct URL (e.g.,
/// https://finance.yahoo.com/calendar/earnings?symbol={ticker}) and cookies are handled.
///
/// It waits for the earnings table, extracts rows, parses cells into CompanyEvent structs,
/// and handles date parsing, float parsing, and optional fields.
///
/// # Arguments
/// * `client` - The fantoccini Client with the page loaded.
/// * `ticker` - The stock ticker symbol for the events.
///
/// # Returns
/// A vector of CompanyEvent on success.
///
/// # Errors
/// Returns an error if:
/// - Table or elements not found.
/// - Date or float parsing fails.
/// - WebDriver operations fail.
///
/// # Examples
///
/// ```no_run
/// use fantoccini::Client;
/// use crate::corporate::scraper::extract_earnings;
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
/// // Assume client is set up and navigated
/// let events = extract_earnings(&client, "AAPL").await?;
/// Ok(())
/// }
/// ```
pub async fn extract_earnings_events(client: &Client, ticker: &str) -> Result<Vec<CompanyEvent>> {
// Wait for the table to load
let table = client
.wait()
.for_element(Locator::Css(r#"table[data-test="cal-table"]"#))
.await
.map_err(|e| anyhow!("Failed to find earnings table: {}", e))?;
// Find all rows in tbody
let rows = table
.find_all(Locator::Css("tbody tr"))
.await
.map_err(|e| anyhow!("Failed to find table rows: {}", e))?;
let mut events = Vec::with_capacity(rows.len());
for row in rows {
let cells = row
.find_all(Locator::Css("td"))
.await
.map_err(|e| anyhow!("Failed to find cells in row: {}", e))?;
if cells.len() < 5 {
continue; // Skip incomplete rows
}
// Extract and parse date
let date_str = cells[0]
.text()
.await
.map_err(|e| anyhow!("Failed to get date text: {}", e))?;
let date = parse_yahoo_date(&date_str)
.map_err(|e| anyhow!("Failed to parse date '{}': {}", date_str, e))?
.format("%Y-%m-%d")
.to_string();
// Extract time, replace "Time Not Supplied" with empty
let time = cells[1]
.text()
.await
.map_err(|e| anyhow!("Failed to get time text: {}", e))?
.replace("Time Not Supplied", "");
// Extract period
let period = cells[2]
.text()
.await
.map_err(|e| anyhow!("Failed to get period text: {}", e))?;
// Parse EPS forecast
let eps_forecast_str = cells[3]
.text()
.await
.map_err(|e| anyhow!("Failed to get EPS forecast text: {}", e))?;
let eps_forecast = parse_float(&eps_forecast_str);
// Parse EPS actual
let eps_actual_str = cells[4]
.text()
.await
.map_err(|e| anyhow!("Failed to get EPS actual text: {}", e))?;
let eps_actual = parse_float(&eps_actual_str);
// Parse surprise % if available
let surprise_pct = if cells.len() > 5 {
let surprise_str = cells[5]
.text()
.await
.map_err(|e| anyhow!("Failed to get surprise text: {}", e))?;
parse_float(&surprise_str)
} else {
None
};
events.push(CompanyEvent {
ticker: ticker.to_string(),
date,
time,
period,
eps_forecast,
eps_actual,
revenue_forecast: None,
revenue_actual: None,
surprise_pct,
source: "Yahoo".to_string(),
});
}
if events.is_empty() {
eprintln!("Warning: No earnings events extracted for ticker {}", ticker);
} else {
println!("Extracted {} earnings events for {}", events.len(), ticker);
}
Ok(events)
}
 fn parse_price(v: Option<&Value>) -> f64 {
     v.and_then(|x| x.as_str())
         .and_then(|s| s.replace('$', "").replace(',', "").parse::<f64>().ok())
@@ -490,20 +191,17 @@ pub async fn _fetch_latest_gleif_isin_lei_mapping_url(client: &Client) -> anyhow
pub async fn download_isin_lei_csv() -> anyhow::Result<Option<String>> { pub async fn download_isin_lei_csv() -> anyhow::Result<Option<String>> {
let url = "https://mapping.gleif.org/api/v2/isin-lei/9315e3e3-305a-4e71-b062-46714740fa8d/download"; let url = "https://mapping.gleif.org/api/v2/isin-lei/9315e3e3-305a-4e71-b062-46714740fa8d/download";
// Initialize DataPaths and create cache/gleif directory
let paths = DataPaths::new(".")?; let paths = DataPaths::new(".")?;
let gleif_cache_dir = paths.cache_gleif_dir(); let gleif_cache_dir = paths.cache_gleif_dir();
if let Err(e) = std::fs::create_dir_all(&gleif_cache_dir) { if let Err(e) = std::fs::create_dir_all(&gleif_cache_dir) {
let msg = format!("Failed to create cache/gleif directory: {}", e); let msg = format!("Failed to create cache/gleif directory: {}", e);
logger::log_error(&msg).await; logger::log_error(&msg).await;
println!("{}", msg);
return Ok(None); return Ok(None);
} }
logger::log_info("Corporate Scraper: Downloading ISIN/LEI mapping from GLEIF...").await; logger::log_info("Downloading ISIN/LEI mapping from GLEIF...").await;
// Download ZIP and get the filename from Content-Disposition header
let client = match reqwest::Client::builder() let client = match reqwest::Client::builder()
.user_agent(USER_AGENT) .user_agent(USER_AGENT)
.timeout(std::time::Duration::from_secs(30)) .timeout(std::time::Duration::from_secs(30))
@@ -511,9 +209,7 @@ pub async fn download_isin_lei_csv() -> anyhow::Result<Option<String>> {
{ {
Ok(c) => c, Ok(c) => c,
Err(e) => { Err(e) => {
let msg = format!("Failed to create HTTP client: {}", e); logger::log_error(&format!("Failed to create HTTP client: {}", e)).await;
logger::log_error(&msg).await;
println!("{}", msg);
return Ok(None); return Ok(None);
} }
}; };
@@ -521,20 +217,15 @@ pub async fn download_isin_lei_csv() -> anyhow::Result<Option<String>> {
let resp = match client.get(url).send().await { let resp = match client.get(url).send().await {
Ok(r) if r.status().is_success() => r, Ok(r) if r.status().is_success() => r,
Ok(resp) => { Ok(resp) => {
let msg = format!("Server returned HTTP {}", resp.status()); logger::log_error(&format!("Server returned HTTP {}", resp.status())).await;
logger::log_error(&msg).await;
println!("{}", msg);
return Ok(None); return Ok(None);
} }
Err(e) => { Err(e) => {
let msg = format!("Failed to download ISIN/LEI ZIP: {}", e); logger::log_error(&format!("Failed to download: {}", e)).await;
logger::log_error(&msg).await;
println!("{}", msg);
return Ok(None); return Ok(None);
} }
}; };
// Extract filename from Content-Disposition header or use default
let filename = resp let filename = resp
.headers() .headers()
.get("content-disposition") .get("content-disposition")
@@ -542,11 +233,10 @@ pub async fn download_isin_lei_csv() -> anyhow::Result<Option<String>> {
.and_then(|s| s.split("filename=").nth(1).map(|f| f.trim_matches('"').to_string())) .and_then(|s| s.split("filename=").nth(1).map(|f| f.trim_matches('"').to_string()))
.unwrap_or_else(|| "isin_lei.zip".to_string()); .unwrap_or_else(|| "isin_lei.zip".to_string());
// Parse timestamp from filename and convert to DDMMYYYY format
let parsed_filename = parse_gleif_filename(&filename); let parsed_filename = parse_gleif_filename(&filename);
logger::log_info(&format!("Corporate Scraper: Downloaded file: {} -> {}", filename, parsed_filename)).await; logger::log_info(&format!("Downloaded: {} -> {}", filename, parsed_filename)).await;
// Determine date (DDMMYYYY) from parsed filename: "isin-lei-DDMMYYYY.csv" // Extract date from filename
let mut date_str = String::new(); let mut date_str = String::new();
if let Some(start_idx) = parsed_filename.find("isin-lei-") { if let Some(start_idx) = parsed_filename.find("isin-lei-") {
let rest = &parsed_filename[start_idx + 9..]; let rest = &parsed_filename[start_idx + 9..];
@@ -555,13 +245,10 @@ pub async fn download_isin_lei_csv() -> anyhow::Result<Option<String>> {
} }
} }
// If we parsed a date, use/create a date folder under cache/gleif and operate inside it; otherwise use cache root.
let date_dir = if !date_str.is_empty() { let date_dir = if !date_str.is_empty() {
let p = gleif_cache_dir.join(&date_str); let p = gleif_cache_dir.join(&date_str);
// Ensure the date folder exists (create if necessary)
if let Err(e) = std::fs::create_dir_all(&p) { if let Err(e) = std::fs::create_dir_all(&p) {
let msg = format!("Failed to create date directory {:?}: {}", p, e); logger::log_warn(&format!("Failed to create date directory: {}", e)).await;
logger::log_warn(&msg).await;
None None
} else { } else {
Some(p) Some(p)
@@ -570,17 +257,16 @@ pub async fn download_isin_lei_csv() -> anyhow::Result<Option<String>> {
None None
}; };
// Choose the directory where we'll look for existing files and where we'll save the new ones
let target_dir = date_dir.clone().unwrap_or_else(|| gleif_cache_dir.to_path_buf()); let target_dir = date_dir.clone().unwrap_or_else(|| gleif_cache_dir.to_path_buf());
// If the date folder exists (or was created), prefer any *_clean.csv inside it and return that immediately // Check for existing clean CSV
if let Some(ref ddir) = date_dir { if let Some(ref ddir) = date_dir {
if let Ok(entries) = std::fs::read_dir(ddir) { if let Ok(entries) = std::fs::read_dir(ddir) {
for entry in entries.flatten() { for entry in entries.flatten() {
if let Some(name) = entry.file_name().to_str() { if let Some(name) = entry.file_name().to_str() {
if name.to_lowercase().ends_with("_clean.csv") { if name.to_lowercase().ends_with("_clean.csv") {
let path = ddir.join(name); let path = ddir.join(name);
logger::log_info(&format!("Found existing clean GLEIF CSV: {}", path.display())).await; logger::log_info(&format!("Found existing clean CSV: {}", path.display())).await;
return Ok(Some(path.to_string_lossy().to_string())); return Ok(Some(path.to_string_lossy().to_string()));
} }
} }
@@ -588,71 +274,42 @@ pub async fn download_isin_lei_csv() -> anyhow::Result<Option<String>> {
} }
} }
// If no clean file found in the date folder (or date folder doesn't exist), check whether the csv/zip already exist in the target dir let csv_candidate = target_dir.join(parsed_filename.replace(".zip", ".csv"));
let csv_candidate_name = parsed_filename.replace(".zip", ".csv");
let csv_candidate = target_dir.join(&csv_candidate_name);
let zip_candidate = target_dir.join(&parsed_filename);
if csv_candidate.exists() { if csv_candidate.exists() {
logger::log_info(&format!("Found existing GLEIF CSV: {}", csv_candidate.display())).await; logger::log_info(&format!("Found existing CSV: {}", csv_candidate.display())).await;
return Ok(Some(csv_candidate.to_string_lossy().to_string())); return Ok(Some(csv_candidate.to_string_lossy().to_string()));
} }
if zip_candidate.exists() {
// If zip exists but csv does not, extract later; for now prefer returning csv path (may be created by extraction step)
let inferred_csv = target_dir.join(csv_candidate_name);
if inferred_csv.exists() {
logger::log_info(&format!("Found existing extracted CSV next to ZIP: {}", inferred_csv.display())).await;
return Ok(Some(inferred_csv.to_string_lossy().to_string()));
}
// otherwise we'll overwrite/extract into target_dir below
}
let bytes = match resp.bytes().await { let bytes = match resp.bytes().await {
Ok(b) => b, Ok(b) => b,
Err(e) => { Err(e) => {
let msg = format!("Failed to read ZIP bytes: {}", e); logger::log_error(&format!("Failed to read bytes: {}", e)).await;
logger::log_error(&msg).await;
println!("{}", msg);
return Ok(None); return Ok(None);
} }
}; };
// Ensure target directory exists (create if it's the date folder and was absent earlier)
if let Some(ref ddir) = date_dir {
let _ = std::fs::create_dir_all(ddir);
}
let zip_path = target_dir.join(&parsed_filename); let zip_path = target_dir.join(&parsed_filename);
let csv_path = target_dir.join(parsed_filename.replace(".zip", ".csv")); let csv_path = target_dir.join(parsed_filename.replace(".zip", ".csv"));
if let Err(e) = tokio::fs::write(&zip_path, &bytes).await { if let Err(e) = tokio::fs::write(&zip_path, &bytes).await {
let msg = format!("Failed to write ZIP file: {}", e); logger::log_error(&format!("Failed to write ZIP: {}", e)).await;
logger::log_error(&msg).await;
println!("{}", msg);
return Ok(None); return Ok(None);
} }
logger::log_info(&format!("Corporate Scraper: Saved ZIP to {:?}", zip_path)).await;
// Extract CSV // Extract CSV from ZIP
let archive = match std::fs::File::open(&zip_path) let archive = match std::fs::File::open(&zip_path).map(ZipArchive::new) {
.map(ZipArchive::new)
{
Ok(Ok(a)) => a, Ok(Ok(a)) => a,
Ok(Err(e)) => { Ok(Err(e)) => {
let msg = format!("Invalid ZIP: {}", e); logger::log_error(&format!("Invalid ZIP: {}", e)).await;
logger::log_error(&msg).await;
println!("{}", msg);
return Ok(None); return Ok(None);
} }
Err(e) => { Err(e) => {
let msg = format!("Cannot open ZIP file: {}", e); logger::log_error(&format!("Cannot open ZIP: {}", e)).await;
logger::log_error(&msg).await;
println!("{}", msg);
return Ok(None); return Ok(None);
} }
}; };
let mut archive = archive; let mut archive = archive;
let idx = match (0..archive.len()).find(|&i| { let idx = match (0..archive.len()).find(|&i| {
archive.by_index(i) archive.by_index(i)
.map(|f| f.name().ends_with(".csv")) .map(|f| f.name().ends_with(".csv"))
@@ -660,9 +317,7 @@ pub async fn download_isin_lei_csv() -> anyhow::Result<Option<String>> {
}) { }) {
Some(i) => i, Some(i) => i,
None => { None => {
let msg = "ZIP did not contain a CSV file"; logger::log_error("ZIP contains no CSV").await;
logger::log_error(msg).await;
println!("{}", msg);
return Ok(None); return Ok(None);
} }
}; };
@@ -670,43 +325,32 @@ pub async fn download_isin_lei_csv() -> anyhow::Result<Option<String>> {
let mut csv_file = match archive.by_index(idx) { let mut csv_file = match archive.by_index(idx) {
Ok(f) => f, Ok(f) => f,
Err(e) => { Err(e) => {
let msg = format!("Failed to read CSV entry: {}", e); logger::log_error(&format!("Failed to read CSV: {}", e)).await;
logger::log_error(&msg).await;
println!("{}", msg);
return Ok(None); return Ok(None);
} }
}; };
let mut csv_bytes = Vec::new(); let mut csv_bytes = Vec::new();
if let Err(e) = csv_file.read_to_end(&mut csv_bytes) { if let Err(e) = csv_file.read_to_end(&mut csv_bytes) {
let msg = format!("Failed to extract CSV: {}", e); logger::log_error(&format!("Failed to extract: {}", e)).await;
logger::log_error(&msg).await;
return Ok(None); return Ok(None);
} }
if let Err(e) = tokio::fs::write(&csv_path, &csv_bytes).await { if let Err(e) = tokio::fs::write(&csv_path, &csv_bytes).await {
let msg = format!("Failed to save CSV file: {}", e); logger::log_error(&format!("Failed to save CSV: {}", e)).await;
logger::log_error(&msg).await;
return Ok(None); return Ok(None);
} }
let msg = format!(" ISIN/LEI CSV extracted: {:?}", csv_path); logger::log_info(&format!("✓ CSV extracted: {:?}", csv_path)).await;
logger::log_info(&msg).await;
Ok(Some(csv_path.to_string_lossy().to_string())) Ok(Some(csv_path.to_string_lossy().to_string()))
} }
/// Parse GLEIF filename and convert timestamp to DDMMYYYY format
/// Example: "isin-lei-20251124T080254.csv" -> "isin-lei-24112025.csv"
fn parse_gleif_filename(filename: &str) -> String { fn parse_gleif_filename(filename: &str) -> String {
// Try to find pattern: isin-lei-YYYYMMDDTHHMMSS.zip/csv
if let Some(start_idx) = filename.find("isin-lei-") { if let Some(start_idx) = filename.find("isin-lei-") {
let rest = &filename[start_idx + 9..]; // After "isin-lei-" let rest = &filename[start_idx + 9..];
// Extract the 8 digits (YYYYMMDD)
if rest.len() >= 8 && rest[0..8].chars().all(|c| c.is_numeric()) { if rest.len() >= 8 && rest[0..8].chars().all(|c| c.is_numeric()) {
let date_part = &rest[0..8]; let date_part = &rest[0..8];
// date_part is YYYYMMDD, convert to DDMMYYYY
if date_part.len() == 8 { if date_part.len() == 8 {
let year = &date_part[0..4]; let year = &date_part[0..4];
let month = &date_part[4..6]; let month = &date_part[4..6];
@@ -717,11 +361,9 @@ fn parse_gleif_filename(filename: &str) -> String {
} }
} }
// Fallback: return original filename if parsing fails
filename.to_string() filename.to_string()
} }
pub async fn load_isin_lei_csv() -> anyhow::Result<HashMap<String, Vec<String>>> { pub async fn load_isin_lei_csv() -> anyhow::Result<HashMap<String, Vec<String>>> {
// 1. Download + extract the CSV (this is now async) // 1. Download + extract the CSV (this is now async)
let csv_path = match download_isin_lei_csv().await? { let csv_path = match download_isin_lei_csv().await? {
@@ -770,29 +412,3 @@ pub async fn load_isin_lei_csv() -> anyhow::Result<HashMap<String, Vec<String>>>
Ok(map) Ok(map)
} }
pub async fn reject_yahoo_cookies(client: &Client) -> anyhow::Result<()> {
for _ in 0..10 {
let clicked: bool = client
.execute(
r#"(() => {
const btn = document.querySelector('#consent-page .reject-all');
if (btn) {
btn.click();
return true;
}
return false;
})()"#,
vec![],
)
.await?
.as_bool()
.unwrap_or(false);
if clicked { break; }
sleep(TokioDuration::from_millis(500)).await;
}
println!("Rejected Yahoo cookies if button existed");
Ok(())
}

View File

@@ -6,49 +6,12 @@ use crate::util::logger;
use tokio::fs; use tokio::fs;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use chrono::{Datelike, NaiveDate}; use chrono::{Datelike, NaiveDate};
use std::collections::{HashMap}; use std::collections::HashMap;
use std::path::{PathBuf, Path}; use std::path::{PathBuf, Path};
const BATCH_SIZE: usize = 500; // Process 500 events at a time const BATCH_SIZE: usize = 500;
/// Load events in streaming fashion to avoid memory buildup /// Lightweight index entry - only metadata, no full event data
pub async fn load_existing_events_streaming(
paths: &DataPaths,
callback: impl Fn(CompanyEvent) -> anyhow::Result<()>
) -> anyhow::Result<usize> {
let dir = paths.corporate_events_dir();
if !dir.exists() {
logger::log_info("Corporate Storage: No existing events directory found").await;
return Ok(0);
}
let mut total = 0;
let mut entries = fs::read_dir(dir).await?;
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) == Some("json") {
let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
if name.starts_with("events_") && name.len() == 17 {
let content = fs::read_to_string(&path).await?;
let events: Vec<CompanyEvent> = serde_json::from_str(&content)?;
for event in events {
callback(event)?;
total += 1;
}
// Yield to prevent blocking
tokio::task::yield_now().await;
}
}
}
logger::log_info(&format!("Corporate Storage: Streamed {} events", total)).await;
Ok(total)
}
/// Build lightweight index of events instead of loading everything
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct EventIndex { pub struct EventIndex {
pub key: String, pub key: String,
@@ -57,9 +20,11 @@ pub struct EventIndex {
pub file_path: PathBuf, pub file_path: PathBuf,
} }
/// Build index of all events without loading them into memory
pub async fn build_event_index(paths: &DataPaths) -> anyhow::Result<Vec<EventIndex>> { pub async fn build_event_index(paths: &DataPaths) -> anyhow::Result<Vec<EventIndex>> {
let dir = paths.corporate_events_dir(); let dir = paths.corporate_events_dir();
if !dir.exists() { if !dir.exists() {
logger::log_info("Corporate Storage: No events directory found").await;
return Ok(Vec::new()); return Ok(Vec::new());
} }
@@ -90,7 +55,7 @@ pub async fn build_event_index(paths: &DataPaths) -> anyhow::Result<Vec<EventInd
Ok(index) Ok(index)
} }
/// Lookup specific event by loading only its file /// Load specific event by key (only loads its file)
pub async fn lookup_event_by_key( pub async fn lookup_event_by_key(
key: &str, key: &str,
index: &[EventIndex] index: &[EventIndex]
@@ -106,9 +71,48 @@ pub async fn lookup_event_by_key(
} }
} }
/// Stream events file by file with callback
pub async fn stream_events_with_callback<F>(
paths: &DataPaths,
mut callback: F
) -> anyhow::Result<usize>
where
F: FnMut(CompanyEvent) -> anyhow::Result<()>,
{
let dir = paths.corporate_events_dir();
if !dir.exists() {
return Ok(0);
}
let mut total = 0;
let mut entries = fs::read_dir(dir).await?;
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) == Some("json") {
let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
if name.starts_with("events_") {
let content = fs::read_to_string(&path).await?;
let events: Vec<CompanyEvent> = serde_json::from_str(&content)?;
for event in events {
callback(event)?;
total += 1;
}
tokio::task::yield_now().await;
}
}
}
logger::log_info(&format!("Corporate Storage: Streamed {} events", total)).await;
Ok(total)
}
/// Save events organized by month (accepts Vec, not HashMap)
pub async fn save_optimized_events( pub async fn save_optimized_events(
paths: &DataPaths, paths: &DataPaths,
events: Vec<CompanyEvent> // Changed from HashMap to Vec events: Vec<CompanyEvent>
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let dir = paths.corporate_events_dir(); let dir = paths.corporate_events_dir();
fs::create_dir_all(dir).await?; fs::create_dir_all(dir).await?;
@@ -124,16 +128,14 @@ pub async fn save_optimized_events(
removed_count += 1; removed_count += 1;
} }
} }
logger::log_info(&format!("Corporate Storage: Removed {} old event files", removed_count)).await; logger::log_info(&format!("Corporate Storage: Removed {} old files", removed_count)).await;
let total_events = events.len(); let total_events = events.len();
let mut sorted = events; let mut sorted = events;
sorted.sort_by(|a, b| { sorted.sort_by(|a, b| {
a.ticker.cmp(&b.ticker) a.ticker.cmp(&b.ticker).then(a.date.cmp(&b.date))
.then(a.date.cmp(&b.date))
}); });
// Process in batches to avoid memory buildup
let mut by_month: HashMap<String, Vec<CompanyEvent>> = HashMap::new(); let mut by_month: HashMap<String, Vec<CompanyEvent>> = HashMap::new();
for chunk in sorted.chunks(BATCH_SIZE) { for chunk in sorted.chunks(BATCH_SIZE) {
@@ -146,27 +148,28 @@ pub async fn save_optimized_events(
tokio::task::yield_now().await; tokio::task::yield_now().await;
} }
let total_months = by_month.len();
for (month, list) in by_month { for (month, list) in by_month {
let path = dir.join(format!("events_{}.json", month)); let path = dir.join(format!("events_{}.json", month));
fs::write(&path, serde_json::to_string_pretty(&list)?).await?; fs::write(&path, serde_json::to_string_pretty(&list)?).await?;
logger::log_info(&format!("Corporate Storage: Saved {} events for month {}", list.len(), month)).await; logger::log_info(&format!("Saved {} events for month {}", list.len(), month)).await;
} }
logger::log_info(&format!("Corporate Storage: Saved {} total events in {} month files", total_events, total_months)).await; logger::log_info(&format!("Saved {} total events", total_events)).await;
Ok(()) Ok(())
} }
pub async fn save_changes(paths: &DataPaths, changes: &[CompanyEventChange]) -> anyhow::Result<()> { pub async fn save_changes(
paths: &DataPaths,
changes: &[CompanyEventChange]
) -> anyhow::Result<()> {
if changes.is_empty() { if changes.is_empty() {
logger::log_info("Corporate Storage: No changes to save").await; logger::log_info("Corporate Storage: No changes to save").await;
return Ok(()); return Ok(());
} }
let dir = paths.corporate_changes_dir(); let dir = paths.corporate_changes_dir();
fs::create_dir_all(dir).await?; fs::create_dir_all(dir).await?;
logger::log_info(&format!("Corporate Storage: Saving {} changes", changes.len())).await;
let mut by_month: HashMap<String, Vec<CompanyEventChange>> = HashMap::new(); let mut by_month: HashMap<String, Vec<CompanyEventChange>> = HashMap::new();
for c in changes { for c in changes {
if let Ok(d) = NaiveDate::parse_from_str(&c.date, "%Y-%m-%d") { if let Ok(d) = NaiveDate::parse_from_str(&c.date, "%Y-%m-%d") {
@@ -180,12 +183,13 @@ pub async fn save_changes(paths: &DataPaths, changes: &[CompanyEventChange]) ->
let mut all = if path.exists() { let mut all = if path.exists() {
let s = fs::read_to_string(&path).await?; let s = fs::read_to_string(&path).await?;
serde_json::from_str(&s).unwrap_or_default() serde_json::from_str(&s).unwrap_or_default()
} else { vec![] }; } else {
vec![]
};
all.extend(list.clone()); all.extend(list.clone());
fs::write(&path, serde_json::to_string_pretty(&all)?).await?; fs::write(&path, serde_json::to_string_pretty(&all)?).await?;
logger::log_info(&format!("Corporate Storage: Saved {} changes for month {}", list.len(), month)).await;
} }
logger::log_info("Corporate Storage: All changes saved successfully").await;
Ok(()) Ok(())
} }
@@ -203,9 +207,7 @@ pub async fn save_prices_for_ticker(
let path = timeframe_dir.join("prices.json"); let path = timeframe_dir.join("prices.json");
prices.sort_by_key(|p| (p.date.clone(), p.time.clone())); prices.sort_by_key(|p| (p.date.clone(), p.time.clone()));
fs::write(&path, serde_json::to_string_pretty(&prices)?).await?;
let json = serde_json::to_string_pretty(&prices)?;
fs::write(&path, json).await?;
Ok(()) Ok(())
} }
@@ -240,7 +242,10 @@ pub async fn save_available_exchanges(
Ok(()) Ok(())
} }
pub async fn load_available_exchanges(paths: &DataPaths, lei: &str) -> anyhow::Result<Vec<AvailableExchange>> { pub async fn load_available_exchanges(
paths: &DataPaths,
lei: &str
) -> anyhow::Result<Vec<AvailableExchange>> {
let path = get_company_dir(paths, lei).join("available_exchanges.json"); let path = get_company_dir(paths, lei).join("available_exchanges.json");
if path.exists() { if path.exists() {
let content = fs::read_to_string(&path).await?; let content = fs::read_to_string(&path).await?;
@@ -267,15 +272,13 @@ pub async fn save_prices_by_source(
Ok(()) Ok(())
} }
/// Saves companies data to a JSONL file in streaming fashion /// Stream companies to JSONL incrementally
pub async fn save_companies_to_jsonl_streaming( pub async fn save_companies_to_jsonl_streaming(
paths: &DataPaths, paths: &DataPaths,
companies: &HashMap<String, HashMap<String, String>>, companies_iter: impl Iterator<Item = (String, HashMap<String, String>)>,
) -> anyhow::Result<()> { ) -> anyhow::Result<usize> {
let file_path = paths.data_dir().join("companies.jsonl"); let file_path = paths.data_dir().join("companies.jsonl");
logger::log_info(&format!("Corporate Storage: Saving {} companies to JSONL", companies.len())).await;
if let Some(parent) = file_path.parent() { if let Some(parent) = file_path.parent() {
tokio::fs::create_dir_all(parent).await?; tokio::fs::create_dir_all(parent).await?;
} }
@@ -283,32 +286,33 @@ pub async fn save_companies_to_jsonl_streaming(
let mut file = tokio::fs::File::create(&file_path).await?; let mut file = tokio::fs::File::create(&file_path).await?;
let mut count = 0; let mut count = 0;
// Process in batches for (name, securities) in companies_iter {
for (name, securities) in companies.iter() {
let line = serde_json::json!({ let line = serde_json::json!({
"name": name, "name": name,
"securities": securities "securities": securities
}); });
file.write_all(line.to_string().as_bytes()).await?; file.write_all(line.to_string().as_bytes()).await?;
file.write_all(b"\n").await?; file.write_all(b"\n").await?;
count += 1; count += 1;
if count % 100 == 0 { if count % 100 == 0 {
tokio::task::yield_now().await; tokio::task::yield_now().await;
} }
} }
let msg = format!("Saved {} companies to {:?}", companies.len(), file_path); logger::log_info(&format!("Saved {} companies to JSONL", count)).await;
println!("{}", msg); Ok(count)
logger::log_info(&msg).await;
Ok(())
} }
/// Load companies from JSONL in streaming fashion /// Stream read companies from JSONL
pub async fn load_companies_from_jsonl_streaming( pub async fn stream_companies_from_jsonl<F>(
path: &Path, path: &Path,
callback: impl Fn(String, HashMap<String, String>) -> anyhow::Result<()> mut callback: F
) -> anyhow::Result<usize> { ) -> anyhow::Result<usize>
where
F: FnMut(String, HashMap<String, String>) -> anyhow::Result<()>,
{
if !path.exists() { if !path.exists() {
return Ok(0); return Ok(0);
} }

View File

@@ -79,15 +79,20 @@ pub struct CompanyInfo{
     pub securities: HashMap<String, Vec<FigiInfo>>, // ISIN -> Vec<FigiInfo>
 }
 
-/// Company Meta Data
-/// # Attributes
-/// * lei: Structuring the companies by legal dependencies [LEI -> Vec<ISIN>]
-/// * figi: metadata with ISIN as key
-/*#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct CompanyMetadata {
-    pub lei: String,
-    pub figi: Option<Vec<FigiInfo>>,
-}*/
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct YahooCompanyDetails {
+    pub ticker: String,
+    pub sector: Option<String>,
+    pub exchange: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CompanyCrossPlatformInfo {
+    pub name: String,
+    pub isin_tickers_map: HashMap<String, Vec<String>>, // ISIN -> Tickers
+    pub sector: Option<String>,
+    pub exchange: Option<String>,
+}
 
 /// Warrant Info
 ///
@@ -118,14 +123,6 @@ pub struct OptionInfo {
     pub options: HashMap<String, Vec<FigiInfo>>, // ISIN -> Vec<FigiInfo> (grouped by ISIN)
 }
 
-/*#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct PrimaryInfo {
-    pub isin: String,
-    pub name: String,
-    pub exchange_mic: String,
-    pub currency: String,
-}*/
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct AvailableExchange {
     pub exchange_mic: String,

View File

@@ -1,170 +1,416 @@
// src/corporate/update.rs // src/corporate/update.rs - ABORT-SAFE VERSION WITH JSONL LOG
use super::{scraper::*, storage::*, helpers::*, types::*, aggregation::*, openfigi::*};
use super::{scraper::*, storage::*, helpers::*, types::*, openfigi::*, yahoo::*};
use crate::config::Config; use crate::config::Config;
use crate::util::directories::DataPaths; use crate::util::directories::DataPaths;
use crate::util::logger; use crate::util::logger;
use crate::scraper::webdriver::ChromeDriverPool; use crate::scraper::webdriver::ChromeDriverPool;
use chrono::Local; use chrono::Local;
use std::collections::{HashMap}; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
/// Main function: Full update for all companies with streaming to minimize memory usage pub async fn run_full_update(
pub async fn run_full_update(config: &Config, pool: &Arc<ChromeDriverPool>) -> anyhow::Result<()> { _config: &Config,
let msg = "=== Starting LEI-based corporate full update (STREAMING) ==="; pool: &Arc<ChromeDriverPool>,
println!("{}", msg); shutdown_flag: &Arc<AtomicBool>,
logger::log_info(msg).await; ) -> anyhow::Result<()> {
logger::log_info("=== Corporate Update (STREAMING MODE) ===").await;
let paths = DataPaths::new(".")?; let paths = DataPaths::new(".")?;
// Step 1: Download/locate GLEIF CSV (don't load into memory yet) logger::log_info("Step 1: Downloading GLEIF CSV...").await;
logger::log_info("Corporate Update: Downloading/locating GLEIF CSV...").await;
let gleif_csv_path = match download_isin_lei_csv().await? { let gleif_csv_path = match download_isin_lei_csv().await? {
Some(p) => { Some(p) => {
logger::log_info(&format!("Corporate Update: GLEIF CSV at: {}", p)).await; logger::log_info(&format!(" GLEIF CSV at: {}", p)).await;
p p
} }
None => { None => {
logger::log_warn("Corporate Update: Could not obtain GLEIF CSV, continuing with limited data").await; logger::log_warn(" Could not obtain GLEIF CSV").await;
return Ok(()); return Ok(());
} }
}; };
// Step 2: Load OpenFIGI type lists (small, cached) if shutdown_flag.load(Ordering::SeqCst) {
logger::log_info("Corporate Update: Loading OpenFIGI type lists...").await; return Ok(());
if let Err(e) = load_figi_type_lists().await {
logger::log_warn(&format!("Could not load OpenFIGI type lists: {}", e)).await;
} }
// Step 3: Process GLEIF → FIGI mapping in streaming fashion logger::log_info("Step 2: Loading OpenFIGI metadata...").await;
logger::log_info("Corporate Update: Building FIGI mappings (streaming)...").await; load_figi_type_lists().await.ok();
logger::log_info(" ✓ OpenFIGI metadata loaded").await;
// Build LEI→ISINs map by streaming the CSV if shutdown_flag.load(Ordering::SeqCst) {
let mut lei_to_isins: HashMap<String, Vec<String>> = HashMap::new(); return Ok(());
let mut lei_batch = Vec::new(); }
const LEI_BATCH_SIZE: usize = 1000;
stream_gleif_csv(&gleif_csv_path, |lei, isin| { logger::log_info("Step 3: Checking LEI-FIGI mapping status...").await;
lei_to_isins.entry(lei.clone()).or_default().push(isin); let all_mapped = ensure_all_leis_mapped(&gleif_csv_path, None).await?;
lei_batch.push(lei);
// Process in batches if !all_mapped {
if lei_batch.len() >= LEI_BATCH_SIZE { logger::log_warn(" ⚠ Some LEIs failed to map - continuing with partial data").await;
lei_batch.clear();
}
Ok(())
}).await?;
logger::log_info(&format!("Corporate Update: Collected {} LEIs", lei_to_isins.len())).await;
// Step 4: Build FIGI mappings in batches (process and save incrementally)
logger::log_info("Corporate Update: Processing FIGI mappings in batches...").await;
let figi_result = build_lei_to_figi_infos(&lei_to_isins, None).await;
// Don't keep the full result in memory - it's already saved to JSONL files
drop(figi_result);
drop(lei_to_isins); // Release this too
logger::log_info("Corporate Update: FIGI mappings saved to cache").await;
// Step 5: Load or build securities (streaming from JSONL files)
logger::log_info("Corporate Update: Building securities map (streaming)...").await;
let dir = DataPaths::new(".")?;
let map_cache_dir = dir.cache_gleif_openfigi_map_dir();
// Find the most recent date directory
let date_dir = find_most_recent_date_dir(&map_cache_dir).await?;
let (common_stocks, _warrants, _options) = if let Some(date_dir) = date_dir {
logger::log_info(&format!("Using FIGI data from: {:?}", date_dir)).await;
load_or_build_all_securities_streaming(&date_dir).await?
} else { } else {
logger::log_warn("No FIGI date directory found, using empty maps").await; logger::log_info(" ✓ All LEIs successfully mapped").await;
(HashMap::new(), HashMap::new(), HashMap::new()) }
};
logger::log_info(&format!("Corporate Update: Processing {} companies", common_stocks.len())).await; if shutdown_flag.load(Ordering::SeqCst) {
return Ok(());
}
// Step 6: Convert to simplified companies map and save incrementally logger::log_info("Step 4: Building securities map (streaming)...").await;
logger::log_info("Corporate Update: Building companies JSONL (streaming)...").await; let date_dir = find_most_recent_figi_date_dir(&paths).await?;
if let Some(date_dir) = date_dir {
logger::log_info(&format!(" Using FIGI data from: {:?}", date_dir)).await;
build_securities_from_figi_streaming(&date_dir).await?;
logger::log_info(" ✓ Securities map updated").await;
} else {
logger::log_warn(" ✗ No FIGI data directory found").await;
}
if shutdown_flag.load(Ordering::SeqCst) {
return Ok(());
}
logger::log_info("Step 5: Building companies.jsonl (streaming with abort-safe persistence)...").await;
let count = build_companies_jsonl_streaming(&paths, pool, shutdown_flag).await?;
logger::log_info(&format!(" ✓ Saved {} companies", count)).await;
if !shutdown_flag.load(Ordering::SeqCst) {
logger::log_info("Step 6: Processing events (using index)...").await;
let _event_index = build_event_index(&paths).await?;
logger::log_info(" ✓ Event index built").await;
}
logger::log_info("✓ Corporate update complete").await;
Ok(())
}
/// Abort-safe incremental JSONL persistence with atomic checkpoints
///
/// Implements the data_updating_rule.md specification:
/// - Append-only JSONL log for all updates
/// - fsync after each write batch
/// - Atomic checkpoints via temp file + rename
/// - Crash recovery by loading checkpoint + replaying log
/// - Partial lines ignored during recovery
async fn build_companies_jsonl_streaming(
paths: &DataPaths,
pool: &Arc<ChromeDriverPool>,
shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<usize> {
let path = DataPaths::new(".")?;
let corporate_path = path.data_dir().join("corporate").join("by_name");
let securities_path = corporate_path.join("common_stocks.json");
if !securities_path.exists() {
logger::log_warn("No common_stocks.json found").await;
return Ok(0);
}
let content = tokio::fs::read_to_string(securities_path).await?;
let securities: HashMap<String, CompanyInfo> = serde_json::from_str(&content)?;
let companies_path = paths.data_dir().join("companies.jsonl"); let companies_path = paths.data_dir().join("companies.jsonl");
let log_path = paths.data_dir().join("companies_updates.log");
// Create file and write incrementally
if let Some(parent) = companies_path.parent() { if let Some(parent) = companies_path.parent() {
tokio::fs::create_dir_all(parent).await?; tokio::fs::create_dir_all(parent).await?;
} }
let mut file = tokio::fs::File::create(&companies_path).await?; // === RECOVERY PHASE 1: Load last checkpoint ===
let mut processed = 0; let mut existing_companies: HashMap<String, CompanyCrossPlatformInfo> = HashMap::new();
let mut processed_names: std::collections::HashSet<String> = std::collections::HashSet::new();
for (name, company_info) in common_stocks.iter() { if companies_path.exists() {
let mut isin_ticker_pairs: HashMap<String, String> = HashMap::new(); logger::log_info("Loading checkpoint from companies.jsonl...").await;
let existing_content = tokio::fs::read_to_string(&companies_path).await?;
for line in existing_content.lines() {
if line.trim().is_empty() {
continue;
}
// Only process complete lines (ending with proper JSON closing brace)
// This ensures we don't process partial writes from crashed processes
if !line.ends_with('}') {
logger::log_warn(&format!("Skipping incomplete checkpoint line: {}", &line[..line.len().min(50)])).await;
continue;
}
match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
Ok(company) => {
processed_names.insert(company.name.clone());
existing_companies.insert(company.name.clone(), company);
}
Err(e) => {
logger::log_warn(&format!("Failed to parse checkpoint line: {}", e)).await;
}
}
}
logger::log_info(&format!("Loaded checkpoint with {} companies", existing_companies.len())).await;
}
// === RECOVERY PHASE 2: Replay log after checkpoint ===
if log_path.exists() {
logger::log_info("Replaying update log...").await;
let log_content = tokio::fs::read_to_string(&log_path).await?;
let mut replayed = 0;
for line in log_content.lines() {
if line.trim().is_empty() {
continue;
}
// Only replay complete lines (crash-safe: incomplete lines are ignored)
// A line is considered complete only if it ends with '\n' and valid JSON
if !line.ends_with('}') {
logger::log_warn(&format!("Skipping incomplete log line: {}", &line[..line.len().min(50)])).await;
continue;
}
match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
Ok(company) => {
processed_names.insert(company.name.clone());
existing_companies.insert(company.name.clone(), company);
replayed += 1;
}
Err(e) => {
logger::log_warn(&format!("Failed to parse log line: {}", e)).await;
}
}
}
if replayed > 0 {
logger::log_info(&format!("Replayed {} updates from log", replayed)).await;
}
}
// === APPEND-ONLY LOG: Open in append mode with O_APPEND semantics ===
use tokio::fs::OpenOptions;
let mut log_file = OpenOptions::new()
.create(true)
.append(true) // O_APPEND - atomic append operations
.open(&log_path)
.await?;
let mut count = existing_companies.len();
let mut updated_count = 0;
let mut new_count = 0;
let checkpoint_interval = 50; // Create atomic checkpoint every 50 updates
let mut updates_since_checkpoint = 0;
use tokio::io::AsyncWriteExt;
for (name, company_info) in securities.iter() {
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_info("Shutdown requested - stopping company processing").await;
break;
}
// Skip if already processed (from checkpoint or log replay)
if processed_names.contains(name) {
continue;
}
let existing_entry = existing_companies.get(name).cloned();
let is_update = existing_entry.is_some();
let mut isin_tickers_map: HashMap<String, Vec<String>> =
existing_entry
.as_ref()
.map(|e| e.isin_tickers_map.clone())
.unwrap_or_default();
let mut sector = existing_entry.as_ref().and_then(|e| e.sector.clone());
let mut exchange = existing_entry.as_ref().and_then(|e| e.exchange.clone());
let mut unique_isin_ticker_pairs: HashMap<String, Vec<String>> = HashMap::new();
for figi_infos in company_info.securities.values() { for figi_infos in company_info.securities.values() {
for figi_info in figi_infos { for figi_info in figi_infos {
if !figi_info.isin.is_empty() && !figi_info.ticker.is_empty() { if !figi_info.isin.is_empty() {
isin_ticker_pairs.insert(figi_info.isin.clone(), figi_info.ticker.clone()); let tickers = unique_isin_ticker_pairs
.entry(figi_info.isin.clone())
.or_insert_with(Vec::new);
if !figi_info.ticker.is_empty() && !tickers.contains(&figi_info.ticker) {
tickers.push(figi_info.ticker.clone());
}
} }
} }
} }
if !isin_ticker_pairs.is_empty() { for (isin, figi_tickers) in unique_isin_ticker_pairs {
use tokio::io::AsyncWriteExt; if shutdown_flag.load(Ordering::SeqCst) {
break;
}
let line = serde_json::json!({ let tickers = isin_tickers_map
"name": name, .entry(isin.clone())
"securities": isin_ticker_pairs .or_insert_with(Vec::new);
});
file.write_all(line.to_string().as_bytes()).await?; for figi_ticker in figi_tickers {
file.write_all(b"\n").await?; if !tickers.contains(&figi_ticker) {
processed += 1; tickers.push(figi_ticker);
}
}
// Yield periodically let has_yahoo_ticker = tickers.iter().any(|t| t.starts_with("YAHOO:"));
if processed % 100 == 0 {
if !has_yahoo_ticker && !shutdown_flag.load(Ordering::SeqCst) {
logger::log_info(&format!("Fetching Yahoo details for {} (ISIN: {})", name, isin)).await;
match scrape_company_details_by_isin(pool, &isin).await {
Ok(Some(details)) => {
logger::log_info(&format!("✓ Found Yahoo ticker {} for ISIN {}", details.ticker, isin)).await;
tickers.push(format!("YAHOO:{}", details.ticker));
if sector.is_none() && details.sector.is_some() {
sector = details.sector.clone();
logger::log_info(&format!(" Sector: {}", details.sector.as_ref().unwrap())).await;
}
if exchange.is_none() && details.exchange.is_some() {
exchange = details.exchange.clone();
logger::log_info(&format!(" Exchange: {}", details.exchange.as_ref().unwrap())).await;
}
},
Ok(None) => {
logger::log_warn(&format!("◯ No search results for ISIN {}", isin)).await;
tickers.push("YAHOO:NO_RESULTS".to_string());
},
Err(e) => {
if shutdown_flag.load(Ordering::SeqCst) {
break;
}
logger::log_warn(&format!("✗ Yahoo lookup error for ISIN {}: {}", isin, e)).await;
}
}
}
}
if shutdown_flag.load(Ordering::SeqCst) {
break;
}
if !isin_tickers_map.is_empty() {
let company_entry = CompanyCrossPlatformInfo {
name: name.clone(),
isin_tickers_map,
sector,
exchange,
};
// === APPEND-ONLY: Write single-line JSON with fsync ===
// This guarantees the line is either fully written or not at all
let line = serde_json::to_string(&company_entry)?;
log_file.write_all(line.as_bytes()).await?;
log_file.write_all(b"\n").await?;
log_file.flush().await?;
// Critical: fsync to ensure durability before considering write successful
// This prevents data loss on power failure or kernel panic
log_file.sync_data().await?;
// Update in-memory state ONLY after successful fsync
processed_names.insert(name.clone());
existing_companies.insert(name.clone(), company_entry);
count += 1;
updates_since_checkpoint += 1;
if is_update {
updated_count += 1;
} else {
new_count += 1;
}
// === ATOMIC CHECKPOINT: Periodically create checkpoint ===
// This reduces recovery time by snapshotting current state
if updates_since_checkpoint >= checkpoint_interval {
logger::log_info(&format!("Creating checkpoint at {} companies...", count)).await;
let checkpoint_tmp = companies_path.with_extension("jsonl.tmp");
let mut checkpoint_file = tokio::fs::File::create(&checkpoint_tmp).await?;
// Write all current state to temporary checkpoint file
for company in existing_companies.values() {
let line = serde_json::to_string(company)?;
checkpoint_file.write_all(line.as_bytes()).await?;
checkpoint_file.write_all(b"\n").await?;
}
checkpoint_file.flush().await?;
checkpoint_file.sync_all().await?;
drop(checkpoint_file);
// Atomic rename - this is the commit point
// After this succeeds, the checkpoint is visible
tokio::fs::rename(&checkpoint_tmp, &companies_path).await?;
// Clear log after successful checkpoint
// Any entries before this point are now captured in the checkpoint
drop(log_file);
tokio::fs::remove_file(&log_path).await.ok();
log_file = OpenOptions::new()
.create(true)
.append(true)
.open(&log_path)
.await?;
updates_since_checkpoint = 0;
logger::log_info("✓ Checkpoint created and log cleared").await;
}
if count % 10 == 0 {
logger::log_info(&format!("Progress: {} companies ({} new, {} updated)", count, new_count, updated_count)).await;
tokio::task::yield_now().await; tokio::task::yield_now().await;
logger::log_info(&format!("Saved {} companies so far...", processed)).await;
} }
} }
} }
logger::log_info(&format!("Corporate Update: Saved {} companies to JSONL", processed)).await; // === FINAL CHECKPOINT: Write complete final state ===
// This ensures we don't need to replay the log on next startup
if !shutdown_flag.load(Ordering::SeqCst) && updates_since_checkpoint > 0 {
logger::log_info("Creating final checkpoint...").await;
// Step 7: Process events in streaming fashion let checkpoint_tmp = companies_path.with_extension("jsonl.tmp");
logger::log_info("Corporate Update: Processing events (streaming)...").await; let mut checkpoint_file = tokio::fs::File::create(&checkpoint_tmp).await?;
let event_index = build_event_index(&paths).await?; for company in existing_companies.values() {
logger::log_info(&format!("Corporate Update: Built index of {} events", event_index.len())).await; let line = serde_json::to_string(company)?;
checkpoint_file.write_all(line.as_bytes()).await?;
checkpoint_file.write_all(b"\n").await?;
}
// For now, we just maintain the index checkpoint_file.flush().await?;
// In a full implementation, you'd stream through tickers and update events checkpoint_file.sync_all().await?;
drop(checkpoint_file);
// Step 8: Save any updates // Atomic rename makes final checkpoint visible
logger::log_info("Corporate Update: Finalizing...").await; tokio::fs::rename(&checkpoint_tmp, &companies_path).await?;
let msg = "✓ Corporate update complete (streaming)"; // Clean up log
println!("{}", msg); drop(log_file);
logger::log_info(msg).await; tokio::fs::remove_file(&log_path).await.ok();
Ok(())
logger::log_info("✓ Final checkpoint created").await;
}
logger::log_info(&format!("Completed: {} total companies ({} new, {} updated)", count, new_count, updated_count)).await;
Ok(count)
} }
/// Helper to find the most recent date directory in the FIGI cache async fn find_most_recent_figi_date_dir(paths: &DataPaths) -> anyhow::Result<Option<std::path::PathBuf>> {
async fn find_most_recent_date_dir(map_cache_dir: &std::path::Path) -> anyhow::Result<Option<std::path::PathBuf>> { let map_cache_dir = paths.cache_gleif_openfigi_map_dir();
if !map_cache_dir.exists() { if !map_cache_dir.exists() {
return Ok(None); return Ok(None);
} }
let mut entries = tokio::fs::read_dir(map_cache_dir).await?; let mut entries = tokio::fs::read_dir(&map_cache_dir).await?;
let mut dates = Vec::new(); let mut dates = Vec::new();
while let Some(entry) = entries.next_entry().await? { while let Some(entry) = entries.next_entry().await? {
let path = entry.path(); let path = entry.path();
if path.is_dir() { if path.is_dir() {
if let Some(name) = path.file_name().and_then(|n| n.to_str()) { if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
// Date format: DDMMYYYY
if name.len() == 8 && name.chars().all(|c| c.is_numeric()) { if name.len() == 8 && name.chars().all(|c| c.is_numeric()) {
dates.push((name.to_string(), path)); dates.push((name.to_string(), path));
} }
@@ -176,9 +422,7 @@ async fn find_most_recent_date_dir(map_cache_dir: &std::path::Path) -> anyhow::R
        return Ok(None);
    }

    // Sort by date (DDMMYYYY format), descending
    dates.sort_by(|a, b| b.0.cmp(&a.0));
    Ok(Some(dates[0].1.clone()))
}
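
// Editor's sketch (not part of this changeset): DDMMYYYY strings do not sort
// chronologically as plain strings (the day field compares first), so one alternative is
// to parse the names and compare real dates. Assumes the chrono crate is available in
// this workspace; the helper name is illustrative.
#[allow(dead_code)]
fn most_recent_ddmmyyyy<'a>(
    names: &'a [(String, std::path::PathBuf)],
) -> Option<&'a (String, std::path::PathBuf)> {
    names
        .iter()
        .filter_map(|entry| {
            chrono::NaiveDate::parse_from_str(&entry.0, "%d%m%Y")
                .ok()
                .map(|date| (date, entry))
        })
        .max_by_key(|(date, _)| *date)
        .map(|(_, entry)| entry)
}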
@@ -186,57 +430,6 @@ pub struct ProcessResult {
    pub changes: Vec<CompanyEventChange>,
}
/// Process events in batches to avoid memory buildup
pub async fn process_events_streaming(
index: &[EventIndex],
new_events: &[CompanyEvent],
today: &str,
) -> anyhow::Result<(Vec<CompanyEventChange>, Vec<CompanyEvent>)> {
let mut all_changes = Vec::new();
let mut final_events: HashMap<String, CompanyEvent> = HashMap::new();
// Step 1: Load existing events in batches using the index
logger::log_info("Loading existing events in batches...").await;
let mut loaded_files = std::collections::HashSet::new();
for entry in index {
if loaded_files.contains(&entry.file_path) {
continue;
}
let content = tokio::fs::read_to_string(&entry.file_path).await?;
let events: Vec<CompanyEvent> = serde_json::from_str(&content)?;
for e in events {
final_events.insert(event_key(&e), e);
}
loaded_files.insert(entry.file_path.clone());
if final_events.len() % 1000 == 0 {
logger::log_info(&format!("Loaded {} events so far...", final_events.len())).await;
tokio::task::yield_now().await;
}
}
logger::log_info(&format!("Loaded {} existing events", final_events.len())).await;
// Step 2: Process new events in batches
for (idx, batch) in new_events.chunks(500).enumerate() {
logger::log_info(&format!("Processing batch {} ({} events)", idx + 1, batch.len())).await;
let batch_result = process_batch(batch, &mut final_events, today);
all_changes.extend(batch_result.changes);
tokio::task::yield_now().await;
}
let events_vec: Vec<CompanyEvent> = final_events.into_values().collect();
Ok((all_changes, events_vec))
}
pub fn process_batch(
    new_events: &[CompanyEvent],
    existing: &mut HashMap<String, CompanyEvent>,
@@ -253,7 +446,6 @@ pub fn process_batch(
            continue;
        }

        // Check for time change on same date
        let date_key = format!("{}|{}", new.ticker, new.date);
        let mut found_old = None;
        for (k, e) in existing.iter() {

306
src/corporate/yahoo.rs Normal file
View File

@@ -0,0 +1,306 @@
// src/corporate/yahoo.rs
use super::{types::*, helpers::*};
use crate::{scraper::webdriver::*, util::{directories::DataPaths}};
use event_backtest_engine::logger;
use fantoccini::{Client, Locator};
use serde::{Deserialize, Serialize};
use tokio::{time::{Duration as TokioDuration, sleep}};
use std::{sync::Arc};
use anyhow::{anyhow, Result};
const YAHOO_COMPANY_EXTRACTION_JS: &str = include_str!("yahoo_company_extraction.js");
/// Maps existing companies to Yahoo Finance tickers,
/// and gets historical stock price data: daily (xxxx - 2025) and hourly (last 30 days).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum YahooTickerResult {
Found(String),
NotFound,
NoResults,
AmbiguousResults,
}
#[derive(Debug, Deserialize)]
pub struct ExtractionResult {
status: String,
ticker: Option<String>,
sector: Option<String>,
exchange: Option<String>,
#[serde(default)]
error_message: Option<String>,
}
impl YahooTickerResult {
pub fn to_tagged_string(&self) -> String {
match self {
YahooTickerResult::Found(ticker) => format!("YAHOO:{}", ticker),
YahooTickerResult::NotFound => "YAHOO:NOT_FOUND".to_string(),
YahooTickerResult::NoResults => "YAHOO:NO_RESULTS".to_string(),
YahooTickerResult::AmbiguousResults => "YAHOO:AMBIGUOUS".to_string(),
}
}
pub fn is_found(&self) -> bool {
matches!(self, YahooTickerResult::Found(_))
}
pub fn get_ticker(&self) -> Option<&str> {
match self {
YahooTickerResult::Found(ticker) => Some(ticker),
_ => None,
}
}
}
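
// Editor's sketch (not part of this changeset): the tagging round-trip this enum is meant
// for, using the "YAHOO:" prefix format produced by to_tagged_string above.
#[cfg(test)]
mod yahoo_ticker_result_tests {
    use super::*;

    #[test]
    fn tagged_string_round_trip() {
        let found = YahooTickerResult::Found("AAPL".to_string());
        assert_eq!(found.to_tagged_string(), "YAHOO:AAPL");
        assert_eq!(found.get_ticker(), Some("AAPL"));
        assert!(found.is_found());
        assert!(!YahooTickerResult::NotFound.is_found());
    }
}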
pub async fn scrape_company_details_by_isin(
pool: &Arc<ChromeDriverPool>,
isin: &str,
) -> anyhow::Result<Option<YahooCompanyDetails>> {
let isin = isin.to_string();
pool.execute(format!("https://finance.yahoo.com/lookup/?s={}", isin), move |client| {
let isin = isin.clone();
Box::pin(async move {
sleep(TokioDuration::from_millis(1000)).await;
reject_yahoo_cookies(&client).await?;
sleep(TokioDuration::from_millis(1000)).await;
extract_company_details(&client, &isin).await
})
}).await
}
pub async fn extract_company_details(
client: &Client,
_isin: &str,
) -> Result<Option<YahooCompanyDetails>> {
// Execute the JavaScript extraction script
let result = client.execute(YAHOO_COMPANY_EXTRACTION_JS, vec![]).await?;
// Parse the JSON result
let extraction: ExtractionResult = serde_json::from_value(result)
.map_err(|e| anyhow!("Failed to parse extraction result: {}", e))?;
match extraction.status.as_str() {
"found" => {
if let Some(ticker) = extraction.ticker {
Ok(Some(YahooCompanyDetails {
ticker,
sector: extraction.sector,
exchange: extraction.exchange,
}))
} else {
Ok(None)
}
},
"no_results" => Ok(None),
"not_found" => Ok(None),
"error" => {
let error_msg = extraction.error_message.unwrap_or_else(|| "Unknown error".to_string());
Err(anyhow!("JavaScript extraction error: {}", error_msg))
},
_ => Ok(None),
}
}
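
// Editor's sketch (not part of this changeset): the JSON contract shared with
// yahoo_company_extraction.js. The script returns { status, ticker, sector, exchange,
// error_message? }, which must deserialize into ExtractionResult; the sample values
// below ("Technology", "NMS") are illustrative only.
#[cfg(test)]
mod extraction_result_tests {
    use super::*;

    #[test]
    fn parses_found_result_from_script_json() {
        let raw = serde_json::json!({
            "status": "found",
            "ticker": "AAPL",
            "sector": "Technology",
            "exchange": "NMS"
        });
        let parsed: ExtractionResult = serde_json::from_value(raw).expect("valid contract");
        assert_eq!(parsed.status, "found");
        assert_eq!(parsed.ticker.as_deref(), Some("AAPL"));
    }
}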
pub async fn get_all_tickers_from_companies_jsonl(paths: &DataPaths) -> anyhow::Result<Vec<String>> {
let corporate_path = paths.data_dir().join("corporate").join("by_name");
let companies_file = corporate_path.join("companies.jsonl");
let content = tokio::fs::read_to_string(companies_file).await?;
let mut tickers = Vec::new();
for line in content.lines() {
let company: CompanyCrossPlatformInfo = serde_json::from_str(line)?;
for (_isin, ticker_vec) in company.isin_tickers_map {
tickers.extend(ticker_vec);
}
}
Ok(tickers)
}
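
// Editor's sketch (not part of this changeset): callers that want unique tickers can
// dedup the flattened list, since several ISINs can map to the same ticker.
#[allow(dead_code)]
async fn get_unique_tickers(paths: &DataPaths) -> anyhow::Result<Vec<String>> {
    let mut tickers = get_all_tickers_from_companies_jsonl(paths).await?;
    tickers.sort();
    tickers.dedup();
    Ok(tickers)
}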
/// Fetches earnings events for a ticker using a dedicated ScrapeTask.
///
/// This function creates and executes a ScrapeTask to navigate to the Yahoo Finance earnings calendar,
/// reject cookies, and extract the events.
///
/// # Arguments
/// * `ticker` - The stock ticker symbol.
///
/// # Returns
/// A vector of CompanyEvent structs on success.
///
/// # Errors
/// Returns an error if the task execution fails, e.g., chromedriver spawn or navigation issues.
pub async fn fetch_earnings_with_pool(
pool: &Arc<ChromeDriverPool>,
ticker: &str,
) -> anyhow::Result<Vec<CompanyEvent>> {
let ticker = ticker.to_string();
let url = format!("https://finance.yahoo.com/calendar/earnings?symbol={}", ticker);
let ticker_cloned = ticker.clone();
pool.execute(url, move |client| {
let ticker = ticker_cloned.clone();
Box::pin(async move {
reject_yahoo_cookies(&client).await?;
extract_earnings_events(&client, &ticker).await
})
}).await
}
/// Extracts earnings events from the currently loaded Yahoo Finance earnings calendar page.
///
/// This function assumes the client is already navigated to the correct URL (e.g.,
/// https://finance.yahoo.com/calendar/earnings?symbol={ticker}) and cookies are handled.
///
/// It waits for the earnings table, extracts rows, parses cells into CompanyEvent structs,
/// and handles date parsing, float parsing, and optional fields.
///
/// # Arguments
/// * `client` - The fantoccini Client with the page loaded.
/// * `ticker` - The stock ticker symbol for the events.
///
/// # Returns
/// A vector of CompanyEvent on success.
///
/// # Errors
/// Returns an error if:
/// - Table or elements not found.
/// - Date or float parsing fails.
/// - WebDriver operations fail.
///
/// # Examples
///
/// ```ignore
/// use fantoccini::Client;
/// use crate::corporate::yahoo::extract_earnings_events;
///
/// #[tokio::main]
/// async fn main() -> anyhow::Result<()> {
///     // Assume `client` is a connected fantoccini Client, already navigated to the page
///     let events = extract_earnings_events(&client, "AAPL").await?;
///     Ok(())
/// }
/// ```
pub async fn extract_earnings_events(client: &Client, ticker: &str) -> Result<Vec<CompanyEvent>> {
// Wait for the table to load
let table = client
.wait()
.for_element(Locator::Css(r#"table[data-test="cal-table"]"#))
.await
.map_err(|e| anyhow!("Failed to find earnings table: {}", e))?;
// Find all rows in tbody
let rows = table
.find_all(Locator::Css("tbody tr"))
.await
.map_err(|e| anyhow!("Failed to find table rows: {}", e))?;
let mut events = Vec::with_capacity(rows.len());
for row in rows {
let cells = row
.find_all(Locator::Css("td"))
.await
.map_err(|e| anyhow!("Failed to find cells in row: {}", e))?;
if cells.len() < 5 {
continue; // Skip incomplete rows
}
// Extract and parse date
let date_str = cells[0]
.text()
.await
.map_err(|e| anyhow!("Failed to get date text: {}", e))?;
let date = parse_yahoo_date(&date_str)
.map_err(|e| anyhow!("Failed to parse date '{}': {}", date_str, e))?
.format("%Y-%m-%d")
.to_string();
// Extract time, replace "Time Not Supplied" with empty
let time = cells[1]
.text()
.await
.map_err(|e| anyhow!("Failed to get time text: {}", e))?
.replace("Time Not Supplied", "");
// Extract period
let period = cells[2]
.text()
.await
.map_err(|e| anyhow!("Failed to get period text: {}", e))?;
// Parse EPS forecast
let eps_forecast_str = cells[3]
.text()
.await
.map_err(|e| anyhow!("Failed to get EPS forecast text: {}", e))?;
let eps_forecast = parse_float(&eps_forecast_str);
// Parse EPS actual
let eps_actual_str = cells[4]
.text()
.await
.map_err(|e| anyhow!("Failed to get EPS actual text: {}", e))?;
let eps_actual = parse_float(&eps_actual_str);
// Parse surprise % if available
let surprise_pct = if cells.len() > 5 {
let surprise_str = cells[5]
.text()
.await
.map_err(|e| anyhow!("Failed to get surprise text: {}", e))?;
parse_float(&surprise_str)
} else {
None
};
events.push(CompanyEvent {
ticker: ticker.to_string(),
date,
time,
period,
eps_forecast,
eps_actual,
revenue_forecast: None,
revenue_actual: None,
surprise_pct,
source: "Yahoo".to_string(),
});
}
if events.is_empty() {
logger::log_warn(&format!("Warning: No earnings events extracted for ticker {}", ticker)).await;
} else {
logger::log_info(&format!("Extracted {} earnings events for {}", events.len(), ticker)).await;
}
Ok(events)
}
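
// Editor's sketch (not part of this changeset): a simple post-processing step that keeps
// only events on or after a given ISO date. It relies on the "%Y-%m-%d" format produced
// above, where plain string comparison matches chronological order.
#[allow(dead_code)]
fn filter_events_from(events: Vec<CompanyEvent>, from_date_iso: &str) -> Vec<CompanyEvent> {
    events
        .into_iter()
        .filter(|e| e.date.as_str() >= from_date_iso)
        .collect()
}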
/// Rejects the Yahoo cookie-consent banner if it is present.
async fn reject_yahoo_cookies(client: &Client) -> anyhow::Result<()> {
for _ in 0..10 {
let clicked: bool = client
.execute(
r#"(() => {
const btn = document.querySelector('#consent-page .reject-all');
if (btn) {
btn.click();
return true;
}
return false;
})()"#,
vec![],
)
.await?
.as_bool()
.unwrap_or(false);
if clicked { break; }
sleep(TokioDuration::from_millis(500)).await;
}
logger::log_info("Rejected Yahoo cookies if button existed").await;
Ok(())
}
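
// Editor's sketch (not part of this changeset): an end-to-end usage example that resolves
// a ticker from an ISIN via the lookup page, then pulls its earnings calendar. The
// function name is illustrative; error handling follows the pool's anyhow results.
#[allow(dead_code)]
async fn example_isin_to_earnings(
    pool: &Arc<ChromeDriverPool>,
    isin: &str,
) -> anyhow::Result<Vec<CompanyEvent>> {
    match scrape_company_details_by_isin(pool, isin).await? {
        Some(details) => fetch_earnings_with_pool(pool, &details.ticker).await,
        None => Ok(Vec::new()), // no Yahoo match for this ISIN
    }
}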

View File

@@ -0,0 +1,67 @@
// yahoo_company_extraction.js
// JavaScript extraction script for Yahoo Finance company details
// Used to extract ticker, sector, and exchange from Yahoo Finance search results
(function() {
try {
// Check for "No results found" message
const noDataElement = document.querySelector('.noData');
if (noDataElement) {
return { status: 'no_results', ticker: null, sector: null, exchange: null };
}
// Find the results table
const table = document.querySelector('table.markets-table');
if (!table) {
return { status: 'no_results', ticker: null, sector: null, exchange: null };
}
// Find the first row in tbody
const firstRow = table.querySelector('tbody tr');
if (!firstRow) {
return { status: 'no_results', ticker: null, sector: null, exchange: null };
}
// Extract ticker from first column (td:nth-child(1))
const tickerCell = firstRow.querySelector('td:nth-child(1)');
const ticker = tickerCell ? tickerCell.textContent.trim() : '';
if (!ticker) {
return { status: 'not_found', ticker: null, sector: null, exchange: null };
}
// Extract sector from column 4 (td:nth-child(4) > span > div > a)
const sectorCell = firstRow.querySelector('td:nth-child(4) span div a');
let sector = sectorCell ? sectorCell.textContent.trim() : '';
// Normalize empty/invalid values to null
if (!sector || sector === '-' || sector === 'N/A') {
sector = null;
}
// Extract exchange from column 6 (td:nth-child(6) > span)
const exchangeCell = firstRow.querySelector('td:nth-child(6) span');
let exchange = exchangeCell ? exchangeCell.textContent.trim() : '';
// Normalize empty/invalid values to null
if (!exchange || exchange === '-' || exchange === 'N/A') {
exchange = null;
}
return {
status: 'found',
ticker: ticker,
sector: sector,
exchange: exchange
};
} catch (error) {
return {
status: 'error',
error_message: error.toString(),
ticker: null,
sector: null,
exchange: null
};
}
})();

View File

@@ -1,5 +1,6 @@
// src/economic/scraper.rs
use super::types::{EconomicEvent};
use event_backtest_engine::logger;
use fantoccini::Client;
use tokio::time::{sleep, Duration};
@@ -49,6 +50,6 @@ pub async fn extract_events(client: &Client) -> anyhow::Result<Vec<EconomicEvent
            });
        }
    }
    logger::log_info(&format!("Extracted {} high-impact events", events.len())).await;
    Ok(events)
}

View File

@@ -116,7 +116,7 @@ pub async fn build_event_index(chunks: &[ChunkInfo]) -> anyhow::Result<Vec<Event
    Ok(index)
}

/// Look up a specific event by loading only its chunk
pub async fn lookup_event_by_key(key: &str, index: &[EventIndex]) -> anyhow::Result<Option<EconomicEvent>> {
    // Find which chunk contains this event
    let entry = index.iter().find(|e| e.key == key);

View File

@@ -13,23 +13,17 @@ use scraper::webdriver::ChromeDriverPool;
use util::directories::DataPaths;
use util::{logger, opnv};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

#[tokio::main]
async fn main() -> Result<()> {
    cleanup_all_proxy_containers().await.ok();

    let config = Config::load().map_err(|err| {
        eprintln!("Failed to load config: {}", err);
        err
    })?;

    let paths = DataPaths::new(".")?;
    logger::init_debug_logger(paths.logs_dir()).await.ok();
    logger::log_info("=== Event Backtest Engine Started ===").await;
@@ -40,61 +34,36 @@ async fn main() -> Result<()> {
        config.enable_vpn_rotation
    )).await;

    // Simple shutdown flag
    let shutdown_flag = Arc::new(AtomicBool::new(false));

    // === Step 1: Fetch VPNBook configs ===
    let proxy_pool: Option<Arc<DockerVpnProxyPool>> = if config.enable_vpn_rotation {
        logger::log_info("VPN Rotation Enabled Fetching latest VPNBook configs").await;

        let temp_pool = Arc::new(ChromeDriverPool::new_with_proxy_and_task_limit(config.max_parallel_instances, None, config.max_tasks_per_instance).await?);
        let (username, password, _files) = opnv::fetch_vpnbook_configs(&temp_pool, paths.cache_dir()).await?;
        logger::log_info(&format!("VPNBook credentials → User: {}", username)).await;

        let server_count = std::fs::read_dir(paths.cache_openvpn_dir())?
            .filter(|e| e.as_ref().unwrap().path().is_dir())
            .count();

        if server_count == 0 {
            logger::log_warn("No VPN servers found continuing without VPN").await;
            None
        } else {
            logger::log_info(&format!("Found {} VPN servers starting Docker proxy containers", server_count)).await;
            let pp = Arc::new(DockerVpnProxyPool::new(paths.cache_openvpn_dir(), username, password).await?);
// Verify all proxies are working before proceeding
logger::log_info("Verifying all proxy connections...").await;
let mut all_working = true;
for i in 0..pp.num_proxies() {
match pp.test_proxy_connection(i).await {
Ok(ip) => {
logger::log_info(&format!(" Proxy {}: working with IP: {}", i + 1, ip)).await;
}
Err(e) => {
logger::log_error(&format!(" Proxy {}: FAILED - {}", i + 1, e)).await;
all_working = false;
}
}
}
if !all_working {
logger::log_warn("Some proxies failed, but continuing with working ones...").await;
} else {
logger::log_info("All proxies verified and ready!").await;
}
logger::log_info(&format!("All {} Docker proxy containers started and ready", pp.num_proxies())).await; logger::log_info(&format!("All {} Docker proxy containers started and ready", pp.num_proxies())).await;
Some(pp) Some(pp)
} }
} else { } else {
logger::log_info("VPN rotation disabled using direct connection").await; logger::log_info("VPN rotation disabled using direct connection").await;
None None
}; };
    // === Step 2: Initialize ChromeDriver pool ===
    let pool_size = config.max_parallel_instances;
    let task_limit = config.max_tasks_per_instance;
@@ -110,17 +79,23 @@ async fn main() -> Result<()> {
logger::log_info(&format!("ChromeDriver pool ready with {} instances", pool_size)).await; logger::log_info(&format!("ChromeDriver pool ready with {} instances", pool_size)).await;
// === Step 3: Graceful Ctrl+C handler === // === Step 3: Ctrl+C handler ===
{ {
let shutdown_flag_clone = Arc::clone(&shutdown_flag);
let pool_clone = Arc::clone(&pool); let pool_clone = Arc::clone(&pool);
let proxy_clone = proxy_pool.clone(); let proxy_clone = proxy_pool.clone();
tokio::spawn(async move { tokio::spawn(async move {
tokio::signal::ctrl_c().await.ok(); tokio::signal::ctrl_c().await.ok();
logger::log_info("Ctrl+C received shutting down gracefully...").await;
logger::log_info("Ctrl+C received — shutting down gracefully...").await; // Set flag first
shutdown_flag_clone.store(true, Ordering::SeqCst);
// Now works: &*pool_clone derefs Arc → &ChromeDriverPool // Wait a bit for tasks to notice
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
// Cleanup
if let Err(e) = (&*pool_clone).shutdown().await { if let Err(e) = (&*pool_clone).shutdown().await {
logger::log_error(&format!("Error during pool shutdown: {}", e)).await; logger::log_error(&format!("Error during pool shutdown: {}", e)).await;
} }
@@ -134,36 +109,34 @@ async fn main() -> Result<()> {
            }
            let _ = cleanup_all_proxy_containers().await;
            std::process::exit(0);
        });
    }
    // === Step 4: Run scraping jobs ===
    logger::log_info("--- Starting ECONOMIC data update ---").await;
    economic::run_full_update(&config, &pool).await?;
    logger::log_info("Economic update completed").await;

    if !shutdown_flag.load(Ordering::SeqCst) {
        logger::log_info("--- Starting CORPORATE data update ---").await;
        corporate::run_full_update(&config, &pool, &shutdown_flag).await?;
        logger::log_info("Corporate update completed").await;
    }

    // === Step 5: Final cleanup ===
    if !shutdown_flag.load(Ordering::SeqCst) {
        logger::log_info("Shutting down ChromeDriver pool...").await;
        pool.shutdown().await?;

        if let Some(pp) = proxy_pool {
            logger::log_info("Stopping Docker VPN proxy containers...").await;
            pp.shutdown().await?;
            cleanup_all_proxy_containers().await.ok();
        }

        logger::log_info("=== Application finished successfully ===").await;
    }

    Ok(())
}
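
// Editor's sketch (not part of this changeset): how a long-running phase can cooperate
// with the shutdown flag set by the Ctrl+C handler above. `do_one_unit_of_work` is a
// hypothetical stand-in for a single scraping step.
#[allow(dead_code)]
async fn run_until_shutdown(shutdown_flag: &AtomicBool, mut work_items: Vec<String>) {
    while let Some(item) = work_items.pop() {
        if shutdown_flag.load(Ordering::SeqCst) {
            // Stop between units of work so the abort-safe JSONL log stays consistent.
            break;
        }
        // do_one_unit_of_work(item).await;  // hypothetical
        let _ = item;
    }
}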
/*
memory allocation of 4294967296 bytes failed
error: process didn't exit successfully: `target\debug\event_backtest_engine.exe` (exit code: 0xc0000409, STATUS_STACK_BUFFER_OVERRUN)
*/

View File

@@ -106,10 +106,18 @@ impl DockerVpnProxyPool {
                working_ports.push(port);
            }
            Ok(None) => {
                let logs = Command::new("docker")
                    .args(["logs", "--tail", "20", &container_name])
                    .output()
                    .await
                    .ok()
                    .and_then(|output| String::from_utf8_lossy(&output.stdout).to_string().into());
                crate::util::logger::log_error(&format!("✗ Container {} on port {} ready but IP detection failed. Logs: {:?}",
                    container_name, port, logs)).await;
                failed_count += 1;
                // Clean up failed container
                let _ = Self::cleanup_container(&container_name).await;
            }
            Err(e) => {
                // Get container logs to debug
@@ -309,25 +317,6 @@ impl DockerVpnProxyPool {
        true
    }
/// Test if a specific proxy is working
pub async fn test_proxy_connection(&self, index: usize) -> Result<String> {
let port = self.proxy_ports[index];
let proxy_url = format!("socks5://localhost:{}", port);
let client = reqwest::Client::builder()
.proxy(reqwest::Proxy::all(&proxy_url)?)
.timeout(Duration::from_secs(10))
.build()?;
let response = client.get("http://checkip.amazonaws.com")
.send()
.await?
.text()
.await?;
Ok(response.trim().to_string())
}
    pub fn get_proxy_url(&self, index: usize) -> String {
        let port = self.proxy_ports[index % self.proxy_ports.len()];
        format!("socks5://localhost:{}", port)

View File

@@ -19,11 +19,15 @@ pub struct ChromeDriverPool {
    semaphore: Arc<Semaphore>,
    /// Optional Docker-based proxy pool (one proxy per Chrome instance)
    proxy_pool: Option<Arc<DockerVpnProxyPool>>,
    /// Whether rotation is enabled (uses half of instances at a time)
    rotation_enabled: bool,
    /// Index for round-robin instance selection (when rotation is enabled)
    next_instance: Arc<Mutex<usize>>,
}

impl ChromeDriverPool {
    /// Creates a new pool without any proxy (direct connection).
    pub async fn _new(pool_size: usize) -> Result<Self> {
        Self::new_with_proxy_and_task_limit(pool_size, None, 0).await
    }
@@ -40,22 +44,53 @@ impl ChromeDriverPool {
        Self::new_with_proxy_and_task_limit(pool_size, proxy_pool, 0).await
    }

    /// Full constructor: supports proxy + task limiting + rotation.
///
/// When rotation is enabled, only half of the instances are used at once,
/// rotating to the other half when task limits are reached.
///
/// The actual pool_size is constrained by:
/// - max_parallel_instances from config (pool_size_limit parameter)
/// - Available proxies from proxy_pool (if provided)
///
/// Uses the minimum of these constraints to determine actual pool size.
    pub async fn new_with_proxy_and_task_limit(
        pool_size_limit: usize,
        proxy_pool: Option<Arc<DockerVpnProxyPool>>,
        max_tasks_per_instance: usize,
    ) -> Result<Self> {
        // Determine actual pool size based on available resources
        let actual_pool_size = if let Some(ref pp) = proxy_pool {
            let available_proxies = pp.num_proxies();
            pool_size_limit.min(available_proxies)
        } else {
            pool_size_limit
        };

        if actual_pool_size == 0 {
            return Err(anyhow!("Pool size must be at least 1"));
        }

        // Rotation is enabled when task limiting is active
        let rotation_enabled = max_tasks_per_instance > 0;

        let mut instances = Vec::with_capacity(actual_pool_size);

        crate::util::logger::log_info(&format!(
            "Initializing ChromeDriver pool with {} instances{}{}...",
            actual_pool_size,
            if proxy_pool.is_some() { " (each using a unique Docker SOCKS5 proxy)" } else { "" },
            if rotation_enabled { " with rotation enabled" } else { "" }
        ))
        .await;

        if rotation_enabled && actual_pool_size < 2 {
crate::util::logger::log_warn(
"Rotation enabled but pool has < 2 instances - rotation will be limited"
).await;
}
for i in 0..actual_pool_size {
            let proxy_url = proxy_pool
                .as_ref()
                .map(|pp| pp.get_proxy_url(i));
@@ -68,12 +103,22 @@ impl ChromeDriverPool {
        Ok(Self {
            instances,
            semaphore: Arc::new(Semaphore::new(actual_pool_size)),
            proxy_pool,
            rotation_enabled,
            next_instance: Arc::new(Mutex::new(0)),
        })
    }
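
    // Editor's sketch (not part of this changeset): the sizing rule used above restated
    // as a pure helper, i.e. the effective pool size is the configured limit, capped by
    // the number of available proxies when a proxy pool is present.
    #[allow(dead_code)]
    fn effective_pool_size(pool_size_limit: usize, available_proxies: Option<usize>) -> usize {
        match available_proxies {
            Some(n) => pool_size_limit.min(n),
            None => pool_size_limit,
        }
    }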
    /// Execute a scraping task using an available instance from the pool.
///
/// When rotation is enabled:
/// - Uses only half of the instances at a time
/// - Rotates to the other half when an instance reaches its task limit
/// - Cycles through instances in round-robin fashion within the active half
///
/// When rotation is disabled:
/// - Uses all instances with random selection
    pub async fn execute<T, F, Fut>(&self, url: String, parse: F) -> Result<T>
    where
        T: Send + 'static,
@@ -82,8 +127,81 @@ impl ChromeDriverPool {
    {
        let _permit = self.semaphore.acquire().await.map_err(|_| anyhow!("Pool closed"))?;

        let index = if self.rotation_enabled {
            // Rotation mode: use only half of instances at a time
let total_instances = self.instances.len();
let half_size = (total_instances + 1) / 2; // Round up for odd numbers
let mut next_idx = self.next_instance.lock().await;
let base_idx = *next_idx;
let mut selected_idx = base_idx;
let mut found_in_current_half = false;
// Try to find an available instance in the current half
for offset in 0..half_size {
let candidate_idx = (base_idx + offset) % half_size;
// Check if this instance has reached its task limit
let instance = &self.instances[candidate_idx];
let guard = instance.lock().await;
if guard.max_tasks_per_instance == 0 ||
guard.task_count < guard.max_tasks_per_instance {
// This instance is available
*next_idx = (candidate_idx + 1) % half_size;
selected_idx = candidate_idx;
found_in_current_half = true;
drop(guard);
break;
} else {
drop(guard);
}
}
if !found_in_current_half {
// All instances in current half are at limit, switch to other half
crate::util::logger::log_info(
"Current half saturated, rotating to other half of instances"
).await;
let other_half_start = half_size;
let other_half_size = total_instances - half_size;
// Find available instance in other half
let mut found_in_other_half = false;
for offset in 0..other_half_size {
let candidate_idx = other_half_start + offset;
let instance = &self.instances[candidate_idx];
let guard = instance.lock().await;
if guard.max_tasks_per_instance == 0 ||
guard.task_count < guard.max_tasks_per_instance {
// Switch to this half for future requests
*next_idx = offset;
selected_idx = candidate_idx;
found_in_other_half = true;
drop(guard);
break;
} else {
drop(guard);
}
}
if !found_in_other_half {
// All instances saturated - use round-robin anyway
selected_idx = *next_idx % total_instances;
*next_idx = (*next_idx + 1) % total_instances;
}
}
drop(next_idx);
selected_idx
} else {
// Non-rotation mode: random selection as before
rand::random_range(..self.instances.len())
};
        let instance = self.instances[index].clone();
        let mut guard = instance.lock().await;
@@ -91,7 +209,8 @@ impl ChromeDriverPool {
        if guard.max_tasks_per_instance > 0 {
            crate::util::logger::log_info(&format!(
                "Instance {} task count: {}/{}",
                index,
                guard.get_task_count(),
                guard.max_tasks_per_instance
            ))
@@ -130,6 +249,20 @@ impl ChromeDriverPool {
    pub fn get_number_of_instances(&self) -> usize {
        self.instances.len()
    }
/// Returns whether rotation is enabled
pub fn is_rotation_enabled(&self) -> bool {
self.rotation_enabled
}
/// Returns the size of each half when rotation is enabled
pub fn get_rotation_half_size(&self) -> usize {
if self.rotation_enabled {
(self.instances.len() + 1) / 2
} else {
self.instances.len()
}
}
}
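
// Editor's sketch (not part of this changeset): the half-size rule above rounds up for
// odd pool sizes, e.g. 5 instances split into active halves of 3 and 2.
#[cfg(test)]
mod rotation_half_size_tests {
    #[test]
    fn half_rounds_up_for_odd_pools() {
        let half = |n: usize| (n + 1) / 2;
        assert_eq!(half(5), 3);
        assert_eq!(half(4), 2);
        assert_eq!(half(1), 1);
    }
}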
/// Represents a single instance of chromedriver process, optionally bound to a VPN.

View File

@@ -1,379 +0,0 @@
// tests/vpn_integration_tests.rs
//! Integration tests for VPN rotation system
#[cfg(test)]
mod vpn_tests {
use event_backtest_engine::{
scraper::{
webdriver::ChromeDriverPool,
vpn_manager::{VpnInstance, VpnPool},
},
util::{directories::DataPaths, opnv},
};
use std::path::PathBuf;
use std::sync::Arc;
/// Helper to create a test VPN instance without connecting
fn create_test_vpn_instance() -> VpnInstance {
VpnInstance::new(
PathBuf::from("test.ovpn"),
"testuser".to_string(),
"testpass".to_string(),
)
.expect("Failed to create test VPN instance")
}
#[test]
fn test_vpn_instance_creation() {
let vpn = create_test_vpn_instance();
assert_eq!(vpn.hostname(), "test");
assert!(!vpn.is_healthy());
assert!(vpn.external_ip().is_none());
}
#[test]
fn test_vpn_task_counting() {
let mut vpn = create_test_vpn_instance();
// Should not rotate initially
assert!(!vpn.increment_task_count(10));
// Increment tasks
for i in 1..10 {
assert!(!vpn.increment_task_count(10), "Should not rotate at task {}", i);
}
// Should rotate at threshold
assert!(vpn.increment_task_count(10), "Should rotate at task 10");
// Reset and verify
vpn.reset_task_count();
assert!(!vpn.increment_task_count(10), "Should not rotate after reset");
}
#[test]
fn test_vpn_task_counting_zero_threshold() {
let mut vpn = create_test_vpn_instance();
// With threshold=0, should never auto-rotate
for _ in 0..100 {
assert!(!vpn.increment_task_count(0));
}
}
#[tokio::test]
async fn test_chromedriver_pool_creation_no_vpn() {
let result = ChromeDriverPool::new(2).await;
match result {
Ok(pool) => {
assert_eq!(pool.get_number_of_instances(), 2);
assert!(!pool.is_vpn_enabled());
}
Err(e) => {
eprintln!("ChromeDriver pool creation failed (expected if chromedriver not installed): {}", e);
}
}
}
#[test]
fn test_data_paths_creation() {
let paths = DataPaths::new("./test_data").expect("Failed to create paths");
assert!(paths.data_dir().exists());
assert!(paths.cache_dir().exists());
assert!(paths.logs_dir().exists());
assert!(paths.cache_openvpn_dir().exists());
// Cleanup
let _ = std::fs::remove_dir_all("./test_data");
}
#[tokio::test]
#[ignore] // This test requires actual network access and VPNBook availability
async fn test_fetch_vpnbook_configs() {
let paths = DataPaths::new(".").expect("Failed to create paths");
// This test requires a ChromeDriver pool
let pool_result = ChromeDriverPool::new(1).await;
if pool_result.is_err() {
eprintln!("Skipping VPNBook fetch test: ChromeDriver not available");
return;
}
let pool = Arc::new(pool_result.unwrap());
let result = opnv::fetch_vpnbook_configs(&pool, paths.cache_dir()).await;
match result {
Ok((username, password, files)) => {
assert!(!username.is_empty(), "Username should not be empty");
assert!(!password.is_empty(), "Password should not be empty");
assert!(!files.is_empty(), "Should fetch at least one config file");
println!("Fetched {} VPN configs", files.len());
for file in &files {
assert!(file.exists(), "Config file should exist: {:?}", file);
assert_eq!(file.extension().and_then(|s| s.to_str()), Some("ovpn"));
}
}
Err(e) => {
eprintln!("VPNBook fetch failed (may be temporary): {}", e);
}
}
}
#[tokio::test]
#[ignore] // Requires actual VPN configs and OpenVPN installation
async fn test_vpn_pool_creation() {
let paths = DataPaths::new(".").expect("Failed to create paths");
// First fetch configs
let pool_result = ChromeDriverPool::new(1).await;
if pool_result.is_err() {
eprintln!("Skipping VPN pool test: ChromeDriver not available");
return;
}
let temp_pool = Arc::new(pool_result.unwrap());
let fetch_result = opnv::fetch_vpnbook_configs(&temp_pool, paths.cache_dir()).await;
if fetch_result.is_err() {
eprintln!("Skipping VPN pool test: Could not fetch configs");
return;
}
let (username, password, _) = fetch_result.unwrap();
// Create VPN pool
let vpn_pool_result = VpnPool::new(
paths.cache_openvpn_dir(),
username,
password,
false,
0,
).await;
match vpn_pool_result {
Ok(vpn_pool) => {
assert!(vpn_pool.len() > 0, "VPN pool should have at least one instance");
println!("Created VPN pool with {} instances", vpn_pool.len());
}
Err(e) => {
eprintln!("VPN pool creation failed: {}", e);
}
}
}
#[tokio::test]
#[ignore] // Full integration test - requires all components
async fn test_full_vpn_integration() {
let paths = DataPaths::new(".").expect("Failed to create paths");
// Step 1: Create temp ChromeDriver pool for fetching
let temp_pool = match ChromeDriverPool::new(1).await {
Ok(p) => Arc::new(p),
Err(e) => {
eprintln!("Skipping integration test: ChromeDriver not available - {}", e);
return;
}
};
// Step 2: Fetch VPNBook configs
let (username, password, files) = match opnv::fetch_vpnbook_configs(
&temp_pool,
paths.cache_dir()
).await {
Ok(result) => result,
Err(e) => {
eprintln!("Skipping integration test: Config fetch failed - {}", e);
return;
}
};
assert!(!files.is_empty(), "Should have fetched configs");
// Step 3: Create VPN pool
let vpn_pool = match VpnPool::new(
paths.cache_openvpn_dir(),
username,
password,
true,
5,
).await {
Ok(pool) => Arc::new(pool),
Err(e) => {
eprintln!("Skipping integration test: VPN pool creation failed - {}", e);
return;
}
};
// Step 4: Connect one VPN
let vpn_instance = vpn_pool.acquire().await.expect("Failed to acquire VPN");
let connect_result = {
let mut vpn = vpn_instance.lock().await;
vpn.connect().await
};
match connect_result {
Ok(_) => {
let vpn = vpn_instance.lock().await;
println!("✓ VPN connected: {} ({})",
vpn.hostname(),
vpn.external_ip().unwrap_or("unknown")
);
assert!(vpn.is_healthy());
assert!(vpn.external_ip().is_some());
}
Err(e) => {
eprintln!("VPN connection failed: {}", e);
}
}
// Step 5: Create ChromeDriver pool with VPN
let driver_pool_result = ChromeDriverPool::new_with_vpn(
1,
Some(vpn_pool.clone())
).await;
match driver_pool_result {
Ok(driver_pool) => {
assert!(driver_pool.is_vpn_enabled());
println!("✓ ChromeDriver pool created with VPN binding");
}
Err(e) => {
eprintln!("ChromeDriver pool creation failed: {}", e);
}
}
// Step 6: Cleanup
vpn_pool.disconnect_all().await.expect("Failed to disconnect VPNs");
println!("✓ Integration test complete");
}
#[test]
fn test_hostname_extraction() {
// Test the hostname extraction logic
let test_cases = vec![
("test/ca149.vpnbook.com/config.ovpn", "ca149.vpnbook.com"),
("test/us1.vpnbook.com/config.ovpn", "us1.vpnbook.com"),
("test/de4.vpnbook.com/config.ovpn", "de4.vpnbook.com"),
];
for (path, expected_hostname) in test_cases {
let pb = PathBuf::from(path);
let hostname = pb.parent()
.and_then(|p| p.file_name())
.and_then(|n| n.to_str())
.unwrap_or("unknown");
assert_eq!(hostname, expected_hostname);
}
}
#[cfg(target_os = "windows")]
#[test]
fn test_forcebindip_manager_creation() {
use event_backtest_engine::ForceBindIpManager;
match ForceBindIpManager::new() {
Ok(manager) => {
println!("✓ ForceBindIP found at: {:?}", manager.path());
assert!(manager.path().exists());
}
Err(e) => {
eprintln!("ForceBindIP not found (expected in dev): {}", e);
}
}
}
#[cfg(target_os = "windows")]
#[test]
fn test_forcebindip_command_creation() {
use event_backtest_engine::ForceBindIpManager;
use std::path::Path;
if let Ok(manager) = ForceBindIpManager::new() {
let cmd = manager.create_bound_command(
"192.168.1.100",
Path::new("test.exe"),
&["--arg1", "value1"],
);
let cmd_str = format!("{:?}", cmd);
assert!(cmd_str.contains("192.168.1.100"));
assert!(cmd_str.contains("test.exe"));
println!("✓ ForceBindIP command created successfully");
}
}
#[test]
fn test_config_defaults() {
use event_backtest_engine::Config;
let config = Config::default();
assert_eq!(config.economic_start_date, "2007-02-13");
assert_eq!(config.corporate_start_date, "2010-01-01");
assert_eq!(config.economic_lookahead_months, 3);
assert_eq!(config.max_parallel_instances, 10);
assert!(!config.enable_vpn_rotation);
assert_eq!(config.tasks_per_vpn_session, 0);
}
}
#[cfg(test)]
mod benchmark_tests {
use super::*;
#[tokio::test]
#[ignore] // Performance test
async fn benchmark_vpn_rotation_overhead() {
use std::time::Instant;
// This test measures the overhead of VPN rotation
let start = Instant::now();
// Simulate rotation cycle
// 1. Disconnect (instant)
// 2. Wait 2 seconds
// 3. Connect (5-10 seconds)
// 4. Verify IP (1-2 seconds)
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
let elapsed = start.elapsed();
println!("Rotation cycle took: {:?}", elapsed);
// Typical rotation should complete in under 15 seconds
assert!(elapsed.as_secs() < 15);
}
#[tokio::test]
#[ignore] // Performance test
async fn benchmark_parallel_scraping() {
// This test measures throughput with different parallelism levels
// Results help tune MAX_PARALLEL_INSTANCES
let configs = vec![1, 2, 3, 5, 10];
for &pool_size in &configs {
println!("Testing with {} parallel instances...", pool_size);
// Would need actual scraping implementation here
// For now, just verify pool creation time
let start = std::time::Instant::now();
let pool_result = event_backtest_engine::ChromeDriverPool::new(pool_size).await;
if let Ok(_pool) = pool_result {
let elapsed = start.elapsed();
println!(" Pool initialization: {:?}", elapsed);
// Pool creation should be fast (< 5 seconds per instance)
assert!(elapsed.as_secs() < pool_size as u64 * 5);
} else {
eprintln!(" Skipped - ChromeDriver not available");
}
}
}
}