Migrated checkpoint handling in integrity.rs to the SSOT (single source of truth) principle

This commit is contained in:
2026-01-11 13:05:31 +01:00
parent 0487c2ec49
commit aff340ee2f
15 changed files with 880 additions and 579 deletions

Cargo.lock generated

@@ -2743,6 +2743,15 @@ dependencies = [
"serde_core",
]
[[package]]
name = "serde_spanned"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8bbf91e5a4d6315eee45e704372590b30e260ee83af6639d64557f51b067776"
dependencies = [
"serde_core",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
@@ -3213,10 +3222,25 @@ dependencies = [
]
[[package]]
name = "toml_datetime"
version = "0.7.3"
name = "toml"
version = "0.9.11+spec-1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2cdb639ebbc97961c51720f858597f7f24c4fc295327923af55b74c3c724533"
checksum = "f3afc9a848309fe1aaffaed6e1546a7a14de1f935dc9d89d32afd9a44bab7c46"
dependencies = [
"indexmap",
"serde_core",
"serde_spanned",
"toml_datetime",
"toml_parser",
"toml_writer",
"winnow",
]
[[package]]
name = "toml_datetime"
version = "0.7.5+spec-1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92e1cfed4a3038bc5a127e35a2d360f145e1f4b971b551a2ba5fd7aedf7e1347"
dependencies = [
"serde_core",
]
@@ -3235,13 +3259,19 @@ dependencies = [
[[package]]
name = "toml_parser"
version = "1.0.4"
version = "1.0.6+spec-1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0cbe268d35bdb4bb5a56a2de88d0ad0eb70af5384a99d648cd4b3d04039800e"
checksum = "a3198b4b0a8e11f09dd03e133c0280504d0801269e9afa46362ffde1cbeebf44"
dependencies = [
"winnow",
]
[[package]]
name = "toml_writer"
version = "1.0.6+spec-1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab16f14aed21ee8bfd8ec22513f7287cd4a91aa92e44edfe2c17ddd004e92607"
[[package]]
name = "tower"
version = "0.5.2"
@@ -3635,6 +3665,7 @@ dependencies = [
"sha2",
"tokio",
"tokio-tungstenite 0.21.0",
"toml",
"tracing",
"tracing-subscriber",
"url",


@@ -40,6 +40,7 @@ rand = "0.9.2"
# Environment handling
dotenvy = "0.15"
toml = "0.9.8"
# Date & time
chrono = { version = "0.4", features = ["serde"] }


@@ -0,0 +1,13 @@
[checkpoints.lei_figi_mapping_complete]
depends_on = []
[checkpoints.securities_data_complete]
depends_on = ["lei_figi_mapping_complete"]
[groups.enrichment_group]
members = ["yahoo_events_enrichment_complete", "yahoo_options_enrichment_complete"]
depends_on = ["yahoo_companies_cleansed"]
[checkpoints.yahoo_events_enrichment_complete]
depends_on = []
group = "enrichment_group"
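How the resolver introduced in this commit treats a file like this: a checkpoint that names a group inherits that group's depends_on in addition to its own direct entries, and dependencies are followed transitively. A minimal sketch, assuming DependencyConfig also round-trips through toml::from_str (the shipped loader goes through DependencyConfig::from_file) and using illustrative checkpoint names rather than the ones above:

use web_scraper::util::integrity::DependencyConfig;

fn main() -> anyhow::Result<()> {
    // Illustrative config, not the shipped file: member_a inherits "base_step"
    // from its group, so its resolved dependency list is ["base_step"].
    let config: DependencyConfig = toml::from_str(r#"
        [checkpoints.base_step]
        depends_on = []

        [checkpoints.member_a]
        depends_on = []
        group = "g"

        [groups.g]
        members = ["member_a"]
        depends_on = ["base_step"]
    "#)?;
    config.validate()?;
    assert_eq!(
        config.get_all_dependencies("member_a")?,
        vec!["base_step".to_string()]
    );
    Ok(())
}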


@@ -32,6 +32,23 @@ pub struct Config {
pub proxy_instances_per_certificate: Option<usize>,
}
impl Default for Config {
fn default() -> Self {
Self {
economic_start_date: "2007-02-13".to_string(),
corporate_start_date: "2010-01-01".to_string(),
economic_lookahead_months: 3,
max_parallel_instances: default_max_parallel_instances(),
max_tasks_per_instance: 0,
max_requests_per_session: default_max_requests_per_session(),
min_request_interval_ms: default_min_request_interval_ms(),
max_retry_attempts: default_max_retry_attempts(),
enable_vpn_rotation: false,
proxy_instances_per_certificate: default_proxy_instances_per_certificate(),
}
}
}
fn default_enable_vpn_rotation() -> bool {
false
}
@@ -54,25 +71,6 @@ fn default_proxy_instances_per_certificate() -> Option<usize> {
Some(1)
}
impl Default for Config {
fn default() -> Self {
Self {
economic_start_date: "2007-02-13".to_string(),
corporate_start_date: "2010-01-01".to_string(),
economic_lookahead_months: 3,
max_parallel_instances: default_max_parallel_instances(),
max_tasks_per_instance: 0,
max_requests_per_session: default_max_requests_per_session(),
min_request_interval_ms: default_min_request_interval_ms(),
max_retry_attempts: default_max_retry_attempts(),
enable_vpn_rotation: false,
proxy_instances_per_certificate: default_proxy_instances_per_certificate(),
}
}
}
impl Config {
/// Loads configuration from environment variables using dotenvy.
pub fn load() -> Result<Self> {


@@ -245,7 +245,7 @@ fn get_fallback_rate(currency: &str) -> f64 {
/// - Integrity tracking with content hash validation
pub async fn collect_and_save_exchanges(paths: &DataPaths) -> anyhow::Result<usize> {
let state_path = paths.data_dir().join("state.jsonl");
let manager = StateManager::new(&state_path, &paths.data_dir().to_path_buf());
let manager = StateManager::new(&state_path, &paths.data_dir().to_path_buf())?;
let step_name = "exchange_collection_complete";
let output_path = paths.data_dir().join("yahoo_exchanges.json");
@@ -407,7 +407,6 @@ async fn track_exchange_collection_completion(
step_name.to_string(),
content_reference,
DataStage::Data,
vec![], // No explicit dependencies - output file serves as verification
None, // Use default TTL (7 days for Data stage)
).await?;
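The same pattern repeats across the collectors changed in this commit: StateManager::new is now fallible because it loads and validates checkpoint_dependencies.toml, and update_entry no longer takes an explicit dependency vector since dependencies are resolved from that config. A hedged sketch of the new call shape, assuming file_reference and the types shown here are all exported from web_scraper::util::integrity:

use std::path::Path;
use web_scraper::util::integrity::{file_reference, DataStage, StateManager};

// Hypothetical helper mirroring this hunk's tracking call.
async fn track_completion(data_dir: &Path, step_name: &str, output: &Path) -> anyhow::Result<()> {
    let manager = StateManager::new(&data_dir.join("state.jsonl"), &data_dir.to_path_buf())?;
    manager.update_entry(
        step_name.to_string(),
        file_reference(output),
        DataStage::Data, // 7-day default TTL for the Data stage
        None,            // no TTL override; dependencies come from checkpoint_dependencies.toml
    ).await?;
    Ok(())
}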


@@ -12,6 +12,7 @@ use crate::util::directories::DataPaths;
use crate::util::logger;
use crate::scraper::webdriver::ChromeDriverPool;
use crate::scraper::yahoo::{YahooClientPool};
use crate::scraper::openfigi::load_figi_type_lists;
use std::result::Result::Ok;
use std::sync::Arc;


@@ -47,7 +47,7 @@ pub async fn companies_yahoo_cleansed_no_data(paths: &DataPaths) -> Result<usize
return Ok(0);
}
let manager = StateManager::new(&state_path, &data_path.to_path_buf());
let manager = StateManager::new(&state_path, &data_path.to_path_buf())?;
let step_name = "yahoo_companies_cleansed_no_data";
let content_reference = file_reference(&output_path);
@@ -131,7 +131,6 @@ pub async fn companies_yahoo_cleansed_no_data(paths: &DataPaths) -> Result<usize
step_name.to_string(),
content_reference,
DataStage::Data,
vec!["yahoo_companies_cleansed".to_string()], // Dependency
None, // Use default TTL (7 days for Data stage)
).await?;
@@ -180,7 +179,7 @@ pub async fn companies_yahoo_cleansed_low_profile(
return Ok(0);
}
let manager = StateManager::new(&state_path, &data_path.to_path_buf());
let manager = StateManager::new(&state_path, &data_path.to_path_buf())?;
let step_name = "yahoo_companies_cleansed_no_data";
let content_reference = file_reference(&checkpoint_path);
@@ -632,7 +631,6 @@ pub async fn companies_yahoo_cleansed_low_profile(
step_name.to_string(),
content_reference,
DataStage::Data,
vec!["yahoo_companies_cleansed".to_string()], // Dependency
None, // Use default TTL (7 days for Data stage)
).await?;


@@ -60,7 +60,7 @@ pub async fn enrich_companies_with_events(
return Ok(0);
}
let manager = StateManager::new(&state_path, &data_path.to_path_buf());
let manager = StateManager::new(&state_path, &data_path.to_path_buf())?;
let step_name = "yahoo_events_enrichment_complete";
if manager.is_step_valid(step_name).await? {
@@ -293,7 +293,6 @@ async fn track_events_completion(
step_name.to_string(),
content_reference,
DataStage::Data,
vec!["yahoo_companies_cleansed".to_string()], // Dependency
None, // Use default TTL (7 days for Data stage)
).await?;


@@ -60,7 +60,7 @@ pub async fn enrich_companies_with_option(
return Ok(0);
}
let manager = StateManager::new(&state_path, &data_path.to_path_buf());
let manager = StateManager::new(&state_path, &data_path.to_path_buf())?;
let step_name = "yahoo_option_enrichment_complete";
if manager.is_step_valid(step_name).await? {
@@ -291,7 +291,7 @@ pub async fn enrich_companies_with_chart(
return Ok(0);
}
let manager = StateManager::new(&state_path, &data_path.to_path_buf());
let manager = StateManager::new(&state_path, &data_path.to_path_buf())?;
let step_name = "yahoo_chart_enrichment_complete";
if manager.is_step_valid(step_name).await? {
@@ -697,7 +697,6 @@ async fn track_option_completion(
step_name.to_string(),
content_reference,
DataStage::Data,
vec!["yahoo_companies_cleansed".to_string()], // Dependency
None, // Use default TTL (7 days for Data stage)
).await?;
@@ -733,7 +732,6 @@ async fn track_chart_completion(
step_name.to_string(),
content_reference,
DataStage::Data,
vec!["yahoo_companies_cleansed".to_string()], // Dependency
None, // Use default TTL (7 days for Data stage)
).await?;


@@ -1,159 +1,21 @@
// src/corporate/openfigi.rs - STREAMING VERSION
// src/corporate/update_openfigi.rs - STREAMING VERSION
// Key changes: Never load entire GLEIF CSV or FIGI maps into memory
use crate::util::directories::DataPaths;
use crate::util::integrity::{DataStage, StateManager, directory_reference};
use crate::util::logger;
use crate::scraper::openfigi::{OpenFigiClient};
use super::types::*;
use reqwest::Client as HttpClient;
use reqwest::header::{HeaderMap, HeaderValue};
use serde_json::{json, Value};
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::io::{BufRead, BufReader};
use tokio::time::{sleep, Duration};
use tokio::fs as tokio_fs;
use tokio::io::AsyncWriteExt;
use anyhow::{Context, anyhow};
const LEI_BATCH_SIZE: usize = 100; // Process 100 LEIs at a time
#[derive(Clone)]
pub struct OpenFigiClient {
client: HttpClient,
has_key: bool,
}
impl OpenFigiClient {
pub async fn new() -> anyhow::Result<Self> {
let api_key = dotenvy::var("OPENFIGI_API_KEY").ok();
let has_key = api_key.is_some();
let mut builder = HttpClient::builder()
.user_agent("Mozilla/5.0 (compatible; OpenFIGI-Rust/1.0)")
.timeout(Duration::from_secs(30));
if let Some(key) = &api_key {
let mut headers = HeaderMap::new();
headers.insert("X-OPENFIGI-APIKEY", HeaderValue::from_str(key)?);
builder = builder.default_headers(headers);
}
let client = builder.build().context("Failed to build HTTP client")?;
logger::log_info(&format!("OpenFIGI client: {}",
if has_key { "with API key" } else { "no key" })).await;
Ok(Self { client, has_key })
}
pub async fn map_isins_to_figi_infos(&self, isins: &[String]) -> anyhow::Result<Vec<FigiInfo>> {
if isins.is_empty() {
return Ok(vec![]);
}
let mut all_figi_infos = Vec::new();
let chunk_size = if self.has_key { 100 } else { 5 };
let inter_sleep = if self.has_key {
Duration::from_millis(240)
} else {
Duration::from_millis(2400)
};
for chunk in isins.chunks(chunk_size) {
let jobs: Vec<Value> = chunk.iter()
.map(|isin| json!({
"idType": "ID_ISIN",
"idValue": isin,
}))
.collect();
let mut retry_count = 0;
let max_retries = 5;
let mut backoff_ms = 1000u64;
loop {
let resp_result = self.client
.post("https://api.openfigi.com/v3/mapping")
.header("Content-Type", "application/json")
.json(&jobs)
.send()
.await;
let resp = match resp_result {
Ok(r) => r,
Err(e) => {
retry_count += 1;
if retry_count >= max_retries {
let err_msg = format!("Failed to send mapping request after {} retries: {}", max_retries, e);
logger::log_error(&err_msg).await;
return Err(anyhow!(err_msg));
}
let warn_msg = format!("Transient error sending mapping request (attempt {}/{}): {}", retry_count, max_retries, e);
logger::log_warn(&warn_msg).await;
let retry_msg = format!(" Retrying in {}ms...", backoff_ms);
logger::log_info(&retry_msg).await;
sleep(Duration::from_millis(backoff_ms)).await;
backoff_ms = (backoff_ms * 2).min(60000); // Cap at 60s
continue;
}
};
let status = resp.status();
let headers = resp.headers().clone();
let body = resp.text().await?;
if status == 429 {
let reset_sec = headers
.get("ratelimit-reset")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(10);
sleep(Duration::from_secs(reset_sec.max(10))).await;
continue;
} else if !status.is_success() {
if status.is_server_error() && retry_count < max_retries {
retry_count += 1;
sleep(Duration::from_millis(backoff_ms)).await;
backoff_ms = (backoff_ms * 2).min(60000);
continue;
}
return Err(anyhow!("OpenFIGI error {}: {}", status, body));
}
let results: Vec<Value> = serde_json::from_str(&body)?;
for (isin, result) in chunk.iter().zip(results) {
if let Some(data) = result["data"].as_array() {
for item in data {
if let Some(figi) = item["figi"].as_str() {
all_figi_infos.push(FigiInfo {
isin: isin.clone(),
figi: figi.to_string(),
name: item["name"].as_str().unwrap_or("").to_string(),
ticker: item["ticker"].as_str().unwrap_or("").to_string(),
exch_code: item["exchCode"].as_str().unwrap_or("").to_string(),
composite_figi: item["compositeFIGI"].as_str().unwrap_or("").to_string(),
security_type: item["securityType"].as_str().unwrap_or("").to_string(),
market_sector: item["marketSector"].as_str().unwrap_or("").to_string(),
share_class_figi: item["shareClassFIGI"].as_str().unwrap_or("").to_string(),
security_type2: item["securityType2"].as_str().unwrap_or("").to_string(),
security_description: item["securityDescription"].as_str().unwrap_or("").to_string(),
});
}
}
}
}
break;
}
sleep(inter_sleep).await;
}
Ok(all_figi_infos)
}
}
async fn process_and_save_figi_batch(
client: &OpenFigiClient,
lei_batch: &HashMap<String, Vec<String>>,
@@ -224,41 +86,6 @@ async fn append_lei_to_figi_jsonl(path: &Path, lei: &str, figis: &[FigiInfo]) ->
Ok(())
}
/// Handles rate limit responses from the OpenFIGI API.
///
/// If a 429 status is received, this function sleeps for the duration specified
/// in the `ratelimit-reset` header (or 10 seconds by default).
///
/// # Arguments
/// * `resp` - The HTTP response to check.
///
/// # Returns
/// Ok(()) if no rate limit, or after waiting for the reset period.
///
/// # Errors
/// Returns an error if the response status indicates a non-rate-limit error.
async fn handle_rate_limit(resp: &reqwest::Response) -> anyhow::Result<()> {
let status = resp.status();
if status == 429 {
let headers = resp.headers();
let reset_sec = headers
.get("ratelimit-reset")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(10);
logger::log_info(&format!(" Rate limited—waiting {}s", reset_sec)).await;
sleep(std::time::Duration::from_secs(reset_sec.max(10))).await;
return Err(anyhow!("Rate limited, please retry"));
} else if status.is_client_error() || status.is_server_error() {
return Err(anyhow!("OpenFIGI API error: {}", status));
}
Ok(())
}
/// Loads or builds securities data by streaming through FIGI mapping files.
///
/// Implements abort-safe incremental persistence with checkpoints and replay logs.
@@ -276,7 +103,7 @@ pub async fn update_securities(date_dir: &Path) -> anyhow::Result<()> {
let dir = DataPaths::new(".")?;
let state_path = dir.data_dir().join("state.jsonl");
let manager = StateManager::new(&state_path, &dir.data_dir().to_path_buf());
let manager = StateManager::new(&state_path, &dir.data_dir().to_path_buf())?;
let step_name = "securities_data_complete";
let data_dir = dir.data_dir();
@@ -432,7 +259,6 @@ async fn track_securities_completion(
"securities_data_complete".to_string(),
content_reference,
DataStage::Data,
vec!["lei_figi_mapping_complete".to_string()], // Depends on LEI mapping
None, // Use default TTL (7 days)
).await?;
@@ -1110,220 +936,6 @@ async fn setup_sector_directories(
Ok(())
}
/// Loads all OpenFIGI mapping value lists (marketSecDes, micCode, securityType).
///
/// This function fetches the available values for each mapping parameter from the OpenFIGI API
/// and caches them as JSON files in `data/openfigi/`. If the files already exist and are recent
/// (less than 30 days old), they are reused instead of re-fetching.
///
/// # Returns
/// Ok(()) on success.
///
/// # Errors
/// Returns an error if API requests fail, JSON parsing fails, or file I/O fails.
pub async fn load_figi_type_lists(paths: &DataPaths) -> anyhow::Result<()> {
logger::log_info("Loading OpenFIGI mapping value lists...").await;
let state_path = paths.cache_dir().join("state.jsonl");
let cache_openfigi_dir = paths.cache_openfigi_dir();
tokio_fs::create_dir_all(cache_openfigi_dir).await
.context("Failed to create data/openfigi directory")?;
/*if state_path.exists() {
let state_content = tokio::fs::read_to_string(&state_path).await?;
for line in state_content.lines() {
if line.trim().is_empty() {
continue;
}
if let Ok(state) = serde_json::from_str::<serde_json::Value>(line) {
if state.get("yahoo_companies_cleansed_no_data").and_then(|v| v.as_bool()).unwrap_or(false) {
logger::log_info(" Yahoo companies cleansing already completed, reading existing file...").await;
if output_path.exists() {
let output_content = tokio::fs::read_to_string(&output_path).await?;
let count = output_content.lines()
.filter(|line| !line.trim().is_empty())
.count();
logger::log_info(&format!(" ✓ Found {} companies in companies_yahoo.jsonl", count)).await;
return Ok(count);
} else {
logger::log_warn(" State indicates completion but companies_yahoo.jsonl not found, re-running...").await;
break;
}
}
}
}
}*/
let client = OpenFigiClient::new().await?;
// Fetch each type list
get_figi_market_sec_des(&client, cache_openfigi_dir).await?;
get_figi_mic_code(&client, cache_openfigi_dir).await?;
get_figi_security_type(&client, cache_openfigi_dir).await?;
logger::log_info("OpenFIGI mapping value lists loaded successfully").await;
Ok(())
}
/// Fetches and caches the list of valid marketSecDes values.
///
/// # Arguments
/// * `client` - The OpenFIGI client instance.
/// * `cache_dir` - Directory to save the cached JSON file.
///
/// # Returns
/// Ok(()) on success.
///
/// # Errors
/// Returns an error if the API request fails or file I/O fails.
async fn get_figi_market_sec_des(client: &OpenFigiClient, cache_dir: &Path) -> anyhow::Result<()> {
let cache_file = cache_dir.join("marketSecDes.json");
// Check if cache exists and is recent (< 30 days old)
if should_use_cache(&cache_file).await? {
logger::log_info(" Using cached marketSecDes values").await;
return Ok(());
}
logger::log_info(" Fetching marketSecDes values from OpenFIGI API...").await;
let resp = client.client
.get("https://api.openfigi.com/v3/mapping/values/marketSecDes")
.send()
.await
.context("Failed to fetch marketSecDes values")?;
handle_rate_limit(&resp).await?;
let values: Value = resp.json().await
.context("Failed to parse marketSecDes response")?;
// Save to cache
let json_str = serde_json::to_string_pretty(&values)?;
tokio_fs::write(&cache_file, json_str).await
.context("Failed to write marketSecDes cache")?;
logger::log_info(" ✓ Cached marketSecDes values").await;
// Respect rate limits
sleep(Duration::from_millis(if client.has_key { 240 } else { 2400 })).await;
Ok(())
}
/// Fetches and caches the list of valid micCode values.
///
/// # Arguments
/// * `client` - The OpenFIGI client instance.
/// * `cache_dir` - Directory to save the cached JSON file.
///
/// # Returns
/// Ok(()) on success.
///
/// # Errors
/// Returns an error if the API request fails or file I/O fails.
async fn get_figi_mic_code(client: &OpenFigiClient, cache_dir: &Path) -> anyhow::Result<()> {
let cache_file = cache_dir.join("micCode.json");
if should_use_cache(&cache_file).await? {
logger::log_info(" Using cached micCode values").await;
return Ok(());
}
logger::log_info(" Fetching micCode values from OpenFIGI API...").await;
let resp = client.client
.get("https://api.openfigi.com/v3/mapping/values/micCode")
.send()
.await
.context("Failed to fetch micCode values")?;
handle_rate_limit(&resp).await?;
let values: Value = resp.json().await
.context("Failed to parse micCode response")?;
let json_str = serde_json::to_string_pretty(&values)?;
tokio_fs::write(&cache_file, json_str).await
.context("Failed to write micCode cache")?;
logger::log_info(" ✓ Cached micCode values").await;
sleep(Duration::from_millis(if client.has_key { 240 } else { 2400 })).await;
Ok(())
}
/// Checks if a cache file exists and is less than 30 days old.
///
/// # Arguments
/// * `path` - Path to the cache file.
///
/// # Returns
/// True if the cache should be used, false if it needs refreshing.
async fn should_use_cache(path: &Path) -> anyhow::Result<bool> {
if !path.exists() {
return Ok(false);
}
let metadata = tokio_fs::metadata(path).await?;
let modified = metadata.modified()?;
let age = modified.elapsed().unwrap_or(std::time::Duration::from_secs(u64::MAX));
// Cache is valid for 30 days
Ok(age < std::time::Duration::from_secs(30 * 24 * 60 * 60))
}
/// Fetches and caches the list of valid securityType values.
///
/// # Arguments
/// * `client` - The OpenFIGI client instance.
/// * `cache_dir` - Directory to save the cached JSON file.
///
/// # Returns
/// Ok(()) on success.
///
/// # Errors
/// Returns an error if the API request fails or file I/O fails.
async fn get_figi_security_type(client: &OpenFigiClient, cache_dir: &Path) -> anyhow::Result<()> {
let cache_file = cache_dir.join("securityType.json");
if should_use_cache(&cache_file).await? {
logger::log_info(" Using cached securityType values").await;
return Ok(());
}
logger::log_info(" Fetching securityType values from OpenFIGI API...").await;
let resp = client.client
.get("https://api.openfigi.com/v3/mapping/values/securityType")
.send()
.await
.context("Failed to fetch securityType values")?;
handle_rate_limit(&resp).await?;
let values: Value = resp.json().await
.context("Failed to parse securityType response")?;
let json_str = serde_json::to_string_pretty(&values)?;
tokio_fs::write(&cache_file, json_str).await
.context("Failed to write securityType cache")?;
logger::log_info(" ✓ Cached securityType values").await;
sleep(Duration::from_millis(if client.has_key { 240 } else { 2400 })).await;
Ok(())
}
#[derive(Debug)]
pub struct MappingStats {
pub total_leis: usize,
@@ -1621,7 +1233,7 @@ pub async fn update_lei_mapping(
) -> anyhow::Result<bool> {
let dir = DataPaths::new(".")?;
let state_path = dir.cache_dir().join("state.jsonl");
let manager = StateManager::new(&state_path, &dir.cache_dir().to_path_buf());
let manager = StateManager::new(&state_path, &dir.cache_dir().to_path_buf())?;
let step_name = "lei_figi_mapping_complete";
let map_cache_dir = dir.cache_gleif_openfigi_map_dir();
@@ -1694,7 +1306,6 @@ async fn track_lei_mapping_completion(
"lei_figi_mapping_complete".to_string(),
content_reference,
DataStage::Cache, // 24-hour TTL for API data
vec![], // No dependencies
None, // Use default TTL
).await?;


@@ -94,7 +94,7 @@ pub async fn collect_fx_rates(
let log_path = data_path.join("fx_rates_updates.log");
let state_path = data_path.join("state.jsonl");
let manager = StateManager::new(&state_path, &data_path.to_path_buf());
let manager = StateManager::new(&state_path, &data_path.to_path_buf())?;
let step_name = "yahoo_fx_rate_collection_completed";
let content_reference = directory_reference(&output_path,
Some(vec![
@@ -169,7 +169,6 @@ pub async fn collect_fx_rates(
step_name.to_string(),
content_reference,
DataStage::Data,
vec!["yahoo_companies_cleansed".to_string()], // Dependency
None, // Use default TTL (7 days for Data stage)
).await?;
return Ok(collected_currencies.len());
@@ -321,7 +320,6 @@ pub async fn collect_fx_rates(
step_name.to_string(),
content_reference,
DataStage::Data,
vec!["yahoo_companies_cleansed".to_string()], // Dependency
None, // Use default TTL (7 days for Data stage)
).await?;
}


@@ -1,3 +1,4 @@
use web_scraper::util::integrity::StateManager;
// src/main.rs - Cleaned up version with extracted helpers
use web_scraper::{*, scraper, corporate};
use crate::check_shutdown;
@@ -233,7 +234,7 @@ fn format_duration(duration: Duration) -> String {
format!("{:02}::{:02}::{:02}::{:02}", days, hours, minutes, seconds)
}
pub async fn create_state_files(paths: &DataPaths) -> Result<()> {
async fn create_state_files(paths: &DataPaths) -> Result<()> {
let paths = (
paths.data_dir().join("state.jsonl"),
paths.cache_dir().join("state.jsonl"),
@@ -251,6 +252,14 @@ pub async fn create_state_files(paths: &DataPaths) -> Result<()> {
Ok(())
}
fn visualize_checkpoint_dependencies(paths: &DataPaths) -> Result<()> {
let manager = StateManager::new(&paths.data_dir().join("state.jsonl"), &paths.data_dir().to_path_buf())?;
manager.print_dependency_graph();
let dot = manager.get_dependency_config().to_dot();
std::fs::write(paths.logs_dir().join("checkpoint_dependencies.dot"), dot)?;
Ok(())
}
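// Illustrative note, not part of this commit: the emitted .dot file can be
// rendered with Graphviz, e.g. `dot -Tsvg checkpoint_dependencies.dot -o deps.svg`,
// assuming Graphviz is installed; the file is written to paths.logs_dir() above.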
// ============================================================================
// MAIN FUNCTION - Simplified with extracted helpers
// ============================================================================
@@ -264,14 +273,13 @@ async fn main() -> Result<()> {
start_docker_desktop().await;
cleanup_all_proxy_containers().await.ok();
create_state_files(&paths).await.ok();
visualize_checkpoint_dependencies(&paths).ok();
let config = Config::load().unwrap_or_else(|_| {
eprintln!("Using default configuration");
Config::default()
});
// Initialize monitoring
let (monitoring_handle, _monitoring_task) = initialize_monitoring(&config, &paths).await?;


@@ -3,3 +3,4 @@ pub mod docker_vpn_proxy;
pub mod helpers;
pub mod hard_reset;
pub mod yahoo;
pub mod openfigi;


@@ -0,0 +1,370 @@
// src/scraper/openfigi.rs - STREAMING VERSION
// Key changes: Never load entire GLEIF CSV or FIGI maps into memory
use crate::util::directories::DataPaths;
use crate::util::logger;
use crate::corporate::{types::*};
use reqwest::Client as HttpClient;
use reqwest::header::{HeaderMap, HeaderValue};
use serde_json::{json, Value};
use std::path::Path;
use tokio::time::{sleep, Duration};
use tokio::fs as tokio_fs;
use anyhow::{Context, anyhow};
#[derive(Clone)]
pub struct OpenFigiClient {
pub client: HttpClient,
pub has_key: bool,
}
impl OpenFigiClient {
pub async fn new() -> anyhow::Result<Self> {
let api_key = dotenvy::var("OPENFIGI_API_KEY").ok();
let has_key = api_key.is_some();
let mut builder = HttpClient::builder()
.user_agent("Mozilla/5.0 (compatible; OpenFIGI-Rust/1.0)")
.timeout(Duration::from_secs(30));
if let Some(key) = &api_key {
let mut headers = HeaderMap::new();
headers.insert("X-OPENFIGI-APIKEY", HeaderValue::from_str(key)?);
builder = builder.default_headers(headers);
}
let client = builder.build().context("Failed to build HTTP client")?;
logger::log_info(&format!("OpenFIGI client: {}",
if has_key { "with API key" } else { "no key" })).await;
Ok(Self { client, has_key })
}
pub async fn map_isins_to_figi_infos(&self, isins: &[String]) -> anyhow::Result<Vec<FigiInfo>> {
if isins.is_empty() {
return Ok(vec![]);
}
let mut all_figi_infos = Vec::new();
let chunk_size = if self.has_key { 100 } else { 5 };
let inter_sleep = if self.has_key {
Duration::from_millis(240)
} else {
Duration::from_millis(2400)
};
for chunk in isins.chunks(chunk_size) {
let jobs: Vec<Value> = chunk.iter()
.map(|isin| json!({
"idType": "ID_ISIN",
"idValue": isin,
}))
.collect();
let mut retry_count = 0;
let max_retries = 5;
let mut backoff_ms = 1000u64;
loop {
let resp_result = self.client
.post("https://api.openfigi.com/v3/mapping")
.header("Content-Type", "application/json")
.json(&jobs)
.send()
.await;
let resp = match resp_result {
Ok(r) => r,
Err(e) => {
retry_count += 1;
if retry_count >= max_retries {
let err_msg = format!("Failed to send mapping request after {} retries: {}", max_retries, e);
logger::log_error(&err_msg).await;
return Err(anyhow!(err_msg));
}
let warn_msg = format!("Transient error sending mapping request (attempt {}/{}): {}", retry_count, max_retries, e);
logger::log_warn(&warn_msg).await;
let retry_msg = format!(" Retrying in {}ms...", backoff_ms);
logger::log_info(&retry_msg).await;
sleep(Duration::from_millis(backoff_ms)).await;
backoff_ms = (backoff_ms * 2).min(60000); // Cap at 60s
continue;
}
};
let status = resp.status();
let headers = resp.headers().clone();
let body = resp.text().await?;
if status == 429 {
let reset_sec = headers
.get("ratelimit-reset")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(10);
sleep(Duration::from_secs(reset_sec.max(10))).await;
continue;
} else if !status.is_success() {
if status.is_server_error() && retry_count < max_retries {
retry_count += 1;
sleep(Duration::from_millis(backoff_ms)).await;
backoff_ms = (backoff_ms * 2).min(60000);
continue;
}
return Err(anyhow!("OpenFIGI error {}: {}", status, body));
}
let results: Vec<Value> = serde_json::from_str(&body)?;
for (isin, result) in chunk.iter().zip(results) {
if let Some(data) = result["data"].as_array() {
for item in data {
if let Some(figi) = item["figi"].as_str() {
all_figi_infos.push(FigiInfo {
isin: isin.clone(),
figi: figi.to_string(),
name: item["name"].as_str().unwrap_or("").to_string(),
ticker: item["ticker"].as_str().unwrap_or("").to_string(),
exch_code: item["exchCode"].as_str().unwrap_or("").to_string(),
composite_figi: item["compositeFIGI"].as_str().unwrap_or("").to_string(),
security_type: item["securityType"].as_str().unwrap_or("").to_string(),
market_sector: item["marketSector"].as_str().unwrap_or("").to_string(),
share_class_figi: item["shareClassFIGI"].as_str().unwrap_or("").to_string(),
security_type2: item["securityType2"].as_str().unwrap_or("").to_string(),
security_description: item["securityDescription"].as_str().unwrap_or("").to_string(),
});
}
}
}
}
break;
}
sleep(inter_sleep).await;
}
Ok(all_figi_infos)
}
}
/// Fetches and caches the list of valid securityType values.
///
/// # Arguments
/// * `client` - The OpenFIGI client instance.
/// * `cache_dir` - Directory to save the cached JSON file.
///
/// # Returns
/// Ok(()) on success.
///
/// # Errors
/// Returns an error if the API request fails or file I/O fails.
async fn get_figi_security_type(client: &OpenFigiClient, cache_dir: &Path) -> anyhow::Result<()> {
let cache_file = cache_dir.join("securityType.json");
if should_use_cache(&cache_file).await? {
logger::log_info(" Using cached securityType values").await;
return Ok(());
}
logger::log_info(" Fetching securityType values from OpenFIGI API...").await;
let resp = client.client
.get("https://api.openfigi.com/v3/mapping/values/securityType")
.send()
.await
.context("Failed to fetch securityType values")?;
handle_rate_limit(&resp).await?;
let values: Value = resp.json().await
.context("Failed to parse securityType response")?;
let json_str = serde_json::to_string_pretty(&values)?;
tokio_fs::write(&cache_file, json_str).await
.context("Failed to write securityType cache")?;
logger::log_info(" ✓ Cached securityType values").await;
sleep(Duration::from_millis(if client.has_key { 240 } else { 2400 })).await;
Ok(())
}
/// Loads all OpenFIGI mapping value lists (marketSecDes, micCode, securityType).
///
/// This function fetches the available values for each mapping parameter from the OpenFIGI API
/// and caches them as JSON files in `data/openfigi/`. If the files already exist and are recent
/// (less than 30 days old), they are reused instead of re-fetching.
///
/// # Returns
/// Ok(()) on success.
///
/// # Errors
/// Returns an error if API requests fail, JSON parsing fails, or file I/O fails.
pub async fn load_figi_type_lists(paths: &DataPaths) -> anyhow::Result<()> {
logger::log_info("Loading OpenFIGI mapping value lists...").await;
let state_path = paths.cache_dir().join("state.jsonl");
let cache_openfigi_dir = paths.cache_openfigi_dir();
tokio_fs::create_dir_all(cache_openfigi_dir).await
.context("Failed to create data/openfigi directory")?;
let client = OpenFigiClient::new().await?;
// Fetch each type list
get_figi_market_sec_des(&client, cache_openfigi_dir).await?;
get_figi_mic_code(&client, cache_openfigi_dir).await?;
get_figi_security_type(&client, cache_openfigi_dir).await?;
logger::log_info("OpenFIGI mapping value lists loaded successfully").await;
Ok(())
}
/// Fetches and caches the list of valid marketSecDes values.
///
/// # Arguments
/// * `client` - The OpenFIGI client instance.
/// * `cache_dir` - Directory to save the cached JSON file.
///
/// # Returns
/// Ok(()) on success.
///
/// # Errors
/// Returns an error if the API request fails or file I/O fails.
async fn get_figi_market_sec_des(client: &OpenFigiClient, cache_dir: &Path) -> anyhow::Result<()> {
let cache_file = cache_dir.join("marketSecDes.json");
// Check if cache exists and is recent (< 30 days old)
if should_use_cache(&cache_file).await? {
logger::log_info(" Using cached marketSecDes values").await;
return Ok(());
}
logger::log_info(" Fetching marketSecDes values from OpenFIGI API...").await;
let resp = client.client
.get("https://api.openfigi.com/v3/mapping/values/marketSecDes")
.send()
.await
.context("Failed to fetch marketSecDes values")?;
handle_rate_limit(&resp).await?;
let values: Value = resp.json().await
.context("Failed to parse marketSecDes response")?;
// Save to cache
let json_str = serde_json::to_string_pretty(&values)?;
tokio_fs::write(&cache_file, json_str).await
.context("Failed to write marketSecDes cache")?;
logger::log_info(" ✓ Cached marketSecDes values").await;
// Respect rate limits
sleep(Duration::from_millis(if client.has_key { 240 } else { 2400 })).await;
Ok(())
}
/// Fetches and caches the list of valid micCode values.
///
/// # Arguments
/// * `client` - The OpenFIGI client instance.
/// * `cache_dir` - Directory to save the cached JSON file.
///
/// # Returns
/// Ok(()) on success.
///
/// # Errors
/// Returns an error if the API request fails or file I/O fails.
async fn get_figi_mic_code(client: &OpenFigiClient, cache_dir: &Path) -> anyhow::Result<()> {
let cache_file = cache_dir.join("micCode.json");
if should_use_cache(&cache_file).await? {
logger::log_info(" Using cached micCode values").await;
return Ok(());
}
logger::log_info(" Fetching micCode values from OpenFIGI API...").await;
let resp = client.client
.get("https://api.openfigi.com/v3/mapping/values/micCode")
.send()
.await
.context("Failed to fetch micCode values")?;
handle_rate_limit(&resp).await?;
let values: Value = resp.json().await
.context("Failed to parse micCode response")?;
let json_str = serde_json::to_string_pretty(&values)?;
tokio_fs::write(&cache_file, json_str).await
.context("Failed to write micCode cache")?;
logger::log_info(" ✓ Cached micCode values").await;
sleep(Duration::from_millis(if client.has_key { 240 } else { 2400 })).await;
Ok(())
}
/// Handles rate limit responses from the OpenFIGI API.
///
/// If a 429 status is received, this function sleeps for the duration specified
/// in the `ratelimit-reset` header (or 10 seconds by default).
///
/// # Arguments
/// * `resp` - The HTTP response to check.
///
/// # Returns
/// Ok(()) if no rate limit, or after waiting for the reset period.
///
/// # Errors
/// Returns an error if the response status indicates a non-rate-limit error.
async fn handle_rate_limit(resp: &reqwest::Response) -> anyhow::Result<()> {
let status = resp.status();
if status == 429 {
let headers = resp.headers();
let reset_sec = headers
.get("ratelimit-reset")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(10);
logger::log_info(&format!(" Rate limited—waiting {}s", reset_sec)).await;
sleep(std::time::Duration::from_secs(reset_sec.max(10))).await;
return Err(anyhow!("Rate limited, please retry"));
} else if status.is_client_error() || status.is_server_error() {
return Err(anyhow!("OpenFIGI API error: {}", status));
}
Ok(())
}
/// Checks if a cache file exists and is less than 30 days old.
///
/// # Arguments
/// * `path` - Path to the cache file.
///
/// # Returns
/// True if the cache should be used, false if it needs refreshing.
async fn should_use_cache(path: &Path) -> anyhow::Result<bool> {
if !path.exists() {
return Ok(false);
}
let metadata = tokio_fs::metadata(path).await?;
let modified = metadata.modified()?;
let age = modified.elapsed().unwrap_or(std::time::Duration::from_secs(u64::MAX));
// Cache is valid for 30 days
Ok(age < std::time::Duration::from_secs(30 * 24 * 60 * 60))
}


@@ -1,5 +1,5 @@
// src/util/integrity.rs
//! Content integrity and state lifecycle management module
//! Content integrity and state lifecycle management module with centralized dependencies
//!
//! Features:
//! - File and directory hashing (SHA-256)
@@ -7,9 +7,12 @@
//! - State invalidation based on time or validation failures
//! - 3-stage data lifecycle: cache → data → storage
//! - Inline vs. external hash storage based on size
//! - Cascade invalidation when dependencies fail validation
//! - **Centralized dependency configuration** (Single Source of Truth)
//! - Support for checkpoint groups and hierarchies
//! - Automatic transitive dependency resolution
//! - Cycle detection in dependency graph
use anyhow::{Context, Result};
use anyhow::{Context, Result, bail};
use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
@@ -34,6 +37,252 @@ const HASH_STORAGE_DIR: &str = ".integrity_hashes";
/// File extension for external hash files
const HASH_FILE_EXT: &str = ".hash";
/// Default dependency configuration file name
const DEFAULT_DEPENDENCY_CONFIG: &str = "checkpoint_dependencies.toml";
// ============================================================================
// DEPENDENCY CONFIGURATION (SINGLE SOURCE OF TRUTH)
// ============================================================================
/// Centralized dependency configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DependencyConfig {
/// Individual checkpoint dependencies
#[serde(default)]
pub checkpoints: HashMap<String, CheckpointConfig>,
/// Checkpoint groups (for hierarchical dependencies)
#[serde(default)]
pub groups: HashMap<String, GroupConfig>,
}
/// Configuration for a single checkpoint
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointConfig {
/// Description of this checkpoint
#[serde(default)]
pub description: String,
/// Direct dependencies (checkpoint names this depends on)
#[serde(default)]
pub depends_on: Vec<String>,
/// Whether this checkpoint is part of a group
#[serde(default)]
pub group: Option<String>,
}
/// Configuration for a checkpoint group
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GroupConfig {
/// Description of this group
#[serde(default)]
pub description: String,
/// Members of this group
pub members: Vec<String>,
/// Dependencies for all members of this group
#[serde(default)]
pub depends_on: Vec<String>,
}
impl DependencyConfig {
/// Load dependency configuration from TOML file
pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
let content = fs::read_to_string(path.as_ref())
.with_context(|| format!("Failed to read dependency config: {}", path.as_ref().display()))?;
let config: DependencyConfig = toml::from_str(&content)
.context("Failed to parse dependency config")?;
config.validate()?;
Ok(config)
}
/// Load from default location (dependencies.toml in base_dir)
pub fn from_default_location<P: AsRef<Path>>(base_dir: P) -> Result<Self> {
let config_path = base_dir.as_ref().join(DEFAULT_DEPENDENCY_CONFIG);
if !config_path.exists() {
// Return empty config if file doesn't exist
return Ok(Self::default());
}
Self::from_file(config_path)
}
/// Validate configuration (check for cycles, invalid references)
pub fn validate(&self) -> Result<()> {
// Check for cycles
for checkpoint in self.checkpoints.keys() {
self.detect_cycle(checkpoint)?;
}
// Validate group memberships
for (group_name, group) in &self.groups {
for member in &group.members {
if !self.checkpoints.contains_key(member) {
bail!("Group '{}' references non-existent checkpoint: {}", group_name, member);
}
}
}
// Validate that checkpoints in groups actually declare the group
for (checkpoint_name, checkpoint) in &self.checkpoints {
if let Some(group_name) = &checkpoint.group {
if let Some(group) = self.groups.get(group_name) {
if !group.members.contains(checkpoint_name) {
bail!("Checkpoint '{}' claims to be in group '{}' but group doesn't list it",
checkpoint_name, group_name);
}
} else {
bail!("Checkpoint '{}' references non-existent group: {}", checkpoint_name, group_name);
}
}
}
Ok(())
}
/// Detect if there's a cycle in dependencies starting from checkpoint
fn detect_cycle(&self, start: &str) -> Result<()> {
let mut visited = HashSet::new();
let mut stack = HashSet::new();
self.detect_cycle_helper(start, &mut visited, &mut stack)
}
fn detect_cycle_helper(
&self,
checkpoint: &str,
visited: &mut HashSet<String>,
stack: &mut HashSet<String>,
) -> Result<()> {
if stack.contains(checkpoint) {
bail!("Cycle detected in dependency graph at checkpoint: {}", checkpoint);
}
if visited.contains(checkpoint) {
return Ok(());
}
visited.insert(checkpoint.to_string());
stack.insert(checkpoint.to_string());
// Check direct dependencies
if let Some(config) = self.checkpoints.get(checkpoint) {
for dep in &config.depends_on {
self.detect_cycle_helper(dep, visited, stack)?;
}
}
stack.remove(checkpoint);
Ok(())
}
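// Illustrative walk-through, not part of the diff: if checkpoint "a" has
// depends_on = ["b"] and "b" has depends_on = ["a"], detect_cycle("a") pushes
// "a" onto the stack, recurses into "b", finds "a" already on the stack, and
// bails with "Cycle detected in dependency graph at checkpoint: a".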
/// Get all dependencies for a checkpoint (including transitive and group dependencies)
pub fn get_all_dependencies(&self, checkpoint: &str) -> Result<Vec<String>> {
let mut all_deps = Vec::new();
let mut visited = HashSet::new();
self.collect_dependencies(checkpoint, &mut all_deps, &mut visited)?;
// Remove duplicates while preserving order
let mut seen = HashSet::new();
all_deps.retain(|dep| seen.insert(dep.clone()));
Ok(all_deps)
}
fn collect_dependencies(
&self,
checkpoint: &str,
deps: &mut Vec<String>,
visited: &mut HashSet<String>,
) -> Result<()> {
if visited.contains(checkpoint) {
return Ok(());
}
visited.insert(checkpoint.to_string());
// Get checkpoint config
let config = self.checkpoints.get(checkpoint)
.ok_or_else(|| anyhow::anyhow!("Unknown checkpoint: {}", checkpoint))?;
// Add group dependencies first (if checkpoint is in a group)
if let Some(group_name) = &config.group {
if let Some(group) = self.groups.get(group_name) {
for dep in &group.depends_on {
if !visited.contains(dep) {
deps.push(dep.clone());
self.collect_dependencies(dep, deps, visited)?;
}
}
}
}
// Add direct dependencies
for dep in &config.depends_on {
if !visited.contains(dep) {
deps.push(dep.clone());
self.collect_dependencies(dep, deps, visited)?;
}
}
Ok(())
}
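// Illustrative resolution order, not part of the diff: group dependencies are
// collected before a checkpoint's own depends_on, every dependency is followed
// transitively, and get_all_dependencies() then deduplicates while keeping the
// first-seen order. For the TOML added at the top of this commit,
// get_all_dependencies("securities_data_complete") yields
// ["lei_figi_mapping_complete"].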
/// Get dependency graph as DOT format (for visualization)
pub fn to_dot(&self) -> String {
let mut dot = String::from("digraph Dependencies {\n");
dot.push_str(" rankdir=LR;\n");
dot.push_str(" node [shape=box];\n\n");
// Add checkpoints
for (name, config) in &self.checkpoints {
let label = if config.description.is_empty() {
name.clone()
} else {
format!("{}\n{}", name, config.description)
};
dot.push_str(&format!(" \"{}\" [label=\"{}\"];\n", name, label));
}
// Add dependencies
dot.push_str("\n");
for (name, config) in &self.checkpoints {
// Add group dependencies
if let Some(group_name) = &config.group {
if let Some(group) = self.groups.get(group_name) {
for dep in &group.depends_on {
dot.push_str(&format!(" \"{}\" -> \"{}\" [label=\"via group {}\"];\n",
name, dep, group_name));
}
}
}
// Add direct dependencies
for dep in &config.depends_on {
dot.push_str(&format!(" \"{}\" -> \"{}\";\n", name, dep));
}
}
dot.push_str("}\n");
dot
}
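// Illustrative output, not part of the diff, for a config where "member_a"
// belongs to group "g" with depends_on = ["base_step"]:
//
//   digraph Dependencies {
//     rankdir=LR;
//     node [shape=box];
//
//     "base_step" [label="base_step"];
//     "member_a" [label="member_a"];
//
//     "member_a" -> "base_step" [label="via group g"];
//   }
//
// HashMap iteration order means node and edge ordering may vary between runs.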
}
impl Default for DependencyConfig {
fn default() -> Self {
Self {
checkpoints: HashMap::new(),
groups: HashMap::new(),
}
}
}
// ============================================================================
// DATA STRUCTURES
// ============================================================================
@@ -135,7 +384,8 @@ pub struct StateEntry {
/// Validation status
pub validation_status: ValidationStatus,
/// Dependencies (other steps that must be valid for this to remain valid)
/// Dependencies (resolved automatically from config, stored for reference)
#[serde(default)]
pub dependencies: Vec<String>,
}
@@ -160,7 +410,7 @@ pub enum ValidationStatus {
}
// ============================================================================
// HASH COMPUTATION
// HASH COMPUTATION (UNCHANGED FROM ORIGINAL)
// ============================================================================
/// Hash a single file using SHA-256
@@ -185,7 +435,6 @@ pub fn hash_file<P: AsRef<Path>>(path: P) -> Result<String> {
}
/// Hash a directory recursively
/// Returns a combined hash of all files in sorted order
pub fn hash_directory<P: AsRef<Path>>(
path: P,
include_patterns: Option<&[String]>,
@@ -194,31 +443,26 @@ pub fn hash_directory<P: AsRef<Path>>(
let path = path.as_ref();
if !path.is_dir() {
anyhow::bail!("Path is not a directory: {}", path.display());
bail!("Path is not a directory: {}", path.display());
}
// Collect all files recursively
let mut files = Vec::new();
collect_files_recursive(path, &mut files, include_patterns, exclude_patterns)?;
// Sort for deterministic hashing
files.sort();
if files.is_empty() {
return Ok(String::from("d41d8cd98f00b204e9800998ecf8427e")); // MD5 of empty string
return Ok(String::from("d41d8cd98f00b204e9800998ecf8427e"));
}
// Hash all files and combine
let mut combined_hasher = Sha256::new();
for file_path in files {
// Include relative path in hash for structure awareness
let rel_path = file_path.strip_prefix(path)
.unwrap_or(&file_path)
.to_string_lossy();
combined_hasher.update(rel_path.as_bytes());
// Hash file content
let file_hash = hash_file(&file_path)?;
combined_hasher.update(file_hash.as_bytes());
}
@@ -241,7 +485,6 @@ fn collect_files_recursive(
let entry = entry?;
let path = entry.path();
// Skip hidden files and directories
if let Some(name) = path.file_name() {
if name.to_string_lossy().starts_with('.') {
continue;
@@ -251,7 +494,6 @@ fn collect_files_recursive(
if path.is_dir() {
collect_files_recursive(&path, files, include_patterns, exclude_patterns)?;
} else if path.is_file() {
// Apply pattern filters
if should_include_file(&path, include_patterns, exclude_patterns) {
files.push(path);
}
@@ -261,7 +503,7 @@ fn collect_files_recursive(
Ok(())
}
/// Check if a file should be included based on patterns
/// Check if file should be included based on patterns
fn should_include_file(
path: &Path,
include_patterns: Option<&[String]>,
@@ -269,51 +511,44 @@ fn should_include_file(
) -> bool {
let path_str = path.to_string_lossy();
// Check exclude patterns first
if let Some(excludes) = exclude_patterns {
for pattern in excludes {
if path_str.contains(pattern) || matches_glob(path, pattern) {
if glob_match(&path_str, pattern) {
return false;
}
}
}
// Check include patterns
if let Some(includes) = include_patterns {
for pattern in includes {
if path_str.contains(pattern) || matches_glob(path, pattern) {
if glob_match(&path_str, pattern) {
return true;
}
}
return false; // If includes specified but no match
return false;
}
true // Include by default
true
}
/// Simple glob pattern matching (supports * and ?)
fn matches_glob(path: &Path, pattern: &str) -> bool {
let path_str = path.to_string_lossy();
// Convert glob to regex
let regex_pattern = pattern
.replace(".", "\\.")
.replace("*", ".*")
.replace("?", ".");
if let Ok(re) = regex::Regex::new(&format!("^{}$", regex_pattern)) {
re.is_match(&path_str)
/// Simple glob pattern matching
fn glob_match(path: &str, pattern: &str) -> bool {
if pattern.contains('*') {
let parts: Vec<&str> = pattern.split('*').collect();
if parts.len() == 2 {
path.contains(parts[0]) && path.ends_with(parts[1])
} else {
false
}
} else {
false
path.ends_with(pattern)
}
}
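// Illustrative behaviour of the simplified matcher, not part of the diff:
//   glob_match("data/state.jsonl", "*.jsonl")        -> true  (ends with ".jsonl")
//   glob_match("data/state.jsonl", "state*.jsonl")   -> true  (contains "state", ends with ".jsonl")
//   glob_match("data/state.jsonl", "*state*.jsonl")  -> false (more than one '*' is unsupported)
//   glob_match("data/state.jsonl", "state.jsonl")    -> true  (no '*': plain ends_with)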
/// Hash a content reference
/// Hash content based on reference type
pub fn hash_content_reference(reference: &ContentReference) -> Result<String> {
match reference {
ContentReference::File { path } => {
hash_file(path)
}
ContentReference::File { path } => hash_file(path),
ContentReference::Directory { path, include_patterns, exclude_patterns } => {
hash_directory(
path,
@@ -323,75 +558,44 @@ pub fn hash_content_reference(reference: &ContentReference) -> Result<String> {
}
ContentReference::Composite { references } => {
let mut combined_hasher = Sha256::new();
for reference in references {
let hash = hash_content_reference(reference)?;
for ref_item in references {
let hash = hash_content_reference(ref_item)?;
combined_hasher.update(hash.as_bytes());
}
Ok(format!("{:x}", combined_hasher.finalize()))
}
}
}
// ============================================================================
// HASH STORAGE MANAGEMENT
// ============================================================================
/// Determine storage method based on hash size
pub fn determine_hash_storage(hash: &str, base_dir: &Path) -> HashStorage {
if hash.len() <= INLINE_HASH_THRESHOLD {
HashStorage::Inline {
hash: hash.to_string(),
}
/// Determine whether to store hash inline or externally
fn determine_hash_storage(hash: &str, base_dir: &Path) -> HashStorage {
if hash.len() > INLINE_HASH_THRESHOLD {
let hash_dir = base_dir.join(HASH_STORAGE_DIR);
let hash_file = hash_dir.join(format!("{}{}", &hash[..16], HASH_FILE_EXT));
HashStorage::External { hash_file }
} else {
let hash_id = Sha256::digest(hash.as_bytes());
let hash_filename = format!("{:x}{}", hash_id, HASH_FILE_EXT);
HashStorage::External {
hash_file: base_dir
.join(HASH_STORAGE_DIR)
.join(hash_filename),
}
HashStorage::Inline { hash: hash.to_string() }
}
}
/// Store hash externally if needed
pub async fn store_hash(
hash: &str,
storage: &HashStorage,
) -> Result<()> {
match storage {
HashStorage::Inline { .. } => {
// Nothing to do, hash is inline
Ok(())
}
HashStorage::External { hash_file } => {
// Create directory if needed
if let Some(parent) = hash_file.parent() {
async_fs::create_dir_all(parent).await?;
}
// Write hash to file
let mut file = async_fs::File::create(hash_file).await?;
file.write_all(hash.as_bytes()).await?;
file.flush().await?;
Ok(())
/// Store hash to file if external storage
async fn store_hash(hash: &str, storage: &HashStorage) -> Result<()> {
if let HashStorage::External { hash_file } = storage {
if let Some(parent) = hash_file.parent() {
async_fs::create_dir_all(parent).await?;
}
async_fs::write(hash_file, hash.as_bytes()).await?;
}
Ok(())
}
/// Retrieve hash from storage
pub async fn retrieve_hash(storage: &HashStorage) -> Result<String> {
/// Load hash from storage
async fn load_hash(storage: &HashStorage) -> Result<String> {
match storage {
HashStorage::Inline { hash } => {
Ok(hash.clone())
}
HashStorage::Inline { hash } => Ok(hash.clone()),
HashStorage::External { hash_file } => {
async_fs::read_to_string(hash_file)
.await
.with_context(|| format!("Failed to read hash file: {}", hash_file.display()))
let content = async_fs::read_to_string(hash_file).await?;
Ok(content.trim().to_string())
}
}
}
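// Illustrative storage decision, not part of the diff: hashes at or below
// INLINE_HASH_THRESHOLD characters stay inline in the state entry; longer ones
// are written to .integrity_hashes/<first 16 hex chars>.hash under the base
// directory, and load_hash() trims trailing whitespace when reading them back.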
@@ -400,52 +604,47 @@ pub async fn retrieve_hash(storage: &HashStorage) -> Result<String> {
// VALIDATION
// ============================================================================
/// Validate a state entry's content against its hash
pub async fn validate_entry(entry: &StateEntry) -> Result<ValidationStatus> {
// Check if completed
/// Validate a single state entry
async fn validate_entry(entry: &StateEntry) -> Result<ValidationStatus> {
if !entry.completed {
return Ok(ValidationStatus::Unknown);
}
// Check TTL expiration
if let Some(completed_at) = entry.completed_at {
let ttl = entry.ttl_override
.or_else(|| entry.data_stage.map(|s| s.default_ttl()))
.unwrap_or_else(|| Duration::days(7));
let content_ref = match &entry.content_reference {
Some(r) => r,
None => return Ok(ValidationStatus::Unknown),
};
let expiration = completed_at + ttl;
if Utc::now() > expiration {
return Ok(ValidationStatus::Expired);
let stored_hash_storage = match &entry.content_hash {
Some(h) => h,
None => return Ok(ValidationStatus::Unknown),
};
let stored_hash = load_hash(stored_hash_storage).await?;
let current_hash = match hash_content_reference(content_ref) {
Ok(h) => h,
Err(e) => {
return Ok(ValidationStatus::Invalid {
reason: format!("Failed to compute hash: {}", e),
});
}
};
if stored_hash != current_hash {
return Ok(ValidationStatus::Invalid {
reason: "Hash mismatch".to_string(),
});
}
// Validate content hash if available
if let (Some(reference), Some(storage)) = (&entry.content_reference, &entry.content_hash) {
// Compute current hash
let current_hash = match hash_content_reference(reference) {
Ok(hash) => hash,
Err(e) => {
return Ok(ValidationStatus::Invalid {
reason: format!("Failed to compute hash: {}", e),
});
}
};
if let Some(stage) = entry.data_stage {
let ttl = entry.ttl_override.unwrap_or_else(|| stage.default_ttl());
// Retrieve stored hash
let stored_hash = match retrieve_hash(storage).await {
Ok(hash) => hash,
Err(e) => {
return Ok(ValidationStatus::Invalid {
reason: format!("Failed to retrieve stored hash: {}", e),
});
if let Some(completed_at) = entry.completed_at {
let age = Utc::now() - completed_at;
if age > ttl {
return Ok(ValidationStatus::Expired);
}
};
// Compare hashes
if current_hash != stored_hash {
return Ok(ValidationStatus::Invalid {
reason: "Hash mismatch".to_string(),
});
}
}
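// Illustrative ordering in the rewritten validate_entry, not part of the diff:
// the content hash is compared before the TTL, so an entry whose tracked file
// changed reports Invalid { reason: "Hash mismatch" } even if it has also
// outlived its stage's TTL; only an unchanged entry can come back as Expired.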
@@ -458,7 +657,6 @@ pub async fn validate_all_entries(
) -> Result<ValidationReport> {
let mut report = ValidationReport::default();
// First pass: validate each entry independently
for (name, entry) in entries.iter_mut() {
let status = validate_entry(entry).await?;
entry.validation_status = status.clone();
@@ -479,7 +677,6 @@ pub async fn validate_all_entries(
}
}
// Second pass: cascade invalidation based on dependencies
let mut invalidated = HashSet::new();
for name in &report.invalid_entries {
invalidated.insert(name.clone());
@@ -493,7 +690,6 @@ pub async fn validate_all_entries(
continue;
}
// Check if any dependency is invalidated
for dep in &entry.dependencies {
if invalidated.contains(dep) {
newly_invalidated.push((name.clone(), dep.clone()));
@@ -565,21 +761,47 @@ impl ValidationReport {
}
// ============================================================================
// STATE MANAGEMENT
// STATE MANAGEMENT (UPDATED WITH CENTRALIZED DEPENDENCIES)
// ============================================================================
/// State manager for reading/writing state entries
/// State manager with centralized dependency configuration
pub struct StateManager {
state_path: PathBuf,
base_dir: PathBuf,
dependency_config: DependencyConfig,
}
impl StateManager {
pub fn new<P: AsRef<Path>>(state_path: P, base_dir: P) -> Self {
Self {
/// Create new state manager and load dependency configuration
pub fn new<P: AsRef<Path>>(state_path: P, base_dir: P) -> Result<Self> {
let base_dir = base_dir.as_ref().to_path_buf();
let dependency_config = DependencyConfig::from_default_location(&base_dir)?;
Ok(Self {
state_path: state_path.as_ref().to_path_buf(),
base_dir,
dependency_config,
})
}
/// Create with explicit dependency configuration
pub fn with_config<P: AsRef<Path>>(
state_path: P,
base_dir: P,
dependency_config: DependencyConfig,
) -> Result<Self> {
dependency_config.validate()?;
Ok(Self {
state_path: state_path.as_ref().to_path_buf(),
base_dir: base_dir.as_ref().to_path_buf(),
}
dependency_config,
})
}
/// Get the dependency configuration (for inspection/debugging)
pub fn get_dependency_config(&self) -> &DependencyConfig {
&self.dependency_config
}
/// Load all state entries from state.jsonl
@@ -607,28 +829,39 @@ impl StateManager {
/// Save all state entries to state.jsonl
pub async fn save_entries(&self, entries: &HashMap<String, StateEntry>) -> Result<()> {
let mut lines = Vec::new();
for entry in entries.values() {
let json = serde_json::to_string(entry)?;
lines.push(json);
if let Some(parent) = self.state_path.parent() {
async_fs::create_dir_all(parent).await?;
}
let content = lines.join("\n") + "\n";
async_fs::write(&self.state_path, content).await?;
let mut file = async_fs::File::create(&self.state_path).await?;
for entry in entries.values() {
let line = serde_json::to_string(&entry)? + "\n";
file.write_all(line.as_bytes()).await?;
}
file.sync_all().await?;
Ok(())
}
/// Create or update a state entry with integrity tracking
/// **UPDATED**: Dependencies are now resolved automatically from config
pub async fn update_entry(
&self,
step_name: String,
content_reference: ContentReference,
data_stage: DataStage,
dependencies: Vec<String>,
ttl_override: Option<Duration>,
) -> Result<StateEntry> {
// Resolve dependencies from configuration
let dependencies = self.dependency_config
.get_all_dependencies(&step_name)
.unwrap_or_else(|_| {
// If checkpoint not in config, no dependencies
Vec::new()
});
// Compute hash
let hash = hash_content_reference(&content_reference)?;
@@ -697,6 +930,48 @@ impl StateManager {
self.save_entries(&entries).await?;
Ok(report)
}
/// Print dependency graph information
pub fn print_dependency_graph(&self) {
println!("=== Dependency Configuration ===");
println!("\nCheckpoints: {}", self.dependency_config.checkpoints.len());
println!("Groups: {}", self.dependency_config.groups.len());
println!("\n--- Checkpoints ---");
for (name, config) in &self.dependency_config.checkpoints {
println!("{}", name);
if !config.description.is_empty() {
println!(" Description: {}", config.description);
}
if let Some(group) = &config.group {
println!(" Group: {}", group);
}
if !config.depends_on.is_empty() {
println!(" Depends on: {}", config.depends_on.join(", "));
}
// Show resolved dependencies
if let Ok(all_deps) = self.dependency_config.get_all_dependencies(name) {
if !all_deps.is_empty() {
println!(" Resolved (including transitive): {}", all_deps.join(", "));
}
}
println!();
}
println!("\n--- Groups ---");
for (name, group) in &self.dependency_config.groups {
println!("{}", name);
if !group.description.is_empty() {
println!(" Description: {}", group.description);
}
println!(" Members: {}", group.members.join(", "));
if !group.depends_on.is_empty() {
println!(" Group dependencies: {}", group.depends_on.join(", "));
}
println!();
}
}
}
// ============================================================================