cleaned yahoo hits

This commit is contained in:
2025-12-24 00:00:21 +01:00
parent f9f09d0291
commit 86944a9c58
4 changed files with 829 additions and 217 deletions

1
.gitignore vendored
View File

@@ -35,6 +35,7 @@ target/
**/*.log
**/*.ovpn
**/*.tmp
**/*.txt
#/economic_events*
#/economic_event_changes*

View File

@@ -223,58 +223,6 @@ async fn append_lei_to_figi_jsonl(path: &Path, lei: &str, figis: &[FigiInfo]) ->
Ok(())
}
/// STREAMING: Build securities without loading everything into memory
pub async fn build_securities_from_figi_streaming(
date_dir: &Path,
) -> anyhow::Result<()> {
logger::log_info("Building securities (streaming mode)...").await;
// Load existing incrementally
let mut commons = load_from_cache_if_exists::<HashMap<String, CompanyInfo>>(
"data/corporate/by_name/common_stocks.json"
).await?;
let equity_file = date_dir.join("Equity").join("lei_to_figi.jsonl");
if !equity_file.exists() {
logger::log_warn("No Equity FIGI file found").await;
return Ok(());
}
let content = tokio_fs::read_to_string(&equity_file).await?;
let mut processed = 0;
let mut stats = ProcessingStats::new(commons.len(), 0, 0);
for line in content.lines() {
if line.trim().is_empty() {
continue;
}
let entry: Value = serde_json::from_str(line)?;
let figi_infos: Vec<FigiInfo> = serde_json::from_value(entry["figis"].clone())?;
// Process only common stocks
let common_stocks: Vec<_> = figi_infos.iter()
.filter(|f| f.security_type == "Common Stock")
.cloned()
.collect();
if !common_stocks.is_empty() {
process_common_stocks(&mut commons, &common_stocks, &mut stats);
}
processed += 1;
if processed % 100 == 0 {
tokio::task::yield_now().await;
}
}
logger::log_info(&format!("Processed {} FIGI entries", processed)).await;
save_to_cache("data/corporate/by_name/common_stocks.json", &commons).await?;
Ok(())
}
/// Handles rate limit responses from the OpenFIGI API.
///
/// If a 429 status is received, this function sleeps for the duration specified
@@ -310,56 +258,599 @@ async fn handle_rate_limit(resp: &reqwest::Response) -> anyhow::Result<()> {
Ok(())
}
fn process_common_stocks(
companies: &mut HashMap<String, CompanyInfo>,
/// Loads or builds securities data by streaming through FIGI mapping files.
///
/// Implements abort-safe incremental persistence with checkpoints and replay logs.
///
/// # Arguments
/// * `date_dir` - Path to the date-specific mapping directory (e.g., cache/gleif_openfigi_map/24112025/)
///
/// # Returns
/// Ok(()) on success.
///
/// # Errors
/// Returns an error if file I/O fails or JSON parsing fails.
pub async fn load_or_build_all_securities(date_dir: &Path) -> anyhow::Result<()> {
logger::log_info("Building securities data from FIGI mappings...").await;
let dir = DataPaths::new(".")?;
let data_dir = dir.data_dir();
let corporate_data_dir = data_dir.join("corporate");
let output_dir = corporate_data_dir.join("by_name");
tokio_fs::create_dir_all(&output_dir).await
.context("Failed to create corporate/by_name directory")?;
// Setup checkpoint and log paths for each security type
let common_checkpoint = output_dir.join("common_stocks.jsonl");
let common_log = output_dir.join("common_stocks.log.jsonl");
let warrants_checkpoint = output_dir.join("warrants.jsonl");
let warrants_log = output_dir.join("warrants.log.jsonl");
let options_checkpoint = output_dir.join("options.jsonl");
let options_log = output_dir.join("options.log.jsonl");
// Track which sectors have been fully processed
let processed_sectors_file = output_dir.join("state.jsonl");
let processed_sectors = load_processed_sectors(&processed_sectors_file).await?;
logger::log_info(&format!(" Already processed {} sectors", processed_sectors.len())).await;
// Collect sectors to process
let mut sectors_to_process = Vec::new();
let mut entries = tokio_fs::read_dir(date_dir).await
.context("Failed to read date directory")?;
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if !path.is_dir() {
continue;
}
let sector_name = path.file_name()
.and_then(|n| n.to_str())
.map(|s| s.to_string())
.unwrap_or_else(|| "unknown".to_string());
let lei_figi_file = path.join("lei_to_figi.jsonl");
if !lei_figi_file.exists() {
continue;
}
// Skip if already processed
if processed_sectors.contains(&sector_name) {
logger::log_info(&format!(" Skipping already processed sector: {}", sector_name)).await;
continue;
}
sectors_to_process.push((sector_name, lei_figi_file));
}
if sectors_to_process.is_empty() {
logger::log_info(" All sectors already processed, nothing to do").await;
return Ok(());
}
// Load checkpoints and replay logs - these are MUTABLE now
let mut existing_companies = load_checkpoint_and_replay(&common_checkpoint, &common_log, "name").await?;
let mut existing_warrants = load_checkpoint_and_replay_nested(&warrants_checkpoint, &warrants_log).await?;
let mut existing_options = load_checkpoint_and_replay_nested(&options_checkpoint, &options_log).await?;
logger::log_info(&format!(" Existing entries - Companies: {}, Warrants: {}, Options: {}",
existing_companies.len(), existing_warrants.len(), existing_options.len())).await;
// Process statistics
let mut stats = StreamingStats::new(
existing_companies.len(),
existing_warrants.len(),
existing_options.len()
);
logger::log_info(&format!(" Found {} sectors to process", sectors_to_process.len())).await;
// Process each sector
let mut newly_processed_sectors = Vec::new();
for (sector_name, lei_figi_file) in sectors_to_process {
logger::log_info(&format!(" Processing sector: {}", sector_name)).await;
// Stream through the lei_to_figi.jsonl file with batched writes
process_lei_figi_file_batched(
&lei_figi_file,
&common_log,
&warrants_log,
&options_log,
&mut existing_companies,
&mut existing_warrants,
&mut existing_options,
&mut stats,
).await?;
// Mark sector as processed
newly_processed_sectors.push(sector_name.clone());
// Append to processed sectors file immediately for crash safety
append_processed_sector(&processed_sectors_file, &sector_name).await?;
}
// Create checkpoints after all processing
if !newly_processed_sectors.is_empty() {
logger::log_info("Creating checkpoints...").await;
create_checkpoint(&common_checkpoint, &common_log).await?;
create_checkpoint(&warrants_checkpoint, &warrants_log).await?;
create_checkpoint(&options_checkpoint, &options_log).await?;
}
stats.print_summary();
logger::log_info(&format!("✓ Processed {} new sectors successfully", newly_processed_sectors.len())).await;
Ok(())
}
/// Loads the list of sectors that have been fully processed
async fn load_processed_sectors(path: &Path) -> anyhow::Result<HashSet<String>> {
let mut sectors = HashSet::new();
if !path.exists() {
return Ok(sectors);
}
let content = tokio_fs::read_to_string(path).await
.context("Failed to read processed sectors file")?;
for (line_num, line) in content.lines().enumerate() {
if line.trim().is_empty() || !line.ends_with('}') {
continue; // Skip incomplete lines
}
match serde_json::from_str::<Value>(line) {
Ok(entry) => {
if let Some(sector) = entry["sector"].as_str() {
sectors.insert(sector.to_string());
}
}
Err(e) => {
logger::log_warn(&format!(
"Skipping invalid processed sector line {}: {}",
line_num + 1, e
)).await;
}
}
}
Ok(sectors)
}
/// Appends a sector name to the processed sectors file with fsync
/// Appends a sector name to the processed sectors JSONL file with fsync
async fn append_processed_sector(path: &Path, sector_name: &str) -> anyhow::Result<()> {
use std::fs::OpenOptions;
use std::io::Write;
let entry = json!({
"sector": sector_name,
"completed_at": chrono::Utc::now().to_rfc3339(),
});
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(path)
.context("Failed to open processed sectors file")?;
let line = serde_json::to_string(&entry)
.context("Failed to serialize sector entry")? + "\n";
file.write_all(line.as_bytes())?;
// Ensure durability
file.sync_data()
.context("Failed to fsync processed sectors file")?;
Ok(())
}
/// Loads checkpoint and replays log, returning set of existing keys
async fn load_checkpoint_and_replay(
checkpoint_path: &Path,
log_path: &Path,
key_field: &str,
) -> anyhow::Result<HashSet<String>> {
let mut keys = HashSet::new();
// Load checkpoint if it exists
if checkpoint_path.exists() {
let content = tokio_fs::read_to_string(checkpoint_path).await
.context("Failed to read checkpoint")?;
for line in content.lines() {
if line.trim().is_empty() || !line.ends_with('}') {
continue; // Skip incomplete lines
}
if let Ok(entry) = serde_json::from_str::<Value>(line) {
if let Some(key) = entry[key_field].as_str() {
keys.insert(key.to_string());
}
}
}
}
// Replay log if it exists
if log_path.exists() {
let content = tokio_fs::read_to_string(log_path).await
.context("Failed to read log")?;
for line in content.lines() {
if line.trim().is_empty() || !line.ends_with('}') {
continue; // Skip incomplete lines
}
if let Ok(entry) = serde_json::from_str::<Value>(line) {
if let Some(key) = entry[key_field].as_str() {
keys.insert(key.to_string());
}
}
}
}
Ok(keys)
}
/// Loads checkpoint and replays log for nested structures (warrants/options)
async fn load_checkpoint_and_replay_nested(
checkpoint_path: &Path,
log_path: &Path,
) -> anyhow::Result<HashSet<String>> {
let mut keys = HashSet::new();
// Load checkpoint if it exists
if checkpoint_path.exists() {
let content = tokio_fs::read_to_string(checkpoint_path).await
.context("Failed to read checkpoint")?;
for line in content.lines() {
if line.trim().is_empty() || !line.ends_with('}') {
continue;
}
if let Ok(entry) = serde_json::from_str::<Value>(line) {
let underlying = entry["underlying_company_name"].as_str().unwrap_or("");
let type_field = if entry.get("warrant_type").is_some() {
entry["warrant_type"].as_str().unwrap_or("")
} else {
entry["option_type"].as_str().unwrap_or("")
};
if !underlying.is_empty() && !type_field.is_empty() {
keys.insert(format!("{}::{}", underlying, type_field));
}
}
}
}
// Replay log if it exists
if log_path.exists() {
let content = tokio_fs::read_to_string(log_path).await
.context("Failed to read log")?;
for line in content.lines() {
if line.trim().is_empty() || !line.ends_with('}') {
continue;
}
if let Ok(entry) = serde_json::from_str::<Value>(line) {
let underlying = entry["underlying_company_name"].as_str().unwrap_or("");
let type_field = if entry.get("warrant_type").is_some() {
entry["warrant_type"].as_str().unwrap_or("")
} else {
entry["option_type"].as_str().unwrap_or("")
};
if !underlying.is_empty() && !type_field.is_empty() {
keys.insert(format!("{}::{}", underlying, type_field));
}
}
}
}
Ok(keys)
}
/// Creates a checkpoint by copying log to checkpoint atomically
async fn create_checkpoint(checkpoint_path: &Path, log_path: &Path) -> anyhow::Result<()> {
if !log_path.exists() {
return Ok(());
}
// Read all committed lines from log
let content = tokio_fs::read_to_string(log_path).await
.context("Failed to read log for checkpoint")?;
let committed_lines: Vec<&str> = content
.lines()
.filter(|line| !line.trim().is_empty() && line.ends_with('}'))
.collect();
if committed_lines.is_empty() {
return Ok(());
}
// Write to temporary file
let tmp_path = checkpoint_path.with_extension("tmp");
let mut tmp_file = std::fs::File::create(&tmp_path)
.context("Failed to create temp checkpoint")?;
for line in committed_lines {
use std::io::Write;
writeln!(tmp_file, "{}", line)?;
}
// Ensure data is flushed to disk
tmp_file.sync_data()
.context("Failed to sync temp checkpoint")?;
drop(tmp_file);
// Atomic rename
tokio_fs::rename(&tmp_path, checkpoint_path).await
.context("Failed to rename checkpoint")?;
// Clear log after successful checkpoint
tokio_fs::remove_file(log_path).await
.context("Failed to remove log after checkpoint")?;
Ok(())
}
/// Streams through a lei_to_figi.jsonl file and processes entries in batches with fsync
async fn process_lei_figi_file_batched(
input_path: &Path,
common_log_path: &Path,
warrants_log_path: &Path,
options_log_path: &Path,
existing_companies: &mut HashSet<String>,
existing_warrants: &mut HashSet<String>,
existing_options: &mut HashSet<String>,
stats: &mut StreamingStats,
) -> anyhow::Result<()> {
let content = tokio_fs::read_to_string(input_path).await
.context("Failed to read lei_to_figi.jsonl")?;
let batch_size = 100;
let mut processed_count = 0;
let mut common_batch = Vec::new();
let mut warrants_batch = Vec::new();
let mut options_batch = Vec::new();
for (line_num, line) in content.lines().enumerate() {
if line.trim().is_empty() {
continue;
}
let entry: Value = serde_json::from_str(line)
.context(format!("Failed to parse JSON on line {}", line_num + 1))?;
let figis: Vec<FigiInfo> = serde_json::from_value(entry["figis"].clone())
.context("Invalid 'figis' field")?;
if figis.is_empty() {
continue;
}
// Group by security type
let (common_stocks, warrant_securities, option_securities) =
group_by_security_type(&figis);
// Collect entries for batching and update existing keys
if !common_stocks.is_empty() {
if let Some(entry) = prepare_common_stock_entry(&common_stocks, existing_companies) {
// Add to existing set immediately to prevent duplicates in same run
existing_companies.insert(entry.name.clone());
common_batch.push(entry);
}
}
if !warrant_securities.is_empty() {
for entry in prepare_warrant_entries(&warrant_securities, existing_warrants) {
// Add to existing set immediately
let key = format!("{}::{}", entry.underlying_company_name, entry.warrant_type);
existing_warrants.insert(key);
warrants_batch.push(entry);
}
}
if !option_securities.is_empty() {
for entry in prepare_option_entries(&option_securities, existing_options) {
// Add to existing set immediately
let key = format!("{}::{}", entry.underlying_company_name, entry.option_type);
existing_options.insert(key);
options_batch.push(entry);
}
}
// Write batches when they reach size limit
if common_batch.len() >= batch_size {
write_batch_with_fsync(common_log_path, &common_batch).await?;
stats.companies_added += common_batch.len();
common_batch.clear();
}
if warrants_batch.len() >= batch_size {
write_batch_with_fsync(warrants_log_path, &warrants_batch).await?;
stats.warrants_added += warrants_batch.len();
warrants_batch.clear();
}
if options_batch.len() >= batch_size {
write_batch_with_fsync(options_log_path, &options_batch).await?;
stats.options_added += options_batch.len();
options_batch.clear();
}
processed_count += 1;
if processed_count % 1000 == 0 {
logger::log_info(&format!(" Processed {} LEI entries...", processed_count)).await;
}
}
// Write remaining batches
if !common_batch.is_empty() {
write_batch_with_fsync(common_log_path, &common_batch).await?;
stats.companies_added += common_batch.len();
}
if !warrants_batch.is_empty() {
write_batch_with_fsync(warrants_log_path, &warrants_batch).await?;
stats.warrants_added += warrants_batch.len();
}
if !options_batch.is_empty() {
write_batch_with_fsync(options_log_path, &options_batch).await?;
stats.options_added += options_batch.len();
}
Ok(())
}
/// Writes a batch of entries to log with fsync
async fn write_batch_with_fsync<T: serde::Serialize>(
log_path: &Path,
entries: &[T],
) -> anyhow::Result<()> {
use std::fs::OpenOptions;
use std::io::Write;
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(log_path)
.context("Failed to open log file")?;
for entry in entries {
let line = serde_json::to_string(entry)
.context("Failed to serialize entry")?;
writeln!(file, "{}", line)?;
}
// Critical: fsync to ensure durability
file.sync_data()
.context("Failed to fsync log file")?;
Ok(())
}
/// Prepares a common stock entry if it doesn't exist
fn prepare_common_stock_entry(
figi_infos: &[FigiInfo],
stats: &mut ProcessingStats,
) {
existing_keys: &HashSet<String>,
) -> Option<CompanyInfo> {
let name = figi_infos[0].name.clone();
if name.is_empty() {
return;
if name.is_empty() || existing_keys.contains(&name) {
return None;
}
let grouped_by_isin = group_by_isin(figi_infos);
if let Some(existing) = companies.get_mut(&name) {
let mut updated = false;
for (isin, new_figis) in grouped_by_isin {
if let Some(existing_figis) = existing.securities.get_mut(&isin) {
let merged = merge_figi_list(existing_figis, &new_figis);
if merged.len() > existing_figis.len() {
*existing_figis = merged;
updated = true;
}
} else {
existing.securities.insert(isin.clone(), new_figis);
updated = true;
}
}
if existing.primary_isin.is_empty() {
if let Some(first_isin) = existing.securities.keys().next() {
existing.primary_isin = first_isin.clone();
}
}
if updated {
stats.companies_updated += 1;
}
} else {
let grouped_by_isin = group_figis_by_isin(figi_infos);
let primary_isin = grouped_by_isin.keys().next().cloned().unwrap_or_default();
companies.insert(name.clone(), CompanyInfo {
Some(CompanyInfo {
name,
primary_isin,
securities: grouped_by_isin,
});
stats.companies_added += 1;
}
})
}
fn group_by_isin(figi_infos: &[FigiInfo]) -> HashMap<String, Vec<FigiInfo>> {
/// Prepares warrant entries for batching
fn prepare_warrant_entries(
warrant_securities: &[FigiInfo],
existing_keys: &HashSet<String>,
) -> Vec<WarrantInfo> {
let mut entries = Vec::new();
for figi in warrant_securities {
let (underlying, issuer, warrant_type) = parse_warrant_name(&figi.name);
if underlying.is_empty() {
continue;
}
let key = format!("{}::{}", underlying, warrant_type);
if existing_keys.contains(&key) {
continue;
}
let warrant_info = WarrantInfo {
underlying_company_name: underlying.clone(),
issuer_company_name: issuer,
warrant_type: warrant_type.clone(),
warrants: {
let mut map = HashMap::new();
map.insert(figi.isin.clone(), vec![figi.clone()]);
map
},
};
entries.push(warrant_info);
}
entries
}
/// Prepares option entries for batching
fn prepare_option_entries(
option_securities: &[FigiInfo],
existing_keys: &HashSet<String>,
) -> Vec<OptionInfo> {
let mut entries = Vec::new();
for figi in option_securities {
let (underlying, issuer, option_type) = parse_option_name(&figi.name);
if underlying.is_empty() {
continue;
}
let key = format!("{}::{}", underlying, option_type);
if existing_keys.contains(&key) {
continue;
}
let option_info = OptionInfo {
underlying_company_name: underlying.clone(),
issuer_company_name: issuer,
option_type: option_type.clone(),
options: {
let mut map = HashMap::new();
map.insert(figi.isin.clone(), vec![figi.clone()]);
map
},
};
entries.push(option_info);
}
entries
}
/// Groups FigiInfo list by security type
fn group_by_security_type(figis: &[FigiInfo]) -> (Vec<FigiInfo>, Vec<FigiInfo>, Vec<FigiInfo>) {
let mut common_stocks = Vec::new();
let mut warrants = Vec::new();
let mut options = Vec::new();
for figi in figis {
match figi.security_type.as_str() {
"Common Stock" => common_stocks.push(figi.clone()),
"Equity WRT" => warrants.push(figi.clone()),
"Equity Option" => options.push(figi.clone()),
_ => {}
}
}
(common_stocks, warrants, options)
}
/// Groups FigiInfo by ISIN
fn group_figis_by_isin(figi_infos: &[FigiInfo]) -> HashMap<String, Vec<FigiInfo>> {
let mut grouped: HashMap<String, Vec<FigiInfo>> = HashMap::new();
for figi_info in figi_infos {
@@ -375,65 +866,126 @@ fn group_by_isin(figi_infos: &[FigiInfo]) -> HashMap<String, Vec<FigiInfo>> {
grouped
}
fn merge_figi_list(existing: &[FigiInfo], new_figis: &[FigiInfo]) -> Vec<FigiInfo> {
let mut merged = existing.to_vec();
let existing_figis: HashSet<String> = existing.iter()
.map(|f| f.figi.clone())
.collect();
/// Parse warrant name to extract underlying company, issuer, and warrant type
///
/// Examples:
/// - "VONTOBE-PW26 LEONARDO SPA" -> ("LEONARDO SPA", Some("VONTOBEL"), "put")
/// - "BAYER H-CW25 L'OREAL" -> ("L'OREAL", Some("BAYER H"), "call")
/// - "APPLE INC WARRANT" -> ("APPLE INC", None, "unknown")
fn parse_warrant_name(name: &str) -> (String, Option<String>, String) {
let name_upper = name.to_uppercase();
for new_figi in new_figis {
if !existing_figis.contains(&new_figi.figi) {
merged.push(new_figi.clone());
}
// Try to detect warrant type from code (PW=put, CW=call)
let warrant_type = if name_upper.contains("-PW") || name_upper.contains(" PW") {
"put".to_string()
} else if name_upper.contains("-CW") || name_upper.contains(" CW") {
"call".to_string()
} else {
"unknown".to_string()
};
// Try to split by warrant code pattern (e.g., "-PW26", "-CW25")
if let Some(pos) = name.find("-PW") {
let before = name[..pos].trim();
let after_idx = name[pos..].find(' ').map(|i| pos + i + 1).unwrap_or(name.len());
let after = if after_idx < name.len() {
name[after_idx..].trim()
} else {
""
};
return (
after.to_string(),
if !before.is_empty() { Some(before.to_string()) } else { None },
warrant_type,
);
}
merged.sort_by(|a, b| a.figi.cmp(&b.figi));
merged
if let Some(pos) = name.find("-CW") {
let before = name[..pos].trim();
let after_idx = name[pos..].find(' ').map(|i| pos + i + 1).unwrap_or(name.len());
let after = if after_idx < name.len() {
name[after_idx..].trim()
} else {
""
};
return (
after.to_string(),
if !before.is_empty() { Some(before.to_string()) } else { None },
warrant_type,
);
}
// Fallback: return entire name as underlying
(name.to_string(), None, warrant_type)
}
/// Parse option name to extract underlying company, issuer, and option type
///
/// Examples:
/// - "December 25 Calls on ALPHA GA" -> ("ALPHA GA", None, "call")
/// - "January 26 Puts on TESLA INC" -> ("TESLA INC", None, "put")
fn parse_option_name(name: &str) -> (String, Option<String>, String) {
let name_upper = name.to_uppercase();
// Detect option type
let option_type = if name_upper.contains("CALL") {
"call".to_string()
} else if name_upper.contains("PUT") {
"put".to_string()
} else {
"unknown".to_string()
};
// Try to extract underlying after "on"
if let Some(pos) = name_upper.find(" ON ") {
let underlying = name[pos + 4..].trim().to_string();
return (underlying, None, option_type);
}
// Fallback: return entire name
(name.to_string(), None, option_type)
}
/// Statistics tracker for streaming processing
#[derive(Debug)]
struct ProcessingStats {
struct StreamingStats {
initial_companies: usize,
initial_warrants: usize,
initial_options: usize,
companies_added: usize,
companies_updated: usize,
warrants_added: usize,
options_added: usize,
}
impl ProcessingStats {
fn new(companies: usize, _warrants: usize, _options: usize) -> Self {
impl StreamingStats {
fn new(companies: usize, warrants: usize, options: usize) -> Self {
Self {
initial_companies: companies,
initial_warrants: warrants,
initial_options: options,
companies_added: 0,
companies_updated: 0,
warrants_added: 0,
options_added: 0,
}
}
}
async fn load_from_cache_if_exists<T>(path: &str) -> anyhow::Result<T>
where
T: serde::de::DeserializeOwned + Default,
{
let cache_file = Path::new(path);
if !cache_file.exists() {
return Ok(T::default());
fn print_summary(&self) {
println!("\n=== Processing Statistics ===");
println!("Companies:");
println!(" - Initial: {}", self.initial_companies);
println!(" - Added: {}", self.companies_added);
println!(" - Total: {}", self.initial_companies + self.companies_added);
println!("Warrants:");
println!(" - Initial: {}", self.initial_warrants);
println!(" - Added: {}", self.warrants_added);
println!(" - Total: {}", self.initial_warrants + self.warrants_added);
println!("Options:");
println!(" - Initial: {}", self.initial_options);
println!(" - Added: {}", self.options_added);
println!(" - Total: {}", self.initial_options + self.options_added);
}
let content = tokio_fs::read_to_string(cache_file).await?;
Ok(serde_json::from_str(&content)?)
}
async fn save_to_cache<T>(path: &str, data: &T) -> anyhow::Result<()>
where
T: serde::Serialize,
{
let cache_path = Path::new(path);
let cache_dir = cache_path.parent().context("Invalid path")?;
tokio_fs::create_dir_all(cache_dir).await?;
let json_str = serde_json::to_string_pretty(data)?;
tokio_fs::write(cache_path, json_str).await?;
Ok(())
}
async fn load_market_sectors() -> anyhow::Result<Vec<String>> {
@@ -771,57 +1323,6 @@ pub async fn get_mapping_stats(
})
}
/// Print mapping statistics to console and logs
pub async fn print_mapping_stats(csv_path: &str) -> anyhow::Result<()> {
logger::log_info("=== LEI-FIGI Mapping Status ===").await;
let stats = get_mapping_stats(csv_path, None).await?;
logger::log_info(&format!(
"Total LEIs: {}",
stats.total_leis
)).await;
logger::log_info(&format!(
"├─ Mapped (with FIGI): {} ({:.2}%)",
stats.mapped_leis,
stats.mapping_percentage
)).await;
logger::log_info(&format!(
"├─ No Results (queried, no FIGI): {} ({:.2}%)",
stats.no_result_leis,
(stats.no_result_leis as f64 / stats.total_leis as f64) * 100.0
)).await;
logger::log_info(&format!(
"└─ Not Queried Yet: {} ({:.2}%)",
stats.unqueried_leis,
(stats.unqueried_leis as f64 / stats.total_leis as f64) * 100.0
)).await;
logger::log_info(&format!(
"\nQuery Coverage: {:.2}% ({} / {})",
stats.queried_percentage,
stats.mapped_leis + stats.no_result_leis,
stats.total_leis
)).await;
if !stats.by_sector.is_empty() {
logger::log_info("\nMapped LEIs by sector:").await;
let mut sectors: Vec<_> = stats.by_sector.iter().collect();
sectors.sort_by(|a, b| b.1.cmp(a.1)); // Sort by count descending
for (sector, count) in sectors {
logger::log_info(&format!(" {}: {}", sector, count)).await;
}
}
logger::log_info("==============================").await;
Ok(())
}
/// Quick check if mapping is complete (returns true if all mapped)
pub async fn is_mapping_complete(csv_path: &str) -> anyhow::Result<bool> {
let dir = DataPaths::new(".")?;

View File

@@ -66,7 +66,7 @@ pub async fn run_full_update(
if let Some(date_dir) = date_dir {
logger::log_info(&format!(" Using FIGI data from: {:?}", date_dir)).await;
build_securities_from_figi_streaming(&date_dir).await?;
load_or_build_all_securities(&date_dir).await?;
logger::log_info(" ✓ Securities map updated").await;
} else {
logger::log_warn(" ✗ No FIGI data directory found").await;
@@ -88,6 +88,7 @@ pub async fn run_full_update(
logger::log_info("Step 6: Cleansing up companies with missing essential data...").await;
let cleansed_count = companies_yahoo_jsonl(&paths).await?;
logger::log_info(&format!("{} companies found on Yahoo ready for further use in companies_yahoo.jsonl", cleansed_count)).await;
if !shutdown_flag.load(Ordering::SeqCst) {
logger::log_info("Step 7: Processing events (using index)...").await;
@@ -101,20 +102,24 @@ pub async fn run_full_update(
Ok(())
}
/// Cleansing function to remove companies with missing essential yahoo data for integrity
/// Has to contain a ticker with 'YAHOO:'; Entries with 'YAHOO:NO_RESULTS' are removed
/// Has to contain a ticker with 'YAHOO:'; Entries with 'YAHOO:NO_RESULTS' and 'YAHOO:ERROR' are removed
/// The rest stays unchanged
///
/// The '.jsonl' will be saved in the same path but 'companies_filtered.jsonl'
/// Uses state.jsonl to track completion and avoid re-running the cleansing operation
/// The '.jsonl' will be saved in the same path but 'companies_yahoo.jsonl'
/// Only execute when 'companies.jsonl' is present
pub async fn companies_yahoo_jsonl(paths: &DataPaths) -> anyhow::Result<usize> {
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use serde_json::json;
let path = paths.base_dir();
let data_path = paths.data_dir();
let input_path = path.join("corporate").join("companies.jsonl");
let output_path = path.join("corporate").join("companies_yahoo.jsonl");
let input_path = data_path.join("companies.jsonl");
let output_path = data_path.join("companies_yahoo.jsonl");
let state_path = data_path.join("state.jsonl");
// Check if input file exists
if !input_path.exists() {
@@ -122,6 +127,37 @@ pub async fn companies_yahoo_jsonl(paths: &DataPaths) -> anyhow::Result<usize> {
return Ok(0);
}
// Check if state file exists and cleansing was already completed
if state_path.exists() {
let state_content = tokio::fs::read_to_string(&state_path).await?;
for line in state_content.lines() {
if line.trim().is_empty() {
continue;
}
if let Ok(state) = serde_json::from_str::<serde_json::Value>(line) {
if state.get("yahoo_companies").and_then(|v| v.as_bool()).unwrap_or(false) {
logger::log_info(" Yahoo companies cleansing already completed, reading existing file...").await;
// Count lines in existing output file
if output_path.exists() {
let output_content = tokio::fs::read_to_string(&output_path).await?;
let count = output_content.lines()
.filter(|line| !line.trim().is_empty())
.count();
logger::log_info(&format!(" ✓ Found {} companies in companies_yahoo.jsonl", count)).await;
return Ok(count);
} else {
logger::log_warn(" State indicates completion but companies_yahoo.jsonl not found, re-running...").await;
break;
}
}
}
}
}
logger::log_info(&format!(" Reading from: {:?}", input_path)).await;
logger::log_info(&format!(" Writing to: {:?}", output_path)).await;
@@ -150,11 +186,15 @@ pub async fn companies_yahoo_jsonl(paths: &DataPaths) -> anyhow::Result<usize> {
};
// Check if company has at least one valid YAHOO ticker
// Valid means: starts with "YAHOO:" but is NOT "YAHOO:NO_RESULTS"
// Valid means: starts with "YAHOO:" but is NOT "YAHOO:NO_RESULTS" or "YAHOO:ERROR"
let has_valid_yahoo = company.isin_tickers_map
.values()
.flatten()
.any(|ticker| ticker.starts_with("YAHOO:") && ticker != "YAHOO:NO_RESULTS");
.any(|ticker| {
ticker.starts_with("YAHOO:")
&& ticker != "YAHOO:NO_RESULTS"
&& ticker != "YAHOO:ERROR"
});
if has_valid_yahoo {
// Write the company to the filtered output
@@ -183,6 +223,20 @@ pub async fn companies_yahoo_jsonl(paths: &DataPaths) -> anyhow::Result<usize> {
total_count, valid_count, removed_count
)).await;
// Write state file to mark completion
let yahoo_companies = json!({
"yahoo_companies": true,
"completed_at": chrono::Utc::now().to_rfc3339(),
});
let mut state_file = File::create(&state_path).await?;
let state_line = serde_json::to_string(&yahoo_companies)?;
state_file.write_all(state_line.as_bytes()).await?;
state_file.write_all(b"\n").await?;
state_file.flush().await?;
logger::log_info(&format!(" ✓ State file created at: {:?}", state_path)).await;
Ok(valid_count)
}

View File

@@ -63,8 +63,8 @@ fn company_needs_processing(
// Check if this ISIN has valid Yahoo data
let has_valid_yahoo = tickers.iter().any(|t| {
t.starts_with("YAHOO:") &&
t != "YAHOO:ERROR" && // Error marker means needs retry
t != "YAHOO:NO_RESULTS" // This is actually valid (legitimately not found)
t != "YAHOO:ERROR" //&& // Error marker means needs retry
//t != "YAHOO:NO_RESULTS" // This is actually valid (legitimately not found)
});
// If no valid Yahoo data for this ISIN, company needs processing
@@ -95,9 +95,6 @@ pub async fn build_companies_jsonl_streaming_parallel(
const FSYNC_INTERVAL_SECS: u64 = 10;
const CONCURRENCY_LIMIT: usize = 100;
// Create hard reset controller
let reset_controller = pool.get_reset_controller();
// Wrap pool in mutex for potential replacement
let pool_mutex = Arc::new(tokio::sync::Mutex::new(Arc::clone(pool)));
@@ -106,15 +103,18 @@ pub async fn build_companies_jsonl_streaming_parallel(
let path = DataPaths::new(".")?;
let corporate_path = path.data_dir().join("corporate").join("by_name");
let securities_path = corporate_path.join("common_stocks.json");
let securities_checkpoint = corporate_path.join("common_stocks.jsonl");
let securities_log = corporate_path.join("common_stocks.log.jsonl");
if !securities_path.exists() {
logger::log_warn("No common_stocks.json found").await;
if !securities_checkpoint.exists() {
logger::log_warn("No common_stocks.jsonl found").await;
return Ok(0);
}
let content = tokio::fs::read_to_string(&securities_path).await?;
let securities: HashMap<String, CompanyInfo> = serde_json::from_str(&content)?;
// Load securities from checkpoint and replay log
logger::log_info("Loading common stocks from JSONL checkpoint and log...").await;
let securities = load_securities_from_jsonl(&securities_checkpoint, &securities_log).await?;
logger::log_info(&format!("Loaded {} companies from common stocks", securities.len())).await;
let companies_path = paths.data_dir().join("companies.jsonl");
let log_path = paths.data_dir().join("companies_updates.log");
@@ -132,8 +132,8 @@ pub async fn build_companies_jsonl_streaming_parallel(
let existing_content = tokio::fs::read_to_string(&companies_path).await?;
for line in existing_content.lines() {
if line.trim().is_empty() {
continue;
if line.trim().is_empty() || !line.ends_with('}') {
continue; // Skip incomplete lines
}
match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
@@ -155,8 +155,8 @@ pub async fn build_companies_jsonl_streaming_parallel(
let mut replayed = 0;
for line in log_content.lines() {
if line.trim().is_empty() {
continue;
if line.trim().is_empty() || !line.ends_with('}') {
continue; // Skip incomplete lines
}
match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
@@ -453,8 +453,6 @@ pub async fn build_companies_jsonl_streaming_parallel(
let error_msg = e.to_string();
if error_msg.contains("HARD_RESET_REQUIRED") {
// Don't break, perform actual hard reset
// Check if reset already in progress (race condition protection)
let mut reset_lock = reset_in_progress.lock().await;
if *reset_lock {
@@ -512,9 +510,10 @@ pub async fn build_companies_jsonl_streaming_parallel(
companies.clone()
};
// Reload all securities from disk
let content = tokio::fs::read_to_string(&securities_path).await?;
let all_securities: HashMap<String, CompanyInfo> = serde_json::from_str(&content)?;
// Reload all securities from disk (checkpoint + log)
logger::log_info("Reloading securities from JSONL...").await;
let all_securities = load_securities_from_jsonl(&securities_checkpoint, &securities_log).await?;
logger::log_info(&format!("Reloaded {} companies", all_securities.len())).await;
// Build pending list: only companies that need processing
pending = all_securities.iter()
@@ -664,6 +663,62 @@ pub async fn build_companies_jsonl_streaming_parallel(
Ok(final_count)
}
/// Loads CompanyInfo securities from checkpoint and log JSONL files
async fn load_securities_from_jsonl(
checkpoint_path: &std::path::Path,
log_path: &std::path::Path,
) -> anyhow::Result<HashMap<String, CompanyInfo>> {
let mut securities: HashMap<String, CompanyInfo> = HashMap::new();
// Load checkpoint
if checkpoint_path.exists() {
let content = tokio::fs::read_to_string(checkpoint_path).await?;
for (line_num, line) in content.lines().enumerate() {
if line.trim().is_empty() || !line.ends_with('}') {
continue; // Skip incomplete lines
}
match serde_json::from_str::<CompanyInfo>(line) {
Ok(company_info) => {
securities.insert(company_info.name.clone(), company_info);
}
Err(e) => {
logger::log_warn(&format!(
"Skipping invalid line {} in checkpoint: {}",
line_num + 1, e
)).await;
}
}
}
}
// Replay log (overwrites checkpoint entries if they exist)
if log_path.exists() {
let content = tokio::fs::read_to_string(log_path).await?;
for (line_num, line) in content.lines().enumerate() {
if line.trim().is_empty() || !line.ends_with('}') {
continue; // Skip incomplete lines
}
match serde_json::from_str::<CompanyInfo>(line) {
Ok(company_info) => {
securities.insert(company_info.name.clone(), company_info);
}
Err(e) => {
logger::log_warn(&format!(
"Skipping invalid line {} in log: {}",
line_num + 1, e
)).await;
}
}
}
}
Ok(securities)
}
/// Scrape with retry, validation, and shutdown awareness
async fn scrape_with_retry(
pool: &Arc<ChromeDriverPool>,
@@ -792,6 +847,7 @@ async fn process_single_company_validated(
if !has_valid_yahoo {
logger::log_info(&format!("Fetching Yahoo details for {} (ISIN: {})", name, isin)).await;
tickers.retain(|t| !t.starts_with("YAHOO:"));
match scrape_with_retry(pool, &isin, 3, shutdown_flag).await {
Ok(Some(details)) => {