162 lines
6.7 KiB
Rust
162 lines
6.7 KiB
Rust
// src/corporate/update.rs
|
|
use super::{scraper::*, openfigi::*};
|
|
use crate::config::Config;
|
|
use crate::check_shutdown;
|
|
use crate::corporate::update_companies::build_companies_jsonl_streaming_parallel;
|
|
use crate::corporate::update_companies_cleanse::{companies_yahoo_cleansed_low_profile, companies_yahoo_cleansed_no_data};
|
|
use crate::corporate::update_companies_enrich::enrich_companies_with_events;
|
|
use crate::corporate::update_companies_enrich_options_chart::{enrich_companies_with_options, enrich_companies_with_chart};
|
|
use crate::corporate::collect_exchanges::collect_and_save_exchanges;
|
|
use crate::economic::update_forex::collect_fx_rates;
|
|
use crate::util::directories::DataPaths;
|
|
use crate::util::logger;
|
|
use crate::scraper::webdriver::ChromeDriverPool;
|
|
use crate::scraper::yahoo::{YahooClientPool};
|
|
|
|
use std::result::Result::Ok;
|
|
use std::sync::Arc;
|
|
use std::sync::atomic::{AtomicBool};
|
|
|
|
/// Main corporate update entry point with shutdown awareness
|
|
pub async fn run_full_update(
|
|
config: &Config,
|
|
pool: &Arc<ChromeDriverPool>,
|
|
shutdown_flag: &Arc<AtomicBool>,
|
|
) -> anyhow::Result<()> {
|
|
logger::log_info("=== Corporate Update ===").await;
|
|
|
|
let paths = DataPaths::new(".")?;
|
|
|
|
check_shutdown!(shutdown_flag);
|
|
|
|
logger::log_info("Step 1: Downloading GLEIF CSV...").await;
|
|
let gleif_csv_path = match download_isin_lei_csv().await? {
|
|
Some(p) => {
|
|
logger::log_info(&format!(" ✓ GLEIF CSV at: {}", p)).await;
|
|
p
|
|
}
|
|
None => {
|
|
logger::log_warn(" ✗ Could not obtain GLEIF CSV").await;
|
|
return Ok(());
|
|
}
|
|
};
|
|
|
|
check_shutdown!(shutdown_flag);
|
|
|
|
logger::log_info("Step 2: Loading OpenFIGI metadata...").await;
|
|
load_figi_type_lists(&paths).await.ok();
|
|
logger::log_info(" ✓ OpenFIGI metadata loaded").await;
|
|
|
|
check_shutdown!(shutdown_flag);
|
|
|
|
logger::log_info("Step 3: Checking LEI-FIGI mapping status...").await;
|
|
let all_mapped = ensure_all_leis_mapped(&gleif_csv_path, None).await?;
|
|
|
|
if !all_mapped {
|
|
logger::log_warn(" ⚠ Some LEIs failed to map - continuing with partial data").await;
|
|
} else {
|
|
logger::log_info(" ✓ All LEIs successfully mapped").await;
|
|
}
|
|
|
|
check_shutdown!(shutdown_flag);
|
|
|
|
logger::log_info("Step 4: Building securities map (streaming)...").await;
|
|
let date_dir = find_most_recent_figi_date_dir(&paths).await?;
|
|
|
|
if let Some(date_dir) = date_dir {
|
|
logger::log_info(&format!(" Using FIGI data from: {:?}", date_dir)).await;
|
|
load_or_build_all_securities(&date_dir).await?;
|
|
logger::log_info(" ✓ Securities map updated").await;
|
|
} else {
|
|
logger::log_warn(" ✗ No FIGI data directory found").await;
|
|
}
|
|
|
|
check_shutdown!(shutdown_flag);
|
|
|
|
logger::log_info("Step 5: Building companies.jsonl with parallel processing and validation...").await;
|
|
let count = build_companies_jsonl_streaming_parallel(&paths, pool, shutdown_flag, config, &None).await?;
|
|
logger::log_info(&format!(" ✓ Saved {} companies", count)).await;
|
|
|
|
check_shutdown!(shutdown_flag);
|
|
|
|
logger::log_info("Step 6: Cleansing companies with missing essential data...").await;
|
|
let cleansed_count = companies_yahoo_cleansed_no_data(&paths).await?;
|
|
logger::log_info(&format!(" ✓ {} companies found on Yahoo ready for further use in companies_yahoo.jsonl", cleansed_count)).await;
|
|
|
|
check_shutdown!(shutdown_flag);
|
|
|
|
let proxy_pool = pool.get_proxy_pool()
|
|
.ok_or_else(|| anyhow::anyhow!("ChromeDriverPool must be created with VPN proxy rotation enabled"))?;
|
|
|
|
logger::log_info("Creating YahooClientPool with proxy rotation...").await;
|
|
let yahoo_pool = Arc::new(YahooClientPool::new(proxy_pool, config, None).await?);
|
|
logger::log_info(&format!("✓ YahooClientPool ready with {} clients", yahoo_pool.num_clients().await)).await;
|
|
|
|
check_shutdown!(shutdown_flag);
|
|
|
|
logger::log_info("Step 7: Cleansing companies with too low profile (with abort-safe persistence)...").await;
|
|
let cleansed_count = companies_yahoo_cleansed_low_profile(&paths, config, yahoo_pool.clone(), shutdown_flag).await?;
|
|
logger::log_info(&format!(" ✓ {} companies with sufficient profile ready for analytics", cleansed_count)).await;
|
|
|
|
check_shutdown!(shutdown_flag);
|
|
|
|
logger::log_info("Step 8: Enriching companies with Yahoo Events (with abort-safe persistence)...").await;
|
|
let enriched_count = enrich_companies_with_events(&paths, config, yahoo_pool.clone(), shutdown_flag).await?;
|
|
logger::log_info(&format!(" ✓ {} companies enriched with event data", enriched_count)).await;
|
|
|
|
check_shutdown!(shutdown_flag);
|
|
|
|
logger::log_info("Step 9: Enriching companies with Yahoo Options (with abort-safe persistence)...").await;
|
|
let options_count = enrich_companies_with_options(&paths, config, yahoo_pool.clone(), shutdown_flag).await?;
|
|
logger::log_info(&format!(" ✓ {} companies enriched with options data", options_count)).await;
|
|
|
|
check_shutdown!(shutdown_flag);
|
|
|
|
logger::log_info("Step 10: Enriching companies with Yahoo Chart (with abort-safe persistence)...").await;
|
|
let chart_count = enrich_companies_with_chart(&paths, config, yahoo_pool.clone(), shutdown_flag).await?;
|
|
logger::log_info(&format!(" ✓ {} companies enriched with chart data", chart_count)).await;
|
|
|
|
check_shutdown!(shutdown_flag);
|
|
|
|
logger::log_info("Step 11: Collecting FX rates...").await;
|
|
let fx_count = collect_fx_rates(&paths, config, yahoo_pool.clone(), shutdown_flag).await?;
|
|
logger::log_info(&format!(" ✓ Collected {} FX rates", fx_count)).await;
|
|
|
|
check_shutdown!(shutdown_flag);
|
|
|
|
logger::log_info("Step 12: Collecting exchange information...").await;
|
|
let exchange_count = collect_and_save_exchanges(&paths).await?;
|
|
logger::log_info(&format!(" ✓ Collected {} exchanges", exchange_count)).await;
|
|
|
|
logger::log_info("=== Corporate update complete === ").await;
|
|
Ok(())
|
|
}
|
|
|
|
async fn find_most_recent_figi_date_dir(paths: &DataPaths) -> anyhow::Result<Option<std::path::PathBuf>> {
|
|
let map_cache_dir = paths.cache_gleif_openfigi_map_dir();
|
|
|
|
if !map_cache_dir.exists() {
|
|
return Ok(None);
|
|
}
|
|
|
|
let mut entries = tokio::fs::read_dir(&map_cache_dir).await?;
|
|
let mut dates = Vec::new();
|
|
|
|
while let Some(entry) = entries.next_entry().await? {
|
|
let path = entry.path();
|
|
if path.is_dir() {
|
|
if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
|
|
if name.len() == 8 && name.chars().all(|c| c.is_numeric()) {
|
|
dates.push((name.to_string(), path));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if dates.is_empty() {
|
|
return Ok(None);
|
|
}
|
|
|
|
dates.sort_by(|a, b| b.0.cmp(&a.0));
|
|
Ok(Some(dates[0].1.clone()))
|
|
} |