Added creation of the CompanyInfo mapping
This commit is contained in:
@@ -1,35 +1,33 @@
|
||||
// src/corporate/update.rs
|
||||
use super::{scraper::*, storage::*, helpers::*, types::*, aggregation::*, openfigi::*};
|
||||
use crate::config::Config;
|
||||
use crate::scraper::webdriver::ChromeDriverPool;
|
||||
|
||||
use chrono::Local;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
use futures::{stream::{self, StreamExt}};
|
||||
|
||||
/// Hauptfunktion: Vollständiger Update-Durchlauf für alle Unternehmen (LEI-basiert)
|
||||
/// Main function: Full update for all companies (LEI-based) with optimized parallel execution.
|
||||
///
|
||||
/// Diese Funktion koordiniert den gesamten Update-Prozess:
|
||||
/// - Lädt GLEIF-Mappings
|
||||
/// - Baut FIGI-LEI-Map
|
||||
/// - Lädt bestehende Events
|
||||
/// - Verarbeitet jede Company: Ergänzt ISINs (abgeleitet aus FIGI), entdeckt Exchanges via FIGI,
|
||||
/// holt Prices & Earnings, aggregiert Daten
|
||||
/// - Speichert optimierte Events
|
||||
/// This function coordinates the entire update process:
|
||||
/// - Loads GLEIF mappings
|
||||
/// - Builds FIGI-LEI map
|
||||
/// - Loads existing events
|
||||
/// - Processes each company: discovers exchanges via FIGI, fetches prices & earnings, aggregates data
|
||||
/// - Uses the provided shared ChromeDriver pool for efficient parallel scraping
|
||||
/// - Saves optimized events
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `config` - Konfiguration mit Startdaten etc.
|
||||
///
|
||||
/// # Returns
|
||||
/// `Ok(())` bei Erfolg, sonst `anyhow::Error` mit Kontext.
|
||||
/// * `config` - The application configuration.
|
||||
/// * `pool` - Shared pool of ChromeDriver instances for scraping.
|
||||
///
|
||||
/// # Errors
|
||||
/// - Mapping-Laden fehlschlägt (Warning, fährt mit leer fort)
|
||||
/// - Company-Laden/Bauen fehlschlägt
|
||||
/// - Directory Creation oder Speichern fehlschlägt
|
||||
/// - Discovery/Fetch/Aggregation pro Company fehlschlägt (fortgesetzt bei Fehlern, mit Log)
|
||||
pub async fn run_full_update(config: &Config) -> anyhow::Result<()> {
|
||||
/// Returns an error if any step in the update process fails.
|
||||
pub async fn run_full_update(config: &Config, pool: &Arc<ChromeDriverPool>) -> anyhow::Result<()> {
|
||||
println!("=== Starting LEI-based corporate full update ===");
|
||||
|
||||
// 1. Frisches GLEIF ISIN ↔ LEI Mapping laden (jeder Lauf neu)
|
||||
// 1. Load fresh GLEIF ISIN ↔ LEI mapping
|
||||
let lei_to_isins: HashMap<String, Vec<String>> = match load_isin_lei_csv().await {
|
||||
Ok(map) => map,
|
||||
Err(e) => {
|
||||
@@ -38,8 +36,16 @@ pub async fn run_full_update(config: &Config) -> anyhow::Result<()> {
|
||||
}
|
||||
};
|
||||
|
||||
// 2. FIGI → LEI Map (optional, nur mit API-Key sinnvoll)
|
||||
let figi_to_lei= match build_lei_to_figi_infos(&lei_to_isins).await {
|
||||
// 2. Load OpenFIGI mapping value lists (cached)
|
||||
if let Err(e) = load_figi_type_lists().await {
|
||||
eprintln!("Warning: Could not load OpenFIGI type lists: {}", e);
|
||||
}
|
||||
|
||||
// 3. Build FIGI → LEI map
|
||||
// # Attributes
|
||||
// * lei: Structuring the companies by legal dependencies [LEI -> Vec<ISIN>]
|
||||
// * figi: metadata with ISIN as key
|
||||
let figi_to_lei:HashMap<String, Vec<FigiInfo>> = match build_lei_to_figi_infos(&lei_to_isins).await {
|
||||
Ok(map) => map,
|
||||
Err(e) => {
|
||||
eprintln!("Warning: Could not build FIGI→LEI map: {}", e);
|
||||
@@ -47,7 +53,11 @@ pub async fn run_full_update(config: &Config) -> anyhow::Result<()> {
|
||||
}
|
||||
};
|
||||
|
||||
// 3. Bestehende Earnings-Events laden (für Change-Detection)
|
||||
// 4. Load or build companies
|
||||
let mut companies = load_or_build_companies_by_name(&figi_to_lei).await?;
|
||||
println!("Processing {} companies", companies.len());
|
||||
|
||||
// 5. Load existing earnings events (for change detection)
|
||||
let today = Local::now().format("%Y-%m-%d").to_string();
|
||||
let mut existing_events = match load_existing_events().await {
|
||||
Ok(events) => events,
|
||||
@@ -57,162 +67,47 @@ pub async fn run_full_update(config: &Config) -> anyhow::Result<()> {
|
||||
}
|
||||
};
|
||||
|
||||
// 4. Unternehmen laden / neu aufbauen (LEI + FIGI-Infos)
|
||||
let mut companies: Vec<CompanyMetadata> = load_or_build_companies_lei(&lei_to_isins).await?;
|
||||
// 5. Use the provided pool (no need to create a new one)
|
||||
let pool_size = pool.get_number_of_instances(); // Use the size from the shared pool
|
||||
|
||||
// 4.1 LEIs anreichern (falls missing, über bekannte ISINs aus FIGI suchen)
|
||||
//enrich_companies_with_leis(&mut companies, &lei_to_isins).await?;
|
||||
|
||||
// 5. Haupt-Loop: Jedes Unternehmen verarbeiten
|
||||
for company in companies.iter_mut() {
|
||||
let lei = &company.lei;
|
||||
let figi_infos = company.figi.as_ref().map_or(&[][..], |v| &v[..]);
|
||||
let name = figi_infos.first().map(|f| f.name.as_str()).unwrap_or("Unknown");
|
||||
println!("\nProcessing company: {} (LEI: {})", name, lei);
|
||||
|
||||
// --- 5.1 Alle bekannten ISINs aus GLEIF ergänzen ---
|
||||
let mut all_isins = lei_to_isins.get(lei).cloned().unwrap_or_default();
|
||||
let figi_isins: Vec<String> = figi_infos.iter().map(|f| f.isin.clone()).collect::<HashSet<_>>().into_iter().collect();
|
||||
all_isins.extend(figi_isins);
|
||||
all_isins.sort();
|
||||
all_isins.dedup(); // Unique ISINs
|
||||
|
||||
// --- 5.2 Verzeichnisstruktur anlegen & Metadaten speichern ---
|
||||
ensure_company_dirs(lei).await?;
|
||||
save_company_metadata(company).await?;
|
||||
|
||||
// --- 5.3 FIGI-Infos ermitteln (falls noch nicht vorhanden) ---
|
||||
let figi_infos = company.figi.get_or_insert_with(Vec::new);
|
||||
if figi_infos.is_empty() {
|
||||
println!(" No FIGI data yet → discovering exchanges via first known ISIN");
|
||||
let first_isin = all_isins.first().cloned().unwrap_or_default();
|
||||
if !first_isin.is_empty() {
|
||||
match discover_available_exchanges(&first_isin, "").await {
|
||||
Ok(discovered) => {
|
||||
figi_infos.extend(discovered);
|
||||
println!(" Discovered {} exchange(s) for first ISIN", figi_infos.len());
|
||||
}
|
||||
Err(e) => eprintln!(" Discovery failed for first ISIN: {}", e),
|
||||
}
|
||||
// Process companies in parallel using the shared pool
|
||||
/*let results: Vec<_> = stream::iter(companies.into_iter())
|
||||
.map(|company| {
|
||||
let pool_clone = pool.clone();
|
||||
async move {
|
||||
process_company_data(&company, &pool_clone, &mut existing_events).await
|
||||
}
|
||||
} else {
|
||||
println!(" {} exchange(s) already known", figi_infos.len());
|
||||
})
|
||||
.buffer_unordered(pool_size)
|
||||
.collect().await;
|
||||
|
||||
// Handle results (e.g., collect changes)
|
||||
let mut all_changes = Vec::new();
|
||||
for result in results {
|
||||
if let Ok(ProcessResult { changes }) = result {
|
||||
all_changes.extend(changes);
|
||||
}
|
||||
}*/
|
||||
|
||||
// --- 5.4 Weitere Exchanges über alle ISINs suchen ---
|
||||
let mut new_discovered = 0;
|
||||
for isin in &all_isins {
|
||||
if figi_infos.iter().any(|f| f.isin == *isin) {
|
||||
continue; // Schon bekannt
|
||||
}
|
||||
println!(" Discovering additional exchanges for ISIN {}", isin);
|
||||
match discover_available_exchanges(isin, "").await {
|
||||
Ok(mut found) => {
|
||||
for info in found.drain(..) {
|
||||
if !figi_infos.iter().any(|f| f.ticker == info.ticker && f.mic_code == info.mic_code) {
|
||||
figi_infos.push(info);
|
||||
new_discovered += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => eprintln!(" Discovery failed for {}: {}", isin, e),
|
||||
}
|
||||
}
|
||||
if new_discovered > 0 {
|
||||
println!(" +{} new exchange(s) discovered and added", new_discovered);
|
||||
}
|
||||
|
||||
// --- 5.5 AvailableExchange-Einträge anlegen (für Preis-Downloads) ---
|
||||
for figi in figi_infos.iter() {
|
||||
if let Err(e) = add_discovered_exchange(&figi.isin, figi).await {
|
||||
eprintln!(" Failed to record exchange {}: {}", figi.ticker, e);
|
||||
}
|
||||
}
|
||||
|
||||
// --- 5.6 Preisdaten von allen Exchanges holen ---
|
||||
println!(" Fetching price data from {} exchange(s)...", figi_infos.len());
|
||||
let primary_isin = figi_infos.first().map(|f| f.isin.clone()).unwrap_or_default();
|
||||
for figi in figi_infos.iter() {
|
||||
let ticker = &figi.ticker;
|
||||
let mic = &figi.mic_code;
|
||||
let is_primary = figi.isin == primary_isin;
|
||||
let mut daily_success = false;
|
||||
let mut intraday_success = false;
|
||||
|
||||
// Earnings: only fetch from primary ticker to avoid duplicates
|
||||
if is_primary {
|
||||
match fetch_earnings_history(client, ticker).await {
|
||||
Ok(new_events) => {
|
||||
let result = process_batch(&new_events, &mut existing_events, &today);
|
||||
save_changes(&result.changes).await?;
|
||||
println!(" Earnings events: {}", new_events.len());
|
||||
}
|
||||
Err(e) => eprintln!(" Failed to fetch earnings for {}: {}", ticker, e),
|
||||
}
|
||||
}
|
||||
|
||||
// Daily prices
|
||||
match fetch_daily_price_history(ticker, &config.corporate_start_date, &today).await {
|
||||
Ok(prices) => {
|
||||
if !prices.is_empty() {
|
||||
save_prices_by_source(lei, ticker, "daily", prices).await?;
|
||||
daily_success = true;
|
||||
}
|
||||
}
|
||||
Err(e) => eprintln!(" Failed to fetch daily prices for {}: {}", ticker, e),
|
||||
}
|
||||
|
||||
// 5-minute intraday (last 60 days)
|
||||
let sixty_days_ago = (Local::now() - chrono::Duration::days(60))
|
||||
.format("%Y-%m-%d")
|
||||
.to_string();
|
||||
match fetch_price_history_5min(ticker, &sixty_days_ago, &today).await {
|
||||
Ok(prices) => {
|
||||
if !prices.is_empty() {
|
||||
save_prices_by_source(lei, ticker, "5min", prices).await?;
|
||||
intraday_success = true;
|
||||
}
|
||||
}
|
||||
Err(e) => eprintln!(" Failed to fetch 5min prices for {}: {}", ticker, e),
|
||||
}
|
||||
|
||||
// Update available_exchanges.json (now under LEI folder)
|
||||
update_available_exchange(&figi.isin, ticker, mic, daily_success, intraday_success).await?;
|
||||
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(800)).await;
|
||||
}
|
||||
|
||||
// --- 5.7 Aggregation aller Quellen → einheitliche USD-Preise ---
|
||||
println!(" Aggregating price data across all sources (FX-adjusted to USD)");
|
||||
if let Err(e) = aggregate_best_price_data(lei).await {
|
||||
eprintln!(" Aggregation failed: {}", e);
|
||||
} else {
|
||||
println!(" Aggregation completed successfully");
|
||||
}
|
||||
|
||||
// Metadaten erneut speichern (falls FIGIs hinzugefügt wurden)
|
||||
save_company_metadata(company).await?;
|
||||
}
|
||||
|
||||
// 6. Optimierte Earnings-Events final speichern
|
||||
save_optimized_events(existing_events).await?;
|
||||
println!("\n=== Corporate full update completed successfully ===");
|
||||
//save_changes(&all_changes).await?;
|
||||
|
||||
//println!("Corporate update complete — {} changes detected", all_changes.len());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Companies mit LEIs anreichern
|
||||
async fn _enrich_companies_with_leis(
|
||||
companies: &mut Vec<CompanyMetadata>,
|
||||
lei_to_isins: &HashMap<String, Vec<String>>,
|
||||
async fn assign_leis_from_figi(
|
||||
companies: &mut [CompanyMetadata],
|
||||
lei_to_isins: &HashMap<String, Vec<String>>
|
||||
) -> anyhow::Result<()> {
|
||||
for company in companies.iter_mut() {
|
||||
if !company.lei.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
for company in companies {
|
||||
let figi_infos = company.figi.as_ref().map_or(&[][..], |v| &v[..]);
|
||||
let isins: Vec<String> = figi_infos.iter().map(|f| f.isin.clone()).collect::<HashSet<_>>().into_iter().collect();
|
||||
let isins: Vec<String> = figi_infos
|
||||
.iter()
|
||||
.map(|f| f.isin.clone())
|
||||
.collect::<HashSet<_>>()
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
// Try to find LEI by any known ISIN
|
||||
for isin in &isins {
|
||||
@@ -228,7 +123,7 @@ async fn _enrich_companies_with_leis(
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ProcessResult {
|
||||
pub changes: Vec<CompanyEventChange>,
|
||||
|
||||
Reference in New Issue
Block a user