refactor: rename size to uncompressed_size, add compressed_size and closed columns

Schema changes:
- Rename items.size to items.uncompressed_size for clarity
- Add compressed_size (INTEGER NULL) - tracks compressed file size on disk
- Add closed (BOOLEAN NOT NULL DEFAULT 1) - tracks whether item is fully written
- Existing items default to closed=true via migration

Lifecycle:
- Items created with closed=false, set to true on successful save/import
- Compressed size captured via fs::metadata() after compression writer closes
- Truncated uploads (413) get compressed_size set, closed=true, uncompressed_size=None
- Update command now backfills both uncompressed_size and compressed_size

Also includes bug fixes and dedup from prior review:
- Fix stream_raw_content_response using uncompressed_size for raw byte Content-Length
- ApiResponse::ok()/empty() constructors, TryFrom<ItemWithMeta> for ItemInfo
- tag_names() method on ItemWithMeta, meta_filter() on Settings
- Fix .unwrap() panics in compression engine Read/Write impls
- Fix TOCTOU race in stream_raw_content_response (now uses compressed_size)
- Fix swallowed write errors in meta plugins (digest, magic_file, exec)
- Fix term::stderr().unwrap() panic in item_service
- Deduplicate ItemService::new() calls across 20 API handlers
- ImportMeta supports #[serde(alias = "size")] for backward compat

All 75 tests, 67 doc tests pass. Clippy clean.
This commit is contained in:
2026-03-18 10:58:26 -03:00
parent 49793a0f94
commit 00be72f3d0
28 changed files with 377 additions and 308 deletions

View File

@@ -9,7 +9,9 @@ use std::io::Read;
pub struct ItemInfo {
pub id: i64,
pub ts: String,
pub size: Option<i64>,
pub uncompressed_size: Option<i64>,
pub compressed_size: Option<i64>,
pub closed: bool,
pub compression: String,
pub tags: Vec<String>,
pub metadata: HashMap<String, String>,
@@ -354,7 +356,7 @@ impl KeepClient {
/// Set the uncompressed size for an item.
pub fn set_item_size(&self, id: i64, size: u64) -> Result<(), CoreError> {
let url = format!(
"{}?size={}",
"{}?uncompressed_size={}",
self.url(&format!("/api/item/{id}/update")),
size
);

View File

@@ -93,10 +93,22 @@ impl<W: Write> Drop for AutoFinishGzEncoder<W> {
#[cfg(feature = "gzip")]
impl<W: Write> Write for AutoFinishGzEncoder<W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.encoder.as_mut().unwrap().write(buf)
match self.encoder.as_mut() {
Some(encoder) => encoder.write(buf),
None => Err(io::Error::new(
io::ErrorKind::BrokenPipe,
"encoder already finished",
)),
}
}
fn flush(&mut self) -> io::Result<()> {
self.encoder.as_mut().unwrap().flush()
match self.encoder.as_mut() {
Some(encoder) => encoder.flush(),
None => Err(io::Error::new(
io::ErrorKind::BrokenPipe,
"encoder already finished",
)),
}
}
}

View File

@@ -15,7 +15,13 @@ pub struct ProgramReader {
impl Read for ProgramReader {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.stdout.as_mut().unwrap().read(buf)
match self.stdout.as_mut() {
Some(stdout) => stdout.read(buf),
None => Err(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"stdout already taken",
)),
}
}
}
@@ -33,11 +39,23 @@ pub struct ProgramWriter {
impl Write for ProgramWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.stdin.as_mut().unwrap().write(buf)
match self.stdin.as_mut() {
Some(stdin) => stdin.write(buf),
None => Err(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"stdin already taken",
)),
}
}
fn flush(&mut self) -> std::io::Result<()> {
self.stdin.as_mut().unwrap().flush()
match self.stdin.as_mut() {
Some(stdin) => stdin.flush(),
None => Err(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"stdin already taken",
)),
}
}
}

View File

@@ -694,6 +694,14 @@ impl Settings {
.unwrap_or_default()
}
/// Returns the metadata filter as a HashMap.
///
/// Converts the `meta` field (list of key-value pairs from CLI --meta flags)
/// into a `HashMap<String, Option<String>>` suitable for filtering.
pub fn meta_filter(&self) -> std::collections::HashMap<String, Option<String>> {
self.meta.iter().cloned().collect()
}
/// Validates the configuration against plugin schemas.
///
/// Checks that:

103
src/db.rs
View File

