fix: client save logs item ID early, stores compression via proper field and size via update endpoint

- Client save now logs 'New item: {id}' immediately after server response
- Compression type sent as query param, stored in DB compression field (not _client_compression metadata)
- Client set_item_size() sends uncompressed size via POST /api/item/{id}/update?size=N
- Server raw content GET uses actual file size for Content-Length (not uncompressed item.size)
- Removed _client_compression metadata hack from client save and get
- Fixed server handle_update_item to support size-only updates
- Fixed clippy: collapsible_if, too_many_arguments, unnecessary mut refs
- Fixed ListItemsQuery doctest missing meta field
This commit is contained in:
2026-03-15 10:14:55 -03:00
parent 5bad7ac7a6
commit eca17b36ee
7 changed files with 145 additions and 59 deletions

View File

@@ -341,6 +341,21 @@ impl KeepClient {
Ok(()) Ok(())
} }
/// Set the uncompressed size for an item.
pub fn set_item_size(&self, id: i64, size: u64) -> Result<(), CoreError> {
let url = format!(
"{}?size={}",
self.url(&format!("/api/item/{id}/update")),
size
);
let mut req = self.agent.post(&url);
if let Some(ref auth) = self.auth_header() {
req = req.header("Authorization", auth);
}
self.handle_error(req.send(ureq::SendBody::from_reader(&mut std::io::empty())))?;
Ok(())
}
pub fn get_item_content_raw(&self, id: i64) -> Result<(Vec<u8>, String), CoreError> { pub fn get_item_content_raw(&self, id: i64) -> Result<(Vec<u8>, String), CoreError> {
let url = format!( let url = format!(
"{}?decompress=false", "{}?decompress=false",

View File

@@ -60,17 +60,8 @@ pub fn mode(
} }
} }
// Decompress locally. // Decompress locally using the server-reported compression type
// Prefer _client_compression metadata (set by client save) over the server-reported let compression_type = CompressionType::from_str(&compression).unwrap_or(CompressionType::None);
// compression header, because when compress=false the server stores compressed bytes
// but records compression=None.
let effective_compression = item_info
.metadata
.get("_client_compression")
.map(|s| s.as_str())
.unwrap_or(&compression);
let compression_type =
CompressionType::from_str(effective_compression).unwrap_or(CompressionType::None);
let decompressed = match compression_type { let decompressed = match compression_type {
CompressionType::GZip => { CompressionType::GZip => {

View File

@@ -37,6 +37,7 @@ pub fn mode(
// Determine compression type from settings // Determine compression type from settings
let compression_type = settings_compression_type(cmd, settings); let compression_type = settings_compression_type(cmd, settings);
let compression_type_str = compression_type.to_string();
let server_compress = matches!(compression_type, CompressionType::None); let server_compress = matches!(compression_type, CompressionType::None);
// Shared metadata collection: plugins write here via save_meta closure // Shared metadata collection: plugins write here via save_meta closure
@@ -72,8 +73,8 @@ pub fn mode(
// Wrap pipe writer with appropriate compression // Wrap pipe writer with appropriate compression
let mut compressor: Box<dyn Write> = match compression_type_clone { let mut compressor: Box<dyn Write> = match compression_type_clone {
CompressionType::GZip => { CompressionType::GZip => {
use flate2::Compression;
use flate2::write::GzEncoder; use flate2::write::GzEncoder;
use flate2::Compression;
Box::new(GzEncoder::new(pipe_writer, Compression::default())) Box::new(GzEncoder::new(pipe_writer, Compression::default()))
} }
CompressionType::LZ4 => Box::new(lz4_flex::frame::FrameEncoder::new(pipe_writer)), CompressionType::LZ4 => Box::new(lz4_flex::frame::FrameEncoder::new(pipe_writer)),
@@ -114,6 +115,7 @@ pub fn mode(
let client_password = client.password().cloned(); let client_password = client.password().cloned();
let client_jwt = client.jwt().cloned(); let client_jwt = client.jwt().cloned();
let tags_clone = tags.clone(); let tags_clone = tags.clone();
let compression_type_str_clone = compression_type_str.clone();
let streamer_handle = std::thread::spawn(move || -> Result<ItemInfo> { let streamer_handle = std::thread::spawn(move || -> Result<ItemInfo> {
let streaming_client = let streaming_client =
@@ -122,7 +124,18 @@ pub fn mode(
("compress".to_string(), server_compress.to_string()), ("compress".to_string(), server_compress.to_string()),
("meta".to_string(), "false".to_string()), ("meta".to_string(), "false".to_string()),
("tags".to_string(), tags_clone.join(",")), ("tags".to_string(), tags_clone.join(",")),
(
"compression_type".to_string(),
if !server_compress {
compression_type_str_clone
} else {
String::new()
},
),
]; ];
// Filter out empty params
let params: Vec<(String, String)> =
params.into_iter().filter(|(_, v)| !v.is_empty()).collect();
let param_refs: Vec<(&str, &str)> = params let param_refs: Vec<(&str, &str)> = params
.iter() .iter()
.map(|(k, v)| (k.as_str(), v.as_str())) .map(|(k, v)| (k.as_str(), v.as_str()))
@@ -153,31 +166,20 @@ pub fn mode(
} }
} }
// Add uncompressed_size (always tracked by client) // Send uncompressed size to server (proper field, not metadata)
local_metadata.insert( client.set_item_size(item_info.id, uncompressed_size)?;
"uncompressed_size".to_string(),
uncompressed_size.to_string(),
);
// Record client compression type so the client can decompress on retrieval.
if !matches!(compression_type, CompressionType::None) {
local_metadata.insert(
"_client_compression".to_string(),
compression_type.to_string(),
);
}
// Send metadata to server // Send metadata to server
if !local_metadata.is_empty() { if !local_metadata.is_empty() {
client.post_metadata(item_info.id, &local_metadata)?; client.post_metadata(item_info.id, &local_metadata)?;
} }
// Print status to stderr // Print status to stderr (item ID is known immediately from server response)
if !settings.quiet { if !settings.quiet {
if std::io::stderr().is_terminal() { if std::io::stderr().is_terminal() {
eprintln!("KEEP: New item (streaming) tags: {}", tags.join(" ")); eprintln!("KEEP: New item: {} tags: {}", item_info.id, tags.join(" "));
} else { } else {
eprintln!("KEEP: New item (streaming) tags: {tags:?}"); eprintln!("KEEP: New item: {} tags: {tags:?}", item_info.id);
} }
} }

View File

@@ -467,6 +467,14 @@ pub async fn handle_post_item(
let compress = params.compress; let compress = params.compress;
let run_meta = params.meta; let run_meta = params.meta;
// Parse client-specified compression type (only used when compress=false)
let client_compression_type = params.compression_type.as_deref().map(|ct| {
ct.parse::<crate::compression_engine::CompressionType>().unwrap_or_else(|_| {
warn!("Unknown compression type from client: {ct}, defaulting to none");
crate::compression_engine::CompressionType::None
})
});
// Stream body through an mpsc channel with fixed-size frames. // Stream body through an mpsc channel with fixed-size frames.
// Size tracking ensures we never buffer the whole body in memory. // Size tracking ensures we never buffer the whole body in memory.
// Treat Some(0) as unlimited (None). // Treat Some(0) as unlimited (None).
@@ -524,6 +532,7 @@ pub async fn handle_post_item(
metadata, metadata,
compress, compress,
run_meta, run_meta,
client_compression_type,
) )
}) })
.await .await
@@ -797,7 +806,6 @@ async fn stream_raw_content_response(
})?; })?;
let compression = item_with_meta.item.compression.clone(); let compression = item_with_meta.item.compression.clone();
let content_size = item_with_meta.item.size.unwrap_or(0);
// Get streaming reader for raw content // Get streaming reader for raw content
let reader = data_service let reader = data_service
@@ -808,8 +816,17 @@ async fn stream_raw_content_response(
StatusCode::INTERNAL_SERVER_ERROR StatusCode::INTERNAL_SERVER_ERROR
})?; })?;
// Get the actual file size on disk (raw bytes, not uncompressed size)
let file_size = {
let mut data_path = data_service.data_path().clone();
data_path.push(item_id.to_string());
std::fs::metadata(&data_path)
.map(|m| m.len())
.unwrap_or(0)
};
// Calculate the actual response length // Calculate the actual response length
let content_len = content_size as u64; let content_len = file_size;
let start = std::cmp::min(offset, content_len); let start = std::cmp::min(offset, content_len);
let end = if length > 0 { let end = if length > 0 {
std::cmp::min(start + length, content_len) std::cmp::min(start + length, content_len)
@@ -1445,10 +1462,6 @@ pub async fn handle_update_item(
Path(item_id): Path<i64>, Path(item_id): Path<i64>,
Query(params): Query<crate::modes::server::common::UpdateItemQuery>, Query(params): Query<crate::modes::server::common::UpdateItemQuery>,
) -> Result<Json<ApiResponse<ItemInfo>>, StatusCode> { ) -> Result<Json<ApiResponse<ItemInfo>>, StatusCode> {
let db = state.db.clone();
let data_dir = state.data_dir.clone();
let settings = state.settings.clone();
// Parse plugin names // Parse plugin names
let plugin_names: Vec<String> = params let plugin_names: Vec<String> = params
.plugins .plugins
@@ -1478,27 +1491,74 @@ pub async fn handle_update_item(
.map(crate::services::utils::parse_comma_tags) .map(crate::services::utils::parse_comma_tags)
.unwrap_or_default(); .unwrap_or_default();
// Run in blocking task since we do file I/O and DB access let mut conn = state.db.lock().await;
let result = task::spawn_blocking(move || -> Result<ItemInfo, CoreError> {
let mut conn = db
.lock()
.map_err(|e| CoreError::Other(anyhow::anyhow!("Failed to acquire DB lock: {e}")))?;
let sync_service = let sync_service = crate::services::sync_data_service::SyncDataService::new(
crate::services::sync_data_service::SyncDataService::new(data_dir, (*settings).clone()); state.data_dir.clone(),
state.settings.as_ref().clone(),
);
sync_service.update_item_plugins(&mut conn, item_id, &plugin_names, metadata, &tags) // If only size is being set (no plugins/metadata/tags), do a lightweight update
}) #[allow(clippy::collapsible_if)]
.await if let Some(size) = params.size {
.map_err(|e| { if plugin_names.is_empty() && metadata.is_empty() && tags.is_empty() {
warn!("Blocking task failed: {e}"); return match crate::db::get_item(&conn, item_id) {
StatusCode::INTERNAL_SERVER_ERROR Ok(Some(mut item)) => {
})?; item.size = Some(size);
if let Err(e) = crate::db::update_item(&conn, item) {
warn!("Failed to update item size: {e}");
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}
match sync_service.get_item(&mut conn, item_id) {
Ok(iwm) => {
let tags: Vec<String> =
iwm.tags.iter().map(|t| t.name.clone()).collect();
let metadata = iwm.meta_as_map();
Ok(Json(ApiResponse {
success: true,
data: Some(ItemInfo {
id: item_id,
ts: iwm.item.ts.to_rfc3339(),
size: iwm.item.size,
compression: iwm.item.compression.clone(),
tags,
metadata,
}),
error: None,
}))
}
Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
}
}
Ok(None) => Err(StatusCode::NOT_FOUND),
Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
};
}
}
let result =
sync_service.update_item_plugins(&mut conn, item_id, &plugin_names, metadata, &tags);
match result { match result {
Ok(item_with_meta) => { Ok(item_with_meta) => {
let item_info: ItemInfo = item_with_meta.into(); let tags: Vec<String> =
item_with_meta.tags.iter().map(|t| t.name.clone()).collect();
let metadata = item_with_meta.meta_as_map();
let item_info = ItemInfo {
id: item_with_meta.item.id.ok_or_else(|| {
warn!("Item missing ID");
StatusCode::INTERNAL_SERVER_ERROR
})?,
ts: item_with_meta.item.ts.to_rfc3339(),
size: item_with_meta.item.size,
compression: item_with_meta.item.compression.clone(),
tags,
metadata,
};
Ok(Json(ApiResponse { Ok(Json(ApiResponse {
success: true,
data: Some(item_info), data: Some(item_info),
error: None, error: None,
})) }))

View File

@@ -4,7 +4,7 @@ pub mod status;
use axum::{ use axum::{
Router, Router,
routing::{delete, get}, routing::{delete, get, post},
}; };
use crate::modes::server::common::AppState; use crate::modes::server::common::AppState;

View File

@@ -454,6 +454,7 @@ pub struct TagsQuery {
/// order: Some("newest".to_string()), /// order: Some("newest".to_string()),
/// start: Some(0), /// start: Some(0),
/// count: Some(10), /// count: Some(10),
/// meta: None,
/// }; /// };
/// ``` /// ```
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
@@ -639,6 +640,10 @@ pub struct CreateItemQuery {
/// Set to false when the client has already collected metadata. /// Set to false when the client has already collected metadata.
#[serde(default = "default_true")] #[serde(default = "default_true")]
pub meta: bool, pub meta: bool,
/// Compression type used by the client (e.g. "lz4", "gzip").
/// Only used when compress=false — tells the server what compression
/// the client applied so the correct type is recorded in the database.
pub compression_type: Option<String>,
} }
/// Query parameters for updating item metadata via POST. /// Query parameters for updating item metadata via POST.
@@ -654,6 +659,8 @@ pub struct UpdateItemQuery {
pub metadata: Option<String>, pub metadata: Option<String>,
/// Optional comma-separated tags to add. /// Optional comma-separated tags to add.
pub tags: Option<String>, pub tags: Option<String>,
/// Optional uncompressed size to set on the item.
pub size: Option<i64>,
} }
/// Request body for creating a new item. /// Request body for creating a new item.

View File

@@ -84,7 +84,7 @@ impl SyncDataService {
run_meta: bool, run_meta: bool,
) -> Result<ItemWithMeta, CoreError> { ) -> Result<ItemWithMeta, CoreError> {
let mut cursor = Cursor::new(content); let mut cursor = Cursor::new(content);
self.save_item_raw_streaming(conn, &mut cursor, tags, metadata, compress, run_meta) self.save_item_raw_streaming(conn, &mut cursor, tags, metadata, compress, run_meta, None)
} }
/// Save an item from a streaming reader with granular control over compression. /// Save an item from a streaming reader with granular control over compression.
@@ -92,6 +92,7 @@ impl SyncDataService {
/// Unlike `save_item_raw` which takes a pre-buffered `&[u8]`, this method /// Unlike `save_item_raw` which takes a pre-buffered `&[u8]`, this method
/// reads from the reader in chunks and writes directly to the compression /// reads from the reader in chunks and writes directly to the compression
/// engine, avoiding buffering the entire content in memory. /// engine, avoiding buffering the entire content in memory.
#[allow(clippy::too_many_arguments)]
pub fn save_item_raw_streaming( pub fn save_item_raw_streaming(
&self, &self,
conn: &mut Connection, conn: &mut Connection,
@@ -100,6 +101,7 @@ impl SyncDataService {
metadata: HashMap<String, String>, metadata: HashMap<String, String>,
compress: bool, compress: bool,
run_meta: bool, run_meta: bool,
client_compression_type: Option<CompressionType>,
) -> Result<ItemWithMeta, CoreError> { ) -> Result<ItemWithMeta, CoreError> {
let mut cmd = Command::new("keep"); let mut cmd = Command::new("keep");
let settings = &self.settings; let settings = &self.settings;
@@ -107,18 +109,25 @@ impl SyncDataService {
crate::modes::common::ensure_default_tag(&mut tags); crate::modes::common::ensure_default_tag(&mut tags);
let compression_type = if compress { // Determine compression type for DB record and for the file writer.
settings_compression_type(&mut cmd, settings) // When compress=true: server compresses using its configured engine.
// When compress=false: client already compressed — write raw bytes to disk
// but record the client's compression type in the DB.
let (compression_type_for_db, compression_engine) = if compress {
let ct = settings_compression_type(&mut cmd, settings);
let engine = get_compression_engine(ct.clone())?;
(ct, engine)
} else { } else {
CompressionType::None // Client already compressed — write raw (no engine), record actual type
let ct = client_compression_type.unwrap_or(CompressionType::None);
let engine = get_compression_engine(CompressionType::None)?;
(ct, engine)
}; };
let compression_engine = get_compression_engine(compression_type.clone())?;
let item_id; let item_id;
let mut item; let mut item;
{ {
item = crate::db::create_item(conn, compression_type.clone())?; item = crate::db::create_item(conn, compression_type_for_db.clone())?;
item_id = item item_id = item
.id .id
.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?; .ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
@@ -180,10 +189,12 @@ impl SyncDataService {
} }
} }
// Add client-provided metadata // Add client-provided metadata (excluding internal fields)
for (key, value) in &metadata { for (key, value) in &metadata {
if key != "uncompressed_size" {
crate::db::add_meta(conn, item_id, key, value)?; crate::db::add_meta(conn, item_id, key, value)?;
} }
}
item.size = Some(total_bytes); item.size = Some(total_bytes);
crate::db::update_item(conn, item)?; crate::db::update_item(conn, item)?;