From cf8a60252d03ce52b7960bd23e1c891b5965da6a Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Tue, 20 Jul 2021 13:19:20 +0100 Subject: [PATCH] refactor: split system_tables module into smaller modules (#2061) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- server/src/db/system_tables.rs | 698 +--------------------- server/src/db/system_tables/chunks.rs | 201 +++++++ server/src/db/system_tables/columns.rs | 404 +++++++++++++ server/src/db/system_tables/operations.rs | 108 ++++ 4 files changed, 729 insertions(+), 682 deletions(-) create mode 100644 server/src/db/system_tables/chunks.rs create mode 100644 server/src/db/system_tables/columns.rs create mode 100644 server/src/db/system_tables/operations.rs diff --git a/server/src/db/system_tables.rs b/server/src/db/system_tables.rs index f80f06b9bc..f83c793fa5 100644 --- a/server/src/db/system_tables.rs +++ b/server/src/db/system_tables.rs @@ -7,38 +7,30 @@ //! //! For example `SELECT * FROM system.chunks` -use std::convert::AsRef; +use std::any::Any; use std::sync::Arc; -use std::{any::Any, collections::HashMap}; - -use chrono::{DateTime, Utc}; use arrow::{ - array::{ - ArrayRef, StringArray, StringBuilder, Time64NanosecondArray, TimestampNanosecondArray, - UInt32Array, UInt32Builder, UInt64Array, UInt64Builder, - }, - datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}, + datatypes::{Field, Schema, SchemaRef}, error::Result, record_batch::RecordBatch, }; -use data_types::{ - chunk_metadata::{ChunkSummary, DetailedChunkSummary}, - error::ErrorLogger, - job::Job, - partition_metadata::PartitionSummary, -}; +use chrono::{DateTime, Utc}; + use datafusion::{ catalog::schema::SchemaProvider, datasource::{datasource::Statistics, TableProvider}, error::{DataFusionError, Result as DataFusionResult}, physical_plan::{memory::MemoryExec, ExecutionPlan}, }; -use tracker::TaskTracker; + +use crate::JobRegistry; use super::catalog::Catalog; -use crate::JobRegistry; -use data_types::partition_metadata::TableSummary; + +mod chunks; +mod columns; +mod operations; // The IOx system schema pub const SYSTEM_SCHEMA: &str = "system"; @@ -67,16 +59,16 @@ impl SystemSchemaProvider { pub fn new(db_name: impl Into, catalog: Arc, jobs: Arc) -> Self { let db_name = db_name.into(); let chunks = Arc::new(SystemTableProvider { - inner: ChunksTable::new(Arc::clone(&catalog)), + inner: chunks::ChunksTable::new(Arc::clone(&catalog)), }); let columns = Arc::new(SystemTableProvider { - inner: ColumnsTable::new(Arc::clone(&catalog)), + inner: columns::ColumnsTable::new(Arc::clone(&catalog)), }); let chunk_columns = Arc::new(SystemTableProvider { - inner: ChunkColumnsTable::new(catalog), + inner: columns::ChunkColumnsTable::new(catalog), }); let operations = Arc::new(SystemTableProvider { - inner: OperationsTable::new(db_name, jobs), + inner: operations::OperationsTable::new(db_name, jobs), }); Self { chunks, @@ -162,407 +154,6 @@ fn time_to_ts(time: Option>) -> Option { time.map(|ts| ts.timestamp_nanos()) } -/// Implementation of system.chunks table -#[derive(Debug)] -struct ChunksTable { - schema: SchemaRef, - catalog: Arc, -} - -impl ChunksTable { - fn new(catalog: Arc) -> Self { - Self { - schema: chunk_summaries_schema(), - catalog, - } - } -} - -impl IoxSystemTable for ChunksTable { - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } - - fn batch(&self) -> Result { - from_chunk_summaries(self.schema(), self.catalog.chunk_summaries()) - .log_if_error("system.chunks 
table") - } -} - -fn chunk_summaries_schema() -> SchemaRef { - let ts = DataType::Timestamp(TimeUnit::Nanosecond, None); - Arc::new(Schema::new(vec![ - Field::new("id", DataType::UInt32, false), - Field::new("partition_key", DataType::Utf8, false), - Field::new("table_name", DataType::Utf8, false), - Field::new("storage", DataType::Utf8, false), - Field::new("lifecycle_action", DataType::Utf8, true), - Field::new("memory_bytes", DataType::UInt64, false), - Field::new("object_store_bytes", DataType::UInt64, false), - Field::new("row_count", DataType::UInt64, false), - Field::new("time_of_first_write", ts.clone(), true), - Field::new("time_of_last_write", ts.clone(), true), - Field::new("time_closed", ts, true), - ])) -} - -fn from_chunk_summaries(schema: SchemaRef, chunks: Vec) -> Result { - let id = chunks.iter().map(|c| Some(c.id)).collect::(); - let partition_key = chunks - .iter() - .map(|c| Some(c.partition_key.as_ref())) - .collect::(); - let table_name = chunks - .iter() - .map(|c| Some(c.table_name.as_ref())) - .collect::(); - let storage = chunks - .iter() - .map(|c| Some(c.storage.as_str())) - .collect::(); - let lifecycle_action = chunks - .iter() - .map(|c| c.lifecycle_action.map(|a| a.name())) - .collect::(); - let memory_bytes = chunks - .iter() - .map(|c| Some(c.memory_bytes as u64)) - .collect::(); - let object_store_bytes = chunks - .iter() - .map(|c| Some(c.object_store_bytes as u64).filter(|&v| v > 0)) - .collect::(); - let row_counts = chunks - .iter() - .map(|c| Some(c.row_count as u64)) - .collect::(); - let time_of_first_write = chunks - .iter() - .map(|c| c.time_of_first_write) - .map(time_to_ts) - .collect::(); - let time_of_last_write = chunks - .iter() - .map(|c| c.time_of_last_write) - .map(time_to_ts) - .collect::(); - let time_closed = chunks - .iter() - .map(|c| c.time_closed) - .map(time_to_ts) - .collect::(); - - RecordBatch::try_new( - schema, - vec![ - Arc::new(id), - Arc::new(partition_key), - Arc::new(table_name), - Arc::new(storage), - Arc::new(lifecycle_action), - Arc::new(memory_bytes), - Arc::new(object_store_bytes), - Arc::new(row_counts), - Arc::new(time_of_first_write), - Arc::new(time_of_last_write), - Arc::new(time_closed), - ], - ) -} - -/// Implementation of `system.columns` system table -#[derive(Debug)] -struct ColumnsTable { - schema: SchemaRef, - catalog: Arc, -} - -impl ColumnsTable { - fn new(catalog: Arc) -> Self { - Self { - schema: partition_summaries_schema(), - catalog, - } - } -} - -impl IoxSystemTable for ColumnsTable { - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } - fn batch(&self) -> Result { - from_partition_summaries(self.schema(), self.catalog.partition_summaries()) - .log_if_error("system.columns table") - } -} - -fn partition_summaries_schema() -> SchemaRef { - Arc::new(Schema::new(vec![ - Field::new("partition_key", DataType::Utf8, false), - Field::new("table_name", DataType::Utf8, false), - Field::new("column_name", DataType::Utf8, false), - Field::new("column_type", DataType::Utf8, false), - Field::new("influxdb_type", DataType::Utf8, true), - ])) -} - -fn from_partition_summaries( - schema: SchemaRef, - partitions: Vec, -) -> Result { - // Assume each partition has roughly 5 tables with 5 columns - let row_estimate = partitions.len() * 25; - - let mut partition_key = StringBuilder::new(row_estimate); - let mut table_name = StringBuilder::new(row_estimate); - let mut column_name = StringBuilder::new(row_estimate); - let mut column_type = StringBuilder::new(row_estimate); - let mut influxdb_type = 
StringBuilder::new(row_estimate); - - // Note no rows are produced for partitions with no tabes, or - // tables with no columns: There are other tables to list tables - // and columns - for partition in partitions { - let table = partition.table; - for column in table.columns { - partition_key.append_value(&partition.key)?; - table_name.append_value(&table.name)?; - column_name.append_value(&column.name)?; - column_type.append_value(column.type_name())?; - if let Some(t) = &column.influxdb_type { - influxdb_type.append_value(t.as_str())?; - } else { - influxdb_type.append_null()?; - } - } - } - - RecordBatch::try_new( - schema, - vec![ - Arc::new(partition_key.finish()) as ArrayRef, - Arc::new(table_name.finish()), - Arc::new(column_name.finish()), - Arc::new(column_type.finish()), - Arc::new(influxdb_type.finish()), - ], - ) -} - -/// Implementation of system.column_chunks table -#[derive(Debug)] -struct ChunkColumnsTable { - schema: SchemaRef, - catalog: Arc, -} - -impl ChunkColumnsTable { - fn new(catalog: Arc) -> Self { - Self { - schema: chunk_columns_schema(), - catalog, - } - } -} - -impl IoxSystemTable for ChunkColumnsTable { - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } - - fn batch(&self) -> Result { - assemble_chunk_columns(self.schema(), self.catalog.detailed_chunk_summaries()) - .log_if_error("system.column_chunks table") - } -} - -fn chunk_columns_schema() -> SchemaRef { - Arc::new(Schema::new(vec![ - Field::new("partition_key", DataType::Utf8, false), - Field::new("chunk_id", DataType::UInt32, false), - Field::new("table_name", DataType::Utf8, false), - Field::new("column_name", DataType::Utf8, false), - Field::new("storage", DataType::Utf8, false), - Field::new("row_count", DataType::UInt64, true), - Field::new("min_value", DataType::Utf8, true), - Field::new("max_value", DataType::Utf8, true), - Field::new("memory_bytes", DataType::UInt64, true), - ])) -} - -fn assemble_chunk_columns( - schema: SchemaRef, - chunk_summaries: Vec<(Arc, DetailedChunkSummary)>, -) -> Result { - /// Builds an index from column_name -> size - fn make_column_index(summary: &DetailedChunkSummary) -> HashMap<&str, u64> { - summary - .columns - .iter() - .map(|column_summary| { - ( - column_summary.name.as_ref(), - column_summary.memory_bytes as u64, - ) - }) - .collect() - } - - // Assume each chunk has roughly 5 columns - let row_estimate = chunk_summaries.len() * 5; - - let mut partition_key = StringBuilder::new(row_estimate); - let mut chunk_id = UInt32Builder::new(row_estimate); - let mut table_name = StringBuilder::new(row_estimate); - let mut column_name = StringBuilder::new(row_estimate); - let mut storage = StringBuilder::new(row_estimate); - let mut row_count = UInt64Builder::new(row_estimate); - let mut min_values = StringBuilder::new(row_estimate); - let mut max_values = StringBuilder::new(row_estimate); - let mut memory_bytes = UInt64Builder::new(row_estimate); - - // Note no rows are produced for partitions with no chunks, or - // tables with no partitions: There are other tables to list tables - // and columns - for (table_summary, chunk_summary) in chunk_summaries { - let mut column_index = make_column_index(&chunk_summary); - let storage_value = chunk_summary.inner.storage.as_str(); - - for column in &table_summary.columns { - partition_key.append_value(chunk_summary.inner.partition_key.as_ref())?; - chunk_id.append_value(chunk_summary.inner.id)?; - table_name.append_value(&chunk_summary.inner.table_name)?; - column_name.append_value(&column.name)?; - 
storage.append_value(storage_value)?; - row_count.append_value(column.count())?; - if let Some(v) = column.stats.min_as_str() { - min_values.append_value(v)?; - } else { - min_values.append(false)?; - } - if let Some(v) = column.stats.max_as_str() { - max_values.append_value(v)?; - } else { - max_values.append(false)?; - } - - let size = column_index.remove(column.name.as_str()); - - memory_bytes.append_option(size)?; - } - } - - RecordBatch::try_new( - schema, - vec![ - Arc::new(partition_key.finish()) as ArrayRef, - Arc::new(chunk_id.finish()), - Arc::new(table_name.finish()), - Arc::new(column_name.finish()), - Arc::new(storage.finish()), - Arc::new(row_count.finish()), - Arc::new(min_values.finish()), - Arc::new(max_values.finish()), - Arc::new(memory_bytes.finish()), - ], - ) -} - -/// Implementation of system.operations table -#[derive(Debug)] -struct OperationsTable { - schema: SchemaRef, - db_name: String, - jobs: Arc, -} - -impl OperationsTable { - fn new(db_name: String, jobs: Arc) -> Self { - Self { - schema: operations_schema(), - db_name, - jobs, - } - } -} - -impl IoxSystemTable for OperationsTable { - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } - - fn batch(&self) -> Result { - from_task_trackers(self.schema(), &self.db_name, self.jobs.tracked()) - .log_if_error("system.operations table") - } -} - -fn operations_schema() -> SchemaRef { - let ts = DataType::Time64(TimeUnit::Nanosecond); - Arc::new(Schema::new(vec![ - Field::new("id", DataType::Utf8, false), - Field::new("status", DataType::Utf8, true), - Field::new("cpu_time_used", ts.clone(), true), - Field::new("wall_time_used", ts, true), - Field::new("partition_key", DataType::Utf8, true), - Field::new("chunk_id", DataType::UInt32, true), - Field::new("description", DataType::Utf8, true), - ])) -} - -fn from_task_trackers( - schema: SchemaRef, - db_name: &str, - jobs: Vec>, -) -> Result { - let jobs = jobs - .into_iter() - .filter(|job| job.metadata().db_name() == Some(db_name)) - .collect::>(); - - let ids = jobs - .iter() - .map(|job| Some(job.id().to_string())) - .collect::(); - let statuses = jobs - .iter() - .map(|job| Some(job.get_status().name())) - .collect::(); - let cpu_time_used = jobs - .iter() - .map(|job| job.get_status().cpu_nanos().map(|n| n as i64)) - .collect::(); - let wall_time_used = jobs - .iter() - .map(|job| job.get_status().wall_nanos().map(|n| n as i64)) - .collect::(); - let partition_keys = jobs - .iter() - .map(|job| job.metadata().partition_key()) - .collect::(); - let chunk_ids = jobs - .iter() - .map(|job| job.metadata().chunk_id()) - .collect::(); - let descriptions = jobs - .iter() - .map(|job| Some(job.metadata().description())) - .collect::(); - - RecordBatch::try_new( - schema, - vec![ - Arc::new(ids) as ArrayRef, - Arc::new(statuses), - Arc::new(cpu_time_used), - Arc::new(wall_time_used), - Arc::new(partition_keys), - Arc::new(chunk_ids), - Arc::new(descriptions), - ], - ) -} - /// Creates a DataFusion ExecutionPlan node that scans a single batch /// of records. 
fn scan_batch( @@ -605,141 +196,10 @@ fn scan_batch( #[cfg(test)] mod tests { - use super::*; + use arrow::array::{ArrayRef, UInt64Array}; use arrow_util::assert_batches_eq; - use chrono::NaiveDateTime; - use data_types::{ - chunk_metadata::{ChunkColumnSummary, ChunkLifecycleAction, ChunkStorage}, - partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary}, - }; - #[test] - fn test_from_chunk_summaries() { - let chunks = vec![ - ChunkSummary { - partition_key: Arc::from("p1"), - table_name: Arc::from("table1"), - id: 0, - storage: ChunkStorage::OpenMutableBuffer, - lifecycle_action: None, - memory_bytes: 23754, - object_store_bytes: 0, - row_count: 11, - time_of_first_write: Some(DateTime::from_utc( - NaiveDateTime::from_timestamp(10, 0), - Utc, - )), - time_of_last_write: None, - time_closed: None, - }, - ChunkSummary { - partition_key: Arc::from("p1"), - table_name: Arc::from("table1"), - id: 1, - storage: ChunkStorage::OpenMutableBuffer, - lifecycle_action: Some(ChunkLifecycleAction::Persisting), - memory_bytes: 23455, - object_store_bytes: 0, - row_count: 22, - time_of_first_write: None, - time_of_last_write: Some(DateTime::from_utc( - NaiveDateTime::from_timestamp(80, 0), - Utc, - )), - time_closed: None, - }, - ChunkSummary { - partition_key: Arc::from("p1"), - table_name: Arc::from("table1"), - id: 2, - storage: ChunkStorage::ObjectStoreOnly, - lifecycle_action: None, - memory_bytes: 1234, - object_store_bytes: 5678, - row_count: 33, - time_of_first_write: Some(DateTime::from_utc( - NaiveDateTime::from_timestamp(100, 0), - Utc, - )), - time_of_last_write: Some(DateTime::from_utc( - NaiveDateTime::from_timestamp(200, 0), - Utc, - )), - time_closed: None, - }, - ]; - - let expected = vec![ - "+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+---------------------+---------------------+-------------+", - "| id | partition_key | table_name | storage | lifecycle_action | memory_bytes | object_store_bytes | row_count | time_of_first_write | time_of_last_write | time_closed |", - "+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+---------------------+---------------------+-------------+", - "| 0 | p1 | table1 | OpenMutableBuffer | | 23754 | | 11 | 1970-01-01 00:00:10 | | |", - "| 1 | p1 | table1 | OpenMutableBuffer | Persisting to Object Storage | 23455 | | 22 | | 1970-01-01 00:01:20 | |", - "| 2 | p1 | table1 | ObjectStoreOnly | | 1234 | 5678 | 33 | 1970-01-01 00:01:40 | 1970-01-01 00:03:20 | |", - "+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+---------------------+---------------------+-------------+", - ]; - - let schema = chunk_summaries_schema(); - let batch = from_chunk_summaries(schema, chunks).unwrap(); - assert_batches_eq!(&expected, &[batch]); - } - - #[test] - fn test_from_partition_summaries() { - let partitions = vec![ - PartitionSummary { - key: "p1".to_string(), - table: TableSummary { - name: "t1".to_string(), - columns: vec![ - ColumnSummary { - name: "c1".to_string(), - influxdb_type: Some(InfluxDbType::Tag), - stats: Statistics::I64(StatValues::new_with_value(23)), - }, - ColumnSummary { - name: "c2".to_string(), - influxdb_type: Some(InfluxDbType::Field), - stats: Statistics::I64(StatValues::new_with_value(43)), - }, - ColumnSummary { - name: "c3".to_string(), - influxdb_type: None, - 
stats: Statistics::String(StatValues::new_with_value( - "foo".to_string(), - )), - }, - ColumnSummary { - name: "time".to_string(), - influxdb_type: Some(InfluxDbType::Timestamp), - stats: Statistics::I64(StatValues::new_with_value(43)), - }, - ], - }, - }, - PartitionSummary { - key: "p3".to_string(), - table: TableSummary { - name: "t1".to_string(), - columns: vec![], - }, - }, - ]; - - let expected = vec![ - "+---------------+------------+-------------+-------------+---------------+", - "| partition_key | table_name | column_name | column_type | influxdb_type |", - "+---------------+------------+-------------+-------------+---------------+", - "| p1 | t1 | c1 | I64 | Tag |", - "| p1 | t1 | c2 | I64 | Field |", - "| p1 | t1 | c3 | String | |", - "| p1 | t1 | time | I64 | Timestamp |", - "+---------------+------------+-------------+-------------+---------------+", - ]; - - let batch = from_partition_summaries(partition_summaries_schema(), partitions).unwrap(); - assert_batches_eq!(&expected, &[batch]); - } + use super::*; fn seq_array(start: u64, end: u64) -> ArrayRef { Arc::new(UInt64Array::from_iter_values(start..end)) @@ -820,130 +280,4 @@ mod tests { err_string ); } - - #[test] - fn test_assemble_chunk_columns() { - let lifecycle_action = None; - - let summaries = vec![ - ( - Arc::new(TableSummary { - name: "t1".to_string(), - columns: vec![ - ColumnSummary { - name: "c1".to_string(), - influxdb_type: Some(InfluxDbType::Field), - stats: Statistics::String(StatValues::new( - Some("bar".to_string()), - Some("foo".to_string()), - 55, - )), - }, - ColumnSummary { - name: "c2".to_string(), - influxdb_type: Some(InfluxDbType::Field), - stats: Statistics::F64(StatValues::new(Some(11.0), Some(43.0), 66)), - }, - ], - }), - DetailedChunkSummary { - inner: ChunkSummary { - partition_key: "p1".into(), - table_name: "t1".into(), - id: 42, - storage: ChunkStorage::ReadBuffer, - lifecycle_action, - memory_bytes: 23754, - object_store_bytes: 0, - row_count: 11, - time_of_first_write: None, - time_of_last_write: None, - time_closed: None, - }, - columns: vec![ - ChunkColumnSummary { - name: "c1".into(), - memory_bytes: 11, - }, - ChunkColumnSummary { - name: "c2".into(), - memory_bytes: 12, - }, - ], - }, - ), - ( - Arc::new(TableSummary { - name: "t1".to_string(), - columns: vec![ColumnSummary { - name: "c1".to_string(), - influxdb_type: Some(InfluxDbType::Field), - stats: Statistics::F64(StatValues::new(Some(110.0), Some(430.0), 667)), - }], - }), - DetailedChunkSummary { - inner: ChunkSummary { - partition_key: "p2".into(), - table_name: "t1".into(), - id: 43, - storage: ChunkStorage::OpenMutableBuffer, - lifecycle_action, - memory_bytes: 23754, - object_store_bytes: 0, - row_count: 11, - time_of_first_write: None, - time_of_last_write: None, - time_closed: None, - }, - columns: vec![ChunkColumnSummary { - name: "c1".into(), - memory_bytes: 100, - }], - }, - ), - ( - Arc::new(TableSummary { - name: "t2".to_string(), - columns: vec![ColumnSummary { - name: "c3".to_string(), - influxdb_type: Some(InfluxDbType::Field), - stats: Statistics::F64(StatValues::new(Some(-1.0), Some(2.0), 4)), - }], - }), - DetailedChunkSummary { - inner: ChunkSummary { - partition_key: "p2".into(), - table_name: "t2".into(), - id: 44, - storage: ChunkStorage::OpenMutableBuffer, - lifecycle_action, - memory_bytes: 23754, - object_store_bytes: 0, - row_count: 11, - time_of_first_write: None, - time_of_last_write: None, - time_closed: None, - }, - columns: vec![ChunkColumnSummary { - name: "c3".into(), - memory_bytes: 200, - 
}], - }, - ), - ]; - - let expected = vec![ - "+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+--------------+", - "| partition_key | chunk_id | table_name | column_name | storage | row_count | min_value | max_value | memory_bytes |", - "+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+--------------+", - "| p1 | 42 | t1 | c1 | ReadBuffer | 55 | bar | foo | 11 |", - "| p1 | 42 | t1 | c2 | ReadBuffer | 66 | 11 | 43 | 12 |", - "| p2 | 43 | t1 | c1 | OpenMutableBuffer | 667 | 110 | 430 | 100 |", - "| p2 | 44 | t2 | c3 | OpenMutableBuffer | 4 | -1 | 2 | 200 |", - "+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+--------------+", - ]; - - let batch = assemble_chunk_columns(chunk_columns_schema(), summaries).unwrap(); - assert_batches_eq!(&expected, &[batch]); - } } diff --git a/server/src/db/system_tables/chunks.rs b/server/src/db/system_tables/chunks.rs new file mode 100644 index 0000000000..90acda0629 --- /dev/null +++ b/server/src/db/system_tables/chunks.rs @@ -0,0 +1,201 @@ +use std::sync::Arc; + +use arrow::array::{StringArray, TimestampNanosecondArray, UInt32Array, UInt64Array}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; +use arrow::error::Result; +use arrow::record_batch::RecordBatch; + +use data_types::chunk_metadata::ChunkSummary; +use data_types::error::ErrorLogger; + +use crate::db::catalog::Catalog; +use crate::db::system_tables::{time_to_ts, IoxSystemTable}; + +/// Implementation of system.chunks table +#[derive(Debug)] +pub(super) struct ChunksTable { + schema: SchemaRef, + catalog: Arc, +} + +impl ChunksTable { + pub(super) fn new(catalog: Arc) -> Self { + Self { + schema: chunk_summaries_schema(), + catalog, + } + } +} + +impl IoxSystemTable for ChunksTable { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn batch(&self) -> Result { + from_chunk_summaries(self.schema(), self.catalog.chunk_summaries()) + .log_if_error("system.chunks table") + } +} + +fn chunk_summaries_schema() -> SchemaRef { + let ts = DataType::Timestamp(TimeUnit::Nanosecond, None); + Arc::new(Schema::new(vec![ + Field::new("id", DataType::UInt32, false), + Field::new("partition_key", DataType::Utf8, false), + Field::new("table_name", DataType::Utf8, false), + Field::new("storage", DataType::Utf8, false), + Field::new("lifecycle_action", DataType::Utf8, true), + Field::new("memory_bytes", DataType::UInt64, false), + Field::new("object_store_bytes", DataType::UInt64, false), + Field::new("row_count", DataType::UInt64, false), + Field::new("time_of_first_write", ts.clone(), true), + Field::new("time_of_last_write", ts.clone(), true), + Field::new("time_closed", ts, true), + ])) +} + +fn from_chunk_summaries(schema: SchemaRef, chunks: Vec) -> Result { + let id = chunks.iter().map(|c| Some(c.id)).collect::(); + let partition_key = chunks + .iter() + .map(|c| Some(c.partition_key.as_ref())) + .collect::(); + let table_name = chunks + .iter() + .map(|c| Some(c.table_name.as_ref())) + .collect::(); + let storage = chunks + .iter() + .map(|c| Some(c.storage.as_str())) + .collect::(); + let lifecycle_action = chunks + .iter() + .map(|c| c.lifecycle_action.map(|a| a.name())) + .collect::(); + let memory_bytes = chunks + .iter() + .map(|c| Some(c.memory_bytes as u64)) + .collect::(); + let object_store_bytes = chunks + .iter() + .map(|c| Some(c.object_store_bytes as u64).filter(|&v| v > 
0)) + .collect::(); + let row_counts = chunks + .iter() + .map(|c| Some(c.row_count as u64)) + .collect::(); + let time_of_first_write = chunks + .iter() + .map(|c| c.time_of_first_write) + .map(time_to_ts) + .collect::(); + let time_of_last_write = chunks + .iter() + .map(|c| c.time_of_last_write) + .map(time_to_ts) + .collect::(); + let time_closed = chunks + .iter() + .map(|c| c.time_closed) + .map(time_to_ts) + .collect::(); + + RecordBatch::try_new( + schema, + vec![ + Arc::new(id), + Arc::new(partition_key), + Arc::new(table_name), + Arc::new(storage), + Arc::new(lifecycle_action), + Arc::new(memory_bytes), + Arc::new(object_store_bytes), + Arc::new(row_counts), + Arc::new(time_of_first_write), + Arc::new(time_of_last_write), + Arc::new(time_closed), + ], + ) +} + +#[cfg(test)] +mod tests { + use chrono::{DateTime, NaiveDateTime, Utc}; + + use arrow_util::assert_batches_eq; + use data_types::chunk_metadata::{ChunkLifecycleAction, ChunkStorage}; + + use super::*; + + #[test] + fn test_from_chunk_summaries() { + let chunks = vec![ + ChunkSummary { + partition_key: Arc::from("p1"), + table_name: Arc::from("table1"), + id: 0, + storage: ChunkStorage::OpenMutableBuffer, + lifecycle_action: None, + memory_bytes: 23754, + object_store_bytes: 0, + row_count: 11, + time_of_first_write: Some(DateTime::from_utc( + NaiveDateTime::from_timestamp(10, 0), + Utc, + )), + time_of_last_write: None, + time_closed: None, + }, + ChunkSummary { + partition_key: Arc::from("p1"), + table_name: Arc::from("table1"), + id: 1, + storage: ChunkStorage::OpenMutableBuffer, + lifecycle_action: Some(ChunkLifecycleAction::Persisting), + memory_bytes: 23455, + object_store_bytes: 0, + row_count: 22, + time_of_first_write: None, + time_of_last_write: Some(DateTime::from_utc( + NaiveDateTime::from_timestamp(80, 0), + Utc, + )), + time_closed: None, + }, + ChunkSummary { + partition_key: Arc::from("p1"), + table_name: Arc::from("table1"), + id: 2, + storage: ChunkStorage::ObjectStoreOnly, + lifecycle_action: None, + memory_bytes: 1234, + object_store_bytes: 5678, + row_count: 33, + time_of_first_write: Some(DateTime::from_utc( + NaiveDateTime::from_timestamp(100, 0), + Utc, + )), + time_of_last_write: Some(DateTime::from_utc( + NaiveDateTime::from_timestamp(200, 0), + Utc, + )), + time_closed: None, + }, + ]; + + let expected = vec![ + "+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+---------------------+---------------------+-------------+", + "| id | partition_key | table_name | storage | lifecycle_action | memory_bytes | object_store_bytes | row_count | time_of_first_write | time_of_last_write | time_closed |", + "+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+---------------------+---------------------+-------------+", + "| 0 | p1 | table1 | OpenMutableBuffer | | 23754 | | 11 | 1970-01-01 00:00:10 | | |", + "| 1 | p1 | table1 | OpenMutableBuffer | Persisting to Object Storage | 23455 | | 22 | | 1970-01-01 00:01:20 | |", + "| 2 | p1 | table1 | ObjectStoreOnly | | 1234 | 5678 | 33 | 1970-01-01 00:01:40 | 1970-01-01 00:03:20 | |", + "+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+---------------------+---------------------+-------------+", + ]; + + let schema = chunk_summaries_schema(); + let batch = from_chunk_summaries(schema, chunks).unwrap(); + 
assert_batches_eq!(&expected, &[batch]); + } +} diff --git a/server/src/db/system_tables/columns.rs b/server/src/db/system_tables/columns.rs new file mode 100644 index 0000000000..5f0b8f6fdd --- /dev/null +++ b/server/src/db/system_tables/columns.rs @@ -0,0 +1,404 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use arrow::array::{ArrayRef, StringBuilder, UInt32Builder, UInt64Builder}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::error::Result; +use arrow::record_batch::RecordBatch; + +use data_types::chunk_metadata::DetailedChunkSummary; +use data_types::error::ErrorLogger; +use data_types::partition_metadata::{PartitionSummary, TableSummary}; + +use crate::db::catalog::Catalog; +use crate::db::system_tables::IoxSystemTable; + +/// Implementation of `system.columns` system table +#[derive(Debug)] +pub(super) struct ColumnsTable { + schema: SchemaRef, + catalog: Arc, +} + +impl ColumnsTable { + pub(super) fn new(catalog: Arc) -> Self { + Self { + schema: partition_summaries_schema(), + catalog, + } + } +} + +impl IoxSystemTable for ColumnsTable { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + fn batch(&self) -> Result { + from_partition_summaries(self.schema(), self.catalog.partition_summaries()) + .log_if_error("system.columns table") + } +} + +fn partition_summaries_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("partition_key", DataType::Utf8, false), + Field::new("table_name", DataType::Utf8, false), + Field::new("column_name", DataType::Utf8, false), + Field::new("column_type", DataType::Utf8, false), + Field::new("influxdb_type", DataType::Utf8, true), + ])) +} + +fn from_partition_summaries( + schema: SchemaRef, + partitions: Vec, +) -> Result { + // Assume each partition has roughly 5 tables with 5 columns + let row_estimate = partitions.len() * 25; + + let mut partition_key = StringBuilder::new(row_estimate); + let mut table_name = StringBuilder::new(row_estimate); + let mut column_name = StringBuilder::new(row_estimate); + let mut column_type = StringBuilder::new(row_estimate); + let mut influxdb_type = StringBuilder::new(row_estimate); + + // Note no rows are produced for partitions with no tabes, or + // tables with no columns: There are other tables to list tables + // and columns + for partition in partitions { + let table = partition.table; + for column in table.columns { + partition_key.append_value(&partition.key)?; + table_name.append_value(&table.name)?; + column_name.append_value(&column.name)?; + column_type.append_value(column.type_name())?; + if let Some(t) = &column.influxdb_type { + influxdb_type.append_value(t.as_str())?; + } else { + influxdb_type.append_null()?; + } + } + } + + RecordBatch::try_new( + schema, + vec![ + Arc::new(partition_key.finish()) as ArrayRef, + Arc::new(table_name.finish()), + Arc::new(column_name.finish()), + Arc::new(column_type.finish()), + Arc::new(influxdb_type.finish()), + ], + ) +} + +/// Implementation of system.column_chunks table +#[derive(Debug)] +pub(super) struct ChunkColumnsTable { + schema: SchemaRef, + catalog: Arc, +} + +impl ChunkColumnsTable { + pub(super) fn new(catalog: Arc) -> Self { + Self { + schema: chunk_columns_schema(), + catalog, + } + } +} + +impl IoxSystemTable for ChunkColumnsTable { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn batch(&self) -> Result { + assemble_chunk_columns(self.schema(), self.catalog.detailed_chunk_summaries()) + .log_if_error("system.column_chunks table") + } +} + +fn 
chunk_columns_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("partition_key", DataType::Utf8, false), + Field::new("chunk_id", DataType::UInt32, false), + Field::new("table_name", DataType::Utf8, false), + Field::new("column_name", DataType::Utf8, false), + Field::new("storage", DataType::Utf8, false), + Field::new("row_count", DataType::UInt64, true), + Field::new("min_value", DataType::Utf8, true), + Field::new("max_value", DataType::Utf8, true), + Field::new("memory_bytes", DataType::UInt64, true), + ])) +} + +fn assemble_chunk_columns( + schema: SchemaRef, + chunk_summaries: Vec<(Arc, DetailedChunkSummary)>, +) -> Result { + /// Builds an index from column_name -> size + fn make_column_index(summary: &DetailedChunkSummary) -> HashMap<&str, u64> { + summary + .columns + .iter() + .map(|column_summary| { + ( + column_summary.name.as_ref(), + column_summary.memory_bytes as u64, + ) + }) + .collect() + } + + // Assume each chunk has roughly 5 columns + let row_estimate = chunk_summaries.len() * 5; + + let mut partition_key = StringBuilder::new(row_estimate); + let mut chunk_id = UInt32Builder::new(row_estimate); + let mut table_name = StringBuilder::new(row_estimate); + let mut column_name = StringBuilder::new(row_estimate); + let mut storage = StringBuilder::new(row_estimate); + let mut row_count = UInt64Builder::new(row_estimate); + let mut min_values = StringBuilder::new(row_estimate); + let mut max_values = StringBuilder::new(row_estimate); + let mut memory_bytes = UInt64Builder::new(row_estimate); + + // Note no rows are produced for partitions with no chunks, or + // tables with no partitions: There are other tables to list tables + // and columns + for (table_summary, chunk_summary) in chunk_summaries { + let mut column_index = make_column_index(&chunk_summary); + let storage_value = chunk_summary.inner.storage.as_str(); + + for column in &table_summary.columns { + partition_key.append_value(chunk_summary.inner.partition_key.as_ref())?; + chunk_id.append_value(chunk_summary.inner.id)?; + table_name.append_value(&chunk_summary.inner.table_name)?; + column_name.append_value(&column.name)?; + storage.append_value(storage_value)?; + row_count.append_value(column.count())?; + if let Some(v) = column.stats.min_as_str() { + min_values.append_value(v)?; + } else { + min_values.append(false)?; + } + if let Some(v) = column.stats.max_as_str() { + max_values.append_value(v)?; + } else { + max_values.append(false)?; + } + + let size = column_index.remove(column.name.as_str()); + + memory_bytes.append_option(size)?; + } + } + + RecordBatch::try_new( + schema, + vec![ + Arc::new(partition_key.finish()) as ArrayRef, + Arc::new(chunk_id.finish()), + Arc::new(table_name.finish()), + Arc::new(column_name.finish()), + Arc::new(storage.finish()), + Arc::new(row_count.finish()), + Arc::new(min_values.finish()), + Arc::new(max_values.finish()), + Arc::new(memory_bytes.finish()), + ], + ) +} + +#[cfg(test)] +mod tests { + use arrow_util::assert_batches_eq; + use data_types::chunk_metadata::{ChunkColumnSummary, ChunkStorage, ChunkSummary}; + use data_types::partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics}; + + use super::*; + + #[test] + fn test_from_partition_summaries() { + let partitions = vec![ + PartitionSummary { + key: "p1".to_string(), + table: TableSummary { + name: "t1".to_string(), + columns: vec![ + ColumnSummary { + name: "c1".to_string(), + influxdb_type: Some(InfluxDbType::Tag), + stats: Statistics::I64(StatValues::new_with_value(23)), + }, + 
ColumnSummary { + name: "c2".to_string(), + influxdb_type: Some(InfluxDbType::Field), + stats: Statistics::I64(StatValues::new_with_value(43)), + }, + ColumnSummary { + name: "c3".to_string(), + influxdb_type: None, + stats: Statistics::String(StatValues::new_with_value( + "foo".to_string(), + )), + }, + ColumnSummary { + name: "time".to_string(), + influxdb_type: Some(InfluxDbType::Timestamp), + stats: Statistics::I64(StatValues::new_with_value(43)), + }, + ], + }, + }, + PartitionSummary { + key: "p3".to_string(), + table: TableSummary { + name: "t1".to_string(), + columns: vec![], + }, + }, + ]; + + let expected = vec![ + "+---------------+------------+-------------+-------------+---------------+", + "| partition_key | table_name | column_name | column_type | influxdb_type |", + "+---------------+------------+-------------+-------------+---------------+", + "| p1 | t1 | c1 | I64 | Tag |", + "| p1 | t1 | c2 | I64 | Field |", + "| p1 | t1 | c3 | String | |", + "| p1 | t1 | time | I64 | Timestamp |", + "+---------------+------------+-------------+-------------+---------------+", + ]; + + let batch = from_partition_summaries(partition_summaries_schema(), partitions).unwrap(); + assert_batches_eq!(&expected, &[batch]); + } + + #[test] + fn test_assemble_chunk_columns() { + let lifecycle_action = None; + + let summaries = vec![ + ( + Arc::new(TableSummary { + name: "t1".to_string(), + columns: vec![ + ColumnSummary { + name: "c1".to_string(), + influxdb_type: Some(InfluxDbType::Field), + stats: Statistics::String(StatValues::new( + Some("bar".to_string()), + Some("foo".to_string()), + 55, + )), + }, + ColumnSummary { + name: "c2".to_string(), + influxdb_type: Some(InfluxDbType::Field), + stats: Statistics::F64(StatValues::new(Some(11.0), Some(43.0), 66)), + }, + ], + }), + DetailedChunkSummary { + inner: ChunkSummary { + partition_key: "p1".into(), + table_name: "t1".into(), + id: 42, + storage: ChunkStorage::ReadBuffer, + lifecycle_action, + memory_bytes: 23754, + object_store_bytes: 0, + row_count: 11, + time_of_first_write: None, + time_of_last_write: None, + time_closed: None, + }, + columns: vec![ + ChunkColumnSummary { + name: "c1".into(), + memory_bytes: 11, + }, + ChunkColumnSummary { + name: "c2".into(), + memory_bytes: 12, + }, + ], + }, + ), + ( + Arc::new(TableSummary { + name: "t1".to_string(), + columns: vec![ColumnSummary { + name: "c1".to_string(), + influxdb_type: Some(InfluxDbType::Field), + stats: Statistics::F64(StatValues::new(Some(110.0), Some(430.0), 667)), + }], + }), + DetailedChunkSummary { + inner: ChunkSummary { + partition_key: "p2".into(), + table_name: "t1".into(), + id: 43, + storage: ChunkStorage::OpenMutableBuffer, + lifecycle_action, + memory_bytes: 23754, + object_store_bytes: 0, + row_count: 11, + time_of_first_write: None, + time_of_last_write: None, + time_closed: None, + }, + columns: vec![ChunkColumnSummary { + name: "c1".into(), + memory_bytes: 100, + }], + }, + ), + ( + Arc::new(TableSummary { + name: "t2".to_string(), + columns: vec![ColumnSummary { + name: "c3".to_string(), + influxdb_type: Some(InfluxDbType::Field), + stats: Statistics::F64(StatValues::new(Some(-1.0), Some(2.0), 4)), + }], + }), + DetailedChunkSummary { + inner: ChunkSummary { + partition_key: "p2".into(), + table_name: "t2".into(), + id: 44, + storage: ChunkStorage::OpenMutableBuffer, + lifecycle_action, + memory_bytes: 23754, + object_store_bytes: 0, + row_count: 11, + time_of_first_write: None, + time_of_last_write: None, + time_closed: None, + }, + columns: 
vec![ChunkColumnSummary { + name: "c3".into(), + memory_bytes: 200, + }], + }, + ), + ]; + + let expected = vec![ + "+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+--------------+", + "| partition_key | chunk_id | table_name | column_name | storage | row_count | min_value | max_value | memory_bytes |", + "+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+--------------+", + "| p1 | 42 | t1 | c1 | ReadBuffer | 55 | bar | foo | 11 |", + "| p1 | 42 | t1 | c2 | ReadBuffer | 66 | 11 | 43 | 12 |", + "| p2 | 43 | t1 | c1 | OpenMutableBuffer | 667 | 110 | 430 | 100 |", + "| p2 | 44 | t2 | c3 | OpenMutableBuffer | 4 | -1 | 2 | 200 |", + "+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+--------------+", + ]; + + let batch = assemble_chunk_columns(chunk_columns_schema(), summaries).unwrap(); + assert_batches_eq!(&expected, &[batch]); + } +} diff --git a/server/src/db/system_tables/operations.rs b/server/src/db/system_tables/operations.rs new file mode 100644 index 0000000000..d8b2af0ac2 --- /dev/null +++ b/server/src/db/system_tables/operations.rs @@ -0,0 +1,108 @@ +use std::sync::Arc; + +use arrow::array::{ArrayRef, StringArray, Time64NanosecondArray, UInt32Array}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; +use arrow::error::Result; +use arrow::record_batch::RecordBatch; + +use data_types::error::ErrorLogger; +use data_types::job::Job; +use tracker::TaskTracker; + +use crate::db::system_tables::IoxSystemTable; +use crate::JobRegistry; + +/// Implementation of system.operations table +#[derive(Debug)] +pub(super) struct OperationsTable { + schema: SchemaRef, + db_name: String, + jobs: Arc, +} + +impl OperationsTable { + pub(super) fn new(db_name: String, jobs: Arc) -> Self { + Self { + schema: operations_schema(), + db_name, + jobs, + } + } +} + +impl IoxSystemTable for OperationsTable { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn batch(&self) -> Result { + from_task_trackers(self.schema(), &self.db_name, self.jobs.tracked()) + .log_if_error("system.operations table") + } +} + +fn operations_schema() -> SchemaRef { + let ts = DataType::Time64(TimeUnit::Nanosecond); + Arc::new(Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("status", DataType::Utf8, true), + Field::new("cpu_time_used", ts.clone(), true), + Field::new("wall_time_used", ts, true), + Field::new("partition_key", DataType::Utf8, true), + Field::new("chunk_id", DataType::UInt32, true), + Field::new("description", DataType::Utf8, true), + ])) +} + +fn from_task_trackers( + schema: SchemaRef, + db_name: &str, + jobs: Vec>, +) -> Result { + let jobs = jobs + .into_iter() + .filter(|job| job.metadata().db_name() == Some(db_name)) + .collect::>(); + + let ids = jobs + .iter() + .map(|job| Some(job.id().to_string())) + .collect::(); + let statuses = jobs + .iter() + .map(|job| Some(job.get_status().name())) + .collect::(); + let cpu_time_used = jobs + .iter() + .map(|job| job.get_status().cpu_nanos().map(|n| n as i64)) + .collect::(); + let wall_time_used = jobs + .iter() + .map(|job| job.get_status().wall_nanos().map(|n| n as i64)) + .collect::(); + let partition_keys = jobs + .iter() + .map(|job| job.metadata().partition_key()) + .collect::(); + let chunk_ids = jobs + .iter() + .map(|job| job.metadata().chunk_id()) + .collect::(); + let descriptions = jobs + .iter() + .map(|job| 
Some(job.metadata().description()))
+        .collect::<StringArray>();
+
+    RecordBatch::try_new(
+        schema,
+        vec![
+            Arc::new(ids) as ArrayRef,
+            Arc::new(statuses),
+            Arc::new(cpu_time_used),
+            Arc::new(wall_time_used),
+            Arc::new(partition_keys),
+            Arc::new(chunk_ids),
+            Arc::new(descriptions),
+        ],
+    )
+}
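
All four tables this patch moves into submodules follow the same small pattern: hold a SchemaRef, snapshot some in-memory state into a single RecordBatch on demand, and let SystemTableProvider adapt that pair into a DataFusion TableProvider. Below is a minimal standalone sketch of that pattern; the IoxSystemTable trait definition is not part of this diff, so its shape here is inferred from the impl blocks above, and DemoTable and its columns are invented for illustration.

use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray, UInt32Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

/// Trait shape inferred from the `impl IoxSystemTable for ...` blocks in this
/// patch; the real trait may carry additional bounds (e.g. Send + Sync).
trait IoxSystemTable {
    fn schema(&self) -> SchemaRef;
    fn batch(&self) -> Result<RecordBatch>;
}

/// A made-up system table used only to illustrate the pattern.
#[derive(Debug)]
struct DemoTable {
    schema: SchemaRef,
}

impl DemoTable {
    fn new() -> Self {
        // The schema is built once and cheaply cloned on every scan.
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::UInt32, false),
            Field::new("state", DataType::Utf8, false),
        ]));
        Self { schema }
    }
}

impl IoxSystemTable for DemoTable {
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    // Snapshot current state into one RecordBatch, the same kind of work
    // ChunksTable::batch delegates to from_chunk_summaries.
    fn batch(&self) -> Result<RecordBatch> {
        let ids = UInt32Array::from(vec![0u32, 1]);
        let states = StringArray::from(vec!["open", "closed"]);
        RecordBatch::try_new(
            self.schema(),
            vec![Arc::new(ids) as ArrayRef, Arc::new(states)],
        )
    }
}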
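
The constructors moved here build their RecordBatch in one of two ways: from_chunk_summaries collects iterators of Option values directly into typed arrays (a None becomes a null slot), while from_partition_summaries and assemble_chunk_columns pre-size builders with a row estimate and append row by row. The standalone sketch below shows both styles side by side; the builder calls with a capacity argument and a Result return match the arrow release this patch targets, and the data is invented.

use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray, StringBuilder, UInt64Array, UInt64Builder};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

fn two_building_styles() -> Result<RecordBatch> {
    // Invented rows: (partition key, optional size in bytes).
    let rows = vec![("p1", Some(10u64)), ("p2", None)];

    // Style 1: collect Option iterators straight into typed arrays.
    let keys = rows.iter().map(|(k, _)| Some(*k)).collect::<StringArray>();
    let sizes = rows.iter().map(|(_, s)| *s).collect::<UInt64Array>();

    // Style 2: capacity-hinted builders, appending one row at a time.
    let mut keys_b = StringBuilder::new(rows.len());
    let mut sizes_b = UInt64Builder::new(rows.len());
    for (k, s) in &rows {
        keys_b.append_value(*k)?;
        sizes_b.append_option(*s)?; // None appends a null
    }

    let schema = Arc::new(Schema::new(vec![
        Field::new("key", DataType::Utf8, false),
        Field::new("size", DataType::UInt64, true),
        Field::new("key_built", DataType::Utf8, false),
        Field::new("size_built", DataType::UInt64, true),
    ]));
    RecordBatch::try_new(
        schema,
        vec![
            Arc::new(keys) as ArrayRef,
            Arc::new(sizes),
            Arc::new(keys_b.finish()),
            Arc::new(sizes_b.finish()),
        ],
    )
}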
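
Both timestamp flavours in these schemas are plain i64 nanoseconds underneath: the chunk tables expose wall-clock instants as Timestamp(Nanosecond) columns fed through time_to_ts, while system.operations stores cpu/wall durations as Time64(Nanosecond). The small helper below mirrors time_to_ts and shows how the resulting options collect into a nullable array, using the same chrono calls the tests above already use.

use arrow::array::TimestampNanosecondArray;
use chrono::{DateTime, NaiveDateTime, Utc};

/// Same conversion as `time_to_ts`: an absent time becomes a null slot.
fn to_nanos(time: Option<DateTime<Utc>>) -> Option<i64> {
    time.map(|ts| ts.timestamp_nanos())
}

fn demo_times() -> TimestampNanosecondArray {
    let times = vec![
        // 10 seconds after the epoch, as in test_from_chunk_summaries
        Some(DateTime::from_utc(NaiveDateTime::from_timestamp(10, 0), Utc)),
        None,
    ];
    times.into_iter().map(to_nanos).collect()
}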
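
system_tables.rs keeps the shared plumbing, including scan_batch, which turns the single RecordBatch produced by a table into a DataFusion ExecutionPlan. Its body lies outside the hunks shown above, so the following is only a sketch of that shape using the MemoryExec import the module already has; scan_one_batch and its projection handling are illustrative, not the actual implementation.

use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion::error::Result as DataFusionResult;
use datafusion::physical_plan::{memory::MemoryExec, ExecutionPlan};

/// Wrap one batch in a MemoryExec: a single partition holding a single batch,
/// with an optional column projection applied by DataFusion.
fn scan_one_batch(
    batch: RecordBatch,
    schema: SchemaRef,
    projection: &Option<Vec<usize>>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
    let exec = MemoryExec::try_new(&[vec![batch]], schema, projection.clone())?;
    Ok(Arc::new(exec))
}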
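
The moved tests all follow one style: build the batch, then compare its pretty-printed form line by line with assert_batches_eq! from the workspace's arrow_util test-support crate. A tiny self-contained version of that style, using an invented one-column batch:

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{ArrayRef, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use arrow_util::assert_batches_eq;

    fn demo_batch() -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)]));
        let ids = StringArray::from(vec!["a", "b"]);
        RecordBatch::try_new(schema, vec![Arc::new(ids) as ArrayRef]).unwrap()
    }

    #[test]
    fn pretty_printed_comparison() {
        let expected = vec![
            "+----+",
            "| id |",
            "+----+",
            "| a  |",
            "| b  |",
            "+----+",
        ];
        assert_batches_eq!(&expected, &[demo_batch()]);
    }
}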