Compare commits
2 Commits
0ca53bf585
...
0ea3fcc3b5
| Author | SHA1 | Date | |
|---|---|---|---|
| 0ea3fcc3b5 | |||
| 71df92965f |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -24,3 +24,4 @@ target/
|
|||||||
|
|
||||||
/chromedriver-win64/*
|
/chromedriver-win64/*
|
||||||
/economic_events*
|
/economic_events*
|
||||||
|
/economic_event_changes*
|
||||||
154
src/main.rs
154
src/main.rs
@@ -30,6 +30,7 @@ struct EventChange {
|
|||||||
date: String,
|
date: String,
|
||||||
event: String,
|
event: String,
|
||||||
country: String,
|
country: String,
|
||||||
|
change_type: String, // date | time | forecast | previous | actual | description | newly_added
|
||||||
field_changed: String,
|
field_changed: String,
|
||||||
old_value: String,
|
old_value: String,
|
||||||
new_value: String,
|
new_value: String,
|
||||||
@@ -44,6 +45,12 @@ struct ChunkInfo {
|
|||||||
event_count: usize,
|
event_count: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct ScrapeResult {
|
||||||
|
changes: Vec<EventChange>,
|
||||||
|
removed_keys: HashSet<String>, // Keys of events that were removed (rescheduled)
|
||||||
|
}
|
||||||
|
|
||||||
fn start_chromedriver(port: u16) -> std::process::Child {
|
fn start_chromedriver(port: u16) -> std::process::Child {
|
||||||
Command::new("chromedriver-win64/chromedriver.exe")
|
Command::new("chromedriver-win64/chromedriver.exe")
|
||||||
.args(&[format!("--port={}", port)])
|
.args(&[format!("--port={}", port)])
|
||||||
@@ -269,9 +276,10 @@ fn event_lookup_key(event: &EconomicEvent) -> String {
|
|||||||
format!("{}|{}|{}", event.date, event.time, event.event)
|
format!("{}|{}|{}", event.date, event.time, event.event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Identity key: country|event (for tracking same event across reschedules)
|
/// Identity key: country|event|date (to distinguish recurring monthly/quarterly events)
|
||||||
|
/// This prevents treating December and January releases of the same recurring event as reschedules
|
||||||
fn event_identity_key(event: &EconomicEvent) -> String {
|
fn event_identity_key(event: &EconomicEvent) -> String {
|
||||||
format!("{}|{}", event.country, event.event)
|
format!("{}|{}|{}", event.country, event.event, event.date)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Compare two events and detect changes in future data
|
/// Compare two events and detect changes in future data
|
||||||
@@ -293,22 +301,23 @@ fn detect_changes(old: &EconomicEvent, new: &EconomicEvent, now: &str) -> Vec<Ev
|
|||||||
println!(" 📅 Event is in the future - checking for changes...");
|
println!(" 📅 Event is in the future - checking for changes...");
|
||||||
|
|
||||||
let fields = [
|
let fields = [
|
||||||
("actual", &old.actual, &new.actual),
|
("actual", "actual", &old.actual, &new.actual),
|
||||||
("forecast", &old.forecast, &new.forecast),
|
("forecast", "forecast", &old.forecast, &new.forecast),
|
||||||
("previous", &old.previous, &new.previous),
|
("previous", "previous", &old.previous, &new.previous),
|
||||||
("description", &old.description, &new.description),
|
("description", "description", &old.description, &new.description),
|
||||||
];
|
];
|
||||||
|
|
||||||
for (field_name, old_val, new_val) in fields {
|
for (field_name, change_type, old_val, new_val) in fields {
|
||||||
if old_val != new_val {
|
if old_val != new_val {
|
||||||
println!(
|
println!(
|
||||||
" 🔄 CHANGE DETECTED in '{}': '{}' -> '{}'",
|
" 📝 CHANGE DETECTED in '{}': '{}' -> '{}'",
|
||||||
field_name, old_val, new_val
|
field_name, old_val, new_val
|
||||||
);
|
);
|
||||||
changes.push(EventChange {
|
changes.push(EventChange {
|
||||||
date: new.date.clone(),
|
date: new.date.clone(),
|
||||||
event: new.event.clone(),
|
event: new.event.clone(),
|
||||||
country: new.country.clone(),
|
country: new.country.clone(),
|
||||||
|
change_type: change_type.to_string(),
|
||||||
field_changed: field_name.to_string(),
|
field_changed: field_name.to_string(),
|
||||||
old_value: old_val.to_string(),
|
old_value: old_val.to_string(),
|
||||||
new_value: new_val.to_string(),
|
new_value: new_val.to_string(),
|
||||||
@@ -327,6 +336,7 @@ fn detect_changes(old: &EconomicEvent, new: &EconomicEvent, now: &str) -> Vec<Ev
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Build identity lookup map: finds most recent occurrence of each event by identity
|
/// Build identity lookup map: finds most recent occurrence of each event by identity
|
||||||
|
/// Identity now includes date to distinguish recurring events (e.g., monthly GDP releases)
|
||||||
fn build_identity_lookup(
|
fn build_identity_lookup(
|
||||||
events: &HashMap<String, EconomicEvent>,
|
events: &HashMap<String, EconomicEvent>,
|
||||||
) -> HashMap<String, (String, EconomicEvent)> {
|
) -> HashMap<String, (String, EconomicEvent)> {
|
||||||
@@ -334,22 +344,26 @@ fn build_identity_lookup(
|
|||||||
|
|
||||||
for (lookup_key, event) in events {
|
for (lookup_key, event) in events {
|
||||||
let identity = event_identity_key(event);
|
let identity = event_identity_key(event);
|
||||||
|
|
||||||
// Keep the most recent occurrence (latest date/time)
|
|
||||||
if let Some((existing_key, existing_event)) = identity_map.get(&identity) {
|
|
||||||
if event.date > existing_event.date
|
|
||||||
|| (event.date == existing_event.date && event.time > existing_event.time)
|
|
||||||
{
|
|
||||||
identity_map.insert(identity.clone(), (lookup_key.clone(), event.clone()));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
identity_map.insert(identity, (lookup_key.clone(), event.clone()));
|
identity_map.insert(identity, (lookup_key.clone(), event.clone()));
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
identity_map
|
identity_map
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Build a separate lookup for detecting time-only changes (same date, different time)
|
||||||
|
fn build_date_event_lookup(
|
||||||
|
events: &HashMap<String, EconomicEvent>,
|
||||||
|
) -> HashMap<String, Vec<(String, EconomicEvent)>> {
|
||||||
|
let mut date_event_map: HashMap<String, Vec<(String, EconomicEvent)>> = HashMap::new();
|
||||||
|
|
||||||
|
for (lookup_key, event) in events {
|
||||||
|
let key = format!("{}|{}|{}", event.country, event.event, event.date);
|
||||||
|
date_event_map.entry(key).or_default().push((lookup_key.clone(), event.clone()));
|
||||||
|
}
|
||||||
|
|
||||||
|
date_event_map
|
||||||
|
}
|
||||||
|
|
||||||
/// Scan the economic_events directory for existing chunks
|
/// Scan the economic_events directory for existing chunks
|
||||||
async fn scan_existing_chunks() -> anyhow::Result<Vec<ChunkInfo>> {
|
async fn scan_existing_chunks() -> anyhow::Result<Vec<ChunkInfo>> {
|
||||||
let events_dir = PathBuf::from("economic_events");
|
let events_dir = PathBuf::from("economic_events");
|
||||||
@@ -598,11 +612,12 @@ async fn scrape_and_update(
|
|||||||
start: &str,
|
start: &str,
|
||||||
end: &str,
|
end: &str,
|
||||||
existing_events: &mut HashMap<String, EconomicEvent>,
|
existing_events: &mut HashMap<String, EconomicEvent>,
|
||||||
) -> anyhow::Result<Vec<EventChange>> {
|
) -> anyhow::Result<ScrapeResult> {
|
||||||
println!("\n🎯 Scraping range: {} to {}", start, end);
|
println!("\n🎯 Scraping range: {} to {}", start, end);
|
||||||
|
|
||||||
let mut current_start = start.to_string();
|
let mut current_start = start.to_string();
|
||||||
let mut all_changes = Vec::new();
|
let mut all_changes = Vec::new();
|
||||||
|
let mut all_removed_keys = HashSet::new();
|
||||||
let now = Local::now()
|
let now = Local::now()
|
||||||
.naive_local()
|
.naive_local()
|
||||||
.date()
|
.date()
|
||||||
@@ -624,14 +639,15 @@ async fn scrape_and_update(
|
|||||||
|
|
||||||
println!(" 📦 Fetched {} events", events.len());
|
println!(" 📦 Fetched {} events", events.len());
|
||||||
|
|
||||||
// Build identity lookup for existing events (before processing new batch)
|
// Build lookups for existing events
|
||||||
let identity_lookup = build_identity_lookup(existing_events);
|
let identity_lookup = build_identity_lookup(existing_events);
|
||||||
|
let date_event_lookup = build_date_event_lookup(existing_events);
|
||||||
let mut events_to_remove: Vec<String> = Vec::new();
|
let mut events_to_remove: Vec<String> = Vec::new();
|
||||||
|
|
||||||
// Process events: detect changes and update map
|
// Process events: detect changes and update map
|
||||||
let mut new_events_count = 0;
|
let mut new_events_count = 0;
|
||||||
let mut updated_events_count = 0;
|
let mut updated_events_count = 0;
|
||||||
let mut rescheduled_events_count = 0;
|
let mut time_changed_events_count = 0;
|
||||||
|
|
||||||
for new_event in events.clone() {
|
for new_event in events.clone() {
|
||||||
let lookup_key = event_lookup_key(&new_event);
|
let lookup_key = event_lookup_key(&new_event);
|
||||||
@@ -650,35 +666,38 @@ async fn scrape_and_update(
|
|||||||
updated_events_count += 1;
|
updated_events_count += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CRITICAL: Always update the event in the map with latest data
|
||||||
existing_events.insert(lookup_key, new_event);
|
existing_events.insert(lookup_key, new_event);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// CASE B: Check if event was rescheduled (same identity, different date/time)
|
// CASE B: Check if time changed for same date/event
|
||||||
if let Some((old_lookup_key, old_event)) = identity_lookup.get(&identity_key) {
|
let date_event_key = format!("{}|{}|{}", new_event.country, new_event.event, new_event.date);
|
||||||
if old_lookup_key != &lookup_key {
|
if let Some(existing_occurrences) = date_event_lookup.get(&date_event_key) {
|
||||||
// Event was rescheduled!
|
// Find if there's an existing event with different time
|
||||||
println!("\n 🔄 RESCHEDULED EVENT DETECTED:");
|
if let Some((old_lookup_key, old_event)) = existing_occurrences.iter()
|
||||||
println!(" Event: {}", new_event.event);
|
.find(|(key, _)| key != &lookup_key) {
|
||||||
println!(
|
|
||||||
" Old: {} @ {} | New: {} @ {}",
|
|
||||||
old_event.date, old_event.time, new_event.date, new_event.time
|
|
||||||
);
|
|
||||||
|
|
||||||
// Track date/time change
|
println!("\n 🕐 TIME CHANGE DETECTED:");
|
||||||
|
println!(" Event: {}", new_event.event);
|
||||||
|
println!(" Date: {}", new_event.date);
|
||||||
|
println!(" Old time: {} | New time: {}", old_event.time, new_event.time);
|
||||||
|
|
||||||
|
// Track time change
|
||||||
if new_event.date.as_str() > now.as_str() {
|
if new_event.date.as_str() > now.as_str() {
|
||||||
let timestamp = Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
|
let timestamp = Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
|
||||||
all_changes.push(EventChange {
|
all_changes.push(EventChange {
|
||||||
date: new_event.date.clone(),
|
date: new_event.date.clone(),
|
||||||
event: new_event.event.clone(),
|
event: new_event.event.clone(),
|
||||||
country: new_event.country.clone(),
|
country: new_event.country.clone(),
|
||||||
field_changed: "date_time".to_string(),
|
change_type: "time".to_string(),
|
||||||
old_value: format!("{} @ {}", old_event.date, old_event.time),
|
field_changed: "time".to_string(),
|
||||||
new_value: format!("{} @ {}", new_event.date, new_event.time),
|
old_value: old_event.time.clone(),
|
||||||
|
new_value: new_event.time.clone(),
|
||||||
detected_at: timestamp,
|
detected_at: timestamp,
|
||||||
});
|
});
|
||||||
|
|
||||||
println!(" 📝 Date/time change recorded");
|
println!(" 📝 Time change recorded");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check for other field changes too
|
// Check for other field changes too
|
||||||
@@ -693,8 +712,9 @@ async fn scrape_and_update(
|
|||||||
|
|
||||||
// Remove old occurrence and add new one
|
// Remove old occurrence and add new one
|
||||||
events_to_remove.push(old_lookup_key.clone());
|
events_to_remove.push(old_lookup_key.clone());
|
||||||
|
all_removed_keys.insert(old_lookup_key.clone());
|
||||||
existing_events.insert(lookup_key, new_event);
|
existing_events.insert(lookup_key, new_event);
|
||||||
rescheduled_events_count += 1;
|
time_changed_events_count += 1;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -705,10 +725,26 @@ async fn scrape_and_update(
|
|||||||
" ➕ New event: {} on {} @ {}",
|
" ➕ New event: {} on {} @ {}",
|
||||||
new_event.event, new_event.date, new_event.time
|
new_event.event, new_event.date, new_event.time
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Track as newly added if it's a future event
|
||||||
|
if new_event.date.as_str() > now.as_str() {
|
||||||
|
let timestamp = Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
|
||||||
|
all_changes.push(EventChange {
|
||||||
|
date: new_event.date.clone(),
|
||||||
|
event: new_event.event.clone(),
|
||||||
|
country: new_event.country.clone(),
|
||||||
|
change_type: "newly_added".to_string(),
|
||||||
|
field_changed: "new_event".to_string(),
|
||||||
|
old_value: "".to_string(),
|
||||||
|
new_value: format!("{} @ {}", new_event.date, new_event.time),
|
||||||
|
detected_at: timestamp,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
existing_events.insert(lookup_key, new_event);
|
existing_events.insert(lookup_key, new_event);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove old occurrences of rescheduled events
|
// Remove old occurrences of time-changed events
|
||||||
for key in events_to_remove {
|
for key in events_to_remove {
|
||||||
existing_events.remove(&key);
|
existing_events.remove(&key);
|
||||||
}
|
}
|
||||||
@@ -716,7 +752,7 @@ async fn scrape_and_update(
|
|||||||
println!("\n 📊 Batch summary:");
|
println!("\n 📊 Batch summary:");
|
||||||
println!(" New events: {}", new_events_count);
|
println!(" New events: {}", new_events_count);
|
||||||
println!(" Updated events: {}", updated_events_count);
|
println!(" Updated events: {}", updated_events_count);
|
||||||
println!(" Rescheduled events: {}", rescheduled_events_count);
|
println!(" Time changed events: {}", time_changed_events_count);
|
||||||
println!(" Changes tracked: {}", all_changes.len());
|
println!(" Changes tracked: {}", all_changes.len());
|
||||||
|
|
||||||
let next = match calculate_next_start_date(&events) {
|
let next = match calculate_next_start_date(&events) {
|
||||||
@@ -738,8 +774,12 @@ async fn scrape_and_update(
|
|||||||
|
|
||||||
println!("\n🎯 SCRAPE COMPLETE:");
|
println!("\n🎯 SCRAPE COMPLETE:");
|
||||||
println!(" Total changes detected: {}", all_changes.len());
|
println!(" Total changes detected: {}", all_changes.len());
|
||||||
|
println!(" Total events removed (time changes): {}", all_removed_keys.len());
|
||||||
|
|
||||||
Ok(all_changes)
|
Ok(ScrapeResult {
|
||||||
|
changes: all_changes,
|
||||||
|
removed_keys: all_removed_keys,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Main logic with intelligent update handling
|
/// Main logic with intelligent update handling
|
||||||
@@ -761,9 +801,9 @@ async fn run_intelligent_update(client: &fantoccini::Client) -> anyhow::Result<(
|
|||||||
if existing_events.is_empty() {
|
if existing_events.is_empty() {
|
||||||
// No existing data - full scrape from beginning
|
// No existing data - full scrape from beginning
|
||||||
println!("\n🔭 No existing data - starting fresh scrape from 2007-02-13");
|
println!("\n🔭 No existing data - starting fresh scrape from 2007-02-13");
|
||||||
let changes =
|
let result =
|
||||||
scrape_and_update(client, "2007-02-13", &target_end, &mut existing_events).await?;
|
scrape_and_update(client, "2007-02-13", &target_end, &mut existing_events).await?;
|
||||||
save_changes(&changes).await?;
|
save_changes(&result.changes).await?;
|
||||||
save_optimized_chunks(existing_events).await?;
|
save_optimized_chunks(existing_events).await?;
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
@@ -791,35 +831,25 @@ async fn run_intelligent_update(client: &fantoccini::Client) -> anyhow::Result<(
|
|||||||
"\n📈 Updating from end of existing data: {} to {}",
|
"\n📈 Updating from end of existing data: {} to {}",
|
||||||
next_start, target_end
|
next_start, target_end
|
||||||
);
|
);
|
||||||
let changes =
|
let result =
|
||||||
scrape_and_update(client, &next_start, &target_end, &mut existing_events).await?;
|
scrape_and_update(client, &next_start, &target_end, &mut existing_events).await?;
|
||||||
save_changes(&changes).await?;
|
save_changes(&result.changes).await?;
|
||||||
save_optimized_chunks(existing_events).await?;
|
save_optimized_chunks(existing_events).await?;
|
||||||
} else if max_date >= now {
|
} else if max_date >= now {
|
||||||
// Case 2: Data extends to or beyond today, only refresh future data
|
// Case 2: Data extends to or beyond today, refresh future data
|
||||||
println!(
|
println!(
|
||||||
"\n🔄 Data exists up to today - only refreshing future data: {} to {}",
|
"\n🔄 Data exists up to today - refreshing future data: {} to {}",
|
||||||
now, target_end
|
now, target_end
|
||||||
);
|
);
|
||||||
|
|
||||||
// Create a separate HashMap for only future events to avoid touching past data
|
// CRITICAL FIX: Pass the actual existing_events HashMap directly
|
||||||
let mut future_events: HashMap<String, EconomicEvent> = existing_events
|
// This ensures all updates (including rescheduled events) are properly handled
|
||||||
.iter()
|
let result = scrape_and_update(client, &now, &target_end, &mut existing_events).await?;
|
||||||
.filter(|(_, event)| event.date.as_str() >= now.as_str())
|
|
||||||
.map(|(k, v)| (k.clone(), v.clone()))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
println!("📋 {} future events will be refreshed", future_events.len());
|
save_changes(&result.changes).await?;
|
||||||
|
|
||||||
// Scrape and update only future events
|
|
||||||
let changes = scrape_and_update(client, &now, &target_end, &mut future_events).await?;
|
|
||||||
save_changes(&changes).await?;
|
|
||||||
|
|
||||||
// Merge future events back into existing events (past data untouched)
|
|
||||||
for (key, event) in future_events {
|
|
||||||
existing_events.insert(key, event);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// The existing_events HashMap is already updated in-place by scrape_and_update
|
||||||
|
// Just save the optimized chunks
|
||||||
save_optimized_chunks(existing_events).await?;
|
save_optimized_chunks(existing_events).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user