refactor: reduce code duplication in filter and item services

Co-authored-by: aider (openai/andrew/openrouter/mistralai/mistral-medium-3.1) <aider@aider.chat>
This commit is contained in:
Andrew Phillips
2025-08-28 20:51:39 -03:00
parent 5542f5592a
commit 4c8466bb21
5 changed files with 268 additions and 151 deletions

View File

@@ -6,6 +6,7 @@ pub mod head;
pub mod tail; pub mod tail;
pub mod grep; pub mod grep;
pub mod skip; pub mod skip;
pub mod utils;
pub trait FilterPlugin: Send { pub trait FilterPlugin: Send {
fn process(&mut self, data: &[u8]) -> Result<Vec<u8>>; fn process(&mut self, data: &[u8]) -> Result<Vec<u8>>;
@@ -31,58 +32,77 @@ impl FilterChain {
let mut current_data = data.to_vec(); let mut current_data = data.to_vec();
for plugin in &mut self.plugins { for plugin in &mut self.plugins {
current_data = plugin.process(&current_data)?; current_data = plugin.process(&current_data)?;
// Early exit if no data remains
if current_data.is_empty() {
break;
}
} }
Ok(current_data) Ok(current_data)
} }
pub fn finish(&mut self) -> Result<Vec<u8>> { pub fn finish(&mut self) -> Result<Vec<u8>> {
let mut current_data = Vec::new(); let mut result = Vec::new();
let mut all_data = Vec::new();
for plugin in &mut self.plugins { for plugin in &mut self.plugins {
let processed = plugin.finish()?; let processed = plugin.finish()?;
if !processed.is_empty() { if !processed.is_empty() {
current_data = processed; all_data.extend(processed);
} }
} }
Ok(current_data)
// If we have any data from finish, use it
if !all_data.is_empty() {
result = all_data;
}
Ok(result)
} }
} }
// Helper function to parse filter string and create appropriate plugins // Helper function to parse filter string and create appropriate plugins
pub fn parse_filter_string(filter_str: &str) -> Result<FilterChain> { pub fn parse_filter_string(filter_str: &str) -> Result<FilterChain> {
let mut chain = FilterChain::new(); let mut chain = FilterChain::new();
for part in filter_str.split('|') { for part in filter_str.split('|') {
let part = part.trim(); let part = part.trim();
if part.is_empty() { if part.is_empty() {
continue; continue;
} }
// Define a macro to reduce duplication in filter parsing
macro_rules! parse_filter {
($prefix:expr, $suffix:expr, $constructor:expr) => {{
if let Some(stripped) = part.strip_prefix($prefix).and_then(|s| s.strip_suffix($suffix)) {
let count = utils::parse_number(stripped)?;
chain.add_plugin($constructor(count));
continue;
}
}};
}
// Handle grep filter
if let Some(stripped) = part.strip_prefix("grep(").and_then(|s| s.strip_suffix(')')) { if let Some(stripped) = part.strip_prefix("grep(").and_then(|s| s.strip_suffix(')')) {
// Remove quotes if present // Remove quotes if present
let pattern = stripped.trim_matches(|c| c == '\'' || c == '"'); let pattern = stripped.trim_matches(|c| c == '\'' || c == '"');
chain.add_plugin(Box::new(grep::GrepFilter::new(pattern.to_string())?)); chain.add_plugin(Box::new(grep::GrepFilter::new(pattern.to_string())?));
} else if let Some(stripped) = part.strip_prefix("head_bytes(").and_then(|s| s.strip_suffix(')')) { continue;
let count: usize = stripped.parse().map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
chain.add_plugin(Box::new(head::HeadBytesFilter::new(count)));
} else if let Some(stripped) = part.strip_prefix("head_lines(").and_then(|s| s.strip_suffix(')')) {
let count: usize = stripped.parse().map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
chain.add_plugin(Box::new(head::HeadLinesFilter::new(count)));
} else if let Some(stripped) = part.strip_prefix("tail_bytes(").and_then(|s| s.strip_suffix(')')) {
let count: usize = stripped.parse().map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
chain.add_plugin(Box::new(tail::TailBytesFilter::new(count)?));
} else if let Some(stripped) = part.strip_prefix("tail_lines(").and_then(|s| s.strip_suffix(')')) {
let count: usize = stripped.parse().map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
chain.add_plugin(Box::new(tail::TailLinesFilter::new(count)?));
} else if let Some(stripped) = part.strip_prefix("skip_bytes(").and_then(|s| s.strip_suffix(')')) {
let count: usize = stripped.parse().map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
chain.add_plugin(Box::new(skip::SkipBytesFilter::new(count)));
} else if let Some(stripped) = part.strip_prefix("skip_lines(").and_then(|s| s.strip_suffix(')')) {
let count: usize = stripped.parse().map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
chain.add_plugin(Box::new(skip::SkipLinesFilter::new(count)));
} else {
return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, format!("Unknown filter: {}", part)));
} }
// Handle other filters using the macro
parse_filter!("head_bytes(", ")", |count| Box::new(head::HeadBytesFilter::new(count)));
parse_filter!("head_lines(", ")", |count| Box::new(head::HeadLinesFilter::new(count)));
parse_filter!("tail_bytes(", ")", |count| Box::new(tail::TailBytesFilter::new(count)));
parse_filter!("tail_lines(", ")", |count| Box::new(tail::TailLinesFilter::new(count)));
parse_filter!("skip_bytes(", ")", |count| Box::new(skip::SkipBytesFilter::new(count)));
parse_filter!("skip_lines(", ")", |count| Box::new(skip::SkipLinesFilter::new(count)));
// If we get here, the filter wasn't recognized
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!("Unknown filter: {}", part)
));
} }
Ok(chain) Ok(chain)
} }

