feat: add infra to check table-wide schemas

pull/24376/head
Marco Neumann 2021-07-07 11:23:56 +02:00
parent 315217362f
commit 5936452895
1 changed file with 249 additions and 1 deletion

@@ -2,20 +2,36 @@ use super::partition::Partition;
use crate::db::catalog::metrics::TableMetrics;
use data_types::partition_metadata::PartitionSummary;
use hashbrown::HashMap;
use std::sync::Arc;
use internal_types::schema::{
builder::SchemaBuilder,
merge::{Error as SchemaMergerError, SchemaMerger},
Schema,
};
use std::{ops::Deref, sync::Arc};
use tracker::RwLock;
/// Table-wide schema.
///
/// We need an async lock here because waiting for write locks (e.g. during column additions) can block for quite a while.
type TableSchema = Arc<tokio::sync::RwLock<Schema>>;
/// A `Table` is a collection of `Partition` each of which is a collection of `Chunk`
#[derive(Debug)]
pub struct Table {
/// Database name
db_name: Arc<str>,
/// Table name
table_name: Arc<str>,
/// key is partition key
partitions: HashMap<Arc<str>, Arc<RwLock<Partition>>>,
/// Table metrics
metrics: TableMetrics,
/// Table-wide schema.
schema: TableSchema,
}
impl Table {
@@ -25,11 +41,17 @@ impl Table {
/// created using the interfaces on [`Catalog`](crate::db::catalog::Catalog) and not
/// instantiated directly.
pub(super) fn new(db_name: Arc<str>, table_name: Arc<str>, metrics: TableMetrics) -> Self {
// build empty schema for this table
let mut builder = SchemaBuilder::new();
builder.measurement(table_name.as_ref());
let schema = builder.build().expect("cannot build empty schema");
Self {
db_name,
table_name,
partitions: Default::default(),
metrics,
schema: Arc::new(tokio::sync::RwLock::new(schema)),
}
}
@@ -75,3 +97,229 @@ impl Table {
self.partitions.values().map(|x| x.read().summary())
}
}
/// Inner state of [`TableSchemaUpsertHandle`] that depends on whether the schema will be changed during the write
/// operation or not.
enum TableSchemaUpsertHandleInner {
/// Schema will not be changed.
NoChange {
table_schema_read: tokio::sync::OwnedRwLockReadGuard<Schema>,
},
/// Schema might change (if the write to the mutable buffer is successful).
MightChange {
table_schema_write: tokio::sync::OwnedRwLockWriteGuard<Schema>,
merged_schema: Schema,
},
}
/// Handle that can be used to modify the table-wide [schema](Schema) during new writes.
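///
/// A rough usage sketch for the intended write path. This integration does not exist in this commit;
/// `table`, `batch`, `batch_schema`, `mutable_buffer` and `write_batch` are purely illustrative names:
///
/// ```ignore
/// // Merge the incoming batch schema into the table-wide schema. This fails early if the
/// // batch is incompatible (e.g. a column re-appears with a different type).
/// let handle = TableSchemaUpsertHandle::new(Arc::clone(&table.schema), &batch_schema).await?;
///
/// // Perform the actual write; only commit the (potential) schema change if it succeeded.
/// mutable_buffer.write_batch(batch)?;
/// handle.commit();
/// ```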
struct TableSchemaUpsertHandle {
inner: TableSchemaUpsertHandleInner,
}
impl TableSchemaUpsertHandle {
async fn new(
table_schema: TableSchema,
new_schema: &Schema,
) -> std::result::Result<Self, SchemaMergerError> {
// Be optimistic and only get a read lock. It is rather rare that the schema will change when new data arrives
// and we do NOT want to serialize all writes on a single lock.
let table_schema_read = Arc::clone(&table_schema).read_owned().await;
// Let's see if we can merge the new schema with the existing one (this may or may not result in any schema
// change).
let merged_schema = Self::try_merge(&table_schema_read, new_schema)?;
// Now check if this would actually change the schema:
if &merged_schema == table_schema_read.deref() {
// Optimism paid off and we can get away with just the read lock.
Ok(Self {
inner: TableSchemaUpsertHandleInner::NoChange { table_schema_read },
})
} else {
// Schema changed, so we need a real write lock. To do that, we must first drop the read lock.
drop(table_schema_read);
// !!! Here we have a lock-gap !!!
// Re-lock with write permissions.
let table_schema_write = table_schema.write_owned().await;
// While we were waiting for the write lock, the schema might have changed again, so we need to perform the
// merge again. This may also fail now, e.g. if a column was added with a different type during the lock gap.
let merged_schema = Self::try_merge(&table_schema_write, new_schema)?;
Ok(Self {
inner: TableSchemaUpsertHandleInner::MightChange {
table_schema_write,
merged_schema,
},
})
}
}
/// Try to merge schema.
///
/// This will also sort the columns!
fn try_merge(
schema1: &Schema,
schema2: &Schema,
) -> std::result::Result<Schema, SchemaMergerError> {
Ok(SchemaMerger::new().merge(schema1)?.merge(schema2)?.build())
}
/// Commit potential schema change.
fn commit(self) {
match self.inner {
TableSchemaUpsertHandleInner::NoChange { table_schema_read } => {
// Nothing to do since there is no schema change queued. Just drop the read guard.
drop(table_schema_read);
}
TableSchemaUpsertHandleInner::MightChange {
mut table_schema_write,
merged_schema,
} => {
// Commit the new schema and drop the write guard.
*table_schema_write = merged_schema;
drop(table_schema_write);
}
}
}
}
#[cfg(test)]
mod tests {
use internal_types::schema::{InfluxColumnType, InfluxFieldType};
use super::*;
#[tokio::test]
async fn test_handle_no_change() {
let table_schema_orig = SchemaBuilder::new()
.measurement("m1")
.influx_column("tag1", InfluxColumnType::Tag)
.influx_column("tag2", InfluxColumnType::Tag)
.build()
.unwrap();
let table_schema = Arc::new(tokio::sync::RwLock::new(table_schema_orig.clone()));
// writing with the same schema must not trigger a change
let schema1 = SchemaBuilder::new()
.measurement("m1")
.influx_column("tag1", InfluxColumnType::Tag)
.influx_column("tag2", InfluxColumnType::Tag)
.build()
.unwrap();
let handle = TableSchemaUpsertHandle::new(Arc::clone(&table_schema), &schema1)
.await
.unwrap();
assert!(matches!(
handle.inner,
TableSchemaUpsertHandleInner::NoChange { .. }
));
assert_eq!(table_schema.read().await.deref(), &table_schema_orig);
handle.commit();
assert_eq!(table_schema.read().await.deref(), &table_schema_orig);
// writing with different column order must not trigger a change
let schema2 = SchemaBuilder::new()
.measurement("m1")
.influx_column("tag2", InfluxColumnType::Tag)
.influx_column("tag1", InfluxColumnType::Tag)
.build()
.unwrap();
let handle = TableSchemaUpsertHandle::new(Arc::clone(&table_schema), &schema2)
.await
.unwrap();
assert!(matches!(
handle.inner,
TableSchemaUpsertHandleInner::NoChange { .. }
));
assert_eq!(table_schema.read().await.deref(), &table_schema_orig);
handle.commit();
assert_eq!(table_schema.read().await.deref(), &table_schema_orig);
// writing with a column subset must not trigger a change
let schema3 = SchemaBuilder::new()
.measurement("m1")
.influx_column("tag1", InfluxColumnType::Tag)
.build()
.unwrap();
let handle = TableSchemaUpsertHandle::new(Arc::clone(&table_schema), &schema3)
.await
.unwrap();
assert!(matches!(
handle.inner,
TableSchemaUpsertHandleInner::NoChange { .. }
));
assert_eq!(table_schema.read().await.deref(), &table_schema_orig);
handle.commit();
assert_eq!(table_schema.read().await.deref(), &table_schema_orig);
}
#[tokio::test]
async fn test_handle_might_change() {
let table_schema_orig = SchemaBuilder::new()
.measurement("m1")
.influx_column("tag1", InfluxColumnType::Tag)
.influx_column("tag2", InfluxColumnType::Tag)
.build()
.unwrap();
let table_schema = Arc::new(tokio::sync::RwLock::new(table_schema_orig.clone()));
let new_schema = SchemaBuilder::new()
.measurement("m1")
.influx_column("tag1", InfluxColumnType::Tag)
.influx_column("tag3", InfluxColumnType::Tag)
.build()
.unwrap();
let handle = TableSchemaUpsertHandle::new(Arc::clone(&table_schema), &new_schema)
.await
.unwrap();
assert!(matches!(
handle.inner,
TableSchemaUpsertHandleInner::MightChange { .. }
));
// cannot read while lock is held
assert!(table_schema.try_read().is_err());
handle.commit();
let table_schema_expected = SchemaBuilder::new()
.measurement("m1")
.influx_column("tag1", InfluxColumnType::Tag)
.influx_column("tag2", InfluxColumnType::Tag)
.influx_column("tag3", InfluxColumnType::Tag)
.build()
.unwrap();
assert_eq!(table_schema.read().await.deref(), &table_schema_expected);
}
#[tokio::test]
async fn test_handle_error() {
let table_schema_orig = SchemaBuilder::new()
.measurement("m1")
.influx_column("tag1", InfluxColumnType::Tag)
.influx_column("tag2", InfluxColumnType::Tag)
.build()
.unwrap();
let table_schema = Arc::new(tokio::sync::RwLock::new(table_schema_orig.clone()));
let schema1 = SchemaBuilder::new()
.measurement("m1")
.influx_column("tag1", InfluxColumnType::Tag)
.influx_column("tag2", InfluxColumnType::Field(InfluxFieldType::String))
.build()
.unwrap();
assert!(
TableSchemaUpsertHandle::new(Arc::clone(&table_schema), &schema1)
.await
.is_err()
);
// schema did not change
assert_eq!(table_schema.read().await.deref(), &table_schema_orig);
}
}