diff --git a/Cargo.lock b/Cargo.lock index 350f4ab..665f508 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -327,6 +327,7 @@ dependencies = [ "iana-time-zone", "js-sys", "num-traits", + "serde", "wasm-bindgen", "windows-link", ] @@ -514,6 +515,41 @@ dependencies = [ "memchr", ] +[[package]] +name = "darling" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn 2.0.105", +] + +[[package]] +name = "darling_macro" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" +dependencies = [ + "darling_core", + "quote", + "syn 2.0.105", +] + [[package]] name = "derive_arbitrary" version = "1.4.2" @@ -618,6 +654,12 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "dyn-clone" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" + [[package]] name = "either" version = "1.15.0" @@ -753,6 +795,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.31" @@ -760,6 +817,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -768,6 +826,34 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.105", +] + [[package]] name = "futures-sink" version = "0.3.31" @@ -786,10 +872,16 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -1127,6 +1219,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "1.0.3" @@ -1237,6 +1335,7 @@ dependencies = [ "dns-lookup", "enum-map", "flate2", + "futures", "gethostname", "humansize", "hyper", @@ -1254,6 +1353,7 @@ dependencies = [ "pwhash", "rand", "regex", + "rmcp", "rusqlite", "rusqlite_migration", "serde", @@ -1265,7 +1365,9 @@ dependencies = [ "strum_macros", "tempfile", "term 1.1.0", + "thiserror 1.0.69", "tokio", + "tokio-stream", "tower", "tower-http", "utoipa", @@ -1595,6 +1697,12 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "paste" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" + [[package]] name = "pathdiff" version = "0.2.3" @@ -1830,6 +1938,40 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" +[[package]] +name = "rmcp" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37f2048a81a7ff7e8ef6bc5abced70c3d9114c8f03d85d7aaaafd9fd04f12e9e" +dependencies = [ + "base64 0.22.1", + "chrono", + "futures", + "paste", + "pin-project-lite", + "rmcp-macros", + "schemars", + "serde", + "serde_json", + "thiserror 2.0.14", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "rmcp-macros" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72398e694b9f6dbb5de960cf158c8699e6a1854cb5bbaac7de0646b2005763c4" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "serde_json", + "syn 2.0.105", +] + [[package]] name = "ron" version = "0.8.1" @@ -1951,6 +2093,31 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "schemars" +version = "0.8.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fbf2ae1b8bc8e02df939598064d22402220cd5bbcca1c76f7d6a310974d5615" +dependencies = [ + "chrono", + "dyn-clone", + "schemars_derive", + "serde", + "serde_json", +] + +[[package]] +name = "schemars_derive" +version = "0.8.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32e265784ad618884abaea0600a9adf15393368d840e0222d101a072f3f7534d" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn 2.0.105", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -1977,6 +2144,17 @@ dependencies = [ "syn 2.0.105", ] +[[package]] +name = "serde_derive_internals" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.105", +] + [[package]] name = "serde_json" version = "1.0.142" @@ -2345,6 +2523,17 @@ dependencies = [ "syn 2.0.105", ] +[[package]] +name = "tokio-stream" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.16" @@ -2461,9 +2650,21 @@ checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "log", "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.105", +] + [[package]] name = "tracing-core" version = "0.1.34" diff --git a/PLAN.md b/PLAN.md index 5f6fcdd..e69de29 100644 --- a/PLAN.md +++ b/PLAN.md @@ -1,271 +0,0 @@ -# Refactoring Plan to Reduce Code Duplication - -## Implementation Order: -- [x] 1. Create core module structure and error types -- [x] 2. Create common data structures -- [x] 3. Update database layer for batch operations -- [x] 4. Create core services with clear boundaries (synchronous) -- [x] 5. Add async wrappers for API use -- [x] 6. Refactor CLI modes to use services (DONE) -- [x] 7. Refactor REST API to use async services -- [x] 8. Refactor MCP tools to use services -- [x] 9. Create unified error handling -- [ ] 10. Add integration tests -- [ ] 11. Add performance optimization guidelines (partially done) - -## 1. Create Core Module Structure and Error Types (DONE) -**Files:** -- Add: `src/core/error.rs` -- Add: `src/core/mod.rs` - -**Functions:** -- Add: `CoreError` enum with comprehensive variants -- Add: Conversion traits for all error types - -**Reason:** Establish foundation for consistent error handling -**Implementation:** -- Define base error enum with conversions to all interface-specific error types -- Use `#[derive(thiserror::Error)]` for easy `Display` and `Error` implementations -- Provide user-friendly error messages with error codes - -## 2. Create Common Data Structures (DONE) -**Files:** -- Add: `src/core/types.rs` - -**Functions:** -- Add: `ItemWithMeta` struct -- Add: `ItemWithContent` struct -- Add: `From` for `ItemWithMeta` -- Add: `From` for `ItemInfo` (API response) -- Add: Serialization/deserialization implementations - -**Reason:** Standardize data structures used across modes and APIs -**Implementation:** -- Define structs for common data types -- Include conversion functions from database types -- Add serialization/deserialization support for JSON/YAML -- Ensure all fields are properly documented - -## 3. Update Database Layer for Batch Operations (DONE) -**Files:** -- Change: `src/db.rs` - -**Functions:** -- Add: `get_items_with_meta_batch` -- Add: `get_items_with_tags_batch` -- Add: `insert_item_with_meta_transaction` -- Add: `delete_item_with_meta_transaction` -- Change: Optimize existing queries for batch operations - -**Reason:** Support efficient batch operations needed by services -**Implementation:** -- Add functions to get multiple items with their metadata and tags -- Add batch insertion/updates for tags and metadata -- Add transaction support for atomic operations -- Optimize queries for common access patterns - -## 4. Create Core Service Layer with Clear Boundaries (DONE) -**Files:** -- Add: `src/core/item_service.rs` -- Add: `src/core/compression_service.rs` -- Add: `src/core/meta_service.rs` - -**Functions:** -- Add: `get_item_full` in `item_service.rs` -- Add: `save_item` in `item_service.rs` -- Add: `list_items` in `item_service.rs` -- Add: `delete_item` in `item_service.rs` -- Add: `get_compressed_content` in `compression_service.rs` -- Add: `process_metadata` in `meta_service.rs` - -**Reason:** Extract common business logic from modes and APIs into reusable services -**Implementation:** -- Move logic from modes (get, save, list, info) and API handlers into service functions -- Services should return structured data, not format output -- Handle compression, metadata, and database operations -- Keep core services **synchronous** for CLI performance -- Use streaming for pipeline efficiency (process data in chunks) - -**Layer Division:** -- **`db.rs`**: Low-level SQL operations (e.g., `insert_item`, `query_all_items`) -- **`item_service.rs`**: Higher-level business logic (e.g., "get item with metadata and tags," "validate item before save") - -## 5. Add Async Wrappers for API Use -**Files:** -- Add: `src/core/async_item_service.rs` - -**Functions:** -- Add: `get_item_async` in `async_item_service.rs` -- Add: `save_item_async` in `async_item_service.rs` -- Add: `list_items_async` in `async_item_service.rs` -- Add: `delete_item_async` in `async_item_service.rs` - -**Reason:** Ensure services can be safely used in asynchronous contexts -**Implementation:** -- Document thread-safety guarantees for all core services -- Use `Arc>` or connection pooling for shared resources -- Provide examples for safe async/sync boundaries -- Use `tokio::task::spawn_blocking` for CPU-bound or blocking I/O operations - -## 6. Refactor CLI Modes to Use Services (DONE) -**Files:** -- Change: `src/modes/get.rs` (DONE) -- Change: `src/modes/save.rs` (DONE) -- Change: `src/modes/list.rs` (DONE) -- Change: `src/modes/info.rs` (DONE) -- Change: `src/modes/delete.rs` (DONE) -- Change: `src/modes/diff.rs` (DONE) -- Change: `src/modes/status.rs` (DONE, uses shared function) - -**Functions:** -- Change: `mode_get` to use `item_service` (DONE) -- Change: `mode_save` to use `item_service` (DONE) -- Change: `mode_list` to use `item_service` (DONE) -- Change: `mode_info` to use `item_service` (DONE) -- Change: `mode_delete` to use `item_service` (DONE) -- Change: `mode_diff` to use `item_service` (DONE) -- Change: `mode_status` to use new status service functions (DONE, uses shared function) - -**Reason:** Remove direct database and file system access from modes -**Implementation:** -- Replace current implementations with calls to core services -- Keep only CLI-specific formatting and output logic -- Handle command-line argument parsing and validation -- Use synchronous services directly -- Implement streaming for stdin/stdout to maintain pipeline performance - -## 7. Refactor REST API to Use Async Services (DONE) -**Files:** -- Change: `src/modes/server/api/item.rs` (DONE) -- Change: `src/modes/server/api/status.rs` (DONE) - -**Functions:** -- Change: `handle_get_item` to use `async_item_service::get_item_async` (DONE) -- Change: `handle_get_item_latest` to use `async_item_service::get_item_async` (DONE) -- Change: `handle_list_items` to use `async_item_service::list_items_async` (DONE) -- Change: `handle_post_item` to use `async_item_service::save_item_async` (Not implemented) -- Change: `handle_get_item_content` to use `async_item_service::get_item_content_async` (DONE) -- Change: `handle_get_item_meta` to use `async_item_service::get_item_meta_async` (DONE) -- Change: `handle_status` to use `async_item_service::get_status_async` (DONE, uses shared function) - -**Reason:** Remove business logic from HTTP handlers -**Implementation:** -- Convert handlers to call async core services -- Keep only HTTP-specific code (status codes, headers, etc.) -- Use common error handling with conversions to HTTP responses -- Wrap synchronous service calls in `tokio::task::spawn_blocking` - -## 8. Refactor MCP Tools to Use Services (DONE) -**Files:** -- Change: `src/modes/server/mcp/tools.rs` (DONE) - -**Functions:** -- Change: `save_item` to use `item_service` (DONE) -- Change: `get_item` to use `async_item_service` (DONE) -- Change: `get_latest_item` to use `async_item_service` (DONE) -- Change: `list_items` to use `async_item_service` (DONE) -- Change: `search_items` to use `async_item_service` (DONE) - -**Reason:** Remove duplication with REST API and CLI modes -**Implementation:** -- Replace current implementation with calls to core services -- Keep only MCP protocol-specific logic -- Use `async_item_service` wrappers for database operations -- Standardize response format to match API/CLI - -## 9. Create Unified Error Handling (DONE) -**Files:** -- Change: All files that handle errors - -**Functions:** -- Add: Conversion traits for all error types -- Change: All error handling to use new error system - -**Reason:** Standardize error types across the application -**Implementation:** -- Define comprehensive error enum with conversions: - - From database errors - - From I/O errors - - From compression errors - - From validation errors -- Implement conversions to: - - `anyhow::Error` (for CLI) - - `axum::http::StatusCode` (for API) - - `ToolError` (for MCP) -- Provide user-friendly error messages -- Include error codes for programmatic handling - -## 10. Add Integration Tests -**Files:** -- Add: `tests/integration/core_tests.rs` -- Add: `tests/integration/cli_tests.rs` -- Add: `tests/integration/api_tests.rs` -- Add: `tests/integration/performance_tests.rs` - -**Functions:** -- Add: Test cases for all core service functions -- Add: Test cases for CLI modes -- Add: Test cases for API endpoints -- Add: Performance benchmarks - -**Reason:** Ensure refactored code maintains functionality and performance -**Implementation:** -- Test core services independently -- Test CLI modes and APIs through their public interfaces -- Verify compression, metadata, and database operations -- Include performance benchmarks for critical paths -- Use in-memory databases and tempfiles for isolation -- Test both sync and async service implementations - -## 11. Performance Optimization Guidelines (PARTIALLY DONE) -**Files:** -- Change: All core service files -- Change: All mode files -- Change: All API handler files - -**Functions:** -- Add: Streaming implementations for I/O operations -- Add: Benchmark functions for critical paths -- Change: Buffer management to minimize copies - -**Reason:** Ensure the refactored version doesn't slow down pipelines -**Implementation:** -- Use streaming for stdin/stdout processing (chunked I/O) -- Minimize buffering and memory copies -- Offload CPU-bound work (compression, plugins) to thread pools -- Provide fast-path options (e.g., `--fast` flag to skip metadata plugins) -- Benchmark critical operations before/after refactoring -- Document performance characteristics and tradeoffs - -## Benefits: -- Reduced code duplication between CLI, API, and MCP -- Easier maintenance with clear separation of concerns -- Consistent behavior across all interfaces -- Better testability with isolated service layer -- Maintained or improved pipeline performance -- Flexible architecture supporting both sync and async use cases - -## Key Risks and Mitigations: -1. **Performance Regression:** - - Risk: Refactoring could slow down pipeline operations - - Mitigation: Benchmark before/after, use streaming, minimize overhead - -2. **Increased Complexity:** - - Risk: Adding service layer could make code harder to understand - - Mitigation: Clear documentation, gradual refactoring, maintain simple interfaces - -3. **Async/Sync Boundaries:** - - Risk: Mixing sync/async could lead to deadlocks or inefficiencies - - Mitigation: Clear boundaries, use `spawn_blocking` for sync work in async context - -4. **Breaking Changes:** - - Risk: Refactoring could change behavior in subtle ways - - Mitigation: Comprehensive tests, gradual rollout, maintain backward compatibility - -## Design Principles: -1. **Zero-Copy Where Possible:** Use slicing instead of copying data -2. **Streaming Processing:** Handle data in chunks for memory efficiency -3. **Clear Boundaries:** Separate core logic from interface-specific code -4. **Performance First:** Optimize for common pipeline use cases -5. **Consistent Errors:** Unified error handling across all interfaces -6. **Backward Compatibility:** Maintain existing CLI/API behavior diff --git a/src/core/item_service.rs b/src/core/item_service.rs deleted file mode 100644 index 73a24f8..0000000 --- a/src/core/item_service.rs +++ /dev/null @@ -1,262 +0,0 @@ -use crate::config::Settings; -use crate::services::compression_service::CompressionService; -use crate::services::error::CoreError; -use crate::services::meta_service::MetaService; -use crate::services::types::{ItemWithContent, ItemWithMeta}; -use crate::meta_plugin::{get_meta_plugin, MetaPlugin, MetaPluginType}; -use crate::db::{self, Meta}; -use crate::compression_engine::{get_compression_engine, CompressionType}; -use crate::modes::common::settings_compression_type; -use clap::Command; -use rusqlite::Connection; -use std::collections::HashMap; -use std::fs; -use std::io::{IsTerminal, Read, Write}; -use std::path::PathBuf; - -pub struct ItemService { - data_path: PathBuf, - compression_service: CompressionService, - meta_service: MetaService, -} - -impl ItemService { - pub fn new(data_path: PathBuf) -> Self { - Self { - data_path, - compression_service: CompressionService::new(), - meta_service: MetaService::new(), - } - } - - pub fn get_item(&self, conn: &Connection, id: i64) -> Result { - let item = db::get_item(conn, id)?.ok_or(CoreError::ItemNotFound(id))?; - let tags = db::get_item_tags(conn, &item)?; - let meta = db::get_item_meta(conn, &item)?; - Ok(ItemWithMeta { item, tags, meta }) - } - - pub fn get_item_content(&self, conn: &Connection, id: i64) -> Result { - let item_with_meta = self.get_item(conn, id)?; - let item_id = item_with_meta.item.id.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?; - - if item_id <= 0 { - return Err(CoreError::InvalidInput(format!("Invalid item ID: {}", item_id))); - } - - let mut item_path = self.data_path.clone(); - item_path.push(item_id.to_string()); - - let content = self - .compression_service - .get_item_content(item_path, &item_with_meta.item.compression)?; - - Ok(ItemWithContent { - item_with_meta, - content, - }) - } - - pub fn find_item(&self, conn: &Connection, ids: &[i64], tags: &[String], meta: &HashMap) -> Result { - let item_maybe = match (ids.is_empty(), tags.is_empty() && meta.is_empty()) { - (false, _) => db::get_item(conn, ids[0])?, - (true, true) => db::get_item_last(conn)?, - (true, false) => db::get_item_matching(conn, &tags.to_vec(), meta)? - }; - - let item = item_maybe.ok_or(CoreError::ItemNotFoundGeneric)?; - let item_id = item.id.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?; - self.get_item(conn, item_id) - } - - pub fn list_items(&self, conn: &Connection, tags: &[String], meta: &HashMap) -> Result, CoreError> { - let items = db::get_items_matching(conn, &tags.to_vec(), meta)?; - - let item_ids: Vec = items.iter().filter_map(|item| item.id).collect(); - if item_ids.is_empty() { - return Ok(Vec::new()); - } - - let tags_map = db::get_tags_for_items(conn, &item_ids)?; - let meta_map_db = db::get_meta_for_items(conn, &item_ids)?; - - let mut result = Vec::new(); - for item in items { - let item_id = item.id.unwrap(); - let tags = tags_map.get(&item_id).cloned().unwrap_or_default(); - let meta_hm = meta_map_db.get(&item_id).cloned().unwrap_or_default(); - let meta = meta_hm.into_iter().map(|(name, value)| Meta { id: item_id, name, value }).collect(); - - result.push(ItemWithMeta { item, tags, meta }); - } - - Ok(result) - } - - pub fn delete_item(&self, conn: &mut Connection, id: i64) -> Result<(), CoreError> { - if id <= 0 { - return Err(CoreError::InvalidInput(format!("Invalid item ID: {}", id))); - } - let item = db::get_item(conn, id)?.ok_or(CoreError::ItemNotFound(id))?; - - let mut item_path = self.data_path.clone(); - item_path.push(id.to_string()); - - let tx = conn.transaction()?; - db::delete_item(&tx, item)?; - fs::remove_file(&item_path).or_else(|e| if e.kind() == std::io::ErrorKind::NotFound { Ok(()) } else { Err(e) })?; - tx.commit()?; - - Ok(()) - } - - pub fn save_item( - &self, - mut input: R, - cmd: &mut Command, - settings: &Settings, - tags: &mut Vec, - conn: &mut Connection, - ) -> Result { - if tags.is_empty() { - tags.push("none".to_string()); - } - - let compression_type = settings_compression_type(cmd, settings); - let compression_engine = get_compression_engine(compression_type.clone())?; - - let tx = conn.transaction()?; - - let item_id; - let mut item; - { - item = db::create_item(&tx, compression_type.clone())?; - item_id = item.id.unwrap(); - db::set_item_tags(&tx, item.clone(), tags)?; - let item_meta = self.meta_service.collect_initial_meta(); - for (k, v) in item_meta.iter() { - db::add_meta(&tx, item_id, k, v)?; - } - } - - let mut plugins = self.meta_service.get_plugins(cmd, settings); - self.meta_service.initialize_plugins(&mut plugins, &tx, item_id); - - let mut item_path = self.data_path.clone(); - item_path.push(item_id.to_string()); - - let mut item_out = compression_engine.create(item_path.clone())?; - - let mut buffer = [0; 8192]; - let mut total_bytes = 0; - - loop { - let n = input.read(&mut buffer)?; - if n == 0 { break; } - - total_bytes += n as i64; - item_out.write_all(&buffer[..n])?; - self.meta_service.process_chunk(&mut plugins, &buffer[..n], &tx); - } - - item_out.flush()?; - drop(item_out); - - self.meta_service.finalize_plugins(&mut plugins, &tx); - - item.size = Some(total_bytes); - db::update_item(&tx, item.clone())?; - - tx.commit()?; - - if !settings.quiet { - if std::io::stderr().is_terminal() { - let mut t = term::stderr().unwrap(); - let _ = t.reset(); - let _ = t.attr(term::Attr::Bold); - let _ = write!(t, "KEEP:"); - let _ = t.reset(); - let _ = write!(t, " New item "); - let _ = t.attr(term::Attr::Bold); - let _ = write!(t, "{item_id}"); - let _ = t.reset(); - let _ = write!(t, " tags: "); - let _ = t.attr(term::Attr::Bold); - let _ = write!(t, "{}", tags.join(" ")); - let _ = t.reset(); - let _ = writeln!(t); - let _ = std::io::stderr().flush(); - } else { - let mut t = std::io::stderr(); - let _ = writeln!(t, "KEEP: New item: {} tags: {:?}", item_id, tags); - } - } - - self.get_item(conn, item_id) - } - - pub fn save_item_from_mcp( - &self, - content: &[u8], - tags: &Vec, - metadata: &HashMap, - conn: &mut Connection, - ) -> Result { - let compression_type = CompressionType::LZ4; - let compression_engine = get_compression_engine(compression_type.clone())?; - - let tx = conn.transaction()?; - let item_id; - let mut item; - - { - item = db::create_item(&tx, compression_type.clone())?; - item_id = item.id.unwrap(); - - // Add tags - for tag in tags { - db::add_tag(&tx, item_id, tag)?; - } - - // Add custom metadata - for (key, value) in metadata { - db::add_meta(&tx, item_id, key, value)?; - } - } - - let mut item_path = self.data_path.clone(); - item_path.push(item_id.to_string()); - - let mut writer = compression_engine.create(item_path.clone())?; - writer.write_all(content)?; - drop(writer); - - let plugin_types = vec![ - MetaPluginType::FileMime, - MetaPluginType::FileEncoding, - MetaPluginType::Binary, - MetaPluginType::LineCount, - MetaPluginType::WordCount, - MetaPluginType::DigestSha256, - MetaPluginType::Uid, - MetaPluginType::User, - MetaPluginType::Hostname, - ]; - - let mut plugins: Vec> = - plugin_types.iter().map(|p| get_meta_plugin(p.clone())).collect(); - - self.meta_service - .initialize_plugins(&mut plugins, &tx, item_id); - self.meta_service - .process_chunk(&mut plugins, content, &tx); - self.meta_service.finalize_plugins(&mut plugins, &tx); - - item.size = Some(content.len() as i64); - db::update_item(&tx, item.clone())?; - - tx.commit()?; - - self.get_item(conn, item_id) - } -} diff --git a/src/core/async_item_service.rs b/src/services/async_item_service.rs similarity index 100% rename from src/core/async_item_service.rs rename to src/services/async_item_service.rs diff --git a/src/core/compression_service.rs b/src/services/compression_service.rs similarity index 100% rename from src/core/compression_service.rs rename to src/services/compression_service.rs diff --git a/src/core/error.rs b/src/services/error.rs similarity index 100% rename from src/core/error.rs rename to src/services/error.rs diff --git a/src/core/meta_service.rs b/src/services/meta_service.rs similarity index 100% rename from src/core/meta_service.rs rename to src/services/meta_service.rs diff --git a/src/core/mod.rs b/src/services/mod.rs similarity index 100% rename from src/core/mod.rs rename to src/services/mod.rs diff --git a/src/core/types.rs b/src/services/types.rs similarity index 100% rename from src/core/types.rs rename to src/services/types.rs