From eca17b36eec347b73cd7bd290524c3a60ea67303 Mon Sep 17 00:00:00 2001 From: Andrew Phillips Date: Sun, 15 Mar 2026 10:14:55 -0300 Subject: [PATCH] fix: client save logs item ID early, stores compression via proper field and size via update endpoint - Client save now logs 'New item: {id}' immediately after server response - Compression type sent as query param, stored in DB compression field (not _client_compression metadata) - Client set_item_size() sends uncompressed size via POST /api/item/{id}/update?size=N - Server raw content GET uses actual file size for Content-Length (not uncompressed item.size) - Removed _client_compression metadata hack from client save and get - Fixed server handle_update_item to support size-only updates - Fixed clippy: collapsible_if, too_many_arguments, unnecessary mut refs - Fixed ListItemsQuery doctest missing meta field --- src/client.rs | 15 +++++ src/modes/client/get.rs | 13 +--- src/modes/client/save.rs | 36 ++++++----- src/modes/server/api/item.rs | 102 ++++++++++++++++++++++++------ src/modes/server/api/mod.rs | 2 +- src/modes/server/common.rs | 7 ++ src/services/sync_data_service.rs | 29 ++++++--- 7 files changed, 145 insertions(+), 59 deletions(-) diff --git a/src/client.rs b/src/client.rs index 1d5fb87..1c04af7 100644 --- a/src/client.rs +++ b/src/client.rs @@ -341,6 +341,21 @@ impl KeepClient { Ok(()) } + /// Set the uncompressed size for an item. + pub fn set_item_size(&self, id: i64, size: u64) -> Result<(), CoreError> { + let url = format!( + "{}?size={}", + self.url(&format!("/api/item/{id}/update")), + size + ); + let mut req = self.agent.post(&url); + if let Some(ref auth) = self.auth_header() { + req = req.header("Authorization", auth); + } + self.handle_error(req.send(ureq::SendBody::from_reader(&mut std::io::empty())))?; + Ok(()) + } + pub fn get_item_content_raw(&self, id: i64) -> Result<(Vec, String), CoreError> { let url = format!( "{}?decompress=false", diff --git a/src/modes/client/get.rs b/src/modes/client/get.rs index e9acb8c..53b63ab 100644 --- a/src/modes/client/get.rs +++ b/src/modes/client/get.rs @@ -60,17 +60,8 @@ pub fn mode( } } - // Decompress locally. - // Prefer _client_compression metadata (set by client save) over the server-reported - // compression header, because when compress=false the server stores compressed bytes - // but records compression=None. - let effective_compression = item_info - .metadata - .get("_client_compression") - .map(|s| s.as_str()) - .unwrap_or(&compression); - let compression_type = - CompressionType::from_str(effective_compression).unwrap_or(CompressionType::None); + // Decompress locally using the server-reported compression type + let compression_type = CompressionType::from_str(&compression).unwrap_or(CompressionType::None); let decompressed = match compression_type { CompressionType::GZip => { diff --git a/src/modes/client/save.rs b/src/modes/client/save.rs index e42d06b..9245028 100644 --- a/src/modes/client/save.rs +++ b/src/modes/client/save.rs @@ -37,6 +37,7 @@ pub fn mode( // Determine compression type from settings let compression_type = settings_compression_type(cmd, settings); + let compression_type_str = compression_type.to_string(); let server_compress = matches!(compression_type, CompressionType::None); // Shared metadata collection: plugins write here via save_meta closure @@ -72,8 +73,8 @@ pub fn mode( // Wrap pipe writer with appropriate compression let mut compressor: Box = match compression_type_clone { CompressionType::GZip => { - use flate2::Compression; use flate2::write::GzEncoder; + use flate2::Compression; Box::new(GzEncoder::new(pipe_writer, Compression::default())) } CompressionType::LZ4 => Box::new(lz4_flex::frame::FrameEncoder::new(pipe_writer)), @@ -114,6 +115,7 @@ pub fn mode( let client_password = client.password().cloned(); let client_jwt = client.jwt().cloned(); let tags_clone = tags.clone(); + let compression_type_str_clone = compression_type_str.clone(); let streamer_handle = std::thread::spawn(move || -> Result { let streaming_client = @@ -122,7 +124,18 @@ pub fn mode( ("compress".to_string(), server_compress.to_string()), ("meta".to_string(), "false".to_string()), ("tags".to_string(), tags_clone.join(",")), + ( + "compression_type".to_string(), + if !server_compress { + compression_type_str_clone + } else { + String::new() + }, + ), ]; + // Filter out empty params + let params: Vec<(String, String)> = + params.into_iter().filter(|(_, v)| !v.is_empty()).collect(); let param_refs: Vec<(&str, &str)> = params .iter() .map(|(k, v)| (k.as_str(), v.as_str())) @@ -153,31 +166,20 @@ pub fn mode( } } - // Add uncompressed_size (always tracked by client) - local_metadata.insert( - "uncompressed_size".to_string(), - uncompressed_size.to_string(), - ); - - // Record client compression type so the client can decompress on retrieval. - if !matches!(compression_type, CompressionType::None) { - local_metadata.insert( - "_client_compression".to_string(), - compression_type.to_string(), - ); - } + // Send uncompressed size to server (proper field, not metadata) + client.set_item_size(item_info.id, uncompressed_size)?; // Send metadata to server if !local_metadata.is_empty() { client.post_metadata(item_info.id, &local_metadata)?; } - // Print status to stderr + // Print status to stderr (item ID is known immediately from server response) if !settings.quiet { if std::io::stderr().is_terminal() { - eprintln!("KEEP: New item (streaming) tags: {}", tags.join(" ")); + eprintln!("KEEP: New item: {} tags: {}", item_info.id, tags.join(" ")); } else { - eprintln!("KEEP: New item (streaming) tags: {tags:?}"); + eprintln!("KEEP: New item: {} tags: {tags:?}", item_info.id); } } diff --git a/src/modes/server/api/item.rs b/src/modes/server/api/item.rs index e493501..be7e142 100644 --- a/src/modes/server/api/item.rs +++ b/src/modes/server/api/item.rs @@ -467,6 +467,14 @@ pub async fn handle_post_item( let compress = params.compress; let run_meta = params.meta; + // Parse client-specified compression type (only used when compress=false) + let client_compression_type = params.compression_type.as_deref().map(|ct| { + ct.parse::().unwrap_or_else(|_| { + warn!("Unknown compression type from client: {ct}, defaulting to none"); + crate::compression_engine::CompressionType::None + }) + }); + // Stream body through an mpsc channel with fixed-size frames. // Size tracking ensures we never buffer the whole body in memory. // Treat Some(0) as unlimited (None). @@ -524,6 +532,7 @@ pub async fn handle_post_item( metadata, compress, run_meta, + client_compression_type, ) }) .await @@ -797,7 +806,6 @@ async fn stream_raw_content_response( })?; let compression = item_with_meta.item.compression.clone(); - let content_size = item_with_meta.item.size.unwrap_or(0); // Get streaming reader for raw content let reader = data_service @@ -808,8 +816,17 @@ async fn stream_raw_content_response( StatusCode::INTERNAL_SERVER_ERROR })?; + // Get the actual file size on disk (raw bytes, not uncompressed size) + let file_size = { + let mut data_path = data_service.data_path().clone(); + data_path.push(item_id.to_string()); + std::fs::metadata(&data_path) + .map(|m| m.len()) + .unwrap_or(0) + }; + // Calculate the actual response length - let content_len = content_size as u64; + let content_len = file_size; let start = std::cmp::min(offset, content_len); let end = if length > 0 { std::cmp::min(start + length, content_len) @@ -1445,10 +1462,6 @@ pub async fn handle_update_item( Path(item_id): Path, Query(params): Query, ) -> Result>, StatusCode> { - let db = state.db.clone(); - let data_dir = state.data_dir.clone(); - let settings = state.settings.clone(); - // Parse plugin names let plugin_names: Vec = params .plugins @@ -1478,27 +1491,74 @@ pub async fn handle_update_item( .map(crate::services::utils::parse_comma_tags) .unwrap_or_default(); - // Run in blocking task since we do file I/O and DB access - let result = task::spawn_blocking(move || -> Result { - let mut conn = db - .lock() - .map_err(|e| CoreError::Other(anyhow::anyhow!("Failed to acquire DB lock: {e}")))?; + let mut conn = state.db.lock().await; - let sync_service = - crate::services::sync_data_service::SyncDataService::new(data_dir, (*settings).clone()); + let sync_service = crate::services::sync_data_service::SyncDataService::new( + state.data_dir.clone(), + state.settings.as_ref().clone(), + ); - sync_service.update_item_plugins(&mut conn, item_id, &plugin_names, metadata, &tags) - }) - .await - .map_err(|e| { - warn!("Blocking task failed: {e}"); - StatusCode::INTERNAL_SERVER_ERROR - })?; + // If only size is being set (no plugins/metadata/tags), do a lightweight update + #[allow(clippy::collapsible_if)] + if let Some(size) = params.size { + if plugin_names.is_empty() && metadata.is_empty() && tags.is_empty() { + return match crate::db::get_item(&conn, item_id) { + Ok(Some(mut item)) => { + item.size = Some(size); + if let Err(e) = crate::db::update_item(&conn, item) { + warn!("Failed to update item size: {e}"); + return Err(StatusCode::INTERNAL_SERVER_ERROR); + } + match sync_service.get_item(&mut conn, item_id) { + Ok(iwm) => { + let tags: Vec = + iwm.tags.iter().map(|t| t.name.clone()).collect(); + let metadata = iwm.meta_as_map(); + Ok(Json(ApiResponse { + success: true, + data: Some(ItemInfo { + id: item_id, + ts: iwm.item.ts.to_rfc3339(), + size: iwm.item.size, + compression: iwm.item.compression.clone(), + tags, + metadata, + }), + error: None, + })) + } + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), + } + } + Ok(None) => Err(StatusCode::NOT_FOUND), + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), + }; + } + } + + let result = + sync_service.update_item_plugins(&mut conn, item_id, &plugin_names, metadata, &tags); match result { Ok(item_with_meta) => { - let item_info: ItemInfo = item_with_meta.into(); + let tags: Vec = + item_with_meta.tags.iter().map(|t| t.name.clone()).collect(); + let metadata = item_with_meta.meta_as_map(); + + let item_info = ItemInfo { + id: item_with_meta.item.id.ok_or_else(|| { + warn!("Item missing ID"); + StatusCode::INTERNAL_SERVER_ERROR + })?, + ts: item_with_meta.item.ts.to_rfc3339(), + size: item_with_meta.item.size, + compression: item_with_meta.item.compression.clone(), + tags, + metadata, + }; + Ok(Json(ApiResponse { + success: true, data: Some(item_info), error: None, })) diff --git a/src/modes/server/api/mod.rs b/src/modes/server/api/mod.rs index 7a78453..7b28607 100644 --- a/src/modes/server/api/mod.rs +++ b/src/modes/server/api/mod.rs @@ -4,7 +4,7 @@ pub mod status; use axum::{ Router, - routing::{delete, get}, + routing::{delete, get, post}, }; use crate::modes::server::common::AppState; diff --git a/src/modes/server/common.rs b/src/modes/server/common.rs index be62a51..e3951c3 100644 --- a/src/modes/server/common.rs +++ b/src/modes/server/common.rs @@ -454,6 +454,7 @@ pub struct TagsQuery { /// order: Some("newest".to_string()), /// start: Some(0), /// count: Some(10), +/// meta: None, /// }; /// ``` #[derive(Debug, Deserialize)] @@ -639,6 +640,10 @@ pub struct CreateItemQuery { /// Set to false when the client has already collected metadata. #[serde(default = "default_true")] pub meta: bool, + /// Compression type used by the client (e.g. "lz4", "gzip"). + /// Only used when compress=false — tells the server what compression + /// the client applied so the correct type is recorded in the database. + pub compression_type: Option, } /// Query parameters for updating item metadata via POST. @@ -654,6 +659,8 @@ pub struct UpdateItemQuery { pub metadata: Option, /// Optional comma-separated tags to add. pub tags: Option, + /// Optional uncompressed size to set on the item. + pub size: Option, } /// Request body for creating a new item. diff --git a/src/services/sync_data_service.rs b/src/services/sync_data_service.rs index 019dbf1..ec22b57 100644 --- a/src/services/sync_data_service.rs +++ b/src/services/sync_data_service.rs @@ -84,7 +84,7 @@ impl SyncDataService { run_meta: bool, ) -> Result { let mut cursor = Cursor::new(content); - self.save_item_raw_streaming(conn, &mut cursor, tags, metadata, compress, run_meta) + self.save_item_raw_streaming(conn, &mut cursor, tags, metadata, compress, run_meta, None) } /// Save an item from a streaming reader with granular control over compression. @@ -92,6 +92,7 @@ impl SyncDataService { /// Unlike `save_item_raw` which takes a pre-buffered `&[u8]`, this method /// reads from the reader in chunks and writes directly to the compression /// engine, avoiding buffering the entire content in memory. + #[allow(clippy::too_many_arguments)] pub fn save_item_raw_streaming( &self, conn: &mut Connection, @@ -100,6 +101,7 @@ impl SyncDataService { metadata: HashMap, compress: bool, run_meta: bool, + client_compression_type: Option, ) -> Result { let mut cmd = Command::new("keep"); let settings = &self.settings; @@ -107,18 +109,25 @@ impl SyncDataService { crate::modes::common::ensure_default_tag(&mut tags); - let compression_type = if compress { - settings_compression_type(&mut cmd, settings) + // Determine compression type for DB record and for the file writer. + // When compress=true: server compresses using its configured engine. + // When compress=false: client already compressed — write raw bytes to disk + // but record the client's compression type in the DB. + let (compression_type_for_db, compression_engine) = if compress { + let ct = settings_compression_type(&mut cmd, settings); + let engine = get_compression_engine(ct.clone())?; + (ct, engine) } else { - CompressionType::None + // Client already compressed — write raw (no engine), record actual type + let ct = client_compression_type.unwrap_or(CompressionType::None); + let engine = get_compression_engine(CompressionType::None)?; + (ct, engine) }; - let compression_engine = get_compression_engine(compression_type.clone())?; - let item_id; let mut item; { - item = crate::db::create_item(conn, compression_type.clone())?; + item = crate::db::create_item(conn, compression_type_for_db.clone())?; item_id = item .id .ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?; @@ -180,9 +189,11 @@ impl SyncDataService { } } - // Add client-provided metadata + // Add client-provided metadata (excluding internal fields) for (key, value) in &metadata { - crate::db::add_meta(conn, item_id, key, value)?; + if key != "uncompressed_size" { + crate::db::add_meta(conn, item_id, key, value)?; + } } item.size = Some(total_bytes);