refactor: update api handlers to use async item service
Co-authored-by: aider (openai/andrew/openrouter/google/gemini-2.5-pro) <aider@aider.chat>
This commit is contained in:
@@ -6,13 +6,10 @@ use axum::{
|
|||||||
};
|
};
|
||||||
use log::warn;
|
use log::warn;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::path::PathBuf;
|
use anyhow;
|
||||||
use std::str::FromStr;
|
|
||||||
use std::io::Read;
|
|
||||||
use anyhow::{Result, anyhow};
|
|
||||||
|
|
||||||
use crate::compression_engine::{CompressionType, get_compression_engine};
|
use crate::core::async_item_service::AsyncItemService;
|
||||||
use crate::db;
|
use crate::core::error::CoreError;
|
||||||
use crate::modes::server::common::{AppState, ApiResponse, ItemInfo, ItemContentInfo, TagsQuery, ListItemsQuery, ItemQuery, ItemInfoListResponse, ItemInfoResponse, ItemContentInfoResponse, MetadataResponse};
|
use crate::modes::server::common::{AppState, ApiResponse, ItemInfo, ItemContentInfo, TagsQuery, ListItemsQuery, ItemQuery, ItemInfoListResponse, ItemInfoResponse, ItemContentInfoResponse, MetadataResponse};
|
||||||
use crate::common::is_binary::is_binary;
|
use crate::common::is_binary::is_binary;
|
||||||
|
|
||||||
@@ -42,71 +39,45 @@ pub async fn handle_list_items(
|
|||||||
State(state): State<AppState>,
|
State(state): State<AppState>,
|
||||||
Query(params): Query<ListItemsQuery>,
|
Query(params): Query<ListItemsQuery>,
|
||||||
) -> Result<Json<ApiResponse<Vec<ItemInfo>>>, StatusCode> {
|
) -> Result<Json<ApiResponse<Vec<ItemInfo>>>, StatusCode> {
|
||||||
|
let tags: Vec<String> = params
|
||||||
let mut conn = state.db.lock().await;
|
.tags
|
||||||
|
|
||||||
let tags: Vec<String> = params.tags
|
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.map(|s| s.split(',').map(|t| t.trim().to_string()).collect())
|
.map(|s| s.split(',').map(|t| t.trim().to_string()).collect())
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
let items = if tags.is_empty() {
|
let item_service = AsyncItemService::new(state.data_dir.clone(), state.db.clone());
|
||||||
db::get_items(&mut *conn).map_err(|e| {
|
let mut items_with_meta = item_service
|
||||||
|
.list_items(tags, HashMap::new())
|
||||||
|
.await
|
||||||
|
.map_err(|e| {
|
||||||
warn!("Failed to get items: {}", e);
|
warn!("Failed to get items: {}", e);
|
||||||
StatusCode::INTERNAL_SERVER_ERROR
|
StatusCode::INTERNAL_SERVER_ERROR
|
||||||
})?
|
})?;
|
||||||
} else {
|
|
||||||
db::get_items_matching(&mut *conn, &tags, &HashMap::new())
|
|
||||||
.map_err(|e| {
|
|
||||||
warn!("Failed to get items matching tags {:?}: {}", tags, e);
|
|
||||||
StatusCode::INTERNAL_SERVER_ERROR
|
|
||||||
})?
|
|
||||||
};
|
|
||||||
|
|
||||||
// Apply ordering (default is newest first)
|
// Apply ordering (default is newest first)
|
||||||
let mut items = items;
|
|
||||||
match params.order.as_deref().unwrap_or("newest") {
|
match params.order.as_deref().unwrap_or("newest") {
|
||||||
"newest" => items.sort_by(|a, b| b.ts.cmp(&a.ts)),
|
"newest" => items_with_meta.sort_by(|a, b| b.item.ts.cmp(&a.item.ts)),
|
||||||
"oldest" => items.sort_by(|a, b| a.ts.cmp(&b.ts)),
|
"oldest" => items_with_meta.sort_by(|a, b| a.item.ts.cmp(&a.item.ts)),
|
||||||
_ => items.sort_by(|a, b| b.ts.cmp(&a.ts)), // default to newest
|
_ => items_with_meta.sort_by(|a, b| b.item.ts.cmp(&a.item.ts)), // default to newest
|
||||||
}
|
}
|
||||||
|
|
||||||
// Apply pagination
|
// Apply pagination
|
||||||
let start = params.start.unwrap_or(0) as usize;
|
let start = params.start.unwrap_or(0) as usize;
|
||||||
let count = params.count.unwrap_or(100) as usize;
|
let count = params.count.unwrap_or(100) as usize;
|
||||||
let items: Vec<_> = items.into_iter().skip(start).take(count).collect();
|
let items_with_meta: Vec<_> = items_with_meta.into_iter().skip(start).take(count).collect();
|
||||||
|
|
||||||
// Get item IDs for batch queries
|
let item_infos: Vec<ItemInfo> = items_with_meta
|
||||||
let item_ids: Vec<i64> = items.iter().filter_map(|item| item.id).collect();
|
|
||||||
|
|
||||||
// Get tags and metadata for all items
|
|
||||||
let tags_map = db::get_tags_for_items(&mut *conn, &item_ids)
|
|
||||||
.map_err(|e| {
|
|
||||||
warn!("Failed to get tags for items: {}", e);
|
|
||||||
StatusCode::INTERNAL_SERVER_ERROR
|
|
||||||
})?;
|
|
||||||
let meta_map = db::get_meta_for_items(&mut *conn, &item_ids)
|
|
||||||
.map_err(|e| {
|
|
||||||
warn!("Failed to get metadata for items: {}", e);
|
|
||||||
StatusCode::INTERNAL_SERVER_ERROR
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let item_infos: Vec<ItemInfo> = items
|
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|item| {
|
.map(|item_with_meta| {
|
||||||
let item_id = item.id.unwrap_or(0);
|
let item_id = item_with_meta.item.id.unwrap_or(0);
|
||||||
let item_tags = tags_map.get(&item_id)
|
let item_tags = item_with_meta.tags.into_iter().map(|t| t.name).collect();
|
||||||
.map(|tags| tags.iter().map(|t| t.name.clone()).collect())
|
let item_meta = item_with_meta.meta_as_map();
|
||||||
.unwrap_or_default();
|
|
||||||
let item_meta = meta_map.get(&item_id)
|
|
||||||
.cloned()
|
|
||||||
.unwrap_or_default();
|
|
||||||
|
|
||||||
ItemInfo {
|
ItemInfo {
|
||||||
id: item_id,
|
id: item_id,
|
||||||
ts: item.ts.to_rfc3339(),
|
ts: item_with_meta.item.ts.to_rfc3339(),
|
||||||
size: item.size,
|
size: item_with_meta.item.size,
|
||||||
compression: item.compression,
|
compression: item_with_meta.item.compression,
|
||||||
tags: item_tags,
|
tags: item_tags,
|
||||||
metadata: item_meta,
|
metadata: item_meta,
|
||||||
}
|
}
|
||||||
@@ -187,25 +158,22 @@ pub async fn handle_get_item_latest(
|
|||||||
State(state): State<AppState>,
|
State(state): State<AppState>,
|
||||||
Query(params): Query<TagsQuery>,
|
Query(params): Query<TagsQuery>,
|
||||||
) -> Result<Json<ApiResponse<ItemContentInfo>>, StatusCode> {
|
) -> Result<Json<ApiResponse<ItemContentInfo>>, StatusCode> {
|
||||||
|
let tags: Vec<String> = params
|
||||||
|
.tags
|
||||||
|
.as_ref()
|
||||||
|
.map(|s| s.split(',').map(|t| t.trim().to_string()).collect())
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
let mut conn = state.db.lock().await;
|
let item_service = AsyncItemService::new(state.data_dir.clone(), state.db.clone());
|
||||||
|
|
||||||
let item = if let Some(tags_str) = params.tags {
|
let item_with_meta = item_service
|
||||||
let tags: Vec<String> = tags_str.split(',').map(|t| t.trim().to_string()).collect();
|
.find_item(vec![], tags, HashMap::new())
|
||||||
db::get_item_matching(&mut *conn, &tags, &HashMap::new())
|
.await;
|
||||||
.map_err(|e| {
|
|
||||||
warn!("Failed to get item matching tags {:?} for content: {}", tags, e);
|
|
||||||
StatusCode::INTERNAL_SERVER_ERROR
|
|
||||||
})?
|
|
||||||
} else {
|
|
||||||
db::get_item_last(&mut *conn).map_err(|e| {
|
|
||||||
warn!("Failed to get last item for content: {}", e);
|
|
||||||
StatusCode::INTERNAL_SERVER_ERROR
|
|
||||||
})?
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(item) = item {
|
match item_with_meta {
|
||||||
match get_item_content_info(&item, &state.data_dir, &mut *conn, params.allow_binary).await {
|
Ok(item) => {
|
||||||
|
let item_id = item.item.id.unwrap();
|
||||||
|
match get_item_content_info(&item_service, item_id, params.allow_binary).await {
|
||||||
Ok(content_info) => {
|
Ok(content_info) => {
|
||||||
let response = ApiResponse {
|
let response = ApiResponse {
|
||||||
success: true,
|
success: true,
|
||||||
@@ -215,7 +183,7 @@ pub async fn handle_get_item_latest(
|
|||||||
Ok(Json(response))
|
Ok(Json(response))
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("Failed to get content for item {}: {}", item.id.unwrap_or(0), e);
|
warn!("Failed to get content for item {}: {}", item_id, e);
|
||||||
let response = ApiResponse::<ItemContentInfo> {
|
let response = ApiResponse::<ItemContentInfo> {
|
||||||
success: false,
|
success: false,
|
||||||
data: None,
|
data: None,
|
||||||
@@ -224,8 +192,12 @@ pub async fn handle_get_item_latest(
|
|||||||
Ok(Json(response))
|
Ok(Json(response))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
}
|
||||||
Err(StatusCode::NOT_FOUND)
|
Err(CoreError::ItemNotFoundGeneric) => Err(StatusCode::NOT_FOUND),
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Failed to find latest item: {}", e);
|
||||||
|
Err(StatusCode::INTERNAL_SERVER_ERROR)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -261,13 +233,9 @@ pub async fn handle_get_item(
|
|||||||
return Err(StatusCode::BAD_REQUEST);
|
return Err(StatusCode::BAD_REQUEST);
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut conn = state.db.lock().await;
|
let item_service = AsyncItemService::new(state.data_dir.clone(), state.db.clone());
|
||||||
|
|
||||||
if let Some(item) = db::get_item(&mut *conn, item_id).map_err(|e| {
|
match get_item_content_info(&item_service, item_id, params.allow_binary).await {
|
||||||
warn!("Failed to get item {} for content: {}", item_id, e);
|
|
||||||
StatusCode::INTERNAL_SERVER_ERROR
|
|
||||||
})? {
|
|
||||||
match get_item_content_info(&item, &state.data_dir, &mut *conn, params.allow_binary).await {
|
|
||||||
Ok(content_info) => {
|
Ok(content_info) => {
|
||||||
let response = ApiResponse {
|
let response = ApiResponse {
|
||||||
success: true,
|
success: true,
|
||||||
@@ -278,6 +246,12 @@ pub async fn handle_get_item(
|
|||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("Failed to get content for item {}: {}", item_id, e);
|
warn!("Failed to get content for item {}: {}", item_id, e);
|
||||||
|
// Check if the error is ItemNotFound to return 404
|
||||||
|
if let Some(core_err) = e.downcast_ref::<CoreError>() {
|
||||||
|
if matches!(core_err, CoreError::ItemNotFound(_)) {
|
||||||
|
return Err(StatusCode::NOT_FOUND);
|
||||||
|
}
|
||||||
|
}
|
||||||
let response = ApiResponse::<ItemContentInfo> {
|
let response = ApiResponse::<ItemContentInfo> {
|
||||||
success: false,
|
success: false,
|
||||||
data: None,
|
data: None,
|
||||||
@@ -286,9 +260,6 @@ pub async fn handle_get_item(
|
|||||||
Ok(Json(response))
|
Ok(Json(response))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
Err(StatusCode::NOT_FOUND)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[utoipa::path(
|
#[utoipa::path(
|
||||||
@@ -315,25 +286,22 @@ pub async fn handle_get_item_latest_content(
|
|||||||
State(state): State<AppState>,
|
State(state): State<AppState>,
|
||||||
Query(params): Query<TagsQuery>,
|
Query(params): Query<TagsQuery>,
|
||||||
) -> Result<Response, StatusCode> {
|
) -> Result<Response, StatusCode> {
|
||||||
|
let tags: Vec<String> = params
|
||||||
|
.tags
|
||||||
|
.as_ref()
|
||||||
|
.map(|s| s.split(',').map(|t| t.trim().to_string()).collect())
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
let mut conn = state.db.lock().await;
|
let item_service = AsyncItemService::new(state.data_dir.clone(), state.db.clone());
|
||||||
|
|
||||||
let item = if let Some(tags_str) = params.tags {
|
let item_with_meta = item_service
|
||||||
let tags: Vec<String> = tags_str.split(',').map(|t| t.trim().to_string()).collect();
|
.find_item(vec![], tags, HashMap::new())
|
||||||
db::get_item_matching(&mut *conn, &tags, &HashMap::new())
|
.await;
|
||||||
.map_err(|e| {
|
|
||||||
warn!("Failed to get item matching tags {:?} for content: {}", tags, e);
|
|
||||||
StatusCode::INTERNAL_SERVER_ERROR
|
|
||||||
})?
|
|
||||||
} else {
|
|
||||||
db::get_item_last(&mut *conn).map_err(|e| {
|
|
||||||
warn!("Failed to get last item for content: {}", e);
|
|
||||||
StatusCode::INTERNAL_SERVER_ERROR
|
|
||||||
})?
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(item) = item {
|
match item_with_meta {
|
||||||
match get_item_raw_content(&item, &state.data_dir, &mut *conn).await {
|
Ok(item) => {
|
||||||
|
let item_id = item.item.id.unwrap();
|
||||||
|
match get_item_raw_content(&item_service, item_id).await {
|
||||||
Ok((content, mime_type)) => {
|
Ok((content, mime_type)) => {
|
||||||
let mut response = content.into_response();
|
let mut response = content.into_response();
|
||||||
response.headers_mut().insert(
|
response.headers_mut().insert(
|
||||||
@@ -343,12 +311,16 @@ pub async fn handle_get_item_latest_content(
|
|||||||
Ok(response)
|
Ok(response)
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("Failed to get raw content for item {}: {}", item.id.unwrap_or(0), e);
|
warn!("Failed to get raw content for item {}: {}", item_id, e);
|
||||||
Err(StatusCode::INTERNAL_SERVER_ERROR)
|
Err(StatusCode::INTERNAL_SERVER_ERROR)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
}
|
||||||
Err(StatusCode::NOT_FOUND)
|
Err(CoreError::ItemNotFoundGeneric) => Err(StatusCode::NOT_FOUND),
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Failed to find latest item for content: {}", e);
|
||||||
|
Err(StatusCode::INTERNAL_SERVER_ERROR)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -382,13 +354,9 @@ pub async fn handle_get_item_content(
|
|||||||
return Err(StatusCode::BAD_REQUEST);
|
return Err(StatusCode::BAD_REQUEST);
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut conn = state.db.lock().await;
|
let item_service = AsyncItemService::new(state.data_dir.clone(), state.db.clone());
|
||||||
|
|
||||||
if let Some(item) = db::get_item(&mut *conn, item_id).map_err(|e| {
|
match get_item_raw_content(&item_service, item_id).await {
|
||||||
warn!("Failed to get item {} for content: {}", item_id, e);
|
|
||||||
StatusCode::INTERNAL_SERVER_ERROR
|
|
||||||
})? {
|
|
||||||
match get_item_raw_content(&item, &state.data_dir, &mut *conn).await {
|
|
||||||
Ok((content, mime_type)) => {
|
Ok((content, mime_type)) => {
|
||||||
let mut response = content.into_response();
|
let mut response = content.into_response();
|
||||||
response.headers_mut().insert(
|
response.headers_mut().insert(
|
||||||
@@ -398,79 +366,33 @@ pub async fn handle_get_item_content(
|
|||||||
Ok(response)
|
Ok(response)
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
if let Some(core_err) = e.downcast_ref::<CoreError>() {
|
||||||
|
if matches!(core_err, CoreError::ItemNotFound(_)) {
|
||||||
|
return Err(StatusCode::NOT_FOUND);
|
||||||
|
}
|
||||||
|
}
|
||||||
warn!("Failed to get raw content for item {}: {}", item_id, e);
|
warn!("Failed to get raw content for item {}: {}", item_id, e);
|
||||||
Err(StatusCode::INTERNAL_SERVER_ERROR)
|
Err(StatusCode::INTERNAL_SERVER_ERROR)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
Err(StatusCode::NOT_FOUND)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_item_content(item: &db::Item, data_dir: &PathBuf) -> Result<String> {
|
async fn get_item_content_info(service: &AsyncItemService, item_id: i64, allow_binary: bool) -> anyhow::Result<ItemContentInfo> {
|
||||||
let item_id = item.id.ok_or_else(|| anyhow!("Item missing ID"))?;
|
let item_with_content = service.get_item_content(item_id).await.map_err(anyhow::Error::from)?;
|
||||||
|
let metadata = item_with_content.item_with_meta.meta_as_map();
|
||||||
// Validate that item ID is positive to prevent path traversal issues
|
|
||||||
if item_id <= 0 {
|
|
||||||
return Err(anyhow!("Invalid item ID: {}", item_id));
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut item_path = data_dir.clone();
|
|
||||||
item_path.push(item_id.to_string());
|
|
||||||
|
|
||||||
let compression_type = CompressionType::from_str(&item.compression)?;
|
|
||||||
let compression_engine = get_compression_engine(compression_type)?;
|
|
||||||
|
|
||||||
// Read the content using the compression engine
|
|
||||||
let mut reader = compression_engine.open(item_path)?;
|
|
||||||
let mut content = String::new();
|
|
||||||
reader.read_to_string(&mut content)?;
|
|
||||||
|
|
||||||
Ok(content)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn get_item_content_info(item: &db::Item, data_dir: &PathBuf, conn: &mut rusqlite::Connection, allow_binary: bool) -> Result<ItemContentInfo> {
|
|
||||||
let item_id = item.id.ok_or_else(|| anyhow!("Item missing ID"))?;
|
|
||||||
|
|
||||||
// Validate that item ID is positive to prevent path traversal issues
|
|
||||||
if item_id <= 0 {
|
|
||||||
return Err(anyhow!("Invalid item ID: {}", item_id));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get metadata
|
|
||||||
let meta_entries = db::get_item_meta(conn, item)
|
|
||||||
.map_err(|e| anyhow!("Failed to get metadata: {}", e))?;
|
|
||||||
|
|
||||||
let metadata: HashMap<String, String> = meta_entries
|
|
||||||
.iter()
|
|
||||||
.map(|m| (m.name.clone(), m.value.clone()))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
// Determine if content is binary
|
// Determine if content is binary
|
||||||
let is_binary = if let Some(binary_meta) = metadata.get("binary") {
|
let is_binary = if let Some(binary_meta) = metadata.get("binary") {
|
||||||
binary_meta == "true"
|
binary_meta == "true"
|
||||||
} else {
|
} else {
|
||||||
// Fall back to checking the actual content
|
is_binary(&item_with_content.content)
|
||||||
let mut item_path = data_dir.clone();
|
|
||||||
item_path.push(item_id.to_string());
|
|
||||||
|
|
||||||
let compression_type = CompressionType::from_str(&item.compression)?;
|
|
||||||
let compression_engine = get_compression_engine(compression_type)?;
|
|
||||||
|
|
||||||
let mut reader = compression_engine.open(item_path)?;
|
|
||||||
let mut buffer = [0u8; 8192]; // Read first 8KB to check
|
|
||||||
let bytes_read = reader.read(&mut buffer)?;
|
|
||||||
is_binary(&buffer[..bytes_read])
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// Get content if not binary or if binary is allowed
|
// Get content if not binary or if binary is allowed
|
||||||
let content = if is_binary && !allow_binary {
|
let content = if is_binary && !allow_binary {
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
match get_item_content(item, data_dir).await {
|
Some(String::from_utf8_lossy(&item_with_content.content).to_string())
|
||||||
Ok(content_str) => Some(content_str),
|
|
||||||
Err(_) => None, // If we can't read as string, treat as binary
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(ItemContentInfo {
|
Ok(ItemContentInfo {
|
||||||
@@ -480,33 +402,14 @@ async fn get_item_content_info(item: &db::Item, data_dir: &PathBuf, conn: &mut r
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_item_raw_content(item: &db::Item, data_dir: &PathBuf, conn: &mut rusqlite::Connection) -> Result<(Vec<u8>, String)> {
|
async fn get_item_raw_content(service: &AsyncItemService, item_id: i64) -> anyhow::Result<(Vec<u8>, String)> {
|
||||||
let item_id = item.id.ok_or_else(|| anyhow!("Item missing ID"))?;
|
let item_with_content = service.get_item_content(item_id).await.map_err(anyhow::Error::from)?;
|
||||||
|
let content = item_with_content.content;
|
||||||
|
let metadata = item_with_content.item_with_meta.meta_as_map();
|
||||||
|
|
||||||
// Validate that item ID is positive to prevent path traversal issues
|
let mime_type = metadata
|
||||||
if item_id <= 0 {
|
.get("mime_type")
|
||||||
return Err(anyhow!("Invalid item ID: {}", item_id));
|
.map(|s| s.to_string())
|
||||||
}
|
|
||||||
|
|
||||||
let mut item_path = data_dir.clone();
|
|
||||||
item_path.push(item_id.to_string());
|
|
||||||
|
|
||||||
let compression_type = CompressionType::from_str(&item.compression)?;
|
|
||||||
let compression_engine = get_compression_engine(compression_type)?;
|
|
||||||
|
|
||||||
// Read the raw content using the compression engine
|
|
||||||
let mut reader = compression_engine.open(item_path)?;
|
|
||||||
let mut content = Vec::new();
|
|
||||||
reader.read_to_end(&mut content)?;
|
|
||||||
|
|
||||||
// Get MIME type from metadata
|
|
||||||
let meta_entries = db::get_item_meta(conn, item)
|
|
||||||
.map_err(|e| anyhow!("Failed to get metadata: {}", e))?;
|
|
||||||
|
|
||||||
let mime_type = meta_entries
|
|
||||||
.iter()
|
|
||||||
.find(|m| m.name == "mime_type")
|
|
||||||
.map(|m| m.value.clone())
|
|
||||||
.unwrap_or_else(|| "application/octet-stream".to_string());
|
.unwrap_or_else(|| "application/octet-stream".to_string());
|
||||||
|
|
||||||
Ok((content, mime_type))
|
Ok((content, mime_type))
|
||||||
@@ -536,32 +439,17 @@ pub async fn handle_get_item_latest_meta(
|
|||||||
State(state): State<AppState>,
|
State(state): State<AppState>,
|
||||||
Query(params): Query<TagsQuery>,
|
Query(params): Query<TagsQuery>,
|
||||||
) -> Result<Json<ApiResponse<HashMap<String, String>>>, StatusCode> {
|
) -> Result<Json<ApiResponse<HashMap<String, String>>>, StatusCode> {
|
||||||
|
let tags: Vec<String> = params
|
||||||
|
.tags
|
||||||
|
.as_ref()
|
||||||
|
.map(|s| s.split(',').map(|t| t.trim().to_string()).collect())
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
let mut conn = state.db.lock().await;
|
let item_service = AsyncItemService::new(state.data_dir.clone(), state.db.clone());
|
||||||
|
|
||||||
let item = if let Some(tags_str) = params.tags {
|
match item_service.find_item(vec![], tags, HashMap::new()).await {
|
||||||
let tags: Vec<String> = tags_str.split(',').map(|t| t.trim().to_string()).collect();
|
Ok(item_with_meta) => {
|
||||||
db::get_item_matching(&mut *conn, &tags, &HashMap::new())
|
let item_meta = item_with_meta.meta_as_map();
|
||||||
.map_err(|e| {
|
|
||||||
warn!("Failed to get item matching tags {:?} for meta: {}", tags, e);
|
|
||||||
StatusCode::INTERNAL_SERVER_ERROR
|
|
||||||
})?
|
|
||||||
} else {
|
|
||||||
db::get_item_last(&mut *conn).map_err(|e| {
|
|
||||||
warn!("Failed to get last item for meta: {}", e);
|
|
||||||
StatusCode::INTERNAL_SERVER_ERROR
|
|
||||||
})?
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(item) = item {
|
|
||||||
let item_meta = db::get_item_meta(&mut *conn, &item)
|
|
||||||
.map_err(|e| {
|
|
||||||
warn!("Failed to get metadata for item {}: {}", item.id.unwrap_or(0), e);
|
|
||||||
StatusCode::INTERNAL_SERVER_ERROR
|
|
||||||
})?
|
|
||||||
.into_iter()
|
|
||||||
.map(|m| (m.name, m.value))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let response = ApiResponse {
|
let response = ApiResponse {
|
||||||
success: true,
|
success: true,
|
||||||
@@ -570,8 +458,12 @@ pub async fn handle_get_item_latest_meta(
|
|||||||
};
|
};
|
||||||
|
|
||||||
Ok(Json(response))
|
Ok(Json(response))
|
||||||
} else {
|
}
|
||||||
Err(StatusCode::NOT_FOUND)
|
Err(CoreError::ItemNotFoundGeneric) => Err(StatusCode::NOT_FOUND),
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Failed to get latest item for meta: {}", e);
|
||||||
|
Err(StatusCode::INTERNAL_SERVER_ERROR)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -600,21 +492,11 @@ pub async fn handle_get_item_meta(
|
|||||||
State(state): State<AppState>,
|
State(state): State<AppState>,
|
||||||
Path(item_id): Path<i64>,
|
Path(item_id): Path<i64>,
|
||||||
) -> Result<Json<ApiResponse<HashMap<String, String>>>, StatusCode> {
|
) -> Result<Json<ApiResponse<HashMap<String, String>>>, StatusCode> {
|
||||||
|
let item_service = AsyncItemService::new(state.data_dir.clone(), state.db.clone());
|
||||||
|
|
||||||
let mut conn = state.db.lock().await;
|
match item_service.get_item(item_id).await {
|
||||||
|
Ok(item_with_meta) => {
|
||||||
if let Some(item) = db::get_item(&mut *conn, item_id).map_err(|e| {
|
let item_meta = item_with_meta.meta_as_map();
|
||||||
warn!("Failed to get item {} for meta: {}", item_id, e);
|
|
||||||
StatusCode::INTERNAL_SERVER_ERROR
|
|
||||||
})? {
|
|
||||||
let item_meta = db::get_item_meta(&mut *conn, &item)
|
|
||||||
.map_err(|e| {
|
|
||||||
warn!("Failed to get metadata for item {}: {}", item_id, e);
|
|
||||||
StatusCode::INTERNAL_SERVER_ERROR
|
|
||||||
})?
|
|
||||||
.into_iter()
|
|
||||||
.map(|m| (m.name, m.value))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let response = ApiResponse {
|
let response = ApiResponse {
|
||||||
success: true,
|
success: true,
|
||||||
@@ -623,8 +505,12 @@ pub async fn handle_get_item_meta(
|
|||||||
};
|
};
|
||||||
|
|
||||||
Ok(Json(response))
|
Ok(Json(response))
|
||||||
} else {
|
}
|
||||||
Err(StatusCode::NOT_FOUND)
|
Err(CoreError::ItemNotFound(_)) => Err(StatusCode::NOT_FOUND),
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Failed to get item {} for meta: {}", item_id, e);
|
||||||
|
Err(StatusCode::INTERNAL_SERVER_ERROR)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -6,6 +6,8 @@ use std::str::FromStr;
|
|||||||
use log::{debug, warn};
|
use log::{debug, warn};
|
||||||
|
|
||||||
use crate::modes::server::common::AppState;
|
use crate::modes::server::common::AppState;
|
||||||
|
use crate::core::async_item_service::AsyncItemService;
|
||||||
|
use crate::core::error::CoreError;
|
||||||
use crate::db;
|
use crate::db;
|
||||||
use crate::compression_engine::{CompressionType, get_compression_engine};
|
use crate::compression_engine::{CompressionType, get_compression_engine};
|
||||||
use crate::meta_plugin::{MetaPluginType, get_meta_plugin};
|
use crate::meta_plugin::{MetaPluginType, get_meta_plugin};
|
||||||
@@ -136,25 +138,18 @@ impl KeepTools {
|
|||||||
.and_then(|v| v.as_i64())
|
.and_then(|v| v.as_i64())
|
||||||
.ok_or_else(|| ToolError::InvalidArguments("Missing or invalid 'id' field".to_string()))?;
|
.ok_or_else(|| ToolError::InvalidArguments("Missing or invalid 'id' field".to_string()))?;
|
||||||
|
|
||||||
let mut conn = self.state.db.lock().await;
|
let service = AsyncItemService::new(self.state.data_dir.clone(), self.state.db.clone());
|
||||||
|
|
||||||
let item = db::get_item(&mut *conn, item_id)?
|
let item_with_content = match service.get_item_content(item_id).await {
|
||||||
.ok_or_else(|| ToolError::InvalidArguments(format!("Item {} not found", item_id)))?;
|
Ok(iwc) => iwc,
|
||||||
|
Err(CoreError::ItemNotFound(_)) => return Err(ToolError::InvalidArguments(format!("Item {} not found", item_id))),
|
||||||
|
Err(e) => return Err(ToolError::Other(anyhow!(e))),
|
||||||
|
};
|
||||||
|
|
||||||
// Get content
|
let item = item_with_content.item_with_meta.item;
|
||||||
let mut item_path = self.state.data_dir.clone();
|
let content = String::from_utf8_lossy(&item_with_content.content).to_string();
|
||||||
item_path.push(item_id.to_string());
|
let tags: Vec<String> = item_with_content.item_with_meta.tags.into_iter().map(|t| t.name).collect();
|
||||||
|
let metadata = item_with_content.item_with_meta.meta_as_map();
|
||||||
let compression_type = CompressionType::from_str(&item.compression)?;
|
|
||||||
let compression_engine = get_compression_engine(compression_type)?;
|
|
||||||
|
|
||||||
let mut reader = compression_engine.open(item_path)?;
|
|
||||||
let mut content = String::new();
|
|
||||||
reader.read_to_string(&mut content)?;
|
|
||||||
|
|
||||||
// Get metadata and tags
|
|
||||||
let tags = db::get_item_tags(&mut *conn, &item)?;
|
|
||||||
let metadata = db::get_item_meta(&mut *conn, &item)?;
|
|
||||||
|
|
||||||
let response = serde_json::json!({
|
let response = serde_json::json!({
|
||||||
"id": item_id,
|
"id": item_id,
|
||||||
@@ -162,8 +157,8 @@ impl KeepTools {
|
|||||||
"timestamp": item.ts.to_rfc3339(),
|
"timestamp": item.ts.to_rfc3339(),
|
||||||
"size": item.size,
|
"size": item.size,
|
||||||
"compression": item.compression,
|
"compression": item.compression,
|
||||||
"tags": tags.iter().map(|t| &t.name).collect::<Vec<_>>(),
|
"tags": tags,
|
||||||
"metadata": metadata.iter().map(|m| (&m.name, &m.value)).collect::<HashMap<_, _>>()
|
"metadata": metadata,
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok(serde_json::to_string_pretty(&response)?)
|
Ok(serde_json::to_string_pretty(&response)?)
|
||||||
@@ -176,31 +171,21 @@ impl KeepTools {
|
|||||||
.map(|arr| arr.iter().filter_map(|v| v.as_str().map(|s| s.to_string())).collect())
|
.map(|arr| arr.iter().filter_map(|v| v.as_str().map(|s| s.to_string())).collect())
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
let mut conn = self.state.db.lock().await;
|
let service = AsyncItemService::new(self.state.data_dir.clone(), self.state.db.clone());
|
||||||
|
|
||||||
let item = if tags.is_empty() {
|
let item_with_meta = match service.find_item(vec![], tags, HashMap::new()).await {
|
||||||
db::get_item_last(&mut *conn)?
|
Ok(iwm) => iwm,
|
||||||
} else {
|
Err(CoreError::ItemNotFoundGeneric) => return Err(ToolError::InvalidArguments("No items found".to_string())),
|
||||||
db::get_item_matching(&mut *conn, &tags, &HashMap::new())?
|
Err(e) => return Err(ToolError::Other(anyhow!(e))),
|
||||||
};
|
};
|
||||||
|
|
||||||
let item = item.ok_or_else(|| ToolError::InvalidArguments("No items found".to_string()))?;
|
let item_id = item_with_meta.item.id.ok_or_else(|| anyhow!("Item missing ID after find"))?;
|
||||||
let item_id = item.id.ok_or_else(|| anyhow!("Item missing ID"))?;
|
let item_with_content = service.get_item_content(item_id).await.map_err(|e| ToolError::Other(anyhow!(e)))?;
|
||||||
|
|
||||||
// Get content
|
let item = item_with_content.item_with_meta.item;
|
||||||
let mut item_path = self.state.data_dir.clone();
|
let content = String::from_utf8_lossy(&item_with_content.content).to_string();
|
||||||
item_path.push(item_id.to_string());
|
let tags: Vec<String> = item_with_content.item_with_meta.tags.into_iter().map(|t| t.name).collect();
|
||||||
|
let metadata = item_with_content.item_with_meta.meta_as_map();
|
||||||
let compression_type = CompressionType::from_str(&item.compression)?;
|
|
||||||
let compression_engine = get_compression_engine(compression_type)?;
|
|
||||||
|
|
||||||
let mut reader = compression_engine.open(item_path)?;
|
|
||||||
let mut content = String::new();
|
|
||||||
reader.read_to_string(&mut content)?;
|
|
||||||
|
|
||||||
// Get metadata and tags
|
|
||||||
let tags = db::get_item_tags(&mut *conn, &item)?;
|
|
||||||
let metadata = db::get_item_meta(&mut *conn, &item)?;
|
|
||||||
|
|
||||||
let response = serde_json::json!({
|
let response = serde_json::json!({
|
||||||
"id": item_id,
|
"id": item_id,
|
||||||
@@ -208,8 +193,8 @@ impl KeepTools {
|
|||||||
"timestamp": item.ts.to_rfc3339(),
|
"timestamp": item.ts.to_rfc3339(),
|
||||||
"size": item.size,
|
"size": item.size,
|
||||||
"compression": item.compression,
|
"compression": item.compression,
|
||||||
"tags": tags.iter().map(|t| &t.name).collect::<Vec<_>>(),
|
"tags": tags,
|
||||||
"metadata": metadata.iter().map(|m| (&m.name, &m.value)).collect::<HashMap<_, _>>()
|
"metadata": metadata,
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok(serde_json::to_string_pretty(&response)?)
|
Ok(serde_json::to_string_pretty(&response)?)
|
||||||
@@ -233,36 +218,20 @@ impl KeepTools {
|
|||||||
.and_then(|v| v.as_u64())
|
.and_then(|v| v.as_u64())
|
||||||
.unwrap_or(0) as usize;
|
.unwrap_or(0) as usize;
|
||||||
|
|
||||||
let mut conn = self.state.db.lock().await;
|
let service = AsyncItemService::new(self.state.data_dir.clone(), self.state.db.clone());
|
||||||
|
let mut items_with_meta = service.list_items(tags, HashMap::new()).await.map_err(|e| ToolError::Other(anyhow!(e)))?;
|
||||||
let items = if tags.is_empty() {
|
|
||||||
db::get_items(&mut *conn)?
|
|
||||||
} else {
|
|
||||||
db::get_items_matching(&mut *conn, &tags, &HashMap::new())?
|
|
||||||
};
|
|
||||||
|
|
||||||
// Sort by timestamp (newest first) and apply pagination
|
// Sort by timestamp (newest first) and apply pagination
|
||||||
let mut items = items;
|
items_with_meta.sort_by(|a, b| b.item.ts.cmp(&a.item.ts));
|
||||||
items.sort_by(|a, b| b.ts.cmp(&a.ts));
|
let items_with_meta: Vec<_> = items_with_meta.into_iter().skip(offset).take(limit).collect();
|
||||||
let items: Vec<_> = items.into_iter().skip(offset).take(limit).collect();
|
|
||||||
|
|
||||||
// Get item IDs for batch queries
|
let items_info: Vec<_> = items_with_meta
|
||||||
let item_ids: Vec<i64> = items.iter().filter_map(|item| item.id).collect();
|
|
||||||
|
|
||||||
// Get tags and metadata for all items
|
|
||||||
let tags_map = db::get_tags_for_items(&mut *conn, &item_ids)?;
|
|
||||||
let meta_map = db::get_meta_for_items(&mut *conn, &item_ids)?;
|
|
||||||
|
|
||||||
let items_info: Vec<_> = items
|
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|item| {
|
.map(|item_with_meta| {
|
||||||
|
let item = item_with_meta.item;
|
||||||
let item_id = item.id.unwrap_or(0);
|
let item_id = item.id.unwrap_or(0);
|
||||||
let item_tags = tags_map.get(&item_id)
|
let item_tags: Vec<String> = item_with_meta.tags.into_iter().map(|t| t.name).collect();
|
||||||
.map(|tags| tags.iter().map(|t| &t.name).collect::<Vec<_>>())
|
let item_meta = item_with_meta.meta_as_map();
|
||||||
.unwrap_or_default();
|
|
||||||
let item_meta = meta_map.get(&item_id)
|
|
||||||
.cloned()
|
|
||||||
.unwrap_or_default();
|
|
||||||
|
|
||||||
serde_json::json!({
|
serde_json::json!({
|
||||||
"id": item_id,
|
"id": item_id,
|
||||||
@@ -302,31 +271,19 @@ impl KeepTools {
|
|||||||
}).collect())
|
}).collect())
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
let mut conn = self.state.db.lock().await;
|
let service = AsyncItemService::new(self.state.data_dir.clone(), self.state.db.clone());
|
||||||
|
let mut items_with_meta = service.list_items(tags.clone(), metadata.clone()).await.map_err(|e| ToolError::Other(anyhow!(e)))?;
|
||||||
let items = db::get_items_matching(&mut *conn, &tags, &metadata)?;
|
|
||||||
|
|
||||||
// Sort by timestamp (newest first)
|
// Sort by timestamp (newest first)
|
||||||
let mut items = items;
|
items_with_meta.sort_by(|a, b| b.item.ts.cmp(&a.item.ts));
|
||||||
items.sort_by(|a, b| b.ts.cmp(&a.ts));
|
|
||||||
|
|
||||||
// Get item IDs for batch queries
|
let items_info: Vec<_> = items_with_meta
|
||||||
let item_ids: Vec<i64> = items.iter().filter_map(|item| item.id).collect();
|
|
||||||
|
|
||||||
// Get tags and metadata for all items
|
|
||||||
let tags_map = db::get_tags_for_items(&mut *conn, &item_ids)?;
|
|
||||||
let meta_map = db::get_meta_for_items(&mut *conn, &item_ids)?;
|
|
||||||
|
|
||||||
let items_info: Vec<_> = items
|
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|item| {
|
.map(|item_with_meta| {
|
||||||
|
let item = item_with_meta.item;
|
||||||
let item_id = item.id.unwrap_or(0);
|
let item_id = item.id.unwrap_or(0);
|
||||||
let item_tags = tags_map.get(&item_id)
|
let item_tags: Vec<String> = item_with_meta.tags.into_iter().map(|t| t.name).collect();
|
||||||
.map(|tags| tags.iter().map(|t| &t.name).collect::<Vec<_>>())
|
let item_meta = item_with_meta.meta_as_map();
|
||||||
.unwrap_or_default();
|
|
||||||
let item_meta = meta_map.get(&item_id)
|
|
||||||
.cloned()
|
|
||||||
.unwrap_or_default();
|
|
||||||
|
|
||||||
serde_json::json!({
|
serde_json::json!({
|
||||||
"id": item_id,
|
"id": item_id,
|
||||||
|
|||||||
Reference in New Issue
Block a user