From 29d8f1d89e1edc17dbf4262aff19bd249e5c9910 Mon Sep 17 00:00:00 2001 From: donpat1to Date: Mon, 12 Jan 2026 18:50:44 +0100 Subject: [PATCH] moved structs to types.rs --- integrity/checkpoint_dependencies.dot | 32 ++-- src/corporate/aggregation.rs | 195 ---------------------- src/corporate/checkpoint_helpers.rs | 12 +- src/corporate/collect_exchanges.rs | 2 +- src/corporate/helpers.rs | 24 +-- src/corporate/mod.rs | 1 - src/corporate/storage.rs | 2 +- src/corporate/types.rs | 74 ++++---- src/corporate/update_companies.rs | 10 +- src/corporate/update_companies_cleanse.rs | 24 +-- src/corporate/update_companies_enrich.rs | 22 +-- src/corporate/update_openfigi.rs | 10 +- src/corporate/yahoo_company_extraction.rs | 8 +- src/economic/yahoo_update_forex.rs | 3 +- src/scraper/yahoo.rs | 50 +----- 15 files changed, 120 insertions(+), 349 deletions(-) delete mode 100644 src/corporate/aggregation.rs diff --git a/integrity/checkpoint_dependencies.dot b/integrity/checkpoint_dependencies.dot index 6aec350..8157da7 100644 --- a/integrity/checkpoint_dependencies.dot +++ b/integrity/checkpoint_dependencies.dot @@ -2,24 +2,24 @@ digraph Dependencies { rankdir=LR; node [shape=box]; - "yahoo_options_enrichment_complete" [label="yahoo_options_enrichment_complete -Options data enriched for all companies"]; - "lei_figi_mapping_complete" [label="lei_figi_mapping_complete -LEI-to-FIGI mappings from OpenFIGI API"]; - "yahoo_chart_enrichment_complete" [label="yahoo_chart_enrichment_complete -Chart data enriched for all companies"]; - "enrichment_group" [label="enrichment_group -Yahoo exchanges collected and validated"]; - "securities_data_complete" [label="securities_data_complete -Securities data built from FIGI mappings"]; - "yahoo_companies_cleansed" [label="yahoo_companies_cleansed -Company data cleansed and validated"]; "yahoo_events_enrichment_complete" [label="yahoo_events_enrichment_complete Corporate events enriched for all companies"]; + "yahoo_companies_cleansed" [label="yahoo_companies_cleansed +Company data cleansed and validated"]; + "yahoo_options_enrichment_complete" [label="yahoo_options_enrichment_complete +Options data enriched for all companies"]; + "securities_data_complete" [label="securities_data_complete +Securities data built from FIGI mappings"]; + "enrichment_group" [label="enrichment_group +Yahoo exchanges collected and validated"]; + "yahoo_chart_enrichment_complete" [label="yahoo_chart_enrichment_complete +Chart data enriched for all companies"]; + "lei_figi_mapping_complete" [label="lei_figi_mapping_complete +LEI-to-FIGI mappings from OpenFIGI API"]; - "yahoo_options_enrichment_complete" -> "yahoo_companies_cleansed" [label="via group enrichment_group"]; - "yahoo_chart_enrichment_complete" -> "yahoo_companies_cleansed" [label="via group enrichment_group"]; - "securities_data_complete" -> "lei_figi_mapping_complete"; - "yahoo_companies_cleansed" -> "securities_data_complete"; "yahoo_events_enrichment_complete" -> "yahoo_companies_cleansed" [label="via group enrichment_group"]; + "yahoo_companies_cleansed" -> "securities_data_complete"; + "yahoo_options_enrichment_complete" -> "yahoo_companies_cleansed" [label="via group enrichment_group"]; + "securities_data_complete" -> "lei_figi_mapping_complete"; + "yahoo_chart_enrichment_complete" -> "yahoo_companies_cleansed" [label="via group enrichment_group"]; } diff --git a/src/corporate/aggregation.rs b/src/corporate/aggregation.rs deleted file mode 100644 index 4648e6b..0000000 --- a/src/corporate/aggregation.rs +++ /dev/null @@ -1,195 +0,0 @@ -// src/corporate/aggregation.rs -use super::types::CompanyPrice; -use super::storage::*; -use crate::util::directories::DataPaths; -use tokio::fs; -use std::collections::HashMap; - -#[derive(Debug)] -struct DayData { - sources: Vec<(CompanyPrice, String)>, // (price, source_ticker) - total_volume: u64, - vwap: f64, - open: f64, - high: f64, - low: f64, - close: f64, -} - -/// Aggregate price data from multiple exchanges, converting all to USD -pub async fn aggregate_best_price_data(paths: &DataPaths, lei: &str) -> anyhow::Result<()> { - let company_dir = get_company_dir(paths, lei); - - for timeframe in ["daily", "5min"].iter() { - let source_dir = company_dir.join(timeframe); - if !source_dir.exists() { - continue; - } - - let mut all_prices: Vec<(CompanyPrice, String)> = Vec::new(); - let mut by_date_time: HashMap = HashMap::new(); - - // Load all sources with their ticker names - let mut entries = tokio::fs::read_dir(&source_dir).await?; - let mut source_count = 0; - let mut sources_used = std::collections::HashSet::new(); - - while let Some(entry) = entries.next_entry().await? { - let source_dir_path = entry.path(); - if !source_dir_path.is_dir() { continue; } - - let source_ticker = source_dir_path - .file_name() - .and_then(|n| n.to_str()) - .unwrap_or("unknown") - .to_string(); - - let prices_path = source_dir_path.join("prices.json"); - if !prices_path.exists() { continue; } - - let content = tokio::fs::read_to_string(&prices_path).await?; - let prices: Vec = serde_json::from_str(&content)?; - - if !prices.is_empty() { - sources_used.insert(source_ticker.clone()); - source_count += 1; - } - - for price in prices { - all_prices.push((price, source_ticker.clone())); - } - } - - if all_prices.is_empty() { - continue; - } - - println!(" Aggregating from {} exchanges: {}", - sources_used.len(), - sources_used.iter() - .map(|s| s.as_str()) - .collect::>() - .join(", ") - ); - - // Group by date + time (for 5min) or just date - for (p, source) in all_prices { - let key = if timeframe == &"5min" && !p.time.is_empty() { - format!("{}_{}", p.date, p.time) - } else { - p.date.clone() - }; - - // Convert to USD immediately DUMMY ------------------------------------------------------------------------------------------- - let usd_rate = 0.1; - - let mut p_usd = p.clone(); - p_usd.open *= usd_rate; - p_usd.high *= usd_rate; - p_usd.low *= usd_rate; - p_usd.close *= usd_rate; - p_usd.adj_close *= usd_rate; - p_usd.currency = "USD".to_string(); - - let entry = by_date_time.entry(key.clone()).or_insert(DayData { - sources: vec![], - total_volume: 0, - vwap: 0.0, - open: p_usd.open, - high: p_usd.high, - low: p_usd.low, - close: p_usd.close, - }); - - let volume = p.volume.max(1); // avoid div0 - let vwap_contrib = p_usd.close * volume as f64; - - entry.sources.push((p_usd.clone(), source)); - entry.total_volume += volume; - entry.vwap += vwap_contrib; - - // Use first open, last close, max high, min low - if entry.sources.len() == 1 { - entry.open = p_usd.open; - } - entry.close = p_usd.close; - entry.high = entry.high.max(p_usd.high); - entry.low = entry.low.min(p_usd.low); - } - - // Finalize aggregated data - let mut aggregated: Vec = Vec::new(); - - for (key, data) in by_date_time { - let vwap = data.vwap / data.total_volume as f64; - - let (date, time) = if key.contains('_') { - let parts: Vec<&str> = key.split('_').collect(); - (parts[0].to_string(), parts[1].to_string()) - } else { - (key, "".to_string()) - }; - - // Track which exchange contributed most volume - let best_source = data.sources.iter() - .max_by_key(|(p, _)| p.volume) - .map(|(_, src)| src.clone()) - .unwrap_or_else(|| "unknown".to_string()); - - aggregated.push(CompanyPrice { - ticker: format!("{lei}@agg"), // Mark as aggregated - date, - time, - open: data.open, - high: data.high, - low: data.low, - close: data.close, - adj_close: vwap, - volume: data.total_volume, - currency: "USD".to_string(), - }); - } - - aggregated.sort_by_key(|p| (p.date.clone(), p.time.clone())); - - // Save aggregated result - let agg_dir = company_dir.join("aggregated").join(timeframe); - fs::create_dir_all(&agg_dir).await?; - let path = agg_dir.join("prices.json"); - fs::write(&path, serde_json::to_string_pretty(&aggregated)?).await?; - - // Save aggregation metadata - let meta = AggregationMetadata { - lei: lei.to_string(), // ← CHANGE THIS - timeframe: timeframe.to_string(), - sources: sources_used.into_iter().collect(), - total_bars: aggregated.len(), - date_range: ( - aggregated.first().map(|p| p.date.clone()).unwrap_or_default(), - aggregated.last().map(|p| p.date.clone()).unwrap_or_default(), - ), - aggregated_at: chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string(), - }; - - let meta_path = agg_dir.join("metadata.json"); - fs::write(&meta_path, serde_json::to_string_pretty(&meta)?).await?; - - println!(" ✓ {} {} bars from {} sources (USD)", - aggregated.len(), - timeframe, - source_count - ); - } - - Ok(()) -} - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -struct AggregationMetadata { - lei: String, - timeframe: String, - sources: Vec, - total_bars: usize, - date_range: (String, String), - aggregated_at: String, -} \ No newline at end of file diff --git a/src/corporate/checkpoint_helpers.rs b/src/corporate/checkpoint_helpers.rs index 034ef64..4e8004d 100644 --- a/src/corporate/checkpoint_helpers.rs +++ b/src/corporate/checkpoint_helpers.rs @@ -4,7 +4,7 @@ //! This module extracts common patterns used across multiple update modules //! to reduce code duplication and improve maintainability. -use super::types::CompanyCrossPlatformInfo; +use super::types::CompanyCrossPlatformData; use crate::util::logger; use std::collections::HashMap; use std::path::{Path}; @@ -22,7 +22,7 @@ pub async fn load_checkpoint_with_log( checkpoint_path: P1, log_path: P2, checkpoint_desc: &str, -) -> Result> +) -> Result> where P1: AsRef, P2: AsRef, @@ -30,7 +30,7 @@ where let checkpoint_path = checkpoint_path.as_ref(); let log_path = log_path.as_ref(); - let mut companies: HashMap = HashMap::new(); + let mut companies: HashMap = HashMap::new(); // Load checkpoint if it exists if checkpoint_path.exists() { @@ -42,7 +42,7 @@ where continue; // Skip incomplete lines } - match serde_json::from_str::(line) { + match serde_json::from_str::(line) { Ok(company) => { companies.insert(company.name.clone(), company); } @@ -65,7 +65,7 @@ where continue; // Skip incomplete lines } - match serde_json::from_str::(line) { + match serde_json::from_str::(line) { Ok(company) => { companies.insert(company.name.clone(), company); replayed += 1; @@ -91,7 +91,7 @@ where pub async fn consolidate_checkpoint( checkpoint_path: P1, log_path: P2, - companies: &HashMap, + companies: &HashMap, ) -> Result<()> where P1: AsRef, diff --git a/src/corporate/collect_exchanges.rs b/src/corporate/collect_exchanges.rs index b6a4915..c8ee8a3 100644 --- a/src/corporate/collect_exchanges.rs +++ b/src/corporate/collect_exchanges.rs @@ -2,7 +2,7 @@ use crate::util::directories::DataPaths; use crate::util::integrity::{DataStage, StateManager, file_reference}; use crate::util::logger; -use crate::scraper::yahoo::ChartData; +use crate::corporate::types::*; use serde::{Deserialize, Serialize}; use std::collections::HashMap; diff --git a/src/corporate/helpers.rs b/src/corporate/helpers.rs index de2c031..ed0fd7e 100644 --- a/src/corporate/helpers.rs +++ b/src/corporate/helpers.rs @@ -4,18 +4,18 @@ use chrono::{Local, NaiveDate}; use rand::rngs::StdRng; use rand::prelude::{Rng, SeedableRng, IndexedRandom}; -pub fn event_key(e: &CompanyEvent) -> String { +pub fn event_key(e: &CompanyEventData) -> String { format!("{}|{}|{}", e.ticker, e.date, e.time) } -pub fn detect_changes(old: &CompanyEvent, new: &CompanyEvent, today: &str) -> Vec { +pub fn detect_changes(old: &CompanyEventData, new: &CompanyEventData, today: &str) -> Vec { let mut changes = Vec::new(); let ts = Local::now().format("%Y-%m-%d %H:%M:%S").to_string(); if new.date.as_str() <= today { return changes; } if old.time != new.time { - changes.push(CompanyEventChange { + changes.push(CompanyEventChangeData { ticker: new.ticker.clone(), date: new.date.clone(), field_changed: "time".to_string(), @@ -26,7 +26,7 @@ pub fn detect_changes(old: &CompanyEvent, new: &CompanyEvent, today: &str) -> Ve } if old.eps_forecast != new.eps_forecast { - changes.push(CompanyEventChange { + changes.push(CompanyEventChangeData { ticker: new.ticker.clone(), date: new.date.clone(), field_changed: "eps_forecast".to_string(), @@ -37,7 +37,7 @@ pub fn detect_changes(old: &CompanyEvent, new: &CompanyEvent, today: &str) -> Ve } if old.eps_actual != new.eps_actual { - changes.push(CompanyEventChange { + changes.push(CompanyEventChangeData { ticker: new.ticker.clone(), date: new.date.clone(), field_changed: "eps_actual".to_string(), @@ -52,14 +52,6 @@ pub fn detect_changes(old: &CompanyEvent, new: &CompanyEvent, today: &str) -> Ve changes } -pub fn price_key(p: &CompanyPrice) -> String { - if p.time.is_empty() { - format!("{}|{}", p.ticker, p.date) - } else { - format!("{}|{}|{}", p.ticker, p.date, p.time) - } -} - pub fn parse_float(s: &str) -> Option { s.replace("--", "").replace(",", "").parse::().ok() } @@ -83,7 +75,7 @@ pub fn choose_random(items: &[T]) -> T { } /// Extract first valid Yahoo ticker from company -pub fn extract_first_yahoo_ticker(company: &CompanyCrossPlatformInfo) -> Option { +pub fn extract_first_yahoo_ticker(company: &CompanyCrossPlatformData) -> Option { for tickers in company.isin_tickers_map.values() { for ticker in tickers { if ticker.starts_with("YAHOO:") @@ -113,7 +105,7 @@ pub fn sanitize_company_name(name: &str) -> String { /// Load companies from JSONL file pub async fn load_companies_from_jsonl( path: &std::path::Path -) -> anyhow::Result> { +) -> anyhow::Result> { let content = tokio::fs::read_to_string(path).await?; let mut companies = Vec::new(); @@ -121,7 +113,7 @@ pub async fn load_companies_from_jsonl( if line.trim().is_empty() { continue; } - if let Ok(company) = serde_json::from_str::(line) { + if let Ok(company) = serde_json::from_str::(line) { companies.push(company); } } diff --git a/src/corporate/mod.rs b/src/corporate/mod.rs index 37675c5..7ac4a7d 100644 --- a/src/corporate/mod.rs +++ b/src/corporate/mod.rs @@ -3,7 +3,6 @@ pub mod types; pub mod scraper; pub mod storage; pub mod helpers; -pub mod aggregation; pub mod update_openfigi; pub mod yahoo_company_extraction; pub mod page_validation; diff --git a/src/corporate/storage.rs b/src/corporate/storage.rs index 9edfd66..cd56b06 100644 --- a/src/corporate/storage.rs +++ b/src/corporate/storage.rs @@ -35,7 +35,7 @@ pub async fn build_event_index(paths: &DataPaths) -> anyhow::Result = serde_json::from_str(&content)?; + let events: Vec = serde_json::from_str(&content)?; for event in events { index.push(EventIndex { diff --git a/src/corporate/types.rs b/src/corporate/types.rs index 9d0667f..1c4a610 100644 --- a/src/corporate/types.rs +++ b/src/corporate/types.rs @@ -3,7 +3,7 @@ use std::collections::HashMap; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub struct CompanyEvent { +pub struct CompanyEventData { pub ticker: String, pub date: String, // YYYY-MM-DD pub time: String, // "AMC", "BMO", "TAS", or "" @@ -17,21 +17,7 @@ pub struct CompanyEvent { } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct CompanyPrice { - pub ticker: String, - pub date: String, // YYYY-MM-DD - pub time: String, // HH:MM:SS for intraday, "" for daily - pub open: f64, - pub high: f64, - pub low: f64, - pub close: f64, - pub adj_close: f64, - pub volume: u64, - pub currency: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct CompanyEventChange { +pub struct CompanyEventChangeData { pub ticker: String, pub date: String, pub field_changed: String, // "time", "eps_forecast", "eps_actual", "new_event" @@ -40,6 +26,24 @@ pub struct CompanyEventChange { pub detected_at: String, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChartData { + pub symbol: String, + pub quotes: Vec, + pub timestamp: i64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Quote { + pub timestamp: i64, + pub open: Option, + pub high: Option, + pub low: Option, + pub close: Option, + pub volume: Option, + pub adjusted_close: Option, +} + /// Figi Info based on API calls [https://www.openfigi.com/] /// # Attributes /// isin: ISIN belonging to this legal entity from lei @@ -87,7 +91,7 @@ pub struct YahooCompanyDetails { } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct CompanyCrossPlatformInfo { +pub struct CompanyCrossPlatformData { pub name: String, pub isin_tickers_map: HashMap>, // ISIN -> Tickers pub sector: Option, @@ -109,18 +113,32 @@ pub struct WarrantInfo { pub warrants: HashMap>, // ISIN -> Vec (grouped by ISIN) } -/// Option Info -/// -/// Information for Option securities fetched out of Name in FigiInfo -/// example1: "name": "December 25 Calls on ALPHA GA", -/// issued by NULL Call Option for underlying company ALPHA GA -/// other formats like only on company instead of two, underlying and issuing company are the same, leave issuer_company_name NULL +/// Options Info replaced by OptionData #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct OptionInfo { - pub underlying_company_name: String, // key in CompanyInfo, key for OptionInfo - pub issuer_company_name: Option, // key in CompanyInfo - pub option_type: String, // "put" or "call" - pub options: HashMap>, // ISIN -> Vec (grouped by ISIN) +pub struct OptionData { + pub symbol: String, + pub expiration_dates: Vec, + pub strikes: Vec, + pub option: Vec, + pub timestamp: i64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OptionChain { + pub expiration_date: i64, + pub calls: Vec, + pub puts: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OptionContract { + pub strike: f64, + pub last_price: Option, + pub bid: Option, + pub ask: Option, + pub volume: Option, + pub open_interest: Option, + pub implied_volatility: Option, } /// Bond parsed details from ticker/description diff --git a/src/corporate/update_companies.rs b/src/corporate/update_companies.rs index d79b170..ec0d1ba 100644 --- a/src/corporate/update_companies.rs +++ b/src/corporate/update_companies.rs @@ -20,14 +20,14 @@ use anyhow::{anyhow, Result}; /// Represents a write command to be serialized through the log writer enum LogCommand { - Write(CompanyCrossPlatformInfo), + Write(CompanyCrossPlatformData), Checkpoint, Shutdown, } /// Result from processing a single company struct CompanyProcessResult { - company: CompanyCrossPlatformInfo, + company: CompanyCrossPlatformData, is_update: bool, } @@ -36,7 +36,7 @@ struct CompanyProcessResult { fn company_needs_processing( company_name: &str, company_info: &CompanyInfo, - existing_companies: &HashMap, + existing_companies: &HashMap, ) -> bool { // If company not in existing data at all, definitely needs processing let Some(existing_entry) = existing_companies.get(company_name) else { @@ -732,7 +732,7 @@ async fn scrape_with_retry( async fn process_single_company_validated( name: String, company_info: CompanyInfo, - existing_entry: Option, + existing_entry: Option, pool: &Arc, shutdown_flag: &Arc, ) -> anyhow::Result> { @@ -867,7 +867,7 @@ async fn process_single_company_validated( } if !isin_tickers_map.is_empty() { - let company_entry = CompanyCrossPlatformInfo { + let company_entry = CompanyCrossPlatformData { name: name.clone(), isin_tickers_map, sector, diff --git a/src/corporate/update_companies_cleanse.rs b/src/corporate/update_companies_cleanse.rs index a8c2684..29c4b16 100644 --- a/src/corporate/update_companies_cleanse.rs +++ b/src/corporate/update_companies_cleanse.rs @@ -20,15 +20,15 @@ use tokio::sync::mpsc; /// Result of processing a single company #[derive(Debug, Clone)] pub enum CompanyProcessResult { - Valid(CompanyCrossPlatformInfo), + Valid(CompanyCrossPlatformData), FilteredLowCap { name: String, market_cap: f64 }, FilteredNoPrice { name: String }, - Failed { company: CompanyCrossPlatformInfo, error: String, is_transient: bool }, + Failed { company: CompanyCrossPlatformData, error: String, is_transient: bool }, } /// Represents a write command to be serialized through the log writer enum LogCommand { - Write(CompanyCrossPlatformInfo), + Write(CompanyCrossPlatformData), Checkpoint, Shutdown, } @@ -81,7 +81,7 @@ pub async fn companies_yahoo_cleansed_no_data(paths: &DataPaths) -> Result c, Err(e) => { logger::log_warn(&format!(" Failed to parse company on line {}: {}", total_count, e)).await; @@ -194,7 +194,7 @@ pub async fn companies_yahoo_cleansed_low_profile( logger::log_info(" Cleansing companies with low Yahoo profile...").await; // === RECOVERY PHASE: Load checkpoint + replay log === - let mut existing_companies: HashMap = HashMap::new(); + let mut existing_companies: HashMap = HashMap::new(); let mut processed_names: std::collections::HashSet = std::collections::HashSet::new(); if checkpoint_path.exists() { @@ -206,7 +206,7 @@ pub async fn companies_yahoo_cleansed_low_profile( continue; // Skip incomplete lines } - match serde_json::from_str::(line) { + match serde_json::from_str::(line) { Ok(company) => { processed_names.insert(company.name.clone()); existing_companies.insert(company.name.clone(), company); @@ -229,7 +229,7 @@ pub async fn companies_yahoo_cleansed_low_profile( continue; // Skip incomplete lines } - match serde_json::from_str::(line) { + match serde_json::from_str::(line) { Ok(company) => { processed_names.insert(company.name.clone()); existing_companies.insert(company.name.clone(), company); @@ -251,7 +251,7 @@ pub async fn companies_yahoo_cleansed_low_profile( logger::log_info(&format!("Loaded {} companies from input", input_companies.len())).await; // === BUILD PENDING LIST (smart skip logic) === - let mut pending: Vec = input_companies + let mut pending: Vec = input_companies .into_iter() .filter(|company| company_needs_processing(company, &existing_companies)) .collect(); @@ -608,7 +608,7 @@ pub async fn companies_yahoo_cleansed_low_profile( /// Helper function to spawn a validation task (reduces code duplication) fn spawn_validation_task( - company: CompanyCrossPlatformInfo, + company: CompanyCrossPlatformData, yahoo_pool: &Arc, paths: &Arc, write_tx: &mpsc::Sender, @@ -688,7 +688,7 @@ fn spawn_validation_task( /// Process a single company with full error categorization async fn process_company_with_validation( - company: &CompanyCrossPlatformInfo, + company: &CompanyCrossPlatformData, yahoo_pool: &Arc, paths: &DataPaths, ) -> CompanyProcessResult { @@ -897,8 +897,8 @@ async fn save_company_core_data( /// Check if a company needs processing (validation check) fn company_needs_processing( - company: &CompanyCrossPlatformInfo, - existing_companies: &HashMap, + company: &CompanyCrossPlatformData, + existing_companies: &HashMap, ) -> bool { // If company exists in cleaned output, skip it !existing_companies.contains_key(&company.name) diff --git a/src/corporate/update_companies_enrich.rs b/src/corporate/update_companies_enrich.rs index cf019cd..f6d3773 100644 --- a/src/corporate/update_companies_enrich.rs +++ b/src/corporate/update_companies_enrich.rs @@ -29,7 +29,7 @@ enum LogCommand { /// Type alias for enrichment function type EnrichmentFn = Arc< - dyn Fn(CompanyCrossPlatformInfo, Arc, DataPaths) + dyn Fn(CompanyCrossPlatformData, Arc, DataPaths) -> Pin> + Send>> + Send + Sync @@ -104,7 +104,7 @@ pub async fn enrich_companies_with_events( logger::log_info(&format!("Found {} companies to process", total_companies)).await; // Filter companies that need enrichment - let pending_companies: Vec = companies + let pending_companies: Vec = companies .into_iter() .filter(|company| !enriched_companies.contains(&company.name)) .collect(); @@ -140,7 +140,7 @@ pub async fn enrich_companies_with_events( let failed_count = Arc::new(AtomicUsize::new(0)); // Log writer channel with batching and fsync - let (log_tx, mut log_rx) = mpsc::channel::(1000); + let (log_tx, log_rx) = mpsc::channel::(1000); // Spawn log writer task let log_writer_handle = spawn_log_writer( @@ -283,7 +283,7 @@ async fn track_events_completion( /// Enrich a single company with event data async fn enrich_company_with_events( - company: &CompanyCrossPlatformInfo, + company: &CompanyCrossPlatformData, yahoo_pool: &Arc, paths: &DataPaths, ) -> anyhow::Result<()> { @@ -438,7 +438,7 @@ pub async fn enrich_companies_with_option( logger::log_info(&format!("Found {} companies to process", total_companies)).await; // Filter companies that need enrichment - let pending_companies: Vec = companies + let pending_companies: Vec = companies .into_iter() .filter(|company| !enriched_companies.contains(&company.name)) .collect(); @@ -474,7 +474,7 @@ pub async fn enrich_companies_with_option( let failed_count = Arc::new(AtomicUsize::new(0)); // Log writer channel with batching and fsync - let (log_tx, mut log_rx) = mpsc::channel::(1000); + let (log_tx, log_rx) = mpsc::channel::(1000); // Spawn log writer task let log_writer_handle = spawn_log_writer( @@ -605,7 +605,7 @@ async fn track_option_completion( /// Enrich a single company with option data async fn enrich_company_with_option( - company: &CompanyCrossPlatformInfo, + company: &CompanyCrossPlatformData, yahoo_pool: &Arc, paths: &DataPaths, ) -> anyhow::Result<()> { @@ -697,7 +697,7 @@ pub async fn enrich_companies_with_chart( logger::log_info(&format!("Found {} companies to process", total_companies)).await; // Filter companies that need enrichment - let pending_companies: Vec = companies + let pending_companies: Vec = companies .into_iter() .filter(|company| !enriched_companies.contains(&company.name)) .collect(); @@ -733,7 +733,7 @@ pub async fn enrich_companies_with_chart( let failed_count = Arc::new(AtomicUsize::new(0)); // Log writer channel with batching and fsync - let (log_tx, mut log_rx) = mpsc::channel::(1000); + let (log_tx, log_rx) = mpsc::channel::(1000); // Spawn log writer task let log_writer_handle = spawn_log_writer( @@ -864,7 +864,7 @@ async fn track_chart_completion( /// Enrich a single company with chart data async fn enrich_company_with_chart( - company: &CompanyCrossPlatformInfo, + company: &CompanyCrossPlatformData, yahoo_pool: &Arc, paths: &DataPaths, ) -> anyhow::Result<()> { @@ -1005,7 +1005,7 @@ fn spawn_log_writer( /// - `shutdown_flag`: Flag to signal shutdown /// - `enrichment_fn`: The specific enrichment function to call (events, option, chart, etc.) fn spawn_enrichment_task( - company: CompanyCrossPlatformInfo, + company: CompanyCrossPlatformData, yahoo_pool: Arc, paths: DataPaths, processed_count: Arc, diff --git a/src/corporate/update_openfigi.rs b/src/corporate/update_openfigi.rs index ff0d9b8..a99b44f 100644 --- a/src/corporate/update_openfigi.rs +++ b/src/corporate/update_openfigi.rs @@ -517,7 +517,7 @@ async fn process_lei_figi_file_batched( let mut common_batch: Vec = Vec::new(); let mut warrants_batch: Vec = Vec::new(); - let mut options_batch: Vec = Vec::new(); + let mut options_batch: Vec = Vec::new(); let mut corporate_bonds_batch: Vec = Vec::new(); let mut government_bonds_batch: Vec = Vec::new(); @@ -538,7 +538,7 @@ async fn process_lei_figi_file_batched( // Group by security type let (common_stocks, warrant_securities, option_securities, corporate_bonds_securities, government_bonds_securities) = - group_by_security_type(&figis); + group_securities(&figis); // Collect entries for batching and update existing keys if !common_stocks.is_empty() { @@ -738,7 +738,7 @@ fn prepare_warrant_entries( fn prepare_option_entries( option_securities: &[FigiInfo], existing_keys: &HashSet, -) -> Vec { +) -> Vec { let mut entries = Vec::new(); for figi in option_securities { @@ -753,7 +753,7 @@ fn prepare_option_entries( continue; } - let option_info = OptionInfo { + let option_info = OptionData { underlying_company_name: underlying.clone(), issuer_company_name: issuer, option_type: option_type.clone(), @@ -898,7 +898,7 @@ fn prepare_government_bond_entries( } /// Groups FigiInfo list by security type -fn group_by_security_type(figis: &[FigiInfo]) -> (Vec, Vec, Vec, Vec, Vec) { +fn group_securities(figis: &[FigiInfo]) -> (Vec, Vec, Vec, Vec, Vec) { let mut common_stocks:Vec = Vec::new(); let mut warrants:Vec = Vec::new(); let mut options:Vec = Vec::new(); diff --git a/src/corporate/yahoo_company_extraction.rs b/src/corporate/yahoo_company_extraction.rs index 2fdbbcd..0778c30 100644 --- a/src/corporate/yahoo_company_extraction.rs +++ b/src/corporate/yahoo_company_extraction.rs @@ -303,7 +303,7 @@ pub async fn get_all_tickers_from_companies_jsonl(paths: &DataPaths) -> anyhow:: let content = tokio::fs::read_to_string(companies_file).await?; let mut tickers = Vec::new(); for line in content.lines() { - let company: CompanyCrossPlatformInfo = serde_json::from_str(line)?; + let company: CompanyCrossPlatformData = serde_json::from_str(line)?; for (_isin, ticker_vec) in company.isin_tickers_map { tickers.extend(ticker_vec); } @@ -314,7 +314,7 @@ pub async fn get_all_tickers_from_companies_jsonl(paths: &DataPaths) -> anyhow:: pub async fn fetch_earnings_with_pool( pool: &Arc, ticker: &str, -) -> anyhow::Result> { +) -> anyhow::Result> { let ticker = ticker.to_string(); let url = format!("https://finance.yahoo.com/calendar/earnings?symbol={}&offset=0&size=100", ticker); @@ -329,7 +329,7 @@ pub async fn fetch_earnings_with_pool( }).await } -pub async fn extract_earnings_events(client: &Client, ticker: &str) -> Result> { +pub async fn extract_earnings_events(client: &Client, ticker: &str) -> Result> { // Wait for the table to load let table = client .wait() @@ -403,7 +403,7 @@ pub async fn extract_earnings_events(client: &Client, ticker: &str) -> Result, - pub timestamp: i64, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Quote { - pub timestamp: i64, - pub open: Option, - pub high: Option, - pub low: Option, - pub close: Option, - pub volume: Option, - pub adjusted_close: Option, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct OptionData { - pub symbol: String, - pub expiration_dates: Vec, - pub strikes: Vec, - pub option: Vec, - pub timestamp: i64, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct OptionChain { - pub expiration_date: i64, - pub calls: Vec, - pub puts: Vec, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct OptionContract { - pub strike: f64, - pub last_price: Option, - pub bid: Option, - pub ask: Option, - pub volume: Option, - pub open_interest: Option, - pub implied_volatility: Option, -} - #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SearchResult { pub symbol: String, @@ -279,8 +235,8 @@ impl YahooClient { let client = ClientBuilder::new() .proxy(proxy) - .timeout(Duration::from_secs(30)) // CHANGED: Reduced from 90s to 30s - .connect_timeout(Duration::from_secs(10)) // CHANGED: Reduced from 30s to 10s + .timeout(Duration::from_secs(30)) + .connect_timeout(Duration::from_secs(10)) .pool_max_idle_per_host(2) .pool_idle_timeout(Duration::from_secs(60)) .cookie_store(true)