diff --git a/src/services/item_service.rs b/src/services/item_service.rs index 4c15f69..4439dbf 100644 --- a/src/services/item_service.rs +++ b/src/services/item_service.rs @@ -16,6 +16,79 @@ use std::fs; use std::io::{IsTerminal, Read, Write}; use std::path::PathBuf; +/// A reader that applies a filter chain to the data as it's read +struct FilteringReader { + reader: R, + filter_chain: Option, + buffer: Vec, + buffer_pos: usize, +} + +impl FilteringReader { + pub fn new(reader: R, filter_chain: Option) -> Self { + Self { + reader, + filter_chain, + buffer: Vec::new(), + buffer_pos: 0, + } + } +} + +impl Read for FilteringReader { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + // If we have data in our buffer, serve that first + if self.buffer_pos < self.buffer.len() { + let bytes_to_copy = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos); + buf[..bytes_to_copy].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + bytes_to_copy]); + self.buffer_pos += bytes_to_copy; + return Ok(bytes_to_copy); + } + + // Reset buffer for new data + self.buffer.clear(); + self.buffer_pos = 0; + + // Read from the original reader + let mut temp_buf = vec![0; buf.len()]; + let bytes_read = self.reader.read(&mut temp_buf)?; + + if bytes_read == 0 { + // If we're at EOF, process any remaining data in the filter chain + if let Some(chain) = &mut self.filter_chain { + let finished_data = chain.finish()?; + if !finished_data.is_empty() { + self.buffer = finished_data; + let bytes_to_copy = std::cmp::min(buf.len(), self.buffer.len()); + buf[..bytes_to_copy].copy_from_slice(&self.buffer[..bytes_to_copy]); + self.buffer_pos = bytes_to_copy; + return Ok(bytes_to_copy); + } + } + return Ok(0); + } + + // Process through the filter chain if it exists + if let Some(chain) = &mut self.filter_chain { + let processed_data = chain.process(&temp_buf[..bytes_read])?; + if !processed_data.is_empty() { + self.buffer = processed_data; + let bytes_to_copy = std::cmp::min(buf.len(), self.buffer.len()); + buf[..bytes_to_copy].copy_from_slice(&self.buffer[..bytes_to_copy]); + self.buffer_pos = bytes_to_copy; + Ok(bytes_to_copy) + } else { + // No data produced by filter, try reading more + Ok(0) + } + } else { + // No filter chain, just pass through + buf[..bytes_read].copy_from_slice(&temp_buf[..bytes_read]); + Ok(bytes_read) + } + } +} + pub struct ItemService { data_path: PathBuf, compression_service: CompressionService,