cleaned up main
This commit is contained in:
@@ -7,14 +7,12 @@
|
||||
use super::types::CompanyCrossPlatformInfo;
|
||||
use crate::util::logger;
|
||||
use std::collections::HashMap;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use serde::Serialize;
|
||||
use tokio::fs::{File, OpenOptions};
|
||||
use std::path::{Path};
|
||||
use chrono::{DateTime, Duration, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::fs::{File};
|
||||
use tokio::io::{AsyncWriteExt};
|
||||
use anyhow::Result;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
/// Load companies from checkpoint and replay log for recovery
|
||||
///
|
||||
|
||||
@@ -15,7 +15,7 @@ use crate::scraper::yahoo::{YahooClientPool};
|
||||
|
||||
use std::result::Result::Ok;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::atomic::{AtomicBool};
|
||||
|
||||
/// Main corporate update entry point with shutdown awareness
|
||||
pub async fn run_full_update(
|
||||
|
||||
@@ -7,7 +7,7 @@ use crate::util::logger;
|
||||
use crate::scraper::yahoo::{YahooClientPool, QuoteSummaryModule};
|
||||
|
||||
use std::result::Result::Ok;
|
||||
use chrono::{Local, Utc};
|
||||
use chrono::{Utc};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
@@ -191,12 +191,42 @@ pub async fn companies_yahoo_cleansed_low_profile(
|
||||
let input_path = data_path.join("companies_yahoo.jsonl");
|
||||
let checkpoint_path = data_path.join("companies_yahoo_cleaned.jsonl");
|
||||
let log_path = data_path.join("companies_updates.log");
|
||||
let state_path = data_path.join("state.jsonl");
|
||||
|
||||
// Check input exists
|
||||
if !input_path.exists() {
|
||||
logger::log_warn(" companies_yahoo.jsonl not found, skipping low profile cleansing").await;
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
if state_path.exists() {
|
||||
let state_content = tokio::fs::read_to_string(&state_path).await?;
|
||||
|
||||
for line in state_content.lines() {
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Ok(state) = serde_json::from_str::<serde_json::Value>(line) {
|
||||
if state.get("yahoo_companies_cleansed_low_profile").and_then(|v| v.as_bool()).unwrap_or(false) {
|
||||
logger::log_info(" Yahoo low profile cleansing already completed, reading existing file...").await;
|
||||
|
||||
if checkpoint_path.exists() {
|
||||
let checkpoint_content = tokio::fs::read_to_string(&checkpoint_path).await?;
|
||||
let count = checkpoint_content.lines()
|
||||
.filter(|line| !line.trim().is_empty())
|
||||
.count();
|
||||
|
||||
logger::log_info(&format!(" ✓ Found {} companies in companies_yahoo_cleaned.jsonl", count)).await;
|
||||
return Ok(count);
|
||||
} else {
|
||||
logger::log_warn(" State indicates completion but companies_yahoo_cleaned.jsonl not found, re-running...").await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// === RECOVERY PHASE: Load checkpoint + replay log ===
|
||||
let mut existing_companies: HashMap<String, CompanyCrossPlatformInfo> = HashMap::new();
|
||||
@@ -616,7 +646,26 @@ pub async fn companies_yahoo_cleansed_low_profile(
|
||||
// Shutdown Yahoo pool
|
||||
yahoo_pool.shutdown().await?;
|
||||
|
||||
Ok(final_valid)
|
||||
// Write completion milestone to state.jsonl
|
||||
let state_path = data_path.join("state.jsonl");
|
||||
let yahoo_low_profile = json!({
|
||||
"yahoo_companies_cleansed_low_profile": true,
|
||||
"completed_at": chrono::Utc::now().to_rfc3339(),
|
||||
});
|
||||
|
||||
let mut state_file = OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(&state_path)
|
||||
.await?;
|
||||
let state_line = serde_json::to_string(&yahoo_low_profile)?;
|
||||
state_file.write_all(state_line.as_bytes()).await?;
|
||||
state_file.write_all(b"\n").await?;
|
||||
state_file.flush().await?;
|
||||
|
||||
logger::log_info(&format!(" ✓ State milestone saved to: {:?}", state_path)).await;
|
||||
|
||||
Ok(final_count)
|
||||
}
|
||||
|
||||
/// Helper function to spawn a validation task (reduces code duplication)
|
||||
@@ -911,54 +960,9 @@ async fn save_company_core_data(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub struct ProcessResult {
|
||||
pub changes: Vec<CompanyEventChange>,
|
||||
}
|
||||
|
||||
pub fn process_batch(
|
||||
new_events: &[CompanyEvent],
|
||||
existing: &mut HashMap<String, CompanyEvent>,
|
||||
today: &str,
|
||||
) -> ProcessResult {
|
||||
let mut changes = Vec::new();
|
||||
|
||||
for new in new_events {
|
||||
let key = event_key(new);
|
||||
|
||||
if let Some(old) = existing.get(&key) {
|
||||
changes.extend(detect_changes(old, new, today));
|
||||
existing.insert(key, new.clone());
|
||||
continue;
|
||||
}
|
||||
|
||||
let date_key = format!("{}|{}", new.ticker, new.date);
|
||||
let mut found_old = None;
|
||||
for (k, e) in existing.iter() {
|
||||
if format!("{}|{}", e.ticker, e.date) == date_key && k != &key {
|
||||
found_old = Some((k.clone(), e.clone()));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some((old_key, old_event)) = found_old {
|
||||
if new.date.as_str() > today {
|
||||
changes.push(CompanyEventChange {
|
||||
ticker: new.ticker.clone(),
|
||||
date: new.date.clone(),
|
||||
field_changed: "time".to_string(),
|
||||
old_value: old_event.time.clone(),
|
||||
new_value: new.time.clone(),
|
||||
detected_at: Local::now().format("%Y-%m-%d %H:%M:%S").to_string(),
|
||||
});
|
||||
}
|
||||
existing.remove(&old_key);
|
||||
}
|
||||
|
||||
existing.insert(key, new.clone());
|
||||
}
|
||||
|
||||
ProcessResult { changes }
|
||||
}
|
||||
|
||||
/// Check if a company needs processing (validation check)
|
||||
fn company_needs_processing(
|
||||
|
||||
Reference in New Issue
Block a user