use std::time::Instant; use crate::meta_plugin::{MetaPlugin, MetaPluginType, BaseMetaPlugin}; #[derive(Debug, Clone, Default)] /// Meta plugin that calculates the read rate (KB/s) of input data. /// /// Tracks bytes read and elapsed time, then computes the rate in finalize(). /// Outputs the rate via configured mappings. Supports options for customization /// (though defaults are used here). /// /// # Fields /// /// * `start_time` - Start time of reading, if begun. /// * `bytes_read` - Total bytes accumulated. /// * `is_finalized` - Whether processing is complete. /// * `base` - Base plugin for outputs and options. pub struct ReadRateMetaPlugin { start_time: Option, bytes_read: u64, is_finalized: bool, base: BaseMetaPlugin, } impl ReadRateMetaPlugin { /// Creates a new `ReadRateMetaPlugin` instance. /// /// Initializes with default options and outputs, merging provided ones. /// Starts tracking from zero bytes and no start time. /// /// # Arguments /// /// * `_options` - Optional configuration options (merged with defaults; unused specifics here). /// * `outputs` - Optional output mappings (merged with default "read_rate"). /// /// # Returns /// /// A new, un-finalized `ReadRateMetaPlugin` instance. /// /// # Examples /// /// ``` /// let plugin = ReadRateMetaPlugin::new(None, None); /// assert!(!plugin.is_finalized()); /// ``` pub fn new( _options: Option>, outputs: Option>, ) -> ReadRateMetaPlugin { let mut base = BaseMetaPlugin::new(); // Set default outputs let default_outputs = &["read_rate"]; base.initialize_plugin(default_outputs, _options, outputs); ReadRateMetaPlugin { start_time: None, bytes_read: 0, is_finalized: false, base, } } } impl MetaPlugin for ReadRateMetaPlugin { /// Checks if the plugin has been finalized. /// /// # Returns /// /// `true` if finalized (processing complete), `false` otherwise. fn is_finalized(&self) -> bool { self.is_finalized } /// Sets the finalized state of the plugin. /// /// Marks the plugin as complete or resets it. /// /// # Arguments /// /// * `finalized` - Whether processing is now complete. fn set_finalized(&mut self, finalized: bool) { self.is_finalized = finalized; } /// Finalizes the plugin, calculating the read rate. /// /// Computes KB/s from bytes read and elapsed time. Outputs via mappings. /// Idempotent: skips if already finalized. /// /// # Returns /// /// A `MetaPluginResponse` with rate metadata (if computable) and finalized=true. /// /// # Errors /// /// None; returns empty metadata if no start time or zero duration. fn finalize(&mut self) -> crate::meta_plugin::MetaPluginResponse { // If already finalized, don't process again if self.is_finalized { return crate::meta_plugin::MetaPluginResponse { metadata: Vec::new(), is_finalized: true, }; } let mut metadata = Vec::new(); if let Some(start_time) = self.start_time { let duration = start_time.elapsed(); let rate = if duration.as_secs_f64() > 0.0 { format!("{:.2} KB/s", (self.bytes_read as f64 / 1024.0) / duration.as_secs_f64()) } else { "N/A".to_string() }; // Use process_metadata_outputs to handle output mapping if let Some(meta_data) = crate::meta_plugin::process_metadata_outputs( "read_rate", serde_yaml::Value::String(rate), self.base.outputs() ) { metadata.push(meta_data); } } // Mark as finalized self.is_finalized = true; crate::meta_plugin::MetaPluginResponse { metadata, is_finalized: true, } } /// Updates the plugin with new data, accumulating bytes read. /// /// Starts timer on first update if not set. Accumulates byte count. /// Idempotent post-finalize: ignores data. /// /// # Arguments /// /// * `data` - Byte slice to process (length added to total). /// /// # Returns /// /// `MetaPluginResponse` with no metadata and finalized=false (unless already done). fn update(&mut self, data: &[u8]) -> crate::meta_plugin::MetaPluginResponse { // If already finalized, don't process more data if self.is_finalized { return crate::meta_plugin::MetaPluginResponse { metadata: Vec::new(), is_finalized: true, }; } if self.start_time.is_none() { self.start_time = Some(Instant::now()); } self.bytes_read += data.len() as u64; crate::meta_plugin::MetaPluginResponse { metadata: Vec::new(), is_finalized: false, } } /// Returns the type of this meta plugin. /// /// # Returns /// /// `MetaPluginType::ReadRate`. fn meta_type(&self) -> MetaPluginType { MetaPluginType::ReadRate } /// Returns a reference to the outputs mapping. /// /// # Returns /// /// Immutable reference to the outputs HashMap. fn outputs(&self) -> &std::collections::HashMap { self.base.outputs() } /// Returns a mutable reference to the outputs mapping. /// /// Allows modification of output configurations. /// /// # Returns /// /// Mutable reference to the outputs HashMap. fn outputs_mut(&mut self) -> &mut std::collections::HashMap { self.base.outputs_mut() } /// Returns the default output names for this plugin. /// /// # Returns /// /// Vector containing "read_rate". fn default_outputs(&self) -> Vec { vec!["read_rate".to_string()] } /// Returns a reference to the options mapping. /// /// # Returns /// /// Immutable reference to the options HashMap. fn options(&self) -> &std::collections::HashMap { self.base.options() } /// Returns a mutable reference to the options mapping. /// /// Allows modification of plugin options. /// /// # Returns /// /// Mutable reference to the options HashMap. fn options_mut(&mut self) -> &mut std::collections::HashMap { self.base.options_mut() } } use crate::meta_plugin::register_meta_plugin; // Register the plugin at module initialization time #[ctor::ctor] fn register_read_rate_plugin() { register_meta_plugin(MetaPluginType::ReadRate, |options, outputs| { Box::new(ReadRateMetaPlugin::new(options, outputs)) }); }