added options chart enrichment
This commit is contained in:
@@ -15,5 +15,6 @@ pub mod update;
|
||||
pub mod update_companies;
|
||||
pub mod update_companies_cleanse;
|
||||
pub mod update_companies_enrich;
|
||||
pub mod update_companies_enrich_options_chart;
|
||||
|
||||
pub use update::run_full_update;
|
||||
@@ -4,6 +4,7 @@ use crate::config::Config;
|
||||
use crate::corporate::update_companies::build_companies_jsonl_streaming_parallel;
|
||||
use crate::corporate::update_companies_cleanse::{companies_yahoo_cleansed_low_profile, companies_yahoo_cleansed_no_data};
|
||||
use crate::corporate::update_companies_enrich::enrich_companies_with_events;
|
||||
use crate::corporate::update_companies_enrich_options_chart::{enrich_companies_with_options, enrich_companies_with_chart};
|
||||
use crate::util::directories::DataPaths;
|
||||
use crate::util::logger;
|
||||
use crate::scraper::webdriver::ChromeDriverPool;
|
||||
@@ -122,8 +123,26 @@ pub async fn run_full_update(
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
logger::log_info("Step 9: Enriching companies with Yahoo Options (with abort-safe persistence)...").await;
|
||||
let options_count = enrich_companies_with_options(&paths, config, yahoo_pool.clone(), shutdown_flag).await?;
|
||||
logger::log_info(&format!(" ✓ {} companies enriched with options data", options_count)).await;
|
||||
|
||||
if shutdown_flag.load(Ordering::SeqCst) {
|
||||
logger::log_warn("Shutdown detected after options enrichment").await;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
logger::log_info("Step 10: Enriching companies with Yahoo Chart (with abort-safe persistence)...").await;
|
||||
let chart_count = enrich_companies_with_chart(&paths, config, yahoo_pool.clone(), shutdown_flag).await?;
|
||||
logger::log_info(&format!(" ✓ {} companies enriched with chart data", chart_count)).await;
|
||||
|
||||
if shutdown_flag.load(Ordering::SeqCst) {
|
||||
logger::log_warn("Shutdown detected after chart enrichment").await;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if !shutdown_flag.load(Ordering::SeqCst) {
|
||||
logger::log_info("Step 9: Processing events (using index)...").await;
|
||||
logger::log_info("Step 11: Processing events (using index)...").await;
|
||||
let _event_index = build_event_index(&paths).await?;
|
||||
logger::log_info(" ✓ Event index built").await;
|
||||
} else {
|
||||
|
||||
798
src/corporate/update_companies_enrich_options_chart.rs
Normal file
798
src/corporate/update_companies_enrich_options_chart.rs
Normal file
@@ -0,0 +1,798 @@
|
||||
// src/corporate/update_companies_enrich_options_chart.rs
|
||||
use super::{types::*};
|
||||
use crate::config::Config;
|
||||
use crate::util::directories::DataPaths;
|
||||
use crate::util::logger;
|
||||
use crate::scraper::yahoo::{YahooClientPool};
|
||||
|
||||
use std::result::Result::Ok;
|
||||
use chrono::Utc;
|
||||
use std::collections::{HashSet};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use tokio::fs::{OpenOptions};
|
||||
use tokio::io::{AsyncWriteExt};
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use serde_json::json;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
/// Yahoo Options enrichment per corporate company
|
||||
///
|
||||
/// # Features
|
||||
/// - Graceful shutdown (abort-safe)
|
||||
/// - Task panic isolation (tasks fail independently)
|
||||
/// - Crash-safe persistence (checkpoint + log with fsync)
|
||||
/// - Smart skip logic (only process incomplete data)
|
||||
/// - Uses pending queue instead of retry mechanism
|
||||
///
|
||||
/// # Persistence Strategy
|
||||
/// - Checkpoint: companies_yahoo_cleaned.jsonl (atomic state)
|
||||
/// - Log: companies_options_updates.log (append-only updates)
|
||||
/// - On restart: Load checkpoint + replay log
|
||||
/// - Periodic checkpoints (every 50 companies)
|
||||
/// - Batched fsync (every 10 writes or 10 seconds)
|
||||
pub async fn enrich_companies_with_options(
|
||||
paths: &DataPaths,
|
||||
_config: &Config,
|
||||
yahoo_pool: Arc<YahooClientPool>,
|
||||
shutdown_flag: &Arc<AtomicBool>,
|
||||
) -> anyhow::Result<usize> {
|
||||
// Configuration constants
|
||||
const CHECKPOINT_INTERVAL: usize = 50;
|
||||
const FSYNC_BATCH_SIZE: usize = 10;
|
||||
const FSYNC_INTERVAL_SECS: u64 = 10;
|
||||
const CONCURRENCY_LIMIT: usize = 50;
|
||||
|
||||
let data_path = paths.data_dir();
|
||||
|
||||
// File paths
|
||||
let input_path = data_path.join("companies_yahoo_cleaned.jsonl");
|
||||
let log_path = data_path.join("companies_options_updates.log");
|
||||
let state_path = data_path.join("state.jsonl");
|
||||
|
||||
// Check input exists
|
||||
if !input_path.exists() {
|
||||
logger::log_warn(" companies_yahoo_cleaned.jsonl not found, skipping options enrichment").await;
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
// Check if already completed
|
||||
if state_path.exists() {
|
||||
let state_content = tokio::fs::read_to_string(&state_path).await?;
|
||||
|
||||
for line in state_content.lines() {
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Ok(state) = serde_json::from_str::<serde_json::Value>(line) {
|
||||
if state.get("yahoo_options_enrichment_complete").and_then(|v| v.as_bool()).unwrap_or(false) {
|
||||
logger::log_info(" Yahoo options enrichment already completed").await;
|
||||
|
||||
// Count enriched companies
|
||||
let count = count_enriched_companies(paths, "options").await?;
|
||||
logger::log_info(&format!(" ✓ Found {} companies with options data", count)).await;
|
||||
return Ok(count);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// === RECOVERY PHASE: Track enriched companies ===
|
||||
let mut enriched_companies: HashSet<String> = HashSet::new();
|
||||
|
||||
if log_path.exists() {
|
||||
logger::log_info("Loading options enrichment progress from log...").await;
|
||||
let log_content = tokio::fs::read_to_string(&log_path).await?;
|
||||
|
||||
for line in log_content.lines() {
|
||||
if line.trim().is_empty() || !line.ends_with('}') {
|
||||
continue; // Skip incomplete lines
|
||||
}
|
||||
|
||||
match serde_json::from_str::<serde_json::Value>(line) {
|
||||
Ok(entry) => {
|
||||
if let Some(name) = entry.get("company_name").and_then(|v| v.as_str()) {
|
||||
if entry.get("status").and_then(|v| v.as_str()) == Some("enriched") {
|
||||
enriched_companies.insert(name.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
logger::log_warn(&format!("Skipping invalid log line: {}", e)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
logger::log_info(&format!("Loaded {} enriched companies from log", enriched_companies.len())).await;
|
||||
}
|
||||
|
||||
// Load all companies from input
|
||||
logger::log_info("Loading companies from companies_yahoo_cleaned.jsonl...").await;
|
||||
let companies = load_companies_from_jsonl(&input_path).await?;
|
||||
let total_companies = companies.len();
|
||||
logger::log_info(&format!("Found {} companies to process", total_companies)).await;
|
||||
|
||||
// Filter companies that need enrichment
|
||||
let pending_companies: Vec<CompanyCrossPlatformInfo> = companies
|
||||
.into_iter()
|
||||
.filter(|company| !enriched_companies.contains(&company.name))
|
||||
.collect();
|
||||
|
||||
let pending_count = pending_companies.len();
|
||||
logger::log_info(&format!(
|
||||
" {} already enriched, {} pending",
|
||||
enriched_companies.len(),
|
||||
pending_count
|
||||
)).await;
|
||||
|
||||
if pending_count == 0 {
|
||||
logger::log_info(" ✓ All companies already enriched with options data").await;
|
||||
mark_enrichment_complete(&state_path, "yahoo_options_enrichment_complete").await?;
|
||||
return Ok(enriched_companies.len());
|
||||
}
|
||||
|
||||
// === PROCESSING PHASE: Enrich companies with options ===
|
||||
|
||||
// Shared counters
|
||||
let processed_count = Arc::new(AtomicUsize::new(enriched_companies.len()));
|
||||
let success_count = Arc::new(AtomicUsize::new(enriched_companies.len()));
|
||||
let failed_count = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
// Log writer channel with batching and fsync
|
||||
let (log_tx, mut log_rx) = mpsc::channel::<LogCommand>(1000);
|
||||
|
||||
// Spawn log writer task
|
||||
let log_writer_handle = {
|
||||
let log_path = log_path.clone();
|
||||
let processed_count = Arc::clone(&processed_count);
|
||||
let total_companies = total_companies;
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut log_file = OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(&log_path)
|
||||
.await
|
||||
.expect("Failed to open log file");
|
||||
|
||||
let mut write_count = 0;
|
||||
let mut last_fsync = tokio::time::Instant::now();
|
||||
|
||||
while let Some(cmd) = log_rx.recv().await {
|
||||
match cmd {
|
||||
LogCommand::Write(entry) => {
|
||||
let json_line = serde_json::to_string(&entry).expect("Serialization failed");
|
||||
log_file.write_all(json_line.as_bytes()).await.expect("Write failed");
|
||||
log_file.write_all(b"\n").await.expect("Write failed");
|
||||
|
||||
write_count += 1;
|
||||
|
||||
// Batched fsync
|
||||
if write_count >= FSYNC_BATCH_SIZE
|
||||
|| last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS
|
||||
{
|
||||
log_file.flush().await.expect("Flush failed");
|
||||
log_file.sync_all().await.expect("Fsync failed");
|
||||
write_count = 0;
|
||||
last_fsync = tokio::time::Instant::now();
|
||||
}
|
||||
}
|
||||
LogCommand::Checkpoint => {
|
||||
// Force fsync on checkpoint
|
||||
log_file.flush().await.expect("Flush failed");
|
||||
log_file.sync_all().await.expect("Fsync failed");
|
||||
write_count = 0;
|
||||
last_fsync = tokio::time::Instant::now();
|
||||
|
||||
let current = processed_count.load(Ordering::SeqCst);
|
||||
logger::log_info(&format!(
|
||||
" Checkpoint: {}/{} companies processed",
|
||||
current, total_companies
|
||||
)).await;
|
||||
}
|
||||
LogCommand::Shutdown => {
|
||||
// Final fsync before shutdown
|
||||
log_file.flush().await.expect("Flush failed");
|
||||
log_file.sync_all().await.expect("Fsync failed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
};
|
||||
|
||||
// Process companies concurrently with task panic isolation
|
||||
let mut tasks = FuturesUnordered::new();
|
||||
let mut pending_iter = pending_companies.into_iter();
|
||||
let semaphore = Arc::new(tokio::sync::Semaphore::new(CONCURRENCY_LIMIT));
|
||||
|
||||
// Initial batch of tasks
|
||||
for _ in 0..CONCURRENCY_LIMIT.min(pending_count) {
|
||||
if let Some(company) = pending_iter.next() {
|
||||
let task = spawn_enrichment_task(
|
||||
company,
|
||||
Arc::clone(&yahoo_pool),
|
||||
paths.clone(),
|
||||
Arc::clone(&processed_count),
|
||||
Arc::clone(&success_count),
|
||||
Arc::clone(&failed_count),
|
||||
log_tx.clone(),
|
||||
Arc::clone(&semaphore),
|
||||
Arc::clone(shutdown_flag),
|
||||
EnrichmentType::Options,
|
||||
);
|
||||
tasks.push(task);
|
||||
}
|
||||
}
|
||||
|
||||
// Process tasks as they complete and spawn new ones
|
||||
let mut checkpoint_counter = 0;
|
||||
while let Some(_result) = tasks.next().await {
|
||||
// Check for shutdown
|
||||
if shutdown_flag.load(Ordering::SeqCst) {
|
||||
logger::log_warn("Shutdown signal received, stopping options enrichment").await;
|
||||
break;
|
||||
}
|
||||
|
||||
// Checkpoint periodically
|
||||
checkpoint_counter += 1;
|
||||
if checkpoint_counter % CHECKPOINT_INTERVAL == 0 {
|
||||
let _ = log_tx.send(LogCommand::Checkpoint).await;
|
||||
}
|
||||
|
||||
// Spawn next task if available
|
||||
if let Some(company) = pending_iter.next() {
|
||||
let task = spawn_enrichment_task(
|
||||
company,
|
||||
Arc::clone(&yahoo_pool),
|
||||
paths.clone(),
|
||||
Arc::clone(&processed_count),
|
||||
Arc::clone(&success_count),
|
||||
Arc::clone(&failed_count),
|
||||
log_tx.clone(),
|
||||
Arc::clone(&semaphore),
|
||||
Arc::clone(shutdown_flag),
|
||||
EnrichmentType::Options,
|
||||
);
|
||||
tasks.push(task);
|
||||
}
|
||||
}
|
||||
|
||||
// Final checkpoint and shutdown
|
||||
let _ = log_tx.send(LogCommand::Checkpoint).await;
|
||||
let _ = log_tx.send(LogCommand::Shutdown).await;
|
||||
|
||||
// Wait for log writer to finish
|
||||
let _ = log_writer_handle.await;
|
||||
|
||||
let final_success = success_count.load(Ordering::SeqCst);
|
||||
let final_failed = failed_count.load(Ordering::SeqCst);
|
||||
|
||||
logger::log_info(&format!(
|
||||
" Options enrichment: {} succeeded, {} failed",
|
||||
final_success, final_failed
|
||||
)).await;
|
||||
|
||||
// Mark as complete if no shutdown
|
||||
if !shutdown_flag.load(Ordering::SeqCst) {
|
||||
mark_enrichment_complete(&state_path, "yahoo_options_enrichment_complete").await?;
|
||||
}
|
||||
|
||||
Ok(final_success)
|
||||
}
|
||||
|
||||
/// Yahoo Chart enrichment per corporate company
|
||||
///
|
||||
/// # Features
|
||||
/// - Graceful shutdown (abort-safe)
|
||||
/// - Task panic isolation (tasks fail independently)
|
||||
/// - Crash-safe persistence (checkpoint + log with fsync)
|
||||
/// - Smart skip logic (only process incomplete data)
|
||||
/// - Uses pending queue instead of retry mechanism
|
||||
///
|
||||
/// # Persistence Strategy
|
||||
/// - Checkpoint: companies_yahoo_cleaned.jsonl (atomic state)
|
||||
/// - Log: companies_chart_updates.log (append-only updates)
|
||||
/// - On restart: Load checkpoint + replay log
|
||||
/// - Periodic checkpoints (every 50 companies)
|
||||
/// - Batched fsync (every 10 writes or 10 seconds)
|
||||
pub async fn enrich_companies_with_chart(
|
||||
paths: &DataPaths,
|
||||
_config: &Config,
|
||||
yahoo_pool: Arc<YahooClientPool>,
|
||||
shutdown_flag: &Arc<AtomicBool>,
|
||||
) -> anyhow::Result<usize> {
|
||||
// Configuration constants
|
||||
const CHECKPOINT_INTERVAL: usize = 50;
|
||||
const FSYNC_BATCH_SIZE: usize = 10;
|
||||
const FSYNC_INTERVAL_SECS: u64 = 10;
|
||||
const CONCURRENCY_LIMIT: usize = 50;
|
||||
|
||||
let data_path = paths.data_dir();
|
||||
|
||||
// File paths
|
||||
let input_path = data_path.join("companies_yahoo_cleaned.jsonl");
|
||||
let log_path = data_path.join("companies_chart_updates.log");
|
||||
let state_path = data_path.join("state.jsonl");
|
||||
|
||||
// Check input exists
|
||||
if !input_path.exists() {
|
||||
logger::log_warn(" companies_yahoo_cleaned.jsonl not found, skipping chart enrichment").await;
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
// Check if already completed
|
||||
if state_path.exists() {
|
||||
let state_content = tokio::fs::read_to_string(&state_path).await?;
|
||||
|
||||
for line in state_content.lines() {
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Ok(state) = serde_json::from_str::<serde_json::Value>(line) {
|
||||
if state.get("yahoo_chart_enrichment_complete").and_then(|v| v.as_bool()).unwrap_or(false) {
|
||||
logger::log_info(" Yahoo chart enrichment already completed").await;
|
||||
|
||||
// Count enriched companies
|
||||
let count = count_enriched_companies(paths, "chart").await?;
|
||||
logger::log_info(&format!(" ✓ Found {} companies with chart data", count)).await;
|
||||
return Ok(count);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// === RECOVERY PHASE: Track enriched companies ===
|
||||
let mut enriched_companies: HashSet<String> = HashSet::new();
|
||||
|
||||
if log_path.exists() {
|
||||
logger::log_info("Loading chart enrichment progress from log...").await;
|
||||
let log_content = tokio::fs::read_to_string(&log_path).await?;
|
||||
|
||||
for line in log_content.lines() {
|
||||
if line.trim().is_empty() || !line.ends_with('}') {
|
||||
continue; // Skip incomplete lines
|
||||
}
|
||||
|
||||
match serde_json::from_str::<serde_json::Value>(line) {
|
||||
Ok(entry) => {
|
||||
if let Some(name) = entry.get("company_name").and_then(|v| v.as_str()) {
|
||||
if entry.get("status").and_then(|v| v.as_str()) == Some("enriched") {
|
||||
enriched_companies.insert(name.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
logger::log_warn(&format!("Skipping invalid log line: {}", e)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
logger::log_info(&format!("Loaded {} enriched companies from log", enriched_companies.len())).await;
|
||||
}
|
||||
|
||||
// Load all companies from input
|
||||
logger::log_info("Loading companies from companies_yahoo_cleaned.jsonl...").await;
|
||||
let companies = load_companies_from_jsonl(&input_path).await?;
|
||||
let total_companies = companies.len();
|
||||
logger::log_info(&format!("Found {} companies to process", total_companies)).await;
|
||||
|
||||
// Filter companies that need enrichment
|
||||
let pending_companies: Vec<CompanyCrossPlatformInfo> = companies
|
||||
.into_iter()
|
||||
.filter(|company| !enriched_companies.contains(&company.name))
|
||||
.collect();
|
||||
|
||||
let pending_count = pending_companies.len();
|
||||
logger::log_info(&format!(
|
||||
" {} already enriched, {} pending",
|
||||
enriched_companies.len(),
|
||||
pending_count
|
||||
)).await;
|
||||
|
||||
if pending_count == 0 {
|
||||
logger::log_info(" ✓ All companies already enriched with chart data").await;
|
||||
mark_enrichment_complete(&state_path, "yahoo_chart_enrichment_complete").await?;
|
||||
return Ok(enriched_companies.len());
|
||||
}
|
||||
|
||||
// === PROCESSING PHASE: Enrich companies with chart ===
|
||||
|
||||
// Shared counters
|
||||
let processed_count = Arc::new(AtomicUsize::new(enriched_companies.len()));
|
||||
let success_count = Arc::new(AtomicUsize::new(enriched_companies.len()));
|
||||
let failed_count = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
// Log writer channel with batching and fsync
|
||||
let (log_tx, mut log_rx) = mpsc::channel::<LogCommand>(1000);
|
||||
|
||||
// Spawn log writer task
|
||||
let log_writer_handle = {
|
||||
let log_path = log_path.clone();
|
||||
let processed_count = Arc::clone(&processed_count);
|
||||
let total_companies = total_companies;
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut log_file = OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(&log_path)
|
||||
.await
|
||||
.expect("Failed to open log file");
|
||||
|
||||
let mut write_count = 0;
|
||||
let mut last_fsync = tokio::time::Instant::now();
|
||||
|
||||
while let Some(cmd) = log_rx.recv().await {
|
||||
match cmd {
|
||||
LogCommand::Write(entry) => {
|
||||
let json_line = serde_json::to_string(&entry).expect("Serialization failed");
|
||||
log_file.write_all(json_line.as_bytes()).await.expect("Write failed");
|
||||
log_file.write_all(b"\n").await.expect("Write failed");
|
||||
|
||||
write_count += 1;
|
||||
|
||||
// Batched fsync
|
||||
if write_count >= FSYNC_BATCH_SIZE
|
||||
|| last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS
|
||||
{
|
||||
log_file.flush().await.expect("Flush failed");
|
||||
log_file.sync_all().await.expect("Fsync failed");
|
||||
write_count = 0;
|
||||
last_fsync = tokio::time::Instant::now();
|
||||
}
|
||||
}
|
||||
LogCommand::Checkpoint => {
|
||||
// Force fsync on checkpoint
|
||||
log_file.flush().await.expect("Flush failed");
|
||||
log_file.sync_all().await.expect("Fsync failed");
|
||||
write_count = 0;
|
||||
last_fsync = tokio::time::Instant::now();
|
||||
|
||||
let current = processed_count.load(Ordering::SeqCst);
|
||||
logger::log_info(&format!(
|
||||
" Checkpoint: {}/{} companies processed",
|
||||
current, total_companies
|
||||
)).await;
|
||||
}
|
||||
LogCommand::Shutdown => {
|
||||
// Final fsync before shutdown
|
||||
log_file.flush().await.expect("Flush failed");
|
||||
log_file.sync_all().await.expect("Fsync failed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
};
|
||||
|
||||
// Process companies concurrently with task panic isolation
|
||||
let mut tasks = FuturesUnordered::new();
|
||||
let mut pending_iter = pending_companies.into_iter();
|
||||
let semaphore = Arc::new(tokio::sync::Semaphore::new(CONCURRENCY_LIMIT));
|
||||
|
||||
// Initial batch of tasks
|
||||
for _ in 0..CONCURRENCY_LIMIT.min(pending_count) {
|
||||
if let Some(company) = pending_iter.next() {
|
||||
let task = spawn_enrichment_task(
|
||||
company,
|
||||
Arc::clone(&yahoo_pool),
|
||||
paths.clone(),
|
||||
Arc::clone(&processed_count),
|
||||
Arc::clone(&success_count),
|
||||
Arc::clone(&failed_count),
|
||||
log_tx.clone(),
|
||||
Arc::clone(&semaphore),
|
||||
Arc::clone(shutdown_flag),
|
||||
EnrichmentType::Chart,
|
||||
);
|
||||
tasks.push(task);
|
||||
}
|
||||
}
|
||||
|
||||
// Process tasks as they complete and spawn new ones
|
||||
let mut checkpoint_counter = 0;
|
||||
while let Some(_result) = tasks.next().await {
|
||||
// Check for shutdown
|
||||
if shutdown_flag.load(Ordering::SeqCst) {
|
||||
logger::log_warn("Shutdown signal received, stopping chart enrichment").await;
|
||||
break;
|
||||
}
|
||||
|
||||
// Checkpoint periodically
|
||||
checkpoint_counter += 1;
|
||||
if checkpoint_counter % CHECKPOINT_INTERVAL == 0 {
|
||||
let _ = log_tx.send(LogCommand::Checkpoint).await;
|
||||
}
|
||||
|
||||
// Spawn next task if available
|
||||
if let Some(company) = pending_iter.next() {
|
||||
let task = spawn_enrichment_task(
|
||||
company,
|
||||
Arc::clone(&yahoo_pool),
|
||||
paths.clone(),
|
||||
Arc::clone(&processed_count),
|
||||
Arc::clone(&success_count),
|
||||
Arc::clone(&failed_count),
|
||||
log_tx.clone(),
|
||||
Arc::clone(&semaphore),
|
||||
Arc::clone(shutdown_flag),
|
||||
EnrichmentType::Chart,
|
||||
);
|
||||
tasks.push(task);
|
||||
}
|
||||
}
|
||||
|
||||
// Final checkpoint and shutdown
|
||||
let _ = log_tx.send(LogCommand::Checkpoint).await;
|
||||
let _ = log_tx.send(LogCommand::Shutdown).await;
|
||||
|
||||
// Wait for log writer to finish
|
||||
let _ = log_writer_handle.await;
|
||||
|
||||
let final_success = success_count.load(Ordering::SeqCst);
|
||||
let final_failed = failed_count.load(Ordering::SeqCst);
|
||||
|
||||
logger::log_info(&format!(
|
||||
" Chart enrichment: {} succeeded, {} failed",
|
||||
final_success, final_failed
|
||||
)).await;
|
||||
|
||||
// Mark as complete if no shutdown
|
||||
if !shutdown_flag.load(Ordering::SeqCst) {
|
||||
mark_enrichment_complete(&state_path, "yahoo_chart_enrichment_complete").await?;
|
||||
}
|
||||
|
||||
Ok(final_success)
|
||||
}
|
||||
|
||||
/// Type of enrichment being performed.
///
/// Selects which per-company worker `spawn_enrichment_task` runs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EnrichmentType {
    /// Fetch and store Yahoo options data.
    Options,
    /// Fetch and store Yahoo chart data.
    Chart,
}
|
||||
|
||||
/// Spawn an enrichment task with panic isolation
|
||||
fn spawn_enrichment_task(
|
||||
company: CompanyCrossPlatformInfo,
|
||||
yahoo_pool: Arc<YahooClientPool>,
|
||||
paths: DataPaths,
|
||||
processed_count: Arc<AtomicUsize>,
|
||||
success_count: Arc<AtomicUsize>,
|
||||
failed_count: Arc<AtomicUsize>,
|
||||
log_tx: mpsc::Sender<LogCommand>,
|
||||
semaphore: Arc<tokio::sync::Semaphore>,
|
||||
shutdown_flag: Arc<AtomicBool>,
|
||||
enrichment_type: EnrichmentType,
|
||||
) -> tokio::task::JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
// Acquire semaphore permit
|
||||
let _permit = semaphore.acquire().await.expect("Semaphore closed");
|
||||
|
||||
// Check shutdown before processing
|
||||
if shutdown_flag.load(Ordering::SeqCst) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Perform enrichment (panic-isolated)
|
||||
let result = match enrichment_type {
|
||||
EnrichmentType::Options => {
|
||||
enrich_company_with_options(&company, &yahoo_pool, &paths).await
|
||||
}
|
||||
EnrichmentType::Chart => {
|
||||
enrich_company_with_chart(&company, &yahoo_pool, &paths).await
|
||||
}
|
||||
};
|
||||
|
||||
// Update counters
|
||||
processed_count.fetch_add(1, Ordering::SeqCst);
|
||||
|
||||
let status = match result {
|
||||
Ok(_) => {
|
||||
success_count.fetch_add(1, Ordering::SeqCst);
|
||||
"enriched"
|
||||
}
|
||||
Err(e) => {
|
||||
failed_count.fetch_add(1, Ordering::SeqCst);
|
||||
logger::log_warn(&format!(
|
||||
" Failed to enrich {}: {}",
|
||||
company.name, e
|
||||
)).await;
|
||||
"failed"
|
||||
}
|
||||
};
|
||||
|
||||
// Log result
|
||||
let log_entry = json!({
|
||||
"company_name": company.name,
|
||||
"status": status,
|
||||
"timestamp": Utc::now().to_rfc3339(),
|
||||
});
|
||||
|
||||
let _ = log_tx.send(LogCommand::Write(log_entry)).await;
|
||||
})
|
||||
}
|
||||
|
||||
/// Enrich a single company with options data
|
||||
async fn enrich_company_with_options(
|
||||
company: &CompanyCrossPlatformInfo,
|
||||
yahoo_pool: &Arc<YahooClientPool>,
|
||||
paths: &DataPaths,
|
||||
) -> anyhow::Result<()> {
|
||||
let ticker = match extract_first_yahoo_ticker(company) {
|
||||
Some(t) => t,
|
||||
None => {
|
||||
return Err(anyhow::anyhow!("No valid Yahoo ticker found"));
|
||||
}
|
||||
};
|
||||
|
||||
// Get options data for all available expiration dates
|
||||
let options_data = yahoo_pool.get_options_data(&ticker, None).await?;
|
||||
|
||||
// Only save if we got meaningful data
|
||||
if options_data.options.is_empty() {
|
||||
return Err(anyhow::anyhow!("No options data available"));
|
||||
}
|
||||
|
||||
// Save the options data
|
||||
save_company_data(paths, &company.name, &options_data, "options").await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Enrich a single company with chart data
|
||||
async fn enrich_company_with_chart(
|
||||
company: &CompanyCrossPlatformInfo,
|
||||
yahoo_pool: &Arc<YahooClientPool>,
|
||||
paths: &DataPaths,
|
||||
) -> anyhow::Result<()> {
|
||||
let ticker = match extract_first_yahoo_ticker(company) {
|
||||
Some(t) => t,
|
||||
None => {
|
||||
return Err(anyhow::anyhow!("No valid Yahoo ticker found"));
|
||||
}
|
||||
};
|
||||
|
||||
// Get 1 year of daily chart data
|
||||
let now = chrono::Utc::now().timestamp();
|
||||
let twenty_five_years_ago = now - (25 * 365 * 24 * 60 * 60);
|
||||
|
||||
let chart_data = yahoo_pool.get_chart_data(&ticker, "1d", twenty_five_years_ago, now).await?;
|
||||
|
||||
// Only save if we got meaningful data
|
||||
if chart_data.quotes.is_empty() {
|
||||
return Err(anyhow::anyhow!("No chart data available"));
|
||||
}
|
||||
|
||||
// Save the chart data
|
||||
save_company_data(paths, &company.name, &chart_data, "chart").await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Save data to company directory
|
||||
async fn save_company_data<T: serde::Serialize>(
|
||||
paths: &DataPaths,
|
||||
company_name: &str,
|
||||
data: &T,
|
||||
data_type: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
use tokio::fs;
|
||||
|
||||
let safe_name = sanitize_company_name(company_name);
|
||||
|
||||
let company_dir = paths.corporate_dir().join(&safe_name).join(data_type);
|
||||
fs::create_dir_all(&company_dir).await?;
|
||||
|
||||
let data_path = company_dir.join("data.jsonl");
|
||||
let json_line = serde_json::to_string(data)?;
|
||||
|
||||
let mut file = fs::File::create(&data_path).await?;
|
||||
file.write_all(json_line.as_bytes()).await?;
|
||||
file.write_all(b"\n").await?;
|
||||
file.flush().await?;
|
||||
file.sync_all().await?; // Ensure data is persisted
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Extract first valid Yahoo ticker from company
|
||||
fn extract_first_yahoo_ticker(company: &CompanyCrossPlatformInfo) -> Option<String> {
|
||||
for tickers in company.isin_tickers_map.values() {
|
||||
for ticker in tickers {
|
||||
if ticker.starts_with("YAHOO:")
|
||||
&& ticker != "YAHOO:NO_RESULTS"
|
||||
&& ticker != "YAHOO:ERROR"
|
||||
{
|
||||
return Some(ticker.trim_start_matches("YAHOO:").to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Make a company name usable as a directory name.
///
/// Each of the characters `/ \ : * ? " < > |` is replaced by an underscore;
/// every other character is kept unchanged.
fn sanitize_company_name(name: &str) -> String {
    name.chars()
        .map(|ch| match ch {
            '/' | '\\' | ':' | '*' | '?' | '"' | '<' | '>' | '|' => '_',
            other => other,
        })
        .collect()
}
|
||||
|
||||
/// Load companies from JSONL file
|
||||
async fn load_companies_from_jsonl(path: &std::path::Path) -> anyhow::Result<Vec<CompanyCrossPlatformInfo>> {
|
||||
let content = tokio::fs::read_to_string(path).await?;
|
||||
let mut companies = Vec::new();
|
||||
|
||||
for line in content.lines() {
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
if let Ok(company) = serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
|
||||
companies.push(company);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(companies)
|
||||
}
|
||||
|
||||
/// Count enriched companies (companies with specific data type)
|
||||
async fn count_enriched_companies(paths: &DataPaths, data_type: &str) -> anyhow::Result<usize> {
|
||||
let corporate_dir = paths.corporate_dir();
|
||||
|
||||
if !corporate_dir.exists() {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
let mut count = 0;
|
||||
let mut entries = tokio::fs::read_dir(&corporate_dir).await?;
|
||||
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
let path = entry.path();
|
||||
if path.is_dir() {
|
||||
let data_dir = path.join(data_type);
|
||||
let data_file = data_dir.join("data.jsonl");
|
||||
|
||||
if data_file.exists() {
|
||||
count += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
/// Mark enrichment as complete in state file
|
||||
async fn mark_enrichment_complete(state_path: &std::path::Path, key: &str) -> anyhow::Result<()> {
|
||||
let enrichment_complete = json!({
|
||||
key: true,
|
||||
"completed_at": Utc::now().to_rfc3339(),
|
||||
});
|
||||
|
||||
let mut state_file = OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(state_path)
|
||||
.await?;
|
||||
|
||||
let state_line = serde_json::to_string(&enrichment_complete)?;
|
||||
state_file.write_all(state_line.as_bytes()).await?;
|
||||
state_file.write_all(b"\n").await?;
|
||||
state_file.flush().await?;
|
||||
state_file.sync_all().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Log command enum
|
||||
enum LogCommand {
|
||||
Write(serde_json::Value),
|
||||
Checkpoint,
|
||||
Shutdown,
|
||||
}
|
||||
@@ -63,7 +63,7 @@ impl YahooTickerResult {
|
||||
}
|
||||
}
|
||||
|
||||
/// UPDATED: Scrape company details with full validation and shutdown support
|
||||
/// Scrape company details with full validation and shutdown support
|
||||
pub async fn scrape_company_details_by_isin(
|
||||
pool: &Arc<ChromeDriverPool>,
|
||||
isin: &str,
|
||||
|
||||
Reference in New Issue
Block a user