added update_rule for incremental change

2025-12-15 23:47:28 +01:00
parent d744769138
commit d26e833d93
10 changed files with 566 additions and 241 deletions
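Each record appended to companies_updates.log (and snapshotted into companies.jsonl at checkpoints) is a single-line JSON object. A sketch of one such line, assuming default serde field names for CompanyCrossPlatformInfo and an invented company, ISIN, and ticker:

{"name":"Example AG","isin_tickers_map":{"DE0000000001":["EXA","YAHOO:EXA.DE"]},"sector":"Industrials","exchange":"XETRA"}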


@@ -1,4 +1,4 @@
// src/corporate/update.rs - COMPLETE STREAMING VERSION
// src/corporate/update.rs - ABORT-SAFE VERSION WITH JSONL LOG
use super::{scraper::*, storage::*, helpers::*, types::*, openfigi::*, yahoo::*};
use crate::config::Config;
@@ -9,14 +9,17 @@ use crate::scraper::webdriver::ChromeDriverPool;
use chrono::Local;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
/// Main update function - fully streaming, minimal memory usage
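/// The `shutdown_flag` is polled between steps; once it is set, the function
/// stops starting new work and returns Ok(()) early.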
pub async fn run_full_update(_config: &Config, pool: &Arc<ChromeDriverPool>) -> anyhow::Result<()> {
pub async fn run_full_update(
_config: &Config,
pool: &Arc<ChromeDriverPool>,
shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<()> {
logger::log_info("=== Corporate Update (STREAMING MODE) ===").await;
let paths = DataPaths::new(".")?;
// Step 1: Download GLEIF CSV (don't load into memory)
logger::log_info("Step 1: Downloading GLEIF CSV...").await;
let gleif_csv_path = match download_isin_lei_csv().await? {
Some(p) => {
@@ -29,14 +32,19 @@ pub async fn run_full_update(_config: &Config, pool: &Arc<ChromeDriverPool>) ->
}
};
// Step 2: Load OpenFIGI type lists (small, cached)
if shutdown_flag.load(Ordering::SeqCst) {
return Ok(());
}
logger::log_info("Step 2: Loading OpenFIGI metadata...").await;
load_figi_type_lists().await.ok();
logger::log_info(" ✓ OpenFIGI metadata loaded").await;
// Step 3: Check mapping status and process only unmapped LEIs
if shutdown_flag.load(Ordering::SeqCst) {
return Ok(());
}
logger::log_info("Step 3: Checking LEI-FIGI mapping status...").await;
let all_mapped = ensure_all_leis_mapped(&gleif_csv_path, None).await?;
if !all_mapped {
@@ -45,7 +53,10 @@ pub async fn run_full_update(_config: &Config, pool: &Arc<ChromeDriverPool>) ->
logger::log_info(" ✓ All LEIs successfully mapped").await;
}
// Step 4: Build securities from FIGI data (streaming)
if shutdown_flag.load(Ordering::SeqCst) {
return Ok(());
}
logger::log_info("Step 4: Building securities map (streaming)...").await;
let date_dir = find_most_recent_figi_date_dir(&paths).await?;
@@ -57,22 +68,37 @@ pub async fn run_full_update(_config: &Config, pool: &Arc<ChromeDriverPool>) ->
logger::log_warn(" ✗ No FIGI data directory found").await;
}
// Step 5: Build companies JSONL (streaming from securities)
logger::log_info("Step 5: Building companies.jsonl (streaming)...").await;
let count = build_companies_jsonl_streaming(&paths, pool).await?;
if shutdown_flag.load(Ordering::SeqCst) {
return Ok(());
}
logger::log_info("Step 5: Building companies.jsonl (streaming with abort-safe persistence)...").await;
let count = build_companies_jsonl_streaming(&paths, pool, shutdown_flag).await?;
logger::log_info(&format!(" ✓ Saved {} companies", count)).await;
// Step 6: Process events (using index, not full load)
logger::log_info("Step 6: Processing events (using index)...").await;
let _event_index = build_event_index(&paths).await?;
logger::log_info(" ✓ Event index built").await;
if !shutdown_flag.load(Ordering::SeqCst) {
logger::log_info("Step 6: Processing events (using index)...").await;
let _event_index = build_event_index(&paths).await?;
logger::log_info(" ✓ Event index built").await;
}
logger::log_info("✓ Corporate update complete").await;
Ok(())
}
/// Stream companies.jsonl creation from securities cache - INCREMENTAL MODE
async fn build_companies_jsonl_streaming(paths: &DataPaths, pool: &Arc<ChromeDriverPool>) -> anyhow::Result<usize> {
/// Abort-safe incremental JSONL persistence with atomic checkpoints
///
/// Implements the data_updating_rule.md specification:
/// - Append-only JSONL log for all updates
/// - fsync after each write batch
/// - Atomic checkpoints via temp file + rename
/// - Crash recovery by loading checkpoint + replaying log
/// - Partial lines ignored during recovery
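///
/// Per-record write ordering relied on below (a sketch, not a formal spec):
/// serialize to one JSON line -> append line + '\n' to the log -> flush ->
/// fsync (sync_data) -> only then update the in-memory state, so recovery can
/// safely ignore any trailing partial line.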
async fn build_companies_jsonl_streaming(
paths: &DataPaths,
pool: &Arc<ChromeDriverPool>,
shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<usize> {
let path = DataPaths::new(".")?;
let corporate_path = path.data_dir().join("corporate").join("by_name");
let securities_path = corporate_path.join("common_stocks.json");
@@ -82,59 +108,116 @@ async fn build_companies_jsonl_streaming(paths: &DataPaths, pool: &Arc<ChromeDri
return Ok(0);
}
// Load securities
let content = tokio::fs::read_to_string(securities_path).await?;
let securities: HashMap<String, CompanyInfo> = serde_json::from_str(&content)?;
let companies_path = paths.data_dir().join("companies.jsonl");
let log_path = paths.data_dir().join("companies_updates.log");
if let Some(parent) = companies_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
// Load existing companies into a map
// === RECOVERY PHASE 1: Load last checkpoint ===
let mut existing_companies: HashMap<String, CompanyCrossPlatformInfo> = HashMap::new();
let mut processed_names: std::collections::HashSet<String> = std::collections::HashSet::new();
if companies_path.exists() {
logger::log_info("Loading existing companies.jsonl...").await;
logger::log_info("Loading checkpoint from companies.jsonl...").await;
let existing_content = tokio::fs::read_to_string(&companies_path).await?;
for line in existing_content.lines() {
if line.trim().is_empty() {
continue;
}
// Only process complete lines (ending with proper JSON closing brace)
// This ensures we don't process partial writes from crashed processes
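// A torn write that happens to end in '}' is still rejected below when
// serde_json fails to parse the truncated record.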
if !line.ends_with('}') {
logger::log_warn(&format!("Skipping incomplete checkpoint line: {}", &line[..line.len().min(50)])).await;
continue;
}
match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
Ok(company) => {
processed_names.insert(company.name.clone());
existing_companies.insert(company.name.clone(), company);
}
Err(e) => {
logger::log_warn(&format!("Failed to parse existing company line: {}", e)).await;
logger::log_warn(&format!("Failed to parse checkpoint line: {}", e)).await;
}
}
}
logger::log_info(&format!("Loaded {} existing companies", existing_companies.len())).await;
logger::log_info(&format!("Loaded checkpoint with {} companies", existing_companies.len())).await;
}
// Create temporary file for atomic write
let temp_path = companies_path.with_extension("jsonl.tmp");
let mut file = tokio::fs::File::create(&temp_path).await?;
let mut count = 0;
// === RECOVERY PHASE 2: Replay log after checkpoint ===
if log_path.exists() {
logger::log_info("Replaying update log...").await;
let log_content = tokio::fs::read_to_string(&log_path).await?;
let mut replayed = 0;
for line in log_content.lines() {
if line.trim().is_empty() {
continue;
}
// Only replay complete lines (crash-safe: incomplete lines are ignored)
// A line is considered complete only if it ends with '}' and parses as valid JSON;
// lines() has already stripped the trailing '\n', so the closing brace is checked here
if !line.ends_with('}') {
logger::log_warn(&format!("Skipping incomplete log line: {}", &line[..line.len().min(50)])).await;
continue;
}
match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
Ok(company) => {
processed_names.insert(company.name.clone());
existing_companies.insert(company.name.clone(), company);
replayed += 1;
}
Err(e) => {
logger::log_warn(&format!("Failed to parse log line: {}", e)).await;
}
}
}
if replayed > 0 {
logger::log_info(&format!("Replayed {} updates from log", replayed)).await;
}
}
// === APPEND-ONLY LOG: Open in append mode with O_APPEND semantics ===
use tokio::fs::OpenOptions;
let mut log_file = OpenOptions::new()
.create(true)
.append(true) // O_APPEND - atomic append operations
.open(&log_path)
.await?;
let mut count = existing_companies.len();
let mut updated_count = 0;
let mut new_count = 0;
let checkpoint_interval = 50; // Create atomic checkpoint every 50 updates
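// Tradeoff (assumed default, not a tuned value): a larger interval means fewer
// full rewrites of companies.jsonl, but a longer log to replay after a crash.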
let mut updates_since_checkpoint = 0;
use tokio::io::AsyncWriteExt;
for (name, company_info) in securities.iter() {
// Check if we already have this company
let existing_entry = existing_companies.remove(name);
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_info("Shutdown requested - stopping company processing").await;
break;
}
// Skip if already processed (from checkpoint or log replay)
if processed_names.contains(name) {
continue;
}
let existing_entry = existing_companies.get(name).cloned();
let is_update = existing_entry.is_some();
// Start with existing ISIN-ticker map or create new one
let mut isin_tickers_map: HashMap<String, Vec<String>> =
existing_entry
.map(|e| e.isin_tickers_map)
.as_ref()
.map(|e| e.isin_tickers_map.clone())
.unwrap_or_default();
// Step 1: Extract unique ISIN-ticker pairs from FigiInfo
let mut sector = existing_entry.as_ref().and_then(|e| e.sector.clone());
let mut exchange = existing_entry.as_ref().and_then(|e| e.exchange.clone());
let mut unique_isin_ticker_pairs: HashMap<String, Vec<String>> = HashMap::new();
for figi_infos in company_info.securities.values() {
@@ -144,7 +227,6 @@ async fn build_companies_jsonl_streaming(paths: &DataPaths, pool: &Arc<ChromeDri
.entry(figi_info.isin.clone())
.or_insert_with(Vec::new);
// Add FIGI ticker if present and not duplicate
if !figi_info.ticker.is_empty() && !tickers.contains(&figi_info.ticker) {
tickers.push(figi_info.ticker.clone());
}
@@ -152,79 +234,129 @@ async fn build_companies_jsonl_streaming(paths: &DataPaths, pool: &Arc<ChromeDri
}
}
// Step 2: Merge FIGI tickers into main map
for (isin, figi_tickers) in unique_isin_ticker_pairs {
if shutdown_flag.load(Ordering::SeqCst) {
break;
}
let tickers = isin_tickers_map
.entry(isin.clone())
.or_insert_with(Vec::new);
// Add FIGI tickers that aren't already present
for figi_ticker in figi_tickers {
if !tickers.contains(&figi_ticker) {
tickers.push(figi_ticker);
}
}
// Step 3: Check if we need to fetch Yahoo ticker for this ISIN
let has_yahoo_ticker = tickers.iter().any(|t| t.starts_with("YAHOO:"));
if !has_yahoo_ticker {
logger::log_info(&format!("Fetching Yahoo ticker for {} (ISIN: {})", name, isin)).await;
let yahoo_result = scrape_ticker_by_isin(pool, &isin).await;
match yahoo_result {
Ok(result) => {
let log_msg = match &result {
YahooTickerResult::Found(ticker) =>
format!("✓ Found Yahoo ticker {} for ISIN {}", ticker, isin),
YahooTickerResult::NoResults =>
format!("○ No search results for ISIN {}", isin),
YahooTickerResult::NotFound =>
format!("○ Empty ticker result for ISIN {}", isin),
YahooTickerResult::AmbiguousResults =>
format!("⚠ Ambiguous results for ISIN {}", isin),
};
if result.is_found() {
logger::log_info(&log_msg).await;
} else {
logger::log_warn(&log_msg).await;
tickers.push(result.to_tagged_string());
if !has_yahoo_ticker && !shutdown_flag.load(Ordering::SeqCst) {
logger::log_info(&format!("Fetching Yahoo details for {} (ISIN: {})", name, isin)).await;
match scrape_company_details_by_isin(pool, &isin).await {
Ok(Some(details)) => {
logger::log_info(&format!("✓ Found Yahoo ticker {} for ISIN {}", details.ticker, isin)).await;
tickers.push(format!("YAHOO:{}", details.ticker));
if sector.is_none() && details.sector.is_some() {
sector = details.sector.clone();
logger::log_info(&format!(" Sector: {}", details.sector.as_ref().unwrap())).await;
}
if exchange.is_none() && details.exchange.is_some() {
exchange = details.exchange.clone();
logger::log_info(&format!(" Exchange: {}", details.exchange.as_ref().unwrap())).await;
}
},
Ok(None) => {
logger::log_warn(&format!("◯ No search results for ISIN {}", isin)).await;
tickers.push("YAHOO:NO_RESULTS".to_string());
},
Err(e) => {
if shutdown_flag.load(Ordering::SeqCst) {
break;
}
logger::log_warn(&format!("✗ Yahoo lookup error for ISIN {}: {}", isin, e)).await;
tickers.push("YAHOO:ERROR".to_string());
}
}
} else {
logger::log_warn(&format!("Skipping Yahoo lookup for {} ISIN {} - already has Yahoo data", name, isin)).await;
}
}
// Only write if we have ticker data
if shutdown_flag.load(Ordering::SeqCst) {
break;
}
if !isin_tickers_map.is_empty() {
let company_entry = CompanyCrossPlatformInfo {
name: name.clone(),
isin_tickers_map,
sector,
exchange,
};
let line = serde_json::to_string(&company_entry)?;
file.write_all(line.as_bytes()).await?;
file.write_all(b"\n").await?;
// Flush after each write for crash safety
file.flush().await?;
// === APPEND-ONLY: Write single-line JSON with fsync ===
// This guarantees the line is either fully written or not at all
log_file.write_all(line.as_bytes()).await?;
log_file.write_all(b"\n").await?;
log_file.flush().await?;
// Critical: fsync to ensure durability before considering write successful
// This prevents data loss on power failure or kernel panic
log_file.sync_data().await?;
// Update in-memory state ONLY after successful fsync
processed_names.insert(name.clone());
existing_companies.insert(name.clone(), company_entry);
count += 1;
updates_since_checkpoint += 1;
if is_update {
updated_count += 1;
} else {
new_count += 1;
}
// === ATOMIC CHECKPOINT: Periodically create checkpoint ===
// This reduces recovery time by snapshotting current state
if updates_since_checkpoint >= checkpoint_interval {
logger::log_info(&format!("Creating checkpoint at {} companies...", count)).await;
let checkpoint_tmp = companies_path.with_extension("jsonl.tmp");
let mut checkpoint_file = tokio::fs::File::create(&checkpoint_tmp).await?;
// Write all current state to temporary checkpoint file
for company in existing_companies.values() {
let line = serde_json::to_string(company)?;
checkpoint_file.write_all(line.as_bytes()).await?;
checkpoint_file.write_all(b"\n").await?;
}
checkpoint_file.flush().await?;
checkpoint_file.sync_all().await?;
drop(checkpoint_file);
// Atomic rename - this is the commit point
// After this succeeds, the checkpoint is visible
tokio::fs::rename(&checkpoint_tmp, &companies_path).await?;
// Clear log after successful checkpoint
// Any entries before this point are now captured in the checkpoint
drop(log_file);
tokio::fs::remove_file(&log_path).await.ok();
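// If the process dies between the rename above and this log removal, the stale
// log is simply replayed on top of the new checkpoint on the next run; records
// are keyed by company name, so the replay is idempotent.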
log_file = OpenOptions::new()
.create(true)
.append(true)
.open(&log_path)
.await?;
updates_since_checkpoint = 0;
logger::log_info("✓ Checkpoint created and log cleared").await;
}
if count % 10 == 0 {
logger::log_info(&format!("Progress: {} companies ({} new, {} updated)", count, new_count, updated_count)).await;
tokio::task::yield_now().await;
@@ -232,29 +364,39 @@ async fn build_companies_jsonl_streaming(paths: &DataPaths, pool: &Arc<ChromeDri
}
}
// Write any remaining existing companies that weren't in securities
for (_name, company) in existing_companies {
let line = serde_json::to_string(&company)?;
file.write_all(line.as_bytes()).await?;
file.write_all(b"\n").await?;
file.flush().await?;
count += 1;
logger::log_warn(&format!("Preserved existing company: {}", _name)).await;
// === FINAL CHECKPOINT: Write complete final state ===
// This ensures we don't need to replay the log on next startup
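// If shutdown was requested we skip this rewrite on purpose: everything processed
// so far is already durable in the append-only log and will be folded into a
// checkpoint on the next run.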
if !shutdown_flag.load(Ordering::SeqCst) && updates_since_checkpoint > 0 {
logger::log_info("Creating final checkpoint...").await;
let checkpoint_tmp = companies_path.with_extension("jsonl.tmp");
let mut checkpoint_file = tokio::fs::File::create(&checkpoint_tmp).await?;
for company in existing_companies.values() {
let line = serde_json::to_string(company)?;
checkpoint_file.write_all(line.as_bytes()).await?;
checkpoint_file.write_all(b"\n").await?;
}
checkpoint_file.flush().await?;
checkpoint_file.sync_all().await?;
drop(checkpoint_file);
// Atomic rename makes final checkpoint visible
tokio::fs::rename(&checkpoint_tmp, &companies_path).await?;
// Clean up log
drop(log_file);
tokio::fs::remove_file(&log_path).await.ok();
logger::log_info("✓ Final checkpoint created").await;
}
// Ensure all data is written
file.sync_all().await?;
drop(file);
// Atomic rename: replace old file with new one
tokio::fs::rename(&temp_path, &companies_path).await?;
logger::log_info(&format!("✓ Completed: {} total companies ({} new, {} updated)", count, new_count, updated_count)).await;
logger::log_info(&format!("Completed: {} total companies ({} new, {} updated)", count, new_count, updated_count)).await;
Ok(count)
}
/// Find most recent FIGI date directory
async fn find_most_recent_figi_date_dir(paths: &DataPaths) -> anyhow::Result<Option<std::path::PathBuf>> {
let map_cache_dir = paths.cache_gleif_openfigi_map_dir();
@@ -284,8 +426,6 @@ async fn find_most_recent_figi_date_dir(paths: &DataPaths) -> anyhow::Result<Opt
Ok(Some(dates[0].1.clone()))
}
pub struct ProcessResult {
pub changes: Vec<CompanyEventChange>,
}