fix: skip_lines/skip_bytes filters producing empty output on large files
FilteringReader::read() returned Ok(0) — which callers of Read interpret as EOF — whenever a filter consumed a chunk without producing output. Filters like skip_lines need to see multiple chunks before outputting anything, so returning 0 prematurely truncated the stream. read() now loops until the filter produces output or the underlying reader is truly exhausted.
This commit is contained in:
@@ -812,16 +812,19 @@ impl<R: Read> Read for FilteringReader<R> {
|
|||||||
return self.reader.read(buf);
|
return self.reader.read(buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read from the original reader into the reusable temp buffer
|
// Read chunks and process through the filter chain.
|
||||||
|
// Loop because filters like skip_lines may consume entire chunks
|
||||||
|
// without producing output — that is not EOF, we must keep reading.
|
||||||
|
let chain = self.filter_chain.as_mut().unwrap();
|
||||||
|
loop {
|
||||||
let to_read = std::cmp::min(buf.len(), self.temp_buf.len());
|
let to_read = std::cmp::min(buf.len(), self.temp_buf.len());
|
||||||
let bytes_read = self.reader.read(&mut self.temp_buf[..to_read])?;
|
let bytes_read = self.reader.read(&mut self.temp_buf[..to_read])?;
|
||||||
|
|
||||||
if bytes_read == 0 {
|
if bytes_read == 0 {
|
||||||
|
// True EOF from the underlying reader
|
||||||
return Ok(0);
|
return Ok(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process through the filter chain
|
|
||||||
if let Some(ref mut chain) = self.filter_chain {
|
|
||||||
let mut input_cursor = std::io::Cursor::new(&self.temp_buf[..bytes_read]);
|
let mut input_cursor = std::io::Cursor::new(&self.temp_buf[..bytes_read]);
|
||||||
chain.filter(&mut input_cursor, &mut self.buffer)?;
|
chain.filter(&mut input_cursor, &mut self.buffer)?;
|
||||||
|
|
||||||
@@ -829,13 +832,9 @@ impl<R: Read> Read for FilteringReader<R> {
|
|||||||
let bytes_to_copy = std::cmp::min(buf.len(), self.buffer.len());
|
let bytes_to_copy = std::cmp::min(buf.len(), self.buffer.len());
|
||||||
buf[..bytes_to_copy].copy_from_slice(&self.buffer[..bytes_to_copy]);
|
buf[..bytes_to_copy].copy_from_slice(&self.buffer[..bytes_to_copy]);
|
||||||
self.buffer_pos = bytes_to_copy;
|
self.buffer_pos = bytes_to_copy;
|
||||||
Ok(bytes_to_copy)
|
return Ok(bytes_to_copy);
|
||||||
} else {
|
|
||||||
// No data produced by filter, signal to read more
|
|
||||||
Ok(0)
|
|
||||||
}
|
}
|
||||||
} else {
|
// Filter produced no output for this chunk — read another
|
||||||
unreachable!()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user