feat: store schemas per table

This way we can:

- check for schema compatibility even for writes going into different
  partitions
- address #1768 and #1884 in a future PR

Closes #1897.
pull/24376/head
Marco Neumann 2021-07-07 15:48:46 +02:00
parent 5ca9760c94
commit b528ac2b55
5 changed files with 156 additions and 34 deletions
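
The heart of the change is the write path in `Db`: `Catalog::get_or_create_partition` now returns the table-wide schema alongside the partition, and each incoming batch is checked against that schema before it is written. A condensed sketch of the new sequence, with the diff's error handling and locking elided (`catalog`, `table_batch`, and `partition_key` are illustrative placeholders, and `?` stands in for the per-entry error collection shown below):

    // Look up (or create) the partition together with the table-wide schema.
    let (partition, table_schema) =
        catalog.get_or_create_partition(table_batch.name(), partition_key);

    // Extract the schema of the incoming batch and verify it against the
    // table-wide schema *before* the partition is touched.
    let batch_schema = table_batch.schema()?;
    let schema_handle = TableSchemaUpsertHandle::new(&table_schema, &batch_schema)?;

    // ... write the batch into the partition ...

    // Only after the write succeeded is the (possibly extended) schema
    // made visible to other writers.
    schema_handle.commit();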

View File

@ -143,6 +143,15 @@ pub enum Error {
path: DirsAndFileName,
},
#[snafu(
display("Schema for {:?} does not work with existing schema: {}", path, source),
visibility(pub)
)]
SchemaError {
source: Box<dyn std::error::Error + Send + Sync>,
path: DirsAndFileName,
},
#[snafu(
display("Cannot create parquet chunk from {:?}: {}", path, source),
visibility(pub)

View File

@ -1,6 +1,7 @@
//! This module contains the main IOx Database object which has the
//! instances of the mutable buffer, read buffer, and object store
use crate::db::catalog::table::TableSchemaUpsertHandle;
pub(crate) use crate::db::chunk::DbChunk;
use crate::db::lifecycle::ArcDb;
use crate::{
@ -113,6 +114,16 @@ pub enum Error {
#[snafu(display("error finding min/max time on table batch: {}", source))]
TableBatchTimeError { source: entry::Error },
#[snafu(display("Table batch has invalid schema: {}", source))]
TableBatchSchemaExtractError {
source: internal_types::schema::builder::Error,
},
#[snafu(display("Table batch has mismatching schema: {}", source))]
TableBatchSchemaMergeError {
source: internal_types::schema::merge::Error,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -643,10 +654,33 @@ impl Db {
continue;
}
let partition = self
let (partition, table_schema) = self
.catalog
.get_or_create_partition(table_batch.name(), partition_key);
let batch_schema =
match table_batch.schema().context(TableBatchSchemaExtractError) {
Ok(batch_schema) => batch_schema,
Err(e) => {
if errors.len() < MAX_ERRORS_PER_SEQUENCED_ENTRY {
errors.push(e);
}
continue;
}
};
let schema_handle =
match TableSchemaUpsertHandle::new(&table_schema, &batch_schema)
.context(TableBatchSchemaMergeError)
{
Ok(schema_handle) => schema_handle,
Err(e) => {
if errors.len() < MAX_ERRORS_PER_SEQUENCED_ENTRY {
errors.push(e);
}
continue;
}
};
let mut partition = partition.write();
let (min_time, max_time) =
@ -702,6 +736,8 @@ impl Db {
};
partition.update_last_write_at();
schema_handle.commit();
match partition.persistence_windows() {
Some(windows) => {
windows.add_range(
@ -844,6 +880,7 @@ mod tests {
use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
use data_types::{
chunk_metadata::ChunkStorage,
database_rules::{PartitionTemplate, TemplatePart},
partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
};
use entry::test_helpers::lp_to_entry;
@ -2742,6 +2779,77 @@ mod tests {
write_lp(db.as_ref(), "cpu bar=1 10").await;
}
#[tokio::test]
async fn table_wide_schema_enforcement() {
// need a table with a partition template that uses a tag column, so that we can easily write to different partitions
let test_db = TestDb::builder()
.partition_template(PartitionTemplate {
parts: vec![TemplatePart::Column("tag_partition_by".to_string())],
})
.build()
.await;
let db = test_db.db;
// first write should create schema
try_write_lp(&db, "my_table,tag_partition_by=a field_integer=1 10")
.await
.unwrap();
// other writes are allowed to evolve the schema
try_write_lp(&db, "my_table,tag_partition_by=a field_string=\"foo\" 10")
.await
.unwrap();
try_write_lp(&db, "my_table,tag_partition_by=b field_float=1.1 10")
.await
.unwrap();
// check that we have the expected partitions
let mut partition_keys = db.partition_keys().unwrap();
partition_keys.sort();
assert_eq!(
partition_keys,
vec![
"tag_partition_by_a".to_string(),
"tag_partition_by_b".to_string(),
]
);
// illegal changes
try_write_lp(&db, "my_table,tag_partition_by=a field_integer=\"foo\" 10")
.await
.unwrap_err();
try_write_lp(&db, "my_table,tag_partition_by=b field_integer=\"foo\" 10")
.await
.unwrap_err();
try_write_lp(&db, "my_table,tag_partition_by=c field_integer=\"foo\" 10")
.await
.unwrap_err();
// drop all chunks
for partition_key in db.partition_keys().unwrap() {
let chunk_ids: Vec<_> = {
let partition = db.partition("my_table", &partition_key).unwrap();
let partition = partition.read();
partition
.chunks()
.map(|chunk| {
let chunk = chunk.read();
chunk.id()
})
.collect()
};
for chunk_id in chunk_ids {
db.drop_chunk("my_table", &partition_key, chunk_id).unwrap();
}
}
// schema is still there
try_write_lp(&db, "my_table,tag_partition_by=a field_integer=\"foo\" 10")
.await
.unwrap_err();
}
async fn create_parquet_chunk(db: &Arc<Db>) -> (String, String, u32) {
write_lp(db, "cpu bar=1 10").await;
let partition_key = "1970-01-01T00";

View File

@ -7,6 +7,7 @@ use hashbrown::{HashMap, HashSet};
use data_types::chunk_metadata::ChunkSummary;
use data_types::chunk_metadata::DetailedChunkSummary;
use data_types::partition_metadata::{PartitionSummary, TableSummary};
use internal_types::schema::Schema;
use snafu::Snafu;
use tracker::{MappedRwLockReadGuard, RwLock, RwLockReadGuard};
@ -193,12 +194,12 @@ impl Catalog {
}
/// Gets or creates a new partition in the catalog and returns
/// a reference to it
/// a reference to it alongside the current table schema.
pub fn get_or_create_partition(
&self,
table_name: impl AsRef<str>,
partition_key: impl AsRef<str>,
) -> Arc<RwLock<Partition>> {
) -> (Arc<RwLock<Partition>>, Arc<RwLock<Schema>>) {
let mut tables = self.tables.write();
let (_, table) = tables
.raw_entry_mut()
@ -215,7 +216,7 @@ impl Catalog {
});
let partition = table.get_or_create_partition(partition_key);
Arc::clone(&partition)
(Arc::clone(&partition), table.schema())
}
/// Returns a list of summaries for each partition.
@ -374,8 +375,8 @@ mod tests {
#[test]
fn chunk_create() {
let catalog = Catalog::test();
let p1 = catalog.get_or_create_partition("t1", "p1");
let p2 = catalog.get_or_create_partition("t2", "p2");
let (p1, _schema) = catalog.get_or_create_partition("t1", "p1");
let (p2, _schema) = catalog.get_or_create_partition("t2", "p2");
create_open_chunk(&p1);
create_open_chunk(&p1);
@ -406,13 +407,13 @@ mod tests {
fn chunk_list() {
let catalog = Catalog::test();
let p1 = catalog.get_or_create_partition("table1", "p1");
let p2 = catalog.get_or_create_partition("table2", "p1");
let (p1, _schema) = catalog.get_or_create_partition("table1", "p1");
let (p2, _schema) = catalog.get_or_create_partition("table2", "p1");
create_open_chunk(&p1);
create_open_chunk(&p1);
create_open_chunk(&p2);
let p3 = catalog.get_or_create_partition("table1", "p2");
let (p3, _schema) = catalog.get_or_create_partition("table1", "p2");
create_open_chunk(&p3);
assert_eq!(
@ -450,13 +451,13 @@ mod tests {
fn chunk_drop() {
let catalog = Catalog::test();
let p1 = catalog.get_or_create_partition("p1", "table1");
let p2 = catalog.get_or_create_partition("p1", "table2");
let (p1, _schema) = catalog.get_or_create_partition("p1", "table1");
let (p2, _schema) = catalog.get_or_create_partition("p1", "table2");
create_open_chunk(&p1);
create_open_chunk(&p1);
create_open_chunk(&p2);
let p3 = catalog.get_or_create_partition("p2", "table1");
let (p3, _schema) = catalog.get_or_create_partition("p2", "table1");
create_open_chunk(&p3);
assert_eq!(chunk_strings(&catalog).len(), 4);
@ -486,7 +487,7 @@ mod tests {
#[test]
fn chunk_drop_non_existent_chunk() {
let catalog = Catalog::test();
let p3 = catalog.get_or_create_partition("table1", "p3");
let (p3, _schema) = catalog.get_or_create_partition("table1", "p3");
create_open_chunk(&p3);
let mut p3 = p3.write();
@ -499,7 +500,7 @@ mod tests {
fn chunk_recreate_dropped() {
let catalog = Catalog::test();
let p1 = catalog.get_or_create_partition("table1", "p1");
let (p1, _schema) = catalog.get_or_create_partition("table1", "p1");
create_open_chunk(&p1);
create_open_chunk(&p1);
assert_eq!(
@ -526,9 +527,9 @@ mod tests {
use TableNameFilter::*;
let catalog = Catalog::test();
let p1 = catalog.get_or_create_partition("table1", "p1");
let p2 = catalog.get_or_create_partition("table2", "p1");
let p3 = catalog.get_or_create_partition("table2", "p2");
let (p1, _schema) = catalog.get_or_create_partition("table1", "p1");
let (p2, _schema) = catalog.get_or_create_partition("table2", "p1");
let (p3, _schema) = catalog.get_or_create_partition("table2", "p2");
create_open_chunk(&p1);
create_open_chunk(&p2);
create_open_chunk(&p3);

View File

@ -10,9 +10,6 @@ use internal_types::schema::{
use std::{ops::Deref, result::Result, sync::Arc};
use tracker::{RwLock, RwLockReadGuard, RwLockWriteGuard};
/// Table-wide schema.
type TableSchema = RwLock<Schema>;
/// A `Table` is a collection of `Partition` each of which is a collection of `Chunk`
#[derive(Debug)]
pub struct Table {
@ -29,7 +26,7 @@ pub struct Table {
metrics: TableMetrics,
/// Table-wide schema.
schema: TableSchema,
schema: Arc<RwLock<Schema>>,
}
impl Table {
@ -41,9 +38,9 @@ impl Table {
pub(super) fn new(db_name: Arc<str>, table_name: Arc<str>, metrics: TableMetrics) -> Self {
// build empty schema for this table
let mut builder = SchemaBuilder::new();
builder.measurement(db_name.as_ref());
builder.measurement(table_name.as_ref());
let schema = builder.build().expect("cannot build empty schema");
let schema = metrics.new_table_lock(schema);
let schema = Arc::new(metrics.new_table_lock(schema));
Self {
db_name,
@ -96,16 +93,14 @@ impl Table {
self.partitions.values().map(|x| x.read().summary())
}
pub fn schema_upsert_handle(
&self,
new_schema: &Schema,
) -> Result<TableSchemaUpsertHandle<'_>, SchemaMergerError> {
TableSchemaUpsertHandle::new(&self.schema, new_schema)
pub fn schema(&self) -> Arc<RwLock<Schema>> {
Arc::clone(&self.schema)
}
}
/// Inner state of [`TableSchemaUpsertHandle`] that depends on whether the schema will be changed during the write
/// operation or not.
#[derive(Debug)]
enum TableSchemaUpsertHandleInner<'a> {
/// Schema will not be changed.
NoChange {
@ -120,12 +115,16 @@ enum TableSchemaUpsertHandleInner<'a> {
}
/// Handle that can be used to modify the table-wide [schema](Schema) during new writes.
#[derive(Debug)]
pub struct TableSchemaUpsertHandle<'a> {
inner: TableSchemaUpsertHandleInner<'a>,
}
impl<'a> TableSchemaUpsertHandle<'a> {
fn new(table_schema: &'a TableSchema, new_schema: &Schema) -> Result<Self, SchemaMergerError> {
pub(crate) fn new(
table_schema: &'a RwLock<Schema>,
new_schema: &Schema,
) -> Result<Self, SchemaMergerError> {
// Be optimistic and only get a read lock. It is rather rare that the schema will change when new data arrives
// and we do NOT want to serialize all writes on a single lock.
let table_schema_read = table_schema.read();
@ -171,7 +170,7 @@ impl<'a> TableSchemaUpsertHandle<'a> {
}
/// Commit potential schema change.
fn commit(self) {
pub(crate) fn commit(self) {
match self.inner {
TableSchemaUpsertHandleInner::NoChange { table_schema_read } => {
// Nothing to do since there was no schema change queued. Just drop the read guard.
@ -264,7 +263,7 @@ mod tests {
.influx_column("tag2", InfluxColumnType::Tag)
.build()
.unwrap();
let table_schema = lock_tracker.new_lock(table_schema_orig.clone());
let table_schema = lock_tracker.new_lock(table_schema_orig);
let new_schema = SchemaBuilder::new()
.measurement("m1")

View File

@ -13,7 +13,7 @@ use parquet_file::{
};
use snafu::ResultExt;
use crate::db::catalog::chunk::ChunkStage;
use crate::db::catalog::{chunk::ChunkStage, table::TableSchemaUpsertHandle};
use super::catalog::Catalog;
@ -139,7 +139,7 @@ impl CatalogState for Catalog {
object_store: Arc<ObjectStore>,
info: CatalogParquetInfo,
) -> parquet_file::catalog::Result<()> {
use parquet_file::catalog::MetadataExtractFailed;
use parquet_file::catalog::{MetadataExtractFailed, SchemaError};
// extract relevant bits from parquet file metadata
let iox_md = info
@ -168,12 +168,17 @@ impl CatalogState for Catalog {
// Get partition from the catalog
// Note that the partition might not exist yet if the chunk is loaded from an existing preserved catalog.
let partition = self.get_or_create_partition(&iox_md.table_name, &iox_md.partition_key);
let (partition, table_schema) =
self.get_or_create_partition(&iox_md.table_name, &iox_md.partition_key);
let mut partition = partition.write();
if partition.chunk(iox_md.chunk_id).is_some() {
return Err(parquet_file::catalog::Error::ParquetFileAlreadyExists { path: info.path });
}
let schema_handle = TableSchemaUpsertHandle::new(&table_schema, &parquet_chunk.schema())
.map_err(|e| Box::new(e) as _)
.context(SchemaError { path: info.path })?;
partition.insert_object_store_only_chunk(iox_md.chunk_id, parquet_chunk);
schema_handle.commit();
Ok(())
}