diff --git a/src/services/async_item_service.rs b/src/services/async_item_service.rs index 8b9fdde..ae582b7 100644 --- a/src/services/async_item_service.rs +++ b/src/services/async_item_service.rs @@ -41,48 +41,36 @@ impl AsyncItemService { } } - pub async fn get_item(&self, id: i64) -> Result { + async fn execute_blocking(&self, f: F) -> Result + where + F: FnOnce(&Connection, &ItemService) -> Result + Send + 'static, + T: Send + 'static, + { let db = self.db.clone(); let item_service = self.item_service.clone(); - + tokio::task::spawn_blocking(move || { let conn = db.blocking_lock(); - item_service.get_item(&conn, id) - }) - .await - .unwrap() // Propagate panics from spawn_blocking - } - - pub async fn get_item_content(&self, id: i64) -> Result { - let db = self.db.clone(); - let item_service = self.item_service.clone(); - - tokio::task::spawn_blocking(move || { - let conn = db.blocking_lock(); - item_service.get_item_content(&conn, id) + f(&conn, &item_service) }) .await .unwrap() } + pub async fn get_item(&self, id: i64) -> Result { + self.execute_blocking(|conn, item_service| item_service.get_item(conn, id)).await + } + + pub async fn get_item_content(&self, id: i64) -> Result { + self.execute_blocking(|conn, item_service| item_service.get_item_content(conn, id)).await + } + pub async fn get_item_content_info( &self, id: i64, filter: Option, ) -> Result<(Vec, String, bool), CoreError> { - let db = self.db.clone(); - let item_service = self.item_service.clone(); - - tokio::task::spawn_blocking(move || { - let conn = db.blocking_lock(); - item_service.get_item_content_info( - &conn, - id, - filter - ) - }) - .await - .unwrap() + self.execute_blocking(|conn, item_service| item_service.get_item_content_info(conn, id, filter)).await } pub async fn stream_item_content_by_id( @@ -92,17 +80,10 @@ impl AsyncItemService { offset: u64, length: u64, ) -> Result<(std::pin::Pin> + Send>>, String), CoreError> { - let db = self.db.clone(); - let item_service = self.item_service.clone(); - - // Get item content - let content = tokio::task::spawn_blocking(move || { - let conn = db.blocking_lock(); - let item_with_content = item_service.get_item_content(&conn, item_id)?; + let content = self.execute_blocking(|conn, item_service| { + let item_with_content = item_service.get_item_content(conn, item_id)?; Ok::<_, CoreError>(item_with_content.content) - }) - .await - .unwrap()?; + }).await?; // Clone content for use in the binary check closure let content_clone = content.clone(); @@ -285,19 +266,7 @@ impl AsyncItemService { item_id: i64, filter: Option, ) -> Result<(Box, String, bool), CoreError> { - let db = self.db.clone(); - let item_service = self.item_service.clone(); - - tokio::task::spawn_blocking(move || { - let conn = db.blocking_lock(); - item_service.get_item_content_info_streaming( - &conn, - item_id, - filter - ) - }) - .await - .unwrap() + self.execute_blocking(|conn, item_service| item_service.get_item_content_info_streaming(conn, item_id, filter)).await } pub async fn find_item( @@ -306,15 +275,7 @@ impl AsyncItemService { tags: Vec, meta: HashMap, ) -> Result { - let db = self.db.clone(); - let item_service = self.item_service.clone(); - - tokio::task::spawn_blocking(move || { - let conn = db.blocking_lock(); - item_service.find_item(&conn, &ids, &tags, &meta) - }) - .await - .unwrap() + self.execute_blocking(|conn, item_service| item_service.find_item(conn, &ids, &tags, &meta)).await } pub async fn list_items( @@ -322,15 +283,7 @@ impl AsyncItemService { tags: Vec, meta: HashMap, ) -> Result, CoreError> { - let db = self.db.clone(); - let item_service = self.item_service.clone(); - - tokio::task::spawn_blocking(move || { - let conn = db.blocking_lock(); - item_service.list_items(&conn, &tags, &meta) - }) - .await - .unwrap() + self.execute_blocking(|conn, item_service| item_service.list_items(conn, &tags, &meta)).await } pub async fn delete_item(&self, id: i64) -> Result<(), CoreError> {