diff --git a/src/services/item_service.rs b/src/services/item_service.rs index f902380..5a6084c 100644 --- a/src/services/item_service.rs +++ b/src/services/item_service.rs @@ -17,79 +17,41 @@ use std::fs; use std::io::{IsTerminal, Read, Write}; use std::path::PathBuf; -/// A reader that applies a filter chain to the data as it's read -struct FilteringReader { - reader: R, - filter_chain: Option, - buffer: Vec, - buffer_pos: usize, -} - -impl FilteringReader { - pub fn new(reader: R, filter_chain: Option) -> Self { - Self { - reader, - filter_chain, - buffer: Vec::new(), - buffer_pos: 0, - } - } -} - -impl Read for FilteringReader { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - // If we have data in our buffer, serve that first - if self.buffer_pos < self.buffer.len() { - let bytes_to_copy = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos); - buf[..bytes_to_copy].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + bytes_to_copy]); - self.buffer_pos += bytes_to_copy; - return Ok(bytes_to_copy); - } - - // Reset buffer for new data - self.buffer.clear(); - self.buffer_pos = 0; - - // Read from the original reader into a temporary buffer - let mut temp_buf = vec![0; buf.len()]; - let bytes_read = self.reader.read(&mut temp_buf)?; - - if bytes_read == 0 { - return Ok(0); - } - - // Process through the filter chain if it exists - if let Some(ref mut chain) = self.filter_chain { - // Use a cursor to read the input data - let mut input_cursor = std::io::Cursor::new(&temp_buf[..bytes_read]); - // Write filtered output to our buffer - chain.filter(&mut input_cursor, &mut self.buffer)?; - - if !self.buffer.is_empty() { - let bytes_to_copy = std::cmp::min(buf.len(), self.buffer.len()); - buf[..bytes_to_copy].copy_from_slice(&self.buffer[..bytes_to_copy]); - self.buffer_pos = bytes_to_copy; - Ok(bytes_to_copy) - } else { - // No data produced by filter, try reading more - Ok(0) - } - } else { - // No filter chain, just pass through - buf[..bytes_read].copy_from_slice(&temp_buf[..bytes_read]); - Ok(bytes_read) - } - } -} - +/// Service for managing items in the Keep application. +/// +/// This service handles CRUD operations for items, including saving content, +/// retrieving items with metadata and content, applying filters, and managing +/// compression. It integrates with the database, file system, compression engines, +/// metadata plugins, and filters to provide a complete item management interface. pub struct ItemService { + /// Path to the data storage directory. data_path: PathBuf, + /// Service for handling compression and decompression. compression_service: CompressionService, + /// Service for managing metadata plugins. meta_service: MetaService, + /// Service for applying content filters. filter_service: FilterService, } impl ItemService { + /// Creates a new `ItemService` instance. + /// + /// Initializes the service with the specified data directory path. + /// + /// # Arguments + /// + /// * `data_path` - Path to the directory where item files are stored. + /// + /// # Returns + /// + /// A new `ItemService` instance. + /// + /// # Examples + /// + /// ``` + /// let service = ItemService::new(PathBuf::from("/data")); + /// ``` pub fn new(data_path: PathBuf) -> Self { debug!("ITEM_SERVICE: Creating new ItemService with data_path: {:?}", data_path); Self { @@ -100,6 +62,30 @@ impl ItemService { } } + /// Retrieves an item with its associated metadata and tags. + /// + /// Fetches the item from the database by ID and loads its tags and metadata. + /// + /// # Arguments + /// + /// * `conn` - Database connection. + /// * `id` - Item ID to retrieve. + /// + /// # Returns + /// + /// * `Result` - Item with metadata and tags, or an error if not found. + /// + /// # Errors + /// + /// * `CoreError::ItemNotFound(id)` - If the item does not exist. + /// * Database-related errors. + /// + /// # Examples + /// + /// ``` + /// let item_with_meta = item_service.get_item(&conn, 1)?; + /// assert_eq!(item_with_meta.item.id, Some(1)); + /// ``` pub fn get_item(&self, conn: &Connection, id: i64) -> Result { debug!("ITEM_SERVICE: Getting item with id: {}", id); let item = db::get_item(conn, id)?.ok_or(CoreError::ItemNotFound(id))?; @@ -111,6 +97,30 @@ impl ItemService { Ok(ItemWithMeta { item, tags, meta }) } + /// Retrieves an item with its content, metadata, and tags. + /// + /// Loads the item, its metadata/tags, and decompresses the full content. + /// + /// # Arguments + /// + /// * `conn` - Database connection. + /// * `id` - Item ID to retrieve. + /// + /// # Returns + /// + /// * `Result` - Item with content, or an error if not found. + /// + /// # Errors + /// + /// * `CoreError::ItemNotFound(id)` - If the item does not exist. + /// * `CoreError::Io(...)` - If file read or decompression fails. + /// + /// # Examples + /// + /// ``` + /// let item_with_content = item_service.get_item_content(&conn, 1)?; + /// assert!(!item_with_content.content.is_empty()); + /// ``` pub fn get_item_content(&self, conn: &Connection, id: i64) -> Result { debug!("ITEM_SERVICE: Getting item content for id: {}", id); let item_with_meta = self.get_item(conn, id)?; @@ -135,6 +145,30 @@ impl ItemService { }) } + /// Retrieves item content with binary detection and optional filtering. + /// + /// Loads content, applies filters if specified, and determines MIME type and binary status. + /// + /// # Arguments + /// + /// * `conn` - Database connection. + /// * `id` - Item ID. + /// * `filter` - Optional filter string to apply to content. + /// + /// # Returns + /// + /// * `Result<(Vec, String, bool), CoreError>` - (content, MIME type, is_binary). + /// + /// # Errors + /// + /// * `CoreError::ItemNotFound(id)` - If item not found. + /// * Filter or compression errors. + /// + /// # Examples + /// + /// ``` + /// let (content, mime, is_binary) = item_service.get_item_content_info(&conn, 1, Some("head_lines(10)"))?; + /// ``` pub fn get_item_content_info( &self, conn: &Connection, @@ -153,8 +187,29 @@ impl ItemService { Ok((content, mime_type, is_binary)) } - - /// Helper method to determine if content is binary + /// Determines if item content is binary based on metadata or sampling. + /// + /// Checks existing "text" metadata first; if absent, samples the first 8192 bytes. + /// + /// # Arguments + /// + /// * `item_path` - Path to the item file. + /// * `compression` - Compression type. + /// * `metadata` - Item metadata. + /// + /// # Returns + /// + /// * `Result` - True if binary, false if text. + /// + /// # Errors + /// + /// * File or compression errors. + /// + /// # Examples + /// + /// ``` + /// let is_bin = item_service.is_content_binary(path, "gzip", &meta)?; + /// ``` fn is_content_binary( &self, item_path: PathBuf, @@ -176,6 +231,30 @@ impl ItemService { Ok(crate::common::is_binary::is_binary(&sample_buffer[..bytes_read])) } + /// Retrieves a streaming reader for item content with optional filtering. + /// + /// Returns a boxed reader that applies compression decompression and filters. + /// + /// # Arguments + /// + /// * `conn` - Database connection. + /// * `id` - Item ID. + /// * `filter` - Optional filter string. + /// + /// # Returns + /// + /// * `Result<(Box, String, bool), CoreError>` - (reader, MIME type, is_binary). + /// + /// # Errors + /// + /// * `CoreError::ItemNotFound(id)` - If item not found. + /// * Filter parsing or compression errors. + /// + /// # Examples + /// + /// ``` + /// let (reader, mime, is_bin) = item_service.get_item_content_info_streaming(&conn, 1, Some("grep(error)"))?; + /// ``` pub fn get_item_content_info_streaming( &self, conn: &Connection, @@ -193,6 +272,29 @@ impl ItemService { self.get_item_content_info_streaming_with_chain(conn, id, filter_chain.as_ref()) } + /// Retrieves a streaming reader with a pre-built filter chain. + /// + /// # Arguments + /// + /// * `conn` - Database connection. + /// * `id` - Item ID. + /// * `filter_chain` - Optional pre-parsed filter chain. + /// + /// # Returns + /// + /// * `Result<(Box, String, bool), CoreError>` - (reader, MIME type, is_binary). + /// + /// # Errors + /// + /// * `CoreError::ItemNotFound(id)` - If item not found. + /// * Compression or filtering errors. + /// + /// # Examples + /// + /// ``` + /// let chain = parse_filter_string("head(100)")?; + /// let (reader, mime, is_bin) = item_service.get_item_content_info_streaming_with_chain(&conn, 1, Some(&chain))?; + /// ``` pub fn get_item_content_info_streaming_with_chain( &self, conn: &Connection, @@ -233,6 +335,31 @@ impl ItemService { Ok((filtered_reader, mime_type, is_binary)) } + /// Finds an item by ID or tags/metadata criteria. + /// + /// Supports lookup by ID, last item, or search by tags/metadata. + /// + /// # Arguments + /// + /// * `conn` - Database connection. + /// * `ids` - Vector of IDs (if non-empty, uses first ID). + /// * `tags` - Vector of tags (all must match if provided). + /// * `meta` - HashMap of metadata key-value pairs (exact match). + /// + /// # Returns + /// + /// * `Result` - The found item or error. + /// + /// # Errors + /// + /// * `CoreError::ItemNotFound(...)` - If no matching item. + /// * Database errors. + /// + /// # Examples + /// + /// ``` + /// let item = item_service.find_item(&conn, vec![1], &vec![], &HashMap::new())?; + /// ``` pub fn find_item(&self, conn: &Connection, ids: &[i64], tags: &[String], meta: &HashMap) -> Result { debug!("ITEM_SERVICE: Finding item with ids: {:?}, tags: {:?}, meta: {:?}", ids, tags, meta); let item_maybe = match (ids.is_empty(), tags.is_empty() && meta.is_empty()) { @@ -263,6 +390,29 @@ impl ItemService { Ok(ItemWithMeta { item, tags, meta }) } + /// Lists items matching tags and metadata criteria. + /// + /// Filters by tags (all must match) and exact metadata values, then loads full details. + /// + /// # Arguments + /// + /// * `conn` - Database connection. + /// * `tags` - Vector of tags (all must match). + /// * `meta` - HashMap of metadata key-value pairs (exact match). + /// + /// # Returns + /// + /// * `Result, CoreError>` - List of matching items. + /// + /// # Errors + /// + /// * Database query errors. + /// + /// # Examples + /// + /// ``` + /// let items = item_service.list_items(&conn, &vec!["work"], &HashMap::new())?; + /// ``` pub fn list_items(&self, conn: &Connection, tags: &[String], meta: &HashMap) -> Result, CoreError> { debug!("ITEM_SERVICE: Listing items with tags: {:?}, meta: {:?}", tags, meta); let items = db::get_items_matching(conn, &tags.to_vec(), meta)?; @@ -292,6 +442,29 @@ impl ItemService { Ok(result) } + /// Deletes an item by ID from database and storage. + /// + /// Removes the item row, associated tags/metadata, and the physical file. + /// + /// # Arguments + /// + /// * `conn` - Mutable database connection. + /// * `id` - Item ID to delete. + /// + /// # Returns + /// + /// * `Result<(), CoreError>` - Success or error. + /// + /// # Errors + /// + /// * `CoreError::ItemNotFound(id)` - If item does not exist. + /// * File deletion errors (non-fatal if not found). + /// + /// # Examples + /// + /// ``` + /// item_service.delete_item(&mut conn, 1)?; + /// ``` pub fn delete_item(&self, conn: &mut Connection, id: i64) -> Result<(), CoreError> { debug!("ITEM_SERVICE: Deleting item with id: {}", id); if id <= 0 { @@ -311,6 +484,34 @@ impl ItemService { Ok(()) } + /// Saves content from a reader to a new item. + /// + /// Reads from the input reader (e.g., stdin), applies metadata plugins, + /// compresses the content, and stores it with tags. Echoes input to stdout via TeeReader. + /// + /// # Arguments + /// + /// * `input` - Reader providing the content to save (e.g., stdin). + /// * `cmd` - Mutable Clap command for error handling. + /// * `settings` - Application settings. + /// * `tags` - Tags to associate (defaults to "none" if empty). + /// * `conn` - Mutable database connection. + /// + /// # Returns + /// + /// * `Result` - The ID of the new item. + /// + /// # Errors + /// + /// * `CoreError::InvalidInput(...)` - If validation fails. + /// * Database or file I/O errors. + /// + /// # Examples + /// + /// ``` + /// let reader = std::io::stdin(); + /// let id = item_service.save_item(reader, &mut cmd, &settings, &mut vec![], &mut conn)?; + /// ``` pub fn save_item( &self, mut input: R, @@ -515,3 +716,120 @@ impl ItemService { } +/// A reader that applies a filter chain to the data as it's read. +/// +/// Wraps an underlying reader and applies a filter chain to the data during read operations. +/// Buffers data as needed for filter processing. +/// +/// # Fields +/// +/// * `reader` - The underlying reader providing the data source. +/// * `filter_chain` - Optional filter chain to apply. +/// * `buffer` - Internal buffer for holding filtered data. +/// * `buffer_pos` - Current position in the internal buffer. +struct FilteringReader { + reader: R, + filter_chain: Option, + buffer: Vec, + buffer_pos: usize, +} + +impl FilteringReader { + /// Creates a new `FilteringReader` with the given reader and filter chain. + /// + /// # Arguments + /// + /// * `reader` - The underlying reader. + /// * `filter_chain` - Optional filter chain to apply. + /// + /// # Returns + /// + /// A new `FilteringReader`. + /// + /// # Examples + /// + /// ``` + /// let reader = std::io::Cursor::new(b"data"); + /// let filter_chain = parse_filter_string("head(10)")?; + /// let filtered = FilteringReader::new(reader, Some(filter_chain)); + /// ``` + pub fn new(reader: R, filter_chain: Option) -> Self { + Self { + reader, + filter_chain, + buffer: Vec::new(), + buffer_pos: 0, + } + } +} + +impl Read for FilteringReader { + /// Reads data, applying the filter chain if present. + /// + /// If buffered data exists, serves it first. Otherwise, reads a chunk, filters it, + /// and serves the output. Handles EOF properly. + /// + /// # Arguments + /// + /// * `buf` - Buffer to fill with filtered data. + /// + /// # Returns + /// + /// * `io::Result` - Number of bytes read, or I/O error. + /// + /// # Errors + /// + /// Propagates errors from underlying reader or filter operations. + /// + /// # Examples + /// + /// ``` + /// let mut filtered = FilteringReader::new(std::io::Cursor::new(b"Hello"), None); + /// let mut buf = [0; 5]; + /// let n = filtered.read(&mut buf).unwrap(); + /// assert_eq!(n, 5); + /// ``` + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + // If we have data in our buffer, serve that first + if self.buffer_pos < self.buffer.len() { + let bytes_to_copy = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos); + buf[..bytes_to_copy].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + bytes_to_copy]); + self.buffer_pos += bytes_to_copy; + return Ok(bytes_to_copy); + } + + // Reset buffer for new data + self.buffer.clear(); + self.buffer_pos = 0; + + // Read from the original reader into a temporary buffer + let mut temp_buf = vec![0; buf.len()]; + let bytes_read = self.reader.read(&mut temp_buf)?; + + if bytes_read == 0 { + return Ok(0); + } + + // Process through the filter chain if it exists + if let Some(ref mut chain) = self.filter_chain { + // Use a cursor to read the input data + let mut input_cursor = std::io::Cursor::new(&temp_buf[..bytes_read]); + // Write filtered output to our buffer + chain.filter(&mut input_cursor, &mut self.buffer)?; + + if !self.buffer.is_empty() { + let bytes_to_copy = std::cmp::min(buf.len(), self.buffer.len()); + buf[..bytes_to_copy].copy_from_slice(&self.buffer[..bytes_to_copy]); + self.buffer_pos = bytes_to_copy; + Ok(bytes_to_copy) + } else { + // No data produced by filter, try reading more + Ok(0) + } + } else { + // No filter chain, just pass through + buf[..bytes_read].copy_from_slice(&temp_buf[..bytes_read]); + Ok(bytes_read) + } + } +}