From 1640932148091a2c5d3c915836f26efd1a2dd247 Mon Sep 17 00:00:00 2001 From: Andrew Phillips Date: Mon, 25 Aug 2025 21:07:04 -0300 Subject: [PATCH] feat: implement streaming for large file handling Co-authored-by: aider (openai/andrew/openrouter/deepseek/deepseek-chat-v3.1) --- src/modes/get.rs | 50 +++++++++---- src/modes/server/api/item.rs | 4 +- src/services/async_item_service.rs | 104 +++++++++++++++++++--------- src/services/compression_service.rs | 15 ++++ src/services/item_service.rs | 40 +++++++++++ 5 files changed, 165 insertions(+), 48 deletions(-) diff --git a/src/modes/get.rs b/src/modes/get.rs index a4816a6..760882a 100644 --- a/src/modes/get.rs +++ b/src/modes/get.rs @@ -32,21 +32,16 @@ pub fn mode_get( } let item_service = ItemService::new(data_path); - let item_with_content = - item_service.find_item(conn, ids, tags, &meta) - .and_then(|item_with_meta| { - let item_id = item_with_meta.item.id.unwrap(); - item_service.get_item_content(conn, item_id) - }) + let item_with_meta = item_service.find_item(conn, ids, tags, &meta) .map_err(|e| anyhow!("Unable to find matching item in database: {}", e))?; - - let content = &item_with_content.content; - + + let item_id = item_with_meta.item.id.unwrap(); + // Determine if we should detect binary data let mut detect_binary = !settings.force && std::io::stdout().is_terminal(); if detect_binary { - let meta_map = item_with_content.item_with_meta.meta_as_map(); + let meta_map = item_with_meta.meta_as_map(); if let Some(binary_val) = meta_map.get("binary") { if binary_val == "false" { detect_binary = false; @@ -58,12 +53,37 @@ pub fn mode_get( } } - if detect_binary && is_binary(content) { - return Err(anyhow!( - "Refusing to output binary data to TTY, use --force to override" - )); + // Use streaming approach to handle large files + let mut reader = item_service.compression_service.stream_item_content( + data_path.join(item_id.to_string()), + &item_with_meta.item.compression + )?; + + if detect_binary { + // Read only the first 8192 bytes for binary detection + let mut sample_buffer = vec![0; 8192]; + let bytes_read = reader.read(&mut sample_buffer)?; + if is_binary(&sample_buffer[..bytes_read]) { + return Err(anyhow!( + "Refusing to output binary data to TTY, use --force to override" + )); + } + // We need to create a new reader since we consumed some bytes + reader = item_service.compression_service.stream_item_content( + data_path.join(item_id.to_string()), + &item_with_meta.item.compression + )?; } - std::io::stdout().write_all(content)?; + // Stream the content to stdout + let mut stdout = std::io::stdout(); + let mut buffer = [0; 8192]; + loop { + let bytes_read = reader.read(&mut buffer)?; + if bytes_read == 0 { + break; + } + stdout.write_all(&buffer[..bytes_read])?; + } Ok(()) } diff --git a/src/modes/server/api/item.rs b/src/modes/server/api/item.rs index 9ffa634..770a4a9 100644 --- a/src/modes/server/api/item.rs +++ b/src/modes/server/api/item.rs @@ -261,8 +261,8 @@ async fn stream_item_content_response_with_metadata( let is_binary = if let Some(binary_val) = metadata.get("binary") { binary_val == "true" } else { - // If binary metadata isn't set, we need to check the content - match item_service.get_item_content_info(item_id).await { + // If binary metadata isn't set, we need to check the content using streaming approach + match item_service.get_item_content_info_streaming(item_id).await { Ok((_, _, is_binary)) => is_binary, Err(e) => { warn!("Failed to get content info for binary check for item {}: {}", item_id, e); diff --git a/src/services/async_item_service.rs b/src/services/async_item_service.rs index 15cf1e3..0b3b991 100644 --- a/src/services/async_item_service.rs +++ b/src/services/async_item_service.rs @@ -145,51 +145,93 @@ impl AsyncItemService { let db = self.db.clone(); let item_service = self.item_service.clone(); - // Get item content - let content = tokio::task::spawn_blocking(move || { - let conn = db.blocking_lock(); - let item_with_content = item_service.get_item_content(&conn, item_id)?; - Ok::<_, CoreError>(item_with_content.content) - }) - .await - .unwrap()?; - // Use provided metadata to determine MIME type and binary status let mime_type = metadata .get("mime_type") .map(|s| s.to_string()) .unwrap_or_else(|| "application/octet-stream".to_string()); - let is_binary = if let Some(binary_val) = metadata.get("binary") { - binary_val == "true" - } else { - crate::common::is_binary::is_binary(&content) - }; - // Check if content is binary when allow_binary is false - if !allow_binary && is_binary { - return Err(CoreError::InvalidInput("Binary content not allowed".to_string())); + if !allow_binary { + let is_binary = if let Some(binary_val) = metadata.get("binary") { + binary_val == "true" + } else { + // Get binary status using streaming approach + let (_, _, is_binary) = self.get_item_content_info_streaming(item_id).await?; + is_binary + }; + + if is_binary { + return Err(CoreError::InvalidInput("Binary content not allowed".to_string())); + } } - // Create a stream that reads only the requested portion - let content_len = content.len() as u64; - - // Apply offset and length constraints - let start = std::cmp::min(offset, content_len); - let end = if length > 0 { - std::cmp::min(start + length, content_len) - } else { - content_len + // Get a streaming reader for the content + let (mut reader, content_len) = { + let db = self.db.clone(); + let item_service = self.item_service.clone(); + tokio::task::spawn_blocking(move || { + let conn = db.blocking_lock(); + let item_with_meta = item_service.get_item(&conn, item_id)?; + let item_id_val = item_with_meta.item.id.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?; + + let mut item_path = item_service.data_path.clone(); + item_path.push(item_id_val.to_string()); + + let reader = item_service.compression_service.stream_item_content( + item_path, + &item_with_meta.item.compression + )?; + + // Get content length from metadata + let content_len = item_with_meta.item.size.unwrap_or(0) as u64; + + Ok::<_, CoreError>((reader, content_len)) + }) + .await + .unwrap()? }; - let stream = if start < content_len { - let chunk = tokio_util::bytes::Bytes::from(content[start as usize..end as usize].to_vec()); - Box::pin(tokio_stream::iter(vec![Ok(chunk)])) + // Apply offset by reading and discarding bytes + if offset > 0 { + let mut remaining = offset; + let mut buf = [0; 8192]; + while remaining > 0 { + let to_read = std::cmp::min(remaining, buf.len() as u64); + let n = reader.read(&mut buf[..to_read as usize])?; + if n == 0 { + break; + } + remaining -= n as u64; + } + } + + // Create a stream that reads the content in chunks + let stream = tokio_stream::wrappers::ReaderStream::new(reader); + + // If length is specified, we need to limit the stream + let limited_stream = if length > 0 { + Box::pin(stream.take(length as usize)) } else { - Box::pin(tokio_stream::iter(vec![])) + Box::pin(stream) }; - Ok((stream, mime_type)) + Ok((limited_stream, mime_type)) + } + + pub async fn get_item_content_info_streaming( + &self, + item_id: i64, + ) -> Result<(Box, String, bool), CoreError> { + let db = self.db.clone(); + let item_service = self.item_service.clone(); + + tokio::task::spawn_blocking(move || { + let conn = db.blocking_lock(); + item_service.get_item_content_info_streaming(&conn, item_id) + }) + .await + .unwrap() } pub async fn find_item( diff --git a/src/services/compression_service.rs b/src/services/compression_service.rs index ac192db..ddfa328 100644 --- a/src/services/compression_service.rs +++ b/src/services/compression_service.rs @@ -24,6 +24,21 @@ impl CompressionService { reader.read_to_end(&mut content)?; Ok(content) } + + pub fn stream_item_content( + &self, + item_path: PathBuf, + compression: &str, + ) -> Result, CoreError> { + let compression_type = CompressionType::from_str(compression) + .map_err(|e| CoreError::Compression(e.to_string()))?; + let engine = get_compression_engine(compression_type) + .map_err(|e| CoreError::Other(anyhow!(e.to_string())))?; + + let reader = engine.open(item_path.clone()) + .map_err(|e| CoreError::Other(anyhow!("Failed to open item file {:?}: {}", item_path, e)))?; + Ok(reader) + } } impl Default for CompressionService { diff --git a/src/services/item_service.rs b/src/services/item_service.rs index d050d23..15c5cd6 100644 --- a/src/services/item_service.rs +++ b/src/services/item_service.rs @@ -85,6 +85,46 @@ impl ItemService { Ok((item_with_content.content, mime_type, is_binary)) } + pub fn get_item_content_info_streaming( + &self, + conn: &Connection, + id: i64, + ) -> Result<(Box, String, bool), CoreError> { + let item_with_meta = self.get_item(conn, id)?; + let item_id = item_with_meta.item.id.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?; + + if item_id <= 0 { + return Err(CoreError::InvalidInput(format!("Invalid item ID: {}", item_id))); + } + + let mut item_path = self.data_path.clone(); + item_path.push(item_id.to_string()); + + let reader = self.compression_service.stream_item_content(item_path, &item_with_meta.item.compression)?; + + let metadata = item_with_meta.meta_as_map(); + let mime_type = metadata + .get("mime_type") + .map(|s| s.to_string()) + .unwrap_or_else(|| "application/octet-stream".to_string()); + + // Check if content is binary using only the first 8192 bytes + let is_binary = if let Some(binary_val) = metadata.get("binary") { + binary_val == "true" + } else { + // Read only the first 8192 bytes for binary detection + let mut sample_reader = self.compression_service.stream_item_content( + item_path.clone(), + &item_with_meta.item.compression + )?; + let mut sample_buffer = vec![0; 8192]; + let bytes_read = sample_reader.read(&mut sample_buffer)?; + crate::common::is_binary::is_binary(&sample_buffer[..bytes_read]) + }; + + Ok((reader, mime_type, is_binary)) + } + pub fn find_item(&self, conn: &Connection, ids: &[i64], tags: &[String], meta: &HashMap) -> Result { debug!("ITEM_SERVICE: Finding item with ids: {:?}, tags: {:?}, meta: {:?}", ids, tags, meta); let item_maybe = match (ids.is_empty(), tags.is_empty() && meta.is_empty()) {