feat(catalog): linearise sort key updates

Updating the sort key is not commutative and MUST be serialised. The
correctness of the current catalog interface relies on the caller
serialising updates globally, something it cannot reasonably assert in a
distributed system.

This change of the catalog interface pushes this responsibility to the
catalog itself where it can be effectively enforced, and allows a caller
to detect parallel updates to the sort key.
pull/24376/head
Dom Dwyer 2022-12-20 11:11:02 +01:00
parent 3e2362ea9b
commit adc6fcfb04
No known key found for this signature in database
GPG Key ID: E4C40DBD9157879A
14 changed files with 274 additions and 80 deletions

View File

@ -842,7 +842,11 @@ pub mod tests {
// update sort key for this another_partition // update sort key for this another_partition
let another_partition = txn let another_partition = txn
.partitions() .partitions()
.update_sort_key(another_partition.id, &["tag1", "time"]) .cas_sort_key(
another_partition.id,
Some(another_partition.sort_key),
&["tag1", "time"],
)
.await .await
.unwrap(); .unwrap();
txn.commit().await.unwrap(); txn.commit().await.unwrap();

View File

@ -6,7 +6,7 @@ use data_types::{
Partition, PartitionKey, QueryPoolId, ShardId, TableSchema, TopicId, Partition, PartitionKey, QueryPoolId, ShardId, TableSchema, TopicId,
}; };
use influxdb_iox_client::connection::{Connection, GrpcConnection}; use influxdb_iox_client::connection::{Connection, GrpcConnection};
use iox_catalog::interface::{get_schema_by_name, Catalog, RepoCollection}; use iox_catalog::interface::{get_schema_by_name, CasFailure, Catalog, RepoCollection};
use schema::{ use schema::{
sort::{adjust_sort_key_columns, SortKey, SortKeyBuilder}, sort::{adjust_sort_key_columns, SortKey, SortKeyBuilder},
InfluxColumnType, InfluxFieldType, TIME_COLUMN_NAME, InfluxColumnType, InfluxFieldType, TIME_COLUMN_NAME,
@ -23,6 +23,9 @@ pub enum UpdateCatalogError {
#[error("Error returned from the Catalog: {0}")] #[error("Error returned from the Catalog: {0}")]
CatalogError(#[from] iox_catalog::interface::Error), CatalogError(#[from] iox_catalog::interface::Error),
#[error("Error returned from the Catalog: failed to cas sort key update")]
SortKeyCasError,
#[error("Couldn't construct namespace from org and bucket: {0}")] #[error("Couldn't construct namespace from org and bucket: {0}")]
InvalidOrgBucket(#[from] OrgBucketMappingError), InvalidOrgBucket(#[from] OrgBucketMappingError),
@ -301,9 +304,12 @@ where
let sort_key = sort_key.to_columns().collect::<Vec<_>>(); let sort_key = sort_key.to_columns().collect::<Vec<_>>();
repos repos
.partitions() .partitions()
.update_sort_key(partition.id, &sort_key) .cas_sort_key(partition.id, Some(partition.sort_key), &sort_key)
.await .await
.map_err(UpdateCatalogError::CatalogError)?; .map_err(|e| match e {
CasFailure::ValueMismatch(_) => UpdateCatalogError::SortKeyCasError,
CasFailure::QueryError(e) => UpdateCatalogError::CatalogError(e),
})?;
} }
} }
} }

View File

