feat: Calculate all system tables "on demand" (#1452)

* feat: compute system.columns table on demand

* feat: compute system.chunk_columns on demand

* feat: compute system.operations on demand

* fix: fixup schemas

* fix: Log errors

* fix: clippy

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Andrew Lamb 2021-05-10 10:43:55 -04:00 committed by GitHub
parent 8dcb4a54a2
commit f037c1281a
1 changed file with 252 additions and 98 deletions
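
The shape of the change: rather than building a DataFusion MemTable (and therefore a full copy of the data) every time a system table is queried, each table now implements a small IoxSystemTable trait, i.e. a schema plus a batch() method that computes a RecordBatch on demand, and a generic SystemTableProvider adapter exposes any such table to DataFusion as a TableProvider. The sketch below illustrates the on-demand half of that pattern outside of IOx; ExampleTable and its single column are hypothetical stand-ins for illustration, and the DataFusion adapter is omitted because the real one appears in the diff.

// A minimal sketch of the "compute on demand" pattern used in this commit.
// `ExampleTable` is a hypothetical stand-in; the real tables (chunks, columns,
// chunk_columns, operations) read from the IOx catalog / job registry instead.
use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// The minimal thing a system table needs to implement (mirrors the trait in the diff).
trait IoxSystemTable: Send + Sync {
    /// Schema of this system table.
    fn schema(&self) -> SchemaRef;

    /// Contents of the table, computed fresh on every call.
    fn batch(&self) -> Result<RecordBatch, ArrowError>;
}

/// Hypothetical table with a single utf8 column, for illustration only.
struct ExampleTable {
    schema: SchemaRef,
}

impl ExampleTable {
    fn new() -> Self {
        let schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, false)]));
        Self { schema }
    }
}

impl IoxSystemTable for ExampleTable {
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn batch(&self) -> Result<RecordBatch, ArrowError> {
        // Nothing is cached: the batch is assembled from current state on each call.
        let names: ArrayRef = Arc::new(StringArray::from(vec!["chunk-0", "chunk-1"]));
        RecordBatch::try_new(self.schema(), vec![names])
    }
}

fn main() -> Result<(), ArrowError> {
    let table = ExampleTable::new();
    let batch = table.batch()?;
    println!("{} rows, schema: {:?}", batch.num_rows(), table.schema());
    Ok(())
}
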


@@ -26,7 +26,7 @@ use data_types::{
 };
 use datafusion::{
     catalog::schema::SchemaProvider,
-    datasource::{datasource::Statistics, MemTable, TableProvider},
+    datasource::{datasource::Statistics, TableProvider},
     error::{DataFusionError, Result as DataFusionResult},
     physical_plan::{memory::MemoryExec, ExecutionPlan},
 };
@@ -43,24 +43,41 @@ const COLUMNS: &str = "columns";
 const CHUNK_COLUMNS: &str = "chunk_columns";
 const OPERATIONS: &str = "operations";
 
-#[derive(Debug)]
 pub struct SystemSchemaProvider {
-    db_name: String,
-    catalog: Arc<Catalog>,
-    jobs: Arc<JobRegistry>,
-
-    chunks: Arc<ChunksProvider>,
+    chunks: Arc<dyn TableProvider>,
+    columns: Arc<dyn TableProvider>,
+    chunk_columns: Arc<dyn TableProvider>,
+    operations: Arc<dyn TableProvider>,
+}
+
+impl std::fmt::Debug for SystemSchemaProvider {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("SystemSchemaProvider")
+            .field("fields", &"...")
+            .finish()
+    }
 }
 
 impl SystemSchemaProvider {
     pub fn new(db_name: impl Into<String>, catalog: Arc<Catalog>, jobs: Arc<JobRegistry>) -> Self {
         let db_name = db_name.into();
-        let chunks = Arc::new(ChunksProvider::new(Arc::clone(&catalog)));
+        let chunks = Arc::new(SystemTableProvider {
+            inner: ChunksTable::new(Arc::clone(&catalog)),
+        });
+        let columns = Arc::new(SystemTableProvider {
+            inner: ColumnsTable::new(Arc::clone(&catalog)),
+        });
+        let chunk_columns = Arc::new(SystemTableProvider {
+            inner: ChunkColumnsTable::new(catalog),
+        });
+        let operations = Arc::new(SystemTableProvider {
+            inner: OperationsTable::new(db_name, jobs),
+        });
         Self {
-            db_name,
-            catalog,
-            jobs,
             chunks,
+            columns,
+            chunk_columns,
+            operations,
         }
     }
 }
@@ -80,32 +97,58 @@ impl SchemaProvider for SystemSchemaProvider {
     }
 
     fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
-        // TODO: Use of a MemTable potentially results in materializing redundant data
-        if name == CHUNKS {
-            return Some(Arc::clone(&self.chunks) as Arc<dyn TableProvider>);
-        }
-
-        let batch = match name {
-            CHUNK_COLUMNS => assemble_chunk_columns(
-                self.catalog.unaggregated_partition_summaries(),
-                self.catalog.detailed_chunk_summaries(),
-            )
-            .log_if_error("chunk_columns table")
-            .ok()?,
-            COLUMNS => from_partition_summaries(self.catalog.partition_summaries())
-                .log_if_error("chunks table")
-                .ok()?,
-            OPERATIONS => from_task_trackers(&self.db_name, self.jobs.tracked())
-                .log_if_error("operations table")
-                .ok()?,
-            _ => return None,
-        };
-
-        let table = MemTable::try_new(batch.schema(), vec![vec![batch]])
-            .log_if_error("constructing chunks system table")
-            .ok()?;
-
-        Some(Arc::<MemTable>::new(table))
+        match name {
+            CHUNKS => Some(Arc::clone(&self.chunks)),
+            COLUMNS => Some(Arc::clone(&self.columns)),
+            CHUNK_COLUMNS => Some(Arc::clone(&self.chunk_columns)),
+            OPERATIONS => Some(Arc::clone(&self.operations)),
+            _ => None,
+        }
+    }
+}
+
+/// The minimal thing that a system table needs to implement
+trait IoxSystemTable: Send + Sync {
+    /// Produce the schema from this system table
+    fn schema(&self) -> SchemaRef;
+
+    /// Get the contents of the system table as a single RecordBatch
+    fn batch(&self) -> Result<RecordBatch>;
+}
+
+/// Adapter that makes any `IoxSystemTable` a DataFusion `TableProvider`
+struct SystemTableProvider<T>
+where
+    T: IoxSystemTable,
+{
+    inner: T,
+}
+
+impl<T> TableProvider for SystemTableProvider<T>
+where
+    T: IoxSystemTable + 'static,
+{
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.inner.schema()
+    }
+
+    fn scan(
+        &self,
+        projection: &Option<Vec<usize>>,
+        _batch_size: usize,
+        // It would be cool to push projection and limit down
+        _filters: &[datafusion::logical_plan::Expr],
+        _limit: Option<usize>,
+    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
+        scan_batch(self.inner.batch()?, self.schema(), projection.as_ref())
+    }
+
+    fn statistics(&self) -> Statistics {
+        Statistics::default()
     }
 }
@@ -116,22 +159,28 @@ fn time_to_ts(time: Option<DateTime<Utc>>) -> Option<i64> {
 /// Implementation of system.chunks table
 #[derive(Debug)]
-struct ChunksProvider {
+struct ChunksTable {
     schema: SchemaRef,
     catalog: Arc<Catalog>,
 }
 
-impl ChunksProvider {
+impl ChunksTable {
     fn new(catalog: Arc<Catalog>) -> Self {
         Self {
             schema: chunk_summaries_schema(),
             catalog,
         }
     }
+}
 
-    fn chunk_summaries(&self) -> Result<RecordBatch> {
-        let chunks = self.catalog.chunk_summaries();
-        from_chunk_summaries(self.schema(), chunks)
+impl IoxSystemTable for ChunksTable {
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn batch(&self) -> Result<RecordBatch> {
+        from_chunk_summaries(self.schema(), self.catalog.chunk_summaries())
+            .log_if_error("system.chunks table")
     }
 }
@@ -184,33 +233,46 @@ fn from_chunk_summaries(schema: SchemaRef, chunks: Vec<ChunkSummary>) -> Result<
     )
 }
 
-impl TableProvider for ChunksProvider {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> arrow::datatypes::SchemaRef {
-        Arc::clone(&self.schema)
-    }
-
-    fn scan(
-        &self,
-        projection: &Option<Vec<usize>>,
-        _batch_size: usize,
-        // It would be cool to push projection and limit down
-        _filters: &[datafusion::logical_plan::Expr],
-        _limit: Option<usize>,
-    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
-        let batch = self.chunk_summaries().log_if_error("chunks table")?;
-        scan_batch(batch, self.schema(), projection.as_ref())
-    }
-
-    fn statistics(&self) -> Statistics {
-        Statistics::default()
-    }
-}
-
-fn from_partition_summaries(partitions: Vec<PartitionSummary>) -> Result<RecordBatch> {
+/// Implementation of `system.columns` system table
+#[derive(Debug)]
+struct ColumnsTable {
+    schema: SchemaRef,
+    catalog: Arc<Catalog>,
+}
+
+impl ColumnsTable {
+    fn new(catalog: Arc<Catalog>) -> Self {
+        Self {
+            schema: partition_summaries_schema(),
+            catalog,
+        }
+    }
+}
+
+impl IoxSystemTable for ColumnsTable {
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn batch(&self) -> Result<RecordBatch> {
+        from_partition_summaries(self.schema(), self.catalog.partition_summaries())
+            .log_if_error("system.columns table")
+    }
+}
+
+fn partition_summaries_schema() -> SchemaRef {
+    Arc::new(Schema::new(vec![
+        Field::new("partition_key", DataType::Utf8, false),
+        Field::new("table_name", DataType::Utf8, false),
+        Field::new("column_name", DataType::Utf8, false),
+        Field::new("column_type", DataType::Utf8, false),
+        Field::new("influxdb_type", DataType::Utf8, true),
+    ]))
+}
+
+fn from_partition_summaries(
+    schema: SchemaRef,
+    partitions: Vec<PartitionSummary>,
+) -> Result<RecordBatch> {
     // Assume each partition has roughly 5 tables with 5 columns
     let row_estimate = partitions.len() * 25;
@@ -239,22 +301,65 @@ fn from_partition_summaries(partitions: Vec<PartitionSummary>) -> Result<RecordB
         }
     }
 
-    let partition_key = partition_key.finish();
-    let table_name = table_name.finish();
-    let column_name = column_name.finish();
-    let column_type = column_type.finish();
-    let influxdb_type = influxdb_type.finish();
-
-    RecordBatch::try_from_iter_with_nullable(vec![
-        ("partition_key", Arc::new(partition_key) as ArrayRef, false),
-        ("table_name", Arc::new(table_name), false),
-        ("column_name", Arc::new(column_name), false),
-        ("column_type", Arc::new(column_type), false),
-        ("influxdb_type", Arc::new(influxdb_type), true),
-    ])
+    RecordBatch::try_new(
+        schema,
+        vec![
+            Arc::new(partition_key.finish()) as ArrayRef,
+            Arc::new(table_name.finish()),
+            Arc::new(column_name.finish()),
+            Arc::new(column_type.finish()),
+            Arc::new(influxdb_type.finish()),
+        ],
+    )
+}
+
+/// Implementation of system.column_chunks table
+#[derive(Debug)]
+struct ChunkColumnsTable {
+    schema: SchemaRef,
+    catalog: Arc<Catalog>,
+}
+
+impl ChunkColumnsTable {
+    fn new(catalog: Arc<Catalog>) -> Self {
+        Self {
+            schema: chunk_columns_schema(),
+            catalog,
+        }
+    }
+}
+
+impl IoxSystemTable for ChunkColumnsTable {
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn batch(&self) -> Result<RecordBatch> {
+        assemble_chunk_columns(
+            self.schema(),
+            self.catalog.unaggregated_partition_summaries(),
+            self.catalog.detailed_chunk_summaries(),
+        )
+        .log_if_error("system.column_chunks table")
+    }
+}
+
+fn chunk_columns_schema() -> SchemaRef {
+    Arc::new(Schema::new(vec![
+        Field::new("partition_key", DataType::Utf8, false),
+        Field::new("chunk_id", DataType::UInt32, false),
+        Field::new("table_name", DataType::Utf8, false),
+        Field::new("column_name", DataType::Utf8, false),
+        Field::new("storage", DataType::Utf8, false),
+        Field::new("count", DataType::UInt64, true),
+        Field::new("min_value", DataType::Utf8, true),
+        Field::new("max_value", DataType::Utf8, true),
+        Field::new("estimated_bytes", DataType::UInt64, true),
+    ]))
 }
 
 fn assemble_chunk_columns(
+    schema: SchemaRef,
     partitions: Vec<UnaggregatedPartitionSummary>,
     chunk_summaries: Vec<DetailedChunkSummary>,
 ) -> Result<RecordBatch> {
@@ -358,24 +463,69 @@ fn assemble_chunk_columns(
         }
     }
 
-    RecordBatch::try_from_iter_with_nullable(vec![
-        (
-            "partition_key",
-            Arc::new(partition_key.finish()) as ArrayRef,
-            false,
-        ),
-        ("chunk_id", Arc::new(chunk_id.finish()), false),
-        ("table_name", Arc::new(table_name.finish()), false),
-        ("column_name", Arc::new(column_name.finish()), false),
-        ("storage", Arc::new(storage.finish()), false),
-        ("count", Arc::new(count.finish()), false),
-        ("min_value", Arc::new(min_values.finish()), true),
-        ("max_value", Arc::new(max_values.finish()), true),
-        ("estimated_bytes", Arc::new(estimated_bytes.finish()), true),
-    ])
+    RecordBatch::try_new(
+        schema,
+        vec![
+            Arc::new(partition_key.finish()) as ArrayRef,
+            Arc::new(chunk_id.finish()),
+            Arc::new(table_name.finish()),
+            Arc::new(column_name.finish()),
+            Arc::new(storage.finish()),
+            Arc::new(count.finish()),
+            Arc::new(min_values.finish()),
+            Arc::new(max_values.finish()),
+            Arc::new(estimated_bytes.finish()),
+        ],
+    )
 }
 
-fn from_task_trackers(db_name: &str, jobs: Vec<TaskTracker<Job>>) -> Result<RecordBatch> {
+/// Implementation of system.operations table
+#[derive(Debug)]
+struct OperationsTable {
+    schema: SchemaRef,
+    db_name: String,
+    jobs: Arc<JobRegistry>,
+}
+
+impl OperationsTable {
+    fn new(db_name: String, jobs: Arc<JobRegistry>) -> Self {
+        Self {
+            schema: operations_schema(),
+            db_name,
+            jobs,
+        }
+    }
+}
+
+impl IoxSystemTable for OperationsTable {
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn batch(&self) -> Result<RecordBatch> {
+        from_task_trackers(self.schema(), &self.db_name, self.jobs.tracked())
+            .log_if_error("system.operations table")
+    }
+}
+
+fn operations_schema() -> SchemaRef {
+    let ts = DataType::Time64(TimeUnit::Nanosecond);
+    Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Utf8, false),
+        Field::new("status", DataType::Utf8, true),
+        Field::new("cpu_time_used", ts.clone(), true),
+        Field::new("wall_time_used", ts, true),
+        Field::new("partition_key", DataType::Utf8, true),
+        Field::new("chunk_id", DataType::UInt32, true),
+        Field::new("description", DataType::Utf8, true),
+    ]))
+}
+
+fn from_task_trackers(
+    schema: SchemaRef,
+    db_name: &str,
+    jobs: Vec<TaskTracker<Job>>,
+) -> Result<RecordBatch> {
     let jobs = jobs
         .into_iter()
         .filter(|job| job.metadata().db_name() == Some(db_name))
@@ -397,15 +547,18 @@ fn from_task_trackers(db_name: &str, jobs: Vec<TaskTracker<Job>>) -> Result<Reco
     let descriptions =
         StringArray::from_iter(jobs.iter().map(|job| Some(job.metadata().description())));
 
-    RecordBatch::try_from_iter_with_nullable(vec![
-        ("id", Arc::new(ids) as ArrayRef, false),
-        ("status", Arc::new(statuses), true),
-        ("cpu_time_used", Arc::new(cpu_time_used), true),
-        ("wall_time_used", Arc::new(wall_time_used), true),
-        ("partition_key", Arc::new(partition_keys), true),
-        ("chunk_id", Arc::new(chunk_ids), true),
-        ("description", Arc::new(descriptions), true),
-    ])
+    RecordBatch::try_new(
+        schema,
+        vec![
+            Arc::new(ids) as ArrayRef,
+            Arc::new(statuses),
+            Arc::new(cpu_time_used),
+            Arc::new(wall_time_used),
+            Arc::new(partition_keys),
+            Arc::new(chunk_ids),
+            Arc::new(descriptions),
+        ],
+    )
 }
 
 /// Creates a DataFusion ExecutionPlan node that scans a single batch
@@ -565,7 +718,7 @@ mod tests {
             "+---------------+------------+-------------+-------------+---------------+",
         ];
 
-        let batch = from_partition_summaries(partitions).unwrap();
+        let batch = from_partition_summaries(partition_summaries_schema(), partitions).unwrap();
 
         assert_batches_eq!(&expected, &[batch]);
     }
@@ -784,7 +937,8 @@ mod tests {
            "+---------------+----------+------------+-------------+-------------------+-------+-----------+-----------+-----------------+",
         ];
 
-        let batch = assemble_chunk_columns(partitions, chunk_summaries).unwrap();
+        let batch =
+            assemble_chunk_columns(chunk_columns_schema(), partitions, chunk_summaries).unwrap();
 
         assert_batches_eq!(&expected, &[batch]);
     }
 }
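
A helper the new code leans on but these hunks do not define is log_if_error, called on every batch() result before it is handed to DataFusion. A minimal sketch of such an extension trait follows, assuming the log crate; the trait and method names mirror the calls above, but the actual IOx implementation may differ.

// Sketch of a `log_if_error`-style helper, assuming the `log` crate is available.
// The method name follows the calls in the diff; this is an illustration, not
// the exact IOx implementation.
use std::fmt::Debug;

trait LogIfError<T, E> {
    /// Log the error (tagged with `context`) before handing the Result back.
    fn log_if_error(self, context: &str) -> Result<T, E>;
}

impl<T, E: Debug> LogIfError<T, E> for Result<T, E> {
    fn log_if_error(self, context: &str) -> Result<T, E> {
        if let Err(e) = &self {
            log::error!("Error computing {}: {:?}", context, e);
        }
        self
    }
}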