use crate::common::PIPESIZE; use crate::common::is_binary::is_binary; use crate::meta_plugin::{MetaPlugin, MetaPluginResponse, MetaPluginType}; #[derive(Debug, Clone)] pub struct TextMetaPlugin { buffer: Option>, max_buffer_size: usize, is_finalized: bool, word_count: usize, line_count: usize, is_binary_content: Option, // State for tracking word boundaries across chunks in_word: bool, // Buffer for handling UTF-8 character boundaries utf8_buffer: Vec, base: crate::meta_plugin::BaseMetaPlugin, // Options to track specific statistics track_word_count: bool, track_line_count: bool, track_line_lengths: bool, // Flags for which line length statistics to output output_line_max_len: bool, output_line_mean_len: bool, output_line_median_len: bool, // For tracking line lengths line_lengths: Option>, current_line_length: usize, // For incremental calculation of max and mean max_line_length: usize, total_line_length: usize, line_count_for_stats: usize, } impl TextMetaPlugin { pub fn new( options: Option>, outputs: Option>, ) -> TextMetaPlugin { let mut base = crate::meta_plugin::BaseMetaPlugin::new(); // Initialize with helper function base.initialize_plugin( &[ "text", "text_word_count", "text_line_count", "text_line_max_len", "text_line_mean_len", "text_line_median_len", ], &options, &outputs, ); // Set disabled outputs to null based on options let outputs_to_disable = vec![ ("text_word_count", "text_word_count"), ("text_line_count", "text_line_count"), ("text_line_max_len", "text_line_max_len"), ("text_line_mean_len", "text_line_mean_len"), ("text_line_median_len", "text_line_median_len"), ]; for (option_name, output_name) in outputs_to_disable { if let Some(value) = base.options.get(option_name) { // Handle both boolean false and string "false" let should_disable = match value { serde_yaml::Value::Bool(b) => !b, serde_yaml::Value::String(s) => s == "false", _ => false, }; if should_disable { base.outputs .insert(output_name.to_string(), serde_yaml::Value::Null); } } } // Set default options if not provided let default_options = vec![ ( "text_detect_size", serde_yaml::Value::Number(PIPESIZE.into()), ), ("text_word_count", serde_yaml::Value::Bool(true)), ("text_line_count", serde_yaml::Value::Bool(true)), ("text_line_max_len", serde_yaml::Value::Bool(true)), ("text_line_mean_len", serde_yaml::Value::Bool(true)), ("text_line_median_len", serde_yaml::Value::Bool(false)), ]; for (key, value) in default_options { if !base.options.contains_key(key) { base.options.insert(key.to_string(), value); } } // Get text_detect_size (previously max_buffer_size) let max_buffer_size = base .options .get("text_detect_size") .or_else(|| base.options.get("max_buffer_size")) // Handle backward compatibility .and_then(|v| v.as_u64()) .unwrap_or(PIPESIZE as u64) as usize; // Get which statistics to track let track_word_count = base .options .get("text_word_count") .and_then(|v| v.as_bool()) .unwrap_or(true); let track_line_count = base .options .get("text_line_count") .and_then(|v| v.as_bool()) .unwrap_or(true); let track_line_max_len = base .options .get("text_line_max_len") .and_then(|v| v.as_bool()) .unwrap_or(true); let track_line_mean_len = base .options .get("text_line_mean_len") .and_then(|v| v.as_bool()) .unwrap_or(true); let track_line_median_len = base .options .get("text_line_median_len") .and_then(|v| v.as_bool()) .unwrap_or(false); // Track line lengths if any of the line length options are enabled let track_line_lengths = track_line_max_len || track_line_mean_len || track_line_median_len; TextMetaPlugin { buffer: Some(Vec::new()), max_buffer_size, is_finalized: false, word_count: 0, line_count: 0, is_binary_content: None, in_word: false, utf8_buffer: Vec::new(), base, // Add fields for line length tracking track_word_count, track_line_count, track_line_lengths, // Set output flags output_line_max_len: track_line_max_len, output_line_mean_len: track_line_mean_len, output_line_median_len: track_line_median_len, line_lengths: if track_line_lengths { Some(Vec::new()) } else { None }, current_line_length: 0, // Initialize incremental tracking for max and mean max_line_length: 0, total_line_length: 0, line_count_for_stats: 0, } } /// Count words and lines in a text chunk, handling block boundaries correctly. /// /// Processes UTF-8 data, tracks word transitions, and updates line length statistics. /// /// # Arguments /// /// * `data` - Byte slice of text content. fn count_text_stats(&mut self, data: &[u8]) { // Count lines (newlines) if needed if self.track_line_count { self.line_count += data.iter().filter(|&&b| b == b'\n').count(); } // Handle UTF-8 character boundaries by combining with any buffered bytes let combined_data = if !self.utf8_buffer.is_empty() { let mut combined = self.utf8_buffer.clone(); combined.extend_from_slice(data); combined } else { data.to_vec() }; // Clear the UTF-8 buffer self.utf8_buffer.clear(); // Convert to string, handling potential UTF-8 boundaries let text = match std::str::from_utf8(&combined_data) { Ok(text) => text, Err(e) => { // If we have incomplete UTF-8 at the end, buffer those bytes for next chunk let valid_up_to = e.valid_up_to(); if valid_up_to < combined_data.len() { self.utf8_buffer .extend_from_slice(&combined_data[valid_up_to..]); } match std::str::from_utf8(&combined_data[..valid_up_to]) { Ok(text) => text, Err(_) => return, // Can't process this data } } }; // Count words if needed if self.track_word_count { for ch in text.chars() { let is_whitespace = ch.is_whitespace(); if !self.in_word && !is_whitespace { // Transition from whitespace to word - start of new word self.word_count += 1; self.in_word = true; } else if self.in_word && is_whitespace { // Transition from word to whitespace - end of current word self.in_word = false; } } } // Track line lengths if needed if self.track_line_lengths { for ch in text.chars() { if ch == '\n' { // Update max line length if self.current_line_length > self.max_line_length { self.max_line_length = self.current_line_length; } // Update total for mean calculation self.total_line_length += self.current_line_length; self.line_count_for_stats += 1; // Only store individual lengths if median is needed if let Some(ref mut lengths) = self.line_lengths { lengths.push(self.current_line_length); } self.current_line_length = 0; } else { self.current_line_length += 1; } } } } /// Helper method to perform binary detection and return appropriate metadata. /// /// Uses the is_binary function to check the buffer and sets text-related outputs accordingly. /// /// # Arguments /// /// * `buffer` - Data to check for binary content. /// /// # Returns /// /// * `(Vec, bool)` - Metadata updates and whether content is binary. fn perform_binary_detection( &mut self, buffer: &[u8], ) -> (Vec, bool) { let mut metadata = Vec::new(); let is_binary_result = is_binary(buffer); self.is_binary_content = Some(is_binary_result); // Output text status let text_value = if is_binary_result { "false".to_string() } else { "true".to_string() }; // Use process_metadata_outputs to handle output mapping if let Some(meta_data) = crate::meta_plugin::process_metadata_outputs( "text", serde_yaml::Value::String(text_value), self.base.outputs(), ) { metadata.push(meta_data); } // If content is binary, set all text-related outputs to None if is_binary_result { let text_outputs = vec![ "text_word_count", "text_line_count", "text_line_max_len", "text_line_mean_len", "text_line_median_len", ]; for output_name in text_outputs { if let Some(meta_data) = crate::meta_plugin::process_metadata_outputs( output_name, serde_yaml::Value::Null, self.base.outputs(), ) { metadata.push(meta_data); } } } (metadata, is_binary_result) } /// Helper method to process the remaining UTF-8 buffer and finalize text statistics. /// /// Calls count_text_stats with empty data to handle any pending UTF-8 bytes. fn process_remaining_utf8_buffer(&mut self) { if !self.utf8_buffer.is_empty() { self.count_text_stats(&[]); } } /// Helper method to handle the last line when tracking line lengths. /// /// Updates statistics for any unfinished line at EOF. fn handle_last_line_for_length_tracking(&mut self) { if self.track_line_lengths && self.current_line_length > 0 { // Update max line length for the last line if self.current_line_length > self.max_line_length { self.max_line_length = self.current_line_length; } // Update total for mean calculation for the last line self.total_line_length += self.current_line_length; self.line_count_for_stats += 1; // Only store individual lengths if median is needed if let Some(ref mut lengths) = self.line_lengths { lengths.push(self.current_line_length); } } } /// Helper method to output word count metadata. /// /// # Returns /// /// * `Option` - Metadata entry if tracking is enabled. fn output_word_count_metadata(&self) -> Option { if self.track_word_count { crate::meta_plugin::process_metadata_outputs( "text_word_count", serde_yaml::Value::String(self.word_count.to_string()), self.base.outputs(), ) } else { None } } /// Helper method to output line count metadata. /// /// # Returns /// /// * `Option` - Metadata entry if tracking is enabled. fn output_line_count_metadata(&self) -> Option { if self.track_line_count { crate::meta_plugin::process_metadata_outputs( "text_line_count", serde_yaml::Value::String(self.line_count.to_string()), self.base.outputs(), ) } else { None } } /// Helper method to output max line length metadata. /// /// # Returns /// /// * `Option` - Metadata entry if enabled and data exists. fn output_max_line_length_metadata(&self) -> Option { if self.output_line_max_len && self.line_count_for_stats > 0 { crate::meta_plugin::process_metadata_outputs( "text_line_max_len", serde_yaml::Value::String(self.max_line_length.to_string()), self.base.outputs(), ) } else { None } } /// Helper method to output mean line length metadata. /// /// Computes average line length and rounds to nearest integer. /// /// # Returns /// /// * `Option` - Metadata entry if enabled and data exists. fn output_mean_line_length_metadata(&self) -> Option { if self.output_line_mean_len && self.line_count_for_stats > 0 { let mean_len = self.total_line_length as f64 / self.line_count_for_stats as f64; // Round to nearest integer let mean_len_int = mean_len.round() as usize; crate::meta_plugin::process_metadata_outputs( "text_line_mean_len", serde_yaml::Value::String(mean_len_int.to_string()), self.base.outputs(), ) } else { None } } /// Helper method to output median line length metadata. /// /// Sorts line lengths and computes median (average of middle two for even count). /// /// # Returns /// /// * `Option` - Metadata entry if enabled and data exists. fn output_median_line_length_metadata(&self) -> Option { if self.output_line_median_len && let Some(lengths) = &self.line_lengths && !lengths.is_empty() { let mut sorted_lengths = lengths.clone(); sorted_lengths.sort(); let median_len = if lengths.len() % 2 == 0 { (sorted_lengths[lengths.len() / 2 - 1] + sorted_lengths[lengths.len() / 2]) as f64 / 2.0 } else { sorted_lengths[lengths.len() / 2] as f64 }; return crate::meta_plugin::process_metadata_outputs( "text_line_median_len", serde_yaml::Value::String(median_len.to_string()), self.base.outputs(), ); } None } /// Helper method to output word and line counts. /// /// Finalizes pending data and collects all enabled text statistics metadata. /// /// # Returns /// /// * `Vec` - List of metadata entries. fn output_word_line_counts(&mut self) -> Vec { // Process any remaining data in utf8_buffer self.process_remaining_utf8_buffer(); // Handle the last line if tracking line lengths self.handle_last_line_for_length_tracking(); // Collect all metadata outputs let mut metadata = Vec::new(); // Add metadata outputs using a more concise approach let outputs_to_check = vec![ (self.output_word_count_metadata(), "word count"), (self.output_line_count_metadata(), "line count"), ]; for (output, _) in outputs_to_check { if let Some(meta_data) = output { metadata.push(meta_data); } } // Output line length statistics if tracked if self.track_line_lengths && self.line_count_for_stats > 0 { let line_stats_outputs = vec![ (self.output_max_line_length_metadata(), "max line length"), (self.output_mean_line_length_metadata(), "mean line length"), ( self.output_median_line_length_metadata(), "median line length", ), ]; for (output, _) in line_stats_outputs { if let Some(meta_data) = output { metadata.push(meta_data); } } } metadata } } impl MetaPlugin for TextMetaPlugin { /// Checks if the plugin has been finalized. /// /// # Returns /// /// `true` if finalized, `false` otherwise. fn is_finalized(&self) -> bool { self.is_finalized } /// Sets the finalized state of the plugin. /// /// # Arguments /// /// * `finalized` - The new finalized state. fn set_finalized(&mut self, finalized: bool) { self.is_finalized = finalized; } /// Updates the plugin with new data chunk. /// /// Accumulates data for binary detection (if pending) or text statistics. /// Finalizes early if binary content is detected. /// /// # Arguments /// /// * `data` - Byte slice of content chunk. /// /// # Returns /// /// * `MetaPluginResponse` - Current metadata and finalized status. fn update(&mut self, data: &[u8]) -> MetaPluginResponse { // If already finalized, don't process more data if self.is_finalized { return MetaPluginResponse { metadata: Vec::new(), is_finalized: true, }; } let mut metadata = Vec::new(); // If we haven't determined if content is binary yet, build buffer and check if self.is_binary_content.is_none() { let should_finalize = if let Some(ref mut buffer) = self.buffer { // Add data to our buffer up to max_buffer_size let remaining_capacity = self.max_buffer_size.saturating_sub(buffer.len()); let bytes_to_take = std::cmp::min(data.len(), remaining_capacity); buffer.extend_from_slice(&data[..bytes_to_take]); // If we have enough data to make a binary determination, do it now let buffer_len = buffer.len(); if buffer_len >= std::cmp::min(1024, self.max_buffer_size) { // Clone the buffer data for binary detection to avoid borrowing conflicts let buffer_clone = buffer.clone(); let (binary_metadata, is_binary) = self.perform_binary_detection(&buffer_clone); metadata.extend(binary_metadata); self.is_binary_content = Some(is_binary); // If it's binary, we're done with this plugin if is_binary { self.buffer = None; // Drop the buffer self.is_finalized = true; return MetaPluginResponse { metadata, is_finalized: true, }; } // If it's text, count words and lines for this chunk self.count_text_stats(&data[..bytes_to_take]); // If we've reached our buffer limit, drop the buffer to save memory // But don't finalize yet - we need to keep counting words and lines if buffer_len >= self.max_buffer_size { self.buffer = None; // Drop the buffer } false // Never finalize here for text content } else { // Still building up buffer, count words and lines for this chunk self.count_text_stats(&data[..bytes_to_take]); false } } else { false }; if should_finalize { return MetaPluginResponse { metadata, is_finalized: true, }; } } else if self.is_binary_content == Some(false) { // We've already determined it's text, just count words and lines self.count_text_stats(data); } // If is_binary_content == Some(true), we should have already finalized, but just in case: else if self.is_binary_content == Some(true) { self.is_finalized = true; return MetaPluginResponse { metadata: Vec::new(), is_finalized: true, }; } MetaPluginResponse { metadata, is_finalized: self.is_finalized, } } /// Finalizes the plugin and emits all pending text statistics. /// /// Performs binary detection if not done, then outputs enabled statistics. /// Handles head/tail options for content preview (future implementation). /// /// # Returns /// /// * `MetaPluginResponse` - Final metadata and finalized status. fn finalize(&mut self) -> MetaPluginResponse { // If already finalized, don't process again if self.is_finalized { return MetaPluginResponse { metadata: Vec::new(), is_finalized: true, }; } let mut metadata = Vec::new(); // Check if we have head/tail options let head_bytes = self .base .options .get("head_bytes") .and_then(|v| v.as_u64()) .map(|v| v as usize); let head_lines = self .base .options .get("head_lines") .and_then(|v| v.as_u64()) .map(|v| v as usize); let tail_bytes = self .base .options .get("tail_bytes") .and_then(|v| v.as_u64()) .map(|v| v as usize); let tail_lines = self .base .options .get("tail_lines") .and_then(|v| v.as_u64()) .map(|v| v as usize); // If we haven't determined binary status yet, do it now with whatever we have if self.is_binary_content.is_none() && let Some(buffer) = &self.buffer && !buffer.is_empty() { let buffer = if head_bytes.is_some() || head_lines.is_some() || tail_bytes.is_some() || tail_lines.is_some() { // Build filter string from individual parameters let mut filter_parts = Vec::new(); if let Some(bytes) = head_bytes { filter_parts.push(format!("head_bytes({bytes})")); } if let Some(lines) = head_lines { filter_parts.push(format!("head_lines({lines})")); } if let Some(bytes) = tail_bytes { filter_parts.push(format!("tail_bytes({bytes})")); } if let Some(lines) = tail_lines { filter_parts.push(format!("tail_lines({lines})")); } // Apply filters if any are specified let filter_string = filter_parts.join(","); match crate::services::FilterService::new() .process_with_filter(buffer, Some(&filter_string)) { Ok(filtered) => filtered, Err(e) => { log::warn!("Failed to apply filters: {e}"); buffer.clone() } } } else { buffer.clone() }; // Clone the processed buffer data for binary detection let (binary_metadata, is_binary) = self.perform_binary_detection(&buffer); metadata.extend(binary_metadata); self.is_binary_content = Some(is_binary); // If it's binary, we're done if is_binary { self.buffer = None; // Drop the buffer self.is_finalized = true; // Set all text-related outputs to None since content is binary // Only include outputs that are enabled in the configuration let text_outputs = vec![ ("text_word_count", self.track_word_count), ("text_line_count", self.track_line_count), ("text_line_max_len", self.output_line_max_len), ("text_line_mean_len", self.output_line_mean_len), ("text_line_median_len", self.output_line_median_len), ]; for (output_name, is_enabled) in text_outputs { if is_enabled && let Some(meta_data) = crate::meta_plugin::process_metadata_outputs( output_name, serde_yaml::Value::Null, self.base.outputs(), ) { metadata.push(meta_data); } } return MetaPluginResponse { metadata, is_finalized: true, }; } } // If content is text, output word and line counts if self.is_binary_content == Some(false) { let word_line_metadata = self.output_word_line_counts(); metadata.extend(word_line_metadata); } // Only include outputs that are enabled in the configuration // Disabled outputs should not be emitted at all (not even as null) // So we don't need to add anything for disabled outputs // Drop the buffer since we're done with it self.buffer = None; // Mark as finalized self.is_finalized = true; MetaPluginResponse { metadata, is_finalized: true, } } /// Returns the type of this meta plugin. /// /// # Returns /// /// `MetaPluginType::Text`. fn meta_type(&self) -> MetaPluginType { MetaPluginType::Text } /// Returns a reference to the outputs mapping. /// /// # Returns /// /// A reference to the `HashMap` of outputs. fn outputs(&self) -> &std::collections::HashMap { self.base.outputs() } /// Returns a mutable reference to the outputs mapping. /// /// # Returns /// /// A mutable reference to the `HashMap` of outputs. fn outputs_mut( &mut self, ) -> anyhow::Result<&mut std::collections::HashMap> { Ok(self.base.outputs_mut()) } /// Returns the default output names for this plugin. /// /// # Returns /// /// Vector of default output field names. fn default_outputs(&self) -> Vec { vec![ "text".to_string(), "text_word_count".to_string(), "text_line_count".to_string(), "text_line_max_len".to_string(), "text_line_mean_len".to_string(), "text_line_median_len".to_string(), ] } /// Returns a reference to the options mapping. /// /// # Returns /// /// A reference to the `HashMap` of outputs. fn options(&self) -> &std::collections::HashMap { self.base.options() } /// Returns a mutable reference to the options mapping. /// /// # Returns /// /// A mutable reference to the `HashMap` of outputs. fn options_mut( &mut self, ) -> anyhow::Result<&mut std::collections::HashMap> { Ok(self.base.options_mut()) } } use crate::meta_plugin::register_meta_plugin; // Register the plugin at module initialization time #[ctor::ctor] fn register_text_plugin() { register_meta_plugin(MetaPluginType::Text, |options, outputs| { Box::new(TextMetaPlugin::new(options, outputs)) }); }