refactor: use sync locks w/ better metrics
parent
5936452895
commit
ed3ebdcbd2
|
@ -26,9 +26,17 @@ impl CatalogMetrics {
|
|||
}
|
||||
|
||||
pub(super) fn new_table_metrics(&self, table_name: &str) -> TableMetrics {
|
||||
let chunk_lock_tracker = Default::default();
|
||||
let partition_lock_tracker = Default::default();
|
||||
let table_lock_tracker = Default::default();
|
||||
self.metrics_domain.register_observer(
|
||||
None,
|
||||
&[
|
||||
KeyValue::new("lock", "table"),
|
||||
KeyValue::new("table", table_name.to_string()),
|
||||
],
|
||||
&table_lock_tracker,
|
||||
);
|
||||
|
||||
let partition_lock_tracker = Default::default();
|
||||
self.metrics_domain.register_observer(
|
||||
None,
|
||||
&[
|
||||
|
@ -38,6 +46,7 @@ impl CatalogMetrics {
|
|||
&partition_lock_tracker,
|
||||
);
|
||||
|
||||
let chunk_lock_tracker = Default::default();
|
||||
self.metrics_domain.register_observer(
|
||||
None,
|
||||
&[
|
||||
|
@ -49,6 +58,7 @@ impl CatalogMetrics {
|
|||
|
||||
TableMetrics {
|
||||
metrics_domain: Arc::clone(&self.metrics_domain),
|
||||
table_lock_tracker,
|
||||
partition_lock_tracker,
|
||||
chunk_lock_tracker,
|
||||
}
|
||||
|
@ -60,12 +70,21 @@ pub struct TableMetrics {
|
|||
/// Metrics domain
|
||||
metrics_domain: Arc<metrics::Domain>,
|
||||
|
||||
/// Lock tracker for table-level locks
|
||||
table_lock_tracker: LockTracker,
|
||||
|
||||
/// Lock tracker for partition-level locks
|
||||
partition_lock_tracker: LockTracker,
|
||||
|
||||
/// Lock tracker for chunk-level locks
|
||||
chunk_lock_tracker: LockTracker,
|
||||
}
|
||||
|
||||
impl TableMetrics {
|
||||
pub(super) fn new_table_lock<T>(&self, t: T) -> RwLock<T> {
|
||||
self.table_lock_tracker.new_lock(t)
|
||||
}
|
||||
|
||||
pub(super) fn new_partition_lock<T>(&self, t: T) -> RwLock<T> {
|
||||
self.partition_lock_tracker.new_lock(t)
|
||||
}
|
||||
|
|
|
@ -7,13 +7,11 @@ use internal_types::schema::{
|
|||
merge::{Error as SchemaMergerError, SchemaMerger},
|
||||
Schema,
|
||||
};
|
||||
use std::{ops::Deref, sync::Arc};
|
||||
use tracker::RwLock;
|
||||
use std::{ops::Deref, result::Result, sync::Arc};
|
||||
use tracker::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
|
||||
/// Table-wide schema.
|
||||
///
|
||||
/// We need an async lock here because waiting for write locks (e.g. during column additions) can block for quite a while.
|
||||
type TableSchema = Arc<tokio::sync::RwLock<Schema>>;
|
||||
type TableSchema = RwLock<Schema>;
|
||||
|
||||
/// A `Table` is a collection of `Partition` each of which is a collection of `Chunk`
|
||||
#[derive(Debug)]
|
||||
|
@ -45,13 +43,14 @@ impl Table {
|
|||
let mut builder = SchemaBuilder::new();
|
||||
builder.measurement(db_name.as_ref());
|
||||
let schema = builder.build().expect("cannot build empty schema");
|
||||
let schema = metrics.new_table_lock(schema);
|
||||
|
||||
Self {
|
||||
db_name,
|
||||
table_name,
|
||||
partitions: Default::default(),
|
||||
metrics,
|
||||
schema: Arc::new(tokio::sync::RwLock::new(schema)),
|
||||
schema,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -96,36 +95,40 @@ impl Table {
|
|||
pub fn partition_summaries(&self) -> impl Iterator<Item = PartitionSummary> + '_ {
|
||||
self.partitions.values().map(|x| x.read().summary())
|
||||
}
|
||||
|
||||
pub fn schema_upsert_handle(
|
||||
&self,
|
||||
new_schema: &Schema,
|
||||
) -> Result<TableSchemaUpsertHandle<'_>, SchemaMergerError> {
|
||||
TableSchemaUpsertHandle::new(&self.schema, new_schema)
|
||||
}
|
||||
}
|
||||
|
||||
/// Inner state of [`TableSchemaUpsertHandle`] that depends on whether the schema will be changed during the write operation or
|
||||
/// not.
|
||||
enum TableSchemaUpsertHandleInner {
|
||||
enum TableSchemaUpsertHandleInner<'a> {
|
||||
/// Schema will not be changed.
|
||||
NoChange {
|
||||
table_schema_read: tokio::sync::OwnedRwLockReadGuard<Schema>,
|
||||
table_schema_read: RwLockReadGuard<'a, Schema>,
|
||||
},
|
||||
|
||||
/// Schema might change (if write to mutable buffer is successful).
|
||||
MightChange {
|
||||
table_schema_write: tokio::sync::OwnedRwLockWriteGuard<Schema>,
|
||||
table_schema_write: RwLockWriteGuard<'a, Schema>,
|
||||
merged_schema: Schema,
|
||||
},
|
||||
}
|
||||
|
||||
/// Handle that can be used to modify the table-wide [schema](Schema) during new writes.
|
||||
struct TableSchemaUpsertHandle {
|
||||
inner: TableSchemaUpsertHandleInner,
|
||||
pub struct TableSchemaUpsertHandle<'a> {
|
||||
inner: TableSchemaUpsertHandleInner<'a>,
|
||||
}
|
||||
|
||||
impl TableSchemaUpsertHandle {
|
||||
async fn new(
|
||||
table_schema: TableSchema,
|
||||
new_schema: &Schema,
|
||||
) -> std::result::Result<Self, SchemaMergerError> {
|
||||
impl<'a> TableSchemaUpsertHandle<'a> {
|
||||
fn new(table_schema: &'a TableSchema, new_schema: &Schema) -> Result<Self, SchemaMergerError> {
|
||||
// Be optimistic and only get a read lock. It is rather rare that the schema will change when new data arrives
|
||||
// and we do NOT want to serialize all writes on a single lock.
|
||||
let table_schema_read = Arc::clone(&table_schema).read_owned().await;
|
||||
let table_schema_read = table_schema.read();
|
||||
|
||||
// Let's see if we can merge the new schema with the existing one (this may or may not result in any schema
|
||||
// change).
|
||||
|
@ -144,7 +147,7 @@ impl TableSchemaUpsertHandle {
|
|||
// !!! Here we have a lock-gap !!!
|
||||
|
||||
// Re-lock with write permissions.
|
||||
let table_schema_write = table_schema.write_owned().await;
|
||||
let table_schema_write = table_schema.write();
|
||||
|
||||
// During the above write lock, the schema might have changed again, so we need to perform the merge again.
|
||||
// This may also lead to a failure now, e.g. when adding a column that was added with a different type
|
||||
|
@ -163,10 +166,7 @@ impl TableSchemaUpsertHandle {
|
|||
/// Try to merge schema.
|
||||
///
|
||||
/// This will also sort the columns!
|
||||
fn try_merge(
|
||||
schema1: &Schema,
|
||||
schema2: &Schema,
|
||||
) -> std::result::Result<Schema, SchemaMergerError> {
|
||||
fn try_merge(schema1: &Schema, schema2: &Schema) -> Result<Schema, SchemaMergerError> {
|
||||
Ok(SchemaMerger::new().merge(schema1)?.merge(schema2)?.build())
|
||||
}
|
||||
|
||||
|
@ -192,18 +192,20 @@ impl TableSchemaUpsertHandle {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use internal_types::schema::{InfluxColumnType, InfluxFieldType};
|
||||
use tracker::LockTracker;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_handle_no_change() {
|
||||
#[test]
|
||||
fn test_handle_no_change() {
|
||||
let lock_tracker = LockTracker::default();
|
||||
let table_schema_orig = SchemaBuilder::new()
|
||||
.measurement("m1")
|
||||
.influx_column("tag1", InfluxColumnType::Tag)
|
||||
.influx_column("tag2", InfluxColumnType::Tag)
|
||||
.build()
|
||||
.unwrap();
|
||||
let table_schema = Arc::new(tokio::sync::RwLock::new(table_schema_orig.clone()));
|
||||
let table_schema = lock_tracker.new_lock(table_schema_orig.clone());
|
||||
|
||||
// writing with the same schema must not trigger a change
|
||||
let schema1 = SchemaBuilder::new()
|
||||
|
@ -212,16 +214,14 @@ mod tests {
|
|||
.influx_column("tag2", InfluxColumnType::Tag)
|
||||
.build()
|
||||
.unwrap();
|
||||
let handle = TableSchemaUpsertHandle::new(Arc::clone(&table_schema), &schema1)
|
||||
.await
|
||||
.unwrap();
|
||||
let handle = TableSchemaUpsertHandle::new(&table_schema, &schema1).unwrap();
|
||||
assert!(matches!(
|
||||
handle.inner,
|
||||
TableSchemaUpsertHandleInner::NoChange { .. }
|
||||
));
|
||||
assert_eq!(table_schema.read().await.deref(), &table_schema_orig);
|
||||
assert_eq!(table_schema.read().deref(), &table_schema_orig);
|
||||
handle.commit();
|
||||
assert_eq!(table_schema.read().await.deref(), &table_schema_orig);
|
||||
assert_eq!(table_schema.read().deref(), &table_schema_orig);
|
||||
|
||||
// writing with different column order must not trigger a change
|
||||
let schema2 = SchemaBuilder::new()
|
||||
|
@ -230,16 +230,14 @@ mod tests {
|
|||
.influx_column("tag1", InfluxColumnType::Tag)
|
||||
.build()
|
||||
.unwrap();
|
||||
let handle = TableSchemaUpsertHandle::new(Arc::clone(&table_schema), &schema2)
|
||||
.await
|
||||
.unwrap();
|
||||
let handle = TableSchemaUpsertHandle::new(&table_schema, &schema2).unwrap();
|
||||
assert!(matches!(
|
||||
handle.inner,
|
||||
TableSchemaUpsertHandleInner::NoChange { .. }
|
||||
));
|
||||
assert_eq!(table_schema.read().await.deref(), &table_schema_orig);
|
||||
assert_eq!(table_schema.read().deref(), &table_schema_orig);
|
||||
handle.commit();
|
||||
assert_eq!(table_schema.read().await.deref(), &table_schema_orig);
|
||||
assert_eq!(table_schema.read().deref(), &table_schema_orig);
|
||||
|
||||
// writing with a column subset must not trigger a change
|
||||
let schema3 = SchemaBuilder::new()
|
||||
|
@ -247,27 +245,26 @@ mod tests {
|
|||
.influx_column("tag1", InfluxColumnType::Tag)
|
||||
.build()
|
||||
.unwrap();
|
||||
let handle = TableSchemaUpsertHandle::new(Arc::clone(&table_schema), &schema3)
|
||||
.await
|
||||
.unwrap();
|
||||
let handle = TableSchemaUpsertHandle::new(&table_schema, &schema3).unwrap();
|
||||
assert!(matches!(
|
||||
handle.inner,
|
||||
TableSchemaUpsertHandleInner::NoChange { .. }
|
||||
));
|
||||
assert_eq!(table_schema.read().await.deref(), &table_schema_orig);
|
||||
assert_eq!(table_schema.read().deref(), &table_schema_orig);
|
||||
handle.commit();
|
||||
assert_eq!(table_schema.read().await.deref(), &table_schema_orig);
|
||||
assert_eq!(table_schema.read().deref(), &table_schema_orig);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_handle_might_change() {
|
||||
#[test]
|
||||
fn test_handle_might_change() {
|
||||
let lock_tracker = LockTracker::default();
|
||||
let table_schema_orig = SchemaBuilder::new()
|
||||
.measurement("m1")
|
||||
.influx_column("tag1", InfluxColumnType::Tag)
|
||||
.influx_column("tag2", InfluxColumnType::Tag)
|
||||
.build()
|
||||
.unwrap();
|
||||
let table_schema = Arc::new(tokio::sync::RwLock::new(table_schema_orig.clone()));
|
||||
let table_schema = lock_tracker.new_lock(table_schema_orig.clone());
|
||||
|
||||
let new_schema = SchemaBuilder::new()
|
||||
.measurement("m1")
|
||||
|
@ -275,16 +272,14 @@ mod tests {
|
|||
.influx_column("tag3", InfluxColumnType::Tag)
|
||||
.build()
|
||||
.unwrap();
|
||||
let handle = TableSchemaUpsertHandle::new(Arc::clone(&table_schema), &new_schema)
|
||||
.await
|
||||
.unwrap();
|
||||
let handle = TableSchemaUpsertHandle::new(&table_schema, &new_schema).unwrap();
|
||||
assert!(matches!(
|
||||
handle.inner,
|
||||
TableSchemaUpsertHandleInner::MightChange { .. }
|
||||
));
|
||||
|
||||
// cannot read while lock is held
|
||||
assert!(table_schema.try_read().is_err());
|
||||
assert!(table_schema.try_read().is_none());
|
||||
|
||||
handle.commit();
|
||||
let table_schema_expected = SchemaBuilder::new()
|
||||
|
@ -294,18 +289,19 @@ mod tests {
|
|||
.influx_column("tag3", InfluxColumnType::Tag)
|
||||
.build()
|
||||
.unwrap();
|
||||
assert_eq!(table_schema.read().await.deref(), &table_schema_expected);
|
||||
assert_eq!(table_schema.read().deref(), &table_schema_expected);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_handle_error() {
|
||||
#[test]
|
||||
fn test_handle_error() {
|
||||
let lock_tracker = LockTracker::default();
|
||||
let table_schema_orig = SchemaBuilder::new()
|
||||
.measurement("m1")
|
||||
.influx_column("tag1", InfluxColumnType::Tag)
|
||||
.influx_column("tag2", InfluxColumnType::Tag)
|
||||
.build()
|
||||
.unwrap();
|
||||
let table_schema = Arc::new(tokio::sync::RwLock::new(table_schema_orig.clone()));
|
||||
let table_schema = lock_tracker.new_lock(table_schema_orig.clone());
|
||||
|
||||
let schema1 = SchemaBuilder::new()
|
||||
.measurement("m1")
|
||||
|
@ -313,13 +309,9 @@ mod tests {
|
|||
.influx_column("tag2", InfluxColumnType::Field(InfluxFieldType::String))
|
||||
.build()
|
||||
.unwrap();
|
||||
assert!(
|
||||
TableSchemaUpsertHandle::new(Arc::clone(&table_schema), &schema1)
|
||||
.await
|
||||
.is_err()
|
||||
);
|
||||
assert!(TableSchemaUpsertHandle::new(&table_schema, &schema1).is_err());
|
||||
|
||||
// schema did not change
|
||||
assert_eq!(table_schema.read().await.deref(), &table_schema_orig);
|
||||
assert_eq!(table_schema.read().deref(), &table_schema_orig);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue