added data streaming instead of loading

This commit is contained in:
2025-12-12 10:54:01 +01:00
parent 1bda78897b
commit 00c9d45642
7 changed files with 888 additions and 398 deletions

View File

@@ -1,8 +1,8 @@
// src/corporate/openfigi.rs
use super::{types::*};
use crate::util::directories::DataPaths;
use crate::util::logger;
// src/corporate/openfigi.rs
use super::{types::*};
use reqwest::Client as HttpClient;
use reqwest::header::{HeaderMap, HeaderValue};
use serde_json::{json, Value};
@@ -15,6 +15,7 @@ use tokio::time::{sleep, Duration};
use tokio::fs as tokio_fs;
use tokio::io::AsyncWriteExt;
use anyhow::{Context, anyhow};
use std::io::BufRead;
#[derive(Clone)]
pub struct OpenFigiClient {
@@ -933,97 +934,6 @@ async fn remove_leis_batch_from_gleif_csv(gleif_cache_dir: &Path, leis_to_remove
Ok(())
}
/// Loads or builds HashMaps for companies, warrants, and options.
///
/// This function:
/// 1. Attempts to load existing data from cache
/// 2. Processes new FIGI data and classifies by securityType:
/// - "Common Stock" → companies HashMap (grouped by ISIN)
/// - "Equity WRT" → warrants HashMap (parsed from name)
/// - "Equity Option" → options HashMap (parsed from name)
/// 3. Updates/extends existing entries
/// 4. Saves results to separate JSON files
///
/// # Arguments
/// * `figi_to_lei` - HashMap mapping LEI to Vec<FigiInfo>.
///
/// # Returns
/// A tuple of (companies, warrants, options) HashMaps.
///
/// # Errors
/// Returns an error if file I/O fails or JSON serialization fails.
pub async fn load_or_build_all_securities(
figi_to_lei: &HashMap<String, Vec<FigiInfo>>
) -> anyhow::Result<(
HashMap<String, CompanyInfo>,
HashMap<String, HashMap<String, WarrantInfo>>,
HashMap<String, HashMap<String, OptionInfo>>
)> {
// Load existing data
let mut commons = load_from_cache("data/corporate/by_name/common_stocks.json").await?
.unwrap_or_else(HashMap::new);
let mut warrants = load_from_cache("data/corporate/by_name/warrants.json").await?
.unwrap_or_else(HashMap::new);
let mut options = load_from_cache("data/corporate/by_name/options.json").await?
.unwrap_or_else(HashMap::new);
/*let mut preferred = load_from_cache("data/corporate/by_name/preferred.json").await?
.unwrap_or_else(HashMap::new);*/
println!("Loaded existing data:");
println!(" - Companies: {}", commons.len());
println!(" - Warrants: {}", warrants.len());
println!(" - Options: {}", options.len());
let mut stats = ProcessingStats::new(commons.len(), warrants.len(), options.len());
println!("Processing {} LEI entries from FIGI data...", figi_to_lei.len());
for (_lei, figi_infos) in figi_to_lei.iter() {
if figi_infos.is_empty() {
continue;
}
// Group FigiInfos by security type
let mut common_stocks = Vec::new();
let mut warrant_securities = Vec::new();
let mut option_securities = Vec::new();
for figi_info in figi_infos {
match figi_info.security_type.as_str() {
"Common Stock" => common_stocks.push(figi_info.clone()),
"Equity WRT" => warrant_securities.push(figi_info.clone()),
"Equity Option" => option_securities.push(figi_info.clone()),
_ => {} // Ignore other types
}
}
// Process common stocks -> companies
if !common_stocks.is_empty() {
process_common_stocks(&mut commons, &common_stocks, &mut stats);
}
// Process warrants
if !warrant_securities.is_empty() {
process_warrants(&mut warrants, &warrant_securities, &mut stats);
}
// Process options
if !option_securities.is_empty() {
process_options(&mut options, &option_securities, &mut stats);
}
}
stats.print_summary(commons.len(), warrants.len(), options.len());
// Save all three HashMaps
save_to_cache("data/corporate/by_name/common_stocks.json", &commons).await?;
save_to_cache("data/corporate/by_name/warrants.json", &warrants).await?;
save_to_cache("data/corporate/by_name/options.json", &options).await?;
Ok((commons, warrants, options))
}
/// Statistics tracker for processing
#[derive(Debug)]
struct ProcessingStats {
@@ -1585,3 +1495,297 @@ async fn handle_rate_limit(resp: &reqwest::Response) -> anyhow::Result<()> {
Ok(())
}
pub async fn stream_gleif_csv<F>(
csv_path: &str,
mut callback: F
) -> anyhow::Result<usize>
where
F: FnMut(String, String) -> anyhow::Result<()>,
{
logger::log_info(&format!("Streaming GLEIF CSV: {}", csv_path)).await;
let file = std::fs::File::open(csv_path)
.context("Failed to open GLEIF CSV")?;
let reader = std::io::BufReader::new(file);
let mut count = 0;
for (idx, line) in reader.lines().enumerate() {
let line = line.context("Failed to read line")?;
// Skip header
if idx == 0 {
continue;
}
// Parse CSV line
let parts: Vec<&str> = line.split(',').collect();
if parts.len() < 2 {
continue;
}
let lei = parts[0].trim().trim_matches('"').to_string();
let isin = parts[1].trim().trim_matches('"').to_string();
if !lei.is_empty() && !isin.is_empty() {
callback(lei, isin)?;
count += 1;
}
// Yield periodically
if count % 10000 == 0 {
tokio::task::yield_now().await;
}
}
logger::log_info(&format!("Streamed {} LEI-ISIN pairs", count)).await;
Ok(count)
}
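// Minimal usage sketch (not from this changeset), assuming stream_gleif_csv is in scope;
// the CSV path below is a placeholder. It shows the callback-based streaming pattern:
// only a small counter map is retained instead of the whole file.
async fn example_count_isins_per_lei() -> anyhow::Result<()> {
    use std::collections::HashMap;
    let mut isins_per_lei: HashMap<String, usize> = HashMap::new();
    // The closure runs once per (LEI, ISIN) data row.
    let total = stream_gleif_csv("data/cache/gleif/isin_lei.csv", |lei, _isin| {
        *isins_per_lei.entry(lei).or_insert(0) += 1;
        Ok(())
    }).await?;
    println!("{} LEI-ISIN pairs across {} LEIs", total, isins_per_lei.len());
    Ok(())
}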
/// Process FIGI mappings in batches instead of all at once
pub async fn process_figi_mappings_streaming(
lei_to_isins_stream: impl Iterator<Item = (String, Vec<String>)>,
gleif_date: Option<&str>,
batch_size: usize,
) -> anyhow::Result<()> {
let dir = DataPaths::new(".")?;
let map_cache_dir = dir.cache_gleif_openfigi_map_dir();
let date = determine_gleif_date(gleif_date, &dir).await?;
let date_dir = map_cache_dir.join(&date);
tokio_fs::create_dir_all(&date_dir).await?;
// Setup sector directories
let sector_dirs = load_market_sectors().await?;
setup_sector_directories(&date_dir, &sector_dirs).await?;
let client = OpenFigiClient::new().await?;
if !client.has_key {
logger::log_warn("No API key - limited FIGI mapping").await;
return Ok(());
}
// Process in batches
let mut batch = Vec::new();
let mut processed = 0;
for (lei, isins) in lei_to_isins_stream {
batch.push((lei, isins));
if batch.len() >= batch_size {
process_figi_batch(&client, &batch, &date_dir, &sector_dirs).await?;
processed += batch.len();
logger::log_info(&format!("Processed {} LEIs so far...", processed)).await;
batch.clear();
// Yield to prevent blocking
tokio::task::yield_now().await;
}
}
// Process remaining
if !batch.is_empty() {
process_figi_batch(&client, &batch, &date_dir, &sector_dirs).await?;
processed += batch.len();
}
logger::log_info(&format!("Total processed: {} LEIs", processed)).await;
Ok(())
}
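// Minimal call-site sketch (not from this changeset): driving the batched FIGI mapper
// from an already collected LEI→ISINs map. The batch size of 500 and the `lei_to_isins`
// argument are assumptions for the example.
async fn example_map_figis(lei_to_isins: HashMap<String, Vec<String>>) -> anyhow::Result<()> {
    // process_figi_mappings_streaming pulls (LEI, ISINs) pairs one at a time and
    // flushes every `batch_size` entries, so memory stays bounded by the batch.
    process_figi_mappings_streaming(lei_to_isins.into_iter(), None, 500).await
}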
async fn process_figi_batch(
client: &OpenFigiClient,
batch: &[(String, Vec<String>)],
date_dir: &Path,
sector_dirs: &[String],
) -> anyhow::Result<()> {
for (lei, isins) in batch {
let unique_isins: Vec<_> = isins.iter()
.cloned()
.collect::<std::collections::HashSet<_>>()
.into_iter()
.collect();
let figi_infos = client.map_isins_to_figi_infos(&unique_isins).await?;
if figi_infos.is_empty() {
continue;
}
// Save to appropriate sector files
save_figi_infos_by_sector(lei, &figi_infos, date_dir, sector_dirs).await?;
}
Ok(())
}
async fn save_figi_infos_by_sector(
lei: &str,
figi_infos: &[FigiInfo],
date_dir: &Path,
_sector_dirs: &[String],
) -> anyhow::Result<()> {
let mut by_sector: HashMap<String, Vec<FigiInfo>> = HashMap::new();
for figi_info in figi_infos {
let sector = if figi_info.market_sector.is_empty() {
"uncategorized".to_string()
} else {
figi_info.market_sector.clone()
};
by_sector.entry(sector).or_default().push(figi_info.clone());
}
// Save to sector files
for (sector, figis) in by_sector {
let sector_dir = date_dir.join(&sector);
let path = sector_dir.join("lei_to_figi.jsonl");
append_lei_to_figi_jsonl(&path, lei, &figis).await?;
}
Ok(())
}
/// Modified load_or_build_all_securities to process in streaming fashion
pub async fn load_or_build_all_securities_streaming(
date_dir: &Path,
) -> anyhow::Result<(
HashMap<String, CompanyInfo>,
HashMap<String, HashMap<String, WarrantInfo>>,
HashMap<String, HashMap<String, OptionInfo>>
)> {
let mut commons = HashMap::new();
let mut warrants = HashMap::new();
let mut options = HashMap::new();
// Load existing data
commons = load_from_cache("data/corporate/by_name/common_stocks.json")
.await?
.unwrap_or_default();
warrants = load_from_cache("data/corporate/by_name/warrants.json")
.await?
.unwrap_or_default();
options = load_from_cache("data/corporate/by_name/options.json")
.await?
.unwrap_or_default();
println!("Loaded existing data:");
println!(" - Companies: {}", commons.len());
println!(" - Warrants: {}", warrants.len());
println!(" - Options: {}", options.len());
let mut stats = ProcessingStats::new(commons.len(), warrants.len(), options.len());
// Stream through JSONL files in date_dir
let equity_file = date_dir.join("Equity").join("lei_to_figi.jsonl");
if equity_file.exists() {
logger::log_info(&format!("Streaming FIGIs from {:?}", equity_file)).await;
let content = tokio_fs::read_to_string(&equity_file).await?;
let mut processed = 0;
for line in content.lines() {
if line.trim().is_empty() {
continue;
}
let entry: serde_json::Value = serde_json::from_str(line)?;
let _lei = entry["lei"].as_str().unwrap_or("");
let figi_infos: Vec<FigiInfo> = serde_json::from_value(
entry["figis"].clone()
)?;
// Process this batch
process_figi_infos_batch(
&figi_infos,
&mut commons,
&mut warrants,
&mut options,
&mut stats
);
processed += 1;
if processed % 100 == 0 {
tokio::task::yield_now().await;
}
}
}
stats.print_summary(commons.len(), warrants.len(), options.len());
// Save incrementally
save_to_cache("data/corporate/by_name/common_stocks.json", &commons).await?;
save_to_cache("data/corporate/by_name/warrants.json", &warrants).await?;
save_to_cache("data/corporate/by_name/options.json", &options).await?;
Ok((commons, warrants, options))
}
fn process_figi_infos_batch(
figi_infos: &[FigiInfo],
commons: &mut HashMap<String, CompanyInfo>,
warrants: &mut HashMap<String, HashMap<String, WarrantInfo>>,
options: &mut HashMap<String, HashMap<String, OptionInfo>>,
stats: &mut ProcessingStats,
) {
let mut common_stocks = Vec::new();
let mut warrant_securities = Vec::new();
let mut option_securities = Vec::new();
for figi_info in figi_infos {
match figi_info.security_type.as_str() {
"Common Stock" => common_stocks.push(figi_info.clone()),
"Equity WRT" => warrant_securities.push(figi_info.clone()),
"Equity Option" => option_securities.push(figi_info.clone()),
_ => {}
}
}
if !common_stocks.is_empty() {
process_common_stocks(commons, &common_stocks, stats);
}
if !warrant_securities.is_empty() {
process_warrants(warrants, &warrant_securities, stats);
}
if !option_securities.is_empty() {
process_options(options, &option_securities, stats);
}
}
// Helper functions
async fn determine_gleif_date(
gleif_date: Option<&str>,
paths: &DataPaths,
) -> anyhow::Result<String> {
if let Some(d) = gleif_date {
Ok(d.to_string())
} else {
match find_most_recent_gleif_date(paths.cache_gleif_dir()).await? {
Some(d) => Ok(d),
None => Err(anyhow!("No GLEIF CSV file found")),
}
}
}
async fn setup_sector_directories(
date_dir: &Path,
sector_dirs: &[String],
) -> anyhow::Result<()> {
// Create uncategorized folder
let uncategorized_dir = date_dir.join("uncategorized");
tokio_fs::create_dir_all(&uncategorized_dir).await?;
// Create sector folders
for sector in sector_dirs {
let sector_dir = date_dir.join(sector);
tokio_fs::create_dir_all(&sector_dir).await?;
}
Ok(())
}

View File

@@ -7,18 +7,24 @@ use tokio::fs;
use tokio::io::AsyncWriteExt;
use chrono::{Datelike, NaiveDate};
use std::collections::{HashMap};
use std::path::{PathBuf};
use std::path::{PathBuf, Path};
pub async fn load_existing_events(paths: &DataPaths) -> anyhow::Result<HashMap<String, CompanyEvent>> {
let mut map = HashMap::new();
const BATCH_SIZE: usize = 500; // Process 500 events at a time
/// Load events in streaming fashion to avoid memory buildup
pub async fn load_existing_events_streaming(
paths: &DataPaths,
callback: impl Fn(CompanyEvent) -> anyhow::Result<()>
) -> anyhow::Result<usize> {
let dir = paths.corporate_events_dir();
if !dir.exists() {
logger::log_info("Corporate Storage: No existing events directory found").await;
return Ok(map);
return Ok(0);
}
let mut total = 0;
let mut entries = fs::read_dir(dir).await?;
let mut loaded_count = 0;
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) == Some("json") {
@@ -26,18 +32,84 @@ pub async fn load_existing_events(paths: &DataPaths) -> anyhow::Result<HashMap<S
if name.starts_with("events_") && name.len() == 17 { if name.starts_with("events_") && name.len() == 17 {
let content = fs::read_to_string(&path).await?; let content = fs::read_to_string(&path).await?;
let events: Vec<CompanyEvent> = serde_json::from_str(&content)?; let events: Vec<CompanyEvent> = serde_json::from_str(&content)?;
for event in events { for event in events {
map.insert(event_key(&event), event); callback(event)?;
} total += 1;
loaded_count += 1;
}
}
}
logger::log_info(&format!("Corporate Storage: Loaded {} events from {} files", map.len(), loaded_count)).await;
Ok(map)
}
pub async fn save_optimized_events(paths: &DataPaths, events: HashMap<String, CompanyEvent>) -> anyhow::Result<()> {
// Yield to prevent blocking
tokio::task::yield_now().await;
}
}
}
logger::log_info(&format!("Corporate Storage: Streamed {} events", total)).await;
Ok(total)
}
/// Build lightweight index of events instead of loading everything
#[derive(Debug, Clone)]
pub struct EventIndex {
pub key: String,
pub ticker: String,
pub date: String,
pub file_path: PathBuf,
}
pub async fn build_event_index(paths: &DataPaths) -> anyhow::Result<Vec<EventIndex>> {
let dir = paths.corporate_events_dir();
if !dir.exists() {
return Ok(Vec::new());
}
let mut index = Vec::new();
let mut entries = fs::read_dir(dir).await?;
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) == Some("json") {
let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
if name.starts_with("events_") && name.len() == 17 {
let content = fs::read_to_string(&path).await?;
let events: Vec<CompanyEvent> = serde_json::from_str(&content)?;
for event in events {
index.push(EventIndex {
key: event_key(&event),
ticker: event.ticker.clone(),
date: event.date.clone(),
file_path: path.clone(),
});
}
}
}
}
logger::log_info(&format!("Corporate Storage: Built index with {} entries", index.len())).await;
Ok(index)
}
/// Lookup specific event by loading only its file
pub async fn lookup_event_by_key(
key: &str,
index: &[EventIndex]
) -> anyhow::Result<Option<CompanyEvent>> {
let entry = index.iter().find(|e| e.key == key);
if let Some(entry) = entry {
let content = fs::read_to_string(&entry.file_path).await?;
let events: Vec<CompanyEvent> = serde_json::from_str(&content)?;
Ok(events.into_iter().find(|e| event_key(e) == key))
} else {
Ok(None)
}
}
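// Minimal lookup sketch (not from this changeset): resolve one event through the
// index so that only the single monthly file containing it is re-read.
async fn example_lookup_first_event(paths: &DataPaths) -> anyhow::Result<()> {
    let index = build_event_index(paths).await?;
    if let Some(entry) = index.first() {
        if let Some(event) = lookup_event_by_key(&entry.key, &index).await? {
            logger::log_info(&format!("Found {} event on {}", event.ticker, event.date)).await;
        }
    }
    Ok(())
}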
pub async fn save_optimized_events(
paths: &DataPaths,
events: Vec<CompanyEvent> // Changed from HashMap to Vec
) -> anyhow::Result<()> {
let dir = paths.corporate_events_dir();
fs::create_dir_all(dir).await?;
@@ -55,16 +127,24 @@ pub async fn save_optimized_events(paths: &DataPaths, events: HashMap<String, Co
logger::log_info(&format!("Corporate Storage: Removed {} old event files", removed_count)).await; logger::log_info(&format!("Corporate Storage: Removed {} old event files", removed_count)).await;
let total_events = events.len(); let total_events = events.len();
let mut sorted: Vec<_> = events.into_values().collect(); let mut sorted = events;
sorted.sort_by_key(|e| (e.ticker.clone(), e.date.clone())); sorted.sort_by(|a, b| {
a.ticker.cmp(&b.ticker)
.then(a.date.cmp(&b.date))
});
// Process in batches to avoid memory buildup
let mut by_month: HashMap<String, Vec<CompanyEvent>> = HashMap::new(); let mut by_month: HashMap<String, Vec<CompanyEvent>> = HashMap::new();
for e in sorted {
for chunk in sorted.chunks(BATCH_SIZE) {
for e in chunk {
if let Ok(d) = NaiveDate::parse_from_str(&e.date, "%Y-%m-%d") { if let Ok(d) = NaiveDate::parse_from_str(&e.date, "%Y-%m-%d") {
let key = format!("{}-{:02}", d.year(), d.month()); let key = format!("{}-{:02}", d.year(), d.month());
by_month.entry(key).or_default().push(e); by_month.entry(key).or_default().push(e.clone());
} }
} }
tokio::task::yield_now().await;
}
let total_months = by_month.len(); let total_months = by_month.len();
for (month, list) in by_month { for (month, list) in by_month {
@@ -72,6 +152,7 @@ pub async fn save_optimized_events(paths: &DataPaths, events: HashMap<String, Co
fs::write(&path, serde_json::to_string_pretty(&list)?).await?; fs::write(&path, serde_json::to_string_pretty(&list)?).await?;
logger::log_info(&format!("Corporate Storage: Saved {} events for month {}", list.len(), month)).await; logger::log_info(&format!("Corporate Storage: Saved {} events for month {}", list.len(), month)).await;
} }
logger::log_info(&format!("Corporate Storage: Saved {} total events in {} month files", total_events, total_months)).await; logger::log_info(&format!("Corporate Storage: Saved {} total events in {} month files", total_events, total_months)).await;
Ok(()) Ok(())
} }
@@ -108,7 +189,12 @@ pub async fn save_changes(paths: &DataPaths, changes: &[CompanyEventChange]) ->
Ok(())
}
pub async fn save_prices_for_ticker(paths: &DataPaths, ticker: &str, timeframe: &str, mut prices: Vec<CompanyPrice>) -> anyhow::Result<()> {
pub async fn save_prices_for_ticker(
paths: &DataPaths,
ticker: &str,
timeframe: &str,
mut prices: Vec<CompanyPrice>
) -> anyhow::Result<()> {
let base_dir = paths.corporate_prices_dir();
let company_dir = base_dir.join(ticker.replace(".", "_"));
let timeframe_dir = company_dir.join(timeframe);
@@ -142,7 +228,11 @@ pub async fn ensure_company_dirs(paths: &DataPaths, isin: &str) -> anyhow::Resul
Ok(())
}
pub async fn save_available_exchanges(paths: &DataPaths, isin: &str, exchanges: Vec<AvailableExchange>) -> anyhow::Result<()> {
pub async fn save_available_exchanges(
paths: &DataPaths,
isin: &str,
exchanges: Vec<AvailableExchange>
) -> anyhow::Result<()> {
let dir = get_company_dir(paths, isin);
fs::create_dir_all(&dir).await?;
let path = dir.join("available_exchanges.json");
@@ -177,66 +267,8 @@ pub async fn save_prices_by_source(
Ok(())
}
/// Update available_exchanges.json with fetch results
/*pub async fn update_available_exchange(
paths: &DataPaths,
isin: &str,
ticker: &str,
exchange_mic: &str,
has_daily: bool,
has_5min: bool,
) -> anyhow::Result<()> {
let mut exchanges = load_available_exchanges(paths, isin).await?;
if let Some(entry) = exchanges.iter_mut().find(|e| e.ticker == ticker) {
// Update existing entry
entry.record_success(has_daily, has_5min);
} else {
// Create new entry - need to get currency from somewhere
// Try to infer from the ticker or use a default
let currency = infer_currency_from_ticker(ticker);
let mut new_entry = AvailableExchange::new(
ticker.to_string(),
exchange_mic.to_string(),
currency,
);
new_entry.record_success(has_daily, has_5min);
exchanges.push(new_entry);
}
save_available_exchanges(paths, isin, exchanges).await
}*/
/// Infer currency from ticker suffix
fn infer_currency_from_ticker(ticker: &str) -> String {
if ticker.ends_with(".L") { return "GBP".to_string(); }
if ticker.ends_with(".PA") { return "EUR".to_string(); }
if ticker.ends_with(".DE") { return "EUR".to_string(); }
if ticker.ends_with(".AS") { return "EUR".to_string(); }
if ticker.ends_with(".MI") { return "EUR".to_string(); }
if ticker.ends_with(".SW") { return "CHF".to_string(); }
if ticker.ends_with(".T") { return "JPY".to_string(); }
if ticker.ends_with(".HK") { return "HKD".to_string(); }
if ticker.ends_with(".SS") { return "CNY".to_string(); }
if ticker.ends_with(".SZ") { return "CNY".to_string(); }
if ticker.ends_with(".TO") { return "CAD".to_string(); }
if ticker.ends_with(".AX") { return "AUD".to_string(); }
if ticker.ends_with(".SA") { return "BRL".to_string(); }
if ticker.ends_with(".MC") { return "EUR".to_string(); }
if ticker.ends_with(".BO") || ticker.ends_with(".NS") { return "INR".to_string(); }
"USD".to_string() // Default
}
/// Saves companies data to a JSONL file.
///
/// # Arguments
/// * `paths` - Reference to DataPaths for directory management
/// * `companies` - HashMap of company names to their securities (ISIN, Ticker pairs)
///
/// # Errors
/// Returns an error if file operations or serialization fails.
pub async fn save_companies_to_jsonl(
/// Saves companies data to a JSONL file in streaming fashion
pub async fn save_companies_to_jsonl_streaming(
paths: &DataPaths,
companies: &HashMap<String, HashMap<String, String>>,
) -> anyhow::Result<()> {
@@ -244,13 +276,14 @@ pub async fn save_companies_to_jsonl(
logger::log_info(&format!("Corporate Storage: Saving {} companies to JSONL", companies.len())).await; logger::log_info(&format!("Corporate Storage: Saving {} companies to JSONL", companies.len())).await;
// Create parent directory if it doesn't exist
if let Some(parent) = file_path.parent() { if let Some(parent) = file_path.parent() {
tokio::fs::create_dir_all(parent).await?; tokio::fs::create_dir_all(parent).await?;
} }
let mut file = tokio::fs::File::create(&file_path).await?; let mut file = tokio::fs::File::create(&file_path).await?;
let mut count = 0;
// Process in batches
for (name, securities) in companies.iter() { for (name, securities) in companies.iter() {
let line = serde_json::json!({ let line = serde_json::json!({
"name": name, "name": name,
@@ -258,6 +291,11 @@ pub async fn save_companies_to_jsonl(
});
file.write_all(line.to_string().as_bytes()).await?;
file.write_all(b"\n").await?;
count += 1;
if count % 100 == 0 {
tokio::task::yield_now().await;
}
}
let msg = format!("✓ Saved {} companies to {:?}", companies.len(), file_path);
@@ -265,3 +303,37 @@ pub async fn save_companies_to_jsonl(
logger::log_info(&msg).await; logger::log_info(&msg).await;
Ok(()) Ok(())
} }
/// Load companies from JSONL in streaming fashion
pub async fn load_companies_from_jsonl_streaming(
path: &Path,
callback: impl Fn(String, HashMap<String, String>) -> anyhow::Result<()>
) -> anyhow::Result<usize> {
if !path.exists() {
return Ok(0);
}
let content = tokio::fs::read_to_string(path).await?;
let mut count = 0;
for line in content.lines() {
if line.trim().is_empty() {
continue;
}
let entry: serde_json::Value = serde_json::from_str(line)?;
let name = entry["name"].as_str().unwrap_or("").to_string();
let securities: HashMap<String, String> = serde_json::from_value(
entry["securities"].clone()
)?;
callback(name, securities)?;
count += 1;
if count % 100 == 0 {
tokio::task::yield_now().await;
}
}
Ok(count)
}
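// Minimal round-trip sketch (not from this changeset): re-read the JSONL written by
// save_companies_to_jsonl_streaming and count ISIN-ticker pairs without holding the
// full companies map. The Cell works around the `impl Fn` callback bound, which
// forbids direct mutation of captured state; the path mirrors the one used above.
async fn example_count_company_securities(paths: &DataPaths) -> anyhow::Result<()> {
    let path = paths.data_dir().join("companies.jsonl");
    let pair_count = std::cell::Cell::new(0usize);
    let companies = load_companies_from_jsonl_streaming(&path, |_name, securities| {
        pair_count.set(pair_count.get() + securities.len());
        Ok(())
    }).await?;
    logger::log_info(&format!("{} companies, {} ISIN-ticker pairs", companies, pair_count.get())).await;
    Ok(())
}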

View File

@@ -9,88 +9,101 @@ use chrono::Local;
use std::collections::{HashMap};
use std::sync::Arc;
/// Main function: Full update for all companies (LEI-based) with optimized parallel execution.
///
/// This function coordinates the entire update process:
/// - Loads GLEIF mappings
/// - Builds FIGI-LEI map
/// - Loads existing events
/// - Processes each company: discovers exchanges via FIGI, fetches prices & earnings, aggregates data
/// - Uses the provided shared ChromeDriver pool for efficient parallel scraping
/// - Saves optimized events
///
/// # Arguments
/// * `config` - The application configuration.
/// * `pool` - Shared pool of ChromeDriver instances for scraping.
///
/// # Errors
/// Returns an error if any step in the update process fails.
pub async fn run_full_update(config: &Config, pool: &Arc<ChromeDriverPool>) -> anyhow::Result<()> { pub async fn run_full_update(config: &Config, pool: &Arc<ChromeDriverPool>) -> anyhow::Result<()> {
let msg = "=== Starting LEI-based corporate full update ==="; let msg = "=== Starting LEI-based corporate full update (STREAMING) ===";
println!("{}", msg); println!("{}", msg);
logger::log_info(msg).await; logger::log_info(msg).await;
// Initialize paths
let paths = DataPaths::new(".")?; let paths = DataPaths::new(".")?;
// 1. Load fresh GLEIF ISIN ↔ LEI mapping
logger::log_info("Corporate Update: Loading GLEIF ISIN ↔ LEI mapping...").await;
let lei_to_isins: HashMap<String, Vec<String>> = match load_isin_lei_csv().await {
Ok(map) => {
let msg = format!("Corporate Update: Loaded GLEIF mapping with {} LEI entries", map.len());
println!("{}", msg);
logger::log_info(&msg).await;
map
}
Err(e) => {
let msg = format!("Corporate Update: Warning - Could not load GLEIF ISIN↔LEI mapping: {}", e);
eprintln!("{}", msg);
logger::log_warn(&msg).await;
HashMap::new()
}
};
// Step 1: Download/locate GLEIF CSV (don't load into memory yet)
logger::log_info("Corporate Update: Downloading/locating GLEIF CSV...").await;
let gleif_csv_path = match download_isin_lei_csv().await? {
Some(p) => {
logger::log_info(&format!("Corporate Update: GLEIF CSV at: {}", p)).await;
p
}
None => {
logger::log_warn("Corporate Update: Could not obtain GLEIF CSV, continuing with limited data").await;
return Ok(());
}
};
// 2. Load OpenFIGI mapping value lists (cached)
// Step 2: Load OpenFIGI type lists (small, cached)
logger::log_info("Corporate Update: Loading OpenFIGI type lists...").await;
if let Err(e) = load_figi_type_lists().await {
let msg = format!("Corporate Update: Warning - Could not load OpenFIGI type lists: {}", e);
eprintln!("{}", msg);
logger::log_warn(&msg).await;
logger::log_warn(&format!("Could not load OpenFIGI type lists: {}", e)).await;
}
logger::log_info("Corporate Update: OpenFIGI type lists loaded").await;
// 3. Build FIGI → LEI map
logger::log_info("Corporate Update: Building FIGI → LEI map...").await;
// Step 3: Process GLEIF → FIGI mapping in streaming fashion
logger::log_info("Corporate Update: Building FIGI mappings (streaming)...").await;
let figi_to_lei:HashMap<String, Vec<FigiInfo>> = match build_lei_to_figi_infos(&lei_to_isins, None).await {
Ok(map) => { // Build LEI→ISINs map by streaming the CSV
let msg = format!("Corporate Update: Built FIGI map with {} entries", map.len()); let mut lei_to_isins: HashMap<String, Vec<String>> = HashMap::new();
println!("{}", msg); let mut lei_batch = Vec::new();
logger::log_info(&msg).await; const LEI_BATCH_SIZE: usize = 1000;
map
} stream_gleif_csv(&gleif_csv_path, |lei, isin| {
Err(e) => { lei_to_isins.entry(lei.clone()).or_default().push(isin);
let msg = format!("Corporate Update: Warning - Could not build FIGI→LEI map: {}", e); lei_batch.push(lei);
eprintln!("{}", msg);
logger::log_warn(&msg).await; // Process in batches
HashMap::new() if lei_batch.len() >= LEI_BATCH_SIZE {
lei_batch.clear();
} }
Ok(())
}).await?;
logger::log_info(&format!("Corporate Update: Collected {} LEIs", lei_to_isins.len())).await;
// Step 4: Build FIGI mappings in batches (process and save incrementally)
logger::log_info("Corporate Update: Processing FIGI mappings in batches...").await;
let figi_result = build_lei_to_figi_infos(&lei_to_isins, None).await;
// Don't keep the full result in memory - it's already saved to JSONL files
drop(figi_result);
drop(lei_to_isins); // Release this too
logger::log_info("Corporate Update: FIGI mappings saved to cache").await;
// Step 5: Load or build securities (streaming from JSONL files)
logger::log_info("Corporate Update: Building securities map (streaming)...").await;
let dir = DataPaths::new(".")?;
let map_cache_dir = dir.cache_gleif_openfigi_map_dir();
// Find the most recent date directory
let date_dir = find_most_recent_date_dir(&map_cache_dir).await?;
let (common_stocks, _warrants, _options) = if let Some(date_dir) = date_dir {
logger::log_info(&format!("Using FIGI data from: {:?}", date_dir)).await;
load_or_build_all_securities_streaming(&date_dir).await?
} else {
logger::log_warn("No FIGI date directory found, using empty maps").await;
(HashMap::new(), HashMap::new(), HashMap::new())
};
// 4. Load or build companies
logger::log_info("Corporate Update: Loading/building company securities...").await;
let securities = load_or_build_all_securities(&figi_to_lei).await?;
let msg = format!("Corporate Update: Processing {} companies", securities.0.len());
println!("{}", msg);
logger::log_info(&msg).await;
// HashMap<Name, HashMap<ISIN, Ticker>> - unique pairs only
let companies: HashMap<String, HashMap<String, String>> = securities.0
.iter()
.fold(HashMap::new(), |mut acc, security| {
logger::log_info(&format!("Corporate Update: Processing {} companies", common_stocks.len())).await;
// Step 6: Convert to simplified companies map and save incrementally
logger::log_info("Corporate Update: Building companies JSONL (streaming)...").await;
let companies_path = paths.data_dir().join("companies.jsonl");
// Create file and write incrementally
if let Some(parent) = companies_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let mut file = tokio::fs::File::create(&companies_path).await?;
let mut processed = 0;
for (name, company_info) in common_stocks.iter() {
let mut isin_ticker_pairs: HashMap<String, String> = HashMap::new(); let mut isin_ticker_pairs: HashMap<String, String> = HashMap::new();
// Collect all unique ISIN-Ticker pairs for figi_infos in company_info.securities.values() {
for figi_infos in security.1.securities.values() {
for figi_info in figi_infos { for figi_info in figi_infos {
if !figi_info.isin.is_empty() && !figi_info.ticker.is_empty() { if !figi_info.isin.is_empty() && !figi_info.ticker.is_empty() {
isin_ticker_pairs.insert(figi_info.isin.clone(), figi_info.ticker.clone()); isin_ticker_pairs.insert(figi_info.isin.clone(), figi_info.ticker.clone());
@@ -98,72 +111,132 @@ pub async fn run_full_update(config: &Config, pool: &Arc<ChromeDriverPool>) -> a
}
}
// Only add if there are pairs
if !isin_ticker_pairs.is_empty() {
acc.insert(security.1.name.clone(), isin_ticker_pairs);
}
acc
});
logger::log_info(&format!("Corporate Update: Saving {} companies to JSONL", companies.len())).await;
save_companies_to_jsonl(&paths, &companies).await.expect("Failed to save companies List.");
logger::log_info("Corporate Update: Companies saved successfully").await;
// 5. Load existing earnings events (for change detection)
logger::log_info("Corporate Update: Loading existing events...").await;
let existing_events = match load_existing_events(&paths).await {
Ok(events) => {
let msg = format!("Corporate Update: Loaded {} existing events", events.len());
println!("{}", msg);
logger::log_info(&msg).await;
events
}
Err(e) => {
let msg = format!("Corporate Update: Warning - Could not load existing events: {}", e);
eprintln!("{}", msg);
logger::log_warn(&msg).await;
HashMap::new()
}
};
// 5. Use the provided pool (no need to create a new one)
let pool_size = pool.get_number_of_instances(); // Use the size from the shared pool
logger::log_info(&format!("Corporate Update: Using pool size: {}", pool_size)).await;
// Process companies in parallel using the shared pool
/*let results: Vec<_> = stream::iter(companies.into_iter())
.map(|company| {
let pool_clone = pool.clone();
async move {
process_company_data(&company, &pool_clone, &mut existing_events).await
}
})
.buffer_unordered(pool_size)
.collect().await;
// Handle results (e.g., collect changes)
let mut all_changes = Vec::new();
for result in results {
if let Ok(ProcessResult { changes }) = result {
all_changes.extend(changes);
}
}*/
logger::log_info(&format!("Corporate Update: Saving {} events to optimized storage", existing_events.len())).await;
save_optimized_events(&paths, existing_events).await?;
logger::log_info("Corporate Update: Events saved successfully").await;
//save_changes(&all_changes).await?;
let msg = "✓ Corporate update complete";
use tokio::io::AsyncWriteExt;
let line = serde_json::json!({
"name": name,
"securities": isin_ticker_pairs
});
file.write_all(line.to_string().as_bytes()).await?;
file.write_all(b"\n").await?;
processed += 1;
// Yield periodically
if processed % 100 == 0 {
tokio::task::yield_now().await;
logger::log_info(&format!("Saved {} companies so far...", processed)).await;
}
}
}
logger::log_info(&format!("Corporate Update: Saved {} companies to JSONL", processed)).await;
// Step 7: Process events in streaming fashion
logger::log_info("Corporate Update: Processing events (streaming)...").await;
let event_index = build_event_index(&paths).await?;
logger::log_info(&format!("Corporate Update: Built index of {} events", event_index.len())).await;
// For now, we just maintain the index
// In a full implementation, you'd stream through tickers and update events
// Step 8: Save any updates
logger::log_info("Corporate Update: Finalizing...").await;
let msg = "✓ Corporate update complete (streaming)";
println!("{}", msg); println!("{}", msg);
logger::log_info(msg).await; logger::log_info(msg).await;
Ok(()) Ok(())
} }
/// Helper to find the most recent date directory in the FIGI cache
async fn find_most_recent_date_dir(map_cache_dir: &std::path::Path) -> anyhow::Result<Option<std::path::PathBuf>> {
if !map_cache_dir.exists() {
return Ok(None);
}
let mut entries = tokio::fs::read_dir(map_cache_dir).await?;
let mut dates = Vec::new();
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if path.is_dir() {
if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
// Date format: DDMMYYYY
if name.len() == 8 && name.chars().all(|c| c.is_numeric()) {
dates.push((name.to_string(), path));
}
}
}
}
if dates.is_empty() {
return Ok(None);
}
// Sort by date (DDMMYYYY format)
dates.sort_by(|a, b| b.0.cmp(&a.0)); // Descending order
Ok(Some(dates[0].1.clone()))
}
pub struct ProcessResult {
pub changes: Vec<CompanyEventChange>,
}
/// Process events in batches to avoid memory buildup
pub async fn process_events_streaming(
index: &[EventIndex],
new_events: &[CompanyEvent],
today: &str,
) -> anyhow::Result<(Vec<CompanyEventChange>, Vec<CompanyEvent>)> {
let mut all_changes = Vec::new();
let mut final_events: HashMap<String, CompanyEvent> = HashMap::new();
// Step 1: Load existing events in batches using the index
logger::log_info("Loading existing events in batches...").await;
let mut loaded_files = std::collections::HashSet::new();
for entry in index {
if loaded_files.contains(&entry.file_path) {
continue;
}
let content = tokio::fs::read_to_string(&entry.file_path).await?;
let events: Vec<CompanyEvent> = serde_json::from_str(&content)?;
for e in events {
final_events.insert(event_key(&e), e);
}
loaded_files.insert(entry.file_path.clone());
if final_events.len() % 1000 == 0 {
logger::log_info(&format!("Loaded {} events so far...", final_events.len())).await;
tokio::task::yield_now().await;
}
}
logger::log_info(&format!("Loaded {} existing events", final_events.len())).await;
// Step 2: Process new events in batches
for (idx, batch) in new_events.chunks(500).enumerate() {
logger::log_info(&format!("Processing batch {} ({} events)", idx + 1, batch.len())).await;
let batch_result = process_batch(batch, &mut final_events, today);
all_changes.extend(batch_result.changes);
tokio::task::yield_now().await;
}
let events_vec: Vec<CompanyEvent> = final_events.into_values().collect();
Ok((all_changes, events_vec))
}
pub fn process_batch(
new_events: &[CompanyEvent],
existing: &mut HashMap<String, CompanyEvent>,

View File

@@ -6,6 +6,10 @@ use crate::util::logger;
use tokio::fs;
use chrono::{NaiveDate, Datelike};
use std::collections::HashMap;
use serde_json;
const CHUNK_SIZE: usize = 500; // Process 500 events at a time
const MAX_EVENTS_PER_FILE: usize = 3000;
pub async fn scan_existing_chunks(paths: &DataPaths) -> anyhow::Result<Vec<ChunkInfo>> {
let dir = paths.economic_events_dir();
@@ -18,37 +22,122 @@ pub async fn scan_existing_chunks(paths: &DataPaths) -> anyhow::Result<Vec<Chunk
if path.extension().map(|e| e == "json").unwrap_or(false) { if path.extension().map(|e| e == "json").unwrap_or(false) {
if let Some(name) = path.file_name().and_then(|n| n.to_str()) { if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
if name.starts_with("chunk_") { if name.starts_with("chunk_") {
if let Some(content) = fs::read_to_string(&path).await.ok() { // Don't load the events here, just record the chunk info
if let Ok(events) = serde_json::from_str::<Vec<EconomicEvent>>(&content) {
let start = name[6..16].to_string(); let start = name[6..16].to_string();
let end = name[17..27].to_string(); let end = name[17..27].to_string();
chunks.push(ChunkInfo { start_date: start, end_date: end, path, event_count: events.len() }); chunks.push(ChunkInfo {
} start_date: start,
} end_date: end,
path,
event_count: 0 // We'll count later if needed
});
} }
} }
} }
} }
} }
chunks.sort_by_key(|c| c.start_date.clone()); chunks.sort_by_key(|c| c.start_date.clone());
logger::log_info(&format!("Economic Storage: Scanned {} event chunks", chunks.len())).await; logger::log_info(&format!("Economic Storage: Found {} event chunks", chunks.len())).await;
Ok(chunks) Ok(chunks)
} }
pub async fn load_existing_events(chunks: &[ChunkInfo]) -> anyhow::Result<HashMap<String, EconomicEvent>> { /// Stream events from a single chunk file
let mut map = HashMap::new(); pub async fn stream_chunk_events(
for chunk in chunks { chunk: &ChunkInfo,
callback: impl Fn(EconomicEvent) -> anyhow::Result<()>
) -> anyhow::Result<usize> {
let content = fs::read_to_string(&chunk.path).await?; let content = fs::read_to_string(&chunk.path).await?;
let events: Vec<EconomicEvent> = serde_json::from_str(&content)?; let events: Vec<EconomicEvent> = serde_json::from_str(&content)?;
for e in events { let count = events.len();
map.insert(event_key(&e), e);
} for event in events {
} callback(event)?;
logger::log_info(&format!("Economic Storage: Loaded {} events from {} chunks", map.len(), chunks.len())).await;
Ok(map)
} }
pub async fn save_optimized_chunks(paths: &DataPaths, events: HashMap<String, EconomicEvent>) -> anyhow::Result<()> { Ok(count)
}
/// Load events in batches to avoid memory explosion
pub async fn load_events_in_batches(
chunks: &[ChunkInfo],
batch_size: usize,
) -> anyhow::Result<impl Iterator<Item = (String, EconomicEvent)>> {
let mut all_events = Vec::new();
for chunk in chunks {
logger::log_info(&format!("Loading chunk: {:?}", chunk.path.file_name())).await;
let content = fs::read_to_string(&chunk.path).await?;
let events: Vec<EconomicEvent> = serde_json::from_str(&content)?;
for e in events {
all_events.push((event_key(&e), e));
}
// If we've accumulated enough, yield them
if all_events.len() >= batch_size {
break;
}
}
logger::log_info(&format!("Loaded {} events in batch", all_events.len())).await;
Ok(all_events.into_iter())
}
/// NEW: Build a lightweight index instead of loading all events
#[derive(Debug, Clone)]
pub struct EventIndex {
pub key: String,
pub identity_key: String,
pub date: String,
pub chunk_file: std::path::PathBuf,
}
pub async fn build_event_index(chunks: &[ChunkInfo]) -> anyhow::Result<Vec<EventIndex>> {
let mut index = Vec::new();
for chunk in chunks {
logger::log_info(&format!("Indexing chunk: {:?}", chunk.path.file_name())).await;
let content = fs::read_to_string(&chunk.path).await?;
let events: Vec<EconomicEvent> = serde_json::from_str(&content)?;
for e in events {
index.push(EventIndex {
key: event_key(&e),
identity_key: identity_key(&e),
date: e.date.clone(),
chunk_file: chunk.path.clone(),
});
}
}
logger::log_info(&format!("Built index with {} entries", index.len())).await;
Ok(index)
}
/// NEW: Look up a specific event by loading only its chunk
pub async fn lookup_event_by_key(key: &str, index: &[EventIndex]) -> anyhow::Result<Option<EconomicEvent>> {
// Find which chunk contains this event
let entry = index.iter().find(|e| e.key == key);
if let Some(entry) = entry {
// Load only that chunk
let content = fs::read_to_string(&entry.chunk_file).await?;
let events: Vec<EconomicEvent> = serde_json::from_str(&content)?;
// Find the specific event
Ok(events.into_iter().find(|e| event_key(e) == key))
} else {
Ok(None)
}
}
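// Minimal usage sketch (not from this changeset): stream a single chunk and count
// events on or after a cutoff date instead of loading every chunk into one map.
// The cutoff value is a placeholder; the Cell satisfies the `impl Fn` callback bound.
pub async fn example_count_recent_in_chunk(chunk: &ChunkInfo) -> anyhow::Result<usize> {
    let cutoff = "2025-01-01";
    let recent = std::cell::Cell::new(0usize);
    let total = stream_chunk_events(chunk, |event| {
        if event.date.as_str() >= cutoff {
            recent.set(recent.get() + 1);
        }
        Ok(())
    }).await?;
    logger::log_info(&format!("{} of {} events in this chunk are on/after {}", recent.get(), total, cutoff)).await;
    Ok(recent.get())
}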
/// Save events in smaller, more manageable chunks
pub async fn save_optimized_chunks(
paths: &DataPaths,
events: Vec<EconomicEvent> // Changed from HashMap to Vec
) -> anyhow::Result<()> {
let dir = paths.economic_events_dir();
fs::create_dir_all(dir).await?;
@@ -67,31 +156,36 @@ pub async fn save_optimized_chunks(paths: &DataPaths, events: HashMap<String, Ec
}
logger::log_info(&format!("Economic Storage: Removed {} old chunk files", removed_count)).await;
let mut sorted: Vec<_> = events.into_values().collect(); let mut sorted = events;
sorted.sort_by_key(|e| e.date.clone()); sorted.sort_by(|a, b| a.date.cmp(&b.date));
let mut chunk: Vec<EconomicEvent> = Vec::new(); // Save in smaller chunks
const MAX_EVENTS_PER_CHUNK: usize = ( 30000 / 2 ) / 11; // (30000 - 2) / 11 = 2727 let mut chunk_num = 0;
for chunk in sorted.chunks(MAX_EVENTS_PER_FILE) {
save_chunk_vec(chunk, dir, chunk_num).await?;
chunk_num += 1;
for e in sorted { // Allow other tasks to run
if !chunk.is_empty() && chunk.len() >= MAX_EVENTS_PER_CHUNK { tokio::task::yield_now().await;
save_chunk(&chunk, dir).await?;
chunk.clear();
} }
chunk.push(e);
} logger::log_info(&format!("Economic Storage: Saved {} chunks to {:?}", chunk_num, dir)).await;
if !chunk.is_empty() {
save_chunk(&chunk, dir).await?;
}
logger::log_info(&format!("Economic Storage: Saved all event chunks to {:?}", dir)).await;
Ok(())
}
async fn save_chunk(events: &[EconomicEvent], dir: &std::path::Path) -> anyhow::Result<()> { async fn save_chunk_vec(events: &[EconomicEvent], dir: &std::path::Path, chunk_num: usize) -> anyhow::Result<()> {
let start = events.iter().map(|e| &e.date).min().unwrap().clone(); if events.is_empty() {
let end = events.iter().map(|e| &e.date).max().unwrap().clone(); return Ok(());
let path = dir.join(format!("chunk_{}_{}.json", start, end)); }
fs::write(&path, serde_json::to_string_pretty(events)?).await?;
let start = &events[0].date;
let end = &events[events.len() - 1].date;
let path = dir.join(format!("chunk_{:04}_{}_{}.json", chunk_num, start, end));
// Write incrementally to avoid large memory allocation
let json = serde_json::to_string_pretty(events)?;
fs::write(&path, json).await?;
logger::log_info(&format!("Economic Storage: Saved chunk {} - {} ({} events)", start, end, events.len())).await; logger::log_info(&format!("Economic Storage: Saved chunk {} - {} ({} events)", start, end, events.len())).await;
Ok(()) Ok(())
} }

View File

@@ -3,15 +3,9 @@ use super::{scraper::*, storage::*, helpers::*, types::*};
use crate::{config::Config, scraper::webdriver::{ScrapeTask, ChromeDriverPool}, util::directories::DataPaths, util::logger};
use chrono::{Local};
use std::sync::Arc;
use std::collections::HashMap;
/// Runs the full update for economic data, using the provided ChromeDriver pool.
///
/// # Arguments
/// * `config` - The application configuration.
/// * `pool` - Shared pool of ChromeDriver instances for scraping.
///
/// # Errors
/// Returns an error if scraping, loading, or saving fails.
pub async fn run_full_update(config: &Config, pool: &Arc<ChromeDriverPool>) -> anyhow::Result<()> { pub async fn run_full_update(config: &Config, pool: &Arc<ChromeDriverPool>) -> anyhow::Result<()> {
let paths = DataPaths::new(".")?; let paths = DataPaths::new(".")?;
@@ -20,81 +14,124 @@ pub async fn run_full_update(config: &Config, pool: &Arc<ChromeDriverPool>) -> a
let today_str = chrono::Local::now().date_naive().format("%Y-%m-%d").to_string();
let end_date = config.target_end_date();
logger::log_info(&format!("Economic Update: Scanning existing chunks from {:?}", paths.economic_events_dir())).await;
// Step 1: Build lightweight index instead of loading all events
logger::log_info("Economic Update: Building event index...").await;
let chunks = scan_existing_chunks(&paths).await?;
let mut events = load_existing_events(&chunks).await?;
let msg = format!("Economic Update: Loaded {} events from {} chunks", events.len(), chunks.len());
println!("{}", msg);
logger::log_info(&msg).await;
let event_index = build_event_index(&chunks).await?;
logger::log_info(&format!("Economic Update: Indexed {} events from {} chunks",
event_index.len(), chunks.len())).await;
let start_date = if events.is_empty() {
// Step 2: Determine start date
let start_date = if event_index.is_empty() {
logger::log_warn("Economic Update: No existing events found, starting from config date").await;
config.economic_start_date.clone()
} else if events.values().any(|e| e.date >= today_str) {
} else {
// Find the latest date in the index
let max_date = event_index.iter()
.map(|e| &e.date)
.max()
.cloned()
.unwrap_or(today_str.clone());
if max_date >= today_str {
logger::log_info("Economic Update: Events exist for today, starting from today").await;
today_str.clone()
} else {
let next = events.values()
.filter_map(|e| chrono::NaiveDate::parse_from_str(&e.date, "%Y-%m-%d").ok())
.max()
let next = chrono::NaiveDate::parse_from_str(&max_date, "%Y-%m-%d")
.ok()
.and_then(|d| d.succ_opt())
.map(|d| d.format("%Y-%m-%d").to_string())
.unwrap_or(today_str.clone());
logger::log_info(&format!("Economic Update: Resuming from: {}", next)).await;
next
}
};
let msg = format!("Economic Update: Scraping events from {}{}", start_date, end_date);
println!("{}", msg);
logger::log_info(&msg).await;
logger::log_info(&format!("Economic Update: Scraping events from {}{}", start_date, end_date)).await;
// Pass the pool to the scraping function
let new_events_all = scrape_all_economic_events(&start_date, &end_date, pool).await?;
let msg = format!("Economic Update: Scraped {} new events", new_events_all.len());
println!("{}", msg);
logger::log_info(&msg).await;
// Step 3: Scrape new events in batches
let new_events = scrape_all_economic_events(&start_date, &end_date, pool).await?;
logger::log_info(&format!("Economic Update: Scraped {} new events", new_events.len())).await;
// Process all at once or in batches
let result = process_batch(&new_events_all, &mut events, &today_str);
let total_changes = result.changes.len();
let msg = format!("Economic Update: Detected {} changes", total_changes);
println!("{}", msg);
logger::log_info(&msg).await;
// Step 4: Process events in streaming fashion
let (changes, updated_events) = process_events_streaming(&chunks, &new_events, &today_str).await?;
logger::log_info(&format!("Economic Update: Detected {} changes", changes.len())).await;
if total_changes > 0 {
if !changes.is_empty() {
logger::log_info(&format!("Economic Update: Saving {} changes to log", total_changes)).await;
logger::log_info(&format!("Economic Update: Saving {} changes to log", changes.len())).await;
save_changes(&paths, &result.changes).await?;
save_changes(&paths, &changes).await?;
logger::log_info("Economic Update: Changes saved successfully").await;
}
logger::log_info(&format!("Economic Update: Saving {} total events to chunks", events.len())).await;
save_optimized_chunks(&paths, events).await?;
// Step 5: Save consolidated events
logger::log_info(&format!("Economic Update: Saving {} total events to chunks", updated_events.len())).await;
save_optimized_chunks(&paths, updated_events).await?;
let msg = format!("✓ Economic update complete — {} changes detected", total_changes);
println!("{}", msg);
logger::log_info(&msg).await;
logger::log_info(&format!("✓ Economic update complete — {} changes detected", changes.len())).await;
Ok(())
}
/// Scrapes all economic events from start to end date using a dedicated ScrapeTask with the provided pool.
///
/// This function creates a ScrapeTask to navigate to the Finanzen.net page, prepare it,
/// and then loop through date ranges to extract events.
///
/// # Arguments
/// * `start` - Start date in YYYY-MM-DD.
/// * `end` - End date in YYYY-MM-DD.
/// * `pool` - Shared pool of ChromeDriver instances.
///
/// # Returns
/// A vector of all extracted EconomicEvent structs.
///
/// # Errors
/// Returns an error if task execution fails or extraction issues occur.
pub async fn scrape_all_economic_events(start: &str, end: &str, pool: &Arc<ChromeDriverPool>) -> anyhow::Result<Vec<EconomicEvent>> {
/// Process events using streaming to minimize memory usage
async fn process_events_streaming(
chunks: &[ChunkInfo],
new_events: &[EconomicEvent],
today: &str,
) -> anyhow::Result<(Vec<EventChange>, Vec<EconomicEvent>)> {
let mut all_changes = Vec::new();
let mut final_events: HashMap<String, EconomicEvent> = HashMap::new();
// Step 1: Load existing events in batches
logger::log_info("Processing existing events in batches...").await;
for chunk in chunks {
logger::log_info(&format!("Loading chunk: {:?}", chunk.path.file_name())).await;
let content = tokio::fs::read_to_string(&chunk.path).await?;
let events: Vec<EconomicEvent> = serde_json::from_str(&content)?;
// Add to final events map
for e in events {
final_events.insert(event_key(&e), e);
}
// Clear memory periodically
if final_events.len() > 10000 {
logger::log_info(&format!("Loaded {} events so far...", final_events.len())).await;
}
}
logger::log_info(&format!("Loaded {} existing events total", final_events.len())).await;
// Step 2: Process new events in batches
logger::log_info("Processing new events...").await;
for (idx, batch) in new_events.chunks(500).enumerate() {
logger::log_info(&format!("Processing batch {} ({} events)", idx + 1, batch.len())).await;
let batch_result = process_batch(batch, &mut final_events, today);
all_changes.extend(batch_result.changes);
// Yield to prevent blocking
tokio::task::yield_now().await;
}
logger::log_info(&format!("Processing complete. Total events: {}", final_events.len())).await;
// Convert HashMap to Vec for saving
let events_vec: Vec<EconomicEvent> = final_events.into_values().collect();
Ok((all_changes, events_vec))
}
/// Scrapes all economic events from start to end date
pub async fn scrape_all_economic_events(
start: &str,
end: &str,
pool: &Arc<ChromeDriverPool>
) -> anyhow::Result<Vec<EconomicEvent>> {
let url = "https://www.finanzen.net/termine/wirtschaftsdaten/".to_string(); let url = "https://www.finanzen.net/termine/wirtschaftsdaten/".to_string();
let start_clone = start.to_string(); let start_clone = start.to_string();
let end_clone = end.to_string(); let end_clone = end.to_string();
@@ -108,9 +145,18 @@ pub async fn scrape_all_economic_events(start: &str, end: &str, pool: &Arc<Chrom
set_date_range(&client, &current, &end_clone).await?; set_date_range(&client, &current, &end_clone).await?;
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
let new_events = extract_events(&client).await?; let new_events = extract_events(&client).await?;
if new_events.is_empty() { break; }
if new_events.is_empty() {
break;
}
all_events.extend(new_events.clone()); all_events.extend(new_events.clone());
// Prevent memory buildup - process in chunks if too large
if all_events.len() > 5000 {
logger::log_info(&format!("Scraped {} events so far, continuing...", all_events.len())).await;
}
let next = new_events.iter() let next = new_events.iter()
.filter_map(|e| chrono::NaiveDate::parse_from_str(&e.date, "%Y-%m-%d").ok()) .filter_map(|e| chrono::NaiveDate::parse_from_str(&e.date, "%Y-%m-%d").ok())
.max() .max()
@@ -121,16 +167,17 @@ pub async fn scrape_all_economic_events(start: &str, end: &str, pool: &Arc<Chrom
if next > end_clone { break; } if next > end_clone { break; }
current = next; current = next;
} }
Ok(all_events) Ok(all_events)
}); });
// Use the pool for execution
task.execute_with_pool(pool).await task.execute_with_pool(pool).await
} }
/// Process a batch of events and detect changes
pub fn process_batch( pub fn process_batch(
new_events: &[EconomicEvent], new_events: &[EconomicEvent],
existing: &mut std::collections::HashMap<String, EconomicEvent>, existing: &mut HashMap<String, EconomicEvent>,
today: &str, today: &str,
) -> ScrapeResult { ) -> ScrapeResult {
let mut changes = Vec::new(); let mut changes = Vec::new();

View File

@@ -150,7 +150,7 @@ impl DockerVpnProxyPool {
async fn test_all_proxies_parallel(container_names: &[String], proxy_ports: &[u16]) -> Vec<Result<Option<String>>> {
let mut tasks = Vec::new();
for (i, (container_name, port)) in container_names.iter().zip(proxy_ports.iter()).enumerate() {
for (_i, (container_name, port)) in container_names.iter().zip(proxy_ports.iter()).enumerate() {
let name = container_name.clone();
let port = *port;

View File

@@ -28,7 +28,7 @@ impl ChromeDriverPool {
}
/// Creates a new pool with task-per-instance limit but no proxy.
pub async fn new_with_task_limit(pool_size: usize, max_tasks_per_instance: usize) -> Result<Self> {
pub async fn _new_with_task_limit(pool_size: usize, max_tasks_per_instance: usize) -> Result<Self> {
Self::new_with_proxy_and_task_limit(pool_size, None, max_tasks_per_instance).await
}