feat: implement tail filter using ringbuf crate

Co-authored-by: aider (openai/andrew/openrouter/deepseek/deepseek-chat-v3.1) <aider@aider.chat>
This commit is contained in:
Andrew Phillips
2025-08-28 20:03:50 -03:00
parent e99d7ad0ea
commit dc2bd8dcdf
2 changed files with 118 additions and 104 deletions

View File

@@ -40,6 +40,7 @@ once_cell = "1.19.0"
prettytable-rs = "0.10.0" prettytable-rs = "0.10.0"
pwhash = "1.0.0" pwhash = "1.0.0"
regex = "1.9.5" regex = "1.9.5"
ringbuf = "0.3"
rmcp = { version = "0.2.0", features = ["server"] } rmcp = { version = "0.2.0", features = ["server"] }
rusqlite = { version = "0.37.0", features = ["bundled", "array", "chrono"] } rusqlite = { version = "0.37.0", features = ["bundled", "array", "chrono"] }
rusqlite_migration = "2.3.0" rusqlite_migration = "2.3.0"

View File

@@ -9,6 +9,7 @@ use crate::compression_engine::{get_compression_engine, CompressionType};
use crate::modes::common::settings_compression_type; use crate::modes::common::settings_compression_type;
use clap::Command; use clap::Command;
use log::debug; use log::debug;
use ringbuf::HeapRb;
use rusqlite::Connection; use rusqlite::Connection;
use std::collections::HashMap; use std::collections::HashMap;
use std::fs; use std::fs;
@@ -636,18 +637,17 @@ impl<R: Read + Send> Read for HeadFilter<R> {
} }
} }
// Tail filter implementation using a ring buffer // Tail filter implementation using ringbuf crate
struct TailFilter<R: Read + Send> { struct TailFilter<R: Read + Send> {
inner: R, inner: R,
ring_buffer: Vec<u8>, ring_buffer: HeapRb<u8>,
ring_buffer_pos: usize,
tail_bytes: Option<usize>, tail_bytes: Option<usize>,
tail_words: Option<usize>, tail_words: Option<usize>,
tail_lines: Option<usize>, tail_lines: Option<usize>,
is_eof: bool, is_eof: bool,
bytes_read: usize, // Track the number of bytes to output and the starting position
output_pos: usize, output_start: usize,
output_len: usize, output_remaining: usize,
} }
impl<R: Read + Send> TailFilter<R> { impl<R: Read + Send> TailFilter<R> {
@@ -657,135 +657,148 @@ impl<R: Read + Send> TailFilter<R> {
tail_words: Option<usize>, tail_words: Option<usize>,
tail_lines: Option<usize>, tail_lines: Option<usize>,
) -> std::io::Result<Self> { ) -> std::io::Result<Self> {
// Determine buffer size based on the largest tail parameter // Determine buffer size based on the largest tail parameter with some padding
let buffer_size = if let Some(bytes) = tail_bytes { let buffer_size = if let Some(bytes) = tail_bytes {
bytes bytes
} else if let Some(lines) = tail_lines { } else if let Some(lines) = tail_lines {
// Estimate 256 bytes per line // Estimate 256 bytes per line with some padding
lines * 256 lines * 256 + 1024
} else if let Some(words) = tail_words { } else if let Some(words) = tail_words {
// Estimate 16 bytes per word // Estimate 16 bytes per word with some padding
words * 16 words * 16 + 1024
} else { } else {
8192 8192
}; };
Ok(Self { Ok(Self {
inner, inner,
ring_buffer: vec![0; buffer_size], ring_buffer: HeapRb::new(buffer_size),
ring_buffer_pos: 0,
tail_bytes, tail_bytes,
tail_words, tail_words,
tail_lines, tail_lines,
is_eof: false, is_eof: false,
bytes_read: 0, output_start: 0,
output_pos: 0, output_remaining: 0,
output_len: 0,
}) })
} }
fn process_tail(&mut self) { fn process_tail(&mut self) {
if self.is_eof { // Convert the ring buffer to a Vec for easier processing
// Process the ring buffer to extract the tail based on parameters let buffer_contents: Vec<u8> = self.ring_buffer.iter().copied().collect();
if let Some(bytes) = self.tail_bytes {
self.output_len = std::cmp::min(bytes, self.bytes_read); if let Some(bytes) = self.tail_bytes {
self.output_pos = if self.bytes_read > self.ring_buffer.len() { self.output_start = if buffer_contents.len() > bytes {
// We've wrapped around the ring buffer buffer_contents.len() - bytes
self.ring_buffer_pos
} else {
0
};
} else if let Some(lines) = self.tail_lines {
// Count lines from the end
let mut line_count = 0;
let mut pos = (self.ring_buffer_pos as isize - 1).rem_euclid(self.ring_buffer.len() as isize) as usize;
let mut i = 0;
while i < self.bytes_read.min(self.ring_buffer.len()) {
if self.ring_buffer[pos] == b'\n' {
line_count += 1;
if line_count == lines {
break;
}
}
pos = (pos as isize - 1).rem_euclid(self.ring_buffer.len() as isize) as usize;
i += 1;
}
self.output_len = i + 1;
self.output_pos = (pos as isize).rem_euclid(self.ring_buffer.len() as isize) as usize;
} else if let Some(words) = self.tail_words {
// Count words from the end
let mut word_count = 0;
let mut pos = (self.ring_buffer_pos as isize - 1).rem_euclid(self.ring_buffer.len() as isize) as usize;
let mut i = 0;
let mut in_word = false;
while i < self.bytes_read.min(self.ring_buffer.len()) {
let byte = self.ring_buffer[pos];
let is_whitespace = byte.is_ascii_whitespace();
if !in_word && !is_whitespace {
in_word = true;
word_count += 1;
if word_count == words {
break;
}
} else if is_whitespace {
in_word = false;
}
pos = (pos as isize - 1).rem_euclid(self.ring_buffer.len() as isize) as usize;
i += 1;
}
self.output_len = i + 1;
self.output_pos = (pos as isize).rem_euclid(self.ring_buffer.len() as isize) as usize;
} else { } else {
self.output_len = self.bytes_read.min(self.ring_buffer.len()); 0
self.output_pos = if self.bytes_read > self.ring_buffer.len() { };
self.ring_buffer_pos self.output_remaining = buffer_contents.len() - self.output_start;
} else { } else if let Some(lines) = self.tail_lines {
0 // Count lines from the end
}; let mut lines_found = 0;
let mut start_index = buffer_contents.len();
for (i, &byte) in buffer_contents.iter().enumerate().rev() {
if byte == b'\n' {
lines_found += 1;
if lines_found == lines {
start_index = i + 1; // Start after the newline
break;
}
}
}
// If we didn't find enough newlines, start from the beginning
if lines_found < lines {
self.output_start = 0;
} else {
self.output_start = start_index;
}
self.output_remaining = buffer_contents.len() - self.output_start;
} else if let Some(words) = self.tail_words {
// Count words from the end
let mut words_found = 0;
let mut start_index = buffer_contents.len();
let mut in_word = false;
for (i, &byte) in buffer_contents.iter().enumerate().rev() {
let is_whitespace = byte.is_ascii_whitespace();
if !in_word && !is_whitespace {
in_word = true;
words_found += 1;
if words_found == words {
start_index = i;
break;
}
} else if is_whitespace {
in_word = false;
}
}
// If we didn't find enough words, start from the beginning
if words_found < words {
self.output_start = 0;
} else {
self.output_start = start_index;
}
self.output_remaining = buffer_contents.len() - self.output_start;
} else {
self.output_start = 0;
self.output_remaining = buffer_contents.len();
}
}
fn fill_buffer(&mut self) -> std::io::Result<()> {
if self.is_eof {
return Ok(());
}
let mut temp_buf = vec![0; 8192];
loop {
let n = self.inner.read(&mut temp_buf)?;
if n == 0 {
self.is_eof = true;
break;
}
// Push bytes into the ring buffer, overwriting old data if necessary
for &byte in &temp_buf[..n] {
let _ = self.ring_buffer.push(byte);
} }
} }
// Process the tail parameters to determine what to output
self.process_tail();
Ok(())
} }
} }
impl<R: Read + Send> Read for TailFilter<R> { impl<R: Read + Send> Read for TailFilter<R> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
if self.is_eof { // Fill the buffer if we haven't processed the input yet
// We've processed the input and are now outputting the tail if !self.is_eof {
if self.output_pos >= self.output_len { self.fill_buffer()?;
return Ok(0);
}
let bytes_to_copy = std::cmp::min(buf.len(), self.output_len - self.output_pos);
for i in 0..bytes_to_copy {
let index = (self.output_pos + i) % self.ring_buffer.len();
buf[i] = self.ring_buffer[index];
}
self.output_pos += bytes_to_copy;
return Ok(bytes_to_copy);
} }
// Read data into the ring buffer if self.output_remaining == 0 {
let mut temp_buf = vec![0; 8192]; return Ok(0);
let n = self.inner.read(&mut temp_buf)?;
if n == 0 {
self.is_eof = true;
self.process_tail();
return self.read(buf);
} }
// Add new data to the ring buffer // Convert the ring buffer to a Vec for easier slicing
for &byte in &temp_buf[..n] { let buffer_contents: Vec<u8> = self.ring_buffer.iter().copied().collect();
self.ring_buffer[self.ring_buffer_pos] = byte;
self.ring_buffer_pos = (self.ring_buffer_pos + 1) % self.ring_buffer.len();
self.bytes_read += 1;
}
// While reading, return 0 bytes // Calculate how many bytes to copy
Ok(0) let bytes_to_copy = std::cmp::min(buf.len(), self.output_remaining);
let end_index = self.output_start + bytes_to_copy;
// Copy the slice to the output buffer
buf[..bytes_to_copy].copy_from_slice(&buffer_contents[self.output_start..end_index]);
// Update the remaining bytes
self.output_start += bytes_to_copy;
self.output_remaining -= bytes_to_copy;
Ok(bytes_to_copy)
} }
} }