feat: add native zstd compression plugin and deduplicate shared compression/meta utilities

- Add zstd crate (v0.13) with native Rust compression engine (level 3)
- Gate behind 'zstd' feature flag, fall back to program-based when disabled
- Extract CompressionService::decompressing_reader/compressing_writer with zstd support
- Extract MetaService::with_collector() to eliminate Arc<Mutex<Vec>> boilerplate
- Extract read_with_bounds() helper for skip+read pattern
- Add input validation for mutually exclusive --id and --tags flags
- Add zstd round-trip tests
This commit is contained in:
2026-03-16 20:03:30 -03:00
parent 35ee71c3cf
commit a90c19efc1
19 changed files with 310 additions and 159 deletions

29
Cargo.lock generated
View File

@@ -1732,6 +1732,7 @@ dependencies = [
"uzers",
"which",
"xdg",
"zstd",
]
[[package]]
@@ -4048,3 +4049,31 @@ dependencies = [
"log",
"simd-adler32",
]
[[package]]
name = "zstd"
version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a"
dependencies = [
"zstd-safe",
]
[[package]]
name = "zstd-safe"
version = "7.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d"
dependencies = [
"zstd-sys",
]
[[package]]
name = "zstd-sys"
version = "2.0.16+zstd.1.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91e19ebc2adc8f83e43039e79776e3fda8ca919132d68a1fed6a5faca2683748"
dependencies = [
"cc",
"pkg-config",
]

View File

@@ -39,6 +39,7 @@ libc = "0.2"
local-ip-address = "0.6"
log = "0.4"
lz4_flex = { version = "0.12", optional = true }
zstd = { version = "0.13", optional = true }
magic = { version = "0.13", optional = true }
nix = { version = "0.30", features = ["fs", "process"] }
once_cell = "1.21"
@@ -96,7 +97,7 @@ gzip = ["flate2"]
lz4 = ["lz4_flex"]
bzip2 = []
xz = []
zstd = []
zstd = ["dep:zstd"]
# Plugin features (meta and filter)
all-meta-plugins = ["dep:magic"]

View File

@@ -26,3 +26,59 @@ pub fn stream_copy<R: std::io::Read + ?Sized>(
}
Ok(())
}
/// Reads content from a reader with offset and length bounds.
///
/// Skips `offset` bytes from the reader, then reads up to `length` bytes
/// (or all remaining if `length` is 0).
///
/// # Arguments
///
/// * `reader` - The source reader positioned at the start.
/// * `offset` - Number of bytes to skip before reading.
/// * `length` - Maximum bytes to read (0 = read all remaining).
/// * `content_len` - Total content size (used to cap skip/read amounts).
///
/// # Errors
///
/// Propagates any I/O error from the underlying reader.
///
/// # Returns
///
/// A `Vec<u8>` containing the requested byte range. If the reader yields
/// fewer bytes than `content_len` promises, the result is simply shorter.
pub fn read_with_bounds<R: std::io::Read>(
    reader: &mut R,
    offset: u64,
    length: u64,
    content_len: u64,
) -> std::io::Result<Vec<u8>> {
    use std::io::Read;
    // Skip up to `offset` bytes (capped at content_len) by draining a
    // limited view of the reader into a sink; stops early at EOF.
    let skip = std::cmp::min(offset, content_len);
    std::io::copy(&mut reader.by_ref().take(skip), &mut std::io::sink())?;
    // Bound the read: an explicit `length` is capped at what remains after
    // the skip; `length == 0` means "everything remaining".
    let remaining = content_len.saturating_sub(offset);
    let max_bytes = if length > 0 {
        std::cmp::min(length, remaining)
    } else {
        remaining
    };
    // Cap the pre-allocation at 64 KiB so a huge `length` cannot trigger a
    // huge speculative allocation before any bytes arrive.
    let mut result = Vec::with_capacity(std::cmp::min(max_bytes, 64 * 1024) as usize);
    reader.by_ref().take(max_bytes).read_to_end(&mut result)?;
    Ok(result)
}

View File

@@ -11,12 +11,12 @@ use std::io::{Read, Write};
#[cfg(feature = "gzip")]
use std::path::PathBuf;
#[cfg(feature = "gzip")]
use flate2::Compression;
#[cfg(feature = "gzip")]
use flate2::read::GzDecoder;
#[cfg(feature = "gzip")]
use flate2::write::GzEncoder;
#[cfg(feature = "gzip")]
use flate2::Compression;
#[cfg(feature = "gzip")]
use crate::compression_engine::CompressionEngine;

