feat: add save_item_from_mcp functionality to core services
Co-authored-by: aider (openai/andrew/openrouter/google/gemini-2.5-pro) <aider@aider.chat>
This commit is contained in:
18
PLAN.md
18
PLAN.md
@@ -8,7 +8,7 @@
|
|||||||
- [x] 5. Add async wrappers for API use
|
- [x] 5. Add async wrappers for API use
|
||||||
- [x] 6. Refactor CLI modes to use services (DONE)
|
- [x] 6. Refactor CLI modes to use services (DONE)
|
||||||
- [x] 7. Refactor REST API to use async services
|
- [x] 7. Refactor REST API to use async services
|
||||||
- [ ] 8. Refactor MCP tools to use services
|
- [x] 8. Refactor MCP tools to use services
|
||||||
- [x] 9. Create unified error handling
|
- [x] 9. Create unified error handling
|
||||||
- [ ] 10. Add integration tests
|
- [ ] 10. Add integration tests
|
||||||
- [ ] 11. Add performance optimization guidelines (partially done)
|
- [ ] 11. Add performance optimization guidelines (partially done)
|
||||||
@@ -155,22 +155,22 @@
|
|||||||
- Use common error handling with conversions to HTTP responses
|
- Use common error handling with conversions to HTTP responses
|
||||||
- Wrap synchronous service calls in `tokio::task::spawn_blocking`
|
- Wrap synchronous service calls in `tokio::task::spawn_blocking`
|
||||||
|
|
||||||
## 8. Refactor MCP Tools to Use Services
|
## 8. Refactor MCP Tools to Use Services (DONE)
|
||||||
**Files:**
|
**Files:**
|
||||||
- Change: `src/modes/server/mcp/tools.rs`
|
- Change: `src/modes/server/mcp/tools.rs` (DONE)
|
||||||
|
|
||||||
**Functions:**
|
**Functions:**
|
||||||
- Change: `save_item` to use `item_service::save_item`
|
- Change: `save_item` to use `item_service` (DONE)
|
||||||
- Change: `get_item` to use `item_service::get_item_full`
|
- Change: `get_item` to use `async_item_service` (DONE)
|
||||||
- Change: `get_latest_item` to use `item_service::get_latest_item`
|
- Change: `get_latest_item` to use `async_item_service` (DONE)
|
||||||
- Change: `list_items` to use `item_service::list_items`
|
- Change: `list_items` to use `async_item_service` (DONE)
|
||||||
- Change: `search_items` to use `item_service::search_items`
|
- Change: `search_items` to use `async_item_service` (DONE)
|
||||||
|
|
||||||
**Reason:** Remove duplication with REST API and CLI modes
|
**Reason:** Remove duplication with REST API and CLI modes
|
||||||
**Implementation:**
|
**Implementation:**
|
||||||
- Replace current implementation with calls to core services
|
- Replace current implementation with calls to core services
|
||||||
- Keep only MCP protocol-specific logic
|
- Keep only MCP protocol-specific logic
|
||||||
- Use synchronous services directly (MCP is typically local/short-lived)
|
- Use `async_item_service` wrappers for database operations
|
||||||
- Standardize response format to match API/CLI
|
- Standardize response format to match API/CLI
|
||||||
|
|
||||||
## 9. Create Unified Error Handling (DONE)
|
## 9. Create Unified Error Handling (DONE)
|
||||||
|
|||||||
@@ -90,4 +90,21 @@ impl AsyncItemService {
|
|||||||
.await
|
.await
|
||||||
.unwrap()
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn save_item_from_mcp(
|
||||||
|
&self,
|
||||||
|
content: Vec<u8>,
|
||||||
|
tags: Vec<String>,
|
||||||
|
metadata: HashMap<String, String>,
|
||||||
|
) -> Result<ItemWithMeta, CoreError> {
|
||||||
|
let data_path = self.data_path.clone();
|
||||||
|
let mut conn = self.db.lock().await;
|
||||||
|
|
||||||
|
tokio::task::spawn_blocking(move || {
|
||||||
|
let item_service = ItemService::new(data_path);
|
||||||
|
item_service.save_item_from_mcp(&content, &tags, &metadata, &mut conn)
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ use crate::core::compression_service::CompressionService;
|
|||||||
use crate::core::error::CoreError;
|
use crate::core::error::CoreError;
|
||||||
use crate::core::meta_service::MetaService;
|
use crate::core::meta_service::MetaService;
|
||||||
use crate::core::types::{ItemWithContent, ItemWithMeta};
|
use crate::core::types::{ItemWithContent, ItemWithMeta};
|
||||||
|
use crate::meta_plugin::{get_meta_plugin, MetaPlugin, MetaPluginType};
|
||||||
use crate::db::{self, Meta};
|
use crate::db::{self, Meta};
|
||||||
use crate::compression_engine::{get_compression_engine, CompressionType};
|
use crate::compression_engine::{get_compression_engine, CompressionType};
|
||||||
use crate::modes::common::settings_compression_type;
|
use crate::modes::common::settings_compression_type;
|
||||||
@@ -194,4 +195,69 @@ impl ItemService {
|
|||||||
|
|
||||||
self.get_item(conn, item_id)
|
self.get_item(conn, item_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn save_item_from_mcp(
|
||||||
|
&self,
|
||||||
|
content: &[u8],
|
||||||
|
tags: &Vec<String>,
|
||||||
|
metadata: &HashMap<String, String>,
|
||||||
|
conn: &mut Connection,
|
||||||
|
) -> Result<ItemWithMeta, CoreError> {
|
||||||
|
let compression_type = CompressionType::LZ4;
|
||||||
|
let compression_engine = get_compression_engine(compression_type.clone())?;
|
||||||
|
|
||||||
|
let tx = conn.transaction()?;
|
||||||
|
let item_id;
|
||||||
|
let mut item;
|
||||||
|
|
||||||
|
{
|
||||||
|
item = db::create_item(&tx, compression_type.clone())?;
|
||||||
|
item_id = item.id.unwrap();
|
||||||
|
|
||||||
|
// Add tags
|
||||||
|
for tag in tags {
|
||||||
|
db::add_tag(&tx, item_id, tag)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add custom metadata
|
||||||
|
for (key, value) in metadata {
|
||||||
|
db::add_meta(&tx, item_id, key, value)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut item_path = self.data_path.clone();
|
||||||
|
item_path.push(item_id.to_string());
|
||||||
|
|
||||||
|
let mut writer = compression_engine.create(item_path.clone())?;
|
||||||
|
writer.write_all(content)?;
|
||||||
|
drop(writer);
|
||||||
|
|
||||||
|
let plugin_types = vec![
|
||||||
|
MetaPluginType::FileMime,
|
||||||
|
MetaPluginType::FileEncoding,
|
||||||
|
MetaPluginType::Binary,
|
||||||
|
MetaPluginType::LineCount,
|
||||||
|
MetaPluginType::WordCount,
|
||||||
|
MetaPluginType::DigestSha256,
|
||||||
|
MetaPluginType::Uid,
|
||||||
|
MetaPluginType::User,
|
||||||
|
MetaPluginType::Hostname,
|
||||||
|
];
|
||||||
|
|
||||||
|
let mut plugins: Vec<Box<dyn MetaPlugin>> =
|
||||||
|
plugin_types.iter().map(|p| get_meta_plugin(p.clone())).collect();
|
||||||
|
|
||||||
|
self.meta_service
|
||||||
|
.initialize_plugins(&mut plugins, &tx, item_id);
|
||||||
|
self.meta_service
|
||||||
|
.process_chunk(&mut plugins, content, &tx);
|
||||||
|
self.meta_service.finalize_plugins(&mut plugins, &tx);
|
||||||
|
|
||||||
|
item.size = Some(content.len() as i64);
|
||||||
|
db::update_item(&tx, item.clone())?;
|
||||||
|
|
||||||
|
tx.commit()?;
|
||||||
|
|
||||||
|
self.get_item(conn, item_id)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,16 +1,12 @@
|
|||||||
use anyhow::{Result, anyhow};
|
use anyhow::{Result, anyhow};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::io::{Write, Read};
|
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use log::{debug, warn};
|
use log::{debug, warn};
|
||||||
|
|
||||||
use crate::modes::server::common::AppState;
|
use crate::modes::server::common::AppState;
|
||||||
use crate::core::async_item_service::AsyncItemService;
|
use crate::core::async_item_service::AsyncItemService;
|
||||||
use crate::core::error::CoreError;
|
use crate::core::error::CoreError;
|
||||||
use crate::db;
|
|
||||||
use crate::compression_engine::{CompressionType, get_compression_engine};
|
|
||||||
use crate::meta_plugin::{MetaPluginType, get_meta_plugin};
|
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
pub enum ToolError {
|
pub enum ToolError {
|
||||||
@@ -42,91 +38,48 @@ impl KeepTools {
|
|||||||
pub async fn save_item(&self, args: Option<Value>) -> Result<String, ToolError> {
|
pub async fn save_item(&self, args: Option<Value>) -> Result<String, ToolError> {
|
||||||
let args = args.ok_or_else(|| ToolError::InvalidArguments("Missing arguments".to_string()))?;
|
let args = args.ok_or_else(|| ToolError::InvalidArguments("Missing arguments".to_string()))?;
|
||||||
|
|
||||||
let content = args.get("content")
|
let content = args
|
||||||
|
.get("content")
|
||||||
.and_then(|v| v.as_str())
|
.and_then(|v| v.as_str())
|
||||||
.ok_or_else(|| ToolError::InvalidArguments("Missing 'content' field".to_string()))?;
|
.ok_or_else(|| ToolError::InvalidArguments("Missing 'content' field".to_string()))?;
|
||||||
|
|
||||||
let tags: Vec<String> = args.get("tags")
|
let tags: Vec<String> = args
|
||||||
|
.get("tags")
|
||||||
.and_then(|v| v.as_array())
|
.and_then(|v| v.as_array())
|
||||||
.map(|arr| arr.iter().filter_map(|v| v.as_str().map(|s| s.to_string())).collect())
|
.map(|arr| {
|
||||||
|
arr.iter()
|
||||||
|
.filter_map(|v| v.as_str().map(|s| s.to_string()))
|
||||||
|
.collect()
|
||||||
|
})
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
let metadata: HashMap<String, String> = args.get("metadata")
|
let metadata: HashMap<String, String> = args
|
||||||
|
.get("metadata")
|
||||||
.and_then(|v| v.as_object())
|
.and_then(|v| v.as_object())
|
||||||
.map(|obj| obj.iter().filter_map(|(k, v)| {
|
.map(|obj| {
|
||||||
v.as_str().map(|s| (k.clone(), s.to_string()))
|
obj.iter()
|
||||||
}).collect())
|
.filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
|
||||||
|
.collect()
|
||||||
|
})
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
debug!("MCP: Saving item with {} bytes, {} tags, {} metadata entries",
|
debug!(
|
||||||
content.len(), tags.len(), metadata.len());
|
"MCP: Saving item with {} bytes, {} tags, {} metadata entries",
|
||||||
|
content.len(),
|
||||||
|
tags.len(),
|
||||||
|
metadata.len()
|
||||||
|
);
|
||||||
|
|
||||||
let mut conn = self.state.db.lock().await;
|
let service = AsyncItemService::new(self.state.data_dir.clone(), self.state.db.clone());
|
||||||
|
let item_with_meta = service
|
||||||
|
.save_item_from_mcp(content.as_bytes().to_vec(), tags, metadata)
|
||||||
|
.await
|
||||||
|
.map_err(|e| ToolError::Other(anyhow!(e)))?;
|
||||||
|
|
||||||
// Create new item
|
let item_id = item_with_meta
|
||||||
let item = db::create_item(&mut *conn, CompressionType::LZ4)?;
|
.item
|
||||||
let item_id = item.id.ok_or_else(|| anyhow!("Failed to get item ID"))?;
|
.id
|
||||||
|
.ok_or_else(|| anyhow!("Failed to get item ID"))?;
|
||||||
// Save content to file
|
|
||||||
let mut item_path = self.state.data_dir.clone();
|
|
||||||
item_path.push(item_id.to_string());
|
|
||||||
|
|
||||||
let compression_engine = get_compression_engine(CompressionType::LZ4)?;
|
|
||||||
let mut writer = compression_engine.create(item_path)?;
|
|
||||||
writer.write_all(content.as_bytes())?;
|
|
||||||
drop(writer); // Ensure file is closed
|
|
||||||
|
|
||||||
// Add tags
|
|
||||||
for tag in &tags {
|
|
||||||
db::add_tag(&mut *conn, item_id, tag)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add custom metadata
|
|
||||||
for (key, value) in &metadata {
|
|
||||||
db::add_meta(&mut *conn, item_id, key, value)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run metadata plugins
|
|
||||||
let meta_plugins = vec![
|
|
||||||
MetaPluginType::FileMime,
|
|
||||||
MetaPluginType::FileEncoding,
|
|
||||||
MetaPluginType::Binary,
|
|
||||||
MetaPluginType::LineCount,
|
|
||||||
MetaPluginType::WordCount,
|
|
||||||
MetaPluginType::DigestSha256,
|
|
||||||
MetaPluginType::Uid,
|
|
||||||
MetaPluginType::User,
|
|
||||||
MetaPluginType::Hostname,
|
|
||||||
];
|
|
||||||
|
|
||||||
for plugin_type in meta_plugins {
|
|
||||||
let plugin_type_clone = plugin_type.clone();
|
|
||||||
let mut plugin = get_meta_plugin(plugin_type);
|
|
||||||
if plugin.is_supported() {
|
|
||||||
if let Err(e) = plugin.initialize(&*conn, item_id) {
|
|
||||||
warn!("Failed to initialize plugin {:?}: {}", plugin_type_clone, e);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut item_path = self.state.data_dir.clone();
|
|
||||||
item_path.push(item_id.to_string());
|
|
||||||
|
|
||||||
// Process the file content through the plugin
|
|
||||||
let mut item_path = self.state.data_dir.clone();
|
|
||||||
item_path.push(item_id.to_string());
|
|
||||||
|
|
||||||
let compression_engine = get_compression_engine(CompressionType::LZ4)?;
|
|
||||||
let mut reader = compression_engine.open(item_path)?;
|
|
||||||
let mut buffer = Vec::new();
|
|
||||||
reader.read_to_end(&mut buffer)?;
|
|
||||||
|
|
||||||
plugin.update(&buffer, &*conn);
|
|
||||||
|
|
||||||
if let Err(e) = plugin.finalize(&*conn) {
|
|
||||||
warn!("Failed to finalize plugin {:?}: {}", plugin_type_clone, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(format!("Successfully saved item with ID: {}", item_id))
|
Ok(format!("Successfully saved item with ID: {}", item_id))
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user