refactor: optimize tail filter to use ring buffer directly

Co-authored-by: aider (openai/andrew/openrouter/deepseek/deepseek-chat-v3.1) <aider@aider.chat>
This commit is contained in:
Andrew Phillips
2025-08-28 20:09:45 -03:00
parent 6c29c9c4c5
commit 9ef4ba2abe
2 changed files with 73 additions and 54 deletions

View File

@@ -180,6 +180,14 @@ impl AsyncItemService {
allow_binary: bool, allow_binary: bool,
offset: u64, offset: u64,
length: u64, length: u64,
head_bytes: Option<usize>,
head_words: Option<usize>,
head_lines: Option<usize>,
tail_bytes: Option<usize>,
tail_words: Option<usize>,
tail_lines: Option<usize>,
line_start: Option<usize>,
line_end: Option<usize>,
) -> Result<(std::pin::Pin<Box<dyn tokio_stream::Stream<Item = Result<tokio_util::bytes::Bytes, std::io::Error>> + Send>>, String), CoreError> { ) -> Result<(std::pin::Pin<Box<dyn tokio_stream::Stream<Item = Result<tokio_util::bytes::Bytes, std::io::Error>> + Send>>, String), CoreError> {
let _db = self.db.clone(); let _db = self.db.clone();
let _item_service = self.item_service.clone(); let _item_service = self.item_service.clone();
@@ -196,7 +204,17 @@ impl AsyncItemService {
text_val == "false" text_val == "false"
} else { } else {
// Get binary status using streaming approach // Get binary status using streaming approach
let (_, _, is_binary) = self.get_item_content_info_streaming(item_id).await?; let (_, _, is_binary) = self.get_item_content_info_streaming(
item_id,
head_bytes,
head_words,
head_lines,
tail_bytes,
tail_words,
tail_lines,
line_start,
line_end
).await?;
is_binary is_binary
}; };
@@ -205,31 +223,30 @@ impl AsyncItemService {
} }
} }
// Get a streaming reader for the content // Get a streaming reader for the content with filtering applied
let reader = { let reader = {
let db = self.db.clone(); let db = self.db.clone();
let item_service = self.item_service.clone(); let item_service = self.item_service.clone();
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock(); let conn = db.blocking_lock();
let item_with_meta = item_service.get_item(&conn, item_id)?; item_service.get_item_content_info_streaming(
let item_id_val = item_with_meta.item.id.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?; &conn,
item_id,
let mut item_path = item_service.get_data_path().clone(); head_bytes,
item_path.push(item_id_val.to_string()); head_words,
head_lines,
let reader = item_service.get_compression_service().stream_item_content( tail_bytes,
item_path, tail_words,
&item_with_meta.item.compression tail_lines,
)?; line_start,
line_end
Ok::<_, CoreError>(reader) ).map(|(reader, _, _)| reader)
}) })
.await .await
.unwrap()? .unwrap()?
}; };
// Convert the reader into an async stream manually // Convert the reader into an async stream manually
// Since ReaderStream requires AsyncRead, we'll create our own implementation
use tokio_util::bytes::Bytes; use tokio_util::bytes::Bytes;
// Create a channel to stream data between the blocking thread and async runtime // Create a channel to stream data between the blocking thread and async runtime

View File

