fix: improve error messages and refactor large functions in save/diff modes
Co-authored-by: aider (openai/andrew/openrouter/qwen/qwen3-coder) <aider@aider.chat>
This commit is contained in:
@@ -151,7 +151,7 @@ pub fn cmd_args_digest_type(cmd: &mut Command, args: &Args) -> MetaPluginType {
|
|||||||
if digest_type_opt.is_err() {
|
if digest_type_opt.is_err() {
|
||||||
cmd.error(
|
cmd.error(
|
||||||
ErrorKind::InvalidValue,
|
ErrorKind::InvalidValue,
|
||||||
format!("Unknown digest type: {}", digest_name),
|
format!("Invalid digest algorithm '{}'. Use 'sha256' or 'md5'", digest_name),
|
||||||
)
|
)
|
||||||
.exit();
|
.exit();
|
||||||
}
|
}
|
||||||
@@ -170,7 +170,7 @@ pub fn cmd_args_compression_type(cmd: &mut Command, args: &Args) -> CompressionT
|
|||||||
if compression_type_opt.is_err() {
|
if compression_type_opt.is_err() {
|
||||||
cmd.error(
|
cmd.error(
|
||||||
ErrorKind::InvalidValue,
|
ErrorKind::InvalidValue,
|
||||||
format!("Unknown compression type: {}", compression_name),
|
format!("Invalid compression algorithm '{}'. Supported algorithms: lz4, gzip, xz, zstd", compression_name),
|
||||||
)
|
)
|
||||||
.exit();
|
.exit();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,43 +1,4 @@
|
|||||||
use crate::compression_engine::{CompressionType, get_compression_engine};
|
fn validate_diff_args(cmd: &mut Command, ids: &Vec<i64>, tags: &Vec<String>) {
|
||||||
use libc::c_int;
|
|
||||||
use std::path::PathBuf;
|
|
||||||
use std::str::FromStr;
|
|
||||||
|
|
||||||
use anyhow::{Result, anyhow};
|
|
||||||
use clap::Command;
|
|
||||||
use nix::Error as NixError;
|
|
||||||
use nix::fcntl::FdFlag;
|
|
||||||
use nix::unistd::{close, pipe};
|
|
||||||
use std::io::Read;
|
|
||||||
use std::os::fd::FromRawFd;
|
|
||||||
use std::process::Stdio;
|
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
|
|
||||||
// RAII guard for file descriptors to ensure they're closed
|
|
||||||
struct FdGuard {
|
|
||||||
fd: c_int,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl FdGuard {
|
|
||||||
fn new(fd: c_int) -> Self {
|
|
||||||
Self { fd }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for FdGuard {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
let _ = close(self.fd);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn mode_diff(
|
|
||||||
cmd: &mut Command,
|
|
||||||
_args: &crate::Args,
|
|
||||||
ids: &mut Vec<i64>,
|
|
||||||
tags: &mut Vec<String>,
|
|
||||||
conn: &mut rusqlite::Connection,
|
|
||||||
data_path: PathBuf,
|
|
||||||
) -> Result<()> {
|
|
||||||
if !tags.is_empty() {
|
if !tags.is_empty() {
|
||||||
cmd.error(
|
cmd.error(
|
||||||
clap::error::ErrorKind::InvalidValue,
|
clap::error::ErrorKind::InvalidValue,
|
||||||
@@ -52,66 +13,107 @@ pub fn mode_diff(
|
|||||||
)
|
)
|
||||||
.exit();
|
.exit();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn fetch_and_validate_items(
|
||||||
|
conn: &mut rusqlite::Connection,
|
||||||
|
ids: &Vec<i64>,
|
||||||
|
) -> Result<(crate::db::Item, crate::db::Item)> {
|
||||||
// Fetch items, ensuring they exist.
|
// Fetch items, ensuring they exist.
|
||||||
let item_a = crate::db::get_item(conn, ids[0])?
|
let item_a = crate::db::get_item(conn, ids[0])?
|
||||||
.ok_or_else(|| anyhow!("Unable to find first item (ID: {}) in database", ids[0]))?;
|
.ok_or_else(|| anyhow::anyhow!("Unable to find first item (ID: {}) in database", ids[0]))?;
|
||||||
let item_b = crate::db::get_item(conn, ids[1])?
|
let item_b = crate::db::get_item(conn, ids[1])?
|
||||||
.ok_or_else(|| anyhow!("Unable to find second item (ID: {}) in database", ids[1]))?;
|
.ok_or_else(|| anyhow::anyhow!("Unable to find second item (ID: {}) in database", ids[1]))?;
|
||||||
|
|
||||||
log::debug!("MAIN: Found item A {:?}", item_a);
|
log::debug!("MAIN: Found item A {:?}", item_a);
|
||||||
log::debug!("MAIN: Found item B {:?}", item_b);
|
log::debug!("MAIN: Found item B {:?}", item_b);
|
||||||
|
|
||||||
let item_a_id = item_a.id.ok_or_else(|| anyhow!("Item A missing ID"))?;
|
let item_a_id = item_a.id.ok_or_else(|| anyhow::anyhow!("Item A missing ID"))?;
|
||||||
let item_b_id = item_b.id.ok_or_else(|| anyhow!("Item B missing ID"))?;
|
let item_b_id = item_b.id.ok_or_else(|| anyhow::anyhow!("Item B missing ID"))?;
|
||||||
|
|
||||||
// Validate that item IDs are positive to prevent path traversal issues
|
// Validate that item IDs are positive to prevent path traversal issues
|
||||||
if item_a_id <= 0 || item_b_id <= 0 {
|
if item_a_id <= 0 || item_b_id <= 0 {
|
||||||
return Err(anyhow!("Invalid item ID: {} or {}", item_a_id, item_b_id));
|
return Err(anyhow::anyhow!("Invalid item ID: {} or {}", item_a_id, item_b_id));
|
||||||
}
|
}
|
||||||
|
|
||||||
let item_a_tags: Vec<String> = crate::db::get_item_tags(conn, &item_a)?
|
Ok((item_a, item_b))
|
||||||
.into_iter()
|
}
|
||||||
.map(|x| x.name)
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let item_b_tags: Vec<String> = crate::db::get_item_tags(conn, &item_b)?
|
fn get_item_tags(conn: &mut rusqlite::Connection, item: &crate::db::Item) -> Result<Vec<String>> {
|
||||||
|
let tags: Vec<String> = crate::db::get_item_tags(conn, item)?
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|x| x.name)
|
.map(|x| x.name)
|
||||||
.collect();
|
.collect();
|
||||||
|
Ok(tags)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn setup_diff_paths_and_compression(
|
||||||
|
data_path: &std::path::PathBuf,
|
||||||
|
item_a: &crate::db::Item,
|
||||||
|
item_b: &crate::db::Item,
|
||||||
|
) -> Result<(std::path::PathBuf, crate::compression_engine::CompressionType, std::path::PathBuf, crate::compression_engine::CompressionType)> {
|
||||||
|
let item_a_id = item_a.id.ok_or_else(|| anyhow::anyhow!("Item A missing ID"))?;
|
||||||
|
let item_b_id = item_b.id.ok_or_else(|| anyhow::anyhow!("Item B missing ID"))?;
|
||||||
|
|
||||||
let mut item_path_a = data_path.clone();
|
let mut item_path_a = data_path.clone();
|
||||||
item_path_a.push(item_a_id.to_string());
|
item_path_a.push(item_a_id.to_string());
|
||||||
let compression_type_a = CompressionType::from_str(&item_a.compression)?;
|
let compression_type_a = crate::compression_engine::CompressionType::from_str(&item_a.compression)?;
|
||||||
log::debug!("MAIN: Item A has compression type {:?}", compression_type_a);
|
log::debug!("MAIN: Item A has compression type {:?}", compression_type_a);
|
||||||
|
|
||||||
let mut item_path_b = data_path.clone();
|
let mut item_path_b = data_path.clone();
|
||||||
item_path_b.push(item_b_id.to_string());
|
item_path_b.push(item_b_id.to_string());
|
||||||
let compression_type_b = CompressionType::from_str(&item_b.compression)?;
|
let compression_type_b = crate::compression_engine::CompressionType::from_str(&item_b.compression)?;
|
||||||
log::debug!("MAIN: Item B has compression type {:?}", compression_type_b);
|
log::debug!("MAIN: Item B has compression type {:?}", compression_type_b);
|
||||||
|
|
||||||
|
Ok((item_path_a, compression_type_a, item_path_b, compression_type_b))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn setup_diff_pipes() -> Result<((libc::c_int, libc::c_int), (libc::c_int, libc::c_int))> {
|
||||||
|
use nix::unistd::pipe;
|
||||||
|
use nix::Error as NixError;
|
||||||
|
|
||||||
// Create pipes for diff's input
|
// Create pipes for diff's input
|
||||||
let (fd_a_read, fd_a_write) =
|
let (fd_a_read, fd_a_write) =
|
||||||
pipe().map_err(|e: NixError| anyhow!("Failed to create pipe A: {}", e))?;
|
pipe().map_err(|e: NixError| anyhow::anyhow!("Failed to create pipe A: {}", e))?;
|
||||||
let (fd_b_read, fd_b_write) =
|
let (fd_b_read, fd_b_write) =
|
||||||
pipe().map_err(|e: NixError| anyhow!("Failed to create pipe B: {}", e))?;
|
pipe().map_err(|e: NixError| anyhow::anyhow!("Failed to create pipe B: {}", e))?;
|
||||||
|
|
||||||
|
Ok(((fd_a_read, fd_a_write), (fd_b_read, fd_b_write)))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn setup_fd_guards(fd_a_read: libc::c_int, fd_b_read: libc::c_int) -> (FdGuard, FdGuard) {
|
||||||
// Wrap file descriptors in RAII guards
|
// Wrap file descriptors in RAII guards
|
||||||
let _fd_a_read_guard = FdGuard::new(fd_a_read);
|
let fd_a_read_guard = FdGuard::new(fd_a_read);
|
||||||
let _fd_b_read_guard = FdGuard::new(fd_b_read);
|
let fd_b_read_guard = FdGuard::new(fd_b_read);
|
||||||
|
(fd_a_read_guard, fd_b_read_guard)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_fd_cloexec(fd_a_write: libc::c_int, fd_b_write: libc::c_int) -> Result<()> {
|
||||||
|
use nix::fcntl::{fcntl, FcntlArg, FdFlag};
|
||||||
|
|
||||||
// Set FD_CLOEXEC on write ends
|
// Set FD_CLOEXEC on write ends
|
||||||
nix::fcntl::fcntl(
|
fcntl(
|
||||||
fd_a_write,
|
fd_a_write,
|
||||||
nix::fcntl::FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC),
|
FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC),
|
||||||
)
|
)
|
||||||
.map_err(|e| anyhow!("Failed to set FD_CLOEXEC on fd_a_write: {}", e))?;
|
.map_err(|e| anyhow::anyhow!("Failed to set FD_CLOEXEC on fd_a_write: {}", e))?;
|
||||||
nix::fcntl::fcntl(
|
fcntl(
|
||||||
fd_b_write,
|
fd_b_write,
|
||||||
nix::fcntl::FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC),
|
FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC),
|
||||||
)
|
)
|
||||||
.map_err(|e| anyhow!("Failed to set FD_CLOEXEC on fd_b_write: {}", e))?;
|
.map_err(|e| anyhow::anyhow!("Failed to set FD_CLOEXEC on fd_b_write: {}", e))?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn spawn_diff_process(
|
||||||
|
item_a_id: i64,
|
||||||
|
item_a_tags: Vec<String>,
|
||||||
|
item_b_id: i64,
|
||||||
|
item_b_tags: Vec<String>,
|
||||||
|
fd_a_read: libc::c_int,
|
||||||
|
fd_b_read: libc::c_int,
|
||||||
|
) -> Result<std::process::Child> {
|
||||||
log::debug!("MAIN: Creating child process for diff");
|
log::debug!("MAIN: Creating child process for diff");
|
||||||
let mut diff_command = std::process::Command::new("diff");
|
let mut diff_command = std::process::Command::new("diff");
|
||||||
diff_command
|
diff_command
|
||||||
@@ -130,18 +132,67 @@ pub fn mode_diff(
|
|||||||
item_b_tags.join(" ")
|
item_b_tags.join(" ")
|
||||||
))
|
))
|
||||||
.arg(format!("/dev/fd/{}", fd_b_read))
|
.arg(format!("/dev/fd/{}", fd_b_read))
|
||||||
.stdin(Stdio::null())
|
.stdin(std::process::Stdio::null())
|
||||||
.stdout(Stdio::piped())
|
.stdout(std::process::Stdio::piped())
|
||||||
.stderr(Stdio::piped());
|
.stderr(std::process::Stdio::piped());
|
||||||
|
|
||||||
let mut child_process = diff_command
|
let child_process = diff_command
|
||||||
.spawn()
|
.spawn()
|
||||||
.map_err(|e| anyhow!("Failed to execute diff command: {}", e))?;
|
.map_err(|e| anyhow::anyhow!("Failed to execute diff command: {}", e))?;
|
||||||
|
|
||||||
// Close read ends in parent process - they're now guarded by FdGuard
|
Ok(child_process)
|
||||||
drop(_fd_a_read_guard);
|
}
|
||||||
drop(_fd_b_read_guard);
|
|
||||||
|
|
||||||
|
// RAII guard for file descriptors to ensure they're closed
|
||||||
|
struct FdGuard {
|
||||||
|
fd: libc::c_int,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FdGuard {
|
||||||
|
fn new(fd: libc::c_int) -> Self {
|
||||||
|
Self { fd }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for FdGuard {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
let _ = nix::unistd::close(self.fd);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a function to write item data to a pipe
|
||||||
|
fn write_item_to_pipe(
|
||||||
|
item_path: std::path::PathBuf,
|
||||||
|
compression_type: crate::compression_engine::CompressionType,
|
||||||
|
pipe_writer_raw: std::fs::File,
|
||||||
|
) -> Result<()> {
|
||||||
|
use std::io::BufWriter;
|
||||||
|
let mut buffered_pipe_writer = BufWriter::new(pipe_writer_raw);
|
||||||
|
let engine =
|
||||||
|
crate::compression_engine::get_compression_engine(compression_type).expect("Unable to get compression engine");
|
||||||
|
log::debug!("THREAD: Sending item to diff");
|
||||||
|
engine
|
||||||
|
.copy(item_path, &mut buffered_pipe_writer)
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to copy/compress item: {}", e))?;
|
||||||
|
log::debug!("THREAD: Done sending item to diff");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Function to spawn a writer thread for an item
|
||||||
|
fn spawn_writer_thread(
|
||||||
|
item_path: std::path::PathBuf,
|
||||||
|
compression_type: crate::compression_engine::CompressionType,
|
||||||
|
fd_write: libc::c_int,
|
||||||
|
) -> std::thread::JoinHandle<Result<()>> {
|
||||||
|
let pipe_writer_raw = unsafe { std::fs::File::from_raw_fd(fd_write) };
|
||||||
|
std::thread::spawn(move || {
|
||||||
|
write_item_to_pipe(item_path, compression_type, pipe_writer_raw)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn execute_diff_command(
|
||||||
|
child_process: &mut std::process::Child,
|
||||||
|
) -> Result<(Vec<u8>, Vec<u8>)> {
|
||||||
let mut child_stdout_pipe = child_process
|
let mut child_stdout_pipe = child_process
|
||||||
.stdout
|
.stdout
|
||||||
.take()
|
.take()
|
||||||
@@ -153,43 +204,6 @@ pub fn mode_diff(
|
|||||||
|
|
||||||
log::debug!("MAIN: Creating threads for diff I/O");
|
log::debug!("MAIN: Creating threads for diff I/O");
|
||||||
|
|
||||||
// Create a function to write item data to a pipe
|
|
||||||
fn write_item_to_pipe(
|
|
||||||
item_path: PathBuf,
|
|
||||||
compression_type: CompressionType,
|
|
||||||
pipe_writer_raw: std::fs::File,
|
|
||||||
) -> Result<()> {
|
|
||||||
use std::io::BufWriter;
|
|
||||||
let mut buffered_pipe_writer = BufWriter::new(pipe_writer_raw);
|
|
||||||
let engine =
|
|
||||||
get_compression_engine(compression_type).expect("Unable to get compression engine");
|
|
||||||
log::debug!("THREAD: Sending item to diff");
|
|
||||||
engine
|
|
||||||
.copy(item_path, &mut buffered_pipe_writer)
|
|
||||||
.map_err(|e| anyhow!("Failed to copy/compress item: {}", e))?;
|
|
||||||
log::debug!("THREAD: Done sending item to diff");
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Function to spawn a writer thread for an item
|
|
||||||
fn spawn_writer_thread(
|
|
||||||
item_path: PathBuf,
|
|
||||||
compression_type: CompressionType,
|
|
||||||
fd_write: c_int,
|
|
||||||
) -> std::thread::JoinHandle<Result<()>> {
|
|
||||||
let pipe_writer_raw = unsafe { std::fs::File::from_raw_fd(fd_write) };
|
|
||||||
std::thread::spawn(move || {
|
|
||||||
write_item_to_pipe(item_path, compression_type, pipe_writer_raw)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Spawn writer threads for both items
|
|
||||||
let writer_thread_a =
|
|
||||||
spawn_writer_thread(item_path_a.clone(), compression_type_a.clone(), fd_a_write);
|
|
||||||
|
|
||||||
let writer_thread_b =
|
|
||||||
spawn_writer_thread(item_path_b.clone(), compression_type_b.clone(), fd_b_write);
|
|
||||||
|
|
||||||
// Thread to read diff's standard output
|
// Thread to read diff's standard output
|
||||||
let stdout_reader_thread = std::thread::spawn(move || {
|
let stdout_reader_thread = std::thread::spawn(move || {
|
||||||
let mut output_buffer = Vec::new();
|
let mut output_buffer = Vec::new();
|
||||||
@@ -197,7 +211,7 @@ pub fn mode_diff(
|
|||||||
// child_stdout_pipe is a ChildStdout, which implements std::io::Read
|
// child_stdout_pipe is a ChildStdout, which implements std::io::Read
|
||||||
child_stdout_pipe
|
child_stdout_pipe
|
||||||
.read_to_end(&mut output_buffer)
|
.read_to_end(&mut output_buffer)
|
||||||
.map_err(|e| anyhow!("Failed to read diff stdout: {}", e))
|
.map_err(|e| anyhow::anyhow!("Failed to read diff stdout: {}", e))
|
||||||
.map(|_| output_buffer) // Return the Vec<u8> on success
|
.map(|_| output_buffer) // Return the Vec<u8> on success
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -207,73 +221,33 @@ pub fn mode_diff(
|
|||||||
log::debug!("STDERR_READER: Reading diff stderr");
|
log::debug!("STDERR_READER: Reading diff stderr");
|
||||||
child_stderr_pipe
|
child_stderr_pipe
|
||||||
.read_to_end(&mut error_buffer)
|
.read_to_end(&mut error_buffer)
|
||||||
.map_err(|e| anyhow!("Failed to read diff stderr: {}", e))
|
.map_err(|e| anyhow::anyhow!("Failed to read diff stderr: {}", e))
|
||||||
.map(|_| error_buffer)
|
.map(|_| error_buffer)
|
||||||
});
|
});
|
||||||
|
|
||||||
// Wait for writer threads to complete (meaning all input has been sent to diff)
|
|
||||||
log::debug!("MAIN: Waiting on writer thread for item A");
|
|
||||||
match writer_thread_a.join() {
|
|
||||||
Ok(Ok(())) => {
|
|
||||||
log::debug!("MAIN: Writer thread for item A completed successfully.");
|
|
||||||
}
|
|
||||||
Ok(Err(e)) => {
|
|
||||||
return Err(anyhow!("Writer thread for item A failed: {}", e));
|
|
||||||
}
|
|
||||||
Err(panic_payload) => {
|
|
||||||
return Err(anyhow!(
|
|
||||||
"Writer thread for item A (ID: {}) panicked: {:?}",
|
|
||||||
ids[0],
|
|
||||||
panic_payload
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
log::debug!("MAIN: Waiting on writer thread for item B");
|
|
||||||
match writer_thread_b.join() {
|
|
||||||
Ok(Ok(())) => {
|
|
||||||
log::debug!("MAIN: Writer thread for item B completed successfully.");
|
|
||||||
}
|
|
||||||
Ok(Err(e)) => {
|
|
||||||
return Err(anyhow!("Writer thread for item B failed: {}", e));
|
|
||||||
}
|
|
||||||
Err(panic_payload) => {
|
|
||||||
return Err(anyhow!(
|
|
||||||
"Writer thread for item B (ID: {}) panicked: {:?}",
|
|
||||||
ids[1],
|
|
||||||
panic_payload
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
log::debug!("MAIN: Done waiting on input-writer threads.");
|
|
||||||
|
|
||||||
// Now that all input has been sent and input pipes will be closed by threads exiting,
|
|
||||||
// wait for the diff child process to terminate.
|
|
||||||
log::debug!("MAIN: Waiting for diff child process to finish...");
|
|
||||||
let diff_status = child_process
|
|
||||||
.wait()
|
|
||||||
.map_err(|e| anyhow!("Failed to wait on diff command: {}", e))?;
|
|
||||||
log::debug!(
|
|
||||||
"MAIN: Diff child process finished with status: {}",
|
|
||||||
diff_status
|
|
||||||
);
|
|
||||||
|
|
||||||
// Retrieve the captured output from the reader threads.
|
// Retrieve the captured output from the reader threads.
|
||||||
let stdout_capture_result = stdout_reader_thread
|
let stdout_capture_result = stdout_reader_thread
|
||||||
.join()
|
.join()
|
||||||
.map_err(|panic_payload| {
|
.map_err(|panic_payload| {
|
||||||
anyhow!("Stdout reader thread panicked: {:?}", panic_payload)
|
anyhow::anyhow!("Stdout reader thread panicked: {:?}", panic_payload)
|
||||||
})?
|
})?
|
||||||
.map_err(|e| anyhow!("Failed to read diff stdout: {}", e))?;
|
.map_err(|e| anyhow::anyhow!("Failed to read diff stdout: {}", e))?;
|
||||||
|
|
||||||
let stderr_capture_result = stderr_reader_thread
|
let stderr_capture_result = stderr_reader_thread
|
||||||
.join()
|
.join()
|
||||||
.map_err(|panic_payload| {
|
.map_err(|panic_payload| {
|
||||||
anyhow!("Stderr reader thread panicked: {:?}", panic_payload)
|
anyhow::anyhow!("Stderr reader thread panicked: {:?}", panic_payload)
|
||||||
})?
|
})?
|
||||||
.map_err(|e| anyhow!("Failed to read diff stderr: {}", e))?;
|
.map_err(|e| anyhow::anyhow!("Failed to read diff stderr: {}", e))?;
|
||||||
|
|
||||||
|
Ok((stdout_capture_result, stderr_capture_result))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_diff_output(
|
||||||
|
diff_status: std::process::ExitStatus,
|
||||||
|
stdout_capture_result: Vec<u8>,
|
||||||
|
stderr_capture_result: Vec<u8>,
|
||||||
|
) -> Result<()> {
|
||||||
// Handle diff's exit status and output
|
// Handle diff's exit status and output
|
||||||
match diff_status.code() {
|
match diff_status.code() {
|
||||||
Some(0) => {
|
Some(0) => {
|
||||||
@@ -305,7 +279,7 @@ pub fn mode_diff(
|
|||||||
String::from_utf8_lossy(&stderr_capture_result)
|
String::from_utf8_lossy(&stderr_capture_result)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
return Err(anyhow!(
|
return Err(anyhow::anyhow!(
|
||||||
"Diff command reported an error (exit code {})",
|
"Diff command reported an error (exit code {})",
|
||||||
error_code
|
error_code
|
||||||
));
|
));
|
||||||
@@ -319,9 +293,107 @@ pub fn mode_diff(
|
|||||||
String::from_utf8_lossy(&stderr_capture_result)
|
String::from_utf8_lossy(&stderr_capture_result)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
return Err(anyhow!("Diff command terminated by signal"));
|
return Err(anyhow::anyhow!("Diff command terminated by signal"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn mode_diff(
|
||||||
|
cmd: &mut Command,
|
||||||
|
_args: &crate::Args,
|
||||||
|
ids: &mut Vec<i64>,
|
||||||
|
tags: &mut Vec<String>,
|
||||||
|
conn: &mut rusqlite::Connection,
|
||||||
|
data_path: std::path::PathBuf,
|
||||||
|
) -> Result<()> {
|
||||||
|
validate_diff_args(cmd, ids, tags);
|
||||||
|
let (item_a, item_b) = fetch_and_validate_items(conn, ids)?;
|
||||||
|
|
||||||
|
let item_a_tags = get_item_tags(conn, &item_a)?;
|
||||||
|
let item_b_tags = get_item_tags(conn, &item_b)?;
|
||||||
|
|
||||||
|
let (item_path_a, compression_type_a, item_path_b, compression_type_b) =
|
||||||
|
setup_diff_paths_and_compression(&data_path, &item_a, &item_b)?;
|
||||||
|
|
||||||
|
let ((fd_a_read, fd_a_write), (fd_b_read, fd_b_write)) = setup_diff_pipes()?;
|
||||||
|
let (_fd_a_read_guard, _fd_b_read_guard) = setup_fd_guards(fd_a_read, fd_b_read);
|
||||||
|
set_fd_cloexec(fd_a_write, fd_b_write)?;
|
||||||
|
|
||||||
|
let item_a_id = item_a.id.ok_or_else(|| anyhow::anyhow!("Item A missing ID"))?;
|
||||||
|
let item_b_id = item_b.id.ok_or_else(|| anyhow::anyhow!("Item B missing ID"))?;
|
||||||
|
|
||||||
|
let mut child_process = spawn_diff_process(
|
||||||
|
item_a_id,
|
||||||
|
item_a_tags,
|
||||||
|
item_b_id,
|
||||||
|
item_b_tags,
|
||||||
|
fd_a_read,
|
||||||
|
fd_b_read,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
// Close read ends in parent process - they're now guarded by FdGuard
|
||||||
|
drop(_fd_a_read_guard);
|
||||||
|
drop(_fd_b_read_guard);
|
||||||
|
|
||||||
|
// Spawn writer threads for both items
|
||||||
|
let writer_thread_a =
|
||||||
|
spawn_writer_thread(item_path_a.clone(), compression_type_a.clone(), fd_a_write);
|
||||||
|
|
||||||
|
let writer_thread_b =
|
||||||
|
spawn_writer_thread(item_path_b.clone(), compression_type_b.clone(), fd_b_write);
|
||||||
|
|
||||||
|
// Wait for writer threads to complete (meaning all input has been sent to diff)
|
||||||
|
log::debug!("MAIN: Waiting on writer thread for item A");
|
||||||
|
match writer_thread_a.join() {
|
||||||
|
Ok(Ok(())) => {
|
||||||
|
log::debug!("MAIN: Writer thread for item A completed successfully.");
|
||||||
|
}
|
||||||
|
Ok(Err(e)) => {
|
||||||
|
return Err(anyhow::anyhow!("Writer thread for item A failed: {}", e));
|
||||||
|
}
|
||||||
|
Err(panic_payload) => {
|
||||||
|
return Err(anyhow::anyhow!(
|
||||||
|
"Writer thread for item A (ID: {}) panicked: {:?}",
|
||||||
|
ids[0],
|
||||||
|
panic_payload
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log::debug!("MAIN: Waiting on writer thread for item B");
|
||||||
|
match writer_thread_b.join() {
|
||||||
|
Ok(Ok(())) => {
|
||||||
|
log::debug!("MAIN: Writer thread for item B completed successfully.");
|
||||||
|
}
|
||||||
|
Ok(Err(e)) => {
|
||||||
|
return Err(anyhow::anyhow!("Writer thread for item B failed: {}", e));
|
||||||
|
}
|
||||||
|
Err(panic_payload) => {
|
||||||
|
return Err(anyhow::anyhow!(
|
||||||
|
"Writer thread for item B (ID: {}) panicked: {:?}",
|
||||||
|
ids[1],
|
||||||
|
panic_payload
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log::debug!("MAIN: Done waiting on input-writer threads.");
|
||||||
|
|
||||||
|
// Now that all input has been sent and input pipes will be closed by threads exiting,
|
||||||
|
// wait for the diff child process to terminate.
|
||||||
|
log::debug!("MAIN: Waiting for diff child process to finish...");
|
||||||
|
let diff_status = child_process
|
||||||
|
.wait()
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to wait on diff command: {}", e))?;
|
||||||
|
log::debug!(
|
||||||
|
"MAIN: Diff child process finished with status: {}",
|
||||||
|
diff_status
|
||||||
|
);
|
||||||
|
|
||||||
|
let (stdout_capture_result, stderr_capture_result) = execute_diff_command(&mut child_process)?;
|
||||||
|
handle_diff_output(diff_status, stdout_capture_result, stderr_capture_result)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,57 +1,39 @@
|
|||||||
use anyhow::{Context, Result, anyhow};
|
fn validate_save_args(cmd: &mut Command, ids: &Vec<i64>) {
|
||||||
use gethostname::gethostname;
|
|
||||||
use is_terminal::IsTerminal;
|
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::io::{self, Read, Write};
|
|
||||||
|
|
||||||
use clap::Command;
|
|
||||||
use clap::error::ErrorKind;
|
|
||||||
use log::debug;
|
|
||||||
use rusqlite::Connection;
|
|
||||||
use std::path::PathBuf;
|
|
||||||
|
|
||||||
use crate::compression_engine::get_compression_engine;
|
|
||||||
use crate::db::{self};
|
|
||||||
use crate::meta_plugin::{MetaPlugin, MetaPluginType, get_meta_plugin};
|
|
||||||
use crate::modes::common::{cmd_args_compression_type, cmd_args_digest_type, cmd_args_meta_plugin_types, get_meta_from_env, store_item_meta_value};
|
|
||||||
use chrono::Utc;
|
|
||||||
|
|
||||||
pub fn mode_save(
|
|
||||||
cmd: &mut Command,
|
|
||||||
args: &crate::Args,
|
|
||||||
ids: &mut Vec<i64>,
|
|
||||||
tags: &mut Vec<String>,
|
|
||||||
conn: &mut Connection,
|
|
||||||
data_path: PathBuf,
|
|
||||||
) -> Result<()> {
|
|
||||||
if !ids.is_empty() {
|
if !ids.is_empty() {
|
||||||
cmd.error(
|
cmd.error(
|
||||||
ErrorKind::InvalidValue,
|
clap::error::ErrorKind::InvalidValue,
|
||||||
"ID given, you cannot supply IDs when using --save",
|
"ID given, you cannot supply IDs when using --save",
|
||||||
)
|
)
|
||||||
.exit();
|
.exit();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn initialize_tags(tags: &mut Vec<String>) {
|
||||||
if tags.is_empty() {
|
if tags.is_empty() {
|
||||||
tags.push("none".to_string());
|
tags.push("none".to_string());
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn setup_compression_and_plugins(
|
||||||
|
cmd: &mut Command,
|
||||||
|
args: &crate::Args,
|
||||||
|
) -> (crate::compression_engine::CompressionType, Box<dyn crate::compression_engine::CompressionEngine>, Vec<Box<dyn crate::meta_plugin::MetaPlugin>>) {
|
||||||
let digest_type = cmd_args_digest_type(cmd, &args);
|
let digest_type = cmd_args_digest_type(cmd, &args);
|
||||||
debug!("MAIN: Digest type: {:?}", digest_type);
|
debug!("MAIN: Digest type: {:?}", digest_type);
|
||||||
|
|
||||||
let compression_type = cmd_args_compression_type(cmd, &args);
|
let compression_type = cmd_args_compression_type(cmd, &args);
|
||||||
debug!("MAIN: Compression type: {:?}", compression_type);
|
debug!("MAIN: Compression type: {:?}", compression_type);
|
||||||
let compression_engine =
|
let compression_engine =
|
||||||
get_compression_engine(compression_type.clone()).expect("Unable to get compression engine");
|
crate::compression_engine::get_compression_engine(compression_type.clone()).expect("Unable to get compression engine");
|
||||||
|
|
||||||
// Start with meta plugin types from command line
|
// Start with meta plugin types from command line
|
||||||
let mut meta_plugin_types: Vec<MetaPluginType> = cmd_args_meta_plugin_types(cmd, &args);
|
let mut meta_plugin_types: Vec<crate::meta_plugin::MetaPluginType> = cmd_args_meta_plugin_types(cmd, &args);
|
||||||
debug!("MAIN: Meta plugin types: {:?}", meta_plugin_types);
|
debug!("MAIN: Meta plugin types: {:?}", meta_plugin_types);
|
||||||
|
|
||||||
// Convert digest type to meta plugin type and add to the list if needed
|
// Convert digest type to meta plugin type and add to the list if needed
|
||||||
let digest_meta_plugin_type = match digest_type {
|
let digest_meta_plugin_type = match digest_type {
|
||||||
crate::meta_plugin::MetaPluginType::DigestSha256 => Some(MetaPluginType::DigestSha256),
|
crate::meta_plugin::MetaPluginType::DigestSha256 => Some(crate::meta_plugin::MetaPluginType::DigestSha256),
|
||||||
crate::meta_plugin::MetaPluginType::DigestMd5 => Some(MetaPluginType::DigestMd5),
|
crate::meta_plugin::MetaPluginType::DigestMd5 => Some(crate::meta_plugin::MetaPluginType::DigestMd5),
|
||||||
_ => None,
|
_ => None,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -63,9 +45,9 @@ pub fn mode_save(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Initialize meta_plugins with MetaPlugin instances for each MetaPluginType
|
// Initialize meta_plugins with MetaPlugin instances for each MetaPluginType
|
||||||
let mut meta_plugins: Vec<Box<dyn MetaPlugin>> = meta_plugin_types
|
let mut meta_plugins: Vec<Box<dyn crate::meta_plugin::MetaPlugin>> = meta_plugin_types
|
||||||
.iter()
|
.iter()
|
||||||
.map(|meta_plugin_type| get_meta_plugin(meta_plugin_type.clone()))
|
.map(|meta_plugin_type| crate::meta_plugin::get_meta_plugin(meta_plugin_type.clone()))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
// Check for unsupported meta plugins, warn the user, and remove them from the list
|
// Check for unsupported meta plugins, warn the user, and remove them from the list
|
||||||
@@ -76,21 +58,30 @@ pub fn mode_save(
|
|||||||
// We need to get the meta name for the warning message
|
// We need to get the meta name for the warning message
|
||||||
// Since we can't mutably borrow meta_plugin here, we create a temporary one
|
// Since we can't mutably borrow meta_plugin here, we create a temporary one
|
||||||
let meta_plugin_type = meta_plugin_types[i].clone();
|
let meta_plugin_type = meta_plugin_types[i].clone();
|
||||||
let mut temp_plugin = get_meta_plugin(meta_plugin_type);
|
let mut temp_plugin = crate::meta_plugin::get_meta_plugin(meta_plugin_type);
|
||||||
eprintln!("Warning: Meta plugin '{}' is enabled but not supported on this system", temp_plugin.meta_name());
|
eprintln!("Warning: Meta plugin '{}' is enabled but not supported on this system", temp_plugin.meta_name());
|
||||||
}
|
}
|
||||||
i += 1;
|
i += 1;
|
||||||
is_supported
|
is_supported
|
||||||
});
|
});
|
||||||
|
|
||||||
let mut item = db::Item {
|
(compression_type, compression_engine, meta_plugins)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_and_log_item(
|
||||||
|
conn: &mut rusqlite::Connection,
|
||||||
|
args: &crate::Args,
|
||||||
|
tags: &Vec<String>,
|
||||||
|
compression_type: &crate::compression_engine::CompressionType,
|
||||||
|
) -> Result<crate::db::Item> {
|
||||||
|
let mut item = crate::db::Item {
|
||||||
id: None,
|
id: None,
|
||||||
ts: Utc::now(),
|
ts: chrono::Utc::now(),
|
||||||
size: None,
|
size: None,
|
||||||
compression: compression_type.to_string(),
|
compression: compression_type.to_string(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let id = db::insert_item(conn, item.clone())?;
|
let id = crate::db::insert_item(conn, item.clone())?;
|
||||||
item.id = Some(id);
|
item.id = Some(id);
|
||||||
debug!("MAIN: Added item {:?}", item.clone());
|
debug!("MAIN: Added item {:?}", item.clone());
|
||||||
|
|
||||||
@@ -117,14 +108,23 @@ pub fn mode_save(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
db::set_item_tags(conn, item.clone(), tags)?;
|
Ok(item)
|
||||||
|
}
|
||||||
|
|
||||||
// Use a transaction for database operations to ensure atomicity
|
fn setup_item_metadata(
|
||||||
let tx = conn.transaction()?;
|
conn: &rusqlite::Connection,
|
||||||
|
args: &crate::Args,
|
||||||
|
item: &crate::db::Item,
|
||||||
|
tags: &Vec<String>,
|
||||||
|
) -> Result<()> {
|
||||||
|
crate::db::set_item_tags(conn, item.clone(), tags)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
let mut item_meta: HashMap<String, String> = get_meta_from_env();
|
fn collect_item_meta(args: &crate::Args) -> std::collections::HashMap<String, String> {
|
||||||
|
let mut item_meta: std::collections::HashMap<String, String> = crate::modes::common::get_meta_from_env();
|
||||||
|
|
||||||
if let Ok(hostname) = gethostname().into_string() {
|
if let Ok(hostname) = gethostname::gethostname().into_string() {
|
||||||
if !item_meta.contains_key("hostname") {
|
if !item_meta.contains_key("hostname") {
|
||||||
item_meta.insert("hostname".to_string(), hostname);
|
item_meta.insert("hostname".to_string(), hostname);
|
||||||
}
|
}
|
||||||
@@ -135,30 +135,35 @@ pub fn mode_save(
|
|||||||
item_meta.insert(item.key, item.value);
|
item_meta.insert(item.key, item.value);
|
||||||
}
|
}
|
||||||
|
|
||||||
let item_id = item.id.ok_or_else(|| anyhow!("Item missing ID"))?;
|
item_meta
|
||||||
for kv in item_meta.iter() {
|
}
|
||||||
let meta = db::Meta {
|
|
||||||
id: item_id,
|
fn process_input_stream(
|
||||||
name: kv.0.to_string(),
|
compression_engine: &Box<dyn crate::compression_engine::CompressionEngine>,
|
||||||
value: kv.1.to_string(),
|
data_path: &std::path::PathBuf,
|
||||||
|
item_id: i64,
|
||||||
|
meta_plugins: &mut Vec<Box<dyn crate::meta_plugin::MetaPlugin>>,
|
||||||
|
) -> Result<(Box<dyn std::io::Write>, crate::db::Item)> {
|
||||||
|
let mut item = crate::db::Item {
|
||||||
|
id: Some(item_id),
|
||||||
|
ts: chrono::Utc::now(),
|
||||||
|
size: None,
|
||||||
|
compression: String::new(), // Will be set later
|
||||||
};
|
};
|
||||||
db::store_meta(&tx, meta)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut item_path = data_path.clone();
|
let mut item_path = data_path.clone();
|
||||||
item_path.push(item_id.to_string());
|
item_path.push(item_id.to_string());
|
||||||
|
|
||||||
let mut stdin = io::stdin().lock();
|
let mut stdin = std::io::stdin().lock();
|
||||||
let mut stdout = io::stdout().lock();
|
let mut stdout = std::io::stdout().lock();
|
||||||
let mut buffer = [0; libc::BUFSIZ as usize];
|
let mut buffer = [0; libc::BUFSIZ as usize];
|
||||||
|
|
||||||
let mut item_out: Box<dyn Write> =
|
let mut item_out: Box<dyn std::io::Write> =
|
||||||
compression_engine
|
compression_engine
|
||||||
.create(item_path.clone())
|
.create(item_path.clone())
|
||||||
.context(anyhow!(
|
.context(anyhow::anyhow!(
|
||||||
"Unable to write file {:?} using compression {:?}",
|
"Unable to write file {:?}",
|
||||||
item_path,
|
item_path
|
||||||
compression_type
|
|
||||||
))?;
|
))?;
|
||||||
|
|
||||||
debug!("MAIN: Starting IO loop");
|
debug!("MAIN: Starting IO loop");
|
||||||
@@ -188,12 +193,20 @@ pub fn mode_save(
|
|||||||
stdout.flush()?;
|
stdout.flush()?;
|
||||||
item_out.flush()?;
|
item_out.flush()?;
|
||||||
|
|
||||||
|
Ok((item_out, item))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn finalize_meta_plugins(
|
||||||
|
conn: &rusqlite::Connection,
|
||||||
|
meta_plugins: &mut Vec<Box<dyn crate::meta_plugin::MetaPlugin>>,
|
||||||
|
item: &crate::db::Item,
|
||||||
|
) -> Result<()> {
|
||||||
for meta_plugin in meta_plugins.iter_mut() {
|
for meta_plugin in meta_plugins.iter_mut() {
|
||||||
let meta_name = meta_plugin.meta_name();
|
let meta_name = meta_plugin.meta_name();
|
||||||
|
|
||||||
match meta_plugin.finalize() {
|
match meta_plugin.finalize() {
|
||||||
Ok(meta_value) => {
|
Ok(meta_value) => {
|
||||||
if let Err(e) = store_item_meta_value(&tx, item.clone(), meta_name.clone(), meta_value) {
|
if let Err(e) = crate::modes::common::store_item_meta_value(conn, item.clone(), meta_name.clone(), meta_value) {
|
||||||
eprintln!("Warning: Failed to store meta value for {}: {}", meta_name, e);
|
eprintln!("Warning: Failed to store meta value for {}: {}", meta_name, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -202,8 +215,52 @@ pub fn mode_save(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
db::update_item(&tx, item.clone())?;
|
pub fn mode_save(
|
||||||
|
cmd: &mut Command,
|
||||||
|
args: &crate::Args,
|
||||||
|
ids: &mut Vec<i64>,
|
||||||
|
tags: &mut Vec<String>,
|
||||||
|
conn: &mut rusqlite::Connection,
|
||||||
|
data_path: std::path::PathBuf,
|
||||||
|
) -> Result<()> {
|
||||||
|
validate_save_args(cmd, ids);
|
||||||
|
initialize_tags(tags);
|
||||||
|
|
||||||
|
let (compression_type, compression_engine, mut meta_plugins) = setup_compression_and_plugins(cmd, args);
|
||||||
|
|
||||||
|
let mut item = create_and_log_item(conn, args, tags, &compression_type)?;
|
||||||
|
setup_item_metadata(conn, args, &item, tags)?;
|
||||||
|
|
||||||
|
// Use a transaction for database operations to ensure atomicity
|
||||||
|
let tx = conn.transaction()?;
|
||||||
|
|
||||||
|
let item_meta = collect_item_meta(args);
|
||||||
|
let item_id = item.id.ok_or_else(|| anyhow::anyhow!("Item missing ID"))?;
|
||||||
|
|
||||||
|
for kv in item_meta.iter() {
|
||||||
|
let meta = crate::db::Meta {
|
||||||
|
id: item_id,
|
||||||
|
name: kv.0.to_string(),
|
||||||
|
value: kv.1.to_string(),
|
||||||
|
};
|
||||||
|
crate::db::store_meta(&tx, meta)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let (_item_out, processed_item) = process_input_stream(
|
||||||
|
&compression_engine,
|
||||||
|
&data_path,
|
||||||
|
item_id,
|
||||||
|
&mut meta_plugins,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
item.size = processed_item.size;
|
||||||
|
item.compression = compression_type.to_string();
|
||||||
|
|
||||||
|
finalize_meta_plugins(&tx, &mut meta_plugins, &item)?;
|
||||||
|
crate::db::update_item(&tx, item.clone())?;
|
||||||
|
|
||||||
// Commit the transaction
|
// Commit the transaction
|
||||||
tx.commit()?;
|
tx.commit()?;
|
||||||
|
|||||||
Reference in New Issue
Block a user