Schema changes:
- Rename items.size to items.uncompressed_size for clarity
- Add compressed_size (INTEGER NULL) - tracks compressed file size on disk
- Add closed (BOOLEAN NOT NULL DEFAULT 1) - tracks whether item is fully written
- Existing items default to closed=true via migration

Lifecycle:
- Items created with closed=false, set to true on successful save/import
- Compressed size captured via fs::metadata() after compression writer closes
- Truncated uploads (413) get compressed_size set, closed=true, uncompressed_size=None
- Update command now backfills both uncompressed_size and compressed_size

Also includes bug fixes and dedup from prior review:
- Fix stream_raw_content_response using uncompressed_size for raw byte Content-Length
- ApiResponse::ok()/empty() constructors, TryFrom<ItemWithMeta> for ItemInfo
- tag_names() method on ItemWithMeta, meta_filter() on Settings
- Fix .unwrap() panics in compression engine Read/Write impls
- Fix TOCTOU race in stream_raw_content_response (now uses compressed_size)
- Fix swallowed write errors in meta plugins (digest, magic_file, exec)
- Fix term::stderr().unwrap() panic in item_service
- Deduplicate ItemService::new() calls across 20 API handlers
- ImportMeta supports #[serde(alias = "size")] for backward compat

All 75 tests and 67 doc tests pass. Clippy clean.
183 lines
5.0 KiB
Rust
183 lines
5.0 KiB
Rust
use anyhow::{Context, Result, anyhow};
|
|
use log::*;
|
|
use std::fs::File;
|
|
use std::io::{Read, Write};
|
|
use std::path::PathBuf;
|
|
use std::process::{Child, Command, Stdio};
|
|
use which::which;
|
|
|
|
use crate::compression_engine::CompressionEngine;
|
|
|
|
/// Reader over the standard output of a spawned child process.
///
/// Keeps the [`Child`] handle next to its captured stdout pipe so that the
/// process can be waited on when the reader goes away.
pub struct ProgramReader {
    /// Handle of the spawned child process.
    process: Child,
    /// Captured stdout pipe of the child; reads fail with `BrokenPipe` while this is `None`.
    stdout: Option<std::process::ChildStdout>,
}
|
|
|
|
impl Read for ProgramReader {
|
|
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
|
match self.stdout.as_mut() {
|
|
Some(stdout) => stdout.read(buf),
|
|
None => Err(std::io::Error::new(
|
|
std::io::ErrorKind::BrokenPipe,
|
|
"stdout already taken",
|
|
)),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Drop for ProgramReader {
|
|
fn drop(&mut self) {
|
|
// Ensure the process is waited on to prevent zombie processes
|
|
let _ = self.process.wait();
|
|
}
|
|
}
|
|
|
|
/// Writer feeding the standard input of a spawned child process.
///
/// Keeps the [`Child`] handle next to its captured stdin pipe so that the
/// pipe can be closed and the process waited on when the writer goes away.
pub struct ProgramWriter {
    /// Handle of the spawned child process.
    process: Child,
    /// Captured stdin pipe of the child; writes fail with `BrokenPipe` while this is `None`.
    stdin: Option<std::process::ChildStdin>,
}
|
|
|
|
impl ProgramWriter {
    /// Borrows the child's stdin pipe, or fails with `BrokenPipe` when it is no longer held.
    fn stdin_pipe(&mut self) -> std::io::Result<&mut std::process::ChildStdin> {
        self.stdin.as_mut().ok_or_else(|| {
            std::io::Error::new(std::io::ErrorKind::BrokenPipe, "stdin already taken")
        })
    }
}

/// Forwards written bytes to the child process's standard input.
impl Write for ProgramWriter {
    /// Writes `buf` to the child's stdin pipe.
    ///
    /// # Errors
    ///
    /// Returns a [`std::io::ErrorKind::BrokenPipe`] error when this writer no
    /// longer holds the stdin handle; otherwise forwards the pipe's own result.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.stdin_pipe()?.write(buf)
    }

    /// Flushes the child's stdin pipe.
    ///
    /// # Errors
    ///
    /// Same conditions as [`Write::write`] on this type.
    fn flush(&mut self) -> std::io::Result<()> {
        self.stdin_pipe()?.flush()
    }
}
|
|
|
|
impl Drop for ProgramWriter {
|
|
fn drop(&mut self) {
|
|
// Close stdin to signal EOF to the child process
|
|
drop(self.stdin.take());
|
|
// Ensure the process is waited on to prevent zombie processes
|
|
let _ = self.process.wait();
|
|
}
|
|
}
|
|
|
|
/// Description of an external command-line program used as a compression engine.
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct CompressionEngineProgram {
    /// Program to execute: the resolved path when the lookup in `new` succeeds,
    /// otherwise the name exactly as it was given.
    pub program: String,
    /// Arguments passed to the program when compressing.
    pub compress: Vec<String>,
    /// Arguments passed to the program when decompressing.
    pub decompress: Vec<String>,
    /// Whether the program lookup in `new` succeeded.
    pub supported: bool,
}
|
|
|
|
impl CompressionEngineProgram {
|
|
pub fn new(
|
|
program: &str,
|
|
compress: Vec<&str>,
|
|
decompress: Vec<&str>,
|
|
) -> CompressionEngineProgram {
|
|
let program_path = which(program);
|
|
let supported = program_path.is_ok();
|
|
|
|
CompressionEngineProgram {
|
|
program: program_path
|
|
.map_or_else(|_| program.to_string(), |p| p.to_string_lossy().to_string()),
|
|
compress: compress.iter().map(|s| s.to_string()).collect(),
|
|
decompress: decompress.iter().map(|s| s.to_string()).collect(),
|
|
supported,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl CompressionEngine for CompressionEngineProgram {
|
|
fn is_supported(&self) -> bool {
|
|
self.supported
|
|
}
|
|
|
|
fn is_internal(&self) -> bool {
|
|
false
|
|
}
|
|
|
|
fn get_status_info(&self) -> (String, String, String) {
|
|
(
|
|
self.program.clone(),
|
|
self.compress.join(" "),
|
|
self.decompress.join(" "),
|
|
)
|
|
}
|
|
|
|
fn open(&self, file_path: PathBuf) -> Result<Box<dyn Read + Send>> {
|
|
debug!("COMPRESSION: Opening {file_path:?} using {self:?}");
|
|
|
|
let program = self.program.clone();
|
|
let args = self.decompress.clone();
|
|
|
|
debug!("COMPRESSION: Executing command: {program:?} {args:?} reading from {file_path:?}");
|
|
|
|
let file = File::open(file_path).context("Unable to open file for reading")?;
|
|
|
|
let mut process = Command::new(program.clone())
|
|
.args(args.clone())
|
|
.stdin(file)
|
|
.stdout(Stdio::piped())
|
|
.spawn()
|
|
.context(anyhow!(
|
|
"Unable to spawn child process: {:?} {:?}",
|
|
program,
|
|
args
|
|
))?;
|
|
|
|
let stdout = process
|
|
.stdout
|
|
.take()
|
|
.ok_or_else(|| anyhow!("Failed to capture stdout from child process"))?;
|
|
|
|
Ok(Box::new(ProgramReader {
|
|
process,
|
|
stdout: Some(stdout),
|
|
}))
|
|
}
|
|
|
|
fn create(&self, file_path: PathBuf) -> Result<Box<dyn Write>> {
|
|
debug!("COMPRESSION: Writing to {file_path:?} using {self:?}");
|
|
|
|
let program = self.program.clone();
|
|
let args = self.compress.clone();
|
|
|
|
debug!("COMPRESSION: Executing command: {program:?} {args:?} writing to {file_path:?}");
|
|
|
|
let file = File::create(file_path).context("Unable to open file for writing")?;
|
|
|
|
let mut process = Command::new(program.clone())
|
|
.args(args.clone())
|
|
.stdin(Stdio::piped())
|
|
.stdout(file)
|
|
.spawn()
|
|
.context(anyhow!(
|
|
"Problem spawning child process: {:?} {:?}",
|
|
program,
|
|
args
|
|
))?;
|
|
|
|
let stdin = process
|
|
.stdin
|
|
.take()
|
|
.ok_or_else(|| anyhow!("Failed to capture stdin from child process"))?;
|
|
|
|
Ok(Box::new(ProgramWriter {
|
|
process,
|
|
stdin: Some(stdin),
|
|
}))
|
|
}
|
|
|
|
fn clone_box(&self) -> Box<dyn CompressionEngine> {
|
|
Box::new(self.clone())
|
|
}
|
|
}
|