feat: plugin-declared parallel execution, switch to env_logger, update deps

Parallel execution (opt-in via MetaPlugin::parallel_safe):
- Add Send bound to MetaPlugin, parallel_safe() method (default false)
- Override to true in digest, tokens, exec, magic_file plugins
- MetaService: std::thread::scope for initialize_plugins and process_chunk
- Extract plugins via NullMetaPlugin sentinel + std::mem::replace (no unsafe)
- Panic tracking: join errors logged, NullMetaPlugin restored and finalized
- MetaPluginExec: Box<dyn Write> -> Box<dyn Write + Send>
- SendCookie wrapper for libmagic Cookie with unsafe impl Send

Logging (stderrlog -> env_logger):
- Custom format: [SSSSSS.mmm] LEVEL [module:] message (time-since-start ms)
- Default level: Warn (matches previous behavior)
- -v: Debug, -vv+: Trace, -q: off
- -vv+ shows module path

Maintenance:
- Bump deps: thiserror 2.0, config 0.15, dns-lookup 3.0, lz4_flex 0.12,
  ringbuf 0.4, rand 0.9, lazy_static 1.5, env_logger 0.11
- Update Cargo.lock (186 transitive packages)
- Clippy fixes: is_multiple_of, to_string_in_format_args, collapsible_if
- Fix double-counting bug in TokensMetaPlugin::update
- Fix schema description using plugin.description()

Co-Authored-By: opencode <noreply@opencode.ai>
This commit is contained in:
2026-03-13 21:49:51 -03:00
parent e7d8a83369
commit a07bb6b350
12 changed files with 1227 additions and 853 deletions

