fix: add --meta filtering support to client/server list mode
Plumb metadata filter from client CLI through the HTTP API to the server's data_service.list_items(). The server accepts a JSON-encoded meta query parameter where null values mean 'key exists' and string values mean 'exact match'. Also fix LZ4 compression round-trip for client mode: - Explicit flush FrameEncoder before drop to avoid sending only the frame header when compress=false - Send _client_compression metadata so client knows actual compression on retrieval (server records compression=None when compress=false) - Use FrameDecoder (frame format) instead of decompress_size_prepended (size-prepended format) to match server storage format
This commit is contained in:
@@ -226,6 +226,7 @@ impl KeepClient {
|
|||||||
order: &str,
|
order: &str,
|
||||||
start: u64,
|
start: u64,
|
||||||
count: u64,
|
count: u64,
|
||||||
|
meta: &HashMap<String, Option<String>>,
|
||||||
) -> Result<Vec<ItemInfo>, CoreError> {
|
) -> Result<Vec<ItemInfo>, CoreError> {
|
||||||
#[derive(serde::Deserialize)]
|
#[derive(serde::Deserialize)]
|
||||||
struct ApiResponse {
|
struct ApiResponse {
|
||||||
@@ -239,6 +240,12 @@ impl KeepClient {
|
|||||||
if !tags.is_empty() {
|
if !tags.is_empty() {
|
||||||
params.push(("tags".to_string(), tags.join(",")));
|
params.push(("tags".to_string(), tags.join(",")));
|
||||||
}
|
}
|
||||||
|
if !meta.is_empty() {
|
||||||
|
let meta_json = serde_json::to_string(meta).map_err(|e| {
|
||||||
|
CoreError::Other(anyhow::anyhow!("Failed to serialize meta filter: {}", e))
|
||||||
|
})?;
|
||||||
|
params.push(("meta".to_string(), meta_json));
|
||||||
|
}
|
||||||
|
|
||||||
let param_refs: Vec<(&str, &str)> = params
|
let param_refs: Vec<(&str, &str)> = params
|
||||||
.iter()
|
.iter()
|
||||||
|
|||||||
@@ -23,14 +23,14 @@ pub fn mode(
|
|||||||
ids[0]
|
ids[0]
|
||||||
} else if !tags.is_empty() {
|
} else if !tags.is_empty() {
|
||||||
// Find item by tags
|
// Find item by tags
|
||||||
let items = client.list_items(tags, "newest", 0, 1)?;
|
let items = client.list_items(tags, "newest", 0, 1, &std::collections::HashMap::new())?;
|
||||||
if items.is_empty() {
|
if items.is_empty() {
|
||||||
return Err(anyhow::anyhow!("No items found matching tags: {:?}", tags));
|
return Err(anyhow::anyhow!("No items found matching tags: {:?}", tags));
|
||||||
}
|
}
|
||||||
items[0].id
|
items[0].id
|
||||||
} else {
|
} else {
|
||||||
// Get latest item
|
// Get latest item
|
||||||
let items = client.list_items(&[], "newest", 0, 1)?;
|
let items = client.list_items(&[], "newest", 0, 1, &std::collections::HashMap::new())?;
|
||||||
if items.is_empty() {
|
if items.is_empty() {
|
||||||
return Err(anyhow::anyhow!("No items found"));
|
return Err(anyhow::anyhow!("No items found"));
|
||||||
}
|
}
|
||||||
@@ -60,8 +60,17 @@ pub fn mode(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decompress locally
|
// Decompress locally.
|
||||||
let compression_type = CompressionType::from_str(&compression).unwrap_or(CompressionType::None);
|
// Prefer _client_compression metadata (set by client save) over the server-reported
|
||||||
|
// compression header, because when compress=false the server stores compressed bytes
|
||||||
|
// but records compression=None.
|
||||||
|
let effective_compression = item_info
|
||||||
|
.metadata
|
||||||
|
.get("_client_compression")
|
||||||
|
.map(|s| s.as_str())
|
||||||
|
.unwrap_or(&compression);
|
||||||
|
let compression_type =
|
||||||
|
CompressionType::from_str(effective_compression).unwrap_or(CompressionType::None);
|
||||||
|
|
||||||
let decompressed = match compression_type {
|
let decompressed = match compression_type {
|
||||||
CompressionType::GZip => {
|
CompressionType::GZip => {
|
||||||
@@ -71,8 +80,13 @@ pub fn mode(
|
|||||||
decoder.read_to_end(&mut content)?;
|
decoder.read_to_end(&mut content)?;
|
||||||
content
|
content
|
||||||
}
|
}
|
||||||
CompressionType::LZ4 => lz4_flex::decompress_size_prepended(&raw_bytes)
|
CompressionType::LZ4 => {
|
||||||
.map_err(|e| anyhow::anyhow!("LZ4 decompression failed: {}", e))?,
|
use lz4_flex::frame::FrameDecoder;
|
||||||
|
let mut decoder = FrameDecoder::new(&raw_bytes[..]);
|
||||||
|
let mut content = Vec::new();
|
||||||
|
decoder.read_to_end(&mut content)?;
|
||||||
|
content
|
||||||
|
}
|
||||||
_ => raw_bytes,
|
_ => raw_bytes,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ pub fn mode(
|
|||||||
|
|
||||||
// If tags provided, find matching item first
|
// If tags provided, find matching item first
|
||||||
let item_ids: Vec<i64> = if !tags.is_empty() {
|
let item_ids: Vec<i64> = if !tags.is_empty() {
|
||||||
let items = client.list_items(tags, "newest", 0, 1)?;
|
let items = client.list_items(tags, "newest", 0, 1, &std::collections::HashMap::new())?;
|
||||||
if items.is_empty() {
|
if items.is_empty() {
|
||||||
return Err(anyhow::anyhow!("No items found matching tags: {:?}", tags));
|
return Err(anyhow::anyhow!("No items found matching tags: {:?}", tags));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,7 +11,12 @@ pub fn mode(
|
|||||||
) -> Result<(), anyhow::Error> {
|
) -> Result<(), anyhow::Error> {
|
||||||
debug!("CLIENT_LIST: Listing items via remote server");
|
debug!("CLIENT_LIST: Listing items via remote server");
|
||||||
|
|
||||||
let items = client.list_items(tags, "newest", 0, 100)?;
|
let meta_filter: std::collections::HashMap<String, Option<String>> = settings
|
||||||
|
.meta
|
||||||
|
.iter()
|
||||||
|
.map(|(k, v)| (k.clone(), v.clone()))
|
||||||
|
.collect();
|
||||||
|
let items = client.list_items(tags, "newest", 0, 100, &meta_filter)?;
|
||||||
|
|
||||||
let output_format = settings_output_format(settings);
|
let output_format = settings_output_format(settings);
|
||||||
|
|
||||||
|
|||||||
@@ -84,7 +84,10 @@ pub fn mode(
|
|||||||
compressor.write_all(&buffer[..n])?;
|
compressor.write_all(&buffer[..n])?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Finalize compression (flushes any buffered compressed data)
|
// Explicitly flush and finalize compression before dropping.
|
||||||
|
// LZ4 FrameEncoder buffers data internally; without explicit flush,
|
||||||
|
// only the frame header (7 bytes) gets written to the pipe.
|
||||||
|
compressor.flush()?;
|
||||||
drop(compressor);
|
drop(compressor);
|
||||||
|
|
||||||
// Pipe writer is now dropped (inside compressor), signaling EOF to streamer
|
// Pipe writer is now dropped (inside compressor), signaling EOF to streamer
|
||||||
@@ -147,6 +150,17 @@ pub fn mode(
|
|||||||
uncompressed_size.to_string(),
|
uncompressed_size.to_string(),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Record client compression type so the client can decompress on retrieval.
|
||||||
|
// When compress=false, the server stores the blob as-is with compression=None.
|
||||||
|
// Without this metadata, the client would get compressed bytes back but think
|
||||||
|
// they're uncompressed.
|
||||||
|
if !matches!(compression_type, CompressionType::None) {
|
||||||
|
local_metadata.insert(
|
||||||
|
"_client_compression".to_string(),
|
||||||
|
compression_type.to_string(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
// Add hostname
|
// Add hostname
|
||||||
if let Ok(hostname) = gethostname::gethostname().into_string() {
|
if let Ok(hostname) = gethostname::gethostname().into_string() {
|
||||||
local_metadata.insert("hostname".to_string(), hostname.clone());
|
local_metadata.insert("hostname".to_string(), hostname.clone());
|
||||||
|
|||||||
@@ -169,9 +169,19 @@ pub async fn handle_list_items(
|
|||||||
.map(|s| parse_comma_tags(s))
|
.map(|s| parse_comma_tags(s))
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
// Parse metadata filter from query parameter
|
||||||
|
let meta_filter: HashMap<String, Option<String>> = if let Some(ref meta_str) = params.meta {
|
||||||
|
serde_json::from_str(meta_str).map_err(|e| {
|
||||||
|
warn!("Failed to parse meta filter JSON string: {e}");
|
||||||
|
StatusCode::BAD_REQUEST
|
||||||
|
})?
|
||||||
|
} else {
|
||||||
|
HashMap::new()
|
||||||
|
};
|
||||||
|
|
||||||
let data_service = create_data_service(&state);
|
let data_service = create_data_service(&state);
|
||||||
let mut items_with_meta = data_service
|
let mut items_with_meta = data_service
|
||||||
.list_items(tags, HashMap::new())
|
.list_items(tags, meta_filter)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| {
|
.map_err(|e| {
|
||||||
warn!("Failed to get items: {e}");
|
warn!("Failed to get items: {e}");
|
||||||
|
|||||||
@@ -474,6 +474,11 @@ pub struct ListItemsQuery {
|
|||||||
///
|
///
|
||||||
/// Unsigned integer limiting the number of items returned.
|
/// Unsigned integer limiting the number of items returned.
|
||||||
pub count: Option<u32>,
|
pub count: Option<u32>,
|
||||||
|
/// Optional metadata filter as JSON string.
|
||||||
|
///
|
||||||
|
/// JSON object where keys are metadata keys and values are either
|
||||||
|
/// `null` (filter by key existence) or a string (filter by exact value match).
|
||||||
|
pub meta: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Query parameters for item retrieval.
|
/// Query parameters for item retrieval.
|
||||||
|
|||||||
Reference in New Issue
Block a user