Files
keep/src/services/item_service.rs
Andrew Phillips 49793a0f94 feat: add streaming tar export/import and rename "none" to "raw"
- Add streaming tar-based export (--export produces .keep.tar)
- Add streaming tar import (--import reads .keep.tar archives)
- Add server endpoints GET /api/export and POST /api/import
- Rename CompressionType::None to CompressionType::Raw with "none" as alias
- Add DB migration to update existing "none" compression values to "raw"
- Fix export endpoint to propagate errors to client instead of swallowing
- Fix import endpoint to return 413 on max_body_size instead of truncating

Export streams items as tar archives without loading entire files into memory.
Import extracts items with new IDs, preserving original order. Both work
locally and via client/server mode.

Co-Authored-By: opencode <noreply@opencode.ai>
2026-03-17 21:24:39 -03:00

1085 lines
36 KiB
Rust

use crate::compression_engine::{CompressionType, get_compression_engine};
use crate::config::Settings;
use crate::db::{self, Item, Meta};
use crate::filter_plugin;
use crate::modes::common::settings_compression_type;
use crate::services::compression_service::CompressionService;
use crate::services::error::CoreError;
use crate::services::filter_service::FilterService;
use crate::services::meta_service::MetaService;
use crate::services::types::{ItemWithContent, ItemWithMeta};
use chrono::DateTime;
use chrono::Utc;
use clap::Command;
use log::debug;
use rusqlite::Connection;
use std::collections::HashMap;
use std::fs;
use std::io::{Cursor, IsTerminal, Read, Write};
use std::path::PathBuf;
/// Service for managing items in the Keep application.
///
/// This service handles CRUD operations for items, including saving content,
/// retrieving items with metadata and content, applying filters, and managing
/// compression. It integrates with the database, file system, compression engines,
/// metadata plugins, and filters to provide a complete item management interface.
///
/// Item content is stored on disk under `data_path`, one file per item, named
/// by the item's numeric database ID.
pub struct ItemService {
/// Path to the data storage directory.
data_path: PathBuf,
/// Service for handling compression and decompression.
compression_service: CompressionService,
/// Service for applying content filters.
filter_service: FilterService,
}
impl ItemService {
    /// Creates a new `ItemService` rooted at the given data directory.
    ///
    /// # Arguments
    ///
    /// * `data_path` - Directory in which item content files are stored.
    ///
    /// # Returns
    ///
    /// A freshly constructed `ItemService` with default compression and
    /// filter services.
    ///
    /// # Examples
    ///
    /// ```
    /// # use keep::services::ItemService;
    /// # use std::path::PathBuf;
    /// let service = ItemService::new(PathBuf::from("/data"));
    /// ```
    pub fn new(data_path: PathBuf) -> Self {
        debug!("ITEM_SERVICE: Creating new ItemService with data_path: {data_path:?}");
        let compression_service = CompressionService::new();
        let filter_service = FilterService::new();
        Self {
            data_path,
            compression_service,
            filter_service,
        }
    }
/// Retrieves an item with its associated metadata and tags.
///
/// Fetches the item from the database by ID and loads its tags and metadata.
///
/// # Arguments
///
/// * `conn` - Database connection.
/// * `id` - Item ID to retrieve.
///
/// # Returns
///
/// * `Result<ItemWithMeta, CoreError>` - Item with metadata and tags, or an error if not found.
///
/// # Errors
///
/// * `CoreError::ItemNotFound(id)` - If the item does not exist.
/// * Database-related errors.
///
/// # Examples
///
/// ```ignore
/// let item_with_meta = item_service.get_item(&conn, 1)?;
/// assert_eq!(item_with_meta.item.id, Some(1));
/// ```
pub fn get_item(&self, conn: &Connection, id: i64) -> Result<ItemWithMeta, CoreError> {
debug!("ITEM_SERVICE: Getting item with id: {id}");
let item = db::get_item(conn, id)?.ok_or(CoreError::ItemNotFound(id))?;
debug!("ITEM_SERVICE: Found item: {item:?}");
let tags = db::get_item_tags(conn, &item)?;
debug!("ITEM_SERVICE: Found {} tags for item {}", tags.len(), id);
let meta = db::get_item_meta(conn, &item)?;
debug!(
"ITEM_SERVICE: Found {} meta entries for item {}",
meta.len(),
id
);
Ok(ItemWithMeta { item, tags, meta })
}
/// Retrieves an item with its content, metadata, and tags.
///
/// Loads the item, its metadata/tags, and decompresses the full content.
/// This method is intended for CLI use only and has a size guard (100MB).
/// For larger items or server use, use `get_item_content_info_streaming`.
///
/// # Arguments
///
/// * `conn` - Database connection.
/// * `id` - Item ID to retrieve.
///
/// # Returns
///
/// * `Result<ItemWithContent, CoreError>` - Item with content, or an error if not found.
///
/// # Errors
///
/// * `CoreError::ItemNotFound(id)` - If the item does not exist.
/// * `CoreError::Io(...)` - If file read or decompression fails.
/// * `CoreError::InvalidInput(...)` - If item exceeds 100MB size limit.
///
/// # Examples
///
/// ```ignore
/// let item_with_content = item_service.get_item_content(&conn, 1)?;
/// assert!(!item_with_content.content.is_empty());
/// ```
pub fn get_item_content(
&self,
conn: &Connection,
id: i64,
) -> Result<ItemWithContent, CoreError> {
// Size limit for loading entire content into memory (100MB)
const MAX_CONTENT_SIZE: i64 = 100 * 1024 * 1024;
debug!("ITEM_SERVICE: Getting item content for id: {id}");
let item_with_meta = self.get_item(conn, id)?;
let item_id = item_with_meta
.item
.id
.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
if item_id <= 0 {
return Err(CoreError::InvalidInput(format!(
"Invalid item ID: {item_id}"
)));
}
// Check size guard before loading content
if let Some(size) = item_with_meta.item.size
&& size > MAX_CONTENT_SIZE
{
return Err(CoreError::InvalidInput(format!(
"Item {} exceeds size limit ({} > {}). Use streaming API for large items.",
item_id, size, MAX_CONTENT_SIZE
)));
}
let mut item_path = self.data_path.clone();
item_path.push(item_id.to_string());
debug!("ITEM_SERVICE: Reading content from path: {item_path:?}");
let content = self
.compression_service
.get_item_content(item_path, &item_with_meta.item.compression)?;
debug!(
"ITEM_SERVICE: Read {} bytes of content for item {}",
content.len(),
id
);
Ok(ItemWithContent {
item_with_meta,
content,
})
}
/// Determines if item content is binary based on metadata or sampling.
///
/// Checks existing "text" metadata first; if absent, samples the first 8192 bytes.
///
/// # Arguments
///
/// * `item_path` - Path to the item file.
/// * `compression` - Compression type.
/// * `metadata` - Item metadata.
///
/// # Returns
///
/// * `Result<bool, CoreError>` - True if binary, false if text.
///
/// # Errors
///
/// * File or compression errors.
///
/// # Examples
///
/// ```ignore
/// let is_bin = item_service.is_content_binary(path, "gzip", &meta)?;
/// ```
fn is_content_binary(
&self,
item_path: PathBuf,
compression: &str,
metadata: &HashMap<String, String>,
) -> Result<bool, CoreError> {
// Read only the first 8192 bytes for binary detection
let mut sample_reader = self
.compression_service
.stream_item_content(item_path, compression)?;
let mut sample_buffer = vec![0; 8192];
let bytes_read = sample_reader.read(&mut sample_buffer)?;
Ok(crate::common::is_binary::is_content_binary_from_metadata(
metadata,
&sample_buffer[..bytes_read],
))
}
/// Retrieves a streaming reader for item content with optional filtering.
///
/// Returns a boxed reader that applies compression decompression and filters.
///
/// # Arguments
///
/// * `conn` - Database connection.
/// * `id` - Item ID.
/// * `filter` - Optional filter string.
///
/// # Returns
///
/// * `Result<(Box<dyn Read + Send>, String, bool), CoreError>` - (reader, MIME type, is_binary).
///
/// # Errors
///
/// * `CoreError::ItemNotFound(id)` - If item not found.
/// * Filter parsing or compression errors.
///
/// # Examples
///
/// ```ignore
/// let (reader, mime, is_bin) = item_service.get_item_content_info_streaming(&conn, 1, Some("grep(error)"))?;
/// ```
pub fn get_item_content_info_streaming(
&self,
conn: &Connection,
id: i64,
filter: Option<String>,
) -> Result<(Box<dyn Read + Send>, String, bool), CoreError> {
// Convert filter string to FilterChain if provided
let filter_chain = if let Some(filter_str) = filter {
self.filter_service
.create_filter_chain(Some(&filter_str))
.map_err(|e| {
CoreError::InvalidInput(format!("Failed to create filter chain: {e}"))
})?
} else {
None
};
self.get_item_content_info_streaming_with_chain(conn, id, filter_chain.as_ref())
}
/// Retrieves a streaming reader with a pre-built filter chain.
///
/// # Arguments
///
/// * `conn` - Database connection.
/// * `id` - Item ID.
/// * `filter_chain` - Optional pre-parsed filter chain.
///
/// # Returns
///
/// * `Result<(Box<dyn Read + Send>, String, bool), CoreError>` - (reader, MIME type, is_binary).
///
/// # Errors
///
/// * `CoreError::ItemNotFound(id)` - If item not found.
/// * Compression or filtering errors.
///
/// # Examples
///
/// ```ignore
/// let chain = parse_filter_string("head(100)")?;
/// let (reader, mime, is_bin) = item_service.get_item_content_info_streaming_with_chain(&conn, 1, Some(&chain))?;
/// ```
pub fn get_item_content_info_streaming_with_chain(
&self,
conn: &Connection,
id: i64,
filter_chain: Option<&filter_plugin::FilterChain>,
) -> Result<(Box<dyn Read + Send>, String, bool), CoreError> {
let item_with_meta = self.get_item(conn, id)?;
let item_id = item_with_meta
.item
.id
.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
if item_id <= 0 {
return Err(CoreError::InvalidInput(format!(
"Invalid item ID: {item_id}"
)));
}
let mut item_path = self.data_path.clone();
item_path.push(item_id.to_string());
let reader = self
.compression_service
.stream_item_content(item_path.clone(), &item_with_meta.item.compression)?;
// Wrap the reader with filtering
let filtered_reader = Box::new(FilteringReader::new(reader, filter_chain.cloned()));
let metadata = item_with_meta.meta_as_map();
let mime_type = metadata
.get("mime_type")
.map(|s| s.to_string())
.unwrap_or_else(|| "application/octet-stream".to_string());
// Check if content is binary
let is_binary =
self.is_content_binary(item_path, &item_with_meta.item.compression, &metadata)?;
Ok((filtered_reader, mime_type, is_binary))
}
/// Like `get_item_content_info_streaming_with_chain` but accepts a pre-fetched `ItemWithMeta`
/// to avoid redundant DB queries.
pub fn get_item_content_info_streaming_with_item(
&self,
item_with_meta: ItemWithMeta,
filter_chain: Option<&filter_plugin::FilterChain>,
) -> Result<(Box<dyn Read + Send>, String, bool), CoreError> {
let item_id = item_with_meta
.item
.id
.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
if item_id <= 0 {
return Err(CoreError::InvalidInput(format!(
"Invalid item ID: {item_id}"
)));
}
let mut item_path = self.data_path.clone();
item_path.push(item_id.to_string());
let reader = self
.compression_service
.stream_item_content(item_path.clone(), &item_with_meta.item.compression)?;
let filtered_reader = Box::new(FilteringReader::new(reader, filter_chain.cloned()));
let metadata = item_with_meta.meta_as_map();
let mime_type = metadata
.get("mime_type")
.map(|s| s.to_string())
.unwrap_or_else(|| "application/octet-stream".to_string());
let is_binary =
self.is_content_binary(item_path, &item_with_meta.item.compression, &metadata)?;
Ok((filtered_reader, mime_type, is_binary))
}
/// Finds an item by ID or tags/metadata criteria.
///
/// Supports lookup by ID, last item, or search by tags/metadata.
///
/// # Arguments
///
/// * `conn` - Database connection.
/// * `ids` - Vector of IDs (if non-empty, uses first ID).
/// * `tags` - Vector of tags (all must match if provided).
/// * `meta` - HashMap of metadata key-value pairs (exact match).
///
/// # Returns
///
/// * `Result<ItemWithMeta, CoreError>` - The found item or error.
///
/// # Errors
///
/// * `CoreError::ItemNotFound(...)` - If no matching item.
/// * Database errors.
///
/// # Examples
///
/// ```ignore
/// let item = item_service.find_item(&conn, vec![1], &vec![], &HashMap::new())?;
/// ```
pub fn find_item(
&self,
conn: &Connection,
ids: &[i64],
tags: &[String],
meta: &HashMap<String, Option<String>>,
) -> Result<ItemWithMeta, CoreError> {
debug!("ITEM_SERVICE: Finding item with ids: {ids:?}, tags: {tags:?}, meta: {meta:?}");
let item_maybe = match (ids.is_empty(), tags.is_empty() && meta.is_empty()) {
(false, _) => {
debug!("ITEM_SERVICE: Finding by ID: {}", ids[0]);
db::get_item(conn, ids[0])?
}
(true, true) => {
debug!("ITEM_SERVICE: Finding last item");
db::get_item_last(conn)?
}
(true, false) => {
debug!("ITEM_SERVICE: Finding by tags/meta");
db::get_item_matching(conn, &tags.to_vec(), meta)?
}
};
let item = item_maybe.ok_or(CoreError::ItemNotFoundGeneric)?;
debug!("ITEM_SERVICE: Found matching item: {item:?}");
// Get tags and meta directly instead of calling get_item which makes redundant queries
let item_id = item
.id
.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
let tags = db::get_item_tags(conn, &item)?;
debug!(
"ITEM_SERVICE: Found {} tags for item {}",
tags.len(),
item_id
);
let meta = db::get_item_meta(conn, &item)?;
debug!(
"ITEM_SERVICE: Found {} meta entries for item {}",
meta.len(),
item_id
);
Ok(ItemWithMeta { item, tags, meta })
}
/// Lists items matching tags and metadata criteria.
///
/// Filters by tags (all must match) and exact metadata values, then loads full details.
///
/// # Arguments
///
/// * `conn` - Database connection.
/// * `tags` - Vector of tags (all must match).
/// * `meta` - HashMap of metadata key-value pairs (exact match).
///
/// # Returns
///
/// * `Result<Vec<ItemWithMeta>, CoreError>` - List of matching items.
///
/// # Errors
///
/// * Database query errors.
///
/// # Examples
///
/// ```ignore
/// let items = item_service.list_items(&conn, &vec!["work"], &HashMap::new())?;
/// ```
pub fn list_items(
&self,
conn: &Connection,
tags: &[String],
meta: &HashMap<String, Option<String>>,
) -> Result<Vec<ItemWithMeta>, CoreError> {
debug!("ITEM_SERVICE: Listing items with tags: {tags:?}, meta: {meta:?}");
let items = db::get_items_matching(conn, &tags.to_vec(), meta)?;
debug!("ITEM_SERVICE: Found {} matching items", items.len());
let item_ids: Vec<i64> = items.iter().filter_map(|item| item.id).collect();
if item_ids.is_empty() {
debug!("ITEM_SERVICE: No items found, returning empty list");
return Ok(Vec::new());
}
debug!(
"ITEM_SERVICE: Getting tags and meta for {} items",
item_ids.len()
);
let tags_map = db::get_tags_for_items(conn, &item_ids)?;
let meta_map_db = db::get_meta_for_items(conn, &item_ids)?;
let mut result = Vec::new();
for item in items {
let item_id = item
.id
.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
let tags = tags_map.get(&item_id).cloned().unwrap_or_default();
let meta_hm = meta_map_db.get(&item_id).cloned().unwrap_or_default();
let meta = meta_hm
.into_iter()
.map(|(name, value)| Meta {
id: item_id,
name,
value,
})
.collect();
result.push(ItemWithMeta { item, tags, meta });
}
debug!(
"ITEM_SERVICE: Returning {} items with full metadata",
result.len()
);
Ok(result)
}
    /// Deletes an item by ID from database and storage.
    ///
    /// Removes the item row (and its associated tags/metadata via the DB
    /// layer) and then the physical content file. A missing file is
    /// tolerated, since the DB row is already gone at that point.
    ///
    /// # Arguments
    ///
    /// * `conn` - Mutable database connection.
    /// * `id` - Item ID to delete.
    ///
    /// # Returns
    ///
    /// * `Result<Item, CoreError>` - The deleted item's record on success.
    ///
    /// # Errors
    ///
    /// * `CoreError::InvalidInput(...)` - If `id` is not positive.
    /// * `CoreError::ItemNotFound(id)` - If item does not exist.
    /// * File deletion errors other than `NotFound`.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// let removed = item_service.delete_item(&mut conn, 1)?;
    /// ```
    pub fn delete_item(&self, conn: &mut Connection, id: i64) -> Result<Item, CoreError> {
        debug!("ITEM_SERVICE: Deleting item with id: {id}");
        if id <= 0 {
            return Err(CoreError::InvalidInput(format!("Invalid item ID: {id}")));
        }
        let item = db::get_item(conn, id)?.ok_or(CoreError::ItemNotFound(id))?;
        debug!("ITEM_SERVICE: Found item to delete: {item:?}");
        let mut item_path = self.data_path.clone();
        item_path.push(id.to_string());
        debug!("ITEM_SERVICE: Deleting file at path: {item_path:?}");
        // Clone before db::delete_item consumes the item so we can return it.
        let deleted_item = item.clone();
        db::delete_item(conn, item)?;
        // Remove the content file; an already-missing file is not an error.
        fs::remove_file(&item_path).or_else(|e| {
            if e.kind() == std::io::ErrorKind::NotFound {
                Ok(())
            } else {
                Err(e)
            }
        })?;
        debug!("ITEM_SERVICE: Successfully deleted item {id}");
        Ok(deleted_item)
    }
    /// Saves content from a reader as a new item.
    ///
    /// Creates the item row, associates tags and initial metadata, streams the
    /// input through the configured compression engine to disk while feeding
    /// each chunk to the meta plugins, and finally records the total size.
    /// Unless `settings.quiet` is set, a "KEEP: New item" status line is
    /// printed to stderr (with bold attributes when stderr is a terminal).
    ///
    /// # Arguments
    ///
    /// * `input` - Reader providing the content to save (e.g., stdin).
    /// * `cmd` - Mutable Clap command used for settings resolution and error handling.
    /// * `settings` - Application settings (compression choice, user meta, quiet flag).
    /// * `tags` - Tags to associate; a default tag is ensured when empty
    ///   (see `ensure_default_tag`).
    /// * `conn` - Mutable database connection.
    ///
    /// # Returns
    ///
    /// * `Result<Item, CoreError>` - The saved item with full details (id, size, compression, timestamp).
    ///
    /// # Errors
    ///
    /// * `CoreError::InvalidInput(...)` - If validation fails.
    /// * Database or file I/O errors.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// let reader = std::io::stdin();
    /// let item = item_service.save_item(reader, &mut cmd, &settings, &mut vec![], &mut conn)?;
    /// ```
    pub fn save_item<R: Read>(
        &self,
        mut input: R,
        cmd: &mut Command,
        settings: &Settings,
        tags: &mut Vec<String>,
        conn: &mut Connection,
    ) -> Result<Item, CoreError> {
        debug!("ITEM_SERVICE: Starting save_item with tags: {tags:?}");
        crate::modes::common::ensure_default_tag(tags);
        debug!("ITEM_SERVICE: Tags after ensure_default: {tags:?}");
        let compression_type = settings_compression_type(cmd, settings);
        debug!("ITEM_SERVICE: Using compression type: {compression_type:?}");
        let compression_engine = get_compression_engine(compression_type.clone())?;
        let item_id;
        let mut item;
        {
            // Create the DB row first so we have an ID to name the content file.
            item = db::create_item(conn, compression_type.clone())?;
            item_id = item
                .id
                .ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
            debug!("ITEM_SERVICE: Created new item with id: {item_id}");
            db::set_item_tags(conn, item.clone(), tags)?;
            debug!("ITEM_SERVICE: Set tags for item {item_id}");
            let item_meta = MetaService::collect_initial_meta_static();
            debug!(
                "ITEM_SERVICE: Collected {} initial meta entries",
                item_meta.len()
            );
            for (k, v) in item_meta.iter() {
                db::add_meta(conn, item_id, k, v)?;
            }
            // Store user-specified metadata from --meta CLI flags
            for (key, value) in &settings.meta {
                if let Some(v) = value {
                    debug!("ITEM_SERVICE: Setting user meta {key}={v}");
                    db::add_meta(conn, item_id, key, v)?;
                }
            }
        }
        // Print the "KEEP: New item" message before starting to read input
        if !settings.quiet {
            if std::io::stderr().is_terminal() {
                // Bold label, id, and tag list when stderr is a terminal;
                // write errors are deliberately ignored (best-effort output).
                let mut t = term::stderr().unwrap();
                let _ = t.reset();
                let _ = t.attr(term::Attr::Bold);
                let _ = write!(t, "KEEP:");
                let _ = t.reset();
                let _ = write!(t, " New item ");
                let _ = t.attr(term::Attr::Bold);
                let _ = write!(t, "{item_id}");
                let _ = t.reset();
                let _ = write!(t, " tags: ");
                let _ = t.attr(term::Attr::Bold);
                let _ = write!(t, "{}", tags.join(" "));
                let _ = t.reset();
                let _ = writeln!(t);
                let _ = std::io::stderr().flush();
            } else {
                // Non-terminal stderr gets a plain one-line message.
                let mut t = std::io::stderr();
                let _ = writeln!(t, "KEEP: New item: {item_id} tags: {tags:?}");
            }
        }
        // Collect metadata from plugins into a Vec, then write to DB after plugins finish.
        // This avoids capturing &Connection in the save_meta closure (which would need unsafe
        // and wouldn't be Send for parallel plugins).
        let (meta_service, collected_meta) = MetaService::with_collector();
        let mut plugins = meta_service.get_plugins(cmd, settings);
        debug!("ITEM_SERVICE: Got {} meta plugins", plugins.len());
        meta_service.initialize_plugins(&mut plugins);
        let mut item_path = self.data_path.clone();
        item_path.push(item_id.to_string());
        debug!("ITEM_SERVICE: Writing item to path: {item_path:?}");
        let mut item_out = compression_engine.create(item_path.clone())?;
        let mut total_bytes: i64 = 0;
        debug!("ITEM_SERVICE: Starting to read and process input data");
        // Single pass over input: each chunk goes to the compressed file and
        // to the meta plugins.
        crate::common::stream_copy(&mut input, |chunk| {
            total_bytes += chunk.len() as i64;
            item_out.write_all(chunk)?;
            meta_service.process_chunk(&mut plugins, chunk);
            Ok(())
        })?;
        debug!("ITEM_SERVICE: Processed {total_bytes} bytes total");
        item_out.flush()?;
        // Dropping the writer finalizes the compressed file on disk.
        drop(item_out);
        debug!("ITEM_SERVICE: Finalizing meta plugins");
        meta_service.finalize_plugins(&mut plugins);
        // Write collected plugin metadata to DB
        if let Ok(entries) = collected_meta.lock() {
            for (name, value) in entries.iter() {
                db::add_meta(conn, item_id, name, value)?;
            }
        }
        // Record the uncompressed byte count on the item row.
        item.size = Some(total_bytes);
        db::update_item(conn, item.clone())?;
        debug!("ITEM_SERVICE: Save completed successfully");
        Ok(item)
    }
    /// Returns a reference to the internal compression service.
    ///
    /// # Returns
    ///
    /// Reference to the `CompressionService` used for reading and writing
    /// item content files.
    pub fn get_compression_service(&self) -> &CompressionService {
        &self.compression_service
    }
    /// Returns a reference to the data directory path.
    ///
    /// # Returns
    ///
    /// Reference to the `PathBuf` of the directory holding item content files.
    pub fn get_data_path(&self) -> &PathBuf {
        &self.data_path
    }
    /// Returns a streaming reader and item metadata for the given item.
    ///
    /// The reader yields the decompressed content with no filtering applied.
    ///
    /// NOTE(review): the item row is looked up twice — once inside
    /// `get_item_content_info_streaming` and once here to build the returned
    /// `ItemWithMeta`. Consider sharing a single lookup if this shows up in
    /// profiles.
    pub fn get_item_content_streaming(
        &self,
        conn: &Connection,
        id: i64,
    ) -> Result<(Box<dyn Read + Send>, ItemWithMeta), CoreError> {
        let (reader, _mime, _is_binary) = self.get_item_content_info_streaming(conn, id, None)?;
        let item_with_meta = self.get_item(conn, id)?;
        Ok((reader, item_with_meta))
    }
/// Fetches multiple items by ID, silently skipping not-found items.
/// Falls back to `list_items` if the ID list is empty.
pub fn get_items(
&self,
conn: &Connection,
ids: &[i64],
tags: &[String],
meta: &HashMap<String, Option<String>>,
) -> Result<Vec<ItemWithMeta>, CoreError> {
if ids.is_empty() {
return self.list_items(conn, tags, meta);
}
let mut results = Vec::new();
for id in ids {
match self.get_item(conn, *id) {
Ok(item) => results.push(item),
Err(CoreError::ItemNotFound(_)) => continue,
Err(e) => return Err(e),
}
}
Ok(results)
}
/// Save an item with granular control over compression and meta plugins.
///
/// This method allows callers to control whether compression and meta plugins
/// run server-side or were already handled by the client.
///
/// # Arguments
///
/// * `conn` - Database connection.
/// * `content` - Raw content bytes.
/// * `tags` - Tags to associate with the item.
/// * `metadata` - Client-provided metadata.
/// * `compress` - Whether the server should compress the content.
/// * `run_meta` - Whether the server should run meta plugins.
/// * `settings` - Application settings.
///
/// # Returns
///
/// * `Result<ItemWithMeta, CoreError>` - The saved item with full details.
#[allow(clippy::too_many_arguments)]
pub fn save_item_raw(
&self,
conn: &mut Connection,
content: &[u8],
tags: Vec<String>,
metadata: HashMap<String, String>,
compress: bool,
run_meta: bool,
settings: &Settings,
) -> Result<ItemWithMeta, CoreError> {
let mut cursor = Cursor::new(content);
self.save_item_raw_streaming(
conn,
&mut cursor,
tags,
metadata,
compress,
run_meta,
None,
None,
settings,
)
}
    /// Save an item from a streaming reader with granular control over compression.
    ///
    /// Unlike `save_item_raw` which takes a pre-buffered `&[u8]`, this method
    /// reads from the reader in chunks and writes directly to the compression
    /// engine, avoiding buffering the entire content in memory.
    ///
    /// # Arguments
    ///
    /// * `conn` - Mutable database connection.
    /// * `reader` - Source of the raw content bytes.
    /// * `tags` - Tags to associate; a default tag is ensured when empty.
    /// * `metadata` - Client-provided metadata to store (the `uncompressed_size`
    ///   key is excluded).
    /// * `compress` - When true the server compresses with the configured engine;
    ///   when false the bytes are written as-is.
    /// * `run_meta` - Whether the server should run meta plugins over the content.
    /// * `client_compression_type` - When `compress` is false, the compression the
    ///   client already applied (recorded in the DB); defaults to `Raw`.
    /// * `import_ts` - Optional original timestamp to preserve on the new row.
    /// * `settings` - Application settings.
    ///
    /// # Returns
    ///
    /// * `Result<ItemWithMeta, CoreError>` - The saved item with full details.
    #[allow(clippy::too_many_arguments)]
    pub fn save_item_raw_streaming(
        &self,
        conn: &mut Connection,
        reader: &mut dyn Read,
        tags: Vec<String>,
        metadata: HashMap<String, String>,
        compress: bool,
        run_meta: bool,
        client_compression_type: Option<CompressionType>,
        import_ts: Option<DateTime<Utc>>,
        settings: &Settings,
    ) -> Result<ItemWithMeta, CoreError> {
        let mut cmd = Command::new("keep");
        let mut tags = tags;
        crate::modes::common::ensure_default_tag(&mut tags);
        // Choose the compression recorded in the DB and the engine used to write.
        // When the client already compressed, write raw but record the client's type.
        let (compression_type_for_db, compression_engine) = if compress {
            let ct = settings_compression_type(&mut cmd, settings);
            let engine = get_compression_engine(ct.clone())?;
            (ct, engine)
        } else {
            let ct = client_compression_type.unwrap_or(CompressionType::Raw);
            let engine = get_compression_engine(CompressionType::Raw)?;
            (ct, engine)
        };
        let item_id;
        let mut item;
        {
            // Preserve the original timestamp on import; otherwise create normally.
            item = if let Some(ts) = import_ts {
                db::insert_item_with_ts(conn, ts, &compression_type_for_db.to_string())?
            } else {
                db::create_item(conn, compression_type_for_db.clone())?
            };
            item_id = item
                .id
                .ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
            db::set_item_tags(conn, item.clone(), &tags)?;
        }
        let (meta_service, collected_meta) = MetaService::with_collector();
        let mut plugins = if run_meta {
            meta_service.get_plugins(&mut cmd, settings)
        } else {
            Vec::new()
        };
        if run_meta {
            meta_service.initialize_plugins(&mut plugins);
        }
        let mut item_path = self.data_path.clone();
        item_path.push(item_id.to_string());
        let mut item_out = compression_engine.create(item_path)?;
        let mut total_bytes = 0i64;
        // Single pass: stream chunks to disk and (optionally) into meta plugins.
        crate::common::stream_copy(reader, |chunk| {
            item_out.write_all(chunk)?;
            total_bytes += chunk.len() as i64;
            if run_meta {
                meta_service.process_chunk(&mut plugins, chunk);
            }
            Ok(())
        })?;
        item_out.flush()?;
        // Dropping the writer finalizes the compressed file on disk.
        drop(item_out);
        if run_meta {
            meta_service.finalize_plugins(&mut plugins);
        }
        if run_meta && let Ok(entries) = collected_meta.lock() {
            for (name, value) in entries.iter() {
                db::add_meta(conn, item_id, name, value)?;
            }
        }
        // Store client metadata. "uncompressed_size" is skipped — presumably
        // superseded by the size computed here (TODO confirm with callers).
        for (key, value) in &metadata {
            if key != "uncompressed_size" {
                db::add_meta(conn, item_id, key, value)?;
            }
        }
        item.size = Some(total_bytes);
        db::update_item(conn, item)?;
        // Re-read so the returned value includes all tags/metadata.
        self.get_item(conn, item_id)
    }
/// Runs specified meta plugins on an existing item's content and stores the results.
pub fn update_item_plugins(
&self,
conn: &mut Connection,
item_id: i64,
plugin_names: &[String],
metadata: HashMap<String, String>,
tags: &[String],
settings: &Settings,
) -> Result<ItemWithMeta, CoreError> {
let item = db::get_item(conn, item_id)?.ok_or_else(|| CoreError::ItemNotFound(item_id))?;
let (meta_service, collected_meta) = MetaService::with_collector();
let mut cmd = Command::new("keep");
let all_plugins = meta_service.get_plugins(&mut cmd, settings);
let mut plugins: Vec<Box<dyn crate::meta_plugin::MetaPlugin>> = all_plugins
.into_iter()
.filter(|p| {
let plugin_name = p.meta_type().to_string();
plugin_names.iter().any(|n| n == &plugin_name)
})
.collect();
if plugins.is_empty() && metadata.is_empty() {
return self.get_item(conn, item_id);
}
let mut item_path = self.data_path.clone();
item_path.push(item_id.to_string());
if !item_path.exists() {
return Err(CoreError::ItemNotFound(item_id));
}
if !plugins.is_empty() {
let compression_service = CompressionService::new();
let mut reader =
compression_service.stream_item_content(item_path, &item.compression)?;
meta_service.initialize_plugins(&mut plugins);
crate::common::stream_copy(&mut reader, |chunk| {
meta_service.process_chunk(&mut plugins, chunk);
Ok(())
})?;
meta_service.finalize_plugins(&mut plugins);
if let Ok(entries) = collected_meta.lock() {
for (name, value) in entries.iter() {
db::add_meta(conn, item_id, name, value)?;
}
}
}
for (key, value) in &metadata {
db::add_meta(conn, item_id, key, value)?;
}
for tag in tags {
db::upsert_tag(conn, item_id, tag)?;
}
self.get_item(conn, item_id)
}
}
/// A reader that applies a filter chain to the data as it's read.
///
/// Wraps an underlying reader and applies a filter chain to the data during read operations.
/// Buffers data as needed for filter processing.
///
/// # Fields
///
/// * `reader` - The underlying reader providing the data source.
/// * `filter_chain` - Optional filter chain to apply; `None` means raw pass-through.
/// * `buffer` - Internal buffer for holding filtered data not yet handed out.
/// * `buffer_pos` - Current read position in the internal buffer.
/// * `temp_buf` - Reusable 8 KiB scratch buffer for raw chunks, allocated once.
struct FilteringReader<R: Read> {
    reader: R,
    filter_chain: Option<filter_plugin::FilterChain>,
    buffer: Vec<u8>,
    buffer_pos: usize,
    temp_buf: Vec<u8>,
}
impl<R: Read> FilteringReader<R> {
/// Creates a new `FilteringReader` with the given reader and filter chain.
///
/// # Arguments
///
/// * `reader` - The underlying reader.
/// * `filter_chain` - Optional filter chain to apply.
///
/// # Returns
///
/// A new `FilteringReader`.
///
/// # Examples
///
/// ```ignore
/// let reader = std::io::Cursor::new(b"data");
/// let filter_chain = parse_filter_string("head(10)")?;
/// let filtered = FilteringReader::new(reader, Some(filter_chain));
/// ```
pub fn new(reader: R, filter_chain: Option<filter_plugin::FilterChain>) -> Self {
Self {
reader,
filter_chain,
buffer: Vec::new(),
buffer_pos: 0,
temp_buf: vec![0; 8192],
}
}
}
impl<R: Read> Read for FilteringReader<R> {
    /// Reads data, applying the filter chain if present.
    ///
    /// Serves previously buffered filtered output first; otherwise reads a raw
    /// chunk, runs it through the filter chain, buffers the output, and
    /// returns as much as fits in `buf`. With no filter chain, reads pass
    /// straight through to the underlying reader.
    ///
    /// NOTE(review): `chain.filter` is invoked once per raw chunk. If any
    /// filter withholds output until end-of-stream, that tail would be lost
    /// because EOF here simply returns `Ok(0)` — confirm `FilterChain`
    /// emits all output per call.
    ///
    /// # Arguments
    ///
    /// * `buf` - Buffer to fill with filtered data.
    ///
    /// # Returns
    ///
    /// * `io::Result<usize>` - Number of bytes read, or I/O error.
    ///
    /// # Errors
    ///
    /// Propagates errors from the underlying reader or filter operations.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // If we have data in our buffer, serve that first
        if self.buffer_pos < self.buffer.len() {
            let bytes_to_copy = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos);
            buf[..bytes_to_copy]
                .copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + bytes_to_copy]);
            self.buffer_pos += bytes_to_copy;
            return Ok(bytes_to_copy);
        }
        // Reset buffer for new data
        self.buffer.clear();
        self.buffer_pos = 0;
        // No filter chain — pass through directly, no intermediate buffer needed
        if self.filter_chain.is_none() {
            return self.reader.read(buf);
        }
        // Read chunks and process through the filter chain.
        // Loop because filters like skip_lines may consume entire chunks
        // without producing output — that is not EOF, we must keep reading.
        let chain = self.filter_chain.as_mut().unwrap();
        loop {
            // Never read more raw bytes than the caller asked for or than the
            // scratch buffer can hold.
            let to_read = std::cmp::min(buf.len(), self.temp_buf.len());
            let bytes_read = self.reader.read(&mut self.temp_buf[..to_read])?;
            if bytes_read == 0 {
                // True EOF from the underlying reader
                return Ok(0);
            }
            let mut input_cursor = std::io::Cursor::new(&self.temp_buf[..bytes_read]);
            chain.filter(&mut input_cursor, &mut self.buffer)?;
            if !self.buffer.is_empty() {
                // Hand out what fits; any remainder stays in `buffer` and is
                // served by the fast path at the top of the next call.
                let bytes_to_copy = std::cmp::min(buf.len(), self.buffer.len());
                buf[..bytes_to_copy].copy_from_slice(&self.buffer[..bytes_to_copy]);
                self.buffer_pos = bytes_to_copy;
                return Ok(bytes_to_copy);
            }
            // Filter produced no output for this chunk — read another
        }
    }
}