feat: add filtering reader implementation
Co-authored-by: aider (openai/andrew/openrouter/deepseek/deepseek-chat-v3.1) <aider@aider.chat>
This commit is contained in:
@@ -16,6 +16,79 @@ use std::fs;
|
|||||||
use std::io::{IsTerminal, Read, Write};
|
use std::io::{IsTerminal, Read, Write};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
||||||
|
/// A reader that applies a filter chain to the data as it's read
|
||||||
|
struct FilteringReader<R: Read> {
|
||||||
|
reader: R,
|
||||||
|
filter_chain: Option<crate::filter_plugin::FilterChain>,
|
||||||
|
buffer: Vec<u8>,
|
||||||
|
buffer_pos: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<R: Read> FilteringReader<R> {
|
||||||
|
pub fn new(reader: R, filter_chain: Option<crate::filter_plugin::FilterChain>) -> Self {
|
||||||
|
Self {
|
||||||
|
reader,
|
||||||
|
filter_chain,
|
||||||
|
buffer: Vec::new(),
|
||||||
|
buffer_pos: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<R: Read> Read for FilteringReader<R> {
|
||||||
|
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||||
|
// If we have data in our buffer, serve that first
|
||||||
|
if self.buffer_pos < self.buffer.len() {
|
||||||
|
let bytes_to_copy = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos);
|
||||||
|
buf[..bytes_to_copy].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + bytes_to_copy]);
|
||||||
|
self.buffer_pos += bytes_to_copy;
|
||||||
|
return Ok(bytes_to_copy);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset buffer for new data
|
||||||
|
self.buffer.clear();
|
||||||
|
self.buffer_pos = 0;
|
||||||
|
|
||||||
|
// Read from the original reader
|
||||||
|
let mut temp_buf = vec![0; buf.len()];
|
||||||
|
let bytes_read = self.reader.read(&mut temp_buf)?;
|
||||||
|
|
||||||
|
if bytes_read == 0 {
|
||||||
|
// If we're at EOF, process any remaining data in the filter chain
|
||||||
|
if let Some(chain) = &mut self.filter_chain {
|
||||||
|
let finished_data = chain.finish()?;
|
||||||
|
if !finished_data.is_empty() {
|
||||||
|
self.buffer = finished_data;
|
||||||
|
let bytes_to_copy = std::cmp::min(buf.len(), self.buffer.len());
|
||||||
|
buf[..bytes_to_copy].copy_from_slice(&self.buffer[..bytes_to_copy]);
|
||||||
|
self.buffer_pos = bytes_to_copy;
|
||||||
|
return Ok(bytes_to_copy);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Ok(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process through the filter chain if it exists
|
||||||
|
if let Some(chain) = &mut self.filter_chain {
|
||||||
|
let processed_data = chain.process(&temp_buf[..bytes_read])?;
|
||||||
|
if !processed_data.is_empty() {
|
||||||
|
self.buffer = processed_data;
|
||||||
|
let bytes_to_copy = std::cmp::min(buf.len(), self.buffer.len());
|
||||||
|
buf[..bytes_to_copy].copy_from_slice(&self.buffer[..bytes_to_copy]);
|
||||||
|
self.buffer_pos = bytes_to_copy;
|
||||||
|
Ok(bytes_to_copy)
|
||||||
|
} else {
|
||||||
|
// No data produced by filter, try reading more
|
||||||
|
Ok(0)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// No filter chain, just pass through
|
||||||
|
buf[..bytes_read].copy_from_slice(&temp_buf[..bytes_read]);
|
||||||
|
Ok(bytes_read)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct ItemService {
|
pub struct ItemService {
|
||||||
data_path: PathBuf,
|
data_path: PathBuf,
|
||||||
compression_service: CompressionService,
|
compression_service: CompressionService,
|
||||||
|
|||||||
Reference in New Issue
Block a user