Merge branch 'main' into ntran/parquet_write_2

pull/24376/head
kodiakhq[bot] 2021-04-01 21:54:19 +00:00 committed by GitHub
commit eba799e4e7
6 changed files with 419 additions and 21 deletions

View File

@@ -10,7 +10,7 @@ use generated_types::{google::FieldViolation, influxdata::iox::management::v1 as
use serde::{Deserialize, Serialize};
/// Which storage system is a chunk located in?
-#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize)]
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Serialize, Deserialize)]
pub enum ChunkStorage {
/// The chunk is still open for new writes, in the Mutable Buffer
OpenMutableBuffer,
@@ -25,6 +25,18 @@ pub enum ChunkStorage {
ObjectStore,
}
+impl ChunkStorage {
+/// Return a str representation of this storage state
+pub fn as_str(&self) -> &'static str {
+match self {
+Self::OpenMutableBuffer => "OpenMutableBuffer",
+Self::ClosedMutableBuffer => "ClosedMutableBuffer",
+Self::ReadBuffer => "ReadBuffer",
+Self::ObjectStore => "ObjectStore",
+}
+}
+}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize)]
/// Represents metadata about a chunk in a database.
/// A chunk can contain one or more tables.
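A minimal sketch (not part of this commit) of what the two changes above buy: `Copy` is sound because `ChunkStorage` is a fieldless enum, and `as_str` provides the stable labels later used for the `storage` column of `system.chunks`.

use data_types::chunk::ChunkStorage;

fn main() {
    // assumes data_types::chunk::ChunkStorage as defined in the diff above
    let storage = ChunkStorage::OpenMutableBuffer;
    let copied = storage; // an implicit copy rather than a move, thanks to `Copy`
    assert_eq!(copied.as_str(), "OpenMutableBuffer");
    assert_eq!(storage.as_str(), "OpenMutableBuffer"); // original remains usable
}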

View File

@@ -27,16 +27,22 @@ use parquet_file::chunk::{Chunk, MemWriter};
use query::{Database, DEFAULT_SCHEMA};
use read_buffer::Database as ReadBufferDb;
-use crate::db::lifecycle::LifecycleManager;
-use crate::tracker::{TrackedFutureExt, Tracker};
-use crate::{buffer::Buffer, JobRegistry};
+use super::{
+buffer::Buffer,
+tracker::{TrackedFutureExt, Tracker},
+JobRegistry,
+};
+use data_types::job::Job;
+use lifecycle::LifecycleManager;
+use system_tables::{SystemSchemaProvider, SYSTEM_SCHEMA};
pub mod catalog;
mod chunk;
mod lifecycle;
pub mod pred;
mod streams;
mod system_tables;
#[derive(Debug, Snafu)]
pub enum Error {
@@ -132,7 +138,6 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
const STARTING_SEQUENCE: u64 = 1;
-#[derive(Debug)]
/// This is the main IOx Database object. It is the root object of any
/// specific InfluxDB IOx instance
///
@@ -188,6 +193,7 @@ const STARTING_SEQUENCE: u64 = 1;
/// manipulating the catalog state alongside the state in the `Db`
/// itself. The catalog state can be observed (but not mutated) by things
/// outside of the Db
+#[derive(Debug)]
pub struct Db {
pub rules: RwLock<DatabaseRules>,
@@ -206,6 +212,9 @@ pub struct Db {
/// A handle to the global jobs registry for long running tasks
jobs: Arc<JobRegistry>,
+/// The system schema provider
+system_tables: Arc<SystemSchemaProvider>,
sequence: AtomicU64,
worker_iterations: AtomicUsize,
@@ -222,12 +231,14 @@ impl Db {
let wal_buffer = wal_buffer.map(Mutex::new);
let read_buffer = Arc::new(read_buffer);
let catalog = Arc::new(Catalog::new());
+let system_tables = Arc::new(SystemSchemaProvider::new(Arc::clone(&catalog)));
Self {
rules,
catalog,
read_buffer,
wal_buffer,
jobs,
+system_tables,
sequence: AtomicU64::new(STARTING_SEQUENCE),
worker_iterations: AtomicUsize::new(0),
}
@@ -698,13 +709,17 @@ impl CatalogProvider for Db {
}
fn schema_names(&self) -> Vec<String> {
-vec![DEFAULT_SCHEMA.to_string()]
+vec![
+DEFAULT_SCHEMA.to_string(),
+system_tables::SYSTEM_SCHEMA.to_string(),
+]
}
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
info!(%name, "using schema");
match name {
DEFAULT_SCHEMA => Some(Arc::<Catalog>::clone(&self.catalog)),
+SYSTEM_SCHEMA => Some(Arc::<SystemSchemaProvider>::clone(&self.system_tables)),
_ => None,
}
}
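A sketch (not part of this commit) of what the wiring above enables, assuming the `Db` built earlier and that `DEFAULT_SCHEMA` is "iox", as the information_schema output further below suggests:

fn check_schema_wiring(db: &Db) {
    // both schemas are now advertised to DataFusion
    assert_eq!(
        db.schema_names(),
        vec!["iox".to_string(), "system".to_string()]
    );
    // "system" resolves to the SystemSchemaProvider instead of None, which is
    // what lets a query like `SELECT * FROM system.chunks` be planned
    let system = db.schema("system").expect("system schema registered");
    assert_eq!(
        system.table_names(),
        vec!["chunks".to_string(), "columns".to_string()]
    );
}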

View File

@@ -13,6 +13,7 @@ use chunk::Chunk;
use data_types::chunk::ChunkSummary;
use data_types::database_rules::{Order, Sort, SortOrder};
use data_types::error::ErrorLogger;
+use data_types::partition_metadata::PartitionSummary;
use internal_types::selection::Selection;
use partition::Partition;
use query::{
@@ -123,6 +124,15 @@ impl Catalog {
.context(UnknownPartition { partition_key })
}
+/// Returns a list of partition summaries
+pub fn partition_summaries(&self) -> Vec<PartitionSummary> {
+self.partitions
+.read()
+.values()
+.map(|partition| partition.read().summary())
+.collect()
+}
/// Returns all chunks within the catalog in an arbitrary order
pub fn chunks(&self) -> Vec<Arc<RwLock<Chunk>>> {
let mut chunks = Vec::new();

View File

@@ -0,0 +1,281 @@
use std::any::Any;
use std::convert::AsRef;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use arrow_deps::{
arrow::{
array::{Array, StringBuilder, TimestampNanosecondBuilder, UInt32Builder, UInt64Builder},
datatypes::{Field, Schema},
error::Result,
record_batch::RecordBatch,
},
datafusion::{
catalog::schema::SchemaProvider,
datasource::{MemTable, TableProvider},
},
};
use data_types::{chunk::ChunkSummary, error::ErrorLogger, partition_metadata::PartitionSummary};
use super::catalog::Catalog;
// The IOx system schema
pub const SYSTEM_SCHEMA: &str = "system";
const CHUNKS: &str = "chunks";
const COLUMNS: &str = "columns";
#[derive(Debug)]
pub struct SystemSchemaProvider {
catalog: Arc<Catalog>,
}
impl SystemSchemaProvider {
pub fn new(catalog: Arc<Catalog>) -> Self {
Self { catalog }
}
}
impl SchemaProvider for SystemSchemaProvider {
fn as_any(&self) -> &dyn Any {
self as &dyn Any
}
fn table_names(&self) -> Vec<String> {
vec![CHUNKS.to_string(), COLUMNS.to_string()]
}
fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
// TODO: Use of a MemTable potentially results in materializing redundant data
let batch = match name {
CHUNKS => from_chunk_summaries(self.catalog.chunk_summaries())
.log_if_error("chunks table")
.ok()?,
COLUMNS => from_partition_summaries(self.catalog.partition_summaries())
.log_if_error("chunks table")
.ok()?,
_ => return None,
};
let table = MemTable::try_new(batch.schema(), vec![vec![batch]])
.log_if_error("constructing chunks system table")
.ok()?;
Some(Arc::<MemTable>::new(table))
}
}
fn append_time(
builder: &mut TimestampNanosecondBuilder,
time: Option<DateTime<Utc>>,
) -> Result<()> {
match time {
Some(time) => builder.append_value(time.timestamp_nanos()),
None => builder.append_null(),
}
}
// TODO: Use a custom proc macro or serde to reduce the boilerplate
fn from_chunk_summaries(chunks: Vec<ChunkSummary>) -> Result<RecordBatch> {
let mut id = UInt32Builder::new(chunks.len());
let mut partition_key = StringBuilder::new(chunks.len());
let mut storage = StringBuilder::new(chunks.len());
let mut estimated_bytes = UInt64Builder::new(chunks.len());
let mut time_of_first_write = TimestampNanosecondBuilder::new(chunks.len());
let mut time_of_last_write = TimestampNanosecondBuilder::new(chunks.len());
let mut time_closing = TimestampNanosecondBuilder::new(chunks.len());
for chunk in chunks {
id.append_value(chunk.id)?;
partition_key.append_value(chunk.partition_key.as_ref())?;
storage.append_value(chunk.storage.as_str())?;
estimated_bytes.append_value(chunk.estimated_bytes as u64)?;
append_time(&mut time_of_first_write, chunk.time_of_first_write)?;
append_time(&mut time_of_last_write, chunk.time_of_last_write)?;
append_time(&mut time_closing, chunk.time_closing)?;
}
let id = id.finish();
let partition_key = partition_key.finish();
let storage = storage.finish();
let estimated_bytes = estimated_bytes.finish();
let time_of_first_write = time_of_first_write.finish();
let time_of_last_write = time_of_last_write.finish();
let time_closing = time_closing.finish();
let schema = Schema::new(vec![
Field::new("id", id.data_type().clone(), false),
Field::new("partition_key", partition_key.data_type().clone(), false),
Field::new("storage", storage.data_type().clone(), false),
Field::new("estimated_bytes", estimated_bytes.data_type().clone(), true),
Field::new(
"time_of_first_write",
time_of_first_write.data_type().clone(),
true,
),
Field::new(
"time_of_last_write",
time_of_last_write.data_type().clone(),
true,
),
Field::new("time_closing", time_closing.data_type().clone(), true),
]);
RecordBatch::try_new(
Arc::new(schema),
vec![
Arc::new(id),
Arc::new(partition_key),
Arc::new(storage),
Arc::new(estimated_bytes),
Arc::new(time_of_first_write),
Arc::new(time_of_last_write),
Arc::new(time_closing),
],
)
}
fn from_partition_summaries(partitions: Vec<PartitionSummary>) -> Result<RecordBatch> {
// Assume each partition has roughly 5 tables with 5 columns
let row_estimate = partitions.len() * 25;
let mut partition_key = StringBuilder::new(row_estimate);
let mut table_name = StringBuilder::new(row_estimate);
let mut column_name = StringBuilder::new(row_estimate);
let mut count = UInt64Builder::new(row_estimate);
// Note: no rows are produced for partitions with no tables, or
// tables with no columns: there are other system tables for listing
// tables and columns
for partition in partitions {
for table in partition.tables {
for column in table.columns {
partition_key.append_value(&partition.key)?;
table_name.append_value(&table.name)?;
column_name.append_value(&column.name)?;
count.append_value(column.count())?;
}
}
}
let partition_key = partition_key.finish();
let table_name = table_name.finish();
let column_name = column_name.finish();
let count = count.finish();
let schema = Schema::new(vec![
Field::new("partition_key", partition_key.data_type().clone(), false),
Field::new("table_name", table_name.data_type().clone(), true),
Field::new("column_name", column_name.data_type().clone(), true),
Field::new("count", count.data_type().clone(), true),
]);
RecordBatch::try_new(
Arc::new(schema),
vec![
Arc::new(partition_key),
Arc::new(table_name),
Arc::new(column_name),
Arc::new(count),
],
)
}
#[cfg(test)]
mod tests {
use super::*;
use arrow_deps::assert_table_eq;
use chrono::NaiveDateTime;
use data_types::chunk::ChunkStorage;
use data_types::partition_metadata::{ColumnSummary, StatValues, Statistics, TableSummary};
#[test]
fn test_from_chunk_summaries() {
let chunks = vec![
ChunkSummary {
partition_key: Arc::new("".to_string()),
id: 0,
storage: ChunkStorage::OpenMutableBuffer,
estimated_bytes: 23754,
time_of_first_write: Some(DateTime::from_utc(
NaiveDateTime::from_timestamp(10, 0),
Utc,
)),
time_of_last_write: None,
time_closing: None,
},
ChunkSummary {
partition_key: Arc::new("".to_string()),
id: 0,
storage: ChunkStorage::OpenMutableBuffer,
estimated_bytes: 23454,
time_of_first_write: None,
time_of_last_write: Some(DateTime::from_utc(
NaiveDateTime::from_timestamp(80, 0),
Utc,
)),
time_closing: None,
},
];
let expected = vec![
"+----+---------------+-------------------+-----------------+---------------------+---------------------+--------------+",
"| id | partition_key | storage | estimated_bytes | time_of_first_write | time_of_last_write | time_closing |",
"+----+---------------+-------------------+-----------------+---------------------+---------------------+--------------+",
"| 0 | | OpenMutableBuffer | 23754 | 1970-01-01 00:00:10 | | |",
"| 0 | | OpenMutableBuffer | 23454 | | 1970-01-01 00:01:20 | |",
"+----+---------------+-------------------+-----------------+---------------------+---------------------+--------------+",
];
let batch = from_chunk_summaries(chunks).unwrap();
assert_table_eq!(&expected, &[batch]);
}
#[test]
fn test_from_partition_summaries() {
let partitions = vec![
PartitionSummary {
key: "p1".to_string(),
tables: vec![TableSummary {
name: "t1".to_string(),
columns: vec![
ColumnSummary {
name: "c1".to_string(),
stats: Statistics::I64(StatValues::new(23)),
},
ColumnSummary {
name: "c2".to_string(),
stats: Statistics::I64(StatValues::new(43)),
},
],
}],
},
PartitionSummary {
key: "p2".to_string(),
tables: vec![],
},
PartitionSummary {
key: "p3".to_string(),
tables: vec![TableSummary {
name: "t1".to_string(),
columns: vec![],
}],
},
];
let expected = vec![
"+---------------+------------+-------------+-------+",
"| partition_key | table_name | column_name | count |",
"+---------------+------------+-------------+-------+",
"| p1 | t1 | c1 | 1 |",
"| p1 | t1 | c2 | 1 |",
"+---------------+------------+-------------+-------+",
];
let batch = from_partition_summaries(partitions).unwrap();
assert_table_eq!(&expected, &[batch]);
}
}
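A sketch (not part of this commit) of why the test above expects a count of 1 for `c1` and `c2`: the `count` column reports how many values were recorded for a column, not the values themselves, so the seed values 23 and 43 never appear. Using the same `data_types::partition_metadata` types as the test:

use data_types::partition_metadata::{ColumnSummary, StatValues, Statistics};

fn main() {
    // StatValues::new(v) records a single observation of v
    let col = ColumnSummary {
        name: "c1".to_string(),
        stats: Statistics::I64(StatValues::new(23)),
    };
    assert_eq!(col.count(), 1); // matches "| p1 | t1 | c1 | 1 |" above
}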

View File

@@ -168,6 +168,29 @@ impl DBSetup for TwoMeasurementsManyFields {
}
}
pub struct TwoMeasurementsManyFieldsOneChunk {}
#[async_trait]
impl DBSetup for TwoMeasurementsManyFieldsOneChunk {
async fn make(&self) -> Vec<DBScenario> {
let db = make_db();
let mut writer = TestLPWriter::default();
let lp_lines = vec![
"h2o,state=MA,city=Boston temp=70.4 50",
"h2o,state=MA,city=Boston other_temp=70.4 250",
"h2o,state=CA,city=Boston other_temp=72.4 350",
"o2,state=MA,city=Boston temp=53.4,reading=51 50",
"o2,state=CA temp=79.0 300",
];
writer.write_lp_string(&db, &lp_lines.join("\n")).unwrap();
vec![DBScenario {
scenario_name: "Data in open chunk of mutable buffer".into(),
db,
}]
}
}
pub struct OneMeasurementManyFields {}
#[async_trait]
impl DBSetup for OneMeasurementManyFields {

View File

@@ -190,6 +190,8 @@ async fn sql_select_from_information_schema_tables() {
"+---------------+--------------------+------------+------------+",
"| public | iox | h2o | BASE TABLE |",
"| public | iox | o2 | BASE TABLE |",
"| public | system | chunks | BASE TABLE |",
"| public | system | columns | BASE TABLE |",
"| public | information_schema | tables | VIEW |",
"| public | information_schema | columns | VIEW |",
"+---------------+--------------------+------------+------------+",
@@ -207,21 +209,32 @@ async fn sql_select_from_information_schema_columns() {
// validate we have access to information schema for listing column
// names
let expected = vec![
"+---------------+--------------+------------+-------------+------------------+----------------+-------------+-----------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
"| table_catalog | table_schema | table_name | column_name | ordinal_position | column_default | is_nullable | data_type | character_maximum_length | character_octet_length | numeric_precision | numeric_precision_radix | numeric_scale | datetime_precision | interval_type |",
"+---------------+--------------+------------+-------------+------------------+----------------+-------------+-----------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
"| public | iox | h2o | city | 0 | | YES | Utf8 | | 2147483647 | | | | | |",
"| public | iox | h2o | moisture | 1 | | YES | Float64 | | | 24 | 2 | | | |",
"| public | iox | h2o | other_temp | 2 | | YES | Float64 | | | 24 | 2 | | | |",
"| public | iox | h2o | state | 3 | | YES | Utf8 | | 2147483647 | | | | | |",
"| public | iox | h2o | temp | 4 | | YES | Float64 | | | 24 | 2 | | | |",
"| public | iox | h2o | time | 5 | | NO | Int64 | | | | | | | |",
"| public | iox | o2 | city | 0 | | YES | Utf8 | | 2147483647 | | | | | |",
"| public | iox | o2 | reading | 1 | | YES | Float64 | | | 24 | 2 | | | |",
"| public | iox | o2 | state | 2 | | YES | Utf8 | | 2147483647 | | | | | |",
"| public | iox | o2 | temp | 3 | | YES | Float64 | | | 24 | 2 | | | |",
"| public | iox | o2 | time | 4 | | NO | Int64 | | | | | | | |",
"+---------------+--------------+------------+-------------+------------------+----------------+-------------+-----------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
"+---------------+--------------+------------+---------------------+------------------+----------------+-------------+-----------------------------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
"| table_catalog | table_schema | table_name | column_name | ordinal_position | column_default | is_nullable | data_type | character_maximum_length | character_octet_length | numeric_precision | numeric_precision_radix | numeric_scale | datetime_precision | interval_type |",
"+---------------+--------------+------------+---------------------+------------------+----------------+-------------+-----------------------------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
"| public | iox | h2o | city | 0 | | YES | Utf8 | | 2147483647 | | | | | |",
"| public | iox | h2o | moisture | 1 | | YES | Float64 | | | 24 | 2 | | | |",
"| public | iox | h2o | other_temp | 2 | | YES | Float64 | | | 24 | 2 | | | |",
"| public | iox | h2o | state | 3 | | YES | Utf8 | | 2147483647 | | | | | |",
"| public | iox | h2o | temp | 4 | | YES | Float64 | | | 24 | 2 | | | |",
"| public | iox | h2o | time | 5 | | NO | Int64 | | | | | | | |",
"| public | iox | o2 | city | 0 | | YES | Utf8 | | 2147483647 | | | | | |",
"| public | iox | o2 | reading | 1 | | YES | Float64 | | | 24 | 2 | | | |",
"| public | iox | o2 | state | 2 | | YES | Utf8 | | 2147483647 | | | | | |",
"| public | iox | o2 | temp | 3 | | YES | Float64 | | | 24 | 2 | | | |",
"| public | iox | o2 | time | 4 | | NO | Int64 | | | | | | | |",
"| public | system | chunks | id | 0 | | NO | UInt32 | | | 32 | 2 | | | |",
"| public | system | chunks | partition_key | 1 | | NO | Utf8 | | 2147483647 | | | | | |",
"| public | system | chunks | storage | 2 | | NO | Utf8 | | 2147483647 | | | | | |",
"| public | system | chunks | estimated_bytes | 3 | | YES | UInt64 | | | | | | | |",
"| public | system | chunks | time_of_first_write | 4 | | YES | Timestamp(Nanosecond, None) | | | | | | | |",
"| public | system | chunks | time_of_last_write | 5 | | YES | Timestamp(Nanosecond, None) | | | | | | | |",
"| public | system | chunks | time_closing | 6 | | YES | Timestamp(Nanosecond, None) | | | | | | | |",
"| public | system | columns | partition_key | 0 | | NO | Utf8 | | 2147483647 | | | | | |",
"| public | system | columns | table_name | 1 | | YES | Utf8 | | 2147483647 | | | | | |",
"| public | system | columns | column_name | 2 | | YES | Utf8 | | 2147483647 | | | | | |",
"| public | system | columns | count | 3 | | YES | UInt64 | | | | | | | |",
"+---------------+--------------+------------+---------------------+------------------+----------------+-------------+-----------------------------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
];
run_sql_test_case!(
TwoMeasurementsManyFields {},
@@ -230,6 +243,50 @@ async fn sql_select_from_information_schema_columns() {
);
}
#[tokio::test]
async fn sql_select_from_system_tables() {
// system tables reflect the state of chunks, so don't run them
// with different chunk configurations.
// this ensures the tables / plumbing are hooked up (so there is no
// need to test timestamps, etc)
let expected = vec![
"+----+---------------+-------------------+-----------------+",
"| id | partition_key | storage | estimated_bytes |",
"+----+---------------+-------------------+-----------------+",
"| 0 | 1970-01-01T00 | OpenMutableBuffer | 493 |",
"+----+---------------+-------------------+-----------------+",
];
run_sql_test_case!(
TwoMeasurementsManyFieldsOneChunk {},
"SELECT id, partition_key, storage, estimated_bytes from system.chunks",
&expected
);
let expected = vec![
"+---------------+------------+-------------+-------+",
"| partition_key | table_name | column_name | count |",
"+---------------+------------+-------------+-------+",
"| 1970-01-01T00 | h2o | state | 3 |",
"| 1970-01-01T00 | h2o | city | 3 |",
"| 1970-01-01T00 | h2o | temp | 1 |",
"| 1970-01-01T00 | h2o | time | 3 |",
"| 1970-01-01T00 | h2o | other_temp | 2 |",
"| 1970-01-01T00 | o2 | state | 2 |",
"| 1970-01-01T00 | o2 | city | 1 |",
"| 1970-01-01T00 | o2 | temp | 2 |",
"| 1970-01-01T00 | o2 | time | 2 |",
"| 1970-01-01T00 | o2 | reading | 1 |",
"+---------------+------------+-------------+-------+",
];
run_sql_test_case!(
TwoMeasurementsManyFieldsOneChunk {},
"SELECT * from system.columns",
&expected
);
}
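A sketch (not part of this commit, using a hypothetical helper rather than IOx API) of how the expected `count` values follow from the five line-protocol rows written by `TwoMeasurementsManyFieldsOneChunk`: for example, only one of the three `h2o` rows carries a `temp` field, hence `| h2o | temp | 1 |` above.

fn field_names(lp_line: &str) -> Vec<&str> {
    // line protocol: measurement[,tag=value...] field=value[,...] timestamp
    let field_set = lp_line.split(' ').nth(1).unwrap_or("");
    field_set.split(',').filter_map(|f| f.split('=').next()).collect()
}

fn main() {
    let h2o_lines = [
        "h2o,state=MA,city=Boston temp=70.4 50",
        "h2o,state=MA,city=Boston other_temp=70.4 250",
        "h2o,state=CA,city=Boston other_temp=72.4 350",
    ];
    // only the first h2o row sets `temp`, matching count = 1 in system.columns
    let temp_rows = h2o_lines
        .iter()
        .filter(|lp| field_names(lp).contains(&"temp"))
        .count();
    assert_eq!(temp_rows, 1);
}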
#[tokio::test]
async fn sql_union_all() {
// validate name resolution works for UNION ALL queries