1585
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -10,67 +10,67 @@ categories = ["command-line-utilities"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
anyhow = "1.0.72" anyhow = "1.0"
axum = { version = "0.8.4", optional = true } axum = { version = "0.8", optional = true }
derive_more = { version = "2.0", features = ["full"] } derive_more = { version = "2.0", features = ["full"] }
smart-default = "0.7" smart-default = "0.7"
thiserror = "1.0" thiserror = "2.0"
base64 = "0.22.1" base64 = "0.22"
chrono = { version = "0.4.26", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4.3.10", features = ["derive", "env"] } clap = { version = "4.6", features = ["derive", "env"] }
config = "0.14.0" config = "0.15"
ctor = "0.2" ctor = "0.2"
directories = "6.0.0" directories = "6.0"
dns-lookup = "2.0.2" dns-lookup = "3.0"
enum-map = "2.6.1" enum-map = "2.7"
flate2 = { version = "1.0.27", features = ["zlib-ng-compat"], optional = true } flate2 = { version = "1.0", features = ["zlib-ng-compat"], optional = true }
futures = "0.3" futures = "0.3"
gethostname = "1.0.2" gethostname = "1.0"
humansize = "2.1.3" humansize = "2.1"
async-stream = "0.3" async-stream = "0.3"
hyper = { version = "1.0", features = ["full"] } hyper = { version = "1.0", features = ["full"] }
http-body-util = "0.1" http-body-util = "0.1"
inventory = "0.3" inventory = "0.3"
is-terminal = "0.4.9" is-terminal = "0.4"
lazy_static = "1.4.0" lazy_static = "1.5"
libc = "0.2.147" libc = "0.2"
local-ip-address = "0.6.5" local-ip-address = "0.6"
log = "0.4.19" log = "0.4"
lz4_flex = { version = "0.11.1", optional = true } lz4_flex = { version = "0.12", optional = true }
magic = { version = "0.13.0", optional = true } magic = { version = "0.13", optional = true }
nix = "0.30.1" nix = "0.30"
once_cell = "1.19.0" once_cell = "1.21"
comfy-table = "7.2.0" comfy-table = "7.2"
pwhash = "1.0.0" pwhash = "1.0"
regex = "1.9.5" regex = "1.10"
ringbuf = "0.3" ringbuf = "0.4"
rmcp = { version = "0.2.0", features = ["server"], optional = true } rmcp = { version = "0.2", features = ["server"], optional = true }
rusqlite = { version = "0.37.0", features = ["bundled", "array", "chrono"] } rusqlite = { version = "0.37", features = ["bundled", "array", "chrono"] }
rusqlite_migration = "2.3.0" rusqlite_migration = "2.3"
serde = { version = "1.0.219", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.142" serde_json = "1.0"
serde_yaml = "0.9.34" serde_yaml = "0.9"
sha2 = "0.10.0" sha2 = "0.10"
md5 = "0.7.0" md5 = "0.7"
subtle = "2.6" subtle = "2.6"
stderrlog = "0.6.0" env_logger = "0.11"
strum = { version = "0.27.2", features = ["derive"] } strum = { version = "0.27", features = ["derive"] }
term = "1.1.0" term = "1.2"
tokio = { version = "1.0", features = ["full"] } tokio = { version = "1.0", features = ["full"] }
tokio-stream = "0.1" tokio-stream = "0.1"
tokio-util = "0.7.16" tokio-util = "0.7"
tower = { version = "0.5.2", optional = true } tower = { version = "0.5", optional = true }
tower-http = { version = "0.6.6", features = ["cors", "fs", "trace"], optional = true } tower-http = { version = "0.6", features = ["cors", "fs", "trace"], optional = true }
utoipa = { version = "5.4.0", features = ["axum_extras"], optional = true } utoipa = { version = "5.4", features = ["axum_extras"], optional = true }
utoipa-swagger-ui = { version = "9.0.2", features = ["axum"], optional = true } utoipa-swagger-ui = { version = "9.0", features = ["axum"], optional = true }
uzers = "0.12.1" uzers = "0.12"
which = "8.0.0" which = "8.0"
xdg = "2.5.2" xdg = "2.5"
strip-ansi-escapes = "0.2.1" strip-ansi-escapes = "0.2"
pest = "2.8.1" pest = "2.8"
pest_derive = "2.8.1" pest_derive = "2.8"
dirs = "6.0.0" dirs = "6.0"
similar = { version = "2.7.0", default-features = false, features = ["text"] } similar = { version = "2.7", default-features = false, features = ["text"] }
ureq = { version = "3", features = ["json"], optional = true } ureq = { version = "3", features = ["json"], optional = true }
os_pipe = { version = "1", optional = true } os_pipe = { version = "1", optional = true }
axum-server = { version = "0.8", features = ["tls-rustls"], optional = true } axum-server = { version = "0.8", features = ["tls-rustls"], optional = true }
@@ -118,5 +118,5 @@ tls = ["dep:axum-server"]
tokens = ["dep:tiktoken-rs"] tokens = ["dep:tiktoken-rs"]
[dev-dependencies] [dev-dependencies]
tempfile = "3.3.0" tempfile = "3.3"
rand = "0.8.5" rand = "0.9"

View File

@@ -149,7 +149,7 @@ fn has_binary_signature(data: &[u8]) -> bool {
/// Check if data looks like UTF-16 without BOM /// Check if data looks like UTF-16 without BOM
fn looks_like_utf16(data: &[u8]) -> bool { fn looks_like_utf16(data: &[u8]) -> bool {
if data.len() < 4 || data.len() % 2 != 0 { if data.len() < 4 || !data.len().is_multiple_of(2) {
return false; return false;
} }

View File

@@ -232,9 +232,6 @@ pub fn get_compression_engine(ct: CompressionType) -> Result<Box<dyn Compression
if engine.is_supported() { if engine.is_supported() {
Ok(engine.clone()) Ok(engine.clone())
} else { } else {
Err(anyhow!( Err(anyhow!("Compression engine for {ct} is not supported",))
"Compression engine for {} is not supported",
ct.to_string()
))
} }
} }

View File

@@ -1,3 +1,6 @@
use std::io::Write;
use std::time::Instant;
use anyhow::{Context, Error, Result, anyhow}; use anyhow::{Context, Error, Result, anyhow};
use clap::error::ErrorKind; use clap::error::ErrorKind;
use clap::*; use clap::*;
@@ -25,13 +28,36 @@ fn main() -> Result<(), Error> {
cmd.error(ErrorKind::ValueValidation, e).exit(); cmd.error(ErrorKind::ValueValidation, e).exit();
} }
stderrlog::new() let start = Instant::now();
.module(module_path!()) let mut builder = env_logger::Builder::new();
.quiet(args.options.quiet) let show_module = args.options.verbose >= 2;
.verbosity(usize::from(args.options.verbose + 2)) builder.format(move |buf, record| {
//.timestamp(stderrlog::Timestamp::Second) let elapsed = start.elapsed();
.init() let ts = format!("[{:>6}.{:03}]", elapsed.as_secs(), elapsed.subsec_millis());
.unwrap(); if show_module {
writeln!(
buf,
"{} {:<5} {}: {}",
ts,
record.level(),
record.module_path().unwrap_or("?"),
record.args()
)
} else {
writeln!(buf, "{} {:<5} {}", ts, record.level(), record.args())
}
});
let max_level = if args.options.quiet {
LevelFilter::Off
} else {
match args.options.verbose {
0 => LevelFilter::Warn,
1 => LevelFilter::Debug,
_ => LevelFilter::Trace,
}
};
builder.filter_module("keep", max_level);
builder.init();
debug!("MAIN: Start"); debug!("MAIN: Start");

View File

@@ -258,6 +258,10 @@ impl MetaPlugin for DigestMetaPlugin {
) -> anyhow::Result<&mut std::collections::HashMap<String, serde_yaml::Value>> { ) -> anyhow::Result<&mut std::collections::HashMap<String, serde_yaml::Value>> {
Ok(self.base.options_mut()) Ok(self.base.options_mut())
} }
fn parallel_safe(&self) -> bool {
true
}
} }
use crate::meta_plugin::register_meta_plugin; use crate::meta_plugin::register_meta_plugin;

