feat: add filter plugin system with chained filters
Co-authored-by: aider (openai/andrew/openrouter/deepseek/deepseek-chat-v3.1) <aider@aider.chat>
This commit is contained in:
@@ -0,0 +1,65 @@
|
|||||||
|
use super::FilterPlugin;
|
||||||
|
use std::io::Result;
|
||||||
|
use regex::Regex;
|
||||||
|
|
||||||
|
pub struct GrepFilter {
|
||||||
|
regex: Regex,
|
||||||
|
buffer: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl GrepFilter {
|
||||||
|
pub fn new(pattern: String) -> Result<Self> {
|
||||||
|
let regex = Regex::new(&pattern)
|
||||||
|
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
|
||||||
|
Ok(Self {
|
||||||
|
regex,
|
||||||
|
buffer: Vec::new(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FilterPlugin for GrepFilter {
|
||||||
|
fn process(&mut self, data: &[u8]) -> Result<Vec<u8>> {
|
||||||
|
self.buffer.extend_from_slice(data);
|
||||||
|
|
||||||
|
let mut result = Vec::new();
|
||||||
|
let mut lines = Vec::new();
|
||||||
|
let mut start = 0;
|
||||||
|
|
||||||
|
// Split into lines
|
||||||
|
for (i, &byte) in self.buffer.iter().enumerate() {
|
||||||
|
if byte == b'\n' {
|
||||||
|
lines.push(&self.buffer[start..=i]);
|
||||||
|
start = i + 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Keep the remaining data in buffer
|
||||||
|
self.buffer.drain(0..start);
|
||||||
|
|
||||||
|
// Filter lines that match the regex
|
||||||
|
for line in lines {
|
||||||
|
if let Ok(line_str) = std::str::from_utf8(line) {
|
||||||
|
if self.regex.is_match(line_str) {
|
||||||
|
result.extend_from_slice(line);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn finish(&mut self) -> Result<Vec<u8>> {
|
||||||
|
// Process any remaining data in buffer
|
||||||
|
let mut result = Vec::new();
|
||||||
|
if !self.buffer.is_empty() {
|
||||||
|
if let Ok(line_str) = std::str::from_utf8(&self.buffer) {
|
||||||
|
if self.regex.is_match(line_str) {
|
||||||
|
result.extend_from_slice(&self.buffer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.buffer.clear();
|
||||||
|
}
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -0,0 +1,86 @@
|
|||||||
|
use super::FilterPlugin;
|
||||||
|
use std::io::Result;
|
||||||
|
|
||||||
|
pub struct HeadBytesFilter {
|
||||||
|
remaining: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HeadBytesFilter {
|
||||||
|
pub fn new(count: usize) -> Self {
|
||||||
|
Self {
|
||||||
|
remaining: count,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FilterPlugin for HeadBytesFilter {
|
||||||
|
fn process(&mut self, data: &[u8]) -> Result<Vec<u8>> {
|
||||||
|
if self.remaining == 0 {
|
||||||
|
return Ok(Vec::new());
|
||||||
|
}
|
||||||
|
|
||||||
|
let bytes_to_take = std::cmp::min(data.len(), self.remaining);
|
||||||
|
self.remaining -= bytes_to_take;
|
||||||
|
Ok(data[..bytes_to_take].to_vec())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn finish(&mut self) -> Result<Vec<u8>> {
|
||||||
|
Ok(Vec::new())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct HeadLinesFilter {
|
||||||
|
remaining: usize,
|
||||||
|
buffer: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HeadLinesFilter {
|
||||||
|
pub fn new(count: usize) -> Self {
|
||||||
|
Self {
|
||||||
|
remaining: count,
|
||||||
|
buffer: Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FilterPlugin for HeadLinesFilter {
|
||||||
|
fn process(&mut self, data: &[u8]) -> Result<Vec<u8>> {
|
||||||
|
if self.remaining == 0 {
|
||||||
|
return Ok(Vec::new());
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut result = Vec::new();
|
||||||
|
let mut start = 0;
|
||||||
|
|
||||||
|
for (i, &byte) in data.iter().enumerate() {
|
||||||
|
if byte == b'\n' {
|
||||||
|
self.buffer.extend_from_slice(&data[start..=i]);
|
||||||
|
result.extend_from_slice(&self.buffer);
|
||||||
|
self.buffer.clear();
|
||||||
|
start = i + 1;
|
||||||
|
|
||||||
|
self.remaining -= 1;
|
||||||
|
if self.remaining == 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add remaining data to buffer
|
||||||
|
if start < data.len() {
|
||||||
|
self.buffer.extend_from_slice(&data[start..]);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn finish(&mut self) -> Result<Vec<u8>> {
|
||||||
|
if self.remaining > 0 && !self.buffer.is_empty() {
|
||||||
|
let result = self.buffer.clone();
|
||||||
|
self.buffer.clear();
|
||||||
|
Ok(result)
|
||||||
|
} else {
|
||||||
|
Ok(Vec::new())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -0,0 +1,88 @@
|
|||||||
|
use std::io::{Read, Result};
|
||||||
|
use regex::Regex;
|
||||||
|
use ringbuf::HeapRb;
|
||||||
|
|
||||||
|
pub mod head;
|
||||||
|
pub mod tail;
|
||||||
|
pub mod grep;
|
||||||
|
pub mod skip;
|
||||||
|
|
||||||
|
pub trait FilterPlugin: Send {
|
||||||
|
fn process(&mut self, data: &[u8]) -> Result<Vec<u8>>;
|
||||||
|
fn finish(&mut self) -> Result<Vec<u8>>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct FilterChain {
|
||||||
|
plugins: Vec<Box<dyn FilterPlugin>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FilterChain {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
plugins: Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add_plugin(&mut self, plugin: Box<dyn FilterPlugin>) {
|
||||||
|
self.plugins.push(plugin);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn process(&mut self, data: &[u8]) -> Result<Vec<u8>> {
|
||||||
|
let mut current_data = data.to_vec();
|
||||||
|
for plugin in &mut self.plugins {
|
||||||
|
current_data = plugin.process(¤t_data)?;
|
||||||
|
}
|
||||||
|
Ok(current_data)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn finish(&mut self) -> Result<Vec<u8>> {
|
||||||
|
let mut current_data = Vec::new();
|
||||||
|
for plugin in &mut self.plugins {
|
||||||
|
let processed = plugin.finish()?;
|
||||||
|
if !processed.is_empty() {
|
||||||
|
current_data = processed;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(current_data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper function to parse filter string and create appropriate plugins
|
||||||
|
pub fn parse_filter_string(filter_str: &str) -> Result<FilterChain> {
|
||||||
|
let mut chain = FilterChain::new();
|
||||||
|
|
||||||
|
for part in filter_str.split('|') {
|
||||||
|
let part = part.trim();
|
||||||
|
if part.is_empty() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(stripped) = part.strip_prefix("grep(").and_then(|s| s.strip_suffix(')')) {
|
||||||
|
// Remove quotes if present
|
||||||
|
let pattern = stripped.trim_matches(|c| c == '\'' || c == '"');
|
||||||
|
chain.add_plugin(Box::new(grep::GrepFilter::new(pattern.to_string())?));
|
||||||
|
} else if let Some(stripped) = part.strip_prefix("head_bytes(").and_then(|s| s.strip_suffix(')')) {
|
||||||
|
let count: usize = stripped.parse().map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
|
||||||
|
chain.add_plugin(Box::new(head::HeadBytesFilter::new(count)));
|
||||||
|
} else if let Some(stripped) = part.strip_prefix("head_lines(").and_then(|s| s.strip_suffix(')')) {
|
||||||
|
let count: usize = stripped.parse().map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
|
||||||
|
chain.add_plugin(Box::new(head::HeadLinesFilter::new(count)));
|
||||||
|
} else if let Some(stripped) = part.strip_prefix("tail_bytes(").and_then(|s| s.strip_suffix(')')) {
|
||||||
|
let count: usize = stripped.parse().map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
|
||||||
|
chain.add_plugin(Box::new(tail::TailBytesFilter::new(count)?));
|
||||||
|
} else if let Some(stripped) = part.strip_prefix("tail_lines(").and_then(|s| s.strip_suffix(')')) {
|
||||||
|
let count: usize = stripped.parse().map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
|
||||||
|
chain.add_plugin(Box::new(tail::TailLinesFilter::new(count)?));
|
||||||
|
} else if let Some(stripped) = part.strip_prefix("skip_bytes(").and_then(|s| s.strip_suffix(')')) {
|
||||||
|
let count: usize = stripped.parse().map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
|
||||||
|
chain.add_plugin(Box::new(skip::SkipBytesFilter::new(count)));
|
||||||
|
} else if let Some(stripped) = part.strip_prefix("skip_lines(").and_then(|s| s.strip_suffix(')')) {
|
||||||
|
let count: usize = stripped.parse().map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
|
||||||
|
chain.add_plugin(Box::new(skip::SkipLinesFilter::new(count)));
|
||||||
|
} else {
|
||||||
|
return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, format!("Unknown filter: {}", part)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(chain)
|
||||||
|
}
|
||||||
|
|||||||
98
src/filter_plugin/skip.rs
Normal file
98
src/filter_plugin/skip.rs
Normal file
@@ -0,0 +1,98 @@
|
|||||||
|
use super::FilterPlugin;
|
||||||
|
use std::io::Result;
|
||||||
|
|
||||||
|
pub struct SkipBytesFilter {
|
||||||
|
remaining: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SkipBytesFilter {
|
||||||
|
pub fn new(count: usize) -> Self {
|
||||||
|
Self {
|
||||||
|
remaining: count,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FilterPlugin for SkipBytesFilter {
|
||||||
|
fn process(&mut self, data: &[u8]) -> Result<Vec<u8>> {
|
||||||
|
if self.remaining == 0 {
|
||||||
|
return Ok(data.to_vec());
|
||||||
|
}
|
||||||
|
|
||||||
|
if data.len() <= self.remaining {
|
||||||
|
self.remaining -= data.len();
|
||||||
|
Ok(Vec::new())
|
||||||
|
} else {
|
||||||
|
let result = data[self.remaining..].to_vec();
|
||||||
|
self.remaining = 0;
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn finish(&mut self) -> Result<Vec<u8>> {
|
||||||
|
Ok(Vec::new())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct SkipLinesFilter {
|
||||||
|
remaining: usize,
|
||||||
|
buffer: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SkipLinesFilter {
|
||||||
|
pub fn new(count: usize) -> Self {
|
||||||
|
Self {
|
||||||
|
remaining: count,
|
||||||
|
buffer: Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FilterPlugin for SkipLinesFilter {
|
||||||
|
fn process(&mut self, data: &[u8]) -> Result<Vec<u8>> {
|
||||||
|
if self.remaining == 0 {
|
||||||
|
let mut result = self.buffer.clone();
|
||||||
|
result.extend_from_slice(data);
|
||||||
|
self.buffer.clear();
|
||||||
|
return Ok(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut result = Vec::new();
|
||||||
|
let mut start = 0;
|
||||||
|
|
||||||
|
for (i, &byte) in data.iter().enumerate() {
|
||||||
|
if byte == b'\n' {
|
||||||
|
if self.remaining > 0 {
|
||||||
|
self.remaining -= 1;
|
||||||
|
start = i + 1;
|
||||||
|
} else {
|
||||||
|
self.buffer.extend_from_slice(&data[start..=i]);
|
||||||
|
result.extend_from_slice(&self.buffer);
|
||||||
|
self.buffer.clear();
|
||||||
|
start = i + 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add remaining data to buffer
|
||||||
|
if start < data.len() {
|
||||||
|
if self.remaining == 0 {
|
||||||
|
result.extend_from_slice(&data[start..]);
|
||||||
|
} else {
|
||||||
|
self.buffer.extend_from_slice(&data[start..]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn finish(&mut self) -> Result<Vec<u8>> {
|
||||||
|
if self.remaining == 0 {
|
||||||
|
let result = self.buffer.clone();
|
||||||
|
self.buffer.clear();
|
||||||
|
Ok(result)
|
||||||
|
} else {
|
||||||
|
Ok(Vec::new())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,95 @@
|
|||||||
|
use super::FilterPlugin;
|
||||||
|
use std::io::Result;
|
||||||
|
use ringbuf::HeapRb;
|
||||||
|
|
||||||
|
pub struct TailBytesFilter {
|
||||||
|
ring_buffer: HeapRb<u8>,
|
||||||
|
count: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TailBytesFilter {
|
||||||
|
pub fn new(count: usize) -> Result<Self> {
|
||||||
|
Ok(Self {
|
||||||
|
ring_buffer: HeapRb::new(count),
|
||||||
|
count,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FilterPlugin for TailBytesFilter {
|
||||||
|
fn process(&mut self, data: &[u8]) -> Result<Vec<u8>> {
|
||||||
|
for &byte in data {
|
||||||
|
let _ = self.ring_buffer.push(byte);
|
||||||
|
}
|
||||||
|
Ok(Vec::new())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn finish(&mut self) -> Result<Vec<u8>> {
|
||||||
|
let mut result = Vec::with_capacity(self.ring_buffer.len());
|
||||||
|
for byte in self.ring_buffer.iter() {
|
||||||
|
result.push(*byte);
|
||||||
|
}
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct TailLinesFilter {
|
||||||
|
ring_buffer: HeapRb<u8>,
|
||||||
|
count: usize,
|
||||||
|
lines_found: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TailLinesFilter {
|
||||||
|
pub fn new(count: usize) -> Result<Self> {
|
||||||
|
Ok(Self {
|
||||||
|
ring_buffer: HeapRb::new(count * 256), // Estimate 256 bytes per line
|
||||||
|
count,
|
||||||
|
lines_found: 0,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FilterPlugin for TailLinesFilter {
|
||||||
|
fn process(&mut self, data: &[u8]) -> Result<Vec<u8>> {
|
||||||
|
for &byte in data {
|
||||||
|
let _ = self.ring_buffer.push(byte);
|
||||||
|
if byte == b'\n' {
|
||||||
|
self.lines_found += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(Vec::new())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn finish(&mut self) -> Result<Vec<u8>> {
|
||||||
|
// Count lines in the buffer to find where to start
|
||||||
|
let mut lines_to_keep = std::cmp::min(self.count, self.lines_found);
|
||||||
|
let mut bytes_to_keep = 0;
|
||||||
|
let mut lines_seen = 0;
|
||||||
|
|
||||||
|
// Iterate backwards to find the starting point
|
||||||
|
for i in (0..self.ring_buffer.len()).rev() {
|
||||||
|
let index = (self.ring_buffer.write_index() as isize - 1 - i as isize)
|
||||||
|
.rem_euclid(self.ring_buffer.capacity() as isize) as usize;
|
||||||
|
let byte = self.ring_buffer[index];
|
||||||
|
|
||||||
|
if byte == b'\n' {
|
||||||
|
lines_seen += 1;
|
||||||
|
if lines_seen > lines_to_keep {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
bytes_to_keep += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract the relevant bytes
|
||||||
|
let start_index = self.ring_buffer.len() - bytes_to_keep;
|
||||||
|
let mut result = Vec::with_capacity(bytes_to_keep);
|
||||||
|
for i in start_index..self.ring_buffer.len() {
|
||||||
|
let index = (self.ring_buffer.write_index() as isize - (self.ring_buffer.len() - i) as isize)
|
||||||
|
.rem_euclid(self.ring_buffer.capacity() as isize) as usize;
|
||||||
|
result.push(self.ring_buffer[index]);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -0,0 +1,34 @@
|
|||||||
|
use crate::filter_plugin::{FilterChain, parse_filter_string};
|
||||||
|
use std::io::Result;
|
||||||
|
|
||||||
|
pub struct FilterService;
|
||||||
|
|
||||||
|
impl FilterService {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn create_filter_chain(&self, filter_str: Option<&str>) -> Result<Option<FilterChain>> {
|
||||||
|
if let Some(filter_str) = filter_str {
|
||||||
|
parse_filter_string(filter_str).map(Some)
|
||||||
|
} else {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn process_data(&self, chain: &mut Option<FilterChain>, data: &[u8]) -> Result<Vec<u8>> {
|
||||||
|
if let Some(chain) = chain {
|
||||||
|
chain.process(data)
|
||||||
|
} else {
|
||||||
|
Ok(data.to_vec())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn finish_processing(&self, chain: &mut Option<FilterChain>) -> Result<Vec<u8>> {
|
||||||
|
if let Some(chain) = chain {
|
||||||
|
chain.finish()
|
||||||
|
} else {
|
||||||
|
Ok(Vec::new())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -107,6 +107,7 @@ impl ItemService {
|
|||||||
line_start: Option<usize>,
|
line_start: Option<usize>,
|
||||||
line_end: Option<usize>,
|
line_end: Option<usize>,
|
||||||
grep: Option<String>,
|
grep: Option<String>,
|
||||||
|
filter: Option<String>,
|
||||||
) -> Result<(Box<dyn Read + Send>, String, bool), CoreError> {
|
) -> Result<(Box<dyn Read + Send>, String, bool), CoreError> {
|
||||||
let item_with_meta = self.get_item(conn, id)?;
|
let item_with_meta = self.get_item(conn, id)?;
|
||||||
let item_id = item_with_meta.item.id.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
|
let item_id = item_with_meta.item.id.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
|
||||||
@@ -120,35 +121,40 @@ impl ItemService {
|
|||||||
|
|
||||||
let reader = self.compression_service.stream_item_content(item_path.clone(), &item_with_meta.item.compression)?;
|
let reader = self.compression_service.stream_item_content(item_path.clone(), &item_with_meta.item.compression)?;
|
||||||
|
|
||||||
// Apply content filtering
|
// Build filter string from individual parameters (for backward compatibility)
|
||||||
let filtered_reader: Box<dyn Read + Send> = if let Some(pattern) = grep {
|
let mut filter_parts = Vec::new();
|
||||||
Box::new(GrepFilter::new(
|
if let Some(pattern) = grep {
|
||||||
reader,
|
filter_parts.push(format!("grep('{}')", pattern));
|
||||||
pattern,
|
}
|
||||||
)?)
|
if let Some(bytes) = head_bytes {
|
||||||
} else if head_bytes.is_some() || head_words.is_some() || head_lines.is_some() {
|
filter_parts.push(format!("head_bytes({})", bytes));
|
||||||
Box::new(HeadFilter::new(
|
}
|
||||||
reader,
|
if let Some(lines) = head_lines {
|
||||||
head_bytes,
|
filter_parts.push(format!("head_lines({})", lines));
|
||||||
head_words,
|
}
|
||||||
head_lines,
|
if let Some(bytes) = tail_bytes {
|
||||||
))
|
filter_parts.push(format!("tail_bytes({})", bytes));
|
||||||
} else if tail_bytes.is_some() || tail_words.is_some() || tail_lines.is_some() {
|
}
|
||||||
Box::new(TailFilter::new(
|
if let Some(lines) = tail_lines {
|
||||||
reader,
|
filter_parts.push(format!("tail_lines({})", lines));
|
||||||
tail_bytes,
|
}
|
||||||
tail_words,
|
// Add other filters as needed
|
||||||
tail_lines,
|
|
||||||
)?)
|
// Use the provided filter string if available, otherwise build from parts
|
||||||
} else if line_start.is_some() || line_end.is_some() {
|
let filter_str = filter.or_else(|| {
|
||||||
Box::new(LineRangeFilter::new(
|
if filter_parts.is_empty() {
|
||||||
reader,
|
None
|
||||||
line_start,
|
|
||||||
line_end,
|
|
||||||
))
|
|
||||||
} else {
|
} else {
|
||||||
Box::new(reader)
|
Some(filter_parts.join(" | "))
|
||||||
};
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Create filter chain
|
||||||
|
let filter_service = crate::services::filter_service::FilterService::new();
|
||||||
|
let mut filter_chain = filter_service.create_filter_chain(filter_str.as_deref())?;
|
||||||
|
|
||||||
|
// Wrap the reader with filtering
|
||||||
|
let filtered_reader = Box::new(FilteringReader::new(reader, filter_chain));
|
||||||
|
|
||||||
let metadata = item_with_meta.meta_as_map();
|
let metadata = item_with_meta.meta_as_map();
|
||||||
let mime_type = metadata
|
let mime_type = metadata
|
||||||
@@ -909,6 +915,58 @@ impl<R: Read + Send> Read for GrepFilter<R> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Filtering reader that applies filter plugins
|
||||||
|
struct FilteringReader<R: Read + Send> {
|
||||||
|
inner: R,
|
||||||
|
filter_chain: Option<FilterChain>,
|
||||||
|
buffer: Vec<u8>,
|
||||||
|
buffer_pos: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<R: Read + Send> FilteringReader<R> {
|
||||||
|
fn new(inner: R, filter_chain: Option<FilterChain>) -> Self {
|
||||||
|
Self {
|
||||||
|
inner,
|
||||||
|
filter_chain,
|
||||||
|
buffer: Vec::new(),
|
||||||
|
buffer_pos: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<R: Read + Send> Read for FilteringReader<R> {
|
||||||
|
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||||
|
if self.buffer_pos >= self.buffer.len() {
|
||||||
|
// Read more data from the inner reader
|
||||||
|
let mut temp_buf = vec![0; 8192];
|
||||||
|
let n = self.inner.read(&mut temp_buf)?;
|
||||||
|
if n == 0 {
|
||||||
|
// End of input, finish filtering
|
||||||
|
let filter_service = crate::services::filter_service::FilterService::new();
|
||||||
|
let remaining = filter_service.finish_processing(&mut self.filter_chain)?;
|
||||||
|
self.buffer = remaining;
|
||||||
|
self.buffer_pos = 0;
|
||||||
|
if self.buffer.is_empty() {
|
||||||
|
return Ok(0);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Process the chunk
|
||||||
|
let filter_service = crate::services::filter_service::FilterService::new();
|
||||||
|
let processed = filter_service.process_data(&mut self.filter_chain, &temp_buf[..n])?;
|
||||||
|
self.buffer = processed;
|
||||||
|
self.buffer_pos = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy from buffer to output
|
||||||
|
let bytes_to_copy = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos);
|
||||||
|
buf[..bytes_to_copy].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + bytes_to_copy]);
|
||||||
|
self.buffer_pos += bytes_to_copy;
|
||||||
|
|
||||||
|
Ok(bytes_to_copy)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Line range filter implementation
|
// Line range filter implementation
|
||||||
struct LineRangeFilter<R: Read + Send> {
|
struct LineRangeFilter<R: Read + Send> {
|
||||||
inner: R,
|
inner: R,
|
||||||
|
|||||||
Reference in New Issue
Block a user