Processing Engine Exploration

Branch: processing_engine_exploration
Author: Jackson Newhouse, 2024-11-14 14:20:37 -08:00
parent 43755c2d9c
commit 1b826c5fca
27 changed files with 586 additions and 50 deletions

Cargo.lock (generated)

@ -1,6 +1,6 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
version = 4
[[package]]
name = "addr2line"
@ -161,6 +161,7 @@ dependencies = [
"arrow-schema",
"arrow-select",
"arrow-string",
"pyo3",
]
[[package]]
@ -343,6 +344,9 @@ dependencies = [
name = "arrow-schema"
version = "53.0.0"
source = "git+https://github.com/influxdata/arrow-rs.git?rev=e38787d2177f2ebfa481bfac62d208eef8ea82fb#e38787d2177f2ebfa481bfac62d208eef8ea82fb"
dependencies = [
"bitflags 2.6.0",
]
[[package]]
name = "arrow-select"
@ -2674,6 +2678,12 @@ dependencies = [
"serde",
]
[[package]]
name = "indoc"
version = "2.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b248f5224d1d606005e02c97f5aa4e88eeb230488bcc03bc9ca4d7991399f2b5"
[[package]]
name = "influxdb-line-protocol"
version = "1.0.0"
@ -2783,6 +2793,7 @@ dependencies = [
"indexmap 2.6.0",
"influxdb-line-protocol",
"influxdb3_id",
"influxdb3_process_engine",
"influxdb3_wal",
"insta",
"iox_time",
@ -2832,6 +2843,7 @@ dependencies = [
"clap",
"csv",
"dotenvy",
"futures",
"humantime",
"influxdb3_client",
"influxdb3_process",
@ -2861,6 +2873,17 @@ dependencies = [
"uuid",
]
[[package]]
name = "influxdb3_process_engine"
version = "0.1.0"
dependencies = [
"arrow",
"datafusion",
"influxdb3_id",
"pyo3",
"serde",
]
[[package]]
name = "influxdb3_server"
version = "0.1.0"
@ -2892,6 +2915,7 @@ dependencies = [
"influxdb3_catalog",
"influxdb3_id",
"influxdb3_process",
"influxdb3_process_engine",
"influxdb3_sys_events",
"influxdb3_telemetry",
"influxdb3_wal",
@ -4322,6 +4346,12 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2"
[[package]]
name = "portable-atomic"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "280dc24453071f1b63954171985a0b0d30058d287960968b9b2aca264c8d4ee6"
[[package]]
name = "powerfmt"
version = "0.2.0"
@ -4565,6 +4595,69 @@ dependencies = [
"prost 0.12.6",
]
[[package]]
name = "pyo3"
version = "0.22.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f402062616ab18202ae8319da13fa4279883a2b8a9d9f83f20dbade813ce1884"
dependencies = [
"cfg-if",
"indoc",
"libc",
"memoffset",
"once_cell",
"portable-atomic",
"pyo3-build-config",
"pyo3-ffi",
"pyo3-macros",
"unindent",
]
[[package]]
name = "pyo3-build-config"
version = "0.22.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b14b5775b5ff446dd1056212d778012cbe8a0fbffd368029fd9e25b514479c38"
dependencies = [
"once_cell",
"target-lexicon",
]
[[package]]
name = "pyo3-ffi"
version = "0.22.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ab5bcf04a2cdcbb50c7d6105de943f543f9ed92af55818fd17b660390fc8636"
dependencies = [
"libc",
"pyo3-build-config",
]
[[package]]
name = "pyo3-macros"
version = "0.22.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fd24d897903a9e6d80b968368a34e1525aeb719d568dba8b3d4bfa5dc67d453"
dependencies = [
"proc-macro2",
"pyo3-macros-backend",
"quote",
"syn 2.0.87",
]
[[package]]
name = "pyo3-macros-backend"
version = "0.22.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "36c011a03ba1e50152b4b394b479826cad97e7a21eb52df179cd91ac411cbfbe"
dependencies = [
"heck 0.5.0",
"proc-macro2",
"pyo3-build-config",
"quote",
"syn 2.0.87",
]
[[package]]
name = "query_functions"
version = "0.1.0"
@ -5832,6 +5925,12 @@ dependencies = [
"libc",
]
[[package]]
name = "target-lexicon"
version = "0.12.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1"
[[package]]
name = "tempfile"
version = "3.14.0"
@ -6568,6 +6667,12 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e"
[[package]]
name = "unindent"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7de7d73e1754487cb58364ee906a499937a0dfabd86bcb980fa99ec8c8fa2ce"
[[package]]
name = "untrusted"
version = "0.9.0"


@ -7,7 +7,7 @@ members = [
"influxdb3_client",
"influxdb3_id",
"influxdb3_load_generator",
"influxdb3_process",
"influxdb3_process", "influxdb3_process_engine",
"influxdb3_server",
"influxdb3_telemetry",
"influxdb3_test_helpers",
@ -38,7 +38,7 @@ license = "MIT OR Apache-2.0"
[workspace.dependencies]
anyhow = "1.0"
arrow = { version = "53.0.0", features = ["prettyprint", "chrono-tz"] }
arrow = { version = "53.0.0", features = ["prettyprint", "chrono-tz", "pyarrow"] }
arrow-array = "53.0.0"
arrow-buffer = "53.0.0"
arrow-csv = "53.0.0"
@ -181,6 +181,7 @@ inherits = "release"
codegen-units = 16
lto = false
incremental = true
debug = true
# This profile extends the `quick-release` profile with debuginfo turned on in order to
# produce more human friendly symbols for profiling tools


@ -108,20 +108,21 @@ async fn flight() -> Result<(), influxdb3_client::Error> {
assert_batches_sorted_eq!(
[
"+--------------+--------------------+---------------+------------+",
"| catalog_name | db_schema_name | table_name | table_type |",
"+--------------+--------------------+---------------+------------+",
"| public | information_schema | columns | VIEW |",
"| public | information_schema | df_settings | VIEW |",
"| public | information_schema | schemata | VIEW |",
"| public | information_schema | tables | VIEW |",
"| public | information_schema | views | VIEW |",
"| public | iox | cpu | BASE TABLE |",
"| public | system | last_caches | BASE TABLE |",
"| public | system | meta_caches | BASE TABLE |",
"| public | system | parquet_files | BASE TABLE |",
"| public | system | queries | BASE TABLE |",
"+--------------+--------------------+---------------+------------+",
"+--------------+--------------------+----------------------+------------+",
"| catalog_name | db_schema_name | table_name | table_type |",
"+--------------+--------------------+----------------------+------------+",
"| public | information_schema | columns | VIEW |",
"| public | information_schema | df_settings | VIEW |",
"| public | information_schema | schemata | VIEW |",
"| public | information_schema | tables | VIEW |",
"| public | information_schema | views | VIEW |",
"| public | iox | cpu | BASE TABLE |",
"| public | system | last_caches | BASE TABLE |",
"| public | system | meta_caches | BASE TABLE |",
"| public | system | parquet_files | BASE TABLE |",
"| public | system | process_engine_calls | BASE TABLE |",
"| public | system | queries | BASE TABLE |",
"+--------------+--------------------+----------------------+------------+",
],
&batches
);


@ -15,6 +15,7 @@ iox_time.workspace = true
# Local deps
influxdb3_id = { path = "../influxdb3_id" }
influxdb3_wal = { path = "../influxdb3_wal" }
influxdb3_process_engine = { path = "../influxdb3_process_engine"}
# crates.io dependencies
arrow.workspace = true


@ -1,10 +1,14 @@
//! Implementation of the Catalog that sits entirely in memory.
use crate::catalog::Error::TableNotFound;
use crate::catalog::Error::{
ProcessingEngineCallNotFound, ProcessingEngineUnimplemented, TableNotFound,
};
use arrow::array::RecordBatch;
use bimap::BiHashMap;
use hashbrown::HashMap;
use hashbrown::{HashMap, HashSet};
use indexmap::IndexMap;
use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
use influxdb3_process_engine::python_call::{ProcessEngineTrigger, PythonCall, TriggerType};
use influxdb3_wal::{
CatalogBatch, CatalogOp, DeleteTableDefinition, FieldAdditions, LastCacheDefinition,
LastCacheDelete, MetaCacheDefinition, MetaCacheDelete,
@ -73,6 +77,28 @@ pub enum Error {
table_name: String,
existing: String,
},
#[error(
"Cannot overwrite Process Engine Call {} in Database {}",
call_name,
database_name
)]
ProcessEngineCallExists {
database_name: String,
call_name: String,
},
#[error(
"Processing Engine Call {} not in DB schema for {}",
call_name,
database_name
)]
ProcessingEngineCallNotFound {
call_name: String,
database_name: String,
},
#[error("Processing Engine Unimplemented: {}", feature_description)]
ProcessingEngineUnimplemented { feature_description: String },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -321,6 +347,136 @@ impl Catalog {
}
}
pub fn insert_process_engine_call(
&self,
db: &str,
call_name: String,
code: String,
function_name: String,
) -> Result<()> {
let mut inner = self.inner.write();
let Some(db_id) = inner.db_map.get_by_right(db) else {
return Err(TableNotFound {
db_name: format!("{:?}", inner.db_map).into(),
table_name: "".into(),
});
};
let db = inner.databases.get(db_id).expect("db should exist");
if db.processing_engine_calls.contains_key(&call_name) {
return Err(Error::ProcessEngineCallExists {
database_name: db.name.to_string(),
call_name,
});
}
let mut new_db = db.as_ref().clone();
new_db.processing_engine_calls.insert(
call_name.clone(),
PythonCall::new(call_name, code, function_name),
);
inner.upsert_db(new_db);
Ok(())
}
pub fn insert_process_engine_trigger(
&self,
db_name: &str,
call_name: String,
source_table: String,
derived_table: String,
keys: Option<Vec<String>>,
) -> Result<()> {
let mut inner = self.inner.write();
// TODO: proper error
let db_id = inner.db_map.get_by_right(db_name).unwrap();
let db = inner.databases.get(db_id).expect("db should exist");
if db.table_id_and_definition(derived_table.as_str()).is_some() {
return Err(Error::ProcessEngineCallExists {
database_name: derived_table,
call_name,
});
}
let Some((source_table_id, table_definition)) =
db.table_id_and_definition(source_table.as_str())
else {
return Err(TableNotFound {
db_name: db_name.into(),
table_name: source_table.into(),
});
};
let Some(call) = db.processing_engine_calls.get(&call_name) else {
panic!("create error.")
};
let output_keys = match &keys {
Some(inner_keys) => Some(inner_keys.iter().map(|key| key.as_str()).collect()),
None => table_definition.schema.series_key(),
};
let key_set = output_keys
.clone()
.unwrap_or_default()
.iter()
.map(|x| *x)
.collect::<HashSet<_>>();
let output_schema = call
.call(&RecordBatch::new_empty(table_definition.schema.as_arrow()))
.unwrap()
.schema();
let table_id = TableId::new();
let mut key_column_ids = vec![];
// This is a new table, so build up its columns:
let mut columns = Vec::new();
for field in output_schema.fields() {
let col_id = ColumnId::new();
let column_type =
if let Some(influx_column_type) = field.metadata().get("iox::column::type") {
InfluxColumnType::try_from(influx_column_type.as_str()).unwrap()
} else if key_set.contains(field.name().as_str()) {
key_column_ids.push(col_id);
InfluxColumnType::Tag
} else if field.name() == "time" {
InfluxColumnType::Timestamp
} else {
InfluxColumnType::Field(field.data_type().clone().try_into().unwrap())
};
columns.push((col_id, field.name().as_str().into(), column_type));
}
let key_column_ids = if output_keys.is_none() {
None
} else {
Some(key_column_ids)
};
let new_definition = TableDefinition::new(
table_id,
derived_table.as_str().into(),
columns,
key_column_ids,
)?;
let mut new_db = db.as_ref().clone();
new_db.insert_table(table_id, Arc::new(new_definition));
let trigger = ProcessEngineTrigger {
source_table: source_table_id,
trigger_table: table_id,
trigger_name: call_name,
trigger_type: TriggerType::OnRead,
};
new_db
.processing_engine_write_triggers
.entry(source_table_id)
.or_default()
.insert(table_id, trigger.clone());
new_db
.processing_engine_source_table
.insert(table_id, trigger);
inner.upsert_db(new_db);
Ok(())
}
pub fn inner(&self) -> &RwLock<InnerCatalog> {
&self.inner
}
@ -480,6 +636,10 @@ pub struct DatabaseSchema {
/// The database is a map of tables
pub tables: SerdeVecMap<TableId, Arc<TableDefinition>>,
pub table_map: BiHashMap<TableId, Arc<str>>,
pub processing_engine_calls: HashMap<String, PythonCall>,
// Map from source table to output table for processing engine triggers.
pub processing_engine_write_triggers: HashMap<TableId, HashMap<TableId, ProcessEngineTrigger>>,
pub processing_engine_source_table: HashMap<TableId, ProcessEngineTrigger>,
pub deleted: bool,
}
@ -490,6 +650,9 @@ impl DatabaseSchema {
name,
tables: Default::default(),
table_map: BiHashMap::new(),
processing_engine_calls: HashMap::new(),
processing_engine_write_triggers: HashMap::new(),
processing_engine_source_table: HashMap::new(),
deleted: false,
}
}
@ -531,7 +694,7 @@ impl DatabaseSchema {
.get(&field_additions.table_id)
.or_else(|| db_schema.tables.get(&field_additions.table_id))
else {
return Err(Error::TableNotFound {
return Err(TableNotFound {
db_name: Arc::clone(&field_additions.database_name),
table_name: Arc::clone(&field_additions.table_name),
});
@ -539,6 +702,37 @@ impl DatabaseSchema {
if let Some(new_table) =
new_or_existing_table.new_if_field_additions_add_fields(field_additions)?
{
// this table's schema was updated. Downstream processing engines may need to be modified.
if let Some(downstream_triggers) = db_schema
.processing_engine_write_triggers
.get(&new_table.table_id)
{
for (downstream_table_id, trigger) in downstream_triggers {
let downstream_table =
db_schema.tables.get(downstream_table_id).unwrap();
let Some(trigger) =
db_schema.processing_engine_calls.get(&trigger.trigger_name)
else {
return Err(ProcessingEngineCallNotFound {
call_name: trigger.trigger_name.to_string(),
database_name: db_schema.name.to_string(),
});
};
// TODO: Handle python errors
let new_schema = trigger
.call(&RecordBatch::new_empty(new_table.schema.as_arrow()))
.unwrap()
.schema();
// TODO: Determine what semantics we want when new fields are added to source table.
if new_schema != downstream_table.schema.as_arrow() {
return Err(ProcessingEngineUnimplemented {
feature_description: "output schema update not supported"
.to_string(),
});
}
}
}
updated_or_new_tables.insert(new_table.table_id, Arc::new(new_table));
}
}
@ -641,6 +835,9 @@ impl DatabaseSchema {
name: schema_name,
tables: updated_or_new_tables,
table_map: new_table_maps,
processing_engine_calls: HashMap::new(),
processing_engine_write_triggers: HashMap::new(),
processing_engine_source_table: HashMap::new(),
deleted: schema_deleted,
}))
}
@ -1201,6 +1398,9 @@ mod tests {
map.insert(TableId::from(2), "test_table_2".into());
map
},
processing_engine_calls: Default::default(),
processing_engine_write_triggers: HashMap::new(),
processing_engine_source_table: HashMap::new(),
deleted: false,
};
use InfluxColumnType::*;
@ -1409,6 +1609,9 @@ mod tests {
name: "test".into(),
tables: SerdeVecMap::new(),
table_map: BiHashMap::new(),
processing_engine_calls: Default::default(),
processing_engine_write_triggers: HashMap::new(),
processing_engine_source_table: HashMap::new(),
deleted: false,
};
database.tables.insert(
@ -1466,6 +1669,9 @@ mod tests {
map.insert(TableId::from(1), "test_table_1".into());
map
},
processing_engine_calls: Default::default(),
processing_engine_write_triggers: HashMap::new(),
processing_engine_source_table: HashMap::new(),
deleted: false,
};
use InfluxColumnType::*;
@ -1525,6 +1731,9 @@ mod tests {
map.insert(TableId::from(0), "test".into());
map
},
processing_engine_calls: Default::default(),
processing_engine_write_triggers: HashMap::new(),
processing_engine_source_table: HashMap::new(),
deleted: false,
};
use InfluxColumnType::*;
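
The two Catalog methods added above are the registration surface for the processing engine: insert_process_engine_call stores a named Python function, and insert_process_engine_trigger wires it from a source table to a derived table. A minimal usage sketch, assuming a catalog that already holds a "my_db" database with a "cpu" table (the names and the trivial Python body are illustrative, not from the commit):

use influxdb3_catalog::catalog::{Catalog, Result};

/// Register a Python call on "my_db" and attach it as an on-read trigger that
/// derives a "cpu_doubled" table from "cpu".
fn register_cpu_trigger(catalog: &Catalog) -> Result<()> {
    // Store the Python source and its entry-point function under a call name.
    catalog.insert_process_engine_call(
        "my_db",
        "double_usage".to_string(),
        "def double_usage(batch):\n    return batch".to_string(),
        "double_usage".to_string(),
    )?;

    // Create the derived table; passing `None` for keys falls back to the
    // source table's series key when building the output schema.
    catalog.insert_process_engine_trigger(
        "my_db",
        "double_usage".to_string(),
        "cpu".to_string(),
        "cpu_doubled".to_string(),
        None,
    )?;

    Ok(())
}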


@ -3,10 +3,12 @@ use crate::catalog::DatabaseSchema;
use crate::catalog::TableDefinition;
use arrow::datatypes::DataType as ArrowDataType;
use bimap::BiHashMap;
use hashbrown::HashMap;
use influxdb3_id::ColumnId;
use influxdb3_id::DbId;
use influxdb3_id::SerdeVecMap;
use influxdb3_id::TableId;
use influxdb3_process_engine::python_call::{ProcessEngineTrigger, PythonCall};
use influxdb3_wal::{LastCacheDefinition, LastCacheValueColumnsDef};
use schema::InfluxColumnType;
use schema::InfluxFieldType;
@ -38,6 +40,8 @@ struct DatabaseSnapshot {
id: DbId,
name: Arc<str>,
tables: SerdeVecMap<TableId, TableSnapshot>,
#[serde(default)]
processing_engine_calls: SerdeVecMap<String, ProcessingEngineCallSnapshot>,
deleted: bool,
}
@ -51,6 +55,11 @@ impl From<&DatabaseSchema> for DatabaseSnapshot {
.iter()
.map(|(table_id, table_def)| (*table_id, table_def.as_ref().into()))
.collect(),
processing_engine_calls: db
.processing_engine_calls
.iter()
.map(|(name, call)| (name.clone(), call.into()))
.collect(),
deleted: db.deleted,
}
}
@ -67,11 +76,19 @@ impl From<DatabaseSnapshot> for DatabaseSchema {
(id, Arc::new(table.into()))
})
.collect();
let python_calls = snap
.processing_engine_calls
.into_iter()
.map(|(name, call)| (name, call.into()))
.collect();
Self {
id: snap.id,
name: snap.name,
tables,
table_map,
processing_engine_calls: python_calls,
processing_engine_write_triggers: HashMap::new(),
processing_engine_source_table: HashMap::new(),
deleted: snap.deleted,
}
}
@ -114,6 +131,13 @@ struct TableSnapshot {
deleted: bool,
}
#[derive(Debug, Serialize, Deserialize)]
struct ProcessingEngineCallSnapshot {
pub call_name: String,
pub code: String,
pub function_name: String,
}
/// Representation of Arrow's `DataType` for table snapshots.
///
/// Uses `#[non_exhaustive]` with the assumption that variants will be added as we support
@ -346,6 +370,26 @@ impl From<TableSnapshot> for TableDefinition {
}
}
impl From<&PythonCall> for ProcessingEngineCallSnapshot {
fn from(call: &PythonCall) -> Self {
Self {
call_name: call.call_name.to_string(),
code: call.code.to_string(),
function_name: call.function_name.to_string(),
}
}
}
impl From<ProcessingEngineCallSnapshot> for PythonCall {
fn from(call: ProcessingEngineCallSnapshot) -> Self {
Self {
call_name: call.call_name.to_string(),
code: call.code.to_string(),
function_name: call.function_name.to_string(),
}
}
}
// NOTE: Ideally, we will remove the need for the InfluxFieldType, and be able
// to use Arrow's DataType directly. If that happens, this conversion will need
// to support the entirety of Arrow's DataType enum, which is why [`DataType`]
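
ProcessingEngineCallSnapshot mirrors PythonCall field for field, so the two From impls form a lossless round trip. A sketch of a unit test that could sit next to these impls (hypothetical, not in the commit):

#[cfg(test)]
mod processing_engine_call_snapshot_tests {
    use super::*;

    #[test]
    fn python_call_snapshot_round_trip() {
        // Convert a call into its snapshot form and back; nothing should change.
        let call = PythonCall::new(
            "double".to_string(),
            "def double(batch):\n    return batch".to_string(),
            "double".to_string(),
        );
        let snapshot: ProcessingEngineCallSnapshot = (&call).into();
        let restored: PythonCall = snapshot.into();
        assert_eq!(call, restored);
    }
}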


@ -2,6 +2,7 @@
source: influxdb3_catalog/src/catalog.rs
description: catalog serialization to help catch breaking changes
expression: catalog
snapshot_kind: text
---
{
"databases": [
@ -252,6 +253,7 @@ expression: catalog
}
]
],
"processing_engine_calls": [],
"deleted": false
}
]


@ -2,6 +2,7 @@
source: influxdb3_catalog/src/catalog.rs
description: catalog serialization to help catch breaking changes
expression: catalog
snapshot_kind: text
---
{
"databases": [
@ -108,6 +109,7 @@ expression: catalog
}
]
],
"processing_engine_calls": [],
"deleted": false
}
]


@ -2,6 +2,7 @@
source: influxdb3_catalog/src/catalog.rs
description: catalog serialization to help catch breaking changes
expression: catalog
snapshot_kind: text
---
{
"databases": [
@ -97,6 +98,7 @@ expression: catalog
}
]
],
"processing_engine_calls": [],
"deleted": false
}
]


@ -31,6 +31,7 @@ sysinfo.workspace = true
tokio.workspace = true
thiserror.workspace = true
url.workspace = true
futures = "0.3"
[lints]
workspace = true


@ -4,6 +4,9 @@ use crate::specification::DataSpec;
use anyhow::Context;
use chrono::{DateTime, Local};
use clap::Parser;
use futures::future::MaybeDone::Future;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use influxdb3_client::{Client, Precision};
use std::ops::Add;
use std::path::PathBuf;
@ -185,7 +188,7 @@ pub(crate) async fn run_write_load(
};
// spawn tokio tasks for each writer
let mut tasks = Vec::new();
let mut tasks = FuturesUnordered::new();
for generator in generators {
let reporter = Arc::clone(&reporter);
let sampling_interval = sampling_interval.into();
@ -202,8 +205,8 @@ pub(crate) async fn run_write_load(
}
// wait for all tasks to complete
for task in tasks {
task.await?;
while let Some(result) = tasks.next().await {
result?;
}
println!("all writers finished");
@ -334,6 +337,7 @@ async fn write_sample(
.body(buffer)
.send()
.await;
eprintln!("wrote in {:?}", start_request.elapsed());
let response_time = start_request.elapsed().as_millis() as u64;
// log the report
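
The switch above from a Vec of join handles to FuturesUnordered surfaces writer results in completion order instead of spawn order, so the first failure is reported as soon as it happens. A standalone sketch of the same pattern, assuming tokio with the rt and macros features plus the futures crate (not from the commit):

use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() -> Result<(), tokio::task::JoinError> {
    // Collect join handles into the stream as they are spawned.
    let mut tasks: FuturesUnordered<_> = (0..4)
        .map(|i| tokio::spawn(async move { i * 10 }))
        .collect();

    // Results arrive as tasks finish, not in the order they were spawned.
    while let Some(result) = tasks.next().await {
        println!("writer finished with {}", result?);
    }
    Ok(())
}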


@ -33,7 +33,7 @@ pub(crate) fn spec() -> BuiltInSpec {
},
],
copies: None,
lines_per_sample: Some(10_000),
lines_per_sample: Some(20_000),
}],
};


@ -0,0 +1,22 @@
[package]
name = "influxdb3_process_engine"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
datafusion.workspace = true
arrow.workspace = true
# Local Deps
influxdb3_id = { path = "../influxdb3_id" }
serde = { version = "1.0.214", features = ["derive"] }
[dependencies.pyo3]
version = "0.22.6"
# this is necessary to automatically initialize the Python interpreter
features = ["auto-initialize", "gil-refs"]
[lints]
workspace = true


@ -0,0 +1 @@
pub mod python_call;


@ -0,0 +1,49 @@
use datafusion::arrow::array::RecordBatch;
use datafusion::arrow::pyarrow::PyArrowType;
use influxdb3_id::TableId;
use pyo3::prelude::*;
use pyo3::{PyResult, Python};
use serde::Deserialize;
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct PythonCall {
pub call_name: String,
pub code: String,
pub function_name: String,
}
impl PythonCall {
pub fn new(call_name: String, code: String, function_name: String) -> Self {
Self {
call_name,
code,
function_name,
}
}
}
#[derive(Debug, Eq, PartialEq, Clone, Hash)]
pub struct ProcessEngineTrigger {
pub source_table: TableId,
pub trigger_table: TableId,
pub trigger_name: String,
pub trigger_type: TriggerType,
}
#[derive(Debug, Eq, PartialEq, Clone, Copy, Hash, Deserialize)]
pub enum TriggerType {
OnRead,
}
impl PythonCall {
pub fn call(&self, input_batch: &RecordBatch) -> PyResult<RecordBatch> {
Python::with_gil(|py| {
py.run_bound(self.code.as_str(), None, None)?;
let py_batch: PyArrowType<_> = PyArrowType(input_batch.clone());
let py_func = py.eval_bound(self.function_name.as_str(), None, None)?;
let result = py_func.call1((py_batch,))?;
let updated_batch: PyArrowType<RecordBatch> = result.extract()?;
Ok(updated_batch.0)
})
}
}
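
For a sense of how PythonCall::call behaves end to end: the stored Python source runs under the GIL, the function_name entry point is looked up, and the RecordBatch crosses the boundary in both directions via pyarrow. A minimal sketch, assuming a Python environment with pyarrow installed; the column name and Python body are illustrative, not from the commit:

use std::sync::Arc;

use arrow::array::{Int64Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use influxdb3_process_engine::python_call::PythonCall;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A one-column batch to push through the call.
    let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )?;

    // The Python source is plain data on the call; `call()` runs it and invokes
    // the named function with the pyarrow view of the batch.
    let code = r#"
import pyarrow as pa
import pyarrow.compute as pc

def double_values(batch):
    doubled = pc.multiply(batch.column(0), 2)
    return pa.RecordBatch.from_arrays([doubled], names=["value"])
"#;
    let call = PythonCall::new(
        "double_values".to_string(),
        code.to_string(),
        "double_values".to_string(),
    );

    let output = call.call(&batch)?;
    assert_eq!(output.num_rows(), 3);
    Ok(())
}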


@ -39,6 +39,7 @@ influxdb3_write = { path = "../influxdb3_write" }
iox_query_influxql_rewrite = { path = "../iox_query_influxql_rewrite" }
influxdb3_sys_events = { path = "../influxdb3_sys_events" }
influxdb3_telemetry = { path = "../influxdb3_telemetry" }
influxdb3_process_engine = { path = "../influxdb3_process_engine" }
# crates.io Dependencies
anyhow.workspace = true


@ -1,5 +1,6 @@
use std::{any::Any, collections::HashMap, sync::Arc};
use self::{last_caches::LastCachesTable, queries::QueriesTable};
use datafusion::{catalog::SchemaProvider, datasource::TableProvider, error::DataFusionError};
use influxdb3_catalog::catalog::DatabaseSchema;
use influxdb3_sys_events::SysEventStore;
@ -10,13 +11,14 @@ use meta_caches::MetaCachesTable;
use parquet_files::ParquetFilesTable;
use tonic::async_trait;
use self::{last_caches::LastCachesTable, queries::QueriesTable};
mod last_caches;
mod meta_caches;
mod parquet_files;
use crate::system_tables::python_call::PythonCallTable;
#[cfg(test)]
pub(crate) use parquet_files::table_name_predicate_error;
mod python_call;
mod queries;
pub const SYSTEM_SCHEMA_NAME: &str = "system";
@ -26,6 +28,8 @@ const META_CACHES_TABLE_NAME: &str = "meta_caches";
const PARQUET_FILES_TABLE_NAME: &str = "parquet_files";
const QUERIES_TABLE_NAME: &str = "queries";
const PROCESS_ENGINE_CALLS_TABLE_NAME: &str = "process_engine_calls";
pub(crate) struct SystemSchemaProvider {
tables: HashMap<&'static str, Arc<dyn TableProvider>>,
}
@ -67,6 +71,16 @@ impl SystemSchemaProvider {
db_schema.id,
buffer,
))));
tables.insert(
PROCESS_ENGINE_CALLS_TABLE_NAME,
Arc::new(SystemTableProvider::new(Arc::new(PythonCallTable::new(
db_schema
.processing_engine_calls
.iter()
.map(|(_name, call)| call.clone())
.collect(),
)))),
);
tables.insert(PARQUET_FILES_TABLE_NAME, parquet_files);
Self { tables }
}


@ -0,0 +1,67 @@
use arrow_array::{ArrayRef, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::common::Result;
use datafusion::error::DataFusionError;
use datafusion::logical_expr::Expr;
use influxdb3_process_engine::python_call::PythonCall;
use iox_system_tables::IoxSystemTable;
use std::sync::Arc;
pub(super) struct PythonCallTable {
schema: SchemaRef,
python_calls: Vec<PythonCall>,
}
fn python_call_schema() -> SchemaRef {
let columns = vec![
Field::new("call_name", DataType::Utf8, false),
Field::new("function_name", DataType::Utf8, false),
Field::new("code", DataType::Utf8, false),
];
Schema::new(columns).into()
}
impl PythonCallTable {
pub fn new(python_calls: Vec<PythonCall>) -> Self {
Self {
schema: python_call_schema(),
python_calls,
}
}
}
#[async_trait]
impl IoxSystemTable for PythonCallTable {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
async fn scan(
&self,
_filters: Option<Vec<Expr>>,
_limit: Option<usize>,
) -> Result<RecordBatch, DataFusionError> {
let schema = self.schema();
let columns: Vec<ArrayRef> = vec![
Arc::new(StringArray::from(
self.python_calls
.iter()
.map(|call| Some(call.call_name.clone()))
.collect::<StringArray>(),
)),
Arc::new(
self.python_calls
.iter()
.map(|p| Some(p.function_name.clone()))
.collect::<StringArray>(),
),
Arc::new(
self.python_calls
.iter()
.map(|p| Some(p.code.clone()))
.collect::<StringArray>(),
),
];
Ok(RecordBatch::try_new(schema, columns)?)
}
}
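
A quick check of the new system table's shape: each registered call becomes one row carrying its call_name, function_name, and code. A hypothetical test sketch, assuming tokio's test macro is available (not part of the commit):

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn scan_returns_one_row_per_call() {
        let table = PythonCallTable::new(vec![PythonCall::new(
            "double".to_string(),
            "def double(batch):\n    return batch".to_string(),
            "double".to_string(),
        )]);

        // One registered call should surface as exactly one row across the
        // three string columns.
        let batch = table.scan(None, None).await.unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 3);
    }
}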


@ -806,7 +806,7 @@ pub fn background_wal_flush<W: Wal>(
let snapshot_wal = Arc::clone(&wal);
tokio::spawn(async move {
let snapshot_details = snapshot_complete.await.expect("snapshot failed");
assert!(snapshot_info.snapshot_details == snapshot_details);
assert_eq!(snapshot_info.snapshot_details, snapshot_details);
snapshot_wal
.cleanup_snapshot(snapshot_info, snapshot_permit)


@ -10,7 +10,7 @@ use futures_util::stream::StreamExt;
use hashbrown::HashMap;
use object_store::path::{Path, PathPart};
use object_store::{ObjectStore, PutPayload};
use observability_deps::tracing::{debug, error, info};
use observability_deps::tracing::{debug, error, info, warn};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
@ -179,11 +179,14 @@ impl WalObjectStore {
/// the operation is durable in the configured object store.
async fn write_ops(&self, ops: Vec<WalOp>) -> crate::Result<(), crate::Error> {
let (tx, rx) = oneshot::channel();
let pre_lock_time = tokio::time::Instant::now();
info!("asking for lock.");
self.flush_buffer
.lock()
.await
.wal_buffer
.buffer_ops_with_response(ops, tx)?;
warn!("took {:?} to get lock", pre_lock_time.elapsed());
match rx.await {
Ok(WriteResult::Success(())) => Ok(()),
@ -429,7 +432,7 @@ impl FlushBuffer {
Vec<oneshot::Sender<WriteResult>>,
Option<(SnapshotInfo, OwnedSemaphorePermit)>,
) {
// convert into wal contents and resopnses and capture if a snapshot should be taken
// convert into wal contents and responses and capture if a snapshot should be taken
let (mut wal_contents, responses) = self.flush_buffer_with_responses();
self.snapshot_tracker.add_wal_period(WalPeriod {
wal_file_number: wal_contents.wal_file_number,


@ -3330,6 +3330,9 @@ mod tests {
map.insert(TableId::from(1), "test_table_2".into());
map
},
processing_engine_calls: Default::default(),
processing_engine_write_triggers: Default::default(),
processing_engine_source_table: Default::default(),
deleted: false,
};
let table_id = TableId::from(0);


@ -168,6 +168,10 @@ pub trait LastCacheManager: Debug + Send + Sync + 'static {
) -> Result<(), write_buffer::Error>;
}
/// [`ProcessEngineManager`] is used to manage external hooks that users have implemented.
#[async_trait::async_trait]
pub trait ProcessEngineManager: Debug + Send + Sync + 'static {}
/// A single write request can have many lines in it. A writer can request to accept all lines that are valid, while
/// returning an error for any invalid lines. This is the error information for a single invalid line.
#[derive(Debug, Serialize)]


@ -99,6 +99,10 @@ impl QueryableBuffer {
_projection: Option<&Vec<usize>>,
_ctx: &dyn Session,
) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
info!(
"getting table chunks for table {:?} with filters {:?}",
table_name, filters
);
let (table_id, table_def) = db_schema
.table_id_and_definition(table_name)
.ok_or_else(|| DataFusionError::Execution(format!("table {} not found", table_name)))?;
@ -186,8 +190,10 @@ impl QueryableBuffer {
let table_def = db_schema
.table_definition_by_id(table_id)
.expect("table exists");
info!("table buffer before snapshot: {:?}", table_buffer);
let snapshot_chunks =
table_buffer.snapshot(table_def, snapshot_details.end_time_marker);
info!("table buffer after snapshot: {:?}", table_buffer);
for chunk in snapshot_chunks {
let table_name =


@ -1,6 +1,7 @@
---
source: influxdb3_write/src/write_buffer/mod.rs
expression: catalog_json
snapshot_kind: text
---
{
"databases": [
@ -10,6 +11,7 @@ expression: catalog_json
"deleted": false,
"id": 0,
"name": "db",
"processing_engine_calls": [],
"tables": [
[
0,


@ -1,6 +1,7 @@
---
source: influxdb3_write/src/write_buffer/mod.rs
expression: catalog_json
snapshot_kind: text
---
{
"databases": [
@ -10,6 +11,7 @@ expression: catalog_json
"deleted": false,
"id": 0,
"name": "db",
"processing_engine_calls": [],
"tables": [
[
0,


@ -1,6 +1,7 @@
---
source: influxdb3_write/src/write_buffer/mod.rs
expression: catalog_json
snapshot_kind: text
---
{
"databases": [
@ -10,6 +11,7 @@ expression: catalog_json
"deleted": false,
"id": 0,
"name": "db",
"processing_engine_calls": [],
"tables": [
[
0,


@ -13,7 +13,7 @@ use hashbrown::HashMap;
use influxdb3_catalog::catalog::TableDefinition;
use influxdb3_id::ColumnId;
use influxdb3_wal::{FieldData, Row};
use observability_deps::tracing::{debug, error, info};
use observability_deps::tracing::{debug, error};
use schema::sort::SortKey;
use schema::{InfluxColumnType, InfluxFieldType, Schema, SchemaBuilder};
use std::collections::btree_map::Entry;
@ -166,8 +166,8 @@ impl TableBuffer {
let mut size = size_of::<Self>();
for c in self.chunk_time_to_chunks.values() {
for biulder in c.data.values() {
size += size_of::<ColumnId>() + size_of::<String>() + biulder.size();
for builder in c.data.values() {
size += size_of::<ColumnId>() + size_of::<String>() + builder.size();
}
size += c.index.size();
@ -181,7 +181,6 @@ impl TableBuffer {
table_def: Arc<TableDefinition>,
older_than_chunk_time: i64,
) -> Vec<SnapshotChunk> {
info!(%older_than_chunk_time, "Snapshotting table buffer");
let keys_to_remove = self
.chunk_time_to_chunks
.keys()
@ -269,10 +268,7 @@ impl MutableTableChunk {
debug!("Creating new timestamp builder");
let mut time_builder = TimestampNanosecondBuilder::new();
// append nulls for all previous rows
for _ in 0..(row_index + self.row_count) {
debug!("Appending null for timestamp");
time_builder.append_null();
}
time_builder.append_nulls(row_index + self.row_count);
Builder::Time(time_builder)
});
if let Builder::Time(b) = b {
@ -333,9 +329,7 @@ impl MutableTableChunk {
let b = self.data.entry(f.id).or_insert_with(|| {
let mut int_builder = Int64Builder::new();
// append nulls for all previous rows
for _ in 0..(row_index + self.row_count) {
int_builder.append_null();
}
int_builder.append_nulls(row_index + self.row_count);
Builder::I64(int_builder)
});
if let Builder::I64(b) = b {
@ -348,9 +342,7 @@ impl MutableTableChunk {
let b = self.data.entry(f.id).or_insert_with(|| {
let mut uint_builder = UInt64Builder::new();
// append nulls for all previous rows
for _ in 0..(row_index + self.row_count) {
uint_builder.append_null();
}
uint_builder.append_nulls(row_index + self.row_count);
Builder::U64(uint_builder)
});
if let Builder::U64(b) = b {
@ -363,9 +355,7 @@ impl MutableTableChunk {
let b = self.data.entry(f.id).or_insert_with(|| {
let mut float_builder = Float64Builder::new();
// append nulls for all previous rows
for _ in 0..(row_index + self.row_count) {
float_builder.append_null();
}
float_builder.append_nulls(row_index + self.row_count);
Builder::F64(float_builder)
});
if let Builder::F64(b) = b {
@ -378,9 +368,7 @@ impl MutableTableChunk {
let b = self.data.entry(f.id).or_insert_with(|| {
let mut bool_builder = BooleanBuilder::new();
// append nulls for all previous rows
for _ in 0..(row_index + self.row_count) {
bool_builder.append_null();
}
bool_builder.append_nulls(row_index + self.row_count);
Builder::Bool(bool_builder)
});
if let Builder::Bool(b) = b {
@ -1028,7 +1016,7 @@ mod tests {
table_buffer.buffer_chunk(0, rows);
let size = table_buffer.computed_size();
assert_eq!(size, 18119);
assert_eq!(size, 18120);
}
#[test]