fixed force update for new data and changes without new_event change
This commit is contained in:
@@ -50,7 +50,6 @@ pub fn detect_changes(old: &EconomicEvent, new: &EconomicEvent, today: &str) ->
|
||||
date: new.date.clone(),
|
||||
event: new.event.clone(),
|
||||
country: new.country.clone(),
|
||||
change_type: field.to_string(),
|
||||
field_changed: field.to_string(),
|
||||
old_value: old_val.clone(),
|
||||
new_value: new_val.clone(),
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
// src/economic/storage.rs
|
||||
use super::types::*;
|
||||
use super::helpers::*;
|
||||
use tokio::fs;
|
||||
use chrono::{Local, NaiveDate, Datelike};
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
|
||||
pub async fn scan_existing_chunks() -> anyhow::Result<Vec<ChunkInfo>> {
|
||||
let dir = std::path::Path::new("economic_events");
|
||||
@@ -49,21 +49,29 @@ pub async fn save_optimized_chunks(events: HashMap<String, EconomicEvent>) -> an
|
||||
let dir = std::path::Path::new("economic_events");
|
||||
fs::create_dir_all(dir).await?;
|
||||
|
||||
// Delete all old chunk files to prevent duplicates and overlaps
|
||||
println!("Removing old chunks...");
|
||||
|
||||
let mut entries = fs::read_dir(dir).await?;
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
let path = entry.path();
|
||||
if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
|
||||
if name.starts_with("chunk_") && path.extension().map(|e| e == "json").unwrap_or(false) {
|
||||
fs::remove_file(&path).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut sorted: Vec<_> = events.into_values().collect();
|
||||
sorted.sort_by_key(|e| e.date.clone());
|
||||
|
||||
let mut chunk = Vec::new();
|
||||
let mut start: Option<NaiveDate> = None;
|
||||
let mut chunk: Vec<EconomicEvent> = Vec::new();
|
||||
const MAX_EVENTS_PER_CHUNK: usize = ( 30000 / 2 ) / 11; // (30000 - 2) / 11 = 2727
|
||||
|
||||
for e in sorted {
|
||||
let date = NaiveDate::parse_from_str(&e.date, "%Y-%m-%d")?;
|
||||
if let Some(s) = start {
|
||||
if (date - s).num_days() > 100 || chunk.len() >= 500 {
|
||||
save_chunk(&chunk, dir).await?;
|
||||
chunk.clear();
|
||||
start = Some(date);
|
||||
}
|
||||
} else {
|
||||
start = Some(date);
|
||||
if !chunk.is_empty() && chunk.len() >= MAX_EVENTS_PER_CHUNK {
|
||||
save_chunk(&chunk, dir).await?;
|
||||
chunk.clear();
|
||||
}
|
||||
chunk.push(e);
|
||||
}
|
||||
|
||||
@@ -19,7 +19,6 @@ pub struct EventChange {
|
||||
pub date: String,
|
||||
pub event: String,
|
||||
pub country: String,
|
||||
pub change_type: String, // actual|forecast|time|newly_added|removed
|
||||
pub field_changed: String,
|
||||
pub old_value: String,
|
||||
pub new_value: String,
|
||||
|
||||
@@ -84,7 +84,6 @@ pub fn process_batch(
|
||||
date: new.date.clone(),
|
||||
event: new.event.clone(),
|
||||
country: new.country.clone(),
|
||||
change_type: "time".to_string(),
|
||||
field_changed: "time".to_string(),
|
||||
old_value: old_event.time.clone(),
|
||||
new_value: new.time.clone(),
|
||||
@@ -96,19 +95,6 @@ pub fn process_batch(
|
||||
}
|
||||
}
|
||||
|
||||
if new.date.as_str() > today {
|
||||
changes.push(EventChange {
|
||||
date: new.date.clone(),
|
||||
event: new.event.clone(),
|
||||
country: new.country.clone(),
|
||||
change_type: "newly_added".to_string(),
|
||||
field_changed: "new_event".to_string(),
|
||||
old_value: "".to_string(),
|
||||
new_value: format!("{} @ {}", new.date, new.time),
|
||||
detected_at: Local::now().format("%Y-%m-%d %H:%M:%S").to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
existing.insert(key, new.clone());
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user