feat: add plugin schema system, tokenizer cache, and config validation
- Add plugin schema types and runtime discovery for meta/filter plugins - Rewrite --generate-config to use schema system instead of hardcoded types - Add Settings::validate_config() for startup validation - Cache tokenizer instances via static Lazy to avoid repeated BPE loading - Add split_by_token_iter() and count_bounded() to Tokenizer - Fix double-counting bug in TokensMetaPlugin when buffer < max_buffer_size - Eliminate unnecessary allocations in token count methods - Refactor token filters: remove Option<Tokenizer>, use iterator API - Fix TailTokensFilter correctness: unbounded buffer instead of ring buffer - Add encoding option to all token filters - Add description() to MetaPlugin and FilterPlugin traits - Fix unused_mut warning in compression engine (feature-gated code) Co-Authored-By: code-review-bot <noreply@anthropic.com>
This commit is contained in:
@@ -3,5 +3,8 @@ pub mod is_binary;
|
||||
/// Detects if data is binary or text based on signatures and printable ratios.
|
||||
pub mod status;
|
||||
|
||||
/// Plugin schema types and discovery functions.
|
||||
pub mod schema;
|
||||
|
||||
/// Standard buffer size for I/O operations (8KB)
|
||||
pub const PIPESIZE: usize = 8192;
|
||||
|
||||
166
src/common/schema.rs
Normal file
166
src/common/schema.rs
Normal file
@@ -0,0 +1,166 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use strum::IntoEnumIterator;
|
||||
|
||||
/// Value type for a plugin option.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum OptionType {
|
||||
String,
|
||||
Integer,
|
||||
Boolean,
|
||||
Any,
|
||||
}
|
||||
|
||||
impl OptionType {
|
||||
/// Infer the option type from a YAML value.
|
||||
pub fn from_yaml_value(value: &serde_yaml::Value) -> Self {
|
||||
match value {
|
||||
serde_yaml::Value::Bool(_) => OptionType::Boolean,
|
||||
serde_yaml::Value::Number(_) => OptionType::Integer,
|
||||
serde_yaml::Value::String(_) => OptionType::String,
|
||||
_ => OptionType::Any,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Schema for a single plugin option.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct OptionSchema {
|
||||
pub name: String,
|
||||
pub option_type: OptionType,
|
||||
pub default: Option<serde_yaml::Value>,
|
||||
pub required: bool,
|
||||
}
|
||||
|
||||
/// Schema for a single plugin output.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct OutputSchema {
|
||||
pub name: String,
|
||||
pub description: String,
|
||||
}
|
||||
|
||||
/// Schema describing a plugin's configuration requirements.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct PluginSchema {
|
||||
pub name: String,
|
||||
pub description: String,
|
||||
pub options: Vec<OptionSchema>,
|
||||
pub outputs: Vec<OutputSchema>,
|
||||
}
|
||||
|
||||
/// Gathers schemas from all registered meta plugins.
|
||||
///
|
||||
/// Iterates all `MetaPluginType` variants, attempts to create a default instance,
|
||||
/// and collects their schemas. Plugins that fail to register (e.g., feature-gated)
|
||||
/// are silently skipped.
|
||||
pub fn gather_meta_plugin_schemas() -> Vec<PluginSchema> {
|
||||
use crate::meta_plugin::{MetaPluginType, get_meta_plugin};
|
||||
|
||||
let mut schemas = Vec::new();
|
||||
let mut sorted_types: Vec<MetaPluginType> = MetaPluginType::iter().collect();
|
||||
sorted_types.sort_by_key(|t| t.to_string());
|
||||
|
||||
for plugin_type in sorted_types {
|
||||
let plugin = match get_meta_plugin(plugin_type.clone(), None, None) {
|
||||
Ok(p) => p,
|
||||
Err(_) => continue,
|
||||
};
|
||||
|
||||
let name = plugin.meta_type().to_string();
|
||||
|
||||
let options: Vec<OptionSchema> = plugin
|
||||
.options()
|
||||
.iter()
|
||||
.map(|(key, value)| {
|
||||
let option_type = OptionType::from_yaml_value(value);
|
||||
let (default, required) = if value.is_null() {
|
||||
(None, true)
|
||||
} else {
|
||||
(Some(value.clone()), false)
|
||||
};
|
||||
OptionSchema {
|
||||
name: key.clone(),
|
||||
option_type,
|
||||
default,
|
||||
required,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
let mut outputs: Vec<OutputSchema> = Vec::new();
|
||||
for (key, value) in plugin.outputs() {
|
||||
if !value.is_null() {
|
||||
outputs.push(OutputSchema {
|
||||
name: key.clone(),
|
||||
description: key.clone(),
|
||||
});
|
||||
}
|
||||
}
|
||||
// Also include default outputs if outputs map is empty
|
||||
if outputs.is_empty() {
|
||||
for output_name in plugin.default_outputs() {
|
||||
outputs.push(OutputSchema {
|
||||
name: output_name.clone(),
|
||||
description: output_name,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
schemas.push(PluginSchema {
|
||||
name,
|
||||
description: plugin.description().to_string(),
|
||||
options,
|
||||
outputs,
|
||||
});
|
||||
}
|
||||
|
||||
schemas
|
||||
}
|
||||
|
||||
/// Gathers schemas from all registered filter plugins.
|
||||
///
|
||||
/// Uses the global filter plugin registry to discover all registered filters,
|
||||
/// creates a default instance of each, and collects their option schemas.
|
||||
pub fn gather_filter_plugin_schemas() -> Vec<PluginSchema> {
|
||||
use crate::services::filter_service::get_available_filter_plugins;
|
||||
|
||||
let plugins = get_available_filter_plugins();
|
||||
let mut schemas: Vec<PluginSchema> = plugins
|
||||
.into_iter()
|
||||
.map(|(name, creator)| {
|
||||
let plugin = creator();
|
||||
let options: Vec<OptionSchema> = plugin
|
||||
.options()
|
||||
.iter()
|
||||
.map(|opt| {
|
||||
let option_type = match &opt.default {
|
||||
Some(serde_json::Value::Bool(_)) => OptionType::Boolean,
|
||||
Some(serde_json::Value::Number(_)) => OptionType::Integer,
|
||||
Some(serde_json::Value::String(_)) => OptionType::String,
|
||||
_ => OptionType::Any,
|
||||
};
|
||||
OptionSchema {
|
||||
name: opt.name.clone(),
|
||||
option_type,
|
||||
default: opt.default.as_ref().map(|v| {
|
||||
// Convert serde_json::Value to serde_yaml::Value
|
||||
serde_yaml::to_value(v).unwrap_or(serde_yaml::Value::Null)
|
||||
}),
|
||||
required: opt.required,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
PluginSchema {
|
||||
name: name.clone(),
|
||||
description: plugin.description().to_string(),
|
||||
options,
|
||||
outputs: Vec::new(),
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
schemas.sort_by(|a, b| a.name.cmp(&b.name));
|
||||
schemas
|
||||
}
|
||||
@@ -175,6 +175,7 @@ impl Clone for Box<dyn CompressionEngine> {
|
||||
|
||||
lazy_static! {
|
||||
static ref COMPRESSION_ENGINES: EnumMap<CompressionType, Box<dyn CompressionEngine>> = {
|
||||
#[allow(unused_mut)] // mut needed when gzip/lz4 features are enabled
|
||||
let mut em = enum_map! {
|
||||
CompressionType::LZ4 => Box::new(crate::compression_engine::program::CompressionEngineProgram::new(
|
||||
"lz4",
|
||||
|
||||
@@ -573,4 +573,65 @@ impl Settings {
|
||||
.map(|plugins| plugins.iter().map(|p| p.name.clone()).collect())
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
/// Validates the configuration against plugin schemas.
|
||||
///
|
||||
/// Checks that:
|
||||
/// - All configured meta plugin names are valid and registered
|
||||
/// - Required options are present for each meta plugin
|
||||
/// - Compression plugin name (if set) is a valid compression type
|
||||
///
|
||||
/// Returns a list of warning strings. An empty list means the config is valid.
|
||||
pub fn validate_config(&self) -> Vec<String> {
|
||||
use crate::common::schema::gather_meta_plugin_schemas;
|
||||
use crate::compression_engine::CompressionType;
|
||||
use strum::IntoEnumIterator;
|
||||
|
||||
let mut warnings = Vec::new();
|
||||
|
||||
// Validate compression plugin
|
||||
if let Some(ref comp) = self.compression_plugin {
|
||||
let valid_types: Vec<String> =
|
||||
CompressionType::iter().map(|ct| ct.to_string()).collect();
|
||||
if !valid_types.contains(&comp.name) {
|
||||
warnings.push(format!(
|
||||
"Unknown compression_plugin.name: '{}'. Valid types: {}",
|
||||
comp.name,
|
||||
valid_types.join(", ")
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// Validate meta plugins
|
||||
if let Some(ref plugins) = self.meta_plugins {
|
||||
let schemas = gather_meta_plugin_schemas();
|
||||
let schema_map: std::collections::HashMap<&str, &crate::common::schema::PluginSchema> =
|
||||
schemas.iter().map(|s| (s.name.as_str(), s)).collect();
|
||||
|
||||
for plugin in plugins {
|
||||
match schema_map.get(plugin.name.as_str()) {
|
||||
Some(schema) => {
|
||||
// Check required options
|
||||
for opt in &schema.options {
|
||||
if opt.required && !plugin.options.contains_key(&opt.name) {
|
||||
warnings.push(format!(
|
||||
"Meta plugin '{}': missing required option '{}'",
|
||||
plugin.name, opt.name
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
warnings.push(format!(
|
||||
"Unknown meta plugin: '{}'. Available: {}",
|
||||
plugin.name,
|
||||
schema_map.keys().copied().collect::<Vec<_>>().join(", ")
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
warnings
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use super::{FilterPlugin, FilterOption};
|
||||
use std::io::{Result, Read, Write};
|
||||
use std::process::{Command, Stdio, Child};
|
||||
use which::which;
|
||||
use super::{FilterOption, FilterPlugin};
|
||||
use log::*;
|
||||
use std::io::{Read, Result, Write};
|
||||
use std::process::{Child, Command, Stdio};
|
||||
use which::which;
|
||||
|
||||
/// A filter that executes an external program and pipes input through it.
|
||||
///
|
||||
@@ -43,16 +43,13 @@ impl ExecFilter {
|
||||
/// let filter = ExecFilter::new("grep", vec!["-i", "error"], false);
|
||||
/// assert!(filter.supported);
|
||||
/// ```
|
||||
pub fn new(
|
||||
program: &str,
|
||||
args: Vec<&str>,
|
||||
split_whitespace: bool,
|
||||
) -> ExecFilter {
|
||||
pub fn new(program: &str, args: Vec<&str>, split_whitespace: bool) -> ExecFilter {
|
||||
let program_path = which(program);
|
||||
let supported = program_path.is_ok();
|
||||
|
||||
ExecFilter {
|
||||
program: program_path.map_or_else(|| program.to_string(), |p| p.to_string_lossy().to_string()),
|
||||
program: program_path
|
||||
.map_or_else(|| program.to_string(), |p| p.to_string_lossy().to_string()),
|
||||
args: args.iter().map(|s| s.to_string()).collect(),
|
||||
supported,
|
||||
split_whitespace,
|
||||
@@ -101,7 +98,10 @@ impl FilterPlugin for ExecFilter {
|
||||
));
|
||||
}
|
||||
|
||||
debug!("FILTER_EXEC: Executing command: {} {:?}", self.program, self.args);
|
||||
debug!(
|
||||
"FILTER_EXEC: Executing command: {} {:?}",
|
||||
self.program, self.args
|
||||
);
|
||||
|
||||
// Read all input first
|
||||
let mut input_data = Vec::new();
|
||||
@@ -129,7 +129,7 @@ impl FilterPlugin for ExecFilter {
|
||||
|
||||
// Write input to child stdin
|
||||
stdin.write_all(&input_data)?;
|
||||
drop(stdin); // Close stdin to signal EOF
|
||||
drop(stdin); // Close stdin to signal EOF
|
||||
|
||||
let mut stdout = child.stdout.take().ok_or_else(|| {
|
||||
std::io::Error::new(
|
||||
@@ -142,13 +142,12 @@ impl FilterPlugin for ExecFilter {
|
||||
std::io::copy(&mut stdout, writer)?;
|
||||
|
||||
// Wait for the child process to finish
|
||||
let output = child.wait_with_output()
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
format!("Failed to wait on child process: {}", e),
|
||||
)
|
||||
})?;
|
||||
let output = child.wait_with_output().map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
format!("Failed to wait on child process: {}", e),
|
||||
)
|
||||
})?;
|
||||
|
||||
if !output.status.success() {
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
@@ -205,6 +204,10 @@ impl FilterPlugin for ExecFilter {
|
||||
},
|
||||
]
|
||||
}
|
||||
|
||||
fn description(&self) -> &str {
|
||||
"Pipe input through an external command"
|
||||
}
|
||||
}
|
||||
|
||||
// Register the plugin at module initialization time
|
||||
|
||||
@@ -132,4 +132,8 @@ impl FilterPlugin for GrepFilter {
|
||||
required: true,
|
||||
}]
|
||||
}
|
||||
|
||||
fn description(&self) -> &str {
|
||||
"Filter lines matching a regex pattern"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -140,6 +140,10 @@ impl FilterPlugin for HeadBytesFilter {
|
||||
required: true,
|
||||
}]
|
||||
}
|
||||
|
||||
fn description(&self) -> &str {
|
||||
"Read the first N bytes"
|
||||
}
|
||||
}
|
||||
|
||||
/// A filter that reads the first N lines from the input stream.
|
||||
@@ -270,6 +274,10 @@ impl FilterPlugin for HeadLinesFilter {
|
||||
required: true,
|
||||
}]
|
||||
}
|
||||
|
||||
fn description(&self) -> &str {
|
||||
"Read the first N lines"
|
||||
}
|
||||
}
|
||||
|
||||
// Register the plugin at module initialization time
|
||||
|
||||
@@ -172,6 +172,15 @@ pub trait FilterPlugin: Send {
|
||||
/// }
|
||||
/// ```
|
||||
fn options(&self) -> Vec<FilterOption>;
|
||||
|
||||
/// Returns a human-readable description of this filter.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// A description string (empty by default).
|
||||
fn description(&self) -> &str {
|
||||
""
|
||||
}
|
||||
}
|
||||
|
||||
/// Enum representing the different types of filters.
|
||||
@@ -684,12 +693,13 @@ fn create_specific_filter(
|
||||
"head_tokens filter requires 'count' parameter",
|
||||
)
|
||||
})?;
|
||||
let encoding = crate::tokenizer::TokenEncoding::Cl100kBase;
|
||||
let encoding = options
|
||||
.get("encoding")
|
||||
.and_then(|v| v.as_str())
|
||||
.and_then(|s| s.parse::<crate::tokenizer::TokenEncoding>().ok())
|
||||
.unwrap_or_default();
|
||||
let mut f = tokens::HeadTokensFilter::new(count);
|
||||
f.tokenizer = Some(
|
||||
crate::tokenizer::Tokenizer::new(encoding)
|
||||
.map_err(|e| std::io::Error::other(e.to_string()))?,
|
||||
);
|
||||
f.tokenizer = crate::tokenizer::get_tokenizer(encoding).clone();
|
||||
f.encoding = encoding;
|
||||
Ok(Box::new(f))
|
||||
}
|
||||
@@ -705,12 +715,13 @@ fn create_specific_filter(
|
||||
"skip_tokens filter requires 'count' parameter",
|
||||
)
|
||||
})?;
|
||||
let encoding = crate::tokenizer::TokenEncoding::Cl100kBase;
|
||||
let encoding = options
|
||||
.get("encoding")
|
||||
.and_then(|v| v.as_str())
|
||||
.and_then(|s| s.parse::<crate::tokenizer::TokenEncoding>().ok())
|
||||
.unwrap_or_default();
|
||||
let mut f = tokens::SkipTokensFilter::new(count);
|
||||
f.tokenizer = Some(
|
||||
crate::tokenizer::Tokenizer::new(encoding)
|
||||
.map_err(|e| std::io::Error::other(e.to_string()))?,
|
||||
);
|
||||
f.tokenizer = crate::tokenizer::get_tokenizer(encoding).clone();
|
||||
f.encoding = encoding;
|
||||
Ok(Box::new(f))
|
||||
}
|
||||
@@ -726,12 +737,13 @@ fn create_specific_filter(
|
||||
"tail_tokens filter requires 'count' parameter",
|
||||
)
|
||||
})?;
|
||||
let encoding = crate::tokenizer::TokenEncoding::Cl100kBase;
|
||||
let encoding = options
|
||||
.get("encoding")
|
||||
.and_then(|v| v.as_str())
|
||||
.and_then(|s| s.parse::<crate::tokenizer::TokenEncoding>().ok())
|
||||
.unwrap_or_default();
|
||||
let mut f = tokens::TailTokensFilter::new(count);
|
||||
f.tokenizer = Some(
|
||||
crate::tokenizer::Tokenizer::new(encoding)
|
||||
.map_err(|e| std::io::Error::other(e.to_string()))?,
|
||||
);
|
||||
f.tokenizer = crate::tokenizer::get_tokenizer(encoding).clone();
|
||||
f.encoding = encoding;
|
||||
Ok(Box::new(f))
|
||||
}
|
||||
|
||||
@@ -72,6 +72,10 @@ impl FilterPlugin for SkipBytesFilter {
|
||||
required: true,
|
||||
}]
|
||||
}
|
||||
|
||||
fn description(&self) -> &str {
|
||||
"Skip the first N bytes"
|
||||
}
|
||||
}
|
||||
|
||||
/// A filter that skips the first N lines from the input stream.
|
||||
@@ -137,6 +141,10 @@ impl FilterPlugin for SkipLinesFilter {
|
||||
required: true,
|
||||
}]
|
||||
}
|
||||
|
||||
fn description(&self) -> &str {
|
||||
"Skip the first N lines"
|
||||
}
|
||||
}
|
||||
|
||||
// Register the plugin at module initialization time
|
||||
|
||||
@@ -56,4 +56,8 @@ impl FilterPlugin for StripAnsiFilter {
|
||||
fn options(&self) -> Vec<FilterOption> {
|
||||
Vec::new() // strip_ansi doesn't take any options
|
||||
}
|
||||
|
||||
fn description(&self) -> &str {
|
||||
"Strip ANSI escape sequences"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -82,6 +82,10 @@ impl FilterPlugin for TailBytesFilter {
|
||||
required: true,
|
||||
}]
|
||||
}
|
||||
|
||||
fn description(&self) -> &str {
|
||||
"Read the last N bytes"
|
||||
}
|
||||
}
|
||||
|
||||
/// A filter that reads the last N lines from the input stream.
|
||||
@@ -156,6 +160,10 @@ impl FilterPlugin for TailLinesFilter {
|
||||
required: true,
|
||||
}]
|
||||
}
|
||||
|
||||
fn description(&self) -> &str {
|
||||
"Read the last N lines"
|
||||
}
|
||||
}
|
||||
|
||||
// Register the plugin at module initialization time
|
||||
|
||||
@@ -1,20 +1,9 @@
|
||||
use super::{FilterOption, FilterPlugin};
|
||||
use crate::common::PIPESIZE;
|
||||
use crate::services::filter_service::register_filter_plugin;
|
||||
use crate::tokenizer::{TokenEncoding, Tokenizer};
|
||||
use std::collections::VecDeque;
|
||||
use crate::tokenizer::{TokenEncoding, Tokenizer, get_tokenizer};
|
||||
use std::io::{Read, Result, Write};
|
||||
|
||||
/// Resolve the tokenizer from a JSON options map.
|
||||
fn resolve_tokenizer(options: &Option<serde_json::Value>) -> Tokenizer {
|
||||
let encoding = options
|
||||
.as_ref()
|
||||
.and_then(|v| v.as_str())
|
||||
.and_then(|s| s.parse::<TokenEncoding>().ok())
|
||||
.unwrap_or_default();
|
||||
Tokenizer::new(encoding).expect("Failed to create tokenizer")
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// head_tokens
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -22,19 +11,21 @@ fn resolve_tokenizer(options: &Option<serde_json::Value>) -> Tokenizer {
|
||||
/// A filter that outputs only the first N tokens of the input stream.
|
||||
///
|
||||
/// Streams bytes directly until the token limit is reached. When the limit
|
||||
/// falls mid-chunk, uses `split_by_token` to find the exact byte boundary.
|
||||
/// falls mid-chunk, uses `split_by_token_iter` to find the exact byte boundary
|
||||
/// without allocating token strings beyond what is needed.
|
||||
pub struct HeadTokensFilter {
|
||||
pub remaining: usize,
|
||||
pub tokenizer: Option<Tokenizer>,
|
||||
pub tokenizer: Tokenizer,
|
||||
pub encoding: TokenEncoding,
|
||||
}
|
||||
|
||||
impl HeadTokensFilter {
|
||||
pub fn new(count: usize) -> Self {
|
||||
let encoding = TokenEncoding::default();
|
||||
Self {
|
||||
remaining: count,
|
||||
tokenizer: None,
|
||||
encoding: TokenEncoding::default(),
|
||||
tokenizer: get_tokenizer(encoding).clone(),
|
||||
encoding,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -45,11 +36,7 @@ impl FilterPlugin for HeadTokensFilter {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let tokenizer = self
|
||||
.tokenizer
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| panic!("HeadTokensFilter: tokenizer not initialized"));
|
||||
|
||||
let tokenizer = &self.tokenizer;
|
||||
let mut buffer = vec![0u8; PIPESIZE];
|
||||
let mut total_tokens = 0usize;
|
||||
|
||||
@@ -71,22 +58,15 @@ impl FilterPlugin for HeadTokensFilter {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
// Cutoff is within this chunk — split at exact token boundary
|
||||
// Cutoff is within this chunk — use iterator to find exact
|
||||
// boundary without allocating all token strings
|
||||
let tokens_to_write = self.remaining - total_tokens;
|
||||
let token_strs = tokenizer
|
||||
.split_by_token(&text)
|
||||
.map_err(|e| std::io::Error::other(e.to_string()))?;
|
||||
let mut byte_pos = 0usize;
|
||||
for token_str in token_strs.iter().take(tokens_to_write) {
|
||||
byte_pos += token_str.len();
|
||||
for token_str in tokenizer.split_by_token_iter(&text).take(tokens_to_write) {
|
||||
byte_pos += token_str
|
||||
.map_err(|e| std::io::Error::other(e.to_string()))?
|
||||
.len();
|
||||
}
|
||||
// Write only the bytes for the tokens we want.
|
||||
// Map byte positions in the lossy string back to positions in the
|
||||
// original byte slice. Since from_utf8_lossy replaces invalid
|
||||
// bytes with the replacement character (3 bytes), we need to be
|
||||
// careful. For simplicity, write the valid prefix of the chunk.
|
||||
// We use the original bytes up to the calculated position, adjusting
|
||||
// for any UTF-8 replacement character differences.
|
||||
let write_len = map_lossy_pos_to_bytes(chunk, &text, byte_pos);
|
||||
writer.write_all(&chunk[..write_len])?;
|
||||
break;
|
||||
@@ -98,20 +78,28 @@ impl FilterPlugin for HeadTokensFilter {
|
||||
fn clone_box(&self) -> Box<dyn FilterPlugin> {
|
||||
Box::new(Self {
|
||||
remaining: self.remaining,
|
||||
tokenizer: self
|
||||
.tokenizer
|
||||
.as_ref()
|
||||
.map(|_| Tokenizer::new(self.encoding).unwrap()),
|
||||
tokenizer: get_tokenizer(self.encoding).clone(),
|
||||
encoding: self.encoding,
|
||||
})
|
||||
}
|
||||
|
||||
fn options(&self) -> Vec<FilterOption> {
|
||||
vec![FilterOption {
|
||||
name: "count".to_string(),
|
||||
default: None,
|
||||
required: true,
|
||||
}]
|
||||
vec![
|
||||
FilterOption {
|
||||
name: "count".to_string(),
|
||||
default: None,
|
||||
required: true,
|
||||
},
|
||||
FilterOption {
|
||||
name: "encoding".to_string(),
|
||||
default: Some(serde_json::Value::String("cl100k_base".to_string())),
|
||||
required: false,
|
||||
},
|
||||
]
|
||||
}
|
||||
|
||||
fn description(&self) -> &str {
|
||||
"Read the first N LLM tokens"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -122,16 +110,17 @@ impl FilterPlugin for HeadTokensFilter {
|
||||
/// A filter that skips the first N tokens of the input stream and outputs the rest.
|
||||
pub struct SkipTokensFilter {
|
||||
pub remaining: usize,
|
||||
pub tokenizer: Option<Tokenizer>,
|
||||
pub tokenizer: Tokenizer,
|
||||
pub encoding: TokenEncoding,
|
||||
}
|
||||
|
||||
impl SkipTokensFilter {
|
||||
pub fn new(count: usize) -> Self {
|
||||
let encoding = TokenEncoding::default();
|
||||
Self {
|
||||
remaining: count,
|
||||
tokenizer: None,
|
||||
encoding: TokenEncoding::default(),
|
||||
tokenizer: get_tokenizer(encoding).clone(),
|
||||
encoding,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -142,11 +131,7 @@ impl FilterPlugin for SkipTokensFilter {
|
||||
return std::io::copy(reader, writer).map(|_| ());
|
||||
}
|
||||
|
||||
let tokenizer = self
|
||||
.tokenizer
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| panic!("SkipTokensFilter: tokenizer not initialized"));
|
||||
|
||||
let tokenizer = &self.tokenizer;
|
||||
let mut buffer = vec![0u8; PIPESIZE];
|
||||
let mut total_tokens = 0usize;
|
||||
let mut done_skipping = false;
|
||||
@@ -173,14 +158,14 @@ impl FilterPlugin for SkipTokensFilter {
|
||||
done_skipping = true;
|
||||
}
|
||||
} else {
|
||||
// Cutoff is within this chunk — skip past the boundary, write rest
|
||||
// Cutoff is within this chunk — use iterator to skip past
|
||||
// the boundary without allocating all token strings
|
||||
let tokens_to_skip = self.remaining - total_tokens;
|
||||
let token_strs = tokenizer
|
||||
.split_by_token(&text)
|
||||
.map_err(|e| std::io::Error::other(e.to_string()))?;
|
||||
let mut byte_pos = 0usize;
|
||||
for token_str in token_strs.iter().take(tokens_to_skip) {
|
||||
byte_pos += token_str.len();
|
||||
for token_str in tokenizer.split_by_token_iter(&text).take(tokens_to_skip) {
|
||||
byte_pos += token_str
|
||||
.map_err(|e| std::io::Error::other(e.to_string()))?
|
||||
.len();
|
||||
}
|
||||
let skip_len = map_lossy_pos_to_bytes(chunk, &text, byte_pos);
|
||||
if skip_len < n {
|
||||
@@ -195,20 +180,28 @@ impl FilterPlugin for SkipTokensFilter {
|
||||
fn clone_box(&self) -> Box<dyn FilterPlugin> {
|
||||
Box::new(Self {
|
||||
remaining: self.remaining,
|
||||
tokenizer: self
|
||||
.tokenizer
|
||||
.as_ref()
|
||||
.map(|_| Tokenizer::new(self.encoding).unwrap()),
|
||||
tokenizer: get_tokenizer(self.encoding).clone(),
|
||||
encoding: self.encoding,
|
||||
})
|
||||
}
|
||||
|
||||
fn options(&self) -> Vec<FilterOption> {
|
||||
vec![FilterOption {
|
||||
name: "count".to_string(),
|
||||
default: None,
|
||||
required: true,
|
||||
}]
|
||||
vec![
|
||||
FilterOption {
|
||||
name: "count".to_string(),
|
||||
default: None,
|
||||
required: true,
|
||||
},
|
||||
FilterOption {
|
||||
name: "encoding".to_string(),
|
||||
default: Some(serde_json::Value::String("cl100k_base".to_string())),
|
||||
required: false,
|
||||
},
|
||||
]
|
||||
}
|
||||
|
||||
fn description(&self) -> &str {
|
||||
"Skip the first N LLM tokens"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -218,27 +211,24 @@ impl FilterPlugin for SkipTokensFilter {
|
||||
|
||||
/// A filter that outputs only the last N tokens of the input stream.
|
||||
///
|
||||
/// Uses a bounded ring buffer (last ~2× PIPESIZE) to keep recent bytes.
|
||||
/// At finalize, tokenizes the buffered content and writes only the last N tokens.
|
||||
/// Buffers all bytes from the stream, then at finalize tokenizes the
|
||||
/// content and writes only the last N tokens.
|
||||
pub struct TailTokensFilter {
|
||||
pub count: usize,
|
||||
/// Ring buffer holding the most recent bytes from the stream.
|
||||
pub ring: VecDeque<u8>,
|
||||
pub ring_capacity: usize,
|
||||
pub tokenizer: Option<Tokenizer>,
|
||||
/// Buffer holding all bytes from the stream.
|
||||
buffer: Vec<u8>,
|
||||
pub tokenizer: Tokenizer,
|
||||
pub encoding: TokenEncoding,
|
||||
}
|
||||
|
||||
impl TailTokensFilter {
|
||||
pub fn new(count: usize) -> Self {
|
||||
// Keep enough bytes for ~2 chunks worth of data
|
||||
let ring_capacity = PIPESIZE * 2;
|
||||
let encoding = TokenEncoding::default();
|
||||
Self {
|
||||
count,
|
||||
ring: VecDeque::with_capacity(ring_capacity),
|
||||
ring_capacity,
|
||||
tokenizer: None,
|
||||
encoding: TokenEncoding::default(),
|
||||
buffer: Vec::with_capacity(PIPESIZE),
|
||||
tokenizer: get_tokenizer(encoding).clone(),
|
||||
encoding,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -249,36 +239,23 @@ impl FilterPlugin for TailTokensFilter {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let tokenizer = self
|
||||
.tokenizer
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| panic!("TailTokensFilter: tokenizer not initialized"));
|
||||
let tokenizer = &self.tokenizer;
|
||||
|
||||
// Stream all bytes through the ring buffer
|
||||
let mut buffer = vec![0u8; PIPESIZE];
|
||||
loop {
|
||||
let n = reader.read(&mut buffer)?;
|
||||
if n == 0 {
|
||||
break;
|
||||
}
|
||||
for &byte in &buffer[..n] {
|
||||
if self.ring.len() >= self.ring_capacity {
|
||||
self.ring.pop_front();
|
||||
}
|
||||
self.ring.push_back(byte);
|
||||
}
|
||||
// Buffer all bytes from the stream
|
||||
std::io::copy(reader, &mut self.buffer)?;
|
||||
|
||||
if self.buffer.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Tokenize the buffered content and extract last N tokens
|
||||
let buffered: Vec<u8> = self.ring.iter().copied().collect();
|
||||
let text = String::from_utf8_lossy(&buffered);
|
||||
let text = String::from_utf8_lossy(&self.buffer);
|
||||
let token_strs = tokenizer
|
||||
.split_by_token(&text)
|
||||
.map_err(|e| std::io::Error::other(e.to_string()))?;
|
||||
|
||||
if token_strs.len() <= self.count {
|
||||
// All tokens fit — write everything
|
||||
writer.write_all(&buffered)?;
|
||||
writer.write_all(&self.buffer)?;
|
||||
} else {
|
||||
// Write only the last N tokens
|
||||
let skip = token_strs.len() - self.count;
|
||||
@@ -286,9 +263,9 @@ impl FilterPlugin for TailTokensFilter {
|
||||
for token_str in token_strs.iter().take(skip) {
|
||||
byte_offset += token_str.len();
|
||||
}
|
||||
let write_len = map_lossy_pos_to_bytes(&buffered, &text, byte_offset);
|
||||
if write_len < buffered.len() {
|
||||
writer.write_all(&buffered[write_len..])?;
|
||||
let write_len = map_lossy_pos_to_bytes(&self.buffer, &text, byte_offset);
|
||||
if write_len < self.buffer.len() {
|
||||
writer.write_all(&self.buffer[write_len..])?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -298,22 +275,29 @@ impl FilterPlugin for TailTokensFilter {
|
||||
fn clone_box(&self) -> Box<dyn FilterPlugin> {
|
||||
Box::new(Self {
|
||||
count: self.count,
|
||||
ring: self.ring.clone(),
|
||||
ring_capacity: self.ring_capacity,
|
||||
tokenizer: self
|
||||
.tokenizer
|
||||
.as_ref()
|
||||
.map(|_| Tokenizer::new(self.encoding).unwrap()),
|
||||
buffer: self.buffer.clone(),
|
||||
tokenizer: get_tokenizer(self.encoding).clone(),
|
||||
encoding: self.encoding,
|
||||
})
|
||||
}
|
||||
|
||||
fn options(&self) -> Vec<FilterOption> {
|
||||
vec![FilterOption {
|
||||
name: "count".to_string(),
|
||||
default: None,
|
||||
required: true,
|
||||
}]
|
||||
vec![
|
||||
FilterOption {
|
||||
name: "count".to_string(),
|
||||
default: None,
|
||||
required: true,
|
||||
},
|
||||
FilterOption {
|
||||
name: "encoding".to_string(),
|
||||
default: Some(serde_json::Value::String("cl100k_base".to_string())),
|
||||
required: false,
|
||||
},
|
||||
]
|
||||
}
|
||||
|
||||
fn description(&self) -> &str {
|
||||
"Read the last N LLM tokens"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -393,21 +377,9 @@ fn map_lossy_pos_to_bytes(original: &[u8], lossy: &str, lossy_pos: usize) -> usi
|
||||
|
||||
#[ctor::ctor]
|
||||
fn register_token_filters() {
|
||||
register_filter_plugin("head_tokens", || {
|
||||
let mut f = HeadTokensFilter::new(0);
|
||||
f.tokenizer = Some(resolve_tokenizer(&None));
|
||||
Box::new(f)
|
||||
});
|
||||
register_filter_plugin("skip_tokens", || {
|
||||
let mut f = SkipTokensFilter::new(0);
|
||||
f.tokenizer = Some(resolve_tokenizer(&None));
|
||||
Box::new(f)
|
||||
});
|
||||
register_filter_plugin("tail_tokens", || {
|
||||
let mut f = TailTokensFilter::new(0);
|
||||
f.tokenizer = Some(resolve_tokenizer(&None));
|
||||
Box::new(f)
|
||||
});
|
||||
register_filter_plugin("head_tokens", || Box::new(HeadTokensFilter::new(0)));
|
||||
register_filter_plugin("skip_tokens", || Box::new(SkipTokensFilter::new(0)));
|
||||
register_filter_plugin("tail_tokens", || Box::new(TailTokensFilter::new(0)));
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -416,13 +388,13 @@ mod tests {
|
||||
use std::io::Cursor;
|
||||
|
||||
fn make_tokenizer() -> Tokenizer {
|
||||
Tokenizer::new(TokenEncoding::Cl100kBase).unwrap()
|
||||
get_tokenizer(TokenEncoding::Cl100kBase).clone()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_head_tokens_basic() {
|
||||
let mut filter = HeadTokensFilter::new(3);
|
||||
filter.tokenizer = Some(make_tokenizer());
|
||||
filter.tokenizer = make_tokenizer();
|
||||
|
||||
let input = b"The quick brown fox";
|
||||
let mut output = Vec::new();
|
||||
@@ -437,7 +409,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_head_tokens_zero() {
|
||||
let mut filter = HeadTokensFilter::new(0);
|
||||
filter.tokenizer = Some(make_tokenizer());
|
||||
filter.tokenizer = make_tokenizer();
|
||||
|
||||
let input = b"The quick brown fox";
|
||||
let mut output = Vec::new();
|
||||
@@ -448,7 +420,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_head_tokens_more_than_available() {
|
||||
let mut filter = HeadTokensFilter::new(1000);
|
||||
filter.tokenizer = Some(make_tokenizer());
|
||||
filter.tokenizer = make_tokenizer();
|
||||
|
||||
let input = b"Hello world";
|
||||
let mut output = Vec::new();
|
||||
@@ -459,7 +431,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_skip_tokens_basic() {
|
||||
let mut filter = SkipTokensFilter::new(2);
|
||||
filter.tokenizer = Some(make_tokenizer());
|
||||
filter.tokenizer = make_tokenizer();
|
||||
|
||||
let input = b"The quick brown fox";
|
||||
let mut output = Vec::new();
|
||||
@@ -473,7 +445,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_skip_tokens_zero() {
|
||||
let mut filter = SkipTokensFilter::new(0);
|
||||
filter.tokenizer = Some(make_tokenizer());
|
||||
filter.tokenizer = make_tokenizer();
|
||||
|
||||
let input = b"Hello world";
|
||||
let mut output = Vec::new();
|
||||
@@ -484,7 +456,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_tail_tokens_basic() {
|
||||
let mut filter = TailTokensFilter::new(2);
|
||||
filter.tokenizer = Some(make_tokenizer());
|
||||
filter.tokenizer = make_tokenizer();
|
||||
|
||||
let input = b"The quick brown fox jumps over the lazy dog";
|
||||
let mut output = Vec::new();
|
||||
@@ -499,7 +471,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_tail_tokens_zero() {
|
||||
let mut filter = TailTokensFilter::new(0);
|
||||
filter.tokenizer = Some(make_tokenizer());
|
||||
filter.tokenizer = make_tokenizer();
|
||||
|
||||
let input = b"Hello world";
|
||||
let mut output = Vec::new();
|
||||
|
||||
@@ -479,6 +479,71 @@ where
|
||||
vec![self.meta_type().to_string()]
|
||||
}
|
||||
|
||||
/// Returns a description of this plugin for display in config templates.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// A description string (empty by default).
|
||||
fn description(&self) -> &str {
|
||||
""
|
||||
}
|
||||
|
||||
/// Builds the schema for this plugin from its options and outputs.
|
||||
///
|
||||
/// Default implementation infers option types from YAML values and
|
||||
/// collects enabled outputs.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// A `PluginSchema` describing this plugin's configuration.
|
||||
fn schema(&self) -> crate::common::schema::PluginSchema {
|
||||
use crate::common::schema::{OptionSchema, OptionType, OutputSchema, PluginSchema};
|
||||
|
||||
let options: Vec<OptionSchema> = self
|
||||
.options()
|
||||
.iter()
|
||||
.map(|(key, value)| {
|
||||
let option_type = OptionType::from_yaml_value(value);
|
||||
let (default, required) = if value.is_null() {
|
||||
(None, true)
|
||||
} else {
|
||||
(Some(value.clone()), false)
|
||||
};
|
||||
OptionSchema {
|
||||
name: key.clone(),
|
||||
option_type,
|
||||
default,
|
||||
required,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
let mut outputs: Vec<OutputSchema> = Vec::new();
|
||||
for (key, value) in self.outputs() {
|
||||
if !value.is_null() {
|
||||
outputs.push(OutputSchema {
|
||||
name: key.clone(),
|
||||
description: key.clone(),
|
||||
});
|
||||
}
|
||||
}
|
||||
if outputs.is_empty() {
|
||||
for output_name in self.default_outputs() {
|
||||
outputs.push(OutputSchema {
|
||||
name: output_name.clone(),
|
||||
description: output_name,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
PluginSchema {
|
||||
name: self.meta_type().to_string(),
|
||||
description: self.description().to_string(),
|
||||
options,
|
||||
outputs,
|
||||
}
|
||||
}
|
||||
|
||||
/// Method to downcast to concrete type (for checking finalization state).
|
||||
///
|
||||
/// # Returns
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use crate::common::PIPESIZE;
|
||||
use crate::common::is_binary::is_binary;
|
||||
use crate::meta_plugin::{MetaPlugin, MetaPluginResponse, MetaPluginType};
|
||||
use crate::tokenizer::{TokenEncoding, Tokenizer};
|
||||
use crate::tokenizer::{TokenEncoding, get_tokenizer};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TokensMetaPlugin {
|
||||
@@ -15,8 +15,8 @@ pub struct TokensMetaPlugin {
|
||||
/// UTF-8 boundary carry buffer.
|
||||
utf8_buffer: Vec<u8>,
|
||||
base: crate::meta_plugin::BaseMetaPlugin,
|
||||
/// The tokenizer instance.
|
||||
tokenizer: Tokenizer,
|
||||
/// The tokenizer encoding.
|
||||
encoding: TokenEncoding,
|
||||
}
|
||||
|
||||
impl TokensMetaPlugin {
|
||||
@@ -59,8 +59,6 @@ impl TokensMetaPlugin {
|
||||
.and_then(|s| s.parse::<TokenEncoding>().ok())
|
||||
.unwrap_or_default();
|
||||
|
||||
let tokenizer = Tokenizer::new(encoding).expect("Failed to create tokenizer");
|
||||
|
||||
Self {
|
||||
buffer: Some(Vec::new()),
|
||||
max_buffer_size,
|
||||
@@ -69,7 +67,7 @@ impl TokensMetaPlugin {
|
||||
token_count: 0,
|
||||
utf8_buffer: Vec::new(),
|
||||
base,
|
||||
tokenizer,
|
||||
encoding,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -77,36 +75,59 @@ impl TokensMetaPlugin {
|
||||
///
|
||||
/// Combines with any pending UTF-8 carry bytes, converts to text,
|
||||
/// and adds the token count to the running total.
|
||||
///
|
||||
/// Avoids unnecessary allocations when there is no pending UTF-8 carry
|
||||
/// and the data is valid UTF-8.
|
||||
fn count_tokens(&mut self, data: &[u8]) {
|
||||
if data.is_empty() && self.utf8_buffer.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let combined = if !self.utf8_buffer.is_empty() {
|
||||
let mut c = self.utf8_buffer.clone();
|
||||
c.extend_from_slice(data);
|
||||
c
|
||||
} else {
|
||||
data.to_vec()
|
||||
};
|
||||
self.utf8_buffer.clear();
|
||||
let tokenizer = get_tokenizer(self.encoding);
|
||||
|
||||
let text = match std::str::from_utf8(&combined) {
|
||||
Ok(t) => t,
|
||||
Err(e) => {
|
||||
let valid = e.valid_up_to();
|
||||
if valid < combined.len() {
|
||||
self.utf8_buffer.extend_from_slice(&combined[valid..]);
|
||||
if self.utf8_buffer.is_empty() {
|
||||
// Fast path: no pending carry — try to use data directly
|
||||
match std::str::from_utf8(data) {
|
||||
Ok(text) => {
|
||||
if !text.is_empty() {
|
||||
self.token_count += tokenizer.count(text);
|
||||
}
|
||||
return;
|
||||
}
|
||||
match std::str::from_utf8(&combined[..valid]) {
|
||||
Ok(t) => t,
|
||||
Err(_) => return,
|
||||
Err(e) => {
|
||||
let valid_up_to = e.valid_up_to();
|
||||
if valid_up_to > 0 {
|
||||
// Count the valid prefix without copying
|
||||
let text =
|
||||
std::str::from_utf8(&data[..valid_up_to]).expect("validated prefix");
|
||||
self.token_count += tokenizer.count(text);
|
||||
}
|
||||
// Save invalid trailing bytes for next call
|
||||
self.utf8_buffer.extend_from_slice(&data[valid_up_to..]);
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
if !text.is_empty() {
|
||||
self.token_count += self.tokenizer.count(text);
|
||||
// Slow path: pending carry bytes — must build combined buffer
|
||||
let mut combined = std::mem::take(&mut self.utf8_buffer);
|
||||
combined.extend_from_slice(data);
|
||||
|
||||
match std::str::from_utf8(&combined) {
|
||||
Ok(text) => {
|
||||
if !text.is_empty() {
|
||||
self.token_count += tokenizer.count(text);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
let valid_up_to = e.valid_up_to();
|
||||
if valid_up_to > 0 {
|
||||
let text =
|
||||
std::str::from_utf8(&combined[..valid_up_to]).expect("validated prefix");
|
||||
self.token_count += tokenizer.count(text);
|
||||
}
|
||||
self.utf8_buffer.extend_from_slice(&combined[valid_up_to..]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -149,8 +170,8 @@ impl MetaPlugin for TokensMetaPlugin {
|
||||
};
|
||||
|
||||
if should_detect {
|
||||
let buf_clone = self.buffer.as_ref().unwrap().clone();
|
||||
let is_binary = self.detect_binary(&buf_clone);
|
||||
let buffer_data = self.buffer.as_ref().unwrap().clone();
|
||||
let is_binary = self.detect_binary(&buffer_data);
|
||||
|
||||
if is_binary {
|
||||
if let Some(md) = crate::meta_plugin::process_metadata_outputs(
|
||||
@@ -168,19 +189,10 @@ impl MetaPlugin for TokensMetaPlugin {
|
||||
};
|
||||
}
|
||||
|
||||
// It's text — tokenize the full accumulated buffer
|
||||
self.count_tokens(&buf_clone);
|
||||
|
||||
if buf_clone.len() >= self.max_buffer_size {
|
||||
self.buffer = None;
|
||||
}
|
||||
} else if self.buffer.is_some() {
|
||||
// Still building up buffer — tokenize what was just added
|
||||
let remaining = self
|
||||
.max_buffer_size
|
||||
.saturating_sub(self.buffer.as_ref().map_or(0, |b| b.len()));
|
||||
let to_take = std::cmp::min(data.len(), remaining);
|
||||
self.count_tokens(&data[..to_take]);
|
||||
// It's text — tokenize the full buffer (nothing was counted yet),
|
||||
// then clear to avoid double-counting in finalize().
|
||||
self.count_tokens(&buffer_data);
|
||||
self.buffer = Some(Vec::new());
|
||||
}
|
||||
} else if self.is_binary_content == Some(false) {
|
||||
self.count_tokens(data);
|
||||
@@ -212,8 +224,8 @@ impl MetaPlugin for TokensMetaPlugin {
|
||||
if self.is_binary_content.is_none() {
|
||||
if let Some(buffer) = &self.buffer {
|
||||
if !buffer.is_empty() {
|
||||
let buf_clone = buffer.clone();
|
||||
let is_binary = self.detect_binary(&buf_clone);
|
||||
let buffer_data = buffer.clone();
|
||||
let is_binary = self.detect_binary(&buffer_data);
|
||||
|
||||
if is_binary {
|
||||
if let Some(md) = crate::meta_plugin::process_metadata_outputs(
|
||||
@@ -234,6 +246,12 @@ impl MetaPlugin for TokensMetaPlugin {
|
||||
}
|
||||
}
|
||||
|
||||
// Tokenize any bytes in the buffer
|
||||
if let Some(buffer) = &self.buffer {
|
||||
let data = buffer.clone();
|
||||
self.count_tokens(&data);
|
||||
}
|
||||
|
||||
// Process any remaining UTF-8 bytes
|
||||
if !self.utf8_buffer.is_empty() {
|
||||
self.count_tokens(&[]);
|
||||
|
||||
@@ -1,81 +1,17 @@
|
||||
use crate::meta_plugin::MetaPlugin;
|
||||
use anyhow::Result;
|
||||
use clap::Command;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_yaml;
|
||||
use std::collections::HashMap;
|
||||
use strum::IntoEnumIterator;
|
||||
|
||||
/// Mode for generating a default configuration file.
|
||||
///
|
||||
/// This module creates a commented YAML template with default values for settings,
|
||||
/// including list format, server config, compression, and meta plugins.
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
/// Default configuration structure for the generated template.
|
||||
///
|
||||
/// Includes core settings, list formatting, server options, compression, and meta plugins.
|
||||
struct DefaultConfig {
|
||||
dir: Option<String>,
|
||||
list_format: Vec<ColumnConfig>,
|
||||
human_readable: bool,
|
||||
output_format: Option<String>,
|
||||
quiet: bool,
|
||||
force: bool,
|
||||
server: Option<ServerConfig>,
|
||||
compression_plugin: Option<CompressionPluginConfig>,
|
||||
meta_plugins: Option<Vec<MetaPluginConfig>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
/// Configuration for a column in the list format.
|
||||
struct ColumnConfig {
|
||||
name: String,
|
||||
label: Option<String>,
|
||||
#[serde(default)]
|
||||
align: ColumnAlignment,
|
||||
#[serde(default)]
|
||||
max_len: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Default)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
/// Alignment options for table columns.
|
||||
enum ColumnAlignment {
|
||||
#[default]
|
||||
Left,
|
||||
Right,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
/// Server configuration options.
|
||||
struct ServerConfig {
|
||||
address: Option<String>,
|
||||
port: Option<u16>,
|
||||
password_file: Option<String>,
|
||||
password: Option<String>,
|
||||
password_hash: Option<String>,
|
||||
cors_origin: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
/// Configuration for the compression plugin.
|
||||
struct CompressionPluginConfig {
|
||||
name: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
/// Configuration for a meta plugin.
|
||||
struct MetaPluginConfig {
|
||||
name: String,
|
||||
#[serde(default)]
|
||||
options: std::collections::HashMap<String, serde_yaml::Value>,
|
||||
#[serde(default)]
|
||||
outputs: std::collections::HashMap<String, String>,
|
||||
}
|
||||
use crate::common::schema::{gather_filter_plugin_schemas, gather_meta_plugin_schemas};
|
||||
use crate::compression_engine::CompressionType;
|
||||
use crate::config;
|
||||
|
||||
/// Generates and prints a default commented YAML configuration template.
|
||||
///
|
||||
/// Creates instances of available meta plugins to populate default options and outputs,
|
||||
/// then serializes the config to YAML with all lines commented for easy editing.
|
||||
/// Discovers all registered meta plugins, filter plugins, and compression engines
|
||||
/// at runtime via the plugin schema system. Outputs a commented YAML template
|
||||
/// with all available plugins and their default options/outputs.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
@@ -85,153 +21,244 @@ struct MetaPluginConfig {
|
||||
/// # Returns
|
||||
///
|
||||
/// `Ok(())` on success.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```ignore
|
||||
/// // Example usage requires Command and Settings instances
|
||||
/// mode_generate_config(&mut cmd, &settings)?;
|
||||
/// ```
|
||||
pub fn mode_generate_config(_cmd: &mut Command, _settings: &crate::config::Settings) -> Result<()> {
|
||||
// Create instances of each meta plugin to get their default options and outputs
|
||||
let cwd_plugin = crate::meta_plugin::cwd::CwdMetaPlugin::new(None, None);
|
||||
let digest_plugin = crate::meta_plugin::digest::DigestMetaPlugin::new(None, None);
|
||||
let hostname_plugin = crate::meta_plugin::hostname::HostnameMetaPlugin::new(None, None);
|
||||
#[cfg(feature = "magic")]
|
||||
let magic_file_plugin = crate::meta_plugin::magic_file::MagicFileMetaPlugin::new(None, None);
|
||||
let env_plugin = crate::meta_plugin::env::EnvMetaPlugin::new(None, None);
|
||||
let meta_schemas = gather_meta_plugin_schemas();
|
||||
let filter_schemas = gather_filter_plugin_schemas();
|
||||
|
||||
// Create a default configuration
|
||||
let default_config = DefaultConfig {
|
||||
dir: Some("~/.local/share/keep".to_string()),
|
||||
list_format: vec![
|
||||
ColumnConfig {
|
||||
name: "id".to_string(),
|
||||
label: Some("Item".to_string()),
|
||||
align: ColumnAlignment::Right,
|
||||
max_len: None,
|
||||
},
|
||||
ColumnConfig {
|
||||
name: "time".to_string(),
|
||||
label: Some("Time".to_string()),
|
||||
align: ColumnAlignment::Right,
|
||||
max_len: None,
|
||||
},
|
||||
ColumnConfig {
|
||||
name: "size".to_string(),
|
||||
label: Some("Size".to_string()),
|
||||
align: ColumnAlignment::Right,
|
||||
max_len: None,
|
||||
},
|
||||
ColumnConfig {
|
||||
name: "tags".to_string(),
|
||||
label: Some("Tags".to_string()),
|
||||
align: ColumnAlignment::Left,
|
||||
max_len: Some("40".to_string()),
|
||||
},
|
||||
ColumnConfig {
|
||||
name: "meta:hostname_full".to_string(),
|
||||
label: Some("Hostname".to_string()),
|
||||
align: ColumnAlignment::Left,
|
||||
max_len: Some("28".to_string()),
|
||||
},
|
||||
],
|
||||
human_readable: false,
|
||||
output_format: Some("table".to_string()),
|
||||
quiet: false,
|
||||
force: false,
|
||||
server: Some(ServerConfig {
|
||||
address: Some("127.0.0.1".to_string()),
|
||||
port: Some(8080),
|
||||
password_file: None,
|
||||
password: None,
|
||||
password_hash: None,
|
||||
cors_origin: None,
|
||||
}),
|
||||
compression_plugin: None,
|
||||
meta_plugins: Some(vec![
|
||||
MetaPluginConfig {
|
||||
name: "cwd".to_string(),
|
||||
options: cwd_plugin.options().clone(),
|
||||
outputs: convert_outputs_to_string_map(cwd_plugin.outputs()),
|
||||
},
|
||||
MetaPluginConfig {
|
||||
name: "digest".to_string(),
|
||||
options: digest_plugin.options().clone(),
|
||||
outputs: convert_outputs_to_string_map(digest_plugin.outputs()),
|
||||
},
|
||||
MetaPluginConfig {
|
||||
name: "hostname".to_string(),
|
||||
options: hostname_plugin.options().clone(),
|
||||
outputs: convert_outputs_to_string_map(hostname_plugin.outputs()),
|
||||
},
|
||||
#[cfg(feature = "magic")]
|
||||
MetaPluginConfig {
|
||||
name: "magic_file".to_string(),
|
||||
options: magic_file_plugin.options().clone(),
|
||||
outputs: convert_outputs_to_string_map(magic_file_plugin.outputs()),
|
||||
},
|
||||
MetaPluginConfig {
|
||||
name: "env".to_string(),
|
||||
options: env_plugin.options().clone(),
|
||||
outputs: convert_outputs_to_string_map(env_plugin.outputs()),
|
||||
},
|
||||
]),
|
||||
};
|
||||
// Build list_format defaults matching config.rs
|
||||
let list_format = default_list_format();
|
||||
|
||||
// Serialize to YAML and comment out all lines
|
||||
let yaml = serde_yaml::to_string(&default_config)?;
|
||||
// Build meta_plugins with env as the default (active), rest commented
|
||||
let meta_plugins = build_meta_plugins_section(&meta_schemas);
|
||||
|
||||
// Comment out every line
|
||||
let commented_yaml = yaml
|
||||
.lines()
|
||||
.map(|line| {
|
||||
if line.trim().is_empty() {
|
||||
line.to_string()
|
||||
} else {
|
||||
format!("# {line}")
|
||||
// Build the full YAML
|
||||
let mut lines = Vec::with_capacity(128);
|
||||
|
||||
lines.push("# Keep configuration file".to_string());
|
||||
lines.push("# Uncomment and modify the settings you need.".to_string());
|
||||
lines.push(String::new());
|
||||
|
||||
// Core settings
|
||||
lines.push("# Data directory for storing items".to_string());
|
||||
lines.push("dir: ~/.local/share/keep".to_string());
|
||||
lines.push(String::new());
|
||||
|
||||
// List format
|
||||
lines.push("# Column configuration for --list output".to_string());
|
||||
lines.push("list_format:".to_string());
|
||||
for col in &list_format {
|
||||
lines.push(format!(" - name: {}", col.name));
|
||||
lines.push(format!(" label: {}", col.label));
|
||||
lines.push(format!(" align: {}", col.align));
|
||||
}
|
||||
lines.push(String::new());
|
||||
|
||||
// Table config
|
||||
lines.push("# Table display configuration".to_string());
|
||||
lines.push("#table_config:".to_string());
|
||||
lines.push("# style: nothing".to_string());
|
||||
lines.push("# modifiers: []".to_string());
|
||||
lines.push("# content_arrangement: dynamic".to_string());
|
||||
lines.push("# truncination_indicator: \"\"".to_string());
|
||||
lines.push(String::new());
|
||||
|
||||
// Other settings
|
||||
lines.push("human_readable: false".to_string());
|
||||
lines.push("output_format: table".to_string());
|
||||
lines.push("quiet: false".to_string());
|
||||
lines.push("force: false".to_string());
|
||||
lines.push(String::new());
|
||||
|
||||
// Server config
|
||||
lines.push("# Server configuration (only used with --server)".to_string());
|
||||
lines.push("server:".to_string());
|
||||
lines.push(" address: 127.0.0.1".to_string());
|
||||
lines.push(" port: 8080".to_string());
|
||||
lines.push("# username: keep".to_string());
|
||||
lines.push("# password: null".to_string());
|
||||
lines.push("# password_file: null".to_string());
|
||||
lines.push("# password_hash: null".to_string());
|
||||
lines.push("# jwt_secret: null".to_string());
|
||||
lines.push("# jwt_secret_file: null".to_string());
|
||||
lines.push("# cert_file: null".to_string());
|
||||
lines.push("# key_file: null".to_string());
|
||||
lines.push("# cors_origin: null".to_string());
|
||||
lines.push(String::new());
|
||||
|
||||
// Compression plugin
|
||||
lines.push("# Compression plugin to use".to_string());
|
||||
lines.push("#compression_plugin:".to_string());
|
||||
let mut comp_types: Vec<String> = CompressionType::iter().map(|ct| ct.to_string()).collect();
|
||||
comp_types.sort();
|
||||
for ct in &comp_types {
|
||||
lines.push(format!("# name: {ct} # {}", compression_description(ct)));
|
||||
}
|
||||
lines.push(String::new());
|
||||
|
||||
// Meta plugins
|
||||
lines.push("# Meta plugins to run when saving items".to_string());
|
||||
lines.push("meta_plugins:".to_string());
|
||||
for line in &meta_plugins {
|
||||
lines.push(line.clone());
|
||||
}
|
||||
lines.push(String::new());
|
||||
|
||||
// Filter plugins reference
|
||||
if !filter_schemas.is_empty() {
|
||||
lines.push("# Available filter plugins (use with --filter)".to_string());
|
||||
for schema in &filter_schemas {
|
||||
lines.push(format!("# {}", schema.name));
|
||||
if !schema.description.is_empty() {
|
||||
lines.push(format!("# {}", schema.description));
|
||||
}
|
||||
})
|
||||
.collect::<Vec<String>>()
|
||||
.join("\n");
|
||||
for opt in &schema.options {
|
||||
let req = if opt.required { "required" } else { "optional" };
|
||||
lines.push(format!(
|
||||
"# {} ({:?}, {})",
|
||||
opt.name, opt.option_type, req
|
||||
));
|
||||
}
|
||||
}
|
||||
lines.push(String::new());
|
||||
}
|
||||
|
||||
println!("{commented_yaml}");
|
||||
// Client config
|
||||
lines.push("# Client configuration (requires client feature)".to_string());
|
||||
lines.push("#client:".to_string());
|
||||
lines.push("# url: null".to_string());
|
||||
lines.push("# username: null".to_string());
|
||||
lines.push("# password: null".to_string());
|
||||
lines.push("# jwt: null".to_string());
|
||||
|
||||
// Print
|
||||
for line in &lines {
|
||||
println!("{line}");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Helper function to convert outputs from serde_yaml::Value to String.
|
||||
///
|
||||
/// Handles null (uses key), strings, and other values by serializing to YAML string.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `outputs` - Reference to the outputs HashMap.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// A HashMap with string keys and values.
|
||||
fn convert_outputs_to_string_map(
|
||||
outputs: &std::collections::HashMap<String, serde_yaml::Value>,
|
||||
) -> std::collections::HashMap<String, String> {
|
||||
let mut result = std::collections::HashMap::new();
|
||||
for (key, value) in outputs {
|
||||
match value {
|
||||
serde_yaml::Value::Null => {
|
||||
// For null, use the key as the value
|
||||
result.insert(key.clone(), key.clone());
|
||||
struct ListColumn {
|
||||
name: String,
|
||||
label: String,
|
||||
align: String,
|
||||
}
|
||||
|
||||
fn default_list_format() -> Vec<ListColumn> {
|
||||
vec![
|
||||
ListColumn {
|
||||
name: "id".into(),
|
||||
label: "Item".into(),
|
||||
align: "right".into(),
|
||||
},
|
||||
ListColumn {
|
||||
name: "time".into(),
|
||||
label: "Time".into(),
|
||||
align: "right".into(),
|
||||
},
|
||||
ListColumn {
|
||||
name: "size".into(),
|
||||
label: "Size".into(),
|
||||
align: "right".into(),
|
||||
},
|
||||
ListColumn {
|
||||
name: "meta:text_line_count".into(),
|
||||
label: "Lines".into(),
|
||||
align: "right".into(),
|
||||
},
|
||||
ListColumn {
|
||||
name: "tags".into(),
|
||||
label: "Tags".into(),
|
||||
align: "left".into(),
|
||||
},
|
||||
ListColumn {
|
||||
name: "meta:hostname_short".into(),
|
||||
label: "Host".into(),
|
||||
align: "left".into(),
|
||||
},
|
||||
ListColumn {
|
||||
name: "meta:command".into(),
|
||||
label: "Command".into(),
|
||||
align: "left".into(),
|
||||
},
|
||||
]
|
||||
}
|
||||
|
||||
fn build_meta_plugins_section(schemas: &[crate::common::schema::PluginSchema]) -> Vec<String> {
|
||||
let mut lines = Vec::new();
|
||||
|
||||
for (i, schema) in schemas.iter().enumerate() {
|
||||
let is_default = schema.name == "env";
|
||||
let prefix = if is_default { "" } else { "# " };
|
||||
|
||||
if i > 0 {
|
||||
lines.push(format!("{prefix}# --- {name} ---", name = schema.name));
|
||||
}
|
||||
|
||||
lines.push(format!("{prefix}- name: {}", schema.name));
|
||||
|
||||
// Options
|
||||
if !schema.options.is_empty() {
|
||||
lines.push(format!("{prefix} options:"));
|
||||
for opt in &schema.options {
|
||||
if let Some(ref default) = opt.default {
|
||||
let default_str = format_yaml_value(default);
|
||||
lines.push(format!("{prefix} {}: {}", opt.name, default_str));
|
||||
} else if opt.required {
|
||||
lines.push(format!("{prefix} {}: null # required", opt.name));
|
||||
}
|
||||
}
|
||||
serde_yaml::Value::String(s) => {
|
||||
result.insert(key.clone(), s.clone());
|
||||
}
|
||||
_ => {
|
||||
// Convert other values to their YAML string representation
|
||||
result.insert(
|
||||
key.clone(),
|
||||
serde_yaml::to_string(value).unwrap_or_default(),
|
||||
);
|
||||
} else {
|
||||
lines.push(format!("{prefix} options: {{}}"));
|
||||
}
|
||||
|
||||
// Outputs
|
||||
if !schema.outputs.is_empty() {
|
||||
lines.push(format!("{prefix} outputs:"));
|
||||
for output in &schema.outputs {
|
||||
lines.push(format!("{prefix} {}: {}", output.name, output.name));
|
||||
}
|
||||
} else {
|
||||
lines.push(format!("{prefix} outputs: {{}}"));
|
||||
}
|
||||
}
|
||||
result
|
||||
|
||||
lines
|
||||
}
|
||||
|
||||
fn format_yaml_value(value: &serde_yaml::Value) -> String {
|
||||
match value {
|
||||
serde_yaml::Value::Null => "null".into(),
|
||||
serde_yaml::Value::Bool(b) => b.to_string(),
|
||||
serde_yaml::Value::Number(n) => n.to_string(),
|
||||
serde_yaml::Value::String(s) => {
|
||||
if s.contains(' ') || s.contains(':') || s.contains('#') {
|
||||
format!("\"{s}\"")
|
||||
} else {
|
||||
s.clone()
|
||||
}
|
||||
}
|
||||
serde_yaml::Value::Sequence(_) | serde_yaml::Value::Mapping(_) => {
|
||||
serde_yaml::to_string(value)
|
||||
.unwrap_or_default()
|
||||
.trim()
|
||||
.to_string()
|
||||
}
|
||||
serde_yaml::Value::Tagged(_) => serde_yaml::to_string(value)
|
||||
.unwrap_or_default()
|
||||
.trim()
|
||||
.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
fn compression_description(name: &str) -> &str {
|
||||
match name {
|
||||
"lz4" => "Fast compression (native)",
|
||||
"gzip" => "Good compression ratio (native)",
|
||||
"bzip2" => "High compression (requires bzip2 binary)",
|
||||
"xz" => "Very high compression (requires xz binary)",
|
||||
"zstd" => "Modern fast compression (requires zstd binary)",
|
||||
"none" => "No compression",
|
||||
_ => "",
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use anyhow::{Result, bail};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
/// Supported LLM token encodings.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
|
||||
@@ -46,6 +47,25 @@ impl std::fmt::Debug for Tokenizer {
|
||||
}
|
||||
}
|
||||
|
||||
/// Static tokenizer instances — loaded once per process, shared across all plugins.
|
||||
static CL100K: Lazy<Tokenizer> = Lazy::new(|| {
|
||||
Tokenizer::new(TokenEncoding::Cl100kBase).expect("Failed to create cl100k_base tokenizer")
|
||||
});
|
||||
static O200K: Lazy<Tokenizer> = Lazy::new(|| {
|
||||
Tokenizer::new(TokenEncoding::O200kBase).expect("Failed to create o200k_base tokenizer")
|
||||
});
|
||||
|
||||
/// Returns a reference to a cached tokenizer for the given encoding.
|
||||
///
|
||||
/// The BPE vocabulary is loaded once per encoding and reused for the
|
||||
/// lifetime of the process.
|
||||
pub fn get_tokenizer(encoding: TokenEncoding) -> &'static Tokenizer {
|
||||
match encoding {
|
||||
TokenEncoding::Cl100kBase => &CL100K,
|
||||
TokenEncoding::O200kBase => &O200K,
|
||||
}
|
||||
}
|
||||
|
||||
impl Tokenizer {
|
||||
/// Creates a new tokenizer for the specified encoding.
|
||||
pub fn new(encoding: TokenEncoding) -> Result<Self> {
|
||||
@@ -74,6 +94,37 @@ impl Tokenizer {
|
||||
self.bpe.split_by_token(text, false)
|
||||
}
|
||||
|
||||
/// Returns an iterator over decoded token strings.
|
||||
///
|
||||
/// Lazily produces token strings without allocating a Vec for all tokens.
|
||||
/// Use this when you only need the first N tokens (e.g., head/skip filters).
|
||||
pub fn split_by_token_iter<'a>(
|
||||
&'a self,
|
||||
text: &'a str,
|
||||
) -> impl Iterator<Item = Result<String>> + 'a {
|
||||
self.bpe.split_by_token_iter(text, false)
|
||||
}
|
||||
|
||||
/// Counts tokens up to `max_tokens` and returns `(token_count, byte_position)`.
|
||||
///
|
||||
/// Uses an iterator to stop early, avoiding allocation of token strings
|
||||
/// beyond `max_tokens`. The byte_position is in the lossy UTF-8 encoding
|
||||
/// of `text` — use `map_lossy_pos_to_bytes` to map back to original bytes.
|
||||
pub fn count_bounded(&self, text: &str, max_tokens: usize) -> (usize, usize) {
|
||||
let mut count = 0usize;
|
||||
let mut byte_pos = 0usize;
|
||||
for token_str in self.bpe.split_by_token_iter(text, false) {
|
||||
if let Ok(s) = token_str {
|
||||
byte_pos += s.len();
|
||||
}
|
||||
count += 1;
|
||||
if count >= max_tokens {
|
||||
break;
|
||||
}
|
||||
}
|
||||
(count, byte_pos)
|
||||
}
|
||||
|
||||
/// Decodes a slice of token IDs back into a string.
|
||||
pub fn decode(&self, tokens: &[u32]) -> Result<String> {
|
||||
self.bpe.decode(tokens.to_vec())
|
||||
|
||||
Reference in New Issue
Block a user