From adc6fcfb04d2486e73cc78bd0f0f3a6b9f6b0182 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Tue, 20 Dec 2022 11:11:02 +0100 Subject: [PATCH] feat(catalog): linearise sort key updates Updating the sort key is not commutative and MUST be serialised. The correctness of the current catalog interface relies on the caller serialising updates globally, something it cannot reasonably assert in a distributed system. This change of the catalog interface pushes this responsibility to the catalog itself where it can be effectively enforced, and allows a caller to detect parallel updates to the sort key. --- compactor/src/compact.rs | 6 +- .../aggregate_tsm_schema/update_catalog.rs | 12 ++- ingester/src/buffer_tree/partition.rs | 2 +- .../partition/resolver/sort_key.rs | 2 +- ingester/src/data.rs | 43 +++++++--- ingester2/src/buffer_tree/partition.rs | 2 +- .../partition/resolver/sort_key.rs | 2 +- ingester2/src/persist/context.rs | 66 +++++++++++++--- iox_catalog/src/interface.rs | 79 +++++++++++++++++-- iox_catalog/src/mem.rs | 25 +++--- iox_catalog/src/metrics.rs | 12 +-- iox_catalog/src/postgres.rs | 67 +++++++++++----- iox_tests/src/util.rs | 30 ++++++- querier/src/cache/partition.rs | 6 +- 14 files changed, 274 insertions(+), 80 deletions(-) diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index e4b3545ad7..d7b31b300f 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -842,7 +842,11 @@ pub mod tests { // update sort key for this another_partition let another_partition = txn .partitions() - .update_sort_key(another_partition.id, &["tag1", "time"]) + .cas_sort_key( + another_partition.id, + Some(another_partition.sort_key), + &["tag1", "time"], + ) .await .unwrap(); txn.commit().await.unwrap(); diff --git a/import/src/aggregate_tsm_schema/update_catalog.rs b/import/src/aggregate_tsm_schema/update_catalog.rs index 4a9a5af9be..415307fde2 100644 --- a/import/src/aggregate_tsm_schema/update_catalog.rs +++ 
b/import/src/aggregate_tsm_schema/update_catalog.rs @@ -6,7 +6,7 @@ use data_types::{ Partition, PartitionKey, QueryPoolId, ShardId, TableSchema, TopicId, }; use influxdb_iox_client::connection::{Connection, GrpcConnection}; -use iox_catalog::interface::{get_schema_by_name, Catalog, RepoCollection}; +use iox_catalog::interface::{get_schema_by_name, CasFailure, Catalog, RepoCollection}; use schema::{ sort::{adjust_sort_key_columns, SortKey, SortKeyBuilder}, InfluxColumnType, InfluxFieldType, TIME_COLUMN_NAME, @@ -23,6 +23,9 @@ pub enum UpdateCatalogError { #[error("Error returned from the Catalog: {0}")] CatalogError(#[from] iox_catalog::interface::Error), + #[error("Error returned from the Catalog: failed to cas sort key update")] + SortKeyCasError, + #[error("Couldn't construct namespace from org and bucket: {0}")] InvalidOrgBucket(#[from] OrgBucketMappingError), @@ -301,9 +304,12 @@ where let sort_key = sort_key.to_columns().collect::>(); repos .partitions() - .update_sort_key(partition.id, &sort_key) + .cas_sort_key(partition.id, Some(partition.sort_key), &sort_key) .await - .map_err(UpdateCatalogError::CatalogError)?; + .map_err(|e| match e { + CasFailure::ValueMismatch(_) => UpdateCatalogError::SortKeyCasError, + CasFailure::QueryError(e) => UpdateCatalogError::CatalogError(e), + })?; } } } diff --git a/ingester/src/buffer_tree/partition.rs b/ingester/src/buffer_tree/partition.rs index 27d99589b8..189a2ff825 100644 --- a/ingester/src/buffer_tree/partition.rs +++ b/ingester/src/buffer_tree/partition.rs @@ -939,7 +939,7 @@ mod tests { .repositories() .await .partitions() - .update_sort_key(partition_id, &["terrific"]) + .cas_sort_key(partition_id, None, &["terrific"]) .await .unwrap(); diff --git a/ingester/src/buffer_tree/partition/resolver/sort_key.rs b/ingester/src/buffer_tree/partition/resolver/sort_key.rs index 1c75e66dd2..9f0d5b9229 100644 --- a/ingester/src/buffer_tree/partition/resolver/sort_key.rs +++ 
b/ingester/src/buffer_tree/partition/resolver/sort_key.rs @@ -92,7 +92,7 @@ mod tests { .repositories() .await .partitions() - .update_sort_key(partition_id, &["uno", "dos", "bananas"]) + .cas_sort_key(partition_id, None, &["uno", "dos", "bananas"]) .await .expect("should update existing partition key"); diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 0cf3d7a808..970d6e7094 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -12,7 +12,7 @@ use data_types::{ CompactionLevel, NamespaceId, PartitionId, SequenceNumber, ShardId, ShardIndex, TableId, }; use dml::DmlOperation; -use iox_catalog::interface::{get_table_schema_by_id, Catalog}; +use iox_catalog::interface::{get_table_schema_by_id, CasFailure, Catalog}; use iox_query::exec::Executor; use iox_time::{SystemProvider, TimeProvider}; use metric::{Attributes, Metric, U64Histogram, U64HistogramOptions}; @@ -442,7 +442,7 @@ impl Persister for IngesterData { stream: record_stream, catalog_sort_key_update, data_sort_key, - } = compact_persisting_batch(&self.exec, sort_key, table_name.clone(), batch) + } = compact_persisting_batch(&self.exec, sort_key.clone(), table_name.clone(), batch) .await .expect("unable to compact persisting batch"); @@ -481,16 +481,35 @@ impl Persister for IngesterData { // compactor may see a parquet file with an inconsistent // sort key. 
https://github.com/influxdata/influxdb_iox/issues/5090 if let Some(new_sort_key) = catalog_sort_key_update { - let sort_key = new_sort_key.to_columns().collect::>(); + let new_sort_key_str = new_sort_key.to_columns().collect::>(); + let old_sort_key: Option> = + sort_key.map(|v| v.to_columns().map(ToString::to_string).collect()); Backoff::new(&self.backoff_config) - .retry_all_errors("update_sort_key", || async { - let mut repos = self.catalog.repositories().await; - let _partition = repos - .partitions() - .update_sort_key(partition_id, &sort_key) - .await?; - // compiler insisted on getting told the type of the error :shrug: - Ok(()) as Result<(), iox_catalog::interface::Error> + .retry_all_errors("cas_sort_key", || { + let old_sort_key = old_sort_key.clone(); + async { + let mut repos = self.catalog.repositories().await; + match repos + .partitions() + .cas_sort_key(partition_id, old_sort_key, &new_sort_key_str) + .await + { + Ok(_) => {} + Err(CasFailure::ValueMismatch(_)) => { + // An ingester concurrently updated the sort key. + // + // This breaks a sort-key update invariant - sort + // key updates MUST be serialised. This should + // not happen because writes for a given table + // are always mapped to a single ingester + // instance. 
+ panic!("detected concurrent sort key update"); + } + Err(CasFailure::QueryError(e)) => return Err(e), + }; + // compiler insisted on getting told the type of the error :shrug: + Ok(()) as Result<(), iox_catalog::interface::Error> + } }) .await .expect("retry forever"); @@ -507,7 +526,7 @@ impl Persister for IngesterData { %table_name, %partition_id, %partition_key, - old_sort_key = ?sort_key, + ?old_sort_key, %new_sort_key, "adjusted sort key during batch compact & persist" ); diff --git a/ingester2/src/buffer_tree/partition.rs b/ingester2/src/buffer_tree/partition.rs index c86d4f7d02..aee1da9013 100644 --- a/ingester2/src/buffer_tree/partition.rs +++ b/ingester2/src/buffer_tree/partition.rs @@ -1006,7 +1006,7 @@ mod tests { .repositories() .await .partitions() - .update_sort_key(partition_id, &["terrific"]) + .cas_sort_key(partition_id, None, &["terrific"]) .await .unwrap(); diff --git a/ingester2/src/buffer_tree/partition/resolver/sort_key.rs b/ingester2/src/buffer_tree/partition/resolver/sort_key.rs index 1c75e66dd2..9f0d5b9229 100644 --- a/ingester2/src/buffer_tree/partition/resolver/sort_key.rs +++ b/ingester2/src/buffer_tree/partition/resolver/sort_key.rs @@ -92,7 +92,7 @@ mod tests { .repositories() .await .partitions() - .update_sort_key(partition_id, &["uno", "dos", "bananas"]) + .cas_sort_key(partition_id, None, &["uno", "dos", "bananas"]) .await .expect("should update existing partition key"); diff --git a/ingester2/src/persist/context.rs b/ingester2/src/persist/context.rs index 5c85786b70..29d0bad60c 100644 --- a/ingester2/src/persist/context.rs +++ b/ingester2/src/persist/context.rs @@ -5,7 +5,7 @@ use data_types::{ CompactionLevel, NamespaceId, ParquetFileParams, PartitionId, PartitionKey, SequenceNumber, ShardId, TableId, }; -use iox_catalog::interface::get_table_schema_by_id; +use iox_catalog::interface::{get_table_schema_by_id, CasFailure}; use iox_time::{SystemProvider, TimeProvider}; use observability_deps::tracing::*; use parking_lot::Mutex; 
@@ -319,15 +319,54 @@ impl Context { // the consumer of the parquet file will observe an inconsistent sort // key. if let Some(new_sort_key) = sort_key_update { - let sort_key = new_sort_key.to_columns().collect::>(); + let old_sort_key = self + .sort_key + .get() + .await + .map(|v| v.to_columns().map(|v| v.to_string()).collect::>()); Backoff::new(&Default::default()) - .retry_all_errors("update_sort_key", || async { - let mut repos = self.inner.catalog.repositories().await; - let _partition = repos - .partitions() - .update_sort_key(self.partition_id, &sort_key) - .await?; - Ok(()) as Result<(), iox_catalog::interface::Error> + .retry_all_errors("cas_sort_key", || { + let mut old_sort_key = old_sort_key.clone(); + let new_sort_key_str = new_sort_key.to_columns().collect::>(); + let catalog = Arc::clone(&self.inner.catalog); + async move { + let mut repos = catalog.repositories().await; + loop { + match repos + .partitions() + .cas_sort_key( + self.partition_id, + old_sort_key.clone(), + &new_sort_key_str, + ) + .await + { + Ok(_) => break, + Err(CasFailure::ValueMismatch(old)) => { + // An ingester concurrently updated the sort + // key. + // + // This breaks a sort-key update invariant - + // sort key updates MUST be serialised. This + // currently cannot be enforced. + // + // See: + // https://github.com/influxdata/influxdb_iox/issues/6439 + // + error!( + expected=?old_sort_key, + observed=?old, + update=?new_sort_key_str, + "detected concurrent sort key update" + ); + // Retry using the new CAS value. + old_sort_key = Some(old); + } + Err(CasFailure::QueryError(e)) => return Err(e), + }; + } + Ok(()) as Result<(), iox_catalog::interface::Error> + } }) .await .expect("retry forever"); @@ -340,10 +379,11 @@ impl Context { guard.update_sort_key(Some(new_sort_key.clone())); }; - // Assert the serialisation of sort key updates. + // Assert the internal (to this instance) serialisation of sort key + // updates. 
// - // Both of these get() should not block due to both of the - // values having been previously resolved / used. + // Both of these get() should not block due to both of the values + // having been previously resolved / used. assert_eq!(old_key.get().await, self.sort_key.get().await); debug!( @@ -354,7 +394,7 @@ partition_id = %self.partition_id, partition_key = %self.partition_key, %object_store_id, - old_sort_key = ?sort_key, + ?old_sort_key, %new_sort_key, "adjusted partition sort key" ); diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index 1a74b744ae..12532163fa 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -17,6 +17,18 @@ use std::{ }; use uuid::Uuid; +/// An error wrapper detailing the reason for a compare-and-swap failure. +#[derive(Debug)] +pub enum CasFailure { + /// The compare-and-swap failed because the current value differs from the + /// comparator. + /// + /// Contains the new current value. + ValueMismatch(T), + /// A query error occurred. + QueryError(Error), +} + #[derive(Debug, Snafu)] #[allow(missing_copy_implementations, missing_docs)] #[snafu(visibility(pub(crate)))] @@ -441,15 +453,22 @@ pub trait PartitionRepo: Send + Sync { /// return the partitions by table id async fn list_by_table_id(&mut self, table_id: TableId) -> Result>; - /// Update the sort key for the partition. + /// Update the sort key for the partition, setting it to `new_sort_key` iff + /// the current value matches `old_sort_key`. Returns the updated partition + /// record on success. /// /// NOTE: it is expected that ONLY the ingesters update sort keys for /// existing partitions. + /// + /// # Spurious failure + /// + /// Implementations are allowed to spuriously return + /// [`CasFailure::ValueMismatch`] for performance reasons.
+ async fn cas_sort_key( &mut self, partition_id: PartitionId, - sort_key: &[&str], - ) -> Result; + old_sort_key: Option>, + new_sort_key: &[&str], + ) -> Result>>; /// Record an instance of a partition being selected for compaction but compaction was not /// completed for the specified reason. @@ -1570,10 +1589,24 @@ pub(crate) mod test_helpers { // test update_sort_key from None to Some repos .partitions() - .update_sort_key(other_partition.id, &["tag2", "tag1", "time"]) + .cas_sort_key(other_partition.id, None, &["tag2", "tag1", "time"]) .await .unwrap(); + // test sort key CAS with an incorrect value + let err = repos + .partitions() + .cas_sort_key( + other_partition.id, + Some(["bananas".to_string()].to_vec()), + &["tag2", "tag1", "tag3 , with comma", "time"], + ) + .await + .expect_err("CAS with incorrect value should fail"); + assert_matches!(err, CasFailure::ValueMismatch(old) => { + assert_eq!(old, &["tag2", "tag1", "time"]); + }); + // test getting the new sort key let updated_other_partition = repos .partitions() @@ -1586,11 +1619,45 @@ pub(crate) mod test_helpers { vec!["tag2", "tag1", "time"] ); + // test sort key CAS with no value + let err = repos + .partitions() + .cas_sort_key( + other_partition.id, + None, + &["tag2", "tag1", "tag3 , with comma", "time"], + ) + .await + .expect_err("CAS with incorrect value should fail"); + assert_matches!(err, CasFailure::ValueMismatch(old) => { + assert_eq!(old, ["tag2", "tag1", "time"]); + }); + + // test sort key CAS with an incorrect value + let err = repos + .partitions() + .cas_sort_key( + other_partition.id, + Some(["bananas".to_string()].to_vec()), + &["tag2", "tag1", "tag3 , with comma", "time"], + ) + .await + .expect_err("CAS with incorrect value should fail"); + assert_matches!(err, CasFailure::ValueMismatch(old) => { + assert_eq!(old, ["tag2", "tag1", "time"]); + }); + // test update_sort_key from Some value to Some other value repos .partitions() - .update_sort_key( + .cas_sort_key( 
other_partition.id, + Some( + ["tag2", "tag1", "time"] + .into_iter() + .map(ToString::to_string) + .collect(), + ), &["tag2", "tag1", "tag3 , with comma", "time"], ) .await diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index 0a28b40587..cd55a124af 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -3,10 +3,10 @@ use crate::{ interface::{ - sealed::TransactionFinalize, Catalog, ColumnRepo, ColumnTypeMismatchSnafu, Error, - NamespaceRepo, ParquetFileRepo, PartitionRepo, ProcessedTombstoneRepo, QueryPoolRepo, - RepoCollection, Result, ShardRepo, TableRepo, TombstoneRepo, TopicMetadataRepo, - Transaction, + sealed::TransactionFinalize, CasFailure, Catalog, ColumnRepo, ColumnTypeMismatchSnafu, + Error, NamespaceRepo, ParquetFileRepo, PartitionRepo, ProcessedTombstoneRepo, + QueryPoolRepo, RepoCollection, Result, ShardRepo, TableRepo, TombstoneRepo, + TopicMetadataRepo, Transaction, }, metrics::MetricDecorator, DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, @@ -818,18 +818,23 @@ impl PartitionRepo for MemTxn { Ok(partitions) } - async fn update_sort_key( + async fn cas_sort_key( &mut self, partition_id: PartitionId, - sort_key: &[&str], - ) -> Result { + old_sort_key: Option>, + new_sort_key: &[&str], + ) -> Result>> { let stage = self.stage(); + let old_sort_key = old_sort_key.unwrap_or_default(); match stage.partitions.iter_mut().find(|p| p.id == partition_id) { - Some(p) => { - p.sort_key = sort_key.iter().map(|s| s.to_string()).collect(); + Some(p) if p.sort_key == old_sort_key => { + p.sort_key = new_sort_key.iter().map(|s| s.to_string()).collect(); Ok(p.clone()) } - None => Err(Error::PartitionNotFound { id: partition_id }), + Some(p) => return Err(CasFailure::ValueMismatch(p.sort_key.clone())), + None => Err(CasFailure::QueryError(Error::PartitionNotFound { + id: partition_id, + })), } } diff --git a/iox_catalog/src/metrics.rs b/iox_catalog/src/metrics.rs index 77115e1926..d5cf9e5808 100644 --- a/iox_catalog/src/metrics.rs +++ 
b/iox_catalog/src/metrics.rs @@ -1,9 +1,9 @@ //! Metric instrumentation for catalog implementations. use crate::interface::{ - sealed::TransactionFinalize, ColumnRepo, NamespaceRepo, ParquetFileRepo, PartitionRepo, - ProcessedTombstoneRepo, QueryPoolRepo, RepoCollection, Result, ShardRepo, TableRepo, - TombstoneRepo, TopicMetadataRepo, + sealed::TransactionFinalize, CasFailure, ColumnRepo, NamespaceRepo, ParquetFileRepo, + PartitionRepo, ProcessedTombstoneRepo, QueryPoolRepo, RepoCollection, Result, ShardRepo, + TableRepo, TombstoneRepo, TopicMetadataRepo, }; use async_trait::async_trait; use data_types::{ @@ -138,7 +138,7 @@ macro_rules! decorate { $metric:literal = $method:ident( &mut self $(,)? $($arg:ident : $t:ty),* - ) -> Result<$out:ty>; + ) -> Result<$out:ty$(, $err:ty)?>; )+] ) => { #[async_trait] @@ -149,7 +149,7 @@ macro_rules! decorate { /// below. $( - async fn $method(&mut self, $($arg : $t),*) -> Result<$out> { + async fn $method(&mut self, $($arg : $t),*) -> Result<$out$(, $err)?> { let observer: Metric = self.metrics.register_metric( "catalog_op_duration", "catalog call duration", @@ -245,7 +245,7 @@ decorate!( "partition_list_by_shard" = list_by_shard(&mut self, shard_id: ShardId) -> Result>; "partition_list_by_namespace" = list_by_namespace(&mut self, namespace_id: NamespaceId) -> Result>; "partition_list_by_table_id" = list_by_table_id(&mut self, table_id: TableId) -> Result>; - "partition_update_sort_key" = update_sort_key(&mut self, partition_id: PartitionId, sort_key: &[&str]) -> Result; + "partition_update_sort_key" = cas_sort_key(&mut self, partition_id: PartitionId, old_sort_key: Option>, new_sort_key: &[&str]) -> Result>>; "partition_record_skipped_compaction" = record_skipped_compaction(&mut self, partition_id: PartitionId, reason: &str, num_files: usize, limit_num_files: usize, limit_num_files_first_in_partition: usize, estimated_bytes: u64, limit_bytes: u64) -> Result<()>; "partition_list_skipped_compactions" = 
list_skipped_compactions(&mut self) -> Result>; "partition_delete_skipped_compactions" = delete_skipped_compactions(&mut self, partition_id: PartitionId) -> Result>; diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index c70a43d8a3..1400f2c4cd 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -2,10 +2,10 @@ use crate::{ interface::{ - self, sealed::TransactionFinalize, Catalog, ColumnRepo, ColumnTypeMismatchSnafu, Error, - NamespaceRepo, ParquetFileRepo, PartitionRepo, ProcessedTombstoneRepo, QueryPoolRepo, - RepoCollection, Result, ShardRepo, TableRepo, TombstoneRepo, TopicMetadataRepo, - Transaction, + self, sealed::TransactionFinalize, CasFailure, Catalog, ColumnRepo, + ColumnTypeMismatchSnafu, Error, NamespaceRepo, ParquetFileRepo, PartitionRepo, + ProcessedTombstoneRepo, QueryPoolRepo, RepoCollection, Result, ShardRepo, TableRepo, + TombstoneRepo, TopicMetadataRepo, Transaction, }, metrics::MetricDecorator, DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, @@ -1234,34 +1234,65 @@ WHERE table_id = $1; .map_err(|e| Error::SqlxError { source: e }) } - async fn update_sort_key( + /// Update the sort key for `partition_id` if and only if `old_sort_key` + /// matches the current value in the database. + /// + /// This compare-and-swap operation is allowed to spuriously return + /// [`CasFailure::ValueMismatch`] for performance reasons (avoiding multiple + /// round trips to service a transaction in the happy path). 
+ async fn cas_sort_key( &mut self, partition_id: PartitionId, - sort_key: &[&str], - ) -> Result { - let rec = sqlx::query_as::<_, Partition>( + old_sort_key: Option>, + new_sort_key: &[&str], + ) -> Result>> { + let old_sort_key = old_sort_key.unwrap_or_default(); + let res = sqlx::query_as::<_, Partition>( r#" UPDATE partition SET sort_key = $1 -WHERE id = $2 +WHERE id = $2 AND sort_key = $3 RETURNING *; "#, ) - .bind(sort_key) - .bind(partition_id) + .bind(new_sort_key) // $1 + .bind(partition_id) // $2 + .bind(&old_sort_key) // $3 .fetch_one(&mut self.inner) .await; - let partition = rec.map_err(|e| match e { - sqlx::Error::RowNotFound => Error::PartitionNotFound { id: partition_id }, - _ => Error::SqlxError { source: e }, - })?; + let partition = match res { + Ok(v) => v, + Err(sqlx::Error::RowNotFound) => { + // This update may have failed either because: + // + // * A row with the specified ID did not exist at query time + // (but may exist now!) + // * The sort key does not match. + // + // To differentiate, we submit a get partition query, returning + // the actual sort key if successful. + // + // NOTE: this is racy, but documented - this might return "Sort + // key differs! Old key: " + return Err(CasFailure::ValueMismatch( + PartitionRepo::get_by_id(self, partition_id) + .await + .map_err(CasFailure::QueryError)? + .ok_or(CasFailure::QueryError(Error::PartitionNotFound { + id: partition_id, + }))? 
+ .sort_key, + )); + } + Err(e) => return Err(CasFailure::QueryError(Error::SqlxError { source: e })), + }; debug!( ?partition_id, - input_sort_key=?sort_key, - partition_after_catalog_update=?partition, - "Partition after updating sort key" + ?old_sort_key, + ?new_sort_key, + "partition sort key cas successful" ); Ok(partition) diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs index e3113d586f..219552c4a4 100644 --- a/iox_tests/src/util.rs +++ b/iox_tests/src/util.rs @@ -495,7 +495,7 @@ impl TestTableBoundShard { let partition = repos .partitions() - .update_sort_key(partition.id, sort_key) + .cas_sort_key(partition.id, None, sort_key) .await .unwrap(); @@ -553,14 +553,27 @@ pub struct TestPartition { impl TestPartition { /// Update sort key. pub async fn update_sort_key(self: &Arc, sort_key: SortKey) -> Arc { + let old_sort_key = self + .catalog + .catalog + .repositories() + .await + .partitions() + .get_by_id(self.partition.id) + .await + .unwrap() + .unwrap() + .sort_key; + let partition = self .catalog .catalog .repositories() .await .partitions() - .update_sort_key( + .cas_sort_key( self.partition.id, + Some(old_sort_key), &sort_key.to_columns().collect::>(), ) .await @@ -897,7 +910,16 @@ async fn update_catalog_sort_key_if_needed( &new_columns, ); partitions_catalog - .update_sort_key(partition_id, &new_columns) + .cas_sort_key( + partition_id, + Some( + catalog_sort_key + .to_columns() + .map(ToString::to_string) + .collect::>(), + ), + &new_columns, + ) .await .unwrap(); } @@ -906,7 +928,7 @@ async fn update_catalog_sort_key_if_needed( let new_columns = sort_key.to_columns().collect::>(); debug!("Updating sort key from None to {:?}", &new_columns); partitions_catalog - .update_sort_key(partition_id, &new_columns) + .cas_sort_key(partition_id, None, &new_columns) .await .unwrap(); } diff --git a/querier/src/cache/partition.rs b/querier/src/cache/partition.rs index 3f3799e87e..10fd37f8b5 100644 --- a/querier/src/cache/partition.rs +++ 
b/querier/src/cache/partition.rs @@ -495,7 +495,7 @@ mod tests { column_order: vec![c1.column.id, c2.column.id], } ); - assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 3); + assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 4); // subsets and the full key don't expire for should_cover in [ @@ -511,7 +511,7 @@ mod tests { sort_key.as_ref().unwrap(), sort_key_2.as_ref().unwrap() )); - assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 3); + assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 4); } // unknown columns expire @@ -529,7 +529,7 @@ mod tests { sort_key_2.as_ref().unwrap() )); assert_eq!(sort_key, sort_key_2); - assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 4); + assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 5); } fn schema() -> Arc {