refactor: deduplicate filter plugins, extract helpers across codebase

Bug fixes:
- client: add error field to ApiResponse to avoid swallowing server errors
- args/config: fix list_format default mismatch (5 vs 7 columns)
- client: url-encode size param in set_item_size

Dedup - filter plugins:
- Extract count_option() and pattern_option() helpers, replace 7 near-identical options() bodies
- Add #[derive(Clone)] to all filter structs; remove verbose clone_box() doc comments
- Simplify FilterChain clone() and impl Clone for Box<dyn FilterPlugin>
- Add filter_clone_box! macro for future use
- Fix doctest example missing clone_box

Dedup - server API:
- Extract spawn_body_reader() with LimitBehavior enum for body streaming
- Extract check_binary_content() helper
- Extract stream_with_offset_and_length() helper
- Extract generate_status() helper in status.rs
- Extract append_query_params() helper in client.rs

Dedup - other:
- Extract yaml_value_to_string() in meta_plugin/mod.rs
- Extract item_from_row() in db.rs
- Delete unused DisplayListItem struct

Misc:
- Remove duplicate doc comment in compression_service.rs
This commit is contained in:
2026-03-20 15:54:33 -03:00
parent 00be72f3d0
commit 52e9787edb
21 changed files with 309 additions and 572 deletions

View File

@@ -228,7 +228,7 @@ pub struct OptionsArgs {
#[arg(
long,
env("KEEP_LIST_FORMAT"),
default_value("id,time,size,tags,meta:hostname")
default_value("id,time,size,meta:text_line_count,tags,meta:hostname_short,meta:command")
)]
#[arg(help("A comma separated list of columns to display with --list"))]
pub list_format: String,

View File

