diff --git a/compactor_test_utils/src/lib.rs b/compactor_test_utils/src/lib.rs
index 4e954bb24e..d27b814cb4 100644
--- a/compactor_test_utils/src/lib.rs
+++ b/compactor_test_utils/src/lib.rs
@@ -42,7 +42,7 @@ use compactor::{
     PartitionInfo,
 };
 use compactor_scheduler::SchedulerConfig;
-use data_types::{ColumnType, CompactionLevel, ParquetFile, TableId};
+use data_types::{ColumnType, CompactionLevel, ParquetFile, SortedColumnSet, TableId};
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion_util::config::register_iox_object_store;
 use futures::TryStreamExt;
@@ -98,17 +98,21 @@ impl TestSetupBuilder {
         let ns = catalog.create_namespace_1hr_retention("ns").await;
         let table = ns.create_table("table").await;
         table.create_column("field_int", ColumnType::I64).await;
-        table.create_column("tag1", ColumnType::Tag).await;
-        table.create_column("tag2", ColumnType::Tag).await;
-        table.create_column("tag3", ColumnType::Tag).await;
-        table.create_column("time", ColumnType::Time).await;
+        let tag1 = table.create_column("tag1", ColumnType::Tag).await;
+        let tag2 = table.create_column("tag2", ColumnType::Tag).await;
+        let tag3 = table.create_column("tag3", ColumnType::Tag).await;
+        let col_time = table.create_column("time", ColumnType::Time).await;

         let partition = table.create_partition("2022-07-13").await;

         // The sort key comes from the catalog and should be the union of all tags the
         // ingester has seen
         let sort_key = SortKey::from_columns(["tag1", "tag2", "tag3", "time"]);
-        let partition = partition.update_sort_key(sort_key.clone()).await;
+        let sort_key_col_ids =
+            SortedColumnSet::from([tag1.id(), tag2.id(), tag3.id(), col_time.id()]);
+        let partition = partition
+            .update_sort_key(sort_key.clone(), &sort_key_col_ids)
+            .await;

         // Ensure the input scenario conforms to the expected invariants.
         let invariant_check = Arc::new(CatalogInvariants {
diff --git a/data_types/src/columns.rs b/data_types/src/columns.rs
index 4e63cf79fa..96fd859eb7 100644
--- a/data_types/src/columns.rs
+++ b/data_types/src/columns.rs
@@ -90,6 +90,17 @@ impl ColumnsByName {
         self.0.values().map(|c| c.id)
     }

+    /// Return the column ids of the given column names.
+    /// Panics if any of the names are not found.
+    pub fn ids_for_names(&self, names: &[&str]) -> SortedColumnSet {
+        SortedColumnSet::from(names.iter().map(|name| {
+            self.get(name)
+                .unwrap_or_else(|| panic!("column name not found: {}", name))
+                .id
+                .get()
+        }))
+    }
+
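For orientation, this is how the new helper is expected to behave, per the unit tests added further down. A minimal sketch, assuming the `data_types` constructors shown in this diff (note that `ColumnsByName(columns)` tuple construction may only be available inside the crate, as in those tests):

use std::collections::BTreeMap;

use data_types::{ColumnId, ColumnSchema, ColumnType, ColumnsByName, SortedColumnSet};

fn ids_for_names_demo() {
    let mut columns = BTreeMap::new();
    columns.insert(
        "tag1".to_string(),
        ColumnSchema {
            id: ColumnId::new(4),
            column_type: ColumnType::Tag,
        },
    );
    columns.insert(
        "time".to_string(),
        ColumnSchema {
            id: ColumnId::new(3),
            column_type: ColumnType::Time,
        },
    );
    let columns = ColumnsByName(columns);

    // Ids come back in the order the names were given, not in numeric order.
    assert_eq!(
        columns.ids_for_names(&["time", "tag1"]),
        SortedColumnSet::from([3, 4])
    );
}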
     /// Get a column by its name.
     pub fn get(&self, name: &str) -> Option<&ColumnSchema> {
         self.0.get(name)
@@ -364,6 +375,11 @@ impl ColumnSet {
     pub fn size(&self) -> usize {
         std::mem::size_of_val(self) + (std::mem::size_of::<ColumnId>() * self.0.capacity())
     }
+
+    /// Returns true if the set is empty.
+    pub fn is_empty(&self) -> bool {
+        self.0.is_empty()
+    }
 }

 impl From<ColumnSet> for Vec<ColumnId> {
@@ -531,4 +547,61 @@ mod tests {

         ColumnSchema::try_from(&proto).expect_err("should succeed");
     }
+
+    #[test]
+    fn test_columns_by_names_exist() {
+        let columns = build_columns_by_names();
+
+        let ids = columns.ids_for_names(&["foo", "bar"]);
+        assert_eq!(ids, SortedColumnSet::from([1, 2]));
+    }
+
+    #[test]
+    fn test_columns_by_names_exist_different_order() {
+        let columns = build_columns_by_names();
+
+        let ids = columns.ids_for_names(&["bar", "foo"]);
+        assert_eq!(ids, SortedColumnSet::from([2, 1]));
+    }
+
+    #[test]
+    #[should_panic = "column name not found: baz"]
+    fn test_columns_by_names_not_exist() {
+        let columns = build_columns_by_names();
+        columns.ids_for_names(&["foo", "baz"]);
+    }
+
+    fn build_columns_by_names() -> ColumnsByName {
+        let mut columns: BTreeMap<String, ColumnSchema> = BTreeMap::new();
+        columns.insert(
+            "foo".to_string(),
+            ColumnSchema {
+                id: ColumnId::new(1),
+                column_type: ColumnType::I64,
+            },
+        );
+        columns.insert(
+            "bar".to_string(),
+            ColumnSchema {
+                id: ColumnId::new(2),
+                column_type: ColumnType::I64,
+            },
+        );
+        columns.insert(
+            "time".to_string(),
+            ColumnSchema {
+                id: ColumnId::new(3),
+                column_type: ColumnType::Time,
+            },
+        );
+        columns.insert(
+            "tag1".to_string(),
+            ColumnSchema {
+                id: ColumnId::new(4),
+                column_type: ColumnType::Tag,
+            },
+        );
+
+        ColumnsByName(columns)
+    }
 }
diff --git a/data_types/src/partition.rs b/data_types/src/partition.rs
index 884da1b9ca..e924624cd5 100644
--- a/data_types/src/partition.rs
+++ b/data_types/src/partition.rs
@@ -467,6 +467,11 @@ impl Partition {
         Some(SortKey::from_columns(self.sort_key.iter().map(|s| &**s)))
     }
+
+    /// The sort_key_ids, if present.
+    pub fn sort_key_ids(&self) -> Option<&SortedColumnSet> {
+        self.sort_key_ids.as_ref()
+    }
 }

 #[cfg(test)]
diff --git a/generated_types/protos/influxdata/iox/catalog/v1/service.proto b/generated_types/protos/influxdata/iox/catalog/v1/service.proto
index 3e370874dc..94a6c84446 100644
--- a/generated_types/protos/influxdata/iox/catalog/v1/service.proto
+++ b/generated_types/protos/influxdata/iox/catalog/v1/service.proto
@@ -50,6 +50,14 @@ message Partition {
   repeated string array_sort_key = 6;

   PartitionIdentifier identifier = 8;
+
+  // The sort key ids (`sort_key_ids`) for data in the parquet files of this
+  // partition: an array of the column ids of the sort key columns.
+  optional SortKeyIds sort_key_ids = 9;
+}
+
+message SortKeyIds {
+  repeated int64 array_sort_key_ids = 1;
 }
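A rough sketch of how a consumer might map the new optional field back into the domain type, mirroring the importer change below (the helper itself is hypothetical; `SortedColumnSet::from(Vec<i64>)` and the prost-generated field shapes are taken from this diff):

use data_types::SortedColumnSet;
use generated_types::influxdata::iox::catalog::v1 as proto;

// Hypothetical helper: an absent field and an empty list both decode to an empty set here.
fn decode_sort_key_ids(p: &proto::Partition) -> SortedColumnSet {
    let raw = p
        .sort_key_ids
        .as_ref()
        .map(|ids| ids.array_sort_key_ids.clone())
        .unwrap_or_default();
    SortedColumnSet::from(raw)
}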
 message GetPartitionsByTableIdRequest {
diff --git a/import_export/src/file/import.rs b/import_export/src/file/import.rs
index ed67dbcd62..09d66b76d3 100644
--- a/import_export/src/file/import.rs
+++ b/import_export/src/file/import.rs
@@ -6,8 +6,9 @@ use data_types::{
     partition_template::{
         NamespacePartitionTemplateOverride, TablePartitionTemplateOverride, PARTITION_BY_DAY_PROTO,
     },
-    ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceName, NamespaceNameError,
-    ParquetFileParams, Partition, Statistics, Table, TableId, Timestamp,
+    ColumnSet, ColumnType, ColumnsByName, CompactionLevel, Namespace, NamespaceName,
+    NamespaceNameError, ParquetFileParams, Partition, PartitionKey, SortedColumnSet, Statistics,
+    Table, TableId, Timestamp,
 };
 use generated_types::influxdata::iox::catalog::v1 as proto;
 // ParquetFile as ProtoParquetFile, Partition as ProtoPartition,
@@ -369,8 +370,10 @@ impl RemoteImporter {
         let table_id = table.id;
         debug!(%table_id, "Inserting catalog records into table");

-        let partition = self
-            .partition_for_parquet_file(repos.as_mut(), &table, &iox_metadata)
+        // Create a new partition
+        let partition_key = iox_metadata.partition_key.clone();
+        let mut partition = self
+            .create_partition(repos.as_mut(), &table, partition_key)
             .await?;

         // Note that for some reason, the object_store_id that is
@@ -416,6 +419,11 @@
             }
         };

+        // Update partition sort key
+        let partition = self
+            .update_partition(&mut partition, repos.as_mut(), &table, &iox_metadata)
+            .await?;
+
         // Now copy the parquet files into the object store
         let transition_partition_id = partition.transition_partition_id();
@@ -478,25 +486,42 @@
         Ok(table)
     }

-    /// Return the catalog [`Partition`] into which the specified parquet
+    /// Create the catalog [`Partition`] into which the specified parquet
+    /// file should be inserted.
+    ///
+    /// The sort_key and sort_key_ids of the partition should be empty when it is first created
+    /// because there are no columns in any parquet files to use for sorting yet.
+    /// The sort_key and sort_key_ids will be updated after the parquet files are created.
+    async fn create_partition(
+        &self,
+        repos: &mut dyn RepoCollection,
+        table: &Table,
+        partition_key: PartitionKey,
+    ) -> Result<Partition> {
+        let partition = repos
+            .partitions()
+            .create_or_get(partition_key, table.id)
+            .await?;
+
+        Ok(partition)
+    }
+
+    /// Update the sort key of the partition into which the specified parquet
     /// file should be inserted.
     ///
     /// First attempts to use any available metadata from the
     /// catalog export, and falls back to what is in the iox
     /// metadata stored in the parquet file, if needed
-    async fn partition_for_parquet_file(
+    async fn update_partition(
         &self,
+        partition: &mut Partition,
         repos: &mut dyn RepoCollection,
         table: &Table,
         iox_metadata: &IoxMetadata,
     ) -> Result<Partition> {
         let partition_key = iox_metadata.partition_key.clone();

-        let partition = repos
-            .partitions()
-            .create_or_get(partition_key.clone(), table.id)
-            .await?;
-
         // Note we use the table_id embedded in the file's metadata
         // from the source catalog to match the exported catalog (which
         // is different than the new table we just created in the
@@ -505,14 +530,24 @@
             .exported_contents
             .partition_metadata(iox_metadata.table_id.get(), partition_key.inner());

-        let new_sort_key: Vec<&str> = if let Some(proto_partition) = proto_partition.as_ref() {
+        let (new_sort_key, new_sort_key_ids) = if let Some(proto_partition) =
+            proto_partition.as_ref()
+        {
             // Use the sort key from the source catalog
             debug!(array_sort_key=?proto_partition.array_sort_key, "Using sort key from catalog export");

-            proto_partition
+            let new_sort_key = proto_partition
                 .array_sort_key
                 .iter()
                 .map(|s| s.as_str())
-                .collect()
+                .collect::<Vec<&str>>();
+
+            let new_sort_key_ids = match &proto_partition.sort_key_ids {
+                Some(sort_key_ids) => sort_key_ids.array_sort_key_ids.clone(),
+                None => vec![],
+            };
+            let new_sort_key_ids = SortedColumnSet::from(new_sort_key_ids);
+
+            (new_sort_key, new_sort_key_ids)
         } else {
             warn!("Could not find sort key in catalog metadata export, falling back to embedded metadata");
             let sort_key = iox_metadata
                 .sort_key
                 .as_ref()
                 .ok_or_else(|| Error::NoSortKey)?;

-            sort_key.to_columns().collect()
+            let new_sort_key = sort_key.to_columns().collect::<Vec<&str>>();
+
+            // fetch table columns
+            let columns = ColumnsByName::new(repos.columns().list_by_table_id(table.id).await?);
+            let new_sort_key_ids = columns.ids_for_names(&new_sort_key);
+
+            (new_sort_key, new_sort_key_ids)
         };

         if !partition.sort_key.is_empty() && partition.sort_key != new_sort_key {
@@ -536,6 +577,7 @@
                 &partition.transition_partition_id(),
                 Some(partition.sort_key.clone()),
                 &new_sort_key,
+                &new_sort_key_ids,
             )
             .await;
diff --git a/ingester/src/buffer_tree/partition.rs b/ingester/src/buffer_tree/partition.rs
index b9eede03c4..d5c435b151 100644
--- a/ingester/src/buffer_tree/partition.rs
+++ b/ingester/src/buffer_tree/partition.rs
@@ -3,8 +3,8 @@ use std::sync::Arc;

 use data_types::{
-    sequence_number_set::SequenceNumberSet, NamespaceId, PartitionKey, SequenceNumber, TableId,
-    TimestampMinMax, TransitionPartitionId,
+    sequence_number_set::SequenceNumberSet, NamespaceId, PartitionKey, SequenceNumber,
+    SortedColumnSet, TableId, TimestampMinMax, TransitionPartitionId,
 };
 use mutable_batch::MutableBatch;
 use observability_deps::tracing::*;
@@ -31,7 +31,7 @@ pub(crate) enum SortKeyState {
     /// The [`SortKey`] has not yet been fetched from the catalog, and will be
     /// lazy loaded (or loaded in the background) by a call to
     /// [`DeferredLoad::get()`].
-    Deferred(Arc<DeferredLoad<Option<SortKey>>>),
+    Deferred(Arc<DeferredLoad<(Option<SortKey>, Option<SortedColumnSet>)>>),
     /// The sort key is known and specified.
     Provided(Option<SortKey>),
 }
@@ -39,7 +39,7 @@ impl SortKeyState {
     pub(crate) async fn get(&self) -> Option<SortKey> {
         match self {
-            Self::Deferred(v) => v.get().await,
+            Self::Deferred(v) => v.get().await.0,
             Self::Provided(v) => v.clone(),
         }
     }
@@ -423,6 +423,7 @@ mod tests {
     use arrow_util::assert_batches_eq;
     use assert_matches::assert_matches;
     use backoff::BackoffConfig;
+    use data_types::SortedColumnSet;
     use datafusion::{
         physical_expr::PhysicalSortExpr,
         physical_plan::{expressions::col, memory::MemoryExec, ExecutionPlan},
@@ -1003,18 +1004,26 @@
             .create_or_get(partition_key.clone(), table_id)
             .await
             .expect("should create");
-        // Test: sort_key_ids from create_or_get
-        assert!(partition.sort_key_ids.is_none());
+        // Test: sort_key_ids from create_or_get is present but empty
+        assert!(partition.sort_key_ids().unwrap().is_empty());

         let updated_partition = catalog
             .repositories()
             .await
             .partitions()
-            .cas_sort_key(&partition.transition_partition_id(), None, &["terrific"])
+            .cas_sort_key(
+                &partition.transition_partition_id(),
+                None,
+                &["terrific"],
+                &SortedColumnSet::from([1]),
+            )
             .await
             .unwrap();

         // Test: sort_key_ids after updating
-        assert!(updated_partition.sort_key_ids.is_none());
+        assert_eq!(
+            updated_partition.sort_key_ids(),
+            Some(&SortedColumnSet::from([1]))
+        );
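To make the new shape concrete: a deferred sort key now resolves to a pair, and `SortKeyState::get()` deliberately drops the ids half. A sketch in the style of the persist tests below (ingester-internal types; the `metric::Registry` parameter is assumed):

use std::{sync::Arc, time::Duration};

use data_types::SortedColumnSet;
use schema::sort::SortKey;

async fn deferred_pair_demo(metrics: &metric::Registry) {
    let state = SortKeyState::Deferred(Arc::new(DeferredLoad::new(
        Duration::from_secs(1),
        async {
            (
                Some(SortKey::from_columns(["time"])),
                Some(SortedColumnSet::from([1])),
            )
        },
        metrics,
    )));

    // Only the key is surfaced; the ids travel alongside for later persist-time use.
    assert_eq!(state.get().await, Some(SortKey::from_columns(["time"])));
}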
         // Read the just-created sort key (None)
         let fetcher = Arc::new(DeferredLoad::new(
diff --git a/ingester/src/buffer_tree/partition/resolver/cache.rs b/ingester/src/buffer_tree/partition/resolver/cache.rs
index d455ce5bb0..59014d14ec 100644
--- a/ingester/src/buffer_tree/partition/resolver/cache.rs
+++ b/ingester/src/buffer_tree/partition/resolver/cache.rs
@@ -216,6 +216,7 @@ mod tests {
     // Harmless in tests - saves a bunch of extra vars.
     #![allow(clippy::await_holding_lock)]

+    use data_types::SortedColumnSet;
     use iox_catalog::mem::MemCatalog;

     use super::*;
@@ -288,7 +289,7 @@
             ARBITRARY_TABLE_ID,
             stored_partition_key.clone(),
             vec!["dos".to_string(), "bananas".to_string()],
-            None,
+            Some(SortedColumnSet::from([1, 2])),
             Default::default(),
         );
diff --git a/ingester/src/buffer_tree/partition/resolver/sort_key.rs b/ingester/src/buffer_tree/partition/resolver/sort_key.rs
index a5fcc8c497..6f5b207e39 100644
--- a/ingester/src/buffer_tree/partition/resolver/sort_key.rs
+++ b/ingester/src/buffer_tree/partition/resolver/sort_key.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;

 use backoff::{Backoff, BackoffConfig};
-use data_types::{PartitionKey, TableId};
+use data_types::{PartitionKey, SortedColumnSet, TableId};
 use iox_catalog::interface::Catalog;
 use schema::sort::SortKey;
@@ -31,19 +31,20 @@ impl SortKeyResolver {
         }
     }

-    /// Fetch the [`SortKey`] from the [`Catalog`] for `partition_id`, retrying
-    /// endlessly when errors occur.
-    pub(crate) async fn fetch(self) -> Option<SortKey> {
+    /// Fetch the [`SortKey`] and its corresponding sort key ids from the [`Catalog`]
+    /// for `partition_id`, retrying endlessly when errors occur.
+    pub(crate) async fn fetch(self) -> (Option<SortKey>, Option<SortedColumnSet>) {
         Backoff::new(&self.backoff_config)
             .retry_all_errors("fetch partition sort key", || async {
                 let mut repos = self.catalog.repositories().await;
-                let s = repos
+                let partition = repos
                     .partitions()
                     .create_or_get(self.partition_key.clone(), self.table_id)
-                    .await?
-                    .sort_key();
+                    .await?;

-                Result::<_, iox_catalog::interface::Error>::Ok(s)
+                let (sort_key, sort_key_ids) = (partition.sort_key(), partition.sort_key_ids);
+
+                Result::<_, iox_catalog::interface::Error>::Ok((sort_key, sort_key_ids))
             })
             .await
             .expect("retry forever")
@@ -54,6 +55,8 @@
 mod tests {
     use std::sync::Arc;

+    use data_types::SortedColumnSet;
+
     use super::*;
     use crate::test_util::populate_catalog;
@@ -78,8 +81,9 @@
             .create_or_get(PARTITION_KEY.into(), table_id)
             .await
             .expect("should create");
-        // Test: sort_key_ids from create_or_get
-        assert!(partition.sort_key_ids.is_none());
+
+        // Test: sort_key_ids from create_or_get is present but empty
+        assert!(partition.sort_key_ids().unwrap().is_empty());

         let fetcher = SortKeyResolver::new(
             PARTITION_KEY.into(),
@@ -97,14 +101,15 @@
                 &partition.transition_partition_id(),
                 None,
                 &["uno", "dos", "bananas"],
+                &SortedColumnSet::from([1, 2, 3]),
             )
             .await
             .expect("should update existing partition key");

-        let fetched = fetcher.fetch().await;
-        assert_eq!(fetched, catalog_state.sort_key());
-
-        // Test: sort_key_ids after updating
-        assert!(catalog_state.sort_key_ids.is_none());
+        // Test: sort_key_ids from cas_sort_key.
+        // Fetch the sort key for the partition from the catalog.
+        let (fetched_sort_key, fetched_sort_key_ids) = fetcher.fetch().await;
+        assert_eq!(fetched_sort_key, catalog_state.sort_key());
+        assert_eq!(fetched_sort_key_ids, catalog_state.sort_key_ids);
     }
 }
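Presumably the resolver backs a deferred entry roughly like this; an illustrative fragment only, since the exact `SortKeyResolver::new` argument order is assumed from the test above:

// Sketch: wiring the resolver into the deferred state.
let resolver = SortKeyResolver::new(
    partition_key.clone(),
    Arc::clone(&catalog),
    table_id,
    backoff_config.clone(),
);
let sort_key = SortKeyState::Deferred(Arc::new(DeferredLoad::new(
    Duration::from_secs(1),
    // Resolves to (Option<SortKey>, Option<SortedColumnSet>).
    async move { resolver.fetch().await },
    &metrics,
)));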
diff --git a/ingester/src/persist/handle.rs b/ingester/src/persist/handle.rs
index 79719f2e3a..b2ba164bef 100644
--- a/ingester/src/persist/handle.rs
+++ b/ingester/src/persist/handle.rs
@@ -418,7 +418,13 @@
         // queries to at most `n_workers`.
         let sort_key = match partition.lock().sort_key() {
-            SortKeyState::Deferred(v) => v.peek().flatten(),
+            SortKeyState::Deferred(v) => {
+                let sort_key = v.peek();
+                match sort_key {
+                    None => None,
+                    Some((sort_key, _sort_key_ids)) => sort_key,
+                }
+            }
             SortKeyState::Provided(v) => v.as_ref().cloned(),
         };
@@ -475,6 +481,7 @@ mod tests {
     use std::{sync::Arc, task::Poll, time::Duration};

     use assert_matches::assert_matches;
+    use data_types::SortedColumnSet;
     use futures::Future;
     use iox_catalog::mem::MemCatalog;
     use object_store::memory::InMemory;
@@ -648,7 +655,7 @@
         // Generate a partition with a resolved, but empty sort key.
         let p = new_partition(SortKeyState::Deferred(Arc::new(DeferredLoad::new(
             Duration::from_secs(1),
-            async { None },
+            async { (None, None) },
             &metrics,
         ))))
         .await;
@@ -736,7 +743,12 @@
         // the data within the partition's buffer.
         let p = new_partition(SortKeyState::Deferred(Arc::new(DeferredLoad::new(
             Duration::from_secs(1),
-            async { Some(SortKey::from_columns(["time", "some-other-column"])) },
+            async {
+                (
+                    Some(SortKey::from_columns(["time", "some-other-column"])),
+                    Some(SortedColumnSet::from([1, 2])),
+                )
+            },
             &metrics,
         ))))
         .await;
@@ -823,7 +835,12 @@
         // the data within the partition's buffer.
         let p = new_partition(SortKeyState::Deferred(Arc::new(DeferredLoad::new(
             Duration::from_secs(1),
-            async { Some(SortKey::from_columns(["time", "good"])) },
+            async {
+                (
+                    Some(SortKey::from_columns(["time", "good"])),
+                    Some(SortedColumnSet::from([1, 2])),
+                )
+            },
             &metrics,
         ))))
         .await;
@@ -904,7 +921,12 @@
         // Generate a partition
         let p = new_partition(SortKeyState::Deferred(Arc::new(DeferredLoad::new(
             Duration::from_secs(1),
-            async { Some(SortKey::from_columns(["time", "good"])) },
+            async {
+                (
+                    Some(SortKey::from_columns(["time", "good"])),
+                    Some(SortedColumnSet::from([1, 2])),
+                )
+            },
             &metrics,
         ))))
         .await;
@@ -916,7 +938,12 @@
         // Generate a second partition
         let p = new_partition(SortKeyState::Deferred(Arc::new(DeferredLoad::new(
             Duration::from_secs(1),
-            async { Some(SortKey::from_columns(["time", "good"])) },
+            async {
+                (
+                    Some(SortKey::from_columns(["time", "good"])),
+                    Some(SortedColumnSet::from([1, 2])),
+                )
+            },
             &metrics,
         ))))
         .await;
diff --git a/ingester/src/persist/mod.rs b/ingester/src/persist/mod.rs
index 54ad438794..c30378e29a 100644
--- a/ingester/src/persist/mod.rs
+++ b/ingester/src/persist/mod.rs
@@ -16,7 +16,7 @@ mod tests {
     use std::{sync::Arc, time::Duration};

     use assert_matches::assert_matches;
-    use data_types::{CompactionLevel, ParquetFile};
+    use data_types::{CompactionLevel, ParquetFile, SortedColumnSet};
     use futures::TryStreamExt;
     use iox_catalog::{
         interface::{get_schema_by_id, Catalog, SoftDeletedRows},
@@ -345,12 +345,18 @@
             .cas_sort_key(
                 &partition_id,
                 None,
-                &["bananas", "are", "good", "for", "you"],
+                // must use column names that exist in the partition data
+                &["region"],
+                // column id of region
+                &SortedColumnSet::from([2]),
             )
             .await
             .expect("failed to set catalog sort key");

         // Test: sort_key_ids after updating
-        assert!(updated_partition.sort_key_ids.is_none());
+        assert_eq!(
+            updated_partition.sort_key_ids(),
+            Some(&SortedColumnSet::from([2]))
+        );

         // Enqueue the persist job
         let notify = handle.enqueue(Arc::clone(&partition), data).await;
@@ -382,10 +388,11 @@
         // mark_persisted() was called.
         assert_eq!(partition.lock().completed_persistence_count(), 1);

-        // Assert the sort key was also updated, adding the new columns to the
+        // Assert the sort key was also updated, adding the new columns (time) to the
         // end of the concurrently updated catalog sort key.
         assert_matches!(partition.lock().sort_key(), SortKeyState::Provided(Some(p)) => {
-            assert_eq!(p.to_columns().collect::<Vec<_>>(), &["bananas", "are", "good", "for", "you", "region", "time"]);
+            // Before, there was only ["region"] (manual sort key update above). Now ["region", "time"].
+            assert_eq!(p.to_columns().collect::<Vec<_>>(), &["region", "time"]);
         });

         // Ensure a file was made visible in the catalog
diff --git a/ingester/src/persist/worker.rs b/ingester/src/persist/worker.rs
index e064d4ee1a..82fbf1e4f5 100644
--- a/ingester/src/persist/worker.rs
+++ b/ingester/src/persist/worker.rs
@@ -2,7 +2,7 @@ use std::{ops::ControlFlow, sync::Arc};

 use async_channel::RecvError;
 use backoff::Backoff;
-use data_types::{CompactionLevel, ParquetFileParams};
+use data_types::{ColumnsByName, CompactionLevel, ParquetFileParams};
 use iox_catalog::interface::{get_table_columns_by_id, CasFailure, Catalog};
 use iox_query::exec::Executor;
 use iox_time::{SystemProvider, TimeProvider};
@@ -174,8 +174,15 @@ async fn compact_and_upload<O>(
 where
     O: Send + Sync,
 {
-    let compacted = compact(ctx, worker_state).await;
-    let (sort_key_update, parquet_table_data) = upload(ctx, worker_state, compacted).await;
+    // Load the sort key.
+    let sort_key = ctx.sort_key().get().await;
+    // Fetch the column map.
+    // THIS MUST BE DONE AFTER THE SORT KEY IS LOADED.
+    let (sort_key, columns) = fetch_column_map(ctx, worker_state, sort_key).await?;
+
+    let compacted = compact(ctx, worker_state, sort_key).await;
+    let (sort_key_update, parquet_table_data) =
+        upload(ctx, worker_state, compacted, &columns).await;

     if let Some(update) = sort_key_update {
         update_catalog_sort_key(
@@ -183,6 +190,7 @@
             worker_state,
             update,
             parquet_table_data.object_store_id,
+            &columns,
         )
         .await?
     }
@@ -192,12 +200,14 @@

 /// Compact the data in `ctx` sorted by the sort key returned from
 /// [`Context::sort_key()`].
-async fn compact<O>(ctx: &Context, worker_state: &SharedWorkerState<O>) -> CompactedStream
+async fn compact<O>(
+    ctx: &Context,
+    worker_state: &SharedWorkerState<O>,
+    sort_key: Option<SortKey>,
+) -> CompactedStream
 where
     O: Send + Sync,
 {
-    let sort_key = ctx.sort_key().get().await;
-
     debug!(
         namespace_id = %ctx.namespace_id(),
         namespace_name = %ctx.namespace_name(),
@@ -231,6 +241,7 @@ async fn upload<O>(
     ctx: &Context,
     worker_state: &SharedWorkerState<O>,
     compacted: CompactedStream,
+    columns: &ColumnsByName,
 ) -> (Option<SortKey>, ParquetFileParams)
 where
     O: Send + Sync,
@@ -294,15 +305,6 @@
         "partition parquet uploaded"
     );

-    // Read the table's columns from the catalog to get a map of column name -> column IDs.
-    let columns = Backoff::new(&Default::default())
-        .retry_all_errors("get table schema", || async {
-            let mut repos = worker_state.catalog.repositories().await;
-            get_table_columns_by_id(ctx.table_id(), repos.as_mut()).await
-        })
-        .await
-        .expect("retry forever");
-
     // Build the data that must be inserted into the parquet_files catalog
     // table in order to make the file visible to queriers.
     let parquet_table_data =
@@ -321,6 +323,45 @@

     (catalog_sort_key_update, parquet_table_data)
 }

+/// Fetch the table's column map from the catalog and verify that it contains
+/// every column in the sort key.
+async fn fetch_column_map<O>(
+    ctx: &Context,
+    worker_state: &SharedWorkerState<O>,
+    // NOTE: THE CALLER MUST LOAD THE SORT KEY BEFORE CALLING THIS FUNCTION, EVEN IF IT IS NONE.
+    // THIS IS REQUIRED TO GUARANTEE THE RETURNED COLUMN MAP CONTAINS ALL COLUMNS IN THE SORT KEY.
+    // Taking the sort key as a parameter ensures the caller has already loaded it; the same
+    // sort key is returned to the caller unchanged.
+    sort_key: Option<SortKey>,
+) -> Result<(Option<SortKey>, ColumnsByName), PersistError>
+where
+    O: Send + Sync,
+{
+    // Read the table's columns from the catalog to get a map of column name -> column IDs.
+    let column_map = Backoff::new(&Default::default())
+        .retry_all_errors("get table schema", || async {
+            let mut repos = worker_state.catalog.repositories().await;
+            get_table_columns_by_id(ctx.table_id(), repos.as_mut()).await
+        })
+        .await
+        .expect("retry forever");
+
+    // Verify that the sort key columns are all present in the column map.
+    if let Some(sort_key) = &sort_key {
+        for sort_key_column in sort_key.to_columns() {
+            if !column_map.contains_column_name(sort_key_column) {
+                panic!(
+                    "sort key column {} of partition id {} is not in the column map {:?}",
+                    sort_key_column,
+                    ctx.partition_id(),
+                    column_map
+                );
+            }
+        }
+    }
+
+    Ok((sort_key, column_map))
+}
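The parameter threading encodes an ordering contract: the returned column map is only guaranteed to cover the sort key if the key is loaded first, presumably because columns are added to the table schema before they can ever appear in a sort key. The intended call pattern, as wired in `compact_and_upload` above:

// 1. Load the sort key first...
let sort_key = ctx.sort_key().get().await;
// 2. ...then fetch the column map, which must now contain every sort key column.
let (sort_key, columns) = fetch_column_map(ctx, worker_state, sort_key).await?;
// Reversing the two steps could race a concurrent sort key update and trip the panic above.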
Timestamp, TransitionPartitionId, + SortedColumnSet, Table, TableId, TableSchema, Timestamp, TransitionPartitionId, }; use iox_time::TimeProvider; use snafu::{OptionExt, Snafu}; @@ -408,11 +408,15 @@ pub trait PartitionRepo: Send + Sync { /// Implementations are allowed to spuriously return /// [`CasFailure::ValueMismatch`] for performance reasons in the presence of /// concurrent writers. + // TODO: After the sort_key_ids field is converetd into NOT NULL, the implementation of this function + // must be changed to compare old_sort_key_ids with the existing sort_key_ids instead of + // comparing old_sort_key with existing sort_key async fn cas_sort_key( &mut self, partition_id: &TransitionPartitionId, old_sort_key: Option>, new_sort_key: &[&str], + new_sort_key_ids: &SortedColumnSet, ) -> Result>>; /// Record an instance of a partition being selected for compaction but compaction was not @@ -713,6 +717,17 @@ pub async fn list_schemas( Ok(iter) } +/// panic if sort_key and sort_key_ids have different lengths +pub(crate) fn verify_sort_key_length(sort_key: &[&str], sort_key_ids: &SortedColumnSet) { + assert_eq!( + sort_key.len(), + sort_key_ids.len(), + "sort_key {:?} and sort_key_ids {:?} are not the same length", + sort_key, + sort_key_ids + ); +} + #[cfg(test)] pub(crate) mod test_helpers { use crate::{ @@ -1498,7 +1513,7 @@ pub(crate) mod test_helpers { .await .expect("failed to create partition"); // Test: sort_key_ids from create_or_get - assert!(partition.sort_key_ids.is_none()); + assert!(partition.sort_key_ids().unwrap().is_empty()); created.insert(partition.id, partition.clone()); // partition to use let partition_bar = repos @@ -1572,7 +1587,7 @@ pub(crate) mod test_helpers { batch.sort_by_key(|p| p.id); assert_eq!(created_sorted, batch); // Test: sort_key_ids from get_by_id_batch - assert!(batch.iter().all(|p| p.sort_key_ids.is_none())); + assert!(batch.iter().all(|p| p.sort_key_ids().unwrap().is_empty())); let mut batch = repos .partitions() .get_by_hash_id_batch( @@ -1586,7 +1601,7 @@ pub(crate) mod test_helpers { .unwrap(); batch.sort_by_key(|p| p.id); // Test: sort_key_ids from get_by_hash_id_batch - assert!(batch.iter().all(|p| p.sort_key_ids.is_none())); + assert!(batch.iter().all(|p| p.sort_key_ids().unwrap().is_empty())); assert_eq!(created_sorted, batch); let listed = repos @@ -1598,7 +1613,9 @@ pub(crate) mod test_helpers { .map(|v| (v.id, v)) .collect::>(); // Test: sort_key_ids from list_by_table_id - assert!(listed.values().all(|p| p.sort_key_ids.is_none())); + assert!(listed + .values() + .all(|p| p.sort_key_ids().unwrap().is_empty())); assert_eq!(created, listed); @@ -1631,11 +1648,17 @@ pub(crate) mod test_helpers { &to_skip_partition.transition_partition_id(), None, &["tag2", "tag1", "time"], + &SortedColumnSet::from([2, 1, 3]), ) .await .unwrap(); - // Test: sort_key_ids after updating from cas_sort_key - assert!(updated_partition.sort_key_ids.is_none()); + + // verify sort key and sort key ids are updated + assert_eq!(updated_partition.sort_key, &["tag2", "tag1", "time"]); + assert_eq!( + updated_partition.sort_key_ids, + Some(SortedColumnSet::from([2, 1, 3])) + ); // test sort key CAS with an incorrect value let err = repos @@ -1644,6 +1667,7 @@ pub(crate) mod test_helpers { &to_skip_partition.transition_partition_id(), Some(["bananas".to_string()].to_vec()), &["tag2", "tag1", "tag3 , with comma", "time"], + &SortedColumnSet::from([1, 2, 3, 4]), ) .await .expect_err("CAS with incorrect value should fail"); @@ -1658,12 +1682,15 @@ pub(crate) mod test_helpers { 
 #[cfg(test)]
 pub(crate) mod test_helpers {
     use crate::{
@@ -1498,7 +1513,7 @@
             .await
             .expect("failed to create partition");
         // Test: sort_key_ids from create_or_get
-        assert!(partition.sort_key_ids.is_none());
+        assert!(partition.sort_key_ids().unwrap().is_empty());
         created.insert(partition.id, partition.clone());

         // partition to use
         let partition_bar = repos
@@ -1572,7 +1587,7 @@
         batch.sort_by_key(|p| p.id);
         assert_eq!(created_sorted, batch);
         // Test: sort_key_ids from get_by_id_batch
-        assert!(batch.iter().all(|p| p.sort_key_ids.is_none()));
+        assert!(batch.iter().all(|p| p.sort_key_ids().unwrap().is_empty()));
         let mut batch = repos
             .partitions()
             .get_by_hash_id_batch(
@@ -1586,7 +1601,7 @@
             .unwrap();
         batch.sort_by_key(|p| p.id);
         // Test: sort_key_ids from get_by_hash_id_batch
-        assert!(batch.iter().all(|p| p.sort_key_ids.is_none()));
+        assert!(batch.iter().all(|p| p.sort_key_ids().unwrap().is_empty()));
         assert_eq!(created_sorted, batch);

         let listed = repos
             .partitions()
             .list_by_table_id(table.id)
             .await
             .expect("failed to list partitions")
             .into_iter()
             .map(|v| (v.id, v))
             .collect::<BTreeMap<_, _>>();
         // Test: sort_key_ids from list_by_table_id
-        assert!(listed.values().all(|p| p.sort_key_ids.is_none()));
+        assert!(listed
+            .values()
+            .all(|p| p.sort_key_ids().unwrap().is_empty()));

         assert_eq!(created, listed);
@@ -1631,11 +1648,17 @@
                 &to_skip_partition.transition_partition_id(),
                 None,
                 &["tag2", "tag1", "time"],
+                &SortedColumnSet::from([2, 1, 3]),
             )
             .await
             .unwrap();
-        // Test: sort_key_ids after updating from cas_sort_key
-        assert!(updated_partition.sort_key_ids.is_none());
+
+        // verify sort key and sort key ids are updated
+        assert_eq!(updated_partition.sort_key, &["tag2", "tag1", "time"]);
+        assert_eq!(
+            updated_partition.sort_key_ids,
+            Some(SortedColumnSet::from([2, 1, 3]))
+        );

         // test sort key CAS with an incorrect value
         let err = repos
@@ -1644,6 +1667,7 @@
                 &to_skip_partition.transition_partition_id(),
                 Some(["bananas".to_string()].to_vec()),
                 &["tag2", "tag1", "tag3 , with comma", "time"],
+                &SortedColumnSet::from([1, 2, 3, 4]),
             )
             .await
             .expect_err("CAS with incorrect value should fail");
@@ -1658,12 +1682,15 @@
             .await
             .unwrap()
             .unwrap();
+        // still has the old sort key
         assert_eq!(
             updated_other_partition.sort_key,
             vec!["tag2", "tag1", "time"]
         );
-        // Test: sort_key_ids from get_by_id
-        assert!(updated_other_partition.sort_key_ids.is_none());
+        assert_eq!(
+            updated_other_partition.sort_key_ids,
+            Some(SortedColumnSet::from([2, 1, 3]))
+        );

         let updated_other_partition = repos
             .partitions()
@@ -1676,7 +1703,10 @@
             vec!["tag2", "tag1", "time"]
         );
         // Test: sort_key_ids from get_by_hash_id
-        assert!(updated_other_partition.sort_key_ids.is_none());
+        assert_eq!(
+            updated_other_partition.sort_key_ids,
+            Some(SortedColumnSet::from([2, 1, 3]))
+        );

         // test sort key CAS with no value
         let err = repos
@@ -1685,6 +1715,7 @@
                 &to_skip_partition.transition_partition_id(),
                 None,
                 &["tag2", "tag1", "tag3 , with comma", "time"],
+                &SortedColumnSet::from([1, 2, 3, 4]),
             )
             .await
             .expect_err("CAS with incorrect value should fail");
@@ -1699,6 +1730,7 @@
                 &to_skip_partition.transition_partition_id(),
                 Some(["bananas".to_string()].to_vec()),
                 &["tag2", "tag1", "tag3 , with comma", "time"],
+                &SortedColumnSet::from([1, 2, 3, 4]),
             )
             .await
             .expect_err("CAS with incorrect value should fail");
@@ -1718,11 +1750,18 @@
                     .collect(),
             ),
             &["tag2", "tag1", "tag3 , with comma", "time"],
+            &SortedColumnSet::from([2, 1, 4, 3]),
         )
         .await
         .unwrap();
-        // Test: sort_key_ids afer updating from cas_sort_key
-        assert!(updated_partition.sort_key_ids.is_none());
+        assert_eq!(
+            updated_partition.sort_key,
+            vec!["tag2", "tag1", "tag3 , with comma", "time"]
+        );
+        assert_eq!(
+            updated_partition.sort_key_ids,
+            Some(SortedColumnSet::from([2, 1, 4, 3]))
+        );

         // test getting the new sort key
         let updated_partition = repos
@@ -1735,8 +1774,10 @@
             updated_partition.sort_key,
             vec!["tag2", "tag1", "tag3 , with comma", "time"]
         );
-        // Test: sort_key_ids from get_by_id after after updating
-        assert!(updated_partition.sort_key_ids.is_none());
+        assert_eq!(
+            updated_partition.sort_key_ids,
+            Some(SortedColumnSet::from([2, 1, 4, 3]))
+        );

         let updated_partition = repos
             .partitions()
@@ -1748,8 +1789,10 @@
             updated_partition.sort_key,
             vec!["tag2", "tag1", "tag3 , with comma", "time"]
         );
-        // sort_key_ids gotten back from the udpate is still null
-        assert!(updated_partition.sort_key_ids.is_none());
+        assert_eq!(
+            updated_partition.sort_key_ids,
+            Some(SortedColumnSet::from([2, 1, 4, 3]))
+        );

         // The compactor can log why compaction was skipped
         let skipped_compactions = repos.partitions().list_skipped_compactions().await.unwrap();
@@ -1931,8 +1974,32 @@
             .await
             .expect("should list most recent");
         assert_eq!(recent.len(), 4);
+
+        // Test: sort_key_ids from most_recent_n.
+        // Only the second partition has values; the other three are empty.
+        let empty_vec_string: Vec<String> = vec![];
+        assert_eq!(recent[0].sort_key, empty_vec_string);
+        assert_eq!(recent[0].sort_key_ids, Some(SortedColumnSet::from(vec![])));
+
+        assert_eq!(
+            recent[1].sort_key,
+            vec![
+                "tag2".to_string(),
+                "tag1".to_string(),
+                "tag3 , with comma".to_string(),
+                "time".to_string()
+            ]
+        );
+        assert_eq!(
+            recent[1].sort_key_ids,
+            Some(SortedColumnSet::from(vec![2, 1, 4, 3]))
+        );
+
+        assert_eq!(recent[2].sort_key, empty_vec_string);
+        assert_eq!(recent[2].sort_key_ids, Some(SortedColumnSet::from(vec![])));
+
+        assert_eq!(recent[3].sort_key, empty_vec_string);
+        assert_eq!(recent[3].sort_key_ids, Some(SortedColumnSet::from(vec![])));
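These assertions lean on a deliberate tri-state in `Option<SortedColumnSet>`: a missing value and an empty value mean different things during the backfill. A small sketch of the distinction (match arms are illustrative; the NOT NULL end state is referenced by the TODO above):

use data_types::SortedColumnSet;

fn describe(sort_key_ids: &Option<SortedColumnSet>) -> &'static str {
    match sort_key_ids {
        None => "pre-migration row: the column has never been written",
        Some(ids) if ids.is_empty() => "new partition: no sort key assigned yet",
        Some(_) => "sort key set: ids parallel the sort_key names",
    }
}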

         let recent = repos
             .partitions()
diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs
index 330dcb977b..c21be6637c 100644
--- a/iox_catalog/src/mem.rs
+++ b/iox_catalog/src/mem.rs
@@ -1,7 +1,7 @@
 //! This module implements an in-memory implementation of the iox_catalog interface. It can be
 //! used for testing or for an IOx designed to run without catalog persistence.

-use crate::interface::MAX_PARQUET_FILES_SELECTED_ONCE_FOR_DELETE;
+use crate::interface::{verify_sort_key_length, MAX_PARQUET_FILES_SELECTED_ONCE_FOR_DELETE};
 use crate::{
     interface::{
         CasFailure, Catalog, ColumnRepo, ColumnTypeMismatchSnafu, Error, NamespaceRepo,
@@ -12,6 +12,7 @@
     DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES,
 };
 use async_trait::async_trait;
+use data_types::SortedColumnSet;
 use data_types::{
     partition_template::{
         NamespacePartitionTemplateOverride, TablePartitionTemplateOverride, TemplatePart,
@@ -566,7 +567,7 @@ impl PartitionRepo for MemTxn {
                 table_id,
                 key,
                 vec![],
-                None,
+                Some(SortedColumnSet::new(vec![])),
                 None,
             );
             stage.partitions.push(p);
@@ -662,7 +663,10 @@ impl PartitionRepo for MemTxn {
         partition_id: &TransitionPartitionId,
         old_sort_key: Option<Vec<String>>,
         new_sort_key: &[&str],
+        new_sort_key_ids: &SortedColumnSet,
     ) -> Result<Partition, CasFailure<Vec<String>>> {
+        verify_sort_key_length(new_sort_key, new_sort_key_ids);
+
         let stage = self.stage();

         let old_sort_key = old_sort_key.unwrap_or_default();
@@ -674,6 +678,7 @@
         }) {
             Some(p) if p.sort_key == old_sort_key => {
                 p.sort_key = new_sort_key.iter().map(|s| s.to_string()).collect();
+                p.sort_key_ids = Some(new_sort_key_ids.clone());
                 Ok(p.clone())
             }
             Some(p) => return Err(CasFailure::ValueMismatch(p.sort_key.clone())),
diff --git a/iox_catalog/src/metrics.rs b/iox_catalog/src/metrics.rs
index e1b87fb167..de76ea952f 100644
--- a/iox_catalog/src/metrics.rs
+++ b/iox_catalog/src/metrics.rs
@@ -9,8 +9,8 @@ use data_types::{
     partition_template::{NamespacePartitionTemplateOverride, TablePartitionTemplateOverride},
     Column, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName,
     NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId, ParquetFileParams,
-    Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction, Table, TableId,
-    Timestamp, TransitionPartitionId,
+    Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction, SortedColumnSet,
+    Table, TableId, Timestamp, TransitionPartitionId,
 };
 use iox_time::{SystemProvider, TimeProvider};
 use metric::{DurationHistogram, Metric};
@@ -176,7 +176,7 @@ decorate!(
         "partition_get_by_hash_id_batch" = get_by_hash_id_batch(&mut self, partition_hash_ids: &[&PartitionHashId]) -> Result<Vec<Partition>>;
         "partition_list_by_table_id" = list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>>;
         "partition_list_ids" = list_ids(&mut self) -> Result<Vec<PartitionId>>;
-        "partition_update_sort_key" = cas_sort_key(&mut self, partition_id: &TransitionPartitionId, old_sort_key: Option<Vec<String>>, new_sort_key: &[&str]) -> Result<Partition, CasFailure<Vec<String>>>;
+        "partition_update_sort_key" = cas_sort_key(&mut self, partition_id: &TransitionPartitionId, old_sort_key: Option<Vec<String>>, new_sort_key: &[&str], new_sort_key_ids: &SortedColumnSet) -> Result<Partition, CasFailure<Vec<String>>>;
         "partition_record_skipped_compaction" = record_skipped_compaction(&mut self, partition_id: PartitionId, reason: &str, num_files: usize, limit_num_files: usize, limit_num_files_first_in_partition: usize, estimated_bytes: u64, limit_bytes: u64) -> Result<()>;
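A recap of the CAS contract the in-memory implementation above satisfies, as a usage sketch; the `CasFailure::QueryError` variant name is an assumption, since only `ValueMismatch` appears in this diff:

use data_types::SortedColumnSet;
use iox_catalog::interface::CasFailure;

let res = repos
    .partitions()
    .cas_sort_key(
        &partition.transition_partition_id(),
        None,                           // expected current value: no sort key yet
        &["tag1", "time"],              // new sort key, by name
        &SortedColumnSet::from([4, 3]), // the same columns, by id, in the same order
    )
    .await;

match res {
    Ok(updated) => assert_eq!(updated.sort_key_ids, Some(SortedColumnSet::from([4, 3]))),
    Err(CasFailure::ValueMismatch(observed)) => {
        // A concurrent writer won; `observed` holds the sort key now in the catalog.
    }
    Err(CasFailure::QueryError(_)) => { /* retry with backoff */ }
}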
"partition_list_skipped_compactions" = list_skipped_compactions(&mut self) -> Result>; "partition_delete_skipped_compactions" = delete_skipped_compactions(&mut self, partition_id: PartitionId) -> Result>; diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 9a1e8f3b38..1388234c3b 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -1,6 +1,6 @@ //! A Postgres backed implementation of the Catalog -use crate::interface::MAX_PARQUET_FILES_SELECTED_ONCE_FOR_DELETE; +use crate::interface::{verify_sort_key_length, MAX_PARQUET_FILES_SELECTED_ONCE_FOR_DELETE}; use crate::{ interface::{ self, CasFailure, Catalog, ColumnRepo, ColumnTypeMismatchSnafu, Error, NamespaceRepo, @@ -16,6 +16,7 @@ use crate::{ DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, }; use async_trait::async_trait; +use data_types::SortedColumnSet; use data_types::{ partition_template::{ NamespacePartitionTemplateOverride, TablePartitionTemplateOverride, TemplatePart, @@ -1159,9 +1160,9 @@ impl PartitionRepo for PostgresTxn { let v = sqlx::query_as::<_, Partition>( r#" INSERT INTO partition - (partition_key, shard_id, table_id, hash_id, sort_key) + (partition_key, shard_id, table_id, hash_id, sort_key, sort_key_ids) VALUES - ( $1, $2, $3, $4, '{}') + ( $1, $2, $3, $4, '{}', '{}') ON CONFLICT ON CONSTRAINT partition_key_unique DO UPDATE SET partition_key = partition.partition_key RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at; @@ -1301,32 +1302,37 @@ WHERE table_id = $1; partition_id: &TransitionPartitionId, old_sort_key: Option>, new_sort_key: &[&str], + new_sort_key_ids: &SortedColumnSet, ) -> Result>> { + verify_sort_key_length(new_sort_key, new_sort_key_ids); + let old_sort_key = old_sort_key.unwrap_or_default(); // This `match` will go away when all partitions have hash IDs in the database. let query = match partition_id { TransitionPartitionId::Deterministic(hash_id) => sqlx::query_as::<_, Partition>( r#" UPDATE partition -SET sort_key = $1 +SET sort_key = $1, sort_key_ids = $4 WHERE hash_id = $2 AND sort_key = $3 RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at; "#, ) .bind(new_sort_key) // $1 .bind(hash_id) // $2 - .bind(&old_sort_key), // $3 + .bind(&old_sort_key) // $3 + .bind(new_sort_key_ids), // $4 TransitionPartitionId::Deprecated(id) => sqlx::query_as::<_, Partition>( r#" UPDATE partition -SET sort_key = $1 +SET sort_key = $1, sort_key_ids = $4 WHERE id = $2 AND sort_key = $3 RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at; "#, ) .bind(new_sort_key) // $1 .bind(id) // $2 - .bind(&old_sort_key), // $3 + .bind(&old_sort_key) // $3 + .bind(new_sort_key_ids), // $4 }; let res = query.fetch_one(&mut self.inner).await; @@ -1362,6 +1368,7 @@ RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file ?partition_id, ?old_sort_key, ?new_sort_key, + ?new_sort_key_ids, "partition sort key cas successful" ); @@ -2173,7 +2180,7 @@ mod tests { assert_eq!(a.hash_id().unwrap(), &hash_id); // Test: sort_key_ids from partition_create_or_get_idempotent - assert!(a.sort_key_ids.is_none()); + assert!(a.sort_key_ids().unwrap().is_empty()); // Call create_or_get for the same (key, table_id) pair, to ensure the write is idempotent. 
         let b = repos
@@ -2194,8 +2201,9 @@
             .unwrap();
         assert_eq!(table_partitions.len(), 1);
         assert_eq!(table_partitions[0].hash_id().unwrap(), &hash_id);
+
         // Test: sort_key_ids from partition_create_or_get_idempotent
-        assert!(table_partitions[0].sort_key_ids.is_none());
+        assert!(table_partitions[0].sort_key_ids().unwrap().is_empty());
     }

     #[tokio::test]
@@ -2217,9 +2225,9 @@
         sqlx::query(
             r#"
 INSERT INTO partition
-    (partition_key, shard_id, table_id, sort_key)
+    (partition_key, shard_id, table_id, sort_key, sort_key_ids)
 VALUES
-    ( $1, $2, $3, '{}')
+    ( $1, $2, $3, '{}', '{}')
 ON CONFLICT ON CONSTRAINT partition_key_unique
 DO UPDATE SET partition_key = partition.partition_key
 RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at;
@@ -2246,6 +2254,9 @@
         .await
         .expect("idempotent write should succeed");

+        // Test: sort_key_ids from a fresh insert is present but empty
+        assert!(inserted_again.sort_key_ids().unwrap().is_empty());
+
         assert_eq!(partition, &inserted_again);

         // Create a Parquet file record in this partition to ensure we don't break new data
diff --git a/iox_catalog/src/sqlite.rs b/iox_catalog/src/sqlite.rs
index ac62933ace..d5b874a577 100644
--- a/iox_catalog/src/sqlite.rs
+++ b/iox_catalog/src/sqlite.rs
@@ -2,9 +2,9 @@

 use crate::{
     interface::{
-        self, CasFailure, Catalog, ColumnRepo, ColumnTypeMismatchSnafu, Error, NamespaceRepo,
-        ParquetFileRepo, PartitionRepo, RepoCollection, Result, SoftDeletedRows, TableRepo,
-        MAX_PARQUET_FILES_SELECTED_ONCE_FOR_RETENTION,
+        self, verify_sort_key_length, CasFailure, Catalog, ColumnRepo, ColumnTypeMismatchSnafu,
+        Error, NamespaceRepo, ParquetFileRepo, PartitionRepo, RepoCollection, Result,
+        SoftDeletedRows, TableRepo, MAX_PARQUET_FILES_SELECTED_ONCE_FOR_RETENTION,
     },
     kafkaless_transition::{
         SHARED_QUERY_POOL, SHARED_QUERY_POOL_ID, SHARED_TOPIC_ID, SHARED_TOPIC_NAME,
@@ -852,9 +852,9 @@ impl PartitionRepo for SqliteTxn {
         let v = sqlx::query_as::<_, PartitionPod>(
             r#"
 INSERT INTO partition
-    (partition_key, shard_id, table_id, hash_id, sort_key)
+    (partition_key, shard_id, table_id, hash_id, sort_key, sort_key_ids)
 VALUES
-    ($1, $2, $3, $4, '[]')
+    ($1, $2, $3, $4, '[]', '[]')
 ON CONFLICT (table_id, partition_key)
 DO UPDATE SET partition_key = partition.partition_key
 RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at;
@@ -1012,33 +1012,39 @@
         partition_id: &TransitionPartitionId,
         old_sort_key: Option<Vec<String>>,
         new_sort_key: &[&str],
+        new_sort_key_ids: &SortedColumnSet,
     ) -> Result<Partition, CasFailure<Vec<String>>> {
+        verify_sort_key_length(new_sort_key, new_sort_key_ids);
+
         let old_sort_key = old_sort_key.unwrap_or_default();
+        let raw_new_sort_key_ids: Vec<_> = new_sort_key_ids.iter().map(|cid| cid.get()).collect();

         // This `match` will go away when all partitions have hash IDs in the database.
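Note the encoding split between the backends: Postgres binds the id set directly into a native integer array column, while SQLite stores a JSON array, hence the flattening above. A sketch of what each backend ultimately persists (types from this diff):

use data_types::SortedColumnSet;

let ids = SortedColumnSet::from([2, 1, 3]);

// Postgres: `.bind(new_sort_key_ids)` binds the set as an array directly.
// SQLite: no array type, so the ids are flattened to raw i64s and wrapped in Json(...):
let raw: Vec<i64> = ids.iter().map(|cid| cid.get()).collect();
assert_eq!(raw, vec![2, 1, 3]);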
         let query = match partition_id {
             TransitionPartitionId::Deterministic(hash_id) => sqlx::query_as::<_, PartitionPod>(
                 r#"
 UPDATE partition
-SET sort_key = $1
+SET sort_key = $1, sort_key_ids = $4
 WHERE hash_id = $2 AND sort_key = $3
 RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at;
         "#,
             )
             .bind(Json(new_sort_key)) // $1
             .bind(hash_id) // $2
-            .bind(Json(&old_sort_key)), // $3
+            .bind(Json(&old_sort_key)) // $3
+            .bind(Json(&raw_new_sort_key_ids)), // $4
             TransitionPartitionId::Deprecated(id) => sqlx::query_as::<_, PartitionPod>(
                 r#"
 UPDATE partition
-SET sort_key = $1
+SET sort_key = $1, sort_key_ids = $4
 WHERE id = $2 AND sort_key = $3
 RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at;
         "#,
             )
             .bind(Json(new_sort_key)) // $1
             .bind(id) // $2
-            .bind(Json(&old_sort_key)), // $3
+            .bind(Json(&old_sort_key)) // $3
+            .bind(Json(&raw_new_sort_key_ids)), // $4
         };

         let res = query.fetch_one(self.inner.get_mut()).await;
@@ -1779,13 +1785,14 @@ mod tests {
             .unwrap();
         assert_eq!(table_partitions.len(), 1);
         assert_eq!(table_partitions[0].hash_id().unwrap(), &hash_id);
+
         // Test: sort_key_ids from partition_create_or_get_idempotent
-        assert!(table_partitions[0].sort_key_ids.is_none());
+        assert!(table_partitions[0].sort_key_ids().unwrap().is_empty());
     }

     #[tokio::test]
     async fn existing_partitions_without_hash_id() {
-        let sqlite = setup_db().await;
+        let sqlite: SqliteCatalog = setup_db().await;
         let pool = sqlite.pool.clone();
         let sqlite: Arc<dyn Catalog> = Arc::new(sqlite);
         let mut repos = sqlite.repositories().await;
@@ -1800,9 +1807,9 @@
         sqlx::query(
             r#"
 INSERT INTO partition
-    (partition_key, shard_id, table_id, sort_key)
+    (partition_key, shard_id, table_id, sort_key, sort_key_ids)
 VALUES
-    ($1, $2, $3, '[]')
+    ($1, $2, $3, '[]', '[]')
 ON CONFLICT (table_id, partition_key)
 DO UPDATE SET partition_key = partition.partition_key
 RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at;
@@ -1828,6 +1835,9 @@
         .await
         .expect("idempotent write should succeed");

+        // Test: sort_key_ids from a fresh insert is present but empty
+        assert!(inserted_again.sort_key_ids().unwrap().is_empty());
+
         assert_eq!(partition, &inserted_again);

         // Create a Parquet file record in this partition to ensure we don't break new data
diff --git a/iox_tests/src/catalog.rs b/iox_tests/src/catalog.rs
index 57490345e3..170f6e45df 100644
--- a/iox_tests/src/catalog.rs
+++ b/iox_tests/src/catalog.rs
@@ -7,8 +7,8 @@ use arrow::{
 use data_types::{
     partition_template::TablePartitionTemplateOverride, Column, ColumnSet, ColumnType,
     ColumnsByName, CompactionLevel, Namespace, NamespaceName, NamespaceSchema, ParquetFile,
-    ParquetFileParams, Partition, PartitionId, Table, TableId, TableSchema, Timestamp,
-    TransitionPartitionId,
+    ParquetFileParams, Partition, PartitionId, SortedColumnSet, Table, TableId, TableSchema,
+    Timestamp, TransitionPartitionId,
 };
 use datafusion::physical_plan::metrics::Count;
 use datafusion_util::{unbounded_memory_pool, MemoryStream};
@@ -318,6 +318,7 @@ impl TestTable {
         self: &Arc<Self>,
         key: &str,
         sort_key: &[&str],
+        sort_key_ids: &[i64],
     ) -> Arc<TestPartition> {
         let mut repos = self.catalog.catalog.repositories().await;
@@ -333,11 +334,10 @@
                 &TransitionPartitionId::Deprecated(partition.id),
                 None,
                 sort_key,
+                &SortedColumnSet::from(sort_key_ids.iter().cloned()),
             )
             .await
             .unwrap();
-        // Test: sort_key_ids after updating
-        assert!(partition.sort_key_ids.is_none());

         Arc::new(TestPartition {
             catalog: Arc::clone(&self.catalog),
@@ -427,6 +427,12 @@ pub struct TestColumn {
     pub column: Column,
 }

+impl TestColumn {
+    /// Return the raw catalog id of this column.
+    pub fn id(&self) -> i64 {
+        self.column.id.get()
+    }
+}
+
 /// A test catalog with specified namespace, table, partition
 #[allow(missing_docs)]
 #[derive(Debug)]
@@ -439,7 +445,11 @@ pub struct TestPartition {

 impl TestPartition {
     /// Update sort key.
-    pub async fn update_sort_key(self: &Arc<Self>, sort_key: SortKey) -> Arc<Self> {
+    pub async fn update_sort_key(
+        self: &Arc<Self>,
+        sort_key: SortKey,
+        sort_key_ids: &SortedColumnSet,
+    ) -> Arc<Self> {
         let old_sort_key = partition_lookup(
             self.catalog.catalog.repositories().await.as_mut(),
             &self.partition.transition_partition_id(),
         )
@@ -459,11 +469,10 @@
                 &self.partition.transition_partition_id(),
                 Some(old_sort_key),
                 &sort_key.to_columns().collect::<Vec<_>>(),
+                sort_key_ids,
             )
             .await
             .unwrap();
-        // Test: sort_key_ids after updating
-        assert!(partition.sort_key_ids.is_none());

         Arc::new(Self {
             catalog: Arc::clone(&self.catalog),
@@ -779,6 +788,11 @@ async fn update_catalog_sort_key_if_needed(
     // Fetch the latest partition info from the catalog
     let partition = partition_lookup(repos, id).await.unwrap().unwrap();

+    // fetch column ids from catalog
+    let columns = get_table_columns_by_id(partition.table_id, repos)
+        .await
+        .unwrap();
+
     // Similarly to what the ingester does, if there's an existing sort key in the catalog, add new
     // columns onto the end
     match partition.sort_key() {
@@ -792,7 +806,10 @@
                 catalog_sort_key.to_columns().collect::<Vec<_>>(),
                 &new_columns,
             );
-            let updated_partition = repos
+
+            let column_ids = columns.ids_for_names(&new_columns);
+
+            repos
                 .partitions()
                 .cas_sort_key(
                     id,
                     Some(
                         catalog_sort_key
                             .to_columns()
                             .map(ToString::to_string)
                             .collect::<Vec<_>>(),
                     ),
                     &new_columns,
+                    &column_ids,
                 )
                 .await
                 .unwrap();
-            // Test: sort_key_ids after updating
-            assert!(updated_partition.sort_key_ids.is_none());
         }
     }
     None => {
         let new_columns = sort_key.to_columns().collect::<Vec<_>>();
         debug!("Updating sort key from None to {:?}", &new_columns);
-        let updated_partition = repos
+        let column_ids = columns.ids_for_names(&new_columns);
+        repos
             .partitions()
-            .cas_sort_key(id, None, &new_columns)
+            .cas_sort_key(id, None, &new_columns, &column_ids)
             .await
             .unwrap();
-        // Test: sort_key_ids after updating
-        assert!(updated_partition.sort_key_ids.is_none());
     }
 }
 }
diff --git a/querier/src/cache/partition.rs b/querier/src/cache/partition.rs
index c842270ca2..a07b17db0c 100644
--- a/querier/src/cache/partition.rs
+++ b/querier/src/cache/partition.rs
@@ -391,7 +391,7 @@ mod tests {
     use async_trait::async_trait;
     use data_types::{
         partition_template::TablePartitionTemplateOverride, ColumnType, PartitionId, PartitionKey,
-        TableId,
+        SortedColumnSet, TableId,
     };
     use futures::StreamExt;
     use generated_types::influxdata::iox::partition_template::v1::{
@@ -410,7 +410,7 @@
         let c1 = t.create_column("tag", ColumnType::Tag).await;
         let c2 = t.create_column("time", ColumnType::Time).await;
         let p1 = t
-            .create_partition_with_sort_key("k1", &["tag", "time"])
+            .create_partition_with_sort_key("k1", &["tag", "time"], &[c1.id(), c2.id()])
             .await
             .partition
             .clone();
@@ -865,10 +865,10 @@
         // set sort key
         let p = p
-            .update_sort_key(SortKey::from_columns([
-                c1.column.name.as_str(),
-                c2.column.name.as_str(),
-            ]))
+            .update_sort_key(
+                SortKey::from_columns([c1.column.name.as_str(), c2.column.name.as_str()]),
+                &SortedColumnSet::from([c1.column.id.get(), c2.column.id.get()]),
+            )
             .await;
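The test fixtures take explicit ids to keep assertions deterministic. When hand-maintaining literals is awkward, the column-map route used elsewhere in this diff also works; a sketch, with the wiring assumed:

// Derive the ids from the catalog instead of hard-coding them.
let columns = get_table_columns_by_id(table.id, repos.as_mut()).await.unwrap();
let ids = columns.ids_for_names(&["tag", "time"]);
let partition = partition
    .update_sort_key(SortKey::from_columns(["tag", "time"]), &ids)
    .await;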
         assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_hash_id", 1);
@@ -1110,11 +1110,12 @@
             partition_template: TablePartitionTemplateOverride::default(),
         });
         const N_PARTITIONS: usize = 20;
+        let c_id = c.column.id.get();
         let mut partitions = futures::stream::iter(0..N_PARTITIONS)
             .then(|i| {
                 let t = Arc::clone(&t);
                 async move {
-                    t.create_partition_with_sort_key(&format!("p{i}"), &["time"])
+                    t.create_partition_with_sort_key(&format!("p{i}"), &["time"], &[c_id])
                         .await
                         .partition
                         .transition_partition_id()
diff --git a/querier/src/parquet/mod.rs b/querier/src/parquet/mod.rs
index dbac06a736..794caba889 100644
--- a/querier/src/parquet/mod.rs
+++ b/querier/src/parquet/mod.rs
@@ -105,7 +105,7 @@ pub mod tests {
     use super::*;
     use arrow::{datatypes::DataType, record_batch::RecordBatch};
     use arrow_util::assert_batches_eq;
-    use data_types::{ColumnType, ParquetFile};
+    use data_types::{ColumnType, ParquetFile, SortedColumnSet};
     use datafusion_util::config::register_iox_object_store;
     use iox_query::{
         exec::{ExecutorType, IOxSessionContext},
@@ -186,17 +186,20 @@
             .join("\n");
         let ns = catalog.create_namespace_1hr_retention("ns").await;
         let table = ns.create_table("table").await;
-        table.create_column("tag1", ColumnType::Tag).await;
-        table.create_column("tag2", ColumnType::Tag).await;
+        let tag1 = table.create_column("tag1", ColumnType::Tag).await;
+        let tag2 = table.create_column("tag2", ColumnType::Tag).await;
         table.create_column("tag3", ColumnType::Tag).await;
-        table.create_column("tag4", ColumnType::Tag).await;
+        let tag4 = table.create_column("tag4", ColumnType::Tag).await;
         table.create_column("field_int", ColumnType::I64).await;
         table.create_column("field_float", ColumnType::F64).await;
-        table.create_column("time", ColumnType::Time).await;
+        let col_time = table.create_column("time", ColumnType::Time).await;
         let partition = table
             .create_partition("part")
             .await
-            .update_sort_key(SortKey::from_columns(["tag1", "tag2", "tag4", "time"]))
+            .update_sort_key(
+                SortKey::from_columns(["tag1", "tag2", "tag4", "time"]),
+                &SortedColumnSet::from([tag1.id(), tag2.id(), tag4.id(), col_time.id()]),
+            )
             .await;
         let builder = TestParquetFileBuilder::default().with_line_protocol(&lp);
         let parquet_file = Arc::new(partition.create_parquet_file(builder).await.parquet_file);
diff --git a/service_grpc_catalog/src/lib.rs b/service_grpc_catalog/src/lib.rs
index 169aaa372b..5353503417 100644
--- a/service_grpc_catalog/src/lib.rs
+++ b/service_grpc_catalog/src/lib.rs
@@ -232,11 +232,23 @@ fn to_parquet_file(p: data_types::ParquetFile) -> ParquetFile {
 fn to_partition(p: data_types::Partition) -> Partition {
     let identifier = to_partition_identifier(&p.transition_partition_id());

+    let array_sort_key_ids = p
+        .sort_key_ids
+        .map(|cols| cols.iter().map(|id| id.get()).collect::<Vec<_>>());
+
+    let array_sort_key_ids = match array_sort_key_ids {
+        None => vec![],
+        Some(array_sort_key_ids) => array_sort_key_ids,
+    };
+
+    let proto_sort_key_id = SortKeyIds { array_sort_key_ids };
+
     Partition {
         identifier: Some(identifier),
         key: p.partition_key.to_string(),
         table_id: p.table_id.get(),
         array_sort_key: p.sort_key,
+        sort_key_ids: Some(proto_sort_key_id),
     }
 }
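One observable consequence: the service now always populates the field, so on the wire an absent `sort_key_ids` indicates an older server rather than an unset sort key. A rough client-side interpretation (hypothetical, not part of this diff):

match partition.sort_key_ids {
    None => println!("server predates sort_key_ids support"),
    Some(ids) if ids.array_sort_key_ids.is_empty() => println!("no sort key ids recorded"),
    Some(ids) => println!("sort key column ids: {:?}", ids.array_sort_key_ids),
}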
@@ -269,7 +281,7 @@ mod tests {
             .await
             .unwrap();
         // Test: sort_key_ids from create_or_get in catalog_service
-        assert!(partition.sort_key_ids.is_none());
+        assert!(partition.sort_key_ids().unwrap().is_empty());

         let p1params = ParquetFileParams {
             namespace_id: namespace.id,
             table_id: table.id,