From dc2bd8dcdf52b69ac7f059daf37905891c14c53a Mon Sep 17 00:00:00 2001 From: Andrew Phillips Date: Thu, 28 Aug 2025 20:03:50 -0300 Subject: [PATCH] feat: implement tail filter using ringbuf crate Co-authored-by: aider (openai/andrew/openrouter/deepseek/deepseek-chat-v3.1) --- Cargo.toml | 1 + src/services/item_service.rs | 221 ++++++++++++++++++----------------- 2 files changed, 118 insertions(+), 104 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b549797..90f6cdf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ once_cell = "1.19.0" prettytable-rs = "0.10.0" pwhash = "1.0.0" regex = "1.9.5" +ringbuf = "0.3" rmcp = { version = "0.2.0", features = ["server"] } rusqlite = { version = "0.37.0", features = ["bundled", "array", "chrono"] } rusqlite_migration = "2.3.0" diff --git a/src/services/item_service.rs b/src/services/item_service.rs index 45c5424..3d91bc4 100644 --- a/src/services/item_service.rs +++ b/src/services/item_service.rs @@ -9,6 +9,7 @@ use crate::compression_engine::{get_compression_engine, CompressionType}; use crate::modes::common::settings_compression_type; use clap::Command; use log::debug; +use ringbuf::HeapRb; use rusqlite::Connection; use std::collections::HashMap; use std::fs; @@ -636,18 +637,17 @@ impl Read for HeadFilter { } } -// Tail filter implementation using a ring buffer +// Tail filter implementation using ringbuf crate struct TailFilter { inner: R, - ring_buffer: Vec, - ring_buffer_pos: usize, + ring_buffer: HeapRb, tail_bytes: Option, tail_words: Option, tail_lines: Option, is_eof: bool, - bytes_read: usize, - output_pos: usize, - output_len: usize, + // Track the number of bytes to output and the starting position + output_start: usize, + output_remaining: usize, } impl TailFilter { @@ -657,135 +657,148 @@ impl TailFilter { tail_words: Option, tail_lines: Option, ) -> std::io::Result { - // Determine buffer size based on the largest tail parameter + // Determine buffer size based on the largest tail parameter with some padding let buffer_size = if let Some(bytes) = tail_bytes { bytes } else if let Some(lines) = tail_lines { - // Estimate 256 bytes per line - lines * 256 + // Estimate 256 bytes per line with some padding + lines * 256 + 1024 } else if let Some(words) = tail_words { - // Estimate 16 bytes per word - words * 16 + // Estimate 16 bytes per word with some padding + words * 16 + 1024 } else { 8192 }; Ok(Self { inner, - ring_buffer: vec![0; buffer_size], - ring_buffer_pos: 0, + ring_buffer: HeapRb::new(buffer_size), tail_bytes, tail_words, tail_lines, is_eof: false, - bytes_read: 0, - output_pos: 0, - output_len: 0, + output_start: 0, + output_remaining: 0, }) } fn process_tail(&mut self) { - if self.is_eof { - // Process the ring buffer to extract the tail based on parameters - if let Some(bytes) = self.tail_bytes { - self.output_len = std::cmp::min(bytes, self.bytes_read); - self.output_pos = if self.bytes_read > self.ring_buffer.len() { - // We've wrapped around the ring buffer - self.ring_buffer_pos - } else { - 0 - }; - } else if let Some(lines) = self.tail_lines { - // Count lines from the end - let mut line_count = 0; - let mut pos = (self.ring_buffer_pos as isize - 1).rem_euclid(self.ring_buffer.len() as isize) as usize; - let mut i = 0; - - while i < self.bytes_read.min(self.ring_buffer.len()) { - if self.ring_buffer[pos] == b'\n' { - line_count += 1; - if line_count == lines { - break; - } - } - pos = (pos as isize - 1).rem_euclid(self.ring_buffer.len() as isize) as usize; - i += 1; - } - self.output_len = i + 1; - self.output_pos = (pos as isize).rem_euclid(self.ring_buffer.len() as isize) as usize; - } else if let Some(words) = self.tail_words { - // Count words from the end - let mut word_count = 0; - let mut pos = (self.ring_buffer_pos as isize - 1).rem_euclid(self.ring_buffer.len() as isize) as usize; - let mut i = 0; - let mut in_word = false; - - while i < self.bytes_read.min(self.ring_buffer.len()) { - let byte = self.ring_buffer[pos]; - let is_whitespace = byte.is_ascii_whitespace(); - - if !in_word && !is_whitespace { - in_word = true; - word_count += 1; - if word_count == words { - break; - } - } else if is_whitespace { - in_word = false; - } - - pos = (pos as isize - 1).rem_euclid(self.ring_buffer.len() as isize) as usize; - i += 1; - } - self.output_len = i + 1; - self.output_pos = (pos as isize).rem_euclid(self.ring_buffer.len() as isize) as usize; + // Convert the ring buffer to a Vec for easier processing + let buffer_contents: Vec = self.ring_buffer.iter().copied().collect(); + + if let Some(bytes) = self.tail_bytes { + self.output_start = if buffer_contents.len() > bytes { + buffer_contents.len() - bytes } else { - self.output_len = self.bytes_read.min(self.ring_buffer.len()); - self.output_pos = if self.bytes_read > self.ring_buffer.len() { - self.ring_buffer_pos - } else { - 0 - }; + 0 + }; + self.output_remaining = buffer_contents.len() - self.output_start; + } else if let Some(lines) = self.tail_lines { + // Count lines from the end + let mut lines_found = 0; + let mut start_index = buffer_contents.len(); + + for (i, &byte) in buffer_contents.iter().enumerate().rev() { + if byte == b'\n' { + lines_found += 1; + if lines_found == lines { + start_index = i + 1; // Start after the newline + break; + } + } + } + // If we didn't find enough newlines, start from the beginning + if lines_found < lines { + self.output_start = 0; + } else { + self.output_start = start_index; + } + self.output_remaining = buffer_contents.len() - self.output_start; + } else if let Some(words) = self.tail_words { + // Count words from the end + let mut words_found = 0; + let mut start_index = buffer_contents.len(); + let mut in_word = false; + + for (i, &byte) in buffer_contents.iter().enumerate().rev() { + let is_whitespace = byte.is_ascii_whitespace(); + + if !in_word && !is_whitespace { + in_word = true; + words_found += 1; + if words_found == words { + start_index = i; + break; + } + } else if is_whitespace { + in_word = false; + } + } + // If we didn't find enough words, start from the beginning + if words_found < words { + self.output_start = 0; + } else { + self.output_start = start_index; + } + self.output_remaining = buffer_contents.len() - self.output_start; + } else { + self.output_start = 0; + self.output_remaining = buffer_contents.len(); + } + } + + fn fill_buffer(&mut self) -> std::io::Result<()> { + if self.is_eof { + return Ok(()); + } + + let mut temp_buf = vec![0; 8192]; + loop { + let n = self.inner.read(&mut temp_buf)?; + if n == 0 { + self.is_eof = true; + break; + } + + // Push bytes into the ring buffer, overwriting old data if necessary + for &byte in &temp_buf[..n] { + let _ = self.ring_buffer.push(byte); } } + + // Process the tail parameters to determine what to output + self.process_tail(); + + Ok(()) } } impl Read for TailFilter { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - if self.is_eof { - // We've processed the input and are now outputting the tail - if self.output_pos >= self.output_len { - return Ok(0); - } - - let bytes_to_copy = std::cmp::min(buf.len(), self.output_len - self.output_pos); - for i in 0..bytes_to_copy { - let index = (self.output_pos + i) % self.ring_buffer.len(); - buf[i] = self.ring_buffer[index]; - } - self.output_pos += bytes_to_copy; - return Ok(bytes_to_copy); + // Fill the buffer if we haven't processed the input yet + if !self.is_eof { + self.fill_buffer()?; } - // Read data into the ring buffer - let mut temp_buf = vec![0; 8192]; - let n = self.inner.read(&mut temp_buf)?; - if n == 0 { - self.is_eof = true; - self.process_tail(); - return self.read(buf); + if self.output_remaining == 0 { + return Ok(0); } - // Add new data to the ring buffer - for &byte in &temp_buf[..n] { - self.ring_buffer[self.ring_buffer_pos] = byte; - self.ring_buffer_pos = (self.ring_buffer_pos + 1) % self.ring_buffer.len(); - self.bytes_read += 1; - } + // Convert the ring buffer to a Vec for easier slicing + let buffer_contents: Vec = self.ring_buffer.iter().copied().collect(); - // While reading, return 0 bytes - Ok(0) + // Calculate how many bytes to copy + let bytes_to_copy = std::cmp::min(buf.len(), self.output_remaining); + let end_index = self.output_start + bytes_to_copy; + + // Copy the slice to the output buffer + buf[..bytes_to_copy].copy_from_slice(&buffer_contents[self.output_start..end_index]); + + // Update the remaining bytes + self.output_start += bytes_to_copy; + self.output_remaining -= bytes_to_copy; + + Ok(bytes_to_copy) } }