Files
keep/src/modes/client/save.rs
Andrew Phillips 30d7836bcf refactor: deduplicate ItemInfo, improve error handling, fix pre-existing bugs
- Move ItemInfo to services/types.rs for sharing between client and server
- Replace .expect() in compression_service with proper error handling
- Add CoreError::PayloadTooLarge variant for semantic error handling
- Export CoreError from lib.rs for library users
- Unify get_item_meta_name/value to take &str instead of String
- Extract item_path() helper in ItemService to reduce duplication
- Add warning logs for silent errors in list.rs
- Fix pre-existing borrow errors: tx moved in export handler,
  item_with_meta partial move in TryFrom implementation
- Fix unused data_dir variables in server code
2026-03-21 10:43:26 -03:00

181 lines
6.8 KiB
Rust

use crate::client::KeepClient;
use crate::compression_engine::CompressionType;
use crate::config::Settings;
use crate::meta_plugin::SaveMetaFn;
use crate::modes::common::settings_compression_type;
use crate::services::ItemInfo;
use crate::services::compression_service::CompressionService;
use crate::services::meta_service::MetaService;
use anyhow::Result;
use clap::Command;
use is_terminal::IsTerminal;
use log::debug;
use std::collections::HashMap;
use std::io::{Read, Write};
use std::sync::{Arc, Mutex};
/// Streaming save mode for client.
///
/// Uses three threads for true streaming with constant memory:
/// - Reader thread: reads stdin, tees to stdout, runs meta plugins,
/// compresses data, writes to OS pipe
/// - Pipe: zero-copy transfer of compressed bytes between threads
/// - Streamer thread: reads from pipe, streams to server via chunked HTTP
///
/// Meta plugins run on the client side during streaming. Collected metadata
/// is sent to the server via a separate POST after streaming completes.
///
/// Memory usage is O(PIPESIZE) regardless of data size.
pub fn mode(
client: &KeepClient,
cmd: &mut Command,
settings: &Settings,
tags: &mut Vec<String>,
metadata: HashMap<String, String>,
) -> Result<(), anyhow::Error> {
debug!("CLIENT_SAVE: Saving item via remote server (streaming)");
crate::modes::common::ensure_default_tag(tags);
// Determine compression type from settings
let compression_type = settings_compression_type(cmd, settings);
let compression_type_str = compression_type.to_string();
// In client mode, the client always handles compression (even "raw").
// The server should never re-compress client data.
let server_compress = false;
// Shared metadata collection: plugins write here via save_meta closure
let collected_meta: Arc<Mutex<HashMap<String, String>>> = Arc::new(Mutex::new(HashMap::new()));
let meta_collector = collected_meta.clone();
let save_meta: SaveMetaFn = Arc::new(Mutex::new(move |name: &str, value: &str| {
if let Ok(mut map) = meta_collector.lock() {
map.insert(name.to_string(), value.to_string());
}
}));
// Create MetaService and get plugins (must happen before spawning reader thread)
let meta_service = MetaService::new(save_meta);
let mut plugins = meta_service.get_plugins(cmd, settings);
// Create OS pipe for streaming compressed bytes between threads
let (pipe_reader, pipe_writer) = os_pipe::pipe()?;
// Reader thread: stdin → tee(stdout) → meta plugins → compress → pipe
let compression_type_clone = compression_type.clone();
let reader_handle = std::thread::spawn(move || -> Result<u64> {
let stdin = std::io::stdin();
let stdout = std::io::stdout();
let mut stdin_lock = stdin.lock();
let mut stdout_lock = stdout.lock();
let mut total_bytes = 0u64;
let mut buffer = [0u8; 8192];
// Initialize meta plugins
meta_service.initialize_plugins(&mut plugins);
// Wrap pipe writer with appropriate compression
let mut compressor: Box<dyn Write> =
CompressionService::compressing_writer(Box::new(pipe_writer), &compression_type_clone)?;
loop {
let n = stdin_lock.read(&mut buffer)?;
if n == 0 {
break;
}
// Tee to stdout
stdout_lock.write_all(&buffer[..n])?;
// Feed chunk to meta plugins
meta_service.process_chunk(&mut plugins, &buffer[..n]);
total_bytes += n as u64;
// Compress and write to pipe
compressor.write_all(&buffer[..n])?;
}
// Finalize meta plugins (digest, text, tokens produce final output here)
meta_service.finalize_plugins(&mut plugins);
// Explicitly flush and finalize compression before dropping.
compressor.flush()?;
drop(compressor);
Ok(total_bytes)
});
// Streamer thread: reads compressed bytes from pipe → POST to server
let client_url = client.base_url().to_string();
let client_username = client.username().cloned();
let client_password = client.password().cloned();
let client_jwt = client.jwt().cloned();
let tags_clone = tags.clone();
let compression_type_str_clone = compression_type_str.clone();
let streamer_handle = std::thread::spawn(move || -> Result<ItemInfo> {
let streaming_client =
KeepClient::new(&client_url, client_username, client_password, client_jwt)?;
let params = [
("compress".to_string(), server_compress.to_string()),
("meta".to_string(), "false".to_string()),
("tags".to_string(), tags_clone.join(",")),
// Always send compression_type when compress=false (client handled compression)
("compression_type".to_string(), compression_type_str_clone),
];
// Filter out empty params
let params: Vec<(String, String)> =
params.into_iter().filter(|(_, v)| !v.is_empty()).collect();
let param_refs: Vec<(&str, &str)> = params
.iter()
.map(|(k, v)| (k.as_str(), v.as_str()))
.collect();
let mut reader: Box<dyn Read> = Box::new(pipe_reader);
let item_info = streaming_client.post_stream("/api/item/", &mut reader, &param_refs)?;
Ok(item_info)
});
// Wait for streaming to complete, capture item info
let item_info = streamer_handle
.join()
.map_err(|e| anyhow::anyhow!("Streamer thread panicked: {:?}", e))??;
// Wait for reader thread (should complete quickly after pipe is drained)
let uncompressed_size = reader_handle
.join()
.map_err(|e| anyhow::anyhow!("Reader thread panicked: {:?}", e))??;
// Merge plugin-collected metadata with CLI metadata
let mut local_metadata = metadata;
// Add plugin-collected metadata (digest, hostname, text stats, etc.)
if let Ok(plugin_meta) = collected_meta.lock() {
for (k, v) in plugin_meta.iter() {
local_metadata.entry(k.clone()).or_insert_with(|| v.clone());
}
}
// Send uncompressed size to server (proper field, not metadata)
client.set_item_size(item_info.id, uncompressed_size)?;
// Send metadata to server
if !local_metadata.is_empty() {
client.post_metadata(item_info.id, &local_metadata)?;
}
// Print status to stderr (item ID is known immediately from server response)
if !settings.quiet {
if std::io::stderr().is_terminal() {
eprintln!("KEEP: New item: {} tags: {}", item_info.id, tags.join(" "));
} else {
eprintln!("KEEP: New item: {} tags: {tags:?}", item_info.id);
}
}
debug!("CLIENT_SAVE: Streaming complete, {uncompressed_size} bytes uncompressed");
Ok(())
}