diff --git a/src/args.rs b/src/args.rs index 67fa2a1..f9fec4a 100644 --- a/src/args.rs +++ b/src/args.rs @@ -228,7 +228,7 @@ pub struct OptionsArgs { #[arg( long, env("KEEP_LIST_FORMAT"), - default_value("id,time,size,tags,meta:hostname") + default_value("id,time,size,meta:text_line_count,tags,meta:hostname_short,meta:command") )] #[arg(help("A comma separated list of columns to display with --list"))] pub list_format: String, diff --git a/src/client.rs b/src/client.rs index 4ed45f4..0e6b42c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -35,6 +35,18 @@ fn url_encode(s: &str) -> String { result } +fn append_query_params(url: &mut String, params: &[(&str, &str)]) { + if !params.is_empty() { + url.push('?'); + for (i, (key, value)) in params.iter().enumerate() { + if i > 0 { + url.push('&'); + } + url.push_str(&format!("{}={}", url_encode(key), url_encode(value))); + } + } +} + pub struct KeepClient { base_url: String, agent: ureq::Agent, @@ -127,15 +139,7 @@ impl KeepClient { params: &[(&str, &str)], ) -> Result { let mut url = self.url(path); - if !params.is_empty() { - url.push('?'); - for (i, (key, value)) in params.iter().enumerate() { - if i > 0 { - url.push('&'); - } - url.push_str(&format!("{}={}", url_encode(key), url_encode(value))); - } - } + append_query_params(&mut url, params); let mut req = self.agent.get(&url); if let Some(ref auth) = self.auth_header() { req = req.header("Authorization", auth); @@ -180,15 +184,7 @@ impl KeepClient { params: &[(&str, &str)], ) -> Result { let mut url = self.url(path); - if !params.is_empty() { - url.push('?'); - for (i, (key, value)) in params.iter().enumerate() { - if i > 0 { - url.push('&'); - } - url.push_str(&format!("{}={}", url_encode(key), url_encode(value))); - } - } + append_query_params(&mut url, params); let mut req = self.agent.post(&url); if let Some(ref auth) = self.auth_header() { @@ -246,11 +242,17 @@ impl KeepClient { #[derive(serde::Deserialize)] struct ApiResponse { data: Option, + error: Option, } let response: ApiResponse = self.get_json(&format!("/api/item/{id}/info"))?; - response - .data - .ok_or_else(|| CoreError::Other(anyhow::anyhow!("Item not found"))) + response.data.ok_or_else(|| { + CoreError::Other(anyhow::anyhow!( + "{}", + response + .error + .unwrap_or_else(|| "Item not found".to_string()) + )) + }) } pub fn list_items( @@ -265,6 +267,7 @@ impl KeepClient { #[derive(serde::Deserialize)] struct ApiResponse { data: Option>, + error: Option, } let mut params: Vec<(String, String)> = Vec::new(); @@ -296,7 +299,13 @@ impl KeepClient { .collect(); let response: ApiResponse = self.get_json_with_query("/api/item/", ¶m_refs)?; - Ok(response.data.unwrap_or_default()) + if let Some(data) = response.data { + return Ok(data); + } + if let Some(err) = response.error { + return Err(CoreError::Other(anyhow::anyhow!("Server error: {err}"))); + } + Ok(Vec::new()) } pub fn save_item( @@ -358,7 +367,7 @@ impl KeepClient { let url = format!( "{}?uncompressed_size={}", self.url(&format!("/api/item/{id}/update")), - size + url_encode(&size.to_string()) ); let mut req = self.agent.post(&url); if let Some(ref auth) = self.auth_header() { @@ -446,15 +455,7 @@ impl KeepClient { .collect(); let mut url = self.url("/api/export"); - if !param_refs.is_empty() { - url.push('?'); - for (i, (key, value)) in param_refs.iter().enumerate() { - if i > 0 { - url.push('&'); - } - url.push_str(&format!("{}={}", url_encode(key), url_encode(value))); - } - } + append_query_params(&mut url, ¶m_refs); let mut req = self.agent.get(&url); if let Some(ref auth) = self.auth_header() { diff --git a/src/config.rs b/src/config.rs index d4be49e..7d45f1a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -489,7 +489,9 @@ impl Settings { } // Override list_format from --list-format CLI arg - if args.options.list_format != "id,time,size,tags,meta:hostname" { + if args.options.list_format + != "id,time,size,meta:text_line_count,tags,meta:hostname_short,meta:command" + { debug!("CONFIG: Overriding list_format from --list-format CLI arg"); settings.list_format = Settings::parse_list_format(&args.options.list_format); } diff --git a/src/db.rs b/src/db.rs index 9db733a..cb0fb82 100644 --- a/src/db.rs +++ b/src/db.rs @@ -2,7 +2,7 @@ use anyhow::{Context, Error, Result, anyhow}; use chrono::prelude::*; use lazy_static::lazy_static; use log::*; -use rusqlite::{Connection, OpenFlags, params}; +use rusqlite::{Connection, OpenFlags, Row, params}; use rusqlite_migration::{M, Migrations}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -112,6 +112,17 @@ pub struct Item { pub compression: String, } +fn item_from_row(row: &Row) -> Result { + Ok(Item { + id: row.get(0)?, + ts: row.get(1)?, + uncompressed_size: row.get(2)?, + compressed_size: row.get(3)?, + closed: row.get(4)?, + compression: row.get(5)?, + }) +} + /// Represents a tag associated with an item. /// /// Defines the relationship between items and tags in a many-to-many structure. @@ -852,15 +863,7 @@ pub fn query_all_items(conn: &Connection) -> Result> { let mut items = Vec::new(); while let Some(row) = rows.next()? { - let item = Item { - id: row.get(0)?, - ts: row.get(1)?, - uncompressed_size: row.get(2)?, - compressed_size: row.get(3)?, - closed: row.get(4)?, - compression: row.get(5)?, - }; - items.push(item); + items.push(item_from_row(row)?); } Ok(items) @@ -931,15 +934,7 @@ pub fn query_tagged_items<'a>(conn: &'a Connection, tags: &'a Vec) -> Re let mut items = Vec::new(); while let Some(row) = rows.next()? { - let item = Item { - id: row.get(0)?, - ts: row.get(1)?, - uncompressed_size: row.get(2)?, - compressed_size: row.get(3)?, - closed: row.get(4)?, - compression: row.get(5)?, - }; - items.push(item); + items.push(item_from_row(row)?); } Ok(items) diff --git a/src/export_tar.rs b/src/export_tar.rs index 91e69ae..e8185e2 100644 --- a/src/export_tar.rs +++ b/src/export_tar.rs @@ -20,10 +20,10 @@ pub fn common_tags(items: &[ItemWithMeta]) -> Vec { return Vec::new(); } - let mut common: HashSet = items[0].tags.iter().map(|t| t.name.clone()).collect(); + let mut common: HashSet = items[0].tag_names().into_iter().collect(); for item in items.iter().skip(1) { - let item_tags: HashSet = item.tags.iter().map(|t| t.name.clone()).collect(); + let item_tags: HashSet = item.tag_names().into_iter().collect(); common = common.intersection(&item_tags).cloned().collect(); } @@ -78,7 +78,7 @@ pub fn write_export_tar( let item_id = item_with_meta.item.id.context("Item missing ID")?; let compression = &item_with_meta.item.compression; - let item_tags: Vec = item_with_meta.tags.iter().map(|t| t.name.clone()).collect(); + let item_tags = item_with_meta.tag_names(); let meta_map = item_with_meta.meta_as_map(); let data_path_entry = format!("{dir_name}/{item_id}.data.{compression}"); diff --git a/src/filter_plugin/exec.rs b/src/filter_plugin/exec.rs index e2434e0..db7eb16 100644 --- a/src/filter_plugin/exec.rs +++ b/src/filter_plugin/exec.rs @@ -164,13 +164,6 @@ impl FilterPlugin for ExecFilter { Ok(()) } - /// Clones this filter into a new boxed instance. - /// - /// Creates a new instance without active process handles. - /// - /// # Returns - /// - /// A new `Box` representing a clone of this filter. fn clone_box(&self) -> Box { Box::new(ExecFilter { program: self.program.clone(), diff --git a/src/filter_plugin/grep.rs b/src/filter_plugin/grep.rs index 136bad0..d076937 100644 --- a/src/filter_plugin/grep.rs +++ b/src/filter_plugin/grep.rs @@ -87,21 +87,6 @@ impl FilterPlugin for GrepFilter { Ok(()) } - /// Clones this filter into a new boxed instance. - /// - /// Creates a new GrepFilter with the same regex pattern. - /// - /// # Returns - /// - /// A new `Box` representing a clone of this filter. - /// - /// # Examples - /// - /// ``` - /// # use keep::filter_plugin::{FilterPlugin, GrepFilter}; - /// let filter = GrepFilter::new("test".to_string()).unwrap(); - /// let cloned = filter.clone_box(); - /// ``` fn clone_box(&self) -> Box { Box::new(Self { regex: self.regex.clone(), @@ -126,11 +111,7 @@ impl FilterPlugin for GrepFilter { /// assert!(opts[0].required); /// ``` fn options(&self) -> Vec { - vec![FilterOption { - name: "pattern".to_string(), - default: None, - required: true, - }] + crate::filter_plugin::pattern_option() } fn description(&self) -> &str { diff --git a/src/filter_plugin/head.rs b/src/filter_plugin/head.rs index b8520fe..a1d2378 100644 --- a/src/filter_plugin/head.rs +++ b/src/filter_plugin/head.rs @@ -3,14 +3,7 @@ use crate::common::PIPESIZE; use crate::services::filter_service::register_filter_plugin; use std::io::{BufRead, Read, Result, Write}; -/// A filter that reads the first N bytes from the input stream. -/// -/// Limits the output to the initial bytes specified in the configuration. -/// Useful for previewing file contents without reading everything. -/// -/// # Fields -/// -/// * `remaining` - Number of bytes left to read before stopping. +#[derive(Clone)] pub struct HeadBytesFilter { remaining: usize, } @@ -94,21 +87,6 @@ impl FilterPlugin for HeadBytesFilter { Ok(()) } - /// Clones this filter into a new boxed instance. - /// - /// Creates an independent copy with the same configuration. - /// - /// # Returns - /// - /// A new `Box` clone. - /// - /// # Examples - /// - /// ``` - /// # use keep::filter_plugin::{FilterPlugin, HeadBytesFilter}; - /// let filter = HeadBytesFilter::new(100); - /// let cloned = filter.clone_box(); - /// ``` fn clone_box(&self) -> Box { Box::new(Self { remaining: self.remaining, @@ -134,11 +112,7 @@ impl FilterPlugin for HeadBytesFilter { /// assert!(opts[0].required); /// ``` fn options(&self) -> Vec { - vec![FilterOption { - name: "count".to_string(), - default: None, - required: true, - }] + crate::filter_plugin::count_option() } fn description(&self) -> &str { @@ -146,7 +120,7 @@ impl FilterPlugin for HeadBytesFilter { } } -/// A filter that reads the first N lines from the input stream. +#[derive(Clone)] pub struct HeadLinesFilter { remaining: usize, } @@ -228,21 +202,6 @@ impl FilterPlugin for HeadLinesFilter { Ok(()) } - /// Clones this filter into a new boxed instance. - /// - /// Creates an independent copy with the same configuration. - /// - /// # Returns - /// - /// A new `Box` clone. - /// - /// # Examples - /// - /// ``` - /// # use keep::filter_plugin::{FilterPlugin, HeadLinesFilter}; - /// let filter = HeadLinesFilter::new(5); - /// let cloned = filter.clone_box(); - /// ``` fn clone_box(&self) -> Box { Box::new(Self { remaining: self.remaining, @@ -250,29 +209,8 @@ impl FilterPlugin for HeadLinesFilter { } /// Returns the configuration options for this filter. - /// - /// Defines the "count" parameter as required with no default. - /// - /// # Returns - /// - /// Vector of `FilterOption` describing parameters. - /// - /// # Examples - /// - /// ``` - /// # use keep::filter_plugin::{FilterPlugin, HeadLinesFilter}; - /// let filter = HeadLinesFilter::new(5); - /// let opts = filter.options(); - /// assert_eq!(opts.len(), 1); - /// assert_eq!(opts[0].name, "count"); - /// assert!(opts[0].required); - /// ``` fn options(&self) -> Vec { - vec![FilterOption { - name: "count".to_string(), - default: None, - required: true, - }] + crate::filter_plugin::count_option() } fn description(&self) -> &str { diff --git a/src/filter_plugin/mod.rs b/src/filter_plugin/mod.rs index 64a3455..ee70048 100644 --- a/src/filter_plugin/mod.rs +++ b/src/filter_plugin/mod.rs @@ -108,18 +108,16 @@ pub trait FilterPlugin: Send { /// struct MyFilter; /// impl FilterPlugin for MyFilter { /// fn filter(&mut self, reader: &mut dyn Read, writer: &mut dyn Write) -> Result<()> { - /// // Read and filter data /// let mut buf = [0; 1024]; /// loop { /// let n = reader.read(&mut buf)?; /// if n == 0 { break; } - /// // Apply filter logic to buf[0..n] /// writer.write_all(&buf[0..n])?; /// } /// Ok(()) /// } /// fn clone_box(&self) -> Box { - /// Box::new(MyFilter) + /// Box::new(Self) /// } /// fn options(&self) -> Vec { /// vec![] @@ -131,22 +129,6 @@ pub trait FilterPlugin: Send { Ok(()) } - /// Clones this plugin into a new boxed instance. - /// - /// This method is required for dynamic dispatch and cloning in filter chains. - /// - /// # Returns - /// - /// A new `Box` clone of the current plugin. - /// - /// # Examples - /// - /// ``` - /// # use keep::filter_plugin::FilterPlugin; - /// fn example_clone_box(filter: &dyn FilterPlugin) -> Box { - /// filter.clone_box() - /// } - /// ``` fn clone_box(&self) -> Box; /// Returns the configuration options for this plugin. @@ -183,6 +165,22 @@ pub trait FilterPlugin: Send { } } +pub fn count_option() -> Vec { + vec![FilterOption { + name: "count".to_string(), + default: None, + required: true, + }] +} + +pub fn pattern_option() -> Vec { + vec![FilterOption { + name: "pattern".to_string(), + default: None, + required: true, + }] +} + /// Enum representing the different types of filters. /// /// Used for parsing and instantiating specific filter plugins. @@ -262,16 +260,27 @@ impl Clone for FilterChain { } impl Clone for Box { - /// Clones the boxed filter plugin. - /// - /// # Returns - /// - /// A new boxed clone of the filter plugin. fn clone(&self) -> Self { self.clone_box() } } +#[macro_export] +macro_rules! filter_clone_box { + ($self:expr) => { + Box::new($self.clone()) + }; + ($self:expr, $field:ident) => { + Box::new(Self { $field: $self.$field.clone() }) + }; + ($self:expr, $field:ident, $($rest:ident),+) => { + Box::new(Self { + $field: $self.$field.clone(), + $($rest: $self.$rest.clone()),+ + }) + }; +} + impl Default for FilterChain { fn default() -> Self { Self::new() diff --git a/src/filter_plugin/skip.rs b/src/filter_plugin/skip.rs index fd6a3f2..2c2dc4b 100644 --- a/src/filter_plugin/skip.rs +++ b/src/filter_plugin/skip.rs @@ -4,6 +4,7 @@ use crate::services::filter_service::register_filter_plugin; use std::io::{BufRead, Read, Result, Write}; /// A filter that skips the first N bytes from the input stream. +#[derive(Clone)] pub struct SkipBytesFilter { remaining: usize, } @@ -49,11 +50,6 @@ impl FilterPlugin for SkipBytesFilter { Ok(()) } - /// Clones this filter into a new boxed instance. - /// - /// # Returns - /// - /// A new `Box` representing a clone of this filter. fn clone_box(&self) -> Box { Box::new(Self { remaining: self.remaining, @@ -61,16 +57,8 @@ impl FilterPlugin for SkipBytesFilter { } /// Returns the configuration options for this filter. - /// - /// # Returns - /// - /// A vector of `FilterOption` describing the filter's configurable parameters. fn options(&self) -> Vec { - vec![FilterOption { - name: "count".to_string(), - default: None, - required: true, - }] + crate::filter_plugin::count_option() } fn description(&self) -> &str { @@ -79,6 +67,7 @@ impl FilterPlugin for SkipBytesFilter { } /// A filter that skips the first N lines from the input stream. +#[derive(Clone)] pub struct SkipLinesFilter { remaining: usize, } @@ -118,11 +107,6 @@ impl FilterPlugin for SkipLinesFilter { Ok(()) } - /// Clones this filter into a new boxed instance. - /// - /// # Returns - /// - /// A new `Box` representing a clone of this filter. fn clone_box(&self) -> Box { Box::new(Self { remaining: self.remaining, @@ -130,16 +114,8 @@ impl FilterPlugin for SkipLinesFilter { } /// Returns the configuration options for this filter. - /// - /// # Returns - /// - /// A vector of `FilterOption` describing the filter's configurable parameters. fn options(&self) -> Vec { - vec![FilterOption { - name: "count".to_string(), - default: None, - required: true, - }] + crate::filter_plugin::count_option() } fn description(&self) -> &str { diff --git a/src/filter_plugin/strip_ansi.rs b/src/filter_plugin/strip_ansi.rs index a7813f6..e18b57a 100644 --- a/src/filter_plugin/strip_ansi.rs +++ b/src/filter_plugin/strip_ansi.rs @@ -7,7 +7,7 @@ use strip_ansi_escapes::Writer; /// # Fields /// /// None, stateless filter. -#[derive(Default)] +#[derive(Default, Clone)] pub struct StripAnsiFilter; impl StripAnsiFilter { @@ -39,22 +39,12 @@ impl FilterPlugin for StripAnsiFilter { Ok(()) } - /// Clones this filter into a new boxed instance. - /// - /// # Returns - /// - /// A new `Box` representing a clone of this filter. fn clone_box(&self) -> Box { Box::new(Self) } - /// Returns the configuration options for this filter (none required). - /// - /// # Returns - /// - /// An empty vector since this filter has no configurable options. fn options(&self) -> Vec { - Vec::new() // strip_ansi doesn't take any options + Vec::new() } fn description(&self) -> &str { diff --git a/src/filter_plugin/tail.rs b/src/filter_plugin/tail.rs index 321142b..da4fb71 100644 --- a/src/filter_plugin/tail.rs +++ b/src/filter_plugin/tail.rs @@ -4,7 +4,7 @@ use crate::services::filter_service::register_filter_plugin; use std::collections::VecDeque; use std::io::{BufRead, Read, Result, Write}; -/// A filter that reads the last N bytes from the input stream. +#[derive(Clone)] pub struct TailBytesFilter { buffer: VecDeque, count: usize, @@ -58,11 +58,6 @@ impl FilterPlugin for TailBytesFilter { Ok(()) } - /// Clones this filter into a new boxed instance. - /// - /// # Returns - /// - /// A new `Box` representing a clone of this filter. fn clone_box(&self) -> Box { Box::new(Self { buffer: self.buffer.clone(), @@ -71,16 +66,8 @@ impl FilterPlugin for TailBytesFilter { } /// Returns the configuration options for this filter. - /// - /// # Returns - /// - /// A vector of `FilterOption` describing the filter's configurable parameters. fn options(&self) -> Vec { - vec![FilterOption { - name: "count".to_string(), - default: None, - required: true, - }] + crate::filter_plugin::count_option() } fn description(&self) -> &str { @@ -89,6 +76,7 @@ impl FilterPlugin for TailBytesFilter { } /// A filter that reads the last N lines from the input stream. +#[derive(Clone)] pub struct TailLinesFilter { lines: VecDeque, count: usize, @@ -136,11 +124,6 @@ impl FilterPlugin for TailLinesFilter { Ok(()) } - /// Clones this filter into a new boxed instance. - /// - /// # Returns - /// - /// A new `Box` representing a clone of this filter. fn clone_box(&self) -> Box { Box::new(Self { lines: self.lines.clone(), @@ -149,16 +132,8 @@ impl FilterPlugin for TailLinesFilter { } /// Returns the configuration options for this filter. - /// - /// # Returns - /// - /// A vector of `FilterOption` describing the filter's configurable parameters. fn options(&self) -> Vec { - vec![FilterOption { - name: "count".to_string(), - default: None, - required: true, - }] + crate::filter_plugin::count_option() } fn description(&self) -> &str { diff --git a/src/filter_plugin/tokens.rs b/src/filter_plugin/tokens.rs index 899dfd3..306fc39 100644 --- a/src/filter_plugin/tokens.rs +++ b/src/filter_plugin/tokens.rs @@ -8,11 +8,7 @@ use std::io::{Read, Result, Write}; // head_tokens // --------------------------------------------------------------------------- -/// A filter that outputs only the first N tokens of the input stream. -/// -/// Streams bytes directly until the token limit is reached. When the limit -/// falls mid-chunk, uses `split_by_token_iter` to find the exact byte boundary -/// without allocating token strings beyond what is needed. +#[derive(Clone)] pub struct HeadTokensFilter { pub remaining: usize, pub tokenizer: Tokenizer, @@ -78,7 +74,7 @@ impl FilterPlugin for HeadTokensFilter { fn clone_box(&self) -> Box { Box::new(Self { remaining: self.remaining, - tokenizer: get_tokenizer(self.encoding).clone(), + tokenizer: self.tokenizer.clone(), encoding: self.encoding, }) } @@ -107,7 +103,7 @@ impl FilterPlugin for HeadTokensFilter { // skip_tokens // --------------------------------------------------------------------------- -/// A filter that skips the first N tokens of the input stream and outputs the rest. +#[derive(Clone)] pub struct SkipTokensFilter { pub remaining: usize, pub tokenizer: Tokenizer, @@ -180,7 +176,7 @@ impl FilterPlugin for SkipTokensFilter { fn clone_box(&self) -> Box { Box::new(Self { remaining: self.remaining, - tokenizer: get_tokenizer(self.encoding).clone(), + tokenizer: self.tokenizer.clone(), encoding: self.encoding, }) } @@ -211,8 +207,7 @@ impl FilterPlugin for SkipTokensFilter { /// A filter that outputs only the last N tokens of the input stream. /// -/// Buffers all bytes from the stream, then at finalize tokenizes the -/// content and writes only the last N tokens. +#[derive(Clone)] pub struct TailTokensFilter { pub count: usize, /// Buffer holding all bytes from the stream. @@ -276,7 +271,7 @@ impl FilterPlugin for TailTokensFilter { Box::new(Self { count: self.count, buffer: Vec::new(), - tokenizer: get_tokenizer(self.encoding).clone(), + tokenizer: self.tokenizer.clone(), encoding: self.encoding, }) } diff --git a/src/meta_plugin/mod.rs b/src/meta_plugin/mod.rs index 3e1f7d2..7294110 100644 --- a/src/meta_plugin/mod.rs +++ b/src/meta_plugin/mod.rs @@ -306,22 +306,7 @@ pub fn process_metadata_outputs( return None; } if let Some(custom_name) = mapping.as_str() { - // Convert the value to a string representation - let value_str = match &value { - serde_yaml::Value::Null => "null".to_string(), - serde_yaml::Value::Bool(b) => b.to_string(), - serde_yaml::Value::Number(n) => n.to_string(), - serde_yaml::Value::String(s) => s.clone(), - serde_yaml::Value::Sequence(_) => { - serde_yaml::to_string(&value).unwrap_or_else(|_| "".to_string()) - } - serde_yaml::Value::Mapping(_) => { - serde_yaml::to_string(&value).unwrap_or_else(|_| "".to_string()) - } - serde_yaml::Value::Tagged(_) => { - serde_yaml::to_string(&value).unwrap_or_else(|_| "".to_string()) - } - }; + let value_str = yaml_value_to_string(&value); debug!( "META: Processing metadata: internal_name={internal_name}, custom_name={custom_name}, value={value_str}" ); @@ -332,22 +317,7 @@ pub fn process_metadata_outputs( } } - // Convert the value to a string representation - let value_str = match &value { - serde_yaml::Value::Null => "null".to_string(), - serde_yaml::Value::Bool(b) => b.to_string(), - serde_yaml::Value::Number(n) => n.to_string(), - serde_yaml::Value::String(s) => s.clone(), - serde_yaml::Value::Sequence(_) => { - serde_yaml::to_string(&value).unwrap_or_else(|_| "".to_string()) - } - serde_yaml::Value::Mapping(_) => { - serde_yaml::to_string(&value).unwrap_or_else(|_| "".to_string()) - } - serde_yaml::Value::Tagged(_) => { - serde_yaml::to_string(&value).unwrap_or_else(|_| "".to_string()) - } - }; + let value_str = yaml_value_to_string(&value); // Default: use internal name as output name debug!("META: Processing metadata: name={internal_name}, value={value_str}"); @@ -357,6 +327,20 @@ pub fn process_metadata_outputs( }) } +fn yaml_value_to_string(value: &serde_yaml::Value) -> String { + match value { + serde_yaml::Value::Null => "null".to_string(), + serde_yaml::Value::Bool(b) => b.to_string(), + serde_yaml::Value::Number(n) => n.to_string(), + serde_yaml::Value::String(s) => s.clone(), + serde_yaml::Value::Sequence(_) + | serde_yaml::Value::Mapping(_) + | serde_yaml::Value::Tagged(_) => { + serde_yaml::to_string(value).unwrap_or_else(|_| "".to_string()) + } + } +} + pub trait MetaPlugin: Send where Self: 'static, diff --git a/src/modes/common.rs b/src/modes/common.rs index 4e7b494..83e0ef2 100644 --- a/src/modes/common.rs +++ b/src/modes/common.rs @@ -446,15 +446,6 @@ pub struct DisplayItemInfo { pub metadata: Vec<(String, String)>, } -/// Display data for a single list row (used by --list). -pub struct DisplayListItem { - pub id: i64, - pub time: String, - pub size: String, - pub compression: String, - pub tags: Vec, -} - /// Renders item detail table. Shared by local and client info modes. pub fn render_item_info_table(info: &DisplayItemInfo, table_config: &config::TableConfig) { use comfy_table::{Attribute, Cell}; diff --git a/src/modes/export.rs b/src/modes/export.rs index 97a08d5..88bc4f5 100644 --- a/src/modes/export.rs +++ b/src/modes/export.rs @@ -109,7 +109,7 @@ pub fn mode_export( if items.len() == 1 { let item = &items[0]; let item_id = item.item.id.context("Item missing ID")?; - let item_tags: Vec = item.tags.iter().map(|t| t.name.clone()).collect(); + let item_tags = item.tag_names(); vars.insert("id".to_string(), item_id.to_string()); vars.insert("tags".to_string(), sanitize_tags(&item_tags)); vars.insert("compression".to_string(), item.item.compression.clone()); diff --git a/src/modes/info.rs b/src/modes/info.rs index 650a79c..c85310c 100644 --- a/src/modes/info.rs +++ b/src/modes/info.rs @@ -143,9 +143,9 @@ fn show_item( return show_item_structured(item_with_meta, settings, data_path, output_format); } + let item_tags = item_with_meta.tag_names(); let item = item_with_meta.item; let item_id = item.id.context("Item missing ID")?; - let item_tags: Vec = item_with_meta.tags.iter().map(|t| t.name.clone()).collect(); let mut item_path_buf = data_path.clone(); item_path_buf.push(item_id.to_string()); @@ -216,7 +216,7 @@ fn show_item_structured( data_path: PathBuf, output_format: OutputFormat, ) -> Result<()> { - let item_tags: Vec = item_with_meta.tags.iter().map(|t| t.name.clone()).collect(); + let item_tags = item_with_meta.tag_names(); let meta_map = item_with_meta.meta_as_map(); let item = item_with_meta.item; let item_id = item.id.context("Item missing ID")?; diff --git a/src/modes/list.rs b/src/modes/list.rs index 9c0b918..d5563ae 100644 --- a/src/modes/list.rs +++ b/src/modes/list.rs @@ -121,7 +121,7 @@ pub fn mode_list( table.set_header(header_cells); for item_with_meta in items_with_meta { - let tags: Vec = item_with_meta.tags.iter().map(|t| t.name.clone()).collect(); + let tags = item_with_meta.tag_names(); let meta = item_with_meta.meta_as_map(); let item = item_with_meta.item; @@ -268,7 +268,7 @@ fn show_list_structured( let mut list_items = Vec::new(); for item_with_meta in items_with_meta { - let tags: Vec = item_with_meta.tags.iter().map(|t| t.name.clone()).collect(); + let tags = item_with_meta.tag_names(); let meta = item_with_meta.meta_as_map(); let item = item_with_meta.item; let item_id = item.id.context("Item missing ID")?; diff --git a/src/modes/server/api/item.rs b/src/modes/server/api/item.rs index 1217aac..071e119 100644 --- a/src/modes/server/api/item.rs +++ b/src/modes/server/api/item.rs @@ -235,25 +235,7 @@ async fn handle_as_meta_response_with_metadata( length: u64, ) -> Result { // Binary detection: read a sample in a blocking task, check, and return early - let db1 = db.clone(); - let item_service1 = item_service.clone(); - let is_binary = task::spawn_blocking(move || { - let conn = db1.blocking_lock(); - let (mut reader, _) = item_service1.get_item_content_streaming(&conn, item_id)?; - let mut sample = vec![0u8; crate::common::PIPESIZE]; - let n = reader.read(&mut sample)?; - sample.truncate(n); - Ok::(crate::common::is_binary::is_binary(&sample)) - }) - .await - .map_err(|e| { - warn!("Blocking task failed for item {item_id}: {e}"); - StatusCode::INTERNAL_SERVER_ERROR - })? - .map_err(|e| { - warn!("Failed to check binary status for item {item_id}: {e}"); - StatusCode::INTERNAL_SERVER_ERROR - })?; + let is_binary = check_binary_content(db, item_service, item_id).await?; if is_binary { let response_body = serde_json::json!({ @@ -427,40 +409,12 @@ pub async fn handle_post_item( .and_then(|s| s.max_body_size) .filter(|&v| v > 0); - let (tx, rx) = mpsc::channel::, std::io::Error>>(16); let body_truncated = Arc::new(AtomicBool::new(false)); - let truncated_flag = body_truncated.clone(); - - // Async task: read body frames, track size, stop when limit exceeded - tokio::spawn(async move { - let mut body = body; - let mut total_bytes: u64 = 0; - loop { - match body.frame().await { - None => break, - Some(Err(e)) => { - let _ = tx - .send(Err(std::io::Error::other(format!("Body error: {e}")))) - .await; - break; - } - Some(Ok(frame)) => { - if let Ok(data) = frame.into_data() { - total_bytes += data.len() as u64; - if let Some(limit) = max_body_size - && total_bytes > limit - { - truncated_flag.store(true, Ordering::Relaxed); - break; // Drop sender → reader sees EOF - } - if tx.send(Ok(data.to_vec())).await.is_err() { - break; - } - } - } - } - } - }); + let rx = spawn_body_reader( + body, + max_body_size, + LimitBehavior::SetFlag(body_truncated.clone()), + ); // Blocking task: consume streaming reader, save via save_item_raw_streaming let truncated_flag = body_truncated.clone(); @@ -822,59 +776,7 @@ async fn stream_raw_content_response( // Spawn blocking task to read with offset and length tokio::task::spawn_blocking(move || { - let mut reader = reader; - let mut buf = [0u8; crate::common::PIPESIZE]; - - // Apply offset by reading and discarding bytes - if offset > 0 { - let mut remaining = offset; - while remaining > 0 { - let to_read = std::cmp::min(remaining, buf.len() as u64) as usize; - match reader.read(&mut buf[..to_read]) { - Ok(0) => break, - Ok(n) => remaining -= n as u64, - Err(e) => { - let _ = tx.blocking_send(Err(e)); - return; - } - } - } - } - - // Read and send data up to the specified length - let mut remaining_length = length; - - loop { - let to_read = if length > 0 { - std::cmp::min(remaining_length, buf.len() as u64) as usize - } else { - buf.len() - }; - - if to_read == 0 { - break; - } - - match reader.read(&mut buf[..to_read]) { - Ok(0) => break, - Ok(n) => { - let chunk = buf[..n].to_vec(); - if tx.blocking_send(Ok(chunk)).is_err() { - break; - } - if length > 0 { - remaining_length -= n as u64; - if remaining_length == 0 { - break; - } - } - } - Err(e) => { - let _ = tx.blocking_send(Err(e)); - break; - } - } - } + stream_with_offset_and_length(reader, tx, offset, length); }); // Convert the receiver into a stream @@ -918,25 +820,7 @@ async fn stream_item_content_response_with_metadata( // Check if content is binary when allow_binary is false. // Uses a sample of actual content bytes (not metadata-only) for reliable detection. if !allow_binary { - let db_check = db.clone(); - let item_service_check = item_service.clone(); - let is_binary = task::spawn_blocking(move || { - let conn = db_check.blocking_lock(); - let (mut reader, _) = item_service_check.get_item_content_streaming(&conn, item_id)?; - let mut sample = vec![0u8; crate::common::PIPESIZE]; - let n = reader.read(&mut sample)?; - sample.truncate(n); - Ok::(crate::common::is_binary::is_binary(&sample)) - }) - .await - .map_err(|e| { - warn!("Blocking task failed for binary check on item {item_id}: {e}"); - StatusCode::INTERNAL_SERVER_ERROR - })? - .map_err(|e| { - warn!("Failed to check binary status for item {item_id}: {e}"); - StatusCode::INTERNAL_SERVER_ERROR - })?; + let is_binary = check_binary_content(db, item_service, item_id).await?; if is_binary { return Err(StatusCode::BAD_REQUEST); @@ -951,7 +835,7 @@ async fn stream_item_content_response_with_metadata( tokio::task::spawn_blocking(move || { let conn = db.blocking_lock(); - let (mut reader, _, _) = + let (reader, _, _) = match item_service_stream.get_item_content_info_streaming(&conn, item_id, None) { Ok(r) => r, Err(e) => { @@ -959,55 +843,7 @@ async fn stream_item_content_response_with_metadata( return; } }; - - // Apply offset - if offset > 0 { - let mut buf = [0u8; crate::common::PIPESIZE]; - let mut remaining = offset; - while remaining > 0 { - let to_read = std::cmp::min(remaining, buf.len() as u64) as usize; - match reader.read(&mut buf[..to_read]) { - Ok(0) => break, - Ok(n) => remaining -= n as u64, - Err(e) => { - let _ = tx.blocking_send(Err(e)); - return; - } - } - } - } - - // Read and send data - let mut buf = [0u8; crate::common::PIPESIZE]; - let mut remaining_length = length; - loop { - let to_read = if length > 0 { - std::cmp::min(remaining_length, buf.len() as u64) as usize - } else { - buf.len() - }; - if to_read == 0 { - break; - } - match reader.read(&mut buf[..to_read]) { - Ok(0) => break, - Ok(n) => { - if tx.blocking_send(Ok(buf[..n].to_vec())).is_err() { - break; - } - if length > 0 { - remaining_length -= n as u64; - if remaining_length == 0 { - break; - } - } - } - Err(e) => { - let _ = tx.blocking_send(Err(e)); - break; - } - } - } + stream_with_offset_and_length(reader, tx, offset, length); }); let stream = tokio_stream::wrappers::ReceiverStream::new(rx); @@ -1727,40 +1563,7 @@ pub async fn handle_import_items( .as_ref() .and_then(|s| s.max_body_size) .filter(|&v| v > 0); - let (tx, rx) = mpsc::channel::, std::io::Error>>(16); - - // Async task: stream body into channel - tokio::spawn(async move { - let mut body = body; - let mut total_bytes: u64 = 0; - loop { - match body.frame().await { - None => break, - Some(Err(e)) => { - let _ = tx - .send(Err(std::io::Error::other(format!("Body error: {e}")))) - .await; - break; - } - Some(Ok(frame)) => { - if let Ok(data) = frame.into_data() { - total_bytes += data.len() as u64; - if let Some(limit) = max_body_size - && total_bytes > limit - { - let _ = tx - .send(Err(std::io::Error::other("Payload too large"))) - .await; - break; - } - if tx.send(Ok(data.to_vec())).await.is_err() { - break; - } - } - } - } - } - }); + let rx = spawn_body_reader(body, max_body_size, LimitBehavior::SendError); let data_dir = state.data_dir.clone(); let db = state.db.clone(); @@ -1815,3 +1618,144 @@ pub async fn handle_import_items( Ok(Json(ApiResponse::ok(response_data))) } + +/// Controls behavior when body exceeds `max_body_size`. +enum LimitBehavior { + /// Set the flag and silently drop remaining data (partial upload is OK). + SetFlag(Arc), + /// Send an explicit IO error to the receiver (reject the payload). + SendError, +} + +/// Spawn an async task that reads body frames into an mpsc channel, +/// enforcing `max_body_size` with the given limit behavior. +fn spawn_body_reader( + body: Body, + max_body_size: Option, + limit_behavior: LimitBehavior, +) -> tokio::sync::mpsc::Receiver, std::io::Error>> { + let (tx, rx) = mpsc::channel::, std::io::Error>>(16); + tokio::spawn(async move { + let mut body = body; + let mut total_bytes: u64 = 0; + loop { + match body.frame().await { + None => break, + Some(Err(e)) => { + let _ = tx + .send(Err(std::io::Error::other(format!("Body error: {e}")))) + .await; + break; + } + Some(Ok(frame)) => { + if let Ok(data) = frame.into_data() { + total_bytes += data.len() as u64; + if let Some(limit) = max_body_size + && total_bytes > limit + { + match &limit_behavior { + LimitBehavior::SetFlag(flag) => { + flag.store(true, Ordering::Relaxed); + } + LimitBehavior::SendError => { + let _ = tx + .send(Err(std::io::Error::other("Payload too large"))) + .await; + } + } + break; + } + if tx.send(Ok(data.to_vec())).await.is_err() { + break; + } + } + } + } + } + }); + rx +} + +/// Read a sample of an item's content and check if it's binary. +async fn check_binary_content( + db: &Arc>, + item_service: &Arc, + item_id: i64, +) -> Result { + let db = db.clone(); + let item_service = item_service.clone(); + task::spawn_blocking(move || { + let conn = db.blocking_lock(); + let (mut reader, _) = item_service.get_item_content_streaming(&conn, item_id)?; + let mut sample = vec![0u8; crate::common::PIPESIZE]; + let n = reader.read(&mut sample)?; + sample.truncate(n); + Ok::(crate::common::is_binary::is_binary(&sample)) + }) + .await + .map_err(|e| { + warn!("Blocking task failed for binary check on item {item_id}: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })? + .map_err(|e| { + warn!("Failed to check binary status for item {item_id}: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + }) +} + +/// Stream bytes from a reader to an mpsc channel, applying offset skip and length limit. +fn stream_with_offset_and_length( + mut reader: Box, + tx: tokio::sync::mpsc::Sender, std::io::Error>>, + offset: u64, + length: u64, +) { + let mut buf = [0u8; crate::common::PIPESIZE]; + + // Apply offset by reading and discarding bytes + if offset > 0 { + let mut remaining = offset; + while remaining > 0 { + let to_read = std::cmp::min(remaining, buf.len() as u64) as usize; + match reader.read(&mut buf[..to_read]) { + Ok(0) => break, + Ok(n) => remaining -= n as u64, + Err(e) => { + let _ = tx.blocking_send(Err(e)); + return; + } + } + } + } + + // Read and send data up to the specified length + let mut remaining_length = length; + loop { + let to_read = if length > 0 { + std::cmp::min(remaining_length, buf.len() as u64) as usize + } else { + buf.len() + }; + if to_read == 0 { + break; + } + match reader.read(&mut buf[..to_read]) { + Ok(0) => break, + Ok(n) => { + if tx.blocking_send(Ok(buf[..n].to_vec())).is_err() { + break; + } + if length > 0 { + remaining_length -= n as u64; + if remaining_length == 0 { + break; + } + } + } + Err(e) => { + let _ = tx.blocking_send(Err(e)); + break; + } + } + } +} diff --git a/src/modes/server/api/status.rs b/src/modes/server/api/status.rs index 28ffb3f..5a3d092 100644 --- a/src/modes/server/api/status.rs +++ b/src/modes/server/api/status.rs @@ -2,6 +2,32 @@ use axum::{extract::State, http::StatusCode, response::Json}; use crate::modes::server::common::{ApiResponse, AppState, StatusInfoResponse}; +async fn generate_status( + state: &AppState, +) -> Result { + let db_path = state + .db + .lock() + .await + .path() + .unwrap_or("unknown") + .to_string(); + + let status_service = crate::services::status_service::StatusService::new(); + let mut cmd = state.cmd.lock().await; + status_service + .generate_status( + &mut cmd, + &state.settings, + state.data_dir.clone(), + db_path.into(), + ) + .map_err(|e| { + log::warn!("Failed to generate status: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + }) +} + #[utoipa::path( get, path = "/api/status", @@ -48,29 +74,7 @@ use crate::modes::server::common::{ApiResponse, AppState, StatusInfoResponse}; pub async fn handle_status( State(state): State, ) -> Result, StatusCode> { - // Get database path - let db_path = state - .db - .lock() - .await - .path() - .unwrap_or("unknown") - .to_string(); - - // Use the status service to generate status info showing configured plugins - let status_service = crate::services::status_service::StatusService::new(); - let mut cmd = state.cmd.lock().await; - let status_info = status_service - .generate_status( - &mut cmd, - &state.settings, - state.data_dir.clone(), - db_path.into(), - ) - .map_err(|e| { - log::warn!("Failed to generate status: {e}"); - StatusCode::INTERNAL_SERVER_ERROR - })?; + let status_info = generate_status(&state).await?; let response = StatusInfoResponse { success: true, @@ -107,27 +111,7 @@ pub struct PluginsStatusResponse { pub async fn handle_plugins_status( State(state): State, ) -> Result>, StatusCode> { - let db_path = state - .db - .lock() - .await - .path() - .unwrap_or("unknown") - .to_string(); - - let status_service = crate::services::status_service::StatusService::new(); - let mut cmd = state.cmd.lock().await; - let status_info = status_service - .generate_status( - &mut cmd, - &state.settings, - state.data_dir.clone(), - db_path.into(), - ) - .map_err(|e| { - log::warn!("Failed to generate status: {e}"); - StatusCode::INTERNAL_SERVER_ERROR - })?; + let status_info = generate_status(&state).await?; let response_data = PluginsStatusResponse { meta_plugins: status_info.meta_plugins, diff --git a/src/services/compression_service.rs b/src/services/compression_service.rs index 0976b32..74ce758 100644 --- a/src/services/compression_service.rs +++ b/src/services/compression_service.rs @@ -7,27 +7,6 @@ use std::str::FromStr; pub struct CompressionService; -/// Service for handling compression and decompression of item content. -/// -/// Provides methods to read compressed item files either fully into memory -/// or as streaming readers. Supports various compression types via engines. -/// This service abstracts the underlying compression engines for consistent access. -/// -/// # Examples -/// -/// ```ignore -/// let service = CompressionService::new(); -/// let content = service.get_item_content(path, "gzip")?; -/// ``` -/// Provides methods to read compressed item files either fully into memory -/// or as streaming readers. Supports various compression types via engines. -/// -/// # Examples -/// -/// ```ignore -/// let service = CompressionService::new(); -/// let content = service.get_item_content(path, "gzip")?; -/// ``` impl CompressionService { /// Creates a new CompressionService instance. ///