diff --git a/Cargo.lock b/Cargo.lock index a58fa2f..3bd2ad3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1025,6 +1025,17 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +[[package]] +name = "filetime" +version = "0.2.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f98844151eee8917efc50bd9e8318cb963ae8b297431495d3f758616ea5c57db" +dependencies = [ + "cfg-if", + "libc", + "libredox", +] + [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -1744,6 +1755,7 @@ dependencies = [ "strip-ansi-escapes", "strum", "subtle", + "tar", "tempfile", "term", "thiserror 2.0.18", @@ -1793,7 +1805,10 @@ version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1744e39d1d6a9948f4f388969627434e31128196de472883b39f148769bfe30a" dependencies = [ + "bitflags 2.11.0", "libc", + "plain", + "redox_syscall 0.7.3", ] [[package]] @@ -2108,7 +2123,7 @@ checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.5.18", "smallvec", "windows-link", ] @@ -2207,6 +2222,12 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +[[package]] +name = "plain" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" + [[package]] name = "portable-atomic" version = "1.13.1" @@ -2391,6 +2412,15 @@ dependencies = [ "bitflags 2.11.0", ] +[[package]] +name = "redox_syscall" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ce70a74e890531977d37e532c34d45e9055d2409ed08ddba14529471ed0be16" +dependencies = [ + "bitflags 2.11.0", +] + [[package]] name = "redox_users" version 
= "0.5.2" @@ -2938,6 +2968,17 @@ dependencies = [ "syn", ] +[[package]] +name = "tar" +version = "0.4.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d863878d212c87a19c1a610eb53bb01fe12951c0501cf5a0d65f724914a667a" +dependencies = [ + "filetime", + "libc", + "xattr", +] + [[package]] name = "tempfile" version = "3.27.0" @@ -3961,6 +4002,16 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" +[[package]] +name = "xattr" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32e45ad4206f6d2479085147f02bc2ef834ac85886624a23575ae137c8aa8156" +dependencies = [ + "libc", + "rustix", +] + [[package]] name = "xdg" version = "2.5.2" diff --git a/Cargo.toml b/Cargo.toml index 28e862d..e4fac36 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,6 +73,7 @@ uzers = "0.12" which = "8.0" xdg = "2.5" strip-ansi-escapes = "0.2" +tar = "0.4" pest = "2.8" pest_derive = "2.8" dirs = "6.0" @@ -83,6 +84,7 @@ os_pipe = { version = "1", optional = true } axum-server = { version = "0.8", features = ["tls-rustls"], optional = true } jsonwebtoken = { version = "10", optional = true, features = ["aws_lc_rs"] } tiktoken-rs = { version = "0.9", optional = true } +tempfile = "3.3" [features] # Default features include core compression engines and swagger UI @@ -128,5 +130,4 @@ tls = ["dep:axum-server"] tokens = ["dep:tiktoken-rs"] [dev-dependencies] -tempfile = "3.3" rand = "0.9" diff --git a/src/args.rs b/src/args.rs index f5e9896..67fa2a1 100644 --- a/src/args.rs +++ b/src/args.rs @@ -66,11 +66,11 @@ pub struct ModeArgs { pub status_plugins: bool, #[arg(group("mode"), help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "status", "import"]))] - #[arg(help("Export an item to data and metadata files (default: latest item)"))] + #[arg(help("Export 
items to a .keep.tar archive (requires IDs or tags)"))] pub export: bool, - #[arg(group("mode"), help_heading("Mode Options"), long, value_name("META_FILE"), conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "status", "export"]))] - #[arg(help("Import an item from a metadata file (data from --import-data-file or stdin)"))] + #[arg(group("mode"), help_heading("Mode Options"), long, value_name("FILE"), conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "status", "export"]))] + #[arg(help("Import items from a .keep.tar archive or legacy .meta.yml file"))] pub import: Option, #[arg(group("mode"), help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "status"]))] @@ -201,14 +201,14 @@ pub struct ItemArgs { #[arg(help("Filter string to apply to content when getting items"))] pub filters: Option, - #[arg( - help_heading("Export Options"), - long, - default_value = "{id}_{tags}_{ts}" - )] - #[arg(help("Template for export filename. Variables: {id} {tags} {ts} {compression}"))] + #[arg(help_heading("Export Options"), long, default_value = "{name}_{ts}")] + #[arg(help("Template for export tar filename (appends .keep.tar). 
Variables: {name} {ts}"))] pub export_filename_format: String, + #[arg(help_heading("Export Options"), long, value_name("NAME"))] + #[arg(help("Export name used for {name} variable (default: export_)"))] + pub export_name: Option, + #[arg(help_heading("Import Options"), long, value_name("DATA_FILE"))] #[arg(help("Data file for import (reads from stdin if omitted)"))] pub import_data_file: Option, diff --git a/src/client.rs b/src/client.rs index 44a1b4f..d5937b8 100644 --- a/src/client.rs +++ b/src/client.rs @@ -397,7 +397,7 @@ impl KeepClient { .headers() .get("X-Keep-Compression") .and_then(|v| v.to_str().ok()) - .unwrap_or("none") + .unwrap_or("raw") .to_string(); let reader = response.into_body().into_reader(); @@ -416,4 +416,109 @@ impl KeepClient { let response: ApiResponse = self.get_json_with_query("/api/diff", ¶m_refs)?; Ok(response.data.unwrap_or_default()) } + + /// Export items to a tar archive, streaming the response to a file. + /// + /// # Arguments + /// + /// * `ids` - Item IDs to export (mutually exclusive with tags). + /// * `tags` - Tags to search for items (mutually exclusive with ids). + /// * `dest` - Destination file path. 
+ pub fn export_items_to_file( + &self, + ids: &[i64], + tags: &[String], + dest: &std::path::Path, + ) -> Result<(), CoreError> { + let mut params: Vec<(String, String)> = Vec::new(); + if !ids.is_empty() { + let id_strs: Vec = ids.iter().map(|id| id.to_string()).collect(); + params.push(("ids".to_string(), id_strs.join(","))); + } + if !tags.is_empty() { + params.push(("tags".to_string(), tags.join(","))); + } + let param_refs: Vec<(&str, &str)> = params + .iter() + .map(|(k, v)| (k.as_str(), v.as_str())) + .collect(); + + let mut url = self.url("/api/export"); + if !param_refs.is_empty() { + url.push('?'); + for (i, (key, value)) in param_refs.iter().enumerate() { + if i > 0 { + url.push('&'); + } + url.push_str(&format!("{}={}", url_encode(key), url_encode(value))); + } + } + + let mut req = self.agent.get(&url); + if let Some(ref auth) = self.auth_header() { + req = req.header("Authorization", auth); + } + + let response = self.handle_error(req.call())?; + let mut reader = response.into_body().into_reader(); + + let mut file = std::fs::File::create(dest).map_err(CoreError::Io)?; + let mut buf = [0u8; crate::common::PIPESIZE]; + loop { + let n = reader.read(&mut buf).map_err(CoreError::Io)?; + if n == 0 { + break; + } + std::io::Write::write_all(&mut file, &buf[..n]).map_err(CoreError::Io)?; + } + + Ok(()) + } + + /// Import items from a tar archive, streaming the file to the server. + /// + /// # Arguments + /// + /// * `tar_path` - Path to the `.keep.tar` file. + /// + /// # Returns + /// + /// A list of newly assigned item IDs. 
+ pub fn import_tar_file(&self, tar_path: &std::path::Path) -> Result, CoreError> { + #[derive(serde::Deserialize)] + struct ApiResponse { + data: Option, + error: Option, + } + + #[derive(serde::Deserialize)] + struct ImportResponse { + ids: Vec, + } + + let mut file = std::fs::File::open(tar_path).map_err(CoreError::Io)?; + + let url = self.url("/api/import"); + let mut req = self.agent.post(&url); + if let Some(ref auth) = self.auth_header() { + req = req.header("Authorization", auth); + } + req = req.header("Content-Type", "application/x-tar"); + + let response = self.handle_error(req.send(ureq::SendBody::from_reader(&mut file)))?; + + let body = response + .into_body() + .read_to_string() + .map_err(|e| CoreError::InvalidInput(format!("Cannot read response: {e}")))?; + + let api_response: ApiResponse = serde_json::from_str(&body) + .map_err(|e| CoreError::InvalidInput(format!("Cannot parse response: {e}")))?; + + if let Some(error) = api_response.error { + return Err(CoreError::InvalidInput(error)); + } + + Ok(api_response.data.map(|d| d.ids).unwrap_or_default()) + } } diff --git a/src/common/is_binary.rs b/src/common/is_binary.rs index 79f3261..60364b8 100644 --- a/src/common/is_binary.rs +++ b/src/common/is_binary.rs @@ -149,7 +149,7 @@ fn has_binary_signature(data: &[u8]) -> bool { /// Check if data looks like UTF-16 without BOM fn looks_like_utf16(data: &[u8]) -> bool { - if data.len() < 4 || !data.len().is_multiple_of(2) { + if data.len() < 4 || data.len() % 2 != 0 { return false; } diff --git a/src/common/mod.rs b/src/common/mod.rs index 6d148e0..b1d4128 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -82,3 +82,10 @@ pub fn read_with_bounds( } Ok(result) } + +/// Sanitize a timestamp string for use in filenames. +/// +/// Replaces colons with hyphens (e.g., `2026-03-17T12:00:00Z` → `2026-03-17T12-00-00Z`). 
+pub fn sanitize_ts_string(ts: &str) -> String { + ts.replace(':', "-") +} diff --git a/src/compression_engine/mod.rs b/src/compression_engine/mod.rs index 011fb90..1a31f5a 100644 --- a/src/compression_engine/mod.rs +++ b/src/compression_engine/mod.rs @@ -15,8 +15,8 @@ use enum_map::{Enum, EnumMap}; pub mod gzip; pub mod lz4; -pub mod none; pub mod program; +pub mod raw; pub mod zstd; use crate::compression_engine::program::CompressionEngineProgram; @@ -45,8 +45,8 @@ pub enum CompressionType { XZ, #[strum(serialize = "zstd")] ZStd, - #[strum(serialize = "none")] - None, + #[strum(to_string = "raw", serialize = "raw", serialize = "none")] + Raw, } /// Trait defining the interface for compression engines. @@ -209,7 +209,7 @@ lazy_static! { vec!["-c"], vec!["-d", "-c"] )) as Box, - CompressionType::None => Box::new(crate::compression_engine::none::CompressionEngineNone::new()) as Box + CompressionType::Raw => Box::new(crate::compression_engine::raw::CompressionEngineRaw::new()) as Box }; #[cfg(feature = "gzip")] diff --git a/src/compression_engine/none.rs b/src/compression_engine/raw.rs similarity index 82% rename from src/compression_engine/none.rs rename to src/compression_engine/raw.rs index 86428fe..e543c84 100644 --- a/src/compression_engine/none.rs +++ b/src/compression_engine/raw.rs @@ -7,15 +7,15 @@ use std::path::PathBuf; use crate::compression_engine::CompressionEngine; #[derive(Debug, Eq, PartialEq, Clone, Default)] -pub struct CompressionEngineNone {} +pub struct CompressionEngineRaw {} -impl CompressionEngineNone { - pub fn new() -> CompressionEngineNone { - CompressionEngineNone {} +impl CompressionEngineRaw { + pub fn new() -> CompressionEngineRaw { + CompressionEngineRaw {} } } -impl CompressionEngine for CompressionEngineNone { +impl CompressionEngine for CompressionEngineRaw { fn is_supported(&self) -> bool { true } diff --git a/src/config.rs b/src/config.rs index b3e401a..be4aa61 100644 --- a/src/config.rs +++ b/src/config.rs @@ -217,6 +217,9 @@ pub 
struct Settings { // Export filename format template (--export-filename-format) #[serde(skip)] pub export_filename_format: String, + // Export name for {name} variable (--export-name) + #[serde(skip)] + pub export_name: Option, // Import data file path (--import-data-file) #[serde(skip)] pub import_data_file: Option, @@ -538,6 +541,7 @@ impl Settings { // Set export filename format from CLI args settings.export_filename_format = args.item.export_filename_format.clone(); + settings.export_name = args.item.export_name.clone(); settings.import_data_file = args.item.import_data_file.clone(); // Expand ~ in all path fields diff --git a/src/db.rs b/src/db.rs index 7488fee..a13cfdf 100644 --- a/src/db.rs +++ b/src/db.rs @@ -77,6 +77,7 @@ lazy_static! { ), M::up("CREATE INDEX idx_tags_name ON tags(name)"), M::up("CREATE INDEX idx_metas_name ON metas(name)"), + M::up("UPDATE items SET compression = 'raw' WHERE compression = 'none'"), ]); } @@ -298,7 +299,7 @@ pub fn create_item( /// /// * `conn` - Database connection. /// * `ts` - Timestamp to use for the item. -/// * `compression` - Compression type string (e.g., "lz4", "gzip", "none"). +/// * `compression` - Compression type string (e.g., "lz4", "gzip", "raw"). /// /// # Returns /// diff --git a/src/export_tar.rs b/src/export_tar.rs new file mode 100644 index 0000000..a617562 --- /dev/null +++ b/src/export_tar.rs @@ -0,0 +1,167 @@ +use anyhow::{Context, Result, anyhow}; +use log::debug; +use std::collections::HashSet; +use std::fs; +use std::io::{Read, Seek, Write}; +use std::path::Path; + +use tar::{Builder, Header}; + +use crate::filter_plugin::FilterChain; +use crate::modes::common::ExportMeta; +use crate::services::item_service::ItemService; +use crate::services::types::ItemWithMeta; + +/// Compute the intersection of all items' tag sets. +/// +/// Returns sorted tags that are present on ALL items. 
+pub fn common_tags(items: &[ItemWithMeta]) -> Vec { + if items.is_empty() { + return Vec::new(); + } + + let mut common: HashSet = items[0].tags.iter().map(|t| t.name.clone()).collect(); + + for item in items.iter().skip(1) { + let item_tags: HashSet = item.tags.iter().map(|t| t.name.clone()).collect(); + common = common.intersection(&item_tags).cloned().collect(); + } + + let mut result: Vec = common.into_iter().collect(); + result.sort(); + result +} + +/// Resolve the export name from the CLI arg or compute default from common tags. +/// +/// If `arg` is Some, uses that value directly. +/// Otherwise, computes `export_` or just `export` if no common tags. +pub fn export_name(arg: &Option, items: &[ItemWithMeta]) -> String { + if let Some(name) = arg { + return name.clone(); + } + + let tags = common_tags(items); + if tags.is_empty() { + "export".to_string() + } else { + format!("export_{}", tags.join("_")) + } +} + +/// Write items to a tar archive, streaming data without loading files into memory. +/// +/// The archive contains `/.data.` and +/// `/.meta.yml` for each item. +/// +/// # Arguments +/// +/// * `writer` - The output writer (e.g., a File). +/// * `dir_name` - Top-level directory name inside the tar. +/// * `items` - Items to export. +/// * `data_path` - Path to the data storage directory. +/// * `filter_chain` - Optional filter chain for transforming content on export. +/// * `item_service` - Item service for streaming content. +/// * `conn` - Database connection for filter chain operations. 
+pub fn write_export_tar( + writer: W, + dir_name: &str, + items: &[ItemWithMeta], + data_path: &Path, + filter_chain: Option<&FilterChain>, + item_service: &ItemService, + conn: &rusqlite::Connection, +) -> Result<()> { + let mut builder = Builder::new(writer); + + for item_with_meta in items { + let item_id = item_with_meta.item.id.context("Item missing ID")?; + + let compression = &item_with_meta.item.compression; + let item_tags: Vec = item_with_meta.tags.iter().map(|t| t.name.clone()).collect(); + let meta_map = item_with_meta.meta_as_map(); + + let data_path_entry = format!("{dir_name}/{item_id}.data.{compression}"); + let meta_path_entry = format!("{dir_name}/{item_id}.meta.yml"); + + // Meta entry (small, in-memory is fine) + let export_meta = ExportMeta { + ts: item_with_meta.item.ts, + compression: compression.clone(), + size: item_with_meta.item.size, + tags: item_tags, + metadata: meta_map, + }; + let meta_yaml = serde_yaml::to_string(&export_meta)?; + let meta_bytes = meta_yaml.into_bytes(); + let meta_len = meta_bytes.len() as u64; + + let mut meta_header = Header::new_gnu(); + meta_header.set_size(meta_len); + meta_header.set_mode(0o644); + meta_header.set_path(&meta_path_entry)?; + meta_header.set_cksum(); + builder + .append(&meta_header, meta_bytes.as_slice()) + .with_context(|| format!("Cannot write meta entry for item {item_id}"))?; + debug!("EXPORT_TAR: Wrote meta entry {meta_path_entry}"); + + // Data entry + let mut item_file_path = data_path.to_path_buf(); + item_file_path.push(item_id.to_string()); + + if let Some(chain) = filter_chain { + // Filtered export: spool through filter chain to a temp file, + // then stream the temp file into the tar with known size. 
+ let (mut reader, _, _) = item_service.get_item_content_info_streaming_with_chain( + conn, + item_id, + Some(chain), + )?; + + let mut tmp = tempfile::NamedTempFile::new() + .context("Cannot create temp file for filtered export")?; + let mut buf = [0u8; crate::common::PIPESIZE]; + loop { + let n = reader.read(&mut buf)?; + if n == 0 { + break; + } + tmp.write_all(&buf[..n])?; + } + tmp.flush()?; + let total_size = tmp.as_file().metadata()?.len(); + tmp.rewind()?; + + let mut data_header = Header::new_gnu(); + data_header.set_size(total_size); + data_header.set_mode(0o644); + data_header.set_path(&data_path_entry)?; + data_header.set_cksum(); + builder + .append(&data_header, &mut tmp) + .with_context(|| format!("Cannot write data entry for item {item_id}"))?; + debug!("EXPORT_TAR: Wrote filtered data entry {data_path_entry} ({total_size} bytes)"); + } else { + // Unfiltered export: stream raw compressed file + let file = fs::File::open(&item_file_path) + .with_context(|| format!("Cannot open data file: {}", item_file_path.display()))?; + let file_size = file.metadata()?.len(); + + let mut data_header = Header::new_gnu(); + data_header.set_size(file_size); + data_header.set_mode(0o644); + data_header.set_path(&data_path_entry)?; + data_header.set_cksum(); + builder + .append(&data_header, file) + .with_context(|| format!("Cannot write data entry for item {item_id}"))?; + debug!("EXPORT_TAR: Wrote data entry {data_path_entry} ({file_size} bytes)"); + } + } + + builder.finish().context("Cannot finalize tar archive")?; + debug!("EXPORT_TAR: Archive finalized"); + + Ok(()) +} diff --git a/src/import_tar.rs b/src/import_tar.rs new file mode 100644 index 0000000..226553d --- /dev/null +++ b/src/import_tar.rs @@ -0,0 +1,223 @@ +use anyhow::{Context, Result, anyhow}; +use log::debug; +use std::collections::HashMap; +use std::fs; +use std::io::{Read, Write}; +use std::path::Path; +use std::str::FromStr; +use tempfile::TempDir; + +use tar::Archive; + +use 
crate::common::PIPESIZE; +use crate::compression_engine::CompressionType; +use crate::db; +use crate::modes::common::ImportMeta; + +/// Represents a parsed tar entry from an export archive. +struct TarEntry { + /// Path to the extracted data file in the temp directory. + data_path: Option, + /// Path to the extracted meta file in the temp directory. + meta_path: Option, +} + +/// Import all items from a `.keep.tar` archive. +/// +/// Items are imported in ascending order of their original IDs, +/// ensuring chronological ordering is preserved. Each imported item +/// receives a new auto-incremented ID from the target database. +/// +/// # Arguments +/// +/// * `tar_path` - Path to the `.keep.tar` file. +/// * `conn` - Mutable database connection. +/// * `data_path` - Path to the data storage directory. +/// +/// # Returns +/// +/// A list of newly assigned item IDs. +pub fn import_from_tar( + tar_path: &Path, + conn: &mut rusqlite::Connection, + data_path: &Path, +) -> Result> { + let file = fs::File::open(tar_path) + .with_context(|| format!("Cannot open tar file: {}", tar_path.display()))?; + let mut archive = Archive::new(file); + + let tmp_dir = TempDir::new().context("Cannot create temporary directory for import")?; + let tmp_path = tmp_dir.path(); + + // Extract entries to temp dir + let mut entries_map: HashMap = HashMap::new(); + + for entry_result in archive.entries().context("Cannot read tar entries")? 
{ + let mut entry = entry_result.context("Cannot read tar entry")?; + let entry_path = entry.path().context("Cannot get entry path")?.to_path_buf(); + + let path_str = entry_path.to_string_lossy().replace('\\', "/"); + + // Reject path traversal attempts + if path_str.starts_with('/') || path_str.starts_with("..") || path_str.contains("/../") { + return Err(anyhow!("Rejected path traversal entry: {path_str}")); + } + + // Skip directory entries + if entry.header().entry_type().is_dir() { + debug!("IMPORT_TAR: Skipping directory entry: {path_str}"); + continue; + } + + // Parse: /.data. or /.meta.yml + let filename = entry_path + .file_name() + .ok_or_else(|| anyhow!("Invalid entry path: {path_str}"))? + .to_string_lossy(); + + let (orig_id, is_data) = if let Some(id_str) = filename.strip_suffix(".meta.yml") { + let id: i64 = id_str + .parse() + .with_context(|| format!("Invalid ID in entry: {path_str}"))?; + (id, false) + } else if let Some(dot_pos) = filename.find(".data.") { + let id_str = &filename[..dot_pos]; + let id: i64 = id_str + .parse() + .with_context(|| format!("Invalid ID in entry: {path_str}"))?; + (id, true) + } else { + debug!("IMPORT_TAR: Skipping unrecognized entry: {path_str}"); + continue; + }; + + let entry_ref = entries_map.entry(orig_id).or_insert_with(|| TarEntry { + data_path: None, + meta_path: None, + }); + + if is_data { + let dest = tmp_path.join(format!("{orig_id}.data")); + let mut dest_file = fs::File::create(&dest).context("Cannot create temp data file")?; + let mut buf = [0u8; PIPESIZE]; + loop { + let n = entry.read(&mut buf)?; + if n == 0 { + break; + } + dest_file.write_all(&buf[..n])?; + } + entry_ref.data_path = Some(dest); + debug!("IMPORT_TAR: Extracted data for original ID {orig_id}"); + } else { + let dest = tmp_path.join(format!("{orig_id}.meta.yml")); + let mut dest_file = fs::File::create(&dest).context("Cannot create temp meta file")?; + let mut buf = [0u8; PIPESIZE]; + loop { + let n = entry.read(&mut buf)?; + if n == 
0 { + break; + } + dest_file.write_all(&buf[..n])?; + } + entry_ref.meta_path = Some(dest); + debug!("IMPORT_TAR: Extracted meta for original ID {orig_id}"); + } + } + + if entries_map.is_empty() { + return Err(anyhow!("No items found in archive")); + } + + // Sort by original ID ascending + let mut sorted_ids: Vec = entries_map.keys().copied().collect(); + sorted_ids.sort_unstable(); + + let mut imported_ids = Vec::new(); + + for orig_id in sorted_ids { + let entry = entries_map.get(&orig_id).expect("ID should exist in map"); + + let meta_path = entry + .meta_path + .as_ref() + .ok_or_else(|| anyhow!("Item {orig_id} missing .meta.yml entry"))?; + let data_path_entry = entry + .data_path + .as_ref() + .ok_or_else(|| anyhow!("Item {orig_id} missing .data entry"))?; + + // Parse metadata + let meta_yaml = fs::read_to_string(meta_path) + .with_context(|| format!("Cannot read meta file for item {orig_id}"))?; + let import_meta: ImportMeta = serde_yaml::from_str(&meta_yaml) + .with_context(|| format!("Cannot parse meta file for item {orig_id}"))?; + + // Validate compression type + CompressionType::from_str(&import_meta.compression).map_err(|_| { + anyhow!( + "Invalid compression type '{}' for item {}", + import_meta.compression, + orig_id + ) + })?; + + // Create item with original timestamp + let item = db::insert_item_with_ts(conn, import_meta.ts, &import_meta.compression)?; + let new_id = item.id.context("New item missing ID")?; + + // Set tags + let tags = if !import_meta.tags.is_empty() { + db::set_item_tags(conn, item.clone(), &import_meta.tags)?; + import_meta.tags.clone() + } else { + Vec::new() + }; + + // Stream data to storage + let mut storage_path = data_path.to_path_buf(); + storage_path.push(new_id.to_string()); + + let mut reader = fs::File::open(data_path_entry) + .with_context(|| format!("Cannot read data file for item {orig_id}"))?; + let mut writer = fs::File::create(&storage_path) + .with_context(|| format!("Cannot create storage file for item 
{new_id}"))?; + let mut buf = [0u8; PIPESIZE]; + let mut total = 0i64; + loop { + let n = reader.read(&mut buf)?; + if n == 0 { + break; + } + writer.write_all(&buf[..n])?; + total += n as i64; + } + + if total == 0 { + return Err(anyhow!("Item {orig_id} has empty data file")); + } + + // Set metadata + for (key, value) in &import_meta.metadata { + db::query_upsert_meta( + conn, + db::Meta { + id: new_id, + name: key.clone(), + value: value.clone(), + }, + )?; + } + + // Update item size + let size_to_record = import_meta.size.unwrap_or(total); + let mut updated_item = item; + updated_item.size = Some(size_to_record); + db::update_item(conn, updated_item)?; + + log::info!("KEEP: Imported item {new_id} (was {orig_id}) tags: {tags:?}"); + imported_ids.push(new_id); + } + + Ok(imported_ids) +} diff --git a/src/lib.rs b/src/lib.rs index c9235ca..5e480f0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,7 +35,9 @@ pub mod common; pub mod compression_engine; pub mod config; pub mod db; +pub mod export_tar; pub mod filter_plugin; +pub mod import_tar; pub mod meta_plugin; pub mod modes; pub mod services; diff --git a/src/meta_plugin/infer_plugin.rs b/src/meta_plugin/infer_plugin.rs index 9c7aac8..36b2f10 100644 --- a/src/meta_plugin/infer_plugin.rs +++ b/src/meta_plugin/infer_plugin.rs @@ -1,7 +1,7 @@ use crate::common::PIPESIZE; use crate::meta_plugin::{ - process_metadata_outputs, register_meta_plugin, BaseMetaPlugin, MetaPlugin, MetaPluginResponse, - MetaPluginType, + BaseMetaPlugin, MetaPlugin, MetaPluginResponse, MetaPluginType, process_metadata_outputs, + register_meta_plugin, }; #[derive(Debug, Default)] diff --git a/src/meta_plugin/tree_magic_mini.rs b/src/meta_plugin/tree_magic_mini.rs index ffbe2d2..870322b 100644 --- a/src/meta_plugin/tree_magic_mini.rs +++ b/src/meta_plugin/tree_magic_mini.rs @@ -1,7 +1,7 @@ use crate::common::PIPESIZE; use crate::meta_plugin::{ - process_metadata_outputs, register_meta_plugin, BaseMetaPlugin, MetaPlugin, MetaPluginResponse, - 
MetaPluginType, + BaseMetaPlugin, MetaPlugin, MetaPluginResponse, MetaPluginType, process_metadata_outputs, + register_meta_plugin, }; #[derive(Debug, Default)] diff --git a/src/modes/client/export.rs b/src/modes/client/export.rs index c3fbaf2..d947d27 100644 --- a/src/modes/client/export.rs +++ b/src/modes/client/export.rs @@ -4,16 +4,15 @@ use clap::Command; use log::debug; use std::collections::HashMap; use std::fs; -use std::io::{Read, Write}; use crate::client::KeepClient; +use crate::common::sanitize_ts_string; use crate::config; -use crate::modes::common::{ExportMeta, resolve_item_id, sanitize_tags}; -/// Export an item to data and metadata files via client. +/// Export items to a `.keep.tar` archive via client. /// -/// If no IDs or tags are specified, exports the latest item. -/// Streams data in fixed-size buffers without loading entire file into memory. +/// Sends a request to the server's `/api/export` endpoint and +/// streams the response to a local tar file. pub fn mode( client: &KeepClient, cmd: &mut Command, @@ -21,40 +20,38 @@ pub fn mode( ids: &[i64], tags: &[String], ) -> Result<()> { + // Validate: IDs XOR tags if !ids.is_empty() && !tags.is_empty() { cmd.error( clap::error::ErrorKind::InvalidValue, - "Both ID and tags given, you must supply either IDs or tags when using --export", + "Cannot use both IDs and tags with --export", ) .exit(); - } else if ids.len() > 1 { + } + if ids.is_empty() && tags.is_empty() { cmd.error( clap::error::ErrorKind::InvalidValue, - "More than one ID given, you must supply exactly one ID when using --export", + "Must provide either IDs or tags with --export", ) .exit(); } - let item_id = resolve_item_id(client, ids, tags)?; + // We need to resolve items on the server to compute the filename. + // First, get the item info to build the filename template variables. + // For the tar filename, we use {name}_{ts}.keep.tar where name comes from + // --export-name or default export_. 
+ let dir_name = if let Some(ref name) = settings.export_name { + name.clone() + } else { + "export".to_string() + }; - // Get item info - let item_info = client.get_item_info(item_id)?; + let now = Utc::now(); + let ts_str = sanitize_ts_string(&now.format("%Y-%m-%dT%H:%M:%SZ").to_string()); - // Get streaming reader for raw compressed content - let (mut reader, compression) = client.get_item_content_stream(item_id)?; - - // Build template variables let mut vars = HashMap::new(); - vars.insert("id".to_string(), item_id.to_string()); - vars.insert("tags".to_string(), sanitize_tags(&item_info.tags)); - let ts = chrono::DateTime::parse_from_rfc3339(&item_info.ts) - .map(|dt| dt.with_timezone(&Utc)) - .unwrap_or_else(|_| Utc::now()); - vars.insert( - "ts".to_string(), - ts.format("%Y-%m-%dT%H:%M:%SZ").to_string(), - ); - vars.insert("compression".to_string(), compression.clone()); + vars.insert("name".to_string(), dir_name); + vars.insert("ts".to_string(), ts_str); let basename = strfmt::strfmt(&settings.export_filename_format, &vars).map_err(|e| { anyhow!( @@ -64,36 +61,17 @@ pub fn mode( ) })?; - // Stream data file write with fixed-size buffers - let data_filename = format!("{}.data.{}", basename, compression); - let mut data_file = fs::File::create(&data_filename) - .with_context(|| format!("Cannot create data file: {}", data_filename))?; - let mut total_bytes: usize = 0; - crate::common::stream_copy(&mut reader, |chunk| { - data_file.write_all(chunk)?; - total_bytes += chunk.len(); - Ok(()) - })?; - debug!( - "CLIENT_EXPORT: Wrote {} bytes to {}", - total_bytes, data_filename - ); + let tar_filename = format!("{basename}.keep.tar"); - // Write meta file - let meta_filename = format!("{}.meta.yml", basename); - let export_meta = ExportMeta { - ts, - compression, - size: item_info.size, - tags: item_info.tags.clone(), - metadata: item_info.metadata.clone(), - }; - let meta_yaml = serde_yaml::to_string(&export_meta)?; - fs::write(&meta_filename, &meta_yaml) - 
.with_context(|| format!("Cannot write meta file: {}", meta_filename))?; - debug!("CLIENT_EXPORT: Wrote metadata to {}", meta_filename); + client + .export_items_to_file(ids, tags, std::path::Path::new(&tar_filename)) + .map_err(|e| anyhow!("Export failed: {e}"))?; - eprintln!("{} {}", data_filename, meta_filename); + if !settings.quiet { + eprintln!("{tar_filename}"); + } + + debug!("CLIENT_EXPORT: Wrote items to {tar_filename}"); Ok(()) } diff --git a/src/modes/client/get.rs b/src/modes/client/get.rs index 6734c72..668c558 100644 --- a/src/modes/client/get.rs +++ b/src/modes/client/get.rs @@ -35,7 +35,7 @@ pub fn mode( // Get streaming reader for raw content let (reader, compression) = client.get_item_content_stream(item_id)?; - let compression_type = CompressionType::from_str(&compression).unwrap_or(CompressionType::None); + let compression_type = CompressionType::from_str(&compression).unwrap_or(CompressionType::Raw); // Decompress through streaming readers let mut decompressed_reader: Box = diff --git a/src/modes/client/import.rs b/src/modes/client/import.rs index 8e89d93..d5fcf17 100644 --- a/src/modes/client/import.rs +++ b/src/modes/client/import.rs @@ -4,6 +4,7 @@ use log::debug; use std::collections::HashMap; use std::fs; use std::io::Read; +use std::path::Path; use crate::client::KeepClient; use crate::compression_engine::CompressionType; @@ -11,11 +12,61 @@ use crate::config; use crate::modes::common::ImportMeta; use std::str::FromStr; -/// Import an item from a metadata file via client. +/// Import items from a `.keep.tar` archive or legacy `.meta.yml` file via client. /// -/// Streams data to server without buffering entire file in memory. -/// Sends original timestamp to server so it's preserved. +/// For `.keep.tar` files, streams the archive to the server's `/api/import` endpoint. +/// For `.meta.yml` files, uses the legacy single-item import path. 
pub fn mode( + client: &KeepClient, + cmd: &mut Command, + settings: &config::Settings, + import_path: &str, +) -> Result<()> { + if import_path.ends_with(".keep.tar") { + import_tar(client, cmd, settings, import_path) + } else if import_path.ends_with(".meta.yml") { + import_legacy(client, cmd, settings, import_path) + } else { + cmd.error( + clap::error::ErrorKind::InvalidValue, + format!("Unsupported import format: {import_path} (expected .keep.tar or .meta.yml)"), + ) + .exit(); + } +} + +/// Import from a `.keep.tar` archive via the server API. +fn import_tar( + client: &KeepClient, + _cmd: &mut Command, + settings: &config::Settings, + tar_path: &str, +) -> Result<()> { + let path = Path::new(tar_path); + + let imported_ids = client + .import_tar_file(path) + .map_err(|e| anyhow!("Import failed: {e}"))?; + + if !settings.quiet { + println!( + "KEEP: Imported {} item(s): {:?}", + imported_ids.len(), + imported_ids + ); + } + + debug!( + "CLIENT_IMPORT: Imported {} items from {}", + imported_ids.len(), + tar_path + ); + + Ok(()) +} + +/// Legacy single-item import from a `.meta.yml` file. +fn import_legacy( client: &KeepClient, cmd: &mut Command, settings: &config::Settings, @@ -23,9 +74,9 @@ pub fn mode( ) -> Result<()> { // Read and parse metadata let meta_yaml = fs::read_to_string(meta_file) - .with_context(|| format!("Cannot read metadata file: {}", meta_file))?; + .with_context(|| format!("Cannot read metadata file: {meta_file}"))?; let import_meta: ImportMeta = serde_yaml::from_str(&meta_yaml) - .with_context(|| format!("Cannot parse metadata file: {}", meta_file))?; + .with_context(|| format!("Cannot parse metadata file: {meta_file}"))?; // Validate compression type CompressionType::from_str(&import_meta.compression).map_err(|_| { @@ -64,7 +115,7 @@ pub fn mode( client.post_stream("/api/item/", &mut reader, ¶m_refs)? } else { // For stdin, we need to buffer since stdin can't be seeked - // and post_stream may need to retry. 
Use a BufReader for efficiency. + // and post_stream may need to retry. let mut buf = Vec::new(); std::io::stdin() .read_to_end(&mut buf) diff --git a/src/modes/client/list.rs b/src/modes/client/list.rs index e9769f8..f132102 100644 --- a/src/modes/client/list.rs +++ b/src/modes/client/list.rs @@ -1,6 +1,6 @@ use crate::client::KeepClient; use crate::modes::common::{ - format_size, render_list_table_with_format, settings_output_format, ColumnType, OutputFormat, + ColumnType, OutputFormat, format_size, render_list_table_with_format, settings_output_format, }; use clap::Command; use log::debug; diff --git a/src/modes/client/save.rs b/src/modes/client/save.rs index 7897aba..1069450 100644 --- a/src/modes/client/save.rs +++ b/src/modes/client/save.rs @@ -39,7 +39,7 @@ pub fn mode( // Determine compression type from settings let compression_type = settings_compression_type(cmd, settings); let compression_type_str = compression_type.to_string(); - // In client mode, the client always handles compression (even "none"). + // In client mode, the client always handles compression (even "raw"). // The server should never re-compress client data. let server_compress = false; diff --git a/src/modes/export.rs b/src/modes/export.rs index 3728947..97a08d5 100644 --- a/src/modes/export.rs +++ b/src/modes/export.rs @@ -1,40 +1,44 @@ use anyhow::{Context, Result, anyhow}; -use chrono::{DateTime, Utc}; +use chrono::Utc; use clap::Command; use log::debug; use std::collections::HashMap; use std::fs; -use std::io::{Read, Write}; use std::path::PathBuf; +use crate::common::sanitize_ts_string; use crate::config; +use crate::export_tar; use crate::filter_plugin::FilterChain; -use crate::modes::common::{ExportMeta, sanitize_tags}; +use crate::modes::common::sanitize_tags; use crate::services::item_service::ItemService; +use crate::services::types::ItemWithMeta; -/// Export an item to data and metadata files. +/// Export items to a `.keep.tar` archive. 
/// -/// If no IDs or tags are specified, exports the latest item. -/// Writes `{basename}.data.{compression}` for raw data and `{basename}.meta.yml` for metadata. +/// Requires either IDs or tags (mutually exclusive). If IDs are given, +/// ALL must exist. Archives contain per-item data and metadata files. pub fn mode_export( cmd: &mut Command, settings: &config::Settings, - ids: &mut [i64], - tags: &mut [String], + ids: &[i64], + tags: &[String], conn: &mut rusqlite::Connection, data_path: PathBuf, filter_chain: Option, ) -> Result<()> { + // Validate: IDs XOR tags if !ids.is_empty() && !tags.is_empty() { cmd.error( clap::error::ErrorKind::InvalidValue, - "Both ID and tags given, you must supply either IDs or tags when using --export", + "Cannot use both IDs and tags with --export", ) .exit(); - } else if ids.len() > 1 { + } + if ids.is_empty() && tags.is_empty() { cmd.error( clap::error::ErrorKind::InvalidValue, - "More than one ID given, you must supply exactly one ID when using --export", + "Must provide either IDs or tags with --export", ) .exit(); } @@ -45,30 +49,71 @@ pub fn mode_export( .iter() .map(|(k, v)| (k.clone(), v.clone())) .collect(); - let item_with_meta = item_service - .find_item(conn, ids, tags, &meta_filter) - .map_err(|e| anyhow!("Unable to find matching item: {}", e))?; - let item_id = item_with_meta.item.id.context("Item missing ID")?; - let item_tags: Vec = item_with_meta.tags.iter().map(|t| t.name.clone()).collect(); - let meta_map = item_with_meta.meta_as_map(); + // Resolve items + let items: Vec = if !ids.is_empty() { + // Fetch each ID individually; ALL must exist + let mut result = Vec::new(); + for &id in ids { + match item_service.get_item(conn, id) { + Ok(item) => result.push(item), + Err(_) => { + cmd.error( + clap::error::ErrorKind::InvalidValue, + format!("Item {id} not found"), + ) + .exit(); + } + } + } + result + } else { + // Search by tags + item_service + .list_items(conn, tags, &meta_filter) + .map_err(|e| 
anyhow!("Unable to find matching items: {}", e))? + }; + + if items.is_empty() { + cmd.error( + clap::error::ErrorKind::InvalidValue, + "No items found matching the given criteria", + ) + .exit(); + } + + // Validate: --export-filename-format doesn't use per-item vars with multiple items + if items.len() > 1 { + let fmt = &settings.export_filename_format; + if fmt.contains("{id}") || fmt.contains("{tags}") || fmt.contains("{compression}") { + cmd.error( + clap::error::ErrorKind::InvalidValue, + "Cannot use {id}, {tags}, or {compression} in --export-filename-format when exporting multiple items", + ) + .exit(); + } + } + + // Compute export name + let dir_name = export_tar::export_name(&settings.export_name, &items); + + // Compute tar filename from format template + let now = Utc::now(); + let ts_str = sanitize_ts_string(&now.format("%Y-%m-%dT%H:%M:%SZ").to_string()); - // Build template variables let mut vars = HashMap::new(); - vars.insert("id".to_string(), item_id.to_string()); - vars.insert("tags".to_string(), sanitize_tags(&item_tags)); - vars.insert( - "ts".to_string(), - item_with_meta - .item - .ts - .format("%Y-%m-%dT%H:%M:%SZ") - .to_string(), - ); - vars.insert( - "compression".to_string(), - item_with_meta.item.compression.clone(), - ); + vars.insert("name".to_string(), dir_name.clone()); + vars.insert("ts".to_string(), ts_str.clone()); + + // For single-item exports, also provide per-item vars + if items.len() == 1 { + let item = &items[0]; + let item_id = item.item.id.context("Item missing ID")?; + let item_tags: Vec = item.tags.iter().map(|t| t.name.clone()).collect(); + vars.insert("id".to_string(), item_id.to_string()); + vars.insert("tags".to_string(), sanitize_tags(&item_tags)); + vars.insert("compression".to_string(), item.item.compression.clone()); + } let basename = strfmt::strfmt(&settings.export_filename_format, &vars).map_err(|e| { anyhow!( @@ -78,52 +123,27 @@ pub fn mode_export( ) })?; - // Write data file - let data_filename = 
format!("{}.data.{}", basename, item_with_meta.item.compression); + let tar_filename = format!("{basename}.keep.tar"); - let mut item_path = data_path.clone(); - item_path.push(item_id.to_string()); + // Write the tar archive + let tar_file = fs::File::create(&tar_filename) + .with_context(|| format!("Cannot create tar file: {tar_filename}"))?; - if filter_chain.is_some() { - // Apply filters: decompress, filter, write - let (mut reader, _, _) = item_service.get_item_content_info_streaming_with_chain( - conn, - item_id, - filter_chain.as_ref(), - )?; - let mut out_file = fs::File::create(&data_filename) - .with_context(|| format!("Cannot create data file: {}", data_filename))?; - let mut buf = [0u8; 8192]; - loop { - let n = reader.read(&mut buf)?; - if n == 0 { - break; - } - out_file.write_all(&buf[..n])?; - } - debug!("EXPORT: Wrote filtered data to {}", data_filename); - } else { - // Raw copy of compressed file - fs::copy(&item_path, &data_filename) - .with_context(|| format!("Cannot copy {} to {}", item_path.display(), data_filename))?; - debug!("EXPORT: Copied raw data to {}", data_filename); + export_tar::write_export_tar( + tar_file, + &dir_name, + &items, + &data_path, + filter_chain.as_ref(), + &item_service, + conn, + )?; + + if !settings.quiet { + eprintln!("{tar_filename}"); } - // Write meta file - let meta_filename = format!("{}.meta.yml", basename); - let export_meta = ExportMeta { - ts: item_with_meta.item.ts, - compression: item_with_meta.item.compression.clone(), - size: item_with_meta.item.size, - tags: item_tags, - metadata: meta_map, - }; - let meta_yaml = serde_yaml::to_string(&export_meta)?; - fs::write(&meta_filename, &meta_yaml) - .with_context(|| format!("Cannot write meta file: {}", meta_filename))?; - debug!("EXPORT: Wrote metadata to {}", meta_filename); - - eprintln!("{} {}", data_filename, meta_filename); + debug!("EXPORT: Wrote {} items to {tar_filename}", items.len()); Ok(()) } diff --git a/src/modes/generate_config.rs 
b/src/modes/generate_config.rs index 1945b08..0bda698 100644 --- a/src/modes/generate_config.rs +++ b/src/modes/generate_config.rs @@ -258,7 +258,7 @@ fn compression_description(name: &str) -> &str { "bzip2" => "High compression (requires bzip2 binary)", "xz" => "Very high compression (requires xz binary)", "zstd" => "Modern fast compression (requires zstd binary)", - "none" => "No compression", + "raw" => "No compression (alias: none)", _ => "", } } diff --git a/src/modes/import.rs b/src/modes/import.rs index b99d041..addc47c 100644 --- a/src/modes/import.rs +++ b/src/modes/import.rs @@ -12,12 +12,56 @@ use crate::common::PIPESIZE; use crate::compression_engine::CompressionType; use crate::config; use crate::db; +use crate::import_tar; use crate::modes::common::ImportMeta; -/// Import an item from a metadata file and optional data file. +/// Import items from a `.keep.tar` archive or legacy `.meta.yml` file. /// -/// If `import_data_file` is not provided, reads data from stdin. +/// For `.keep.tar` files, all items are imported in their original ID order, +/// each receiving a new auto-incremented ID from the database. +/// For `.meta.yml` files, the legacy single-item import is used. 
pub fn mode_import( + cmd: &mut Command, + settings: &config::Settings, + import_path: &str, + conn: &mut rusqlite::Connection, + data_path: PathBuf, +) -> Result<()> { + let path = PathBuf::from(import_path); + + if import_path.ends_with(".keep.tar") { + // New tar-based import + let imported_ids = import_tar::import_from_tar(&path, conn, &data_path)?; + + if !settings.quiet { + println!( + "KEEP: Imported {} item(s): {:?}", + imported_ids.len(), + imported_ids + ); + } + + debug!( + "IMPORT: Imported {} items from {}", + imported_ids.len(), + import_path + ); + } else if import_path.ends_with(".meta.yml") { + // Legacy single-item import + import_legacy(cmd, settings, import_path, conn, data_path)?; + } else { + cmd.error( + clap::error::ErrorKind::InvalidValue, + format!("Unsupported import format: {import_path} (expected .keep.tar or .meta.yml)"), + ) + .exit(); + } + + Ok(()) +} + +/// Legacy single-item import from a `.meta.yml` file. +fn import_legacy( cmd: &mut Command, settings: &config::Settings, meta_file: &str, @@ -26,9 +70,9 @@ pub fn mode_import( ) -> Result<()> { // Read metadata let meta_yaml = fs::read_to_string(meta_file) - .with_context(|| format!("Cannot read metadata file: {}", meta_file))?; + .with_context(|| format!("Cannot read metadata file: {meta_file}"))?; let import_meta: ImportMeta = serde_yaml::from_str(&meta_yaml) - .with_context(|| format!("Cannot parse metadata file: {}", meta_file))?; + .with_context(|| format!("Cannot parse metadata file: {meta_file}"))?; // Validate compression type CompressionType::from_str(&import_meta.compression).map_err(|_| { diff --git a/src/modes/list.rs b/src/modes/list.rs index c53b3a6..07ffc6c 100644 --- a/src/modes/list.rs +++ b/src/modes/list.rs @@ -5,7 +5,7 @@ /// including table, JSON, and YAML. 
use crate::config; use crate::modes::common::ColumnType; -use crate::modes::common::{apply_color, apply_table_attribute, format_size, OutputFormat}; +use crate::modes::common::{OutputFormat, apply_color, apply_table_attribute, format_size}; use crate::services::item_service::ItemService; use crate::services::types::ItemWithMeta; use anyhow::{Context, Result}; diff --git a/src/modes/server/api/item.rs b/src/modes/server/api/item.rs index f637d74..1015af5 100644 --- a/src/modes/server/api/item.rs +++ b/src/modes/server/api/item.rs @@ -14,6 +14,7 @@ use axum::{ }; use http_body_util::BodyExt; use log::{debug, warn}; +use serde::Deserialize; use std::collections::HashMap; use std::io::Read; use std::sync::Arc; @@ -412,8 +413,8 @@ pub async fn handle_post_item( let client_compression_type = params.compression_type.as_deref().map(|ct| { ct.parse::() .unwrap_or_else(|_| { - warn!("Unknown compression type from client: {ct}, defaulting to none"); - crate::compression_engine::CompressionType::None + warn!("Unknown compression type from client: {ct}, defaulting to raw"); + crate::compression_engine::CompressionType::Raw }) }); @@ -1680,3 +1681,247 @@ pub async fn handle_update_item( error: None, })) } + +/// Query parameters for the export endpoint. +#[derive(Deserialize)] +pub struct ExportQuery { + pub ids: Option, + pub tags: Option, +} + +/// Export items to a `.keep.tar` archive, streamed as the response body. 
+/// +/// GET /api/export?ids=1,2,3 or GET /api/export?tags=foo,bar +pub async fn handle_export_items( + State(state): State, + Query(params): Query, +) -> Result { + let ids: Vec = params + .ids + .as_ref() + .map(|s| { + s.split(',') + .filter_map(|id| id.trim().parse::().ok()) + .collect() + }) + .unwrap_or_default(); + + let tags: Vec = params + .tags + .as_ref() + .map(|s| parse_comma_tags(s)) + .unwrap_or_default(); + + if ids.is_empty() && tags.is_empty() { + return Err(StatusCode::BAD_REQUEST); + } + if !ids.is_empty() && !tags.is_empty() { + return Err(StatusCode::BAD_REQUEST); + } + + let data_dir = state.data_dir.clone(); + let db = state.db.clone(); + + // Resolve items in blocking task + let items_with_meta = task::spawn_blocking(move || { + let conn = db.blocking_lock(); + let item_service = ItemService::new(data_dir.clone()); + if !ids.is_empty() { + let mut result = Vec::new(); + for &id in &ids { + match item_service.get_item(&conn, id) { + Ok(item) => result.push(item), + Err(_) => return Err(CoreError::ItemNotFound(id)), + } + } + Ok(result) + } else { + let meta_filter: HashMap> = HashMap::new(); + item_service.list_items(&conn, &tags, &meta_filter) + } + }) + .await + .map_err(|e| { + warn!("Export task failed: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })? 
+ .map_err(|e| { + warn!("Failed to resolve items for export: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + if items_with_meta.is_empty() { + return Err(StatusCode::NOT_FOUND); + } + + // Compute export name + let dir_name = crate::export_tar::export_name(&None, &items_with_meta); + let now = chrono::Utc::now(); + let ts_str = now.format("%Y-%m-%dT%H:%M:%SZ").to_string(); + let ts_sanitized = crate::common::sanitize_ts_string(&ts_str); + let tar_stem = format!("{dir_name}_{ts_sanitized}"); + let tar_filename = format!("{tar_stem}.keep.tar"); + + // Stream tar via mpsc channel + let (tx, rx) = mpsc::channel::<Result<Vec<u8>, std::io::Error>>(16); + let data_dir2 = state.data_dir.clone(); + let db2 = state.db.clone(); + + tokio::task::spawn_blocking(move || { + let conn = db2.blocking_lock(); + let item_service = ItemService::new(data_dir2.clone()); + + // Create a writer that sends chunks to the channel + struct ChannelWriter { + tx: tokio::sync::mpsc::Sender<Result<Vec<u8>, std::io::Error>>, + } + + impl std::io::Write for ChannelWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { + let chunk = buf.to_vec(); + self.tx + .blocking_send(Ok(chunk)) + .map_err(|e| std::io::Error::other(format!("Channel send error: {e}")))?; + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } + } + + let writer = ChannelWriter { tx: tx.clone() }; + + if let Err(e) = crate::export_tar::write_export_tar( + writer, + &tar_stem, + &items_with_meta, + &data_dir2, + None, + &item_service, + &conn, + ) { + warn!("Export tar write failed: {e}"); + let _ = tx.blocking_send(Err(std::io::Error::other(format!("Export failed: {e}")))); + } + // Channel drops here, signaling EOF to the stream + }); + + let stream = tokio_stream::wrappers::ReceiverStream::new(rx); + let body = axum::body::Body::from_stream(stream); + + let disposition = format!("attachment; filename=\"{tar_filename}\""); + let response = Response::builder() + .header(header::CONTENT_TYPE, "application/x-tar") + 
.header(header::CONTENT_DISPOSITION, disposition) + .body(body) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + + Ok(response) +} + +/// Import items from a `.keep.tar` archive streamed in the request body. +/// +/// POST /api/import (body: tar archive) +pub async fn handle_import_items( + State(state): State, + body: Body, +) -> Result>, StatusCode> { + let max_body_size: Option = state + .settings + .server + .as_ref() + .and_then(|s| s.max_body_size) + .filter(|&v| v > 0); + let (tx, rx) = mpsc::channel::, std::io::Error>>(16); + + // Async task: stream body into channel + tokio::spawn(async move { + let mut body = body; + let mut total_bytes: u64 = 0; + loop { + match body.frame().await { + None => break, + Some(Err(e)) => { + let _ = tx + .send(Err(std::io::Error::other(format!("Body error: {e}")))) + .await; + break; + } + Some(Ok(frame)) => { + if let Ok(data) = frame.into_data() { + total_bytes += data.len() as u64; + if let Some(limit) = max_body_size + && total_bytes > limit + { + let _ = tx + .send(Err(std::io::Error::other("Payload too large"))) + .await; + break; + } + if tx.send(Ok(data.to_vec())).await.is_err() { + break; + } + } + } + } + } + }); + + let data_dir = state.data_dir.clone(); + let db = state.db.clone(); + + // Blocking task: read tar from channel, write to temp file, import + let imported_ids = task::spawn_blocking(move || -> Result, CoreError> { + use std::io::Write; + + // Write the streamed tar to a temp file first + let tmp = tempfile::NamedTempFile::new().map_err(CoreError::Io)?; + let mut reader = MpscReader::new(rx); + let mut tmp_file = tmp.as_file(); + let mut buf = [0u8; crate::common::PIPESIZE]; + loop { + let n = reader.read(&mut buf)?; + if n == 0 { + break; + } + tmp_file.write_all(&buf[..n])?; + } + tmp_file.flush()?; + + let tmp_path = tmp.into_temp_path(); + + let mut conn = db.blocking_lock(); + let imported = crate::import_tar::import_from_tar(tmp_path.as_ref(), &mut conn, &data_dir) + 
.map_err(CoreError::Other)?; + + Ok(imported) + }) + .await + .map_err(|e| { + warn!("Import task failed: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })? + .map_err(|e| { + warn!("Failed to import tar: {e}"); + match &e { + crate::services::error::CoreError::Io(io_err) + if io_err.to_string() == "Payload too large" => + { + StatusCode::PAYLOAD_TOO_LARGE + } + _ => StatusCode::INTERNAL_SERVER_ERROR, + } + })?; + + let response_data = serde_json::json!({ + "ids": imported_ids, + "count": imported_ids.len(), + }); + + Ok(Json(ApiResponse { + success: true, + data: Some(response_data), + error: None, + })) +} diff --git a/src/modes/server/api/mod.rs b/src/modes/server/api/mod.rs index 7b28607..113da59 100644 --- a/src/modes/server/api/mod.rs +++ b/src/modes/server/api/mod.rs @@ -87,6 +87,8 @@ pub fn add_routes(router: Router) -> Router { .route("/api/item/{item_id}/info", get(item::handle_get_item_info)) .route("/api/item/{item_id}/update", post(item::handle_update_item)) .route("/api/diff", get(item::handle_diff_items)) + .route("/api/export", get(item::handle_export_items)) + .route("/api/import", post(item::handle_import_items)) } #[cfg(feature = "swagger")] diff --git a/src/services/item_service.rs b/src/services/item_service.rs index dccf1cf..ed4d3ae 100644 --- a/src/services/item_service.rs +++ b/src/services/item_service.rs @@ -823,8 +823,8 @@ impl ItemService { let engine = get_compression_engine(ct.clone())?; (ct, engine) } else { - let ct = client_compression_type.unwrap_or(CompressionType::None); - let engine = get_compression_engine(CompressionType::None)?; + let ct = client_compression_type.unwrap_or(CompressionType::Raw); + let engine = get_compression_engine(CompressionType::Raw)?; (ct, engine) }; diff --git a/src/tests/common/test_helpers.rs b/src/tests/common/test_helpers.rs index 2cddaec..4b30cb1 100644 --- a/src/tests/common/test_helpers.rs +++ b/src/tests/common/test_helpers.rs @@ -58,7 +58,7 @@ pub fn create_test_item(conn: &Connection) -> i64 { 
id: None, ts: chrono::Utc::now(), size: Some(100), - compression: crate::compression_engine::CompressionType::None.to_string(), + compression: crate::compression_engine::CompressionType::Raw.to_string(), }; db::insert_item(conn, item).expect("Failed to insert item") } diff --git a/src/tests/compression/none_tests.rs b/src/tests/compression/none_tests.rs index dd5bb05..136d16f 100644 --- a/src/tests/compression/none_tests.rs +++ b/src/tests/compression/none_tests.rs @@ -1,19 +1,19 @@ #[cfg(test)] mod tests { - use crate::compression_engine::none::CompressionEngineNone; + use crate::compression_engine::raw::CompressionEngineRaw; use crate::tests::common::test_helpers::test_compression_engine; #[test] - fn test_compression_engine_none() { + fn test_compression_engine_raw() { let test_data = b"test compression data"; - let engine = CompressionEngineNone {}; + let engine = CompressionEngineRaw {}; test_compression_engine(&engine, test_data); } #[test] - fn test_compression_engine_none_empty_data() { + fn test_compression_engine_raw_empty_data() { let test_data = b""; - let engine = CompressionEngineNone {}; + let engine = CompressionEngineRaw {}; test_compression_engine(&engine, test_data); } } diff --git a/src/tests/compression_types/conversion_tests.rs b/src/tests/compression_types/conversion_tests.rs index 89d5f78..fd1bb8f 100644 --- a/src/tests/compression_types/conversion_tests.rs +++ b/src/tests/compression_types/conversion_tests.rs @@ -7,7 +7,7 @@ mod tests { fn test_compression_type_display() { assert_eq!(format!("{}", CompressionType::LZ4), "lz4"); assert_eq!(format!("{}", CompressionType::GZip), "gzip"); - assert_eq!(format!("{}", CompressionType::None), "none"); + assert_eq!(format!("{}", CompressionType::Raw), "raw"); } #[test] @@ -21,8 +21,8 @@ mod tests { CompressionType::GZip ); assert_eq!( - CompressionType::from_str("none").unwrap(), - CompressionType::None + CompressionType::from_str("raw").unwrap(), + CompressionType::Raw ); // Test case insensitivity 
assert_eq!( @@ -34,8 +34,8 @@ mod tests { CompressionType::GZip ); assert_eq!( - CompressionType::from_str("NONE").unwrap(), - CompressionType::None + CompressionType::from_str("RAW").unwrap(), + CompressionType::Raw ); } @@ -46,4 +46,19 @@ mod tests { // "xz" is actually a valid compression type, so it should not error assert!(CompressionType::from_str("xz").is_ok()); } + + #[test] + fn test_compression_type_none_alias() { + // "none" is an alias for "raw" + assert_eq!( + CompressionType::from_str("none").unwrap(), + CompressionType::Raw + ); + assert_eq!( + CompressionType::from_str("NONE").unwrap(), + CompressionType::Raw + ); + // Display outputs "raw" (canonical name) + assert_eq!(format!("{}", CompressionType::Raw), "raw"); + } } diff --git a/src/tests/compression_types/factory_tests.rs b/src/tests/compression_types/factory_tests.rs index 45b7909..7272122 100644 --- a/src/tests/compression_types/factory_tests.rs +++ b/src/tests/compression_types/factory_tests.rs @@ -13,9 +13,9 @@ mod tests { .expect("Failed to get GZip engine"); assert!(gzip_engine.is_supported()); - let none_engine = compression_engine::get_compression_engine(CompressionType::None) - .expect("Failed to get None engine"); - assert!(none_engine.is_supported()); + let raw_engine = compression_engine::get_compression_engine(CompressionType::Raw) + .expect("Failed to get Raw engine"); + assert!(raw_engine.is_supported()); } #[test] diff --git a/src/tests/db/meta_tests.rs b/src/tests/db/meta_tests.rs index 9db2154..ac0d7a4 100644 --- a/src/tests/db/meta_tests.rs +++ b/src/tests/db/meta_tests.rs @@ -28,7 +28,7 @@ mod tests { id: Some(999), // Non-existent item ts: chrono::Utc::now(), size: Some(0), - compression: crate::compression_engine::CompressionType::None.to_string(), + compression: crate::compression_engine::CompressionType::Raw.to_string(), }; let metas = db::get_item_meta(&conn, &item); diff --git a/src/tests/db/tag_tests.rs b/src/tests/db/tag_tests.rs index b0741a5..5e672a4 100644 --- 
a/src/tests/db/tag_tests.rs +++ b/src/tests/db/tag_tests.rs @@ -33,7 +33,7 @@ mod tests { id: Some(999), // Non-existent item ts: chrono::Utc::now(), size: Some(0), - compression: crate::compression_engine::CompressionType::None.to_string(), + compression: crate::compression_engine::CompressionType::Raw.to_string(), }; let delete_result = db::delete_item_tags(&conn, item); diff --git a/src/tests/export_tar_tests.rs b/src/tests/export_tar_tests.rs new file mode 100644 index 0000000..3eac658 --- /dev/null +++ b/src/tests/export_tar_tests.rs @@ -0,0 +1,94 @@ +#[cfg(test)] +mod export_tar_tests { + use crate::db::{Item, Meta, Tag}; + use crate::export_tar::{common_tags, export_name}; + use crate::services::types::ItemWithMeta; + use chrono::Utc; + + fn make_item_with_tags(id: i64, tags: Vec<&str>) -> ItemWithMeta { + ItemWithMeta { + item: Item { + id: Some(id), + ts: Utc::now(), + size: Some(100), + compression: "raw".to_string(), + }, + tags: tags + .into_iter() + .map(|t| Tag { + id: 0, + name: t.to_string(), + }) + .collect(), + meta: Vec::new(), + } + } + + #[test] + fn test_common_tags_empty() { + let items: Vec = Vec::new(); + assert!(common_tags(&items).is_empty()); + } + + #[test] + fn test_common_tags_single_item() { + let items = vec![make_item_with_tags(1, vec!["foo", "bar"])]; + let tags = common_tags(&items); + assert_eq!(tags, vec!["bar", "foo"]); + } + + #[test] + fn test_common_tags_intersection() { + let items = vec![ + make_item_with_tags(1, vec!["foo", "bar", "baz"]), + make_item_with_tags(2, vec!["foo", "bar", "qux"]), + make_item_with_tags(3, vec!["foo", "baz"]), + ]; + let tags = common_tags(&items); + assert_eq!(tags, vec!["foo"]); + } + + #[test] + fn test_common_tags_no_intersection() { + let items = vec![ + make_item_with_tags(1, vec!["foo"]), + make_item_with_tags(2, vec!["bar"]), + ]; + let tags = common_tags(&items); + assert!(tags.is_empty()); + } + + #[test] + fn test_export_name_with_arg() { + let items = vec![make_item_with_tags(1, 
vec!["foo"])]; + let name = export_name(&Some("mybackup".to_string()), &items); + assert_eq!(name, "mybackup"); + } + + #[test] + fn test_export_name_default_with_tags() { + let items = vec![ + make_item_with_tags(1, vec!["foo", "bar"]), + make_item_with_tags(2, vec!["foo", "baz"]), + ]; + let name = export_name(&None, &items); + assert_eq!(name, "export_foo"); + } + + #[test] + fn test_export_name_default_no_common_tags() { + let items = vec![ + make_item_with_tags(1, vec!["foo"]), + make_item_with_tags(2, vec!["bar"]), + ]; + let name = export_name(&None, &items); + assert_eq!(name, "export"); + } + + #[test] + fn test_export_name_default_empty() { + let items: Vec = Vec::new(); + let name = export_name(&None, &items); + assert_eq!(name, "export"); + } +} diff --git a/src/tests/import_tar_tests.rs b/src/tests/import_tar_tests.rs new file mode 100644 index 0000000..e15a1e0 --- /dev/null +++ b/src/tests/import_tar_tests.rs @@ -0,0 +1,212 @@ +#[cfg(test)] +mod import_tar_tests { + use crate::db; + use crate::export_tar::write_export_tar; + use crate::import_tar::import_from_tar; + use crate::services::item_service::ItemService; + use crate::services::types::ItemWithMeta; + use anyhow::Result; + use chrono::Utc; + use std::io::Write; + use std::path::Path; + use tempfile::TempDir; + + fn setup_test_env() -> (TempDir, rusqlite::Connection, std::path::PathBuf) { + let temp_dir = TempDir::new().unwrap(); + let db_path = temp_dir.path().join("test.db"); + let conn = db::open(db_path).unwrap(); + let data_path = temp_dir.path().join("data"); + std::fs::create_dir_all(&data_path).unwrap(); + (temp_dir, conn, data_path) + } + + fn save_test_item( + conn: &mut rusqlite::Connection, + data_path: &Path, + content: &[u8], + tags: Vec<&str>, + compression: &str, + ) -> i64 { + let item = db::insert_item_with_ts(conn, Utc::now(), compression).unwrap(); + let item_id = item.id.unwrap(); + + // Write data file + let mut file_path = data_path.to_path_buf(); + 
file_path.push(item_id.to_string()); + let mut file = std::fs::File::create(&file_path).unwrap(); + file.write_all(content).unwrap(); + + // Set size + let mut updated = item; + updated.size = Some(content.len() as i64); + db::update_item(conn, updated).unwrap(); + + // Set tags + let tag_names: Vec = tags.into_iter().map(|t| t.to_string()).collect(); + db::set_item_tags( + conn, + crate::db::Item { + id: Some(item_id), + ts: Utc::now(), + size: Some(content.len() as i64), + compression: compression.to_string(), + }, + &tag_names, + ) + .unwrap(); + + item_id + } + + #[test] + fn test_roundtrip_export_import() -> Result<()> { + let (_dir, mut conn, data_path) = setup_test_env(); + + // Save test items + let id1 = save_test_item(&mut conn, &data_path, b"hello world", vec!["test"], "raw"); + let id2 = save_test_item( + &mut conn, + &data_path, + b"foo bar baz", + vec!["test", "extra"], + "raw", + ); + + // Get items with metadata + let item_service = ItemService::new(data_path.clone()); + let items = vec![ + item_service.get_item(&conn, id1)?, + item_service.get_item(&conn, id2)?, + ]; + + // Export to tar + let tar_path = _dir.path().join("test_export.keep.tar"); + let tar_file = std::fs::File::create(&tar_path)?; + write_export_tar( + tar_file, + "test_export", + &items, + &data_path, + None, + &item_service, + &conn, + )?; + + assert!(tar_path.exists()); + let tar_size = std::fs::metadata(&tar_path)?.len(); + assert!(tar_size > 0, "Tar file should not be empty"); + + // Clear database and data + let new_data_path = _dir.path().join("new_data"); + std::fs::create_dir_all(&new_data_path)?; + + // Import from tar + let new_ids = import_from_tar(&tar_path, &mut conn, &new_data_path)?; + + assert_eq!(new_ids.len(), 2, "Should import 2 items"); + + // Verify imported data + let mut imported_data1 = Vec::new(); + let mut f1 = std::fs::File::open(new_data_path.join(new_ids[0].to_string()))?; + std::io::Read::read_to_end(&mut f1, &mut imported_data1)?; + 
assert_eq!(imported_data1, b"hello world"); + + let mut imported_data2 = Vec::new(); + let mut f2 = std::fs::File::open(new_data_path.join(new_ids[1].to_string()))?; + std::io::Read::read_to_end(&mut f2, &mut imported_data2)?; + assert_eq!(imported_data2, b"foo bar baz"); + + Ok(()) + } + + #[test] + fn test_import_preserves_id_order() -> Result<()> { + let (_dir, mut conn, data_path) = setup_test_env(); + + // Save items with specific IDs (they'll be auto-assigned 1, 2, 3) + save_test_item(&mut conn, &data_path, b"item1", vec!["a"], "raw"); + save_test_item(&mut conn, &data_path, b"item2", vec!["b"], "raw"); + save_test_item(&mut conn, &data_path, b"item3", vec!["c"], "raw"); + + let item_service = ItemService::new(data_path.clone()); + let items = vec![ + item_service.get_item(&conn, 1)?, + item_service.get_item(&conn, 2)?, + item_service.get_item(&conn, 3)?, + ]; + + // Export + let tar_path = _dir.path().join("order_test.keep.tar"); + let tar_file = std::fs::File::create(&tar_path)?; + write_export_tar( + tar_file, + "order_test", + &items, + &data_path, + None, + &item_service, + &conn, + )?; + + // Import into new data dir + let new_data_path = _dir.path().join("new_data"); + std::fs::create_dir_all(&new_data_path)?; + + let new_ids = import_from_tar(&tar_path, &mut conn, &new_data_path)?; + + // IDs should be 4, 5, 6 (next available after 1, 2, 3) + assert_eq!(new_ids, vec![4, 5, 6]); + + Ok(()) + } + + #[test] + fn test_import_empty_tar_error() { + let (_dir, mut conn, data_path) = setup_test_env(); + + // Create an empty tar file + let tar_path = _dir.path().join("empty.keep.tar"); + { + let tar_file = std::fs::File::create(&tar_path).unwrap(); + let mut builder = tar::Builder::new(tar_file); + builder.finish().unwrap(); + } + + let result = import_from_tar(&tar_path, &mut conn, &data_path); + assert!(result.is_err(), "Empty tar should return an error"); + } + + #[test] + fn test_common_tags_intersection() { + use crate::db::{Item, Tag}; + use 
crate::export_tar::common_tags; + + let make_item = |tags: Vec<&str>| ItemWithMeta { + item: Item { + id: Some(1), + ts: Utc::now(), + size: None, + compression: "raw".to_string(), + }, + tags: tags + .into_iter() + .map(|t| Tag { + id: 0, + name: t.to_string(), + }) + .collect(), + meta: Vec::new(), + }; + + let items = vec![ + make_item(vec!["a", "b", "c"]), + make_item(vec!["a", "b", "d"]), + make_item(vec!["a", "c", "d"]), + ]; + assert_eq!(common_tags(&items), vec!["a"]); + + let items_single = vec![make_item(vec!["x", "y"])]; + let tags = common_tags(&items_single); + assert_eq!(tags, vec!["x", "y"]); + } +} diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 132feca..00986d6 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -3,6 +3,8 @@ pub mod compression; pub mod compression_engine; pub mod compression_types; pub mod db; +pub mod export_tar_tests; +pub mod import_tar_tests; pub mod meta_plugin; pub mod modes; pub mod server;