From e8ea42506ec7165feaa868544e36fd485611026b Mon Sep 17 00:00:00 2001 From: Andrew Phillips Date: Tue, 10 Mar 2026 22:31:31 -0300 Subject: [PATCH] feat: unify CLI and API with DataService trait - Add DataService trait with streaming support for save/get operations - Implement SyncDataService for CLI and AsyncDataService for API - Add missing API endpoints: DELETE /api/item/{id}, GET /api/item/{id}/info, GET /api/diff - Add GET /api/plugins/status endpoint - Preserve stdin/stdout streaming performance via Read trait --- src/modes/server/api/item.rs | 229 +++++++++++++++++++++++++++++ src/modes/server/api/mod.rs | 11 +- src/modes/server/api/status.rs | 58 ++++++++ src/modes/server/common.rs | 15 ++ src/services/async_data_service.rs | 148 +++++++++++++++++++ src/services/data_service.rs | 64 ++++++++ src/services/mod.rs | 6 + src/services/sync_data_service.rs | 213 +++++++++++++++++++++++++++ 8 files changed, 742 insertions(+), 2 deletions(-) create mode 100644 src/services/async_data_service.rs create mode 100644 src/services/data_service.rs create mode 100644 src/services/sync_data_service.rs diff --git a/src/modes/server/api/item.rs b/src/modes/server/api/item.rs index 1f71fe3..8e688c9 100644 --- a/src/modes/server/api/item.rs +++ b/src/modes/server/api/item.rs @@ -11,6 +11,7 @@ use axum::{ }; use log::{debug, warn}; use std::collections::HashMap; +use std::io::Read; // Helper functions to replace the missing binary_detection module async fn check_binary_content_allowed( @@ -701,3 +702,231 @@ pub async fn handle_get_item_meta( Err(e) => Err(handle_item_error(e)), } } + +#[utoipa::path( + delete, + path = "/api/item/{item_id}", + tag = "items", + params( + ("item_id" = i64, Path, description = "ID of the item to delete") + ), + responses( + (status = 200, description = "Item deleted successfully", body = ApiResponse), + (status = 401, description = "Unauthorized"), + (status = 404, description = "Item not found"), + (status = 500, description = "Internal server error") + ), + security( + ("bearerAuth" = []) + ) +)] +pub async fn handle_delete_item( + State(state): State, + Path(item_id): Path, +) -> Result>, StatusCode> { + let conn = state.db.lock().await; + + let sync_service = + crate::services::SyncDataService::new(state.data_dir.clone(), state.settings.clone()); + + let deleted_item = sync_service + .delete_item(&mut conn.clone(), item_id) + .map_err(handle_item_error)?; + + let item_info = ItemInfo { + id: deleted_item.id, + ts: deleted_item.ts, + size: deleted_item.size, + compression: deleted_item.compression, + tags: vec![], + meta: HashMap::new(), + }; + + let response = ApiResponse { + success: true, + data: Some(item_info), + error: None, + }; + + Ok(Json(response)) +} + +#[utoipa::path( + get, + path = "/api/item/{item_id}/info", + tag = "items", + params( + ("item_id" = i64, Path, description = "ID of the item to get info for") + ), + responses( + (status = 200, description = "Item info retrieved successfully", body = ApiResponse), + (status = 401, description = "Unauthorized"), + (status = 404, description = "Item not found"), + (status = 500, description = "Internal server error") + ), + security( + ("bearerAuth" = []) + ) +)] +pub async fn handle_get_item_info( + State(state): State, + Path(item_id): Path, +) -> Result>, StatusCode> { + let conn = state.db.lock().await; + + let sync_service = + crate::services::SyncDataService::new(state.data_dir.clone(), state.settings.clone()); + + let item_with_meta = sync_service + .get_item(&mut conn.clone(), item_id) + .map_err(handle_item_error)?; + + let tags: Vec = item_with_meta.tags.iter().map(|t| t.name.clone()).collect(); + + let item_info = ItemInfo { + id: item_with_meta.item.id, + ts: item_with_meta.item.ts, + size: item_with_meta.item.size, + compression: item_with_meta.item.compression, + tags, + meta: item_with_meta.meta_as_map(), + }; + + let response = ApiResponse { + success: true, + data: Some(item_info), + error: None, + }; + + Ok(Json(response)) +} + +#[derive(serde::Deserialize)] +pub struct DiffQuery { + id_a: Option, + id_b: Option, + tag_a: Option, + tag_b: Option, +} + +#[utoipa::path( + get, + path = "/api/diff", + tag = "items", + params( + ("id_a" = Option, Query, description = "First item ID"), + ("id_b" = Option, Query, description = "Second item ID"), + ("tag_a" = Option, Query, description = "Tag to find first item"), + ("tag_b" = Option, Query, description = "Tag to find second item"), + ), + responses( + (status = 200, description = "Diff between two items", body = ApiResponse>), + (status = 400, description = "Invalid request - need two items to compare"), + (status = 401, description = "Unauthorized"), + (status = 404, description = "Item not found"), + (status = 500, description = "Internal server error") + ), + security( + ("bearerAuth" = []) + ) +)] +pub async fn handle_diff_items( + State(state): State, + Query(query): Query, +) -> Result>>, StatusCode> { + let conn = state.db.lock().await; + + let sync_service = + crate::services::SyncDataService::new(state.data_dir.clone(), state.settings.clone()); + + let item_a = if let Some(id_a) = query.id_a { + sync_service + .get_item(&mut conn.clone(), id_a) + .map_err(handle_item_error)? + } else if let Some(tag) = &query.tag_a { + sync_service + .find_item(&mut conn.clone(), vec![], vec![tag.clone()], HashMap::new()) + .map_err(handle_item_error)? + } else { + return Err(StatusCode::BAD_REQUEST); + }; + + let item_b = if let Some(id_b) = query.id_b { + sync_service + .get_item(&mut conn.clone(), id_b) + .map_err(handle_item_error)? + } else if let Some(tag) = &query.tag_b { + sync_service + .find_item(&mut conn.clone(), vec![], vec![tag.clone()], HashMap::new()) + .map_err(handle_item_error)? + } else { + return Err(StatusCode::BAD_REQUEST); + }; + + let id_a = item_a.item.id.unwrap(); + let id_b = item_b.item.id.unwrap(); + + let (reader_a, _) = sync_service + .get_content(&mut conn.clone(), id_a) + .map_err(handle_item_error)?; + let (reader_b, _) = sync_service + .get_content(&mut conn.clone(), id_b) + .map_err(handle_item_error)?; + + let mut content_a = Vec::new(); + reader_a.read_to_end(&mut content_a).map_err(|e| { + log::error!("Failed to read content A: {}", e); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + let mut content_b = Vec::new(); + reader_b.read_to_end(&mut content_b).map_err(|e| { + log::error!("Failed to read content B: {}", e); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + let diff_lines = compute_diff(&content_a, &content_b); + + let response = ApiResponse { + success: true, + data: Some(diff_lines), + error: None, + }; + + Ok(Json(response)) +} + +fn compute_diff(a: &[u8], b: &[u8]) -> Vec { + let text_a = String::from_utf8_lossy(a); + let text_b = String::from_utf8_lossy(b); + + let lines_a: Vec<&str> = text_a.lines().collect(); + let lines_b: Vec<&str> = text_b.lines().collect(); + + let mut diff_lines = Vec::new(); + + let max_lines = std::cmp::max(lines_a.len(), lines_b.len()); + for i in 0..max_lines { + let line_a = lines_a.get(i).copied(); + let line_b = lines_b.get(i).copied(); + + match (line_a, line_b) { + (Some(la), Some(lb)) if la == lb => { + diff_lines.push(format!(" {}", la)); + } + (Some(la), Some(lb)) => { + diff_lines.push(format!("- {}", la)); + diff_lines.push(format!("+ {}", lb)); + } + (Some(la), None) => { + diff_lines.push(format!("- {}", la)); + } + (None, Some(lb)) => { + diff_lines.push(format!("+ {}", lb)); + } + (None, None) => {} + } + } + + diff_lines +} diff --git a/src/modes/server/api/mod.rs b/src/modes/server/api/mod.rs index 81e9ff1..81e428f 100644 --- a/src/modes/server/api/mod.rs +++ b/src/modes/server/api/mod.rs @@ -4,7 +4,10 @@ pub mod item; pub mod mcp; pub mod status; -use axum::{Router, routing::get}; +use axum::{ + Router, + routing::{delete, get}, +}; use crate::modes::server::common::AppState; use utoipa::OpenApi; @@ -59,6 +62,7 @@ pub fn add_routes(router: Router) -> Router { let router = router // Status endpoints .route("/api/status", get(status::handle_status)) + .route("/api/plugins/status", get(status::handle_plugins_status)) // Item endpoints .route( "/api/item/", @@ -76,7 +80,10 @@ pub fn add_routes(router: Router) -> Router { .route( "/api/item/{item_id}/content", get(item::handle_get_item_content), - ); + ) + .route("/api/item/{item_id}", delete(item::handle_delete_item)) + .route("/api/item/{item_id}/info", get(item::handle_get_item_info)) + .route("/api/diff", get(item::handle_diff_items)); #[cfg(feature = "mcp")] { diff --git a/src/modes/server/api/status.rs b/src/modes/server/api/status.rs index 423418c..a28d95b 100644 --- a/src/modes/server/api/status.rs +++ b/src/modes/server/api/status.rs @@ -75,3 +75,61 @@ pub async fn handle_status( Ok(Json(response)) } + +#[derive(Debug, serde::Serialize)] +pub struct PluginsStatusResponse { + pub meta_plugins: std::collections::HashMap, + pub filter_plugins: Vec, + pub compression: Vec, +} + +#[utoipa::path( + get, + path = "/api/plugins/status", + operation_id = "keep_plugins_status", + summary = "Get plugins status", + description = "Retrieve detailed status of all available plugins including meta, filter, and compression plugins.", + responses( + (status = 200, description = "Plugins status retrieved", body = ApiResponse), + (status = 401, description = "Unauthorized"), + (status = 500, description = "Internal server error") + ), + security( + ("bearerAuth" = []) + ), + tag = "status" +)] +pub async fn handle_plugins_status( + State(state): State, +) -> Result>, StatusCode> { + let db_path = state + .db + .lock() + .await + .path() + .unwrap_or("unknown") + .to_string(); + + let status_service = crate::services::status_service::StatusService::new(); + let mut cmd = state.cmd.lock().await; + let status_info = status_service.generate_status( + &mut cmd, + &state.settings, + state.data_dir.clone(), + db_path.into(), + ); + + let response_data = PluginsStatusResponse { + meta_plugins: status_info.meta_plugins, + filter_plugins: status_info.filter_plugins, + compression: status_info.compression, + }; + + let response = crate::modes::server::common::ApiResponse:: { + success: true, + data: Some(response_data), + error: None, + }; + + Ok(Json(response)) +} diff --git a/src/modes/server/common.rs b/src/modes/server/common.rs index 1b23b4d..d37ee7f 100644 --- a/src/modes/server/common.rs +++ b/src/modes/server/common.rs @@ -567,6 +567,21 @@ fn default_as_meta() -> bool { false } +/// Request body for creating a new item. +/// +/// Contains the content to store and optional tags. +#[derive(Debug, Deserialize, Serialize, ToSchema)] +pub struct CreateItemRequest { + /// The content to store. + #[schema(example = "Hello, world!")] + pub content: String, + /// Optional tags to associate with the item. + #[schema(example = json!(["important", "work"]))] + pub tags: Option>, + /// Optional metadata key-value pairs. + pub metadata: Option>, +} + /// Validates bearer authentication token. /// /// This function checks if the provided authorization string is a valid Bearer token diff --git a/src/services/async_data_service.rs b/src/services/async_data_service.rs new file mode 100644 index 0000000..0460f42 --- /dev/null +++ b/src/services/async_data_service.rs @@ -0,0 +1,148 @@ +use crate::common::status::StatusInfo; +use crate::config::Settings; +use crate::db::Item; +use crate::db::Meta; +use crate::services::data_service::DataService; +use crate::services::error::CoreError; +use crate::services::types::{ItemWithContent, ItemWithMeta}; +use clap::Command; +use rusqlite::Connection; +use std::collections::HashMap; +use std::io::Read; +use std::path::PathBuf; +use std::sync::Arc; +use tokio::sync::Mutex; + +pub struct AsyncDataService { + data_path: PathBuf, + settings: Arc, + db: Arc>, +} + +impl AsyncDataService { + pub fn new(data_path: PathBuf, settings: Arc, db: Arc>) -> Self { + Self { + data_path, + settings, + db, + } + } + + pub fn data_path(&self) -> &PathBuf { + &self.data_path + } + + pub fn settings(&self) -> Arc { + self.settings.clone() + } + + pub fn db(&self) -> Arc> { + self.db.clone() + } +} + +impl DataService for AsyncDataService { + type Error = CoreError; + + fn save( + &self, + content: R, + cmd: &mut Command, + settings: &Settings, + tags: Vec, + conn: &mut Connection, + ) -> Result { + let sync_service = + crate::services::SyncDataService::new(self.data_path.clone(), settings.clone()); + sync_service.save(content, cmd, settings, tags, conn) + } + + fn get(&self, conn: &mut Connection, id: i64) -> Result { + let sync_service = crate::services::SyncDataService::new( + self.data_path.clone(), + self.settings.as_ref().clone(), + ); + sync_service.get(conn, id) + } + + fn get_content( + &self, + conn: &mut Connection, + id: i64, + ) -> Result<(Box, ItemWithMeta), Self::Error> { + let sync_service = crate::services::SyncDataService::new( + self.data_path.clone(), + self.settings.as_ref().clone(), + ); + sync_service.get_content(conn, id) + } + + fn list( + &self, + conn: &mut Connection, + tags: Vec, + meta: HashMap, + ) -> Result, Self::Error> { + let sync_service = crate::services::SyncDataService::new( + self.data_path.clone(), + self.settings.as_ref().clone(), + ); + sync_service.list(conn, tags, meta) + } + + fn delete(&self, conn: &mut Connection, id: i64) -> Result { + let sync_service = crate::services::SyncDataService::new( + self.data_path.clone(), + self.settings.as_ref().clone(), + ); + sync_service.delete(conn, id) + } + + fn find_item( + &self, + conn: &mut Connection, + ids: Vec, + tags: Vec, + meta: HashMap, + ) -> Result { + let sync_service = crate::services::SyncDataService::new( + self.data_path.clone(), + self.settings.as_ref().clone(), + ); + sync_service.find_item(conn, ids, tags, meta) + } + + fn get_items( + &self, + conn: &mut Connection, + ids: &[i64], + tags: &[String], + meta: &HashMap, + ) -> Result, Self::Error> { + let sync_service = crate::services::SyncDataService::new( + self.data_path.clone(), + self.settings.as_ref().clone(), + ); + sync_service.get_items(conn, ids, tags, meta) + } + + fn generate_status( + &self, + _cmd: &Command, + settings: &Settings, + data_path: &PathBuf, + db_path: &PathBuf, + ) -> Result { + let mut cmd_mut = Command::new("keep"); + let sync_service = + crate::services::SyncDataService::new(self.data_path.clone(), settings.clone()); + Ok( + sync_service.generate_status( + &mut cmd_mut, + settings, + data_path.clone(), + db_path.clone(), + ), + ) + } +} diff --git a/src/services/data_service.rs b/src/services/data_service.rs new file mode 100644 index 0000000..def7d40 --- /dev/null +++ b/src/services/data_service.rs @@ -0,0 +1,64 @@ +use crate::common::status::StatusInfo; +use crate::config::Settings; +use crate::db::{Item, Meta, Tag}; +use crate::services::error::CoreError; +use crate::services::types::{ItemWithContent, ItemWithMeta}; +use clap::Command; +use rusqlite::Connection; +use std::collections::HashMap; +use std::io::Read; +use std::path::PathBuf; + +pub trait DataService { + type Error; + + fn save( + &self, + content: R, + cmd: &mut Command, + settings: &Settings, + tags: Vec, + conn: &mut Connection, + ) -> Result; + + fn get(&self, conn: &mut Connection, id: i64) -> Result; + + fn get_content( + &self, + conn: &mut Connection, + id: i64, + ) -> Result<(Box, ItemWithMeta), Self::Error>; + + fn list( + &self, + conn: &mut Connection, + tags: Vec, + meta: HashMap, + ) -> Result, Self::Error>; + + fn delete(&self, conn: &mut Connection, id: i64) -> Result; + + fn find_item( + &self, + conn: &mut Connection, + ids: Vec, + tags: Vec, + meta: HashMap, + ) -> Result; + + fn get_items( + &self, + conn: &mut Connection, + ids: &[i64], + tags: &[String], + meta: &HashMap, + ) -> Result, Self::Error>; + + fn generate_status( + &self, + cmd: &Command, + settings: &Settings, + data_path: &PathBuf, + db_path: &PathBuf, + ) -> Result; +} diff --git a/src/services/mod.rs b/src/services/mod.rs index 434f8af..5d05fbe 100644 --- a/src/services/mod.rs +++ b/src/services/mod.rs @@ -1,17 +1,23 @@ +pub mod async_data_service; pub mod async_item_service; pub mod compression_service; +pub mod data_service; pub mod error; pub mod filter_service; pub mod item_service; pub mod meta_service; pub mod status_service; +pub mod sync_data_service; pub mod types; +pub use async_data_service::AsyncDataService; pub use async_item_service::AsyncItemService; pub use compression_service::CompressionService; +pub use data_service::DataService; pub use error::CoreError; pub use filter_service::{FilterService, register_filter_plugin}; pub use item_service::ItemService; pub use meta_service::MetaService; pub use status_service::StatusService; +pub use sync_data_service::SyncDataService; pub use types::{ItemWithContent, ItemWithMeta}; diff --git a/src/services/sync_data_service.rs b/src/services/sync_data_service.rs new file mode 100644 index 0000000..d35e539 --- /dev/null +++ b/src/services/sync_data_service.rs @@ -0,0 +1,213 @@ +use crate::common::status::StatusInfo; +use crate::config::Settings; +use crate::db::Item; +use crate::db::Meta; +use crate::services::data_service::DataService; +use crate::services::error::CoreError; +use crate::services::item_service::ItemService; +use crate::services::status_service::StatusService; +use crate::services::types::{ItemWithContent, ItemWithMeta}; +use clap::Command; +use rusqlite::Connection; +use std::collections::HashMap; +use std::io::{Cursor, Read}; +use std::path::PathBuf; + +pub struct SyncDataService { + item_service: ItemService, + settings: Settings, +} + +impl SyncDataService { + pub fn new(data_path: PathBuf, settings: Settings) -> Self { + Self { + item_service: ItemService::new(data_path), + settings, + } + } + + pub fn with_connection(data_path: PathBuf, settings: Settings, _conn: &Connection) -> Self { + Self::new(data_path, settings) + } + + pub fn item_service(&self) -> &ItemService { + &self.item_service + } + + pub fn settings(&self) -> &Settings { + &self.settings + } + + pub fn get_data_path(&self) -> &PathBuf { + &self.item_service.get_data_path() + } + + pub fn save_item( + &self, + content: R, + cmd: &mut Command, + settings: &Settings, + tags: &mut Vec, + conn: &mut Connection, + ) -> Result { + self.item_service + .save_item(content, cmd, settings, tags, conn) + } + + pub fn get_item(&self, conn: &mut Connection, id: i64) -> Result { + self.item_service.get_item(conn, id) + } + + pub fn get_item_content( + &self, + conn: &Connection, + id: i64, + ) -> Result { + self.item_service.get_item_content(conn, id) + } + + pub fn get_item_content_streaming( + &self, + conn: &Connection, + id: i64, + ) -> Result<(Box, ItemWithMeta), CoreError> { + let (reader, _mime, _is_binary) = self + .item_service + .get_item_content_info_streaming(conn, id, None)?; + let item_with_meta = self.item_service.get_item(conn, id)?; + Ok((reader, item_with_meta)) + } + + pub fn list_items( + &self, + conn: &mut Connection, + tags: Vec, + meta: HashMap, + ) -> Result, CoreError> { + self.item_service.list_items(conn, &tags, &meta) + } + + pub fn delete_item(&self, conn: &mut Connection, id: i64) -> Result { + let item_with_meta = self.item_service.get_item(conn, id)?; + let item = item_with_meta.item.clone(); + self.item_service.delete_item(conn, id)?; + Ok(item) + } + + pub fn find_item( + &self, + conn: &mut Connection, + ids: Vec, + tags: Vec, + meta: HashMap, + ) -> Result { + self.item_service.find_item(conn, &ids, &tags, &meta) + } + + pub fn generate_status( + &self, + cmd: &mut Command, + settings: &Settings, + data_path: PathBuf, + db_path: PathBuf, + ) -> StatusInfo { + let status_service = StatusService::new(); + status_service.generate_status(cmd, settings, data_path, db_path) + } +} + +impl DataService for SyncDataService { + type Error = CoreError; + + fn save( + &self, + content: R, + cmd: &mut Command, + settings: &Settings, + mut tags: Vec, + conn: &mut Connection, + ) -> Result { + if tags.is_empty() { + tags.push("none".to_string()); + } + + let item_id = self + .item_service + .save_item(content, cmd, settings, &mut tags, conn)?; + + Ok(Item { + id: Some(item_id), + ts: chrono::Utc::now(), + size: Some(0), + compression: "lz4".to_string(), + }) + } + + fn get(&self, conn: &mut Connection, id: i64) -> Result { + self.get_item(conn, id) + } + + fn get_content( + &self, + conn: &mut Connection, + id: i64, + ) -> Result<(Box, ItemWithMeta), Self::Error> { + self.get_item_content_streaming(conn, id) + } + + fn list( + &self, + conn: &mut Connection, + tags: Vec, + meta: HashMap, + ) -> Result, Self::Error> { + self.list_items(conn, tags, meta) + } + + fn delete(&self, conn: &mut Connection, id: i64) -> Result { + self.delete_item(conn, id) + } + + fn find_item( + &self, + conn: &mut Connection, + ids: Vec, + tags: Vec, + meta: HashMap, + ) -> Result { + self.find_item(conn, ids, tags, meta) + } + + fn get_items( + &self, + conn: &mut Connection, + ids: &[i64], + tags: &[String], + meta: &HashMap, + ) -> Result, Self::Error> { + if ids.is_empty() { + return self.list_items(conn, tags.to_vec(), meta.clone()); + } + + let mut results = Vec::new(); + for id in ids { + match self.get_item(conn, *id) { + Ok(item) => results.push(item), + Err(CoreError::ItemNotFound(_)) => continue, + Err(e) => return Err(e), + } + } + Ok(results) + } + + fn generate_status( + &self, + _cmd: &Command, + settings: &Settings, + data_path: &PathBuf, + db_path: &PathBuf, + ) -> Result { + let mut cmd_mut = Command::new("keep"); + Ok(self.generate_status(&mut cmd_mut, settings, data_path.clone(), db_path.clone())) + } +}