added working hard reset

2025-12-23 15:07:40 +01:00
parent fb0876309f
commit f9f09d0291
5 changed files with 666 additions and 127 deletions

src/corporate/update.rs

@@ -1,5 +1,5 @@
// src/corporate/update.rs - UPDATED WITH DATA INTEGRITY FIXES
-use super::{scraper::*, storage::*, helpers::*, types::*, openfigi::*, yahoo::*};
+use super::{scraper::*, storage::*, helpers::*, types::*, openfigi::*};
use crate::config::Config;
use crate::corporate::update_parallel::build_companies_jsonl_streaming_parallel;
use crate::util::directories::DataPaths;
@@ -11,7 +11,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
-/// UPDATED: Main corporate update entry point with shutdown awareness
+/// Main corporate update entry point with shutdown awareness
pub async fn run_full_update(
_config: &Config,
pool: &Arc<ChromeDriverPool>,
@@ -81,8 +81,16 @@ pub async fn run_full_update(
let count = build_companies_jsonl_streaming_parallel(&paths, pool, shutdown_flag, _config, &None).await?;
logger::log_info(&format!(" ✓ Saved {} companies", count)).await;
+if shutdown_flag.load(Ordering::SeqCst) {
+logger::log_warn("Shutdown detected after companies.jsonl build").await;
+return Ok(());
+}
+logger::log_info("Step 6: Cleansing companies with missing essential data...").await;
+let cleansed_count = companies_yahoo_jsonl(&paths).await?;
if !shutdown_flag.load(Ordering::SeqCst) {
-logger::log_info("Step 6: Processing events (using index)...").await;
+logger::log_info("Step 7: Processing events (using index)...").await;
let _event_index = build_event_index(&paths).await?;
logger::log_info(" ✓ Event index built").await;
} else {
@@ -93,6 +101,91 @@ pub async fn run_full_update(
Ok(())
}
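The shutdown checks in this hunk all follow the same pattern: re-read the flag between steps so an interrupt exits at a step boundary instead of mid-write. A minimal sketch of that gating, with run_step as a hypothetical stand-in for any of the numbered steps:

// Sketch only: run_step is a hypothetical stand-in, not part of this commit.
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

async fn run_step(name: &str) -> anyhow::Result<()> {
    println!("running {name}");
    Ok(())
}

async fn run_steps(shutdown_flag: Arc<AtomicBool>) -> anyhow::Result<()> {
    for step in ["build companies.jsonl", "cleanse Yahoo data", "build event index"] {
        // Re-check between steps so a shutdown request exits at a clean boundary;
        // output from already-finished steps stays on disk.
        if shutdown_flag.load(Ordering::SeqCst) {
            return Ok(());
        }
        run_step(step).await?;
    }
    Ok(())
}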
/// Cleansing function that removes companies missing essential Yahoo data, for data integrity
/// A company is kept only if it has at least one ticker starting with 'YAHOO:' other than
/// 'YAHOO:NO_RESULTS'; kept entries pass through unchanged
///
/// The output '.jsonl' is saved in the same directory as 'companies_yahoo.jsonl'
/// Only executed when 'companies.jsonl' is present
pub async fn companies_yahoo_jsonl(paths: &DataPaths) -> anyhow::Result<usize> {
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
let path = paths.base_dir();
let input_path = path.join("corporate").join("companies.jsonl");
let output_path = path.join("corporate").join("companies_yahoo.jsonl");
// Check if input file exists
if !input_path.exists() {
logger::log_warn("companies.jsonl not found, skipping cleansing").await;
return Ok(0);
}
logger::log_info(&format!(" Reading from: {:?}", input_path)).await;
logger::log_info(&format!(" Writing to: {:?}", output_path)).await;
let file = File::open(&input_path).await?;
let reader = BufReader::new(file);
let mut lines = reader.lines();
let mut output_file = File::create(&output_path).await?;
let mut valid_count = 0;
let mut removed_count = 0;
let mut total_count = 0;
while let Some(line) = lines.next_line().await? {
if line.trim().is_empty() {
continue;
}
total_count += 1;
let company: CompanyCrossPlatformInfo = match serde_json::from_str(&line) {
Ok(c) => c,
Err(e) => {
logger::log_warn(&format!(" Failed to parse company on line {}: {}", total_count, e)).await;
continue;
}
};
// Check if company has at least one valid YAHOO ticker
// Valid means: starts with "YAHOO:" but is NOT "YAHOO:NO_RESULTS"
let has_valid_yahoo = company.isin_tickers_map
.values()
.flatten()
.any(|ticker| ticker.starts_with("YAHOO:") && ticker != "YAHOO:NO_RESULTS");
if has_valid_yahoo {
// Write the company to the filtered output
let json_line = serde_json::to_string(&company)?;
output_file.write_all(json_line.as_bytes()).await?;
output_file.write_all(b"\n").await?;
valid_count += 1;
} else {
removed_count += 1;
if removed_count <= 5 {
// Log first few removals for debugging
logger::log_info(&format!(" Removed company '{}' (no valid Yahoo ticker)", company.name)).await;
}
}
// Progress indicator for large files
if total_count % 1000 == 0 {
logger::log_info(&format!(" Processed {} companies...", total_count)).await;
}
}
output_file.flush().await?;
logger::log_info(&format!(
" ✓ Cleansing complete: {} total → {} valid, {} removed",
total_count, valid_count, removed_count
)).await;
Ok(valid_count)
}
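The keep/drop rule above reduces to a single predicate over the ISIN-to-tickers map. A self-contained sketch of just that rule, using a plain map in place of the full CompanyCrossPlatformInfo:

use std::collections::HashMap;

// Keep a company only if some ticker starts with "YAHOO:" and is a real
// symbol, i.e. not the "YAHOO:NO_RESULTS" marker.
fn keeps_company(isin_tickers_map: &HashMap<String, Vec<String>>) -> bool {
    isin_tickers_map
        .values()
        .flatten()
        .any(|t| t.starts_with("YAHOO:") && t != "YAHOO:NO_RESULTS")
}

fn main() {
    let kept = HashMap::from([("US0378331005".to_string(), vec!["YAHOO:AAPL".to_string()])]);
    let dropped = HashMap::from([("XS0000000000".to_string(), vec!["YAHOO:NO_RESULTS".to_string()])]);
    assert!(keeps_company(&kept));
    assert!(!keeps_company(&dropped));
}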
async fn find_most_recent_figi_date_dir(paths: &DataPaths) -> anyhow::Result<Option<std::path::PathBuf>> {
let map_cache_dir = paths.cache_gleif_openfigi_map_dir();

src/corporate/update_parallel.rs

@@ -1,11 +1,7 @@
-// src/corporate/update_parallel.rs - FIXED: Proper Hard Reset Implementation
+// src/corporate/update_parallel.rs - PROPERLY FIXED: Correct pending queue rebuild
//
-// Critical fixes:
-// 1. Hard reset actually performed (no premature break)
-// 2. Error counter reset after hard reset
-// 3. Per-ISIN status tracking (not per-company)
-// 4. Proper task draining before reset
-// 5. Queue rebuilding after reset
+// Critical fix: After hard reset, only skip companies with COMPLETE Yahoo data,
+// not just companies that have already been written
use super::{types::*, yahoo::*, helpers::*};
use crate::util::directories::DataPaths;
@@ -38,6 +34,53 @@ struct CompanyProcessResult {
is_update: bool,
}
/// Check if a company needs Yahoo data processing
/// Returns true if company has incomplete data (needs processing)
fn company_needs_processing(
company_name: &str,
company_info: &CompanyInfo,
existing_companies: &HashMap<String, CompanyCrossPlatformInfo>,
) -> bool {
// If company not in existing data at all, definitely needs processing
let Some(existing_entry) = existing_companies.get(company_name) else {
return true;
};
// Collect all ISINs this company should have
let mut required_isins = std::collections::HashSet::new();
for figi_infos in company_info.securities.values() {
for figi_info in figi_infos {
if !figi_info.isin.is_empty() {
required_isins.insert(figi_info.isin.clone());
}
}
}
// Check each required ISIN
for isin in required_isins {
// Check if this ISIN exists in the company's ticker map
if let Some(tickers) = existing_entry.isin_tickers_map.get(&isin) {
// Check if this ISIN has valid Yahoo data
let has_valid_yahoo = tickers.iter().any(|t| {
t.starts_with("YAHOO:") &&
t != "YAHOO:ERROR" && // Error marker means needs retry
t != "YAHOO:NO_RESULTS" // This is actually valid (legitimately not found)
});
// If no valid Yahoo data for this ISIN, company needs processing
if !has_valid_yahoo {
return true;
}
} else {
// ISIN not in map at all, needs processing
return true;
}
}
// All ISINs have valid Yahoo data, skip this company
false
}
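Note the asymmetry with the cleansing filter in update.rs: there YAHOO:NO_RESULTS entries are dropped from the output, while here they count as complete, since the lookup succeeded and legitimately found nothing; only YAHOO:ERROR forces a retry. A tiny sketch of the per-ISIN rule, with isin_is_complete as a hypothetical helper name:

// Hypothetical helper; mirrors the check inside company_needs_processing.
fn isin_is_complete(tickers: &[String]) -> bool {
    tickers.iter().any(|t| t.starts_with("YAHOO:") && t != "YAHOO:ERROR")
}

fn main() {
    assert!(isin_is_complete(&["YAHOO:AAPL".to_string()]));         // fetched, skip
    assert!(isin_is_complete(&["YAHOO:NO_RESULTS".to_string()]));   // legitimately absent, skip
    assert!(!isin_is_complete(&["YAHOO:ERROR".to_string()]));       // failed fetch, retry
    assert!(!isin_is_complete(&["FIGI:BBG000B9XRY4".to_string()])); // never fetched, process
}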
/// Abort-safe incremental JSONL persistence with proper hard reset handling
pub async fn build_companies_jsonl_streaming_parallel(
paths: &DataPaths,
@@ -64,14 +107,13 @@ pub async fn build_companies_jsonl_streaming_parallel(
let path = DataPaths::new(".")?;
let corporate_path = path.data_dir().join("corporate").join("by_name");
let securities_path = corporate_path.join("common_stocks.json");
-let securities_path_cloned = securities_path.clone();
if !securities_path.exists() {
logger::log_warn("No common_stocks.json found").await;
return Ok(0);
}
-let content = tokio::fs::read_to_string(securities_path).await?;
+let content = tokio::fs::read_to_string(&securities_path).await?;
let securities: HashMap<String, CompanyInfo> = serde_json::from_str(&content)?;
let companies_path = paths.data_dir().join("companies.jsonl");
@@ -145,7 +187,9 @@ pub async fn build_companies_jsonl_streaming_parallel(
let companies_path_clone = companies_path.clone();
let log_path_clone = log_path.clone();
let existing_companies_writer = Arc::new(tokio::sync::Mutex::new(existing_companies.clone()));
-let existing_companies_writer_clone = Arc::clone(&existing_companies_writer);
+// Clone the Arc for the writer task (Arc clone is cheap, just increments the ref count)
+let existing_companies_writer_for_task = Arc::clone(&existing_companies_writer);
let write_tx_for_writer = write_tx.clone();
let writer_task = tokio::spawn(async move {
@@ -176,7 +220,7 @@ pub async fn build_companies_jsonl_streaming_parallel(
count += 1;
// Update in-memory state
-let mut existing_companies = existing_companies_writer.lock().await;
+let mut existing_companies = existing_companies_writer_for_task.lock().await;
let is_update = existing_companies.contains_key(&company.name);
existing_companies.insert(company.name.clone(), company);
drop(existing_companies);
@@ -214,7 +258,7 @@ pub async fn build_companies_jsonl_streaming_parallel(
break;
}
-let existing_companies = existing_companies_writer.lock().await;
+let existing_companies = existing_companies_writer_for_task.lock().await;
let companies_vec: Vec<_> = existing_companies.values().cloned().collect();
drop(existing_companies);
@@ -291,7 +335,19 @@ pub async fn build_companies_jsonl_streaming_parallel(
logger::log_info(&format!("Processing {} companies with concurrency limit {}", total, CONCURRENCY_LIMIT)).await;
let mut tasks = FuturesUnordered::new();
-let mut pending = securities.into_iter().collect::<Vec<_>>();
+// Build initial pending list with proper filtering
+let mut pending: Vec<(String, CompanyInfo)> = securities.iter()
+.filter(|(name, info)| company_needs_processing(name, info, &existing_companies))
+.map(|(name, info)| (name.clone(), info.clone()))
+.collect();
+logger::log_info(&format!(
+"Initial scan: {} companies need processing ({} already complete)",
+pending.len(),
+total - pending.len()
+)).await;
let mut processed = 0;
let mut hard_reset_count = 0;
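The surrounding loop uses the usual FuturesUnordered refill pattern: seed up to CONCURRENCY_LIMIT tasks, then push a new one each time a task completes. A self-contained sketch of that pattern under those assumptions; process is a trivial stand-in for the real per-company work:

use futures::stream::{FuturesUnordered, StreamExt};

const CONCURRENCY_LIMIT: usize = 4;

// Stand-in for the real per-company task (Yahoo scraping etc.).
async fn process(name: String) -> String {
    name
}

async fn drain(mut pending: Vec<String>) {
    let mut tasks = FuturesUnordered::new();
    // Seed the initial batch
    for _ in 0..CONCURRENCY_LIMIT.min(pending.len()) {
        if let Some(name) = pending.pop() {
            tasks.push(tokio::spawn(process(name)));
        }
    }
    // Each completion frees a slot; refill it from the pending queue
    while let Some(joined) = tasks.next().await {
        let _result = joined.expect("task panicked"); // real code inspects errors here
        if let Some(name) = pending.pop() {
            tasks.push(tokio::spawn(process(name)));
        }
    }
}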
@@ -397,7 +453,7 @@ pub async fn build_companies_jsonl_streaming_parallel(
let error_msg = e.to_string();
if error_msg.contains("HARD_RESET_REQUIRED") {
-// ✅ FIX: Don't break, perform actual hard reset
+// Don't break; perform the actual hard reset
// Check if reset already in progress (race condition protection)
let mut reset_lock = reset_in_progress.lock().await;
@@ -439,7 +495,7 @@ pub async fn build_companies_jsonl_streaming_parallel(
logger::log_info("✅ Hard reset completed successfully").await;
hard_reset_count += 1;
-// ✅ FIX: Reset the error counter
+// Reset the error counter
{
let pool_guard = pool_mutex.lock().await;
let current_pool = Arc::clone(&*pool_guard);
@@ -447,24 +503,24 @@ pub async fn build_companies_jsonl_streaming_parallel(
}
logger::log_info("✓ Error counter cleared").await;
-// ✅ FIX: Rebuild pending list from existing_companies
-// Only re-add companies that haven't been written yet
-let written_companies = {
-let companies = existing_companies_writer_clone.lock().await;
-companies.keys().cloned().collect::<std::collections::HashSet<_>>()
+// Rebuild pending list by checking which companies need processing
+logger::log_info("Rebuilding pending queue with proper Yahoo data checks...").await;
+// Get current state of written companies
+let current_existing = {
+let companies = existing_companies_writer.lock().await;
+companies.clone()
};
-// Create new pending list: all companies minus those already written
-let all_companies_list: Vec<(String, CompanyInfo)> = {
-// Need to reload securities since we cleared pending
-let content = tokio::fs::read_to_string(&securities_path_cloned).await?;
-let all_securities: HashMap<String, CompanyInfo> = serde_json::from_str(&content)?;
-all_securities.into_iter()
-.filter(|(name, _)| !written_companies.contains(name))
-.collect()
-};
-pending = all_companies_list;
+// Reload all securities from disk
+let content = tokio::fs::read_to_string(&securities_path).await?;
+let all_securities: HashMap<String, CompanyInfo> = serde_json::from_str(&content)?;
+// Build pending list: only companies that need processing
+pending = all_securities.iter()
+.filter(|(name, info)| company_needs_processing(name, info, &current_existing))
+.map(|(name, info)| (name.clone(), info.clone()))
+.collect();
logger::log_info(&format!(
"Restarting with {} remaining companies (out of {} total)",
@@ -472,6 +528,18 @@ pub async fn build_companies_jsonl_streaming_parallel(
total
)).await;
// Only continue if there's work to do
if pending.is_empty() {
logger::log_info("All companies have complete data, exiting").await;
// Clear reset flag
let mut reset_lock = reset_in_progress.lock().await;
*reset_lock = false;
drop(reset_lock);
break; // Exit main loop
}
// Respawn initial batch with NEW pool
for _ in 0..CONCURRENCY_LIMIT.min(pending.len()) {
if let Some((name, company_info)) = pending.pop() {
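The reset_in_progress mutex seen above is what keeps concurrent HARD_RESET_REQUIRED errors from triggering overlapping resets: the first task to claim the flag performs the reset, later ones observe it set and skip. A minimal sketch of that claim/release protocol, assuming the same Arc<Mutex<bool>> shape:

use std::sync::Arc;
use tokio::sync::Mutex;

// Returns true if the caller won the right to perform the reset.
async fn try_claim_reset(reset_in_progress: &Arc<Mutex<bool>>) -> bool {
    let mut guard = reset_in_progress.lock().await;
    if *guard {
        false // another task is already resetting; skip
    } else {
        *guard = true; // claim it
        true
    }
}

async fn release_reset(reset_in_progress: &Arc<Mutex<bool>>) {
    *reset_in_progress.lock().await = false;
}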
@@ -695,7 +763,7 @@ async fn process_single_company_validated(
}
}
-// ✅ FIX: Process each ISIN independently with per-ISIN status checking
+// Process each ISIN independently with per-ISIN status checking
for (isin, figi_tickers) in unique_isin_ticker_pairs {
// Check shutdown before each ISIN
if shutdown_flag.load(Ordering::SeqCst) {
@@ -716,10 +784,13 @@ async fn process_single_company_validated(
}
}
-// ✅ FIX: Check if THIS SPECIFIC ISIN has Yahoo data
-let has_yahoo_ticker_for_this_isin = tickers.iter().any(|t| t.starts_with("YAHOO:"));
+// Check if THIS SPECIFIC ISIN has valid Yahoo data (not ERROR)
+let has_valid_yahoo = tickers.iter().any(|t| {
+t.starts_with("YAHOO:") && t != "YAHOO:ERROR"
+// Note: YAHOO:NO_RESULTS is valid (legitimately not found)
+});
-if !has_yahoo_ticker_for_this_isin {
+if !has_valid_yahoo {
logger::log_info(&format!("Fetching Yahoo details for {} (ISIN: {})", name, isin)).await;
match scrape_with_retry(pool, &isin, 3, shutdown_flag).await {
@@ -766,7 +837,7 @@ async fn process_single_company_validated(
isin, name, e
)).await;
-// ✅ FIX: Mark this ISIN as failed to enable retry
+// Mark this ISIN as failed to enable retry
tickers.push("YAHOO:ERROR".to_string());
}
}
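scrape_with_retry is only called in this diff, never defined, so the following is a hedged sketch of what such a wrapper plausibly does, matching the (pool, isin, attempts, shutdown_flag) call shape: bounded attempts, a shutdown check between tries, and a simple backoff. scrape_once stands in for the real ChromeDriver-backed scrape, and the pool parameter is omitted:

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

// Hypothetical stand-in for the real ChromeDriver-backed scrape.
async fn scrape_once(isin: &str) -> anyhow::Result<String> {
    Ok(format!("YAHOO:{isin}"))
}

async fn scrape_with_retry_sketch(
    isin: &str,
    max_attempts: u32,
    shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<String> {
    let mut last_err = None;
    for attempt in 1..=max_attempts {
        // Bail out between attempts if shutdown was requested
        if shutdown_flag.load(Ordering::SeqCst) {
            anyhow::bail!("shutdown requested");
        }
        match scrape_once(isin).await {
            Ok(details) => return Ok(details),
            Err(e) => {
                last_err = Some(e);
                // Simple linear backoff before the next attempt
                tokio::time::sleep(Duration::from_millis(500 * attempt as u64)).await;
            }
        }
    }
    Err(last_err.expect("at least one attempt was made"))
}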