View File

@@ -20,7 +20,7 @@ pub struct MetaPluginExec {
pub supported: bool, pub supported: bool,
pub split_whitespace: bool, pub split_whitespace: bool,
process: Option<Child>, process: Option<Child>,
writer: Option<Box<dyn Write>>, writer: Option<Box<dyn Write + Send>>,
result: Option<String>, result: Option<String>,
base: BaseMetaPlugin, base: BaseMetaPlugin,
} }
@@ -263,6 +263,10 @@ impl MetaPlugin for MetaPluginExec {
fn default_outputs(&self) -> Vec<String> { fn default_outputs(&self) -> Vec<String> {
vec!["exec".to_string()] vec!["exec".to_string()]
} }
fn parallel_safe(&self) -> bool {
true
}
} }
use crate::meta_plugin::register_meta_plugin; use crate::meta_plugin::register_meta_plugin;

View File

@@ -12,13 +12,36 @@ use crate::meta_plugin::{
process_metadata_outputs, process_metadata_outputs,
}; };
/// Wrapper around `magic::Cookie` that is Send.
///
/// Libmagic cookies are thread-safe per-instance (separate cookies have
/// independent state). The raw pointer `*mut magic_sys::magic_set` does not
/// auto-derive Send, but concurrent access to distinct cookies is safe per
/// the libmagic documentation.
#[cfg(feature = "magic")]
struct SendCookie(Cookie);
#[cfg(feature = "magic")]
// SAFETY: Each SendCookie owns a distinct libmagic instance. Libmagic
// documents that separate cookies can be used from different threads
// concurrently without synchronization.
#[allow(unsafe_code)]
unsafe impl Send for SendCookie {}
#[cfg(feature = "magic")]
impl std::fmt::Debug for SendCookie {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SendCookie").finish()
}
}
#[cfg(feature = "magic")] #[cfg(feature = "magic")]
#[derive(Debug)] #[derive(Debug)]
pub struct MagicFileMetaPluginImpl { pub struct MagicFileMetaPluginImpl {
buffer: Vec<u8>, buffer: Vec<u8>,
max_buffer_size: usize, max_buffer_size: usize,
is_finalized: bool, is_finalized: bool,
cookie: Option<Cookie>, cookie: Option<SendCookie>,
base: BaseMetaPlugin, base: BaseMetaPlugin,
} }
@@ -51,7 +74,8 @@ impl MagicFileMetaPluginImpl {
} }
fn get_magic_result(&self, flags: CookieFlags) -> io::Result<String> { fn get_magic_result(&self, flags: CookieFlags) -> io::Result<String> {
if let Some(cookie) = &self.cookie { if let Some(send_cookie) = &self.cookie {
let cookie = &send_cookie.0;
cookie cookie
.set_flags(flags) .set_flags(flags)
.map_err(|e| io::Error::other(format!("Failed to set magic flags: {e}")))?; .map_err(|e| io::Error::other(format!("Failed to set magic flags: {e}")))?;
@@ -125,7 +149,7 @@ impl MetaPlugin for MagicFileMetaPluginImpl {
}; };
} }
self.cookie = Some(cookie); self.cookie = Some(SendCookie(cookie));
MetaPluginResponse { MetaPluginResponse {
metadata: Vec::new(), metadata: Vec::new(),
@@ -210,6 +234,10 @@ impl MetaPlugin for MagicFileMetaPluginImpl {
) -> anyhow::Result<&mut std::collections::HashMap<String, serde_yaml::Value>> { ) -> anyhow::Result<&mut std::collections::HashMap<String, serde_yaml::Value>> {
Ok(self.base.options_mut()) Ok(self.base.options_mut())
} }
fn parallel_safe(&self) -> bool {
true
}
} }
#[cfg(feature = "magic")] #[cfg(feature = "magic")]
@@ -308,16 +336,15 @@ impl FallbackMagicFileMetaPlugin {
} }
// Get human-readable file type via --brief // Get human-readable file type via --brief
if let Some(file_type) = self.run_file_command(&["--brief"]) { if let Some(file_type) = self.run_file_command(&["--brief"])
if !file_type.is_empty() { && !file_type.is_empty()
if let Some(meta_data) = process_metadata_outputs( && let Some(meta_data) = process_metadata_outputs(
"file_type", "file_type",
serde_yaml::Value::String(file_type), serde_yaml::Value::String(file_type),
self.base.outputs(), self.base.outputs(),
) { )
metadata.push(meta_data); {
} metadata.push(meta_data);
}
} }
metadata metadata
@@ -415,6 +442,10 @@ impl MetaPlugin for FallbackMagicFileMetaPlugin {
) -> anyhow::Result<&mut std::collections::HashMap<String, serde_yaml::Value>> { ) -> anyhow::Result<&mut std::collections::HashMap<String, serde_yaml::Value>> {
Ok(self.base.options_mut()) Ok(self.base.options_mut())
} }
fn parallel_safe(&self) -> bool {
true
}
} }
#[cfg(not(feature = "magic"))] #[cfg(not(feature = "magic"))]

