Compare commits


5 Commits

25 changed files with 2062 additions and 1057 deletions

Cargo.lock (generated)

@@ -678,6 +678,7 @@ dependencies = [
"serde",
"serde_json",
"tokio",
"toml",
"tracing",
"tracing-subscriber",
"yfinance-rs",
@@ -2671,6 +2672,15 @@ dependencies = [
"serde_core",
]
[[package]]
name = "serde_spanned"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e24345aa0fe688594e73770a5f6d1b216508b4f93484c0026d521acd30134392"
dependencies = [
"serde_core",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
@@ -3116,6 +3126,21 @@ dependencies = [
"tokio",
]
[[package]]
name = "toml"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0dc8b1fb61449e27716ec0e1bdf0f6b8f3e8f6b05391e8497b8b6d7804ea6d8"
dependencies = [
"indexmap",
"serde_core",
"serde_spanned",
"toml_datetime",
"toml_parser",
"toml_writer",
"winnow",
]
[[package]]
name = "toml_datetime"
version = "0.7.3"
@@ -3146,6 +3171,12 @@ dependencies = [
"winnow",
]
[[package]]
name = "toml_writer"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df8b2b54733674ad286d16267dcfc7a71ed5c776e4ac7aa3c3e2561f7c637bf2"
[[package]]
name = "tower"
version = "0.5.2"

Cargo.toml

@@ -34,6 +34,7 @@ rand = "0.9.2"
# Environment handling
dotenvy = "0.15"
toml = "0.9.8"
# Date & time
chrono = { version = "0.4", features = ["serde"] }


@@ -1,46 +0,0 @@
{
"CHF": [
0.808996035919424,
"2025-11-25"
],
"JPY": [
0.0064,
"2025-11-25"
],
"INR": [
89.28571428571429,
"2025-11-25"
],
"GBp": [
0.7603406326034063,
"2025-11-25"
],
"AUD": [
1.5463120457708364,
"2025-11-25"
],
"SAR": [
3.750937734433609,
"2025-11-25"
],
"TWD": [
31.446540880503143,
"2025-11-25"
],
"CNY": [
7.087172218284904,
"2025-11-25"
],
"HKD": [
7.776049766718508,
"2025-11-25"
],
"CAD": [
1.4110342881332016,
"2025-11-25"
],
"EUR": [
0.8649022660439372,
"2025-11-25"
]
}


@@ -1,14 +1,23 @@
// src/config.rs
#[derive(Debug, Clone)]
use anyhow::{Context, Result};
use chrono::{self};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
// Economic calendar start (usually the earliest available on finanzen.net)
pub economic_start_date: String, // e.g. "2007-02-13"
// Corporate earnings & price history start
pub corporate_start_date: String, // e.g. "2000-01-01" or "2010-01-01"
// How far into the future we scrape economic events
pub economic_lookahead_months: u32, // default: 3
/// Maximum number of parallel scraping tasks (default: 10).
/// This limits concurrency to protect system load and prevent website spamming.
#[serde(default = "default_max_parallel")]
pub max_parallel_tasks: usize,
}
fn default_max_parallel() -> usize {
10
}
impl Default for Config {
@@ -17,11 +26,52 @@ impl Default for Config {
economic_start_date: "2007-02-13".to_string(),
corporate_start_date: "2010-01-01".to_string(),
economic_lookahead_months: 3,
max_parallel_tasks: default_max_parallel(),
}
}
}
impl Config {
/// Loads the configuration from environment variables using dotenvy.
///
/// This function loads a `.env` file if present (via `dotenvy::dotenv()`),
/// then retrieves each configuration value from environment variables.
/// If a variable is missing, it falls back to the default value.
/// Variable names are uppercase with underscores (e.g., ECONOMIC_START_DATE).
///
/// # Returns
/// The loaded Config on success.
///
/// # Errors
/// Returns an error if parsing fails (e.g., invalid integer for lookahead months).
pub fn load() -> Result<Self> {
// Load .env file if present; a missing file is fine, so discard any error instead of propagating it
let _ = dotenvy::dotenv();
let economic_start_date = dotenvy::var("ECONOMIC_START_DATE")
.unwrap_or_else(|_| "2007-02-13".to_string());
let corporate_start_date = dotenvy::var("CORPORATE_START_DATE")
.unwrap_or_else(|_| "2010-01-01".to_string());
let economic_lookahead_months: u32 = dotenvy::var("ECONOMIC_LOOKAHEAD_MONTHS")
.unwrap_or_else(|_| "3".to_string())
.parse()
.context("Failed to parse ECONOMIC_LOOKAHEAD_MONTHS as u32")?;
let max_parallel_tasks: usize = dotenvy::var("MAX_PARALLEL_TASKS")
.unwrap_or_else(|_| "10".to_string())
.parse()
.context("Failed to parse MAX_PARALLEL_TASKS as usize")?;
Ok(Self {
economic_start_date,
corporate_start_date,
economic_lookahead_months,
max_parallel_tasks,
})
}
pub fn target_end_date(&self) -> String {
let now = chrono::Local::now().naive_local().date();
let future = now + chrono::Duration::days(30 * self.economic_lookahead_months as i64);
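As a quick usage note for the new loader, here is a minimal sketch of consuming `Config::load()` from inside the crate (the `demo` function and its printed message are illustrative assumptions, not part of the diff):

```rust
use anyhow::Result;
use crate::config::Config;

fn demo() -> Result<()> {
    // Unset variables fall back to the documented defaults
    // (ECONOMIC_LOOKAHEAD_MONTHS -> 3, MAX_PARALLEL_TASKS -> 10).
    let config = Config::load()?;
    println!(
        "up to {} parallel tasks, scraping until {}",
        config.max_parallel_tasks,
        config.target_end_date()
    );
    Ok(())
}
```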

src/corporate/figi.md (new file)

@@ -0,0 +1,141 @@
# OpenFIGI API Summary: Mapping, Search, and Filter Endpoints
This Markdown summary covers the **API Guidelines**, **Request Format**, and **Sample Request -> Sample Response** for the key OpenFIGI endpoints: Mapping, Search, and Filter. Information is based on the official documentation as of December 1, 2025.
## Mapping Endpoint
### API Guidelines
- **Endpoint**: `POST /v3/mapping`
- **Purpose**: Map third-party identifiers (e.g., ISIN, TICKER) to FIGIs (Financial Instrument Global Identifiers).
- **Request Format**: JSON array of objects (mapping jobs). Each job requires `idType` and `idValue`. Optional filters: `exchCode`, `micCode`, `currency`, `marketSecDes`, `securityType`, `securityType2`, `includeUnlistedEquities`, `optionType`, `strike`, `contractSize`, `coupon`, `expiration`, `maturity`, `stateCode`.
- **Key Parameters**:
- `idType` (String, Required): Identifier type (e.g., `ID_BB_GLOBAL`, `TICKER`, `ID_ISIN`).
- `idValue` (String/Number, Required): The identifier value.
- `exchCode` (String, Optional): Exchange code (mutually exclusive with `micCode`).
- `micCode` (String, Optional): Market Identification Code (mutually exclusive with `exchCode`).
- Range parameters (e.g., `strike`, `expiration`): Arrays like `[a, b]` or `[a, null]` for intervals.
- `includeUnlistedEquities` (Boolean, Optional): Defaults to `false`.
- **Limits**:
- Without API key: Max 5 jobs per request.
- With API key: Max 100 jobs per request.
- **Rate Limits**:
- Without API key: 25 requests/minute.
- With API key: 25 requests/6 seconds.
- **Authentication**: Include `X-OPENFIGI-APIKEY` header for higher limits.
### Sample Request
```json
[
{ "idType": "ID_BB_GLOBAL", "idValue": "BBG000BLNNH6" },
{ "idType": "TICKER", "idValue": "IBM", "exchCode": "US" },
{ "idType": "BASE_TICKER", "idValue": "TSLA 10 C100", "securityType2": "Option", "expiration": ["2018-10-01", "2018-12-01"] }
]
```
### Sample Response
```json
[{
"data": [{
"figi": "BBG000BLNNH6",
"securityType": "Common Stock",
"marketSector": "Equity",
"ticker": "IBM",
"name": "INTL BUSINESS MACHINES CORP",
"exchCode": "US",
"shareClassFIGI": "BBG001S5S399",
"compositeFIGI": "BBG000BLNNH6",
"securityType2": "Common Stock",
"securityDescription": "IBM"
}]
}]
```
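For the Rust side of this repository, a hedged sketch of issuing the mapping request above with reqwest (error handling is simplified; the key header is only needed for the higher limits):

```rust
use anyhow::Result;
use serde_json::{json, Value};

async fn map_identifiers(api_key: Option<&str>) -> Result<Vec<Value>> {
    let jobs = json!([
        { "idType": "ID_BB_GLOBAL", "idValue": "BBG000BLNNH6" },
        { "idType": "TICKER", "idValue": "IBM", "exchCode": "US" }
    ]);
    let mut req = reqwest::Client::new()
        .post("https://api.openfigi.com/v3/mapping")
        .json(&jobs);
    if let Some(key) = api_key {
        // Raises the cap from 5 to 100 jobs per request.
        req = req.header("X-OPENFIGI-APIKEY", key);
    }
    // One result object per job, in request order.
    Ok(req.send().await?.error_for_status()?.json().await?)
}
```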
## Search Endpoint
### API Guidelines
- **Endpoint**: `POST /v3/search`
- **Purpose**: Keyword-based search for FIGIs with optional filters; supports pagination.
- **Request Format**: JSON object with optional `query` (keywords) and filters (same as Mapping). Use `start` for pagination.
- **Key Parameters**:
- `query` (String, Optional): Search keywords (e.g., "ibm").
- `start` (String, Optional): Pagination token from previous `next` field.
- All Mapping filters supported (e.g., `exchCode`, `securityType`, `optionType`).
- **Limits**:
- Max results: 15,000.
- Max per page: 100.
- Max pages: 150.
- **Rate Limits**:
- Without API key: 5 requests/minute.
- With API key: 20 requests/minute.
- **Pagination**: Response includes `next` token; use as `start` in next request.
- **Authentication**: Same as Mapping.
### Sample Request
```json
{
"query": "ibm",
"exchCode": "US"
}
```
### Sample Response
```json
{
"data": [
{
"figi": "BBG000BLNNH6",
"name": "INTL BUSINESS MACHINES CORP",
"ticker": "IBM",
"exchCode": "US",
"compositeFIGI": "BBG000BLNNH6",
"securityType": "Common Stock",
"marketSector": "Equity",
"shareClassFIGI": "BBG001S5S399",
"securityType2": "Common Stock",
"securityDescription": "IBM"
}
],
"next": "QW9JSVFEOFMrQ3hDUWtjd01ERTRTMHhhUXpBPSAx.3AG33VCsv54AsUl5fGHehSytWPuWLJxf0t8VL3YXuJh="
}
```
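Pagination can be driven entirely by the `next` token; a sketch under the limits above (field names follow the sample response):

```rust
use anyhow::Result;
use serde_json::{json, Value};

async fn search_all(query: &str) -> Result<Vec<Value>> {
    let client = reqwest::Client::new();
    let mut start: Option<String> = None;
    let mut results = Vec::new();
    loop {
        let mut body = json!({ "query": query, "exchCode": "US" });
        if let Some(token) = &start {
            body["start"] = json!(token);
        }
        let resp: Value = client
            .post("https://api.openfigi.com/v3/search")
            .json(&body)
            .send()
            .await?
            .json()
            .await?;
        if let Some(data) = resp["data"].as_array() {
            results.extend(data.iter().cloned());
        }
        // Stop when no `next` token is returned (or when the page caps are hit).
        match resp["next"].as_str() {
            Some(next) => start = Some(next.to_string()),
            None => break,
        }
    }
    Ok(results)
}
```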
## Filter Endpoint
### API Guidelines
- **Endpoint**: `POST /v3/filter`
- **Purpose**: Filter-based search for FIGIs (no keywords required); results sorted alphabetically by FIGI, includes total count.
- **Request Format**: JSON object with optional `query` and filters (same as Search/Mapping). Use `start` for pagination.
- **Key Parameters**: Identical to Search (`query`, `start`, and all Mapping filters).
- **Limits**: Same as Search (15,000 max results, 100/page, 150 pages).
- **Rate Limits**: Same as Search (5/min without key, 20/min with key).
- **Pagination**: Same as Search; response includes `total` count.
- **Authentication**: Same as Mapping.
### Sample Request
```json
{
"exchCode": "US"
}
```
### Sample Response
```json
{
"data": [
{
"figi": "BBG000BLNNH6",
"name": "INTL BUSINESS MACHINES CORP",
"ticker": "IBM",
"exchCode": "US",
"compositeFIGI": "BBG000BLNNH6",
"securityType": "Common Stock",
"marketSector": "Equity",
"shareClassFIGI": "BBG001S5S399",
"securityType2": "Common Stock",
"securityDescription": "IBM"
}
],
"next": "QW9JSVFEOFMrQ3hDUWtjd01ERTRTMHhhUXpBPSAx.3AG33VCsv54AsUl5fGHehSytWPuWLJxf0t8VL3YXuJh=",
"total": 29930312
}
```

src/corporate/mod.rs

@@ -8,5 +8,4 @@ pub mod aggregation;
pub mod fx;
pub mod openfigi;
pub use types::*;
pub use update::run_full_update;

File diff suppressed because it is too large.


@@ -1,28 +1,39 @@
// src/corporate/scraper.rs
use super::{types::*, helpers::*};
use csv::ReaderBuilder;
use super::{types::*, helpers::*, openfigi::*};
//use crate::corporate::openfigi::OpenFigiClient;
use crate::{scraper::webdriver::*};
use fantoccini::{Client, Locator};
use scraper::{Html, Selector};
use chrono::{DateTime, Duration, NaiveDate, Timelike, Utc};
use chrono::{DateTime, Duration, NaiveDate, Utc};
use tokio::{time::{Duration as TokioDuration, sleep}};
use reqwest::Client as HttpClient;
use serde_json::Value;
use serde_json::{json, Value};
use zip::ZipArchive;
use std::fs::File;
use std::{collections::HashMap};
use std::io::{Read, BufReader};
use std::{collections::HashMap, sync::Arc};
use std::io::{Read};
use anyhow::{anyhow, Result};
const USER_AGENT: &str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36";
/// Discover all exchanges where this ISIN trades by querying Yahoo Finance
pub async fn discover_available_exchanges(isin: &str, known_ticker: &str) -> anyhow::Result<Vec<TickerInfo>> {
/// Discover all exchanges where this ISIN trades by querying Yahoo Finance and enriching with OpenFIGI API calls.
///
/// # Arguments
/// * `isin` - The ISIN to search for.
/// * `known_ticker` - A known ticker symbol for fallback or initial check.
///
/// # Returns
/// A vector of FigiInfo structs containing enriched data from API calls.
///
/// # Errors
/// Returns an error if HTTP requests fail, JSON parsing fails, or OpenFIGI API responds with an error.
pub async fn discover_available_exchanges(isin: &str, known_ticker: &str) -> anyhow::Result<Vec<FigiInfo>> {
println!(" Discovering exchanges for ISIN {}", isin);
let mut discovered_tickers = Vec::new();
let mut potential: Vec<(String, PrimaryInfo)> = Vec::new();
// Try the primary ticker first
if let Ok(info) = check_ticker_exists(known_ticker).await {
discovered_tickers.push(info);
potential.push((known_ticker.to_string(), info));
}
// Search for ISIN directly on Yahoo to find other listings
@@ -31,14 +42,14 @@ pub async fn discover_available_exchanges(isin: &str, known_ticker: &str) -> any
isin
);
match HttpClient::new()
let resp = HttpClient::new()
.get(&search_url)
.header("User-Agent", USER_AGENT)
.send()
.await
{
Ok(resp) => {
if let Ok(json) = resp.json::<Value>().await {
.await?;
let json = resp.json::<Value>().await?;
if let Some(quotes) = json["quotes"].as_array() {
for quote in quotes {
// First: filter by quoteType directly from search results (faster rejection)
@@ -49,131 +60,249 @@ pub async fn discover_available_exchanges(isin: &str, known_ticker: &str) -> any
if let Some(symbol) = quote["symbol"].as_str() {
// Avoid duplicates
if discovered_tickers.iter().any(|t: &TickerInfo| t.ticker == symbol) {
if potential.iter().any(|(s, _)| s == symbol) {
continue;
}
// Double-check with full quote data (some search results are misleading)
match check_ticker_exists(symbol).await {
Ok(info) => {
println!(" Found equity listing: {} on {} ({})",
symbol, info.exchange_mic, info.currency);
discovered_tickers.push(info);
if let Ok(info) = check_ticker_exists(symbol).await {
potential.push((symbol.to_string(), info));
}
Err(e) => {
// Most common: it's not actually equity or not tradable
// println!(" Rejected {}: {}", symbol, e);
continue;
}
}
// Be respectful to Yahoo
sleep(TokioDuration::from_millis(120)).await;
}
}
}
}
}
Err(e) => println!(" Search API error: {}", e),
}
// Also try common exchange suffixes for the base ticker
if let Some(base) = known_ticker.split('.').next() {
let suffixes = vec![
"", // US
".L", // London
".DE", // Frankfurt/XETRA
".PA", // Paris
".AS", // Amsterdam
".MI", // Milan
".SW", // Switzerland
".T", // Tokyo
".HK", // Hong Kong
".SS", // Shanghai
".SZ", // Shenzhen
".TO", // Toronto
".AX", // Australia
".SA", // Brazil
".MC", // Madrid
".BO", // Bombay
".NS", // National Stock Exchange India
];
for suffix in suffixes {
let test_ticker = format!("{}{}", base, suffix);
// Skip if already found
if discovered_tickers.iter().any(|t| t.ticker == test_ticker) {
continue;
}
if let Ok(info) = check_ticker_exists(&test_ticker).await {
discovered_tickers.push(info);
sleep(TokioDuration::from_millis(100)).await;
}
}
}
println!(" Found {} tradable exchanges", discovered_tickers.len());
Ok(discovered_tickers)
}
if potential.is_empty() {
return Ok(vec![]);
}
/// Check if a ticker exists and get its exchange/currency info
async fn check_ticker_exists(ticker: &str) -> anyhow::Result<TickerInfo> {
let url = format!(
"https://query1.finance.yahoo.com/v10/finance/quoteSummary/{}?modules=price",
ticker
);
// Enrich with OpenFIGI API
let client = OpenFigiClient::new()?;
let resp = HttpClient::new()
.get(&url)
.header("User-Agent", USER_AGENT)
let mut discovered_figis = Vec::new();
if !client.has_key() {
// Fallback without API key - create FigiInfo with default/empty fields
for (symbol, info) in potential {
println!(" Found equity listing: {} on {} ({}) - no FIGI (fallback mode)", symbol, info.exchange_mic, info.currency);
let figi_info = FigiInfo {
isin: info.isin,
figi: String::new(),
name: info.name,
ticker: symbol,
mic_code: info.exchange_mic,
currency: info.currency,
compositeFIGI: String::new(),
securityType: String::new(),
marketSector: String::new(),
shareClassFIGI: String::new(),
securityType2: String::new(),
securityDescription: String::new(),
};
discovered_figis.push(figi_info);
}
return Ok(discovered_figis);
}
// With API key, batch the mapping requests
let chunk_size = 100;
for chunk in potential.chunks(chunk_size) {
let mut jobs = vec![];
for (symbol, info) in chunk {
jobs.push(json!({
"idType": "TICKER",
"idValue": symbol,
"micCode": info.exchange_mic,
"marketSecDes": "Equity",
}));
}
let resp = client.get_figi_client()
.post("https://api.openfigi.com/v3/mapping")
.header("Content-Type", "application/json")
.json(&jobs)
.send()
.await?;
let json: Value = resp.json().await?;
if let Some(result) = json["quoteSummary"]["result"].as_array() {
if result.is_empty() {
return Err(anyhow::anyhow!("No quote data for {}", ticker));
if !resp.status().is_success() {
return Err(anyhow::anyhow!("OpenFIGI mapping failed with status: {}", resp.status()));
}
let quote = &result[0]["price"];
let parsed: Vec<Value> = resp.json().await?;
// CRITICAL: Only accept EQUITY securities
for (i, item) in parsed.iter().enumerate() {
let (symbol, info) = &chunk[i];
if let Some(data) = item["data"].as_array() {
if let Some(entry) = data.first() {
let market_sec = entry["marketSector"].as_str().unwrap_or("");
if market_sec != "Equity" {
continue;
}
println!(" Found equity listing: {} on {} ({}) - FIGI: {}", symbol, info.exchange_mic, info.currency, entry["figi"]);
let figi_info = FigiInfo {
isin: info.isin.clone(),
figi: entry["figi"].as_str().unwrap_or("").to_string(),
name: entry["name"].as_str().unwrap_or(&info.name).to_string(),
ticker: symbol.clone(),
mic_code: info.exchange_mic.clone(),
currency: info.currency.clone(),
compositeFIGI: entry["compositeFIGI"].as_str().unwrap_or("").to_string(),
securityType: entry["securityType"].as_str().unwrap_or("").to_string(),
marketSector: market_sec.to_string(),
shareClassFIGI: entry["shareClassFIGI"].as_str().unwrap_or("").to_string(),
securityType2: entry["securityType2"].as_str().unwrap_or("").to_string(),
securityDescription: entry["securityDescription"].as_str().unwrap_or("").to_string(),
};
discovered_figis.push(figi_info);
} else {
println!(" No data returned for ticker {} on MIC {}", symbol, info.exchange_mic);
}
} else if let Some(error) = item["error"].as_str() {
println!(" OpenFIGI error for ticker {}: {}", symbol, error);
}
}
// Stay well under the keyed rate limit (25 requests per 6 seconds)
sleep(TokioDuration::from_secs(6)).await;
}
Ok(discovered_figis)
}
/// Check if a ticker exists on Yahoo Finance and return core metadata.
///
/// This function calls the public Yahoo Finance quoteSummary endpoint and extracts:
/// - ISIN (when available)
/// - Company name
/// - Exchange MIC code
/// - Trading currency
///
/// It strictly filters to only accept **equity** securities.
///
/// # Arguments
/// * `ticker` - The ticker symbol to validate (e.g., "AAPL", "7203.T", "BMW.DE")
///
/// # Returns
/// `Ok(PrimaryInfo)` on success, `Err` if ticker doesn't exist, is not equity, or data is malformed.
///
/// # Errors
/// - Ticker not found
/// - Not an equity (ETF, bond, etc.)
/// - Missing critical fields
/// - Network or JSON parsing errors
pub async fn check_ticker_exists(ticker: &str) -> anyhow::Result<PrimaryInfo> {
let url = format!(
"https://query1.finance.yahoo.com/v10/finance/quoteSummary/{}?modules=price%2CassetProfile",
ticker
);
let resp = match HttpClient::new()
.get(&url)
.header("User-Agent", USER_AGENT)
.send()
.await
{
Ok(resp) => resp,
Err(err) => {
return Err(anyhow::anyhow!(
"Failed to reach Yahoo Finance for ticker {}: {}",
ticker,
err
));
}
};
if !resp.status().is_success() {
return Err(anyhow::anyhow!("Yahoo returned HTTP {} for ticker {}", resp.status(), ticker));
}
let json: Value = match resp
.json()
.await {
Ok(resp) => resp,
Err(err) => {
return Err(anyhow::anyhow!(
"Failed to parse JSON response from Yahoo Finance {}: {}",
ticker,
err
));
}
};
let result_array = json["quoteSummary"]["result"]
.as_array()
.ok_or_else(|| anyhow::anyhow!("Missing 'quoteSummary.result' in response"))?;
if result_array.is_empty() || result_array[0].is_null() {
return Err(anyhow::anyhow!("No quote data returned for ticker {}", ticker));
}
let quote = &result_array[0]["price"];
let profile = &result_array[0]["assetProfile"];
// === 1. Must be EQUITY ===
let quote_type = quote["quoteType"]
.as_str()
.unwrap_or("")
.to_uppercase();
.to_ascii_uppercase();
if quote_type != "EQUITY" {
// Optional: debug what was filtered
println!(" → Skipping {} (quoteType: {})", ticker, quote_type);
return Err(anyhow::anyhow!("Not an equity: {}", quote_type));
return Err(anyhow::anyhow!("Not an equity security: {}", quote_type));
}
let exchange = quote["exchange"].as_str().unwrap_or("");
let currency = quote["currency"].as_str().unwrap_or("USD");
let short_name = quote["shortName"].as_str().unwrap_or("");
// === 2. Extract basic info ===
let long_name = quote["longName"]
.as_str()
.or_else(|| quote["shortName"].as_str())
.unwrap_or(ticker)
.trim()
.to_string();
// Optional: extra sanity — make sure it's not a bond masquerading as equity
if short_name.to_uppercase().contains("BOND") ||
short_name.to_uppercase().contains("NOTE") ||
short_name.to_uppercase().contains("DEBENTURE") {
return Err(anyhow::anyhow!("Name suggests debt security"));
let currency = quote["currency"]
.as_str()
.unwrap_or("USD")
.to_string();
let exchange_mic = quote["exchange"]
.as_str()
.unwrap_or("")
.to_string();
if exchange_mic.is_empty() {
return Err(anyhow::anyhow!("Missing exchange MIC for ticker {}", ticker));
}
if !exchange.is_empty() {
return Ok(TickerInfo {
ticker: ticker.to_string(),
exchange_mic: exchange.to_string(),
currency: currency.to_string(),
primary: false,
});
}
// === 3. Extract ISIN (from assetProfile if available) ===
let isin = profile["isin"]
.as_str()
.and_then(|s| if s.len() == 12 && s.chars().all(|c| c.is_ascii_alphanumeric()) { Some(s) } else { None })
.unwrap_or("")
.to_ascii_uppercase();
// === 4. Final sanity check: reject obvious debt securities ===
let name_upper = long_name.to_ascii_uppercase();
if name_upper.contains(" BOND") ||
name_upper.contains(" NOTE") ||
name_upper.contains(" DEBENTURE") ||
name_upper.contains(" PREFERRED") && !name_upper.contains(" STOCK") {
return Err(anyhow::anyhow!("Security name suggests debt instrument: {}", long_name));
}
Err(anyhow::anyhow!("Invalid or missing data for {}", ticker))
println!(
" → Valid equity: {} | {} | {} | ISIN: {}",
ticker,
long_name,
exchange_mic,
if isin.is_empty() { "N/A" } else { &isin }
);
Ok(PrimaryInfo {
isin,
name: long_name,
exchange_mic,
currency,
})
}
/// Convert Yahoo's exchange name to MIC code (best effort)
@@ -203,84 +332,150 @@ fn exchange_name_to_mic(name: &str) -> String {
}.to_string()
}
pub async fn dismiss_yahoo_consent(client: &Client) -> anyhow::Result<()> {
let script = r#"
(() => {
const agree = document.querySelector('button[name="agree"]');
if (agree) {
agree.click();
return true;
}
return false;
})()
"#;
/// Fetches earnings events for a ticker using a dedicated ScrapeTask.
///
/// This function creates and executes a ScrapeTask to navigate to the Yahoo Finance earnings calendar,
/// reject cookies, and extract the events.
///
/// # Arguments
/// * `ticker` - The stock ticker symbol.
///
/// # Returns
/// A vector of CompanyEvent structs on success.
///
/// # Errors
/// Returns an error if the task execution fails, e.g., chromedriver spawn or navigation issues.
pub async fn fetch_earnings_with_pool(
ticker: &str,
pool: &Arc<ChromeDriverPool>,
) -> anyhow::Result<Vec<CompanyEvent>> {
let ticker = ticker.to_string();
let url = format!("https://finance.yahoo.com/calendar/earnings?symbol={}", ticker);
for _ in 0..10 {
let done: bool = client.execute(script, vec![]).await?.as_bool().unwrap_or(false);
if done {
break;
}
sleep(TokioDuration::from_millis(500)).await;
}
Ok(())
let ticker_cloned = ticker.clone();
pool.execute(url, move |client| {
let ticker = ticker_cloned.clone();
Box::pin(async move {
reject_yahoo_cookies(&client).await?;
extract_earnings_events(&client, &ticker).await
})
}).await
}
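A short usage sketch from an async context (assumes a pool already constructed as in `main`; the `demo` wrapper is illustrative):

```rust
use std::sync::Arc;
use crate::scraper::webdriver::ChromeDriverPool;

async fn demo(pool: &Arc<ChromeDriverPool>) -> anyhow::Result<()> {
    // One pooled browser session per call; the pool caps overall concurrency.
    let events = fetch_earnings_with_pool("AAPL", pool).await?;
    println!("fetched {} earnings events for AAPL", events.len());
    Ok(())
}
```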
pub async fn fetch_earnings_history(client: &Client, ticker: &str) -> anyhow::Result<Vec<CompanyEvent>> {
let url = format!("https://finance.yahoo.com/calendar/earnings?symbol={}&offset=0&size=100", ticker);
client.goto(&url).await?;
dismiss_yahoo_consent(client).await?;
/// Extracts earnings events from the currently loaded Yahoo Finance earnings calendar page.
///
/// This function assumes the client is already navigated to the correct URL (e.g.,
/// https://finance.yahoo.com/calendar/earnings?symbol={ticker}) and cookies are handled.
///
/// It waits for the earnings table, extracts rows, parses cells into CompanyEvent structs,
/// and handles date parsing, float parsing, and optional fields.
///
/// # Arguments
/// * `client` - The fantoccini Client with the page loaded.
/// * `ticker` - The stock ticker symbol for the events.
///
/// # Returns
/// A vector of CompanyEvent on success.
///
/// # Errors
/// Returns an error if:
/// - Table or elements not found.
/// - Date or float parsing fails.
/// - WebDriver operations fail.
///
/// # Examples
///
/// ```no_run
/// use fantoccini::ClientBuilder;
/// use anyhow::Result;
/// use crate::corporate::scraper::extract_earnings_events;
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
///     // Assume chromedriver is running and the page is already navigated
///     let client = ClientBuilder::native().connect("http://localhost:9515").await?;
///     let events = extract_earnings_events(&client, "AAPL").await?;
///     Ok(())
/// }
/// ```
pub async fn extract_earnings_events(client: &Client, ticker: &str) -> Result<Vec<CompanyEvent>> {
// Wait for the table to load
let table = client
.wait()
.for_element(Locator::Css(r#"table[data-test="cal-table"]"#))
.await
.map_err(|e| anyhow!("Failed to find earnings table: {}", e))?;
loop {
match client.find(Locator::XPath(r#"//button[contains(text(), 'Show More')]"#)).await {
Ok(btn) => {
btn.click().await?;
sleep(TokioDuration::from_secs(2)).await;
}
Err(_) => break,
}
// Find all rows in tbody
let rows = table
.find_all(Locator::Css("tbody tr"))
.await
.map_err(|e| anyhow!("Failed to find table rows: {}", e))?;
let mut events = Vec::with_capacity(rows.len());
for row in rows {
let cells = row
.find_all(Locator::Css("td"))
.await
.map_err(|e| anyhow!("Failed to find cells in row: {}", e))?;
if cells.len() < 5 {
continue; // Skip incomplete rows
}
let html = client.source().await?;
let document = Html::parse_document(&html);
let row_sel = Selector::parse("table tbody tr").unwrap();
let mut events = Vec::new();
// Extract and parse date
let date_str = cells[0]
.text()
.await
.map_err(|e| anyhow!("Failed to get date text: {}", e))?;
let date = parse_yahoo_date(&date_str)
.map_err(|e| anyhow!("Failed to parse date '{}': {}", date_str, e))?
.format("%Y-%m-%d")
.to_string();
for row in document.select(&row_sel) {
let cols: Vec<String> = row.select(&Selector::parse("td").unwrap())
.map(|td| td.text().collect::<Vec<_>>().join(" ").trim().to_string())
.collect();
if cols.len() < 6 { continue; }
// Extract time, replace "Time Not Supplied" with empty
let time = cells[1]
.text()
.await
.map_err(|e| anyhow!("Failed to get time text: {}", e))?
.replace("Time Not Supplied", "");
let full_date = &cols[2];
let parts: Vec<&str> = full_date.split(" at ").collect();
let raw_date = parts[0].trim();
let time_str = if parts.len() > 1 { parts[1].trim() } else { "" };
// Extract period
let period = cells[2]
.text()
.await
.map_err(|e| anyhow!("Failed to get period text: {}", e))?;
let date = match parse_yahoo_date(raw_date) {
Ok(d) => d,
Err(_) => continue,
};
// Parse EPS forecast
let eps_forecast_str = cells[3]
.text()
.await
.map_err(|e| anyhow!("Failed to get EPS forecast text: {}", e))?;
let eps_forecast = parse_float(&eps_forecast_str);
let eps_forecast = parse_float(&cols[3]);
let eps_actual = if cols[4] == "-" { None } else { parse_float(&cols[4]) };
// Parse EPS actual
let eps_actual_str = cells[4]
.text()
.await
.map_err(|e| anyhow!("Failed to get EPS actual text: {}", e))?;
let eps_actual = parse_float(&eps_actual_str);
let surprise_pct = if let (Some(f), Some(a)) = (eps_forecast, eps_actual) {
if f.abs() > 0.001 { Some((a - f) / f.abs() * 100.0) } else { None }
} else { None };
let time = if time_str.contains("PM") {
"AMC".to_string()
} else if time_str.contains("AM") {
"BMO".to_string()
// Parse surprise % if available
let surprise_pct = if cells.len() > 5 {
let surprise_str = cells[5]
.text()
.await
.map_err(|e| anyhow!("Failed to get surprise text: {}", e))?;
parse_float(&surprise_str)
} else {
"".to_string()
None
};
events.push(CompanyEvent {
ticker: ticker.to_string(),
date: date.format("%Y-%m-%d").to_string(),
date,
time,
period: "".to_string(),
period,
eps_forecast,
eps_actual,
revenue_forecast: None,
@@ -290,6 +485,12 @@ pub async fn fetch_earnings_history(client: &Client, ticker: &str) -> anyhow::Re
});
}
if events.is_empty() {
eprintln!("Warning: No earnings events extracted for ticker {}", ticker);
} else {
println!("Extracted {} earnings events for {}", events.len(), ticker);
}
Ok(events)
}
@@ -469,8 +670,8 @@ pub async fn _fetch_latest_gleif_isin_lei_mapping_url(client: &Client) -> anyhow
pub async fn download_isin_lei_csv() -> anyhow::Result<Option<String>> {
let url = "https://mapping.gleif.org/api/v2/isin-lei/9315e3e3-305a-4e71-b062-46714740fa8d/download";
let zip_path = "data/isin_lei.zip";
let csv_path = "data/isin_lei.csv";
let zip_path = "data/gleif/isin_lei.zip";
let csv_path = "data/gleif/isin_lei.csv";
if let Err(e) = std::fs::create_dir_all("data") {
println!("Failed to create data directory: {e}");
@@ -613,57 +814,6 @@ pub async fn load_isin_lei_csv() -> anyhow::Result<HashMap<String, Vec<String>>>
Ok(map)
}
pub async fn get_primary_isin_and_name(
client: &Client, // Pass your existing Selenium client
ticker: &str,
) -> anyhow::Result<PrimaryInfo> {
// Navigate to the actual quote page (always works)
let quote_url = format!("https://finance.yahoo.com/quote/{}", ticker);
client.goto(&quote_url).await?;
// Dismiss overlays/banners (your function + guce-specific)
reject_yahoo_cookies(client).await?;
// Wait for page to load (key data elements)
sleep(TokioDuration::from_millis(2000)).await;
// Get page HTML and parse
let html = client.source().await?;
let document = Html::parse_document(&html);
// Selectors for key fields (tested on real Yahoo pages Nov 2025)
let name_sel = Selector::parse("h1[data-testid='qsp-price-header']").unwrap_or_else(|_| Selector::parse("h1").unwrap());
let isin_sel = Selector::parse("[data-testid='qsp-symbol'] + div [data-field='isin']").unwrap_or_else(|_| Selector::parse("[data-field='isin']").unwrap());
let exchange_sel = Selector::parse("[data-testid='qsp-market'] span").unwrap_or_else(|_| Selector::parse(".TopNav__Exchange").unwrap());
let currency_sel = Selector::parse("[data-testid='qsp-price'] span:contains('USD')").unwrap_or_else(|_| Selector::parse(".TopNav__Currency").unwrap()); // Adjust for dynamic
let name_elem = document.select(&name_sel).next().map(|e| e.text().collect::<String>().trim().to_string());
let isin_elem = document.select(&isin_sel).next().map(|e| e.text().collect::<String>().trim().to_uppercase());
let exchange_elem = document.select(&exchange_sel).next().map(|e| e.text().collect::<String>().trim().to_string());
let currency_elem = document.select(&currency_sel).next().map(|e| e.text().collect::<String>().trim().to_string());
let name = name_elem.unwrap_or_else(|| ticker.to_string());
let isin = isin_elem.unwrap_or_default();
let exchange_mic = exchange_elem.unwrap_or_default();
let currency = currency_elem.unwrap_or_else(|| "USD".to_string());
// Validate ISIN
let valid_isin = if isin.len() == 12 && isin.chars().all(|c| c.is_alphanumeric()) {
isin
} else {
"".to_string()
};
println!(" → Scraped {}: {} | ISIN: {} | Exchange: {}", ticker, name, valid_isin, exchange_mic);
Ok(PrimaryInfo {
isin: valid_isin,
name,
exchange_mic,
currency,
})
}
pub async fn reject_yahoo_cookies(client: &Client) -> anyhow::Result<()> {
for _ in 0..10 {
let clicked: bool = client


@@ -1,5 +1,5 @@
// src/corporate/storage.rs
use super::{types::*, helpers::*, scraper::get_primary_isin_and_name};
use super::{types::*, helpers::*};
use crate::config;
use tokio::fs;
@@ -102,17 +102,6 @@ pub async fn save_prices_for_ticker(ticker: &str, timeframe: &str, mut prices: V
Ok(())
}
pub async fn _load_companies() -> Result<Vec<CompanyMetadata>, anyhow::Error> {
let path = Path::new("src/data/companies.json");
if !path.exists() {
println!("Missing companies.json file at src/data/companies.json");
return Ok(vec![]);
}
let content = fs::read_to_string(path).await?;
let companies: Vec<CompanyMetadata> = serde_json::from_str(&content)?;
Ok(companies)
}
pub fn get_company_dir(lei: &str) -> PathBuf {
PathBuf::from("corporate_prices").join(lei)
}
@@ -132,20 +121,6 @@ pub async fn ensure_company_dirs(isin: &str) -> anyhow::Result<()> {
Ok(())
}
pub async fn save_company_metadata(company: &CompanyMetadata) -> anyhow::Result<()> {
let dir = get_company_dir(&company.lei);
fs::create_dir_all(&dir).await?;
let path = dir.join("metadata.json");
fs::write(&path, serde_json::to_string_pretty(company)?).await?;
Ok(())
}
pub async fn load_company_metadata(lei: &str) -> anyhow::Result<CompanyMetadata> {
let path = get_company_dir(lei).join("metadata.json");
let content = fs::read_to_string(path).await?;
Ok(serde_json::from_str(&content)?)
}
pub async fn save_available_exchanges(isin: &str, exchanges: Vec<AvailableExchange>) -> anyhow::Result<()> {
let dir = get_company_dir(isin);
fs::create_dir_all(&dir).await?;
@@ -210,18 +185,28 @@ pub async fn update_available_exchange(
}
/// Add a newly discovered exchange before fetching
///
/// # Arguments
/// * `isin` - The ISIN associated with the exchange.
/// * `figi_info` - The FigiInfo containing ticker, mic_code, and currency.
///
/// # Returns
/// Ok(()) on success.
///
/// # Errors
/// Returns an error if loading or saving available exchanges fails.
pub async fn add_discovered_exchange(
isin: &str,
ticker_info: &TickerInfo,
figi_info: &FigiInfo,
) -> anyhow::Result<()> {
let mut exchanges = load_available_exchanges(isin).await?;
// Only add if not already present
if !exchanges.iter().any(|e| e.ticker == ticker_info.ticker) {
if !exchanges.iter().any(|e| e.ticker == figi_info.ticker && e.exchange_mic == figi_info.mic_code) {
let new_entry = AvailableExchange::new(
ticker_info.ticker.clone(),
ticker_info.exchange_mic.clone(),
ticker_info.currency.clone(),
figi_info.ticker.clone(),
figi_info.mic_code.clone(),
figi_info.currency.clone(),
);
exchanges.push(new_entry);
save_available_exchanges(isin, exchanges).await?;


@@ -1,3 +1,5 @@
use std::collections::HashMap;
// src/corporate/types.rs
use serde::{Deserialize, Serialize};
@@ -39,21 +41,78 @@ pub struct CompanyEventChange {
pub detected_at: String,
}
/// Figi Info based on API calls [https://www.openfigi.com/]
/// # Attributes
/// isin: ISIN belonging to this legal entity (taken from the LEI mapping)
///
/// # Comments
/// Populated by mapping the OpenFIGI response object list onto these FIGI properties
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TickerInfo {
pub struct FigiInfo {
pub isin: String,
pub figi: String,
pub name: String,
pub ticker: String,
pub exchange_mic: String,
pub mic_code: String,
pub currency: String,
pub isin: String, // ISIN belonging to this legal entity (primary + ADR + GDR)
pub compositeFIGI: String,
pub securityType: String,
pub marketSector: String,
pub shareClassFIGI: String,
pub securityType2: String,
pub securityDescription: String,
}
/// Company Meta Data
/// # Attributes
/// * lei: Structuring the companies by legal dependencies [LEI -> Vec<ISIN>]
/// * figi: metadata with ISIN as key
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompanyMetadata {
pub lei: String,
pub figi: Option<String>,
pub figi: Option<Vec<FigiInfo>>,
}
/// Company Info
/// # Attributes
/// * Name as primary key (for one institution); may need to change once the first FigiInfo arrives
/// * ISIN as the most liquid / preferred traded security (used for fallback)
/// * securities: Grouped by ISIN, filtered for Common Stock only
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompanyInfo{
pub name: String,
pub primary_isin: String, // The most liquid / preferred one (used for folder fallback)
pub tickers: Vec<TickerInfo>,
pub primary_isin: String,
pub securities: HashMap<String, Vec<FigiInfo>>, // ISIN -> Vec<FigiInfo>
}
/// Warrant Info
///
/// Information for warrant securities parsed out of the `name` field in FigiInfo.
/// Example 1: "VONTOBE-PW26 LEONARDO SPA" is a put warrant issued by VONTOBEL on the underlying company LEONARDO SPA.
/// Example 2: "BAYER H-CW25 L'OREAL" is a call warrant issued by BAYER on the underlying company L'OREAL.
/// For formats that name only one company, the underlying and issuing company are the same; leave issuer_company_name as None.
/// (See the parsing sketch after this struct.)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WarrantInfo {
pub underlying_company_name: String, // key in CompanyInfo, key for WarrantInfo
pub issuer_company_name: Option<String>, // key in CompanyInfo
pub warrant_type: String, // "put" or "call"
pub warrants: HashMap<String, Vec<FigiInfo>>, // ISIN -> Vec<FigiInfo> (grouped by ISIN)
}
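A hedged sketch of the name parsing this implies; the "-PW"/"-CW" markers are inferred from the two examples above, not a confirmed format:

```rust
/// Parse names like "VONTOBE-PW26 LEONARDO SPA" into (issuer, type, underlying).
/// Returns None when the name does not match the issuer-prefixed warrant format.
fn parse_warrant_name(name: &str) -> Option<(String, String, String)> {
    // The "-PW"/"-CW" marker separates the issuer from expiry + underlying,
    // e.g. "VONTOBE-PW26 LEONARDO SPA", "BAYER H-CW25 L'OREAL".
    let (marker, warrant_type) = if name.contains("-PW") {
        ("-PW", "put")
    } else if name.contains("-CW") {
        ("-CW", "call")
    } else {
        return None;
    };
    let (issuer, rest) = name.split_once(marker)?;
    // `rest` starts with the expiry digits, then a space, then the underlying.
    let underlying = rest.split_once(' ')?.1;
    Some((issuer.to_string(), warrant_type.to_string(), underlying.to_string()))
}
```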
/// Option Info
///
/// Information for option securities parsed out of the `name` field in FigiInfo.
/// Example: "December 25 Calls on ALPHA GA" is a call option with no named issuer on the underlying company ALPHA GA.
/// For formats that name only one company, the underlying and issuing company are the same; leave issuer_company_name as None.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptionInfo {
pub underlying_company_name: String, // key in CompanyInfo, key for OptionInfo
pub issuer_company_name: Option<String>, // key in CompanyInfo
pub option_type: String, // "put" or "call"
pub options: HashMap<String, Vec<FigiInfo>>, // ISIN -> Vec<FigiInfo> (grouped by ISIN)
}
#[derive(Debug, Clone, Serialize, Deserialize)]


@@ -1,187 +1,100 @@
// src/corporate/update.rs
use super::{scraper::*, storage::*, helpers::*, types::*, aggregation::*, openfigi::*};
use crate::config::Config;
use crate::scraper::webdriver::ChromeDriverPool;
use chrono::Local;
use std::collections::HashMap;
use std::collections::{HashMap};
use std::sync::Arc;
pub async fn run_full_update(client: &fantoccini::Client, config: &Config) -> anyhow::Result<()> {
println!("Starting LEI-based corporate update");
/// Main function: Full update for all companies (LEI-based) with optimized parallel execution.
///
/// This function coordinates the entire update process:
/// - Loads GLEIF mappings
/// - Builds FIGI-LEI map
/// - Loads existing events
/// - Processes each company: discovers exchanges via FIGI, fetches prices & earnings, aggregates data
/// - Uses the provided shared ChromeDriver pool for efficient parallel scraping
/// - Saves optimized events
///
/// # Arguments
/// * `config` - The application configuration.
/// * `pool` - Shared pool of ChromeDriver instances for scraping.
///
/// # Errors
/// Returns an error if any step in the update process fails.
pub async fn run_full_update(config: &Config, pool: &Arc<ChromeDriverPool>) -> anyhow::Result<()> {
println!("=== Starting LEI-based corporate full update ===");
// 1. Download fresh GLEIF ISIN↔LEI mapping on every run
// 1. Load fresh GLEIF ISIN↔LEI mapping
let lei_to_isins: HashMap<String, Vec<String>> = match load_isin_lei_csv().await {
Ok(map) => map,
Err(e) => {
println!("Warning: Failed to load ISIN↔LEI mapping: {}", e);
eprintln!("Warning: Could not load GLEIF ISIN↔LEI mapping: {}", e);
HashMap::new()
}
};
let figi_to_lei: HashMap<String, String> = match build_figi_to_lei_map(&lei_to_isins).await {
// 2. Load OpenFIGI mapping value lists (cached)
if let Err(e) = load_figi_type_lists().await {
eprintln!("Warning: Could not load OpenFIGI type lists: {}", e);
}
// 3. Build FIGI → LEI map
// # Attributes
// * lei: Structuring the companies by legal dependencies [LEI -> Vec<ISIN>]
// * figi: metadata with ISIN as key
let figi_to_lei:HashMap<String, Vec<FigiInfo>> = match build_lei_to_figi_infos(&lei_to_isins).await {
Ok(map) => map,
Err(e) => {
println!("Warning: Failed to build FIGI→LEI map: {}", e);
eprintln!("Warning: Could not build FIGI→LEI map: {}", e);
HashMap::new()
}
};
let today = chrono::Local::now().format("%Y-%m-%d").to_string();
let mut existing_events = load_existing_events().await?;
// 4. Load or build companies
let mut companies = load_or_build_all_securities(&figi_to_lei).await?;
println!("Processing {} companies", companies.0.len());
let mut companies: Vec<CompanyMetadata> = match load_or_build_companies_figi(&lei_to_isins, &figi_to_lei).await {
Ok(comps) => comps,
// 5. Load existing earnings events (for change detection)
let today = Local::now().format("%Y-%m-%d").to_string();
let mut existing_events = match load_existing_events().await {
Ok(events) => events,
Err(e) => {
println!("Error loading/building company metadata: {}", e);
return Err(e);
eprintln!("Warning: Could not load existing events: {}", e);
HashMap::new()
}
}; // Vec<CompanyMetadata> with lei, isins, tickers
};
for mut company in companies {
println!("\nProcessing company: {} (LEI: {})", company.name, company.lei);
// 5. Use the provided pool (no need to create a new one)
let pool_size = pool.get_number_of_instances(); // Use the size from the shared pool
// === Enrich with ALL ISINs known to GLEIF (includes ADRs, GDRs, etc.) ===
if let Some(all_isins) = lei_to_isins.get(&company.lei) {
let mut seen = company.isins.iter().cloned().collect::<std::collections::HashSet<_>>();
for isin in all_isins {
if !seen.contains(isin) {
company.isins.push(isin.clone());
seen.insert(isin.clone());
}
}
// Process companies in parallel using the shared pool
/*let results: Vec<_> = stream::iter(companies.into_iter())
.map(|company| {
let pool_clone = pool.clone();
async move {
process_company_data(&company, &pool_clone, &mut existing_events).await
}
})
.buffer_unordered(pool_size)
.collect().await;
// Ensure company directory exists (now uses LEI)
//let figi_dir = format!("data/companies_by_figi/{}/", company.primary_figi);
ensure_company_dirs(&company.lei).await?;
save_company_metadata(&company).await?;
// Handle results (e.g., collect changes)
let mut all_changes = Vec::new();
for result in results {
if let Ok(ProcessResult { changes }) = result {
all_changes.extend(changes);
}
}*/
// === STEP 1: Discover additional exchanges using each known ISIN ===
let mut all_tickers = company.tickers.clone();
if let Some(primary_ticker) = company.tickers.iter().find(|t| t.primary) {
println!(" Discovering additional exchanges across {} ISIN(s)...", company.isins.len());
for isin in &company.isins {
println!(" → Checking ISIN: {}", isin);
match discover_available_exchanges(isin, &primary_ticker.ticker).await {
Ok(discovered) => {
if discovered.is_empty() {
println!(" No new exchanges found for {}", isin);
} else {
for disc in discovered {
if !all_tickers.iter().any(|t| t.ticker == disc.ticker && t.exchange_mic == disc.exchange_mic) {
println!(" New equity listing → {} ({}) via ISIN {}",
disc.ticker, disc.exchange_mic, isin);
all_tickers.push(disc);
}
}
}
}
Err(e) => println!(" Discovery failed for {}: {}", isin, e),
}
tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;
}
}
// Save updated metadata if we found new listings
if all_tickers.len() > company.tickers.len() {
company.tickers = all_tickers.clone();
save_company_metadata(&company).await?;
println!(" Updated metadata: {} total tickers", all_tickers.len());
}
// === STEP 2: Fetch data from ALL available tickers ===
for ticker_info in &all_tickers {
let ticker = &ticker_info.ticker;
println!(" → Fetching: {} ({})", ticker, ticker_info.exchange_mic);
let mut daily_success = false;
let mut intraday_success = false;
// Earnings: only fetch from primary ticker to avoid duplicates
if ticker_info.primary {
if let Ok(new_events) = fetch_earnings_history(client, ticker).await {
let result = process_batch(&new_events, &mut existing_events, &today);
save_changes(&result.changes).await?;
println!(" Earnings events: {}", new_events.len());
}
}
// Daily prices
if let Ok(prices) = fetch_daily_price_history(ticker, &config.corporate_start_date, &today).await {
if !prices.is_empty() {
save_prices_by_source(&company.lei, ticker, "daily", prices).await?;
daily_success = true;
}
}
// 5-minute intraday (last 60 days)
let sixty_days_ago = (chrono::Local::now() - chrono::Duration::days(60))
.format("%Y-%m-%d")
.to_string();
if let Ok(prices) = fetch_price_history_5min(ticker, &sixty_days_ago, &today).await {
if !prices.is_empty() {
save_prices_by_source(&company.lei, ticker, "5min", prices).await?;
intraday_success = true;
}
}
// Update available_exchanges.json (now under LEI folder)
update_available_exchange(
&company.lei,
ticker,
&ticker_info.exchange_mic,
daily_success,
intraday_success,
).await?;
tokio::time::sleep(tokio::time::Duration::from_millis(800)).await;
}
// === STEP 3: Aggregate all sources into unified USD prices ===
println!(" Aggregating multi-source price data (FX-adjusted)...");
if let Err(e) = aggregate_best_price_data(&company.lei).await {
println!(" Aggregation failed: {}", e);
} else {
println!(" Aggregation complete");
}
}
// Final save of optimized earnings events
save_optimized_events(existing_events).await?;
println!("\nCorporate update complete (LEI-based)");
//save_changes(&all_changes).await?;
//println!("Corporate update complete — {} changes detected", all_changes.len());
Ok(())
}
async fn enrich_companies_with_leis(
companies: &mut Vec<CompanyMetadata>,
lei_to_isins: &HashMap<String, Vec<String>>,
) {
for company in companies.iter_mut() {
if company.lei.is_empty() {
// Try to find LEI by any known ISIN
for isin in &company.isins {
for (lei, isins) in lei_to_isins {
if isins.contains(isin) {
company.lei = lei.clone();
println!("Found real LEI {} for {}", lei, company.name);
break;
}
}
if !company.lei.is_empty() { break; }
}
}
// Fallback: generate fake LEI if still missing
if company.lei.is_empty() {
company.lei = format!("FAKE{:019}", rand::random::<u64>());
println!("No real LEI found → using fake for {}", company.name);
}
}
}
pub struct ProcessResult {
pub changes: Vec<CompanyEventChange>,
}

src/data/companies.json (deleted)

@@ -1,58 +0,0 @@
[
{
"lei": "8I5D5ASD7N5Z5P2K9M3J",
"isins": ["US46625H1005"],
"primary_isin": "US46625H1005",
"name": "JPMorgan Chase & Co.",
"tickers": [
{ "ticker": "JPM", "exchange_mic": "XNYS", "currency": "USD", "primary": true },
{ "ticker": "JPM-PC", "exchange_mic": "XNYS", "currency": "USD", "primary": false }
]
},
{
"lei": "5493001KJTIIGC8Y1R12",
"isins": ["US5949181045"],
"primary_isin": "US5949181045",
"name": "Microsoft Corporation",
"tickers": [
{ "ticker": "MSFT", "exchange_mic": "XNAS", "currency": "USD", "primary": true }
]
},
{
"lei": "529900T8BM49AURSDO55",
"isins": ["CNE000001P37"],
"primary_isin": "CNE000001P37",
"name": "Industrial and Commercial Bank of China",
"tickers": [
{ "ticker": "601398.SS", "exchange_mic": "XSHG", "currency": "CNY", "primary": true },
{ "ticker": "1398.HK", "exchange_mic": "XHKG", "currency": "HKD", "primary": false }
]
},
{
"lei": "519900X5W8K6C1FZ3B57",
"isins": ["JP3702200000"],
"primary_isin": "JP3702200000",
"name": "Toyota Motor Corporation",
"tickers": [
{ "ticker": "7203.T", "exchange_mic": "XJPX", "currency": "JPY", "primary": true },
{ "ticker": "TM", "exchange_mic": "XNYS", "currency": "USD", "primary": false }
]
},
{
"lei": "529900T8BM49AURSDO56",
"isins": ["HK0000069689"],
"primary_isin": "HK0000069689",
"name": "Tencent Holdings Limited",
"tickers": [
{ "ticker": "0700.HK", "exchange_mic": "XHKG", "currency": "HKD", "primary": true },
{ "ticker": "TCEHY", "exchange_mic": "OTCM", "currency": "USD", "primary": false }
]
},
{
"lei": "8I5D5Q1L7N5Z5P2K9M3J",
"isins": ["US90953F1049"],
"primary_isin": "US90953F1049",
"name": "Test Bonds Filter",
"tickers": [{ "ticker": "JPM", "exchange_mic": "XNYS", "currency": "USD", "primary": true }]
}
]

continents.json (deleted)

@@ -1,9 +0,0 @@
[
"afrika",
"asien",
"europa",
"nordamerika",
"suedamerika",
"antarktis",
"ozeanien"
]

countries.json (deleted)

@@ -1,54 +0,0 @@
[
"aegypten",
"frankreich",
"litauen",
"schweiz",
"argentinien",
"griechenland",
"mexiko",
"singapur",
"australien",
"hongkong",
"neuseeland",
"slowakei",
"bahrain",
"indien",
"niederlande",
"spanien",
"belgien",
"indonesien",
"norwegen",
"suedafrika",
"brasilien",
"irland",
"oesterreich",
"suedkorea",
"chile",
"island",
"peru",
"taiwan",
"china",
"italien",
"philippinen",
"tschechien",
"daenemark",
"japan",
"polen",
"tuerkei",
"deutschland",
"kanada",
"portugal",
"ungarn",
"estland",
"katar",
"rumaenien",
"usa",
"eurozone",
"kolumbien",
"russland",
"vereinigte-arabische-emirate",
"finnland",
"lettland",
"schweden",
"vereinigtes-koenigreich"
]

exchanges.json (deleted)

@@ -1,260 +0,0 @@
{
"exchanges": [
{
"mic": "XNYS",
"name": "New York Stock Exchange",
"country": "United States",
"city": "New York City",
"market_cap_trillion_usd": 30.92,
"timezone": "America/New_York",
"tz_offset": "-05:00",
"dst": "MarNov",
"open_local": "09:30",
"close_local": "16:00",
"lunch_break": false,
"open_utc": "14:30",
"close_utc": "21:00",
"currency": "USD"
},
{
"mic": "XNAS",
"name": "Nasdaq",
"country": "United States",
"city": "New York City",
"market_cap_trillion_usd": 31.96,
"timezone": "America/New_York",
"tz_offset": "-05:00",
"dst": "MarNov",
"open_local": "09:30",
"close_local": "16:00",
"lunch_break": false,
"open_utc": "14:30",
"close_utc": "21:00",
"currency": "USD"
},
{
"mic": "XSHG",
"name": "Shanghai Stock Exchange",
"country": "China",
"city": "Shanghai",
"market_cap_trillion_usd": 7.96,
"timezone": "Asia/Shanghai",
"tz_offset": "+08:00",
"dst": null,
"open_local": "09:30",
"close_local": "15:00",
"lunch_break": "11:3013:00",
"open_utc": "01:30",
"close_utc": "07:00",
"currency": "CNY"
},
{
"mic": "XJPX",
"name": "Japan Exchange Group (Tokyo Stock Exchange)",
"country": "Japan",
"city": "Tokyo",
"market_cap_trillion_usd": 7.06,
"timezone": "Asia/Tokyo",
"tz_offset": "+09:00",
"dst": null,
"open_local": "09:00",
"close_local": "15:00",
"lunch_break": "11:3012:30",
"open_utc": "00:00",
"close_utc": "06:00",
"currency": "JPY"
},
{
"mic": "XHKG",
"name": "Hong Kong Stock Exchange",
"country": "Hong Kong",
"city": "Hong Kong",
"market_cap_trillion_usd": 6.41,
"timezone": "Asia/Hong_Kong",
"tz_offset": "+08:00",
"dst": null,
"open_local": "09:30",
"close_local": "16:00",
"lunch_break": "12:0013:00",
"open_utc": "01:30",
"close_utc": "08:00",
"currency": "HKD"
},
{
"mic": "XAMS",
"name": "Euronext Amsterdam",
"country": "Netherlands",
"city": "Amsterdam",
"market_cap_trillion_usd": 5.61,
"timezone": "Europe/Amsterdam",
"tz_offset": "+01:00",
"dst": "MarOct",
"open_local": "09:00",
"close_local": "17:30",
"lunch_break": false,
"open_utc": "08:00",
"close_utc": "16:30",
"currency": "EUR"
},
{
"mic": "XBSE",
"name": "Bombay Stock Exchange",
"country": "India",
"city": "Mumbai",
"market_cap_trillion_usd": 5.25,
"timezone": "Asia/Kolkata",
"tz_offset": "+05:30",
"dst": null,
"open_local": "09:15",
"close_local": "15:30",
"lunch_break": false,
"open_utc": "03:45",
"close_utc": "10:00",
"currency": "INR"
},
{
"mic": "XNSE",
"name": "National Stock Exchange of India",
"country": "India",
"city": "Mumbai",
"market_cap_trillion_usd": 5.32,
"timezone": "Asia/Kolkata",
"tz_offset": "+05:30",
"dst": null,
"open_local": "09:15",
"close_local": "15:d30",
"lunch_break": false,
"open_utc": "03:45",
"close_utc": "10:00",
"currency": "INR"
},
{
"mic": "XSHE",
"name": "Shenzhen Stock Exchange",
"country": "China",
"city": "Shenzhen",
"market_cap_trillion_usd": 5.11,
"timezone": "Asia/Shanghai",
"tz_offset": "+08:00",
"dst": null,
"open_local": "09:30",
"close_local": "15:00",
"lunch_break": "11:3013:00",
"open_utc": "01:30",
"close_utc": "07:00",
"currency": "CNY"
},
{
"mic": "XTSE",
"name": "Toronto Stock Exchange",
"country": "Canada",
"city": "Toronto",
"market_cap_trillion_usd": 4.00,
"timezone": "America/Toronto",
"tz_offset": "-05:00",
"dst": "MarNov",
"open_local": "09:30",
"close_local": "16:00",
"lunch_break": false,
"open_utc": "14:30",
"close_utc": "21:00",
"currency": "CAD"
},
{
"mic": "XLON",
"name": "London Stock Exchange",
"country": "United Kingdom",
"city": "London",
"market_cap_trillion_usd": 3.14,
"timezone": "Europe/London",
"tz_offset": "+00:00",
"dst": "MarOct",
"open_local": "08:00",
"close_local": "16:30",
"lunch_break": false,
"open_utc": "08:00",
"close_utc": "16:30",
"currency": "GBP"
},
{
"mic": "XTAI",
"name": "Taiwan Stock Exchange",
"country": "Taiwan",
"city": "Taipei",
"market_cap_trillion_usd": 2.87,
"timezone": "Asia/Taipei",
"tz_offset": "+08:00",
"dst": null,
"open_local": "09:00",
"close_local": "13:30",
"lunch_break": false,
"open_utc": "01:00",
"close_utc": "05:30",
"currency": "TWD"
},
{
"mic": "XSAU",
"name": "Saudi Exchange (Tadawul)",
"country": "Saudi Arabia",
"city": "Riyadh",
"market_cap_trillion_usd": 2.73,
"timezone": "Asia/Riyadh",
"tz_offset": "+03:00",
"dst": null,
"open_local": "10:00",
"close_local": "15:00",
"lunch_break": false,
"open_utc": "07:00",
"close_utc": "12:00",
"currency": "SAR"
},
{
"mic": "XFRA",
"name": "Deutsche Börse (Xetra)",
"country": "Germany",
"city": "Frankfurt",
"market_cap_trillion_usd": 2.04,
"timezone": "Europe/Berlin",
"tz_offset": "+01:00",
"dst": "MarOct",
"open_local": "09:00",
"close_local": "17:30",
"lunch_break": false,
"open_utc": "08:00",
"close_utc": "16:30",
"currency": "EUR"
},
{
"mic": "XSWX",
"name": "SIX Swiss Exchange",
"country": "Switzerland",
"city": "Zürich",
"market_cap_trillion_usd": 1.97,
"timezone": "Europe/Zurich",
"tz_offset": "+01:00",
"dst": "MarOct",
"open_local": "09:00",
"close_local": "17:30",
"lunch_break": false,
"open_utc": "08:00",
"close_utc": "16:30",
"currency": "CHF"
},
{
"mic": "XASX",
"name": "Australian Securities Exchange",
"country": "Australia",
"city": "Sydney",
"market_cap_trillion_usd": 1.89,
"timezone": "Australia/Sydney",
"tz_offset": "+10:00",
"dst": "OctApr",
"open_local": "10:00",
"close_local": "16:00",
"lunch_break": false,
"open_utc": "00:00",
"close_utc": "06:00",
"currency": "AUD"
}
]
}


@@ -1,6 +0,0 @@
data/*
companies.json
continents.json
countries.json
exchanges.json


@@ -1,7 +1,7 @@
// src/economic/helpers.rs
use super::types::*;
use chrono::{Local, NaiveDate};
use std::collections::{HashMap, HashSet};
use chrono::{Local};
use std::collections::{HashMap};
pub fn event_key(e: &EconomicEvent) -> String {
format!("{}|{}|{}", e.date, e.time, e.event)

src/economic/mod.rs

@@ -5,7 +5,4 @@ pub mod storage;
pub mod update;
pub mod helpers;
pub use types::*;
pub use scraper::*;
pub use update::run_full_update;
pub use helpers::*;


@@ -1,24 +1,23 @@
// src/economic/scraper.rs
use super::types::{EconomicEvent, ScrapeResult};
use super::types::{EconomicEvent};
use fantoccini::Client;
use tokio::time::{sleep, Duration};
use chrono::{Local, NaiveDate};
const EXTRACTION_JS: &str = include_str!("extraction_script.js");
pub async fn goto_and_prepare(client: &Client) -> anyhow::Result<()> {
client.goto("https://www.finanzen.net/termine/wirtschaftsdaten/").await?;
dismiss_overlays(client).await?;
//dismiss_overlays(client).await?;
if let Ok(tab) = client.find(fantoccini::Locator::Css(r#"div[data-sg-tab-item="teletrader-dates-three-stars"]"#)).await {
/*if let Ok(tab) = client.find(fantoccini::Locator::Css(r#"div[data-sg-tab-item="teletrader-dates-three-stars"]"#)).await {
tab.click().await?;
println!("High importance tab selected");
sleep(Duration::from_secs(2)).await;
}
}*/
Ok(())
}
pub async fn dismiss_overlays(client: &Client) -> anyhow::Result<()> {
/*pub async fn dismiss_overlays(client: &Client) -> anyhow::Result<()> {
for _ in 0..10 {
let removed: bool = client
.execute(
@@ -39,7 +38,7 @@ pub async fn dismiss_overlays(client: &Client) -> anyhow::Result<()> {
sleep(Duration::from_millis(500)).await;
}
Ok(())
}
}*/
pub async fn set_date_range(client: &Client, start: &str, end: &str) -> anyhow::Result<()> {
let script = format!(


@@ -2,12 +2,11 @@
use super::types::*;
use super::helpers::*;
use tokio::fs;
use chrono::{Local, NaiveDate, Datelike};
use chrono::{NaiveDate, Datelike};
use std::collections::HashMap;
use std::path::Path;
pub async fn scan_existing_chunks() -> anyhow::Result<Vec<ChunkInfo>> {
let dir = std::path::Path::new("economic_events");
let dir = std::path::Path::new("data/economic/events");
let mut chunks = Vec::new();
if dir.exists() {
@@ -46,7 +45,7 @@ pub async fn load_existing_events(chunks: &[ChunkInfo]) -> anyhow::Result<HashMa
}
pub async fn save_optimized_chunks(events: HashMap<String, EconomicEvent>) -> anyhow::Result<()> {
let dir = std::path::Path::new("economic_events");
let dir = std::path::Path::new("data/economic/events");
fs::create_dir_all(dir).await?;
// Delete all old chunk files to prevent duplicates and overlaps
@@ -113,9 +112,3 @@ pub async fn save_changes(changes: &[EventChange]) -> anyhow::Result<()> {
}
Ok(())
}
pub fn target_end_date() -> String {
let now = Local::now().naive_local().date();
let future = now + chrono::Duration::days(90);
future.format("%Y-%m-%d").to_string()
}


@@ -1,9 +1,19 @@
// src/economic/update.rs
use super::{scraper::*, storage::*, helpers::*, types::*};
use crate::config::Config;
use chrono::{Local, NaiveDate};
use crate::{config::Config, scraper::webdriver::ScrapeTask};
use crate::scraper::webdriver::ChromeDriverPool;
use chrono::{Local};
use std::sync::Arc;
pub async fn run_full_update(client: &fantoccini::Client, config: &Config) -> anyhow::Result<()> {
/// Runs the full update for economic data, using the provided ChromeDriver pool.
///
/// # Arguments
/// * `config` - The application configuration.
/// * `pool` - Shared pool of ChromeDriver instances for scraping.
///
/// # Errors
/// Returns an error if scraping, loading, or saving fails.
pub async fn run_full_update(config: &Config, pool: &Arc<ChromeDriverPool>) -> anyhow::Result<()> {
let today_str = chrono::Local::now().date_naive().format("%Y-%m-%d").to_string();
let end_date = config.target_end_date();
@@ -26,34 +36,66 @@ pub async fn run_full_update(client: &fantoccini::Client, config: &Config) -> an
println!("Scraping economic events: {}{}", start_date, end_date);
let mut current = start_date;
let mut total_changes = 0;
// Pass the pool to the scraping function
let new_events_all = scrape_all_economic_events(&start_date, &end_date, pool).await?;
while current <= end_date {
set_date_range(client, &current, &end_date).await?;
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
let new_events = extract_events(client).await?;
if new_events.is_empty() { break; }
let result = process_batch(&new_events, &mut events, &today_str);
total_changes += result.changes.len();
// Process all at once or in batches
let result = process_batch(&new_events_all, &mut events, &today_str);
let total_changes = result.changes.len();
save_changes(&result.changes).await?;
save_optimized_chunks(events).await?;
println!("Economic update complete — {} changes detected", total_changes);
Ok(())
}
/// Scrapes all economic events from start to end date using a dedicated ScrapeTask with the provided pool.
///
/// This function creates a ScrapeTask to navigate to the Finanzen.net page, prepare it,
/// and then loop through date ranges to extract events.
///
/// # Arguments
/// * `start` - Start date in YYYY-MM-DD.
/// * `end` - End date in YYYY-MM-DD.
/// * `pool` - Shared pool of ChromeDriver instances.
///
/// # Returns
/// A vector of all extracted EconomicEvent structs.
///
/// # Errors
/// Returns an error if task execution fails or extraction issues occur.
pub async fn scrape_all_economic_events(start: &str, end: &str, pool: &Arc<ChromeDriverPool>) -> anyhow::Result<Vec<EconomicEvent>> {
let url = "https://www.finanzen.net/termine/wirtschaftsdaten/".to_string();
let start_clone = start.to_string();
let end_clone = end.to_string();
let task = ScrapeTask::new(url, move |client| async move {
goto_and_prepare(&client).await?;
let mut all_events = Vec::new();
let mut current = start_clone;
while current <= end_clone {
set_date_range(&client, &current, &end_clone).await?;
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
let new_events = extract_events(&client).await?;
if new_events.is_empty() { break; }
all_events.extend(new_events.clone());
let next = new_events.iter()
.filter_map(|e| chrono::NaiveDate::parse_from_str(&e.date, "%Y-%m-%d").ok())
.max()
.and_then(|d| d.succ_opt())
.map(|d| d.format("%Y-%m-%d").to_string())
.unwrap_or(end_date.clone());
.unwrap_or(end_clone.clone());
if next > end_date { break; }
if next > end_clone { break; }
current = next;
}
Ok(all_events)
});
save_optimized_chunks(events).await?;
println!("Economic update complete — {} changes detected", total_changes);
Ok(())
// Use the pool for execution
task.execute_with_pool(pool).await
}
pub fn process_batch(

View File

@@ -3,69 +3,41 @@ mod economic;
mod corporate;
mod config;
mod util;
mod scraper;
use fantoccini::{ClientBuilder};
use serde_json::{Map, Value};
use tokio::signal;
use anyhow::Result;
use config::Config;
use scraper::webdriver::ChromeDriverPool;
use std::sync::Arc;
/// The entry point of the application.
///
/// This function loads the configuration, initializes a shared ChromeDriver pool,
/// and sequentially runs the full updates for corporate and economic data.
/// Sequential execution helps prevent resource exhaustion from concurrent
/// chromedriver instances and avoids spamming the target websites with too many requests.
///
/// # Errors
///
/// Returns an error if configuration loading fails, pool initialization fails,
/// or if either update function encounters an issue (e.g., network errors,
/// scraping failures, or chromedriver spawn failures like "program not found").
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// === Ensure data directories exist ===
util::ensure_data_dirs().await?;
async fn main() -> Result<()> {
let config = Config::load().map_err(|err| {
println!("Failed to load Config .env: {}", err);
err
})?;
// === Load configuration ===
let config = config::Config::default();
// Initialize the shared ChromeDriver pool once
let pool_size = config.max_parallel_tasks;
let pool = Arc::new(ChromeDriverPool::new(pool_size).await?);
// === Start ChromeDriver ===
let mut child = std::process::Command::new("chromedriver-win64/chromedriver.exe")
.args(["--port=9515"]) // Level 3 = minimal logs
.spawn()?;
// Run economic update first, passing the shared pool
economic::run_full_update(&config, &pool).await?;
// Build capabilities to hide infobar + enable full rendering
let port = 9515;
let caps_value = serde_json::json!({
"goog:chromeOptions": {
"args": [
//"--headless",
"--disable-gpu",
"--disable-notifications",
"--disable-popup-blocking",
"--disable-blink-features=AutomationControlled"
],
"excludeSwitches": ["enable-automation"]
}
});
// Then run corporate update, passing the shared pool
corporate::run_full_update(&config, &pool).await?;
let caps_map: Map<String, Value> = caps_value.as_object()
.expect("Capabilities should be a JSON object")
.clone();
let mut client = ClientBuilder::native()
.capabilities(caps_map)
.connect(&format!("http://localhost:{}", port))
.await?;
// Graceful shutdown
let client_clone = client.clone();
tokio::spawn(async move {
signal::ctrl_c().await.unwrap();
client_clone.close().await.ok();
std::process::exit(0);
});
// === Economic Calendar Update ===
println!("Updating Economic Calendar (High Impact Only)");
economic::goto_and_prepare(&client).await?;
economic::run_full_update(&client, &config).await?;
// === Corporate Earnings Update ===
println!("\nUpdating Corporate Earnings");
corporate::run_full_update(&client, &config).await?;
// === Cleanup ===
client.close().await?;
child.kill()?;
println!("\nAll data updated successfully!");
Ok(())
}

1
src/scraper/mod.rs Normal file
View File

@@ -0,0 +1 @@
pub mod webdriver;

269
src/scraper/webdriver.rs Normal file
View File

@@ -0,0 +1,269 @@
// src/scraper/webdriver.rs
use anyhow::{anyhow, Context, Result};
use fantoccini::{Client, ClientBuilder};
use serde_json::{Map, Value};
use std::process::Stdio;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child, Command};
use tokio::sync::{Mutex, Semaphore};
use tokio::time::{Duration, sleep, timeout};
use std::pin::Pin;
/// Manages a pool of ChromeDriver instances for parallel scraping.
///
/// This struct maintains multiple ChromeDriver processes and allows controlled
/// concurrent access via a semaphore. Instances are reused across tasks to avoid
/// the overhead of spawning new processes.
pub struct ChromeDriverPool {
instances: Vec<Arc<Mutex<ChromeInstance>>>,
semaphore: Arc<Semaphore>,
}
impl ChromeDriverPool {
/// Creates a new pool with the specified number of ChromeDriver instances.
///
/// # Arguments
/// * `pool_size` - Number of concurrent ChromeDriver instances to maintain
pub async fn new(pool_size: usize) -> Result<Self> {
let mut instances = Vec::with_capacity(pool_size);
println!("Initializing ChromeDriver pool with {} instances...", pool_size);
for i in 0..pool_size {
match ChromeInstance::new().await {
Ok(instance) => {
println!(" ✓ Instance {} ready", i + 1);
instances.push(Arc::new(Mutex::new(instance)));
}
Err(e) => {
eprintln!(" ✗ Failed to create instance {}: {}", i + 1, e);
// Clean up already created instances
drop(instances);
return Err(e);
}
}
}
Ok(Self {
instances,
semaphore: Arc::new(Semaphore::new(pool_size)),
})
}
/// Executes a scrape task using an available instance from the pool.
pub async fn execute<T, F, Fut>(&self, url: String, parse: F) -> Result<T>
where
T: Send + 'static,
F: FnOnce(Client) -> Fut + Send + 'static,
Fut: std::future::Future<Output = Result<T>> + Send + 'static,
{
// Acquire semaphore permit
let _permit = self.semaphore.acquire().await
.map_err(|_| anyhow!("Semaphore closed"))?;
// Find an available instance: take the first whose lock is currently free,
// falling back to waiting on instance 0. Without this scan, every task
// would pile onto the same chromedriver process.
let mut free = None;
for inst in &self.instances {
    if let Ok(g) = inst.clone().try_lock_owned() {
        free = Some(g);
        break;
    }
}
let guard = match free {
    Some(g) => g,
    None => self.instances[0].clone().lock_owned().await,
};
// Create a new session for this task
let client = guard.new_session().await?;
// Release lock while we do the actual scraping
drop(guard);
// Navigate, parse, and always close the session afterwards; `parse`
// consumes the client, so keep a clone for the cleanup call.
let close_handle = client.clone();
let nav = client.goto(&url).await.context("Failed to navigate");
let result = match nav {
    Ok(()) => timeout(Duration::from_secs(60), parse(client))
        .await
        .context("Parse function timed out after 60s")
        .and_then(|inner| inner),
    Err(e) => Err(e),
};
// Close the session so chromedriver does not accumulate stale sessions.
let _ = close_handle.close().await;
result
}
pub fn get_number_of_instances(&self) -> usize {
self.instances.len()
}
}
/// Represents a single instance of chromedriver process.
pub struct ChromeInstance {
process: Child,
base_url: String,
}
impl ChromeInstance {
/// Creates a new ChromeInstance by spawning chromedriver with random port.
///
/// This spawns `chromedriver --port=0` to avoid port conflicts, reads stdout to extract
/// the listening address, and waits for the success message. If timeout occurs or
/// spawning fails, returns an error with context.
///
/// # Errors
///
/// Returns an error if chromedriver fails to spawn (e.g., the binary is missing at
/// `chromedriver-win64/chromedriver.exe`, or its version doesn't match the installed Chrome),
/// if the process exits early, or if the address/success message isn't found within 30s.
pub async fn new() -> Result<Self> {
let mut command = Command::new("chromedriver-win64/chromedriver.exe");
command
.arg("--port=0") // Use random available port to support pooling
.stdout(Stdio::piped())
.stderr(Stdio::piped());
let mut process = command
.spawn()
.context("Failed to spawn chromedriver. Ensure it's installed and in PATH.")?;
let mut stdout = BufReader::new(
process.stdout.take().context("Failed to capture stdout")?
).lines();
let mut stderr = BufReader::new(
process.stderr.take().context("Failed to capture stderr")?
).lines();
let start_time = std::time::Instant::now();
let mut address: Option<String> = None;
let mut success = false;
// Log stderr in background for debugging
tokio::spawn(async move {
while let Ok(Some(line)) = stderr.next_line().await {
eprintln!("ChromeDriver stderr: {}", line);
}
});
// Wait for address and success (up to 30s)
while start_time.elapsed() < Duration::from_secs(30) {
if let Ok(Ok(Some(line))) =
timeout(Duration::from_secs(1), stdout.next_line()).await
{
if let Some(addr) = parse_chromedriver_address(&line) {
address = Some(addr.to_string());
}
if line.contains("ChromeDriver was started successfully") {
success = true;
}
if let (Some(addr), true) = (&address, success) {
return Ok(Self {
process,
base_url: addr.clone(),
});
}
}
sleep(Duration::from_millis(100)).await;
}
// Cleanup on failure
let _ = process.kill().await;
Err(anyhow!("Timeout: ChromeDriver did not start within 30 seconds. Check version match with Chrome browser and system resources."))
}
/// Creates a new browser session (client) from this ChromeDriver instance.
/// Each session is independent and can be closed without affecting the driver.
pub async fn new_session(&self) -> Result<Client> {
ClientBuilder::native()
.capabilities(Self::chrome_args())
.connect(&self.base_url)
.await
.context("Failed to create new session")
}
fn chrome_args() -> Map<String, Value> {
let args = serde_json::json!({
"goog:chromeOptions": {
"args": [
"--headless=new",
"--disable-gpu",
"--no-sandbox",
"--disable-dev-shm-usage",
"--disable-infobars",
"--disable-extensions",
"--disable-popup-blocking",
"--disable-notifications",
"--disable-logging",
"--disable-autofill",
"--disable-features=TranslateUI,OptimizationGuideModelDownloading",
"--window-size=1920,1080",
"--disable-blink-features=AutomationControlled",
"--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
],
"excludeSwitches": ["enable-logging", "enable-automation"],
"useAutomationExtension": false,
"prefs": {
"profile.default_content_setting_values.notifications": 2
}
}
});
args.as_object()
.expect("Capabilities should be a JSON object")
.clone()
}
}
/// Parses the ChromeDriver address from a log line.
///
/// Looks for the "Starting ChromeDriver ... on port XXXX" line and extracts the port.
/// Returns `Some("http://localhost:XXXX")` if found, else `None`.
fn parse_chromedriver_address(line: &str) -> Option<String> {
if line.contains("Starting ChromeDriver") {
if let Some(port_str) = line.split("on port ").nth(1) {
if let Some(port) = port_str.split_whitespace().next() {
if port.parse::<u16>().is_ok() {
return Some(format!("http://localhost:{}", port));
}
}
}
}
// Fallback for other formats (e.g., explicit port mentions)
for word in line.split_whitespace() {
if let Ok(port) = word.trim_matches(|c: char| !c.is_numeric()).parse::<u16>() {
if port > 1024 && port < 65535 && line.to_lowercase().contains("port") {
return Some(format!("http://localhost:{}", port));
}
}
}
None
}
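// Hypothetical test, not part of this diff: a sketch of the startup line the
// parser above targets. chromedriver's exact wording varies across versions,
// which is what the fallback scan is for.
#[cfg(test)]
mod tests {
    use super::parse_chromedriver_address;

    #[test]
    fn parses_standard_startup_line() {
        let line = "Starting ChromeDriver 131.0.6778.85 on port 54321";
        assert_eq!(
            parse_chromedriver_address(line).as_deref(),
            Some("http://localhost:54321")
        );
    }
}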
impl Drop for ChromeInstance {
    fn drop(&mut self) {
        // Best-effort shutdown: start_kill() signals the child without waiting.
        let _ = self.process.start_kill();
        // Drop cannot be async, so block briefly to give the OS time to reap
        // the process before the instance (and its port) goes away.
        std::thread::sleep(std::time::Duration::from_millis(100));
    }
}
/// Simplified task execution - now uses the pool pattern.
///
/// For backwards compatibility with existing code.
pub struct ScrapeTask<T> {
url: String,
parse: Box<dyn FnOnce(Client) -> Pin<Box<dyn std::future::Future<Output = Result<T>> + Send>> + Send>,
}
impl<T: Send + 'static> ScrapeTask<T> {
pub fn new<F, Fut>(url: String, parse: F) -> Self
where
F: FnOnce(Client) -> Fut + Send + 'static,
Fut: std::future::Future<Output = Result<T>> + Send + 'static,
{
Self {
url,
parse: Box::new(move |client| Box::pin(parse(client))),
}
}
/// Executes using a provided pool (more efficient for multiple tasks).
pub async fn execute_with_pool(self, pool: &ChromeDriverPool) -> Result<T> {
let url = self.url;
let parse = self.parse;
pool.execute(url, move |client| async move {
(parse)(client).await
}).await
}
}
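
Putting the pool and task types together, a caller looks roughly like this (a sketch, not part of the diff: demo and the CSS selector are hypothetical, and exact fantoccini method signatures depend on the pinned version):

use anyhow::Result;
use std::sync::Arc;

async fn demo(pool: Arc<ChromeDriverPool>) -> Result<String> {
    let task = ScrapeTask::new("https://example.com".to_string(), |client| async move {
        // The pool has already navigated to the URL before this closure runs,
        // so it only extracts data; the pool's semaphore caps how many such
        // closures run concurrently (Config::max_parallel_tasks).
        let elem = client.find(fantoccini::Locator::Css("h1")).await?;
        let text = elem.text().await?;
        Ok(text)
    });
    task.execute_with_pool(&pool).await
}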

View File

@@ -3,7 +3,7 @@ use tokio::fs;
use std::path::Path;
/// Create the required data folders if they do not exist yet.
pub async fn ensure_data_dirs() -> anyhow::Result<()> {
pub async fn _ensure_data_dirs() -> anyhow::Result<()> {
let dirs = [
"economic_events",
"economic_event_changes",