working update function
This commit is contained in:
12
economic_event_changes/event_changes_12_2025.json
Normal file
12
economic_event_changes/event_changes_12_2025.json
Normal file
@@ -0,0 +1,12 @@
|
||||
[
|
||||
{
|
||||
"date": "2025-12-31",
|
||||
"event": "NBS EMI Dienstleistungen",
|
||||
"country": "China",
|
||||
"change_type": "time",
|
||||
"field_changed": "time",
|
||||
"old_value": "01:30",
|
||||
"new_value": "02:30",
|
||||
"detected_at": "2025-11-18 19:45:41"
|
||||
}
|
||||
]
|
||||
156
src/main.rs
156
src/main.rs
@@ -30,6 +30,7 @@ struct EventChange {
|
||||
date: String,
|
||||
event: String,
|
||||
country: String,
|
||||
change_type: String, // date | time | forecast | previous | actual | description | newly_added
|
||||
field_changed: String,
|
||||
old_value: String,
|
||||
new_value: String,
|
||||
@@ -44,6 +45,12 @@ struct ChunkInfo {
|
||||
event_count: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ScrapeResult {
|
||||
changes: Vec<EventChange>,
|
||||
removed_keys: HashSet<String>, // Keys of events that were removed (rescheduled)
|
||||
}
|
||||
|
||||
fn start_chromedriver(port: u16) -> std::process::Child {
|
||||
Command::new("chromedriver-win64/chromedriver.exe")
|
||||
.args(&[format!("--port={}", port)])
|
||||
@@ -269,9 +276,10 @@ fn event_lookup_key(event: &EconomicEvent) -> String {
|
||||
format!("{}|{}|{}", event.date, event.time, event.event)
|
||||
}
|
||||
|
||||
/// Identity key: country|event (for tracking same event across reschedules)
|
||||
/// Identity key: country|event|date (to distinguish recurring monthly/quarterly events)
|
||||
/// This prevents treating December and January releases of the same recurring event as reschedules
|
||||
fn event_identity_key(event: &EconomicEvent) -> String {
|
||||
format!("{}|{}", event.country, event.event)
|
||||
format!("{}|{}|{}", event.country, event.event, event.date)
|
||||
}
|
||||
|
||||
/// Compare two events and detect changes in future data
|
||||
@@ -293,22 +301,23 @@ fn detect_changes(old: &EconomicEvent, new: &EconomicEvent, now: &str) -> Vec<Ev
|
||||
println!(" 📅 Event is in the future - checking for changes...");
|
||||
|
||||
let fields = [
|
||||
("actual", &old.actual, &new.actual),
|
||||
("forecast", &old.forecast, &new.forecast),
|
||||
("previous", &old.previous, &new.previous),
|
||||
("description", &old.description, &new.description),
|
||||
("actual", "actual", &old.actual, &new.actual),
|
||||
("forecast", "forecast", &old.forecast, &new.forecast),
|
||||
("previous", "previous", &old.previous, &new.previous),
|
||||
("description", "description", &old.description, &new.description),
|
||||
];
|
||||
|
||||
for (field_name, old_val, new_val) in fields {
|
||||
for (field_name, change_type, old_val, new_val) in fields {
|
||||
if old_val != new_val {
|
||||
println!(
|
||||
" 🔄 CHANGE DETECTED in '{}': '{}' -> '{}'",
|
||||
" 📝 CHANGE DETECTED in '{}': '{}' -> '{}'",
|
||||
field_name, old_val, new_val
|
||||
);
|
||||
changes.push(EventChange {
|
||||
date: new.date.clone(),
|
||||
event: new.event.clone(),
|
||||
country: new.country.clone(),
|
||||
change_type: change_type.to_string(),
|
||||
field_changed: field_name.to_string(),
|
||||
old_value: old_val.to_string(),
|
||||
new_value: new_val.to_string(),
|
||||
@@ -327,6 +336,7 @@ fn detect_changes(old: &EconomicEvent, new: &EconomicEvent, now: &str) -> Vec<Ev
|
||||
}
|
||||
|
||||
/// Build identity lookup map: finds most recent occurrence of each event by identity
|
||||
/// Identity now includes date to distinguish recurring events (e.g., monthly GDP releases)
|
||||
fn build_identity_lookup(
|
||||
events: &HashMap<String, EconomicEvent>,
|
||||
) -> HashMap<String, (String, EconomicEvent)> {
|
||||
@@ -334,22 +344,26 @@ fn build_identity_lookup(
|
||||
|
||||
for (lookup_key, event) in events {
|
||||
let identity = event_identity_key(event);
|
||||
|
||||
// Keep the most recent occurrence (latest date/time)
|
||||
if let Some((existing_key, existing_event)) = identity_map.get(&identity) {
|
||||
if event.date > existing_event.date
|
||||
|| (event.date == existing_event.date && event.time > existing_event.time)
|
||||
{
|
||||
identity_map.insert(identity.clone(), (lookup_key.clone(), event.clone()));
|
||||
}
|
||||
} else {
|
||||
identity_map.insert(identity, (lookup_key.clone(), event.clone()));
|
||||
}
|
||||
identity_map.insert(identity, (lookup_key.clone(), event.clone()));
|
||||
}
|
||||
|
||||
identity_map
|
||||
}
|
||||
|
||||
/// Build a separate lookup for detecting time-only changes (same date, different time)
|
||||
fn build_date_event_lookup(
|
||||
events: &HashMap<String, EconomicEvent>,
|
||||
) -> HashMap<String, Vec<(String, EconomicEvent)>> {
|
||||
let mut date_event_map: HashMap<String, Vec<(String, EconomicEvent)>> = HashMap::new();
|
||||
|
||||
for (lookup_key, event) in events {
|
||||
let key = format!("{}|{}|{}", event.country, event.event, event.date);
|
||||
date_event_map.entry(key).or_default().push((lookup_key.clone(), event.clone()));
|
||||
}
|
||||
|
||||
date_event_map
|
||||
}
|
||||
|
||||
/// Scan the economic_events directory for existing chunks
|
||||
async fn scan_existing_chunks() -> anyhow::Result<Vec<ChunkInfo>> {
|
||||
let events_dir = PathBuf::from("economic_events");
|
||||
@@ -598,11 +612,12 @@ async fn scrape_and_update(
|
||||
start: &str,
|
||||
end: &str,
|
||||
existing_events: &mut HashMap<String, EconomicEvent>,
|
||||
) -> anyhow::Result<Vec<EventChange>> {
|
||||
) -> anyhow::Result<ScrapeResult> {
|
||||
println!("\n🎯 Scraping range: {} to {}", start, end);
|
||||
|
||||
let mut current_start = start.to_string();
|
||||
let mut all_changes = Vec::new();
|
||||
let mut all_removed_keys = HashSet::new();
|
||||
let now = Local::now()
|
||||
.naive_local()
|
||||
.date()
|
||||
@@ -624,14 +639,15 @@ async fn scrape_and_update(
|
||||
|
||||
println!(" 📦 Fetched {} events", events.len());
|
||||
|
||||
// Build identity lookup for existing events (before processing new batch)
|
||||
// Build lookups for existing events
|
||||
let identity_lookup = build_identity_lookup(existing_events);
|
||||
let date_event_lookup = build_date_event_lookup(existing_events);
|
||||
let mut events_to_remove: Vec<String> = Vec::new();
|
||||
|
||||
// Process events: detect changes and update map
|
||||
let mut new_events_count = 0;
|
||||
let mut updated_events_count = 0;
|
||||
let mut rescheduled_events_count = 0;
|
||||
let mut time_changed_events_count = 0;
|
||||
|
||||
for new_event in events.clone() {
|
||||
let lookup_key = event_lookup_key(&new_event);
|
||||
@@ -650,35 +666,38 @@ async fn scrape_and_update(
|
||||
updated_events_count += 1;
|
||||
}
|
||||
|
||||
// CRITICAL: Always update the event in the map with latest data
|
||||
existing_events.insert(lookup_key, new_event);
|
||||
continue;
|
||||
}
|
||||
|
||||
// CASE B: Check if event was rescheduled (same identity, different date/time)
|
||||
if let Some((old_lookup_key, old_event)) = identity_lookup.get(&identity_key) {
|
||||
if old_lookup_key != &lookup_key {
|
||||
// Event was rescheduled!
|
||||
println!("\n 🔄 RESCHEDULED EVENT DETECTED:");
|
||||
println!(" Event: {}", new_event.event);
|
||||
println!(
|
||||
" Old: {} @ {} | New: {} @ {}",
|
||||
old_event.date, old_event.time, new_event.date, new_event.time
|
||||
);
|
||||
// CASE B: Check if time changed for same date/event
|
||||
let date_event_key = format!("{}|{}|{}", new_event.country, new_event.event, new_event.date);
|
||||
if let Some(existing_occurrences) = date_event_lookup.get(&date_event_key) {
|
||||
// Find if there's an existing event with different time
|
||||
if let Some((old_lookup_key, old_event)) = existing_occurrences.iter()
|
||||
.find(|(key, _)| key != &lookup_key) {
|
||||
|
||||
// Track date/time change
|
||||
println!("\n 🕐 TIME CHANGE DETECTED:");
|
||||
println!(" Event: {}", new_event.event);
|
||||
println!(" Date: {}", new_event.date);
|
||||
println!(" Old time: {} | New time: {}", old_event.time, new_event.time);
|
||||
|
||||
// Track time change
|
||||
if new_event.date.as_str() > now.as_str() {
|
||||
let timestamp = Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
|
||||
all_changes.push(EventChange {
|
||||
date: new_event.date.clone(),
|
||||
event: new_event.event.clone(),
|
||||
country: new_event.country.clone(),
|
||||
field_changed: "date_time".to_string(),
|
||||
old_value: format!("{} @ {}", old_event.date, old_event.time),
|
||||
new_value: format!("{} @ {}", new_event.date, new_event.time),
|
||||
change_type: "time".to_string(),
|
||||
field_changed: "time".to_string(),
|
||||
old_value: old_event.time.clone(),
|
||||
new_value: new_event.time.clone(),
|
||||
detected_at: timestamp,
|
||||
});
|
||||
|
||||
println!(" 📝 Date/time change recorded");
|
||||
println!(" 📝 Time change recorded");
|
||||
}
|
||||
|
||||
// Check for other field changes too
|
||||
@@ -693,8 +712,9 @@ async fn scrape_and_update(
|
||||
|
||||
// Remove old occurrence and add new one
|
||||
events_to_remove.push(old_lookup_key.clone());
|
||||
all_removed_keys.insert(old_lookup_key.clone());
|
||||
existing_events.insert(lookup_key, new_event);
|
||||
rescheduled_events_count += 1;
|
||||
time_changed_events_count += 1;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@@ -705,10 +725,26 @@ async fn scrape_and_update(
|
||||
" ➕ New event: {} on {} @ {}",
|
||||
new_event.event, new_event.date, new_event.time
|
||||
);
|
||||
|
||||
// Track as newly added if it's a future event
|
||||
if new_event.date.as_str() > now.as_str() {
|
||||
let timestamp = Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
|
||||
all_changes.push(EventChange {
|
||||
date: new_event.date.clone(),
|
||||
event: new_event.event.clone(),
|
||||
country: new_event.country.clone(),
|
||||
change_type: "newly_added".to_string(),
|
||||
field_changed: "new_event".to_string(),
|
||||
old_value: "".to_string(),
|
||||
new_value: format!("{} @ {}", new_event.date, new_event.time),
|
||||
detected_at: timestamp,
|
||||
});
|
||||
}
|
||||
|
||||
existing_events.insert(lookup_key, new_event);
|
||||
}
|
||||
|
||||
// Remove old occurrences of rescheduled events
|
||||
// Remove old occurrences of time-changed events
|
||||
for key in events_to_remove {
|
||||
existing_events.remove(&key);
|
||||
}
|
||||
@@ -716,7 +752,7 @@ async fn scrape_and_update(
|
||||
println!("\n 📊 Batch summary:");
|
||||
println!(" New events: {}", new_events_count);
|
||||
println!(" Updated events: {}", updated_events_count);
|
||||
println!(" Rescheduled events: {}", rescheduled_events_count);
|
||||
println!(" Time changed events: {}", time_changed_events_count);
|
||||
println!(" Changes tracked: {}", all_changes.len());
|
||||
|
||||
let next = match calculate_next_start_date(&events) {
|
||||
@@ -738,8 +774,12 @@ async fn scrape_and_update(
|
||||
|
||||
println!("\n🎯 SCRAPE COMPLETE:");
|
||||
println!(" Total changes detected: {}", all_changes.len());
|
||||
println!(" Total events removed (time changes): {}", all_removed_keys.len());
|
||||
|
||||
Ok(all_changes)
|
||||
Ok(ScrapeResult {
|
||||
changes: all_changes,
|
||||
removed_keys: all_removed_keys,
|
||||
})
|
||||
}
|
||||
|
||||
/// Main logic with intelligent update handling
|
||||
@@ -761,9 +801,9 @@ async fn run_intelligent_update(client: &fantoccini::Client) -> anyhow::Result<(
|
||||
if existing_events.is_empty() {
|
||||
// No existing data - full scrape from beginning
|
||||
println!("\n🔭 No existing data - starting fresh scrape from 2007-02-13");
|
||||
let changes =
|
||||
let result =
|
||||
scrape_and_update(client, "2007-02-13", &target_end, &mut existing_events).await?;
|
||||
save_changes(&changes).await?;
|
||||
save_changes(&result.changes).await?;
|
||||
save_optimized_chunks(existing_events).await?;
|
||||
return Ok(());
|
||||
}
|
||||
@@ -791,35 +831,25 @@ async fn run_intelligent_update(client: &fantoccini::Client) -> anyhow::Result<(
|
||||
"\n📈 Updating from end of existing data: {} to {}",
|
||||
next_start, target_end
|
||||
);
|
||||
let changes =
|
||||
let result =
|
||||
scrape_and_update(client, &next_start, &target_end, &mut existing_events).await?;
|
||||
save_changes(&changes).await?;
|
||||
save_changes(&result.changes).await?;
|
||||
save_optimized_chunks(existing_events).await?;
|
||||
} else if max_date >= now {
|
||||
// Case 2: Data extends to or beyond today, only refresh future data
|
||||
// Case 2: Data extends to or beyond today, refresh future data
|
||||
println!(
|
||||
"\n🔄 Data exists up to today - only refreshing future data: {} to {}",
|
||||
"\n🔄 Data exists up to today - refreshing future data: {} to {}",
|
||||
now, target_end
|
||||
);
|
||||
|
||||
// Create a separate HashMap for only future events to avoid touching past data
|
||||
let mut future_events: HashMap<String, EconomicEvent> = existing_events
|
||||
.iter()
|
||||
.filter(|(_, event)| event.date.as_str() >= now.as_str())
|
||||
.map(|(k, v)| (k.clone(), v.clone()))
|
||||
.collect();
|
||||
// CRITICAL FIX: Pass the actual existing_events HashMap directly
|
||||
// This ensures all updates (including rescheduled events) are properly handled
|
||||
let result = scrape_and_update(client, &now, &target_end, &mut existing_events).await?;
|
||||
|
||||
println!("📋 {} future events will be refreshed", future_events.len());
|
||||
|
||||
// Scrape and update only future events
|
||||
let changes = scrape_and_update(client, &now, &target_end, &mut future_events).await?;
|
||||
save_changes(&changes).await?;
|
||||
|
||||
// Merge future events back into existing events (past data untouched)
|
||||
for (key, event) in future_events {
|
||||
existing_events.insert(key, event);
|
||||
}
|
||||
save_changes(&result.changes).await?;
|
||||
|
||||
// The existing_events HashMap is already updated in-place by scrape_and_update
|
||||
// Just save the optimized chunks
|
||||
save_optimized_chunks(existing_events).await?;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user