Added Yahoo exchange extraction

This commit is contained in:
2026-01-09 19:09:42 +01:00
parent ea128f6187
commit 8dd75f7bdf
13 changed files with 1538 additions and 408 deletions

View File

@@ -0,0 +1,677 @@
// src/corporate/collect_exchanges.rs
use crate::util::directories::DataPaths;
use crate::util::logger;
use crate::scraper::yahoo::ChartData;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio::fs;
use tokio::io::AsyncWriteExt;
/// Exchange information collected from company data
///
/// One aggregated record per exchange code: every company whose core data
/// reported that exchange, plus their summed market caps (native units and
/// USD-converted).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExchangeInfo {
    // Human-readable exchange name (from the price module's `exchangeName`).
    #[serde(rename = "exchangeName")]
    pub exchange_name: String,
    // Trading currency code as reported (may be a minor unit, e.g. "GBp").
    pub currency: String,
    #[serde(rename = "currencySymbol")]
    pub currency_symbol: String,
    // Reported data delay (units not stated in source — presumably minutes; confirm).
    #[serde(rename = "exchangeDataDelayedBy")]
    pub exchange_data_delayed_by: i64,
    // Sum of company market caps in the exchange's native currency units.
    #[serde(rename = "totalMarketCap")]
    pub total_market_cap: u64,
    #[serde(rename = "totalMarketCapUSD")]
    pub total_market_cap_usd: f64, // NEW: Market cap converted to USD
    // Directory names of all companies grouped under this exchange.
    pub companies: Vec<String>,
}
/// Extract exchange data from company core data
///
/// Minimal typed view of one `core/data.jsonl` line — only the fields
/// needed for exchange extraction are declared.
#[derive(Debug, Deserialize)]
struct CompanyCoreData {
    modules: Option<CoreModules>,
}
/// `modules` container; only the `price` module is consumed here.
#[derive(Debug, Deserialize)]
struct CoreModules {
    price: Option<PriceModule>,
}
/// Subset of the `price` module relevant for exchange extraction.
///
/// Every field is optional because data quality varies per company; the
/// caller applies defaults (see `extract_exchange_info`).
#[derive(Debug, Deserialize)]
struct PriceModule {
    #[serde(rename = "exchangeName")]
    exchange_name: Option<String>,
    currency: Option<String>,
    #[serde(rename = "currencySymbol")]
    currency_symbol: Option<String>,
    // Short exchange code — used as the grouping key downstream.
    exchange: Option<String>,
    #[serde(rename = "exchangeDataDelayedBy")]
    exchange_data_delayed_by: Option<i64>,
    #[serde(rename = "marketCap")]
    market_cap: Option<MarketCapData>,
}
/// Market-cap wrapper object; only the raw numeric value is read here
/// (presumably Yahoo's `{raw, fmt}` shape — other keys are ignored).
#[derive(Debug, Deserialize)]
struct MarketCapData {
    raw: Option<u64>,
}
/// Normalize a quoted currency code to its ISO base unit.
///
/// Returns the ISO code together with the divisor that converts the quoted
/// minor unit into the major unit. Handles special cases like GBp (pence)
/// and ZAc (cents); everything else passes through with a factor of 1.0.
fn normalize_currency(currency: &str) -> (&str, f64) {
    if currency == "GBp" {
        // British pence quote in 1/100 of a pound.
        ("GBP", 100.0)
    } else if currency == "ZAc" {
        // South African cents quote in 1/100 of a rand.
        ("ZAR", 100.0)
    } else {
        (currency, 1.0)
    }
}
/// FX rate cache for currency conversion
///
/// Maps an ISO currency code to its rate expressed as currency units per
/// 1 USD (the same orientation as the fallback table in `get_fallback_rate`).
struct FxRateCache {
    // code -> units-per-USD rate; always seeded with "USD" -> 1.0
    rates: HashMap<String, f64>,
}
impl FxRateCache {
    /// Create new FX rate cache by loading all currency charts
    ///
    /// Scans `data/economic/currency/<CODE>/chart/data.jsonl`; the directory
    /// name is taken as the currency code. A missing directory or a failed
    /// individual chart load is non-fatal — `to_usd` falls back to hardcoded
    /// approximate rates for anything that didn't load.
    async fn new(paths: &DataPaths) -> anyhow::Result<Self> {
        let mut rates = HashMap::new();
        // USD to USD is always 1.0
        rates.insert("USD".to_string(), 1.0);
        let currency_dir = paths.data_dir().join("economic").join("currency");
        if !currency_dir.exists() {
            logger::log_warn(" FX rates directory not found - will use default rates").await;
            return Ok(Self { rates });
        }
        let mut entries = fs::read_dir(&currency_dir).await?;
        let mut loaded_count = 0;
        while let Some(entry) = entries.next_entry().await? {
            let path = entry.path();
            if !path.is_dir() {
                continue;
            }
            // Directory name is the currency code (e.g. "EUR").
            let currency_code = match path.file_name().and_then(|n| n.to_str()) {
                Some(code) => code.to_string(),
                None => continue,
            };
            let chart_path = path.join("chart").join("data.jsonl");
            if !chart_path.exists() {
                continue;
            }
            // Load chart and get latest rate
            match load_latest_fx_rate(&chart_path).await {
                Ok(rate) => {
                    rates.insert(currency_code.clone(), rate);
                    loaded_count += 1;
                }
                Err(e) => {
                    // One bad chart must not abort the whole scan.
                    logger::log_warn(&format!(
                        " Failed to load FX rate for {}: {}",
                        currency_code, e
                    )).await;
                }
            }
        }
        logger::log_info(&format!(" ✓ Loaded {} FX rates", loaded_count)).await;
        Ok(Self { rates })
    }
    /// Convert amount from given currency to USD
    ///
    /// Rates are stored as currency units per 1 USD, so converting to USD
    /// divides by the rate. Minor-unit quotes (GBp pence, ZAc cents) are
    /// first scaled to the major unit via `normalize_currency`. Unknown or
    /// non-positive rates fall back to `get_fallback_rate`.
    fn to_usd(&self, amount: u64, currency: &str) -> f64 {
        // Normalize currency and get conversion factor
        // e.g., GBp -> (GBP, 100.0), ZAc -> (ZAR, 100.0)
        let (normalized_currency, factor) = normalize_currency(currency);
        // First convert to base currency unit (e.g., pence to pounds)
        let amount_in_base = amount as f64 / factor;
        if normalized_currency == "USD" {
            return amount_in_base;
        }
        // Get rate (currency units per 1 USD)
        // For USD/EUR = 0.92, this means 1 USD = 0.92 EUR
        // To convert EUR to USD: EUR_amount / 0.92
        match self.rates.get(normalized_currency) {
            Some(&rate) if rate > 0.0 => {
                amount_in_base / rate
            }
            _ => {
                // Fallback: use approximate rates for common currencies
                let fallback_rate = get_fallback_rate(normalized_currency);
                amount_in_base / fallback_rate
            }
        }
    }
    /// Get the loaded rate for a currency (currency units per 1 USD).
    ///
    /// Returns `None` when no chart-derived rate was loaded — callers use
    /// this to report that a fallback rate is in effect.
    fn get_rate(&self, currency: &str) -> Option<f64> {
        let (normalized_currency, _) = normalize_currency(currency);
        self.rates.get(normalized_currency).copied()
    }
}
/// Load the latest FX rate from a JSONL chart file.
///
/// FIX: the previous version returned the rate from the FIRST non-empty
/// line; in an append-style JSONL file the most recent snapshot is the
/// LAST line, so we now parse that one.
///
/// # Errors
/// Fails when the file is empty (only blank lines), the last line is not
/// valid `ChartData`, or no quote carries a close price.
async fn load_latest_fx_rate(chart_path: &std::path::Path) -> anyhow::Result<f64> {
    let content = fs::read_to_string(chart_path).await?;
    // Most recent snapshot = last non-empty line.
    let line = content
        .lines()
        .rev()
        .find(|l| !l.trim().is_empty())
        .ok_or_else(|| anyhow::anyhow!("No data in chart file"))?;
    let chart: ChartData = serde_json::from_str(line)?;
    // Get most recent quote with a close price
    chart.quotes
        .iter()
        .rev()
        .find_map(|q| q.close)
        .ok_or_else(|| anyhow::anyhow!("No valid close prices"))
}
/// Fallback rates for common currencies (approximate, as of 2024).
///
/// Values are expressed as currency units per 1 USD — the same orientation
/// as the loaded FX data. Currencies not in the table default to 1.0
/// (treated like USD).
fn get_fallback_rate(currency: &str) -> f64 {
    match currency {
        // Stronger than or near parity with USD
        "USD" => 1.0,
        "KWD" => 0.31,
        "GBP" => 0.79,
        "CHF" => 0.88,
        "EUR" => 0.92,
        // Low single digits per USD
        "SGD" => 1.34,
        "CAD" => 1.36,
        "AUD" => 1.52,
        "NZD" => 1.65,
        "ILS" => 3.7,
        "PLN" => 4.0,
        "RON" | "MYR" => 4.6,
        "BRL" => 5.0,
        "DKK" => 6.9,
        "CNY" | "RMB" => 7.2,
        "HKD" => 7.8,
        // Tens per USD
        "SEK" => 10.5,
        "NOK" => 10.8,
        "MXN" => 17.0,
        "ZAR" => 18.5,
        "CZK" => 23.0,
        "TWD" => 31.5,
        "TRY" => 32.0,
        "THB" => 35.0,
        "PHP" => 56.0,
        "INR" => 83.0,
        // Hundreds or more per USD
        "ISK" => 138.0,
        "JPY" => 150.0,
        "KRW" => 1320.0,
        "IDR" => 15700.0,
        "VND" => 24500.0,
        // Unknown currency: assume parity with USD.
        _ => 1.0,
    }
}
/// Collect all exchanges from company directories and create yahoo_exchanges.json
///
/// # Features
/// - Iterates through all company directories
/// - Extracts exchange data from core/data.jsonl
/// - Groups companies by exchange
/// - Sums up market caps for each exchange
/// - **NEW**: Converts all market caps to USD using FX rates
/// - Saves consolidated mapping to data/yahoo_exchanges.json
/// - Handles missing or invalid data gracefully
///
/// Returns the number of unique exchanges found (0 when the corporate
/// directory is missing).
pub async fn collect_and_save_exchanges(paths: &DataPaths) -> anyhow::Result<usize> {
    logger::log_info("Collecting exchange information from company directories...").await;
    let corporate_dir = paths.corporate_dir();
    if !corporate_dir.exists() {
        logger::log_warn(" Corporate directory does not exist").await;
        return Ok(0);
    }
    // Load FX rates for currency conversion
    logger::log_info("Loading FX rates for currency conversion...").await;
    let fx_cache = FxRateCache::new(paths).await?;
    // Map of exchange code -> ExchangeInfo
    let mut exchanges: HashMap<String, ExchangeInfo> = HashMap::new();
    let mut entries = fs::read_dir(&corporate_dir).await?;
    let mut processed_count = 0;
    let mut skipped_count = 0;
    while let Some(entry) = entries.next_entry().await? {
        let company_path = entry.path();
        if !company_path.is_dir() {
            continue;
        }
        // Directory name doubles as the company identifier.
        let company_name = match company_path.file_name().and_then(|n| n.to_str()) {
            Some(name) => name.to_string(),
            None => {
                skipped_count += 1;
                continue;
            }
        };
        // Read core/data.jsonl
        let core_data_path = company_path.join("core").join("data.jsonl");
        if !core_data_path.exists() {
            skipped_count += 1;
            continue;
        }
        // Parse core data
        match extract_exchange_info(&core_data_path, &company_name).await {
            Ok(Some((exchange_code, exchange_name, currency, currency_symbol, delay, market_cap))) => {
                // Convert market cap to USD
                let market_cap_usd = fx_cache.to_usd(market_cap, &currency);
                // Add or update exchange entry
                exchanges
                    .entry(exchange_code.clone())
                    .and_modify(|info| {
                        // Add company to existing exchange and sum market caps
                        // (saturating_add guards against u64 overflow on huge sums).
                        info.companies.push(company_name.clone());
                        info.total_market_cap = info.total_market_cap.saturating_add(market_cap);
                        info.total_market_cap_usd += market_cap_usd;
                    })
                    .or_insert_with(|| {
                        // Create new exchange entry
                        ExchangeInfo {
                            exchange_name,
                            currency,
                            currency_symbol,
                            exchange_data_delayed_by: delay,
                            total_market_cap: market_cap,
                            total_market_cap_usd: market_cap_usd,
                            companies: vec![company_name.clone()],
                        }
                    });
                processed_count += 1;
            }
            Ok(None) => {
                // No exchange data found
                skipped_count += 1;
            }
            Err(e) => {
                // Parse failures are logged and skipped so one bad company
                // can't abort the whole collection run.
                logger::log_warn(&format!(
                    " Failed to parse exchange data for {}: {}",
                    company_name, e
                )).await;
                skipped_count += 1;
            }
        }
        // Progress logging every 100 companies
        if (processed_count + skipped_count) % 100 == 0 {
            logger::log_info(&format!(
                " Progress: {} companies processed, {} skipped",
                processed_count, skipped_count
            )).await;
        }
    }
    logger::log_info(&format!(
        " ✓ Collected data from {} companies ({} skipped)",
        processed_count, skipped_count
    )).await;
    logger::log_info(&format!(
        " ✓ Found {} unique exchanges",
        exchanges.len()
    )).await;
    // Sort companies within each exchange for consistency
    for exchange_info in exchanges.values_mut() {
        exchange_info.companies.sort();
    }
    // Save to yahoo_exchanges.json
    let output_path = paths.data_dir().join("yahoo_exchanges.json");
    save_exchanges_json(&output_path, &exchanges).await?;
    logger::log_info(&format!(
        " ✓ Saved exchange mapping to {}",
        output_path.display()
    )).await;
    // Print summary statistics
    print_exchange_statistics(&exchanges, &fx_cache).await;
    Ok(exchanges.len())
}
/// Extract exchange information from a company's core data file
///
/// Returns:
/// - `Ok(Some((code, name, currency, currency_symbol, delay, market_cap)))`
///   when a usable exchange entry is found,
/// - `Ok(None)` when the file holds no usable data (missing price module,
///   empty exchange code, the "CCC" placeholder, or an empty file),
/// - `Err` when a line exists but cannot be parsed by either strategy.
///
/// Parsing is two-phase per line: a strict typed parse into
/// `CompanyCoreData`, then — if that fails — a lenient walk of the raw
/// JSON, so schema drift in unrelated parts of the line doesn't lose the
/// exchange fields.
async fn extract_exchange_info(
    core_data_path: &std::path::Path,
    company_name: &str,
) -> anyhow::Result<Option<(String, String, String, String, i64, u64)>> {
    let content = fs::read_to_string(core_data_path).await?;
    // Parse JSONL - should be single line
    for line in content.lines() {
        if line.trim().is_empty() {
            continue;
        }
        match serde_json::from_str::<CompanyCoreData>(line) {
            Ok(data) => {
                // Extract from modules.price
                let price_module = match data.modules.and_then(|m| m.price) {
                    Some(p) => p,
                    None => return Ok(None),
                };
                // Extract required fields
                let exchange = match price_module.exchange {
                    Some(e) if !e.is_empty() => e,
                    _ => return Ok(None),
                };
                // Filter out invalid placeholder exchange codes
                if exchange == "CCC" {
                    return Ok(None);
                }
                // Remaining fields are optional; apply sensible defaults.
                let exchange_name = price_module.exchange_name.unwrap_or_else(|| exchange.clone());
                let currency = price_module.currency.unwrap_or_else(|| "USD".to_string());
                let currency_symbol = price_module.currency_symbol.unwrap_or_else(|| "$".to_string());
                let delay = price_module.exchange_data_delayed_by.unwrap_or(0);
                let market_cap = price_module
                    .market_cap
                    .and_then(|mc| mc.raw)
                    .unwrap_or(0);
                return Ok(Some((
                    exchange,
                    exchange_name,
                    currency,
                    currency_symbol,
                    delay,
                    market_cap,
                )));
            }
            Err(e) => {
                // Try to parse as generic JSON to check if exchange field exists in modules.price
                // (the typed parse can fail on unrelated fields in the same line).
                if let Ok(json) = serde_json::from_str::<serde_json::Value>(line) {
                    // Try to access modules.price.exchange
                    if let Some(price) = json.get("modules").and_then(|m| m.get("price")) {
                        if let Some(exchange) = price.get("exchange").and_then(|v| v.as_str()) {
                            if !exchange.is_empty() && exchange != "CCC" {
                                let exchange_name = price
                                    .get("exchangeName")
                                    .and_then(|v| v.as_str())
                                    .unwrap_or(exchange)
                                    .to_string();
                                let currency = price
                                    .get("currency")
                                    .and_then(|v| v.as_str())
                                    .unwrap_or("USD")
                                    .to_string();
                                let currency_symbol = price
                                    .get("currencySymbol")
                                    .and_then(|v| v.as_str())
                                    .unwrap_or("$")
                                    .to_string();
                                let delay = price
                                    .get("exchangeDataDelayedBy")
                                    .and_then(|v| v.as_i64())
                                    .unwrap_or(0);
                                let market_cap = price
                                    .get("marketCap")
                                    .and_then(|mc| mc.get("raw"))
                                    .and_then(|v| v.as_u64())
                                    .unwrap_or(0);
                                return Ok(Some((
                                    exchange.to_string(),
                                    exchange_name,
                                    currency,
                                    currency_symbol,
                                    delay,
                                    market_cap,
                                )));
                            }
                        }
                    }
                }
                // Both strategies failed: surface the original typed error.
                return Err(anyhow::anyhow!(
                    "Failed to parse core data for {}: {}",
                    company_name,
                    e
                ));
            }
        }
    }
    // File contained only blank lines.
    Ok(None)
}
/// Save the exchanges map to a JSON file atomically (tmp file + fsync + rename).
///
/// FIX: the previous version sorted the entries and then collected them back
/// into a `HashMap`, whose iteration order is unspecified — so the "sorted
/// for consistency" intent never reached the serialized output. A `BTreeMap`
/// serializes its keys in ascending order, making the file deterministic.
///
/// # Errors
/// Propagates serialization and file-system errors.
async fn save_exchanges_json(
    path: &std::path::Path,
    exchanges: &HashMap<String, ExchangeInfo>,
) -> anyhow::Result<()> {
    // BTreeMap keeps keys sorted, so serde_json emits them in order.
    let sorted_exchanges: std::collections::BTreeMap<&String, &ExchangeInfo> =
        exchanges.iter().collect();
    // Serialize with pretty printing
    let json_content = serde_json::to_string_pretty(&sorted_exchanges)?;
    // Write to temporary file first (atomic write pattern)
    let tmp_path = path.with_extension("json.tmp");
    let mut file = fs::File::create(&tmp_path).await?;
    file.write_all(json_content.as_bytes()).await?;
    file.write_all(b"\n").await?;
    file.flush().await?;
    // fsync so the rename below can't expose a partially-written file.
    file.sync_all().await?;
    // Atomic rename
    fs::rename(&tmp_path, path).await?;
    Ok(())
}
/// Format a market cap as a human-readable string with a T/B/M/K suffix.
///
/// Values below one thousand (including negatives) are printed plain with
/// two decimal places.
fn format_market_cap(market_cap: f64) -> String {
    const SCALES: [(f64, &str); 4] = [
        (1_000_000_000_000.0, "T"),
        (1_000_000_000.0, "B"),
        (1_000_000.0, "M"),
        (1_000.0, "K"),
    ];
    for &(threshold, suffix) in SCALES.iter() {
        if market_cap >= threshold {
            return format!("{:.2}{}", market_cap / threshold, suffix);
        }
    }
    format!("{:.2}", market_cap)
}
/// Print statistics about collected exchanges
///
/// Logs (via `logger`):
/// - the top 20 exchanges by USD market cap, with the FX rate applied,
/// - company counts and USD market cap per currency (top 10),
/// - the number of exchanges reporting a data delay,
/// - the grand-total market cap in USD.
async fn print_exchange_statistics(exchanges: &HashMap<String, ExchangeInfo>, fx_cache: &FxRateCache) {
    logger::log_info("Exchange Statistics (sorted by USD market cap):").await;
    // Sort by total market cap in USD (descending)
    let mut exchange_list: Vec<_> = exchanges.iter().collect();
    exchange_list.sort_by(|a, b| {
        // partial_cmp because f64 is not Ord; NaN is treated as Equal.
        b.1.total_market_cap_usd
            .partial_cmp(&a.1.total_market_cap_usd)
            .unwrap_or(std::cmp::Ordering::Equal)
    });
    // Print top 20 exchanges by total market cap (USD)
    logger::log_info(" Top 20 exchanges by total market cap (USD):").await;
    for (i, (code, info)) in exchange_list.iter().take(20).enumerate() {
        let (normalized_currency, factor) = normalize_currency(&info.currency);
        let fx_rate = fx_cache.get_rate(&info.currency);
        let fx_info = match fx_rate {
            Some(rate) => {
                if factor > 1.0 {
                    // Show conversion for pence/cents
                    // NOTE(review): the label reads "1 <CUR> = <rate> USD" but
                    // rates are stored as units-per-USD — wording may be
                    // inverted; confirm against sample output.
                    format!(" (1 {} = {} {}, {} {} = 1 {})",
                        normalized_currency,
                        format!("{:.4}", rate),
                        "USD",
                        factor as i32,
                        info.currency,
                        normalized_currency)
                } else {
                    format!(" (1 USD = {:.4} {})", rate, info.currency)
                }
            }
            None => format!(" (using fallback rate for {})", info.currency),
        };
        logger::log_info(&format!(
            " {}. {} ({}) - ${} USD ({}{} {}) - {} companies{}",
            i + 1,
            info.exchange_name,
            code,
            format_market_cap(info.total_market_cap_usd),
            info.currency_symbol,
            format_market_cap(info.total_market_cap as f64),
            info.currency,
            info.companies.len(),
            // &String deref-coerces to &str; USD rows suppress the FX suffix.
            if info.currency != "USD" { &fx_info } else { "" }
        )).await;
    }
    // Count by currency
    let mut currency_counts: HashMap<String, usize> = HashMap::new();
    let mut currency_market_caps: HashMap<String, f64> = HashMap::new();
    for info in exchanges.values() {
        *currency_counts.entry(info.currency.clone()).or_insert(0) += info.companies.len();
        *currency_market_caps.entry(info.currency.clone()).or_insert(0.0) += info.total_market_cap_usd;
    }
    let mut currencies: Vec<_> = currency_counts.iter().collect();
    currencies.sort_by(|a, b| {
        // Order currencies by their total USD market cap, descending.
        currency_market_caps.get(b.0)
            .unwrap_or(&0.0)
            .partial_cmp(currency_market_caps.get(a.0).unwrap_or(&0.0))
            .unwrap_or(std::cmp::Ordering::Equal)
    });
    logger::log_info(" Market cap by currency (USD equivalent):").await;
    for (currency, count) in currencies.iter().take(10) {
        let market_cap_usd = currency_market_caps.get(*currency).unwrap_or(&0.0);
        let (normalized_currency, factor) = normalize_currency(currency);
        let fx_rate = fx_cache.get_rate(currency);
        let fx_info = match fx_rate {
            Some(rate) => {
                if factor > 1.0 {
                    format!(" (1 {} = {:.4} USD, {} {} = 1 {})",
                        normalized_currency, rate, factor as i32, currency, normalized_currency)
                } else {
                    format!(" (1 USD = {:.4} {})", rate, currency)
                }
            }
            None => format!(" (fallback)"),
        };
        logger::log_info(&format!(
            " {}: {} companies, ${} USD{}",
            currency,
            count,
            format_market_cap(*market_cap_usd),
            if *currency != "USD" { &fx_info } else { "" }
        )).await;
    }
    // Delay statistics
    let delayed_exchanges: Vec<_> = exchanges
        .iter()
        .filter(|(_, info)| info.exchange_data_delayed_by > 0)
        .collect();
    if !delayed_exchanges.is_empty() {
        logger::log_info(&format!(
            " Exchanges with data delay: {} (out of {})",
            delayed_exchanges.len(),
            exchanges.len()
        )).await;
    }
    // Total market cap across all exchanges (in USD)
    let total_market_cap_usd: f64 = exchanges.values()
        .map(|info| info.total_market_cap_usd)
        .sum();
    logger::log_info(&format!(
        " Total market cap across all exchanges: ${} USD",
        format_market_cap(total_market_cap_usd)
    )).await;
}
/// Get exchange information for a specific exchange code.
///
/// Reads the previously generated `data/yahoo_exchanges.json`; returns
/// `Ok(None)` when the file does not exist or the code is unknown.
pub async fn get_exchange_info(
    paths: &DataPaths,
    exchange_code: &str,
) -> anyhow::Result<Option<ExchangeInfo>> {
    let exchanges_path = paths.data_dir().join("yahoo_exchanges.json");
    if !exchanges_path.exists() {
        return Ok(None);
    }
    let raw = fs::read_to_string(&exchanges_path).await?;
    let mut parsed: HashMap<String, ExchangeInfo> = serde_json::from_str(&raw)?;
    // Removing instead of get().cloned() hands back the owned entry directly.
    Ok(parsed.remove(exchange_code))
}
/// List all available exchanges.
///
/// Loads `data/yahoo_exchanges.json` and returns `(code, info)` pairs
/// sorted ascending by exchange code; empty when the file is missing.
pub async fn list_all_exchanges(paths: &DataPaths) -> anyhow::Result<Vec<(String, ExchangeInfo)>> {
    let exchanges_path = paths.data_dir().join("yahoo_exchanges.json");
    if !exchanges_path.exists() {
        return Ok(Vec::new());
    }
    let raw = fs::read_to_string(&exchanges_path).await?;
    let parsed: HashMap<String, ExchangeInfo> = serde_json::from_str(&raw)?;
    // BTreeMap iterates in ascending key order — same result as an
    // explicit sort on the code.
    let ordered: std::collections::BTreeMap<String, ExchangeInfo> = parsed.into_iter().collect();
    Ok(ordered.into_iter().collect())
}

