feat(processing_engine): log processing engine logging calls to sys events. (#25939)

chore/fix-build
Jackson Newhouse 2025-02-04 15:16:04 -08:00 committed by GitHub
parent fa18b6d8da
commit fbcb9403c4
21 changed files with 405 additions and 42 deletions

Cargo.lock (generated)

@@ -3015,6 +3015,7 @@ dependencies = [
"influxdb3_client",
"influxdb3_internal_api",
"influxdb3_py_api",
"influxdb3_sys_events",
"influxdb3_types",
"influxdb3_wal",
"influxdb3_write",
@@ -3043,11 +3044,14 @@ dependencies = [
"chrono",
"futures",
"hashbrown 0.15.2",
"humantime",
"influxdb3_catalog",
"influxdb3_id",
"influxdb3_internal_api",
"influxdb3_sys_events",
"influxdb3_wal",
"iox_query_params",
"iox_time",
"observability_deps",
"parking_lot",
"pyo3",
@@ -3090,6 +3094,7 @@ dependencies = [
"influxdb3_internal_api",
"influxdb3_process",
"influxdb3_processing_engine",
"influxdb3_py_api",
"influxdb3_sys_events",
"influxdb3_telemetry",
"influxdb3_types",


@@ -22,6 +22,7 @@ use influxdb3_processing_engine::environment::{
DisabledManager, PipManager, PythonEnvironmentManager, UVManager,
};
use influxdb3_processing_engine::plugins::ProcessingEngineEnvironmentManager;
use influxdb3_processing_engine::ProcessingEngineManagerImpl;
use influxdb3_server::{
auth::AllOrNothingAuthorizer,
builder::ServerBuilder,
@@ -595,7 +596,6 @@ pub async fn command(config: Config) -> Result<()> {
trace_exporter,
trace_header_parser,
Arc::clone(&telemetry_store),
setup_processing_engine_env_manager(&config.processing_engine_config),
)?;
let query_executor = Arc::new(QueryExecutorImpl::new(CreateQueryExecutorArgs {
@@ -613,13 +613,24 @@
.await
.map_err(Error::BindAddress)?;
let processing_engine = ProcessingEngineManagerImpl::new(
setup_processing_engine_env_manager(&config.processing_engine_config),
write_buffer.catalog(),
Arc::clone(&write_buffer),
Arc::clone(&query_executor) as _,
Arc::clone(&time_provider) as _,
write_buffer.wal(),
sys_events_store,
);
let builder = ServerBuilder::new(common_state)
.max_request_size(config.max_http_request_size)
.write_buffer(write_buffer)
.query_executor(query_executor)
.time_provider(time_provider)
.persister(persister)
.tcp_listener(listener);
.tcp_listener(listener)
.processing_engine(processing_engine);
let server = if let Some(token) = config.bearer_token.map(hex::decode).transpose()? {
builder
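Note on the wiring change in this file: `CommonServerState::new` no longer takes the processing-engine environment manager. The `ProcessingEngineManagerImpl` is now built directly in the serve command, where the shared `SysEventStore` (already handed to the query executor) is in scope, and the finished manager is passed to the builder through the new `processing_engine` stage shown in the `ServerBuilder` changes below.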


@@ -2,4 +2,4 @@
source: influxdb3/tests/cli/mod.rs
expression: output
---
Show command failed: system table 'cpu' not found: please use a valid system table name: ["distinct_caches", "last_caches", "parquet_files", "processing_engine_triggers", "queries"]
Show command failed: system table 'cpu' not found: please use a valid system table name: ["distinct_caches", "last_caches", "parquet_files", "processing_engine_logs", "processing_engine_triggers", "queries"]


@@ -2,4 +2,4 @@
source: influxdb3/tests/cli/mod.rs
expression: output
---
Show command failed: system table 'meow' not found: please use a valid system table name: ["distinct_caches", "last_caches", "parquet_files", "processing_engine_triggers", "queries"]
Show command failed: system table 'meow' not found: please use a valid system table name: ["distinct_caches", "last_caches", "parquet_files", "processing_engine_logs", "processing_engine_triggers", "queries"]


@@ -17,6 +17,9 @@ parquet_files summary:
| table_name | path | size_bytes | row_count | min_time | max_time |
+------------+------+------------+-----------+----------+----------+
+------------+------+------------+-----------+----------+----------+
processing_engine_logs summary:
++
++
processing_engine_triggers summary:
++
++


@@ -8,6 +8,7 @@ expression: output
| distinct_caches | [table, name, column_ids, column_names, max_cardinality, max_age_seconds] |
| last_caches | [table, name, key_column_ids, key_column_names, value_column_ids, value_column_names, count, ttl] |
| parquet_files | [table_name, path, size_bytes, row_count, min_time, max_time] |
| processing_engine_logs | [event_time, trigger_name, log_level, log_text] |
| processing_engine_triggers | [trigger_name, plugin_filename, trigger_specification, disabled] |
| queries | [id, phase, issue_time, query_type, query_text, partitions, parquet_files, plan_duration, permit_duration, execute_duration, end2end_duration, compute_duration, max_memory, success, running, cancelled, trace_id] |
+----------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+


@@ -120,6 +120,7 @@ async fn flight() -> Result<(), influxdb3_client::Error> {
"| public | system | distinct_caches | BASE TABLE |",
"| public | system | last_caches | BASE TABLE |",
"| public | system | parquet_files | BASE TABLE |",
"| public | system | processing_engine_logs | BASE TABLE |",
"| public | system | processing_engine_triggers | BASE TABLE |",
"| public | system | queries | BASE TABLE |",
"+--------------+--------------------+----------------------------+------------+",


@@ -1,4 +1,5 @@
use crate::cli::run_with_confirmation;
#[cfg(feature = "system-py")]
use crate::server::{ConfigProvider, TestServer};
use anyhow::{bail, Result};
use serde_json::Value;


@@ -24,6 +24,7 @@ influxdb3_client = { path = "../influxdb3_client" }
influxdb3_internal_api = { path = "../influxdb3_internal_api" }
influxdb3_py_api = { path = "../influxdb3_py_api" }
influxdb3_types = { path = "../influxdb3_types" }
influxdb3_sys_events = { path = "../influxdb3_sys_events"}
influxdb3_wal = { path = "../influxdb3_wal" }
influxdb3_write = { path = "../influxdb3_write" }
observability_deps.workspace = true


@@ -10,6 +10,7 @@ use hyper::{Body, Response};
use influxdb3_catalog::catalog::Catalog;
use influxdb3_catalog::catalog::Error::ProcessingEngineTriggerNotFound;
use influxdb3_internal_api::query_executor::QueryExecutor;
use influxdb3_sys_events::SysEventStore;
use influxdb3_types::http::{
SchedulePluginTestRequest, SchedulePluginTestResponse, WalPluginTestRequest,
WalPluginTestResponse,
@@ -44,6 +45,8 @@ pub struct ProcessingEngineManagerImpl {
write_buffer: Arc<dyn WriteBuffer>,
query_executor: Arc<dyn QueryExecutor>,
time_provider: Arc<dyn TimeProvider>,
#[allow(unused)]
sys_event_store: Arc<SysEventStore>,
wal: Arc<dyn Wal>,
plugin_event_tx: RwLock<PluginChannels>,
}
@@ -212,6 +215,7 @@ impl ProcessingEngineManagerImpl {
query_executor: Arc<dyn QueryExecutor>,
time_provider: Arc<dyn TimeProvider>,
wal: Arc<dyn Wal>,
sys_event_store: Arc<SysEventStore>,
) -> Self {
// if given a plugin dir, try to initialize the virtualenv.
#[cfg(feature = "system-py")]
@@ -229,6 +233,7 @@
catalog,
write_buffer,
query_executor,
sys_event_store,
time_provider,
wal,
plugin_event_tx: Default::default(),
@@ -442,6 +447,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
let plugin_context = PluginContext {
write_buffer,
query_executor,
sys_event_store: Arc::clone(&self.sys_event_store),
};
let plugin_code = self.read_plugin_code(&trigger.plugin_filename).await?;
match trigger.trigger.plugin_type() {
@@ -788,6 +794,7 @@ mod tests {
use influxdb3_cache::last_cache::LastCacheProvider;
use influxdb3_catalog::catalog;
use influxdb3_internal_api::query_executor::UnimplementedQueryExecutor;
use influxdb3_sys_events::SysEventStore;
use influxdb3_wal::{Gen1Duration, TriggerSpecificationDefinition, WalConfig};
use influxdb3_write::persister::Persister;
use influxdb3_write::write_buffer::{WriteBufferImpl, WriteBufferImplArgs};
@@ -1046,6 +1053,8 @@ def process_writes(influxdb3_local, table_batches, args=None):
package_manager: Arc::new(DisabledManager),
};
let sys_event_store = Arc::new(SysEventStore::new(Arc::clone(&time_provider)));
(
ProcessingEngineManagerImpl::new(
environment_manager,
@@ -1054,6 +1063,7 @@ def process_writes(influxdb3_local, table_batches, args=None):
qe,
time_provider,
wal,
sys_event_store,
),
file,
)
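The `#[allow(unused)]` on the stored `sys_event_store` is presumably there because the field is only read when the `system-py` feature is enabled (it is cloned into the `PluginContext` when triggers start); without the feature, the manager merely holds it.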


@@ -9,6 +9,8 @@ use influxdb3_catalog::catalog::Catalog;
#[cfg(feature = "system-py")]
use influxdb3_internal_api::query_executor::QueryExecutor;
#[cfg(feature = "system-py")]
use influxdb3_sys_events::SysEventStore;
#[cfg(feature = "system-py")]
use influxdb3_types::http::{WalPluginTestRequest, WalPluginTestResponse};
use influxdb3_wal::Gen1Duration;
#[cfg(feature = "system-py")]
@@ -95,6 +97,7 @@ pub(crate) fn run_wal_contents_plugin(
plugin_code,
write_buffer: context.write_buffer,
query_executor: context.query_executor,
sys_event_store: context.sys_event_store,
};
tokio::task::spawn(async move {
trigger_plugin
@@ -135,6 +138,7 @@ pub(crate) fn run_schedule_plugin(
plugin_code,
write_buffer: context.write_buffer,
query_executor: context.query_executor,
sys_event_store: context.sys_event_store,
};
let runner = python_plugin::ScheduleTriggerRunner::try_new(
@@ -165,6 +169,7 @@ pub(crate) fn run_request_plugin(
plugin_code,
write_buffer: context.write_buffer,
query_executor: context.query_executor,
sys_event_store: context.sys_event_store,
};
tokio::task::spawn(async move {
trigger_plugin
@@ -180,6 +185,8 @@ pub(crate) struct PluginContext {
pub(crate) write_buffer: Arc<dyn WriteBuffer>,
// query executor to hand off to the plugin
pub(crate) query_executor: Arc<dyn QueryExecutor>,
// sys events for writing logs to ring buffers
pub(crate) sys_event_store: Arc<SysEventStore>,
}
#[cfg(feature = "system-py")]
@@ -190,6 +197,7 @@ struct TriggerPlugin {
db_name: String,
write_buffer: Arc<dyn WriteBuffer>,
query_executor: Arc<dyn QueryExecutor>,
sys_event_store: Arc<SysEventStore>,
}
#[cfg(feature = "system-py")]
@@ -205,7 +213,7 @@ mod python_plugin {
use influxdb3_catalog::catalog::DatabaseSchema;
use influxdb3_py_api::system_py::{
execute_python_with_batch, execute_request_trigger, execute_schedule_trigger,
PluginReturnState,
PluginReturnState, ProcessingEngineLogger,
};
use influxdb3_wal::{WalContents, WalOp};
use influxdb3_write::Precision;
@@ -306,6 +314,10 @@ mod python_plugin {
self.plugin_code.code().as_ref(),
Arc::clone(&schema),
Arc::clone(&self.query_executor),
Some(ProcessingEngineLogger::new(
Arc::clone(&self.sys_event_store),
self.trigger_definition.trigger_name.as_str(),
)),
&self.trigger_definition.trigger_arguments,
request.query_params,
request.headers,
@@ -418,6 +430,10 @@
write_batch,
Arc::clone(&schema),
Arc::clone(&self.query_executor),
Some(ProcessingEngineLogger::new(
Arc::clone(&self.sys_event_store),
self.trigger_definition.trigger_name.as_str(),
)),
table_filter,
&self.trigger_definition.trigger_arguments,
)?;
@@ -569,6 +585,10 @@ mod python_plugin {
trigger_time,
Arc::clone(&db_schema),
Arc::clone(&plugin.query_executor),
Some(ProcessingEngineLogger::new(
Arc::clone(&plugin.sys_event_store),
plugin.trigger_definition.trigger_name.as_str(),
)),
&plugin.trigger_definition.trigger_arguments,
)?;
@@ -635,6 +655,7 @@ pub(crate) fn run_test_wal_plugin(
db,
query_executor,
None,
None,
&request.input_arguments,
)?;
@@ -761,6 +782,7 @@ pub(crate) fn run_test_schedule_plugin(
schedule_time,
db,
query_executor,
None,
&request.input_arguments,
)?;
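Each trigger run builds a fresh `ProcessingEngineLogger` from the shared store and its trigger name; since `trigger_name` ends up as an `Arc<str>`, that is two reference-count bumps per invocation rather than a string copy. Note also that the `run_test_wal_plugin` and `run_test_schedule_plugin` paths pass `None` for the logger, so test invocations still collect log lines in the returned `PluginReturnState` but never write to the system table.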


@@ -15,10 +15,13 @@ arrow-schema.workspace = true
bytes.workspace = true
chrono.workspace = true
hashbrown.workspace = true
humantime.workspace = true
iox_time.workspace = true
influxdb3_id = { path = "../influxdb3_id" }
influxdb3_wal = { path = "../influxdb3_wal" }
influxdb3_catalog = {path = "../influxdb3_catalog"}
influxdb3_internal_api = { path = "../influxdb3_internal_api" }
influxdb3_sys_events = { path = "../influxdb3_sys_events" }
iox_query_params.workspace = true
observability_deps.workspace = true
parking_lot.workspace = true


@@ -13,5 +13,6 @@ pub enum ExecutePluginError {
PluginError(#[from] anyhow::Error),
}
pub mod logging;
#[cfg(feature = "system-py")]
pub mod system_py;
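Note that `logging` sits outside the `system-py` gate: downstream crates (the server's system table, below) need `ProcessingEngineLog` and its Arrow schema even in builds without Python support.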


@@ -0,0 +1,90 @@
use arrow_array::builder::{StringBuilder, TimestampNanosecondBuilder};
use arrow_array::{ArrayRef, RecordBatch};
use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit};
use influxdb3_sys_events::{Event, RingBuffer, ToRecordBatch};
use iox_time::Time;
use std::fmt::Display;
use std::sync::Arc;
#[derive(Debug)]
pub struct ProcessingEngineLog {
event_time: Time,
log_level: LogLevel,
trigger_name: Arc<str>,
log_line: String,
}
#[derive(Debug, Copy, Clone)]
pub enum LogLevel {
Info,
Warn,
Error,
}
impl Display for LogLevel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
LogLevel::Info => write!(f, "INFO"),
LogLevel::Warn => write!(f, "WARN"),
LogLevel::Error => write!(f, "ERROR"),
}
}
}
impl ProcessingEngineLog {
pub fn new(
event_time: Time,
log_level: LogLevel,
trigger_name: Arc<str>,
log_line: String,
) -> Self {
Self {
event_time,
log_level,
trigger_name,
log_line,
}
}
}
impl ToRecordBatch<ProcessingEngineLog> for ProcessingEngineLog {
fn schema() -> Schema {
let fields = vec![
Field::new(
"event_time",
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
Field::new("trigger_name", DataType::Utf8, false),
Field::new("log_level", DataType::Utf8, false),
Field::new("log_text", DataType::Utf8, false),
];
Schema::new(fields)
}
fn to_record_batch(
items: Option<&RingBuffer<Event<ProcessingEngineLog>>>,
) -> Option<Result<RecordBatch, ArrowError>> {
let items = items?;
let capacity = items.len();
let mut event_time_builder = TimestampNanosecondBuilder::with_capacity(capacity);
let mut trigger_name_builder = StringBuilder::new();
let mut log_level_builder = StringBuilder::new();
let mut log_text_builder = StringBuilder::new();
for item in items.in_order() {
let event = &item.data;
event_time_builder.append_value(event.event_time.timestamp_nanos());
trigger_name_builder.append_value(&event.trigger_name);
log_level_builder.append_value(event.log_level.to_string());
log_text_builder.append_value(event.log_line.as_str());
}
let columns: Vec<ArrayRef> = vec![
Arc::new(event_time_builder.finish()),
Arc::new(trigger_name_builder.finish()),
Arc::new(log_level_builder.finish()),
Arc::new(log_text_builder.finish()),
];
Some(RecordBatch::try_new(Arc::new(Self::schema()), columns))
}
}
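A minimal round-trip sketch of the new type (not part of the commit): record one event through `SysEventStore` and render it via the same `ToRecordBatch` path the system table uses. It leans on the store's newly exposed `time_provider()` accessor and assumes the store's generic bounds accept `ProcessingEngineLog` the way the rest of this commit does; `SystemProvider` is `iox_time`'s wall-clock provider.

use std::sync::Arc;

use influxdb3_py_api::logging::{LogLevel, ProcessingEngineLog};
use influxdb3_sys_events::SysEventStore;
use iox_time::{SystemProvider, TimeProvider};

fn main() {
    let time_provider: Arc<dyn TimeProvider> = Arc::new(SystemProvider::new());
    let store = SysEventStore::new(Arc::clone(&time_provider));

    // this is what ProcessingEngineLogger::log does under the hood
    store.record(ProcessingEngineLog::new(
        time_provider.now(),
        LogLevel::Info,
        Arc::from("my_trigger"),
        "hello from a plugin".to_string(),
    ));

    // Some(Ok(batch)) once at least one event is buffered; None while empty
    let batch = store.as_record_batch::<ProcessingEngineLog>();
    assert!(batch.is_some());
}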


@@ -1,3 +1,4 @@
use crate::logging::{LogLevel, ProcessingEngineLog};
use crate::ExecutePluginError;
use anyhow::Context;
use arrow_array::types::Int32Type;
@@ -10,11 +11,14 @@ use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::TryStreamExt;
use hashbrown::HashMap;
use humantime::format_duration;
use influxdb3_catalog::catalog::DatabaseSchema;
use influxdb3_id::TableId;
use influxdb3_internal_api::query_executor::QueryExecutor;
use influxdb3_sys_events::SysEventStore;
use influxdb3_wal::{FieldData, WriteBatch};
use iox_query_params::StatementParams;
use iox_time::TimeProvider;
use observability_deps::tracing::{error, info, warn};
use parking_lot::Mutex;
use pyo3::exceptions::{PyException, PyValueError};
@@ -35,6 +39,31 @@ struct PyPluginCallApi {
db_schema: Arc<DatabaseSchema>,
query_executor: Arc<dyn QueryExecutor>,
return_state: Arc<Mutex<PluginReturnState>>,
logger: Option<ProcessingEngineLogger>,
}
#[derive(Debug, Clone)]
pub struct ProcessingEngineLogger {
sys_event_store: Arc<SysEventStore>,
trigger_name: Arc<str>,
}
impl ProcessingEngineLogger {
pub fn new(sys_event_store: Arc<SysEventStore>, trigger_name: impl Into<Arc<str>>) -> Self {
Self {
sys_event_store,
trigger_name: trigger_name.into(),
}
}
pub fn log(&self, log_level: LogLevel, log_line: impl Into<String>) {
self.sys_event_store.record(ProcessingEngineLog::new(
self.sys_event_store.time_provider().now(),
log_level,
Arc::clone(&self.trigger_name),
log_line.into(),
))
}
}
#[derive(Debug, Default)]
@@ -79,6 +108,7 @@ impl PyPluginCallApi {
let line = self.log_args_to_string(args)?;
info!("processing engine: {}", line);
self.write_to_logger(LogLevel::Info, line.clone());
self.return_state.lock().log_lines.push(LogLine::Info(line));
Ok(())
}
@@ -88,6 +118,7 @@
let line = self.log_args_to_string(args)?;
warn!("processing engine: {}", line);
self.write_to_logger(LogLevel::Warn, line.clone());
self.return_state
.lock()
.log_lines
@@ -100,6 +131,7 @@
let line = self.log_args_to_string(args)?;
error!("processing engine: {}", line);
self.write_to_logger(LogLevel::Error, line.clone());
self.return_state
.lock()
.log_lines
@@ -256,6 +288,14 @@ impl PyPluginCallApi {
}
}
impl PyPluginCallApi {
fn write_to_logger(&self, level: LogLevel, log_line: String) {
if let Some(logger) = &self.logger {
logger.log(level, log_line);
}
}
}
// constant for the process writes call site string
const PROCESS_WRITES_CALL_SITE: &str = "process_writes";
@@ -396,9 +436,19 @@ pub fn execute_python_with_batch(
write_batch: &WriteBatch,
schema: Arc<DatabaseSchema>,
query_executor: Arc<dyn QueryExecutor>,
logger: Option<ProcessingEngineLogger>,
table_filter: Option<TableId>,
args: &Option<HashMap<String, String>>,
) -> Result<PluginReturnState, ExecutePluginError> {
let start_time = if let Some(logger) = &logger {
logger.log(
LogLevel::Info,
"starting execution of wal plugin.".to_string(),
);
Some(logger.sys_event_store.time_provider().now())
} else {
None
};
Python::with_gil(|py| {
// import the LineBuilder for use in the python code
let globals = PyDict::new(py);
@@ -495,6 +545,7 @@
let api = PyPluginCallApi {
db_schema: schema,
query_executor,
logger: logger.clone(),
return_state: Default::default(),
};
let return_state = Arc::clone(&api.return_state);
@@ -522,6 +573,21 @@
let empty_return_state = PluginReturnState::default();
let ret = std::mem::replace(&mut *return_state.lock(), empty_return_state);
if let Some(logger) = &logger {
let runtime = logger
.sys_event_store
.time_provider()
.now()
.checked_duration_since(start_time.unwrap());
logger.log(
LogLevel::Info,
format!(
"finished execution in {}",
format_duration(runtime.unwrap_or_default())
),
);
}
Ok(ret)
})
}
@@ -531,8 +597,18 @@ pub fn execute_schedule_trigger(
schedule_time: DateTime<Utc>,
schema: Arc<DatabaseSchema>,
query_executor: Arc<dyn QueryExecutor>,
logger: Option<ProcessingEngineLogger>,
args: &Option<HashMap<String, String>>,
) -> Result<PluginReturnState, ExecutePluginError> {
let start_time = if let Some(logger) = &logger {
logger.log(
LogLevel::Info,
format!("starting execution with scheduled time {}", schedule_time),
);
Some(logger.sys_event_store.time_provider().now())
} else {
None
};
Python::with_gil(|py| {
// import the LineBuilder for use in the python code
let globals = PyDict::new(py);
@@ -552,6 +628,7 @@ pub fn execute_schedule_trigger(
let api = PyPluginCallApi {
db_schema: schema,
query_executor,
logger: logger.clone(),
return_state: Default::default(),
};
let return_state = Arc::clone(&api.return_state);
@@ -579,20 +656,44 @@
// swap with an empty return state to avoid cloning
let empty_return_state = PluginReturnState::default();
let ret = std::mem::replace(&mut *return_state.lock(), empty_return_state);
if let Some(logger) = &logger {
let runtime = logger
.sys_event_store
.time_provider()
.now()
.checked_duration_since(start_time.unwrap());
logger.log(
LogLevel::Info,
format!(
"finished execution in {:?}",
format_duration(runtime.unwrap_or_default())
),
);
}
Ok(ret)
})
}
#[allow(clippy::too_many_arguments)]
pub fn execute_request_trigger(
code: &str,
db_schema: Arc<DatabaseSchema>,
query_executor: Arc<dyn QueryExecutor>,
logger: Option<ProcessingEngineLogger>,
args: &Option<HashMap<String, String>>,
query_params: HashMap<String, String>,
request_headers: HashMap<String, String>,
request_body: Bytes,
) -> Result<(u16, HashMap<String, String>, String, PluginReturnState), ExecutePluginError> {
let start_time = if let Some(logger) = &logger {
logger.log(
LogLevel::Info,
"starting execution of request plugin.".to_string(),
);
Some(logger.sys_event_store.time_provider().now())
} else {
None
};
Python::with_gil(|py| {
// import the LineBuilder for use in the python code
let globals = PyDict::new(py);
@@ -607,6 +708,7 @@ pub fn execute_request_trigger(
let api = PyPluginCallApi {
db_schema,
query_executor,
logger: logger.clone(),
return_state: Default::default(),
};
let return_state = Arc::clone(&api.return_state);
@@ -670,6 +772,21 @@ pub fn execute_request_trigger(
let empty_return_state = PluginReturnState::default();
let ret = std::mem::replace(&mut *return_state.lock(), empty_return_state);
if let Some(logger) = &logger {
let runtime = logger
.sys_event_store
.time_provider()
.now()
.checked_duration_since(start_time.unwrap());
logger.log(
LogLevel::Info,
format!(
"finished execution in {}",
format_duration(runtime.unwrap_or_default())
),
)
}
Ok((
response_code,
response_headers,
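The three `execute_*` entry points repeat the same start/finish bookkeeping. A condensed sketch of the pattern follows; the `run_with_execution_logging` helper is hypothetical (the commit inlines this logic at each call site), and the `sys_event_store` field is only reachable like this from within the module:

use humantime::format_duration;

fn run_with_execution_logging<T>(
    logger: Option<&ProcessingEngineLogger>,
    run: impl FnOnce() -> T,
) -> T {
    // Option::map sidesteps the start_time.unwrap() used in the inline version
    let start_time = logger.map(|logger| {
        logger.log(LogLevel::Info, "starting execution");
        logger.sys_event_store.time_provider().now()
    });
    let out = run();
    if let (Some(logger), Some(start)) = (logger, start_time) {
        // checked_duration_since is None if the clock moved backwards;
        // fall back to a zero duration in that case
        let runtime = logger
            .sys_event_store
            .time_provider()
            .now()
            .checked_duration_since(start);
        logger.log(
            LogLevel::Info,
            format!(
                "finished execution in {}",
                format_duration(runtime.unwrap_or_default())
            ),
        );
    }
    out
}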


@@ -42,6 +42,7 @@ influxdb3_internal_api = { path = "../influxdb3_internal_api" }
influxdb3_process = { path = "../influxdb3_process", default-features = false }
influxdb3_processing_engine = { path = "../influxdb3_processing_engine" }
influxdb3_types = { path = "../influxdb3_types"}
influxdb3_py_api = {path = "../influxdb3_py_api"}
influxdb3_wal = { path = "../influxdb3_wal"}
influxdb3_write = { path = "../influxdb3_write" }
iox_query_influxql_rewrite = { path = "../iox_query_influxql_rewrite" }


@@ -10,7 +10,7 @@ use iox_time::TimeProvider;
use tokio::net::TcpListener;
#[derive(Debug)]
pub struct ServerBuilder<W, Q, P, T, L> {
pub struct ServerBuilder<W, Q, P, T, L, E> {
common_state: CommonServerState,
time_provider: T,
max_request_size: usize,
@@ -18,10 +18,20 @@ pub struct ServerBuilder<W, Q, P, T, L> {
query_executor: Q,
persister: P,
listener: L,
processing_engine: E,
authorizer: Arc<dyn Authorizer>,
}
impl ServerBuilder<NoWriteBuf, NoQueryExec, NoPersister, NoTimeProvider, NoListener> {
impl
ServerBuilder<
NoWriteBuf,
NoQueryExec,
NoPersister,
NoTimeProvider,
NoListener,
NoProcessingEngine,
>
{
pub fn new(common_state: CommonServerState) -> Self {
Self {
common_state,
@@ -32,11 +42,12 @@ impl ServerBuilder<NoWriteBuf, NoQueryExec, NoPersister, NoTimeProvider, NoListe
persister: NoPersister,
listener: NoListener,
authorizer: Arc::new(DefaultAuthorizer),
processing_engine: NoProcessingEngine,
}
}
}
impl<W, Q, P, T, L> ServerBuilder<W, Q, P, T, L> {
impl<W, Q, P, T, L, E> ServerBuilder<W, Q, P, T, L, E> {
pub fn max_request_size(mut self, max_request_size: usize) -> Self {
self.max_request_size = max_request_size;
self
@@ -69,8 +80,16 @@ pub struct NoListener;
#[derive(Debug)]
pub struct WithListener(TcpListener);
impl<Q, P, T, L> ServerBuilder<NoWriteBuf, Q, P, T, L> {
pub fn write_buffer(self, wb: Arc<dyn WriteBuffer>) -> ServerBuilder<WithWriteBuf, Q, P, T, L> {
#[derive(Debug)]
pub struct NoProcessingEngine;
#[derive(Debug)]
pub struct WithProcessingEngine(ProcessingEngineManagerImpl);
impl<Q, P, T, L, E> ServerBuilder<NoWriteBuf, Q, P, T, L, E> {
pub fn write_buffer(
self,
wb: Arc<dyn WriteBuffer>,
) -> ServerBuilder<WithWriteBuf, Q, P, T, L, E> {
ServerBuilder {
common_state: self.common_state,
time_provider: self.time_provider,
@@ -80,15 +99,16 @@ impl<Q, P, T, L> ServerBuilder<NoWriteBuf, Q, P, T, L> {
persister: self.persister,
listener: self.listener,
authorizer: self.authorizer,
processing_engine: self.processing_engine,
}
}
}
impl<W, P, T, L> ServerBuilder<W, NoQueryExec, P, T, L> {
impl<W, P, T, L, E> ServerBuilder<W, NoQueryExec, P, T, L, E> {
pub fn query_executor(
self,
qe: Arc<dyn QueryExecutor>,
) -> ServerBuilder<W, WithQueryExec, P, T, L> {
) -> ServerBuilder<W, WithQueryExec, P, T, L, E> {
ServerBuilder {
common_state: self.common_state,
time_provider: self.time_provider,
@@ -98,12 +118,13 @@ impl<W, P, T, L> ServerBuilder<W, NoQueryExec, P, T, L> {
persister: self.persister,
listener: self.listener,
authorizer: self.authorizer,
processing_engine: self.processing_engine,
}
}
}
impl<W, Q, T, L> ServerBuilder<W, Q, NoPersister, T, L> {
pub fn persister(self, p: Arc<Persister>) -> ServerBuilder<W, Q, WithPersister, T, L> {
impl<W, Q, T, L, E> ServerBuilder<W, Q, NoPersister, T, L, E> {
pub fn persister(self, p: Arc<Persister>) -> ServerBuilder<W, Q, WithPersister, T, L, E> {
ServerBuilder {
common_state: self.common_state,
time_provider: self.time_provider,
@@ -113,12 +134,13 @@ impl<W, Q, T, L> ServerBuilder<W, Q, NoPersister, T, L> {
persister: WithPersister(p),
listener: self.listener,
authorizer: self.authorizer,
processing_engine: self.processing_engine,
}
}
}
impl<W, Q, P, L> ServerBuilder<W, Q, P, NoTimeProvider, L> {
pub fn time_provider<T>(self, tp: Arc<T>) -> ServerBuilder<W, Q, P, WithTimeProvider<T>, L> {
impl<W, Q, P, L, E> ServerBuilder<W, Q, P, NoTimeProvider, L, E> {
pub fn time_provider<T>(self, tp: Arc<T>) -> ServerBuilder<W, Q, P, WithTimeProvider<T>, L, E> {
ServerBuilder {
common_state: self.common_state,
time_provider: WithTimeProvider(tp),
@@ -128,12 +150,13 @@ impl<W, Q, P, L> ServerBuilder<W, Q, P, NoTimeProvider, L> {
persister: self.persister,
listener: self.listener,
authorizer: self.authorizer,
processing_engine: self.processing_engine,
}
}
}
impl<W, Q, P, T> ServerBuilder<W, Q, P, T, NoListener> {
pub fn tcp_listener(self, listener: TcpListener) -> ServerBuilder<W, Q, P, T, WithListener> {
impl<W, Q, P, T, E> ServerBuilder<W, Q, P, T, NoListener, E> {
pub fn tcp_listener(self, listener: TcpListener) -> ServerBuilder<W, Q, P, T, WithListener, E> {
ServerBuilder {
common_state: self.common_state,
time_provider: self.time_provider,
@@ -143,24 +166,44 @@ impl<W, Q, P, T> ServerBuilder<W, Q, P, T, NoListener> {
persister: self.persister,
listener: WithListener(listener),
authorizer: self.authorizer,
processing_engine: self.processing_engine,
}
}
}
impl<W, Q, P, T, L> ServerBuilder<W, Q, P, T, L, NoProcessingEngine> {
pub fn processing_engine(
self,
processing_engine: ProcessingEngineManagerImpl,
) -> ServerBuilder<W, Q, P, T, L, WithProcessingEngine> {
ServerBuilder {
common_state: self.common_state,
time_provider: self.time_provider,
max_request_size: self.max_request_size,
write_buffer: self.write_buffer,
query_executor: self.query_executor,
persister: self.persister,
listener: self.listener,
authorizer: self.authorizer,
processing_engine: WithProcessingEngine(processing_engine),
}
}
}
impl<T: TimeProvider>
ServerBuilder<WithWriteBuf, WithQueryExec, WithPersister, WithTimeProvider<T>, WithListener>
ServerBuilder<
WithWriteBuf,
WithQueryExec,
WithPersister,
WithTimeProvider<T>,
WithListener,
WithProcessingEngine,
>
{
pub async fn build(self) -> Server<T> {
let persister = Arc::clone(&self.persister.0);
let authorizer = Arc::clone(&self.authorizer);
let processing_engine = Arc::new(ProcessingEngineManagerImpl::new(
self.common_state.processing_engine_environment.clone(),
self.write_buffer.0.catalog(),
Arc::clone(&self.write_buffer.0),
Arc::clone(&self.query_executor.0),
Arc::clone(&self.time_provider.0) as _,
self.write_buffer.0.wal(),
));
let processing_engine = Arc::new(self.processing_engine.0);
processing_engine
.start_triggers()
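With the sixth type-state parameter, `build()` is only defined on the fully populated builder, so forgetting the new stage is a compile-time error rather than a runtime panic. Illustrative usage, matching the serve command above and the server test below:

let server = ServerBuilder::new(common_state)
    .write_buffer(write_buffer)
    .query_executor(query_executor)
    .time_provider(time_provider)
    .persister(persister)
    .tcp_listener(listener)
    .processing_engine(processing_engine) // omit this and there is no build() to call
    .build()
    .await;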


@@ -27,7 +27,6 @@ use authz::Authorizer;
use hyper::server::conn::AddrIncoming;
use hyper::server::conn::Http;
use hyper::service::service_fn;
use influxdb3_processing_engine::plugins::ProcessingEngineEnvironmentManager;
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_write::persister::Persister;
use iox_time::TimeProvider;
@@ -79,7 +78,6 @@ pub struct CommonServerState {
trace_exporter: Option<Arc<trace_exporters::export::AsyncExporter>>,
trace_header_parser: TraceHeaderParser,
telemetry_store: Arc<TelemetryStore>,
processing_engine_environment: ProcessingEngineEnvironmentManager,
}
impl CommonServerState {
@@ -88,14 +86,12 @@ impl CommonServerState {
trace_exporter: Option<Arc<trace_exporters::export::AsyncExporter>>,
trace_header_parser: TraceHeaderParser,
telemetry_store: Arc<TelemetryStore>,
processing_engine_environment: ProcessingEngineEnvironmentManager,
) -> Result<Self> {
Ok(Self {
metrics,
trace_exporter,
trace_header_parser,
telemetry_store,
processing_engine_environment,
})
}
@@ -223,6 +219,7 @@ mod tests {
use influxdb3_id::{DbId, TableId};
use influxdb3_processing_engine::environment::DisabledManager;
use influxdb3_processing_engine::plugins::ProcessingEngineEnvironmentManager;
use influxdb3_processing_engine::ProcessingEngineManagerImpl;
use influxdb3_sys_events::SysEventStore;
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::WalConfig;
@@ -802,14 +799,9 @@ mod tests {
None,
trace_header_parser,
Arc::clone(&sample_telem_store),
ProcessingEngineEnvironmentManager {
plugin_dir: None,
virtual_env_location: None,
package_manager: Arc::new(DisabledManager),
},
)
.unwrap();
let query_executor = QueryExecutorImpl::new(CreateQueryExecutorArgs {
let query_executor = Arc::new(QueryExecutorImpl::new(CreateQueryExecutorArgs {
catalog: write_buffer.catalog(),
write_buffer: Arc::clone(&write_buffer),
exec: Arc::clone(&exec),
@@ -818,7 +810,7 @@
query_log_size: 10,
telemetry_store: Arc::clone(&sample_telem_store),
sys_events_store: Arc::clone(&sys_events_store),
});
}));
// bind to port 0 will assign a random available port:
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
@@ -827,13 +819,28 @@
.expect("bind tcp address");
let addr = listener.local_addr().unwrap();
let processing_engine = ProcessingEngineManagerImpl::new(
ProcessingEngineEnvironmentManager {
plugin_dir: None,
virtual_env_location: None,
package_manager: Arc::new(DisabledManager),
},
write_buffer.catalog(),
Arc::clone(&write_buffer),
Arc::clone(&query_executor) as _,
Arc::clone(&time_provider) as _,
write_buffer.wal(),
sys_events_store,
);
let server = ServerBuilder::new(common_state)
.write_buffer(Arc::clone(&write_buffer))
.query_executor(Arc::new(query_executor))
.query_executor(query_executor)
.persister(persister)
.authorizer(Arc::new(DefaultAuthorizer))
.time_provider(Arc::clone(&time_provider))
.tcp_listener(listener)
.processing_engine(processing_engine)
.build()
.await;
let frontend_shutdown = CancellationToken::new();


@@ -21,7 +21,7 @@ use self::{last_caches::LastCachesTable, queries::QueriesTable};
mod distinct_caches;
mod last_caches;
mod parquet_files;
use crate::system_tables::python_call::ProcessingEngineTriggerTable;
use crate::system_tables::python_call::{ProcessingEngineLogsTable, ProcessingEngineTriggerTable};
mod python_call;
mod queries;
@@ -36,6 +36,8 @@ pub(crate) const PARQUET_FILES_TABLE_NAME: &str = "parquet_files";
const PROCESSING_ENGINE_TRIGGERS_TABLE_NAME: &str = "processing_engine_triggers";
const PROCESSING_ENGINE_LOGS_TABLE_NAME: &str = "processing_engine_logs";
#[derive(Debug)]
pub(crate) enum SystemSchemaProvider {
AllSystemSchemaTables(AllSystemSchemaTablesProvider),
@@ -90,7 +92,7 @@ impl AllSystemSchemaTablesProvider {
db_schema: Arc<DatabaseSchema>,
query_log: Arc<QueryLog>,
buffer: Arc<dyn WriteBuffer>,
_sys_events_store: Arc<SysEventStore>,
sys_events_store: Arc<SysEventStore>,
) -> Self {
let mut tables = HashMap::<&'static str, Arc<dyn TableProvider>>::new();
let queries = Arc::new(SystemTableProvider::new(Arc::new(QueriesTable::new(
@@ -123,6 +125,10 @@
))),
);
tables.insert(PARQUET_FILES_TABLE_NAME, parquet_files);
let logs_table = Arc::new(SystemTableProvider::new(Arc::new(
ProcessingEngineLogsTable::new(sys_events_store),
)));
tables.insert(PROCESSING_ENGINE_LOGS_TABLE_NAME, logs_table);
Self {
buffer,
db_schema,


@@ -3,6 +3,8 @@ use arrow_schema::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::common::Result;
use datafusion::logical_expr::Expr;
use influxdb3_py_api::logging::ProcessingEngineLog;
use influxdb3_sys_events::{SysEventStore, ToRecordBatch};
use influxdb3_wal::TriggerDefinition;
use iox_system_tables::IoxSystemTable;
use std::sync::Arc;
@@ -72,3 +74,37 @@ impl IoxSystemTable for ProcessingEngineTriggerTable {
Ok(RecordBatch::try_new(Arc::clone(&self.schema), columns)?)
}
}
#[derive(Debug)]
pub(super) struct ProcessingEngineLogsTable {
sys_event_store: Arc<SysEventStore>,
}
impl ProcessingEngineLogsTable {
pub fn new(sys_event_store: Arc<SysEventStore>) -> Self {
Self { sys_event_store }
}
}
#[async_trait]
impl IoxSystemTable for ProcessingEngineLogsTable {
fn schema(&self) -> SchemaRef {
Arc::new(ProcessingEngineLog::schema())
}
async fn scan(
&self,
_filters: Option<Vec<Expr>>,
_limit: Option<usize>,
) -> Result<RecordBatch> {
let Some(result) = self
.sys_event_store
.as_record_batch::<ProcessingEngineLog>()
else {
return Ok(RecordBatch::new_empty(Arc::new(
ProcessingEngineLog::schema(),
)));
};
Ok(result?)
}
}
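Until a trigger logs its first line, `as_record_batch` returns `None` and `scan` falls back to an empty batch over the static schema, which is why the CLI snapshot above shows an empty `processing_engine_logs summary`. Once populated, the table is queryable like any other system table, e.g. `SELECT event_time, trigger_name, log_level, log_text FROM system.processing_engine_logs`.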


@@ -48,6 +48,10 @@ impl SysEventStore {
}
}
pub fn time_provider(&self) -> &Arc<dyn TimeProvider> {
&self.time_provider
}
/// records an event by adding it to this event store
pub fn record<E>(&self, val: E)
where