feat: add export/import modes, unify service layer, fix binary detection

Export/import:
- Add --export and --import modes for both local and client paths
- Use strfmt crate for --export-filename-format templates ({id}, {tags}, {ts}, {compression}); a rendering sketch follows this list
- Import preserves original timestamps via server ?ts= param
- --import-data-file for file-based import; local stdin fallback streams with PIPESIZE buffers, client stdin is buffered before posting since the request may need to retry
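
A minimal sketch of the template rendering step, assuming only the strfmt crate and the default {id}_{tags}_{ts} format; the item values below are made up for illustration, the real export code fills them in from the stored item:

```rust
use std::collections::HashMap;

fn main() -> Result<(), strfmt::FmtError> {
    // Hypothetical item fields; export builds these from item info and tags.
    let mut vars = HashMap::new();
    vars.insert("id".to_string(), "42".to_string());
    vars.insert("tags".to_string(), "backup_nightly".to_string());
    vars.insert("ts".to_string(), "2026-03-16T08:43:26Z".to_string()); // ISO 8601
    vars.insert("compression".to_string(), "lz4".to_string());

    // Render the default --export-filename-format template.
    let basename = strfmt::strfmt("{id}_{tags}_{ts}", &vars)?;

    // Export then writes <basename>.data.<compression> and <basename>.meta.yml.
    println!("{basename}.data.lz4 {basename}.meta.yml");
    Ok(())
}
```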

Service unification:
- Merge SyncDataService unique methods into ItemService (delete_item now returns Result<Item>)
- Delete AsyncDataService, AsyncItemService, DataService trait (dead code / async-blocking anti-pattern)
- All server handlers use spawn_blocking + ItemService directly (pattern sketched after this list)
- Extract shared types (ExportMeta, ImportMeta) and helpers (resolve_item_id(s), check_binary_tty)
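
A rough sketch of that handler pattern, with stand-in types so it compiles on its own; the real handlers use rusqlite::Connection and this crate's ItemService:

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

// Stand-ins for the crate's real types, only here to keep the sketch self-contained.
struct Conn;          // plays the role of rusqlite::Connection
struct ItemService;
struct Item { id: i64 }

impl ItemService {
    // Mirrors the new signature: delete_item returns the deleted Item.
    fn delete_item(&self, _conn: &mut Conn, id: i64) -> Result<Item, String> {
        Ok(Item { id })
    }
}

#[tokio::main]
async fn main() {
    let db = Arc::new(Mutex::new(Conn));
    let service = Arc::new(ItemService);

    // Handler body pattern: hop onto the blocking pool, take the connection with
    // blocking_lock() (never lock().await inside spawn_blocking), call ItemService.
    let deleted = tokio::task::spawn_blocking({
        let db = db.clone();
        let service = service.clone();
        move || {
            let mut conn = db.blocking_lock();
            service.delete_item(&mut conn, 7)
        }
    })
    .await
    .expect("blocking task panicked");

    match deleted {
        Ok(item) => println!("deleted item {}", item.id),
        Err(e) => eprintln!("delete failed: {e}"),
    }
}
```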

Binary detection fix:
- Replace broken metadata.get("map") + is_binary(&[]) with actual content sampling (see the sketch after this list)
- Both as_meta and allow_binary paths read PIPESIZE sample before deciding
- Never load entire item into memory for binary check
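
A simplified, standalone version of that check; the PIPESIZE value and the NUL-byte heuristic are assumptions here, the crate's own constant and is_binary helper may differ:

```rust
use std::io::Read;

const PIPESIZE: usize = 64 * 1024; // assumed sample size; the crate defines its own constant

// Crude heuristic: a NUL byte in the sample means "binary".
// The crate's real is_binary helper may use a different test.
fn is_binary(sample: &[u8]) -> bool {
    sample.contains(&0)
}

// Read at most one PIPESIZE sample and decide from actual bytes,
// without loading the whole item and without passing an empty slice.
fn looks_binary(reader: &mut impl Read) -> std::io::Result<(bool, Vec<u8>)> {
    let mut buf = vec![0u8; PIPESIZE];
    let mut filled = 0;
    while filled < buf.len() {
        let n = reader.read(&mut buf[filled..])?;
        if n == 0 {
            break; // EOF before a full sample
        }
        filled += n;
    }
    buf.truncate(filled);
    // Return the sample so the caller can still emit it before streaming the rest.
    Ok((is_binary(&buf), buf))
}

fn main() -> std::io::Result<()> {
    let mut text = &b"plain text item"[..];
    let mut blob = &b"\x00\x01\x02binary"[..];
    assert!(!looks_binary(&mut text)?.0);
    assert!(looks_binary(&mut blob)?.0);
    Ok(())
}
```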

Other fixes:
- Fix lock consistency: all handlers use blocking_lock() in spawn_blocking (no mixed lock().await)
- Use ISO 8601 format for {ts} in export filenames
- Fix resolve_item_ids returning only 1 item for tag lookups
- Fix client get.rs triple-buffering and export.rs whole-file buffering
- Add KeepClient::get_item_content_stream() for streaming reads (usage sketched after this list)
- Pass all clippy --features server lints (Path vs PathBuf, &mut conn, etc.)
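
A sketch of how a caller might consume the (reader, compression) pair from get_item_content_stream(), decoding on the fly instead of buffering the body; it assumes the flate2 and lz4_flex crates already used by the project, the compression strings "gzip"/"lz4"/"none", and an 8 KiB chunk size, while the real client dispatches via CompressionType::from_str:

```rust
use std::io::{self, Read, Write};

// Illustrative helper, not the crate's stream_copy: wrap the HTTP body reader in a
// streaming decoder and copy it out in fixed-size chunks.
fn stream_decoded(
    reader: Box<dyn Read>,
    compression: &str,
    out: &mut impl Write,
) -> io::Result<u64> {
    let mut decoded: Box<dyn Read> = match compression {
        "gzip" => Box::new(flate2::read::GzDecoder::new(reader)),
        "lz4" => Box::new(lz4_flex::frame::FrameDecoder::new(reader)),
        _ => reader, // "none" or unknown: pass bytes through untouched
    };
    let mut buf = [0u8; 8192];
    let mut total = 0u64;
    loop {
        let n = decoded.read(&mut buf)?;
        if n == 0 {
            break;
        }
        out.write_all(&buf[..n])?;
        total += n as u64;
    }
    Ok(total)
}

fn main() -> io::Result<()> {
    // Stand-in for client.get_item_content_stream(id): an uncompressed in-memory body.
    let body: Box<dyn Read> = Box::new(&b"item content"[..]);
    let mut out = Vec::new();
    let copied = stream_decoded(body, "none", &mut out)?;
    assert_eq!(copied, out.len() as u64);
    Ok(())
}
```
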
2026-03-16 08:43:26 -03:00
parent 0a3d61a875
commit 35ee71c3cf
25 changed files with 1618 additions and 1700 deletions

Cargo.lock generated
View File

@@ -1713,6 +1713,7 @@ dependencies = [
"sha2 0.10.9",
"similar",
"smart-default",
"strfmt",
"strip-ansi-escapes",
"strum",
"subtle",
@@ -2812,6 +2813,12 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596"
[[package]]
name = "strfmt"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29fdc163db75f7b5ffa3daf0c5a7136fb0d4b2f35523cd1769da05e034159feb"
[[package]]
name = "strip-ansi-escapes"
version = "0.2.1"

View File

@@ -55,6 +55,7 @@ sha2 = "0.10"
md5 = "0.7"
subtle = "2.6"
env_logger = "0.11"
strfmt = "0.2"
strum = { version = "0.27", features = ["derive"] }
term = "1.2"
tokio = { version = "1.0", features = ["full"] }

View File

@@ -24,56 +24,64 @@ pub struct Args {
/// Struct for mode-specific arguments, defining CLI flags for different operations.
#[derive(Parser, Debug, Clone)]
pub struct ModeArgs {
#[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["get", "diff", "list", "delete", "info", "update", "status"]))]
#[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["get", "diff", "list", "delete", "info", "update", "status", "export", "import"]))]
#[arg(help("Save an item using any tags or metadata provided"))]
pub save: bool,
#[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["save", "diff", "list", "delete", "info", "update", "status"]))]
#[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["save", "diff", "list", "delete", "info", "update", "status", "export", "import"]))]
#[arg(help(
"Get an item either by it's ID or by a combination of matching tags and metatdata"
))]
pub get: bool,
#[arg(group("mode"), help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "list", "delete", "info", "update", "status"]))]
#[arg(group("mode"), help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "list", "delete", "info", "update", "status", "export", "import"]))]
#[arg(help("Show a diff between two items by ID"))]
pub diff: bool,
#[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["save", "get", "diff", "delete", "info", "update", "status"]))]
#[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["save", "get", "diff", "delete", "info", "update", "status", "export", "import"]))]
#[arg(help("List items, filtering on tags or metadata if given"))]
pub list: bool,
#[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["save", "get", "diff", "list", "info", "update", "status"]))]
#[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["save", "get", "diff", "list", "info", "update", "status", "export", "import"]))]
#[arg(help("Delete items either by ID or by matching tags"))]
#[arg(requires = "ids_or_tags")]
pub delete: bool,
#[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["save", "get", "diff", "list", "delete", "update", "status"]))]
#[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["save", "get", "diff", "list", "delete", "update", "status", "export", "import"]))]
#[arg(help(
"Get an item either by it's ID or by a combination of matching tags and metatdata"
))]
pub info: bool,
#[arg(group("mode"), help_heading("Mode Options"), short('u'), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "status"]))]
#[arg(group("mode"), help_heading("Mode Options"), short('u'), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "status", "export", "import"]))]
#[arg(help("Update an item's tags and metadata by ID"))]
pub update: bool,
#[arg(group("mode"), help_heading("Mode Options"), short('S'), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "server", "status_plugins"]))]
#[arg(group("mode"), help_heading("Mode Options"), short('S'), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "server", "status_plugins", "export", "import"]))]
#[arg(help("Show status of directories and supported compression algorithms"))]
pub status: bool,
#[arg(group("mode"), help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "status", "server"]))]
#[arg(group("mode"), help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "status", "server", "export", "import"]))]
#[arg(help("Show available plugins and their configurations"))]
pub status_plugins: bool,
#[arg(group("mode"), help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "status", "import"]))]
#[arg(help("Export an item to data and metadata files (default: latest item)"))]
pub export: bool,
#[arg(group("mode"), help_heading("Mode Options"), long, value_name("META_FILE"), conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "status", "export"]))]
#[arg(help("Import an item from a metadata file (data from --import-data-file or stdin)"))]
pub import: Option<String>,
#[arg(group("mode"), help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "status"]))]
#[arg(help("Start REST HTTP server"))]
pub server: bool,
#[arg(group("mode"), help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "status", "server"]))]
#[arg(group("mode"), help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "status", "server", "export", "import"]))]
#[arg(help("Generate default configuration and output to stdout"))]
pub generate_config: bool,
#[arg(help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "status", "server", "generate_config"]))]
#[arg(help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "status", "server", "generate_config", "export", "import"]))]
#[arg(help("Generate shell completion script (bash, zsh, fish, elvish, powershell)"))]
pub generate_completion: Option<Shell>,
@@ -192,6 +200,18 @@ pub struct ItemArgs {
#[arg(help_heading("Item Options"), long, env("KEEP_FILTERS"))]
#[arg(help("Filter string to apply to content when getting items"))]
pub filters: Option<String>,
#[arg(
help_heading("Export Options"),
long,
default_value = "{id}_{tags}_{ts}"
)]
#[arg(help("Template for export filename. Variables: {id} {tags} {ts} {compression}"))]
pub export_filename_format: String,
#[arg(help_heading("Import Options"), long, value_name("DATA_FILE"))]
#[arg(help("Data file for import (reads from stdin if omitted)"))]
pub import_data_file: Option<PathBuf>,
}
/// Struct for general options, including verbosity, paths, and output settings.

View File

@@ -357,6 +357,20 @@ impl KeepClient {
}
pub fn get_item_content_raw(&self, id: i64) -> Result<(Vec<u8>, String), CoreError> {
let (mut reader, compression) = self.get_item_content_stream(id)?;
let mut bytes = Vec::new();
reader
.read_to_end(&mut bytes)
.map_err(|e| CoreError::Other(anyhow::anyhow!("{}", e)))?;
Ok((bytes, compression))
}
/// Get a streaming reader for item content without decompression.
///
/// Returns a reader over the HTTP response body and the compression type
/// from the X-Keep-Compression header. The caller can stream through
/// decompression readers without buffering the entire file in memory.
pub fn get_item_content_stream(&self, id: i64) -> Result<(Box<dyn Read>, String), CoreError> {
let url = format!(
"{}?decompress=false",
self.url(&format!("/api/item/{id}/content"))
@@ -376,12 +390,8 @@ impl KeepClient {
.unwrap_or("none")
.to_string();
let mut body = response.into_body();
let bytes = body
.read_to_vec()
.map_err(|e| CoreError::Other(anyhow::anyhow!("{}", e)))?;
Ok((bytes, compression))
let reader = response.into_body().into_reader();
Ok((Box::new(reader), compression))
}
pub fn diff_items(&self, id_a: i64, id_b: i64) -> Result<Vec<String>, CoreError> {

View File

@@ -212,6 +212,12 @@ pub struct Settings {
// Metadata key-value pairs from --meta CLI flag
#[serde(skip)]
pub meta: Vec<(String, Option<String>)>,
// Export filename format template (--export-filename-format)
#[serde(skip)]
pub export_filename_format: String,
// Import data file path (--import-data-file)
#[serde(skip)]
pub import_data_file: Option<std::path::PathBuf>,
}
impl Settings {
@@ -526,6 +532,10 @@ impl Settings {
})
.collect();
// Set export filename format from CLI args
settings.export_filename_format = args.item.export_filename_format.clone();
settings.import_data_file = args.item.import_data_file.clone();
debug!("CONFIG: Final settings: {settings:?}");
Ok(settings)
}

View File

@@ -292,6 +292,35 @@ pub fn create_item(
})
}
/// Creates a new item with a specific timestamp (for import).
///
/// # Arguments
///
/// * `conn` - Database connection.
/// * `ts` - Timestamp to use for the item.
/// * `compression` - Compression type string (e.g., "lz4", "gzip", "none").
///
/// # Returns
///
/// * `Result<Item>` - The created item with its ID set.
pub fn insert_item_with_ts(
conn: &Connection,
ts: chrono::DateTime<chrono::Utc>,
compression: &str,
) -> Result<Item> {
let item = Item {
id: None,
ts,
size: None,
compression: compression.to_string(),
};
let item_id = insert_item(conn, item.clone())?;
Ok(Item {
id: Some(item_id),
..item
})
}
/// Adds a tag to an item.
///
/// Inserts a new tag association in the `tags` table.

View File

@@ -81,7 +81,7 @@ fn main() -> Result<(), Error> {
let ids = &mut Vec::new();
let tags = &mut Vec::new();
// For --info and --get modes, treat numeric strings as IDs
// For --info, --get, and --export modes, treat numeric strings as IDs
for v in args.ids_or_tags.iter() {
debug!("MAIN: Parsed value: {v:?}");
match v.clone() {
@@ -90,15 +90,15 @@ fn main() -> Result<(), Error> {
ids.push(num)
}
NumberOrString::Str(str) => {
// For --info and --get, try to parse strings as numbers to treat them as IDs
if (args.mode.info || args.mode.get)
// For --info, --get, and --export, try to parse strings as numbers to treat them as IDs
if (args.mode.info || args.mode.get || args.mode.export)
&& let Ok(num) = str.parse::<i64>()
{
debug!("MAIN: Adding parsed string to ids: {num}");
ids.push(num);
continue;
}
// If not a number, or not using --info/--get, treat as tag
// If not a number, or not using --info/--get/--export, treat as tag
debug!("MAIN: Adding to tags: {str}");
tags.push(str)
}
@@ -118,6 +118,8 @@ fn main() -> Result<(), Error> {
Delete,
Info,
Update,
Export,
Import,
Status,
StatusPlugins,
Server,
@@ -140,6 +142,10 @@ fn main() -> Result<(), Error> {
mode = KeepModes::Info;
} else if args.mode.update {
mode = KeepModes::Update;
} else if args.mode.export {
mode = KeepModes::Export;
} else if args.mode.import.is_some() {
mode = KeepModes::Import;
} else if args.mode.status {
mode = KeepModes::Status;
} else if args.mode.status_plugins {
@@ -258,6 +264,13 @@ fn main() -> Result<(), Error> {
KeepModes::Update => {
keep::modes::client::update::mode(&client, &mut cmd, &settings, ids, tags)
}
KeepModes::Export => {
keep::modes::client::export::mode(&client, &mut cmd, &settings, ids, tags)
}
KeepModes::Import => {
let meta_file = args.mode.import.as_ref().unwrap();
keep::modes::client::import::mode(&client, &mut cmd, &settings, meta_file)
}
_ => {
cmd.error(
ErrorKind::InvalidValue,
@@ -313,6 +326,19 @@ fn main() -> Result<(), Error> {
KeepModes::Update => {
modes::update::mode_update(&mut cmd, &settings, ids, tags, &mut conn, data_path)
}
KeepModes::Export => modes::export::mode_export(
&mut cmd,
&settings,
ids,
tags,
&mut conn,
data_path,
filter_chain,
),
KeepModes::Import => {
let meta_file = args.mode.import.as_ref().unwrap();
modes::import::mode_import(&mut cmd, &settings, meta_file, &mut conn, data_path)
}
KeepModes::Status => modes::status::mode_status(&mut cmd, &settings, data_path, db_path),
KeepModes::StatusPlugins => {
modes::status_plugins::mode_status_plugins(&mut cmd, &settings, data_path, db_path)

View File

@@ -0,0 +1,93 @@
use anyhow::{anyhow, Context, Result};
use chrono::Utc;
use clap::Command;
use log::debug;
use std::collections::HashMap;
use std::fs;
use std::io::{Read, Write};
use crate::client::KeepClient;
use crate::config;
use crate::modes::common::{resolve_item_id, sanitize_tags, ExportMeta};
/// Export an item to data and metadata files via client.
///
/// If no IDs or tags are specified, exports the latest item.
/// Streams data in fixed-size buffers without loading entire file into memory.
pub fn mode(
client: &KeepClient,
cmd: &mut Command,
settings: &config::Settings,
ids: &[i64],
tags: &[String],
) -> Result<()> {
if ids.len() > 1 {
cmd.error(
clap::error::ErrorKind::InvalidValue,
"More than one ID given, you must supply exactly one ID when using --export",
)
.exit();
}
let item_id = resolve_item_id(client, ids, tags)?;
// Get item info
let item_info = client.get_item_info(item_id)?;
// Get streaming reader for raw compressed content
let (mut reader, compression) = client.get_item_content_stream(item_id)?;
// Build template variables
let mut vars = HashMap::new();
vars.insert("id".to_string(), item_id.to_string());
vars.insert("tags".to_string(), sanitize_tags(&item_info.tags));
let ts = chrono::DateTime::parse_from_rfc3339(&item_info.ts)
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(|_| Utc::now());
vars.insert(
"ts".to_string(),
ts.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
);
vars.insert("compression".to_string(), compression.clone());
let basename = strfmt::strfmt(&settings.export_filename_format, &vars).map_err(|e| {
anyhow!(
"Invalid export filename format '{}': {}",
settings.export_filename_format,
e
)
})?;
// Stream data file write with fixed-size buffers
let data_filename = format!("{}.data.{}", basename, compression);
let mut data_file = fs::File::create(&data_filename)
.with_context(|| format!("Cannot create data file: {}", data_filename))?;
let mut total_bytes: usize = 0;
crate::common::stream_copy(&mut reader, |chunk| {
data_file.write_all(chunk)?;
total_bytes += chunk.len();
Ok(())
})?;
debug!(
"CLIENT_EXPORT: Wrote {} bytes to {}",
total_bytes, data_filename
);
// Write meta file
let meta_filename = format!("{}.meta.yml", basename);
let export_meta = ExportMeta {
ts,
compression,
size: item_info.size,
tags: item_info.tags.clone(),
metadata: item_info.metadata.clone(),
};
let meta_yaml = serde_yaml::to_string(&export_meta)?;
fs::write(&meta_filename, &meta_yaml)
.with_context(|| format!("Cannot write meta file: {}", meta_filename))?;
debug!("CLIENT_EXPORT: Wrote metadata to {}", meta_filename);
eprintln!("{} {}", data_filename, meta_filename);
Ok(())
}

View File

@@ -1,9 +1,9 @@
use crate::client::KeepClient;
use crate::compression_engine::CompressionType;
use crate::filter_plugin::FilterChain;
use crate::modes::common::{check_binary_tty, resolve_item_id};
use anyhow::Result;
use clap::Command;
use is_terminal::IsTerminal;
use log::debug;
use std::io::{Read, Write};
use std::str::FromStr;
@@ -18,83 +18,58 @@ pub fn mode(
) -> Result<(), anyhow::Error> {
debug!("CLIENT_GET: Getting item via remote server");
// Find the item ID
let item_id = if !ids.is_empty() {
ids[0]
} else if !tags.is_empty() {
// Find item by tags
let items = client.list_items(tags, "newest", 0, 1, &std::collections::HashMap::new())?;
if items.is_empty() {
return Err(anyhow::anyhow!("No items found matching tags: {:?}", tags));
}
items[0].id
} else {
// Get latest item
let items = client.list_items(&[], "newest", 0, 1, &std::collections::HashMap::new())?;
if items.is_empty() {
return Err(anyhow::anyhow!("No items found"));
}
items[0].id
};
let item_id = resolve_item_id(client, ids, tags)?;
// Get item info to determine compression type
// Get item info for metadata
let item_info = client.get_item_info(item_id)?;
let metadata = &item_info.metadata;
// Get raw content from server
let (raw_bytes, compression) = client.get_item_content_raw(item_id)?;
// Check if binary content would be sent to TTY
let is_text = item_info
.metadata
.get("text")
.map(|v| v == "true")
.unwrap_or(false);
if std::io::stdout().is_terminal() && !is_text && !settings.force {
// Check if content is binary
let sample_len = std::cmp::min(raw_bytes.len(), 8192);
if crate::common::is_binary::is_binary(&raw_bytes[..sample_len]) {
return Err(anyhow::anyhow!(
"Refusing to output binary data to a terminal. Use --force to override."
));
}
}
// Decompress locally using the server-reported compression type
// Get streaming reader for raw content
let (reader, compression) = client.get_item_content_stream(item_id)?;
let compression_type = CompressionType::from_str(&compression).unwrap_or(CompressionType::None);
let decompressed = match compression_type {
// Decompress through streaming readers
let mut decompressed_reader: Box<dyn Read> = match compression_type {
CompressionType::GZip => {
use flate2::read::GzDecoder;
let mut decoder = GzDecoder::new(&raw_bytes[..]);
let mut content = Vec::new();
decoder.read_to_end(&mut content)?;
content
Box::new(GzDecoder::new(reader))
}
CompressionType::LZ4 => {
use lz4_flex::frame::FrameDecoder;
let mut decoder = FrameDecoder::new(&raw_bytes[..]);
let mut content = Vec::new();
decoder.read_to_end(&mut content)?;
content
Box::new(FrameDecoder::new(reader))
}
_ => raw_bytes,
_ => reader,
};
// Apply filters if present
let output = if let Some(mut chain) = filter_chain {
let mut filtered = Vec::new();
chain.filter(&mut &decompressed[..], &mut filtered)?;
filtered
} else {
decompressed
};
// Binary detection: sample first chunk
let mut sample_buf = [0u8; crate::common::PIPESIZE];
let sample_len = decompressed_reader.read(&mut sample_buf)?;
check_binary_tty(metadata, &sample_buf[..sample_len], settings.force)?;
// Stream to stdout
// If filters present, buffer through filter chain; otherwise stream directly
if let Some(mut chain) = filter_chain {
// Apply filter to sample first, then remaining
let mut output = Vec::new();
chain.filter(&mut &sample_buf[..sample_len], &mut output)?;
crate::common::stream_copy(&mut decompressed_reader, |chunk| {
chain.filter(&mut std::io::Cursor::new(chunk), &mut output)?;
Ok(())
})?;
let stdout = std::io::stdout();
let mut stdout = stdout.lock();
stdout.write_all(&output)?;
stdout.flush()?;
} else {
// Stream decompressed content to stdout
let stdout = std::io::stdout();
let mut stdout = stdout.lock();
stdout.write_all(&sample_buf[..sample_len])?;
crate::common::stream_copy(&mut decompressed_reader, |chunk| {
stdout.write_all(chunk)?;
Ok(())
})?;
stdout.flush()?;
}
Ok(())
}

src/modes/client/import.rs Normal file
View File

@@ -0,0 +1,109 @@
use anyhow::{Context, Result, anyhow};
use clap::Command;
use log::debug;
use std::collections::HashMap;
use std::fs;
use std::io::Read;
use crate::client::KeepClient;
use crate::compression_engine::CompressionType;
use crate::config;
use crate::modes::common::ImportMeta;
use std::str::FromStr;
/// Import an item from a metadata file via client.
///
/// Streams data to server without buffering entire file in memory.
/// Sends original timestamp to server so it's preserved.
pub fn mode(
client: &KeepClient,
cmd: &mut Command,
settings: &config::Settings,
meta_file: &str,
) -> Result<()> {
// Read and parse metadata
let meta_yaml = fs::read_to_string(meta_file)
.with_context(|| format!("Cannot read metadata file: {}", meta_file))?;
let import_meta: ImportMeta = serde_yaml::from_str(&meta_yaml)
.with_context(|| format!("Cannot parse metadata file: {}", meta_file))?;
// Validate compression type
CompressionType::from_str(&import_meta.compression).map_err(|_| {
anyhow!(
"Invalid compression type '{}' in metadata file",
import_meta.compression
)
})?;
debug!(
"CLIENT_IMPORT: Parsed meta: ts={}, compression={}, tags={:?}",
import_meta.ts, import_meta.compression, import_meta.tags
);
// Build query parameters
let ts_str = import_meta.ts.to_rfc3339();
let params = [
("compress".to_string(), "false".to_string()),
("meta".to_string(), "false".to_string()),
("tags".to_string(), import_meta.tags.join(",")),
(
"compression_type".to_string(),
import_meta.compression.clone(),
),
("ts".to_string(), ts_str),
];
let param_refs: Vec<(&str, &str)> = params
.iter()
.map(|(k, v)| (k.as_str(), v.as_str()))
.collect();
// Stream data to server without buffering entire file
let item_info = if let Some(ref data_file) = settings.import_data_file {
let mut reader = fs::File::open(data_file)
.with_context(|| format!("Cannot read data file: {}", data_file.display()))?;
client.post_stream("/api/item/", &mut reader, &param_refs)?
} else {
// For stdin, we need to buffer the whole body since stdin can't be seeked
// and post_stream may need to retry, so read it fully and post via a Cursor.
let mut buf = Vec::new();
std::io::stdin()
.read_to_end(&mut buf)
.context("Cannot read data from stdin")?;
if buf.is_empty() {
cmd.error(
clap::error::ErrorKind::InvalidValue,
"No data provided (empty stdin)",
)
.exit();
}
let mut cursor = std::io::Cursor::new(&buf);
client.post_stream("/api/item/", &mut cursor, &param_refs)?
};
let item_id = item_info.id;
debug!("CLIENT_IMPORT: Created item {} via server", item_id);
// Set uncompressed size if known from metadata
if let Some(size) = import_meta.size {
client.set_item_size(item_id, size as u64)?;
debug!("CLIENT_IMPORT: Set size to {}", size);
}
// Post metadata
if !import_meta.metadata.is_empty() {
client.post_metadata(item_id, &import_meta.metadata)?;
debug!(
"CLIENT_IMPORT: Set {} metadata entries",
import_meta.metadata.len()
);
}
if !settings.quiet {
println!(
"KEEP: Imported item {} tags: {:?}",
item_id, import_meta.tags
);
}
Ok(())
}

View File

@@ -1,6 +1,7 @@
use crate::client::KeepClient;
use crate::modes::common::{
DisplayItemInfo, OutputFormat, format_size, render_item_info_table, settings_output_format,
DisplayItemInfo, OutputFormat, format_size, render_item_info_table, resolve_item_ids,
settings_output_format,
};
use clap::Command;
use log::debug;
@@ -15,17 +16,7 @@ pub fn mode(
debug!("CLIENT_INFO: Getting item info via remote server");
let output_format = settings_output_format(settings);
// If tags provided, find matching item first
let item_ids: Vec<i64> = if !tags.is_empty() {
let items = client.list_items(tags, "newest", 0, 1, &std::collections::HashMap::new())?;
if items.is_empty() {
return Err(anyhow::anyhow!("No items found matching tags: {:?}", tags));
}
items.into_iter().map(|i| i.id).collect()
} else {
ids.to_vec()
};
let item_ids = resolve_item_ids(client, ids, tags)?;
for &id in &item_ids {
let item = client.get_item_info(id)?;

View File

@@ -1,6 +1,8 @@
pub mod delete;
pub mod diff;
pub mod export;
pub mod get;
pub mod import;
pub mod info;
pub mod list;
pub mod save;

View File

@@ -75,8 +75,8 @@ pub fn mode(
// Wrap pipe writer with appropriate compression
let mut compressor: Box<dyn Write> = match compression_type_clone {
CompressionType::GZip => {
use flate2::write::GzEncoder;
use flate2::Compression;
use flate2::write::GzEncoder;
Box::new(GzEncoder::new(pipe_writer, Compression::default()))
}
CompressionType::LZ4 => Box::new(lz4_flex::frame::FrameEncoder::new(pipe_writer)),

View File

@@ -16,11 +16,14 @@ use crate::compression_engine::CompressionType;
/// ```
use crate::config;
use crate::meta_plugin::MetaPluginType;
use anyhow::{Result, anyhow};
use chrono::{DateTime, Utc};
use clap::Command;
use clap::error::ErrorKind;
use comfy_table::{Attribute, Cell, ContentArrangement, Table};
use log::debug;
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::env;
use std::io::IsTerminal;
@@ -618,3 +621,111 @@ pub fn build_path_table(path_info: &PathInfo, table_config: &config::TableConfig
path_table
}
/// Sanitize tags for use in filenames.
///
/// Replaces non-alphanumeric characters with underscores and joins with `_`.
/// Empty tags are filtered out to avoid double underscores.
pub fn sanitize_tags(tags: &[String]) -> String {
tags.iter()
.filter(|t| !t.is_empty())
.map(|t| {
t.chars()
.map(|c| if c.is_alphanumeric() { c } else { '_' })
.collect::<String>()
})
.collect::<Vec<_>>()
.join("_")
}
/// Metadata structure for export to YAML. Shared by local and client export modes.
#[derive(Debug, Serialize)]
pub struct ExportMeta {
pub ts: DateTime<Utc>,
pub compression: String,
pub size: Option<i64>,
pub tags: Vec<String>,
pub metadata: HashMap<String, String>,
}
/// Metadata structure for import from YAML. Shared by local and client import modes.
#[derive(Debug, Deserialize)]
pub struct ImportMeta {
pub ts: DateTime<Utc>,
pub compression: String,
#[serde(default)]
pub size: Option<i64>,
#[serde(default)]
pub tags: Vec<String>,
#[serde(default)]
pub metadata: HashMap<String, String>,
}
/// Resolve a single item ID from explicit IDs, tags, or latest item.
///
/// Returns the first ID if provided, the newest item matching tags,
/// or the newest item overall if neither is specified.
pub fn resolve_item_id(
client: &crate::client::KeepClient,
ids: &[i64],
tags: &[String],
) -> Result<i64> {
if !ids.is_empty() {
Ok(ids[0])
} else if !tags.is_empty() {
let items = client.list_items(tags, "newest", 0, 1, &HashMap::new())?;
if items.is_empty() {
return Err(anyhow!("No items found matching tags: {:?}", tags));
}
Ok(items[0].id)
} else {
let items = client.list_items(&[], "newest", 0, 1, &HashMap::new())?;
if items.is_empty() {
return Err(anyhow!("No items found"));
}
Ok(items[0].id)
}
}
/// Resolve item IDs from explicit IDs or tags (multi-item variant).
pub fn resolve_item_ids(
client: &crate::client::KeepClient,
ids: &[i64],
tags: &[String],
) -> Result<Vec<i64>> {
if !ids.is_empty() {
Ok(ids.to_vec())
} else if !tags.is_empty() {
let items = client.list_items(tags, "newest", 0, 0, &HashMap::new())?;
if items.is_empty() {
return Err(anyhow!("No items found matching tags: {:?}", tags));
}
Ok(items.into_iter().map(|i| i.id).collect())
} else {
let items = client.list_items(&[], "newest", 0, 1, &HashMap::new())?;
if items.is_empty() {
return Err(anyhow!("No items found"));
}
Ok(vec![items[0].id])
}
}
/// Check if binary content should be blocked from TTY output.
///
/// Uses metadata `text` field as fast path, then falls back to byte sampling.
/// Returns Err if content is binary and should not be displayed.
pub fn check_binary_tty(
metadata: &HashMap<String, String>,
data_sample: &[u8],
force: bool,
) -> Result<()> {
if force || !std::io::stdout().is_terminal() {
return Ok(());
}
if crate::common::is_binary::is_content_binary_from_metadata(metadata, data_sample) {
return Err(anyhow!(
"Refusing to output binary data to TTY, use --force to override"
));
}
Ok(())
}

src/modes/export.rs Normal file
View File

@@ -0,0 +1,129 @@
use anyhow::{anyhow, Context, Result};
use chrono::{DateTime, Utc};
use clap::Command;
use log::debug;
use std::collections::HashMap;
use std::fs;
use std::io::{Read, Write};
use std::path::PathBuf;
use crate::config;
use crate::filter_plugin::FilterChain;
use crate::modes::common::{sanitize_tags, ExportMeta};
use crate::services::item_service::ItemService;
/// Export an item to data and metadata files.
///
/// If no IDs or tags are specified, exports the latest item.
/// Writes `{basename}.data.{compression}` for raw data and `{basename}.meta.yml` for metadata.
pub fn mode_export(
cmd: &mut Command,
settings: &config::Settings,
ids: &mut [i64],
tags: &mut [String],
conn: &mut rusqlite::Connection,
data_path: PathBuf,
filter_chain: Option<FilterChain>,
) -> Result<()> {
if !ids.is_empty() && !tags.is_empty() {
cmd.error(
clap::error::ErrorKind::InvalidValue,
"Both ID and tags given, you must supply either IDs or tags when using --export",
)
.exit();
} else if ids.len() > 1 {
cmd.error(
clap::error::ErrorKind::InvalidValue,
"More than one ID given, you must supply exactly one ID when using --export",
)
.exit();
}
let item_service = ItemService::new(data_path.clone());
let meta_filter: HashMap<String, Option<String>> = settings
.meta
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
let item_with_meta = item_service
.find_item(conn, ids, tags, &meta_filter)
.map_err(|e| anyhow!("Unable to find matching item: {}", e))?;
let item_id = item_with_meta.item.id.context("Item missing ID")?;
let item_tags: Vec<String> = item_with_meta.tags.iter().map(|t| t.name.clone()).collect();
let meta_map = item_with_meta.meta_as_map();
// Build template variables
let mut vars = HashMap::new();
vars.insert("id".to_string(), item_id.to_string());
vars.insert("tags".to_string(), sanitize_tags(&item_tags));
vars.insert(
"ts".to_string(),
item_with_meta
.item
.ts
.format("%Y-%m-%dT%H:%M:%SZ")
.to_string(),
);
vars.insert(
"compression".to_string(),
item_with_meta.item.compression.clone(),
);
let basename = strfmt::strfmt(&settings.export_filename_format, &vars).map_err(|e| {
anyhow!(
"Invalid export filename format '{}': {}",
settings.export_filename_format,
e
)
})?;
// Write data file
let data_filename = format!("{}.data.{}", basename, item_with_meta.item.compression);
let mut item_path = data_path.clone();
item_path.push(item_id.to_string());
if filter_chain.is_some() {
// Apply filters: decompress, filter, write
let (mut reader, _, _) = item_service.get_item_content_info_streaming_with_chain(
conn,
item_id,
filter_chain.as_ref(),
)?;
let mut out_file = fs::File::create(&data_filename)
.with_context(|| format!("Cannot create data file: {}", data_filename))?;
let mut buf = [0u8; 8192];
loop {
let n = reader.read(&mut buf)?;
if n == 0 {
break;
}
out_file.write_all(&buf[..n])?;
}
debug!("EXPORT: Wrote filtered data to {}", data_filename);
} else {
// Raw copy of compressed file
fs::copy(&item_path, &data_filename)
.with_context(|| format!("Cannot copy {} to {}", item_path.display(), data_filename))?;
debug!("EXPORT: Copied raw data to {}", data_filename);
}
// Write meta file
let meta_filename = format!("{}.meta.yml", basename);
let export_meta = ExportMeta {
ts: item_with_meta.item.ts,
compression: item_with_meta.item.compression.clone(),
size: item_with_meta.item.size,
tags: item_tags,
metadata: meta_map,
};
let meta_yaml = serde_yaml::to_string(&export_meta)?;
fs::write(&meta_filename, &meta_yaml)
.with_context(|| format!("Cannot write meta file: {}", meta_filename))?;
debug!("EXPORT: Wrote metadata to {}", meta_filename);
eprintln!("{} {}", data_filename, meta_filename);
Ok(())
}

src/modes/import.rs Normal file
View File

@@ -0,0 +1,146 @@
use anyhow::{Context, Result, anyhow};
use chrono::{DateTime, Utc};
use clap::Command;
use log::debug;
use std::collections::HashMap;
use std::fs;
use std::io::{Read, Write};
use std::path::PathBuf;
use std::str::FromStr;
use crate::common::PIPESIZE;
use crate::compression_engine::CompressionType;
use crate::config;
use crate::db;
use crate::modes::common::ImportMeta;
/// Import an item from a metadata file and optional data file.
///
/// If `import_data_file` is not provided, reads data from stdin.
pub fn mode_import(
cmd: &mut Command,
settings: &config::Settings,
meta_file: &str,
conn: &mut rusqlite::Connection,
data_path: PathBuf,
) -> Result<()> {
// Read metadata
let meta_yaml = fs::read_to_string(meta_file)
.with_context(|| format!("Cannot read metadata file: {}", meta_file))?;
let import_meta: ImportMeta = serde_yaml::from_str(&meta_yaml)
.with_context(|| format!("Cannot parse metadata file: {}", meta_file))?;
// Validate compression type
CompressionType::from_str(&import_meta.compression).map_err(|_| {
anyhow!(
"Invalid compression type '{}' in metadata file",
import_meta.compression
)
})?;
debug!(
"IMPORT: Parsed meta: ts={}, compression={}, tags={:?}",
import_meta.ts, import_meta.compression, import_meta.tags
);
// Create item with original timestamp
let item = db::insert_item_with_ts(conn, import_meta.ts, &import_meta.compression)?;
let item_id = item.id.context("New item missing ID")?;
debug!(
"IMPORT: Created item {} with compression {}",
item_id, import_meta.compression
);
// Set tags
if !import_meta.tags.is_empty() {
db::set_item_tags(conn, item.clone(), &import_meta.tags)?;
debug!("IMPORT: Set {} tags", import_meta.tags.len());
}
// Write data to storage using streaming copy
let mut item_path = data_path;
item_path.push(item_id.to_string());
let data_size: i64 = if let Some(ref data_file) = settings.import_data_file {
// Stream from file to storage using fixed-size buffers
let mut reader = fs::File::open(data_file)
.with_context(|| format!("Cannot read data file: {}", data_file.display()))?;
let mut writer = fs::File::create(&item_path)
.with_context(|| format!("Cannot create item file: {}", item_path.display()))?;
let mut buf = [0u8; PIPESIZE];
let mut total = 0i64;
loop {
let n = reader.read(&mut buf)?;
if n == 0 {
break;
}
writer.write_all(&buf[..n])?;
total += n as i64;
}
total
} else {
// Stream from stdin to storage
let mut writer = fs::File::create(&item_path)
.with_context(|| format!("Cannot create item file: {}", item_path.display()))?;
let mut stdin = std::io::stdin().lock();
let mut buf = [0u8; PIPESIZE];
let mut total = 0i64;
loop {
let n = stdin.read(&mut buf)?;
if n == 0 {
break;
}
writer.write_all(&buf[..n])?;
total += n as i64;
}
total
};
if data_size == 0 {
cmd.error(
clap::error::ErrorKind::InvalidValue,
"No data provided (empty file or stdin)",
)
.exit();
}
debug!(
"IMPORT: Wrote {} bytes to {}",
data_size,
item_path.display()
);
// Set metadata
for (key, value) in &import_meta.metadata {
db::query_upsert_meta(
conn,
db::Meta {
id: item_id,
name: key.clone(),
value: value.clone(),
},
)?;
}
if !import_meta.metadata.is_empty() {
debug!(
"IMPORT: Set {} metadata entries",
import_meta.metadata.len()
);
}
// Update item size (use imported size if available, otherwise data length)
let size_to_record = import_meta.size.unwrap_or(data_size);
let mut updated_item = item;
updated_item.size = Some(size_to_record);
db::update_item(conn, updated_item)?;
if !settings.quiet {
println!(
"KEEP: Imported item {} tags: {:?}",
item_id, import_meta.tags
);
}
Ok(())
}

View File

@@ -9,8 +9,10 @@ pub mod common;
pub mod delete;
pub mod diff;
pub mod export;
pub mod generate_config;
pub mod get;
pub mod import;
pub mod info;
pub mod list;
pub mod save;
@@ -27,12 +29,18 @@ pub use delete::mode_delete;
/// Compares two items and shows differences.
pub use diff::mode_diff;
/// Exports an item to data and metadata files.
pub use export::mode_export;
/// Generates a default configuration file.
pub use generate_config::mode_generate_config;
/// Retrieves and outputs item content.
pub use get::mode_get;
/// Imports an item from metadata and data files.
pub use import::mode_import;
/// Displays detailed information about items.
pub use info::mode_info;

File diff suppressed because it is too large

View File

@@ -644,6 +644,10 @@ pub struct CreateItemQuery {
/// Only used when compress=false — tells the server what compression
/// the client applied so the correct type is recorded in the database.
pub compression_type: Option<String>,
/// Optional timestamp for the item (RFC 3339 format).
/// Used during import to preserve the original item's timestamp.
/// If not provided, the server uses the current time.
pub ts: Option<String>,
}
/// Query parameters for updating item metadata via POST.

View File

@@ -1,284 +0,0 @@
use crate::common::status::StatusInfo;
use crate::config::Settings;
use crate::db::Item;
use crate::db::Meta;
use crate::services::data_service::DataService;
use crate::services::error::CoreError;
use crate::services::types::{ItemWithContent, ItemWithMeta};
use clap::Command;
use futures::Stream;
use rusqlite::Connection;
use std::collections::HashMap;
use std::io::Read;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::Mutex;
pub struct AsyncDataService {
data_path: PathBuf,
settings: Arc<Settings>,
db: Arc<Mutex<Connection>>,
sync_service: crate::services::SyncDataService,
}
impl AsyncDataService {
pub fn new(data_path: PathBuf, settings: Arc<Settings>, db: Arc<Mutex<Connection>>) -> Self {
let sync_service =
crate::services::SyncDataService::new(data_path.clone(), settings.as_ref().clone());
Self {
data_path,
settings,
db,
sync_service,
}
}
pub fn data_path(&self) -> &PathBuf {
&self.data_path
}
pub fn settings(&self) -> Arc<Settings> {
self.settings.clone()
}
pub fn db(&self) -> Arc<Mutex<Connection>> {
self.db.clone()
}
pub async fn get_item(&self, id: i64) -> Result<ItemWithMeta, CoreError> {
let mut conn = self.db.lock().await;
self.get(&mut conn, id)
}
pub async fn add_item_meta(
&self,
item_id: i64,
name: &str,
value: &str,
) -> Result<(), CoreError> {
let conn = self.db.lock().await;
crate::db::add_meta(&conn, item_id, name, value)?;
Ok(())
}
pub async fn list_items(
&self,
tags: Vec<String>,
meta: HashMap<String, Option<String>>,
) -> Result<Vec<ItemWithMeta>, CoreError> {
let mut conn = self.db.lock().await;
self.list(&mut conn, tags, meta)
}
pub async fn find_item(
&self,
ids: Vec<i64>,
tags: Vec<String>,
meta: HashMap<String, Option<String>>,
) -> Result<ItemWithMeta, CoreError> {
let mut conn = self.db.lock().await;
DataService::find_item(self, &mut conn, ids, tags, meta)
}
pub async fn get_item_content_info_streaming(
&self,
id: i64,
_filter: Option<String>,
) -> Result<
(
Pin<Box<dyn Stream<Item = Result<Vec<u8>, CoreError>> + Send>>,
ItemWithMeta,
bool,
),
CoreError,
> {
let mut conn = self.db.lock().await;
let (reader, item_with_meta) = self.get_content(&mut conn, id)?;
let is_binary = item_with_meta
.meta
.iter()
.find(|m| m.name == "text")
.map(|m| m.value == "false")
.unwrap_or(false);
// Convert reader to stream with optimized buffer reuse
let stream = async_stream::stream! {
let mut reader = reader;
let mut buf = [0u8; 8192];
loop {
match reader.read(&mut buf) {
Ok(0) => break,
Ok(n) => yield Ok(buf[..n].to_vec()),
Err(e) => yield Err(CoreError::from(e)),
}
}
};
Ok((Box::pin(stream), item_with_meta, is_binary))
}
pub async fn stream_item_content_by_id_with_metadata(
&self,
id: i64,
_metadata: &HashMap<String, String>,
_force_text: bool,
offset: u64,
length: u64,
_filter: Option<String>,
) -> Result<
(
Pin<Box<dyn Stream<Item = Result<Vec<u8>, std::io::Error>> + Send>>,
u64,
),
CoreError,
> {
let mut conn = self.db.lock().await;
let (mut reader, _item_with_meta) = self.get_content(&mut conn, id)?;
// Skip bytes for offset
if offset > 0 {
let mut skip_buf = [0u8; 8192];
let mut remaining = offset;
while remaining > 0 {
let to_read = std::cmp::min(8192, remaining as usize);
let n = reader.read(&mut skip_buf[..to_read])?;
if n == 0 {
break;
}
remaining -= n as u64;
}
}
let content_length = if length > 0 { length } else { u64::MAX };
// Optimized stream that reuses a single buffer for reading
let stream = async_stream::stream! {
let mut reader = reader;
let mut remaining = content_length;
let mut buf = [0u8; 8192];
while remaining > 0 {
let to_read = std::cmp::min(8192, remaining as usize);
match reader.read(&mut buf[..to_read]) {
Ok(0) => break,
Ok(n) => {
remaining -= n as u64;
yield Ok(buf[..n].to_vec());
}
Err(e) => {
yield Err(e);
break;
}
}
}
};
Ok((Box::pin(stream), content_length))
}
/// Get raw item content without decompression as a streaming reader.
///
/// Opens the stored file directly from disk, bypassing decompression.
/// Used when the client requests raw bytes with `decompress=false`.
/// Returns a boxed reader that can be used for streaming.
pub async fn get_raw_item_content_reader(
&self,
id: i64,
) -> Result<Box<dyn Read + Send>, CoreError> {
let data_path = self.data_path.clone();
tokio::task::spawn_blocking(move || {
let mut item_path = data_path;
item_path.push(id.to_string());
let file = std::fs::File::open(&item_path).map_err(|e| {
CoreError::Io(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("Item file not found: {item_path:?}: {e}"),
))
})?;
Ok(Box::new(file) as Box<dyn Read + Send>)
})
.await
.map_err(|e| CoreError::Other(anyhow::anyhow!("Task join error: {}", e)))?
}
}
impl DataService for AsyncDataService {
type Error = CoreError;
fn save<R: Read>(
&self,
content: R,
cmd: &mut Command,
settings: &Settings,
tags: Vec<String>,
conn: &mut Connection,
) -> Result<Item, Self::Error> {
self.sync_service.save(content, cmd, settings, tags, conn)
}
fn get(&self, conn: &mut Connection, id: i64) -> Result<ItemWithMeta, Self::Error> {
self.sync_service.get(conn, id)
}
fn get_content(
&self,
conn: &mut Connection,
id: i64,
) -> Result<(Box<dyn Read + Send>, ItemWithMeta), Self::Error> {
self.sync_service.get_content(conn, id)
}
fn list(
&self,
conn: &mut Connection,
tags: Vec<String>,
meta: HashMap<String, Option<String>>,
) -> Result<Vec<ItemWithMeta>, Self::Error> {
self.sync_service.list(conn, tags, meta)
}
fn delete(&self, conn: &mut Connection, id: i64) -> Result<Item, Self::Error> {
self.sync_service.delete(conn, id)
}
fn find_item(
&self,
conn: &mut Connection,
ids: Vec<i64>,
tags: Vec<String>,
meta: HashMap<String, Option<String>>,
) -> Result<ItemWithMeta, Self::Error> {
self.sync_service.find_item(conn, ids, tags, meta)
}
fn get_items(
&self,
conn: &mut Connection,
ids: &[i64],
tags: &[String],
meta: &HashMap<String, Option<String>>,
) -> Result<Vec<ItemWithMeta>, Self::Error> {
self.sync_service.get_items(conn, ids, tags, meta)
}
fn generate_status(
&self,
settings: &Settings,
data_path: &Path,
db_path: &Path,
) -> Result<StatusInfo, Self::Error> {
let mut cmd = Command::new("keep");
let status_service = crate::services::StatusService::new();
Ok(status_service.generate_status(
&mut cmd,
settings,
data_path.to_path_buf(),
db_path.to_path_buf(),
)?)
}
}

View File

@@ -1,390 +0,0 @@
/// Asynchronous service wrapper for `ItemService`.
///
/// Uses `tokio::task::spawn_blocking` to offload synchronous operations (DB/FS)
/// to a blocking thread pool, allowing non-blocking async usage in servers.
use crate::common::PIPESIZE;
use crate::config::Settings;
use crate::services::error::CoreError;
use crate::services::item_service::ItemService;
use crate::services::types::{ItemWithContent, ItemWithMeta};
use clap::Command;
use rusqlite::Connection;
use std::collections::HashMap;
use std::io::Read;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Mutex;
/// An asynchronous wrapper around the `ItemService` for use in async contexts like the web server.
/// It uses `tokio::task::spawn_blocking` to run synchronous database and filesystem operations
/// on a dedicated thread pool, preventing them from blocking the async runtime.
#[allow(dead_code)]
/// Async wrapper for ItemService operations.
pub struct AsyncItemService {
pub data_dir: PathBuf,
db: Arc<Mutex<Connection>>,
item_service: Arc<ItemService>,
cmd: Arc<Mutex<Command>>,
settings: Arc<Settings>,
}
#[allow(dead_code)]
impl AsyncItemService {
/// Creates a new `AsyncItemService`.
///
/// # Arguments
///
/// * `data_dir` - Path to data directory.
/// * `db` - Arc-wrapped mutex for DB connection.
/// * `item_service` - Arc-wrapped ItemService.
/// * `cmd` - Arc-wrapped mutex for Clap command.
/// * `settings` - Arc-wrapped settings.
///
/// # Returns
///
/// A new `AsyncItemService`.
pub fn new(
data_dir: PathBuf,
db: Arc<Mutex<Connection>>,
item_service: Arc<ItemService>,
cmd: Arc<Mutex<Command>>,
settings: Arc<Settings>,
) -> Self {
Self {
data_dir,
db,
item_service,
cmd,
settings,
}
}
/// Internal helper to execute synchronous operations in a blocking task.
///
/// Spawns a blocking task with the DB connection and ItemService.
///
/// # Type Parameters
///
/// * `F` - Closure type.
/// * `T` - Return type.
///
/// # Arguments
///
/// * `f` - The synchronous closure to execute.
///
/// # Returns
///
/// Result of the closure, or CoreError on task failure.
async fn execute_blocking<F, T>(&self, f: F) -> Result<T, CoreError>
where
F: FnOnce(&Connection, &ItemService) -> Result<T, CoreError> + Send + 'static,
T: Send + 'static,
{
let db = self.db.clone();
let item_service = self.item_service.clone();
tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
f(&conn, &item_service)
})
.await
.map_err(|e| CoreError::Other(anyhow::anyhow!("Blocking task failed: {}", e)))?
}
pub async fn get_item(&self, id: i64) -> Result<ItemWithMeta, CoreError> {
self.execute_blocking(move |conn, item_service| item_service.get_item(conn, id))
.await
}
pub async fn get_item_content(&self, id: i64) -> Result<ItemWithContent, CoreError> {
self.execute_blocking(move |conn, item_service| item_service.get_item_content(conn, id))
.await
}
pub async fn stream_item_content_by_id(
&self,
item_id: i64,
allow_binary: bool,
offset: u64,
length: u64,
) -> Result<
(
std::pin::Pin<
Box<
dyn tokio_stream::Stream<
Item = Result<tokio_util::bytes::Bytes, std::io::Error>,
> + Send,
>,
>,
String,
),
CoreError,
> {
// Use streaming approach: get reader and stream chunks in requested range
let (reader, mime_type, is_binary) = self
.execute_blocking(move |conn, item_service| {
item_service.get_item_content_info_streaming(conn, item_id, None)
})
.await?;
// Check if content is binary when allow_binary is false
if !allow_binary && is_binary {
return Err(CoreError::InvalidInput(
"Binary content not allowed".to_string(),
));
}
// Convert the reader into an async stream with offset and length applied
use tokio_util::bytes::Bytes;
// Create a channel to stream data between the blocking thread and async runtime
let (tx, rx) = tokio::sync::mpsc::channel::<Result<Bytes, std::io::Error>>(16);
// Spawn a blocking task to read from the reader and send chunks
tokio::task::spawn_blocking(move || {
let mut reader = reader;
let mut buf = [0u8; PIPESIZE];
// Apply offset by reading and discarding bytes
if offset > 0 {
let mut remaining = offset;
while remaining > 0 {
let to_read = std::cmp::min(remaining, buf.len() as u64) as usize;
match reader.read(&mut buf[..to_read]) {
Ok(0) => break, // EOF reached before offset
Ok(n) => remaining -= n as u64,
Err(e) => {
let _ = tx.blocking_send(Err(e));
return;
}
}
}
}
// Read and send data up to the specified length
let mut remaining_length = length;
loop {
// Determine how much to read in this iteration
let to_read = if length > 0 {
// If length is specified, don't read more than remaining_length
std::cmp::min(remaining_length, buf.len() as u64) as usize
} else {
buf.len()
};
if to_read == 0 {
break; // We've read the requested length
}
match reader.read(&mut buf[..to_read]) {
Ok(0) => break, // EOF
Ok(n) => {
let chunk = Bytes::copy_from_slice(&buf[..n]);
// Block on sending to the channel
if tx.blocking_send(Ok(chunk)).is_err() {
break; // Receiver dropped
}
if length > 0 {
remaining_length -= n as u64;
if remaining_length == 0 {
break; // Reached the requested length
}
}
}
Err(e) => {
let _ = tx.blocking_send(Err(e));
break;
}
}
}
});
// Convert the receiver into a stream
let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
Ok((Box::pin(stream), mime_type))
}
pub async fn stream_item_content_by_id_with_metadata(
&self,
item_id: i64,
metadata: &HashMap<String, String>,
allow_binary: bool,
offset: u64,
length: u64,
filter: Option<String>,
) -> Result<
(
std::pin::Pin<
Box<
dyn tokio_stream::Stream<
Item = Result<tokio_util::bytes::Bytes, std::io::Error>,
> + Send,
>,
>,
String,
),
CoreError,
> {
// Use provided metadata to determine MIME type and binary status
let mime_type = metadata
.get("mime_type")
.map(|s| s.to_string())
.unwrap_or_else(|| "application/octet-stream".to_string());
// Check if content is binary when allow_binary is false
if !allow_binary {
let is_binary = if let Some(text_val) = metadata.get("text") {
text_val == "false"
} else {
// Get binary status using streaming approach
let (_, _, is_binary) = self.get_item_content_info_streaming(item_id, None).await?;
is_binary
};
if is_binary {
return Err(CoreError::InvalidInput(
"Binary content not allowed".to_string(),
));
}
}
// Get a streaming reader for the content with filtering applied
let reader = {
let db = self.db.clone();
let item_service = self.item_service.clone();
let filter = filter.clone();
tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
item_service
.get_item_content_info_streaming(&conn, item_id, filter)
.map(|(reader, _, _)| reader)
})
.await
.map_err(|e| CoreError::Other(anyhow::anyhow!("Blocking task failed: {}", e)))?
};
// Convert the reader into an async stream manually
use tokio_util::bytes::Bytes;
// Create a channel to stream data between the blocking thread and async runtime
let (tx, rx) = tokio::sync::mpsc::channel(1);
// Spawn a blocking task to read from the reader and send chunks
tokio::task::spawn_blocking(move || {
let mut reader = reader;
// Apply offset by reading and discarding bytes
if offset > 0 {
let mut remaining = offset;
let mut buf = [0; PIPESIZE];
while remaining > 0 {
let to_read = std::cmp::min(remaining, buf.len() as u64);
match reader.as_mut().unwrap().read(&mut buf[..to_read as usize]) {
Ok(0) => break, // EOF reached before offset
Ok(n) => remaining -= n as u64,
Err(e) => {
let _ = tx.blocking_send(Err(e));
return;
}
}
}
}
// Read and send data up to the specified length
let mut remaining_length = length;
let mut buffer = [0; PIPESIZE];
loop {
// Determine how much to read in this iteration
let to_read = if length > 0 {
// If length is specified, don't read more than remaining_length
std::cmp::min(remaining_length, buffer.len() as u64) as usize
} else {
buffer.len()
};
if to_read == 0 {
break; // We've read the requested length
}
match reader.as_mut().unwrap().read(&mut buffer[..to_read]) {
Ok(0) => break, // EOF
Ok(n) => {
let chunk = Bytes::copy_from_slice(&buffer[..n]);
// Block on sending to the channel
if tx.blocking_send(Ok(chunk)).is_err() {
break; // Receiver dropped
}
if length > 0 {
remaining_length -= n as u64;
if remaining_length == 0 {
break; // Reached the requested length
}
}
}
Err(e) => {
let _ = tx.blocking_send(Err(e));
break;
}
}
}
});
// Convert the receiver into a stream
let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
Ok((Box::pin(stream), mime_type))
}
pub async fn get_item_content_info_streaming(
&self,
item_id: i64,
filter: Option<String>,
) -> Result<(Box<dyn Read + Send>, String, bool), CoreError> {
self.execute_blocking(move |conn, item_service| {
item_service.get_item_content_info_streaming(conn, item_id, filter)
})
.await
}
pub async fn find_item(
&self,
ids: Vec<i64>,
tags: Vec<String>,
meta: HashMap<String, Option<String>>,
) -> Result<ItemWithMeta, CoreError> {
let ids_clone = ids.clone();
let tags_clone = tags.clone();
let meta_clone = meta.clone();
self.execute_blocking(move |conn, item_service| {
item_service.find_item(conn, &ids_clone, &tags_clone, &meta_clone)
})
.await
}
pub async fn list_items(
&self,
tags: Vec<String>,
meta: HashMap<String, Option<String>>,
) -> Result<Vec<ItemWithMeta>, CoreError> {
let tags_clone = tags.clone();
let meta_clone = meta.clone();
self.execute_blocking(move |conn, item_service| {
item_service.list_items(conn, &tags_clone, &meta_clone)
})
.await
}
pub async fn delete_item(&self, id: i64) -> Result<(), CoreError> {
let db = self.db.clone();
let item_service = self.item_service.clone();
tokio::task::spawn_blocking(move || {
let mut conn = db.blocking_lock();
item_service.delete_item(&mut conn, id)
})
.await
.map_err(|e| CoreError::Other(anyhow::anyhow!("task join error: {e}")))?
}
}

View File

@@ -1,63 +0,0 @@
use crate::common::status::StatusInfo;
use crate::config::Settings;
use crate::db::Item;
use crate::services::error::CoreError;
use crate::services::types::{ItemWithContent, ItemWithMeta};
use clap::Command;
use rusqlite::Connection;
use std::collections::HashMap;
use std::io::Read;
use std::path::Path;
pub trait DataService {
type Error;
fn save<R: Read>(
&self,
content: R,
cmd: &mut Command,
settings: &Settings,
tags: Vec<String>,
conn: &mut Connection,
) -> Result<Item, Self::Error>;
fn get(&self, conn: &mut Connection, id: i64) -> Result<ItemWithMeta, Self::Error>;
fn get_content(
&self,
conn: &mut Connection,
id: i64,
) -> Result<(Box<dyn Read + Send>, ItemWithMeta), Self::Error>;
fn list(
&self,
conn: &mut Connection,
tags: Vec<String>,
meta: HashMap<String, Option<String>>,
) -> Result<Vec<ItemWithMeta>, Self::Error>;
fn delete(&self, conn: &mut Connection, id: i64) -> Result<Item, Self::Error>;
fn find_item(
&self,
conn: &mut Connection,
ids: Vec<i64>,
tags: Vec<String>,
meta: HashMap<String, Option<String>>,
) -> Result<ItemWithMeta, Self::Error>;
fn get_items(
&self,
conn: &mut Connection,
ids: &[i64],
tags: &[String],
meta: &HashMap<String, Option<String>>,
) -> Result<Vec<ItemWithMeta>, Self::Error>;
fn generate_status(
&self,
settings: &Settings,
data_path: &Path,
db_path: &Path,
) -> Result<StatusInfo, Self::Error>;
}

View File

@@ -8,12 +8,14 @@ use crate::services::error::CoreError;
use crate::services::filter_service::FilterService;
use crate::services::meta_service::MetaService;
use crate::services::types::{ItemWithContent, ItemWithMeta};
use chrono::DateTime;
use chrono::Utc;
use clap::Command;
use log::debug;
use rusqlite::Connection;
use std::collections::HashMap;
use std::fs;
use std::io::{IsTerminal, Read, Write};
use std::io::{Cursor, IsTerminal, Read, Write};
use std::path::PathBuf;
/// Service for managing items in the Keep application.
@@ -530,7 +532,7 @@ impl ItemService {
/// ```ignore
/// item_service.delete_item(&mut conn, 1)?;
/// ```
pub fn delete_item(&self, conn: &mut Connection, id: i64) -> Result<(), CoreError> {
pub fn delete_item(&self, conn: &mut Connection, id: i64) -> Result<Item, CoreError> {
debug!("ITEM_SERVICE: Deleting item with id: {id}");
if id <= 0 {
return Err(CoreError::InvalidInput(format!("Invalid item ID: {id}")));
@@ -542,6 +544,7 @@ impl ItemService {
item_path.push(id.to_string());
debug!("ITEM_SERVICE: Deleting file at path: {item_path:?}");
let deleted_item = item.clone();
db::delete_item(conn, item)?;
fs::remove_file(&item_path).or_else(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
@@ -552,7 +555,7 @@ impl ItemService {
})?;
debug!("ITEM_SERVICE: Successfully deleted item {id}");
Ok(())
Ok(deleted_item)
}
/// Saves content from a reader to a new item.
@@ -723,6 +726,270 @@ impl ItemService {
pub fn get_data_path(&self) -> &PathBuf {
&self.data_path
}
/// Returns a streaming reader and item metadata for the given item.
pub fn get_item_content_streaming(
&self,
conn: &Connection,
id: i64,
) -> Result<(Box<dyn Read + Send>, ItemWithMeta), CoreError> {
let (reader, _mime, _is_binary) = self.get_item_content_info_streaming(conn, id, None)?;
let item_with_meta = self.get_item(conn, id)?;
Ok((reader, item_with_meta))
}
/// Fetches multiple items by ID, silently skipping not-found items.
/// Falls back to `list_items` if the ID list is empty.
pub fn get_items(
&self,
conn: &Connection,
ids: &[i64],
tags: &[String],
meta: &HashMap<String, Option<String>>,
) -> Result<Vec<ItemWithMeta>, CoreError> {
if ids.is_empty() {
return self.list_items(conn, tags, meta);
}
let mut results = Vec::new();
for id in ids {
match self.get_item(conn, *id) {
Ok(item) => results.push(item),
Err(CoreError::ItemNotFound(_)) => continue,
Err(e) => return Err(e),
}
}
Ok(results)
}
/// Save an item with granular control over compression and meta plugins.
///
/// This method lets callers control whether compression and meta plugins run
/// server-side, or are skipped because the client already handled them.
///
/// # Arguments
///
/// * `conn` - Database connection.
/// * `content` - Raw content bytes.
/// * `tags` - Tags to associate with the item.
/// * `metadata` - Client-provided metadata.
/// * `compress` - Whether the server should compress the content.
/// * `run_meta` - Whether the server should run meta plugins.
/// * `settings` - Application settings.
///
/// # Returns
///
/// * `Result<ItemWithMeta, CoreError>` - The saved item with full details.
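///
/// # Example
///
/// A minimal sketch (argument values are illustrative, not prescribed):
/// ```ignore
/// let saved = item_service.save_item_raw(
///     &mut conn,
///     b"hello world",
///     vec!["notes".to_string()],
///     HashMap::new(),
///     true,  // compress server-side
///     true,  // run meta plugins server-side
///     &settings,
/// )?;
/// ```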
#[allow(clippy::too_many_arguments)]
pub fn save_item_raw(
&self,
conn: &mut Connection,
content: &[u8],
tags: Vec<String>,
metadata: HashMap<String, String>,
compress: bool,
run_meta: bool,
settings: &Settings,
) -> Result<ItemWithMeta, CoreError> {
let mut cursor = Cursor::new(content);
self.save_item_raw_streaming(
conn,
&mut cursor,
tags,
metadata,
compress,
run_meta,
None,
None,
settings,
)
}
/// Save an item from a streaming reader with granular control over compression.
///
/// Unlike `save_item_raw` which takes a pre-buffered `&[u8]`, this method
/// reads from the reader in chunks and writes directly to the compression
/// engine, avoiding buffering the entire content in memory.
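///
/// A minimal sketch (file name and argument values are illustrative):
/// ```ignore
/// let mut reader = std::fs::File::open("dump.bin")?;
/// let saved = item_service.save_item_raw_streaming(
///     &mut conn,
///     &mut reader,
///     vec!["import".to_string()],
///     HashMap::new(),
///     true,   // let the server compress
///     false,  // skip server-side meta plugins
///     None,   // no client compression type to record
///     None,   // no imported timestamp: use "now"
///     &settings,
/// )?;
/// ```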
#[allow(clippy::too_many_arguments)]
pub fn save_item_raw_streaming(
&self,
conn: &mut Connection,
reader: &mut dyn Read,
tags: Vec<String>,
metadata: HashMap<String, String>,
compress: bool,
run_meta: bool,
client_compression_type: Option<CompressionType>,
import_ts: Option<DateTime<Utc>>,
settings: &Settings,
) -> Result<ItemWithMeta, CoreError> {
let mut cmd = Command::new("keep");
let mut tags = tags;
crate::modes::common::ensure_default_tag(&mut tags);
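// Decide what is written to disk and what compression type is recorded:
// compress=true means the server compresses with its configured engine;
// compress=false means the client already compressed, so the bytes are
// written as-is while the client's compression type is recorded in the DB.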
let (compression_type_for_db, compression_engine) = if compress {
let ct = settings_compression_type(&mut cmd, settings);
let engine = get_compression_engine(ct.clone())?;
(ct, engine)
} else {
let ct = client_compression_type.unwrap_or(CompressionType::None);
let engine = get_compression_engine(CompressionType::None)?;
(ct, engine)
};
let item_id;
let mut item;
{
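// Imported items keep their original timestamp; regular saves are stamped "now".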
item = if let Some(ts) = import_ts {
db::insert_item_with_ts(conn, ts, &compression_type_for_db.to_string())?
} else {
db::create_item(conn, compression_type_for_db.clone())?
};
item_id = item
.id
.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
db::set_item_tags(conn, item.clone(), &tags)?;
}
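// Plugin metadata is collected in memory and written to the DB after the
// plugins finish processing the stream.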
let collected_meta: std::sync::Arc<std::sync::Mutex<Vec<(String, String)>>> =
std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
let collector = collected_meta.clone();
let save_meta: crate::meta_plugin::SaveMetaFn =
std::sync::Arc::new(std::sync::Mutex::new(move |name: &str, value: &str| {
if let Ok(mut v) = collector.lock() {
v.push((name.to_string(), value.to_string()));
}
}));
let meta_service = MetaService::new(save_meta);
let mut plugins = if run_meta {
meta_service.get_plugins(&mut cmd, settings)
} else {
Vec::new()
};
if run_meta {
meta_service.initialize_plugins(&mut plugins);
}
let mut item_path = self.data_path.clone();
item_path.push(item_id.to_string());
let mut item_out = compression_engine.create(item_path)?;
let mut total_bytes = 0i64;
crate::common::stream_copy(reader, |chunk| {
item_out.write_all(chunk)?;
total_bytes += chunk.len() as i64;
if run_meta {
meta_service.process_chunk(&mut plugins, chunk);
}
Ok(())
})?;
item_out.flush()?;
drop(item_out);
if run_meta {
meta_service.finalize_plugins(&mut plugins);
}
if run_meta && let Ok(entries) = collected_meta.lock() {
for (name, value) in entries.iter() {
db::add_meta(conn, item_id, name, value)?;
}
}
for (key, value) in &metadata {
if key != "uncompressed_size" {
db::add_meta(conn, item_id, key, value)?;
}
}
item.size = Some(total_bytes);
db::update_item(conn, item)?;
self.get_item(conn, item_id)
}
/// Runs specified meta plugins on an existing item's content and stores the results.
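/// Illustrative usage (the plugin name is a placeholder; real names come from
/// the configured meta plugins):
/// ```ignore
/// item_service.update_item_plugins(
///     &mut conn,
///     42,
///     &["example_plugin".to_string()],
///     HashMap::new(),
///     &[],
///     &settings,
/// )?;
/// ```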
pub fn update_item_plugins(
&self,
conn: &mut Connection,
item_id: i64,
plugin_names: &[String],
metadata: HashMap<String, String>,
tags: &[String],
settings: &Settings,
) -> Result<ItemWithMeta, CoreError> {
let item = db::get_item(conn, item_id)?.ok_or_else(|| CoreError::ItemNotFound(item_id))?;
let collected_meta: std::sync::Arc<std::sync::Mutex<Vec<(String, String)>>> =
std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
let collector = collected_meta.clone();
let save_meta: crate::meta_plugin::SaveMetaFn =
std::sync::Arc::new(std::sync::Mutex::new(move |name: &str, value: &str| {
if let Ok(mut v) = collector.lock() {
v.push((name.to_string(), value.to_string()));
}
}));
let meta_service = MetaService::new(save_meta);
let mut cmd = Command::new("keep");
let all_plugins = meta_service.get_plugins(&mut cmd, settings);
let mut plugins: Vec<Box<dyn crate::meta_plugin::MetaPlugin>> = all_plugins
.into_iter()
.filter(|p| {
let plugin_name = p.meta_type().to_string();
plugin_names.iter().any(|n| n == &plugin_name)
})
.collect();
if plugins.is_empty() && metadata.is_empty() {
return self.get_item(conn, item_id);
}
let mut item_path = self.data_path.clone();
item_path.push(item_id.to_string());
if !item_path.exists() {
return Err(CoreError::ItemNotFound(item_id));
}
if !plugins.is_empty() {
let compression_service = CompressionService::new();
let mut reader =
compression_service.stream_item_content(item_path, &item.compression)?;
meta_service.initialize_plugins(&mut plugins);
crate::common::stream_copy(&mut reader, |chunk| {
meta_service.process_chunk(&mut plugins, chunk);
Ok(())
})?;
meta_service.finalize_plugins(&mut plugins);
if let Ok(entries) = collected_meta.lock() {
for (name, value) in entries.iter() {
db::add_meta(conn, item_id, name, value)?;
}
}
}
for (key, value) in &metadata {
db::add_meta(conn, item_id, key, value)?;
}
for tag in tags {
db::upsert_tag(conn, item_id, tag)?;
}
self.get_item(conn, item_id)
}
}
/// A reader that applies a filter chain to the data as it's read.

View File

@@ -1,25 +1,17 @@
pub mod async_data_service;
pub mod async_item_service;
pub mod compression_service;
pub mod data_service;
pub mod error;
pub mod filter_service;
pub mod item_service;
pub mod meta_service;
pub mod status_service;
pub mod sync_data_service;
pub mod types;
pub mod utils;
pub use async_data_service::AsyncDataService;
pub use async_item_service::AsyncItemService;
pub use compression_service::CompressionService;
pub use data_service::DataService;
pub use error::CoreError;
pub use filter_service::{FilterService, register_filter_plugin};
pub use item_service::ItemService;
pub use meta_service::MetaService;
pub use status_service::StatusService;
pub use sync_data_service::SyncDataService;
pub use types::{ItemWithContent, ItemWithMeta};
pub use utils::{calc_byte_range, extract_tags, parse_comma_tags};

View File

@@ -1,450 +0,0 @@
use crate::common::status::StatusInfo;
use crate::compression_engine::{CompressionType, get_compression_engine};
use crate::config::Settings;
use crate::db::Item;
use crate::db::Meta;
use crate::modes::common::settings_compression_type;
use crate::services::data_service::DataService;
use crate::services::error::CoreError;
use crate::services::item_service::ItemService;
use crate::services::meta_service::MetaService;
use crate::services::status_service::StatusService;
use crate::services::types::{ItemWithContent, ItemWithMeta};
use clap::Command;
use rusqlite::Connection;
use std::collections::HashMap;
use std::io::{Cursor, Read, Write};
use std::path::{Path, PathBuf};
pub struct SyncDataService {
item_service: ItemService,
settings: Settings,
}
impl SyncDataService {
pub fn new(data_path: PathBuf, settings: Settings) -> Self {
Self {
item_service: ItemService::new(data_path),
settings,
}
}
pub fn with_connection(data_path: PathBuf, settings: Settings, _conn: &Connection) -> Self {
Self::new(data_path, settings)
}
pub fn item_service(&self) -> &ItemService {
&self.item_service
}
pub fn settings(&self) -> &Settings {
&self.settings
}
pub fn get_data_path(&self) -> &PathBuf {
self.item_service.get_data_path()
}
pub fn save_item<R: Read>(
&self,
content: R,
cmd: &mut Command,
settings: &Settings,
tags: &mut Vec<String>,
conn: &mut Connection,
) -> Result<Item, CoreError> {
self.item_service
.save_item(content, cmd, settings, tags, conn)
}
/// Save an item with granular control over compression and meta plugins.
///
/// This method allows clients to control whether compression and meta plugins
/// run server-side or were already handled by the client.
///
/// # Arguments
///
/// * `conn` - Database connection.
/// * `content` - Raw content bytes.
/// * `tags` - Tags to associate with the item.
/// * `metadata` - Client-provided metadata.
/// * `compress` - Whether the server should compress the content.
/// * `run_meta` - Whether the server should run meta plugins.
///
/// # Returns
///
/// * `Result<ItemWithMeta, CoreError>` - The saved item with full details.
pub fn save_item_raw(
&self,
conn: &mut Connection,
content: &[u8],
tags: Vec<String>,
metadata: HashMap<String, String>,
compress: bool,
run_meta: bool,
) -> Result<ItemWithMeta, CoreError> {
let mut cursor = Cursor::new(content);
self.save_item_raw_streaming(conn, &mut cursor, tags, metadata, compress, run_meta, None)
}
/// Save an item from a streaming reader with granular control over compression.
///
/// Unlike `save_item_raw` which takes a pre-buffered `&[u8]`, this method
/// reads from the reader in chunks and writes directly to the compression
/// engine, avoiding buffering the entire content in memory.
#[allow(clippy::too_many_arguments)]
pub fn save_item_raw_streaming(
&self,
conn: &mut Connection,
reader: &mut dyn Read,
tags: Vec<String>,
metadata: HashMap<String, String>,
compress: bool,
run_meta: bool,
client_compression_type: Option<CompressionType>,
) -> Result<ItemWithMeta, CoreError> {
let mut cmd = Command::new("keep");
let settings = &self.settings;
let mut tags = tags;
crate::modes::common::ensure_default_tag(&mut tags);
// Determine compression type for DB record and for the file writer.
// When compress=true: server compresses using its configured engine.
// When compress=false: client already compressed — write raw bytes to disk
// but record the client's compression type in the DB.
let (compression_type_for_db, compression_engine) = if compress {
let ct = settings_compression_type(&mut cmd, settings);
let engine = get_compression_engine(ct.clone())?;
(ct, engine)
} else {
// Client already compressed — write raw (no engine), record actual type
let ct = client_compression_type.unwrap_or(CompressionType::None);
let engine = get_compression_engine(CompressionType::None)?;
(ct, engine)
};
let item_id;
let mut item;
{
item = crate::db::create_item(conn, compression_type_for_db.clone())?;
item_id = item
.id
.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
crate::db::set_item_tags(conn, item.clone(), &tags)?;
}
// Initialize meta plugins if requested
// Collect metadata in memory, write to DB after plugins finish.
let collected_meta: std::sync::Arc<std::sync::Mutex<Vec<(String, String)>>> =
std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
let collector = collected_meta.clone();
let save_meta: crate::meta_plugin::SaveMetaFn =
std::sync::Arc::new(std::sync::Mutex::new(move |name: &str, value: &str| {
if let Ok(mut v) = collector.lock() {
v.push((name.to_string(), value.to_string()));
}
}));
let meta_service = MetaService::new(save_meta);
let mut plugins = if run_meta {
meta_service.get_plugins(&mut cmd, settings)
} else {
Vec::new()
};
if run_meta {
meta_service.initialize_plugins(&mut plugins);
}
// Write content to file via streaming
let mut item_path = self.item_service.get_data_path().clone();
item_path.push(item_id.to_string());
let mut item_out = compression_engine.create(item_path)?;
let mut total_bytes = 0i64;
crate::common::stream_copy(reader, |chunk| {
item_out.write_all(chunk)?;
total_bytes += chunk.len() as i64;
if run_meta {
meta_service.process_chunk(&mut plugins, chunk);
}
Ok(())
})?;
item_out.flush()?;
drop(item_out);
// Finalize meta plugins
if run_meta {
meta_service.finalize_plugins(&mut plugins);
}
// Write collected plugin metadata to DB
if run_meta && let Ok(entries) = collected_meta.lock() {
for (name, value) in entries.iter() {
crate::db::add_meta(conn, item_id, name, value)?;
}
}
// Add client-provided metadata (excluding internal fields)
for (key, value) in &metadata {
if key != "uncompressed_size" {
crate::db::add_meta(conn, item_id, key, value)?;
}
}
item.size = Some(total_bytes);
crate::db::update_item(conn, item)?;
self.get_item(conn, item_id)
}
pub fn get_item(&self, conn: &mut Connection, id: i64) -> Result<ItemWithMeta, CoreError> {
self.item_service.get_item(conn, id)
}
pub fn get_item_content(
&self,
conn: &Connection,
id: i64,
) -> Result<ItemWithContent, CoreError> {
self.item_service.get_item_content(conn, id)
}
pub fn get_item_content_streaming(
&self,
conn: &Connection,
id: i64,
) -> Result<(Box<dyn Read + Send>, ItemWithMeta), CoreError> {
let (reader, _mime, _is_binary) = self
.item_service
.get_item_content_info_streaming(conn, id, None)?;
let item_with_meta = self.item_service.get_item(conn, id)?;
Ok((reader, item_with_meta))
}
pub fn list_items(
&self,
conn: &mut Connection,
tags: Vec<String>,
meta: HashMap<String, Option<String>>,
) -> Result<Vec<ItemWithMeta>, CoreError> {
self.item_service.list_items(conn, &tags, &meta)
}
pub fn delete_item(&self, conn: &mut Connection, id: i64) -> Result<Item, CoreError> {
let item_with_meta = self.item_service.get_item(conn, id)?;
let item = item_with_meta.item.clone();
self.item_service.delete_item(conn, id)?;
Ok(item)
}
pub fn find_item(
&self,
conn: &mut Connection,
ids: Vec<i64>,
tags: Vec<String>,
meta: HashMap<String, Option<String>>,
) -> Result<ItemWithMeta, CoreError> {
self.item_service.find_item(conn, &ids, &tags, &meta)
}
pub fn generate_status(
&self,
cmd: &mut Command,
settings: &Settings,
data_path: PathBuf,
db_path: PathBuf,
) -> StatusInfo {
let status_service = StatusService::new();
status_service
.generate_status(cmd, settings, data_path, db_path)
.unwrap_or_else(|_| StatusInfo::default())
}
}
impl DataService for SyncDataService {
type Error = CoreError;
fn save<R: Read>(
&self,
content: R,
cmd: &mut Command,
settings: &Settings,
mut tags: Vec<String>,
conn: &mut Connection,
) -> Result<Item, Self::Error> {
crate::modes::common::ensure_default_tag(&mut tags);
self.item_service
.save_item(content, cmd, settings, &mut tags, conn)
}
fn get(&self, conn: &mut Connection, id: i64) -> Result<ItemWithMeta, Self::Error> {
self.get_item(conn, id)
}
fn get_content(
&self,
conn: &mut Connection,
id: i64,
) -> Result<(Box<dyn Read + Send>, ItemWithMeta), Self::Error> {
self.get_item_content_streaming(conn, id)
}
fn list(
&self,
conn: &mut Connection,
tags: Vec<String>,
meta: HashMap<String, Option<String>>,
) -> Result<Vec<ItemWithMeta>, Self::Error> {
self.list_items(conn, tags, meta)
}
fn delete(&self, conn: &mut Connection, id: i64) -> Result<Item, Self::Error> {
self.delete_item(conn, id)
}
fn find_item(
&self,
conn: &mut Connection,
ids: Vec<i64>,
tags: Vec<String>,
meta: HashMap<String, Option<String>>,
) -> Result<ItemWithMeta, Self::Error> {
self.find_item(conn, ids, tags, meta)
}
fn get_items(
&self,
conn: &mut Connection,
ids: &[i64],
tags: &[String],
meta: &HashMap<String, Option<String>>,
) -> Result<Vec<ItemWithMeta>, Self::Error> {
if ids.is_empty() {
return self.list_items(conn, tags.to_vec(), meta.clone());
}
let mut results = Vec::new();
for id in ids {
match self.get_item(conn, *id) {
Ok(item) => results.push(item),
Err(CoreError::ItemNotFound(_)) => continue,
Err(e) => return Err(e),
}
}
Ok(results)
}
fn generate_status(
&self,
settings: &Settings,
data_path: &Path,
db_path: &Path,
) -> Result<StatusInfo, Self::Error> {
let status_service = StatusService::new();
let mut cmd = Command::new("keep");
Ok(status_service.generate_status(
&mut cmd,
settings,
data_path.to_path_buf(),
db_path.to_path_buf(),
)?)
}
}
/// Runs specified meta plugins on an existing item's content and stores the results.
impl SyncDataService {
pub fn update_item_plugins(
&self,
conn: &mut Connection,
item_id: i64,
plugin_names: &[String],
metadata: HashMap<String, String>,
tags: &[String],
) -> Result<ItemWithMeta, CoreError> {
use crate::services::compression_service::CompressionService;
use std::io::Read;
let item =
crate::db::get_item(conn, item_id)?.ok_or_else(|| CoreError::ItemNotFound(item_id))?;
// Collect metadata in memory
let collected_meta: std::sync::Arc<std::sync::Mutex<Vec<(String, String)>>> =
std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
let collector = collected_meta.clone();
let save_meta: crate::meta_plugin::SaveMetaFn =
std::sync::Arc::new(std::sync::Mutex::new(move |name: &str, value: &str| {
if let Ok(mut v) = collector.lock() {
v.push((name.to_string(), value.to_string()));
}
}));
// Create MetaService and get only the requested plugins
let meta_service = crate::services::meta_service::MetaService::new(save_meta);
let mut cmd = Command::new("keep");
let settings = &self.settings;
// Filter to only the requested plugin types
let all_plugins = meta_service.get_plugins(&mut cmd, settings);
let mut plugins: Vec<Box<dyn crate::meta_plugin::MetaPlugin>> = all_plugins
.into_iter()
.filter(|p| {
let plugin_name = p.meta_type().to_string();
plugin_names.iter().any(|n| n == &plugin_name)
})
.collect();
if plugins.is_empty() && metadata.is_empty() {
// Nothing to do, return current item info
return self.get_item(conn, item_id);
}
// Open and decompress the stored file
let mut item_path = self.item_service.get_data_path().clone();
item_path.push(item_id.to_string());
if !item_path.exists() {
return Err(CoreError::ItemNotFound(item_id));
}
if !plugins.is_empty() {
let compression_service = CompressionService::new();
let mut reader =
compression_service.stream_item_content(item_path, &item.compression)?;
// Run plugins on the content
meta_service.initialize_plugins(&mut plugins);
crate::common::stream_copy(&mut reader, |chunk| {
meta_service.process_chunk(&mut plugins, chunk);
Ok(())
})?;
meta_service.finalize_plugins(&mut plugins);
// Write collected plugin metadata to DB
if let Ok(entries) = collected_meta.lock() {
for (name, value) in entries.iter() {
crate::db::add_meta(conn, item_id, name, value)?;
}
}
}
// Apply direct metadata overrides
for (key, value) in &metadata {
crate::db::add_meta(conn, item_id, key, value)?;
}
// Apply tags
for tag in tags {
crate::db::upsert_tag(conn, item_id, tag)?;
}
self.get_item(conn, item_id)
}
}