diff --git a/server/src/db/system_tables.rs b/server/src/db/system_tables.rs
index c950eb9c1e..b2d9726537 100644
--- a/server/src/db/system_tables.rs
+++ b/server/src/db/system_tables.rs
@@ -26,7 +26,7 @@ use data_types::{
 };
 use datafusion::{
     catalog::schema::SchemaProvider,
-    datasource::{datasource::Statistics, MemTable, TableProvider},
+    datasource::{datasource::Statistics, TableProvider},
     error::{DataFusionError, Result as DataFusionResult},
     physical_plan::{memory::MemoryExec, ExecutionPlan},
 };
@@ -43,24 +43,41 @@ const COLUMNS: &str = "columns";
 const CHUNK_COLUMNS: &str = "chunk_columns";
 const OPERATIONS: &str = "operations";
 
-#[derive(Debug)]
 pub struct SystemSchemaProvider {
-    db_name: String,
-    catalog: Arc<Catalog>,
-    jobs: Arc<JobRegistry>,
+    chunks: Arc<dyn TableProvider>,
+    columns: Arc<dyn TableProvider>,
+    chunk_columns: Arc<dyn TableProvider>,
+    operations: Arc<dyn TableProvider>,
+}
 
-    chunks: Arc<ChunksProvider>,
+impl std::fmt::Debug for SystemSchemaProvider {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("SystemSchemaProvider")
+            .field("fields", &"...")
+            .finish()
+    }
 }
 
 impl SystemSchemaProvider {
     pub fn new(db_name: impl Into<String>, catalog: Arc<Catalog>, jobs: Arc<JobRegistry>) -> Self {
         let db_name = db_name.into();
-        let chunks = Arc::new(ChunksProvider::new(Arc::clone(&catalog)));
+        let chunks = Arc::new(SystemTableProvider {
+            inner: ChunksTable::new(Arc::clone(&catalog)),
+        });
+        let columns = Arc::new(SystemTableProvider {
+            inner: ColumnsTable::new(Arc::clone(&catalog)),
+        });
+        let chunk_columns = Arc::new(SystemTableProvider {
+            inner: ChunkColumnsTable::new(catalog),
+        });
+        let operations = Arc::new(SystemTableProvider {
+            inner: OperationsTable::new(db_name, jobs),
+        });
         Self {
-            db_name,
-            catalog,
-            jobs,
             chunks,
+            columns,
+            chunk_columns,
+            operations,
         }
     }
 }
@@ -80,32 +97,58 @@ impl SchemaProvider for SystemSchemaProvider {
     }
 
     fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
-        // TODO: Use of a MemTable potentially results in materializing redundant data
-        if name == CHUNKS {
-            return Some(Arc::clone(&self.chunks) as Arc<dyn TableProvider>);
+        match name {
+            CHUNKS => Some(Arc::clone(&self.chunks)),
+            COLUMNS => Some(Arc::clone(&self.columns)),
+            CHUNK_COLUMNS => Some(Arc::clone(&self.chunk_columns)),
+            OPERATIONS => Some(Arc::clone(&self.operations)),
+            _ => None,
         }
+    }
+}
 
-        let batch = match name {
-            CHUNK_COLUMNS => assemble_chunk_columns(
-                self.catalog.unaggregated_partition_summaries(),
-                self.catalog.detailed_chunk_summaries(),
-            )
-            .log_if_error("chunk_columns table")
-            .ok()?,
-            COLUMNS => from_partition_summaries(self.catalog.partition_summaries())
-                .log_if_error("chunks table")
-                .ok()?,
-            OPERATIONS => from_task_trackers(&self.db_name, self.jobs.tracked())
-                .log_if_error("operations table")
-                .ok()?,
-            _ => return None,
-        };
+/// The minimal thing that a system table needs to implement
+trait IoxSystemTable: Send + Sync {
+    /// Produce the schema from this system table
+    fn schema(&self) -> SchemaRef;
 
-        let table = MemTable::try_new(batch.schema(), vec![vec![batch]])
-            .log_if_error("constructing chunks system table")
-            .ok()?;
+    /// Get the contents of the system table as a single RecordBatch
+    fn batch(&self) -> Result<RecordBatch>;
+}
 
-        Some(Arc::<MemTable>::new(table))
+/// Adapter that makes any `IoxSystemTable` a DataFusion `TableProvider`
+struct SystemTableProvider<T>
+where
+    T: IoxSystemTable,
+{
+    inner: T,
+}
+
+impl<T> TableProvider for SystemTableProvider<T>
+where
+    T: IoxSystemTable + 'static,
+{
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.inner.schema()
+    }
+
+    fn scan(
+        &self,
+        projection: &Option<Vec<usize>>,
+        _batch_size: usize,
+        // It would be cool to push projection and limit down
+        _filters: &[datafusion::logical_plan::Expr],
+        _limit: Option<usize>,
+    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
+        scan_batch(self.inner.batch()?, self.schema(), projection.as_ref())
+    }
+
+    fn statistics(&self) -> Statistics {
+        Statistics::default()
     }
 }
