fix: correct critical bugs and improve pipe streaming performance

Critical bug fixes:
- save_item now returns real Item from database, not a hardcoded fake
- AsyncDataService::save() reuses self.sync_service instead of creating redundant instance
- GenerateStatus trait signature mismatch fixed (CLI/API decoupling)

Performance improvements (pipe path untouched):
- CompressionEngine::open() returns Box<dyn Read + Send> enabling true streaming
- mode_get eliminates triple full-file read (was sampling then re-reading entire file)
- FilteringReader adds fast-path bypass when no filters, pre-allocates temp buffer
- text.rs meta plugin processes &[u8] slice directly, eliminates data.to_vec() clone

API correctness:
- Tag parse errors now return 400 instead of being silently discarded
- compute_diff uses similar crate (LCS-based) instead of naive positional comparison

Cleanup:
- Modernize string formatting (format!({x})) across codebase
- Remove redundant DB query in get mode
- Derive Debug/ToSchema on public types
- Delete placeholder test files with no real assertions
- Extract parse_comma_tags utility function
This commit is contained in:
2026-03-11 20:45:05 -03:00
parent e8ea42506e
commit 8a8a6e1c4b
53 changed files with 813 additions and 640 deletions

View File

@@ -1,7 +1,7 @@
use crate::common::PIPESIZE;
use crate::compression_engine::{CompressionType, get_compression_engine};
use crate::compression_engine::{get_compression_engine, CompressionType};
use crate::config::Settings;
use crate::db::{self, Meta};
use crate::db::{self, Item, Meta};
use crate::filter_plugin;
use crate::modes::common::settings_compression_type;
use crate::services::compression_service::CompressionService;
@@ -53,10 +53,7 @@ impl ItemService {
/// let service = ItemService::new(PathBuf::from("/data"));
/// ```
pub fn new(data_path: PathBuf) -> Self {
debug!(
"ITEM_SERVICE: Creating new ItemService with data_path: {:?}",
data_path
);
debug!("ITEM_SERVICE: Creating new ItemService with data_path: {data_path:?}");
Self {
data_path,
compression_service: CompressionService::new(),
@@ -90,9 +87,9 @@ impl ItemService {
/// assert_eq!(item_with_meta.item.id, Some(1));
/// ```
pub fn get_item(&self, conn: &Connection, id: i64) -> Result<ItemWithMeta, CoreError> {
debug!("ITEM_SERVICE: Getting item with id: {}", id);
debug!("ITEM_SERVICE: Getting item with id: {id}");
let item = db::get_item(conn, id)?.ok_or(CoreError::ItemNotFound(id))?;
debug!("ITEM_SERVICE: Found item: {:?}", item);
debug!("ITEM_SERVICE: Found item: {item:?}");
let tags = db::get_item_tags(conn, &item)?;
debug!("ITEM_SERVICE: Found {} tags for item {}", tags.len(), id);
let meta = db::get_item_meta(conn, &item)?;
@@ -133,7 +130,7 @@ impl ItemService {
conn: &Connection,
id: i64,
) -> Result<ItemWithContent, CoreError> {
debug!("ITEM_SERVICE: Getting item content for id: {}", id);
debug!("ITEM_SERVICE: Getting item content for id: {id}");
let item_with_meta = self.get_item(conn, id)?;
let item_id = item_with_meta
.item
@@ -142,14 +139,13 @@ impl ItemService {
if item_id <= 0 {
return Err(CoreError::InvalidInput(format!(
"Invalid item ID: {}",
item_id
"Invalid item ID: {item_id}"
)));
}
let mut item_path = self.data_path.clone();
item_path.push(item_id.to_string());
debug!("ITEM_SERVICE: Reading content from path: {:?}", item_path);
debug!("ITEM_SERVICE: Reading content from path: {item_path:?}");
let content = self
.compression_service
@@ -287,7 +283,7 @@ impl ItemService {
self.filter_service
.create_filter_chain(Some(&filter_str))
.map_err(|e| {
CoreError::InvalidInput(format!("Failed to create filter chain: {}", e))
CoreError::InvalidInput(format!("Failed to create filter chain: {e}"))
})?
} else {
None
@@ -333,8 +329,7 @@ impl ItemService {
if item_id <= 0 {
return Err(CoreError::InvalidInput(format!(
"Invalid item ID: {}",
item_id
"Invalid item ID: {item_id}"
)));
}
@@ -361,6 +356,45 @@ impl ItemService {
Ok((filtered_reader, mime_type, is_binary))
}
/// Like `get_item_content_info_streaming_with_chain` but accepts a pre-fetched `ItemWithMeta`
/// to avoid redundant DB queries.
pub fn get_item_content_info_streaming_with_item(
&self,
item_with_meta: ItemWithMeta,
filter_chain: Option<&filter_plugin::FilterChain>,
) -> Result<(Box<dyn Read + Send>, String, bool), CoreError> {
let item_id = item_with_meta
.item
.id
.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
if item_id <= 0 {
return Err(CoreError::InvalidInput(format!(
"Invalid item ID: {item_id}"
)));
}
let mut item_path = self.data_path.clone();
item_path.push(item_id.to_string());
let reader = self
.compression_service
.stream_item_content(item_path.clone(), &item_with_meta.item.compression)?;
let filtered_reader = Box::new(FilteringReader::new(reader, filter_chain.cloned()));
let metadata = item_with_meta.meta_as_map();
let mime_type = metadata
.get("mime_type")
.map(|s| s.to_string())
.unwrap_or_else(|| "application/octet-stream".to_string());
let is_binary =
self.is_content_binary(item_path, &item_with_meta.item.compression, &metadata)?;
Ok((filtered_reader, mime_type, is_binary))
}
/// Finds an item by ID or tags/metadata criteria.
///
/// Supports lookup by ID, last item, or search by tags/metadata.
@@ -393,10 +427,7 @@ impl ItemService {
tags: &[String],
meta: &HashMap<String, String>,
) -> Result<ItemWithMeta, CoreError> {
debug!(
"ITEM_SERVICE: Finding item with ids: {:?}, tags: {:?}, meta: {:?}",
ids, tags, meta
);
debug!("ITEM_SERVICE: Finding item with ids: {ids:?}, tags: {tags:?}, meta: {meta:?}");
let item_maybe = match (ids.is_empty(), tags.is_empty() && meta.is_empty()) {
(false, _) => {
debug!("ITEM_SERVICE: Finding by ID: {}", ids[0]);
@@ -413,7 +444,7 @@ impl ItemService {
};
let item = item_maybe.ok_or(CoreError::ItemNotFoundGeneric)?;
debug!("ITEM_SERVICE: Found matching item: {:?}", item);
debug!("ITEM_SERVICE: Found matching item: {item:?}");
// Get tags and meta directly instead of calling get_item which makes redundant queries
let item_id = item
@@ -464,10 +495,7 @@ impl ItemService {
tags: &[String],
meta: &HashMap<String, String>,
) -> Result<Vec<ItemWithMeta>, CoreError> {
debug!(
"ITEM_SERVICE: Listing items with tags: {:?}, meta: {:?}",
tags, meta
);
debug!("ITEM_SERVICE: Listing items with tags: {tags:?}, meta: {meta:?}");
let items = db::get_items_matching(conn, &tags.to_vec(), meta)?;
debug!("ITEM_SERVICE: Found {} matching items", items.len());
@@ -532,16 +560,16 @@ impl ItemService {
/// item_service.delete_item(&mut conn, 1)?;
/// ```
pub fn delete_item(&self, conn: &mut Connection, id: i64) -> Result<(), CoreError> {
debug!("ITEM_SERVICE: Deleting item with id: {}", id);
debug!("ITEM_SERVICE: Deleting item with id: {id}");
if id <= 0 {
return Err(CoreError::InvalidInput(format!("Invalid item ID: {}", id)));
return Err(CoreError::InvalidInput(format!("Invalid item ID: {id}")));
}
let item = db::get_item(conn, id)?.ok_or(CoreError::ItemNotFound(id))?;
debug!("ITEM_SERVICE: Found item to delete: {:?}", item);
debug!("ITEM_SERVICE: Found item to delete: {item:?}");
let mut item_path = self.data_path.clone();
item_path.push(id.to_string());
debug!("ITEM_SERVICE: Deleting file at path: {:?}", item_path);
debug!("ITEM_SERVICE: Deleting file at path: {item_path:?}");
db::delete_item(conn, item)?;
fs::remove_file(&item_path).or_else(|e| {
@@ -551,7 +579,7 @@ impl ItemService {
Err(e)
}
})?;
debug!("ITEM_SERVICE: Successfully deleted item {}", id);
debug!("ITEM_SERVICE: Successfully deleted item {id}");
Ok(())
}
@@ -571,7 +599,7 @@ impl ItemService {
///
/// # Returns
///
/// * `Result<i64, CoreError>` - The ID of the new item.
/// * `Result<Item, CoreError>` - The saved item with full details (id, size, compression, timestamp).
///
/// # Errors
///
@@ -582,7 +610,7 @@ impl ItemService {
///
/// ```
/// let reader = std::io::stdin();
/// let id = item_service.save_item(reader, &mut cmd, &settings, &mut vec![], &mut conn)?;
/// let item = item_service.save_item(reader, &mut cmd, &settings, &mut vec![], &mut conn)?;
/// ```
pub fn save_item<R: Read>(
&self,
@@ -591,18 +619,15 @@ impl ItemService {
settings: &Settings,
tags: &mut Vec<String>,
conn: &mut Connection,
) -> Result<i64, CoreError> {
debug!("ITEM_SERVICE: Starting save_item with tags: {:?}", tags);
) -> Result<Item, CoreError> {
debug!("ITEM_SERVICE: Starting save_item with tags: {tags:?}");
if tags.is_empty() {
tags.push("none".to_string());
debug!("ITEM_SERVICE: No tags provided, using default 'none' tag");
}
let compression_type = settings_compression_type(cmd, settings);
debug!(
"ITEM_SERVICE: Using compression type: {:?}",
compression_type
);
debug!("ITEM_SERVICE: Using compression type: {compression_type:?}");
let compression_engine = get_compression_engine(compression_type.clone())?;
let item_id;
@@ -610,9 +635,9 @@ impl ItemService {
{
item = db::create_item(conn, compression_type.clone())?;
item_id = item.id.unwrap();
debug!("ITEM_SERVICE: Created new item with id: {}", item_id);
debug!("ITEM_SERVICE: Created new item with id: {item_id}");
db::set_item_tags(conn, item.clone(), tags)?;
debug!("ITEM_SERVICE: Set tags for item {}", item_id);
debug!("ITEM_SERVICE: Set tags for item {item_id}");
let item_meta = self.meta_service.collect_initial_meta();
debug!(
"ITEM_SERVICE: Collected {} initial meta entries",
@@ -643,7 +668,7 @@ impl ItemService {
let _ = std::io::stderr().flush();
} else {
let mut t = std::io::stderr();
let _ = writeln!(t, "KEEP: New item: {} tags: {:?}", item_id, tags);
let _ = writeln!(t, "KEEP: New item: {item_id} tags: {tags:?}");
}
}
@@ -654,7 +679,7 @@ impl ItemService {
let mut item_path = self.data_path.clone();
item_path.push(item_id.to_string());
debug!("ITEM_SERVICE: Writing item to path: {:?}", item_path);
debug!("ITEM_SERVICE: Writing item to path: {item_path:?}");
let mut item_out = compression_engine.create(item_path.clone())?;
@@ -673,7 +698,7 @@ impl ItemService {
self.meta_service
.process_chunk(&mut plugins, &buffer[..n], conn, item_id);
}
debug!("ITEM_SERVICE: Processed {} bytes total", total_bytes);
debug!("ITEM_SERVICE: Processed {total_bytes} bytes total");
item_out.flush()?;
drop(item_out);
@@ -687,7 +712,7 @@ impl ItemService {
debug!("ITEM_SERVICE: Save completed successfully");
Ok(item_id)
Ok(item)
}
/// Saves pre-loaded content as a new item, typically from MCP (Machine-Common-Processing) sources.
@@ -744,7 +769,7 @@ impl ItemService {
{
item = db::create_item(conn, compression_type.clone())?;
item_id = item.id.unwrap();
debug!("ITEM_SERVICE: Created MCP item with id: {}", item_id);
debug!("ITEM_SERVICE: Created MCP item with id: {item_id}");
// Add tags
for tag in tags {
@@ -764,7 +789,7 @@ impl ItemService {
let mut item_path = self.data_path.clone();
item_path.push(item_id.to_string());
debug!("ITEM_SERVICE: Writing MCP item to path: {:?}", item_path);
debug!("ITEM_SERVICE: Writing MCP item to path: {item_path:?}");
let mut writer = compression_engine.create(item_path.clone())?;
writer.write_all(content)?;
@@ -827,6 +852,7 @@ struct FilteringReader<R: Read> {
filter_chain: Option<filter_plugin::FilterChain>,
buffer: Vec<u8>,
buffer_pos: usize,
temp_buf: Vec<u8>,
}
impl<R: Read> FilteringReader<R> {
@@ -854,6 +880,7 @@ impl<R: Read> FilteringReader<R> {
filter_chain,
buffer: Vec::new(),
buffer_pos: 0,
temp_buf: vec![0; 8192],
}
}
}
@@ -898,19 +925,22 @@ impl<R: Read> Read for FilteringReader<R> {
self.buffer.clear();
self.buffer_pos = 0;
// Read from the original reader into a temporary buffer
let mut temp_buf = vec![0; buf.len()];
let bytes_read = self.reader.read(&mut temp_buf)?;
// No filter chain — pass through directly, no intermediate buffer needed
if self.filter_chain.is_none() {
return self.reader.read(buf);
}
// Read from the original reader into the reusable temp buffer
let to_read = std::cmp::min(buf.len(), self.temp_buf.len());
let bytes_read = self.reader.read(&mut self.temp_buf[..to_read])?;
if bytes_read == 0 {
return Ok(0);
}
// Process through the filter chain if it exists
// Process through the filter chain
if let Some(ref mut chain) = self.filter_chain {
// Use a cursor to read the input data
let mut input_cursor = std::io::Cursor::new(&temp_buf[..bytes_read]);
// Write filtered output to our buffer
let mut input_cursor = std::io::Cursor::new(&self.temp_buf[..bytes_read]);
chain.filter(&mut input_cursor, &mut self.buffer)?;
if !self.buffer.is_empty() {
@@ -919,13 +949,11 @@ impl<R: Read> Read for FilteringReader<R> {
self.buffer_pos = bytes_to_copy;
Ok(bytes_to_copy)
} else {
// No data produced by filter, try reading more
// No data produced by filter, signal to read more
Ok(0)
}
} else {
// No filter chain, just pass through
buf[..bytes_read].copy_from_slice(&temp_buf[..bytes_read]);
Ok(bytes_read)
unreachable!()
}
}
}