refactor: simplify filter plugin interface to use &mut dyn Read/Write

This commit is contained in:
Andrew Phillips
2025-09-15 17:42:35 -03:00
committed by Andrew Phillips (aider)
parent a8871a9575
commit a72395fe83
17 changed files with 102 additions and 138 deletions

View File

@@ -56,8 +56,8 @@
### Fix build problems ### Fix build problems
1. Build the project: `TERM=dumb cargo build`. 1. Check the project: `TERM=dumb cargo check`.
2. If there are errors or warnings, create a new sub agent (expert rust developer) that uses the `TERM=dumb cargo build` output as input, planned using strategic thinking. 2. If there are errors or warnings, create a new sub agent (expert rust developer) that uses the `TERM=dumb cargo check` output as input, planned using strategic thinking.
a. Read all affected files a. Read all affected files
d. Plan the fixes using strategic thinking: d. Plan the fixes using strategic thinking:
- Read other files if they provide context or examples - Read other files if they provide context or examples

View File

@@ -93,7 +93,7 @@ impl FilterPlugin for ExecFilter {
/// // In filter context: /// // In filter context:
/// filter.filter(Box::new(&mut input), Box::new(&mut output)).unwrap(); /// filter.filter(Box::new(&mut input), Box::new(&mut output)).unwrap();
/// ``` /// ```
fn filter(&mut self, mut reader: Box<&mut dyn Read>, mut writer: Box<&mut dyn Write>) -> Result<()> { fn filter(&mut self, reader: &mut dyn Read, writer: &mut dyn Write) -> Result<()> {
if !self.supported { if !self.supported {
return Err(std::io::Error::new( return Err(std::io::Error::new(
std::io::ErrorKind::NotFound, std::io::ErrorKind::NotFound,
@@ -103,6 +103,10 @@ impl FilterPlugin for ExecFilter {
debug!("FILTER_EXEC: Executing command: {} {:?}", self.program, self.args); debug!("FILTER_EXEC: Executing command: {} {:?}", self.program, self.args);
// Read all input first
let mut input_data = Vec::new();
std::io::copy(reader, &mut input_data)?;
let mut child = Command::new(&self.program) let mut child = Command::new(&self.program)
.args(&self.args) .args(&self.args)
.stdin(Stdio::piped()) .stdin(Stdio::piped())
@@ -116,68 +120,28 @@ impl FilterPlugin for ExecFilter {
) )
})?; })?;
let stdin = child.stdin.take().ok_or_else(|| { let mut stdin = child.stdin.take().ok_or_else(|| {
std::io::Error::new( std::io::Error::new(
std::io::ErrorKind::Other, std::io::ErrorKind::Other,
"Failed to capture stdin from child process", "Failed to capture stdin from child process",
) )
})?; })?;
let stdout = child.stdout.take().ok_or_else(|| {
// Write input to child stdin
stdin.write_all(&input_data)?;
drop(stdin); // Close stdin to signal EOF
let mut stdout = child.stdout.take().ok_or_else(|| {
std::io::Error::new( std::io::Error::new(
std::io::ErrorKind::Other, std::io::ErrorKind::Other,
"Failed to capture stdout from child process", "Failed to capture stdout from child process",
) )
})?; })?;
self.stdin_writer = Some(stdin); // Copy stdout to writer
self.stdout_reader = Some(stdout); std::io::copy(&mut stdout, writer)?;
self.child_process = Some(child);
let mut stdin_writer = self.stdin_writer.as_mut().unwrap();
let mut stdout_reader = self.stdout_reader.as_mut().unwrap();
// Thread to copy from input reader to child stdin
let input_thread = std::thread::spawn(move || {
std::io::copy(&mut *reader, &mut *stdin_writer)
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to write to process stdin: {}", e),
)
})?;
// Close stdin to signal EOF to the child process
drop(stdin_writer);
Ok(())
});
// Thread to copy from child stdout to output writer
let output_thread = std::thread::spawn(move || {
std::io::copy(&mut *stdout_reader, &mut *writer)
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to read from process stdout: {}", e),
)
})?;
Ok(())
});
// Wait for both threads to complete
input_thread.join().map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Input thread panicked: {:?}", e),
)
})?;
output_thread.join().map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Output thread panicked: {:?}", e),
)
})?;
// Wait for the child process to finish // Wait for the child process to finish
if let Some(mut child) = self.child_process.take() {
let output = child.wait_with_output() let output = child.wait_with_output()
.map_err(|e| { .map_err(|e| {
std::io::Error::new( std::io::Error::new(
@@ -196,7 +160,6 @@ impl FilterPlugin for ExecFilter {
format!("Process exited with error: {:?}", output.status), format!("Process exited with error: {:?}", output.status),
)); ));
} }
}
debug!("FILTER_EXEC: Process completed successfully"); debug!("FILTER_EXEC: Process completed successfully");
Ok(()) Ok(())

View File

@@ -70,12 +70,12 @@ impl GrepFilter {
/// filter.filter(&mut input, &mut output)?; /// filter.filter(&mut input, &mut output)?;
/// ``` /// ```
impl FilterPlugin for GrepFilter { impl FilterPlugin for GrepFilter {
fn filter(&mut self, mut reader: Box<dyn Read>, mut writer: Box<dyn Write>) -> Result<()> { fn filter(&mut self, reader: &mut dyn Read, writer: &mut dyn Write) -> Result<()> {
let mut buf_reader = std::io::BufReader::new(reader.as_mut()); let mut buf_reader = std::io::BufReader::new(reader);
for line in buf_reader.by_ref().lines() { for line in buf_reader.by_ref().lines() {
let line = line?; let line = line?;
if self.regex.is_match(&line) { if self.regex.is_match(&line) {
writeln!(writer.as_mut(), "{}", line)?; writeln!(writer, "{}", line)?;
} }
} }
Ok(()) Ok(())

View File

@@ -72,7 +72,7 @@ impl HeadBytesFilter {
/// // Input "Hello World" becomes "Hello" /// // Input "Hello World" becomes "Hello"
/// ``` /// ```
impl FilterPlugin for HeadBytesFilter { impl FilterPlugin for HeadBytesFilter {
fn filter(&mut self, mut reader: Box<dyn Read>, mut writer: Box<dyn Write>) -> Result<()> { fn filter(&mut self, reader: &mut dyn Read, writer: &mut dyn Write) -> Result<()> {
if self.remaining == 0 { if self.remaining == 0 {
return Ok(()); return Ok(());
} }
@@ -80,11 +80,11 @@ impl FilterPlugin for HeadBytesFilter {
let mut buffer = vec![0; PIPESIZE]; let mut buffer = vec![0; PIPESIZE];
while self.remaining > 0 { while self.remaining > 0 {
let to_read = std::cmp::min(self.remaining, PIPESIZE); let to_read = std::cmp::min(self.remaining, PIPESIZE);
let bytes_read = reader.as_mut().read(&mut buffer[..to_read])?; let bytes_read = reader.read(&mut buffer[..to_read])?;
if bytes_read == 0 { if bytes_read == 0 {
break; break;
} }
writer.as_mut().write_all(&buffer[..bytes_read])?; writer.write_all(&buffer[..bytes_read])?;
self.remaining -= bytes_read; self.remaining -= bytes_read;
} }
Ok(()) Ok(())
@@ -179,18 +179,19 @@ impl HeadLinesFilter {
/// ///
/// ``` /// ```
/// // Assuming a filter chain with head_lines(2) /// // Assuming a filter chain with head_lines(2)
/// // Input "Line1\nLine2\nLine3" becomes "Line1\nLine2\n" /// // Input: "Line1\nLine2\nLine3" becomes "Line1\nLine2\n"
/// ``` /// ```
impl FilterPlugin for HeadLinesFilter { impl FilterPlugin for HeadLinesFilter {
fn filter(&mut self, mut reader: Box<dyn Read>, mut writer: Box<dyn Write>) -> Result<()> { fn filter(&mut self, reader: &mut dyn Read, writer: &mut dyn Write) -> Result<()> {
if self.remaining == 0 { if self.remaining == 0 {
return Ok(()); return Ok(());
} }
let mut buf_reader = std::io::BufReader::new(reader.as_mut()); let mut buf_reader = std::io::BufReader::new(reader);
for line in buf_reader.by_ref().lines() { for line in buf_reader.by_ref().lines() {
let line = line?; let line = line?;
writeln!(writer.as_mut(), "{}", line)?; writeln!(writer, "{}", line)?;
self.remaining -= 1; self.remaining -= 1;
if self.remaining == 0 { if self.remaining == 0 {
break; break;

View File

@@ -101,7 +101,10 @@ pub trait FilterPlugin: Send {
/// // ... other methods /// // ... other methods
/// } /// }
/// ``` /// ```
fn filter(&mut self, mut reader: Box<dyn Read>, mut writer: Box<dyn Write>) -> Result<()>; fn filter(&mut self, reader: &mut dyn Read, writer: &mut dyn Write) -> Result<()> {
let _ = std::io::copy(reader, writer)?;
Ok(())
}
/// Clones this plugin into a new boxed instance. /// Clones this plugin into a new boxed instance.
/// ///
@@ -294,19 +297,16 @@ impl FilterChain {
for i in 0..plugins_len { for i in 0..plugins_len {
// Create a cursor for the current data // Create a cursor for the current data
let mut cursor = std::io::Cursor::new(&current_data); let mut input = std::io::Cursor::new(std::mem::take(&mut current_data));
let input: Box<dyn Read> = Box::new(cursor);
// For the last plugin, write directly to the output writer // For the last plugin, write directly to the output writer
if i == plugins_len - 1 { if i == plugins_len - 1 {
let output: Box<dyn Write> = Box::new(writer); self.plugins[i].filter(&mut input, writer)?;
self.plugins[i].filter(input, output)?;
} else { } else {
// For intermediate plugins, write to a buffer // For intermediate plugins, write to a buffer
let mut output = Vec::new(); let mut output_vec = Vec::new();
let output_ref: Box<dyn Write> = Box::new(&mut output); self.plugins[i].filter(&mut input, &mut output_vec)?;
self.plugins[i].filter(input, output_ref)?; current_data = output_vec;
current_data = output;
} }
} }
Ok(()) Ok(())

View File

@@ -32,13 +32,13 @@ impl FilterPlugin for SkipBytesFilter {
/// # Returns /// # Returns
/// ///
/// Returns `Ok(())` on success, or an `io::Error` if reading or writing fails. /// Returns `Ok(())` on success, or an `io::Error` if reading or writing fails.
fn filter(&mut self, mut reader: Box<dyn Read>, mut writer: Box<dyn Write>) -> Result<()> { fn filter(&mut self, reader: &mut dyn Read, writer: &mut dyn Write) -> Result<()> {
// Skip bytes in chunks // Skip bytes in chunks
if self.remaining > 0 { if self.remaining > 0 {
let mut buffer = vec![0; PIPESIZE]; let mut buffer = vec![0; PIPESIZE];
while self.remaining > 0 { while self.remaining > 0 {
let to_read = std::cmp::min(self.remaining, PIPESIZE); let to_read = std::cmp::min(self.remaining, PIPESIZE);
let bytes_read = reader.as_mut().read(&mut buffer[..to_read])?; let bytes_read = reader.read(&mut buffer[..to_read])?;
if bytes_read == 0 { if bytes_read == 0 {
break; break;
} }
@@ -47,7 +47,7 @@ impl FilterPlugin for SkipBytesFilter {
} }
// Copy the remaining data using io::copy for efficiency // Copy the remaining data using io::copy for efficiency
std::io::copy(reader.as_mut(), writer.as_mut())?; std::io::copy(reader, writer)?;
Ok(()) Ok(())
} }
@@ -107,14 +107,14 @@ impl FilterPlugin for SkipLinesFilter {
/// # Returns /// # Returns
/// ///
/// Returns `Ok(())` on success, or an `io::Error` if reading or writing fails. /// Returns `Ok(())` on success, or an `io::Error` if reading or writing fails.
fn filter(&mut self, mut reader: Box<dyn Read>, mut writer: Box<dyn Write>) -> Result<()> { fn filter(&mut self, reader: &mut dyn Read, writer: &mut dyn Write) -> Result<()> {
let mut buf_reader = std::io::BufReader::new(reader.as_mut()); let mut buf_reader = std::io::BufReader::new(reader);
for line in buf_reader.by_ref().lines() { for line in buf_reader.by_ref().lines() {
let line = line?; let line = line?;
if self.remaining > 0 { if self.remaining > 0 {
self.remaining -= 1; self.remaining -= 1;
} else { } else {
writeln!(writer.as_mut(), "{}", line)?; writeln!(writer, "{}", line)?;
} }
} }
Ok(()) Ok(())

View File

@@ -32,9 +32,9 @@ impl FilterPlugin for StripAnsiFilter {
/// # Returns /// # Returns
/// ///
/// Returns `Ok(())` on success, or an `io::Error` if reading or writing fails. /// Returns `Ok(())` on success, or an `io::Error` if reading or writing fails.
fn filter(&mut self, mut reader: Box<dyn Read>, mut writer: Box<dyn Write>) -> Result<()> { fn filter(&mut self, reader: &mut dyn Read, writer: &mut dyn Write) -> Result<()> {
let mut ansi_writer = Writer::new(writer.as_mut()); let mut ansi_writer = Writer::new(writer);
std::io::copy(reader.as_mut(), &mut ansi_writer)?; std::io::copy(reader, &mut ansi_writer)?;
ansi_writer.flush()?; ansi_writer.flush()?;
Ok(()) Ok(())
} }

View File

@@ -35,10 +35,10 @@ impl FilterPlugin for TailBytesFilter {
/// # Returns /// # Returns
/// ///
/// Returns `Ok(())` on success, or an `io::Error` if reading or writing fails. /// Returns `Ok(())` on success, or an `io::Error` if reading or writing fails.
fn filter(&mut self, mut reader: Box<dyn Read>, mut writer: Box<dyn Write>) -> Result<()> { fn filter(&mut self, reader: &mut dyn Read, writer: &mut dyn Write) -> Result<()> {
let mut temp_buffer = vec![0; PIPESIZE]; let mut temp_buffer = vec![0; PIPESIZE];
loop { loop {
let bytes_read = reader.as_mut().read(&mut temp_buffer)?; let bytes_read = reader.read(&mut temp_buffer)?;
if bytes_read == 0 { if bytes_read == 0 {
break; break;
} }
@@ -54,7 +54,7 @@ impl FilterPlugin for TailBytesFilter {
// Write the buffered data at the end // Write the buffered data at the end
let result: Vec<u8> = self.buffer.iter().cloned().collect(); let result: Vec<u8> = self.buffer.iter().cloned().collect();
writer.as_mut().write_all(&result)?; writer.write_all(&result)?;
Ok(()) Ok(())
} }
@@ -117,8 +117,8 @@ impl FilterPlugin for TailLinesFilter {
/// # Returns /// # Returns
/// ///
/// Returns `Ok(())` on success, or an `io::Error` if reading or writing fails. /// Returns `Ok(())` on success, or an `io::Error` if reading or writing fails.
fn filter(&mut self, mut reader: Box<dyn Read>, mut writer: Box<dyn Write>) -> Result<()> { fn filter(&mut self, reader: &mut dyn Read, writer: &mut dyn Write) -> Result<()> {
let mut buf_reader = std::io::BufReader::new(reader.as_mut()); let mut buf_reader = std::io::BufReader::new(reader);
for line in buf_reader.by_ref().lines() { for line in buf_reader.by_ref().lines() {
let line = line?; let line = line?;
if self.lines.len() == self.count { if self.lines.len() == self.count {
@@ -129,7 +129,7 @@ impl FilterPlugin for TailLinesFilter {
// Write the buffered lines // Write the buffered lines
for line in &self.lines { for line in &self.lines {
writeln!(writer.as_mut(), "{}", line)?; writeln!(writer, "{}", line)?;
} }
Ok(()) Ok(())
} }

View File

@@ -127,7 +127,7 @@ impl MetaPluginExec {
.stderr(Stdio::piped()); .stderr(Stdio::piped());
match cmd.spawn() { match cmd.spawn() {
Ok(child) => { Ok(mut child) => {
let stdin = child.stdin.take().unwrap(); let stdin = child.stdin.take().unwrap();
self.writer = Some(Box::new(stdin)); self.writer = Some(Box::new(stdin));
self.process = Some(child); self.process = Some(child);

View File

@@ -26,7 +26,7 @@ impl KeepPidMetaPlugin {
// Set default outputs // Set default outputs
let default_outputs = &["keep_pid"]; let default_outputs = &["keep_pid"];
base.initialize_plugin(default_outputs, _options, outputs); base.initialize_plugin(default_outputs, &_options, &outputs);
KeepPidMetaPlugin { KeepPidMetaPlugin {
is_finalized: false, is_finalized: false,

View File

@@ -4,13 +4,14 @@ use magic::{Cookie, CookieFlags};
use std::process::{Command, Stdio}; use std::process::{Command, Stdio};
use std::io::{self, Write}; use std::io::{self, Write};
use std::path::Path;
use log::debug; use log::debug;
use crate::meta_plugin::{MetaPlugin, MetaPluginType, BaseMetaPlugin, MetaPluginResponse, MetaData, process_metadata_outputs}; use crate::meta_plugin::{MetaPlugin, MetaPluginType, BaseMetaPlugin, MetaPluginResponse, MetaData, process_metadata_outputs};
#[cfg(feature = "magic")] #[cfg(feature = "magic")]
#[derive(Debug)] #[derive(Debug)]
pub struct MagicFileMetaPlugin { pub struct MagicFileMetaPluginImpl {
buffer: Vec<u8>, buffer: Vec<u8>,
max_buffer_size: usize, max_buffer_size: usize,
is_finalized: bool, is_finalized: bool,
@@ -19,11 +20,11 @@ pub struct MagicFileMetaPlugin {
} }
#[cfg(feature = "magic")] #[cfg(feature = "magic")]
impl MagicFileMetaPlugin { impl MagicFileMetaPluginImpl {
pub fn new( pub fn new(
options: Option<std::collections::HashMap<String, serde_yaml::Value>>, options: Option<std::collections::HashMap<String, serde_yaml::Value>>,
outputs: Option<std::collections::HashMap<String, serde_yaml::Value>>, outputs: Option<std::collections::HashMap<String, serde_yaml::Value>>,
) -> MagicFileMetaPlugin { ) -> MagicFileMetaPluginImpl {
let mut base = BaseMetaPlugin::new(); let mut base = BaseMetaPlugin::new();
// Set default outputs // Set default outputs
@@ -36,7 +37,7 @@ impl MagicFileMetaPlugin {
.and_then(|v| v.as_u64()) .and_then(|v| v.as_u64())
.unwrap_or(crate::common::PIPESIZE as u64) as usize; .unwrap_or(crate::common::PIPESIZE as u64) as usize;
MagicFileMetaPlugin { MagicFileMetaPluginImpl {
buffer: Vec::new(), buffer: Vec::new(),
max_buffer_size, max_buffer_size,
is_finalized: false, is_finalized: false,
@@ -68,7 +69,7 @@ impl MagicFileMetaPlugin {
let types_to_process = [ let types_to_process = [
("mime_type", CookieFlags::MIME_TYPE), ("mime_type", CookieFlags::MIME_TYPE),
("mime_encoding", CookieFlags::MIME_ENCODING), ("mime_encoding", CookieFlags::MIME_ENCODING),
("file_type", CookieFlags::NONE), ("file_type", CookieFlags::empty()),
]; ];
for (name, flags) in types_to_process.iter() { for (name, flags) in types_to_process.iter() {
@@ -90,7 +91,7 @@ impl MagicFileMetaPlugin {
} }
#[cfg(feature = "magic")] #[cfg(feature = "magic")]
impl MetaPlugin for MagicFileMetaPlugin { impl MetaPlugin for MagicFileMetaPluginImpl {
fn is_finalized(&self) -> bool { fn is_finalized(&self) -> bool {
self.is_finalized self.is_finalized
} }
@@ -111,7 +112,7 @@ impl MetaPlugin for MagicFileMetaPlugin {
} }
}; };
if let Err(e) = cookie.load(&[]) { if let Err(e) = cookie.load(&[] as &[&Path]) {
debug!("META: MagicFile plugin: failed to load magic database: {}", e); debug!("META: MagicFile plugin: failed to load magic database: {}", e);
return MetaPluginResponse { return MetaPluginResponse {
metadata: Vec::new(), metadata: Vec::new(),
@@ -404,12 +405,10 @@ impl MetaPlugin for FallbackMagicFileMetaPlugin {
} }
#[cfg(feature = "magic")] #[cfg(feature = "magic")]
use MagicFileMetaPlugin as MagicFileMetaPluginImpl; pub use MagicFileMetaPluginImpl as MagicFileMetaPlugin;
#[cfg(not(feature = "magic"))] #[cfg(not(feature = "magic"))]
use FallbackMagicFileMetaPlugin as MagicFileMetaPluginImpl; pub use FallbackMagicFileMetaPlugin as MagicFileMetaPlugin;
pub use MagicFileMetaPluginImpl as MagicFileMetaPlugin;
use crate::meta_plugin::register_meta_plugin; use crate::meta_plugin::register_meta_plugin;
@@ -419,3 +418,4 @@ fn register_magic_file_plugin() {
Box::new(MagicFileMetaPlugin::new(options, outputs)) Box::new(MagicFileMetaPlugin::new(options, outputs))
}); });
} }

View File

@@ -51,7 +51,7 @@ impl ReadRateMetaPlugin {
// Set default outputs // Set default outputs
let default_outputs = &["read_rate"]; let default_outputs = &["read_rate"];
base.initialize_plugin(default_outputs, _options, outputs); base.initialize_plugin(default_outputs, &_options, &outputs);
ReadRateMetaPlugin { ReadRateMetaPlugin {
start_time: None, start_time: None,

View File

@@ -18,7 +18,7 @@ impl ReadTimeMetaPlugin {
// Set default outputs // Set default outputs
let default_outputs = &["read_time"]; let default_outputs = &["read_time"];
base.initialize_plugin(default_outputs, _options, outputs); base.initialize_plugin(default_outputs, &_options, &outputs);
ReadTimeMetaPlugin { ReadTimeMetaPlugin {
start_time: None, start_time: None,

View File

@@ -41,7 +41,7 @@ impl ShellMetaPlugin {
// Set default outputs // Set default outputs
let default_outputs = &["shell"]; let default_outputs = &["shell"];
base.initialize_plugin(default_outputs, _options, outputs); base.initialize_plugin(default_outputs, &_options, &outputs);
ShellMetaPlugin { ShellMetaPlugin {
is_finalized: false, is_finalized: false,

View File

@@ -17,7 +17,7 @@ impl ShellPidMetaPlugin {
// Set default outputs // Set default outputs
let default_outputs = &["shell_pid"]; let default_outputs = &["shell_pid"];
base.initialize_plugin(default_outputs, options, outputs); base.initialize_plugin(default_outputs, &options, &outputs);
ShellPidMetaPlugin { ShellPidMetaPlugin {
is_finalized: false, is_finalized: false,

View File

@@ -43,8 +43,8 @@ impl TextMetaPlugin {
base.initialize_plugin( base.initialize_plugin(
&["text", "text_word_count", "text_line_count", &["text", "text_word_count", "text_line_count",
"text_line_max_len", "text_line_mean_len", "text_line_median_len"], "text_line_max_len", "text_line_mean_len", "text_line_median_len"],
options, &options,
outputs, &outputs,
); );
// Set disabled outputs to null based on options // Set disabled outputs to null based on options

View File

@@ -29,8 +29,8 @@ impl UserMetaPlugin {
// Initialize with helper function // Initialize with helper function
base.initialize_plugin( base.initialize_plugin(
&["user_uid", "user_gid", "user_name", "user_group"], &["user_uid", "user_gid", "user_name", "user_group"],
options, &options,
outputs, &outputs,
); );
UserMetaPlugin { UserMetaPlugin {