diff --git a/src/filter_plugin/grep.rs b/src/filter_plugin/grep.rs index e69de29..5bf0e2f 100644 --- a/src/filter_plugin/grep.rs +++ b/src/filter_plugin/grep.rs @@ -0,0 +1,65 @@ +use super::FilterPlugin; +use std::io::Result; +use regex::Regex; + +pub struct GrepFilter { + regex: Regex, + buffer: Vec, +} + +impl GrepFilter { + pub fn new(pattern: String) -> Result { + let regex = Regex::new(&pattern) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?; + Ok(Self { + regex, + buffer: Vec::new(), + }) + } +} + +impl FilterPlugin for GrepFilter { + fn process(&mut self, data: &[u8]) -> Result> { + self.buffer.extend_from_slice(data); + + let mut result = Vec::new(); + let mut lines = Vec::new(); + let mut start = 0; + + // Split into lines + for (i, &byte) in self.buffer.iter().enumerate() { + if byte == b'\n' { + lines.push(&self.buffer[start..=i]); + start = i + 1; + } + } + + // Keep the remaining data in buffer + self.buffer.drain(0..start); + + // Filter lines that match the regex + for line in lines { + if let Ok(line_str) = std::str::from_utf8(line) { + if self.regex.is_match(line_str) { + result.extend_from_slice(line); + } + } + } + + Ok(result) + } + + fn finish(&mut self) -> Result> { + // Process any remaining data in buffer + let mut result = Vec::new(); + if !self.buffer.is_empty() { + if let Ok(line_str) = std::str::from_utf8(&self.buffer) { + if self.regex.is_match(line_str) { + result.extend_from_slice(&self.buffer); + } + } + self.buffer.clear(); + } + Ok(result) + } +} diff --git a/src/filter_plugin/head.rs b/src/filter_plugin/head.rs index e69de29..e720fe9 100644 --- a/src/filter_plugin/head.rs +++ b/src/filter_plugin/head.rs @@ -0,0 +1,86 @@ +use super::FilterPlugin; +use std::io::Result; + +pub struct HeadBytesFilter { + remaining: usize, +} + +impl HeadBytesFilter { + pub fn new(count: usize) -> Self { + Self { + remaining: count, + } + } +} + +impl FilterPlugin for HeadBytesFilter { + fn process(&mut self, data: &[u8]) -> Result> { + if self.remaining == 0 { + return Ok(Vec::new()); + } + + let bytes_to_take = std::cmp::min(data.len(), self.remaining); + self.remaining -= bytes_to_take; + Ok(data[..bytes_to_take].to_vec()) + } + + fn finish(&mut self) -> Result> { + Ok(Vec::new()) + } +} + +pub struct HeadLinesFilter { + remaining: usize, + buffer: Vec, +} + +impl HeadLinesFilter { + pub fn new(count: usize) -> Self { + Self { + remaining: count, + buffer: Vec::new(), + } + } +} + +impl FilterPlugin for HeadLinesFilter { + fn process(&mut self, data: &[u8]) -> Result> { + if self.remaining == 0 { + return Ok(Vec::new()); + } + + let mut result = Vec::new(); + let mut start = 0; + + for (i, &byte) in data.iter().enumerate() { + if byte == b'\n' { + self.buffer.extend_from_slice(&data[start..=i]); + result.extend_from_slice(&self.buffer); + self.buffer.clear(); + start = i + 1; + + self.remaining -= 1; + if self.remaining == 0 { + break; + } + } + } + + // Add remaining data to buffer + if start < data.len() { + self.buffer.extend_from_slice(&data[start..]); + } + + Ok(result) + } + + fn finish(&mut self) -> Result> { + if self.remaining > 0 && !self.buffer.is_empty() { + let result = self.buffer.clone(); + self.buffer.clear(); + Ok(result) + } else { + Ok(Vec::new()) + } + } +} diff --git a/src/filter_plugin/mod.rs b/src/filter_plugin/mod.rs index e69de29..2b8ebbb 100644 --- a/src/filter_plugin/mod.rs +++ b/src/filter_plugin/mod.rs @@ -0,0 +1,88 @@ +use std::io::{Read, Result}; +use regex::Regex; +use ringbuf::HeapRb; + +pub mod head; +pub mod tail; +pub mod grep; +pub mod skip; + +pub trait FilterPlugin: Send { + fn process(&mut self, data: &[u8]) -> Result>; + fn finish(&mut self) -> Result>; +} + +pub struct FilterChain { + plugins: Vec>, +} + +impl FilterChain { + pub fn new() -> Self { + Self { + plugins: Vec::new(), + } + } + + pub fn add_plugin(&mut self, plugin: Box) { + self.plugins.push(plugin); + } + + pub fn process(&mut self, data: &[u8]) -> Result> { + let mut current_data = data.to_vec(); + for plugin in &mut self.plugins { + current_data = plugin.process(¤t_data)?; + } + Ok(current_data) + } + + pub fn finish(&mut self) -> Result> { + let mut current_data = Vec::new(); + for plugin in &mut self.plugins { + let processed = plugin.finish()?; + if !processed.is_empty() { + current_data = processed; + } + } + Ok(current_data) + } +} + +// Helper function to parse filter string and create appropriate plugins +pub fn parse_filter_string(filter_str: &str) -> Result { + let mut chain = FilterChain::new(); + + for part in filter_str.split('|') { + let part = part.trim(); + if part.is_empty() { + continue; + } + + if let Some(stripped) = part.strip_prefix("grep(").and_then(|s| s.strip_suffix(')')) { + // Remove quotes if present + let pattern = stripped.trim_matches(|c| c == '\'' || c == '"'); + chain.add_plugin(Box::new(grep::GrepFilter::new(pattern.to_string())?)); + } else if let Some(stripped) = part.strip_prefix("head_bytes(").and_then(|s| s.strip_suffix(')')) { + let count: usize = stripped.parse().map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?; + chain.add_plugin(Box::new(head::HeadBytesFilter::new(count))); + } else if let Some(stripped) = part.strip_prefix("head_lines(").and_then(|s| s.strip_suffix(')')) { + let count: usize = stripped.parse().map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?; + chain.add_plugin(Box::new(head::HeadLinesFilter::new(count))); + } else if let Some(stripped) = part.strip_prefix("tail_bytes(").and_then(|s| s.strip_suffix(')')) { + let count: usize = stripped.parse().map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?; + chain.add_plugin(Box::new(tail::TailBytesFilter::new(count)?)); + } else if let Some(stripped) = part.strip_prefix("tail_lines(").and_then(|s| s.strip_suffix(')')) { + let count: usize = stripped.parse().map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?; + chain.add_plugin(Box::new(tail::TailLinesFilter::new(count)?)); + } else if let Some(stripped) = part.strip_prefix("skip_bytes(").and_then(|s| s.strip_suffix(')')) { + let count: usize = stripped.parse().map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?; + chain.add_plugin(Box::new(skip::SkipBytesFilter::new(count))); + } else if let Some(stripped) = part.strip_prefix("skip_lines(").and_then(|s| s.strip_suffix(')')) { + let count: usize = stripped.parse().map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?; + chain.add_plugin(Box::new(skip::SkipLinesFilter::new(count))); + } else { + return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, format!("Unknown filter: {}", part))); + } + } + + Ok(chain) +} diff --git a/src/filter_plugin/skip.rs b/src/filter_plugin/skip.rs new file mode 100644 index 0000000..9feb243 --- /dev/null +++ b/src/filter_plugin/skip.rs @@ -0,0 +1,98 @@ +use super::FilterPlugin; +use std::io::Result; + +pub struct SkipBytesFilter { + remaining: usize, +} + +impl SkipBytesFilter { + pub fn new(count: usize) -> Self { + Self { + remaining: count, + } + } +} + +impl FilterPlugin for SkipBytesFilter { + fn process(&mut self, data: &[u8]) -> Result> { + if self.remaining == 0 { + return Ok(data.to_vec()); + } + + if data.len() <= self.remaining { + self.remaining -= data.len(); + Ok(Vec::new()) + } else { + let result = data[self.remaining..].to_vec(); + self.remaining = 0; + Ok(result) + } + } + + fn finish(&mut self) -> Result> { + Ok(Vec::new()) + } +} + +pub struct SkipLinesFilter { + remaining: usize, + buffer: Vec, +} + +impl SkipLinesFilter { + pub fn new(count: usize) -> Self { + Self { + remaining: count, + buffer: Vec::new(), + } + } +} + +impl FilterPlugin for SkipLinesFilter { + fn process(&mut self, data: &[u8]) -> Result> { + if self.remaining == 0 { + let mut result = self.buffer.clone(); + result.extend_from_slice(data); + self.buffer.clear(); + return Ok(result); + } + + let mut result = Vec::new(); + let mut start = 0; + + for (i, &byte) in data.iter().enumerate() { + if byte == b'\n' { + if self.remaining > 0 { + self.remaining -= 1; + start = i + 1; + } else { + self.buffer.extend_from_slice(&data[start..=i]); + result.extend_from_slice(&self.buffer); + self.buffer.clear(); + start = i + 1; + } + } + } + + // Add remaining data to buffer + if start < data.len() { + if self.remaining == 0 { + result.extend_from_slice(&data[start..]); + } else { + self.buffer.extend_from_slice(&data[start..]); + } + } + + Ok(result) + } + + fn finish(&mut self) -> Result> { + if self.remaining == 0 { + let result = self.buffer.clone(); + self.buffer.clear(); + Ok(result) + } else { + Ok(Vec::new()) + } + } +} diff --git a/src/filter_plugin/tail.rs b/src/filter_plugin/tail.rs index e69de29..9d83bbc 100644 --- a/src/filter_plugin/tail.rs +++ b/src/filter_plugin/tail.rs @@ -0,0 +1,95 @@ +use super::FilterPlugin; +use std::io::Result; +use ringbuf::HeapRb; + +pub struct TailBytesFilter { + ring_buffer: HeapRb, + count: usize, +} + +impl TailBytesFilter { + pub fn new(count: usize) -> Result { + Ok(Self { + ring_buffer: HeapRb::new(count), + count, + }) + } +} + +impl FilterPlugin for TailBytesFilter { + fn process(&mut self, data: &[u8]) -> Result> { + for &byte in data { + let _ = self.ring_buffer.push(byte); + } + Ok(Vec::new()) + } + + fn finish(&mut self) -> Result> { + let mut result = Vec::with_capacity(self.ring_buffer.len()); + for byte in self.ring_buffer.iter() { + result.push(*byte); + } + Ok(result) + } +} + +pub struct TailLinesFilter { + ring_buffer: HeapRb, + count: usize, + lines_found: usize, +} + +impl TailLinesFilter { + pub fn new(count: usize) -> Result { + Ok(Self { + ring_buffer: HeapRb::new(count * 256), // Estimate 256 bytes per line + count, + lines_found: 0, + }) + } +} + +impl FilterPlugin for TailLinesFilter { + fn process(&mut self, data: &[u8]) -> Result> { + for &byte in data { + let _ = self.ring_buffer.push(byte); + if byte == b'\n' { + self.lines_found += 1; + } + } + Ok(Vec::new()) + } + + fn finish(&mut self) -> Result> { + // Count lines in the buffer to find where to start + let mut lines_to_keep = std::cmp::min(self.count, self.lines_found); + let mut bytes_to_keep = 0; + let mut lines_seen = 0; + + // Iterate backwards to find the starting point + for i in (0..self.ring_buffer.len()).rev() { + let index = (self.ring_buffer.write_index() as isize - 1 - i as isize) + .rem_euclid(self.ring_buffer.capacity() as isize) as usize; + let byte = self.ring_buffer[index]; + + if byte == b'\n' { + lines_seen += 1; + if lines_seen > lines_to_keep { + break; + } + } + bytes_to_keep += 1; + } + + // Extract the relevant bytes + let start_index = self.ring_buffer.len() - bytes_to_keep; + let mut result = Vec::with_capacity(bytes_to_keep); + for i in start_index..self.ring_buffer.len() { + let index = (self.ring_buffer.write_index() as isize - (self.ring_buffer.len() - i) as isize) + .rem_euclid(self.ring_buffer.capacity() as isize) as usize; + result.push(self.ring_buffer[index]); + } + + Ok(result) + } +} diff --git a/src/services/filter_service.rs b/src/services/filter_service.rs index e69de29..a3ff476 100644 --- a/src/services/filter_service.rs +++ b/src/services/filter_service.rs @@ -0,0 +1,34 @@ +use crate::filter_plugin::{FilterChain, parse_filter_string}; +use std::io::Result; + +pub struct FilterService; + +impl FilterService { + pub fn new() -> Self { + Self + } + + pub fn create_filter_chain(&self, filter_str: Option<&str>) -> Result> { + if let Some(filter_str) = filter_str { + parse_filter_string(filter_str).map(Some) + } else { + Ok(None) + } + } + + pub fn process_data(&self, chain: &mut Option, data: &[u8]) -> Result> { + if let Some(chain) = chain { + chain.process(data) + } else { + Ok(data.to_vec()) + } + } + + pub fn finish_processing(&self, chain: &mut Option) -> Result> { + if let Some(chain) = chain { + chain.finish() + } else { + Ok(Vec::new()) + } + } +} diff --git a/src/services/item_service.rs b/src/services/item_service.rs index 7d74762..9b4703d 100644 --- a/src/services/item_service.rs +++ b/src/services/item_service.rs @@ -107,6 +107,7 @@ impl ItemService { line_start: Option, line_end: Option, grep: Option, + filter: Option, ) -> Result<(Box, String, bool), CoreError> { let item_with_meta = self.get_item(conn, id)?; let item_id = item_with_meta.item.id.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?; @@ -120,35 +121,40 @@ impl ItemService { let reader = self.compression_service.stream_item_content(item_path.clone(), &item_with_meta.item.compression)?; - // Apply content filtering - let filtered_reader: Box = if let Some(pattern) = grep { - Box::new(GrepFilter::new( - reader, - pattern, - )?) - } else if head_bytes.is_some() || head_words.is_some() || head_lines.is_some() { - Box::new(HeadFilter::new( - reader, - head_bytes, - head_words, - head_lines, - )) - } else if tail_bytes.is_some() || tail_words.is_some() || tail_lines.is_some() { - Box::new(TailFilter::new( - reader, - tail_bytes, - tail_words, - tail_lines, - )?) - } else if line_start.is_some() || line_end.is_some() { - Box::new(LineRangeFilter::new( - reader, - line_start, - line_end, - )) - } else { - Box::new(reader) - }; + // Build filter string from individual parameters (for backward compatibility) + let mut filter_parts = Vec::new(); + if let Some(pattern) = grep { + filter_parts.push(format!("grep('{}')", pattern)); + } + if let Some(bytes) = head_bytes { + filter_parts.push(format!("head_bytes({})", bytes)); + } + if let Some(lines) = head_lines { + filter_parts.push(format!("head_lines({})", lines)); + } + if let Some(bytes) = tail_bytes { + filter_parts.push(format!("tail_bytes({})", bytes)); + } + if let Some(lines) = tail_lines { + filter_parts.push(format!("tail_lines({})", lines)); + } + // Add other filters as needed + + // Use the provided filter string if available, otherwise build from parts + let filter_str = filter.or_else(|| { + if filter_parts.is_empty() { + None + } else { + Some(filter_parts.join(" | ")) + } + }); + + // Create filter chain + let filter_service = crate::services::filter_service::FilterService::new(); + let mut filter_chain = filter_service.create_filter_chain(filter_str.as_deref())?; + + // Wrap the reader with filtering + let filtered_reader = Box::new(FilteringReader::new(reader, filter_chain)); let metadata = item_with_meta.meta_as_map(); let mime_type = metadata @@ -909,6 +915,58 @@ impl Read for GrepFilter { } } +// Filtering reader that applies filter plugins +struct FilteringReader { + inner: R, + filter_chain: Option, + buffer: Vec, + buffer_pos: usize, +} + +impl FilteringReader { + fn new(inner: R, filter_chain: Option) -> Self { + Self { + inner, + filter_chain, + buffer: Vec::new(), + buffer_pos: 0, + } + } +} + +impl Read for FilteringReader { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + if self.buffer_pos >= self.buffer.len() { + // Read more data from the inner reader + let mut temp_buf = vec![0; 8192]; + let n = self.inner.read(&mut temp_buf)?; + if n == 0 { + // End of input, finish filtering + let filter_service = crate::services::filter_service::FilterService::new(); + let remaining = filter_service.finish_processing(&mut self.filter_chain)?; + self.buffer = remaining; + self.buffer_pos = 0; + if self.buffer.is_empty() { + return Ok(0); + } + } else { + // Process the chunk + let filter_service = crate::services::filter_service::FilterService::new(); + let processed = filter_service.process_data(&mut self.filter_chain, &temp_buf[..n])?; + self.buffer = processed; + self.buffer_pos = 0; + } + } + + // Copy from buffer to output + let bytes_to_copy = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos); + buf[..bytes_to_copy].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + bytes_to_copy]); + self.buffer_pos += bytes_to_copy; + + Ok(bytes_to_copy) + } +} + // Line range filter implementation struct LineRangeFilter { inner: R,