feat: add PersistenceWindows sytem table (#2030) (#2062)

* feat: add PersistenceWindows sytem table (#2030)

* chore: update log

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Raphael Taylor-Davies 2021-07-20 14:10:57 +01:00 committed by GitHub
parent e4d2c51e8b
commit 091837420f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 220 additions and 36 deletions

View File

@ -1,25 +1,27 @@
-- Test Setup: OneMeasurementAllChunksDropped
-- SQL: SELECT * from information_schema.tables;
+---------------+--------------------+---------------+------------+
| table_catalog | table_schema | table_name | table_type |
+---------------+--------------------+---------------+------------+
| public | iox | h2o | BASE TABLE |
| public | system | chunks | BASE TABLE |
| public | system | columns | BASE TABLE |
| public | system | chunk_columns | BASE TABLE |
| public | system | operations | BASE TABLE |
| public | information_schema | tables | VIEW |
| public | information_schema | columns | VIEW |
+---------------+--------------------+---------------+------------+
+---------------+--------------------+---------------------+------------+
| table_catalog | table_schema | table_name | table_type |
+---------------+--------------------+---------------------+------------+
| public | iox | h2o | BASE TABLE |
| public | system | chunks | BASE TABLE |
| public | system | columns | BASE TABLE |
| public | system | chunk_columns | BASE TABLE |
| public | system | operations | BASE TABLE |
| public | system | persistence_windows | BASE TABLE |
| public | information_schema | tables | VIEW |
| public | information_schema | columns | VIEW |
+---------------+--------------------+---------------------+------------+
-- SQL: SHOW TABLES;
+---------------+--------------------+---------------+------------+
| table_catalog | table_schema | table_name | table_type |
+---------------+--------------------+---------------+------------+
| public | iox | h2o | BASE TABLE |
| public | system | chunks | BASE TABLE |
| public | system | columns | BASE TABLE |
| public | system | chunk_columns | BASE TABLE |
| public | system | operations | BASE TABLE |
| public | information_schema | tables | VIEW |
| public | information_schema | columns | VIEW |
+---------------+--------------------+---------------+------------+
+---------------+--------------------+---------------------+------------+
| table_catalog | table_schema | table_name | table_type |
+---------------+--------------------+---------------------+------------+
| public | iox | h2o | BASE TABLE |
| public | system | chunks | BASE TABLE |
| public | system | columns | BASE TABLE |
| public | system | chunk_columns | BASE TABLE |
| public | system | operations | BASE TABLE |
| public | system | persistence_windows | BASE TABLE |
| public | information_schema | tables | VIEW |
| public | information_schema | columns | VIEW |
+---------------+--------------------+---------------------+------------+

View File

@ -184,18 +184,19 @@ async fn sql_select_from_information_schema_tables() {
// validate we have access to information schema for listing table
// names
let expected = vec![
"+---------------+--------------------+---------------+------------+",
"| table_catalog | table_schema | table_name | table_type |",
"+---------------+--------------------+---------------+------------+",
"| public | information_schema | columns | VIEW |",
"| public | information_schema | tables | VIEW |",
"| public | iox | h2o | BASE TABLE |",
"| public | iox | o2 | BASE TABLE |",
"| public | system | chunk_columns | BASE TABLE |",
"| public | system | chunks | BASE TABLE |",
"| public | system | columns | BASE TABLE |",
"| public | system | operations | BASE TABLE |",
"+---------------+--------------------+---------------+------------+",
"+---------------+--------------------+---------------------+------------+",
"| table_catalog | table_schema | table_name | table_type |",
"+---------------+--------------------+---------------------+------------+",
"| public | information_schema | columns | VIEW |",
"| public | information_schema | tables | VIEW |",
"| public | iox | h2o | BASE TABLE |",
"| public | iox | o2 | BASE TABLE |",
"| public | system | chunk_columns | BASE TABLE |",
"| public | system | chunks | BASE TABLE |",
"| public | system | columns | BASE TABLE |",
"| public | system | operations | BASE TABLE |",
"| public | system | persistence_windows | BASE TABLE |",
"+---------------+--------------------+---------------------+------------+",
];
run_sql_test_case!(
TwoMeasurementsManyFields {},

View File

@ -6,7 +6,7 @@ use hashbrown::{HashMap, HashSet};
use data_types::chunk_metadata::ChunkSummary;
use data_types::chunk_metadata::DetailedChunkSummary;
use data_types::partition_metadata::{PartitionSummary, TableSummary};
use data_types::partition_metadata::{PartitionAddr, PartitionSummary, TableSummary};
use internal_types::schema::Schema;
use snafu::{OptionExt, Snafu};
use tracker::{MappedRwLockReadGuard, RwLock, RwLockReadGuard};
@ -15,6 +15,7 @@ use self::chunk::CatalogChunk;
use self::metrics::CatalogMetrics;
use self::partition::Partition;
use self::table::Table;
use data_types::write_summary::WriteSummary;
pub mod chunk;
mod metrics;
@ -225,6 +226,23 @@ impl Catalog {
.collect()
}
/// Returns a list of persistence window summaries for each partition
pub fn persistence_summaries(&self) -> Vec<(PartitionAddr, WriteSummary)> {
let mut summaries = Vec::new();
let tables = self.tables.read();
for table in tables.values() {
for partition in table.partitions() {
let partition = partition.read();
if let Some(w) = partition.persistence_windows() {
for summary in w.summaries() {
summaries.push((partition.addr().clone(), summary))
}
}
}
}
summaries
}
pub fn chunk_summaries(&self) -> Vec<ChunkSummary> {
let partition_key = None;
let table_names = TableNameFilter::AllTables;

View File

@ -31,6 +31,7 @@ use super::catalog::Catalog;
mod chunks;
mod columns;
mod operations;
mod persistence;
// The IOx system schema
pub const SYSTEM_SCHEMA: &str = "system";
@ -39,12 +40,14 @@ const CHUNKS: &str = "chunks";
const COLUMNS: &str = "columns";
const CHUNK_COLUMNS: &str = "chunk_columns";
const OPERATIONS: &str = "operations";
const PERSISTENCE_WINDOWS: &str = "persistence_windows";
pub struct SystemSchemaProvider {
chunks: Arc<dyn TableProvider>,
columns: Arc<dyn TableProvider>,
chunk_columns: Arc<dyn TableProvider>,
operations: Arc<dyn TableProvider>,
persistence_windows: Arc<dyn TableProvider>,
}
impl std::fmt::Debug for SystemSchemaProvider {
@ -65,16 +68,20 @@ impl SystemSchemaProvider {
inner: columns::ColumnsTable::new(Arc::clone(&catalog)),
});
let chunk_columns = Arc::new(SystemTableProvider {
inner: columns::ChunkColumnsTable::new(catalog),
inner: columns::ChunkColumnsTable::new(Arc::clone(&catalog)),
});
let operations = Arc::new(SystemTableProvider {
inner: operations::OperationsTable::new(db_name, jobs),
});
let persistence_windows = Arc::new(SystemTableProvider {
inner: persistence::PersistenceWindowsTable::new(catalog),
});
Self {
chunks,
columns,
chunk_columns,
operations,
persistence_windows,
}
}
}
@ -90,6 +97,7 @@ impl SchemaProvider for SystemSchemaProvider {
COLUMNS.to_string(),
CHUNK_COLUMNS.to_string(),
OPERATIONS.to_string(),
PERSISTENCE_WINDOWS.to_string(),
]
}
@ -99,6 +107,7 @@ impl SchemaProvider for SystemSchemaProvider {
COLUMNS => Some(Arc::clone(&self.columns)),
CHUNK_COLUMNS => Some(Arc::clone(&self.chunk_columns)),
OPERATIONS => Some(Arc::clone(&self.operations)),
PERSISTENCE_WINDOWS => Some(Arc::clone(&self.persistence_windows)),
_ => None,
}
}

View File

@ -0,0 +1,154 @@
use std::sync::Arc;
use arrow::array::{StringArray, TimestampNanosecondArray, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use arrow::error::Result;
use arrow::record_batch::RecordBatch;
use data_types::error::ErrorLogger;
use data_types::partition_metadata::PartitionAddr;
use data_types::write_summary::WriteSummary;
use crate::db::catalog::Catalog;
use crate::db::system_tables::IoxSystemTable;
/// Implementation of system.persistence_windows table
#[derive(Debug)]
pub(super) struct PersistenceWindowsTable {
schema: SchemaRef,
catalog: Arc<Catalog>,
}
impl PersistenceWindowsTable {
pub(super) fn new(catalog: Arc<Catalog>) -> Self {
Self {
schema: persistence_windows_schema(),
catalog,
}
}
}
impl IoxSystemTable for PersistenceWindowsTable {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
fn batch(&self) -> Result<RecordBatch> {
from_write_summaries(self.schema(), self.catalog.persistence_summaries())
.log_if_error("system.persistence_windows table")
}
}
fn persistence_windows_schema() -> SchemaRef {
let ts = DataType::Timestamp(TimeUnit::Nanosecond, None);
Arc::new(Schema::new(vec![
Field::new("partition_key", DataType::Utf8, false),
Field::new("table_name", DataType::Utf8, false),
Field::new("row_count", DataType::UInt64, false),
Field::new("time_of_first_write", ts.clone(), false),
Field::new("time_of_last_write", ts.clone(), false),
Field::new("min_timestamp", ts.clone(), false),
Field::new("max_timestamp", ts, false),
]))
}
fn from_write_summaries(
schema: SchemaRef,
chunks: Vec<(PartitionAddr, WriteSummary)>,
) -> Result<RecordBatch> {
let partition_key = chunks
.iter()
.map(|(addr, _)| Some(addr.partition_key.as_ref()))
.collect::<StringArray>();
let table_name = chunks
.iter()
.map(|(addr, _)| Some(addr.table_name.as_ref()))
.collect::<StringArray>();
let row_counts = chunks
.iter()
.map(|(_, w)| Some(w.row_count as u64))
.collect::<UInt64Array>();
let time_of_first_write = chunks
.iter()
.map(|(_, w)| Some(w.time_of_first_write.timestamp_nanos()))
.collect::<TimestampNanosecondArray>();
let time_of_last_write = chunks
.iter()
.map(|(_, w)| Some(w.time_of_last_write.timestamp_nanos()))
.collect::<TimestampNanosecondArray>();
let min_timestamp = chunks
.iter()
.map(|(_, w)| Some(w.min_timestamp.timestamp_nanos()))
.collect::<TimestampNanosecondArray>();
let max_timestamp = chunks
.iter()
.map(|(_, w)| Some(w.max_timestamp.timestamp_nanos()))
.collect::<TimestampNanosecondArray>();
RecordBatch::try_new(
schema,
vec![
Arc::new(partition_key),
Arc::new(table_name),
Arc::new(row_counts),
Arc::new(time_of_first_write),
Arc::new(time_of_last_write),
Arc::new(min_timestamp),
Arc::new(max_timestamp),
],
)
}
#[cfg(test)]
mod tests {
use chrono::{TimeZone, Utc};
use arrow_util::assert_batches_eq;
use super::*;
#[test]
fn test_from_write_summaries() {
let addr = PartitionAddr {
db_name: Arc::from("db"),
table_name: Arc::from("table"),
partition_key: Arc::from("partition"),
};
let summaries = vec![
(
addr.clone(),
WriteSummary {
time_of_first_write: Utc.timestamp_nanos(0),
time_of_last_write: Utc.timestamp_nanos(20),
min_timestamp: Utc.timestamp_nanos(50),
max_timestamp: Utc.timestamp_nanos(60),
row_count: 320,
},
),
(
addr,
WriteSummary {
time_of_first_write: Utc.timestamp_nanos(6),
time_of_last_write: Utc.timestamp_nanos(21),
min_timestamp: Utc.timestamp_nanos(1),
max_timestamp: Utc.timestamp_nanos(2),
row_count: 2,
},
),
];
let expected = vec![
"+---------------+------------+-----------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+",
"| partition_key | table_name | row_count | time_of_first_write | time_of_last_write | min_timestamp | max_timestamp |",
"+---------------+------------+-----------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+",
"| partition | table | 320 | 1970-01-01 00:00:00 | 1970-01-01 00:00:00.000000020 | 1970-01-01 00:00:00.000000050 | 1970-01-01 00:00:00.000000060 |",
"| partition | table | 2 | 1970-01-01 00:00:00.000000006 | 1970-01-01 00:00:00.000000021 | 1970-01-01 00:00:00.000000001 | 1970-01-01 00:00:00.000000002 |",
"+---------------+------------+-----------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+",
];
let schema = persistence_windows_schema();
let batch = from_write_summaries(schema, summaries).unwrap();
assert_batches_eq!(&expected, &[batch]);
}
}