diff --git a/src/modes/server/api/item.rs b/src/modes/server/api/item.rs index f9af591..15d05bd 100644 --- a/src/modes/server/api/item.rs +++ b/src/modes/server/api/item.rs @@ -6,13 +6,10 @@ use axum::{ }; use log::warn; use std::collections::HashMap; -use std::path::PathBuf; -use std::str::FromStr; -use std::io::Read; -use anyhow::{Result, anyhow}; +use anyhow; -use crate::compression_engine::{CompressionType, get_compression_engine}; -use crate::db; +use crate::core::async_item_service::AsyncItemService; +use crate::core::error::CoreError; use crate::modes::server::common::{AppState, ApiResponse, ItemInfo, ItemContentInfo, TagsQuery, ListItemsQuery, ItemQuery, ItemInfoListResponse, ItemInfoResponse, ItemContentInfoResponse, MetadataResponse}; use crate::common::is_binary::is_binary; @@ -42,83 +39,57 @@ pub async fn handle_list_items( State(state): State, Query(params): Query, ) -> Result>>, StatusCode> { - - let mut conn = state.db.lock().await; - - let tags: Vec = params.tags + let tags: Vec = params + .tags .as_ref() .map(|s| s.split(',').map(|t| t.trim().to_string()).collect()) .unwrap_or_default(); - - let items = if tags.is_empty() { - db::get_items(&mut *conn).map_err(|e| { + + let item_service = AsyncItemService::new(state.data_dir.clone(), state.db.clone()); + let mut items_with_meta = item_service + .list_items(tags, HashMap::new()) + .await + .map_err(|e| { warn!("Failed to get items: {}", e); StatusCode::INTERNAL_SERVER_ERROR - })? - } else { - db::get_items_matching(&mut *conn, &tags, &HashMap::new()) - .map_err(|e| { - warn!("Failed to get items matching tags {:?}: {}", tags, e); - StatusCode::INTERNAL_SERVER_ERROR - })? - }; - + })?; + // Apply ordering (default is newest first) - let mut items = items; match params.order.as_deref().unwrap_or("newest") { - "newest" => items.sort_by(|a, b| b.ts.cmp(&a.ts)), - "oldest" => items.sort_by(|a, b| a.ts.cmp(&b.ts)), - _ => items.sort_by(|a, b| b.ts.cmp(&a.ts)), // default to newest + "newest" => items_with_meta.sort_by(|a, b| b.item.ts.cmp(&a.item.ts)), + "oldest" => items_with_meta.sort_by(|a, b| a.item.ts.cmp(&a.item.ts)), + _ => items_with_meta.sort_by(|a, b| b.item.ts.cmp(&a.item.ts)), // default to newest } - + // Apply pagination let start = params.start.unwrap_or(0) as usize; let count = params.count.unwrap_or(100) as usize; - let items: Vec<_> = items.into_iter().skip(start).take(count).collect(); - - // Get item IDs for batch queries - let item_ids: Vec = items.iter().filter_map(|item| item.id).collect(); - - // Get tags and metadata for all items - let tags_map = db::get_tags_for_items(&mut *conn, &item_ids) - .map_err(|e| { - warn!("Failed to get tags for items: {}", e); - StatusCode::INTERNAL_SERVER_ERROR - })?; - let meta_map = db::get_meta_for_items(&mut *conn, &item_ids) - .map_err(|e| { - warn!("Failed to get metadata for items: {}", e); - StatusCode::INTERNAL_SERVER_ERROR - })?; - - let item_infos: Vec = items + let items_with_meta: Vec<_> = items_with_meta.into_iter().skip(start).take(count).collect(); + + let item_infos: Vec = items_with_meta .into_iter() - .map(|item| { - let item_id = item.id.unwrap_or(0); - let item_tags = tags_map.get(&item_id) - .map(|tags| tags.iter().map(|t| t.name.clone()).collect()) - .unwrap_or_default(); - let item_meta = meta_map.get(&item_id) - .cloned() - .unwrap_or_default(); - + .map(|item_with_meta| { + let item_id = item_with_meta.item.id.unwrap_or(0); + let item_tags = item_with_meta.tags.into_iter().map(|t| t.name).collect(); + let item_meta = item_with_meta.meta_as_map(); + ItemInfo { id: item_id, - ts: item.ts.to_rfc3339(), - size: item.size, - compression: item.compression, + ts: item_with_meta.item.ts.to_rfc3339(), + size: item_with_meta.item.size, + compression: item_with_meta.item.compression, tags: item_tags, metadata: item_meta, } }) .collect(); - + let response = ApiResponse { success: true, data: Some(item_infos), error: None, }; - + Ok(Json(response)) } @@ -187,45 +158,46 @@ pub async fn handle_get_item_latest( State(state): State, Query(params): Query, ) -> Result>, StatusCode> { - - let mut conn = state.db.lock().await; - - let item = if let Some(tags_str) = params.tags { - let tags: Vec = tags_str.split(',').map(|t| t.trim().to_string()).collect(); - db::get_item_matching(&mut *conn, &tags, &HashMap::new()) - .map_err(|e| { - warn!("Failed to get item matching tags {:?} for content: {}", tags, e); - StatusCode::INTERNAL_SERVER_ERROR - })? - } else { - db::get_item_last(&mut *conn).map_err(|e| { - warn!("Failed to get last item for content: {}", e); - StatusCode::INTERNAL_SERVER_ERROR - })? - }; - - if let Some(item) = item { - match get_item_content_info(&item, &state.data_dir, &mut *conn, params.allow_binary).await { - Ok(content_info) => { - let response = ApiResponse { - success: true, - data: Some(content_info), - error: None, - }; - Ok(Json(response)) - } - Err(e) => { - warn!("Failed to get content for item {}: {}", item.id.unwrap_or(0), e); - let response = ApiResponse:: { - success: false, - data: None, - error: Some(format!("Failed to retrieve content: {}", e)), - }; - Ok(Json(response)) + let tags: Vec = params + .tags + .as_ref() + .map(|s| s.split(',').map(|t| t.trim().to_string()).collect()) + .unwrap_or_default(); + + let item_service = AsyncItemService::new(state.data_dir.clone(), state.db.clone()); + + let item_with_meta = item_service + .find_item(vec![], tags, HashMap::new()) + .await; + + match item_with_meta { + Ok(item) => { + let item_id = item.item.id.unwrap(); + match get_item_content_info(&item_service, item_id, params.allow_binary).await { + Ok(content_info) => { + let response = ApiResponse { + success: true, + data: Some(content_info), + error: None, + }; + Ok(Json(response)) + } + Err(e) => { + warn!("Failed to get content for item {}: {}", item_id, e); + let response = ApiResponse:: { + success: false, + data: None, + error: Some(format!("Failed to retrieve content: {}", e)), + }; + Ok(Json(response)) + } } } - } else { - Err(StatusCode::NOT_FOUND) + Err(CoreError::ItemNotFoundGeneric) => Err(StatusCode::NOT_FOUND), + Err(e) => { + warn!("Failed to find latest item: {}", e); + Err(StatusCode::INTERNAL_SERVER_ERROR) + } } } @@ -260,34 +232,33 @@ pub async fn handle_get_item( if item_id <= 0 { return Err(StatusCode::BAD_REQUEST); } - - let mut conn = state.db.lock().await; - - if let Some(item) = db::get_item(&mut *conn, item_id).map_err(|e| { - warn!("Failed to get item {} for content: {}", item_id, e); - StatusCode::INTERNAL_SERVER_ERROR - })? { - match get_item_content_info(&item, &state.data_dir, &mut *conn, params.allow_binary).await { - Ok(content_info) => { - let response = ApiResponse { - success: true, - data: Some(content_info), - error: None, - }; - Ok(Json(response)) - } - Err(e) => { - warn!("Failed to get content for item {}: {}", item_id, e); - let response = ApiResponse:: { - success: false, - data: None, - error: Some(format!("Failed to retrieve content: {}", e)), - }; - Ok(Json(response)) - } + + let item_service = AsyncItemService::new(state.data_dir.clone(), state.db.clone()); + + match get_item_content_info(&item_service, item_id, params.allow_binary).await { + Ok(content_info) => { + let response = ApiResponse { + success: true, + data: Some(content_info), + error: None, + }; + Ok(Json(response)) + } + Err(e) => { + warn!("Failed to get content for item {}: {}", item_id, e); + // Check if the error is ItemNotFound to return 404 + if let Some(core_err) = e.downcast_ref::() { + if matches!(core_err, CoreError::ItemNotFound(_)) { + return Err(StatusCode::NOT_FOUND); + } + } + let response = ApiResponse:: { + success: false, + data: None, + error: Some(format!("Failed to retrieve content: {}", e)), + }; + Ok(Json(response)) } - } else { - Err(StatusCode::NOT_FOUND) } } @@ -315,40 +286,41 @@ pub async fn handle_get_item_latest_content( State(state): State, Query(params): Query, ) -> Result { - - let mut conn = state.db.lock().await; - - let item = if let Some(tags_str) = params.tags { - let tags: Vec = tags_str.split(',').map(|t| t.trim().to_string()).collect(); - db::get_item_matching(&mut *conn, &tags, &HashMap::new()) - .map_err(|e| { - warn!("Failed to get item matching tags {:?} for content: {}", tags, e); - StatusCode::INTERNAL_SERVER_ERROR - })? - } else { - db::get_item_last(&mut *conn).map_err(|e| { - warn!("Failed to get last item for content: {}", e); - StatusCode::INTERNAL_SERVER_ERROR - })? - }; - - if let Some(item) = item { - match get_item_raw_content(&item, &state.data_dir, &mut *conn).await { - Ok((content, mime_type)) => { - let mut response = content.into_response(); - response.headers_mut().insert( - header::CONTENT_TYPE, - mime_type.parse().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? - ); - Ok(response) - } - Err(e) => { - warn!("Failed to get raw content for item {}: {}", item.id.unwrap_or(0), e); - Err(StatusCode::INTERNAL_SERVER_ERROR) + let tags: Vec = params + .tags + .as_ref() + .map(|s| s.split(',').map(|t| t.trim().to_string()).collect()) + .unwrap_or_default(); + + let item_service = AsyncItemService::new(state.data_dir.clone(), state.db.clone()); + + let item_with_meta = item_service + .find_item(vec![], tags, HashMap::new()) + .await; + + match item_with_meta { + Ok(item) => { + let item_id = item.item.id.unwrap(); + match get_item_raw_content(&item_service, item_id).await { + Ok((content, mime_type)) => { + let mut response = content.into_response(); + response.headers_mut().insert( + header::CONTENT_TYPE, + mime_type.parse().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? + ); + Ok(response) + } + Err(e) => { + warn!("Failed to get raw content for item {}: {}", item_id, e); + Err(StatusCode::INTERNAL_SERVER_ERROR) + } } } - } else { - Err(StatusCode::NOT_FOUND) + Err(CoreError::ItemNotFoundGeneric) => Err(StatusCode::NOT_FOUND), + Err(e) => { + warn!("Failed to find latest item for content: {}", e); + Err(StatusCode::INTERNAL_SERVER_ERROR) + } } } @@ -381,96 +353,46 @@ pub async fn handle_get_item_content( if item_id <= 0 { return Err(StatusCode::BAD_REQUEST); } - - let mut conn = state.db.lock().await; - - if let Some(item) = db::get_item(&mut *conn, item_id).map_err(|e| { - warn!("Failed to get item {} for content: {}", item_id, e); - StatusCode::INTERNAL_SERVER_ERROR - })? { - match get_item_raw_content(&item, &state.data_dir, &mut *conn).await { - Ok((content, mime_type)) => { - let mut response = content.into_response(); - response.headers_mut().insert( - header::CONTENT_TYPE, - mime_type.parse().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? - ); - Ok(response) - } - Err(e) => { - warn!("Failed to get raw content for item {}: {}", item_id, e); - Err(StatusCode::INTERNAL_SERVER_ERROR) - } + + let item_service = AsyncItemService::new(state.data_dir.clone(), state.db.clone()); + + match get_item_raw_content(&item_service, item_id).await { + Ok((content, mime_type)) => { + let mut response = content.into_response(); + response.headers_mut().insert( + header::CONTENT_TYPE, + mime_type.parse().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? + ); + Ok(response) + } + Err(e) => { + if let Some(core_err) = e.downcast_ref::() { + if matches!(core_err, CoreError::ItemNotFound(_)) { + return Err(StatusCode::NOT_FOUND); + } + } + warn!("Failed to get raw content for item {}: {}", item_id, e); + Err(StatusCode::INTERNAL_SERVER_ERROR) } - } else { - Err(StatusCode::NOT_FOUND) } } -async fn get_item_content(item: &db::Item, data_dir: &PathBuf) -> Result { - let item_id = item.id.ok_or_else(|| anyhow!("Item missing ID"))?; - - // Validate that item ID is positive to prevent path traversal issues - if item_id <= 0 { - return Err(anyhow!("Invalid item ID: {}", item_id)); - } - - let mut item_path = data_dir.clone(); - item_path.push(item_id.to_string()); - - let compression_type = CompressionType::from_str(&item.compression)?; - let compression_engine = get_compression_engine(compression_type)?; - - // Read the content using the compression engine - let mut reader = compression_engine.open(item_path)?; - let mut content = String::new(); - reader.read_to_string(&mut content)?; - - Ok(content) -} - -async fn get_item_content_info(item: &db::Item, data_dir: &PathBuf, conn: &mut rusqlite::Connection, allow_binary: bool) -> Result { - let item_id = item.id.ok_or_else(|| anyhow!("Item missing ID"))?; - - // Validate that item ID is positive to prevent path traversal issues - if item_id <= 0 { - return Err(anyhow!("Invalid item ID: {}", item_id)); - } - - // Get metadata - let meta_entries = db::get_item_meta(conn, item) - .map_err(|e| anyhow!("Failed to get metadata: {}", e))?; - - let metadata: HashMap = meta_entries - .iter() - .map(|m| (m.name.clone(), m.value.clone())) - .collect(); +async fn get_item_content_info(service: &AsyncItemService, item_id: i64, allow_binary: bool) -> anyhow::Result { + let item_with_content = service.get_item_content(item_id).await.map_err(anyhow::Error::from)?; + let metadata = item_with_content.item_with_meta.meta_as_map(); // Determine if content is binary let is_binary = if let Some(binary_meta) = metadata.get("binary") { binary_meta == "true" } else { - // Fall back to checking the actual content - let mut item_path = data_dir.clone(); - item_path.push(item_id.to_string()); - - let compression_type = CompressionType::from_str(&item.compression)?; - let compression_engine = get_compression_engine(compression_type)?; - - let mut reader = compression_engine.open(item_path)?; - let mut buffer = [0u8; 8192]; // Read first 8KB to check - let bytes_read = reader.read(&mut buffer)?; - is_binary(&buffer[..bytes_read]) + is_binary(&item_with_content.content) }; // Get content if not binary or if binary is allowed let content = if is_binary && !allow_binary { None } else { - match get_item_content(item, data_dir).await { - Ok(content_str) => Some(content_str), - Err(_) => None, // If we can't read as string, treat as binary - } + Some(String::from_utf8_lossy(&item_with_content.content).to_string()) }; Ok(ItemContentInfo { @@ -480,35 +402,16 @@ async fn get_item_content_info(item: &db::Item, data_dir: &PathBuf, conn: &mut r }) } -async fn get_item_raw_content(item: &db::Item, data_dir: &PathBuf, conn: &mut rusqlite::Connection) -> Result<(Vec, String)> { - let item_id = item.id.ok_or_else(|| anyhow!("Item missing ID"))?; - - // Validate that item ID is positive to prevent path traversal issues - if item_id <= 0 { - return Err(anyhow!("Invalid item ID: {}", item_id)); - } - - let mut item_path = data_dir.clone(); - item_path.push(item_id.to_string()); - - let compression_type = CompressionType::from_str(&item.compression)?; - let compression_engine = get_compression_engine(compression_type)?; - - // Read the raw content using the compression engine - let mut reader = compression_engine.open(item_path)?; - let mut content = Vec::new(); - reader.read_to_end(&mut content)?; - - // Get MIME type from metadata - let meta_entries = db::get_item_meta(conn, item) - .map_err(|e| anyhow!("Failed to get metadata: {}", e))?; - - let mime_type = meta_entries - .iter() - .find(|m| m.name == "mime_type") - .map(|m| m.value.clone()) +async fn get_item_raw_content(service: &AsyncItemService, item_id: i64) -> anyhow::Result<(Vec, String)> { + let item_with_content = service.get_item_content(item_id).await.map_err(anyhow::Error::from)?; + let content = item_with_content.content; + let metadata = item_with_content.item_with_meta.meta_as_map(); + + let mime_type = metadata + .get("mime_type") + .map(|s| s.to_string()) .unwrap_or_else(|| "application/octet-stream".to_string()); - + Ok((content, mime_type)) } @@ -536,42 +439,31 @@ pub async fn handle_get_item_latest_meta( State(state): State, Query(params): Query, ) -> Result>>, StatusCode> { - - let mut conn = state.db.lock().await; - - let item = if let Some(tags_str) = params.tags { - let tags: Vec = tags_str.split(',').map(|t| t.trim().to_string()).collect(); - db::get_item_matching(&mut *conn, &tags, &HashMap::new()) - .map_err(|e| { - warn!("Failed to get item matching tags {:?} for meta: {}", tags, e); - StatusCode::INTERNAL_SERVER_ERROR - })? - } else { - db::get_item_last(&mut *conn).map_err(|e| { - warn!("Failed to get last item for meta: {}", e); - StatusCode::INTERNAL_SERVER_ERROR - })? - }; - - if let Some(item) = item { - let item_meta = db::get_item_meta(&mut *conn, &item) - .map_err(|e| { - warn!("Failed to get metadata for item {}: {}", item.id.unwrap_or(0), e); - StatusCode::INTERNAL_SERVER_ERROR - })? - .into_iter() - .map(|m| (m.name, m.value)) - .collect(); - - let response = ApiResponse { - success: true, - data: Some(item_meta), - error: None, - }; - - Ok(Json(response)) - } else { - Err(StatusCode::NOT_FOUND) + let tags: Vec = params + .tags + .as_ref() + .map(|s| s.split(',').map(|t| t.trim().to_string()).collect()) + .unwrap_or_default(); + + let item_service = AsyncItemService::new(state.data_dir.clone(), state.db.clone()); + + match item_service.find_item(vec![], tags, HashMap::new()).await { + Ok(item_with_meta) => { + let item_meta = item_with_meta.meta_as_map(); + + let response = ApiResponse { + success: true, + data: Some(item_meta), + error: None, + }; + + Ok(Json(response)) + } + Err(CoreError::ItemNotFoundGeneric) => Err(StatusCode::NOT_FOUND), + Err(e) => { + warn!("Failed to get latest item for meta: {}", e); + Err(StatusCode::INTERNAL_SERVER_ERROR) + } } } @@ -600,31 +492,25 @@ pub async fn handle_get_item_meta( State(state): State, Path(item_id): Path, ) -> Result>>, StatusCode> { - - let mut conn = state.db.lock().await; - - if let Some(item) = db::get_item(&mut *conn, item_id).map_err(|e| { - warn!("Failed to get item {} for meta: {}", item_id, e); - StatusCode::INTERNAL_SERVER_ERROR - })? { - let item_meta = db::get_item_meta(&mut *conn, &item) - .map_err(|e| { - warn!("Failed to get metadata for item {}: {}", item_id, e); - StatusCode::INTERNAL_SERVER_ERROR - })? - .into_iter() - .map(|m| (m.name, m.value)) - .collect(); - - let response = ApiResponse { - success: true, - data: Some(item_meta), - error: None, - }; - - Ok(Json(response)) - } else { - Err(StatusCode::NOT_FOUND) + let item_service = AsyncItemService::new(state.data_dir.clone(), state.db.clone()); + + match item_service.get_item(item_id).await { + Ok(item_with_meta) => { + let item_meta = item_with_meta.meta_as_map(); + + let response = ApiResponse { + success: true, + data: Some(item_meta), + error: None, + }; + + Ok(Json(response)) + } + Err(CoreError::ItemNotFound(_)) => Err(StatusCode::NOT_FOUND), + Err(e) => { + warn!("Failed to get item {} for meta: {}", item_id, e); + Err(StatusCode::INTERNAL_SERVER_ERROR) + } } } diff --git a/src/modes/server/mcp/tools.rs b/src/modes/server/mcp/tools.rs index 310a059..e8214f9 100644 --- a/src/modes/server/mcp/tools.rs +++ b/src/modes/server/mcp/tools.rs @@ -6,6 +6,8 @@ use std::str::FromStr; use log::{debug, warn}; use crate::modes::server::common::AppState; +use crate::core::async_item_service::AsyncItemService; +use crate::core::error::CoreError; use crate::db; use crate::compression_engine::{CompressionType, get_compression_engine}; use crate::meta_plugin::{MetaPluginType, get_meta_plugin}; @@ -136,34 +138,27 @@ impl KeepTools { .and_then(|v| v.as_i64()) .ok_or_else(|| ToolError::InvalidArguments("Missing or invalid 'id' field".to_string()))?; - let mut conn = self.state.db.lock().await; - - let item = db::get_item(&mut *conn, item_id)? - .ok_or_else(|| ToolError::InvalidArguments(format!("Item {} not found", item_id)))?; - - // Get content - let mut item_path = self.state.data_dir.clone(); - item_path.push(item_id.to_string()); - - let compression_type = CompressionType::from_str(&item.compression)?; - let compression_engine = get_compression_engine(compression_type)?; - - let mut reader = compression_engine.open(item_path)?; - let mut content = String::new(); - reader.read_to_string(&mut content)?; - - // Get metadata and tags - let tags = db::get_item_tags(&mut *conn, &item)?; - let metadata = db::get_item_meta(&mut *conn, &item)?; - + let service = AsyncItemService::new(self.state.data_dir.clone(), self.state.db.clone()); + + let item_with_content = match service.get_item_content(item_id).await { + Ok(iwc) => iwc, + Err(CoreError::ItemNotFound(_)) => return Err(ToolError::InvalidArguments(format!("Item {} not found", item_id))), + Err(e) => return Err(ToolError::Other(anyhow!(e))), + }; + + let item = item_with_content.item_with_meta.item; + let content = String::from_utf8_lossy(&item_with_content.content).to_string(); + let tags: Vec = item_with_content.item_with_meta.tags.into_iter().map(|t| t.name).collect(); + let metadata = item_with_content.item_with_meta.meta_as_map(); + let response = serde_json::json!({ "id": item_id, "content": content, "timestamp": item.ts.to_rfc3339(), "size": item.size, "compression": item.compression, - "tags": tags.iter().map(|t| &t.name).collect::>(), - "metadata": metadata.iter().map(|m| (&m.name, &m.value)).collect::>() + "tags": tags, + "metadata": metadata, }); Ok(serde_json::to_string_pretty(&response)?) @@ -176,40 +171,30 @@ impl KeepTools { .map(|arr| arr.iter().filter_map(|v| v.as_str().map(|s| s.to_string())).collect()) .unwrap_or_default(); - let mut conn = self.state.db.lock().await; - - let item = if tags.is_empty() { - db::get_item_last(&mut *conn)? - } else { - db::get_item_matching(&mut *conn, &tags, &HashMap::new())? + let service = AsyncItemService::new(self.state.data_dir.clone(), self.state.db.clone()); + + let item_with_meta = match service.find_item(vec![], tags, HashMap::new()).await { + Ok(iwm) => iwm, + Err(CoreError::ItemNotFoundGeneric) => return Err(ToolError::InvalidArguments("No items found".to_string())), + Err(e) => return Err(ToolError::Other(anyhow!(e))), }; - let item = item.ok_or_else(|| ToolError::InvalidArguments("No items found".to_string()))?; - let item_id = item.id.ok_or_else(|| anyhow!("Item missing ID"))?; - - // Get content - let mut item_path = self.state.data_dir.clone(); - item_path.push(item_id.to_string()); - - let compression_type = CompressionType::from_str(&item.compression)?; - let compression_engine = get_compression_engine(compression_type)?; - - let mut reader = compression_engine.open(item_path)?; - let mut content = String::new(); - reader.read_to_string(&mut content)?; - - // Get metadata and tags - let tags = db::get_item_tags(&mut *conn, &item)?; - let metadata = db::get_item_meta(&mut *conn, &item)?; + let item_id = item_with_meta.item.id.ok_or_else(|| anyhow!("Item missing ID after find"))?; + let item_with_content = service.get_item_content(item_id).await.map_err(|e| ToolError::Other(anyhow!(e)))?; + let item = item_with_content.item_with_meta.item; + let content = String::from_utf8_lossy(&item_with_content.content).to_string(); + let tags: Vec = item_with_content.item_with_meta.tags.into_iter().map(|t| t.name).collect(); + let metadata = item_with_content.item_with_meta.meta_as_map(); + let response = serde_json::json!({ "id": item_id, "content": content, "timestamp": item.ts.to_rfc3339(), "size": item.size, "compression": item.compression, - "tags": tags.iter().map(|t| &t.name).collect::>(), - "metadata": metadata.iter().map(|m| (&m.name, &m.value)).collect::>() + "tags": tags, + "metadata": metadata, }); Ok(serde_json::to_string_pretty(&response)?) @@ -233,36 +218,20 @@ impl KeepTools { .and_then(|v| v.as_u64()) .unwrap_or(0) as usize; - let mut conn = self.state.db.lock().await; - - let items = if tags.is_empty() { - db::get_items(&mut *conn)? - } else { - db::get_items_matching(&mut *conn, &tags, &HashMap::new())? - }; - + let service = AsyncItemService::new(self.state.data_dir.clone(), self.state.db.clone()); + let mut items_with_meta = service.list_items(tags, HashMap::new()).await.map_err(|e| ToolError::Other(anyhow!(e)))?; + // Sort by timestamp (newest first) and apply pagination - let mut items = items; - items.sort_by(|a, b| b.ts.cmp(&a.ts)); - let items: Vec<_> = items.into_iter().skip(offset).take(limit).collect(); + items_with_meta.sort_by(|a, b| b.item.ts.cmp(&a.item.ts)); + let items_with_meta: Vec<_> = items_with_meta.into_iter().skip(offset).take(limit).collect(); - // Get item IDs for batch queries - let item_ids: Vec = items.iter().filter_map(|item| item.id).collect(); - - // Get tags and metadata for all items - let tags_map = db::get_tags_for_items(&mut *conn, &item_ids)?; - let meta_map = db::get_meta_for_items(&mut *conn, &item_ids)?; - - let items_info: Vec<_> = items + let items_info: Vec<_> = items_with_meta .into_iter() - .map(|item| { + .map(|item_with_meta| { + let item = item_with_meta.item; let item_id = item.id.unwrap_or(0); - let item_tags = tags_map.get(&item_id) - .map(|tags| tags.iter().map(|t| &t.name).collect::>()) - .unwrap_or_default(); - let item_meta = meta_map.get(&item_id) - .cloned() - .unwrap_or_default(); + let item_tags: Vec = item_with_meta.tags.into_iter().map(|t| t.name).collect(); + let item_meta = item_with_meta.meta_as_map(); serde_json::json!({ "id": item_id, @@ -302,31 +271,19 @@ impl KeepTools { }).collect()) .unwrap_or_default(); - let mut conn = self.state.db.lock().await; - - let items = db::get_items_matching(&mut *conn, &tags, &metadata)?; - + let service = AsyncItemService::new(self.state.data_dir.clone(), self.state.db.clone()); + let mut items_with_meta = service.list_items(tags.clone(), metadata.clone()).await.map_err(|e| ToolError::Other(anyhow!(e)))?; + // Sort by timestamp (newest first) - let mut items = items; - items.sort_by(|a, b| b.ts.cmp(&a.ts)); + items_with_meta.sort_by(|a, b| b.item.ts.cmp(&a.item.ts)); - // Get item IDs for batch queries - let item_ids: Vec = items.iter().filter_map(|item| item.id).collect(); - - // Get tags and metadata for all items - let tags_map = db::get_tags_for_items(&mut *conn, &item_ids)?; - let meta_map = db::get_meta_for_items(&mut *conn, &item_ids)?; - - let items_info: Vec<_> = items + let items_info: Vec<_> = items_with_meta .into_iter() - .map(|item| { + .map(|item_with_meta| { + let item = item_with_meta.item; let item_id = item.id.unwrap_or(0); - let item_tags = tags_map.get(&item_id) - .map(|tags| tags.iter().map(|t| &t.name).collect::>()) - .unwrap_or_default(); - let item_meta = meta_map.get(&item_id) - .cloned() - .unwrap_or_default(); + let item_tags: Vec = item_with_meta.tags.into_iter().map(|t| t.name).collect(); + let item_meta = item_with_meta.meta_as_map(); serde_json::json!({ "id": item_id,