View File

@@ -1,4 +1,4 @@
use anyhow::{Result, anyhow};
use anyhow::{anyhow, Result};
use std::io;
use std::io::{Read, Write};
use std::path::PathBuf;
@@ -17,6 +17,7 @@ pub mod gzip;
pub mod lz4;
pub mod none;
pub mod program;
pub mod zstd;
use crate::compression_engine::program::CompressionEngineProgram;
@@ -225,6 +226,13 @@ lazy_static! {
as Box<dyn CompressionEngine>;
}
#[cfg(feature = "zstd")]
{
em[CompressionType::ZStd] =
Box::new(crate::compression_engine::zstd::CompressionEngineZstd::new())
as Box<dyn CompressionEngine>;
}
em
};
}

View File

@@ -1,4 +1,4 @@
use anyhow::{Context, Result, anyhow};
use anyhow::{anyhow, Context, Result};
use log::*;
use std::fs::File;
use std::io::{Read, Write};

View File

@@ -0,0 +1,54 @@
#[cfg(feature = "zstd")]
use anyhow::Result;
#[cfg(feature = "zstd")]
use log::*;
#[cfg(feature = "zstd")]
use std::io::Write;
#[cfg(feature = "zstd")]
use std::fs::File;
#[cfg(feature = "zstd")]
use std::io::Read;
#[cfg(feature = "zstd")]
use std::path::PathBuf;
#[cfg(feature = "zstd")]
use zstd::stream::read::Decoder;
#[cfg(feature = "zstd")]
use zstd::stream::write::Encoder;
#[cfg(feature = "zstd")]
use crate::compression_engine::CompressionEngine;
#[cfg(feature = "zstd")]
#[derive(Debug, Eq, PartialEq, Clone, Default)]
pub struct CompressionEngineZstd {}

#[cfg(feature = "zstd")]
impl CompressionEngineZstd {
    /// Constructs a new (stateless) zstd compression engine.
    pub fn new() -> CompressionEngineZstd {
        CompressionEngineZstd::default()
    }
}

#[cfg(feature = "zstd")]
impl CompressionEngine for CompressionEngineZstd {
    /// Opens `file_path` and wraps it in a streaming zstd decoder.
    fn open(&self, file_path: PathBuf) -> Result<Box<dyn Read + Send>> {
        debug!("COMPRESSION: Opening {:?} using {:?}", file_path, *self);
        let decoder = Decoder::new(File::open(file_path)?)?;
        Ok(Box::new(decoder))
    }

    /// Creates `file_path` and wraps it in a level-3 zstd encoder whose
    /// frame is finished automatically when the writer is dropped.
    fn create(&self, file_path: PathBuf) -> Result<Box<dyn Write>> {
        debug!("COMPRESSION: Writing to {:?} using {:?}", file_path, *self);
        let sink = File::create(file_path)?;
        Ok(Box::new(Encoder::new(sink, 3)?.auto_finish()))
    }

    fn clone_box(&self) -> Box<dyn CompressionEngine> {
        Box::new(self.clone())
    }
}

View File

