diff --git a/query_tests/cases/in/all_chunks_dropped.expected b/query_tests/cases/in/all_chunks_dropped.expected index 5febb4d2e9..65e17df50a 100644 --- a/query_tests/cases/in/all_chunks_dropped.expected +++ b/query_tests/cases/in/all_chunks_dropped.expected @@ -1,25 +1,27 @@ -- Test Setup: OneMeasurementAllChunksDropped -- SQL: SELECT * from information_schema.tables; -+---------------+--------------------+---------------+------------+ -| table_catalog | table_schema | table_name | table_type | -+---------------+--------------------+---------------+------------+ -| public | iox | h2o | BASE TABLE | -| public | system | chunks | BASE TABLE | -| public | system | columns | BASE TABLE | -| public | system | chunk_columns | BASE TABLE | -| public | system | operations | BASE TABLE | -| public | information_schema | tables | VIEW | -| public | information_schema | columns | VIEW | -+---------------+--------------------+---------------+------------+ ++---------------+--------------------+---------------------+------------+ +| table_catalog | table_schema | table_name | table_type | ++---------------+--------------------+---------------------+------------+ +| public | iox | h2o | BASE TABLE | +| public | system | chunks | BASE TABLE | +| public | system | columns | BASE TABLE | +| public | system | chunk_columns | BASE TABLE | +| public | system | operations | BASE TABLE | +| public | system | persistence_windows | BASE TABLE | +| public | information_schema | tables | VIEW | +| public | information_schema | columns | VIEW | ++---------------+--------------------+---------------------+------------+ -- SQL: SHOW TABLES; -+---------------+--------------------+---------------+------------+ -| table_catalog | table_schema | table_name | table_type | -+---------------+--------------------+---------------+------------+ -| public | iox | h2o | BASE TABLE | -| public | system | chunks | BASE TABLE | -| public | system | columns | BASE TABLE | -| public | system | chunk_columns | BASE TABLE | -| public | system | operations | BASE TABLE | -| public | information_schema | tables | VIEW | -| public | information_schema | columns | VIEW | -+---------------+--------------------+---------------+------------+ ++---------------+--------------------+---------------------+------------+ +| table_catalog | table_schema | table_name | table_type | ++---------------+--------------------+---------------------+------------+ +| public | iox | h2o | BASE TABLE | +| public | system | chunks | BASE TABLE | +| public | system | columns | BASE TABLE | +| public | system | chunk_columns | BASE TABLE | +| public | system | operations | BASE TABLE | +| public | system | persistence_windows | BASE TABLE | +| public | information_schema | tables | VIEW | +| public | information_schema | columns | VIEW | ++---------------+--------------------+---------------------+------------+ diff --git a/query_tests/src/sql.rs b/query_tests/src/sql.rs index 58072c5a97..6fc2c13550 100644 --- a/query_tests/src/sql.rs +++ b/query_tests/src/sql.rs @@ -184,18 +184,19 @@ async fn sql_select_from_information_schema_tables() { // validate we have access to information schema for listing table // names let expected = vec![ - "+---------------+--------------------+---------------+------------+", - "| table_catalog | table_schema | table_name | table_type |", - "+---------------+--------------------+---------------+------------+", - "| public | information_schema | columns | VIEW |", - "| public | information_schema | tables | VIEW |", - "| public | iox | h2o | BASE TABLE |", - "| public | iox | o2 | BASE TABLE |", - "| public | system | chunk_columns | BASE TABLE |", - "| public | system | chunks | BASE TABLE |", - "| public | system | columns | BASE TABLE |", - "| public | system | operations | BASE TABLE |", - "+---------------+--------------------+---------------+------------+", + "+---------------+--------------------+---------------------+------------+", + "| table_catalog | table_schema | table_name | table_type |", + "+---------------+--------------------+---------------------+------------+", + "| public | information_schema | columns | VIEW |", + "| public | information_schema | tables | VIEW |", + "| public | iox | h2o | BASE TABLE |", + "| public | iox | o2 | BASE TABLE |", + "| public | system | chunk_columns | BASE TABLE |", + "| public | system | chunks | BASE TABLE |", + "| public | system | columns | BASE TABLE |", + "| public | system | operations | BASE TABLE |", + "| public | system | persistence_windows | BASE TABLE |", + "+---------------+--------------------+---------------------+------------+", ]; run_sql_test_case!( TwoMeasurementsManyFields {}, diff --git a/server/src/db/catalog.rs b/server/src/db/catalog.rs index c2953a3de6..dff3c37b6b 100644 --- a/server/src/db/catalog.rs +++ b/server/src/db/catalog.rs @@ -6,7 +6,7 @@ use hashbrown::{HashMap, HashSet}; use data_types::chunk_metadata::ChunkSummary; use data_types::chunk_metadata::DetailedChunkSummary; -use data_types::partition_metadata::{PartitionSummary, TableSummary}; +use data_types::partition_metadata::{PartitionAddr, PartitionSummary, TableSummary}; use internal_types::schema::Schema; use snafu::{OptionExt, Snafu}; use tracker::{MappedRwLockReadGuard, RwLock, RwLockReadGuard}; @@ -15,6 +15,7 @@ use self::chunk::CatalogChunk; use self::metrics::CatalogMetrics; use self::partition::Partition; use self::table::Table; +use data_types::write_summary::WriteSummary; pub mod chunk; mod metrics; @@ -225,6 +226,23 @@ impl Catalog { .collect() } + /// Returns a list of persistence window summaries for each partition + pub fn persistence_summaries(&self) -> Vec<(PartitionAddr, WriteSummary)> { + let mut summaries = Vec::new(); + let tables = self.tables.read(); + for table in tables.values() { + for partition in table.partitions() { + let partition = partition.read(); + if let Some(w) = partition.persistence_windows() { + for summary in w.summaries() { + summaries.push((partition.addr().clone(), summary)) + } + } + } + } + summaries + } + pub fn chunk_summaries(&self) -> Vec { let partition_key = None; let table_names = TableNameFilter::AllTables; diff --git a/server/src/db/system_tables.rs b/server/src/db/system_tables.rs index f83c793fa5..bcc474e230 100644 --- a/server/src/db/system_tables.rs +++ b/server/src/db/system_tables.rs @@ -31,6 +31,7 @@ use super::catalog::Catalog; mod chunks; mod columns; mod operations; +mod persistence; // The IOx system schema pub const SYSTEM_SCHEMA: &str = "system"; @@ -39,12 +40,14 @@ const CHUNKS: &str = "chunks"; const COLUMNS: &str = "columns"; const CHUNK_COLUMNS: &str = "chunk_columns"; const OPERATIONS: &str = "operations"; +const PERSISTENCE_WINDOWS: &str = "persistence_windows"; pub struct SystemSchemaProvider { chunks: Arc, columns: Arc, chunk_columns: Arc, operations: Arc, + persistence_windows: Arc, } impl std::fmt::Debug for SystemSchemaProvider { @@ -65,16 +68,20 @@ impl SystemSchemaProvider { inner: columns::ColumnsTable::new(Arc::clone(&catalog)), }); let chunk_columns = Arc::new(SystemTableProvider { - inner: columns::ChunkColumnsTable::new(catalog), + inner: columns::ChunkColumnsTable::new(Arc::clone(&catalog)), }); let operations = Arc::new(SystemTableProvider { inner: operations::OperationsTable::new(db_name, jobs), }); + let persistence_windows = Arc::new(SystemTableProvider { + inner: persistence::PersistenceWindowsTable::new(catalog), + }); Self { chunks, columns, chunk_columns, operations, + persistence_windows, } } } @@ -90,6 +97,7 @@ impl SchemaProvider for SystemSchemaProvider { COLUMNS.to_string(), CHUNK_COLUMNS.to_string(), OPERATIONS.to_string(), + PERSISTENCE_WINDOWS.to_string(), ] } @@ -99,6 +107,7 @@ impl SchemaProvider for SystemSchemaProvider { COLUMNS => Some(Arc::clone(&self.columns)), CHUNK_COLUMNS => Some(Arc::clone(&self.chunk_columns)), OPERATIONS => Some(Arc::clone(&self.operations)), + PERSISTENCE_WINDOWS => Some(Arc::clone(&self.persistence_windows)), _ => None, } } diff --git a/server/src/db/system_tables/persistence.rs b/server/src/db/system_tables/persistence.rs new file mode 100644 index 0000000000..3392ff5032 --- /dev/null +++ b/server/src/db/system_tables/persistence.rs @@ -0,0 +1,154 @@ +use std::sync::Arc; + +use arrow::array::{StringArray, TimestampNanosecondArray, UInt64Array}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; +use arrow::error::Result; +use arrow::record_batch::RecordBatch; + +use data_types::error::ErrorLogger; +use data_types::partition_metadata::PartitionAddr; +use data_types::write_summary::WriteSummary; + +use crate::db::catalog::Catalog; +use crate::db::system_tables::IoxSystemTable; + +/// Implementation of system.persistence_windows table +#[derive(Debug)] +pub(super) struct PersistenceWindowsTable { + schema: SchemaRef, + catalog: Arc, +} + +impl PersistenceWindowsTable { + pub(super) fn new(catalog: Arc) -> Self { + Self { + schema: persistence_windows_schema(), + catalog, + } + } +} + +impl IoxSystemTable for PersistenceWindowsTable { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn batch(&self) -> Result { + from_write_summaries(self.schema(), self.catalog.persistence_summaries()) + .log_if_error("system.persistence_windows table") + } +} + +fn persistence_windows_schema() -> SchemaRef { + let ts = DataType::Timestamp(TimeUnit::Nanosecond, None); + Arc::new(Schema::new(vec![ + Field::new("partition_key", DataType::Utf8, false), + Field::new("table_name", DataType::Utf8, false), + Field::new("row_count", DataType::UInt64, false), + Field::new("time_of_first_write", ts.clone(), false), + Field::new("time_of_last_write", ts.clone(), false), + Field::new("min_timestamp", ts.clone(), false), + Field::new("max_timestamp", ts, false), + ])) +} + +fn from_write_summaries( + schema: SchemaRef, + chunks: Vec<(PartitionAddr, WriteSummary)>, +) -> Result { + let partition_key = chunks + .iter() + .map(|(addr, _)| Some(addr.partition_key.as_ref())) + .collect::(); + let table_name = chunks + .iter() + .map(|(addr, _)| Some(addr.table_name.as_ref())) + .collect::(); + let row_counts = chunks + .iter() + .map(|(_, w)| Some(w.row_count as u64)) + .collect::(); + let time_of_first_write = chunks + .iter() + .map(|(_, w)| Some(w.time_of_first_write.timestamp_nanos())) + .collect::(); + let time_of_last_write = chunks + .iter() + .map(|(_, w)| Some(w.time_of_last_write.timestamp_nanos())) + .collect::(); + let min_timestamp = chunks + .iter() + .map(|(_, w)| Some(w.min_timestamp.timestamp_nanos())) + .collect::(); + let max_timestamp = chunks + .iter() + .map(|(_, w)| Some(w.max_timestamp.timestamp_nanos())) + .collect::(); + + RecordBatch::try_new( + schema, + vec![ + Arc::new(partition_key), + Arc::new(table_name), + Arc::new(row_counts), + Arc::new(time_of_first_write), + Arc::new(time_of_last_write), + Arc::new(min_timestamp), + Arc::new(max_timestamp), + ], + ) +} + +#[cfg(test)] +mod tests { + use chrono::{TimeZone, Utc}; + + use arrow_util::assert_batches_eq; + + use super::*; + + #[test] + fn test_from_write_summaries() { + let addr = PartitionAddr { + db_name: Arc::from("db"), + table_name: Arc::from("table"), + partition_key: Arc::from("partition"), + }; + + let summaries = vec![ + ( + addr.clone(), + WriteSummary { + time_of_first_write: Utc.timestamp_nanos(0), + time_of_last_write: Utc.timestamp_nanos(20), + min_timestamp: Utc.timestamp_nanos(50), + max_timestamp: Utc.timestamp_nanos(60), + row_count: 320, + }, + ), + ( + addr, + WriteSummary { + time_of_first_write: Utc.timestamp_nanos(6), + time_of_last_write: Utc.timestamp_nanos(21), + min_timestamp: Utc.timestamp_nanos(1), + max_timestamp: Utc.timestamp_nanos(2), + row_count: 2, + }, + ), + ]; + + let expected = vec![ + "+---------------+------------+-----------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+", + "| partition_key | table_name | row_count | time_of_first_write | time_of_last_write | min_timestamp | max_timestamp |", + "+---------------+------------+-----------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+", + "| partition | table | 320 | 1970-01-01 00:00:00 | 1970-01-01 00:00:00.000000020 | 1970-01-01 00:00:00.000000050 | 1970-01-01 00:00:00.000000060 |", + "| partition | table | 2 | 1970-01-01 00:00:00.000000006 | 1970-01-01 00:00:00.000000021 | 1970-01-01 00:00:00.000000001 | 1970-01-01 00:00:00.000000002 |", + "+---------------+------------+-----------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+", + ]; + + let schema = persistence_windows_schema(); + let batch = from_write_summaries(schema, summaries).unwrap(); + assert_batches_eq!(&expected, &[batch]); + } +}