Files
keep/src/modes/server/api/item.rs
Andrew Phillips 62844b2073 fix: update bytes import and fix data_path field references
Co-authored-by: aider (openai/andrew/openrouter/qwen/qwen3-coder) <aider@aider.chat>
2025-08-25 18:47:54 -03:00

410 lines
18 KiB
Rust

use axum::{
extract::{Path, Query, State},
http::{StatusCode},
response::{Json, Response},
http::header,
};
use log::warn;
use std::collections::HashMap;
use crate::services::async_item_service::AsyncItemService;
use crate::services::error::CoreError;
use crate::modes::server::common::{AppState, ApiResponse, ItemInfo, TagsQuery, ListItemsQuery, ItemInfoListResponse, ItemInfoResponse, MetadataResponse, ItemQuery, ItemContentQuery};
use crate::common::is_binary::is_binary;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tokio_util::io::ReaderStream;
use tokio_util::bytes::Bytes;
/// Lists stored items, optionally filtered by tags, ordered by timestamp and paginated.
///
/// Query parameters:
/// - `tags`: comma-separated tag names; surrounding whitespace is trimmed and blank
///   entries (e.g. from `?tags=` or `a,,b`) are ignored.
/// - `order`: `"oldest"` sorts ascending by timestamp; any other value, or no value,
///   sorts newest first.
/// - `start`: number of items to skip (default 0).
/// - `count`: maximum number of items to return (default 100, capped at 1000).
///
/// # Errors
/// Returns `500 Internal Server Error` when the item service fails to list items.
#[utoipa::path(
    get,
    path = "/api/item/",
    operation_id = "list_items",
    summary = "List stored items",
    description = "Retrieve a paginated list of stored items with their metadata and tags. Items can be filtered by tags and sorted by creation time. Each item includes comprehensive metadata extracted during storage such as file type, encoding, size, and custom tags for organization.",
    responses(
        (status = 200, description = "Successfully retrieved paginated list of items with metadata and tags", body = ItemInfoListResponse),
        (status = 401, description = "Unauthorized - Invalid or missing authentication credentials"),
        (status = 500, description = "Internal server error - Failed to retrieve items from database")
    ),
    params(
        ("tags" = Option<String>, Query, description = "Comma-separated list of tags to filter by (e.g., 'important,work'). Only items that have ALL specified tags will be returned."),
        ("order" = Option<String>, Query, description = "Sort order for results: 'newest' (default, most recent first) or 'oldest' (oldest first)"),
        ("start" = Option<u64>, Query, description = "Starting index for pagination (default: 0). Use this to skip items for pagination."),
        ("count" = Option<u64>, Query, description = "Maximum number of items to return in this request (default: 100, maximum: 1000)")
    ),
    security(
        ("bearerAuth" = [])
    ),
    tag = "item"
)]
pub async fn handle_list_items(
    State(state): State<AppState>,
    Query(params): Query<ListItemsQuery>,
) -> Result<Json<ApiResponse<Vec<ItemInfo>>>, StatusCode> {
    // Upper bound advertised for the `count` query parameter in the OpenAPI description.
    const MAX_COUNT: usize = 1000;

    // Blank entries are dropped so that `?tags=` does not turn into a filter on the
    // empty-string tag.
    let tags: Vec<String> = params
        .tags
        .as_ref()
        .map(|raw| {
            raw.split(',')
                .map(str::trim)
                .filter(|tag| !tag.is_empty())
                .map(String::from)
                .collect()
        })
        .unwrap_or_default();

    let item_service = AsyncItemService::new(state.data_dir.clone(), state.db.clone());
    let mut items_with_meta = item_service
        .list_items(tags, HashMap::new())
        .await
        .map_err(|e| {
            warn!("Failed to get items: {}", e);
            StatusCode::INTERNAL_SERVER_ERROR
        })?;

    // Only an explicit "oldest" flips the order; unknown values fall back to newest first.
    let oldest_first = params.order.as_deref() == Some("oldest");
    items_with_meta.sort_by(|a, b| {
        let ascending = a.item.ts.cmp(&b.item.ts);
        if oldest_first {
            ascending
        } else {
            ascending.reverse()
        }
    });

    // Pagination window; `count` is clamped to the documented maximum.
    let start = params.start.unwrap_or(0) as usize;
    let count = (params.count.unwrap_or(100) as usize).min(MAX_COUNT);

    let item_infos: Vec<ItemInfo> = items_with_meta
        .into_iter()
        .skip(start)
        .take(count)
        .map(|entry| {
            // Borrowing accessors run first; the struct literal below moves fields out.
            let tags: Vec<String> = entry.tags.iter().map(|t| t.name.clone()).collect();
            let metadata = entry.meta_as_map();
            ItemInfo {
                // Items without an id are reported with id 0.
                id: entry.item.id.unwrap_or(0),
                ts: entry.item.ts.to_rfc3339(),
                size: entry.item.size,
                compression: entry.item.compression,
                tags,
                metadata,
            }
        })
        .collect();

    Ok(Json(ApiResponse {
        success: true,
        data: Some(item_infos),
        error: None,
    }))
}
/// Placeholder handler for `POST /api/item/`.
///
/// Item creation is not implemented yet. The handler ignores the request body and the
/// application state and always answers with a JSON envelope whose `success` flag is
/// `false`, whose `data` is absent, and whose `error` explains that the endpoint is not
/// available.
#[utoipa::path(
    post,
    path = "/api/item/",
    operation_id = "post_item",
    summary = "Store new item",
    description = "Create a new item by uploading content. The content will be automatically compressed, analyzed for metadata (file type, encoding, etc.), and stored with a unique identifier. Binary detection is performed automatically, and various metadata plugins extract information like line counts, file types, and system information.",
    responses(
        (status = 201, description = "Successfully created new item with generated metadata and unique ID", body = ItemInfoResponse),
        (status = 400, description = "Bad request - Invalid input data or malformed content"),
        (status = 401, description = "Unauthorized - Invalid or missing authentication credentials"),
        (status = 500, description = "Internal server error - Failed to create item due to storage or processing error")
    ),
    request_body(
        content = String,
        description = "Raw content to store as a new item. Can be text or binary data.",
        content_type = "application/octet-stream"
    ),
    security(
        ("bearerAuth" = [])
    ),
    tag = "item"
)]
pub async fn handle_post_item(
    State(_state): State<AppState>,
) -> Result<Json<ApiResponse<ItemInfo>>, StatusCode> {
    // Stub only: a real implementation would have to parse the uploaded payload
    // (multipart/form-data or JSON carrying the item data) and store it.
    // NOTE(review): the failure is reported through the `Ok` branch, so the client
    // presumably sees a success status code with `success: false` — confirm this is intended.
    let not_implemented = String::from("POST /api/item/ not yet implemented");
    Ok(Json(ApiResponse::<ItemInfo> {
        success: false,
        data: None,
        error: Some(not_implemented),
    }))
}
#[utoipa::path(
get,
path = "/api/item/latest/content",
operation_id = "get_item_latest_content",
summary = "Download latest item content",
description = "Download the raw content of the most recently stored item. The content is automatically decompressed and returned with the appropriate MIME type header for proper browser handling. If tags are specified, returns the latest item matching ALL the given tags. If allow_binary is false and the content is detected as binary, a 400 error is returned. Supports offset and length parameters for partial content retrieval.",
responses(
(status = 200, description = "Successfully retrieved latest item raw content with appropriate Content-Type header set based on detected MIME type"),
(status = 400, description = "Bad request - Content is binary but allow_binary is false"),
(status = 401, description = "Unauthorized - Invalid or missing authentication credentials"),
(status = 404, description = "Item not found - No items exist in the database or no items match the specified tag criteria"),
(status = 500, description = "Internal server error - Failed to retrieve item content due to decompression or filesystem error")
),
params(
("tags" = Option<String>, Query, description = "Comma-separated list of tags to filter by (e.g., 'important,work'). If specified, returns the latest item that has ALL the specified tags."),
("allow_binary" = Option<bool>, Query, description = "Whether to allow binary content to be returned (default: true). When false, returns 400 for binary files."),
("offset" = Option<u64>, Query, description = "Byte offset from the start of the file to begin reading (default: 0)"),
("length" = Option<u64>, Query, description = "Maximum number of bytes to return, starting at offset (default: 0 for unlimited)")
),
security(
("bearerAuth" = [])
),
tag = "item"
)]
pub async fn handle_get_item_latest_content(
State(state): State<AppState>,
Query(params): Query<ItemContentQuery>,
) -> Result<Response, StatusCode> {
let tags: Vec<String> = params
.tags
.as_ref()
.map(|s| s.split(',').map(|t| t.trim().to_string()).collect())
.unwrap_or_default();
let item_service = AsyncItemService::new(state.data_dir.clone(), state.db.clone());
let item_with_meta = item_service
.find_item(vec![], tags, HashMap::new())
.await;
match item_with_meta {
Ok(item) => {
let item_id = item.item.id.unwrap();
match stream_item_content(&item_service, item_id, params.allow_binary, params.offset, params.length).await {
Ok((stream, mime_type)) => {
let body = axum::body::Body::from_stream(stream);
let response = Response::builder()
.header(header::CONTENT_TYPE, mime_type)
.body(body)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(response)
}
Err(e) => {
warn!("Failed to stream content for item {}: {}", item_id, e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
Err(CoreError::ItemNotFoundGeneric) => Err(StatusCode::NOT_FOUND),
Err(e) => {
warn!("Failed to find latest item for content: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
async fn stream_item_content(
service: &AsyncItemService,
item_id: i64,
allow_binary: bool,
offset: u64,
length: u64,
) -> anyhow::Result<(impl tokio_stream::Stream<Item = Result<bytes::Bytes, std::io::Error>>, String)> {
let item_with_content = service.get_item_content(item_id).await?;
let metadata = item_with_content.item_with_meta.meta_as_map();
// Check if content is binary when allow_binary is false
if !allow_binary {
let is_content_binary = if let Some(binary_val) = metadata.get("binary") {
binary_val == "true"
} else {
// If binary metadata not available, check the actual content
is_binary(&item_with_content.content)
};
if is_content_binary {
return Err(anyhow::anyhow!("Binary content not allowed"));
}
}
let mime_type = metadata
.get("mime_type")
.map(|s| s.to_string())
.unwrap_or_else(|| "application/octet-stream".to_string());
// Open the file for streaming
let file_path = service.data_dir.join(format!("{}.dat", item_id));
let file = tokio::fs::File::open(&file_path).await?;
let mut buffered_file = tokio::io::BufReader::new(file);
// Seek to the requested offset if needed
if offset > 0 {
buffered_file.seek(std::io::SeekFrom::Start(offset)).await?;
}
// Create a reader stream with optional length limit
// Create a reader stream with optional length limit
let stream = if length > 0 {
// Limit the stream to the specified length
Box::pin(ReaderStream::new(buffered_file.take(length))) as std::pin::Pin<Box<dyn tokio_stream::Stream<Item = Result<Bytes, std::io::Error>> + Send>>
} else {
// Stream the entire file from the offset
Box::pin(ReaderStream::new(buffered_file)) as std::pin::Pin<Box<dyn tokio_stream::Stream<Item = Result<Bytes, std::io::Error>> + Send>>
};
Ok((stream, mime_type))
}
#[utoipa::path(
get,
path = "/api/item/{item_id}/content",
operation_id = "get_item_content",
summary = "Download item content",
description = "Download the raw content of a specific item by its ID. The content is automatically decompressed and returned with the appropriate MIME type header for proper browser handling. This endpoint is ideal for downloading files or viewing content directly in the browser. If allow_binary is false and the content is detected as binary, a 400 error is returned. Supports offset and length parameters for partial content retrieval.",
responses(
(status = 200, description = "Successfully retrieved item raw content with appropriate Content-Type header set based on detected MIME type"),
(status = 400, description = "Bad request - Invalid item ID (must be a positive integer) or content is binary but allow_binary is false"),
(status = 401, description = "Unauthorized - Invalid or missing authentication credentials"),
(status = 404, description = "Item not found - No item exists with the specified ID"),
(status = 500, description = "Internal server error - Failed to retrieve item content due to decompression or filesystem error")
),
params(
("item_id" = i64, Path, description = "Unique identifier of the item to retrieve content for (must be a positive integer)"),
("allow_binary" = Option<bool>, Query, description = "Whether to allow binary content to be returned (default: true). When false, returns 400 for binary files."),
("offset" = Option<u64>, Query, description = "Byte offset from the start of the file to begin reading (default: 0)"),
("length" = Option<u64>, Query, description = "Maximum number of bytes to return, starting at offset (default: 0 for unlimited)")
),
security(
("bearerAuth" = [])
),
tag = "item"
)]
pub async fn handle_get_item_content(
State(state): State<AppState>,
Path(item_id): Path<i64>,
Query(params): Query<ItemQuery>,
) -> Result<Response, StatusCode> {
// Validate that item ID is positive to prevent path traversal issues
if item_id <= 0 {
return Err(StatusCode::BAD_REQUEST);
}
let item_service = AsyncItemService::new(state.data_dir.clone(), state.db.clone());
match stream_item_content(&item_service, item_id, params.allow_binary, params.offset, params.length).await {
Ok((stream, mime_type)) => {
let body = axum::body::Body::from_stream(stream);
let response = Response::builder()
.header(header::CONTENT_TYPE, mime_type)
.body(body)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(response)
}
Err(e) => {
warn!("Failed to stream content for item {}: {}", item_id, e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
/// Returns the metadata map of the most recent item, optionally restricted to items
/// that carry all of the requested tags.
///
/// The `tags` query parameter is a comma-separated list; surrounding whitespace is
/// trimmed and blank entries (e.g. from `?tags=` or `a,,b`) are ignored.
///
/// # Errors
/// - `404 Not Found` when the item service reports that no matching item exists.
/// - `500 Internal Server Error` for any other lookup failure.
#[utoipa::path(
    get,
    path = "/api/item/latest/meta",
    operation_id = "get_item_latest_meta",
    summary = "Get latest item metadata",
    description = "Retrieve comprehensive metadata for the most recently stored item. Metadata includes automatically extracted information such as file type, MIME type, encoding, line counts, file size, system information (user, hostname, etc.), and cryptographic hashes. If tags are specified, returns metadata for the latest item matching ALL the given tags.",
    responses(
        (status = 200, description = "Successfully retrieved latest item metadata as key-value pairs including file type, encoding, size, and system information", body = MetadataResponse),
        (status = 401, description = "Unauthorized - Invalid or missing authentication credentials"),
        (status = 404, description = "Item not found - No items exist in the database or no items match the specified tag criteria"),
        (status = 500, description = "Internal server error - Failed to retrieve item metadata from database")
    ),
    params(
        ("tags" = Option<String>, Query, description = "Comma-separated list of tags to filter by (e.g., 'important,work'). If specified, returns metadata for the latest item that has ALL the specified tags.")
    ),
    security(
        ("bearerAuth" = [])
    ),
    tag = "item"
)]
pub async fn handle_get_item_latest_meta(
    State(state): State<AppState>,
    Query(params): Query<TagsQuery>,
) -> Result<Json<ApiResponse<HashMap<String, String>>>, StatusCode> {
    // Blank entries are dropped so that `?tags=` does not filter on the empty-string tag.
    let tags: Vec<String> = params
        .tags
        .as_ref()
        .map(|raw| {
            raw.split(',')
                .map(str::trim)
                .filter(|tag| !tag.is_empty())
                .map(String::from)
                .collect()
        })
        .unwrap_or_default();

    let item_service = AsyncItemService::new(state.data_dir.clone(), state.db.clone());
    match item_service.find_item(vec![], tags, HashMap::new()).await {
        Ok(item_with_meta) => Ok(Json(ApiResponse {
            success: true,
            data: Some(item_with_meta.meta_as_map()),
            error: None,
        })),
        Err(CoreError::ItemNotFoundGeneric) => Err(StatusCode::NOT_FOUND),
        Err(e) => {
            warn!("Failed to get latest item for meta: {}", e);
            Err(StatusCode::INTERNAL_SERVER_ERROR)
        }
    }
}
/// Returns the metadata map of the item identified by `item_id`.
///
/// # Errors
/// - `400 Bad Request` when `item_id` is not a positive integer.
/// - `404 Not Found` when the item service reports that the item does not exist.
/// - `500 Internal Server Error` for any other lookup failure.
#[utoipa::path(
    get,
    path = "/api/item/{item_id}/meta",
    operation_id = "get_item_meta",
    summary = "Get item metadata",
    description = "Retrieve comprehensive metadata for a specific item by its ID. Metadata includes automatically extracted information such as file type, MIME type, encoding, line counts, file size, system information (user, hostname, process ID, etc.), cryptographic hashes (SHA256, MD5), and performance metrics (read time, read rate).",
    responses(
        (status = 200, description = "Successfully retrieved item metadata as key-value pairs including file type, encoding, size, and system information", body = MetadataResponse),
        (status = 400, description = "Bad request - Invalid item ID (must be a positive integer)"),
        (status = 401, description = "Unauthorized - Invalid or missing authentication credentials"),
        (status = 404, description = "Item not found - No item exists with the specified ID"),
        (status = 500, description = "Internal server error - Failed to retrieve item metadata from database")
    ),
    params(
        ("item_id" = i64, Path, description = "Unique identifier of the item to retrieve metadata for (must be a positive integer)")
    ),
    security(
        ("bearerAuth" = [])
    ),
    tag = "item"
)]
pub async fn handle_get_item_meta(
    State(state): State<AppState>,
    Path(item_id): Path<i64>,
) -> Result<Json<ApiResponse<HashMap<String, String>>>, StatusCode> {
    // The API documents a 400 for non-positive ids; reject them before touching the
    // item service.
    if item_id <= 0 {
        return Err(StatusCode::BAD_REQUEST);
    }

    let item_service = AsyncItemService::new(state.data_dir.clone(), state.db.clone());
    match item_service.get_item(item_id).await {
        Ok(item_with_meta) => Ok(Json(ApiResponse {
            success: true,
            data: Some(item_with_meta.meta_as_map()),
            error: None,
        })),
        Err(CoreError::ItemNotFound(_)) => Err(StatusCode::NOT_FOUND),
        Err(e) => {
            warn!("Failed to get item {} for meta: {}", item_id, e);
            Err(StatusCode::INTERNAL_SERVER_ERROR)
        }
    }
}