added integrity dir for set data collection; one state.jsonl

This commit is contained in:
2026-01-11 16:57:36 +01:00
parent e6f8393660
commit bd74f36f4c
10 changed files with 52 additions and 35 deletions

View File

@@ -2,5 +2,24 @@ digraph Dependencies {
rankdir=LR;
node [shape=box];
"yahoo_companies_cleansed" [label="yahoo_companies_cleansed
Company data cleansed and validated"];
"yahoo_events_enrichment_complete" [label="yahoo_events_enrichment_complete
Corporate events enriched for all companies"];
"yahoo_options_enrichment_complete" [label="yahoo_options_enrichment_complete
Options data enriched for all companies"];
"lei_figi_mapping_complete" [label="lei_figi_mapping_complete
LEI-to-FIGI mappings from OpenFIGI API"];
"exchange_collection_complete" [label="exchange_collection_complete
Yahoo exchanges collected and validated"];
"securities_data_complete" [label="securities_data_complete
Securities data built from FIGI mappings"];
"yahoo_chart_enrichment_complete" [label="yahoo_chart_enrichment_complete
Chart data enriched for all companies"];
"yahoo_companies_cleansed" -> "exchange_collection_complete";
"yahoo_events_enrichment_complete" -> "yahoo_companies_cleansed" [label="via group enrichment_group"];
"yahoo_options_enrichment_complete" -> "yahoo_companies_cleansed" [label="via group enrichment_group"];
"securities_data_complete" -> "lei_figi_mapping_complete";
"yahoo_chart_enrichment_complete" -> "yahoo_companies_cleansed" [label="via group enrichment_group"];
}

View File

@@ -245,7 +245,7 @@ fn get_fallback_rate(currency: &str) -> f64 {
/// - Integrity tracking with content hash validation
pub async fn collect_and_save_exchanges(paths: &DataPaths) -> anyhow::Result<usize> {
let state_path = paths.data_dir().join("state.jsonl");
let manager = StateManager::new(&state_path, &paths.data_dir().to_path_buf())?;
let manager = StateManager::new(paths.integrity_dir())?;
let step_name = "exchange_collection_complete";
let output_path = paths.data_dir().join("yahoo_exchanges.json");

View File

@@ -47,7 +47,7 @@ pub async fn companies_yahoo_cleansed_no_data(paths: &DataPaths) -> Result<usize
return Ok(0);
}
let manager = StateManager::new(&state_path, &data_path.to_path_buf())?;
let manager = StateManager::new(paths.integrity_dir())?;
let step_name = "yahoo_companies_cleansed_no_data";
let content_reference = file_reference(&output_path);
@@ -179,7 +179,7 @@ pub async fn companies_yahoo_cleansed_low_profile(
return Ok(0);
}
let manager = StateManager::new(&state_path, &data_path.to_path_buf())?;
let manager = StateManager::new(paths.integrity_dir())?;
let step_name = "yahoo_companies_cleansed_no_data";
let content_reference = file_reference(&checkpoint_path);

View File

@@ -81,7 +81,7 @@ pub async fn enrich_companies_with_events(
return Ok(0);
}
let manager = StateManager::new(&state_path, &data_path.to_path_buf())?;
let manager = StateManager::new(paths.integrity_dir())?;
let step_name = "yahoo_events_enrichment_complete";
if manager.is_step_valid(step_name).await? {
@@ -418,7 +418,7 @@ pub async fn enrich_companies_with_option(
return Ok(0);
}
let manager = StateManager::new(&state_path, &data_path.to_path_buf())?;
let manager = StateManager::new(paths.integrity_dir())?;
let step_name = "yahoo_option_enrichment_complete";
if manager.is_step_valid(step_name).await? {
@@ -678,7 +678,7 @@ pub async fn enrich_companies_with_chart(
return Ok(0);
}
let manager = StateManager::new(&state_path, &data_path.to_path_buf())?;
let manager = StateManager::new(paths.integrity_dir())?;
let step_name = "yahoo_chart_enrichment_complete";
if manager.is_step_valid(step_name).await? {

View File

@@ -103,7 +103,7 @@ pub async fn update_securities(date_dir: &Path) -> anyhow::Result<()> {
let dir = DataPaths::new(".")?;
let state_path = dir.data_dir().join("state.jsonl");
let manager = StateManager::new(&state_path, &dir.data_dir().to_path_buf())?;
let manager = StateManager::new(&dir.integrity_dir())?;
let step_name = "securities_data_complete";
let data_dir = dir.data_dir();
@@ -1233,7 +1233,7 @@ pub async fn update_lei_mapping(
) -> anyhow::Result<bool> {
let dir = DataPaths::new(".")?;
let state_path = dir.cache_dir().join("state.jsonl");
let manager = StateManager::new(&state_path, &dir.cache_dir().to_path_buf())?;
let manager = StateManager::new(&dir.integrity_dir())?;
let step_name = "lei_figi_mapping_complete";
let map_cache_dir = dir.cache_gleif_openfigi_map_dir();

View File

@@ -94,7 +94,7 @@ pub async fn collect_fx_rates(
let log_path = data_path.join("fx_rates_updates.log");
let state_path = data_path.join("state.jsonl");
let manager = StateManager::new(&state_path, &data_path.to_path_buf())?;
let manager = StateManager::new(paths.integrity_dir())?;
let step_name = "yahoo_fx_rate_collection_completed";
let content_reference = directory_reference(&output_path,
Some(vec![

View File

@@ -234,20 +234,15 @@ fn format_duration(duration: Duration) -> String {
format!("{:02}::{:02}::{:02}::{:02}", days, hours, minutes, seconds)
}
async fn create_state_files(paths: &DataPaths) -> Result<()> {
let paths = (
paths.data_dir().join("state.jsonl"),
paths.cache_dir().join("state.jsonl"),
);
async fn create_state_file(paths: &DataPaths) -> Result<()> {
let integrity_path = paths.integrity_dir().join("state.jsonl");
// Use OpenOptions to create the file only if it doesn't exist
for path in &[&paths.0, &paths.1] {
OpenOptions::new()
.create(true) // Create if it doesn't exist
.write(true) // Ensure we can write to the file
.open(path)?;
logger::log_info(&format!("Checked or created file: {}", path.display())).await;
}
OpenOptions::new()
.create(true) // Create if it doesn't exist
.write(true) // Ensure we can write to the file
.open(&integrity_path)?;
logger::log_info(&format!("Checked or created file: {}", integrity_path.display())).await;
Ok(())
}
@@ -255,8 +250,7 @@ async fn create_state_files(paths: &DataPaths) -> Result<()> {
async fn visualize_checkpoint_dependencies(paths: &DataPaths) -> Result<()> {
// Add more detailed error handling
match StateManager::new(
&paths.data_dir().join("state.jsonl"),
&paths.data_dir().to_path_buf()
paths.integrity_dir(),
) {
Ok(manager) => {
logger::log_info("✓ Dependency configuration loaded successfully").await;
@@ -288,7 +282,7 @@ async fn main() -> Result<()> {
start_docker_desktop().await;
cleanup_all_proxy_containers().await.ok();
create_state_files(&paths).await.ok();
create_state_file(&paths).await.ok();
visualize_checkpoint_dependencies(&paths).await.ok();
let config = Config::load().unwrap_or_else(|_| {

View File

@@ -8,6 +8,7 @@ pub struct DataPaths {
data_dir: PathBuf,
cache_dir: PathBuf,
logs_dir: PathBuf,
integrity_dir: PathBuf,
// Cache data subdirectories
cache_gleif_dir: PathBuf,
cache_openfigi_dir: PathBuf,
@@ -31,6 +32,7 @@ impl DataPaths {
let data_dir = base_dir.join("data");
let cache_dir = base_dir.join("cache");
let logs_dir = base_dir.join("logs");
let integrity_dir = base_dir.join("integrity");
// Cache subdirectories
let cache_gleif_dir = cache_dir.join("gleif");
@@ -52,6 +54,7 @@ impl DataPaths {
fs::create_dir_all(&data_dir)?;
fs::create_dir_all(&cache_dir)?;
fs::create_dir_all(&logs_dir)?;
fs::create_dir_all(&integrity_dir)?;
fs::create_dir_all(&cache_gleif_dir)?;
fs::create_dir_all(&cache_openfigi_dir)?;
fs::create_dir_all(&cache_gleif_openfigi_map_dir)?;
@@ -68,6 +71,7 @@ impl DataPaths {
data_dir,
cache_dir,
logs_dir,
integrity_dir,
cache_gleif_dir,
cache_openfigi_dir,
cache_gleif_openfigi_map_dir,
@@ -93,6 +97,10 @@ impl DataPaths {
&self.cache_dir
}
pub fn integrity_dir(&self) -> &Path {
&self.integrity_dir
}
pub fn logs_dir(&self) -> &Path {
&self.logs_dir
}

View File

@@ -766,19 +766,17 @@ impl ValidationReport {
/// State manager with centralized dependency configuration
pub struct StateManager {
state_path: PathBuf,
base_dir: PathBuf,
dependency_config: DependencyConfig,
}
impl StateManager {
/// Create new state manager and load dependency configuration
pub fn new<P: AsRef<Path>>(state_path: P, base_dir: P) -> Result<Self> {
pub fn new<P: AsRef<Path>>(base_dir: P) -> Result<Self> {
let base_dir = base_dir.as_ref().to_path_buf();
let dependency_config = DependencyConfig::from_default_location(&base_dir)?;
Ok(Self {
state_path: state_path.as_ref().to_path_buf(),
base_dir,
dependency_config,
})
@@ -786,14 +784,12 @@ impl StateManager {
/// Create with explicit dependency configuration
pub fn with_config<P: AsRef<Path>>(
state_path: P,
base_dir: P,
dependency_config: DependencyConfig,
) -> Result<Self> {
dependency_config.validate()?;
Ok(Self {
state_path: state_path.as_ref().to_path_buf(),
base_dir: base_dir.as_ref().to_path_buf(),
dependency_config,
})
@@ -808,11 +804,11 @@ impl StateManager {
pub async fn load_entries(&self) -> Result<HashMap<String, StateEntry>> {
let mut entries = HashMap::new();
if !self.state_path.exists() {
if !self.base_dir.exists() {
return Ok(entries);
}
let content = async_fs::read_to_string(&self.state_path).await?;
let content = async_fs::read_to_string(&self.base_dir).await?;
for line in content.lines() {
if line.trim().is_empty() {
@@ -829,11 +825,11 @@ impl StateManager {
/// Save all state entries to state.jsonl
pub async fn save_entries(&self, entries: &HashMap<String, StateEntry>) -> Result<()> {
if let Some(parent) = self.state_path.parent() {
if let Some(parent) = self.base_dir.parent() {
async_fs::create_dir_all(parent).await?;
}
let mut file = async_fs::File::create(&self.state_path).await?;
let mut file = async_fs::File::create(&self.base_dir).await?;
for entry in entries.values() {
let line = serde_json::to_string(&entry)? + "\n";
@@ -846,7 +842,7 @@ impl StateManager {
}
/// Create or update a state entry with integrity tracking
/// **UPDATED**: Dependencies are now resolved automatically from config
/// Dependencies are now resolved automatically from config
pub async fn update_entry(
&self,
step_name: String,