Compare commits

...

5 Commits

25 changed files with 2062 additions and 1057 deletions

Cargo.lock (generated, 31 lines changed)
View File

@@ -678,6 +678,7 @@ dependencies = [
  "serde",
  "serde_json",
  "tokio",
+ "toml",
  "tracing",
  "tracing-subscriber",
  "yfinance-rs",
@@ -2671,6 +2672,15 @@ dependencies = [
  "serde_core",
 ]
 
+[[package]]
+name = "serde_spanned"
+version = "1.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e24345aa0fe688594e73770a5f6d1b216508b4f93484c0026d521acd30134392"
+dependencies = [
+ "serde_core",
+]
+
 [[package]]
 name = "serde_urlencoded"
 version = "0.7.1"
@@ -3116,6 +3126,21 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "toml"
+version = "0.9.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f0dc8b1fb61449e27716ec0e1bdf0f6b8f3e8f6b05391e8497b8b6d7804ea6d8"
+dependencies = [
+ "indexmap",
+ "serde_core",
+ "serde_spanned",
+ "toml_datetime",
+ "toml_parser",
+ "toml_writer",
+ "winnow",
+]
+
 [[package]]
 name = "toml_datetime"
 version = "0.7.3"
@@ -3146,6 +3171,12 @@ dependencies = [
  "winnow",
 ]
 
+[[package]]
+name = "toml_writer"
+version = "1.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "df8b2b54733674ad286d16267dcfc7a71ed5c776e4ac7aa3c3e2561f7c637bf2"
+
 [[package]]
 name = "tower"
 version = "0.5.2"

View File

@@ -34,6 +34,7 @@ rand = "0.9.2"
 # Environment handling
 dotenvy = "0.15"
+toml = "0.9.8"
 
 # Date & time
 chrono = { version = "0.4", features = ["serde"] }

View File

@@ -1,46 +0,0 @@
{
"CHF": [
0.808996035919424,
"2025-11-25"
],
"JPY": [
0.0064,
"2025-11-25"
],
"INR": [
89.28571428571429,
"2025-11-25"
],
"GBp": [
0.7603406326034063,
"2025-11-25"
],
"AUD": [
1.5463120457708364,
"2025-11-25"
],
"SAR": [
3.750937734433609,
"2025-11-25"
],
"TWD": [
31.446540880503143,
"2025-11-25"
],
"CNY": [
7.087172218284904,
"2025-11-25"
],
"HKD": [
7.776049766718508,
"2025-11-25"
],
"CAD": [
1.4110342881332016,
"2025-11-25"
],
"EUR": [
0.8649022660439372,
"2025-11-25"
]
}

View File

@@ -1,14 +1,23 @@
// src/config.rs use anyhow::{Context, Result};
#[derive(Debug, Clone)] use chrono::{self};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config { pub struct Config {
// Economic calendar start (usually the earliest available on finanzen.net) // Economic calendar start (usually the earliest available on finanzen.net)
pub economic_start_date: String, // e.g. "2007-02-13" pub economic_start_date: String, // e.g. "2007-02-13"
// Corporate earnings & price history start // Corporate earnings & price history start
pub corporate_start_date: String, // e.g. "2000-01-01" or "2010-01-01" pub corporate_start_date: String, // e.g. "2000-01-01" or "2010-01-01"
// How far into the future we scrape economic events // How far into the future we scrape economic events
pub economic_lookahead_months: u32, // default: 3 pub economic_lookahead_months: u32, // default: 3
/// Maximum number of parallel scraping tasks (default: 10).
/// This limits concurrency to protect system load and prevent website spamming.
#[serde(default = "default_max_parallel")]
pub max_parallel_tasks: usize,
}
fn default_max_parallel() -> usize {
10
} }
impl Default for Config { impl Default for Config {
@@ -17,11 +26,52 @@ impl Default for Config {
economic_start_date: "2007-02-13".to_string(), economic_start_date: "2007-02-13".to_string(),
corporate_start_date: "2010-01-01".to_string(), corporate_start_date: "2010-01-01".to_string(),
economic_lookahead_months: 3, economic_lookahead_months: 3,
max_parallel_tasks: default_max_parallel(),
} }
} }
} }
impl Config { impl Config {
/// Loads the configuration from environment variables using dotenvy.
///
/// This function loads a `.env` file if present (via `dotenvy::dotenv()`),
/// then retrieves each configuration value from environment variables.
/// If a variable is missing, it falls back to the default value.
/// Variable names are uppercase with underscores (e.g., ECONOMIC_START_DATE).
///
/// # Returns
/// The loaded Config on success.
///
/// # Errors
/// Returns an error if parsing fails (e.g., invalid integer for lookahead months).
pub fn load() -> Result<Self> {
// Load .env file if it exists; ignore if not found (dotenvy::dotenv returns Ok if no file)
let _ = dotenvy::dotenv().context("Failed to load .env file (optional)")?;
let economic_start_date = dotenvy::var("ECONOMIC_START_DATE")
.unwrap_or_else(|_| "2007-02-13".to_string());
let corporate_start_date = dotenvy::var("CORPORATE_START_DATE")
.unwrap_or_else(|_| "2010-01-01".to_string());
let economic_lookahead_months: u32 = dotenvy::var("ECONOMIC_LOOKAHEAD_MONTHS")
.unwrap_or_else(|_| "3".to_string())
.parse()
.context("Failed to parse ECONOMIC_LOOKAHEAD_MONTHS as u32")?;
let max_parallel_tasks: usize = dotenvy::var("MAX_PARALLEL_TASKS")
.unwrap_or_else(|_| "10".to_string())
.parse()
.context("Failed to parse MAX_PARALLEL_TASKS as usize")?;
Ok(Self {
economic_start_date,
corporate_start_date,
economic_lookahead_months,
max_parallel_tasks,
})
}
pub fn target_end_date(&self) -> String { pub fn target_end_date(&self) -> String {
let now = chrono::Local::now().naive_local().date(); let now = chrono::Local::now().naive_local().date();
let future = now + chrono::Duration::days(30 * self.economic_lookahead_months as i64); let future = now + chrono::Duration::days(30 * self.economic_lookahead_months as i64);
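
For reference, a minimal `.env` that this loader would pick up (the variable names come from `Config::load()` above; the values shown are just the documented defaults, so the file itself is optional):

```env
# All four variables are optional; Config::load() falls back to these defaults
ECONOMIC_START_DATE=2007-02-13
CORPORATE_START_DATE=2010-01-01
ECONOMIC_LOOKAHEAD_MONTHS=3
MAX_PARALLEL_TASKS=10
```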

src/corporate/figi.md (new file, 141 lines)
View File

@@ -0,0 +1,141 @@
# OpenFIGI API Summary: Mapping, Search, and Filter Endpoints
This Markdown summary covers the **API Guidelines**, **Request Format**, and **Sample Request -> Sample Response** for the key OpenFIGI endpoints: Mapping, Search, and Filter. Information is based on the official documentation as of December 1, 2025.
## Mapping Endpoint
### API Guidelines
- **Endpoint**: `POST /v3/mapping`
- **Purpose**: Map third-party identifiers (e.g., ISIN, TICKER) to FIGIs (Financial Instrument Global Identifiers).
- **Request Format**: JSON array of objects (mapping jobs). Each job requires `idType` and `idValue`. Optional filters: `exchCode`, `micCode`, `currency`, `marketSecDes`, `securityType`, `securityType2`, `includeUnlistedEquities`, `optionType`, `strike`, `contractSize`, `coupon`, `expiration`, `maturity`, `stateCode`.
- **Key Parameters**:
- `idType` (String, Required): Identifier type (e.g., `ID_BB_GLOBAL`, `TICKER`, `ID_ISIN`).
- `idValue` (String/Number, Required): The identifier value.
- `exchCode` (String, Optional): Exchange code (mutually exclusive with `micCode`).
- `micCode` (String, Optional): Market Identification Code (mutually exclusive with `exchCode`).
- Range parameters (e.g., `strike`, `expiration`): Arrays like `[a, b]` or `[a, null]` for intervals.
- `includeUnlistedEquities` (Boolean, Optional): Defaults to `false`.
- **Limits**:
- Without API key: Max 5 jobs per request.
- With API key: Max 100 jobs per request.
- **Rate Limits**:
- Without API key: 25 requests/minute.
- With API key: 25 requests/6 seconds.
- **Authentication**: Include `X-OPENFIGI-APIKEY` header for higher limits.
### Sample Request
```json
[
{ "idType": "ID_BB_GLOBAL", "idValue": "BBG000BLNNH6" },
{ "idType": "TICKER", "idValue": "IBM", "exchCode": "US" },
{ "idType": "BASE_TICKER", "idValue": "TSLA 10 C100", "securityType2": "Option", "expiration": ["2018-10-01", "2018-12-01"] }
]
```
### Sample Response
```json
[{
"data": [{
"figi": "BBG000BLNNH6",
"securityType": "Common Stock",
"marketSector": "Equity",
"ticker": "IBM",
"name": "INTL BUSINESS MACHINES CORP",
"exchCode": "US",
"shareClassFIGI": "BBG001S5S399",
"compositeFIGI": "BBG000BLNNH6",
"securityType2": "Common Stock",
"securityDescription": "IBM"
}]
}]
```
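
A minimal Rust sketch of a mapping call with `reqwest` and `serde_json`, assuming only the endpoint and header documented above. The `OPENFIGI_API_KEY` environment variable is an illustrative assumption, not part of the API:

```rust
use serde_json::{json, Value};

// Sketch: send one mapping job to POST /v3/mapping and print the first FIGI.
// Assumes reqwest with the "json" feature and a tokio runtime.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let jobs = json!([
        { "idType": "TICKER", "idValue": "IBM", "exchCode": "US" }
    ]);

    let mut req = reqwest::Client::new()
        .post("https://api.openfigi.com/v3/mapping")
        .json(&jobs);

    // Optional API key raises the job and rate limits (header per the guidelines above).
    if let Ok(key) = std::env::var("OPENFIGI_API_KEY") {
        req = req.header("X-OPENFIGI-APIKEY", key);
    }

    // One result object comes back per submitted job, in order.
    let results: Vec<Value> = req.send().await?.error_for_status()?.json().await?;
    if let Some(first) = results.first() {
        // "data" holds matches; "error" holds a failure reason instead.
        if let Some(figi) = first["data"][0]["figi"].as_str() {
            println!("First match: {}", figi);
        }
    }
    Ok(())
}
```

Without a key this stays within the 5-jobs-per-request, 25-requests-per-minute tier described above.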
## Search Endpoint
### API Guidelines
- **Endpoint**: `POST /v3/search`
- **Purpose**: Keyword-based search for FIGIs with optional filters; supports pagination.
- **Request Format**: JSON object with optional `query` (keywords) and filters (same as Mapping). Use `start` for pagination.
- **Key Parameters**:
- `query` (String, Optional): Search keywords (e.g., "ibm").
- `start` (String, Optional): Pagination token from previous `next` field.
- All Mapping filters supported (e.g., `exchCode`, `securityType`, `optionType`).
- **Limits**:
- Max results: 15,000.
- Max per page: 100.
- Max pages: 150.
- **Rate Limits**:
- Without API key: 5 requests/minute.
- With API key: 20 requests/minute.
- **Pagination**: Response includes `next` token; use as `start` in next request.
- **Authentication**: Same as Mapping.
### Sample Request
```json
{
"query": "ibm",
"exchCode": "US"
}
```
### Sample Response
```json
{
"data": [
{
"figi": "BBG000BLNNH6",
"name": "INTL BUSINESS MACHINES CORP",
"ticker": "IBM",
"exchCode": "US",
"compositeFIGI": "BBG000BLNNH6",
"securityType": "Common Stock",
"marketSector": "Equity",
"shareClassFIGI": "BBG001S5S399",
"securityType2": "Common Stock",
"securityDescription": "IBM"
}
],
"next": "QW9JSVFEOFMrQ3hDUWtjd01ERTRTMHhhUXpBPSAx.3AG33VCsv54AsUl5fGHehSytWPuWLJxf0t8VL3YXuJh="
}
```
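
A sketch of the pagination contract under the same assumptions: thread each response's `next` token back in as `start` until it disappears (the `search_all` helper and the fixed `exchCode` filter are illustrative, not part of the API):

```rust
use serde_json::{json, Value};

// Sketch: page through /v3/search results by threading the `next` token.
async fn search_all(client: &reqwest::Client, query: &str) -> Result<Vec<Value>, reqwest::Error> {
    let mut all = Vec::new();
    let mut start: Option<String> = None;
    loop {
        let mut body = json!({ "query": query, "exchCode": "US" });
        if let Some(token) = &start {
            body["start"] = json!(token); // token from the previous response's `next` field
        }
        let resp: Value = client
            .post("https://api.openfigi.com/v3/search")
            .json(&body)
            .send()
            .await?
            .json()
            .await?;
        if let Some(data) = resp["data"].as_array() {
            all.extend(data.iter().cloned());
        }
        match resp["next"].as_str() {
            Some(token) => start = Some(token.to_string()),
            None => break, // no `next` token: last page reached
        }
    }
    Ok(all)
}
```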
## Filter Endpoint
### API Guidelines
- **Endpoint**: `POST /v3/filter`
- **Purpose**: Filter-based search for FIGIs (no keywords required); results sorted alphabetically by FIGI, includes total count.
- **Request Format**: JSON object with optional `query` and filters (same as Search/Mapping). Use `start` for pagination.
- **Key Parameters**: Identical to Search (`query`, `start`, and all Mapping filters).
- **Limits**: Same as Search (15,000 max results, 100/page, 150 pages).
- **Rate Limits**: Same as Search (5/min without key, 20/min with key).
- **Pagination**: Same as Search; response includes `total` count.
- **Authentication**: Same as Mapping.
### Sample Request
```json
{
"exchCode": "US"
}
```
### Sample Response
```json
{
"data": [
{
"figi": "BBG000BLNNH6",
"name": "INTL BUSINESS MACHINES CORP",
"ticker": "IBM",
"exchCode": "US",
"compositeFIGI": "BBG000BLNNH6",
"securityType": "Common Stock",
"marketSector": "Equity",
"shareClassFIGI": "BBG001S5S399",
"securityType2": "Common Stock",
"securityDescription": "IBM"
}
],
"next": "QW9JSVFEOFMrQ3hDUWtjd01ERTRTMHhhUXpBPSAx.3AG33VCsv54AsUl5fGHehSytWPuWLJxf0t8VL3YXuJh=",
"total": 29930312
}
```

View File

@@ -8,5 +8,4 @@ pub mod aggregation;
 pub mod fx;
 pub mod openfigi;
 
-pub use types::*;
 pub use update::run_full_update;

File diff suppressed because it is too large.

View File

@@ -1,28 +1,39 @@
 // src/corporate/scraper.rs
-use super::{types::*, helpers::*};
-use csv::ReaderBuilder;
+use super::{types::*, helpers::*, openfigi::*};
+//use crate::corporate::openfigi::OpenFigiClient;
+use crate::{scraper::webdriver::*};
 use fantoccini::{Client, Locator};
 use scraper::{Html, Selector};
-use chrono::{DateTime, Duration, NaiveDate, Timelike, Utc};
+use chrono::{DateTime, Duration, NaiveDate, Utc};
 use tokio::{time::{Duration as TokioDuration, sleep}};
 use reqwest::Client as HttpClient;
-use serde_json::Value;
+use serde_json::{json, Value};
 use zip::ZipArchive;
-use std::fs::File;
-use std::{collections::HashMap};
-use std::io::{Read, BufReader};
+use std::{collections::HashMap, sync::Arc};
+use std::io::Read;
+use anyhow::{anyhow, Result};
 
 const USER_AGENT: &str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36";
 
-/// Discover all exchanges where this ISIN trades by querying Yahoo Finance
-pub async fn discover_available_exchanges(isin: &str, known_ticker: &str) -> anyhow::Result<Vec<TickerInfo>> {
+/// Discover all exchanges where this ISIN trades by querying Yahoo Finance and enriching with OpenFIGI API calls.
+///
+/// # Arguments
+/// * `isin` - The ISIN to search for.
+/// * `known_ticker` - A known ticker symbol for fallback or initial check.
+///
+/// # Returns
+/// A vector of FigiInfo structs containing enriched data from API calls.
+///
+/// # Errors
+/// Returns an error if HTTP requests fail, JSON parsing fails, or the OpenFIGI API responds with an error.
+pub async fn discover_available_exchanges(isin: &str, known_ticker: &str) -> anyhow::Result<Vec<FigiInfo>> {
     println!("  Discovering exchanges for ISIN {}", isin);
-    let mut discovered_tickers = Vec::new();
+    let mut potential: Vec<(String, PrimaryInfo)> = Vec::new();
 
     // Try the primary ticker first
     if let Ok(info) = check_ticker_exists(known_ticker).await {
-        discovered_tickers.push(info);
+        potential.push((known_ticker.to_string(), info));
     }
 
     // Search for ISIN directly on Yahoo to find other listings
@@ -31,14 +42,14 @@ pub async fn discover_available_exchanges(isin: &str, known_ticker: &str) -> any
         isin
     );
 
-    match HttpClient::new()
+    let resp = HttpClient::new()
         .get(&search_url)
         .header("User-Agent", USER_AGENT)
         .send()
-        .await
-    {
-        Ok(resp) => {
-            if let Ok(json) = resp.json::<Value>().await {
+        .await?;
+
+    let json = resp.json::<Value>().await?;
+
     if let Some(quotes) = json["quotes"].as_array() {
         for quote in quotes {
             // First: filter by quoteType directly from search results (faster rejection)
@@ -49,131 +60,249 @@ pub async fn discover_available_exchanges(isin: &str, known_ticker: &str) -> any
             if let Some(symbol) = quote["symbol"].as_str() {
                 // Avoid duplicates
-                if discovered_tickers.iter().any(|t: &TickerInfo| t.ticker == symbol) {
+                if potential.iter().any(|(s, _)| s == symbol) {
                     continue;
                 }
                 // Double-check with full quote data (some search results are misleading)
-                match check_ticker_exists(symbol).await {
-                    Ok(info) => {
-                        println!("    Found equity listing: {} on {} ({})",
-                            symbol, info.exchange_mic, info.currency);
-                        discovered_tickers.push(info);
-                    }
-                    Err(e) => {
-                        // Most common: it's not actually equity or not tradable
-                        // println!("    Rejected {}: {}", symbol, e);
-                        continue;
-                    }
-                }
-                // Be respectful to Yahoo
-                sleep(TokioDuration::from_millis(120)).await;
+                if let Ok(info) = check_ticker_exists(symbol).await {
+                    potential.push((symbol.to_string(), info));
+                }
             }
         }
     }
-        }
-        Err(e) => println!("  Search API error: {}", e),
-    }
-
-    // Also try common exchange suffixes for the base ticker
-    if let Some(base) = known_ticker.split('.').next() {
-        let suffixes = vec![
-            "",    // US
-            ".L",  // London
-            ".DE", // Frankfurt/XETRA
-            ".PA", // Paris
-            ".AS", // Amsterdam
-            ".MI", // Milan
-            ".SW", // Switzerland
-            ".T",  // Tokyo
-            ".HK", // Hong Kong
-            ".SS", // Shanghai
-            ".SZ", // Shenzhen
-            ".TO", // Toronto
-            ".AX", // Australia
-            ".SA", // Brazil
-            ".MC", // Madrid
-            ".BO", // Bombay
-            ".NS", // National Stock Exchange India
-        ];
-        for suffix in suffixes {
-            let test_ticker = format!("{}{}", base, suffix);
-            // Skip if already found
-            if discovered_tickers.iter().any(|t| t.ticker == test_ticker) {
-                continue;
-            }
-            if let Ok(info) = check_ticker_exists(&test_ticker).await {
-                discovered_tickers.push(info);
-                sleep(TokioDuration::from_millis(100)).await;
-            }
-        }
-    }
-
-    println!("  Found {} tradable exchanges", discovered_tickers.len());
-    Ok(discovered_tickers)
-}
-
-/// Check if a ticker exists and get its exchange/currency info
-async fn check_ticker_exists(ticker: &str) -> anyhow::Result<TickerInfo> {
-    let url = format!(
-        "https://query1.finance.yahoo.com/v10/finance/quoteSummary/{}?modules=price",
-        ticker
-    );
-    let resp = HttpClient::new()
-        .get(&url)
-        .header("User-Agent", USER_AGENT)
-        .send()
-        .await?;
-    let json: Value = resp.json().await?;
-
-    if let Some(result) = json["quoteSummary"]["result"].as_array() {
-        if result.is_empty() {
-            return Err(anyhow::anyhow!("No quote data for {}", ticker));
-        }
-        let quote = &result[0]["price"];
-        // CRITICAL: Only accept EQUITY securities
-        let quote_type = quote["quoteType"]
-            .as_str()
-            .unwrap_or("")
-            .to_uppercase();
-        if quote_type != "EQUITY" {
-            // Optional: debug what was filtered
-            println!("    → Skipping {} (quoteType: {})", ticker, quote_type);
-            return Err(anyhow::anyhow!("Not an equity: {}", quote_type));
-        }
-        let exchange = quote["exchange"].as_str().unwrap_or("");
-        let currency = quote["currency"].as_str().unwrap_or("USD");
-        let short_name = quote["shortName"].as_str().unwrap_or("");
-
-        // Optional: extra sanity — make sure it's not a bond masquerading as equity
-        if short_name.to_uppercase().contains("BOND") ||
-           short_name.to_uppercase().contains("NOTE") ||
-           short_name.to_uppercase().contains("DEBENTURE") {
-            return Err(anyhow::anyhow!("Name suggests debt security"));
-        }
-        if !exchange.is_empty() {
-            return Ok(TickerInfo {
-                ticker: ticker.to_string(),
-                exchange_mic: exchange.to_string(),
-                currency: currency.to_string(),
-                primary: false,
-            });
-        }
-    }
-    Err(anyhow::anyhow!("Invalid or missing data for {}", ticker))
-}
+    if potential.is_empty() {
+        return Ok(vec![]);
+    }
+
+    // Enrich with OpenFIGI API
+    let client = OpenFigiClient::new()?;
+
+    let mut discovered_figis = Vec::new();
+
+    if !client.has_key() {
+        // Fallback without API key - create FigiInfo with default/empty fields
+        for (symbol, info) in potential {
+            println!("    Found equity listing: {} on {} ({}) - no FIGI (fallback mode)", symbol, info.exchange_mic, info.currency);
+            let figi_info = FigiInfo {
+                isin: info.isin,
+                figi: String::new(),
+                name: info.name,
+                ticker: symbol,
+                mic_code: info.exchange_mic,
+                currency: info.currency,
+                compositeFIGI: String::new(),
+                securityType: String::new(),
+                marketSector: String::new(),
+                shareClassFIGI: String::new(),
+                securityType2: String::new(),
+                securityDescription: String::new(),
+            };
+            discovered_figis.push(figi_info);
+        }
+        return Ok(discovered_figis);
+    }
+
+    // With API key, batch the mapping requests
+    let chunk_size = 100;
+    for chunk in potential.chunks(chunk_size) {
+        let mut jobs = vec![];
+        for (symbol, info) in chunk {
+            jobs.push(json!({
+                "idType": "TICKER",
+                "idValue": symbol,
+                "micCode": info.exchange_mic,
+                "marketSecDes": "Equity",
+            }));
+        }
+
+        let resp = client.get_figi_client()
+            .post("https://api.openfigi.com/v3/mapping")
+            .header("Content-Type", "application/json")
+            .json(&jobs)
+            .send()
+            .await?;
+
+        if !resp.status().is_success() {
+            return Err(anyhow::anyhow!("OpenFIGI mapping failed with status: {}", resp.status()));
+        }
+
+        let parsed: Vec<Value> = resp.json().await?;
+        for (i, item) in parsed.iter().enumerate() {
+            let (symbol, info) = &chunk[i];
+            if let Some(data) = item["data"].as_array() {
+                if let Some(entry) = data.first() {
+                    let market_sec = entry["marketSector"].as_str().unwrap_or("");
+                    if market_sec != "Equity" {
+                        continue;
+                    }
+                    println!("    Found equity listing: {} on {} ({}) - FIGI: {}", symbol, info.exchange_mic, info.currency, entry["figi"]);
+                    let figi_info = FigiInfo {
+                        isin: info.isin.clone(),
+                        figi: entry["figi"].as_str().unwrap_or("").to_string(),
+                        name: entry["name"].as_str().unwrap_or(&info.name).to_string(),
+                        ticker: symbol.clone(),
+                        mic_code: info.exchange_mic.clone(),
+                        currency: info.currency.clone(),
+                        compositeFIGI: entry["compositeFIGI"].as_str().unwrap_or("").to_string(),
+                        securityType: entry["securityType"].as_str().unwrap_or("").to_string(),
+                        marketSector: market_sec.to_string(),
+                        shareClassFIGI: entry["shareClassFIGI"].as_str().unwrap_or("").to_string(),
+                        securityType2: entry["securityType2"].as_str().unwrap_or("").to_string(),
+                        securityDescription: entry["securityDescription"].as_str().unwrap_or("").to_string(),
+                    };
+                    discovered_figis.push(figi_info);
+                } else {
+                    println!("    No data returned for ticker {} on MIC {}", symbol, info.exchange_mic);
+                }
+            } else if let Some(error) = item["error"].as_str() {
+                println!("    OpenFIGI error for ticker {}: {}", symbol, error);
+            }
+        }
+
+        // Respect rate limit (6 seconds between requests with key)
+        sleep(TokioDuration::from_secs(6)).await;
+    }
+
+    Ok(discovered_figis)
+}
+
+/// Check if a ticker exists on Yahoo Finance and return core metadata.
+///
+/// This function calls the public Yahoo Finance quoteSummary endpoint and extracts:
+/// - ISIN (when available)
+/// - Company name
+/// - Exchange MIC code
+/// - Trading currency
+///
+/// It strictly filters to only accept **equity** securities.
+///
+/// # Arguments
+/// * `ticker` - The ticker symbol to validate (e.g., "AAPL", "7203.T", "BMW.DE")
+///
+/// # Returns
+/// `Ok(PrimaryInfo)` on success, `Err` if the ticker doesn't exist, is not equity, or the data is malformed.
+///
+/// # Errors
+/// - Ticker not found
+/// - Not an equity (ETF, bond, etc.)
+/// - Missing critical fields
+/// - Network or JSON parsing errors
+pub async fn check_ticker_exists(ticker: &str) -> anyhow::Result<PrimaryInfo> {
+    let url = format!(
+        "https://query1.finance.yahoo.com/v10/finance/quoteSummary/{}?modules=price%2CassetProfile",
+        ticker
+    );
+
+    let resp = match HttpClient::new()
+        .get(&url)
+        .header("User-Agent", USER_AGENT)
+        .send()
+        .await
+    {
+        Ok(resp) => resp,
+        Err(err) => {
+            return Err(anyhow::anyhow!(
+                "Failed to reach Yahoo Finance for ticker {}: {}",
+                ticker,
+                err
+            ));
+        }
+    };
+
+    if !resp.status().is_success() {
+        return Err(anyhow::anyhow!("Yahoo returned HTTP {} for ticker {}", resp.status(), ticker));
+    }
+
+    let json: Value = match resp.json().await {
+        Ok(resp) => resp,
+        Err(err) => {
+            return Err(anyhow::anyhow!(
+                "Failed to parse JSON response from Yahoo Finance {}: {}",
+                ticker,
+                err
+            ));
+        }
+    };
+
+    let result_array = json["quoteSummary"]["result"]
+        .as_array()
+        .ok_or_else(|| anyhow::anyhow!("Missing 'quoteSummary.result' in response"))?;
+
+    if result_array.is_empty() || result_array[0].is_null() {
+        return Err(anyhow::anyhow!("No quote data returned for ticker {}", ticker));
+    }
+
+    let quote = &result_array[0]["price"];
+    let profile = &result_array[0]["assetProfile"];
+
+    // === 1. Must be EQUITY ===
+    let quote_type = quote["quoteType"]
+        .as_str()
+        .unwrap_or("")
+        .to_ascii_uppercase();
+    if quote_type != "EQUITY" {
+        println!("    → Skipping {} (quoteType: {})", ticker, quote_type);
+        return Err(anyhow::anyhow!("Not an equity security: {}", quote_type));
+    }
+
+    // === 2. Extract basic info ===
+    let long_name = quote["longName"]
+        .as_str()
+        .or_else(|| quote["shortName"].as_str())
+        .unwrap_or(ticker)
+        .trim()
+        .to_string();
+
+    let currency = quote["currency"]
+        .as_str()
+        .unwrap_or("USD")
+        .to_string();
+
+    let exchange_mic = quote["exchange"]
+        .as_str()
+        .unwrap_or("")
+        .to_string();
+    if exchange_mic.is_empty() {
+        return Err(anyhow::anyhow!("Missing exchange MIC for ticker {}", ticker));
+    }
+
+    // === 3. Extract ISIN (from assetProfile if available) ===
+    let isin = profile["isin"]
+        .as_str()
+        .and_then(|s| if s.len() == 12 && s.chars().all(|c| c.is_ascii_alphanumeric()) { Some(s) } else { None })
+        .unwrap_or("")
+        .to_ascii_uppercase();
+
+    // === 4. Final sanity check: reject obvious debt securities ===
+    let name_upper = long_name.to_ascii_uppercase();
+    if name_upper.contains(" BOND") ||
+       name_upper.contains(" NOTE") ||
+       name_upper.contains(" DEBENTURE") ||
+       (name_upper.contains(" PREFERRED") && !name_upper.contains(" STOCK")) {
+        return Err(anyhow::anyhow!("Security name suggests debt instrument: {}", long_name));
+    }
+
+    println!(
+        "    → Valid equity: {} | {} | {} | ISIN: {}",
+        ticker,
+        long_name,
+        exchange_mic,
+        if isin.is_empty() { "N/A" } else { &isin }
+    );
+
+    Ok(PrimaryInfo {
+        isin,
+        name: long_name,
+        exchange_mic,
+        currency,
+    })
+}
 
 /// Convert Yahoo's exchange name to MIC code (best effort)
@@ -203,84 +332,150 @@ fn exchange_name_to_mic(name: &str) -> String {
     }.to_string()
 }
 
-pub async fn dismiss_yahoo_consent(client: &Client) -> anyhow::Result<()> {
-    let script = r#"
-        (() => {
-            const agree = document.querySelector('button[name="agree"]');
-            if (agree) {
-                agree.click();
-                return true;
-            }
-            return false;
-        })()
-    "#;
-
-    for _ in 0..10 {
-        let done: bool = client.execute(script, vec![]).await?.as_bool().unwrap_or(false);
-        if done {
-            break;
-        }
-        sleep(TokioDuration::from_millis(500)).await;
-    }
-    Ok(())
-}
-
-pub async fn fetch_earnings_history(client: &Client, ticker: &str) -> anyhow::Result<Vec<CompanyEvent>> {
-    let url = format!("https://finance.yahoo.com/calendar/earnings?symbol={}&offset=0&size=100", ticker);
-    client.goto(&url).await?;
-    dismiss_yahoo_consent(client).await?;
-
-    loop {
-        match client.find(Locator::XPath(r#"//button[contains(text(), 'Show More')]"#)).await {
-            Ok(btn) => {
-                btn.click().await?;
-                sleep(TokioDuration::from_secs(2)).await;
-            }
-            Err(_) => break,
-        }
-    }
-
-    let html = client.source().await?;
-    let document = Html::parse_document(&html);
-    let row_sel = Selector::parse("table tbody tr").unwrap();
-    let mut events = Vec::new();
-
-    for row in document.select(&row_sel) {
-        let cols: Vec<String> = row.select(&Selector::parse("td").unwrap())
-            .map(|td| td.text().collect::<Vec<_>>().join(" ").trim().to_string())
-            .collect();
-        if cols.len() < 6 { continue; }
-
-        let full_date = &cols[2];
-        let parts: Vec<&str> = full_date.split(" at ").collect();
-        let raw_date = parts[0].trim();
-        let time_str = if parts.len() > 1 { parts[1].trim() } else { "" };
-
-        let date = match parse_yahoo_date(raw_date) {
-            Ok(d) => d,
-            Err(_) => continue,
-        };
-
-        let eps_forecast = parse_float(&cols[3]);
-        let eps_actual = if cols[4] == "-" { None } else { parse_float(&cols[4]) };
-
-        let surprise_pct = if let (Some(f), Some(a)) = (eps_forecast, eps_actual) {
-            if f.abs() > 0.001 { Some((a - f) / f.abs() * 100.0) } else { None }
-        } else { None };
-
-        let time = if time_str.contains("PM") {
-            "AMC".to_string()
-        } else if time_str.contains("AM") {
-            "BMO".to_string()
-        } else {
-            "".to_string()
-        };
+/// Fetches earnings events for a ticker using a dedicated ScrapeTask.
+///
+/// This function creates and executes a ScrapeTask to navigate to the Yahoo Finance earnings calendar,
+/// reject cookies, and extract the events.
+///
+/// # Arguments
+/// * `ticker` - The stock ticker symbol.
+///
+/// # Returns
+/// A vector of CompanyEvent structs on success.
+///
+/// # Errors
+/// Returns an error if the task execution fails, e.g., chromedriver spawn or navigation issues.
+pub async fn fetch_earnings_with_pool(
+    ticker: &str,
+    pool: &Arc<ChromeDriverPool>,
+) -> anyhow::Result<Vec<CompanyEvent>> {
+    let ticker = ticker.to_string();
+    let url = format!("https://finance.yahoo.com/calendar/earnings?symbol={}", ticker);
+
+    let ticker_cloned = ticker.clone();
+
+    pool.execute(url, move |client| {
+        let ticker = ticker_cloned.clone();
+        Box::pin(async move {
+            reject_yahoo_cookies(&client).await?;
+            extract_earnings_events(&client, &ticker).await
+        })
+    }).await
+}
+
+/// Extracts earnings events from the currently loaded Yahoo Finance earnings calendar page.
+///
+/// This function assumes the client is already navigated to the correct URL (e.g.,
+/// https://finance.yahoo.com/calendar/earnings?symbol={ticker}) and cookies are handled.
+///
+/// It waits for the earnings table, extracts rows, parses cells into CompanyEvent structs,
+/// and handles date parsing, float parsing, and optional fields.
+///
+/// # Arguments
+/// * `client` - The fantoccini Client with the page loaded.
+/// * `ticker` - The stock ticker symbol for the events.
+///
+/// # Returns
+/// A vector of CompanyEvent on success.
+///
+/// # Errors
+/// Returns an error if:
+/// - Table or elements not found.
+/// - Date or float parsing fails.
+/// - WebDriver operations fail.
+///
+/// # Examples
+///
+/// ```ignore
+/// use crate::corporate::scraper::extract_earnings_events;
+///
+/// // Assume `client` is a fantoccini::Client already navigated to the earnings page
+/// let events = extract_earnings_events(&client, "AAPL").await?;
+/// ```
+pub async fn extract_earnings_events(client: &Client, ticker: &str) -> Result<Vec<CompanyEvent>> {
+    // Wait for the table to load
+    let table = client
+        .wait()
+        .for_element(Locator::Css(r#"table[data-test="cal-table"]"#))
+        .await
+        .map_err(|e| anyhow!("Failed to find earnings table: {}", e))?;
+
+    // Find all rows in tbody
+    let rows = table
+        .find_all(Locator::Css("tbody tr"))
+        .await
+        .map_err(|e| anyhow!("Failed to find table rows: {}", e))?;
+
+    let mut events = Vec::with_capacity(rows.len());
+
+    for row in rows {
+        let cells = row
+            .find_all(Locator::Css("td"))
+            .await
+            .map_err(|e| anyhow!("Failed to find cells in row: {}", e))?;
+
+        if cells.len() < 5 {
+            continue; // Skip incomplete rows
+        }
+
+        // Extract and parse date
+        let date_str = cells[0]
+            .text()
+            .await
+            .map_err(|e| anyhow!("Failed to get date text: {}", e))?;
+        let date = parse_yahoo_date(&date_str)
+            .map_err(|e| anyhow!("Failed to parse date '{}': {}", date_str, e))?
+            .format("%Y-%m-%d")
+            .to_string();
+
+        // Extract time, replace "Time Not Supplied" with empty
+        let time = cells[1]
+            .text()
+            .await
+            .map_err(|e| anyhow!("Failed to get time text: {}", e))?
+            .replace("Time Not Supplied", "");
+
+        // Extract period
+        let period = cells[2]
+            .text()
+            .await
+            .map_err(|e| anyhow!("Failed to get period text: {}", e))?;
+
+        // Parse EPS forecast
+        let eps_forecast_str = cells[3]
+            .text()
+            .await
+            .map_err(|e| anyhow!("Failed to get EPS forecast text: {}", e))?;
+        let eps_forecast = parse_float(&eps_forecast_str);
+
+        // Parse EPS actual
+        let eps_actual_str = cells[4]
+            .text()
+            .await
+            .map_err(|e| anyhow!("Failed to get EPS actual text: {}", e))?;
+        let eps_actual = parse_float(&eps_actual_str);
+
+        // Parse surprise % if available
+        let surprise_pct = if cells.len() > 5 {
+            let surprise_str = cells[5]
+                .text()
+                .await
+                .map_err(|e| anyhow!("Failed to get surprise text: {}", e))?;
+            parse_float(&surprise_str)
+        } else {
+            None
+        };
 
         events.push(CompanyEvent {
             ticker: ticker.to_string(),
-            date: date.format("%Y-%m-%d").to_string(),
+            date,
             time,
-            period: "".to_string(),
+            period,
             eps_forecast,
             eps_actual,
             revenue_forecast: None,
@@ -290,6 +485,12 @@ pub async fn fetch_earnings_history(client: &Client, ticker: &str) -> anyhow::Re
         });
     }
 
+    if events.is_empty() {
+        eprintln!("Warning: No earnings events extracted for ticker {}", ticker);
+    } else {
+        println!("Extracted {} earnings events for {}", events.len(), ticker);
+    }
+
     Ok(events)
 }
@@ -469,8 +670,8 @@ pub async fn _fetch_latest_gleif_isin_lei_mapping_url(client: &Client) -> anyho
 pub async fn download_isin_lei_csv() -> anyhow::Result<Option<String>> {
     let url = "https://mapping.gleif.org/api/v2/isin-lei/9315e3e3-305a-4e71-b062-46714740fa8d/download";
-    let zip_path = "data/isin_lei.zip";
-    let csv_path = "data/isin_lei.csv";
+    let zip_path = "data/gleif/isin_lei.zip";
+    let csv_path = "data/gleif/isin_lei.csv";
 
     if let Err(e) = std::fs::create_dir_all("data") {
         println!("Failed to create data directory: {e}");
@@ -613,57 +814,6 @@ pub async fn load_isin_lei_csv() -> anyhow::Result<HashMap<String, Vec<String>>>
     Ok(map)
 }
 
-pub async fn get_primary_isin_and_name(
-    client: &Client, // Pass your existing Selenium client
-    ticker: &str,
-) -> anyhow::Result<PrimaryInfo> {
-    // Navigate to the actual quote page (always works)
-    let quote_url = format!("https://finance.yahoo.com/quote/{}", ticker);
-    client.goto(&quote_url).await?;
-
-    // Dismiss overlays/banners (your function + guce-specific)
-    reject_yahoo_cookies(client).await?;
-
-    // Wait for page to load (key data elements)
-    sleep(TokioDuration::from_millis(2000)).await;
-
-    // Get page HTML and parse
-    let html = client.source().await?;
-    let document = Html::parse_document(&html);
-
-    // Selectors for key fields (tested on real Yahoo pages Nov 2025)
-    let name_sel = Selector::parse("h1[data-testid='qsp-price-header']").unwrap_or_else(|_| Selector::parse("h1").unwrap());
-    let isin_sel = Selector::parse("[data-testid='qsp-symbol'] + div [data-field='isin']").unwrap_or_else(|_| Selector::parse("[data-field='isin']").unwrap());
-    let exchange_sel = Selector::parse("[data-testid='qsp-market'] span").unwrap_or_else(|_| Selector::parse(".TopNav__Exchange").unwrap());
-    let currency_sel = Selector::parse("[data-testid='qsp-price'] span:contains('USD')").unwrap_or_else(|_| Selector::parse(".TopNav__Currency").unwrap()); // Adjust for dynamic
-
-    let name_elem = document.select(&name_sel).next().map(|e| e.text().collect::<String>().trim().to_string());
-    let isin_elem = document.select(&isin_sel).next().map(|e| e.text().collect::<String>().trim().to_uppercase());
-    let exchange_elem = document.select(&exchange_sel).next().map(|e| e.text().collect::<String>().trim().to_string());
-    let currency_elem = document.select(&currency_sel).next().map(|e| e.text().collect::<String>().trim().to_string());
-
-    let name = name_elem.unwrap_or_else(|| ticker.to_string());
-    let isin = isin_elem.unwrap_or_default();
-    let exchange_mic = exchange_elem.unwrap_or_default();
-    let currency = currency_elem.unwrap_or_else(|| "USD".to_string());
-
-    // Validate ISIN
-    let valid_isin = if isin.len() == 12 && isin.chars().all(|c| c.is_alphanumeric()) {
-        isin
-    } else {
-        "".to_string()
-    };
-
-    println!("  → Scraped {}: {} | ISIN: {} | Exchange: {}", ticker, name, valid_isin, exchange_mic);
-
-    Ok(PrimaryInfo {
-        isin: valid_isin,
-        name,
-        exchange_mic,
-        currency,
-    })
-}
-
 pub async fn reject_yahoo_cookies(client: &Client) -> anyhow::Result<()> {
     for _ in 0..10 {
         let clicked: bool = client

View File

@@ -1,5 +1,5 @@
 // src/corporate/storage.rs
-use super::{types::*, helpers::*, scraper::get_primary_isin_and_name};
+use super::{types::*, helpers::*};
 use crate::config;
 use tokio::fs;
@@ -102,17 +102,6 @@ pub async fn save_prices_for_ticker(ticker: &str, timeframe: &str, mut prices: V
     Ok(())
 }
 
-pub async fn _load_companies() -> Result<Vec<CompanyMetadata>, anyhow::Error> {
-    let path = Path::new("src/data/companies.json");
-    if !path.exists() {
-        println!("Missing companies.json file at src/data/companies.json");
-        return Ok(vec![]);
-    }
-    let content = fs::read_to_string(path).await?;
-    let companies: Vec<CompanyMetadata> = serde_json::from_str(&content)?;
-    Ok(companies)
-}
-
 pub fn get_company_dir(lei: &str) -> PathBuf {
     PathBuf::from("corporate_prices").join(lei)
 }
@@ -132,20 +121,6 @@ pub async fn ensure_company_dirs(isin: &str) -> anyhow::Result<()> {
     Ok(())
 }
 
-pub async fn save_company_metadata(company: &CompanyMetadata) -> anyhow::Result<()> {
-    let dir = get_company_dir(&company.lei);
-    fs::create_dir_all(&dir).await?;
-    let path = dir.join("metadata.json");
-    fs::write(&path, serde_json::to_string_pretty(company)?).await?;
-    Ok(())
-}
-
-pub async fn load_company_metadata(lei: &str) -> anyhow::Result<CompanyMetadata> {
-    let path = get_company_dir(lei).join("metadata.json");
-    let content = fs::read_to_string(path).await?;
-    Ok(serde_json::from_str(&content)?)
-}
-
 pub async fn save_available_exchanges(isin: &str, exchanges: Vec<AvailableExchange>) -> anyhow::Result<()> {
     let dir = get_company_dir(isin);
     fs::create_dir_all(&dir).await?;
@@ -210,18 +185,28 @@ pub async fn update_available_exchange(
 }
 
 /// Add a newly discovered exchange before fetching
+///
+/// # Arguments
+/// * `isin` - The ISIN associated with the exchange.
+/// * `figi_info` - The FigiInfo containing ticker, mic_code, and currency.
+///
+/// # Returns
+/// Ok(()) on success.
+///
+/// # Errors
+/// Returns an error if loading or saving available exchanges fails.
 pub async fn add_discovered_exchange(
     isin: &str,
-    ticker_info: &TickerInfo,
+    figi_info: &FigiInfo,
 ) -> anyhow::Result<()> {
     let mut exchanges = load_available_exchanges(isin).await?;
 
     // Only add if not already present
-    if !exchanges.iter().any(|e| e.ticker == ticker_info.ticker) {
+    if !exchanges.iter().any(|e| e.ticker == figi_info.ticker && e.exchange_mic == figi_info.mic_code) {
         let new_entry = AvailableExchange::new(
-            ticker_info.ticker.clone(),
-            ticker_info.exchange_mic.clone(),
-            ticker_info.currency.clone(),
+            figi_info.ticker.clone(),
+            figi_info.mic_code.clone(),
+            figi_info.currency.clone(),
         );
         exchanges.push(new_entry);
         save_available_exchanges(isin, exchanges).await?;

View File

@@ -1,3 +1,5 @@
+use std::collections::HashMap;
+
 // src/corporate/types.rs
 use serde::{Deserialize, Serialize};
@@ -39,21 +41,78 @@ pub struct CompanyEventChange {
     pub detected_at: String,
 }
 
+/// Figi Info based on API calls [https://www.openfigi.com/]
+/// # Attributes
+/// * isin: ISIN belonging to this legal entity from the LEI
+///
+/// # Comments
+/// Used for mapping the OpenFIGI object list onto Figi properties
 #[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct TickerInfo {
+pub struct FigiInfo {
+    pub isin: String,
+    pub figi: String,
+    pub name: String,
     pub ticker: String,
-    pub exchange_mic: String,
+    pub mic_code: String,
     pub currency: String,
-    pub isin: String, // ISIN belonging to this legal entity (primary + ADR + GDR)
+    pub compositeFIGI: String,
+    pub securityType: String,
+    pub marketSector: String,
+    pub shareClassFIGI: String,
+    pub securityType2: String,
+    pub securityDescription: String,
 }
 
+/// Company Meta Data
+/// # Attributes
+/// * lei: Structuring the companies by legal dependencies [LEI -> Vec<ISIN>]
+/// * figi: metadata with ISIN as key
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct CompanyMetadata {
     pub lei: String,
-    pub figi: Option<String>,
+    pub figi: Option<Vec<FigiInfo>>,
+}
+
+/// Company Info
+/// # Attributes
+/// * name: primary key (for one institution) -> may have to change when the first FigiInfo comes in
+/// * primary_isin: the most liquid / preferred traded security (used for fallback)
+/// * securities: grouped by ISIN, filtered for Common Stock only
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CompanyInfo {
     pub name: String,
-    pub primary_isin: String, // The most liquid / preferred one (used for folder fallback)
-    pub tickers: Vec<TickerInfo>,
+    pub primary_isin: String,
+    pub securities: HashMap<String, Vec<FigiInfo>>, // ISIN -> Vec<FigiInfo>
+}
+
+/// Warrant Info
+///
+/// Information for warrant securities parsed out of the name in FigiInfo.
+/// example1: "name": "VONTOBE-PW26 LEONARDO SPA"
+///   -> put warrant issued by VONTOBEL on underlying company LEONARDO SPA
+/// example2: "BAYER H-CW25 L'OREAL"
+/// For other formats naming only one company instead of two, underlying and issuing company are the same; leave issuer_company_name NULL.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct WarrantInfo {
+    pub underlying_company_name: String, // key in CompanyInfo, key for WarrantInfo
+    pub issuer_company_name: Option<String>, // key in CompanyInfo
+    pub warrant_type: String, // "put" or "call"
+    pub warrants: HashMap<String, Vec<FigiInfo>>, // ISIN -> Vec<FigiInfo> (grouped by ISIN)
+}
+
+/// Option Info
+///
+/// Information for option securities parsed out of the name in FigiInfo.
+/// example1: "name": "December 25 Calls on ALPHA GA"
+///   -> call option (no issuer named) on underlying company ALPHA GA
+/// For other formats naming only one company instead of two, underlying and issuing company are the same; leave issuer_company_name NULL.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct OptionInfo {
+    pub underlying_company_name: String, // key in CompanyInfo, key for OptionInfo
+    pub issuer_company_name: Option<String>, // key in CompanyInfo
+    pub option_type: String, // "put" or "call"
+    pub options: HashMap<String, Vec<FigiInfo>>, // ISIN -> Vec<FigiInfo> (grouped by ISIN)
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]

View File

@@ -1,187 +1,100 @@
 // src/corporate/update.rs
 use super::{scraper::*, storage::*, helpers::*, types::*, aggregation::*, openfigi::*};
 use crate::config::Config;
+use crate::scraper::webdriver::ChromeDriverPool;
 use chrono::Local;
-use std::collections::HashMap;
+use std::collections::{HashMap};
+use std::sync::Arc;
 
-pub async fn run_full_update(client: &fantoccini::Client, config: &Config) -> anyhow::Result<()> {
-    println!("Starting LEI-based corporate update");
+/// Main function: Full update for all companies (LEI-based) with optimized parallel execution.
+///
+/// This function coordinates the entire update process:
+/// - Loads GLEIF mappings
+/// - Builds FIGI-LEI map
+/// - Loads existing events
+/// - Processes each company: discovers exchanges via FIGI, fetches prices & earnings, aggregates data
+/// - Uses the provided shared ChromeDriver pool for efficient parallel scraping
+/// - Saves optimized events
+///
+/// # Arguments
+/// * `config` - The application configuration.
+/// * `pool` - Shared pool of ChromeDriver instances for scraping.
+///
+/// # Errors
+/// Returns an error if any step in the update process fails.
+pub async fn run_full_update(config: &Config, pool: &Arc<ChromeDriverPool>) -> anyhow::Result<()> {
+    println!("=== Starting LEI-based corporate full update ===");
 
-    // 1. Download fresh GLEIF ISIN→LEI mapping on every run
+    // 1. Load fresh GLEIF ISIN→LEI mapping
     let lei_to_isins: HashMap<String, Vec<String>> = match load_isin_lei_csv().await {
         Ok(map) => map,
         Err(e) => {
-            println!("Warning: Failed to load ISIN↔LEI mapping: {}", e);
+            eprintln!("Warning: Could not load GLEIF ISIN↔LEI mapping: {}", e);
             HashMap::new()
         }
     };
 
-    let figi_to_lei: HashMap<String, String> = match build_figi_to_lei_map(&lei_to_isins).await {
+    // 2. Load OpenFIGI mapping value lists (cached)
+    if let Err(e) = load_figi_type_lists().await {
+        eprintln!("Warning: Could not load OpenFIGI type lists: {}", e);
+    }
+
+    // 3. Build FIGI → LEI map
+    // # Attributes
+    // * lei: Structuring the companies by legal dependencies [LEI -> Vec<ISIN>]
+    // * figi: metadata with ISIN as key
+    let figi_to_lei: HashMap<String, Vec<FigiInfo>> = match build_lei_to_figi_infos(&lei_to_isins).await {
         Ok(map) => map,
         Err(e) => {
-            println!("Warning: Failed to build FIGI→LEI map: {}", e);
+            eprintln!("Warning: Could not build FIGI→LEI map: {}", e);
             HashMap::new()
         }
     };
 
-    let today = chrono::Local::now().format("%Y-%m-%d").to_string();
-    let mut existing_events = load_existing_events().await?;
+    // 4. Load or build companies
+    let mut companies = load_or_build_all_securities(&figi_to_lei).await?;
+    println!("Processing {} companies", companies.0.len());
 
-    let mut companies: Vec<CompanyMetadata> = match load_or_build_companies_figi(&lei_to_isins, &figi_to_lei).await {
-        Ok(comps) => comps,
+    // 5. Load existing earnings events (for change detection)
+    let today = Local::now().format("%Y-%m-%d").to_string();
+    let mut existing_events = match load_existing_events().await {
+        Ok(events) => events,
         Err(e) => {
-            println!("Error loading/building company metadata: {}", e);
-            return Err(e);
+            eprintln!("Warning: Could not load existing events: {}", e);
+            HashMap::new()
         }
-    }; // Vec<CompanyMetadata> with lei, isins, tickers
+    };
 
-    for mut company in companies {
-        println!("\nProcessing company: {} (LEI: {})", company.name, company.lei);
+    // 6. Use the provided pool (no need to create a new one)
+    let pool_size = pool.get_number_of_instances(); // Use the size from the shared pool
 
-        // === Enrich with ALL ISINs known to GLEIF (includes ADRs, GDRs, etc.) ===
-        if let Some(all_isins) = lei_to_isins.get(&company.lei) {
-            let mut seen = company.isins.iter().cloned().collect::<std::collections::HashSet<_>>();
-            for isin in all_isins {
-                if !seen.contains(isin) {
-                    company.isins.push(isin.clone());
-                    seen.insert(isin.clone());
-                }
-            }
-        }
+    // Process companies in parallel using the shared pool
+    /*let results: Vec<_> = stream::iter(companies.into_iter())
+        .map(|company| {
+            let pool_clone = pool.clone();
+            async move {
+                process_company_data(&company, &pool_clone, &mut existing_events).await
+            }
+        })
+        .buffer_unordered(pool_size)
+        .collect().await;
 
-        // Ensure company directory exists (now uses LEI)
-        //let figi_dir = format!("data/companies_by_figi/{}/", company.primary_figi);
-        ensure_company_dirs(&company.lei).await?;
-        save_company_metadata(&company).await?;
+    // Handle results (e.g., collect changes)
+    let mut all_changes = Vec::new();
+    for result in results {
+        if let Ok(ProcessResult { changes }) = result {
+            all_changes.extend(changes);
+        }
+    }*/
 
-        // === STEP 1: Discover additional exchanges using each known ISIN ===
-        let mut all_tickers = company.tickers.clone();
-        if let Some(primary_ticker) = company.tickers.iter().find(|t| t.primary) {
-            println!("  Discovering additional exchanges across {} ISIN(s)...", company.isins.len());
-            for isin in &company.isins {
-                println!("    → Checking ISIN: {}", isin);
-                match discover_available_exchanges(isin, &primary_ticker.ticker).await {
-                    Ok(discovered) => {
-                        if discovered.is_empty() {
-                            println!("      No new exchanges found for {}", isin);
-                        } else {
-                            for disc in discovered {
-                                if !all_tickers.iter().any(|t| t.ticker == disc.ticker && t.exchange_mic == disc.exchange_mic) {
-                                    println!("      New equity listing → {} ({}) via ISIN {}",
-                                        disc.ticker, disc.exchange_mic, isin);
-                                    all_tickers.push(disc);
-                                }
-                            }
-                        }
-                    }
-                    Err(e) => println!("      Discovery failed for {}: {}", isin, e),
-                }
-                tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;
-            }
-        }
-
-        // Save updated metadata if we found new listings
-        if all_tickers.len() > company.tickers.len() {
-            company.tickers = all_tickers.clone();
-            save_company_metadata(&company).await?;
-            println!("  Updated metadata: {} total tickers", all_tickers.len());
-        }
-
-        // === STEP 2: Fetch data from ALL available tickers ===
-        for ticker_info in &all_tickers {
-            let ticker = &ticker_info.ticker;
-            println!("  → Fetching: {} ({})", ticker, ticker_info.exchange_mic);
-
-            let mut daily_success = false;
-            let mut intraday_success = false;
-
-            // Earnings: only fetch from primary ticker to avoid duplicates
-            if ticker_info.primary {
-                if let Ok(new_events) = fetch_earnings_history(client, ticker).await {
-                    let result = process_batch(&new_events, &mut existing_events, &today);
-                    save_changes(&result.changes).await?;
-                    println!("    Earnings events: {}", new_events.len());
-                }
-            }
-
-            // Daily prices
-            if let Ok(prices) = fetch_daily_price_history(ticker, &config.corporate_start_date, &today).await {
-                if !prices.is_empty() {
-                    save_prices_by_source(&company.lei, ticker, "daily", prices).await?;
-                    daily_success = true;
-                }
-            }
-
-            // 5-minute intraday (last 60 days)
-            let sixty_days_ago = (chrono::Local::now() - chrono::Duration::days(60))
-                .format("%Y-%m-%d")
-                .to_string();
-            if let Ok(prices) = fetch_price_history_5min(ticker, &sixty_days_ago, &today).await {
-                if !prices.is_empty() {
-                    save_prices_by_source(&company.lei, ticker, "5min", prices).await?;
-                    intraday_success = true;
-                }
-            }
-
-            // Update available_exchanges.json (now under LEI folder)
-            update_available_exchange(
-                &company.lei,
-                ticker,
-                &ticker_info.exchange_mic,
-                daily_success,
-                intraday_success,
-            ).await?;
-
-            tokio::time::sleep(tokio::time::Duration::from_millis(800)).await;
-        }
-
-        // === STEP 3: Aggregate all sources into unified USD prices ===
-        println!("  Aggregating multi-source price data (FX-adjusted)...");
-        if let Err(e) = aggregate_best_price_data(&company.lei).await {
-            println!("    Aggregation failed: {}", e);
-        } else {
-            println!("    Aggregation complete");
-        }
-    }
-
-    // Final save of optimized earnings events
     save_optimized_events(existing_events).await?;
-    println!("\nCorporate update complete (LEI-based)");
+    //save_changes(&all_changes).await?;
+    //println!("Corporate update complete — {} changes detected", all_changes.len());
     Ok(())
 }
 
-async fn enrich_companies_with_leis(
-    companies: &mut Vec<CompanyMetadata>,
-    lei_to_isins: &HashMap<String, Vec<String>>,
-) {
-    for company in companies.iter_mut() {
-        if company.lei.is_empty() {
-            // Try to find LEI by any known ISIN
-            for isin in &company.isins {
-                for (lei, isins) in lei_to_isins {
-                    if isins.contains(isin) {
-                        company.lei = lei.clone();
-                        println!("Found real LEI {} for {}", lei, company.name);
-                        break;
-                    }
-                }
-                if !company.lei.is_empty() { break; }
-            }
-        }
-        // Fallback: generate fake LEI if still missing
-        if company.lei.is_empty() {
-            company.lei = format!("FAKE{:019}", rand::random::<u64>());
-            println!("No real LEI found → using fake for {}", company.name);
-        }
-    }
-}
-
 pub struct ProcessResult {
     pub changes: Vec<CompanyEventChange>,
 }

View File

@@ -1,58 +0,0 @@
[
{
"lei": "8I5D5ASD7N5Z5P2K9M3J",
"isins": ["US46625H1005"],
"primary_isin": "US46625H1005",
"name": "JPMorgan Chase & Co.",
"tickers": [
{ "ticker": "JPM", "exchange_mic": "XNYS", "currency": "USD", "primary": true },
{ "ticker": "JPM-PC", "exchange_mic": "XNYS", "currency": "USD", "primary": false }
]
},
{
"lei": "5493001KJTIIGC8Y1R12",
"isins": ["US5949181045"],
"primary_isin": "US5949181045",
"name": "Microsoft Corporation",
"tickers": [
{ "ticker": "MSFT", "exchange_mic": "XNAS", "currency": "USD", "primary": true }
]
},
{
"lei": "529900T8BM49AURSDO55",
"isins": ["CNE000001P37"],
"primary_isin": "CNE000001P37",
"name": "Industrial and Commercial Bank of China",
"tickers": [
{ "ticker": "601398.SS", "exchange_mic": "XSHG", "currency": "CNY", "primary": true },
{ "ticker": "1398.HK", "exchange_mic": "XHKG", "currency": "HKD", "primary": false }
]
},
{
"lei": "519900X5W8K6C1FZ3B57",
"isins": ["JP3702200000"],
"primary_isin": "JP3702200000",
"name": "Toyota Motor Corporation",
"tickers": [
{ "ticker": "7203.T", "exchange_mic": "XJPX", "currency": "JPY", "primary": true },
{ "ticker": "TM", "exchange_mic": "XNYS", "currency": "USD", "primary": false }
]
},
{
"lei": "529900T8BM49AURSDO56",
"isins": ["HK0000069689"],
"primary_isin": "HK0000069689",
"name": "Tencent Holdings Limited",
"tickers": [
{ "ticker": "0700.HK", "exchange_mic": "XHKG", "currency": "HKD", "primary": true },
{ "ticker": "TCEHY", "exchange_mic": "OTCM", "currency": "USD", "primary": false }
]
},
{
"lei": "8I5D5Q1L7N5Z5P2K9M3J",
"isins": ["US90953F1049"],
"primary_isin": "US90953F1049",
"name": "Test Bonds Filter",
"tickers": [{ "ticker": "JPM", "exchange_mic": "XNYS", "currency": "USD", "primary": true }]
}
]

View File

@@ -1,9 +0,0 @@
[
"afrika",
"asien",
"europa",
"nordamerika",
"suedamerika",
"antarktis",
"ozeanien"
]

View File

@@ -1,54 +0,0 @@
[
"aegypten",
"frankreich",
"litauen",
"schweiz",
"argentinien",
"griechenland",
"mexiko",
"singapur",
"australien",
"hongkong",
"neuseeland",
"slowakei",
"bahrain",
"indien",
"niederlande",
"spanien",
"belgien",
"indonesien",
"norwegen",
"suedafrika",
"brasilien",
"irland",
"oesterreich",
"suedkorea",
"chile",
"island",
"peru",
"taiwan",
"china",
"italien",
"philippinen",
"tschechien",
"daenemark",
"japan",
"polen",
"tuerkei",
"deutschland",
"kanada",
"portugal",
"ungarn",
"estland",
"katar",
"rumaenien",
"usa",
"eurozone",
"kolumbien",
"russland",
"vereinigte-arabische-emirate",
"finnland",
"lettland",
"schweden",
"vereinigtes-koenigreich"
]

View File

@@ -1,260 +0,0 @@
{
"exchanges": [
{
"mic": "XNYS",
"name": "New York Stock Exchange",
"country": "United States",
"city": "New York City",
"market_cap_trillion_usd": 30.92,
"timezone": "America/New_York",
"tz_offset": "-05:00",
"dst": "MarNov",
"open_local": "09:30",
"close_local": "16:00",
"lunch_break": false,
"open_utc": "14:30",
"close_utc": "21:00",
"currency": "USD"
},
{
"mic": "XNAS",
"name": "Nasdaq",
"country": "United States",
"city": "New York City",
"market_cap_trillion_usd": 31.96,
"timezone": "America/New_York",
"tz_offset": "-05:00",
"dst": "MarNov",
"open_local": "09:30",
"close_local": "16:00",
"lunch_break": false,
"open_utc": "14:30",
"close_utc": "21:00",
"currency": "USD"
},
{
"mic": "XSHG",
"name": "Shanghai Stock Exchange",
"country": "China",
"city": "Shanghai",
"market_cap_trillion_usd": 7.96,
"timezone": "Asia/Shanghai",
"tz_offset": "+08:00",
"dst": null,
"open_local": "09:30",
"close_local": "15:00",
"lunch_break": "11:3013:00",
"open_utc": "01:30",
"close_utc": "07:00",
"currency": "CNY"
},
{
"mic": "XJPX",
"name": "Japan Exchange Group (Tokyo Stock Exchange)",
"country": "Japan",
"city": "Tokyo",
"market_cap_trillion_usd": 7.06,
"timezone": "Asia/Tokyo",
"tz_offset": "+09:00",
"dst": null,
"open_local": "09:00",
"close_local": "15:00",
"lunch_break": "11:3012:30",
"open_utc": "00:00",
"close_utc": "06:00",
"currency": "JPY"
},
{
"mic": "XHKG",
"name": "Hong Kong Stock Exchange",
"country": "Hong Kong",
"city": "Hong Kong",
"market_cap_trillion_usd": 6.41,
"timezone": "Asia/Hong_Kong",
"tz_offset": "+08:00",
"dst": null,
"open_local": "09:30",
"close_local": "16:00",
"lunch_break": "12:0013:00",
"open_utc": "01:30",
"close_utc": "08:00",
"currency": "HKD"
},
{
"mic": "XAMS",
"name": "Euronext Amsterdam",
"country": "Netherlands",
"city": "Amsterdam",
"market_cap_trillion_usd": 5.61,
"timezone": "Europe/Amsterdam",
"tz_offset": "+01:00",
"dst": "MarOct",
"open_local": "09:00",
"close_local": "17:30",
"lunch_break": false,
"open_utc": "08:00",
"close_utc": "16:30",
"currency": "EUR"
},
{
"mic": "XBSE",
"name": "Bombay Stock Exchange",
"country": "India",
"city": "Mumbai",
"market_cap_trillion_usd": 5.25,
"timezone": "Asia/Kolkata",
"tz_offset": "+05:30",
"dst": null,
"open_local": "09:15",
"close_local": "15:30",
"lunch_break": false,
"open_utc": "03:45",
"close_utc": "10:00",
"currency": "INR"
},
{
"mic": "XNSE",
"name": "National Stock Exchange of India",
"country": "India",
"city": "Mumbai",
"market_cap_trillion_usd": 5.32,
"timezone": "Asia/Kolkata",
"tz_offset": "+05:30",
"dst": null,
"open_local": "09:15",
"close_local": "15:d30",
"lunch_break": false,
"open_utc": "03:45",
"close_utc": "10:00",
"currency": "INR"
},
{
"mic": "XSHE",
"name": "Shenzhen Stock Exchange",
"country": "China",
"city": "Shenzhen",
"market_cap_trillion_usd": 5.11,
"timezone": "Asia/Shanghai",
"tz_offset": "+08:00",
"dst": null,
"open_local": "09:30",
"close_local": "15:00",
"lunch_break": "11:3013:00",
"open_utc": "01:30",
"close_utc": "07:00",
"currency": "CNY"
},
{
"mic": "XTSE",
"name": "Toronto Stock Exchange",
"country": "Canada",
"city": "Toronto",
"market_cap_trillion_usd": 4.00,
"timezone": "America/Toronto",
"tz_offset": "-05:00",
"dst": "MarNov",
"open_local": "09:30",
"close_local": "16:00",
"lunch_break": false,
"open_utc": "14:30",
"close_utc": "21:00",
"currency": "CAD"
},
{
"mic": "XLON",
"name": "London Stock Exchange",
"country": "United Kingdom",
"city": "London",
"market_cap_trillion_usd": 3.14,
"timezone": "Europe/London",
"tz_offset": "+00:00",
"dst": "MarOct",
"open_local": "08:00",
"close_local": "16:30",
"lunch_break": false,
"open_utc": "08:00",
"close_utc": "16:30",
"currency": "GBP"
},
{
"mic": "XTAI",
"name": "Taiwan Stock Exchange",
"country": "Taiwan",
"city": "Taipei",
"market_cap_trillion_usd": 2.87,
"timezone": "Asia/Taipei",
"tz_offset": "+08:00",
"dst": null,
"open_local": "09:00",
"close_local": "13:30",
"lunch_break": false,
"open_utc": "01:00",
"close_utc": "05:30",
"currency": "TWD"
},
{
"mic": "XSAU",
"name": "Saudi Exchange (Tadawul)",
"country": "Saudi Arabia",
"city": "Riyadh",
"market_cap_trillion_usd": 2.73,
"timezone": "Asia/Riyadh",
"tz_offset": "+03:00",
"dst": null,
"open_local": "10:00",
"close_local": "15:00",
"lunch_break": false,
"open_utc": "07:00",
"close_utc": "12:00",
"currency": "SAR"
},
{
"mic": "XFRA",
"name": "Deutsche Börse (Xetra)",
"country": "Germany",
"city": "Frankfurt",
"market_cap_trillion_usd": 2.04,
"timezone": "Europe/Berlin",
"tz_offset": "+01:00",
"dst": "MarOct",
"open_local": "09:00",
"close_local": "17:30",
"lunch_break": false,
"open_utc": "08:00",
"close_utc": "16:30",
"currency": "EUR"
},
{
"mic": "XSWX",
"name": "SIX Swiss Exchange",
"country": "Switzerland",
"city": "Zürich",
"market_cap_trillion_usd": 1.97,
"timezone": "Europe/Zurich",
"tz_offset": "+01:00",
"dst": "MarOct",
"open_local": "09:00",
"close_local": "17:30",
"lunch_break": false,
"open_utc": "08:00",
"close_utc": "16:30",
"currency": "CHF"
},
{
"mic": "XASX",
"name": "Australian Securities Exchange",
"country": "Australia",
"city": "Sydney",
"market_cap_trillion_usd": 1.89,
"timezone": "Australia/Sydney",
"tz_offset": "+10:00",
"dst": "OctApr",
"open_local": "10:00",
"close_local": "16:00",
"lunch_break": false,
"open_utc": "00:00",
"close_utc": "06:00",
"currency": "AUD"
}
]
}
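
Note two quirks in this schema: `dst` is null for markets without daylight saving, and `lunch_break` is heterogeneous (either `false` or a time-window string), so a plain `String` field will not deserialize it. Below is a minimal sketch of reading one record, assuming the serde/serde_json stack already used by the crate; the `Exchange` and `LunchBreak` names are hypothetical.

// Hypothetical reader for the exchange records above, assuming serde + serde_json.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum LunchBreak {
    Absent(bool),    // the data encodes "no break" as `false`
    Window(String),  // e.g. "12:00–13:00"
}

#[derive(Debug, Deserialize)]
struct Exchange {
    mic: String,
    name: String,
    country: String,
    city: String,
    market_cap_trillion_usd: f64,
    timezone: String,
    tz_offset: String,
    dst: Option<String>,    // null when the market observes no DST
    open_local: String,
    close_local: String,
    lunch_break: LunchBreak,
    open_utc: String,
    close_utc: String,
    currency: String,
}

fn main() -> serde_json::Result<()> {
    // One record copied from the data above; the full file wraps these in an object.
    let raw = r#"{ "mic": "XTAI", "name": "Taiwan Stock Exchange", "country": "Taiwan",
        "city": "Taipei", "market_cap_trillion_usd": 2.87, "timezone": "Asia/Taipei",
        "tz_offset": "+08:00", "dst": null, "open_local": "09:00", "close_local": "13:30",
        "lunch_break": false, "open_utc": "01:00", "close_utc": "05:30", "currency": "TWD" }"#;
    let x: Exchange = serde_json::from_str(raw)?;
    println!("{} opens {} UTC", x.name, x.open_utc);
    Ok(())
}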

View File

@@ -1,6 +0,0 @@
data/*
companies.json
continents.json
countries.json
exchanges.json

View File

@@ -1,7 +1,7 @@
 // src/economic/helpers.rs
 use super::types::*;
-use chrono::{Local, NaiveDate};
-use std::collections::{HashMap, HashSet};
+use chrono::{Local};
+use std::collections::{HashMap};
 pub fn event_key(e: &EconomicEvent) -> String {
     format!("{}|{}|{}", e.date, e.time, e.event)

View File

@@ -5,7 +5,4 @@ pub mod storage;
 pub mod update;
 pub mod helpers;
-pub use types::*;
-pub use scraper::*;
 pub use update::run_full_update;
-pub use helpers::*;

View File

@@ -1,24 +1,23 @@
 // src/economic/scraper.rs
-use super::types::{EconomicEvent, ScrapeResult};
+use super::types::{EconomicEvent};
 use fantoccini::Client;
 use tokio::time::{sleep, Duration};
-use chrono::{Local, NaiveDate};
 const EXTRACTION_JS: &str = include_str!("extraction_script.js");
 pub async fn goto_and_prepare(client: &Client) -> anyhow::Result<()> {
     client.goto("https://www.finanzen.net/termine/wirtschaftsdaten/").await?;
-    dismiss_overlays(client).await?;
-    if let Ok(tab) = client.find(fantoccini::Locator::Css(r#"div[data-sg-tab-item="teletrader-dates-three-stars"]"#)).await {
+    //dismiss_overlays(client).await?;
+    /*if let Ok(tab) = client.find(fantoccini::Locator::Css(r#"div[data-sg-tab-item="teletrader-dates-three-stars"]"#)).await {
         tab.click().await?;
         println!("High importance tab selected");
         sleep(Duration::from_secs(2)).await;
-    }
+    }*/
     Ok(())
 }
-pub async fn dismiss_overlays(client: &Client) -> anyhow::Result<()> {
+/*pub async fn dismiss_overlays(client: &Client) -> anyhow::Result<()> {
     for _ in 0..10 {
         let removed: bool = client
             .execute(
@@ -39,7 +38,7 @@ pub async fn dismiss_overlays(client: &Client) -> anyhow::Result<()> {
         sleep(Duration::from_millis(500)).await;
     }
     Ok(())
-}
+}*/
 pub async fn set_date_range(client: &Client, start: &str, end: &str) -> anyhow::Result<()> {
     let script = format!(

View File

@@ -2,12 +2,11 @@
 use super::types::*;
 use super::helpers::*;
 use tokio::fs;
-use chrono::{Local, NaiveDate, Datelike};
+use chrono::{NaiveDate, Datelike};
 use std::collections::HashMap;
-use std::path::Path;
 pub async fn scan_existing_chunks() -> anyhow::Result<Vec<ChunkInfo>> {
-    let dir = std::path::Path::new("economic_events");
+    let dir = std::path::Path::new("data/economic/events");
     let mut chunks = Vec::new();
     if dir.exists() {
@@ -46,7 +45,7 @@ pub async fn load_existing_events(chunks: &[ChunkInfo]) -> anyhow::Result<HashMa
 }
 pub async fn save_optimized_chunks(events: HashMap<String, EconomicEvent>) -> anyhow::Result<()> {
-    let dir = std::path::Path::new("economic_events");
+    let dir = std::path::Path::new("data/economic/events");
     fs::create_dir_all(dir).await?;
     // Delete all old chunk files to prevent duplicates and overlaps
@@ -113,9 +112,3 @@ pub async fn save_changes(changes: &[EventChange]) -> anyhow::Result<()> {
     }
     Ok(())
 }
-pub fn target_end_date() -> String {
-    let now = Local::now().naive_local().date();
-    let future = now + chrono::Duration::days(90);
-    future.format("%Y-%m-%d").to_string()
-}

View File

@@ -1,9 +1,19 @@
 // src/economic/update.rs
 use super::{scraper::*, storage::*, helpers::*, types::*};
-use crate::config::Config;
-use chrono::{Local, NaiveDate};
+use crate::{config::Config, scraper::webdriver::ScrapeTask};
+use crate::scraper::webdriver::ChromeDriverPool;
+use chrono::{Local};
+use std::sync::Arc;
-pub async fn run_full_update(client: &fantoccini::Client, config: &Config) -> anyhow::Result<()> {
+/// Runs the full update for economic data, using the provided ChromeDriver pool.
+///
+/// # Arguments
+/// * `config` - The application configuration.
+/// * `pool` - Shared pool of ChromeDriver instances for scraping.
+///
+/// # Errors
+/// Returns an error if scraping, loading, or saving fails.
+pub async fn run_full_update(config: &Config, pool: &Arc<ChromeDriverPool>) -> anyhow::Result<()> {
     let today_str = chrono::Local::now().date_naive().format("%Y-%m-%d").to_string();
     let end_date = config.target_end_date();
@@ -26,34 +36,66 @@ pub async fn run_full_update(client: &fantoccini::Client, config: &Config) -> an
     println!("Scraping economic events: {} — {}", start_date, end_date);
-    let mut current = start_date;
-    let mut total_changes = 0;
-    while current <= end_date {
-        set_date_range(client, &current, &end_date).await?;
-        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
-        let new_events = extract_events(client).await?;
-        if new_events.is_empty() { break; }
-        let result = process_batch(&new_events, &mut events, &today_str);
-        total_changes += result.changes.len();
+    // Pass the pool to the scraping function
+    let new_events_all = scrape_all_economic_events(&start_date, &end_date, pool).await?;
+    // Process all at once or in batches
+    let result = process_batch(&new_events_all, &mut events, &today_str);
+    let total_changes = result.changes.len();
     save_changes(&result.changes).await?;
+    save_optimized_chunks(events).await?;
+    println!("Economic update complete — {} changes detected", total_changes);
+    Ok(())
+}
+/// Scrapes all economic events from start to end date using a dedicated ScrapeTask with the provided pool.
+///
+/// This function creates a ScrapeTask to navigate to the Finanzen.net page, prepare it,
+/// and then loop through date ranges to extract events.
+///
+/// # Arguments
+/// * `start` - Start date in YYYY-MM-DD.
+/// * `end` - End date in YYYY-MM-DD.
+/// * `pool` - Shared pool of ChromeDriver instances.
+///
+/// # Returns
+/// A vector of all extracted EconomicEvent structs.
+///
+/// # Errors
+/// Returns an error if task execution fails or extraction issues occur.
+pub async fn scrape_all_economic_events(start: &str, end: &str, pool: &Arc<ChromeDriverPool>) -> anyhow::Result<Vec<EconomicEvent>> {
+    let url = "https://www.finanzen.net/termine/wirtschaftsdaten/".to_string();
+    let start_clone = start.to_string();
+    let end_clone = end.to_string();
+    let task = ScrapeTask::new(url, move |client| async move {
+        goto_and_prepare(&client).await?;
+        let mut all_events = Vec::new();
+        let mut current = start_clone;
+        while current <= end_clone {
+            set_date_range(&client, &current, &end_clone).await?;
+            tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
+            let new_events = extract_events(&client).await?;
+            if new_events.is_empty() { break; }
+            all_events.extend(new_events.clone());
             let next = new_events.iter()
                 .filter_map(|e| chrono::NaiveDate::parse_from_str(&e.date, "%Y-%m-%d").ok())
                 .max()
                 .and_then(|d| d.succ_opt())
                 .map(|d| d.format("%Y-%m-%d").to_string())
-                .unwrap_or(end_date.clone());
-            if next > end_date { break; }
+                .unwrap_or(end_clone.clone());
+            if next > end_clone { break; }
             current = next;
         }
+        Ok(all_events)
+    });
-    save_optimized_chunks(events).await?;
-    println!("Economic update complete — {} changes detected", total_changes);
-    Ok(())
+    // Use the pool for execution
+    task.execute_with_pool(pool).await
 }
 pub fn process_batch(

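The date-advance step in the loop above carries the pagination logic: the next window starts the day after the latest event date on the current page, and zero-padded YYYY-MM-DD strings compare lexicographically like dates. Here is an isolated sketch of that step with hypothetical dates, condensing the `unwrap_or`/`break` pair into an `Option`:

use chrono::NaiveDate;

// Returns the next start date, or None once the range is exhausted.
fn next_start(event_dates: &[&str], end: &str) -> Option<String> {
    let next = event_dates.iter()
        .filter_map(|d| NaiveDate::parse_from_str(d, "%Y-%m-%d").ok())
        .max()?                      // latest date on the current page
        .succ_opt()?                 // advance one day
        .format("%Y-%m-%d")
        .to_string();
    // Zero-padded YYYY-MM-DD strings order lexicographically like dates.
    (next.as_str() <= end).then_some(next)
}

fn main() {
    let page = ["2025-11-24", "2025-11-25"]; // hypothetical scraped dates
    assert_eq!(next_start(&page, "2025-12-31").as_deref(), Some("2025-11-26"));
    assert_eq!(next_start(&page, "2025-11-25"), None); // window exhausted: stop
}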
View File

@@ -3,69 +3,41 @@ mod economic;
 mod corporate;
 mod config;
 mod util;
+mod scraper;
-use fantoccini::{ClientBuilder};
-use serde_json::{Map, Value};
-use tokio::signal;
+use anyhow::Result;
+use config::Config;
+use scraper::webdriver::ChromeDriverPool;
+use std::sync::Arc;
+/// The entry point of the application.
+///
+/// This function loads the configuration, initializes a shared ChromeDriver pool,
+/// and sequentially runs the full updates for corporate and economic data.
+/// Sequential execution helps prevent resource exhaustion from concurrent
+/// chromedriver instances and avoids spamming the target websites with too many requests.
+///
+/// # Errors
+///
+/// Returns an error if configuration loading fails, pool initialization fails,
+/// or if either update function encounters an issue (e.g., network errors,
+/// scraping failures, or chromedriver spawn failures like "program not found").
 #[tokio::main]
-async fn main() -> anyhow::Result<()> {
-    // === Ensure data directories exist ===
-    util::ensure_data_dirs().await?;
-    // === Load configuration ===
-    let config = config::Config::default();
-    // === Start ChromeDriver ===
-    let mut child = std::process::Command::new("chromedriver-win64/chromedriver.exe")
-        .args(["--port=9515"]) // Level 3 = minimal logs
-        .spawn()?;
-    // Build capabilities to hide infobar + enable full rendering
-    let port = 9515;
-    let caps_value = serde_json::json!({
-        "goog:chromeOptions": {
-            "args": [
-                //"--headless",
-                "--disable-gpu",
-                "--disable-notifications",
-                "--disable-popup-blocking",
-                "--disable-blink-features=AutomationControlled"
-            ],
-            "excludeSwitches": ["enable-automation"]
-        }
-    });
-    let caps_map: Map<String, Value> = caps_value.as_object()
-        .expect("Capabilities should be a JSON object")
-        .clone();
-    let mut client = ClientBuilder::native()
-        .capabilities(caps_map)
-        .connect(&format!("http://localhost:{}", port))
-        .await?;
-    // Graceful shutdown
-    let client_clone = client.clone();
-    tokio::spawn(async move {
-        signal::ctrl_c().await.unwrap();
-        client_clone.close().await.ok();
-        std::process::exit(0);
-    });
-    // === Economic Calendar Update ===
-    println!("Updating Economic Calendar (High Impact Only)");
-    economic::goto_and_prepare(&client).await?;
-    economic::run_full_update(&client, &config).await?;
-    // === Corporate Earnings Update ===
-    println!("\nUpdating Corporate Earnings");
-    corporate::run_full_update(&client, &config).await?;
-    // === Cleanup ===
-    client.close().await?;
-    child.kill()?;
-    println!("\nAll data updated successfully!");
+async fn main() -> Result<()> {
+    let config = Config::load().map_err(|err| {
+        println!("Failed to load Config .env: {}", err);
+        err
+    })?;
+    // Initialize the shared ChromeDriver pool once
+    let pool_size = config.max_parallel_tasks;
+    let pool = Arc::new(ChromeDriverPool::new(pool_size).await?);
+    // Run economic update first, passing the shared pool
+    economic::run_full_update(&config, &pool).await?;
+    // Then run corporate update, passing the shared pool
+    corporate::run_full_update(&config, &pool).await?;
     Ok(())
 }

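`Config::load()` is called above but its body is not part of this diff. Given the `toml` dependency added in Cargo.toml and the serde derives on `Config`, a plausible sketch follows; the `config.toml` filename, the `CONFIG_PATH` override, and the dotenvy call are assumptions:

use anyhow::{Context, Result};

impl Config {
    // Hypothetical loader: .env first (dotenvy is already a dependency),
    // then a TOML file whose path may be overridden via CONFIG_PATH.
    pub fn load() -> Result<Self> {
        dotenvy::dotenv().ok();
        let path = std::env::var("CONFIG_PATH").unwrap_or_else(|_| "config.toml".into());
        let raw = std::fs::read_to_string(&path)
            .with_context(|| format!("reading config file {}", path))?;
        toml::from_str(&raw).context("parsing TOML config")
    }
}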
1
src/scraper/mod.rs Normal file
View File

@@ -0,0 +1 @@
pub mod webdriver;

269
src/scraper/webdriver.rs Normal file
View File

@@ -0,0 +1,269 @@
// src/scraper/webdriver.rs
use anyhow::{anyhow, Context, Result};
use fantoccini::{Client, ClientBuilder};
use serde_json::{Map, Value};
use std::process::Stdio;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child, Command};
use tokio::sync::{Mutex, Semaphore};
use tokio::time::{Duration, sleep, timeout};
use std::pin::Pin;
/// Manages a pool of ChromeDriver instances for parallel scraping.
///
/// This struct maintains multiple ChromeDriver processes and allows controlled
/// concurrent access via a semaphore. Instances are reused across tasks to avoid
/// the overhead of spawning new processes.
pub struct ChromeDriverPool {
instances: Vec<Arc<Mutex<ChromeInstance>>>,
semaphore: Arc<Semaphore>,
}
impl ChromeDriverPool {
/// Creates a new pool with the specified number of ChromeDriver instances.
///
/// # Arguments
/// * `pool_size` - Number of concurrent ChromeDriver instances to maintain
pub async fn new(pool_size: usize) -> Result<Self> {
let mut instances = Vec::with_capacity(pool_size);
println!("Initializing ChromeDriver pool with {} instances...", pool_size);
for i in 0..pool_size {
match ChromeInstance::new().await {
Ok(instance) => {
println!(" ✓ Instance {} ready", i + 1);
instances.push(Arc::new(Mutex::new(instance)));
}
Err(e) => {
eprintln!(" ✗ Failed to create instance {}: {}", i + 1, e);
// Clean up already created instances
drop(instances);
return Err(e);
}
}
}
Ok(Self {
instances,
semaphore: Arc::new(Semaphore::new(pool_size)),
})
}
/// Executes a scrape task using an available instance from the pool.
pub async fn execute<T, F, Fut>(&self, url: String, parse: F) -> Result<T>
where
T: Send + 'static,
F: FnOnce(Client) -> Fut + Send + 'static,
Fut: std::future::Future<Output = Result<T>> + Send + 'static,
{
// Acquire semaphore permit
let _permit = self.semaphore.acquire().await
.map_err(|_| anyhow!("Semaphore closed"))?;
// Find an available instance (round-robin or first available)
let instance = self.instances[0].clone(); // Simple: use first, could be round-robin
let mut guard = instance.lock().await;
// Create a new session for this task
let client = guard.new_session().await?;
// Release lock while we do the actual scraping
drop(guard);
// Navigate and parse
client.goto(&url).await.context("Failed to navigate")?;
let result = timeout(Duration::from_secs(60), parse(client))
.await
.context("Parse function timed out after 60s")??;
Ok(result)
}
pub fn get_number_of_instances(&self) -> usize {
self.instances.len()
}
}
/// Represents a single instance of chromedriver process.
pub struct ChromeInstance {
process: Child,
base_url: String,
}
impl ChromeInstance {
/// Creates a new ChromeInstance by spawning chromedriver with random port.
///
/// This spawns `chromedriver --port=0` to avoid port conflicts, reads stdout to extract
/// the listening address, and waits for the success message. If timeout occurs or
/// spawning fails, returns an error with context.
///
/// # Errors
///
/// Returns an error if chromedriver fails to spawn (e.g., not in PATH, version mismatch),
/// if the process exits early, or if the address/success message isn't found within 30s.
pub async fn new() -> Result<Self> {
let mut command = Command::new("chromedriver-win64/chromedriver.exe");
command
.arg("--port=0") // Use random available port to support pooling
.stdout(Stdio::piped())
.stderr(Stdio::piped());
let mut process = command
.spawn()
.context("Failed to spawn chromedriver. Ensure it's installed and in PATH.")?;
let mut stdout = BufReader::new(
process.stdout.take().context("Failed to capture stdout")?
).lines();
let mut stderr = BufReader::new(
process.stderr.take().context("Failed to capture stderr")?
).lines();
let start_time = std::time::Instant::now();
let mut address: Option<String> = None;
let mut success = false;
// Log stderr in background for debugging
tokio::spawn(async move {
while let Ok(Some(line)) = stderr.next_line().await {
eprintln!("ChromeDriver stderr: {}", line);
}
});
// Wait for address and success (up to 30s)
while start_time.elapsed() < Duration::from_secs(30) {
if let Ok(Ok(Some(line))) =
timeout(Duration::from_secs(1), stdout.next_line()).await
{
if let Some(addr) = parse_chromedriver_address(&line) {
address = Some(addr.to_string());
}
if line.contains("ChromeDriver was started successfully") {
success = true;
}
if let (Some(addr), true) = (&address, success) {
return Ok(Self {
process,
base_url: addr.clone(),
});
}
}
sleep(Duration::from_millis(100)).await;
}
// Cleanup on failure
let _ = process.kill().await;
Err(anyhow!("Timeout: ChromeDriver did not start within 30 seconds. Check version match with Chrome browser and system resources."))
}
/// Creates a new browser session (client) from this ChromeDriver instance.
/// Each session is independent and can be closed without affecting the driver.
pub async fn new_session(&self) -> Result<Client> {
ClientBuilder::native()
.capabilities(Self::chrome_args())
.connect(&self.base_url)
.await
.context("Failed to create new session")
}
fn chrome_args() -> Map<String, Value> {
let args = serde_json::json!({
"goog:chromeOptions": {
"args": [
"--headless=new",
"--disable-gpu",
"--no-sandbox",
"--disable-dev-shm-usage",
"--disable-infobars",
"--disable-extensions",
"--disable-popup-blocking",
"--disable-notifications",
"--disable-logging",
"--disable-autofill",
"--disable-features=TranslateUI,OptimizationGuideModelDownloading",
"--window-size=1920,1080",
"--disable-blink-features=AutomationControlled",
"--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
],
"excludeSwitches": ["enable-logging", "enable-automation"],
"useAutomationExtension": false,
"prefs": {
"profile.default_content_setting_values.notifications": 2
}
}
});
args.as_object()
.expect("Capabilities should be a JSON object")
.clone()
}
}
/// Parses the ChromeDriver address from a log line.
///
/// Looks for the "Starting ChromeDriver ... on port XXXX" line and extracts the port.
/// Returns `Some("http://localhost:XXXX")` if found, else `None`.
fn parse_chromedriver_address(line: &str) -> Option<String> {
if line.contains("Starting ChromeDriver") {
if let Some(port_str) = line.split("on port ").nth(1) {
if let Some(port) = port_str.split_whitespace().next() {
if port.parse::<u16>().is_ok() {
return Some(format!("http://localhost:{}", port));
}
}
}
}
// Fallback for other formats (e.g., explicit port mentions)
for word in line.split_whitespace() {
if let Ok(port) = word.trim_matches(|c: char| !c.is_numeric()).parse::<u16>() {
if port > 1024 && port < 65535 && line.to_lowercase().contains("port") {
return Some(format!("http://localhost:{}", port));
}
}
}
None
}
impl Drop for ChromeInstance {
fn drop(&mut self) {
let _ = self.process.start_kill();
std::thread::sleep(std::time::Duration::from_millis(100));
}
}
/// Simplified task execution - now uses the pool pattern.
///
/// For backwards compatibility with existing code.
pub struct ScrapeTask<T> {
url: String,
parse: Box<dyn FnOnce(Client) -> Pin<Box<dyn std::future::Future<Output = Result<T>> + Send>> + Send>,
}
impl<T: Send + 'static> ScrapeTask<T> {
pub fn new<F, Fut>(url: String, parse: F) -> Self
where
F: FnOnce(Client) -> Fut + Send + 'static,
Fut: std::future::Future<Output = Result<T>> + Send + 'static,
{
Self {
url,
parse: Box::new(move |client| Box::pin(parse(client))),
}
}
/// Executes using a provided pool (more efficient for multiple tasks).
pub async fn execute_with_pool(self, pool: &ChromeDriverPool) -> Result<T> {
let url = self.url;
let parse = self.parse;
pool.execute(url, move |client| async move {
(parse)(client).await
}).await
}
}
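
To see the pool's concurrency bound in action, here is a hypothetical caller; the URLs are placeholders, and note that `execute` performs the navigation itself before handing the client to the closure:

use std::sync::Arc;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Two chromedriver processes; a third task waits on the semaphore
    // instead of spawning another browser.
    let pool = Arc::new(ChromeDriverPool::new(2).await?);
    let mut handles = Vec::new();
    for url in ["https://example.com/a", "https://example.com/b", "https://example.com/c"] {
        let pool = Arc::clone(&pool);
        handles.push(tokio::spawn(async move {
            pool.execute(url.to_string(), |client| async move {
                // `execute` has already navigated; just read the final URL back.
                Ok(client.current_url().await?.to_string())
            })
            .await
        }));
    }
    for h in handles {
        println!("visited: {}", h.await??);
    }
    Ok(())
}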

View File

@@ -3,7 +3,7 @@ use tokio::fs;
 use std::path::Path;
 /// Create the required data folders if they do not exist yet.
-pub async fn ensure_data_dirs() -> anyhow::Result<()> {
+pub async fn _ensure_data_dirs() -> anyhow::Result<()> {
     let dirs = [
         "economic_events",
         "economic_event_changes",