use crate::common::is_binary::is_binary; use crate::common::PIPESIZE; use crate::meta_plugin::{MetaPlugin, MetaPluginResponse}; #[derive(Debug, Clone)] pub struct TextMetaPlugin { buffer: Option>, max_buffer_size: usize, is_finalized: bool, word_count: usize, line_count: usize, is_binary_content: Option, // State for tracking word boundaries across chunks in_word: bool, // Buffer for handling UTF-8 character boundaries utf8_buffer: Vec, base: crate::meta_plugin::BaseMetaPlugin, // Options to track specific statistics track_word_count: bool, track_line_count: bool, track_line_lengths: bool, // For tracking line lengths line_lengths: Option>, current_line_length: usize, } impl TextMetaPlugin { pub fn new( options: Option>, outputs: Option>, ) -> TextMetaPlugin { let mut base = crate::meta_plugin::BaseMetaPlugin::new(); base.meta_name = "text".to_string(); // Initialize with helper function base.initialize_plugin( &["text", "binary", "text_word_count", "text_line_count", "text_line_max_len", "text_line_mean_len", "text_line_median_len"], options, outputs, ); log::debug!("TEXT: Plugin initialized with outputs: {:?}", base.outputs); // Get text_detect_size (previously max_buffer_size) let max_buffer_size = base.options.get("text_detect_size") .or_else(|| base.options.get("max_buffer_size")) // Handle backward compatibility .and_then(|v| v.as_u64()) .unwrap_or(PIPESIZE as u64) as usize; // Get which statistics to track let track_word_count = base.options.get("text_word_count") .and_then(|v| v.as_bool()) .unwrap_or(true); let track_line_count = base.options.get("text_line_count") .and_then(|v| v.as_bool()) .unwrap_or(true); let track_line_max_len = base.options.get("text_line_max_len") .and_then(|v| v.as_bool()) .unwrap_or(true); let track_line_mean_len = base.options.get("text_line_mean_len") .and_then(|v| v.as_bool()) .unwrap_or(true); let track_line_median_len = base.options.get("text_line_median_len") .and_then(|v| v.as_bool()) .unwrap_or(true); // Track line lengths if any of the line length options are enabled let track_line_lengths = track_line_max_len || track_line_mean_len || track_line_median_len; TextMetaPlugin { buffer: Some(Vec::new()), max_buffer_size, is_finalized: false, word_count: 0, line_count: 0, is_binary_content: None, in_word: false, utf8_buffer: Vec::new(), base, // Add fields for line length tracking track_word_count, track_line_count, track_line_lengths, line_lengths: if track_line_lengths { Some(Vec::new()) } else { None }, current_line_length: 0, } } pub fn new_simple() -> TextMetaPlugin { Self::new(None, None) } /// Count words and lines in a text chunk, handling block boundaries correctly fn count_text_stats(&mut self, data: &[u8]) { // Count lines (newlines) if needed if self.track_line_count { self.line_count += data.iter().filter(|&&b| b == b'\n').count(); } // Handle UTF-8 character boundaries by combining with any buffered bytes let combined_data = if !self.utf8_buffer.is_empty() { let mut combined = self.utf8_buffer.clone(); combined.extend_from_slice(data); combined } else { data.to_vec() }; // Clear the UTF-8 buffer self.utf8_buffer.clear(); // Convert to string, handling potential UTF-8 boundaries let text = match std::str::from_utf8(&combined_data) { Ok(text) => text, Err(e) => { // If we have incomplete UTF-8 at the end, buffer those bytes for next chunk let valid_up_to = e.valid_up_to(); if valid_up_to < combined_data.len() { self.utf8_buffer.extend_from_slice(&combined_data[valid_up_to..]); } match std::str::from_utf8(&combined_data[..valid_up_to]) { Ok(text) => text, Err(_) => return, // Can't process this data } } }; // Count words if needed if self.track_word_count { for ch in text.chars() { let is_whitespace = ch.is_whitespace(); if !self.in_word && !is_whitespace { // Transition from whitespace to word - start of new word self.word_count += 1; self.in_word = true; } else if self.in_word && is_whitespace { // Transition from word to whitespace - end of current word self.in_word = false; } } } // Track line lengths if needed if self.track_line_lengths { for ch in text.chars() { if ch == '\n' { if let Some(ref mut lengths) = self.line_lengths { lengths.push(self.current_line_length); } self.current_line_length = 0; } else { self.current_line_length += 1; } } } } /// Helper method to perform binary detection and return appropriate metadata /// Returns (metadata, should_finalize) tuple fn perform_binary_detection(&mut self, buffer: &[u8]) -> (Vec, bool) { let mut metadata = Vec::new(); let is_binary_result = is_binary(buffer); self.is_binary_content = Some(is_binary_result); // Output text and binary status let text_value = if is_binary_result { "false".to_string() } else { "true".to_string() }; let binary_value = if is_binary_result { "true".to_string() } else { "false".to_string() }; // Use process_metadata_outputs to handle output mapping if let Some(meta_data) = crate::meta_plugin::process_metadata_outputs( "text", text_value, self.base.outputs() ) { metadata.push(meta_data); } if let Some(meta_data) = crate::meta_plugin::process_metadata_outputs( "binary", binary_value, self.base.outputs() ) { metadata.push(meta_data); } (metadata, is_binary_result) } /// Helper method to output word and line counts fn output_word_line_counts(&mut self) -> Vec { let mut metadata = Vec::new(); // Process any remaining data in utf8_buffer if !self.utf8_buffer.is_empty() { self.count_text_stats(&[]); } // Handle the last line if tracking line lengths if self.track_line_lengths && self.current_line_length > 0 { if let Some(ref mut lengths) = self.line_lengths { lengths.push(self.current_line_length); } } // Debug: check if outputs are configured log::debug!("TEXT: Outputs: {:?}", self.base.outputs()); log::debug!("TEXT: Word count: {}, Line count: {}", self.word_count, self.line_count); // Output word count if tracked if self.track_word_count { if let Some(meta_data) = crate::meta_plugin::process_metadata_outputs( "text_word_count", self.word_count.to_string(), self.base.outputs() ) { log::debug!("TEXT: Adding word count metadata: {:?}", meta_data); metadata.push(meta_data); } else { log::debug!("TEXT: Word count output is disabled or not mapped"); } } // Output line count if tracked if self.track_line_count { if let Some(meta_data) = crate::meta_plugin::process_metadata_outputs( "text_line_count", self.line_count.to_string(), self.base.outputs() ) { log::debug!("TEXT: Adding line count metadata: {:?}", meta_data); metadata.push(meta_data); } else { log::debug!("TEXT: Line count output is disabled or not mapped"); } } // Output line length statistics if tracked if self.track_line_lengths { if let Some(lengths) = &self.line_lengths { if !lengths.is_empty() { // Calculate max, mean, median let max_len = lengths.iter().max().unwrap(); let sum: usize = lengths.iter().sum(); let mean_len = sum as f64 / lengths.len() as f64; let mut sorted_lengths = lengths.clone(); sorted_lengths.sort(); let median_len = if lengths.len() % 2 == 0 { (sorted_lengths[lengths.len() / 2 - 1] + sorted_lengths[lengths.len() / 2]) as f64 / 2.0 } else { sorted_lengths[lengths.len() / 2] as f64 }; // Add each statistic if its corresponding option is enabled if let Some(meta_data) = crate::meta_plugin::process_metadata_outputs( "text_line_max_len", max_len.to_string(), self.base.outputs() ) { metadata.push(meta_data); } if let Some(meta_data) = crate::meta_plugin::process_metadata_outputs( "text_line_mean_len", mean_len.to_string(), self.base.outputs() ) { metadata.push(meta_data); } if let Some(meta_data) = crate::meta_plugin::process_metadata_outputs( "text_line_median_len", median_len.to_string(), self.base.outputs() ) { metadata.push(meta_data); } } } } metadata } } impl MetaPlugin for TextMetaPlugin { fn is_finalized(&self) -> bool { self.is_finalized } fn set_finalized(&mut self, finalized: bool) { self.is_finalized = finalized; } fn update(&mut self, data: &[u8]) -> MetaPluginResponse { // If already finalized, don't process more data if self.is_finalized { return MetaPluginResponse { metadata: Vec::new(), is_finalized: true, }; } let mut metadata = Vec::new(); // If we haven't determined if content is binary yet, build buffer and check if self.is_binary_content.is_none() { let should_finalize = if let Some(ref mut buffer) = self.buffer { // Add data to our buffer up to max_buffer_size let remaining_capacity = self.max_buffer_size.saturating_sub(buffer.len()); let bytes_to_take = std::cmp::min(data.len(), remaining_capacity); buffer.extend_from_slice(&data[..bytes_to_take]); // If we have enough data to make a binary determination, do it now let buffer_len = buffer.len(); if buffer_len >= std::cmp::min(1024, self.max_buffer_size) { // Clone the buffer data for binary detection to avoid borrowing conflicts let buffer_clone = buffer.clone(); let (binary_metadata, is_binary) = self.perform_binary_detection(&buffer_clone); metadata.extend(binary_metadata); self.is_binary_content = Some(is_binary); // If it's binary, we're done with this plugin if is_binary { self.buffer = None; // Drop the buffer self.is_finalized = true; return MetaPluginResponse { metadata, is_finalized: true, }; } // If it's text, count words and lines for this chunk self.count_text_stats(&data[..bytes_to_take]); // If we've reached our buffer limit, drop the buffer to save memory // But don't finalize yet - we need to keep counting words and lines if buffer_len >= self.max_buffer_size { log::debug!("TEXT: Reached max buffer size, dropping buffer but not finalizing"); self.buffer = None; // Drop the buffer } false // Never finalize here for text content } else { // Still building up buffer, count words and lines for this chunk self.count_text_stats(&data[..bytes_to_take]); false } } else { false }; if should_finalize { return MetaPluginResponse { metadata, is_finalized: true, }; } } else if self.is_binary_content == Some(false) { // We've already determined it's text, just count words and lines self.count_text_stats(data); } // If is_binary_content == Some(true), we should have already finalized, but just in case: else if self.is_binary_content == Some(true) { self.is_finalized = true; return MetaPluginResponse { metadata: Vec::new(), is_finalized: true, }; } MetaPluginResponse { metadata, is_finalized: self.is_finalized, } } fn finalize(&mut self) -> MetaPluginResponse { // If already finalized, don't process again if self.is_finalized { return MetaPluginResponse { metadata: Vec::new(), is_finalized: true, }; } let mut metadata = Vec::new(); // If we haven't determined binary status yet, do it now with whatever we have if self.is_binary_content.is_none() { if let Some(buffer) = &self.buffer { if !buffer.is_empty() { // Clone the buffer data for binary detection to avoid borrowing conflicts let buffer_clone = buffer.clone(); let (binary_metadata, is_binary) = self.perform_binary_detection(&buffer_clone); metadata.extend(binary_metadata); self.is_binary_content = Some(is_binary); // If it's binary, we're done if is_binary { self.buffer = None; // Drop the buffer self.is_finalized = true; return MetaPluginResponse { metadata, is_finalized: true, }; } } } } // If content is text, output word and line counts if self.is_binary_content == Some(false) { log::debug!("TEXT: Content is text, outputting word and line counts"); let word_line_metadata = self.output_word_line_counts(); log::debug!("TEXT: Word line metadata: {:?}", word_line_metadata); metadata.extend(word_line_metadata); } else { log::debug!("TEXT: Content is not text, not outputting word and line counts. is_binary_content: {:?}", self.is_binary_content); } // Drop the buffer since we're done with it self.buffer = None; // Mark as finalized self.is_finalized = true; log::debug!("TEXT: Finalizing with metadata: {:?}", metadata); MetaPluginResponse { metadata, is_finalized: true, } } fn meta_name(&self) -> String { self.base.meta_name.clone() } fn outputs(&self) -> &std::collections::HashMap { self.base.outputs() } fn outputs_mut(&mut self) -> &mut std::collections::HashMap { self.base.outputs_mut() } fn default_outputs(&self) -> Vec { vec![ "text".to_string(), "binary".to_string(), "text_word_count".to_string(), "text_line_count".to_string(), "text_line_max_len".to_string(), "text_line_mean_len".to_string(), "text_line_median_len".to_string() ] } fn options(&self) -> &std::collections::HashMap { self.base.options() } fn options_mut(&mut self) -> &mut std::collections::HashMap { self.base.options_mut() } fn configure_options(&mut self, options: &std::collections::HashMap) -> anyhow::Result<()> { if let Some(text_detect_size) = options.get("text_detect_size") { if let Some(size) = text_detect_size.as_u64() { self.max_buffer_size = size as usize; } } // Handle the old option name for backward compatibility else if let Some(max_buffer_size) = options.get("max_buffer_size") { if let Some(size) = max_buffer_size.as_u64() { self.max_buffer_size = size as usize; } } // Update tracking options if let Some(track) = options.get("text_word_count") { if let Some(track_bool) = track.as_bool() { self.track_word_count = track_bool; } } if let Some(track) = options.get("text_line_count") { if let Some(track_bool) = track.as_bool() { self.track_line_count = track_bool; } } if let Some(track) = options.get("text_line_max_len") { if let Some(track_bool) = track.as_bool() { if track_bool { self.track_line_lengths = true; if self.line_lengths.is_none() { self.line_lengths = Some(Vec::new()); } } } } // Similar for mean and median, but we'll just check if any are true to enable tracking // For simplicity, we'll enable tracking if any of the line length options are true let track_line_max = options.get("text_line_max_len").and_then(|v| v.as_bool()).unwrap_or(false); let track_line_mean = options.get("text_line_mean_len").and_then(|v| v.as_bool()).unwrap_or(false); let track_line_median = options.get("text_line_median_len").and_then(|v| v.as_bool()).unwrap_or(false); self.track_line_lengths = track_line_max || track_line_mean || track_line_median; if self.track_line_lengths && self.line_lengths.is_none() { self.line_lengths = Some(Vec::new()); } Ok(()) } }