@@ -116,22 +159,28 @@ fn time_to_ts(time: Option<DateTime<Utc>>) -> Option<i64> {
 
 /// Implementation of system.chunks table
 #[derive(Debug)]
-struct ChunksProvider {
+struct ChunksTable {
     schema: SchemaRef,
     catalog: Arc<Catalog>,
 }
 
-impl ChunksProvider {
+impl ChunksTable {
     fn new(catalog: Arc<Catalog>) -> Self {
         Self {
             schema: chunk_summaries_schema(),
             catalog,
         }
     }
+}
 
-    fn chunk_summaries(&self) -> Result<RecordBatch> {
-        let chunks = self.catalog.chunk_summaries();
-        from_chunk_summaries(self.schema(), chunks)
+impl IoxSystemTable for ChunksTable {
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn batch(&self) -> Result<RecordBatch> {
+        from_chunk_summaries(self.schema(), self.catalog.chunk_summaries())
+            .log_if_error("system.chunks table")
     }
 }
@@ -184,33 +233,46 @@ fn from_chunk_summaries(schema: SchemaRef, chunks: Vec<ChunkSummary>) -> Result<RecordBatch> {
     )
 }
 
-impl TableProvider for ChunksProvider {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
+/// Implementation of `system.columns` system table
+#[derive(Debug)]
+struct ColumnsTable {
+    schema: SchemaRef,
+    catalog: Arc<Catalog>,
+}
 
-    fn schema(&self) -> arrow::datatypes::SchemaRef {
-        Arc::clone(&self.schema)
-    }
-
-    fn scan(
-        &self,
-        projection: &Option<Vec<usize>>,
-        _batch_size: usize,
-        // It would be cool to push projection and limit down
-        _filters: &[datafusion::logical_plan::Expr],
-        _limit: Option<usize>,
-    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
-        let batch = self.chunk_summaries().log_if_error("chunks table")?;
-        scan_batch(batch, self.schema(), projection.as_ref())
-    }
-
-    fn statistics(&self) -> Statistics {
-        Statistics::default()
+impl ColumnsTable {
+    fn new(catalog: Arc<Catalog>) -> Self {
+        Self {
+            schema: partition_summaries_schema(),
+            catalog,
+        }
     }
 }
 
-fn from_partition_summaries(partitions: Vec<PartitionSummary>) -> Result<RecordBatch> {
+impl IoxSystemTable for ColumnsTable {
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+    fn batch(&self) -> Result<RecordBatch> {
+        from_partition_summaries(self.schema(), self.catalog.partition_summaries())
+            .log_if_error("system.columns table")
+    }
+}
+
+fn partition_summaries_schema() -> SchemaRef {
+    Arc::new(Schema::new(vec![
+        Field::new("partition_key", DataType::Utf8, false),
+        Field::new("table_name", DataType::Utf8, false),
+        Field::new("column_name", DataType::Utf8, false),
+        Field::new("column_type", DataType::Utf8, false),
+        Field::new("influxdb_type", DataType::Utf8, true),
+    ]))
+}
+
+fn from_partition_summaries(
+    schema: SchemaRef,
+    partitions: Vec<PartitionSummary>,
+) -> Result<RecordBatch> {
     // Assume each partition has roughly 5 tables with 5 columns
     let row_estimate = partitions.len() * 25;
@@ -239,22 +301,65 @@ fn from_partition_summaries(partitions: Vec<PartitionSummary>) -> Result<Record
         }
     }
 
-    RecordBatch::try_from_iter_with_nullable(vec![
-        (
-            "partition_key",
+    RecordBatch::try_new(
+        schema,
+        vec![
             Arc::new(partition_key.finish()) as ArrayRef,
-            false,
-        ),
-        ("table_name", Arc::new(table_name.finish()), false),
-        ("column_name", Arc::new(column_name.finish()), false),
-        ("column_type", Arc::new(column_type.finish()), false),
-        ("influxdb_type", Arc::new(influxdb_type.finish()), true),
-    ])
+            Arc::new(table_name.finish()),
+            Arc::new(column_name.finish()),
+            Arc::new(column_type.finish()),
+            Arc::new(influxdb_type.finish()),
+        ],
+    )
 }
 
+/// Implementation of system.chunk_columns table
+#[derive(Debug)]
+struct ChunkColumnsTable {
+    schema: SchemaRef,
+    catalog: Arc<Catalog>,
+}
+
+impl ChunkColumnsTable {
+    fn new(catalog: Arc<Catalog>) -> Self {
+        Self {
+            schema: chunk_columns_schema(),
+            catalog,
+        }
+    }
+}
+
+impl IoxSystemTable for ChunkColumnsTable {
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn batch(&self) -> Result<RecordBatch> {
+        assemble_chunk_columns(
+            self.schema(),
+            self.catalog.unaggregated_partition_summaries(),
+            self.catalog.detailed_chunk_summaries(),
+        )
+        .log_if_error("system.column_chunks table")
+    }
+}
+
+fn chunk_columns_schema() -> SchemaRef {
+    Arc::new(Schema::new(vec![
+        Field::new("partition_key", DataType::Utf8, false),
+        Field::new("chunk_id", DataType::UInt32, false),
+        Field::new("table_name", DataType::Utf8, false),
+        Field::new("column_name", DataType::Utf8, false),
+        Field::new("storage", DataType::Utf8, false),
+        Field::new("count", DataType::UInt64, true),
+        Field::new("min_value", DataType::Utf8, true),
+        Field::new("max_value", DataType::Utf8, true),
+        Field::new("estimated_bytes", DataType::UInt64, true),
+    ]))
+}
+
 fn assemble_chunk_columns(
+    schema: SchemaRef,
     partitions: Vec<UnaggregatedPartitionSummary>,
     chunk_summaries: Vec<DetailedChunkSummary>,
 ) -> Result<RecordBatch> {
@@ -358,24 +463,69 @@ fn assemble_chunk_columns(
         }
     }
 
-    RecordBatch::try_from_iter_with_nullable(vec![
-        (
-            "partition_key",
+    RecordBatch::try_new(
+        schema,
+        vec![
             Arc::new(partition_key.finish()) as ArrayRef,
-            false,
-        ),
-        ("chunk_id", Arc::new(chunk_id.finish()), false),
-        ("table_name", Arc::new(table_name.finish()), false),
-        ("column_name", Arc::new(column_name.finish()), false),
-        ("storage", Arc::new(storage.finish()), false),
-        ("count", Arc::new(count.finish()), false),
-        ("min_value", Arc::new(min_values.finish()), true),
-        ("max_value", Arc::new(max_values.finish()), true),
-        ("estimated_bytes", Arc::new(estimated_bytes.finish()), true),
-    ])
+            Arc::new(chunk_id.finish()),
+            Arc::new(table_name.finish()),
+            Arc::new(column_name.finish()),
+            Arc::new(storage.finish()),
+            Arc::new(count.finish()),
+            Arc::new(min_values.finish()),
+            Arc::new(max_values.finish()),
+            Arc::new(estimated_bytes.finish()),
+        ],
+    )
 }
 
-fn from_task_trackers(db_name: &str, jobs: Vec<TaskTracker<Job>>) -> Result<RecordBatch> {
+/// Implementation of system.operations table
+#[derive(Debug)]
+struct OperationsTable {
+    schema: SchemaRef,
+    db_name: String,
+    jobs: Arc<JobRegistry>,
+}
+
+impl OperationsTable {
+    fn new(db_name: String, jobs: Arc<JobRegistry>) -> Self {
+        Self {
+            schema: operations_schema(),
+            db_name,
+            jobs,
+        }
+    }
+}
+
+impl IoxSystemTable for OperationsTable {
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn batch(&self) -> Result<RecordBatch> {
+        from_task_trackers(self.schema(), &self.db_name, self.jobs.tracked())
+            .log_if_error("system.operations table")
+    }
+}
+
+fn operations_schema() -> SchemaRef {
+    let ts = DataType::Time64(TimeUnit::Nanosecond);
+    Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Utf8, false),
+        Field::new("status", DataType::Utf8, true),
+        Field::new("cpu_time_used", ts.clone(), true),
+        Field::new("wall_time_used", ts, true),
+        Field::new("partition_key", DataType::Utf8, true),
+        Field::new("chunk_id", DataType::UInt32, true),
+        Field::new("description", DataType::Utf8, true),
+    ]))
+}
+
+fn from_task_trackers(
+    schema: SchemaRef,
+    db_name: &str,
+    jobs: Vec<TaskTracker<Job>>,
+) -> Result<RecordBatch> {
     let jobs = jobs
         .into_iter()
         .filter(|job| job.metadata().db_name() == Some(db_name))
@@ -397,15 +547,18 @@ fn from_task_trackers(db_name: &str, jobs: Vec<TaskTracker<Job>>) -> Result<RecordBatch> {