@@ -35,6 +35,18 @@ fn url_encode(s: &str) -> String {
result
}
/// Appends URL-encoded query parameters to `url`.
///
/// Does nothing when `params` is empty; otherwise appends a `?`
/// followed by `key=value` pairs joined with `&`, encoding both
/// keys and values with `url_encode`.
fn append_query_params(url: &mut String, params: &[(&str, &str)]) {
    if params.is_empty() {
        return;
    }
    url.push('?');
    for (i, (key, value)) in params.iter().enumerate() {
        if i > 0 {
            url.push('&');
        }
        // Push the encoded pieces directly instead of allocating an
        // intermediate String with format! on every parameter.
        url.push_str(&url_encode(key));
        url.push('=');
        url.push_str(&url_encode(value));
    }
}
pub struct KeepClient {
base_url: String,
agent: ureq::Agent,
@@ -127,15 +139,7 @@ impl KeepClient {
params: &[(&str, &str)],
) -> Result<T, CoreError> {
let mut url = self.url(path);
if !params.is_empty() {
url.push('?');
for (i, (key, value)) in params.iter().enumerate() {
if i > 0 {
url.push('&');
}
url.push_str(&format!("{}={}", url_encode(key), url_encode(value)));
}
}
append_query_params(&mut url, params);
let mut req = self.agent.get(&url);
if let Some(ref auth) = self.auth_header() {
req = req.header("Authorization", auth);
@@ -180,15 +184,7 @@ impl KeepClient {
params: &[(&str, &str)],
) -> Result<ItemInfo, CoreError> {
let mut url = self.url(path);
if !params.is_empty() {
url.push('?');
for (i, (key, value)) in params.iter().enumerate() {
if i > 0 {
url.push('&');
}
url.push_str(&format!("{}={}", url_encode(key), url_encode(value)));
}
}
append_query_params(&mut url, params);
let mut req = self.agent.post(&url);
if let Some(ref auth) = self.auth_header() {
@@ -246,11 +242,17 @@ impl KeepClient {
#[derive(serde::Deserialize)]
struct ApiResponse {
data: Option<ItemInfo>,
error: Option<String>,
}
let response: ApiResponse = self.get_json(&format!("/api/item/{id}/info"))?;
response
.data
.ok_or_else(|| CoreError::Other(anyhow::anyhow!("Item not found")))
response.data.ok_or_else(|| {
CoreError::Other(anyhow::anyhow!(
"{}",
response
.error
.unwrap_or_else(|| "Item not found".to_string())
))
})
}
pub fn list_items(
@@ -265,6 +267,7 @@ impl KeepClient {
#[derive(serde::Deserialize)]
struct ApiResponse {
data: Option<Vec<ItemInfo>>,
error: Option<String>,
}
let mut params: Vec<(String, String)> = Vec::new();
@@ -296,7 +299,13 @@ impl KeepClient {
.collect();
let response: ApiResponse = self.get_json_with_query("/api/item/", &param_refs)?;
Ok(response.data.unwrap_or_default())
if let Some(data) = response.data {
return Ok(data);
}
if let Some(err) = response.error {
return Err(CoreError::Other(anyhow::anyhow!("Server error: {err}")));
}
Ok(Vec::new())
}
pub fn save_item(
@@ -358,7 +367,7 @@ impl KeepClient {
let url = format!(
"{}?uncompressed_size={}",
self.url(&format!("/api/item/{id}/update")),
size
url_encode(&size.to_string())
);
let mut req = self.agent.post(&url);
if let Some(ref auth) = self.auth_header() {
@@ -446,15 +455,7 @@ impl KeepClient {
.collect();
let mut url = self.url("/api/export");
if !param_refs.is_empty() {
url.push('?');
for (i, (key, value)) in param_refs.iter().enumerate() {
if i > 0 {
url.push('&');
}
url.push_str(&format!("{}={}", url_encode(key), url_encode(value)));
}
}
append_query_params(&mut url, &param_refs);
let mut req = self.agent.get(&url);
if let Some(ref auth) = self.auth_header() {

View File

@@ -489,7 +489,9 @@ impl Settings {
}
// Override list_format from --list-format CLI arg
if args.options.list_format != "id,time,size,tags,meta:hostname" {
if args.options.list_format
!= "id,time,size,meta:text_line_count,tags,meta:hostname_short,meta:command"
{
debug!("CONFIG: Overriding list_format from --list-format CLI arg");
settings.list_format = Settings::parse_list_format(&args.options.list_format);
}

View File

@@ -2,7 +2,7 @@ use anyhow::{Context, Error, Result, anyhow};
use chrono::prelude::*;
use lazy_static::lazy_static;
use log::*;
use rusqlite::{Connection, OpenFlags, params};
use rusqlite::{Connection, OpenFlags, Row, params};
use rusqlite_migration::{M, Migrations};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@@ -112,6 +112,17 @@ pub struct Item {
pub compression: String,
}
/// Builds an `Item` from a row laid out as
/// `(id, ts, uncompressed_size, compressed_size, closed, compression)`.
///
/// Column access errors propagate to the caller via `?`.
fn item_from_row(row: &Row) -> Result<Item> {
    let id = row.get(0)?;
    let ts = row.get(1)?;
    let uncompressed_size = row.get(2)?;
    let compressed_size = row.get(3)?;
    let closed = row.get(4)?;
    let compression = row.get(5)?;
    Ok(Item {
        id,
        ts,
        uncompressed_size,
        compressed_size,
        closed,
        compression,
    })
}
/// Represents a tag associated with an item.
///
/// Defines the relationship between items and tags in a many-to-many structure.
@@ -852,15 +863,7 @@ pub fn query_all_items(conn: &Connection) -> Result<Vec<Item>> {
let mut items = Vec::new();
while let Some(row) = rows.next()? {
let item = Item {
id: row.get(0)?,
ts: row.get(1)?,
uncompressed_size: row.get(2)?,
compressed_size: row.get(3)?,
closed: row.get(4)?,
compression: row.get(5)?,
};
items.push(item);
items.push(item_from_row(row)?);
}
Ok(items)
@@ -931,15 +934,7 @@ pub fn query_tagged_items<'a>(conn: &'a Connection, tags: &'a Vec<String>) -> Re
let mut items = Vec::new();
while let Some(row) = rows.next()? {
let item = Item {
id: row.get(0)?,
ts: row.get(1)?,
uncompressed_size: row.get(2)?,
compressed_size: row.get(3)?,
closed: row.get(4)?,
compression: row.get(5)?,
};
items.push(item);
items.push(item_from_row(row)?);
}
Ok(items)

View File

@@ -20,10 +20,10 @@ pub fn common_tags(items: &[ItemWithMeta]) -> Vec<String> {
return Vec::new();
}
let mut common: HashSet<String> = items[0].tags.iter().map(|t| t.name.clone()).collect();
let mut common: HashSet<String> = items[0].tag_names().into_iter().collect();
for item in items.iter().skip(1) {
let item_tags: HashSet<String> = item.tags.iter().map(|t| t.name.clone()).collect();
let item_tags: HashSet<String> = item.tag_names().into_iter().collect();
common = common.intersection(&item_tags).cloned().collect();
}
@@ -78,7 +78,7 @@ pub fn write_export_tar<W: Write>(
let item_id = item_with_meta.item.id.context("Item missing ID")?;
let compression = &item_with_meta.item.compression;
let item_tags: Vec<String> = item_with_meta.tags.iter().map(|t| t.name.clone()).collect();
let item_tags = item_with_meta.tag_names();
let meta_map = item_with_meta.meta_as_map();
let data_path_entry = format!("{dir_name}/{item_id}.data.{compression}");

View File

@@ -164,13 +164,6 @@ impl FilterPlugin for ExecFilter {
Ok(())
}
/// Clones this filter into a new boxed instance.
///
/// Creates a new instance without active process handles.
///
/// # Returns
///
/// A new `Box<dyn FilterPlugin>` representing a clone of this filter.
fn clone_box(&self) -> Box<dyn FilterPlugin> {
Box::new(ExecFilter {
program: self.program.clone(),

View File

@@ -87,21 +87,6 @@ impl FilterPlugin for GrepFilter {
Ok(())
}
/// Clones this filter into a new boxed instance.
///
/// Creates a new GrepFilter with the same regex pattern.
///
/// # Returns
///
/// A new `Box<dyn FilterPlugin>` representing a clone of this filter.
///
/// # Examples
///
/// ```
/// # use keep::filter_plugin::{FilterPlugin, GrepFilter};
/// let filter = GrepFilter::new("test".to_string()).unwrap();
/// let cloned = filter.clone_box();
/// ```
fn clone_box(&self) -> Box<dyn FilterPlugin> {
Box::new(Self {
regex: self.regex.clone(),
@@ -126,11 +111,7 @@ impl FilterPlugin for GrepFilter {
/// assert!(opts[0].required);
/// ```
fn options(&self) -> Vec<FilterOption> {
vec![FilterOption {
name: "pattern".to_string(),
default: None,
required: true,
}]
crate::filter_plugin::pattern_option()
}
fn description(&self) -> &str {

View File

@@ -3,14 +3,7 @@ use crate::common::PIPESIZE;
use crate::services::filter_service::register_filter_plugin;
use std::io::{BufRead, Read, Result, Write};
/// A filter that reads the first N bytes from the input stream.
///
/// Limits the output to the initial bytes specified in the configuration.
/// Useful for previewing file contents without reading everything.
///
/// # Fields
///
/// * `remaining` - Number of bytes left to read before stopping.
#[derive(Clone)]
pub struct HeadBytesFilter {
remaining: usize,
}
@@ -94,21 +87,6 @@ impl FilterPlugin for HeadBytesFilter {
Ok(())
}
/// Clones this filter into a new boxed instance.
///
/// Creates an independent copy with the same configuration.
///
/// # Returns
///
/// A new `Box<dyn FilterPlugin>` clone.
///
/// # Examples
///
/// ```
/// # use keep::filter_plugin::{FilterPlugin, HeadBytesFilter};
/// let filter = HeadBytesFilter::new(100);
/// let cloned = filter.clone_box();
/// ```
fn clone_box(&self) -> Box<dyn FilterPlugin> {
Box::new(Self {
remaining: self.remaining,
@@ -134,11 +112,7 @@ impl FilterPlugin for HeadBytesFilter {
/// assert!(opts[0].required);
/// ```
fn options(&self) -> Vec<FilterOption> {
vec![FilterOption {
name: "count".to_string(),
default: None,
required: true,
}]
crate::filter_plugin::count_option()
}
fn description(&self) -> &str {
@@ -146,7 +120,7 @@ impl FilterPlugin for HeadBytesFilter {
}
}
/// A filter that reads the first N lines from the input stream.
#[derive(Clone)]
pub struct HeadLinesFilter {
remaining: usize,
}
@@ -228,21 +202,6 @@ impl FilterPlugin for HeadLinesFilter {
Ok(())
}
/// Clones this filter into a new boxed instance.
///
/// Creates an independent copy with the same configuration.
///
/// # Returns
///
/// A new `Box<dyn FilterPlugin>` clone.
///
/// # Examples
///
/// ```
/// # use keep::filter_plugin::{FilterPlugin, HeadLinesFilter};
/// let filter = HeadLinesFilter::new(5);
/// let cloned = filter.clone_box();
/// ```
fn clone_box(&self) -> Box<dyn FilterPlugin> {
Box::new(Self {
remaining: self.remaining,
@@ -250,29 +209,8 @@ impl FilterPlugin for HeadLinesFilter {
}
/// Returns the configuration options for this filter.
///
/// Defines the "count" parameter as required with no default.
///
/// # Returns
///
/// Vector of `FilterOption` describing parameters.
///
/// # Examples
///
/// ```
/// # use keep::filter_plugin::{FilterPlugin, HeadLinesFilter};
/// let filter = HeadLinesFilter::new(5);
/// let opts = filter.options();
/// assert_eq!(opts.len(), 1);
/// assert_eq!(opts[0].name, "count");
/// assert!(opts[0].required);
/// ```
fn options(&self) -> Vec<FilterOption> {
vec![FilterOption {
name: "count".to_string(),
default: None,
required: true,
}]
crate::filter_plugin::count_option()
}
fn description(&self) -> &str {

View File

@@ -108,18 +108,16 @@ pub trait FilterPlugin: Send {
/// struct MyFilter;
/// impl FilterPlugin for MyFilter {
/// fn filter(&mut self, reader: &mut dyn Read, writer: &mut dyn Write) -> Result<()> {
/// // Read and filter data
/// let mut buf = [0; 1024];
/// loop {
/// let n = reader.read(&mut buf)?;
/// if n == 0 { break; }
/// // Apply filter logic to buf[0..n]
/// writer.write_all(&buf[0..n])?;
/// }
/// Ok(())
/// }
/// fn clone_box(&self) -> Box<dyn FilterPlugin> {
/// Box::new(MyFilter)
/// Box::new(Self)
/// }
/// fn options(&self) -> Vec<FilterOption> {
/// vec![]
@@ -131,22 +129,6 @@ pub trait FilterPlugin: Send {
Ok(())
}
/// Clones this plugin into a new boxed instance.
///
/// This method is required for dynamic dispatch and cloning in filter chains.
///
/// # Returns
///
/// A new `Box<dyn FilterPlugin>` clone of the current plugin.
///
/// # Examples
///
/// ```
/// # use keep::filter_plugin::FilterPlugin;
/// fn example_clone_box(filter: &dyn FilterPlugin) -> Box<dyn FilterPlugin> {
/// filter.clone_box()
/// }
/// ```
fn clone_box(&self) -> Box<dyn FilterPlugin>;
/// Returns the configuration options for this plugin.
@@ -183,6 +165,22 @@ pub trait FilterPlugin: Send {
}
}
/// Shared `options()` body for filters configured by a single
/// required `count` parameter with no default value.
pub fn count_option() -> Vec<FilterOption> {
    let count = FilterOption {
        name: String::from("count"),
        default: None,
        required: true,
    };
    vec![count]
}
/// Shared `options()` body for filters configured by a single
/// required `pattern` parameter with no default value.
pub fn pattern_option() -> Vec<FilterOption> {
    let pattern = FilterOption {
        name: String::from("pattern"),
        default: None,
        required: true,
    };
    vec![pattern]
}
/// Enum representing the different types of filters.
///
/// Used for parsing and instantiating specific filter plugins.
@@ -262,16 +260,27 @@ impl Clone for FilterChain {
}
impl Clone for Box<dyn FilterPlugin> {
/// Clones the boxed filter plugin.
///
/// # Returns
///
/// A new boxed clone of the filter plugin.
fn clone(&self) -> Self {
self.clone_box()
}
}
/// Builds the `Box<dyn FilterPlugin>` returned by `clone_box` impls.
///
/// Two forms:
/// - `filter_clone_box!(self)` — the type is `Clone`; box a plain clone.
/// - `filter_clone_box!(self, field, ...)` — clone the listed fields into
///   a fresh `Self` (for types that reset some state on clone).
///
/// A single one-or-more repetition arm replaces the previous separate
/// one-field and many-field arms (which were redundant), and a trailing
/// comma is now accepted — both changes are backward-compatible.
#[macro_export]
macro_rules! filter_clone_box {
    ($self:expr) => {
        Box::new($self.clone())
    };
    ($self:expr, $($field:ident),+ $(,)?) => {
        Box::new(Self {
            $($field: $self.$field.clone()),+
        })
    };
}
impl Default for FilterChain {
fn default() -> Self {
Self::new()

View File

@@ -4,6 +4,7 @@ use crate::services::filter_service::register_filter_plugin;
use std::io::{BufRead, Read, Result, Write};
/// A filter that skips the first N bytes from the input stream.
#[derive(Clone)]
pub struct SkipBytesFilter {
remaining: usize,
}
@@ -49,11 +50,6 @@ impl FilterPlugin for SkipBytesFilter {
Ok(())
}
/// Clones this filter into a new boxed instance.
///
/// # Returns
///
/// A new `Box<dyn FilterPlugin>` representing a clone of this filter.
fn clone_box(&self) -> Box<dyn FilterPlugin> {
Box::new(Self {
remaining: self.remaining,
@@ -61,16 +57,8 @@ impl FilterPlugin for SkipBytesFilter {
}
/// Returns the configuration options for this filter.
///
/// # Returns
///
/// A vector of `FilterOption` describing the filter's configurable parameters.
fn options(&self) -> Vec<FilterOption> {
vec![FilterOption {
name: "count".to_string(),
default: None,
required: true,
}]
crate::filter_plugin::count_option()
}
fn description(&self) -> &str {
@@ -79,6 +67,7 @@ impl FilterPlugin for SkipBytesFilter {
}
/// A filter that skips the first N lines from the input stream.
#[derive(Clone)]
pub struct SkipLinesFilter {
remaining: usize,
}
@@ -118,11 +107,6 @@ impl FilterPlugin for SkipLinesFilter {
Ok(())
}
/// Clones this filter into a new boxed instance.
///
/// # Returns
///
/// A new `Box<dyn FilterPlugin>` representing a clone of this filter.
fn clone_box(&self) -> Box<dyn FilterPlugin> {
Box::new(Self {
remaining: self.remaining,
@@ -130,16 +114,8 @@ impl FilterPlugin for SkipLinesFilter {
}
/// Returns the configuration options for this filter.
///
/// # Returns
///
/// A vector of `FilterOption` describing the filter's configurable parameters.
fn options(&self) -> Vec<FilterOption> {
vec![FilterOption {
name: "count".to_string(),
default: None,
required: true,
}]
crate::filter_plugin::count_option()
}
fn description(&self) -> &str {

View File

@@ -7,7 +7,7 @@ use strip_ansi_escapes::Writer;
/// # Fields
///
/// None, stateless filter.
#[derive(Default)]
#[derive(Default, Clone)]
pub struct StripAnsiFilter;
impl StripAnsiFilter {
@@ -39,22 +39,12 @@ impl FilterPlugin for StripAnsiFilter {
Ok(())
}
/// Clones this filter into a new boxed instance.
///
/// # Returns
///
/// A new `Box<dyn FilterPlugin>` representing a clone of this filter.
fn clone_box(&self) -> Box<dyn FilterPlugin> {
Box::new(Self)
}
/// Returns the configuration options for this filter (none required).
///
/// # Returns
///
/// An empty vector since this filter has no configurable options.
fn options(&self) -> Vec<FilterOption> {
Vec::new() // strip_ansi doesn't take any options
Vec::new()
}
fn description(&self) -> &str {

View File

@@ -4,7 +4,7 @@ use crate::services::filter_service::register_filter_plugin;
use std::collections::VecDeque;
use std::io::{BufRead, Read, Result, Write};
/// A filter that reads the last N bytes from the input stream.
#[derive(Clone)]
pub struct TailBytesFilter {
buffer: VecDeque<u8>,
count: usize,
@@ -58,11 +58,6 @@ impl FilterPlugin for TailBytesFilter {
Ok(())
}
/// Clones this filter into a new boxed instance.
///
/// # Returns
///
/// A new `Box<dyn FilterPlugin>` representing a clone of this filter.
fn clone_box(&self) -> Box<dyn FilterPlugin> {
Box::new(Self {
buffer: self.buffer.clone(),
@@ -71,16 +66,8 @@ impl FilterPlugin for TailBytesFilter {
}
/// Returns the configuration options for this filter.
///
/// # Returns
///
/// A vector of `FilterOption` describing the filter's configurable parameters.
fn options(&self) -> Vec<FilterOption> {
vec![FilterOption {
name: "count".to_string(),
default: None,
required: true,
}]
crate::filter_plugin::count_option()
}
fn description(&self) -> &str {
@@ -89,6 +76,7 @@ impl FilterPlugin for TailBytesFilter {
}
/// A filter that reads the last N lines from the input stream.
#[derive(Clone)]
pub struct TailLinesFilter {
lines: VecDeque<String>,
count: usize,
@@ -136,11 +124,6 @@ impl FilterPlugin for TailLinesFilter {
Ok(())
}
/// Clones this filter into a new boxed instance.
///
/// # Returns
///
/// A new `Box<dyn FilterPlugin>` representing a clone of this filter.
fn clone_box(&self) -> Box<dyn FilterPlugin> {
Box::new(Self {
lines: self.lines.clone(),
@@ -149,16 +132,8 @@ impl FilterPlugin for TailLinesFilter {
}
/// Returns the configuration options for this filter.
///
/// # Returns
///
/// A vector of `FilterOption` describing the filter's configurable parameters.
fn options(&self) -> Vec<FilterOption> {
vec![FilterOption {
name: "count".to_string(),
default: None,
required: true,
}]
crate::filter_plugin::count_option()
}
fn description(&self) -> &str {

View File

@@ -8,11 +8,7 @@ use std::io::{Read, Result, Write};
// head_tokens
// ---------------------------------------------------------------------------
/// A filter that outputs only the first N tokens of the input stream.
///
/// Streams bytes directly until the token limit is reached. When the limit
/// falls mid-chunk, uses `split_by_token_iter` to find the exact byte boundary
/// without allocating token strings beyond what is needed.
#[derive(Clone)]
pub struct HeadTokensFilter {
pub remaining: usize,
pub tokenizer: Tokenizer,
@@ -78,7 +74,7 @@ impl FilterPlugin for HeadTokensFilter {
fn clone_box(&self) -> Box<dyn FilterPlugin> {
Box::new(Self {
remaining: self.remaining,
tokenizer: get_tokenizer(self.encoding).clone(),
tokenizer: self.tokenizer.clone(),
encoding: self.encoding,
})
}
@@ -107,7 +103,7 @@ impl FilterPlugin for HeadTokensFilter {
// skip_tokens
// ---------------------------------------------------------------------------
/// A filter that skips the first N tokens of the input stream and outputs the rest.
#[derive(Clone)]
pub struct SkipTokensFilter {
pub remaining: usize,
pub tokenizer: Tokenizer,
@@ -180,7 +176,7 @@ impl FilterPlugin for SkipTokensFilter {
fn clone_box(&self) -> Box<dyn FilterPlugin> {
Box::new(Self {
remaining: self.remaining,
tokenizer: get_tokenizer(self.encoding).clone(),
tokenizer: self.tokenizer.clone(),
encoding: self.encoding,
})
}
@@ -211,8 +207,7 @@ impl FilterPlugin for SkipTokensFilter {
/// A filter that outputs only the last N tokens of the input stream.
///
/// Buffers all bytes from the stream, then at finalize tokenizes the
/// content and writes only the last N tokens.
#[derive(Clone)]
pub struct TailTokensFilter {
pub count: usize,
/// Buffer holding all bytes from the stream.
@@ -276,7 +271,7 @@ impl FilterPlugin for TailTokensFilter {
Box::new(Self {
count: self.count,
buffer: Vec::new(),
tokenizer: get_tokenizer(self.encoding).clone(),
tokenizer: self.tokenizer.clone(),
encoding: self.encoding,
})
}

View File

@@ -306,22 +306,7 @@ pub fn process_metadata_outputs(
return None;
}
if let Some(custom_name) = mapping.as_str() {
// Convert the value to a string representation
let value_str = match &value {
serde_yaml::Value::Null => "null".to_string(),
serde_yaml::Value::Bool(b) => b.to_string(),
serde_yaml::Value::Number(n) => n.to_string(),
serde_yaml::Value::String(s) => s.clone(),
serde_yaml::Value::Sequence(_) => {
serde_yaml::to_string(&value).unwrap_or_else(|_| "".to_string())
}
serde_yaml::Value::Mapping(_) => {
serde_yaml::to_string(&value).unwrap_or_else(|_| "".to_string())
}
serde_yaml::Value::Tagged(_) => {
serde_yaml::to_string(&value).unwrap_or_else(|_| "".to_string())
}
};
let value_str = yaml_value_to_string(&value);
debug!(
"META: Processing metadata: internal_name={internal_name}, custom_name={custom_name}, value={value_str}"
);
@@ -332,22 +317,7 @@ pub fn process_metadata_outputs(
}
}
// Convert the value to a string representation
let value_str = match &value {
serde_yaml::Value::Null => "null".to_string(),
serde_yaml::Value::Bool(b) => b.to_string(),
serde_yaml::Value::Number(n) => n.to_string(),
serde_yaml::Value::String(s) => s.clone(),
serde_yaml::Value::Sequence(_) => {
serde_yaml::to_string(&value).unwrap_or_else(|_| "".to_string())
}
serde_yaml::Value::Mapping(_) => {
serde_yaml::to_string(&value).unwrap_or_else(|_| "".to_string())
}
serde_yaml::Value::Tagged(_) => {
serde_yaml::to_string(&value).unwrap_or_else(|_| "".to_string())
}
};
let value_str = yaml_value_to_string(&value);
// Default: use internal name as output name
debug!("META: Processing metadata: name={internal_name}, value={value_str}");
@@ -357,6 +327,20 @@ pub fn process_metadata_outputs(
})
}
fn yaml_value_to_string(value: &serde_yaml::Value) -> String {
match value {
serde_yaml::Value::Null => "null".to_string(),
serde_yaml::Value::Bool(b) => b.to_string(),
serde_yaml::Value::Number(n) => n.to_string(),
serde_yaml::Value::String(s) => s.clone(),
serde_yaml::Value::Sequence(_)
| serde_yaml::Value::Mapping(_)
| serde_yaml::Value::Tagged(_) => {
serde_yaml::to_string(value).unwrap_or_else(|_| "".to_string())
}
}
}
pub trait MetaPlugin: Send
where
Self: 'static,

View File

@@ -446,15 +446,6 @@ pub struct DisplayItemInfo {
pub metadata: Vec<(String, String)>,
}
/// Display data for a single list row (used by --list).
pub struct DisplayListItem {
pub id: i64,
pub time: String,
pub size: String,
pub compression: String,
pub tags: Vec<String>,
}
/// Renders item detail table. Shared by local and client info modes.
pub fn render_item_info_table(info: &DisplayItemInfo, table_config: &config::TableConfig) {
use comfy_table::{Attribute, Cell};

View File

@@ -109,7 +109,7 @@ pub fn mode_export(
if items.len() == 1 {
let item = &items[0];
let item_id = item.item.id.context("Item missing ID")?;
let item_tags: Vec<String> = item.tags.iter().map(|t| t.name.clone()).collect();
let item_tags = item.tag_names();
vars.insert("id".to_string(), item_id.to_string());
vars.insert("tags".to_string(), sanitize_tags(&item_tags));
vars.insert("compression".to_string(), item.item.compression.clone());

View File

@@ -143,9 +143,9 @@ fn show_item(
return show_item_structured(item_with_meta, settings, data_path, output_format);
}
let item_tags = item_with_meta.tag_names();
let item = item_with_meta.item;
let item_id = item.id.context("Item missing ID")?;
let item_tags: Vec<String> = item_with_meta.tags.iter().map(|t| t.name.clone()).collect();
let mut item_path_buf = data_path.clone();
item_path_buf.push(item_id.to_string());
@@ -216,7 +216,7 @@ fn show_item_structured(
data_path: PathBuf,
output_format: OutputFormat,
) -> Result<()> {
let item_tags: Vec<String> = item_with_meta.tags.iter().map(|t| t.name.clone()).collect();
let item_tags = item_with_meta.tag_names();
let meta_map = item_with_meta.meta_as_map();
let item = item_with_meta.item;
let item_id = item.id.context("Item missing ID")?;

View File

@@ -121,7 +121,7 @@ pub fn mode_list(
table.set_header(header_cells);
for item_with_meta in items_with_meta {
let tags: Vec<String> = item_with_meta.tags.iter().map(|t| t.name.clone()).collect();
let tags = item_with_meta.tag_names();
let meta = item_with_meta.meta_as_map();
let item = item_with_meta.item;
@@ -268,7 +268,7 @@ fn show_list_structured(
let mut list_items = Vec::new();
for item_with_meta in items_with_meta {
let tags: Vec<String> = item_with_meta.tags.iter().map(|t| t.name.clone()).collect();
let tags = item_with_meta.tag_names();
let meta = item_with_meta.meta_as_map();
let item = item_with_meta.item;
let item_id = item.id.context("Item missing ID")?;

View File

@@ -235,25 +235,7 @@ async fn handle_as_meta_response_with_metadata(
length: u64,
) -> Result<Response, StatusCode> {
// Binary detection: read a sample in a blocking task, check, and return early
let db1 = db.clone();
let item_service1 = item_service.clone();
let is_binary = task::spawn_blocking(move || {
let conn = db1.blocking_lock();
let (mut reader, _) = item_service1.get_item_content_streaming(&conn, item_id)?;
let mut sample = vec![0u8; crate::common::PIPESIZE];
let n = reader.read(&mut sample)?;
sample.truncate(n);
Ok::<bool, CoreError>(crate::common::is_binary::is_binary(&sample))
})
.await
.map_err(|e| {
warn!("Blocking task failed for item {item_id}: {e}");
StatusCode::INTERNAL_SERVER_ERROR
})?
.map_err(|e| {
warn!("Failed to check binary status for item {item_id}: {e}");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let is_binary = check_binary_content(db, item_service, item_id).await?;
if is_binary {
let response_body = serde_json::json!({
@@ -427,40 +409,12 @@ pub async fn handle_post_item(
.and_then(|s| s.max_body_size)
.filter(|&v| v > 0);
let (tx, rx) = mpsc::channel::<Result<Vec<u8>, std::io::Error>>(16);
let body_truncated = Arc::new(AtomicBool::new(false));
let truncated_flag = body_truncated.clone();
// Async task: read body frames, track size, stop when limit exceeded
tokio::spawn(async move {
let mut body = body;
let mut total_bytes: u64 = 0;
loop {
match body.frame().await {
None => break,
Some(Err(e)) => {
let _ = tx
.send(Err(std::io::Error::other(format!("Body error: {e}"))))
.await;
break;
}
Some(Ok(frame)) => {
if let Ok(data) = frame.into_data() {
total_bytes += data.len() as u64;
if let Some(limit) = max_body_size
&& total_bytes > limit
{
truncated_flag.store(true, Ordering::Relaxed);
break; // Drop sender → reader sees EOF
}
if tx.send(Ok(data.to_vec())).await.is_err() {
break;
}
}
}
}
}
});
let rx = spawn_body_reader(
body,
max_body_size,
LimitBehavior::SetFlag(body_truncated.clone()),
);
// Blocking task: consume streaming reader, save via save_item_raw_streaming
let truncated_flag = body_truncated.clone();
@@ -822,59 +776,7 @@ async fn stream_raw_content_response(
// Spawn blocking task to read with offset and length
tokio::task::spawn_blocking(move || {
let mut reader = reader;
let mut buf = [0u8; crate::common::PIPESIZE];
// Apply offset by reading and discarding bytes
if offset > 0 {
let mut remaining = offset;
while remaining > 0 {
let to_read = std::cmp::min(remaining, buf.len() as u64) as usize;
match reader.read(&mut buf[..to_read]) {
Ok(0) => break,
Ok(n) => remaining -= n as u64,
Err(e) => {
let _ = tx.blocking_send(Err(e));
return;
}
}
}
}
// Read and send data up to the specified length
let mut remaining_length = length;
loop {
let to_read = if length > 0 {
std::cmp::min(remaining_length, buf.len() as u64) as usize
} else {
buf.len()
};
if to_read == 0 {
break;
}
match reader.read(&mut buf[..to_read]) {
Ok(0) => break,
Ok(n) => {
let chunk = buf[..n].to_vec();
if tx.blocking_send(Ok(chunk)).is_err() {
break;
}
if length > 0 {
remaining_length -= n as u64;
if remaining_length == 0 {
break;
}
}
}
Err(e) => {
let _ = tx.blocking_send(Err(e));
break;
}
}
}
stream_with_offset_and_length(reader, tx, offset, length);
});
// Convert the receiver into a stream
@@ -918,25 +820,7 @@ async fn stream_item_content_response_with_metadata(
// Check if content is binary when allow_binary is false.
// Uses a sample of actual content bytes (not metadata-only) for reliable detection.
if !allow_binary {
let db_check = db.clone();
let item_service_check = item_service.clone();
let is_binary = task::spawn_blocking(move || {
let conn = db_check.blocking_lock();
let (mut reader, _) = item_service_check.get_item_content_streaming(&conn, item_id)?;
let mut sample = vec![0u8; crate::common::PIPESIZE];
let n = reader.read(&mut sample)?;
sample.truncate(n);
Ok::<bool, CoreError>(crate::common::is_binary::is_binary(&sample))
})
.await
.map_err(|e| {
warn!("Blocking task failed for binary check on item {item_id}: {e}");
StatusCode::INTERNAL_SERVER_ERROR
})?
.map_err(|e| {
warn!("Failed to check binary status for item {item_id}: {e}");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let is_binary = check_binary_content(db, item_service, item_id).await?;
if is_binary {
return Err(StatusCode::BAD_REQUEST);
@@ -951,7 +835,7 @@ async fn stream_item_content_response_with_metadata(
tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
let (mut reader, _, _) =
let (reader, _, _) =
match item_service_stream.get_item_content_info_streaming(&conn, item_id, None) {
Ok(r) => r,
Err(e) => {
@@ -959,55 +843,7 @@ async fn stream_item_content_response_with_metadata(
return;
}
};
// Apply offset
if offset > 0 {
let mut buf = [0u8; crate::common::PIPESIZE];
let mut remaining = offset;
while remaining > 0 {
let to_read = std::cmp::min(remaining, buf.len() as u64) as usize;
match reader.read(&mut buf[..to_read]) {
Ok(0) => break,
Ok(n) => remaining -= n as u64,
Err(e) => {
let _ = tx.blocking_send(Err(e));
return;
}
}
}
}
// Read and send data
let mut buf = [0u8; crate::common::PIPESIZE];
let mut remaining_length = length;
loop {
let to_read = if length > 0 {
std::cmp::min(remaining_length, buf.len() as u64) as usize
} else {
buf.len()
};
if to_read == 0 {
break;
}
match reader.read(&mut buf[..to_read]) {
Ok(0) => break,
Ok(n) => {
if tx.blocking_send(Ok(buf[..n].to_vec())).is_err() {
break;
}
if length > 0 {
remaining_length -= n as u64;
if remaining_length == 0 {
break;
}
}
}
Err(e) => {
let _ = tx.blocking_send(Err(e));
break;
}
}
}
stream_with_offset_and_length(reader, tx, offset, length);
});
let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
@@ -1727,40 +1563,7 @@ pub async fn handle_import_items(
.as_ref()
.and_then(|s| s.max_body_size)
.filter(|&v| v > 0);
let (tx, rx) = mpsc::channel::<Result<Vec<u8>, std::io::Error>>(16);
// Async task: stream body into channel
tokio::spawn(async move {
let mut body = body;
let mut total_bytes: u64 = 0;
loop {
match body.frame().await {
None => break,
Some(Err(e)) => {
let _ = tx
.send(Err(std::io::Error::other(format!("Body error: {e}"))))
.await;
break;
}
Some(Ok(frame)) => {
if let Ok(data) = frame.into_data() {
total_bytes += data.len() as u64;
if let Some(limit) = max_body_size
&& total_bytes > limit
{
let _ = tx
.send(Err(std::io::Error::other("Payload too large")))
.await;
break;
}
if tx.send(Ok(data.to_vec())).await.is_err() {
break;
}
}
}
}
}
});
let rx = spawn_body_reader(body, max_body_size, LimitBehavior::SendError);
let data_dir = state.data_dir.clone();
let db = state.db.clone();
@@ -1815,3 +1618,144 @@ pub async fn handle_import_items(
Ok(Json(ApiResponse::ok(response_data)))
}
/// Controls behavior when body exceeds `max_body_size`.
enum LimitBehavior {
    /// Set the flag and silently drop remaining data (partial upload is OK).
    /// Used by `handle_post_item`, which checks the flag after saving.
    SetFlag(Arc<AtomicBool>),
    /// Send an explicit IO error to the receiver (reject the payload).
    /// Used by `handle_import_items`, where a partial body must fail the import.
    SendError,
}
/// Spawn an async task that reads body frames into an mpsc channel,
/// enforcing `max_body_size` with the given limit behavior.
///
/// Returns the receiving end of a bounded (capacity 16) channel of data
/// chunks. The spawned task ends — dropping the sender and therefore
/// closing the channel — when the body is exhausted, a frame errors,
/// the size limit is hit, or the receiver is dropped.
fn spawn_body_reader(
    body: Body,
    max_body_size: Option<u64>,
    limit_behavior: LimitBehavior,
) -> tokio::sync::mpsc::Receiver<Result<Vec<u8>, std::io::Error>> {
    let (tx, rx) = mpsc::channel::<Result<Vec<u8>, std::io::Error>>(16);
    tokio::spawn(async move {
        let mut body = body;
        let mut total_bytes: u64 = 0;
        loop {
            match body.frame().await {
                // Body exhausted: exit, dropping tx so the reader sees EOF.
                None => break,
                Some(Err(e)) => {
                    // Forward the body error; ignore send failure (receiver
                    // may already be gone).
                    let _ = tx
                        .send(Err(std::io::Error::other(format!("Body error: {e}"))))
                        .await;
                    break;
                }
                Some(Ok(frame)) => {
                    // Non-data frames fail into_data() and are skipped.
                    if let Ok(data) = frame.into_data() {
                        total_bytes += data.len() as u64;
                        if let Some(limit) = max_body_size
                            && total_bytes > limit
                        {
                            match &limit_behavior {
                                LimitBehavior::SetFlag(flag) => {
                                    // Mark truncation; the consumer decides
                                    // how to handle the partial body.
                                    flag.store(true, Ordering::Relaxed);
                                }
                                LimitBehavior::SendError => {
                                    let _ = tx
                                        .send(Err(std::io::Error::other("Payload too large")))
                                        .await;
                                }
                            }
                            break;
                        }
                        // Receiver dropped: stop reading the body.
                        if tx.send(Ok(data.to_vec())).await.is_err() {
                            break;
                        }
                    }
                }
            }
        }
    });
    rx
}
/// Read a sample of an item's content and check if it's binary.
///
/// Pulls up to `PIPESIZE` bytes from the item's content stream on a
/// blocking task (SQLite + file I/O must stay off the async runtime) and
/// runs the binary-detection heuristic over the sample.
async fn check_binary_content(
    db: &Arc<tokio::sync::Mutex<rusqlite::Connection>>,
    item_service: &Arc<ItemService>,
    item_id: i64,
) -> Result<bool, StatusCode> {
    let db = db.clone();
    let item_service = item_service.clone();
    let join_result = task::spawn_blocking(move || -> Result<bool, CoreError> {
        let conn = db.blocking_lock();
        let (mut reader, _) = item_service.get_item_content_streaming(&conn, item_id)?;
        let mut sample = vec![0u8; crate::common::PIPESIZE];
        let n = reader.read(&mut sample)?;
        sample.truncate(n);
        Ok(crate::common::is_binary::is_binary(&sample))
    })
    .await;
    match join_result {
        Ok(Ok(is_bin)) => Ok(is_bin),
        Ok(Err(e)) => {
            warn!("Failed to check binary status for item {item_id}: {e}");
            Err(StatusCode::INTERNAL_SERVER_ERROR)
        }
        Err(e) => {
            warn!("Blocking task failed for binary check on item {item_id}: {e}");
            Err(StatusCode::INTERNAL_SERVER_ERROR)
        }
    }
}
/// Stream bytes from a reader to an mpsc channel, applying offset skip and length limit.
///
/// `offset` bytes are read and discarded first; then chunks are forwarded
/// until EOF, the receiver hangs up, or `length` bytes have been sent.
/// A `length` of 0 means "no limit". Read errors are forwarded once and
/// terminate the stream.
fn stream_with_offset_and_length(
    mut reader: Box<dyn std::io::Read + Send>,
    tx: tokio::sync::mpsc::Sender<Result<Vec<u8>, std::io::Error>>,
    offset: u64,
    length: u64,
) {
    let mut buf = [0u8; crate::common::PIPESIZE];

    // Skip `offset` bytes by reading into the scratch buffer and discarding.
    let mut to_skip = offset;
    while to_skip > 0 {
        let chunk = to_skip.min(buf.len() as u64) as usize;
        match reader.read(&mut buf[..chunk]) {
            Ok(0) => break, // EOF before the offset was reached
            Ok(n) => to_skip -= n as u64,
            Err(e) => {
                let _ = tx.blocking_send(Err(e));
                return;
            }
        }
    }

    // Forward data, capped at `length` bytes unless length == 0 (unlimited).
    let unlimited = length == 0;
    let mut left = length;
    loop {
        let want = if unlimited {
            buf.len()
        } else {
            left.min(buf.len() as u64) as usize
        };
        if want == 0 {
            break;
        }
        match reader.read(&mut buf[..want]) {
            Ok(0) => break, // EOF
            Ok(n) => {
                if tx.blocking_send(Ok(buf[..n].to_vec())).is_err() {
                    // Receiver dropped; stop streaming.
                    break;
                }
                if !unlimited {
                    left -= n as u64;
                    if left == 0 {
                        break;
                    }
                }
            }
            Err(e) => {
                let _ = tx.blocking_send(Err(e));
                break;
            }
        }
    }
}

View File

@@ -2,6 +2,32 @@ use axum::{extract::State, http::StatusCode, response::Json};
use crate::modes::server::common::{ApiResponse, AppState, StatusInfoResponse};
/// Build the status report shared by the status and plugins endpoints.
///
/// Resolves the database path, then delegates to `StatusService`; any
/// failure is logged and mapped to a 500.
async fn generate_status(
    state: &AppState,
) -> Result<crate::common::status::StatusInfo, StatusCode> {
    // Scope the DB lock so it is released before taking the cmd lock.
    let db_path = {
        let conn = state.db.lock().await;
        conn.path().unwrap_or("unknown").to_string()
    };
    let status_service = crate::services::status_service::StatusService::new();
    let mut cmd = state.cmd.lock().await;
    match status_service.generate_status(
        &mut cmd,
        &state.settings,
        state.data_dir.clone(),
        db_path.into(),
    ) {
        Ok(info) => Ok(info),
        Err(e) => {
            log::warn!("Failed to generate status: {e}");
            Err(StatusCode::INTERNAL_SERVER_ERROR)
        }
    }
}
#[utoipa::path(
get,
path = "/api/status",
@@ -48,29 +74,7 @@ use crate::modes::server::common::{ApiResponse, AppState, StatusInfoResponse};
pub async fn handle_status(
State(state): State<AppState>,
) -> Result<Json<StatusInfoResponse>, StatusCode> {
// Get database path
let db_path = state
.db
.lock()
.await
.path()
.unwrap_or("unknown")
.to_string();
// Use the status service to generate status info showing configured plugins
let status_service = crate::services::status_service::StatusService::new();
let mut cmd = state.cmd.lock().await;
let status_info = status_service
.generate_status(
&mut cmd,
&state.settings,
state.data_dir.clone(),
db_path.into(),
)
.map_err(|e| {
log::warn!("Failed to generate status: {e}");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let status_info = generate_status(&state).await?;
let response = StatusInfoResponse {
success: true,
@@ -107,27 +111,7 @@ pub struct PluginsStatusResponse {
pub async fn handle_plugins_status(
State(state): State<AppState>,
) -> Result<Json<crate::modes::server::common::ApiResponse<PluginsStatusResponse>>, StatusCode> {
let db_path = state
.db
.lock()
.await
.path()
.unwrap_or("unknown")
.to_string();
let status_service = crate::services::status_service::StatusService::new();
let mut cmd = state.cmd.lock().await;
let status_info = status_service
.generate_status(
&mut cmd,
&state.settings,
state.data_dir.clone(),
db_path.into(),
)
.map_err(|e| {
log::warn!("Failed to generate status: {e}");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let status_info = generate_status(&state).await?;
let response_data = PluginsStatusResponse {
meta_plugins: status_info.meta_plugins,

View File

@@ -7,27 +7,6 @@ use std::str::FromStr;
pub struct CompressionService;
/// Service for handling compression and decompression of item content.
///
/// Provides methods to read compressed item files either fully into memory
/// or as streaming readers. Supports various compression types via engines.
/// This service abstracts the underlying compression engines for consistent access.
///
/// # Examples
///
/// ```ignore
/// let service = CompressionService::new();
/// let content = service.get_item_content(path, "gzip")?;
/// ```
/// Provides methods to read compressed item files either fully into memory
/// or as streaming readers. Supports various compression types via engines.
///
/// # Examples
///
/// ```ignore
/// let service = CompressionService::new();
/// let content = service.get_item_content(path, "gzip")?;
/// ```
impl CompressionService {
/// Creates a new CompressionService instance.
///