diff --git a/Cargo.lock b/Cargo.lock index 288d39a..c1d9e62 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "adler" @@ -506,6 +506,7 @@ dependencies = [ "libc", "log", "lz4_flex", + "nix", "prettytable-rs", "regex", "rusqlite", @@ -585,6 +586,15 @@ version = "2.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c" +[[package]] +name = "memoffset" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" +dependencies = [ + "autocfg", +] + [[package]] name = "miniz_oxide" version = "0.7.1" @@ -594,6 +604,19 @@ dependencies = [ "adler", ] +[[package]] +name = "nix" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b" +dependencies = [ + "bitflags 1.3.2", + "cfg-if", + "libc", + "memoffset", + "pin-utils", +] + [[package]] name = "num-traits" version = "0.2.16" @@ -615,6 +638,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "pkg-config" version = "0.3.27" diff --git a/src/main.rs b/src/main.rs index 6c0a430..0eb032b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,6 +6,10 @@ use std::path::PathBuf; use std::collections::HashMap; use std::env; use std::os::fd::FromRawFd; +use std::process::Stdio; // For Stdio::null, Stdio::piped +use nix::fcntl::{FdFlag}; +use nix::unistd::{close, pipe}; +use nix::Error as NixError; use regex::Regex; @@ -46,7 +50,6 @@ use is_terminal::IsTerminal; extern crate term; -use nix::unistd; lazy_static! { static ref FORMAT_BOX_CHARS_NO_BORDER_LINE_SEPARATOR: TableFormat = format::FormatBuilder::new() @@ -482,102 +485,254 @@ fn mode_get(cmd: &mut Command, args: Args, ids: &mut Vec, tags: &mut Vec, tags: &mut Vec, conn: &mut Connection, data_path: PathBuf) -> Result<()> { - - if ! tags.is_empty() { - cmd.error(ErrorKind::InvalidValue, "Tags given, you must supply exactly two IDs when using --diff").exit(); - } else if ids.len() > 2 || ids.len() < 1 { - cmd.error(ErrorKind::InvalidValue, "You must supply exactly one or two IDs when using --diff").exit(); +fn mode_diff( + cmd: &mut clap::Command, + _args: Args, // Mark as unused if not needed directly + ids: &mut Vec, + tags: &mut Vec, + conn: &mut Connection, + data_path: PathBuf, +) -> Result<()> { + if !tags.is_empty() { + cmd.error( + ErrorKind::InvalidValue, + "Tags are not supported with --diff. Please provide exactly two IDs.", + ) + .exit(); + } + if ids.len() != 2 { + cmd.error( + ErrorKind::InvalidValue, + "You must supply exactly two IDs when using --diff.", + ) + .exit(); } - let item_a: Option = db::get_item(conn, ids[0])?; + // Fetch items, ensuring they exist. + let item_a = db::get_item(conn, ids[0])? + .ok_or_else(|| anyhow!("Unable to find first item (ID: {}) in database", ids[0]))?; + let item_b = db::get_item(conn, ids[1])? + .ok_or_else(|| anyhow!("Unable to find second item (ID: {}) in database", ids[1]))?; - let mut item_b: Option = None; - if ids.len() == 2 { - item_b = db::get_item(conn, ids[1])?; + debug!("MAIN: Found item A {:?}", item_a); + debug!("MAIN: Found item B {:?}", item_b); + + let item_a_tags: Vec = db::get_item_tags(conn, &item_a)? + .into_iter() + .map(|x| {x.name}) + .collect(); + + let item_b_tags: Vec = db::get_item_tags(conn, &item_b)? + .into_iter() + .map(|x| {x.name}) + .collect(); + + + let mut item_path_a = data_path.clone(); + item_path_a.push(item_a.id.unwrap().to_string()); // id.unwrap() is safe due to ok_or_else + let compression_type_a = CompressionType::from_str(&item_a.compression)?; + debug!("MAIN: Item A has compression type {:?}", compression_type_a); + + let mut item_path_b = data_path.clone(); + item_path_b.push(item_b.id.unwrap().to_string()); + let compression_type_b = CompressionType::from_str(&item_b.compression)?; + debug!("MAIN: Item B has compression type {:?}", compression_type_b); + + // Create pipes for diff's input + let (fd_a_read, fd_a_write) = + pipe().map_err(|e: NixError| anyhow!("Failed to create pipe A: {}", e))?; + let (fd_b_read, fd_b_write) = + pipe().map_err(|e: NixError| anyhow!("Failed to create pipe B: {}", e))?; + + // Set FD_CLOEXEC on write ends. While they are consumed by File::from_raw_fd, + // it's good practice if the raw FDs were to be handled further before that. + // For this specific code, since from_raw_fd takes ownership immediately, this is less critical + // but doesn't hurt. + nix::fcntl::fcntl(fd_a_write, nix::fcntl::FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC)) + .map_err(|e| anyhow!("Failed to set FD_CLOEXEC on fd_a_write: {}", e))?; + nix::fcntl::fcntl(fd_b_write, nix::fcntl::FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC)) + .map_err(|e| anyhow!("Failed to set FD_CLOEXEC on fd_b_write: {}", e))?; + + + debug!("MAIN: Creating child process for diff"); + let mut diff_command = std::process::Command::new("diff"); + diff_command + .arg("-u") + .arg("--label") + .arg(format!("Keep item A: {} {}", item_a.id.unwrap(), item_a_tags.join(" "))) + .arg(format!("/dev/fd/{}", fd_a_read)) + .arg("--label") + .arg(format!("Keep item B: {} {}", item_b.id.unwrap(), item_b_tags.join(" "))) + .arg(format!("/dev/fd/{}", fd_b_read)) + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + + let mut child_process = diff_command + .spawn() + .map_err(|e| anyhow!("Failed to execute diff command: {}", e))?; + + close(fd_a_read).map_err(|e| anyhow!("Failed to close fd_a_read in parent: {}", e))?; + close(fd_b_read).map_err(|e| anyhow!("Failed to close fd_b_read in parent: {}", e))?; + + let mut child_stdout_pipe = child_process + .stdout + .take() + .expect("BUG: Failed to capture diff stdout pipe"); + let mut child_stderr_pipe = child_process + .stderr + .take() + .expect("BUG: Failed to capture diff stderr pipe"); + + debug!("MAIN: Creating threads for diff I/O"); + + // Thread to write Item A data to diff + let writer_thread_a = { + // `File::from_raw_fd` takes ownership of fd_a_write. + let pipe_writer_a_raw = unsafe { std::fs::File::from_raw_fd(fd_a_write) }; + let item_path_a_clone = item_path_a.clone(); // Avoid lifetime issues if item_path_a is used later + let compression_type_a_clone = compression_type_a.clone(); + std::thread::spawn(move || { + // Original code used .expect/.unwrap, implying panics on error. + // This matches that style. For more robust error handling, return Result from thread. + let mut buffered_pipe_writer_a = BufWriter::new(pipe_writer_a_raw); + let engine_a = compression::get_engine(compression_type_a_clone) + .expect("Unable to get compression engine for Item A"); + debug!("THREAD_A: Sending item A to diff"); + engine_a + .copy(item_path_a_clone, &mut buffered_pipe_writer_a) + .expect("Failed to copy/compress item A"); + debug!("THREAD_A: Done Sending item A to diff"); + // pipe_writer_a_raw (and buffered_pipe_writer_a) are dropped here, closing fd_a_write. + // This signals EOF to one of diff's inputs. + }) + }; + + // Thread to write Item B data to diff + let writer_thread_b = { + let pipe_writer_b_raw = unsafe { std::fs::File::from_raw_fd(fd_b_write) }; + let item_path_b_clone = item_path_b.clone(); + let compression_type_b_clone = compression_type_b.clone(); + std::thread::spawn(move || { + let mut buffered_pipe_writer_b = BufWriter::new(pipe_writer_b_raw); + let engine_b = compression::get_engine(compression_type_b_clone) + .expect("Unable to get compression engine for Item B"); + debug!("THREAD_B: Sending item B to diff"); + engine_b + .copy(item_path_b_clone, &mut buffered_pipe_writer_b) + .expect("Failed to copy/compress item B"); + debug!("THREAD_B: Done Sending item B to diff"); + }) + }; + + // Thread to read diff's standard output + let stdout_reader_thread = std::thread::spawn(move || { + let mut output_buffer = Vec::new(); + debug!("STDOUT_READER: Reading diff stdout"); + // child_stdout_pipe is a ChildStdout, which implements std::io::Read + child_stdout_pipe + .read_to_end(&mut output_buffer) + .map_err(|e| anyhow!("Failed to read diff stdout: {}", e)) + .map(|_| output_buffer) // Return the Vec on success + }); + + // Thread to read diff's standard error + let stderr_reader_thread = std::thread::spawn(move || { + let mut error_buffer = Vec::new(); + debug!("STDERR_READER: Reading diff stderr"); + child_stderr_pipe + .read_to_end(&mut error_buffer) + .map_err(|e| anyhow!("Failed to read diff stderr: {}", e)) + .map(|_| error_buffer) + }); + + // Wait for writer threads to complete (meaning all input has been sent to diff) + debug!("MAIN: Waiting on writer thread for item A"); + if let Err(panic_payload) = writer_thread_a.join() { + // Propagate panic from writer thread + return Err(anyhow!( + "Writer thread for item A (ID: {}) panicked: {:?}", + ids[0], + panic_payload + )); } + debug!("MAIN: Writer thread for item A completed."); - if let Some(item_a) = item_a { - debug!("MAIN: Found item A {:?}", item_a); + debug!("MAIN: Waiting on writer thread for item B"); + if let Err(panic_payload) = writer_thread_b.join() { + return Err(anyhow!( + "Writer thread for item B (ID: {}) panicked: {:?}", + ids[1], + panic_payload + )); + } + debug!("MAIN: Writer thread for item B completed."); + debug!("MAIN: Done waiting on input-writer threads."); - let mut item_path_a = data_path.clone(); - item_path_a.push(item_a.id.unwrap().to_string()); + // Now that all input has been sent and input pipes will be closed by threads exiting, + // wait for the diff child process to terminate. + debug!("MAIN: Waiting for diff child process to finish..."); + let diff_status = child_process + .wait() + .map_err(|e| anyhow!("Failed to wait on diff command: {}", e))?; + debug!("MAIN: Diff child process finished with status: {}", diff_status); - let compression_type_a = CompressionType::from_str(&item_a.compression)?; - debug!("MAIN: Item A has compression type {:?}", compression_type_a.clone()); + // Retrieve the captured output from the reader threads. + // .join().unwrap() here will panic if the reader thread itself panicked. + // The inner Result is from the read_to_end operation within the thread. + let stdout_capture_result = stdout_reader_thread.join().unwrap_or_else(|panic_payload| { + Err(anyhow!("Stdout reader thread panicked: {:?}", panic_payload)) + })?; + let stderr_capture_result = stderr_reader_thread.join().unwrap_or_else(|panic_payload| { + Err(anyhow!("Stderr reader thread panicked: {:?}", panic_payload)) + })?; - - if let Some(item_b) = item_b { - debug!("MAIN: Found item B {:?}", item_b); - - let mut item_path_b = data_path.clone(); - item_path_b.push(item_b.id.unwrap().to_string()); - - let compression_type_b = CompressionType::from_str(&item_b.compression)?; - debug!("MAIN: Item B has compression type {:?}", compression_type_b.clone()); - - let (fd_a_read, fd_a_write) = unistd::pipe().unwrap(); - let (fd_b_read, fd_b_write) = unistd::pipe().unwrap(); - - debug!("MAIN: Creating child process for diff"); - - let child = std::process::Command::new("diff") - .arg("-u") - .arg(format!("/dev/fd/{}", fd_a_read)) - .arg(format!("/dev/fd/{}", fd_b_read)) - .stdin(std::process::Stdio::null()) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::piped()) - .spawn() - .expect("Failed to execute diff command"); - - debug!("MAIN: Creating buffers"); - - let stdout_a_raw = unsafe { std::fs::File::from_raw_fd(fd_a_write) }; - let stdout_b_raw = unsafe { std::fs::File::from_raw_fd(fd_b_write) }; - - debug!("MAIN: Sending item A to diff"); - let handle_a = { - let stdout_a_raw = stdout_a_raw; - let item_path_a = item_path_a.clone(); - let compression_type_a = compression_type_a.clone(); - std::thread::spawn(move || { - let mut stdout_a = BufWriter::new(stdout_a_raw); - let compression_engine_a = compression::get_engine(compression_type_a).expect("Unable to get compression engine"); - compression_engine_a.copy(item_path_a, &mut stdout_a).unwrap() - }) - }; - - debug!("MAIN: Sending item B to diff"); - let handle_b = { - let stdout_b_raw = stdout_b_raw; - let item_path_b = item_path_b.clone(); - let compression_type_b = compression_type_b.clone(); - std::thread::spawn(move || { - let mut stdout_b = BufWriter::new(stdout_b_raw); - let compression_engine_b = compression::get_engine(compression_type_b).expect("Unable to get compression engine"); - compression_engine_b.copy(item_path_b, &mut stdout_b).unwrap() - }) - }; - - debug!("MAIN: Done copying data to FDs"); - handle_a.join().unwrap(); - handle_b.join().unwrap(); - - let output = child.wait_with_output().expect("Failed to wait on diff command"); - if output.status.success() { - println!("{}", String::from_utf8_lossy(&output.stdout)); - } else { - eprintln!("{}", String::from_utf8_lossy(&output.stderr)); + // Handle diff's exit status and output + match diff_status.code() { + Some(0) => { // Exit code 0: No differences + debug!("MAIN: Diff successful, no differences found."); + // Typically, diff -u doesn't print to stdout if no differences. + // But if it did, it would be shown here. + if !stdout_capture_result.is_empty() { + println!("{}", String::from_utf8_lossy(&stdout_capture_result)); } - - Ok(()) - } else { - Err(anyhow!("Unable to find second item in database")) } - } else { - Err(anyhow!("Unable to find first item in database")) + Some(1) => { // Exit code 1: Differences found + debug!("MAIN: Diff successful, differences found."); + println!("{}", String::from_utf8_lossy(&stdout_capture_result)); + } + Some(error_code) => { // Exit code > 1: Error in diff utility + eprintln!("Diff command failed with exit code: {}", error_code); + if !stdout_capture_result.is_empty() { + eprintln!( + "Diff stdout before error:\n{}", + String::from_utf8_lossy(&stdout_capture_result) + ); + } + if !stderr_capture_result.is_empty() { + eprintln!( + "Diff stderr:\n{}", + String::from_utf8_lossy(&stderr_capture_result) + ); + } + return Err(anyhow!( + "Diff command reported an error (exit code {})", + error_code + )); + } + None => { // Process terminated by a signal + eprintln!("Diff command terminated by signal."); + if !stderr_capture_result.is_empty() { + eprintln!( + "Diff stderr before signal termination:\n{}", + String::from_utf8_lossy(&stderr_capture_result) + ); + } + return Err(anyhow!("Diff command terminated by signal")); + } } + + Ok(()) }