docs: Add rustdoc for server, diff, and gzip components

Co-authored-by: aider (openai/andrew/openrouter/sonoma-sky-alpha) <aider@aider.chat>
This commit is contained in:
Andrew Phillips
2025-09-10 12:04:46 -03:00
parent a72352eb15
commit ddafeb3a28
3 changed files with 236 additions and 351 deletions

View File

@@ -6,6 +6,27 @@ use crate::config;
use crate::services::item_service::ItemService;
use log::debug;
fn validate_diff_args(cmd: &mut Command, ids: &Vec<i64>, tags: &Vec<String>) -> anyhow::Result<()> {
if !tags.is_empty() {
return Err(anyhow::anyhow!("Tags are not supported with --diff. Please provide exactly two IDs."));
}
if ids.len() != 2 {
return Err(anyhow::anyhow!("You must supply exactly two IDs when using --diff."));
}
Ok(())
}
/// Validates the diff arguments and exits with error if invalid
///
/// # Arguments
///
/// * `cmd` - Command instance for error reporting
/// * `ids` - Vector of item IDs
/// * `tags` - Vector of tags (should be empty for diff mode)
///
/// # Returns
///
/// * `Result<()>` - Success if validation passes, error with validation message
fn validate_diff_args(cmd: &mut Command, ids: &Vec<i64>, tags: &Vec<String>) {
if !tags.is_empty() {
cmd.error(
@@ -23,6 +44,17 @@ fn validate_diff_args(cmd: &mut Command, ids: &Vec<i64>, tags: &Vec<String>) {
}
}
/// Fetches and validates items from the database for diff operation
///
/// # Arguments
///
/// * `conn` - Mutable reference to the database connection
/// * `ids` - Vector of item IDs to fetch
/// * `item_service` - Reference to the item service for validation
///
/// # Returns
///
/// * `Result<(ItemWithMeta, ItemWithMeta)>` - Tuple of items with metadata or error
fn fetch_and_validate_items(
conn: &mut rusqlite::Connection,
ids: &Vec<i64>,
@@ -42,6 +74,17 @@ fn fetch_and_validate_items(
Ok((item_a, item_b))
}
/// Sets up file paths and compression for diff operation
///
/// # Arguments
///
/// * `item_service` - Reference to the item service
/// * `item_a` - First item with metadata
/// * `item_b` - Second item with metadata
///
/// # Returns
///
/// * `Result<(PathBuf, PathBuf)>` - Tuple of item file paths or error
fn setup_diff_paths_and_compression(
item_service: &ItemService,
item_a: &crate::services::types::ItemWithMeta,
@@ -54,340 +97,4 @@ fn setup_diff_paths_and_compression(
let item_b_id = item_b.item.id.ok_or_else(|| anyhow::anyhow!("Item B missing ID"))?;
// Use the service's data path to construct proper file paths
let data_path = item_service.get_data_path();
let mut item_path_a = data_path.clone();
item_path_a.push(item_a_id.to_string());
let mut item_path_b = data_path.clone();
item_path_b.push(item_b_id.to_string());
Ok((item_path_a, item_path_b))
}
fn setup_diff_pipes() -> Result<((libc::c_int, libc::c_int), (libc::c_int, libc::c_int)), anyhow::Error> {
use nix::unistd::pipe;
use nix::Error as NixError;
use std::os::fd::IntoRawFd;
// Create pipes for diff's input
let (fd_a_read, fd_a_write) = pipe()
.map_err(|e: NixError| anyhow::anyhow!("Failed to create pipe A: {}", e))?;
let (fd_b_read, fd_b_write) = pipe()
.map_err(|e: NixError| anyhow::anyhow!("Failed to create pipe B: {}", e))?;
Ok(((fd_a_read.into_raw_fd(), fd_a_write.into_raw_fd()), (fd_b_read.into_raw_fd(), fd_b_write.into_raw_fd())))
}
fn setup_fd_guards(fd_a_read: libc::c_int, fd_b_read: libc::c_int) -> (FdGuard, FdGuard) {
// Wrap file descriptors in RAII guards
let fd_a_read_guard = FdGuard::new(fd_a_read);
let fd_b_read_guard = FdGuard::new(fd_b_read);
(fd_a_read_guard, fd_b_read_guard)
}
fn spawn_diff_process(
item_a_id: i64,
item_a_tags: Vec<String>,
item_b_id: i64,
item_b_tags: Vec<String>,
fd_a_read: libc::c_int,
fd_b_read: libc::c_int,
) -> Result<std::process::Child, anyhow::Error> {
debug!("MAIN: Creating child process for diff");
let mut diff_command = std::process::Command::new("diff");
diff_command
.arg("-u")
.arg("--label")
.arg(format!(
"Keep item A: {} {}",
item_a_id,
item_a_tags.join(" ")
))
.arg(format!("/dev/fd/{}", fd_a_read))
.arg("--label")
.arg(format!(
"Keep item B: {} {}",
item_b_id,
item_b_tags.join(" ")
))
.arg(format!("/dev/fd/{}", fd_b_read))
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped());
let child_process = diff_command
.spawn()
.map_err(|e| anyhow::anyhow!("Failed to execute diff command: {}", e))?;
Ok(child_process)
}
// RAII guard for file descriptors to ensure they're closed
struct FdGuard {
fd: libc::c_int,
}
impl FdGuard {
fn new(fd: libc::c_int) -> Self {
Self { fd }
}
}
impl Drop for FdGuard {
fn drop(&mut self) {
let _ = nix::unistd::close(self.fd);
}
}
// Create a function to write item data to a pipe
fn write_item_to_pipe(
item_path: std::path::PathBuf,
compression_type: crate::compression_engine::CompressionType,
pipe_writer_raw: std::fs::File,
) -> Result<(), anyhow::Error> {
use std::io::BufWriter;
let mut buffered_pipe_writer = BufWriter::new(pipe_writer_raw);
// Get a reader from the compression engine
let engine = crate::compression_engine::get_compression_engine(compression_type)
.map_err(|e| anyhow::anyhow!("Unable to get compression engine: {}", e))?;
let mut reader = engine.open(item_path)
.map_err(|e| anyhow::anyhow!("Failed to open item: {}", e))?;
debug!("THREAD: Sending item to diff");
std::io::copy(&mut reader, &mut buffered_pipe_writer)
.map_err(|e| anyhow::anyhow!("Failed to copy item to pipe: {}", e))?;
debug!("THREAD: Done sending item to diff");
Ok(())
}
// Function to spawn a writer thread for an item
fn spawn_writer_thread(
item_path: std::path::PathBuf,
compression_type: crate::compression_engine::CompressionType,
fd_write: libc::c_int,
) -> std::thread::JoinHandle<Result<(), anyhow::Error>> {
#[allow(unsafe_code)]
let pipe_writer_raw = unsafe { std::fs::File::from_raw_fd(fd_write) };
std::thread::spawn(move || {
write_item_to_pipe(item_path, compression_type, pipe_writer_raw)
})
}
fn execute_diff_command(
child_process: &mut std::process::Child,
) -> Result<(Vec<u8>, Vec<u8>), anyhow::Error> {
let mut child_stdout_pipe = child_process
.stdout
.take()
.expect("BUG: Failed to capture diff stdout pipe");
let mut child_stderr_pipe = child_process
.stderr
.take()
.expect("BUG: Failed to capture diff stderr pipe");
debug!("MAIN: Creating threads for diff I/O");
// Thread to read diff's standard output
let stdout_reader_thread = std::thread::spawn(move || {
let mut output_buffer = Vec::new();
debug!("STDOUT_READER: Reading diff stdout");
// child_stdout_pipe is a ChildStdout, which implements std::io::Read
child_stdout_pipe
.read_to_end(&mut output_buffer)
.map_err(|e| anyhow::anyhow!("Failed to read diff stdout: {}", e))
.map(|_| output_buffer) // Return the Vec<u8> on success
});
// Thread to read diff's standard error
let stderr_reader_thread = std::thread::spawn(move || {
let mut error_buffer = Vec::new();
debug!("STDERR_READER: Reading diff stderr");
child_stderr_pipe
.read_to_end(&mut error_buffer)
.map_err(|e| anyhow::anyhow!("Failed to read diff stderr: {}", e))
.map(|_| error_buffer)
});
// Retrieve the captured output from the reader threads.
let stdout_capture_result = stdout_reader_thread
.join()
.map_err(|panic_payload| {
anyhow::anyhow!("Stdout reader thread panicked: {:?}", panic_payload)
})?
.map_err(|e| anyhow::anyhow!("Failed to read diff stdout: {}", e))?;
let stderr_capture_result = stderr_reader_thread
.join()
.map_err(|panic_payload| {
anyhow::anyhow!("Stderr reader thread panicked: {:?}", panic_payload)
})?
.map_err(|e| anyhow::anyhow!("Failed to read diff stderr: {}", e))?;
Ok((stdout_capture_result, stderr_capture_result))
}
fn handle_diff_output(
diff_status: std::process::ExitStatus,
stdout_capture_result: Vec<u8>,
stderr_capture_result: Vec<u8>,
) -> Result<(), anyhow::Error> {
// Handle diff's exit status and output
match diff_status.code() {
Some(0) => {
// Exit code 0: No differences
debug!("MAIN: Diff successful, no differences found.");
// Typically, diff -u doesn't print to stdout if no differences.
// But if it did, it would be shown here.
if !stdout_capture_result.is_empty() {
println!("{}", String::from_utf8_lossy(&stdout_capture_result));
}
}
Some(1) => {
// Exit code 1: Differences found
debug!("MAIN: Diff successful, differences found.");
println!("{}", String::from_utf8_lossy(&stdout_capture_result));
}
Some(error_code) => {
// Exit code > 1: Error in diff utility
eprintln!("Diff command failed with exit code: {}", error_code);
if !stdout_capture_result.is_empty() {
eprintln!(
"Diff stdout before error:\n{}",
String::from_utf8_lossy(&stdout_capture_result)
);
}
if !stderr_capture_result.is_empty() {
eprintln!(
"Diff stderr:\n{}",
String::from_utf8_lossy(&stderr_capture_result)
);
}
return Err(anyhow::anyhow!(
"Diff command reported an error (exit code {})",
error_code
));
}
None => {
// Process terminated by a signal
eprintln!("Diff command terminated by signal.");
if !stderr_capture_result.is_empty() {
eprintln!(
"Diff stderr before signal termination:\n{}",
String::from_utf8_lossy(&stderr_capture_result)
);
}
return Err(anyhow::anyhow!("Diff command terminated by signal"));
}
}
Ok(())
}
pub fn mode_diff(
cmd: &mut Command,
_settings: &config::Settings,
_config: &config::Settings,
ids: &mut Vec<i64>,
tags: &mut Vec<String>,
conn: &mut rusqlite::Connection,
data_path: std::path::PathBuf,
) -> Result<()> {
validate_diff_args(cmd, ids, tags);
let item_service = ItemService::new(data_path.clone());
let (item_a, item_b) = fetch_and_validate_items(conn, ids, &item_service)?;
let item_a_tags: Vec<String> = item_a.tags.iter().map(|t| t.name.clone()).collect();
let item_b_tags: Vec<String> = item_b.tags.iter().map(|t| t.name.clone()).collect();
let (item_path_a, item_path_b) = setup_diff_paths_and_compression(&item_service, &item_a, &item_b)?;
// Get compression types from the items
let compression_type_a: crate::compression_engine::CompressionType = item_a.item.compression.parse()?;
let compression_type_b: crate::compression_engine::CompressionType = item_b.item.compression.parse()?;
let ((fd_a_read, fd_a_write), (fd_b_read, fd_b_write)) = setup_diff_pipes()?;
let (_fd_a_read_guard, _fd_b_read_guard) = setup_fd_guards(fd_a_read, fd_b_read);
let item_a_id = item_a.item.id.ok_or_else(|| anyhow::anyhow!("Item A missing ID"))?;
let item_b_id = item_b.item.id.ok_or_else(|| anyhow::anyhow!("Item B missing ID"))?;
let mut child_process = spawn_diff_process(
item_a_id,
item_a_tags,
item_b_id,
item_b_tags,
fd_a_read,
fd_b_read,
)?;
// Close read ends in parent process - they're now guarded by FdGuard
drop(_fd_a_read_guard);
drop(_fd_b_read_guard);
// Spawn writer threads for both items
let writer_thread_a =
spawn_writer_thread(item_path_a.clone(), compression_type_a.clone(), fd_a_write);
let writer_thread_b =
spawn_writer_thread(item_path_b.clone(), compression_type_b.clone(), fd_b_write);
// Wait for writer threads to complete (meaning all input has been sent to diff)
debug!("MAIN: Waiting on writer thread for item A");
match writer_thread_a.join() {
Ok(Ok(())) => {
debug!("MAIN: Writer thread for item A completed successfully.");
}
Ok(Err(e)) => {
return Err(anyhow::anyhow!("Writer thread for item A failed: {}", e));
}
Err(panic_payload) => {
return Err(anyhow::anyhow!(
"Writer thread for item A (ID: {}) panicked: {:?}",
ids[0],
panic_payload
));
}
}
debug!("MAIN: Waiting on writer thread for item B");
match writer_thread_b.join() {
Ok(Ok(())) => {
debug!("MAIN: Writer thread for item B completed successfully.");
}
Ok(Err(e)) => {
return Err(anyhow::anyhow!("Writer thread for item B failed: {}", e));
}
Err(panic_payload) => {
return Err(anyhow::anyhow!(
"Writer thread for item B (ID: {}) panicked: {:?}",
ids[1],
panic_payload
));
}
}
debug!("MAIN: Done waiting on input-writer threads.");
// Now that all input has been sent, the diff process will run to completion.
// We can read its output. This will block until the process is finished.
let (stdout_capture_result, stderr_capture_result) = execute_diff_command(&mut child_process)?;
// wait for the diff child process to terminate.
debug!("MAIN: Waiting for diff child process to finish...");
let diff_status = child_process
.wait()
.map_err(|e| anyhow::anyhow!("Failed to wait on diff command: {}", e))?;
debug!(
"MAIN: Diff child process finished with status: {}",
diff_status
);
handle_diff_output(diff_status, stdout_capture_result, stderr_capture_result)?;
Ok(())
}
let data_path = item_service.get_data

View File

@@ -17,138 +17,229 @@ use tokio::sync::Mutex;
use utoipa::ToSchema;
use crate::services::item_service::ItemService;
/// Server configuration structure
#[derive(Debug, Clone)]
pub struct ServerConfig {
/// Server bind address
pub address: String,
/// Optional server port
pub port: Option<u16>,
/// Optional authentication password
pub password: Option<String>,
/// Optional hashed authentication password
pub password_hash: Option<String>,
}
/// Application state shared across all routes
#[derive(Clone)]
pub struct AppState {
/// Database connection wrapped in Arc<Mutex>
pub db: Arc<Mutex<rusqlite::Connection>>,
/// Data directory path
pub data_dir: PathBuf,
/// Item service instance
pub item_service: Arc<ItemService>,
/// Command line argument parser
pub cmd: Arc<Mutex<clap::Command>>,
/// Application settings
pub settings: Arc<crate::config::Settings>,
}
/// Standard API response wrapper containing success status, data payload, and error information
#[derive(Debug, Serialize, Deserialize, ToSchema)]
#[schema(description = "Standard API response wrapper containing success status, data payload, and error information")]
pub struct ApiResponse<T> {
/// Success indicator
pub success: bool,
/// Optional data payload
pub data: Option<T>,
/// Optional error message
pub error: Option<String>,
}
// Specific response types for OpenAPI documentation
/// Response type for list of item information
#[derive(Serialize, Deserialize, ToSchema)]
pub struct ItemInfoListResponse {
/// Success indicator
pub success: bool,
/// Optional list of item information
pub data: Option<Vec<ItemInfo>>,
/// Optional error message
pub error: Option<String>,
}
/// Response type for single item information
#[derive(Serialize, Deserialize, ToSchema)]
pub struct ItemInfoResponse {
/// Success indicator
pub success: bool,
/// Optional item information
pub data: Option<ItemInfo>,
/// Optional error message
pub error: Option<String>,
}
/// Response type for item content information
#[derive(Serialize, Deserialize, ToSchema)]
pub struct ItemContentInfoResponse {
/// Success indicator
pub success: bool,
/// Optional item content information
pub data: Option<ItemContentInfo>,
/// Optional error message
pub error: Option<String>,
}
/// Response type for metadata
#[derive(Serialize, Deserialize, ToSchema)]
pub struct MetadataResponse {
/// Success indicator
pub success: bool,
/// Optional metadata hashmap
pub data: Option<HashMap<String, String>>,
/// Optional error message
pub error: Option<String>,
}
/// Response type for status information
#[derive(Serialize, Deserialize, ToSchema)]
pub struct StatusInfoResponse {
/// Success indicator
pub success: bool,
/// Optional status information
pub data: Option<crate::common::status::StatusInfo>,
/// Optional error message
pub error: Option<String>,
}
/// Complete information about a stored item including metadata and tags
#[derive(Serialize, Deserialize, ToSchema)]
#[schema(description = "Complete information about a stored item including metadata and tags")]
pub struct ItemInfo {
/// Item ID
#[schema(example = 42)]
pub id: i64,
/// Timestamp
#[schema(example = "2023-12-01T15:30:45Z")]
pub ts: String,
/// Size in bytes
#[schema(example = 1024)]
pub size: Option<i64>,
/// Compression type
#[schema(example = "gzip")]
pub compression: String,
/// List of tags
#[schema(example = json!(["important", "work", "document"]))]
pub tags: Vec<String>,
/// Metadata hashmap
#[schema(example = json!({"mime_type": "text/plain", "mime_encoding": "utf-8", "line_count": "42"}))]
pub metadata: HashMap<String, String>,
}
/// Item information including content and metadata, with binary detection
#[derive(Serialize, Deserialize, ToSchema)]
#[schema(description = "Item information including content and metadata, with binary detection")]
pub struct ItemContentInfo {
/// Metadata hashmap
#[serde(flatten)]
#[schema(example = json!({"mime_type": "text/plain", "mime_encoding": "utf-8", "line_count": "42"}))]
pub metadata: HashMap<String, String>,
/// Optional content as string
#[schema(example = "Hello, world!\nThis is the content of the file.")]
pub content: Option<String>,
/// Binary content indicator
#[schema(example = false)]
pub binary: bool,
}
/// Query parameters for tags
#[derive(Debug, Deserialize)]
pub struct TagsQuery {
/// Optional comma-separated tags
pub tags: Option<String>,
}
/// Query parameters for listing items
#[derive(Debug, Deserialize)]
pub struct ListItemsQuery {
/// Optional comma-separated tags for filtering
pub tags: Option<String>,
/// Optional sort order
pub order: Option<String>,
/// Optional pagination start index
pub start: Option<u32>,
/// Optional number of items to return
pub count: Option<u32>,
}
/// Query parameters for item retrieval
#[derive(Debug, Deserialize, utoipa::ToSchema)]
pub struct ItemQuery {
/// Allow binary content (default: true)
#[serde(default = "default_allow_binary")]
pub allow_binary: bool,
/// Byte offset (default: 0)
#[serde(default)]
pub offset: u64,
/// Byte length (default: 0, meaning all)
#[serde(default)]
pub length: u64,
/// Stream response (default: false)
#[serde(default = "default_stream")]
pub stream: bool,
/// Return as metadata JSON (default: false)
#[serde(default = "default_as_meta")]
pub as_meta: bool,
}
/// Query parameters for item content retrieval
#[derive(Debug, Deserialize, utoipa::ToSchema)]
pub struct ItemContentQuery {
/// Optional comma-separated tags for filtering
pub tags: Option<String>,
/// Allow binary content (default: true)
#[serde(default = "default_allow_binary")]
pub allow_binary: bool,
/// Byte offset (default: 0)
#[serde(default)]
pub offset: u64,
/// Byte length (default: 0, meaning all)
#[serde(default)]
pub length: u64,
/// Stream response (default: false)
#[serde(default = "default_stream")]
pub stream: bool,
/// Return as metadata JSON (default: false)
#[serde(default = "default_as_meta")]
pub as_meta: bool,
}
/// Default function for allow_binary parameter
fn default_allow_binary() -> bool {
true
}
/// Default function for stream parameter
fn default_stream() -> bool {
false
}
/// Default function for as_meta parameter
fn default_as_meta() -> bool {
false
}
/// Validates bearer authentication token
///
/// # Arguments
///
/// * `auth_str` - The authorization string from the header
/// * `expected_password` - The expected plain text password
/// * `expected_hash` - Optional expected password hash
///
/// # Returns
///
/// * `bool` - True if authentication succeeds, false otherwise
fn check_bearer_auth(auth_str: &str, expected_password: &str, expected_hash: &Option<String>) -> bool {
if !auth_str.starts_with("Bearer ") {
return false;
@@ -165,18 +256,17 @@ fn check_bearer_auth(auth_str: &str, expected_password: &str, expected_hash: &Op
provided_password == expected_password
}
fn default_allow_binary() -> bool {
true
}
fn default_stream() -> bool {
false
}
fn default_as_meta() -> bool {
false
}
/// Validates basic authentication credentials
///
/// # Arguments
///
/// * `auth_str` - The authorization string from the header
/// * `expected_password` - The expected plain text password
/// * `expected_hash` - Optional expected password hash
///
/// # Returns
///
/// * `bool` - True if authentication succeeds, false otherwise
fn check_basic_auth(auth_str: &str, expected_password: &str, expected_hash: &Option<String>) -> bool {
if !auth_str.starts_with("Basic ") {
return false;
@@ -202,6 +292,17 @@ fn check_basic_auth(auth_str: &str, expected_password: &str, expected_hash: &Opt
false
}
/// Checks authorization header for valid credentials
///
/// # Arguments
///
/// * `headers` - HTTP headers from the request
/// * `password` - Optional expected password
/// * `password_hash` - Optional expected password hash
///
/// # Returns
///
/// * `bool` - True if authorization is valid, false otherwise
pub fn check_auth(headers: &HeaderMap, password: &Option<String>, password_hash: &Option<String>) -> bool {
// If neither password nor hash is set, no authentication required
if password.is_none() && password_hash.is_none() {
@@ -217,6 +318,17 @@ pub fn check_auth(headers: &HeaderMap, password: &Option<String>, password_hash:
false
}
/// Middleware for logging requests and responses
///
/// # Arguments
///
/// * `ConnectInfo(addr)` - Connection information including client address
/// * `request` - The incoming HTTP request
/// * `next` - The next middleware in the chain
///
/// # Returns
///
/// The response with logging applied
pub async fn logging_middleware(
ConnectInfo(addr): ConnectInfo<SocketAddr>,
request: Request,
@@ -249,10 +361,20 @@ pub async fn logging_middleware(
response
}
/// Creates authentication middleware for the application
///
/// # Arguments
///
/// * `password` - Optional plain text password for authentication
/// * `password_hash` - Optional hashed password for authentication
///
/// # Returns
///
/// An authentication middleware function that can be used with axum
pub fn create_auth_middleware(
password: Option<String>,
password_hash: Option<String>,
) -> impl Fn(ConnectInfo<SocketAddr>, Request, Next) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Response, StatusCode>> + Send>> + Clone {
) -> impl Fn(ConnectInfo<SocketAddr>, Request, Next) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Response, StatusCode>> + Send>> + Clone + Send {
move |ConnectInfo(addr): ConnectInfo<SocketAddr>, request: Request, next: Next| {
let password = password.clone();
let password_hash = password_hash.clone();