diff --git a/src/services/item_service.rs b/src/services/item_service.rs index 6b89a61..1c211ca 100644 --- a/src/services/item_service.rs +++ b/src/services/item_service.rs @@ -421,495 +421,6 @@ impl ItemService { } -// Head filter implementation -struct HeadFilter { - inner: R, - bytes_remaining: Option, - words_remaining: Option, - lines_remaining: Option, - in_word: bool, -} - -impl HeadFilter { - fn new( - inner: R, - head_bytes: Option, - head_words: Option, - head_lines: Option, - ) -> Self { - Self { - inner, - bytes_remaining: head_bytes, - words_remaining: head_words, - lines_remaining: head_lines, - in_word: false, - } - } -} - -impl Read for HeadFilter { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - // Check if any limit has been reached - if self.bytes_remaining == Some(0) || self.words_remaining == Some(0) || self.lines_remaining == Some(0) { - return Ok(0); - } - - let n = self.inner.read(buf)?; - if n == 0 { - return Ok(0); - } - - let mut processed = 0; - let mut i = 0; - - while i < n { - // Check bytes limit - if let Some(remaining) = &mut self.bytes_remaining { - if *remaining == 0 { - break; - } - } - - let byte = buf[i]; - - // Check for newlines to count lines - if let Some(remaining) = &mut self.lines_remaining { - if *remaining > 0 && byte == b'\n' { - *remaining -= 1; - } - } - - // Check for words - if let Some(remaining) = &mut self.words_remaining { - let is_whitespace = byte.is_ascii_whitespace(); - if self.in_word && is_whitespace { - self.in_word = false; - if *remaining > 0 { - *remaining -= 1; - } - } else if !is_whitespace { - self.in_word = true; - } - } - - // Update bytes remaining - if let Some(remaining) = &mut self.bytes_remaining { - *remaining -= 1; - } - - processed += 1; - i += 1; - - // Check if any limits were hit - if self.bytes_remaining == Some(0) || self.words_remaining == Some(0) || self.lines_remaining == Some(0) { - break; - } - } - - Ok(processed) - } -} - -// Tail filter implementation using ringbuf crate -struct TailFilter { - inner: R, - ring_buffer: HeapRb, - tail_bytes: Option, - tail_words: Option, - tail_lines: Option, - is_eof: bool, - // Track the number of bytes to output and the starting position - output_start: usize, - output_remaining: usize, -} - -impl TailFilter { - fn new( - inner: R, - tail_bytes: Option, - tail_words: Option, - tail_lines: Option, - ) -> std::io::Result { - // Determine buffer size based on the largest tail parameter with some padding - let buffer_size = if let Some(bytes) = tail_bytes { - bytes - } else if let Some(lines) = tail_lines { - // Estimate 256 bytes per line with some padding - lines * 256 + 1024 - } else if let Some(words) = tail_words { - // Estimate 16 bytes per word with some padding - words * 16 + 1024 - } else { - 8192 - }; - - Ok(Self { - inner, - ring_buffer: HeapRb::new(buffer_size), - tail_bytes, - tail_words, - tail_lines, - is_eof: false, - output_start: 0, - output_remaining: 0, - }) - } - - fn process_tail(&mut self) { - let buffer_len = self.ring_buffer.len(); - - if let Some(bytes) = self.tail_bytes { - self.output_remaining = std::cmp::min(bytes, buffer_len); - // The start position in the ring buffer - self.output_start = (self.ring_buffer.write_index() as isize - self.output_remaining as isize) - .rem_euclid(self.ring_buffer.capacity() as isize) as usize; - } else if let Some(lines) = self.tail_lines { - // Count lines from the end by iterating through the ring buffer - let mut lines_found = 0; - let mut bytes_to_keep = 0; - - // Iterate backwards through the ring buffer - let mut pos = (self.ring_buffer.write_index() as isize - 1).rem_euclid(self.ring_buffer.capacity() as isize) as usize; - for _ in 0..buffer_len { - let byte = self.ring_buffer[pos]; - bytes_to_keep += 1; - - if byte == b'\n' { - lines_found += 1; - if lines_found == lines { - break; - } - } - - pos = (pos as isize - 1).rem_euclid(self.ring_buffer.capacity() as isize) as usize; - } - - self.output_remaining = bytes_to_keep; - self.output_start = (self.ring_buffer.write_index() as isize - self.output_remaining as isize) - .rem_euclid(self.ring_buffer.capacity() as isize) as usize; - } else if let Some(words) = self.tail_words { - // Count words from the end - let mut words_found = 0; - let mut bytes_to_keep = 0; - let mut in_word = false; - - // Iterate backwards through the ring buffer - let mut pos = (self.ring_buffer.write_index() as isize - 1).rem_euclid(self.ring_buffer.capacity() as isize) as usize; - for _ in 0..buffer_len { - let byte = self.ring_buffer[pos]; - bytes_to_keep += 1; - - let is_whitespace = byte.is_ascii_whitespace(); - if !in_word && !is_whitespace { - in_word = true; - words_found += 1; - if words_found == words { - break; - } - } else if is_whitespace { - in_word = false; - } - - pos = (pos as isize - 1).rem_euclid(self.ring_buffer.capacity() as isize) as usize; - } - - self.output_remaining = bytes_to_keep; - self.output_start = (self.ring_buffer.write_index() as isize - self.output_remaining as isize) - .rem_euclid(self.ring_buffer.capacity() as isize) as usize; - } else { - self.output_remaining = buffer_len; - self.output_start = (self.ring_buffer.write_index() as isize - self.output_remaining as isize) - .rem_euclid(self.ring_buffer.capacity() as isize) as usize; - } - } - - fn fill_buffer(&mut self) -> std::io::Result<()> { - if self.is_eof { - return Ok(()); - } - - let mut temp_buf = vec![0; 8192]; - loop { - let n = self.inner.read(&mut temp_buf)?; - if n == 0 { - self.is_eof = true; - break; - } - - // Push bytes into the ring buffer, overwriting old data if necessary - for &byte in &temp_buf[..n] { - let _ = self.ring_buffer.push(byte); - } - } - - // Process the tail parameters to determine what to output - self.process_tail(); - - Ok(()) - } -} - -impl Read for TailFilter { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - // Fill the buffer if we haven't processed the input yet - if !self.is_eof { - self.fill_buffer()?; - } - - if self.output_remaining == 0 { - return Ok(0); - } - - // Calculate how many bytes to copy - let bytes_to_copy = std::cmp::min(buf.len(), self.output_remaining); - - // Copy directly from the ring buffer - for i in 0..bytes_to_copy { - let index = (self.output_start + i) % self.ring_buffer.capacity(); - buf[i] = self.ring_buffer[index]; - } - - // Update the remaining bytes - self.output_start = (self.output_start + bytes_to_copy) % self.ring_buffer.capacity(); - self.output_remaining -= bytes_to_copy; - - Ok(bytes_to_copy) - } -} - -// Grep filter implementation -struct GrepFilter { - inner: R, - regex: regex::Regex, - buffer: Vec, - buffer_pos: usize, - is_eof: bool, - // For line-based matching - current_line: Vec, - matched_lines: Vec>, - output_pos: usize, -} - -impl GrepFilter { - fn new(inner: R, pattern: String) -> std::io::Result { - let regex = regex::Regex::new(&pattern) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e.to_string()))?; - - Ok(Self { - inner, - regex, - buffer: Vec::new(), - buffer_pos: 0, - is_eof: false, - current_line: Vec::new(), - matched_lines: Vec::new(), - output_pos: 0, - }) - } - - fn fill_buffer(&mut self) -> std::io::Result<()> { - if self.buffer_pos >= self.buffer.len() && !self.is_eof { - // Read more data - let mut temp_buf = [0; 8192]; - let n = self.inner.read(&mut temp_buf)?; - if n == 0 { - self.is_eof = true; - // Process any remaining line - if !self.current_line.is_empty() { - self.process_line(); - } - return Ok(()); - } - self.buffer.extend_from_slice(&temp_buf[..n]); - } - Ok(()) - } - - fn process_line(&mut self) { - if !self.current_line.is_empty() { - // Convert to string and check against regex - if let Ok(line_str) = std::str::from_utf8(&self.current_line) { - if self.regex.is_match(line_str) { - self.matched_lines.push(self.current_line.clone()); - } - } - self.current_line.clear(); - } - } -} - -impl Read for GrepFilter { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - // First, process the input to find matching lines - while !self.is_eof || self.buffer_pos < self.buffer.len() { - self.fill_buffer()?; - - while self.buffer_pos < self.buffer.len() { - let byte = self.buffer[self.buffer_pos]; - self.buffer_pos += 1; - - self.current_line.push(byte); - - if byte == b'\n' { - self.process_line(); - } - } - } - - // Now output the matched lines - if self.output_pos >= self.matched_lines.iter().map(|l| l.len()).sum() { - return Ok(0); - } - - let mut bytes_written = 0; - for line in &self.matched_lines { - if bytes_written >= buf.len() { - break; - } - - let line_len = line.len(); - if self.output_pos < line_len { - let bytes_to_copy = std::cmp::min(buf.len() - bytes_written, line_len - self.output_pos); - buf[bytes_written..bytes_written + bytes_to_copy] - .copy_from_slice(&line[self.output_pos..self.output_pos + bytes_to_copy]); - bytes_written += bytes_to_copy; - self.output_pos += bytes_to_copy; - } else { - self.output_pos -= line_len; - } - - // If we've moved to the next line, reset output_pos - if self.output_pos >= line_len { - self.output_pos = 0; - } else { - break; - } - } - - Ok(bytes_written) - } -} - -// Filtering reader that applies filter plugins -struct FilteringReader { - inner: R, - filter_chain: Option, - buffer: Vec, - buffer_pos: usize, -} - -impl FilteringReader { - fn new(inner: R, filter_chain: Option) -> Self { - Self { - inner, - filter_chain, - buffer: Vec::new(), - buffer_pos: 0, - } - } -} - -impl Read for FilteringReader { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - if self.buffer_pos >= self.buffer.len() { - // Read more data from the inner reader - let mut temp_buf = vec![0; 8192]; - let n = self.inner.read(&mut temp_buf)?; - if n == 0 { - // End of input, finish filtering - let filter_service = crate::services::filter_service::FilterService::new(); - let remaining = filter_service.finish_processing(&mut self.filter_chain)?; - self.buffer = remaining; - self.buffer_pos = 0; - if self.buffer.is_empty() { - return Ok(0); - } - } else { - // Process the chunk - let filter_service = crate::services::filter_service::FilterService::new(); - let processed = filter_service.process_data(&mut self.filter_chain, &temp_buf[..n])?; - self.buffer = processed; - self.buffer_pos = 0; - } - } - - // Copy from buffer to output - let bytes_to_copy = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos); - buf[..bytes_to_copy].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + bytes_to_copy]); - self.buffer_pos += bytes_to_copy; - - Ok(bytes_to_copy) - } -} - -// Line range filter implementation -struct LineRangeFilter { - inner: R, - line_start: Option, - line_end: Option, - current_line: usize, - in_range: bool, - buffer: Vec, - buffer_pos: usize, - is_eof: bool, -} - -impl LineRangeFilter { - fn new( - inner: R, - line_start: Option, - line_end: Option, - ) -> Self { - Self { - inner, - line_start: line_start.or(Some(1)), - line_end, - current_line: 1, - in_range: false, - buffer: Vec::new(), - buffer_pos: 0, - is_eof: false, - } - } - - fn fill_buffer(&mut self) -> std::io::Result<()> { - if self.buffer_pos >= self.buffer.len() && !self.is_eof { - // Read more data - let mut temp_buf = [0; 8192]; - let n = self.inner.read(&mut temp_buf)?; - if n == 0 { - self.is_eof = true; - return Ok(()); - } - self.buffer.extend_from_slice(&temp_buf[..n]); - } - Ok(()) - } -} - -impl Read for LineRangeFilter { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - if self.is_eof && self.buffer_pos >= self.buffer.len() { - return Ok(0); - } - - let start_line = self.line_start.unwrap_or(1); - let end_line = self.line_end.unwrap_or(usize::MAX); - - let mut bytes_written = 0; - - while bytes_written < buf.len() { - // Fill the buffer if needed - self.fill_buffer()?; - - if self.buffer_pos >= self.buffer.len() { - break; - } - - // Process bytes until we find the start of the range - if !self.in_range { - while self.buffer_pos < self.buffer.len() && self.current_line < start_line { - if self.buffer[self.buffer_pos] == b'\n' { self.current_line += 1; if self.current_line >= start_line { self.in_range = true;