use anyhow::{anyhow, Result}; use clap::Command; use log::debug; use std::io::{Read, Write, IsTerminal}; // Import the missing functions from common module use crate::modes::common::{settings_digest_type, settings_compression_type, settings_meta_plugin_types}; use crate::config; fn validate_save_args(cmd: &mut Command, ids: &Vec) { if !ids.is_empty() { cmd.error( clap::error::ErrorKind::InvalidValue, "ID given, you cannot supply IDs when using --save", ) .exit(); } } fn initialize_tags(tags: &mut Vec) { if tags.is_empty() { tags.push("none".to_string()); } } fn setup_compression_and_plugins( cmd: &mut Command, settings: &config::Settings, ) -> (crate::compression_engine::CompressionType, Box, Vec>) { let digest_type = settings_digest_type(cmd, settings); debug!("MAIN: Digest type: {:?}", digest_type); let compression_type = settings_compression_type(cmd, settings); debug!("MAIN: Compression type: {:?}", compression_type); let compression_engine = crate::compression_engine::get_compression_engine(compression_type.clone()).expect("Unable to get compression engine"); // Start with meta plugin types from settings let mut meta_plugin_types: Vec = settings_meta_plugin_types(cmd, settings); debug!("MAIN: Meta plugin types: {:?}", meta_plugin_types); // Convert digest type to meta plugin type and add to the list if needed let digest_meta_plugin_type = match digest_type { crate::meta_plugin::MetaPluginType::DigestSha256 => Some(crate::meta_plugin::MetaPluginType::DigestSha256), crate::meta_plugin::MetaPluginType::DigestMd5 => Some(crate::meta_plugin::MetaPluginType::DigestMd5), _ => None, }; // Add digest meta plugin to the list if needed if let Some(digest_plugin_type) = digest_meta_plugin_type { if !meta_plugin_types.contains(&digest_plugin_type) { meta_plugin_types.push(digest_plugin_type); } } // Initialize meta_plugins with MetaPlugin instances for each MetaPluginType let mut meta_plugins: Vec> = meta_plugin_types .iter() .map(|meta_plugin_type| crate::meta_plugin::get_meta_plugin(meta_plugin_type.clone())) .collect(); // Check for unsupported meta plugins, warn the user, and remove them from the list let mut i = 0; meta_plugins.retain(|meta_plugin| { let is_supported = meta_plugin.is_supported(); if !is_supported { // We need to get the meta name for the warning message // Since we can't mutably borrow meta_plugin here, we create a temporary one let meta_plugin_type = meta_plugin_types[i].clone(); let mut temp_plugin = crate::meta_plugin::get_meta_plugin(meta_plugin_type); eprintln!("Warning: Meta plugin '{}' is enabled but not supported on this system", temp_plugin.meta_name()); } i += 1; is_supported }); (compression_type, compression_engine, meta_plugins) } fn create_and_log_item( conn: &mut rusqlite::Connection, settings: &config::Settings, tags: &Vec, compression_type: &crate::compression_engine::CompressionType, ) -> Result { let mut item = crate::db::Item { id: None, ts: chrono::Utc::now(), size: None, compression: compression_type.to_string(), }; let id = crate::db::insert_item(conn, item.clone())?; item.id = Some(id); debug!("MAIN: Added item {:?}", item.clone()); if !settings.quiet { if std::io::stderr().is_terminal() { let mut t = term::stderr().unwrap(); t.reset().unwrap_or(()); t.attr(term::Attr::Bold).unwrap_or(()); write!(t, "KEEP:").unwrap_or(()); t.reset().unwrap_or(()); write!(t, " New item ").unwrap_or(()); t.attr(term::Attr::Bold).unwrap_or(()); write!(t, "{id}")?; t.reset().unwrap_or(()); write!(t, " tags: ")?; t.attr(term::Attr::Bold).unwrap_or(()); write!(t, "{}", tags.join(" "))?; t.reset().unwrap_or(()); writeln!(t)?; std::io::stderr().flush()?; } else { let mut t = std::io::stderr(); writeln!(t, "KEEP: New item: {} tags: {:?}", id, tags)?; } } Ok(item) } fn setup_item_metadata( conn: &mut rusqlite::Connection, _settings: &config::Settings, item: &crate::db::Item, tags: &Vec, ) -> Result<(), anyhow::Error> { crate::db::set_item_tags(conn, item.clone(), tags)?; Ok(()) } fn collect_item_meta(settings: &config::Settings) -> std::collections::HashMap { let mut item_meta: std::collections::HashMap = crate::modes::common::get_meta_from_env(); if let Ok(hostname) = gethostname::gethostname().into_string() { if !item_meta.contains_key("hostname") { item_meta.insert("hostname".to_string(), hostname); } } for item in settings.meta.iter() { let item = item.clone(); item_meta.insert(item.key, item.value); } item_meta } fn process_input_stream( compression_engine: &Box, data_path: &std::path::PathBuf, item_id: i64, meta_plugins: &mut Vec>, ) -> Result<(Box, crate::db::Item), anyhow::Error> { let mut item = crate::db::Item { id: Some(item_id), ts: chrono::Utc::now(), size: None, compression: String::new(), // Will be set later }; let mut item_path = data_path.clone(); item_path.push(item_id.to_string()); let mut stdin = std::io::stdin().lock(); let mut stdout = std::io::stdout().lock(); let mut buffer = [0; libc::BUFSIZ as usize]; let mut item_out: Box = compression_engine .create(item_path.clone()) .map_err(|e| anyhow!("Unable to write file {:?}: {}", item_path, e))?; debug!("MAIN: Starting IO loop"); loop { let n = stdin.read(&mut buffer[..libc::BUFSIZ as usize])?; item.size = match item.size { None => Some(n as i64), Some(prev_n) => Some(prev_n + n as i64), }; if n == 0 { debug!("MAIN: EOF on STDIN"); break; } debug!("MAIN: Loop - {:?} bytes", item.size); stdout.write_all(&buffer[..n])?; item_out.write_all(&buffer[..n])?; for meta_plugin in meta_plugins.iter_mut() { meta_plugin.update(&buffer[..n]); } } debug!("MAIN: Ending IO loop after {:?} bytes", item.size); stdout.flush()?; item_out.flush()?; Ok((item_out, item)) } fn finalize_meta_plugins( conn: &rusqlite::Connection, meta_plugins: &mut Vec>, item: &crate::db::Item, ) -> Result<(), anyhow::Error> { for meta_plugin in meta_plugins.iter_mut() { let meta_name = meta_plugin.meta_name(); match meta_plugin.finalize() { Ok(meta_value) => { let meta = crate::db::Meta { id: item.id.ok_or_else(|| anyhow!("Item missing ID"))?, name: meta_name.clone(), value: meta_value, }; if let Err(e) = crate::db::store_meta(conn, meta) { eprintln!("Warning: Failed to store meta value for {}: {}", meta_name, e); } } Err(e) => { eprintln!("Warning: Failed to finalize meta plugin {}: {}", meta_name, e); } } } Ok(()) } pub fn mode_save( cmd: &mut Command, settings: &config::Settings, _config: &config::Settings, ids: &mut Vec, tags: &mut Vec, conn: &mut rusqlite::Connection, data_path: std::path::PathBuf, ) -> Result<(), anyhow::Error> { validate_save_args(cmd, ids); initialize_tags(tags); let (compression_type, compression_engine, mut meta_plugins) = setup_compression_and_plugins(cmd, settings); let mut item = create_and_log_item(conn, settings, tags, &compression_type)?; setup_item_metadata(conn, settings, &item, tags)?; // Pass mutable reference // Save as much as possible in case something breaks - don't use transactions // This allows partial saves to succeed even if some metadata operations fail let item_meta = collect_item_meta(settings); let item_id = item.id.ok_or_else(|| anyhow!("Item missing ID"))?; for kv in item_meta.iter() { let meta = crate::db::Meta { id: item_id, name: kv.0.to_string(), value: kv.1.to_string(), }; crate::db::store_meta(conn, meta)?; } let (_item_out, processed_item) = process_input_stream( &compression_engine, &data_path, item_id, &mut meta_plugins, )?; item.size = processed_item.size; item.compression = compression_type.to_string(); finalize_meta_plugins(conn, &mut meta_plugins, &item)?; crate::db::update_item(conn, item.clone())?; Ok(()) }