use crate::compression_engine::{CompressionType, get_compression_engine}; use libc::c_int; use std::path::PathBuf; use std::str::FromStr; use anyhow::{anyhow, Result}; use clap::Command; use nix::fcntl::FdFlag; use nix::unistd::{close, pipe}; use nix::Error as NixError; use std::io::Read; use std::os::fd::FromRawFd; use std::process::Stdio; pub fn mode_diff( cmd: &mut Command, _args: crate::Args, ids: &mut Vec, tags: &mut Vec, conn: &mut rusqlite::Connection, data_path: PathBuf, ) -> Result<()> { if !tags.is_empty() { cmd.error( clap::error::ErrorKind::InvalidValue, "Tags are not supported with --diff. Please provide exactly two IDs.", ) .exit(); } if ids.len() != 2 { cmd.error( clap::error::ErrorKind::InvalidValue, "You must supply exactly two IDs when using --diff.", ) .exit(); } // Fetch items, ensuring they exist. let item_a = crate::db::get_item(conn, ids[0])? .ok_or_else(|| anyhow!("Unable to find first item (ID: {}) in database", ids[0]))?; let item_b = crate::db::get_item(conn, ids[1])? .ok_or_else(|| anyhow!("Unable to find second item (ID: {}) in database", ids[1]))?; log::debug!("MAIN: Found item A {:?}", item_a); log::debug!("MAIN: Found item B {:?}", item_b); let item_a_tags: Vec = crate::db::get_item_tags(conn, &item_a)? .into_iter() .map(|x| x.name) .collect(); let item_b_tags: Vec = crate::db::get_item_tags(conn, &item_b)? .into_iter() .map(|x| x.name) .collect(); let mut item_path_a = data_path.clone(); item_path_a.push(item_a.id.unwrap().to_string()); // id.unwrap() is safe due to ok_or_else let compression_type_a = CompressionType::from_str(&item_a.compression)?; log::debug!("MAIN: Item A has compression type {:?}", compression_type_a); let mut item_path_b = data_path.clone(); item_path_b.push(item_b.id.unwrap().to_string()); let compression_type_b = CompressionType::from_str(&item_b.compression)?; log::debug!("MAIN: Item B has compression type {:?}", compression_type_b); // Create pipes for diff's input let (fd_a_read, fd_a_write) = pipe().map_err(|e: NixError| anyhow!("Failed to create pipe A: {}", e))?; let (fd_b_read, fd_b_write) = pipe().map_err(|e: NixError| anyhow!("Failed to create pipe B: {}", e))?; // Set FD_CLOEXEC on write ends. While they are consumed by File::from_raw_fd, // it's good practice if the raw FDs were to be handled further before that. // For this specific code, since from_raw_fd takes ownership immediately, this is less critical // but doesn't hurt. nix::fcntl::fcntl( fd_a_write, nix::fcntl::FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC), ) .map_err(|e| anyhow!("Failed to set FD_CLOEXEC on fd_a_write: {}", e))?; nix::fcntl::fcntl( fd_b_write, nix::fcntl::FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC), ) .map_err(|e| anyhow!("Failed to set FD_CLOEXEC on fd_b_write: {}", e))?; log::debug!("MAIN: Creating child process for diff"); let mut diff_command = std::process::Command::new("diff"); diff_command .arg("-u") .arg("--label") .arg(format!( "Keep item A: {} {}", item_a.id.unwrap(), item_a_tags.join(" ") )) .arg(format!("/dev/fd/{}", fd_a_read)) .arg("--label") .arg(format!( "Keep item B: {} {}", item_b.id.unwrap(), item_b_tags.join(" ") )) .arg(format!("/dev/fd/{}", fd_b_read)) .stdin(Stdio::null()) .stdout(Stdio::piped()) .stderr(Stdio::piped()); let mut child_process = diff_command .spawn() .map_err(|e| anyhow!("Failed to execute diff command: {}", e))?; close(fd_a_read).map_err(|e| anyhow!("Failed to close fd_a_read in parent: {}", e))?; close(fd_b_read).map_err(|e| anyhow!("Failed to close fd_b_read in parent: {}", e))?; let mut child_stdout_pipe = child_process .stdout .take() .expect("BUG: Failed to capture diff stdout pipe"); let mut child_stderr_pipe = child_process .stderr .take() .expect("BUG: Failed to capture diff stderr pipe"); log::debug!("MAIN: Creating threads for diff I/O"); // Create a function to write item data to a pipe fn write_item_to_pipe( item_path: PathBuf, compression_type: CompressionType, pipe_writer_raw: std::fs::File, ) { use std::io::BufWriter; let mut buffered_pipe_writer = BufWriter::new(pipe_writer_raw); let engine = crate::compression_engine::get_compression_engine(compression_type) .expect("Unable to get compression engine"); log::debug!("THREAD: Sending item to diff"); engine .copy(item_path, &mut buffered_pipe_writer) .expect("Failed to copy/compress item"); log::debug!("THREAD: Done sending item to diff"); } // Function to spawn a writer thread for an item fn spawn_writer_thread( item_path: PathBuf, compression_type: CompressionType, fd_write: c_int, ) -> std::thread::JoinHandle<()> { let pipe_writer_raw = unsafe { std::fs::File::from_raw_fd(fd_write) }; std::thread::spawn(move || { write_item_to_pipe(item_path, compression_type, pipe_writer_raw); }) } // Spawn writer threads for both items let writer_thread_a = spawn_writer_thread(item_path_a.clone(), compression_type_a.clone(), fd_a_write); let writer_thread_b = spawn_writer_thread(item_path_b.clone(), compression_type_b.clone(), fd_b_write); // Thread to read diff's standard output let stdout_reader_thread = std::thread::spawn(move || { let mut output_buffer = Vec::new(); log::debug!("STDOUT_READER: Reading diff stdout"); // child_stdout_pipe is a ChildStdout, which implements std::io::Read child_stdout_pipe .read_to_end(&mut output_buffer) .map_err(|e| anyhow!("Failed to read diff stdout: {}", e)) .map(|_| output_buffer) // Return the Vec on success }); // Thread to read diff's standard error let stderr_reader_thread = std::thread::spawn(move || { let mut error_buffer = Vec::new(); log::debug!("STDERR_READER: Reading diff stderr"); child_stderr_pipe .read_to_end(&mut error_buffer) .map_err(|e| anyhow!("Failed to read diff stderr: {}", e)) .map(|_| error_buffer) }); // Wait for writer threads to complete (meaning all input has been sent to diff) log::debug!("MAIN: Waiting on writer thread for item A"); if let Err(panic_payload) = writer_thread_a.join() { // Propagate panic from writer thread return Err(anyhow!( "Writer thread for item A (ID: {}) panicked: {:?}", ids[0], panic_payload )); } log::debug!("MAIN: Writer thread for item A completed."); log::debug!("MAIN: Waiting on writer thread for item B"); if let Err(panic_payload) = writer_thread_b.join() { return Err(anyhow!( "Writer thread for item B (ID: {}) panicked: {:?}", ids[1], panic_payload )); } log::debug!("MAIN: Writer thread for item B completed."); log::debug!("MAIN: Done waiting on input-writer threads."); // Now that all input has been sent and input pipes will be closed by threads exiting, // wait for the diff child process to terminate. log::debug!("MAIN: Waiting for diff child process to finish..."); let diff_status = child_process .wait() .map_err(|e| anyhow!("Failed to wait on diff command: {}", e))?; log::debug!( "MAIN: Diff child process finished with status: {}", diff_status ); // Retrieve the captured output from the reader threads. // .join().unwrap() here will panic if the reader thread itself panicked. // The inner Result is from the read_to_end operation within the thread. let stdout_capture_result = stdout_reader_thread .join() .unwrap_or_else(|panic_payload| { Err(anyhow!( "Stdout reader thread panicked: {:?}", panic_payload )) })?; let stderr_capture_result = stderr_reader_thread .join() .unwrap_or_else(|panic_payload| { Err(anyhow!( "Stderr reader thread panicked: {:?}", panic_payload )) })?; // Handle diff's exit status and output match diff_status.code() { Some(0) => { // Exit code 0: No differences log::debug!("MAIN: Diff successful, no differences found."); // Typically, diff -u doesn't print to stdout if no differences. // But if it did, it would be shown here. if !stdout_capture_result.is_empty() { println!("{}", String::from_utf8_lossy(&stdout_capture_result)); } } Some(1) => { // Exit code 1: Differences found log::debug!("MAIN: Diff successful, differences found."); println!("{}", String::from_utf8_lossy(&stdout_capture_result)); } Some(error_code) => { // Exit code > 1: Error in diff utility eprintln!("Diff command failed with exit code: {}", error_code); if !stdout_capture_result.is_empty() { eprintln!( "Diff stdout before error:\n{}", String::from_utf8_lossy(&stdout_capture_result) ); } if !stderr_capture_result.is_empty() { eprintln!( "Diff stderr:\n{}", String::from_utf8_lossy(&stderr_capture_result) ); } return Err(anyhow!( "Diff command reported an error (exit code {})", error_code )); } None => { // Process terminated by a signal eprintln!("Diff command terminated by signal."); if !stderr_capture_result.is_empty() { eprintln!( "Diff stderr before signal termination:\n{}", String::from_utf8_lossy(&stderr_capture_result) ); } return Err(anyhow!("Diff command terminated by signal")); } } Ok(()) }