From 86944a9c58bca8f262466337bba680c4759e93a7 Mon Sep 17 00:00:00 2001 From: donpat1to Date: Wed, 24 Dec 2025 00:00:21 +0100 Subject: [PATCH] cleaned yahoo hits --- .gitignore | 1 + src/corporate/openfigi.rs | 881 ++++++++++++++++++++++++------- src/corporate/update.rs | 70 ++- src/corporate/update_parallel.rs | 94 +++- 4 files changed, 829 insertions(+), 217 deletions(-) diff --git a/.gitignore b/.gitignore index 6d7845a..80d247d 100644 --- a/.gitignore +++ b/.gitignore @@ -35,6 +35,7 @@ target/ **/*.log **/*.ovpn **/*.tmp +**/*.txt #/economic_events* #/economic_event_changes* diff --git a/src/corporate/openfigi.rs b/src/corporate/openfigi.rs index db02c04..5c4071c 100644 --- a/src/corporate/openfigi.rs +++ b/src/corporate/openfigi.rs @@ -223,58 +223,6 @@ async fn append_lei_to_figi_jsonl(path: &Path, lei: &str, figis: &[FigiInfo]) -> Ok(()) } -/// STREAMING: Build securities without loading everything into memory -pub async fn build_securities_from_figi_streaming( - date_dir: &Path, -) -> anyhow::Result<()> { - logger::log_info("Building securities (streaming mode)...").await; - - // Load existing incrementally - let mut commons = load_from_cache_if_exists::>( - "data/corporate/by_name/common_stocks.json" - ).await?; - - let equity_file = date_dir.join("Equity").join("lei_to_figi.jsonl"); - - if !equity_file.exists() { - logger::log_warn("No Equity FIGI file found").await; - return Ok(()); - } - - let content = tokio_fs::read_to_string(&equity_file).await?; - let mut processed = 0; - let mut stats = ProcessingStats::new(commons.len(), 0, 0); - - for line in content.lines() { - if line.trim().is_empty() { - continue; - } - - let entry: Value = serde_json::from_str(line)?; - let figi_infos: Vec = serde_json::from_value(entry["figis"].clone())?; - - // Process only common stocks - let common_stocks: Vec<_> = figi_infos.iter() - .filter(|f| f.security_type == "Common Stock") - .cloned() - .collect(); - - if !common_stocks.is_empty() { - process_common_stocks(&mut commons, &common_stocks, &mut stats); - } - - processed += 1; - if processed % 100 == 0 { - tokio::task::yield_now().await; - } - } - - logger::log_info(&format!("Processed {} FIGI entries", processed)).await; - save_to_cache("data/corporate/by_name/common_stocks.json", &commons).await?; - - Ok(()) -} - /// Handles rate limit responses from the OpenFIGI API. /// /// If a 429 status is received, this function sleeps for the duration specified @@ -310,56 +258,599 @@ async fn handle_rate_limit(resp: &reqwest::Response) -> anyhow::Result<()> { Ok(()) } -fn process_common_stocks( - companies: &mut HashMap, - figi_infos: &[FigiInfo], - stats: &mut ProcessingStats, -) { - let name = figi_infos[0].name.clone(); - if name.is_empty() { - return; +/// Loads or builds securities data by streaming through FIGI mapping files. +/// +/// Implements abort-safe incremental persistence with checkpoints and replay logs. +/// +/// # Arguments +/// * `date_dir` - Path to the date-specific mapping directory (e.g., cache/gleif_openfigi_map/24112025/) +/// +/// # Returns +/// Ok(()) on success. +/// +/// # Errors +/// Returns an error if file I/O fails or JSON parsing fails. 
+pub async fn load_or_build_all_securities(date_dir: &Path) -> anyhow::Result<()> { + logger::log_info("Building securities data from FIGI mappings...").await; + + let dir = DataPaths::new(".")?; + let data_dir = dir.data_dir(); + let corporate_data_dir = data_dir.join("corporate"); + let output_dir = corporate_data_dir.join("by_name"); + tokio_fs::create_dir_all(&output_dir).await + .context("Failed to create corporate/by_name directory")?; + + // Setup checkpoint and log paths for each security type + let common_checkpoint = output_dir.join("common_stocks.jsonl"); + let common_log = output_dir.join("common_stocks.log.jsonl"); + let warrants_checkpoint = output_dir.join("warrants.jsonl"); + let warrants_log = output_dir.join("warrants.log.jsonl"); + let options_checkpoint = output_dir.join("options.jsonl"); + let options_log = output_dir.join("options.log.jsonl"); + + // Track which sectors have been fully processed + let processed_sectors_file = output_dir.join("state.jsonl"); + let processed_sectors = load_processed_sectors(&processed_sectors_file).await?; + + logger::log_info(&format!(" Already processed {} sectors", processed_sectors.len())).await; + + // Collect sectors to process + let mut sectors_to_process = Vec::new(); + let mut entries = tokio_fs::read_dir(date_dir).await + .context("Failed to read date directory")?; + + while let Some(entry) = entries.next_entry().await? { + let path = entry.path(); + if !path.is_dir() { + continue; + } + + let sector_name = path.file_name() + .and_then(|n| n.to_str()) + .map(|s| s.to_string()) + .unwrap_or_else(|| "unknown".to_string()); + + let lei_figi_file = path.join("lei_to_figi.jsonl"); + if !lei_figi_file.exists() { + continue; + } + + // Skip if already processed + if processed_sectors.contains(§or_name) { + logger::log_info(&format!(" Skipping already processed sector: {}", sector_name)).await; + continue; + } + + sectors_to_process.push((sector_name, lei_figi_file)); } - let grouped_by_isin = group_by_isin(figi_infos); - - if let Some(existing) = companies.get_mut(&name) { - let mut updated = false; - for (isin, new_figis) in grouped_by_isin { - if let Some(existing_figis) = existing.securities.get_mut(&isin) { - let merged = merge_figi_list(existing_figis, &new_figis); - if merged.len() > existing_figis.len() { - *existing_figis = merged; - updated = true; - } - } else { - existing.securities.insert(isin.clone(), new_figis); - updated = true; - } - } - - if existing.primary_isin.is_empty() { - if let Some(first_isin) = existing.securities.keys().next() { - existing.primary_isin = first_isin.clone(); - } - } - - if updated { - stats.companies_updated += 1; - } - } else { - let primary_isin = grouped_by_isin.keys().next().cloned().unwrap_or_default(); - - companies.insert(name.clone(), CompanyInfo { - name, - primary_isin, - securities: grouped_by_isin, - }); - - stats.companies_added += 1; + if sectors_to_process.is_empty() { + logger::log_info(" All sectors already processed, nothing to do").await; + return Ok(()); } + + // Load checkpoints and replay logs - these are MUTABLE now + let mut existing_companies = load_checkpoint_and_replay(&common_checkpoint, &common_log, "name").await?; + let mut existing_warrants = load_checkpoint_and_replay_nested(&warrants_checkpoint, &warrants_log).await?; + let mut existing_options = load_checkpoint_and_replay_nested(&options_checkpoint, &options_log).await?; + + logger::log_info(&format!(" Existing entries - Companies: {}, Warrants: {}, Options: {}", + existing_companies.len(), 
existing_warrants.len(), existing_options.len())).await; + + // Process statistics + let mut stats = StreamingStats::new( + existing_companies.len(), + existing_warrants.len(), + existing_options.len() + ); + + logger::log_info(&format!(" Found {} sectors to process", sectors_to_process.len())).await; + + // Process each sector + let mut newly_processed_sectors = Vec::new(); + + for (sector_name, lei_figi_file) in sectors_to_process { + logger::log_info(&format!(" Processing sector: {}", sector_name)).await; + + // Stream through the lei_to_figi.jsonl file with batched writes + process_lei_figi_file_batched( + &lei_figi_file, + &common_log, + &warrants_log, + &options_log, + &mut existing_companies, + &mut existing_warrants, + &mut existing_options, + &mut stats, + ).await?; + + // Mark sector as processed + newly_processed_sectors.push(sector_name.clone()); + + // Append to processed sectors file immediately for crash safety + append_processed_sector(&processed_sectors_file, &sector_name).await?; + } + + // Create checkpoints after all processing + if !newly_processed_sectors.is_empty() { + logger::log_info("Creating checkpoints...").await; + create_checkpoint(&common_checkpoint, &common_log).await?; + create_checkpoint(&warrants_checkpoint, &warrants_log).await?; + create_checkpoint(&options_checkpoint, &options_log).await?; + } + + stats.print_summary(); + logger::log_info(&format!("✓ Processed {} new sectors successfully", newly_processed_sectors.len())).await; + + Ok(()) } -fn group_by_isin(figi_infos: &[FigiInfo]) -> HashMap<String, Vec<FigiInfo>> { +/// Loads the list of sectors that have been fully processed +async fn load_processed_sectors(path: &Path) -> anyhow::Result<HashSet<String>> { + let mut sectors = HashSet::new(); + + if !path.exists() { + return Ok(sectors); + } + + let content = tokio_fs::read_to_string(path).await + .context("Failed to read processed sectors file")?; + + for (line_num, line) in content.lines().enumerate() { + if line.trim().is_empty() || !line.ends_with('}') { + continue; // Skip incomplete lines + } + + match serde_json::from_str::<Value>(line) { + Ok(entry) => { + if let Some(sector) = entry["sector"].as_str() { + sectors.insert(sector.to_string()); + } + } + Err(e) => { + logger::log_warn(&format!( + "Skipping invalid processed sector line {}: {}", + line_num + 1, e + )).await; + } + } + } + + Ok(sectors) +} + +/// Appends a sector name to the processed sectors JSONL file with fsync +async fn append_processed_sector(path: &Path, sector_name: &str) -> anyhow::Result<()> { + use std::fs::OpenOptions; + use std::io::Write; + + let entry = json!({ + "sector": sector_name, + "completed_at": chrono::Utc::now().to_rfc3339(), + }); + + let mut file = OpenOptions::new() + .create(true) + .append(true) + .open(path) + .context("Failed to open processed sectors file")?; + + let line = serde_json::to_string(&entry) + .context("Failed to serialize sector entry")?
+ "\n"; + + file.write_all(line.as_bytes())?; + + // Ensure durability + file.sync_data() + .context("Failed to fsync processed sectors file")?; + + Ok(()) +} + +/// Loads checkpoint and replays log, returning set of existing keys +async fn load_checkpoint_and_replay( + checkpoint_path: &Path, + log_path: &Path, + key_field: &str, +) -> anyhow::Result> { + let mut keys = HashSet::new(); + + // Load checkpoint if it exists + if checkpoint_path.exists() { + let content = tokio_fs::read_to_string(checkpoint_path).await + .context("Failed to read checkpoint")?; + + for line in content.lines() { + if line.trim().is_empty() || !line.ends_with('}') { + continue; // Skip incomplete lines + } + + if let Ok(entry) = serde_json::from_str::(line) { + if let Some(key) = entry[key_field].as_str() { + keys.insert(key.to_string()); + } + } + } + } + + // Replay log if it exists + if log_path.exists() { + let content = tokio_fs::read_to_string(log_path).await + .context("Failed to read log")?; + + for line in content.lines() { + if line.trim().is_empty() || !line.ends_with('}') { + continue; // Skip incomplete lines + } + + if let Ok(entry) = serde_json::from_str::(line) { + if let Some(key) = entry[key_field].as_str() { + keys.insert(key.to_string()); + } + } + } + } + + Ok(keys) +} + +/// Loads checkpoint and replays log for nested structures (warrants/options) +async fn load_checkpoint_and_replay_nested( + checkpoint_path: &Path, + log_path: &Path, +) -> anyhow::Result> { + let mut keys = HashSet::new(); + + // Load checkpoint if it exists + if checkpoint_path.exists() { + let content = tokio_fs::read_to_string(checkpoint_path).await + .context("Failed to read checkpoint")?; + + for line in content.lines() { + if line.trim().is_empty() || !line.ends_with('}') { + continue; + } + + if let Ok(entry) = serde_json::from_str::(line) { + let underlying = entry["underlying_company_name"].as_str().unwrap_or(""); + let type_field = if entry.get("warrant_type").is_some() { + entry["warrant_type"].as_str().unwrap_or("") + } else { + entry["option_type"].as_str().unwrap_or("") + }; + + if !underlying.is_empty() && !type_field.is_empty() { + keys.insert(format!("{}::{}", underlying, type_field)); + } + } + } + } + + // Replay log if it exists + if log_path.exists() { + let content = tokio_fs::read_to_string(log_path).await + .context("Failed to read log")?; + + for line in content.lines() { + if line.trim().is_empty() || !line.ends_with('}') { + continue; + } + + if let Ok(entry) = serde_json::from_str::(line) { + let underlying = entry["underlying_company_name"].as_str().unwrap_or(""); + let type_field = if entry.get("warrant_type").is_some() { + entry["warrant_type"].as_str().unwrap_or("") + } else { + entry["option_type"].as_str().unwrap_or("") + }; + + if !underlying.is_empty() && !type_field.is_empty() { + keys.insert(format!("{}::{}", underlying, type_field)); + } + } + } + } + + Ok(keys) +} + +/// Creates a checkpoint by copying log to checkpoint atomically +async fn create_checkpoint(checkpoint_path: &Path, log_path: &Path) -> anyhow::Result<()> { + if !log_path.exists() { + return Ok(()); + } + + // Read all committed lines from log + let content = tokio_fs::read_to_string(log_path).await + .context("Failed to read log for checkpoint")?; + + let committed_lines: Vec<&str> = content + .lines() + .filter(|line| !line.trim().is_empty() && line.ends_with('}')) + .collect(); + + if committed_lines.is_empty() { + return Ok(()); + } + + // Write to temporary file + let tmp_path = 
checkpoint_path.with_extension("tmp"); + let mut tmp_file = std::fs::File::create(&tmp_path) + .context("Failed to create temp checkpoint")?; + + for line in committed_lines { + use std::io::Write; + writeln!(tmp_file, "{}", line)?; + } + + // Ensure data is flushed to disk + tmp_file.sync_data() + .context("Failed to sync temp checkpoint")?; + + drop(tmp_file); + + // Atomic rename + tokio_fs::rename(&tmp_path, checkpoint_path).await + .context("Failed to rename checkpoint")?; + + // Clear log after successful checkpoint + tokio_fs::remove_file(log_path).await + .context("Failed to remove log after checkpoint")?; + + Ok(()) +} + +/// Streams through a lei_to_figi.jsonl file and processes entries in batches with fsync +async fn process_lei_figi_file_batched( + input_path: &Path, + common_log_path: &Path, + warrants_log_path: &Path, + options_log_path: &Path, + existing_companies: &mut HashSet, + existing_warrants: &mut HashSet, + existing_options: &mut HashSet, + stats: &mut StreamingStats, +) -> anyhow::Result<()> { + let content = tokio_fs::read_to_string(input_path).await + .context("Failed to read lei_to_figi.jsonl")?; + + let batch_size = 100; + let mut processed_count = 0; + + let mut common_batch = Vec::new(); + let mut warrants_batch = Vec::new(); + let mut options_batch = Vec::new(); + + for (line_num, line) in content.lines().enumerate() { + if line.trim().is_empty() { + continue; + } + + let entry: Value = serde_json::from_str(line) + .context(format!("Failed to parse JSON on line {}", line_num + 1))?; + + let figis: Vec = serde_json::from_value(entry["figis"].clone()) + .context("Invalid 'figis' field")?; + + if figis.is_empty() { + continue; + } + + // Group by security type + let (common_stocks, warrant_securities, option_securities) = + group_by_security_type(&figis); + + // Collect entries for batching and update existing keys + if !common_stocks.is_empty() { + if let Some(entry) = prepare_common_stock_entry(&common_stocks, existing_companies) { + // Add to existing set immediately to prevent duplicates in same run + existing_companies.insert(entry.name.clone()); + common_batch.push(entry); + } + } + + if !warrant_securities.is_empty() { + for entry in prepare_warrant_entries(&warrant_securities, existing_warrants) { + // Add to existing set immediately + let key = format!("{}::{}", entry.underlying_company_name, entry.warrant_type); + existing_warrants.insert(key); + warrants_batch.push(entry); + } + } + + if !option_securities.is_empty() { + for entry in prepare_option_entries(&option_securities, existing_options) { + // Add to existing set immediately + let key = format!("{}::{}", entry.underlying_company_name, entry.option_type); + existing_options.insert(key); + options_batch.push(entry); + } + } + + // Write batches when they reach size limit + if common_batch.len() >= batch_size { + write_batch_with_fsync(common_log_path, &common_batch).await?; + stats.companies_added += common_batch.len(); + common_batch.clear(); + } + + if warrants_batch.len() >= batch_size { + write_batch_with_fsync(warrants_log_path, &warrants_batch).await?; + stats.warrants_added += warrants_batch.len(); + warrants_batch.clear(); + } + + if options_batch.len() >= batch_size { + write_batch_with_fsync(options_log_path, &options_batch).await?; + stats.options_added += options_batch.len(); + options_batch.clear(); + } + + processed_count += 1; + if processed_count % 1000 == 0 { + logger::log_info(&format!(" Processed {} LEI entries...", processed_count)).await; + } + } + + // Write remaining 
batches + if !common_batch.is_empty() { + write_batch_with_fsync(common_log_path, &common_batch).await?; + stats.companies_added += common_batch.len(); + } + + if !warrants_batch.is_empty() { + write_batch_with_fsync(warrants_log_path, &warrants_batch).await?; + stats.warrants_added += warrants_batch.len(); + } + + if !options_batch.is_empty() { + write_batch_with_fsync(options_log_path, &options_batch).await?; + stats.options_added += options_batch.len(); + } + + Ok(()) +} + +/// Writes a batch of entries to log with fsync +async fn write_batch_with_fsync<T: serde::Serialize>( + log_path: &Path, + entries: &[T], +) -> anyhow::Result<()> { + use std::fs::OpenOptions; + use std::io::Write; + + let mut file = OpenOptions::new() + .create(true) + .append(true) + .open(log_path) + .context("Failed to open log file")?; + + for entry in entries { + let line = serde_json::to_string(entry) + .context("Failed to serialize entry")?; + writeln!(file, "{}", line)?; + } + + // Critical: fsync to ensure durability + file.sync_data() + .context("Failed to fsync log file")?; + + Ok(()) +} + +/// Prepares a common stock entry if it doesn't exist +fn prepare_common_stock_entry( + figi_infos: &[FigiInfo], + existing_keys: &HashSet<String>, +) -> Option<CompanyInfo> { + let name = figi_infos[0].name.clone(); + if name.is_empty() || existing_keys.contains(&name) { + return None; + } + + let grouped_by_isin = group_figis_by_isin(figi_infos); + let primary_isin = grouped_by_isin.keys().next().cloned().unwrap_or_default(); + + Some(CompanyInfo { + name, + primary_isin, + securities: grouped_by_isin, + }) +} + +/// Prepares warrant entries for batching +fn prepare_warrant_entries( + warrant_securities: &[FigiInfo], + existing_keys: &HashSet<String>, +) -> Vec<WarrantInfo> { + let mut entries = Vec::new(); + + for figi in warrant_securities { + let (underlying, issuer, warrant_type) = parse_warrant_name(&figi.name); + + if underlying.is_empty() { + continue; + } + + let key = format!("{}::{}", underlying, warrant_type); + if existing_keys.contains(&key) { + continue; + } + + let warrant_info = WarrantInfo { + underlying_company_name: underlying.clone(), + issuer_company_name: issuer, + warrant_type: warrant_type.clone(), + warrants: { + let mut map = HashMap::new(); + map.insert(figi.isin.clone(), vec![figi.clone()]); + map + }, + }; + + entries.push(warrant_info); + } + + entries +} + +/// Prepares option entries for batching +fn prepare_option_entries( + option_securities: &[FigiInfo], + existing_keys: &HashSet<String>, +) -> Vec<OptionInfo> { + let mut entries = Vec::new(); + + for figi in option_securities { + let (underlying, issuer, option_type) = parse_option_name(&figi.name); + + if underlying.is_empty() { + continue; + } + + let key = format!("{}::{}", underlying, option_type); + if existing_keys.contains(&key) { + continue; + } + + let option_info = OptionInfo { + underlying_company_name: underlying.clone(), + issuer_company_name: issuer, + option_type: option_type.clone(), + options: { + let mut map = HashMap::new(); + map.insert(figi.isin.clone(), vec![figi.clone()]); + map + }, + }; + + entries.push(option_info); + } + + entries +} + +/// Groups FigiInfo list by security type +fn group_by_security_type(figis: &[FigiInfo]) -> (Vec<FigiInfo>, Vec<FigiInfo>, Vec<FigiInfo>) { + let mut common_stocks = Vec::new(); + let mut warrants = Vec::new(); + let mut options = Vec::new(); + + for figi in figis { + match figi.security_type.as_str() { + "Common Stock" => common_stocks.push(figi.clone()), + "Equity WRT" => warrants.push(figi.clone()), + "Equity Option" => options.push(figi.clone()), + _ => {} + } + } + 
(common_stocks, warrants, options) +} + +/// Groups FigiInfo by ISIN +fn group_figis_by_isin(figi_infos: &[FigiInfo]) -> HashMap> { let mut grouped: HashMap> = HashMap::new(); for figi_info in figi_infos { @@ -375,65 +866,126 @@ fn group_by_isin(figi_infos: &[FigiInfo]) -> HashMap> { grouped } -fn merge_figi_list(existing: &[FigiInfo], new_figis: &[FigiInfo]) -> Vec { - let mut merged = existing.to_vec(); - let existing_figis: HashSet = existing.iter() - .map(|f| f.figi.clone()) - .collect(); +/// Parse warrant name to extract underlying company, issuer, and warrant type +/// +/// Examples: +/// - "VONTOBE-PW26 LEONARDO SPA" -> ("LEONARDO SPA", Some("VONTOBEL"), "put") +/// - "BAYER H-CW25 L'OREAL" -> ("L'OREAL", Some("BAYER H"), "call") +/// - "APPLE INC WARRANT" -> ("APPLE INC", None, "unknown") +fn parse_warrant_name(name: &str) -> (String, Option, String) { + let name_upper = name.to_uppercase(); - for new_figi in new_figis { - if !existing_figis.contains(&new_figi.figi) { - merged.push(new_figi.clone()); - } + // Try to detect warrant type from code (PW=put, CW=call) + let warrant_type = if name_upper.contains("-PW") || name_upper.contains(" PW") { + "put".to_string() + } else if name_upper.contains("-CW") || name_upper.contains(" CW") { + "call".to_string() + } else { + "unknown".to_string() + }; + + // Try to split by warrant code pattern (e.g., "-PW26", "-CW25") + if let Some(pos) = name.find("-PW") { + let before = name[..pos].trim(); + let after_idx = name[pos..].find(' ').map(|i| pos + i + 1).unwrap_or(name.len()); + let after = if after_idx < name.len() { + name[after_idx..].trim() + } else { + "" + }; + + return ( + after.to_string(), + if !before.is_empty() { Some(before.to_string()) } else { None }, + warrant_type, + ); } - merged.sort_by(|a, b| a.figi.cmp(&b.figi)); - merged + if let Some(pos) = name.find("-CW") { + let before = name[..pos].trim(); + let after_idx = name[pos..].find(' ').map(|i| pos + i + 1).unwrap_or(name.len()); + let after = if after_idx < name.len() { + name[after_idx..].trim() + } else { + "" + }; + + return ( + after.to_string(), + if !before.is_empty() { Some(before.to_string()) } else { None }, + warrant_type, + ); + } + + // Fallback: return entire name as underlying + (name.to_string(), None, warrant_type) } +/// Parse option name to extract underlying company, issuer, and option type +/// +/// Examples: +/// - "December 25 Calls on ALPHA GA" -> ("ALPHA GA", None, "call") +/// - "January 26 Puts on TESLA INC" -> ("TESLA INC", None, "put") +fn parse_option_name(name: &str) -> (String, Option, String) { + let name_upper = name.to_uppercase(); + + // Detect option type + let option_type = if name_upper.contains("CALL") { + "call".to_string() + } else if name_upper.contains("PUT") { + "put".to_string() + } else { + "unknown".to_string() + }; + + // Try to extract underlying after "on" + if let Some(pos) = name_upper.find(" ON ") { + let underlying = name[pos + 4..].trim().to_string(); + return (underlying, None, option_type); + } + + // Fallback: return entire name + (name.to_string(), None, option_type) +} + +/// Statistics tracker for streaming processing #[derive(Debug)] -struct ProcessingStats { +struct StreamingStats { initial_companies: usize, + initial_warrants: usize, + initial_options: usize, companies_added: usize, - companies_updated: usize, + warrants_added: usize, + options_added: usize, } -impl ProcessingStats { - fn new(companies: usize, _warrants: usize, _options: usize) -> Self { +impl StreamingStats { + fn new(companies: usize, 
warrants: usize, options: usize) -> Self { Self { initial_companies: companies, + initial_warrants: warrants, + initial_options: options, companies_added: 0, - companies_updated: 0, + warrants_added: 0, + options_added: 0, } } -} - -async fn load_from_cache_if_exists(path: &str) -> anyhow::Result -where - T: serde::de::DeserializeOwned + Default, -{ - let cache_file = Path::new(path); - if !cache_file.exists() { - return Ok(T::default()); + fn print_summary(&self) { + println!("\n=== Processing Statistics ==="); + println!("Companies:"); + println!(" - Initial: {}", self.initial_companies); + println!(" - Added: {}", self.companies_added); + println!(" - Total: {}", self.initial_companies + self.companies_added); + println!("Warrants:"); + println!(" - Initial: {}", self.initial_warrants); + println!(" - Added: {}", self.warrants_added); + println!(" - Total: {}", self.initial_warrants + self.warrants_added); + println!("Options:"); + println!(" - Initial: {}", self.initial_options); + println!(" - Added: {}", self.options_added); + println!(" - Total: {}", self.initial_options + self.options_added); } - - let content = tokio_fs::read_to_string(cache_file).await?; - Ok(serde_json::from_str(&content)?) -} - -async fn save_to_cache(path: &str, data: &T) -> anyhow::Result<()> -where - T: serde::Serialize, -{ - let cache_path = Path::new(path); - let cache_dir = cache_path.parent().context("Invalid path")?; - - tokio_fs::create_dir_all(cache_dir).await?; - let json_str = serde_json::to_string_pretty(data)?; - tokio_fs::write(cache_path, json_str).await?; - - Ok(()) } async fn load_market_sectors() -> anyhow::Result> { @@ -771,57 +1323,6 @@ pub async fn get_mapping_stats( }) } -/// Print mapping statistics to console and logs -pub async fn print_mapping_stats(csv_path: &str) -> anyhow::Result<()> { - logger::log_info("=== LEI-FIGI Mapping Status ===").await; - - let stats = get_mapping_stats(csv_path, None).await?; - - logger::log_info(&format!( - "Total LEIs: {}", - stats.total_leis - )).await; - - logger::log_info(&format!( - "├─ Mapped (with FIGI): {} ({:.2}%)", - stats.mapped_leis, - stats.mapping_percentage - )).await; - - logger::log_info(&format!( - "├─ No Results (queried, no FIGI): {} ({:.2}%)", - stats.no_result_leis, - (stats.no_result_leis as f64 / stats.total_leis as f64) * 100.0 - )).await; - - logger::log_info(&format!( - "└─ Not Queried Yet: {} ({:.2}%)", - stats.unqueried_leis, - (stats.unqueried_leis as f64 / stats.total_leis as f64) * 100.0 - )).await; - - logger::log_info(&format!( - "\nQuery Coverage: {:.2}% ({} / {})", - stats.queried_percentage, - stats.mapped_leis + stats.no_result_leis, - stats.total_leis - )).await; - - if !stats.by_sector.is_empty() { - logger::log_info("\nMapped LEIs by sector:").await; - let mut sectors: Vec<_> = stats.by_sector.iter().collect(); - sectors.sort_by(|a, b| b.1.cmp(a.1)); // Sort by count descending - - for (sector, count) in sectors { - logger::log_info(&format!(" {}: {}", sector, count)).await; - } - } - - logger::log_info("==============================").await; - - Ok(()) -} - /// Quick check if mapping is complete (returns true if all mapped) pub async fn is_mapping_complete(csv_path: &str) -> anyhow::Result { let dir = DataPaths::new(".")?; diff --git a/src/corporate/update.rs b/src/corporate/update.rs index efa45e3..3d0fe24 100644 --- a/src/corporate/update.rs +++ b/src/corporate/update.rs @@ -66,7 +66,7 @@ pub async fn run_full_update( if let Some(date_dir) = date_dir { logger::log_info(&format!(" Using FIGI data from: {:?}", 
date_dir)).await; - build_securities_from_figi_streaming(&date_dir).await?; + load_or_build_all_securities(&date_dir).await?; logger::log_info(" ✓ Securities map updated").await; } else { logger::log_warn(" ✗ No FIGI data directory found").await; @@ -88,6 +88,7 @@ pub async fn run_full_update( logger::log_info("Step 6: Cleansing up companies with missing essential data...").await; let cleansed_count = companies_yahoo_jsonl(&paths).await?; + logger::log_info(&format!(" ✓ {} companies found on Yahoo and written to companies_yahoo.jsonl", cleansed_count)).await; if !shutdown_flag.load(Ordering::SeqCst) { logger::log_info("Step 7: Processing events (using index)...").await; @@ -101,20 +102,24 @@ Ok(()) } + /// Cleansing function to remove companies with missing essential yahoo data for integrity -/// Has to contain a ticker with 'YAHOO:'; Entries with 'YAHOO:NO_RESULTS' are removed +/// Has to contain a ticker with 'YAHOO:'; entries with only 'YAHOO:NO_RESULTS' or 'YAHOO:ERROR' tickers are removed /// The rest stays unchanged /// -/// The '.jsonl' will be saved in the same path but 'companies_filtered.jsonl' +/// Uses state.jsonl to track completion and avoid re-running the cleansing operation +/// The filtered '.jsonl' is saved alongside the input as 'companies_yahoo.jsonl' /// Only execute when 'companies.jsonl' is present pub async fn companies_yahoo_jsonl(paths: &DataPaths) -> anyhow::Result<usize> { use tokio::fs::File; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; + use serde_json::json; - let path = paths.base_dir(); + let data_path = paths.data_dir(); - let input_path = path.join("corporate").join("companies.jsonl"); - let output_path = path.join("corporate").join("companies_yahoo.jsonl"); + let input_path = data_path.join("companies.jsonl"); + let output_path = data_path.join("companies_yahoo.jsonl"); + let state_path = data_path.join("state.jsonl"); // Check if input file exists if !input_path.exists() { @@ -122,6 +127,37 @@ pub async fn companies_yahoo_jsonl(paths: &DataPaths) -> anyhow::Result<usize> { return Ok(0); } + // Check if state file exists and cleansing was already completed + if state_path.exists() { + let state_content = tokio::fs::read_to_string(&state_path).await?; + + for line in state_content.lines() { + if line.trim().is_empty() { + continue; + } + + if let Ok(state) = serde_json::from_str::<serde_json::Value>(line) { + if state.get("yahoo_companies").and_then(|v| v.as_bool()).unwrap_or(false) { + logger::log_info(" Yahoo companies cleansing already completed, reading existing file...").await; + + // Count lines in existing output file + if output_path.exists() { + let output_content = tokio::fs::read_to_string(&output_path).await?; + let count = output_content.lines() + .filter(|line| !line.trim().is_empty()) + .count(); + + logger::log_info(&format!(" ✓ Found {} companies in companies_yahoo.jsonl", count)).await; + return Ok(count); + } else { + logger::log_warn(" State indicates completion but companies_yahoo.jsonl not found, re-running...").await; + break; + } + } + } + } + } + logger::log_info(&format!(" Reading from: {:?}", input_path)).await; logger::log_info(&format!(" Writing to: {:?}", output_path)).await; @@ -150,11 +186,15 @@ pub async fn companies_yahoo_jsonl(paths: &DataPaths) -> anyhow::Result<usize> { }; // Check if company has at least one valid YAHOO ticker - // Valid means: starts with "YAHOO:" but is NOT "YAHOO:NO_RESULTS" + // Valid means: starts with "YAHOO:" but is NOT "YAHOO:NO_RESULTS" or "YAHOO:ERROR" let has_valid_yahoo = 
company.isin_tickers_map .values() .flatten() - .any(|ticker| ticker.starts_with("YAHOO:") && ticker != "YAHOO:NO_RESULTS"); + .any(|ticker| { + ticker.starts_with("YAHOO:") + && ticker != "YAHOO:NO_RESULTS" + && ticker != "YAHOO:ERROR" + }); if has_valid_yahoo { // Write the company to the filtered output @@ -183,6 +223,20 @@ pub async fn companies_yahoo_jsonl(paths: &DataPaths) -> anyhow::Result { total_count, valid_count, removed_count )).await; + // Write state file to mark completion + let yahoo_companies = json!({ + "yahoo_companies": true, + "completed_at": chrono::Utc::now().to_rfc3339(), + }); + + let mut state_file = File::create(&state_path).await?; + let state_line = serde_json::to_string(&yahoo_companies)?; + state_file.write_all(state_line.as_bytes()).await?; + state_file.write_all(b"\n").await?; + state_file.flush().await?; + + logger::log_info(&format!(" ✓ State file created at: {:?}", state_path)).await; + Ok(valid_count) } diff --git a/src/corporate/update_parallel.rs b/src/corporate/update_parallel.rs index 100ea5c..184ab34 100644 --- a/src/corporate/update_parallel.rs +++ b/src/corporate/update_parallel.rs @@ -63,8 +63,8 @@ fn company_needs_processing( // Check if this ISIN has valid Yahoo data let has_valid_yahoo = tickers.iter().any(|t| { t.starts_with("YAHOO:") && - t != "YAHOO:ERROR" && // Error marker means needs retry - t != "YAHOO:NO_RESULTS" // This is actually valid (legitimately not found) + t != "YAHOO:ERROR" //&& // Error marker means needs retry + //t != "YAHOO:NO_RESULTS" // This is actually valid (legitimately not found) }); // If no valid Yahoo data for this ISIN, company needs processing @@ -95,9 +95,6 @@ pub async fn build_companies_jsonl_streaming_parallel( const FSYNC_INTERVAL_SECS: u64 = 10; const CONCURRENCY_LIMIT: usize = 100; - // Create hard reset controller - let reset_controller = pool.get_reset_controller(); - // Wrap pool in mutex for potential replacement let pool_mutex = Arc::new(tokio::sync::Mutex::new(Arc::clone(pool))); @@ -106,15 +103,18 @@ pub async fn build_companies_jsonl_streaming_parallel( let path = DataPaths::new(".")?; let corporate_path = path.data_dir().join("corporate").join("by_name"); - let securities_path = corporate_path.join("common_stocks.json"); + let securities_checkpoint = corporate_path.join("common_stocks.jsonl"); + let securities_log = corporate_path.join("common_stocks.log.jsonl"); - if !securities_path.exists() { - logger::log_warn("No common_stocks.json found").await; + if !securities_checkpoint.exists() { + logger::log_warn("No common_stocks.jsonl found").await; return Ok(0); } - let content = tokio::fs::read_to_string(&securities_path).await?; - let securities: HashMap = serde_json::from_str(&content)?; + // Load securities from checkpoint and replay log + logger::log_info("Loading common stocks from JSONL checkpoint and log...").await; + let securities = load_securities_from_jsonl(&securities_checkpoint, &securities_log).await?; + logger::log_info(&format!("Loaded {} companies from common stocks", securities.len())).await; let companies_path = paths.data_dir().join("companies.jsonl"); let log_path = paths.data_dir().join("companies_updates.log"); @@ -132,8 +132,8 @@ pub async fn build_companies_jsonl_streaming_parallel( let existing_content = tokio::fs::read_to_string(&companies_path).await?; for line in existing_content.lines() { - if line.trim().is_empty() { - continue; + if line.trim().is_empty() || !line.ends_with('}') { + continue; // Skip incomplete lines } match serde_json::from_str::(line) { @@ 
-155,8 +155,8 @@ pub async fn build_companies_jsonl_streaming_parallel( let mut replayed = 0; for line in log_content.lines() { - if line.trim().is_empty() { - continue; + if line.trim().is_empty() || !line.ends_with('}') { + continue; // Skip incomplete lines } match serde_json::from_str::(line) { @@ -453,8 +453,6 @@ pub async fn build_companies_jsonl_streaming_parallel( let error_msg = e.to_string(); if error_msg.contains("HARD_RESET_REQUIRED") { - // Don't break, perform actual hard reset - - // Check if reset already in progress (race condition protection) let mut reset_lock = reset_in_progress.lock().await; if *reset_lock { @@ -512,9 +510,10 @@ pub async fn build_companies_jsonl_streaming_parallel( companies.clone() }; - // Reload all securities from disk - let content = tokio::fs::read_to_string(&securities_path).await?; - let all_securities: HashMap<String, CompanyInfo> = serde_json::from_str(&content)?; + // Reload all securities from disk (checkpoint + log) + logger::log_info("Reloading securities from JSONL...").await; + let all_securities = load_securities_from_jsonl(&securities_checkpoint, &securities_log).await?; + logger::log_info(&format!("Reloaded {} companies", all_securities.len())).await; // Build pending list: only companies that need processing pending = all_securities.iter() @@ -664,6 +663,62 @@ pub async fn build_companies_jsonl_streaming_parallel( Ok(final_count) } +/// Loads CompanyInfo securities from checkpoint and log JSONL files +async fn load_securities_from_jsonl( + checkpoint_path: &std::path::Path, + log_path: &std::path::Path, +) -> anyhow::Result<HashMap<String, CompanyInfo>> { + let mut securities: HashMap<String, CompanyInfo> = HashMap::new(); + + // Load checkpoint + if checkpoint_path.exists() { + let content = tokio::fs::read_to_string(checkpoint_path).await?; + + for (line_num, line) in content.lines().enumerate() { + if line.trim().is_empty() || !line.ends_with('}') { + continue; // Skip incomplete lines + } + + match serde_json::from_str::<CompanyInfo>(line) { + Ok(company_info) => { + securities.insert(company_info.name.clone(), company_info); + } + Err(e) => { + logger::log_warn(&format!( + "Skipping invalid line {} in checkpoint: {}", + line_num + 1, e + )).await; + } + } + } + } + + // Replay log (overwrites checkpoint entries if they exist) + if log_path.exists() { + let content = tokio::fs::read_to_string(log_path).await?; + + for (line_num, line) in content.lines().enumerate() { + if line.trim().is_empty() || !line.ends_with('}') { + continue; // Skip incomplete lines + } + + match serde_json::from_str::<CompanyInfo>(line) { + Ok(company_info) => { + securities.insert(company_info.name.clone(), company_info); + } + Err(e) => { + logger::log_warn(&format!( + "Skipping invalid line {} in log: {}", + line_num + 1, e + )).await; + } + } + } + } + + Ok(securities) +} + /// Scrape with retry, validation, and shutdown awareness async fn scrape_with_retry( pool: &Arc, @@ -792,6 +847,7 @@ async fn process_single_company_validated( if !has_valid_yahoo { logger::log_info(&format!("Fetching Yahoo details for {} (ISIN: {})", name, isin)).await; + tickers.retain(|t| !t.starts_with("YAHOO:")); match scrape_with_retry(pool, &isin, 3, shutdown_flag).await { Ok(Some(details)) => {