use crate::common::PIPESIZE; use crate::config::Settings; use crate::services::compression_service::CompressionService; use crate::services::error::CoreError; use crate::services::filter_service::FilterService; use crate::services::meta_service::MetaService; use crate::services::types::{ItemWithContent, ItemWithMeta}; use crate::db::{self, Meta}; use crate::compression_engine::{get_compression_engine, CompressionType}; use crate::modes::common::settings_compression_type; use crate::filter_plugin; use clap::Command; use log::debug; use rusqlite::Connection; use std::collections::HashMap; use std::fs; use std::io::{IsTerminal, Read, Write}; use std::path::PathBuf; /// A reader that applies a filter chain to the data as it's read struct FilteringReader { reader: R, filter_chain: Option, buffer: Vec, buffer_pos: usize, } impl FilteringReader { pub fn new(reader: R, filter_chain: Option) -> Self { Self { reader, filter_chain, buffer: Vec::new(), buffer_pos: 0, } } } impl Read for FilteringReader { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { // If we have data in our buffer, serve that first if self.buffer_pos < self.buffer.len() { let bytes_to_copy = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos); buf[..bytes_to_copy].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + bytes_to_copy]); self.buffer_pos += bytes_to_copy; return Ok(bytes_to_copy); } // Reset buffer for new data self.buffer.clear(); self.buffer_pos = 0; // Read from the original reader let mut temp_buf = vec![0; buf.len()]; let bytes_read = self.reader.read(&mut temp_buf)?; if bytes_read == 0 { // If we're at EOF, process any remaining data in the filter chain if let Some(chain) = &mut self.filter_chain { let finished_data = chain.finish()?; if !finished_data.is_empty() { self.buffer = finished_data; let bytes_to_copy = std::cmp::min(buf.len(), self.buffer.len()); buf[..bytes_to_copy].copy_from_slice(&self.buffer[..bytes_to_copy]); self.buffer_pos = bytes_to_copy; return Ok(bytes_to_copy); } } return Ok(0); } // Process through the filter chain if it exists if let Some(chain) = &mut self.filter_chain { let processed_data = chain.process(&temp_buf[..bytes_read])?; if !processed_data.is_empty() { self.buffer = processed_data; let bytes_to_copy = std::cmp::min(buf.len(), self.buffer.len()); buf[..bytes_to_copy].copy_from_slice(&self.buffer[..bytes_to_copy]); self.buffer_pos = bytes_to_copy; Ok(bytes_to_copy) } else { // No data produced by filter, try reading more Ok(0) } } else { // No filter chain, just pass through buf[..bytes_read].copy_from_slice(&temp_buf[..bytes_read]); Ok(bytes_read) } } } pub struct ItemService { data_path: PathBuf, compression_service: CompressionService, meta_service: MetaService, filter_service: FilterService, } impl ItemService { pub fn new(data_path: PathBuf) -> Self { debug!("ITEM_SERVICE: Creating new ItemService with data_path: {:?}", data_path); Self { data_path, compression_service: CompressionService::new(), meta_service: MetaService::new(), filter_service: FilterService::new(), } } pub fn get_item(&self, conn: &Connection, id: i64) -> Result { debug!("ITEM_SERVICE: Getting item with id: {}", id); let item = db::get_item(conn, id)?.ok_or(CoreError::ItemNotFound(id))?; debug!("ITEM_SERVICE: Found item: {:?}", item); let tags = db::get_item_tags(conn, &item)?; debug!("ITEM_SERVICE: Found {} tags for item {}", tags.len(), id); let meta = db::get_item_meta(conn, &item)?; debug!("ITEM_SERVICE: Found {} meta entries for item {}", meta.len(), id); Ok(ItemWithMeta { item, tags, meta }) } pub fn get_item_content(&self, conn: &Connection, id: i64) -> Result { debug!("ITEM_SERVICE: Getting item content for id: {}", id); let item_with_meta = self.get_item(conn, id)?; let item_id = item_with_meta.item.id.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?; if item_id <= 0 { return Err(CoreError::InvalidInput(format!("Invalid item ID: {}", item_id))); } let mut item_path = self.data_path.clone(); item_path.push(item_id.to_string()); debug!("ITEM_SERVICE: Reading content from path: {:?}", item_path); let content = self .compression_service .get_item_content(item_path, &item_with_meta.item.compression)?; debug!("ITEM_SERVICE: Read {} bytes of content for item {}", content.len(), id); Ok(ItemWithContent { item_with_meta, content, }) } pub fn get_item_content_info( &self, conn: &Connection, id: i64, head_bytes: Option, head_words: Option, head_lines: Option, tail_bytes: Option, tail_words: Option, tail_lines: Option, line_start: Option, line_end: Option, grep: Option, ) -> Result<(Vec, String, bool), CoreError> { // Use streaming approach to handle all filtering options consistently let (mut reader, mime_type, is_binary) = self.get_item_content_info_streaming( conn, id, head_bytes, head_words, head_lines, tail_bytes, tail_words, tail_lines, line_start, line_end, grep )?; // Read all the filtered content into a buffer let mut content = Vec::new(); reader.read_to_end(&mut content)?; Ok((content, mime_type, is_binary)) } /// Helper method to create a filter chain from parameters fn create_filter_chain( &self, grep: Option, head_bytes: Option, head_lines: Option, tail_bytes: Option, tail_lines: Option, filter: Option, ) -> Result, CoreError> { // Build filter string from individual parameters (for backward compatibility) let mut filter_parts = Vec::new(); if let Some(pattern) = grep { filter_parts.push(format!("grep('{}')", pattern.replace('\'', "\\'"))); } if let Some(bytes) = head_bytes { filter_parts.push(format!("head_bytes({})", bytes)); } if let Some(lines) = head_lines { filter_parts.push(format!("head_lines({})", lines)); } if let Some(bytes) = tail_bytes { filter_parts.push(format!("tail_bytes({})", bytes)); } if let Some(lines) = tail_lines { filter_parts.push(format!("tail_lines({})", lines)); } // Use the provided filter string if available, otherwise build from parts let filter_str = filter.or_else(|| { if filter_parts.is_empty() { None } else { Some(filter_parts.join(" | ")) } }); // Create filter chain self.filter_service.create_filter_chain(filter_str.as_deref()) .map_err(|e| CoreError::InvalidInput(format!("Failed to create filter chain: {}", e))) } /// Helper method to determine if content is binary fn is_content_binary( &self, item_path: PathBuf, compression: &str, metadata: &HashMap, ) -> Result { // Check if we already have text metadata if let Some(text_val) = metadata.get("text") { return Ok(text_val == "false"); } // Read only the first 8192 bytes for binary detection let mut sample_reader = self.compression_service.stream_item_content( item_path, compression )?; let mut sample_buffer = vec![0; 8192]; let bytes_read = sample_reader.read(&mut sample_buffer)?; Ok(crate::common::is_binary::is_binary(&sample_buffer[..bytes_read])) } pub fn get_item_content_info_streaming( &self, conn: &Connection, id: i64, head_bytes: Option, _head_words: Option, head_lines: Option, tail_bytes: Option, _tail_words: Option, tail_lines: Option, _line_start: Option, _line_end: Option, grep: Option, ) -> Result<(Box, String, bool), CoreError> { let item_with_meta = self.get_item(conn, id)?; let item_id = item_with_meta.item.id.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?; if item_id <= 0 { return Err(CoreError::InvalidInput(format!("Invalid item ID: {}", item_id))); } let mut item_path = self.data_path.clone(); item_path.push(item_id.to_string()); let reader = self.compression_service.stream_item_content( item_path.clone(), &item_with_meta.item.compression )?; // Create filter chain let filter_chain = self.create_filter_chain( grep, head_bytes, head_lines, tail_bytes, tail_lines, None )?; // Wrap the reader with filtering let filtered_reader = Box::new(FilteringReader::new(reader, filter_chain)); let metadata = item_with_meta.meta_as_map(); let mime_type = metadata .get("mime_type") .map(|s| s.to_string()) .unwrap_or_else(|| "application/octet-stream".to_string()); // Check if content is binary let is_binary = self.is_content_binary( item_path, &item_with_meta.item.compression, &metadata )?; Ok((filtered_reader, mime_type, is_binary)) } pub fn find_item(&self, conn: &Connection, ids: &[i64], tags: &[String], meta: &HashMap) -> Result { debug!("ITEM_SERVICE: Finding item with ids: {:?}, tags: {:?}, meta: {:?}", ids, tags, meta); let item_maybe = match (ids.is_empty(), tags.is_empty() && meta.is_empty()) { (false, _) => { debug!("ITEM_SERVICE: Finding by ID: {}", ids[0]); db::get_item(conn, ids[0])? }, (true, true) => { debug!("ITEM_SERVICE: Finding last item"); db::get_item_last(conn)? }, (true, false) => { debug!("ITEM_SERVICE: Finding by tags/meta"); db::get_item_matching(conn, &tags.to_vec(), meta)? } }; let item = item_maybe.ok_or(CoreError::ItemNotFoundGeneric)?; debug!("ITEM_SERVICE: Found matching item: {:?}", item); // Get tags and meta directly instead of calling get_item which makes redundant queries let item_id = item.id.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?; let tags = db::get_item_tags(conn, &item)?; debug!("ITEM_SERVICE: Found {} tags for item {}", tags.len(), item_id); let meta = db::get_item_meta(conn, &item)?; debug!("ITEM_SERVICE: Found {} meta entries for item {}", meta.len(), item_id); Ok(ItemWithMeta { item, tags, meta }) } pub fn list_items(&self, conn: &Connection, tags: &[String], meta: &HashMap) -> Result, CoreError> { debug!("ITEM_SERVICE: Listing items with tags: {:?}, meta: {:?}", tags, meta); let items = db::get_items_matching(conn, &tags.to_vec(), meta)?; debug!("ITEM_SERVICE: Found {} matching items", items.len()); let item_ids: Vec = items.iter().filter_map(|item| item.id).collect(); if item_ids.is_empty() { debug!("ITEM_SERVICE: No items found, returning empty list"); return Ok(Vec::new()); } debug!("ITEM_SERVICE: Getting tags and meta for {} items", item_ids.len()); let tags_map = db::get_tags_for_items(conn, &item_ids)?; let meta_map_db = db::get_meta_for_items(conn, &item_ids)?; let mut result = Vec::new(); for item in items { let item_id = item.id.unwrap(); let tags = tags_map.get(&item_id).cloned().unwrap_or_default(); let meta_hm = meta_map_db.get(&item_id).cloned().unwrap_or_default(); let meta = meta_hm.into_iter().map(|(name, value)| Meta { id: item_id, name, value }).collect(); result.push(ItemWithMeta { item, tags, meta }); } debug!("ITEM_SERVICE: Returning {} items with full metadata", result.len()); Ok(result) } pub fn delete_item(&self, conn: &mut Connection, id: i64) -> Result<(), CoreError> { debug!("ITEM_SERVICE: Deleting item with id: {}", id); if id <= 0 { return Err(CoreError::InvalidInput(format!("Invalid item ID: {}", id))); } let item = db::get_item(conn, id)?.ok_or(CoreError::ItemNotFound(id))?; debug!("ITEM_SERVICE: Found item to delete: {:?}", item); let mut item_path = self.data_path.clone(); item_path.push(id.to_string()); debug!("ITEM_SERVICE: Deleting file at path: {:?}", item_path); db::delete_item(conn, item)?; fs::remove_file(&item_path).or_else(|e| if e.kind() == std::io::ErrorKind::NotFound { Ok(()) } else { Err(e) })?; debug!("ITEM_SERVICE: Successfully deleted item {}", id); Ok(()) } pub fn save_item( &self, mut input: R, cmd: &mut Command, settings: &Settings, tags: &mut Vec, conn: &mut Connection, ) -> Result { debug!("ITEM_SERVICE: Starting save_item with tags: {:?}", tags); if tags.is_empty() { tags.push("none".to_string()); debug!("ITEM_SERVICE: No tags provided, using default 'none' tag"); } let compression_type = settings_compression_type(cmd, settings); debug!("ITEM_SERVICE: Using compression type: {:?}", compression_type); let compression_engine = get_compression_engine(compression_type.clone())?; let item_id; let mut item; { item = db::create_item(conn, compression_type.clone())?; item_id = item.id.unwrap(); debug!("ITEM_SERVICE: Created new item with id: {}", item_id); db::set_item_tags(conn, item.clone(), tags)?; debug!("ITEM_SERVICE: Set tags for item {}", item_id); let item_meta = self.meta_service.collect_initial_meta(); debug!("ITEM_SERVICE: Collected {} initial meta entries", item_meta.len()); for (k, v) in item_meta.iter() { db::add_meta(conn, item_id, k, v)?; } } // Print the "KEEP: New item" message before starting to read input if !settings.quiet { if std::io::stderr().is_terminal() { let mut t = term::stderr().unwrap(); let _ = t.reset(); let _ = t.attr(term::Attr::Bold); let _ = write!(t, "KEEP:"); let _ = t.reset(); let _ = write!(t, " New item "); let _ = t.attr(term::Attr::Bold); let _ = write!(t, "{item_id}"); let _ = t.reset(); let _ = write!(t, " tags: "); let _ = t.attr(term::Attr::Bold); let _ = write!(t, "{}", tags.join(" ")); let _ = t.reset(); let _ = writeln!(t); let _ = std::io::stderr().flush(); } else { let mut t = std::io::stderr(); let _ = writeln!(t, "KEEP: New item: {} tags: {:?}", item_id, tags); } } let mut plugins = self.meta_service.get_plugins(cmd, settings); debug!("ITEM_SERVICE: Got {} meta plugins", plugins.len()); self.meta_service.initialize_plugins(&mut plugins, conn, item_id); let mut item_path = self.data_path.clone(); item_path.push(item_id.to_string()); debug!("ITEM_SERVICE: Writing item to path: {:?}", item_path); let mut item_out = compression_engine.create(item_path.clone())?; let mut buffer = [0; PIPESIZE]; let mut total_bytes = 0; debug!("ITEM_SERVICE: Starting to read and process input data"); loop { let n = input.read(&mut buffer)?; if n == 0 { break; } total_bytes += n as i64; item_out.write_all(&buffer[..n])?; self.meta_service.process_chunk(&mut plugins, &buffer[..n], conn, item_id); } debug!("ITEM_SERVICE: Processed {} bytes total", total_bytes); item_out.flush()?; drop(item_out); debug!("ITEM_SERVICE: Finalizing meta plugins"); self.meta_service.finalize_plugins(&mut plugins, conn, item_id); item.size = Some(total_bytes); db::update_item(conn, item.clone())?; debug!("ITEM_SERVICE: Save completed successfully"); Ok(item_id) } pub fn save_item_from_mcp( &self, content: &[u8], tags: &Vec, metadata: &HashMap, cmd: &mut Command, settings: &Settings, conn: &mut Connection, ) -> Result { debug!("ITEM_SERVICE: Starting save_item_from_mcp with {} bytes, {} tags, {} metadata entries", content.len(), tags.len(), metadata.len()); let compression_type = CompressionType::LZ4; let compression_engine = get_compression_engine(compression_type.clone())?; let item_id; let mut item; { item = db::create_item(conn, compression_type.clone())?; item_id = item.id.unwrap(); debug!("ITEM_SERVICE: Created MCP item with id: {}", item_id); // Add tags for tag in tags { db::add_tag(conn, item_id, tag)?; } debug!("ITEM_SERVICE: Added {} tags to MCP item", tags.len()); // Add custom metadata for (key, value) in metadata { db::add_meta(conn, item_id, key, value)?; } debug!("ITEM_SERVICE: Added {} custom metadata entries to MCP item", metadata.len()); } let mut item_path = self.data_path.clone(); item_path.push(item_id.to_string()); debug!("ITEM_SERVICE: Writing MCP item to path: {:?}", item_path); let mut writer = compression_engine.create(item_path.clone())?; writer.write_all(content)?; drop(writer); let mut plugins = self.meta_service.get_plugins(cmd, settings); debug!("ITEM_SERVICE: Got {} configured meta plugins for MCP item", plugins.len()); self.meta_service.initialize_plugins(&mut plugins, conn, item_id); self.meta_service.process_chunk(&mut plugins, content, conn, item_id); self.meta_service.finalize_plugins(&mut plugins, conn, item_id); debug!("ITEM_SERVICE: Processed MCP item through configured meta plugins"); item.size = Some(content.len() as i64); db::update_item(conn, item.clone())?; debug!("ITEM_SERVICE: MCP item saved successfully"); self.get_item(conn, item_id) } pub fn get_compression_service(&self) -> &CompressionService { &self.compression_service } pub fn get_data_path(&self) -> &PathBuf { &self.data_path } }