refactor: Update head, tail, skip filter plugins to use new API
Co-authored-by: aider (openai/andrew/openrouter/deepseek/deepseek-chat-v3.1) <aider@aider.chat>
This commit is contained in:
@@ -1,5 +1,5 @@
|
|||||||
use super::FilterPlugin;
|
use super::FilterPlugin;
|
||||||
use std::io::Result;
|
use std::io::{Result, Read, Write, BufRead};
|
||||||
|
|
||||||
pub struct HeadBytesFilter {
|
pub struct HeadBytesFilter {
|
||||||
remaining: usize,
|
remaining: usize,
|
||||||
@@ -14,73 +14,49 @@ impl HeadBytesFilter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl FilterPlugin for HeadBytesFilter {
|
impl FilterPlugin for HeadBytesFilter {
|
||||||
fn process(&mut self, data: &[u8]) -> Result<Vec<u8>> {
|
fn filter<R: Read, W: Write>(&mut self, reader: &mut R, writer: &mut W) -> Result<()> {
|
||||||
if self.remaining == 0 {
|
if self.remaining == 0 {
|
||||||
return Ok(Vec::new());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
let bytes_to_take = std::cmp::min(data.len(), self.remaining);
|
// Read only up to remaining bytes
|
||||||
self.remaining -= bytes_to_take;
|
let mut buffer = vec![0; self.remaining];
|
||||||
Ok(data[..bytes_to_take].to_vec())
|
let bytes_read = reader.read(&mut buffer)?;
|
||||||
|
if bytes_read > 0 {
|
||||||
|
writer.write_all(&buffer[..bytes_read])?;
|
||||||
|
self.remaining -= bytes_read;
|
||||||
}
|
}
|
||||||
|
Ok(())
|
||||||
fn finish(&mut self) -> Result<Vec<u8>> {
|
|
||||||
Ok(Vec::new())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct HeadLinesFilter {
|
pub struct HeadLinesFilter {
|
||||||
remaining: usize,
|
remaining: usize,
|
||||||
buffer: Vec<u8>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl HeadLinesFilter {
|
impl HeadLinesFilter {
|
||||||
pub fn new(count: usize) -> Self {
|
pub fn new(count: usize) -> Self {
|
||||||
Self {
|
Self {
|
||||||
remaining: count,
|
remaining: count,
|
||||||
buffer: Vec::new(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FilterPlugin for HeadLinesFilter {
|
impl FilterPlugin for HeadLinesFilter {
|
||||||
fn process(&mut self, data: &[u8]) -> Result<Vec<u8>> {
|
fn filter<R: Read, W: Write>(&mut self, reader: &mut R, writer: &mut W) -> Result<()> {
|
||||||
if self.remaining == 0 {
|
if self.remaining == 0 {
|
||||||
return Ok(Vec::new());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut result = Vec::new();
|
let buf_reader = std::io::BufReader::new(reader);
|
||||||
let mut start = 0;
|
for line in buf_reader.lines() {
|
||||||
|
let line = line?;
|
||||||
for (i, &byte) in data.iter().enumerate() {
|
writeln!(writer, "{}", line)?;
|
||||||
if byte == b'\n' {
|
|
||||||
self.buffer.extend_from_slice(&data[start..=i]);
|
|
||||||
result.extend_from_slice(&self.buffer);
|
|
||||||
self.buffer.clear();
|
|
||||||
start = i + 1;
|
|
||||||
|
|
||||||
self.remaining -= 1;
|
self.remaining -= 1;
|
||||||
if self.remaining == 0 {
|
if self.remaining == 0 {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
Ok(())
|
||||||
|
|
||||||
// Add remaining data to buffer
|
|
||||||
if start < data.len() {
|
|
||||||
self.buffer.extend_from_slice(&data[start..]);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn finish(&mut self) -> Result<Vec<u8>> {
|
|
||||||
if self.remaining > 0 && !self.buffer.is_empty() {
|
|
||||||
let result = self.buffer.clone();
|
|
||||||
self.buffer.clear();
|
|
||||||
Ok(result)
|
|
||||||
} else {
|
|
||||||
Ok(Vec::new())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
use super::FilterPlugin;
|
use super::FilterPlugin;
|
||||||
use std::io::Result;
|
use std::io::{Result, Read, Write, BufRead};
|
||||||
|
|
||||||
pub struct SkipBytesFilter {
|
pub struct SkipBytesFilter {
|
||||||
remaining: usize,
|
remaining: usize,
|
||||||
@@ -14,85 +14,43 @@ impl SkipBytesFilter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl FilterPlugin for SkipBytesFilter {
|
impl FilterPlugin for SkipBytesFilter {
|
||||||
fn process(&mut self, data: &[u8]) -> Result<Vec<u8>> {
|
fn filter<R: Read, W: Write>(&mut self, reader: &mut R, writer: &mut W) -> Result<()> {
|
||||||
if self.remaining == 0 {
|
// Skip the specified number of bytes
|
||||||
return Ok(data.to_vec());
|
if self.remaining > 0 {
|
||||||
|
let mut buffer = vec![0; self.remaining];
|
||||||
|
let bytes_read = reader.read(&mut buffer)?;
|
||||||
|
self.remaining -= bytes_read;
|
||||||
}
|
}
|
||||||
|
|
||||||
if data.len() <= self.remaining {
|
// Copy the remaining data
|
||||||
self.remaining -= data.len();
|
std::io::copy(reader, writer)?;
|
||||||
Ok(Vec::new())
|
Ok(())
|
||||||
} else {
|
|
||||||
let result = data[self.remaining..].to_vec();
|
|
||||||
self.remaining = 0;
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn finish(&mut self) -> Result<Vec<u8>> {
|
|
||||||
Ok(Vec::new())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct SkipLinesFilter {
|
pub struct SkipLinesFilter {
|
||||||
remaining: usize,
|
remaining: usize,
|
||||||
buffer: Vec<u8>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SkipLinesFilter {
|
impl SkipLinesFilter {
|
||||||
pub fn new(count: usize) -> Self {
|
pub fn new(count: usize) -> Self {
|
||||||
Self {
|
Self {
|
||||||
remaining: count,
|
remaining: count,
|
||||||
buffer: Vec::new(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FilterPlugin for SkipLinesFilter {
|
impl FilterPlugin for SkipLinesFilter {
|
||||||
fn process(&mut self, data: &[u8]) -> Result<Vec<u8>> {
|
fn filter<R: Read, W: Write>(&mut self, reader: &mut R, writer: &mut W) -> Result<()> {
|
||||||
if self.remaining == 0 {
|
let buf_reader = std::io::BufReader::new(reader);
|
||||||
let mut result = self.buffer.clone();
|
for line in buf_reader.lines() {
|
||||||
result.extend_from_slice(data);
|
let line = line?;
|
||||||
self.buffer.clear();
|
|
||||||
return Ok(result);
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut result = Vec::new();
|
|
||||||
let mut start = 0;
|
|
||||||
|
|
||||||
for (i, &byte) in data.iter().enumerate() {
|
|
||||||
if byte == b'\n' {
|
|
||||||
if self.remaining > 0 {
|
if self.remaining > 0 {
|
||||||
self.remaining -= 1;
|
self.remaining -= 1;
|
||||||
start = i + 1;
|
|
||||||
} else {
|
} else {
|
||||||
self.buffer.extend_from_slice(&data[start..=i]);
|
writeln!(writer, "{}", line)?;
|
||||||
result.extend_from_slice(&self.buffer);
|
|
||||||
self.buffer.clear();
|
|
||||||
start = i + 1;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
Ok(())
|
||||||
|
|
||||||
// Add remaining data to buffer
|
|
||||||
if start < data.len() {
|
|
||||||
if self.remaining == 0 {
|
|
||||||
result.extend_from_slice(&data[start..]);
|
|
||||||
} else {
|
|
||||||
self.buffer.extend_from_slice(&data[start..]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn finish(&mut self) -> Result<Vec<u8>> {
|
|
||||||
if self.remaining == 0 {
|
|
||||||
let result = self.buffer.clone();
|
|
||||||
self.buffer.clear();
|
|
||||||
Ok(result)
|
|
||||||
} else {
|
|
||||||
Ok(Vec::new())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,95 +1,75 @@
|
|||||||
use super::FilterPlugin;
|
use super::FilterPlugin;
|
||||||
use std::io::Result;
|
use std::io::{Result, Read, Write, BufRead};
|
||||||
use ringbuf::{HeapRb, Rb};
|
use std::collections::VecDeque;
|
||||||
|
|
||||||
pub struct TailBytesFilter {
|
pub struct TailBytesFilter {
|
||||||
ring_buffer: HeapRb<u8>,
|
buffer: VecDeque<u8>,
|
||||||
_count: usize,
|
count: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TailBytesFilter {
|
impl TailBytesFilter {
|
||||||
pub fn new(count: usize) -> Self {
|
pub fn new(count: usize) -> Self {
|
||||||
Self {
|
Self {
|
||||||
ring_buffer: HeapRb::new(count),
|
buffer: VecDeque::with_capacity(count),
|
||||||
_count: count,
|
count,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FilterPlugin for TailBytesFilter {
|
impl FilterPlugin for TailBytesFilter {
|
||||||
fn process(&mut self, data: &[u8]) -> Result<Vec<u8>> {
|
fn filter<R: Read, W: Write>(&mut self, reader: &mut R, writer: &mut W) -> Result<()> {
|
||||||
for &byte in data {
|
let mut temp_buffer = vec![0; 8192];
|
||||||
let _ = self.ring_buffer.push(byte);
|
loop {
|
||||||
}
|
let bytes_read = reader.read(&mut temp_buffer)?;
|
||||||
Ok(Vec::new())
|
if bytes_read == 0 {
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn finish(&mut self) -> Result<Vec<u8>> {
|
// Add new data to the buffer
|
||||||
// Collect all bytes from the ring buffer
|
for &byte in &temp_buffer[..bytes_read] {
|
||||||
let mut result = Vec::with_capacity(self.ring_buffer.len());
|
if self.buffer.len() == self.count {
|
||||||
for byte in self.ring_buffer.iter() {
|
self.buffer.pop_front();
|
||||||
result.push(*byte);
|
|
||||||
}
|
}
|
||||||
Ok(result)
|
self.buffer.push_back(byte);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write the buffered data at the end
|
||||||
|
let result: Vec<u8> = self.buffer.iter().cloned().collect();
|
||||||
|
writer.write_all(&result)?;
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct TailLinesFilter {
|
pub struct TailLinesFilter {
|
||||||
ring_buffer: HeapRb<u8>,
|
lines: VecDeque<String>,
|
||||||
count: usize,
|
count: usize,
|
||||||
lines_found: usize,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TailLinesFilter {
|
impl TailLinesFilter {
|
||||||
pub fn new(count: usize) -> Self {
|
pub fn new(count: usize) -> Self {
|
||||||
Self {
|
Self {
|
||||||
ring_buffer: HeapRb::new(count * 256), // Estimate 256 bytes per line
|
lines: VecDeque::with_capacity(count),
|
||||||
count,
|
count,
|
||||||
lines_found: 0,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FilterPlugin for TailLinesFilter {
|
impl FilterPlugin for TailLinesFilter {
|
||||||
fn process(&mut self, data: &[u8]) -> Result<Vec<u8>> {
|
fn filter<R: Read, W: Write>(&mut self, reader: &mut R, writer: &mut W) -> Result<()> {
|
||||||
for &byte in data {
|
let buf_reader = std::io::BufReader::new(reader);
|
||||||
let _ = self.ring_buffer.push(byte);
|
for line in buf_reader.lines() {
|
||||||
if byte == b'\n' {
|
let line = line?;
|
||||||
self.lines_found += 1;
|
if self.lines.len() == self.count {
|
||||||
|
self.lines.pop_front();
|
||||||
}
|
}
|
||||||
}
|
self.lines.push_back(line);
|
||||||
Ok(Vec::new())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn finish(&mut self) -> Result<Vec<u8>> {
|
// Write the buffered lines
|
||||||
// For ring buffer, we can use the iter() method to get all elements
|
for line in &self.lines {
|
||||||
// Since it's a circular buffer, we need to handle the wrap-around
|
writeln!(writer, "{}", line)?;
|
||||||
let mut result = Vec::with_capacity(self.ring_buffer.len());
|
|
||||||
|
|
||||||
// The ring buffer maintains elements in insertion order
|
|
||||||
for byte in self.ring_buffer.iter() {
|
|
||||||
result.push(*byte);
|
|
||||||
}
|
}
|
||||||
|
Ok(())
|
||||||
// Now, we need to find the last 'count' lines
|
|
||||||
if self.count == 0 {
|
|
||||||
return Ok(Vec::new());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Split into lines and take the last 'count' lines
|
|
||||||
let text = String::from_utf8_lossy(&result);
|
|
||||||
let lines: Vec<&str> = text.split('\n').collect();
|
|
||||||
|
|
||||||
// Take the last 'count' lines
|
|
||||||
let start_index = if lines.len() > self.count {
|
|
||||||
lines.len() - self.count
|
|
||||||
} else {
|
|
||||||
0
|
|
||||||
};
|
|
||||||
|
|
||||||
let selected_lines = &lines[start_index..];
|
|
||||||
let result_text = selected_lines.join("\n");
|
|
||||||
|
|
||||||
Ok(result_text.into_bytes())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,27 +1,4 @@
|
|||||||
use std::io::Result;
|
use std::io::Result;
|
||||||
use super::FilterPlugin;
|
|
||||||
|
|
||||||
/// Helper trait for common filter operations
|
|
||||||
pub trait FilterUtils {
|
|
||||||
/// Process data through a filter, handling empty results
|
|
||||||
fn process_data(&mut self, data: &[u8]) -> Result<Vec<u8>>;
|
|
||||||
|
|
||||||
/// Process data and check if we should continue processing
|
|
||||||
fn process_and_check_continue(&mut self, data: &[u8]) -> Result<(Vec<u8>, bool)>;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: FilterPlugin> FilterUtils for T {
|
|
||||||
fn process_data(&mut self, data: &[u8]) -> Result<Vec<u8>> {
|
|
||||||
let result = self.process(data)?;
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn process_and_check_continue(&mut self, data: &[u8]) -> Result<(Vec<u8>, bool)> {
|
|
||||||
let result = self.process(data)?;
|
|
||||||
let should_continue = !result.is_empty();
|
|
||||||
Ok((result, should_continue))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Helper function to create a filter chain from a string
|
/// Helper function to create a filter chain from a string
|
||||||
pub fn create_filter_chain(filter_str: &str) -> Result<Option<super::FilterChain>> {
|
pub fn create_filter_chain(filter_str: &str) -> Result<Option<super::FilterChain>> {
|
||||||
|
|||||||
Reference in New Issue
Block a user