Security: - Use constant-time password comparison (subtle crate) to prevent timing attacks - Replace permissive CORS with configurable origin-restricted CORS - Add TLS warning when password auth is used without HTTPS Bug fixes: - Convert MetaPlugin panics to anyhow::Result (get_meta_plugin, outputs_mut, options_mut) - Replace item.id.unwrap() with proper error handling across 15 call sites - Fix panic on unknown column type in list mode - Fix conflicting PIPESIZE constant (was 8192 vs 65536, now unified to 8192) - Add 256MB filter chain buffer limit to prevent OOM - Gracefully skip unregistered plugins instead of panicking Dead code removal: - Delete unused filter parser files (filter_parser.rs, filter.pest, parser/ module) - ~260 lines of dead PEG parser code removed Code consolidation: - Add is_content_binary_from_metadata() helper (was duplicated in 4 places) - Simplify save_item_raw() to delegate to save_item_raw_streaming() (~90 lines removed) Incomplete features: - Populate filter_plugins in status output from global registry - Add FallbackMagicFileMetaPlugin (was referenced but never implemented) - Document init_plugins() as intentional no-op Infrastructure: - Add Dockerfile (static musl binary on scratch, 4.8MB) - Add .dockerignore - Add cors_origin to ServerConfig and config.rs
964 lines
32 KiB
Rust
964 lines
32 KiB
Rust
use crate::common::PIPESIZE;
|
|
use crate::compression_engine::{CompressionType, get_compression_engine};
|
|
use crate::config::Settings;
|
|
use crate::db::{self, Item, Meta};
|
|
use crate::filter_plugin;
|
|
use crate::modes::common::settings_compression_type;
|
|
use crate::services::compression_service::CompressionService;
|
|
use crate::services::error::CoreError;
|
|
use crate::services::filter_service::FilterService;
|
|
use crate::services::meta_service::MetaService;
|
|
use crate::services::types::{ItemWithContent, ItemWithMeta};
|
|
use clap::Command;
|
|
use log::debug;
|
|
use rusqlite::Connection;
|
|
use std::collections::HashMap;
|
|
use std::fs;
|
|
use std::io::{IsTerminal, Read, Write};
|
|
use std::path::PathBuf;
|
|
|
|
/// Service for managing items in the Keep application.
|
|
///
|
|
/// This service handles CRUD operations for items, including saving content,
|
|
/// retrieving items with metadata and content, applying filters, and managing
|
|
/// compression. It integrates with the database, file system, compression engines,
|
|
/// metadata plugins, and filters to provide a complete item management interface.
|
|
pub struct ItemService {
|
|
/// Path to the data storage directory.
|
|
data_path: PathBuf,
|
|
/// Service for handling compression and decompression.
|
|
compression_service: CompressionService,
|
|
/// Service for managing metadata plugins.
|
|
meta_service: MetaService,
|
|
/// Service for applying content filters.
|
|
filter_service: FilterService,
|
|
}
|
|
|
|
impl ItemService {
|
|
/// Creates a new `ItemService` instance.
|
|
///
|
|
/// Initializes the service with the specified data directory path.
|
|
///
|
|
/// # Arguments
|
|
///
|
|
/// * `data_path` - Path to the directory where item files are stored.
|
|
///
|
|
/// # Returns
|
|
///
|
|
/// A new `ItemService` instance.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// # use keep::services::ItemService;
|
|
/// # use std::path::PathBuf;
|
|
/// let service = ItemService::new(PathBuf::from("/data"));
|
|
/// ```
|
|
pub fn new(data_path: PathBuf) -> Self {
|
|
debug!("ITEM_SERVICE: Creating new ItemService with data_path: {data_path:?}");
|
|
Self {
|
|
data_path,
|
|
compression_service: CompressionService::new(),
|
|
meta_service: MetaService::new(),
|
|
filter_service: FilterService::new(),
|
|
}
|
|
}
|
|
|
|
/// Retrieves an item with its associated metadata and tags.
|
|
///
|
|
/// Fetches the item from the database by ID and loads its tags and metadata.
|
|
///
|
|
/// # Arguments
|
|
///
|
|
/// * `conn` - Database connection.
|
|
/// * `id` - Item ID to retrieve.
|
|
///
|
|
/// # Returns
|
|
///
|
|
/// * `Result<ItemWithMeta, CoreError>` - Item with metadata and tags, or an error if not found.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// * `CoreError::ItemNotFound(id)` - If the item does not exist.
|
|
/// * Database-related errors.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```ignore
|
|
/// let item_with_meta = item_service.get_item(&conn, 1)?;
|
|
/// assert_eq!(item_with_meta.item.id, Some(1));
|
|
/// ```
|
|
pub fn get_item(&self, conn: &Connection, id: i64) -> Result<ItemWithMeta, CoreError> {
|
|
debug!("ITEM_SERVICE: Getting item with id: {id}");
|
|
let item = db::get_item(conn, id)?.ok_or(CoreError::ItemNotFound(id))?;
|
|
debug!("ITEM_SERVICE: Found item: {item:?}");
|
|
let tags = db::get_item_tags(conn, &item)?;
|
|
debug!("ITEM_SERVICE: Found {} tags for item {}", tags.len(), id);
|
|
let meta = db::get_item_meta(conn, &item)?;
|
|
debug!(
|
|
"ITEM_SERVICE: Found {} meta entries for item {}",
|
|
meta.len(),
|
|
id
|
|
);
|
|
Ok(ItemWithMeta { item, tags, meta })
|
|
}
|
|
|
|
/// Retrieves an item with its content, metadata, and tags.
|
|
///
|
|
/// Loads the item, its metadata/tags, and decompresses the full content.
|
|
///
|
|
/// # Arguments
|
|
///
|
|
/// * `conn` - Database connection.
|
|
/// * `id` - Item ID to retrieve.
|
|
///
|
|
/// # Returns
|
|
///
|
|
/// * `Result<ItemWithContent, CoreError>` - Item with content, or an error if not found.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// * `CoreError::ItemNotFound(id)` - If the item does not exist.
|
|
/// * `CoreError::Io(...)` - If file read or decompression fails.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```ignore
|
|
/// let item_with_content = item_service.get_item_content(&conn, 1)?;
|
|
/// assert!(!item_with_content.content.is_empty());
|
|
/// ```
|
|
pub fn get_item_content(
|
|
&self,
|
|
conn: &Connection,
|
|
id: i64,
|
|
) -> Result<ItemWithContent, CoreError> {
|
|
debug!("ITEM_SERVICE: Getting item content for id: {id}");
|
|
let item_with_meta = self.get_item(conn, id)?;
|
|
let item_id = item_with_meta
|
|
.item
|
|
.id
|
|
.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
|
|
|
|
if item_id <= 0 {
|
|
return Err(CoreError::InvalidInput(format!(
|
|
"Invalid item ID: {item_id}"
|
|
)));
|
|
}
|
|
|
|
let mut item_path = self.data_path.clone();
|
|
item_path.push(item_id.to_string());
|
|
debug!("ITEM_SERVICE: Reading content from path: {item_path:?}");
|
|
|
|
let content = self
|
|
.compression_service
|
|
.get_item_content(item_path, &item_with_meta.item.compression)?;
|
|
debug!(
|
|
"ITEM_SERVICE: Read {} bytes of content for item {}",
|
|
content.len(),
|
|
id
|
|
);
|
|
|
|
Ok(ItemWithContent {
|
|
item_with_meta,
|
|
content,
|
|
})
|
|
}
|
|
|
|
/// Retrieves item content with binary detection and optional filtering.
|
|
///
|
|
/// Loads content, applies filters if specified, and determines MIME type and binary status.
|
|
///
|
|
/// # Arguments
|
|
///
|
|
/// * `conn` - Database connection.
|
|
/// * `id` - Item ID.
|
|
/// * `filter` - Optional filter string to apply to content.
|
|
///
|
|
/// # Returns
|
|
///
|
|
/// * `Result<(Vec<u8>, String, bool), CoreError>` - (content, MIME type, is_binary).
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// * `CoreError::ItemNotFound(id)` - If item not found.
|
|
/// * Filter or compression errors.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```ignore
|
|
/// let (content, mime, is_binary) = item_service.get_item_content_info(&conn, 1, Some("head_lines(10)"))?;
|
|
/// ```
|
|
pub fn get_item_content_info(
|
|
&self,
|
|
conn: &Connection,
|
|
id: i64,
|
|
filter: Option<String>,
|
|
) -> Result<(Vec<u8>, String, bool), CoreError> {
|
|
// Use streaming approach to handle all filtering options consistently
|
|
let (mut reader, mime_type, is_binary) =
|
|
self.get_item_content_info_streaming(conn, id, filter)?;
|
|
|
|
// Read all the filtered content into a buffer
|
|
let mut content = Vec::new();
|
|
reader.read_to_end(&mut content)?;
|
|
|
|
Ok((content, mime_type, is_binary))
|
|
}
|
|
|
|
/// Determines if item content is binary based on metadata or sampling.
|
|
///
|
|
/// Checks existing "text" metadata first; if absent, samples the first 8192 bytes.
|
|
///
|
|
/// # Arguments
|
|
///
|
|
/// * `item_path` - Path to the item file.
|
|
/// * `compression` - Compression type.
|
|
/// * `metadata` - Item metadata.
|
|
///
|
|
/// # Returns
|
|
///
|
|
/// * `Result<bool, CoreError>` - True if binary, false if text.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// * File or compression errors.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```ignore
|
|
/// let is_bin = item_service.is_content_binary(path, "gzip", &meta)?;
|
|
/// ```
|
|
fn is_content_binary(
|
|
&self,
|
|
item_path: PathBuf,
|
|
compression: &str,
|
|
metadata: &HashMap<String, String>,
|
|
) -> Result<bool, CoreError> {
|
|
// Read only the first 8192 bytes for binary detection
|
|
let mut sample_reader = self
|
|
.compression_service
|
|
.stream_item_content(item_path, compression)?;
|
|
let mut sample_buffer = vec![0; 8192];
|
|
let bytes_read = sample_reader.read(&mut sample_buffer)?;
|
|
Ok(crate::common::is_binary::is_content_binary_from_metadata(
|
|
metadata,
|
|
&sample_buffer[..bytes_read],
|
|
))
|
|
}
|
|
|
|
/// Retrieves a streaming reader for item content with optional filtering.
|
|
///
|
|
/// Returns a boxed reader that applies compression decompression and filters.
|
|
///
|
|
/// # Arguments
|
|
///
|
|
/// * `conn` - Database connection.
|
|
/// * `id` - Item ID.
|
|
/// * `filter` - Optional filter string.
|
|
///
|
|
/// # Returns
|
|
///
|
|
/// * `Result<(Box<dyn Read + Send>, String, bool), CoreError>` - (reader, MIME type, is_binary).
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// * `CoreError::ItemNotFound(id)` - If item not found.
|
|
/// * Filter parsing or compression errors.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```ignore
|
|
/// let (reader, mime, is_bin) = item_service.get_item_content_info_streaming(&conn, 1, Some("grep(error)"))?;
|
|
/// ```
|
|
pub fn get_item_content_info_streaming(
|
|
&self,
|
|
conn: &Connection,
|
|
id: i64,
|
|
filter: Option<String>,
|
|
) -> Result<(Box<dyn Read + Send>, String, bool), CoreError> {
|
|
// Convert filter string to FilterChain if provided
|
|
let filter_chain = if let Some(filter_str) = filter {
|
|
self.filter_service
|
|
.create_filter_chain(Some(&filter_str))
|
|
.map_err(|e| {
|
|
CoreError::InvalidInput(format!("Failed to create filter chain: {e}"))
|
|
})?
|
|
} else {
|
|
None
|
|
};
|
|
|
|
self.get_item_content_info_streaming_with_chain(conn, id, filter_chain.as_ref())
|
|
}
|
|
|
|
/// Retrieves a streaming reader with a pre-built filter chain.
|
|
///
|
|
/// # Arguments
|
|
///
|
|
/// * `conn` - Database connection.
|
|
/// * `id` - Item ID.
|
|
/// * `filter_chain` - Optional pre-parsed filter chain.
|
|
///
|
|
/// # Returns
|
|
///
|
|
/// * `Result<(Box<dyn Read + Send>, String, bool), CoreError>` - (reader, MIME type, is_binary).
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// * `CoreError::ItemNotFound(id)` - If item not found.
|
|
/// * Compression or filtering errors.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```ignore
|
|
/// let chain = parse_filter_string("head(100)")?;
|
|
/// let (reader, mime, is_bin) = item_service.get_item_content_info_streaming_with_chain(&conn, 1, Some(&chain))?;
|
|
/// ```
|
|
pub fn get_item_content_info_streaming_with_chain(
|
|
&self,
|
|
conn: &Connection,
|
|
id: i64,
|
|
filter_chain: Option<&filter_plugin::FilterChain>,
|
|
) -> Result<(Box<dyn Read + Send>, String, bool), CoreError> {
|
|
let item_with_meta = self.get_item(conn, id)?;
|
|
let item_id = item_with_meta
|
|
.item
|
|
.id
|
|
.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
|
|
|
|
if item_id <= 0 {
|
|
return Err(CoreError::InvalidInput(format!(
|
|
"Invalid item ID: {item_id}"
|
|
)));
|
|
}
|
|
|
|
let mut item_path = self.data_path.clone();
|
|
item_path.push(item_id.to_string());
|
|
|
|
let reader = self
|
|
.compression_service
|
|
.stream_item_content(item_path.clone(), &item_with_meta.item.compression)?;
|
|
|
|
// Wrap the reader with filtering
|
|
let filtered_reader = Box::new(FilteringReader::new(reader, filter_chain.cloned()));
|
|
|
|
let metadata = item_with_meta.meta_as_map();
|
|
let mime_type = metadata
|
|
.get("mime_type")
|
|
.map(|s| s.to_string())
|
|
.unwrap_or_else(|| "application/octet-stream".to_string());
|
|
|
|
// Check if content is binary
|
|
let is_binary =
|
|
self.is_content_binary(item_path, &item_with_meta.item.compression, &metadata)?;
|
|
|
|
Ok((filtered_reader, mime_type, is_binary))
|
|
}
|
|
|
|
/// Like `get_item_content_info_streaming_with_chain` but accepts a pre-fetched `ItemWithMeta`
|
|
/// to avoid redundant DB queries.
|
|
pub fn get_item_content_info_streaming_with_item(
|
|
&self,
|
|
item_with_meta: ItemWithMeta,
|
|
filter_chain: Option<&filter_plugin::FilterChain>,
|
|
) -> Result<(Box<dyn Read + Send>, String, bool), CoreError> {
|
|
let item_id = item_with_meta
|
|
.item
|
|
.id
|
|
.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
|
|
|
|
if item_id <= 0 {
|
|
return Err(CoreError::InvalidInput(format!(
|
|
"Invalid item ID: {item_id}"
|
|
)));
|
|
}
|
|
|
|
let mut item_path = self.data_path.clone();
|
|
item_path.push(item_id.to_string());
|
|
|
|
let reader = self
|
|
.compression_service
|
|
.stream_item_content(item_path.clone(), &item_with_meta.item.compression)?;
|
|
|
|
let filtered_reader = Box::new(FilteringReader::new(reader, filter_chain.cloned()));
|
|
|
|
let metadata = item_with_meta.meta_as_map();
|
|
let mime_type = metadata
|
|
.get("mime_type")
|
|
.map(|s| s.to_string())
|
|
.unwrap_or_else(|| "application/octet-stream".to_string());
|
|
|
|
let is_binary =
|
|
self.is_content_binary(item_path, &item_with_meta.item.compression, &metadata)?;
|
|
|
|
Ok((filtered_reader, mime_type, is_binary))
|
|
}
|
|
|
|
/// Finds an item by ID or tags/metadata criteria.
|
|
///
|
|
/// Supports lookup by ID, last item, or search by tags/metadata.
|
|
///
|
|
/// # Arguments
|
|
///
|
|
/// * `conn` - Database connection.
|
|
/// * `ids` - Vector of IDs (if non-empty, uses first ID).
|
|
/// * `tags` - Vector of tags (all must match if provided).
|
|
/// * `meta` - HashMap of metadata key-value pairs (exact match).
|
|
///
|
|
/// # Returns
|
|
///
|
|
/// * `Result<ItemWithMeta, CoreError>` - The found item or error.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// * `CoreError::ItemNotFound(...)` - If no matching item.
|
|
/// * Database errors.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```ignore
|
|
/// let item = item_service.find_item(&conn, vec![1], &vec![], &HashMap::new())?;
|
|
/// ```
|
|
pub fn find_item(
|
|
&self,
|
|
conn: &Connection,
|
|
ids: &[i64],
|
|
tags: &[String],
|
|
meta: &HashMap<String, String>,
|
|
) -> Result<ItemWithMeta, CoreError> {
|
|
debug!("ITEM_SERVICE: Finding item with ids: {ids:?}, tags: {tags:?}, meta: {meta:?}");
|
|
let item_maybe = match (ids.is_empty(), tags.is_empty() && meta.is_empty()) {
|
|
(false, _) => {
|
|
debug!("ITEM_SERVICE: Finding by ID: {}", ids[0]);
|
|
db::get_item(conn, ids[0])?
|
|
}
|
|
(true, true) => {
|
|
debug!("ITEM_SERVICE: Finding last item");
|
|
db::get_item_last(conn)?
|
|
}
|
|
(true, false) => {
|
|
debug!("ITEM_SERVICE: Finding by tags/meta");
|
|
db::get_item_matching(conn, &tags.to_vec(), meta)?
|
|
}
|
|
};
|
|
|
|
let item = item_maybe.ok_or(CoreError::ItemNotFoundGeneric)?;
|
|
debug!("ITEM_SERVICE: Found matching item: {item:?}");
|
|
|
|
// Get tags and meta directly instead of calling get_item which makes redundant queries
|
|
let item_id = item
|
|
.id
|
|
.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
|
|
let tags = db::get_item_tags(conn, &item)?;
|
|
debug!(
|
|
"ITEM_SERVICE: Found {} tags for item {}",
|
|
tags.len(),
|
|
item_id
|
|
);
|
|
let meta = db::get_item_meta(conn, &item)?;
|
|
debug!(
|
|
"ITEM_SERVICE: Found {} meta entries for item {}",
|
|
meta.len(),
|
|
item_id
|
|
);
|
|
|
|
Ok(ItemWithMeta { item, tags, meta })
|
|
}
|
|
|
|
/// Lists items matching tags and metadata criteria.
|
|
///
|
|
/// Filters by tags (all must match) and exact metadata values, then loads full details.
|
|
///
|
|
/// # Arguments
|
|
///
|
|
/// * `conn` - Database connection.
|
|
/// * `tags` - Vector of tags (all must match).
|
|
/// * `meta` - HashMap of metadata key-value pairs (exact match).
|
|
///
|
|
/// # Returns
|
|
///
|
|
/// * `Result<Vec<ItemWithMeta>, CoreError>` - List of matching items.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// * Database query errors.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```ignore
|
|
/// let items = item_service.list_items(&conn, &vec!["work"], &HashMap::new())?;
|
|
/// ```
|
|
pub fn list_items(
|
|
&self,
|
|
conn: &Connection,
|
|
tags: &[String],
|
|
meta: &HashMap<String, String>,
|
|
) -> Result<Vec<ItemWithMeta>, CoreError> {
|
|
debug!("ITEM_SERVICE: Listing items with tags: {tags:?}, meta: {meta:?}");
|
|
let items = db::get_items_matching(conn, &tags.to_vec(), meta)?;
|
|
debug!("ITEM_SERVICE: Found {} matching items", items.len());
|
|
|
|
let item_ids: Vec<i64> = items.iter().filter_map(|item| item.id).collect();
|
|
if item_ids.is_empty() {
|
|
debug!("ITEM_SERVICE: No items found, returning empty list");
|
|
return Ok(Vec::new());
|
|
}
|
|
|
|
debug!(
|
|
"ITEM_SERVICE: Getting tags and meta for {} items",
|
|
item_ids.len()
|
|
);
|
|
let tags_map = db::get_tags_for_items(conn, &item_ids)?;
|
|
let meta_map_db = db::get_meta_for_items(conn, &item_ids)?;
|
|
|
|
let mut result = Vec::new();
|
|
for item in items {
|
|
let item_id = item
|
|
.id
|
|
.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
|
|
let tags = tags_map.get(&item_id).cloned().unwrap_or_default();
|
|
let meta_hm = meta_map_db.get(&item_id).cloned().unwrap_or_default();
|
|
let meta = meta_hm
|
|
.into_iter()
|
|
.map(|(name, value)| Meta {
|
|
id: item_id,
|
|
name,
|
|
value,
|
|
})
|
|
.collect();
|
|
|
|
result.push(ItemWithMeta { item, tags, meta });
|
|
}
|
|
|
|
debug!(
|
|
"ITEM_SERVICE: Returning {} items with full metadata",
|
|
result.len()
|
|
);
|
|
Ok(result)
|
|
}
|
|
|
|
/// Deletes an item by ID from database and storage.
|
|
///
|
|
/// Removes the item row, associated tags/metadata, and the physical file.
|
|
///
|
|
/// # Arguments
|
|
///
|
|
/// * `conn` - Mutable database connection.
|
|
/// * `id` - Item ID to delete.
|
|
///
|
|
/// # Returns
|
|
///
|
|
/// * `Result<(), CoreError>` - Success or error.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// * `CoreError::ItemNotFound(id)` - If item does not exist.
|
|
/// * File deletion errors (non-fatal if not found).
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```ignore
|
|
/// item_service.delete_item(&mut conn, 1)?;
|
|
/// ```
|
|
pub fn delete_item(&self, conn: &mut Connection, id: i64) -> Result<(), CoreError> {
|
|
debug!("ITEM_SERVICE: Deleting item with id: {id}");
|
|
if id <= 0 {
|
|
return Err(CoreError::InvalidInput(format!("Invalid item ID: {id}")));
|
|
}
|
|
let item = db::get_item(conn, id)?.ok_or(CoreError::ItemNotFound(id))?;
|
|
debug!("ITEM_SERVICE: Found item to delete: {item:?}");
|
|
|
|
let mut item_path = self.data_path.clone();
|
|
item_path.push(id.to_string());
|
|
debug!("ITEM_SERVICE: Deleting file at path: {item_path:?}");
|
|
|
|
db::delete_item(conn, item)?;
|
|
fs::remove_file(&item_path).or_else(|e| {
|
|
if e.kind() == std::io::ErrorKind::NotFound {
|
|
Ok(())
|
|
} else {
|
|
Err(e)
|
|
}
|
|
})?;
|
|
debug!("ITEM_SERVICE: Successfully deleted item {id}");
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Saves content from a reader to a new item.
|
|
///
|
|
/// Reads from the input reader (e.g., stdin), applies metadata plugins,
|
|
/// compresses the content, and stores it with tags. Echoes input to stdout via TeeReader.
|
|
///
|
|
/// # Arguments
|
|
///
|
|
/// * `input` - Reader providing the content to save (e.g., stdin).
|
|
/// * `cmd` - Mutable Clap command for error handling.
|
|
/// * `settings` - Application settings.
|
|
/// * `tags` - Tags to associate (defaults to "none" if empty).
|
|
/// * `conn` - Mutable database connection.
|
|
///
|
|
/// # Returns
|
|
///
|
|
/// * `Result<Item, CoreError>` - The saved item with full details (id, size, compression, timestamp).
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// * `CoreError::InvalidInput(...)` - If validation fails.
|
|
/// * Database or file I/O errors.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```ignore
|
|
/// let reader = std::io::stdin();
|
|
/// let item = item_service.save_item(reader, &mut cmd, &settings, &mut vec![], &mut conn)?;
|
|
/// ```
|
|
pub fn save_item<R: Read>(
|
|
&self,
|
|
mut input: R,
|
|
cmd: &mut Command,
|
|
settings: &Settings,
|
|
tags: &mut Vec<String>,
|
|
conn: &mut Connection,
|
|
) -> Result<Item, CoreError> {
|
|
debug!("ITEM_SERVICE: Starting save_item with tags: {tags:?}");
|
|
if tags.is_empty() {
|
|
tags.push("none".to_string());
|
|
debug!("ITEM_SERVICE: No tags provided, using default 'none' tag");
|
|
}
|
|
|
|
let compression_type = settings_compression_type(cmd, settings);
|
|
debug!("ITEM_SERVICE: Using compression type: {compression_type:?}");
|
|
let compression_engine = get_compression_engine(compression_type.clone())?;
|
|
|
|
let item_id;
|
|
let mut item;
|
|
{
|
|
item = db::create_item(conn, compression_type.clone())?;
|
|
item_id = item
|
|
.id
|
|
.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
|
|
debug!("ITEM_SERVICE: Created new item with id: {item_id}");
|
|
db::set_item_tags(conn, item.clone(), tags)?;
|
|
debug!("ITEM_SERVICE: Set tags for item {item_id}");
|
|
let item_meta = self.meta_service.collect_initial_meta();
|
|
debug!(
|
|
"ITEM_SERVICE: Collected {} initial meta entries",
|
|
item_meta.len()
|
|
);
|
|
for (k, v) in item_meta.iter() {
|
|
db::add_meta(conn, item_id, k, v)?;
|
|
}
|
|
}
|
|
|
|
// Print the "KEEP: New item" message before starting to read input
|
|
if !settings.quiet {
|
|
if std::io::stderr().is_terminal() {
|
|
let mut t = term::stderr().unwrap();
|
|
let _ = t.reset();
|
|
let _ = t.attr(term::Attr::Bold);
|
|
let _ = write!(t, "KEEP:");
|
|
let _ = t.reset();
|
|
let _ = write!(t, " New item ");
|
|
let _ = t.attr(term::Attr::Bold);
|
|
let _ = write!(t, "{item_id}");
|
|
let _ = t.reset();
|
|
let _ = write!(t, " tags: ");
|
|
let _ = t.attr(term::Attr::Bold);
|
|
let _ = write!(t, "{}", tags.join(" "));
|
|
let _ = t.reset();
|
|
let _ = writeln!(t);
|
|
let _ = std::io::stderr().flush();
|
|
} else {
|
|
let mut t = std::io::stderr();
|
|
let _ = writeln!(t, "KEEP: New item: {item_id} tags: {tags:?}");
|
|
}
|
|
}
|
|
|
|
let mut plugins = self.meta_service.get_plugins(cmd, settings);
|
|
debug!("ITEM_SERVICE: Got {} meta plugins", plugins.len());
|
|
self.meta_service
|
|
.initialize_plugins(&mut plugins, conn, item_id);
|
|
|
|
let mut item_path = self.data_path.clone();
|
|
item_path.push(item_id.to_string());
|
|
debug!("ITEM_SERVICE: Writing item to path: {item_path:?}");
|
|
|
|
let mut item_out = compression_engine.create(item_path.clone())?;
|
|
|
|
let mut buffer = [0; PIPESIZE];
|
|
let mut total_bytes = 0;
|
|
|
|
debug!("ITEM_SERVICE: Starting to read and process input data");
|
|
loop {
|
|
let n = input.read(&mut buffer)?;
|
|
if n == 0 {
|
|
break;
|
|
}
|
|
|
|
total_bytes += n as i64;
|
|
item_out.write_all(&buffer[..n])?;
|
|
self.meta_service
|
|
.process_chunk(&mut plugins, &buffer[..n], conn, item_id);
|
|
}
|
|
debug!("ITEM_SERVICE: Processed {total_bytes} bytes total");
|
|
|
|
item_out.flush()?;
|
|
drop(item_out);
|
|
|
|
debug!("ITEM_SERVICE: Finalizing meta plugins");
|
|
self.meta_service
|
|
.finalize_plugins(&mut plugins, conn, item_id);
|
|
|
|
item.size = Some(total_bytes);
|
|
db::update_item(conn, item.clone())?;
|
|
|
|
debug!("ITEM_SERVICE: Save completed successfully");
|
|
|
|
Ok(item)
|
|
}
|
|
|
|
/// Saves pre-loaded content as a new item, typically from MCP (Machine-Common-Processing) sources.
|
|
///
|
|
/// Bypasses streaming read, directly writes content and applies metadata/plugins.
|
|
///
|
|
/// # Arguments
|
|
///
|
|
/// * `content` - Byte slice of content to save.
|
|
/// * `tags` - Tags to associate.
|
|
/// * `metadata` - Initial metadata key-value pairs.
|
|
/// * `cmd` - Mutable command.
|
|
/// * `settings` - Settings.
|
|
/// * `conn` - Mutable database connection.
|
|
///
|
|
/// # Returns
|
|
///
|
|
/// * `Result<ItemWithMeta, CoreError>` - The saved item with full details.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// * `CoreError::Database(...)` - If DB insert fails.
|
|
/// * `CoreError::Io(...)` - If file write fails.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```ignore
|
|
/// let content = b"Hello, world!";
|
|
/// let tags = vec!["mcp".to_string()];
|
|
/// let meta = HashMap::from([("source".to_string(), "api".to_string())]);
|
|
/// let item = service.save_item_from_mcp(content, &tags, &meta, &mut cmd, &settings, &mut conn)?;
|
|
/// ```
|
|
pub fn save_item_from_mcp(
|
|
&self,
|
|
content: &[u8],
|
|
tags: &Vec<String>,
|
|
metadata: &HashMap<String, String>,
|
|
cmd: &mut Command,
|
|
settings: &Settings,
|
|
conn: &mut Connection,
|
|
) -> Result<ItemWithMeta, CoreError> {
|
|
debug!(
|
|
"ITEM_SERVICE: Starting save_item_from_mcp with {} bytes, {} tags, {} metadata entries",
|
|
content.len(),
|
|
tags.len(),
|
|
metadata.len()
|
|
);
|
|
let compression_type = CompressionType::LZ4;
|
|
let compression_engine = get_compression_engine(compression_type.clone())?;
|
|
|
|
let item_id;
|
|
let mut item;
|
|
|
|
{
|
|
item = db::create_item(conn, compression_type.clone())?;
|
|
item_id = item
|
|
.id
|
|
.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
|
|
debug!("ITEM_SERVICE: Created MCP item with id: {item_id}");
|
|
|
|
// Add tags
|
|
for tag in tags {
|
|
db::add_tag(conn, item_id, tag)?;
|
|
}
|
|
debug!("ITEM_SERVICE: Added {} tags to MCP item", tags.len());
|
|
|
|
// Add custom metadata
|
|
for (key, value) in metadata {
|
|
db::add_meta(conn, item_id, key, value)?;
|
|
}
|
|
debug!(
|
|
"ITEM_SERVICE: Added {} custom metadata entries to MCP item",
|
|
metadata.len()
|
|
);
|
|
}
|
|
|
|
let mut item_path = self.data_path.clone();
|
|
item_path.push(item_id.to_string());
|
|
debug!("ITEM_SERVICE: Writing MCP item to path: {item_path:?}");
|
|
|
|
let mut writer = compression_engine.create(item_path.clone())?;
|
|
writer.write_all(content)?;
|
|
drop(writer);
|
|
|
|
let mut plugins = self.meta_service.get_plugins(cmd, settings);
|
|
debug!(
|
|
"ITEM_SERVICE: Got {} configured meta plugins for MCP item",
|
|
plugins.len()
|
|
);
|
|
|
|
self.meta_service
|
|
.initialize_plugins(&mut plugins, conn, item_id);
|
|
self.meta_service
|
|
.process_chunk(&mut plugins, content, conn, item_id);
|
|
self.meta_service
|
|
.finalize_plugins(&mut plugins, conn, item_id);
|
|
debug!("ITEM_SERVICE: Processed MCP item through configured meta plugins");
|
|
|
|
item.size = Some(content.len() as i64);
|
|
db::update_item(conn, item.clone())?;
|
|
|
|
debug!("ITEM_SERVICE: MCP item saved successfully");
|
|
|
|
self.get_item(conn, item_id)
|
|
}
|
|
|
|
/// Returns a reference to the internal compression service.
|
|
///
|
|
/// # Returns
|
|
///
|
|
/// Reference to `CompressionService`.
|
|
pub fn get_compression_service(&self) -> &CompressionService {
|
|
&self.compression_service
|
|
}
|
|
|
|
/// Returns a reference to the data directory path.
|
|
///
|
|
/// # Returns
|
|
///
|
|
/// Reference to `PathBuf`.
|
|
pub fn get_data_path(&self) -> &PathBuf {
|
|
&self.data_path
|
|
}
|
|
}
|
|
|
|
/// A reader that applies a filter chain to the data as it's read.
|
|
///
|
|
/// Wraps an underlying reader and applies a filter chain to the data during read operations.
|
|
/// Buffers data as needed for filter processing.
|
|
///
|
|
/// # Fields
|
|
///
|
|
/// * `reader` - The underlying reader providing the data source.
|
|
/// * `filter_chain` - Optional filter chain to apply.
|
|
/// * `buffer` - Internal buffer for holding filtered data.
|
|
/// * `buffer_pos` - Current position in the internal buffer.
|
|
struct FilteringReader<R: Read> {
|
|
reader: R,
|
|
filter_chain: Option<filter_plugin::FilterChain>,
|
|
buffer: Vec<u8>,
|
|
buffer_pos: usize,
|
|
temp_buf: Vec<u8>,
|
|
}
|
|
|
|
impl<R: Read> FilteringReader<R> {
|
|
/// Creates a new `FilteringReader` with the given reader and filter chain.
|
|
///
|
|
/// # Arguments
|
|
///
|
|
/// * `reader` - The underlying reader.
|
|
/// * `filter_chain` - Optional filter chain to apply.
|
|
///
|
|
/// # Returns
|
|
///
|
|
/// A new `FilteringReader`.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```ignore
|
|
/// let reader = std::io::Cursor::new(b"data");
|
|
/// let filter_chain = parse_filter_string("head(10)")?;
|
|
/// let filtered = FilteringReader::new(reader, Some(filter_chain));
|
|
/// ```
|
|
pub fn new(reader: R, filter_chain: Option<filter_plugin::FilterChain>) -> Self {
|
|
Self {
|
|
reader,
|
|
filter_chain,
|
|
buffer: Vec::new(),
|
|
buffer_pos: 0,
|
|
temp_buf: vec![0; 8192],
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<R: Read> Read for FilteringReader<R> {
|
|
/// Reads data, applying the filter chain if present.
|
|
///
|
|
/// If buffered data exists, serves it first. Otherwise, reads a chunk, filters it,
|
|
/// and serves the output. Handles EOF properly.
|
|
///
|
|
/// # Arguments
|
|
///
|
|
/// * `buf` - Buffer to fill with filtered data.
|
|
///
|
|
/// # Returns
|
|
///
|
|
/// * `io::Result<usize>` - Number of bytes read, or I/O error.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Propagates errors from underlying reader or filter operations.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```ignore
|
|
/// let mut filtered = FilteringReader::new(std::io::Cursor::new(b"Hello"), None);
|
|
/// let mut buf = [0; 5];
|
|
/// let n = filtered.read(&mut buf).unwrap();
|
|
/// assert_eq!(n, 5);
|
|
/// ```
|
|
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
|
// If we have data in our buffer, serve that first
|
|
if self.buffer_pos < self.buffer.len() {
|
|
let bytes_to_copy = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos);
|
|
buf[..bytes_to_copy]
|
|
.copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + bytes_to_copy]);
|
|
self.buffer_pos += bytes_to_copy;
|
|
return Ok(bytes_to_copy);
|
|
}
|
|
|
|
// Reset buffer for new data
|
|
self.buffer.clear();
|
|
self.buffer_pos = 0;
|
|
|
|
// No filter chain — pass through directly, no intermediate buffer needed
|
|
if self.filter_chain.is_none() {
|
|
return self.reader.read(buf);
|
|
}
|
|
|
|
// Read from the original reader into the reusable temp buffer
|
|
let to_read = std::cmp::min(buf.len(), self.temp_buf.len());
|
|
let bytes_read = self.reader.read(&mut self.temp_buf[..to_read])?;
|
|
|
|
if bytes_read == 0 {
|
|
return Ok(0);
|
|
}
|
|
|
|
// Process through the filter chain
|
|
if let Some(ref mut chain) = self.filter_chain {
|
|
let mut input_cursor = std::io::Cursor::new(&self.temp_buf[..bytes_read]);
|
|
chain.filter(&mut input_cursor, &mut self.buffer)?;
|
|
|
|
if !self.buffer.is_empty() {
|
|
let bytes_to_copy = std::cmp::min(buf.len(), self.buffer.len());
|
|
buf[..bytes_to_copy].copy_from_slice(&self.buffer[..bytes_to_copy]);
|
|
self.buffer_pos = bytes_to_copy;
|
|
Ok(bytes_to_copy)
|
|
} else {
|
|
// No data produced by filter, signal to read more
|
|
Ok(0)
|
|
}
|
|
} else {
|
|
unreachable!()
|
|
}
|
|
}
|
|
}
|