added LEI-to-ISIN mapping

2025-11-25 00:21:51 +01:00
parent bbc19f2110
commit e57a013224
10 changed files with 574 additions and 104 deletions

View File

@@ -16,8 +16,8 @@ struct DayData {
}
/// Aggregate price data from multiple exchanges, converting all to USD
pub async fn aggregate_best_price_data(isin: &str) -> anyhow::Result<()> {
let company_dir = get_company_dir(isin);
pub async fn aggregate_best_price_data(lei: &str) -> anyhow::Result<()> {
let company_dir = get_company_dir(lei);
for timeframe in ["daily", "5min"].iter() {
let source_dir = company_dir.join(timeframe);
@@ -136,7 +136,7 @@ pub async fn aggregate_best_price_data(isin: &str) -> anyhow::Result<()> {
.unwrap_or_else(|| "unknown".to_string());
aggregated.push(CompanyPrice {
ticker: format!("{}@agg", isin), // Mark as aggregated
ticker: format!("{lei}@agg"), // Mark as aggregated
date,
time,
open: data.open,
@@ -159,7 +159,7 @@ pub async fn aggregate_best_price_data(isin: &str) -> anyhow::Result<()> {
// Save aggregation metadata
let meta = AggregationMetadata {
isin: isin.to_string(),
lei: lei.to_string(),
timeframe: timeframe.to_string(),
sources: sources_used.into_iter().collect(),
total_bars: aggregated.len(),
@@ -169,7 +169,7 @@ pub async fn aggregate_best_price_data(isin: &str) -> anyhow::Result<()> {
),
aggregated_at: chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string(),
};
let meta_path = agg_dir.join("metadata.json");
fs::write(&meta_path, serde_json::to_string_pretty(&meta)?).await?;
@@ -185,7 +185,7 @@ pub async fn aggregate_best_price_data(isin: &str) -> anyhow::Result<()> {
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct AggregationMetadata {
isin: String,
lei: String,
timeframe: String,
sources: Vec<String>,
total_bars: usize,
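
To round out this file's changes, a minimal usage sketch of the renamed entry point; the LEI value is purely illustrative, and the output location follows from get_company_dir in the storage changes further down:

// Hypothetical call site; "5493000J2N45DDNE4Y28" is an example LEI, not real data.
aggregate_best_price_data("5493000J2N45DDNE4Y28").await?;
// Aggregated bars carry the ticker "<LEI>@agg"; per get_company_dir, the output
// (including metadata.json) lands under corporate_prices/<LEI>/<timeframe>/.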

View File

@@ -57,4 +57,14 @@ pub fn price_key(p: &CompanyPrice) -> String {
} else {
format!("{}|{}|{}", p.ticker, p.date, p.time)
}
}
pub fn parse_float(s: &str) -> Option<f64> {
s.replace("--", "").replace(",", "").parse::<f64>().ok()
}
pub fn parse_yahoo_date(s: &str) -> anyhow::Result<NaiveDate> {
NaiveDate::parse_from_str(s, "%B %d, %Y")
.or_else(|_| NaiveDate::parse_from_str(s, "%b %d, %Y"))
.map_err(|_| anyhow::anyhow!("Bad date: {s}"))
}
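
A few illustrative cases for the two helpers now exposed here (input values chosen for illustration, not taken from the diff):

assert_eq!(parse_float("1,234.50"), Some(1234.5)); // thousands separator stripped
assert_eq!(parse_float("--"), None);               // "--" placeholder collapses to "" and fails to parse
assert!(parse_yahoo_date("November 25, 2025").is_ok()); // long month name, "%B %d, %Y"
assert!(parse_yahoo_date("Nov 25, 2025").is_ok());      // short month fallback, "%b %d, %Y"
assert!(parse_yahoo_date("2025-11-25").is_err());       // ISO dates are rejected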

View File

@@ -1,11 +1,16 @@
// src/corporate/scraper.rs
use super::types::{CompanyEvent, CompanyPrice, TickerInfo};
use super::{types::{CompanyEvent, CompanyPrice, TickerInfo}, helpers::*};
use csv::ReaderBuilder;
use fantoccini::{Client, Locator};
use scraper::{Html, Selector};
use chrono::{DateTime, Duration, NaiveDate, Timelike, Utc};
use tokio::time::{sleep, Duration as TokioDuration};
use tokio::{time::{Duration as TokioDuration, sleep}};
use reqwest::Client as HttpClient;
use serde_json::Value;
use zip::ZipArchive;
use std::fs::File;
use std::{collections::HashMap};
use std::io::{Read, BufReader};
const USER_AGENT: &str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36";
@@ -136,8 +141,8 @@ async fn check_ticker_exists(ticker: &str) -> anyhow::Result<TickerInfo> {
Ok(TickerInfo {
ticker: ticker.to_string(),
exchange_mic,
currency,
primary: false, // Will be set separately
currency: currency.to_string(),
primary: false,
})
}
@@ -418,12 +423,170 @@ pub async fn fetch_price_history_5min(
Ok(prices)
}
fn parse_float(s: &str) -> Option<f64> {
s.replace("--", "").replace(",", "").parse::<f64>().ok()
/// Fetch the URL of the latest ISIN↔LEI mapping CSV from GLEIF
/// Deliberately overengineered: the static URL in download_isin_lei_csv would do,
/// but this stub marks where the download page could be scraped if that URL changes.
pub async fn _fetch_latest_gleif_isin_lei_mapping_url(client: &Client) -> anyhow::Result<String> {
let url = "https://www.gleif.org/de/lei-data/lei-mapping/download-isin-to-lei-relationship-files";
client.goto(url).await?;
let html = client.source().await?;
let _document = Html::parse_document(&html);
let _row_sel = Selector::parse("table tbody tr").unwrap();
let isin_lei = "".to_string();
Ok(isin_lei)
}
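
The body above is intentionally left as a stub. A hedged sketch of what it might do, assuming the GLEIF download page lists the mapping files as links inside a table (the selector and the href filter are guesses, not verified against the live page):

let document = Html::parse_document(&html);
let link_sel = Selector::parse("table tbody tr a[href]").unwrap();
if let Some(href) = document
    .select(&link_sel)
    .filter_map(|a| a.value().attr("href"))
    .find(|h| h.contains("isin") && h.ends_with(".zip"))
{
    return Ok(href.to_string());
}
anyhow::bail!("no ISIN/LEI mapping link found on GLEIF page")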
fn parse_yahoo_date(s: &str) -> anyhow::Result<NaiveDate> {
NaiveDate::parse_from_str(s, "%B %d, %Y")
.or_else(|_| NaiveDate::parse_from_str(s, "%b %d, %Y"))
.map_err(|_| anyhow::anyhow!("Bad date: {s}"))
pub async fn download_isin_lei_csv() -> anyhow::Result<Option<String>> {
let url = "https://mapping.gleif.org/api/v2/isin-lei/9315e3e3-305a-4e71-b062-46714740fa8d/download";
let zip_path = "data/isin_lei.zip";
let csv_path = "data/isin_lei.csv";
if let Err(e) = std::fs::create_dir_all("data") {
println!("Failed to create data directory: {e}");
return Ok(None);
}
// Download ZIP
let bytes = match reqwest::Client::builder()
.user_agent(USER_AGENT)
.timeout(std::time::Duration::from_secs(30))
.build()
{
Ok(client) => match client.get(url).send().await {
Ok(resp) if resp.status().is_success() => match resp.bytes().await {
Ok(b) => b,
Err(e) => {
println!("Failed to read ZIP bytes: {e}");
return Ok(None);
}
},
Ok(resp) => {
println!("Server returned HTTP {}", resp.status());
return Ok(None);
}
Err(e) => {
println!("Failed to download ISIN/LEI ZIP: {e}");
return Ok(None);
}
},
Err(e) => {
println!("Failed to create HTTP client: {e}");
return Ok(None);
}
};
if let Err(e) = tokio::fs::write(zip_path, &bytes).await {
println!("Failed to write ZIP file: {e}");
return Ok(None);
}
// Extract CSV
let mut archive = match std::fs::File::open(zip_path).map(ZipArchive::new) {
Ok(Ok(a)) => a,
Ok(Err(e)) => {
println!("Invalid ZIP: {e}");
return Ok(None);
}
Err(e) => {
println!("Cannot open ZIP file: {e}");
return Ok(None);
}
};
let idx = match (0..archive.len()).find(|&i| {
archive.by_index(i)
.map(|f| f.name().ends_with(".csv"))
.unwrap_or(false)
}) {
Some(i) => i,
None => {
println!("ZIP did not contain a CSV file");
return Ok(None);
}
};
let mut csv_file = match archive.by_index(idx) {
Ok(f) => f,
Err(e) => {
println!("Failed to read CSV entry: {e}");
return Ok(None);
}
};
let mut csv_bytes = Vec::new();
if let Err(e) = csv_file.read_to_end(&mut csv_bytes) {
println!("Failed to extract CSV: {e}");
return Ok(None);
}
if let Err(e) = tokio::fs::write(csv_path, &csv_bytes).await {
println!("Failed to save CSV file: {e}");
return Ok(None);
}
Ok(Some(csv_path.to_string()))
}
pub async fn load_isin_lei_csv() -> anyhow::Result<HashMap<String, Vec<String>>> {
// Await the download directly: creating a nested Tokio runtime and calling
// block_on here would panic when invoked from async callers like run_full_update.
let Some(path) = (match download_isin_lei_csv().await {
Ok(Some(p)) => Some(p),
Ok(None) => {
println!("ISIN/LEI download failed; continuing with empty map");
None
}
Err(e) => {
println!("Download error: {e}");
None
}
}) else {
return Ok(HashMap::new());
};
let file = match File::open(&path) {
Ok(f) => f,
Err(e) => {
println!("Cannot open CSV '{}': {e}", path);
return Ok(HashMap::new());
}
};
let mut rdr = ReaderBuilder::new().from_reader(BufReader::new(file));
let mut map: HashMap<String, Vec<String>> = HashMap::new();
for row in rdr.records() {
let rec = match row {
Ok(r) => r,
Err(e) => {
println!("CSV parse error: {e}");
continue;
}
};
if rec.len() < 2 {
continue;
}
let lei = rec[0].to_string();
let isin = rec[1].to_string();
map.entry(lei).or_default().push(isin);
}
Ok(map)
}
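
A usage sketch for the resulting map; the LEI here is the example value from the CompanyMetadata comment below, and actual hits depend on the downloaded file:

let lei_to_isins = load_isin_lei_csv().await?;
if let Some(isins) = lei_to_isins.get("5493000J2N45DDNE4Y28") {
    println!("{} ISIN(s) for this LEI: {:?}", isins.len(), isins);
}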

View File

@@ -111,8 +111,8 @@ pub async fn load_companies() -> Result<Vec<CompanyMetadata>, anyhow::Error> {
Ok(companies)
}
pub fn get_company_dir(isin: &str) -> PathBuf {
PathBuf::from("corporate_prices").join(isin)
pub fn get_company_dir(lei: &str) -> PathBuf {
PathBuf::from("corporate_prices").join(lei)
}
pub async fn ensure_company_dirs(isin: &str) -> anyhow::Result<()> {
@@ -131,13 +131,19 @@ pub async fn ensure_company_dirs(isin: &str) -> anyhow::Result<()> {
}
pub async fn save_company_metadata(company: &CompanyMetadata) -> anyhow::Result<()> {
let dir = get_company_dir(&company.isin);
let dir = get_company_dir(&company.lei);
fs::create_dir_all(&dir).await?;
let path = dir.join("metadata.json");
fs::write(path, serde_json::to_string_pretty(company)?).await?;
fs::write(&path, serde_json::to_string_pretty(company)?).await?;
Ok(())
}
pub async fn load_company_metadata(lei: &str) -> anyhow::Result<CompanyMetadata> {
let path = get_company_dir(lei).join("metadata.json");
let content = fs::read_to_string(path).await?;
Ok(serde_json::from_str(&content)?)
}
pub async fn save_available_exchanges(isin: &str, exchanges: Vec<AvailableExchange>) -> anyhow::Result<()> {
let dir = get_company_dir(isin);
fs::create_dir_all(&dir).await?;
@@ -146,8 +152,8 @@ pub async fn save_available_exchanges(isin: &str, exchanges: Vec<AvailableExchan
Ok(())
}
pub async fn load_available_exchanges(isin: &str) -> anyhow::Result<Vec<AvailableExchange>> {
let path = get_company_dir(isin).join("available_exchanges.json");
pub async fn load_available_exchanges(lei: &str) -> anyhow::Result<Vec<AvailableExchange>> {
let path = get_company_dir(lei).join("available_exchanges.json");
if path.exists() {
let content = fs::read_to_string(&path).await?;
Ok(serde_json::from_str(&content)?)
@@ -157,19 +163,17 @@ pub async fn load_available_exchanges(isin: &str) -> anyhow::Result<Vec<Availabl
}
pub async fn save_prices_by_source(
isin: &str,
lei: &str,
source_ticker: &str,
timeframe: &str,
prices: Vec<CompanyPrice>,
) -> anyhow::Result<()> {
let source_safe = source_ticker.replace(".", "_").replace("/", "_");
let dir = get_company_dir(isin).join(timeframe).join(&source_safe);
let dir = get_company_dir(lei).join(timeframe).join(&source_safe);
fs::create_dir_all(&dir).await?;
let path = dir.join("prices.json");
let mut prices = prices;
prices.sort_by_key(|p| (p.date.clone(), p.time.clone()));
fs::write(&path, serde_json::to_string_pretty(&prices)?).await?;
Ok(())
}
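
With these changes the on-disk layout is keyed by LEI instead of ISIN. A sketch of the resulting tree (entity and ticker names hypothetical; aggregation output omitted):

corporate_prices/
  5493000J2N45DDNE4Y28/            <- LEI, via get_company_dir
    metadata.json
    available_exchanges.json
    daily/
      SAP_DE/                      <- source ticker, '.' and '/' replaced with '_'
        prices.json
    5min/
      SAP_DE/
        prices.json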

View File

@@ -49,8 +49,10 @@ pub struct TickerInfo {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompanyMetadata {
pub isin: String,
pub lei: String, // e.g. "5493000J2N45DDNE4Y28"
pub name: String,
pub isins: Vec<String>, // All ISINs belonging to this legal entity (primary + ADR + GDR)
pub primary_isin: String, // The most liquid / preferred one (used for folder fallback)
pub tickers: Vec<TickerInfo>,
}
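
An illustrative instance of the new schema (all field values hypothetical):

let company = CompanyMetadata {
    lei: "5493000J2N45DDNE4Y28".to_string(),
    name: "Example SE".to_string(),
    isins: vec!["DE000EXAMPLE1".to_string(), "US1EXAMPLEADR".to_string()], // primary + ADR
    primary_isin: "DE000EXAMPLE1".to_string(),
    tickers: vec![], // TickerInfo entries as discovered
};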

View File

@@ -6,122 +6,137 @@ use chrono::Local;
use std::collections::HashMap;
pub async fn run_full_update(client: &fantoccini::Client, config: &Config) -> anyhow::Result<()> {
println!("Starting company-centric corporate update (ISIN-based)");
println!("Starting LEI-based corporate update");
// 1. Download fresh GLEIF ISIN↔LEI mapping on every run
let lei_to_isins: HashMap<String, Vec<String>> = match load_isin_lei_csv().await {
Ok(map) => map,
Err(e) => {
println!("Warning: Failed to load ISIN↔LEI mapping: {}", e);
HashMap::new()
}
};
//let _isin_to_lei = load_isin_to_lei()?; // optional, useful for migration scripts
let companies = load_companies().await?;
let today = chrono::Local::now().format("%Y-%m-%d").to_string();
let mut existing_events = load_existing_events().await?;
for company in companies {
println!("\nProcessing company: {} ({})", company.name, company.isin);
let companies = load_companies().await?; // Vec<CompanyMetadata> with lei, isins, tickers
ensure_company_dirs(&company.isin).await?;
save_company_metadata(&company).await?;
for mut company in companies {
println!("\nProcessing company: {} (LEI: {})", company.name, company.lei);
// === STEP 1: Discover all available exchanges ===
let mut all_tickers = company.tickers.clone();
// Try to discover additional exchanges using the primary ticker
if let Some(primary_ticker) = company.tickers.iter().find(|t| t.primary) {
println!(" 🔍 Discovering additional exchanges...");
match discover_available_exchanges(&company.isin, &primary_ticker.ticker).await {
Ok(discovered) => {
// Merge discovered tickers with existing ones
for disc in discovered {
if !all_tickers.iter().any(|t| t.ticker == disc.ticker) {
println!(" ✓ Found new exchange: {} ({})", disc.ticker, disc.exchange_mic);
all_tickers.push(disc);
}
}
// === Enrich with ALL ISINs known to GLEIF (includes ADRs, GDRs, etc.) ===
if let Some(all_isins) = lei_to_isins.get(&company.lei) {
let mut seen = company.isins.iter().cloned().collect::<std::collections::HashSet<_>>();
for isin in all_isins {
if !seen.contains(isin) {
company.isins.push(isin.clone());
seen.insert(isin.clone());
}
Err(e) => println!(" ⚠ Discovery failed: {}", e),
}
}
// Update metadata with newly discovered tickers
if all_tickers.len() > company.tickers.len() {
let updated_company = CompanyMetadata {
isin: company.isin.clone(),
name: company.name.clone(),
tickers: all_tickers.clone(),
};
save_company_metadata(&updated_company).await?;
println!(" 📝 Updated metadata with {} total tickers", all_tickers.len());
// Ensure company directory exists (now uses LEI)
ensure_company_dirs(&company.lei).await?;
save_company_metadata(&company).await?;
// === STEP 1: Discover additional exchanges using each known ISIN ===
let mut all_tickers = company.tickers.clone();
if let Some(primary_ticker) = company.tickers.iter().find(|t| t.primary) {
println!(" Discovering additional exchanges across {} ISIN(s)...", company.isins.len());
for isin in &company.isins {
println!(" → Checking ISIN: {}", isin);
match discover_available_exchanges(isin, &primary_ticker.ticker).await {
Ok(discovered) => {
if discovered.is_empty() {
println!(" No new exchanges found for {}", isin);
} else {
for disc in discovered {
if !all_tickers.iter().any(|t| t.ticker == disc.ticker && t.exchange_mic == disc.exchange_mic) {
println!(" Found new listing: {} ({}) [ISIN: {}]", disc.ticker, disc.exchange_mic, isin);
all_tickers.push(disc);
}
}
}
}
Err(e) => println!(" Discovery failed for {}: {}", isin, e),
}
tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;
}
}
// === STEP 2: Fetch data from all available exchanges ===
// Save updated metadata if we found new listings
if all_tickers.len() > company.tickers.len() {
company.tickers = all_tickers.clone();
save_company_metadata(&company).await?;
println!(" Updated metadata: {} total tickers", all_tickers.len());
}
// === STEP 2: Fetch data from ALL available tickers ===
for ticker_info in &all_tickers {
let ticker = &ticker_info.ticker;
println!("Trying ticker: {} ({})", ticker, ticker_info.exchange_mic);
println!("Fetching: {} ({})", ticker, ticker_info.exchange_mic);
let mut daily_success = false;
let mut intraday_success = false;
// Earnings (only from primary ticker to avoid duplicates)
// Earnings: only fetch from primary ticker to avoid duplicates
if ticker_info.primary {
if let Ok(new_events) = fetch_earnings_history(client, ticker).await {
let result = process_batch(&new_events, &mut existing_events, &today);
save_changes(&result.changes).await?;
println!(" {} earnings events", new_events.len());
println!(" Earnings events: {}", new_events.len());
}
}
// Daily prices
match fetch_daily_price_history(ticker, &config.corporate_start_date, &today).await {
Ok(prices) => {
if !prices.is_empty() {
save_prices_by_source(&company.isin, ticker, "daily", prices.clone()).await?;
daily_success = true;
println!(" ✓ Saved {} daily bars ({} currency)",
prices.len(),
prices.first().map(|p| p.currency.as_str()).unwrap_or("?")
);
}
if let Ok(prices) = fetch_daily_price_history(ticker, &config.corporate_start_date, &today).await {
if !prices.is_empty() {
save_prices_by_source(&company.lei, ticker, "daily", prices).await?;
daily_success = true;
}
Err(e) => println!(" ✗ Daily fetch failed: {}", e),
}
// 5-minute prices (last 60 days)
// 5-minute intraday (last 60 days)
let sixty_days_ago = (chrono::Local::now() - chrono::Duration::days(60))
.format("%Y-%m-%d")
.to_string();
match fetch_price_history_5min(ticker, &sixty_days_ago, &today).await {
Ok(prices) => {
if !prices.is_empty() {
save_prices_by_source(&company.isin, ticker, "5min", prices.clone()).await?;
intraday_success = true;
println!(" ✓ Saved {} 5min bars", prices.len());
}
if let Ok(prices) = fetch_price_history_5min(ticker, &sixty_days_ago, &today).await {
if !prices.is_empty() {
save_prices_by_source(&company.lei, ticker, "5min", prices).await?;
intraday_success = true;
}
Err(e) => println!(" ✗ 5min fetch failed: {}", e),
}
// Record success in available_exchanges.json
if daily_success || intraday_success {
update_available_exchange(
&company.isin,
ticker,
&ticker_info.exchange_mic,
daily_success,
intraday_success,
).await?;
}
// Update available_exchanges.json (now under LEI folder)
update_available_exchange(
&company.lei,
ticker,
&ticker_info.exchange_mic,
daily_success,
intraday_success,
).await?;
tokio::time::sleep(tokio::time::Duration::from_millis(800)).await;
}
// === STEP 3: Aggregate prices from all sources ===
println!(" 📊 Aggregating multi-exchange data with FX conversion...");
match aggregate_best_price_data(&company.isin).await {
Ok(_) => println!(" Aggregation complete"),
Err(e) => println!(" ⚠ Aggregation warning: {}", e),
// === STEP 3: Aggregate all sources into unified USD prices ===
println!(" Aggregating multi-source price data (FX-adjusted)...");
if let Err(e) = aggregate_best_price_data(&company.lei).await {
println!(" Aggregation failed: {}", e);
} else {
println!(" Aggregation complete");
}
}
// Final save of optimized earnings events
save_optimized_events(existing_events).await?;
println!("\nCorporate update complete (ISIN-based)");
println!("\nCorporate update complete (LEI-based)");
Ok(())
}
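
A small design note on the ISIN-enrichment step above: it keeps a separate seen set and checks membership before inserting. Since HashSet::insert already reports whether the value was new, an equivalent, slightly tighter form is possible (sketch only, same behavior):

use std::collections::HashSet;

let mut seen: HashSet<String> = company.isins.iter().cloned().collect();
for isin in all_isins {
    if seen.insert(isin.clone()) {
        company.isins.push(isin.clone());
    }
}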

View File

@@ -9,15 +9,14 @@ pub async fn ensure_data_dirs() -> anyhow::Result<()> {
"economic_event_changes",
"corporate_events",
"corporate_prices",
"data",
];
for dir in dirs {
let path = Path::new(dir);
if !path.exists() {
fs::create_dir_all(path).await?;
tokio::fs::create_dir_all(path).await?;
println!("Created directory: {dir}");
}
// else → silently continue
}
Ok(())
}