feat: add client mode with streaming support

Add client mode enabling the keep CLI to connect to a remote keep
server over HTTP. Local plugins (compression, meta, filters) run on
the client; the server stores/retrieves binary blobs.

Architecture:
- Client save uses 3-thread streaming pipeline: reader thread (stdin
  → tee/stdout → hash → compress), OS pipe, streamer thread (pipe →
  chunked HTTP POST). Memory usage is O(PIPESIZE) regardless of data
  size.
- Server accepts compress=false, meta=false, decompress=false query
  params for granular control of server-side processing.
- Streaming body handling on server via async channel → sync reader
  bridge (ChannelReader).

Key additions:
- src/client.rs: KeepClient with post_stream() for chunked upload
- src/modes/client/: save, get, list, info, delete, diff, status
- --client-url / KEEP_CLIENT_URL configuration
- --client-password / KEEP_CLIENT_PASSWORD for auth
- os_pipe dependency for zero-copy pipe streaming

Co-Authored-By: andrew/openrouter/hunter-alpha <noreply@opencode.ai>
This commit is contained in:
2026-03-12 18:01:36 -03:00
parent d2581358e9
commit c5529bedbf
16 changed files with 1105 additions and 20 deletions

171
src/modes/client/save.rs Normal file
View File

@@ -0,0 +1,171 @@
use crate::client::{ItemInfo, KeepClient};
use crate::compression_engine::CompressionType;
use crate::config::Settings;
use crate::modes::common::settings_compression_type;
use anyhow::Result;
use clap::Command;
use is_terminal::IsTerminal;
use log::debug;
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::io::{Read, Write};
use std::sync::{Arc, Mutex};
/// Streaming save mode for client.
///
/// Uses three threads for true streaming with constant memory:
/// - Reader thread: reads stdin, tees to stdout, computes SHA-256,
/// compresses data, writes to OS pipe
/// - Pipe: zero-copy transfer of compressed bytes between threads
/// - Streamer thread: reads from pipe, streams to server via chunked HTTP
///
/// Memory usage is O(PIPESIZE) regardless of data size.
pub fn mode(
client: &KeepClient,
cmd: &mut Command,
settings: &Settings,
tags: &mut Vec<String>,
metadata: HashMap<String, String>,
) -> Result<(), anyhow::Error> {
debug!("CLIENT_SAVE: Saving item via remote server (streaming)");
if tags.is_empty() {
tags.push("none".to_string());
}
// Determine compression type from settings
let compression_type = settings_compression_type(cmd, settings);
let server_compress = matches!(compression_type, CompressionType::None);
// Create OS pipe for streaming compressed bytes between threads
let (pipe_reader, pipe_writer) = os_pipe::pipe()?;
// Shared state for reader thread results
let shared = Arc::new(Mutex::new((0u64, String::new())));
let shared_reader = Arc::clone(&shared);
// Reader thread: stdin → tee(stdout) → hash → compress → pipe
let compression_type_clone = compression_type.clone();
let reader_handle = std::thread::spawn(move || -> Result<(u64, String)> {
let stdin = std::io::stdin();
let stdout = std::io::stdout();
let mut stdin_lock = stdin.lock();
let mut stdout_lock = stdout.lock();
let mut hasher = Sha256::new();
let mut total_bytes = 0u64;
let mut buffer = [0u8; 8192];
// Wrap pipe writer with appropriate compression
let mut compressor: Box<dyn Write> = match compression_type_clone {
CompressionType::GZip => {
use flate2::Compression;
use flate2::write::GzEncoder;
Box::new(GzEncoder::new(pipe_writer, Compression::default()))
}
CompressionType::LZ4 => Box::new(lz4_flex::frame::FrameEncoder::new(pipe_writer)),
_ => Box::new(pipe_writer),
};
loop {
let n = stdin_lock.read(&mut buffer)?;
if n == 0 {
break;
}
// Tee to stdout
stdout_lock.write_all(&buffer[..n])?;
// Update hash
hasher.update(&buffer[..n]);
total_bytes += n as u64;
// Compress and write to pipe
compressor.write_all(&buffer[..n])?;
}
// Finalize compression (flushes any buffered compressed data)
drop(compressor);
// Pipe writer is now dropped (inside compressor), signaling EOF to streamer
let digest = format!("{:x}", hasher.finalize());
// Set shared state for main thread
let mut shared = shared_reader.lock().unwrap();
*shared = (total_bytes, digest.clone());
Ok((total_bytes, digest))
});
// Streamer thread: reads compressed bytes from pipe → POST to server
let client_url = client.base_url().to_string();
let client_password = client.password().cloned();
let tags_clone = tags.clone();
let streamer_handle = std::thread::spawn(move || -> Result<ItemInfo> {
let streaming_client = KeepClient::new(&client_url, client_password)?;
let params = [
("compress".to_string(), server_compress.to_string()),
("meta".to_string(), "false".to_string()),
("tags".to_string(), tags_clone.join(",")),
];
let param_refs: Vec<(&str, &str)> = params
.iter()
.map(|(k, v)| (k.as_str(), v.as_str()))
.collect();
let mut reader: Box<dyn Read> = Box::new(pipe_reader);
let item_info = streaming_client.post_stream("/api/item/", &mut reader, &param_refs)?;
Ok(item_info)
});
// Wait for streaming to complete, capture item info
let item_info = streamer_handle
.join()
.map_err(|e| anyhow::anyhow!("Streamer thread panicked: {:?}", e))??;
// Wait for reader thread (should complete quickly after pipe is drained)
reader_handle
.join()
.map_err(|e| anyhow::anyhow!("Reader thread panicked: {:?}", e))??;
// Read results from shared state
let (uncompressed_size, digest) = {
let shared = shared.lock().unwrap();
shared.clone()
};
// Build local metadata and send to server
let mut local_metadata = metadata;
local_metadata.insert("digest_sha256".to_string(), digest);
local_metadata.insert(
"uncompressed_size".to_string(),
uncompressed_size.to_string(),
);
// Add hostname
if let Ok(hostname) = gethostname::gethostname().into_string() {
local_metadata.insert("hostname".to_string(), hostname.clone());
let short = hostname.split('.').next().unwrap_or(&hostname).to_string();
local_metadata.insert("hostname_short".to_string(), short);
}
// Send metadata to server
if !local_metadata.is_empty() {
client.post_metadata(item_info.id, &local_metadata)?;
}
// Print status to stderr
if !settings.quiet {
if std::io::stderr().is_terminal() {
eprintln!("KEEP: New item (streaming) tags: {}", tags.join(" "));
} else {
eprintln!("KEEP: New item (streaming) tags: {tags:?}");
}
}
debug!("CLIENT_SAVE: Streaming complete, {uncompressed_size} bytes uncompressed");
Ok(())
}