diff --git a/src/client.rs b/src/client.rs index d5937b8..4ed45f4 100644 --- a/src/client.rs +++ b/src/client.rs @@ -9,7 +9,9 @@ use std::io::Read; pub struct ItemInfo { pub id: i64, pub ts: String, - pub size: Option<i64>, + pub uncompressed_size: Option<i64>, + pub compressed_size: Option<i64>, + pub closed: bool, pub compression: String, pub tags: Vec<String>, pub metadata: HashMap<String, String>, @@ -354,7 +356,7 @@ impl KeepClient { /// Set the uncompressed size for an item. pub fn set_item_size(&self, id: i64, size: u64) -> Result<(), CoreError> { let url = format!( - "{}?size={}", + "{}?uncompressed_size={}", self.url(&format!("/api/item/{id}/update")), size ); diff --git a/src/compression_engine/gzip.rs b/src/compression_engine/gzip.rs index 80fa973..3783ddc 100644 --- a/src/compression_engine/gzip.rs +++ b/src/compression_engine/gzip.rs @@ -93,10 +93,22 @@ impl Drop for AutoFinishGzEncoder { #[cfg(feature = "gzip")] impl Write for AutoFinishGzEncoder { fn write(&mut self, buf: &[u8]) -> io::Result<usize> { - self.encoder.as_mut().unwrap().write(buf) + match self.encoder.as_mut() { + Some(encoder) => encoder.write(buf), + None => Err(io::Error::new( + io::ErrorKind::BrokenPipe, + "encoder already finished", + )), + } } fn flush(&mut self) -> io::Result<()> { - self.encoder.as_mut().unwrap().flush() + match self.encoder.as_mut() { + Some(encoder) => encoder.flush(), + None => Err(io::Error::new( + io::ErrorKind::BrokenPipe, + "encoder already finished", + )), + } } } diff --git a/src/compression_engine/program.rs b/src/compression_engine/program.rs index 506cd8e..76d9ac5 100644 --- a/src/compression_engine/program.rs +++ b/src/compression_engine/program.rs @@ -15,7 +15,13 @@ pub struct ProgramReader { impl Read for ProgramReader { fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { - self.stdout.as_mut().unwrap().read(buf) + match self.stdout.as_mut() { + Some(stdout) => stdout.read(buf), + None => Err(std::io::Error::new( + std::io::ErrorKind::BrokenPipe, + "stdout already taken", + )), + } } } @@ 
-33,11 +39,23 @@ pub struct ProgramWriter { impl Write for ProgramWriter { fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { - self.stdin.as_mut().unwrap().write(buf) + match self.stdin.as_mut() { + Some(stdin) => stdin.write(buf), + None => Err(std::io::Error::new( + std::io::ErrorKind::BrokenPipe, + "stdin already taken", + )), + } } fn flush(&mut self) -> std::io::Result<()> { - self.stdin.as_mut().unwrap().flush() + match self.stdin.as_mut() { + Some(stdin) => stdin.flush(), + None => Err(std::io::Error::new( + std::io::ErrorKind::BrokenPipe, + "stdin already taken", + )), + } } } diff --git a/src/config.rs b/src/config.rs index be4aa61..d4be49e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -694,6 +694,14 @@ impl Settings { .unwrap_or_default() } + /// Returns the metadata filter as a HashMap. + /// + /// Converts the `meta` field (list of key-value pairs from CLI --meta flags) + /// into a `HashMap<String, Vec<String>>` suitable for filtering. + pub fn meta_filter(&self) -> std::collections::HashMap<String, Vec<String>> { + self.meta.iter().cloned().collect() + } + /// Validates the configuration against plugin schemas. /// /// Checks that: diff --git a/src/db.rs b/src/db.rs index a13cfdf..9db733a 100644 --- a/src/db.rs +++ b/src/db.rs @@ -19,7 +19,7 @@ and query utilities for efficient data access. # Schema The database uses three main tables: -- `items`: Core item information (ID, timestamp, size, compression). +- `items`: Core item information (ID, timestamp, uncompressed_size, compressed_size, closed, compression). - `tags`: Item-tag associations (many-to-many). - `metas`: Item-metadata associations (many-to-many). 
@@ -42,7 +42,7 @@ let conn = db::open(PathBuf::from("keep.db"))?; ``` Insert an item: ```ignore -let item = db::Item { id: None, ts: Utc::now(), size: None, compression: "lz4".to_string() }; +let item = db::Item { id: None, ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() }; let id = db::insert_item(&conn, item)?; ``` */ @@ -78,6 +78,9 @@ lazy_static! { M::up("CREATE INDEX idx_tags_name ON tags(name)"), M::up("CREATE INDEX idx_metas_name ON metas(name)"), M::up("UPDATE items SET compression = 'raw' WHERE compression = 'none'"), + M::up("ALTER TABLE items RENAME COLUMN size TO uncompressed_size"), + M::up("ALTER TABLE items ADD COLUMN compressed_size INTEGER NULL"), + M::up("ALTER TABLE items ADD COLUMN closed BOOLEAN NOT NULL DEFAULT 1"), ]); } @@ -89,7 +92,9 @@ lazy_static! { /// /// * `id` - Unique identifier, `None` for new items. /// * `ts` - Creation timestamp in UTC. -/// * `size` - Content size in bytes, `None` if not set. +/// * `uncompressed_size` - Uncompressed content size in bytes, `None` if not set. +/// * `compressed_size` - Compressed file size on disk, `None` if not set. +/// * `closed` - Whether the item has been fully written and closed. /// * `compression` - Compression algorithm used. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Item { @@ -97,8 +102,12 @@ pub struct Item { pub id: Option, /// Timestamp when the item was created. pub ts: DateTime, - /// Size of the item content in bytes, None if not set. - pub size: Option, + /// Uncompressed size of the item content in bytes, None if not set. + pub uncompressed_size: Option, + /// Compressed file size on disk in bytes, None if not set. + pub compressed_size: Option, + /// Whether the item has been fully written and closed. + pub closed: bool, /// Compression algorithm used for the item content. 
pub compression: String, } @@ -224,7 +233,9 @@ pub fn open(path: PathBuf) -> Result { /// let item = Item { /// id: None, /// ts: Utc::now(), -/// size: None, +/// uncompressed_size: None, +/// compressed_size: None, +/// closed: false, /// compression: "lz4".to_string(), /// }; /// let id = db::insert_item(&conn, item)?; @@ -235,8 +246,8 @@ pub fn open(path: PathBuf) -> Result { pub fn insert_item(conn: &Connection, item: Item) -> Result { debug!("DB: Inserting item: {item:?}"); conn.execute( - "INSERT INTO items (ts, size, compression) VALUES (?1, ?2, ?3)", - params![item.ts, item.size, item.compression], + "INSERT INTO items (ts, uncompressed_size, compressed_size, closed, compression) VALUES (?1, ?2, ?3, ?4, ?5)", + params![item.ts, item.uncompressed_size, item.compressed_size, item.closed, item.compression], )?; Ok(conn.last_insert_rowid()) } @@ -283,7 +294,9 @@ pub fn create_item( let item = Item { id: None, ts: chrono::Utc::now(), - size: None, + uncompressed_size: None, + compressed_size: None, + closed: false, compression: compression_type.to_string(), }; let item_id = insert_item(conn, item.clone())?; @@ -312,7 +325,9 @@ pub fn insert_item_with_ts( let item = Item { id: None, ts, - size: None, + uncompressed_size: None, + compressed_size: None, + closed: false, compression: compression.to_string(), }; let item_id = insert_item(conn, item.clone())?; @@ -353,7 +368,7 @@ pub fn insert_item_with_ts( /// let _tmp = tempfile::tempdir()?; /// let db_path = _tmp.path().join("keep.db"); /// let conn = db::open(db_path)?; -/// let item = Item { id: None, ts: Utc::now(), size: None, compression: "lz4".to_string() }; +/// let item = Item { id: None, ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() }; /// let item_id = db::insert_item(&conn, item)?; /// db::add_tag(&conn, item_id, "important")?; /// # Ok(()) @@ -411,7 +426,7 @@ pub fn upsert_tag(conn: &Connection, item_id: i64, tag_name: &str) -> Result<()> 
/// let _tmp = tempfile::tempdir()?; /// let db_path = _tmp.path().join("keep.db"); /// let conn = db::open(db_path)?; -/// let item = Item { id: None, ts: Utc::now(), size: None, compression: "lz4".to_string() }; +/// let item = Item { id: None, ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() }; /// let item_id = db::insert_item(&conn, item)?; /// db::add_meta(&conn, item_id, "mime_type", "text/plain")?; /// # Ok(()) @@ -456,7 +471,7 @@ pub fn add_meta(conn: &Connection, item_id: i64, name: &str, value: &str) -> Res /// let _tmp = tempfile::tempdir()?; /// let db_path = _tmp.path().join("keep.db"); /// let conn = db::open(db_path)?; -/// let item = Item { id: Some(1), size: Some(1024), compression: "lz4".to_string(), ts: Utc::now() }; +/// let item = Item { id: Some(1), ts: Utc::now(), uncompressed_size: Some(1024), compressed_size: Some(512), closed: true, compression: "lz4".to_string() }; /// db::update_item(&conn, item)?; /// # Ok(()) /// # } @@ -464,8 +479,8 @@ pub fn add_meta(conn: &Connection, item_id: i64, name: &str, value: &str) -> Res pub fn update_item(conn: &Connection, item: Item) -> Result<()> { debug!("DB: Updating item: {item:?}"); conn.execute( - "UPDATE items SET size=?2, compression=?3 WHERE id=?1", - params![item.id, item.size, item.compression,], + "UPDATE items SET uncompressed_size=?2, compressed_size=?3, closed=?4, compression=?5 WHERE id=?1", + params![item.id, item.uncompressed_size, item.compressed_size, item.closed, item.compression,], )?; Ok(()) } @@ -500,7 +515,7 @@ pub fn update_item(conn: &Connection, item: Item) -> Result<()> { /// let _tmp = tempfile::tempdir()?; /// let db_path = _tmp.path().join("keep.db"); /// let conn = db::open(db_path)?; -/// let item = Item { id: Some(1), ts: Utc::now(), size: None, compression: "lz4".to_string() }; +/// let item = Item { id: Some(1), ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: 
"lz4".to_string() }; /// db::delete_item(&conn, item)?; /// # Ok(()) /// # } @@ -584,7 +599,7 @@ pub fn query_delete_meta(conn: &Connection, meta: Meta) -> Result<()> { /// let _tmp = tempfile::tempdir()?; /// let db_path = _tmp.path().join("keep.db"); /// let conn = db::open(db_path)?; -/// let item = Item { id: None, ts: Utc::now(), size: None, compression: "lz4".to_string() }; +/// let item = Item { id: None, ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() }; /// let item_id = db::insert_item(&conn, item)?; /// let meta = Meta { id: item_id, name: "mime_type".to_string(), value: "text/plain".to_string() }; /// db::query_upsert_meta(&conn, meta)?; @@ -630,7 +645,7 @@ pub fn query_upsert_meta(conn: &Connection, meta: Meta) -> Result<()> { /// let _tmp = tempfile::tempdir()?; /// let db_path = _tmp.path().join("keep.db"); /// let conn = db::open(db_path)?; -/// let item = Item { id: None, ts: Utc::now(), size: None, compression: "lz4".to_string() }; +/// let item = Item { id: None, ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() }; /// let item_id = db::insert_item(&conn, item)?; /// // Insert new metadata /// let meta = Meta { id: item_id, name: "source".to_string(), value: "cli".to_string() }; @@ -681,7 +696,7 @@ pub fn store_meta(conn: &Connection, meta: Meta) -> Result<()> { /// let _tmp = tempfile::tempdir()?; /// let db_path = _tmp.path().join("keep.db"); /// let conn = db::open(db_path)?; -/// let item = Item { id: None, ts: Utc::now(), size: None, compression: "lz4".to_string() }; +/// let item = Item { id: None, ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() }; /// let item_id = db::insert_item(&conn, item)?; /// let tag = Tag { id: item_id, name: "work".to_string() }; /// db::insert_tag(&conn, tag)?; @@ -726,7 +741,7 @@ pub fn insert_tag(conn: &Connection, tag: Tag) -> 
Result<()> { /// let _tmp = tempfile::tempdir()?; /// let db_path = _tmp.path().join("keep.db"); /// let conn = db::open(db_path)?; -/// let item = Item { id: Some(1), ts: Utc::now(), size: None, compression: "lz4".to_string() }; +/// let item = Item { id: Some(1), ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() }; /// db::delete_item_tags(&conn, item)?; /// # Ok(()) /// # } @@ -768,9 +783,9 @@ pub fn delete_item_tags(conn: &Connection, item: Item) -> Result<()> { /// let _tmp = tempfile::tempdir()?; /// let db_path = _tmp.path().join("keep.db"); /// let conn = db::open(db_path)?; -/// let item = Item { id: None, ts: Utc::now(), size: None, compression: "lz4".to_string() }; +/// let item = Item { id: None, ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() }; /// let item_id = db::insert_item(&conn, item)?; -/// let item = Item { id: Some(item_id), ts: Utc::now(), size: None, compression: "lz4".to_string() }; +/// let item = Item { id: Some(item_id), ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() }; /// let tags = vec!["project_a".to_string(), "urgent".to_string()]; /// db::set_item_tags(&conn, item, &tags)?; /// # Ok(()) @@ -831,7 +846,7 @@ pub fn set_item_tags(conn: &Connection, item: Item, tags: &Vec) -> Resul pub fn query_all_items(conn: &Connection) -> Result> { debug!("DB: Querying all items"); let mut statement = conn - .prepare("SELECT id, ts, size, compression FROM items ORDER BY id ASC") + .prepare("SELECT id, ts, uncompressed_size, compressed_size, closed, compression FROM items ORDER BY id ASC") .context("Problem preparing SQL statement")?; let mut rows = statement.query(params![])?; let mut items = Vec::new(); @@ -840,8 +855,10 @@ pub fn query_all_items(conn: &Connection) -> Result> { let item = Item { id: row.get(0)?, ts: row.get(1)?, - size: row.get(2)?, - compression: 
row.get(3)?, + uncompressed_size: row.get(2)?, + compressed_size: row.get(3)?, + closed: row.get(4)?, + compression: row.get(5)?, }; items.push(item); } @@ -889,7 +906,9 @@ pub fn query_tagged_items<'a>(conn: &'a Connection, tags: &'a Vec) -> Re " SELECT items.id, items.ts, - items.size, + items.uncompressed_size, + items.compressed_size, + items.closed, items.compression, count(tags_match.id) as tags_score FROM items, @@ -915,8 +934,10 @@ pub fn query_tagged_items<'a>(conn: &'a Connection, tags: &'a Vec) -> Re let item = Item { id: row.get(0)?, ts: row.get(1)?, - size: row.get(2)?, - compression: row.get(3)?, + uncompressed_size: row.get(2)?, + compressed_size: row.get(3)?, + closed: row.get(4)?, + compression: row.get(5)?, }; items.push(item); } @@ -1107,7 +1128,7 @@ pub fn get_item_matching( /// let _tmp = tempfile::tempdir()?; /// let db_path = _tmp.path().join("keep.db"); /// let conn = db::open(db_path)?; -/// let item = Item { id: None, ts: Utc::now(), size: None, compression: "lz4".to_string() }; +/// let item = Item { id: None, ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() }; /// let item_id = db::insert_item(&conn, item)?; /// let item = db::get_item(&conn, item_id)?; /// assert!(item.is_some()); @@ -1119,7 +1140,7 @@ pub fn get_item(conn: &Connection, item_id: i64) -> Result> { let mut statement = conn .prepare_cached( " - SELECT id, ts, size, compression + SELECT id, ts, uncompressed_size, compressed_size, closed, compression FROM items WHERE items.id = ?1", ) @@ -1131,8 +1152,10 @@ pub fn get_item(conn: &Connection, item_id: i64) -> Result> { Some(row) => Ok(Some(Item { id: row.get(0)?, ts: row.get(1)?, - size: row.get(2)?, - compression: row.get(3)?, + uncompressed_size: row.get(2)?, + compressed_size: row.get(3)?, + closed: row.get(4)?, + compression: row.get(5)?, })), None => Ok(None), } @@ -1174,7 +1197,7 @@ pub fn get_item_last(conn: &Connection) -> Result> { let mut statement = conn 
.prepare_cached( " - SELECT id, ts, size, compression + SELECT id, ts, uncompressed_size, compressed_size, closed, compression FROM items ORDER BY id DESC LIMIT 1", @@ -1187,8 +1210,10 @@ pub fn get_item_last(conn: &Connection) -> Result> { Some(row) => Ok(Some(Item { id: row.get(0)?, ts: row.get(1)?, - size: row.get(2)?, - compression: row.get(3)?, + uncompressed_size: row.get(2)?, + compressed_size: row.get(3)?, + closed: row.get(4)?, + compression: row.get(5)?, })), None => Ok(None), } @@ -1223,7 +1248,7 @@ pub fn get_item_last(conn: &Connection) -> Result> { /// let _tmp = tempfile::tempdir()?; /// let db_path = _tmp.path().join("keep.db"); /// let conn = db::open(db_path)?; -/// let item = Item { id: Some(1), ts: Utc::now(), size: None, compression: "lz4".to_string() }; +/// let item = Item { id: Some(1), ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() }; /// let tags = db::get_item_tags(&conn, &item)?; /// # Ok(()) /// # } @@ -1276,7 +1301,7 @@ pub fn get_item_tags(conn: &Connection, item: &Item) -> Result> { /// let _tmp = tempfile::tempdir()?; /// let db_path = _tmp.path().join("keep.db"); /// let conn = db::open(db_path)?; -/// let item = Item { id: Some(1), ts: Utc::now(), size: None, compression: "lz4".to_string() }; +/// let item = Item { id: Some(1), ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() }; /// let meta = db::get_item_meta(&conn, &item)?; /// # Ok(()) /// # } @@ -1331,7 +1356,7 @@ pub fn get_item_meta(conn: &Connection, item: &Item) -> Result> { /// let _tmp = tempfile::tempdir()?; /// let db_path = _tmp.path().join("keep.db"); /// let conn = db::open(db_path)?; -/// let item = Item { id: Some(1), ts: Utc::now(), size: None, compression: "lz4".to_string() }; +/// let item = Item { id: Some(1), ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() }; /// let meta = 
db::get_item_meta_name(&conn, &item, "mime_type".to_string())?; /// # Ok(()) /// # } @@ -1383,7 +1408,7 @@ pub fn get_item_meta_name(conn: &Connection, item: &Item, name: String) -> Resul /// let _tmp = tempfile::tempdir()?; /// let db_path = _tmp.path().join("keep.db"); /// let conn = db::open(db_path)?; -/// let item = Item { id: Some(1), ts: Utc::now(), size: None, compression: "lz4".to_string() }; +/// let item = Item { id: Some(1), ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() }; /// let value = db::get_item_meta_value(&conn, &item, "source".to_string())?; /// # Ok(()) /// # } diff --git a/src/export_tar.rs b/src/export_tar.rs index a617562..91e69ae 100644 --- a/src/export_tar.rs +++ b/src/export_tar.rs @@ -88,7 +88,7 @@ pub fn write_export_tar( let export_meta = ExportMeta { ts: item_with_meta.item.ts, compression: compression.clone(), - size: item_with_meta.item.size, + uncompressed_size: item_with_meta.item.uncompressed_size, tags: item_tags, metadata: meta_map, }; diff --git a/src/import_tar.rs b/src/import_tar.rs index 226553d..3602aa0 100644 --- a/src/import_tar.rs +++ b/src/import_tar.rs @@ -209,10 +209,12 @@ pub fn import_from_tar( )?; } - // Update item size - let size_to_record = import_meta.size.unwrap_or(total); + // Update item sizes + let size_to_record = import_meta.uncompressed_size.unwrap_or(total); let mut updated_item = item; - updated_item.size = Some(size_to_record); + updated_item.uncompressed_size = Some(size_to_record); + updated_item.compressed_size = Some(std::fs::metadata(&storage_path)?.len() as i64); + updated_item.closed = true; db::update_item(conn, updated_item)?; log::info!("KEEP: Imported item {new_id} (was {orig_id}) tags: {tags:?}"); diff --git a/src/meta_plugin/digest.rs b/src/meta_plugin/digest.rs index 3f77301..75918c0 100644 --- a/src/meta_plugin/digest.rs +++ b/src/meta_plugin/digest.rs @@ -32,7 +32,7 @@ impl Hasher { match self { Hasher::Sha256(hasher) => 
hasher.update(data), Hasher::Md5(hasher) => { - let _ = hasher.write(data); + hasher.consume(data); } Hasher::Sha512(hasher) => hasher.update(data), } diff --git a/src/meta_plugin/exec.rs b/src/meta_plugin/exec.rs index f66a335..82a36f4 100644 --- a/src/meta_plugin/exec.rs +++ b/src/meta_plugin/exec.rs @@ -131,7 +131,19 @@ impl MetaPluginExec { match cmd.spawn() { Ok(mut child) => { - let stdin = child.stdin.take().unwrap(); + let stdin = match child.stdin.take() { + Some(s) => s, + None => { + error!( + "META: Exec plugin: failed to capture stdin for '{}'", + self.program + ); + return MetaPluginResponse { + metadata: Vec::new(), + is_finalized: true, + }; + } + }; self.writer = Some(Box::new(stdin)); self.process = Some(child); debug!("META: Exec plugin: started process for '{}'", self.program); diff --git a/src/meta_plugin/magic_file.rs b/src/meta_plugin/magic_file.rs index 89ee2b0..10897c3 100644 --- a/src/meta_plugin/magic_file.rs +++ b/src/meta_plugin/magic_file.rs @@ -267,7 +267,10 @@ impl FallbackMagicFileMetaPlugin { .spawn() .and_then(|mut child| { if let Some(mut stdin) = child.stdin.take() { - let _ = stdin.write_all(&self.buffer); + if stdin.write_all(&self.buffer).is_err() { + // Ignore write error; child will see EOF and likely fail + // the file detection, returning no output. 
+ } } child.wait_with_output() }); diff --git a/src/modes/client/import.rs b/src/modes/client/import.rs index d5fcf17..ede4ea7 100644 --- a/src/modes/client/import.rs +++ b/src/modes/client/import.rs @@ -135,7 +135,7 @@ fn import_legacy( debug!("CLIENT_IMPORT: Created item {} via server", item_id); // Set uncompressed size if known from metadata - if let Some(size) = import_meta.size { + if let Some(size) = import_meta.uncompressed_size { client.set_item_size(item_id, size as u64)?; debug!("CLIENT_IMPORT: Set size to {}", size); } diff --git a/src/modes/client/info.rs b/src/modes/client/info.rs index c2dc7a1..ac9328a 100644 --- a/src/modes/client/info.rs +++ b/src/modes/client/info.rs @@ -31,7 +31,7 @@ pub fn mode( timestamp: item.ts.clone(), path: String::new(), stream_size: item - .size + .uncompressed_size .map(|s| format_size(s as u64, settings.human_readable)) .unwrap_or_else(|| "N/A".to_string()), compression: item.compression.clone(), diff --git a/src/modes/client/list.rs b/src/modes/client/list.rs index f132102..1d6b177 100644 --- a/src/modes/client/list.rs +++ b/src/modes/client/list.rs @@ -46,7 +46,7 @@ pub fn mode( Some(ColumnType::Id) => item.id.to_string(), Some(ColumnType::Time) => item.ts.clone(), Some(ColumnType::Size) => item - .size + .uncompressed_size .map(|s| format_size(s as u64, settings.human_readable)) .unwrap_or_default(), Some(ColumnType::Compression) => item.compression.clone(), diff --git a/src/modes/common.rs b/src/modes/common.rs index 17e23e1..4e7b494 100644 --- a/src/modes/common.rs +++ b/src/modes/common.rs @@ -643,7 +643,7 @@ pub fn sanitize_tags(tags: &[String]) -> String { pub struct ExportMeta { pub ts: DateTime<Utc>, pub compression: String, - pub size: Option<i64>, + pub uncompressed_size: Option<i64>, pub tags: Vec<String>, pub metadata: HashMap<String, String>, } @@ -653,8 +653,8 @@ pub struct ExportMeta { pub struct ImportMeta { pub ts: DateTime<Utc>, pub compression: String, - #[serde(default)] - pub size: Option<i64>, + #[serde(default, alias = "size")] + pub 
uncompressed_size: Option, #[serde(default)] pub tags: Vec, #[serde(default)] diff --git a/src/modes/import.rs b/src/modes/import.rs index addc47c..efc9d6d 100644 --- a/src/modes/import.rs +++ b/src/modes/import.rs @@ -173,10 +173,12 @@ fn import_legacy( ); } - // Update item size (use imported size if available, otherwise data length) - let size_to_record = import_meta.size.unwrap_or(data_size); + // Update item sizes (use imported size if available, otherwise data length) + let size_to_record = import_meta.uncompressed_size.unwrap_or(data_size); let mut updated_item = item; - updated_item.size = Some(size_to_record); + updated_item.uncompressed_size = Some(size_to_record); + updated_item.compressed_size = Some(std::fs::metadata(&item_path)?.len() as i64); + updated_item.closed = true; db::update_item(conn, updated_item)?; if !settings.quiet { diff --git a/src/modes/info.rs b/src/modes/info.rs index 32cea93..650a79c 100644 --- a/src/modes/info.rs +++ b/src/modes/info.rs @@ -150,7 +150,7 @@ fn show_item( let mut item_path_buf = data_path.clone(); item_path_buf.push(item_id.to_string()); - let size_str = match item.size { + let size_str = match item.uncompressed_size { Some(size) => format_size(size as u64, settings.human_readable), None => "Missing".to_string(), }; @@ -230,7 +230,7 @@ fn show_item_structured( None => "Missing".to_string(), }; - let stream_size_formatted = match item.size { + let stream_size_formatted = match item.uncompressed_size { Some(size) => format_size(size as u64, settings.human_readable), None => "Missing".to_string(), }; @@ -243,7 +243,7 @@ fn show_item_structured( .format("%F %T %Z") .to_string(), path: item_path_buf.to_str().unwrap_or("").to_string(), - stream_size: item.size.map(|s| s as u64), + stream_size: item.uncompressed_size.map(|s| s as u64), stream_size_formatted, compression: item.compression, file_size, diff --git a/src/modes/list.rs b/src/modes/list.rs index 07ffc6c..9c0b918 100644 --- a/src/modes/list.rs +++ 
b/src/modes/list.rs @@ -152,7 +152,7 @@ pub fn mode_list( .with_timezone(&chrono::Local) .format("%F %T") .to_string(), - ColumnType::Size => match item.size { + ColumnType::Size => match item.uncompressed_size { Some(size) => format_size(size as u64, settings.human_readable), None => match item_path.metadata() { Ok(_) => "Unknown".to_string(), @@ -218,7 +218,7 @@ pub fn mode_list( // Apply styling for specific cases match column_type { ColumnType::Size => { - if item.size.is_none() { + if item.uncompressed_size.is_none() { if item_path.metadata().is_ok() { cell = cell .fg(comfy_table::Color::Yellow) @@ -282,7 +282,7 @@ fn show_list_structured( None => "Missing".to_string(), }; - let size_formatted = match item.size { + let size_formatted = match item.uncompressed_size { Some(size) => crate::modes::common::format_size(size as u64, settings.human_readable), None => "Unknown".to_string(), }; @@ -294,7 +294,7 @@ fn show_list_structured( .with_timezone(&chrono::Local) .format("%F %T") .to_string(), - size: item.size.map(|s| s as u64), + size: item.uncompressed_size.map(|s| s as u64), size_formatted, compression: item.compression, file_size, diff --git a/src/modes/server/api/item.rs b/src/modes/server/api/item.rs index 1015af5..1217aac 100644 --- a/src/modes/server/api/item.rs +++ b/src/modes/server/api/item.rs @@ -142,12 +142,11 @@ pub async fn handle_list_items( HashMap::new() }; - let data_dir = state.data_dir.clone(); + let item_service = state.item_service.clone(); let db = state.db.clone(); let mut items_with_meta = task::spawn_blocking(move || { let conn = db.blocking_lock(); - let item_service = ItemService::new(data_dir); item_service.get_items(&conn, &ids, &tags, &meta_filter) }) .await @@ -178,44 +177,26 @@ pub async fn handle_list_items( let item_infos: Vec = items_with_meta .into_iter() - .map(|item_with_meta| { - let item_id = item_with_meta.item.id.unwrap_or(0); - let item_tags: Vec = - item_with_meta.tags.iter().map(|t| t.name.clone()).collect(); - let 
item_meta = item_with_meta.meta_as_map(); - - ItemInfo { - id: item_id, - ts: item_with_meta.item.ts.to_rfc3339(), - size: item_with_meta.item.size, - compression: item_with_meta.item.compression, - tags: item_tags, - metadata: item_meta, - } - }) + .filter_map(|iwm| ItemInfo::try_from(iwm).ok()) .collect(); - ResponseBuilder::json(ApiResponse { - success: true, - data: Some(item_infos), - error: None, - }) + ResponseBuilder::json(ApiResponse::ok(item_infos)) } /// Handle as_meta=true response by returning JSON with metadata and content async fn handle_as_meta_response( db: &Arc>, data_dir: &std::path::Path, + item_service: &Arc, item_id: i64, offset: u64, length: u64, ) -> Result { let db1 = db.clone(); - let data_dir1 = data_dir.to_path_buf(); + let item_service1 = item_service.clone(); let item_with_meta = task::spawn_blocking(move || { let conn = db1.blocking_lock(); - let item_service = ItemService::new(data_dir1); - item_service.get_item(&conn, item_id) + item_service1.get_item(&conn, item_id) }) .await .map_err(|e| { @@ -228,7 +209,16 @@ async fn handle_as_meta_response( })?; let metadata = item_with_meta.meta_as_map(); - handle_as_meta_response_with_metadata(db, data_dir, item_id, &metadata, offset, length).await + handle_as_meta_response_with_metadata( + db, + data_dir, + item_service, + item_id, + &metadata, + offset, + length, + ) + .await } /// Handle as_meta=true response with pre-fetched metadata using streaming. 
@@ -238,6 +228,7 @@ async fn handle_as_meta_response( async fn handle_as_meta_response_with_metadata( db: &Arc>, data_dir: &std::path::Path, + item_service: &Arc, item_id: i64, metadata: &HashMap, offset: u64, @@ -245,11 +236,10 @@ async fn handle_as_meta_response_with_metadata( ) -> Result { // Binary detection: read a sample in a blocking task, check, and return early let db1 = db.clone(); - let data_dir1 = data_dir.to_path_buf(); + let item_service1 = item_service.clone(); let is_binary = task::spawn_blocking(move || { let conn = db1.blocking_lock(); - let item_service = ItemService::new(data_dir1); - let (mut reader, _) = item_service.get_item_content_streaming(&conn, item_id)?; + let (mut reader, _) = item_service1.get_item_content_streaming(&conn, item_id)?; let mut sample = vec![0u8; crate::common::PIPESIZE]; let n = reader.read(&mut sample)?; sample.truncate(n); @@ -280,13 +270,12 @@ async fn handle_as_meta_response_with_metadata( // Stream content with offset/length applied at the stream level let db2 = db.clone(); - let data_dir2 = data_dir.to_path_buf(); + let item_service2 = item_service.clone(); let content_str = task::spawn_blocking(move || -> Result { let conn = db2.blocking_lock(); - let item_service = ItemService::new(data_dir2); let (mut reader, item_with_meta) = - item_service.get_item_content_streaming(&conn, item_id)?; - let content_len = item_with_meta.item.size.unwrap_or(0) as u64; + item_service2.get_item_content_streaming(&conn, item_id)?; + let content_len = item_with_meta.item.uncompressed_size.unwrap_or(0) as u64; let result = crate::common::read_with_bounds(&mut reader, offset, length, content_len) .map_err(CoreError::Io)?; @@ -366,7 +355,7 @@ pub async fn handle_post_item( body: Body, ) -> Result>, StatusCode> { let db = state.db.clone(); - let data_dir = state.data_dir.clone(); + let item_service = state.item_service.clone(); let settings = state.settings.clone(); // Parse tags from query parameter @@ -474,10 +463,11 @@ pub async fn 
handle_post_item( }); // Blocking task: consume streaming reader, save via save_item_raw_streaming + let truncated_flag = body_truncated.clone(); let item_with_meta = task::spawn_blocking(move || { let mut conn = db.blocking_lock(); - let item_service = crate::services::ItemService::new(data_dir); let mut reader = MpscReader::new(rx); + let set_size = !truncated_flag.load(Ordering::Relaxed); item_service.save_item_raw_streaming( &mut conn, &mut reader, @@ -488,6 +478,7 @@ pub async fn handle_post_item( client_compression_type, import_ts, &settings, + set_size, ) }) .await @@ -527,29 +518,12 @@ pub async fn handle_post_item( return Err(StatusCode::PAYLOAD_TOO_LARGE); } - let compression = item_with_meta.item.compression.clone(); - let tags = item_with_meta.tags.iter().map(|t| t.name.clone()).collect(); - let metadata = item_with_meta.meta_as_map(); + let item_info = ItemInfo::try_from(item_with_meta).map_err(|e| { + warn!("Item conversion failed: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })?; - let item_info = ItemInfo { - id: item_with_meta.item.id.ok_or_else(|| { - warn!("Item missing ID"); - StatusCode::INTERNAL_SERVER_ERROR - })?, - ts: item_with_meta.item.ts.to_rfc3339(), - size: item_with_meta.item.size, - compression, - tags, - metadata, - }; - - let response = ApiResponse { - success: true, - data: Some(item_info), - error: None, - }; - - Ok(Json(response)) + Ok(Json(ApiResponse::ok(item_info))) } #[utoipa::path( @@ -590,13 +564,12 @@ pub async fn handle_get_item_latest_content( .unwrap_or_default(); let db = state.db.clone(); - let data_dir = state.data_dir.clone(); + let item_service = state.item_service.clone(); // First find the item to get its ID and metadata let find_tags = tags; let item_with_meta = task::spawn_blocking(move || { let conn = db.blocking_lock(); - let item_service = ItemService::new(data_dir); item_service.find_item(&conn, &[], &find_tags, &HashMap::new()) }) .await @@ -617,6 +590,7 @@ pub async fn handle_get_item_latest_content( 
handle_as_meta_response_with_metadata( &state.db, &state.data_dir, + &state.item_service, item_id, &metadata, params.offset, @@ -627,6 +601,7 @@ pub async fn handle_get_item_latest_content( stream_item_content_response_with_metadata( &state.db, &state.data_dir, + &state.item_service, item_id, &metadata, params.allow_binary, @@ -694,6 +669,7 @@ pub async fn handle_get_item_content( let result = handle_as_meta_response( &state.db, &state.data_dir, + &state.item_service, item_id, params.offset, params.length, @@ -710,6 +686,7 @@ pub async fn handle_get_item_content( let result = stream_item_content_response( &state.db, &state.data_dir, + &state.item_service, item_id, params.allow_binary, params.offset, @@ -733,6 +710,7 @@ pub async fn handle_get_item_content( async fn stream_item_content_response( db: &Arc>, data_dir: &std::path::Path, + item_service: &Arc, item_id: i64, allow_binary: bool, offset: u64, @@ -744,11 +722,10 @@ async fn stream_item_content_response( debug!("STREAM_ITEM_CONTENT_RESPONSE: stream={stream}, decompress={decompress}"); // Get the item with metadata once let db_clone = db.clone(); - let data_dir_clone = data_dir.to_path_buf(); + let item_service_clone = item_service.clone(); let item_with_meta = task::spawn_blocking(move || { let conn = db_clone.blocking_lock(); - let item_service = ItemService::new(data_dir_clone); - item_service.get_item(&conn, item_id) + item_service_clone.get_item(&conn, item_id) }) .await .map_err(|e| { @@ -764,6 +741,7 @@ async fn stream_item_content_response( stream_item_content_response_with_metadata( db, data_dir, + item_service, item_id, &metadata, allow_binary, @@ -782,17 +760,17 @@ async fn stream_item_content_response( async fn stream_raw_content_response( db: &Arc>, data_dir: &std::path::Path, + item_service: &Arc, item_id: i64, offset: u64, length: u64, ) -> Result { // Get item info to find the compression type let db_clone = db.clone(); - let data_dir_clone = data_dir.to_path_buf(); + let item_service_clone = 
item_service.clone(); let item_with_meta = task::spawn_blocking(move || { let conn = db_clone.blocking_lock(); - let item_service = ItemService::new(data_dir_clone); - item_service.get_item(&conn, item_id) + item_service_clone.get_item(&conn, item_id) }) .await .map_err(|e| { @@ -827,14 +805,10 @@ async fn stream_raw_content_response( StatusCode::INTERNAL_SERVER_ERROR })?; - // Get the actual file size on disk (raw bytes, not uncompressed size) - let file_size = { - let item_path = data_dir.join(item_id.to_string()); - std::fs::metadata(&item_path).map(|m| m.len()).unwrap_or(0) - }; - - // Calculate the actual response length - let content_len = file_size; + // Use the already-fetched item size instead of stat()ing the file separately, + // which would be racy (file could change between stat and read). + // This streams raw on-disk bytes, so use compressed_size. + let content_len = item_with_meta.item.compressed_size.unwrap_or(0) as u64; let start = std::cmp::min(offset, content_len); let end = if length > 0 { std::cmp::min(start + length, content_len) @@ -921,6 +895,7 @@ async fn stream_raw_content_response( async fn stream_item_content_response_with_metadata( db: &Arc>, data_dir: &std::path::Path, + item_service: &Arc, item_id: i64, metadata: &HashMap, allow_binary: bool, @@ -934,7 +909,8 @@ async fn stream_item_content_response_with_metadata( // When decompress=false, return raw stored bytes if !decompress { - return stream_raw_content_response(db, data_dir, item_id, offset, length).await; + return stream_raw_content_response(db, data_dir, item_service, item_id, offset, length) + .await; } let mime_type = get_mime_type(metadata); @@ -943,11 +919,10 @@ async fn stream_item_content_response_with_metadata( // Uses a sample of actual content bytes (not metadata-only) for reliable detection. 
if !allow_binary { let db_check = db.clone(); - let data_dir_check = data_dir.to_path_buf(); + let item_service_check = item_service.clone(); let is_binary = task::spawn_blocking(move || { let conn = db_check.blocking_lock(); - let item_service = ItemService::new(data_dir_check); - let (mut reader, _) = item_service.get_item_content_streaming(&conn, item_id)?; + let (mut reader, _) = item_service_check.get_item_content_streaming(&conn, item_id)?; let mut sample = vec![0u8; crate::common::PIPESIZE]; let n = reader.read(&mut sample)?; sample.truncate(n); @@ -971,14 +946,13 @@ async fn stream_item_content_response_with_metadata( if stream { debug!("STREAMING: Using streaming approach"); let db = db.clone(); - let data_dir = data_dir.to_path_buf(); + let item_service_stream = item_service.clone(); let (tx, rx) = mpsc::channel::, std::io::Error>>(16); tokio::task::spawn_blocking(move || { let conn = db.blocking_lock(); - let item_service = ItemService::new(data_dir); let (mut reader, _, _) = - match item_service.get_item_content_info_streaming(&conn, item_id, None) { + match item_service_stream.get_item_content_info_streaming(&conn, item_id, None) { Ok(r) => r, Err(e) => { let _ = tx.blocking_send(Err(std::io::Error::other(format!("{e}")))); @@ -1046,15 +1020,14 @@ async fn stream_item_content_response_with_metadata( } else { debug!("NON-STREAMING: Building full response in memory using streaming reader"); let db = db.clone(); - let data_dir = data_dir.to_path_buf(); + let item_service_ns = item_service.clone(); // Get streaming reader from item service let (reader, content_len_result) = tokio::task::spawn_blocking(move || { let conn = db.blocking_lock(); - let item_service = ItemService::new(data_dir); let (reader, item_with_meta) = - item_service.get_item_content_streaming(&conn, item_id)?; - let content_len = item_with_meta.item.size.unwrap_or(0); + item_service_ns.get_item_content_streaming(&conn, item_id)?; + let content_len = 
item_with_meta.item.uncompressed_size.unwrap_or(0); Ok::<_, CoreError>((reader, content_len)) }) .await @@ -1122,12 +1095,11 @@ pub async fn handle_get_item_latest_meta( .unwrap_or_default(); let db = state.db.clone(); - let data_dir = state.data_dir.clone(); + let item_service = state.item_service.clone(); let find_tags = tags; let result = task::spawn_blocking(move || { let conn = db.blocking_lock(); - let item_service = ItemService::new(data_dir); item_service.find_item(&conn, &[], &find_tags, &HashMap::new()) }) .await @@ -1139,14 +1111,7 @@ pub async fn handle_get_item_latest_meta( match result { Ok(item_with_meta) => { let item_meta = item_with_meta.meta_as_map(); - - let response = ApiResponse { - success: true, - data: Some(item_meta), - error: None, - }; - - Ok(Json(response)) + Ok(Json(ApiResponse::ok(item_meta))) } Err(e) => Err(handle_item_error(e)), } @@ -1182,11 +1147,10 @@ pub async fn handle_get_item_meta( } let db = state.db.clone(); - let data_dir = state.data_dir.clone(); + let item_service = state.item_service.clone(); let result = task::spawn_blocking(move || { let conn = db.blocking_lock(); - let item_service = ItemService::new(data_dir); item_service.get_item(&conn, item_id) }) .await @@ -1198,14 +1162,7 @@ pub async fn handle_get_item_meta( match result { Ok(item_with_meta) => { let item_meta = item_with_meta.meta_as_map(); - - let response = ApiResponse { - success: true, - data: Some(item_meta), - error: None, - }; - - Ok(Json(response)) + Ok(Json(ApiResponse::ok(item_meta))) } Err(e) => Err(handle_item_error(e)), } @@ -1221,13 +1178,12 @@ pub async fn handle_post_item_meta( } let db = state.db.clone(); - let data_dir = state.data_dir.clone(); + let item_service = state.item_service.clone(); let meta = metadata.clone(); // Verify item exists and add metadata in a single blocking task task::spawn_blocking(move || { let conn = db.blocking_lock(); - let item_service = ItemService::new(data_dir); item_service .get_item(&conn, item_id) 
.map_err(handle_item_error)?; @@ -1246,13 +1202,7 @@ pub async fn handle_post_item_meta( StatusCode::INTERNAL_SERVER_ERROR })??; - let response = ApiResponse { - success: true, - data: Some(()), - error: None, - }; - - Ok(Json(response)) + Ok(Json(ApiResponse::empty())) } #[utoipa::path( @@ -1281,11 +1231,10 @@ pub async fn handle_delete_item( } let db = state.db.clone(); - let data_dir = state.data_dir.clone(); + let item_service = state.item_service.clone(); let deleted_item = task::spawn_blocking(move || { let mut conn = db.blocking_lock(); - let item_service = crate::services::ItemService::new(data_dir); item_service.delete_item(&mut conn, item_id) }) .await @@ -1301,19 +1250,15 @@ pub async fn handle_delete_item( StatusCode::INTERNAL_SERVER_ERROR })?, ts: deleted_item.ts.to_rfc3339(), - size: deleted_item.size, + uncompressed_size: deleted_item.uncompressed_size, + compressed_size: deleted_item.compressed_size, + closed: deleted_item.closed, compression: deleted_item.compression, tags: vec![], metadata: HashMap::new(), }; - let response = ApiResponse { - success: true, - data: Some(item_info), - error: None, - }; - - Ok(Json(response)) + Ok(Json(ApiResponse::ok(item_info))) } #[utoipa::path( @@ -1342,11 +1287,10 @@ pub async fn handle_get_item_info( } let db = state.db.clone(); - let data_dir = state.data_dir.clone(); + let item_service = state.item_service.clone(); let item_with_meta = task::spawn_blocking(move || { let conn = db.blocking_lock(); - let item_service = crate::services::ItemService::new(data_dir); item_service.get_item(&conn, item_id) }) .await @@ -1356,28 +1300,12 @@ pub async fn handle_get_item_info( })? 
.map_err(handle_item_error)?; - let tags: Vec = item_with_meta.tags.iter().map(|t| t.name.clone()).collect(); - let metadata = item_with_meta.meta_as_map(); + let item_info = ItemInfo::try_from(item_with_meta).map_err(|e| { + warn!("Item conversion failed: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })?; - let item_info = ItemInfo { - id: item_with_meta.item.id.ok_or_else(|| { - warn!("Item missing ID"); - StatusCode::INTERNAL_SERVER_ERROR - })?, - ts: item_with_meta.item.ts.to_rfc3339(), - size: item_with_meta.item.size, - compression: item_with_meta.item.compression.clone(), - tags, - metadata, - }; - - let response = ApiResponse { - success: true, - data: Some(item_info), - error: None, - }; - - Ok(Json(response)) + Ok(Json(ApiResponse::ok(item_info))) } #[derive(serde::Deserialize)] @@ -1414,7 +1342,7 @@ pub async fn handle_diff_items( Query(query): Query, ) -> Result>>, StatusCode> { let db = state.db.clone(); - let data_dir = state.data_dir.clone(); + let item_service = state.item_service.clone(); let id_a_param = query.id_a; let id_b_param = query.id_b; let tag_a_param = query.tag_a; @@ -1422,7 +1350,6 @@ pub async fn handle_diff_items( let diff_lines = task::spawn_blocking(move || { let conn = db.blocking_lock(); - let item_service = crate::services::ItemService::new(data_dir); let item_a = if let Some(id_a) = id_a_param { item_service @@ -1458,7 +1385,7 @@ pub async fn handle_diff_items( // Size limit for diff operation (10MB per item) const MAX_DIFF_SIZE: i64 = 10 * 1024 * 1024; - if let Some(size_a) = item_a.item.size + if let Some(size_a) = item_a.item.uncompressed_size && size_a > MAX_DIFF_SIZE { warn!( @@ -1467,7 +1394,7 @@ pub async fn handle_diff_items( ); return Err(StatusCode::PAYLOAD_TOO_LARGE); } - if let Some(size_b) = item_b.item.size + if let Some(size_b) = item_b.item.uncompressed_size && size_b > MAX_DIFF_SIZE { warn!( @@ -1504,13 +1431,7 @@ pub async fn handle_diff_items( StatusCode::INTERNAL_SERVER_ERROR })??; - let response = 
ApiResponse { - success: true, - data: Some(diff_lines), - error: None, - }; - - Ok(Json(response)) + Ok(Json(ApiResponse::ok(diff_lines))) } fn compute_diff(a: &[u8], b: &[u8]) -> Vec { @@ -1593,13 +1514,12 @@ pub async fn handle_update_item( .unwrap_or_default(); let db = state.db.clone(); - let data_dir = state.data_dir.clone(); + let item_service = state.item_service.clone(); let settings = state.settings.clone(); - let size_param = params.size; + let size_param = params.uncompressed_size; let item_info = task::spawn_blocking(move || { let mut conn = db.blocking_lock(); - let item_service = crate::services::ItemService::new(data_dir); // If only size is being set (no plugins/metadata/tags), do a lightweight update #[allow(clippy::collapsible_if)] @@ -1607,25 +1527,16 @@ pub async fn handle_update_item( if plugin_names.is_empty() && metadata.is_empty() && tags.is_empty() { return match crate::db::get_item(&conn, item_id) { Ok(Some(mut item)) => { - item.size = Some(size); + item.uncompressed_size = Some(size); if let Err(e) = crate::db::update_item(&conn, item) { warn!("Failed to update item size: {e}"); return Err(StatusCode::INTERNAL_SERVER_ERROR); } match item_service.get_item(&conn, item_id) { - Ok(iwm) => { - let tags: Vec = - iwm.tags.iter().map(|t| t.name.clone()).collect(); - let metadata = iwm.meta_as_map(); - Ok(ItemInfo { - id: item_id, - ts: iwm.item.ts.to_rfc3339(), - size: iwm.item.size, - compression: iwm.item.compression.clone(), - tags, - metadata, - }) - } + Ok(iwm) => ItemInfo::try_from(iwm).map_err(|e| { + warn!("Item conversion failed: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + }), Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), } } @@ -1645,23 +1556,10 @@ pub async fn handle_update_item( ); match result { - Ok(item_with_meta) => { - let tags: Vec = - item_with_meta.tags.iter().map(|t| t.name.clone()).collect(); - let metadata = item_with_meta.meta_as_map(); - - Ok(ItemInfo { - id: item_with_meta.item.id.ok_or_else(|| { - warn!("Item 
missing ID"); - StatusCode::INTERNAL_SERVER_ERROR - })?, - ts: item_with_meta.item.ts.to_rfc3339(), - size: item_with_meta.item.size, - compression: item_with_meta.item.compression.clone(), - tags, - metadata, - }) - } + Ok(item_with_meta) => ItemInfo::try_from(item_with_meta).map_err(|e| { + warn!("Item conversion failed: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + }), Err(CoreError::ItemNotFound(_)) => Err(StatusCode::NOT_FOUND), Err(e) => { warn!("Failed to update item {item_id}: {e}"); @@ -1675,11 +1573,7 @@ pub async fn handle_update_item( StatusCode::INTERNAL_SERVER_ERROR })??; - Ok(Json(ApiResponse { - success: true, - data: Some(item_info), - error: None, - })) + Ok(Json(ApiResponse::ok(item_info))) } /// Query parameters for the export endpoint. @@ -1719,13 +1613,13 @@ pub async fn handle_export_items( return Err(StatusCode::BAD_REQUEST); } - let data_dir = state.data_dir.clone(); let db = state.db.clone(); + let item_service = state.item_service.clone(); + let data_dir = state.data_dir.clone(); // Resolve items in blocking task let items_with_meta = task::spawn_blocking(move || { let conn = db.blocking_lock(); - let item_service = ItemService::new(data_dir.clone()); if !ids.is_empty() { let mut result = Vec::new(); for &id in &ids { @@ -1765,11 +1659,11 @@ pub async fn handle_export_items( // Stream tar via mpsc channel let (tx, rx) = mpsc::channel::, std::io::Error>>(16); let data_dir2 = state.data_dir.clone(); + let item_service2 = state.item_service.clone(); let db2 = state.db.clone(); tokio::task::spawn_blocking(move || { let conn = db2.blocking_lock(); - let item_service = ItemService::new(data_dir2.clone()); // Create a writer that sends chunks to the channel struct ChannelWriter { @@ -1798,7 +1692,7 @@ pub async fn handle_export_items( &items_with_meta, &data_dir2, None, - &item_service, + &item_service2, &conn, ) { warn!("Export tar write failed: {e}"); @@ -1919,9 +1813,5 @@ pub async fn handle_import_items( "count": imported_ids.len(), }); - 
Ok(Json(ApiResponse { - success: true, - data: Some(response_data), - error: None, - })) + Ok(Json(ApiResponse::ok(response_data))) } diff --git a/src/modes/server/common.rs b/src/modes/server/common.rs index d93ab8d..72d1475 100644 --- a/src/modes/server/common.rs +++ b/src/modes/server/common.rs @@ -1,4 +1,5 @@ use crate::services::item_service::ItemService; +use crate::services::types::ItemWithMeta; /// Common utilities and types for the server module. /// /// This module provides shared structures, functions, and middleware used across @@ -182,6 +183,26 @@ pub struct ApiResponse { pub error: Option, } +impl ApiResponse { + /// Creates a successful API response with the given data. + pub fn ok(data: T) -> Self { + Self { + success: true, + data: Some(data), + error: None, + } + } + + /// Creates a successful API response with no data. + pub fn empty() -> Self { + Self { + success: true, + data: None, + error: None, + } + } +} + /// Response type for list of item information. /// /// Specialized response for endpoints that return multiple items. @@ -364,11 +385,19 @@ pub struct ItemInfo { /// The creation timestamp of the item in ISO 8601 format. #[schema(example = "2023-12-01T15:30:45Z")] pub ts: String, - /// Size in bytes. + /// Uncompressed size in bytes. /// - /// The size of the item's content in bytes, may be None if not set. + /// The uncompressed size of the item's content in bytes, may be None if not set. #[schema(example = 1024)] - pub size: Option, + pub uncompressed_size: Option, + /// Compressed size in bytes. + /// + /// The compressed file size on disk in bytes, may be None if not set. + #[schema(example = 512)] + pub compressed_size: Option, + /// Whether the item has been fully written and closed. + #[schema(example = true)] + pub closed: bool, /// Compression type. /// /// The compression algorithm used for the item's content. 
@@ -386,6 +415,26 @@ pub struct ItemInfo { pub metadata: HashMap, } +impl TryFrom for ItemInfo { + type Error = anyhow::Error; + + fn try_from(item_with_meta: ItemWithMeta) -> Result { + Ok(ItemInfo { + id: item_with_meta + .item + .id + .ok_or_else(|| anyhow::anyhow!("Item missing ID"))?, + ts: item_with_meta.item.ts.to_rfc3339(), + uncompressed_size: item_with_meta.item.uncompressed_size, + compressed_size: item_with_meta.item.compressed_size, + closed: item_with_meta.item.closed, + compression: item_with_meta.item.compression, + tags: item_with_meta.tag_names(), + metadata: item_with_meta.meta_as_map(), + }) + } +} + /// Item information including content and metadata, with binary detection. /// /// This structure provides item details along with its content, handling binary @@ -668,7 +717,7 @@ pub struct UpdateItemQuery { /// Optional comma-separated tags to add. pub tags: Option, /// Optional uncompressed size to set on the item. - pub size: Option, + pub uncompressed_size: Option, } /// Request body for creating a new item. 
diff --git a/src/modes/server/pages.rs b/src/modes/server/pages.rs index 074ee01..ed555a9 100644 --- a/src/modes/server/pages.rs +++ b/src/modes/server/pages.rs @@ -240,7 +240,10 @@ fn build_item_list( format!("{id_value}") } "time" => item.ts.format("%Y-%m-%d %H:%M:%S").to_string(), - "size" => item.size.map(|s| s.to_string()).unwrap_or_default(), + "size" => item + .uncompressed_size + .map(|s| s.to_string()) + .unwrap_or_default(), "tags" => { // Make sure we're using all tags for the item let tag_links: Vec = tags @@ -424,7 +427,7 @@ fn build_item_details(conn: &Connection, id: i64) -> Result { )); html.push_str(&format!( "Size{}", - item.size.unwrap_or(0) + item.uncompressed_size.unwrap_or(0) )); html.push_str(&format!( "Compression{}", diff --git a/src/modes/update.rs b/src/modes/update.rs index 78c61d1..edc1173 100644 --- a/src/modes/update.rs +++ b/src/modes/update.rs @@ -103,11 +103,20 @@ pub fn mode_update( // Backfill size if not set let mut updated_item = item.clone(); - if item.size.is_none() { + if item.uncompressed_size.is_none() { debug!("UPDATE: Size not set, backfilling from content file"); if let Some(size) = compute_item_size(&data_path, &item) { debug!("UPDATE: Computed size: {size}"); - updated_item.size = Some(size); + updated_item.uncompressed_size = Some(size); + db::update_item(conn, updated_item.clone())?; + } + } + + // Backfill compressed_size if not set + if item.compressed_size.is_none() { + let item_path = data_path.join(item_id.to_string()); + if let Ok(meta) = std::fs::metadata(&item_path) { + updated_item.compressed_size = Some(meta.len() as i64); db::update_item(conn, updated_item.clone())?; } } diff --git a/src/services/item_service.rs b/src/services/item_service.rs index ed4d3ae..9124bbc 100644 --- a/src/services/item_service.rs +++ b/src/services/item_service.rs @@ -150,7 +150,7 @@ impl ItemService { } // Check size guard before loading content - if let Some(size) = item_with_meta.item.size + if let Some(size) = 
item_with_meta.item.uncompressed_size && size > MAX_CONTENT_SIZE { return Err(CoreError::InvalidInput(format!( @@ -632,21 +632,22 @@ impl ItemService { // Print the "KEEP: New item" message before starting to read input if !settings.quiet { if std::io::stderr().is_terminal() { - let mut t = term::stderr().unwrap(); - let _ = t.reset(); - let _ = t.attr(term::Attr::Bold); - let _ = write!(t, "KEEP:"); - let _ = t.reset(); - let _ = write!(t, " New item "); - let _ = t.attr(term::Attr::Bold); - let _ = write!(t, "{item_id}"); - let _ = t.reset(); - let _ = write!(t, " tags: "); - let _ = t.attr(term::Attr::Bold); - let _ = write!(t, "{}", tags.join(" ")); - let _ = t.reset(); - let _ = writeln!(t); - let _ = std::io::stderr().flush(); + if let Some(mut t) = term::stderr() { + let _ = t.reset(); + let _ = t.attr(term::Attr::Bold); + let _ = write!(t, "KEEP:"); + let _ = t.reset(); + let _ = write!(t, " New item "); + let _ = t.attr(term::Attr::Bold); + let _ = write!(t, "{item_id}"); + let _ = t.reset(); + let _ = write!(t, " tags: "); + let _ = t.attr(term::Attr::Bold); + let _ = write!(t, "{}", tags.join(" ")); + let _ = t.reset(); + let _ = writeln!(t); + let _ = std::io::stderr().flush(); + } } else { let mut t = std::io::stderr(); let _ = writeln!(t, "KEEP: New item: {item_id} tags: {tags:?}"); @@ -681,6 +682,8 @@ impl ItemService { item_out.flush()?; drop(item_out); + let compressed_size = std::fs::metadata(&item_path)?.len() as i64; + debug!("ITEM_SERVICE: Finalizing meta plugins"); meta_service.finalize_plugins(&mut plugins); @@ -691,7 +694,9 @@ impl ItemService { } } - item.size = Some(total_bytes); + item.uncompressed_size = Some(total_bytes); + item.compressed_size = Some(compressed_size); + item.closed = true; db::update_item(conn, item.clone())?; debug!("ITEM_SERVICE: Save completed successfully"); @@ -792,6 +797,7 @@ impl ItemService { None, None, settings, + true, ) } @@ -812,6 +818,7 @@ impl ItemService { client_compression_type: Option, import_ts: 
Option>, settings: &Settings, + set_size: bool, ) -> Result { let mut cmd = Command::new("keep"); let mut tags = tags; @@ -856,7 +863,7 @@ impl ItemService { let mut item_path = self.data_path.clone(); item_path.push(item_id.to_string()); - let mut item_out = compression_engine.create(item_path)?; + let mut item_out = compression_engine.create(item_path.clone())?; let mut total_bytes = 0i64; @@ -872,6 +879,8 @@ impl ItemService { item_out.flush()?; drop(item_out); + let compressed_size = std::fs::metadata(&item_path)?.len() as i64; + if run_meta { meta_service.finalize_plugins(&mut plugins); } @@ -888,7 +897,9 @@ impl ItemService { } } - item.size = Some(total_bytes); + item.uncompressed_size = if set_size { Some(total_bytes) } else { None }; + item.compressed_size = Some(compressed_size); + item.closed = true; db::update_item(conn, item)?; self.get_item(conn, item_id) diff --git a/src/services/types.rs b/src/services/types.rs index a1f49aa..962e842 100644 --- a/src/services/types.rs +++ b/src/services/types.rs @@ -40,6 +40,15 @@ impl ItemWithMeta { .map(|m| (m.name, m.value)) .collect() } + + /// Returns a list of tag names for this item. + /// + /// # Returns + /// + /// `Vec` - Tag names extracted from the tags list. 
+ pub fn tag_names(&self) -> Vec { + self.tags.iter().map(|t| t.name.clone()).collect() + } } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/src/tests/common/test_helpers.rs b/src/tests/common/test_helpers.rs index 4b30cb1..7a07b7c 100644 --- a/src/tests/common/test_helpers.rs +++ b/src/tests/common/test_helpers.rs @@ -57,7 +57,9 @@ pub fn create_test_item(conn: &Connection) -> i64 { let item = crate::db::Item { id: None, ts: chrono::Utc::now(), - size: Some(100), + uncompressed_size: Some(100), + compressed_size: Some(80), + closed: true, compression: crate::compression_engine::CompressionType::Raw.to_string(), }; db::insert_item(conn, item).expect("Failed to insert item") diff --git a/src/tests/db/meta_tests.rs b/src/tests/db/meta_tests.rs index ac0d7a4..1575720 100644 --- a/src/tests/db/meta_tests.rs +++ b/src/tests/db/meta_tests.rs @@ -27,7 +27,9 @@ mod tests { let item = crate::db::Item { id: Some(999), // Non-existent item ts: chrono::Utc::now(), - size: Some(0), + uncompressed_size: Some(0), + compressed_size: Some(0), + closed: true, compression: crate::compression_engine::CompressionType::Raw.to_string(), }; diff --git a/src/tests/db/tag_tests.rs b/src/tests/db/tag_tests.rs index 5e672a4..0565fcb 100644 --- a/src/tests/db/tag_tests.rs +++ b/src/tests/db/tag_tests.rs @@ -32,7 +32,9 @@ mod tests { let item = crate::db::Item { id: Some(999), // Non-existent item ts: chrono::Utc::now(), - size: Some(0), + uncompressed_size: Some(0), + compressed_size: Some(0), + closed: true, compression: crate::compression_engine::CompressionType::Raw.to_string(), }; diff --git a/src/tests/export_tar_tests.rs b/src/tests/export_tar_tests.rs index 3eac658..c4e4167 100644 --- a/src/tests/export_tar_tests.rs +++ b/src/tests/export_tar_tests.rs @@ -10,7 +10,9 @@ mod export_tar_tests { item: Item { id: Some(id), ts: Utc::now(), - size: Some(100), + uncompressed_size: Some(100), + compressed_size: Some(80), + closed: true, compression: "raw".to_string(), }, tags: tags 
diff --git a/src/tests/import_tar_tests.rs b/src/tests/import_tar_tests.rs index e15a1e0..6cb051c 100644 --- a/src/tests/import_tar_tests.rs +++ b/src/tests/import_tar_tests.rs @@ -38,7 +38,9 @@ mod import_tar_tests { // Set size let mut updated = item; - updated.size = Some(content.len() as i64); + updated.uncompressed_size = Some(content.len() as i64); + updated.compressed_size = Some(content.len() as i64); + updated.closed = true; db::update_item(conn, updated).unwrap(); // Set tags @@ -48,7 +50,9 @@ mod import_tar_tests { crate::db::Item { id: Some(item_id), ts: Utc::now(), - size: Some(content.len() as i64), + uncompressed_size: Some(content.len() as i64), + compressed_size: Some(content.len() as i64), + closed: true, compression: compression.to_string(), }, &tag_names, @@ -185,7 +189,9 @@ mod import_tar_tests { item: Item { id: Some(1), ts: Utc::now(), - size: None, + uncompressed_size: None, + compressed_size: None, + closed: false, compression: "raw".to_string(), }, tags: tags