View File

@@ -318,7 +318,7 @@ pub fn process_metadata_outputs(
}) })
} }
pub trait MetaPlugin pub trait MetaPlugin: Send
where where
Self: 'static, Self: 'static,
{ {
@@ -488,6 +488,17 @@ where
"" ""
} }
/// Returns true if this plugin can execute concurrently with other
/// parallel-safe plugins.
///
/// Plugins that do significant per-chunk work (hashing, tokenization,
/// piping to child processes) should return true. The MetaService will
/// run all parallel-safe plugins in separate threads per phase, then
/// process results sequentially.
fn parallel_safe(&self) -> bool {
false
}
/// Builds the schema for this plugin from its options and outputs. /// Builds the schema for this plugin from its options and outputs.
/// ///
/// Default implementation infers option types from YAML values and /// Default implementation infers option types from YAML values and

View File

@@ -221,28 +221,27 @@ impl MetaPlugin for TokensMetaPlugin {
let mut metadata = Vec::new(); let mut metadata = Vec::new();
// If binary detection hasn't completed, do it now // If binary detection hasn't completed, do it now
if self.is_binary_content.is_none() { if self.is_binary_content.is_none()
if let Some(buffer) = &self.buffer { && let Some(buffer) = &self.buffer
if !buffer.is_empty() { && !buffer.is_empty()
let buffer_data = buffer.clone(); {
let is_binary = self.detect_binary(&buffer_data); let buffer_data = buffer.clone();
let is_binary = self.detect_binary(&buffer_data);
if is_binary { if is_binary {
if let Some(md) = crate::meta_plugin::process_metadata_outputs( if let Some(md) = crate::meta_plugin::process_metadata_outputs(
"token_count", "token_count",
serde_yaml::Value::Null, serde_yaml::Value::Null,
self.base.outputs(), self.base.outputs(),
) { ) {
metadata.push(md); metadata.push(md);
}
self.buffer = None;
self.is_finalized = true;
return MetaPluginResponse {
metadata,
is_finalized: true,
};
}
} }
self.buffer = None;
self.is_finalized = true;
return MetaPluginResponse {
metadata,
is_finalized: true,
};
} }
} }
@@ -301,6 +300,10 @@ impl MetaPlugin for TokensMetaPlugin {
) -> anyhow::Result<&mut std::collections::HashMap<String, serde_yaml::Value>> { ) -> anyhow::Result<&mut std::collections::HashMap<String, serde_yaml::Value>> {
Ok(self.base.options_mut()) Ok(self.base.options_mut())
} }
fn parallel_safe(&self) -> bool {
true
}
} }
use crate::meta_plugin::register_meta_plugin; use crate::meta_plugin::register_meta_plugin;

