From afe23aaa403b23461c9022f9a05fa379d04433e7 Mon Sep 17 00:00:00 2001 From: Andrew Phillips Date: Mon, 25 Aug 2025 12:48:10 -0300 Subject: [PATCH] feat: add save_item_from_mcp functionality to core services Co-authored-by: aider (openai/andrew/openrouter/google/gemini-2.5-pro) --- PLAN.md | 18 ++--- src/core/async_item_service.rs | 17 +++++ src/core/item_service.rs | 66 ++++++++++++++++++ src/modes/server/mcp/tools.rs | 119 ++++++++++----------------------- 4 files changed, 128 insertions(+), 92 deletions(-) diff --git a/PLAN.md b/PLAN.md index f1c777d..5f6fcdd 100644 --- a/PLAN.md +++ b/PLAN.md @@ -8,7 +8,7 @@ - [x] 5. Add async wrappers for API use - [x] 6. Refactor CLI modes to use services (DONE) - [x] 7. Refactor REST API to use async services -- [ ] 8. Refactor MCP tools to use services +- [x] 8. Refactor MCP tools to use services - [x] 9. Create unified error handling - [ ] 10. Add integration tests - [ ] 11. Add performance optimization guidelines (partially done) @@ -155,22 +155,22 @@ - Use common error handling with conversions to HTTP responses - Wrap synchronous service calls in `tokio::task::spawn_blocking` -## 8. Refactor MCP Tools to Use Services +## 8. Refactor MCP Tools to Use Services (DONE) **Files:** -- Change: `src/modes/server/mcp/tools.rs` +- Change: `src/modes/server/mcp/tools.rs` (DONE) **Functions:** -- Change: `save_item` to use `item_service::save_item` -- Change: `get_item` to use `item_service::get_item_full` -- Change: `get_latest_item` to use `item_service::get_latest_item` -- Change: `list_items` to use `item_service::list_items` -- Change: `search_items` to use `item_service::search_items` +- Change: `save_item` to use `item_service` (DONE) +- Change: `get_item` to use `async_item_service` (DONE) +- Change: `get_latest_item` to use `async_item_service` (DONE) +- Change: `list_items` to use `async_item_service` (DONE) +- Change: `search_items` to use `async_item_service` (DONE) **Reason:** Remove duplication with REST API and CLI modes **Implementation:** - Replace current implementation with calls to core services - Keep only MCP protocol-specific logic -- Use synchronous services directly (MCP is typically local/short-lived) +- Use `async_item_service` wrappers for database operations - Standardize response format to match API/CLI ## 9. Create Unified Error Handling (DONE) diff --git a/src/core/async_item_service.rs b/src/core/async_item_service.rs index 3ee1b9e..b8be12d 100644 --- a/src/core/async_item_service.rs +++ b/src/core/async_item_service.rs @@ -90,4 +90,21 @@ impl AsyncItemService { .await .unwrap() } + + pub async fn save_item_from_mcp( + &self, + content: Vec, + tags: Vec, + metadata: HashMap, + ) -> Result { + let data_path = self.data_path.clone(); + let mut conn = self.db.lock().await; + + tokio::task::spawn_blocking(move || { + let item_service = ItemService::new(data_path); + item_service.save_item_from_mcp(&content, &tags, &metadata, &mut conn) + }) + .await + .unwrap() + } } diff --git a/src/core/item_service.rs b/src/core/item_service.rs index ceec086..df76273 100644 --- a/src/core/item_service.rs +++ b/src/core/item_service.rs @@ -3,6 +3,7 @@ use crate::core::compression_service::CompressionService; use crate::core::error::CoreError; use crate::core::meta_service::MetaService; use crate::core::types::{ItemWithContent, ItemWithMeta}; +use crate::meta_plugin::{get_meta_plugin, MetaPlugin, MetaPluginType}; use crate::db::{self, Meta}; use crate::compression_engine::{get_compression_engine, CompressionType}; use crate::modes::common::settings_compression_type; @@ -194,4 +195,69 @@ impl ItemService { self.get_item(conn, item_id) } + + pub fn save_item_from_mcp( + &self, + content: &[u8], + tags: &Vec, + metadata: &HashMap, + conn: &mut Connection, + ) -> Result { + let compression_type = CompressionType::LZ4; + let compression_engine = get_compression_engine(compression_type.clone())?; + + let tx = conn.transaction()?; + let item_id; + let mut item; + + { + item = db::create_item(&tx, compression_type.clone())?; + item_id = item.id.unwrap(); + + // Add tags + for tag in tags { + db::add_tag(&tx, item_id, tag)?; + } + + // Add custom metadata + for (key, value) in metadata { + db::add_meta(&tx, item_id, key, value)?; + } + } + + let mut item_path = self.data_path.clone(); + item_path.push(item_id.to_string()); + + let mut writer = compression_engine.create(item_path.clone())?; + writer.write_all(content)?; + drop(writer); + + let plugin_types = vec![ + MetaPluginType::FileMime, + MetaPluginType::FileEncoding, + MetaPluginType::Binary, + MetaPluginType::LineCount, + MetaPluginType::WordCount, + MetaPluginType::DigestSha256, + MetaPluginType::Uid, + MetaPluginType::User, + MetaPluginType::Hostname, + ]; + + let mut plugins: Vec> = + plugin_types.iter().map(|p| get_meta_plugin(p.clone())).collect(); + + self.meta_service + .initialize_plugins(&mut plugins, &tx, item_id); + self.meta_service + .process_chunk(&mut plugins, content, &tx); + self.meta_service.finalize_plugins(&mut plugins, &tx); + + item.size = Some(content.len() as i64); + db::update_item(&tx, item.clone())?; + + tx.commit()?; + + self.get_item(conn, item_id) + } } diff --git a/src/modes/server/mcp/tools.rs b/src/modes/server/mcp/tools.rs index e8214f9..8c94aaf 100644 --- a/src/modes/server/mcp/tools.rs +++ b/src/modes/server/mcp/tools.rs @@ -1,16 +1,12 @@ use anyhow::{Result, anyhow}; use serde_json::Value; use std::collections::HashMap; -use std::io::{Write, Read}; use std::str::FromStr; use log::{debug, warn}; use crate::modes::server::common::AppState; use crate::core::async_item_service::AsyncItemService; use crate::core::error::CoreError; -use crate::db; -use crate::compression_engine::{CompressionType, get_compression_engine}; -use crate::meta_plugin::{MetaPluginType, get_meta_plugin}; #[derive(Debug, thiserror::Error)] pub enum ToolError { @@ -41,93 +37,50 @@ impl KeepTools { pub async fn save_item(&self, args: Option) -> Result { let args = args.ok_or_else(|| ToolError::InvalidArguments("Missing arguments".to_string()))?; - - let content = args.get("content") + + let content = args + .get("content") .and_then(|v| v.as_str()) .ok_or_else(|| ToolError::InvalidArguments("Missing 'content' field".to_string()))?; - - let tags: Vec = args.get("tags") + + let tags: Vec = args + .get("tags") .and_then(|v| v.as_array()) - .map(|arr| arr.iter().filter_map(|v| v.as_str().map(|s| s.to_string())).collect()) + .map(|arr| { + arr.iter() + .filter_map(|v| v.as_str().map(|s| s.to_string())) + .collect() + }) .unwrap_or_default(); - - let metadata: HashMap = args.get("metadata") + + let metadata: HashMap = args + .get("metadata") .and_then(|v| v.as_object()) - .map(|obj| obj.iter().filter_map(|(k, v)| { - v.as_str().map(|s| (k.clone(), s.to_string())) - }).collect()) + .map(|obj| { + obj.iter() + .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string()))) + .collect() + }) .unwrap_or_default(); - debug!("MCP: Saving item with {} bytes, {} tags, {} metadata entries", - content.len(), tags.len(), metadata.len()); + debug!( + "MCP: Saving item with {} bytes, {} tags, {} metadata entries", + content.len(), + tags.len(), + metadata.len() + ); + + let service = AsyncItemService::new(self.state.data_dir.clone(), self.state.db.clone()); + let item_with_meta = service + .save_item_from_mcp(content.as_bytes().to_vec(), tags, metadata) + .await + .map_err(|e| ToolError::Other(anyhow!(e)))?; + + let item_id = item_with_meta + .item + .id + .ok_or_else(|| anyhow!("Failed to get item ID"))?; - let mut conn = self.state.db.lock().await; - - // Create new item - let item = db::create_item(&mut *conn, CompressionType::LZ4)?; - let item_id = item.id.ok_or_else(|| anyhow!("Failed to get item ID"))?; - - // Save content to file - let mut item_path = self.state.data_dir.clone(); - item_path.push(item_id.to_string()); - - let compression_engine = get_compression_engine(CompressionType::LZ4)?; - let mut writer = compression_engine.create(item_path)?; - writer.write_all(content.as_bytes())?; - drop(writer); // Ensure file is closed - - // Add tags - for tag in &tags { - db::add_tag(&mut *conn, item_id, tag)?; - } - - // Add custom metadata - for (key, value) in &metadata { - db::add_meta(&mut *conn, item_id, key, value)?; - } - - // Run metadata plugins - let meta_plugins = vec![ - MetaPluginType::FileMime, - MetaPluginType::FileEncoding, - MetaPluginType::Binary, - MetaPluginType::LineCount, - MetaPluginType::WordCount, - MetaPluginType::DigestSha256, - MetaPluginType::Uid, - MetaPluginType::User, - MetaPluginType::Hostname, - ]; - - for plugin_type in meta_plugins { - let plugin_type_clone = plugin_type.clone(); - let mut plugin = get_meta_plugin(plugin_type); - if plugin.is_supported() { - if let Err(e) = plugin.initialize(&*conn, item_id) { - warn!("Failed to initialize plugin {:?}: {}", plugin_type_clone, e); - continue; - } - - let mut item_path = self.state.data_dir.clone(); - item_path.push(item_id.to_string()); - - // Process the file content through the plugin - let mut item_path = self.state.data_dir.clone(); - item_path.push(item_id.to_string()); - - let compression_engine = get_compression_engine(CompressionType::LZ4)?; - let mut reader = compression_engine.open(item_path)?; - let mut buffer = Vec::new(); - reader.read_to_end(&mut buffer)?; - - plugin.update(&buffer, &*conn); - - if let Err(e) = plugin.finalize(&*conn) { - warn!("Failed to finalize plugin {:?}: {}", plugin_type_clone, e); - } - } - } - Ok(format!("Successfully saved item with ID: {}", item_id)) }