fix: add server streaming support, fix pre-existing compilation errors

Server changes for client mode streaming:
- POST /api/item/ now streams the request body via an async channel → ChannelReader
  → save_item_raw_streaming when compress=false or meta=false (example client flow sketched below)
- Add POST /api/item/{id}/meta endpoint for client-side metadata
- Add save_item_raw_streaming<R: Read> to SyncDataService
- Add add_item_meta to AsyncDataService

Fix pre-existing issues that were hidden behind swagger cfg gate:
- Remove #[cfg(feature = "swagger")] from item module so it compiles
  with just the server feature
- Fix parse_comma_tags usage (returns Vec, not Result)
- Fix TextDiff temporary value lifetime issue
- Fix io::Error::new → io::Error::other
- Fix ok_or_else → ok_or for Copy types
- Inline format args throughout server code
- Fix empty line after doc comment in pages.rs
- Add cfg_attr for unused_mut where mcp feature gates mutation
- Add type_complexity allow on create_auth_middleware
- Distinguish task error vs save error in spawn_blocking handlers
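
Example client flow for the new streaming endpoints (a hedged sketch, not part of
this change set; it assumes a reqwest client with the json feature enabled, and the
payload path and placeholder item id are illustrative):

    use std::collections::HashMap;

    #[tokio::main]
    async fn main() -> Result<(), reqwest::Error> {
        let client = reqwest::Client::new();

        // 1) Stream already-compressed bytes; compress=false & meta=false routes
        //    the body through ChannelReader -> save_item_raw_streaming.
        let body = std::fs::read("payload.zst").expect("read payload"); // assumed file
        let resp = client
            .post("http://127.0.0.1:21080/api/item/")
            .query(&[("compress", "false"), ("meta", "false"), ("tags", "backup")])
            .body(body)
            .send()
            .await?;
        println!("create item -> {}", resp.status());
        let item_id = 1; // placeholder: would come from the JSON response

        // 2) Attach client-collected metadata via the new POST meta endpoint.
        let mut meta = HashMap::new();
        meta.insert("mime".to_string(), "application/zstd".to_string());
        let resp = client
            .post(format!("http://127.0.0.1:21080/api/item/{item_id}/meta"))
            .json(&meta)
            .send()
            .await?;
        println!("post meta -> {}", resp.status());
        Ok(())
    }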

Co-Authored-By: andrew/openrouter/hunter-alpha <noreply@opencode.ai>
2026-03-12 18:02:56 -03:00
parent c5529bedbf
commit 237a581429
9 changed files with 531 additions and 112 deletions

View File

@@ -10,7 +10,7 @@ pub struct ResponseBuilder;
impl ResponseBuilder {
    pub fn json<T: Serialize>(data: T) -> Result<Response, StatusCode> {
        let json = serde_json::to_vec(&data).map_err(|e| {
-            log::warn!("Failed to serialize response: {}", e);
+            log::warn!("Failed to serialize response: {e}");
            StatusCode::INTERNAL_SERVER_ERROR
        })?;
@@ -19,7 +19,7 @@ impl ResponseBuilder {
            .header(header::CONTENT_LENGTH, json.len().to_string())
            .body(axum::body::Body::from(json))
            .map_err(|e| {
-                log::warn!("Failed to build response: {}", e);
+                log::warn!("Failed to build response: {e}");
                StatusCode::INTERNAL_SERVER_ERROR
            })
    }
@@ -30,7 +30,7 @@ impl ResponseBuilder {
            .header(header::CONTENT_LENGTH, content.len().to_string())
            .body(axum::body::Body::from(content.to_vec()))
            .map_err(|e| {
-                log::warn!("Failed to build response: {}", e);
+                log::warn!("Failed to build response: {e}");
                StatusCode::INTERNAL_SERVER_ERROR
            })
    }

View File

@@ -17,8 +17,58 @@ use http_body_util::BodyExt;
use log::{debug, warn};
use std::collections::HashMap;
use std::io::{Cursor, Read};
+use tokio::sync::mpsc;
use tokio::task;
+/// Bridges an async mpsc receiver to a synchronous `Read` trait.
+///
+/// Used in `spawn_blocking` contexts to consume data from an async body
+/// stream as a regular reader. Blocks on each `read()` call until data
+/// is available or the channel is closed.
+struct ChannelReader {
+    rx: mpsc::Receiver<Result<Vec<u8>, std::io::Error>>,
+    current: Vec<u8>,
+    pos: usize,
+}
+impl ChannelReader {
+    fn new(rx: mpsc::Receiver<Result<Vec<u8>, std::io::Error>>) -> Self {
+        Self {
+            rx,
+            current: Vec::new(),
+            pos: 0,
+        }
+    }
+}
+impl Read for ChannelReader {
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        // If we have buffered data, return it first
+        if self.pos < self.current.len() {
+            let remaining = &self.current[self.pos..];
+            let n = std::cmp::min(buf.len(), remaining.len());
+            buf[..n].copy_from_slice(&remaining[..n]);
+            self.pos += n;
+            return Ok(n);
+        }
+        // Need more data from the channel - block until available
+        match self.rx.blocking_recv() {
+            Some(Ok(data)) => {
+                let n = std::cmp::min(buf.len(), data.len());
+                buf[..n].copy_from_slice(&data[..n]);
+                if n < data.len() {
+                    self.current = data;
+                    self.pos = n;
+                }
+                Ok(n)
+            }
+            Some(Err(e)) => Err(e),
+            None => Ok(0), // Channel closed, EOF
+        }
+    }
+}
// Helper functions to replace the missing binary_detection module
async fn check_binary_content_allowed(
    data_service: &AsyncDataService,
@@ -51,11 +101,7 @@ async fn is_content_binary(
    {
        Ok((_, _, is_binary)) => Ok(is_binary),
        Err(e) => {
-            log::warn!(
-                "Failed to get content info for binary check for item {}: {}",
-                item_id,
-                e
-            );
+            log::warn!("Failed to get content info for binary check for item {item_id}: {e}");
            Err(StatusCode::INTERNAL_SERVER_ERROR)
        }
    }
@@ -92,7 +138,7 @@ fn handle_item_error(error: CoreError) -> StatusCode {
    match error {
        CoreError::ItemNotFound(_) | CoreError::ItemNotFoundGeneric => StatusCode::NOT_FOUND,
        _ => {
-            warn!("Failed to get item: {}", error);
+            warn!("Failed to get item: {error}");
            StatusCode::INTERNAL_SERVER_ERROR
        }
    }
@@ -136,13 +182,7 @@ pub async fn handle_list_items(
    let tags: Vec<String> = params
        .tags
        .as_ref()
-        .map(|s| {
-            parse_comma_tags(s).map_err(|e| {
-                warn!("Failed to parse tags: {}", e);
-                StatusCode::BAD_REQUEST
-            })
-        })
-        .transpose()?
+        .map(|s| parse_comma_tags(s))
        .unwrap_or_default();
    let data_service = create_data_service(&state);
@@ -150,7 +190,7 @@ pub async fn handle_list_items(
        .list_items(tags, HashMap::new())
        .await
        .map_err(|e| {
-            warn!("Failed to get items: {}", e);
+            warn!("Failed to get items: {e}");
            StatusCode::INTERNAL_SERVER_ERROR
        })?;
@@ -205,7 +245,7 @@ async fn handle_as_meta_response(
) -> Result<Response, StatusCode> {
    // Get the item with metadata
    let item_with_meta = data_service.get_item(item_id).await.map_err(|e| {
-        warn!("Failed to get item {} for as_meta content: {}", item_id, e);
+        warn!("Failed to get item {item_id} for as_meta content: {e}");
        StatusCode::INTERNAL_SERVER_ERROR
    })?;
@@ -290,7 +330,7 @@ async fn handle_as_meta_response_with_metadata(
                .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
        }
        Err(e) => {
-            warn!("Failed to get content for item {}: {}", item_id, e);
+            warn!("Failed to get content for item {item_id}: {e}");
            Err(StatusCode::INTERNAL_SERVER_ERROR)
        }
    }
@@ -337,51 +377,104 @@ pub async fn handle_post_item(
    let tags: Vec<String> = params
        .tags
        .as_deref()
-        .map(|s| {
-            parse_comma_tags(s).map_err(|e| {
-                warn!("Failed to parse tags query parameter: {}", e);
-                StatusCode::BAD_REQUEST
-            })
-        })
-        .transpose()?
+        .map(parse_comma_tags)
        .unwrap_or_default();
    // Parse metadata from query parameter
    let metadata: HashMap<String, String> = if let Some(ref meta_str) = params.metadata {
        serde_json::from_str(meta_str).map_err(|e| {
-            warn!("Failed to parse metadata JSON string: {}", e);
+            warn!("Failed to parse metadata JSON string: {e}");
            StatusCode::BAD_REQUEST
        })?
    } else {
        HashMap::new()
    };
-    // Convert body to bytes first (simpler than streaming for this use case)
-    let body_bytes = body
-        .collect()
-        .await
-        .map_err(|e| {
-            warn!("Failed to read request body: {}", e);
-            StatusCode::BAD_REQUEST
-        })?
-        .to_bytes();
-    let item_with_meta = task::spawn_blocking(move || {
-        let mut conn = db.blocking_lock();
-        let mut cursor = Cursor::new(body_bytes.to_vec());
-        let sync_service =
-            crate::services::SyncDataService::new(data_dir, settings.as_ref().clone());
-        sync_service.save_item_with_reader(&mut conn, &mut cursor, tags, metadata)
-    })
-    .await
-    .map_err(|e| {
-        warn!("Failed to save item: {}", e);
-        StatusCode::INTERNAL_SERVER_ERROR
-    })?
-    .map_err(|e| {
-        warn!("Failed to save item: {}", e);
-        StatusCode::INTERNAL_SERVER_ERROR
-    })?;
+    let compress = params.compress;
+    let run_meta = params.meta;
+    // When server handles both compression and meta, save_item_with_reader
+    // buffers internally anyway, so collect body in memory.
+    // When client handles compression/meta, stream the body to avoid buffering.
+    let item_with_meta = if compress && run_meta {
+        let body_bytes = body
+            .collect()
+            .await
+            .map_err(|e| {
+                warn!("Failed to read request body: {e}");
+                StatusCode::BAD_REQUEST
+            })?
+            .to_bytes();
+        task::spawn_blocking(move || {
+            let mut conn = db.blocking_lock();
+            let sync_service =
+                crate::services::SyncDataService::new(data_dir, settings.as_ref().clone());
+            let mut cursor = Cursor::new(body_bytes.to_vec());
+            sync_service.save_item_with_reader(&mut conn, &mut cursor, tags, metadata)
+        })
+        .await
+        .map_err(|e| {
+            warn!("Failed to save item (task error): {e}");
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+        .map_err(|e| {
+            warn!("Failed to save item: {e}");
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+    } else {
+        // Stream body through a channel to avoid buffering in memory
+        let (tx, rx) = tokio::sync::mpsc::channel::<Result<Vec<u8>, std::io::Error>>(16);
+        // Task to read body frames and send through channel
+        tokio::spawn(async move {
+            let mut body = body;
+            loop {
+                match body.frame().await {
+                    None => break, // Body complete
+                    Some(Err(e)) => {
+                        let _ = tx
+                            .send(Err(std::io::Error::other(format!("Body error: {e}"))))
+                            .await;
+                        break;
+                    }
+                    Some(Ok(frame)) => {
+                        if let Ok(data) = frame.into_data() {
+                            if tx.send(Ok(data.to_vec())).await.is_err() {
+                                break; // Receiver dropped
+                            }
+                        }
+                    }
+                }
+            }
+        });
+        task::spawn_blocking(move || {
+            let mut conn = db.blocking_lock();
+            let sync_service =
+                crate::services::SyncDataService::new(data_dir, settings.as_ref().clone());
+            // Convert async mpsc receiver into a sync Read
+            let mut stream_reader = ChannelReader::new(rx);
+            sync_service.save_item_raw_streaming(
+                &mut conn,
+                &mut stream_reader,
+                tags,
+                metadata,
+                compress,
+                run_meta,
+            )
+        })
+        .await
+        .map_err(|e| {
+            warn!("Failed to save item (task error): {e}");
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+        .map_err(|e| {
+            warn!("Failed to save item: {e}");
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+    };
    let compression = item_with_meta.item.compression.clone();
    let tags = item_with_meta.tags.iter().map(|t| t.name.clone()).collect();
@@ -439,13 +532,7 @@ pub async fn handle_get_item_latest_content(
    let tags: Vec<String> = params
        .tags
        .as_ref()
-        .map(|s| {
-            parse_comma_tags(s).map_err(|e| {
-                warn!("Failed to parse tags: {}", e);
-                StatusCode::BAD_REQUEST
-            })
-        })
-        .transpose()?
+        .map(|s| parse_comma_tags(s))
        .unwrap_or_default();
    let data_service = create_data_service(&state);
@@ -478,13 +565,14 @@ pub async fn handle_get_item_latest_content(
                    params.length,
                    params.stream,
                    None,
+                    params.decompress,
                )
                .await
            }
        }
        Err(CoreError::ItemNotFoundGeneric) => Err(StatusCode::NOT_FOUND),
        Err(e) => {
-            warn!("Failed to find latest item for content: {}", e);
+            warn!("Failed to find latest item for content: {e}");
            Err(StatusCode::INTERNAL_SERVER_ERROR)
        }
    }
@@ -528,8 +616,8 @@ pub async fn handle_get_item_content(
    }
    debug!(
-        "ITEM_API: Getting content for item {} with stream={}, allow_binary={}, offset={}, length={}",
-        item_id, params.stream, params.allow_binary, params.offset, params.length
+        "ITEM_API: Getting content for item {item_id} with stream={}, allow_binary={}, offset={}, length={}",
+        params.stream, params.allow_binary, params.offset, params.length
    );
    let data_service = create_data_service(&state);
@@ -554,6 +642,7 @@ pub async fn handle_get_item_content(
            params.length,
            params.stream,
            None,
+            params.decompress,
        )
        .await;
    if let Ok(response) = &result {
@@ -566,6 +655,7 @@ pub async fn handle_get_item_content(
    }
}
+#[allow(clippy::too_many_arguments)]
async fn stream_item_content_response(
    data_service: &AsyncDataService,
    item_id: i64,
@@ -574,11 +664,12 @@ async fn stream_item_content_response(
    length: u64,
    stream: bool,
    _filter: Option<String>,
+    decompress: bool,
) -> Result<Response, StatusCode> {
-    debug!("STREAM_ITEM_CONTENT_RESPONSE: stream={}", stream);
+    debug!("STREAM_ITEM_CONTENT_RESPONSE: stream={stream}, decompress={decompress}");
    // Get the item with metadata once
    let item_with_meta = data_service.get_item(item_id).await.map_err(|e| {
-        warn!("Failed to get item {} for content: {}", item_id, e);
+        warn!("Failed to get item {item_id} for content: {e}");
        StatusCode::INTERNAL_SERVER_ERROR
    })?;
@@ -592,10 +683,50 @@ async fn stream_item_content_response(
        length,
        stream,
        None,
+        decompress,
    )
    .await
}
+/// Stream raw (unprocessed) content directly from the item file.
+///
+/// Returns the stored file bytes without decompression or filtering.
+async fn stream_raw_content_response(
+    data_service: &AsyncDataService,
+    item_id: i64,
+    offset: u64,
+    length: u64,
+) -> Result<Response, StatusCode> {
+    // Get item info to find the file path and compression type
+    let item_with_meta = data_service.get_item(item_id).await.map_err(|e| {
+        warn!("Failed to get item {item_id} for raw content: {e}");
+        StatusCode::INTERNAL_SERVER_ERROR
+    })?;
+    let compression = item_with_meta.item.compression.clone();
+    // Read raw file bytes
+    let content = data_service
+        .get_raw_item_content(item_id)
+        .await
+        .map_err(|e| {
+            warn!("Failed to get raw content for item {item_id}: {e}");
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+    let response_content = apply_offset_length(&content, offset, length);
+    let response = Response::builder()
+        .header(header::CONTENT_TYPE, "application/octet-stream")
+        .header("X-Keep-Compression", &compression)
+        .header(header::CONTENT_LENGTH, response_content.len())
+        .body(axum::body::Body::from(response_content.to_vec()))
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+    Ok(response)
+}
+#[allow(clippy::too_many_arguments)]
async fn stream_item_content_response_with_metadata(
    data_service: &AsyncDataService,
    item_id: i64,
@@ -605,11 +736,15 @@ async fn stream_item_content_response_with_metadata(
    length: u64,
    stream: bool,
    _filter: Option<String>,
+    decompress: bool,
) -> Result<Response, StatusCode> {
-    debug!(
-        "STREAM_ITEM_CONTENT_RESPONSE_WITH_METADATA: stream={}",
-        stream
-    );
+    debug!("STREAM_ITEM_CONTENT_RESPONSE_WITH_METADATA: stream={stream}, decompress={decompress}");
+    // When decompress=false, return raw stored bytes
+    if !decompress {
+        return stream_raw_content_response(data_service, item_id, offset, length).await;
+    }
    let mime_type = get_mime_type(metadata);
    // Check if content is binary when allow_binary is false
@@ -630,7 +765,7 @@ async fn stream_item_content_response_with_metadata(
            Ok(response)
        }
        Err(e) => {
-            warn!("Failed to stream content for item {}: {}", item_id, e);
+            warn!("Failed to stream content for item {item_id}: {e}");
            Err(StatusCode::INTERNAL_SERVER_ERROR)
        }
    }
@@ -649,7 +784,7 @@ async fn stream_item_content_response_with_metadata(
            ResponseBuilder::binary(response_content, &mime_type)
        }
        Err(e) => {
-            warn!("Failed to get content for item {}: {}", item_id, e);
+            warn!("Failed to get content for item {item_id}: {e}");
            Err(StatusCode::INTERNAL_SERVER_ERROR)
        }
    }
@@ -683,13 +818,7 @@ pub async fn handle_get_item_latest_meta(
    let tags: Vec<String> = params
        .tags
        .as_ref()
-        .map(|s| {
-            parse_comma_tags(s).map_err(|e| {
-                warn!("Failed to parse tags: {}", e);
-                StatusCode::BAD_REQUEST
-            })
-        })
-        .transpose()?
+        .map(|s| parse_comma_tags(s))
        .unwrap_or_default();
    let data_service = create_data_service(&state);
@@ -753,6 +882,39 @@ pub async fn handle_get_item_meta(
    }
}
+pub async fn handle_post_item_meta(
+    State(state): State<AppState>,
+    Path(item_id): Path<i64>,
+    Json(metadata): Json<HashMap<String, String>>,
+) -> Result<Json<ApiResponse<()>>, StatusCode> {
+    let data_service = create_data_service(&state);
+    // Verify item exists
+    data_service
+        .get_item(item_id)
+        .await
+        .map_err(handle_item_error)?;
+    // Add each metadata entry
+    for (key, value) in &metadata {
+        data_service
+            .add_item_meta(item_id, key, value)
+            .await
+            .map_err(|e| {
+                warn!("Failed to add metadata {key} for item {item_id}: {e}");
+                StatusCode::INTERNAL_SERVER_ERROR
+            })?;
+    }
+    let response = ApiResponse {
+        success: true,
+        data: Some(()),
+        error: None,
+    };
+    Ok(Json(response))
+}
#[utoipa::path(
    delete,
    path = "/api/item/{item_id}",
@@ -920,8 +1082,8 @@ pub async fn handle_diff_items(
        return Err(StatusCode::BAD_REQUEST);
    };
-    let id_a = item_a.item.id.ok_or_else(|| StatusCode::BAD_REQUEST)?;
-    let id_b = item_b.item.id.ok_or_else(|| StatusCode::BAD_REQUEST)?;
+    let id_a = item_a.item.id.ok_or(StatusCode::BAD_REQUEST)?;
+    let id_b = item_b.item.id.ok_or(StatusCode::BAD_REQUEST)?;
    let (mut reader_a, _) = sync_service
        .get_content(&mut conn, id_a)
@@ -932,13 +1094,13 @@ pub async fn handle_diff_items(
    let mut content_a = Vec::new();
    reader_a.read_to_end(&mut content_a).map_err(|e| {
-        log::error!("Failed to read content A: {}", e);
+        log::error!("Failed to read content A: {e}");
        StatusCode::INTERNAL_SERVER_ERROR
    })?;
    let mut content_b = Vec::new();
    reader_b.read_to_end(&mut content_b).map_err(|e| {
-        log::error!("Failed to read content B: {}", e);
+        log::error!("Failed to read content B: {e}");
        StatusCode::INTERNAL_SERVER_ERROR
    })?;
@@ -960,7 +1122,8 @@ fn compute_diff(a: &[u8], b: &[u8]) -> Vec<String> {
    let old_lines: Vec<&str> = text_a.lines().collect();
    let new_lines: Vec<&str> = text_b.lines().collect();
-    let ops = similar::TextDiff::from_lines(text_a.as_ref(), text_b.as_ref()).ops();
+    let text_diff = similar::TextDiff::from_lines(text_a.as_ref(), text_b.as_ref());
+    let ops = text_diff.ops();
    let mut diff_lines = Vec::new();
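
The ChannelReader/spawn_blocking arrangement above is a general tokio pattern for
feeding an async byte stream into synchronous Read-based code. A minimal,
self-contained sketch of that pattern follows (illustrative only, not part of this
commit; ChunkReader and the sample data are made up):

use std::io::Read;
use tokio::sync::mpsc;
use tokio::task;

struct ChunkReader {
    rx: mpsc::Receiver<Vec<u8>>,
    buf: Vec<u8>,
    pos: usize,
}

impl Read for ChunkReader {
    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
        // Serve leftover bytes from the previous chunk first.
        if self.pos < self.buf.len() {
            let n = std::cmp::min(out.len(), self.buf.len() - self.pos);
            out[..n].copy_from_slice(&self.buf[self.pos..self.pos + n]);
            self.pos += n;
            return Ok(n);
        }
        // Block this (blocking-pool) thread until the next chunk or EOF.
        match self.rx.blocking_recv() {
            Some(chunk) => {
                self.buf = chunk;
                self.pos = 0;
                self.read(out)
            }
            None => Ok(0), // channel closed => EOF
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<Vec<u8>>(16);
    // Producer: stands in for the axum body-frame loop.
    tokio::spawn(async move {
        for part in ["hello ", "streaming ", "world"] {
            if tx.send(part.as_bytes().to_vec()).await.is_err() {
                break;
            }
        }
    });
    // Consumer: stands in for save_item_raw_streaming inside spawn_blocking.
    let total = task::spawn_blocking(move || {
        let mut reader = ChunkReader { rx, buf: Vec::new(), pos: 0 };
        let mut all = Vec::new();
        reader.read_to_end(&mut all).expect("read failed");
        all.len()
    })
    .await
    .expect("blocking task panicked");
    println!("consumed {total} bytes");
}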

View File

@@ -1,5 +1,4 @@
pub mod common;
-#[cfg(feature = "swagger")]
pub mod item;
#[cfg(feature = "mcp")]
pub mod mcp;
@@ -57,9 +56,11 @@ use utoipa_swagger_ui::SwaggerUi;
        (url = "/", description = "Local server")
    )
)]
+#[allow(dead_code)]
struct ApiDoc;
pub fn add_routes(router: Router<AppState>) -> Router<AppState> {
+    #[cfg_attr(not(feature = "mcp"), allow(unused_mut))]
    let mut router = router
        // Status endpoints
        .route("/api/status", get(status::handle_status))
@@ -77,7 +78,10 @@ pub fn add_routes(router: Router<AppState>) -> Router<AppState> {
            "/api/item/latest/content",
            get(item::handle_get_item_latest_content),
        )
-        .route("/api/item/{item_id}/meta", get(item::handle_get_item_meta))
+        .route(
+            "/api/item/{item_id}/meta",
+            get(item::handle_get_item_meta).post(item::handle_post_item_meta),
+        )
        .route(
            "/api/item/{item_id}/content",
            get(item::handle_get_item_content),

View File

@@ -488,6 +488,10 @@ pub struct ItemQuery {
    /// Boolean flag to return content and metadata in a structured JSON format.
    #[serde(default = "default_as_meta")]
    pub as_meta: bool,
+    /// Whether the server should decompress the content (default: true).
+    /// Set to false when the client wants raw stored bytes for local decompression.
+    #[serde(default = "default_true")]
+    pub decompress: bool,
}
/// Query parameters for item content retrieval.
@@ -538,6 +542,10 @@ pub struct ItemContentQuery {
    /// Boolean flag to return content and metadata in a structured JSON format.
    #[serde(default = "default_as_meta")]
    pub as_meta: bool,
+    /// Whether the server should decompress the content (default: true).
+    /// Set to false when the client wants raw stored bytes for local decompression.
+    #[serde(default = "default_true")]
+    pub decompress: bool,
}
/// Default function for allow_binary parameter.
@@ -567,6 +575,15 @@ fn default_as_meta() -> bool {
    false
}
+/// Default function for true boolean parameters.
+///
+/// # Returns
+///
+/// `true` as the default value.
+fn default_true() -> bool {
+    true
+}
/// Query parameters for creating an item via POST.
///
/// Query parameters for POST /api/item/ with streaming binary body.
@@ -576,6 +593,14 @@ pub struct CreateItemQuery {
    pub tags: Option<String>,
    /// Optional metadata as JSON string.
    pub metadata: Option<String>,
+    /// Whether the server should compress the content (default: true).
+    /// Set to false when the client has already compressed the content.
+    #[serde(default = "default_true")]
+    pub compress: bool,
+    /// Whether the server should run meta plugins (default: true).
+    /// Set to false when the client has already collected metadata.
+    #[serde(default = "default_true")]
+    pub meta: bool,
}
/// Request body for creating a new item.
@@ -672,7 +697,7 @@ fn check_basic_auth(
        }
        // Otherwise, do direct comparison
-        let expected_credentials = format!("keep:{}", expected_password);
+        let expected_credentials = format!("keep:{expected_password}");
        return decoded_str == expected_credentials;
    }
}
@@ -803,6 +828,7 @@ pub async fn logging_middleware(
/// let auth_middleware = create_auth_middleware(Some("pass".to_string()), None);
/// router.layer(auth_middleware);
/// ```
+#[allow(clippy::type_complexity)]
pub fn create_auth_middleware(
    password: Option<String>,
    password_hash: Option<String>,
@@ -822,7 +848,7 @@ pub fn create_auth_middleware(
        let uri = request.uri().clone();
        if !check_auth(&headers, &password, &password_hash) {
-            warn!("Unauthorized request to {} from {}", uri, addr);
+            warn!("Unauthorized request to {uri} from {addr}");
            // Add WWW-Authenticate header to trigger basic auth in browsers
            let mut response = Response::new(axum::body::Body::from("Unauthorized"));
            *response.status_mut() = StatusCode::UNAUTHORIZED;
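
A hedged sketch of how a client might use the decompress flag added above
(illustrative, not part of this commit; it assumes a reqwest client and a known
item id, and the local decompression step itself is omitted):

use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let item_id = 1; // placeholder
    let resp = reqwest::Client::new()
        .get(format!("http://127.0.0.1:21080/api/item/{item_id}/content"))
        .query(&[("decompress", "false")])
        .send()
        .await?;
    // The server reports how the stored bytes are compressed so the client
    // can decompress locally.
    let compression = resp
        .headers()
        .get("X-Keep-Compression")
        .and_then(|v| v.to_str().ok())
        .unwrap_or("unknown")
        .to_string();
    let raw = resp.bytes().await?;
    println!("got {} raw bytes, compression = {compression}", raw.len());
    // ...decompress `raw` locally according to `compression`...
    Ok(())
}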

View File

@@ -88,7 +88,7 @@ async fn run_server(
        format!("{}:21080", config.address)
    };
-    debug!("SERVER: Starting REST HTTP server on {}", bind_address);
+    debug!("SERVER: Starting REST HTTP server on {bind_address}");
    // Use the existing database connection
    let db_conn = Arc::new(Mutex::new(conn));
@@ -106,6 +106,7 @@ async fn run_server(
        .route("/mcp", post(mcp::handle_mcp_request))
        .with_state(state.clone());
+    #[cfg_attr(not(feature = "mcp"), allow(unused_mut))]
    let mut protected_router = Router::new()
        .merge(api::add_routes(Router::new()))
        .merge(pages::add_routes(Router::new()));
@@ -137,7 +138,7 @@ async fn run_server(
    let addr: SocketAddr = bind_address.parse()?;
-    info!("SERVER: HTTP server listening on {}", addr);
+    info!("SERVER: HTTP server listening on {addr}");
    let listener = tokio::net::TcpListener::bind(addr).await?;
    axum::serve(

View File

@@ -47,12 +47,6 @@ fn default_count() -> usize {
    1000
}
-/// Provides the default number of items to display per page.
-///
-/// # Returns
-///
-/// The default count: 1000.
/// Adds the web page routes to the Axum router.
///
/// This function configures the routes for the web interface, including the
@@ -96,7 +90,7 @@ async fn list_items(
                .map_err(|_| Html("<html><body>Internal Server Error</body></html>".to_string()))?;
            Ok(response)
        }
-        Err(e) => Err(Html(format!("<html><body>Error: {}</body></html>", e))),
+        Err(e) => Err(Html(format!("<html><body>Error: {e}</body></html>"))),
    }
}
@@ -190,8 +184,7 @@ fn build_item_list(
    html.push_str("<p>");
    for tag in recent_tags {
        html.push_str(&format!(
-            "<a href=\"/?tags={}\" style=\"margin-right: 8px;\">{}</a>",
-            tag, tag
+            "<a href=\"/?tags={tag}\" style=\"margin-right: 8px;\">{tag}</a>"
        ));
    }
    html.push_str("</p>");
@@ -228,7 +221,7 @@ fn build_item_list(
        "id" => {
            let id_value = item.id.map(|id| id.to_string()).unwrap_or_default();
            // Make the ID a link to the item details page
-            format!("<a href=\"/item/{}\">{}</a>", item_id, id_value)
+            format!("<a href=\"/item/{item_id}\">{id_value}</a>")
        }
        "time" => item.ts.format("%Y-%m-%d %H:%M:%S").to_string(),
        "size" => item.size.map(|s| s.to_string()).unwrap_or_default(),
@@ -257,7 +250,7 @@ fn build_item_list(
        if let Ok(max_len) = max_len_str.parse::<usize>() {
            if value.chars().count() > max_len {
                let truncated: String = value.chars().take(max_len).collect();
-                format!("{}...", truncated)
+                format!("{truncated}...")
            } else {
                value
            }
@@ -275,16 +268,12 @@ fn build_item_list(
            crate::config::ColumnAlignment::Center => "text-align: center;",
        };
-        html.push_str(&format!(
-            "<td style=\"{}\">{}</td>",
-            align_style, display_value
-        ));
+        html.push_str(&format!("<td style=\"{align_style}\">{display_value}</td>"));
    }
    // Actions column
    html.push_str(&format!(
-        "<td><a href=\"/item/{}\">View</a> | <a href=\"/api/item/{}/content\">Download</a></td>",
-        item_id, item_id
+        "<td><a href=\"/item/{item_id}\">View</a> | <a href=\"/api/item/{item_id}/content\">Download</a></td>"
    ));
    html.push_str("</tr>");
@@ -372,7 +361,7 @@ async fn show_item(
                .map_err(|_| Html("<html><body>Internal Server Error</body></html>".to_string()))?;
            Ok(response)
        }
-        Err(e) => Err(Html(format!("<html><body>Error: {}</body></html>", e))),
+        Err(e) => Err(Html(format!("<html><body>Error: {e}</body></html>"))),
    }
}
@@ -386,10 +375,10 @@ fn build_item_details(conn: &Connection, id: i64) -> Result<String> {
    let metas = db::get_item_meta(conn, &item)?;
    let mut html = String::new();
-    html.push_str(&format!("<html><head><title>Keep - Item #{}</title>", id));
+    html.push_str(&format!("<html><head><title>Keep - Item #{id}</title>"));
    html.push_str("<link rel=\"stylesheet\" href=\"/style.css\">");
    html.push_str("</head><body>");
-    html.push_str(&format!("<h1>Item #{}</h1>", id));
+    html.push_str(&format!("<h1>Item #{id}</h1>"));
    // Single table for all details
    html.push_str("<table>");
@@ -439,8 +428,7 @@ fn build_item_details(conn: &Connection, id: i64) -> Result<String> {
    // Links
    html.push_str("<h2>Actions</h2>");
    html.push_str(&format!(
-        "<p><a href=\"/api/item/{}/content\">Download Content</a></p>",
-        id
+        "<p><a href=\"/api/item/{id}/content\">Download Content</a></p>"
    ));
    html.push_str("<p><a href=\"/\">Back to list</a></p>");

View File

@@ -51,6 +51,17 @@ impl AsyncDataService {
        self.get(&mut conn, id)
    }
+    pub async fn add_item_meta(
+        &self,
+        item_id: i64,
+        name: &str,
+        value: &str,
+    ) -> Result<(), CoreError> {
+        let conn = self.db.lock().await;
+        crate::db::add_meta(&conn, item_id, name, value)?;
+        Ok(())
+    }
    pub async fn list_items(
        &self,
        tags: Vec<String>,
@@ -184,6 +195,32 @@ impl AsyncDataService {
        Ok((Box::pin(stream), content_length))
    }
+    /// Get raw item content without decompression.
+    ///
+    /// Reads the stored file bytes directly from disk, bypassing decompression.
+    /// Used when the client requests raw bytes with `decompress=false`.
+    pub async fn get_raw_item_content(&self, id: i64) -> Result<Vec<u8>, CoreError> {
+        let data_path = self.data_path.clone();
+        tokio::task::spawn_blocking(move || {
+            let mut item_path = data_path;
+            item_path.push(id.to_string());
+            let mut file = std::fs::File::open(&item_path).map_err(|e| {
+                CoreError::Io(std::io::Error::new(
+                    std::io::ErrorKind::NotFound,
+                    format!("Item file not found: {item_path:?}: {e}"),
+                ))
+            })?;
+            let mut content = Vec::new();
+            file.read_to_end(&mut content)?;
+            Ok(content)
+        })
+        .await
+        .map_err(|e| CoreError::Other(anyhow::anyhow!("Task join error: {}", e)))?
+    }
}
impl DataService for AsyncDataService {

View File

@@ -1,16 +1,19 @@
use crate::common::status::StatusInfo;
+use crate::compression_engine::{CompressionType, get_compression_engine};
use crate::config::Settings;
use crate::db::Item;
use crate::db::Meta;
+use crate::modes::common::settings_compression_type;
use crate::services::data_service::DataService;
use crate::services::error::CoreError;
use crate::services::item_service::ItemService;
+use crate::services::meta_service::MetaService;
use crate::services::status_service::StatusService;
use crate::services::types::{ItemWithContent, ItemWithMeta};
use clap::Command;
use rusqlite::Connection;
use std::collections::HashMap;
-use std::io::{Cursor, Read};
+use std::io::{Cursor, Read, Write};
use std::path::{Path, PathBuf};
pub struct SyncDataService {
@@ -80,6 +83,204 @@ impl SyncDataService {
        self.get_item(conn, item_id)
    }
+    /// Save an item with granular control over compression and meta plugins.
+    ///
+    /// This method allows clients to control whether compression and meta plugins
+    /// run server-side or were already handled by the client.
+    ///
+    /// # Arguments
+    ///
+    /// * `conn` - Database connection.
+    /// * `content` - Raw content bytes.
+    /// * `tags` - Tags to associate with the item.
+    /// * `metadata` - Client-provided metadata.
+    /// * `compress` - Whether the server should compress the content.
+    /// * `run_meta` - Whether the server should run meta plugins.
+    ///
+    /// # Returns
+    ///
+    /// * `Result<ItemWithMeta, CoreError>` - The saved item with full details.
+    pub fn save_item_raw(
+        &self,
+        conn: &mut Connection,
+        content: &[u8],
+        tags: Vec<String>,
+        metadata: HashMap<String, String>,
+        compress: bool,
+        run_meta: bool,
+    ) -> Result<ItemWithMeta, CoreError> {
+        let mut cmd = Command::new("keep");
+        let settings = &self.settings;
+        let mut tags = tags;
+        if tags.is_empty() {
+            tags.push("none".to_string());
+        }
+        let compression_type = if compress {
+            settings_compression_type(&mut cmd, settings)
+        } else {
+            CompressionType::None
+        };
+        let compression_engine = get_compression_engine(compression_type.clone())?;
+        let item_id;
+        let mut item;
+        {
+            item = crate::db::create_item(conn, compression_type.clone())?;
+            item_id = item.id.unwrap();
+            crate::db::set_item_tags(conn, item.clone(), &tags)?;
+        }
+        // Initialize meta plugins if requested
+        let meta_service = MetaService::new();
+        let mut plugins = if run_meta {
+            meta_service.get_plugins(&mut cmd, settings)
+        } else {
+            Vec::new()
+        };
+        if run_meta {
+            meta_service.initialize_plugins(&mut plugins, conn, item_id);
+        }
+        // Write content to file
+        let mut item_path = self.item_service.get_data_path().clone();
+        item_path.push(item_id.to_string());
+        let mut item_out = compression_engine.create(item_path)?;
+        let mut total_bytes = 0i64;
+        const PIPESIZE: usize = 65536;
+        if run_meta && !plugins.is_empty() {
+            // Process in chunks for meta plugins
+            let mut offset = 0;
+            while offset < content.len() {
+                let end = std::cmp::min(offset + PIPESIZE, content.len());
+                let chunk = &content[offset..end];
+                item_out.write_all(chunk)?;
+                total_bytes += chunk.len() as i64;
+                meta_service.process_chunk(&mut plugins, chunk, conn, item_id);
+                offset = end;
+            }
+        } else {
+            // Write all at once, no meta processing
+            item_out.write_all(content)?;
+            total_bytes = content.len() as i64;
+        }
+        item_out.flush()?;
+        drop(item_out);
+        // Finalize meta plugins
+        if run_meta {
+            meta_service.finalize_plugins(&mut plugins, conn, item_id);
+        }
+        // Add client-provided metadata
+        for (key, value) in &metadata {
+            crate::db::add_meta(conn, item_id, key, value)?;
+        }
+        item.size = Some(total_bytes);
+        crate::db::update_item(conn, item)?;
+        self.get_item(conn, item_id)
+    }
+    /// Save an item from a streaming reader with granular control over compression.
+    ///
+    /// Unlike `save_item_raw` which takes a pre-buffered `&[u8]`, this method
+    /// reads from the reader in chunks and writes directly to the compression
+    /// engine, avoiding buffering the entire content in memory.
+    pub fn save_item_raw_streaming(
+        &self,
+        conn: &mut Connection,
+        reader: &mut dyn Read,
+        tags: Vec<String>,
+        metadata: HashMap<String, String>,
+        compress: bool,
+        run_meta: bool,
+    ) -> Result<ItemWithMeta, CoreError> {
+        let mut cmd = Command::new("keep");
+        let settings = &self.settings;
+        let mut tags = tags;
+        if tags.is_empty() {
+            tags.push("none".to_string());
+        }
+        let compression_type = if compress {
+            settings_compression_type(&mut cmd, settings)
+        } else {
+            CompressionType::None
+        };
+        let compression_engine = get_compression_engine(compression_type.clone())?;
+        let item_id;
+        let mut item;
+        {
+            item = crate::db::create_item(conn, compression_type.clone())?;
+            item_id = item.id.unwrap();
+            crate::db::set_item_tags(conn, item.clone(), &tags)?;
+        }
+        // Initialize meta plugins if requested
+        let meta_service = MetaService::new();
+        let mut plugins = if run_meta {
+            meta_service.get_plugins(&mut cmd, settings)
+        } else {
+            Vec::new()
+        };
+        if run_meta {
+            meta_service.initialize_plugins(&mut plugins, conn, item_id);
+        }
+        // Write content to file via streaming
+        let mut item_path = self.item_service.get_data_path().clone();
+        item_path.push(item_id.to_string());
+        let mut item_out = compression_engine.create(item_path)?;
+        let mut buffer = [0u8; 65536];
+        let mut total_bytes = 0i64;
+        loop {
+            let n = reader.read(&mut buffer)?;
+            if n == 0 {
+                break;
+            }
+            item_out.write_all(&buffer[..n])?;
+            total_bytes += n as i64;
+            if run_meta {
+                meta_service.process_chunk(&mut plugins, &buffer[..n], conn, item_id);
+            }
+        }
+        item_out.flush()?;
+        drop(item_out);
+        // Finalize meta plugins
+        if run_meta {
+            meta_service.finalize_plugins(&mut plugins, conn, item_id);
+        }
+        // Add client-provided metadata
+        for (key, value) in &metadata {
+            crate::db::add_meta(conn, item_id, key, value)?;
+        }
+        item.size = Some(total_bytes);
+        crate::db::update_item(conn, item)?;
+        self.get_item(conn, item_id)
+    }
    pub fn get_item(&self, conn: &mut Connection, id: i64) -> Result<ItemWithMeta, CoreError> {
        self.item_service.get_item(conn, id)
    }
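
The core of save_item_raw_streaming is a fixed-buffer copy loop with an optional
per-chunk hook. A minimal standalone sketch of that loop (illustrative; the
process_chunk closure stands in for the meta-plugin hook and is not the project's
real API):

use std::io::{Read, Write};

fn copy_in_chunks<R: Read, W: Write>(
    reader: &mut R,
    writer: &mut W,
    mut process_chunk: impl FnMut(&[u8]),
) -> std::io::Result<i64> {
    let mut buffer = [0u8; 65536];
    let mut total_bytes = 0i64;
    loop {
        let n = reader.read(&mut buffer)?;
        if n == 0 {
            break; // EOF
        }
        writer.write_all(&buffer[..n])?;
        process_chunk(&buffer[..n]);
        total_bytes += n as i64;
    }
    writer.flush()?;
    Ok(total_bytes)
}

fn main() -> std::io::Result<()> {
    let mut input = std::io::Cursor::new(vec![42u8; 200_000]);
    let mut output: Vec<u8> = Vec::new();
    let mut seen = 0usize;
    let total = copy_in_chunks(&mut input, &mut output, |chunk| seen += chunk.len())?;
    assert_eq!(total as usize, output.len());
    println!("wrote {total} bytes, hook saw {seen} bytes");
    Ok(())
}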

View File

@@ -16,7 +16,7 @@ pub fn create_temp_dir() -> TempDir {
pub fn create_temp_file_with_content(dir: &TempDir, filename: &str, content: &str) -> PathBuf {
    let file_path = dir.path().join(filename);
    let mut file = File::create(&file_path).expect("Failed to create test file");
-    write!(file, "{}", content).expect("Failed to write to test file");
+    write!(file, "{content}").expect("Failed to write to test file");
    file_path
}
@@ -95,14 +95,13 @@ pub fn get_file_size(file_path: &PathBuf) -> u64 {
/// Assert that a file exists
pub fn assert_file_exists(file_path: &PathBuf) {
-    assert!(file_path.exists(), "File {:?} does not exist", file_path);
+    assert!(file_path.exists(), "File {file_path:?} does not exist");
}
/// Assert that a file does not exist
pub fn assert_file_not_exists(file_path: &PathBuf) {
    assert!(
        !file_path.exists(),
-        "File {:?} should not exist but it does",
-        file_path
+        "File {file_path:?} should not exist but it does"
    );
}