refactor: remove filter implementations from item_service

Co-authored-by: aider (openai/andrew/openrouter/deepseek/deepseek-chat-v3.1) <aider@aider.chat>
This commit is contained in:
Andrew Phillips
2025-08-28 20:36:18 -03:00
parent 66696af67e
commit 272f75349c

View File

@@ -421,495 +421,6 @@ impl ItemService {
} }
// Head filter implementation
struct HeadFilter<R: Read + Send> {
inner: R,
bytes_remaining: Option<usize>,
words_remaining: Option<usize>,
lines_remaining: Option<usize>,
in_word: bool,
}
impl<R: Read + Send> HeadFilter<R> {
fn new(
inner: R,
head_bytes: Option<usize>,
head_words: Option<usize>,
head_lines: Option<usize>,
) -> Self {
Self {
inner,
bytes_remaining: head_bytes,
words_remaining: head_words,
lines_remaining: head_lines,
in_word: false,
}
}
}
impl<R: Read + Send> Read for HeadFilter<R> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
// Check if any limit has been reached
if self.bytes_remaining == Some(0) || self.words_remaining == Some(0) || self.lines_remaining == Some(0) {
return Ok(0);
}
let n = self.inner.read(buf)?;
if n == 0 {
return Ok(0);
}
let mut processed = 0;
let mut i = 0;
while i < n {
// Check bytes limit
if let Some(remaining) = &mut self.bytes_remaining {
if *remaining == 0 {
break;
}
}
let byte = buf[i];
// Check for newlines to count lines
if let Some(remaining) = &mut self.lines_remaining {
if *remaining > 0 && byte == b'\n' {
*remaining -= 1;
}
}
// Check for words
if let Some(remaining) = &mut self.words_remaining {
let is_whitespace = byte.is_ascii_whitespace();
if self.in_word && is_whitespace {
self.in_word = false;
if *remaining > 0 {
*remaining -= 1;
}
} else if !is_whitespace {
self.in_word = true;
}
}
// Update bytes remaining
if let Some(remaining) = &mut self.bytes_remaining {
*remaining -= 1;
}
processed += 1;
i += 1;
// Check if any limits were hit
if self.bytes_remaining == Some(0) || self.words_remaining == Some(0) || self.lines_remaining == Some(0) {
break;
}
}
Ok(processed)
}
}
// Tail filter implementation using ringbuf crate
struct TailFilter<R: Read + Send> {
inner: R,
ring_buffer: HeapRb<u8>,
tail_bytes: Option<usize>,
tail_words: Option<usize>,
tail_lines: Option<usize>,
is_eof: bool,
// Track the number of bytes to output and the starting position
output_start: usize,
output_remaining: usize,
}
impl<R: Read + Send> TailFilter<R> {
fn new(
inner: R,
tail_bytes: Option<usize>,
tail_words: Option<usize>,
tail_lines: Option<usize>,
) -> std::io::Result<Self> {
// Determine buffer size based on the largest tail parameter with some padding
let buffer_size = if let Some(bytes) = tail_bytes {
bytes
} else if let Some(lines) = tail_lines {
// Estimate 256 bytes per line with some padding
lines * 256 + 1024
} else if let Some(words) = tail_words {
// Estimate 16 bytes per word with some padding
words * 16 + 1024
} else {
8192
};
Ok(Self {
inner,
ring_buffer: HeapRb::new(buffer_size),
tail_bytes,
tail_words,
tail_lines,
is_eof: false,
output_start: 0,
output_remaining: 0,
})
}
fn process_tail(&mut self) {
let buffer_len = self.ring_buffer.len();
if let Some(bytes) = self.tail_bytes {
self.output_remaining = std::cmp::min(bytes, buffer_len);
// The start position in the ring buffer
self.output_start = (self.ring_buffer.write_index() as isize - self.output_remaining as isize)
.rem_euclid(self.ring_buffer.capacity() as isize) as usize;
} else if let Some(lines) = self.tail_lines {
// Count lines from the end by iterating through the ring buffer
let mut lines_found = 0;
let mut bytes_to_keep = 0;
// Iterate backwards through the ring buffer
let mut pos = (self.ring_buffer.write_index() as isize - 1).rem_euclid(self.ring_buffer.capacity() as isize) as usize;
for _ in 0..buffer_len {
let byte = self.ring_buffer[pos];
bytes_to_keep += 1;
if byte == b'\n' {
lines_found += 1;
if lines_found == lines {
break;
}
}
pos = (pos as isize - 1).rem_euclid(self.ring_buffer.capacity() as isize) as usize;
}
self.output_remaining = bytes_to_keep;
self.output_start = (self.ring_buffer.write_index() as isize - self.output_remaining as isize)
.rem_euclid(self.ring_buffer.capacity() as isize) as usize;
} else if let Some(words) = self.tail_words {
// Count words from the end
let mut words_found = 0;
let mut bytes_to_keep = 0;
let mut in_word = false;
// Iterate backwards through the ring buffer
let mut pos = (self.ring_buffer.write_index() as isize - 1).rem_euclid(self.ring_buffer.capacity() as isize) as usize;
for _ in 0..buffer_len {
let byte = self.ring_buffer[pos];
bytes_to_keep += 1;
let is_whitespace = byte.is_ascii_whitespace();
if !in_word && !is_whitespace {
in_word = true;
words_found += 1;
if words_found == words {
break;
}
} else if is_whitespace {
in_word = false;
}
pos = (pos as isize - 1).rem_euclid(self.ring_buffer.capacity() as isize) as usize;
}
self.output_remaining = bytes_to_keep;
self.output_start = (self.ring_buffer.write_index() as isize - self.output_remaining as isize)
.rem_euclid(self.ring_buffer.capacity() as isize) as usize;
} else {
self.output_remaining = buffer_len;
self.output_start = (self.ring_buffer.write_index() as isize - self.output_remaining as isize)
.rem_euclid(self.ring_buffer.capacity() as isize) as usize;
}
}
fn fill_buffer(&mut self) -> std::io::Result<()> {
if self.is_eof {
return Ok(());
}
let mut temp_buf = vec![0; 8192];
loop {
let n = self.inner.read(&mut temp_buf)?;
if n == 0 {
self.is_eof = true;
break;
}
// Push bytes into the ring buffer, overwriting old data if necessary
for &byte in &temp_buf[..n] {
let _ = self.ring_buffer.push(byte);
}
}
// Process the tail parameters to determine what to output
self.process_tail();
Ok(())
}
}
impl<R: Read + Send> Read for TailFilter<R> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
// Fill the buffer if we haven't processed the input yet
if !self.is_eof {
self.fill_buffer()?;
}
if self.output_remaining == 0 {
return Ok(0);
}
// Calculate how many bytes to copy
let bytes_to_copy = std::cmp::min(buf.len(), self.output_remaining);
// Copy directly from the ring buffer
for i in 0..bytes_to_copy {
let index = (self.output_start + i) % self.ring_buffer.capacity();
buf[i] = self.ring_buffer[index];
}
// Update the remaining bytes
self.output_start = (self.output_start + bytes_to_copy) % self.ring_buffer.capacity();
self.output_remaining -= bytes_to_copy;
Ok(bytes_to_copy)
}
}
// Grep filter implementation
struct GrepFilter<R: Read + Send> {
inner: R,
regex: regex::Regex,
buffer: Vec<u8>,
buffer_pos: usize,
is_eof: bool,
// For line-based matching
current_line: Vec<u8>,
matched_lines: Vec<Vec<u8>>,
output_pos: usize,
}
impl<R: Read + Send> GrepFilter<R> {
fn new(inner: R, pattern: String) -> std::io::Result<Self> {
let regex = regex::Regex::new(&pattern)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e.to_string()))?;
Ok(Self {
inner,
regex,
buffer: Vec::new(),
buffer_pos: 0,
is_eof: false,
current_line: Vec::new(),
matched_lines: Vec::new(),
output_pos: 0,
})
}
fn fill_buffer(&mut self) -> std::io::Result<()> {
if self.buffer_pos >= self.buffer.len() && !self.is_eof {
// Read more data
let mut temp_buf = [0; 8192];
let n = self.inner.read(&mut temp_buf)?;
if n == 0 {
self.is_eof = true;
// Process any remaining line
if !self.current_line.is_empty() {
self.process_line();
}
return Ok(());
}
self.buffer.extend_from_slice(&temp_buf[..n]);
}
Ok(())
}
fn process_line(&mut self) {
if !self.current_line.is_empty() {
// Convert to string and check against regex
if let Ok(line_str) = std::str::from_utf8(&self.current_line) {
if self.regex.is_match(line_str) {
self.matched_lines.push(self.current_line.clone());
}
}
self.current_line.clear();
}
}
}
impl<R: Read + Send> Read for GrepFilter<R> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
// First, process the input to find matching lines
while !self.is_eof || self.buffer_pos < self.buffer.len() {
self.fill_buffer()?;
while self.buffer_pos < self.buffer.len() {
let byte = self.buffer[self.buffer_pos];
self.buffer_pos += 1;
self.current_line.push(byte);
if byte == b'\n' {
self.process_line();
}
}
}
// Now output the matched lines
if self.output_pos >= self.matched_lines.iter().map(|l| l.len()).sum() {
return Ok(0);
}
let mut bytes_written = 0;
for line in &self.matched_lines {
if bytes_written >= buf.len() {
break;
}
let line_len = line.len();
if self.output_pos < line_len {
let bytes_to_copy = std::cmp::min(buf.len() - bytes_written, line_len - self.output_pos);
buf[bytes_written..bytes_written + bytes_to_copy]
.copy_from_slice(&line[self.output_pos..self.output_pos + bytes_to_copy]);
bytes_written += bytes_to_copy;
self.output_pos += bytes_to_copy;
} else {
self.output_pos -= line_len;
}
// If we've moved to the next line, reset output_pos
if self.output_pos >= line_len {
self.output_pos = 0;
} else {
break;
}
}
Ok(bytes_written)
}
}
// Filtering reader that applies filter plugins
struct FilteringReader<R: Read + Send> {
inner: R,
filter_chain: Option<FilterChain>,
buffer: Vec<u8>,
buffer_pos: usize,
}
impl<R: Read + Send> FilteringReader<R> {
fn new(inner: R, filter_chain: Option<FilterChain>) -> Self {
Self {
inner,
filter_chain,
buffer: Vec::new(),
buffer_pos: 0,
}
}
}
impl<R: Read + Send> Read for FilteringReader<R> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
if self.buffer_pos >= self.buffer.len() {
// Read more data from the inner reader
let mut temp_buf = vec![0; 8192];
let n = self.inner.read(&mut temp_buf)?;
if n == 0 {
// End of input, finish filtering
let filter_service = crate::services::filter_service::FilterService::new();
let remaining = filter_service.finish_processing(&mut self.filter_chain)?;
self.buffer = remaining;
self.buffer_pos = 0;
if self.buffer.is_empty() {
return Ok(0);
}
} else {
// Process the chunk
let filter_service = crate::services::filter_service::FilterService::new();
let processed = filter_service.process_data(&mut self.filter_chain, &temp_buf[..n])?;
self.buffer = processed;
self.buffer_pos = 0;
}
}
// Copy from buffer to output
let bytes_to_copy = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos);
buf[..bytes_to_copy].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + bytes_to_copy]);
self.buffer_pos += bytes_to_copy;
Ok(bytes_to_copy)
}
}
// Line range filter implementation
struct LineRangeFilter<R: Read + Send> {
inner: R,
line_start: Option<usize>,
line_end: Option<usize>,
current_line: usize,
in_range: bool,
buffer: Vec<u8>,
buffer_pos: usize,
is_eof: bool,
}
impl<R: Read + Send> LineRangeFilter<R> {
fn new(
inner: R,
line_start: Option<usize>,
line_end: Option<usize>,
) -> Self {
Self {
inner,
line_start: line_start.or(Some(1)),
line_end,
current_line: 1,
in_range: false,
buffer: Vec::new(),
buffer_pos: 0,
is_eof: false,
}
}
fn fill_buffer(&mut self) -> std::io::Result<()> {
if self.buffer_pos >= self.buffer.len() && !self.is_eof {
// Read more data
let mut temp_buf = [0; 8192];
let n = self.inner.read(&mut temp_buf)?;
if n == 0 {
self.is_eof = true;
return Ok(());
}
self.buffer.extend_from_slice(&temp_buf[..n]);
}
Ok(())
}
}
impl<R: Read + Send> Read for LineRangeFilter<R> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
if self.is_eof && self.buffer_pos >= self.buffer.len() {
return Ok(0);
}
let start_line = self.line_start.unwrap_or(1);
let end_line = self.line_end.unwrap_or(usize::MAX);
let mut bytes_written = 0;
while bytes_written < buf.len() {
// Fill the buffer if needed
self.fill_buffer()?;
if self.buffer_pos >= self.buffer.len() {
break;
}
// Process bytes until we find the start of the range
if !self.in_range {
while self.buffer_pos < self.buffer.len() && self.current_line < start_line {
if self.buffer[self.buffer_pos] == b'\n' {
self.current_line += 1; self.current_line += 1;
if self.current_line >= start_line { if self.current_line >= start_line {
self.in_range = true; self.in_range = true;