This commit is contained in:
Andrew Phillips
2026-02-19 13:57:39 -04:00
parent a72395fe83
commit fdeb5f7951
82 changed files with 2756 additions and 2018 deletions

View File

@@ -1,14 +1,14 @@
use crate::common::PIPESIZE;
use crate::compression_engine::{CompressionType, get_compression_engine};
use crate::config::Settings;
use crate::db::{self, Meta};
use crate::filter_plugin;
use crate::modes::common::settings_compression_type;
use crate::services::compression_service::CompressionService;
use crate::services::error::CoreError;
use crate::services::filter_service::FilterService;
use crate::services::meta_service::MetaService;
use crate::services::types::{ItemWithContent, ItemWithMeta};
use crate::db::{self, Meta};
use crate::compression_engine::{get_compression_engine, CompressionType};
use crate::modes::common::settings_compression_type;
use crate::filter_plugin;
use clap::Command;
use log::debug;
use rusqlite::Connection;
@@ -53,7 +53,10 @@ impl ItemService {
/// let service = ItemService::new(PathBuf::from("/data"));
/// ```
pub fn new(data_path: PathBuf) -> Self {
debug!("ITEM_SERVICE: Creating new ItemService with data_path: {:?}", data_path);
debug!(
"ITEM_SERVICE: Creating new ItemService with data_path: {:?}",
data_path
);
Self {
data_path,
compression_service: CompressionService::new(),
@@ -93,7 +96,11 @@ impl ItemService {
let tags = db::get_item_tags(conn, &item)?;
debug!("ITEM_SERVICE: Found {} tags for item {}", tags.len(), id);
let meta = db::get_item_meta(conn, &item)?;
debug!("ITEM_SERVICE: Found {} meta entries for item {}", meta.len(), id);
debug!(
"ITEM_SERVICE: Found {} meta entries for item {}",
meta.len(),
id
);
Ok(ItemWithMeta { item, tags, meta })
}
@@ -121,13 +128,23 @@ impl ItemService {
/// let item_with_content = item_service.get_item_content(&conn, 1)?;
/// assert!(!item_with_content.content.is_empty());
/// ```
pub fn get_item_content(&self, conn: &Connection, id: i64) -> Result<ItemWithContent, CoreError> {
pub fn get_item_content(
&self,
conn: &Connection,
id: i64,
) -> Result<ItemWithContent, CoreError> {
debug!("ITEM_SERVICE: Getting item content for id: {}", id);
let item_with_meta = self.get_item(conn, id)?;
let item_id = item_with_meta.item.id.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
let item_id = item_with_meta
.item
.id
.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
if item_id <= 0 {
return Err(CoreError::InvalidInput(format!("Invalid item ID: {}", item_id)));
return Err(CoreError::InvalidInput(format!(
"Invalid item ID: {}",
item_id
)));
}
let mut item_path = self.data_path.clone();
@@ -137,7 +154,11 @@ impl ItemService {
let content = self
.compression_service
.get_item_content(item_path, &item_with_meta.item.compression)?;
debug!("ITEM_SERVICE: Read {} bytes of content for item {}", content.len(), id);
debug!(
"ITEM_SERVICE: Read {} bytes of content for item {}",
content.len(),
id
);
Ok(ItemWithContent {
item_with_meta,
@@ -176,14 +197,13 @@ impl ItemService {
filter: Option<String>,
) -> Result<(Vec<u8>, String, bool), CoreError> {
// Use streaming approach to handle all filtering options consistently
let (mut reader, mime_type, is_binary) = self.get_item_content_info_streaming(
conn, id, filter
)?;
let (mut reader, mime_type, is_binary) =
self.get_item_content_info_streaming(conn, id, filter)?;
// Read all the filtered content into a buffer
let mut content = Vec::new();
reader.read_to_end(&mut content)?;
Ok((content, mime_type, is_binary))
}
@@ -222,13 +242,14 @@ impl ItemService {
}
// Read only the first 8192 bytes for binary detection
let mut sample_reader = self.compression_service.stream_item_content(
item_path,
compression
)?;
let mut sample_reader = self
.compression_service
.stream_item_content(item_path, compression)?;
let mut sample_buffer = vec![0; 8192];
let bytes_read = sample_reader.read(&mut sample_buffer)?;
Ok(crate::common::is_binary::is_binary(&sample_buffer[..bytes_read]))
Ok(crate::common::is_binary::is_binary(
&sample_buffer[..bytes_read],
))
}
/// Retrieves a streaming reader for item content with optional filtering.
@@ -263,12 +284,15 @@ impl ItemService {
) -> Result<(Box<dyn Read + Send>, String, bool), CoreError> {
// Convert filter string to FilterChain if provided
let filter_chain = if let Some(filter_str) = filter {
self.filter_service.create_filter_chain(Some(&filter_str))
.map_err(|e| CoreError::InvalidInput(format!("Failed to create filter chain: {}", e)))?
self.filter_service
.create_filter_chain(Some(&filter_str))
.map_err(|e| {
CoreError::InvalidInput(format!("Failed to create filter chain: {}", e))
})?
} else {
None
};
self.get_item_content_info_streaming_with_chain(conn, id, filter_chain.as_ref())
}
@@ -302,19 +326,24 @@ impl ItemService {
filter_chain: Option<&filter_plugin::FilterChain>,
) -> Result<(Box<dyn Read + Send>, String, bool), CoreError> {
let item_with_meta = self.get_item(conn, id)?;
let item_id = item_with_meta.item.id.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
let item_id = item_with_meta
.item
.id
.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
if item_id <= 0 {
return Err(CoreError::InvalidInput(format!("Invalid item ID: {}", item_id)));
return Err(CoreError::InvalidInput(format!(
"Invalid item ID: {}",
item_id
)));
}
let mut item_path = self.data_path.clone();
item_path.push(item_id.to_string());
let reader = self.compression_service.stream_item_content(
item_path.clone(),
&item_with_meta.item.compression
)?;
let reader = self
.compression_service
.stream_item_content(item_path.clone(), &item_with_meta.item.compression)?;
// Wrap the reader with filtering
let filtered_reader = Box::new(FilteringReader::new(reader, filter_chain.cloned()));
@@ -326,11 +355,8 @@ impl ItemService {
.unwrap_or_else(|| "application/octet-stream".to_string());
// Check if content is binary
let is_binary = self.is_content_binary(
item_path,
&item_with_meta.item.compression,
&metadata
)?;
let is_binary =
self.is_content_binary(item_path, &item_with_meta.item.compression, &metadata)?;
Ok((filtered_reader, mime_type, is_binary))
}
@@ -360,17 +386,26 @@ impl ItemService {
/// ```
/// let item = item_service.find_item(&conn, vec![1], &vec![], &HashMap::new())?;
/// ```
pub fn find_item(&self, conn: &Connection, ids: &[i64], tags: &[String], meta: &HashMap<String, String>) -> Result<ItemWithMeta, CoreError> {
debug!("ITEM_SERVICE: Finding item with ids: {:?}, tags: {:?}, meta: {:?}", ids, tags, meta);
pub fn find_item(
&self,
conn: &Connection,
ids: &[i64],
tags: &[String],
meta: &HashMap<String, String>,
) -> Result<ItemWithMeta, CoreError> {
debug!(
"ITEM_SERVICE: Finding item with ids: {:?}, tags: {:?}, meta: {:?}",
ids, tags, meta
);
let item_maybe = match (ids.is_empty(), tags.is_empty() && meta.is_empty()) {
(false, _) => {
debug!("ITEM_SERVICE: Finding by ID: {}", ids[0]);
db::get_item(conn, ids[0])?
},
}
(true, true) => {
debug!("ITEM_SERVICE: Finding last item");
db::get_item_last(conn)?
},
}
(true, false) => {
debug!("ITEM_SERVICE: Finding by tags/meta");
db::get_item_matching(conn, &tags.to_vec(), meta)?
@@ -381,11 +416,21 @@ impl ItemService {
debug!("ITEM_SERVICE: Found matching item: {:?}", item);
// Get tags and meta directly instead of calling get_item which makes redundant queries
let item_id = item.id.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
let item_id = item
.id
.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
let tags = db::get_item_tags(conn, &item)?;
debug!("ITEM_SERVICE: Found {} tags for item {}", tags.len(), item_id);
debug!(
"ITEM_SERVICE: Found {} tags for item {}",
tags.len(),
item_id
);
let meta = db::get_item_meta(conn, &item)?;
debug!("ITEM_SERVICE: Found {} meta entries for item {}", meta.len(), item_id);
debug!(
"ITEM_SERVICE: Found {} meta entries for item {}",
meta.len(),
item_id
);
Ok(ItemWithMeta { item, tags, meta })
}
@@ -413,8 +458,16 @@ impl ItemService {
/// ```
/// let items = item_service.list_items(&conn, &vec!["work"], &HashMap::new())?;
/// ```
pub fn list_items(&self, conn: &Connection, tags: &[String], meta: &HashMap<String, String>) -> Result<Vec<ItemWithMeta>, CoreError> {
debug!("ITEM_SERVICE: Listing items with tags: {:?}, meta: {:?}", tags, meta);
pub fn list_items(
&self,
conn: &Connection,
tags: &[String],
meta: &HashMap<String, String>,
) -> Result<Vec<ItemWithMeta>, CoreError> {
debug!(
"ITEM_SERVICE: Listing items with tags: {:?}, meta: {:?}",
tags, meta
);
let items = db::get_items_matching(conn, &tags.to_vec(), meta)?;
debug!("ITEM_SERVICE: Found {} matching items", items.len());
@@ -424,7 +477,10 @@ impl ItemService {
return Ok(Vec::new());
}
debug!("ITEM_SERVICE: Getting tags and meta for {} items", item_ids.len());
debug!(
"ITEM_SERVICE: Getting tags and meta for {} items",
item_ids.len()
);
let tags_map = db::get_tags_for_items(conn, &item_ids)?;
let meta_map_db = db::get_meta_for_items(conn, &item_ids)?;
@@ -433,12 +489,22 @@ impl ItemService {
let item_id = item.id.unwrap();
let tags = tags_map.get(&item_id).cloned().unwrap_or_default();
let meta_hm = meta_map_db.get(&item_id).cloned().unwrap_or_default();
let meta = meta_hm.into_iter().map(|(name, value)| Meta { id: item_id, name, value }).collect();
let meta = meta_hm
.into_iter()
.map(|(name, value)| Meta {
id: item_id,
name,
value,
})
.collect();
result.push(ItemWithMeta { item, tags, meta });
}
debug!("ITEM_SERVICE: Returning {} items with full metadata", result.len());
debug!(
"ITEM_SERVICE: Returning {} items with full metadata",
result.len()
);
Ok(result)
}
@@ -478,7 +544,13 @@ impl ItemService {
debug!("ITEM_SERVICE: Deleting file at path: {:?}", item_path);
db::delete_item(conn, item)?;
fs::remove_file(&item_path).or_else(|e| if e.kind() == std::io::ErrorKind::NotFound { Ok(()) } else { Err(e) })?;
fs::remove_file(&item_path).or_else(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
Ok(())
} else {
Err(e)
}
})?;
debug!("ITEM_SERVICE: Successfully deleted item {}", id);
Ok(())
@@ -522,12 +594,15 @@ impl ItemService {
) -> Result<i64, CoreError> {
debug!("ITEM_SERVICE: Starting save_item with tags: {:?}", tags);
if tags.is_empty() {
tags.push("none".to_string());
debug!("ITEM_SERVICE: No tags provided, using default 'none' tag");
tags.push("none".to_string());
debug!("ITEM_SERVICE: No tags provided, using default 'none' tag");
}
let compression_type = settings_compression_type(cmd, settings);
debug!("ITEM_SERVICE: Using compression type: {:?}", compression_type);
debug!(
"ITEM_SERVICE: Using compression type: {:?}",
compression_type
);
let compression_engine = get_compression_engine(compression_type.clone())?;
let item_id;
@@ -539,9 +614,12 @@ impl ItemService {
db::set_item_tags(conn, item.clone(), tags)?;
debug!("ITEM_SERVICE: Set tags for item {}", item_id);
let item_meta = self.meta_service.collect_initial_meta();
debug!("ITEM_SERVICE: Collected {} initial meta entries", item_meta.len());
debug!(
"ITEM_SERVICE: Collected {} initial meta entries",
item_meta.len()
);
for (k, v) in item_meta.iter() {
db::add_meta(conn, item_id, k, v)?;
db::add_meta(conn, item_id, k, v)?;
}
}
@@ -571,7 +649,8 @@ impl ItemService {
let mut plugins = self.meta_service.get_plugins(cmd, settings);
debug!("ITEM_SERVICE: Got {} meta plugins", plugins.len());
self.meta_service.initialize_plugins(&mut plugins, conn, item_id);
self.meta_service
.initialize_plugins(&mut plugins, conn, item_id);
let mut item_path = self.data_path.clone();
item_path.push(item_id.to_string());
@@ -585,11 +664,14 @@ impl ItemService {
debug!("ITEM_SERVICE: Starting to read and process input data");
loop {
let n = input.read(&mut buffer)?;
if n == 0 { break; }
if n == 0 {
break;
}
total_bytes += n as i64;
item_out.write_all(&buffer[..n])?;
self.meta_service.process_chunk(&mut plugins, &buffer[..n], conn, item_id);
self.meta_service
.process_chunk(&mut plugins, &buffer[..n], conn, item_id);
}
debug!("ITEM_SERVICE: Processed {} bytes total", total_bytes);
@@ -597,7 +679,8 @@ impl ItemService {
drop(item_out);
debug!("ITEM_SERVICE: Finalizing meta plugins");
self.meta_service.finalize_plugins(&mut plugins, conn, item_id);
self.meta_service
.finalize_plugins(&mut plugins, conn, item_id);
item.size = Some(total_bytes);
db::update_item(conn, item.clone())?;
@@ -646,8 +729,12 @@ impl ItemService {
settings: &Settings,
conn: &mut Connection,
) -> Result<ItemWithMeta, CoreError> {
debug!("ITEM_SERVICE: Starting save_item_from_mcp with {} bytes, {} tags, {} metadata entries",
content.len(), tags.len(), metadata.len());
debug!(
"ITEM_SERVICE: Starting save_item_from_mcp with {} bytes, {} tags, {} metadata entries",
content.len(),
tags.len(),
metadata.len()
);
let compression_type = CompressionType::LZ4;
let compression_engine = get_compression_engine(compression_type.clone())?;
@@ -669,7 +756,10 @@ impl ItemService {
for (key, value) in metadata {
db::add_meta(conn, item_id, key, value)?;
}
debug!("ITEM_SERVICE: Added {} custom metadata entries to MCP item", metadata.len());
debug!(
"ITEM_SERVICE: Added {} custom metadata entries to MCP item",
metadata.len()
);
}
let mut item_path = self.data_path.clone();
@@ -681,11 +771,17 @@ impl ItemService {
drop(writer);
let mut plugins = self.meta_service.get_plugins(cmd, settings);
debug!("ITEM_SERVICE: Got {} configured meta plugins for MCP item", plugins.len());
self.meta_service.initialize_plugins(&mut plugins, conn, item_id);
self.meta_service.process_chunk(&mut plugins, content, conn, item_id);
self.meta_service.finalize_plugins(&mut plugins, conn, item_id);
debug!(
"ITEM_SERVICE: Got {} configured meta plugins for MCP item",
plugins.len()
);
self.meta_service
.initialize_plugins(&mut plugins, conn, item_id);
self.meta_service
.process_chunk(&mut plugins, content, conn, item_id);
self.meta_service
.finalize_plugins(&mut plugins, conn, item_id);
debug!("ITEM_SERVICE: Processed MCP item through configured meta plugins");
item.size = Some(content.len() as i64);
@@ -713,7 +809,6 @@ impl ItemService {
pub fn get_data_path(&self) -> &PathBuf {
&self.data_path
}
}
/// A reader that applies a filter chain to the data as it's read.
@@ -754,8 +849,8 @@ impl<R: Read> FilteringReader<R> {
/// let filtered = FilteringReader::new(reader, Some(filter_chain));
/// ```
pub fn new(reader: R, filter_chain: Option<filter_plugin::FilterChain>) -> Self {
Self {
reader,
Self {
reader,
filter_chain,
buffer: Vec::new(),
buffer_pos: 0,
@@ -793,7 +888,8 @@ impl<R: Read> Read for FilteringReader<R> {
// If we have data in our buffer, serve that first
if self.buffer_pos < self.buffer.len() {
let bytes_to_copy = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos);
buf[..bytes_to_copy].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + bytes_to_copy]);
buf[..bytes_to_copy]
.copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + bytes_to_copy]);
self.buffer_pos += bytes_to_copy;
return Ok(bytes_to_copy);
}
@@ -816,7 +912,7 @@ impl<R: Read> Read for FilteringReader<R> {
let mut input_cursor = std::io::Cursor::new(&temp_buf[..bytes_read]);
// Write filtered output to our buffer
chain.filter(&mut input_cursor, &mut self.buffer)?;
if !self.buffer.is_empty() {
let bytes_to_copy = std::cmp::min(buf.len(), self.buffer.len());
buf[..bytes_to_copy].copy_from_slice(&self.buffer[..bytes_to_copy]);