From b3ca673b522a8e3945ff2c8a9c86e311762cd36f Mon Sep 17 00:00:00 2001 From: Andrew Phillips Date: Sat, 14 Mar 2026 15:02:16 -0300 Subject: [PATCH] feat: add --update mode, --meta/--meta-plugin flags, streaming diff - Add --update mode to modify tags and metadata for existing items by ID - Add --meta key=value flag to set metadata during save/update - Add --meta key (bare) to delete metadata keys or filter by existence - Add --meta-plugin/-M name:{json} flag for plugin options via CLI - Env meta plugin now uses options from --meta-plugin instead of only env vars - Stream decompressed content to diff via /dev/fd pipes (no temp files) - Wire --list-format CLI arg to settings (was parsed but ignored) - Allow --info to accept tags (was restricted to numeric IDs only) - Change DB meta filtering to HashMap<String, Option<String>> for exact match + key existence - Fix fcntl error checking in diff pre_exec - Fix README inaccuracies (delete by tag, nonexistent --digest flag, meta plugin key names) --- README.md | 37 +++--- src/args.rs | 109 +++++++++++++-- src/config.rs | 120 +++++++++++++++-- src/db.rs | 11 +- src/main.rs | 25 ++-- src/meta_plugin/env.rs | 38 ++++-- src/modes/diff.rs | 204 +++++++++++++++++------------ src/modes/get.rs | 7 +- src/modes/info.rs | 8 +- src/modes/list.rs | 7 +- src/modes/mod.rs | 4 + src/modes/update.rs | 171 ++++++++++++++++++++++++ src/services/async_data_service.rs | 10 +- src/services/async_item_service.rs | 4 +- src/services/data_service.rs | 6 +- src/services/item_service.rs | 11 +- src/services/sync_data_service.rs | 10 +- 17 files changed, 604 insertions(+), 178 deletions(-) create mode 100644 src/modes/update.rs diff --git a/README.md b/README.md index 06e8df2..a72b019 100644 --- a/README.md +++ b/README.md @@ -149,8 +149,8 @@ keep --list # Get item details keep --info greeting -# Delete by tag -keep --delete greeting +# Delete by ID +keep --delete 1 ``` ### Real-World Examples @@ -192,8 +192,8 @@ echo "data" | keep my-tag # Save with multiple tags and 
metadata cat report.pdf | keep --save report --meta project=alpha --meta env=prod -# Specify compression and digest algorithm -echo "data" | keep --save my-tag --compression gzip --digest sha256 +# Specify compression +echo "data" | keep --save my-tag --compression gzip ``` Tags and metadata make items easy to find later. Tags are simple identifiers; metadata is key-value pairs. @@ -364,7 +364,7 @@ Metadata is automatically extracted when saving items. | `env` | `*` | Capture `KEEP_META_*` environment variables | | `magic_file` | `file_type` | File type detection (requires `magic` feature) | | `text` | `text_line_count`, `text_word_count` | Line and word counts | -| `user` | `uid`, `user`, `gid`, `group` | Current user info | +| `user` | `user_uid`, `user_name`, `user_gid`, `user_group` | Current user info | | `shell` | `shell` | Current shell path | | `shell_pid` | `shell_pid` | Shell process ID | | `keep_pid` | `keep_pid` | Keep process ID | @@ -376,8 +376,11 @@ Metadata is automatically extracted when saving items. 
| `cwd` | `cwd` | Current working directory | ```sh -# Use specific plugins -echo "data" | keep --save tag --meta-plugins "digest,text,user" +# Use specific plugins (repeatable) +echo "data" | keep --save tag --meta-plugin digest --meta-plugin text --meta-plugin user + +# Pass options to a plugin via JSON +echo "data" | keep --save tag --meta-plugin 'tokens:{"options":{"min_length":"2"}}' # Capture custom metadata via environment KEEP_META_project=alpha echo "data" | keep --save tag @@ -395,7 +398,7 @@ KEEP_META_build=1234 echo "data" | keep --save tag --meta env=staging | `KEEP_DIR` | Storage directory | `~/.keep` | | `KEEP_CONFIG` | Config file path | `~/.config/keep/config.yml` | | `KEEP_COMPRESSION` | Compression algorithm | `lz4` | -| `KEEP_META_PLUGINS` | Meta plugins to use | `env` | +| `KEEP_META_PLUGINS` | Meta plugins to use (JSON format: `name[:{json}]`, comma-separated) | `env` | | `KEEP_FILTERS` | Default filter chain | none | | `KEEP_LIST_FORMAT` | List column format | built-in defaults | | `KEEP_SERVER_ADDRESS` | Server bind address | `127.0.0.1` | @@ -758,16 +761,16 @@ This means client behavior is consistent with local mode — the same compressio Client save uses a 3-thread streaming pipeline for constant memory usage regardless of data size: ``` -┌──────────────┐ OS pipe ┌────────────────┐ +┌───────────────┐ OS pipe ┌────────────────┐ │ Reader thread ├──────────────────┤ Streamer thread│ -│ │ (compressed │ │ -│ stdin → tee │ bytes) │ pipe → POST │ -│ → hash │ │ (chunked) │ -│ → compress│ │ │ -└──────────────┘ └────────────────┘ - │ │ - ▼ ▼ - stdout + Server stores blob +│ │ (compressed │ │ +│ stdin → tee │ bytes) │ pipe → POST │ +│ → hash │ │ (chunked) │ +│ → compress │ │ │ +└───────────────┘ └────────────────┘ + │ │ + ▼ ▼ + stdout + Server stores blob SHA-256 digest ``` diff --git a/src/args.rs b/src/args.rs index 7f9f58c..e7782d5 100644 --- a/src/args.rs +++ b/src/args.rs @@ -24,52 +24,56 @@ pub struct Args { /// Struct for mode-specific 
arguments, defining CLI flags for different operations. #[derive(Parser, Debug, Clone)] pub struct ModeArgs { - #[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["get", "diff", "list", "delete", "info", "status"]))] + #[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["get", "diff", "list", "delete", "info", "update", "status"]))] #[arg(help("Save an item using any tags or metadata provided"))] pub save: bool, - #[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["save", "diff", "list", "delete", "info", "status"]))] + #[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["save", "diff", "list", "delete", "info", "update", "status"]))] #[arg(help( "Get an item either by it's ID or by a combination of matching tags and metatdata" ))] pub get: bool, - #[arg(group("mode"), help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "list", "delete", "info", "status"]))] + #[arg(group("mode"), help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "list", "delete", "info", "update", "status"]))] #[arg(help("Show a diff between two items by ID"))] pub diff: bool, - #[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["save", "get", "diff", "delete", "info", "status"]))] + #[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["save", "get", "diff", "delete", "info", "update", "status"]))] #[arg(help("List items, filtering on tags or metadata if given"))] pub list: bool, - #[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["save", "get", "diff", "list", "info", "status"]))] + #[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["save", "get", "diff", "list", "info", "update", "status"]))] #[arg(help("Delete items either by ID or by matching tags"))] #[arg(requires = "ids_or_tags")] pub delete: 
bool, - #[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["save", "get", "diff", "list", "delete", "status"]))] + #[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["save", "get", "diff", "list", "delete", "update", "status"]))] #[arg(help( "Get an item either by it's ID or by a combination of matching tags and metatdata" ))] pub info: bool, - #[arg(group("mode"), help_heading("Mode Options"), short('S'), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "server", "status_plugins"]))] + #[arg(group("mode"), help_heading("Mode Options"), short('u'), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "status"]))] + #[arg(help("Update an item's tags and metadata by ID"))] + pub update: bool, + + #[arg(group("mode"), help_heading("Mode Options"), short('S'), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "server", "status_plugins"]))] #[arg(help("Show status of directories and supported compression algorithms"))] pub status: bool, - #[arg(group("mode"), help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "status", "server"]))] + #[arg(group("mode"), help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "status", "server"]))] #[arg(help("Show available plugins and their configurations"))] pub status_plugins: bool, - #[arg(group("mode"), help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "status"]))] + #[arg(group("mode"), help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "status"]))] #[arg(help("Start REST HTTP server"))] pub server: bool, - #[arg(group("mode"), help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "status", "server"]))] + 
#[arg(group("mode"), help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "status", "server"]))] #[arg(help("Generate default configuration and output to stdout"))] pub generate_config: bool, - #[arg(help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "status", "server", "generate_config"]))] + #[arg(help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "status", "server", "generate_config"]))] #[arg(help("Generate shell completion script (bash, zsh, fish, elvish, powershell)"))] pub generate_completion: Option, @@ -92,6 +96,78 @@ pub struct ModeArgs { pub server_key: Option, } +/// Represents a meta plugin argument with optional JSON config. +/// +/// Parsed from `name` or `name:{"options":{...},"outputs":{...}}` syntax. +#[derive(Debug, Clone)] +pub struct MetaPluginArg { + pub name: String, + pub options: Option, +} + +impl FromStr for MetaPluginArg { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + if let Some((name, json_str)) = s.split_once(':') { + let value: serde_json::Value = serde_json::from_str(json_str) + .map_err(|e| anyhow::anyhow!("Invalid JSON for meta plugin '{}': {}", name, e))?; + Ok(MetaPluginArg { + name: name.to_string(), + options: Some(value), + }) + } else { + Ok(MetaPluginArg { + name: s.to_string(), + options: None, + }) + } + } +} + +/// Represents a metadata key-value argument. +/// +/// Parsed from `key=value` (set) or `key` (delete/filter by existence). +#[derive(Debug, Clone)] +pub enum MetaArg { + /// Set metadata with a value. + Set { key: String, value: String }, + /// Bare key without a value (delete in update mode, filter by existence otherwise). + Key(String), +} + +impl MetaArg { + /// Returns the key. + pub fn key(&self) -> &str { + match self { + MetaArg::Set { key, .. 
} | MetaArg::Key(key) => key, + } + } + + /// Returns the value if this is a Set variant. + pub fn value(&self) -> Option<&str> { + match self { + MetaArg::Set { value, .. } => Some(value), + MetaArg::Key(_) => None, + } + } +} + +impl FromStr for MetaArg { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + if let Some((key, value)) = s.split_once('=') { + Ok(MetaArg::Set { + key: key.to_string(), + value: value.to_string(), + }) + } else { + Ok(MetaArg::Key(s.to_string())) + } + } +} + /// Struct for item-specific arguments, such as compression and plugins. #[derive(Parser, Debug, Clone)] pub struct ItemArgs { @@ -102,11 +178,16 @@ pub struct ItemArgs { #[arg( help_heading("Item Options"), short('M'), - long, + long = "meta-plugin", + value_parser = clap::value_parser!(MetaPluginArg), env("KEEP_META_PLUGINS") )] - #[arg(help("Meta plugins to use when saving items"))] - pub meta_plugins: Vec, + #[arg(help("Meta plugin to use (repeatable): name or name:{json}"))] + pub meta_plugins: Vec, + + #[arg(help_heading("Item Options"), long)] + #[arg(help("Metadata key=value to set (or key to delete in --update)"))] + pub meta: Vec, #[arg(help_heading("Item Options"), long, env("KEEP_FILTERS"))] #[arg(help("Filter string to apply to content when getting items"))] diff --git a/src/config.rs b/src/config.rs index db136c0..380022e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -209,6 +209,9 @@ pub struct Settings { pub client_password: Option, #[serde(skip)] pub client_jwt: Option, + // Metadata key-value pairs from --meta CLI flag + #[serde(skip)] + pub meta: Vec<(String, Option)>, } impl Settings { @@ -330,19 +333,8 @@ impl Settings { config_builder.set_override("compression_plugin.name", compression.as_str())?; } - if !args.item.meta_plugins.is_empty() { - let meta_plugins: Vec> = args - .item - .meta_plugins - .iter() - .map(|name| { - let mut map = std::collections::HashMap::new(); - map.insert("name".to_string(), name.clone()); - map - }) - .collect(); 
- config_builder = config_builder.set_override("meta_plugins", meta_plugins)?; - } + // Build MetaPluginConfig entries from --meta-plugin args (name[:json]) + // These are handled after config deserialization (see below). let config = config_builder.build()?; debug!("CONFIG: Built config, attempting to deserialize"); @@ -438,6 +430,57 @@ impl Settings { }]); } + // Override meta_plugins from --meta-plugin CLI args + if !args.item.meta_plugins.is_empty() { + debug!("CONFIG: Overriding meta_plugins from --meta-plugin CLI args"); + let cli_plugins: Vec = args + .item + .meta_plugins + .iter() + .map(|arg| { + let mut options = std::collections::HashMap::new(); + let mut outputs = std::collections::HashMap::new(); + if let Some(serde_json::Value::Object(obj)) = &arg.options { + // Extract options and outputs from JSON value + if let Some(serde_json::Value::Object(opts_obj)) = + obj.get("options") + { + for (k, v) in opts_obj { + let yaml_str = serde_json::to_string(v).unwrap_or_default(); + let yaml_val: serde_yaml::Value = + serde_yaml::from_str(&yaml_str) + .unwrap_or(serde_yaml::Value::Null); + options.insert(k.clone(), yaml_val); + } + } + if let Some(serde_json::Value::Object(outs_obj)) = + obj.get("outputs") + { + for (k, v) in outs_obj { + let val_str = match v { + serde_json::Value::String(s) => s.clone(), + _ => v.to_string(), + }; + outputs.insert(k.clone(), val_str); + } + } + } + MetaPluginConfig { + name: arg.name.clone(), + options, + outputs, + } + }) + .collect(); + settings.meta_plugins = Some(cli_plugins); + } + + // Override list_format from --list-format CLI arg + if args.options.list_format != "id,time,size,tags,meta:hostname" { + debug!("CONFIG: Overriding list_format from --list-format CLI arg"); + settings.list_format = Settings::parse_list_format(&args.options.list_format); + } + // Set dir to default if not provided or is empty if settings.dir == PathBuf::new() { debug!("CONFIG: Setting default dir: {default_dir:?}"); @@ -469,6 +512,20 @@ impl 
Settings { .or_else(|| settings.client.as_ref().and_then(|c| c.jwt.clone())); } + // Parse --meta key=value and bare key arguments + settings.meta = args + .item + .meta + .iter() + .map(|s| { + if let Some((key, value)) = s.split_once('=') { + (key.to_string(), Some(value.to_string())) + } else { + (s.to_string(), None) + } + }) + .collect(); + debug!("CONFIG: Final settings: {settings:?}"); Ok(settings) } @@ -642,4 +699,41 @@ impl Settings { warnings } + + /// Parse a comma-separated column list string into `Vec<ColumnConfig>`. + /// + /// Maps known column names to their default labels and alignment. + /// Unknown `meta:*` columns use the part after `meta:` as the label; any other unknown name is its own label. + fn parse_list_format(input: &str) -> Vec { + input + .split(',') + .map(|s| s.trim()) + .filter(|s| !s.is_empty()) + .map(|name| { + let (label, align) = match name { + "id" => ("Item", ColumnAlignment::Right), + "time" => ("Time", ColumnAlignment::Right), + "size" => ("Size", ColumnAlignment::Right), + "meta:text_line_count" => ("Lines", ColumnAlignment::Right), + "meta:token_count" => ("Tokens", ColumnAlignment::Right), + "tags" => ("Tags", ColumnAlignment::Left), + "meta:hostname_short" => ("Host", ColumnAlignment::Left), + "meta:hostname" => ("Host", ColumnAlignment::Left), + "meta:command" => ("Command", ColumnAlignment::Left), + "compression" => ("Compression", ColumnAlignment::Left), + other if other.starts_with("meta:") => { + let sub = other.strip_prefix("meta:").unwrap_or(other); + (sub, ColumnAlignment::Left) + } + other => (other, ColumnAlignment::Left), + }; + ColumnConfig { + name: name.to_string(), + label: label.to_string(), + align, + ..Default::default() + } + }) + .collect() + } } diff --git a/src/db.rs b/src/db.rs index 633f152..0708a00 100644 --- a/src/db.rs +++ b/src/db.rs @@ -912,7 +912,7 @@ pub fn get_items(conn: &Connection) -> Result> { /// let db_path = PathBuf::from("keep.db"); /// let conn = db::open(db_path)?; /// let tags = vec!["project".to_string()]; -/// let meta 
= HashMap::from([("status".to_string(), "active".to_string())]); +/// let meta = HashMap::from([("status".to_string(), Some("active".to_string()))]); /// let matching = db::get_items_matching(&conn, &tags, &meta)?; /// # Ok(()) /// # } @@ -920,7 +920,7 @@ pub fn get_items(conn: &Connection) -> Result> { pub fn get_items_matching( conn: &Connection, tags: &Vec, - meta: &HashMap, + meta: &HashMap>, ) -> Result> { debug!("DB: Getting items matching: tags={tags:?} meta={meta:?}"); @@ -947,7 +947,10 @@ pub fn get_items_matching( Some(m) => m, None => return false, }; - meta.iter().all(|(k, v)| item_meta.get(k) == Some(v)) + meta.iter().all(|(k, v)| match v { + Some(val) => item_meta.get(k) == Some(val), + None => item_meta.contains_key(k), + }) }) .collect(); Ok(filtered_items) @@ -990,7 +993,7 @@ pub fn get_items_matching( pub fn get_item_matching( conn: &Connection, tags: &Vec, - meta: &HashMap, + meta: &HashMap>, ) -> Result> { debug!("DB: Get item matching tags: {tags:?}, meta: {meta:?}"); let items = get_items_matching(conn, tags, meta)?; diff --git a/src/main.rs b/src/main.rs index a17467f..929ddfb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -91,19 +91,12 @@ fn main() -> Result<(), Error> { } NumberOrString::Str(str) => { // For --info and --get, try to parse strings as numbers to treat them as IDs - if args.mode.info || args.mode.get { - if let Ok(num) = str.parse::() { - debug!("MAIN: Adding parsed string to ids: {num}"); - ids.push(num); - continue; - } else if args.mode.info { - // --info only accepts numeric IDs - cmd.error( - ErrorKind::InvalidValue, - format!("--info requires numeric IDs, found: '{str}'"), - ) - .exit(); - } + if (args.mode.info || args.mode.get) + && let Ok(num) = str.parse::() + { + debug!("MAIN: Adding parsed string to ids: {num}"); + ids.push(num); + continue; } // If not a number, or not using --info/--get, treat as tag debug!("MAIN: Adding to tags: {str}"); @@ -124,6 +117,7 @@ fn main() -> Result<(), Error> { List, Delete, Info, + 
Update, Status, StatusPlugins, Server, @@ -144,6 +138,8 @@ fn main() -> Result<(), Error> { mode = KeepModes::Delete; } else if args.mode.info { mode = KeepModes::Info; + } else if args.mode.update { + mode = KeepModes::Update; } else if args.mode.status { mode = KeepModes::Status; } else if args.mode.status_plugins { @@ -307,6 +303,9 @@ fn main() -> Result<(), Error> { KeepModes::Info => { modes::info::mode_info(&mut cmd, &settings, ids, tags, &mut conn, data_path) } + KeepModes::Update => { + modes::update::mode_update(&mut cmd, &settings, ids, tags, &mut conn, data_path) + } KeepModes::Status => modes::status::mode_status(&mut cmd, &settings, data_path, db_path), KeepModes::StatusPlugins => { modes::status_plugins::mode_status_plugins(&mut cmd, &settings, data_path, db_path) diff --git a/src/meta_plugin/env.rs b/src/meta_plugin/env.rs index a07afaf..55b36fc 100644 --- a/src/meta_plugin/env.rs +++ b/src/meta_plugin/env.rs @@ -22,22 +22,38 @@ impl EnvMetaPlugin { /// /// A new instance of `EnvMetaPlugin`. pub fn new( - _options: Option>, + options: Option>, outputs: Option>, ) -> Self { - // Collect environment variables starting with KEEP_META_ let mut env_vars = Vec::new(); let mut outputs_map = std::collections::HashMap::new(); - for (key, value) in std::env::vars() { - if let Some(stripped_key) = key.strip_prefix("KEEP_META_") { - // Add to env_vars to process later - env_vars.push((stripped_key.to_string(), value)); - // Add to outputs with default mapping to the stripped name - outputs_map.insert( - stripped_key.to_string(), - serde_yaml::Value::String(stripped_key.to_string()), - ); + // Use options from --meta-plugin JSON if provided and non-empty, + // otherwise fall back to KEEP_META_* environment variables. 
+ let use_options = options.as_ref().map(|o| !o.is_empty()).unwrap_or(false); + + if use_options { + let opts = options.as_ref().unwrap(); + for (key, value) in opts { + let value_str = match value { + serde_yaml::Value::String(s) => s.clone(), + serde_yaml::Value::Number(n) => n.to_string(), + serde_yaml::Value::Bool(b) => b.to_string(), + _ => serde_yaml::to_string(value).unwrap_or_default(), + }; + env_vars.push((key.clone(), value_str)); + outputs_map.insert(key.clone(), serde_yaml::Value::String(key.clone())); + } + } else { + // Fall back to KEEP_META_* environment variables + for (key, value) in std::env::vars() { + if let Some(stripped_key) = key.strip_prefix("KEEP_META_") { + env_vars.push((stripped_key.to_string(), value)); + outputs_map.insert( + stripped_key.to_string(), + serde_yaml::Value::String(stripped_key.to_string()), + ); + } } } diff --git a/src/modes/diff.rs b/src/modes/diff.rs index a0d3c1e..c7d7e47 100644 --- a/src/modes/diff.rs +++ b/src/modes/diff.rs @@ -1,12 +1,18 @@ -use crate::config; -use crate::services::item_service::ItemService; /// Diff mode implementation. /// /// This module provides functionality for comparing two items and displaying their -/// differences using external diff tools. +/// differences using external diff tools. Decompressed content is streamed to diff +/// via pipes and /dev/fd file descriptors — no temporary files are created. 
+use crate::common::PIPESIZE; +use crate::config; +use crate::services::compression_service::CompressionService; +use crate::services::item_service::ItemService; use anyhow::{Context, Result}; use clap::Command; use log::debug; +use std::io::Read; +use std::os::unix::io::{FromRawFd, RawFd}; +use std::os::unix::process::CommandExt; fn validate_diff_args(_cmd: &mut Command, ids: &[i64], tags: &[String]) -> anyhow::Result<()> { if !tags.is_empty() { @@ -23,19 +29,6 @@ fn validate_diff_args(_cmd: &mut Command, ids: &[i64], tags: &[String]) -> anyho } /// Fetches and validates items from the database for diff operation. -/// -/// This function retrieves two items by their IDs from the database using the -/// item service, which handles validation, and returns them as a tuple. -/// -/// # Arguments -/// -/// * `conn` - Mutable reference to the database connection. -/// * `ids` - Vector of item IDs to fetch. -/// * `item_service` - Reference to the item service for validation. -/// -/// # Returns -/// -/// * `Result<(ItemWithMeta, ItemWithMeta)>` - Tuple of items with metadata or error. fn fetch_and_validate_items( conn: &mut rusqlite::Connection, ids: &[i64], @@ -44,7 +37,6 @@ fn fetch_and_validate_items( crate::services::types::ItemWithMeta, crate::services::types::ItemWithMeta, )> { - // Fetch items using the service, which handles validation let item_a = item_service .get_item(conn, ids[0]) .with_context(|| format!("Unable to find first item (ID: {}) in database", ids[0]))?; @@ -52,48 +44,12 @@ fn fetch_and_validate_items( .get_item(conn, ids[1]) .with_context(|| format!("Unable to find second item (ID: {}) in database", ids[1]))?; - debug!("MAIN: Found item A {:?}", item_a.item); - debug!("MAIN: Found item B {:?}", item_b.item); + debug!("DIFF: Found item A {:?}", item_a.item); + debug!("DIFF: Found item B {:?}", item_b.item); Ok((item_a, item_b)) } -/// Sets up file paths and compression for diff operation. 
-/// -/// This function constructs the file paths for the two items and prepares the -/// compression engines needed for reading their contents. -/// -/// # Arguments -/// -/// * `item_service` - Reference to the item service. -/// * `item_a` - First item with metadata. -/// * `item_b` - Second item with metadata. -/// -/// # Returns -/// -/// * `Result<(PathBuf, PathBuf)>` - Tuple of item file paths or error. -fn setup_diff_paths_and_compression( - item_service: &ItemService, - item_a: &crate::services::types::ItemWithMeta, - item_b: &crate::services::types::ItemWithMeta, -) -> Result<(std::path::PathBuf, std::path::PathBuf)> { - let item_a_id = item_a - .item - .id - .ok_or_else(|| anyhow::anyhow!("Item A missing ID"))?; - let item_b_id = item_b - .item - .id - .ok_or_else(|| anyhow::anyhow!("Item B missing ID"))?; - - // Use the service's data path to construct proper file paths - let data_path = item_service.get_data_path(); - let item_a_path = data_path.join(item_a_id.to_string()); - let item_b_path = data_path.join(item_b_id.to_string()); - - Ok((item_a_path, item_b_path)) -} - pub fn mode_diff( cmd: &mut Command, args: &crate::args::Args, @@ -125,51 +81,129 @@ pub fn mode_diff( validate_diff_args(cmd, &ids, &tags)?; - let settings = crate::config::Settings::new(args, crate::config::Settings::default_dir()?)?; - - let item_service = crate::services::item_service::ItemService::new(settings.dir.clone()); - + let settings = config::Settings::new(args, config::Settings::default_dir()?)?; + let item_service = ItemService::new(settings.dir.clone()); let (item_a, item_b) = fetch_and_validate_items(conn, &ids, &item_service)?; - let (path_a, path_b) = setup_diff_paths_and_compression(&item_service, &item_a, &item_b)?; - - run_external_diff(&path_a, &path_b)?; - - Ok(()) + run_external_diff(&item_service, &item_a, &item_b) } -/// Runs external diff command to compare two files. +/// Creates a pipe via libc, returns (read_fd, write_fd). 
+#[allow(unsafe_code)] +fn create_pipe() -> Result<(RawFd, RawFd)> { + let mut fds = [0i32; 2]; + // pipe2 with O_CLOEXEC is atomic — no race between pipe() and fcntl() + let ret = unsafe { libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC) }; + if ret != 0 { + return Err(anyhow::anyhow!( + "Failed to create pipe: {}", + std::io::Error::last_os_error() + )); + } + Ok((fds[0], fds[1])) +} + +/// Streams decompressed item content through a pipe fd. /// -/// Uses the system's `diff` command to generate a unified diff output. -/// Returns an error if the diff command is not found. +/// Returns a JoinHandle for the writer thread. The thread writes decompressed +/// data to write_fd and closes it when done (causing EOF for the reader). +#[allow(unsafe_code)] +fn spawn_writer_thread( + item_service: &ItemService, + item: &crate::services::types::ItemWithMeta, + write_fd: RawFd, +) -> std::thread::JoinHandle> { + let data_path = item_service.get_data_path().clone(); + let item_id = item.item.id.expect("item must have ID"); + let compression = item.item.compression.clone(); + let mut item_path = data_path; + item_path.push(item_id.to_string()); + + std::thread::spawn(move || -> Result<()> { + let compression_service = CompressionService::new(); + let mut reader = compression_service + .stream_item_content(item_path, &compression) + .map_err(|e| anyhow::anyhow!("Failed to stream item {item_id}: {e}"))?; + + // Wrap write_fd in a File so it's closed when this scope ends + let mut writer = unsafe { std::fs::File::from_raw_fd(write_fd) }; + let mut buf = [0u8; PIPESIZE]; + loop { + match reader.read(&mut buf) { + Ok(0) => break, + Ok(n) => { + use std::io::Write; + writer.write_all(&buf[..n])?; + } + Err(e) => return Err(anyhow::anyhow!("Error reading item {item_id}: {e}")), + } + } + // writer dropped here, closing write_fd → diff sees EOF + Ok(()) + }) +} + +/// Runs external diff command, streaming decompressed content via /dev/fd pipes. 
/// -/// # Arguments -/// -/// * `path_a` - Path to the first file. -/// * `path_b` - Path to the second file. -/// -/// # Returns -/// -/// * `Result<()>` - Success or error. -fn run_external_diff(path_a: &std::path::Path, path_b: &std::path::Path) -> anyhow::Result<()> { +/// Creates two pipes, spawns writer threads to decompress each item into its pipe, +/// and runs `diff -u /dev/fd/N /dev/fd/M` where N and M are the pipe read fds. +/// The read ends have CLOEXEC cleared in pre_exec so diff can open them. +#[allow(unsafe_code)] +fn run_external_diff( + item_service: &ItemService, + item_a: &crate::services::types::ItemWithMeta, + item_b: &crate::services::types::ItemWithMeta, +) -> Result<()> { if which::which_global("diff").is_err() { return Err(anyhow::anyhow!( "diff command not found. Please install diffutils." )); } - let mut child = std::process::Command::new("diff") - .arg("-u") - .arg(path_a) - .arg(path_b) - .stdout(std::process::Stdio::inherit()) - .stderr(std::process::Stdio::inherit()) - .spawn() - .context("Failed to spawn diff command")?; + let (read_fd_a, write_fd_a) = create_pipe()?; + let (read_fd_b, write_fd_b) = create_pipe()?; + + debug!("DIFF: pipe fds: a(r={read_fd_a}, w={write_fd_a}) b(r={read_fd_b}, w={write_fd_b})"); + + // Spawn writer threads before diff — they decompress and write to pipe write ends. + // Threads take ownership of write_fd via from_raw_fd and close them on exit. + let writer_a = spawn_writer_thread(item_service, item_a, write_fd_a); + let writer_b = spawn_writer_thread(item_service, item_b, write_fd_b); + + // Spawn diff with /dev/fd/N paths. pre_exec clears CLOEXEC on the pipe read fds + // so they survive the close_fds step in _do_spawn and diff can open them. 
+ let mut child = unsafe { + std::process::Command::new("diff") + .arg("-u") + .arg(format!("/dev/fd/{read_fd_a}")) + .arg(format!("/dev/fd/{read_fd_b}")) + .stdout(std::process::Stdio::inherit()) + .stderr(std::process::Stdio::inherit()) + .stdin(std::process::Stdio::null()) + .pre_exec(move || { + // Clear CLOEXEC on pipe read fds so they survive exec + if libc::fcntl(read_fd_a, libc::F_SETFD, 0) == -1 + || libc::fcntl(read_fd_b, libc::F_SETFD, 0) == -1 + { + return Err(std::io::Error::last_os_error()); + } + Ok(()) + }) + .spawn() + .context("Failed to spawn diff command")? + }; let status = child.wait().context("Failed to wait for diff command")?; - // diff returns 0 if files are identical, 1 if different, 2 on error + // Join writer threads and propagate errors + writer_a + .join() + .map_err(|e| anyhow::anyhow!("Writer A panicked: {e:?}"))??; + writer_b + .join() + .map_err(|e| anyhow::anyhow!("Writer B panicked: {e:?}"))??; + + // diff returns 0 if identical, 1 if different, 2 on error if status.code() == Some(2) { Err(anyhow::anyhow!("diff command failed with an error")) } else { diff --git a/src/modes/get.rs b/src/modes/get.rs index 63fa48e..c2949dc 100644 --- a/src/modes/get.rs +++ b/src/modes/get.rs @@ -51,8 +51,13 @@ pub fn mode_get( // If both are empty, find_item will find the last item let item_service = ItemService::new(data_path.clone()); + let meta_filter: std::collections::HashMap> = settings + .meta + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); let item_with_meta = item_service - .find_item(conn, ids, tags, &std::collections::HashMap::new()) + .find_item(conn, ids, tags, &meta_filter) .map_err(|e| anyhow!("Unable to find matching item in database: {}", e))?; let item_id = item_with_meta.item.id.context("Item missing ID")?; diff --git a/src/modes/info.rs b/src/modes/info.rs index 5567074..cd4823e 100644 --- a/src/modes/info.rs +++ b/src/modes/info.rs @@ -65,9 +65,13 @@ pub fn mode_info( // If both are empty, find_item will find 
the last item let item_service = ItemService::new(data_path.clone()); - // Use empty metadata HashMap + let meta_filter: std::collections::HashMap> = settings + .meta + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); let item_with_meta = item_service - .find_item(conn, ids, tags, &std::collections::HashMap::new()) + .find_item(conn, ids, tags, &meta_filter) .map_err(|e| anyhow!("Unable to find matching item in database: {}", e))?; show_item(item_with_meta, settings, data_path) diff --git a/src/modes/list.rs b/src/modes/list.rs index 48fa277..15b0e33 100644 --- a/src/modes/list.rs +++ b/src/modes/list.rs @@ -179,7 +179,12 @@ pub fn mode_list( } let item_service = ItemService::new(data_path.clone()); - let items_with_meta = item_service.list_items(conn, tags, &std::collections::HashMap::new())?; + let meta_filter: std::collections::HashMap> = settings + .meta + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + let items_with_meta = item_service.list_items(conn, tags, &meta_filter)?; let output_format = crate::modes::common::settings_output_format(settings); diff --git a/src/modes/mod.rs b/src/modes/mod.rs index 57c26a3..96f6bbe 100644 --- a/src/modes/mod.rs +++ b/src/modes/mod.rs @@ -16,6 +16,7 @@ pub mod list; pub mod save; pub mod status; pub mod status_plugins; +pub mod update; /// Column types, output formats, and formatting utilities shared across modes. pub use common::{ColumnType, OutputFormat, format_size, settings_output_format}; @@ -50,3 +51,6 @@ pub use status::mode_status; /// Lists available plugins and their configurations. pub use status_plugins::mode_status_plugins; + +/// Updates an item's tags and metadata by ID. 
+pub use update::mode_update; diff --git a/src/modes/update.rs b/src/modes/update.rs new file mode 100644 index 0000000..0718a0f --- /dev/null +++ b/src/modes/update.rs @@ -0,0 +1,171 @@ +use anyhow::{Context, Result}; +use std::io::Read; +use std::path::{Path, PathBuf}; + +use crate::common::PIPESIZE; +use crate::config; +use crate::db; +use crate::services::compression_service::CompressionService; +use clap::Command; +use log::debug; +use rusqlite::Connection; + +/// Handles the update mode: modifies tags and metadata for an existing item by ID. +/// +/// This function processes a single item ID, updating its metadata based on `--meta` +/// arguments and optionally replacing its tags with positional arguments. +/// If the item's size is not set, it backfills it by streaming through the content file. +/// +/// # Arguments +/// +/// * `cmd` - Clap command for error handling. +/// * `settings` - Global settings containing metadata and meta plugin config. +/// * `ids` - List containing exactly one item ID. +/// * `conn` - Database connection. +/// * `data_path` - Path to data directory. +/// +/// # Returns +/// +/// `Result<()>` on success, or an error if the update fails. 
+pub fn mode_update( + cmd: &mut Command, + settings: &config::Settings, + ids: &mut [i64], + tags: &mut Vec, + conn: &mut Connection, + data_path: PathBuf, +) -> Result<()> { + if ids.len() != 1 { + cmd.error( + clap::error::ErrorKind::InvalidValue, + "--update requires exactly one numeric ID", + ) + .exit(); + } + + let item_id = ids[0]; + + // Look up the item + let item = + db::get_item(conn, item_id)?.ok_or_else(|| anyhow::anyhow!("Item {item_id} not found"))?; + + debug!("UPDATE: Found item {item_id}: {item:?}"); + + // Parse --meta arguments into set and delete lists + let mut set_meta: Vec<(String, String)> = Vec::new(); + let mut delete_keys: Vec = Vec::new(); + + for (key, value) in &settings.meta { + match value { + Some(v) => set_meta.push((key.clone(), v.clone())), + None => delete_keys.push(key.clone()), + } + } + + // Apply metadata changes + for (key, value) in &set_meta { + debug!("UPDATE: Setting meta {key}={value}"); + db::store_meta( + conn, + db::Meta { + id: item_id, + name: key.clone(), + value: value.clone(), + }, + )?; + } + + for key in &delete_keys { + debug!("UPDATE: Deleting meta {key}"); + db::query_delete_meta( + conn, + db::Meta { + id: item_id, + name: key.clone(), + value: String::new(), + }, + )?; + } + + // Replace tags if provided + if !tags.is_empty() { + debug!("UPDATE: Replacing tags with {:?}", tags); + db::set_item_tags(conn, item.clone(), tags)?; + } + + // Backfill size if not set + let mut updated_item = item.clone(); + if item.size.is_none() { + debug!("UPDATE: Size not set, backfilling from content file"); + if let Some(size) = compute_item_size(&data_path, &item) { + debug!("UPDATE: Computed size: {size}"); + updated_item.size = Some(size); + db::update_item(conn, updated_item.clone())?; + } + } + + // Print confirmation + if !settings.quiet { + let mut parts = Vec::new(); + if !set_meta.is_empty() { + parts.push(format!("set {} metadata", set_meta.len())); + } + if !delete_keys.is_empty() { + 
parts.push(format!("deleted {} metadata", delete_keys.len())); + } + if !tags.is_empty() { + parts.push(format!("tags: {}", tags.join(" "))); + } + let action = if parts.is_empty() { + "no changes".to_string() + } else { + parts.join(", ") + }; + + eprintln!("KEEP: Updated item {item_id} ({action})"); + } + + Ok(()) +} + +/// Computes the decompressed size of an item by streaming through its content file. +/// +/// Reads the compressed file in PIPESIZE chunks and counts total decompressed bytes. +/// Returns None if the file doesn't exist or decompression fails. +fn compute_item_size(data_path: &Path, item: &db::Item) -> Option { + let item_id = item.id?; + let mut item_path = data_path.to_path_buf(); + item_path.push(item_id.to_string()); + + if !item_path.exists() { + debug!("UPDATE: Content file not found: {item_path:?}"); + return None; + } + + let compression_service = CompressionService::new(); + let mut reader = match compression_service.stream_item_content(item_path, &item.compression) { + Ok(r) => r, + Err(e) => { + debug!("UPDATE: Failed to open content stream: {e}"); + return None; + } + }; + + let mut buffer = [0u8; PIPESIZE]; + let mut total_bytes: i64 = 0; + + loop { + match reader.read(&mut buffer) { + Ok(0) => break, + Ok(n) => { + total_bytes += n as i64; + } + Err(e) => { + debug!("UPDATE: Error reading content: {e}"); + return None; + } + } + } + + Some(total_bytes) +} diff --git a/src/services/async_data_service.rs b/src/services/async_data_service.rs index 5fdb8fd..5588637 100644 --- a/src/services/async_data_service.rs +++ b/src/services/async_data_service.rs @@ -65,7 +65,7 @@ impl AsyncDataService { pub async fn list_items( &self, tags: Vec, - meta: HashMap, + meta: HashMap>, ) -> Result, CoreError> { let mut conn = self.db.lock().await; self.list(&mut conn, tags, meta) @@ -75,7 +75,7 @@ impl AsyncDataService { &self, ids: Vec, tags: Vec, - meta: HashMap, + meta: HashMap>, ) -> Result { let mut conn = self.db.lock().await; 
DataService::find_item(self, &mut conn, ids, tags, meta) @@ -237,7 +237,7 @@ impl DataService for AsyncDataService { &self, conn: &mut Connection, tags: Vec, - meta: HashMap, + meta: HashMap>, ) -> Result, Self::Error> { self.sync_service.list(conn, tags, meta) } @@ -251,7 +251,7 @@ impl DataService for AsyncDataService { conn: &mut Connection, ids: Vec, tags: Vec, - meta: HashMap, + meta: HashMap>, ) -> Result { self.sync_service.find_item(conn, ids, tags, meta) } @@ -261,7 +261,7 @@ impl DataService for AsyncDataService { conn: &mut Connection, ids: &[i64], tags: &[String], - meta: &HashMap, + meta: &HashMap>, ) -> Result, Self::Error> { self.sync_service.get_items(conn, ids, tags, meta) } diff --git a/src/services/async_item_service.rs b/src/services/async_item_service.rs index 360f9bd..a2f085e 100644 --- a/src/services/async_item_service.rs +++ b/src/services/async_item_service.rs @@ -352,7 +352,7 @@ impl AsyncItemService { &self, ids: Vec, tags: Vec, - meta: HashMap, + meta: HashMap>, ) -> Result { let ids_clone = ids.clone(); let tags_clone = tags.clone(); @@ -366,7 +366,7 @@ impl AsyncItemService { pub async fn list_items( &self, tags: Vec, - meta: HashMap, + meta: HashMap>, ) -> Result, CoreError> { let tags_clone = tags.clone(); let meta_clone = meta.clone(); diff --git a/src/services/data_service.rs b/src/services/data_service.rs index 4e2ab62..b30c895 100644 --- a/src/services/data_service.rs +++ b/src/services/data_service.rs @@ -33,7 +33,7 @@ pub trait DataService { &self, conn: &mut Connection, tags: Vec, - meta: HashMap, + meta: HashMap>, ) -> Result, Self::Error>; fn delete(&self, conn: &mut Connection, id: i64) -> Result; @@ -43,7 +43,7 @@ pub trait DataService { conn: &mut Connection, ids: Vec, tags: Vec, - meta: HashMap, + meta: HashMap>, ) -> Result; fn get_items( @@ -51,7 +51,7 @@ pub trait DataService { conn: &mut Connection, ids: &[i64], tags: &[String], - meta: &HashMap, + meta: &HashMap>, ) -> Result, Self::Error>; fn generate_status( diff 
--git a/src/services/item_service.rs b/src/services/item_service.rs index ca5e111..4401f86 100644 --- a/src/services/item_service.rs +++ b/src/services/item_service.rs @@ -398,7 +398,7 @@ impl ItemService { conn: &Connection, ids: &[i64], tags: &[String], - meta: &HashMap, + meta: &HashMap>, ) -> Result { debug!("ITEM_SERVICE: Finding item with ids: {ids:?}, tags: {tags:?}, meta: {meta:?}"); let item_maybe = match (ids.is_empty(), tags.is_empty() && meta.is_empty()) { @@ -466,7 +466,7 @@ impl ItemService { &self, conn: &Connection, tags: &[String], - meta: &HashMap, + meta: &HashMap>, ) -> Result, CoreError> { debug!("ITEM_SERVICE: Listing items with tags: {tags:?}, meta: {meta:?}"); let items = db::get_items_matching(conn, &tags.to_vec(), meta)?; @@ -623,6 +623,13 @@ impl ItemService { for (k, v) in item_meta.iter() { db::add_meta(conn, item_id, k, v)?; } + // Store user-specified metadata from --meta CLI flags + for (key, value) in &settings.meta { + if let Some(v) = value { + debug!("ITEM_SERVICE: Setting user meta {key}={v}"); + db::add_meta(conn, item_id, key, v)?; + } + } } // Print the "KEEP: New item" message before starting to read input diff --git a/src/services/sync_data_service.rs b/src/services/sync_data_service.rs index f6a58ca..eeae442 100644 --- a/src/services/sync_data_service.rs +++ b/src/services/sync_data_service.rs @@ -208,7 +208,7 @@ impl SyncDataService { &self, conn: &mut Connection, tags: Vec, - meta: HashMap, + meta: HashMap>, ) -> Result, CoreError> { self.item_service.list_items(conn, &tags, &meta) } @@ -225,7 +225,7 @@ impl SyncDataService { conn: &mut Connection, ids: Vec, tags: Vec, - meta: HashMap, + meta: HashMap>, ) -> Result { self.item_service.find_item(conn, &ids, &tags, &meta) } @@ -279,7 +279,7 @@ impl DataService for SyncDataService { &self, conn: &mut Connection, tags: Vec, - meta: HashMap, + meta: HashMap>, ) -> Result, Self::Error> { self.list_items(conn, tags, meta) } @@ -293,7 +293,7 @@ impl DataService for 
SyncDataService { conn: &mut Connection, ids: Vec, tags: Vec, - meta: HashMap, + meta: HashMap>, ) -> Result { self.find_item(conn, ids, tags, meta) } @@ -303,7 +303,7 @@ impl DataService for SyncDataService { conn: &mut Connection, ids: &[i64], tags: &[String], - meta: &HashMap, + meta: &HashMap>, ) -> Result, Self::Error> { if ids.is_empty() { return self.list_items(conn, tags.to_vec(), meta.clone());