feat: implement core services and refactor modes

Co-authored-by: aider (openai/andrew/openrouter/google/gemini-2.5-pro) <aider@aider.chat>
This commit is contained in:
Andrew Phillips
2025-08-24 23:56:06 -03:00
parent 437a05e5d6
commit 7ec0603e00
11 changed files with 659 additions and 466 deletions

View File

@@ -1,11 +1,9 @@
use anyhow::{anyhow, Result};
use anyhow::Result;
use clap::Command;
use log::debug;
use std::io::{Read, Write, IsTerminal};
use std::io::{Read, Write};
// Import the missing functions from common module
use crate::modes::common::{settings_compression_type, settings_meta_plugin_types};
use crate::config;
use crate::core::item_service::ItemService;
fn validate_save_args(cmd: &mut Command, ids: &Vec<i64>) {
if !ids.is_empty() {
@@ -17,209 +15,20 @@ fn validate_save_args(cmd: &mut Command, ids: &Vec<i64>) {
}
}
fn initialize_tags(tags: &mut Vec<String>) {
if tags.is_empty() {
tags.push("none".to_string());
}
// Tee reader that writes to a writer as it is read
struct TeeReader<R: Read, W: Write> {
reader: R,
writer: W,
}
fn setup_compression_and_plugins(
cmd: &mut Command,
settings: &config::Settings,
) -> (crate::compression_engine::CompressionType, Box<dyn crate::compression_engine::CompressionEngine>, Vec<Box<dyn crate::meta_plugin::MetaPlugin>>) {
let compression_type = settings_compression_type(cmd, settings);
debug!("MAIN: Compression type: {:?}", compression_type);
let compression_engine =
crate::compression_engine::get_compression_engine(compression_type.clone()).expect("Unable to get compression engine");
// Get meta plugin types from settings
let meta_plugin_types: Vec<crate::meta_plugin::MetaPluginType> = settings_meta_plugin_types(cmd, settings);
debug!("MAIN: Meta plugin types: {:?}", meta_plugin_types);
// Initialize meta_plugins with MetaPlugin instances for each MetaPluginType
let mut meta_plugins: Vec<Box<dyn crate::meta_plugin::MetaPlugin>> = meta_plugin_types
.iter()
.map(|meta_plugin_type| crate::meta_plugin::get_meta_plugin(meta_plugin_type.clone()))
.collect();
// Configure meta plugins with their options and outputs from settings
if let Some(meta_plugin_configs) = &settings.meta_plugins {
for meta_plugin in meta_plugins.iter_mut() {
let plugin_name = meta_plugin.meta_name();
if let Some(config) = meta_plugin_configs.iter().find(|c| c.name == plugin_name) {
// Start with default outputs and options, then apply configuration on top
let mut configured_outputs = meta_plugin.outputs().clone();
for (key, value) in &config.outputs {
configured_outputs.insert(key.clone(), serde_yaml::Value::String(value.clone()));
}
let mut configured_options = meta_plugin.default_options();
for (key, value) in &config.options {
configured_options.insert(key.clone(), value.clone());
}
// Apply the combined configuration
if let Err(e) = meta_plugin.configure_outputs(&configured_outputs) {
eprintln!("Warning: Failed to configure outputs for meta plugin '{}': {}", plugin_name, e);
}
if let Err(e) = meta_plugin.configure_options(&configured_options) {
eprintln!("Warning: Failed to configure options for meta plugin '{}': {}", plugin_name, e);
}
}
impl<R: Read, W: Write> Read for TeeReader<R, W> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let n = self.reader.read(buf)?;
if n > 0 {
self.writer.write_all(&buf[..n])?;
}
Ok(n)
}
// Check for unsupported meta plugins, warn the user, and remove them from the list
let mut i = 0;
meta_plugins.retain(|meta_plugin| {
let is_supported = meta_plugin.is_supported();
if !is_supported {
// We need to get the meta name for the warning message
// Since we can't mutably borrow meta_plugin here, we create a temporary one
let meta_plugin_type = meta_plugin_types[i].clone();
let mut temp_plugin = crate::meta_plugin::get_meta_plugin(meta_plugin_type);
eprintln!("Warning: Meta plugin '{}' is enabled but not supported on this system", temp_plugin.meta_name());
}
i += 1;
is_supported
});
(compression_type, compression_engine, meta_plugins)
}
fn create_and_log_item(
conn: &mut rusqlite::Connection,
settings: &config::Settings,
tags: &Vec<String>,
compression_type: &crate::compression_engine::CompressionType,
) -> Result<crate::db::Item, anyhow::Error> {
let mut item = crate::db::Item {
id: None,
ts: chrono::Utc::now(),
size: None,
compression: compression_type.to_string(),
};
let id = crate::db::insert_item(conn, item.clone())?;
item.id = Some(id);
debug!("MAIN: Added item {:?}", item.clone());
if !settings.quiet {
if std::io::stderr().is_terminal() {
let mut t = term::stderr().unwrap();
t.reset().unwrap_or(());
t.attr(term::Attr::Bold).unwrap_or(());
write!(t, "KEEP:").unwrap_or(());
t.reset().unwrap_or(());
write!(t, " New item ").unwrap_or(());
t.attr(term::Attr::Bold).unwrap_or(());
write!(t, "{id}")?;
t.reset().unwrap_or(());
write!(t, " tags: ")?;
t.attr(term::Attr::Bold).unwrap_or(());
write!(t, "{}", tags.join(" "))?;
t.reset().unwrap_or(());
writeln!(t)?;
std::io::stderr().flush()?;
} else {
let mut t = std::io::stderr();
writeln!(t, "KEEP: New item: {} tags: {:?}", id, tags)?;
}
}
Ok(item)
}
fn setup_item_metadata(
conn: &mut rusqlite::Connection,
_settings: &config::Settings,
item: &crate::db::Item,
tags: &Vec<String>,
) -> Result<(), anyhow::Error> {
crate::db::set_item_tags(conn, item.clone(), tags)?;
Ok(())
}
fn collect_item_meta(_settings: &config::Settings) -> std::collections::HashMap<String, String> {
let mut item_meta: std::collections::HashMap<String, String> = crate::modes::common::get_meta_from_env();
if let Ok(hostname) = gethostname::gethostname().into_string() {
if !item_meta.contains_key("hostname") {
item_meta.insert("hostname".to_string(), hostname);
}
}
// Add any additional metadata from settings if needed
// (currently there's no direct metadata in settings, but this could be extended)
item_meta
}
fn process_input_stream(
compression_engine: &Box<dyn crate::compression_engine::CompressionEngine>,
data_path: &std::path::PathBuf,
item_id: i64,
meta_plugins: &mut Vec<Box<dyn crate::meta_plugin::MetaPlugin>>,
conn: &rusqlite::Connection,
) -> Result<(Box<dyn std::io::Write>, crate::db::Item), anyhow::Error> {
let mut item = crate::db::Item {
id: Some(item_id),
ts: chrono::Utc::now(),
size: None,
compression: String::new(), // Will be set later
};
let mut item_path = data_path.clone();
item_path.push(item_id.to_string());
let mut stdin = std::io::stdin().lock();
let mut stdout = std::io::stdout().lock();
let mut buffer = [0; libc::BUFSIZ as usize];
let mut item_out: Box<dyn std::io::Write> =
compression_engine
.create(item_path.clone())
.map_err(|e| anyhow!("Unable to write file {:?}: {}", item_path, e))?;
let mut total_bytes = 0;
debug!("MAIN: Starting IO loop");
loop {
let n = stdin.read(&mut buffer[..libc::BUFSIZ as usize])?;
item.size = match item.size {
None => Some(n as i64),
Some(prev_n) => Some(prev_n + n as i64),
};
if n == 0 {
debug!("MAIN: EOF on STDIN");
break;
}
total_bytes += n;
debug!("MAIN: Loop - {:?} bytes (total: {})", item.size, total_bytes);
stdout.write_all(&buffer[..n])?;
item_out.write_all(&buffer[..n])?;
// Process data with meta plugins
for meta_plugin in meta_plugins.iter_mut() {
meta_plugin.update(&buffer[..n], conn);
}
}
debug!("MAIN: Ending IO loop after {:?} bytes", item.size);
// Finalize meta plugins
for meta_plugin in meta_plugins.iter_mut() {
if let Err(e) = meta_plugin.finalize(conn) {
eprintln!("Warning: Failed to finalize meta plugin: {}", e);
}
}
stdout.flush()?;
item_out.flush()?;
Ok((item_out, item))
}
pub fn mode_save(
@@ -231,46 +40,18 @@ pub fn mode_save(
data_path: std::path::PathBuf,
) -> Result<(), anyhow::Error> {
validate_save_args(cmd, ids);
initialize_tags(tags);
let (compression_type, compression_engine, mut meta_plugins) = setup_compression_and_plugins(cmd, settings);
let item_service = ItemService::new(data_path);
let mut item = create_and_log_item(conn, settings, tags, &compression_type)?;
setup_item_metadata(conn, settings, &item, tags)?; // Pass mutable reference
let stdin = std::io::stdin();
let stdout = std::io::stdout();
// Save as much as possible in case something breaks - don't use transactions
// This allows partial saves to succeed even if some metadata operations fail
let item_meta = collect_item_meta(settings);
let item_id = item.id.ok_or_else(|| anyhow!("Item missing ID"))?;
let tee_reader = TeeReader {
reader: stdin.lock(),
writer: stdout.lock(),
};
for kv in item_meta.iter() {
let meta = crate::db::Meta {
id: item_id,
name: kv.0.to_string(),
value: kv.1.to_string(),
};
crate::db::store_meta(conn, meta)?;
}
// Initialize meta plugins with database connection
for meta_plugin in meta_plugins.iter_mut() {
if let Err(e) = meta_plugin.initialize(conn, item_id) {
eprintln!("Warning: Failed to initialize meta plugin: {}", e);
}
}
let (_item_out, processed_item) = process_input_stream(
&compression_engine,
&data_path,
item_id,
&mut meta_plugins,
conn,
)?;
item.size = processed_item.size;
item.compression = compression_type.to_string();
crate::db::update_item(conn, item.clone())?;
item_service.save_item(tee_reader, cmd, settings, tags, conn)?;
Ok(())
}