From 8acbd34150e4cfec68a5bacf6c58469eb02abc9a Mon Sep 17 00:00:00 2001 From: Andrew Phillips Date: Sat, 14 Mar 2026 18:22:07 -0300 Subject: [PATCH] fix: add --meta filtering support to client/server list mode Plumb metadata filter from client CLI through the HTTP API to the server's data_service.list_items(). The server accepts a JSON-encoded meta query parameter where null values mean 'key exists' and string values mean 'exact match'. Also fix LZ4 compression round-trip for client mode: - Explicit flush FrameEncoder before drop to avoid sending only the frame header when compress=false - Send _client_compression metadata so client knows actual compression on retrieval (server records compression=None when compress=false) - Use FrameDecoder (frame format) instead of decompress_size_prepended (size-prepended format) to match server storage format --- src/client.rs | 7 +++++++ src/modes/client/get.rs | 26 ++++++++++++++++++++------ src/modes/client/info.rs | 2 +- src/modes/client/list.rs | 7 ++++++- src/modes/client/save.rs | 16 +++++++++++++++- src/modes/server/api/item.rs | 12 +++++++++++- src/modes/server/common.rs | 5 +++++ 7 files changed, 65 insertions(+), 10 deletions(-) diff --git a/src/client.rs b/src/client.rs index 4539ab4..0ec813e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -226,6 +226,7 @@ impl KeepClient { order: &str, start: u64, count: u64, + meta: &HashMap>, ) -> Result, CoreError> { #[derive(serde::Deserialize)] struct ApiResponse { @@ -239,6 +240,12 @@ impl KeepClient { if !tags.is_empty() { params.push(("tags".to_string(), tags.join(","))); } + if !meta.is_empty() { + let meta_json = serde_json::to_string(meta).map_err(|e| { + CoreError::Other(anyhow::anyhow!("Failed to serialize meta filter: {}", e)) + })?; + params.push(("meta".to_string(), meta_json)); + } let param_refs: Vec<(&str, &str)> = params .iter() diff --git a/src/modes/client/get.rs b/src/modes/client/get.rs index 6ba0695..e9acb8c 100644 --- a/src/modes/client/get.rs +++ b/src/modes/client/get.rs @@ -23,14 +23,14 @@ pub fn mode( ids[0] } else if !tags.is_empty() { // Find item by tags - let items = client.list_items(tags, "newest", 0, 1)?; + let items = client.list_items(tags, "newest", 0, 1, &std::collections::HashMap::new())?; if items.is_empty() { return Err(anyhow::anyhow!("No items found matching tags: {:?}", tags)); } items[0].id } else { // Get latest item - let items = client.list_items(&[], "newest", 0, 1)?; + let items = client.list_items(&[], "newest", 0, 1, &std::collections::HashMap::new())?; if items.is_empty() { return Err(anyhow::anyhow!("No items found")); } @@ -60,8 +60,17 @@ pub fn mode( } } - // Decompress locally - let compression_type = CompressionType::from_str(&compression).unwrap_or(CompressionType::None); + // Decompress locally. + // Prefer _client_compression metadata (set by client save) over the server-reported + // compression header, because when compress=false the server stores compressed bytes + // but records compression=None. + let effective_compression = item_info + .metadata + .get("_client_compression") + .map(|s| s.as_str()) + .unwrap_or(&compression); + let compression_type = + CompressionType::from_str(effective_compression).unwrap_or(CompressionType::None); let decompressed = match compression_type { CompressionType::GZip => { @@ -71,8 +80,13 @@ pub fn mode( decoder.read_to_end(&mut content)?; content } - CompressionType::LZ4 => lz4_flex::decompress_size_prepended(&raw_bytes) - .map_err(|e| anyhow::anyhow!("LZ4 decompression failed: {}", e))?, + CompressionType::LZ4 => { + use lz4_flex::frame::FrameDecoder; + let mut decoder = FrameDecoder::new(&raw_bytes[..]); + let mut content = Vec::new(); + decoder.read_to_end(&mut content)?; + content + } _ => raw_bytes, }; diff --git a/src/modes/client/info.rs b/src/modes/client/info.rs index e9e7da0..d8b19b2 100644 --- a/src/modes/client/info.rs +++ b/src/modes/client/info.rs @@ -16,7 +16,7 @@ pub fn mode( // If tags provided, find matching item first let item_ids: Vec = if !tags.is_empty() { - let items = client.list_items(tags, "newest", 0, 1)?; + let items = client.list_items(tags, "newest", 0, 1, &std::collections::HashMap::new())?; if items.is_empty() { return Err(anyhow::anyhow!("No items found matching tags: {:?}", tags)); } diff --git a/src/modes/client/list.rs b/src/modes/client/list.rs index fd60e16..44230ae 100644 --- a/src/modes/client/list.rs +++ b/src/modes/client/list.rs @@ -11,7 +11,12 @@ pub fn mode( ) -> Result<(), anyhow::Error> { debug!("CLIENT_LIST: Listing items via remote server"); - let items = client.list_items(tags, "newest", 0, 100)?; + let meta_filter: std::collections::HashMap> = settings + .meta + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + let items = client.list_items(tags, "newest", 0, 100, &meta_filter)?; let output_format = settings_output_format(settings); diff --git a/src/modes/client/save.rs b/src/modes/client/save.rs index bea4c19..076b26b 100644 --- a/src/modes/client/save.rs +++ b/src/modes/client/save.rs @@ -84,7 +84,10 @@ pub fn mode( compressor.write_all(&buffer[..n])?; } - // Finalize compression (flushes any buffered compressed data) + // Explicitly flush and finalize compression before dropping. + // LZ4 FrameEncoder buffers data internally; without explicit flush, + // only the frame header (7 bytes) gets written to the pipe. + compressor.flush()?; drop(compressor); // Pipe writer is now dropped (inside compressor), signaling EOF to streamer @@ -147,6 +150,17 @@ pub fn mode( uncompressed_size.to_string(), ); + // Record client compression type so the client can decompress on retrieval. + // When compress=false, the server stores the blob as-is with compression=None. + // Without this metadata, the client would get compressed bytes back but think + // they're uncompressed. + if !matches!(compression_type, CompressionType::None) { + local_metadata.insert( + "_client_compression".to_string(), + compression_type.to_string(), + ); + } + // Add hostname if let Ok(hostname) = gethostname::gethostname().into_string() { local_metadata.insert("hostname".to_string(), hostname.clone()); diff --git a/src/modes/server/api/item.rs b/src/modes/server/api/item.rs index 8e62397..9dc52f3 100644 --- a/src/modes/server/api/item.rs +++ b/src/modes/server/api/item.rs @@ -169,9 +169,19 @@ pub async fn handle_list_items( .map(|s| parse_comma_tags(s)) .unwrap_or_default(); + // Parse metadata filter from query parameter + let meta_filter: HashMap> = if let Some(ref meta_str) = params.meta { + serde_json::from_str(meta_str).map_err(|e| { + warn!("Failed to parse meta filter JSON string: {e}"); + StatusCode::BAD_REQUEST + })? + } else { + HashMap::new() + }; + let data_service = create_data_service(&state); let mut items_with_meta = data_service - .list_items(tags, HashMap::new()) + .list_items(tags, meta_filter) .await .map_err(|e| { warn!("Failed to get items: {e}"); diff --git a/src/modes/server/common.rs b/src/modes/server/common.rs index 1de4bf7..75f4fde 100644 --- a/src/modes/server/common.rs +++ b/src/modes/server/common.rs @@ -474,6 +474,11 @@ pub struct ListItemsQuery { /// /// Unsigned integer limiting the number of items returned. pub count: Option, + /// Optional metadata filter as JSON string. + /// + /// JSON object where keys are metadata keys and values are either + /// `null` (filter by key existence) or a string (filter by exact value match). + pub meta: Option, } /// Query parameters for item retrieval.