diff --git a/src/services/async_item_service.rs b/src/services/async_item_service.rs
index 113cf4d..0abe162 100644
--- a/src/services/async_item_service.rs
+++ b/src/services/async_item_service.rs
@@ -76,6 +76,7 @@ impl AsyncItemService {
         tail_lines: Option,
         line_start: Option,
         line_end: Option,
+        grep: Option,
     ) -> Result<(Vec, String, bool), CoreError> {
         let db = self.db.clone();
         let item_service = self.item_service.clone();
@@ -92,7 +93,8 @@ impl AsyncItemService {
                 tail_words,
                 tail_lines,
                 line_start,
-                line_end
+                line_end,
+                grep
             )
         })
         .await
@@ -188,6 +190,7 @@ impl AsyncItemService {
         tail_lines: Option,
         line_start: Option,
         line_end: Option,
+        grep: Option,
     ) -> Result<(std::pin::Pin> + Send>>, String), CoreError> {
         // Use provided metadata to determine MIME type and binary status
         let mime_type = metadata
@@ -236,7 +239,8 @@ impl AsyncItemService {
                 tail_words,
                 tail_lines,
                 line_start,
-                line_end
+                line_end,
+                grep
             ).map(|(reader, _, _)| reader)
         })
         .await
@@ -326,6 +330,7 @@ impl AsyncItemService {
         tail_lines: Option,
         line_start: Option,
         line_end: Option,
+        grep: Option,
     ) -> Result<(Box, String, bool), CoreError> {
         let db = self.db.clone();
         let item_service = self.item_service.clone();
@@ -342,7 +347,8 @@ impl AsyncItemService {
                 tail_words,
                 tail_lines,
                 line_start,
-                line_end
+                line_end,
+                grep
             )
         })
         .await
diff --git a/src/services/item_service.rs b/src/services/item_service.rs
index b917bf6..7d74762 100644
--- a/src/services/item_service.rs
+++ b/src/services/item_service.rs
@@ -79,11 +79,12 @@ impl ItemService {
         tail_lines: Option,
         line_start: Option,
         line_end: Option,
+        grep: Option,
     ) -> Result<(Vec, String, bool), CoreError> {
         // Use streaming approach to handle all filtering options consistently
         let (mut reader, mime_type, is_binary) = self.get_item_content_info_streaming(
             conn, id, head_bytes, head_words, head_lines,
-            tail_bytes, tail_words, tail_lines, line_start, line_end
+            tail_bytes, tail_words, tail_lines, line_start, line_end, grep
         )?;
 
         // Read all the filtered content into a buffer
@@ -105,6 +106,7 @@ impl ItemService {
         tail_lines: Option,
         line_start: Option,
         line_end: Option,
+        grep: Option,
     ) -> Result<(Box, String, bool), CoreError> {
         let item_with_meta = self.get_item(conn, id)?;
         let item_id = item_with_meta.item.id.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
@@ -119,7 +121,12 @@ impl ItemService {
         let reader = self.compression_service.stream_item_content(item_path.clone(), &item_with_meta.item.compression)?;
 
         // Apply content filtering
-        let filtered_reader: Box = if head_bytes.is_some() || head_words.is_some() || head_lines.is_some() {
+        let filtered_reader: Box = if let Some(pattern) = grep {
+            Box::new(GrepFilter::new(
+                reader,
+                pattern,
+            )?)
+        } else if head_bytes.is_some() || head_words.is_some() || head_lines.is_some() {
             Box::new(HeadFilter::new(
                 reader,
                 head_bytes,
@@ -789,6 +796,106 @@ impl Read for TailFilter {
     }
 }
 
+// Grep filter implementation
+/// Reader adapter that passes through only the lines of `inner` matching `regex`.
+///
+/// Input is consumed one chunk at a time and matched lines are queued in
+/// `pending`, so memory use tracks the read chunk and the longest line
+/// rather than the size of the whole item.
+struct GrepFilter<R> {
+    inner: R,
+    regex: regex::Regex,
+    is_eof: bool,
+    // Bytes of the (possibly partial) line currently being assembled
+    current_line: Vec<u8>,
+    // Matched lines waiting to be copied out to the caller
+    pending: Vec<u8>,
+    // Offset of the next unread byte in `pending`
+    pending_pos: usize,
+}
+
+impl<R: Read> GrepFilter<R> {
+    /// Compiles `pattern` and wraps `inner`.
+    ///
+    /// Returns an `InvalidInput` I/O error if `pattern` is not a valid regex.
+    fn new(inner: R, pattern: String) -> std::io::Result<Self> {
+        let regex = regex::Regex::new(&pattern)
+            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e.to_string()))?;
+
+        Ok(Self {
+            inner,
+            regex,
+            is_eof: false,
+            current_line: Vec::new(),
+            pending: Vec::new(),
+            pending_pos: 0,
+        })
+    }
+
+    /// Reads one chunk from `inner` and queues every completed line that matches.
+    fn fill_pending(&mut self) -> std::io::Result<()> {
+        let mut chunk = [0u8; 8192];
+        let n = self.inner.read(&mut chunk)?;
+        if n == 0 {
+            self.is_eof = true;
+            // The last line may not be newline-terminated
+            self.process_line();
+            return Ok(());
+        }
+
+        for piece in chunk[..n].split_inclusive(|&b| b == b'\n') {
+            self.current_line.extend_from_slice(piece);
+            if piece.ends_with(b"\n") {
+                self.process_line();
+            }
+        }
+        Ok(())
+    }
+
+    /// Tests `current_line` against the regex, queues it on a match, then clears it.
+    fn process_line(&mut self) {
+        if self.current_line.is_empty() {
+            return;
+        }
+
+        // Match without the trailing newline so that `$` anchors at the end of
+        // the line's text; the newline is still emitted with the line.
+        let text = self
+            .current_line
+            .strip_suffix(b"\n")
+            .unwrap_or(self.current_line.as_slice());
+        // Lines that are not valid UTF-8 cannot be matched and are skipped
+        let is_match = std::str::from_utf8(text).map_or(false, |s| self.regex.is_match(s));
+        if is_match {
+            self.pending.extend_from_slice(&self.current_line);
+        }
+        self.current_line.clear();
+    }
+}
+
+impl<R: Read> Read for GrepFilter<R> {
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        if buf.is_empty() {
+            return Ok(0);
+        }
+
+        // Pull input until there is something to emit or the input is exhausted
+        while self.pending_pos >= self.pending.len() && !self.is_eof {
+            self.pending.clear();
+            self.pending_pos = 0;
+            self.fill_pending()?;
+        }
+
+        // Each queued byte is handed out exactly once; an empty queue after EOF
+        // yields Ok(0), which signals end of stream to the caller.
+        let available = &self.pending[self.pending_pos..];
+        let n = available.len().min(buf.len());
+        buf[..n].copy_from_slice(&available[..n]);
+        self.pending_pos += n;
+        Ok(n)
+    }
+}
+
 // Line range filter implementation
 struct LineRangeFilter {
     inner: R,