diff --git a/src/client.rs b/src/client.rs index 0e6b42c..468b28c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,22 +1,9 @@ -use crate::services::error::CoreError; +use crate::services::{ItemInfo, error::CoreError}; use base64::Engine; use serde::de::DeserializeOwned; use std::collections::HashMap; use std::io::Read; -/// Item information returned from the server API. -#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] -pub struct ItemInfo { - pub id: i64, - pub ts: String, - pub uncompressed_size: Option, - pub compressed_size: Option, - pub closed: bool, - pub compression: String, - pub tags: Vec, - pub metadata: HashMap, -} - /// Percent-encode a value for use in a URL query string. fn url_encode(s: &str) -> String { let mut result = String::with_capacity(s.len() * 3); diff --git a/src/db.rs b/src/db.rs index 791da02..e68eba5 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1355,11 +1355,11 @@ pub fn get_item_meta(conn: &Connection, item: &Item) -> Result> { /// let db_path = _tmp.path().join("keep.db"); /// let conn = db::open(db_path)?; /// let item = Item { id: Some(1), ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() }; -/// let meta = db::get_item_meta_name(&conn, &item, "mime_type".to_string())?; +/// let meta = db::get_item_meta_name(&conn, &item, "mime_type")?; /// # Ok(()) /// # } /// ``` -pub fn get_item_meta_name(conn: &Connection, item: &Item, name: String) -> Result> { +pub fn get_item_meta_name(conn: &Connection, item: &Item, name: &str) -> Result> { debug!("DB: Getting item meta name: {item:?} {name:?}"); let mut statement = conn .prepare_cached("SELECT id, name, value FROM metas WHERE id=?1 AND name=?2") @@ -1407,11 +1407,11 @@ pub fn get_item_meta_name(conn: &Connection, item: &Item, name: String) -> Resul /// let db_path = _tmp.path().join("keep.db"); /// let conn = db::open(db_path)?; /// let item = Item { id: Some(1), ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() }; -/// let value = db::get_item_meta_value(&conn, &item, "source".to_string())?; +/// let value = db::get_item_meta_value(&conn, &item, "source")?; /// # Ok(()) /// # } /// ``` -pub fn get_item_meta_value(conn: &Connection, item: &Item, name: String) -> Result> { +pub fn get_item_meta_value(conn: &Connection, item: &Item, name: &str) -> Result> { debug!("DB: Getting item meta value: {item:?} {name:?}"); let mut statement = conn .prepare_cached("SELECT value FROM metas WHERE id=?1 AND name=?2") diff --git a/src/lib.rs b/src/lib.rs index 5e480f0..af2842e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -52,6 +52,7 @@ pub mod tokenizer; pub use args::Args; // Re-export PIPESIZE constant pub use common::PIPESIZE; +pub use services::CoreError; // Import all filter plugins to ensure they register themselves #[allow(unused_imports)] diff --git a/src/modes/client/get.rs b/src/modes/client/get.rs index 668c558..db6d4b2 100644 --- a/src/modes/client/get.rs +++ b/src/modes/client/get.rs @@ -39,7 +39,7 @@ pub fn mode( // Decompress through streaming readers let mut decompressed_reader: Box = - CompressionService::decompressing_reader(reader, &compression_type); + CompressionService::decompressing_reader(reader, &compression_type)?; // Binary detection: sample first chunk let mut sample_buf = [0u8; crate::common::PIPESIZE]; diff --git a/src/modes/client/import.rs b/src/modes/client/import.rs index ede4ea7..ca65773 100644 --- a/src/modes/client/import.rs +++ b/src/modes/client/import.rs @@ -29,7 +29,7 @@ pub fn mode( } else { cmd.error( clap::error::ErrorKind::InvalidValue, - format!("Unsupported import format: {import_path} (expected .keep.tar or .meta.yml)"), + format!("Unsupported import format: {}", import_path), ) .exit(); } diff --git a/src/modes/client/save.rs b/src/modes/client/save.rs index 1069450..50d07a8 100644 --- a/src/modes/client/save.rs +++ b/src/modes/client/save.rs @@ -1,8 +1,9 @@ -use crate::client::{ItemInfo, KeepClient}; +use crate::client::KeepClient; use crate::compression_engine::CompressionType; use crate::config::Settings; use crate::meta_plugin::SaveMetaFn; use crate::modes::common::settings_compression_type; +use crate::services::ItemInfo; use crate::services::compression_service::CompressionService; use crate::services::meta_service::MetaService; use anyhow::Result; @@ -75,7 +76,7 @@ pub fn mode( // Wrap pipe writer with appropriate compression let mut compressor: Box = - CompressionService::compressing_writer(Box::new(pipe_writer), &compression_type_clone); + CompressionService::compressing_writer(Box::new(pipe_writer), &compression_type_clone)?; loop { let n = stdin_lock.read(&mut buffer)?; diff --git a/src/modes/common.rs b/src/modes/common.rs index 10f8984..6a24b70 100644 --- a/src/modes/common.rs +++ b/src/modes/common.rs @@ -61,6 +61,9 @@ lazy_static! { static ref KEEP_META_RE: Regex = Regex::new(r"^KEEP_META_(.+)$").unwrap(); } +pub const IMPORT_FORMAT_ERROR: &str = + "Unsupported import format: {} (expected .keep.tar or .meta.yml)"; + pub fn get_meta_from_env() -> HashMap { debug!("COMMON: Getting meta from KEEP_META_*"); let mut meta_env: HashMap = HashMap::new(); diff --git a/src/modes/import.rs b/src/modes/import.rs index efc9d6d..f898994 100644 --- a/src/modes/import.rs +++ b/src/modes/import.rs @@ -52,7 +52,7 @@ pub fn mode_import( } else { cmd.error( clap::error::ErrorKind::InvalidValue, - format!("Unsupported import format: {import_path} (expected .keep.tar or .meta.yml)"), + format!("Unsupported import format: {}", import_path), ) .exit(); } diff --git a/src/modes/list.rs b/src/modes/list.rs index 558bd0f..9cd6b04 100644 --- a/src/modes/list.rs +++ b/src/modes/list.rs @@ -151,13 +151,19 @@ pub fn mode_list( Some(size) => format_size(size as u64, settings.human_readable), None => match item_path.metadata() { Ok(_) => "Unknown".to_string(), - Err(_) => "Missing".to_string(), + Err(e) => { + log::warn!("File missing or inaccessible: {}", e); + "Missing".to_string() + } }, }, ColumnType::Compression => item.compression.to_string(), ColumnType::FileSize => match item_path.metadata() { Ok(metadata) => format_size(metadata.len(), settings.human_readable), - Err(_) => "Missing".to_string(), + Err(e) => { + log::warn!("File missing or inaccessible: {}", e); + "Missing".to_string() + } }, ColumnType::FilePath => item_path .clone() diff --git a/src/modes/server/api/item.rs b/src/modes/server/api/item.rs index 071e119..20794dc 100644 --- a/src/modes/server/api/item.rs +++ b/src/modes/server/api/item.rs @@ -227,7 +227,7 @@ async fn handle_as_meta_response( /// offset/length applied at the stream level — never loads the full item into memory. async fn handle_as_meta_response_with_metadata( db: &Arc>, - data_dir: &std::path::Path, + _data_dir: &std::path::Path, item_service: &Arc, item_id: i64, metadata: &HashMap, @@ -1451,7 +1451,7 @@ pub async fn handle_export_items( let db = state.db.clone(); let item_service = state.item_service.clone(); - let data_dir = state.data_dir.clone(); + let _data_dir = state.data_dir.clone(); // Resolve items in blocking task let items_with_meta = task::spawn_blocking(move || { @@ -1520,6 +1520,7 @@ pub async fn handle_export_items( } } + let tx_err = tx.clone(); let writer = ChannelWriter { tx }; if let Err(e) = crate::export_tar::write_export_tar( @@ -1532,7 +1533,7 @@ pub async fn handle_export_items( &conn, ) { warn!("Export tar write failed: {e}"); - let _ = tx.blocking_send(Err(std::io::Error::other(format!("Export failed: {e}")))); + let _ = tx_err.blocking_send(Err(std::io::Error::other(format!("Export failed: {e}")))); } // Channel drops here, signaling EOF to the stream }); @@ -1602,11 +1603,7 @@ pub async fn handle_import_items( .map_err(|e| { warn!("Failed to import tar: {e}"); match &e { - crate::services::error::CoreError::Io(io_err) - if io_err.to_string() == "Payload too large" => - { - StatusCode::PAYLOAD_TOO_LARGE - } + crate::services::error::CoreError::PayloadTooLarge => StatusCode::PAYLOAD_TOO_LARGE, _ => StatusCode::INTERNAL_SERVER_ERROR, } })?; diff --git a/src/modes/server/common.rs b/src/modes/server/common.rs index 72d1475..b45c3dc 100644 --- a/src/modes/server/common.rs +++ b/src/modes/server/common.rs @@ -419,6 +419,8 @@ impl TryFrom for ItemInfo { type Error = anyhow::Error; fn try_from(item_with_meta: ItemWithMeta) -> Result { + let tags = item_with_meta.tag_names(); + let metadata = item_with_meta.meta_as_map(); Ok(ItemInfo { id: item_with_meta .item @@ -429,8 +431,8 @@ impl TryFrom for ItemInfo { compressed_size: item_with_meta.item.compressed_size, closed: item_with_meta.item.closed, compression: item_with_meta.item.compression, - tags: item_with_meta.tag_names(), - metadata: item_with_meta.meta_as_map(), + tags, + metadata, }) } } diff --git a/src/services/compression_service.rs b/src/services/compression_service.rs index 74ce758..fcda7ae 100644 --- a/src/services/compression_service.rs +++ b/src/services/compression_service.rs @@ -112,38 +112,27 @@ impl CompressionService { Ok(reader) } - /// Creates a decompressing reader wrapping the given reader. - /// - /// Returns a boxed reader that decompresses on the fly based on the compression type. - /// Useful for decompressing network streams or other non-file sources. - /// - /// # Arguments - /// - /// * `reader` - The underlying compressed reader. - /// * `compression` - Compression type string (e.g., "gzip", "lz4"). - /// - /// # Returns - /// - /// A boxed decompressing reader. Unknown/none types pass through unchanged. pub fn decompressing_reader( reader: Box, compression: &CompressionType, - ) -> Box { + ) -> Result, CoreError> { match compression { CompressionType::GZip => { use flate2::read::GzDecoder; - Box::new(GzDecoder::new(reader)) + Ok(Box::new(GzDecoder::new(reader))) } CompressionType::LZ4 => { use lz4_flex::frame::FrameDecoder; - Box::new(FrameDecoder::new(reader)) + Ok(Box::new(FrameDecoder::new(reader))) } #[cfg(feature = "zstd")] CompressionType::ZStd => { use zstd::stream::read::Decoder; - Box::new(Decoder::new(reader).expect("Failed to create zstd decoder")) + Ok(Box::new(Decoder::new(reader).map_err(|e| { + CoreError::Compression(format!("zstd decoder error: {}", e)) + })?)) } - _ => reader, + _ => Ok(reader), } } @@ -163,24 +152,24 @@ impl CompressionService { pub fn compressing_writer( writer: Box, compression: &CompressionType, - ) -> Box { + ) -> Result, CoreError> { match compression { CompressionType::GZip => { use flate2::Compression; use flate2::write::GzEncoder; - Box::new(GzEncoder::new(writer, Compression::default())) + Ok(Box::new(GzEncoder::new(writer, Compression::default()))) } - CompressionType::LZ4 => Box::new(lz4_flex::frame::FrameEncoder::new(writer)), + CompressionType::LZ4 => Ok(Box::new(lz4_flex::frame::FrameEncoder::new(writer))), #[cfg(feature = "zstd")] CompressionType::ZStd => { use zstd::stream::write::Encoder; - Box::new( + Ok(Box::new( Encoder::new(writer, 3) - .expect("Failed to create zstd encoder") + .map_err(|e| CoreError::Compression(format!("zstd encoder error: {}", e)))? .auto_finish(), - ) + )) } - _ => writer, + _ => Ok(writer), } } } diff --git a/src/services/error.rs b/src/services/error.rs index 0d95e18..5319ac0 100644 --- a/src/services/error.rs +++ b/src/services/error.rs @@ -13,32 +13,27 @@ use thiserror::Error; /// * `ItemNotFoundGeneric` - Generic item not found (no ID specified). /// * `InvalidInput(String)` - User or config input validation failure with message. /// * `Compression(String)` - Compression/decompression errors with details. +/// * `PayloadTooLarge` - Request body exceeded maximum allowed size. /// * `Other(anyhow::Error)` - Catch-all for other anyhow-wrapped errors. /// * `Migration(rusqlite_migration::Error)` - Database migration failures. #[derive(Error, Debug)] pub enum CoreError { #[error("Database error: {0}")] - /// Database operation failed. Database(#[from] rusqlite::Error), #[error("I/O error: {0}")] - /// File or stream I/O operation failed. Io(#[from] std::io::Error), #[error("Item not found with id {0}")] - /// Item with the specified ID does not exist in the database. ItemNotFound(i64), #[error("Item not found")] - /// Item does not exist (no specific ID). ItemNotFoundGeneric, #[error("Invalid input: {0}")] - /// Input validation failed. InvalidInput(String), #[error("Compression error: {0}")] - /// Compression or decompression operation failed. Compression(String), + #[error("Payload too large")] + PayloadTooLarge, #[error(transparent)] - /// Other unexpected error. Other(#[from] anyhow::Error), #[error("Migration error: {0}")] - /// Database schema migration failed. Migration(#[from] rusqlite_migration::Error), } diff --git a/src/services/item_service.rs b/src/services/item_service.rs index 1664f38..2b34f79 100644 --- a/src/services/item_service.rs +++ b/src/services/item_service.rs @@ -62,6 +62,12 @@ impl ItemService { } } + fn item_path(&self, item_id: i64) -> PathBuf { + let mut path = self.data_path.clone(); + path.push(item_id.to_string()); + path + } + /// Retrieves an item with its associated metadata and tags. /// /// Fetches the item from the database by ID and loads its tags and metadata. @@ -159,8 +165,7 @@ impl ItemService { ))); } - let mut item_path = self.data_path.clone(); - item_path.push(item_id.to_string()); + let item_path = self.item_path(item_id); debug!("ITEM_SERVICE: Reading content from path: {item_path:?}"); let content = self @@ -304,8 +309,7 @@ impl ItemService { ))); } - let mut item_path = self.data_path.clone(); - item_path.push(item_id.to_string()); + let item_path = self.item_path(item_id); let reader = self .compression_service @@ -345,8 +349,7 @@ impl ItemService { ))); } - let mut item_path = self.data_path.clone(); - item_path.push(item_id.to_string()); + let item_path = self.item_path(item_id); let reader = self .compression_service @@ -540,8 +543,7 @@ impl ItemService { let item = db::get_item(conn, id)?.ok_or(CoreError::ItemNotFound(id))?; debug!("ITEM_SERVICE: Found item to delete: {item:?}"); - let mut item_path = self.data_path.clone(); - item_path.push(id.to_string()); + let item_path = self.item_path(id); debug!("ITEM_SERVICE: Deleting file at path: {item_path:?}"); let deleted_item = item.clone(); @@ -662,8 +664,7 @@ impl ItemService { debug!("ITEM_SERVICE: Got {} meta plugins", plugins.len()); meta_service.initialize_plugins(&mut plugins); - let mut item_path = self.data_path.clone(); - item_path.push(item_id.to_string()); + let item_path = self.item_path(item_id); debug!("ITEM_SERVICE: Writing item to path: {item_path:?}"); let mut item_out = compression_engine.create(item_path.clone())?; @@ -859,8 +860,7 @@ impl ItemService { meta_service.initialize_plugins(&mut plugins); } - let mut item_path = self.data_path.clone(); - item_path.push(item_id.to_string()); + let item_path = self.item_path(item_id); let mut item_out = compression_engine.create(item_path.clone())?; @@ -933,8 +933,7 @@ impl ItemService { return self.get_item(conn, item_id); } - let mut item_path = self.data_path.clone(); - item_path.push(item_id.to_string()); + let item_path = self.item_path(item_id); if !item_path.exists() { return Err(CoreError::ItemNotFound(item_id)); diff --git a/src/services/mod.rs b/src/services/mod.rs index 458080e..3154d52 100644 --- a/src/services/mod.rs +++ b/src/services/mod.rs @@ -13,5 +13,5 @@ pub use filter_service::{FilterService, register_filter_plugin}; pub use item_service::ItemService; pub use meta_service::MetaService; pub use status_service::StatusService; -pub use types::{ItemWithContent, ItemWithMeta}; +pub use types::{ItemInfo, ItemWithContent, ItemWithMeta}; pub use utils::{calc_byte_range, extract_tags, parse_comma_tags}; diff --git a/src/services/types.rs b/src/services/types.rs index 962e842..02a7662 100644 --- a/src/services/types.rs +++ b/src/services/types.rs @@ -62,3 +62,15 @@ pub struct ItemWithContent { /// The content bytes. pub content: Vec, } + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ItemInfo { + pub id: i64, + pub ts: String, + pub uncompressed_size: Option, + pub compressed_size: Option, + pub closed: bool, + pub compression: String, + pub tags: Vec, + pub metadata: HashMap, +}