diff --git a/src/corporate/helpers.rs b/src/corporate/helpers.rs index ed0fd7e..24c8b3d 100644 --- a/src/corporate/helpers.rs +++ b/src/corporate/helpers.rs @@ -1,8 +1,12 @@ // src/corporate/helpers.rs use super::types::*; +use crate::util::directories::DataPaths; + use chrono::{Local, NaiveDate}; use rand::rngs::StdRng; use rand::prelude::{Rng, SeedableRng, IndexedRandom}; +use tokio::fs; +use anyhow::{anyhow}; pub fn event_key(e: &CompanyEventData) -> String { format!("{}|{}|{}", e.ticker, e.date, e.time) @@ -119,4 +123,60 @@ pub async fn load_companies_from_jsonl( } Ok(companies) +} + +pub async fn find_most_recent_figi_date_dir(paths: &DataPaths) -> anyhow::Result> { + let map_cache_dir = paths.cache_gleif_openfigi_map_dir(); + + if !map_cache_dir.exists() { + return Ok(None); + } + + let mut entries = tokio::fs::read_dir(&map_cache_dir).await?; + let mut dates = Vec::new(); + + while let Some(entry) = entries.next_entry().await? { + let path = entry.path(); + if path.is_dir() { + if let Some(name) = path.file_name().and_then(|n| n.to_str()) { + if name.len() == 8 && name.chars().all(|c| c.is_numeric()) { + dates.push((name.to_string(), path)); + } + } + } + } + + if dates.is_empty() { + return Ok(None); + } + + dates.sort_by(|a, b| b.0.cmp(&a.0)); + Ok(Some(dates[0].1.clone())) +} + +pub async fn determine_gleif_date( + gleif_date: Option<&str>, + paths: &DataPaths, +) -> anyhow::Result { + if let Some(d) = gleif_date { + return Ok(d.to_string()); + } + + let gleif_dir = paths.cache_gleif_dir(); + let mut entries = fs::read_dir(gleif_dir).await?; + let mut dates = Vec::new(); + + while let Some(entry) = entries.next_entry().await? { + let path = entry.path(); + if path.is_dir() { + if let Some(name) = path.file_name().and_then(|n| n.to_str()) { + if name.len() == 8 && name.chars().all(|c| c.is_numeric()) { + dates.push(name.to_string()); + } + } + } + } + + dates.sort(); + dates.last().cloned().ok_or_else(|| anyhow!("No GLEIF date found")) } \ No newline at end of file diff --git a/src/corporate/mod.rs b/src/corporate/mod.rs index 7ac4a7d..f1aa69f 100644 --- a/src/corporate/mod.rs +++ b/src/corporate/mod.rs @@ -16,5 +16,6 @@ pub mod update_companies_enrich; pub mod collect_exchanges; pub mod bond_processing; +pub mod option_processing; pub use update::run_full_update; \ No newline at end of file diff --git a/src/corporate/option_processing.rs b/src/corporate/option_processing.rs new file mode 100644 index 0000000..f5498c2 --- /dev/null +++ b/src/corporate/option_processing.rs @@ -0,0 +1,54 @@ +/// Parse strike price from option ticker (e.g., "AAPL 150 CALL" -> 150.0) +pub fn parse_strike_from_ticker(ticker: &str) -> Option { + let parts: Vec<&str> = ticker.split_whitespace().collect(); + for (i, part) in parts.iter().enumerate() { + if let Ok(strike) = part.parse::() { + // Check if next word is CALL/PUT to confirm this is strike + if i + 1 < parts.len() && (parts[i + 1].to_uppercase() == "CALL" || parts[i + 1].to_uppercase() == "PUT") { + return Some(strike); + } + } + } + None +} + +/// Parse expiration date from option ticker (e.g., "AAPL 150 CALL 01/17/25" -> timestamp) +pub fn parse_expiration_from_ticker(ticker: &str) -> Option { + let parts: Vec<&str> = ticker.split_whitespace().collect(); + for part in parts { + // Look for date pattern MM/DD/YY + if part.contains('/') && part.len() >= 8 { + if let Ok(date) = chrono::NaiveDate::parse_from_str(part, "%m/%d/%y") { + return Some(date.and_hms_opt(16, 0, 0)?.and_utc().timestamp()); + } + } + } + None +} + +/// Parse option name to extract underlying company, issuer, and option type +/// +/// Examples: +/// - "December 25 Calls on ALPHA GA" -> ("ALPHA GA", None, "call") +/// - "January 26 Puts on TESLA INC" -> ("TESLA INC", None, "put") +pub fn parse_option_name(name: &str) -> (String, Option, String) { + let name_upper = name.to_uppercase(); + + // Detect option type + let option_type = if name_upper.contains("CALL") { + "call".to_string() + } else if name_upper.contains("PUT") { + "put".to_string() + } else { + "unknown".to_string() + }; + + // Try to extract underlying after "on" + if let Some(pos) = name_upper.find(" ON ") { + let underlying = name[pos + 4..].trim().to_string(); + return (underlying, None, option_type); + } + + // Fallback: return entire name + (name.to_string(), None, option_type) +} \ No newline at end of file diff --git a/src/corporate/types.rs b/src/corporate/types.rs index 1c4a610..73aa649 100644 --- a/src/corporate/types.rs +++ b/src/corporate/types.rs @@ -2,30 +2,6 @@ use std::collections::HashMap; use serde::{Deserialize, Serialize}; -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub struct CompanyEventData { - pub ticker: String, - pub date: String, // YYYY-MM-DD - pub time: String, // "AMC", "BMO", "TAS", or "" - pub period: String, // "Q1 2025", "FY 2024" - pub eps_forecast: Option, - pub eps_actual: Option, - pub revenue_forecast: Option, - pub revenue_actual: Option, - pub surprise_pct: Option, // (actual - forecast) / |forecast| - pub source: String, // "Yahoo" -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct CompanyEventChangeData { - pub ticker: String, - pub date: String, - pub field_changed: String, // "time", "eps_forecast", "eps_actual", "new_event" - pub old_value: String, - pub new_value: String, - pub detected_at: String, -} - #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ChartData { pub symbol: String, @@ -51,7 +27,7 @@ pub struct Quote { /// # Comments /// Use Mapping the Object List onto Figi Properties #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct FigiInfo { +pub struct FigiData { pub isin: String, pub figi: String, pub name: String, @@ -77,14 +53,16 @@ pub struct FigiInfo { /// * ISIN as the most liquid / preferred traded security (used for fallback) /// * securities: Grouped by ISIN, filtered for Common Stock only #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct CompanyInfo{ +pub struct CompanyData{ + pub id: String, pub name: String, pub primary_isin: String, - pub securities: HashMap>, // ISIN -> Vec + pub securities: HashMap>, // ISIN -> Vec + pub yahoo_company_data: Option>, } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct YahooCompanyDetails { +pub struct YahooCompanyData { pub ticker: String, pub sector: Option, pub exchange: Option, @@ -98,25 +76,33 @@ pub struct CompanyCrossPlatformData { pub exchange: Option, } -/// Warrant Info +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WarrantData { + pub company_id: String, // key in CompanyData + pub company_name: String, // key in CompanyData + pub warrants: HashMap, // underlying company name -> Warrant +} + +/// Warrant Data /// -/// Information for Warrant securities fetched out of Name in FigiInfo +/// Information for Warrant securities fetched out of Name in FigiData /// example1: "name": "VONTOBE-PW26 LEONARDO SPA", /// issued by VONTOBEL Put Warrant for underlying company LEONARDO SPA /// example2: "BAYER H-CW25 L'OREAL", /// other formats like only on company instead of two, underlying and issuing company are the same, leave issuer_company_name NULL #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct WarrantInfo { - pub underlying_company_name: String, // key in CompanyInfo, key for WarrantInfo - pub issuer_company_name: Option, // key in CompanyInfo +pub struct WarrantDetails { + pub company_id: String, // key in CompanyData + pub company_name: String, // key in CompanyData, key for WarrantDetails + pub issuer_company_name: Option, // key in CompanyData pub warrant_type: String, // "put" or "call" - pub warrants: HashMap>, // ISIN -> Vec (grouped by ISIN) + pub warrants: HashMap>, // ISIN -> Vec (grouped by ISIN) } -/// Options Info replaced by OptionData #[derive(Debug, Clone, Serialize, Deserialize)] pub struct OptionData { - pub symbol: String, + pub company_id: String, // key in CompanyData + pub company_name: String, // key in CompanyData pub expiration_dates: Vec, pub strikes: Vec, pub option: Vec, @@ -162,9 +148,9 @@ pub struct BondDetails { /// Example: "name": "LIBERTYVILLE BK & TRUST" /// ticker: "WTFC 4.3 01/12/26 0003" #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct CorporateBondInfo { +pub struct CorporateBondData { pub underlying_company_name: String, // key - company name issuing the bond - pub bonds: HashMap>, // ISIN -> Vec (grouped by ISIN) + pub bonds: HashMap>, // ISIN -> Vec (grouped by ISIN) #[serde(skip_serializing_if = "HashMap::is_empty", default)] pub bond_details: HashMap, // ISIN -> parsed bond details } @@ -175,25 +161,42 @@ pub struct CorporateBondInfo { /// Example: "name": "SLOVAK REPUBLIC" /// ticker: "SLOVAK 1.5225 05/10/28 4Y" #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct GovernmentBondInfo { +pub struct GovernmentBondData { pub issuer_name: String, // key - government entity name pub issuer_type: String, // "sovereign", "municipal", "state", "province", etc. - pub bonds: HashMap>, // ISIN -> Vec (grouped by ISIN) + pub bonds: HashMap>, // ISIN -> Vec (grouped by ISIN) #[serde(skip_serializing_if = "HashMap::is_empty", default)] pub bond_details: HashMap, // ISIN -> parsed bond details } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct AvailableExchange { - pub exchange_mic: String, +pub struct ExchangeData { + pub mic: String, pub ticker: String, - pub has_daily: bool, - pub has_5min: bool, - pub last_successful_fetch: Option, // YYYY-MM-DD #[serde(default)] pub currency: String, - #[serde(default)] - pub discovered_at: Option, // When this exchange was first discovered - #[serde(default)] - pub fetch_count: u32, // How many times successfully fetched +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct CompanyEventData { + pub ticker: String, + pub date: String, // YYYY-MM-DD + pub time: String, // "AMC", "BMO", "TAS", or "" + pub period: String, // "Q1 2025", "FY 2024" + pub eps_forecast: Option, + pub eps_actual: Option, + pub revenue_forecast: Option, + pub revenue_actual: Option, + pub surprise_pct: Option, // (actual - forecast) / |forecast| + pub source: String, // "Yahoo" +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CompanyEventChangeData { + pub ticker: String, + pub date: String, + pub field_changed: String, // "time", "eps_forecast", "eps_actual", "new_event" + pub old_value: String, + pub new_value: String, + pub detected_at: String, } \ No newline at end of file diff --git a/src/corporate/update.rs b/src/corporate/update.rs index b08bec5..05e22a3 100644 --- a/src/corporate/update.rs +++ b/src/corporate/update.rs @@ -2,7 +2,7 @@ use super::{scraper::*, update_openfigi::*}; use crate::config::Config; use crate::check_shutdown; -use crate::corporate::update_companies::build_companies_jsonl_streaming_parallel; +use crate::corporate::update_companies::update_companies; use crate::corporate::update_companies_cleanse::{companies_yahoo_cleansed_low_profile, companies_yahoo_cleansed_no_data}; use crate::corporate::update_companies_enrich::{enrich_companies_with_events, enrich_companies_with_chart, enrich_companies_with_option}; use crate::corporate::collect_exchanges::collect_and_save_exchanges; @@ -61,20 +61,14 @@ pub async fn run_full_update( check_shutdown!(shutdown_flag); logger::log_info("Step 4: Building securities map (streaming)...").await; - let date_dir = find_most_recent_figi_date_dir(&paths).await?; - - if let Some(date_dir) = date_dir { - logger::log_info(&format!(" Using FIGI data from: {:?}", date_dir)).await; - update_securities(&date_dir).await?; - logger::log_info(" ✓ Securities map updated").await; - } else { - logger::log_warn(" ✗ No FIGI data directory found").await; - } + update_securities().await?; + logger::log_info(" ✓ Securities map updated").await; + check_shutdown!(shutdown_flag); - logger::log_info("Step 5: Building companies.jsonl with parallel processing and validation...").await; - let count = build_companies_jsonl_streaming_parallel(&paths, pool, shutdown_flag, config, &None).await?; + logger::log_info("Step 5: Building companies.jsonl with Yahoo Data...").await; + let count = update_companies(&paths, pool, shutdown_flag, config, &None).await?; logger::log_info(&format!(" ✓ Saved {} companies", count)).await; check_shutdown!(shutdown_flag); @@ -132,31 +126,3 @@ pub async fn run_full_update( Ok(()) } -async fn find_most_recent_figi_date_dir(paths: &DataPaths) -> anyhow::Result> { - let map_cache_dir = paths.cache_gleif_openfigi_map_dir(); - - if !map_cache_dir.exists() { - return Ok(None); - } - - let mut entries = tokio::fs::read_dir(&map_cache_dir).await?; - let mut dates = Vec::new(); - - while let Some(entry) = entries.next_entry().await? { - let path = entry.path(); - if path.is_dir() { - if let Some(name) = path.file_name().and_then(|n| n.to_str()) { - if name.len() == 8 && name.chars().all(|c| c.is_numeric()) { - dates.push((name.to_string(), path)); - } - } - } - } - - if dates.is_empty() { - return Ok(None); - } - - dates.sort_by(|a, b| b.0.cmp(&a.0)); - Ok(Some(dates[0].1.clone())) -} \ No newline at end of file diff --git a/src/corporate/update_companies.rs b/src/corporate/update_companies.rs index ec0d1ba..304d1d2 100644 --- a/src/corporate/update_companies.rs +++ b/src/corporate/update_companies.rs @@ -35,7 +35,7 @@ struct CompanyProcessResult { /// Returns true if company has incomplete data (needs processing) fn company_needs_processing( company_name: &str, - company_info: &CompanyInfo, + company_info: &CompanyData, existing_companies: &HashMap, ) -> bool { // If company not in existing data at all, definitely needs processing @@ -79,7 +79,7 @@ fn company_needs_processing( } /// Abort-safe incremental JSONL persistence with proper hard reset handling -pub async fn build_companies_jsonl_streaming_parallel( +pub async fn update_companies( paths: &DataPaths, pool: &Arc, shutdown_flag: &Arc, @@ -289,7 +289,7 @@ pub async fn build_companies_jsonl_streaming_parallel( let mut tasks = FuturesUnordered::new(); // Build initial pending list with proper filtering - let mut pending: Vec<(String, CompanyInfo)> = securities.iter() + let mut pending: Vec<(String, CompanyData)> = securities.iter() .filter(|(name, info)| company_needs_processing(name, info, &existing_companies)) .map(|(name, info)| (name.clone(), info.clone())) .collect(); @@ -619,8 +619,8 @@ pub async fn build_companies_jsonl_streaming_parallel( async fn load_securities_from_jsonl( checkpoint_path: &std::path::Path, log_path: &std::path::Path, -) -> anyhow::Result> { - let mut securities: HashMap = HashMap::new(); +) -> anyhow::Result> { + let mut securities: HashMap = HashMap::new(); // Load checkpoint if checkpoint_path.exists() { @@ -631,7 +631,7 @@ async fn load_securities_from_jsonl( continue; // Skip incomplete lines } - match serde_json::from_str::(line) { + match serde_json::from_str::(line) { Ok(company_info) => { securities.insert(company_info.name.clone(), company_info); } @@ -654,7 +654,7 @@ async fn load_securities_from_jsonl( continue; // Skip incomplete lines } - match serde_json::from_str::(line) { + match serde_json::from_str::(line) { Ok(company_info) => { securities.insert(company_info.name.clone(), company_info); } @@ -677,7 +677,7 @@ async fn scrape_with_retry( isin: &str, max_retries: u32, shutdown_flag: &Arc, -) -> Result> { +) -> Result> { let mut retries = 0; loop { @@ -731,7 +731,7 @@ async fn scrape_with_retry( /// Process single company with validation and shutdown checks async fn process_single_company_validated( name: String, - company_info: CompanyInfo, + company_info: CompanyData, existing_entry: Option, pool: &Arc, shutdown_flag: &Arc, diff --git a/src/corporate/update_openfigi.rs b/src/corporate/update_openfigi.rs index a99b44f..3255e24 100644 --- a/src/corporate/update_openfigi.rs +++ b/src/corporate/update_openfigi.rs @@ -1,7 +1,8 @@ // src/corporate/update_openfigi.rs - STREAMING VERSION -// Key changes: Never load entire GLEIF CSV or FIGI maps into memory use super::types::*; +use super::helpers::{find_most_recent_figi_date_dir, determine_gleif_date}; use super::bond_processing::*; +use super::option_processing::*; use crate::util::directories::DataPaths; use crate::util::integrity::{DataStage, StateManager, directory_reference}; use crate::util::logger; @@ -15,76 +16,6 @@ use anyhow::{Context, anyhow}; const LEI_BATCH_SIZE: usize = 100; // Process 100 LEIs at a time -async fn process_and_save_figi_batch( - client: &OpenFigiClient, - lei_batch: &HashMap>, - date_dir: &Path, -) -> anyhow::Result<()> { - for (lei, isins) in lei_batch { - let unique_isins: Vec<_> = isins.iter() - .cloned() - .collect::>() - .into_iter() - .collect(); - - let figi_infos = client.map_isins_to_figi_infos(&unique_isins).await?; - - if figi_infos.is_empty() { - // No FIGIs found - save to no_results.jsonl to avoid re-querying - append_no_result_lei(date_dir, lei, &unique_isins).await?; - continue; - } - - // Save FIGIs by sector as before - save_figi_infos_by_sector(lei, &figi_infos, date_dir).await?; - } - - Ok(()) -} - -async fn save_figi_infos_by_sector( - lei: &str, - figi_infos: &[FigiInfo], - date_dir: &Path, -) -> anyhow::Result<()> { - let mut by_sector: HashMap> = HashMap::new(); - - for figi_info in figi_infos { - let sector = if figi_info.market_sector.is_empty() { - "uncategorized".to_string() - } else { - figi_info.market_sector.clone() - }; - by_sector.entry(sector).or_default().push(figi_info.clone()); - } - - for (sector, figis) in by_sector { - let sector_dir = date_dir.join(§or); - let path = sector_dir.join("lei_to_figi.jsonl"); - append_lei_to_figi_jsonl(&path, lei, &figis).await?; - } - - Ok(()) -} - -async fn append_lei_to_figi_jsonl(path: &Path, lei: &str, figis: &[FigiInfo]) -> anyhow::Result<()> { - let entry = json!({ - "lei": lei, - "figis": figis, - }); - - let line = serde_json::to_string(&entry)? + "\n"; - - let mut file = tokio_fs::OpenOptions::new() - .create(true) - .append(true) - .open(path) - .await?; - - file.write_all(line.as_bytes()).await?; - Ok(()) -} - /// Loads or builds securities data by streaming through FIGI mapping files. /// /// Implements abort-safe incremental persistence with checkpoints and replay logs. @@ -97,13 +28,16 @@ async fn append_lei_to_figi_jsonl(path: &Path, lei: &str, figis: &[FigiInfo]) -> /// /// # Errors /// Returns an error if file I/O fails or JSON parsing fails. -pub async fn update_securities(date_dir: &Path) -> anyhow::Result<()> { +pub async fn update_securities() -> anyhow::Result<()> { logger::log_info("Building securities data from FIGI mappings...").await; let dir = DataPaths::new(".")?; let manager = StateManager::new(&dir.integrity_dir()).await?; let step_name = "securities_data_complete"; + let date_dir = find_most_recent_figi_date_dir(&dir).await? + .ok_or_else(|| anyhow!("No FIGI date directory found"))?; + let data_dir = dir.data_dir(); let output_dir = data_dir.join("figi_securities"); tokio_fs::create_dir_all(&output_dir).await @@ -515,11 +449,11 @@ async fn process_lei_figi_file_batched( let batch_size = 100; let mut processed_count = 0; - let mut common_batch: Vec = Vec::new(); - let mut warrants_batch: Vec = Vec::new(); + let mut common_batch: Vec = Vec::new(); + let mut warrants_batch: Vec = Vec::new(); let mut options_batch: Vec = Vec::new(); - let mut corporate_bonds_batch: Vec = Vec::new(); - let mut government_bonds_batch: Vec = Vec::new(); + let mut corporate_bonds_batch: Vec = Vec::new(); + let mut government_bonds_batch: Vec = Vec::new(); for (line_num, line) in content.lines().enumerate() { if line.trim().is_empty() { @@ -529,7 +463,7 @@ async fn process_lei_figi_file_batched( let entry: Value = serde_json::from_str(line) .context(format!("Failed to parse JSON on line {}", line_num + 1))?; - let figis: Vec = serde_json::from_value(entry["figis"].clone()) + let figis: Vec = serde_json::from_value(entry["figis"].clone()) .context("Invalid 'figis' field")?; if figis.is_empty() { @@ -552,7 +486,7 @@ async fn process_lei_figi_file_batched( if !warrant_securities.is_empty() { for entry in prepare_warrant_entries(&warrant_securities, existing_warrants) { // Add to existing set immediately - let key = format!("{}::{}", entry.underlying_company_name, entry.warrant_type); + let key = entry.company_id.clone(); existing_warrants.insert(key); warrants_batch.push(entry); } @@ -561,7 +495,7 @@ async fn process_lei_figi_file_batched( if !option_securities.is_empty() { for entry in prepare_option_entries(&option_securities, existing_options) { // Add to existing set immediately - let key = format!("{}::{}", entry.underlying_company_name, entry.option_type); + let key = entry.company_name.clone(); existing_options.insert(key); options_batch.push(entry); } @@ -680,9 +614,9 @@ async fn write_batch_with_fsync( /// Prepares a common stock entry if it doesn't exist fn prepare_common_stock_entry( - figi_infos: &[FigiInfo], + figi_infos: &[FigiData], existing_keys: &HashSet, -) -> Option { +) -> Option { let name = figi_infos[0].name.clone(); if name.is_empty() || existing_keys.contains(&name) { return None; @@ -690,42 +624,78 @@ fn prepare_common_stock_entry( let grouped_by_isin = group_figis_by_isin(figi_infos); let primary_isin = grouped_by_isin.keys().next().cloned().unwrap_or_default(); + let id = format!("company_{}", std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos()); - Some(CompanyInfo { + Some(CompanyData { + id, name, primary_isin, securities: grouped_by_isin, + yahoo_company_data: None, }) } /// Prepares warrant entries for batching fn prepare_warrant_entries( - warrant_securities: &[FigiInfo], + warrant_securities: &[FigiData], existing_keys: &HashSet, -) -> Vec { +) -> Vec { let mut entries = Vec::new(); + // Group by underlying company + let mut grouped: HashMap> = HashMap::new(); + for figi in warrant_securities { - let (underlying, issuer, warrant_type) = parse_warrant_name(&figi.name); + let (underlying, _issuer, warrant_type) = parse_warrant_name(&figi.name); if underlying.is_empty() { continue; } - let key = format!("{}::{}", underlying, warrant_type); - if existing_keys.contains(&key) { + grouped.entry(underlying.clone()) + .or_default() + .push((warrant_type, figi.clone())); + } + + // Create WarrantData for each underlying company + for (underlying_company, contracts) in grouped { + if existing_keys.contains(&underlying_company) { continue; } - let warrant_info = WarrantInfo { - underlying_company_name: underlying.clone(), - issuer_company_name: issuer, - warrant_type: warrant_type.clone(), - warrants: { - let mut map = HashMap::new(); - map.insert(figi.isin.clone(), vec![figi.clone()]); - map - }, + let company_id = format!("warrant_{}", std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos()); + + let mut warrants_by_type: HashMap = HashMap::new(); + + for (warrant_type, figi) in contracts { + let (_, issuer, _) = parse_warrant_name(&figi.name); + + let warrant_detail = WarrantDetails { + company_id: company_id.clone(), + company_name: underlying_company.clone(), + issuer_company_name: issuer, + warrant_type: warrant_type.clone(), + warrants: { + let mut map = HashMap::new(); + map.insert(figi.isin.clone(), vec![figi.clone()]); + map + }, + }; + + let key = format!("{}_{}", underlying_company, warrant_type); + warrants_by_type.insert(key, warrant_detail); + } + + let warrant_info = WarrantData { + company_id, + company_name: underlying_company.clone(), + warrants: warrants_by_type, }; entries.push(warrant_info); @@ -735,36 +705,105 @@ fn prepare_warrant_entries( } /// Prepares option entries for batching +/// +/// Groups option contracts by underlying company, extracts strike prices and expiration dates, +/// and builds OptionChain structures organizing calls and puts by expiration date. +/// +/// # Arguments +/// * `option_securities` - List of FigiData objects for option contracts +/// * `existing_keys` - Set of already-processed keys (format: "company_name") +/// +/// # Returns +/// Vector of OptionData entries, one per unique underlying company fn prepare_option_entries( - option_securities: &[FigiInfo], + option_securities: &[FigiData], existing_keys: &HashSet, ) -> Vec { let mut entries = Vec::new(); + // Group by underlying company + let mut grouped: HashMap> = HashMap::new(); + for figi in option_securities { - let (underlying, issuer, option_type) = parse_option_name(&figi.name); + let (underlying, _issuer, option_type) = parse_option_name(&figi.name); if underlying.is_empty() { continue; } - let key = format!("{}::{}", underlying, option_type); - if existing_keys.contains(&key) { + grouped.entry(underlying.clone()) + .or_default() + .push((option_type, figi.clone())); + } + + // Create OptionData for each underlying company + for (underlying_company, contracts) in grouped { + if existing_keys.contains(&underlying_company) { continue; } - let option_info = OptionData { - underlying_company_name: underlying.clone(), - issuer_company_name: issuer, - option_type: option_type.clone(), - options: { - let mut map = HashMap::new(); - map.insert(figi.isin.clone(), vec![figi.clone()]); - map - }, + // Build OptionContracts and extract strikes/expirations + let mut option_contracts: HashMap, Vec)> = HashMap::new(); + let mut all_strikes: std::collections::HashSet = std::collections::HashSet::new(); + + for (option_type, figi) in contracts { + // Parse strike price from ticker if available + let strike = parse_strike_from_ticker(&figi.ticker).unwrap_or(0.0); + let expiration = parse_expiration_from_ticker(&figi.ticker).unwrap_or(0); + + if strike > 0.0 && expiration > 0 { + all_strikes.insert((strike * 100.0) as u64); + + let contract = OptionContract { + strike, + last_price: None, + bid: None, + ask: None, + volume: None, + open_interest: None, + implied_volatility: None, + }; + + let entry = option_contracts.entry(expiration).or_insert((Vec::new(), Vec::new())); + match option_type.as_str() { + "call" => entry.0.push(contract), + "put" => entry.1.push(contract), + _ => {} + } + } + } + + // Build OptionChains from contracts + let mut option_chains = Vec::new(); + let mut expiration_dates = Vec::new(); + + for (expiration, (calls, puts)) in option_contracts { + expiration_dates.push(expiration); + option_chains.push(OptionChain { + expiration_date: expiration, + calls, + puts, + }); + } + + expiration_dates.sort(); + option_chains.sort_by_key(|oc| oc.expiration_date); + + let strikes: Vec = all_strikes + .iter() + .map(|s| *s as f64 / 100.0) + .collect::>(); + + let option_data = OptionData { + company_id: underlying_company.clone(), + company_name: underlying_company.clone(), + expiration_dates, + strikes, + option: option_chains, + timestamp: chrono::Utc::now().timestamp(), }; - entries.push(option_info); + entries.push(option_data); } entries @@ -782,13 +821,13 @@ fn prepare_option_entries( /// # Returns /// Vector of CorporateBondInfo entries, one per unique issuer fn prepare_corporate_bond_entries( - corporate_bond_securities: &[FigiInfo], + corporate_bond_securities: &[FigiData], existing_keys: &HashSet, -) -> Vec { +) -> Vec { let mut entries = Vec::new(); // Group bonds by issuer (company name) - let mut grouped: HashMap> = HashMap::new(); + let mut grouped: HashMap> = HashMap::new(); for figi in corporate_bond_securities { let issuer = figi.name.clone(); @@ -819,7 +858,7 @@ fn prepare_corporate_bond_entries( } } - let bond_info = CorporateBondInfo { + let bond_info = CorporateBondData { underlying_company_name: issuer.clone(), bonds: bonds_by_isin, bond_details: bond_details_map, @@ -844,13 +883,13 @@ fn prepare_corporate_bond_entries( /// # Returns /// Vector of GovernmentBondInfo entries, one per unique issuer fn prepare_government_bond_entries( - government_bond_securities: &[FigiInfo], + government_bond_securities: &[FigiData], existing_keys: &HashSet, -) -> Vec { +) -> Vec { let mut entries = Vec::new(); // Group bonds by issuer (country/entity name) - let mut grouped: HashMap> = HashMap::new(); + let mut grouped: HashMap> = HashMap::new(); for figi in government_bond_securities { let issuer = figi.name.clone(); @@ -884,7 +923,7 @@ fn prepare_government_bond_entries( } } - let bond_info = GovernmentBondInfo { + let bond_info = GovernmentBondData { issuer_name: issuer.clone(), issuer_type, bonds: bonds_by_isin, @@ -898,12 +937,12 @@ fn prepare_government_bond_entries( } /// Groups FigiInfo list by security type -fn group_securities(figis: &[FigiInfo]) -> (Vec, Vec, Vec, Vec, Vec) { - let mut common_stocks:Vec = Vec::new(); - let mut warrants:Vec = Vec::new(); - let mut options:Vec = Vec::new(); - let mut corporate_bonds:Vec = Vec::new(); - let mut government_bonds:Vec = Vec::new(); +fn group_securities(figis: &[FigiData]) -> (Vec, Vec, Vec, Vec, Vec) { + let mut common_stocks:Vec = Vec::new(); + let mut warrants:Vec = Vec::new(); + let mut options:Vec = Vec::new(); + let mut corporate_bonds:Vec = Vec::new(); + let mut government_bonds:Vec = Vec::new(); for figi in figis { match figi.security_type.as_str() { @@ -923,8 +962,8 @@ fn group_securities(figis: &[FigiInfo]) -> (Vec, Vec, Vec HashMap> { - let mut grouped: HashMap> = HashMap::new(); +fn group_figis_by_isin(figi_infos: &[FigiData]) -> HashMap> { + let mut grouped: HashMap> = HashMap::new(); for figi_info in figi_infos { grouped.entry(figi_info.isin.clone()) @@ -994,33 +1033,6 @@ fn parse_warrant_name(name: &str) -> (String, Option, String) { (name.to_string(), None, warrant_type) } -/// Parse option name to extract underlying company, issuer, and option type -/// -/// Examples: -/// - "December 25 Calls on ALPHA GA" -> ("ALPHA GA", None, "call") -/// - "January 26 Puts on TESLA INC" -> ("TESLA INC", None, "put") -fn parse_option_name(name: &str) -> (String, Option, String) { - let name_upper = name.to_uppercase(); - - // Detect option type - let option_type = if name_upper.contains("CALL") { - "call".to_string() - } else if name_upper.contains("PUT") { - "put".to_string() - } else { - "unknown".to_string() - }; - - // Try to extract underlying after "on" - if let Some(pos) = name_upper.find(" ON ") { - let underlying = name[pos + 4..].trim().to_string(); - return (underlying, None, option_type); - } - - // Fallback: return entire name - (name.to_string(), None, option_type) -} - /// Statistics tracker for streaming processing #[derive(Debug)] struct StreamingStats { @@ -1104,33 +1116,6 @@ async fn load_market_sectors() -> anyhow::Result> { Ok(sectors) } -async fn determine_gleif_date( - gleif_date: Option<&str>, - paths: &DataPaths, -) -> anyhow::Result { - if let Some(d) = gleif_date { - return Ok(d.to_string()); - } - - let gleif_dir = paths.cache_gleif_dir(); - let mut entries = tokio_fs::read_dir(gleif_dir).await?; - let mut dates = Vec::new(); - - while let Some(entry) = entries.next_entry().await? { - let path = entry.path(); - if path.is_dir() { - if let Some(name) = path.file_name().and_then(|n| n.to_str()) { - if name.len() == 8 && name.chars().all(|c| c.is_numeric()) { - dates.push(name.to_string()); - } - } - } - } - - dates.sort(); - dates.last().cloned().ok_or_else(|| anyhow!("No GLEIF date found")) -} - async fn setup_sector_directories( date_dir: &Path, sector_dirs: &[String], @@ -1570,4 +1555,74 @@ async fn append_no_result_lei(date_dir: &Path, lei: &str, isins: &[String]) -> a file.write_all(line.as_bytes()).await?; Ok(()) -} \ No newline at end of file +} + +async fn process_and_save_figi_batch( + client: &OpenFigiClient, + lei_batch: &HashMap>, + date_dir: &Path, +) -> anyhow::Result<()> { + for (lei, isins) in lei_batch { + let unique_isins: Vec<_> = isins.iter() + .cloned() + .collect::>() + .into_iter() + .collect(); + + let figi_infos = client.map_isins_to_figi_infos(&unique_isins).await?; + + if figi_infos.is_empty() { + // No FIGIs found - save to no_results.jsonl to avoid re-querying + append_no_result_lei(date_dir, lei, &unique_isins).await?; + continue; + } + + // Save FIGIs by sector as before + save_figi_infos_by_sector(lei, &figi_infos, date_dir).await?; + } + + Ok(()) +} + +async fn save_figi_infos_by_sector( + lei: &str, + figi_infos: &[FigiData], + date_dir: &Path, +) -> anyhow::Result<()> { + let mut by_sector: HashMap> = HashMap::new(); + + for figi_info in figi_infos { + let sector = if figi_info.market_sector.is_empty() { + "uncategorized".to_string() + } else { + figi_info.market_sector.clone() + }; + by_sector.entry(sector).or_default().push(figi_info.clone()); + } + + for (sector, figis) in by_sector { + let sector_dir = date_dir.join(§or); + let path = sector_dir.join("lei_to_figi.jsonl"); + append_lei_to_figi_jsonl(&path, lei, &figis).await?; + } + + Ok(()) +} + +async fn append_lei_to_figi_jsonl(path: &Path, lei: &str, figis: &[FigiData]) -> anyhow::Result<()> { + let entry = json!({ + "lei": lei, + "figis": figis, + }); + + let line = serde_json::to_string(&entry)? + "\n"; + + let mut file = tokio_fs::OpenOptions::new() + .create(true) + .append(true) + .open(path) + .await?; + + file.write_all(line.as_bytes()).await?; + Ok(()) +} diff --git a/src/corporate/yahoo_company_extraction.rs b/src/corporate/yahoo_company_extraction.rs index 0778c30..0e56e79 100644 --- a/src/corporate/yahoo_company_extraction.rs +++ b/src/corporate/yahoo_company_extraction.rs @@ -68,7 +68,7 @@ pub async fn scrape_company_details_by_isin( pool: &Arc, isin: &str, shutdown_flag: &Arc, -) -> anyhow::Result> { +) -> anyhow::Result> { // Check shutdown before starting if shutdown_flag.load(Ordering::SeqCst) { logger::log_warn(&format!("Shutdown detected, skipping ISIN: {}", isin)).await; @@ -167,7 +167,7 @@ pub async fn scrape_company_details_by_isin( async fn extract_company_details_validated( client: &Client, isin: &str, -) -> Result> { +) -> Result> { // Double-check URL is still correct before extraction let current_url = client.current_url().await?; if !current_url.as_str().contains(isin) { @@ -202,7 +202,7 @@ async fn extract_company_details_validated( pub async fn extract_company_details( client: &Client, _isin: &str, -) -> Result> { +) -> Result> { // Wait for page to load - look for either the table or the no-data element let wait_result: Result> = timeout( TokioDuration::from_secs(30), @@ -279,7 +279,7 @@ pub async fn extract_company_details( )).await; } - Ok(Some(YahooCompanyDetails { + Ok(Some(YahooCompanyData { ticker, sector: extraction.sector, exchange: extraction.exchange, diff --git a/src/lib.rs b/src/lib.rs index 8ea1791..51c85e3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,6 +17,5 @@ pub use scraper::webdriver::{ChromeDriverPool, ChromeInstance, ScrapeTask}; pub use util::logger; pub use util::macros; pub use scraper::yahoo::{ - YahooClient, YahooClientPool, QuoteSummaryModule, QuoteSummary, ChartData, - OptionData, SearchResult + YahooClient, YahooClientPool, QuoteSummaryModule, QuoteSummary, SearchResult }; diff --git a/src/scraper/hard_reset.rs b/src/scraper/hard_reset.rs index 33bd00c..c1c0c22 100644 --- a/src/scraper/hard_reset.rs +++ b/src/scraper/hard_reset.rs @@ -50,7 +50,7 @@ pub async fn perform_hard_reset( monitoring: &Option, shutdown_flag: &Arc, ) -> anyhow::Result<()> { - let number_proxy_instances = config.proxy_instances_per_certificate.unwrap_or(1); + //let number_proxy_instances = config.proxy_instances_per_certificate.unwrap_or(1); logger::log_error("🔴 STARTING HARD RESET SEQUENCE").await; // Check if shutdown was requested diff --git a/src/scraper/openfigi.rs b/src/scraper/openfigi.rs index 0c51c48..9adc2c3 100644 --- a/src/scraper/openfigi.rs +++ b/src/scraper/openfigi.rs @@ -40,7 +40,7 @@ impl OpenFigiClient { Ok(Self { client, has_key }) } - pub async fn map_isins_to_figi_infos(&self, isins: &[String]) -> anyhow::Result> { + pub async fn map_isins_to_figi_infos(&self, isins: &[String]) -> anyhow::Result> { if isins.is_empty() { return Ok(vec![]); } @@ -120,7 +120,7 @@ impl OpenFigiClient { if let Some(data) = result["data"].as_array() { for item in data { if let Some(figi) = item["figi"].as_str() { - all_figi_infos.push(FigiInfo { + all_figi_infos.push(FigiData { isin: isin.clone(), figi: figi.to_string(), name: item["name"].as_str().unwrap_or("").to_string(), @@ -206,7 +206,6 @@ async fn get_figi_security_type(client: &OpenFigiClient, cache_dir: &Path) -> an pub async fn load_figi_type_lists(paths: &DataPaths) -> anyhow::Result<()> { logger::log_info("Loading OpenFIGI mapping value lists...").await; - let state_path = paths.cache_dir().join("state.jsonl"); let cache_openfigi_dir = paths.cache_openfigi_dir(); tokio_fs::create_dir_all(cache_openfigi_dir).await .context("Failed to create data/openfigi directory")?; @@ -220,8 +219,6 @@ pub async fn load_figi_type_lists(paths: &DataPaths) -> anyhow::Result<()> { logger::log_info("OpenFIGI mapping value lists loaded successfully").await; - - Ok(()) } diff --git a/src/scraper/yahoo.rs b/src/scraper/yahoo.rs index 98b7881..42d4204 100644 --- a/src/scraper/yahoo.rs +++ b/src/scraper/yahoo.rs @@ -908,6 +908,7 @@ impl YahooClient { .map(|arr| arr.iter().filter_map(|v| v.as_f64()).collect()) .unwrap_or_default(); + // Collect all option contracts first let mut option_chains = Vec::new(); if let Some(option_array) = option["options"].as_array() { @@ -917,6 +918,7 @@ impl YahooClient { let calls = Self::parse_option_contracts(&opt["calls"]); let puts = Self::parse_option_contracts(&opt["puts"]); + // Build option chains option_chains.push(OptionChain { expiration_date: exp_date, calls, @@ -925,8 +927,15 @@ impl YahooClient { } } + // Generate company_id from symbol + let company_id = format!("option_{}", std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos()); + Ok(OptionData { - symbol: symbol.to_string(), + company_id, + company_name: symbol.to_string(), expiration_dates, strikes, option: option_chains,