diff --git a/src/main.rs b/src/main.rs
index ec520a5..504e223 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,8 +1,8 @@
-use chrono::{NaiveDate, Datelike};
+use chrono::{NaiveDate, Datelike, Local};
 use fantoccini::{ClientBuilder, Locator};
 use serde::{Deserialize, Serialize};
 use serde_json::{Map, Value};
-use std::{path::PathBuf, process::Command};
+use std::{collections::HashMap, path::PathBuf, process::Command};
 use tokio::{
     fs, signal,
@@ -22,6 +22,17 @@ struct EconomicEvent {
     description: String,
 }
 
+#[derive(Debug, Serialize, Deserialize, Clone)]
+struct EventChange {
+    date: String,
+    event: String,
+    country: String,
+    field_changed: String,
+    old_value: String,
+    new_value: String,
+    detected_at: String,
+}
+
 #[derive(Debug)]
 struct ChunkInfo {
     start_date: String,
@@ -254,11 +265,48 @@ fn calculate_next_start_date(events: &[EconomicEvent]) -> anyhow::Result<String> {
     Ok(next.format("%Y-%m-%d").to_string())
 }
 
+fn event_key(event: &EconomicEvent) -> String {
+    format!("{}|{}|{}", event.date, event.time, event.event)
+}
+
+/// Compare two events and detect changes in future data
+fn detect_changes(old: &EconomicEvent, new: &EconomicEvent, now: &str) -> Vec<EventChange> {
+    let mut changes = Vec::new();
+    let timestamp = Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
+
+    // Only track changes for future events
+    if new.date.as_str() <= now {
+        return changes;
+    }
+
+    let fields = [
+        ("actual", &old.actual, &new.actual),
+        ("forecast", &old.forecast, &new.forecast),
+        ("previous", &old.previous, &new.previous),
+        ("description", &old.description, &new.description),
+    ];
+
+    for (field_name, old_val, new_val) in fields {
+        if old_val != new_val {
+            changes.push(EventChange {
+                date: new.date.clone(),
+                event: new.event.clone(),
+                country: new.country.clone(),
+                field_changed: field_name.to_string(),
+                old_value: old_val.to_string(),
+                new_value: new_val.to_string(),
+                detected_at: timestamp.clone(),
+            });
+        }
+    }
+
+    changes
+}
+
 /// Scan the economic_events directory for existing chunks
 async fn scan_existing_chunks() -> anyhow::Result<Vec<ChunkInfo>> {
     let events_dir = PathBuf::from("economic_events");
-    // Create directory if it doesn't exist
     if !events_dir.exists() {
         fs::create_dir_all(&events_dir).await?;
         println!("šŸ“ Created economic_events directory");
     }
@@ -272,11 +320,9 @@ async fn scan_existing_chunks() -> anyhow::Result<Vec<ChunkInfo>> {
         let path = entry.path();
         if path.extension().and_then(|s| s.to_str()) == Some("json") {
             if let Some(filename) = path.file_stem().and_then(|s| s.to_str()) {
-                // Parse filename: chunk_{startdate}_{enddate}.json
                 if let Some(dates) = filename.strip_prefix("chunk_") {
                     let parts: Vec<&str> = dates.split('_').collect();
                     if parts.len() == 2 {
-                        // Load and count events
                         if let Ok(content) = fs::read_to_string(&path).await {
                             if let Ok(events) = serde_json::from_str::<Vec<EconomicEvent>>(&content) {
                                 chunks.push(ChunkInfo {
@@ -302,7 +348,7 @@
                 chunk.start_date, chunk.end_date, chunk.event_count);
         }
     } else {
-        println!("šŸ“­ No existing chunks found");
+        println!("šŸ”­ No existing chunks found");
     }
 
     Ok(chunks)
@@ -310,7 +356,7 @@ async fn scan_existing_chunks() -> anyhow::Result<Vec<ChunkInfo>> {
 
 /// Calculate target end date: first day of month, 3 months from now
 fn calculate_target_end_date() -> String {
-    let now = chrono::Local::now().naive_local().date();
+    let now = Local::now().naive_local().date();
     let three_months_ahead = if now.month() + 3 > 12 {
         NaiveDate::from_ymd_opt(now.year() + 1, (now.month() + 3) % 12, 1)
     } else {
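Reviewer note: `detect_changes` above applies one field-diff pattern uniformly across the tracked columns. A minimal, self-contained sketch of that pattern follows; `Event` and `diff_fields` are trimmed stand-ins for the real `EconomicEvent`/`detect_changes`, and the figures are invented for illustration.

```rust
// Standalone sketch of the field-diff pattern used by `detect_changes`.
#[derive(Debug)]
struct Event {
    forecast: String,
    previous: String,
}

// Return (field name, old value, new value) for every field that differs.
fn diff_fields(old: &Event, new: &Event) -> Vec<(&'static str, String, String)> {
    let fields = [
        ("forecast", &old.forecast, &new.forecast),
        ("previous", &old.previous, &new.previous),
    ];
    fields
        .into_iter()
        .filter(|(_, o, n)| o != n)
        .map(|(name, o, n)| (name, o.clone(), n.clone()))
        .collect()
}

fn main() {
    // Invented values, purely for illustration
    let old = Event { forecast: "1.2%".into(), previous: "1.0%".into() };
    let new = Event { forecast: "1.4%".into(), previous: "1.0%".into() };
    for (field, before, after) in diff_fields(&old, &new) {
        println!("{field}: {before} -> {after}"); // prints: forecast: 1.2% -> 1.4%
    }
}
```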
@@ -320,82 +366,162 @@ fn calculate_target_end_date() -> String {
     three_months_ahead.format("%Y-%m-%d").to_string()
 }
 
-/// Determine what date range needs to be scraped based on existing data
-fn determine_scrape_range(chunks: &[ChunkInfo], target_end: &str) -> Option<(String, String)> {
-    let now = chrono::Local::now().naive_local().date().format("%Y-%m-%d").to_string();
-
-    if chunks.is_empty() {
-        // No data exists, start from beginning
-        println!("šŸ“­ No existing data - scraping from 2007-02-13 to {}", target_end);
-        return Some(("2007-02-13".to_string(), target_end.to_string()));
-    }
-
-    // Find the latest date in existing chunks
-    let latest_chunk_date = chunks.iter()
-        .map(|c| &c.end_date)
-        .max()
-        .cloned()
-        .unwrap_or_else(|| "2007-02-13".to_string());
-
-    println!("šŸ“Š Latest existing data: {}", latest_chunk_date);
-
-    if latest_chunk_date >= now {
-        // Data is ahead of current date - update from now to target
-        println!("šŸ”„ Data exists beyond today - updating from {} to {}", now, target_end);
-        Some((now, target_end.to_string()))
-    } else {
-        // Data is behind - continue from where it left off
-        let next_start = parse_date(&latest_chunk_date)
-            .and_then(|d| d.succ_opt())
-            .map(|d| d.format("%Y-%m-%d").to_string())
-            .unwrap_or_else(|| latest_chunk_date.clone());
-
-        println!("āž”ļø Continuing from {} to {}", next_start, target_end);
-        Some((next_start, target_end.to_string()))
-    }
-}
-
-/// Save a chunk to disk
-async fn save_chunk(events: &[EconomicEvent], start: &str, end: &str) -> anyhow::Result<()> {
-    let events_dir = PathBuf::from("economic_events");
-    fs::create_dir_all(&events_dir).await?;
-
-    let filename = format!("chunk_{}_{}.json", start, end);
-    let filepath = events_dir.join(&filename);
-
-    let json = serde_json::to_string_pretty(events)?;
-    fs::write(&filepath, json).await?;
-
-    println!("šŸ’¾ Saved chunk: {} ({} events)", filename, events.len());
-    Ok(())
-}
-
-/// Load all events from existing chunks
-async fn load_all_events(chunks: &[ChunkInfo]) -> anyhow::Result<Vec<EconomicEvent>> {
-    let mut all_events = Vec::new();
+/// Load all events from existing chunks into a HashMap
+async fn load_existing_events(chunks: &[ChunkInfo]) -> anyhow::Result<HashMap<String, EconomicEvent>> {
+    let mut event_map = HashMap::new();
 
     for chunk in chunks {
         if let Ok(content) = fs::read_to_string(&chunk.path).await {
             if let Ok(events) = serde_json::from_str::<Vec<EconomicEvent>>(&content) {
-                all_events.extend(events);
+                for event in events {
+                    event_map.insert(event_key(&event), event);
+                }
             }
         }
    }
 
-    println!("šŸ“„ Loaded {} events from existing chunks", all_events.len());
-    Ok(all_events)
+    println!("šŸ“„ Loaded {} events from existing chunks", event_map.len());
+    Ok(event_map)
 }
 
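Reviewer note: `load_existing_events` keys the map with `event_key`'s `date|time|event` composite, so re-reading overlapping chunks dedupes rows instead of duplicating them. A small sketch of that overwrite behavior; the payroll figures are invented for illustration.

```rust
use std::collections::HashMap;

// Sketch: a later insert under the same composite key replaces the earlier
// one, which is exactly what lets a re-scrape refresh an event in place.
fn main() {
    let key = |date: &str, time: &str, name: &str| format!("{date}|{time}|{name}");
    let mut map: HashMap<String, &str> = HashMap::new();

    map.insert(key("2025-03-07", "08:30", "Nonfarm Payrolls"), "forecast 180K");
    map.insert(key("2025-03-07", "08:30", "Nonfarm Payrolls"), "forecast 200K");

    assert_eq!(map.len(), 1); // the second insert overwrote the first
    println!("{map:?}");
}
```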
+/// Save or append changes to monthly change files
+async fn save_changes(changes: &[EventChange]) -> anyhow::Result<()> {
+    if changes.is_empty() {
+        return Ok(());
+    }
+
+    let events_dir = PathBuf::from("economic_events");
+    fs::create_dir_all(&events_dir).await?;
+
+    // Group changes by month
+    let mut changes_by_month: HashMap<String, Vec<EventChange>> = HashMap::new();
+
+    for change in changes {
+        if let Some(date) = parse_date(&change.date) {
+            let month_key = format!("{:02}_{}", date.month(), date.year());
+            changes_by_month.entry(month_key).or_default().push(change.clone());
+        }
+    }
+
+    // Save each month's changes
+    for (month_key, month_changes) in changes_by_month {
+        let filename = format!("event_changes_{}.json", month_key);
+        let filepath = events_dir.join(&filename);
+
+        // Load existing changes if the file exists
+        let mut all_changes = if filepath.exists() {
+            let content = fs::read_to_string(&filepath).await?;
+            serde_json::from_str::<Vec<EventChange>>(&content).unwrap_or_default()
+        } else {
+            Vec::new()
+        };
+
+        // Append new changes
+        all_changes.extend(month_changes);
+
+        // Save combined changes
+        let json = serde_json::to_string_pretty(&all_changes)?;
+        fs::write(&filepath, json).await?;
+
+        println!("šŸ”„ Updated changes file: {} ({} total changes)", filename, all_changes.len());
+    }
+
+    Ok(())
+}
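Reviewer note: `save_changes` buckets detected changes into `event_changes_{MM}_{YYYY}.json` files via the `{:02}_{}` month key. A quick standalone sketch of that naming scheme; `month_key` as a function name is illustrative, and the snippet assumes the same `chrono` dependency the patch already uses.

```rust
use chrono::{Datelike, NaiveDate};

// Sketch of the month-key scheme in `save_changes`: a change dated
// 2025-03-07 lands in event_changes_03_2025.json.
fn month_key(date: NaiveDate) -> String {
    format!("{:02}_{}", date.month(), date.year())
}

fn main() {
    let d = NaiveDate::from_ymd_opt(2025, 3, 7).unwrap();
    assert_eq!(month_key(d), "03_2025");
    println!("event_changes_{}.json", month_key(d));
}
```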
+
+/// Reorganize events into optimal chunks and save them
+async fn save_optimized_chunks(events: HashMap<String, EconomicEvent>) -> anyhow::Result<()> {
+    if events.is_empty() {
+        return Ok(());
+    }
+
+    let events_dir = PathBuf::from("economic_events");
+    fs::create_dir_all(&events_dir).await?;
+
+    // Convert to sorted vector
+    let mut all_events: Vec<EconomicEvent> = events.into_values().collect();
+    all_events.sort_by(|a, b| a.date.cmp(&b.date));
+
+    // Group events into chunks of ~100 days / ~500 events (events with unparseable dates are skipped)
+    let mut chunks: Vec<Vec<EconomicEvent>> = Vec::new();
+    let mut current_chunk = Vec::new();
+    let mut current_start_date: Option<NaiveDate> = None;
+
+    for event in all_events {
+        if let Some(event_date) = parse_date(&event.date) {
+            if let Some(start) = current_start_date {
+                // Start a new chunk if we've gone 100+ days or have 500+ events
+                if (event_date - start).num_days() > 100 || current_chunk.len() >= 500 {
+                    chunks.push(current_chunk);
+                    current_chunk = Vec::new();
+                    current_start_date = Some(event_date);
+                }
+            } else {
+                current_start_date = Some(event_date);
+            }
+            current_chunk.push(event);
+        }
+    }
+
+    if !current_chunk.is_empty() {
+        chunks.push(current_chunk);
+    }
+
+    // Delete old chunk files
+    let mut entries = fs::read_dir(&events_dir).await?;
+    while let Some(entry) = entries.next_entry().await? {
+        let path = entry.path();
+        if let Some(filename) = path.file_stem().and_then(|s| s.to_str()) {
+            if filename.starts_with("chunk_") {
+                fs::remove_file(&path).await?;
+            }
+        }
+    }
+
+    // Save new optimized chunks
+    for chunk in chunks {
+        if chunk.is_empty() {
+            continue;
+        }
+
+        // Safe to unwrap: every event in a chunk passed parse_date above
+        let start = chunk.iter()
+            .filter_map(|e| parse_date(&e.date))
+            .min()
+            .unwrap()
+            .format("%Y-%m-%d")
+            .to_string();
+
+        let end = chunk.iter()
+            .filter_map(|e| parse_date(&e.date))
+            .max()
+            .unwrap()
+            .format("%Y-%m-%d")
+            .to_string();
+
+        let filename = format!("chunk_{}_{}.json", start, end);
+        let filepath = events_dir.join(&filename);
+
+        let json = serde_json::to_string_pretty(&chunk)?;
+        fs::write(&filepath, json).await?;
+
+        println!("šŸ’¾ Saved optimized chunk: {} ({} events)", filename, chunk.len());
+    }
+
+    Ok(())
+}
+
-/// Scrape events for a specific date range and save chunks immediately
-async fn scrape_date_range(
+/// Scrape and update data with change tracking
+async fn scrape_and_update(
     client: &fantoccini::Client,
     start: &str,
     end: &str,
-) -> anyhow::Result<()> {
+    existing_events: &mut HashMap<String, EconomicEvent>,
+) -> anyhow::Result<Vec<EventChange>> {
     println!("\nšŸŽÆ Scraping range: {} to {}", start, end);
 
     let mut current_start = start.to_string();
-    let mut chunk_number = 0;
+    let mut all_changes = Vec::new();
+    let now = Local::now().naive_local().date().format("%Y-%m-%d").to_string();
 
     loop {
         set_date_range(client, &current_start, end).await?;
@@ -407,24 +533,24 @@ async fn scrape_date_range(
             break;
         }
 
-        chunk_number += 1;
         println!("  šŸ“¦ Fetched {} events", events.len());
 
-        // Calculate actual date range of this chunk
-        let chunk_start = events.iter()
-            .filter_map(|e| parse_date(&e.date))
-            .min()
-            .map(|d| d.format("%Y-%m-%d").to_string())
-            .unwrap_or_else(|| current_start.clone());
+        // Process events: detect changes and update the map
+        for new_event in events.iter().cloned() {
+            let key = event_key(&new_event);
 
-        let chunk_end = events.iter()
-            .filter_map(|e| parse_date(&e.date))
-            .max()
-            .map(|d| d.format("%Y-%m-%d").to_string())
-            .unwrap_or_else(|| end.to_string());
-
-        // Save chunk immediately
-        save_chunk(&events, &chunk_start, &chunk_end).await?;
+            if let Some(old_event) = existing_events.get(&key) {
+                // Event exists - check for changes
+                let changes = detect_changes(old_event, &new_event, &now);
+                if !changes.is_empty() {
+                    println!("  šŸ”„ Detected {} change(s) in: {}", changes.len(), new_event.event);
+                    all_changes.extend(changes);
+                }
+            }
+
+            // Insert or update the event
+            existing_events.insert(key, new_event);
+        }
 
         let next = match calculate_next_start_date(&events) {
             Ok(n) => n,
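Reviewer note: the chunking rule in `save_optimized_chunks` is easiest to see in isolation: roll over once the current chunk spans more than 100 days or holds 500 events. A standalone sketch of that predicate under the same `chrono` dependency; `should_rollover` is an illustrative name, not part of the patch.

```rust
use chrono::NaiveDate;

// Sketch of the rollover rule in `save_optimized_chunks`.
fn should_rollover(chunk_start: NaiveDate, event_date: NaiveDate, chunk_len: usize) -> bool {
    (event_date - chunk_start).num_days() > 100 || chunk_len >= 500
}

fn main() {
    let start = NaiveDate::from_ymd_opt(2024, 1, 1).unwrap();
    let later = NaiveDate::from_ymd_opt(2024, 5, 1).unwrap(); // 121 days on

    assert!(should_rollover(start, later, 10));  // span limit exceeded
    assert!(should_rollover(start, start, 500)); // size limit reached
    assert!(!should_rollover(start, start, 10)); // neither - keep filling
}
```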
@@ -443,31 +569,78 @@
         sleep(Duration::from_secs(2)).await;
     }
 
-    Ok(())
+    Ok(all_changes)
 }
 
-/// Main scraping logic with persistent storage
-async fn scrape_with_persistence(
-    client: &fantoccini::Client,
-) -> anyhow::Result<()> {
-    // Calculate target end date (3 months ahead, 1st of month)
+/// Main logic with intelligent update handling
+async fn run_intelligent_update(client: &fantoccini::Client) -> anyhow::Result<()> {
+    let now = Local::now().naive_local().date().format("%Y-%m-%d").to_string();
     let target_end = calculate_target_end_date();
+
+    println!("šŸ“… Today: {}", now);
     println!("šŸŽÆ Target end date: {}", target_end);
 
-    // Scan for existing chunks
-    let existing_chunks = scan_existing_chunks().await?;
+    // Load existing chunks
+    let chunks = scan_existing_chunks().await?;
+    let mut existing_events = load_existing_events(&chunks).await?;
 
-    // Determine what range needs to be scraped
-    let scrape_range = determine_scrape_range(&existing_chunks, &target_end);
-
-    if let Some((start, end)) = scrape_range {
-        // Scrape the needed range (saves chunks automatically)
-        scrape_date_range(client, &start, &end).await?;
-        println!("\nāœ… Scraping complete!");
-    } else {
-        println!("āœ… All data is up to date!");
+    if existing_events.is_empty() {
+        // No existing data - full scrape from the beginning
+        println!("\nšŸ”­ No existing data - starting fresh scrape from 2007-02-13");
+        let changes = scrape_and_update(client, "2007-02-13", &target_end, &mut existing_events).await?;
+        save_changes(&changes).await?;
+        save_optimized_chunks(existing_events).await?;
+        return Ok(());
     }
-
+
+    // Find the date range of existing data
+    let dates: Vec<NaiveDate> = existing_events.values()
+        .filter_map(|e| parse_date(&e.date))
+        .collect();
+
+    let min_date = dates.iter().min().unwrap().format("%Y-%m-%d").to_string();
+    let max_date = dates.iter().max().unwrap().format("%Y-%m-%d").to_string();
+
+    println!("šŸ“Š Existing data range: {} to {}", min_date, max_date);
+
+    // Determine the update strategy
+    if max_date < now {
+        // Case 1: Data ends in the past - update from max_date to target
+        let next_start = parse_date(&max_date)
+            .and_then(|d| d.succ_opt())
+            .map(|d| d.format("%Y-%m-%d").to_string())
+            .unwrap_or(max_date);
+
+        println!("\nšŸ“ˆ Updating from end of existing data: {} to {}", next_start, target_end);
+        let changes = scrape_and_update(client, &next_start, &target_end, &mut existing_events).await?;
+        save_changes(&changes).await?;
+        save_optimized_chunks(existing_events).await?;
+    } else {
+        // Case 2: Data extends to or beyond today - only refresh future data
+        println!("\nšŸ”„ Data exists up to today - only refreshing future data: {} to {}", now, target_end);
+
+        // Copy only future events into a separate map so past data is never touched
+        let mut future_events: HashMap<String, EconomicEvent> = existing_events
+            .iter()
+            .filter(|(_, event)| event.date.as_str() >= now.as_str())
+            .map(|(k, v)| (k.clone(), v.clone()))
+            .collect();
+
+        println!("šŸ“‹ {} future events will be refreshed", future_events.len());
+
+        // Scrape and update only the future events
+        let changes = scrape_and_update(client, &now, &target_end, &mut future_events).await?;
+        save_changes(&changes).await?;
+
+        // Merge future events back into existing events (past data untouched)
+        for (key, event) in future_events {
+            existing_events.insert(key, event);
+        }
+
+        save_optimized_chunks(existing_events).await?;
+    }
+
+    println!("\nāœ… Update complete!");
     Ok(())
 }
 
@@ -528,28 +701,16 @@ async fn main() -> anyhow::Result<()> {
         sleep(Duration::from_secs(2)).await;
     }
 
-    // Run persistent scraping
-    scrape_with_persistence(&client).await?;
+    // Run the intelligent update
+    run_intelligent_update(&client).await?;
 
-    // Load and display summary
+    // Display the final summary
     let chunks = scan_existing_chunks().await?;
-    let all_events = load_all_events(&chunks).await?;
+    let final_events = load_existing_events(&chunks).await?;
 
     println!("\nšŸ“Š FINAL SUMMARY:");
     println!("   • Total chunks: {}", chunks.len());
-    println!("   • Total events: {}", all_events.len());
-
-    if !chunks.is_empty() {
-        let dates: Vec<String> = all_events.iter()
-            .filter_map(|e| parse_date(&e.date))
-            .map(|d| d.format("%Y-%m-%d").to_string())
-            .collect();
-        if !dates.is_empty() {
-            let min = dates.iter().min().unwrap();
-            let max = dates.iter().max().unwrap();
-            println!("   • Date range: {} to {}", min, max);
-        }
-    }
+    println!("   • Total events: {}", final_events.len());
 
     client.close().await?;
     chromedriver.kill()?;
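Reviewer note: the branching in `run_intelligent_update` reduces to three strategies keyed off the newest stored date. A pure-function sketch of that decision; `pick_strategy` and `Strategy` are illustrative names, not part of the patch, and the sketch relies on ISO-8601 strings comparing lexicographically in date order, as the patch itself does.

```rust
// Sketch of the update-strategy decision in `run_intelligent_update`.
#[derive(Debug, PartialEq)]
enum Strategy {
    FullScrape,    // no local data at all
    Backfill,      // newest stored date is in the past
    RefreshFuture, // data already reaches today or beyond
}

fn pick_strategy(max_stored_date: Option<&str>, today: &str) -> Strategy {
    match max_stored_date {
        None => Strategy::FullScrape,
        Some(max) if max < today => Strategy::Backfill,
        Some(_) => Strategy::RefreshFuture,
    }
}

fn main() {
    assert_eq!(pick_strategy(None, "2025-03-07"), Strategy::FullScrape);
    assert_eq!(pick_strategy(Some("2025-01-31"), "2025-03-07"), Strategy::Backfill);
    assert_eq!(pick_strategy(Some("2025-06-01"), "2025-03-07"), Strategy::RefreshFuture);
}
```

Keeping the refresh pass on a separate `future_events` map means a mid-run failure can lose at most future rows, never already-archived history; that is the main design choice worth flagging in this patch.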