diff --git a/DESIGN.md b/DESIGN.md index 63571eb..610aca1 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -33,7 +33,7 @@ - `modes/status.rs` - Show system status and capabilities - `modes/server.rs` - REST HTTP/HTTPS server mode with OpenAPI documentation - `modes/client.rs` - Client mode for remote server (streaming save, local decompression) -- `modes/common.rs` - Shared utilities for all modes +- `modes/common.rs` - Shared utilities for all modes (OutputFormat, table creation, `print_serialized`, `build_path_table`, `ensure_default_tag`, `render_item_info_table`, `render_list_table_with_format`) ### Database Module - `db.rs` - SQLite database operations @@ -49,24 +49,31 @@ - `compression_engine/program.rs` - External program wrapper ### Meta Plugin Module -- `meta_plugin.rs` - Trait and type definitions +- `meta_plugin.rs` - Trait and type definitions, `SaveMetaFn` callback type - `meta_plugin/program.rs` - External program wrapper - `meta_plugin/digest.rs` - Internal digest implementations - `meta_plugin/system.rs` - System information metadata plugins +**SaveMetaFn Architecture**: Meta plugins are decoupled from direct DB access via a `SaveMetaFn` callback (`Arc>`). The callback is injected at `MetaService` construction and propagated to all plugins via `BaseMetaPlugin`. This enables: +- **Local mode**: Callback collects metadata into a `Vec`, written to DB after plugins finish +- **Client mode**: Callback collects into a `HashMap`, sent to server after streaming completes +- **Server mode**: Callback collects into a `Vec`, written to DB after plugins finish (same as local) + ### Common Modules - `common/is_binary.rs` - Binary file detection utilities - `common/status.rs` - Status information generation +- `common/mod.rs` - `PIPESIZE` constant (8192), `stream_copy()` streaming utility ### Client Module - `client.rs` - HTTP client wrapper (ureq-based, supports streaming POST) -- `modes/client/save.rs` - 3-thread streaming save (stdin → tee → compress → pipe → HTTP POST) +- `modes/client/save.rs` - 3-thread streaming save with local meta plugins (stdin → tee → compress → meta plugins → pipe → HTTP POST) - `modes/client/get.rs` - Get with server-side raw fetch + local decompression - `modes/client/list.rs` - List delegation to server - `modes/client/info.rs` - Info delegation to server - `modes/client/delete.rs` - Delete delegation to server - `modes/client/diff.rs` - Diff delegation to server - `modes/client/status.rs` - Status delegation to server +- `modes/client/update.rs` - Update delegation to server (sends plugin names/metadata/tags) ### Utility Modules - `plugins.rs` - Shared plugin utilities @@ -130,6 +137,7 @@ - `GET /api/item/` - Get a list of items as JSON. Optional params: `order=newest|oldest`, `start=0`, `count=100`, `tags=tag1,tag2` - `POST /api/item/` - Add a new item (body: raw content, **streamed** through fixed-size 8192-byte buffers). Query params: `tags`, `metadata` (JSON), `compress=true|false`, `meta=true|false` - `POST /api/item/<#>/meta` - Add metadata to an existing item (body: JSON object) +- `POST /api/item/<#>/update` - Re-run meta plugins on stored content. Query params: `plugins` (comma-separated), `metadata` (JSON), `tags` (comma-separated, idempotent) - `DELETE /api/item/<#>` - Delete an item - `GET /api/item/latest` - Return the latest item as JSON. Optional params: `tags=tag1,tag2`, `allow_binary=true|false` - `GET /api/item/latest/meta` - Return the latest item metadata as JSON. Optional params: `tags=tag1,tag2` @@ -148,8 +156,9 @@ - Conditional selection at startup: cert+key present → HTTPS, otherwise → HTTP ### Client/Server Protocol -- Smart clients (keep CLI) set `compress=false` and `meta=false` on POST, handling compression/metadata locally +- Smart clients (keep CLI) set `compress=false` and `meta=false` on POST, handling compression and meta plugins locally - Dumb clients (curl) use defaults (`compress=true`, `meta=true`), server handles everything +- Smart client update: sends `plugins` param to server, server runs plugins on stored content (avoids downloading compressed data) - GET responses include `X-Keep-Compression` header when `decompress=false` - Streaming save uses chunked transfer encoding for constant memory usage - **Universal streaming**: All server paths (POST, GET, diff) use `PIPESIZE` (8192) byte buffers diff --git a/README.md b/README.md index a72b019..7530752 100644 --- a/README.md +++ b/README.md @@ -256,7 +256,7 @@ keep --info --meta key=value ### Update Mode -Update an item's tags and metadata. +Update an item's tags, metadata, and re-run meta plugins. ```sh # Replace tags @@ -267,6 +267,9 @@ keep --update 1 --meta key=newvalue # Remove a metadata key keep --update 1 --meta key + +# Re-run meta plugins on stored content +keep --update 1 --meta-plugin digest --meta-plugin text ``` ### Delete Mode @@ -706,6 +709,14 @@ The server supports query parameters that control processing: | `meta` | `true` | `false` = client handles metadata, skip server-side plugins | | `decompress` | `true` | `false` = return raw compressed bytes on GET | +The `POST /api/item/{id}/update` endpoint accepts additional parameters: + +| Parameter | Default | Description | +|-----------|---------|-------------| +| `plugins` | none | Comma-separated plugin names to re-run on stored content | +| `metadata` | none | JSON-encoded metadata overrides to apply | +| `tags` | none | Comma-separated tags to add (idempotent) | + When using a smart client, these are set automatically. For curl, the server handles everything by default. #### Example: Curl as a Dumb Client @@ -750,9 +761,10 @@ export KEEP_CLIENT_JWT= Client mode uses **local plugins** and **remote storage**: -1. **Save**: Local compression and metadata plugins run on the client; compressed data streams to the server +1. **Save**: Local compression and meta plugins run on the client; compressed data streams to the server. Smart clients set `meta=false` so the server skips its own plugins. 2. **Get**: Server sends raw compressed data; client decompresses locally and applies filters -3. **Other operations** (list, info, delete, diff): Delegated directly to the server +3. **Update**: Meta plugins run on the server to avoid downloading compressed data for re-processing +4. **Other operations** (list, info, delete, diff): Delegated directly to the server This means client behavior is consistent with local mode — the same compression settings and filters apply. @@ -761,22 +773,23 @@ This means client behavior is consistent with local mode — the same compressio Client save uses a 3-thread streaming pipeline for constant memory usage regardless of data size: ``` -┌───────────────┐ OS pipe ┌────────────────┐ -│ Reader thread ├──────────────────┤ Streamer thread│ -│ │ (compressed │ │ -│ stdin → tee │ bytes) │ pipe → POST │ -│ → hash │ │ (chunked) │ -│ → compress │ │ │ -└───────────────┘ └────────────────┘ +┌───────────────────┐ OS pipe ┌────────────────┐ +│ Reader thread ├──────────────────┤ Streamer thread│ +│ │ (compressed │ │ +│ stdin → tee │ bytes) │ pipe → POST │ +│ → hash │ │ (chunked) │ +│ → compress │ │ │ +│ → meta plugins │ │ │ +└───────────────────┘ └────────────────┘ │ │ ▼ ▼ stdout + Server stores blob - SHA-256 digest + computed metadata ``` -- **Reader thread**: Reads stdin, tees output to stdout, computes SHA-256, compresses data, writes to OS pipe +- **Reader thread**: Reads stdin, tees output to stdout, computes SHA-256 via digest plugin, compresses data, runs meta plugins (hostname, text, etc.), writes to OS pipe - **Streamer thread**: Reads compressed bytes from pipe, streams to server via chunked HTTP POST -- **Main thread**: After streaming completes, sends computed metadata (digest, hostname, size) to server +- **Main thread**: After streaming completes, sends plugin-collected metadata to server Memory usage is O(PIPESIZE) — typically 8 KB — regardless of how much data is being stored. @@ -811,6 +824,7 @@ keep --client-url http://logserver:21080 --list --meta project=myapp | `GET` | `/api/item/{id}/meta` | Item metadata by ID | | `GET` | `/api/item/{id}/info` | Item info by ID | | `POST` | `/api/item/{id}/meta` | Add metadata to existing item (body: JSON object) | +| `POST` | `/api/item/{id}/update` | Re-run meta plugins on stored content (params: `plugins`, `metadata`, `tags`) | | `DELETE` | `/api/item/{id}` | Delete item by ID | | `GET` | `/api/diff` | Diff two items (`id_a`, `id_b` params) | diff --git a/src/common/mod.rs b/src/common/mod.rs index f591b83..2c3f5ca 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -8,3 +8,21 @@ pub mod schema; /// Standard buffer size for I/O operations (8KB) pub const PIPESIZE: usize = 8192; + +/// Reads chunks from `reader` until EOF, passing each chunk to `f`. +/// +/// Uses a fixed PIPESIZE buffer to ensure bounded memory usage. +pub fn stream_copy( + reader: &mut R, + mut f: impl FnMut(&[u8]) -> std::io::Result<()>, +) -> std::io::Result<()> { + let mut buffer = [0u8; PIPESIZE]; + loop { + let n = reader.read(&mut buffer)?; + if n == 0 { + break; + } + f(&buffer[..n])?; + } + Ok(()) +} diff --git a/src/db.rs b/src/db.rs index 57664ba..0d6d86a 100644 --- a/src/db.rs +++ b/src/db.rs @@ -337,6 +337,18 @@ pub fn add_tag(conn: &Connection, item_id: i64, tag_name: &str) -> Result<()> { insert_tag(conn, tag) } +/// Adds a tag to an item, ignoring if the tag already exists. +/// +/// Uses `INSERT OR IGNORE` to make the operation idempotent. +pub fn upsert_tag(conn: &Connection, item_id: i64, tag_name: &str) -> Result<()> { + debug!("DB: Upserting tag: item={item_id}, tag={tag_name}"); + conn.execute( + "INSERT OR IGNORE INTO tags (id, name) VALUES (?1, ?2)", + params![item_id, tag_name], + )?; + Ok(()) +} + /// Adds metadata to an item. /// /// Inserts a new metadata entry in the `metas` table. diff --git a/src/main.rs b/src/main.rs index 0ca38e0..95dceb7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -255,6 +255,9 @@ fn main() -> Result<(), Error> { KeepModes::Status => { keep::modes::client::status::mode(&client, &mut cmd, &settings) } + KeepModes::Update => { + keep::modes::client::update::mode(&client, &mut cmd, &settings, ids, tags) + } _ => { cmd.error( ErrorKind::InvalidValue, diff --git a/src/meta_plugin/cwd.rs b/src/meta_plugin/cwd.rs index 70601a1..d391793 100644 --- a/src/meta_plugin/cwd.rs +++ b/src/meta_plugin/cwd.rs @@ -49,6 +49,14 @@ impl MetaPlugin for CwdMetaPlugin { self.is_finalized = finalized; } + fn set_save_meta(&mut self, save_meta: crate::meta_plugin::SaveMetaFn) { + self.base.set_save_meta(save_meta); + } + + fn save_meta(&self, name: &str, value: &str) { + self.base.save_meta(name, value); + } + fn finalize(&mut self) -> crate::meta_plugin::MetaPluginResponse { // If already finalized, don't process again if self.is_finalized { diff --git a/src/meta_plugin/digest.rs b/src/meta_plugin/digest.rs index 2753ec0..3f77301 100644 --- a/src/meta_plugin/digest.rs +++ b/src/meta_plugin/digest.rs @@ -159,6 +159,14 @@ impl MetaPlugin for DigestMetaPlugin { self.is_finalized = finalized; } + fn set_save_meta(&mut self, save_meta: crate::meta_plugin::SaveMetaFn) { + self.base.set_save_meta(save_meta); + } + + fn save_meta(&self, name: &str, value: &str) { + self.base.save_meta(name, value); + } + fn initialize(&mut self) -> crate::meta_plugin::MetaPluginResponse { crate::meta_plugin::MetaPluginResponse { metadata: Vec::new(), diff --git a/src/meta_plugin/env.rs b/src/meta_plugin/env.rs index 55b36fc..014575f 100644 --- a/src/meta_plugin/env.rs +++ b/src/meta_plugin/env.rs @@ -103,6 +103,14 @@ impl MetaPlugin for EnvMetaPlugin { self.is_finalized = finalized; } + fn set_save_meta(&mut self, save_meta: crate::meta_plugin::SaveMetaFn) { + self.base.set_save_meta(save_meta); + } + + fn save_meta(&self, name: &str, value: &str) { + self.base.save_meta(name, value); + } + /// Initializes the plugin, processing environment variables. /// /// Processes all KEEP_META_* variables and generates metadata using output mappings. diff --git a/src/meta_plugin/exec.rs b/src/meta_plugin/exec.rs index 0fb478c..f66a335 100644 --- a/src/meta_plugin/exec.rs +++ b/src/meta_plugin/exec.rs @@ -167,6 +167,14 @@ impl MetaPlugin for MetaPluginExec { false } + fn set_save_meta(&mut self, save_meta: crate::meta_plugin::SaveMetaFn) { + self.base.set_save_meta(save_meta); + } + + fn save_meta(&self, name: &str, value: &str) { + self.base.save_meta(name, value); + } + fn initialize(&mut self) -> MetaPluginResponse { self.start_process() } diff --git a/src/meta_plugin/hostname.rs b/src/meta_plugin/hostname.rs index dedf88f..86b0bf0 100644 --- a/src/meta_plugin/hostname.rs +++ b/src/meta_plugin/hostname.rs @@ -211,6 +211,14 @@ impl MetaPlugin for HostnameMetaPlugin { self.is_finalized = finalized; } + fn set_save_meta(&mut self, save_meta: crate::meta_plugin::SaveMetaFn) { + self.base.set_save_meta(save_meta); + } + + fn save_meta(&self, name: &str, value: &str) { + self.base.save_meta(name, value); + } + fn finalize(&mut self) -> crate::meta_plugin::MetaPluginResponse { // If already finalized, don't process again if self.is_finalized { diff --git a/src/meta_plugin/keep_pid.rs b/src/meta_plugin/keep_pid.rs index be10150..0d167de 100644 --- a/src/meta_plugin/keep_pid.rs +++ b/src/meta_plugin/keep_pid.rs @@ -54,6 +54,14 @@ impl MetaPlugin for KeepPidMetaPlugin { self.is_finalized = finalized; } + fn set_save_meta(&mut self, save_meta: crate::meta_plugin::SaveMetaFn) { + self.base.set_save_meta(save_meta); + } + + fn save_meta(&self, name: &str, value: &str) { + self.base.save_meta(name, value); + } + /// Finalizes the plugin, processing any remaining data if needed. /// /// # Returns diff --git a/src/meta_plugin/magic_file.rs b/src/meta_plugin/magic_file.rs index 8e82007..89ee2b0 100644 --- a/src/meta_plugin/magic_file.rs +++ b/src/meta_plugin/magic_file.rs @@ -123,6 +123,14 @@ impl MetaPlugin for MagicFileMetaPluginImpl { self.is_finalized = finalized; } + fn set_save_meta(&mut self, save_meta: crate::meta_plugin::SaveMetaFn) { + self.base.set_save_meta(save_meta); + } + + fn save_meta(&self, name: &str, value: &str) { + self.base.save_meta(name, value); + } + fn initialize(&mut self) -> MetaPluginResponse { // Cookie is lazily initialized in the thread-local on first use. MetaPluginResponse { @@ -335,6 +343,14 @@ impl MetaPlugin for FallbackMagicFileMetaPlugin { self.is_finalized = finalized; } + fn set_save_meta(&mut self, save_meta: crate::meta_plugin::SaveMetaFn) { + self.base.set_save_meta(save_meta); + } + + fn save_meta(&self, name: &str, value: &str) { + self.base.save_meta(name, value); + } + fn initialize(&mut self) -> MetaPluginResponse { MetaPluginResponse { metadata: Vec::new(), diff --git a/src/meta_plugin/mod.rs b/src/meta_plugin/mod.rs index 36d61dc..4a044e8 100644 --- a/src/meta_plugin/mod.rs +++ b/src/meta_plugin/mod.rs @@ -1,8 +1,8 @@ -use log::debug; +use log::{debug, warn}; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; pub mod cwd; pub mod digest; @@ -61,8 +61,16 @@ pub struct MetaPluginResponse { pub is_finalized: bool, } +/// Type alias for the save_meta callback shared by all plugins. +pub type SaveMetaFn = Arc>; + +/// Creates a no-op save_meta for plugins not wired through MetaService. +pub fn noop_save_meta() -> SaveMetaFn { + Arc::new(Mutex::new(|_: &str, _: &str| {})) +} + /// Base implementation for meta plugins to reduce boilerplate. -#[derive(Debug, Clone, Default)] +#[derive(Clone)] pub struct BaseMetaPlugin { /// Output mappings for metadata. pub outputs: std::collections::HashMap, @@ -70,6 +78,29 @@ pub struct BaseMetaPlugin { pub options: std::collections::HashMap, /// Whether the plugin is finalized. pub is_finalized: bool, + /// Callback to store metadata. Called directly by plugins. + pub save_meta: SaveMetaFn, +} + +impl std::fmt::Debug for BaseMetaPlugin { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BaseMetaPlugin") + .field("outputs", &self.outputs) + .field("options", &self.options) + .field("is_finalized", &self.is_finalized) + .finish_non_exhaustive() + } +} + +impl Default for BaseMetaPlugin { + fn default() -> Self { + Self { + outputs: HashMap::new(), + options: HashMap::new(), + is_finalized: false, + save_meta: noop_save_meta(), + } + } } impl BaseMetaPlugin { @@ -83,41 +114,39 @@ impl BaseMetaPlugin { } /// Returns a reference to the outputs mapping. - /// - /// # Returns - /// - /// A reference to the `HashMap` of outputs. pub fn outputs(&self) -> &std::collections::HashMap { &self.outputs } /// Returns a mutable reference to the outputs mapping. - /// - /// # Returns - /// - /// A mutable reference to the `HashMap` of outputs. pub fn outputs_mut(&mut self) -> &mut std::collections::HashMap { &mut self.outputs } /// Returns a reference to the options mapping. - /// - /// # Returns - /// - /// A reference to the `HashMap` of options. pub fn options(&self) -> &std::collections::HashMap { &self.options } /// Returns a mutable reference to the options mapping. - /// - /// # Returns - /// - /// A mutable reference to the `HashMap` of options. pub fn options_mut(&mut self) -> &mut std::collections::HashMap { &mut self.options } + /// Sets the save_meta callback on the base plugin. + pub fn set_save_meta(&mut self, save_meta: SaveMetaFn) { + self.save_meta = save_meta; + } + + /// Saves a metadata entry via the save_meta callback. + pub fn save_meta(&self, name: &str, value: &str) { + if let Ok(mut f) = self.save_meta.lock() { + f(name, value); + } else { + warn!("META_PLUGIN: save_meta lock poisoned, dropping metadata: {name}={value}"); + } + } + /// Helper function to initialize plugin options and outputs. /// /// # Arguments @@ -566,6 +595,16 @@ where { self } + + /// Sets the save_meta callback for this plugin. + /// + /// Called by MetaService to wire the plugin to the metadata storage. + fn set_save_meta(&mut self, _save_meta: SaveMetaFn) {} + + /// Saves a metadata entry via the save_meta callback. + /// + /// Plugins call this during initialize/update/finalize to persist metadata. + fn save_meta(&self, _name: &str, _value: &str) {} } /// Global registry for meta plugins. @@ -593,12 +632,29 @@ pub fn get_meta_plugin( meta_plugin_type: MetaPluginType, options: Option>, outputs: Option>, +) -> anyhow::Result> { + get_meta_plugin_with_save(meta_plugin_type, options, outputs, None) +} + +/// Creates a meta plugin instance with an optional save_meta callback. +/// +/// If `save_meta` is provided, it is wired to the plugin so it can +/// store metadata directly during initialize/update/finalize. +pub fn get_meta_plugin_with_save( + meta_plugin_type: MetaPluginType, + options: Option>, + outputs: Option>, + save_meta: Option, ) -> anyhow::Result> { let registry = META_PLUGIN_REGISTRY .lock() .map_err(|e| anyhow::anyhow!("plugin registry poisoned: {e}"))?; if let Some(constructor) = registry.get(&meta_plugin_type) { - return Ok(constructor(options, outputs)); + let mut plugin = constructor(options, outputs); + if let Some(sm) = save_meta { + plugin.set_save_meta(sm); + } + return Ok(plugin); } anyhow::bail!("Meta plugin {meta_plugin_type:?} not registered") diff --git a/src/meta_plugin/read_rate.rs b/src/meta_plugin/read_rate.rs index 802d494..a868066 100644 --- a/src/meta_plugin/read_rate.rs +++ b/src/meta_plugin/read_rate.rs @@ -84,6 +84,14 @@ impl MetaPlugin for ReadRateMetaPlugin { self.is_finalized = finalized; } + fn set_save_meta(&mut self, save_meta: crate::meta_plugin::SaveMetaFn) { + self.base.set_save_meta(save_meta); + } + + fn save_meta(&self, name: &str, value: &str) { + self.base.save_meta(name, value); + } + /// Finalizes the plugin, calculating the read rate. /// /// Computes KB/s from bytes read and elapsed time. Outputs via mappings. diff --git a/src/meta_plugin/read_time.rs b/src/meta_plugin/read_time.rs index a7369f1..4744b94 100644 --- a/src/meta_plugin/read_time.rs +++ b/src/meta_plugin/read_time.rs @@ -37,6 +37,14 @@ impl MetaPlugin for ReadTimeMetaPlugin { self.is_finalized = finalized; } + fn set_save_meta(&mut self, save_meta: crate::meta_plugin::SaveMetaFn) { + self.base.set_save_meta(save_meta); + } + + fn save_meta(&self, name: &str, value: &str) { + self.base.save_meta(name, value); + } + fn finalize(&mut self) -> crate::meta_plugin::MetaPluginResponse { // If already finalized, don't process again if self.is_finalized { diff --git a/src/meta_plugin/shell.rs b/src/meta_plugin/shell.rs index e971d11..226950b 100644 --- a/src/meta_plugin/shell.rs +++ b/src/meta_plugin/shell.rs @@ -70,6 +70,14 @@ impl MetaPlugin for ShellMetaPlugin { self.is_finalized = finalized; } + fn set_save_meta(&mut self, save_meta: crate::meta_plugin::SaveMetaFn) { + self.base.set_save_meta(save_meta); + } + + fn save_meta(&self, name: &str, value: &str) { + self.base.save_meta(name, value); + } + /// Finalizes the plugin without processing data. /// /// For this plugin, finalization is handled in `initialize`, so this returns empty metadata. diff --git a/src/meta_plugin/shell_pid.rs b/src/meta_plugin/shell_pid.rs index dc5458d..823f06b 100644 --- a/src/meta_plugin/shell_pid.rs +++ b/src/meta_plugin/shell_pid.rs @@ -35,6 +35,14 @@ impl MetaPlugin for ShellPidMetaPlugin { self.is_finalized = finalized; } + fn set_save_meta(&mut self, save_meta: crate::meta_plugin::SaveMetaFn) { + self.base.set_save_meta(save_meta); + } + + fn save_meta(&self, name: &str, value: &str) { + self.base.save_meta(name, value); + } + fn finalize(&mut self) -> crate::meta_plugin::MetaPluginResponse { // If already finalized, don't process again if self.is_finalized { diff --git a/src/meta_plugin/text.rs b/src/meta_plugin/text.rs index 101346a..321eb08 100644 --- a/src/meta_plugin/text.rs +++ b/src/meta_plugin/text.rs @@ -510,6 +510,14 @@ impl MetaPlugin for TextMetaPlugin { self.is_finalized = finalized; } + fn set_save_meta(&mut self, save_meta: crate::meta_plugin::SaveMetaFn) { + self.base.set_save_meta(save_meta); + } + + fn save_meta(&self, name: &str, value: &str) { + self.base.save_meta(name, value); + } + /// Updates the plugin with new data chunk. /// /// Accumulates data for binary detection (if pending) or text statistics. diff --git a/src/meta_plugin/tokens.rs b/src/meta_plugin/tokens.rs index 837b116..174efd6 100644 --- a/src/meta_plugin/tokens.rs +++ b/src/meta_plugin/tokens.rs @@ -148,6 +148,14 @@ impl MetaPlugin for TokensMetaPlugin { self.is_finalized = finalized; } + fn set_save_meta(&mut self, save_meta: crate::meta_plugin::SaveMetaFn) { + self.base.set_save_meta(save_meta); + } + + fn save_meta(&self, name: &str, value: &str) { + self.base.save_meta(name, value); + } + fn update(&mut self, data: &[u8]) -> MetaPluginResponse { if self.is_finalized { return MetaPluginResponse { diff --git a/src/meta_plugin/user.rs b/src/meta_plugin/user.rs index f6fbf55..fba7abb 100644 --- a/src/meta_plugin/user.rs +++ b/src/meta_plugin/user.rs @@ -105,6 +105,14 @@ impl MetaPlugin for UserMetaPlugin { MetaPluginType::User } + fn set_save_meta(&mut self, save_meta: crate::meta_plugin::SaveMetaFn) { + self.base.set_save_meta(save_meta); + } + + fn save_meta(&self, name: &str, value: &str) { + self.base.save_meta(name, value); + } + /// Returns a reference to the outputs mapping. /// /// # Returns diff --git a/src/modes/client/info.rs b/src/modes/client/info.rs index 8352058..641d54d 100644 --- a/src/modes/client/info.rs +++ b/src/modes/client/info.rs @@ -31,11 +31,8 @@ pub fn mode( let item = client.get_item_info(id)?; match output_format { - OutputFormat::Json => { - println!("{}", serde_json::to_string_pretty(&item)?); - } - OutputFormat::Yaml => { - println!("{}", serde_yaml::to_string(&item)?); + OutputFormat::Json | OutputFormat::Yaml => { + crate::modes::common::print_serialized(&item, &output_format)?; } OutputFormat::Table => { let display = DisplayItemInfo { diff --git a/src/modes/client/list.rs b/src/modes/client/list.rs index 717326a..873e7e7 100644 --- a/src/modes/client/list.rs +++ b/src/modes/client/list.rs @@ -24,11 +24,8 @@ pub fn mode( let output_format = settings_output_format(settings); match output_format { - OutputFormat::Json => { - println!("{}", serde_json::to_string_pretty(&items)?); - } - OutputFormat::Yaml => { - println!("{}", serde_yaml::to_string(&items)?); + OutputFormat::Json | OutputFormat::Yaml => { + crate::modes::common::print_serialized(&items, &output_format)?; } OutputFormat::Table => { let rows: Vec> = items diff --git a/src/modes/client/mod.rs b/src/modes/client/mod.rs index bfff5b6..eb6084c 100644 --- a/src/modes/client/mod.rs +++ b/src/modes/client/mod.rs @@ -5,3 +5,4 @@ pub mod info; pub mod list; pub mod save; pub mod status; +pub mod update; diff --git a/src/modes/client/save.rs b/src/modes/client/save.rs index 076b26b..e42d06b 100644 --- a/src/modes/client/save.rs +++ b/src/modes/client/save.rs @@ -1,12 +1,13 @@ use crate::client::{ItemInfo, KeepClient}; use crate::compression_engine::CompressionType; use crate::config::Settings; +use crate::meta_plugin::SaveMetaFn; use crate::modes::common::settings_compression_type; +use crate::services::meta_service::MetaService; use anyhow::Result; use clap::Command; use is_terminal::IsTerminal; use log::debug; -use sha2::{Digest, Sha256}; use std::collections::HashMap; use std::io::{Read, Write}; use std::sync::{Arc, Mutex}; @@ -14,11 +15,14 @@ use std::sync::{Arc, Mutex}; /// Streaming save mode for client. /// /// Uses three threads for true streaming with constant memory: -/// - Reader thread: reads stdin, tees to stdout, computes SHA-256, +/// - Reader thread: reads stdin, tees to stdout, runs meta plugins, /// compresses data, writes to OS pipe /// - Pipe: zero-copy transfer of compressed bytes between threads /// - Streamer thread: reads from pipe, streams to server via chunked HTTP /// +/// Meta plugins run on the client side during streaming. Collected metadata +/// is sent to the server via a separate POST after streaming completes. +/// /// Memory usage is O(PIPESIZE) regardless of data size. pub fn mode( client: &KeepClient, @@ -29,33 +33,42 @@ pub fn mode( ) -> Result<(), anyhow::Error> { debug!("CLIENT_SAVE: Saving item via remote server (streaming)"); - if tags.is_empty() { - tags.push("none".to_string()); - } + crate::modes::common::ensure_default_tag(tags); // Determine compression type from settings let compression_type = settings_compression_type(cmd, settings); let server_compress = matches!(compression_type, CompressionType::None); + // Shared metadata collection: plugins write here via save_meta closure + let collected_meta: Arc>> = Arc::new(Mutex::new(HashMap::new())); + let meta_collector = collected_meta.clone(); + let save_meta: SaveMetaFn = Arc::new(Mutex::new(move |name: &str, value: &str| { + if let Ok(mut map) = meta_collector.lock() { + map.insert(name.to_string(), value.to_string()); + } + })); + + // Create MetaService and get plugins (must happen before spawning reader thread) + let meta_service = MetaService::new(save_meta); + let mut plugins = meta_service.get_plugins(cmd, settings); + // Create OS pipe for streaming compressed bytes between threads let (pipe_reader, pipe_writer) = os_pipe::pipe()?; - // Shared state for reader thread results - let shared = Arc::new(Mutex::new((0u64, String::new()))); - let shared_reader = Arc::clone(&shared); - - // Reader thread: stdin → tee(stdout) → hash → compress → pipe + // Reader thread: stdin → tee(stdout) → meta plugins → compress → pipe let compression_type_clone = compression_type.clone(); - let reader_handle = std::thread::spawn(move || -> Result<(u64, String)> { + let reader_handle = std::thread::spawn(move || -> Result { let stdin = std::io::stdin(); let stdout = std::io::stdout(); let mut stdin_lock = stdin.lock(); let mut stdout_lock = stdout.lock(); - let mut hasher = Sha256::new(); let mut total_bytes = 0u64; let mut buffer = [0u8; 8192]; + // Initialize meta plugins + meta_service.initialize_plugins(&mut plugins); + // Wrap pipe writer with appropriate compression let mut compressor: Box = match compression_type_clone { CompressionType::GZip => { @@ -76,29 +89,23 @@ pub fn mode( // Tee to stdout stdout_lock.write_all(&buffer[..n])?; - // Update hash - hasher.update(&buffer[..n]); + // Feed chunk to meta plugins + meta_service.process_chunk(&mut plugins, &buffer[..n]); + total_bytes += n as u64; // Compress and write to pipe compressor.write_all(&buffer[..n])?; } + // Finalize meta plugins (digest, text, tokens produce final output here) + meta_service.finalize_plugins(&mut plugins); + // Explicitly flush and finalize compression before dropping. - // LZ4 FrameEncoder buffers data internally; without explicit flush, - // only the frame header (7 bytes) gets written to the pipe. compressor.flush()?; drop(compressor); - // Pipe writer is now dropped (inside compressor), signaling EOF to streamer - - let digest = format!("{:x}", hasher.finalize()); - - // Set shared state for main thread - let mut shared = shared_reader.lock().expect("client save mutex poisoned"); - *shared = (total_bytes, digest.clone()); - - Ok((total_bytes, digest)) + Ok(total_bytes) }); // Streamer thread: reads compressed bytes from pipe → POST to server @@ -132,28 +139,27 @@ pub fn mode( .map_err(|e| anyhow::anyhow!("Streamer thread panicked: {:?}", e))??; // Wait for reader thread (should complete quickly after pipe is drained) - reader_handle + let uncompressed_size = reader_handle .join() .map_err(|e| anyhow::anyhow!("Reader thread panicked: {:?}", e))??; - // Read results from shared state - let (uncompressed_size, digest) = { - let shared = shared.lock().expect("client save mutex poisoned"); - shared.clone() - }; - - // Build local metadata and send to server + // Merge plugin-collected metadata with CLI metadata let mut local_metadata = metadata; - local_metadata.insert("digest_sha256".to_string(), digest); + + // Add plugin-collected metadata (digest, hostname, text stats, etc.) + if let Ok(plugin_meta) = collected_meta.lock() { + for (k, v) in plugin_meta.iter() { + local_metadata.entry(k.clone()).or_insert_with(|| v.clone()); + } + } + + // Add uncompressed_size (always tracked by client) local_metadata.insert( "uncompressed_size".to_string(), uncompressed_size.to_string(), ); // Record client compression type so the client can decompress on retrieval. - // When compress=false, the server stores the blob as-is with compression=None. - // Without this metadata, the client would get compressed bytes back but think - // they're uncompressed. if !matches!(compression_type, CompressionType::None) { local_metadata.insert( "_client_compression".to_string(), @@ -161,13 +167,6 @@ pub fn mode( ); } - // Add hostname - if let Ok(hostname) = gethostname::gethostname().into_string() { - local_metadata.insert("hostname".to_string(), hostname.clone()); - let short = hostname.split('.').next().unwrap_or(&hostname).to_string(); - local_metadata.insert("hostname_short".to_string(), short); - } - // Send metadata to server if !local_metadata.is_empty() { client.post_metadata(item_info.id, &local_metadata)?; diff --git a/src/modes/client/status.rs b/src/modes/client/status.rs index a334b7b..5f7d71e 100644 --- a/src/modes/client/status.rs +++ b/src/modes/client/status.rs @@ -17,22 +17,13 @@ pub fn mode( let output_format = settings_output_format(settings); match output_format { - OutputFormat::Json => { - println!("{}", serde_json::to_string_pretty(&status_info)?); - } - OutputFormat::Yaml => { - println!("{}", serde_yaml::to_string(&status_info)?); + OutputFormat::Json | OutputFormat::Yaml => { + crate::modes::common::print_serialized(&status_info, &output_format)?; } OutputFormat::Table => { // Paths - let mut path_table = - crate::modes::common::create_table_with_config(&settings.table_config); - path_table.set_header(vec![ - Cell::new("Type").add_attribute(Attribute::Bold), - Cell::new("Path").add_attribute(Attribute::Bold), - ]); - path_table.add_row(vec!["Data", &status_info.paths.data]); - path_table.add_row(vec!["Database", &status_info.paths.database]); + let path_table = + crate::modes::common::build_path_table(&status_info.paths, &settings.table_config); println!("PATHS:"); println!( "{}", diff --git a/src/modes/client/update.rs b/src/modes/client/update.rs new file mode 100644 index 0000000..d7b9dcf --- /dev/null +++ b/src/modes/client/update.rs @@ -0,0 +1,102 @@ +use crate::client::KeepClient; +use crate::config::Settings; +use anyhow::Result; +use clap::Command; +use log::debug; +use std::collections::HashMap; + +/// Client update mode: runs meta plugins on the server for an existing item. +/// +/// Sends the list of plugin names (from --meta-plugin config) and any direct +/// metadata (--meta key=value) to the server. The server reads the stored file, +/// runs the specified plugins, and stores the results. +pub fn mode( + client: &KeepClient, + cmd: &mut Command, + settings: &Settings, + ids: &mut [i64], + tags: &mut [String], +) -> Result<(), anyhow::Error> { + debug!("CLIENT_UPDATE: Updating item via remote server"); + + if ids.len() != 1 { + cmd.error( + clap::error::ErrorKind::InvalidValue, + "--update requires exactly one numeric ID", + ) + .exit(); + } + + let item_id = ids[0]; + + // Collect plugin names from settings (--meta-plugin config) + let plugin_names: Vec = settings + .meta_plugins_names() + .into_iter() + .flat_map(|s| { + s.split(',') + .map(|p| p.trim().to_string()) + .collect::>() + }) + .filter(|p| !p.is_empty()) + .collect(); + + // Collect direct metadata from --meta flags + let metadata: HashMap = settings + .meta + .iter() + .filter_map(|(k, v)| v.as_ref().map(|val| (k.clone(), val.clone()))) + .collect(); + + // Build query params + let mut params: Vec<(String, String)> = Vec::new(); + + if !plugin_names.is_empty() { + params.push(("plugins".to_string(), plugin_names.join(","))); + } + + if !metadata.is_empty() { + let meta_json = serde_json::to_string(&metadata)?; + params.push(("metadata".to_string(), meta_json)); + } + + if !tags.is_empty() { + params.push(("tags".to_string(), tags.join(","))); + } + + // Nothing to update + if params.is_empty() { + if !settings.quiet { + eprintln!("KEEP: No changes specified for item {item_id}"); + } + return Ok(()); + } + + let param_refs: Vec<(&str, &str)> = params + .iter() + .map(|(k, v)| (k.as_str(), v.as_str())) + .collect(); + + let url_path = format!("/api/item/{item_id}/update"); + + // POST to update endpoint + let _item_info = client.post_bytes(&url_path, &[], ¶m_refs)?; + + if !settings.quiet { + let mut parts = Vec::new(); + if !plugin_names.is_empty() { + parts.push(format!("plugins: {}", plugin_names.join(", "))); + } + if !metadata.is_empty() { + parts.push(format!("{} metadata", metadata.len())); + } + if !tags.is_empty() { + parts.push(format!("tags: {}", tags.join(" "))); + } + let action = parts.join(", "); + + eprintln!("KEEP: Updated item {item_id} ({action})"); + } + + Ok(()) +} diff --git a/src/modes/common.rs b/src/modes/common.rs index 4260834..bd5cb98 100644 --- a/src/modes/common.rs +++ b/src/modes/common.rs @@ -1,3 +1,4 @@ +use crate::common::status::PathInfo; use crate::compression_engine::CompressionType; /// Common utilities shared across different modes in the Keep application. /// @@ -564,6 +565,28 @@ pub fn apply_color(mut cell: Cell, color: &config::TableColor, is_foreground: bo cell } +/// Ensures tags has at least one entry, adding "none" if empty. +pub fn ensure_default_tag(tags: &mut Vec) { + if tags.is_empty() { + tags.push("none".to_string()); + } +} + +/// Prints a serializable value in JSON or YAML format based on output format. +/// +/// Only handles Json and Yaml variants; Table should be handled separately. +pub fn print_serialized( + value: &T, + format: &OutputFormat, +) -> anyhow::Result<()> { + match format { + OutputFormat::Json => println!("{}", serde_json::to_string_pretty(value)?), + OutputFormat::Yaml => println!("{}", serde_yaml::to_string(value)?), + OutputFormat::Table => unreachable!(), + } + Ok(()) +} + /// Applies config TableAttribute to a comfy-table Cell. pub fn apply_table_attribute(mut cell: Cell, attribute: &config::TableAttribute) -> Cell { match attribute { @@ -580,3 +603,18 @@ pub fn apply_table_attribute(mut cell: Cell, attribute: &config::TableAttribute) cell } + +/// Builds a table showing data and database path information. +pub fn build_path_table(path_info: &PathInfo, table_config: &config::TableConfig) -> Table { + let mut path_table = create_table_with_config(table_config); + + path_table.set_header(vec![ + Cell::new("Type").add_attribute(Attribute::Bold), + Cell::new("Path").add_attribute(Attribute::Bold), + ]); + + path_table.add_row(vec!["Data", &path_info.data]); + path_table.add_row(vec!["Database", &path_info.database]); + + path_table +} diff --git a/src/modes/diff.rs b/src/modes/diff.rs index 7b19d1f..5c5f238 100644 --- a/src/modes/diff.rs +++ b/src/modes/diff.rs @@ -3,7 +3,6 @@ /// This module provides functionality for comparing two items and displaying their /// differences using external diff tools. Decompressed content is streamed to diff /// via pipes and /dev/fd file descriptors — no temporary files are created. -use crate::common::PIPESIZE; use crate::config; use crate::services::compression_service::CompressionService; use crate::services::item_service::ItemService; @@ -118,17 +117,11 @@ fn spawn_writer_thread( // Convert OwnedFd to File — safe, takes ownership, closes on drop let mut writer = std::fs::File::from(write_fd); - let mut buf = [0u8; PIPESIZE]; - loop { - match reader.read(&mut buf) { - Ok(0) => break, - Ok(n) => { - use std::io::Write; - writer.write_all(&buf[..n])?; - } - Err(e) => return Err(anyhow::anyhow!("Error reading item {item_id}: {e}")), - } - } + crate::common::stream_copy(&mut reader, |chunk| { + use std::io::Write; + writer.write_all(chunk) + }) + .map_err(|e| anyhow::anyhow!("Error reading item {item_id}: {e}"))?; // writer dropped here, closing write_fd → diff sees EOF Ok(()) }) diff --git a/src/modes/get.rs b/src/modes/get.rs index c2949dc..cdb830a 100644 --- a/src/modes/get.rs +++ b/src/modes/get.rs @@ -108,13 +108,9 @@ pub fn mode_get( fn stream_to_stdout(mut reader: Box) -> Result<()> { let mut stdout = std::io::stdout(); - let mut buffer = [0; PIPESIZE]; - loop { - let bytes_read = reader.read(&mut buffer)?; - if bytes_read == 0 { - break; - } - stdout.write_all(&buffer[..bytes_read])?; - } + crate::common::stream_copy(&mut reader, |chunk| { + stdout.write_all(chunk)?; + Ok(()) + })?; Ok(()) } diff --git a/src/modes/info.rs b/src/modes/info.rs index ed753b8..32cea93 100644 --- a/src/modes/info.rs +++ b/src/modes/info.rs @@ -252,15 +252,7 @@ fn show_item_structured( meta: meta_map, }; - match output_format { - OutputFormat::Json => { - println!("{}", serde_json::to_string_pretty(&item_info)?); - } - OutputFormat::Yaml => { - println!("{}", serde_yaml::to_string(&item_info)?); - } - OutputFormat::Table => unreachable!(), - } + crate::modes::common::print_serialized(&item_info, &output_format)?; Ok(()) } diff --git a/src/modes/list.rs b/src/modes/list.rs index 15245f9..98a3487 100644 --- a/src/modes/list.rs +++ b/src/modes/list.rs @@ -306,15 +306,7 @@ fn show_list_structured( list_items.push(list_item); } - match output_format { - OutputFormat::Json => { - println!("{}", serde_json::to_string_pretty(&list_items)?); - } - OutputFormat::Yaml => { - println!("{}", serde_yaml::to_string(&list_items)?); - } - OutputFormat::Table => unreachable!(), - } + crate::modes::common::print_serialized(&list_items, &output_format)?; Ok(()) } diff --git a/src/modes/server/api/item.rs b/src/modes/server/api/item.rs index 9dc52f3..e493501 100644 --- a/src/modes/server/api/item.rs +++ b/src/modes/server/api/item.rs @@ -1425,3 +1425,88 @@ fn compute_diff(a: &[u8], b: &[u8]) -> Vec { diff_lines } + +/// Updates metadata for an existing item by re-running meta plugins on stored content. +/// +/// Reads the item's stored file, decompresses it, runs the specified meta plugins, +/// and stores the resulting metadata. Also applies any direct metadata overrides. +/// +/// # Arguments +/// +/// * `state` - Application state with DB connection and data directory. +/// * `item_id` - The item ID to update. +/// * `params` - Query parameters specifying plugins and metadata. +/// +/// # Returns +/// +/// JSON response with updated item info, or error status code. +pub async fn handle_update_item( + State(state): State, + Path(item_id): Path, + Query(params): Query, +) -> Result>, StatusCode> { + let db = state.db.clone(); + let data_dir = state.data_dir.clone(); + let settings = state.settings.clone(); + + // Parse plugin names + let plugin_names: Vec = params + .plugins + .as_deref() + .map(|s| { + s.split(',') + .map(|p| p.trim().to_string()) + .filter(|p| !p.is_empty()) + .collect() + }) + .unwrap_or_default(); + + // Parse metadata overrides + let metadata: HashMap = if let Some(ref meta_str) = params.metadata { + serde_json::from_str(meta_str).map_err(|e| { + warn!("Failed to parse metadata JSON string: {e}"); + StatusCode::BAD_REQUEST + })? + } else { + HashMap::new() + }; + + // Parse tags + let tags: Vec = params + .tags + .as_deref() + .map(crate::services::utils::parse_comma_tags) + .unwrap_or_default(); + + // Run in blocking task since we do file I/O and DB access + let result = task::spawn_blocking(move || -> Result { + let mut conn = db + .lock() + .map_err(|e| CoreError::Other(anyhow::anyhow!("Failed to acquire DB lock: {e}")))?; + + let sync_service = + crate::services::sync_data_service::SyncDataService::new(data_dir, (*settings).clone()); + + sync_service.update_item_plugins(&mut conn, item_id, &plugin_names, metadata, &tags) + }) + .await + .map_err(|e| { + warn!("Blocking task failed: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + match result { + Ok(item_with_meta) => { + let item_info: ItemInfo = item_with_meta.into(); + Ok(Json(ApiResponse { + data: Some(item_info), + error: None, + })) + } + Err(CoreError::ItemNotFound(_)) => Err(StatusCode::NOT_FOUND), + Err(e) => { + warn!("Failed to update item {item_id}: {e}"); + Err(StatusCode::INTERNAL_SERVER_ERROR) + } + } +} diff --git a/src/modes/server/api/mod.rs b/src/modes/server/api/mod.rs index fcb062a..7a78453 100644 --- a/src/modes/server/api/mod.rs +++ b/src/modes/server/api/mod.rs @@ -85,6 +85,7 @@ pub fn add_routes(router: Router) -> Router { ) .route("/api/item/{item_id}", delete(item::handle_delete_item)) .route("/api/item/{item_id}/info", get(item::handle_get_item_info)) + .route("/api/item/{item_id}/update", post(item::handle_update_item)) .route("/api/diff", get(item::handle_diff_items)) } diff --git a/src/modes/server/common.rs b/src/modes/server/common.rs index 75f4fde..be62a51 100644 --- a/src/modes/server/common.rs +++ b/src/modes/server/common.rs @@ -641,6 +641,21 @@ pub struct CreateItemQuery { pub meta: bool, } +/// Query parameters for updating item metadata via POST. +/// +/// Query parameters for POST /api/item/{item_id}/update. +/// Re-runs specified meta plugins on the stored content and/or +/// applies direct metadata key-value overrides. +#[derive(Debug, Deserialize)] +pub struct UpdateItemQuery { + /// Optional comma-separated list of plugin names to re-run. + pub plugins: Option, + /// Optional metadata overrides as JSON string. + pub metadata: Option, + /// Optional comma-separated tags to add. + pub tags: Option, +} + /// Request body for creating a new item. /// /// Contains the content to store and optional tags. diff --git a/src/modes/status.rs b/src/modes/status.rs index f9ba839..2084156 100644 --- a/src/modes/status.rs +++ b/src/modes/status.rs @@ -10,24 +10,9 @@ use comfy_table::{Attribute, Cell, Table}; use serde_json; use serde_yaml; -use crate::common::status::PathInfo; use crate::meta_plugin::MetaPluginType; use crate::meta_plugin::get_meta_plugin; -fn build_path_table(path_info: &PathInfo, table_config: &config::TableConfig) -> Table { - let mut path_table = crate::modes::common::create_table_with_config(table_config); - - path_table.set_header(vec![ - Cell::new("Type").add_attribute(Attribute::Bold), - Cell::new("Path").add_attribute(Attribute::Bold), - ]); - - path_table.add_row(vec!["Data", &path_info.data]); - path_table.add_row(vec!["Database", &path_info.database]); - - path_table -} - fn build_config_table(settings: &config::Settings) -> Table { let mut config_table = crate::modes::common::create_table_with_config(&settings.table_config); @@ -215,7 +200,8 @@ pub fn mode_status( println!(); println!("PATHS:"); - let path_table = build_path_table(&status_info.paths, &settings.table_config); + let path_table = + crate::modes::common::build_path_table(&status_info.paths, &settings.table_config); println!( "{}", crate::modes::common::trim_lines_end(&path_table.trim_fmt()) @@ -240,12 +226,11 @@ pub fn mode_status( Ok(()) } OutputFormat::Json => { - // Create a subset for status info that includes everything - println!("{}", serde_json::to_string_pretty(&status_info)?); + crate::modes::common::print_serialized(&status_info, &output_format)?; Ok(()) } OutputFormat::Yaml => { - println!("{}", serde_yaml::to_string(&status_info)?); + crate::modes::common::print_serialized(&status_info, &output_format)?; Ok(()) } } diff --git a/src/modes/update.rs b/src/modes/update.rs index 0718a0f..4fc0c4e 100644 --- a/src/modes/update.rs +++ b/src/modes/update.rs @@ -6,9 +6,11 @@ use crate::common::PIPESIZE; use crate::config; use crate::db; use crate::services::compression_service::CompressionService; +use crate::services::meta_service::MetaService; use clap::Command; use log::debug; use rusqlite::Connection; +use std::sync::{Arc, Mutex}; /// Handles the update mode: modifies tags and metadata for an existing item by ID. /// @@ -93,6 +95,13 @@ pub fn mode_update( db::set_item_tags(conn, item.clone(), tags)?; } + // Run meta plugins if --meta-plugin flags are provided + let plugin_names = settings.meta_plugins_names(); + if !plugin_names.is_empty() { + debug!("UPDATE: Running meta plugins: {:?}", plugin_names); + run_meta_plugins_on_item(conn, cmd, settings, &data_path, &item, item_id)?; + } + // Backfill size if not set let mut updated_item = item.clone(); if item.size.is_none() { @@ -169,3 +178,59 @@ fn compute_item_size(data_path: &Path, item: &db::Item) -> Option { Some(total_bytes) } + +/// Runs meta plugins on an existing item's content and stores the results. +fn run_meta_plugins_on_item( + conn: &mut Connection, + cmd: &mut Command, + settings: &config::Settings, + data_path: &Path, + item: &db::Item, + item_id: i64, +) -> Result<()> { + let mut item_path = data_path.to_path_buf(); + item_path.push(item_id.to_string()); + + if !item_path.exists() { + debug!("UPDATE: Content file not found: {item_path:?}"); + return Ok(()); + } + + // Collect metadata in memory + let collected_meta: Arc>> = Arc::new(Mutex::new(Vec::new())); + let collector = collected_meta.clone(); + let save_meta: crate::meta_plugin::SaveMetaFn = + Arc::new(Mutex::new(move |name: &str, value: &str| { + if let Ok(mut v) = collector.lock() { + v.push((name.to_string(), value.to_string())); + } + })); + + let meta_service = MetaService::new(save_meta); + let mut plugins = meta_service.get_plugins(cmd, settings); + + if plugins.is_empty() { + return Ok(()); + } + + let compression_service = CompressionService::new(); + let mut reader = compression_service.stream_item_content(item_path, &item.compression)?; + + meta_service.initialize_plugins(&mut plugins); + + crate::common::stream_copy(&mut reader, |chunk| { + meta_service.process_chunk(&mut plugins, chunk); + Ok(()) + })?; + + meta_service.finalize_plugins(&mut plugins); + + // Write collected plugin metadata to DB + if let Ok(entries) = collected_meta.lock() { + for (name, value) in entries.iter() { + db::add_meta(conn, item_id, name, value)?; + } + } + + Ok(()) +} diff --git a/src/services/item_service.rs b/src/services/item_service.rs index c3d4285..c1b5718 100644 --- a/src/services/item_service.rs +++ b/src/services/item_service.rs @@ -1,4 +1,3 @@ -use crate::common::PIPESIZE; use crate::compression_engine::{CompressionType, get_compression_engine}; use crate::config::Settings; use crate::db::{self, Item, Meta}; @@ -28,8 +27,6 @@ pub struct ItemService { data_path: PathBuf, /// Service for handling compression and decompression. compression_service: CompressionService, - /// Service for managing metadata plugins. - meta_service: MetaService, /// Service for applying content filters. filter_service: FilterService, } @@ -59,7 +56,6 @@ impl ItemService { Self { data_path, compression_service: CompressionService::new(), - meta_service: MetaService::new(), filter_service: FilterService::new(), } } @@ -596,10 +592,8 @@ impl ItemService { conn: &mut Connection, ) -> Result { debug!("ITEM_SERVICE: Starting save_item with tags: {tags:?}"); - if tags.is_empty() { - tags.push("none".to_string()); - debug!("ITEM_SERVICE: No tags provided, using default 'none' tag"); - } + crate::modes::common::ensure_default_tag(tags); + debug!("ITEM_SERVICE: Tags after ensure_default: {tags:?}"); let compression_type = settings_compression_type(cmd, settings); debug!("ITEM_SERVICE: Using compression type: {compression_type:?}"); @@ -615,7 +609,7 @@ impl ItemService { debug!("ITEM_SERVICE: Created new item with id: {item_id}"); db::set_item_tags(conn, item.clone(), tags)?; debug!("ITEM_SERVICE: Set tags for item {item_id}"); - let item_meta = self.meta_service.collect_initial_meta(); + let item_meta = MetaService::collect_initial_meta_static(); debug!( "ITEM_SERVICE: Collected {} initial meta entries", item_meta.len() @@ -656,10 +650,23 @@ impl ItemService { } } - let mut plugins = self.meta_service.get_plugins(cmd, settings); + // Collect metadata from plugins into a Vec, then write to DB after plugins finish. + // This avoids capturing &Connection in the save_meta closure (which would need unsafe + // and wouldn't be Send for parallel plugins). + let collected_meta: std::sync::Arc>> = + std::sync::Arc::new(std::sync::Mutex::new(Vec::new())); + let collector = collected_meta.clone(); + let save_meta: crate::meta_plugin::SaveMetaFn = + std::sync::Arc::new(std::sync::Mutex::new(move |name: &str, value: &str| { + if let Ok(mut v) = collector.lock() { + v.push((name.to_string(), value.to_string())); + } + })); + + let meta_service = MetaService::new(save_meta); + let mut plugins = meta_service.get_plugins(cmd, settings); debug!("ITEM_SERVICE: Got {} meta plugins", plugins.len()); - self.meta_service - .initialize_plugins(&mut plugins, conn, item_id); + meta_service.initialize_plugins(&mut plugins); let mut item_path = self.data_path.clone(); item_path.push(item_id.to_string()); @@ -667,29 +674,29 @@ impl ItemService { let mut item_out = compression_engine.create(item_path.clone())?; - let mut buffer = [0; PIPESIZE]; - let mut total_bytes = 0; + let mut total_bytes: i64 = 0; debug!("ITEM_SERVICE: Starting to read and process input data"); - loop { - let n = input.read(&mut buffer)?; - if n == 0 { - break; - } - - total_bytes += n as i64; - item_out.write_all(&buffer[..n])?; - self.meta_service - .process_chunk(&mut plugins, &buffer[..n], conn, item_id); - } + crate::common::stream_copy(&mut input, |chunk| { + total_bytes += chunk.len() as i64; + item_out.write_all(chunk)?; + meta_service.process_chunk(&mut plugins, chunk); + Ok(()) + })?; debug!("ITEM_SERVICE: Processed {total_bytes} bytes total"); item_out.flush()?; drop(item_out); debug!("ITEM_SERVICE: Finalizing meta plugins"); - self.meta_service - .finalize_plugins(&mut plugins, conn, item_id); + meta_service.finalize_plugins(&mut plugins); + + // Write collected plugin metadata to DB + if let Ok(entries) = collected_meta.lock() { + for (name, value) in entries.iter() { + db::add_meta(conn, item_id, name, value)?; + } + } item.size = Some(total_bytes); db::update_item(conn, item.clone())?; diff --git a/src/services/meta_service.rs b/src/services/meta_service.rs index c52babd..9a50d60 100644 --- a/src/services/meta_service.rs +++ b/src/services/meta_service.rs @@ -1,12 +1,13 @@ use crate::config::Settings; -use crate::meta_plugin::{MetaPlugin, MetaPluginResponse, MetaPluginType}; +use crate::meta_plugin::{MetaPlugin, MetaPluginResponse, MetaPluginType, SaveMetaFn}; use crate::modes::common::settings_meta_plugin_types; use clap::Command; -use log::{debug, error}; -use rusqlite::Connection; +use log::{debug, error, warn}; use std::collections::HashMap; -pub struct MetaService; +pub struct MetaService { + save_meta: SaveMetaFn, +} /// Sentinel plugin used as a placeholder when extracting plugins for parallel /// execution. The original plugin is written back immediately after the threads @@ -22,9 +23,28 @@ fn replace_plugin(plugins: &mut [Box], i: usize) -> Box Self { - Self + /// Creates a new MetaService with the given save_meta callback. + /// + /// All plugins created by this service will share this callback for + /// persisting metadata. The callback is wrapped in Arc> so it + /// can be cloned into parallel-safe plugin threads. + pub fn new(save_meta: SaveMetaFn) -> Self { + Self { save_meta } } pub fn get_plugins(&self, cmd: &mut Command, settings: &Settings) -> Vec> { @@ -32,7 +52,7 @@ impl MetaService { let meta_plugin_types: Vec = settings_meta_plugin_types(cmd, settings); debug!("META_SERVICE: Meta plugin types from settings: {meta_plugin_types:?}"); - // Create plugins with their configuration + // Create plugins with their configuration and wire save_meta let meta_plugins: Vec> = meta_plugin_types .iter() .filter_map(|meta_plugin_type| { @@ -66,7 +86,12 @@ impl MetaService { (None, None) }; - match crate::meta_plugin::get_meta_plugin(meta_plugin_type.clone(), options, outputs) { + match crate::meta_plugin::get_meta_plugin_with_save( + meta_plugin_type.clone(), + options, + outputs, + Some(self.save_meta.clone()), + ) { Ok(plugin) => Some(plugin), Err(e) => { log::warn!("META_SERVICE: Failed to create plugin {meta_plugin_type:?}: {e}, skipping"); @@ -79,12 +104,7 @@ impl MetaService { meta_plugins } - pub fn initialize_plugins( - &self, - plugins: &mut [Box], - conn: &Connection, - item_id: i64, - ) { + pub fn initialize_plugins(&self, plugins: &mut [Box]) { // Check for duplicate output names before initializing plugins let mut output_names: std::collections::HashMap> = std::collections::HashMap::new(); @@ -135,7 +155,6 @@ impl MetaService { parallel_plugins.push(replace_plugin(plugins, i)); } - // Write results back to original slots sequentially (DB writes are serial) let (results, panicked): (Vec<(usize, MetaPluginResponse)>, Vec) = std::thread::scope(|s| { let handles: Vec<_> = parallel_plugins @@ -157,15 +176,13 @@ impl MetaService { }); for (j, response) in results { - store_plugin_metadata(conn, item_id, &response); + store_plugin_response(&response, &self.save_meta); let mut plugin = replace_plugin(&mut parallel_plugins, j); if response.is_finalized { plugin.set_finalized(true); } plugins[parallel_idx[j]] = plugin; } - // Panicked plugins: restore the NullMetaPlugin sentinel and - // mark it finalized so future phases skip it cleanly. for j in panicked { let mut plugin = replace_plugin(&mut parallel_plugins, j); plugin.set_finalized(true); @@ -176,20 +193,14 @@ impl MetaService { // Run sequential plugins for &i in &sequential_idx { let response = plugins[i].initialize(); - store_plugin_metadata(conn, item_id, &response); + store_plugin_response(&response, &self.save_meta); if response.is_finalized { plugins[i].set_finalized(true); } } } - pub fn process_chunk( - &self, - plugins: &mut [Box], - chunk: &[u8], - conn: &Connection, - item_id: i64, - ) { + pub fn process_chunk(&self, plugins: &mut [Box], chunk: &[u8]) { // Partition non-finalized plugins by parallel_safe let (parallel_idx, sequential_idx): (Vec, Vec) = plugins .iter() @@ -200,7 +211,6 @@ impl MetaService { // Run parallel-safe plugins concurrently on this chunk if !parallel_idx.is_empty() { - // Extract plugins by unique index into a flat Vec indexed by position let mut parallel_plugins: Vec> = Vec::with_capacity(parallel_idx.len()); for &i in ¶llel_idx { @@ -228,7 +238,7 @@ impl MetaService { }); for (j, response) in results { - store_plugin_metadata(conn, item_id, &response); + store_plugin_response(&response, &self.save_meta); let mut plugin = replace_plugin(&mut parallel_plugins, j); if response.is_finalized { plugin.set_finalized(true); @@ -245,26 +255,21 @@ impl MetaService { // Run sequential plugins for &i in &sequential_idx { let response = plugins[i].update(chunk); - store_plugin_metadata(conn, item_id, &response); + store_plugin_response(&response, &self.save_meta); if response.is_finalized { plugins[i].set_finalized(true); } } } - pub fn finalize_plugins( - &self, - plugins: &mut [Box], - conn: &Connection, - item_id: i64, - ) { + pub fn finalize_plugins(&self, plugins: &mut [Box]) { for meta_plugin in plugins.iter_mut() { if meta_plugin.is_finalized() { continue; } let response = meta_plugin.finalize(); - store_plugin_metadata(conn, item_id, &response); + store_plugin_response(&response, &self.save_meta); if response.is_finalized { meta_plugin.set_finalized(true); @@ -273,22 +278,12 @@ impl MetaService { } /// Collects initial metadata from environment variables and hostname. - /// - /// Gathers metadata from `KEEP_META_*` environment variables and adds hostname - /// if not already present. - /// - /// # Returns - /// - /// A `HashMap` of initial metadata key-value pairs. - /// - /// # Examples - /// - /// ``` - /// # use keep::services::MetaService; - /// let service = MetaService::new(); - /// let initial_meta = service.collect_initial_meta(); - /// ``` pub fn collect_initial_meta(&self) -> HashMap { + Self::collect_initial_meta_static() + } + + /// Static version of collect_initial_meta for use without a MetaService instance. + pub fn collect_initial_meta_static() -> HashMap { let mut item_meta: HashMap = crate::modes::common::get_meta_from_env(); if let Ok(hostname) = gethostname::gethostname().into_string() @@ -299,34 +294,3 @@ impl MetaService { item_meta } } - -/// Stores metadata entries from a plugin response into the database. -/// -/// # Arguments -/// -/// * `conn` - Database connection. -/// * `item_id` - Item ID to associate with the metadata. -/// * `response` - The plugin response containing metadata. -fn store_plugin_metadata(conn: &Connection, item_id: i64, response: &MetaPluginResponse) { - for meta_data in &response.metadata { - let db_meta = crate::db::Meta { - id: item_id, - name: meta_data.name.clone(), - value: meta_data.value.clone(), - }; - if let Err(e) = crate::db::store_meta(conn, db_meta) { - log::warn!("META_SERVICE: Failed to store metadata: {e}"); - } - } -} - -impl Default for MetaService { - /// Provides a default `MetaService` instance. - /// - /// # Returns - /// - /// A new `MetaService` via `new()`. - fn default() -> Self { - Self::new() - } -} diff --git a/src/services/sync_data_service.rs b/src/services/sync_data_service.rs index eeae442..019dbf1 100644 --- a/src/services/sync_data_service.rs +++ b/src/services/sync_data_service.rs @@ -105,9 +105,7 @@ impl SyncDataService { let settings = &self.settings; let mut tags = tags; - if tags.is_empty() { - tags.push("none".to_string()); - } + crate::modes::common::ensure_default_tag(&mut tags); let compression_type = if compress { settings_compression_type(&mut cmd, settings) @@ -128,7 +126,18 @@ impl SyncDataService { } // Initialize meta plugins if requested - let meta_service = MetaService::new(); + // Collect metadata in memory, write to DB after plugins finish. + let collected_meta: std::sync::Arc>> = + std::sync::Arc::new(std::sync::Mutex::new(Vec::new())); + let collector = collected_meta.clone(); + let save_meta: crate::meta_plugin::SaveMetaFn = + std::sync::Arc::new(std::sync::Mutex::new(move |name: &str, value: &str| { + if let Ok(mut v) = collector.lock() { + v.push((name.to_string(), value.to_string())); + } + })); + + let meta_service = MetaService::new(save_meta); let mut plugins = if run_meta { meta_service.get_plugins(&mut cmd, settings) } else { @@ -136,7 +145,7 @@ impl SyncDataService { }; if run_meta { - meta_service.initialize_plugins(&mut plugins, conn, item_id); + meta_service.initialize_plugins(&mut plugins); } // Write content to file via streaming @@ -145,28 +154,30 @@ impl SyncDataService { let mut item_out = compression_engine.create(item_path)?; - let mut buffer = [0u8; crate::common::PIPESIZE]; let mut total_bytes = 0i64; - loop { - let n = reader.read(&mut buffer)?; - if n == 0 { - break; - } - item_out.write_all(&buffer[..n])?; - total_bytes += n as i64; - + crate::common::stream_copy(reader, |chunk| { + item_out.write_all(chunk)?; + total_bytes += chunk.len() as i64; if run_meta { - meta_service.process_chunk(&mut plugins, &buffer[..n], conn, item_id); + meta_service.process_chunk(&mut plugins, chunk); } - } + Ok(()) + })?; item_out.flush()?; drop(item_out); // Finalize meta plugins if run_meta { - meta_service.finalize_plugins(&mut plugins, conn, item_id); + meta_service.finalize_plugins(&mut plugins); + } + + // Write collected plugin metadata to DB + if run_meta && let Ok(entries) = collected_meta.lock() { + for (name, value) in entries.iter() { + crate::db::add_meta(conn, item_id, name, value)?; + } } // Add client-provided metadata @@ -255,9 +266,7 @@ impl DataService for SyncDataService { mut tags: Vec, conn: &mut Connection, ) -> Result { - if tags.is_empty() { - tags.push("none".to_string()); - } + crate::modes::common::ensure_default_tag(&mut tags); self.item_service .save_item(content, cmd, settings, &mut tags, conn) @@ -336,3 +345,95 @@ impl DataService for SyncDataService { )?) } } + +/// Runs specified meta plugins on an existing item's content and stores the results. +impl SyncDataService { + pub fn update_item_plugins( + &self, + conn: &mut Connection, + item_id: i64, + plugin_names: &[String], + metadata: HashMap, + tags: &[String], + ) -> Result { + use crate::services::compression_service::CompressionService; + use std::io::Read; + + let item = + crate::db::get_item(conn, item_id)?.ok_or_else(|| CoreError::ItemNotFound(item_id))?; + + // Collect metadata in memory + let collected_meta: std::sync::Arc>> = + std::sync::Arc::new(std::sync::Mutex::new(Vec::new())); + let collector = collected_meta.clone(); + let save_meta: crate::meta_plugin::SaveMetaFn = + std::sync::Arc::new(std::sync::Mutex::new(move |name: &str, value: &str| { + if let Ok(mut v) = collector.lock() { + v.push((name.to_string(), value.to_string())); + } + })); + + // Create MetaService and get only the requested plugins + let meta_service = crate::services::meta_service::MetaService::new(save_meta); + let mut cmd = Command::new("keep"); + let settings = &self.settings; + + // Filter to only the requested plugin types + let all_plugins = meta_service.get_plugins(&mut cmd, settings); + let mut plugins: Vec> = all_plugins + .into_iter() + .filter(|p| { + let plugin_name = p.meta_type().to_string(); + plugin_names.iter().any(|n| n == &plugin_name) + }) + .collect(); + + if plugins.is_empty() && metadata.is_empty() { + // Nothing to do, return current item info + return self.get_item(conn, item_id); + } + + // Open and decompress the stored file + let mut item_path = self.item_service.get_data_path().clone(); + item_path.push(item_id.to_string()); + + if !item_path.exists() { + return Err(CoreError::ItemNotFound(item_id)); + } + + if !plugins.is_empty() { + let compression_service = CompressionService::new(); + let mut reader = + compression_service.stream_item_content(item_path, &item.compression)?; + + // Run plugins on the content + meta_service.initialize_plugins(&mut plugins); + + crate::common::stream_copy(&mut reader, |chunk| { + meta_service.process_chunk(&mut plugins, chunk); + Ok(()) + })?; + + meta_service.finalize_plugins(&mut plugins); + + // Write collected plugin metadata to DB + if let Ok(entries) = collected_meta.lock() { + for (name, value) in entries.iter() { + crate::db::add_meta(conn, item_id, name, value)?; + } + } + } + + // Apply direct metadata overrides + for (key, value) in &metadata { + crate::db::add_meta(conn, item_id, key, value)?; + } + + // Apply tags + for tag in tags { + crate::db::upsert_tag(conn, item_id, tag)?; + } + + self.get_item(conn, item_id) + } +}