From 58f047ba6df5f71b106b840a869fe07a530c0a75 Mon Sep 17 00:00:00 2001 From: Andrew Phillips Date: Sun, 10 Aug 2025 00:00:33 -0300 Subject: [PATCH] fix: improve error messages and refactor large functions in save/diff modes Co-authored-by: aider (openai/andrew/openrouter/qwen/qwen3-coder) --- src/modes/common.rs | 4 +- src/modes/diff.rs | 398 ++++++++++++++++++++++++++------------------ src/modes/save.rs | 177 +++++++++++++------- 3 files changed, 354 insertions(+), 225 deletions(-) diff --git a/src/modes/common.rs b/src/modes/common.rs index 9fe59de..fdf359c 100644 --- a/src/modes/common.rs +++ b/src/modes/common.rs @@ -151,7 +151,7 @@ pub fn cmd_args_digest_type(cmd: &mut Command, args: &Args) -> MetaPluginType { if digest_type_opt.is_err() { cmd.error( ErrorKind::InvalidValue, - format!("Unknown digest type: {}", digest_name), + format!("Invalid digest algorithm '{}'. Use 'sha256' or 'md5'", digest_name), ) .exit(); } @@ -170,7 +170,7 @@ pub fn cmd_args_compression_type(cmd: &mut Command, args: &Args) -> CompressionT if compression_type_opt.is_err() { cmd.error( ErrorKind::InvalidValue, - format!("Unknown compression type: {}", compression_name), + format!("Invalid compression algorithm '{}'. Supported algorithms: lz4, gzip, xz, zstd", compression_name), ) .exit(); } diff --git a/src/modes/diff.rs b/src/modes/diff.rs index ee0b1df..d113f34 100644 --- a/src/modes/diff.rs +++ b/src/modes/diff.rs @@ -1,43 +1,4 @@ -use crate::compression_engine::{CompressionType, get_compression_engine}; -use libc::c_int; -use std::path::PathBuf; -use std::str::FromStr; - -use anyhow::{Result, anyhow}; -use clap::Command; -use nix::Error as NixError; -use nix::fcntl::FdFlag; -use nix::unistd::{close, pipe}; -use std::io::Read; -use std::os::fd::FromRawFd; -use std::process::Stdio; -use std::sync::{Arc, Mutex}; - -// RAII guard for file descriptors to ensure they're closed -struct FdGuard { - fd: c_int, -} - -impl FdGuard { - fn new(fd: c_int) -> Self { - Self { fd } - } -} - -impl Drop for FdGuard { - fn drop(&mut self) { - let _ = close(self.fd); - } -} - -pub fn mode_diff( - cmd: &mut Command, - _args: &crate::Args, - ids: &mut Vec, - tags: &mut Vec, - conn: &mut rusqlite::Connection, - data_path: PathBuf, -) -> Result<()> { +fn validate_diff_args(cmd: &mut Command, ids: &Vec, tags: &Vec) { if !tags.is_empty() { cmd.error( clap::error::ErrorKind::InvalidValue, @@ -52,66 +13,107 @@ pub fn mode_diff( ) .exit(); } +} +fn fetch_and_validate_items( + conn: &mut rusqlite::Connection, + ids: &Vec, +) -> Result<(crate::db::Item, crate::db::Item)> { // Fetch items, ensuring they exist. let item_a = crate::db::get_item(conn, ids[0])? - .ok_or_else(|| anyhow!("Unable to find first item (ID: {}) in database", ids[0]))?; + .ok_or_else(|| anyhow::anyhow!("Unable to find first item (ID: {}) in database", ids[0]))?; let item_b = crate::db::get_item(conn, ids[1])? - .ok_or_else(|| anyhow!("Unable to find second item (ID: {}) in database", ids[1]))?; + .ok_or_else(|| anyhow::anyhow!("Unable to find second item (ID: {}) in database", ids[1]))?; log::debug!("MAIN: Found item A {:?}", item_a); log::debug!("MAIN: Found item B {:?}", item_b); - let item_a_id = item_a.id.ok_or_else(|| anyhow!("Item A missing ID"))?; - let item_b_id = item_b.id.ok_or_else(|| anyhow!("Item B missing ID"))?; + let item_a_id = item_a.id.ok_or_else(|| anyhow::anyhow!("Item A missing ID"))?; + let item_b_id = item_b.id.ok_or_else(|| anyhow::anyhow!("Item B missing ID"))?; // Validate that item IDs are positive to prevent path traversal issues if item_a_id <= 0 || item_b_id <= 0 { - return Err(anyhow!("Invalid item ID: {} or {}", item_a_id, item_b_id)); + return Err(anyhow::anyhow!("Invalid item ID: {} or {}", item_a_id, item_b_id)); } - let item_a_tags: Vec = crate::db::get_item_tags(conn, &item_a)? - .into_iter() - .map(|x| x.name) - .collect(); + Ok((item_a, item_b)) +} - let item_b_tags: Vec = crate::db::get_item_tags(conn, &item_b)? +fn get_item_tags(conn: &mut rusqlite::Connection, item: &crate::db::Item) -> Result> { + let tags: Vec = crate::db::get_item_tags(conn, item)? .into_iter() .map(|x| x.name) .collect(); + Ok(tags) +} + +fn setup_diff_paths_and_compression( + data_path: &std::path::PathBuf, + item_a: &crate::db::Item, + item_b: &crate::db::Item, +) -> Result<(std::path::PathBuf, crate::compression_engine::CompressionType, std::path::PathBuf, crate::compression_engine::CompressionType)> { + let item_a_id = item_a.id.ok_or_else(|| anyhow::anyhow!("Item A missing ID"))?; + let item_b_id = item_b.id.ok_or_else(|| anyhow::anyhow!("Item B missing ID"))?; let mut item_path_a = data_path.clone(); item_path_a.push(item_a_id.to_string()); - let compression_type_a = CompressionType::from_str(&item_a.compression)?; + let compression_type_a = crate::compression_engine::CompressionType::from_str(&item_a.compression)?; log::debug!("MAIN: Item A has compression type {:?}", compression_type_a); let mut item_path_b = data_path.clone(); item_path_b.push(item_b_id.to_string()); - let compression_type_b = CompressionType::from_str(&item_b.compression)?; + let compression_type_b = crate::compression_engine::CompressionType::from_str(&item_b.compression)?; log::debug!("MAIN: Item B has compression type {:?}", compression_type_b); + Ok((item_path_a, compression_type_a, item_path_b, compression_type_b)) +} + +fn setup_diff_pipes() -> Result<((libc::c_int, libc::c_int), (libc::c_int, libc::c_int))> { + use nix::unistd::pipe; + use nix::Error as NixError; + // Create pipes for diff's input let (fd_a_read, fd_a_write) = - pipe().map_err(|e: NixError| anyhow!("Failed to create pipe A: {}", e))?; + pipe().map_err(|e: NixError| anyhow::anyhow!("Failed to create pipe A: {}", e))?; let (fd_b_read, fd_b_write) = - pipe().map_err(|e: NixError| anyhow!("Failed to create pipe B: {}", e))?; + pipe().map_err(|e: NixError| anyhow::anyhow!("Failed to create pipe B: {}", e))?; + Ok(((fd_a_read, fd_a_write), (fd_b_read, fd_b_write))) +} + +fn setup_fd_guards(fd_a_read: libc::c_int, fd_b_read: libc::c_int) -> (FdGuard, FdGuard) { // Wrap file descriptors in RAII guards - let _fd_a_read_guard = FdGuard::new(fd_a_read); - let _fd_b_read_guard = FdGuard::new(fd_b_read); + let fd_a_read_guard = FdGuard::new(fd_a_read); + let fd_b_read_guard = FdGuard::new(fd_b_read); + (fd_a_read_guard, fd_b_read_guard) +} + +fn set_fd_cloexec(fd_a_write: libc::c_int, fd_b_write: libc::c_int) -> Result<()> { + use nix::fcntl::{fcntl, FcntlArg, FdFlag}; // Set FD_CLOEXEC on write ends - nix::fcntl::fcntl( + fcntl( fd_a_write, - nix::fcntl::FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC), + FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC), ) - .map_err(|e| anyhow!("Failed to set FD_CLOEXEC on fd_a_write: {}", e))?; - nix::fcntl::fcntl( + .map_err(|e| anyhow::anyhow!("Failed to set FD_CLOEXEC on fd_a_write: {}", e))?; + fcntl( fd_b_write, - nix::fcntl::FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC), + FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC), ) - .map_err(|e| anyhow!("Failed to set FD_CLOEXEC on fd_b_write: {}", e))?; + .map_err(|e| anyhow::anyhow!("Failed to set FD_CLOEXEC on fd_b_write: {}", e))?; + + Ok(()) +} +fn spawn_diff_process( + item_a_id: i64, + item_a_tags: Vec, + item_b_id: i64, + item_b_tags: Vec, + fd_a_read: libc::c_int, + fd_b_read: libc::c_int, +) -> Result { log::debug!("MAIN: Creating child process for diff"); let mut diff_command = std::process::Command::new("diff"); diff_command @@ -130,18 +132,67 @@ pub fn mode_diff( item_b_tags.join(" ") )) .arg(format!("/dev/fd/{}", fd_b_read)) - .stdin(Stdio::null()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()); + .stdin(std::process::Stdio::null()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()); - let mut child_process = diff_command + let child_process = diff_command .spawn() - .map_err(|e| anyhow!("Failed to execute diff command: {}", e))?; + .map_err(|e| anyhow::anyhow!("Failed to execute diff command: {}", e))?; - // Close read ends in parent process - they're now guarded by FdGuard - drop(_fd_a_read_guard); - drop(_fd_b_read_guard); + Ok(child_process) +} +// RAII guard for file descriptors to ensure they're closed +struct FdGuard { + fd: libc::c_int, +} + +impl FdGuard { + fn new(fd: libc::c_int) -> Self { + Self { fd } + } +} + +impl Drop for FdGuard { + fn drop(&mut self) { + let _ = nix::unistd::close(self.fd); + } +} + +// Create a function to write item data to a pipe +fn write_item_to_pipe( + item_path: std::path::PathBuf, + compression_type: crate::compression_engine::CompressionType, + pipe_writer_raw: std::fs::File, +) -> Result<()> { + use std::io::BufWriter; + let mut buffered_pipe_writer = BufWriter::new(pipe_writer_raw); + let engine = + crate::compression_engine::get_compression_engine(compression_type).expect("Unable to get compression engine"); + log::debug!("THREAD: Sending item to diff"); + engine + .copy(item_path, &mut buffered_pipe_writer) + .map_err(|e| anyhow::anyhow!("Failed to copy/compress item: {}", e))?; + log::debug!("THREAD: Done sending item to diff"); + Ok(()) +} + +// Function to spawn a writer thread for an item +fn spawn_writer_thread( + item_path: std::path::PathBuf, + compression_type: crate::compression_engine::CompressionType, + fd_write: libc::c_int, +) -> std::thread::JoinHandle> { + let pipe_writer_raw = unsafe { std::fs::File::from_raw_fd(fd_write) }; + std::thread::spawn(move || { + write_item_to_pipe(item_path, compression_type, pipe_writer_raw) + }) +} + +fn execute_diff_command( + child_process: &mut std::process::Child, +) -> Result<(Vec, Vec)> { let mut child_stdout_pipe = child_process .stdout .take() @@ -153,43 +204,6 @@ pub fn mode_diff( log::debug!("MAIN: Creating threads for diff I/O"); - // Create a function to write item data to a pipe - fn write_item_to_pipe( - item_path: PathBuf, - compression_type: CompressionType, - pipe_writer_raw: std::fs::File, - ) -> Result<()> { - use std::io::BufWriter; - let mut buffered_pipe_writer = BufWriter::new(pipe_writer_raw); - let engine = - get_compression_engine(compression_type).expect("Unable to get compression engine"); - log::debug!("THREAD: Sending item to diff"); - engine - .copy(item_path, &mut buffered_pipe_writer) - .map_err(|e| anyhow!("Failed to copy/compress item: {}", e))?; - log::debug!("THREAD: Done sending item to diff"); - Ok(()) - } - - // Function to spawn a writer thread for an item - fn spawn_writer_thread( - item_path: PathBuf, - compression_type: CompressionType, - fd_write: c_int, - ) -> std::thread::JoinHandle> { - let pipe_writer_raw = unsafe { std::fs::File::from_raw_fd(fd_write) }; - std::thread::spawn(move || { - write_item_to_pipe(item_path, compression_type, pipe_writer_raw) - }) - } - - // Spawn writer threads for both items - let writer_thread_a = - spawn_writer_thread(item_path_a.clone(), compression_type_a.clone(), fd_a_write); - - let writer_thread_b = - spawn_writer_thread(item_path_b.clone(), compression_type_b.clone(), fd_b_write); - // Thread to read diff's standard output let stdout_reader_thread = std::thread::spawn(move || { let mut output_buffer = Vec::new(); @@ -197,7 +211,7 @@ pub fn mode_diff( // child_stdout_pipe is a ChildStdout, which implements std::io::Read child_stdout_pipe .read_to_end(&mut output_buffer) - .map_err(|e| anyhow!("Failed to read diff stdout: {}", e)) + .map_err(|e| anyhow::anyhow!("Failed to read diff stdout: {}", e)) .map(|_| output_buffer) // Return the Vec on success }); @@ -207,73 +221,33 @@ pub fn mode_diff( log::debug!("STDERR_READER: Reading diff stderr"); child_stderr_pipe .read_to_end(&mut error_buffer) - .map_err(|e| anyhow!("Failed to read diff stderr: {}", e)) + .map_err(|e| anyhow::anyhow!("Failed to read diff stderr: {}", e)) .map(|_| error_buffer) }); - // Wait for writer threads to complete (meaning all input has been sent to diff) - log::debug!("MAIN: Waiting on writer thread for item A"); - match writer_thread_a.join() { - Ok(Ok(())) => { - log::debug!("MAIN: Writer thread for item A completed successfully."); - } - Ok(Err(e)) => { - return Err(anyhow!("Writer thread for item A failed: {}", e)); - } - Err(panic_payload) => { - return Err(anyhow!( - "Writer thread for item A (ID: {}) panicked: {:?}", - ids[0], - panic_payload - )); - } - } - - log::debug!("MAIN: Waiting on writer thread for item B"); - match writer_thread_b.join() { - Ok(Ok(())) => { - log::debug!("MAIN: Writer thread for item B completed successfully."); - } - Ok(Err(e)) => { - return Err(anyhow!("Writer thread for item B failed: {}", e)); - } - Err(panic_payload) => { - return Err(anyhow!( - "Writer thread for item B (ID: {}) panicked: {:?}", - ids[1], - panic_payload - )); - } - } - - log::debug!("MAIN: Done waiting on input-writer threads."); - - // Now that all input has been sent and input pipes will be closed by threads exiting, - // wait for the diff child process to terminate. - log::debug!("MAIN: Waiting for diff child process to finish..."); - let diff_status = child_process - .wait() - .map_err(|e| anyhow!("Failed to wait on diff command: {}", e))?; - log::debug!( - "MAIN: Diff child process finished with status: {}", - diff_status - ); - // Retrieve the captured output from the reader threads. let stdout_capture_result = stdout_reader_thread .join() .map_err(|panic_payload| { - anyhow!("Stdout reader thread panicked: {:?}", panic_payload) + anyhow::anyhow!("Stdout reader thread panicked: {:?}", panic_payload) })? - .map_err(|e| anyhow!("Failed to read diff stdout: {}", e))?; + .map_err(|e| anyhow::anyhow!("Failed to read diff stdout: {}", e))?; let stderr_capture_result = stderr_reader_thread .join() .map_err(|panic_payload| { - anyhow!("Stderr reader thread panicked: {:?}", panic_payload) + anyhow::anyhow!("Stderr reader thread panicked: {:?}", panic_payload) })? - .map_err(|e| anyhow!("Failed to read diff stderr: {}", e))?; + .map_err(|e| anyhow::anyhow!("Failed to read diff stderr: {}", e))?; + Ok((stdout_capture_result, stderr_capture_result)) +} + +fn handle_diff_output( + diff_status: std::process::ExitStatus, + stdout_capture_result: Vec, + stderr_capture_result: Vec, +) -> Result<()> { // Handle diff's exit status and output match diff_status.code() { Some(0) => { @@ -305,7 +279,7 @@ pub fn mode_diff( String::from_utf8_lossy(&stderr_capture_result) ); } - return Err(anyhow!( + return Err(anyhow::anyhow!( "Diff command reported an error (exit code {})", error_code )); @@ -319,9 +293,107 @@ pub fn mode_diff( String::from_utf8_lossy(&stderr_capture_result) ); } - return Err(anyhow!("Diff command terminated by signal")); + return Err(anyhow::anyhow!("Diff command terminated by signal")); } } Ok(()) } + +pub fn mode_diff( + cmd: &mut Command, + _args: &crate::Args, + ids: &mut Vec, + tags: &mut Vec, + conn: &mut rusqlite::Connection, + data_path: std::path::PathBuf, +) -> Result<()> { + validate_diff_args(cmd, ids, tags); + let (item_a, item_b) = fetch_and_validate_items(conn, ids)?; + + let item_a_tags = get_item_tags(conn, &item_a)?; + let item_b_tags = get_item_tags(conn, &item_b)?; + + let (item_path_a, compression_type_a, item_path_b, compression_type_b) = + setup_diff_paths_and_compression(&data_path, &item_a, &item_b)?; + + let ((fd_a_read, fd_a_write), (fd_b_read, fd_b_write)) = setup_diff_pipes()?; + let (_fd_a_read_guard, _fd_b_read_guard) = setup_fd_guards(fd_a_read, fd_b_read); + set_fd_cloexec(fd_a_write, fd_b_write)?; + + let item_a_id = item_a.id.ok_or_else(|| anyhow::anyhow!("Item A missing ID"))?; + let item_b_id = item_b.id.ok_or_else(|| anyhow::anyhow!("Item B missing ID"))?; + + let mut child_process = spawn_diff_process( + item_a_id, + item_a_tags, + item_b_id, + item_b_tags, + fd_a_read, + fd_b_read, + )?; + + // Close read ends in parent process - they're now guarded by FdGuard + drop(_fd_a_read_guard); + drop(_fd_b_read_guard); + + // Spawn writer threads for both items + let writer_thread_a = + spawn_writer_thread(item_path_a.clone(), compression_type_a.clone(), fd_a_write); + + let writer_thread_b = + spawn_writer_thread(item_path_b.clone(), compression_type_b.clone(), fd_b_write); + + // Wait for writer threads to complete (meaning all input has been sent to diff) + log::debug!("MAIN: Waiting on writer thread for item A"); + match writer_thread_a.join() { + Ok(Ok(())) => { + log::debug!("MAIN: Writer thread for item A completed successfully."); + } + Ok(Err(e)) => { + return Err(anyhow::anyhow!("Writer thread for item A failed: {}", e)); + } + Err(panic_payload) => { + return Err(anyhow::anyhow!( + "Writer thread for item A (ID: {}) panicked: {:?}", + ids[0], + panic_payload + )); + } + } + + log::debug!("MAIN: Waiting on writer thread for item B"); + match writer_thread_b.join() { + Ok(Ok(())) => { + log::debug!("MAIN: Writer thread for item B completed successfully."); + } + Ok(Err(e)) => { + return Err(anyhow::anyhow!("Writer thread for item B failed: {}", e)); + } + Err(panic_payload) => { + return Err(anyhow::anyhow!( + "Writer thread for item B (ID: {}) panicked: {:?}", + ids[1], + panic_payload + )); + } + } + + log::debug!("MAIN: Done waiting on input-writer threads."); + + // Now that all input has been sent and input pipes will be closed by threads exiting, + // wait for the diff child process to terminate. + log::debug!("MAIN: Waiting for diff child process to finish..."); + let diff_status = child_process + .wait() + .map_err(|e| anyhow::anyhow!("Failed to wait on diff command: {}", e))?; + log::debug!( + "MAIN: Diff child process finished with status: {}", + diff_status + ); + + let (stdout_capture_result, stderr_capture_result) = execute_diff_command(&mut child_process)?; + handle_diff_output(diff_status, stdout_capture_result, stderr_capture_result)?; + + Ok(()) +} diff --git a/src/modes/save.rs b/src/modes/save.rs index 1a63de2..6875795 100644 --- a/src/modes/save.rs +++ b/src/modes/save.rs @@ -1,57 +1,39 @@ -use anyhow::{Context, Result, anyhow}; -use gethostname::gethostname; -use is_terminal::IsTerminal; -use std::collections::HashMap; -use std::io::{self, Read, Write}; - -use clap::Command; -use clap::error::ErrorKind; -use log::debug; -use rusqlite::Connection; -use std::path::PathBuf; - -use crate::compression_engine::get_compression_engine; -use crate::db::{self}; -use crate::meta_plugin::{MetaPlugin, MetaPluginType, get_meta_plugin}; -use crate::modes::common::{cmd_args_compression_type, cmd_args_digest_type, cmd_args_meta_plugin_types, get_meta_from_env, store_item_meta_value}; -use chrono::Utc; - -pub fn mode_save( - cmd: &mut Command, - args: &crate::Args, - ids: &mut Vec, - tags: &mut Vec, - conn: &mut Connection, - data_path: PathBuf, -) -> Result<()> { +fn validate_save_args(cmd: &mut Command, ids: &Vec) { if !ids.is_empty() { cmd.error( - ErrorKind::InvalidValue, + clap::error::ErrorKind::InvalidValue, "ID given, you cannot supply IDs when using --save", ) .exit(); } +} +fn initialize_tags(tags: &mut Vec) { if tags.is_empty() { tags.push("none".to_string()); } +} +fn setup_compression_and_plugins( + cmd: &mut Command, + args: &crate::Args, +) -> (crate::compression_engine::CompressionType, Box, Vec>) { let digest_type = cmd_args_digest_type(cmd, &args); debug!("MAIN: Digest type: {:?}", digest_type); let compression_type = cmd_args_compression_type(cmd, &args); debug!("MAIN: Compression type: {:?}", compression_type); let compression_engine = - get_compression_engine(compression_type.clone()).expect("Unable to get compression engine"); + crate::compression_engine::get_compression_engine(compression_type.clone()).expect("Unable to get compression engine"); // Start with meta plugin types from command line - let mut meta_plugin_types: Vec = cmd_args_meta_plugin_types(cmd, &args); + let mut meta_plugin_types: Vec = cmd_args_meta_plugin_types(cmd, &args); debug!("MAIN: Meta plugin types: {:?}", meta_plugin_types); // Convert digest type to meta plugin type and add to the list if needed let digest_meta_plugin_type = match digest_type { - crate::meta_plugin::MetaPluginType::DigestSha256 => Some(MetaPluginType::DigestSha256), - crate::meta_plugin::MetaPluginType::DigestMd5 => Some(MetaPluginType::DigestMd5), + crate::meta_plugin::MetaPluginType::DigestSha256 => Some(crate::meta_plugin::MetaPluginType::DigestSha256), + crate::meta_plugin::MetaPluginType::DigestMd5 => Some(crate::meta_plugin::MetaPluginType::DigestMd5), _ => None, }; @@ -63,9 +45,9 @@ pub fn mode_save( } // Initialize meta_plugins with MetaPlugin instances for each MetaPluginType - let mut meta_plugins: Vec> = meta_plugin_types + let mut meta_plugins: Vec> = meta_plugin_types .iter() - .map(|meta_plugin_type| get_meta_plugin(meta_plugin_type.clone())) + .map(|meta_plugin_type| crate::meta_plugin::get_meta_plugin(meta_plugin_type.clone())) .collect(); // Check for unsupported meta plugins, warn the user, and remove them from the list @@ -76,21 +58,30 @@ pub fn mode_save( // We need to get the meta name for the warning message // Since we can't mutably borrow meta_plugin here, we create a temporary one let meta_plugin_type = meta_plugin_types[i].clone(); - let mut temp_plugin = get_meta_plugin(meta_plugin_type); + let mut temp_plugin = crate::meta_plugin::get_meta_plugin(meta_plugin_type); eprintln!("Warning: Meta plugin '{}' is enabled but not supported on this system", temp_plugin.meta_name()); } i += 1; is_supported }); - let mut item = db::Item { + (compression_type, compression_engine, meta_plugins) +} + +fn create_and_log_item( + conn: &mut rusqlite::Connection, + args: &crate::Args, + tags: &Vec, + compression_type: &crate::compression_engine::CompressionType, +) -> Result { + let mut item = crate::db::Item { id: None, - ts: Utc::now(), + ts: chrono::Utc::now(), size: None, compression: compression_type.to_string(), }; - let id = db::insert_item(conn, item.clone())?; + let id = crate::db::insert_item(conn, item.clone())?; item.id = Some(id); debug!("MAIN: Added item {:?}", item.clone()); @@ -117,14 +108,23 @@ pub fn mode_save( } } - db::set_item_tags(conn, item.clone(), tags)?; + Ok(item) +} - // Use a transaction for database operations to ensure atomicity - let tx = conn.transaction()?; +fn setup_item_metadata( + conn: &rusqlite::Connection, + args: &crate::Args, + item: &crate::db::Item, + tags: &Vec, +) -> Result<()> { + crate::db::set_item_tags(conn, item.clone(), tags)?; + Ok(()) +} - let mut item_meta: HashMap = get_meta_from_env(); +fn collect_item_meta(args: &crate::Args) -> std::collections::HashMap { + let mut item_meta: std::collections::HashMap = crate::modes::common::get_meta_from_env(); - if let Ok(hostname) = gethostname().into_string() { + if let Ok(hostname) = gethostname::gethostname().into_string() { if !item_meta.contains_key("hostname") { item_meta.insert("hostname".to_string(), hostname); } @@ -135,30 +135,35 @@ pub fn mode_save( item_meta.insert(item.key, item.value); } - let item_id = item.id.ok_or_else(|| anyhow!("Item missing ID"))?; - for kv in item_meta.iter() { - let meta = db::Meta { - id: item_id, - name: kv.0.to_string(), - value: kv.1.to_string(), - }; - db::store_meta(&tx, meta)?; - } + item_meta +} + +fn process_input_stream( + compression_engine: &Box, + data_path: &std::path::PathBuf, + item_id: i64, + meta_plugins: &mut Vec>, +) -> Result<(Box, crate::db::Item)> { + let mut item = crate::db::Item { + id: Some(item_id), + ts: chrono::Utc::now(), + size: None, + compression: String::new(), // Will be set later + }; let mut item_path = data_path.clone(); item_path.push(item_id.to_string()); - let mut stdin = io::stdin().lock(); - let mut stdout = io::stdout().lock(); + let mut stdin = std::io::stdin().lock(); + let mut stdout = std::io::stdout().lock(); let mut buffer = [0; libc::BUFSIZ as usize]; - let mut item_out: Box = + let mut item_out: Box = compression_engine .create(item_path.clone()) - .context(anyhow!( - "Unable to write file {:?} using compression {:?}", - item_path, - compression_type + .context(anyhow::anyhow!( + "Unable to write file {:?}", + item_path ))?; debug!("MAIN: Starting IO loop"); @@ -188,12 +193,20 @@ pub fn mode_save( stdout.flush()?; item_out.flush()?; + Ok((item_out, item)) +} + +fn finalize_meta_plugins( + conn: &rusqlite::Connection, + meta_plugins: &mut Vec>, + item: &crate::db::Item, +) -> Result<()> { for meta_plugin in meta_plugins.iter_mut() { let meta_name = meta_plugin.meta_name(); match meta_plugin.finalize() { Ok(meta_value) => { - if let Err(e) = store_item_meta_value(&tx, item.clone(), meta_name.clone(), meta_value) { + if let Err(e) = crate::modes::common::store_item_meta_value(conn, item.clone(), meta_name.clone(), meta_value) { eprintln!("Warning: Failed to store meta value for {}: {}", meta_name, e); } } @@ -202,8 +215,52 @@ pub fn mode_save( } } } + Ok(()) +} - db::update_item(&tx, item.clone())?; +pub fn mode_save( + cmd: &mut Command, + args: &crate::Args, + ids: &mut Vec, + tags: &mut Vec, + conn: &mut rusqlite::Connection, + data_path: std::path::PathBuf, +) -> Result<()> { + validate_save_args(cmd, ids); + initialize_tags(tags); + + let (compression_type, compression_engine, mut meta_plugins) = setup_compression_and_plugins(cmd, args); + + let mut item = create_and_log_item(conn, args, tags, &compression_type)?; + setup_item_metadata(conn, args, &item, tags)?; + + // Use a transaction for database operations to ensure atomicity + let tx = conn.transaction()?; + + let item_meta = collect_item_meta(args); + let item_id = item.id.ok_or_else(|| anyhow::anyhow!("Item missing ID"))?; + + for kv in item_meta.iter() { + let meta = crate::db::Meta { + id: item_id, + name: kv.0.to_string(), + value: kv.1.to_string(), + }; + crate::db::store_meta(&tx, meta)?; + } + + let (_item_out, processed_item) = process_input_stream( + &compression_engine, + &data_path, + item_id, + &mut meta_plugins, + )?; + + item.size = processed_item.size; + item.compression = compression_type.to_string(); + + finalize_meta_plugins(&tx, &mut meta_plugins, &item)?; + crate::db::update_item(&tx, item.clone())?; // Commit the transaction tx.commit()?;