use crate::common::is_binary::is_binary; use crate::common::PIPESIZE; use crate::meta_plugin::{MetaPlugin, MetaPluginResponse}; #[derive(Debug, Clone, Default)] pub struct TextMetaPlugin { buffer: Option>, max_buffer_size: usize, is_finalized: bool, word_count: usize, line_count: usize, is_binary_content: Option, // State for tracking word boundaries across chunks in_word: bool, // Buffer for handling UTF-8 character boundaries utf8_buffer: Vec, base: crate::meta_plugin::BaseMetaPlugin, } impl TextMetaPlugin { pub fn new( options: Option>, outputs: Option>, ) -> TextMetaPlugin { let mut base = crate::meta_plugin::BaseMetaPlugin::new(); base.meta_name = "text".to_string(); // Initialize with helper function base.initialize_plugin( &["text", "binary", "text_word_count", "text_line_count"], options, outputs, ); let max_buffer_size = base.options.get("max_buffer_size") .and_then(|v| v.as_u64()) .unwrap_or(PIPESIZE as u64) as usize; TextMetaPlugin { buffer: Some(Vec::new()), max_buffer_size, is_finalized: false, word_count: 0, line_count: 0, is_binary_content: None, in_word: false, utf8_buffer: Vec::new(), base, } } pub fn new_simple() -> TextMetaPlugin { Self::new(None, None) } /// Count words and lines in a text chunk, handling block boundaries correctly fn count_text_stats(&mut self, data: &[u8]) { // Count lines (newlines) self.line_count += data.iter().filter(|&&b| b == b'\n').count(); // Handle UTF-8 character boundaries by combining with any buffered bytes let combined_data = if !self.utf8_buffer.is_empty() { let mut combined = self.utf8_buffer.clone(); combined.extend_from_slice(data); combined } else { data.to_vec() }; // Clear the UTF-8 buffer self.utf8_buffer.clear(); // Convert to string, handling potential UTF-8 boundaries let text = match std::str::from_utf8(&combined_data) { Ok(text) => text, Err(e) => { // If we have incomplete UTF-8 at the end, buffer those bytes for next chunk let valid_up_to = e.valid_up_to(); if valid_up_to < combined_data.len() { self.utf8_buffer.extend_from_slice(&combined_data[valid_up_to..]); } match std::str::from_utf8(&combined_data[..valid_up_to]) { Ok(text) => text, Err(_) => return, // Can't process this data } } }; // Count words using wc-like algorithm that tracks state across chunks for ch in text.chars() { let is_whitespace = ch.is_whitespace(); if !self.in_word && !is_whitespace { // Transition from whitespace to word - start of new word self.word_count += 1; self.in_word = true; } else if self.in_word && is_whitespace { // Transition from word to whitespace - end of current word self.in_word = false; } } } /// Helper method to perform binary detection and return appropriate metadata /// Returns (metadata, should_finalize) tuple fn perform_binary_detection(&mut self, buffer: &[u8]) -> (Vec, bool) { let mut metadata = Vec::new(); let is_binary_result = is_binary(buffer); self.is_binary_content = Some(is_binary_result); // Output text and binary status let text_value = if is_binary_result { "false".to_string() } else { "true".to_string() }; let binary_value = if is_binary_result { "true".to_string() } else { "false".to_string() }; // Use process_metadata_outputs to handle output mapping if let Some(meta_data) = crate::meta_plugin::process_metadata_outputs( "text", text_value, self.base.outputs() ) { metadata.push(meta_data); } if let Some(meta_data) = crate::meta_plugin::process_metadata_outputs( "binary", binary_value, self.base.outputs() ) { metadata.push(meta_data); } (metadata, is_binary_result) } /// Helper method to output word and line counts fn output_word_line_counts(&mut self) -> Vec { let mut metadata = Vec::new(); // Process any remaining data in utf8_buffer if !self.utf8_buffer.is_empty() { self.count_text_stats(&[]); } // Output word and line counts if let Some(meta_data) = crate::meta_plugin::process_metadata_outputs( "text_word_count", self.word_count.to_string(), self.base.outputs() ) { metadata.push(meta_data); } if let Some(meta_data) = crate::meta_plugin::process_metadata_outputs( "text_line_count", self.line_count.to_string(), self.base.outputs() ) { metadata.push(meta_data); } metadata } } impl MetaPlugin for TextMetaPlugin { fn is_finalized(&self) -> bool { self.is_finalized } fn set_finalized(&mut self, finalized: bool) { self.is_finalized = finalized; } fn update(&mut self, data: &[u8]) -> MetaPluginResponse { // If already finalized, don't process more data if self.is_finalized { return MetaPluginResponse { metadata: Vec::new(), is_finalized: true, }; } let mut metadata = Vec::new(); // If we haven't determined if content is binary yet, build buffer and check if self.is_binary_content.is_none() { if let Some(buffer) = &mut self.buffer { // Add data to our buffer up to max_buffer_size let remaining_capacity = self.max_buffer_size.saturating_sub(buffer.len()); let bytes_to_take = std::cmp::min(data.len(), remaining_capacity); buffer.extend_from_slice(&data[..bytes_to_take]); // If we have enough data to make a binary determination, do it now if buffer.len() >= std::cmp::min(1024, self.max_buffer_size) { let (binary_metadata, is_binary) = self.perform_binary_detection(buffer); metadata.extend(binary_metadata); // If it's binary, we're done with this plugin if is_binary { self.buffer = None; // Drop the buffer self.is_finalized = true; return MetaPluginResponse { metadata, is_finalized: true, }; } // If it's text, count words and lines for this chunk and stop buffering self.count_text_stats(&data[..bytes_to_take]); // If we've reached our buffer limit, drop the buffer and finalize if buffer.len() >= self.max_buffer_size { self.buffer = None; // Drop the buffer self.is_finalized = true; } } else { // Still building up buffer, count words and lines for this chunk self.count_text_stats(&data[..bytes_to_take]); } } } else if self.is_binary_content == Some(false) { // We've already determined it's text, just count words and lines self.count_text_stats(data); } // If is_binary_content == Some(true), we should have already finalized, but just in case: else if self.is_binary_content == Some(true) { self.is_finalized = true; return MetaPluginResponse { metadata: Vec::new(), is_finalized: true, }; } MetaPluginResponse { metadata, is_finalized: self.is_finalized, } } fn finalize(&mut self) -> MetaPluginResponse { // If already finalized, don't process again if self.is_finalized { return MetaPluginResponse { metadata: Vec::new(), is_finalized: true, }; } let mut metadata = Vec::new(); // If we haven't determined binary status yet, do it now with whatever we have if self.is_binary_content.is_none() { if let Some(buffer) = &self.buffer { if !buffer.is_empty() { let (binary_metadata, is_binary) = self.perform_binary_detection(buffer); metadata.extend(binary_metadata); // If it's binary, we're done if is_binary { self.buffer = None; // Drop the buffer self.is_finalized = true; return MetaPluginResponse { metadata, is_finalized: true, }; } } } } // If content is text, output word and line counts if self.is_binary_content == Some(false) { let word_line_metadata = self.output_word_line_counts(); metadata.extend(word_line_metadata); } // Drop the buffer since we're done with it self.buffer = None; // Mark as finalized self.is_finalized = true; MetaPluginResponse { metadata, is_finalized: true, } } fn meta_name(&self) -> String { self.base.meta_name.clone() } fn outputs(&self) -> &std::collections::HashMap { self.base.outputs() } fn outputs_mut(&mut self) -> &mut std::collections::HashMap { self.base.outputs_mut() } fn default_outputs(&self) -> Vec { vec!["text".to_string(), "binary".to_string(), "text_word_count".to_string(), "text_line_count".to_string()] } fn options(&self) -> &std::collections::HashMap { self.base.options() } fn options_mut(&mut self) -> &mut std::collections::HashMap { self.base.options_mut() } fn configure_options(&mut self, options: &std::collections::HashMap) -> anyhow::Result<()> { if let Some(max_buffer_size) = options.get("max_buffer_size") { if let Some(size) = max_buffer_size.as_u64() { self.max_buffer_size = size as usize; } } Ok(()) } }