View File

@@ -0,0 +1,34 @@
use std::io::Result;
/// Convenience extensions shared by every filter plugin.
///
/// NOTE(review): this blanket impl requires `FilterPlugin` to be in scope
/// in this module — confirm a `use super::FilterPlugin;` exists.
pub trait FilterUtils {
    /// Run the filter over `data` and return whatever output it produces.
    fn process_data(&mut self, data: &[u8]) -> Result<Vec<u8>>;

    /// Run the filter over `data`, additionally reporting whether any
    /// output remained (i.e. whether downstream processing is worthwhile).
    fn process_and_check_continue(&mut self, data: &[u8]) -> Result<(Vec<u8>, bool)>;
}

impl<T: FilterPlugin> FilterUtils for T {
    fn process_data(&mut self, data: &[u8]) -> Result<Vec<u8>> {
        self.process(data)
    }

    fn process_and_check_continue(&mut self, data: &[u8]) -> Result<(Vec<u8>, bool)> {
        let output = self.process(data)?;
        let keep_going = !output.is_empty();
        Ok((output, keep_going))
    }
}
/// Parse `filter_str` into a `FilterChain`, wrapping the result in `Some`
/// so callers that treat "no filter" as `None` can use it directly.
pub fn create_filter_chain(filter_str: &str) -> Result<Option<super::FilterChain>> {
    let chain = super::parse_filter_string(filter_str)?;
    Ok(Some(chain))
}
/// Parse any `FromStr` value (typically a count) from a string, mapping
/// parse failures to a `std::io::Error` with `ErrorKind::InvalidInput`
/// so callers can propagate them with `?` alongside other I/O errors.
///
/// The `T::Err: Display` bound is required: `io::Error::new` needs an
/// error payload it can box, and an unbounded `T::Err` does not compile.
pub fn parse_number<T: std::str::FromStr>(s: &str) -> Result<T>
where
    T::Err: std::fmt::Display,
{
    s.parse::<T>()
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e.to_string()))
}

View File

