// src/corporate/atomic_writer.rs // // Atomic JSONL writer that prevents partial/corrupted results from being written use anyhow::Result; use serde::Serialize; use std::collections::HashMap; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use tokio::fs::{File, OpenOptions}; use tokio::io::AsyncWriteExt; use tokio::sync::mpsc; /// Command to write or validate data #[derive(Debug)] pub enum WriteCommand { /// Stage a result for writing (held in memory until committed) Stage { id: String, data: T }, /// Commit staged result to disk (atomic write) Commit { id: String }, /// Rollback staged result (discard without writing) Rollback { id: String }, /// Commit all pending staged results and flush CommitAll, /// Shutdown writer gracefully (only commits valid staged results) Shutdown, } /// Result of a write operation #[derive(Debug)] pub struct WriteResult { pub id: String, pub success: bool, pub error: Option, } /// Atomic writer that prevents partial results from being written pub struct AtomicJsonlWriter { file: File, staged: HashMap, committed_count: usize, rollback_count: usize, } impl AtomicJsonlWriter { pub async fn new(path: PathBuf) -> Result { // Ensure parent directory exists if let Some(parent) = path.parent() { tokio::fs::create_dir_all(parent).await?; } let file = OpenOptions::new() .create(true) .append(true) .open(&path) .await?; crate::util::logger::log_info(&format!( "Atomic writer initialized: {:?}", path )).await; Ok(Self { file, staged: HashMap::new(), committed_count: 0, rollback_count: 0, }) } /// Stage data for writing (held in memory, not yet written) pub async fn stage(&mut self, id: String, data: T) { crate::util::logger::log_info(&format!( "Staging result for: {} (total staged: {})", id, self.staged.len() + 1 )).await; self.staged.insert(id, data); } /// Commit a staged result to disk (atomic write) pub async fn commit(&mut self, id: &str) -> Result<()> { if let Some(data) = self.staged.remove(id) { // Serialize to JSON let json_line = serde_json::to_string(&data)?; // Write atomically (single syscall) self.file.write_all(json_line.as_bytes()).await?; self.file.write_all(b"\n").await?; self.file.flush().await?; self.committed_count += 1; crate::util::logger::log_info(&format!( "✓ Committed result for: {} (total committed: {})", id, self.committed_count )).await; Ok(()) } else { Err(anyhow::anyhow!("No staged result found for id: {}", id)) } } /// Rollback a staged result (discard without writing) pub async fn rollback(&mut self, id: &str) { if self.staged.remove(id).is_some() { self.rollback_count += 1; crate::util::logger::log_warn(&format!( "⚠ Rolled back result for: {} (total rollbacks: {})", id, self.rollback_count )).await; } } /// Commit all staged results pub async fn commit_all(&mut self) -> Result { let ids: Vec = self.staged.keys().cloned().collect(); let mut committed = 0; for id in ids { if let Ok(()) = self.commit(&id).await { committed += 1; } } Ok(committed) } /// Rollback all staged results (discard everything) pub async fn rollback_all(&mut self) -> usize { let count = self.staged.len(); self.staged.clear(); self.rollback_count += count; crate::util::logger::log_warn(&format!( "⚠ Rolled back all {} staged results", count )).await; count } /// Get statistics pub fn stats(&self) -> WriterStats { WriterStats { staged_count: self.staged.len(), committed_count: self.committed_count, rollback_count: self.rollback_count, } } } #[derive(Debug, Clone)] pub struct WriterStats { pub staged_count: usize, pub committed_count: usize, pub rollback_count: usize, } /// Managed writer service that runs in its own task pub struct AtomicWriterService { rx: mpsc::UnboundedReceiver>, writer: AtomicJsonlWriter, shutdown_flag: Arc, } impl AtomicWriterService { pub async fn new( path: PathBuf, rx: mpsc::UnboundedReceiver>, shutdown_flag: Arc, ) -> Result { let writer = AtomicJsonlWriter::new(path).await?; Ok(Self { rx, writer, shutdown_flag, }) } /// Main service loop pub async fn run(mut self) { crate::util::logger::log_info("Atomic writer service started").await; while let Some(cmd) = self.rx.recv().await { // Check for shutdown flag if self.shutdown_flag.load(Ordering::SeqCst) { crate::util::logger::log_warn( "Shutdown detected - processing only Commit/Rollback commands" ).await; // Only process commit/rollback commands during shutdown match cmd { WriteCommand::Commit { id } => { if let Err(e) = self.writer.commit(&id).await { crate::util::logger::log_error(&format!( "Failed to commit {}: {}", id, e )).await; } } WriteCommand::Rollback { id } => { self.writer.rollback(&id).await; } WriteCommand::CommitAll => { match self.writer.commit_all().await { Ok(count) => { crate::util::logger::log_info(&format!( "Committed {} results during shutdown", count )).await; } Err(e) => { crate::util::logger::log_error(&format!( "Failed to commit all: {}", e )).await; } } } WriteCommand::Shutdown => break, _ => { // Ignore Stage commands during shutdown crate::util::logger::log_warn( "Ignoring new Stage command during shutdown" ).await; } } continue; } // Normal operation match cmd { WriteCommand::Stage { id, data } => { self.writer.stage(id, data).await; } WriteCommand::Commit { id } => { if let Err(e) = self.writer.commit(&id).await { crate::util::logger::log_error(&format!( "Failed to commit {}: {}", id, e )).await; } } WriteCommand::Rollback { id } => { self.writer.rollback(&id).await; } WriteCommand::CommitAll => { match self.writer.commit_all().await { Ok(count) => { crate::util::logger::log_info(&format!( "Committed all {} staged results", count )).await; } Err(e) => { crate::util::logger::log_error(&format!( "Failed to commit all: {}", e )).await; } } } WriteCommand::Shutdown => break, } } // Final shutdown - rollback any remaining staged items let stats = self.writer.stats(); if stats.staged_count > 0 { crate::util::logger::log_warn(&format!( "⚠ Shutdown with {} uncommitted results - rolling back", stats.staged_count )).await; self.writer.rollback_all().await; } crate::util::logger::log_info(&format!( "Atomic writer service stopped. Final stats: {} committed, {} rolled back", stats.committed_count, stats.rollback_count )).await; } } /// Handle for sending write commands #[derive(Clone)] pub struct AtomicWriterHandle { tx: mpsc::UnboundedSender>, } impl AtomicWriterHandle { pub fn new(tx: mpsc::UnboundedSender>) -> Self { Self { tx } } /// Stage data for writing (does not write immediately) pub fn stage(&self, id: String, data: T) { let _ = self.tx.send(WriteCommand::Stage { id, data }); } /// Commit staged data to disk pub fn commit(&self, id: String) { let _ = self.tx.send(WriteCommand::Commit { id }); } /// Rollback staged data (discard) pub fn rollback(&self, id: String) { let _ = self.tx.send(WriteCommand::Rollback { id }); } /// Commit all staged data pub fn commit_all(&self) { let _ = self.tx.send(WriteCommand::CommitAll); } /// Shutdown writer gracefully pub fn shutdown(&self) { let _ = self.tx.send(WriteCommand::Shutdown); } } /// Create atomic writer service pub async fn create_atomic_writer( path: PathBuf, shutdown_flag: Arc, ) -> Result<(AtomicWriterHandle, tokio::task::JoinHandle<()>)> { let (tx, rx) = mpsc::unbounded_channel(); let service = AtomicWriterService::new(path, rx, shutdown_flag).await?; let handle = tokio::spawn(async move { service.run().await; }); Ok((AtomicWriterHandle::new(tx), handle)) }