diff --git a/Cargo.lock b/Cargo.lock index c223d7e..1640232 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1713,6 +1713,7 @@ dependencies = [ "sha2 0.10.9", "similar", "smart-default", + "strfmt", "strip-ansi-escapes", "strum", "subtle", @@ -2812,6 +2813,12 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" +[[package]] +name = "strfmt" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29fdc163db75f7b5ffa3daf0c5a7136fb0d4b2f35523cd1769da05e034159feb" + [[package]] name = "strip-ansi-escapes" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index f9a29b0..917e3c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,7 @@ sha2 = "0.10" md5 = "0.7" subtle = "2.6" env_logger = "0.11" +strfmt = "0.2" strum = { version = "0.27", features = ["derive"] } term = "1.2" tokio = { version = "1.0", features = ["full"] } diff --git a/src/args.rs b/src/args.rs index b0bf5b2..e576e4f 100644 --- a/src/args.rs +++ b/src/args.rs @@ -24,56 +24,64 @@ pub struct Args { /// Struct for mode-specific arguments, defining CLI flags for different operations. 
#[derive(Parser, Debug, Clone)] pub struct ModeArgs { - #[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["get", "diff", "list", "delete", "info", "update", "status"]))] + #[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["get", "diff", "list", "delete", "info", "update", "status", "export", "import"]))] #[arg(help("Save an item using any tags or metadata provided"))] pub save: bool, - #[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["save", "diff", "list", "delete", "info", "update", "status"]))] + #[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["save", "diff", "list", "delete", "info", "update", "status", "export", "import"]))] #[arg(help( "Get an item either by it's ID or by a combination of matching tags and metatdata" ))] pub get: bool, - #[arg(group("mode"), help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "list", "delete", "info", "update", "status"]))] + #[arg(group("mode"), help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "list", "delete", "info", "update", "status", "export", "import"]))] #[arg(help("Show a diff between two items by ID"))] pub diff: bool, - #[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["save", "get", "diff", "delete", "info", "update", "status"]))] + #[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["save", "get", "diff", "delete", "info", "update", "status", "export", "import"]))] #[arg(help("List items, filtering on tags or metadata if given"))] pub list: bool, - #[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["save", "get", "diff", "list", "info", "update", "status"]))] + #[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["save", "get", "diff", "list", "info", "update", "status", "export", "import"]))] 
#[arg(help("Delete items either by ID or by matching tags"))] #[arg(requires = "ids_or_tags")] pub delete: bool, - #[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["save", "get", "diff", "list", "delete", "update", "status"]))] + #[arg(group("mode"), help_heading("Mode Options"), short, long, conflicts_with_all(["save", "get", "diff", "list", "delete", "update", "status", "export", "import"]))] #[arg(help( "Get an item either by it's ID or by a combination of matching tags and metatdata" ))] pub info: bool, - #[arg(group("mode"), help_heading("Mode Options"), short('u'), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "status"]))] + #[arg(group("mode"), help_heading("Mode Options"), short('u'), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "status", "export", "import"]))] #[arg(help("Update an item's tags and metadata by ID"))] pub update: bool, - #[arg(group("mode"), help_heading("Mode Options"), short('S'), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "server", "status_plugins"]))] + #[arg(group("mode"), help_heading("Mode Options"), short('S'), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "server", "status_plugins", "export", "import"]))] #[arg(help("Show status of directories and supported compression algorithms"))] pub status: bool, - #[arg(group("mode"), help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "status", "server"]))] + #[arg(group("mode"), help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "status", "server", "export", "import"]))] #[arg(help("Show available plugins and their configurations"))] pub status_plugins: bool, + #[arg(group("mode"), help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", 
"status", "import"]))] + #[arg(help("Export an item to data and metadata files (default: latest item)"))] + pub export: bool, + + #[arg(group("mode"), help_heading("Mode Options"), long, value_name("META_FILE"), conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "status", "export"]))] + #[arg(help("Import an item from a metadata file (data from --import-data-file or stdin)"))] + pub import: Option, + #[arg(group("mode"), help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "status"]))] #[arg(help("Start REST HTTP server"))] pub server: bool, - #[arg(group("mode"), help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "status", "server"]))] + #[arg(group("mode"), help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "status", "server", "export", "import"]))] #[arg(help("Generate default configuration and output to stdout"))] pub generate_config: bool, - #[arg(help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "status", "server", "generate_config"]))] + #[arg(help_heading("Mode Options"), long, conflicts_with_all(["save", "get", "diff", "list", "delete", "info", "update", "status", "server", "generate_config", "export", "import"]))] #[arg(help("Generate shell completion script (bash, zsh, fish, elvish, powershell)"))] pub generate_completion: Option, @@ -192,6 +200,18 @@ pub struct ItemArgs { #[arg(help_heading("Item Options"), long, env("KEEP_FILTERS"))] #[arg(help("Filter string to apply to content when getting items"))] pub filters: Option, + + #[arg( + help_heading("Export Options"), + long, + default_value = "{id}_{tags}_{ts}" + )] + #[arg(help("Template for export filename. 
Variables: {id} {tags} {ts} {compression}"))] + pub export_filename_format: String, + + #[arg(help_heading("Import Options"), long, value_name("DATA_FILE"))] + #[arg(help("Data file for import (reads from stdin if omitted)"))] + pub import_data_file: Option, } /// Struct for general options, including verbosity, paths, and output settings. diff --git a/src/client.rs b/src/client.rs index 1c04af7..20896bd 100644 --- a/src/client.rs +++ b/src/client.rs @@ -357,6 +357,20 @@ impl KeepClient { } pub fn get_item_content_raw(&self, id: i64) -> Result<(Vec, String), CoreError> { + let (mut reader, compression) = self.get_item_content_stream(id)?; + let mut bytes = Vec::new(); + reader + .read_to_end(&mut bytes) + .map_err(|e| CoreError::Other(anyhow::anyhow!("{}", e)))?; + Ok((bytes, compression)) + } + + /// Get a streaming reader for item content without decompression. + /// + /// Returns a reader over the HTTP response body and the compression type + /// from the X-Keep-Compression header. The caller can stream through + /// decompression readers without buffering the entire file in memory. 
+ pub fn get_item_content_stream(&self, id: i64) -> Result<(Box, String), CoreError> { let url = format!( "{}?decompress=false", self.url(&format!("/api/item/{id}/content")) @@ -376,12 +390,8 @@ impl KeepClient { .unwrap_or("none") .to_string(); - let mut body = response.into_body(); - let bytes = body - .read_to_vec() - .map_err(|e| CoreError::Other(anyhow::anyhow!("{}", e)))?; - - Ok((bytes, compression)) + let reader = response.into_body().into_reader(); + Ok((Box::new(reader), compression)) } pub fn diff_items(&self, id_a: i64, id_b: i64) -> Result, CoreError> { diff --git a/src/config.rs b/src/config.rs index 380022e..a044d35 100644 --- a/src/config.rs +++ b/src/config.rs @@ -212,6 +212,12 @@ pub struct Settings { // Metadata key-value pairs from --meta CLI flag #[serde(skip)] pub meta: Vec<(String, Option)>, + // Export filename format template (--export-filename-format) + #[serde(skip)] + pub export_filename_format: String, + // Import data file path (--import-data-file) + #[serde(skip)] + pub import_data_file: Option, } impl Settings { @@ -526,6 +532,10 @@ impl Settings { }) .collect(); + // Set export filename format from CLI args + settings.export_filename_format = args.item.export_filename_format.clone(); + settings.import_data_file = args.item.import_data_file.clone(); + debug!("CONFIG: Final settings: {settings:?}"); Ok(settings) } diff --git a/src/db.rs b/src/db.rs index 0d6d86a..7488fee 100644 --- a/src/db.rs +++ b/src/db.rs @@ -292,6 +292,35 @@ pub fn create_item( }) } +/// Creates a new item with a specific timestamp (for import). +/// +/// # Arguments +/// +/// * `conn` - Database connection. +/// * `ts` - Timestamp to use for the item. +/// * `compression` - Compression type string (e.g., "lz4", "gzip", "none"). +/// +/// # Returns +/// +/// * `Result` - The created item with its ID set. 
+pub fn insert_item_with_ts( + conn: &Connection, + ts: chrono::DateTime, + compression: &str, +) -> Result { + let item = Item { + id: None, + ts, + size: None, + compression: compression.to_string(), + }; + let item_id = insert_item(conn, item.clone())?; + Ok(Item { + id: Some(item_id), + ..item + }) +} + /// Adds a tag to an item. /// /// Inserts a new tag association in the `tags` table. diff --git a/src/main.rs b/src/main.rs index 95dceb7..2a47459 100644 --- a/src/main.rs +++ b/src/main.rs @@ -81,7 +81,7 @@ fn main() -> Result<(), Error> { let ids = &mut Vec::new(); let tags = &mut Vec::new(); - // For --info and --get modes, treat numeric strings as IDs + // For --info, --get, and --export modes, treat numeric strings as IDs for v in args.ids_or_tags.iter() { debug!("MAIN: Parsed value: {v:?}"); match v.clone() { @@ -90,15 +90,15 @@ fn main() -> Result<(), Error> { ids.push(num) } NumberOrString::Str(str) => { - // For --info and --get, try to parse strings as numbers to treat them as IDs - if (args.mode.info || args.mode.get) + // For --info, --get, and --export, try to parse strings as numbers to treat them as IDs + if (args.mode.info || args.mode.get || args.mode.export) && let Ok(num) = str.parse::() { debug!("MAIN: Adding parsed string to ids: {num}"); ids.push(num); continue; } - // If not a number, or not using --info/--get, treat as tag + // If not a number, or not using --info/--get/--export, treat as tag debug!("MAIN: Adding to tags: {str}"); tags.push(str) } @@ -118,6 +118,8 @@ fn main() -> Result<(), Error> { Delete, Info, Update, + Export, + Import, Status, StatusPlugins, Server, @@ -140,6 +142,10 @@ fn main() -> Result<(), Error> { mode = KeepModes::Info; } else if args.mode.update { mode = KeepModes::Update; + } else if args.mode.export { + mode = KeepModes::Export; + } else if args.mode.import.is_some() { + mode = KeepModes::Import; } else if args.mode.status { mode = KeepModes::Status; } else if args.mode.status_plugins { @@ -258,6 +264,13 @@ 
fn main() -> Result<(), Error> { KeepModes::Update => { keep::modes::client::update::mode(&client, &mut cmd, &settings, ids, tags) } + KeepModes::Export => { + keep::modes::client::export::mode(&client, &mut cmd, &settings, ids, tags) + } + KeepModes::Import => { + let meta_file = args.mode.import.as_ref().unwrap(); + keep::modes::client::import::mode(&client, &mut cmd, &settings, meta_file) + } _ => { cmd.error( ErrorKind::InvalidValue, @@ -313,6 +326,19 @@ fn main() -> Result<(), Error> { KeepModes::Update => { modes::update::mode_update(&mut cmd, &settings, ids, tags, &mut conn, data_path) } + KeepModes::Export => modes::export::mode_export( + &mut cmd, + &settings, + ids, + tags, + &mut conn, + data_path, + filter_chain, + ), + KeepModes::Import => { + let meta_file = args.mode.import.as_ref().unwrap(); + modes::import::mode_import(&mut cmd, &settings, meta_file, &mut conn, data_path) + } KeepModes::Status => modes::status::mode_status(&mut cmd, &settings, data_path, db_path), KeepModes::StatusPlugins => { modes::status_plugins::mode_status_plugins(&mut cmd, &settings, data_path, db_path) diff --git a/src/modes/client/export.rs b/src/modes/client/export.rs new file mode 100644 index 0000000..7288858 --- /dev/null +++ b/src/modes/client/export.rs @@ -0,0 +1,93 @@ +use anyhow::{anyhow, Context, Result}; +use chrono::Utc; +use clap::Command; +use log::debug; +use std::collections::HashMap; +use std::fs; +use std::io::{Read, Write}; + +use crate::client::KeepClient; +use crate::config; +use crate::modes::common::{resolve_item_id, sanitize_tags, ExportMeta}; + +/// Export an item to data and metadata files via client. +/// +/// If no IDs or tags are specified, exports the latest item. +/// Streams data in fixed-size buffers without loading entire file into memory. 
+pub fn mode( + client: &KeepClient, + cmd: &mut Command, + settings: &config::Settings, + ids: &[i64], + tags: &[String], +) -> Result<()> { + if ids.len() > 1 { + cmd.error( + clap::error::ErrorKind::InvalidValue, + "More than one ID given, you must supply exactly one ID when using --export", + ) + .exit(); + } + + let item_id = resolve_item_id(client, ids, tags)?; + + // Get item info + let item_info = client.get_item_info(item_id)?; + + // Get streaming reader for raw compressed content + let (mut reader, compression) = client.get_item_content_stream(item_id)?; + + // Build template variables + let mut vars = HashMap::new(); + vars.insert("id".to_string(), item_id.to_string()); + vars.insert("tags".to_string(), sanitize_tags(&item_info.tags)); + let ts = chrono::DateTime::parse_from_rfc3339(&item_info.ts) + .map(|dt| dt.with_timezone(&Utc)) + .unwrap_or_else(|_| Utc::now()); + vars.insert( + "ts".to_string(), + ts.format("%Y-%m-%dT%H:%M:%SZ").to_string(), + ); + vars.insert("compression".to_string(), compression.clone()); + + let basename = strfmt::strfmt(&settings.export_filename_format, &vars).map_err(|e| { + anyhow!( + "Invalid export filename format '{}': {}", + settings.export_filename_format, + e + ) + })?; + + // Stream data file write with fixed-size buffers + let data_filename = format!("{}.data.{}", basename, compression); + let mut data_file = fs::File::create(&data_filename) + .with_context(|| format!("Cannot create data file: {}", data_filename))?; + let mut total_bytes: usize = 0; + crate::common::stream_copy(&mut reader, |chunk| { + data_file.write_all(chunk)?; + total_bytes += chunk.len(); + Ok(()) + })?; + debug!( + "CLIENT_EXPORT: Wrote {} bytes to {}", + total_bytes, data_filename + ); + + // Write meta file + let meta_filename = format!("{}.meta.yml", basename); + let export_meta = ExportMeta { + ts, + compression, + size: item_info.size, + tags: item_info.tags.clone(), + metadata: item_info.metadata.clone(), + }; + let meta_yaml = 
serde_yaml::to_string(&export_meta)?; + fs::write(&meta_filename, &meta_yaml) + .with_context(|| format!("Cannot write meta file: {}", meta_filename))?; + debug!("CLIENT_EXPORT: Wrote metadata to {}", meta_filename); + + eprintln!("{} {}", data_filename, meta_filename); + + Ok(()) +} diff --git a/src/modes/client/get.rs b/src/modes/client/get.rs index 53b63ab..3cf9646 100644 --- a/src/modes/client/get.rs +++ b/src/modes/client/get.rs @@ -1,9 +1,9 @@ use crate::client::KeepClient; use crate::compression_engine::CompressionType; use crate::filter_plugin::FilterChain; +use crate::modes::common::{check_binary_tty, resolve_item_id}; use anyhow::Result; use clap::Command; -use is_terminal::IsTerminal; use log::debug; use std::io::{Read, Write}; use std::str::FromStr; @@ -18,83 +18,58 @@ pub fn mode( ) -> Result<(), anyhow::Error> { debug!("CLIENT_GET: Getting item via remote server"); - // Find the item ID - let item_id = if !ids.is_empty() { - ids[0] - } else if !tags.is_empty() { - // Find item by tags - let items = client.list_items(tags, "newest", 0, 1, &std::collections::HashMap::new())?; - if items.is_empty() { - return Err(anyhow::anyhow!("No items found matching tags: {:?}", tags)); - } - items[0].id - } else { - // Get latest item - let items = client.list_items(&[], "newest", 0, 1, &std::collections::HashMap::new())?; - if items.is_empty() { - return Err(anyhow::anyhow!("No items found")); - } - items[0].id - }; + let item_id = resolve_item_id(client, ids, tags)?; - // Get item info to determine compression type + // Get item info for metadata let item_info = client.get_item_info(item_id)?; + let metadata = &item_info.metadata; - // Get raw content from server - let (raw_bytes, compression) = client.get_item_content_raw(item_id)?; - - // Check if binary content would be sent to TTY - let is_text = item_info - .metadata - .get("text") - .map(|v| v == "true") - .unwrap_or(false); - - if std::io::stdout().is_terminal() && !is_text && !settings.force { - // Check 
if content is binary - let sample_len = std::cmp::min(raw_bytes.len(), 8192); - if crate::common::is_binary::is_binary(&raw_bytes[..sample_len]) { - return Err(anyhow::anyhow!( - "Refusing to output binary data to a terminal. Use --force to override." - )); - } - } - - // Decompress locally using the server-reported compression type + // Get streaming reader for raw content + let (reader, compression) = client.get_item_content_stream(item_id)?; let compression_type = CompressionType::from_str(&compression).unwrap_or(CompressionType::None); - let decompressed = match compression_type { + // Decompress through streaming readers + let mut decompressed_reader: Box = match compression_type { CompressionType::GZip => { use flate2::read::GzDecoder; - let mut decoder = GzDecoder::new(&raw_bytes[..]); - let mut content = Vec::new(); - decoder.read_to_end(&mut content)?; - content + Box::new(GzDecoder::new(reader)) } CompressionType::LZ4 => { use lz4_flex::frame::FrameDecoder; - let mut decoder = FrameDecoder::new(&raw_bytes[..]); - let mut content = Vec::new(); - decoder.read_to_end(&mut content)?; - content + Box::new(FrameDecoder::new(reader)) } - _ => raw_bytes, + _ => reader, }; - // Apply filters if present - let output = if let Some(mut chain) = filter_chain { - let mut filtered = Vec::new(); - chain.filter(&mut &decompressed[..], &mut filtered)?; - filtered + // Binary detection: sample first chunk + let mut sample_buf = [0u8; crate::common::PIPESIZE]; + let sample_len = decompressed_reader.read(&mut sample_buf)?; + check_binary_tty(metadata, &sample_buf[..sample_len], settings.force)?; + + // If filters present, buffer through filter chain; otherwise stream directly + if let Some(mut chain) = filter_chain { + // Apply filter to sample first, then remaining + let mut output = Vec::new(); + chain.filter(&mut &sample_buf[..sample_len], &mut output)?; + crate::common::stream_copy(&mut decompressed_reader, |chunk| { + chain.filter(&mut std::io::Cursor::new(chunk), &mut 
output)?; + Ok(()) + })?; + let stdout = std::io::stdout(); + let mut stdout = stdout.lock(); + stdout.write_all(&output)?; + stdout.flush()?; } else { - decompressed - }; - - // Stream to stdout - let stdout = std::io::stdout(); - let mut stdout = stdout.lock(); - stdout.write_all(&output)?; - stdout.flush()?; + // Stream decompressed content to stdout + let stdout = std::io::stdout(); + let mut stdout = stdout.lock(); + stdout.write_all(&sample_buf[..sample_len])?; + crate::common::stream_copy(&mut decompressed_reader, |chunk| { + stdout.write_all(chunk)?; + Ok(()) + })?; + stdout.flush()?; + } Ok(()) } diff --git a/src/modes/client/import.rs b/src/modes/client/import.rs new file mode 100644 index 0000000..8e89d93 --- /dev/null +++ b/src/modes/client/import.rs @@ -0,0 +1,109 @@ +use anyhow::{Context, Result, anyhow}; +use clap::Command; +use log::debug; +use std::collections::HashMap; +use std::fs; +use std::io::Read; + +use crate::client::KeepClient; +use crate::compression_engine::CompressionType; +use crate::config; +use crate::modes::common::ImportMeta; +use std::str::FromStr; + +/// Import an item from a metadata file via client. +/// +/// Streams data to server without buffering entire file in memory. +/// Sends original timestamp to server so it's preserved. 
+pub fn mode( + client: &KeepClient, + cmd: &mut Command, + settings: &config::Settings, + meta_file: &str, +) -> Result<()> { + // Read and parse metadata + let meta_yaml = fs::read_to_string(meta_file) + .with_context(|| format!("Cannot read metadata file: {}", meta_file))?; + let import_meta: ImportMeta = serde_yaml::from_str(&meta_yaml) + .with_context(|| format!("Cannot parse metadata file: {}", meta_file))?; + + // Validate compression type + CompressionType::from_str(&import_meta.compression).map_err(|_| { + anyhow!( + "Invalid compression type '{}' in metadata file", + import_meta.compression + ) + })?; + + debug!( + "CLIENT_IMPORT: Parsed meta: ts={}, compression={}, tags={:?}", + import_meta.ts, import_meta.compression, import_meta.tags + ); + + // Build query parameters + let ts_str = import_meta.ts.to_rfc3339(); + let params = [ + ("compress".to_string(), "false".to_string()), + ("meta".to_string(), "false".to_string()), + ("tags".to_string(), import_meta.tags.join(",")), + ( + "compression_type".to_string(), + import_meta.compression.clone(), + ), + ("ts".to_string(), ts_str), + ]; + let param_refs: Vec<(&str, &str)> = params + .iter() + .map(|(k, v)| (k.as_str(), v.as_str())) + .collect(); + + // Stream data to server without buffering entire file + let item_info = if let Some(ref data_file) = settings.import_data_file { + let mut reader = fs::File::open(data_file) + .with_context(|| format!("Cannot read data file: {}", data_file.display()))?; + client.post_stream("/api/item/", &mut reader, ¶m_refs)? + } else { + // For stdin, we need to buffer since stdin can't be seeked + // and post_stream may need to retry. Use a BufReader for efficiency. 
+ let mut buf = Vec::new(); + std::io::stdin() + .read_to_end(&mut buf) + .context("Cannot read data from stdin")?; + if buf.is_empty() { + cmd.error( + clap::error::ErrorKind::InvalidValue, + "No data provided (empty stdin)", + ) + .exit(); + } + let mut cursor = std::io::Cursor::new(&buf); + client.post_stream("/api/item/", &mut cursor, ¶m_refs)? + }; + + let item_id = item_info.id; + debug!("CLIENT_IMPORT: Created item {} via server", item_id); + + // Set uncompressed size if known from metadata + if let Some(size) = import_meta.size { + client.set_item_size(item_id, size as u64)?; + debug!("CLIENT_IMPORT: Set size to {}", size); + } + + // Post metadata + if !import_meta.metadata.is_empty() { + client.post_metadata(item_id, &import_meta.metadata)?; + debug!( + "CLIENT_IMPORT: Set {} metadata entries", + import_meta.metadata.len() + ); + } + + if !settings.quiet { + println!( + "KEEP: Imported item {} tags: {:?}", + item_id, import_meta.tags + ); + } + + Ok(()) +} diff --git a/src/modes/client/info.rs b/src/modes/client/info.rs index 641d54d..c2dc7a1 100644 --- a/src/modes/client/info.rs +++ b/src/modes/client/info.rs @@ -1,6 +1,7 @@ use crate::client::KeepClient; use crate::modes::common::{ - DisplayItemInfo, OutputFormat, format_size, render_item_info_table, settings_output_format, + DisplayItemInfo, OutputFormat, format_size, render_item_info_table, resolve_item_ids, + settings_output_format, }; use clap::Command; use log::debug; @@ -15,17 +16,7 @@ pub fn mode( debug!("CLIENT_INFO: Getting item info via remote server"); let output_format = settings_output_format(settings); - - // If tags provided, find matching item first - let item_ids: Vec = if !tags.is_empty() { - let items = client.list_items(tags, "newest", 0, 1, &std::collections::HashMap::new())?; - if items.is_empty() { - return Err(anyhow::anyhow!("No items found matching tags: {:?}", tags)); - } - items.into_iter().map(|i| i.id).collect() - } else { - ids.to_vec() - }; + let item_ids = 
resolve_item_ids(client, ids, tags)?; for &id in &item_ids { let item = client.get_item_info(id)?; diff --git a/src/modes/client/mod.rs b/src/modes/client/mod.rs index eb6084c..90b5b82 100644 --- a/src/modes/client/mod.rs +++ b/src/modes/client/mod.rs @@ -1,6 +1,8 @@ pub mod delete; pub mod diff; +pub mod export; pub mod get; +pub mod import; pub mod info; pub mod list; pub mod save; diff --git a/src/modes/client/save.rs b/src/modes/client/save.rs index c92f56c..7a1dfd4 100644 --- a/src/modes/client/save.rs +++ b/src/modes/client/save.rs @@ -75,8 +75,8 @@ pub fn mode( // Wrap pipe writer with appropriate compression let mut compressor: Box = match compression_type_clone { CompressionType::GZip => { - use flate2::write::GzEncoder; use flate2::Compression; + use flate2::write::GzEncoder; Box::new(GzEncoder::new(pipe_writer, Compression::default())) } CompressionType::LZ4 => Box::new(lz4_flex::frame::FrameEncoder::new(pipe_writer)), diff --git a/src/modes/common.rs b/src/modes/common.rs index bd5cb98..4bb2782 100644 --- a/src/modes/common.rs +++ b/src/modes/common.rs @@ -16,11 +16,14 @@ use crate::compression_engine::CompressionType; /// ``` use crate::config; use crate::meta_plugin::MetaPluginType; +use anyhow::{Result, anyhow}; +use chrono::{DateTime, Utc}; use clap::Command; use clap::error::ErrorKind; use comfy_table::{Attribute, Cell, ContentArrangement, Table}; use log::debug; use regex::Regex; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::env; use std::io::IsTerminal; @@ -618,3 +621,111 @@ pub fn build_path_table(path_info: &PathInfo, table_config: &config::TableConfig path_table } + +/// Sanitize tags for use in filenames. +/// +/// Replaces non-alphanumeric characters with underscores and joins with `_`. +/// Empty tags are filtered out to avoid double underscores. 
+pub fn sanitize_tags(tags: &[String]) -> String {
+    tags.iter()
+        .filter(|t| !t.is_empty())
+        .map(|t| {
+            t.chars()
+                .map(|c| if c.is_alphanumeric() { c } else { '_' })
+                .collect::<String>()
+        })
+        .collect::<Vec<String>>()
+        .join("_")
+}
+
+/// Metadata structure for export to YAML. Shared by local and client export modes.
+#[derive(Debug, Serialize)]
+pub struct ExportMeta {
+    pub ts: DateTime<Utc>,
+    pub compression: String,
+    pub size: Option<i64>,
+    pub tags: Vec<String>,
+    pub metadata: HashMap<String, String>,
+}
+
+/// Metadata structure for import from YAML. Shared by local and client import modes.
+#[derive(Debug, Deserialize)]
+pub struct ImportMeta {
+    pub ts: DateTime<Utc>,
+    pub compression: String,
+    #[serde(default)]
+    pub size: Option<i64>,
+    #[serde(default)]
+    pub tags: Vec<String>,
+    #[serde(default)]
+    pub metadata: HashMap<String, String>,
+}
+
+/// Resolve a single item ID from explicit IDs, tags, or latest item.
+///
+/// Returns the first ID if provided, the newest item matching tags,
+/// or the newest item overall if neither is specified.
+pub fn resolve_item_id(
+    client: &crate::client::KeepClient,
+    ids: &[i64],
+    tags: &[String],
+) -> Result<i64> {
+    if !ids.is_empty() {
+        Ok(ids[0])
+    } else if !tags.is_empty() {
+        let items = client.list_items(tags, "newest", 0, 1, &HashMap::new())?;
+        if items.is_empty() {
+            return Err(anyhow!("No items found matching tags: {:?}", tags));
+        }
+        Ok(items[0].id)
+    } else {
+        let items = client.list_items(&[], "newest", 0, 1, &HashMap::new())?;
+        if items.is_empty() {
+            return Err(anyhow!("No items found"));
+        }
+        Ok(items[0].id)
+    }
+}
+
+/// Resolve item IDs from explicit IDs or tags (multi-item variant).
+pub fn resolve_item_ids(
+    client: &crate::client::KeepClient,
+    ids: &[i64],
+    tags: &[String],
+) -> Result<Vec<i64>> {
+    if !ids.is_empty() {
+        Ok(ids.to_vec())
+    } else if !tags.is_empty() {
+        let items = client.list_items(tags, "newest", 0, 0, &HashMap::new())?;
+        if items.is_empty() {
+            return Err(anyhow!("No items found matching tags: {:?}", tags));
+        }
+        Ok(items.into_iter().map(|i| i.id).collect())
+    } else {
+        let items = client.list_items(&[], "newest", 0, 1, &HashMap::new())?;
+        if items.is_empty() {
+            return Err(anyhow!("No items found"));
+        }
+        Ok(vec![items[0].id])
+    }
+}
+
+/// Check if binary content should be blocked from TTY output.
+///
+/// Uses metadata `text` field as fast path, then falls back to byte sampling.
+/// Returns Err if content is binary and should not be displayed.
+pub fn check_binary_tty(
+    metadata: &HashMap<String, String>,
+    data_sample: &[u8],
+    force: bool,
+) -> Result<()> {
+    if force || !std::io::stdout().is_terminal() {
+        return Ok(());
+    }
+    if crate::common::is_binary::is_content_binary_from_metadata(metadata, data_sample) {
+        return Err(anyhow!(
+            "Refusing to output binary data to TTY, use --force to override"
+        ));
+    }
+    Ok(())
+}
diff --git a/src/modes/export.rs b/src/modes/export.rs
new file mode 100644
index 0000000..7681970
--- /dev/null
+++ b/src/modes/export.rs
@@ -0,0 +1,129 @@
+use anyhow::{anyhow, Context, Result};
+use chrono::{DateTime, Utc};
+use clap::Command;
+use log::debug;
+use std::collections::HashMap;
+use std::fs;
+use std::io::{Read, Write};
+use std::path::PathBuf;
+
+use crate::config;
+use crate::filter_plugin::FilterChain;
+use crate::modes::common::{sanitize_tags, ExportMeta};
+use crate::services::item_service::ItemService;
+
+/// Export an item to data and metadata files.
+///
+/// If no IDs or tags are specified, exports the latest item.
+/// Writes `{basename}.data.{compression}` for raw data and `{basename}.meta.yml` for metadata.
+pub fn mode_export( + cmd: &mut Command, + settings: &config::Settings, + ids: &mut [i64], + tags: &mut [String], + conn: &mut rusqlite::Connection, + data_path: PathBuf, + filter_chain: Option, +) -> Result<()> { + if !ids.is_empty() && !tags.is_empty() { + cmd.error( + clap::error::ErrorKind::InvalidValue, + "Both ID and tags given, you must supply either IDs or tags when using --export", + ) + .exit(); + } else if ids.len() > 1 { + cmd.error( + clap::error::ErrorKind::InvalidValue, + "More than one ID given, you must supply exactly one ID when using --export", + ) + .exit(); + } + + let item_service = ItemService::new(data_path.clone()); + let meta_filter: HashMap> = settings + .meta + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + let item_with_meta = item_service + .find_item(conn, ids, tags, &meta_filter) + .map_err(|e| anyhow!("Unable to find matching item: {}", e))?; + + let item_id = item_with_meta.item.id.context("Item missing ID")?; + let item_tags: Vec = item_with_meta.tags.iter().map(|t| t.name.clone()).collect(); + let meta_map = item_with_meta.meta_as_map(); + + // Build template variables + let mut vars = HashMap::new(); + vars.insert("id".to_string(), item_id.to_string()); + vars.insert("tags".to_string(), sanitize_tags(&item_tags)); + vars.insert( + "ts".to_string(), + item_with_meta + .item + .ts + .format("%Y-%m-%dT%H:%M:%SZ") + .to_string(), + ); + vars.insert( + "compression".to_string(), + item_with_meta.item.compression.clone(), + ); + + let basename = strfmt::strfmt(&settings.export_filename_format, &vars).map_err(|e| { + anyhow!( + "Invalid export filename format '{}': {}", + settings.export_filename_format, + e + ) + })?; + + // Write data file + let data_filename = format!("{}.data.{}", basename, item_with_meta.item.compression); + + let mut item_path = data_path.clone(); + item_path.push(item_id.to_string()); + + if filter_chain.is_some() { + // Apply filters: decompress, filter, write + let (mut reader, _, _) = 
item_service.get_item_content_info_streaming_with_chain( + conn, + item_id, + filter_chain.as_ref(), + )?; + let mut out_file = fs::File::create(&data_filename) + .with_context(|| format!("Cannot create data file: {}", data_filename))?; + let mut buf = [0u8; 8192]; + loop { + let n = reader.read(&mut buf)?; + if n == 0 { + break; + } + out_file.write_all(&buf[..n])?; + } + debug!("EXPORT: Wrote filtered data to {}", data_filename); + } else { + // Raw copy of compressed file + fs::copy(&item_path, &data_filename) + .with_context(|| format!("Cannot copy {} to {}", item_path.display(), data_filename))?; + debug!("EXPORT: Copied raw data to {}", data_filename); + } + + // Write meta file + let meta_filename = format!("{}.meta.yml", basename); + let export_meta = ExportMeta { + ts: item_with_meta.item.ts, + compression: item_with_meta.item.compression.clone(), + size: item_with_meta.item.size, + tags: item_tags, + metadata: meta_map, + }; + let meta_yaml = serde_yaml::to_string(&export_meta)?; + fs::write(&meta_filename, &meta_yaml) + .with_context(|| format!("Cannot write meta file: {}", meta_filename))?; + debug!("EXPORT: Wrote metadata to {}", meta_filename); + + eprintln!("{} {}", data_filename, meta_filename); + + Ok(()) +} diff --git a/src/modes/import.rs b/src/modes/import.rs new file mode 100644 index 0000000..b99d041 --- /dev/null +++ b/src/modes/import.rs @@ -0,0 +1,146 @@ +use anyhow::{Context, Result, anyhow}; +use chrono::{DateTime, Utc}; +use clap::Command; +use log::debug; +use std::collections::HashMap; +use std::fs; +use std::io::{Read, Write}; +use std::path::PathBuf; +use std::str::FromStr; + +use crate::common::PIPESIZE; +use crate::compression_engine::CompressionType; +use crate::config; +use crate::db; +use crate::modes::common::ImportMeta; + +/// Import an item from a metadata file and optional data file. +/// +/// If `import_data_file` is not provided, reads data from stdin. 
+pub fn mode_import( + cmd: &mut Command, + settings: &config::Settings, + meta_file: &str, + conn: &mut rusqlite::Connection, + data_path: PathBuf, +) -> Result<()> { + // Read metadata + let meta_yaml = fs::read_to_string(meta_file) + .with_context(|| format!("Cannot read metadata file: {}", meta_file))?; + let import_meta: ImportMeta = serde_yaml::from_str(&meta_yaml) + .with_context(|| format!("Cannot parse metadata file: {}", meta_file))?; + + // Validate compression type + CompressionType::from_str(&import_meta.compression).map_err(|_| { + anyhow!( + "Invalid compression type '{}' in metadata file", + import_meta.compression + ) + })?; + + debug!( + "IMPORT: Parsed meta: ts={}, compression={}, tags={:?}", + import_meta.ts, import_meta.compression, import_meta.tags + ); + + // Create item with original timestamp + let item = db::insert_item_with_ts(conn, import_meta.ts, &import_meta.compression)?; + let item_id = item.id.context("New item missing ID")?; + + debug!( + "IMPORT: Created item {} with compression {}", + item_id, import_meta.compression + ); + + // Set tags + if !import_meta.tags.is_empty() { + db::set_item_tags(conn, item.clone(), &import_meta.tags)?; + debug!("IMPORT: Set {} tags", import_meta.tags.len()); + } + + // Write data to storage using streaming copy + let mut item_path = data_path; + item_path.push(item_id.to_string()); + + let data_size: i64 = if let Some(ref data_file) = settings.import_data_file { + // Stream from file to storage using fixed-size buffers + let mut reader = fs::File::open(data_file) + .with_context(|| format!("Cannot read data file: {}", data_file.display()))?; + let mut writer = fs::File::create(&item_path) + .with_context(|| format!("Cannot create item file: {}", item_path.display()))?; + let mut buf = [0u8; PIPESIZE]; + let mut total = 0i64; + loop { + let n = reader.read(&mut buf)?; + if n == 0 { + break; + } + writer.write_all(&buf[..n])?; + total += n as i64; + } + total + } else { + // Stream from stdin to 
storage + let mut writer = fs::File::create(&item_path) + .with_context(|| format!("Cannot create item file: {}", item_path.display()))?; + let mut stdin = std::io::stdin().lock(); + let mut buf = [0u8; PIPESIZE]; + let mut total = 0i64; + loop { + let n = stdin.read(&mut buf)?; + if n == 0 { + break; + } + writer.write_all(&buf[..n])?; + total += n as i64; + } + total + }; + + if data_size == 0 { + cmd.error( + clap::error::ErrorKind::InvalidValue, + "No data provided (empty file or stdin)", + ) + .exit(); + } + + debug!( + "IMPORT: Wrote {} bytes to {}", + data_size, + item_path.display() + ); + + // Set metadata + for (key, value) in &import_meta.metadata { + db::query_upsert_meta( + conn, + db::Meta { + id: item_id, + name: key.clone(), + value: value.clone(), + }, + )?; + } + if !import_meta.metadata.is_empty() { + debug!( + "IMPORT: Set {} metadata entries", + import_meta.metadata.len() + ); + } + + // Update item size (use imported size if available, otherwise data length) + let size_to_record = import_meta.size.unwrap_or(data_size); + let mut updated_item = item; + updated_item.size = Some(size_to_record); + db::update_item(conn, updated_item)?; + + if !settings.quiet { + println!( + "KEEP: Imported item {} tags: {:?}", + item_id, import_meta.tags + ); + } + + Ok(()) +} diff --git a/src/modes/mod.rs b/src/modes/mod.rs index 96f6bbe..d9b21b7 100644 --- a/src/modes/mod.rs +++ b/src/modes/mod.rs @@ -9,8 +9,10 @@ pub mod common; pub mod delete; pub mod diff; +pub mod export; pub mod generate_config; pub mod get; +pub mod import; pub mod info; pub mod list; pub mod save; @@ -27,12 +29,18 @@ pub use delete::mode_delete; /// Compares two items and shows differences. pub use diff::mode_diff; +/// Exports an item to data and metadata files. +pub use export::mode_export; + /// Generates a default configuration file. pub use generate_config::mode_generate_config; /// Retrieves and outputs item content. 
pub use get::mode_get; +/// Imports an item from metadata and data files. +pub use import::mode_import; + /// Displays detailed information about items. pub use info::mode_info; diff --git a/src/modes/server/api/item.rs b/src/modes/server/api/item.rs index be7e142..e5dcfa1 100644 --- a/src/modes/server/api/item.rs +++ b/src/modes/server/api/item.rs @@ -3,9 +3,8 @@ use crate::modes::server::common::{ ApiResponse, AppState, CreateItemQuery, ItemContentQuery, ItemInfo, ItemInfoListResponse, ItemInfoResponse, ItemQuery, ListItemsQuery, MetadataResponse, TagsQuery, }; -use crate::services::async_data_service::AsyncDataService; -use crate::services::data_service::DataService; use crate::services::error::CoreError; +use crate::services::item_service::ItemService; use crate::services::utils::parse_comma_tags; use axum::{ body::Body, @@ -70,45 +69,6 @@ impl Read for MpscReader { } } -// Helper functions to replace the missing binary_detection module -async fn check_binary_content_allowed( - data_service: &AsyncDataService, - item_id: i64, - metadata: &HashMap, - allow_binary: bool, -) -> Result<(), StatusCode> { - if !allow_binary { - let is_binary = is_content_binary(data_service, item_id, metadata).await?; - if is_binary { - return Err(StatusCode::BAD_REQUEST); - } - } - Ok(()) -} - -/// Helper function to determine if content is binary -async fn is_content_binary( - data_service: &AsyncDataService, - item_id: i64, - metadata: &HashMap, -) -> Result { - if let Some(text_val) = metadata.get("text") { - Ok(text_val == "false") - } else { - // If text metadata isn't set, we need to check the content using streaming approach - match data_service - .get_item_content_info_streaming(item_id, None) - .await - { - Ok((_, _, is_binary)) => Ok(is_binary), - Err(e) => { - log::warn!("Failed to get content info for binary check for item {item_id}: {e}"); - Err(StatusCode::INTERNAL_SERVER_ERROR) - } - } - } -} - /// Helper function to get mime type from metadata fn 
get_mime_type(metadata: &HashMap) -> String { metadata @@ -128,15 +88,6 @@ fn handle_item_error(error: CoreError) -> StatusCode { } } -/// Helper function to create AsyncDataService from AppState -fn create_data_service(state: &AppState) -> AsyncDataService { - AsyncDataService::new( - state.data_dir.clone(), - state.settings.clone(), - state.db.clone(), - ) -} - #[utoipa::path( get, path = "/api/item/", @@ -179,14 +130,23 @@ pub async fn handle_list_items( HashMap::new() }; - let data_service = create_data_service(&state); - let mut items_with_meta = data_service - .list_items(tags, meta_filter) - .await - .map_err(|e| { - warn!("Failed to get items: {e}"); - StatusCode::INTERNAL_SERVER_ERROR - })?; + let data_dir = state.data_dir.clone(); + let db = state.db.clone(); + + let mut items_with_meta = task::spawn_blocking(move || { + let conn = db.blocking_lock(); + let item_service = ItemService::new(data_dir); + item_service.list_items(&conn, &tags, &meta_filter) + }) + .await + .map_err(|e| { + warn!("Blocking task failed: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })? 
+ .map_err(|e| { + warn!("Failed to get items: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })?; // Apply ordering (default is newest first) match params.order.as_deref().unwrap_or("newest") { @@ -232,163 +192,167 @@ pub async fn handle_list_items( /// Handle as_meta=true response by returning JSON with metadata and content async fn handle_as_meta_response( - data_service: &AsyncDataService, + db: &Arc>, + data_dir: &std::path::Path, item_id: i64, offset: u64, length: u64, ) -> Result { - // Get the item with metadata - let item_with_meta = data_service.get_item(item_id).await.map_err(|e| { + let db1 = db.clone(); + let data_dir1 = data_dir.to_path_buf(); + let item_with_meta = task::spawn_blocking(move || { + let conn = db1.blocking_lock(); + let item_service = ItemService::new(data_dir1); + item_service.get_item(&conn, item_id) + }) + .await + .map_err(|e| { + warn!("Blocking task failed for item {item_id}: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })? + .map_err(|e| { warn!("Failed to get item {item_id} for as_meta content: {e}"); StatusCode::INTERNAL_SERVER_ERROR })?; let metadata = item_with_meta.meta_as_map(); - handle_as_meta_response_with_metadata(data_service, item_id, &metadata, offset, length).await + handle_as_meta_response_with_metadata(db, data_dir, item_id, &metadata, offset, length).await } -/// Handle as_meta=true response with pre-fetched metadata using streaming +/// Handle as_meta=true response with pre-fetched metadata using streaming. +/// +/// Reads a sample to check binary status, then streams remaining content with +/// offset/length applied at the stream level — never loads the full item into memory. 
async fn handle_as_meta_response_with_metadata( - data_service: &AsyncDataService, + db: &Arc>, + data_dir: &std::path::Path, item_id: i64, metadata: &HashMap, offset: u64, length: u64, ) -> Result { - // Check if content is binary - let is_binary = is_content_binary(data_service, item_id, metadata).await?; + // Binary detection: read a sample in a blocking task, check, and return early + let db1 = db.clone(); + let data_dir1 = data_dir.to_path_buf(); + let is_binary = task::spawn_blocking(move || { + let conn = db1.blocking_lock(); + let item_service = ItemService::new(data_dir1); + let (mut reader, _) = item_service.get_item_content_streaming(&conn, item_id)?; + let mut sample = vec![0u8; crate::common::PIPESIZE]; + let n = reader.read(&mut sample)?; + sample.truncate(n); + Ok::(crate::common::is_binary::is_binary(&sample)) + }) + .await + .map_err(|e| { + warn!("Blocking task failed for item {item_id}: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })? + .map_err(|e| { + warn!("Failed to check binary status for item {item_id}: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })?; - // Get the content if it's not binary if is_binary { - // Return JSON with content as None and error message let response_body = serde_json::json!({ "metadata": metadata, "content": serde_json::Value::Null, "error": "Content is binary" }); - - Response::builder() + return Response::builder() .header(header::CONTENT_TYPE, "application/json") .status(StatusCode::UNPROCESSABLE_ENTITY) .body(axum::body::Body::from(response_body.to_string())) - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR) - } else { - // Use streaming approach to read content - let db = data_service.db(); - let data_path = data_service.data_path().clone(); - let settings = data_service.settings(); + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR); + } - // Get streaming reader from sync service - let (reader, content_len_result) = tokio::task::spawn_blocking(move || { - let mut conn = db.blocking_lock(); - let sync_service = 
- crate::services::SyncDataService::new(data_path, settings.as_ref().clone()); - let (reader, item_with_meta) = sync_service.get_content(&mut conn, item_id)?; - let content_len = item_with_meta.item.size.unwrap_or(0); - Ok::<_, CoreError>((reader, content_len)) - }) - .await - .map_err(|e| { - warn!("Blocking task failed for item {item_id}: {e}"); - StatusCode::INTERNAL_SERVER_ERROR - })? - .map_err(|e| { - warn!("Failed to get content reader for item {item_id}: {e}"); - StatusCode::INTERNAL_SERVER_ERROR - })?; + // Stream content with offset/length applied at the stream level + let db2 = db.clone(); + let data_dir2 = data_dir.to_path_buf(); + let content_str = task::spawn_blocking(move || -> Result { + let conn = db2.blocking_lock(); + let item_service = ItemService::new(data_dir2); + let (mut reader, item_with_meta) = + item_service.get_item_content_streaming(&conn, item_id)?; + let content_len = item_with_meta.item.size.unwrap_or(0) as u64; - let content_len = content_len_result as u64; - - // Calculate offset and length bounds - let start = std::cmp::min(offset, content_len); - let end = if length > 0 { - std::cmp::min(start + length, content_len) - } else { - content_len - }; - let response_len = end - start; - - // Read content with offset and length using fixed-size buffer - let content_str = tokio::task::spawn_blocking(move || { - let mut reader = reader; - let mut buf = [0u8; crate::common::PIPESIZE]; - let mut result = Vec::new(); - let mut bytes_read = 0u64; - - // Skip offset bytes - if offset > 0 { - let mut remaining = offset; - while remaining > 0 { - let to_read = std::cmp::min(remaining, buf.len() as u64) as usize; - match reader.read(&mut buf[..to_read]) { - Ok(0) => break, - Ok(n) => remaining -= n as u64, - Err(e) => return Err(CoreError::Io(e)), - } - } - } - - // Read up to length bytes - let mut remaining = if length > 0 { length } else { u64::MAX }; - while remaining > 0 && bytes_read < response_len { - let to_read = std::cmp::min(remaining, 
buf.len() as u64) as usize; + // Sample is already consumed by the first task; this is a fresh reader. + // Skip to offset using PIPESIZE buffer + if offset > 0 { + let skip = std::cmp::min(offset, content_len); + let mut skipped = 0u64; + let mut buf = vec![0u8; crate::common::PIPESIZE]; + while skipped < skip { + let to_read = std::cmp::min(skip - skipped, buf.len() as u64) as usize; match reader.read(&mut buf[..to_read]) { Ok(0) => break, - Ok(n) => { - result.extend_from_slice(&buf[..n]); - bytes_read += n as u64; - if length > 0 { - remaining -= n as u64; - } - } + Ok(n) => skipped += n as u64, Err(e) => return Err(CoreError::Io(e)), } } + } - Ok::, CoreError>(result) - }) - .await - .map_err(|e| { - warn!("Blocking task failed for item {item_id}: {e}"); - StatusCode::INTERNAL_SERVER_ERROR - })? - .map_err(|e| { - warn!("Failed to read content for item {item_id}: {e}"); - StatusCode::INTERNAL_SERVER_ERROR - })?; - - // Convert to UTF-8 string - let content_str = match String::from_utf8(content_str) { - Ok(s) => s, - Err(_) => { - // This shouldn't happen since we checked is_binary, but handle it just in case - let response_body = serde_json::json!({ - "metadata": metadata, - "content": serde_json::Value::Null, - "error": "Content is not valid UTF-8" - }); - - let response = Response::builder() - .header(header::CONTENT_TYPE, "application/json") - .status(StatusCode::UNPROCESSABLE_ENTITY) - .body(axum::body::Body::from(response_body.to_string())) - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - return Ok(response); - } + // Read up to length bytes (or all if length == 0) + let max_bytes = if length > 0 { + std::cmp::min(length, content_len.saturating_sub(offset)) + } else { + content_len.saturating_sub(offset) }; + let mut result = Vec::with_capacity(std::cmp::min(max_bytes, 64 * 1024) as usize); + let mut buf = vec![0u8; crate::common::PIPESIZE]; + let mut bytes_read = 0u64; + while bytes_read < max_bytes { + let to_read = std::cmp::min(max_bytes - 
bytes_read, buf.len() as u64) as usize; + match reader.read(&mut buf[..to_read]) { + Ok(0) => break, + Ok(n) => { + result.extend_from_slice(&buf[..n]); + bytes_read += n as u64; + } + Err(e) => return Err(CoreError::Io(e)), + } + } - // Return JSON with metadata and content - let response_body = serde_json::json!({ - "metadata": metadata, - "content": content_str, - "error": serde_json::Value::Null - }); + String::from_utf8(result) + .map_err(|_| CoreError::InvalidInput("Content is not valid UTF-8".to_string())) + }) + .await + .map_err(|e| { + warn!("Blocking task failed for item {item_id}: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })?; - Response::builder() - .header(header::CONTENT_TYPE, "application/json") - .body(axum::body::Body::from(response_body.to_string())) - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR) - } + let content_str = match content_str { + Ok(s) => s, + Err(CoreError::InvalidInput(msg)) if msg == "Content is not valid UTF-8" => { + let response_body = serde_json::json!({ + "metadata": metadata, + "content": serde_json::Value::Null, + "error": "Content is not valid UTF-8" + }); + return Response::builder() + .header(header::CONTENT_TYPE, "application/json") + .status(StatusCode::UNPROCESSABLE_ENTITY) + .body(axum::body::Body::from(response_body.to_string())) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR); + } + Err(e) => { + warn!("Failed to read content for item {item_id}: {e}"); + return Err(StatusCode::INTERNAL_SERVER_ERROR); + } + }; + + // Return JSON with metadata and content + let response_body = serde_json::json!({ + "metadata": metadata, + "content": content_str, + "error": serde_json::Value::Null + }); + + Response::builder() + .header(header::CONTENT_TYPE, "application/json") + .body(axum::body::Body::from(response_body.to_string())) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR) } #[utoipa::path( @@ -469,10 +433,22 @@ pub async fn handle_post_item( // Parse client-specified compression type (only used when compress=false) 
let client_compression_type = params.compression_type.as_deref().map(|ct| { - ct.parse::().unwrap_or_else(|_| { - warn!("Unknown compression type from client: {ct}, defaulting to none"); - crate::compression_engine::CompressionType::None - }) + ct.parse::() + .unwrap_or_else(|_| { + warn!("Unknown compression type from client: {ct}, defaulting to none"); + crate::compression_engine::CompressionType::None + }) + }); + + // Parse optional timestamp for import + let import_ts = params.ts.as_deref().and_then(|ts_str| { + match chrono::DateTime::parse_from_rfc3339(ts_str) { + Ok(dt) => Some(dt.with_timezone(&chrono::Utc)), + Err(e) => { + warn!("Invalid timestamp from client: {ts_str}: {e}"); + None + } + } }); // Stream body through an mpsc channel with fixed-size frames. @@ -522,10 +498,9 @@ pub async fn handle_post_item( // Blocking task: consume streaming reader, save via save_item_raw_streaming let item_with_meta = task::spawn_blocking(move || { let mut conn = db.blocking_lock(); - let sync_service = - crate::services::SyncDataService::new(data_dir, settings.as_ref().clone()); + let item_service = crate::services::ItemService::new(data_dir); let mut reader = MpscReader::new(rx); - sync_service.save_item_raw_streaming( + item_service.save_item_raw_streaming( &mut conn, &mut reader, tags, @@ -533,6 +508,8 @@ pub async fn handle_post_item( compress, run_meta, client_compression_type, + import_ts, + &settings, ) }) .await @@ -634,10 +611,21 @@ pub async fn handle_get_item_latest_content( .map(|s| parse_comma_tags(s)) .unwrap_or_default(); - let data_service = create_data_service(&state); + let db = state.db.clone(); + let data_dir = state.data_dir.clone(); // First find the item to get its ID and metadata - let item_with_meta = data_service.find_item(vec![], tags, HashMap::new()).await; + let find_tags = tags; + let item_with_meta = task::spawn_blocking(move || { + let conn = db.blocking_lock(); + let item_service = ItemService::new(data_dir); + 
item_service.find_item(&conn, &[], &find_tags, &HashMap::new()) + }) + .await + .map_err(|e| { + warn!("Blocking task failed: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })?; match item_with_meta { Ok(item) => { @@ -648,9 +636,9 @@ pub async fn handle_get_item_latest_content( let metadata = item.meta_as_map(); // Handle as_meta parameter if params.as_meta { - // Force stream=false and allow_binary=false for as_meta=true handle_as_meta_response_with_metadata( - &data_service, + &state.db, + &state.data_dir, item_id, &metadata, params.offset, @@ -659,7 +647,8 @@ pub async fn handle_get_item_latest_content( .await } else { stream_item_content_response_with_metadata( - &data_service, + &state.db, + &state.data_dir, item_id, &metadata, params.allow_binary, @@ -722,12 +711,16 @@ pub async fn handle_get_item_content( params.stream, params.allow_binary, params.offset, params.length ); - let data_service = create_data_service(&state); - // Handle as_meta parameter if params.as_meta { // Force stream=false and allow_binary=false for as_meta=true - let result = - handle_as_meta_response(&data_service, item_id, params.offset, params.length).await; + let result = handle_as_meta_response( + &state.db, + &state.data_dir, + item_id, + params.offset, + params.length, + ) + .await; if let Ok(response) = &result { debug!( "ITEM_API: Response content-length: {:?}", @@ -737,7 +730,8 @@ pub async fn handle_get_item_content( result } else { let result = stream_item_content_response( - &data_service, + &state.db, + &state.data_dir, item_id, params.allow_binary, params.offset, @@ -759,7 +753,8 @@ pub async fn handle_get_item_content( #[allow(clippy::too_many_arguments)] async fn stream_item_content_response( - data_service: &AsyncDataService, + db: &Arc>, + data_dir: &std::path::Path, item_id: i64, allow_binary: bool, offset: u64, @@ -770,14 +765,27 @@ async fn stream_item_content_response( ) -> Result { debug!("STREAM_ITEM_CONTENT_RESPONSE: stream={stream}, decompress={decompress}"); // 
Get the item with metadata once - let item_with_meta = data_service.get_item(item_id).await.map_err(|e| { + let db_clone = db.clone(); + let data_dir_clone = data_dir.to_path_buf(); + let item_with_meta = task::spawn_blocking(move || { + let conn = db_clone.blocking_lock(); + let item_service = ItemService::new(data_dir_clone); + item_service.get_item(&conn, item_id) + }) + .await + .map_err(|e| { + warn!("Blocking task failed for item {item_id}: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })? + .map_err(|e| { warn!("Failed to get item {item_id} for content: {e}"); StatusCode::INTERNAL_SERVER_ERROR })?; let metadata = item_with_meta.meta_as_map(); stream_item_content_response_with_metadata( - data_service, + db, + data_dir, item_id, &metadata, allow_binary, @@ -794,35 +802,57 @@ async fn stream_item_content_response( /// /// Returns the stored file bytes without decompression or filtering using streaming. async fn stream_raw_content_response( - data_service: &AsyncDataService, + db: &Arc>, + data_dir: &std::path::Path, item_id: i64, offset: u64, length: u64, ) -> Result { // Get item info to find the compression type - let item_with_meta = data_service.get_item(item_id).await.map_err(|e| { + let db_clone = db.clone(); + let data_dir_clone = data_dir.to_path_buf(); + let item_with_meta = task::spawn_blocking(move || { + let conn = db_clone.blocking_lock(); + let item_service = ItemService::new(data_dir_clone); + item_service.get_item(&conn, item_id) + }) + .await + .map_err(|e| { + warn!("Blocking task failed for item {item_id}: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })? 
+ .map_err(|e| { warn!("Failed to get item {item_id} for raw content: {e}"); StatusCode::INTERNAL_SERVER_ERROR })?; let compression = item_with_meta.item.compression.clone(); - // Get streaming reader for raw content - let reader = data_service - .get_raw_item_content_reader(item_id) - .await - .map_err(|e| { - warn!("Failed to get raw content reader for item {item_id}: {e}"); - StatusCode::INTERNAL_SERVER_ERROR + // Open file directly for raw content streaming + let item_path = data_dir.join(item_id.to_string()); + let reader: Box = task::spawn_blocking(move || { + let file = std::fs::File::open(&item_path).map_err(|e| { + CoreError::Io(std::io::Error::new( + std::io::ErrorKind::NotFound, + format!("Item file not found: {item_path:?}: {e}"), + )) })?; + Ok::, CoreError>(Box::new(file)) + }) + .await + .map_err(|e| { + warn!("Blocking task failed for item {item_id}: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })? + .map_err(|e| { + warn!("Failed to get raw content reader for item {item_id}: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })?; // Get the actual file size on disk (raw bytes, not uncompressed size) let file_size = { - let mut data_path = data_service.data_path().clone(); - data_path.push(item_id.to_string()); - std::fs::metadata(&data_path) - .map(|m| m.len()) - .unwrap_or(0) + let item_path = data_dir.join(item_id.to_string()); + std::fs::metadata(&item_path).map(|m| m.len()).unwrap_or(0) }; // Calculate the actual response length @@ -911,7 +941,8 @@ async fn stream_raw_content_response( #[allow(clippy::too_many_arguments)] async fn stream_item_content_response_with_metadata( - data_service: &AsyncDataService, + db: &Arc>, + data_dir: &std::path::Path, item_id: i64, metadata: &HashMap, allow_binary: bool, @@ -925,46 +956,126 @@ async fn stream_item_content_response_with_metadata( // When decompress=false, return raw stored bytes if !decompress { - return stream_raw_content_response(data_service, item_id, offset, length).await; + return 
stream_raw_content_response(db, data_dir, item_id, offset, length).await; } let mime_type = get_mime_type(metadata); - // Check if content is binary when allow_binary is false - check_binary_content_allowed(data_service, item_id, metadata, allow_binary).await?; + // Check if content is binary when allow_binary is false. + // Uses a sample of actual content bytes (not metadata-only) for reliable detection. + if !allow_binary { + let db_check = db.clone(); + let data_dir_check = data_dir.to_path_buf(); + let is_binary = task::spawn_blocking(move || { + let conn = db_check.blocking_lock(); + let item_service = ItemService::new(data_dir_check); + let (mut reader, _) = item_service.get_item_content_streaming(&conn, item_id)?; + let mut sample = vec![0u8; crate::common::PIPESIZE]; + let n = reader.read(&mut sample)?; + sample.truncate(n); + Ok::(crate::common::is_binary::is_binary(&sample)) + }) + .await + .map_err(|e| { + warn!("Blocking task failed for binary check on item {item_id}: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })? 
+ .map_err(|e| { + warn!("Failed to check binary status for item {item_id}: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + if is_binary { + return Err(StatusCode::BAD_REQUEST); + } + } if stream { debug!("STREAMING: Using streaming approach"); - match data_service - .stream_item_content_by_id_with_metadata(item_id, metadata, true, offset, length, None) - .await - { - Ok((stream, _)) => { - let body = axum::body::Body::from_stream(stream); - let response = Response::builder() - .header(header::CONTENT_TYPE, mime_type) - .body(body) - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - Ok(response) + let db = db.clone(); + let data_dir = data_dir.to_path_buf(); + let (tx, rx) = mpsc::channel::, std::io::Error>>(16); + + tokio::task::spawn_blocking(move || { + let conn = db.blocking_lock(); + let item_service = ItemService::new(data_dir); + let (mut reader, _, _) = + match item_service.get_item_content_info_streaming(&conn, item_id, None) { + Ok(r) => r, + Err(e) => { + let _ = tx.blocking_send(Err(std::io::Error::other(format!("{e}")))); + return; + } + }; + + // Apply offset + if offset > 0 { + let mut buf = [0u8; crate::common::PIPESIZE]; + let mut remaining = offset; + while remaining > 0 { + let to_read = std::cmp::min(remaining, buf.len() as u64) as usize; + match reader.read(&mut buf[..to_read]) { + Ok(0) => break, + Ok(n) => remaining -= n as u64, + Err(e) => { + let _ = tx.blocking_send(Err(e)); + return; + } + } + } } - Err(e) => { - warn!("Failed to stream content for item {item_id}: {e}"); - Err(StatusCode::INTERNAL_SERVER_ERROR) + + // Read and send data + let mut buf = [0u8; crate::common::PIPESIZE]; + let mut remaining_length = length; + loop { + let to_read = if length > 0 { + std::cmp::min(remaining_length, buf.len() as u64) as usize + } else { + buf.len() + }; + if to_read == 0 { + break; + } + match reader.read(&mut buf[..to_read]) { + Ok(0) => break, + Ok(n) => { + if tx.blocking_send(Ok(buf[..n].to_vec())).is_err() { + break; + } + if 
length > 0 { + remaining_length -= n as u64; + if remaining_length == 0 { + break; + } + } + } + Err(e) => { + let _ = tx.blocking_send(Err(e)); + break; + } + } } - } + }); + + let stream = tokio_stream::wrappers::ReceiverStream::new(rx); + let body = axum::body::Body::from_stream(stream); + let response = Response::builder() + .header(header::CONTENT_TYPE, mime_type) + .body(body) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + Ok(response) } else { debug!("NON-STREAMING: Building full response in memory using streaming reader"); - // Use streaming approach even for non-streaming response - let db = data_service.db(); - let data_path = data_service.data_path().clone(); - let settings = data_service.settings(); + let db = db.clone(); + let data_dir = data_dir.to_path_buf(); - // Get streaming reader from sync service + // Get streaming reader from item service let (reader, content_len_result) = tokio::task::spawn_blocking(move || { - let mut conn = db.blocking_lock(); - let sync_service = - crate::services::SyncDataService::new(data_path, settings.as_ref().clone()); - let (reader, item_with_meta) = sync_service.get_content(&mut conn, item_id)?; + let conn = db.blocking_lock(); + let item_service = ItemService::new(data_dir); + let (reader, item_with_meta) = + item_service.get_item_content_streaming(&conn, item_id)?; let content_len = item_with_meta.item.size.unwrap_or(0); Ok::<_, CoreError>((reader, content_len)) }) @@ -1074,9 +1185,22 @@ pub async fn handle_get_item_latest_meta( .map(|s| parse_comma_tags(s)) .unwrap_or_default(); - let data_service = create_data_service(&state); + let db = state.db.clone(); + let data_dir = state.data_dir.clone(); - match data_service.find_item(vec![], tags, HashMap::new()).await { + let find_tags = tags; + let result = task::spawn_blocking(move || { + let conn = db.blocking_lock(); + let item_service = ItemService::new(data_dir); + item_service.find_item(&conn, &[], &find_tags, &HashMap::new()) + }) + .await + .map_err(|e| 
{ + warn!("Blocking task failed: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + match result { Ok(item_with_meta) => { let item_meta = item_with_meta.meta_as_map(); @@ -1121,9 +1245,21 @@ pub async fn handle_get_item_meta( return Err(StatusCode::BAD_REQUEST); } - let data_service = create_data_service(&state); + let db = state.db.clone(); + let data_dir = state.data_dir.clone(); - match data_service.get_item(item_id).await { + let result = task::spawn_blocking(move || { + let conn = db.blocking_lock(); + let item_service = ItemService::new(data_dir); + item_service.get_item(&conn, item_id) + }) + .await + .map_err(|e| { + warn!("Blocking task failed for item {item_id}: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + match result { Ok(item_with_meta) => { let item_meta = item_with_meta.meta_as_map(); @@ -1148,24 +1284,31 @@ pub async fn handle_post_item_meta( return Err(StatusCode::BAD_REQUEST); } - let data_service = create_data_service(&state); + let db = state.db.clone(); + let data_dir = state.data_dir.clone(); + let meta = metadata.clone(); - // Verify item exists - data_service - .get_item(item_id) - .await - .map_err(handle_item_error)?; + // Verify item exists and add metadata in a single blocking task + task::spawn_blocking(move || { + let conn = db.blocking_lock(); + let item_service = ItemService::new(data_dir); + item_service + .get_item(&conn, item_id) + .map_err(handle_item_error)?; - // Add each metadata entry - for (key, value) in &metadata { - data_service - .add_item_meta(item_id, key, value) - .await - .map_err(|e| { + for (key, value) in &meta { + crate::db::add_meta(&conn, item_id, key, value).map_err(|e| { warn!("Failed to add metadata {key} for item {item_id}: {e}"); StatusCode::INTERNAL_SERVER_ERROR })?; - } + } + Ok::<(), StatusCode>(()) + }) + .await + .map_err(|e| { + warn!("Blocking task failed for item {item_id}: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })??; let response = ApiResponse { success: true, @@ -1201,16 
+1344,20 @@ pub async fn handle_delete_item( return Err(StatusCode::BAD_REQUEST); } - let mut conn = state.db.lock().await; + let db = state.db.clone(); + let data_dir = state.data_dir.clone(); - let sync_service = crate::services::SyncDataService::new( - state.data_dir.clone(), - state.settings.as_ref().clone(), - ); - - let deleted_item = sync_service - .delete_item(&mut conn, item_id) - .map_err(handle_item_error)?; + let deleted_item = task::spawn_blocking(move || { + let mut conn = db.blocking_lock(); + let item_service = crate::services::ItemService::new(data_dir); + item_service.delete_item(&mut conn, item_id) + }) + .await + .map_err(|e| { + warn!("Blocking task failed for delete item {item_id}: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })? + .map_err(handle_item_error)?; let item_info = ItemInfo { id: deleted_item.id.ok_or_else(|| { @@ -1258,16 +1405,20 @@ pub async fn handle_get_item_info( return Err(StatusCode::BAD_REQUEST); } - let mut conn = state.db.lock().await; + let db = state.db.clone(); + let data_dir = state.data_dir.clone(); - let sync_service = crate::services::SyncDataService::new( - state.data_dir.clone(), - state.settings.as_ref().clone(), - ); - - let item_with_meta = sync_service - .get_item(&mut conn, item_id) - .map_err(handle_item_error)?; + let item_with_meta = task::spawn_blocking(move || { + let conn = db.blocking_lock(); + let item_service = crate::services::ItemService::new(data_dir); + item_service.get_item(&conn, item_id) + }) + .await + .map_err(|e| { + warn!("Blocking task failed for get item info {item_id}: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })? 
+ .map_err(handle_item_error)?; let tags: Vec = item_with_meta.tags.iter().map(|t| t.name.clone()).collect(); let metadata = item_with_meta.meta_as_map(); @@ -1326,82 +1477,96 @@ pub async fn handle_diff_items( State(state): State, Query(query): Query, ) -> Result>>, StatusCode> { - let mut conn = state.db.lock().await; + let db = state.db.clone(); + let data_dir = state.data_dir.clone(); + let id_a_param = query.id_a; + let id_b_param = query.id_b; + let tag_a_param = query.tag_a; + let tag_b_param = query.tag_b; - let sync_service = crate::services::SyncDataService::new( - state.data_dir.clone(), - state.settings.as_ref().clone(), - ); + let diff_lines = task::spawn_blocking(move || { + let conn = db.blocking_lock(); + let item_service = crate::services::ItemService::new(data_dir); - let item_a = if let Some(id_a) = query.id_a { - sync_service - .get_item(&mut conn, id_a) - .map_err(handle_item_error)? - } else if let Some(tag) = &query.tag_a { - sync_service - .find_item(&mut conn, vec![], vec![tag.clone()], HashMap::new()) - .map_err(handle_item_error)? - } else { - return Err(StatusCode::BAD_REQUEST); - }; + let item_a = if let Some(id_a) = id_a_param { + item_service + .get_item(&conn, id_a) + .map_err(handle_item_error)? + } else if let Some(tag) = &tag_a_param { + #[allow(clippy::cloned_ref_to_slice_refs)] + let tags = &[tag.clone()]; + item_service + .find_item(&conn, &[], tags, &HashMap::new()) + .map_err(handle_item_error)? + } else { + return Err(StatusCode::BAD_REQUEST); + }; - let item_b = if let Some(id_b) = query.id_b { - sync_service - .get_item(&mut conn, id_b) - .map_err(handle_item_error)? - } else if let Some(tag) = &query.tag_b { - sync_service - .find_item(&mut conn, vec![], vec![tag.clone()], HashMap::new()) - .map_err(handle_item_error)? - } else { - return Err(StatusCode::BAD_REQUEST); - }; + let item_b = if let Some(id_b) = id_b_param { + item_service + .get_item(&conn, id_b) + .map_err(handle_item_error)? 
+ } else if let Some(tag) = &tag_b_param { + #[allow(clippy::cloned_ref_to_slice_refs)] + let tags = &[tag.clone()]; + item_service + .find_item(&conn, &[], tags, &HashMap::new()) + .map_err(handle_item_error)? + } else { + return Err(StatusCode::BAD_REQUEST); + }; - let id_a = item_a.item.id.ok_or(StatusCode::BAD_REQUEST)?; - let id_b = item_b.item.id.ok_or(StatusCode::BAD_REQUEST)?; + let id_a = item_a.item.id.ok_or(StatusCode::BAD_REQUEST)?; + let id_b = item_b.item.id.ok_or(StatusCode::BAD_REQUEST)?; - // Size limit for diff operation (10MB per item) - const MAX_DIFF_SIZE: i64 = 10 * 1024 * 1024; + // Size limit for diff operation (10MB per item) + const MAX_DIFF_SIZE: i64 = 10 * 1024 * 1024; - if let Some(size_a) = item_a.item.size - && size_a > MAX_DIFF_SIZE - { - warn!( - "Item A ({}) exceeds diff size limit: {} > {}", - id_a, size_a, MAX_DIFF_SIZE - ); - return Err(StatusCode::PAYLOAD_TOO_LARGE); - } - if let Some(size_b) = item_b.item.size - && size_b > MAX_DIFF_SIZE - { - warn!( - "Item B ({}) exceeds diff size limit: {} > {}", - id_b, size_b, MAX_DIFF_SIZE - ); - return Err(StatusCode::PAYLOAD_TOO_LARGE); - } + if let Some(size_a) = item_a.item.size + && size_a > MAX_DIFF_SIZE + { + warn!( + "Item A ({}) exceeds diff size limit: {} > {}", + id_a, size_a, MAX_DIFF_SIZE + ); + return Err(StatusCode::PAYLOAD_TOO_LARGE); + } + if let Some(size_b) = item_b.item.size + && size_b > MAX_DIFF_SIZE + { + warn!( + "Item B ({}) exceeds diff size limit: {} > {}", + id_b, size_b, MAX_DIFF_SIZE + ); + return Err(StatusCode::PAYLOAD_TOO_LARGE); + } - let (mut reader_a, _) = sync_service - .get_content(&mut conn, id_a) - .map_err(handle_item_error)?; - let (mut reader_b, _) = sync_service - .get_content(&mut conn, id_b) - .map_err(handle_item_error)?; + let (mut reader_a, _) = item_service + .get_item_content_streaming(&conn, id_a) + .map_err(handle_item_error)?; + let (mut reader_b, _) = item_service + .get_item_content_streaming(&conn, id_b) + 
.map_err(handle_item_error)?; - let mut content_a = Vec::new(); - reader_a.read_to_end(&mut content_a).map_err(|e| { - log::error!("Failed to read content A: {e}"); + let mut content_a = Vec::new(); + reader_a.read_to_end(&mut content_a).map_err(|e| { + log::error!("Failed to read content A: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + let mut content_b = Vec::new(); + reader_b.read_to_end(&mut content_b).map_err(|e| { + log::error!("Failed to read content B: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + Ok::, StatusCode>(compute_diff(&content_a, &content_b)) + }) + .await + .map_err(|e| { + warn!("Blocking task failed for diff items: {e}"); StatusCode::INTERNAL_SERVER_ERROR - })?; - - let mut content_b = Vec::new(); - reader_b.read_to_end(&mut content_b).map_err(|e| { - log::error!("Failed to read content B: {e}"); - StatusCode::INTERNAL_SERVER_ERROR - })?; - - let diff_lines = compute_diff(&content_a, &content_b); + })??; let response = ApiResponse { success: true, @@ -1491,82 +1656,92 @@ pub async fn handle_update_item( .map(crate::services::utils::parse_comma_tags) .unwrap_or_default(); - let mut conn = state.db.lock().await; + let db = state.db.clone(); + let data_dir = state.data_dir.clone(); + let settings = state.settings.clone(); + let size_param = params.size; - let sync_service = crate::services::sync_data_service::SyncDataService::new( - state.data_dir.clone(), - state.settings.as_ref().clone(), - ); + let item_info = task::spawn_blocking(move || { + let mut conn = db.blocking_lock(); + let item_service = crate::services::ItemService::new(data_dir); - // If only size is being set (no plugins/metadata/tags), do a lightweight update - #[allow(clippy::collapsible_if)] - if let Some(size) = params.size { - if plugin_names.is_empty() && metadata.is_empty() && tags.is_empty() { - return match crate::db::get_item(&conn, item_id) { - Ok(Some(mut item)) => { - item.size = Some(size); - if let Err(e) = crate::db::update_item(&conn, item) { - 
warn!("Failed to update item size: {e}"); - return Err(StatusCode::INTERNAL_SERVER_ERROR); - } - match sync_service.get_item(&mut conn, item_id) { - Ok(iwm) => { - let tags: Vec = - iwm.tags.iter().map(|t| t.name.clone()).collect(); - let metadata = iwm.meta_as_map(); - Ok(Json(ApiResponse { - success: true, - data: Some(ItemInfo { + // If only size is being set (no plugins/metadata/tags), do a lightweight update + #[allow(clippy::collapsible_if)] + if let Some(size) = size_param { + if plugin_names.is_empty() && metadata.is_empty() && tags.is_empty() { + return match crate::db::get_item(&conn, item_id) { + Ok(Some(mut item)) => { + item.size = Some(size); + if let Err(e) = crate::db::update_item(&conn, item) { + warn!("Failed to update item size: {e}"); + return Err(StatusCode::INTERNAL_SERVER_ERROR); + } + match item_service.get_item(&conn, item_id) { + Ok(iwm) => { + let tags: Vec = + iwm.tags.iter().map(|t| t.name.clone()).collect(); + let metadata = iwm.meta_as_map(); + Ok(ItemInfo { id: item_id, ts: iwm.item.ts.to_rfc3339(), size: iwm.item.size, compression: iwm.item.compression.clone(), tags, metadata, - }), - error: None, - })) + }) + } + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), } - Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), } - } - Ok(None) => Err(StatusCode::NOT_FOUND), - Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), - }; + Ok(None) => Err(StatusCode::NOT_FOUND), + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), + }; + } } - } - let result = - sync_service.update_item_plugins(&mut conn, item_id, &plugin_names, metadata, &tags); + let result = item_service.update_item_plugins( + &mut conn, + item_id, + &plugin_names, + metadata, + &tags, + &settings, + ); - match result { - Ok(item_with_meta) => { - let tags: Vec = - item_with_meta.tags.iter().map(|t| t.name.clone()).collect(); - let metadata = item_with_meta.meta_as_map(); + match result { + Ok(item_with_meta) => { + let tags: Vec = + item_with_meta.tags.iter().map(|t| 
t.name.clone()).collect(); + let metadata = item_with_meta.meta_as_map(); - let item_info = ItemInfo { - id: item_with_meta.item.id.ok_or_else(|| { - warn!("Item missing ID"); - StatusCode::INTERNAL_SERVER_ERROR - })?, - ts: item_with_meta.item.ts.to_rfc3339(), - size: item_with_meta.item.size, - compression: item_with_meta.item.compression.clone(), - tags, - metadata, - }; - - Ok(Json(ApiResponse { - success: true, - data: Some(item_info), - error: None, - })) + Ok(ItemInfo { + id: item_with_meta.item.id.ok_or_else(|| { + warn!("Item missing ID"); + StatusCode::INTERNAL_SERVER_ERROR + })?, + ts: item_with_meta.item.ts.to_rfc3339(), + size: item_with_meta.item.size, + compression: item_with_meta.item.compression.clone(), + tags, + metadata, + }) + } + Err(CoreError::ItemNotFound(_)) => Err(StatusCode::NOT_FOUND), + Err(e) => { + warn!("Failed to update item {item_id}: {e}"); + Err(StatusCode::INTERNAL_SERVER_ERROR) + } } - Err(CoreError::ItemNotFound(_)) => Err(StatusCode::NOT_FOUND), - Err(e) => { - warn!("Failed to update item {item_id}: {e}"); - Err(StatusCode::INTERNAL_SERVER_ERROR) - } - } + }) + .await + .map_err(|e| { + warn!("Blocking task failed for update item {item_id}: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })??; + + Ok(Json(ApiResponse { + success: true, + data: Some(item_info), + error: None, + })) } diff --git a/src/modes/server/common.rs b/src/modes/server/common.rs index e3951c3..e43ea2f 100644 --- a/src/modes/server/common.rs +++ b/src/modes/server/common.rs @@ -644,6 +644,10 @@ pub struct CreateItemQuery { /// Only used when compress=false — tells the server what compression /// the client applied so the correct type is recorded in the database. pub compression_type: Option, + /// Optional timestamp for the item (RFC 3339 format). + /// Used during import to preserve the original item's timestamp. + /// If not provided, the server uses the current time. + pub ts: Option, } /// Query parameters for updating item metadata via POST. 
diff --git a/src/services/async_data_service.rs b/src/services/async_data_service.rs deleted file mode 100644 index 5588637..0000000 --- a/src/services/async_data_service.rs +++ /dev/null @@ -1,284 +0,0 @@ -use crate::common::status::StatusInfo; -use crate::config::Settings; -use crate::db::Item; -use crate::db::Meta; -use crate::services::data_service::DataService; -use crate::services::error::CoreError; -use crate::services::types::{ItemWithContent, ItemWithMeta}; -use clap::Command; -use futures::Stream; -use rusqlite::Connection; -use std::collections::HashMap; -use std::io::Read; -use std::path::{Path, PathBuf}; -use std::pin::Pin; -use std::sync::Arc; -use tokio::sync::Mutex; - -pub struct AsyncDataService { - data_path: PathBuf, - settings: Arc, - db: Arc>, - sync_service: crate::services::SyncDataService, -} - -impl AsyncDataService { - pub fn new(data_path: PathBuf, settings: Arc, db: Arc>) -> Self { - let sync_service = - crate::services::SyncDataService::new(data_path.clone(), settings.as_ref().clone()); - Self { - data_path, - settings, - db, - sync_service, - } - } - - pub fn data_path(&self) -> &PathBuf { - &self.data_path - } - - pub fn settings(&self) -> Arc { - self.settings.clone() - } - - pub fn db(&self) -> Arc> { - self.db.clone() - } - - pub async fn get_item(&self, id: i64) -> Result { - let mut conn = self.db.lock().await; - self.get(&mut conn, id) - } - - pub async fn add_item_meta( - &self, - item_id: i64, - name: &str, - value: &str, - ) -> Result<(), CoreError> { - let conn = self.db.lock().await; - crate::db::add_meta(&conn, item_id, name, value)?; - Ok(()) - } - - pub async fn list_items( - &self, - tags: Vec, - meta: HashMap>, - ) -> Result, CoreError> { - let mut conn = self.db.lock().await; - self.list(&mut conn, tags, meta) - } - - pub async fn find_item( - &self, - ids: Vec, - tags: Vec, - meta: HashMap>, - ) -> Result { - let mut conn = self.db.lock().await; - DataService::find_item(self, &mut conn, ids, tags, meta) - } - - pub 
async fn get_item_content_info_streaming( - &self, - id: i64, - _filter: Option, - ) -> Result< - ( - Pin, CoreError>> + Send>>, - ItemWithMeta, - bool, - ), - CoreError, - > { - let mut conn = self.db.lock().await; - let (reader, item_with_meta) = self.get_content(&mut conn, id)?; - let is_binary = item_with_meta - .meta - .iter() - .find(|m| m.name == "text") - .map(|m| m.value == "false") - .unwrap_or(false); - - // Convert reader to stream with optimized buffer reuse - let stream = async_stream::stream! { - let mut reader = reader; - let mut buf = [0u8; 8192]; - loop { - match reader.read(&mut buf) { - Ok(0) => break, - Ok(n) => yield Ok(buf[..n].to_vec()), - Err(e) => yield Err(CoreError::from(e)), - } - } - }; - - Ok((Box::pin(stream), item_with_meta, is_binary)) - } - - pub async fn stream_item_content_by_id_with_metadata( - &self, - id: i64, - _metadata: &HashMap, - _force_text: bool, - offset: u64, - length: u64, - _filter: Option, - ) -> Result< - ( - Pin, std::io::Error>> + Send>>, - u64, - ), - CoreError, - > { - let mut conn = self.db.lock().await; - let (mut reader, _item_with_meta) = self.get_content(&mut conn, id)?; - - // Skip bytes for offset - if offset > 0 { - let mut skip_buf = [0u8; 8192]; - let mut remaining = offset; - while remaining > 0 { - let to_read = std::cmp::min(8192, remaining as usize); - let n = reader.read(&mut skip_buf[..to_read])?; - if n == 0 { - break; - } - remaining -= n as u64; - } - } - - let content_length = if length > 0 { length } else { u64::MAX }; - - // Optimized stream that reuses a single buffer for reading - let stream = async_stream::stream! 
{ - let mut reader = reader; - let mut remaining = content_length; - let mut buf = [0u8; 8192]; - - while remaining > 0 { - let to_read = std::cmp::min(8192, remaining as usize); - - match reader.read(&mut buf[..to_read]) { - Ok(0) => break, - Ok(n) => { - remaining -= n as u64; - yield Ok(buf[..n].to_vec()); - } - Err(e) => { - yield Err(e); - break; - } - } - } - }; - - Ok((Box::pin(stream), content_length)) - } - - /// Get raw item content without decompression as a streaming reader. - /// - /// Opens the stored file directly from disk, bypassing decompression. - /// Used when the client requests raw bytes with `decompress=false`. - /// Returns a boxed reader that can be used for streaming. - pub async fn get_raw_item_content_reader( - &self, - id: i64, - ) -> Result, CoreError> { - let data_path = self.data_path.clone(); - - tokio::task::spawn_blocking(move || { - let mut item_path = data_path; - item_path.push(id.to_string()); - - let file = std::fs::File::open(&item_path).map_err(|e| { - CoreError::Io(std::io::Error::new( - std::io::ErrorKind::NotFound, - format!("Item file not found: {item_path:?}: {e}"), - )) - })?; - - Ok(Box::new(file) as Box) - }) - .await - .map_err(|e| CoreError::Other(anyhow::anyhow!("Task join error: {}", e)))? 
- } -} - -impl DataService for AsyncDataService { - type Error = CoreError; - - fn save( - &self, - content: R, - cmd: &mut Command, - settings: &Settings, - tags: Vec, - conn: &mut Connection, - ) -> Result { - self.sync_service.save(content, cmd, settings, tags, conn) - } - - fn get(&self, conn: &mut Connection, id: i64) -> Result { - self.sync_service.get(conn, id) - } - - fn get_content( - &self, - conn: &mut Connection, - id: i64, - ) -> Result<(Box, ItemWithMeta), Self::Error> { - self.sync_service.get_content(conn, id) - } - - fn list( - &self, - conn: &mut Connection, - tags: Vec, - meta: HashMap>, - ) -> Result, Self::Error> { - self.sync_service.list(conn, tags, meta) - } - - fn delete(&self, conn: &mut Connection, id: i64) -> Result { - self.sync_service.delete(conn, id) - } - - fn find_item( - &self, - conn: &mut Connection, - ids: Vec, - tags: Vec, - meta: HashMap>, - ) -> Result { - self.sync_service.find_item(conn, ids, tags, meta) - } - - fn get_items( - &self, - conn: &mut Connection, - ids: &[i64], - tags: &[String], - meta: &HashMap>, - ) -> Result, Self::Error> { - self.sync_service.get_items(conn, ids, tags, meta) - } - - fn generate_status( - &self, - settings: &Settings, - data_path: &Path, - db_path: &Path, - ) -> Result { - let mut cmd = Command::new("keep"); - let status_service = crate::services::StatusService::new(); - Ok(status_service.generate_status( - &mut cmd, - settings, - data_path.to_path_buf(), - db_path.to_path_buf(), - )?) - } -} diff --git a/src/services/async_item_service.rs b/src/services/async_item_service.rs deleted file mode 100644 index a2f085e..0000000 --- a/src/services/async_item_service.rs +++ /dev/null @@ -1,390 +0,0 @@ -/// Asynchronous service wrapper for `ItemService`. -/// -/// Uses `tokio::task::spawn_blocking` to offload synchronous operations (DB/FS) -/// to a blocking thread pool, allowing non-blocking async usage in servers. 
-use crate::common::PIPESIZE; -use crate::config::Settings; -use crate::services::error::CoreError; -use crate::services::item_service::ItemService; -use crate::services::types::{ItemWithContent, ItemWithMeta}; -use clap::Command; -use rusqlite::Connection; -use std::collections::HashMap; -use std::io::Read; -use std::path::PathBuf; -use std::sync::Arc; -use tokio::sync::Mutex; - -/// An asynchronous wrapper around the `ItemService` for use in async contexts like the web server. -/// It uses `tokio::task::spawn_blocking` to run synchronous database and filesystem operations -/// on a dedicated thread pool, preventing them from blocking the async runtime. -#[allow(dead_code)] -/// Async wrapper for ItemService operations. -pub struct AsyncItemService { - pub data_dir: PathBuf, - db: Arc>, - item_service: Arc, - cmd: Arc>, - settings: Arc, -} - -#[allow(dead_code)] -impl AsyncItemService { - /// Creates a new `AsyncItemService`. - /// - /// # Arguments - /// - /// * `data_dir` - Path to data directory. - /// * `db` - Arc-wrapped mutex for DB connection. - /// * `item_service` - Arc-wrapped ItemService. - /// * `cmd` - Arc-wrapped mutex for Clap command. - /// * `settings` - Arc-wrapped settings. - /// - /// # Returns - /// - /// A new `AsyncItemService`. - pub fn new( - data_dir: PathBuf, - db: Arc>, - item_service: Arc, - cmd: Arc>, - settings: Arc, - ) -> Self { - Self { - data_dir, - db, - item_service, - cmd, - settings, - } - } - - /// Internal helper to execute synchronous operations in a blocking task. - /// - /// Spawns a blocking task with the DB connection and ItemService. - /// - /// # Type Parameters - /// - /// * `F` - Closure type. - /// * `T` - Return type. - /// - /// # Arguments - /// - /// * `f` - The synchronous closure to execute. - /// - /// # Returns - /// - /// Result of the closure, or CoreError on task failure. 
- async fn execute_blocking(&self, f: F) -> Result - where - F: FnOnce(&Connection, &ItemService) -> Result + Send + 'static, - T: Send + 'static, - { - let db = self.db.clone(); - let item_service = self.item_service.clone(); - - tokio::task::spawn_blocking(move || { - let conn = db.blocking_lock(); - f(&conn, &item_service) - }) - .await - .map_err(|e| CoreError::Other(anyhow::anyhow!("Blocking task failed: {}", e)))? - } - - pub async fn get_item(&self, id: i64) -> Result { - self.execute_blocking(move |conn, item_service| item_service.get_item(conn, id)) - .await - } - - pub async fn get_item_content(&self, id: i64) -> Result { - self.execute_blocking(move |conn, item_service| item_service.get_item_content(conn, id)) - .await - } - - pub async fn stream_item_content_by_id( - &self, - item_id: i64, - allow_binary: bool, - offset: u64, - length: u64, - ) -> Result< - ( - std::pin::Pin< - Box< - dyn tokio_stream::Stream< - Item = Result, - > + Send, - >, - >, - String, - ), - CoreError, - > { - // Use streaming approach: get reader and stream chunks in requested range - let (reader, mime_type, is_binary) = self - .execute_blocking(move |conn, item_service| { - item_service.get_item_content_info_streaming(conn, item_id, None) - }) - .await?; - - // Check if content is binary when allow_binary is false - if !allow_binary && is_binary { - return Err(CoreError::InvalidInput( - "Binary content not allowed".to_string(), - )); - } - - // Convert the reader into an async stream with offset and length applied - use tokio_util::bytes::Bytes; - - // Create a channel to stream data between the blocking thread and async runtime - let (tx, rx) = tokio::sync::mpsc::channel::>(16); - - // Spawn a blocking task to read from the reader and send chunks - tokio::task::spawn_blocking(move || { - let mut reader = reader; - let mut buf = [0u8; PIPESIZE]; - - // Apply offset by reading and discarding bytes - if offset > 0 { - let mut remaining = offset; - while remaining > 0 { - let 
to_read = std::cmp::min(remaining, buf.len() as u64) as usize; - match reader.read(&mut buf[..to_read]) { - Ok(0) => break, // EOF reached before offset - Ok(n) => remaining -= n as u64, - Err(e) => { - let _ = tx.blocking_send(Err(e)); - return; - } - } - } - } - - // Read and send data up to the specified length - let mut remaining_length = length; - - loop { - // Determine how much to read in this iteration - let to_read = if length > 0 { - // If length is specified, don't read more than remaining_length - std::cmp::min(remaining_length, buf.len() as u64) as usize - } else { - buf.len() - }; - - if to_read == 0 { - break; // We've read the requested length - } - - match reader.read(&mut buf[..to_read]) { - Ok(0) => break, // EOF - Ok(n) => { - let chunk = Bytes::copy_from_slice(&buf[..n]); - // Block on sending to the channel - if tx.blocking_send(Ok(chunk)).is_err() { - break; // Receiver dropped - } - if length > 0 { - remaining_length -= n as u64; - if remaining_length == 0 { - break; // Reached the requested length - } - } - } - Err(e) => { - let _ = tx.blocking_send(Err(e)); - break; - } - } - } - }); - - // Convert the receiver into a stream - let stream = tokio_stream::wrappers::ReceiverStream::new(rx); - - Ok((Box::pin(stream), mime_type)) - } - - pub async fn stream_item_content_by_id_with_metadata( - &self, - item_id: i64, - metadata: &HashMap, - allow_binary: bool, - offset: u64, - length: u64, - filter: Option, - ) -> Result< - ( - std::pin::Pin< - Box< - dyn tokio_stream::Stream< - Item = Result, - > + Send, - >, - >, - String, - ), - CoreError, - > { - // Use provided metadata to determine MIME type and binary status - let mime_type = metadata - .get("mime_type") - .map(|s| s.to_string()) - .unwrap_or_else(|| "application/octet-stream".to_string()); - - // Check if content is binary when allow_binary is false - if !allow_binary { - let is_binary = if let Some(text_val) = metadata.get("text") { - text_val == "false" - } else { - // Get binary status 
using streaming approach - let (_, _, is_binary) = self.get_item_content_info_streaming(item_id, None).await?; - is_binary - }; - - if is_binary { - return Err(CoreError::InvalidInput( - "Binary content not allowed".to_string(), - )); - } - } - - // Get a streaming reader for the content with filtering applied - let reader = { - let db = self.db.clone(); - let item_service = self.item_service.clone(); - let filter = filter.clone(); - tokio::task::spawn_blocking(move || { - let conn = db.blocking_lock(); - item_service - .get_item_content_info_streaming(&conn, item_id, filter) - .map(|(reader, _, _)| reader) - }) - .await - .map_err(|e| CoreError::Other(anyhow::anyhow!("Blocking task failed: {}", e)))? - }; - - // Convert the reader into an async stream manually - use tokio_util::bytes::Bytes; - - // Create a channel to stream data between the blocking thread and async runtime - let (tx, rx) = tokio::sync::mpsc::channel(1); - - // Spawn a blocking task to read from the reader and send chunks - tokio::task::spawn_blocking(move || { - let mut reader = reader; - // Apply offset by reading and discarding bytes - if offset > 0 { - let mut remaining = offset; - let mut buf = [0; PIPESIZE]; - while remaining > 0 { - let to_read = std::cmp::min(remaining, buf.len() as u64); - match reader.as_mut().unwrap().read(&mut buf[..to_read as usize]) { - Ok(0) => break, // EOF reached before offset - Ok(n) => remaining -= n as u64, - Err(e) => { - let _ = tx.blocking_send(Err(e)); - return; - } - } - } - } - - // Read and send data up to the specified length - let mut remaining_length = length; - let mut buffer = [0; PIPESIZE]; - - loop { - // Determine how much to read in this iteration - let to_read = if length > 0 { - // If length is specified, don't read more than remaining_length - std::cmp::min(remaining_length, buffer.len() as u64) as usize - } else { - buffer.len() - }; - - if to_read == 0 { - break; // We've read the requested length - } - - match 
reader.as_mut().unwrap().read(&mut buffer[..to_read]) { - Ok(0) => break, // EOF - Ok(n) => { - let chunk = Bytes::copy_from_slice(&buffer[..n]); - // Block on sending to the channel - if tx.blocking_send(Ok(chunk)).is_err() { - break; // Receiver dropped - } - if length > 0 { - remaining_length -= n as u64; - if remaining_length == 0 { - break; // Reached the requested length - } - } - } - Err(e) => { - let _ = tx.blocking_send(Err(e)); - break; - } - } - } - }); - - // Convert the receiver into a stream - let stream = tokio_stream::wrappers::ReceiverStream::new(rx); - - Ok((Box::pin(stream), mime_type)) - } - - pub async fn get_item_content_info_streaming( - &self, - item_id: i64, - filter: Option, - ) -> Result<(Box, String, bool), CoreError> { - self.execute_blocking(move |conn, item_service| { - item_service.get_item_content_info_streaming(conn, item_id, filter) - }) - .await - } - - pub async fn find_item( - &self, - ids: Vec, - tags: Vec, - meta: HashMap>, - ) -> Result { - let ids_clone = ids.clone(); - let tags_clone = tags.clone(); - let meta_clone = meta.clone(); - self.execute_blocking(move |conn, item_service| { - item_service.find_item(conn, &ids_clone, &tags_clone, &meta_clone) - }) - .await - } - - pub async fn list_items( - &self, - tags: Vec, - meta: HashMap>, - ) -> Result, CoreError> { - let tags_clone = tags.clone(); - let meta_clone = meta.clone(); - self.execute_blocking(move |conn, item_service| { - item_service.list_items(conn, &tags_clone, &meta_clone) - }) - .await - } - - pub async fn delete_item(&self, id: i64) -> Result<(), CoreError> { - let db = self.db.clone(); - let item_service = self.item_service.clone(); - - tokio::task::spawn_blocking(move || { - let mut conn = db.blocking_lock(); - item_service.delete_item(&mut conn, id) - }) - .await - .map_err(|e| CoreError::Other(anyhow::anyhow!("task join error: {e}")))? 
- } -} diff --git a/src/services/data_service.rs b/src/services/data_service.rs deleted file mode 100644 index b30c895..0000000 --- a/src/services/data_service.rs +++ /dev/null @@ -1,63 +0,0 @@ -use crate::common::status::StatusInfo; -use crate::config::Settings; -use crate::db::Item; -use crate::services::error::CoreError; -use crate::services::types::{ItemWithContent, ItemWithMeta}; -use clap::Command; -use rusqlite::Connection; -use std::collections::HashMap; -use std::io::Read; -use std::path::Path; - -pub trait DataService { - type Error; - - fn save( - &self, - content: R, - cmd: &mut Command, - settings: &Settings, - tags: Vec, - conn: &mut Connection, - ) -> Result; - - fn get(&self, conn: &mut Connection, id: i64) -> Result; - - fn get_content( - &self, - conn: &mut Connection, - id: i64, - ) -> Result<(Box, ItemWithMeta), Self::Error>; - - fn list( - &self, - conn: &mut Connection, - tags: Vec, - meta: HashMap>, - ) -> Result, Self::Error>; - - fn delete(&self, conn: &mut Connection, id: i64) -> Result; - - fn find_item( - &self, - conn: &mut Connection, - ids: Vec, - tags: Vec, - meta: HashMap>, - ) -> Result; - - fn get_items( - &self, - conn: &mut Connection, - ids: &[i64], - tags: &[String], - meta: &HashMap>, - ) -> Result, Self::Error>; - - fn generate_status( - &self, - settings: &Settings, - data_path: &Path, - db_path: &Path, - ) -> Result; -} diff --git a/src/services/item_service.rs b/src/services/item_service.rs index c1b5718..579b19d 100644 --- a/src/services/item_service.rs +++ b/src/services/item_service.rs @@ -8,12 +8,14 @@ use crate::services::error::CoreError; use crate::services::filter_service::FilterService; use crate::services::meta_service::MetaService; use crate::services::types::{ItemWithContent, ItemWithMeta}; +use chrono::DateTime; +use chrono::Utc; use clap::Command; use log::debug; use rusqlite::Connection; use std::collections::HashMap; use std::fs; -use std::io::{IsTerminal, Read, Write}; +use std::io::{Cursor, IsTerminal, 
Read, Write}; use std::path::PathBuf; /// Service for managing items in the Keep application. @@ -530,7 +532,7 @@ impl ItemService { /// ```ignore /// item_service.delete_item(&mut conn, 1)?; /// ``` - pub fn delete_item(&self, conn: &mut Connection, id: i64) -> Result<(), CoreError> { + pub fn delete_item(&self, conn: &mut Connection, id: i64) -> Result { debug!("ITEM_SERVICE: Deleting item with id: {id}"); if id <= 0 { return Err(CoreError::InvalidInput(format!("Invalid item ID: {id}"))); @@ -542,6 +544,7 @@ impl ItemService { item_path.push(id.to_string()); debug!("ITEM_SERVICE: Deleting file at path: {item_path:?}"); + let deleted_item = item.clone(); db::delete_item(conn, item)?; fs::remove_file(&item_path).or_else(|e| { if e.kind() == std::io::ErrorKind::NotFound { @@ -552,7 +555,7 @@ impl ItemService { })?; debug!("ITEM_SERVICE: Successfully deleted item {id}"); - Ok(()) + Ok(deleted_item) } /// Saves content from a reader to a new item. @@ -723,6 +726,270 @@ impl ItemService { pub fn get_data_path(&self) -> &PathBuf { &self.data_path } + + /// Returns a streaming reader and item metadata for the given item. + pub fn get_item_content_streaming( + &self, + conn: &Connection, + id: i64, + ) -> Result<(Box, ItemWithMeta), CoreError> { + let (reader, _mime, _is_binary) = self.get_item_content_info_streaming(conn, id, None)?; + let item_with_meta = self.get_item(conn, id)?; + Ok((reader, item_with_meta)) + } + + /// Fetches multiple items by ID, silently skipping not-found items. + /// Falls back to `list_items` if the ID list is empty. 
+ pub fn get_items( + &self, + conn: &Connection, + ids: &[i64], + tags: &[String], + meta: &HashMap>, + ) -> Result, CoreError> { + if ids.is_empty() { + return self.list_items(conn, tags, meta); + } + + let mut results = Vec::new(); + for id in ids { + match self.get_item(conn, *id) { + Ok(item) => results.push(item), + Err(CoreError::ItemNotFound(_)) => continue, + Err(e) => return Err(e), + } + } + Ok(results) + } + + /// Save an item with granular control over compression and meta plugins. + /// + /// This method allows callers to control whether compression and meta plugins + /// run server-side or were already handled by the client. + /// + /// # Arguments + /// + /// * `conn` - Database connection. + /// * `content` - Raw content bytes. + /// * `tags` - Tags to associate with the item. + /// * `metadata` - Client-provided metadata. + /// * `compress` - Whether the server should compress the content. + /// * `run_meta` - Whether the server should run meta plugins. + /// * `settings` - Application settings. + /// + /// # Returns + /// + /// * `Result` - The saved item with full details. + #[allow(clippy::too_many_arguments)] + pub fn save_item_raw( + &self, + conn: &mut Connection, + content: &[u8], + tags: Vec, + metadata: HashMap, + compress: bool, + run_meta: bool, + settings: &Settings, + ) -> Result { + let mut cursor = Cursor::new(content); + self.save_item_raw_streaming( + conn, + &mut cursor, + tags, + metadata, + compress, + run_meta, + None, + None, + settings, + ) + } + + /// Save an item from a streaming reader with granular control over compression. + /// + /// Unlike `save_item_raw` which takes a pre-buffered `&[u8]`, this method + /// reads from the reader in chunks and writes directly to the compression + /// engine, avoiding buffering the entire content in memory. 
+ #[allow(clippy::too_many_arguments)] + pub fn save_item_raw_streaming( + &self, + conn: &mut Connection, + reader: &mut dyn Read, + tags: Vec, + metadata: HashMap, + compress: bool, + run_meta: bool, + client_compression_type: Option, + import_ts: Option>, + settings: &Settings, + ) -> Result { + let mut cmd = Command::new("keep"); + let mut tags = tags; + + crate::modes::common::ensure_default_tag(&mut tags); + + let (compression_type_for_db, compression_engine) = if compress { + let ct = settings_compression_type(&mut cmd, settings); + let engine = get_compression_engine(ct.clone())?; + (ct, engine) + } else { + let ct = client_compression_type.unwrap_or(CompressionType::None); + let engine = get_compression_engine(CompressionType::None)?; + (ct, engine) + }; + + let item_id; + let mut item; + { + item = if let Some(ts) = import_ts { + db::insert_item_with_ts(conn, ts, &compression_type_for_db.to_string())? + } else { + db::create_item(conn, compression_type_for_db.clone())? + }; + item_id = item + .id + .ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?; + db::set_item_tags(conn, item.clone(), &tags)?; + } + + let collected_meta: std::sync::Arc>> = + std::sync::Arc::new(std::sync::Mutex::new(Vec::new())); + let collector = collected_meta.clone(); + let save_meta: crate::meta_plugin::SaveMetaFn = + std::sync::Arc::new(std::sync::Mutex::new(move |name: &str, value: &str| { + if let Ok(mut v) = collector.lock() { + v.push((name.to_string(), value.to_string())); + } + })); + + let meta_service = MetaService::new(save_meta); + let mut plugins = if run_meta { + meta_service.get_plugins(&mut cmd, settings) + } else { + Vec::new() + }; + + if run_meta { + meta_service.initialize_plugins(&mut plugins); + } + + let mut item_path = self.data_path.clone(); + item_path.push(item_id.to_string()); + + let mut item_out = compression_engine.create(item_path)?; + + let mut total_bytes = 0i64; + + crate::common::stream_copy(reader, |chunk| { + 
item_out.write_all(chunk)?; + total_bytes += chunk.len() as i64; + if run_meta { + meta_service.process_chunk(&mut plugins, chunk); + } + Ok(()) + })?; + + item_out.flush()?; + drop(item_out); + + if run_meta { + meta_service.finalize_plugins(&mut plugins); + } + + if run_meta && let Ok(entries) = collected_meta.lock() { + for (name, value) in entries.iter() { + db::add_meta(conn, item_id, name, value)?; + } + } + + for (key, value) in &metadata { + if key != "uncompressed_size" { + db::add_meta(conn, item_id, key, value)?; + } + } + + item.size = Some(total_bytes); + db::update_item(conn, item)?; + + self.get_item(conn, item_id) + } + + /// Runs specified meta plugins on an existing item's content and stores the results. + pub fn update_item_plugins( + &self, + conn: &mut Connection, + item_id: i64, + plugin_names: &[String], + metadata: HashMap, + tags: &[String], + settings: &Settings, + ) -> Result { + let item = db::get_item(conn, item_id)?.ok_or_else(|| CoreError::ItemNotFound(item_id))?; + + let collected_meta: std::sync::Arc>> = + std::sync::Arc::new(std::sync::Mutex::new(Vec::new())); + let collector = collected_meta.clone(); + let save_meta: crate::meta_plugin::SaveMetaFn = + std::sync::Arc::new(std::sync::Mutex::new(move |name: &str, value: &str| { + if let Ok(mut v) = collector.lock() { + v.push((name.to_string(), value.to_string())); + } + })); + + let meta_service = MetaService::new(save_meta); + let mut cmd = Command::new("keep"); + + let all_plugins = meta_service.get_plugins(&mut cmd, settings); + let mut plugins: Vec> = all_plugins + .into_iter() + .filter(|p| { + let plugin_name = p.meta_type().to_string(); + plugin_names.iter().any(|n| n == &plugin_name) + }) + .collect(); + + if plugins.is_empty() && metadata.is_empty() { + return self.get_item(conn, item_id); + } + + let mut item_path = self.data_path.clone(); + item_path.push(item_id.to_string()); + + if !item_path.exists() { + return Err(CoreError::ItemNotFound(item_id)); + } + + if 
!plugins.is_empty() { + let compression_service = CompressionService::new(); + let mut reader = + compression_service.stream_item_content(item_path, &item.compression)?; + + meta_service.initialize_plugins(&mut plugins); + + crate::common::stream_copy(&mut reader, |chunk| { + meta_service.process_chunk(&mut plugins, chunk); + Ok(()) + })?; + + meta_service.finalize_plugins(&mut plugins); + + if let Ok(entries) = collected_meta.lock() { + for (name, value) in entries.iter() { + db::add_meta(conn, item_id, name, value)?; + } + } + } + + for (key, value) in &metadata { + db::add_meta(conn, item_id, key, value)?; + } + + for tag in tags { + db::upsert_tag(conn, item_id, tag)?; + } + + self.get_item(conn, item_id) + } } /// A reader that applies a filter chain to the data as it's read. diff --git a/src/services/mod.rs b/src/services/mod.rs index 60ec2a4..458080e 100644 --- a/src/services/mod.rs +++ b/src/services/mod.rs @@ -1,25 +1,17 @@ -pub mod async_data_service; -pub mod async_item_service; pub mod compression_service; -pub mod data_service; pub mod error; pub mod filter_service; pub mod item_service; pub mod meta_service; pub mod status_service; -pub mod sync_data_service; pub mod types; pub mod utils; -pub use async_data_service::AsyncDataService; -pub use async_item_service::AsyncItemService; pub use compression_service::CompressionService; -pub use data_service::DataService; pub use error::CoreError; pub use filter_service::{FilterService, register_filter_plugin}; pub use item_service::ItemService; pub use meta_service::MetaService; pub use status_service::StatusService; -pub use sync_data_service::SyncDataService; pub use types::{ItemWithContent, ItemWithMeta}; pub use utils::{calc_byte_range, extract_tags, parse_comma_tags}; diff --git a/src/services/sync_data_service.rs b/src/services/sync_data_service.rs deleted file mode 100644 index ec22b57..0000000 --- a/src/services/sync_data_service.rs +++ /dev/null @@ -1,450 +0,0 @@ -use 
crate::common::status::StatusInfo; -use crate::compression_engine::{CompressionType, get_compression_engine}; -use crate::config::Settings; -use crate::db::Item; -use crate::db::Meta; -use crate::modes::common::settings_compression_type; -use crate::services::data_service::DataService; -use crate::services::error::CoreError; -use crate::services::item_service::ItemService; -use crate::services::meta_service::MetaService; -use crate::services::status_service::StatusService; -use crate::services::types::{ItemWithContent, ItemWithMeta}; -use clap::Command; -use rusqlite::Connection; -use std::collections::HashMap; -use std::io::{Cursor, Read, Write}; -use std::path::{Path, PathBuf}; - -pub struct SyncDataService { - item_service: ItemService, - settings: Settings, -} - -impl SyncDataService { - pub fn new(data_path: PathBuf, settings: Settings) -> Self { - Self { - item_service: ItemService::new(data_path), - settings, - } - } - - pub fn with_connection(data_path: PathBuf, settings: Settings, _conn: &Connection) -> Self { - Self::new(data_path, settings) - } - - pub fn item_service(&self) -> &ItemService { - &self.item_service - } - - pub fn settings(&self) -> &Settings { - &self.settings - } - - pub fn get_data_path(&self) -> &PathBuf { - self.item_service.get_data_path() - } - - pub fn save_item( - &self, - content: R, - cmd: &mut Command, - settings: &Settings, - tags: &mut Vec, - conn: &mut Connection, - ) -> Result { - self.item_service - .save_item(content, cmd, settings, tags, conn) - } - - /// Save an item with granular control over compression and meta plugins. - /// - /// This method allows clients to control whether compression and meta plugins - /// run server-side or were already handled by the client. - /// - /// # Arguments - /// - /// * `conn` - Database connection. - /// * `content` - Raw content bytes. - /// * `tags` - Tags to associate with the item. - /// * `metadata` - Client-provided metadata. 
- /// * `compress` - Whether the server should compress the content. - /// * `run_meta` - Whether the server should run meta plugins. - /// - /// # Returns - /// - /// * `Result` - The saved item with full details. - pub fn save_item_raw( - &self, - conn: &mut Connection, - content: &[u8], - tags: Vec, - metadata: HashMap, - compress: bool, - run_meta: bool, - ) -> Result { - let mut cursor = Cursor::new(content); - self.save_item_raw_streaming(conn, &mut cursor, tags, metadata, compress, run_meta, None) - } - - /// Save an item from a streaming reader with granular control over compression. - /// - /// Unlike `save_item_raw` which takes a pre-buffered `&[u8]`, this method - /// reads from the reader in chunks and writes directly to the compression - /// engine, avoiding buffering the entire content in memory. - #[allow(clippy::too_many_arguments)] - pub fn save_item_raw_streaming( - &self, - conn: &mut Connection, - reader: &mut dyn Read, - tags: Vec, - metadata: HashMap, - compress: bool, - run_meta: bool, - client_compression_type: Option, - ) -> Result { - let mut cmd = Command::new("keep"); - let settings = &self.settings; - let mut tags = tags; - - crate::modes::common::ensure_default_tag(&mut tags); - - // Determine compression type for DB record and for the file writer. - // When compress=true: server compresses using its configured engine. - // When compress=false: client already compressed — write raw bytes to disk - // but record the client's compression type in the DB. 
- let (compression_type_for_db, compression_engine) = if compress { - let ct = settings_compression_type(&mut cmd, settings); - let engine = get_compression_engine(ct.clone())?; - (ct, engine) - } else { - // Client already compressed — write raw (no engine), record actual type - let ct = client_compression_type.unwrap_or(CompressionType::None); - let engine = get_compression_engine(CompressionType::None)?; - (ct, engine) - }; - - let item_id; - let mut item; - { - item = crate::db::create_item(conn, compression_type_for_db.clone())?; - item_id = item - .id - .ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?; - crate::db::set_item_tags(conn, item.clone(), &tags)?; - } - - // Initialize meta plugins if requested - // Collect metadata in memory, write to DB after plugins finish. - let collected_meta: std::sync::Arc>> = - std::sync::Arc::new(std::sync::Mutex::new(Vec::new())); - let collector = collected_meta.clone(); - let save_meta: crate::meta_plugin::SaveMetaFn = - std::sync::Arc::new(std::sync::Mutex::new(move |name: &str, value: &str| { - if let Ok(mut v) = collector.lock() { - v.push((name.to_string(), value.to_string())); - } - })); - - let meta_service = MetaService::new(save_meta); - let mut plugins = if run_meta { - meta_service.get_plugins(&mut cmd, settings) - } else { - Vec::new() - }; - - if run_meta { - meta_service.initialize_plugins(&mut plugins); - } - - // Write content to file via streaming - let mut item_path = self.item_service.get_data_path().clone(); - item_path.push(item_id.to_string()); - - let mut item_out = compression_engine.create(item_path)?; - - let mut total_bytes = 0i64; - - crate::common::stream_copy(reader, |chunk| { - item_out.write_all(chunk)?; - total_bytes += chunk.len() as i64; - if run_meta { - meta_service.process_chunk(&mut plugins, chunk); - } - Ok(()) - })?; - - item_out.flush()?; - drop(item_out); - - // Finalize meta plugins - if run_meta { - meta_service.finalize_plugins(&mut plugins); - } - - // 
Write collected plugin metadata to DB - if run_meta && let Ok(entries) = collected_meta.lock() { - for (name, value) in entries.iter() { - crate::db::add_meta(conn, item_id, name, value)?; - } - } - - // Add client-provided metadata (excluding internal fields) - for (key, value) in &metadata { - if key != "uncompressed_size" { - crate::db::add_meta(conn, item_id, key, value)?; - } - } - - item.size = Some(total_bytes); - crate::db::update_item(conn, item)?; - - self.get_item(conn, item_id) - } - - pub fn get_item(&self, conn: &mut Connection, id: i64) -> Result { - self.item_service.get_item(conn, id) - } - - pub fn get_item_content( - &self, - conn: &Connection, - id: i64, - ) -> Result { - self.item_service.get_item_content(conn, id) - } - - pub fn get_item_content_streaming( - &self, - conn: &Connection, - id: i64, - ) -> Result<(Box, ItemWithMeta), CoreError> { - let (reader, _mime, _is_binary) = self - .item_service - .get_item_content_info_streaming(conn, id, None)?; - let item_with_meta = self.item_service.get_item(conn, id)?; - Ok((reader, item_with_meta)) - } - - pub fn list_items( - &self, - conn: &mut Connection, - tags: Vec, - meta: HashMap>, - ) -> Result, CoreError> { - self.item_service.list_items(conn, &tags, &meta) - } - - pub fn delete_item(&self, conn: &mut Connection, id: i64) -> Result { - let item_with_meta = self.item_service.get_item(conn, id)?; - let item = item_with_meta.item.clone(); - self.item_service.delete_item(conn, id)?; - Ok(item) - } - - pub fn find_item( - &self, - conn: &mut Connection, - ids: Vec, - tags: Vec, - meta: HashMap>, - ) -> Result { - self.item_service.find_item(conn, &ids, &tags, &meta) - } - - pub fn generate_status( - &self, - cmd: &mut Command, - settings: &Settings, - data_path: PathBuf, - db_path: PathBuf, - ) -> StatusInfo { - let status_service = StatusService::new(); - status_service - .generate_status(cmd, settings, data_path, db_path) - .unwrap_or_else(|_| StatusInfo::default()) - } -} - -impl DataService 
for SyncDataService { - type Error = CoreError; - - fn save( - &self, - content: R, - cmd: &mut Command, - settings: &Settings, - mut tags: Vec, - conn: &mut Connection, - ) -> Result { - crate::modes::common::ensure_default_tag(&mut tags); - - self.item_service - .save_item(content, cmd, settings, &mut tags, conn) - } - - fn get(&self, conn: &mut Connection, id: i64) -> Result { - self.get_item(conn, id) - } - - fn get_content( - &self, - conn: &mut Connection, - id: i64, - ) -> Result<(Box, ItemWithMeta), Self::Error> { - self.get_item_content_streaming(conn, id) - } - - fn list( - &self, - conn: &mut Connection, - tags: Vec, - meta: HashMap>, - ) -> Result, Self::Error> { - self.list_items(conn, tags, meta) - } - - fn delete(&self, conn: &mut Connection, id: i64) -> Result { - self.delete_item(conn, id) - } - - fn find_item( - &self, - conn: &mut Connection, - ids: Vec, - tags: Vec, - meta: HashMap>, - ) -> Result { - self.find_item(conn, ids, tags, meta) - } - - fn get_items( - &self, - conn: &mut Connection, - ids: &[i64], - tags: &[String], - meta: &HashMap>, - ) -> Result, Self::Error> { - if ids.is_empty() { - return self.list_items(conn, tags.to_vec(), meta.clone()); - } - - let mut results = Vec::new(); - for id in ids { - match self.get_item(conn, *id) { - Ok(item) => results.push(item), - Err(CoreError::ItemNotFound(_)) => continue, - Err(e) => return Err(e), - } - } - Ok(results) - } - - fn generate_status( - &self, - settings: &Settings, - data_path: &Path, - db_path: &Path, - ) -> Result { - let status_service = StatusService::new(); - let mut cmd = Command::new("keep"); - Ok(status_service.generate_status( - &mut cmd, - settings, - data_path.to_path_buf(), - db_path.to_path_buf(), - )?) - } -} - -/// Runs specified meta plugins on an existing item's content and stores the results. 
-impl SyncDataService { - pub fn update_item_plugins( - &self, - conn: &mut Connection, - item_id: i64, - plugin_names: &[String], - metadata: HashMap, - tags: &[String], - ) -> Result { - use crate::services::compression_service::CompressionService; - use std::io::Read; - - let item = - crate::db::get_item(conn, item_id)?.ok_or_else(|| CoreError::ItemNotFound(item_id))?; - - // Collect metadata in memory - let collected_meta: std::sync::Arc>> = - std::sync::Arc::new(std::sync::Mutex::new(Vec::new())); - let collector = collected_meta.clone(); - let save_meta: crate::meta_plugin::SaveMetaFn = - std::sync::Arc::new(std::sync::Mutex::new(move |name: &str, value: &str| { - if let Ok(mut v) = collector.lock() { - v.push((name.to_string(), value.to_string())); - } - })); - - // Create MetaService and get only the requested plugins - let meta_service = crate::services::meta_service::MetaService::new(save_meta); - let mut cmd = Command::new("keep"); - let settings = &self.settings; - - // Filter to only the requested plugin types - let all_plugins = meta_service.get_plugins(&mut cmd, settings); - let mut plugins: Vec> = all_plugins - .into_iter() - .filter(|p| { - let plugin_name = p.meta_type().to_string(); - plugin_names.iter().any(|n| n == &plugin_name) - }) - .collect(); - - if plugins.is_empty() && metadata.is_empty() { - // Nothing to do, return current item info - return self.get_item(conn, item_id); - } - - // Open and decompress the stored file - let mut item_path = self.item_service.get_data_path().clone(); - item_path.push(item_id.to_string()); - - if !item_path.exists() { - return Err(CoreError::ItemNotFound(item_id)); - } - - if !plugins.is_empty() { - let compression_service = CompressionService::new(); - let mut reader = - compression_service.stream_item_content(item_path, &item.compression)?; - - // Run plugins on the content - meta_service.initialize_plugins(&mut plugins); - - crate::common::stream_copy(&mut reader, |chunk| { - 
meta_service.process_chunk(&mut plugins, chunk); - Ok(()) - })?; - - meta_service.finalize_plugins(&mut plugins); - - // Write collected plugin metadata to DB - if let Ok(entries) = collected_meta.lock() { - for (name, value) in entries.iter() { - crate::db::add_meta(conn, item_id, name, value)?; - } - } - } - - // Apply direct metadata overrides - for (key, value) in &metadata { - crate::db::add_meta(conn, item_id, key, value)?; - } - - // Apply tags - for tag in tags { - crate::db::upsert_tag(conn, item_id, tag)?; - } - - self.get_item(conn, item_id) - } -}