feat: add --update mode, --meta/--meta-plugin flags, streaming diff
- Add --update mode to modify tags and metadata for existing items by ID
- Add --meta key=value flag to set metadata during save/update
- Add --meta key (bare) to delete metadata keys or filter by existence
- Add --meta-plugin/-M name:{json} flag for plugin options via CLI
- Env meta plugin now uses options from --meta-plugin instead of only env vars
- Stream decompressed content to diff via /dev/fd pipes (no temp files)
- Wire --list-format CLI arg to settings (was parsed but ignored)
- Allow --info to accept tags (was restricted to numeric IDs only)
- Change DB meta filtering to HashMap<String, Option<String>> for exact match + key existence
- Fix fcntl error checking in diff pre_exec
- Fix README inaccuracies (delete by tag, nonexistent --digest flag, meta plugin key names)
This commit is contained in:
@@ -1,12 +1,18 @@
|
||||
use crate::config;
|
||||
use crate::services::item_service::ItemService;
|
||||
/// Diff mode implementation.
|
||||
///
|
||||
/// This module provides functionality for comparing two items and displaying their
|
||||
/// differences using external diff tools.
|
||||
/// differences using external diff tools. Decompressed content is streamed to diff
|
||||
/// via pipes and /dev/fd file descriptors — no temporary files are created.
|
||||
use crate::common::PIPESIZE;
|
||||
use crate::config;
|
||||
use crate::services::compression_service::CompressionService;
|
||||
use crate::services::item_service::ItemService;
|
||||
use anyhow::{Context, Result};
|
||||
use clap::Command;
|
||||
use log::debug;
|
||||
use std::io::Read;
|
||||
use std::os::unix::io::{FromRawFd, RawFd};
|
||||
use std::os::unix::process::CommandExt;
|
||||
|
||||
fn validate_diff_args(_cmd: &mut Command, ids: &[i64], tags: &[String]) -> anyhow::Result<()> {
|
||||
if !tags.is_empty() {
|
||||
@@ -23,19 +29,6 @@ fn validate_diff_args(_cmd: &mut Command, ids: &[i64], tags: &[String]) -> anyho
|
||||
}
|
||||
|
||||
/// Fetches and validates items from the database for diff operation.
|
||||
///
|
||||
/// This function retrieves two items by their IDs from the database using the
|
||||
/// item service, which handles validation, and returns them as a tuple.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `conn` - Mutable reference to the database connection.
|
||||
/// * `ids` - Vector of item IDs to fetch.
|
||||
/// * `item_service` - Reference to the item service for validation.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// * `Result<(ItemWithMeta, ItemWithMeta)>` - Tuple of items with metadata or error.
|
||||
fn fetch_and_validate_items(
|
||||
conn: &mut rusqlite::Connection,
|
||||
ids: &[i64],
|
||||
@@ -44,7 +37,6 @@ fn fetch_and_validate_items(
|
||||
crate::services::types::ItemWithMeta,
|
||||
crate::services::types::ItemWithMeta,
|
||||
)> {
|
||||
// Fetch items using the service, which handles validation
|
||||
let item_a = item_service
|
||||
.get_item(conn, ids[0])
|
||||
.with_context(|| format!("Unable to find first item (ID: {}) in database", ids[0]))?;
|
||||
@@ -52,48 +44,12 @@ fn fetch_and_validate_items(
|
||||
.get_item(conn, ids[1])
|
||||
.with_context(|| format!("Unable to find second item (ID: {}) in database", ids[1]))?;
|
||||
|
||||
debug!("MAIN: Found item A {:?}", item_a.item);
|
||||
debug!("MAIN: Found item B {:?}", item_b.item);
|
||||
debug!("DIFF: Found item A {:?}", item_a.item);
|
||||
debug!("DIFF: Found item B {:?}", item_b.item);
|
||||
|
||||
Ok((item_a, item_b))
|
||||
}
|
||||
|
||||
/// Sets up file paths and compression for diff operation.
|
||||
///
|
||||
/// This function constructs the file paths for the two items and prepares the
|
||||
/// compression engines needed for reading their contents.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `item_service` - Reference to the item service.
|
||||
/// * `item_a` - First item with metadata.
|
||||
/// * `item_b` - Second item with metadata.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// * `Result<(PathBuf, PathBuf)>` - Tuple of item file paths or error.
|
||||
fn setup_diff_paths_and_compression(
|
||||
item_service: &ItemService,
|
||||
item_a: &crate::services::types::ItemWithMeta,
|
||||
item_b: &crate::services::types::ItemWithMeta,
|
||||
) -> Result<(std::path::PathBuf, std::path::PathBuf)> {
|
||||
let item_a_id = item_a
|
||||
.item
|
||||
.id
|
||||
.ok_or_else(|| anyhow::anyhow!("Item A missing ID"))?;
|
||||
let item_b_id = item_b
|
||||
.item
|
||||
.id
|
||||
.ok_or_else(|| anyhow::anyhow!("Item B missing ID"))?;
|
||||
|
||||
// Use the service's data path to construct proper file paths
|
||||
let data_path = item_service.get_data_path();
|
||||
let item_a_path = data_path.join(item_a_id.to_string());
|
||||
let item_b_path = data_path.join(item_b_id.to_string());
|
||||
|
||||
Ok((item_a_path, item_b_path))
|
||||
}
|
||||
|
||||
pub fn mode_diff(
|
||||
cmd: &mut Command,
|
||||
args: &crate::args::Args,
|
||||
@@ -125,51 +81,129 @@ pub fn mode_diff(
|
||||
|
||||
validate_diff_args(cmd, &ids, &tags)?;
|
||||
|
||||
let settings = crate::config::Settings::new(args, crate::config::Settings::default_dir()?)?;
|
||||
|
||||
let item_service = crate::services::item_service::ItemService::new(settings.dir.clone());
|
||||
|
||||
let settings = config::Settings::new(args, config::Settings::default_dir()?)?;
|
||||
let item_service = ItemService::new(settings.dir.clone());
|
||||
let (item_a, item_b) = fetch_and_validate_items(conn, &ids, &item_service)?;
|
||||
|
||||
let (path_a, path_b) = setup_diff_paths_and_compression(&item_service, &item_a, &item_b)?;
|
||||
|
||||
run_external_diff(&path_a, &path_b)?;
|
||||
|
||||
Ok(())
|
||||
run_external_diff(&item_service, &item_a, &item_b)
|
||||
}
|
||||
|
||||
/// Runs external diff command to compare two files.
|
||||
/// Creates a pipe via libc, returns (read_fd, write_fd).
|
||||
#[allow(unsafe_code)]
|
||||
fn create_pipe() -> Result<(RawFd, RawFd)> {
|
||||
let mut fds = [0i32; 2];
|
||||
// pipe2 with O_CLOEXEC is atomic — no race between pipe() and fcntl()
|
||||
let ret = unsafe { libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC) };
|
||||
if ret != 0 {
|
||||
return Err(anyhow::anyhow!(
|
||||
"Failed to create pipe: {}",
|
||||
std::io::Error::last_os_error()
|
||||
));
|
||||
}
|
||||
Ok((fds[0], fds[1]))
|
||||
}
|
||||
|
||||
/// Streams decompressed item content through a pipe fd.
|
||||
///
|
||||
/// Uses the system's `diff` command to generate a unified diff output.
|
||||
/// Returns an error if the diff command is not found.
|
||||
/// Returns a JoinHandle for the writer thread. The thread writes decompressed
|
||||
/// data to write_fd and closes it when done (causing EOF for the reader).
|
||||
#[allow(unsafe_code)]
|
||||
fn spawn_writer_thread(
|
||||
item_service: &ItemService,
|
||||
item: &crate::services::types::ItemWithMeta,
|
||||
write_fd: RawFd,
|
||||
) -> std::thread::JoinHandle<Result<()>> {
|
||||
let data_path = item_service.get_data_path().clone();
|
||||
let item_id = item.item.id.expect("item must have ID");
|
||||
let compression = item.item.compression.clone();
|
||||
let mut item_path = data_path;
|
||||
item_path.push(item_id.to_string());
|
||||
|
||||
std::thread::spawn(move || -> Result<()> {
|
||||
let compression_service = CompressionService::new();
|
||||
let mut reader = compression_service
|
||||
.stream_item_content(item_path, &compression)
|
||||
.map_err(|e| anyhow::anyhow!("Failed to stream item {item_id}: {e}"))?;
|
||||
|
||||
// Wrap write_fd in a File so it's closed when this scope ends
|
||||
let mut writer = unsafe { std::fs::File::from_raw_fd(write_fd) };
|
||||
let mut buf = [0u8; PIPESIZE];
|
||||
loop {
|
||||
match reader.read(&mut buf) {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
use std::io::Write;
|
||||
writer.write_all(&buf[..n])?;
|
||||
}
|
||||
Err(e) => return Err(anyhow::anyhow!("Error reading item {item_id}: {e}")),
|
||||
}
|
||||
}
|
||||
// writer dropped here, closing write_fd → diff sees EOF
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
/// Runs external diff command, streaming decompressed content via /dev/fd pipes.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `path_a` - Path to the first file.
|
||||
/// * `path_b` - Path to the second file.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// * `Result<()>` - Success or error.
|
||||
fn run_external_diff(path_a: &std::path::Path, path_b: &std::path::Path) -> anyhow::Result<()> {
|
||||
/// Creates two pipes, spawns writer threads to decompress each item into its pipe,
|
||||
/// and runs `diff -u /dev/fd/N /dev/fd/M` where N and M are the pipe read fds.
|
||||
/// The read ends have CLOEXEC cleared in pre_exec so diff can open them.
|
||||
#[allow(unsafe_code)]
|
||||
fn run_external_diff(
|
||||
item_service: &ItemService,
|
||||
item_a: &crate::services::types::ItemWithMeta,
|
||||
item_b: &crate::services::types::ItemWithMeta,
|
||||
) -> Result<()> {
|
||||
if which::which_global("diff").is_err() {
|
||||
return Err(anyhow::anyhow!(
|
||||
"diff command not found. Please install diffutils."
|
||||
));
|
||||
}
|
||||
|
||||
let mut child = std::process::Command::new("diff")
|
||||
.arg("-u")
|
||||
.arg(path_a)
|
||||
.arg(path_b)
|
||||
.stdout(std::process::Stdio::inherit())
|
||||
.stderr(std::process::Stdio::inherit())
|
||||
.spawn()
|
||||
.context("Failed to spawn diff command")?;
|
||||
let (read_fd_a, write_fd_a) = create_pipe()?;
|
||||
let (read_fd_b, write_fd_b) = create_pipe()?;
|
||||
|
||||
debug!("DIFF: pipe fds: a(r={read_fd_a}, w={write_fd_a}) b(r={read_fd_b}, w={write_fd_b})");
|
||||
|
||||
// Spawn writer threads before diff — they decompress and write to pipe write ends.
|
||||
// Threads take ownership of write_fd via from_raw_fd and close them on exit.
|
||||
let writer_a = spawn_writer_thread(item_service, item_a, write_fd_a);
|
||||
let writer_b = spawn_writer_thread(item_service, item_b, write_fd_b);
|
||||
|
||||
// Spawn diff with /dev/fd/N paths. pre_exec clears CLOEXEC on the pipe read fds
|
||||
// so they survive the close_fds step in _do_spawn and diff can open them.
|
||||
let mut child = unsafe {
|
||||
std::process::Command::new("diff")
|
||||
.arg("-u")
|
||||
.arg(format!("/dev/fd/{read_fd_a}"))
|
||||
.arg(format!("/dev/fd/{read_fd_b}"))
|
||||
.stdout(std::process::Stdio::inherit())
|
||||
.stderr(std::process::Stdio::inherit())
|
||||
.stdin(std::process::Stdio::null())
|
||||
.pre_exec(move || {
|
||||
// Clear CLOEXEC on pipe read fds so they survive exec
|
||||
if libc::fcntl(read_fd_a, libc::F_SETFD, 0) == -1
|
||||
|| libc::fcntl(read_fd_b, libc::F_SETFD, 0) == -1
|
||||
{
|
||||
return Err(std::io::Error::last_os_error());
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.spawn()
|
||||
.context("Failed to spawn diff command")?
|
||||
};
|
||||
|
||||
let status = child.wait().context("Failed to wait for diff command")?;
|
||||
|
||||
// diff returns 0 if files are identical, 1 if different, 2 on error
|
||||
// Join writer threads and propagate errors
|
||||
writer_a
|
||||
.join()
|
||||
.map_err(|e| anyhow::anyhow!("Writer A panicked: {e:?}"))??;
|
||||
writer_b
|
||||
.join()
|
||||
.map_err(|e| anyhow::anyhow!("Writer B panicked: {e:?}"))??;
|
||||
|
||||
// diff returns 0 if identical, 1 if different, 2 on error
|
||||
if status.code() == Some(2) {
|
||||
Err(anyhow::anyhow!("diff command failed with an error"))
|
||||
} else {
|
||||
|
||||
@@ -51,8 +51,13 @@ pub fn mode_get(
|
||||
// If both are empty, find_item will find the last item
|
||||
|
||||
let item_service = ItemService::new(data_path.clone());
|
||||
let meta_filter: std::collections::HashMap<String, Option<String>> = settings
|
||||
.meta
|
||||
.iter()
|
||||
.map(|(k, v)| (k.clone(), v.clone()))
|
||||
.collect();
|
||||
let item_with_meta = item_service
|
||||
.find_item(conn, ids, tags, &std::collections::HashMap::new())
|
||||
.find_item(conn, ids, tags, &meta_filter)
|
||||
.map_err(|e| anyhow!("Unable to find matching item in database: {}", e))?;
|
||||
|
||||
let item_id = item_with_meta.item.id.context("Item missing ID")?;
|
||||
|
||||
@@ -65,9 +65,13 @@ pub fn mode_info(
|
||||
// If both are empty, find_item will find the last item
|
||||
|
||||
let item_service = ItemService::new(data_path.clone());
|
||||
// Use empty metadata HashMap
|
||||
let meta_filter: std::collections::HashMap<String, Option<String>> = settings
|
||||
.meta
|
||||
.iter()
|
||||
.map(|(k, v)| (k.clone(), v.clone()))
|
||||
.collect();
|
||||
let item_with_meta = item_service
|
||||
.find_item(conn, ids, tags, &std::collections::HashMap::new())
|
||||
.find_item(conn, ids, tags, &meta_filter)
|
||||
.map_err(|e| anyhow!("Unable to find matching item in database: {}", e))?;
|
||||
|
||||
show_item(item_with_meta, settings, data_path)
|
||||
|
||||
@@ -179,7 +179,12 @@ pub fn mode_list(
|
||||
}
|
||||
|
||||
let item_service = ItemService::new(data_path.clone());
|
||||
let items_with_meta = item_service.list_items(conn, tags, &std::collections::HashMap::new())?;
|
||||
let meta_filter: std::collections::HashMap<String, Option<String>> = settings
|
||||
.meta
|
||||
.iter()
|
||||
.map(|(k, v)| (k.clone(), v.clone()))
|
||||
.collect();
|
||||
let items_with_meta = item_service.list_items(conn, tags, &meta_filter)?;
|
||||
|
||||
let output_format = crate::modes::common::settings_output_format(settings);
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@ pub mod list;
|
||||
pub mod save;
|
||||
pub mod status;
|
||||
pub mod status_plugins;
|
||||
pub mod update;
|
||||
|
||||
/// Column types, output formats, and formatting utilities shared across modes.
|
||||
pub use common::{ColumnType, OutputFormat, format_size, settings_output_format};
|
||||
@@ -50,3 +51,6 @@ pub use status::mode_status;
|
||||
|
||||
/// Lists available plugins and their configurations.
|
||||
pub use status_plugins::mode_status_plugins;
|
||||
|
||||
/// Updates an item's tags and metadata by ID.
|
||||
pub use update::mode_update;
|
||||
|
||||
171
src/modes/update.rs
Normal file
171
src/modes/update.rs
Normal file
@@ -0,0 +1,171 @@
|
||||
use anyhow::{Context, Result};
|
||||
use std::io::Read;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use crate::common::PIPESIZE;
|
||||
use crate::config;
|
||||
use crate::db;
|
||||
use crate::services::compression_service::CompressionService;
|
||||
use clap::Command;
|
||||
use log::debug;
|
||||
use rusqlite::Connection;
|
||||
|
||||
/// Handles the update mode: modifies tags and metadata for an existing item by ID.
|
||||
///
|
||||
/// This function processes a single item ID, updating its metadata based on `--meta`
|
||||
/// arguments and optionally replacing its tags with positional arguments.
|
||||
/// If the item's size is not set, it backfills it by streaming through the content file.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `cmd` - Clap command for error handling.
|
||||
/// * `settings` - Global settings containing metadata and meta plugin config.
|
||||
/// * `ids` - List containing exactly one item ID.
|
||||
/// * `conn` - Database connection.
|
||||
/// * `data_path` - Path to data directory.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// `Result<()>` on success, or an error if the update fails.
|
||||
pub fn mode_update(
|
||||
cmd: &mut Command,
|
||||
settings: &config::Settings,
|
||||
ids: &mut [i64],
|
||||
tags: &mut Vec<String>,
|
||||
conn: &mut Connection,
|
||||
data_path: PathBuf,
|
||||
) -> Result<()> {
|
||||
if ids.len() != 1 {
|
||||
cmd.error(
|
||||
clap::error::ErrorKind::InvalidValue,
|
||||
"--update requires exactly one numeric ID",
|
||||
)
|
||||
.exit();
|
||||
}
|
||||
|
||||
let item_id = ids[0];
|
||||
|
||||
// Look up the item
|
||||
let item =
|
||||
db::get_item(conn, item_id)?.ok_or_else(|| anyhow::anyhow!("Item {item_id} not found"))?;
|
||||
|
||||
debug!("UPDATE: Found item {item_id}: {item:?}");
|
||||
|
||||
// Parse --meta arguments into set and delete lists
|
||||
let mut set_meta: Vec<(String, String)> = Vec::new();
|
||||
let mut delete_keys: Vec<String> = Vec::new();
|
||||
|
||||
for (key, value) in &settings.meta {
|
||||
match value {
|
||||
Some(v) => set_meta.push((key.clone(), v.clone())),
|
||||
None => delete_keys.push(key.clone()),
|
||||
}
|
||||
}
|
||||
|
||||
// Apply metadata changes
|
||||
for (key, value) in &set_meta {
|
||||
debug!("UPDATE: Setting meta {key}={value}");
|
||||
db::store_meta(
|
||||
conn,
|
||||
db::Meta {
|
||||
id: item_id,
|
||||
name: key.clone(),
|
||||
value: value.clone(),
|
||||
},
|
||||
)?;
|
||||
}
|
||||
|
||||
for key in &delete_keys {
|
||||
debug!("UPDATE: Deleting meta {key}");
|
||||
db::query_delete_meta(
|
||||
conn,
|
||||
db::Meta {
|
||||
id: item_id,
|
||||
name: key.clone(),
|
||||
value: String::new(),
|
||||
},
|
||||
)?;
|
||||
}
|
||||
|
||||
// Replace tags if provided
|
||||
if !tags.is_empty() {
|
||||
debug!("UPDATE: Replacing tags with {:?}", tags);
|
||||
db::set_item_tags(conn, item.clone(), tags)?;
|
||||
}
|
||||
|
||||
// Backfill size if not set
|
||||
let mut updated_item = item.clone();
|
||||
if item.size.is_none() {
|
||||
debug!("UPDATE: Size not set, backfilling from content file");
|
||||
if let Some(size) = compute_item_size(&data_path, &item) {
|
||||
debug!("UPDATE: Computed size: {size}");
|
||||
updated_item.size = Some(size);
|
||||
db::update_item(conn, updated_item.clone())?;
|
||||
}
|
||||
}
|
||||
|
||||
// Print confirmation
|
||||
if !settings.quiet {
|
||||
let mut parts = Vec::new();
|
||||
if !set_meta.is_empty() {
|
||||
parts.push(format!("set {} metadata", set_meta.len()));
|
||||
}
|
||||
if !delete_keys.is_empty() {
|
||||
parts.push(format!("deleted {} metadata", delete_keys.len()));
|
||||
}
|
||||
if !tags.is_empty() {
|
||||
parts.push(format!("tags: {}", tags.join(" ")));
|
||||
}
|
||||
let action = if parts.is_empty() {
|
||||
"no changes".to_string()
|
||||
} else {
|
||||
parts.join(", ")
|
||||
};
|
||||
|
||||
eprintln!("KEEP: Updated item {item_id} ({action})");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Computes the decompressed size of an item by streaming through its content file.
|
||||
///
|
||||
/// Reads the compressed file in PIPESIZE chunks and counts total decompressed bytes.
|
||||
/// Returns None if the file doesn't exist or decompression fails.
|
||||
fn compute_item_size(data_path: &Path, item: &db::Item) -> Option<i64> {
|
||||
let item_id = item.id?;
|
||||
let mut item_path = data_path.to_path_buf();
|
||||
item_path.push(item_id.to_string());
|
||||
|
||||
if !item_path.exists() {
|
||||
debug!("UPDATE: Content file not found: {item_path:?}");
|
||||
return None;
|
||||
}
|
||||
|
||||
let compression_service = CompressionService::new();
|
||||
let mut reader = match compression_service.stream_item_content(item_path, &item.compression) {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
debug!("UPDATE: Failed to open content stream: {e}");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let mut buffer = [0u8; PIPESIZE];
|
||||
let mut total_bytes: i64 = 0;
|
||||
|
||||
loop {
|
||||
match reader.read(&mut buffer) {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
total_bytes += n as i64;
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("UPDATE: Error reading content: {e}");
|
||||
return None;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Some(total_bytes)
|
||||
}
|
||||
Reference in New Issue
Block a user