added companie mapping with yahoo tickers

This commit is contained in:
2025-12-14 16:48:02 +01:00
parent 00c9d45642
commit d744769138
12 changed files with 1507 additions and 2591 deletions

View File

@@ -1,170 +1,274 @@
// src/corporate/update.rs
use super::{scraper::*, storage::*, helpers::*, types::*, aggregation::*, openfigi::*};
// src/corporate/update.rs - COMPLETE STREAMING VERSION
use super::{scraper::*, storage::*, helpers::*, types::*, openfigi::*, yahoo::*};
use crate::config::Config;
use crate::util::directories::DataPaths;
use crate::util::logger;
use crate::scraper::webdriver::ChromeDriverPool;
use chrono::Local;
use std::collections::{HashMap};
use std::collections::HashMap;
use std::sync::Arc;
/// Main function: Full update for all companies with streaming to minimize memory usage
pub async fn run_full_update(config: &Config, pool: &Arc<ChromeDriverPool>) -> anyhow::Result<()> {
let msg = "=== Starting LEI-based corporate full update (STREAMING) ===";
println!("{}", msg);
logger::log_info(msg).await;
/// Main update function - fully streaming, minimal memory usage
pub async fn run_full_update(_config: &Config, pool: &Arc<ChromeDriverPool>) -> anyhow::Result<()> {
logger::log_info("=== Corporate Update (STREAMING MODE) ===").await;
let paths = DataPaths::new(".")?;
// Step 1: Download/locate GLEIF CSV (don't load into memory yet)
logger::log_info("Corporate Update: Downloading/locating GLEIF CSV...").await;
// Step 1: Download GLEIF CSV (don't load into memory)
logger::log_info("Step 1: Downloading GLEIF CSV...").await;
let gleif_csv_path = match download_isin_lei_csv().await? {
Some(p) => {
logger::log_info(&format!("Corporate Update: GLEIF CSV at: {}", p)).await;
logger::log_info(&format!(" GLEIF CSV at: {}", p)).await;
p
}
None => {
logger::log_warn("Corporate Update: Could not obtain GLEIF CSV, continuing with limited data").await;
logger::log_warn(" Could not obtain GLEIF CSV").await;
return Ok(());
}
};
// Step 2: Load OpenFIGI type lists (small, cached)
logger::log_info("Corporate Update: Loading OpenFIGI type lists...").await;
if let Err(e) = load_figi_type_lists().await {
logger::log_warn(&format!("Could not load OpenFIGI type lists: {}", e)).await;
logger::log_info("Step 2: Loading OpenFIGI metadata...").await;
load_figi_type_lists().await.ok();
logger::log_info(" ✓ OpenFIGI metadata loaded").await;
// Step 3: Check mapping status and process only unmapped LEIs
logger::log_info("Step 3: Checking LEI-FIGI mapping status...").await;
let all_mapped = ensure_all_leis_mapped(&gleif_csv_path, None).await?;
if !all_mapped {
logger::log_warn(" ⚠ Some LEIs failed to map - continuing with partial data").await;
} else {
logger::log_info(" ✓ All LEIs successfully mapped").await;
}
// Step 3: Process GLEIF → FIGI mapping in streaming fashion
logger::log_info("Corporate Update: Building FIGI mappings (streaming)...").await;
// Step 4: Build securities from FIGI data (streaming)
logger::log_info("Step 4: Building securities map (streaming)...").await;
let date_dir = find_most_recent_figi_date_dir(&paths).await?;
// Build LEI→ISINs map by streaming the CSV
let mut lei_to_isins: HashMap<String, Vec<String>> = HashMap::new();
let mut lei_batch = Vec::new();
const LEI_BATCH_SIZE: usize = 1000;
stream_gleif_csv(&gleif_csv_path, |lei, isin| {
lei_to_isins.entry(lei.clone()).or_default().push(isin);
lei_batch.push(lei);
// Process in batches
if lei_batch.len() >= LEI_BATCH_SIZE {
lei_batch.clear();
}
Ok(())
}).await?;
logger::log_info(&format!("Corporate Update: Collected {} LEIs", lei_to_isins.len())).await;
// Step 4: Build FIGI mappings in batches (process and save incrementally)
logger::log_info("Corporate Update: Processing FIGI mappings in batches...").await;
let figi_result = build_lei_to_figi_infos(&lei_to_isins, None).await;
// Don't keep the full result in memory - it's already saved to JSONL files
drop(figi_result);
drop(lei_to_isins); // Release this too
logger::log_info("Corporate Update: FIGI mappings saved to cache").await;
// Step 5: Load or build securities (streaming from JSONL files)
logger::log_info("Corporate Update: Building securities map (streaming)...").await;
let dir = DataPaths::new(".")?;
let map_cache_dir = dir.cache_gleif_openfigi_map_dir();
// Find the most recent date directory
let date_dir = find_most_recent_date_dir(&map_cache_dir).await?;
let (common_stocks, _warrants, _options) = if let Some(date_dir) = date_dir {
logger::log_info(&format!("Using FIGI data from: {:?}", date_dir)).await;
load_or_build_all_securities_streaming(&date_dir).await?
if let Some(date_dir) = date_dir {
logger::log_info(&format!(" Using FIGI data from: {:?}", date_dir)).await;
build_securities_from_figi_streaming(&date_dir).await?;
logger::log_info(" ✓ Securities map updated").await;
} else {
logger::log_warn("No FIGI date directory found, using empty maps").await;
(HashMap::new(), HashMap::new(), HashMap::new())
};
logger::log_warn("No FIGI data directory found").await;
}
logger::log_info(&format!("Corporate Update: Processing {} companies", common_stocks.len())).await;
// Step 5: Build companies JSONL (streaming from securities)
logger::log_info("Step 5: Building companies.jsonl (streaming)...").await;
let count = build_companies_jsonl_streaming(&paths, pool).await?;
logger::log_info(&format!(" ✓ Saved {} companies", count)).await;
// Step 6: Convert to simplified companies map and save incrementally
logger::log_info("Corporate Update: Building companies JSONL (streaming)...").await;
// Step 6: Process events (using index, not full load)
logger::log_info("Step 6: Processing events (using index)...").await;
let _event_index = build_event_index(&paths).await?;
logger::log_info(" ✓ Event index built").await;
logger::log_info("✓ Corporate update complete").await;
Ok(())
}
/// Stream companies.jsonl creation from securities cache - INCREMENTAL MODE
async fn build_companies_jsonl_streaming(paths: &DataPaths, pool: &Arc<ChromeDriverPool>) -> anyhow::Result<usize> {
let path = DataPaths::new(".")?;
let corporate_path = path.data_dir().join("corporate").join("by_name");
let securities_path = corporate_path.join("common_stocks.json");
if !securities_path.exists() {
logger::log_warn("No common_stocks.json found").await;
return Ok(0);
}
// Load securities
let content = tokio::fs::read_to_string(securities_path).await?;
let securities: HashMap<String, CompanyInfo> = serde_json::from_str(&content)?;
let companies_path = paths.data_dir().join("companies.jsonl");
// Create file and write incrementally
if let Some(parent) = companies_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let mut file = tokio::fs::File::create(&companies_path).await?;
let mut processed = 0;
// Load existing companies into a map
let mut existing_companies: HashMap<String, CompanyCrossPlatformInfo> = HashMap::new();
for (name, company_info) in common_stocks.iter() {
let mut isin_ticker_pairs: HashMap<String, String> = HashMap::new();
if companies_path.exists() {
logger::log_info("Loading existing companies.jsonl...").await;
let existing_content = tokio::fs::read_to_string(&companies_path).await?;
for line in existing_content.lines() {
if line.trim().is_empty() {
continue;
}
match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
Ok(company) => {
existing_companies.insert(company.name.clone(), company);
}
Err(e) => {
logger::log_warn(&format!("Failed to parse existing company line: {}", e)).await;
}
}
}
logger::log_info(&format!("Loaded {} existing companies", existing_companies.len())).await;
}
// Create temporary file for atomic write
let temp_path = companies_path.with_extension("jsonl.tmp");
let mut file = tokio::fs::File::create(&temp_path).await?;
let mut count = 0;
let mut updated_count = 0;
let mut new_count = 0;
use tokio::io::AsyncWriteExt;
for (name, company_info) in securities.iter() {
// Check if we already have this company
let existing_entry = existing_companies.remove(name);
let is_update = existing_entry.is_some();
// Start with existing ISIN-ticker map or create new one
let mut isin_tickers_map: HashMap<String, Vec<String>> =
existing_entry
.map(|e| e.isin_tickers_map)
.unwrap_or_default();
// Step 1: Extract unique ISIN-ticker pairs from FigiInfo
let mut unique_isin_ticker_pairs: HashMap<String, Vec<String>> = HashMap::new();
for figi_infos in company_info.securities.values() {
for figi_info in figi_infos {
if !figi_info.isin.is_empty() && !figi_info.ticker.is_empty() {
isin_ticker_pairs.insert(figi_info.isin.clone(), figi_info.ticker.clone());
if !figi_info.isin.is_empty() {
let tickers = unique_isin_ticker_pairs
.entry(figi_info.isin.clone())
.or_insert_with(Vec::new);
// Add FIGI ticker if present and not duplicate
if !figi_info.ticker.is_empty() && !tickers.contains(&figi_info.ticker) {
tickers.push(figi_info.ticker.clone());
}
}
}
}
if !isin_ticker_pairs.is_empty() {
use tokio::io::AsyncWriteExt;
// Step 2: Merge FIGI tickers into main map
for (isin, figi_tickers) in unique_isin_ticker_pairs {
let tickers = isin_tickers_map
.entry(isin.clone())
.or_insert_with(Vec::new);
let line = serde_json::json!({
"name": name,
"securities": isin_ticker_pairs
});
// Add FIGI tickers that aren't already present
for figi_ticker in figi_tickers {
if !tickers.contains(&figi_ticker) {
tickers.push(figi_ticker);
}
}
file.write_all(line.to_string().as_bytes()).await?;
// Step 3: Check if we need to fetch Yahoo ticker for this ISIN
let has_yahoo_ticker = tickers.iter().any(|t| t.starts_with("YAHOO:"));
if !has_yahoo_ticker {
logger::log_info(&format!("Fetching Yahoo ticker for {} (ISIN: {})", name, isin)).await;
let yahoo_result = scrape_ticker_by_isin(pool, &isin).await;
match yahoo_result {
Ok(result) => {
let log_msg = match &result {
YahooTickerResult::Found(ticker) =>
format!("✓ Found Yahoo ticker {} for ISIN {}", ticker, isin),
YahooTickerResult::NoResults =>
format!("○ No search results for ISIN {}", isin),
YahooTickerResult::NotFound =>
format!("○ Empty ticker result for ISIN {}", isin),
YahooTickerResult::AmbiguousResults =>
format!("⚠ Ambiguous results for ISIN {}", isin),
};
if result.is_found() {
logger::log_info(&log_msg).await;
} else {
logger::log_warn(&log_msg).await;
}
tickers.push(result.to_tagged_string());
},
Err(e) => {
logger::log_warn(&format!("✗ Yahoo lookup error for ISIN {}: {}", isin, e)).await;
tickers.push("YAHOO:ERROR".to_string());
}
}
} else {
logger::log_warn(&format!("Skipping Yahoo lookup for {} ISIN {} - already has Yahoo data", name, isin)).await;
}
}
// Only write if we have ticker data
if !isin_tickers_map.is_empty() {
let company_entry = CompanyCrossPlatformInfo {
name: name.clone(),
isin_tickers_map,
};
let line = serde_json::to_string(&company_entry)?;
file.write_all(line.as_bytes()).await?;
file.write_all(b"\n").await?;
processed += 1;
// Yield periodically
if processed % 100 == 0 {
// Flush after each write for crash safety
file.flush().await?;
count += 1;
if is_update {
updated_count += 1;
} else {
new_count += 1;
}
if count % 10 == 0 {
logger::log_info(&format!("Progress: {} companies ({} new, {} updated)", count, new_count, updated_count)).await;
tokio::task::yield_now().await;
logger::log_info(&format!("Saved {} companies so far...", processed)).await;
}
}
}
logger::log_info(&format!("Corporate Update: Saved {} companies to JSONL", processed)).await;
// Step 7: Process events in streaming fashion
logger::log_info("Corporate Update: Processing events (streaming)...").await;
// Write any remaining existing companies that weren't in securities
for (_name, company) in existing_companies {
let line = serde_json::to_string(&company)?;
file.write_all(line.as_bytes()).await?;
file.write_all(b"\n").await?;
file.flush().await?;
count += 1;
logger::log_warn(&format!("Preserved existing company: {}", _name)).await;
}
let event_index = build_event_index(&paths).await?;
logger::log_info(&format!("Corporate Update: Built index of {} events", event_index.len())).await;
// Ensure all data is written
file.sync_all().await?;
drop(file);
// For now, we just maintain the index
// In a full implementation, you'd stream through tickers and update events
// Atomic rename: replace old file with new one
tokio::fs::rename(&temp_path, &companies_path).await?;
// Step 8: Save any updates
logger::log_info("Corporate Update: Finalizing...").await;
let msg = "✓ Corporate update complete (streaming)";
println!("{}", msg);
logger::log_info(msg).await;
Ok(())
logger::log_info(&format!("✓ Completed: {} total companies ({} new, {} updated)", count, new_count, updated_count)).await;
Ok(count)
}
/// Helper to find the most recent date directory in the FIGI cache
async fn find_most_recent_date_dir(map_cache_dir: &std::path::Path) -> anyhow::Result<Option<std::path::PathBuf>> {
/// Find most recent FIGI date directory
async fn find_most_recent_figi_date_dir(paths: &DataPaths) -> anyhow::Result<Option<std::path::PathBuf>> {
let map_cache_dir = paths.cache_gleif_openfigi_map_dir();
if !map_cache_dir.exists() {
return Ok(None);
}
let mut entries = tokio::fs::read_dir(map_cache_dir).await?;
let mut entries = tokio::fs::read_dir(&map_cache_dir).await?;
let mut dates = Vec::new();
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if path.is_dir() {
if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
// Date format: DDMMYYYY
if name.len() == 8 && name.chars().all(|c| c.is_numeric()) {
dates.push((name.to_string(), path));
}
@@ -176,67 +280,16 @@ async fn find_most_recent_date_dir(map_cache_dir: &std::path::Path) -> anyhow::R
return Ok(None);
}
// Sort by date (DDMMYYYY format)
dates.sort_by(|a, b| b.0.cmp(&a.0)); // Descending order
dates.sort_by(|a, b| b.0.cmp(&a.0));
Ok(Some(dates[0].1.clone()))
}
pub struct ProcessResult {
pub changes: Vec<CompanyEventChange>,
}
/// Process events in batches to avoid memory buildup
pub async fn process_events_streaming(
index: &[EventIndex],
new_events: &[CompanyEvent],
today: &str,
) -> anyhow::Result<(Vec<CompanyEventChange>, Vec<CompanyEvent>)> {
let mut all_changes = Vec::new();
let mut final_events: HashMap<String, CompanyEvent> = HashMap::new();
// Step 1: Load existing events in batches using the index
logger::log_info("Loading existing events in batches...").await;
let mut loaded_files = std::collections::HashSet::new();
for entry in index {
if loaded_files.contains(&entry.file_path) {
continue;
}
let content = tokio::fs::read_to_string(&entry.file_path).await?;
let events: Vec<CompanyEvent> = serde_json::from_str(&content)?;
for e in events {
final_events.insert(event_key(&e), e);
}
loaded_files.insert(entry.file_path.clone());
if final_events.len() % 1000 == 0 {
logger::log_info(&format!("Loaded {} events so far...", final_events.len())).await;
tokio::task::yield_now().await;
}
}
logger::log_info(&format!("Loaded {} existing events", final_events.len())).await;
// Step 2: Process new events in batches
for (idx, batch) in new_events.chunks(500).enumerate() {
logger::log_info(&format!("Processing batch {} ({} events)", idx + 1, batch.len())).await;
let batch_result = process_batch(batch, &mut final_events, today);
all_changes.extend(batch_result.changes);
tokio::task::yield_now().await;
}
let events_vec: Vec<CompanyEvent> = final_events.into_values().collect();
Ok((all_changes, events_vec))
}
pub fn process_batch(
new_events: &[CompanyEvent],
existing: &mut HashMap<String, CompanyEvent>,
@@ -253,7 +306,6 @@ pub fn process_batch(
continue;
}
// Check for time change on same date
let date_key = format!("{}|{}", new.ticker, new.date);
let mut found_old = None;
for (k, e) in existing.iter() {