diff --git a/src/checkpoint_dependencies.toml b/integrity/checkpoint_dependencies.toml similarity index 100% rename from src/checkpoint_dependencies.toml rename to integrity/checkpoint_dependencies.toml diff --git a/logs/checkpoint_dependencies.dot b/logs/checkpoint_dependencies.dot index ef63945..b6b9339 100644 --- a/logs/checkpoint_dependencies.dot +++ b/logs/checkpoint_dependencies.dot @@ -2,5 +2,24 @@ digraph Dependencies { rankdir=LR; node [shape=box]; + "yahoo_companies_cleansed" [label="yahoo_companies_cleansed +Company data cleansed and validated"]; + "yahoo_events_enrichment_complete" [label="yahoo_events_enrichment_complete +Corporate events enriched for all companies"]; + "yahoo_options_enrichment_complete" [label="yahoo_options_enrichment_complete +Options data enriched for all companies"]; + "lei_figi_mapping_complete" [label="lei_figi_mapping_complete +LEI-to-FIGI mappings from OpenFIGI API"]; + "exchange_collection_complete" [label="exchange_collection_complete +Yahoo exchanges collected and validated"]; + "securities_data_complete" [label="securities_data_complete +Securities data built from FIGI mappings"]; + "yahoo_chart_enrichment_complete" [label="yahoo_chart_enrichment_complete +Chart data enriched for all companies"]; + "yahoo_companies_cleansed" -> "exchange_collection_complete"; + "yahoo_events_enrichment_complete" -> "yahoo_companies_cleansed" [label="via group enrichment_group"]; + "yahoo_options_enrichment_complete" -> "yahoo_companies_cleansed" [label="via group enrichment_group"]; + "securities_data_complete" -> "lei_figi_mapping_complete"; + "yahoo_chart_enrichment_complete" -> "yahoo_companies_cleansed" [label="via group enrichment_group"]; } diff --git a/src/corporate/collect_exchanges.rs b/src/corporate/collect_exchanges.rs index 6b5994c..1415b6b 100644 --- a/src/corporate/collect_exchanges.rs +++ b/src/corporate/collect_exchanges.rs @@ -245,7 +245,7 @@ fn get_fallback_rate(currency: &str) -> f64 { /// - Integrity tracking with content hash validation pub async fn collect_and_save_exchanges(paths: &DataPaths) -> anyhow::Result { let state_path = paths.data_dir().join("state.jsonl"); - let manager = StateManager::new(&state_path, &paths.data_dir().to_path_buf())?; + let manager = StateManager::new(paths.integrity_dir())?; let step_name = "exchange_collection_complete"; let output_path = paths.data_dir().join("yahoo_exchanges.json"); diff --git a/src/corporate/update_companies_cleanse.rs b/src/corporate/update_companies_cleanse.rs index b804f8a..c600b02 100644 --- a/src/corporate/update_companies_cleanse.rs +++ b/src/corporate/update_companies_cleanse.rs @@ -47,7 +47,7 @@ pub async fn companies_yahoo_cleansed_no_data(paths: &DataPaths) -> Result anyhow::Result<()> { let dir = DataPaths::new(".")?; let state_path = dir.data_dir().join("state.jsonl"); - let manager = StateManager::new(&state_path, &dir.data_dir().to_path_buf())?; + let manager = StateManager::new(&dir.integrity_dir())?; let step_name = "securities_data_complete"; let data_dir = dir.data_dir(); @@ -1233,7 +1233,7 @@ pub async fn update_lei_mapping( ) -> anyhow::Result { let dir = DataPaths::new(".")?; let state_path = dir.cache_dir().join("state.jsonl"); - let manager = StateManager::new(&state_path, &dir.cache_dir().to_path_buf())?; + let manager = StateManager::new(&dir.integrity_dir())?; let step_name = "lei_figi_mapping_complete"; let map_cache_dir = dir.cache_gleif_openfigi_map_dir(); diff --git a/src/economic/yahoo_update_forex.rs b/src/economic/yahoo_update_forex.rs index 6d4a994..03c00e7 100644 --- a/src/economic/yahoo_update_forex.rs +++ b/src/economic/yahoo_update_forex.rs @@ -94,7 +94,7 @@ pub async fn collect_fx_rates( let log_path = data_path.join("fx_rates_updates.log"); let state_path = data_path.join("state.jsonl"); - let manager = StateManager::new(&state_path, &data_path.to_path_buf())?; + let manager = StateManager::new(paths.integrity_dir())?; let step_name = "yahoo_fx_rate_collection_completed"; let content_reference = directory_reference(&output_path, Some(vec![ diff --git a/src/main.rs b/src/main.rs index b3dc936..21c3878 100644 --- a/src/main.rs +++ b/src/main.rs @@ -234,20 +234,15 @@ fn format_duration(duration: Duration) -> String { format!("{:02}::{:02}::{:02}::{:02}", days, hours, minutes, seconds) } -async fn create_state_files(paths: &DataPaths) -> Result<()> { - let paths = ( - paths.data_dir().join("state.jsonl"), - paths.cache_dir().join("state.jsonl"), - ); +async fn create_state_file(paths: &DataPaths) -> Result<()> { + let integrity_path = paths.integrity_dir().join("state.jsonl"); // Use OpenOptions to create the file only if it doesn't exist - for path in &[&paths.0, &paths.1] { - OpenOptions::new() - .create(true) // Create if it doesn't exist - .write(true) // Ensure we can write to the file - .open(path)?; - logger::log_info(&format!("Checked or created file: {}", path.display())).await; - } + OpenOptions::new() + .create(true) // Create if it doesn't exist + .write(true) // Ensure we can write to the file + .open(&integrity_path)?; + logger::log_info(&format!("Checked or created file: {}", integrity_path.display())).await; Ok(()) } @@ -255,8 +250,7 @@ async fn create_state_files(paths: &DataPaths) -> Result<()> { async fn visualize_checkpoint_dependencies(paths: &DataPaths) -> Result<()> { // Add more detailed error handling match StateManager::new( - &paths.data_dir().join("state.jsonl"), - &paths.data_dir().to_path_buf() + paths.integrity_dir(), ) { Ok(manager) => { logger::log_info("✓ Dependency configuration loaded successfully").await; @@ -288,7 +282,7 @@ async fn main() -> Result<()> { start_docker_desktop().await; cleanup_all_proxy_containers().await.ok(); - create_state_files(&paths).await.ok(); + create_state_file(&paths).await.ok(); visualize_checkpoint_dependencies(&paths).await.ok(); let config = Config::load().unwrap_or_else(|_| { diff --git a/src/util/directories.rs b/src/util/directories.rs index c1ebd80..35b026a 100644 --- a/src/util/directories.rs +++ b/src/util/directories.rs @@ -8,6 +8,7 @@ pub struct DataPaths { data_dir: PathBuf, cache_dir: PathBuf, logs_dir: PathBuf, + integrity_dir: PathBuf, // Cache data subdirectories cache_gleif_dir: PathBuf, cache_openfigi_dir: PathBuf, @@ -31,6 +32,7 @@ impl DataPaths { let data_dir = base_dir.join("data"); let cache_dir = base_dir.join("cache"); let logs_dir = base_dir.join("logs"); + let integrity_dir = base_dir.join("integrity"); // Cache subdirectories let cache_gleif_dir = cache_dir.join("gleif"); @@ -52,6 +54,7 @@ impl DataPaths { fs::create_dir_all(&data_dir)?; fs::create_dir_all(&cache_dir)?; fs::create_dir_all(&logs_dir)?; + fs::create_dir_all(&integrity_dir)?; fs::create_dir_all(&cache_gleif_dir)?; fs::create_dir_all(&cache_openfigi_dir)?; fs::create_dir_all(&cache_gleif_openfigi_map_dir)?; @@ -68,6 +71,7 @@ impl DataPaths { data_dir, cache_dir, logs_dir, + integrity_dir, cache_gleif_dir, cache_openfigi_dir, cache_gleif_openfigi_map_dir, @@ -92,6 +96,10 @@ impl DataPaths { pub fn cache_dir(&self) -> &Path { &self.cache_dir } + + pub fn integrity_dir(&self) -> &Path { + &self.integrity_dir + } pub fn logs_dir(&self) -> &Path { &self.logs_dir diff --git a/src/util/integrity.rs b/src/util/integrity.rs index 99b0c40..3accb9d 100644 --- a/src/util/integrity.rs +++ b/src/util/integrity.rs @@ -766,19 +766,17 @@ impl ValidationReport { /// State manager with centralized dependency configuration pub struct StateManager { - state_path: PathBuf, base_dir: PathBuf, dependency_config: DependencyConfig, } impl StateManager { /// Create new state manager and load dependency configuration - pub fn new>(state_path: P, base_dir: P) -> Result { + pub fn new>(base_dir: P) -> Result { let base_dir = base_dir.as_ref().to_path_buf(); let dependency_config = DependencyConfig::from_default_location(&base_dir)?; Ok(Self { - state_path: state_path.as_ref().to_path_buf(), base_dir, dependency_config, }) @@ -786,14 +784,12 @@ impl StateManager { /// Create with explicit dependency configuration pub fn with_config>( - state_path: P, base_dir: P, dependency_config: DependencyConfig, ) -> Result { dependency_config.validate()?; Ok(Self { - state_path: state_path.as_ref().to_path_buf(), base_dir: base_dir.as_ref().to_path_buf(), dependency_config, }) @@ -808,11 +804,11 @@ impl StateManager { pub async fn load_entries(&self) -> Result> { let mut entries = HashMap::new(); - if !self.state_path.exists() { + if !self.base_dir.exists() { return Ok(entries); } - let content = async_fs::read_to_string(&self.state_path).await?; + let content = async_fs::read_to_string(&self.base_dir).await?; for line in content.lines() { if line.trim().is_empty() { @@ -829,11 +825,11 @@ impl StateManager { /// Save all state entries to state.jsonl pub async fn save_entries(&self, entries: &HashMap) -> Result<()> { - if let Some(parent) = self.state_path.parent() { + if let Some(parent) = self.base_dir.parent() { async_fs::create_dir_all(parent).await?; } - - let mut file = async_fs::File::create(&self.state_path).await?; + + let mut file = async_fs::File::create(&self.base_dir).await?; for entry in entries.values() { let line = serde_json::to_string(&entry)? + "\n"; @@ -846,7 +842,7 @@ impl StateManager { } /// Create or update a state entry with integrity tracking - /// **UPDATED**: Dependencies are now resolved automatically from config + /// Dependencies are now resolved automatically from config pub async fn update_entry( &self, step_name: String,