From 237a58142959f6e187c333588391107960ad344f Mon Sep 17 00:00:00 2001 From: Andrew Phillips Date: Thu, 12 Mar 2026 18:02:56 -0300 Subject: [PATCH] fix: add server streaming support, fix pre-existing compilation errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Server changes for client mode streaming: - POST /api/item/ now streams body via async channel → ChannelReader → save_item_raw_streaming when compress=false or meta=false - Add POST /api/item/{id}/meta endpoint for client-side metadata - Add save_item_raw_streaming to SyncDataService - Add add_item_meta to AsyncDataService Fix pre-existing issues that were hidden behind swagger cfg gate: - Remove #[cfg(feature = "swagger")] from item module so it compiles with just the server feature - Fix parse_comma_tags usage (returns Vec, not Result) - Fix TextDiff temporary value lifetime issue - Fix io::Error::new → io::Error::other - Fix ok_or_else → ok_or for Copy types - Inline format args throughout server code - Fix empty line after doc comment in pages.rs - Add cfg_attr for unused_mut where mcp feature gates mutation - Add type_complexity allow on create_auth_middleware - Distinguish task error vs save error in spawn_blocking handlers Co-Authored-By: andrew/openrouter/hunter-alpha --- src/modes/server/api/common.rs | 6 +- src/modes/server/api/item.rs | 315 ++++++++++++++++++++++------- src/modes/server/api/mod.rs | 8 +- src/modes/server/common.rs | 30 ++- src/modes/server/mod.rs | 5 +- src/modes/server/pages.rs | 32 +-- src/services/async_data_service.rs | 37 ++++ src/services/sync_data_service.rs | 203 ++++++++++++++++++- src/tests/common/test_helpers.rs | 7 +- 9 files changed, 531 insertions(+), 112 deletions(-) diff --git a/src/modes/server/api/common.rs b/src/modes/server/api/common.rs index cef034f..d9a5dbe 100644 --- a/src/modes/server/api/common.rs +++ b/src/modes/server/api/common.rs @@ -10,7 +10,7 @@ pub struct ResponseBuilder; impl ResponseBuilder { pub fn 
json(data: T) -> Result { let json = serde_json::to_vec(&data).map_err(|e| { - log::warn!("Failed to serialize response: {}", e); + log::warn!("Failed to serialize response: {e}"); StatusCode::INTERNAL_SERVER_ERROR })?; @@ -19,7 +19,7 @@ impl ResponseBuilder { .header(header::CONTENT_LENGTH, json.len().to_string()) .body(axum::body::Body::from(json)) .map_err(|e| { - log::warn!("Failed to build response: {}", e); + log::warn!("Failed to build response: {e}"); StatusCode::INTERNAL_SERVER_ERROR }) } @@ -30,7 +30,7 @@ impl ResponseBuilder { .header(header::CONTENT_LENGTH, content.len().to_string()) .body(axum::body::Body::from(content.to_vec())) .map_err(|e| { - log::warn!("Failed to build response: {}", e); + log::warn!("Failed to build response: {e}"); StatusCode::INTERNAL_SERVER_ERROR }) } diff --git a/src/modes/server/api/item.rs b/src/modes/server/api/item.rs index 4b800c1..5169158 100644 --- a/src/modes/server/api/item.rs +++ b/src/modes/server/api/item.rs @@ -17,8 +17,58 @@ use http_body_util::BodyExt; use log::{debug, warn}; use std::collections::HashMap; use std::io::{Cursor, Read}; +use tokio::sync::mpsc; use tokio::task; +/// Bridges an async mpsc receiver to a synchronous `Read` trait. +/// +/// Used in `spawn_blocking` contexts to consume data from an async body +/// stream as a regular reader. Blocks on each `read()` call until data +/// is available or the channel is closed. 
+struct ChannelReader { + rx: mpsc::Receiver<Result<Vec<u8>, std::io::Error>>, + current: Vec<u8>, + pos: usize, +} + +impl ChannelReader { + fn new(rx: mpsc::Receiver<Result<Vec<u8>, std::io::Error>>) -> Self { + Self { + rx, + current: Vec::new(), + pos: 0, + } + } +} + +impl Read for ChannelReader { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { + // If we have buffered data, return it first + if self.pos < self.current.len() { + let remaining = &self.current[self.pos..]; + let n = std::cmp::min(buf.len(), remaining.len()); + buf[..n].copy_from_slice(&remaining[..n]); + self.pos += n; + return Ok(n); + } + + // Need more data from the channel - block until available + match self.rx.blocking_recv() { + Some(Ok(data)) => { + let n = std::cmp::min(buf.len(), data.len()); + buf[..n].copy_from_slice(&data[..n]); + if n < data.len() { + self.current = data; + self.pos = n; + } + Ok(n) + } + Some(Err(e)) => Err(e), + None => Ok(0), // Channel closed, EOF + } + } +} + // Helper functions to replace the missing binary_detection module async fn check_binary_content_allowed( data_service: &AsyncDataService, @@ -51,11 +101,7 @@ { Ok((_, _, is_binary)) => Ok(is_binary), Err(e) => { - log::warn!( - "Failed to get content info for binary check for item {}: {}", - item_id, - e - ); + log::warn!("Failed to get content info for binary check for item {item_id}: {e}"); Err(StatusCode::INTERNAL_SERVER_ERROR) } } @@ -92,7 +138,7 @@ fn handle_item_error(error: CoreError) -> StatusCode { match error { CoreError::ItemNotFound(_) | CoreError::ItemNotFoundGeneric => StatusCode::NOT_FOUND, _ => { - warn!("Failed to get item: {}", error); + warn!("Failed to get item: {error}"); StatusCode::INTERNAL_SERVER_ERROR } } @@ -136,13 +182,7 @@ pub async fn handle_list_items( let tags: Vec<String> = params .tags .as_ref() - .map(|s| { - parse_comma_tags(s).map_err(|e| { - warn!("Failed to parse tags: {}", e); - StatusCode::BAD_REQUEST - }) - }) - .transpose()?
+ .map(|s| parse_comma_tags(s)) .unwrap_or_default(); let data_service = create_data_service(&state); @@ -150,7 +190,7 @@ pub async fn handle_list_items( .list_items(tags, HashMap::new()) .await .map_err(|e| { - warn!("Failed to get items: {}", e); + warn!("Failed to get items: {e}"); StatusCode::INTERNAL_SERVER_ERROR })?; @@ -205,7 +245,7 @@ async fn handle_as_meta_response( ) -> Result { // Get the item with metadata let item_with_meta = data_service.get_item(item_id).await.map_err(|e| { - warn!("Failed to get item {} for as_meta content: {}", item_id, e); + warn!("Failed to get item {item_id} for as_meta content: {e}"); StatusCode::INTERNAL_SERVER_ERROR })?; @@ -290,7 +330,7 @@ async fn handle_as_meta_response_with_metadata( .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR) } Err(e) => { - warn!("Failed to get content for item {}: {}", item_id, e); + warn!("Failed to get content for item {item_id}: {e}"); Err(StatusCode::INTERNAL_SERVER_ERROR) } } @@ -337,51 +377,104 @@ pub async fn handle_post_item( let tags: Vec = params .tags .as_deref() - .map(|s| { - parse_comma_tags(s).map_err(|e| { - warn!("Failed to parse tags query parameter: {}", e); - StatusCode::BAD_REQUEST - }) - }) - .transpose()? + .map(parse_comma_tags) .unwrap_or_default(); // Parse metadata from query parameter let metadata: HashMap = if let Some(ref meta_str) = params.metadata { serde_json::from_str(meta_str).map_err(|e| { - warn!("Failed to parse metadata JSON string: {}", e); + warn!("Failed to parse metadata JSON string: {e}"); StatusCode::BAD_REQUEST })? } else { HashMap::new() }; - // Convert body to bytes first (simpler than streaming for this use case) - let body_bytes = body - .collect() + let compress = params.compress; + let run_meta = params.meta; + + // When server handles both compression and meta, save_item_with_reader + // buffers internally anyway, so collect body in memory. + // When client handles compression/meta, stream the body to avoid buffering. 
+ let item_with_meta = if compress && run_meta { + let body_bytes = body + .collect() + .await + .map_err(|e| { + warn!("Failed to read request body: {e}"); + StatusCode::BAD_REQUEST + })? + .to_bytes(); + + task::spawn_blocking(move || { + let mut conn = db.blocking_lock(); + let sync_service = + crate::services::SyncDataService::new(data_dir, settings.as_ref().clone()); + let mut cursor = Cursor::new(body_bytes.to_vec()); + sync_service.save_item_with_reader(&mut conn, &mut cursor, tags, metadata) + }) .await .map_err(|e| { - warn!("Failed to read request body: {}", e); - StatusCode::BAD_REQUEST + warn!("Failed to save item (task error): {e}"); + StatusCode::INTERNAL_SERVER_ERROR })? - .to_bytes(); + .map_err(|e| { + warn!("Failed to save item: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })? + } else { + // Stream body through a channel to avoid buffering in memory + let (tx, rx) = tokio::sync::mpsc::channel::<Result<Vec<u8>, std::io::Error>>(16); - let item_with_meta = task::spawn_blocking(move || { - let mut conn = db.blocking_lock(); - let mut cursor = Cursor::new(body_bytes.to_vec()); - let sync_service = - crate::services::SyncDataService::new(data_dir, settings.as_ref().clone()); - sync_service.save_item_with_reader(&mut conn, &mut cursor, tags, metadata) - }) - .await - .map_err(|e| { - warn!("Failed to save item: {}", e); - StatusCode::INTERNAL_SERVER_ERROR - })?
- .map_err(|e| { - warn!("Failed to save item: {}", e); - StatusCode::INTERNAL_SERVER_ERROR - })?; + // Task to read body frames and send through channel + tokio::spawn(async move { + let mut body = body; + loop { + match body.frame().await { + None => break, // Body complete + Some(Err(e)) => { + let _ = tx + .send(Err(std::io::Error::other(format!("Body error: {e}")))) + .await; + break; + } + Some(Ok(frame)) => { + if let Ok(data) = frame.into_data() { + if tx.send(Ok(data.to_vec())).await.is_err() { + break; // Receiver dropped + } + } + } + } + } + }); + + task::spawn_blocking(move || { + let mut conn = db.blocking_lock(); + let sync_service = + crate::services::SyncDataService::new(data_dir, settings.as_ref().clone()); + + // Convert async mpsc receiver into a sync Read + let mut stream_reader = ChannelReader::new(rx); + sync_service.save_item_raw_streaming( + &mut conn, + &mut stream_reader, + tags, + metadata, + compress, + run_meta, + ) + }) + .await + .map_err(|e| { + warn!("Failed to save item (task error): {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })? + .map_err(|e| { + warn!("Failed to save item: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })? + }; let compression = item_with_meta.item.compression.clone(); let tags = item_with_meta.tags.iter().map(|t| t.name.clone()).collect(); @@ -439,13 +532,7 @@ pub async fn handle_get_item_latest_content( let tags: Vec = params .tags .as_ref() - .map(|s| { - parse_comma_tags(s).map_err(|e| { - warn!("Failed to parse tags: {}", e); - StatusCode::BAD_REQUEST - }) - }) - .transpose()? 
+ .map(|s| parse_comma_tags(s)) .unwrap_or_default(); let data_service = create_data_service(&state); @@ -478,13 +565,14 @@ pub async fn handle_get_item_latest_content( params.length, params.stream, None, + params.decompress, ) .await } } Err(CoreError::ItemNotFoundGeneric) => Err(StatusCode::NOT_FOUND), Err(e) => { - warn!("Failed to find latest item for content: {}", e); + warn!("Failed to find latest item for content: {e}"); Err(StatusCode::INTERNAL_SERVER_ERROR) } } @@ -528,8 +616,8 @@ pub async fn handle_get_item_content( } debug!( - "ITEM_API: Getting content for item {} with stream={}, allow_binary={}, offset={}, length={}", - item_id, params.stream, params.allow_binary, params.offset, params.length + "ITEM_API: Getting content for item {item_id} with stream={}, allow_binary={}, offset={}, length={}", + params.stream, params.allow_binary, params.offset, params.length ); let data_service = create_data_service(&state); @@ -554,6 +642,7 @@ pub async fn handle_get_item_content( params.length, params.stream, None, + params.decompress, ) .await; if let Ok(response) = &result { @@ -566,6 +655,7 @@ pub async fn handle_get_item_content( } } +#[allow(clippy::too_many_arguments)] async fn stream_item_content_response( data_service: &AsyncDataService, item_id: i64, @@ -574,11 +664,12 @@ async fn stream_item_content_response( length: u64, stream: bool, _filter: Option, + decompress: bool, ) -> Result { - debug!("STREAM_ITEM_CONTENT_RESPONSE: stream={}", stream); + debug!("STREAM_ITEM_CONTENT_RESPONSE: stream={stream}, decompress={decompress}"); // Get the item with metadata once let item_with_meta = data_service.get_item(item_id).await.map_err(|e| { - warn!("Failed to get item {} for content: {}", item_id, e); + warn!("Failed to get item {item_id} for content: {e}"); StatusCode::INTERNAL_SERVER_ERROR })?; @@ -592,10 +683,50 @@ async fn stream_item_content_response( length, stream, None, + decompress, ) .await } +/// Stream raw (unprocessed) content directly from the item 
file. +/// +/// Returns the stored file bytes without decompression or filtering. +async fn stream_raw_content_response( + data_service: &AsyncDataService, + item_id: i64, + offset: u64, + length: u64, +) -> Result { + // Get item info to find the file path and compression type + let item_with_meta = data_service.get_item(item_id).await.map_err(|e| { + warn!("Failed to get item {item_id} for raw content: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + let compression = item_with_meta.item.compression.clone(); + + // Read raw file bytes + let content = data_service + .get_raw_item_content(item_id) + .await + .map_err(|e| { + warn!("Failed to get raw content for item {item_id}: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + let response_content = apply_offset_length(&content, offset, length); + + let response = Response::builder() + .header(header::CONTENT_TYPE, "application/octet-stream") + .header("X-Keep-Compression", &compression) + .header(header::CONTENT_LENGTH, response_content.len()) + .body(axum::body::Body::from(response_content.to_vec())) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + + Ok(response) +} + +#[allow(clippy::too_many_arguments)] async fn stream_item_content_response_with_metadata( data_service: &AsyncDataService, item_id: i64, @@ -605,11 +736,15 @@ async fn stream_item_content_response_with_metadata( length: u64, stream: bool, _filter: Option, + decompress: bool, ) -> Result { - debug!( - "STREAM_ITEM_CONTENT_RESPONSE_WITH_METADATA: stream={}", - stream - ); + debug!("STREAM_ITEM_CONTENT_RESPONSE_WITH_METADATA: stream={stream}, decompress={decompress}"); + + // When decompress=false, return raw stored bytes + if !decompress { + return stream_raw_content_response(data_service, item_id, offset, length).await; + } + let mime_type = get_mime_type(metadata); // Check if content is binary when allow_binary is false @@ -630,7 +765,7 @@ async fn stream_item_content_response_with_metadata( Ok(response) } Err(e) => { - warn!("Failed 
to stream content for item {}: {}", item_id, e); + warn!("Failed to stream content for item {item_id}: {e}"); Err(StatusCode::INTERNAL_SERVER_ERROR) } } @@ -649,7 +784,7 @@ async fn stream_item_content_response_with_metadata( ResponseBuilder::binary(response_content, &mime_type) } Err(e) => { - warn!("Failed to get content for item {}: {}", item_id, e); + warn!("Failed to get content for item {item_id}: {e}"); Err(StatusCode::INTERNAL_SERVER_ERROR) } } @@ -683,13 +818,7 @@ pub async fn handle_get_item_latest_meta( let tags: Vec = params .tags .as_ref() - .map(|s| { - parse_comma_tags(s).map_err(|e| { - warn!("Failed to parse tags: {}", e); - StatusCode::BAD_REQUEST - }) - }) - .transpose()? + .map(|s| parse_comma_tags(s)) .unwrap_or_default(); let data_service = create_data_service(&state); @@ -753,6 +882,39 @@ pub async fn handle_get_item_meta( } } +pub async fn handle_post_item_meta( + State(state): State, + Path(item_id): Path, + Json(metadata): Json>, +) -> Result>, StatusCode> { + let data_service = create_data_service(&state); + + // Verify item exists + data_service + .get_item(item_id) + .await + .map_err(handle_item_error)?; + + // Add each metadata entry + for (key, value) in &metadata { + data_service + .add_item_meta(item_id, key, value) + .await + .map_err(|e| { + warn!("Failed to add metadata {key} for item {item_id}: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + } + + let response = ApiResponse { + success: true, + data: Some(()), + error: None, + }; + + Ok(Json(response)) +} + #[utoipa::path( delete, path = "/api/item/{item_id}", @@ -920,8 +1082,8 @@ pub async fn handle_diff_items( return Err(StatusCode::BAD_REQUEST); }; - let id_a = item_a.item.id.ok_or_else(|| StatusCode::BAD_REQUEST)?; - let id_b = item_b.item.id.ok_or_else(|| StatusCode::BAD_REQUEST)?; + let id_a = item_a.item.id.ok_or(StatusCode::BAD_REQUEST)?; + let id_b = item_b.item.id.ok_or(StatusCode::BAD_REQUEST)?; let (mut reader_a, _) = sync_service .get_content(&mut conn, id_a) 
@@ -932,13 +1094,13 @@ pub async fn handle_diff_items( let mut content_a = Vec::new(); reader_a.read_to_end(&mut content_a).map_err(|e| { - log::error!("Failed to read content A: {}", e); + log::error!("Failed to read content A: {e}"); StatusCode::INTERNAL_SERVER_ERROR })?; let mut content_b = Vec::new(); reader_b.read_to_end(&mut content_b).map_err(|e| { - log::error!("Failed to read content B: {}", e); + log::error!("Failed to read content B: {e}"); StatusCode::INTERNAL_SERVER_ERROR })?; @@ -960,7 +1122,8 @@ fn compute_diff(a: &[u8], b: &[u8]) -> Vec { let old_lines: Vec<&str> = text_a.lines().collect(); let new_lines: Vec<&str> = text_b.lines().collect(); - let ops = similar::TextDiff::from_lines(text_a.as_ref(), text_b.as_ref()).ops(); + let text_diff = similar::TextDiff::from_lines(text_a.as_ref(), text_b.as_ref()); + let ops = text_diff.ops(); let mut diff_lines = Vec::new(); diff --git a/src/modes/server/api/mod.rs b/src/modes/server/api/mod.rs index d88ce19..cbd78a9 100644 --- a/src/modes/server/api/mod.rs +++ b/src/modes/server/api/mod.rs @@ -1,5 +1,4 @@ pub mod common; -#[cfg(feature = "swagger")] pub mod item; #[cfg(feature = "mcp")] pub mod mcp; @@ -57,9 +56,11 @@ use utoipa_swagger_ui::SwaggerUi; (url = "/", description = "Local server") ) )] +#[allow(dead_code)] struct ApiDoc; pub fn add_routes(router: Router) -> Router { + #[cfg_attr(not(feature = "mcp"), allow(unused_mut))] let mut router = router // Status endpoints .route("/api/status", get(status::handle_status)) @@ -77,7 +78,10 @@ pub fn add_routes(router: Router) -> Router { "/api/item/latest/content", get(item::handle_get_item_latest_content), ) - .route("/api/item/{item_id}/meta", get(item::handle_get_item_meta)) + .route( + "/api/item/{item_id}/meta", + get(item::handle_get_item_meta).post(item::handle_post_item_meta), + ) .route( "/api/item/{item_id}/content", get(item::handle_get_item_content), diff --git a/src/modes/server/common.rs b/src/modes/server/common.rs index 0aab1ef..1551f50 
100644 --- a/src/modes/server/common.rs +++ b/src/modes/server/common.rs @@ -488,6 +488,10 @@ pub struct ItemQuery { /// Boolean flag to return content and metadata in a structured JSON format. #[serde(default = "default_as_meta")] pub as_meta: bool, + /// Whether the server should decompress the content (default: true). + /// Set to false when the client wants raw stored bytes for local decompression. + #[serde(default = "default_true")] + pub decompress: bool, } /// Query parameters for item content retrieval. @@ -538,6 +542,10 @@ pub struct ItemContentQuery { /// Boolean flag to return content and metadata in a structured JSON format. #[serde(default = "default_as_meta")] pub as_meta: bool, + /// Whether the server should decompress the content (default: true). + /// Set to false when the client wants raw stored bytes for local decompression. + #[serde(default = "default_true")] + pub decompress: bool, } /// Default function for allow_binary parameter. @@ -567,6 +575,15 @@ fn default_as_meta() -> bool { false } +/// Default function for true boolean parameters. +/// +/// # Returns +/// +/// `true` as the default value. +fn default_true() -> bool { + true +} + /// Query parameters for creating an item via POST. /// /// Query parameters for POST /api/item/ with streaming binary body. @@ -576,6 +593,14 @@ pub struct CreateItemQuery { pub tags: Option, /// Optional metadata as JSON string. pub metadata: Option, + /// Whether the server should compress the content (default: true). + /// Set to false when the client has already compressed the content. + #[serde(default = "default_true")] + pub compress: bool, + /// Whether the server should run meta plugins (default: true). + /// Set to false when the client has already collected metadata. + #[serde(default = "default_true")] + pub meta: bool, } /// Request body for creating a new item. 
@@ -672,7 +697,7 @@ fn check_basic_auth( } // Otherwise, do direct comparison - let expected_credentials = format!("keep:{}", expected_password); + let expected_credentials = format!("keep:{expected_password}"); return decoded_str == expected_credentials; } } @@ -803,6 +828,7 @@ pub async fn logging_middleware( /// let auth_middleware = create_auth_middleware(Some("pass".to_string()), None); /// router.layer(auth_middleware); /// ``` +#[allow(clippy::type_complexity)] pub fn create_auth_middleware( password: Option, password_hash: Option, @@ -822,7 +848,7 @@ pub fn create_auth_middleware( let uri = request.uri().clone(); if !check_auth(&headers, &password, &password_hash) { - warn!("Unauthorized request to {} from {}", uri, addr); + warn!("Unauthorized request to {uri} from {addr}"); // Add WWW-Authenticate header to trigger basic auth in browsers let mut response = Response::new(axum::body::Body::from("Unauthorized")); *response.status_mut() = StatusCode::UNAUTHORIZED; diff --git a/src/modes/server/mod.rs b/src/modes/server/mod.rs index 829a2b0..a693165 100644 --- a/src/modes/server/mod.rs +++ b/src/modes/server/mod.rs @@ -88,7 +88,7 @@ async fn run_server( format!("{}:21080", config.address) }; - debug!("SERVER: Starting REST HTTP server on {}", bind_address); + debug!("SERVER: Starting REST HTTP server on {bind_address}"); // Use the existing database connection let db_conn = Arc::new(Mutex::new(conn)); @@ -106,6 +106,7 @@ async fn run_server( .route("/mcp", post(mcp::handle_mcp_request)) .with_state(state.clone()); + #[cfg_attr(not(feature = "mcp"), allow(unused_mut))] let mut protected_router = Router::new() .merge(api::add_routes(Router::new())) .merge(pages::add_routes(Router::new())); @@ -137,7 +138,7 @@ async fn run_server( let addr: SocketAddr = bind_address.parse()?; - info!("SERVER: HTTP server listening on {}", addr); + info!("SERVER: HTTP server listening on {addr}"); let listener = tokio::net::TcpListener::bind(addr).await?; axum::serve( diff --git 
a/src/modes/server/pages.rs b/src/modes/server/pages.rs index f227844..ab392cd 100644 --- a/src/modes/server/pages.rs +++ b/src/modes/server/pages.rs @@ -47,12 +47,6 @@ fn default_count() -> usize { 1000 } -/// Provides the default number of items to display per page. -/// -/// # Returns -/// -/// The default count: 1000. - /// Adds the web page routes to the Axum router. /// /// This function configures the routes for the web interface, including the @@ -96,7 +90,7 @@ async fn list_items( .map_err(|_| Html("Internal Server Error".to_string()))?; Ok(response) } - Err(e) => Err(Html(format!("Error: {}", e))), + Err(e) => Err(Html(format!("Error: {e}"))), } } @@ -190,8 +184,7 @@ fn build_item_list( html.push_str("

"); for tag in recent_tags { html.push_str(&format!( - "{}", - tag, tag + "{tag}" )); } html.push_str("

"); @@ -228,7 +221,7 @@ fn build_item_list( "id" => { let id_value = item.id.map(|id| id.to_string()).unwrap_or_default(); // Make the ID a link to the item details page - format!("{}", item_id, id_value) + format!("{id_value}") } "time" => item.ts.format("%Y-%m-%d %H:%M:%S").to_string(), "size" => item.size.map(|s| s.to_string()).unwrap_or_default(), @@ -257,7 +250,7 @@ fn build_item_list( if let Ok(max_len) = max_len_str.parse::() { if value.chars().count() > max_len { let truncated: String = value.chars().take(max_len).collect(); - format!("{}...", truncated) + format!("{truncated}...") } else { value } @@ -275,16 +268,12 @@ fn build_item_list( crate::config::ColumnAlignment::Center => "text-align: center;", }; - html.push_str(&format!( - "{}", - align_style, display_value - )); + html.push_str(&format!("{display_value}")); } // Actions column html.push_str(&format!( - "View | Download", - item_id, item_id + "View | Download" )); html.push_str(""); @@ -372,7 +361,7 @@ async fn show_item( .map_err(|_| Html("Internal Server Error".to_string()))?; Ok(response) } - Err(e) => Err(Html(format!("Error: {}", e))), + Err(e) => Err(Html(format!("Error: {e}"))), } } @@ -386,10 +375,10 @@ fn build_item_details(conn: &Connection, id: i64) -> Result { let metas = db::get_item_meta(conn, &item)?; let mut html = String::new(); - html.push_str(&format!("Keep - Item #{}", id)); + html.push_str(&format!("Keep - Item #{id}")); html.push_str(""); html.push_str(""); - html.push_str(&format!("

Item #{}

", id)); + html.push_str(&format!("

Item #{id}

")); // Single table for all details html.push_str(""); @@ -439,8 +428,7 @@ fn build_item_details(conn: &Connection, id: i64) -> Result { // Links html.push_str("

Actions

"); html.push_str(&format!( - "

Download Content

", - id + "

Download Content

" )); html.push_str("

Back to list

"); diff --git a/src/services/async_data_service.rs b/src/services/async_data_service.rs index 063edee..15bce7f 100644 --- a/src/services/async_data_service.rs +++ b/src/services/async_data_service.rs @@ -51,6 +51,17 @@ impl AsyncDataService { self.get(&mut conn, id) } + pub async fn add_item_meta( + &self, + item_id: i64, + name: &str, + value: &str, + ) -> Result<(), CoreError> { + let conn = self.db.lock().await; + crate::db::add_meta(&conn, item_id, name, value)?; + Ok(()) + } + pub async fn list_items( &self, tags: Vec, @@ -184,6 +195,32 @@ impl AsyncDataService { Ok((Box::pin(stream), content_length)) } + + /// Get raw item content without decompression. + /// + /// Reads the stored file bytes directly from disk, bypassing decompression. + /// Used when the client requests raw bytes with `decompress=false`. + pub async fn get_raw_item_content(&self, id: i64) -> Result, CoreError> { + let data_path = self.data_path.clone(); + + tokio::task::spawn_blocking(move || { + let mut item_path = data_path; + item_path.push(id.to_string()); + + let mut file = std::fs::File::open(&item_path).map_err(|e| { + CoreError::Io(std::io::Error::new( + std::io::ErrorKind::NotFound, + format!("Item file not found: {item_path:?}: {e}"), + )) + })?; + + let mut content = Vec::new(); + file.read_to_end(&mut content)?; + Ok(content) + }) + .await + .map_err(|e| CoreError::Other(anyhow::anyhow!("Task join error: {}", e)))? 
+ } } impl DataService for AsyncDataService { diff --git a/src/services/sync_data_service.rs b/src/services/sync_data_service.rs index 02d1155..c52c2b8 100644 --- a/src/services/sync_data_service.rs +++ b/src/services/sync_data_service.rs @@ -1,16 +1,19 @@ use crate::common::status::StatusInfo; +use crate::compression_engine::{CompressionType, get_compression_engine}; use crate::config::Settings; use crate::db::Item; use crate::db::Meta; +use crate::modes::common::settings_compression_type; use crate::services::data_service::DataService; use crate::services::error::CoreError; use crate::services::item_service::ItemService; +use crate::services::meta_service::MetaService; use crate::services::status_service::StatusService; use crate::services::types::{ItemWithContent, ItemWithMeta}; use clap::Command; use rusqlite::Connection; use std::collections::HashMap; -use std::io::{Cursor, Read}; +use std::io::{Cursor, Read, Write}; use std::path::{Path, PathBuf}; pub struct SyncDataService { @@ -80,6 +83,204 @@ impl SyncDataService { self.get_item(conn, item_id) } + /// Save an item with granular control over compression and meta plugins. + /// + /// This method allows clients to control whether compression and meta plugins + /// run server-side or were already handled by the client. + /// + /// # Arguments + /// + /// * `conn` - Database connection. + /// * `content` - Raw content bytes. + /// * `tags` - Tags to associate with the item. + /// * `metadata` - Client-provided metadata. + /// * `compress` - Whether the server should compress the content. + /// * `run_meta` - Whether the server should run meta plugins. + /// + /// # Returns + /// + /// * `Result` - The saved item with full details. 
+ pub fn save_item_raw( + &self, + conn: &mut Connection, + content: &[u8], + tags: Vec, + metadata: HashMap, + compress: bool, + run_meta: bool, + ) -> Result { + let mut cmd = Command::new("keep"); + let settings = &self.settings; + let mut tags = tags; + + if tags.is_empty() { + tags.push("none".to_string()); + } + + let compression_type = if compress { + settings_compression_type(&mut cmd, settings) + } else { + CompressionType::None + }; + + let compression_engine = get_compression_engine(compression_type.clone())?; + + let item_id; + let mut item; + { + item = crate::db::create_item(conn, compression_type.clone())?; + item_id = item.id.unwrap(); + crate::db::set_item_tags(conn, item.clone(), &tags)?; + } + + // Initialize meta plugins if requested + let meta_service = MetaService::new(); + let mut plugins = if run_meta { + meta_service.get_plugins(&mut cmd, settings) + } else { + Vec::new() + }; + + if run_meta { + meta_service.initialize_plugins(&mut plugins, conn, item_id); + } + + // Write content to file + let mut item_path = self.item_service.get_data_path().clone(); + item_path.push(item_id.to_string()); + + let mut item_out = compression_engine.create(item_path)?; + + let mut total_bytes = 0i64; + const PIPESIZE: usize = 65536; + + if run_meta && !plugins.is_empty() { + // Process in chunks for meta plugins + let mut offset = 0; + while offset < content.len() { + let end = std::cmp::min(offset + PIPESIZE, content.len()); + let chunk = &content[offset..end]; + item_out.write_all(chunk)?; + total_bytes += chunk.len() as i64; + meta_service.process_chunk(&mut plugins, chunk, conn, item_id); + offset = end; + } + } else { + // Write all at once, no meta processing + item_out.write_all(content)?; + total_bytes = content.len() as i64; + } + + item_out.flush()?; + drop(item_out); + + // Finalize meta plugins + if run_meta { + meta_service.finalize_plugins(&mut plugins, conn, item_id); + } + + // Add client-provided metadata + for (key, value) in &metadata { + 
crate::db::add_meta(conn, item_id, key, value)?; + } + + item.size = Some(total_bytes); + crate::db::update_item(conn, item)?; + + self.get_item(conn, item_id) + } + + /// Save an item from a streaming reader with granular control over compression. + /// + /// Unlike `save_item_raw` which takes a pre-buffered `&[u8]`, this method + /// reads from the reader in chunks and writes directly to the compression + /// engine, avoiding buffering the entire content in memory. + pub fn save_item_raw_streaming( + &self, + conn: &mut Connection, + reader: &mut dyn Read, + tags: Vec, + metadata: HashMap, + compress: bool, + run_meta: bool, + ) -> Result { + let mut cmd = Command::new("keep"); + let settings = &self.settings; + let mut tags = tags; + + if tags.is_empty() { + tags.push("none".to_string()); + } + + let compression_type = if compress { + settings_compression_type(&mut cmd, settings) + } else { + CompressionType::None + }; + + let compression_engine = get_compression_engine(compression_type.clone())?; + + let item_id; + let mut item; + { + item = crate::db::create_item(conn, compression_type.clone())?; + item_id = item.id.unwrap(); + crate::db::set_item_tags(conn, item.clone(), &tags)?; + } + + // Initialize meta plugins if requested + let meta_service = MetaService::new(); + let mut plugins = if run_meta { + meta_service.get_plugins(&mut cmd, settings) + } else { + Vec::new() + }; + + if run_meta { + meta_service.initialize_plugins(&mut plugins, conn, item_id); + } + + // Write content to file via streaming + let mut item_path = self.item_service.get_data_path().clone(); + item_path.push(item_id.to_string()); + + let mut item_out = compression_engine.create(item_path)?; + + let mut buffer = [0u8; 65536]; + let mut total_bytes = 0i64; + + loop { + let n = reader.read(&mut buffer)?; + if n == 0 { + break; + } + item_out.write_all(&buffer[..n])?; + total_bytes += n as i64; + + if run_meta { + meta_service.process_chunk(&mut plugins, &buffer[..n], conn, item_id); + } + 
} + + item_out.flush()?; + drop(item_out); + + // Finalize meta plugins + if run_meta { + meta_service.finalize_plugins(&mut plugins, conn, item_id); + } + + // Add client-provided metadata + for (key, value) in &metadata { + crate::db::add_meta(conn, item_id, key, value)?; + } + + item.size = Some(total_bytes); + crate::db::update_item(conn, item)?; + + self.get_item(conn, item_id) + } + pub fn get_item(&self, conn: &mut Connection, id: i64) -> Result { self.item_service.get_item(conn, id) } diff --git a/src/tests/common/test_helpers.rs b/src/tests/common/test_helpers.rs index 417105b..2cddaec 100644 --- a/src/tests/common/test_helpers.rs +++ b/src/tests/common/test_helpers.rs @@ -16,7 +16,7 @@ pub fn create_temp_dir() -> TempDir { pub fn create_temp_file_with_content(dir: &TempDir, filename: &str, content: &str) -> PathBuf { let file_path = dir.path().join(filename); let mut file = File::create(&file_path).expect("Failed to create test file"); - write!(file, "{}", content).expect("Failed to write to test file"); + write!(file, "{content}").expect("Failed to write to test file"); file_path } @@ -95,14 +95,13 @@ pub fn get_file_size(file_path: &PathBuf) -> u64 { /// Assert that a file exists pub fn assert_file_exists(file_path: &PathBuf) { - assert!(file_path.exists(), "File {:?} does not exist", file_path); + assert!(file_path.exists(), "File {file_path:?} does not exist"); } /// Assert that a file does not exist pub fn assert_file_not_exists(file_path: &PathBuf) { assert!( !file_path.exists(), - "File {:?} should not exist but it does", - file_path + "File {file_path:?} should not exist but it does" ); }