@@ -19,7 +19,7 @@ and query utilities for efficient data access.
# Schema
The database uses three main tables:
- `items`: Core item information (ID, timestamp, size, compression).
- `items`: Core item information (ID, timestamp, uncompressed_size, compressed_size, closed, compression).
- `tags`: Item-tag associations (many-to-many).
- `metas`: Item-metadata associations (many-to-many).
@@ -42,7 +42,7 @@ let conn = db::open(PathBuf::from("keep.db"))?;
```
Insert an item:
```ignore
let item = db::Item { id: None, ts: Utc::now(), size: None, compression: "lz4".to_string() };
let item = db::Item { id: None, ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() };
let id = db::insert_item(&conn, item)?;
```
*/
@@ -78,6 +78,9 @@ lazy_static! {
M::up("CREATE INDEX idx_tags_name ON tags(name)"),
M::up("CREATE INDEX idx_metas_name ON metas(name)"),
M::up("UPDATE items SET compression = 'raw' WHERE compression = 'none'"),
M::up("ALTER TABLE items RENAME COLUMN size TO uncompressed_size"),
M::up("ALTER TABLE items ADD COLUMN compressed_size INTEGER NULL"),
M::up("ALTER TABLE items ADD COLUMN closed BOOLEAN NOT NULL DEFAULT 1"),
]);
}
@@ -89,7 +92,9 @@ lazy_static! {
///
/// * `id` - Unique identifier, `None` for new items.
/// * `ts` - Creation timestamp in UTC.
/// * `size` - Content size in bytes, `None` if not set.
/// * `uncompressed_size` - Uncompressed content size in bytes, `None` if not set.
/// * `compressed_size` - Compressed file size on disk, `None` if not set.
/// * `closed` - Whether the item has been fully written and closed.
/// * `compression` - Compression algorithm used.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Item {
@@ -97,8 +102,12 @@ pub struct Item {
pub id: Option<i64>,
/// Timestamp when the item was created.
pub ts: DateTime<Utc>,
/// Size of the item content in bytes, None if not set.
pub size: Option<i64>,
/// Uncompressed size of the item content in bytes, None if not set.
pub uncompressed_size: Option<i64>,
/// Compressed file size on disk in bytes, None if not set.
pub compressed_size: Option<i64>,
/// Whether the item has been fully written and closed.
pub closed: bool,
/// Compression algorithm used for the item content.
pub compression: String,
}
@@ -224,7 +233,9 @@ pub fn open(path: PathBuf) -> Result<Connection, Error> {
/// let item = Item {
/// id: None,
/// ts: Utc::now(),
/// size: None,
/// uncompressed_size: None,
/// compressed_size: None,
/// closed: false,
/// compression: "lz4".to_string(),
/// };
/// let id = db::insert_item(&conn, item)?;
@@ -235,8 +246,8 @@ pub fn open(path: PathBuf) -> Result<Connection, Error> {
pub fn insert_item(conn: &Connection, item: Item) -> Result<i64> {
debug!("DB: Inserting item: {item:?}");
conn.execute(
"INSERT INTO items (ts, size, compression) VALUES (?1, ?2, ?3)",
params![item.ts, item.size, item.compression],
"INSERT INTO items (ts, uncompressed_size, compressed_size, closed, compression) VALUES (?1, ?2, ?3, ?4, ?5)",
params![item.ts, item.uncompressed_size, item.compressed_size, item.closed, item.compression],
)?;
Ok(conn.last_insert_rowid())
}
@@ -283,7 +294,9 @@ pub fn create_item(
let item = Item {
id: None,
ts: chrono::Utc::now(),
size: None,
uncompressed_size: None,
compressed_size: None,
closed: false,
compression: compression_type.to_string(),
};
let item_id = insert_item(conn, item.clone())?;
@@ -312,7 +325,9 @@ pub fn insert_item_with_ts(
let item = Item {
id: None,
ts,
size: None,
uncompressed_size: None,
compressed_size: None,
closed: false,
compression: compression.to_string(),
};
let item_id = insert_item(conn, item.clone())?;
@@ -353,7 +368,7 @@ pub fn insert_item_with_ts(
/// let _tmp = tempfile::tempdir()?;
/// let db_path = _tmp.path().join("keep.db");
/// let conn = db::open(db_path)?;
/// let item = Item { id: None, ts: Utc::now(), size: None, compression: "lz4".to_string() };
/// let item = Item { id: None, ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() };
/// let item_id = db::insert_item(&conn, item)?;
/// db::add_tag(&conn, item_id, "important")?;
/// # Ok(())
@@ -411,7 +426,7 @@ pub fn upsert_tag(conn: &Connection, item_id: i64, tag_name: &str) -> Result<()>
/// let _tmp = tempfile::tempdir()?;
/// let db_path = _tmp.path().join("keep.db");
/// let conn = db::open(db_path)?;
/// let item = Item { id: None, ts: Utc::now(), size: None, compression: "lz4".to_string() };
/// let item = Item { id: None, ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() };
/// let item_id = db::insert_item(&conn, item)?;
/// db::add_meta(&conn, item_id, "mime_type", "text/plain")?;
/// # Ok(())
@@ -456,7 +471,7 @@ pub fn add_meta(conn: &Connection, item_id: i64, name: &str, value: &str) -> Res
/// let _tmp = tempfile::tempdir()?;
/// let db_path = _tmp.path().join("keep.db");
/// let conn = db::open(db_path)?;
/// let item = Item { id: Some(1), size: Some(1024), compression: "lz4".to_string(), ts: Utc::now() };
/// let item = Item { id: Some(1), ts: Utc::now(), uncompressed_size: Some(1024), compressed_size: Some(512), closed: true, compression: "lz4".to_string() };
/// db::update_item(&conn, item)?;
/// # Ok(())
/// # }
@@ -464,8 +479,8 @@ pub fn add_meta(conn: &Connection, item_id: i64, name: &str, value: &str) -> Res
pub fn update_item(conn: &Connection, item: Item) -> Result<()> {
debug!("DB: Updating item: {item:?}");
conn.execute(
"UPDATE items SET size=?2, compression=?3 WHERE id=?1",
params![item.id, item.size, item.compression,],
"UPDATE items SET uncompressed_size=?2, compressed_size=?3, closed=?4, compression=?5 WHERE id=?1",
params![item.id, item.uncompressed_size, item.compressed_size, item.closed, item.compression,],
)?;
Ok(())
}
@@ -500,7 +515,7 @@ pub fn update_item(conn: &Connection, item: Item) -> Result<()> {
/// let _tmp = tempfile::tempdir()?;
/// let db_path = _tmp.path().join("keep.db");
/// let conn = db::open(db_path)?;
/// let item = Item { id: Some(1), ts: Utc::now(), size: None, compression: "lz4".to_string() };
/// let item = Item { id: Some(1), ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() };
/// db::delete_item(&conn, item)?;
/// # Ok(())
/// # }
@@ -584,7 +599,7 @@ pub fn query_delete_meta(conn: &Connection, meta: Meta) -> Result<()> {
/// let _tmp = tempfile::tempdir()?;
/// let db_path = _tmp.path().join("keep.db");
/// let conn = db::open(db_path)?;
/// let item = Item { id: None, ts: Utc::now(), size: None, compression: "lz4".to_string() };
/// let item = Item { id: None, ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() };
/// let item_id = db::insert_item(&conn, item)?;
/// let meta = Meta { id: item_id, name: "mime_type".to_string(), value: "text/plain".to_string() };
/// db::query_upsert_meta(&conn, meta)?;
@@ -630,7 +645,7 @@ pub fn query_upsert_meta(conn: &Connection, meta: Meta) -> Result<()> {
/// let _tmp = tempfile::tempdir()?;
/// let db_path = _tmp.path().join("keep.db");
/// let conn = db::open(db_path)?;
/// let item = Item { id: None, ts: Utc::now(), size: None, compression: "lz4".to_string() };
/// let item = Item { id: None, ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() };
/// let item_id = db::insert_item(&conn, item)?;
/// // Insert new metadata
/// let meta = Meta { id: item_id, name: "source".to_string(), value: "cli".to_string() };
@@ -681,7 +696,7 @@ pub fn store_meta(conn: &Connection, meta: Meta) -> Result<()> {
/// let _tmp = tempfile::tempdir()?;
/// let db_path = _tmp.path().join("keep.db");
/// let conn = db::open(db_path)?;
/// let item = Item { id: None, ts: Utc::now(), size: None, compression: "lz4".to_string() };
/// let item = Item { id: None, ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() };
/// let item_id = db::insert_item(&conn, item)?;
/// let tag = Tag { id: item_id, name: "work".to_string() };
/// db::insert_tag(&conn, tag)?;
@@ -726,7 +741,7 @@ pub fn insert_tag(conn: &Connection, tag: Tag) -> Result<()> {
/// let _tmp = tempfile::tempdir()?;
/// let db_path = _tmp.path().join("keep.db");
/// let conn = db::open(db_path)?;
/// let item = Item { id: Some(1), ts: Utc::now(), size: None, compression: "lz4".to_string() };
/// let item = Item { id: Some(1), ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() };
/// db::delete_item_tags(&conn, item)?;
/// # Ok(())
/// # }
@@ -768,9 +783,9 @@ pub fn delete_item_tags(conn: &Connection, item: Item) -> Result<()> {
/// let _tmp = tempfile::tempdir()?;
/// let db_path = _tmp.path().join("keep.db");
/// let conn = db::open(db_path)?;
/// let item = Item { id: None, ts: Utc::now(), size: None, compression: "lz4".to_string() };
/// let item = Item { id: None, ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() };
/// let item_id = db::insert_item(&conn, item)?;
/// let item = Item { id: Some(item_id), ts: Utc::now(), size: None, compression: "lz4".to_string() };
/// let item = Item { id: Some(item_id), ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() };
/// let tags = vec!["project_a".to_string(), "urgent".to_string()];
/// db::set_item_tags(&conn, item, &tags)?;
/// # Ok(())
@@ -831,7 +846,7 @@ pub fn set_item_tags(conn: &Connection, item: Item, tags: &Vec<String>) -> Resul
pub fn query_all_items(conn: &Connection) -> Result<Vec<Item>> {
debug!("DB: Querying all items");
let mut statement = conn
.prepare("SELECT id, ts, size, compression FROM items ORDER BY id ASC")
.prepare("SELECT id, ts, uncompressed_size, compressed_size, closed, compression FROM items ORDER BY id ASC")
.context("Problem preparing SQL statement")?;
let mut rows = statement.query(params![])?;
let mut items = Vec::new();
@@ -840,8 +855,10 @@ pub fn query_all_items(conn: &Connection) -> Result<Vec<Item>> {
let item = Item {
id: row.get(0)?,
ts: row.get(1)?,
size: row.get(2)?,
compression: row.get(3)?,
uncompressed_size: row.get(2)?,
compressed_size: row.get(3)?,
closed: row.get(4)?,
compression: row.get(5)?,
};
items.push(item);
}
@@ -889,7 +906,9 @@ pub fn query_tagged_items<'a>(conn: &'a Connection, tags: &'a Vec<String>) -> Re
"
SELECT items.id,
items.ts,
items.size,
items.uncompressed_size,
items.compressed_size,
items.closed,
items.compression,
count(tags_match.id) as tags_score
FROM items,
@@ -915,8 +934,10 @@ pub fn query_tagged_items<'a>(conn: &'a Connection, tags: &'a Vec<String>) -> Re
let item = Item {
id: row.get(0)?,
ts: row.get(1)?,
size: row.get(2)?,
compression: row.get(3)?,
uncompressed_size: row.get(2)?,
compressed_size: row.get(3)?,
closed: row.get(4)?,
compression: row.get(5)?,
};
items.push(item);
}
@@ -1107,7 +1128,7 @@ pub fn get_item_matching(
/// let _tmp = tempfile::tempdir()?;
/// let db_path = _tmp.path().join("keep.db");
/// let conn = db::open(db_path)?;
/// let item = Item { id: None, ts: Utc::now(), size: None, compression: "lz4".to_string() };
/// let item = Item { id: None, ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() };
/// let item_id = db::insert_item(&conn, item)?;
/// let item = db::get_item(&conn, item_id)?;
/// assert!(item.is_some());
@@ -1119,7 +1140,7 @@ pub fn get_item(conn: &Connection, item_id: i64) -> Result<Option<Item>> {
let mut statement = conn
.prepare_cached(
"
SELECT id, ts, size, compression
SELECT id, ts, uncompressed_size, compressed_size, closed, compression
FROM items
WHERE items.id = ?1",
)
@@ -1131,8 +1152,10 @@ pub fn get_item(conn: &Connection, item_id: i64) -> Result<Option<Item>> {
Some(row) => Ok(Some(Item {
id: row.get(0)?,
ts: row.get(1)?,
size: row.get(2)?,
compression: row.get(3)?,
uncompressed_size: row.get(2)?,
compressed_size: row.get(3)?,
closed: row.get(4)?,
compression: row.get(5)?,
})),
None => Ok(None),
}
@@ -1174,7 +1197,7 @@ pub fn get_item_last(conn: &Connection) -> Result<Option<Item>> {
let mut statement = conn
.prepare_cached(
"
SELECT id, ts, size, compression
SELECT id, ts, uncompressed_size, compressed_size, closed, compression
FROM items
ORDER BY id DESC
LIMIT 1",
@@ -1187,8 +1210,10 @@ pub fn get_item_last(conn: &Connection) -> Result<Option<Item>> {
Some(row) => Ok(Some(Item {
id: row.get(0)?,
ts: row.get(1)?,
size: row.get(2)?,
compression: row.get(3)?,
uncompressed_size: row.get(2)?,
compressed_size: row.get(3)?,
closed: row.get(4)?,
compression: row.get(5)?,
})),
None => Ok(None),
}
@@ -1223,7 +1248,7 @@ pub fn get_item_last(conn: &Connection) -> Result<Option<Item>> {
/// let _tmp = tempfile::tempdir()?;
/// let db_path = _tmp.path().join("keep.db");
/// let conn = db::open(db_path)?;
/// let item = Item { id: Some(1), ts: Utc::now(), size: None, compression: "lz4".to_string() };
/// let item = Item { id: Some(1), ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() };
/// let tags = db::get_item_tags(&conn, &item)?;
/// # Ok(())
/// # }
@@ -1276,7 +1301,7 @@ pub fn get_item_tags(conn: &Connection, item: &Item) -> Result<Vec<Tag>> {
/// let _tmp = tempfile::tempdir()?;
/// let db_path = _tmp.path().join("keep.db");
/// let conn = db::open(db_path)?;
/// let item = Item { id: Some(1), ts: Utc::now(), size: None, compression: "lz4".to_string() };
/// let item = Item { id: Some(1), ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() };
/// let meta = db::get_item_meta(&conn, &item)?;
/// # Ok(())
/// # }
@@ -1331,7 +1356,7 @@ pub fn get_item_meta(conn: &Connection, item: &Item) -> Result<Vec<Meta>> {
/// let _tmp = tempfile::tempdir()?;
/// let db_path = _tmp.path().join("keep.db");
/// let conn = db::open(db_path)?;
/// let item = Item { id: Some(1), ts: Utc::now(), size: None, compression: "lz4".to_string() };
/// let item = Item { id: Some(1), ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() };
/// let meta = db::get_item_meta_name(&conn, &item, "mime_type".to_string())?;
/// # Ok(())
/// # }
@@ -1383,7 +1408,7 @@ pub fn get_item_meta_name(conn: &Connection, item: &Item, name: String) -> Resul
/// let _tmp = tempfile::tempdir()?;
/// let db_path = _tmp.path().join("keep.db");
/// let conn = db::open(db_path)?;
/// let item = Item { id: Some(1), ts: Utc::now(), size: None, compression: "lz4".to_string() };
/// let item = Item { id: Some(1), ts: Utc::now(), uncompressed_size: None, compressed_size: None, closed: false, compression: "lz4".to_string() };
/// let value = db::get_item_meta_value(&conn, &item, "source".to_string())?;
/// # Ok(())
/// # }

View File

@@ -88,7 +88,7 @@ pub fn write_export_tar<W: Write>(
let export_meta = ExportMeta {
ts: item_with_meta.item.ts,
compression: compression.clone(),
size: item_with_meta.item.size,
uncompressed_size: item_with_meta.item.uncompressed_size,
tags: item_tags,
metadata: meta_map,
};

View File

@@ -209,10 +209,12 @@ pub fn import_from_tar(
)?;
}
// Update item size
let size_to_record = import_meta.size.unwrap_or(total);
// Update item sizes
let size_to_record = import_meta.uncompressed_size.unwrap_or(total);
let mut updated_item = item;
updated_item.size = Some(size_to_record);
updated_item.uncompressed_size = Some(size_to_record);
updated_item.compressed_size = Some(std::fs::metadata(&storage_path)?.len() as i64);
updated_item.closed = true;
db::update_item(conn, updated_item)?;
log::info!("KEEP: Imported item {new_id} (was {orig_id}) tags: {tags:?}");

View File

@@ -32,7 +32,7 @@ impl Hasher {
match self {
Hasher::Sha256(hasher) => hasher.update(data),
Hasher::Md5(hasher) => {
let _ = hasher.write(data);
hasher.consume(data);
}
Hasher::Sha512(hasher) => hasher.update(data),
}

View File

@@ -131,7 +131,19 @@ impl MetaPluginExec {
match cmd.spawn() {
Ok(mut child) => {
let stdin = child.stdin.take().unwrap();
let stdin = match child.stdin.take() {
Some(s) => s,
None => {
error!(
"META: Exec plugin: failed to capture stdin for '{}'",
self.program
);
return MetaPluginResponse {
metadata: Vec::new(),
is_finalized: true,
};
}
};
self.writer = Some(Box::new(stdin));
self.process = Some(child);
debug!("META: Exec plugin: started process for '{}'", self.program);

View File

@@ -267,7 +267,10 @@ impl FallbackMagicFileMetaPlugin {
.spawn()
.and_then(|mut child| {
if let Some(mut stdin) = child.stdin.take() {
let _ = stdin.write_all(&self.buffer);
if stdin.write_all(&self.buffer).is_err() {
// Ignore write error; child will see EOF and likely fail
// the file detection, returning no output.
}
}
child.wait_with_output()
});

View File

@@ -135,7 +135,7 @@ fn import_legacy(
debug!("CLIENT_IMPORT: Created item {} via server", item_id);
// Set uncompressed size if known from metadata
if let Some(size) = import_meta.size {
if let Some(size) = import_meta.uncompressed_size {
client.set_item_size(item_id, size as u64)?;
debug!("CLIENT_IMPORT: Set size to {}", size);
}

View File

@@ -31,7 +31,7 @@ pub fn mode(
timestamp: item.ts.clone(),
path: String::new(),
stream_size: item
.size
.uncompressed_size
.map(|s| format_size(s as u64, settings.human_readable))
.unwrap_or_else(|| "N/A".to_string()),
compression: item.compression.clone(),

View File

@@ -46,7 +46,7 @@ pub fn mode(
Some(ColumnType::Id) => item.id.to_string(),
Some(ColumnType::Time) => item.ts.clone(),
Some(ColumnType::Size) => item
.size
.uncompressed_size
.map(|s| format_size(s as u64, settings.human_readable))
.unwrap_or_default(),
Some(ColumnType::Compression) => item.compression.clone(),

View File

@@ -643,7 +643,7 @@ pub fn sanitize_tags(tags: &[String]) -> String {
pub struct ExportMeta {
pub ts: DateTime<Utc>,
pub compression: String,
pub size: Option<i64>,
pub uncompressed_size: Option<i64>,
pub tags: Vec<String>,
pub metadata: HashMap<String, String>,
}
@@ -653,8 +653,8 @@ pub struct ExportMeta {
pub struct ImportMeta {
pub ts: DateTime<Utc>,
pub compression: String,
#[serde(default)]
pub size: Option<i64>,
#[serde(default, alias = "size")]
pub uncompressed_size: Option<i64>,
#[serde(default)]
pub tags: Vec<String>,
#[serde(default)]

View File

@@ -173,10 +173,12 @@ fn import_legacy(
);
}
// Update item size (use imported size if available, otherwise data length)
let size_to_record = import_meta.size.unwrap_or(data_size);
// Update item sizes (use imported size if available, otherwise data length)
let size_to_record = import_meta.uncompressed_size.unwrap_or(data_size);
let mut updated_item = item;
updated_item.size = Some(size_to_record);
updated_item.uncompressed_size = Some(size_to_record);
updated_item.compressed_size = Some(std::fs::metadata(&item_path)?.len() as i64);
updated_item.closed = true;
db::update_item(conn, updated_item)?;
if !settings.quiet {

View File

@@ -150,7 +150,7 @@ fn show_item(
let mut item_path_buf = data_path.clone();
item_path_buf.push(item_id.to_string());
let size_str = match item.size {
let size_str = match item.uncompressed_size {
Some(size) => format_size(size as u64, settings.human_readable),
None => "Missing".to_string(),
};
@@ -230,7 +230,7 @@ fn show_item_structured(
None => "Missing".to_string(),
};
let stream_size_formatted = match item.size {
let stream_size_formatted = match item.uncompressed_size {
Some(size) => format_size(size as u64, settings.human_readable),
None => "Missing".to_string(),
};
@@ -243,7 +243,7 @@ fn show_item_structured(
.format("%F %T %Z")
.to_string(),
path: item_path_buf.to_str().unwrap_or("").to_string(),
stream_size: item.size.map(|s| s as u64),
stream_size: item.uncompressed_size.map(|s| s as u64),
stream_size_formatted,
compression: item.compression,
file_size,

View File

@@ -152,7 +152,7 @@ pub fn mode_list(
.with_timezone(&chrono::Local)
.format("%F %T")
.to_string(),
ColumnType::Size => match item.size {
ColumnType::Size => match item.uncompressed_size {
Some(size) => format_size(size as u64, settings.human_readable),
None => match item_path.metadata() {
Ok(_) => "Unknown".to_string(),
@@ -218,7 +218,7 @@ pub fn mode_list(
// Apply styling for specific cases
match column_type {
ColumnType::Size => {
if item.size.is_none() {
if item.uncompressed_size.is_none() {
if item_path.metadata().is_ok() {
cell = cell
.fg(comfy_table::Color::Yellow)
@@ -282,7 +282,7 @@ fn show_list_structured(
None => "Missing".to_string(),
};
let size_formatted = match item.size {
let size_formatted = match item.uncompressed_size {
Some(size) => crate::modes::common::format_size(size as u64, settings.human_readable),
None => "Unknown".to_string(),
};
@@ -294,7 +294,7 @@ fn show_list_structured(
.with_timezone(&chrono::Local)
.format("%F %T")
.to_string(),
size: item.size.map(|s| s as u64),
size: item.uncompressed_size.map(|s| s as u64),
size_formatted,
compression: item.compression,
file_size,

View File

@@ -142,12 +142,11 @@ pub async fn handle_list_items(
HashMap::new()
};
let data_dir = state.data_dir.clone();
let item_service = state.item_service.clone();
let db = state.db.clone();
let mut items_with_meta = task::spawn_blocking(move || {
let conn = db.blocking_lock();
let item_service = ItemService::new(data_dir);
item_service.get_items(&conn, &ids, &tags, &meta_filter)
})
.await
@@ -178,44 +177,26 @@ pub async fn handle_list_items(
let item_infos: Vec<ItemInfo> = items_with_meta
.into_iter()
.map(|item_with_meta| {
let item_id = item_with_meta.item.id.unwrap_or(0);
let item_tags: Vec<String> =
item_with_meta.tags.iter().map(|t| t.name.clone()).collect();
let item_meta = item_with_meta.meta_as_map();
ItemInfo {
id: item_id,
ts: item_with_meta.item.ts.to_rfc3339(),
size: item_with_meta.item.size,
compression: item_with_meta.item.compression,
tags: item_tags,
metadata: item_meta,
}
})
.filter_map(|iwm| ItemInfo::try_from(iwm).ok())
.collect();
ResponseBuilder::json(ApiResponse {
success: true,
data: Some(item_infos),
error: None,
})
ResponseBuilder::json(ApiResponse::ok(item_infos))
}
/// Handle as_meta=true response by returning JSON with metadata and content
async fn handle_as_meta_response(
db: &Arc<tokio::sync::Mutex<rusqlite::Connection>>,
data_dir: &std::path::Path,
item_service: &Arc<ItemService>,
item_id: i64,
offset: u64,
length: u64,
) -> Result<Response, StatusCode> {
let db1 = db.clone();
let data_dir1 = data_dir.to_path_buf();
let item_service1 = item_service.clone();
let item_with_meta = task::spawn_blocking(move || {
let conn = db1.blocking_lock();
let item_service = ItemService::new(data_dir1);
item_service.get_item(&conn, item_id)
item_service1.get_item(&conn, item_id)
})
.await
.map_err(|e| {
@@ -228,7 +209,16 @@ async fn handle_as_meta_response(
})?;
let metadata = item_with_meta.meta_as_map();
handle_as_meta_response_with_metadata(db, data_dir, item_id, &metadata, offset, length).await
handle_as_meta_response_with_metadata(
db,
data_dir,
item_service,
item_id,
&metadata,
offset,
length,
)
.await
}
/// Handle as_meta=true response with pre-fetched metadata using streaming.
@@ -238,6 +228,7 @@ async fn handle_as_meta_response(
async fn handle_as_meta_response_with_metadata(
db: &Arc<tokio::sync::Mutex<rusqlite::Connection>>,
data_dir: &std::path::Path,
item_service: &Arc<ItemService>,
item_id: i64,
metadata: &HashMap<String, String>,
offset: u64,
@@ -245,11 +236,10 @@ async fn handle_as_meta_response_with_metadata(
) -> Result<Response, StatusCode> {
// Binary detection: read a sample in a blocking task, check, and return early
let db1 = db.clone();
let data_dir1 = data_dir.to_path_buf();
let item_service1 = item_service.clone();
let is_binary = task::spawn_blocking(move || {
let conn = db1.blocking_lock();
let item_service = ItemService::new(data_dir1);
let (mut reader, _) = item_service.get_item_content_streaming(&conn, item_id)?;
let (mut reader, _) = item_service1.get_item_content_streaming(&conn, item_id)?;
let mut sample = vec![0u8; crate::common::PIPESIZE];
let n = reader.read(&mut sample)?;
sample.truncate(n);
@@ -280,13 +270,12 @@ async fn handle_as_meta_response_with_metadata(
// Stream content with offset/length applied at the stream level
let db2 = db.clone();
let data_dir2 = data_dir.to_path_buf();
let item_service2 = item_service.clone();
let content_str = task::spawn_blocking(move || -> Result<String, CoreError> {
let conn = db2.blocking_lock();
let item_service = ItemService::new(data_dir2);
let (mut reader, item_with_meta) =
item_service.get_item_content_streaming(&conn, item_id)?;
let content_len = item_with_meta.item.size.unwrap_or(0) as u64;
item_service2.get_item_content_streaming(&conn, item_id)?;
let content_len = item_with_meta.item.uncompressed_size.unwrap_or(0) as u64;
let result = crate::common::read_with_bounds(&mut reader, offset, length, content_len)
.map_err(CoreError::Io)?;
@@ -366,7 +355,7 @@ pub async fn handle_post_item(
body: Body,
) -> Result<Json<ApiResponse<ItemInfo>>, StatusCode> {
let db = state.db.clone();
let data_dir = state.data_dir.clone();
let item_service = state.item_service.clone();
let settings = state.settings.clone();
// Parse tags from query parameter
@@ -474,10 +463,11 @@ pub async fn handle_post_item(
});
// Blocking task: consume streaming reader, save via save_item_raw_streaming
let truncated_flag = body_truncated.clone();
let item_with_meta = task::spawn_blocking(move || {
let mut conn = db.blocking_lock();
let item_service = crate::services::ItemService::new(data_dir);
let mut reader = MpscReader::new(rx);
let set_size = !truncated_flag.load(Ordering::Relaxed);
item_service.save_item_raw_streaming(
&mut conn,
&mut reader,
@@ -488,6 +478,7 @@ pub async fn handle_post_item(
client_compression_type,
import_ts,
&settings,
set_size,
)
})
.await
@@ -527,29 +518,12 @@ pub async fn handle_post_item(
return Err(StatusCode::PAYLOAD_TOO_LARGE);
}
let compression = item_with_meta.item.compression.clone();
let tags = item_with_meta.tags.iter().map(|t| t.name.clone()).collect();
let metadata = item_with_meta.meta_as_map();
let item_info = ItemInfo::try_from(item_with_meta).map_err(|e| {
warn!("Item conversion failed: {e}");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let item_info = ItemInfo {
id: item_with_meta.item.id.ok_or_else(|| {
warn!("Item missing ID");
StatusCode::INTERNAL_SERVER_ERROR
})?,
ts: item_with_meta.item.ts.to_rfc3339(),
size: item_with_meta.item.size,
compression,
tags,
metadata,
};
let response = ApiResponse {
success: true,
data: Some(item_info),
error: None,
};
Ok(Json(response))
Ok(Json(ApiResponse::ok(item_info)))
}
#[utoipa::path(
@@ -590,13 +564,12 @@ pub async fn handle_get_item_latest_content(
.unwrap_or_default();
let db = state.db.clone();
let data_dir = state.data_dir.clone();
let item_service = state.item_service.clone();
// First find the item to get its ID and metadata
let find_tags = tags;
let item_with_meta = task::spawn_blocking(move || {
let conn = db.blocking_lock();
let item_service = ItemService::new(data_dir);
item_service.find_item(&conn, &[], &find_tags, &HashMap::new())
})
.await
@@ -617,6 +590,7 @@ pub async fn handle_get_item_latest_content(
handle_as_meta_response_with_metadata(
&state.db,
&state.data_dir,
&state.item_service,
item_id,
&metadata,
params.offset,
@@ -627,6 +601,7 @@ pub async fn handle_get_item_latest_content(
stream_item_content_response_with_metadata(
&state.db,
&state.data_dir,
&state.item_service,
item_id,
&metadata,
params.allow_binary,
@@ -694,6 +669,7 @@ pub async fn handle_get_item_content(
let result = handle_as_meta_response(
&state.db,
&state.data_dir,
&state.item_service,
item_id,
params.offset,
params.length,
@@ -710,6 +686,7 @@ pub async fn handle_get_item_content(
let result = stream_item_content_response(
&state.db,
&state.data_dir,
&state.item_service,
item_id,
params.allow_binary,
params.offset,
@@ -733,6 +710,7 @@ pub async fn handle_get_item_content(
async fn stream_item_content_response(
db: &Arc<tokio::sync::Mutex<rusqlite::Connection>>,
data_dir: &std::path::Path,
item_service: &Arc<ItemService>,
item_id: i64,
allow_binary: bool,
offset: u64,
@@ -744,11 +722,10 @@ async fn stream_item_content_response(
debug!("STREAM_ITEM_CONTENT_RESPONSE: stream={stream}, decompress={decompress}");
// Get the item with metadata once
let db_clone = db.clone();
let data_dir_clone = data_dir.to_path_buf();
let item_service_clone = item_service.clone();
let item_with_meta = task::spawn_blocking(move || {
let conn = db_clone.blocking_lock();
let item_service = ItemService::new(data_dir_clone);
item_service.get_item(&conn, item_id)
item_service_clone.get_item(&conn, item_id)
})
.await
.map_err(|e| {
@@ -764,6 +741,7 @@ async fn stream_item_content_response(
stream_item_content_response_with_metadata(
db,
data_dir,
item_service,
item_id,
&metadata,
allow_binary,
@@ -782,17 +760,17 @@ async fn stream_item_content_response(
async fn stream_raw_content_response(
db: &Arc<tokio::sync::Mutex<rusqlite::Connection>>,
data_dir: &std::path::Path,
item_service: &Arc<ItemService>,
item_id: i64,
offset: u64,
length: u64,
) -> Result<Response, StatusCode> {
// Get item info to find the compression type
let db_clone = db.clone();
let data_dir_clone = data_dir.to_path_buf();
let item_service_clone = item_service.clone();
let item_with_meta = task::spawn_blocking(move || {
let conn = db_clone.blocking_lock();
let item_service = ItemService::new(data_dir_clone);
item_service.get_item(&conn, item_id)
item_service_clone.get_item(&conn, item_id)
})
.await
.map_err(|e| {
@@ -827,14 +805,10 @@ async fn stream_raw_content_response(
StatusCode::INTERNAL_SERVER_ERROR
})?;
// Get the actual file size on disk (raw bytes, not uncompressed size)
let file_size = {
let item_path = data_dir.join(item_id.to_string());
std::fs::metadata(&item_path).map(|m| m.len()).unwrap_or(0)
};
// Calculate the actual response length
let content_len = file_size;
// Use the already-fetched item size instead of stat()ing the file separately,
// which would be racy (file could change between stat and read).
// This streams raw on-disk bytes, so use compressed_size.
let content_len = item_with_meta.item.compressed_size.unwrap_or(0) as u64;
let start = std::cmp::min(offset, content_len);
let end = if length > 0 {
std::cmp::min(start + length, content_len)
@@ -921,6 +895,7 @@ async fn stream_raw_content_response(
async fn stream_item_content_response_with_metadata(
db: &Arc<tokio::sync::Mutex<rusqlite::Connection>>,
data_dir: &std::path::Path,
item_service: &Arc<ItemService>,
item_id: i64,
metadata: &HashMap<String, String>,
allow_binary: bool,
@@ -934,7 +909,8 @@ async fn stream_item_content_response_with_metadata(
// When decompress=false, return raw stored bytes
if !decompress {
return stream_raw_content_response(db, data_dir, item_id, offset, length).await;
return stream_raw_content_response(db, data_dir, item_service, item_id, offset, length)
.await;
}
let mime_type = get_mime_type(metadata);
@@ -943,11 +919,10 @@ async fn stream_item_content_response_with_metadata(
// Uses a sample of actual content bytes (not metadata-only) for reliable detection.
if !allow_binary {
let db_check = db.clone();
let data_dir_check = data_dir.to_path_buf();
let item_service_check = item_service.clone();
let is_binary = task::spawn_blocking(move || {
let conn = db_check.blocking_lock();
let item_service = ItemService::new(data_dir_check);
let (mut reader, _) = item_service.get_item_content_streaming(&conn, item_id)?;
let (mut reader, _) = item_service_check.get_item_content_streaming(&conn, item_id)?;
let mut sample = vec![0u8; crate::common::PIPESIZE];
let n = reader.read(&mut sample)?;
sample.truncate(n);
@@ -971,14 +946,13 @@ async fn stream_item_content_response_with_metadata(
if stream {
debug!("STREAMING: Using streaming approach");
let db = db.clone();
let data_dir = data_dir.to_path_buf();
let item_service_stream = item_service.clone();
let (tx, rx) = mpsc::channel::<Result<Vec<u8>, std::io::Error>>(16);
tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
let item_service = ItemService::new(data_dir);
let (mut reader, _, _) =
match item_service.get_item_content_info_streaming(&conn, item_id, None) {
match item_service_stream.get_item_content_info_streaming(&conn, item_id, None) {
Ok(r) => r,
Err(e) => {
let _ = tx.blocking_send(Err(std::io::Error::other(format!("{e}"))));
@@ -1046,15 +1020,14 @@ async fn stream_item_content_response_with_metadata(
} else {
debug!("NON-STREAMING: Building full response in memory using streaming reader");
let db = db.clone();
let data_dir = data_dir.to_path_buf();
let item_service_ns = item_service.clone();
// Get streaming reader from item service
let (reader, content_len_result) = tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
let item_service = ItemService::new(data_dir);
let (reader, item_with_meta) =
item_service.get_item_content_streaming(&conn, item_id)?;
let content_len = item_with_meta.item.size.unwrap_or(0);
item_service_ns.get_item_content_streaming(&conn, item_id)?;
let content_len = item_with_meta.item.uncompressed_size.unwrap_or(0);
Ok::<_, CoreError>((reader, content_len))
})
.await
@@ -1122,12 +1095,11 @@ pub async fn handle_get_item_latest_meta(
.unwrap_or_default();
let db = state.db.clone();
let data_dir = state.data_dir.clone();
let item_service = state.item_service.clone();
let find_tags = tags;
let result = task::spawn_blocking(move || {
let conn = db.blocking_lock();
let item_service = ItemService::new(data_dir);
item_service.find_item(&conn, &[], &find_tags, &HashMap::new())
})
.await
@@ -1139,14 +1111,7 @@ pub async fn handle_get_item_latest_meta(
match result {
Ok(item_with_meta) => {
let item_meta = item_with_meta.meta_as_map();
let response = ApiResponse {
success: true,
data: Some(item_meta),
error: None,
};
Ok(Json(response))
Ok(Json(ApiResponse::ok(item_meta)))
}
Err(e) => Err(handle_item_error(e)),
}
@@ -1182,11 +1147,10 @@ pub async fn handle_get_item_meta(
}
let db = state.db.clone();
let data_dir = state.data_dir.clone();
let item_service = state.item_service.clone();
let result = task::spawn_blocking(move || {
let conn = db.blocking_lock();
let item_service = ItemService::new(data_dir);
item_service.get_item(&conn, item_id)
})
.await
@@ -1198,14 +1162,7 @@ pub async fn handle_get_item_meta(
match result {
Ok(item_with_meta) => {
let item_meta = item_with_meta.meta_as_map();
let response = ApiResponse {
success: true,
data: Some(item_meta),
error: None,
};
Ok(Json(response))
Ok(Json(ApiResponse::ok(item_meta)))
}
Err(e) => Err(handle_item_error(e)),
}
@@ -1221,13 +1178,12 @@ pub async fn handle_post_item_meta(
}
let db = state.db.clone();
let data_dir = state.data_dir.clone();
let item_service = state.item_service.clone();
let meta = metadata.clone();
// Verify item exists and add metadata in a single blocking task
task::spawn_blocking(move || {
let conn = db.blocking_lock();
let item_service = ItemService::new(data_dir);
item_service
.get_item(&conn, item_id)
.map_err(handle_item_error)?;
@@ -1246,13 +1202,7 @@ pub async fn handle_post_item_meta(
StatusCode::INTERNAL_SERVER_ERROR
})??;
let response = ApiResponse {
success: true,
data: Some(()),
error: None,
};
Ok(Json(response))
Ok(Json(ApiResponse::empty()))
}
#[utoipa::path(
@@ -1281,11 +1231,10 @@ pub async fn handle_delete_item(
}
let db = state.db.clone();
let data_dir = state.data_dir.clone();
let item_service = state.item_service.clone();
let deleted_item = task::spawn_blocking(move || {
let mut conn = db.blocking_lock();
let item_service = crate::services::ItemService::new(data_dir);
item_service.delete_item(&mut conn, item_id)
})
.await
@@ -1301,19 +1250,15 @@ pub async fn handle_delete_item(
StatusCode::INTERNAL_SERVER_ERROR
})?,
ts: deleted_item.ts.to_rfc3339(),
size: deleted_item.size,
uncompressed_size: deleted_item.uncompressed_size,
compressed_size: deleted_item.compressed_size,
closed: deleted_item.closed,
compression: deleted_item.compression,
tags: vec![],
metadata: HashMap::new(),
};
let response = ApiResponse {
success: true,
data: Some(item_info),
error: None,
};
Ok(Json(response))
Ok(Json(ApiResponse::ok(item_info)))
}
#[utoipa::path(
@@ -1342,11 +1287,10 @@ pub async fn handle_get_item_info(
}
let db = state.db.clone();
let data_dir = state.data_dir.clone();
let item_service = state.item_service.clone();
let item_with_meta = task::spawn_blocking(move || {
let conn = db.blocking_lock();
let item_service = crate::services::ItemService::new(data_dir);
item_service.get_item(&conn, item_id)
})
.await
@@ -1356,28 +1300,12 @@ pub async fn handle_get_item_info(
})?
.map_err(handle_item_error)?;
let tags: Vec<String> = item_with_meta.tags.iter().map(|t| t.name.clone()).collect();
let metadata = item_with_meta.meta_as_map();
let item_info = ItemInfo::try_from(item_with_meta).map_err(|e| {
warn!("Item conversion failed: {e}");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let item_info = ItemInfo {
id: item_with_meta.item.id.ok_or_else(|| {
warn!("Item missing ID");
StatusCode::INTERNAL_SERVER_ERROR
})?,
ts: item_with_meta.item.ts.to_rfc3339(),
size: item_with_meta.item.size,
compression: item_with_meta.item.compression.clone(),
tags,
metadata,
};
let response = ApiResponse {
success: true,
data: Some(item_info),
error: None,
};
Ok(Json(response))
Ok(Json(ApiResponse::ok(item_info)))
}
#[derive(serde::Deserialize)]
@@ -1414,7 +1342,7 @@ pub async fn handle_diff_items(
Query(query): Query<DiffQuery>,
) -> Result<Json<ApiResponse<Vec<String>>>, StatusCode> {
let db = state.db.clone();
let data_dir = state.data_dir.clone();
let item_service = state.item_service.clone();
let id_a_param = query.id_a;
let id_b_param = query.id_b;
let tag_a_param = query.tag_a;
@@ -1422,7 +1350,6 @@ pub async fn handle_diff_items(
let diff_lines = task::spawn_blocking(move || {
let conn = db.blocking_lock();
let item_service = crate::services::ItemService::new(data_dir);
let item_a = if let Some(id_a) = id_a_param {
item_service
@@ -1458,7 +1385,7 @@ pub async fn handle_diff_items(
// Size limit for diff operation (10MB per item)
const MAX_DIFF_SIZE: i64 = 10 * 1024 * 1024;
if let Some(size_a) = item_a.item.size
if let Some(size_a) = item_a.item.uncompressed_size
&& size_a > MAX_DIFF_SIZE
{
warn!(
@@ -1467,7 +1394,7 @@ pub async fn handle_diff_items(
);
return Err(StatusCode::PAYLOAD_TOO_LARGE);
}
if let Some(size_b) = item_b.item.size
if let Some(size_b) = item_b.item.uncompressed_size
&& size_b > MAX_DIFF_SIZE
{
warn!(
@@ -1504,13 +1431,7 @@ pub async fn handle_diff_items(
StatusCode::INTERNAL_SERVER_ERROR
})??;
let response = ApiResponse {
success: true,
data: Some(diff_lines),
error: None,
};
Ok(Json(response))
Ok(Json(ApiResponse::ok(diff_lines)))
}
fn compute_diff(a: &[u8], b: &[u8]) -> Vec<String> {
@@ -1593,13 +1514,12 @@ pub async fn handle_update_item(
.unwrap_or_default();
let db = state.db.clone();
let data_dir = state.data_dir.clone();
let item_service = state.item_service.clone();
let settings = state.settings.clone();
let size_param = params.size;
let size_param = params.uncompressed_size;
let item_info = task::spawn_blocking(move || {
let mut conn = db.blocking_lock();
let item_service = crate::services::ItemService::new(data_dir);
// If only size is being set (no plugins/metadata/tags), do a lightweight update
#[allow(clippy::collapsible_if)]
@@ -1607,25 +1527,16 @@ pub async fn handle_update_item(
if plugin_names.is_empty() && metadata.is_empty() && tags.is_empty() {
return match crate::db::get_item(&conn, item_id) {
Ok(Some(mut item)) => {
item.size = Some(size);
item.uncompressed_size = Some(size);
if let Err(e) = crate::db::update_item(&conn, item) {
warn!("Failed to update item size: {e}");
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}
match item_service.get_item(&conn, item_id) {
Ok(iwm) => {
let tags: Vec<String> =
iwm.tags.iter().map(|t| t.name.clone()).collect();
let metadata = iwm.meta_as_map();
Ok(ItemInfo {
id: item_id,
ts: iwm.item.ts.to_rfc3339(),
size: iwm.item.size,
compression: iwm.item.compression.clone(),
tags,
metadata,
})
}
Ok(iwm) => ItemInfo::try_from(iwm).map_err(|e| {
warn!("Item conversion failed: {e}");
StatusCode::INTERNAL_SERVER_ERROR
}),
Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
}
}
@@ -1645,23 +1556,10 @@ pub async fn handle_update_item(
);
match result {
Ok(item_with_meta) => {
let tags: Vec<String> =
item_with_meta.tags.iter().map(|t| t.name.clone()).collect();
let metadata = item_with_meta.meta_as_map();
Ok(ItemInfo {
id: item_with_meta.item.id.ok_or_else(|| {
warn!("Item missing ID");
StatusCode::INTERNAL_SERVER_ERROR
})?,
ts: item_with_meta.item.ts.to_rfc3339(),
size: item_with_meta.item.size,
compression: item_with_meta.item.compression.clone(),
tags,
metadata,
})
}
Ok(item_with_meta) => ItemInfo::try_from(item_with_meta).map_err(|e| {
warn!("Item conversion failed: {e}");
StatusCode::INTERNAL_SERVER_ERROR
}),
Err(CoreError::ItemNotFound(_)) => Err(StatusCode::NOT_FOUND),
Err(e) => {
warn!("Failed to update item {item_id}: {e}");
@@ -1675,11 +1573,7 @@ pub async fn handle_update_item(
StatusCode::INTERNAL_SERVER_ERROR
})??;
Ok(Json(ApiResponse {
success: true,
data: Some(item_info),
error: None,
}))
Ok(Json(ApiResponse::ok(item_info)))
}
/// Query parameters for the export endpoint.
@@ -1719,13 +1613,13 @@ pub async fn handle_export_items(
return Err(StatusCode::BAD_REQUEST);
}
let data_dir = state.data_dir.clone();
let db = state.db.clone();
let item_service = state.item_service.clone();
let data_dir = state.data_dir.clone();
// Resolve items in blocking task
let items_with_meta = task::spawn_blocking(move || {
let conn = db.blocking_lock();
let item_service = ItemService::new(data_dir.clone());
if !ids.is_empty() {
let mut result = Vec::new();
for &id in &ids {
@@ -1765,11 +1659,11 @@ pub async fn handle_export_items(
// Stream tar via mpsc channel
let (tx, rx) = mpsc::channel::<Result<Vec<u8>, std::io::Error>>(16);
let data_dir2 = state.data_dir.clone();
let item_service2 = state.item_service.clone();
let db2 = state.db.clone();
tokio::task::spawn_blocking(move || {
let conn = db2.blocking_lock();
let item_service = ItemService::new(data_dir2.clone());
// Create a writer that sends chunks to the channel
struct ChannelWriter {
@@ -1798,7 +1692,7 @@ pub async fn handle_export_items(
&items_with_meta,
&data_dir2,
None,
&item_service,
&item_service2,
&conn,
) {
warn!("Export tar write failed: {e}");
@@ -1919,9 +1813,5 @@ pub async fn handle_import_items(
"count": imported_ids.len(),
});
Ok(Json(ApiResponse {
success: true,
data: Some(response_data),
error: None,
}))
Ok(Json(ApiResponse::ok(response_data)))
}

View File

@@ -1,4 +1,5 @@
use crate::services::item_service::ItemService;
use crate::services::types::ItemWithMeta;
/// Common utilities and types for the server module.
///
/// This module provides shared structures, functions, and middleware used across
@@ -182,6 +183,26 @@ pub struct ApiResponse<T> {
pub error: Option<String>,
}
impl<T> ApiResponse<T> {
/// Creates a successful API response with the given data.
pub fn ok(data: T) -> Self {
Self {
success: true,
data: Some(data),
error: None,
}
}
/// Creates a successful API response with no data.
pub fn empty() -> Self {
Self {
success: true,
data: None,
error: None,
}
}
}
/// Response type for list of item information.
///
/// Specialized response for endpoints that return multiple items.
@@ -364,11 +385,19 @@ pub struct ItemInfo {
/// The creation timestamp of the item in ISO 8601 format.
#[schema(example = "2023-12-01T15:30:45Z")]
pub ts: String,
/// Size in bytes.
/// Uncompressed size in bytes.
///
/// The size of the item's content in bytes, may be None if not set.
/// The uncompressed size of the item's content in bytes, may be None if not set.
#[schema(example = 1024)]
pub size: Option<i64>,
pub uncompressed_size: Option<i64>,
/// Compressed size in bytes.
///
/// The compressed file size on disk in bytes, may be None if not set.
#[schema(example = 512)]
pub compressed_size: Option<i64>,
/// Whether the item has been fully written and closed.
#[schema(example = true)]
pub closed: bool,
/// Compression type.
///
/// The compression algorithm used for the item's content.
@@ -386,6 +415,26 @@ pub struct ItemInfo {
pub metadata: HashMap<String, String>,
}
impl TryFrom<ItemWithMeta> for ItemInfo {
type Error = anyhow::Error;
fn try_from(item_with_meta: ItemWithMeta) -> Result<Self, Self::Error> {
Ok(ItemInfo {
id: item_with_meta
.item
.id
.ok_or_else(|| anyhow::anyhow!("Item missing ID"))?,
ts: item_with_meta.item.ts.to_rfc3339(),
uncompressed_size: item_with_meta.item.uncompressed_size,
compressed_size: item_with_meta.item.compressed_size,
closed: item_with_meta.item.closed,
compression: item_with_meta.item.compression,
tags: item_with_meta.tag_names(),
metadata: item_with_meta.meta_as_map(),
})
}
}
/// Item information including content and metadata, with binary detection.
///
/// This structure provides item details along with its content, handling binary
@@ -668,7 +717,7 @@ pub struct UpdateItemQuery {
/// Optional comma-separated tags to add.
pub tags: Option<String>,
/// Optional uncompressed size to set on the item.
pub size: Option<i64>,
pub uncompressed_size: Option<i64>,
}
/// Request body for creating a new item.

View File

@@ -240,7 +240,10 @@ fn build_item_list(
format!("<a href=\"/item/{item_id}\">{id_value}</a>")
}
"time" => item.ts.format("%Y-%m-%d %H:%M:%S").to_string(),
"size" => item.size.map(|s| s.to_string()).unwrap_or_default(),
"size" => item
.uncompressed_size
.map(|s| s.to_string())
.unwrap_or_default(),
"tags" => {
// Make sure we're using all tags for the item
let tag_links: Vec<String> = tags
@@ -424,7 +427,7 @@ fn build_item_details(conn: &Connection, id: i64) -> Result<String> {
));
html.push_str(&format!(
"<tr><th>Size</th><td>{}</td></tr>",
item.size.unwrap_or(0)
item.uncompressed_size.unwrap_or(0)
));
html.push_str(&format!(
"<tr><th>Compression</th><td>{}</td></tr>",

View File

@@ -103,11 +103,20 @@ pub fn mode_update(
// Backfill size if not set
let mut updated_item = item.clone();
if item.size.is_none() {
if item.uncompressed_size.is_none() {
debug!("UPDATE: Size not set, backfilling from content file");
if let Some(size) = compute_item_size(&data_path, &item) {
debug!("UPDATE: Computed size: {size}");
updated_item.size = Some(size);
updated_item.uncompressed_size = Some(size);
db::update_item(conn, updated_item.clone())?;
}
}
// Backfill compressed_size if not set
if item.compressed_size.is_none() {
let item_path = data_path.join(item_id.to_string());
if let Ok(meta) = std::fs::metadata(&item_path) {
updated_item.compressed_size = Some(meta.len() as i64);
db::update_item(conn, updated_item.clone())?;
}
}

View File

@@ -150,7 +150,7 @@ impl ItemService {
}
// Check size guard before loading content
if let Some(size) = item_with_meta.item.size
if let Some(size) = item_with_meta.item.uncompressed_size
&& size > MAX_CONTENT_SIZE
{
return Err(CoreError::InvalidInput(format!(
@@ -632,21 +632,22 @@ impl ItemService {
// Print the "KEEP: New item" message before starting to read input
if !settings.quiet {
if std::io::stderr().is_terminal() {
let mut t = term::stderr().unwrap();
let _ = t.reset();
let _ = t.attr(term::Attr::Bold);
let _ = write!(t, "KEEP:");
let _ = t.reset();
let _ = write!(t, " New item ");
let _ = t.attr(term::Attr::Bold);
let _ = write!(t, "{item_id}");
let _ = t.reset();
let _ = write!(t, " tags: ");
let _ = t.attr(term::Attr::Bold);
let _ = write!(t, "{}", tags.join(" "));
let _ = t.reset();
let _ = writeln!(t);
let _ = std::io::stderr().flush();
if let Some(mut t) = term::stderr() {
let _ = t.reset();
let _ = t.attr(term::Attr::Bold);
let _ = write!(t, "KEEP:");
let _ = t.reset();
let _ = write!(t, " New item ");
let _ = t.attr(term::Attr::Bold);
let _ = write!(t, "{item_id}");
let _ = t.reset();
let _ = write!(t, " tags: ");
let _ = t.attr(term::Attr::Bold);
let _ = write!(t, "{}", tags.join(" "));
let _ = t.reset();
let _ = writeln!(t);
let _ = std::io::stderr().flush();
}
} else {
let mut t = std::io::stderr();
let _ = writeln!(t, "KEEP: New item: {item_id} tags: {tags:?}");
@@ -681,6 +682,8 @@ impl ItemService {
item_out.flush()?;
drop(item_out);
let compressed_size = std::fs::metadata(&item_path)?.len() as i64;
debug!("ITEM_SERVICE: Finalizing meta plugins");
meta_service.finalize_plugins(&mut plugins);
@@ -691,7 +694,9 @@ impl ItemService {
}
}
item.size = Some(total_bytes);
item.uncompressed_size = Some(total_bytes);
item.compressed_size = Some(compressed_size);
item.closed = true;
db::update_item(conn, item.clone())?;
debug!("ITEM_SERVICE: Save completed successfully");
@@ -792,6 +797,7 @@ impl ItemService {
None,
None,
settings,
true,
)
}
@@ -812,6 +818,7 @@ impl ItemService {
client_compression_type: Option<CompressionType>,
import_ts: Option<DateTime<Utc>>,
settings: &Settings,
set_size: bool,
) -> Result<ItemWithMeta, CoreError> {
let mut cmd = Command::new("keep");
let mut tags = tags;
@@ -856,7 +863,7 @@ impl ItemService {
let mut item_path = self.data_path.clone();
item_path.push(item_id.to_string());
let mut item_out = compression_engine.create(item_path)?;
let mut item_out = compression_engine.create(item_path.clone())?;
let mut total_bytes = 0i64;
@@ -872,6 +879,8 @@ impl ItemService {
item_out.flush()?;
drop(item_out);
let compressed_size = std::fs::metadata(&item_path)?.len() as i64;
if run_meta {
meta_service.finalize_plugins(&mut plugins);
}
@@ -888,7 +897,9 @@ impl ItemService {
}
}
item.size = Some(total_bytes);
item.uncompressed_size = if set_size { Some(total_bytes) } else { None };
item.compressed_size = Some(compressed_size);
item.closed = true;
db::update_item(conn, item)?;
self.get_item(conn, item_id)

View File

@@ -40,6 +40,15 @@ impl ItemWithMeta {
.map(|m| (m.name, m.value))
.collect()
}
/// Returns a list of tag names for this item.
///
/// # Returns
///
/// `Vec<String>` - Tag names extracted from the tags list.
pub fn tag_names(&self) -> Vec<String> {
self.tags.iter().map(|t| t.name.clone()).collect()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]

View File

@@ -57,7 +57,9 @@ pub fn create_test_item(conn: &Connection) -> i64 {
let item = crate::db::Item {
id: None,
ts: chrono::Utc::now(),
size: Some(100),
uncompressed_size: Some(100),
compressed_size: Some(80),
closed: true,
compression: crate::compression_engine::CompressionType::Raw.to_string(),
};
db::insert_item(conn, item).expect("Failed to insert item")

View File

@@ -27,7 +27,9 @@ mod tests {
let item = crate::db::Item {
id: Some(999), // Non-existent item
ts: chrono::Utc::now(),
size: Some(0),
uncompressed_size: Some(0),
compressed_size: Some(0),
closed: true,
compression: crate::compression_engine::CompressionType::Raw.to_string(),
};

View File

@@ -32,7 +32,9 @@ mod tests {
let item = crate::db::Item {
id: Some(999), // Non-existent item
ts: chrono::Utc::now(),
size: Some(0),
uncompressed_size: Some(0),
compressed_size: Some(0),
closed: true,
compression: crate::compression_engine::CompressionType::Raw.to_string(),
};

View File

@@ -10,7 +10,9 @@ mod export_tar_tests {
item: Item {
id: Some(id),
ts: Utc::now(),
size: Some(100),
uncompressed_size: Some(100),
compressed_size: Some(80),
closed: true,
compression: "raw".to_string(),
},
tags: tags

View File

@@ -38,7 +38,9 @@ mod import_tar_tests {
// Set size
let mut updated = item;
updated.size = Some(content.len() as i64);
updated.uncompressed_size = Some(content.len() as i64);
updated.compressed_size = Some(content.len() as i64);
updated.closed = true;
db::update_item(conn, updated).unwrap();
// Set tags
@@ -48,7 +50,9 @@ mod import_tar_tests {
crate::db::Item {
id: Some(item_id),
ts: Utc::now(),
size: Some(content.len() as i64),
uncompressed_size: Some(content.len() as i64),
compressed_size: Some(content.len() as i64),
closed: true,
compression: compression.to_string(),
},
&tag_names,
@@ -185,7 +189,9 @@ mod import_tar_tests {
item: Item {
id: Some(1),
ts: Utc::now(),
size: None,
uncompressed_size: None,
compressed_size: None,
closed: false,
compression: "raw".to_string(),
},
tags: tags