added integrity check to forex and exchange collection functiosn
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
// src/corporate/collect_exchanges.rs
|
||||
use crate::util::directories::DataPaths;
|
||||
use crate::util::integrity::{DataStage, StateManager, file_reference};
|
||||
use crate::util::logger;
|
||||
use crate::scraper::yahoo::ChartData;
|
||||
|
||||
@@ -238,10 +239,28 @@ fn get_fallback_rate(currency: &str) -> f64 {
|
||||
/// - Extracts exchange data from core/data.jsonl
|
||||
/// - Groups companies by exchange
|
||||
/// - Sums up market caps for each exchange
|
||||
/// - **NEW**: Converts all market caps to USD using FX rates
|
||||
/// - Converts all market caps to USD using FX rates
|
||||
/// - Saves consolidated mapping to data/yahoo_exchanges.json
|
||||
/// - Handles missing or invalid data gracefully
|
||||
/// - Integrity tracking with content hash validation
|
||||
pub async fn collect_and_save_exchanges(paths: &DataPaths) -> anyhow::Result<usize> {
|
||||
let state_path = paths.data_dir().join("state.jsonl");
|
||||
let manager = StateManager::new(&state_path, &paths.data_dir().to_path_buf());
|
||||
let step_name = "exchange_collection_complete";
|
||||
|
||||
let output_path = paths.data_dir().join("yahoo_exchanges.json");
|
||||
|
||||
if manager.is_step_valid(step_name).await? {
|
||||
logger::log_info(" Exchange collection already completed and valid").await;
|
||||
|
||||
// Load and count exchanges
|
||||
if output_path.exists() {
|
||||
let content = fs::read_to_string(&output_path).await?;
|
||||
let exchanges: HashMap<String, ExchangeInfo> = serde_json::from_str(&content)?;
|
||||
logger::log_info(&format!(" ✓ Found {} valid exchanges", exchanges.len())).await;
|
||||
return Ok(exchanges.len());
|
||||
}
|
||||
}
|
||||
logger::log_info("Collecting exchange information from company directories...").await;
|
||||
|
||||
let corporate_dir = paths.corporate_dir();
|
||||
@@ -353,7 +372,6 @@ pub async fn collect_and_save_exchanges(paths: &DataPaths) -> anyhow::Result<usi
|
||||
}
|
||||
|
||||
// Save to yahoo_exchanges.json
|
||||
let output_path = paths.data_dir().join("yahoo_exchanges.json");
|
||||
save_exchanges_json(&output_path, &exchanges).await?;
|
||||
|
||||
logger::log_info(&format!(
|
||||
@@ -361,12 +379,41 @@ pub async fn collect_and_save_exchanges(paths: &DataPaths) -> anyhow::Result<usi
|
||||
output_path.display()
|
||||
)).await;
|
||||
|
||||
track_exchange_collection_completion(&manager, &output_path, step_name).await?;
|
||||
logger::log_info(" ✓ Exchange collection marked as complete with integrity tracking").await;
|
||||
|
||||
// Print summary statistics
|
||||
print_exchange_statistics(&exchanges, &fx_cache).await;
|
||||
|
||||
Ok(exchanges.len())
|
||||
}
|
||||
|
||||
/// Track exchange collection completion with content hash verification
|
||||
async fn track_exchange_collection_completion(
|
||||
manager: &StateManager,
|
||||
output_path: &std::path::Path,
|
||||
step_name: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
// Create content reference for the output file
|
||||
let content_reference = file_reference(output_path);
|
||||
|
||||
// Track completion with:
|
||||
// - Content reference: The yahoo_exchanges.json file
|
||||
// - Data stage: Data (7-day TTL by default)
|
||||
// - Dependencies: None (this is a collection step, not dependent on other tracked steps)
|
||||
// Note: In practice, it depends on core data, but we track the output file
|
||||
// which will change if core data changes, so explicit dependency not needed
|
||||
manager.update_entry(
|
||||
step_name.to_string(),
|
||||
content_reference,
|
||||
DataStage::Data,
|
||||
vec![], // No explicit dependencies - output file serves as verification
|
||||
None, // Use default TTL (7 days for Data stage)
|
||||
).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Extract exchange information from a company's core data file
|
||||
async fn extract_exchange_info(
|
||||
core_data_path: &std::path::Path,
|
||||
|
||||
@@ -7,7 +7,7 @@ use crate::corporate::update_companies_cleanse::{companies_yahoo_cleansed_low_pr
|
||||
use crate::corporate::update_companies_enrich::enrich_companies_with_events;
|
||||
use crate::corporate::update_companies_enrich_option_chart::{enrich_companies_with_option, enrich_companies_with_chart};
|
||||
use crate::corporate::collect_exchanges::collect_and_save_exchanges;
|
||||
use crate::economic::update_forex::collect_fx_rates;
|
||||
use crate::economic::yahoo_update_forex::collect_fx_rates;
|
||||
use crate::util::directories::DataPaths;
|
||||
use crate::util::logger;
|
||||
use crate::scraper::webdriver::ChromeDriverPool;
|
||||
|
||||
@@ -5,6 +5,6 @@ pub mod storage;
|
||||
pub mod helpers;
|
||||
|
||||
pub mod update;
|
||||
pub mod update_forex;
|
||||
pub mod yahoo_update_forex;
|
||||
|
||||
pub use update::run_full_update;
|
||||
@@ -1,6 +1,7 @@
|
||||
// src/forex/update_forex.rs
|
||||
use crate::config::Config;
|
||||
use crate::util::directories::DataPaths;
|
||||
use crate::util::integrity::{DataStage, StateManager, directory_reference};
|
||||
use crate::util::logger;
|
||||
use crate::scraper::yahoo::{YahooClientPool, ChartData};
|
||||
|
||||
@@ -89,32 +90,33 @@ pub async fn collect_fx_rates(
|
||||
let data_path = paths.data_dir();
|
||||
|
||||
// File paths
|
||||
let checkpoint_path = data_path.join("fx_rates_collected.jsonl");
|
||||
let output_path = data_path.join("economic").join("currency");
|
||||
let log_path = data_path.join("fx_rates_updates.log");
|
||||
let state_path = data_path.join("state.jsonl");
|
||||
|
||||
// Check if already completed (check state file)
|
||||
if state_path.exists() {
|
||||
let state_content = tokio::fs::read_to_string(&state_path).await?;
|
||||
let manager = StateManager::new(&state_path, &data_path.to_path_buf());
|
||||
let step_name = "yahoo_fx_rate_collection_completed";
|
||||
let content_reference = directory_reference(&output_path,
|
||||
Some(vec![
|
||||
"*/chart/*.jsonl".to_string(), // Main pattern for events data
|
||||
"*/chart/data.jsonl".to_string(), // Specific pattern (more precise)
|
||||
]),
|
||||
Some(vec![
|
||||
"*.log".to_string(), // Exclude log files
|
||||
"*.tmp".to_string(), // Exclude temp files
|
||||
"*.bak".to_string(), // Exclude backup files
|
||||
]),
|
||||
);
|
||||
|
||||
for line in state_content.lines() {
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Ok(state) = serde_json::from_str::<serde_json::Value>(line) {
|
||||
if state.get("fx_rates_collection_complete").and_then(|v| v.as_bool()).unwrap_or(false) {
|
||||
logger::log_info(" FX rates collection already completed").await;
|
||||
|
||||
// Count collected currencies
|
||||
let count = count_collected_currencies(paths).await?;
|
||||
logger::log_info(&format!(" ✓ Found {} currencies with chart data", count)).await;
|
||||
return Ok(count);
|
||||
}
|
||||
}
|
||||
}
|
||||
if manager.is_step_valid(step_name).await? {
|
||||
logger::log_info(" FX rates collection already completed").await;
|
||||
let count = count_collected_currencies(paths).await?;
|
||||
logger::log_info(&format!(" ✓ Found {} currencies with chart data", count)).await;
|
||||
return Ok(count);
|
||||
}
|
||||
|
||||
logger::log_info(" Updating missing forex data...").await;
|
||||
|
||||
// === RECOVERY PHASE: Track collected currencies ===
|
||||
let mut collected_currencies: HashSet<String> = HashSet::new();
|
||||
|
||||
@@ -163,7 +165,13 @@ pub async fn collect_fx_rates(
|
||||
|
||||
if pending_count == 0 {
|
||||
logger::log_info(" ✓ All currencies already collected").await;
|
||||
mark_collection_complete(&state_path).await?;
|
||||
manager.update_entry(
|
||||
step_name.to_string(),
|
||||
content_reference,
|
||||
DataStage::Data,
|
||||
vec!["yahoo_companies_cleansed".to_string()], // Dependency
|
||||
None, // Use default TTL (7 days for Data stage)
|
||||
).await?;
|
||||
return Ok(collected_currencies.len());
|
||||
}
|
||||
|
||||
@@ -309,9 +317,14 @@ pub async fn collect_fx_rates(
|
||||
|
||||
// Mark as complete if not shutdown
|
||||
if !shutdown_flag.load(Ordering::SeqCst) {
|
||||
mark_collection_complete(&state_path).await?;
|
||||
manager.update_entry(
|
||||
step_name.to_string(),
|
||||
content_reference,
|
||||
DataStage::Data,
|
||||
vec!["yahoo_companies_cleansed".to_string()], // Dependency
|
||||
None, // Use default TTL (7 days for Data stage)
|
||||
).await?;
|
||||
}
|
||||
|
||||
Ok(final_success)
|
||||
}
|
||||
|
||||
@@ -463,28 +476,6 @@ async fn count_collected_currencies(paths: &DataPaths) -> anyhow::Result<usize>
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
/// Mark collection as complete in state file
|
||||
async fn mark_collection_complete(state_path: &std::path::Path) -> anyhow::Result<()> {
|
||||
let collection_complete = json!({
|
||||
"fx_rates_collection_complete": true,
|
||||
"completed_at": Utc::now().to_rfc3339(),
|
||||
});
|
||||
|
||||
let mut state_file = OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(state_path)
|
||||
.await?;
|
||||
|
||||
let state_line = serde_json::to_string(&collection_complete)?;
|
||||
state_file.write_all(state_line.as_bytes()).await?;
|
||||
state_file.write_all(b"\n").await?;
|
||||
state_file.flush().await?;
|
||||
state_file.sync_all().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Log command enum
|
||||
enum LogCommand {
|
||||
Write(serde_json::Value),
|
||||
Reference in New Issue
Block a user