merged enrichment functions into one module

logs/checkpoint_dependencies.dot (new file, +6)
@@ -0,0 +1,6 @@
+digraph Dependencies {
+    rankdir=LR;
+    node [shape=box];
+
+
+}
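The committed graph body is still empty; presumably the checkpoint system fills in nodes and edges at runtime. As a rough sketch (not project code; the `to_dot` helper and its input shape are invented for illustration), emitting such a file from (checkpoint, dependencies) pairs taken from the configuration below could look like:

// Hypothetical helper: render (node, dependencies) pairs as Graphviz DOT,
// matching the header committed above (rankdir=LR, box-shaped nodes).
fn to_dot(deps: &[(&str, Vec<&str>)]) -> String {
    let mut out = String::from("digraph Dependencies {\n    rankdir=LR;\n    node [shape=box];\n");
    for (node, parents) in deps {
        for parent in parents {
            // One edge per dependency, pointing from prerequisite to dependent.
            out.push_str(&format!("    \"{parent}\" -> \"{node}\";\n"));
        }
    }
    out.push_str("}\n");
    out
}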
checkpoint_dependencies.toml
@@ -1,13 +1,57 @@
-[checkpoints.lei_figi_mapping_complete]
-depends_on = []
-
-[checkpoints.securities_data_complete]
-depends_on = ["lei_figi_mapping_complete"]
-
-[groups.enrichment_group]
-members = ["yahoo_events_enrichment_complete", "yahoo_options_enrichment_complete"]
-depends_on = ["yahoo_companies_cleansed"]
-
-[checkpoints.yahoo_events_enrichment_complete]
-depends_on = []
-group = "enrichment_group"
+# checkpoint_dependencies.toml - Complete configuration
+
+# ============================================================================
+# COLLECTION STAGE (No dependencies)
+# ============================================================================
+
+[checkpoints.exchange_collection_complete]
+description = "Yahoo exchanges collected and validated"
+depends_on = []
+
+[checkpoints.lei_figi_mapping_complete]
+description = "LEI-to-FIGI mappings from OpenFIGI API"
+depends_on = []
+
+# ============================================================================
+# CLEANSING STAGE (Depends on collection)
+# ============================================================================
+
+[checkpoints.yahoo_companies_cleansed]
+description = "Company data cleansed and validated"
+depends_on = ["exchange_collection_complete"]
+
+# ============================================================================
+# ENRICHMENT GROUP (All depend on cleansed companies)
+# ============================================================================
+
+[groups.enrichment_group]
+description = "Yahoo Finance enrichment functions"
+members = [
+    "yahoo_events_enrichment_complete",
+    "yahoo_options_enrichment_complete",
+    "yahoo_chart_enrichment_complete"
+]
+depends_on = ["yahoo_companies_cleansed"]
+
+[checkpoints.yahoo_events_enrichment_complete]
+description = "Corporate events enriched for all companies"
+depends_on = []
+group = "enrichment_group"
+
+[checkpoints.yahoo_options_enrichment_complete]
+description = "Options data enriched for all companies"
+depends_on = []
+group = "enrichment_group"
+
+[checkpoints.yahoo_chart_enrichment_complete]
+description = "Chart data enriched for all companies"
+depends_on = []
+group = "enrichment_group"
+
+# ============================================================================
+# SECURITIES PROCESSING (Depends on LEI mapping)
+# ============================================================================
+
+[checkpoints.securities_data_complete]
+description = "Securities data built from FIGI mappings"
+depends_on = ["lei_figi_mapping_complete"]
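For orientation, the shape above deserializes naturally with serde. A minimal sketch, assuming the `toml` and `serde` crates; the struct names here are invented for the example and are not the project's actual types:

use serde::Deserialize;
use std::collections::HashMap;

#[derive(Debug, Deserialize)]
struct DependencyConfig {
    checkpoints: HashMap<String, Checkpoint>,
    groups: HashMap<String, Group>,
}

#[derive(Debug, Deserialize)]
struct Checkpoint {
    description: Option<String>,
    #[serde(default)]
    depends_on: Vec<String>,
    group: Option<String>,
}

#[derive(Debug, Deserialize)]
struct Group {
    description: Option<String>,
    members: Vec<String>,
    #[serde(default)]
    depends_on: Vec<String>,
}

fn load(contents: &str) -> Result<DependencyConfig, toml::de::Error> {
    // Keys like [checkpoints.yahoo_chart_enrichment_complete] map into the
    // HashMaps; group-level depends_on applies to every member checkpoint.
    toml::from_str(contents)
}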
src/corporate/mod.rs
@@ -14,7 +14,6 @@ pub mod update;
 pub mod update_companies;
 pub mod update_companies_cleanse;
 pub mod update_companies_enrich;
-pub mod update_companies_enrich_option_chart;
 pub mod collect_exchanges;
 
src/corporate/update.rs
@@ -4,8 +4,7 @@ use crate::config::Config;
 use crate::check_shutdown;
 use crate::corporate::update_companies::build_companies_jsonl_streaming_parallel;
 use crate::corporate::update_companies_cleanse::{companies_yahoo_cleansed_low_profile, companies_yahoo_cleansed_no_data};
-use crate::corporate::update_companies_enrich::enrich_companies_with_events;
-use crate::corporate::update_companies_enrich_option_chart::{enrich_companies_with_option, enrich_companies_with_chart};
+use crate::corporate::update_companies_enrich::{enrich_companies_with_events, enrich_companies_with_chart, enrich_companies_with_option};
 use crate::corporate::collect_exchanges::collect_and_save_exchanges;
 use crate::economic::yahoo_update_forex::collect_fx_rates;
 use crate::util::directories::DataPaths;
src/corporate/update_companies_enrich.rs
@@ -1,4 +1,4 @@
-// src/corporate/update_companies_enrich_events.rs - WITH INTEGRITY MODULE
+// src/corporate/update_companies_enrich.rs - MERGED VERSION WITH GENERIC ENRICHMENT
 use super::{types::*, helpers::*};
 use crate::config::Config;
 use crate::corporate::checkpoint_helpers;
@@ -8,7 +8,7 @@ use crate::util::logger;
 use crate::scraper::yahoo::{YahooClientPool, QuoteSummaryModule};
 
 use std::result::Result::Ok;
-use chrono::Utc;
+use chrono::{TimeZone, Utc};
 use std::collections::{HashSet};
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
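Note the widened chrono import: `Utc.with_ymd_and_hms(...)`, called by the chart enrichment further down, is a method of chrono's `TimeZone` trait, so `TimeZone` must be in scope alongside `Utc`.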
@@ -17,6 +17,27 @@ use tokio::io::{AsyncWriteExt};
 use futures::stream::{FuturesUnordered, StreamExt};
 use serde_json::json;
 use tokio::sync::mpsc;
+use std::future::Future;
+use std::pin::Pin;
+
+/// Log command enum (shared across all enrichment types)
+enum LogCommand {
+    Write(serde_json::Value),
+    Checkpoint,
+    Shutdown,
+}
+
+/// Type alias for enrichment function
+type EnrichmentFn = Arc<
+    dyn Fn(CompanyCrossPlatformInfo, Arc<YahooClientPool>, DataPaths)
+        -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>
+        + Send
+        + Sync
+>;
+
+// ============================================================================
+// EVENTS ENRICHMENT
+// ============================================================================
+
 /// Yahoo Event enrichment per corporate company
 ///
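`EnrichmentFn` is the load-bearing piece of the merge: every `async fn` has its own anonymous future type, so to store interchangeable enrichment callbacks behind a single type the futures are boxed and pinned. A standalone sketch of the same pattern, reduced to a `String` argument (the names here are illustrative only, not project code):

use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

// Same shape as EnrichmentFn, minus the project-specific argument types.
type Callback = Arc<
    dyn Fn(String) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>
        + Send
        + Sync,
>;

fn make_callback() -> Callback {
    Arc::new(|name: String| {
        // Box::pin erases the async block's anonymous type into the alias above.
        Box::pin(async move {
            println!("processing {name}");
            anyhow::Ok(())
        })
    })
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let cb = make_callback();
    cb("ACME".to_string()).await // call it like a plain async function
}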
@@ -104,6 +125,16 @@ pub async fn enrich_companies_with_events(
 
     // === PROCESSING PHASE: Enrich companies with events ===
 
+    // Create enrichment function
+    let enrichment_fn: EnrichmentFn = Arc::new(move |company, pool, paths| {
+        let company = company.clone();
+        let pool = Arc::clone(&pool);
+        let paths = paths.clone();
+        Box::pin(async move {
+            enrich_company_with_events(&company, &pool, &paths).await
+        })
+    });
+
     // Shared counters
     let processed_count = Arc::new(AtomicUsize::new(enriched_companies.len()));
     let success_count = Arc::new(AtomicUsize::new(enriched_companies.len()));
@@ -113,64 +144,14 @@ pub async fn enrich_companies_with_events(
     let (log_tx, mut log_rx) = mpsc::channel::<LogCommand>(1000);
 
     // Spawn log writer task
-    let log_writer_handle = {
-        let log_path = log_path.clone();
-        let processed_count = Arc::clone(&processed_count);
-        let total_companies = total_companies;
-
-        tokio::spawn(async move {
-            let mut log_file = OpenOptions::new()
-                .create(true)
-                .append(true)
-                .open(&log_path)
-                .await
-                .expect("Failed to open log file");
-
-            let mut write_count = 0;
-            let mut last_fsync = tokio::time::Instant::now();
-
-            while let Some(cmd) = log_rx.recv().await {
-                match cmd {
-                    LogCommand::Write(entry) => {
-                        let json_line = serde_json::to_string(&entry).expect("Serialization failed");
-                        log_file.write_all(json_line.as_bytes()).await.expect("Write failed");
-                        log_file.write_all(b"\n").await.expect("Write failed");
-
-                        write_count += 1;
-
-                        // Batched fsync
-                        if write_count >= FSYNC_BATCH_SIZE
-                            || last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS
-                        {
-                            log_file.flush().await.expect("Flush failed");
-                            log_file.sync_all().await.expect("Fsync failed");
-                            write_count = 0;
-                            last_fsync = tokio::time::Instant::now();
-                        }
-                    }
-                    LogCommand::Checkpoint => {
-                        // Force fsync on checkpoint
-                        log_file.flush().await.expect("Flush failed");
-                        log_file.sync_all().await.expect("Fsync failed");
-                        write_count = 0;
-                        last_fsync = tokio::time::Instant::now();
-
-                        let current = processed_count.load(Ordering::SeqCst);
-                        logger::log_info(&format!(
-                            " Checkpoint: {}/{} companies processed",
-                            current, total_companies
-                        )).await;
-                    }
-                    LogCommand::Shutdown => {
-                        // Final fsync before shutdown
-                        log_file.flush().await.expect("Flush failed");
-                        log_file.sync_all().await.expect("Fsync failed");
-                        break;
-                    }
-                }
-            }
-        })
-    };
+    let log_writer_handle = spawn_log_writer(
+        log_path,
+        log_rx,
+        Arc::clone(&processed_count),
+        total_companies,
+        FSYNC_BATCH_SIZE,
+        FSYNC_INTERVAL_SECS,
+    );
 
     // Process companies concurrently with task panic isolation
     let mut tasks = FuturesUnordered::new();
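Extracting the writer into `spawn_log_writer` preserves the single-writer design: every enrichment task sends `LogCommand`s through one bounded mpsc channel, and exactly one task owns the file handle, so appends never race and the batched-fsync policy lives in one place instead of being copy-pasted into three functions.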
@@ -190,6 +171,7 @@
                 log_tx.clone(),
                 Arc::clone(&semaphore),
                 Arc::clone(shutdown_flag),
+                Arc::clone(&enrichment_fn),
             );
             tasks.push(task);
         }
@@ -212,11 +194,17 @@
 
         // Check for shutdown
         if shutdown_flag.load(Ordering::SeqCst) {
-            logger::log_warn("Shutdown detected, stopping new enrichment tasks...").await;
+            logger::log_warn("Shutdown signal received, stopping event enrichment").await;
             break;
         }
 
-        // Spawn next task
+        // Checkpoint periodically
+        checkpoint_counter += 1;
+        if checkpoint_counter % CHECKPOINT_INTERVAL == 0 {
+            let _ = log_tx.send(LogCommand::Checkpoint).await;
+        }
+
+        // Spawn next task if available
         if let Some(company) = pending_iter.next() {
             let task = spawn_enrichment_task(
                 company,
@@ -228,18 +216,13 @@
                 log_tx.clone(),
                 Arc::clone(&semaphore),
                 Arc::clone(shutdown_flag),
+                Arc::clone(&enrichment_fn),
             );
             tasks.push(task);
         }
-
-        // Periodic checkpoint
-        checkpoint_counter += 1;
-        if checkpoint_counter % CHECKPOINT_INTERVAL == 0 {
-            let _ = log_tx.send(LogCommand::Checkpoint).await;
-        }
     }
 
-    // Shutdown log writer
+    // Signal log writer to shutdown
     let _ = log_tx.send(LogCommand::Shutdown).await;
     drop(log_tx);
@@ -299,63 +282,6 @@ async fn track_events_completion(
     Ok(())
 }
 
-/// Spawn a single enrichment task with panic isolation
-fn spawn_enrichment_task(
-    company: CompanyCrossPlatformInfo,
-    yahoo_pool: Arc<YahooClientPool>,
-    paths: DataPaths,
-    processed_count: Arc<AtomicUsize>,
-    success_count: Arc<AtomicUsize>,
-    failed_count: Arc<AtomicUsize>,
-    log_tx: mpsc::Sender<LogCommand>,
-    semaphore: Arc<tokio::sync::Semaphore>,
-    shutdown_flag: Arc<AtomicBool>,
-) -> tokio::task::JoinHandle<()> {
-    tokio::spawn(async move {
-        // Acquire semaphore permit
-        let _permit = semaphore.acquire().await.expect("Semaphore closed");
-
-        // Check shutdown before processing
-        if shutdown_flag.load(Ordering::SeqCst) {
-            return;
-        }
-
-        // Process company
-        let result = enrich_company_with_events(
-            &company,
-            &yahoo_pool,
-            &paths,
-        ).await;
-
-        // Update counters
-        processed_count.fetch_add(1, Ordering::SeqCst);
-
-        let status = match result {
-            Ok(_) => {
-                success_count.fetch_add(1, Ordering::SeqCst);
-                "enriched"
-            }
-            Err(e) => {
-                failed_count.fetch_add(1, Ordering::SeqCst);
-                logger::log_warn(&format!(
-                    " Failed to enrich {}: {}",
-                    company.name, e
-                )).await;
-                "failed"
-            }
-        };
-
-        // Log result
-        let log_entry = json!({
-            "company_name": company.name,
-            "status": status,
-            "timestamp": Utc::now().to_rfc3339(),
-        });
-
-        let _ = log_tx.send(LogCommand::Write(log_entry)).await;
-    })
-}
-
 /// Enrich a single company with event data
 async fn enrich_company_with_events(
     company: &CompanyCrossPlatformInfo,
@@ -446,9 +372,690 @@ async fn save_company_event_data(
     Ok(())
 }
 
-/// Log command enum
-enum LogCommand {
-    Write(serde_json::Value),
-    Checkpoint,
-    Shutdown,
-}
+// ============================================================================
+// OPTION ENRICHMENT
+// ============================================================================
+
+/// Yahoo Option enrichment per corporate company
+///
+/// # Features
+/// - Graceful shutdown (abort-safe)
+/// - Task panic isolation (tasks fail independently)
+/// - Crash-safe persistence (checkpoint + log with fsync)
+/// - Smart skip logic (only process incomplete data)
+/// - Uses pending queue instead of retry mechanism
+/// - Content integrity validation with hash tracking
+///
+/// # Persistence Strategy
+/// - Checkpoint: companies_yahoo_cleaned.jsonl (atomic state)
+/// - Log: companies_option_updates.log (append-only updates)
+/// - On restart: Load checkpoint + replay log
+/// - Periodic checkpoints (every 50 companies)
+/// - Batched fsync (every 10 writes or 10 seconds)
+/// - Hash validation of all option data directories
+pub async fn enrich_companies_with_option(
+    paths: &DataPaths,
+    _config: &Config,
+    yahoo_pool: Arc<YahooClientPool>,
+    shutdown_flag: &Arc<AtomicBool>,
+) -> anyhow::Result<usize> {
+    // Configuration constants
+    const CHECKPOINT_INTERVAL: usize = 50;
+    const FSYNC_BATCH_SIZE: usize = 10;
+    const FSYNC_INTERVAL_SECS: u64 = 10;
+    const CONCURRENCY_LIMIT: usize = 50;
+
+    let data_path = paths.data_dir();
+
+    // File paths
+    let input_path = data_path.join("companies_yahoo_cleaned.jsonl");
+    let log_path = data_path.join("companies_option_updates.log");
+    let state_path = data_path.join("state.jsonl");
+
+    // Check input exists
+    if !input_path.exists() {
+        logger::log_warn(" companies_yahoo_cleaned.jsonl not found, skipping option enrichment").await;
+        return Ok(0);
+    }
+
+    let manager = StateManager::new(&state_path, &data_path.to_path_buf())?;
+    let step_name = "yahoo_option_enrichment_complete";
+
+    if manager.is_step_valid(step_name).await? {
+        logger::log_info(" Yahoo option enrichment already completed and valid").await;
+        let count = checkpoint_helpers::count_enriched_companies(paths, "option").await?;
+        logger::log_info(&format!(" ✓ Found {} companies with valid option data", count)).await;
+        return Ok(count);
+    }
+
+    logger::log_info(" Option data needs refresh - starting enrichment").await;
+
+    // === RECOVERY PHASE: Track enriched companies ===
+    let enriched_companies = checkpoint_helpers::load_enrichment_progress(&log_path).await?;
+
+    // Load all companies from input
+    logger::log_info("Loading companies from companies_yahoo_cleaned.jsonl...").await;
+    let companies = load_companies_from_jsonl(&input_path).await?;
+    let total_companies = companies.len();
+    logger::log_info(&format!("Found {} companies to process", total_companies)).await;
+
+    // Filter companies that need enrichment
+    let pending_companies: Vec<CompanyCrossPlatformInfo> = companies
+        .into_iter()
+        .filter(|company| !enriched_companies.contains(&company.name))
+        .collect();
+
+    let pending_count = pending_companies.len();
+    logger::log_info(&format!(
+        " {} already enriched, {} pending",
+        enriched_companies.len(),
+        pending_count
+    )).await;
+
+    if pending_count == 0 {
+        logger::log_info(" ✓ All companies already enriched").await;
+        track_option_completion(&manager, paths, step_name).await?;
+        return Ok(enriched_companies.len());
+    }
+
+    // === PROCESSING PHASE: Enrich companies with option ===
+
+    // Create enrichment function
+    let enrichment_fn: EnrichmentFn = Arc::new(move |company, pool, paths| {
+        let company = company.clone();
+        let pool = Arc::clone(&pool);
+        let paths = paths.clone();
+        Box::pin(async move {
+            enrich_company_with_option(&company, &pool, &paths).await
+        })
+    });
+
+    // Shared counters
+    let processed_count = Arc::new(AtomicUsize::new(enriched_companies.len()));
+    let success_count = Arc::new(AtomicUsize::new(enriched_companies.len()));
+    let failed_count = Arc::new(AtomicUsize::new(0));
+
+    // Log writer channel with batching and fsync
+    let (log_tx, mut log_rx) = mpsc::channel::<LogCommand>(1000);
+
+    // Spawn log writer task
+    let log_writer_handle = spawn_log_writer(
+        log_path,
+        log_rx,
+        Arc::clone(&processed_count),
+        total_companies,
+        FSYNC_BATCH_SIZE,
+        FSYNC_INTERVAL_SECS,
+    );
+
+    // Process companies concurrently with task panic isolation
+    let mut tasks = FuturesUnordered::new();
+    let mut pending_iter = pending_companies.into_iter();
+    let semaphore = Arc::new(tokio::sync::Semaphore::new(CONCURRENCY_LIMIT));
+
+    // Initial batch of tasks
+    for _ in 0..CONCURRENCY_LIMIT.min(pending_count) {
+        if let Some(company) = pending_iter.next() {
+            let task = spawn_enrichment_task(
+                company,
+                Arc::clone(&yahoo_pool),
+                paths.clone(),
+                Arc::clone(&processed_count),
+                Arc::clone(&success_count),
+                Arc::clone(&failed_count),
+                log_tx.clone(),
+                Arc::clone(&semaphore),
+                Arc::clone(shutdown_flag),
+                Arc::clone(&enrichment_fn),
+            );
+            tasks.push(task);
+        }
+    }
+
+    // Process tasks as they complete and spawn new ones
+    let mut checkpoint_counter = 0;
+    while let Some(_result) = tasks.next().await {
+        // Check for shutdown
+        if shutdown_flag.load(Ordering::SeqCst) {
+            logger::log_warn("Shutdown signal received, stopping option enrichment").await;
+            break;
+        }
+
+        // Checkpoint periodically
+        checkpoint_counter += 1;
+        if checkpoint_counter % CHECKPOINT_INTERVAL == 0 {
+            let _ = log_tx.send(LogCommand::Checkpoint).await;
+        }
+
+        // Spawn next task if available
+        if let Some(company) = pending_iter.next() {
+            let task = spawn_enrichment_task(
+                company,
+                Arc::clone(&yahoo_pool),
+                paths.clone(),
+                Arc::clone(&processed_count),
+                Arc::clone(&success_count),
+                Arc::clone(&failed_count),
+                log_tx.clone(),
+                Arc::clone(&semaphore),
+                Arc::clone(shutdown_flag),
+                Arc::clone(&enrichment_fn),
+            );
+            tasks.push(task);
+        }
+    }
+
+    // Signal log writer to shutdown
+    let _ = log_tx.send(LogCommand::Shutdown).await;
+    drop(log_tx);
+
+    // Wait for log writer to finish
+    let _ = log_writer_handle.await;
+
+    let final_processed = processed_count.load(Ordering::SeqCst);
+    let final_success = success_count.load(Ordering::SeqCst);
+    let final_failed = failed_count.load(Ordering::SeqCst);
+
+    logger::log_info(&format!(
+        " Option enrichment summary: {} total, {} success, {} failed",
+        final_processed, final_success, final_failed
+    )).await;
+
+    // Mark as complete if all companies processed
+    if final_processed >= total_companies && !shutdown_flag.load(Ordering::SeqCst) {
+        track_option_completion(&manager, paths, step_name).await?;
+        logger::log_info(" ✓ Option enrichment marked as complete with integrity tracking").await;
+    }
+
+    Ok(final_success)
+}
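All three drivers share the same refill loop: seed `CONCURRENCY_LIMIT` tasks, then push one replacement each time a task completes, so at most `CONCURRENCY_LIMIT` futures are in flight without a dedicated worker pool. A standalone sketch of just that pattern (illustrative names, `u32` items standing in for companies):

use futures::stream::{FuturesUnordered, StreamExt};

async fn run_bounded<F, Fut>(items: Vec<u32>, limit: usize, f: F)
where
    F: Fn(u32) -> Fut,
    Fut: std::future::Future<Output = ()>,
{
    let mut pending = items.into_iter();
    let mut tasks = FuturesUnordered::new();

    // Seed the initial window.
    for _ in 0..limit {
        if let Some(item) = pending.next() {
            tasks.push(f(item));
        }
    }

    // Each completion frees a slot; immediately refill it.
    while let Some(()) = tasks.next().await {
        if let Some(item) = pending.next() {
            tasks.push(f(item));
        }
    }
}

In the functions above the semaphore enforces the same bound a second time at task granularity, which also keeps the limit intact if tasks are spawned faster than they acquire permits.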
+/// Track option enrichment completion with content hash verification
+async fn track_option_completion(
+    manager: &StateManager,
+    paths: &DataPaths,
+    step_name: &str,
+) -> anyhow::Result<()> {
+    // Create content reference for all option data
+    // This will hash ALL files matching the pattern: {company}/option/data.jsonl
+    let content_reference = directory_reference(
+        paths.corporate_dir(),
+        Some(vec![
+            "*/option/*.jsonl".to_string(),    // Main pattern for option data
+            "*/option/data.jsonl".to_string(), // Specific pattern (more precise)
+        ]),
+        Some(vec![
+            "*.log".to_string(), // Exclude log files
+            "*.tmp".to_string(), // Exclude temp files
+            "*.bak".to_string(), // Exclude backup files
+        ]),
+    );
+
+    // Track completion with:
+    // - Content reference: All option directories
+    // - Data stage: Data (7-day TTL by default)
+    // - Dependencies: Depends on cleaned companies data
+    manager.update_entry(
+        step_name.to_string(),
+        content_reference,
+        DataStage::Data,
+        None, // Use default TTL (7 days for Data stage)
+    ).await?;
+
+    Ok(())
+}
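`directory_reference` and `StateManager` are project-internal, so only their intent is visible here: record a content hash over the matching files so `is_step_valid` can later detect stale or altered data. A hedged sketch of what such a reference could reduce to, using the `sha2` crate and hard-coding the `.jsonl` filter (a real version would also honor the exclude globs):

use sha2::{Digest, Sha256};
use std::{fs, io, path::{Path, PathBuf}};

fn hash_jsonl_tree(dir: &Path) -> io::Result<String> {
    // Collect matching files first so they can be sorted: read_dir order is
    // platform-dependent, and an unsorted walk would make the hash unstable.
    let mut files: Vec<PathBuf> = Vec::new();
    let mut stack = vec![dir.to_path_buf()];
    while let Some(p) = stack.pop() {
        for entry in fs::read_dir(&p)? {
            let path = entry?.path();
            if path.is_dir() {
                stack.push(path);
            } else if path.extension().is_some_and(|e| e == "jsonl") {
                files.push(path);
            }
        }
    }
    files.sort();

    let mut hasher = Sha256::new();
    for path in &files {
        hasher.update(fs::read(path)?);
    }
    Ok(format!("{:x}", hasher.finalize()))
}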
+/// Enrich a single company with option data
+async fn enrich_company_with_option(
+    company: &CompanyCrossPlatformInfo,
+    yahoo_pool: &Arc<YahooClientPool>,
+    paths: &DataPaths,
+) -> anyhow::Result<()> {
+    let ticker = match extract_first_yahoo_ticker(company) {
+        Some(t) => t,
+        None => {
+            return Err(anyhow::anyhow!("No valid Yahoo ticker found"));
+        }
+    };
+
+    // Get option data for all available expiration dates
+    let option_data = yahoo_pool.get_option_data(&ticker, None).await?;
+
+    // Only save if we got meaningful data
+    if option_data.option.is_empty() {
+        return Err(anyhow::anyhow!("No option data available"));
+    }
+
+    // Save the option data
+    save_company_data(paths, &company.name, &option_data, "option").await?;
+
+    Ok(())
+}
+
+// ============================================================================
+// CHART ENRICHMENT
+// ============================================================================
+
+/// Yahoo Chart enrichment per corporate company
+///
+/// # Features
+/// - Graceful shutdown (abort-safe)
+/// - Task panic isolation (tasks fail independently)
+/// - Crash-safe persistence (checkpoint + log with fsync)
+/// - Smart skip logic (only process incomplete data)
+/// - Uses pending queue instead of retry mechanism
+/// - Content integrity validation with hash tracking
+///
+/// # Persistence Strategy
+/// - Checkpoint: companies_yahoo_cleaned.jsonl (atomic state)
+/// - Log: companies_chart_updates.log (append-only updates)
+/// - On restart: Load checkpoint + replay log
+/// - Periodic checkpoints (every 50 companies)
+/// - Batched fsync (every 10 writes or 10 seconds)
+/// - Hash validation of all chart data directories
+pub async fn enrich_companies_with_chart(
+    paths: &DataPaths,
+    _config: &Config,
+    yahoo_pool: Arc<YahooClientPool>,
+    shutdown_flag: &Arc<AtomicBool>,
+) -> anyhow::Result<usize> {
+    // Configuration constants
+    const CHECKPOINT_INTERVAL: usize = 50;
+    const FSYNC_BATCH_SIZE: usize = 10;
+    const FSYNC_INTERVAL_SECS: u64 = 10;
+    const CONCURRENCY_LIMIT: usize = 50;
+
+    let data_path = paths.data_dir();
+
+    // File paths
+    let input_path = data_path.join("companies_yahoo_cleaned.jsonl");
+    let log_path = data_path.join("companies_chart_updates.log");
+    let state_path = data_path.join("state.jsonl");
+
+    // Check input exists
+    if !input_path.exists() {
+        logger::log_warn(" companies_yahoo_cleaned.jsonl not found, skipping chart enrichment").await;
+        return Ok(0);
+    }
+
+    let manager = StateManager::new(&state_path, &data_path.to_path_buf())?;
+    let step_name = "yahoo_chart_enrichment_complete";
+
+    if manager.is_step_valid(step_name).await? {
+        logger::log_info(" Yahoo chart enrichment already completed and valid").await;
+        let count = checkpoint_helpers::count_enriched_companies(paths, "chart").await?;
+        logger::log_info(&format!(" ✓ Found {} companies with valid chart data", count)).await;
+        return Ok(count);
+    }
+
+    logger::log_info(" Chart data needs refresh - starting enrichment").await;
+
+    // === RECOVERY PHASE: Track enriched companies ===
+    let enriched_companies = checkpoint_helpers::load_enrichment_progress(&log_path).await?;
+
+    // Load all companies from input
+    logger::log_info("Loading companies from companies_yahoo_cleaned.jsonl...").await;
+    let companies = load_companies_from_jsonl(&input_path).await?;
+    let total_companies = companies.len();
+    logger::log_info(&format!("Found {} companies to process", total_companies)).await;
+
+    // Filter companies that need enrichment
+    let pending_companies: Vec<CompanyCrossPlatformInfo> = companies
+        .into_iter()
+        .filter(|company| !enriched_companies.contains(&company.name))
+        .collect();
+
+    let pending_count = pending_companies.len();
+    logger::log_info(&format!(
+        " {} already enriched, {} pending",
+        enriched_companies.len(),
+        pending_count
+    )).await;
+
+    if pending_count == 0 {
+        logger::log_info(" ✓ All companies already enriched").await;
+        track_chart_completion(&manager, paths, step_name).await?;
+        return Ok(enriched_companies.len());
+    }
+
+    // === PROCESSING PHASE: Enrich companies with chart ===
+
+    // Create enrichment function
+    let enrichment_fn: EnrichmentFn = Arc::new(move |company, pool, paths| {
+        let company = company.clone();
+        let pool = Arc::clone(&pool);
+        let paths = paths.clone();
+        Box::pin(async move {
+            enrich_company_with_chart(&company, &pool, &paths).await
+        })
+    });
+
+    // Shared counters
+    let processed_count = Arc::new(AtomicUsize::new(enriched_companies.len()));
+    let success_count = Arc::new(AtomicUsize::new(enriched_companies.len()));
+    let failed_count = Arc::new(AtomicUsize::new(0));
+
+    // Log writer channel with batching and fsync
+    let (log_tx, mut log_rx) = mpsc::channel::<LogCommand>(1000);
+
+    // Spawn log writer task
+    let log_writer_handle = spawn_log_writer(
+        log_path,
+        log_rx,
+        Arc::clone(&processed_count),
+        total_companies,
+        FSYNC_BATCH_SIZE,
+        FSYNC_INTERVAL_SECS,
+    );
+
+    // Process companies concurrently with task panic isolation
+    let mut tasks = FuturesUnordered::new();
+    let mut pending_iter = pending_companies.into_iter();
+    let semaphore = Arc::new(tokio::sync::Semaphore::new(CONCURRENCY_LIMIT));
+
+    // Initial batch of tasks
+    for _ in 0..CONCURRENCY_LIMIT.min(pending_count) {
+        if let Some(company) = pending_iter.next() {
+            let task = spawn_enrichment_task(
+                company,
+                Arc::clone(&yahoo_pool),
+                paths.clone(),
+                Arc::clone(&processed_count),
+                Arc::clone(&success_count),
+                Arc::clone(&failed_count),
+                log_tx.clone(),
+                Arc::clone(&semaphore),
+                Arc::clone(shutdown_flag),
+                Arc::clone(&enrichment_fn),
+            );
+            tasks.push(task);
+        }
+    }
+
+    // Process tasks as they complete and spawn new ones
+    let mut checkpoint_counter = 0;
+    while let Some(_result) = tasks.next().await {
+        // Check for shutdown
+        if shutdown_flag.load(Ordering::SeqCst) {
+            logger::log_warn("Shutdown signal received, stopping chart enrichment").await;
+            break;
+        }
+
+        // Checkpoint periodically
+        checkpoint_counter += 1;
+        if checkpoint_counter % CHECKPOINT_INTERVAL == 0 {
+            let _ = log_tx.send(LogCommand::Checkpoint).await;
+        }
+
+        // Spawn next task if available
+        if let Some(company) = pending_iter.next() {
+            let task = spawn_enrichment_task(
+                company,
+                Arc::clone(&yahoo_pool),
+                paths.clone(),
+                Arc::clone(&processed_count),
+                Arc::clone(&success_count),
+                Arc::clone(&failed_count),
+                log_tx.clone(),
+                Arc::clone(&semaphore),
+                Arc::clone(shutdown_flag),
+                Arc::clone(&enrichment_fn),
+            );
+            tasks.push(task);
+        }
+    }
+
+    // Signal log writer to shutdown
+    let _ = log_tx.send(LogCommand::Shutdown).await;
+    drop(log_tx);
+
+    // Wait for log writer to finish
+    let _ = log_writer_handle.await;
+
+    let final_processed = processed_count.load(Ordering::SeqCst);
+    let final_success = success_count.load(Ordering::SeqCst);
+    let final_failed = failed_count.load(Ordering::SeqCst);
+
+    logger::log_info(&format!(
+        " Chart enrichment summary: {} total, {} success, {} failed",
+        final_processed, final_success, final_failed
+    )).await;
+
+    // Mark as complete if all companies processed
+    if final_processed >= total_companies && !shutdown_flag.load(Ordering::SeqCst) {
+        track_chart_completion(&manager, paths, step_name).await?;
+        logger::log_info(" ✓ Chart enrichment marked as complete with integrity tracking").await;
+    }
+
+    Ok(final_success)
+}
+/// Track chart enrichment completion with content hash verification
+async fn track_chart_completion(
+    manager: &StateManager,
+    paths: &DataPaths,
+    step_name: &str,
+) -> anyhow::Result<()> {
+    // Create content reference for all chart data
+    // This will hash ALL files matching the pattern: {company}/chart/data.jsonl
+    let content_reference = directory_reference(
+        paths.corporate_dir(),
+        Some(vec![
+            "*/chart/*.jsonl".to_string(),    // Main pattern for chart data
+            "*/chart/data.jsonl".to_string(), // Specific pattern (more precise)
+        ]),
+        Some(vec![
+            "*.log".to_string(), // Exclude log files
+            "*.tmp".to_string(), // Exclude temp files
+            "*.bak".to_string(), // Exclude backup files
+        ]),
+    );
+
+    // Track completion with:
+    // - Content reference: All chart directories
+    // - Data stage: Data (7-day TTL by default)
+    // - Dependencies: Depends on cleaned companies data
+    manager.update_entry(
+        step_name.to_string(),
+        content_reference,
+        DataStage::Data,
+        None, // Use default TTL (7 days for Data stage)
+    ).await?;
+
+    Ok(())
+}
+/// Enrich a single company with chart data
+async fn enrich_company_with_chart(
+    company: &CompanyCrossPlatformInfo,
+    yahoo_pool: &Arc<YahooClientPool>,
+    paths: &DataPaths,
+) -> anyhow::Result<()> {
+    let ticker = match extract_first_yahoo_ticker(company) {
+        Some(t) => t,
+        None => {
+            return Err(anyhow::anyhow!("No valid Yahoo ticker found"));
+        }
+    };
+
+    // Get full daily chart history, from 2000-01-01 to now
+    let now = chrono::Utc::now().timestamp();
+    let start = chrono::Utc
+        .with_ymd_and_hms(2000, 1, 1, 0, 0, 0)
+        .unwrap()
+        .timestamp();
+
+    let chart_data = yahoo_pool.get_chart_data(&ticker, "1d", start, now).await?;
+
+    // Only save if we got meaningful data
+    if chart_data.quotes.is_empty() {
+        return Err(anyhow::anyhow!("No chart data available"));
+    }
+
+    // Save the chart data
+    save_company_data(paths, &company.name, &chart_data, "chart").await?;
+
+    Ok(())
+}
+/// Save data to company directory (generic version)
+async fn save_company_data<T: serde::Serialize>(
+    paths: &DataPaths,
+    company_name: &str,
+    data: &T,
+    data_type: &str,
+) -> anyhow::Result<()> {
+    use tokio::fs;
+
+    let safe_name = sanitize_company_name(company_name);
+
+    let company_dir = paths.corporate_dir().join(&safe_name).join(data_type);
+    fs::create_dir_all(&company_dir).await?;
+
+    let data_path = company_dir.join("data.jsonl");
+    let json_line = serde_json::to_string(data)?;
+
+    let mut file = fs::File::create(&data_path).await?;
+    file.write_all(json_line.as_bytes()).await?;
+    file.write_all(b"\n").await?;
+    file.flush().await?;
+    file.sync_all().await?; // Ensure data is persisted
+
+    Ok(())
+}
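`save_company_data` overwrites `data.jsonl` in place and fsyncs, so a crash mid-write can leave a truncated file until the next run re-enriches that company. A common hardening, sketched here with the same tokio APIs (not what the commit does), is write-to-temp-then-rename, since `rename` replaces the target atomically on the same filesystem:

use std::path::Path;
use tokio::{fs, io::AsyncWriteExt};

async fn save_atomic(path: &Path, bytes: &[u8]) -> std::io::Result<()> {
    let tmp = path.with_extension("tmp");
    let mut f = fs::File::create(&tmp).await?;
    f.write_all(bytes).await?;
    f.flush().await?;
    f.sync_all().await?; // data is durable before it becomes visible
    fs::rename(&tmp, path).await?; // atomic replace on the same filesystem
    Ok(())
}

Conveniently, the `*.tmp` exclude patterns in the completion trackers above would already keep such temporaries out of the content hash.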
// ============================================================================
|
||||||
|
// GENERIC SHARED FUNCTIONS
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
/// Spawn log writer task (shared across all enrichment types)
|
||||||
|
fn spawn_log_writer(
|
||||||
|
log_path: std::path::PathBuf,
|
||||||
|
mut log_rx: mpsc::Receiver<LogCommand>,
|
||||||
|
processed_count: Arc<AtomicUsize>,
|
||||||
|
total_companies: usize,
|
||||||
|
fsync_batch_size: usize,
|
||||||
|
fsync_interval_secs: u64,
|
||||||
|
) -> tokio::task::JoinHandle<()> {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut log_file = OpenOptions::new()
|
||||||
|
.create(true)
|
||||||
|
.append(true)
|
||||||
|
.open(&log_path)
|
||||||
|
.await
|
||||||
|
.expect("Failed to open log file");
|
||||||
|
|
||||||
|
let mut write_count = 0;
|
||||||
|
let mut last_fsync = tokio::time::Instant::now();
|
||||||
|
|
||||||
|
while let Some(cmd) = log_rx.recv().await {
|
||||||
|
match cmd {
|
||||||
|
LogCommand::Write(entry) => {
|
||||||
|
let json_line = serde_json::to_string(&entry).expect("Serialization failed");
|
||||||
|
log_file.write_all(json_line.as_bytes()).await.expect("Write failed");
|
||||||
|
log_file.write_all(b"\n").await.expect("Write failed");
|
||||||
|
|
||||||
|
write_count += 1;
|
||||||
|
|
||||||
|
// Batched fsync
|
||||||
|
if write_count >= fsync_batch_size
|
||||||
|
|| last_fsync.elapsed().as_secs() >= fsync_interval_secs
|
||||||
|
{
|
||||||
|
log_file.flush().await.expect("Flush failed");
|
||||||
|
log_file.sync_all().await.expect("Fsync failed");
|
||||||
|
write_count = 0;
|
||||||
|
last_fsync = tokio::time::Instant::now();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LogCommand::Checkpoint => {
|
||||||
|
// Force fsync on checkpoint
|
||||||
|
log_file.flush().await.expect("Flush failed");
|
||||||
|
log_file.sync_all().await.expect("Fsync failed");
|
||||||
|
write_count = 0;
|
||||||
|
last_fsync = tokio::time::Instant::now();
|
||||||
|
|
||||||
|
let current = processed_count.load(Ordering::SeqCst);
|
||||||
|
logger::log_info(&format!(
|
||||||
|
" Checkpoint: {}/{} companies processed",
|
||||||
|
current, total_companies
|
||||||
|
)).await;
|
||||||
|
}
|
||||||
|
LogCommand::Shutdown => {
|
||||||
|
// Final fsync before shutdown
|
||||||
|
log_file.flush().await.expect("Flush failed");
|
||||||
|
log_file.sync_all().await.expect("Fsync failed");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
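The writer above is one half of the persistence story; the other half is the recovery pass (`load_enrichment_progress` in the merged code) that replays the append-only log on restart. A sketch of that replay, mirroring the inline logic the chart function used before the merge (see the deleted file below):

use std::collections::HashSet;

fn replay_log(contents: &str) -> HashSet<String> {
    let mut done = HashSet::new();
    for line in contents.lines() {
        // A torn final write can leave a truncated line; skip anything that
        // does not parse as a complete JSON object.
        let Ok(v) = serde_json::from_str::<serde_json::Value>(line) else { continue };
        if v.get("status").and_then(|s| s.as_str()) == Some("enriched") {
            if let Some(name) = v.get("company_name").and_then(|s| s.as_str()) {
                done.insert(name.to_string());
            }
        }
    }
    done
}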
/// Spawn a single enrichment task with panic isolation (GENERIC VERSION)
|
||||||
|
///
|
||||||
|
/// This generic version accepts an enrichment function as a parameter,
|
||||||
|
/// allowing it to be reused for events, options, charts, or any other enrichment type.
|
||||||
|
///
|
||||||
|
/// # Parameters
|
||||||
|
/// - `company`: The company to enrich
|
||||||
|
/// - `yahoo_pool`: Yahoo API client pool
|
||||||
|
/// - `paths`: Data paths
|
||||||
|
/// - `processed_count`: Counter for processed companies
|
||||||
|
/// - `success_count`: Counter for successful enrichments
|
||||||
|
/// - `failed_count`: Counter for failed enrichments
|
||||||
|
/// - `log_tx`: Channel to send log commands
|
||||||
|
/// - `semaphore`: Semaphore for concurrency control
|
||||||
|
/// - `shutdown_flag`: Flag to signal shutdown
|
||||||
|
/// - `enrichment_fn`: The specific enrichment function to call (events, option, chart, etc.)
|
||||||
|
fn spawn_enrichment_task(
|
||||||
|
company: CompanyCrossPlatformInfo,
|
||||||
|
yahoo_pool: Arc<YahooClientPool>,
|
||||||
|
paths: DataPaths,
|
||||||
|
processed_count: Arc<AtomicUsize>,
|
||||||
|
success_count: Arc<AtomicUsize>,
|
||||||
|
failed_count: Arc<AtomicUsize>,
|
||||||
|
log_tx: mpsc::Sender<LogCommand>,
|
||||||
|
semaphore: Arc<tokio::sync::Semaphore>,
|
||||||
|
shutdown_flag: Arc<AtomicBool>,
|
||||||
|
enrichment_fn: EnrichmentFn,
|
||||||
|
) -> tokio::task::JoinHandle<()> {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
// Acquire semaphore permit
|
||||||
|
let _permit = semaphore.acquire().await.expect("Semaphore closed");
|
||||||
|
|
||||||
|
// Check shutdown before processing
|
||||||
|
if shutdown_flag.load(Ordering::SeqCst) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call the enrichment function (this is where the type-specific logic happens)
|
||||||
|
let result = enrichment_fn(company.clone(), Arc::clone(&yahoo_pool), paths).await;
|
||||||
|
|
||||||
|
// Update counters
|
||||||
|
processed_count.fetch_add(1, Ordering::SeqCst);
|
||||||
|
|
||||||
|
let status = match result {
|
||||||
|
Ok(_) => {
|
||||||
|
success_count.fetch_add(1, Ordering::SeqCst);
|
||||||
|
"enriched"
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
failed_count.fetch_add(1, Ordering::SeqCst);
|
||||||
|
logger::log_warn(&format!(
|
||||||
|
" Failed to enrich {}: {}",
|
||||||
|
company.name, e
|
||||||
|
)).await;
|
||||||
|
"failed"
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Log result
|
||||||
|
let log_entry = json!({
|
||||||
|
"company_name": company.name,
|
||||||
|
"status": status,
|
||||||
|
"timestamp": Utc::now().to_rfc3339(),
|
||||||
|
});
|
||||||
|
|
||||||
|
let _ = log_tx.send(LogCommand::Write(log_entry)).await;
|
||||||
|
})
|
||||||
}
|
}
|
||||||
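One observation on the "task panic isolation" promise: the driver loops discard each task's join result (`while let Some(_result) = tasks.next().await`), so a panicking enrichment task surfaces only as an ignored `JoinError`. That company is neither counted nor logged, but the run continues, which is exactly the isolation the doc comments describe.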
@@ -1,739 +0,0 @@
|
|||||||
// src/corporate/update_companies_enrich_option_chart.rs
|
|
||||||
use super::{types::*, helpers::*};
|
|
||||||
use crate::config::Config;
|
|
||||||
use crate::corporate::checkpoint_helpers;
|
|
||||||
use crate::util::directories::DataPaths;
|
|
||||||
use crate::util::integrity::{DataStage, StateManager, directory_reference};
|
|
||||||
use crate::util::logger;
|
|
||||||
use crate::scraper::yahoo::{YahooClientPool};
|
|
||||||
|
|
||||||
use std::result::Result::Ok;
|
|
||||||
use chrono::{TimeZone, Utc};
|
|
||||||
use std::collections::{HashSet};
|
|
||||||
use std::sync::Arc;
|
|
||||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
|
||||||
use tokio::fs::{OpenOptions};
|
|
||||||
use tokio::io::{AsyncWriteExt};
|
|
||||||
use futures::stream::{FuturesUnordered, StreamExt};
|
|
||||||
use serde_json::json;
|
|
||||||
use tokio::sync::mpsc;
|
|
||||||
|
|
||||||
/// Yahoo Option enrichment per corporate company
|
|
||||||
///
|
|
||||||
/// # Features
|
|
||||||
/// - Graceful shutdown (abort-safe)
|
|
||||||
/// - Task panic isolation (tasks fail independently)
|
|
||||||
/// - Crash-safe persistence (checkpoint + log with fsync)
|
|
||||||
/// - Smart skip logic (only process incomplete data)
|
|
||||||
/// - Uses pending queue instead of retry mechanism
|
|
||||||
/// - Content integrity validation with hash tracking
|
|
||||||
///
|
|
||||||
/// # Persistence Strategy
|
|
||||||
/// - Checkpoint: companies_yahoo_cleaned.jsonl (atomic state)
|
|
||||||
/// - Log: companies_option_updates.log (append-only updates)
|
|
||||||
/// - On restart: Load checkpoint + replay log
|
|
||||||
/// - Periodic checkpoints (every 50 companies)
|
|
||||||
/// - Batched fsync (every 10 writes or 10 seconds)
|
|
||||||
/// - Hash validation of all option data directories
|
|
||||||
pub async fn enrich_companies_with_option(
|
|
||||||
paths: &DataPaths,
|
|
||||||
_config: &Config,
|
|
||||||
yahoo_pool: Arc<YahooClientPool>,
|
|
||||||
shutdown_flag: &Arc<AtomicBool>,
|
|
||||||
) -> anyhow::Result<usize> {
|
|
||||||
// Configuration constants
|
|
||||||
const CHECKPOINT_INTERVAL: usize = 50;
|
|
||||||
const FSYNC_BATCH_SIZE: usize = 10;
|
|
||||||
const FSYNC_INTERVAL_SECS: u64 = 10;
|
|
||||||
const CONCURRENCY_LIMIT: usize = 50;
|
|
||||||
|
|
||||||
let data_path = paths.data_dir();
|
|
||||||
|
|
||||||
// File paths
|
|
||||||
let input_path = data_path.join("companies_yahoo_cleaned.jsonl");
|
|
||||||
let log_path = data_path.join("companies_option_updates.log");
|
|
||||||
let state_path = data_path.join("state.jsonl");
|
|
||||||
|
|
||||||
// Check input exists
|
|
||||||
if !input_path.exists() {
|
|
||||||
logger::log_warn(" companies_yahoo_cleaned.jsonl not found, skipping option enrichment").await;
|
|
||||||
return Ok(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
let manager = StateManager::new(&state_path, &data_path.to_path_buf())?;
|
|
||||||
let step_name = "yahoo_option_enrichment_complete";
|
|
||||||
|
|
||||||
if manager.is_step_valid(step_name).await? {
|
|
||||||
logger::log_info(" Yahoo option enrichment already completed and valid").await;
|
|
||||||
let count = checkpoint_helpers::count_enriched_companies(paths, "option").await?;
|
|
||||||
logger::log_info(&format!(" ✓ Found {} companies with valid option data", count)).await;
|
|
||||||
return Ok(count);
|
|
||||||
}
|
|
||||||
|
|
||||||
logger::log_info(" Option data needs refresh - starting enrichment").await;
|
|
||||||
|
|
||||||
// === RECOVERY PHASE: Track enriched companies ===
|
|
||||||
let enriched_companies = checkpoint_helpers::load_enrichment_progress(&log_path).await?;
|
|
||||||
|
|
||||||
// Load all companies from input
|
|
||||||
logger::log_info("Loading companies from companies_yahoo_cleaned.jsonl...").await;
|
|
||||||
let companies = load_companies_from_jsonl(&input_path).await?;
|
|
||||||
let total_companies = companies.len();
|
|
||||||
logger::log_info(&format!("Found {} companies to process", total_companies)).await;
|
|
||||||
|
|
||||||
// Filter companies that need enrichment
|
|
||||||
let pending_companies: Vec<CompanyCrossPlatformInfo> = companies
|
|
||||||
.into_iter()
|
|
||||||
.filter(|company| !enriched_companies.contains(&company.name))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let pending_count = pending_companies.len();
|
|
||||||
logger::log_info(&format!(
|
|
||||||
" {} already enriched, {} pending",
|
|
||||||
enriched_companies.len(),
|
|
||||||
pending_count
|
|
||||||
)).await;
|
|
||||||
|
|
||||||
if pending_count == 0 {
|
|
||||||
logger::log_info(" ✓ All companies already enriched").await;
|
|
||||||
track_option_completion(&manager, paths, step_name).await?;
|
|
||||||
return Ok(enriched_companies.len());
|
|
||||||
}
|
|
||||||
|
|
||||||
// === PROCESSING PHASE: Enrich companies with option ===
|
|
||||||
|
|
||||||
// Shared counters
|
|
||||||
let processed_count = Arc::new(AtomicUsize::new(enriched_companies.len()));
|
|
||||||
let success_count = Arc::new(AtomicUsize::new(enriched_companies.len()));
|
|
||||||
let failed_count = Arc::new(AtomicUsize::new(0));
|
|
||||||
|
|
||||||
// Log writer channel with batching and fsync
|
|
||||||
let (log_tx, mut log_rx) = mpsc::channel::<LogCommand>(1000);
|
|
||||||
|
|
||||||
// Spawn log writer task
|
|
||||||
let log_writer_handle = {
|
|
||||||
let log_path = log_path.clone();
|
|
||||||
let processed_count = Arc::clone(&processed_count);
|
|
||||||
let total_companies = total_companies;
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let mut log_file = OpenOptions::new()
|
|
||||||
.create(true)
|
|
||||||
.append(true)
|
|
||||||
.open(&log_path)
|
|
||||||
.await
|
|
||||||
.expect("Failed to open log file");
|
|
||||||
|
|
||||||
let mut write_count = 0;
|
|
||||||
let mut last_fsync = tokio::time::Instant::now();
|
|
||||||
|
|
||||||
while let Some(cmd) = log_rx.recv().await {
|
|
||||||
match cmd {
|
|
||||||
LogCommand::Write(entry) => {
|
|
||||||
let json_line = serde_json::to_string(&entry).expect("Serialization failed");
|
|
||||||
log_file.write_all(json_line.as_bytes()).await.expect("Write failed");
|
|
||||||
log_file.write_all(b"\n").await.expect("Write failed");
|
|
||||||
|
|
||||||
write_count += 1;
|
|
||||||
|
|
||||||
// Batched fsync
|
|
||||||
if write_count >= FSYNC_BATCH_SIZE
|
|
||||||
|| last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS
|
|
||||||
{
|
|
||||||
log_file.flush().await.expect("Flush failed");
|
|
||||||
log_file.sync_all().await.expect("Fsync failed");
|
|
||||||
write_count = 0;
|
|
||||||
last_fsync = tokio::time::Instant::now();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
LogCommand::Checkpoint => {
|
|
||||||
// Force fsync on checkpoint
|
|
||||||
log_file.flush().await.expect("Flush failed");
|
|
||||||
log_file.sync_all().await.expect("Fsync failed");
|
|
||||||
write_count = 0;
|
|
||||||
last_fsync = tokio::time::Instant::now();
|
|
||||||
|
|
||||||
let current = processed_count.load(Ordering::SeqCst);
|
|
||||||
logger::log_info(&format!(
|
|
||||||
" Checkpoint: {}/{} companies processed",
|
|
||||||
current, total_companies
|
|
||||||
)).await;
|
|
||||||
}
|
|
||||||
LogCommand::Shutdown => {
|
|
||||||
// Final fsync before shutdown
|
|
||||||
log_file.flush().await.expect("Flush failed");
|
|
||||||
log_file.sync_all().await.expect("Fsync failed");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
};
|
|
||||||
|
|
||||||
// Process companies concurrently with task panic isolation
|
|
||||||
let mut tasks = FuturesUnordered::new();
|
|
||||||
let mut pending_iter = pending_companies.into_iter();
|
|
||||||
let semaphore = Arc::new(tokio::sync::Semaphore::new(CONCURRENCY_LIMIT));
|
|
||||||
|
|
||||||
// Initial batch of tasks
|
|
||||||
for _ in 0..CONCURRENCY_LIMIT.min(pending_count) {
|
|
||||||
if let Some(company) = pending_iter.next() {
|
|
||||||
let task = spawn_enrichment_task(
|
|
||||||
company,
|
|
||||||
Arc::clone(&yahoo_pool),
|
|
||||||
paths.clone(),
|
|
||||||
Arc::clone(&processed_count),
|
|
||||||
Arc::clone(&success_count),
|
|
||||||
Arc::clone(&failed_count),
|
|
||||||
log_tx.clone(),
|
|
||||||
Arc::clone(&semaphore),
|
|
||||||
Arc::clone(shutdown_flag),
|
|
||||||
EnrichmentType::Option,
|
|
||||||
);
|
|
||||||
tasks.push(task);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Process tasks as they complete and spawn new ones
|
|
||||||
let mut checkpoint_counter = 0;
|
|
||||||
while let Some(_result) = tasks.next().await {
|
|
||||||
// Check for shutdown
|
|
||||||
if shutdown_flag.load(Ordering::SeqCst) {
|
|
||||||
logger::log_warn("Shutdown signal received, stopping option enrichment").await;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Checkpoint periodically
|
|
||||||
checkpoint_counter += 1;
|
|
||||||
if checkpoint_counter % CHECKPOINT_INTERVAL == 0 {
|
|
||||||
let _ = log_tx.send(LogCommand::Checkpoint).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Spawn next task if available
|
|
||||||
if let Some(company) = pending_iter.next() {
|
|
||||||
let task = spawn_enrichment_task(
|
|
||||||
company,
|
|
||||||
Arc::clone(&yahoo_pool),
|
|
||||||
paths.clone(),
|
|
||||||
Arc::clone(&processed_count),
|
|
||||||
Arc::clone(&success_count),
|
|
||||||
Arc::clone(&failed_count),
|
|
||||||
log_tx.clone(),
|
|
||||||
Arc::clone(&semaphore),
|
|
||||||
Arc::clone(shutdown_flag),
|
|
||||||
EnrichmentType::Option,
|
|
||||||
);
|
|
||||||
tasks.push(task);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Final checkpoint and shutdown
|
|
||||||
let _ = log_tx.send(LogCommand::Checkpoint).await;
|
|
||||||
let _ = log_tx.send(LogCommand::Shutdown).await;
|
|
||||||
|
|
||||||
// Wait for log writer to finish
|
|
||||||
let _ = log_writer_handle.await;
|
|
||||||
|
|
||||||
let final_success = success_count.load(Ordering::SeqCst);
|
|
||||||
let final_failed = failed_count.load(Ordering::SeqCst);
|
|
||||||
|
|
||||||
logger::log_info(&format!(
|
|
||||||
" Option enrichment: {} succeeded, {} failed",
|
|
||||||
final_success, final_failed
|
|
||||||
)).await;
|
|
||||||
|
|
||||||
// Mark as complete if no shutdown
|
|
||||||
if !shutdown_flag.load(Ordering::SeqCst) {
|
|
||||||
track_option_completion(&manager, paths, step_name).await?;
|
|
||||||
logger::log_info(" ✓ Option enrichment marked as complete with integrity tracking").await;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(final_success)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Yahoo Chart enrichment per corporate company
|
|
||||||
///
|
|
||||||
/// # Features
|
|
||||||
/// - Graceful shutdown (abort-safe)
|
|
||||||
/// - Task panic isolation (tasks fail independently)
|
|
||||||
/// - Crash-safe persistence (checkpoint + log with fsync)
|
|
||||||
/// - Smart skip logic (only process incomplete data)
|
|
||||||
/// - Uses pending queue instead of retry mechanism
|
|
||||||
///
|
|
||||||
/// # Persistence Strategy
|
|
||||||
/// - Checkpoint: companies_yahoo_cleaned.jsonl (atomic state)
|
|
||||||
/// - Log: companies_chart_updates.log (append-only updates)
|
|
||||||
/// - On restart: Load checkpoint + replay log
|
|
||||||
/// - Periodic checkpoints (every 50 companies)
|
|
||||||
/// - Batched fsync (every 10 writes or 10 seconds)
|
|
||||||
pub async fn enrich_companies_with_chart(
    paths: &DataPaths,
    _config: &Config,
    yahoo_pool: Arc<YahooClientPool>,
    shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<usize> {
    // Configuration constants
    const CHECKPOINT_INTERVAL: usize = 50;
    const FSYNC_BATCH_SIZE: usize = 10;
    const FSYNC_INTERVAL_SECS: u64 = 10;
    const CONCURRENCY_LIMIT: usize = 50;

    let data_path = paths.data_dir();

    // File paths
    let input_path = data_path.join("companies_yahoo_cleaned.jsonl");
    let log_path = data_path.join("companies_chart_updates.log");
    let state_path = data_path.join("state.jsonl");

    // Check input exists
    if !input_path.exists() {
        logger::log_warn(" companies_yahoo_cleaned.jsonl not found, skipping chart enrichment").await;
        return Ok(0);
    }

    let manager = StateManager::new(&state_path, &data_path.to_path_buf())?;
    let step_name = "yahoo_chart_enrichment_complete";

    if manager.is_step_valid(step_name).await? {
        logger::log_info(" Yahoo chart enrichment already completed and valid").await;
        let count = checkpoint_helpers::count_enriched_companies(paths, "chart").await?;
        logger::log_info(&format!(" ✓ Found {} companies with valid chart data", count)).await;
        return Ok(count);
    }

    logger::log_info(" Chart data needs refresh - starting enrichment").await;

    // === RECOVERY PHASE: Track enriched companies ===
    let mut enriched_companies: HashSet<String> = HashSet::new();

    if log_path.exists() {
        logger::log_info("Loading chart enrichment progress from log...").await;
        let log_content = tokio::fs::read_to_string(&log_path).await?;

        for line in log_content.lines() {
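            // Guard against torn writes: a line without a closing '}' was
            // interrupted mid-write by a crash, so skip it during replay.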
            if line.trim().is_empty() || !line.ends_with('}') {
                continue; // Skip incomplete lines
            }

            match serde_json::from_str::<serde_json::Value>(line) {
                Ok(entry) => {
                    if let Some(name) = entry.get("company_name").and_then(|v| v.as_str()) {
                        if entry.get("status").and_then(|v| v.as_str()) == Some("enriched") {
                            enriched_companies.insert(name.to_string());
                        }
                    }
                }
                Err(e) => {
                    logger::log_warn(&format!("Skipping invalid log line: {}", e)).await;
                }
            }
        }
        logger::log_info(&format!("Loaded {} enriched companies from log", enriched_companies.len())).await;
    }

    // Load all companies from input
    logger::log_info("Loading companies from companies_yahoo_cleaned.jsonl...").await;
    let companies = load_companies_from_jsonl(&input_path).await?;
    let total_companies = companies.len();
    logger::log_info(&format!("Found {} companies to process", total_companies)).await;

    // Filter companies that need enrichment
    let pending_companies: Vec<CompanyCrossPlatformInfo> = companies
        .into_iter()
        .filter(|company| !enriched_companies.contains(&company.name))
        .collect();

    let pending_count = pending_companies.len();
    logger::log_info(&format!(
        " {} already enriched, {} pending",
        enriched_companies.len(),
        pending_count
    )).await;

    if pending_count == 0 {
        logger::log_info(" ✓ All companies already enriched").await;
        track_chart_completion(&manager, paths, step_name).await?;
        return Ok(enriched_companies.len());
    }

    // === PROCESSING PHASE: Enrich companies with chart ===

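    // Seed the counters from the recovered log so progress numbers stay
    // cumulative across restarts.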
    // Shared counters
    let processed_count = Arc::new(AtomicUsize::new(enriched_companies.len()));
    let success_count = Arc::new(AtomicUsize::new(enriched_companies.len()));
    let failed_count = Arc::new(AtomicUsize::new(0));

    // Log writer channel with batching and fsync
    let (log_tx, mut log_rx) = mpsc::channel::<LogCommand>(1000);

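    // A single writer task owns the log file handle, so concurrent
    // enrichment tasks can never interleave partial lines in the log.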
    // Spawn log writer task
    let log_writer_handle = {
        let log_path = log_path.clone();
        let processed_count = Arc::clone(&processed_count);
        let total_companies = total_companies;

        tokio::spawn(async move {
            let mut log_file = OpenOptions::new()
                .create(true)
                .append(true)
                .open(&log_path)
                .await
                .expect("Failed to open log file");

            let mut write_count = 0;
            let mut last_fsync = tokio::time::Instant::now();

            while let Some(cmd) = log_rx.recv().await {
                match cmd {
                    LogCommand::Write(entry) => {
                        let json_line = serde_json::to_string(&entry).expect("Serialization failed");
                        log_file.write_all(json_line.as_bytes()).await.expect("Write failed");
                        log_file.write_all(b"\n").await.expect("Write failed");

                        write_count += 1;

                        // Batched fsync
                        if write_count >= FSYNC_BATCH_SIZE
                            || last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS
                        {
                            log_file.flush().await.expect("Flush failed");
                            log_file.sync_all().await.expect("Fsync failed");
                            write_count = 0;
                            last_fsync = tokio::time::Instant::now();
                        }
                    }
                    LogCommand::Checkpoint => {
                        // Force fsync on checkpoint
                        log_file.flush().await.expect("Flush failed");
                        log_file.sync_all().await.expect("Fsync failed");
                        write_count = 0;
                        last_fsync = tokio::time::Instant::now();

                        let current = processed_count.load(Ordering::SeqCst);
                        logger::log_info(&format!(
                            " Checkpoint: {}/{} companies processed",
                            current, total_companies
                        )).await;
                    }
                    LogCommand::Shutdown => {
                        // Final fsync before shutdown
                        log_file.flush().await.expect("Flush failed");
                        log_file.sync_all().await.expect("Fsync failed");
                        break;
                    }
                }
            }
        })
    };

    // Process companies concurrently with task panic isolation
    let mut tasks = FuturesUnordered::new();
    let mut pending_iter = pending_companies.into_iter();
    let semaphore = Arc::new(tokio::sync::Semaphore::new(CONCURRENCY_LIMIT));

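    // Sliding window: keep at most CONCURRENCY_LIMIT tasks in flight and
    // top the set back up as each one completes.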
    // Initial batch of tasks
    for _ in 0..CONCURRENCY_LIMIT.min(pending_count) {
        if let Some(company) = pending_iter.next() {
            let task = spawn_enrichment_task(
                company,
                Arc::clone(&yahoo_pool),
                paths.clone(),
                Arc::clone(&processed_count),
                Arc::clone(&success_count),
                Arc::clone(&failed_count),
                log_tx.clone(),
                Arc::clone(&semaphore),
                Arc::clone(shutdown_flag),
                EnrichmentType::Chart,
            );
            tasks.push(task);
        }
    }

    // Process tasks as they complete and spawn new ones
    let mut checkpoint_counter = 0;
    while let Some(_result) = tasks.next().await {
        // Check for shutdown
        if shutdown_flag.load(Ordering::SeqCst) {
            logger::log_warn("Shutdown signal received, stopping chart enrichment").await;
            break;
        }

        // Checkpoint periodically
        checkpoint_counter += 1;
        if checkpoint_counter % CHECKPOINT_INTERVAL == 0 {
            let _ = log_tx.send(LogCommand::Checkpoint).await;
        }

        // Spawn next task if available
        if let Some(company) = pending_iter.next() {
            let task = spawn_enrichment_task(
                company,
                Arc::clone(&yahoo_pool),
                paths.clone(),
                Arc::clone(&processed_count),
                Arc::clone(&success_count),
                Arc::clone(&failed_count),
                log_tx.clone(),
                Arc::clone(&semaphore),
                Arc::clone(shutdown_flag),
                EnrichmentType::Chart,
            );
            tasks.push(task);
        }
    }

    // Final checkpoint and shutdown
    let _ = log_tx.send(LogCommand::Checkpoint).await;
    let _ = log_tx.send(LogCommand::Shutdown).await;

    // Wait for log writer to finish
    let _ = log_writer_handle.await;

    let final_success = success_count.load(Ordering::SeqCst);
    let final_failed = failed_count.load(Ordering::SeqCst);

    logger::log_info(&format!(
        " Chart enrichment: {} succeeded, {} failed",
        final_success, final_failed
    )).await;

    // Mark as complete if no shutdown
    if !shutdown_flag.load(Ordering::SeqCst) {
        track_chart_completion(&manager, paths, step_name).await?;
        logger::log_info(" ✓ Chart enrichment marked as complete with integrity tracking").await;
    }

    Ok(final_success)
}

/// Type of enrichment being performed
#[derive(Clone, Copy)]
enum EnrichmentType {
    Option,
    Chart,
}

/// Spawn an enrichment task with panic isolation
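///
/// Each company runs in its own spawned task; a panic inside the task is
/// captured by its `JoinHandle`, so one failure cannot take down the
/// scheduling loop or sibling tasks.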
fn spawn_enrichment_task(
    company: CompanyCrossPlatformInfo,
    yahoo_pool: Arc<YahooClientPool>,
    paths: DataPaths,
    processed_count: Arc<AtomicUsize>,
    success_count: Arc<AtomicUsize>,
    failed_count: Arc<AtomicUsize>,
    log_tx: mpsc::Sender<LogCommand>,
    semaphore: Arc<tokio::sync::Semaphore>,
    shutdown_flag: Arc<AtomicBool>,
    enrichment_type: EnrichmentType,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        // Acquire semaphore permit
        let _permit = semaphore.acquire().await.expect("Semaphore closed");

        // Check shutdown before processing
        if shutdown_flag.load(Ordering::SeqCst) {
            return;
        }

        // Perform enrichment (panic-isolated)
        let result = match enrichment_type {
            EnrichmentType::Option => {
                enrich_company_with_option(&company, &yahoo_pool, &paths).await
            }
            EnrichmentType::Chart => {
                enrich_company_with_chart(&company, &yahoo_pool, &paths).await
            }
        };

        // Update counters
        processed_count.fetch_add(1, Ordering::SeqCst);

        let status = match result {
            Ok(_) => {
                success_count.fetch_add(1, Ordering::SeqCst);
                "enriched"
            }
            Err(e) => {
                failed_count.fetch_add(1, Ordering::SeqCst);
                logger::log_warn(&format!(
                    " Failed to enrich {}: {}",
                    company.name, e
                )).await;
                "failed"
            }
        };

        // Log result
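        // This shape must stay in sync with the recovery parser above,
        // which keys on the `company_name` and `status` fields.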
        let log_entry = json!({
            "company_name": company.name,
            "status": status,
            "timestamp": Utc::now().to_rfc3339(),
        });

        let _ = log_tx.send(LogCommand::Write(log_entry)).await;
    })
}

/// Enrich a single company with option data
async fn enrich_company_with_option(
    company: &CompanyCrossPlatformInfo,
    yahoo_pool: &Arc<YahooClientPool>,
    paths: &DataPaths,
) -> anyhow::Result<()> {
    let ticker = match extract_first_yahoo_ticker(company) {
        Some(t) => t,
        None => {
            return Err(anyhow::anyhow!("No valid Yahoo ticker found"));
        }
    };

    // Get option data for all available expiration dates
    let option_data = yahoo_pool.get_option_data(&ticker, None).await?;

    // Only save if we got meaningful data
    if option_data.option.is_empty() {
        return Err(anyhow::anyhow!("No option data available"));
    }

    // Save the option data
    save_company_data(paths, &company.name, &option_data, "option").await?;

    Ok(())
}

/// Enrich a single company with chart data
async fn enrich_company_with_chart(
    company: &CompanyCrossPlatformInfo,
    yahoo_pool: &Arc<YahooClientPool>,
    paths: &DataPaths,
) -> anyhow::Result<()> {
    let ticker = match extract_first_yahoo_ticker(company) {
        Some(t) => t,
        None => {
            return Err(anyhow::anyhow!("No valid Yahoo ticker found"));
        }
    };

    // Get the full daily chart history since 2000-01-01
    // (range bounds are Unix-epoch seconds)
    let now = chrono::Utc::now().timestamp();
    let start = chrono::Utc
        .with_ymd_and_hms(2000, 1, 1, 0, 0, 0)
        .unwrap()
        .timestamp();

    let chart_data = yahoo_pool.get_chart_data(&ticker, "1d", start, now).await?;

    // Only save if we got meaningful data
    if chart_data.quotes.is_empty() {
        return Err(anyhow::anyhow!("No chart data available"));
    }

    // Save the chart data
    save_company_data(paths, &company.name, &chart_data, "chart").await?;

    Ok(())
}

/// Save data to company directory
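///
/// Resulting layout (illustrative; the sanitized directory name is hypothetical):
/// `<corporate_dir>/ACME_Corp/<data_type>/data.jsonl`
///
/// Note: `File::create` truncates an existing file, so each run has
/// last-writer-wins semantics per company and data type.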
async fn save_company_data<T: serde::Serialize>(
    paths: &DataPaths,
    company_name: &str,
    data: &T,
    data_type: &str,
) -> anyhow::Result<()> {
    use tokio::fs;

    let safe_name = sanitize_company_name(company_name);

    let company_dir = paths.corporate_dir().join(&safe_name).join(data_type);
    fs::create_dir_all(&company_dir).await?;

    let data_path = company_dir.join("data.jsonl");
    let json_line = serde_json::to_string(data)?;

    let mut file = fs::File::create(&data_path).await?;
    file.write_all(json_line.as_bytes()).await?;
    file.write_all(b"\n").await?;
    file.flush().await?;
    file.sync_all().await?; // Ensure data is persisted

    Ok(())
}

/// Commands accepted by the dedicated log-writer task
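///
/// `Write` appends one JSON line; `Checkpoint` forces a flush + fsync and
/// logs progress; `Shutdown` performs a final fsync and stops the writer.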
enum LogCommand {
    Write(serde_json::Value),
    Checkpoint,
    Shutdown,
}

/// Track option enrichment completion with content hash verification
async fn track_option_completion(
    manager: &StateManager,
    paths: &DataPaths,
    step_name: &str,
) -> anyhow::Result<()> {
    // Create content reference for all option data
    // This will hash ALL files matching the pattern: {company}/option/data.jsonl
    let content_reference = directory_reference(
        paths.corporate_dir(),
        Some(vec![
            "*/option/*.jsonl".to_string(),    // Main pattern for option data
            "*/option/data.jsonl".to_string(), // Specific pattern (more precise)
        ]),
        Some(vec![
            "*.log".to_string(), // Exclude log files
            "*.tmp".to_string(), // Exclude temp files
            "*.bak".to_string(), // Exclude backup files
        ]),
    );

    // Track completion with:
    // - Content reference: All option directories
    // - Data stage: Data (7-day TTL by default)
    // - Dependencies: Depends on cleaned companies data
    manager.update_entry(
        step_name.to_string(),
        content_reference,
        DataStage::Data,
        None, // Use default TTL (7 days for Data stage)
    ).await?;

    Ok(())
}

/// Track chart enrichment completion with content hash verification
async fn track_chart_completion(
    manager: &StateManager,
    paths: &DataPaths,
    step_name: &str,
) -> anyhow::Result<()> {
    // Create content reference for all chart data
    // This will hash ALL files matching the pattern: {company}/chart/data.jsonl
    let content_reference = directory_reference(
        paths.corporate_dir(),
        Some(vec![
            "*/chart/*.jsonl".to_string(),    // Main pattern for chart data
            "*/chart/data.jsonl".to_string(), // Specific pattern (more precise)
        ]),
        Some(vec![
            "*.log".to_string(), // Exclude log files
            "*.tmp".to_string(), // Exclude temp files
            "*.bak".to_string(), // Exclude backup files
        ]),
    );

    // Track completion with:
    // - Content reference: All chart directories
    // - Data stage: Data (7-day TTL by default)
    // - Dependencies: Depends on cleaned companies data
    manager.update_entry(
        step_name.to_string(),
        content_reference,
        DataStage::Data,
        None, // Use default TTL (7 days for Data stage)
    ).await?;

    Ok(())
}
@@ -8,7 +8,30 @@ const EXTRACTION_JS: &str = include_str!("extraction_script.js");

 pub async fn goto_and_prepare(client: &Client) -> anyhow::Result<()> {
     client.goto("https://www.finanzen.net/termine/wirtschaftsdaten/").await?;
+    dismiss_overlays(client).await?;
     Ok(())
 }
+
+pub async fn dismiss_overlays(client: &Client) -> anyhow::Result<()> {
+    for _ in 0..10 {
+        let removed: bool = client
+            .execute(
+                r#"(() => {
+                    const iframe = document.querySelector('iframe[title="Contentpass First Layer"]');
+                    if (iframe && iframe.parentNode) {
+                        iframe.parentNode.removeChild(iframe);
+                        return true;
+                    }
+                    return false;
+                })()"#,
+                vec![],
+            )
+            .await?
+            .as_bool()
+            .unwrap_or(false);
+        if removed { break; }
+        sleep(Duration::from_millis(500)).await;
+    }
+    Ok(())
+}
30
src/main.rs
@@ -252,12 +252,28 @@ async fn create_state_files(paths: &DataPaths) -> Result<()> {
     Ok(())
 }

-fn visualize_checkpoint_dependencies(paths: &DataPaths) -> Result<()> {
-    let manager = StateManager::new(&paths.data_dir().join("state.jsonl"), &paths.data_dir().to_path_buf())?;
-    manager.print_dependency_graph();
-    let dot = manager.get_dependency_config().to_dot();
-    std::fs::write(paths.logs_dir().join("checkpoint_dependencies.dot"), dot)?;
-    Ok(())
+async fn visualize_checkpoint_dependencies(paths: &DataPaths) -> Result<()> {
+    // Add more detailed error handling
+    match StateManager::new(
+        &paths.data_dir().join("state.jsonl"),
+        &paths.data_dir().to_path_buf()
+    ) {
+        Ok(manager) => {
+            logger::log_info("✓ Dependency configuration loaded successfully").await;
+            manager.print_dependency_graph();
+
+            let dot = manager.get_dependency_config().to_dot();
+            let dot_path = paths.logs_dir().join("checkpoint_dependencies.dot");
+            std::fs::write(&dot_path, dot)?;
+
+            logger::log_info(&format!("✓ DOT file written to: {}", dot_path.display())).await;
+            Ok(())
+        }
+        Err(e) => {
+            logger::log_error(&format!("✗ Failed to load dependency config: {}", e)).await;
+            Err(e)
+        }
+    }
 }

 // ============================================================================
@@ -273,7 +289,7 @@ async fn main() -> Result<()> {
     start_docker_desktop().await;
     cleanup_all_proxy_containers().await.ok();
     create_state_files(&paths).await.ok();
-    visualize_checkpoint_dependencies(&paths).ok();
+    visualize_checkpoint_dependencies(&paths).await.ok();

     let config = Config::load().unwrap_or_else(|_| {
         eprintln!("Using default configuration");