diff --git a/src/services/item_service.rs b/src/services/item_service.rs new file mode 100644 index 0000000..73a24f8 --- /dev/null +++ b/src/services/item_service.rs @@ -0,0 +1,262 @@ +use crate::config::Settings; +use crate::services::compression_service::CompressionService; +use crate::services::error::CoreError; +use crate::services::meta_service::MetaService; +use crate::services::types::{ItemWithContent, ItemWithMeta}; +use crate::meta_plugin::{get_meta_plugin, MetaPlugin, MetaPluginType}; +use crate::db::{self, Meta}; +use crate::compression_engine::{get_compression_engine, CompressionType}; +use crate::modes::common::settings_compression_type; +use clap::Command; +use rusqlite::Connection; +use std::collections::HashMap; +use std::fs; +use std::io::{IsTerminal, Read, Write}; +use std::path::PathBuf; + +pub struct ItemService { + data_path: PathBuf, + compression_service: CompressionService, + meta_service: MetaService, +} + +impl ItemService { + pub fn new(data_path: PathBuf) -> Self { + Self { + data_path, + compression_service: CompressionService::new(), + meta_service: MetaService::new(), + } + } + + pub fn get_item(&self, conn: &Connection, id: i64) -> Result { + let item = db::get_item(conn, id)?.ok_or(CoreError::ItemNotFound(id))?; + let tags = db::get_item_tags(conn, &item)?; + let meta = db::get_item_meta(conn, &item)?; + Ok(ItemWithMeta { item, tags, meta }) + } + + pub fn get_item_content(&self, conn: &Connection, id: i64) -> Result { + let item_with_meta = self.get_item(conn, id)?; + let item_id = item_with_meta.item.id.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?; + + if item_id <= 0 { + return Err(CoreError::InvalidInput(format!("Invalid item ID: {}", item_id))); + } + + let mut item_path = self.data_path.clone(); + item_path.push(item_id.to_string()); + + let content = self + .compression_service + .get_item_content(item_path, &item_with_meta.item.compression)?; + + Ok(ItemWithContent { + item_with_meta, + content, + }) + } + + pub fn find_item(&self, conn: &Connection, ids: &[i64], tags: &[String], meta: &HashMap) -> Result { + let item_maybe = match (ids.is_empty(), tags.is_empty() && meta.is_empty()) { + (false, _) => db::get_item(conn, ids[0])?, + (true, true) => db::get_item_last(conn)?, + (true, false) => db::get_item_matching(conn, &tags.to_vec(), meta)? + }; + + let item = item_maybe.ok_or(CoreError::ItemNotFoundGeneric)?; + let item_id = item.id.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?; + self.get_item(conn, item_id) + } + + pub fn list_items(&self, conn: &Connection, tags: &[String], meta: &HashMap) -> Result, CoreError> { + let items = db::get_items_matching(conn, &tags.to_vec(), meta)?; + + let item_ids: Vec = items.iter().filter_map(|item| item.id).collect(); + if item_ids.is_empty() { + return Ok(Vec::new()); + } + + let tags_map = db::get_tags_for_items(conn, &item_ids)?; + let meta_map_db = db::get_meta_for_items(conn, &item_ids)?; + + let mut result = Vec::new(); + for item in items { + let item_id = item.id.unwrap(); + let tags = tags_map.get(&item_id).cloned().unwrap_or_default(); + let meta_hm = meta_map_db.get(&item_id).cloned().unwrap_or_default(); + let meta = meta_hm.into_iter().map(|(name, value)| Meta { id: item_id, name, value }).collect(); + + result.push(ItemWithMeta { item, tags, meta }); + } + + Ok(result) + } + + pub fn delete_item(&self, conn: &mut Connection, id: i64) -> Result<(), CoreError> { + if id <= 0 { + return Err(CoreError::InvalidInput(format!("Invalid item ID: {}", id))); + } + let item = db::get_item(conn, id)?.ok_or(CoreError::ItemNotFound(id))?; + + let mut item_path = self.data_path.clone(); + item_path.push(id.to_string()); + + let tx = conn.transaction()?; + db::delete_item(&tx, item)?; + fs::remove_file(&item_path).or_else(|e| if e.kind() == std::io::ErrorKind::NotFound { Ok(()) } else { Err(e) })?; + tx.commit()?; + + Ok(()) + } + + pub fn save_item( + &self, + mut input: R, + cmd: &mut Command, + settings: &Settings, + tags: &mut Vec, + conn: &mut Connection, + ) -> Result { + if tags.is_empty() { + tags.push("none".to_string()); + } + + let compression_type = settings_compression_type(cmd, settings); + let compression_engine = get_compression_engine(compression_type.clone())?; + + let tx = conn.transaction()?; + + let item_id; + let mut item; + { + item = db::create_item(&tx, compression_type.clone())?; + item_id = item.id.unwrap(); + db::set_item_tags(&tx, item.clone(), tags)?; + let item_meta = self.meta_service.collect_initial_meta(); + for (k, v) in item_meta.iter() { + db::add_meta(&tx, item_id, k, v)?; + } + } + + let mut plugins = self.meta_service.get_plugins(cmd, settings); + self.meta_service.initialize_plugins(&mut plugins, &tx, item_id); + + let mut item_path = self.data_path.clone(); + item_path.push(item_id.to_string()); + + let mut item_out = compression_engine.create(item_path.clone())?; + + let mut buffer = [0; 8192]; + let mut total_bytes = 0; + + loop { + let n = input.read(&mut buffer)?; + if n == 0 { break; } + + total_bytes += n as i64; + item_out.write_all(&buffer[..n])?; + self.meta_service.process_chunk(&mut plugins, &buffer[..n], &tx); + } + + item_out.flush()?; + drop(item_out); + + self.meta_service.finalize_plugins(&mut plugins, &tx); + + item.size = Some(total_bytes); + db::update_item(&tx, item.clone())?; + + tx.commit()?; + + if !settings.quiet { + if std::io::stderr().is_terminal() { + let mut t = term::stderr().unwrap(); + let _ = t.reset(); + let _ = t.attr(term::Attr::Bold); + let _ = write!(t, "KEEP:"); + let _ = t.reset(); + let _ = write!(t, " New item "); + let _ = t.attr(term::Attr::Bold); + let _ = write!(t, "{item_id}"); + let _ = t.reset(); + let _ = write!(t, " tags: "); + let _ = t.attr(term::Attr::Bold); + let _ = write!(t, "{}", tags.join(" ")); + let _ = t.reset(); + let _ = writeln!(t); + let _ = std::io::stderr().flush(); + } else { + let mut t = std::io::stderr(); + let _ = writeln!(t, "KEEP: New item: {} tags: {:?}", item_id, tags); + } + } + + self.get_item(conn, item_id) + } + + pub fn save_item_from_mcp( + &self, + content: &[u8], + tags: &Vec, + metadata: &HashMap, + conn: &mut Connection, + ) -> Result { + let compression_type = CompressionType::LZ4; + let compression_engine = get_compression_engine(compression_type.clone())?; + + let tx = conn.transaction()?; + let item_id; + let mut item; + + { + item = db::create_item(&tx, compression_type.clone())?; + item_id = item.id.unwrap(); + + // Add tags + for tag in tags { + db::add_tag(&tx, item_id, tag)?; + } + + // Add custom metadata + for (key, value) in metadata { + db::add_meta(&tx, item_id, key, value)?; + } + } + + let mut item_path = self.data_path.clone(); + item_path.push(item_id.to_string()); + + let mut writer = compression_engine.create(item_path.clone())?; + writer.write_all(content)?; + drop(writer); + + let plugin_types = vec![ + MetaPluginType::FileMime, + MetaPluginType::FileEncoding, + MetaPluginType::Binary, + MetaPluginType::LineCount, + MetaPluginType::WordCount, + MetaPluginType::DigestSha256, + MetaPluginType::Uid, + MetaPluginType::User, + MetaPluginType::Hostname, + ]; + + let mut plugins: Vec> = + plugin_types.iter().map(|p| get_meta_plugin(p.clone())).collect(); + + self.meta_service + .initialize_plugins(&mut plugins, &tx, item_id); + self.meta_service + .process_chunk(&mut plugins, content, &tx); + self.meta_service.finalize_plugins(&mut plugins, &tx); + + item.size = Some(content.len() as i64); + db::update_item(&tx, item.clone())?; + + tx.commit()?; + + self.get_item(conn, item_id) + } +}