diff --git a/src/filter_plugin/head.rs b/src/filter_plugin/head.rs index e720fe9..c6b20f1 100644 --- a/src/filter_plugin/head.rs +++ b/src/filter_plugin/head.rs @@ -1,5 +1,5 @@ use super::FilterPlugin; -use std::io::Result; +use std::io::{Result, Read, Write, BufRead}; pub struct HeadBytesFilter { remaining: usize, @@ -14,73 +14,49 @@ impl HeadBytesFilter { } impl FilterPlugin for HeadBytesFilter { - fn process(&mut self, data: &[u8]) -> Result> { + fn filter(&mut self, reader: &mut R, writer: &mut W) -> Result<()> { if self.remaining == 0 { - return Ok(Vec::new()); + return Ok(()); } - let bytes_to_take = std::cmp::min(data.len(), self.remaining); - self.remaining -= bytes_to_take; - Ok(data[..bytes_to_take].to_vec()) - } - - fn finish(&mut self) -> Result> { - Ok(Vec::new()) + // Read only up to remaining bytes + let mut buffer = vec![0; self.remaining]; + let bytes_read = reader.read(&mut buffer)?; + if bytes_read > 0 { + writer.write_all(&buffer[..bytes_read])?; + self.remaining -= bytes_read; + } + Ok(()) } } pub struct HeadLinesFilter { remaining: usize, - buffer: Vec, } impl HeadLinesFilter { pub fn new(count: usize) -> Self { Self { remaining: count, - buffer: Vec::new(), } } } impl FilterPlugin for HeadLinesFilter { - fn process(&mut self, data: &[u8]) -> Result> { + fn filter(&mut self, reader: &mut R, writer: &mut W) -> Result<()> { if self.remaining == 0 { - return Ok(Vec::new()); + return Ok(()); } - let mut result = Vec::new(); - let mut start = 0; - - for (i, &byte) in data.iter().enumerate() { - if byte == b'\n' { - self.buffer.extend_from_slice(&data[start..=i]); - result.extend_from_slice(&self.buffer); - self.buffer.clear(); - start = i + 1; - - self.remaining -= 1; - if self.remaining == 0 { - break; - } + let buf_reader = std::io::BufReader::new(reader); + for line in buf_reader.lines() { + let line = line?; + writeln!(writer, "{}", line)?; + self.remaining -= 1; + if self.remaining == 0 { + break; } } - - // Add remaining data to buffer - if start < data.len() { - self.buffer.extend_from_slice(&data[start..]); - } - - Ok(result) - } - - fn finish(&mut self) -> Result> { - if self.remaining > 0 && !self.buffer.is_empty() { - let result = self.buffer.clone(); - self.buffer.clear(); - Ok(result) - } else { - Ok(Vec::new()) - } + Ok(()) } } diff --git a/src/filter_plugin/skip.rs b/src/filter_plugin/skip.rs index 9feb243..6d38bfe 100644 --- a/src/filter_plugin/skip.rs +++ b/src/filter_plugin/skip.rs @@ -1,5 +1,5 @@ use super::FilterPlugin; -use std::io::Result; +use std::io::{Result, Read, Write, BufRead}; pub struct SkipBytesFilter { remaining: usize, @@ -14,85 +14,43 @@ impl SkipBytesFilter { } impl FilterPlugin for SkipBytesFilter { - fn process(&mut self, data: &[u8]) -> Result> { - if self.remaining == 0 { - return Ok(data.to_vec()); + fn filter(&mut self, reader: &mut R, writer: &mut W) -> Result<()> { + // Skip the specified number of bytes + if self.remaining > 0 { + let mut buffer = vec![0; self.remaining]; + let bytes_read = reader.read(&mut buffer)?; + self.remaining -= bytes_read; } - if data.len() <= self.remaining { - self.remaining -= data.len(); - Ok(Vec::new()) - } else { - let result = data[self.remaining..].to_vec(); - self.remaining = 0; - Ok(result) - } - } - - fn finish(&mut self) -> Result> { - Ok(Vec::new()) + // Copy the remaining data + std::io::copy(reader, writer)?; + Ok(()) } } pub struct SkipLinesFilter { remaining: usize, - buffer: Vec, } impl SkipLinesFilter { pub fn new(count: usize) -> Self { Self { remaining: count, - buffer: Vec::new(), } } } impl FilterPlugin for SkipLinesFilter { - fn process(&mut self, data: &[u8]) -> Result> { - if self.remaining == 0 { - let mut result = self.buffer.clone(); - result.extend_from_slice(data); - self.buffer.clear(); - return Ok(result); - } - - let mut result = Vec::new(); - let mut start = 0; - - for (i, &byte) in data.iter().enumerate() { - if byte == b'\n' { - if self.remaining > 0 { - self.remaining -= 1; - start = i + 1; - } else { - self.buffer.extend_from_slice(&data[start..=i]); - result.extend_from_slice(&self.buffer); - self.buffer.clear(); - start = i + 1; - } - } - } - - // Add remaining data to buffer - if start < data.len() { - if self.remaining == 0 { - result.extend_from_slice(&data[start..]); + fn filter(&mut self, reader: &mut R, writer: &mut W) -> Result<()> { + let buf_reader = std::io::BufReader::new(reader); + for line in buf_reader.lines() { + let line = line?; + if self.remaining > 0 { + self.remaining -= 1; } else { - self.buffer.extend_from_slice(&data[start..]); + writeln!(writer, "{}", line)?; } } - - Ok(result) - } - - fn finish(&mut self) -> Result> { - if self.remaining == 0 { - let result = self.buffer.clone(); - self.buffer.clear(); - Ok(result) - } else { - Ok(Vec::new()) - } + Ok(()) } } diff --git a/src/filter_plugin/tail.rs b/src/filter_plugin/tail.rs index 8c3830b..0e5b6f2 100644 --- a/src/filter_plugin/tail.rs +++ b/src/filter_plugin/tail.rs @@ -1,95 +1,75 @@ use super::FilterPlugin; -use std::io::Result; -use ringbuf::{HeapRb, Rb}; +use std::io::{Result, Read, Write, BufRead}; +use std::collections::VecDeque; pub struct TailBytesFilter { - ring_buffer: HeapRb, - _count: usize, + buffer: VecDeque, + count: usize, } impl TailBytesFilter { pub fn new(count: usize) -> Self { Self { - ring_buffer: HeapRb::new(count), - _count: count, + buffer: VecDeque::with_capacity(count), + count, } } } impl FilterPlugin for TailBytesFilter { - fn process(&mut self, data: &[u8]) -> Result> { - for &byte in data { - let _ = self.ring_buffer.push(byte); + fn filter(&mut self, reader: &mut R, writer: &mut W) -> Result<()> { + let mut temp_buffer = vec![0; 8192]; + loop { + let bytes_read = reader.read(&mut temp_buffer)?; + if bytes_read == 0 { + break; + } + + // Add new data to the buffer + for &byte in &temp_buffer[..bytes_read] { + if self.buffer.len() == self.count { + self.buffer.pop_front(); + } + self.buffer.push_back(byte); + } } - Ok(Vec::new()) - } - - fn finish(&mut self) -> Result> { - // Collect all bytes from the ring buffer - let mut result = Vec::with_capacity(self.ring_buffer.len()); - for byte in self.ring_buffer.iter() { - result.push(*byte); - } - Ok(result) + + // Write the buffered data at the end + let result: Vec = self.buffer.iter().cloned().collect(); + writer.write_all(&result)?; + Ok(()) } } pub struct TailLinesFilter { - ring_buffer: HeapRb, + lines: VecDeque, count: usize, - lines_found: usize, } impl TailLinesFilter { pub fn new(count: usize) -> Self { Self { - ring_buffer: HeapRb::new(count * 256), // Estimate 256 bytes per line + lines: VecDeque::with_capacity(count), count, - lines_found: 0, } } } impl FilterPlugin for TailLinesFilter { - fn process(&mut self, data: &[u8]) -> Result> { - for &byte in data { - let _ = self.ring_buffer.push(byte); - if byte == b'\n' { - self.lines_found += 1; + fn filter(&mut self, reader: &mut R, writer: &mut W) -> Result<()> { + let buf_reader = std::io::BufReader::new(reader); + for line in buf_reader.lines() { + let line = line?; + if self.lines.len() == self.count { + self.lines.pop_front(); } - } - Ok(Vec::new()) - } - - fn finish(&mut self) -> Result> { - // For ring buffer, we can use the iter() method to get all elements - // Since it's a circular buffer, we need to handle the wrap-around - let mut result = Vec::with_capacity(self.ring_buffer.len()); - - // The ring buffer maintains elements in insertion order - for byte in self.ring_buffer.iter() { - result.push(*byte); + self.lines.push_back(line); } - // Now, we need to find the last 'count' lines - if self.count == 0 { - return Ok(Vec::new()); + // Write the buffered lines + for line in &self.lines { + writeln!(writer, "{}", line)?; } - - // Split into lines and take the last 'count' lines - let text = String::from_utf8_lossy(&result); - let lines: Vec<&str> = text.split('\n').collect(); - - // Take the last 'count' lines - let start_index = if lines.len() > self.count { - lines.len() - self.count - } else { - 0 - }; - - let selected_lines = &lines[start_index..]; - let result_text = selected_lines.join("\n"); - - Ok(result_text.into_bytes()) + Ok(()) } } diff --git a/src/filter_plugin/utils.rs b/src/filter_plugin/utils.rs index 221cb8c..15d80af 100644 --- a/src/filter_plugin/utils.rs +++ b/src/filter_plugin/utils.rs @@ -1,27 +1,4 @@ use std::io::Result; -use super::FilterPlugin; - -/// Helper trait for common filter operations -pub trait FilterUtils { - /// Process data through a filter, handling empty results - fn process_data(&mut self, data: &[u8]) -> Result>; - - /// Process data and check if we should continue processing - fn process_and_check_continue(&mut self, data: &[u8]) -> Result<(Vec, bool)>; -} - -impl FilterUtils for T { - fn process_data(&mut self, data: &[u8]) -> Result> { - let result = self.process(data)?; - Ok(result) - } - - fn process_and_check_continue(&mut self, data: &[u8]) -> Result<(Vec, bool)> { - let result = self.process(data)?; - let should_continue = !result.is_empty(); - Ok((result, should_continue)) - } -} /// Helper function to create a filter chain from a string pub fn create_filter_chain(filter_str: &str) -> Result> {