use crate::client::{ItemInfo, KeepClient}; use crate::compression_engine::CompressionType; use crate::config::Settings; use crate::meta_plugin::SaveMetaFn; use crate::modes::common::settings_compression_type; use crate::services::meta_service::MetaService; use anyhow::Result; use clap::Command; use is_terminal::IsTerminal; use log::debug; use std::collections::HashMap; use std::io::{Read, Write}; use std::sync::{Arc, Mutex}; /// Streaming save mode for client. /// /// Uses three threads for true streaming with constant memory: /// - Reader thread: reads stdin, tees to stdout, runs meta plugins, /// compresses data, writes to OS pipe /// - Pipe: zero-copy transfer of compressed bytes between threads /// - Streamer thread: reads from pipe, streams to server via chunked HTTP /// /// Meta plugins run on the client side during streaming. Collected metadata /// is sent to the server via a separate POST after streaming completes. /// /// Memory usage is O(PIPESIZE) regardless of data size. pub fn mode( client: &KeepClient, cmd: &mut Command, settings: &Settings, tags: &mut Vec, metadata: HashMap, ) -> Result<(), anyhow::Error> { debug!("CLIENT_SAVE: Saving item via remote server (streaming)"); crate::modes::common::ensure_default_tag(tags); // Determine compression type from settings let compression_type = settings_compression_type(cmd, settings); let server_compress = matches!(compression_type, CompressionType::None); // Shared metadata collection: plugins write here via save_meta closure let collected_meta: Arc>> = Arc::new(Mutex::new(HashMap::new())); let meta_collector = collected_meta.clone(); let save_meta: SaveMetaFn = Arc::new(Mutex::new(move |name: &str, value: &str| { if let Ok(mut map) = meta_collector.lock() { map.insert(name.to_string(), value.to_string()); } })); // Create MetaService and get plugins (must happen before spawning reader thread) let meta_service = MetaService::new(save_meta); let mut plugins = meta_service.get_plugins(cmd, settings); // Create OS pipe for streaming compressed bytes between threads let (pipe_reader, pipe_writer) = os_pipe::pipe()?; // Reader thread: stdin → tee(stdout) → meta plugins → compress → pipe let compression_type_clone = compression_type.clone(); let reader_handle = std::thread::spawn(move || -> Result { let stdin = std::io::stdin(); let stdout = std::io::stdout(); let mut stdin_lock = stdin.lock(); let mut stdout_lock = stdout.lock(); let mut total_bytes = 0u64; let mut buffer = [0u8; 8192]; // Initialize meta plugins meta_service.initialize_plugins(&mut plugins); // Wrap pipe writer with appropriate compression let mut compressor: Box = match compression_type_clone { CompressionType::GZip => { use flate2::Compression; use flate2::write::GzEncoder; Box::new(GzEncoder::new(pipe_writer, Compression::default())) } CompressionType::LZ4 => Box::new(lz4_flex::frame::FrameEncoder::new(pipe_writer)), _ => Box::new(pipe_writer), }; loop { let n = stdin_lock.read(&mut buffer)?; if n == 0 { break; } // Tee to stdout stdout_lock.write_all(&buffer[..n])?; // Feed chunk to meta plugins meta_service.process_chunk(&mut plugins, &buffer[..n]); total_bytes += n as u64; // Compress and write to pipe compressor.write_all(&buffer[..n])?; } // Finalize meta plugins (digest, text, tokens produce final output here) meta_service.finalize_plugins(&mut plugins); // Explicitly flush and finalize compression before dropping. compressor.flush()?; drop(compressor); Ok(total_bytes) }); // Streamer thread: reads compressed bytes from pipe → POST to server let client_url = client.base_url().to_string(); let client_username = client.username().cloned(); let client_password = client.password().cloned(); let client_jwt = client.jwt().cloned(); let tags_clone = tags.clone(); let streamer_handle = std::thread::spawn(move || -> Result { let streaming_client = KeepClient::new(&client_url, client_username, client_password, client_jwt)?; let params = [ ("compress".to_string(), server_compress.to_string()), ("meta".to_string(), "false".to_string()), ("tags".to_string(), tags_clone.join(",")), ]; let param_refs: Vec<(&str, &str)> = params .iter() .map(|(k, v)| (k.as_str(), v.as_str())) .collect(); let mut reader: Box = Box::new(pipe_reader); let item_info = streaming_client.post_stream("/api/item/", &mut reader, ¶m_refs)?; Ok(item_info) }); // Wait for streaming to complete, capture item info let item_info = streamer_handle .join() .map_err(|e| anyhow::anyhow!("Streamer thread panicked: {:?}", e))??; // Wait for reader thread (should complete quickly after pipe is drained) let uncompressed_size = reader_handle .join() .map_err(|e| anyhow::anyhow!("Reader thread panicked: {:?}", e))??; // Merge plugin-collected metadata with CLI metadata let mut local_metadata = metadata; // Add plugin-collected metadata (digest, hostname, text stats, etc.) if let Ok(plugin_meta) = collected_meta.lock() { for (k, v) in plugin_meta.iter() { local_metadata.entry(k.clone()).or_insert_with(|| v.clone()); } } // Add uncompressed_size (always tracked by client) local_metadata.insert( "uncompressed_size".to_string(), uncompressed_size.to_string(), ); // Record client compression type so the client can decompress on retrieval. if !matches!(compression_type, CompressionType::None) { local_metadata.insert( "_client_compression".to_string(), compression_type.to_string(), ); } // Send metadata to server if !local_metadata.is_empty() { client.post_metadata(item_info.id, &local_metadata)?; } // Print status to stderr if !settings.quiet { if std::io::stderr().is_terminal() { eprintln!("KEEP: New item (streaming) tags: {}", tags.join(" ")); } else { eprintln!("KEEP: New item (streaming) tags: {tags:?}"); } } debug!("CLIENT_SAVE: Streaming complete, {uncompressed_size} bytes uncompressed"); Ok(()) }