Compare commits

...

4 Commits

SHA1       Message                                                Date
86944a9c58 cleaned yahoo hits                                     2025-12-24 00:00:21 +01:00
f9f09d0291 added working hard reset                               2025-12-23 15:07:40 +01:00
fb0876309f added hard reset for navigation timeout after 3 hours  2025-12-22 00:31:28 +01:00
c01b47000f removed serial data scraping for yahoo tickers         2025-12-19 16:58:22 +01:00
14 changed files with 2391 additions and 886 deletions

View File

@@ -14,8 +14,8 @@ CORPORATE_START_DATE=2010-01-01
 # How far into the future we scrape economic events (in months)
 ECONOMIC_LOOKAHEAD_MONTHS=3
-# Maximum number of parallel scraping tasks (default: 10)
-MAX_PARALLEL_TASKS=10
+# Maximum number of parallel scraping tasks (default: 4)
+MAX_PARALLEL_INSTANCES=10
 # ===== VPN ROTATION (ProtonVPN Integration) =====
 # Enable automatic VPN rotation between sessions?
@@ -37,4 +37,6 @@ TASKS_PER_VPN_SESSION=50
 MAX_REQUESTS_PER_SESSION=25
 MIN_REQUEST_INTERVAL_MS=300
 MAX_RETRY_ATTEMPTS=3
+PROXY_INSTANCES_PER_CERTIFICATE=2

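Note on the new settings: the template renames MAX_PARALLEL_TASKS to MAX_PARALLEL_INSTANCES and adds PROXY_INSTANCES_PER_CERTIFICATE. A minimal sketch of how the per-certificate knob could scale the proxy pool, assuming a certificate count that is not part of this diff:

// Hypothetical illustration only; `certificate_count` is an assumed input.
// Mirrors the Some(1) fallback that config.rs (below) applies when the
// variable is absent from the environment.
fn total_proxy_instances(certificate_count: usize, per_certificate: Option<usize>) -> usize {
    certificate_count * per_certificate.unwrap_or(1)
}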
.gitignore vendored
View File

@@ -35,6 +35,7 @@ target/
 **/*.log
 **/*.ovpn
 **/*.tmp
+**/*.txt
 #/economic_events*
 #/economic_event_changes*

View File

@@ -27,6 +27,9 @@ pub struct Config {
     #[serde(default = "default_max_retry_attempts")]
     pub max_retry_attempts: u32,
+    #[serde(default = "default_proxy_instances_per_certificate")]
+    pub proxy_instances_per_certificate: Option<usize>,
 }

 fn default_enable_vpn_rotation() -> bool {
@@ -47,6 +50,10 @@ fn default_min_request_interval_ms() -> u64 {
 fn default_max_retry_attempts() -> u32 { 3 }

+fn default_proxy_instances_per_certificate() -> Option<usize> {
+    Some(1)
+}
+
 impl Default for Config {
     fn default() -> Self {
         Self {
@@ -59,6 +66,7 @@ impl Default for Config {
             min_request_interval_ms: default_min_request_interval_ms(),
             max_retry_attempts: default_max_retry_attempts(),
             enable_vpn_rotation: false,
+            proxy_instances_per_certificate: default_proxy_instances_per_certificate(),
         }
     }
 }
@@ -112,6 +120,11 @@ impl Config {
             .parse()
             .context("Failed to parse MAX_RETRY_ATTEMPTS as u32")?;
+        let proxy_instances_per_certificate: Option<usize> = match dotenvy::var("PROXY_INSTANCES_PER_CERTIFICATE") {
+            Ok(val) => Some(val.parse().context("Failed to parse PROXY_INSTANCES_PER_CERTIFICATE as usize")?),
+            Err(_) => Some(1),
+        };
         Ok(Self {
             economic_start_date,
             corporate_start_date,
@@ -122,6 +135,7 @@ impl Config {
             max_requests_per_session,
             min_request_interval_ms,
             max_retry_attempts,
+            proxy_instances_per_certificate,
         })
     }

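The parsing added above treats an absent PROXY_INSTANCES_PER_CERTIFICATE as Some(1) and a malformed value as a hard error, matching the serde default. A small check one could add to pin that default down (a sketch; assumes the test lives next to Config in src/config.rs):

#[cfg(test)]
mod proxy_instances_default {
    use super::Config;

    #[test]
    fn default_is_one() {
        // Config::default() wires in default_proxy_instances_per_certificate(),
        // so the built-in default is Some(1); the .env template above ships 2.
        let cfg = Config::default();
        assert_eq!(cfg.proxy_instances_per_certificate, Some(1));
    }
}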
View File

@@ -223,58 +223,6 @@ async fn append_lei_to_figi_jsonl(path: &Path, lei: &str, figis: &[FigiInfo]) ->
     Ok(())
 }

-/// STREAMING: Build securities without loading everything into memory
-pub async fn build_securities_from_figi_streaming(
-    date_dir: &Path,
-) -> anyhow::Result<()> {
-    logger::log_info("Building securities (streaming mode)...").await;
-    // Load existing incrementally
-    let mut commons = load_from_cache_if_exists::<HashMap<String, CompanyInfo>>(
-        "data/corporate/by_name/common_stocks.json"
-    ).await?;
-    let equity_file = date_dir.join("Equity").join("lei_to_figi.jsonl");
-    if !equity_file.exists() {
-        logger::log_warn("No Equity FIGI file found").await;
-        return Ok(());
-    }
-    let content = tokio_fs::read_to_string(&equity_file).await?;
-    let mut processed = 0;
-    let mut stats = ProcessingStats::new(commons.len(), 0, 0);
-    for line in content.lines() {
-        if line.trim().is_empty() {
-            continue;
-        }
-        let entry: Value = serde_json::from_str(line)?;
-        let figi_infos: Vec<FigiInfo> = serde_json::from_value(entry["figis"].clone())?;
-        // Process only common stocks
-        let common_stocks: Vec<_> = figi_infos.iter()
-            .filter(|f| f.security_type == "Common Stock")
-            .cloned()
-            .collect();
-        if !common_stocks.is_empty() {
-            process_common_stocks(&mut commons, &common_stocks, &mut stats);
-        }
-        processed += 1;
-        if processed % 100 == 0 {
-            tokio::task::yield_now().await;
-        }
-    }
-    logger::log_info(&format!("Processed {} FIGI entries", processed)).await;
-    save_to_cache("data/corporate/by_name/common_stocks.json", &commons).await?;
-    Ok(())
-}
-
 /// Handles rate limit responses from the OpenFIGI API.
 ///
 /// If a 429 status is received, this function sleeps for the duration specified
@@ -310,56 +258,599 @@ async fn handle_rate_limit(resp: &reqwest::Response) -> anyhow::Result<()> {
     Ok(())
 }

-fn process_common_stocks(
-    companies: &mut HashMap<String, CompanyInfo>,
-    figi_infos: &[FigiInfo],
-    stats: &mut ProcessingStats,
-) {
-    let name = figi_infos[0].name.clone();
-    if name.is_empty() {
-        return;
-    }
-    let grouped_by_isin = group_by_isin(figi_infos);
-    if let Some(existing) = companies.get_mut(&name) {
-        let mut updated = false;
-        for (isin, new_figis) in grouped_by_isin {
-            if let Some(existing_figis) = existing.securities.get_mut(&isin) {
-                let merged = merge_figi_list(existing_figis, &new_figis);
-                if merged.len() > existing_figis.len() {
-                    *existing_figis = merged;
-                    updated = true;
-                }
-            } else {
-                existing.securities.insert(isin.clone(), new_figis);
-                updated = true;
-            }
-        }
-        if existing.primary_isin.is_empty() {
-            if let Some(first_isin) = existing.securities.keys().next() {
-                existing.primary_isin = first_isin.clone();
-            }
-        }
-        if updated {
-            stats.companies_updated += 1;
-        }
-    } else {
-        let primary_isin = grouped_by_isin.keys().next().cloned().unwrap_or_default();
-        companies.insert(name.clone(), CompanyInfo {
-            name,
-            primary_isin,
-            securities: grouped_by_isin,
-        });
-        stats.companies_added += 1;
-    }
-}
-
-fn group_by_isin(figi_infos: &[FigiInfo]) -> HashMap<String, Vec<FigiInfo>> {
+/// Loads or builds securities data by streaming through FIGI mapping files.
+///
+/// Implements abort-safe incremental persistence with checkpoints and replay logs.
+///
+/// # Arguments
+/// * `date_dir` - Path to the date-specific mapping directory (e.g., cache/gleif_openfigi_map/24112025/)
+///
+/// # Returns
+/// Ok(()) on success.
+///
+/// # Errors
+/// Returns an error if file I/O fails or JSON parsing fails.
+pub async fn load_or_build_all_securities(date_dir: &Path) -> anyhow::Result<()> {
+    logger::log_info("Building securities data from FIGI mappings...").await;
+    let dir = DataPaths::new(".")?;
+    let data_dir = dir.data_dir();
+    let corporate_data_dir = data_dir.join("corporate");
+    let output_dir = corporate_data_dir.join("by_name");
+    tokio_fs::create_dir_all(&output_dir).await
+        .context("Failed to create corporate/by_name directory")?;
+    // Setup checkpoint and log paths for each security type
+    let common_checkpoint = output_dir.join("common_stocks.jsonl");
+    let common_log = output_dir.join("common_stocks.log.jsonl");
+    let warrants_checkpoint = output_dir.join("warrants.jsonl");
+    let warrants_log = output_dir.join("warrants.log.jsonl");
+    let options_checkpoint = output_dir.join("options.jsonl");
+    let options_log = output_dir.join("options.log.jsonl");
+    // Track which sectors have been fully processed
+    let processed_sectors_file = output_dir.join("state.jsonl");
+    let processed_sectors = load_processed_sectors(&processed_sectors_file).await?;
+    logger::log_info(&format!("  Already processed {} sectors", processed_sectors.len())).await;
+    // Collect sectors to process
+    let mut sectors_to_process = Vec::new();
+    let mut entries = tokio_fs::read_dir(date_dir).await
+        .context("Failed to read date directory")?;
+    while let Some(entry) = entries.next_entry().await? {
+        let path = entry.path();
+        if !path.is_dir() {
+            continue;
+        }
+        let sector_name = path.file_name()
+            .and_then(|n| n.to_str())
+            .map(|s| s.to_string())
+            .unwrap_or_else(|| "unknown".to_string());
+        let lei_figi_file = path.join("lei_to_figi.jsonl");
+        if !lei_figi_file.exists() {
+            continue;
+        }
+        // Skip if already processed
+        if processed_sectors.contains(&sector_name) {
+            logger::log_info(&format!("  Skipping already processed sector: {}", sector_name)).await;
+            continue;
+        }
+        sectors_to_process.push((sector_name, lei_figi_file));
+    }
+    if sectors_to_process.is_empty() {
+        logger::log_info("  All sectors already processed, nothing to do").await;
+        return Ok(());
+    }
+    // Load checkpoints and replay logs - these are MUTABLE now
+    let mut existing_companies = load_checkpoint_and_replay(&common_checkpoint, &common_log, "name").await?;
+    let mut existing_warrants = load_checkpoint_and_replay_nested(&warrants_checkpoint, &warrants_log).await?;
+    let mut existing_options = load_checkpoint_and_replay_nested(&options_checkpoint, &options_log).await?;
+    logger::log_info(&format!("  Existing entries - Companies: {}, Warrants: {}, Options: {}",
+        existing_companies.len(), existing_warrants.len(), existing_options.len())).await;
+    // Process statistics
+    let mut stats = StreamingStats::new(
+        existing_companies.len(),
+        existing_warrants.len(),
+        existing_options.len()
+    );
+    logger::log_info(&format!("  Found {} sectors to process", sectors_to_process.len())).await;
+    // Process each sector
+    let mut newly_processed_sectors = Vec::new();
+    for (sector_name, lei_figi_file) in sectors_to_process {
+        logger::log_info(&format!("  Processing sector: {}", sector_name)).await;
+        // Stream through the lei_to_figi.jsonl file with batched writes
+        process_lei_figi_file_batched(
+            &lei_figi_file,
+            &common_log,
+            &warrants_log,
+            &options_log,
+            &mut existing_companies,
+            &mut existing_warrants,
+            &mut existing_options,
+            &mut stats,
+        ).await?;
+        // Mark sector as processed
+        newly_processed_sectors.push(sector_name.clone());
+        // Append to processed sectors file immediately for crash safety
+        append_processed_sector(&processed_sectors_file, &sector_name).await?;
+    }
+    // Create checkpoints after all processing
+    if !newly_processed_sectors.is_empty() {
+        logger::log_info("Creating checkpoints...").await;
+        create_checkpoint(&common_checkpoint, &common_log).await?;
+        create_checkpoint(&warrants_checkpoint, &warrants_log).await?;
+        create_checkpoint(&options_checkpoint, &options_log).await?;
+    }
+    stats.print_summary();
+    logger::log_info(&format!("✓ Processed {} new sectors successfully", newly_processed_sectors.len())).await;
+    Ok(())
+}
+
+/// Loads the list of sectors that have been fully processed
+async fn load_processed_sectors(path: &Path) -> anyhow::Result<HashSet<String>> {
+    let mut sectors = HashSet::new();
+    if !path.exists() {
+        return Ok(sectors);
+    }
+    let content = tokio_fs::read_to_string(path).await
+        .context("Failed to read processed sectors file")?;
+    for (line_num, line) in content.lines().enumerate() {
+        if line.trim().is_empty() || !line.ends_with('}') {
+            continue; // Skip incomplete lines
+        }
+        match serde_json::from_str::<Value>(line) {
+            Ok(entry) => {
+                if let Some(sector) = entry["sector"].as_str() {
+                    sectors.insert(sector.to_string());
+                }
+            }
+            Err(e) => {
+                logger::log_warn(&format!(
+                    "Skipping invalid processed sector line {}: {}",
+                    line_num + 1, e
+                )).await;
+            }
+        }
+    }
+    Ok(sectors)
+}
+
+/// Appends a sector name to the processed sectors JSONL file with fsync
+async fn append_processed_sector(path: &Path, sector_name: &str) -> anyhow::Result<()> {
+    use std::fs::OpenOptions;
+    use std::io::Write;
+    let entry = json!({
+        "sector": sector_name,
+        "completed_at": chrono::Utc::now().to_rfc3339(),
+    });
+    let mut file = OpenOptions::new()
+        .create(true)
+        .append(true)
+        .open(path)
+        .context("Failed to open processed sectors file")?;
+    let line = serde_json::to_string(&entry)
+        .context("Failed to serialize sector entry")? + "\n";
+    file.write_all(line.as_bytes())?;
+    // Ensure durability
+    file.sync_data()
+        .context("Failed to fsync processed sectors file")?;
+    Ok(())
+}
+
+/// Loads checkpoint and replays log, returning set of existing keys
+async fn load_checkpoint_and_replay(
+    checkpoint_path: &Path,
+    log_path: &Path,
+    key_field: &str,
+) -> anyhow::Result<HashSet<String>> {
+    let mut keys = HashSet::new();
+    // Load checkpoint if it exists
+    if checkpoint_path.exists() {
+        let content = tokio_fs::read_to_string(checkpoint_path).await
+            .context("Failed to read checkpoint")?;
+        for line in content.lines() {
+            if line.trim().is_empty() || !line.ends_with('}') {
+                continue; // Skip incomplete lines
+            }
+            if let Ok(entry) = serde_json::from_str::<Value>(line) {
+                if let Some(key) = entry[key_field].as_str() {
+                    keys.insert(key.to_string());
+                }
+            }
+        }
+    }
+    // Replay log if it exists
+    if log_path.exists() {
+        let content = tokio_fs::read_to_string(log_path).await
+            .context("Failed to read log")?;
+        for line in content.lines() {
+            if line.trim().is_empty() || !line.ends_with('}') {
+                continue; // Skip incomplete lines
+            }
+            if let Ok(entry) = serde_json::from_str::<Value>(line) {
+                if let Some(key) = entry[key_field].as_str() {
+                    keys.insert(key.to_string());
+                }
+            }
+        }
+    }
+    Ok(keys)
+}
+
+/// Loads checkpoint and replays log for nested structures (warrants/options)
+async fn load_checkpoint_and_replay_nested(
+    checkpoint_path: &Path,
+    log_path: &Path,
+) -> anyhow::Result<HashSet<String>> {
+    let mut keys = HashSet::new();
+    // Load checkpoint if it exists
+    if checkpoint_path.exists() {
+        let content = tokio_fs::read_to_string(checkpoint_path).await
+            .context("Failed to read checkpoint")?;
+        for line in content.lines() {
+            if line.trim().is_empty() || !line.ends_with('}') {
+                continue;
+            }
+            if let Ok(entry) = serde_json::from_str::<Value>(line) {
+                let underlying = entry["underlying_company_name"].as_str().unwrap_or("");
+                let type_field = if entry.get("warrant_type").is_some() {
+                    entry["warrant_type"].as_str().unwrap_or("")
+                } else {
+                    entry["option_type"].as_str().unwrap_or("")
+                };
+                if !underlying.is_empty() && !type_field.is_empty() {
+                    keys.insert(format!("{}::{}", underlying, type_field));
+                }
+            }
+        }
+    }
+    // Replay log if it exists
+    if log_path.exists() {
+        let content = tokio_fs::read_to_string(log_path).await
+            .context("Failed to read log")?;
+        for line in content.lines() {
+            if line.trim().is_empty() || !line.ends_with('}') {
+                continue;
+            }
+            if let Ok(entry) = serde_json::from_str::<Value>(line) {
+                let underlying = entry["underlying_company_name"].as_str().unwrap_or("");
+                let type_field = if entry.get("warrant_type").is_some() {
+                    entry["warrant_type"].as_str().unwrap_or("")
+                } else {
+                    entry["option_type"].as_str().unwrap_or("")
+                };
+                if !underlying.is_empty() && !type_field.is_empty() {
+                    keys.insert(format!("{}::{}", underlying, type_field));
+                }
+            }
+        }
+    }
+    Ok(keys)
+}
+
+/// Creates a checkpoint by copying log to checkpoint atomically
+async fn create_checkpoint(checkpoint_path: &Path, log_path: &Path) -> anyhow::Result<()> {
+    if !log_path.exists() {
+        return Ok(());
+    }
+    // Read all committed lines from log
+    let content = tokio_fs::read_to_string(log_path).await
+        .context("Failed to read log for checkpoint")?;
+    let committed_lines: Vec<&str> = content
+        .lines()
+        .filter(|line| !line.trim().is_empty() && line.ends_with('}'))
+        .collect();
+    if committed_lines.is_empty() {
+        return Ok(());
+    }
+    // Write to temporary file
+    let tmp_path = checkpoint_path.with_extension("tmp");
+    let mut tmp_file = std::fs::File::create(&tmp_path)
+        .context("Failed to create temp checkpoint")?;
+    for line in committed_lines {
+        use std::io::Write;
+        writeln!(tmp_file, "{}", line)?;
+    }
+    // Ensure data is flushed to disk
+    tmp_file.sync_data()
+        .context("Failed to sync temp checkpoint")?;
+    drop(tmp_file);
+    // Atomic rename
+    tokio_fs::rename(&tmp_path, checkpoint_path).await
+        .context("Failed to rename checkpoint")?;
+    // Clear log after successful checkpoint
+    tokio_fs::remove_file(log_path).await
+        .context("Failed to remove log after checkpoint")?;
+    Ok(())
+}
+
+/// Streams through a lei_to_figi.jsonl file and processes entries in batches with fsync
+async fn process_lei_figi_file_batched(
+    input_path: &Path,
+    common_log_path: &Path,
+    warrants_log_path: &Path,
+    options_log_path: &Path,
+    existing_companies: &mut HashSet<String>,
+    existing_warrants: &mut HashSet<String>,
+    existing_options: &mut HashSet<String>,
+    stats: &mut StreamingStats,
+) -> anyhow::Result<()> {
+    let content = tokio_fs::read_to_string(input_path).await
+        .context("Failed to read lei_to_figi.jsonl")?;
+    let batch_size = 100;
+    let mut processed_count = 0;
+    let mut common_batch = Vec::new();
+    let mut warrants_batch = Vec::new();
+    let mut options_batch = Vec::new();
+    for (line_num, line) in content.lines().enumerate() {
+        if line.trim().is_empty() {
+            continue;
+        }
+        let entry: Value = serde_json::from_str(line)
+            .context(format!("Failed to parse JSON on line {}", line_num + 1))?;
+        let figis: Vec<FigiInfo> = serde_json::from_value(entry["figis"].clone())
+            .context("Invalid 'figis' field")?;
+        if figis.is_empty() {
+            continue;
+        }
+        // Group by security type
+        let (common_stocks, warrant_securities, option_securities) =
+            group_by_security_type(&figis);
+        // Collect entries for batching and update existing keys
+        if !common_stocks.is_empty() {
+            if let Some(entry) = prepare_common_stock_entry(&common_stocks, existing_companies) {
+                // Add to existing set immediately to prevent duplicates in same run
+                existing_companies.insert(entry.name.clone());
+                common_batch.push(entry);
+            }
+        }
+        if !warrant_securities.is_empty() {
+            for entry in prepare_warrant_entries(&warrant_securities, existing_warrants) {
+                // Add to existing set immediately
+                let key = format!("{}::{}", entry.underlying_company_name, entry.warrant_type);
+                existing_warrants.insert(key);
+                warrants_batch.push(entry);
+            }
+        }
+        if !option_securities.is_empty() {
+            for entry in prepare_option_entries(&option_securities, existing_options) {
+                // Add to existing set immediately
+                let key = format!("{}::{}", entry.underlying_company_name, entry.option_type);
+                existing_options.insert(key);
+                options_batch.push(entry);
+            }
+        }
+        // Write batches when they reach size limit
+        if common_batch.len() >= batch_size {
+            write_batch_with_fsync(common_log_path, &common_batch).await?;
+            stats.companies_added += common_batch.len();
+            common_batch.clear();
+        }
+        if warrants_batch.len() >= batch_size {
+            write_batch_with_fsync(warrants_log_path, &warrants_batch).await?;
+            stats.warrants_added += warrants_batch.len();
+            warrants_batch.clear();
+        }
+        if options_batch.len() >= batch_size {
+            write_batch_with_fsync(options_log_path, &options_batch).await?;
+            stats.options_added += options_batch.len();
+            options_batch.clear();
+        }
+        processed_count += 1;
+        if processed_count % 1000 == 0 {
+            logger::log_info(&format!("    Processed {} LEI entries...", processed_count)).await;
+        }
+    }
+    // Write remaining batches
+    if !common_batch.is_empty() {
+        write_batch_with_fsync(common_log_path, &common_batch).await?;
+        stats.companies_added += common_batch.len();
+    }
+    if !warrants_batch.is_empty() {
+        write_batch_with_fsync(warrants_log_path, &warrants_batch).await?;
+        stats.warrants_added += warrants_batch.len();
+    }
+    if !options_batch.is_empty() {
+        write_batch_with_fsync(options_log_path, &options_batch).await?;
+        stats.options_added += options_batch.len();
+    }
+    Ok(())
+}
+
+/// Writes a batch of entries to log with fsync
+async fn write_batch_with_fsync<T: serde::Serialize>(
+    log_path: &Path,
+    entries: &[T],
+) -> anyhow::Result<()> {
+    use std::fs::OpenOptions;
+    use std::io::Write;
+    let mut file = OpenOptions::new()
+        .create(true)
+        .append(true)
+        .open(log_path)
+        .context("Failed to open log file")?;
+    for entry in entries {
+        let line = serde_json::to_string(entry)
+            .context("Failed to serialize entry")?;
+        writeln!(file, "{}", line)?;
+    }
+    // Critical: fsync to ensure durability
+    file.sync_data()
+        .context("Failed to fsync log file")?;
+    Ok(())
+}
+
+/// Prepares a common stock entry if it doesn't exist
+fn prepare_common_stock_entry(
+    figi_infos: &[FigiInfo],
+    existing_keys: &HashSet<String>,
+) -> Option<CompanyInfo> {
+    let name = figi_infos[0].name.clone();
+    if name.is_empty() || existing_keys.contains(&name) {
+        return None;
+    }
+    let grouped_by_isin = group_figis_by_isin(figi_infos);
+    let primary_isin = grouped_by_isin.keys().next().cloned().unwrap_or_default();
+    Some(CompanyInfo {
+        name,
+        primary_isin,
+        securities: grouped_by_isin,
+    })
+}
+
+/// Prepares warrant entries for batching
+fn prepare_warrant_entries(
+    warrant_securities: &[FigiInfo],
+    existing_keys: &HashSet<String>,
+) -> Vec<WarrantInfo> {
+    let mut entries = Vec::new();
+    for figi in warrant_securities {
+        let (underlying, issuer, warrant_type) = parse_warrant_name(&figi.name);
+        if underlying.is_empty() {
+            continue;
+        }
+        let key = format!("{}::{}", underlying, warrant_type);
+        if existing_keys.contains(&key) {
+            continue;
+        }
+        let warrant_info = WarrantInfo {
+            underlying_company_name: underlying.clone(),
+            issuer_company_name: issuer,
+            warrant_type: warrant_type.clone(),
+            warrants: {
+                let mut map = HashMap::new();
+                map.insert(figi.isin.clone(), vec![figi.clone()]);
+                map
+            },
+        };
+        entries.push(warrant_info);
+    }
+    entries
+}
+
+/// Prepares option entries for batching
+fn prepare_option_entries(
+    option_securities: &[FigiInfo],
+    existing_keys: &HashSet<String>,
+) -> Vec<OptionInfo> {
+    let mut entries = Vec::new();
+    for figi in option_securities {
+        let (underlying, issuer, option_type) = parse_option_name(&figi.name);
+        if underlying.is_empty() {
+            continue;
+        }
+        let key = format!("{}::{}", underlying, option_type);
+        if existing_keys.contains(&key) {
+            continue;
+        }
+        let option_info = OptionInfo {
+            underlying_company_name: underlying.clone(),
+            issuer_company_name: issuer,
+            option_type: option_type.clone(),
+            options: {
+                let mut map = HashMap::new();
+                map.insert(figi.isin.clone(), vec![figi.clone()]);
+                map
+            },
+        };
+        entries.push(option_info);
+    }
+    entries
+}
+
+/// Groups FigiInfo list by security type
+fn group_by_security_type(figis: &[FigiInfo]) -> (Vec<FigiInfo>, Vec<FigiInfo>, Vec<FigiInfo>) {
+    let mut common_stocks = Vec::new();
+    let mut warrants = Vec::new();
+    let mut options = Vec::new();
+    for figi in figis {
+        match figi.security_type.as_str() {
+            "Common Stock" => common_stocks.push(figi.clone()),
+            "Equity WRT" => warrants.push(figi.clone()),
+            "Equity Option" => options.push(figi.clone()),
+            _ => {}
+        }
+    }
+    (common_stocks, warrants, options)
+}
+
+/// Groups FigiInfo by ISIN
+fn group_figis_by_isin(figi_infos: &[FigiInfo]) -> HashMap<String, Vec<FigiInfo>> {
     let mut grouped: HashMap<String, Vec<FigiInfo>> = HashMap::new();
     for figi_info in figi_infos {
@@ -375,65 +866,126 @@ fn group_by_isin(figi_infos: &[FigiInfo]) -> HashMap<String, Vec<FigiInfo>> {
     grouped
 }

-fn merge_figi_list(existing: &[FigiInfo], new_figis: &[FigiInfo]) -> Vec<FigiInfo> {
-    let mut merged = existing.to_vec();
-    let existing_figis: HashSet<String> = existing.iter()
-        .map(|f| f.figi.clone())
-        .collect();
-    for new_figi in new_figis {
-        if !existing_figis.contains(&new_figi.figi) {
-            merged.push(new_figi.clone());
-        }
-    }
-    merged.sort_by(|a, b| a.figi.cmp(&b.figi));
-    merged
-}
-
-#[derive(Debug)]
-struct ProcessingStats {
-    initial_companies: usize,
-    companies_added: usize,
-    companies_updated: usize,
-}
-
-impl ProcessingStats {
-    fn new(companies: usize, _warrants: usize, _options: usize) -> Self {
-        Self {
-            initial_companies: companies,
-            companies_added: 0,
-            companies_updated: 0,
-        }
-    }
-}
-
-async fn load_from_cache_if_exists<T>(path: &str) -> anyhow::Result<T>
-where
-    T: serde::de::DeserializeOwned + Default,
-{
-    let cache_file = Path::new(path);
-    if !cache_file.exists() {
-        return Ok(T::default());
-    }
-    let content = tokio_fs::read_to_string(cache_file).await?;
-    Ok(serde_json::from_str(&content)?)
-}
-
-async fn save_to_cache<T>(path: &str, data: &T) -> anyhow::Result<()>
-where
-    T: serde::Serialize,
-{
-    let cache_path = Path::new(path);
-    let cache_dir = cache_path.parent().context("Invalid path")?;
-    tokio_fs::create_dir_all(cache_dir).await?;
-    let json_str = serde_json::to_string_pretty(data)?;
-    tokio_fs::write(cache_path, json_str).await?;
-    Ok(())
-}
+/// Parse warrant name to extract underlying company, issuer, and warrant type
+///
+/// Examples:
+/// - "VONTOBE-PW26 LEONARDO SPA" -> ("LEONARDO SPA", Some("VONTOBEL"), "put")
+/// - "BAYER H-CW25 L'OREAL" -> ("L'OREAL", Some("BAYER H"), "call")
+/// - "APPLE INC WARRANT" -> ("APPLE INC", None, "unknown")
+fn parse_warrant_name(name: &str) -> (String, Option<String>, String) {
+    let name_upper = name.to_uppercase();
+    // Try to detect warrant type from code (PW=put, CW=call)
+    let warrant_type = if name_upper.contains("-PW") || name_upper.contains(" PW") {
+        "put".to_string()
+    } else if name_upper.contains("-CW") || name_upper.contains(" CW") {
+        "call".to_string()
+    } else {
+        "unknown".to_string()
+    };
+    // Try to split by warrant code pattern (e.g., "-PW26", "-CW25")
+    if let Some(pos) = name.find("-PW") {
+        let before = name[..pos].trim();
+        let after_idx = name[pos..].find(' ').map(|i| pos + i + 1).unwrap_or(name.len());
+        let after = if after_idx < name.len() {
+            name[after_idx..].trim()
+        } else {
+            ""
+        };
+        return (
+            after.to_string(),
+            if !before.is_empty() { Some(before.to_string()) } else { None },
+            warrant_type,
+        );
+    }
+    if let Some(pos) = name.find("-CW") {
+        let before = name[..pos].trim();
+        let after_idx = name[pos..].find(' ').map(|i| pos + i + 1).unwrap_or(name.len());
+        let after = if after_idx < name.len() {
+            name[after_idx..].trim()
+        } else {
+            ""
+        };
+        return (
+            after.to_string(),
+            if !before.is_empty() { Some(before.to_string()) } else { None },
+            warrant_type,
+        );
+    }
+    // Fallback: return entire name as underlying
+    (name.to_string(), None, warrant_type)
+}
+
+/// Parse option name to extract underlying company, issuer, and option type
+///
+/// Examples:
+/// - "December 25 Calls on ALPHA GA" -> ("ALPHA GA", None, "call")
+/// - "January 26 Puts on TESLA INC" -> ("TESLA INC", None, "put")
+fn parse_option_name(name: &str) -> (String, Option<String>, String) {
+    let name_upper = name.to_uppercase();
+    // Detect option type
+    let option_type = if name_upper.contains("CALL") {
+        "call".to_string()
+    } else if name_upper.contains("PUT") {
+        "put".to_string()
+    } else {
+        "unknown".to_string()
+    };
+    // Try to extract underlying after "on"
+    if let Some(pos) = name_upper.find(" ON ") {
+        let underlying = name[pos + 4..].trim().to_string();
+        return (underlying, None, option_type);
+    }
+    // Fallback: return entire name
+    (name.to_string(), None, option_type)
+}
+
+/// Statistics tracker for streaming processing
+#[derive(Debug)]
+struct StreamingStats {
+    initial_companies: usize,
+    initial_warrants: usize,
+    initial_options: usize,
+    companies_added: usize,
+    warrants_added: usize,
+    options_added: usize,
+}
+
+impl StreamingStats {
+    fn new(companies: usize, warrants: usize, options: usize) -> Self {
+        Self {
+            initial_companies: companies,
+            initial_warrants: warrants,
+            initial_options: options,
+            companies_added: 0,
+            warrants_added: 0,
+            options_added: 0,
+        }
+    }
+
+    fn print_summary(&self) {
+        println!("\n=== Processing Statistics ===");
+        println!("Companies:");
+        println!("  - Initial: {}", self.initial_companies);
+        println!("  - Added: {}", self.companies_added);
+        println!("  - Total: {}", self.initial_companies + self.companies_added);
+        println!("Warrants:");
+        println!("  - Initial: {}", self.initial_warrants);
+        println!("  - Added: {}", self.warrants_added);
+        println!("  - Total: {}", self.initial_warrants + self.warrants_added);
+        println!("Options:");
+        println!("  - Initial: {}", self.initial_options);
+        println!("  - Added: {}", self.options_added);
+        println!("  - Total: {}", self.initial_options + self.options_added);
+    }
+}

 async fn load_market_sectors() -> anyhow::Result<Vec<String>> {
@@ -771,57 +1323,6 @@ pub async fn get_mapping_stats(
     })
 }

-/// Print mapping statistics to console and logs
-pub async fn print_mapping_stats(csv_path: &str) -> anyhow::Result<()> {
-    logger::log_info("=== LEI-FIGI Mapping Status ===").await;
-    let stats = get_mapping_stats(csv_path, None).await?;
-    logger::log_info(&format!(
-        "Total LEIs: {}",
-        stats.total_leis
-    )).await;
-    logger::log_info(&format!(
-        "├─ Mapped (with FIGI): {} ({:.2}%)",
-        stats.mapped_leis,
-        stats.mapping_percentage
-    )).await;
-    logger::log_info(&format!(
-        "├─ No Results (queried, no FIGI): {} ({:.2}%)",
-        stats.no_result_leis,
-        (stats.no_result_leis as f64 / stats.total_leis as f64) * 100.0
-    )).await;
-    logger::log_info(&format!(
-        "└─ Not Queried Yet: {} ({:.2}%)",
-        stats.unqueried_leis,
-        (stats.unqueried_leis as f64 / stats.total_leis as f64) * 100.0
-    )).await;
-    logger::log_info(&format!(
-        "\nQuery Coverage: {:.2}% ({} / {})",
-        stats.queried_percentage,
-        stats.mapped_leis + stats.no_result_leis,
-        stats.total_leis
-    )).await;
-    if !stats.by_sector.is_empty() {
-        logger::log_info("\nMapped LEIs by sector:").await;
-        let mut sectors: Vec<_> = stats.by_sector.iter().collect();
-        sectors.sort_by(|a, b| b.1.cmp(a.1)); // Sort by count descending
-        for (sector, count) in sectors {
-            logger::log_info(&format!("  {}: {}", sector, count)).await;
-        }
-    }
-    logger::log_info("==============================").await;
-    Ok(())
-}
-
 /// Quick check if mapping is complete (returns true if all mapped)
 pub async fn is_mapping_complete(csv_path: &str) -> anyhow::Result<bool> {
     let dir = DataPaths::new(".")?;

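The new persistence code above follows one pattern throughout: append JSONL lines to a *.log.jsonl with fsync, replay the log on startup (ignoring any line that does not end in '}', i.e. a torn write), and periodically compact the log into a *.jsonl checkpoint via a temp file and atomic rename. A distilled synchronous sketch of that pattern (the real code is async and typed; names here are illustrative):

use std::io::Write;
use std::path::Path;

// Append one committed JSONL line; sync_data makes it durable before returning.
fn append_committed(log: &Path, json_line: &str) -> std::io::Result<()> {
    let mut f = std::fs::OpenOptions::new().create(true).append(true).open(log)?;
    writeln!(f, "{}", json_line)?;
    f.sync_data()
}

// Compact the log into the checkpoint: keep only fully committed lines,
// publish atomically via rename, then clear the log.
fn compact(log: &Path, checkpoint: &Path) -> std::io::Result<()> {
    let content = std::fs::read_to_string(log)?;
    let tmp = checkpoint.with_extension("tmp");
    let mut out = std::fs::File::create(&tmp)?;
    for line in content.lines().filter(|l| !l.trim().is_empty() && l.ends_with('}')) {
        writeln!(out, "{}", line)?;
    }
    out.sync_data()?;
    drop(out);
    std::fs::rename(&tmp, checkpoint)?;
    std::fs::remove_file(log)
}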
View File

@@ -1,5 +1,5 @@
 // src/corporate/update.rs - UPDATED WITH DATA INTEGRITY FIXES
-use super::{scraper::*, storage::*, helpers::*, types::*, openfigi::*, yahoo::*};
+use super::{scraper::*, storage::*, helpers::*, types::*, openfigi::*};
 use crate::config::Config;
 use crate::corporate::update_parallel::build_companies_jsonl_streaming_parallel;
 use crate::util::directories::DataPaths;
@@ -11,7 +11,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};

-/// UPDATED: Main corporate update entry point with shutdown awareness
+/// Main corporate update entry point with shutdown awareness
 pub async fn run_full_update(
     _config: &Config,
     pool: &Arc<ChromeDriverPool>,
@@ -66,7 +66,7 @@ pub async fn run_full_update(
     if let Some(date_dir) = date_dir {
         logger::log_info(&format!("  Using FIGI data from: {:?}", date_dir)).await;
-        build_securities_from_figi_streaming(&date_dir).await?;
+        load_or_build_all_securities(&date_dir).await?;
         logger::log_info("  ✓ Securities map updated").await;
     } else {
         logger::log_warn("  ✗ No FIGI data directory found").await;
@@ -78,11 +78,20 @@ pub async fn run_full_update(
     }
     logger::log_info("Step 5: Building companies.jsonl with parallel processing and validation...").await;
-    let count = build_companies_jsonl_streaming_parallel(&paths, pool, shutdown_flag).await?;
+    let count = build_companies_jsonl_streaming_parallel(&paths, pool, shutdown_flag, _config, &None).await?;
     logger::log_info(&format!("  ✓ Saved {} companies", count)).await;
+    if shutdown_flag.load(Ordering::SeqCst) {
+        logger::log_warn("Shutdown detected after companies.jsonl build").await;
+        return Ok(());
+    }
+    logger::log_info("Step 6: Cleansing up companies with missing essential data...").await;
+    let cleansed_count = companies_yahoo_jsonl(&paths).await?;
+    logger::log_info(&format!("{} companies found on Yahoo ready for further use in companies_yahoo.jsonl", cleansed_count)).await;
     if !shutdown_flag.load(Ordering::SeqCst) {
-        logger::log_info("Step 6: Processing events (using index)...").await;
+        logger::log_info("Step 7: Processing events (using index)...").await;
         let _event_index = build_event_index(&paths).await?;
         logger::log_info("  ✓ Event index built").await;
     } else {
@@ -93,421 +102,142 @@ pub async fn run_full_update(
     Ok(())
 }

-/// UPDATED: Serial version with validation (kept for compatibility/debugging)
-///
-/// This is the non-parallel version that processes companies sequentially.
-/// Updated with same validation and shutdown checks as parallel version.
-///
-/// Use this for:
-/// - Debugging issues with specific companies
-/// - Environments where parallel processing isn't desired
-/// - Testing validation logic without concurrency complexity
-async fn build_companies_jsonl_streaming_serial(
-    paths: &DataPaths,
-    pool: &Arc<ChromeDriverPool>,
-    shutdown_flag: &Arc<AtomicBool>,
-) -> anyhow::Result<usize> {
-    // Configuration constants
-    const CHECKPOINT_INTERVAL: usize = 50;
-    const FSYNC_BATCH_SIZE: usize = 10;
-    const FSYNC_INTERVAL_SECS: u64 = 10;
-    let path = DataPaths::new(".")?;
-    let corporate_path = path.data_dir().join("corporate").join("by_name");
-    let securities_path = corporate_path.join("common_stocks.json");
-    if !securities_path.exists() {
-        logger::log_warn("No common_stocks.json found").await;
-        return Ok(0);
-    }
-    let content = tokio::fs::read_to_string(securities_path).await?;
-    let securities: HashMap<String, CompanyInfo> = serde_json::from_str(&content)?;
-    let companies_path = paths.data_dir().join("companies.jsonl");
-    let log_path = paths.data_dir().join("companies_updates.log");
-    if let Some(parent) = companies_path.parent() {
-        tokio::fs::create_dir_all(parent).await?;
-    }
-    // === RECOVERY PHASE: Load checkpoint + replay log ===
-    let mut existing_companies: HashMap<String, CompanyCrossPlatformInfo> = HashMap::new();
-    let mut processed_names: std::collections::HashSet<String> = std::collections::HashSet::new();
-    if companies_path.exists() {
-        logger::log_info("Loading checkpoint from companies.jsonl...").await;
-        let existing_content = tokio::fs::read_to_string(&companies_path).await?;
-        for line in existing_content.lines() {
-            if line.trim().is_empty() {
-                continue;
-            }
-            match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
-                Ok(company) => {
-                    processed_names.insert(company.name.clone());
-                    existing_companies.insert(company.name.clone(), company);
-                }
-                Err(e) => {
-                    logger::log_warn(&format!("Skipping invalid checkpoint line: {}", e)).await;
-                }
-            }
-        }
-        logger::log_info(&format!("Loaded checkpoint with {} companies", existing_companies.len())).await;
-    }
-    if log_path.exists() {
-        logger::log_info("Replaying update log...").await;
-        let log_content = tokio::fs::read_to_string(&log_path).await?;
-        let mut replayed = 0;
-        for line in log_content.lines() {
-            if line.trim().is_empty() {
-                continue;
-            }
-            match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
-                Ok(company) => {
-                    processed_names.insert(company.name.clone());
-                    existing_companies.insert(company.name.clone(), company);
-                    replayed += 1;
-                }
-                Err(e) => {
-                    logger::log_warn(&format!("Skipping invalid log line: {}", e)).await;
-                }
-            }
-        }
-        if replayed > 0 {
-            logger::log_info(&format!("Replayed {} updates from log", replayed)).await;
-        }
-    }
-    // === OPEN LOG FILE ===
-    use tokio::fs::OpenOptions;
-    use tokio::io::AsyncWriteExt;
-    let mut log_file = OpenOptions::new()
-        .create(true)
-        .append(true)
-        .open(&log_path)
-        .await?;
-    let mut writes_since_fsync = 0;
-    let mut last_fsync = std::time::Instant::now();
-    let mut updates_since_checkpoint = 0;
-    let mut count = 0;
-    let mut new_count = 0;
-    let mut updated_count = 0;
-    logger::log_info(&format!("Processing {} companies sequentially...", securities.len())).await;
-    // === PROCESS COMPANIES SEQUENTIALLY ===
-    for (name, company_info) in securities.clone() {
-        // Check shutdown before each company
-        if shutdown_flag.load(Ordering::SeqCst) {
-            logger::log_warn(&format!(
-                "Shutdown detected at company: {} (progress: {}/{})",
-                name, count, count + securities.len()
-            )).await;
-            break;
-        }
-        let existing_entry = existing_companies.get(&name).cloned();
-        let is_update = existing_entry.is_some();
-        // Process company with validation
-        match process_single_company_serial(
-            name.clone(),
-            company_info,
-            existing_entry,
-            pool,
-            shutdown_flag,
-        ).await {
-            Ok(Some(company_entry)) => {
-                // Write to log
-                let line = serde_json::to_string(&company_entry)?;
-                log_file.write_all(line.as_bytes()).await?;
-                log_file.write_all(b"\n").await?;
-                writes_since_fsync += 1;
-                // Batched + time-based fsync
-                let should_fsync = writes_since_fsync >= FSYNC_BATCH_SIZE
-                    || last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS;
-                if should_fsync {
-                    log_file.flush().await?;
-                    log_file.sync_data().await?;
-                    writes_since_fsync = 0;
-                    last_fsync = std::time::Instant::now();
-                }
-                // Update in-memory state
-                processed_names.insert(name.clone());
-                existing_companies.insert(name.clone(), company_entry);
-                count += 1;
-                updates_since_checkpoint += 1;
-                if is_update {
-                    updated_count += 1;
-                } else {
-                    new_count += 1;
-                }
-                // Periodic checkpoint
-                if updates_since_checkpoint >= CHECKPOINT_INTERVAL {
-                    if writes_since_fsync > 0 {
-                        log_file.flush().await?;
-                        log_file.sync_data().await?;
-                        writes_since_fsync = 0;
-                        last_fsync = std::time::Instant::now();
-                    }
-                    logger::log_info(&format!("Creating checkpoint at {} companies...", count)).await;
-                    let checkpoint_tmp = companies_path.with_extension("jsonl.tmp");
-                    let mut checkpoint_file = tokio::fs::File::create(&checkpoint_tmp).await?;
-                    for company in existing_companies.values() {
-                        let line = serde_json::to_string(company)?;
-                        checkpoint_file.write_all(line.as_bytes()).await?;
-                        checkpoint_file.write_all(b"\n").await?;
-                    }
-                    checkpoint_file.flush().await?;
-                    checkpoint_file.sync_all().await?;
-                    drop(checkpoint_file);
-                    tokio::fs::rename(&checkpoint_tmp, &companies_path).await?;
-                    drop(log_file);
-                    tokio::fs::remove_file(&log_path).await.ok();
-                    log_file = OpenOptions::new()
-                        .create(true)
-                        .append(true)
-                        .open(&log_path)
-                        .await?;
-                    updates_since_checkpoint = 0;
-                    logger::log_info("✓ Checkpoint created and log cleared").await;
-                }
-                if count % 10 == 0 {
-                    logger::log_info(&format!(
-                        "Progress: {} companies ({} new, {} updated)",
-                        count, new_count, updated_count
-                    )).await;
-                }
-            }
-            Ok(None) => {
-                // Company had no ISINs or was skipped
-                logger::log_info(&format!("Skipped company: {} (no ISINs)", name)).await;
-            }
-            Err(e) => {
-                logger::log_warn(&format!("Error processing company {}: {}", name, e)).await;
-            }
-        }
-        // Time-based fsync
-        if writes_since_fsync > 0 && last_fsync.elapsed().as_secs() >= FSYNC_INTERVAL_SECS {
-            log_file.flush().await?;
-            log_file.sync_data().await?;
-            writes_since_fsync = 0;
-            last_fsync = std::time::Instant::now();
-        }
-    }
-    // === FSYNC PENDING WRITES ===
-    if writes_since_fsync > 0 {
-        logger::log_info(&format!("Fsyncing {} pending writes...", writes_since_fsync)).await;
-        log_file.flush().await?;
-        log_file.sync_data().await?;
-        logger::log_info("✓ Pending writes saved").await;
-    }
-    // === FINAL CHECKPOINT ===
-    if !shutdown_flag.load(Ordering::SeqCst) && updates_since_checkpoint > 0 {
-        logger::log_info("Creating final checkpoint...").await;
-        let checkpoint_tmp = companies_path.with_extension("jsonl.tmp");
-        let mut checkpoint_file = tokio::fs::File::create(&checkpoint_tmp).await?;
-        for company in existing_companies.values() {
-            let line = serde_json::to_string(company)?;
-            checkpoint_file.write_all(line.as_bytes()).await?;
-            checkpoint_file.write_all(b"\n").await?;
-        }
-        checkpoint_file.flush().await?;
-        checkpoint_file.sync_all().await?;
-        drop(checkpoint_file);
-        tokio::fs::rename(&checkpoint_tmp, &companies_path).await?;
-        drop(log_file);
-        tokio::fs::remove_file(&log_path).await.ok();
-        logger::log_info("✓ Final checkpoint created").await;
-    }
-    logger::log_info(&format!(
-        "Completed: {} total companies ({} new, {} updated)",
-        count, new_count, updated_count
-    )).await;
-    Ok(count)
-}
-
-/// UPDATED: Process single company serially with validation
-async fn process_single_company_serial(
-    name: String,
-    company_info: CompanyInfo,
-    existing_entry: Option<CompanyCrossPlatformInfo>,
-    pool: &Arc<ChromeDriverPool>,
-    shutdown_flag: &Arc<AtomicBool>,
-) -> anyhow::Result<Option<CompanyCrossPlatformInfo>> {
-    // Check shutdown at start
-    if shutdown_flag.load(Ordering::SeqCst) {
-        return Ok(None);
-    }
-    let mut isin_tickers_map: HashMap<String, Vec<String>> =
-        existing_entry
-            .as_ref()
-            .map(|e| e.isin_tickers_map.clone())
-            .unwrap_or_default();
-    let mut sector = existing_entry.as_ref().and_then(|e| e.sector.clone());
-    let mut exchange = existing_entry.as_ref().and_then(|e| e.exchange.clone());
-    // Collect unique ISIN-ticker pairs
-    let mut unique_isin_ticker_pairs: HashMap<String, Vec<String>> = HashMap::new();
-    for figi_infos in company_info.securities.values() {
-        for figi_info in figi_infos {
-            if !figi_info.isin.is_empty() {
-                let tickers = unique_isin_ticker_pairs
-                    .entry(figi_info.isin.clone())
-                    .or_insert_with(Vec::new);
-                if !figi_info.ticker.is_empty() && !tickers.contains(&figi_info.ticker) {
-                    tickers.push(figi_info.ticker.clone());
-                }
-            }
-        }
-    }
-    // Process each ISIN with validation
-    for (isin, figi_tickers) in unique_isin_ticker_pairs {
-        // Check shutdown before each ISIN
-        if shutdown_flag.load(Ordering::SeqCst) {
-            return Ok(None);
-        }
-        let tickers = isin_tickers_map
-            .entry(isin.clone())
-            .or_insert_with(Vec::new);
-        for figi_ticker in figi_tickers {
-            if !tickers.contains(&figi_ticker) {
-                tickers.push(figi_ticker);
-            }
-        }
-        let has_yahoo_ticker = tickers.iter().any(|t| t.starts_with("YAHOO:"));
-        if !has_yahoo_ticker {
-            logger::log_info(&format!("Fetching Yahoo details for {} (ISIN: {})", name, isin)).await;
-            // Use validated scraping with retry
-            match scrape_with_retry_serial(pool, &isin, 3, shutdown_flag).await {
-                Ok(Some(details)) => {
-                    logger::log_info(&format!(
-                        "✓ Found Yahoo ticker {} for ISIN {} (company: {})",
-                        details.ticker, isin, name
-                    )).await;
-                    tickers.push(format!("YAHOO:{}", details.ticker));
-                    if sector.is_none() && details.sector.is_some() {
-                        sector = details.sector.clone();
-                    }
-                    if exchange.is_none() && details.exchange.is_some() {
-                        exchange = details.exchange.clone();
-                    }
-                },
-                Ok(None) => {
-                    logger::log_warn(&format!("◯ No search results for ISIN {} (company: {})", isin, name)).await;
-                    tickers.push("YAHOO:NO_RESULTS".to_string());
-                },
-                Err(e) => {
-                    if shutdown_flag.load(Ordering::SeqCst) {
-                        return Ok(None);
-                    }
-                    logger::log_warn(&format!(
-                        "✗ Yahoo lookup error for ISIN {} (company: {}): {}",
-                        isin, name, e
-                    )).await;
-                }
-            }
-        }
-    }
-    // Final shutdown check
-    if shutdown_flag.load(Ordering::SeqCst) {
-        return Ok(None);
-    }
-    if !isin_tickers_map.is_empty() {
-        Ok(Some(CompanyCrossPlatformInfo {
-            name,
-            isin_tickers_map,
-            sector,
-            exchange,
-        }))
-    } else {
-        Ok(None)
-    }
-}
-
-/// UPDATED: Scrape with retry for serial processing
-async fn scrape_with_retry_serial(
-    pool: &Arc<ChromeDriverPool>,
-    isin: &str,
-    max_retries: u32,
-    shutdown_flag: &Arc<AtomicBool>,
-) -> anyhow::Result<Option<YahooCompanyDetails>> {
-    let mut retries = 0;
-    loop {
-        if shutdown_flag.load(Ordering::SeqCst) {
-            return Err(anyhow::anyhow!("Aborted due to shutdown"));
-        }
-        match scrape_company_details_by_isin(pool, isin, shutdown_flag).await {
-            Ok(result) => return Ok(result),
-            Err(e) => {
-                if retries >= max_retries {
-                    return Err(e);
-                }
-                let backoff_ms = 1000 * 2u64.pow(retries);
-                let jitter_ms = random_range(0, 500);
-                let total_delay = backoff_ms + jitter_ms;
-                logger::log_warn(&format!(
-                    "Retry {}/{} for ISIN {} after {}ms: {}",
-                    retries + 1, max_retries, isin, total_delay, e
-                )).await;
-                tokio::time::sleep(tokio::time::Duration::from_millis(total_delay)).await;
-                retries += 1;
-            }
-        }
-    }
-}
+/// Cleansing function to remove companies with missing essential yahoo data for integrity
+/// Has to contain a ticker with 'YAHOO:'; Entries with 'YAHOO:NO_RESULTS' and 'YAHOO:ERROR' are removed
+/// The rest stays unchanged
+///
+/// Uses state.jsonl to track completion and avoid re-running the cleansing operation
+/// The '.jsonl' will be saved in the same path but 'companies_yahoo.jsonl'
+/// Only execute when 'companies.jsonl' is present
+pub async fn companies_yahoo_jsonl(paths: &DataPaths) -> anyhow::Result<usize> {
+    use tokio::fs::File;
+    use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
+    use serde_json::json;
+
+    let data_path = paths.data_dir();
+    let input_path = data_path.join("companies.jsonl");
+    let output_path = data_path.join("companies_yahoo.jsonl");
+    let state_path = data_path.join("state.jsonl");
+    // Check if input file exists
+    if !input_path.exists() {
+        logger::log_warn("companies.jsonl not found, skipping cleansing").await;
+        return Ok(0);
+    }
+    // Check if state file exists and cleansing was already completed
+    if state_path.exists() {
+        let state_content = tokio::fs::read_to_string(&state_path).await?;
+        for line in state_content.lines() {
+            if line.trim().is_empty() {
+                continue;
+            }
+            if let Ok(state) = serde_json::from_str::<serde_json::Value>(line) {
+                if state.get("yahoo_companies").and_then(|v| v.as_bool()).unwrap_or(false) {
+                    logger::log_info("  Yahoo companies cleansing already completed, reading existing file...").await;
+                    // Count lines in existing output file
+                    if output_path.exists() {
+                        let output_content = tokio::fs::read_to_string(&output_path).await?;
+                        let count = output_content.lines()
+                            .filter(|line| !line.trim().is_empty())
+                            .count();
+                        logger::log_info(&format!("  ✓ Found {} companies in companies_yahoo.jsonl", count)).await;
+                        return Ok(count);
+                    } else {
+                        logger::log_warn("  State indicates completion but companies_yahoo.jsonl not found, re-running...").await;
+                        break;
+                    }
+                }
+            }
+        }
+    }
+    logger::log_info(&format!("  Reading from: {:?}", input_path)).await;
+    logger::log_info(&format!("  Writing to: {:?}", output_path)).await;
+    let file = File::open(&input_path).await?;
+    let reader = BufReader::new(file);
+    let mut lines = reader.lines();
+    let mut output_file = File::create(&output_path).await?;
+    let mut valid_count = 0;
+    let mut removed_count = 0;
+    let mut total_count = 0;
+    while let Some(line) = lines.next_line().await? {
+        if line.trim().is_empty() {
+            continue;
+        }
+        total_count += 1;
+        let company: CompanyCrossPlatformInfo = match serde_json::from_str(&line) {
+            Ok(c) => c,
+            Err(e) => {
+                logger::log_warn(&format!("  Failed to parse company on line {}: {}", total_count, e)).await;
+                continue;
+            }
+        };
+        // Check if company has at least one valid YAHOO ticker
+        // Valid means: starts with "YAHOO:" but is NOT "YAHOO:NO_RESULTS" or "YAHOO:ERROR"
+        let has_valid_yahoo = company.isin_tickers_map
+            .values()
+            .flatten()
+            .any(|ticker| {
+                ticker.starts_with("YAHOO:")
+                    && ticker != "YAHOO:NO_RESULTS"
+                    && ticker != "YAHOO:ERROR"
+            });
+        if has_valid_yahoo {
+            // Write the company to the filtered output
+            let json_line = serde_json::to_string(&company)?;
+            output_file.write_all(json_line.as_bytes()).await?;
+            output_file.write_all(b"\n").await?;
+            valid_count += 1;
+        } else {
+            removed_count += 1;
+            if removed_count <= 5 {
+                // Log first few removals for debugging
+                logger::log_info(&format!("  Removed company '{}' (no valid Yahoo ticker)", company.name)).await;
+            }
+        }
+        // Progress indicator for large files
+        if total_count % 1000 == 0 {
+            logger::log_info(&format!("  Processed {} companies...", total_count)).await;
+        }
+    }
+    output_file.flush().await?;
+    logger::log_info(&format!(
+        "  ✓ Cleansing complete: {} total {} valid, {} removed",
+        total_count, valid_count, removed_count
+    )).await;
+    // Write state file to mark completion
+    let yahoo_companies = json!({
+        "yahoo_companies": true,
+        "completed_at": chrono::Utc::now().to_rfc3339(),
+    });
+    let mut state_file = File::create(&state_path).await?;
+    let state_line = serde_json::to_string(&yahoo_companies)?;
+    state_file.write_all(state_line.as_bytes()).await?;
+    state_file.write_all(b"\n").await?;
+    state_file.flush().await?;
+    logger::log_info(&format!("  ✓ State file created at: {:?}", state_path)).await;
+    Ok(valid_count)
+}

 async fn find_most_recent_figi_date_dir(paths: &DataPaths) -> anyhow::Result<Option<std::path::PathBuf>> {

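The cleansing step keeps a company only when at least one ticker is a real Yahoo hit: it must start with "YAHOO:" and be neither "YAHOO:NO_RESULTS" nor "YAHOO:ERROR". A standalone restatement of that predicate with a quick test (a sketch; the FIGI ticker in the test is illustrative):

fn has_valid_yahoo_ticker<'a, I: IntoIterator<Item = &'a String>>(tickers: I) -> bool {
    tickers.into_iter().any(|t| {
        t.starts_with("YAHOO:") && t != "YAHOO:NO_RESULTS" && t != "YAHOO:ERROR"
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn filter_matches_diff_semantics() {
        let keep = vec!["FIGI:BBG000B9XRY4".to_string(), "YAHOO:AAPL".to_string()];
        let drop = vec!["YAHOO:NO_RESULTS".to_string(), "YAHOO:ERROR".to_string()];
        assert!(has_valid_yahoo_ticker(&keep));
        assert!(!has_valid_yahoo_ticker(&drop));
    }
}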
View File

@@ -1,18 +1,15 @@
-// src/corporate/update_parallel.rs - UPDATED WITH DATA INTEGRITY FIXES
-// PARALLELIZED VERSION with atomic commits and validation
+// src/corporate/update_parallel.rs - PROPERLY FIXED: Correct pending queue rebuild
 //
-// Key improvements over original:
-// - Page validation to prevent stale content extraction
-// - Shutdown-aware task processing
-// - Better error recovery with browser state cleanup
-// - All original fsync and checkpoint logic preserved
+// Critical fix: After hard reset, only skip companies with COMPLETE Yahoo data
+// Not just companies that have been written
 use super::{types::*, yahoo::*, helpers::*};
 use crate::util::directories::DataPaths;
 use crate::util::logger;
 use crate::scraper::webdriver::ChromeDriverPool;
+use crate::scraper::hard_reset::perform_hard_reset;
+use crate::config::Config;
+use rand::Rng;
 use tokio::sync::mpsc;
 use tokio::io::AsyncWriteExt;
 use tokio::fs::OpenOptions;
@@ -22,7 +19,7 @@ use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::Duration;
 use futures::stream::{FuturesUnordered, StreamExt};
-use anyhow::{anyhow, Context, Result};
+use anyhow::{anyhow, Result};

 /// Represents a write command to be serialized through the log writer
 enum LogCommand {
@@ -37,17 +34,60 @@ struct CompanyProcessResult {
     is_update: bool,
 }

-/// UPDATED: Abort-safe incremental JSONL persistence with validation
-///
-/// New safety features:
-/// - Page validation before extraction
-/// - Shutdown checks at all critical points
-/// - Browser state cleanup on errors
-/// - All writes still atomic with fsync
+/// Check if a company needs Yahoo data processing
+/// Returns true if company has incomplete data (needs processing)
+fn company_needs_processing(
+    company_name: &str,
+    company_info: &CompanyInfo,
+    existing_companies: &HashMap<String, CompanyCrossPlatformInfo>,
+) -> bool {
+    // If company not in existing data at all, definitely needs processing
+    let Some(existing_entry) = existing_companies.get(company_name) else {
+        return true;
+    };
+    // Collect all ISINs this company should have
+    let mut required_isins = std::collections::HashSet::new();
+    for figi_infos in company_info.securities.values() {
+        for figi_info in figi_infos {
+            if !figi_info.isin.is_empty() {
+                required_isins.insert(figi_info.isin.clone());
+            }
+        }
+    }
+    // Check each required ISIN
+    for isin in required_isins {
+        // Check if this ISIN exists in the company's ticker map
+        if let Some(tickers) = existing_entry.isin_tickers_map.get(&isin) {
+            // Check if this ISIN has valid Yahoo data
+            let has_valid_yahoo = tickers.iter().any(|t| {
+                t.starts_with("YAHOO:") &&
+                t != "YAHOO:ERROR" //&& // Error marker means needs retry
+                //t != "YAHOO:NO_RESULTS" // This is actually valid (legitimately not found)
+            });
+            // If no valid Yahoo data for this ISIN, company needs processing
+            if !has_valid_yahoo {
+                return true;
+            }
+        } else {
+            // ISIN not in map at all, needs processing
+            return true;
+        }
+    }
+    // All ISINs have valid Yahoo data, skip this company
+    false
+}
+
+/// Abort-safe incremental JSONL persistence with proper hard reset handling
 pub async fn build_companies_jsonl_streaming_parallel(
     paths: &DataPaths,
     pool: &Arc<ChromeDriverPool>,
     shutdown_flag: &Arc<AtomicBool>,
+    config: &Config,
+    monitoring: &Option<crate::monitoring::MonitoringHandle>,
 ) -> anyhow::Result<usize> {
     // Configuration constants
     const CHECKPOINT_INTERVAL: usize = 50;
@@ -55,17 +95,26 @@ pub async fn build_companies_jsonl_streaming_parallel(
     const FSYNC_INTERVAL_SECS: u64 = 10;
     const CONCURRENCY_LIMIT: usize = 100;

+    // Wrap pool in mutex for potential replacement
+    let pool_mutex = Arc::new(tokio::sync::Mutex::new(Arc::clone(pool)));
+    // Synchronization for hard reset
+    let reset_in_progress = Arc::new(tokio::sync::Mutex::new(false));
     let path = DataPaths::new(".")?;
     let corporate_path = path.data_dir().join("corporate").join("by_name");
-    let securities_path = corporate_path.join("common_stocks.json");
-    if !securities_path.exists() {
-        logger::log_warn("No common_stocks.json found").await;
+    let securities_checkpoint = corporate_path.join("common_stocks.jsonl");
+    let securities_log = corporate_path.join("common_stocks.log.jsonl");
+    if !securities_checkpoint.exists() {
+        logger::log_warn("No common_stocks.jsonl found").await;
         return Ok(0);
     }
-    let content = tokio::fs::read_to_string(securities_path).await?;
-    let securities: HashMap<String, CompanyInfo> = serde_json::from_str(&content)?;
+    // Load securities from checkpoint and replay log
+    logger::log_info("Loading common stocks from JSONL checkpoint and log...").await;
+    let securities = load_securities_from_jsonl(&securities_checkpoint, &securities_log).await?;
+    logger::log_info(&format!("Loaded {} companies from common stocks", securities.len())).await;
     let companies_path = paths.data_dir().join("companies.jsonl");
     let log_path = paths.data_dir().join("companies_updates.log");
@@ -83,8 +132,8 @@ pub async fn build_companies_jsonl_streaming_parallel(
         let existing_content = tokio::fs::read_to_string(&companies_path).await?;
         for line in existing_content.lines() {
-            if line.trim().is_empty() {
-                continue;
+            if line.trim().is_empty() || !line.ends_with('}') {
+                continue; // Skip incomplete lines
             }
             match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
@@ -106,8 +155,8 @@ pub async fn build_companies_jsonl_streaming_parallel(
         let mut replayed = 0;
         for line in log_content.lines() {
-            if line.trim().is_empty() {
-                continue;
+            if line.trim().is_empty() || !line.ends_with('}') {
+                continue; // Skip incomplete lines
             }
             match serde_json::from_str::<CompanyCrossPlatformInfo>(line) {
@@ -139,8 +188,10 @@ pub async fn build_companies_jsonl_streaming_parallel(
     let log_path_clone = log_path.clone();
     let existing_companies_writer = Arc::new(tokio::sync::Mutex::new(existing_companies.clone()));
+    // Clone the Arc for the writer task (Arc clone is cheap, just increments ref count)
+    let existing_companies_writer_for_task = Arc::clone(&existing_companies_writer);
     let write_tx_for_writer = write_tx.clone();
     let writer_task = tokio::spawn(async move {
         let mut log_file = log_file_init;
         let mut writes_since_fsync = 0;
@@ -169,7 +220,7 @@ pub async fn build_companies_jsonl_streaming_parallel(
                     count += 1;
                     // Update in-memory state
-                    let mut existing_companies = existing_companies_writer.lock().await;
+                    let mut existing_companies = existing_companies_writer_for_task.lock().await;
                     let is_update = existing_companies.contains_key(&company.name);
                     existing_companies.insert(company.name.clone(), company);
                     drop(existing_companies);
@@ -207,7 +258,7 @@ pub async fn build_companies_jsonl_streaming_parallel(
                         break;
                     }
-                    let existing_companies = existing_companies_writer.lock().await;
+                    let existing_companies = existing_companies_writer_for_task.lock().await;
                     let companies_vec: Vec<_> = existing_companies.values().cloned().collect();
                     drop(existing_companies);
@@ -279,112 +330,322 @@ pub async fn build_companies_jsonl_streaming_parallel(
         (count, new_count, updated_count)
     });

-    // === PARALLEL PROCESSING PHASE ===
-    logger::log_info(&format!(
-        "Starting parallel processing of {} companies (concurrency limit: {})",
-        securities.len(),
-        CONCURRENCY_LIMIT
-    )).await;
-    let mut processing_tasks = FuturesUnordered::new();
-    let mut processed = 0;
-    let total = securities.len();
-    for (name, company_info) in securities.into_iter() {
-        // Check shutdown before creating new tasks
-        if shutdown_flag.load(Ordering::SeqCst) {
-            logger::log_warn("Shutdown detected, stopping task creation").await;
-            break;
-        }
-        // Wait if we hit concurrency limit
-        while processing_tasks.len() >= CONCURRENCY_LIMIT {
-            if let Some(result) = processing_tasks.next().await {
-                match result {
-                    Ok(Ok(Some(company_result))) => {
-                        let company_result: CompanyProcessResult = company_result;
-                        let _ = write_tx_for_writer.send(LogCommand::Write(company_result.company)).await?;
-                        processed += 1;
-                    }
-                    Ok(Ok(None)) => {
-                        processed += 1;
-                    }
-                    Ok(Err(e)) => {
-                        logger::log_warn(&format!("Company processing error: {}", e)).await;
-                        processed += 1;
-                    }
-                    Err(e) => {
-                        logger::log_error(&format!("Task panic: {}", e)).await;
-                        processed += 1;
-                    }
-                }
-            }
-            if shutdown_flag.load(Ordering::SeqCst) {
-                break;
-            }
-        }
-        if shutdown_flag.load(Ordering::SeqCst) {
-            break;
-        }
-        // Spawn new task
-        let pool = pool.clone();
-        let shutdown_flag = shutdown_flag.clone();
-        let existing_entry = existing_companies.get(&name).cloned();
-        let task = tokio::spawn(async move {
-            process_single_company_validated(
-                name,
-                company_info,
-                existing_entry,
-                &pool,
-                &shutdown_flag
-            ).await
-        });
-        processing_tasks.push(task);
-        if processed % 10 == 0 && processed > 0 {
-            logger::log_info(&format!("Progress: {}/{} companies processed", processed, total)).await;
-        }
-    }
-    // Wait for remaining tasks
-    logger::log_info(&format!(
-        "Waiting for {} remaining tasks to complete...",
-        processing_tasks.len()
-    )).await;
-    while let Some(result) = processing_tasks.next().await {
-        if shutdown_flag.load(Ordering::SeqCst) {
-            logger::log_warn("Shutdown detected during final task wait").await;
-            break;
-        }
-        match result {
-            Ok(Ok(Some(company_result))) => {
-                if write_tx_for_writer.send(LogCommand::Write(company_result.company)).await.is_err() {
-                    logger::log_error("Writer task died").await;
-                    break;
-                }
-                processed += 1;
+    // === MAIN PROCESSING LOOP ===
+    let total = securities.len();
+    logger::log_info(&format!("Processing {} companies with concurrency limit {}", total, CONCURRENCY_LIMIT)).await;
+    let mut tasks = FuturesUnordered::new();
+    // Build initial pending list with proper filtering
+    let mut pending: Vec<(String, CompanyInfo)> = securities.iter()
+        .filter(|(name, info)| company_needs_processing(name, info, &existing_companies))
+        .map(|(name, info)| (name.clone(), info.clone()))
+        .collect();
+    logger::log_info(&format!(
+        "Initial scan: {} companies need processing ({} already complete)",
+        pending.len(),
+        total - pending.len()
+    )).await;
+    let mut processed = 0;
+    let mut hard_reset_count = 0;
+    // Spawn initial batch
+    for _ in 0..CONCURRENCY_LIMIT.min(pending.len()) {
+        if let Some((name, company_info)) = pending.pop() {
+            let current_pool = {
+                let pool_guard = pool_mutex.lock().await;
+                Arc::clone(&*pool_guard)
+            };
+            let existing = existing_companies.get(&name).cloned();
+            let shutdown_flag_clone = Arc::clone(shutdown_flag);
+            let task = tokio::spawn(async move {
+                process_single_company_validated(
+                    name,
+                    company_info,
+                    existing,
+                    &current_pool,
+                    &shutdown_flag_clone,
+                ).await
+            });
+            tasks.push(task);
+        }
+    }
+    // Process results and spawn new tasks
+    while let Some(task_result) = tasks.next().await {
+        // Check for shutdown
+        if shutdown_flag.load(Ordering::SeqCst) {
+            logger::log_warn("Shutdown signal received, stopping processing").await;
+            break;
+        }
+        match task_result {
+            Ok(Ok(Some(result))) => {
+                // Success: send to writer
+                let _ = write_tx_for_writer.send(LogCommand::Write(result.company)).await;
+                processed += 1;
+                // Log progress every 100 companies
+                if processed % 100 == 0 {
+                    logger::log_info(&format!(
+                        "Progress: {}/{} companies processed ({} resets)",
+                        processed,
+                        total,
+                        hard_reset_count
+                    )).await;
+                }
+                // Spawn next task if available
+                if let Some((name, company_info)) = pending.pop() {
+                    let current_pool = {
let pool_guard = pool_mutex.lock().await;
Arc::clone(&*pool_guard)
};
let existing = existing_companies.get(&name).cloned();
let shutdown_flag_clone = Arc::clone(shutdown_flag);
let task = tokio::spawn(async move {
process_single_company_validated(
name,
company_info,
existing,
&current_pool,
&shutdown_flag_clone,
).await
});
tasks.push(task);
}
} }
Ok(Ok(None)) => { Ok(Ok(None)) => {
// No result (shutdown or skip)
processed += 1; processed += 1;
if let Some((name, company_info)) = pending.pop() {
let current_pool = {
let pool_guard = pool_mutex.lock().await;
Arc::clone(&*pool_guard)
};
let existing = existing_companies.get(&name).cloned();
let shutdown_flag_clone = Arc::clone(shutdown_flag);
let task = tokio::spawn(async move {
process_single_company_validated(
name,
company_info,
existing,
&current_pool,
&shutdown_flag_clone,
).await
});
tasks.push(task);
}
} }
Ok(Err(e)) => { Ok(Err(e)) => {
logger::log_warn(&format!("Company processing error: {}", e)).await; let error_msg = e.to_string();
processed += 1;
if error_msg.contains("HARD_RESET_REQUIRED") {
// Check if reset already in progress (race condition protection)
let mut reset_lock = reset_in_progress.lock().await;
if *reset_lock {
logger::log_info("Hard reset already in progress, skipping duplicate").await;
processed += 1;
continue;
}
*reset_lock = true;
drop(reset_lock); // Release lock during reset
logger::log_error("🔴 HARD RESET THRESHOLD REACHED - INITIATING RESET SEQUENCE").await;
logger::log_warn("Draining active tasks before hard reset...").await;
// Save remaining pending count
let remaining_count = pending.len();
// Stop spawning new tasks
pending.clear();
// Wait for all active tasks to complete
let mut drained = 0;
while let Some(_) = tasks.next().await {
drained += 1;
if drained % 10 == 0 {
logger::log_info(&format!("Drained {} tasks...", drained)).await;
}
}
logger::log_info(&format!(
"All tasks drained ({} active). {} companies need reprocessing.",
drained,
remaining_count
)).await;
// Perform the actual hard reset
match perform_hard_reset(&pool_mutex, config, paths, monitoring, shutdown_flag).await {
Ok(()) => {
logger::log_info("✅ Hard reset completed successfully").await;
hard_reset_count += 1;
// Reset the error counter
{
let pool_guard = pool_mutex.lock().await;
let current_pool = Arc::clone(&*pool_guard);
current_pool.get_reset_controller().reset();
}
logger::log_info("✓ Error counter cleared").await;
// Rebuild pending list by checking which companies need processing
logger::log_info("Rebuilding pending queue with proper Yahoo data checks...").await;
// Get current state of written companies
let current_existing = {
let companies = existing_companies_writer.lock().await;
companies.clone()
};
// Reload all securities from disk (checkpoint + log)
logger::log_info("Reloading securities from JSONL...").await;
let all_securities = load_securities_from_jsonl(&securities_checkpoint, &securities_log).await?;
logger::log_info(&format!("Reloaded {} companies", all_securities.len())).await;
// Build pending list: only companies that need processing
pending = all_securities.iter()
.filter(|(name, info)| company_needs_processing(name, info, &current_existing))
.map(|(name, info)| (name.clone(), info.clone()))
.collect();
logger::log_info(&format!(
"Restarting with {} remaining companies (out of {} total)",
pending.len(),
total
)).await;
// Only continue if there's work to do
if pending.is_empty() {
logger::log_info("All companies have complete data, exiting").await;
// Clear reset flag
let mut reset_lock = reset_in_progress.lock().await;
*reset_lock = false;
drop(reset_lock);
break; // Exit main loop
}
// Respawn initial batch with NEW pool
for _ in 0..CONCURRENCY_LIMIT.min(pending.len()) {
if let Some((name, company_info)) = pending.pop() {
let current_pool = {
let pool_guard = pool_mutex.lock().await;
Arc::clone(&*pool_guard)
};
let existing = existing_companies.get(&name).cloned();
let shutdown_flag_clone = Arc::clone(shutdown_flag);
let task = tokio::spawn(async move {
process_single_company_validated(
name,
company_info,
existing,
&current_pool,
&shutdown_flag_clone,
).await
});
tasks.push(task);
}
}
// Clear reset flag
let mut reset_lock = reset_in_progress.lock().await;
*reset_lock = false;
drop(reset_lock);
// ✅ Continue processing (don't spawn duplicate task)
continue;
}
Err(reset_err) => {
logger::log_error(&format!("Hard reset failed: {}", reset_err)).await;
// Clear reset flag
let mut reset_lock = reset_in_progress.lock().await;
*reset_lock = false;
drop(reset_lock);
// Exit if hard reset fails
break;
}
}
} else {
// Regular error
logger::log_warn(&format!("Company processing error: {}", error_msg)).await;
processed += 1;
// Spawn next task
if let Some((name, company_info)) = pending.pop() {
let current_pool = {
let pool_guard = pool_mutex.lock().await;
Arc::clone(&*pool_guard)
};
let existing = existing_companies.get(&name).cloned();
let shutdown_flag_clone = Arc::clone(shutdown_flag);
let task = tokio::spawn(async move {
process_single_company_validated(
name,
company_info,
existing,
&current_pool,
&shutdown_flag_clone,
).await
});
tasks.push(task);
}
}
} }
Err(e) => { Err(e) => {
// Task panic
logger::log_error(&format!("Task panic: {}", e)).await; logger::log_error(&format!("Task panic: {}", e)).await;
processed += 1; processed += 1;
// Spawn next task
if let Some((name, company_info)) = pending.pop() {
let current_pool = {
let pool_guard = pool_mutex.lock().await;
Arc::clone(&*pool_guard)
};
let existing = existing_companies.get(&name).cloned();
let shutdown_flag_clone = Arc::clone(shutdown_flag);
let task = tokio::spawn(async move {
process_single_company_validated(
name,
company_info,
existing,
&current_pool,
&shutdown_flag_clone,
).await
});
tasks.push(task);
}
} }
} }
} }
logger::log_info("Main processing loop completed").await;
// Signal writer to finish // Signal writer to finish
let _ = write_tx_for_writer.send(LogCommand::Checkpoint).await; let _ = write_tx_for_writer.send(LogCommand::Checkpoint).await;
let _ = write_tx_for_writer.send(LogCommand::Shutdown).await; let _ = write_tx_for_writer.send(LogCommand::Shutdown).await;
@@ -395,13 +656,69 @@ pub async fn build_companies_jsonl_streaming_parallel(
.unwrap_or((0, 0, 0)); .unwrap_or((0, 0, 0));
logger::log_info(&format!( logger::log_info(&format!(
"Completed: {} total companies ({} new, {} updated)", "Completed: {} total companies ({} new, {} updated, {} hard resets)",
final_count, final_new, final_updated final_count, final_new, final_updated, hard_reset_count
)).await; )).await;
Ok(final_count) Ok(final_count)
} }
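For orientation, the writer shutdown at the end of the loop is a fixed three-step handshake; condensed from the code above (same channel, command, and task names):
// Condensed writer handshake, as used above: flush a final checkpoint,
// request shutdown, then join the writer and fall back to zeroed
// counters if it panicked.
let _ = write_tx_for_writer.send(LogCommand::Checkpoint).await;
let _ = write_tx_for_writer.send(LogCommand::Shutdown).await;
let (final_count, final_new, final_updated) = writer_task.await.unwrap_or((0, 0, 0));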
/// Loads CompanyInfo securities from checkpoint and log JSONL files
async fn load_securities_from_jsonl(
checkpoint_path: &std::path::Path,
log_path: &std::path::Path,
) -> anyhow::Result<HashMap<String, CompanyInfo>> {
let mut securities: HashMap<String, CompanyInfo> = HashMap::new();
// Load checkpoint
if checkpoint_path.exists() {
let content = tokio::fs::read_to_string(checkpoint_path).await?;
for (line_num, line) in content.lines().enumerate() {
if line.trim().is_empty() || !line.ends_with('}') {
continue; // Skip incomplete lines
}
match serde_json::from_str::<CompanyInfo>(line) {
Ok(company_info) => {
securities.insert(company_info.name.clone(), company_info);
}
Err(e) => {
logger::log_warn(&format!(
"Skipping invalid line {} in checkpoint: {}",
line_num + 1, e
)).await;
}
}
}
}
// Replay log (overwrites checkpoint entries if they exist)
if log_path.exists() {
let content = tokio::fs::read_to_string(log_path).await?;
for (line_num, line) in content.lines().enumerate() {
if line.trim().is_empty() || !line.ends_with('}') {
continue; // Skip incomplete lines
}
match serde_json::from_str::<CompanyInfo>(line) {
Ok(company_info) => {
securities.insert(company_info.name.clone(), company_info);
}
Err(e) => {
logger::log_warn(&format!(
"Skipping invalid line {} in log: {}",
line_num + 1, e
)).await;
}
}
}
}
Ok(securities)
}
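This loader pairs with the hard-reset path above, which reloads everything from disk and re-filters; a minimal sketch of that pairing (the helper name is hypothetical, and it assumes company_needs_processing and the maps used above are in scope):
// Hypothetical helper mirroring the rebuild step in the reset path:
// reload checkpoint + log, then keep only companies that still need work.
async fn rebuild_pending(
    checkpoint: &std::path::Path,
    log: &std::path::Path,
    existing: &HashMap<String, CompanyCrossPlatformInfo>,
) -> anyhow::Result<Vec<(String, CompanyInfo)>> {
    let all = load_securities_from_jsonl(checkpoint, log).await?;
    Ok(all.iter()
        .filter(|(name, info)| company_needs_processing(name, info, existing))
        .map(|(name, info)| (name.clone(), info.clone()))
        .collect())
}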
/// Scrape with retry, validation, and shutdown awareness /// Scrape with retry, validation, and shutdown awareness
async fn scrape_with_retry( async fn scrape_with_retry(
pool: &Arc<ChromeDriverPool>, pool: &Arc<ChromeDriverPool>,
@@ -416,10 +733,25 @@ async fn scrape_with_retry(
if shutdown_flag.load(Ordering::SeqCst) { if shutdown_flag.load(Ordering::SeqCst) {
return Err(anyhow!("Aborted due to shutdown")); return Err(anyhow!("Aborted due to shutdown"));
} }
if pool.should_perform_hard_reset() {
logger::log_error("HARD_RESET_REQUIRED detected before scrape attempt").await;
return Err(anyhow!("HARD_RESET_REQUIRED"));
}
match scrape_company_details_by_isin(pool, isin, shutdown_flag).await { match scrape_company_details_by_isin(pool, isin, shutdown_flag).await {
Ok(result) => return Ok(result), Ok(result) => return Ok(result),
Err(e) => { Err(e) => {
// Check if this is a hard reset required error
let error_msg = e.to_string();
if error_msg.contains("HARD_RESET_REQUIRED") {
logger::log_error(&format!(
"Hard reset required error for ISIN {}, propagating immediately",
isin
)).await;
return Err(e); // Propagate immediately, don't retry
}
if retries >= max_retries { if retries >= max_retries {
logger::log_error(&format!( logger::log_error(&format!(
"All {} retries exhausted for ISIN {}: {}", "All {} retries exhausted for ISIN {}: {}",
@@ -444,7 +776,7 @@ async fn scrape_with_retry(
} }
} }
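Note that the retry policy classifies errors by message substring; restated compactly (the helper name is illustrative only):
// Illustrative condensation of the retry policy above: HARD_RESET_REQUIRED
// is fatal and propagates immediately; every other error is retried until
// max_retries is exhausted, with the shutdown flag checked before each try.
fn is_fatal(e: &anyhow::Error) -> bool {
    e.to_string().contains("HARD_RESET_REQUIRED")
}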
/// UPDATED: Process single company with validation and shutdown checks /// Process single company with validation and shutdown checks
async fn process_single_company_validated( async fn process_single_company_validated(
name: String, name: String,
company_info: CompanyInfo, company_info: CompanyInfo,
@@ -486,7 +818,7 @@ async fn process_single_company_validated(
} }
} }
// Process each ISIN with validation // Process each ISIN independently with per-ISIN status checking
for (isin, figi_tickers) in unique_isin_ticker_pairs { for (isin, figi_tickers) in unique_isin_ticker_pairs {
// Check shutdown before each ISIN // Check shutdown before each ISIN
if shutdown_flag.load(Ordering::SeqCst) { if shutdown_flag.load(Ordering::SeqCst) {
@@ -507,10 +839,15 @@ async fn process_single_company_validated(
} }
} }
let has_yahoo_ticker = tickers.iter().any(|t| t.starts_with("YAHOO:")); // Check if THIS SPECIFIC ISIN has valid Yahoo data (not ERROR)
let has_valid_yahoo = tickers.iter().any(|t| {
t.starts_with("YAHOO:") && t != "YAHOO:ERROR"
// Note: YAHOO:NO_RESULTS is valid (legitimately not found)
});
if !has_yahoo_ticker { if !has_valid_yahoo {
logger::log_info(&format!("Fetching Yahoo details for {} (ISIN: {})", name, isin)).await; logger::log_info(&format!("Fetching Yahoo details for {} (ISIN: {})", name, isin)).await;
tickers.retain(|t| !t.starts_with("YAHOO:"));
match scrape_with_retry(pool, &isin, 3, shutdown_flag).await { match scrape_with_retry(pool, &isin, 3, shutdown_flag).await {
Ok(Some(details)) => { Ok(Some(details)) => {
@@ -540,11 +877,24 @@ async fn process_single_company_validated(
logger::log_warn(&format!("Shutdown during scrape for ISIN {}", isin)).await; logger::log_warn(&format!("Shutdown during scrape for ISIN {}", isin)).await;
break; break;
} }
// Check if this is a hard reset required error
let error_msg = e.to_string();
if error_msg.contains("HARD_RESET_REQUIRED") {
logger::log_error(&format!(
"Hard reset required during ISIN {} processing, propagating error",
isin
)).await;
return Err(e); // ← CRITICAL: Propagate immediately
}
logger::log_warn(&format!( logger::log_warn(&format!(
"✗ Yahoo lookup error for ISIN {} (company: {}): {}", "✗ Yahoo lookup error for ISIN {} (company: {}): {}",
isin, name, e isin, name, e
)).await; )).await;
// Continue with next ISIN
// Mark this ISIN as failed to enable retry
tickers.push("YAHOO:ERROR".to_string());
} }
} }
} }
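The per-ISIN Yahoo state is encoded in ticker sentinels; the convention, with the check above restated as an illustrative predicate:
// Sentinel convention for per-ISIN Yahoo state (values from this diff):
//   "YAHOO:<symbol>"   → resolved ticker (valid)
//   "YAHOO:NO_RESULTS" → legitimately not found (valid, never retried)
//   "YAHOO:ERROR"      → failed lookup, retried on the next pass
fn isin_needs_yahoo_retry(tickers: &[String]) -> bool {
    !tickers.iter().any(|t| t.starts_with("YAHOO:") && t != "YAHOO:ERROR")
}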
@@ -559,6 +909,11 @@ async fn process_single_company_validated(
return Ok(None); return Ok(None);
} }
if pool.should_perform_hard_reset() {
logger::log_error("HARD_RESET_REQUIRED detected during company processing").await;
return Err(anyhow!("HARD_RESET_REQUIRED"));
}
if !isin_tickers_map.is_empty() { if !isin_tickers_map.is_empty() {
let company_entry = CompanyCrossPlatformInfo { let company_entry = CompanyCrossPlatformInfo {
name: name.clone(), name: name.clone(),

View File

@@ -74,6 +74,11 @@ pub async fn scrape_company_details_by_isin(
logger::log_warn(&format!("Shutdown detected, skipping ISIN: {}", isin)).await; logger::log_warn(&format!("Shutdown detected, skipping ISIN: {}", isin)).await;
return Ok(None); return Ok(None);
} }
if pool.should_perform_hard_reset() {
logger::log_warn("HARD_RESET_REQUIRED detected before starting ISIN scrape").await;
return Err(anyhow!("HARD_RESET_REQUIRED"));
}
let isin_owned = isin.to_string(); let isin_owned = isin.to_string();
let shutdown_clone = Arc::clone(shutdown_flag); let shutdown_clone = Arc::clone(shutdown_flag);

View File

@@ -1,4 +1,4 @@
// src/main.rs // src/main.rs - FIXED: Proper temp pool cleanup
use web_scraper::{*, scraper, economic, corporate}; use web_scraper::{*, scraper, economic, corporate};
@@ -66,25 +66,63 @@ async fn main() -> Result<()> {
logger::log_info("Monitoring dashboard available at http://localhost:3030").await; logger::log_info("Monitoring dashboard available at http://localhost:3030").await;
logger::init_debug_logger(paths.logs_dir()).await.ok(); logger::init_debug_logger(paths.logs_dir()).await.ok();
logger::log_info("=== Event Backtest Engine Started ===").await; logger::log_info("=== Economic Webscraper Started ===").await;
logger::log_info(&format!( logger::log_info(&format!(
"Config → parallel_instances: {}, task_limit: {} vpn_rotation: {}", "Config → parallel_instances: {}, task_limit: {} vpn_rotation: {} proxy_instances_per_certificate: {:?}",
config.max_parallel_instances, config.max_parallel_instances,
config.max_tasks_per_instance, config.max_tasks_per_instance,
config.enable_vpn_rotation config.enable_vpn_rotation,
config.proxy_instances_per_certificate
)).await; )).await;
let number_proxy_instances_per_certificate = config.proxy_instances_per_certificate.unwrap_or(1);
// Simple shutdown flag // Simple shutdown flag
let shutdown_flag = Arc::new(AtomicBool::new(false)); let shutdown_flag = Arc::new(AtomicBool::new(false));
// === Step 1: Fetch VPNBook configs === // === Step 1: Fetch VPNBook configs ===
let proxy_pool: Option<Arc<DockerVpnProxyPool>> = if config.enable_vpn_rotation { let proxy_pool: Option<Arc<DockerVpnProxyPool>> = if config.enable_vpn_rotation {
logger::log_info("VPN Rotation Enabled Fetching latest VPNBook configs").await; logger::log_info("VPN Rotation Enabled Fetching latest VPNBook configs").await;
let temp_pool = Arc::new(ChromeDriverPool::new_with_proxy_and_task_limit(None, &config, Some(monitoring_handle.clone())).await?);
// Create temp pool and ensure it's properly shut down
logger::log_info("Creating temporary ChromeDriver pool for VPN credential fetch...").await;
let temp_pool = Arc::new(ChromeDriverPool::new_with_proxy_and_task_limit(
None,
&config,
Some(monitoring_handle.clone())
).await?);
logger::log_info("Fetching VPNBook credentials...").await;
let (username, password, _files) = opnv::fetch_vpnbook_configs(&temp_pool, paths.cache_dir()).await?; let (username, password, _files) = opnv::fetch_vpnbook_configs(&temp_pool, paths.cache_dir()).await?;
logger::log_info(&format!("VPNBook credentials → User: {}", username)).await; logger::log_info(&format!("VPNBook credentials → User: {}", username)).await;
// Properly shutdown temp pool with error handling
logger::log_info("Shutting down temporary pool...").await;
match temp_pool.shutdown().await {
Ok(()) => {
logger::log_info("✓ Temporary pool shut down successfully").await;
}
Err(e) => {
logger::log_error(&format!("✗ Temp pool shutdown error: {}", e)).await;
// Force-kill as backup
#[cfg(target_os = "windows")]
{
let _ = tokio::process::Command::new("taskkill")
.args(["/F", "/IM", "chrome.exe"])
.output()
.await;
let _ = tokio::process::Command::new("taskkill")
.args(["/F", "/IM", "chromedriver.exe"])
.output()
.await;
}
}
}
// Wait a moment for cleanup
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
let server_count = std::fs::read_dir(paths.cache_openvpn_dir())? let server_count = std::fs::read_dir(paths.cache_openvpn_dir())?
.filter(|e| e.as_ref().unwrap().path().is_dir()) .filter(|e| e.as_ref().unwrap().path().is_dir())
.count(); .count();
@@ -94,7 +132,7 @@ async fn main() -> Result<()> {
None None
} else { } else {
logger::log_info(&format!("Found {} VPN servers starting Docker proxy containers", server_count)).await; logger::log_info(&format!("Found {} VPN servers starting Docker proxy containers", server_count)).await;
let pp = Arc::new(DockerVpnProxyPool::new(paths.cache_openvpn_dir(), username, password).await?); let pp = Arc::new(DockerVpnProxyPool::new(paths.cache_openvpn_dir(), username, password, number_proxy_instances_per_certificate).await?);
logger::log_info(&format!("All {} Docker proxy containers started and ready", pp.num_proxies())).await; logger::log_info(&format!("All {} Docker proxy containers started and ready", pp.num_proxies())).await;
for i in 0..pp.num_proxies() { for i in 0..pp.num_proxies() {
@@ -115,10 +153,10 @@ async fn main() -> Result<()> {
}; };
// === Step 2: Initialize ChromeDriver pool === // === Step 2: Initialize ChromeDriver pool ===
let pool_size = config.max_parallel_instances; let pool_size_limit = config.max_parallel_instances;
let task_limit = config.max_tasks_per_instance; let task_limit = config.max_tasks_per_instance;
logger::log_info(&format!("Creating ChromeDriver pool with {} instances...", pool_size)).await; logger::log_info(&format!("Creating ChromeDriver pool with {} instances...", pool_size_limit)).await;
let pool = Arc::new( let pool = Arc::new(
if task_limit > 0 { if task_limit > 0 {
@@ -128,7 +166,7 @@ async fn main() -> Result<()> {
} }
); );
logger::log_info(&format!("ChromeDriver pool ready with {} instances", pool_size)).await; logger::log_info(&format!("ChromeDriver pool ready with {} instances", pool_size_limit)).await;
// === Step 3: Ctrl+C handler === // === Step 3: Ctrl+C handler ===
{ {
@@ -146,27 +184,53 @@ async fn main() -> Result<()> {
// Wait a bit for tasks to notice // Wait a bit for tasks to notice
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
// Cleanup // ✅ FIXED: Better error handling during shutdown
if let Err(e) = (&*pool_clone).shutdown().await { logger::log_info("Shutting down ChromeDriver pool...").await;
logger::log_error(&format!("Error during pool shutdown: {}", e)).await; match (&*pool_clone).shutdown().await {
Ok(()) => {
logger::log_info("✓ ChromeDriver pool shut down successfully").await;
}
Err(e) => {
logger::log_error(&format!("✗ Pool shutdown error: {}", e)).await;
}
} }
if let Some(pp) = proxy_clone { if let Some(pp) = proxy_clone {
if let Err(e) = pp.shutdown().await { logger::log_info("Stopping Docker VPN proxy containers...").await;
logger::log_warn(&format!("Failed to stop Docker containers: {}", e)).await; match pp.shutdown().await {
} else { Ok(()) => {
logger::log_info("All Docker VPN containers stopped").await; logger::log_info("All Docker VPN containers stopped").await;
}
Err(e) => {
logger::log_error(&format!("✗ Proxy shutdown error: {}", e)).await;
}
} }
} }
let _ = cleanup_all_proxy_containers().await; let _ = cleanup_all_proxy_containers().await;
// ✅ ADDED: Force-kill any remaining Chrome/ChromeDriver processes
#[cfg(target_os = "windows")]
{
logger::log_info("Force-killing any remaining Chrome processes...").await;
let _ = tokio::process::Command::new("taskkill")
.args(["/F", "/IM", "chrome.exe"])
.output()
.await;
let _ = tokio::process::Command::new("taskkill")
.args(["/F", "/IM", "chromedriver.exe"])
.output()
.await;
}
logger::log_info("Shutdown complete").await;
std::process::exit(0); std::process::exit(0);
}); });
} }
// === Step 4: Run scraping jobs === // === Step 4: Run scraping jobs ===
logger::log_info("--- Starting ECONOMIC data update ---").await; logger::log_info("--- Starting ECONOMIC data update ---").await;
economic::run_full_update(&config, &pool).await?; //economic::run_full_update(&config, &pool).await?;
logger::log_info("Economic update completed").await; logger::log_info("Economic update completed").await;
if !shutdown_flag.load(Ordering::SeqCst) { if !shutdown_flag.load(Ordering::SeqCst) {
@@ -178,14 +242,60 @@ async fn main() -> Result<()> {
// === Step 5: Final cleanup === // === Step 5: Final cleanup ===
if !shutdown_flag.load(Ordering::SeqCst) { if !shutdown_flag.load(Ordering::SeqCst) {
logger::log_info("Shutting down ChromeDriver pool...").await; logger::log_info("Shutting down ChromeDriver pool...").await;
pool.shutdown().await?; match pool.shutdown().await {
Ok(()) => {
logger::log_info("✓ ChromeDriver pool shut down successfully").await;
}
Err(e) => {
logger::log_error(&format!("✗ Pool shutdown error: {}", e)).await;
}
}
if let Some(pp) = proxy_pool { if let Some(pp) = proxy_pool {
logger::log_info("Stopping Docker VPN proxy containers...").await; logger::log_info("Stopping Docker VPN proxy containers...").await;
pp.shutdown().await?; match pp.shutdown().await {
Ok(()) => {
logger::log_info("✓ All Docker VPN containers stopped").await;
}
Err(e) => {
logger::log_error(&format!("✗ Proxy shutdown error: {}", e)).await;
}
}
cleanup_all_proxy_containers().await.ok(); cleanup_all_proxy_containers().await.ok();
} }
// ✅ ADDED: Final force-kill to ensure no leaks
#[cfg(target_os = "windows")]
{
logger::log_info("Final cleanup: force-killing any remaining Chrome processes...").await;
tokio::process::Command::new("taskkill")
.args(["/F", "/IM", "chrome.exe"])
.output()
.await
.ok();
tokio::process::Command::new("taskkill")
.args(["/F", "/IM", "chromedriver.exe"])
.output()
.await
.ok();
// Verify cleanup
if let Ok(output) = tokio::process::Command::new("tasklist")
.args(["/FI", "IMAGENAME eq chrome.exe"])
.output()
.await
{
let stdout = String::from_utf8_lossy(&output.stdout);
let chrome_count = stdout.lines().filter(|line| line.contains("chrome.exe")).count();
if chrome_count > 0 {
logger::log_warn(&format!("⚠️ {} Chrome processes still running after cleanup!", chrome_count)).await;
} else {
logger::log_info("✓ All Chrome processes cleaned up").await;
}
}
}
logger::log_info("=== Application finished successfully ===").await; logger::log_info("=== Application finished successfully ===").await;
} }

View File

@@ -22,6 +22,11 @@ pub enum MonitoringEvent {
instance_id: usize, instance_id: usize,
status: InstanceStatusChange, status: InstanceStatusChange,
}, },
InstanceSelected {
instance_id: usize,
half: usize,
},
// Task execution // Task execution
TaskStarted { TaskStarted {

View File

@@ -107,6 +107,10 @@ impl MonitoringService {
} }
} }
MonitoringEvent::InstanceSelected { instance_id, half } => {
self.log_info(format!("Instance #{} selected (half {})", instance_id, half)).await;
}
MonitoringEvent::TaskStarted { instance_id, url } => { MonitoringEvent::TaskStarted { instance_id, url } => {
let mut state = self.state.write().await; let mut state = self.state.write().await;
if let Some(inst) = state.instances.get_mut(&instance_id) { if let Some(inst) = state.instances.get_mut(&instance_id) {

View File

@@ -10,7 +10,16 @@ pub struct DockerVpnProxyPool {
} }
impl DockerVpnProxyPool { impl DockerVpnProxyPool {
pub async fn new(ovpn_dir: &Path, username: String, password: String) -> Result<Self> { pub async fn new(
ovpn_dir: &Path,
username: String,
password: String,
instances_per_ovpn: usize,
) -> Result<Self> {
if instances_per_ovpn == 0 {
return Err(anyhow!("instances_per_ovpn must be at least 1"));
}
// Count hostnames (subdirs in ovpn_dir) // Count hostnames (subdirs in ovpn_dir)
let hostnames: Vec<_> = std::fs::read_dir(ovpn_dir)? let hostnames: Vec<_> = std::fs::read_dir(ovpn_dir)?
.filter_map(Result::ok) .filter_map(Result::ok)
@@ -23,14 +32,21 @@ impl DockerVpnProxyPool {
return Err(anyhow!("No VPN hostnames found in {:?}", ovpn_dir)); return Err(anyhow!("No VPN hostnames found in {:?}", ovpn_dir));
} }
crate::util::logger::log_info(&format!("Found {} VPN hostnames", num_servers)).await; // Calculate total containers: hostnames × instances_per_ovpn
let total_containers = num_servers * instances_per_ovpn;
let mut container_names = Vec::with_capacity(num_servers); crate::util::logger::log_info(&format!(
let mut proxy_ports = Vec::with_capacity(num_servers); "Found {} VPN hostnames × {} instances = {} total containers",
num_servers, instances_per_ovpn, total_containers
)).await;
let mut container_names = Vec::with_capacity(total_containers);
let mut proxy_ports = Vec::with_capacity(total_containers);
let base_port: u16 = 10800; let base_port: u16 = 10800;
let mut port_counter = 0u16;
// === STEP 1: Start ALL containers first === // === STEP 1: Start ALL containers first ===
for (i, hostname) in hostnames.iter().enumerate() { for hostname in hostnames.iter() {
// Pick tcp443.ovpn if exists, else first .ovpn // Pick tcp443.ovpn if exists, else first .ovpn
let hostname_dir = ovpn_dir.join(hostname); let hostname_dir = ovpn_dir.join(hostname);
let mut ovpn_path: Option<PathBuf> = None; let mut ovpn_path: Option<PathBuf> = None;
@@ -48,48 +64,58 @@ impl DockerVpnProxyPool {
let ovpn_path = ovpn_path.ok_or_else(|| anyhow!("No .ovpn found for {}", hostname))?; let ovpn_path = ovpn_path.ok_or_else(|| anyhow!("No .ovpn found for {}", hostname))?;
let name = format!("vpn-proxy-{}", i); // Spawn multiple instances for this .ovpn file
let port = base_port + i as u16 + 1; for instance_num in 0..instances_per_ovpn {
let name = format!("vpn-proxy-{}-{}", hostname, instance_num);
let port = base_port + port_counter + 1;
port_counter += 1;
// Clean up any existing container with the same name // Clean up any existing container with the same name
let _ = Command::new("docker") let _ = Command::new("docker")
.args(["rm", "-f", &name]) .args(["rm", "-f", &name])
.status() .status()
.await; .await;
// Run Docker container // Run Docker container
let status = Command::new("docker") let status = Command::new("docker")
.args([ .args([
"run", "-d", "run", "-d",
"--name", &name, "--name", &name,
"--cap-add=NET_ADMIN", "--cap-add=NET_ADMIN",
"--device", "/dev/net/tun", "--device", "/dev/net/tun",
"--sysctl", "net.ipv4.ip_forward=1", "--sysctl", "net.ipv4.ip_forward=1",
"-v", &format!("{}:/vpn/config.ovpn", ovpn_path.display()), "-v", &format!("{}:/vpn/config.ovpn", ovpn_path.display()),
"-e", &format!("VPN_USERNAME={}", username), "-e", &format!("VPN_USERNAME={}", username),
"-e", &format!("VPN_PASSWORD={}", password), "-e", &format!("VPN_PASSWORD={}", password),
"-p", &format!("{}:1080", port), "-p", &format!("{}:1080", port),
"rust-vpn-proxy", "rust-vpn-proxy",
]) ])
.status() .status()
.await .await
.context("Failed to run Docker")?; .context("Failed to run Docker")?;
if !status.success() { if !status.success() {
return Err(anyhow!("Docker run failed for {}", name)); return Err(anyhow!("Docker run failed for {}", name));
}
crate::util::logger::log_info(&format!(
"Started container {} on port {} (using {})",
name, port, ovpn_path.file_name().unwrap().to_string_lossy()
)).await;
container_names.push(name);
proxy_ports.push(port);
} }
crate::util::logger::log_info(&format!("Started container {} on port {} (waiting for VPN...)", name, port)).await;
container_names.push(name);
proxy_ports.push(port);
} }
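With instances_per_ovpn in play, the container/port layout is a plain product; the numbering in the loop above reduces to:
// Port numbering sketch (base_port and the +1 offset as in the loop):
// hostname h (0-based), instance k of n per certificate.
fn proxy_port(base_port: u16, h: u16, n: u16, k: u16) -> u16 {
    base_port + (h * n + k) + 1
}
// e.g. base 10800, n = 2 → host 0: 10801, 10802; host 1: 10803, 10804.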
// Brief pause to let containers start // Brief pause to let containers start
sleep(Duration::from_secs(8)).await; sleep(Duration::from_secs(8)).await;
crate::util::logger::log_info(&format!("All {} containers started, beginning health checks...", container_names.len())).await; crate::util::logger::log_info(&format!(
"All {} containers started, beginning health checks...",
container_names.len()
)).await;
// === STEP 2: Test ALL proxies in parallel with 10-second intervals === // === STEP 2: Test ALL proxies in parallel ===
let results = Self::test_all_proxies_parallel(&container_names, &proxy_ports).await; let results = Self::test_all_proxies_parallel(&container_names, &proxy_ports).await;
// Filter out failed containers // Filter out failed containers
@@ -100,8 +126,10 @@ impl DockerVpnProxyPool {
for (i, (container_name, port)) in container_names.into_iter().zip(proxy_ports.into_iter()).enumerate() { for (i, (container_name, port)) in container_names.into_iter().zip(proxy_ports.into_iter()).enumerate() {
match &results[i] { match &results[i] {
Ok(Some(ip)) => { Ok(Some(ip)) => {
crate::util::logger::log_info(&format!("✓ Container {} on port {} ready with IP: {}", crate::util::logger::log_info(&format!(
container_name, port, ip)).await; "✓ Container {} on port {} ready with IP: {}",
container_name, port, ip
)).await;
working_containers.push(container_name); working_containers.push(container_name);
working_ports.push(port); working_ports.push(port);
} }
@@ -113,14 +141,15 @@ impl DockerVpnProxyPool {
.ok() .ok()
.and_then(|output| String::from_utf8_lossy(&output.stdout).to_string().into()); .and_then(|output| String::from_utf8_lossy(&output.stdout).to_string().into());
crate::util::logger::log_error(&format!("✗ Container {} on port {} ready but IP detection failed. Logs: {:?}", crate::util::logger::log_error(&format!(
container_name, port, logs)).await; "✗ Container {} on port {} ready but IP detection failed. Logs: {:?}",
container_name, port, logs
)).await;
failed_count += 1; failed_count += 1;
// Clean up failed container // Clean up failed container
let _ = Self::cleanup_container(&container_name).await; let _ = Self::cleanup_container(&container_name).await;
} }
Err(e) => { Err(e) => {
// Get container logs to debug
let logs = Command::new("docker") let logs = Command::new("docker")
.args(["logs", "--tail", "20", &container_name]) .args(["logs", "--tail", "20", &container_name])
.output() .output()
@@ -128,8 +157,10 @@ impl DockerVpnProxyPool {
.ok() .ok()
.and_then(|output| String::from_utf8_lossy(&output.stdout).to_string().into()); .and_then(|output| String::from_utf8_lossy(&output.stdout).to_string().into());
crate::util::logger::log_error(&format!("✗ Container {} on port {} failed: {}. Logs: {:?}", crate::util::logger::log_error(&format!(
container_name, port, e, logs)).await; "✗ Container {} on port {} failed: {}. Logs: {:?}",
container_name, port, e, logs
)).await;
failed_count += 1; failed_count += 1;
// Clean up failed container // Clean up failed container
let _ = Self::cleanup_container(&container_name).await; let _ = Self::cleanup_container(&container_name).await;
@@ -138,14 +169,19 @@ impl DockerVpnProxyPool {
} }
if working_containers.is_empty() { if working_containers.is_empty() {
return Err(anyhow!("All {} VPN proxy containers failed to start", num_servers)); return Err(anyhow!("All {} VPN proxy containers failed to start", total_containers));
} }
crate::util::logger::log_info(&format!("Started {}/{} VPN proxy containers successfully", crate::util::logger::log_info(&format!(
working_containers.len(), num_servers)).await; "Started {}/{} VPN proxy containers successfully ({} hostnames × {} instances)",
working_containers.len(), total_containers, num_servers, instances_per_ovpn
)).await;
if failed_count > 0 { if failed_count > 0 {
crate::util::logger::log_warn(&format!("{} containers failed and were cleaned up", failed_count)).await; crate::util::logger::log_warn(&format!(
"{} containers failed and were cleaned up",
failed_count
)).await;
} }
Ok(Self { Ok(Self {

src/scraper/hard_reset.rs (new file, +377)
View File

@@ -0,0 +1,377 @@
// src/scraper/hard_reset.rs - FIXED: Proper cleanup without Arc leaks
use std::sync::{Arc, atomic::{AtomicBool, AtomicUsize, Ordering}};
use crate::{ChromeDriverPool, Config, logger, scraper::docker_vpn_proxy::{DockerVpnProxyPool, cleanup_all_proxy_containers}, util::directories::DataPaths};
/// Simple error counter for triggering hard resets
pub struct HardResetController {
consecutive_errors: AtomicUsize,
}
impl HardResetController {
pub fn new() -> Self {
Self {
consecutive_errors: AtomicUsize::new(0),
}
}
/// Record success - resets counter
pub fn record_success(&self) {
self.consecutive_errors.store(0, Ordering::SeqCst);
}
/// Record error - returns new count
pub fn record_error(&self) -> usize {
self.consecutive_errors.fetch_add(1, Ordering::SeqCst) + 1
}
/// Reset counter
pub fn reset(&self) {
self.consecutive_errors.store(0, Ordering::SeqCst);
}
/// Get current count
pub fn get_count(&self) -> usize {
self.consecutive_errors.load(Ordering::SeqCst)
}
}
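A minimal illustration of the counter contract, assuming the controller above is in scope:
// Errors accumulate monotonically across tasks; a single success
// clears the streak back to zero.
#[test]
fn hard_reset_counter_contract() {
    let ctl = HardResetController::new();
    assert_eq!(ctl.record_error(), 1);
    assert_eq!(ctl.record_error(), 2);
    ctl.record_success();
    assert_eq!(ctl.get_count(), 0);
}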
/// ✅ FIXED: Perform hard reset without Arc reference leaks
///
/// Key improvements:
/// 1. Don't clone old_pool - just shutdown through mutex guard
/// 2. Verify all processes killed before creating new pool
/// 3. Explicitly shutdown temp pools with error handling
/// 4. Add process counting/verification
pub async fn perform_hard_reset(
pool_mutex: &Arc<tokio::sync::Mutex<Arc<ChromeDriverPool>>>,
config: &Config,
paths: &DataPaths,
monitoring: &Option<crate::monitoring::MonitoringHandle>,
shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<()> {
let number_proxy_instances = config.proxy_instances_per_certificate.unwrap_or(1);
logger::log_error("🔴 STARTING HARD RESET SEQUENCE").await;
// Check if shutdown was requested
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown requested during hard reset, aborting").await;
return Ok(());
}
// ===== STEP 1: ACQUIRE POOL LOCK (NO CLONING!) =====
logger::log_info(" [1/12] Acquiring pool lock...").await;
let mut pool_guard = pool_mutex.lock().await;
// Get instance count before shutdown for verification
let old_instance_count = pool_guard.get_number_of_instances();
logger::log_info(&format!(" [1/12] Pool has {} instances", old_instance_count)).await;
// ===== STEP 2: SHUTDOWN OLD POOL (NO ARC CLONE!) =====
logger::log_info(" [2/12] Shutting down old pool (NO Arc clone)...").await;
// Shutdown through the Arc without cloning it
// This is safe because we hold the mutex lock
match pool_guard.shutdown().await {
Ok(()) => {
logger::log_info(" [2/12] ✓ Pool shutdown complete").await;
}
Err(e) => {
logger::log_error(&format!(" [2/12] ✗ Pool shutdown error: {}", e)).await;
// Continue anyway - we'll force-kill processes
}
}
// ===== STEP 3: FORCE-KILL ANY REMAINING CHROME PROCESSES =====
logger::log_info(" [3/12] Force-killing any remaining Chrome/ChromeDriver processes...").await;
#[cfg(target_os = "windows")]
{
// Kill all chrome.exe processes
let chrome_result = tokio::process::Command::new("taskkill")
.args(["/F", "/IM", "chrome.exe"])
.output()
.await;
match chrome_result {
Ok(output) if output.status.success() => {
logger::log_info(" [3/12] ✓ Chrome processes killed").await;
}
_ => {
logger::log_info(" [3/12] ⊘ No Chrome processes found").await;
}
}
// Kill all chromedriver.exe processes
let chromedriver_result = tokio::process::Command::new("taskkill")
.args(["/F", "/IM", "chromedriver.exe"])
.output()
.await;
match chromedriver_result {
Ok(output) if output.status.success() => {
logger::log_info(" [3/12] ✓ ChromeDriver processes killed").await;
}
_ => {
logger::log_info(" [3/12] ⊘ No ChromeDriver processes found").await;
}
}
}
#[cfg(not(target_os = "windows"))]
{
// Kill all chrome processes
let _ = tokio::process::Command::new("pkill")
.arg("chrome")
.output()
.await;
let _ = tokio::process::Command::new("pkill")
.arg("chromedriver")
.output()
.await;
logger::log_info(" [3/12] ✓ Force-killed Chrome/ChromeDriver").await;
}
// ===== STEP 4: SHUTDOWN PROXIES =====
logger::log_info(" [4/12] Shutting down proxy containers...").await;
cleanup_all_proxy_containers().await.ok();
// ===== STEP 5: WAIT FOR CLEANUP =====
logger::log_info(" [5/12] Waiting 30 seconds for cleanup...").await;
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
// ===== STEP 6: VERIFY CLEANUP =====
logger::log_info(" [6/12] Verifying process cleanup...").await;
#[cfg(target_os = "windows")]
{
let check_chrome = tokio::process::Command::new("tasklist")
.args(["/FI", "IMAGENAME eq chrome.exe"])
.output()
.await;
if let Ok(output) = check_chrome {
let stdout = String::from_utf8_lossy(&output.stdout);
let chrome_count = stdout.lines().filter(|line| line.contains("chrome.exe")).count();
if chrome_count > 0 {
logger::log_warn(&format!(" [6/12] ⚠️ {} Chrome processes still running!", chrome_count)).await;
} else {
logger::log_info(" [6/12] ✓ No Chrome processes running").await;
}
}
}
// Check shutdown again
if shutdown_flag.load(Ordering::SeqCst) {
logger::log_warn("Shutdown requested during cleanup, aborting reset").await;
return Ok(());
}
// ===== STEP 7: RECREATE PROXY POOL =====
logger::log_info(" [7/12] Recreating proxy pool...").await;
let new_proxy_pool = if config.enable_vpn_rotation {
match recreate_proxy_pool_with_fresh_credentials(config, paths, monitoring, shutdown_flag).await {
Ok(pool) => {
logger::log_info(&format!(
" [7/12] ✓ Proxy pool created with {} proxies",
pool.num_proxies()
)).await;
Some(pool)
}
Err(e) => {
logger::log_warn(&format!(
" [7/12] ⚠️ Proxy creation failed: {}. Continuing without proxies.",
e
)).await;
None
}
}
} else {
logger::log_info(" [7/12] ⊘ VPN rotation disabled, skipping proxy pool").await;
None
};
// ===== STEP 8: RECREATE CHROMEDRIVER POOL =====
logger::log_info(" [8/12] Recreating ChromeDriver pool...").await;
let new_pool = Arc::new(
ChromeDriverPool::new_with_proxy_and_task_limit(
new_proxy_pool,
config,
monitoring.clone(),
).await?
);
logger::log_info(&format!(
" [8/12] ✓ ChromeDriver pool created with {} instances",
new_pool.get_number_of_instances()
)).await;
// ===== STEP 9: RESET ERROR COUNTER =====
logger::log_info(" [9/12] Resetting error counter...").await;
new_pool.get_reset_controller().reset();
logger::log_info(" [9/12] ✓ Error counter cleared").await;
// ===== STEP 10: REPLACE POOL ATOMICALLY =====
logger::log_info(" [10/12] Activating new pool...").await;
*pool_guard = new_pool;
drop(pool_guard);
logger::log_info(" [10/12] ✓ New pool activated").await;
// ===== STEP 11: EMIT MONITORING EVENT =====
logger::log_info(" [11/12] Updating monitoring...").await;
if let Some(mon) = monitoring {
mon.emit(crate::monitoring::MonitoringEvent::PoolInitialized {
pool_size: config.max_parallel_instances,
with_proxy: config.enable_vpn_rotation,
with_rotation: config.max_tasks_per_instance > 0,
});
}
// ===== STEP 12: FINAL VERIFICATION =====
logger::log_info(" [12/12] Final verification...").await;
#[cfg(target_os = "windows")]
{
let check_chrome = tokio::process::Command::new("tasklist")
.args(["/FI", "IMAGENAME eq chrome.exe"])
.output()
.await;
if let Ok(output) = check_chrome {
let stdout = String::from_utf8_lossy(&output.stdout);
let chrome_count = stdout.lines().filter(|line| line.contains("chrome.exe")).count();
logger::log_info(&format!(" [12/12] Chrome processes: {}", chrome_count)).await;
}
let check_chromedriver = tokio::process::Command::new("tasklist")
.args(["/FI", "IMAGENAME eq chromedriver.exe"])
.output()
.await;
if let Ok(output) = check_chromedriver {
let stdout = String::from_utf8_lossy(&output.stdout);
let chromedriver_count = stdout.lines().filter(|line| line.contains("chromedriver.exe")).count();
logger::log_info(&format!(" [12/12] ChromeDriver processes: {}", chromedriver_count)).await;
}
}
logger::log_info("✅ HARD RESET COMPLETE").await;
Ok(())
}
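The no-clone shutdown only stays safe because callers follow the clone-out-of-the-mutex pattern from the main loop; restating that fragment:
// Swap-safe access pattern (as in the task-spawning loop): each task
// clones the Arc out of the mutex, so replacing *pool_guard inside
// perform_hard_reset only affects tasks spawned after the swap, while
// in-flight tasks keep the old pool alive until they finish.
let current_pool = {
    let pool_guard = pool_mutex.lock().await;
    Arc::clone(&*pool_guard)
};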
/// ✅ FIXED: Recreate proxy pool with temp pool that's properly shut down
async fn recreate_proxy_pool_with_fresh_credentials(
config: &Config,
paths: &DataPaths,
monitoring: &Option<crate::monitoring::MonitoringHandle>,
shutdown_flag: &Arc<AtomicBool>,
) -> anyhow::Result<Arc<DockerVpnProxyPool>> {
let number_proxy_instances = config.proxy_instances_per_certificate.unwrap_or(1);
// Check shutdown
if shutdown_flag.load(Ordering::SeqCst) {
return Err(anyhow::anyhow!("Shutdown requested during proxy recreation"));
}
logger::log_info(" [7.1] Creating temporary ChromeDriver pool for credential fetch...").await;
// Create temporary pool WITHOUT proxy
let temp_pool = Arc::new(
ChromeDriverPool::new_with_proxy_and_task_limit(
None, // No proxy for temp pool
config,
monitoring.clone(),
).await?
);
logger::log_info(" [7.2] Fetching fresh VPNBook credentials...").await;
// Fetch fresh VPNBook credentials
let (username, password, _files) = crate::util::opnv::fetch_vpnbook_configs(
&temp_pool,
paths.cache_dir()
).await?;
logger::log_info(&format!(" [7.3] Got credentials → User: {}", username)).await;
// ✅ FIXED: Properly shutdown temp pool with error handling
logger::log_info(" [7.4] Shutting down temporary pool...").await;
match temp_pool.shutdown().await {
Ok(()) => {
logger::log_info(" [7.4] ✓ Temp pool shut down successfully").await;
}
Err(e) => {
logger::log_error(&format!(" [7.4] ✗ Temp pool shutdown error: {}", e)).await;
// Force-kill processes as backup
#[cfg(target_os = "windows")]
{
let _ = tokio::process::Command::new("taskkill")
.args(["/F", "/IM", "chrome.exe"])
.output()
.await;
let _ = tokio::process::Command::new("taskkill")
.args(["/F", "/IM", "chromedriver.exe"])
.output()
.await;
}
}
}
// Wait a moment for temp pool cleanup
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
// Check shutdown again
if shutdown_flag.load(Ordering::SeqCst) {
return Err(anyhow::anyhow!("Shutdown requested during proxy recreation"));
}
// Check if we have VPN server configs
let server_count = std::fs::read_dir(paths.cache_openvpn_dir())?
.filter(|e| e.as_ref().unwrap().path().is_dir())
.count();
if server_count == 0 {
return Err(anyhow::anyhow!("No VPN servers found after credential fetch"));
}
logger::log_info(&format!(
" [7.5] Found {} VPN servers → Creating proxy pool with {} instances per server...",
server_count,
number_proxy_instances
)).await;
// Create new proxy pool
let proxy_pool = Arc::new(
DockerVpnProxyPool::new(
paths.cache_openvpn_dir(),
username,
password,
number_proxy_instances,
).await?
);
logger::log_info(&format!(
" [7.6] ✓ Proxy pool ready with {} total proxies",
proxy_pool.num_proxies()
)).await;
// Emit proxy connected events for monitoring
if let Some(mon) = monitoring {
for i in 0..proxy_pool.num_proxies() {
if let Some(proxy_info) = proxy_pool.get_proxy_info(i) {
mon.emit(crate::monitoring::MonitoringEvent::ProxyConnected {
container_name: proxy_info.container_name.clone(),
ip_address: proxy_info.ip_address.clone(),
port: proxy_info.port,
});
}
}
}
Ok(proxy_pool)
}

View File

@@ -1,3 +1,4 @@
pub mod webdriver; pub mod webdriver;
pub mod docker_vpn_proxy; pub mod docker_vpn_proxy;
pub mod helpers; pub mod helpers;
pub mod hard_reset;

View File

@@ -1,5 +1,9 @@
// src/scraper/webdriver.rs // src/scraper/webdriver.rs
use super::helpers::*; use super::helpers::*;
use super::hard_reset::HardResetController;
use super::docker_vpn_proxy::DockerVpnProxyPool;
use crate::Config;
use crate::logger;
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
use fantoccini::{Client, ClientBuilder}; use fantoccini::{Client, ClientBuilder};
@@ -13,8 +17,6 @@ use tokio::process::{Child, Command};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::sync::{Mutex, Semaphore}; use tokio::sync::{Mutex, Semaphore};
use tokio::time::{sleep, timeout, Duration}; use tokio::time::{sleep, timeout, Duration};
use crate::scraper::docker_vpn_proxy::{DockerVpnProxyPool};
use crate::Config;
/// Manages a pool of ChromeDriver instances for parallel scraping with optional VPN binding. /// Manages a pool of ChromeDriver instances for parallel scraping with optional VPN binding.
pub struct ChromeDriverPool { pub struct ChromeDriverPool {
@@ -31,10 +33,16 @@ pub struct ChromeDriverPool {
min_request_interval_ms: u64, min_request_interval_ms: u64,
monitoring: Option<crate::monitoring::MonitoringHandle>, monitoring: Option<crate::monitoring::MonitoringHandle>,
hard_reset_controller: Arc<HardResetController>,
config: Arc<Config>,
} }
impl ChromeDriverPool { impl ChromeDriverPool {
/// Creates a new pool without any proxy (direct connection). /// When consecutive errors reach this value, execute() will return a special error
/// that signals the caller to trigger a hard reset
const HARD_RESET_ERROR_THRESHOLD: usize = 12;
/// Creates a new pool without any proxy (direct connection).
pub async fn _new(config: &Config, monitoring: Option<crate::monitoring::MonitoringHandle>,) -> Result<Self> { pub async fn _new(config: &Config, monitoring: Option<crate::monitoring::MonitoringHandle>,) -> Result<Self> {
Self::new_with_proxy_and_task_limit(None, config, monitoring).await Self::new_with_proxy_and_task_limit(None, config, monitoring).await
} }
@@ -85,6 +93,11 @@ impl ChromeDriverPool {
// Rotation is enabled when task limiting is active // Rotation is enabled when task limiting is active
let rotation_enabled = task_per_instance_limit > 0; let rotation_enabled = task_per_instance_limit > 0;
let half_size = if rotation_enabled {
(actual_pool_size + 1) / 2 // Round up for odd numbers
} else {
actual_pool_size
};
let mut instances = Vec::with_capacity(actual_pool_size); let mut instances = Vec::with_capacity(actual_pool_size);
@@ -105,8 +118,8 @@ impl ChromeDriverPool {
for i in 0..actual_pool_size { for i in 0..actual_pool_size {
// Pass the entire proxy_pool and the index // Pass the entire proxy_pool and the index
let instance = ChromeInstance::new( let instance = ChromeInstance::new(
proxy_pool.clone(), // Clone the Arc proxy_pool.clone(),
i, // This instance's proxy index i,
config, config,
monitoring.clone(), monitoring.clone(),
).await?; ).await?;
@@ -144,7 +157,7 @@ impl ChromeDriverPool {
mon.emit(crate::monitoring::MonitoringEvent::InstanceCreated { mon.emit(crate::monitoring::MonitoringEvent::InstanceCreated {
instance_id: i, instance_id: i,
max_tasks: guard.max_tasks_per_instance, max_tasks: guard.max_tasks_per_instance,
proxy: proxy_info.clone(), // ✅ Now includes actual proxy info proxy: proxy_info.clone(),
}); });
// Also emit ProxyConnected event if proxy exists // Also emit ProxyConnected event if proxy exists
@@ -162,15 +175,21 @@ impl ChromeDriverPool {
let min_request_interval_ms = config.min_request_interval_ms; let min_request_interval_ms = config.min_request_interval_ms;
let hard_reset_controller = Arc::new(HardResetController::new());
let config_clone = Arc::new(config.clone());
Ok(Self { Ok(Self {
instances, instances,
semaphore: Arc::new(Semaphore::new(actual_pool_size)), semaphore: Arc::new(Semaphore::new(half_size)),
proxy_pool, proxy_pool,
rotation_enabled, rotation_enabled,
next_instance: Arc::new(Mutex::new(0)), next_instance: Arc::new(Mutex::new(0)),
last_request_time: Arc::new(Mutex::new(Instant::now())), last_request_time: Arc::new(Mutex::new(Instant::now())),
min_request_interval_ms, min_request_interval_ms,
monitoring, monitoring,
hard_reset_controller,
config: config_clone,
}) })
} }
@@ -188,10 +207,8 @@ impl ChromeDriverPool {
if elapsed < self.min_request_interval_ms { if elapsed < self.min_request_interval_ms {
let wait_ms = self.min_request_interval_ms - elapsed; let wait_ms = self.min_request_interval_ms - elapsed;
drop(last_time); // Release the lock before sleeping! drop(last_time);
sleep(Duration::from_millis(wait_ms)).await; sleep(Duration::from_millis(wait_ms)).await;
let mut last_time = self.last_request_time.lock().await; let mut last_time = self.last_request_time.lock().await;
*last_time = Instant::now(); *last_time = Instant::now();
} else { } else {
@@ -199,12 +216,20 @@ impl ChromeDriverPool {
} }
} }
let random_index = random_range(0, self.instances.len() as u64) as usize; let instance = if self.rotation_enabled {
// Index selection (simplified; see below for full rotation) self.select_instance_with_rotation().await?
let index = if self.rotation_enabled {
self.get_rotated_index().await?
} else { } else {
random_index self.select_instance_round_robin().await
};
{
let mut inst = instance.lock().await;
inst.increment_task_count();
}
let index: usize = {
let instances = &self.instances;
instances.iter().position(|inst| Arc::ptr_eq(inst, &instance)).unwrap_or(0)
}; };
if let Some(ref mon) = self.monitoring { if let Some(ref mon) = self.monitoring {
@@ -216,15 +241,10 @@ impl ChromeDriverPool {
instance_id: index, instance_id: index,
status: crate::monitoring::InstanceStatusChange::Active, status: crate::monitoring::InstanceStatusChange::Active,
}); });
} };
let instance = &self.instances[index];
let mut guard = instance.lock().await; let mut guard = instance.lock().await;
// NEW: fetch the session with automatic renewal!
let client = guard.get_or_renew_session().await?; let client = guard.get_or_renew_session().await?;
guard.increment_task_count();
let (task_count, session_requests) = guard.get_session_stats().await; let (task_count, session_requests) = guard.get_session_stats().await;
crate::util::logger::log_info(&format!( crate::util::logger::log_info(&format!(
@@ -232,17 +252,17 @@ impl ChromeDriverPool {
index, task_count, guard.max_tasks_per_instance, session_requests index, task_count, guard.max_tasks_per_instance, session_requests
)).await; )).await;
drop(guard); // Release the lock before navigation drop(guard);
let start_time = Instant::now(); let start_time = Instant::now();
// Navigation with timeout // Navigation with timeout
let navigation_result = timeout( let navigation_result = timeout(
Duration::from_secs(60), Duration::from_secs(60),
client.goto(&url) client.goto(&url)
).await; ).await;
match navigation_result { let result = match navigation_result {
Ok(Ok(_)) => { Ok(Ok(_)) => {
if let Some(ref mon) = self.monitoring { if let Some(ref mon) = self.monitoring {
mon.emit(crate::monitoring::MonitoringEvent::TaskCompleted { mon.emit(crate::monitoring::MonitoringEvent::TaskCompleted {
@@ -258,14 +278,111 @@ impl ChromeDriverPool {
} }
crate::util::logger::log_info(&format!("✓ Navigated to {}", url)).await; crate::util::logger::log_info(&format!("✓ Navigated to {}", url)).await;
// Run the parse function // Execute parse function
parse(client).await match parse(client).await {
Ok(data) => {
// ✅ SUCCESS: Record and log
let prev_count = self.hard_reset_controller.get_count();
self.hard_reset_controller.record_success();
if prev_count > 0 {
logger::log_info(&format!(
"✓ Success - reset counter cleared (was: {}/{})",
prev_count,
Self::HARD_RESET_ERROR_THRESHOLD
)).await;
}
Ok(data)
}
Err(e) => {
// ❌ PARSE ERROR: Record, check threshold, invalidate session
let error_count = self.hard_reset_controller.record_error();
{
let mut inst = instance.lock().await;
inst.invalidate_current_session().await;
}
// Enhanced logging with threshold status
let threshold_pct = (error_count as f64 / Self::HARD_RESET_ERROR_THRESHOLD as f64) * 100.0;
logger::log_warn(&format!(
"Parse error. Reset counter: {}/{} ({:.0}%)",
error_count,
Self::HARD_RESET_ERROR_THRESHOLD,
threshold_pct
)).await;
// Check if threshold reached
if error_count >= Self::HARD_RESET_ERROR_THRESHOLD {
logger::log_error(&format!(
"🔴 HARD RESET THRESHOLD REACHED ({}/{})",
error_count,
Self::HARD_RESET_ERROR_THRESHOLD
)).await;
return Err(anyhow!(
"HARD_RESET_REQUIRED: Parse failed: {}. Threshold reached ({}/{})",
e,
error_count,
Self::HARD_RESET_ERROR_THRESHOLD
));
}
Err(anyhow!(
"Parse failed: {}. Hard reset at {}/{}",
e,
error_count,
Self::HARD_RESET_ERROR_THRESHOLD
))
}
}
} }
Ok(Err(e)) => { Ok(Err(e)) => {
// ❌ NAVIGATION ERROR: Record, check threshold, invalidate session
crate::util::logger::log_error(&format!("Navigation failed: {}", e)).await; crate::util::logger::log_error(&format!("Navigation failed: {}", e)).await;
Err(anyhow!("Navigation failed: {}", e))
{
let mut inst = instance.lock().await;
inst.invalidate_current_session().await;
}
let error_count = self.hard_reset_controller.record_error();
// Enhanced logging
let threshold_pct = (error_count as f64 / Self::HARD_RESET_ERROR_THRESHOLD as f64) * 100.0;
logger::log_warn(&format!(
"Navigation error. Reset counter: {}/{} ({:.0}%)",
error_count,
Self::HARD_RESET_ERROR_THRESHOLD,
threshold_pct
)).await;
// Check if threshold reached
if error_count >= Self::HARD_RESET_ERROR_THRESHOLD {
logger::log_error(&format!(
"🔴 HARD RESET THRESHOLD REACHED ({}/{})",
error_count,
Self::HARD_RESET_ERROR_THRESHOLD
)).await;
return Err(anyhow!(
"HARD_RESET_REQUIRED: Navigation failed: {}. Threshold reached ({}/{})",
e,
error_count,
Self::HARD_RESET_ERROR_THRESHOLD
));
}
Err(anyhow!(
"Navigation failed: {}. Hard reset at {}/{}",
e,
error_count,
Self::HARD_RESET_ERROR_THRESHOLD
))
} }
Err(_) => { Err(_) => {
// ❌ TIMEOUT ERROR: Record, check threshold, invalidate session
if let Some(ref mon) = self.monitoring { if let Some(ref mon) = self.monitoring {
mon.emit(crate::monitoring::MonitoringEvent::NavigationTimeout { mon.emit(crate::monitoring::MonitoringEvent::NavigationTimeout {
instance_id: index, instance_id: index,
@@ -273,69 +390,178 @@ impl ChromeDriverPool {
}); });
} }
let error_count = self.hard_reset_controller.record_error();
crate::util::logger::log_error("Navigation timeout (60s)").await; crate::util::logger::log_error("Navigation timeout (60s)").await;
Err(anyhow!("Navigation timeout"))
{
let mut inst = instance.lock().await;
inst.invalidate_current_session().await;
}
// Enhanced logging
let threshold_pct = (error_count as f64 / Self::HARD_RESET_ERROR_THRESHOLD as f64) * 100.0;
logger::log_warn(&format!(
"Timeout error. Reset counter: {}/{} ({:.0}%)",
error_count,
Self::HARD_RESET_ERROR_THRESHOLD,
threshold_pct
)).await;
// Check if threshold reached
if error_count >= Self::HARD_RESET_ERROR_THRESHOLD {
logger::log_error(&format!(
"🔴 HARD RESET THRESHOLD REACHED ({}/{})",
error_count,
Self::HARD_RESET_ERROR_THRESHOLD
)).await;
return Err(anyhow!(
"HARD_RESET_REQUIRED: Navigation timeout. Threshold reached ({}/{})",
error_count,
Self::HARD_RESET_ERROR_THRESHOLD
));
}
Err(anyhow!(
"Navigation timeout. Hard reset at {}/{}",
error_count,
Self::HARD_RESET_ERROR_THRESHOLD
))
}
};

{
let mut inst = instance.lock().await;
inst.task_count = inst.task_count.saturating_sub(1);
}

result
}
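// Illustrative sketch (not part of this diff): the counter contract the arms
// above rely on — record_error() returning the post-increment count and
// record_success() clearing it — is satisfiable by a single AtomicUsize.
// The names and shape below are assumptions for illustration only.
fn demo_reset_counter() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    let errors = AtomicUsize::new(0);
    let record_error = || errors.fetch_add(1, Ordering::SeqCst) + 1; // returns new count
    let record_success = || errors.store(0, Ordering::SeqCst);
    assert_eq!(record_error(), 1);
    assert_eq!(record_error(), 2);
    record_success();
    assert_eq!(errors.load(Ordering::SeqCst), 0);
}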
/// Simple round-robin instance selection (no rotation)
async fn select_instance_round_robin(&self) -> Arc<Mutex<ChromeInstance>> {
let mut next = self.next_instance.lock().await;
let index = *next;
*next = (*next + 1) % self.instances.len();
drop(next);
Arc::clone(&self.instances[index])
}
/// Round-robin with half-pool rotation
async fn select_instance_with_rotation(&self) -> Result<Arc<Mutex<ChromeInstance>>> {
let pool_size = self.instances.len();
let half_size = pool_size / 2;
if half_size == 0 {
// Pool too small for rotation, fall back to simple round-robin
return Ok(self.select_instance_round_robin().await);
}
let mut next = self.next_instance.lock().await;
let current_half_start = (*next / half_size) * half_size;

// Try to find available instance in current half
let mut attempts = 0;
let max_attempts = half_size * 2; // Try both halves

while attempts < max_attempts {
// Follow *next directly; indexing from the stale current_half_start would
// keep the probe stuck in the original half after the switch below
let index = *next % pool_size;
let instance = &self.instances[index];

// Check if instance can accept more tasks (0 = unlimited, via is_available)
let inst = instance.lock().await;
let can_accept = inst.is_available();
drop(inst);

if can_accept {
*next = (*next + 1) % pool_size;
drop(next);

if let Some(ref mon) = self.monitoring {
mon.emit(crate::monitoring::MonitoringEvent::InstanceSelected {
instance_id: index,
half: if index < half_size { 1 } else { 2 },
});
}
return Ok(Arc::clone(instance));
}
// Current half saturated, try other half
if attempts == half_size - 1 {
logger::log_info("Current half saturated, rotating to other half").await;
*next = if current_half_start == 0 { half_size } else { 0 };
} else {
*next = (*next + 1) % pool_size;
}
attempts += 1;
}
drop(next);

// All instances saturated
Err(anyhow!("All instances at task capacity"))
}
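// Hypothetical miniature of the half-pool walk above: with pool_size = 4 and
// the first half saturated, the probe visits 0, 1, then jumps to the second half.
fn demo_half_rotation(pool_size: usize, saturated: &[bool]) -> Option<usize> {
    let half = pool_size / 2;
    let mut next = 0usize;
    for attempt in 0..half * 2 {
        if !saturated[next] {
            return Some(next);
        }
        // Switch halves once the current half has been exhausted
        next = if attempt == half - 1 { half } else { (next + 1) % pool_size };
    }
    None // every instance at capacity, mirroring the Err(...) above
}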

pub fn get_reset_controller(&self) -> Arc<HardResetController> {
Arc::clone(&self.hard_reset_controller)
}

/// Check if hard reset threshold has been reached
pub fn should_perform_hard_reset(&self) -> bool {
self.hard_reset_controller.get_count() >= Self::HARD_RESET_ERROR_THRESHOLD
}
/// Get current error count and threshold for monitoring
pub fn get_reset_status(&self) -> (usize, usize) {
(
self.hard_reset_controller.get_count(),
Self::HARD_RESET_ERROR_THRESHOLD
)
}

/// Gracefully shut down all ChromeDriver processes and Docker proxy containers.
/// ✅ FIXED: Now with proper error propagation and Chrome process cleanup
pub async fn shutdown(&self) -> Result<()> {
logger::log_info(&format!("Shutting down {} ChromeDriver instances...", self.instances.len())).await;
let mut shutdown_errors = Vec::new();
for (i, inst) in self.instances.iter().enumerate() {
logger::log_info(&format!(" Shutting down instance {}...", i)).await;
let mut guard = inst.lock().await;
if let Err(e) = guard.shutdown().await {
logger::log_error(&format!(" ✗ Instance {} shutdown error: {}", i, e)).await;
shutdown_errors.push(format!("Instance {}: {}", i, e));
} else {
logger::log_info(&format!(" ✓ Instance {} shut down", i)).await;
}
}

if let Some(pp) = &self.proxy_pool {
logger::log_info("Shutting down proxy pool...").await;
if let Err(e) = pp.shutdown().await {
logger::log_error(&format!("Proxy pool shutdown error: {}", e)).await;
shutdown_errors.push(format!("Proxy pool: {}", e));
} else {
logger::log_info("✓ Proxy pool shut down").await;
}
}
if !shutdown_errors.is_empty() {
return Err(anyhow!(
"Pool shutdown completed with {} error(s): {}",
shutdown_errors.len(),
shutdown_errors.join("; ")
));
}
logger::log_info("✓ All ChromeDriver instances shut down successfully").await;
Ok(())
}
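// Hypothetical caller sketch: because errors are aggregated rather than returned
// eagerly, every instance gets a shutdown attempt before the summary surfaces.
// if let Err(e) = pool.shutdown().await {
//     eprintln!("pool shutdown finished with errors: {e}");
// }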
@@ -369,13 +595,16 @@ pub struct ChromeInstance {
current_session: Arc<Mutex<Option<Client>>>, // Current active session
session_request_count: Arc<Mutex<usize>>,
max_requests_per_session: usize,
proxy_pool: Option<Arc<DockerVpnProxyPool>>, // Reference to the proxy pool
current_proxy_index: Arc<Mutex<usize>>, // Current proxy index in use
instance_id: usize,
monitoring: Option<crate::monitoring::MonitoringHandle>,
// ✅ NEW: Track Chrome browser PID for proper cleanup
chrome_pid: Arc<Mutex<Option<u32>>>,
}

impl ChromeInstance {
@@ -405,18 +634,17 @@ impl ChromeInstance {
instance_id,
monitoring,
chrome_pid: Arc::new(Mutex::new(None)),
})
}

pub async fn get_or_renew_session(&mut self) -> Result<Client> {
let mut session_opt = self.current_session.lock().await;
let mut request_count = self.session_request_count.lock().await;
let old_request_count = *request_count;

// Session renewal conditions:
// 1. No session exists
// 2. Request limit reached
let needs_renewal = session_opt.is_none() || *request_count >= self.max_requests_per_session;

if needs_renewal {
@@ -427,16 +655,22 @@ impl ChromeInstance {
});
}

// ✅ FIXED: Close old session with proper error handling
if let Some(old_session) = session_opt.take() {
crate::util::logger::log_info("Closing old session").await;

// Try to close gracefully first
if let Err(e) = old_session.close().await {
logger::log_warn(&format!("Session close failed (may leave Chrome tabs open): {}", e)).await;
// Continue anyway - we'll force-kill if needed
}

// Brief pause between sessions
let random_delay = random_range(500, 1000);
sleep(Duration::from_millis(random_delay)).await;
}

// Create new session with fresh User-Agent
crate::util::logger::log_info(&format!(
"Creating new session (requests in last session: {})",
*request_count
@@ -476,29 +710,35 @@ impl ChromeInstance {
mon.emit(crate::monitoring::MonitoringEvent::SessionRenewed {
instance_id: self.instance_id,
old_request_count: *request_count,
reason,
new_proxy: new_proxy_info,
});
}

Ok(new_session)
} else {
// Use existing session
*request_count += 1;
Ok(session_opt.as_ref().unwrap().clone())
}
}
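// Back-of-envelope sketch (hypothetical numbers, helper not in this diff): with
// a budget of 25 requests per session, a stream of 100 requests on one instance
// needs 100/25 = 4 fresh sessions, since needs_renewal also fires on the very
// first call.
fn sessions_needed(total_requests: usize, max_per_session: usize) -> usize {
    total_requests.div_ceil(max_per_session)
}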
async fn create_fresh_session(&self) -> Result<Client> {
let proxy_url = if let Some(ref pool) = self.proxy_pool {
let mut proxy_idx = self.current_proxy_index.lock().await;
let num_proxies = pool.num_proxies();

// Round-robin through all proxies
let selected_proxy = *proxy_idx % num_proxies;
*proxy_idx = (*proxy_idx + 1) % num_proxies;
let url = pool.get_proxy_url(selected_proxy);

logger::log_info(&format!(
"Instance {} creating session with proxy {}/{} (rotation)",
self.instance_id,
selected_proxy,
num_proxies
)).await;
Some(url)
@@ -509,45 +749,39 @@ impl ChromeInstance {
let user_agent = Self::chrome_user_agent();
let capabilities = self.chrome_args_with_ua(user_agent, &proxy_url);

let client = ClientBuilder::native()
.capabilities(capabilities)
.connect(&self.base_url)
.await
.context("Failed to connect to ChromeDriver")?;

// ✅ NEW: Extract and store Chrome PID for cleanup
// Chrome process info can be extracted from session info if needed
// For now, we rely on killing the process tree

Ok(client)
}
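// The proxy selection above, reduced to a hypothetical standalone helper:
// indices cycle 0, 1, ..., num_proxies - 1 and wrap, so consecutive fresh
// sessions on the same instance exit through different containers.
fn next_proxy(proxy_idx: &mut usize, num_proxies: usize) -> usize {
    let selected = *proxy_idx % num_proxies;
    *proxy_idx = (*proxy_idx + 1) % num_proxies;
    selected
}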
pub async fn invalidate_current_session(&self) {
let mut session_opt = self.current_session.lock().await;

if let Some(old_session) = session_opt.take() {
crate::util::logger::log_info(&format!(
"Invalidating broken session for instance {}",
self.instance_id
)).await;

// ✅ FIXED: Proper error handling instead of silent failure
if let Err(e) = old_session.close().await {
logger::log_warn(&format!(
"Failed to close broken session (Chrome tabs may remain): {}",
e
)).await;
}
}

let mut request_count = self.session_request_count.lock().await;
*request_count = 0;
}
pub fn reset_task_count(&mut self) {
@@ -567,17 +801,103 @@ impl ChromeInstance {
self.task_count
}
/// ✅ FIXED: Proper Chrome + ChromeDriver shutdown with process tree killing
pub async fn shutdown(&mut self) -> Result<()> {
logger::log_info(&format!("Shutting down ChromeInstance {}...", self.instance_id)).await;
// Step 1: Close any active session to signal Chrome to close
{
let mut session_opt = self.current_session.lock().await;
if let Some(session) = session_opt.take() {
logger::log_info(" Closing active session...").await;
if let Err(e) = session.close().await {
logger::log_warn(&format!(" Session close failed: {}", e)).await;
}
}
}
// Step 2: Abort stderr logging task
if let Some(handle) = self.stderr_log.take() {
handle.abort();
let _ = handle.await;
}

// Step 3: Get ChromeDriver PID before killing
let chromedriver_pid = self.process.id();
logger::log_info(&format!(" ChromeDriver PID: {:?}", chromedriver_pid)).await;
// Step 4: Kill ChromeDriver and wait
if let Err(e) = self.process.start_kill() {
logger::log_warn(&format!(" Failed to kill ChromeDriver: {}", e)).await;
}
// Wait for ChromeDriver to exit (with timeout)
match timeout(Duration::from_secs(5), self.process.wait()).await {
Ok(Ok(status)) => {
logger::log_info(&format!(" ChromeDriver exited with status: {:?}", status)).await;
}
Ok(Err(e)) => {
logger::log_warn(&format!(" Error waiting for ChromeDriver: {}", e)).await;
}
Err(_) => {
logger::log_warn(" ChromeDriver didn't exit within 5s").await;
}
}
// Step 5: ✅ CRITICAL FIX: Force-kill Chrome process tree
// On Windows, Chrome doesn't die when ChromeDriver dies
if let Some(pid) = chromedriver_pid {
logger::log_info(&format!(" Force-killing Chrome process tree for PID {}...", pid)).await;
#[cfg(target_os = "windows")]
{
// Kill entire process tree on Windows
let _ = Command::new("taskkill")
.args(["/F", "/T", "/PID", &pid.to_string()])
.output()
.await;
// Also kill any remaining chrome.exe processes
let _ = Command::new("taskkill")
.args(["/F", "/IM", "chrome.exe"])
.output()
.await;
}
#[cfg(not(target_os = "windows"))]
{
// Kill process group on Unix
let _ = Command::new("pkill")
.args(["-P", &pid.to_string()])
.output()
.await;
}
logger::log_info(" ✓ Chrome process tree killed").await;
}
// Step 6: Wait a moment for processes to fully terminate
sleep(Duration::from_millis(500)).await;
logger::log_info(&format!("✓ ChromeInstance {} shut down", self.instance_id)).await;
Ok(())
}
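// Design note: the Windows branch above also fires a blanket `taskkill /F /IM chrome.exe`,
// which takes down every Chrome process on the machine — acceptable on a dedicated
// scraper host, but worth guarding behind a flag on shared machines.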
pub fn is_available(&self) -> bool {
if self.max_tasks_per_instance == 0 {
return true; // No limit
}
self.task_count < self.max_tasks_per_instance
}
pub fn tasks_remaining(&self) -> usize {
if self.max_tasks_per_instance == 0 {
return usize::MAX;
}
self.max_tasks_per_instance.saturating_sub(self.task_count)
}
/// Spawns the actual `chromedriver` binary and waits for it to become ready.
async fn spawn_chromedriver() -> Result<(String, Child, JoinHandle<()>)> {
let mut process = Command::new("chromedriver-win64/chromedriver.exe")
@@ -624,6 +944,40 @@ impl ChromeInstance {
Err(anyhow!("ChromeDriver failed to start within 30s")) Err(anyhow!("ChromeDriver failed to start within 30s"))
} }
fn chrome_args_with_ua(&self, user_agent: &str, proxy_url: &Option<String>) -> Map<String, Value> {
let mut args = vec![
"--headless=new".to_string(),
"--disable-gpu".to_string(),
"--no-sandbox".to_string(),
"--disable-dev-shm-usage".to_string(),
"--disable-infobars".to_string(),
"--disable-extensions".to_string(),
"--disable-popup-blocking".to_string(),
"--disable-notifications".to_string(),
"--disable-autofill".to_string(),
"--disable-sync".to_string(),
"--disable-default-apps".to_string(),
"--disable-translate".to_string(),
"--disable-blink-features=AutomationControlled".to_string(),
format!("--user-agent={}", user_agent),
];
if let Some(proxy) = proxy_url {
args.push(format!("--proxy-server={}", proxy));
}
let caps = serde_json::json!({
"goog:chromeOptions": {
"args": args,
"excludeSwitches": ["enable-logging", "enable-automation"],
"prefs": {
"profile.default_content_setting_values.notifications": 2
}
}
});
caps.as_object().cloned().unwrap()
}
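// For reference, the capabilities built above serialize to roughly this shape
// (args abridged; the proxy entry appears only when a proxy URL was supplied):
// {
//   "goog:chromeOptions": {
//     "args": ["--headless=new", "...", "--user-agent=Mozilla/5.0 ...", "--proxy-server=<url>"],
//     "excludeSwitches": ["enable-logging", "enable-automation"],
//     "prefs": { "profile.default_content_setting_values.notifications": 2 }
//   }
// }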
pub fn chrome_user_agent() -> &'static str {
static UAS: &[&str] = &[
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.6367.91 Safari/537.36",
@@ -636,6 +990,24 @@ impl ChromeInstance {
}
}
impl Drop for ChromeInstance {
fn drop(&mut self) {
// Signal both ChromeDriver and Chrome to terminate
let _ = self.process.start_kill();
// Also try to kill Chrome if we know the PID
if let Some(pid) = self.process.id() {
#[cfg(target_os = "windows")]
{
// Fire and forget - this is best-effort cleanup
let _ = std::process::Command::new("taskkill")
.args(["/F", "/T", "/PID", &pid.to_string()])
.output();
}
}
}
}
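// Design note: Drop cannot be async, so this path can only issue a blocking,
// best-effort kill; orderly cleanup should still go through the async shutdown().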
fn parse_chromedriver_address(line: &str) -> Option<String> {
if line.contains("Starting ChromeDriver") {
if let Some(port_str) = line.split("on port ").nth(1) {
@@ -656,14 +1028,6 @@ fn parse_chromedriver_address(line: &str) -> Option<String> {
None
}
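// Hypothetical probe for the parser above, assuming the usual ChromeDriver
// startup banner and that the elided body turns the parsed port into Some(address):
#[cfg(test)]
mod chromedriver_address_tests {
    use super::parse_chromedriver_address;

    #[test]
    fn recognizes_startup_banner() {
        let banner = "Starting ChromeDriver 124.0.6367.91 on port 9515";
        assert!(parse_chromedriver_address(banner).is_some());
        assert!(parse_chromedriver_address("unrelated log line").is_none());
    }
}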
/// Simplified task execution - uses the pool pattern.
pub struct ScrapeTask<T> {
url: String,