use crate::client::{ItemInfo, KeepClient}; use crate::compression_engine::CompressionType; use crate::config::Settings; use crate::modes::common::settings_compression_type; use anyhow::Result; use clap::Command; use is_terminal::IsTerminal; use log::debug; use sha2::{Digest, Sha256}; use std::collections::HashMap; use std::io::{Read, Write}; use std::sync::{Arc, Mutex}; /// Streaming save mode for client. /// /// Uses three threads for true streaming with constant memory: /// - Reader thread: reads stdin, tees to stdout, computes SHA-256, /// compresses data, writes to OS pipe /// - Pipe: zero-copy transfer of compressed bytes between threads /// - Streamer thread: reads from pipe, streams to server via chunked HTTP /// /// Memory usage is O(PIPESIZE) regardless of data size. pub fn mode( client: &KeepClient, cmd: &mut Command, settings: &Settings, tags: &mut Vec, metadata: HashMap, ) -> Result<(), anyhow::Error> { debug!("CLIENT_SAVE: Saving item via remote server (streaming)"); if tags.is_empty() { tags.push("none".to_string()); } // Determine compression type from settings let compression_type = settings_compression_type(cmd, settings); let server_compress = matches!(compression_type, CompressionType::None); // Create OS pipe for streaming compressed bytes between threads let (pipe_reader, pipe_writer) = os_pipe::pipe()?; // Shared state for reader thread results let shared = Arc::new(Mutex::new((0u64, String::new()))); let shared_reader = Arc::clone(&shared); // Reader thread: stdin → tee(stdout) → hash → compress → pipe let compression_type_clone = compression_type.clone(); let reader_handle = std::thread::spawn(move || -> Result<(u64, String)> { let stdin = std::io::stdin(); let stdout = std::io::stdout(); let mut stdin_lock = stdin.lock(); let mut stdout_lock = stdout.lock(); let mut hasher = Sha256::new(); let mut total_bytes = 0u64; let mut buffer = [0u8; 8192]; // Wrap pipe writer with appropriate compression let mut compressor: Box = match compression_type_clone { CompressionType::GZip => { use flate2::Compression; use flate2::write::GzEncoder; Box::new(GzEncoder::new(pipe_writer, Compression::default())) } CompressionType::LZ4 => Box::new(lz4_flex::frame::FrameEncoder::new(pipe_writer)), _ => Box::new(pipe_writer), }; loop { let n = stdin_lock.read(&mut buffer)?; if n == 0 { break; } // Tee to stdout stdout_lock.write_all(&buffer[..n])?; // Update hash hasher.update(&buffer[..n]); total_bytes += n as u64; // Compress and write to pipe compressor.write_all(&buffer[..n])?; } // Finalize compression (flushes any buffered compressed data) drop(compressor); // Pipe writer is now dropped (inside compressor), signaling EOF to streamer let digest = format!("{:x}", hasher.finalize()); // Set shared state for main thread let mut shared = shared_reader.lock().unwrap(); *shared = (total_bytes, digest.clone()); Ok((total_bytes, digest)) }); // Streamer thread: reads compressed bytes from pipe → POST to server let client_url = client.base_url().to_string(); let client_password = client.password().cloned(); let tags_clone = tags.clone(); let streamer_handle = std::thread::spawn(move || -> Result { let streaming_client = KeepClient::new(&client_url, client_password)?; let params = [ ("compress".to_string(), server_compress.to_string()), ("meta".to_string(), "false".to_string()), ("tags".to_string(), tags_clone.join(",")), ]; let param_refs: Vec<(&str, &str)> = params .iter() .map(|(k, v)| (k.as_str(), v.as_str())) .collect(); let mut reader: Box = Box::new(pipe_reader); let item_info = streaming_client.post_stream("/api/item/", &mut reader, ¶m_refs)?; Ok(item_info) }); // Wait for streaming to complete, capture item info let item_info = streamer_handle .join() .map_err(|e| anyhow::anyhow!("Streamer thread panicked: {:?}", e))??; // Wait for reader thread (should complete quickly after pipe is drained) reader_handle .join() .map_err(|e| anyhow::anyhow!("Reader thread panicked: {:?}", e))??; // Read results from shared state let (uncompressed_size, digest) = { let shared = shared.lock().unwrap(); shared.clone() }; // Build local metadata and send to server let mut local_metadata = metadata; local_metadata.insert("digest_sha256".to_string(), digest); local_metadata.insert( "uncompressed_size".to_string(), uncompressed_size.to_string(), ); // Add hostname if let Ok(hostname) = gethostname::gethostname().into_string() { local_metadata.insert("hostname".to_string(), hostname.clone()); let short = hostname.split('.').next().unwrap_or(&hostname).to_string(); local_metadata.insert("hostname_short".to_string(), short); } // Send metadata to server if !local_metadata.is_empty() { client.post_metadata(item_info.id, &local_metadata)?; } // Print status to stderr if !settings.quiet { if std::io::stderr().is_terminal() { eprintln!("KEEP: New item (streaming) tags: {}", tags.join(" ")); } else { eprintln!("KEEP: New item (streaming) tags: {tags:?}"); } } debug!("CLIENT_SAVE: Streaming complete, {uncompressed_size} bytes uncompressed"); Ok(()) }