added dual key usage for event detection

This commit is contained in:
2025-11-18 15:10:36 +01:00
parent 4dec97ef63
commit 0ca53bf585
2 changed files with 209 additions and 74 deletions

View File

@@ -41,6 +41,8 @@ WebScraper/
│ └── main.rs # Hauptanwendungslogik │ └── main.rs # Hauptanwendungslogik
├── chromedriver-win64/ # ChromeDriver Binary ├── chromedriver-win64/ # ChromeDriver Binary
├── Cargo.toml # Rust Abhängigkeiten ├── Cargo.toml # Rust Abhängigkeiten
├──
├──
├── Cargo.lock # Versionssperren ├── Cargo.lock # Versionssperren
├── countries.json # Länderreferenzdaten ├── countries.json # Länderreferenzdaten
├── continents.json # Kontinentreferenzdaten ├── continents.json # Kontinentreferenzdaten

View File

@@ -1,12 +1,15 @@
use chrono::{NaiveDate, Datelike, Local}; use chrono::{Datelike, Local, NaiveDate};
use fantoccini::{ClientBuilder, Locator}; use fantoccini::{ClientBuilder, Locator};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::{Map, Value}; use serde_json::{Map, Value};
use std::{collections::{HashMap, HashSet}, path::PathBuf, process::Command}; use std::{
collections::{HashMap, HashSet},
path::PathBuf,
process::Command,
};
use tokio::{ use tokio::{
fs, fs, signal,
signal, time::{Duration, sleep},
time::{sleep, Duration},
}; };
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Hash)]
@@ -213,11 +216,7 @@ async fn extract_all_data_via_js(
Ok(vec![]) Ok(vec![])
} }
async fn set_date_range( async fn set_date_range(client: &fantoccini::Client, start: &str, end: &str) -> anyhow::Result<()> {
client: &fantoccini::Client,
start: &str,
end: &str,
) -> anyhow::Result<()> {
let set_dates_script = format!( let set_dates_script = format!(
r#" r#"
(() => {{ (() => {{
@@ -265,17 +264,26 @@ fn calculate_next_start_date(events: &[EconomicEvent]) -> anyhow::Result<String>
Ok(next.format("%Y-%m-%d").to_string()) Ok(next.format("%Y-%m-%d").to_string())
} }
fn event_key(event: &EconomicEvent) -> String { /// Storage key: date|time|event (for exact occurrence deduplication)
fn event_lookup_key(event: &EconomicEvent) -> String {
format!("{}|{}|{}", event.date, event.time, event.event) format!("{}|{}|{}", event.date, event.time, event.event)
} }
/// Builds the identity key for an event, in the form `country|event`.
///
/// Unlike the date/time-based lookup key, this key omits the schedule, so the
/// same event yields the same key after its date or time has changed.
fn event_identity_key(event: &EconomicEvent) -> String {
    let country = &event.country;
    let name = &event.event;
    format!("{}|{}", country, name)
}
/// Compare two events and detect changes in future data /// Compare two events and detect changes in future data
fn detect_changes(old: &EconomicEvent, new: &EconomicEvent, now: &str) -> Vec<EventChange> { fn detect_changes(old: &EconomicEvent, new: &EconomicEvent, now: &str) -> Vec<EventChange> {
let mut changes = Vec::new(); let mut changes = Vec::new();
let timestamp = Local::now().format("%Y-%m-%d %H:%M:%S").to_string(); let timestamp = Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
println!("🔍 Checking event: {} on {} (now: {})", new.event, new.date, now); println!(
"🔍 Checking event: {} on {} (now: {})",
new.event, new.date, now
);
// Only track changes for future events // Only track changes for future events
if new.date.as_str() <= now { if new.date.as_str() <= now {
println!(" ⏭️ Skipped: Event is in the past/today"); println!(" ⏭️ Skipped: Event is in the past/today");
@@ -293,8 +301,10 @@ fn detect_changes(old: &EconomicEvent, new: &EconomicEvent, now: &str) -> Vec<Ev
for (field_name, old_val, new_val) in fields { for (field_name, old_val, new_val) in fields {
if old_val != new_val { if old_val != new_val {
println!(" 🔄 CHANGE DETECTED in '{}': '{}' -> '{}'", println!(
field_name, old_val, new_val); " 🔄 CHANGE DETECTED in '{}': '{}' -> '{}'",
field_name, old_val, new_val
);
changes.push(EventChange { changes.push(EventChange {
date: new.date.clone(), date: new.date.clone(),
event: new.event.clone(), event: new.event.clone(),
@@ -316,10 +326,34 @@ fn detect_changes(old: &EconomicEvent, new: &EconomicEvent, now: &str) -> Vec<Ev
changes changes
} }
/// Build identity lookup map: finds most recent occurrence of each event by identity
fn build_identity_lookup(
events: &HashMap<String, EconomicEvent>,
) -> HashMap<String, (String, EconomicEvent)> {
let mut identity_map: HashMap<String, (String, EconomicEvent)> = HashMap::new();
for (lookup_key, event) in events {
let identity = event_identity_key(event);
// Keep the most recent occurrence (latest date/time)
if let Some((existing_key, existing_event)) = identity_map.get(&identity) {
if event.date > existing_event.date
|| (event.date == existing_event.date && event.time > existing_event.time)
{
identity_map.insert(identity.clone(), (lookup_key.clone(), event.clone()));
}
} else {
identity_map.insert(identity, (lookup_key.clone(), event.clone()));
}
}
identity_map
}
/// Scan the economic_events directory for existing chunks /// Scan the economic_events directory for existing chunks
async fn scan_existing_chunks() -> anyhow::Result<Vec<ChunkInfo>> { async fn scan_existing_chunks() -> anyhow::Result<Vec<ChunkInfo>> {
let events_dir = PathBuf::from("economic_events"); let events_dir = PathBuf::from("economic_events");
if !events_dir.exists() { if !events_dir.exists() {
fs::create_dir_all(&events_dir).await?; fs::create_dir_all(&events_dir).await?;
println!("📁 Created economic_events directory"); println!("📁 Created economic_events directory");
@@ -337,7 +371,8 @@ async fn scan_existing_chunks() -> anyhow::Result<Vec<ChunkInfo>> {
let parts: Vec<&str> = dates.split('_').collect(); let parts: Vec<&str> = dates.split('_').collect();
if parts.len() == 2 { if parts.len() == 2 {
if let Ok(content) = fs::read_to_string(&path).await { if let Ok(content) = fs::read_to_string(&path).await {
if let Ok(events) = serde_json::from_str::<Vec<EconomicEvent>>(&content) { if let Ok(events) = serde_json::from_str::<Vec<EconomicEvent>>(&content)
{
chunks.push(ChunkInfo { chunks.push(ChunkInfo {
start_date: parts[0].to_string(), start_date: parts[0].to_string(),
end_date: parts[1].to_string(), end_date: parts[1].to_string(),
@@ -353,12 +388,14 @@ async fn scan_existing_chunks() -> anyhow::Result<Vec<ChunkInfo>> {
} }
chunks.sort_by(|a, b| a.start_date.cmp(&b.start_date)); chunks.sort_by(|a, b| a.start_date.cmp(&b.start_date));
if !chunks.is_empty() { if !chunks.is_empty() {
println!("\n📊 Found {} existing chunks:", chunks.len()); println!("\n📊 Found {} existing chunks:", chunks.len());
for chunk in &chunks { for chunk in &chunks {
println!("{} to {} ({} events)", println!(
chunk.start_date, chunk.end_date, chunk.event_count); "{} to {} ({} events)",
chunk.start_date, chunk.end_date, chunk.event_count
);
} }
} else { } else {
println!("🔭 No existing chunks found"); println!("🔭 No existing chunks found");
@@ -374,20 +411,23 @@ fn calculate_target_end_date() -> String {
NaiveDate::from_ymd_opt(now.year() + 1, (now.month() + 3) % 12, 1) NaiveDate::from_ymd_opt(now.year() + 1, (now.month() + 3) % 12, 1)
} else { } else {
NaiveDate::from_ymd_opt(now.year(), now.month() + 3, 1) NaiveDate::from_ymd_opt(now.year(), now.month() + 3, 1)
}.unwrap(); }
.unwrap();
three_months_ahead.format("%Y-%m-%d").to_string() three_months_ahead.format("%Y-%m-%d").to_string()
} }
/// Load all events from existing chunks into a HashMap /// Load all events from existing chunks into a HashMap
async fn load_existing_events(chunks: &[ChunkInfo]) -> anyhow::Result<HashMap<String, EconomicEvent>> { async fn load_existing_events(
chunks: &[ChunkInfo],
) -> anyhow::Result<HashMap<String, EconomicEvent>> {
let mut event_map = HashMap::new(); let mut event_map = HashMap::new();
for chunk in chunks { for chunk in chunks {
if let Ok(content) = fs::read_to_string(&chunk.path).await { if let Ok(content) = fs::read_to_string(&chunk.path).await {
if let Ok(events) = serde_json::from_str::<Vec<EconomicEvent>>(&content) { if let Ok(events) = serde_json::from_str::<Vec<EconomicEvent>>(&content) {
for event in events { for event in events {
event_map.insert(event_key(&event), event); event_map.insert(event_lookup_key(&event), event);
} }
} }
} }
@@ -411,11 +451,14 @@ async fn save_changes(changes: &[EventChange]) -> anyhow::Result<()> {
// Group changes by month // Group changes by month
let mut changes_by_month: HashMap<String, Vec<EventChange>> = HashMap::new(); let mut changes_by_month: HashMap<String, Vec<EventChange>> = HashMap::new();
for change in changes { for change in changes {
if let Some(date) = parse_date(&change.date) { if let Some(date) = parse_date(&change.date) {
let month_key = format!("{:02}_{}", date.month(), date.year()); let month_key = format!("{:02}_{}", date.month(), date.year());
changes_by_month.entry(month_key).or_default().push(change.clone()); changes_by_month
.entry(month_key)
.or_default()
.push(change.clone());
} }
} }
@@ -429,7 +472,9 @@ async fn save_changes(changes: &[EventChange]) -> anyhow::Result<()> {
// Load existing changes if file exists // Load existing changes if file exists
let existing_count = if filepath.exists() { let existing_count = if filepath.exists() {
let content = fs::read_to_string(&filepath).await?; let content = fs::read_to_string(&filepath).await?;
serde_json::from_str::<Vec<EventChange>>(&content).unwrap_or_default().len() serde_json::from_str::<Vec<EventChange>>(&content)
.unwrap_or_default()
.len()
} else { } else {
0 0
}; };
@@ -447,9 +492,14 @@ async fn save_changes(changes: &[EventChange]) -> anyhow::Result<()> {
// Save combined changes // Save combined changes
let json = serde_json::to_string_pretty(&all_changes)?; let json = serde_json::to_string_pretty(&all_changes)?;
fs::write(&filepath, json).await?; fs::write(&filepath, json).await?;
println!("{}: {} existing + {} new = {} total changes", println!(
filename, existing_count, month_changes.len(), all_changes.len()); "{}: {} existing + {} new = {} total changes",
filename,
existing_count,
month_changes.len(),
all_changes.len()
);
} }
Ok(()) Ok(())
@@ -510,14 +560,16 @@ async fn save_optimized_chunks(events: HashMap<String, EconomicEvent>) -> anyhow
continue; continue;
} }
let start = chunk.iter() let start = chunk
.iter()
.filter_map(|e| parse_date(&e.date)) .filter_map(|e| parse_date(&e.date))
.min() .min()
.unwrap() .unwrap()
.format("%Y-%m-%d") .format("%Y-%m-%d")
.to_string(); .to_string();
let end = chunk.iter() let end = chunk
.iter()
.filter_map(|e| parse_date(&e.date)) .filter_map(|e| parse_date(&e.date))
.max() .max()
.unwrap() .unwrap()
@@ -529,8 +581,12 @@ async fn save_optimized_chunks(events: HashMap<String, EconomicEvent>) -> anyhow
let json = serde_json::to_string_pretty(&chunk)?; let json = serde_json::to_string_pretty(&chunk)?;
fs::write(&filepath, json).await?; fs::write(&filepath, json).await?;
println!("💾 Saved optimized chunk: {} ({} events)", filename, chunk.len()); println!(
"💾 Saved optimized chunk: {} ({} events)",
filename,
chunk.len()
);
} }
Ok(()) Ok(())
@@ -544,10 +600,14 @@ async fn scrape_and_update(
existing_events: &mut HashMap<String, EconomicEvent>, existing_events: &mut HashMap<String, EconomicEvent>,
) -> anyhow::Result<Vec<EventChange>> { ) -> anyhow::Result<Vec<EventChange>> {
println!("\n🎯 Scraping range: {} to {}", start, end); println!("\n🎯 Scraping range: {} to {}", start, end);
let mut current_start = start.to_string(); let mut current_start = start.to_string();
let mut all_changes = Vec::new(); let mut all_changes = Vec::new();
let now = Local::now().naive_local().date().format("%Y-%m-%d").to_string(); let now = Local::now()
.naive_local()
.date()
.format("%Y-%m-%d")
.to_string();
println!("📅 Current date for comparison: {}", now); println!("📅 Current date for comparison: {}", now);
println!("🔍 Starting change detection...\n"); println!("🔍 Starting change detection...\n");
@@ -564,37 +624,99 @@ async fn scrape_and_update(
println!(" 📦 Fetched {} events", events.len()); println!(" 📦 Fetched {} events", events.len());
// Build identity lookup for existing events (before processing new batch)
let identity_lookup = build_identity_lookup(existing_events);
let mut events_to_remove: Vec<String> = Vec::new();
// Process events: detect changes and update map // Process events: detect changes and update map
let mut new_events_count = 0; let mut new_events_count = 0;
let mut updated_events_count = 0; let mut updated_events_count = 0;
let mut rescheduled_events_count = 0;
for new_event in events.clone() { for new_event in events.clone() {
let key = event_key(&new_event); let lookup_key = event_lookup_key(&new_event);
let identity_key = event_identity_key(&new_event);
if let Some(old_event) = existing_events.get(&key) {
// Event exists - check for changes // CASE A: Exact match (same date/time/event)
if let Some(old_event) = existing_events.get(&lookup_key) {
println!("\n 🔎 Comparing existing event:"); println!("\n 🔎 Comparing existing event:");
println!(" Event: {}", new_event.event); println!(" Event: {}", new_event.event);
println!(" Date: {} | Time: {}", new_event.date, new_event.time); println!(" Date: {} | Time: {}", new_event.date, new_event.time);
let changes = detect_changes(old_event, &new_event, &now); let changes = detect_changes(old_event, &new_event, &now);
if !changes.is_empty() { if !changes.is_empty() {
println!("{} change(s) detected and recorded!", changes.len()); println!("{} change(s) detected and recorded!", changes.len());
all_changes.extend(changes); all_changes.extend(changes);
updated_events_count += 1; updated_events_count += 1;
} }
} else {
new_events_count += 1; existing_events.insert(lookup_key, new_event);
println!(" New event: {} on {}", new_event.event, new_event.date); continue;
} }
// Insert or update event // CASE B: Check if event was rescheduled (same identity, different date/time)
existing_events.insert(key, new_event); if let Some((old_lookup_key, old_event)) = identity_lookup.get(&identity_key) {
if old_lookup_key != &lookup_key {
// Event was rescheduled!
println!("\n 🔄 RESCHEDULED EVENT DETECTED:");
println!(" Event: {}", new_event.event);
println!(
" Old: {} @ {} | New: {} @ {}",
old_event.date, old_event.time, new_event.date, new_event.time
);
// Track date/time change
if new_event.date.as_str() > now.as_str() {
let timestamp = Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
all_changes.push(EventChange {
date: new_event.date.clone(),
event: new_event.event.clone(),
country: new_event.country.clone(),
field_changed: "date_time".to_string(),
old_value: format!("{} @ {}", old_event.date, old_event.time),
new_value: format!("{} @ {}", new_event.date, new_event.time),
detected_at: timestamp,
});
println!(" 📝 Date/time change recorded");
}
// Check for other field changes too
let field_changes = detect_changes(old_event, &new_event, &now);
if !field_changes.is_empty() {
println!(
"{} additional field change(s) detected!",
field_changes.len()
);
all_changes.extend(field_changes);
}
// Remove old occurrence and add new one
events_to_remove.push(old_lookup_key.clone());
existing_events.insert(lookup_key, new_event);
rescheduled_events_count += 1;
continue;
}
}
// CASE C: New event
new_events_count += 1;
println!(
" New event: {} on {} @ {}",
new_event.event, new_event.date, new_event.time
);
existing_events.insert(lookup_key, new_event);
}
// Remove old occurrences of rescheduled events
for key in events_to_remove {
existing_events.remove(&key);
} }
println!("\n 📊 Batch summary:"); println!("\n 📊 Batch summary:");
println!(" New events: {}", new_events_count); println!(" New events: {}", new_events_count);
println!(" Updated events: {}", updated_events_count); println!(" Updated events: {}", updated_events_count);
println!(" Rescheduled events: {}", rescheduled_events_count);
println!(" Changes tracked: {}", all_changes.len()); println!(" Changes tracked: {}", all_changes.len());
let next = match calculate_next_start_date(&events) { let next = match calculate_next_start_date(&events) {
@@ -622,35 +744,41 @@ async fn scrape_and_update(
/// Main logic with intelligent update handling /// Main logic with intelligent update handling
async fn run_intelligent_update(client: &fantoccini::Client) -> anyhow::Result<()> { async fn run_intelligent_update(client: &fantoccini::Client) -> anyhow::Result<()> {
let now = Local::now().naive_local().date().format("%Y-%m-%d").to_string(); let now = Local::now()
.naive_local()
.date()
.format("%Y-%m-%d")
.to_string();
let target_end = calculate_target_end_date(); let target_end = calculate_target_end_date();
println!("📅 Today: {}", now); println!("📅 Today: {}", now);
println!("🎯 Target end date: {}", target_end); println!("🎯 Target end date: {}", target_end);
// Load existing chunks // Load existing chunks
let chunks = scan_existing_chunks().await?; let chunks = scan_existing_chunks().await?;
let mut existing_events = load_existing_events(&chunks).await?; let mut existing_events = load_existing_events(&chunks).await?;
if existing_events.is_empty() { if existing_events.is_empty() {
// No existing data - full scrape from beginning // No existing data - full scrape from beginning
println!("\n🔭 No existing data - starting fresh scrape from 2007-02-13"); println!("\n🔭 No existing data - starting fresh scrape from 2007-02-13");
let changes = scrape_and_update(client, "2007-02-13", &target_end, &mut existing_events).await?; let changes =
scrape_and_update(client, "2007-02-13", &target_end, &mut existing_events).await?;
save_changes(&changes).await?; save_changes(&changes).await?;
save_optimized_chunks(existing_events).await?; save_optimized_chunks(existing_events).await?;
return Ok(()); return Ok(());
} }
// Find date range of existing data // Find date range of existing data
let dates: Vec<NaiveDate> = existing_events.values() let dates: Vec<NaiveDate> = existing_events
.values()
.filter_map(|e| parse_date(&e.date)) .filter_map(|e| parse_date(&e.date))
.collect(); .collect();
let min_date = dates.iter().min().unwrap().format("%Y-%m-%d").to_string(); let min_date = dates.iter().min().unwrap().format("%Y-%m-%d").to_string();
let max_date = dates.iter().max().unwrap().format("%Y-%m-%d").to_string(); let max_date = dates.iter().max().unwrap().format("%Y-%m-%d").to_string();
println!("📊 Existing data range: {} to {}", min_date, max_date); println!("📊 Existing data range: {} to {}", min_date, max_date);
// Determine update strategy // Determine update strategy
if max_date < now { if max_date < now {
// Case 1: Data is in the past, need to update from max_date to target // Case 1: Data is in the past, need to update from max_date to target
@@ -658,36 +786,43 @@ async fn run_intelligent_update(client: &fantoccini::Client) -> anyhow::Result<(
.and_then(|d| d.succ_opt()) .and_then(|d| d.succ_opt())
.map(|d| d.format("%Y-%m-%d").to_string()) .map(|d| d.format("%Y-%m-%d").to_string())
.unwrap_or(max_date); .unwrap_or(max_date);
println!("\n📈 Updating from end of existing data: {} to {}", next_start, target_end); println!(
let changes = scrape_and_update(client, &next_start, &target_end, &mut existing_events).await?; "\n📈 Updating from end of existing data: {} to {}",
next_start, target_end
);
let changes =
scrape_and_update(client, &next_start, &target_end, &mut existing_events).await?;
save_changes(&changes).await?; save_changes(&changes).await?;
save_optimized_chunks(existing_events).await?; save_optimized_chunks(existing_events).await?;
} else if max_date >= now { } else if max_date >= now {
// Case 2: Data extends to or beyond today, only refresh future data // Case 2: Data extends to or beyond today, only refresh future data
println!("\n🔄 Data exists up to today - only refreshing future data: {} to {}", now, target_end); println!(
"\n🔄 Data exists up to today - only refreshing future data: {} to {}",
now, target_end
);
// Create a separate HashMap for only future events to avoid touching past data // Create a separate HashMap for only future events to avoid touching past data
let mut future_events: HashMap<String, EconomicEvent> = existing_events let mut future_events: HashMap<String, EconomicEvent> = existing_events
.iter() .iter()
.filter(|(_, event)| event.date.as_str() >= now.as_str()) .filter(|(_, event)| event.date.as_str() >= now.as_str())
.map(|(k, v)| (k.clone(), v.clone())) .map(|(k, v)| (k.clone(), v.clone()))
.collect(); .collect();
println!("📋 {} future events will be refreshed", future_events.len()); println!("📋 {} future events will be refreshed", future_events.len());
// Scrape and update only future events // Scrape and update only future events
let changes = scrape_and_update(client, &now, &target_end, &mut future_events).await?; let changes = scrape_and_update(client, &now, &target_end, &mut future_events).await?;
save_changes(&changes).await?; save_changes(&changes).await?;
// Merge future events back into existing events (past data untouched) // Merge future events back into existing events (past data untouched)
for (key, event) in future_events { for (key, event) in future_events {
existing_events.insert(key, event); existing_events.insert(key, event);
} }
save_optimized_chunks(existing_events).await?; save_optimized_chunks(existing_events).await?;
} }
println!("\n✅ Update complete!"); println!("\n✅ Update complete!");
Ok(()) Ok(())
} }
@@ -723,9 +858,7 @@ async fn main() -> anyhow::Result<()> {
// Setup graceful shutdown // Setup graceful shutdown
let shutdown_client = client.clone(); let shutdown_client = client.clone();
tokio::spawn(async move { tokio::spawn(async move {
signal::ctrl_c() signal::ctrl_c().await.expect("Failed to listen for ctrl+c");
.await
.expect("Failed to listen for ctrl+c");
println!("\nCtrl+C received, shutting down..."); println!("\nCtrl+C received, shutting down...");
shutdown_client.close().await.ok(); shutdown_client.close().await.ok();
std::process::exit(0); std::process::exit(0);
@@ -755,7 +888,7 @@ async fn main() -> anyhow::Result<()> {
// Display final summary // Display final summary
let chunks = scan_existing_chunks().await?; let chunks = scan_existing_chunks().await?;
let final_events = load_existing_events(&chunks).await?; let final_events = load_existing_events(&chunks).await?;
println!("\n📊 FINAL SUMMARY:"); println!("\n📊 FINAL SUMMARY:");
println!(" • Total chunks: {}", chunks.len()); println!(" • Total chunks: {}", chunks.len());
println!(" • Total events: {}", final_events.len()); println!(" • Total events: {}", final_events.len());
@@ -764,4 +897,4 @@ async fn main() -> anyhow::Result<()> {
chromedriver.kill()?; chromedriver.kill()?;
Ok(()) Ok(())
} }