View File

@@ -17,4 +17,6 @@ pub mod update_companies_cleanse;
pub mod update_companies_enrich;
pub mod update_companies_enrich_options_chart;
pub mod collect_exchanges;
pub use update::run_full_update;

View File

@@ -1,179 +1,13 @@
// src/corporate/scraper.rs
use super::{types::*};
//use crate::corporate::openfigi::OpenFigiClient;
use crate::{scraper::webdriver::*, util::directories::DataPaths, util::logger};
use crate::{util::directories::DataPaths, util::logger};
use fantoccini::{Client};
use scraper::{Html, Selector};
use chrono::{DateTime, Duration, NaiveDate, Utc};
use tokio::{time::{Duration as TokioDuration, sleep}};
use reqwest::Client as HttpClient;
use serde_json::{json, Value};
use zip::ZipArchive;
use std::{collections::HashMap};
use std::io::{Read};
const USER_AGENT: &str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36";
fn parse_price(v: Option<&Value>) -> f64 {
v.and_then(|x| x.as_str())
.and_then(|s| s.replace('$', "").replace(',', "").parse::<f64>().ok())
.or_else(|| v.and_then(|x| x.as_f64()))
.unwrap_or(0.0)
}
fn parse_volume(v: Option<&Value>) -> u64 {
v.and_then(|x| x.as_str())
.and_then(|s| s.replace(',', "").parse::<u64>().ok())
.or_else(|| v.and_then(|x| x.as_u64()))
.unwrap_or(0)
}
/// Fetch daily OHLCV bars for `ticker` from Yahoo's v8 chart API, covering
/// `start_str..=end_str` (both `YYYY-MM-DD`, inclusive).
///
/// The range is requested in ~2-year chunks with a short pause between
/// requests; bars are filtered to the requested window, then sorted and
/// deduplicated by (date, time) since chunk boundaries can overlap.
///
/// # Errors
/// Fails on unparseable dates, HTTP/JSON errors, or a response carrying no
/// timestamps.
pub async fn fetch_daily_price_history(
    ticker: &str,
    start_str: &str,
    end_str: &str,
) -> anyhow::Result<Vec<CompanyPrice>> {
    let start = NaiveDate::parse_from_str(start_str, "%Y-%m-%d")?;
    // `end` is exclusive below, so add one day to include end_str itself.
    let end = NaiveDate::parse_from_str(end_str, "%Y-%m-%d")? + Duration::days(1);
    let mut all_prices = Vec::new();
    let mut current = start;
    while current < end {
        // Chunk to ~2 years per request.
        let chunk_end = current + Duration::days(730);
        let actual_end = chunk_end.min(end);
        let period1 = current.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp();
        let period2 = actual_end.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp();
        // FIX: the two dates previously ran together with no separator.
        println!(" Fetching {ticker} {} -> {}", current, actual_end - Duration::days(1));
        let url = format!(
            "https://query1.finance.yahoo.com/v8/finance/chart/{ticker}?period1={period1}&period2={period2}&interval=1d&includeAdjustedClose=true"
        );
        let json: Value = HttpClient::new()
            .get(&url)
            .header("User-Agent", USER_AGENT)
            .send()
            .await?
            .json()
            .await?;
        let result = &json["chart"]["result"][0];
        let timestamps = result["timestamp"].as_array().ok_or_else(|| anyhow::anyhow!("No timestamps"))?;
        let quote = &result["indicators"]["quote"][0];
        let meta = &result["meta"];
        let currency = meta["currency"].as_str().unwrap_or("USD").to_string();
        let opens = quote["open"].as_array();
        let highs = quote["high"].as_array();
        let lows = quote["low"].as_array();
        let closes = quote["close"].as_array();
        // Fall back to raw closes when no adjusted-close series is present.
        let adj_closes = result["indicators"]["adjclose"][0]["adjclose"].as_array()
            .or(closes);
        let volumes = quote["volume"].as_array();
        for (i, ts_val) in timestamps.iter().enumerate() {
            let ts = ts_val.as_i64().unwrap_or(0);
            let dt: DateTime<Utc> = DateTime::from_timestamp(ts, 0).unwrap_or_default();
            let date_str = dt.format("%Y-%m-%d").to_string();
            // %Y-%m-%d compares correctly as a string; FIX: compare &str
            // directly instead of allocating a String per bar.
            if date_str.as_str() < start_str || date_str.as_str() > end_str {
                continue;
            }
            let open = parse_price(opens.and_then(|a| a.get(i)));
            let high = parse_price(highs.and_then(|a| a.get(i)));
            let low = parse_price(lows.and_then(|a| a.get(i)));
            let close = parse_price(closes.and_then(|a| a.get(i)));
            let adj_close = parse_price(adj_closes.and_then(|a| a.get(i)));
            let volume = parse_volume(volumes.and_then(|a| a.get(i)));
            all_prices.push(CompanyPrice {
                ticker: ticker.to_string(),
                date: date_str,
                time: "".to_string(),
                open,
                high,
                low,
                close,
                adj_close,
                volume,
                currency: currency.clone(),
            });
        }
        // Brief pause between chunks to stay polite to the API.
        sleep(TokioDuration::from_millis(200)).await;
        current = actual_end;
    }
    // Chunks can touch at boundaries; sort and drop duplicate (date, time) bars.
    all_prices.sort_by_key(|p| (p.date.clone(), p.time.clone()));
    all_prices.dedup_by(|a, b| a.date == b.date && a.time == b.time);
    println!(" Got {} daily bars for {ticker}", all_prices.len());
    Ok(all_prices)
}
/// Fetch 5-minute intraday bars for `ticker` from Yahoo's v8 chart API.
///
/// Always requests the trailing 5,184,000 seconds (= 60 days; presumably
/// chosen for Yahoo's 5m look-back limit — confirm). The `_start`/`_end`
/// parameters are accepted only for signature parity with
/// `fetch_daily_price_history` and are ignored.
///
/// # Errors
/// Fails on HTTP/JSON errors or a response carrying no timestamps.
pub async fn fetch_price_history_5min(
    ticker: &str,
    _start: &str,
    _end: &str,
) -> anyhow::Result<Vec<CompanyPrice>> {
    let now = Utc::now().timestamp();
    let period1 = now - 5184000;
    let period2 = now;
    let url = format!(
        "https://query1.finance.yahoo.com/v8/finance/chart/{ticker}?period1={period1}&period2={period2}&interval=5m&includeAdjustedClose=true"
    );
    let json: Value = HttpClient::new()
        .get(&url)
        .header("User-Agent", USER_AGENT)
        .send()
        .await?
        .json()
        .await?;
    let result = &json["chart"]["result"][0];
    let timestamps = result["timestamp"].as_array().ok_or_else(|| anyhow::anyhow!("No timestamps"))?;
    let quote = &result["indicators"]["quote"][0];
    let meta = &result["meta"];
    let currency = meta["currency"].as_str().unwrap_or("USD").to_string();
    // FIX: hoist the per-series JSON path walks out of the loop — they are
    // loop-invariant (the old code re-resolved quote["open"] etc. per bar),
    // matching the structure of fetch_daily_price_history.
    let opens = quote["open"].as_array();
    let highs = quote["high"].as_array();
    let lows = quote["low"].as_array();
    let closes = quote["close"].as_array();
    let volumes = quote["volume"].as_array();
    let mut prices = Vec::with_capacity(timestamps.len());
    for (i, ts_val) in timestamps.iter().enumerate() {
        let ts = ts_val.as_i64().unwrap_or(0);
        let dt: DateTime<Utc> = DateTime::from_timestamp(ts, 0).unwrap_or_default();
        let date_str = dt.format("%Y-%m-%d").to_string();
        let time_str = dt.format("%H:%M:%S").to_string();
        let open = parse_price(opens.and_then(|a| a.get(i)));
        let high = parse_price(highs.and_then(|a| a.get(i)));
        let low = parse_price(lows.and_then(|a| a.get(i)));
        let close = parse_price(closes.and_then(|a| a.get(i)));
        let volume = parse_volume(volumes.and_then(|a| a.get(i)));
        prices.push(CompanyPrice {
            ticker: ticker.to_string(),
            date: date_str,
            time: time_str,
            open,
            high,
            low,
            close,
            // No adjusted close for intraday bars; reuse the close.
            adj_close: close,
            volume,
            currency: currency.clone(),
        });
    }
    prices.sort_by_key(|p| (p.date.clone(), p.time.clone()));
    Ok(prices)
}
/// Fetch the URL of the latest ISIN↔LEI mapping CSV from GLEIF
/// Overengineered; we could just use the static URL, but this shows how to scrape if needed
pub async fn _fetch_latest_gleif_isin_lei_mapping_url(client: &Client) -> anyhow::Result<String> {

View File

@@ -5,6 +5,8 @@ use crate::corporate::update_companies::build_companies_jsonl_streaming_parallel
use crate::corporate::update_companies_cleanse::{companies_yahoo_cleansed_low_profile, companies_yahoo_cleansed_no_data};
use crate::corporate::update_companies_enrich::enrich_companies_with_events;
use crate::corporate::update_companies_enrich_options_chart::{enrich_companies_with_options, enrich_companies_with_chart};
use crate::corporate::collect_exchanges::collect_and_save_exchanges;
use crate::economic::update_forex::collect_fx_rates;
use crate::util::directories::DataPaths;
use crate::util::logger;
use crate::scraper::webdriver::ChromeDriverPool;
@@ -40,105 +42,105 @@ pub async fn run_full_update(
logger::log_warn("Shutdown detected after GLEIF download").await;
return Ok(());
}
logger::log_info("Step 2: Loading OpenFIGI metadata...").await;
load_figi_type_lists().await.ok();
logger::log_info(" ✓ OpenFIGI metadata loaded").await;
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown detected after OpenFIGI load").await;
return Ok(());
}
logger::log_info("Step 3: Checking LEI-FIGI mapping status...").await;
let all_mapped = ensure_all_leis_mapped(&gleif_csv_path, None).await?;
if !all_mapped {
logger::log_warn(" ⚠ Some LEIs failed to map - continuing with partial data").await;
if !shutdown_flag.load(Ordering::SeqCst) {
logger::log_info("Step 2: Loading OpenFIGI metadata...").await;
load_figi_type_lists().await.ok();
logger::log_info(" ✓ OpenFIGI metadata loaded").await;
} else {
logger::log_info(" ✓ All LEIs successfully mapped").await;
logger::log_warn("Shutdown detected, skipping event index build").await;
}
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown detected after LEI-FIGI mapping").await;
return Ok(());
}
logger::log_info("Step 4: Building securities map (streaming)...").await;
let date_dir = find_most_recent_figi_date_dir(&paths).await?;
if let Some(date_dir) = date_dir {
logger::log_info(&format!(" Using FIGI data from: {:?}", date_dir)).await;
load_or_build_all_securities(&date_dir).await?;
logger::log_info(" ✓ Securities map updated").await;
if !shutdown_flag.load(Ordering::SeqCst) {
logger::log_info("Step 2: Loading OpenFIGI metadata...").await;
load_figi_type_lists().await.ok();
logger::log_info(" ✓ OpenFIGI metadata loaded").await;
} else {
logger::log_warn(" ✗ No FIGI data directory found").await;
logger::log_warn("Shutdown detected, skipping event index build").await;
}
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown detected after securities map build").await;
return Ok(());
if !shutdown_flag.load(Ordering::SeqCst) {
logger::log_info("Step 3: Checking LEI-FIGI mapping status...").await;
let all_mapped = ensure_all_leis_mapped(&gleif_csv_path, None).await?;
if !all_mapped {
logger::log_warn(" ⚠ Some LEIs failed to map - continuing with partial data").await;
} else {
logger::log_info(" ✓ All LEIs successfully mapped").await;
}
} else {
logger::log_warn("Shutdown detected, skipping event index build").await;
}
logger::log_info("Step 5: Building companies.jsonl with parallel processing and validation...").await;
let count = build_companies_jsonl_streaming_parallel(&paths, pool, shutdown_flag, config, &None).await?;
logger::log_info(&format!(" ✓ Saved {} companies", count)).await;
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown detected after companies.jsonl build").await;
return Ok(());
if !shutdown_flag.load(Ordering::SeqCst) {
logger::log_info("Step 4: Building securities map (streaming)...").await;
let date_dir = find_most_recent_figi_date_dir(&paths).await?;
if let Some(date_dir) = date_dir {
logger::log_info(&format!(" Using FIGI data from: {:?}", date_dir)).await;
load_or_build_all_securities(&date_dir).await?;
logger::log_info(" ✓ Securities map updated").await;
} else {
logger::log_warn(" ✗ No FIGI data directory found").await;
}
} else {
logger::log_warn("Shutdown detected, skipping event index build").await;
}
logger::log_info("Step 6: Cleansing companies with missing essential data...").await;
let cleansed_count = companies_yahoo_cleansed_no_data(&paths).await?;
logger::log_info(&format!("{} companies found on Yahoo ready for further use in companies_yahoo.jsonl", cleansed_count)).await;
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown detected after no-data cleansing").await;
return Ok(());
if !shutdown_flag.load(Ordering::SeqCst) {
logger::log_info("Step 5: Building companies.jsonl with parallel processing and validation...").await;
let count = build_companies_jsonl_streaming_parallel(&paths, pool, shutdown_flag, config, &None).await?;
logger::log_info(&format!(" ✓ Saved {} companies", count)).await;
} else {
logger::log_warn("Shutdown detected, skipping event index build").await;
}
if !shutdown_flag.load(Ordering::SeqCst) {
logger::log_info("Step 6: Cleansing companies with missing essential data...").await;
let cleansed_count = companies_yahoo_cleansed_no_data(&paths).await?;
logger::log_info(&format!("{} companies found on Yahoo ready for further use in companies_yahoo.jsonl", cleansed_count)).await;
} else {
logger::log_warn("Shutdown detected, skipping event index build").await;
}
logger::log_info("Step 7: Cleansing companies with too low profile (with abort-safe persistence)...").await;
let proxy_pool = pool.get_proxy_pool()
.ok_or_else(|| anyhow::anyhow!("ChromeDriverPool must be created with VPN proxy rotation enabled"))?;
.ok_or_else(|| anyhow::anyhow!("ChromeDriverPool must be created with VPN proxy rotation enabled"))?;
logger::log_info("Creating YahooClientPool with proxy rotation...").await;
let yahoo_pool = Arc::new(YahooClientPool::new(proxy_pool, config, None).await?);
logger::log_info(&format!("✓ YahooClientPool ready with {} clients", yahoo_pool.num_clients().await)).await;
let cleansed_count = companies_yahoo_cleansed_low_profile(&paths, config, yahoo_pool.clone(), shutdown_flag).await?;
logger::log_info(&format!("{} companies with sufficient profile ready for analytics", cleansed_count)).await;
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown detected after low-profile cleansing").await;
return Ok(());
if !shutdown_flag.load(Ordering::SeqCst) {
logger::log_info("Step 7: Cleansing companies with too low profile (with abort-safe persistence)...").await;
let cleansed_count = companies_yahoo_cleansed_low_profile(&paths, config, yahoo_pool.clone(), shutdown_flag).await?;
logger::log_info(&format!("{} companies with sufficient profile ready for analytics", cleansed_count)).await;
} else {
logger::log_warn("Shutdown detected, skipping event index build").await;
}
logger::log_info("Step 8: Enriching companies with Yahoo Events (with abort-safe persistence)...").await;
let enriched_count = enrich_companies_with_events(&paths, config, yahoo_pool.clone(), shutdown_flag).await?;
logger::log_info(&format!("{} companies enriched with event data", enriched_count)).await;
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown detected after event enrichment").await;
return Ok(());
if !shutdown_flag.load(Ordering::SeqCst) {
logger::log_info("Step 8: Enriching companies with Yahoo Events (with abort-safe persistence)...").await;
let enriched_count = enrich_companies_with_events(&paths, config, yahoo_pool.clone(), shutdown_flag).await?;
logger::log_info(&format!("{} companies enriched with event data", enriched_count)).await;
} else {
logger::log_warn("Shutdown detected, skipping event index build").await;
}
logger::log_info("Step 9: Enriching companies with Yahoo Options (with abort-safe persistence)...").await;
let options_count = enrich_companies_with_options(&paths, config, yahoo_pool.clone(), shutdown_flag).await?;
logger::log_info(&format!("{} companies enriched with options data", options_count)).await;
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown detected after options enrichment").await;
return Ok(());
if !shutdown_flag.load(Ordering::SeqCst) {
logger::log_info("Step 9: Enriching companies with Yahoo Options (with abort-safe persistence)...").await;
let options_count = enrich_companies_with_options(&paths, config, yahoo_pool.clone(), shutdown_flag).await?;
logger::log_info(&format!("{} companies enriched with options data", options_count)).await;
} else {
logger::log_warn("Shutdown detected, skipping event index build").await;
}
logger::log_info("Step 10: Enriching companies with Yahoo Chart (with abort-safe persistence)...").await;
let chart_count = enrich_companies_with_chart(&paths, config, yahoo_pool.clone(), shutdown_flag).await?;
logger::log_info(&format!("{} companies enriched with chart data", chart_count)).await;
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown detected after chart enrichment").await;
return Ok(());
if !shutdown_flag.load(Ordering::SeqCst) {
logger::log_info("Step 10: Enriching companies with Yahoo Chart (with abort-safe persistence)...").await;
let chart_count = enrich_companies_with_chart(&paths, config, yahoo_pool.clone(), shutdown_flag).await?;
logger::log_info(&format!("{} companies enriched with chart data", chart_count)).await;
} else {
logger::log_warn("Shutdown detected, skipping event index build").await;
}
if !shutdown_flag.load(Ordering::SeqCst) {
@@ -149,6 +151,28 @@ pub async fn run_full_update(
logger::log_warn("Shutdown detected, skipping event index build").await;
}
if !shutdown_flag.load(Ordering::SeqCst) {
logger::log_info("Step 12: Collecting FX rates...").await;
let proxy_pool = pool.get_proxy_pool()
.ok_or_else(|| anyhow::anyhow!("ChromeDriverPool must have proxy rotation"))?;
let yahoo_pool = Arc::new(YahooClientPool::new(proxy_pool, config, None).await?);
let fx_count = collect_fx_rates(&paths, config, yahoo_pool.clone(), shutdown_flag).await?;
logger::log_info(&format!(" ✓ Collected {} FX rates", fx_count)).await;
} else {
logger::log_warn("Shutdown detected, skipping FX rates collection").await;
}
if !shutdown_flag.load(Ordering::SeqCst) {
logger::log_info("Step 13: Collecting exchange information...").await;
let exchange_count = collect_and_save_exchanges(&paths).await?;
logger::log_info(&format!(" ✓ Collected {} exchanges", exchange_count)).await;
} else {
logger::log_warn("Shutdown detected, skipping exchange collection").await;
}
logger::log_info("✅ Corporate update complete").await;
Ok(())
}

View File

@@ -266,8 +266,36 @@ pub async fn companies_yahoo_cleansed_low_profile(
existing_companies.len()
)).await;
// === CONSOLIDATE LOG BEFORE EARLY EXIT ===
if pending.is_empty() {
logger::log_info(" ✓ All companies already processed").await;
// Consolidate log into checkpoint before exiting
if log_path.exists() {
let log_metadata = tokio::fs::metadata(&log_path).await.ok();
if log_metadata.map(|m| m.len() > 0).unwrap_or(false) {
logger::log_info(" Consolidating update log into checkpoint...").await;
let temp_checkpoint = checkpoint_path.with_extension("tmp");
let mut temp_file = File::create(&temp_checkpoint).await?;
for company in existing_companies.values() {
let json_line = serde_json::to_string(company)?;
temp_file.write_all(json_line.as_bytes()).await?;
temp_file.write_all(b"\n").await?;
}
temp_file.flush().await?;
temp_file.sync_data().await?;
drop(temp_file);
tokio::fs::rename(&temp_checkpoint, &checkpoint_path).await?;
tokio::fs::remove_file(&log_path).await.ok();
logger::log_info(&format!(" ✓ Consolidated {} companies", existing_companies.len())).await;
}
}
return Ok(existing_companies.len());
}
@@ -575,6 +603,36 @@ pub async fn companies_yahoo_cleansed_low_profile(
final_valid, final_filtered_low_cap, final_filtered_no_price, final_failed
)).await;
// === VERIFY AND RECREATE FINAL OUTPUT ===
logger::log_info("Verifying final output integrity...").await;
let final_companies_map = existing_companies_writer.lock().await;
let expected_count = final_companies_map.len();
// Always write final consolidated checkpoint
let temp_checkpoint = checkpoint_path.with_extension("tmp");
let mut temp_file = File::create(&temp_checkpoint).await?;
for company in final_companies_map.values() {
let json_line = serde_json::to_string(company)?;
temp_file.write_all(json_line.as_bytes()).await?;
temp_file.write_all(b"\n").await?;
}
temp_file.flush().await?;
temp_file.sync_data().await?;
drop(temp_file);
tokio::fs::rename(&temp_checkpoint, &checkpoint_path).await?;
drop(final_companies_map);
// Clear log since everything is in checkpoint
if log_path.exists() {
tokio::fs::remove_file(&log_path).await.ok();
}
logger::log_info(&format!("✓ Final output: {} companies in {:?}", expected_count, checkpoint_path)).await;
// Shutdown Yahoo pool
yahoo_pool.shutdown().await?;
@@ -706,7 +764,7 @@ async fn process_company_with_validation(
// Validate market cap
let market_cap = extract_market_cap(&summary);
if market_cap < 1_000_000.0 {
if market_cap < 100_000_000.0 {
return CompanyProcessResult::FilteredLowCap {
name: company.name.clone(),
market_cap,

View File

@@ -6,7 +6,7 @@ use crate::util::logger;
use crate::scraper::yahoo::{YahooClientPool};
use std::result::Result::Ok;
use chrono::Utc;
use chrono::{TimeZone, Utc};
use std::collections::{HashSet};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
@@ -656,9 +656,12 @@ async fn enrich_company_with_chart(
// Get 1 year of daily chart data
let now = chrono::Utc::now().timestamp();
let twenty_five_years_ago = now - (25 * 365 * 24 * 60 * 60);
let chart_data = yahoo_pool.get_chart_data(&ticker, "1d", twenty_five_years_ago, now).await?;
let start = chrono::Utc
.with_ymd_and_hms(2000, 1, 1, 0, 0, 0)
.unwrap()
.timestamp();
let chart_data = yahoo_pool.get_chart_data(&ticker, "1d", start, now).await?;
// Only save if we got meaningful data
if chart_data.quotes.is_empty() {