@ -939,7 +939,7 @@ mod tests {
.repositories() .repositories()
.await .await
.partitions() .partitions()
.update_sort_key(partition_id, &["terrific"]) .cas_sort_key(partition_id, None, &["terrific"])
.await .await
.unwrap(); .unwrap();

View File

@ -92,7 +92,7 @@ mod tests {
.repositories() .repositories()
.await .await
.partitions() .partitions()
.update_sort_key(partition_id, &["uno", "dos", "bananas"]) .cas_sort_key(partition_id, None, &["uno", "dos", "bananas"])
.await .await
.expect("should update existing partition key"); .expect("should update existing partition key");

View File

@ -12,7 +12,7 @@ use data_types::{
CompactionLevel, NamespaceId, PartitionId, SequenceNumber, ShardId, ShardIndex, TableId, CompactionLevel, NamespaceId, PartitionId, SequenceNumber, ShardId, ShardIndex, TableId,
}; };
use dml::DmlOperation; use dml::DmlOperation;
use iox_catalog::interface::{get_table_schema_by_id, Catalog}; use iox_catalog::interface::{get_table_schema_by_id, CasFailure, Catalog};
use iox_query::exec::Executor; use iox_query::exec::Executor;
use iox_time::{SystemProvider, TimeProvider}; use iox_time::{SystemProvider, TimeProvider};
use metric::{Attributes, Metric, U64Histogram, U64HistogramOptions}; use metric::{Attributes, Metric, U64Histogram, U64HistogramOptions};
@ -442,7 +442,7 @@ impl Persister for IngesterData {
stream: record_stream, stream: record_stream,
catalog_sort_key_update, catalog_sort_key_update,
data_sort_key, data_sort_key,
} = compact_persisting_batch(&self.exec, sort_key, table_name.clone(), batch) } = compact_persisting_batch(&self.exec, sort_key.clone(), table_name.clone(), batch)
.await .await
.expect("unable to compact persisting batch"); .expect("unable to compact persisting batch");
@ -481,16 +481,35 @@ impl Persister for IngesterData {
// compactor may see a parquet file with an inconsistent // compactor may see a parquet file with an inconsistent
// sort key. https://github.com/influxdata/influxdb_iox/issues/5090 // sort key. https://github.com/influxdata/influxdb_iox/issues/5090
if let Some(new_sort_key) = catalog_sort_key_update { if let Some(new_sort_key) = catalog_sort_key_update {
let sort_key = new_sort_key.to_columns().collect::<Vec<_>>(); let new_sort_key_str = new_sort_key.to_columns().collect::<Vec<_>>();
let old_sort_key: Option<Vec<String>> =
sort_key.map(|v| v.to_columns().map(ToString::to_string).collect());
Backoff::new(&self.backoff_config) Backoff::new(&self.backoff_config)
.retry_all_errors("update_sort_key", || async { .retry_all_errors("cas_sort_key", || {
let mut repos = self.catalog.repositories().await; let old_sort_key = old_sort_key.clone();
let _partition = repos async {
.partitions() let mut repos = self.catalog.repositories().await;
.update_sort_key(partition_id, &sort_key) match repos
.await?; .partitions()
// compiler insisted on getting told the type of the error :shrug: .cas_sort_key(partition_id, old_sort_key, &new_sort_key_str)
Ok(()) as Result<(), iox_catalog::interface::Error> .await
{
Ok(_) => {}
Err(CasFailure::ValueMismatch(_)) => {
// An ingester concurrently updated the sort key.
//
// This breaks a sort-key update invariant - sort
// key updates MUST be serialised. This should
// not happen because writes for a given table
// are always mapped to a single ingester
// instance.
panic!("detected concurrent sort key update");
}
Err(CasFailure::QueryError(e)) => return Err(e),
};
// compiler insisted on getting told the type of the error :shrug:
Ok(()) as Result<(), iox_catalog::interface::Error>
}
}) })
.await .await
.expect("retry forever"); .expect("retry forever");
@ -507,7 +526,7 @@ impl Persister for IngesterData {
%table_name, %table_name,
%partition_id, %partition_id,
%partition_key, %partition_key,
old_sort_key = ?sort_key, ?old_sort_key,
%new_sort_key, %new_sort_key,
"adjusted sort key during batch compact & persist" "adjusted sort key during batch compact & persist"
); );

View File

@ -1006,7 +1006,7 @@ mod tests {
.repositories() .repositories()
.await .await
.partitions() .partitions()
.update_sort_key(partition_id, &["terrific"]) .cas_sort_key(partition_id, None, &["terrific"])
.await .await
.unwrap(); .unwrap();

View File

@ -92,7 +92,7 @@ mod tests {
.repositories() .repositories()
.await .await
.partitions() .partitions()
.update_sort_key(partition_id, &["uno", "dos", "bananas"]) .cas_sort_key(partition_id, None, &["uno", "dos", "bananas"])
.await .await
.expect("should update existing partition key"); .expect("should update existing partition key");

View File

@ -5,7 +5,7 @@ use data_types::{
CompactionLevel, NamespaceId, ParquetFileParams, PartitionId, PartitionKey, SequenceNumber, CompactionLevel, NamespaceId, ParquetFileParams, PartitionId, PartitionKey, SequenceNumber,
ShardId, TableId, ShardId, TableId,
}; };
use iox_catalog::interface::get_table_schema_by_id; use iox_catalog::interface::{get_table_schema_by_id, CasFailure};
use iox_time::{SystemProvider, TimeProvider}; use iox_time::{SystemProvider, TimeProvider};
use observability_deps::tracing::*; use observability_deps::tracing::*;
use parking_lot::Mutex; use parking_lot::Mutex;
@ -319,15 +319,54 @@ impl Context {
// the consumer of the parquet file will observe an inconsistent sort // the consumer of the parquet file will observe an inconsistent sort
// key. // key.
if let Some(new_sort_key) = sort_key_update { if let Some(new_sort_key) = sort_key_update {
let sort_key = new_sort_key.to_columns().collect::<Vec<_>>(); let old_sort_key = self
.sort_key
.get()
.await
.map(|v| v.to_columns().map(|v| v.to_string()).collect::<Vec<_>>());
Backoff::new(&Default::default()) Backoff::new(&Default::default())
.retry_all_errors("update_sort_key", || async { .retry_all_errors("cas_sort_key", || {
let mut repos = self.inner.catalog.repositories().await; let mut old_sort_key = old_sort_key.clone();
let _partition = repos let new_sort_key_str = new_sort_key.to_columns().collect::<Vec<_>>();
.partitions() let catalog = Arc::clone(&self.inner.catalog);
.update_sort_key(self.partition_id, &sort_key) async move {
.await?; let mut repos = catalog.repositories().await;
Ok(()) as Result<(), iox_catalog::interface::Error> loop {
match repos
.partitions()
.cas_sort_key(
self.partition_id,
old_sort_key.clone(),
&new_sort_key_str,
)
.await
{
Ok(_) => break,
Err(CasFailure::ValueMismatch(old)) => {
// An ingester concurrently updated the sort
// key.
//
// This breaks a sort-key update invariant -
// sort key updates MUST be serialised. This
// currently cannot be enforced.
//
// See:
// https://github.com/influxdata/influxdb_iox/issues/6439
//
error!(
expected=?old_sort_key,
observed=?old,
update=?new_sort_key_str,
"detected concurrent sort key update"
);
// Retry using the new CAS value.
old_sort_key = Some(old);
}
Err(CasFailure::QueryError(e)) => return Err(e),
};
}
Ok(()) as Result<(), iox_catalog::interface::Error>
}
}) })
.await .await
.expect("retry forever"); .expect("retry forever");
@ -340,10 +379,11 @@ impl Context {
guard.update_sort_key(Some(new_sort_key.clone())); guard.update_sort_key(Some(new_sort_key.clone()));
}; };
// Assert the serialisation of sort key updates. // Assert the internal (to this instance) serialisation of sort key
// updates.
// //
// Both of these get() should not block due to both of the // Both of these get() should not block due to both of the values
// values having been previously resolved / used. // having been previously resolved / used.
assert_eq!(old_key.get().await, self.sort_key.get().await); assert_eq!(old_key.get().await, self.sort_key.get().await);
debug!( debug!(
@ -354,7 +394,7 @@ impl Context {
partition_id = %self.partition_id, partition_id = %self.partition_id,
partition_key = %self.partition_key, partition_key = %self.partition_key,
%object_store_id, %object_store_id,
old_sort_key = ?sort_key, ?old_sort_key,
%new_sort_key, %new_sort_key,
"adjusted partition sort key" "adjusted partition sort key"
); );

View File

@ -17,6 +17,18 @@ use std::{
}; };
use uuid::Uuid; use uuid::Uuid;
/// An error wrapper detailing the reason for a compare-and-swap failure.
#[derive(Debug)]
pub enum CasFailure<T> {
/// The compare-and-swap failed because the current value differers from the
/// comparator.
///
/// Contains the new current value.
ValueMismatch(T),
/// A query error occurred.
QueryError(Error),
}
#[derive(Debug, Snafu)] #[derive(Debug, Snafu)]
#[allow(missing_copy_implementations, missing_docs)] #[allow(missing_copy_implementations, missing_docs)]
#[snafu(visibility(pub(crate)))] #[snafu(visibility(pub(crate)))]
@ -441,15 +453,22 @@ pub trait PartitionRepo: Send + Sync {
/// return the partitions by table id /// return the partitions by table id
async fn list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>>; async fn list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>>;
/// Update the sort key for the partition. /// Update the sort key for the partition, setting it to `new_sort_key` iff
/// the current value matches `old_sort_key`. Returns
/// ///
/// NOTE: it is expected that ONLY the ingesters update sort keys for /// NOTE: it is expected that ONLY the ingesters update sort keys for
/// existing partitions. /// existing partitions.
async fn update_sort_key( ///
/// # Spurious failure
///
/// Implementations are allowed to spuriously return
/// [`CasFailure::ValueMismatch`] for performance reasons.
async fn cas_sort_key(
&mut self, &mut self,
partition_id: PartitionId, partition_id: PartitionId,
sort_key: &[&str], old_sort_key: Option<Vec<String>>,
) -> Result<Partition>; new_sort_key: &[&str],
) -> Result<Partition, CasFailure<Vec<String>>>;
/// Record an instance of a partition being selected for compaction but compaction was not /// Record an instance of a partition being selected for compaction but compaction was not
/// completed for the specified reason. /// completed for the specified reason.
@ -1570,10 +1589,24 @@ pub(crate) mod test_helpers {
// test update_sort_key from None to Some // test update_sort_key from None to Some
repos repos
.partitions() .partitions()
.update_sort_key(other_partition.id, &["tag2", "tag1", "time"]) .cas_sort_key(other_partition.id, None, &["tag2", "tag1", "time"])
.await .await
.unwrap(); .unwrap();
// test sort key CAS with an incorrect value
let err = repos
.partitions()
.cas_sort_key(
other_partition.id,
Some(["bananas".to_string()].to_vec()),
&["tag2", "tag1", "tag3 , with comma", "time"],
)
.await
.expect_err("CAS with incorrect value should fail");
assert_matches!(err, CasFailure::ValueMismatch(old) => {
assert_eq!(old, &["tag2", "tag1", "time"]);
});
// test getting the new sort key // test getting the new sort key
let updated_other_partition = repos let updated_other_partition = repos
.partitions() .partitions()
@ -1586,11 +1619,45 @@ pub(crate) mod test_helpers {
vec!["tag2", "tag1", "time"] vec!["tag2", "tag1", "time"]
); );
// test sort key CAS with no value
let err = repos
.partitions()
.cas_sort_key(
other_partition.id,
None,
&["tag2", "tag1", "tag3 , with comma", "time"],
)
.await
.expect_err("CAS with incorrect value should fail");
assert_matches!(err, CasFailure::ValueMismatch(old) => {
assert_eq!(old, ["tag2", "tag1", "time"]);
});
// test sort key CAS with an incorrect value
let err = repos
.partitions()
.cas_sort_key(
other_partition.id,
Some(["bananas".to_string()].to_vec()),
&["tag2", "tag1", "tag3 , with comma", "time"],
)
.await
.expect_err("CAS with incorrect value should fail");
assert_matches!(err, CasFailure::ValueMismatch(old) => {
assert_eq!(old, ["tag2", "tag1", "time"]);
});
// test update_sort_key from Some value to Some other value // test update_sort_key from Some value to Some other value
repos repos
.partitions() .partitions()
.update_sort_key( .cas_sort_key(
other_partition.id, other_partition.id,
Some(
["tag2", "tag1", "time"]
.into_iter()
.map(ToString::to_string)
.collect(),
),
&["tag2", "tag1", "tag3 , with comma", "time"], &["tag2", "tag1", "tag3 , with comma", "time"],
) )
.await .await

View File

@ -3,10 +3,10 @@
use crate::{ use crate::{
interface::{ interface::{
sealed::TransactionFinalize, Catalog, ColumnRepo, ColumnTypeMismatchSnafu, Error, sealed::TransactionFinalize, CasFailure, Catalog, ColumnRepo, ColumnTypeMismatchSnafu,
NamespaceRepo, ParquetFileRepo, PartitionRepo, ProcessedTombstoneRepo, QueryPoolRepo, Error, NamespaceRepo, ParquetFileRepo, PartitionRepo, ProcessedTombstoneRepo,
RepoCollection, Result, ShardRepo, TableRepo, TombstoneRepo, TopicMetadataRepo, QueryPoolRepo, RepoCollection, Result, ShardRepo, TableRepo, TombstoneRepo,
Transaction, TopicMetadataRepo, Transaction,
}, },
metrics::MetricDecorator, metrics::MetricDecorator,
DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES,
@ -818,18 +818,23 @@ impl PartitionRepo for MemTxn {
Ok(partitions) Ok(partitions)
} }
async fn update_sort_key( async fn cas_sort_key(
&mut self, &mut self,
partition_id: PartitionId, partition_id: PartitionId,
sort_key: &[&str], old_sort_key: Option<Vec<String>>,
) -> Result<Partition> { new_sort_key: &[&str],
) -> Result<Partition, CasFailure<Vec<String>>> {
let stage = self.stage(); let stage = self.stage();
let old_sort_key = old_sort_key.unwrap_or_default();
match stage.partitions.iter_mut().find(|p| p.id == partition_id) { match stage.partitions.iter_mut().find(|p| p.id == partition_id) {
Some(p) => { Some(p) if p.sort_key == old_sort_key => {
p.sort_key = sort_key.iter().map(|s| s.to_string()).collect(); p.sort_key = new_sort_key.iter().map(|s| s.to_string()).collect();
Ok(p.clone()) Ok(p.clone())
} }
None => Err(Error::PartitionNotFound { id: partition_id }), Some(p) => return Err(CasFailure::ValueMismatch(p.sort_key.clone())),
None => Err(CasFailure::QueryError(Error::PartitionNotFound {
id: partition_id,
})),
} }
} }

View File

@ -1,9 +1,9 @@
//! Metric instrumentation for catalog implementations. //! Metric instrumentation for catalog implementations.
use crate::interface::{ use crate::interface::{
sealed::TransactionFinalize, ColumnRepo, NamespaceRepo, ParquetFileRepo, PartitionRepo, sealed::TransactionFinalize, CasFailure, ColumnRepo, NamespaceRepo, ParquetFileRepo,
ProcessedTombstoneRepo, QueryPoolRepo, RepoCollection, Result, ShardRepo, TableRepo, PartitionRepo, ProcessedTombstoneRepo, QueryPoolRepo, RepoCollection, Result, ShardRepo,
TombstoneRepo, TopicMetadataRepo, TableRepo, TombstoneRepo, TopicMetadataRepo,
}; };
use async_trait::async_trait; use async_trait::async_trait;
use data_types::{ use data_types::{
@ -138,7 +138,7 @@ macro_rules! decorate {
$metric:literal = $method:ident( $metric:literal = $method:ident(
&mut self $(,)? &mut self $(,)?
$($arg:ident : $t:ty),* $($arg:ident : $t:ty),*
) -> Result<$out:ty>; ) -> Result<$out:ty$(, $err:ty)?>;
)+] )+]
) => { ) => {
#[async_trait] #[async_trait]
@ -149,7 +149,7 @@ macro_rules! decorate {
/// below. /// below.
$( $(
async fn $method(&mut self, $($arg : $t),*) -> Result<$out> { async fn $method(&mut self, $($arg : $t),*) -> Result<$out$(, $err)?> {
let observer: Metric<DurationHistogram> = self.metrics.register_metric( let observer: Metric<DurationHistogram> = self.metrics.register_metric(
"catalog_op_duration", "catalog_op_duration",
"catalog call duration", "catalog call duration",
@ -245,7 +245,7 @@ decorate!(
"partition_list_by_shard" = list_by_shard(&mut self, shard_id: ShardId) -> Result<Vec<Partition>>; "partition_list_by_shard" = list_by_shard(&mut self, shard_id: ShardId) -> Result<Vec<Partition>>;
"partition_list_by_namespace" = list_by_namespace(&mut self, namespace_id: NamespaceId) -> Result<Vec<Partition>>; "partition_list_by_namespace" = list_by_namespace(&mut self, namespace_id: NamespaceId) -> Result<Vec<Partition>>;
"partition_list_by_table_id" = list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>>; "partition_list_by_table_id" = list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>>;
"partition_update_sort_key" = update_sort_key(&mut self, partition_id: PartitionId, sort_key: &[&str]) -> Result<Partition>; "partition_update_sort_key" = cas_sort_key(&mut self, partition_id: PartitionId, old_sort_key: Option<Vec<String>>, new_sort_key: &[&str]) -> Result<Partition, CasFailure<Vec<String>>>;
"partition_record_skipped_compaction" = record_skipped_compaction(&mut self, partition_id: PartitionId, reason: &str, num_files: usize, limit_num_files: usize, limit_num_files_first_in_partition: usize, estimated_bytes: u64, limit_bytes: u64) -> Result<()>; "partition_record_skipped_compaction" = record_skipped_compaction(&mut self, partition_id: PartitionId, reason: &str, num_files: usize, limit_num_files: usize, limit_num_files_first_in_partition: usize, estimated_bytes: u64, limit_bytes: u64) -> Result<()>;
"partition_list_skipped_compactions" = list_skipped_compactions(&mut self) -> Result<Vec<SkippedCompaction>>; "partition_list_skipped_compactions" = list_skipped_compactions(&mut self) -> Result<Vec<SkippedCompaction>>;
"partition_delete_skipped_compactions" = delete_skipped_compactions(&mut self, partition_id: PartitionId) -> Result<Option<SkippedCompaction>>; "partition_delete_skipped_compactions" = delete_skipped_compactions(&mut self, partition_id: PartitionId) -> Result<Option<SkippedCompaction>>;

View File

@ -2,10 +2,10 @@
use crate::{ use crate::{
interface::{ interface::{
self, sealed::TransactionFinalize, Catalog, ColumnRepo, ColumnTypeMismatchSnafu, Error, self, sealed::TransactionFinalize, CasFailure, Catalog, ColumnRepo,
NamespaceRepo, ParquetFileRepo, PartitionRepo, ProcessedTombstoneRepo, QueryPoolRepo, ColumnTypeMismatchSnafu, Error, NamespaceRepo, ParquetFileRepo, PartitionRepo,
RepoCollection, Result, ShardRepo, TableRepo, TombstoneRepo, TopicMetadataRepo, ProcessedTombstoneRepo, QueryPoolRepo, RepoCollection, Result, ShardRepo, TableRepo,
Transaction, TombstoneRepo, TopicMetadataRepo, Transaction,
}, },
metrics::MetricDecorator, metrics::MetricDecorator,
DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES,
@ -1234,34 +1234,65 @@ WHERE table_id = $1;
.map_err(|e| Error::SqlxError { source: e }) .map_err(|e| Error::SqlxError { source: e })
} }
async fn update_sort_key( /// Update the sort key for `partition_id` if and only if `old_sort_key`
/// matches the current value in the database.
///
/// This compare-and-swap operation is allowed to spuriously return
/// [`CasFailure::ValueMismatch`] for performance reasons (avoiding multiple
/// round trips to service a transaction in the happy path).
async fn cas_sort_key(
&mut self, &mut self,
partition_id: PartitionId, partition_id: PartitionId,
sort_key: &[&str], old_sort_key: Option<Vec<String>>,
) -> Result<Partition> { new_sort_key: &[&str],
let rec = sqlx::query_as::<_, Partition>( ) -> Result<Partition, CasFailure<Vec<String>>> {
let old_sort_key = old_sort_key.unwrap_or_default();
let res = sqlx::query_as::<_, Partition>(
r#" r#"
UPDATE partition UPDATE partition
SET sort_key = $1 SET sort_key = $1
WHERE id = $2 WHERE id = $2 AND sort_key = $3
RETURNING *; RETURNING *;
"#, "#,
) )
.bind(sort_key) .bind(new_sort_key) // $1
.bind(partition_id) .bind(partition_id) // $2
.bind(&old_sort_key) // $3
.fetch_one(&mut self.inner) .fetch_one(&mut self.inner)
.await; .await;
let partition = rec.map_err(|e| match e { let partition = match res {
sqlx::Error::RowNotFound => Error::PartitionNotFound { id: partition_id }, Ok(v) => v,
_ => Error::SqlxError { source: e }, Err(sqlx::Error::RowNotFound) => {
})?; // This update may have failed either because:
//
// * A row with the specified ID did not exist at query time
// (but may exist now!)
// * The sort key does not match.
//
// To differentiate, we submit a get partition query, returning
// the actual sort key if successful.
//
// NOTE: this is racy, but documented - this might return "Sort
// key differs! Old key: <old sort key you provided>"
return Err(CasFailure::ValueMismatch(
PartitionRepo::get_by_id(self, partition_id)
.await
.map_err(CasFailure::QueryError)?
.ok_or(CasFailure::QueryError(Error::PartitionNotFound {
id: partition_id,
}))?
.sort_key,
));
}
Err(e) => return Err(CasFailure::QueryError(Error::SqlxError { source: e })),
};
debug!( debug!(
?partition_id, ?partition_id,
input_sort_key=?sort_key, ?old_sort_key,
partition_after_catalog_update=?partition, ?new_sort_key,
"Partition after updating sort key" "partition sort key cas successful"
); );
Ok(partition) Ok(partition)

View File

@ -495,7 +495,7 @@ impl TestTableBoundShard {
let partition = repos let partition = repos
.partitions() .partitions()
.update_sort_key(partition.id, sort_key) .cas_sort_key(partition.id, None, sort_key)
.await .await
.unwrap(); .unwrap();
@ -553,14 +553,27 @@ pub struct TestPartition {
impl TestPartition { impl TestPartition {
/// Update sort key. /// Update sort key.
pub async fn update_sort_key(self: &Arc<Self>, sort_key: SortKey) -> Arc<Self> { pub async fn update_sort_key(self: &Arc<Self>, sort_key: SortKey) -> Arc<Self> {
let old_sort_key = self
.catalog
.catalog
.repositories()
.await
.partitions()
.get_by_id(self.partition.id)
.await
.unwrap()
.unwrap()
.sort_key;
let partition = self let partition = self
.catalog .catalog
.catalog .catalog
.repositories() .repositories()
.await .await
.partitions() .partitions()
.update_sort_key( .cas_sort_key(
self.partition.id, self.partition.id,
Some(old_sort_key),
&sort_key.to_columns().collect::<Vec<_>>(), &sort_key.to_columns().collect::<Vec<_>>(),
) )
.await .await
@ -897,7 +910,16 @@ async fn update_catalog_sort_key_if_needed(
&new_columns, &new_columns,
); );
partitions_catalog partitions_catalog
.update_sort_key(partition_id, &new_columns) .cas_sort_key(
partition_id,
Some(
catalog_sort_key
.to_columns()
.map(ToString::to_string)
.collect::<Vec<_>>(),
),
&new_columns,
)
.await .await
.unwrap(); .unwrap();
} }
@ -906,7 +928,7 @@ async fn update_catalog_sort_key_if_needed(
let new_columns = sort_key.to_columns().collect::<Vec<_>>(); let new_columns = sort_key.to_columns().collect::<Vec<_>>();
debug!("Updating sort key from None to {:?}", &new_columns); debug!("Updating sort key from None to {:?}", &new_columns);
partitions_catalog partitions_catalog
.update_sort_key(partition_id, &new_columns) .cas_sort_key(partition_id, None, &new_columns)
.await .await
.unwrap(); .unwrap();
} }

View File

@ -495,7 +495,7 @@ mod tests {
column_order: vec![c1.column.id, c2.column.id], column_order: vec![c1.column.id, c2.column.id],
} }
); );
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 3); assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 4);
// subsets and the full key don't expire // subsets and the full key don't expire
for should_cover in [ for should_cover in [
@ -511,7 +511,7 @@ mod tests {
sort_key.as_ref().unwrap(), sort_key.as_ref().unwrap(),
sort_key_2.as_ref().unwrap() sort_key_2.as_ref().unwrap()
)); ));
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 3); assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 4);
} }
// unknown columns expire // unknown columns expire
@ -529,7 +529,7 @@ mod tests {
sort_key_2.as_ref().unwrap() sort_key_2.as_ref().unwrap()
)); ));
assert_eq!(sort_key, sort_key_2); assert_eq!(sort_key, sort_key_2);
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 4); assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 5);
} }
fn schema() -> Arc<Schema> { fn schema() -> Arc<Schema> {