use anyhow::{Context, Error, Result}; use chrono::prelude::*; use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; use log::*; use rusqlite::{Connection, OpenFlags, params}; use rusqlite_migration::{M, Migrations}; use std::collections::HashMap; use std::path::PathBuf; use std::rc::Rc; lazy_static! { static ref MIGRATIONS: Migrations<'static> = Migrations::new(vec![ M::up( "CREATE TABLE items( id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, ts TEXT NOT NULL, size INTEGER NULL, compression TEXT NOT NULL)" ), M::up( "CREATE TABLE tags ( id INTEGER NOT NULL, name TEXT NOT NULL, FOREIGN KEY(id) REFERENCES items(id) ON DELETE CASCADE, PRIMARY KEY(id, name));" ), M::up( "CREATE TABLE metas ( id INTEGER NOT NULL, name TEXT NOT NULL, value TEXT NOT NULL, FOREIGN KEY(id) REFERENCES items(id) ON DELETE CASCADE, PRIMARY KEY(id, name));" ), M::up("CREATE INDEX idx_tags_name ON tags(name)"), M::up("CREATE INDEX idx_metas_name ON metas(name)"), ]); } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Item { pub id: Option, pub ts: DateTime, pub size: Option, pub compression: String, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Tag { pub id: i64, pub name: String, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Meta { pub id: i64, pub name: String, pub value: String, } pub fn open(path: PathBuf) -> Result { debug!("DB: Opening file: {:?}", path); let mut conn = Connection::open_with_flags( path, OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE, ) .context("Problem opening file")?; conn.pragma_update(None, "foreign_keys", "ON") .context("Problem enabling SQLite foreign_keys pragma")?; MIGRATIONS .to_latest(&mut conn) .context("Problem performing database migrations")?; rusqlite::vtab::array::load_module(&conn).context("Problem enabling array module")?; Ok(conn) } pub fn insert_item(conn: &Connection, item: Item) -> Result { debug!("DB: Inserting item: {:?}", item); conn.execute( "INSERT INTO items (ts, size, compression) VALUES (?1, ?2, ?3)", params![item.ts, item.size, item.compression], )?; Ok(conn.last_insert_rowid()) } pub fn create_item(conn: &Connection, compression_type: crate::compression_engine::CompressionType) -> Result { let item = Item { id: None, ts: chrono::Utc::now(), size: None, compression: compression_type.to_string(), }; let item_id = insert_item(conn, item.clone())?; Ok(Item { id: Some(item_id), ..item }) } pub fn add_tag(conn: &Connection, item_id: i64, tag_name: &str) -> Result<()> { let tag = Tag { id: item_id, name: tag_name.to_string(), }; insert_tag(conn, tag) } pub fn add_meta(conn: &Connection, item_id: i64, name: &str, value: &str) -> Result<()> { let meta = Meta { id: item_id, name: name.to_string(), value: value.to_string(), }; store_meta(conn, meta) } pub fn update_item(conn: &Connection, item: Item) -> Result<()> { debug!("DB: Updating item: {:?}", item); conn.execute( "UPDATE items SET size=?2, compression=?3 WHERE id=?1", params![ item.id, item.size, item.compression, ], )?; Ok(()) } pub fn delete_item(conn: &Connection, item: Item) -> Result<()> { debug!("DB: Deleting item: {:?}", item); conn.execute("DELETE FROM items WHERE id=?1", params![item.id])?; Ok(()) } pub fn query_delete_meta(conn: &Connection, meta: Meta) -> Result<()> { debug!("DB: Deleting meta: {:?}", meta); conn.execute( "DELETE FROM metas WHERE id=?1 AND name=?2", params![meta.id, meta.name], )?; Ok(()) } pub fn query_upsert_meta(conn: &Connection, meta: Meta) -> Result<()> { debug!("DB: Inserting meta: {:?}", meta); conn.execute( "INSERT INTO metas (id, name, value) VALUES (?1, ?2, ?3) ON CONFLICT(id, name) DO UPDATE SET value=?3", params![meta.id, meta.name, meta.value], )?; Ok(()) } pub fn store_meta(conn: &Connection, meta: Meta) -> Result<()> { debug!("DB: Storing meta: {:?}", meta); if meta.value.is_empty() { query_delete_meta(conn, meta)?; } else { query_upsert_meta(conn, meta)?; } Ok(()) } pub fn insert_tag(conn: &Connection, tag: Tag) -> Result<()> { debug!("DB: Inserting tag: {:?}", tag); conn.execute( "INSERT INTO tags (id, name) VALUES (?1, ?2)", params![tag.id, tag.name], )?; Ok(()) } pub fn delete_item_tags(conn: &Connection, item: Item) -> Result<()> { debug!("DB: Deleting all item tags: {:?}", item); conn.execute("DELETE FROM tags WHERE id=?1", params![item.id])?; Ok(()) } pub fn set_item_tags(conn: &Connection, item: Item, tags: &Vec) -> Result<()> { debug!("DB: Setting tags for item: {:?} ?{:?}", item, tags); delete_item_tags(conn, item.clone())?; let item_id = item.id.unwrap(); for tag_name in tags { insert_tag( conn, Tag { id: item_id, name: tag_name.to_string(), }, )?; } Ok(()) } pub fn query_all_items(conn: &Connection) -> Result> { debug!("DB: Querying all items"); let mut statement = conn .prepare("SELECT id, ts, size, compression FROM items ORDER BY id ASC") .context("Problem preparing SQL statement")?; let mut rows = statement.query(params![])?; let mut items = Vec::new(); while let Some(row) = rows.next()? { let item = Item { id: row.get(0)?, ts: row.get(1)?, size: row.get(2)?, compression: row.get(3)?, }; items.push(item); } Ok(items) } pub fn query_tagged_items<'a>(conn: &'a Connection, tags: &'a Vec) -> Result> { debug!("DB: Querying tagged items: {:?}", tags); let mut statement = conn .prepare_cached( " SELECT items.id, items.ts, items.size, items.compression, count(tags_match.id) as tags_score FROM items, (SELECT tags.id FROM tags WHERE tags.name IN rarray(?1)) as tags_match WHERE items.id = tags_match.id GROUP BY items.id HAVING tags_score = ?2 ORDER BY items.id ASC", ) .context("Problem preparing SQL statement")?; let tags_values: Vec = tags .iter() .map(|s| rusqlite::types::Value::from(s.clone())) .collect(); let tags_ptr = Rc::new(tags_values); let mut rows = statement.query(params![&tags_ptr, &tags.len()])?; let mut items = Vec::new(); while let Some(row) = rows.next()? { let item = Item { id: row.get(0)?, ts: row.get(1)?, size: row.get(2)?, compression: row.get(3)?, }; items.push(item); } Ok(items) } pub fn get_items(conn: &Connection) -> Result> { debug!("DB: Getting all items"); query_all_items(conn) } pub fn get_items_matching( conn: &Connection, tags: &Vec, meta: &HashMap, ) -> Result> { debug!( "DB: Getting items matching: tags={:?} meta={:?}", tags, meta ); let items = match tags.is_empty() { true => query_all_items(conn)?, false => query_tagged_items(conn, tags)?, }; if meta.is_empty() { debug!("DB: Not filtering on meta"); Ok(items) } else { debug!("DB: Filtering on meta"); let mut filtered_items: Vec = Vec::new(); for item in items.iter() { let mut item_ok = true; let mut item_meta: HashMap = HashMap::new(); for meta in get_item_meta(conn, item)? { item_meta.insert(meta.name, meta.value); } debug!("DB: Matching: {:?}: {:?}", item, item_meta); for (k, v) in meta.iter() { match item_meta.get(k) { Some(value) => item_ok = v.eq(value), None => item_ok = false, } if item_ok { break; } } if item_ok { filtered_items.push(item.clone()); } } Ok(filtered_items) } } pub fn get_item_matching( conn: &Connection, tags: &Vec, _meta: &HashMap, ) -> Result> { debug!("DB: Get item matching tags: {:?}", tags); let mut statement = conn .prepare_cached( " SELECT items.id, items.ts, items.size, items.compression, count(sel.id) as score FROM items, (SELECT tags.id FROM tags WHERE tags.name IN rarray(?1)) as sel WHERE items.id = sel.id GROUP BY items.id HAVING score = ?2 ORDER BY items.id DESC LIMIT 1", ) .context("Problem preparing SQL statement")?; let tags_values: Vec = tags .iter() .map(|s| rusqlite::types::Value::from(s.clone())) .collect(); let tags_ptr = Rc::new(tags_values); let mut rows = statement.query(params![&tags_ptr, &tags.len()])?; match rows.next()? { Some(row) => Ok(Some(Item { id: row.get(0)?, ts: row.get(1)?, size: row.get(2)?, compression: row.get(3)?, })), None => Ok(None), } } pub fn get_item(conn: &Connection, item_id: i64) -> Result> { debug!("DB: Getting item {:?}", item_id); let mut statement = conn .prepare_cached( " SELECT id, ts, size, compression FROM items WHERE items.id = ?1", ) .context("Problem preparing SQL statement")?; let mut rows = statement.query(params![item_id])?; match rows.next()? { Some(row) => Ok(Some(Item { id: row.get(0)?, ts: row.get(1)?, size: row.get(2)?, compression: row.get(3)?, })), None => Ok(None), } } pub fn get_item_last(conn: &Connection) -> Result> { debug!("DB: Getting last item"); let mut statement = conn .prepare_cached( " SELECT id, ts, size, compression FROM items ORDER BY id DESC LIMIT 1", ) .context("Problem preparing SQL statement")?; let mut rows = statement.query(params![])?; match rows.next()? { Some(row) => Ok(Some(Item { id: row.get(0)?, ts: row.get(1)?, size: row.get(2)?, compression: row.get(3)?, })), None => Ok(None), } } pub fn get_item_tags(conn: &Connection, item: &Item) -> Result> { debug!("DB: Getting tags for item: {:?}", item); let mut statement = conn .prepare_cached("SELECT id, name FROM tags WHERE id=?1 ORDER BY name ASC") .context("Problem preparing SQL statement")?; let mut rows = statement.query(params![item.id])?; let mut tags = Vec::new(); while let Some(row) = rows.next()? { tags.push(Tag { id: row.get(0)?, name: row.get(1)?, }); } Ok(tags) } pub fn get_item_meta(conn: &Connection, item: &Item) -> Result> { debug!("DB: Getting item meta: {:?}", item); let mut statement = conn .prepare_cached("SELECT id, name, value FROM metas WHERE id=?1 ORDER BY name ASC") .context("Problem preparing SQL statement")?; let mut rows = statement.query(params![item.id])?; let mut metas = Vec::new(); while let Some(row) = rows.next()? { metas.push(Meta { id: row.get(0)?, name: row.get(1)?, value: row.get(2)?, }); } Ok(metas) } pub fn get_item_meta_name(conn: &Connection, item: &Item, name: String) -> Result> { debug!("DB: Getting item meta name: {:?} {:?}", item, name); let mut statement = conn .prepare_cached("SELECT id, name, value FROM metas WHERE id=?1 AND name=?2") .context("Problem preparing SQL statement")?; let mut rows = statement.query(params![item.id, name])?; match rows.next()? { Some(row) => Ok(Some(Meta { id: row.get(0)?, name: row.get(1)?, value: row.get(2)?, })), None => Ok(None), } } pub fn get_item_meta_value(conn: &Connection, item: &Item, name: String) -> Result> { debug!("DB: Getting item meta value: {:?} {:?}", item, name); let mut statement = conn .prepare_cached("SELECT value FROM metas WHERE id=?1 AND name=?2") .context("Problem preparing SQL statement")?; let mut rows = statement.query(params![item.id, name])?; match rows.next()? { Some(row) => Ok(Some(row.get(0)?)), None => Ok(None), } } pub fn get_tags_for_items(conn: &Connection, item_ids: &[i64]) -> Result>> { debug!("DB: Getting tags for items: {:?}", item_ids); if item_ids.is_empty() { return Ok(std::collections::HashMap::new()); } // Create placeholders for the IN clause let placeholders: Vec = item_ids.iter().map(|_| "?".to_string()).collect(); let placeholders_str = placeholders.join(","); let sql = format!("SELECT id, name FROM tags WHERE id IN ({}) ORDER BY id ASC, name ASC", placeholders_str); let mut statement = conn .prepare_cached(&sql) .context("Problem preparing SQL statement")?; let mut rows = statement.query(rusqlite::params_from_iter(item_ids))?; let mut tags_map: std::collections::HashMap> = std::collections::HashMap::new(); while let Some(row) = rows.next()? { let id: i64 = row.get(0)?; let name: String = row.get(1)?; tags_map.entry(id).or_insert_with(Vec::new).push(Tag { id, name }); } Ok(tags_map) } pub fn get_meta_for_items(conn: &Connection, item_ids: &[i64]) -> Result>> { debug!("DB: Getting meta for items: {:?}", item_ids); if item_ids.is_empty() { return Ok(std::collections::HashMap::new()); } // Create placeholders for the IN clause let placeholders: Vec = item_ids.iter().map(|_| "?".to_string()).collect(); let placeholders_str = placeholders.join(","); let sql = format!("SELECT id, name, value FROM metas WHERE id IN ({}) ORDER BY id ASC, name ASC", placeholders_str); let mut statement = conn .prepare_cached(&sql) .context("Problem preparing SQL statement")?; let mut rows = statement.query(rusqlite::params_from_iter(item_ids))?; let mut meta_map: std::collections::HashMap> = std::collections::HashMap::new(); while let Some(row) = rows.next()? { let id: i64 = row.get(0)?; let name: String = row.get(1)?; let value: String = row.get(2)?; meta_map.entry(id).or_insert_with(std::collections::HashMap::new).insert(name, value); } Ok(meta_map) }