feat: add is_finalized to MetaPluginResponse and remove direct db interaction from meta plugins

Co-authored-by: aider (openai/andrew/openrouter/deepseek/deepseek-chat-v3.1) <aider@aider.chat>
This commit is contained in:
Andrew Phillips
2025-08-26 17:11:27 -03:00
parent ec428f5fc4
commit 9d60461354
3 changed files with 58 additions and 32 deletions

View File

@@ -25,6 +25,7 @@ pub struct MetaData {
#[derive(Debug, Default, Clone, Serialize, Deserialize)] #[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct MetaPluginResponse { pub struct MetaPluginResponse {
pub metadata: Vec<MetaData>, pub metadata: Vec<MetaData>,
pub is_finalized: bool,
} }
#[derive(Debug, Eq, PartialEq, Clone, strum::EnumIter, strum::Display, strum::EnumString)] #[derive(Debug, Eq, PartialEq, Clone, strum::EnumIter, strum::Display, strum::EnumString)]
@@ -93,12 +94,18 @@ pub trait MetaPlugin {
// Update the meta plugin with new data // Update the meta plugin with new data
fn update(&mut self, data: &[u8]) -> MetaPluginResponse { fn update(&mut self, data: &[u8]) -> MetaPluginResponse {
// Default implementation does nothing // Default implementation does nothing
MetaPluginResponse::default() MetaPluginResponse {
metadata: Vec::new(),
is_finalized: false,
}
} }
fn finalize(&mut self) -> MetaPluginResponse { fn finalize(&mut self) -> MetaPluginResponse {
// Default implementation does nothing // Default implementation does nothing
MetaPluginResponse::default() MetaPluginResponse {
metadata: Vec::new(),
is_finalized: true,
}
} }
fn meta_name(&self) -> String; fn meta_name(&self) -> String;
@@ -111,7 +118,10 @@ pub trait MetaPlugin {
// Initialize the plugin // Initialize the plugin
fn initialize(&mut self) -> MetaPluginResponse { fn initialize(&mut self) -> MetaPluginResponse {
// Default implementation does nothing // Default implementation does nothing
MetaPluginResponse::default() MetaPluginResponse {
metadata: Vec::new(),
is_finalized: false,
}
} }
// Access to outputs mapping // Access to outputs mapping

View File

@@ -95,37 +95,42 @@ impl MetaPlugin for MetaPluginProgram {
false false
} }
fn initialize(&mut self, item_id: i64) -> Result<PluginResponse> { fn initialize(&mut self) -> MetaPluginResponse {
debug!("META: Initializing program plugin: {:?}", self); debug!("META: Initializing program plugin: {:?}", self);
// Store item ID for later use
self.item_id = Some(item_id);
let program = self.program.clone(); let program = self.program.clone();
let args = self.args.clone(); let args = self.args.clone();
debug!("META: Executing command: {:?} {:?}", program, args); debug!("META: Executing command: {:?} {:?}", program, args);
let mut process = Command::new(program.clone()) let mut process = match Command::new(program.clone())
.args(args.clone()) .args(args.clone())
.stdin(Stdio::piped()) .stdin(Stdio::piped())
.stdout(Stdio::piped()) .stdout(Stdio::piped())
.stderr(Stdio::piped()) .stderr(Stdio::piped())
.spawn() .spawn()
.context(anyhow!( {
"Problem spawning child process: {:?} {:?}", Ok(process) => process,
program, Err(e) => {
args debug!("META: Failed to spawn process: {}", e);
))?; return MetaPluginResponse {
metadata: Vec::new(),
is_finalized: true,
};
}
};
let stdin = process.stdin.take().unwrap(); let stdin = process.stdin.take().unwrap();
self.writer = Some(Box::new(stdin)); self.writer = Some(Box::new(stdin));
self.process = Some(process); self.process = Some(process);
Ok(MetaPluginResponse::default()) MetaPluginResponse {
metadata: Vec::new(),
is_finalized: false,
}
} }
fn finalize(&mut self) -> Result<MetaPluginResponse> { fn finalize(&mut self) -> MetaPluginResponse {
debug!("META: Finalizing program plugin"); debug!("META: Finalizing program plugin");
let mut metadata = Vec::new(); let mut metadata = Vec::new();
@@ -134,7 +139,16 @@ impl MetaPlugin for MetaPluginProgram {
drop(self.writer.take()); drop(self.writer.take());
// Wait for the process to complete // Wait for the process to complete
let output = process.wait_with_output()?; let output = match process.wait_with_output() {
Ok(output) => output,
Err(e) => {
debug!("META: Failed to get process output: {}", e);
return MetaPluginResponse {
metadata: Vec::new(),
is_finalized: true,
};
}
};
if output.status.success() { if output.status.success() {
// Process the output // Process the output
@@ -149,12 +163,11 @@ impl MetaPlugin for MetaPluginProgram {
debug!("META: Program output: {}", result); debug!("META: Program output: {}", result);
self.result = Some(result); self.result = Some(result);
// Create metadata to be stored // Create metadata to be returned
if let Some(item_id) = self.item_id { metadata.push(MetaData {
if let Some(meta) = self.create_meta(item_id, &self.meta_name, result) { name: self.meta_name.clone(),
metadata.push(meta); value: result,
} });
}
} }
} else { } else {
debug!("META: Program failed with status: {:?}", output.status); debug!("META: Program failed with status: {:?}", output.status);
@@ -165,19 +178,22 @@ impl MetaPlugin for MetaPluginProgram {
} }
} }
Ok(MetaPluginResponse { MetaPluginResponse {
metadata: if metadata.is_empty() { None } else { Some(metadata) }, metadata,
is_finalized: true, is_finalized: true,
}) }
} }
fn update(&mut self, data: &[u8]) -> Result<MetaPluginResponse> { fn update(&mut self, data: &[u8]) -> MetaPluginResponse {
if let Some(ref mut writer) = self.writer { if let Some(ref mut writer) = self.writer {
if let Err(e) = writer.write_all(data) { if let Err(e) = writer.write_all(data) {
debug!("META: Failed to write to process stdin: {}", e); debug!("META: Failed to write to process stdin: {}", e);
} }
} }
Ok(PluginResponse::default()) MetaPluginResponse {
metadata: Vec::new(),
is_finalized: false,
}
} }
fn meta_name(&self) -> String { fn meta_name(&self) -> String {

View File

@@ -285,7 +285,7 @@ impl ItemService {
total_bytes += n as i64; total_bytes += n as i64;
item_out.write_all(&buffer[..n])?; item_out.write_all(&buffer[..n])?;
self.meta_service.process_chunk(&mut plugins, &buffer[..n], &tx); self.meta_service.process_chunk(&mut plugins, &buffer[..n], &tx, item_id);
} }
debug!("ITEM_SERVICE: Processed {} bytes total", total_bytes); debug!("ITEM_SERVICE: Processed {} bytes total", total_bytes);
@@ -293,7 +293,7 @@ impl ItemService {
drop(item_out); drop(item_out);
debug!("ITEM_SERVICE: Finalizing meta plugins"); debug!("ITEM_SERVICE: Finalizing meta plugins");
self.meta_service.finalize_plugins(&mut plugins, &tx); self.meta_service.finalize_plugins(&mut plugins, &tx, item_id);
item.size = Some(total_bytes); item.size = Some(total_bytes);
db::update_item(&tx, item.clone())?; db::update_item(&tx, item.clone())?;
@@ -365,8 +365,8 @@ impl ItemService {
self.meta_service self.meta_service
.initialize_plugins(&mut plugins, &tx, item_id); .initialize_plugins(&mut plugins, &tx, item_id);
self.meta_service self.meta_service
.process_chunk(&mut plugins, content, &tx); .process_chunk(&mut plugins, content, &tx, item_id);
self.meta_service.finalize_plugins(&mut plugins, &tx); self.meta_service.finalize_plugins(&mut plugins, &tx, item_id);
debug!("ITEM_SERVICE: Processed MCP item through meta plugins"); debug!("ITEM_SERVICE: Processed MCP item through meta plugins");
item.size = Some(content.len() as i64); item.size = Some(content.len() as i64);