@@ -367,42 +367,42 @@ impl TextMetaPlugin {
/// Helper method to output word and line counts /// Helper method to output word and line counts
fn output_word_line_counts(&mut self) -> Vec<crate::meta_plugin::MetaData> { fn output_word_line_counts(&mut self) -> Vec<crate::meta_plugin::MetaData> {
let mut metadata = Vec::new();
// Process any remaining data in utf8_buffer // Process any remaining data in utf8_buffer
self.process_remaining_utf8_buffer(); self.process_remaining_utf8_buffer();
// Handle the last line if tracking line lengths // Handle the last line if tracking line lengths
self.handle_last_line_for_length_tracking(); self.handle_last_line_for_length_tracking();
// Output word count if tracked // Collect all metadata outputs
if let Some(meta_data) = self.output_word_count_metadata() { let mut metadata = Vec::new();
metadata.push(meta_data);
// Add metadata outputs using a more concise approach
let outputs_to_check = vec![
(self.output_word_count_metadata(), "word count"),
(self.output_line_count_metadata(), "line count"),
];
for (output, _) in outputs_to_check {
if let Some(meta_data) = output {
metadata.push(meta_data);
}
} }
// Output line count if tracked
if let Some(meta_data) = self.output_line_count_metadata() {
metadata.push(meta_data);
}
// Output line length statistics if tracked // Output line length statistics if tracked
if self.track_line_lengths && self.line_count_for_stats > 0 { if self.track_line_lengths && self.line_count_for_stats > 0 {
// Calculate and output max line length if enabled let line_stats_outputs = vec![
if let Some(meta_data) = self.output_max_line_length_metadata() { (self.output_max_line_length_metadata(), "max line length"),
metadata.push(meta_data); (self.output_mean_line_length_metadata(), "mean line length"),
} (self.output_median_line_length_metadata(), "median line length"),
];
// Calculate and output mean line length if enabled
if let Some(meta_data) = self.output_mean_line_length_metadata() { for (output, _) in line_stats_outputs {
metadata.push(meta_data); if let Some(meta_data) = output {
} metadata.push(meta_data);
}
// Calculate and output median line length if enabled
if let Some(meta_data) = self.output_median_line_length_metadata() {
metadata.push(meta_data);
} }
} }
metadata metadata
} }
} }
@@ -417,19 +417,9 @@ impl MetaPlugin for TextMetaPlugin {
} }
fn update(&mut self, data: &[u8]) -> MetaPluginResponse { /// Helper method to create a filter chain and process data
// If already finalized, don't process more data fn create_filter_and_process_data(&self, data: &[u8]) -> Vec<u8> {
if self.is_finalized {
return MetaPluginResponse {
metadata: Vec::new(),
is_finalized: true,
};
}
let mut metadata = Vec::new();
// Check if we have head/tail options that would affect processing // Check if we have head/tail options that would affect processing
// These options come from the base plugin's options
let head_bytes = self.base.options.get("head_bytes") let head_bytes = self.base.options.get("head_bytes")
.and_then(|v| v.as_u64()) .and_then(|v| v.as_u64())
.map(|v| v as usize); .map(|v| v as usize);
@@ -442,7 +432,7 @@ impl MetaPlugin for TextMetaPlugin {
let tail_lines = self.base.options.get("tail_lines") let tail_lines = self.base.options.get("tail_lines")
.and_then(|v| v.as_u64()) .and_then(|v| v.as_u64())
.map(|v| v as usize); .map(|v| v as usize);
// Build filter string from individual parameters // Build filter string from individual parameters
let mut filter_parts = Vec::new(); let mut filter_parts = Vec::new();
if let Some(bytes) = head_bytes { if let Some(bytes) = head_bytes {
@@ -457,27 +447,43 @@ impl MetaPlugin for TextMetaPlugin {
if let Some(lines) = tail_lines { if let Some(lines) = tail_lines {
filter_parts.push(format!("tail_lines({})", lines)); filter_parts.push(format!("tail_lines({})", lines));
} }
// Use the filter service to process data // Use the filter service to process data
let processed_data = if !filter_parts.is_empty() { if !filter_parts.is_empty() {
let filter_str = filter_parts.join(" | "); let filter_str = filter_parts.join(" | ");
let filter_service = crate::services::filter_service::FilterService::new(); let filter_service = crate::services::filter_service::FilterService::new();
let mut filter_chain = filter_service.create_filter_chain(Some(&filter_str)) let mut filter_chain = match filter_service.create_filter_chain(Some(&filter_str)) {
.map_err(|e| { Ok(chain) => chain,
Err(e) => {
log::error!("Failed to create filter chain: {}", e); log::error!("Failed to create filter chain: {}", e);
data.to_vec() return data.to_vec();
}) }
.unwrap_or_else(|_| data.to_vec()); };
// Process the data through the filter chain // Process the data through the filter chain
filter_service.process_data(&mut filter_chain, data) match filter_service.process_data(&mut filter_chain, data) {
.unwrap_or_else(|e| { Ok(processed) => processed,
Err(e) => {
log::error!("Failed to process data through filter: {}", e); log::error!("Failed to process data through filter: {}", e);
data.to_vec() data.to_vec()
}) }
}
} else { } else {
data.to_vec() data.to_vec()
}; }
}
fn update(&mut self, data: &[u8]) -> MetaPluginResponse {
// If already finalized, don't process more data
if self.is_finalized {
return MetaPluginResponse {
metadata: Vec::new(),
is_finalized: true,
};
}
let mut metadata = Vec::new();
let processed_data = self.create_filter_and_process_data(data);
// If we haven't determined if content is binary yet, build buffer and check // If we haven't determined if content is binary yet, build buffer and check
if self.is_binary_content.is_none() { if self.is_binary_content.is_none() {

View File

@@ -361,15 +361,9 @@ impl AsyncItemService {
tags: Vec<String>, tags: Vec<String>,
meta: HashMap<String, String>, meta: HashMap<String, String>,
) -> Result<ItemWithMeta, CoreError> { ) -> Result<ItemWithMeta, CoreError> {
let db = self.db.clone(); self.execute_blocking(|conn, item_service| {
let item_service = self.item_service.clone(); item_service.find_item(conn, &ids, &tags, &meta)
}).await
tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
item_service.find_item(&conn, &ids, &tags, &meta)
})
.await
.unwrap()
} }
pub async fn list_items( pub async fn list_items(
@@ -377,27 +371,15 @@ impl AsyncItemService {
tags: Vec<String>, tags: Vec<String>,
meta: HashMap<String, String>, meta: HashMap<String, String>,
) -> Result<Vec<ItemWithMeta>, CoreError> { ) -> Result<Vec<ItemWithMeta>, CoreError> {
let db = self.db.clone(); self.execute_blocking(|conn, item_service| {
let item_service = self.item_service.clone(); item_service.list_items(conn, &tags, &meta)
}).await
tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
item_service.list_items(&conn, &tags, &meta)
})
.await
.unwrap()
} }
pub async fn delete_item(&self, id: i64) -> Result<(), CoreError> { pub async fn delete_item(&self, id: i64) -> Result<(), CoreError> {
let db = self.db.clone(); self.execute_blocking_mut(|conn, item_service| {
let item_service = self.item_service.clone(); item_service.delete_item(conn, id)
}).await
tokio::task::spawn_blocking(move || {
let mut conn = db.blocking_lock();
item_service.delete_item(&mut conn, id)
})
.await
.unwrap()
} }
pub async fn save_item_from_mcp( pub async fn save_item_from_mcp(

View File

@@ -7,6 +7,7 @@ use crate::services::types::{ItemWithContent, ItemWithMeta};
use crate::db::{self, Meta}; use crate::db::{self, Meta};
use crate::compression_engine::{get_compression_engine, CompressionType}; use crate::compression_engine::{get_compression_engine, CompressionType};
use crate::modes::common::settings_compression_type; use crate::modes::common::settings_compression_type;
use crate::filter_plugin::FilterChain;
use clap::Command; use clap::Command;
use log::debug; use log::debug;
use ringbuf::HeapRb; use ringbuf::HeapRb;
@@ -16,6 +17,47 @@ use std::fs;
use std::io::{IsTerminal, Read, Write}; use std::io::{IsTerminal, Read, Write};
use std::path::PathBuf; use std::path::PathBuf;
/// A reader that applies a filter chain to the data as it's read.
///
/// Fixes over the naive implementation:
/// - filtered output that does not fit in the caller's buffer is retained
///   in `pending` and served on subsequent reads instead of being dropped
///   (the `Read` contract says the return value is the bytes written);
/// - a chunk that filters down to nothing no longer signals EOF — we keep
///   reading until the filter produces output or the source is exhausted;
/// - `FilterChain::finish()` is invoked exactly once at source EOF so that
///   buffering filters (e.g. tail_*) can emit their held-back data.
struct FilteringReader<R: Read> {
    reader: R,
    filter_chain: Option<FilterChain>,
    // Filtered bytes produced but not yet handed to the caller.
    pending: Vec<u8>,
    // True once the source hit EOF and finish() has been called.
    finished: bool,
}

impl<R: Read> FilteringReader<R> {
    pub fn new(reader: R, filter_chain: Option<FilterChain>) -> Self {
        Self {
            reader,
            filter_chain,
            pending: Vec::new(),
            finished: false,
        }
    }

    /// Copy as much pending data as fits into `buf`; return bytes copied.
    fn drain_pending(&mut self, buf: &mut [u8]) -> usize {
        let n = std::cmp::min(self.pending.len(), buf.len());
        buf[..n].copy_from_slice(&self.pending[..n]);
        self.pending.drain(..n);
        n
    }
}

impl<R: Read> Read for FilteringReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        loop {
            // Serve previously filtered data first.
            if !self.pending.is_empty() {
                return Ok(self.drain_pending(buf));
            }
            if self.finished {
                return Ok(0);
            }

            let mut temp_buf = vec![0; buf.len()];
            let bytes_read = self.reader.read(&mut temp_buf)?;

            match &mut self.filter_chain {
                Some(chain) => {
                    if bytes_read == 0 {
                        // Source EOF: flush buffering filters exactly once.
                        self.finished = true;
                        self.pending = chain.finish()?;
                    } else {
                        // May legitimately produce nothing for this chunk;
                        // the loop then reads more instead of returning 0.
                        self.pending = chain.process(&temp_buf[..bytes_read])?;
                    }
                }
                None => {
                    if bytes_read == 0 {
                        self.finished = true;
                        return Ok(0);
                    }
                    buf[..bytes_read].copy_from_slice(&temp_buf[..bytes_read]);
                    return Ok(bytes_read);
                }
            }
        }
    }
}
pub struct ItemService { pub struct ItemService {
data_path: PathBuf, data_path: PathBuf,
compression_service: CompressionService, compression_service: CompressionService,
@@ -94,6 +136,70 @@ impl ItemService {
Ok((content, mime_type, is_binary)) Ok((content, mime_type, is_binary))
} }
/// Build an optional `FilterChain` from either an explicit `filter` string
/// or the individual legacy parameters (grep/head/tail), which are kept
/// for backward compatibility.
///
/// Returns `Ok(None)` when no filtering was requested at all.
fn create_filter_chain(
    &self,
    grep: Option<String>,
    head_bytes: Option<usize>,
    head_lines: Option<usize>,
    tail_bytes: Option<usize>,
    tail_lines: Option<usize>,
    filter: Option<String>,
) -> Result<Option<FilterChain>, CoreError> {
    // Translate each legacy parameter into a filter expression fragment.
    let mut parts: Vec<String> = Vec::new();
    if let Some(pattern) = grep {
        parts.push(format!("grep('{}')", pattern));
    }
    if let Some(bytes) = head_bytes {
        parts.push(format!("head_bytes({})", bytes));
    }
    if let Some(lines) = head_lines {
        parts.push(format!("head_lines({})", lines));
    }
    if let Some(bytes) = tail_bytes {
        parts.push(format!("tail_bytes({})", bytes));
    }
    if let Some(lines) = tail_lines {
        parts.push(format!("tail_lines({})", lines));
    }

    // An explicit filter string always takes precedence over the fragments.
    let filter_str = match filter {
        Some(explicit) => Some(explicit),
        None if parts.is_empty() => None,
        None => Some(parts.join(" | ")),
    };

    let filter_service = crate::services::filter_service::FilterService::new();
    filter_service.create_filter_chain(filter_str.as_deref())
}
/// Decide whether the item's content is binary.
///
/// Prefers the stored `text` metadata flag when present; otherwise samples
/// up to the first 8192 decompressed bytes and runs binary detection on
/// them, avoiding decompression of the whole item.
fn is_content_binary(
    &self,
    item_path: PathBuf,
    compression: &str,
    metadata: &HashMap<String, String>,
) -> Result<bool, CoreError> {
    // Trust pre-computed metadata when it exists.
    match metadata.get("text") {
        Some(text_val) => Ok(text_val == "false"),
        None => {
            let mut sample_reader =
                self.compression_service.stream_item_content(item_path, compression)?;
            // NOTE(review): a single read() may return fewer than 8192
            // bytes even when more are available — same as the original.
            let mut sample = vec![0; 8192];
            let n = sample_reader.read(&mut sample)?;
            Ok(crate::common::is_binary::is_binary(&sample[..n]))
        }
    }
}
pub fn get_item_content_info_streaming( pub fn get_item_content_info_streaming(
&self, &self,
conn: &Connection, conn: &Connection,
@@ -119,40 +225,16 @@ impl ItemService {
let mut item_path = self.data_path.clone(); let mut item_path = self.data_path.clone();
item_path.push(item_id.to_string()); item_path.push(item_id.to_string());
let reader = self.compression_service.stream_item_content(item_path.clone(), &item_with_meta.item.compression)?; let reader = self.compression_service.stream_item_content(
item_path.clone(),
// Build filter string from individual parameters (for backward compatibility) &item_with_meta.item.compression
let mut filter_parts = Vec::new(); )?;
if let Some(pattern) = grep {
filter_parts.push(format!("grep('{}')", pattern));
}
if let Some(bytes) = head_bytes {
filter_parts.push(format!("head_bytes({})", bytes));
}
if let Some(lines) = head_lines {
filter_parts.push(format!("head_lines({})", lines));
}
if let Some(bytes) = tail_bytes {
filter_parts.push(format!("tail_bytes({})", bytes));
}
if let Some(lines) = tail_lines {
filter_parts.push(format!("tail_lines({})", lines));
}
// Add other filters as needed
// Use the provided filter string if available, otherwise build from parts
let filter_str = filter.or_else(|| {
if filter_parts.is_empty() {
None
} else {
Some(filter_parts.join(" | "))
}
});
// Create filter chain // Create filter chain
let filter_service = crate::services::filter_service::FilterService::new(); let filter_chain = self.create_filter_chain(
let mut filter_chain = filter_service.create_filter_chain(filter_str.as_deref())?; grep, head_bytes, head_lines, tail_bytes, tail_lines, filter
)?;
// Wrap the reader with filtering // Wrap the reader with filtering
let filtered_reader = Box::new(FilteringReader::new(reader, filter_chain)); let filtered_reader = Box::new(FilteringReader::new(reader, filter_chain));
@@ -162,19 +244,12 @@ impl ItemService {
.map(|s| s.to_string()) .map(|s| s.to_string())
.unwrap_or_else(|| "application/octet-stream".to_string()); .unwrap_or_else(|| "application/octet-stream".to_string());
// Check if content is binary using only the first 8192 bytes // Check if content is binary
let is_binary = if let Some(text_val) = metadata.get("text") { let is_binary = self.is_content_binary(
text_val == "false" item_path,
} else { &item_with_meta.item.compression,
// Read only the first 8192 bytes for binary detection &metadata
let mut sample_reader = self.compression_service.stream_item_content( )?;
item_path,
&item_with_meta.item.compression
)?;
let mut sample_buffer = vec![0; 8192];
let bytes_read = sample_reader.read(&mut sample_buffer)?;
crate::common::is_binary::is_binary(&sample_buffer[..bytes_read])
};
Ok((filtered_reader, mime_type, is_binary)) Ok((filtered_reader, mime_type, is_binary))
} }