diff --git a/src/services/async_item_service.rs b/src/services/async_item_service.rs index 425422b..80df314 100644 --- a/src/services/async_item_service.rs +++ b/src/services/async_item_service.rs @@ -168,7 +168,7 @@ impl AsyncItemService { } // Get a streaming reader for the content - let mut reader = { + let reader = { let _db = self.db.clone(); let _item_service = self.item_service.clone(); tokio::task::spawn_blocking(move || { @@ -190,20 +190,6 @@ impl AsyncItemService { .unwrap()? }; - // Apply offset by reading and discarding bytes - if offset > 0 { - let mut remaining = offset; - let mut buf = [0; 8192]; - while remaining > 0 { - let to_read = std::cmp::min(remaining, buf.len() as u64); - let n = reader.read(&mut buf[..to_read as usize])?; - if n == 0 { - break; - } - remaining -= n as u64; - } - } - // Convert the reader into an async stream manually // Since ReaderStream requires AsyncRead, we'll create our own implementation use tokio_stream::StreamExt; @@ -214,9 +200,42 @@ impl AsyncItemService { // Spawn a blocking task to read from the reader and send chunks tokio::task::spawn_blocking(move || { + let mut reader = reader; + // Apply offset by reading and discarding bytes + if offset > 0 { + let mut remaining = offset; + let mut buf = [0; 8192]; + while remaining > 0 { + let to_read = std::cmp::min(remaining, buf.len() as u64); + match reader.read(&mut buf[..to_read as usize]) { + Ok(0) => break, // EOF reached before offset + Ok(n) => remaining -= n as u64, + Err(e) => { + let _ = tx.blocking_send(Err(e)); + return; + } + } + } + } + + // Read and send data up to the specified length + let mut remaining_length = length; let mut buffer = [0; 8192]; + loop { - match reader.read(&mut buffer) { + // Determine how much to read in this iteration + let to_read = if length > 0 { + // If length is specified, don't read more than remaining_length + std::cmp::min(remaining_length, buffer.len() as u64) as usize + } else { + buffer.len() + }; + + if to_read == 0 { + break; // We've read the requested length + } + + match reader.read(&mut buffer[..to_read]) { Ok(0) => break, // EOF Ok(n) => { let chunk = Bytes::copy_from_slice(&buffer[..n]); @@ -224,6 +243,12 @@ impl AsyncItemService { if tx.blocking_send(Ok(chunk)).is_err() { break; // Receiver dropped } + if length > 0 { + remaining_length -= n as u64; + if remaining_length == 0 { + break; // Reached the requested length + } + } } Err(e) => { let _ = tx.blocking_send(Err(e)); @@ -236,27 +261,7 @@ impl AsyncItemService { // Convert the receiver into a stream let stream = tokio_stream::wrappers::ReceiverStream::new(rx); - // If length is specified, we need to limit the stream - // Use a trait object to ensure both branches have the same type - let limited_stream: std::pin::Pin> + Send>> = - if length > 0 { - // We need to track how many bytes we've sent - let mut bytes_sent = 0; - let limited = stream.take_while(move |result| { - if bytes_sent >= length { - return false; - } - if let Ok(chunk) = result { - bytes_sent += chunk.len() as u64; - } - true - }); - Box::pin(limited) - } else { - Box::pin(stream) - }; - - Ok((limited_stream, mime_type)) + Ok((Box::pin(stream), mime_type)) } pub async fn get_item_content_info_streaming(