feat: add streaming support to /content endpoints with offset and length parameters
Co-authored-by: aider (openai/andrew/openrouter/qwen/qwen3-coder) <aider@aider.chat>
This commit is contained in:
@@ -4,13 +4,15 @@ use axum::{
|
||||
response::{Json, Response, IntoResponse},
|
||||
http::header,
|
||||
};
|
||||
use tokio::io::{AsyncReadExt, AsyncSeekExt};
|
||||
use tokio_util::io::ReaderStream;
|
||||
use log::warn;
|
||||
use std::collections::HashMap;
|
||||
use anyhow;
|
||||
|
||||
use crate::services::async_item_service::AsyncItemService;
|
||||
use crate::services::error::CoreError;
|
||||
use crate::modes::server::common::{AppState, ApiResponse, ItemInfo, ItemContentInfo, TagsQuery, ListItemsQuery, ItemQuery, ItemInfoListResponse, ItemInfoResponse, ItemContentInfoResponse, MetadataResponse};
|
||||
use crate::modes::server::common::{AppState, ApiResponse, ItemInfo, ItemContentInfo, TagsQuery, ListItemsQuery, ItemQuery, ItemInfoListResponse, ItemInfoResponse, ItemContentInfoResponse, MetadataResponse, ItemContentQuery, ItemContentParams};
|
||||
use crate::common::is_binary::is_binary;
|
||||
|
||||
#[utoipa::path(
|
||||
@@ -93,6 +95,50 @@ pub async fn handle_list_items(
|
||||
Ok(Json(response))
|
||||
}
|
||||
|
||||
async fn stream_item_content(service: &AsyncItemService, item_id: i64, allow_binary: bool, offset: u64, length: u64) -> anyhow::Result<(ReaderStream<tokio::fs::File>, String)> {
|
||||
let item_with_meta = service.get_item(item_id).await?;
|
||||
let metadata = item_with_meta.meta_as_map();
|
||||
|
||||
// Determine if content is binary
|
||||
let is_content_binary = if let Some(binary_meta) = metadata.get("binary") {
|
||||
binary_meta == "true"
|
||||
} else {
|
||||
// If binary metadata not available, we'll need to check the file
|
||||
false // Default to false for now, since we don't want to read the file here
|
||||
};
|
||||
|
||||
// If binary content is not allowed and content is binary, return an error
|
||||
if is_content_binary && !allow_binary {
|
||||
return Err(anyhow::anyhow!("Binary content not allowed"));
|
||||
}
|
||||
|
||||
let mime_type = metadata
|
||||
.get("mime_type")
|
||||
.map(|s| s.to_string())
|
||||
.unwrap_or_else(|| "application/octet-stream".to_string());
|
||||
|
||||
// Open the file for streaming
|
||||
let file_path = service.data_dir.join(format!("{}.dat", item_id));
|
||||
let mut file = tokio::fs::File::open(&file_path).await?;
|
||||
|
||||
// Seek to the requested offset
|
||||
if offset > 0 {
|
||||
file.seek(std::io::SeekFrom::Start(offset)).await?;
|
||||
}
|
||||
|
||||
// Create a reader stream with optional length limit
|
||||
let stream = if length > 0 {
|
||||
// Limit the stream to the specified length
|
||||
let limited_reader = tokio::io::BufReader::new(file).take(length);
|
||||
ReaderStream::new(limited_reader)
|
||||
} else {
|
||||
// Stream the entire file from the offset
|
||||
ReaderStream::new(file)
|
||||
};
|
||||
|
||||
Ok((stream, mime_type))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
post,
|
||||
path = "/api/item/",
|
||||
@@ -146,7 +192,10 @@ pub async fn handle_post_item(
|
||||
(status = 500, description = "Internal server error - Failed to retrieve item content due to decompression or filesystem error")
|
||||
),
|
||||
params(
|
||||
("tags" = Option<String>, Query, description = "Comma-separated list of tags to filter by (e.g., 'important,work'). If specified, returns the latest item that has ALL the specified tags.")
|
||||
("tags" = Option<String>, Query, description = "Comma-separated list of tags to filter by (e.g., 'important,work'). If specified, returns the latest item that has ALL the specified tags."),
|
||||
("allow_binary" = Option<bool>, Query, description = "Whether to allow binary content to be returned (default: true). When false, returns 400 for binary files."),
|
||||
("offset" = Option<u64>, Query, description = "Byte offset from the start of the file to begin reading (default: 0)"),
|
||||
("length" = Option<u64>, Query, description = "Maximum number of bytes to return, starting at offset (default: 0 for unlimited)")
|
||||
),
|
||||
security(
|
||||
("bearerAuth" = [])
|
||||
@@ -155,7 +204,7 @@ pub async fn handle_post_item(
|
||||
)]
|
||||
pub async fn handle_get_item_latest_content(
|
||||
State(state): State<AppState>,
|
||||
Query(params): Query<TagsQuery>,
|
||||
Query(params): Query<ItemContentQuery>,
|
||||
) -> Result<Response, StatusCode> {
|
||||
let tags: Vec<String> = params
|
||||
.tags
|
||||
@@ -172,17 +221,17 @@ pub async fn handle_get_item_latest_content(
|
||||
match item_with_meta {
|
||||
Ok(item) => {
|
||||
let item_id = item.item.id.unwrap();
|
||||
match get_item_raw_content(&item_service, item_id).await {
|
||||
Ok((content, mime_type)) => {
|
||||
let mut response = content.into_response();
|
||||
response.headers_mut().insert(
|
||||
header::CONTENT_TYPE,
|
||||
mime_type.parse().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
|
||||
);
|
||||
match stream_item_content(&item_service, item_id, params.allow_binary.unwrap_or(true), params.offset.unwrap_or(0), params.length.unwrap_or(0)).await {
|
||||
Ok((stream, mime_type)) => {
|
||||
let body = axum::body::boxed(axum::body::StreamBody::new(stream));
|
||||
let response = Response::builder()
|
||||
.header(header::CONTENT_TYPE, mime_type)
|
||||
.body(body)
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
Ok(response)
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Failed to get raw content for item {}: {}", item_id, e);
|
||||
warn!("Failed to stream content for item {}: {}", item_id, e);
|
||||
Err(StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
}
|
||||
@@ -203,13 +252,16 @@ pub async fn handle_get_item_latest_content(
|
||||
description = "Download the raw content of a specific item by its ID. The content is automatically decompressed and returned with the appropriate MIME type header for proper browser handling. This endpoint is ideal for downloading files or viewing content directly in the browser.",
|
||||
responses(
|
||||
(status = 200, description = "Successfully retrieved item raw content with appropriate Content-Type header set based on detected MIME type"),
|
||||
(status = 400, description = "Bad request - Invalid item ID (must be a positive integer)"),
|
||||
(status = 400, description = "Bad request - Invalid item ID (must be a positive integer) or binary content not allowed"),
|
||||
(status = 401, description = "Unauthorized - Invalid or missing authentication credentials"),
|
||||
(status = 404, description = "Item not found - No item exists with the specified ID"),
|
||||
(status = 500, description = "Internal server error - Failed to retrieve item content due to decompression or filesystem error")
|
||||
),
|
||||
params(
|
||||
("item_id" = i64, Path, description = "Unique identifier of the item to retrieve content for (must be a positive integer)")
|
||||
("item_id" = i64, Path, description = "Unique identifier of the item to retrieve content for (must be a positive integer)"),
|
||||
("allow_binary" = Option<bool>, Query, description = "Whether to allow binary content to be returned (default: true). When false, returns 400 for binary files."),
|
||||
("offset" = Option<u64>, Query, description = "Byte offset from the start of the file to begin reading (default: 0)"),
|
||||
("length" = Option<u64>, Query, description = "Maximum number of bytes to return, starting at offset (default: 0 for unlimited)")
|
||||
),
|
||||
security(
|
||||
("bearerAuth" = [])
|
||||
@@ -219,6 +271,7 @@ pub async fn handle_get_item_latest_content(
|
||||
pub async fn handle_get_item_content(
|
||||
State(state): State<AppState>,
|
||||
Path(item_id): Path<i64>,
|
||||
Query(params): Query<ItemContentParams>,
|
||||
) -> Result<Response, StatusCode> {
|
||||
// Validate that item ID is positive to prevent path traversal issues
|
||||
if item_id <= 0 {
|
||||
@@ -227,13 +280,13 @@ pub async fn handle_get_item_content(
|
||||
|
||||
let item_service = AsyncItemService::new(state.data_dir.clone(), state.db.clone());
|
||||
|
||||
match get_item_raw_content(&item_service, item_id).await {
|
||||
Ok((content, mime_type)) => {
|
||||
let mut response = content.into_response();
|
||||
response.headers_mut().insert(
|
||||
header::CONTENT_TYPE,
|
||||
mime_type.parse().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
|
||||
);
|
||||
match stream_item_content(&item_service, item_id, params.allow_binary.unwrap_or(true), params.offset.unwrap_or(0), params.length.unwrap_or(0)).await {
|
||||
Ok((stream, mime_type)) => {
|
||||
let body = axum::body::boxed(axum::body::StreamBody::new(stream));
|
||||
let response = Response::builder()
|
||||
.header(header::CONTENT_TYPE, mime_type)
|
||||
.body(body)
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
Ok(response)
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -242,49 +295,13 @@ pub async fn handle_get_item_content(
|
||||
return Err(StatusCode::NOT_FOUND);
|
||||
}
|
||||
}
|
||||
warn!("Failed to get raw content for item {}: {}", item_id, e);
|
||||
warn!("Failed to stream content for item {}: {}", item_id, e);
|
||||
Err(StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_item_content_info(service: &AsyncItemService, item_id: i64, allow_binary: bool) -> anyhow::Result<ItemContentInfo> {
|
||||
let item_with_content = service.get_item_content(item_id).await.map_err(anyhow::Error::from)?;
|
||||
let metadata = item_with_content.item_with_meta.meta_as_map();
|
||||
|
||||
// Determine if content is binary
|
||||
let is_binary = if let Some(binary_meta) = metadata.get("binary") {
|
||||
binary_meta == "true"
|
||||
} else {
|
||||
is_binary(&item_with_content.content)
|
||||
};
|
||||
|
||||
// Get content if not binary or if binary is allowed
|
||||
let content = if is_binary && !allow_binary {
|
||||
None
|
||||
} else {
|
||||
Some(String::from_utf8_lossy(&item_with_content.content).to_string())
|
||||
};
|
||||
|
||||
Ok(ItemContentInfo {
|
||||
metadata,
|
||||
content,
|
||||
binary: is_binary,
|
||||
})
|
||||
}
|
||||
|
||||
async fn get_item_raw_content(service: &AsyncItemService, item_id: i64) -> anyhow::Result<(Vec<u8>, String)> {
|
||||
let item_with_content = service.get_item_content(item_id).await.map_err(anyhow::Error::from)?;
|
||||
let content = item_with_content.content;
|
||||
let metadata = item_with_content.item_with_meta.meta_as_map();
|
||||
|
||||
let mime_type = metadata
|
||||
.get("mime_type")
|
||||
.map(|s| s.to_string())
|
||||
.unwrap_or_else(|| "application/octet-stream".to_string());
|
||||
|
||||
Ok((content, mime_type))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
get,
|
||||
|
||||
Reference in New Issue
Block a user