fix: update warnings to use log::warn
Co-authored-by: aider (openai/andrew/openrouter/mistralai/mistral-medium-3.1) <aider@aider.chat>
This commit is contained in:
@@ -8,6 +8,7 @@ use std::io::Read;
|
|||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
|
use log::warn;
|
||||||
|
|
||||||
/// An asynchronous wrapper around the `ItemService` for use in async contexts like the web server.
|
/// An asynchronous wrapper around the `ItemService` for use in async contexts like the web server.
|
||||||
/// It uses `tokio::task::spawn_blocking` to run synchronous database and filesystem operations
|
/// It uses `tokio::task::spawn_blocking` to run synchronous database and filesystem operations
|
||||||
@@ -61,7 +62,6 @@ impl AsyncItemService {
|
|||||||
.unwrap()
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub async fn stream_item_content_by_id(
|
pub async fn stream_item_content_by_id(
|
||||||
&self,
|
&self,
|
||||||
item_id: i64,
|
item_id: i64,
|
||||||
@@ -80,10 +80,10 @@ impl AsyncItemService {
|
|||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
.unwrap()?;
|
.unwrap()?;
|
||||||
|
|
||||||
// Clone content for use in the binary check closure
|
// Clone content for use in the binary check closure
|
||||||
let content_clone = content.clone();
|
let content_clone = content.clone();
|
||||||
|
|
||||||
// Get metadata to determine MIME type and binary status
|
// Get metadata to determine MIME type and binary status
|
||||||
let (mime_type, is_binary) = {
|
let (mime_type, is_binary) = {
|
||||||
let db = self.db.clone();
|
let db = self.db.clone();
|
||||||
@@ -92,32 +92,32 @@ impl AsyncItemService {
|
|||||||
let conn = db.blocking_lock();
|
let conn = db.blocking_lock();
|
||||||
let item_with_meta = item_service.get_item(&conn, item_id)?;
|
let item_with_meta = item_service.get_item(&conn, item_id)?;
|
||||||
let metadata = item_with_meta.meta_as_map();
|
let metadata = item_with_meta.meta_as_map();
|
||||||
|
|
||||||
let mime_type = metadata
|
let mime_type = metadata
|
||||||
.get("mime_type")
|
.get("mime_type")
|
||||||
.map(|s| s.to_string())
|
.map(|s| s.to_string())
|
||||||
.unwrap_or_else(|| "application/octet-stream".to_string());
|
.unwrap_or_else(|| "application/octet-stream".to_string());
|
||||||
|
|
||||||
let is_binary = if let Some(binary_val) = metadata.get("binary") {
|
let is_binary = if let Some(binary_val) = metadata.get("binary") {
|
||||||
binary_val == "true"
|
binary_val == "true"
|
||||||
} else {
|
} else {
|
||||||
crate::common::is_binary::is_binary(&content_clone)
|
crate::common::is_binary::is_binary(&content_clone)
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok::<_, CoreError>((mime_type, is_binary))
|
Ok::<_, CoreError>((mime_type, is_binary))
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
.unwrap()?
|
.unwrap()?
|
||||||
};
|
};
|
||||||
|
|
||||||
// Check if content is binary when allow_binary is false
|
// Check if content is binary when allow_binary is false
|
||||||
if !allow_binary && is_binary {
|
if !allow_binary && is_binary {
|
||||||
return Err(CoreError::InvalidInput("Binary content not allowed".to_string()));
|
return Err(CoreError::InvalidInput("Binary content not allowed".to_string()));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a stream that reads only the requested portion
|
// Create a stream that reads only the requested portion
|
||||||
let content_len = content.len() as u64;
|
let content_len = content.len() as u64;
|
||||||
|
|
||||||
// Apply offset and length constraints
|
// Apply offset and length constraints
|
||||||
let start = std::cmp::min(offset, content_len);
|
let start = std::cmp::min(offset, content_len);
|
||||||
let end = if length > 0 {
|
let end = if length > 0 {
|
||||||
@@ -125,14 +125,14 @@ impl AsyncItemService {
|
|||||||
} else {
|
} else {
|
||||||
content_len
|
content_len
|
||||||
};
|
};
|
||||||
|
|
||||||
let stream = if start < content_len {
|
let stream = if start < content_len {
|
||||||
let chunk = tokio_util::bytes::Bytes::from(content[start as usize..end as usize].to_vec());
|
let chunk = tokio_util::bytes::Bytes::from(content[start as usize..end as usize].to_vec());
|
||||||
Box::pin(tokio_stream::iter(vec![Ok(chunk)]))
|
Box::pin(tokio_stream::iter(vec![Ok(chunk)]))
|
||||||
} else {
|
} else {
|
||||||
Box::pin(tokio_stream::iter(vec![]))
|
Box::pin(tokio_stream::iter(vec![]))
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok((stream, mime_type))
|
Ok((stream, mime_type))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -152,7 +152,7 @@ impl AsyncItemService {
|
|||||||
.get("mime_type")
|
.get("mime_type")
|
||||||
.map(|s| s.to_string())
|
.map(|s| s.to_string())
|
||||||
.unwrap_or_else(|| "application/octet-stream".to_string());
|
.unwrap_or_else(|| "application/octet-stream".to_string());
|
||||||
|
|
||||||
// Check if content is binary when allow_binary is false
|
// Check if content is binary when allow_binary is false
|
||||||
if !allow_binary {
|
if !allow_binary {
|
||||||
let is_binary = if let Some(binary_val) = metadata.get("binary") {
|
let is_binary = if let Some(binary_val) = metadata.get("binary") {
|
||||||
@@ -162,12 +162,12 @@ impl AsyncItemService {
|
|||||||
let (_, _, is_binary) = self.get_item_content_info_streaming(item_id).await?;
|
let (_, _, is_binary) = self.get_item_content_info_streaming(item_id).await?;
|
||||||
is_binary
|
is_binary
|
||||||
};
|
};
|
||||||
|
|
||||||
if is_binary {
|
if is_binary {
|
||||||
return Err(CoreError::InvalidInput("Binary content not allowed".to_string()));
|
return Err(CoreError::InvalidInput("Binary content not allowed".to_string()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get a streaming reader for the content
|
// Get a streaming reader for the content
|
||||||
let reader = {
|
let reader = {
|
||||||
let db = self.db.clone();
|
let db = self.db.clone();
|
||||||
@@ -176,28 +176,28 @@ impl AsyncItemService {
|
|||||||
let conn = db.blocking_lock();
|
let conn = db.blocking_lock();
|
||||||
let item_with_meta = item_service.get_item(&conn, item_id)?;
|
let item_with_meta = item_service.get_item(&conn, item_id)?;
|
||||||
let item_id_val = item_with_meta.item.id.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
|
let item_id_val = item_with_meta.item.id.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
|
||||||
|
|
||||||
let mut item_path = item_service.get_data_path().clone();
|
let mut item_path = item_service.get_data_path().clone();
|
||||||
item_path.push(item_id_val.to_string());
|
item_path.push(item_id_val.to_string());
|
||||||
|
|
||||||
let reader = item_service.get_compression_service().stream_item_content(
|
let reader = item_service.get_compression_service().stream_item_content(
|
||||||
item_path,
|
item_path,
|
||||||
&item_with_meta.item.compression
|
&item_with_meta.item.compression
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
Ok::<_, CoreError>(reader)
|
Ok::<_, CoreError>(reader)
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
.unwrap()?
|
.unwrap()?
|
||||||
};
|
};
|
||||||
|
|
||||||
// Convert the reader into an async stream manually
|
// Convert the reader into an async stream manually
|
||||||
// Since ReaderStream requires AsyncRead, we'll create our own implementation
|
// Since ReaderStream requires AsyncRead, we'll create our own implementation
|
||||||
use tokio_util::bytes::Bytes;
|
use tokio_util::bytes::Bytes;
|
||||||
|
|
||||||
// Create a channel to stream data between the blocking thread and async runtime
|
// Create a channel to stream data between the blocking thread and async runtime
|
||||||
let (tx, rx) = tokio::sync::mpsc::channel(1);
|
let (tx, rx) = tokio::sync::mpsc::channel(1);
|
||||||
|
|
||||||
// Spawn a blocking task to read from the reader and send chunks
|
// Spawn a blocking task to read from the reader and send chunks
|
||||||
tokio::task::spawn_blocking(move || {
|
tokio::task::spawn_blocking(move || {
|
||||||
let mut reader = reader;
|
let mut reader = reader;
|
||||||
@@ -217,11 +217,11 @@ impl AsyncItemService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read and send data up to the specified length
|
// Read and send data up to the specified length
|
||||||
let mut remaining_length = length;
|
let mut remaining_length = length;
|
||||||
let mut buffer = [0; PIPESIZE];
|
let mut buffer = [0; PIPESIZE];
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
// Determine how much to read in this iteration
|
// Determine how much to read in this iteration
|
||||||
let to_read = if length > 0 {
|
let to_read = if length > 0 {
|
||||||
@@ -230,11 +230,11 @@ impl AsyncItemService {
|
|||||||
} else {
|
} else {
|
||||||
buffer.len()
|
buffer.len()
|
||||||
};
|
};
|
||||||
|
|
||||||
if to_read == 0 {
|
if to_read == 0 {
|
||||||
break; // We've read the requested length
|
break; // We've read the requested length
|
||||||
}
|
}
|
||||||
|
|
||||||
match reader.read(&mut buffer[..to_read]) {
|
match reader.read(&mut buffer[..to_read]) {
|
||||||
Ok(0) => break, // EOF
|
Ok(0) => break, // EOF
|
||||||
Ok(n) => {
|
Ok(n) => {
|
||||||
@@ -257,10 +257,10 @@ impl AsyncItemService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// Convert the receiver into a stream
|
// Convert the receiver into a stream
|
||||||
let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
|
let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
|
||||||
|
|
||||||
Ok((Box::pin(stream), mime_type))
|
Ok((Box::pin(stream), mime_type))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ use crate::db::{self, Meta};
|
|||||||
use crate::compression_engine::{get_compression_engine, CompressionType};
|
use crate::compression_engine::{get_compression_engine, CompressionType};
|
||||||
use crate::modes::common::settings_compression_type;
|
use crate::modes::common::settings_compression_type;
|
||||||
use clap::Command;
|
use clap::Command;
|
||||||
use log::debug;
|
use log::{debug, warn};
|
||||||
use rusqlite::Connection;
|
use rusqlite::Connection;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::fs;
|
use std::fs;
|
||||||
@@ -55,12 +55,12 @@ impl ItemService {
|
|||||||
let mut item_path = self.data_path.clone();
|
let mut item_path = self.data_path.clone();
|
||||||
item_path.push(item_id.to_string());
|
item_path.push(item_id.to_string());
|
||||||
debug!("ITEM_SERVICE: Reading content from path: {:?}", item_path);
|
debug!("ITEM_SERVICE: Reading content from path: {:?}", item_path);
|
||||||
|
|
||||||
let content = self
|
let content = self
|
||||||
.compression_service
|
.compression_service
|
||||||
.get_item_content(item_path, &item_with_meta.item.compression)?;
|
.get_item_content(item_path, &item_with_meta.item.compression)?;
|
||||||
debug!("ITEM_SERVICE: Read {} bytes of content for item {}", content.len(), id);
|
debug!("ITEM_SERVICE: Read {} bytes of content for item {}", content.len(), id);
|
||||||
|
|
||||||
Ok(ItemWithContent {
|
Ok(ItemWithContent {
|
||||||
item_with_meta,
|
item_with_meta,
|
||||||
content,
|
content,
|
||||||
@@ -70,19 +70,19 @@ impl ItemService {
|
|||||||
pub fn get_item_content_info(&self, conn: &Connection, id: i64) -> Result<(Vec<u8>, String, bool), CoreError> {
|
pub fn get_item_content_info(&self, conn: &Connection, id: i64) -> Result<(Vec<u8>, String, bool), CoreError> {
|
||||||
let item_with_content = self.get_item_content(conn, id)?;
|
let item_with_content = self.get_item_content(conn, id)?;
|
||||||
let metadata = item_with_content.item_with_meta.meta_as_map();
|
let metadata = item_with_content.item_with_meta.meta_as_map();
|
||||||
|
|
||||||
let mime_type = metadata
|
let mime_type = metadata
|
||||||
.get("mime_type")
|
.get("mime_type")
|
||||||
.map(|s| s.to_string())
|
.map(|s| s.to_string())
|
||||||
.unwrap_or_else(|| "application/octet-stream".to_string());
|
.unwrap_or_else(|| "application/octet-stream".to_string());
|
||||||
|
|
||||||
// Check if content is binary
|
// Check if content is binary
|
||||||
let is_binary = if let Some(binary_val) = metadata.get("binary") {
|
let is_binary = if let Some(binary_val) = metadata.get("binary") {
|
||||||
binary_val == "true"
|
binary_val == "true"
|
||||||
} else {
|
} else {
|
||||||
crate::common::is_binary::is_binary(&item_with_content.content)
|
crate::common::is_binary::is_binary(&item_with_content.content)
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok((item_with_content.content, mime_type, is_binary))
|
Ok((item_with_content.content, mime_type, is_binary))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -100,29 +100,29 @@ impl ItemService {
|
|||||||
|
|
||||||
let mut item_path = self.data_path.clone();
|
let mut item_path = self.data_path.clone();
|
||||||
item_path.push(item_id.to_string());
|
item_path.push(item_id.to_string());
|
||||||
|
|
||||||
let reader = self.compression_service.stream_item_content(item_path.clone(), &item_with_meta.item.compression)?;
|
let reader = self.compression_service.stream_item_content(item_path.clone(), &item_with_meta.item.compression)?;
|
||||||
|
|
||||||
let metadata = item_with_meta.meta_as_map();
|
let metadata = item_with_meta.meta_as_map();
|
||||||
let mime_type = metadata
|
let mime_type = metadata
|
||||||
.get("mime_type")
|
.get("mime_type")
|
||||||
.map(|s| s.to_string())
|
.map(|s| s.to_string())
|
||||||
.unwrap_or_else(|| "application/octet-stream".to_string());
|
.unwrap_or_else(|| "application/octet-stream".to_string());
|
||||||
|
|
||||||
// Check if content is binary using only the first 8192 bytes
|
// Check if content is binary using only the first 8192 bytes
|
||||||
let is_binary = if let Some(binary_val) = metadata.get("binary") {
|
let is_binary = if let Some(binary_val) = metadata.get("binary") {
|
||||||
binary_val == "true"
|
binary_val == "true"
|
||||||
} else {
|
} else {
|
||||||
// Read only the first 8192 bytes for binary detection
|
// Read only the first 8192 bytes for binary detection
|
||||||
let mut sample_reader = self.compression_service.stream_item_content(
|
let mut sample_reader = self.compression_service.stream_item_content(
|
||||||
item_path,
|
item_path,
|
||||||
&item_with_meta.item.compression
|
&item_with_meta.item.compression
|
||||||
)?;
|
)?;
|
||||||
let mut sample_buffer = vec![0; 8192];
|
let mut sample_buffer = vec![0; 8192];
|
||||||
let bytes_read = sample_reader.read(&mut sample_buffer)?;
|
let bytes_read = sample_reader.read(&mut sample_buffer)?;
|
||||||
crate::common::is_binary::is_binary(&sample_buffer[..bytes_read])
|
crate::common::is_binary::is_binary(&sample_buffer[..bytes_read])
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok((reader, mime_type, is_binary))
|
Ok((reader, mime_type, is_binary))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -145,14 +145,14 @@ impl ItemService {
|
|||||||
|
|
||||||
let item = item_maybe.ok_or(CoreError::ItemNotFoundGeneric)?;
|
let item = item_maybe.ok_or(CoreError::ItemNotFoundGeneric)?;
|
||||||
debug!("ITEM_SERVICE: Found matching item: {:?}", item);
|
debug!("ITEM_SERVICE: Found matching item: {:?}", item);
|
||||||
|
|
||||||
// Get tags and meta directly instead of calling get_item which makes redundant queries
|
// Get tags and meta directly instead of calling get_item which makes redundant queries
|
||||||
let item_id = item.id.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
|
let item_id = item.id.ok_or_else(|| CoreError::InvalidInput("Item missing ID".to_string()))?;
|
||||||
let tags = db::get_item_tags(conn, &item)?;
|
let tags = db::get_item_tags(conn, &item)?;
|
||||||
debug!("ITEM_SERVICE: Found {} tags for item {}", tags.len(), item_id);
|
debug!("ITEM_SERVICE: Found {} tags for item {}", tags.len(), item_id);
|
||||||
let meta = db::get_item_meta(conn, &item)?;
|
let meta = db::get_item_meta(conn, &item)?;
|
||||||
debug!("ITEM_SERVICE: Found {} meta entries for item {}", meta.len(), item_id);
|
debug!("ITEM_SERVICE: Found {} meta entries for item {}", meta.len(), item_id);
|
||||||
|
|
||||||
Ok(ItemWithMeta { item, tags, meta })
|
Ok(ItemWithMeta { item, tags, meta })
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -160,7 +160,7 @@ impl ItemService {
|
|||||||
debug!("ITEM_SERVICE: Listing items with tags: {:?}, meta: {:?}", tags, meta);
|
debug!("ITEM_SERVICE: Listing items with tags: {:?}, meta: {:?}", tags, meta);
|
||||||
let items = db::get_items_matching(conn, &tags.to_vec(), meta)?;
|
let items = db::get_items_matching(conn, &tags.to_vec(), meta)?;
|
||||||
debug!("ITEM_SERVICE: Found {} matching items", items.len());
|
debug!("ITEM_SERVICE: Found {} matching items", items.len());
|
||||||
|
|
||||||
let item_ids: Vec<i64> = items.iter().filter_map(|item| item.id).collect();
|
let item_ids: Vec<i64> = items.iter().filter_map(|item| item.id).collect();
|
||||||
if item_ids.is_empty() {
|
if item_ids.is_empty() {
|
||||||
debug!("ITEM_SERVICE: No items found, returning empty list");
|
debug!("ITEM_SERVICE: No items found, returning empty list");
|
||||||
@@ -170,17 +170,17 @@ impl ItemService {
|
|||||||
debug!("ITEM_SERVICE: Getting tags and meta for {} items", item_ids.len());
|
debug!("ITEM_SERVICE: Getting tags and meta for {} items", item_ids.len());
|
||||||
let tags_map = db::get_tags_for_items(conn, &item_ids)?;
|
let tags_map = db::get_tags_for_items(conn, &item_ids)?;
|
||||||
let meta_map_db = db::get_meta_for_items(conn, &item_ids)?;
|
let meta_map_db = db::get_meta_for_items(conn, &item_ids)?;
|
||||||
|
|
||||||
let mut result = Vec::new();
|
let mut result = Vec::new();
|
||||||
for item in items {
|
for item in items {
|
||||||
let item_id = item.id.unwrap();
|
let item_id = item.id.unwrap();
|
||||||
let tags = tags_map.get(&item_id).cloned().unwrap_or_default();
|
let tags = tags_map.get(&item_id).cloned().unwrap_or_default();
|
||||||
let meta_hm = meta_map_db.get(&item_id).cloned().unwrap_or_default();
|
let meta_hm = meta_map_db.get(&item_id).cloned().unwrap_or_default();
|
||||||
let meta = meta_hm.into_iter().map(|(name, value)| Meta { id: item_id, name, value }).collect();
|
let meta = meta_hm.into_iter().map(|(name, value)| Meta { id: item_id, name, value }).collect();
|
||||||
|
|
||||||
result.push(ItemWithMeta { item, tags, meta });
|
result.push(ItemWithMeta { item, tags, meta });
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("ITEM_SERVICE: Returning {} items with full metadata", result.len());
|
debug!("ITEM_SERVICE: Returning {} items with full metadata", result.len());
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
@@ -205,7 +205,7 @@ impl ItemService {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn save_item<R: Read>(
|
pub fn save_item<R: Read>(
|
||||||
&self,
|
&self,
|
||||||
mut input: R,
|
mut input: R,
|
||||||
@@ -225,7 +225,7 @@ impl ItemService {
|
|||||||
let compression_engine = get_compression_engine(compression_type.clone())?;
|
let compression_engine = get_compression_engine(compression_type.clone())?;
|
||||||
|
|
||||||
let tx = conn.transaction()?;
|
let tx = conn.transaction()?;
|
||||||
|
|
||||||
let item_id;
|
let item_id;
|
||||||
let mut item;
|
let mut item;
|
||||||
{
|
{
|
||||||
@@ -264,17 +264,17 @@ impl ItemService {
|
|||||||
let _ = writeln!(t, "KEEP: New item: {} tags: {:?}", item_id, tags);
|
let _ = writeln!(t, "KEEP: New item: {} tags: {:?}", item_id, tags);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut plugins = self.meta_service.get_plugins(cmd, settings);
|
let mut plugins = self.meta_service.get_plugins(cmd, settings);
|
||||||
debug!("ITEM_SERVICE: Got {} meta plugins", plugins.len());
|
debug!("ITEM_SERVICE: Got {} meta plugins", plugins.len());
|
||||||
self.meta_service.initialize_plugins(&mut plugins, &tx, item_id);
|
self.meta_service.initialize_plugins(&mut plugins, &tx, item_id);
|
||||||
|
|
||||||
let mut item_path = self.data_path.clone();
|
let mut item_path = self.data_path.clone();
|
||||||
item_path.push(item_id.to_string());
|
item_path.push(item_id.to_string());
|
||||||
debug!("ITEM_SERVICE: Writing item to path: {:?}", item_path);
|
debug!("ITEM_SERVICE: Writing item to path: {:?}", item_path);
|
||||||
|
|
||||||
let mut item_out = compression_engine.create(item_path.clone())?;
|
let mut item_out = compression_engine.create(item_path.clone())?;
|
||||||
|
|
||||||
let mut buffer = [0; PIPESIZE];
|
let mut buffer = [0; PIPESIZE];
|
||||||
let mut total_bytes = 0;
|
let mut total_bytes = 0;
|
||||||
|
|
||||||
@@ -282,25 +282,25 @@ impl ItemService {
|
|||||||
loop {
|
loop {
|
||||||
let n = input.read(&mut buffer)?;
|
let n = input.read(&mut buffer)?;
|
||||||
if n == 0 { break; }
|
if n == 0 { break; }
|
||||||
|
|
||||||
total_bytes += n as i64;
|
total_bytes += n as i64;
|
||||||
item_out.write_all(&buffer[..n])?;
|
item_out.write_all(&buffer[..n])?;
|
||||||
self.meta_service.process_chunk(&mut plugins, &buffer[..n], &tx);
|
self.meta_service.process_chunk(&mut plugins, &buffer[..n], &tx);
|
||||||
}
|
}
|
||||||
debug!("ITEM_SERVICE: Processed {} bytes total", total_bytes);
|
debug!("ITEM_SERVICE: Processed {} bytes total", total_bytes);
|
||||||
|
|
||||||
item_out.flush()?;
|
item_out.flush()?;
|
||||||
drop(item_out);
|
drop(item_out);
|
||||||
|
|
||||||
debug!("ITEM_SERVICE: Finalizing meta plugins");
|
debug!("ITEM_SERVICE: Finalizing meta plugins");
|
||||||
self.meta_service.finalize_plugins(&mut plugins, &tx);
|
self.meta_service.finalize_plugins(&mut plugins, &tx);
|
||||||
|
|
||||||
item.size = Some(total_bytes);
|
item.size = Some(total_bytes);
|
||||||
db::update_item(&tx, item.clone())?;
|
db::update_item(&tx, item.clone())?;
|
||||||
|
|
||||||
tx.commit()?;
|
tx.commit()?;
|
||||||
debug!("ITEM_SERVICE: Transaction committed successfully");
|
debug!("ITEM_SERVICE: Transaction committed successfully");
|
||||||
|
|
||||||
Ok(item_id)
|
Ok(item_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -311,7 +311,7 @@ impl ItemService {
|
|||||||
metadata: &HashMap<String, String>,
|
metadata: &HashMap<String, String>,
|
||||||
conn: &mut Connection,
|
conn: &mut Connection,
|
||||||
) -> Result<ItemWithMeta, CoreError> {
|
) -> Result<ItemWithMeta, CoreError> {
|
||||||
debug!("ITEM_SERVICE: Starting save_item_from_mcp with {} bytes, {} tags, {} metadata entries",
|
debug!("ITEM_SERVICE: Starting save_item_from_mcp with {} bytes, {} tags, {} metadata entries",
|
||||||
content.len(), tags.len(), metadata.len());
|
content.len(), tags.len(), metadata.len());
|
||||||
let compression_type = CompressionType::LZ4;
|
let compression_type = CompressionType::LZ4;
|
||||||
let compression_engine = get_compression_engine(compression_type.clone())?;
|
let compression_engine = get_compression_engine(compression_type.clone())?;
|
||||||
|
|||||||
@@ -51,11 +51,11 @@ impl MetaService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if let Err(e) = meta_plugin.configure_outputs(&configured_outputs) {
|
if let Err(e) = meta_plugin.configure_outputs(&configured_outputs) {
|
||||||
eprintln!("Warning: Failed to configure outputs for meta plugin '{}': {}", plugin_name, e);
|
log::warn!("Warning: Failed to configure outputs for meta plugin '{}': {}", plugin_name, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Err(e) = meta_plugin.configure_options(&configured_options) {
|
if let Err(e) = meta_plugin.configure_options(&configured_options) {
|
||||||
eprintln! (
|
log::warn!(
|
||||||
"Warning: Failed to configure options for meta plugin '{}': {}",
|
"Warning: Failed to configure options for meta plugin '{}': {}",
|
||||||
plugin_name, e
|
plugin_name, e
|
||||||
);
|
);
|
||||||
@@ -67,8 +67,7 @@ impl MetaService {
|
|||||||
let original_len = meta_plugins.len();
|
let original_len = meta_plugins.len();
|
||||||
meta_plugins.retain(|meta_plugin| meta_plugin.is_supported());
|
meta_plugins.retain(|meta_plugin| meta_plugin.is_supported());
|
||||||
if meta_plugins.len() < original_len {
|
if meta_plugins.len() < original_len {
|
||||||
// This is not perfect as it doesn't say which one, but avoids complex logic from save.rs
|
log::warn!("Warning: Some meta plugins are enabled but not supported on this system");
|
||||||
eprintln!("Warning: Some meta plugins are enabled but not supported on this system");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
meta_plugins
|
meta_plugins
|
||||||
@@ -82,7 +81,7 @@ impl MetaService {
|
|||||||
) {
|
) {
|
||||||
// Check for duplicate output names before initializing plugins
|
// Check for duplicate output names before initializing plugins
|
||||||
let mut output_names: std::collections::HashMap<String, Vec<String>> = std::collections::HashMap::new();
|
let mut output_names: std::collections::HashMap<String, Vec<String>> = std::collections::HashMap::new();
|
||||||
|
|
||||||
for plugin in plugins.iter() {
|
for plugin in plugins.iter() {
|
||||||
let plugin_name = plugin.meta_name();
|
let plugin_name = plugin.meta_name();
|
||||||
// For each plugin, collect all the output names it might write to
|
// For each plugin, collect all the output names it might write to
|
||||||
@@ -93,7 +92,7 @@ impl MetaService {
|
|||||||
serde_yaml::Value::Bool(false) => continue, // This output is disabled
|
serde_yaml::Value::Bool(false) => continue, // This output is disabled
|
||||||
_ => internal_name.clone(), // Default to internal name for other types
|
_ => internal_name.clone(), // Default to internal name for other types
|
||||||
};
|
};
|
||||||
|
|
||||||
// Only track outputs that will actually be written
|
// Only track outputs that will actually be written
|
||||||
if !matches!(output_config, serde_yaml::Value::Bool(false)) {
|
if !matches!(output_config, serde_yaml::Value::Bool(false)) {
|
||||||
output_names.entry(output_name)
|
output_names.entry(output_name)
|
||||||
@@ -102,16 +101,16 @@ impl MetaService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Print warnings for duplicate output names
|
// Print warnings for duplicate output names
|
||||||
for (output_name, plugin_names) in &output_names {
|
for (output_name, plugin_names) in &output_names {
|
||||||
if plugin_names.len() > 1 {
|
if plugin_names.len() > 1 {
|
||||||
log::warn!("META_SERVICE: Output name '{}' is provided by multiple plugins: {}",
|
log::warn!("META_SERVICE: Output name '{}' is provided by multiple plugins: {}",
|
||||||
output_name,
|
output_name,
|
||||||
plugin_names.join(", "));
|
plugin_names.join(", "));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for meta_plugin in plugins.iter_mut() {
|
for meta_plugin in plugins.iter_mut() {
|
||||||
if let Err(e) = meta_plugin.initialize(conn, item_id) {
|
if let Err(e) = meta_plugin.initialize(conn, item_id) {
|
||||||
log::warn!("META_SERVICE: Failed to initialize meta plugin: {}", e);
|
log::warn!("META_SERVICE: Failed to initialize meta plugin: {}", e);
|
||||||
@@ -133,7 +132,7 @@ impl MetaService {
|
|||||||
pub fn finalize_plugins(&self, plugins: &mut [Box<dyn MetaPlugin>], conn: &Connection) {
|
pub fn finalize_plugins(&self, plugins: &mut [Box<dyn MetaPlugin>], conn: &Connection) {
|
||||||
for meta_plugin in plugins.iter_mut() {
|
for meta_plugin in plugins.iter_mut() {
|
||||||
if let Err(e) = meta_plugin.finalize(conn) {
|
if let Err(e) = meta_plugin.finalize(conn) {
|
||||||
eprintln!("Warning: Failed to finalize meta plugin: {}", e);
|
log::warn!("Warning: Failed to finalize meta plugin: {}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user