moved structs to types.rs

2026-01-12 18:50:44 +01:00
parent c0c9bc0ed9
commit 29d8f1d89e
15 changed files with 120 additions and 349 deletions

View File

@@ -2,24 +2,24 @@ digraph Dependencies {
 rankdir=LR;
 node [shape=box];
-"yahoo_options_enrichment_complete" [label="yahoo_options_enrichment_complete
-Options data enriched for all companies"];
-"lei_figi_mapping_complete" [label="lei_figi_mapping_complete
-LEI-to-FIGI mappings from OpenFIGI API"];
-"yahoo_chart_enrichment_complete" [label="yahoo_chart_enrichment_complete
-Chart data enriched for all companies"];
-"enrichment_group" [label="enrichment_group
-Yahoo exchanges collected and validated"];
-"securities_data_complete" [label="securities_data_complete
-Securities data built from FIGI mappings"];
-"yahoo_companies_cleansed" [label="yahoo_companies_cleansed
-Company data cleansed and validated"];
 "yahoo_events_enrichment_complete" [label="yahoo_events_enrichment_complete
 Corporate events enriched for all companies"];
+"yahoo_companies_cleansed" [label="yahoo_companies_cleansed
+Company data cleansed and validated"];
+"yahoo_options_enrichment_complete" [label="yahoo_options_enrichment_complete
+Options data enriched for all companies"];
+"securities_data_complete" [label="securities_data_complete
+Securities data built from FIGI mappings"];
+"enrichment_group" [label="enrichment_group
+Yahoo exchanges collected and validated"];
+"yahoo_chart_enrichment_complete" [label="yahoo_chart_enrichment_complete
+Chart data enriched for all companies"];
+"lei_figi_mapping_complete" [label="lei_figi_mapping_complete
+LEI-to-FIGI mappings from OpenFIGI API"];
-"yahoo_options_enrichment_complete" -> "yahoo_companies_cleansed" [label="via group enrichment_group"];
-"yahoo_chart_enrichment_complete" -> "yahoo_companies_cleansed" [label="via group enrichment_group"];
-"securities_data_complete" -> "lei_figi_mapping_complete";
-"yahoo_companies_cleansed" -> "securities_data_complete";
 "yahoo_events_enrichment_complete" -> "yahoo_companies_cleansed" [label="via group enrichment_group"];
+"yahoo_companies_cleansed" -> "securities_data_complete";
+"yahoo_options_enrichment_complete" -> "yahoo_companies_cleansed" [label="via group enrichment_group"];
+"securities_data_complete" -> "lei_figi_mapping_complete";
+"yahoo_chart_enrichment_complete" -> "yahoo_companies_cleansed" [label="via group enrichment_group"];
 }
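Note: this hunk is a pure reordering (the same seven nodes and five edges appear on both sides), which is what HashMap iteration-order churn looks like in generated output. A minimal sketch, with hypothetical names rather than the project's actual generator, of routing emission through a BTreeMap so the .dot output stays stable across runs:

use std::collections::{BTreeMap, HashMap};

// Hypothetical generator sketch: sorting node names via BTreeMap makes the
// emitted .dot deterministic, so regenerating it stops producing noisy diffs.
fn render_dot(nodes: &HashMap<String, String>, edges: &[(String, String)]) -> String {
    let mut out = String::from("digraph Dependencies {\nrankdir=LR;\nnode [shape=box];\n");
    for (name, desc) in nodes.iter().collect::<BTreeMap<_, _>>() {
        out.push_str(&format!("\"{name}\" [label=\"{name}\n{desc}\"];\n"));
    }
    let mut sorted_edges: Vec<_> = edges.iter().collect();
    sorted_edges.sort();
    for (from, to) in sorted_edges {
        out.push_str(&format!("\"{from}\" -> \"{to}\";\n"));
    }
    out.push_str("}\n");
    out
}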

View File

@@ -1,195 +0,0 @@
// src/corporate/aggregation.rs
use super::types::CompanyPrice;
use super::storage::*;
use crate::util::directories::DataPaths;
use tokio::fs;
use std::collections::HashMap;

#[derive(Debug)]
struct DayData {
    sources: Vec<(CompanyPrice, String)>, // (price, source_ticker)
    total_volume: u64,
    vwap: f64,
    open: f64,
    high: f64,
    low: f64,
    close: f64,
}

/// Aggregate price data from multiple exchanges, converting all to USD
pub async fn aggregate_best_price_data(paths: &DataPaths, lei: &str) -> anyhow::Result<()> {
    let company_dir = get_company_dir(paths, lei);
    for timeframe in ["daily", "5min"].iter() {
        let source_dir = company_dir.join(timeframe);
        if !source_dir.exists() {
            continue;
        }
        let mut all_prices: Vec<(CompanyPrice, String)> = Vec::new();
        let mut by_date_time: HashMap<String, DayData> = HashMap::new();
        // Load all sources with their ticker names
        let mut entries = tokio::fs::read_dir(&source_dir).await?;
        let mut source_count = 0;
        let mut sources_used = std::collections::HashSet::new();
        while let Some(entry) = entries.next_entry().await? {
            let source_dir_path = entry.path();
            if !source_dir_path.is_dir() { continue; }
            let source_ticker = source_dir_path
                .file_name()
                .and_then(|n| n.to_str())
                .unwrap_or("unknown")
                .to_string();
            let prices_path = source_dir_path.join("prices.json");
            if !prices_path.exists() { continue; }
            let content = tokio::fs::read_to_string(&prices_path).await?;
            let prices: Vec<CompanyPrice> = serde_json::from_str(&content)?;
            if !prices.is_empty() {
                sources_used.insert(source_ticker.clone());
                source_count += 1;
            }
            for price in prices {
                all_prices.push((price, source_ticker.clone()));
            }
        }
        if all_prices.is_empty() {
            continue;
        }
        println!(" Aggregating from {} exchanges: {}",
            sources_used.len(),
            sources_used.iter()
                .map(|s| s.as_str())
                .collect::<Vec<_>>()
                .join(", ")
        );
        // Group by date + time (for 5min) or just date
        for (p, source) in all_prices {
            let key = if timeframe == &"5min" && !p.time.is_empty() {
                format!("{}_{}", p.date, p.time)
            } else {
                p.date.clone()
            };
            // Convert to USD immediately (DUMMY placeholder rate)
            let usd_rate = 0.1;
            let mut p_usd = p.clone();
            p_usd.open *= usd_rate;
            p_usd.high *= usd_rate;
            p_usd.low *= usd_rate;
            p_usd.close *= usd_rate;
            p_usd.adj_close *= usd_rate;
            p_usd.currency = "USD".to_string();
            let entry = by_date_time.entry(key.clone()).or_insert(DayData {
                sources: vec![],
                total_volume: 0,
                vwap: 0.0,
                open: p_usd.open,
                high: p_usd.high,
                low: p_usd.low,
                close: p_usd.close,
            });
            let volume = p.volume.max(1); // avoid div0
            let vwap_contrib = p_usd.close * volume as f64;
            entry.sources.push((p_usd.clone(), source));
            entry.total_volume += volume;
            entry.vwap += vwap_contrib;
            // Use first open, last close, max high, min low
            if entry.sources.len() == 1 {
                entry.open = p_usd.open;
            }
            entry.close = p_usd.close;
            entry.high = entry.high.max(p_usd.high);
            entry.low = entry.low.min(p_usd.low);
        }
        // Finalize aggregated data
        let mut aggregated: Vec<CompanyPrice> = Vec::new();
        for (key, data) in by_date_time {
            let vwap = data.vwap / data.total_volume as f64;
            let (date, time) = if key.contains('_') {
                let parts: Vec<&str> = key.split('_').collect();
                (parts[0].to_string(), parts[1].to_string())
            } else {
                (key, "".to_string())
            };
            // Track which exchange contributed most volume
            let best_source = data.sources.iter()
                .max_by_key(|(p, _)| p.volume)
                .map(|(_, src)| src.clone())
                .unwrap_or_else(|| "unknown".to_string());
            aggregated.push(CompanyPrice {
                ticker: format!("{lei}@agg"), // Mark as aggregated
                date,
                time,
                open: data.open,
                high: data.high,
                low: data.low,
                close: data.close,
                adj_close: vwap,
                volume: data.total_volume,
                currency: "USD".to_string(),
            });
        }
        aggregated.sort_by_key(|p| (p.date.clone(), p.time.clone()));
        // Save aggregated result
        let agg_dir = company_dir.join("aggregated").join(timeframe);
        fs::create_dir_all(&agg_dir).await?;
        let path = agg_dir.join("prices.json");
        fs::write(&path, serde_json::to_string_pretty(&aggregated)?).await?;
        // Save aggregation metadata
        let meta = AggregationMetadata {
            lei: lei.to_string(), // ← CHANGE THIS
            timeframe: timeframe.to_string(),
            sources: sources_used.into_iter().collect(),
            total_bars: aggregated.len(),
            date_range: (
                aggregated.first().map(|p| p.date.clone()).unwrap_or_default(),
                aggregated.last().map(|p| p.date.clone()).unwrap_or_default(),
            ),
            aggregated_at: chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string(),
        };
        let meta_path = agg_dir.join("metadata.json");
        fs::write(&meta_path, serde_json::to_string_pretty(&meta)?).await?;
        println!("{} {} bars from {} sources (USD)",
            aggregated.len(),
            timeframe,
            source_count
        );
    }
    Ok(())
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct AggregationMetadata {
    lei: String,
    timeframe: String,
    sources: Vec<String>,
    total_bars: usize,
    date_range: (String, String),
    aggregated_at: String,
}
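The removed module's core calculation is a cross-exchange VWAP: each bar contributes close times volume, and the running sum is divided by total volume at finalization. A self-contained sketch of just that step, with sample numbers:

// VWAP as the removed aggregate_best_price_data accumulated it:
// sum(close * volume) / sum(volume), with volume clamped to 1 to avoid
// dividing by zero.
fn vwap(bars: &[(f64, u64)]) -> f64 {
    let total_volume: u64 = bars.iter().map(|&(_, v)| v.max(1)).sum();
    let weighted: f64 = bars.iter().map(|&(c, v)| c * v.max(1) as f64).sum();
    weighted / total_volume as f64
}

fn main() {
    // Two exchanges report the same day: 100 shares @ 10.0 and 300 @ 11.0.
    // (100 * 10.0 + 300 * 11.0) / 400 = 10.75
    let bars = [(10.0, 100u64), (11.0, 300u64)];
    assert!((vwap(&bars) - 10.75).abs() < 1e-9);
}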

View File

@@ -4,7 +4,7 @@
 //! This module extracts common patterns used across multiple update modules
 //! to reduce code duplication and improve maintainability.
-use super::types::CompanyCrossPlatformInfo;
+use super::types::CompanyCrossPlatformData;
 use crate::util::logger;
 use std::collections::HashMap;
 use std::path::{Path};
@@ -22,7 +22,7 @@ pub async fn load_checkpoint_with_log<P1, P2>(
     checkpoint_path: P1,
     log_path: P2,
     checkpoint_desc: &str,
-) -> Result<HashMap<String, CompanyCrossPlatformInfo>>
+) -> Result<HashMap<String, CompanyCrossPlatformData>>
 where
     P1: AsRef<Path>,
     P2: AsRef<Path>,
@@ -30,7 +30,7 @@ where
     let checkpoint_path = checkpoint_path.as_ref();
     let log_path = log_path.as_ref();
-    let mut companies: HashMap<String, CompanyCrossPlatformInfo> = HashMap::new();
+    let mut companies: HashMap<String, CompanyCrossPlatformData> = HashMap::new();
     // Load checkpoint if it exists
     if checkpoint_path.exists() {
@@ -42,7 +42,7 @@ where
             continue; // Skip incomplete lines
         }
-        match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
+        match serde_json::from_str::<CompanyCrossPlatformData>(line) {
             Ok(company) => {
                 companies.insert(company.name.clone(), company);
             }
@@ -65,7 +65,7 @@ where
             continue; // Skip incomplete lines
         }
-        match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
+        match serde_json::from_str::<CompanyCrossPlatformData>(line) {
             Ok(company) => {
                 companies.insert(company.name.clone(), company);
                 replayed += 1;
@@ -91,7 +91,7 @@ where
 pub async fn consolidate_checkpoint<P1, P2>(
     checkpoint_path: P1,
     log_path: P2,
-    companies: &HashMap<String, CompanyCrossPlatformInfo>,
+    companies: &HashMap<String, CompanyCrossPlatformData>,
 ) -> Result<()>
 where
     P1: AsRef<Path>,
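Aside from the rename, this file's recovery scheme is worth spelling out: a consolidated checkpoint is loaded first, then the append-only JSONL log is replayed over it, so entries written after the last checkpoint win. A condensed, synchronous sketch of that pattern with a stand-in record type:

use std::collections::HashMap;
use std::path::Path;

#[derive(serde::Serialize, serde::Deserialize)]
struct Record {
    name: String,
}

// Sketch of the checkpoint + log replay in load_checkpoint_with_log:
// the log is read second, so its entries overwrite checkpointed ones.
fn load(checkpoint: &Path, log: &Path) -> anyhow::Result<HashMap<String, Record>> {
    let mut records = HashMap::new();
    for path in [checkpoint, log] {
        if !path.exists() {
            continue;
        }
        for line in std::fs::read_to_string(path)?.lines() {
            if line.trim().is_empty() {
                continue; // skip incomplete lines, as the real helper does
            }
            if let Ok(rec) = serde_json::from_str::<Record>(line) {
                records.insert(rec.name.clone(), rec);
            }
        }
    }
    Ok(records)
}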

View File

@@ -2,7 +2,7 @@
 use crate::util::directories::DataPaths;
 use crate::util::integrity::{DataStage, StateManager, file_reference};
 use crate::util::logger;
-use crate::scraper::yahoo::ChartData;
+use crate::corporate::types::*;
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;

View File

@@ -4,18 +4,18 @@ use chrono::{Local, NaiveDate};
 use rand::rngs::StdRng;
 use rand::prelude::{Rng, SeedableRng, IndexedRandom};
-pub fn event_key(e: &CompanyEvent) -> String {
+pub fn event_key(e: &CompanyEventData) -> String {
     format!("{}|{}|{}", e.ticker, e.date, e.time)
 }
-pub fn detect_changes(old: &CompanyEvent, new: &CompanyEvent, today: &str) -> Vec<CompanyEventChange> {
+pub fn detect_changes(old: &CompanyEventData, new: &CompanyEventData, today: &str) -> Vec<CompanyEventChangeData> {
     let mut changes = Vec::new();
     let ts = Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
     if new.date.as_str() <= today { return changes; }
     if old.time != new.time {
-        changes.push(CompanyEventChange {
+        changes.push(CompanyEventChangeData {
             ticker: new.ticker.clone(),
             date: new.date.clone(),
             field_changed: "time".to_string(),
@@ -26,7 +26,7 @@ pub fn detect_changes(old: &CompanyEvent, new: &CompanyEvent, today: &str) -> Ve
     }
     if old.eps_forecast != new.eps_forecast {
-        changes.push(CompanyEventChange {
+        changes.push(CompanyEventChangeData {
             ticker: new.ticker.clone(),
             date: new.date.clone(),
             field_changed: "eps_forecast".to_string(),
@@ -37,7 +37,7 @@ pub fn detect_changes(old: &CompanyEvent, new: &CompanyEvent, today: &str) -> Ve
     }
     if old.eps_actual != new.eps_actual {
-        changes.push(CompanyEventChange {
+        changes.push(CompanyEventChangeData {
             ticker: new.ticker.clone(),
             date: new.date.clone(),
             field_changed: "eps_actual".to_string(),
@@ -52,14 +52,6 @@ pub fn detect_changes(old: &CompanyEvent, new: &CompanyEvent, today: &str) -> Ve
     changes
 }
-pub fn price_key(p: &CompanyPrice) -> String {
-    if p.time.is_empty() {
-        format!("{}|{}", p.ticker, p.date)
-    } else {
-        format!("{}|{}|{}", p.ticker, p.date, p.time)
-    }
-}
 pub fn parse_float(s: &str) -> Option<f64> {
     s.replace("--", "").replace(",", "").parse::<f64>().ok()
 }
@@ -83,7 +75,7 @@ pub fn choose_random<T: Clone>(items: &[T]) -> T {
 }
 /// Extract first valid Yahoo ticker from company
-pub fn extract_first_yahoo_ticker(company: &CompanyCrossPlatformInfo) -> Option<String> {
+pub fn extract_first_yahoo_ticker(company: &CompanyCrossPlatformData) -> Option<String> {
     for tickers in company.isin_tickers_map.values() {
         for ticker in tickers {
             if ticker.starts_with("YAHOO:")
@@ -113,7 +105,7 @@ pub fn sanitize_company_name(name: &str) -> String {
 /// Load companies from JSONL file
 pub async fn load_companies_from_jsonl(
     path: &std::path::Path
-) -> anyhow::Result<Vec<CompanyCrossPlatformInfo>> {
+) -> anyhow::Result<Vec<CompanyCrossPlatformData>> {
     let content = tokio::fs::read_to_string(path).await?;
     let mut companies = Vec::new();
@@ -121,7 +113,7 @@ pub async fn load_companies_from_jsonl(
         if line.trim().is_empty() {
             continue;
         }
-        if let Ok(company) = serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
+        if let Ok(company) = serde_json::from_str::<CompanyCrossPlatformData>(line) {
             companies.push(company);
         }
     }
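Unchanged here but easy to misread: parse_float strips thousands separators and collapses the "--" placeholder to an empty string, which fails to parse and therefore maps to None. Its behavior at a glance:

fn parse_float(s: &str) -> Option<f64> {
    s.replace("--", "").replace(",", "").parse::<f64>().ok()
}

fn main() {
    assert_eq!(parse_float("1,234.56"), Some(1234.56)); // separators stripped
    assert_eq!(parse_float("--"), None);                // placeholder -> None
    assert_eq!(parse_float("abc"), None);               // junk -> None
}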

View File

@@ -3,7 +3,6 @@ pub mod types;
 pub mod scraper;
 pub mod storage;
 pub mod helpers;
-pub mod aggregation;
 pub mod update_openfigi;
 pub mod yahoo_company_extraction;
 pub mod page_validation;

View File

@@ -35,7 +35,7 @@ pub async fn build_event_index(paths: &DataPaths) -> anyhow::Result<Vec<EventInd
     let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
     if name.starts_with("events_") && name.len() == 17 {
         let content = fs::read_to_string(&path).await?;
-        let events: Vec<CompanyEvent> = serde_json::from_str(&content)?;
+        let events: Vec<CompanyEventData> = serde_json::from_str(&content)?;
         for event in events {
             index.push(EventIndex {

View File

@@ -3,7 +3,7 @@ use std::collections::HashMap;
 use serde::{Deserialize, Serialize};
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
-pub struct CompanyEvent {
+pub struct CompanyEventData {
     pub ticker: String,
     pub date: String, // YYYY-MM-DD
     pub time: String, // "AMC", "BMO", "TAS", or ""
@@ -17,21 +17,7 @@ pub struct CompanyEvent {
 }
 #[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct CompanyPrice {
-    pub ticker: String,
-    pub date: String, // YYYY-MM-DD
-    pub time: String, // HH:MM:SS for intraday, "" for daily
-    pub open: f64,
-    pub high: f64,
-    pub low: f64,
-    pub close: f64,
-    pub adj_close: f64,
-    pub volume: u64,
-    pub currency: String,
-}
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct CompanyEventChange {
+pub struct CompanyEventChangeData {
     pub ticker: String,
     pub date: String,
     pub field_changed: String, // "time", "eps_forecast", "eps_actual", "new_event"
@@ -40,6 +26,24 @@ pub struct CompanyEventChange {
     pub detected_at: String,
 }
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ChartData {
+    pub symbol: String,
+    pub quotes: Vec<Quote>,
+    pub timestamp: i64,
+}
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Quote {
+    pub timestamp: i64,
+    pub open: Option<f64>,
+    pub high: Option<f64>,
+    pub low: Option<f64>,
+    pub close: Option<f64>,
+    pub volume: Option<u64>,
+    pub adjusted_close: Option<f64>,
+}
 /// Figi Info based on API calls [https://www.openfigi.com/]
 /// # Attributes
 /// isin: ISIN belonging to this legal entity from lei
@@ -87,7 +91,7 @@ pub struct YahooCompanyDetails {
 }
 #[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct CompanyCrossPlatformInfo {
+pub struct CompanyCrossPlatformData {
     pub name: String,
     pub isin_tickers_map: HashMap<String, Vec<String>>, // ISIN -> Tickers
     pub sector: Option<String>,
@@ -109,18 +113,32 @@ pub struct WarrantInfo {
     pub warrants: HashMap<String, Vec<FigiInfo>>, // ISIN -> Vec<FigiInfo> (grouped by ISIN)
 }
-/// Option Info
-///
-/// Information for Option securities fetched out of Name in FigiInfo
-/// example1: "name": "December 25 Calls on ALPHA GA",
-/// issued by NULL Call Option for underlying company ALPHA GA
-/// other formats like only on company instead of two, underlying and issuing company are the same, leave issuer_company_name NULL
+/// Options Info replaced by OptionData
 #[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct OptionInfo {
-    pub underlying_company_name: String, // key in CompanyInfo, key for OptionInfo
-    pub issuer_company_name: Option<String>, // key in CompanyInfo
-    pub option_type: String, // "put" or "call"
-    pub options: HashMap<String, Vec<FigiInfo>>, // ISIN -> Vec<FigiInfo> (grouped by ISIN)
+pub struct OptionData {
+    pub symbol: String,
+    pub expiration_dates: Vec<i64>,
+    pub strikes: Vec<f64>,
+    pub option: Vec<OptionChain>,
+    pub timestamp: i64,
+}
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct OptionChain {
+    pub expiration_date: i64,
+    pub calls: Vec<OptionContract>,
+    pub puts: Vec<OptionContract>,
+}
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct OptionContract {
+    pub strike: f64,
+    pub last_price: Option<f64>,
+    pub bid: Option<f64>,
+    pub ask: Option<f64>,
+    pub volume: Option<u64>,
+    pub open_interest: Option<u64>,
+    pub implied_volatility: Option<f64>,
 }
 /// Bond parsed details from ticker/description
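The Yahoo-shaped option types now live in corporate::types next to the rest of the domain structs. A small construction sketch using the fields defined above (sample values hypothetical):

use crate::corporate::types::{OptionChain, OptionContract, OptionData};

// Hypothetical sample: one expiry holding a single call and no puts.
fn sample_option_data() -> OptionData {
    let call = OptionContract {
        strike: 100.0,
        last_price: Some(4.2),
        bid: Some(4.1),
        ask: Some(4.3),
        volume: Some(1_500),
        open_interest: Some(12_000),
        implied_volatility: Some(0.35),
    };
    OptionData {
        symbol: "ACME".to_string(), // placeholder ticker
        expiration_dates: vec![1_767_139_200],
        strikes: vec![100.0],
        option: vec![OptionChain {
            expiration_date: 1_767_139_200,
            calls: vec![call],
            puts: vec![],
        }],
        timestamp: 1_736_704_244,
    }
}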

View File

@@ -20,14 +20,14 @@ use anyhow::{anyhow, Result};
 /// Represents a write command to be serialized through the log writer
 enum LogCommand {
-    Write(CompanyCrossPlatformInfo),
+    Write(CompanyCrossPlatformData),
     Checkpoint,
     Shutdown,
 }
 /// Result from processing a single company
 struct CompanyProcessResult {
-    company: CompanyCrossPlatformInfo,
+    company: CompanyCrossPlatformData,
     is_update: bool,
 }
@@ -36,7 +36,7 @@ struct CompanyProcessResult {
 fn company_needs_processing(
     company_name: &str,
     company_info: &CompanyInfo,
-    existing_companies: &HashMap<String, CompanyCrossPlatformInfo>,
+    existing_companies: &HashMap<String, CompanyCrossPlatformData>,
 ) -> bool {
     // If company not in existing data at all, definitely needs processing
     let Some(existing_entry) = existing_companies.get(company_name) else {
@@ -732,7 +732,7 @@ async fn scrape_with_retry(
 async fn process_single_company_validated(
     name: String,
     company_info: CompanyInfo,
-    existing_entry: Option<CompanyCrossPlatformInfo>,
+    existing_entry: Option<CompanyCrossPlatformData>,
     pool: &Arc<ChromeDriverPool>,
     shutdown_flag: &Arc<AtomicBool>,
 ) -> anyhow::Result<Option<CompanyProcessResult>> {
@@ -867,7 +867,7 @@ async fn process_single_company_validated(
     }
     if !isin_tickers_map.is_empty() {
-        let company_entry = CompanyCrossPlatformInfo {
+        let company_entry = CompanyCrossPlatformData {
             name: name.clone(),
             isin_tickers_map,
             sector,

View File

@@ -20,15 +20,15 @@ use tokio::sync::mpsc;
 /// Result of processing a single company
 #[derive(Debug, Clone)]
 pub enum CompanyProcessResult {
-    Valid(CompanyCrossPlatformInfo),
+    Valid(CompanyCrossPlatformData),
     FilteredLowCap { name: String, market_cap: f64 },
     FilteredNoPrice { name: String },
-    Failed { company: CompanyCrossPlatformInfo, error: String, is_transient: bool },
+    Failed { company: CompanyCrossPlatformData, error: String, is_transient: bool },
 }
 /// Represents a write command to be serialized through the log writer
 enum LogCommand {
-    Write(CompanyCrossPlatformInfo),
+    Write(CompanyCrossPlatformData),
     Checkpoint,
     Shutdown,
 }
@@ -81,7 +81,7 @@ pub async fn companies_yahoo_cleansed_no_data(paths: &DataPaths) -> Result<usize
     total_count += 1;
-    let company: CompanyCrossPlatformInfo = match serde_json::from_str(&line) {
+    let company: CompanyCrossPlatformData = match serde_json::from_str(&line) {
         Ok(c) => c,
         Err(e) => {
             logger::log_warn(&format!(" Failed to parse company on line {}: {}", total_count, e)).await;
@@ -194,7 +194,7 @@ pub async fn companies_yahoo_cleansed_low_profile(
     logger::log_info(" Cleansing companies with low Yahoo profile...").await;
     // === RECOVERY PHASE: Load checkpoint + replay log ===
-    let mut existing_companies: HashMap<String, CompanyCrossPlatformInfo> = HashMap::new();
+    let mut existing_companies: HashMap<String, CompanyCrossPlatformData> = HashMap::new();
     let mut processed_names: std::collections::HashSet<String> = std::collections::HashSet::new();
     if checkpoint_path.exists() {
@@ -206,7 +206,7 @@ pub async fn companies_yahoo_cleansed_low_profile(
             continue; // Skip incomplete lines
         }
-        match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
+        match serde_json::from_str::<CompanyCrossPlatformData>(line) {
             Ok(company) => {
                 processed_names.insert(company.name.clone());
                 existing_companies.insert(company.name.clone(), company);
@@ -229,7 +229,7 @@ pub async fn companies_yahoo_cleansed_low_profile(
             continue; // Skip incomplete lines
         }
-        match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
+        match serde_json::from_str::<CompanyCrossPlatformData>(line) {
             Ok(company) => {
                 processed_names.insert(company.name.clone());
                 existing_companies.insert(company.name.clone(), company);
@@ -251,7 +251,7 @@ pub async fn companies_yahoo_cleansed_low_profile(
     logger::log_info(&format!("Loaded {} companies from input", input_companies.len())).await;
     // === BUILD PENDING LIST (smart skip logic) ===
-    let mut pending: Vec<CompanyCrossPlatformInfo> = input_companies
+    let mut pending: Vec<CompanyCrossPlatformData> = input_companies
         .into_iter()
         .filter(|company| company_needs_processing(company, &existing_companies))
         .collect();
@@ -608,7 +608,7 @@ pub async fn companies_yahoo_cleansed_low_profile(
 /// Helper function to spawn a validation task (reduces code duplication)
 fn spawn_validation_task(
-    company: CompanyCrossPlatformInfo,
+    company: CompanyCrossPlatformData,
     yahoo_pool: &Arc<YahooClientPool>,
     paths: &Arc<DataPaths>,
     write_tx: &mpsc::Sender<LogCommand>,
@@ -688,7 +688,7 @@ fn spawn_validation_task(
 /// Process a single company with full error categorization
 async fn process_company_with_validation(
-    company: &CompanyCrossPlatformInfo,
+    company: &CompanyCrossPlatformData,
     yahoo_pool: &Arc<YahooClientPool>,
     paths: &DataPaths,
 ) -> CompanyProcessResult {
@@ -897,8 +897,8 @@ async fn save_company_core_data(
 /// Check if a company needs processing (validation check)
 fn company_needs_processing(
-    company: &CompanyCrossPlatformInfo,
-    existing_companies: &HashMap<String, CompanyCrossPlatformInfo>,
+    company: &CompanyCrossPlatformData,
+    existing_companies: &HashMap<String, CompanyCrossPlatformData>,
 ) -> bool {
     // If company exists in cleaned output, skip it
     !existing_companies.contains_key(&company.name)
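The LogCommand enum above drives a single-writer pattern: every worker sends Write, Checkpoint, or Shutdown over one mpsc channel, and exactly one task owns the file handle, so concurrent senders can never interleave partial JSONL lines. A simplified sketch with a stand-in record type and file name:

use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc;

#[derive(serde::Serialize)]
struct Record {
    name: String,
}

// Commands mirror the LogCommand enum above; Record and the file name
// are stand-ins for the real types and paths.
enum LogCommand {
    Write(Record),
    Checkpoint,
    Shutdown,
}

async fn run_log_writer(mut rx: mpsc::Receiver<LogCommand>) -> anyhow::Result<()> {
    let mut log = tokio::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open("cleansed.log.jsonl")
        .await?;
    while let Some(cmd) = rx.recv().await {
        match cmd {
            LogCommand::Write(rec) => {
                let mut line = serde_json::to_string(&rec)?;
                line.push('\n');
                log.write_all(line.as_bytes()).await?; // one record per line
            }
            LogCommand::Checkpoint => log.sync_data().await?, // fsync at checkpoints
            LogCommand::Shutdown => break,
        }
    }
    log.sync_all().await?;
    Ok(())
}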

View File

@@ -29,7 +29,7 @@ enum LogCommand {
 /// Type alias for enrichment function
 type EnrichmentFn = Arc<
-    dyn Fn(CompanyCrossPlatformInfo, Arc<YahooClientPool>, DataPaths)
+    dyn Fn(CompanyCrossPlatformData, Arc<YahooClientPool>, DataPaths)
         -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>
         + Send
         + Sync
@@ -104,7 +104,7 @@ pub async fn enrich_companies_with_events(
     logger::log_info(&format!("Found {} companies to process", total_companies)).await;
     // Filter companies that need enrichment
-    let pending_companies: Vec<CompanyCrossPlatformInfo> = companies
+    let pending_companies: Vec<CompanyCrossPlatformData> = companies
         .into_iter()
         .filter(|company| !enriched_companies.contains(&company.name))
         .collect();
@@ -140,7 +140,7 @@ pub async fn enrich_companies_with_events(
     let failed_count = Arc::new(AtomicUsize::new(0));
     // Log writer channel with batching and fsync
-    let (log_tx, mut log_rx) = mpsc::channel::<LogCommand>(1000);
+    let (log_tx, log_rx) = mpsc::channel::<LogCommand>(1000);
     // Spawn log writer task
     let log_writer_handle = spawn_log_writer(
@@ -283,7 +283,7 @@ async fn track_events_completion(
 /// Enrich a single company with event data
 async fn enrich_company_with_events(
-    company: &CompanyCrossPlatformInfo,
+    company: &CompanyCrossPlatformData,
     yahoo_pool: &Arc<YahooClientPool>,
     paths: &DataPaths,
 ) -> anyhow::Result<()> {
@@ -438,7 +438,7 @@ pub async fn enrich_companies_with_option(
     logger::log_info(&format!("Found {} companies to process", total_companies)).await;
     // Filter companies that need enrichment
-    let pending_companies: Vec<CompanyCrossPlatformInfo> = companies
+    let pending_companies: Vec<CompanyCrossPlatformData> = companies
         .into_iter()
         .filter(|company| !enriched_companies.contains(&company.name))
         .collect();
@@ -474,7 +474,7 @@ pub async fn enrich_companies_with_option(
     let failed_count = Arc::new(AtomicUsize::new(0));
     // Log writer channel with batching and fsync
-    let (log_tx, mut log_rx) = mpsc::channel::<LogCommand>(1000);
+    let (log_tx, log_rx) = mpsc::channel::<LogCommand>(1000);
     // Spawn log writer task
     let log_writer_handle = spawn_log_writer(
@@ -605,7 +605,7 @@ async fn track_option_completion(
 /// Enrich a single company with option data
 async fn enrich_company_with_option(
-    company: &CompanyCrossPlatformInfo,
+    company: &CompanyCrossPlatformData,
     yahoo_pool: &Arc<YahooClientPool>,
     paths: &DataPaths,
 ) -> anyhow::Result<()> {
@@ -697,7 +697,7 @@ pub async fn enrich_companies_with_chart(
     logger::log_info(&format!("Found {} companies to process", total_companies)).await;
     // Filter companies that need enrichment
-    let pending_companies: Vec<CompanyCrossPlatformInfo> = companies
+    let pending_companies: Vec<CompanyCrossPlatformData> = companies
         .into_iter()
         .filter(|company| !enriched_companies.contains(&company.name))
         .collect();
@@ -733,7 +733,7 @@ pub async fn enrich_companies_with_chart(
     let failed_count = Arc::new(AtomicUsize::new(0));
     // Log writer channel with batching and fsync
-    let (log_tx, mut log_rx) = mpsc::channel::<LogCommand>(1000);
+    let (log_tx, log_rx) = mpsc::channel::<LogCommand>(1000);
     // Spawn log writer task
     let log_writer_handle = spawn_log_writer(
@@ -864,7 +864,7 @@ async fn track_chart_completion(
 /// Enrich a single company with chart data
 async fn enrich_company_with_chart(
-    company: &CompanyCrossPlatformInfo,
+    company: &CompanyCrossPlatformData,
     yahoo_pool: &Arc<YahooClientPool>,
     paths: &DataPaths,
 ) -> anyhow::Result<()> {
@@ -1005,7 +1005,7 @@ fn spawn_log_writer(
 /// - `shutdown_flag`: Flag to signal shutdown
 /// - `enrichment_fn`: The specific enrichment function to call (events, option, chart, etc.)
 fn spawn_enrichment_task(
-    company: CompanyCrossPlatformInfo,
+    company: CompanyCrossPlatformData,
     yahoo_pool: Arc<YahooClientPool>,
     paths: DataPaths,
     processed_count: Arc<AtomicUsize>,
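The EnrichmentFn alias in the first hunk is the usual object-safe wrapper for async callbacks: a Fn returning a pinned, boxed, Send future. A minimal sketch, with stand-in types, of adapting an async fn to it:

use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

// Stand-in types; the real alias uses CompanyCrossPlatformData,
// YahooClientPool, and DataPaths.
struct Company;
struct Pool;
struct Paths;

type EnrichmentFn = Arc<
    dyn Fn(Company, Arc<Pool>, Paths) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>
        + Send
        + Sync,
>;

async fn enrich_with_events(_c: Company, _p: Arc<Pool>, _d: Paths) -> anyhow::Result<()> {
    Ok(())
}

// Adapt the async fn: call it, then pin-box the future it returns. The
// explicit return annotation makes the closure yield the boxed trait object.
fn as_enrichment_fn() -> EnrichmentFn {
    Arc::new(|c, p, d| -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
        Box::pin(enrich_with_events(c, p, d))
    })
}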

View File

@@ -517,7 +517,7 @@ async fn process_lei_figi_file_batched(
     let mut common_batch: Vec<CompanyInfo> = Vec::new();
     let mut warrants_batch: Vec<WarrantInfo> = Vec::new();
-    let mut options_batch: Vec<OptionInfo> = Vec::new();
+    let mut options_batch: Vec<OptionData> = Vec::new();
     let mut corporate_bonds_batch: Vec<CorporateBondInfo> = Vec::new();
     let mut government_bonds_batch: Vec<GovernmentBondInfo> = Vec::new();
@@ -538,7 +538,7 @@ async fn process_lei_figi_file_batched(
     // Group by security type
     let (common_stocks, warrant_securities, option_securities, corporate_bonds_securities, government_bonds_securities) =
-        group_by_security_type(&figis);
+        group_securities(&figis);
     // Collect entries for batching and update existing keys
     if !common_stocks.is_empty() {
@@ -738,7 +738,7 @@ fn prepare_warrant_entries(
 fn prepare_option_entries(
     option_securities: &[FigiInfo],
     existing_keys: &HashSet<String>,
-) -> Vec<OptionInfo> {
+) -> Vec<OptionData> {
     let mut entries = Vec::new();
     for figi in option_securities {
@@ -753,7 +753,7 @@ fn prepare_option_entries(
         continue;
     }
-    let option_info = OptionInfo {
+    let option_info = OptionData {
         underlying_company_name: underlying.clone(),
         issuer_company_name: issuer,
         option_type: option_type.clone(),
@@ -898,7 +898,7 @@ fn prepare_government_bond_entries(
 }
 /// Groups FigiInfo list by security type
-fn group_by_security_type(figis: &[FigiInfo]) -> (Vec<FigiInfo>, Vec<FigiInfo>, Vec<FigiInfo>, Vec<FigiInfo>, Vec<FigiInfo>) {
+fn group_securities(figis: &[FigiInfo]) -> (Vec<FigiInfo>, Vec<FigiInfo>, Vec<FigiInfo>, Vec<FigiInfo>, Vec<FigiInfo>) {
     let mut common_stocks: Vec<FigiInfo> = Vec::new();
     let mut warrants: Vec<FigiInfo> = Vec::new();
     let mut options: Vec<FigiInfo> = Vec::new();

View File

@@ -303,7 +303,7 @@ pub async fn get_all_tickers_from_companies_jsonl(paths: &DataPaths) -> anyhow::
     let content = tokio::fs::read_to_string(companies_file).await?;
     let mut tickers = Vec::new();
     for line in content.lines() {
-        let company: CompanyCrossPlatformInfo = serde_json::from_str(line)?;
+        let company: CompanyCrossPlatformData = serde_json::from_str(line)?;
         for (_isin, ticker_vec) in company.isin_tickers_map {
             tickers.extend(ticker_vec);
         }
@@ -314,7 +314,7 @@ pub async fn get_all_tickers_from_companies_jsonl(paths: &DataPaths) -> anyhow::
 pub async fn fetch_earnings_with_pool(
     pool: &Arc<ChromeDriverPool>,
     ticker: &str,
-) -> anyhow::Result<Vec<CompanyEvent>> {
+) -> anyhow::Result<Vec<CompanyEventData>> {
     let ticker = ticker.to_string();
     let url = format!("https://finance.yahoo.com/calendar/earnings?symbol={}&offset=0&size=100", ticker);
@@ -329,7 +329,7 @@ pub async fn fetch_earnings_with_pool(
     }).await
 }
-pub async fn extract_earnings_events(client: &Client, ticker: &str) -> Result<Vec<CompanyEvent>> {
+pub async fn extract_earnings_events(client: &Client, ticker: &str) -> Result<Vec<CompanyEventData>> {
     // Wait for the table to load
     let table = client
         .wait()
@@ -403,7 +403,7 @@ pub async fn extract_earnings_events(client: &Client, ticker: &str) -> Result<Ve
         None
     };
-    events.push(CompanyEvent {
+    events.push(CompanyEventData {
         ticker: ticker.to_string(),
         date,
         time,

View File

@@ -3,7 +3,8 @@ use crate::config::Config;
 use crate::util::directories::DataPaths;
 use crate::util::integrity::{DataStage, StateManager, directory_reference};
 use crate::util::logger;
-use crate::scraper::yahoo::{YahooClientPool, ChartData};
+use crate::scraper::yahoo::{YahooClientPool};
+use crate::corporate::types::*;
 use std::result::Result::Ok;
 use chrono::{TimeZone, Utc};

View File

@@ -2,6 +2,7 @@
 use super::docker_vpn_proxy::DockerVpnProxyPool;
 use crate::config::Config;
 use crate::util::logger;
+use crate::corporate::types::*;
 use anyhow::{Context, Result, anyhow};
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
@@ -191,51 +192,6 @@ pub struct QuoteSummary {
     pub timestamp: i64,
 }
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct ChartData {
-    pub symbol: String,
-    pub quotes: Vec<Quote>,
-    pub timestamp: i64,
-}
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct Quote {
-    pub timestamp: i64,
-    pub open: Option<f64>,
-    pub high: Option<f64>,
-    pub low: Option<f64>,
-    pub close: Option<f64>,
-    pub volume: Option<u64>,
-    pub adjusted_close: Option<f64>,
-}
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct OptionData {
-    pub symbol: String,
-    pub expiration_dates: Vec<i64>,
-    pub strikes: Vec<f64>,
-    pub option: Vec<OptionChain>,
-    pub timestamp: i64,
-}
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct OptionChain {
-    pub expiration_date: i64,
-    pub calls: Vec<OptionContract>,
-    pub puts: Vec<OptionContract>,
-}
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct OptionContract {
-    pub strike: f64,
-    pub last_price: Option<f64>,
-    pub bid: Option<f64>,
-    pub ask: Option<f64>,
-    pub volume: Option<u64>,
-    pub open_interest: Option<u64>,
-    pub implied_volatility: Option<f64>,
-}
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct SearchResult {
     pub symbol: String,
@@ -279,8 +235,8 @@ impl YahooClient {
     let client = ClientBuilder::new()
         .proxy(proxy)
-        .timeout(Duration::from_secs(30)) // CHANGED: Reduced from 90s to 30s
-        .connect_timeout(Duration::from_secs(10)) // CHANGED: Reduced from 30s to 10s
+        .timeout(Duration::from_secs(30))
+        .connect_timeout(Duration::from_secs(10))
         .pool_max_idle_per_host(2)
         .pool_idle_timeout(Duration::from_secs(60))
         .cookie_store(true)
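For context, the builder chain in the last hunk corresponds to this reqwest configuration (proxy URL hypothetical; cookie_store(true) requires reqwest's "cookies" feature):

use std::time::Duration;
use reqwest::{Client, ClientBuilder, Proxy};

// Sketch of the client setup shown above; only the proxy URL is invented.
fn build_client() -> reqwest::Result<Client> {
    ClientBuilder::new()
        .proxy(Proxy::all("http://127.0.0.1:8080")?) // placeholder proxy
        .timeout(Duration::from_secs(30))            // overall request deadline
        .connect_timeout(Duration::from_secs(10))    // TCP connect deadline
        .pool_max_idle_per_host(2)
        .pool_idle_timeout(Duration::from_secs(60))
        .cookie_store(true)
        .build()
}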