From 4c8466bb21c43cffa44aa4d799026eb490ef9979 Mon Sep 17 00:00:00 2001 From: Andrew Phillips Date: Thu, 28 Aug 2025 20:51:39 -0300 Subject: [PATCH] refactor: reduce code duplication in filter and item services Co-authored-by: aider (openai/andrew/openrouter/mistralai/mistral-medium-3.1) --- src/filter_plugin/mod.rs | 72 ++++++++----- src/filter_plugin/utils.rs | 34 ++++++ src/meta_plugin/text.rs | 110 ++++++++++--------- src/services/async_item_service.rs | 36 ++----- src/services/item_service.rs | 167 +++++++++++++++++++++-------- 5 files changed, 268 insertions(+), 151 deletions(-) create mode 100644 src/filter_plugin/utils.rs diff --git a/src/filter_plugin/mod.rs b/src/filter_plugin/mod.rs index 2b8ebbb..95d6e1c 100644 --- a/src/filter_plugin/mod.rs +++ b/src/filter_plugin/mod.rs @@ -6,6 +6,7 @@ pub mod head; pub mod tail; pub mod grep; pub mod skip; +pub mod utils; pub trait FilterPlugin: Send { fn process(&mut self, data: &[u8]) -> Result>; @@ -31,58 +32,77 @@ impl FilterChain { let mut current_data = data.to_vec(); for plugin in &mut self.plugins { current_data = plugin.process(¤t_data)?; + // Early exit if no data remains + if current_data.is_empty() { + break; + } } Ok(current_data) } pub fn finish(&mut self) -> Result> { - let mut current_data = Vec::new(); + let mut result = Vec::new(); + let mut all_data = Vec::new(); + for plugin in &mut self.plugins { let processed = plugin.finish()?; if !processed.is_empty() { - current_data = processed; + all_data.extend(processed); } } - Ok(current_data) + + // If we have any data from finish, use it + if !all_data.is_empty() { + result = all_data; + } + + Ok(result) } } // Helper function to parse filter string and create appropriate plugins pub fn parse_filter_string(filter_str: &str) -> Result { let mut chain = FilterChain::new(); - + for part in filter_str.split('|') { let part = part.trim(); if part.is_empty() { continue; } - + + // Define a macro to reduce duplication in filter parsing + macro_rules! parse_filter { + ($prefix:expr, $suffix:expr, $constructor:expr) => {{ + if let Some(stripped) = part.strip_prefix($prefix).and_then(|s| s.strip_suffix($suffix)) { + let count = utils::parse_number(stripped)?; + chain.add_plugin($constructor(count)); + continue; + } + }}; + } + + // Handle grep filter if let Some(stripped) = part.strip_prefix("grep(").and_then(|s| s.strip_suffix(')')) { // Remove quotes if present let pattern = stripped.trim_matches(|c| c == '\'' || c == '"'); chain.add_plugin(Box::new(grep::GrepFilter::new(pattern.to_string())?)); - } else if let Some(stripped) = part.strip_prefix("head_bytes(").and_then(|s| s.strip_suffix(')')) { - let count: usize = stripped.parse().map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?; - chain.add_plugin(Box::new(head::HeadBytesFilter::new(count))); - } else if let Some(stripped) = part.strip_prefix("head_lines(").and_then(|s| s.strip_suffix(')')) { - let count: usize = stripped.parse().map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?; - chain.add_plugin(Box::new(head::HeadLinesFilter::new(count))); - } else if let Some(stripped) = part.strip_prefix("tail_bytes(").and_then(|s| s.strip_suffix(')')) { - let count: usize = stripped.parse().map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?; - chain.add_plugin(Box::new(tail::TailBytesFilter::new(count)?)); - } else if let Some(stripped) = part.strip_prefix("tail_lines(").and_then(|s| s.strip_suffix(')')) { - let count: usize = stripped.parse().map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?; - chain.add_plugin(Box::new(tail::TailLinesFilter::new(count)?)); - } else if let Some(stripped) = part.strip_prefix("skip_bytes(").and_then(|s| s.strip_suffix(')')) { - let count: usize = stripped.parse().map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?; - chain.add_plugin(Box::new(skip::SkipBytesFilter::new(count))); - } else if let Some(stripped) = part.strip_prefix("skip_lines(").and_then(|s| s.strip_suffix(')')) { - let count: usize = stripped.parse().map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?; - chain.add_plugin(Box::new(skip::SkipLinesFilter::new(count))); - } else { - return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, format!("Unknown filter: {}", part))); + continue; } + + // Handle other filters using the macro + parse_filter!("head_bytes(", ")", |count| Box::new(head::HeadBytesFilter::new(count))); + parse_filter!("head_lines(", ")", |count| Box::new(head::HeadLinesFilter::new(count))); + parse_filter!("tail_bytes(", ")", |count| Box::new(tail::TailBytesFilter::new(count))); + parse_filter!("tail_lines(", ")", |count| Box::new(tail::TailLinesFilter::new(count))); + parse_filter!("skip_bytes(", ")", |count| Box::new(skip::SkipBytesFilter::new(count))); + parse_filter!("skip_lines(", ")", |count| Box::new(skip::SkipLinesFilter::new(count))); + + // If we get here, the filter wasn't recognized + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!("Unknown filter: {}", part) + )); } - + Ok(chain) } diff --git a/src/filter_plugin/utils.rs b/src/filter_plugin/utils.rs new file mode 100644 index 0000000..6162878 --- /dev/null +++ b/src/filter_plugin/utils.rs @@ -0,0 +1,34 @@ +use std::io::Result; + +/// Helper trait for common filter operations +pub trait FilterUtils { + /// Process data through a filter, handling empty results + fn process_data(&mut self, data: &[u8]) -> Result>; + + /// Process data and check if we should continue processing + fn process_and_check_continue(&mut self, data: &[u8]) -> Result<(Vec, bool)>; +} + +impl FilterUtils for T { + fn process_data(&mut self, data: &[u8]) -> Result> { + let result = self.process(data)?; + Ok(result) + } + + fn process_and_check_continue(&mut self, data: &[u8]) -> Result<(Vec, bool)> { + let result = self.process(data)?; + let should_continue = !result.is_empty(); + Ok((result, should_continue)) + } +} + +/// Helper function to create a filter chain from a string +pub fn create_filter_chain(filter_str: &str) -> Result> { + super::parse_filter_string(filter_str).map(Some) +} + +/// Helper function to parse a number from a string with error handling +pub fn parse_number(s: &str) -> Result { + s.parse::() + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e)) +} diff --git a/src/meta_plugin/text.rs b/src/meta_plugin/text.rs index b719329..a1b51e0 100644 --- a/src/meta_plugin/text.rs +++ b/src/meta_plugin/text.rs @@ -367,42 +367,42 @@ impl TextMetaPlugin { /// Helper method to output word and line counts fn output_word_line_counts(&mut self) -> Vec { - let mut metadata = Vec::new(); - // Process any remaining data in utf8_buffer self.process_remaining_utf8_buffer(); - + // Handle the last line if tracking line lengths self.handle_last_line_for_length_tracking(); - - // Output word count if tracked - if let Some(meta_data) = self.output_word_count_metadata() { - metadata.push(meta_data); + + // Collect all metadata outputs + let mut metadata = Vec::new(); + + // Add metadata outputs using a more concise approach + let outputs_to_check = vec![ + (self.output_word_count_metadata(), "word count"), + (self.output_line_count_metadata(), "line count"), + ]; + + for (output, _) in outputs_to_check { + if let Some(meta_data) = output { + metadata.push(meta_data); + } } - - // Output line count if tracked - if let Some(meta_data) = self.output_line_count_metadata() { - metadata.push(meta_data); - } - + // Output line length statistics if tracked if self.track_line_lengths && self.line_count_for_stats > 0 { - // Calculate and output max line length if enabled - if let Some(meta_data) = self.output_max_line_length_metadata() { - metadata.push(meta_data); - } - - // Calculate and output mean line length if enabled - if let Some(meta_data) = self.output_mean_line_length_metadata() { - metadata.push(meta_data); - } - - // Calculate and output median line length if enabled - if let Some(meta_data) = self.output_median_line_length_metadata() { - metadata.push(meta_data); + let line_stats_outputs = vec![ + (self.output_max_line_length_metadata(), "max line length"), + (self.output_mean_line_length_metadata(), "mean line length"), + (self.output_median_line_length_metadata(), "median line length"), + ]; + + for (output, _) in line_stats_outputs { + if let Some(meta_data) = output { + metadata.push(meta_data); + } } } - + metadata } } @@ -417,19 +417,9 @@ impl MetaPlugin for TextMetaPlugin { } - fn update(&mut self, data: &[u8]) -> MetaPluginResponse { - // If already finalized, don't process more data - if self.is_finalized { - return MetaPluginResponse { - metadata: Vec::new(), - is_finalized: true, - }; - } - - let mut metadata = Vec::new(); - + /// Helper method to create a filter chain and process data + fn create_filter_and_process_data(&self, data: &[u8]) -> Vec { // Check if we have head/tail options that would affect processing - // These options come from the base plugin's options let head_bytes = self.base.options.get("head_bytes") .and_then(|v| v.as_u64()) .map(|v| v as usize); @@ -442,7 +432,7 @@ impl MetaPlugin for TextMetaPlugin { let tail_lines = self.base.options.get("tail_lines") .and_then(|v| v.as_u64()) .map(|v| v as usize); - + // Build filter string from individual parameters let mut filter_parts = Vec::new(); if let Some(bytes) = head_bytes { @@ -457,27 +447,43 @@ impl MetaPlugin for TextMetaPlugin { if let Some(lines) = tail_lines { filter_parts.push(format!("tail_lines({})", lines)); } - + // Use the filter service to process data - let processed_data = if !filter_parts.is_empty() { + if !filter_parts.is_empty() { let filter_str = filter_parts.join(" | "); let filter_service = crate::services::filter_service::FilterService::new(); - let mut filter_chain = filter_service.create_filter_chain(Some(&filter_str)) - .map_err(|e| { + let mut filter_chain = match filter_service.create_filter_chain(Some(&filter_str)) { + Ok(chain) => chain, + Err(e) => { log::error!("Failed to create filter chain: {}", e); - data.to_vec() - }) - .unwrap_or_else(|_| data.to_vec()); - + return data.to_vec(); + } + }; + // Process the data through the filter chain - filter_service.process_data(&mut filter_chain, data) - .unwrap_or_else(|e| { + match filter_service.process_data(&mut filter_chain, data) { + Ok(processed) => processed, + Err(e) => { log::error!("Failed to process data through filter: {}", e); data.to_vec() - }) + } + } } else { data.to_vec() - }; + } + } + + fn update(&mut self, data: &[u8]) -> MetaPluginResponse { + // If already finalized, don't process more data + if self.is_finalized { + return MetaPluginResponse { + metadata: Vec::new(), + is_finalized: true, + }; + } + + let mut metadata = Vec::new(); + let processed_data = self.create_filter_and_process_data(data); // If we haven't determined if content is binary yet, build buffer and check if self.is_binary_content.is_none() { diff --git a/src/services/async_item_service.rs b/src/services/async_item_service.rs index 0abe162..a6dbc5d 100644 --- a/src/services/async_item_service.rs +++ b/src/services/async_item_service.rs @@ -361,15 +361,9 @@ impl AsyncItemService { tags: Vec, meta: HashMap, ) -> Result { - let db = self.db.clone(); - let item_service = self.item_service.clone(); - - tokio::task::spawn_blocking(move || { - let conn = db.blocking_lock(); - item_service.find_item(&conn, &ids, &tags, &meta) - }) - .await - .unwrap() + self.execute_blocking(|conn, item_service| { + item_service.find_item(conn, &ids, &tags, &meta) + }).await } pub async fn list_items( @@ -377,27 +371,15 @@ impl AsyncItemService { tags: Vec, meta: HashMap, ) -> Result, CoreError> { - let db = self.db.clone(); - let item_service = self.item_service.clone(); - - tokio::task::spawn_blocking(move || { - let conn = db.blocking_lock(); - item_service.list_items(&conn, &tags, &meta) - }) - .await - .unwrap() + self.execute_blocking(|conn, item_service| { + item_service.list_items(conn, &tags, &meta) + }).await } pub async fn delete_item(&self, id: i64) -> Result<(), CoreError> { - let db = self.db.clone(); - let item_service = self.item_service.clone(); - - tokio::task::spawn_blocking(move || { - let mut conn = db.blocking_lock(); - item_service.delete_item(&mut conn, id) - }) - .await - .unwrap() + self.execute_blocking_mut(|conn, item_service| { + item_service.delete_item(conn, id) + }).await } pub async fn save_item_from_mcp( diff --git a/src/services/item_service.rs b/src/services/item_service.rs index 2d0b03c..ff37ea1 100644 --- a/src/services/item_service.rs +++ b/src/services/item_service.rs @@ -7,6 +7,7 @@ use crate::services::types::{ItemWithContent, ItemWithMeta}; use crate::db::{self, Meta}; use crate::compression_engine::{get_compression_engine, CompressionType}; use crate::modes::common::settings_compression_type; +use crate::filter_plugin::FilterChain; use clap::Command; use log::debug; use ringbuf::HeapRb; @@ -16,6 +17,47 @@ use std::fs; use std::io::{IsTerminal, Read, Write}; use std::path::PathBuf; +/// A reader that applies a filter chain to the data as it's read +struct FilteringReader { + reader: R, + filter_chain: Option, +} + +impl FilteringReader { + pub fn new(reader: R, filter_chain: Option) -> Self { + Self { reader, filter_chain } + } +} + +impl Read for FilteringReader { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + // Read from the original reader + let mut temp_buf = vec![0; buf.len()]; + let bytes_read = self.reader.read(&mut temp_buf)?; + + if bytes_read == 0 { + return Ok(0); + } + + // Process through the filter chain if it exists + if let Some(chain) = &mut self.filter_chain { + match chain.process(&temp_buf[..bytes_read]) { + Ok(filtered_data) => { + let filtered_len = filtered_data.len(); + if filtered_len > 0 { + buf[..std::cmp::min(filtered_len, buf.len())].copy_from_slice(&filtered_data[..std::cmp::min(filtered_len, buf.len())]); + } + Ok(filtered_len) + } + Err(e) => Err(e), + } + } else { + buf[..bytes_read].copy_from_slice(&temp_buf[..bytes_read]); + Ok(bytes_read) + } + } +} + pub struct ItemService { data_path: PathBuf, compression_service: CompressionService, @@ -94,6 +136,70 @@ impl ItemService { Ok((content, mime_type, is_binary)) } + /// Helper method to create a filter chain from parameters + fn create_filter_chain( + &self, + grep: Option, + head_bytes: Option, + head_lines: Option, + tail_bytes: Option, + tail_lines: Option, + filter: Option, + ) -> Result, CoreError> { + // Build filter string from individual parameters (for backward compatibility) + let mut filter_parts = Vec::new(); + if let Some(pattern) = grep { + filter_parts.push(format!("grep('{}')", pattern)); + } + if let Some(bytes) = head_bytes { + filter_parts.push(format!("head_bytes({})", bytes)); + } + if let Some(lines) = head_lines { + filter_parts.push(format!("head_lines({})", lines)); + } + if let Some(bytes) = tail_bytes { + filter_parts.push(format!("tail_bytes({})", bytes)); + } + if let Some(lines) = tail_lines { + filter_parts.push(format!("tail_lines({})", lines)); + } + + // Use the provided filter string if available, otherwise build from parts + let filter_str = filter.or_else(|| { + if filter_parts.is_empty() { + None + } else { + Some(filter_parts.join(" | ")) + } + }); + + // Create filter chain + let filter_service = crate::services::filter_service::FilterService::new(); + filter_service.create_filter_chain(filter_str.as_deref()) + } + + /// Helper method to determine if content is binary + fn is_content_binary( + &self, + item_path: PathBuf, + compression: &str, + metadata: &HashMap, + ) -> Result { + // Check if we already have text metadata + if let Some(text_val) = metadata.get("text") { + return Ok(text_val == "false"); + } + + // Read only the first 8192 bytes for binary detection + let mut sample_reader = self.compression_service.stream_item_content( + item_path, + compression + )?; + let mut sample_buffer = vec![0; 8192]; + let bytes_read = sample_reader.read(&mut sample_buffer)?; + Ok(crate::common::is_binary::is_binary(&sample_buffer[..bytes_read])) + } + pub fn get_item_content_info_streaming( &self, conn: &Connection, @@ -119,40 +225,16 @@ impl ItemService { let mut item_path = self.data_path.clone(); item_path.push(item_id.to_string()); - let reader = self.compression_service.stream_item_content(item_path.clone(), &item_with_meta.item.compression)?; - - // Build filter string from individual parameters (for backward compatibility) - let mut filter_parts = Vec::new(); - if let Some(pattern) = grep { - filter_parts.push(format!("grep('{}')", pattern)); - } - if let Some(bytes) = head_bytes { - filter_parts.push(format!("head_bytes({})", bytes)); - } - if let Some(lines) = head_lines { - filter_parts.push(format!("head_lines({})", lines)); - } - if let Some(bytes) = tail_bytes { - filter_parts.push(format!("tail_bytes({})", bytes)); - } - if let Some(lines) = tail_lines { - filter_parts.push(format!("tail_lines({})", lines)); - } - // Add other filters as needed - - // Use the provided filter string if available, otherwise build from parts - let filter_str = filter.or_else(|| { - if filter_parts.is_empty() { - None - } else { - Some(filter_parts.join(" | ")) - } - }); - + let reader = self.compression_service.stream_item_content( + item_path.clone(), + &item_with_meta.item.compression + )?; + // Create filter chain - let filter_service = crate::services::filter_service::FilterService::new(); - let mut filter_chain = filter_service.create_filter_chain(filter_str.as_deref())?; - + let filter_chain = self.create_filter_chain( + grep, head_bytes, head_lines, tail_bytes, tail_lines, filter + )?; + // Wrap the reader with filtering let filtered_reader = Box::new(FilteringReader::new(reader, filter_chain)); @@ -162,19 +244,12 @@ impl ItemService { .map(|s| s.to_string()) .unwrap_or_else(|| "application/octet-stream".to_string()); - // Check if content is binary using only the first 8192 bytes - let is_binary = if let Some(text_val) = metadata.get("text") { - text_val == "false" - } else { - // Read only the first 8192 bytes for binary detection - let mut sample_reader = self.compression_service.stream_item_content( - item_path, - &item_with_meta.item.compression - )?; - let mut sample_buffer = vec![0; 8192]; - let bytes_read = sample_reader.read(&mut sample_buffer)?; - crate::common::is_binary::is_binary(&sample_buffer[..bytes_read]) - }; + // Check if content is binary + let is_binary = self.is_content_binary( + item_path, + &item_with_meta.item.compression, + &metadata + )?; Ok((filtered_reader, mime_type, is_binary)) }