@@ -1,4 +1,4 @@
use anyhow::{anyhow, Context, Result};
use anyhow::{Context, Result, anyhow};
use chrono::Utc;
use clap::Command;
use log::debug;
@@ -8,7 +8,7 @@ use std::io::{Read, Write};
use crate::client::KeepClient;
use crate::config;
use crate::modes::common::{resolve_item_id, sanitize_tags, ExportMeta};
use crate::modes::common::{ExportMeta, resolve_item_id, sanitize_tags};
/// Export an item to data and metadata files via client.
///
@@ -21,7 +21,13 @@ pub fn mode(
ids: &[i64],
tags: &[String],
) -> Result<()> {
if ids.len() > 1 {
if !ids.is_empty() && !tags.is_empty() {
cmd.error(
clap::error::ErrorKind::InvalidValue,
"Both ID and tags given, you must supply either IDs or tags when using --export",
)
.exit();
} else if ids.len() > 1 {
cmd.error(
clap::error::ErrorKind::InvalidValue,
"More than one ID given, you must supply exactly one ID when using --export",

View File

@@ -2,6 +2,7 @@ use crate::client::KeepClient;
use crate::compression_engine::CompressionType;
use crate::filter_plugin::FilterChain;
use crate::modes::common::{check_binary_tty, resolve_item_id};
use crate::services::compression_service::CompressionService;
use anyhow::Result;
use clap::Command;
use log::debug;
@@ -10,7 +11,7 @@ use std::str::FromStr;
pub fn mode(
client: &KeepClient,
_cmd: &mut Command,
cmd: &mut Command,
settings: &crate::config::Settings,
ids: &[i64],
tags: &[String],
@@ -18,6 +19,14 @@ pub fn mode(
) -> Result<(), anyhow::Error> {
debug!("CLIENT_GET: Getting item via remote server");
if !ids.is_empty() && !tags.is_empty() {
cmd.error(
clap::error::ErrorKind::InvalidValue,
"Both ID and tags given, you must supply either IDs or tags when using --get",
)
.exit();
}
let item_id = resolve_item_id(client, ids, tags)?;
// Get item info for metadata
@@ -29,17 +38,8 @@ pub fn mode(
let compression_type = CompressionType::from_str(&compression).unwrap_or(CompressionType::None);
// Decompress through streaming readers
let mut decompressed_reader: Box<dyn Read> = match compression_type {
CompressionType::GZip => {
use flate2::read::GzDecoder;
Box::new(GzDecoder::new(reader))
}
CompressionType::LZ4 => {
use lz4_flex::frame::FrameDecoder;
Box::new(FrameDecoder::new(reader))
}
_ => reader,
};
let mut decompressed_reader: Box<dyn Read> =
CompressionService::decompressing_reader(reader, &compression_type);
// Binary detection: sample first chunk
let mut sample_buf = [0u8; crate::common::PIPESIZE];

View File

@@ -3,6 +3,7 @@ use crate::compression_engine::CompressionType;
use crate::config::Settings;
use crate::meta_plugin::SaveMetaFn;
use crate::modes::common::settings_compression_type;
use crate::services::compression_service::CompressionService;
use crate::services::meta_service::MetaService;
use anyhow::Result;
use clap::Command;
@@ -73,15 +74,8 @@ pub fn mode(
meta_service.initialize_plugins(&mut plugins);
// Wrap pipe writer with appropriate compression
let mut compressor: Box<dyn Write> = match compression_type_clone {
CompressionType::GZip => {
use flate2::Compression;
use flate2::write::GzEncoder;
Box::new(GzEncoder::new(pipe_writer, Compression::default()))
}
CompressionType::LZ4 => Box::new(lz4_flex::frame::FrameEncoder::new(pipe_writer)),
_ => Box::new(pipe_writer),
};
let mut compressor: Box<dyn Write> =
CompressionService::compressing_writer(Box::new(pipe_writer), &compression_type_clone);
loop {
let n = stdin_lock.read(&mut buffer)?;

View File

@@ -1,4 +1,4 @@
use anyhow::{anyhow, Context, Result};
use anyhow::{Context, Result, anyhow};
use chrono::{DateTime, Utc};
use clap::Command;
use log::debug;
@@ -9,7 +9,7 @@ use std::path::PathBuf;
use crate::config;
use crate::filter_plugin::FilterChain;
use crate::modes::common::{sanitize_tags, ExportMeta};
use crate::modes::common::{ExportMeta, sanitize_tags};
use crate::services::item_service::ItemService;
/// Export an item to data and metadata files.

View File

@@ -276,42 +276,8 @@ async fn handle_as_meta_response_with_metadata(
item_service.get_item_content_streaming(&conn, item_id)?;
let content_len = item_with_meta.item.size.unwrap_or(0) as u64;
// Sample is already consumed by the first task; this is a fresh reader.
// Skip to offset using PIPESIZE buffer
if offset > 0 {
let skip = std::cmp::min(offset, content_len);
let mut skipped = 0u64;
let mut buf = vec![0u8; crate::common::PIPESIZE];
while skipped < skip {
let to_read = std::cmp::min(skip - skipped, buf.len() as u64) as usize;
match reader.read(&mut buf[..to_read]) {
Ok(0) => break,
Ok(n) => skipped += n as u64,
Err(e) => return Err(CoreError::Io(e)),
}
}
}
// Read up to length bytes (or all if length == 0)
let max_bytes = if length > 0 {
std::cmp::min(length, content_len.saturating_sub(offset))
} else {
content_len.saturating_sub(offset)
};
let mut result = Vec::with_capacity(std::cmp::min(max_bytes, 64 * 1024) as usize);
let mut buf = vec![0u8; crate::common::PIPESIZE];
let mut bytes_read = 0u64;
while bytes_read < max_bytes {
let to_read = std::cmp::min(max_bytes - bytes_read, buf.len() as u64) as usize;
match reader.read(&mut buf[..to_read]) {
Ok(0) => break,
Ok(n) => {
result.extend_from_slice(&buf[..n]);
bytes_read += n as u64;
}
Err(e) => return Err(CoreError::Io(e)),
}
}
let result = crate::common::read_with_bounds(&mut reader, offset, length, content_len)
.map_err(CoreError::Io)?;
String::from_utf8(result)
.map_err(|_| CoreError::InvalidInput("Content is not valid UTF-8".to_string()))
@@ -1091,53 +1057,11 @@ async fn stream_item_content_response_with_metadata(
let content_len = content_len_result as u64;
// Calculate offset and length bounds
let start = std::cmp::min(offset, content_len);
let end = if length > 0 {
std::cmp::min(start + length, content_len)
} else {
content_len
};
let response_len = end - start;
// Read content with offset and length using fixed-size buffer
let content = tokio::task::spawn_blocking(move || {
let mut reader = reader;
let mut buf = [0u8; crate::common::PIPESIZE];
let mut result = Vec::new();
let mut bytes_read = 0u64;
// Skip offset bytes
if offset > 0 {
let mut remaining = offset;
while remaining > 0 {
let to_read = std::cmp::min(remaining, buf.len() as u64) as usize;
match reader.read(&mut buf[..to_read]) {
Ok(0) => break,
Ok(n) => remaining -= n as u64,
Err(e) => return Err(CoreError::Io(e)),
}
}
}
// Read up to length bytes
let mut remaining = if length > 0 { length } else { u64::MAX };
while remaining > 0 && bytes_read < response_len {
let to_read = std::cmp::min(remaining, buf.len() as u64) as usize;
match reader.read(&mut buf[..to_read]) {
Ok(0) => break,
Ok(n) => {
result.extend_from_slice(&buf[..n]);
bytes_read += n as u64;
if length > 0 {
remaining -= n as u64;
}
}
Err(e) => return Err(CoreError::Io(e)),
}
}
Ok::<Vec<u8>, CoreError>(result)
let mut r = reader;
crate::common::read_with_bounds(&mut r, offset, length, content_len)
.map_err(CoreError::Io)
})
.await
.map_err(|e| {

View File

@@ -10,7 +10,6 @@ use crate::services::meta_service::MetaService;
use clap::Command;
use log::debug;
use rusqlite::Connection;
use std::sync::{Arc, Mutex};
/// Handles the update mode: modifies tags and metadata for an existing item by ID.
///
@@ -197,16 +196,7 @@ fn run_meta_plugins_on_item(
}
// Collect metadata in memory
let collected_meta: Arc<Mutex<Vec<(String, String)>>> = Arc::new(Mutex::new(Vec::new()));
let collector = collected_meta.clone();
let save_meta: crate::meta_plugin::SaveMetaFn =
Arc::new(Mutex::new(move |name: &str, value: &str| {
if let Ok(mut v) = collector.lock() {
v.push((name.to_string(), value.to_string()));
}
}));
let meta_service = MetaService::new(save_meta);
let (meta_service, collected_meta) = MetaService::with_collector();
let mut plugins = meta_service.get_plugins(cmd, settings);
if plugins.is_empty() {

View File

@@ -1,7 +1,7 @@
use crate::compression_engine::{CompressionType, get_compression_engine};
use crate::compression_engine::{get_compression_engine, CompressionType};
use crate::services::error::CoreError;
use anyhow::anyhow;
use std::io::Read;
use std::io::{Read, Write};
use std::path::PathBuf;
use std::str::FromStr;
@@ -132,6 +132,78 @@ impl CompressionService {
})?;
Ok(reader)
}
/// Creates a decompressing reader wrapping the given reader.
///
/// Returns a boxed reader that decompresses on the fly based on the
/// compression type. Useful for decompressing network streams or other
/// non-file sources.
///
/// # Arguments
///
/// * `reader` - The underlying compressed reader.
/// * `compression` - The [`CompressionType`] variant to decode (GZip, LZ4,
///   or ZStd when the `zstd` feature is enabled).
///
/// # Returns
///
/// A boxed decompressing reader. Variants without a matching decoder
/// (including `None`) return the reader unchanged.
///
/// # Panics
///
/// Panics if the zstd decoder cannot be constructed (the `.expect()` below).
pub fn decompressing_reader(
    reader: Box<dyn Read>,
    compression: &CompressionType,
) -> Box<dyn Read> {
    match compression {
        CompressionType::GZip => {
            use flate2::read::GzDecoder;
            Box::new(GzDecoder::new(reader))
        }
        CompressionType::LZ4 => {
            use lz4_flex::frame::FrameDecoder;
            Box::new(FrameDecoder::new(reader))
        }
        // Compiled only with the `zstd` feature; otherwise ZStd falls
        // through to the pass-through arm below.
        #[cfg(feature = "zstd")]
        CompressionType::ZStd => {
            use zstd::stream::read::Decoder;
            Box::new(Decoder::new(reader).expect("Failed to create zstd decoder"))
        }
        _ => reader,
    }
}
/// Creates a compressing writer wrapping the given writer.
///
/// Returns a boxed writer that compresses on the fly based on the
/// compression type. Useful for compressing data to network streams or
/// pipes.
///
/// # Arguments
///
/// * `writer` - The underlying destination writer.
/// * `compression` - The [`CompressionType`] variant to encode with (GZip,
///   LZ4, or ZStd when the `zstd` feature is enabled).
///
/// # Returns
///
/// A boxed compressing writer. Variants without a matching encoder
/// (including `None`) return the writer unchanged.
///
/// # Panics
///
/// Panics if the zstd encoder cannot be constructed (the `.expect()` below).
///
/// NOTE(review): the zstd writer uses `auto_finish()`, so the final frame
/// is emitted on drop — errors at that point are not surfaced to the
/// caller; confirm against the zstd crate's `AutoFinishEncoder` docs.
pub fn compressing_writer(
    writer: Box<dyn Write>,
    compression: &CompressionType,
) -> Box<dyn Write> {
    match compression {
        CompressionType::GZip => {
            use flate2::write::GzEncoder;
            use flate2::Compression;
            Box::new(GzEncoder::new(writer, Compression::default()))
        }
        CompressionType::LZ4 => Box::new(lz4_flex::frame::FrameEncoder::new(writer)),
        // Level 3: same level the native CompressionEngineZstd uses.
        #[cfg(feature = "zstd")]
        CompressionType::ZStd => {
            use zstd::stream::write::Encoder;
            Box::new(
                Encoder::new(writer, 3)
                    .expect("Failed to create zstd encoder")
                    .auto_finish(),
            )
        }
        _ => writer,
    }
}
}
impl Default for CompressionService {

View File

@@ -656,17 +656,7 @@ impl ItemService {
// Collect metadata from plugins into a Vec, then write to DB after plugins finish.
// This avoids capturing &Connection in the save_meta closure (which would need unsafe
// and wouldn't be Send for parallel plugins).
let collected_meta: std::sync::Arc<std::sync::Mutex<Vec<(String, String)>>> =
std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
let collector = collected_meta.clone();
let save_meta: crate::meta_plugin::SaveMetaFn =
std::sync::Arc::new(std::sync::Mutex::new(move |name: &str, value: &str| {
if let Ok(mut v) = collector.lock() {
v.push((name.to_string(), value.to_string()));
}
}));
let meta_service = MetaService::new(save_meta);
let (meta_service, collected_meta) = MetaService::with_collector();
let mut plugins = meta_service.get_plugins(cmd, settings);
debug!("ITEM_SERVICE: Got {} meta plugins", plugins.len());
meta_service.initialize_plugins(&mut plugins);
@@ -852,17 +842,7 @@ impl ItemService {
db::set_item_tags(conn, item.clone(), &tags)?;
}
let collected_meta: std::sync::Arc<std::sync::Mutex<Vec<(String, String)>>> =
std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
let collector = collected_meta.clone();
let save_meta: crate::meta_plugin::SaveMetaFn =
std::sync::Arc::new(std::sync::Mutex::new(move |name: &str, value: &str| {
if let Ok(mut v) = collector.lock() {
v.push((name.to_string(), value.to_string()));
}
}));
let meta_service = MetaService::new(save_meta);
let (meta_service, collected_meta) = MetaService::with_collector();
let mut plugins = if run_meta {
meta_service.get_plugins(&mut cmd, settings)
} else {
@@ -926,17 +906,7 @@ impl ItemService {
) -> Result<ItemWithMeta, CoreError> {
let item = db::get_item(conn, item_id)?.ok_or_else(|| CoreError::ItemNotFound(item_id))?;
let collected_meta: std::sync::Arc<std::sync::Mutex<Vec<(String, String)>>> =
std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
let collector = collected_meta.clone();
let save_meta: crate::meta_plugin::SaveMetaFn =
std::sync::Arc::new(std::sync::Mutex::new(move |name: &str, value: &str| {
if let Ok(mut v) = collector.lock() {
v.push((name.to_string(), value.to_string()));
}
}));
let meta_service = MetaService::new(save_meta);
let (meta_service, collected_meta) = MetaService::with_collector();
let mut cmd = Command::new("keep");
let all_plugins = meta_service.get_plugins(&mut cmd, settings);

View File

@@ -4,6 +4,10 @@ use crate::modes::common::settings_meta_plugin_types;
use clap::Command;
use log::{debug, error, warn};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
/// Shared collector for metadata entries from plugins.
pub type MetaCollector = Arc<Mutex<Vec<(String, String)>>>;
pub struct MetaService {
save_meta: SaveMetaFn,
@@ -47,6 +51,27 @@ impl MetaService {
Self { save_meta }
}
/// Creates a MetaService with a built-in Vec collector.
///
/// Returns both the service and the shared collector. Plugins write
/// metadata entries into the collector via the internal save_meta
/// callback, so callers no longer hand-build the Arc<Mutex<Vec<...>>>.
///
/// # Returns
///
/// A tuple of (MetaService, MetaCollector) where the collector
/// accumulates (name, value) pairs from plugin responses.
pub fn with_collector() -> (Self, MetaCollector) {
    let entries: MetaCollector = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&entries);
    let save_meta: SaveMetaFn = Arc::new(Mutex::new(move |name: &str, value: &str| {
        // A poisoned lock silently drops the entry; plugins keep running.
        if let Ok(mut list) = sink.lock() {
            list.push((name.to_owned(), value.to_owned()));
        }
    }));
    (Self::new(save_meta), entries)
}
pub fn get_plugins(&self, cmd: &mut Command, settings: &Settings) -> Vec<Box<dyn MetaPlugin>> {
debug!("META_SERVICE: get_plugins called");
let meta_plugin_types: Vec<MetaPluginType> = settings_meta_plugin_types(cmd, settings);

View File

@@ -1,7 +1,7 @@
#[cfg(test)]
mod tests {
use crate::compression_engine::CompressionEngine;
use crate::compression_engine::gzip::CompressionEngineGZip;
use crate::compression_engine::CompressionEngine;
use crate::tests::common::test_helpers::test_compression_engine;
#[test]

View File

@@ -4,3 +4,5 @@ pub mod gzip_tests;
pub mod lz4_tests;
#[cfg(test)]
pub mod none_tests;
#[cfg(test)]
pub mod zstd_tests;

View File

@@ -0,0 +1,20 @@
#[cfg(test)]
#[cfg(feature = "zstd")]
mod tests {
    use crate::compression_engine::zstd::CompressionEngineZstd;
    use crate::tests::common::test_helpers::test_compression_engine;

    /// Round-trips a short ASCII payload through the zstd engine.
    #[test]
    fn test_compression_engine_zstd() {
        let engine = CompressionEngineZstd::new();
        test_compression_engine(&engine, b"test compression data");
    }

    /// Round-trips 10 KB of cycling byte values through the zstd engine.
    #[test]
    fn test_compression_engine_zstd_large_data() {
        let engine = CompressionEngineZstd::new();
        let payload: Vec<u8> = (0..10000).map(|i| (i % 256) as u8).collect();
        test_compression_engine(&engine, &payload);
    }
}