From 9ef4ba2abe2690d50a6471c49213102a93c3d0a7 Mon Sep 17 00:00:00 2001 From: Andrew Phillips Date: Thu, 28 Aug 2025 20:09:45 -0300 Subject: [PATCH] refactor: optimize tail filter to use ring buffer directly Co-authored-by: aider (openai/andrew/openrouter/deepseek/deepseek-chat-v3.1) --- src/services/async_item_service.rs | 47 ++++++++++++------ src/services/item_service.rs | 80 +++++++++++++++--------------- 2 files changed, 73 insertions(+), 54 deletions(-) diff --git a/src/services/async_item_service.rs b/src/services/async_item_service.rs index 406da12..e617389 100644 --- a/src/services/async_item_service.rs +++ b/src/services/async_item_service.rs @@ -180,6 +180,14 @@ impl AsyncItemService { allow_binary: bool, offset: u64, length: u64, + head_bytes: Option, + head_words: Option, + head_lines: Option, + tail_bytes: Option, + tail_words: Option, + tail_lines: Option, + line_start: Option, + line_end: Option, ) -> Result<(std::pin::Pin> + Send>>, String), CoreError> { let _db = self.db.clone(); let _item_service = self.item_service.clone(); @@ -196,7 +204,17 @@ impl AsyncItemService { text_val == "false" } else { // Get binary status using streaming approach - let (_, _, is_binary) = self.get_item_content_info_streaming(item_id).await?; + let (_, _, is_binary) = self.get_item_content_info_streaming( + item_id, + head_bytes, + head_words, + head_lines, + tail_bytes, + tail_words, + tail_lines, + line_start, + line_end + ).await?; is_binary }; @@ -205,31 +223,30 @@ impl AsyncItemService { } } - // Get a streaming reader for the content + // Get a streaming reader for the content with filtering applied let reader = { let db = self.db.clone(); let item_service = self.item_service.clone(); tokio::task::spawn_blocking(move || { let conn = db.blocking_lock(); - let item_with_meta = item_service.get_item(&conn, item_id)?; - let item_id_val = item_with_meta.item.id.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?; - - let mut item_path = item_service.get_data_path().clone(); - item_path.push(item_id_val.to_string()); - - let reader = item_service.get_compression_service().stream_item_content( - item_path, - &item_with_meta.item.compression - )?; - - Ok::<_, CoreError>(reader) + item_service.get_item_content_info_streaming( + &conn, + item_id, + head_bytes, + head_words, + head_lines, + tail_bytes, + tail_words, + tail_lines, + line_start, + line_end + ).map(|(reader, _, _)| reader) }) .await .unwrap()? }; // Convert the reader into an async stream manually - // Since ReaderStream requires AsyncRead, we'll create our own implementation use tokio_util::bytes::Bytes; // Create a channel to stream data between the blocking thread and async runtime diff --git a/src/services/item_service.rs b/src/services/item_service.rs index 3d91bc4..67068d5 100644 --- a/src/services/item_service.rs +++ b/src/services/item_service.rs @@ -683,67 +683,70 @@ impl TailFilter { } fn process_tail(&mut self) { - // Convert the ring buffer to a Vec for easier processing - let buffer_contents: Vec = self.ring_buffer.iter().copied().collect(); + let buffer_len = self.ring_buffer.len(); if let Some(bytes) = self.tail_bytes { - self.output_start = if buffer_contents.len() > bytes { - buffer_contents.len() - bytes - } else { - 0 - }; - self.output_remaining = buffer_contents.len() - self.output_start; + self.output_remaining = std::cmp::min(bytes, buffer_len); + // The start position in the ring buffer + self.output_start = (self.ring_buffer.write_index() as isize - self.output_remaining as isize) + .rem_euclid(self.ring_buffer.capacity() as isize) as usize; } else if let Some(lines) = self.tail_lines { - // Count lines from the end + // Count lines from the end by iterating through the ring buffer let mut lines_found = 0; - let mut start_index = buffer_contents.len(); + let mut bytes_to_keep = 0; - for (i, &byte) in buffer_contents.iter().enumerate().rev() { + // Iterate backwards through the ring buffer + let mut pos = (self.ring_buffer.write_index() as isize - 1).rem_euclid(self.ring_buffer.capacity() as isize) as usize; + for _ in 0..buffer_len { + let byte = self.ring_buffer[pos]; + bytes_to_keep += 1; + if byte == b'\n' { lines_found += 1; if lines_found == lines { - start_index = i + 1; // Start after the newline break; } } + + pos = (pos as isize - 1).rem_euclid(self.ring_buffer.capacity() as isize) as usize; } - // If we didn't find enough newlines, start from the beginning - if lines_found < lines { - self.output_start = 0; - } else { - self.output_start = start_index; - } - self.output_remaining = buffer_contents.len() - self.output_start; + + self.output_remaining = bytes_to_keep; + self.output_start = (self.ring_buffer.write_index() as isize - self.output_remaining as isize) + .rem_euclid(self.ring_buffer.capacity() as isize) as usize; } else if let Some(words) = self.tail_words { // Count words from the end let mut words_found = 0; - let mut start_index = buffer_contents.len(); + let mut bytes_to_keep = 0; let mut in_word = false; - for (i, &byte) in buffer_contents.iter().enumerate().rev() { - let is_whitespace = byte.is_ascii_whitespace(); + // Iterate backwards through the ring buffer + let mut pos = (self.ring_buffer.write_index() as isize - 1).rem_euclid(self.ring_buffer.capacity() as isize) as usize; + for _ in 0..buffer_len { + let byte = self.ring_buffer[pos]; + bytes_to_keep += 1; + let is_whitespace = byte.is_ascii_whitespace(); if !in_word && !is_whitespace { in_word = true; words_found += 1; if words_found == words { - start_index = i; break; } } else if is_whitespace { in_word = false; } + + pos = (pos as isize - 1).rem_euclid(self.ring_buffer.capacity() as isize) as usize; } - // If we didn't find enough words, start from the beginning - if words_found < words { - self.output_start = 0; - } else { - self.output_start = start_index; - } - self.output_remaining = buffer_contents.len() - self.output_start; + + self.output_remaining = bytes_to_keep; + self.output_start = (self.ring_buffer.write_index() as isize - self.output_remaining as isize) + .rem_euclid(self.ring_buffer.capacity() as isize) as usize; } else { - self.output_start = 0; - self.output_remaining = buffer_contents.len(); + self.output_remaining = buffer_len; + self.output_start = (self.ring_buffer.write_index() as isize - self.output_remaining as isize) + .rem_euclid(self.ring_buffer.capacity() as isize) as usize; } } @@ -784,18 +787,17 @@ impl Read for TailFilter { return Ok(0); } - // Convert the ring buffer to a Vec for easier slicing - let buffer_contents: Vec = self.ring_buffer.iter().copied().collect(); - // Calculate how many bytes to copy let bytes_to_copy = std::cmp::min(buf.len(), self.output_remaining); - let end_index = self.output_start + bytes_to_copy; - // Copy the slice to the output buffer - buf[..bytes_to_copy].copy_from_slice(&buffer_contents[self.output_start..end_index]); + // Copy directly from the ring buffer + for i in 0..bytes_to_copy { + let index = (self.output_start + i) % self.ring_buffer.capacity(); + buf[i] = self.ring_buffer[index]; + } // Update the remaining bytes - self.output_start += bytes_to_copy; + self.output_start = (self.output_start + bytes_to_copy) % self.ring_buffer.capacity(); self.output_remaining -= bytes_to_copy; Ok(bytes_to_copy)