use anyhow::{Context, Error, Result}; use chrono::prelude::*; use lazy_static::lazy_static; use log::*; use rusqlite::{Connection, OpenFlags}; use rusqlite_migration::{Migrations, M}; use std::collections::HashMap; use std::path::PathBuf; use std::rc::Rc; lazy_static! { static ref MIGRATIONS: Migrations<'static> = Migrations::new(vec![ M::up( "CREATE TABLE items( id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, ts TEXT NOT NULL, size INTEGER NULL, compression TEXT NOT NULL)" ), M::up( "CREATE TABLE tags ( id INTEGER NOT NULL, name TEXT NOT NULL, FOREIGN KEY(id) REFERENCES items(id) ON DELETE CASCADE, PRIMARY KEY(id, name));" ), M::up( "CREATE TABLE metas ( id INTEGER NOT NULL, name TEXT NOT NULL, value TEXT NOT NULL, FOREIGN KEY(id) REFERENCES items(id) ON DELETE CASCADE, PRIMARY KEY(id, name));" ), M::up( "ALTER TABLE items ALTER COLUMN compression SET DEFAULT 'none';" ), M::up( "ALTER TABLE items ADD COLUMN digest_type TEXT NOT NULL DEFAULT 'none';" ), M::up( "ALTER TABLE items ADD COLUMN digest_value TEXT NULL;" ), ]); } #[derive(Debug, Clone)] pub struct Item { pub id: Option, pub ts: DateTime, pub size: Option, pub compression: String, pub digest_type: String, pub digest_value: String, } #[derive(Debug, Clone)] pub struct Tag { pub id: i64, pub name: String, } #[derive(Debug, Clone)] pub struct Meta { pub id: i64, pub name: String, pub value: String, } pub fn open(path: PathBuf) -> Result { debug!("DB: Opening file: {:?}", path); let mut conn = Connection::open_with_flags( path, OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE, ) .context("Problem opening file")?; conn.pragma_update(None, "foreign_keys", "ON") .context("Problem enabling SQLite foreign_keys pragma")?; MIGRATIONS .to_latest(&mut conn) .context("Problem performing database migrations")?; rusqlite::vtab::array::load_module(&conn).context("Problem enabling array module")?; Ok(conn) } pub fn insert_item(conn: &Connection, item: Item) -> Result { debug!("DB: Inserting item: {:?}", item); conn.execute( "INSERT INTO items (ts, size, compression) VALUES (?1, ?2, ?3)", (item.ts, item.size, item.compression), )?; Ok(conn.last_insert_rowid()) } pub fn update_item(conn: &Connection, item: Item) -> Result<()> { debug!("DB: Updating item: {:?}", item); conn.execute( "UPDATE items SET size=?2, compression=?3 WHERE id=?1", (item.id, item.size, item.compression), )?; Ok(()) } pub fn delete_item(conn: &Connection, item: Item) -> Result<()> { debug!("DB: Deleting item: {:?}", item); conn.execute("DELETE FROM items WHERE id=?1", [item.id])?; Ok(()) } pub fn query_delete_meta(conn: &Connection, meta: Meta) -> Result<()> { debug!("DB: Deleting meta: {:?}", meta); conn.execute( "DELETE FROM metas WHERE id=?1 AND name=?2", (meta.id, meta.name), )?; Ok(()) } pub fn query_upsert_meta(conn: &Connection, meta: Meta) -> Result<()> { debug!("DB: Inserting meta: {:?}", meta); conn.execute( "INSERT INTO metas (id, name, value) VALUES (?1, ?2, ?3) ON CONFLICT(id, name) DO UPDATE SET value=?3", (meta.id, meta.name, meta.value), )?; Ok(()) } pub fn store_meta(conn: &Connection, meta: Meta) -> Result<()> { debug!("DB: Storing meta: {:?}", meta); if meta.value.is_empty() { query_delete_meta(conn, meta)?; } else { query_upsert_meta(conn, meta)?; } Ok(()) } pub fn insert_tag(conn: &Connection, tag: Tag) -> Result<()> { debug!("DB: Inserting tag: {:?}", tag); conn.execute( "INSERT INTO tags (id, name) VALUES (?1, ?2)", (tag.id, tag.name), )?; Ok(()) } pub fn delete_item_tags(conn: &Connection, item: Item) -> Result<()> { debug!("DB: Deleting all item tags: {:?}", item); conn.execute("DELETE FROM tags WHERE id=?1", [item.id])?; Ok(()) } pub fn set_item_tags(conn: &Connection, item: Item, tags: &Vec) -> Result<()> { debug!("DB: Setting tags for item: {:?} ?{:?}", item, tags); delete_item_tags(conn, item.clone())?; let item_id = item.id.unwrap(); for tag_name in tags { insert_tag( conn, Tag { id: item_id, name: tag_name.to_string(), }, )?; } Ok(()) } pub fn query_all_items(conn: &Connection) -> Result> { debug!("DB: Querying all items"); let mut statement = conn .prepare("SELECT id, ts, size, compression FROM items ORDER BY id ASC") .context("Problem preparing SQL statement")?; let mut rows = statement.query([])?; let mut items = Vec::new(); while let Some(row) = rows.next()? { let item = Item { id: row.get(0)?, ts: row.get(1)?, size: row.get(2)?, compression: row.get(3)?, }; items.push(item); } Ok(items) } pub fn query_tagged_items<'a>(conn: &'a Connection, tags: &'a Vec) -> Result> { debug!("DB: Querying tagged items: {:?}", tags); let mut statement = conn .prepare_cached( " SELECT items.id, items.ts, items.size, items.compression, count(tags_match.id) as tags_score FROM items, (SELECT tags.id FROM tags WHERE tags.name IN rarray(?1)) as tags_match WHERE items.id = tags_match.id GROUP BY items.id HAVING tags_score = ?2 ORDER BY items.id ASC", ) .context("Problem preparing SQL statement")?; let tags_values: Vec = tags .iter() .map(|s| rusqlite::types::Value::from(s.clone())) .collect(); let tags_ptr = Rc::new(tags_values); let mut rows = statement.query((&tags_ptr, &tags.len()))?; let mut items = Vec::new(); while let Some(row) = rows.next()? { let item = Item { id: row.get(0)?, ts: row.get(1)?, size: row.get(2)?, compression: row.get(3)?, }; items.push(item); } Ok(items) } pub fn get_items(conn: &Connection) -> Result> { debug!("DB: Getting all items"); query_all_items(conn) } pub fn get_items_matching( conn: &Connection, tags: &Vec, meta: &HashMap, ) -> Result> { debug!( "DB: Getting items matching: tags={:?} meta={:?}", tags, meta ); let items = match tags.is_empty() { true => query_all_items(conn)?, false => query_tagged_items(conn, tags)?, }; if meta.is_empty() { debug!("DB: Not filtering on meta"); Ok(items) } else { debug!("DB: Filtering on meta"); let mut filtered_items: Vec = Vec::new(); for item in items.iter() { let mut item_ok = true; let mut item_meta: HashMap = HashMap::new(); for meta in get_item_meta(conn, item)? { item_meta.insert(meta.name, meta.value); } debug!("DB: Matching: {:?}: {:?}", item, item_meta); for (k, v) in meta.iter() { match item_meta.get(k) { Some(value) => item_ok = v.eq(value), None => item_ok = false, } if item_ok { break; } } if item_ok { filtered_items.push(item.clone()); } } Ok(filtered_items) } } pub fn get_item_matching( conn: &Connection, tags: &Vec, _meta: &HashMap, ) -> Result> { debug!("DB: Get item matching tags: {:?}", tags); let mut statement = conn .prepare_cached( " SELECT items.id, items.ts, items.size, items.compression, count(sel.id) as score FROM items, (SELECT tags.id FROM tags WHERE tags.name IN rarray(?1)) as sel WHERE items.id = sel.id GROUP BY items.id HAVING score = ?2 ORDER BY items.id DESC LIMIT 1", ) .context("Problem preparing SQL statement")?; let tags_values: Vec = tags .iter() .map(|s| rusqlite::types::Value::from(s.clone())) .collect(); let tags_ptr = Rc::new(tags_values); let mut rows = statement.query((&tags_ptr, &tags.len()))?; match rows.next()? { Some(row) => Ok(Some(Item { id: row.get(0)?, ts: row.get(1)?, size: row.get(2)?, compression: row.get(3)?, })), None => Ok(None), } } pub fn get_item(conn: &Connection, item_id: i64) -> Result> { debug!("DB: Getting item {:?}", item_id); let mut statement = conn .prepare_cached( " SELECT id, ts, size, compression FROM items WHERE items.id = ?1", ) .context("Problem preparing SQL statement")?; let mut rows = statement.query([item_id])?; match rows.next()? { Some(row) => Ok(Some(Item { id: row.get(0)?, ts: row.get(1)?, size: row.get(2)?, compression: row.get(3)?, })), None => Ok(None), } } pub fn get_item_last(conn: &Connection) -> Result> { debug!("DB: Getting last item"); let mut statement = conn .prepare_cached( " SELECT id, ts, size, compression FROM items ORDER BY id DESC LIMIT 1", ) .context("Problem preparing SQL statement")?; let mut rows = statement.query([])?; match rows.next()? { Some(row) => Ok(Some(Item { id: row.get(0)?, ts: row.get(1)?, size: row.get(2)?, compression: row.get(3)?, })), None => Ok(None), } } pub fn get_item_tags(conn: &Connection, item: &Item) -> Result> { debug!("DB: Getting tags for item: {:?}", item); let mut statement = conn .prepare_cached("SELECT id, name FROM tags WHERE id=?1 ORDER BY name ASC") .context("Problem preparing SQL statement")?; let mut rows = statement.query([item.id])?; let mut tags = Vec::new(); while let Some(row) = rows.next()? { tags.push(Tag { id: row.get(0)?, name: row.get(1)?, }); } Ok(tags) } pub fn get_item_meta(conn: &Connection, item: &Item) -> Result> { debug!("DB: Getting item meta: {:?}", item); let mut statement = conn .prepare_cached("SELECT id, name, value FROM metas WHERE id=?1 ORDER BY name ASC") .context("Problem preparing SQL statement")?; let mut rows = statement.query([item.id])?; let mut metas = Vec::new(); while let Some(row) = rows.next()? { metas.push(Meta { id: row.get(0)?, name: row.get(1)?, value: row.get(2)?, }); } Ok(metas) }