View File

@@ -208,12 +208,11 @@ pub fn settings_meta_plugin_types(
for meta_plugin_type in MetaPluginType::iter() { for meta_plugin_type in MetaPluginType::iter() {
if let Ok(meta_plugin) = if let Ok(meta_plugin) =
crate::meta_plugin::get_meta_plugin(meta_plugin_type.clone(), None, None) crate::meta_plugin::get_meta_plugin(meta_plugin_type.clone(), None, None)
&& meta_plugin.meta_type().to_string() == trimmed_name
{ {
if meta_plugin.meta_type().to_string() == trimmed_name { meta_plugin_types.push(meta_plugin_type);
meta_plugin_types.push(meta_plugin_type); found = true;
found = true; break;
break;
}
} }
} }

View File

@@ -1,13 +1,27 @@
use crate::config::Settings; use crate::config::Settings;
use crate::meta_plugin::{MetaPlugin, MetaPluginType}; use crate::meta_plugin::{MetaPlugin, MetaPluginResponse, MetaPluginType};
use crate::modes::common::settings_meta_plugin_types; use crate::modes::common::settings_meta_plugin_types;
use clap::Command; use clap::Command;
use log::debug; use log::{debug, error};
use rusqlite::Connection; use rusqlite::Connection;
use std::collections::HashMap; use std::collections::HashMap;
pub struct MetaService; pub struct MetaService;
/// Sentinel plugin used as a placeholder when extracting plugins for parallel
/// execution. The original plugin is written back immediately after the threads
/// complete. Never leaks into the DB or visible state.
struct NullMetaPlugin;
impl MetaPlugin for NullMetaPlugin {
fn meta_type(&self) -> MetaPluginType {
MetaPluginType::Digest
}
}
fn replace_plugin(plugins: &mut [Box<dyn MetaPlugin>], i: usize) -> Box<dyn MetaPlugin> {
std::mem::replace(&mut plugins[i], Box::new(NullMetaPlugin))
}
impl MetaService { impl MetaService {
pub fn new() -> Self { pub fn new() -> Self {
Self Self
@@ -77,16 +91,14 @@ impl MetaService {
for plugin in plugins.iter() { for plugin in plugins.iter() {
let plugin_name = plugin.meta_type().to_string(); let plugin_name = plugin.meta_type().to_string();
// For each plugin, collect all the output names it might write to
for (internal_name, output_config) in plugin.outputs() { for (internal_name, output_config) in plugin.outputs() {
let output_name = match output_config { let output_name = match output_config {
serde_yaml::Value::String(remapped_name) => remapped_name.clone(), serde_yaml::Value::String(remapped_name) => remapped_name.clone(),
serde_yaml::Value::Bool(true) => internal_name.clone(), serde_yaml::Value::Bool(true) => internal_name.clone(),
serde_yaml::Value::Bool(false) => continue, // This output is disabled serde_yaml::Value::Bool(false) => continue,
_ => internal_name.clone(), // Default to internal name for other types _ => internal_name.clone(),
}; };
// Only track outputs that will actually be written
if !matches!(output_config, serde_yaml::Value::Bool(false)) { if !matches!(output_config, serde_yaml::Value::Bool(false)) {
output_names output_names
.entry(output_name) .entry(output_name)
@@ -96,7 +108,6 @@ impl MetaService {
} }
} }
// Print warnings for duplicate output names
for (output_name, plugin_names) in &output_names { for (output_name, plugin_names) in &output_names {
if plugin_names.len() > 1 { if plugin_names.len() > 1 {
log::warn!( log::warn!(
@@ -107,9 +118,68 @@ impl MetaService {
} }
} }
for meta_plugin in plugins.iter_mut() { // Partition into parallel-safe and sequential indices
let response = meta_plugin.initialize(); let (parallel_idx, sequential_idx): (Vec<usize>, Vec<usize>) = plugins
self.process_plugin_response(conn, item_id, &mut **meta_plugin, response); .iter()
.enumerate()
.filter(|(_, p)| !p.is_finalized())
.map(|(i, _)| i)
.partition(|&i| plugins[i].parallel_safe());
// Run parallel-safe plugins concurrently
if !parallel_idx.is_empty() {
// Extract plugins by unique index into a flat Vec indexed by position
let mut parallel_plugins: Vec<Box<dyn MetaPlugin>> =
Vec::with_capacity(parallel_idx.len());
for &i in &parallel_idx {
parallel_plugins.push(replace_plugin(plugins, i));
}
// Write results back to original slots sequentially (DB writes are serial)
let (results, panicked): (Vec<(usize, MetaPluginResponse)>, Vec<usize>) =
std::thread::scope(|s| {
let handles: Vec<_> = parallel_plugins
.iter_mut()
.map(|plugin| s.spawn(move || plugin.initialize()))
.collect();
let mut results = Vec::with_capacity(handles.len());
let mut panicked = Vec::new();
for (j, handle) in handles.into_iter().enumerate() {
match handle.join() {
Ok(response) => results.push((j, response)),
Err(e) => {
error!("META_SERVICE: Plugin panicked during initialize: {e:?}");
panicked.push(j);
}
}
}
(results, panicked)
});
for (j, response) in results {
store_plugin_metadata(conn, item_id, &response);
let mut plugin = replace_plugin(&mut parallel_plugins, j);
if response.is_finalized {
plugin.set_finalized(true);
}
plugins[parallel_idx[j]] = plugin;
}
// Panicked plugins: restore the NullMetaPlugin sentinel and
// mark it finalized so future phases skip it cleanly.
for j in panicked {
let mut plugin = replace_plugin(&mut parallel_plugins, j);
plugin.set_finalized(true);
plugins[parallel_idx[j]] = plugin;
}
}
// Run sequential plugins
for &i in &sequential_idx {
let response = plugins[i].initialize();
store_plugin_metadata(conn, item_id, &response);
if response.is_finalized {
plugins[i].set_finalized(true);
}
} }
} }
@@ -120,18 +190,64 @@ impl MetaService {
conn: &Connection, conn: &Connection,
item_id: i64, item_id: i64,
) { ) {
for meta_plugin in plugins.iter_mut() { // Partition non-finalized plugins by parallel_safe
// Skip plugins that are already finalized let (parallel_idx, sequential_idx): (Vec<usize>, Vec<usize>) = plugins
if meta_plugin.is_finalized() { .iter()
continue; .enumerate()
.filter(|(_, p)| !p.is_finalized())
.map(|(i, _)| i)
.partition(|&i| plugins[i].parallel_safe());
// Run parallel-safe plugins concurrently on this chunk
if !parallel_idx.is_empty() {
// Extract plugins by unique index into a flat Vec indexed by position
let mut parallel_plugins: Vec<Box<dyn MetaPlugin>> =
Vec::with_capacity(parallel_idx.len());
for &i in &parallel_idx {
parallel_plugins.push(replace_plugin(plugins, i));
} }
let response = meta_plugin.update(chunk); let (results, panicked): (Vec<(usize, MetaPluginResponse)>, Vec<usize>) =
self.process_plugin_response(conn, item_id, &mut **meta_plugin, response.clone()); std::thread::scope(|s| {
let handles: Vec<_> = parallel_plugins
.iter_mut()
.map(|plugin| s.spawn(move || plugin.update(chunk)))
.collect();
let mut results = Vec::with_capacity(handles.len());
let mut panicked = Vec::new();
for (j, handle) in handles.into_iter().enumerate() {
match handle.join() {
Ok(response) => results.push((j, response)),
Err(e) => {
error!("META_SERVICE: Plugin panicked during update: {e:?}");
panicked.push(j);
}
}
}
(results, panicked)
});
// Set finalized flag if response indicates finalization for (j, response) in results {
store_plugin_metadata(conn, item_id, &response);
let mut plugin = replace_plugin(&mut parallel_plugins, j);
if response.is_finalized {
plugin.set_finalized(true);
}
plugins[parallel_idx[j]] = plugin;
}
for j in panicked {
let mut plugin = replace_plugin(&mut parallel_plugins, j);
plugin.set_finalized(true);
plugins[parallel_idx[j]] = plugin;
}
}
// Run sequential plugins
for &i in &sequential_idx {
let response = plugins[i].update(chunk);
store_plugin_metadata(conn, item_id, &response);
if response.is_finalized { if response.is_finalized {
meta_plugin.set_finalized(true); plugins[i].set_finalized(true);
} }
} }
} }
@@ -143,57 +259,19 @@ impl MetaService {
item_id: i64, item_id: i64,
) { ) {
for meta_plugin in plugins.iter_mut() { for meta_plugin in plugins.iter_mut() {
// Skip plugins that are already finalized
if meta_plugin.is_finalized() { if meta_plugin.is_finalized() {
continue; continue;
} }
let response = meta_plugin.finalize(); let response = meta_plugin.finalize();
self.process_plugin_response(conn, item_id, &mut **meta_plugin, response.clone()); store_plugin_metadata(conn, item_id, &response);
// Set finalized flag if response indicates finalization
if response.is_finalized { if response.is_finalized {
meta_plugin.set_finalized(true); meta_plugin.set_finalized(true);
} }
} }
} }
/// Internal helper to process a meta plugin response and store metadata.
///
/// Iterates over the metadata entries in the response and stores each in the database
/// using `store_meta`. Logs warnings if storage fails.
///
/// # Arguments
///
/// * `conn` - Database connection.
/// * `item_id` - Item ID to associate with the metadata.
/// * `_plugin` - Reference to the plugin (unused).
/// * `response` - The plugin response containing metadata.
///
/// # Errors
///
/// Logs warnings for individual storage failures but does not return errors.
fn process_plugin_response(
&self,
conn: &Connection,
item_id: i64,
_plugin: &mut dyn MetaPlugin,
response: crate::meta_plugin::MetaPluginResponse,
) {
for meta_data in response.metadata {
// The metadata has already been processed by the plugin, so we can use it directly
// Save to database
let db_meta = crate::db::Meta {
id: item_id,
name: meta_data.name,
value: meta_data.value,
};
if let Err(e) = crate::db::store_meta(conn, db_meta) {
log::warn!("META_SERVICE: Failed to store metadata: {e}");
}
}
}
/// Collects initial metadata from environment variables and hostname. /// Collects initial metadata from environment variables and hostname.
/// ///
/// Gathers metadata from `KEEP_META_*` environment variables and adds hostname /// Gathers metadata from `KEEP_META_*` environment variables and adds hostname
@@ -222,6 +300,26 @@ impl MetaService {
} }
} }
/// Stores metadata entries from a plugin response into the database.
///
/// # Arguments
///
/// * `conn` - Database connection.
/// * `item_id` - Item ID to associate with the metadata.
/// * `response` - The plugin response containing metadata.
fn store_plugin_metadata(conn: &Connection, item_id: i64, response: &MetaPluginResponse) {
for meta_data in &response.metadata {
let db_meta = crate::db::Meta {
id: item_id,
name: meta_data.name.clone(),
value: meta_data.value.clone(),
};
if let Err(e) = crate::db::store_meta(conn, db_meta) {
log::warn!("META_SERVICE: Failed to store metadata: {e}");
}
}
}
impl Default for MetaService { impl Default for MetaService {
/// Provides a default `MetaService` instance. /// Provides a default `MetaService` instance.
/// ///