diff --git a/integrity/checkpoint_dependencies.dot b/integrity/checkpoint_dependencies.dot index 8157da7..7c2eb21 100644 --- a/integrity/checkpoint_dependencies.dot +++ b/integrity/checkpoint_dependencies.dot @@ -2,24 +2,27 @@ digraph Dependencies { rankdir=LR; node [shape=box]; - "yahoo_events_enrichment_complete" [label="yahoo_events_enrichment_complete -Corporate events enriched for all companies"]; - "yahoo_companies_cleansed" [label="yahoo_companies_cleansed -Company data cleansed and validated"]; "yahoo_options_enrichment_complete" [label="yahoo_options_enrichment_complete Options data enriched for all companies"]; + "yahoo_companies_cleansed_no_data" [label="yahoo_companies_cleansed_no_data +Companies cleansed of data with no Yahoo results"]; + "lei_figi_mapping_complete" [label="lei_figi_mapping_complete +LEI-to-FIGI mappings from OpenFIGI API"]; "securities_data_complete" [label="securities_data_complete Securities data built from FIGI mappings"]; + "yahoo_companies_cleansed_low_profile" [label="yahoo_companies_cleansed_low_profile +Companies cleansed of low profile (insufficient market cap/price data)"]; + "yahoo_events_enrichment_complete" [label="yahoo_events_enrichment_complete +Corporate events enriched for all companies"]; "enrichment_group" [label="enrichment_group Yahoo exchanges collected and validated"]; "yahoo_chart_enrichment_complete" [label="yahoo_chart_enrichment_complete Chart data enriched for all companies"]; - "lei_figi_mapping_complete" [label="lei_figi_mapping_complete -LEI-to-FIGI mappings from OpenFIGI API"]; - "yahoo_events_enrichment_complete" -> "yahoo_companies_cleansed" [label="via group enrichment_group"]; - "yahoo_companies_cleansed" -> "securities_data_complete"; - "yahoo_options_enrichment_complete" -> "yahoo_companies_cleansed" [label="via group enrichment_group"]; + "yahoo_options_enrichment_complete" -> "yahoo_companies_cleansed_low_profile" [label="via group enrichment_group"]; + "yahoo_companies_cleansed_no_data" -> "securities_data_complete"; "securities_data_complete" -> "lei_figi_mapping_complete"; - "yahoo_chart_enrichment_complete" -> "yahoo_companies_cleansed" [label="via group enrichment_group"]; + "yahoo_companies_cleansed_low_profile" -> "yahoo_companies_cleansed_no_data"; + "yahoo_events_enrichment_complete" -> "yahoo_companies_cleansed_low_profile" [label="via group enrichment_group"]; + "yahoo_chart_enrichment_complete" -> "yahoo_companies_cleansed_low_profile" [label="via group enrichment_group"]; } diff --git a/integrity/checkpoint_dependencies.toml b/integrity/checkpoint_dependencies.toml index 8c506c8..6072a1c 100644 --- a/integrity/checkpoint_dependencies.toml +++ b/integrity/checkpoint_dependencies.toml @@ -16,10 +16,14 @@ depends_on = ["lei_figi_mapping_complete"] # CLEANSING STAGE (Depends on collection) # ============================================================================ -[checkpoints.yahoo_companies_cleansed] -description = "Company data cleansed and validated" +[checkpoints.yahoo_companies_cleansed_no_data] +description = "Companies cleansed of data with no Yahoo results" depends_on = ["securities_data_complete"] +[checkpoints.yahoo_companies_cleansed_low_profile] +description = "Companies cleansed of low profile (insufficient market cap/price data)" +depends_on = ["yahoo_companies_cleansed_no_data"] + # ============================================================================ # ENRICHMENT GROUP (All depend on cleansed companies) # ============================================================================ @@ -31,7 +35,7 @@ members = [ "yahoo_options_enrichment_complete", "yahoo_chart_enrichment_complete" ] -depends_on = ["yahoo_companies_cleansed"] +depends_on = ["yahoo_companies_cleansed_low_profile"] [checkpoints.yahoo_events_enrichment_complete] description = "Corporate events enriched for all companies" diff --git a/src/corporate/storage.rs b/src/corporate/storage.rs index cd56b06..218788b 100644 --- a/src/corporate/storage.rs +++ b/src/corporate/storage.rs @@ -1,9 +1,7 @@ // src/corporate/storage.rs -use super::{types::*, helpers::*}; use crate::util::directories::DataPaths; use crate::util::logger; -use tokio::fs; use tokio::io::AsyncWriteExt; use std::collections::HashMap; use std::path::{PathBuf, Path}; @@ -18,60 +16,6 @@ pub struct EventIndex { pub file_path: PathBuf, } -/// Build index of all events without loading them into memory -pub async fn build_event_index(paths: &DataPaths) -> anyhow::Result> { - let dir = paths.corporate_events_dir(); - if !dir.exists() { - logger::log_info("Corporate Storage: No events directory found").await; - return Ok(Vec::new()); - } - - let mut index = Vec::new(); - let mut entries = fs::read_dir(dir).await?; - - while let Some(entry) = entries.next_entry().await? { - let path = entry.path(); - if path.extension().and_then(|s| s.to_str()) == Some("json") { - let name = path.file_name().and_then(|n| n.to_str()).unwrap_or(""); - if name.starts_with("events_") && name.len() == 17 { - let content = fs::read_to_string(&path).await?; - let events: Vec = serde_json::from_str(&content)?; - - for event in events { - index.push(EventIndex { - key: event_key(&event), - ticker: event.ticker.clone(), - date: event.date.clone(), - file_path: path.clone(), - }); - } - } - } - } - - logger::log_info(&format!("Corporate Storage: Built index with {} entries", index.len())).await; - Ok(index) -} - -pub fn get_company_dir(paths: &DataPaths, lei: &str) -> PathBuf { - paths.corporate_prices_dir().join(lei) -} - -pub async fn ensure_company_dirs(paths: &DataPaths, isin: &str) -> anyhow::Result<()> { - let base = get_company_dir(paths, isin); - let paths_to_create = [ - base.clone(), - base.join("5min"), - base.join("daily"), - base.join("aggregated").join("5min"), - base.join("aggregated").join("daily"), - ]; - for p in paths_to_create { - fs::create_dir_all(&p).await?; - } - Ok(()) -} - /// Stream companies to JSONL incrementally pub async fn save_companies_to_jsonl_streaming( paths: &DataPaths, diff --git a/src/corporate/types.rs b/src/corporate/types.rs index 16c1cbb..e3df470 100644 --- a/src/corporate/types.rs +++ b/src/corporate/types.rs @@ -54,7 +54,6 @@ pub struct FigiData { /// * securities: Grouped by ISIN, filtered for Common Stock only #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CompanyData{ - pub id: String, pub name: String, pub primary_isin: String, pub securities: HashMap>, // ISIN -> Vec @@ -78,7 +77,6 @@ pub struct CompanyCrossPlatformData { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WarrantData { - pub company_id: String, // key in CompanyData pub company_name: String, // key in CompanyData pub warrants: HashMap, // underlying company name -> Warrant } @@ -92,7 +90,6 @@ pub struct WarrantData { /// other formats like only on company instead of two, underlying and issuing company are the same, leave issuer_company_name NULL #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WarrantDetails { - pub company_id: String, // key in CompanyData pub company_name: String, // key in CompanyData, key for WarrantDetails pub issuer_company_name: Option, // key in CompanyData pub warrant_type: String, // "put" or "call" @@ -101,7 +98,6 @@ pub struct WarrantDetails { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct OptionData { - pub company_id: String, // key in CompanyData pub company_name: String, // key in CompanyData pub expiration_dates: Vec, pub strikes: Vec, @@ -149,7 +145,6 @@ pub struct BondDetails { /// ticker: "WTFC 4.3 01/12/26 0003" #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CorporateBondData { - pub company_id: String, // key - company id issuing the bond pub underlying_company_name: String, // key - company name issuing the bond pub bonds: HashMap>, // ISIN -> Vec (grouped by ISIN) #[serde(skip_serializing_if = "HashMap::is_empty", default)] diff --git a/src/corporate/update.rs b/src/corporate/update.rs index 05e22a3..b75a6ee 100644 --- a/src/corporate/update.rs +++ b/src/corporate/update.rs @@ -64,6 +64,7 @@ pub async fn run_full_update( update_securities().await?; logger::log_info(" ✓ Securities map updated").await; + let paths = DataPaths::new(".")?; check_shutdown!(shutdown_flag); diff --git a/src/corporate/update_companies.rs b/src/corporate/update_companies.rs index e0627a8..cef62a3 100644 --- a/src/corporate/update_companies.rs +++ b/src/corporate/update_companies.rs @@ -98,8 +98,7 @@ pub async fn update_companies( // Synchronization for hard reset let reset_in_progress = Arc::new(tokio::sync::Mutex::new(false)); - let path = DataPaths::new(".")?; - let securities_path = path.corporate_dir().join("figi_securities"); + let securities_path = paths.corporate_dir().join("figi_securities"); let securities_checkpoint = securities_path.join("common_stocks.jsonl"); let securities_log = securities_path.join("common_stocks.log.jsonl"); diff --git a/src/corporate/update_companies_cleanse.rs b/src/corporate/update_companies_cleanse.rs index 29c4b16..29a58be 100644 --- a/src/corporate/update_companies_cleanse.rs +++ b/src/corporate/update_companies_cleanse.rs @@ -178,7 +178,7 @@ pub async fn companies_yahoo_cleansed_low_profile( } let manager = StateManager::new(paths.integrity_dir()).await?; - let step_name = "yahoo_companies_cleansed_no_data"; + let step_name = "yahoo_companies_cleansed_low_profile"; let content_reference = file_reference(&checkpoint_path); if manager.is_step_valid(step_name).await? { diff --git a/src/corporate/update_openfigi.rs b/src/corporate/update_openfigi.rs index fca515c..c1535da 100644 --- a/src/corporate/update_openfigi.rs +++ b/src/corporate/update_openfigi.rs @@ -275,12 +275,26 @@ async fn append_processed_sector(path: &Path, sector_name: &str) -> anyhow::Resu Ok(()) } -/// Loads checkpoint and replays log, returning set of existing keys -async fn load_checkpoint_and_replay( +/// Generic function to load checkpoint and replay log with custom key extraction +/// +/// This function handles the common pattern of loading and merging checkpoint and log files, +/// with custom key extraction logic provided by a closure. +/// +/// # Arguments +/// * `checkpoint_path` - Path to checkpoint file +/// * `log_path` - Path to log file +/// * `key_extractor` - Closure that extracts a key from a JSON entry +/// +/// # Returns +/// HashSet of extracted keys +async fn load_checkpoint_and_replay_generic( checkpoint_path: &Path, log_path: &Path, - key_field: &str, -) -> anyhow::Result> { + key_extractor: F, +) -> anyhow::Result> +where + F: Fn(&Value) -> Option, +{ let mut keys = HashSet::new(); // Load checkpoint if it exists @@ -290,12 +304,12 @@ async fn load_checkpoint_and_replay( for line in content.lines() { if line.trim().is_empty() || !line.ends_with('}') { - continue; // Skip incomplete lines + continue; } if let Ok(entry) = serde_json::from_str::(line) { - if let Some(key) = entry[key_field].as_str() { - keys.insert(key.to_string()); + if let Some(key) = key_extractor(&entry) { + keys.insert(key); } } } @@ -308,12 +322,12 @@ async fn load_checkpoint_and_replay( for line in content.lines() { if line.trim().is_empty() || !line.ends_with('}') { - continue; // Skip incomplete lines + continue; } if let Ok(entry) = serde_json::from_str::(line) { - if let Some(key) = entry[key_field].as_str() { - keys.insert(key.to_string()); + if let Some(key) = key_extractor(&entry) { + keys.insert(key); } } } @@ -322,64 +336,36 @@ async fn load_checkpoint_and_replay( Ok(keys) } +/// Loads checkpoint and replays log, returning set of existing keys (simple field extraction) +async fn load_checkpoint_and_replay( + checkpoint_path: &Path, + log_path: &Path, + key_field: &str, +) -> anyhow::Result> { + load_checkpoint_and_replay_generic(checkpoint_path, log_path, |entry| { + entry[key_field].as_str().map(|s| s.to_string()) + }).await +} + /// Loads checkpoint and replays log for nested structures (warrants/options) async fn load_checkpoint_and_replay_nested( checkpoint_path: &Path, log_path: &Path, ) -> anyhow::Result> { - let mut keys = HashSet::new(); - - // Load checkpoint if it exists - if checkpoint_path.exists() { - let content = tokio_fs::read_to_string(checkpoint_path).await - .context("Failed to read checkpoint")?; + load_checkpoint_and_replay_generic(checkpoint_path, log_path, |entry| { + let underlying = entry["underlying_company_name"].as_str().unwrap_or(""); + let type_field = if entry.get("warrant_type").is_some() { + entry["warrant_type"].as_str().unwrap_or("") + } else { + entry["option_type"].as_str().unwrap_or("") + }; - for line in content.lines() { - if line.trim().is_empty() || !line.ends_with('}') { - continue; - } - - if let Ok(entry) = serde_json::from_str::(line) { - let underlying = entry["underlying_company_name"].as_str().unwrap_or(""); - let type_field = if entry.get("warrant_type").is_some() { - entry["warrant_type"].as_str().unwrap_or("") - } else { - entry["option_type"].as_str().unwrap_or("") - }; - - if !underlying.is_empty() && !type_field.is_empty() { - keys.insert(format!("{}::{}", underlying, type_field)); - } - } + if !underlying.is_empty() && !type_field.is_empty() { + Some(format!("{}::{}", underlying, type_field)) + } else { + None } - } - - // Replay log if it exists - if log_path.exists() { - let content = tokio_fs::read_to_string(log_path).await - .context("Failed to read log")?; - - for line in content.lines() { - if line.trim().is_empty() || !line.ends_with('}') { - continue; - } - - if let Ok(entry) = serde_json::from_str::(line) { - let underlying = entry["underlying_company_name"].as_str().unwrap_or(""); - let type_field = if entry.get("warrant_type").is_some() { - entry["warrant_type"].as_str().unwrap_or("") - } else { - entry["option_type"].as_str().unwrap_or("") - }; - - if !underlying.is_empty() && !type_field.is_empty() { - keys.insert(format!("{}::{}", underlying, type_field)); - } - } - } - } - - Ok(keys) + }).await } /// Creates a checkpoint by copying log to checkpoint atomically @@ -454,11 +440,10 @@ async fn process_lei_figi_file_batched( let batch_size = 100; let mut processed_count = 0; - // === PHASE 1: Process common stocks and build company_id mapping === + // === PHASE 1: Process common stocks === logger::log_info(" Phase 1: Processing common stocks...").await; let mut common_batch: Vec = Vec::new(); - let mut company_id_map: HashMap = HashMap::new(); // company_name -> company_id for (line_num, line) in content.lines().enumerate() { if line.trim().is_empty() { @@ -480,14 +465,7 @@ async fn process_lei_figi_file_batched( // Process common stocks if !common_stocks.is_empty() { - if let Some(mut entry) = prepare_common_stock_entry(&common_stocks, existing_companies) { - // Generate UUID for company if not already done - if !company_id_map.contains_key(&entry.name) { - let company_id = uuid::Uuid::new_v4().to_string(); - company_id_map.insert(entry.name.clone(), company_id.clone()); - entry.id = company_id; - } - + if let Some(entry) = prepare_common_stock_entry(&common_stocks, existing_companies) { // Add to existing set immediately to prevent duplicates in same run existing_companies.insert(entry.name.clone()); common_batch.push(entry); @@ -513,9 +491,9 @@ async fn process_lei_figi_file_batched( stats.companies_added += common_batch.len(); } - logger::log_info(&format!(" Phase 1 complete: Generated {} company UUIDs", company_id_map.len())).await; + logger::log_info(" Phase 1 complete").await; - // === PHASE 2: Process dependent securities using company_id mapping === + // === PHASE 2: Process dependent securities (warrants, options, corporate bonds) === logger::log_info(" Phase 2: Processing warrants, options, and corporate bonds...").await; let mut warrants_batch: Vec = Vec::new(); @@ -545,15 +523,15 @@ async fn process_lei_figi_file_batched( group_securities(&figis); if !warrant_securities.is_empty() { - for entry in prepare_warrant_entries(&warrant_securities, existing_warrants, &company_id_map) { - let key = entry.company_id.clone(); + for entry in prepare_warrant_entries(&warrant_securities, existing_warrants) { + let key = entry.company_name.clone(); existing_warrants.insert(key); warrants_batch.push(entry); } } if !option_securities.is_empty() { - for entry in prepare_option_entries(&option_securities, existing_options, &company_id_map) { + for entry in prepare_option_entries(&option_securities, existing_options) { let key = entry.company_name.clone(); existing_options.insert(key); options_batch.push(entry); @@ -561,7 +539,7 @@ async fn process_lei_figi_file_batched( } if !corporate_bonds_securities.is_empty() { - for entry in prepare_corporate_bond_entries(&corporate_bonds_securities, existing_corporate_bonds, &company_id_map) { + for entry in prepare_corporate_bond_entries(&corporate_bonds_securities, existing_corporate_bonds) { let key = entry.underlying_company_name.clone(); existing_corporate_bonds.insert(key); corporate_bonds_batch.push(entry); @@ -671,13 +649,8 @@ fn prepare_common_stock_entry( let grouped_by_isin = group_figis_by_isin(figi_infos); let primary_isin = grouped_by_isin.keys().next().cloned().unwrap_or_default(); - let id = format!("company_{}", std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_nanos()); Some(CompanyData { - id, name, primary_isin, securities: grouped_by_isin, @@ -688,12 +661,10 @@ fn prepare_common_stock_entry( /// Prepares warrant entries for batching /// Prepares warrant entries for batching /// -/// Groups warrant contracts by underlying company, using company_id from the company_id_map -/// if the company exists, otherwise generates a new ID for the warrant. +/// Groups warrant contracts by underlying company. fn prepare_warrant_entries( warrant_securities: &[FigiData], existing_keys: &HashSet, - company_id_map: &HashMap, ) -> Vec { let mut entries = Vec::new(); @@ -718,18 +689,12 @@ fn prepare_warrant_entries( continue; } - // Use company_id from map if company exists, otherwise generate new ID for warrant - let company_id = company_id_map.get(&underlying_company) - .cloned() - .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); - let mut warrants_by_type: HashMap = HashMap::new(); for (warrant_type, figi) in contracts { let (_, issuer, _) = parse_warrant_name(&figi.name); let warrant_detail = WarrantDetails { - company_id: company_id.clone(), company_name: underlying_company.clone(), issuer_company_name: issuer, warrant_type: warrant_type.clone(), @@ -745,7 +710,6 @@ fn prepare_warrant_entries( } let warrant_info = WarrantData { - company_id, company_name: underlying_company.clone(), warrants: warrants_by_type, }; @@ -769,12 +733,10 @@ fn prepare_warrant_entries( /// Vector of OptionData entries, one per unique underlying company /// Prepares option entries for batching /// -/// Groups option contracts by underlying company, using company_id from the company_id_map -/// if the company exists, otherwise generates a new ID for the option. +/// Groups option contracts by underlying company. fn prepare_option_entries( option_securities: &[FigiData], existing_keys: &HashSet, - company_id_map: &HashMap, ) -> Vec { let mut entries = Vec::new(); @@ -799,11 +761,6 @@ fn prepare_option_entries( continue; } - // Use company_id from map if company exists, otherwise generate new ID for option - let company_id = company_id_map.get(&underlying_company) - .cloned() - .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); - // Build OptionContracts and extract strikes/expirations let mut option_contracts: HashMap, Vec)> = HashMap::new(); let mut all_strikes: std::collections::HashSet = std::collections::HashSet::new(); @@ -857,7 +814,6 @@ fn prepare_option_entries( .collect::>(); let option_data = OptionData { - company_id, company_name: underlying_company.clone(), expiration_dates, strikes, @@ -884,12 +840,10 @@ fn prepare_option_entries( /// Vector of CorporateBondInfo entries, one per unique issuer /// Prepares corporate bond entries for batching /// -/// Groups corporate bonds by issuer (underlying_company_name), using company_id from the company_id_map -/// if the company exists, otherwise generates a new ID for the bond. +/// Groups corporate bonds by issuer (underlying_company_name). fn prepare_corporate_bond_entries( corporate_bond_securities: &[FigiData], existing_keys: &HashSet, - company_id_map: &HashMap, ) -> Vec { let mut entries = Vec::new(); @@ -912,11 +866,6 @@ fn prepare_corporate_bond_entries( continue; } - // Use company_id from map if company exists, otherwise generate new ID for bond - let company_id = company_id_map.get(&issuer) - .cloned() - .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); - // Group by ISIN let bonds_by_isin = group_figis_by_isin(&figis); @@ -931,7 +880,6 @@ fn prepare_corporate_bond_entries( } let bond_info = CorporateBondData { - company_id, underlying_company_name: issuer.clone(), bonds: bonds_by_isin, bond_details: bond_details_map, diff --git a/src/scraper/yahoo.rs b/src/scraper/yahoo.rs index 42d4204..421df48 100644 --- a/src/scraper/yahoo.rs +++ b/src/scraper/yahoo.rs @@ -927,14 +927,7 @@ impl YahooClient { } } - // Generate company_id from symbol - let company_id = format!("option_{}", std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_nanos()); - Ok(OptionData { - company_id, company_name: symbol.to_string(), expiration_dates, strikes, diff --git a/src/util/directories.rs b/src/util/directories.rs index 35b026a..86668b6 100644 --- a/src/util/directories.rs +++ b/src/util/directories.rs @@ -14,14 +14,14 @@ pub struct DataPaths { cache_openfigi_dir: PathBuf, cache_gleif_openfigi_map_dir: PathBuf, cache_openvpn_dir: PathBuf, + // Figi Securities data subdirectories + figi_securities_dir: PathBuf, // Economic data subdirectories economic_events_dir: PathBuf, economic_changes_dir: PathBuf, + economic_currency_dir: PathBuf, // Corporate data subdirectories corporate_dir: PathBuf, - corporate_events_dir: PathBuf, - corporate_changes_dir: PathBuf, - corporate_prices_dir: PathBuf, } impl DataPaths { @@ -40,15 +40,16 @@ impl DataPaths { let cache_gleif_openfigi_map_dir = cache_dir.join("glei_openfigi"); let cache_openvpn_dir = cache_dir.join("openvpn"); + // Figi Securities subdirectories + let figi_securities_dir = data_dir.join("figi_securities"); + // Economic subdirectories let economic_events_dir = data_dir.join("economic").join("events"); let economic_changes_dir = economic_events_dir.join("changes"); + let economic_currency_dir = data_dir.join("economic").join("currency"); // Corporate subdirectories let corporate_dir = data_dir.join("corporate"); - let corporate_events_dir = corporate_dir.join("events"); - let corporate_changes_dir = corporate_events_dir.join("changes"); - let corporate_prices_dir = corporate_dir.join("prices"); // Create all directories if they don't exist fs::create_dir_all(&data_dir)?; @@ -59,12 +60,11 @@ impl DataPaths { fs::create_dir_all(&cache_openfigi_dir)?; fs::create_dir_all(&cache_gleif_openfigi_map_dir)?; fs::create_dir_all(&cache_openvpn_dir)?; + fs::create_dir_all(&figi_securities_dir)?; fs::create_dir_all(&economic_events_dir)?; fs::create_dir_all(&economic_changes_dir)?; + fs::create_dir_all(&economic_currency_dir)?; fs::create_dir_all(&corporate_dir)?; - fs::create_dir_all(&corporate_events_dir)?; - fs::create_dir_all(&corporate_changes_dir)?; - fs::create_dir_all(&corporate_prices_dir)?; Ok(Self { base_dir, @@ -76,12 +76,11 @@ impl DataPaths { cache_openfigi_dir, cache_gleif_openfigi_map_dir, cache_openvpn_dir, + figi_securities_dir, economic_events_dir, economic_changes_dir, + economic_currency_dir, corporate_dir, - corporate_events_dir, - corporate_changes_dir, - corporate_prices_dir, }) } @@ -121,6 +120,10 @@ impl DataPaths { &self.cache_openvpn_dir } + pub fn figi_securities_dir(&self) -> &Path { + &self.figi_securities_dir + } + /// Get the economic events directory pub fn economic_events_dir(&self) -> &Path { &self.economic_events_dir @@ -130,26 +133,15 @@ impl DataPaths { pub fn economic_changes_dir(&self) -> &Path { &self.economic_changes_dir } - + + pub fn economic_currency_dir(&self) -> &Path { + &self.economic_currency_dir + } + /// Get the corporate events directory pub fn corporate_dir(&self) -> &Path { &self.corporate_dir } - - /// Get the corporate events directory - pub fn corporate_events_dir(&self) -> &Path { - &self.corporate_events_dir - } - - /// Get the corporate changes directory - pub fn corporate_changes_dir(&self) -> &Path { - &self.corporate_changes_dir - } - - /// Get the corporate prices directory - pub fn corporate_prices_dir(&self) -> &Path { - &self.corporate_prices_dir - } /// Get a specific file path within data directory pub fn data_file(&self, filename: &str) -> PathBuf { @@ -179,8 +171,5 @@ mod tests { assert!(paths.logs_dir().exists()); assert!(paths.economic_events_dir().exists()); assert!(paths.economic_changes_dir().exists()); - assert!(paths.corporate_events_dir().exists()); - assert!(paths.corporate_changes_dir().exists()); - assert!(paths.corporate_prices_dir().exists()); } } \ No newline at end of file diff --git a/src/util/integrity.rs b/src/util/integrity.rs index 0613591..fffa986 100644 --- a/src/util/integrity.rs +++ b/src/util/integrity.rs @@ -809,7 +809,12 @@ impl StateManager { return Ok(entries); } - let content = async_fs::read_to_string(&self.base_dir.join("state.jsonl")).await?; + let state_file = self.base_dir.join("state.jsonl"); + if !state_file.exists() { + return Ok(entries); + } + + let content = async_fs::read_to_string(&state_file).await?; for line in content.lines() { if line.trim().is_empty() {