feat: implement streaming support for tail and line range filters
Co-authored-by: aider (openai/andrew/openrouter/deepseek/deepseek-chat-v3.1) <aider@aider.chat>
This commit is contained in:
@@ -646,6 +646,8 @@ struct TailFilter<R: Read + Send> {
|
|||||||
tail_lines: Option<usize>,
|
tail_lines: Option<usize>,
|
||||||
is_eof: bool,
|
is_eof: bool,
|
||||||
bytes_read: usize,
|
bytes_read: usize,
|
||||||
|
output_pos: usize,
|
||||||
|
output_len: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R: Read + Send> TailFilter<R> {
|
impl<R: Read + Send> TailFilter<R> {
|
||||||
@@ -677,27 +679,102 @@ impl<R: Read + Send> TailFilter<R> {
|
|||||||
tail_lines,
|
tail_lines,
|
||||||
is_eof: false,
|
is_eof: false,
|
||||||
bytes_read: 0,
|
bytes_read: 0,
|
||||||
|
output_pos: 0,
|
||||||
|
output_len: 0,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn process_tail(&mut self) {
|
||||||
|
if self.is_eof {
|
||||||
|
// Process the ring buffer to extract the tail based on parameters
|
||||||
|
if let Some(bytes) = self.tail_bytes {
|
||||||
|
self.output_len = std::cmp::min(bytes, self.bytes_read);
|
||||||
|
self.output_pos = if self.bytes_read > self.ring_buffer.len() {
|
||||||
|
// We've wrapped around the ring buffer
|
||||||
|
self.ring_buffer_pos
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
};
|
||||||
|
} else if let Some(lines) = self.tail_lines {
|
||||||
|
// Count lines from the end
|
||||||
|
let mut line_count = 0;
|
||||||
|
let mut pos = (self.ring_buffer_pos as isize - 1).rem_euclid(self.ring_buffer.len() as isize) as usize;
|
||||||
|
let mut i = 0;
|
||||||
|
|
||||||
|
while i < self.bytes_read.min(self.ring_buffer.len()) {
|
||||||
|
if self.ring_buffer[pos] == b'\n' {
|
||||||
|
line_count += 1;
|
||||||
|
if line_count == lines {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pos = (pos as isize - 1).rem_euclid(self.ring_buffer.len() as isize) as usize;
|
||||||
|
i += 1;
|
||||||
|
}
|
||||||
|
self.output_len = i + 1;
|
||||||
|
self.output_pos = (pos as isize).rem_euclid(self.ring_buffer.len() as isize) as usize;
|
||||||
|
} else if let Some(words) = self.tail_words {
|
||||||
|
// Count words from the end
|
||||||
|
let mut word_count = 0;
|
||||||
|
let mut pos = (self.ring_buffer_pos as isize - 1).rem_euclid(self.ring_buffer.len() as isize) as usize;
|
||||||
|
let mut i = 0;
|
||||||
|
let mut in_word = false;
|
||||||
|
|
||||||
|
while i < self.bytes_read.min(self.ring_buffer.len()) {
|
||||||
|
let byte = self.ring_buffer[pos];
|
||||||
|
let is_whitespace = byte.is_ascii_whitespace();
|
||||||
|
|
||||||
|
if !in_word && !is_whitespace {
|
||||||
|
in_word = true;
|
||||||
|
word_count += 1;
|
||||||
|
if word_count == words {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else if is_whitespace {
|
||||||
|
in_word = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
pos = (pos as isize - 1).rem_euclid(self.ring_buffer.len() as isize) as usize;
|
||||||
|
i += 1;
|
||||||
|
}
|
||||||
|
self.output_len = i + 1;
|
||||||
|
self.output_pos = (pos as isize).rem_euclid(self.ring_buffer.len() as isize) as usize;
|
||||||
|
} else {
|
||||||
|
self.output_len = self.bytes_read.min(self.ring_buffer.len());
|
||||||
|
self.output_pos = if self.bytes_read > self.ring_buffer.len() {
|
||||||
|
self.ring_buffer_pos
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R: Read + Send> Read for TailFilter<R> {
|
impl<R: Read + Send> Read for TailFilter<R> {
|
||||||
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||||
if self.is_eof {
|
if self.is_eof {
|
||||||
return Ok(0);
|
// We've processed the input and are now outputting the tail
|
||||||
|
if self.output_pos >= self.output_len {
|
||||||
|
return Ok(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
let bytes_to_copy = std::cmp::min(buf.len(), self.output_len - self.output_pos);
|
||||||
|
for i in 0..bytes_to_copy {
|
||||||
|
let index = (self.output_pos + i) % self.ring_buffer.len();
|
||||||
|
buf[i] = self.ring_buffer[index];
|
||||||
|
}
|
||||||
|
self.output_pos += bytes_to_copy;
|
||||||
|
return Ok(bytes_to_copy);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fill the ring buffer with data from the inner reader
|
// Read data into the ring buffer
|
||||||
let mut temp_buf = vec![0; 8192];
|
let mut temp_buf = vec![0; 8192];
|
||||||
let n = self.inner.read(&mut temp_buf)?;
|
let n = self.inner.read(&mut temp_buf)?;
|
||||||
if n == 0 {
|
if n == 0 {
|
||||||
self.is_eof = true;
|
self.is_eof = true;
|
||||||
// Now we can process the ring buffer to extract the tail
|
self.process_tail();
|
||||||
// This part needs to be implemented based on the tail parameters
|
return self.read(buf);
|
||||||
// For now, just return from the ring buffer
|
|
||||||
let to_copy = std::cmp::min(buf.len(), self.ring_buffer.len());
|
|
||||||
buf[..to_copy].copy_from_slice(&self.ring_buffer[..to_copy]);
|
|
||||||
return Ok(to_copy);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add new data to the ring buffer
|
// Add new data to the ring buffer
|
||||||
@@ -707,7 +784,7 @@ impl<R: Read + Send> Read for TailFilter<R> {
|
|||||||
self.bytes_read += 1;
|
self.bytes_read += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// We're still reading, so return 0 until EOF
|
// While reading, return 0 bytes
|
||||||
Ok(0)
|
Ok(0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -721,6 +798,7 @@ struct LineRangeFilter<R: Read + Send> {
|
|||||||
in_range: bool,
|
in_range: bool,
|
||||||
buffer: Vec<u8>,
|
buffer: Vec<u8>,
|
||||||
buffer_pos: usize,
|
buffer_pos: usize,
|
||||||
|
is_eof: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R: Read + Send> LineRangeFilter<R> {
|
impl<R: Read + Send> LineRangeFilter<R> {
|
||||||
@@ -731,21 +809,98 @@ impl<R: Read + Send> LineRangeFilter<R> {
|
|||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
inner,
|
inner,
|
||||||
line_start,
|
line_start: line_start.or(Some(1)),
|
||||||
line_end,
|
line_end,
|
||||||
current_line: 1,
|
current_line: 1,
|
||||||
in_range: false,
|
in_range: false,
|
||||||
buffer: Vec::new(),
|
buffer: Vec::new(),
|
||||||
buffer_pos: 0,
|
buffer_pos: 0,
|
||||||
|
is_eof: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn fill_buffer(&mut self) -> std::io::Result<()> {
|
||||||
|
if self.buffer_pos >= self.buffer.len() && !self.is_eof {
|
||||||
|
// Read more data
|
||||||
|
let mut temp_buf = [0; 8192];
|
||||||
|
let n = self.inner.read(&mut temp_buf)?;
|
||||||
|
if n == 0 {
|
||||||
|
self.is_eof = true;
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
self.buffer.extend_from_slice(&temp_buf[..n]);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R: Read + Send> Read for LineRangeFilter<R> {
|
impl<R: Read + Send> Read for LineRangeFilter<R> {
|
||||||
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||||
// Implementation would process the stream to find the specified line range
|
if self.is_eof && self.buffer_pos >= self.buffer.len() {
|
||||||
// This is a complex operation that needs to be implemented carefully
|
return Ok(0);
|
||||||
// For now, this is a placeholder
|
}
|
||||||
Ok(0)
|
|
||||||
|
let start_line = self.line_start.unwrap_or(1);
|
||||||
|
let end_line = self.line_end.unwrap_or(usize::MAX);
|
||||||
|
|
||||||
|
let mut bytes_written = 0;
|
||||||
|
|
||||||
|
while bytes_written < buf.len() {
|
||||||
|
// Fill the buffer if needed
|
||||||
|
self.fill_buffer()?;
|
||||||
|
|
||||||
|
if self.buffer_pos >= self.buffer.len() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process bytes until we find the start of the range
|
||||||
|
if !self.in_range {
|
||||||
|
while self.buffer_pos < self.buffer.len() && self.current_line < start_line {
|
||||||
|
if self.buffer[self.buffer_pos] == b'\n' {
|
||||||
|
self.current_line += 1;
|
||||||
|
if self.current_line >= start_line {
|
||||||
|
self.in_range = true;
|
||||||
|
self.buffer_pos += 1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.buffer_pos += 1;
|
||||||
|
}
|
||||||
|
// If we're still not in range, continue reading
|
||||||
|
if !self.in_range {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now we're in the range, copy data until we reach the end line or buffer end
|
||||||
|
if self.in_range && self.current_line <= end_line {
|
||||||
|
let bytes_to_copy = std::cmp::min(
|
||||||
|
buf.len() - bytes_written,
|
||||||
|
self.buffer.len() - self.buffer_pos
|
||||||
|
);
|
||||||
|
|
||||||
|
// Check if we encounter a newline that would take us past the end line
|
||||||
|
for i in 0..bytes_to_copy {
|
||||||
|
let byte = self.buffer[self.buffer_pos + i];
|
||||||
|
buf[bytes_written + i] = byte;
|
||||||
|
|
||||||
|
if byte == b'\n' {
|
||||||
|
self.current_line += 1;
|
||||||
|
if self.current_line > end_line {
|
||||||
|
// We've reached the end of the range
|
||||||
|
self.buffer_pos += i + 1;
|
||||||
|
return Ok(bytes_written + i + 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bytes_written += bytes_to_copy;
|
||||||
|
self.buffer_pos += bytes_to_copy;
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(bytes_written)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user