feat: move core services to services directory
This commit is contained in:
committed by
Andrew Phillips (aider)
parent
8cc0cfc606
commit
321e00171e
@@ -1,262 +0,0 @@
|
||||
use crate::config::Settings;
|
||||
use crate::services::compression_service::CompressionService;
|
||||
use crate::services::error::CoreError;
|
||||
use crate::services::meta_service::MetaService;
|
||||
use crate::services::types::{ItemWithContent, ItemWithMeta};
|
||||
use crate::meta_plugin::{get_meta_plugin, MetaPlugin, MetaPluginType};
|
||||
use crate::db::{self, Meta};
|
||||
use crate::compression_engine::{get_compression_engine, CompressionType};
|
||||
use crate::modes::common::settings_compression_type;
|
||||
use clap::Command;
|
||||
use rusqlite::Connection;
|
||||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use std::io::{IsTerminal, Read, Write};
|
||||
use std::path::PathBuf;
|
||||
|
||||
pub struct ItemService {
|
||||
data_path: PathBuf,
|
||||
compression_service: CompressionService,
|
||||
meta_service: MetaService,
|
||||
}
|
||||
|
||||
impl ItemService {
|
||||
pub fn new(data_path: PathBuf) -> Self {
|
||||
Self {
|
||||
data_path,
|
||||
compression_service: CompressionService::new(),
|
||||
meta_service: MetaService::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_item(&self, conn: &Connection, id: i64) -> Result<ItemWithMeta, CoreError> {
|
||||
let item = db::get_item(conn, id)?.ok_or(CoreError::ItemNotFound(id))?;
|
||||
let tags = db::get_item_tags(conn, &item)?;
|
||||
let meta = db::get_item_meta(conn, &item)?;
|
||||
Ok(ItemWithMeta { item, tags, meta })
|
||||
}
|
||||
|
||||
pub fn get_item_content(&self, conn: &Connection, id: i64) -> Result<ItemWithContent, CoreError> {
|
||||
let item_with_meta = self.get_item(conn, id)?;
|
||||
let item_id = item_with_meta.item.id.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
|
||||
|
||||
if item_id <= 0 {
|
||||
return Err(CoreError::InvalidInput(format!("Invalid item ID: {}", item_id)));
|
||||
}
|
||||
|
||||
let mut item_path = self.data_path.clone();
|
||||
item_path.push(item_id.to_string());
|
||||
|
||||
let content = self
|
||||
.compression_service
|
||||
.get_item_content(item_path, &item_with_meta.item.compression)?;
|
||||
|
||||
Ok(ItemWithContent {
|
||||
item_with_meta,
|
||||
content,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn find_item(&self, conn: &Connection, ids: &[i64], tags: &[String], meta: &HashMap<String, String>) -> Result<ItemWithMeta, CoreError> {
|
||||
let item_maybe = match (ids.is_empty(), tags.is_empty() && meta.is_empty()) {
|
||||
(false, _) => db::get_item(conn, ids[0])?,
|
||||
(true, true) => db::get_item_last(conn)?,
|
||||
(true, false) => db::get_item_matching(conn, &tags.to_vec(), meta)?
|
||||
};
|
||||
|
||||
let item = item_maybe.ok_or(CoreError::ItemNotFoundGeneric)?;
|
||||
let item_id = item.id.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
|
||||
self.get_item(conn, item_id)
|
||||
}
|
||||
|
||||
pub fn list_items(&self, conn: &Connection, tags: &[String], meta: &HashMap<String, String>) -> Result<Vec<ItemWithMeta>, CoreError> {
|
||||
let items = db::get_items_matching(conn, &tags.to_vec(), meta)?;
|
||||
|
||||
let item_ids: Vec<i64> = items.iter().filter_map(|item| item.id).collect();
|
||||
if item_ids.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
let tags_map = db::get_tags_for_items(conn, &item_ids)?;
|
||||
let meta_map_db = db::get_meta_for_items(conn, &item_ids)?;
|
||||
|
||||
let mut result = Vec::new();
|
||||
for item in items {
|
||||
let item_id = item.id.unwrap();
|
||||
let tags = tags_map.get(&item_id).cloned().unwrap_or_default();
|
||||
let meta_hm = meta_map_db.get(&item_id).cloned().unwrap_or_default();
|
||||
let meta = meta_hm.into_iter().map(|(name, value)| Meta { id: item_id, name, value }).collect();
|
||||
|
||||
result.push(ItemWithMeta { item, tags, meta });
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub fn delete_item(&self, conn: &mut Connection, id: i64) -> Result<(), CoreError> {
|
||||
if id <= 0 {
|
||||
return Err(CoreError::InvalidInput(format!("Invalid item ID: {}", id)));
|
||||
}
|
||||
let item = db::get_item(conn, id)?.ok_or(CoreError::ItemNotFound(id))?;
|
||||
|
||||
let mut item_path = self.data_path.clone();
|
||||
item_path.push(id.to_string());
|
||||
|
||||
let tx = conn.transaction()?;
|
||||
db::delete_item(&tx, item)?;
|
||||
fs::remove_file(&item_path).or_else(|e| if e.kind() == std::io::ErrorKind::NotFound { Ok(()) } else { Err(e) })?;
|
||||
tx.commit()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn save_item<R: Read>(
|
||||
&self,
|
||||
mut input: R,
|
||||
cmd: &mut Command,
|
||||
settings: &Settings,
|
||||
tags: &mut Vec<String>,
|
||||
conn: &mut Connection,
|
||||
) -> Result<ItemWithMeta, CoreError> {
|
||||
if tags.is_empty() {
|
||||
tags.push("none".to_string());
|
||||
}
|
||||
|
||||
let compression_type = settings_compression_type(cmd, settings);
|
||||
let compression_engine = get_compression_engine(compression_type.clone())?;
|
||||
|
||||
let tx = conn.transaction()?;
|
||||
|
||||
let item_id;
|
||||
let mut item;
|
||||
{
|
||||
item = db::create_item(&tx, compression_type.clone())?;
|
||||
item_id = item.id.unwrap();
|
||||
db::set_item_tags(&tx, item.clone(), tags)?;
|
||||
let item_meta = self.meta_service.collect_initial_meta();
|
||||
for (k, v) in item_meta.iter() {
|
||||
db::add_meta(&tx, item_id, k, v)?;
|
||||
}
|
||||
}
|
||||
|
||||
let mut plugins = self.meta_service.get_plugins(cmd, settings);
|
||||
self.meta_service.initialize_plugins(&mut plugins, &tx, item_id);
|
||||
|
||||
let mut item_path = self.data_path.clone();
|
||||
item_path.push(item_id.to_string());
|
||||
|
||||
let mut item_out = compression_engine.create(item_path.clone())?;
|
||||
|
||||
let mut buffer = [0; 8192];
|
||||
let mut total_bytes = 0;
|
||||
|
||||
loop {
|
||||
let n = input.read(&mut buffer)?;
|
||||
if n == 0 { break; }
|
||||
|
||||
total_bytes += n as i64;
|
||||
item_out.write_all(&buffer[..n])?;
|
||||
self.meta_service.process_chunk(&mut plugins, &buffer[..n], &tx);
|
||||
}
|
||||
|
||||
item_out.flush()?;
|
||||
drop(item_out);
|
||||
|
||||
self.meta_service.finalize_plugins(&mut plugins, &tx);
|
||||
|
||||
item.size = Some(total_bytes);
|
||||
db::update_item(&tx, item.clone())?;
|
||||
|
||||
tx.commit()?;
|
||||
|
||||
if !settings.quiet {
|
||||
if std::io::stderr().is_terminal() {
|
||||
let mut t = term::stderr().unwrap();
|
||||
let _ = t.reset();
|
||||
let _ = t.attr(term::Attr::Bold);
|
||||
let _ = write!(t, "KEEP:");
|
||||
let _ = t.reset();
|
||||
let _ = write!(t, " New item ");
|
||||
let _ = t.attr(term::Attr::Bold);
|
||||
let _ = write!(t, "{item_id}");
|
||||
let _ = t.reset();
|
||||
let _ = write!(t, " tags: ");
|
||||
let _ = t.attr(term::Attr::Bold);
|
||||
let _ = write!(t, "{}", tags.join(" "));
|
||||
let _ = t.reset();
|
||||
let _ = writeln!(t);
|
||||
let _ = std::io::stderr().flush();
|
||||
} else {
|
||||
let mut t = std::io::stderr();
|
||||
let _ = writeln!(t, "KEEP: New item: {} tags: {:?}", item_id, tags);
|
||||
}
|
||||
}
|
||||
|
||||
self.get_item(conn, item_id)
|
||||
}
|
||||
|
||||
pub fn save_item_from_mcp(
|
||||
&self,
|
||||
content: &[u8],
|
||||
tags: &Vec<String>,
|
||||
metadata: &HashMap<String, String>,
|
||||
conn: &mut Connection,
|
||||
) -> Result<ItemWithMeta, CoreError> {
|
||||
let compression_type = CompressionType::LZ4;
|
||||
let compression_engine = get_compression_engine(compression_type.clone())?;
|
||||
|
||||
let tx = conn.transaction()?;
|
||||
let item_id;
|
||||
let mut item;
|
||||
|
||||
{
|
||||
item = db::create_item(&tx, compression_type.clone())?;
|
||||
item_id = item.id.unwrap();
|
||||
|
||||
// Add tags
|
||||
for tag in tags {
|
||||
db::add_tag(&tx, item_id, tag)?;
|
||||
}
|
||||
|
||||
// Add custom metadata
|
||||
for (key, value) in metadata {
|
||||
db::add_meta(&tx, item_id, key, value)?;
|
||||
}
|
||||
}
|
||||
|
||||
let mut item_path = self.data_path.clone();
|
||||
item_path.push(item_id.to_string());
|
||||
|
||||
let mut writer = compression_engine.create(item_path.clone())?;
|
||||
writer.write_all(content)?;
|
||||
drop(writer);
|
||||
|
||||
let plugin_types = vec![
|
||||
MetaPluginType::FileMime,
|
||||
MetaPluginType::FileEncoding,
|
||||
MetaPluginType::Binary,
|
||||
MetaPluginType::LineCount,
|
||||
MetaPluginType::WordCount,
|
||||
MetaPluginType::DigestSha256,
|
||||
MetaPluginType::Uid,
|
||||
MetaPluginType::User,
|
||||
MetaPluginType::Hostname,
|
||||
];
|
||||
|
||||
let mut plugins: Vec<Box<dyn MetaPlugin>> =
|
||||
plugin_types.iter().map(|p| get_meta_plugin(p.clone())).collect();
|
||||
|
||||
self.meta_service
|
||||
.initialize_plugins(&mut plugins, &tx, item_id);
|
||||
self.meta_service
|
||||
.process_chunk(&mut plugins, content, &tx);
|
||||
self.meta_service.finalize_plugins(&mut plugins, &tx);
|
||||
|
||||
item.size = Some(content.len() as i64);
|
||||
db::update_item(&tx, item.clone())?;
|
||||
|
||||
tx.commit()?;
|
||||
|
||||
self.get_item(conn, item_id)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user