@@ -683,67 +683,70 @@ impl<R: Read + Send> TailFilter<R> {
} }
fn process_tail(&mut self) { fn process_tail(&mut self) {
// Convert the ring buffer to a Vec for easier processing let buffer_len = self.ring_buffer.len();
let buffer_contents: Vec<u8> = self.ring_buffer.iter().copied().collect();
if let Some(bytes) = self.tail_bytes { if let Some(bytes) = self.tail_bytes {
self.output_start = if buffer_contents.len() > bytes { self.output_remaining = std::cmp::min(bytes, buffer_len);
buffer_contents.len() - bytes // The start position in the ring buffer
} else { self.output_start = (self.ring_buffer.write_index() as isize - self.output_remaining as isize)
0 .rem_euclid(self.ring_buffer.capacity() as isize) as usize;
};
self.output_remaining = buffer_contents.len() - self.output_start;
} else if let Some(lines) = self.tail_lines { } else if let Some(lines) = self.tail_lines {
// Count lines from the end // Count lines from the end by iterating through the ring buffer
let mut lines_found = 0; let mut lines_found = 0;
let mut start_index = buffer_contents.len(); let mut bytes_to_keep = 0;
// Iterate backwards through the ring buffer
let mut pos = (self.ring_buffer.write_index() as isize - 1).rem_euclid(self.ring_buffer.capacity() as isize) as usize;
for _ in 0..buffer_len {
let byte = self.ring_buffer[pos];
bytes_to_keep += 1;
for (i, &byte) in buffer_contents.iter().enumerate().rev() {
if byte == b'\n' { if byte == b'\n' {
lines_found += 1; lines_found += 1;
if lines_found == lines { if lines_found == lines {
start_index = i + 1; // Start after the newline
break; break;
} }
} }
pos = (pos as isize - 1).rem_euclid(self.ring_buffer.capacity() as isize) as usize;
} }
// If we didn't find enough newlines, start from the beginning
if lines_found < lines { self.output_remaining = bytes_to_keep;
self.output_start = 0; self.output_start = (self.ring_buffer.write_index() as isize - self.output_remaining as isize)
} else { .rem_euclid(self.ring_buffer.capacity() as isize) as usize;
self.output_start = start_index;
}
self.output_remaining = buffer_contents.len() - self.output_start;
} else if let Some(words) = self.tail_words { } else if let Some(words) = self.tail_words {
// Count words from the end // Count words from the end
let mut words_found = 0; let mut words_found = 0;
let mut start_index = buffer_contents.len(); let mut bytes_to_keep = 0;
let mut in_word = false; let mut in_word = false;
for (i, &byte) in buffer_contents.iter().enumerate().rev() { // Iterate backwards through the ring buffer
let is_whitespace = byte.is_ascii_whitespace(); let mut pos = (self.ring_buffer.write_index() as isize - 1).rem_euclid(self.ring_buffer.capacity() as isize) as usize;
for _ in 0..buffer_len {
let byte = self.ring_buffer[pos];
bytes_to_keep += 1;
let is_whitespace = byte.is_ascii_whitespace();
if !in_word && !is_whitespace { if !in_word && !is_whitespace {
in_word = true; in_word = true;
words_found += 1; words_found += 1;
if words_found == words { if words_found == words {
start_index = i;
break; break;
} }
} else if is_whitespace { } else if is_whitespace {
in_word = false; in_word = false;
} }
pos = (pos as isize - 1).rem_euclid(self.ring_buffer.capacity() as isize) as usize;
} }
// If we didn't find enough words, start from the beginning
if words_found < words { self.output_remaining = bytes_to_keep;
self.output_start = 0; self.output_start = (self.ring_buffer.write_index() as isize - self.output_remaining as isize)
.rem_euclid(self.ring_buffer.capacity() as isize) as usize;
} else { } else {
self.output_start = start_index; self.output_remaining = buffer_len;
} self.output_start = (self.ring_buffer.write_index() as isize - self.output_remaining as isize)
self.output_remaining = buffer_contents.len() - self.output_start; .rem_euclid(self.ring_buffer.capacity() as isize) as usize;
} else {
self.output_start = 0;
self.output_remaining = buffer_contents.len();
} }
} }
@@ -784,18 +787,17 @@ impl<R: Read + Send> Read for TailFilter<R> {
return Ok(0); return Ok(0);
} }
// Convert the ring buffer to a Vec for easier slicing
let buffer_contents: Vec<u8> = self.ring_buffer.iter().copied().collect();
// Calculate how many bytes to copy // Calculate how many bytes to copy
let bytes_to_copy = std::cmp::min(buf.len(), self.output_remaining); let bytes_to_copy = std::cmp::min(buf.len(), self.output_remaining);
let end_index = self.output_start + bytes_to_copy;
// Copy the slice to the output buffer // Copy directly from the ring buffer
buf[..bytes_to_copy].copy_from_slice(&buffer_contents[self.output_start..end_index]); for i in 0..bytes_to_copy {
let index = (self.output_start + i) % self.ring_buffer.capacity();
buf[i] = self.ring_buffer[index];
}
// Update the remaining bytes // Update the remaining bytes
self.output_start += bytes_to_copy; self.output_start = (self.output_start + bytes_to_copy) % self.ring_buffer.capacity();
self.output_remaining -= bytes_to_copy; self.output_remaining -= bytes_to_copy;
Ok(bytes_to_copy) Ok(bytes_to_copy)