refactor: Introduce execute_blocking to reduce code duplication in AsyncItemService

Co-authored-by: aider (openai/andrew/openrouter/deepseek/deepseek-chat-v3.1) <aider@aider.chat>
This commit is contained in:
Andrew Phillips
2025-09-03 07:29:30 -03:00
parent 493d28699c
commit 1bd3f95627

View File

@@ -41,48 +41,36 @@ impl AsyncItemService {
} }
} }
pub async fn get_item(&self, id: i64) -> Result<ItemWithMeta, CoreError> { async fn execute_blocking<F, T>(&self, f: F) -> Result<T, CoreError>
where
F: FnOnce(&Connection, &ItemService) -> Result<T, CoreError> + Send + 'static,
T: Send + 'static,
{
let db = self.db.clone(); let db = self.db.clone();
let item_service = self.item_service.clone(); let item_service = self.item_service.clone();
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock(); let conn = db.blocking_lock();
item_service.get_item(&conn, id) f(&conn, &item_service)
})
.await
.unwrap() // Propagate panics from spawn_blocking
}
pub async fn get_item_content(&self, id: i64) -> Result<ItemWithContent, CoreError> {
let db = self.db.clone();
let item_service = self.item_service.clone();
tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
item_service.get_item_content(&conn, id)
}) })
.await .await
.unwrap() .unwrap()
} }
pub async fn get_item(&self, id: i64) -> Result<ItemWithMeta, CoreError> {
self.execute_blocking(|conn, item_service| item_service.get_item(conn, id)).await
}
pub async fn get_item_content(&self, id: i64) -> Result<ItemWithContent, CoreError> {
self.execute_blocking(|conn, item_service| item_service.get_item_content(conn, id)).await
}
pub async fn get_item_content_info( pub async fn get_item_content_info(
&self, &self,
id: i64, id: i64,
filter: Option<String>, filter: Option<String>,
) -> Result<(Vec<u8>, String, bool), CoreError> { ) -> Result<(Vec<u8>, String, bool), CoreError> {
let db = self.db.clone(); self.execute_blocking(|conn, item_service| item_service.get_item_content_info(conn, id, filter)).await
let item_service = self.item_service.clone();
tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
item_service.get_item_content_info(
&conn,
id,
filter
)
})
.await
.unwrap()
} }
pub async fn stream_item_content_by_id( pub async fn stream_item_content_by_id(
@@ -92,17 +80,10 @@ impl AsyncItemService {
offset: u64, offset: u64,
length: u64, length: u64,
) -> Result<(std::pin::Pin<Box<dyn tokio_stream::Stream<Item = Result<tokio_util::bytes::Bytes, std::io::Error>> + Send>>, String), CoreError> { ) -> Result<(std::pin::Pin<Box<dyn tokio_stream::Stream<Item = Result<tokio_util::bytes::Bytes, std::io::Error>> + Send>>, String), CoreError> {
let db = self.db.clone(); let content = self.execute_blocking(|conn, item_service| {
let item_service = self.item_service.clone(); let item_with_content = item_service.get_item_content(conn, item_id)?;
// Get item content
let content = tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
let item_with_content = item_service.get_item_content(&conn, item_id)?;
Ok::<_, CoreError>(item_with_content.content) Ok::<_, CoreError>(item_with_content.content)
}) }).await?;
.await
.unwrap()?;
// Clone content for use in the binary check closure // Clone content for use in the binary check closure
let content_clone = content.clone(); let content_clone = content.clone();
@@ -285,19 +266,7 @@ impl AsyncItemService {
item_id: i64, item_id: i64,
filter: Option<String>, filter: Option<String>,
) -> Result<(Box<dyn Read + Send>, String, bool), CoreError> { ) -> Result<(Box<dyn Read + Send>, String, bool), CoreError> {
let db = self.db.clone(); self.execute_blocking(|conn, item_service| item_service.get_item_content_info_streaming(conn, item_id, filter)).await
let item_service = self.item_service.clone();
tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
item_service.get_item_content_info_streaming(
&conn,
item_id,
filter
)
})
.await
.unwrap()
} }
pub async fn find_item( pub async fn find_item(
@@ -306,15 +275,7 @@ impl AsyncItemService {
tags: Vec<String>, tags: Vec<String>,
meta: HashMap<String, String>, meta: HashMap<String, String>,
) -> Result<ItemWithMeta, CoreError> { ) -> Result<ItemWithMeta, CoreError> {
let db = self.db.clone(); self.execute_blocking(|conn, item_service| item_service.find_item(conn, &ids, &tags, &meta)).await
let item_service = self.item_service.clone();
tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
item_service.find_item(&conn, &ids, &tags, &meta)
})
.await
.unwrap()
} }
pub async fn list_items( pub async fn list_items(
@@ -322,15 +283,7 @@ impl AsyncItemService {
tags: Vec<String>, tags: Vec<String>,
meta: HashMap<String, String>, meta: HashMap<String, String>,
) -> Result<Vec<ItemWithMeta>, CoreError> { ) -> Result<Vec<ItemWithMeta>, CoreError> {
let db = self.db.clone(); self.execute_blocking(|conn, item_service| item_service.list_items(conn, &tags, &meta)).await
let item_service = self.item_service.clone();
tokio::task::spawn_blocking(move || {
let conn = db.blocking_lock();
item_service.list_items(&conn, &tags, &meta)
})
.await
.unwrap()
} }
pub async fn delete_item(&self, id: i64) -> Result<(), CoreError> { pub async fn delete_item(&self, id: i64) -> Result<(), CoreError> {