From 9bf1c8c11c891fe57beddab5d55b52fc55658193 Mon Sep 17 00:00:00 2001 From: NGA-TRAN Date: Fri, 11 Aug 2023 11:36:27 -0400 Subject: [PATCH] chore: revert fill sort_key_ids --- compactor_test_utils/src/lib.rs | 15 ++-- data_types/src/columns.rs | 80 +---------------- data_types/src/partition.rs | 18 +--- .../influxdata/iox/catalog/v1/service.proto | 4 - import_export/src/file/import.rs | 67 ++++----------- ingester/src/buffer_tree/partition.rs | 8 +- .../buffer_tree/partition/resolver/cache.rs | 4 - .../partition/resolver/old_filter.rs | 2 - .../partition/resolver/sort_key.rs | 2 - ingester/src/persist/mod.rs | 12 +-- ingester/src/persist/worker.rs | 85 ++++--------------- iox_catalog/src/interface.rs | 22 ++--- iox_catalog/src/mem.rs | 18 +--- iox_catalog/src/metrics.rs | 4 +- iox_catalog/src/postgres.rs | 49 +++++------ iox_catalog/src/sqlite.rs | 50 ++++------- iox_tests/src/builders.rs | 1 - iox_tests/src/catalog.rs | 27 +----- querier/src/cache/partition.rs | 17 ++-- querier/src/parquet/mod.rs | 15 ++-- service_grpc_catalog/src/lib.rs | 1 - 21 files changed, 113 insertions(+), 388 deletions(-) diff --git a/compactor_test_utils/src/lib.rs b/compactor_test_utils/src/lib.rs index 6f72ddb465..886553e38c 100644 --- a/compactor_test_utils/src/lib.rs +++ b/compactor_test_utils/src/lib.rs @@ -42,7 +42,7 @@ use compactor::{ PartitionInfo, }; use compactor_scheduler::SchedulerConfig; -use data_types::{ColumnSet, ColumnType, CompactionLevel, ParquetFile, TableId}; +use data_types::{ColumnType, CompactionLevel, ParquetFile, TableId}; use datafusion::arrow::record_batch::RecordBatch; use datafusion_util::config::register_iox_object_store; use futures::TryStreamExt; @@ -93,20 +93,17 @@ impl TestSetupBuilder { let ns = catalog.create_namespace_1hr_retention("ns").await; let table = ns.create_table("table").await; table.create_column("field_int", ColumnType::I64).await; - let tag1 = table.create_column("tag1", ColumnType::Tag).await; - let tag2 = table.create_column("tag2", ColumnType::Tag).await; - let tag3 = table.create_column("tag3", ColumnType::Tag).await; - let col_time = table.create_column("time", ColumnType::Time).await; + table.create_column("tag1", ColumnType::Tag).await; + table.create_column("tag2", ColumnType::Tag).await; + table.create_column("tag3", ColumnType::Tag).await; + table.create_column("time", ColumnType::Time).await; let partition = table.create_partition("2022-07-13").await; // The sort key comes from the catalog and should be the union of all tags the // ingester has seen let sort_key = SortKey::from_columns(["tag1", "tag2", "tag3", "time"]); - let sort_key_col_ids = ColumnSet::from([tag1.id(), tag2.id(), tag3.id(), col_time.id()]); - let partition = partition - .update_sort_key(sort_key.clone(), &sort_key_col_ids) - .await; + let partition = partition.update_sort_key(sort_key.clone()).await; // Ensure the input scenario conforms to the expected invariants. 
let invariant_check = Arc::new(CatalogInvariants { diff --git a/data_types/src/columns.rs b/data_types/src/columns.rs index c5a9278262..c5d737be73 100644 --- a/data_types/src/columns.rs +++ b/data_types/src/columns.rs @@ -90,17 +90,6 @@ impl ColumnsByName { self.0.values().map(|c| c.id) } - /// Return column ids of the given column names - /// Will panic if any of the names are not found - pub fn ids_for_names(&self, names: &[&str]) -> ColumnSet { - ColumnSet::from(names.iter().map(|name| { - self.get(name) - .unwrap_or_else(|| panic!("column name not found: {}", name)) - .id - .get() - })) - } - /// Get a column by its name. pub fn get(&self, name: &str) -> Option<&ColumnSchema> { self.0.get(name) @@ -342,7 +331,7 @@ impl TryFrom for ColumnType { } /// Set of columns. -#[derive(Debug, Clone, PartialEq, Eq, Hash, Default, sqlx::Type)] +#[derive(Debug, Clone, PartialEq, Eq, sqlx::Type)] #[sqlx(transparent, no_pg_array)] pub struct ColumnSet(Vec); @@ -382,15 +371,6 @@ impl From for Vec { } } -impl From for ColumnSet -where - I: IntoIterator, -{ - fn from(ids: I) -> Self { - Self(ids.into_iter().map(ColumnId::new).collect()) - } -} - impl Deref for ColumnSet { type Target = [ColumnId]; @@ -402,7 +382,6 @@ impl Deref for ColumnSet { #[cfg(test)] mod tests { use assert_matches::assert_matches; - use std::collections::BTreeMap; use super::*; @@ -471,61 +450,4 @@ mod tests { ColumnSchema::try_from(&proto).expect_err("should succeed"); } - - #[test] - fn test_columns_by_names_exist() { - let columns = build_columns_by_names(); - - let ids = columns.ids_for_names(&["foo", "bar"]); - assert_eq!(ids, ColumnSet::from([1, 2])); - } - - #[test] - fn test_columns_by_names_exist_different_order() { - let columns = build_columns_by_names(); - - let ids = columns.ids_for_names(&["bar", "foo"]); - assert_eq!(ids, ColumnSet::from([2, 1])); - } - - #[test] - #[should_panic = "column name not found: baz"] - fn test_columns_by_names_not_exist() { - let columns = build_columns_by_names(); - columns.ids_for_names(&["foo", "baz"]); - } - - fn build_columns_by_names() -> ColumnsByName { - let mut columns: BTreeMap = BTreeMap::new(); - columns.insert( - "foo".to_string(), - ColumnSchema { - id: ColumnId::new(1), - column_type: ColumnType::I64, - }, - ); - columns.insert( - "bar".to_string(), - ColumnSchema { - id: ColumnId::new(2), - column_type: ColumnType::I64, - }, - ); - columns.insert( - "time".to_string(), - ColumnSchema { - id: ColumnId::new(3), - column_type: ColumnType::Time, - }, - ); - columns.insert( - "tag1".to_string(), - ColumnSchema { - id: ColumnId::new(4), - column_type: ColumnType::Tag, - }, - ); - - ColumnsByName(columns) - } } diff --git a/data_types/src/partition.rs b/data_types/src/partition.rs index bcf9cf62d4..76fd95bb33 100644 --- a/data_types/src/partition.rs +++ b/data_types/src/partition.rs @@ -1,7 +1,5 @@ //! Types having to do with partitions. -use crate::ColumnSet; - use super::{TableId, Timestamp}; use schema::sort::SortKey; @@ -360,15 +358,9 @@ pub struct Partition { pub table_id: TableId, /// the string key of the partition pub partition_key: PartitionKey, - - // TODO: This `sort_key` will be removed after `sort_key_ids` is fully implemented - /// See comments of [`Partition::sort_key_ids`] for this as it plays the - /// same role but stores column IDs instead of names - pub sort_key: Vec, - - /// vector of column IDs that describes how *every* parquet file + /// vector of column names that describes how *every* parquet file /// in this [`Partition`] is sorted. 
The sort_key contains all the - /// ID of primary key (PK) columns that have been persisted, and nothing + /// primary key (PK) columns that have been persisted, and nothing /// else. The PK columns are all `tag` columns and the `time` /// column. /// @@ -391,7 +383,7 @@ pub struct Partition { /// For example, updating `A,B,C` to either `A,D,B,C` or `A,B,C,D` /// is legal. However, updating to `A,C,D,B` is not because the /// relative order of B and C have been reversed. - pub sort_key_ids: ColumnSet, + pub sort_key: Vec, /// The time at which the newest file of the partition is created pub new_file_at: Option, @@ -407,7 +399,6 @@ impl Partition { table_id: TableId, partition_key: PartitionKey, sort_key: Vec, - sort_key_ids: ColumnSet, new_file_at: Option, ) -> Self { let hash_id = PartitionHashId::new(table_id, &partition_key); @@ -417,7 +408,6 @@ impl Partition { table_id, partition_key, sort_key, - sort_key_ids, new_file_at, } } @@ -434,7 +424,6 @@ impl Partition { table_id: TableId, partition_key: PartitionKey, sort_key: Vec, - sort_key_ids: ColumnSet, new_file_at: Option, ) -> Self { Self { @@ -443,7 +432,6 @@ impl Partition { table_id, partition_key, sort_key, - sort_key_ids, new_file_at, } } diff --git a/generated_types/protos/influxdata/iox/catalog/v1/service.proto b/generated_types/protos/influxdata/iox/catalog/v1/service.proto index 1b0c42a139..3e370874dc 100644 --- a/generated_types/protos/influxdata/iox/catalog/v1/service.proto +++ b/generated_types/protos/influxdata/iox/catalog/v1/service.proto @@ -50,10 +50,6 @@ message Partition { repeated string array_sort_key = 6; PartitionIdentifier identifier = 8; - - // the sort key ids sort_key_ids for data in parquet files of this partition which - // is an array of column ids of the sort keys - repeated int64 array_sort_key_ids = 9; } message GetPartitionsByTableIdRequest { diff --git a/import_export/src/file/import.rs b/import_export/src/file/import.rs index 21ffcef88d..c6530e824d 100644 --- a/import_export/src/file/import.rs +++ b/import_export/src/file/import.rs @@ -6,9 +6,8 @@ use data_types::{ partition_template::{ NamespacePartitionTemplateOverride, TablePartitionTemplateOverride, PARTITION_BY_DAY_PROTO, }, - ColumnSet, ColumnType, ColumnsByName, CompactionLevel, Namespace, NamespaceName, - NamespaceNameError, ParquetFileParams, Partition, PartitionKey, Statistics, Table, TableId, - Timestamp, + ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceName, NamespaceNameError, + ParquetFileParams, Partition, Statistics, Table, TableId, Timestamp, }; use generated_types::influxdata::iox::catalog::v1 as proto; // ParquetFile as ProtoParquetFile, Partition as ProtoPartition, @@ -370,10 +369,8 @@ impl RemoteImporter { let table_id = table.id; debug!(%table_id, "Inserting catalog records into table"); - // Create a new partition - let partition_key = iox_metadata.partition_key.clone(); - let mut partition = self - .create_partition(repos.as_mut(), &table, partition_key) + let partition = self + .partition_for_parquet_file(repos.as_mut(), &table, &iox_metadata) .await?; // Note that for some reason, the object_store_id that is @@ -419,11 +416,6 @@ impl RemoteImporter { } }; - // Update partition sort key - let partition = self - .update_partition(&mut partition, repos.as_mut(), &table, &iox_metadata) - .await?; - // Now copy the parquet files into the object store //let partition_id = TransitionPartitionId::Deprecated(partition.id); let transition_partition_id = partition.transition_partition_id(); @@ -479,40 +471,25 @@ impl 
RemoteImporter { Ok(table) } - /// Create the catalog [`Partition`] into which the specified parquet + /// Return the catalog [`Partition`] into which the specified parquet /// file shoudl be inserted. /// - /// The sort_key and sort_key_ids of the partition should be empty when it is first created - /// because there are no columns in any parquet files to use for sorting yet. - /// The sort_key and sort_key_ids will be updated after the parquet files are created. - async fn create_partition( - &self, - repos: &mut dyn RepoCollection, - table: &Table, - partition_key: PartitionKey, - ) -> Result { - let partition = repos - .partitions() - .create_or_get(partition_key, table.id) - .await?; - - Ok(partition) - } - - /// Update sort keys of the partition - /// /// First attempts to use any available metadata from the /// catalog export, and falls back to what is in the iox /// metadata stored in the parquet file, if needed - async fn update_partition( + async fn partition_for_parquet_file( &self, - partition: &mut Partition, repos: &mut dyn RepoCollection, table: &Table, iox_metadata: &IoxMetadata, ) -> Result { let partition_key = iox_metadata.partition_key.clone(); + let partition = repos + .partitions() + .create_or_get(partition_key.clone(), table.id) + .await?; + // Note we use the table_id embedded in the file's metadata // from the source catalog to match the exported catlog (which // is dfferent than the new table we just created in the @@ -521,21 +498,14 @@ impl RemoteImporter { .exported_contents .partition_metadata(iox_metadata.table_id.get(), partition_key.inner()); - let (new_sort_key, new_sort_key_ids) = if let Some(proto_partition) = - proto_partition.as_ref() - { + let new_sort_key: Vec<&str> = if let Some(proto_partition) = proto_partition.as_ref() { // Use the sort key from the source catalog debug!(array_sort_key=?proto_partition.array_sort_key, "Using sort key from catalog export"); - let new_sort_key = proto_partition + proto_partition .array_sort_key .iter() .map(|s| s.as_str()) - .collect::>(); - - let new_sort_key_ids = - ColumnSet::from(proto_partition.array_sort_key_ids.iter().cloned()); - - (new_sort_key, new_sort_key_ids) + .collect() } else { warn!("Could not find sort key in catalog metadata export, falling back to embedded metadata"); let sort_key = iox_metadata @@ -543,13 +513,7 @@ impl RemoteImporter { .as_ref() .ok_or_else(|| Error::NoSortKey)?; - let new_sort_key = sort_key.to_columns().collect::>(); - - // fecth table columns - let columns = ColumnsByName::new(repos.columns().list_by_table_id(table.id).await?); - let new_sort_key_ids = columns.ids_for_names(&new_sort_key); - - (new_sort_key, new_sort_key_ids) + sort_key.to_columns().collect() }; if !partition.sort_key.is_empty() && partition.sort_key != new_sort_key { @@ -565,7 +529,6 @@ impl RemoteImporter { &partition.transition_partition_id(), Some(partition.sort_key.clone()), &new_sort_key, - &new_sort_key_ids, ) .await; diff --git a/ingester/src/buffer_tree/partition.rs b/ingester/src/buffer_tree/partition.rs index c21d9e270e..25aeaaf224 100644 --- a/ingester/src/buffer_tree/partition.rs +++ b/ingester/src/buffer_tree/partition.rs @@ -423,7 +423,6 @@ mod tests { use arrow_util::assert_batches_eq; use assert_matches::assert_matches; use backoff::BackoffConfig; - use data_types::ColumnSet; use datafusion::{ physical_expr::PhysicalSortExpr, physical_plan::{expressions::col, memory::MemoryExec, ExecutionPlan}, @@ -1007,12 +1006,7 @@ mod tests { .repositories() .await .partitions() - .cas_sort_key( - 
&partition.transition_partition_id(), - None, - &["terrific"], - &ColumnSet::from([1]), - ) + .cas_sort_key(&partition.transition_partition_id(), None, &["terrific"]) .await .unwrap(); diff --git a/ingester/src/buffer_tree/partition/resolver/cache.rs b/ingester/src/buffer_tree/partition/resolver/cache.rs index 7a212402d0..dc47914ee5 100644 --- a/ingester/src/buffer_tree/partition/resolver/cache.rs +++ b/ingester/src/buffer_tree/partition/resolver/cache.rs @@ -215,7 +215,6 @@ mod tests { // Harmless in tests - saves a bunch of extra vars. #![allow(clippy::await_holding_lock)] - use data_types::ColumnSet; use iox_catalog::mem::MemCatalog; use super::*; @@ -288,7 +287,6 @@ mod tests { ARBITRARY_TABLE_ID, stored_partition_key.clone(), vec!["dos".to_string(), "bananas".to_string()], - ColumnSet::from([1, 2]), Default::default(), ); @@ -351,7 +349,6 @@ mod tests { ARBITRARY_PARTITION_KEY.clone(), Default::default(), Default::default(), - Default::default(), ); let cache = new_cache(inner, [partition]); @@ -389,7 +386,6 @@ mod tests { ARBITRARY_PARTITION_KEY.clone(), Default::default(), Default::default(), - Default::default(), ); let cache = new_cache(inner, [partition]); diff --git a/ingester/src/buffer_tree/partition/resolver/old_filter.rs b/ingester/src/buffer_tree/partition/resolver/old_filter.rs index 9fbd88f0e2..977f3c6f93 100644 --- a/ingester/src/buffer_tree/partition/resolver/old_filter.rs +++ b/ingester/src/buffer_tree/partition/resolver/old_filter.rs @@ -304,7 +304,6 @@ mod tests { table_id, partition_key, vec![], - Default::default(), None, ) } @@ -388,7 +387,6 @@ mod tests { ARBITRARY_TABLE_ID, ARBITRARY_PARTITION_KEY.clone(), vec![], - Default::default(), None, ); let want_id = p.transition_partition_id().clone(); diff --git a/ingester/src/buffer_tree/partition/resolver/sort_key.rs b/ingester/src/buffer_tree/partition/resolver/sort_key.rs index 13b785fccf..8980f0c9e7 100644 --- a/ingester/src/buffer_tree/partition/resolver/sort_key.rs +++ b/ingester/src/buffer_tree/partition/resolver/sort_key.rs @@ -53,7 +53,6 @@ impl SortKeyResolver { #[cfg(test)] mod tests { - use data_types::ColumnSet; use std::sync::Arc; use super::*; @@ -96,7 +95,6 @@ mod tests { &partition.transition_partition_id(), None, &["uno", "dos", "bananas"], - &ColumnSet::from([1, 2, 3]), ) .await .expect("should update existing partition key"); diff --git a/ingester/src/persist/mod.rs b/ingester/src/persist/mod.rs index 7838ad6602..1a1669a7bb 100644 --- a/ingester/src/persist/mod.rs +++ b/ingester/src/persist/mod.rs @@ -16,7 +16,7 @@ mod tests { use std::{sync::Arc, time::Duration}; use assert_matches::assert_matches; - use data_types::{ColumnSet, CompactionLevel, ParquetFile}; + use data_types::{CompactionLevel, ParquetFile}; use futures::TryStreamExt; use iox_catalog::{ interface::{get_schema_by_id, Catalog, SoftDeletedRows}, @@ -345,10 +345,7 @@ mod tests { .cas_sort_key( &partition_id, None, - // must use column names that exist in the partition data - &["region"], - // column id of region - &ColumnSet::from([2]), + &["bananas", "are", "good", "for", "you"], ) .await .expect("failed to set catalog sort key"); @@ -383,11 +380,10 @@ mod tests { // mark_persisted() was called. assert_eq!(partition.lock().completed_persistence_count(), 1); - // Assert the sort key was also updated, adding the new columns (time) to the + // Assert the sort key was also updated, adding the new columns to the // end of the concurrently updated catalog sort key. 
assert_matches!(partition.lock().sort_key(), SortKeyState::Provided(Some(p)) => { - // Before there is only ["region"] (manual sort key update above). Now ["region", "time"] - assert_eq!(p.to_columns().collect::>(), &["region", "time"]); + assert_eq!(p.to_columns().collect::>(), &["bananas", "are", "good", "for", "you", "region", "time"]); }); // Ensure a file was made visible in the catalog diff --git a/ingester/src/persist/worker.rs b/ingester/src/persist/worker.rs index 92b0155ba9..e064d4ee1a 100644 --- a/ingester/src/persist/worker.rs +++ b/ingester/src/persist/worker.rs @@ -2,7 +2,7 @@ use std::{ops::ControlFlow, sync::Arc}; use async_channel::RecvError; use backoff::Backoff; -use data_types::{ColumnsByName, CompactionLevel, ParquetFileParams}; +use data_types::{CompactionLevel, ParquetFileParams}; use iox_catalog::interface::{get_table_columns_by_id, CasFailure, Catalog}; use iox_query::exec::Executor; use iox_time::{SystemProvider, TimeProvider}; @@ -174,15 +174,8 @@ async fn compact_and_upload( where O: Send + Sync, { - // load sort key - let sort_key = ctx.sort_key().get().await; - // fetch column map - // THIS MUST BE DONE AFTER THE SORT KEY IS LOADED - let columns = fetch_column_map(ctx, worker_state, sort_key.as_ref()).await?; - - let compacted = compact(ctx, worker_state, sort_key).await; - let (sort_key_update, parquet_table_data) = - upload(ctx, worker_state, compacted, &columns).await; + let compacted = compact(ctx, worker_state).await; + let (sort_key_update, parquet_table_data) = upload(ctx, worker_state, compacted).await; if let Some(update) = sort_key_update { update_catalog_sort_key( @@ -190,7 +183,6 @@ where worker_state, update, parquet_table_data.object_store_id, - &columns, ) .await? } @@ -200,14 +192,12 @@ where /// Compact the data in `ctx` using sorted by the sort key returned from /// [`Context::sort_key()`]. -async fn compact( - ctx: &Context, - worker_state: &SharedWorkerState, - sort_key: Option, -) -> CompactedStream +async fn compact(ctx: &Context, worker_state: &SharedWorkerState) -> CompactedStream where O: Send + Sync, { + let sort_key = ctx.sort_key().get().await; + debug!( namespace_id = %ctx.namespace_id(), namespace_name = %ctx.namespace_name(), @@ -241,7 +231,6 @@ async fn upload( ctx: &Context, worker_state: &SharedWorkerState, compacted: CompactedStream, - columns: &ColumnsByName, ) -> (Option, ParquetFileParams) where O: Send + Sync, @@ -305,6 +294,15 @@ where "partition parquet uploaded" ); + // Read the table's columns from the catalog to get a map of column name -> column IDs. + let columns = Backoff::new(&Default::default()) + .retry_all_errors("get table schema", || async { + let mut repos = worker_state.catalog.repositories().await; + get_table_columns_by_id(ctx.table_id(), repos.as_mut()).await + }) + .await + .expect("retry forever"); + // Build the data that must be inserted into the parquet_files catalog // table in order to make the file visible to queriers. 
let parquet_table_data = @@ -323,44 +321,6 @@ where (catalog_sort_key_update, parquet_table_data) } -/// Fetch the table column map from the catalog and verify if they contain all columns in the sort key -async fn fetch_column_map( - ctx: &Context, - worker_state: &SharedWorkerState, - // NOTE: CALLER MUST LOAD SORT KEY BEFORE CALLING THIS FUNCTION EVEN IF THE sort key IS NONE - // THIS IS A MUST TO GUARANTEE THE RETURNED COLUMN MAP CONTAINS ALL COLUMNS IN THE SORT KEY - // The purpose to put the sort_key as a param here is to make sure the caller has already loaded the sort key - sort_key: Option<&SortKey>, -) -> Result -where - O: Send + Sync, -{ - // Read the table's columns from the catalog to get a map of column name -> column IDs. - let column_map = Backoff::new(&Default::default()) - .retry_all_errors("get table schema", || async { - let mut repos = worker_state.catalog.repositories().await; - get_table_columns_by_id(ctx.table_id(), repos.as_mut()).await - }) - .await - .expect("retry forever"); - - // Verify that the sort key columns are in the column map - if let Some(sort_key) = sort_key { - for sort_key_column in sort_key.to_columns() { - if !column_map.contains_column_name(sort_key_column) { - panic!( - "sort key column {} of partition id {} is not in the column map {:?}", - sort_key_column, - ctx.partition_id(), - column_map - ); - } - } - } - - Ok(column_map) -} - /// Update the sort key value stored in the catalog for this [`Context`]. /// /// # Concurrent Updates @@ -373,7 +333,6 @@ async fn update_catalog_sort_key( worker_state: &SharedWorkerState, new_sort_key: SortKey, object_store_id: Uuid, - columns: &ColumnsByName, ) -> Result<(), PersistError> where O: Send + Sync, @@ -401,19 +360,13 @@ where .retry_with_backoff("cas_sort_key", || { let old_sort_key = old_sort_key.clone(); let new_sort_key_str = new_sort_key.to_columns().collect::>(); - let new_sort_key_colids = columns.ids_for_names(&new_sort_key_str); let catalog = Arc::clone(&worker_state.catalog); let ctx = &ctx; async move { let mut repos = catalog.repositories().await; match repos .partitions() - .cas_sort_key( - ctx.partition_id(), - old_sort_key.clone(), - &new_sort_key_str, - &new_sort_key_colids, - ) + .cas_sort_key(ctx.partition_id(), old_sort_key.clone(), &new_sort_key_str) .await { Ok(_) => ControlFlow::Break(Ok(())), @@ -436,8 +389,7 @@ where partition_key = %ctx.partition_key(), expected=?old_sort_key, ?observed, - update_sort_key=?new_sort_key_str, - update_sort_key_ids=?new_sort_key_colids, + update=?new_sort_key_str, "detected matching concurrent sort key update" ); ControlFlow::Break(Ok(())) @@ -463,8 +415,7 @@ where partition_key = %ctx.partition_key(), expected=?old_sort_key, ?observed, - update_sort_key=?new_sort_key_str, - update_sort_key_ids=?new_sort_key_colids, + update=?new_sort_key_str, "detected concurrent sort key update, regenerating parquet" ); // Stop the retry loop with an error containing the diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index 2d52f5759c..c40f53fbc4 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -3,10 +3,10 @@ use async_trait::async_trait; use data_types::{ partition_template::{NamespacePartitionTemplateOverride, TablePartitionTemplateOverride}, - Column, ColumnSet, ColumnType, ColumnsByName, CompactionLevel, Namespace, NamespaceId, - NamespaceName, NamespaceSchema, NamespaceServiceProtectionLimitsOverride, ParquetFile, - ParquetFileId, ParquetFileParams, Partition, PartitionHashId, PartitionId, 
PartitionKey, - SkippedCompaction, Table, TableId, TableSchema, Timestamp, TransitionPartitionId, + Column, ColumnType, ColumnsByName, CompactionLevel, Namespace, NamespaceId, NamespaceName, + NamespaceSchema, NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId, + ParquetFileParams, Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction, + Table, TableId, TableSchema, Timestamp, TransitionPartitionId, }; use iox_time::TimeProvider; use snafu::{OptionExt, Snafu}; @@ -413,7 +413,6 @@ pub trait PartitionRepo: Send + Sync { partition_id: &TransitionPartitionId, old_sort_key: Option>, new_sort_key: &[&str], - new_sort_key_ids: &ColumnSet, ) -> Result>>; /// Record an instance of a partition being selected for compaction but compaction was not @@ -1599,24 +1598,19 @@ pub(crate) mod test_helpers { assert_eq!(created.keys().copied().collect::>(), listed); - // sort_key and sort_key_ids should be empty on creation + // sort_key should be empty on creation assert!(to_skip_partition.sort_key.is_empty()); - assert!(to_skip_partition.sort_key_ids.is_empty()); // test update_sort_key from None to Some - let partition = repos + repos .partitions() .cas_sort_key( &to_skip_partition.transition_partition_id(), None, &["tag2", "tag1", "time"], - &ColumnSet::from([1, 2, 3]), ) .await .unwrap(); - // verify sort key and sort key ids are updated - assert_eq!(partition.sort_key, &["tag2", "tag1", "time"]); - assert_eq!(partition.sort_key_ids, ColumnSet::from([1, 2, 3])); // test sort key CAS with an incorrect value let err = repos @@ -1625,7 +1619,6 @@ pub(crate) mod test_helpers { &to_skip_partition.transition_partition_id(), Some(["bananas".to_string()].to_vec()), &["tag2", "tag1", "tag3 , with comma", "time"], - &ColumnSet::from([1, 2, 3, 4]), ) .await .expect_err("CAS with incorrect value should fail"); @@ -1662,7 +1655,6 @@ pub(crate) mod test_helpers { &to_skip_partition.transition_partition_id(), None, &["tag2", "tag1", "tag3 , with comma", "time"], - &ColumnSet::from([1, 2, 3, 4]), ) .await .expect_err("CAS with incorrect value should fail"); @@ -1677,7 +1669,6 @@ pub(crate) mod test_helpers { &to_skip_partition.transition_partition_id(), Some(["bananas".to_string()].to_vec()), &["tag2", "tag1", "tag3 , with comma", "time"], - &ColumnSet::from([1, 2, 3, 4]), ) .await .expect_err("CAS with incorrect value should fail"); @@ -1697,7 +1688,6 @@ pub(crate) mod test_helpers { .collect(), ), &["tag2", "tag1", "tag3 , with comma", "time"], - &ColumnSet::from([1, 2, 3, 4]), ) .await .unwrap(); diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index e9005945b3..14705afdee 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -16,10 +16,10 @@ use data_types::{ partition_template::{ NamespacePartitionTemplateOverride, TablePartitionTemplateOverride, TemplatePart, }, - Column, ColumnId, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceId, - NamespaceName, NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId, - ParquetFileParams, Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction, - Table, TableId, Timestamp, TransitionPartitionId, + Column, ColumnId, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName, + NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId, ParquetFileParams, + Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction, Table, TableId, + Timestamp, TransitionPartitionId, }; use iox_time::{SystemProvider, TimeProvider}; use snafu::ensure; @@ -566,7 
+566,6 @@ impl PartitionRepo for MemTxn { table_id, key, vec![], - Default::default(), None, ); stage.partitions.push(p); @@ -662,15 +661,7 @@ impl PartitionRepo for MemTxn { partition_id: &TransitionPartitionId, old_sort_key: Option>, new_sort_key: &[&str], - new_sort_key_ids: &ColumnSet, ) -> Result>> { - // panic if new_sort_key and new_sort_key_ids have different lengths - assert_eq!( - new_sort_key.len(), - new_sort_key_ids.len(), - "new_sort_key and new_sort_key_ids must be the same length" - ); - let stage = self.stage(); let old_sort_key = old_sort_key.unwrap_or_default(); @@ -682,7 +673,6 @@ impl PartitionRepo for MemTxn { }) { Some(p) if p.sort_key == old_sort_key => { p.sort_key = new_sort_key.iter().map(|s| s.to_string()).collect(); - p.sort_key_ids = new_sort_key_ids.clone(); Ok(p.clone()) } Some(p) => return Err(CasFailure::ValueMismatch(p.sort_key.clone())), diff --git a/iox_catalog/src/metrics.rs b/iox_catalog/src/metrics.rs index a5fb2fe4e0..6b6e4df0e6 100644 --- a/iox_catalog/src/metrics.rs +++ b/iox_catalog/src/metrics.rs @@ -7,7 +7,7 @@ use crate::interface::{ use async_trait::async_trait; use data_types::{ partition_template::{NamespacePartitionTemplateOverride, TablePartitionTemplateOverride}, - Column, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName, + Column, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName, NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction, Table, TableId, Timestamp, TransitionPartitionId, @@ -176,7 +176,7 @@ decorate!( "partition_get_by_hash_id_batch" = get_by_hash_id_batch(&mut self, partition_hash_ids: &[&PartitionHashId]) -> Result>; "partition_list_by_table_id" = list_by_table_id(&mut self, table_id: TableId) -> Result>; "partition_list_ids" = list_ids(&mut self) -> Result>; - "partition_update_sort_key" = cas_sort_key(&mut self, partition_id: &TransitionPartitionId, old_sort_key: Option>, new_sort_key: &[&str], new_sort_key_ids: &ColumnSet) -> Result>>; + "partition_update_sort_key" = cas_sort_key(&mut self, partition_id: &TransitionPartitionId, old_sort_key: Option>, new_sort_key: &[&str]) -> Result>>; "partition_record_skipped_compaction" = record_skipped_compaction(&mut self, partition_id: PartitionId, reason: &str, num_files: usize, limit_num_files: usize, limit_num_files_first_in_partition: usize, estimated_bytes: u64, limit_bytes: u64) -> Result<()>; "partition_list_skipped_compactions" = list_skipped_compactions(&mut self) -> Result>; "partition_delete_skipped_compactions" = delete_skipped_compactions(&mut self, partition_id: PartitionId) -> Result>; diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 79bcf5d93e..88148b2936 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -20,7 +20,7 @@ use data_types::{ partition_template::{ NamespacePartitionTemplateOverride, TablePartitionTemplateOverride, TemplatePart, }, - Column, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName, + Column, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName, NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction, Table, TableId, Timestamp, TransitionPartitionId, @@ -1163,12 +1163,12 @@ impl PartitionRepo for PostgresTxn { let v = sqlx::query_as::<_, Partition>( r#" INSERT INTO partition - (partition_key, 
shard_id, table_id, hash_id, sort_key, sort_key_ids) + (partition_key, shard_id, table_id, hash_id, sort_key) VALUES - ( $1, $2, $3, $4, '{}', '{}') + ( $1, $2, $3, $4, '{}') ON CONFLICT ON CONSTRAINT partition_key_unique DO UPDATE SET partition_key = partition.partition_key -RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at; +RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at; "#, ) .bind(key) // $1 @@ -1191,7 +1191,7 @@ RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file async fn get_by_id(&mut self, partition_id: PartitionId) -> Result> { let rec = sqlx::query_as::<_, Partition>( r#" -SELECT id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at +SELECT id, hash_id, table_id, partition_key, sort_key, new_file_at FROM partition WHERE id = $1; "#, @@ -1214,7 +1214,7 @@ WHERE id = $1; sqlx::query_as::<_, Partition>( r#" -SELECT id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at +SELECT id, hash_id, table_id, partition_key, sort_key, new_file_at FROM partition WHERE id = ANY($1); "#, @@ -1231,7 +1231,7 @@ WHERE id = ANY($1); ) -> Result> { let rec = sqlx::query_as::<_, Partition>( r#" -SELECT id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at +SELECT id, hash_id, table_id, partition_key, sort_key, new_file_at FROM partition WHERE hash_id = $1; "#, @@ -1257,7 +1257,7 @@ WHERE hash_id = $1; sqlx::query_as::<_, Partition>( r#" -SELECT id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at +SELECT id, hash_id, table_id, partition_key, sort_key, new_file_at FROM partition WHERE hash_id = ANY($1); "#, @@ -1271,7 +1271,7 @@ WHERE hash_id = ANY($1); async fn list_by_table_id(&mut self, table_id: TableId) -> Result> { sqlx::query_as::<_, Partition>( r#" -SELECT id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at +SELECT id, hash_id, table_id, partition_key, sort_key, new_file_at FROM partition WHERE table_id = $1; "#, @@ -1305,42 +1305,32 @@ WHERE table_id = $1; partition_id: &TransitionPartitionId, old_sort_key: Option>, new_sort_key: &[&str], - new_sort_key_ids: &ColumnSet, ) -> Result>> { - // panic if new_sort_key and new_sort_key_ids have different lengths - assert_eq!( - new_sort_key.len(), - new_sort_key_ids.len(), - "new_sort_key and new_sort_key_ids must be the same length" - ); - let old_sort_key = old_sort_key.unwrap_or_default(); // This `match` will go away when all partitions have hash IDs in the database. 
let query = match partition_id { TransitionPartitionId::Deterministic(hash_id) => sqlx::query_as::<_, Partition>( r#" UPDATE partition -SET sort_key = $1, sort_key_ids = $4 +SET sort_key = $1 WHERE hash_id = $2 AND sort_key = $3 -RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at; +RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at; "#, ) .bind(new_sort_key) // $1 .bind(hash_id) // $2 - .bind(&old_sort_key) // $3 - .bind(new_sort_key_ids), // $4 + .bind(&old_sort_key), // $3 TransitionPartitionId::Deprecated(id) => sqlx::query_as::<_, Partition>( r#" UPDATE partition -SET sort_key = $1, sort_key_ids = $4 +SET sort_key = $1 WHERE id = $2 AND sort_key = $3 -RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at; +RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at; "#, ) .bind(new_sort_key) // $1 .bind(id) // $2 - .bind(&old_sort_key) // $3 - .bind(new_sort_key_ids), // $4 + .bind(&old_sort_key), // $3 }; let res = query.fetch_one(&mut self.inner).await; @@ -1376,7 +1366,6 @@ RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file ?partition_id, ?old_sort_key, ?new_sort_key, - ?new_sort_key_ids, "partition sort key cas successful" ); @@ -1475,7 +1464,7 @@ RETURNING * async fn most_recent_n(&mut self, n: usize) -> Result> { sqlx::query_as( r#" -SELECT id, hash_id, table_id, partition_key, sort_key, sort_key_ids, persisted_sequence_number, new_file_at +SELECT id, hash_id, table_id, partition_key, sort_key, persisted_sequence_number, new_file_at FROM partition ORDER BY id DESC LIMIT $1;"#, @@ -2205,12 +2194,12 @@ mod tests { sqlx::query( r#" INSERT INTO partition - (partition_key, shard_id, table_id, sort_key, sort_key_ids) + (partition_key, shard_id, table_id, sort_key) VALUES - ( $1, $2, $3, '{}', '{}') + ( $1, $2, $3, '{}') ON CONFLICT ON CONSTRAINT partition_key_unique DO UPDATE SET partition_key = partition.partition_key -RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at; +RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at; "#, ) .bind(&key) // $1 diff --git a/iox_catalog/src/sqlite.rs b/iox_catalog/src/sqlite.rs index d0d9698822..de2dd2f830 100644 --- a/iox_catalog/src/sqlite.rs +++ b/iox_catalog/src/sqlite.rs @@ -818,7 +818,6 @@ struct PartitionPod { table_id: TableId, partition_key: PartitionKey, sort_key: Json>, - sort_key_ids: Json>, new_file_at: Option, } @@ -830,7 +829,6 @@ impl From for Partition { value.table_id, value.partition_key, value.sort_key.0, - ColumnSet::from(value.sort_key_ids.0), value.new_file_at, ) } @@ -848,12 +846,12 @@ impl PartitionRepo for SqliteTxn { let v = sqlx::query_as::<_, PartitionPod>( r#" INSERT INTO partition - (partition_key, shard_id, table_id, hash_id, sort_key, sort_key_ids) + (partition_key, shard_id, table_id, hash_id, sort_key) VALUES - ($1, $2, $3, $4, '[]', '[]') + ($1, $2, $3, $4, '[]') ON CONFLICT (table_id, partition_key) DO UPDATE SET partition_key = partition.partition_key -RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at; +RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at; "#, ) .bind(key) // $1 @@ -876,7 +874,7 @@ RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file async fn get_by_id(&mut self, partition_id: PartitionId) -> Result> { let rec = sqlx::query_as::<_, PartitionPod>( r#" -SELECT id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at +SELECT id, 
hash_id, table_id, partition_key, sort_key, new_file_at FROM partition WHERE id = $1; "#, @@ -900,7 +898,7 @@ WHERE id = $1; sqlx::query_as::<_, PartitionPod>( r#" -SELECT id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at +SELECT id, hash_id, table_id, partition_key, sort_key, new_file_at FROM partition WHERE id IN (SELECT value FROM json_each($1)); "#, @@ -918,7 +916,7 @@ WHERE id IN (SELECT value FROM json_each($1)); ) -> Result> { let rec = sqlx::query_as::<_, PartitionPod>( r#" -SELECT id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at +SELECT id, hash_id, table_id, partition_key, sort_key, new_file_at FROM partition WHERE hash_id = $1; "#, @@ -956,7 +954,7 @@ WHERE hash_id = $1; sqlx::query_as::<_, PartitionPod>( r#" -SELECT id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at +SELECT id, hash_id, table_id, partition_key, sort_key, new_file_at FROM partition WHERE hex(hash_id) IN (SELECT value FROM json_each($1)); "#, @@ -971,7 +969,7 @@ WHERE hex(hash_id) IN (SELECT value FROM json_each($1)); async fn list_by_table_id(&mut self, table_id: TableId) -> Result> { Ok(sqlx::query_as::<_, PartitionPod>( r#" -SELECT id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at +SELECT id, hash_id, table_id, partition_key, sort_key, new_file_at FROM partition WHERE table_id = $1; "#, @@ -1008,44 +1006,33 @@ WHERE table_id = $1; partition_id: &TransitionPartitionId, old_sort_key: Option>, new_sort_key: &[&str], - new_sort_key_ids: &ColumnSet, ) -> Result>> { - // panic if new_sort_key and new_sort_key_ids have different lengths - assert_eq!( - new_sort_key.len(), - new_sort_key_ids.len(), - "new_sort_key and new_sort_key_ids must be the same length" - ); - let old_sort_key = old_sort_key.unwrap_or_default(); - let raw_new_sort_key_ids: Vec<_> = new_sort_key_ids.iter().map(|cid| cid.get()).collect(); // This `match` will go away when all partitions have hash IDs in the database. 
let query = match partition_id { TransitionPartitionId::Deterministic(hash_id) => sqlx::query_as::<_, PartitionPod>( r#" UPDATE partition -SET sort_key = $1, sort_key_ids = $4 +SET sort_key = $1 WHERE hash_id = $2 AND sort_key = $3 -RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at; +RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at; "#, ) .bind(Json(new_sort_key)) // $1 .bind(hash_id) // $2 - .bind(Json(&old_sort_key)) // $3 - .bind(Json(&raw_new_sort_key_ids)), // $4 + .bind(Json(&old_sort_key)), // $3 TransitionPartitionId::Deprecated(id) => sqlx::query_as::<_, PartitionPod>( r#" UPDATE partition -SET sort_key = $1, sort_key_ids = $4 +SET sort_key = $1 WHERE id = $2 AND sort_key = $3 -RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at; +RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at; "#, ) .bind(Json(new_sort_key)) // $1 .bind(id) // $2 - .bind(Json(&old_sort_key)) // $3 - .bind(Json(&raw_new_sort_key_ids)), // $4 + .bind(Json(&old_sort_key)), // $3 }; let res = query.fetch_one(self.inner.get_mut()).await; @@ -1081,7 +1068,6 @@ RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file ?partition_id, ?old_sort_key, ?new_sort_key, - ?new_sort_key_ids, "partition sort key cas successful" ); @@ -1178,7 +1164,7 @@ RETURNING * async fn most_recent_n(&mut self, n: usize) -> Result> { Ok(sqlx::query_as::<_, PartitionPod>( r#" -SELECT id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at +SELECT id, hash_id, table_id, partition_key, sort_key, new_file_at FROM partition ORDER BY id DESC LIMIT $1; @@ -1789,12 +1775,12 @@ mod tests { sqlx::query( r#" INSERT INTO partition - (partition_key, shard_id, table_id, sort_key, sort_key_ids) + (partition_key, shard_id, table_id, sort_key) VALUES - ($1, $2, $3, '[]', '[]') + ($1, $2, $3, '[]') ON CONFLICT (table_id, partition_key) DO UPDATE SET partition_key = partition.partition_key -RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at; +RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at; "#, ) .bind(&key) // $1 diff --git a/iox_tests/src/builders.rs b/iox_tests/src/builders.rs index 54251e0e54..ec73ce4fab 100644 --- a/iox_tests/src/builders.rs +++ b/iox_tests/src/builders.rs @@ -161,7 +161,6 @@ impl PartitionBuilder { TableId::new(0), PartitionKey::from("key"), vec![], - Default::default(), None, ), } diff --git a/iox_tests/src/catalog.rs b/iox_tests/src/catalog.rs index d02d5c00fe..9d36bf787b 100644 --- a/iox_tests/src/catalog.rs +++ b/iox_tests/src/catalog.rs @@ -318,7 +318,6 @@ impl TestTable { self: &Arc, key: &str, sort_key: &[&str], - sort_key_ids: &[i64], ) -> Arc { let mut repos = self.catalog.catalog.repositories().await; @@ -334,7 +333,6 @@ impl TestTable { &TransitionPartitionId::Deprecated(partition.id), None, sort_key, - &ColumnSet::from(sort_key_ids.iter().cloned()), ) .await .unwrap(); @@ -427,12 +425,6 @@ pub struct TestColumn { pub column: Column, } -impl TestColumn { - pub fn id(&self) -> i64 { - self.column.id.get() - } -} - /// A test catalog with specified namespace, table, partition #[allow(missing_docs)] #[derive(Debug)] @@ -445,11 +437,7 @@ pub struct TestPartition { impl TestPartition { /// Update sort key. 
- pub async fn update_sort_key( - self: &Arc, - sort_key: SortKey, - sort_key_ids: &ColumnSet, - ) -> Arc { + pub async fn update_sort_key(self: &Arc, sort_key: SortKey) -> Arc { let old_sort_key = partition_lookup( self.catalog.catalog.repositories().await.as_mut(), &self.partition.transition_partition_id(), @@ -469,7 +457,6 @@ impl TestPartition { &self.partition.transition_partition_id(), Some(old_sort_key), &sort_key.to_columns().collect::>(), - sort_key_ids, ) .await .unwrap(); @@ -788,11 +775,6 @@ async fn update_catalog_sort_key_if_needed( // Fetch the latest partition info from the catalog let partition = partition_lookup(repos, id).await.unwrap().unwrap(); - // fecth column ids from catalog - let columns = get_table_columns_by_id(partition.table_id, repos) - .await - .unwrap(); - // Similarly to what the ingester does, if there's an existing sort key in the catalog, add new // columns onto the end match partition.sort_key() { @@ -806,9 +788,6 @@ async fn update_catalog_sort_key_if_needed( catalog_sort_key.to_columns().collect::>(), &new_columns, ); - - let column_ids = columns.ids_for_names(&new_columns); - repos .partitions() .cas_sort_key( @@ -820,7 +799,6 @@ async fn update_catalog_sort_key_if_needed( .collect::>(), ), &new_columns, - &column_ids, ) .await .unwrap(); @@ -829,10 +807,9 @@ async fn update_catalog_sort_key_if_needed( None => { let new_columns = sort_key.to_columns().collect::>(); debug!("Updating sort key from None to {:?}", &new_columns); - let column_ids = columns.ids_for_names(&new_columns); repos .partitions() - .cas_sort_key(id, None, &new_columns, &column_ids) + .cas_sort_key(id, None, &new_columns) .await .unwrap(); } diff --git a/querier/src/cache/partition.rs b/querier/src/cache/partition.rs index 035a2ffa4d..c842270ca2 100644 --- a/querier/src/cache/partition.rs +++ b/querier/src/cache/partition.rs @@ -390,8 +390,8 @@ mod tests { }; use async_trait::async_trait; use data_types::{ - partition_template::TablePartitionTemplateOverride, ColumnSet, ColumnType, PartitionId, - PartitionKey, TableId, + partition_template::TablePartitionTemplateOverride, ColumnType, PartitionId, PartitionKey, + TableId, }; use futures::StreamExt; use generated_types::influxdata::iox::partition_template::v1::{ @@ -410,7 +410,7 @@ mod tests { let c1 = t.create_column("tag", ColumnType::Tag).await; let c2 = t.create_column("time", ColumnType::Time).await; let p1 = t - .create_partition_with_sort_key("k1", &["tag", "time"], &[c1.id(), c2.id()]) + .create_partition_with_sort_key("k1", &["tag", "time"]) .await .partition .clone(); @@ -865,10 +865,10 @@ mod tests { // set sort key let p = p - .update_sort_key( - SortKey::from_columns([c1.column.name.as_str(), c2.column.name.as_str()]), - &ColumnSet::from([c1.column.id.get(), c2.column.id.get()]), - ) + .update_sort_key(SortKey::from_columns([ + c1.column.name.as_str(), + c2.column.name.as_str(), + ])) .await; assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_hash_id", 1); @@ -1110,12 +1110,11 @@ mod tests { partition_template: TablePartitionTemplateOverride::default(), }); const N_PARTITIONS: usize = 20; - let c_id = c.column.id.get(); let mut partitions = futures::stream::iter(0..N_PARTITIONS) .then(|i| { let t = Arc::clone(&t); async move { - t.create_partition_with_sort_key(&format!("p{i}"), &["time"], &[c_id]) + t.create_partition_with_sort_key(&format!("p{i}"), &["time"]) .await .partition .transition_partition_id() diff --git a/querier/src/parquet/mod.rs b/querier/src/parquet/mod.rs index 
7cf83f6c91..dbac06a736 100644 --- a/querier/src/parquet/mod.rs +++ b/querier/src/parquet/mod.rs @@ -105,7 +105,7 @@ pub mod tests { use super::*; use arrow::{datatypes::DataType, record_batch::RecordBatch}; use arrow_util::assert_batches_eq; - use data_types::{ColumnSet, ColumnType, ParquetFile}; + use data_types::{ColumnType, ParquetFile}; use datafusion_util::config::register_iox_object_store; use iox_query::{ exec::{ExecutorType, IOxSessionContext}, @@ -186,20 +186,17 @@ pub mod tests { .join("\n"); let ns = catalog.create_namespace_1hr_retention("ns").await; let table = ns.create_table("table").await; - let tag1 = table.create_column("tag1", ColumnType::Tag).await; - let tag2 = table.create_column("tag2", ColumnType::Tag).await; + table.create_column("tag1", ColumnType::Tag).await; + table.create_column("tag2", ColumnType::Tag).await; table.create_column("tag3", ColumnType::Tag).await; - let tag4 = table.create_column("tag4", ColumnType::Tag).await; + table.create_column("tag4", ColumnType::Tag).await; table.create_column("field_int", ColumnType::I64).await; table.create_column("field_float", ColumnType::F64).await; - let col_time = table.create_column("time", ColumnType::Time).await; + table.create_column("time", ColumnType::Time).await; let partition = table .create_partition("part") .await - .update_sort_key( - SortKey::from_columns(["tag1", "tag2", "tag4", "time"]), - &ColumnSet::from([tag1.id(), tag2.id(), tag4.id(), col_time.id()]), - ) + .update_sort_key(SortKey::from_columns(["tag1", "tag2", "tag4", "time"])) .await; let builder = TestParquetFileBuilder::default().with_line_protocol(&lp); let parquet_file = Arc::new(partition.create_parquet_file(builder).await.parquet_file); diff --git a/service_grpc_catalog/src/lib.rs b/service_grpc_catalog/src/lib.rs index 368852b37d..c5787ac07e 100644 --- a/service_grpc_catalog/src/lib.rs +++ b/service_grpc_catalog/src/lib.rs @@ -237,7 +237,6 @@ fn to_partition(p: data_types::Partition) -> Partition { key: p.partition_key.to_string(), table_id: p.table_id.get(), array_sort_key: p.sort_key, - array_sort_key_ids: p.sort_key_ids.iter().map(|id| id.get()).collect(), } }
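
Editor's note: the shape of the reverted `cas_sort_key` API is easiest to see at a call site. The sketch below mirrors the compare-and-swap pattern in `ingester/src/persist/worker.rs` after this revert: a caller passes only the partition identifier, the expected old sort key, and the new sort key column *names* (the separate `sort_key_ids` argument is gone), and treats a concurrent update that wrote the same value as success. This is a minimal sketch, not code from the repository: the helper name `try_set_sort_key`, its error plumbing, and the catch-all error branch are assumptions made for illustration.

```rust
use std::sync::Arc;

use data_types::{Partition, TransitionPartitionId};
use iox_catalog::interface::{CasFailure, Catalog};

/// Attempt to compare-and-swap a partition's sort key in the catalog.
/// Helper name and error type are illustrative, not part of the crate.
async fn try_set_sort_key(
    catalog: Arc<dyn Catalog>,
    partition_id: &TransitionPartitionId,
    old_sort_key: Option<Vec<String>>,
    new_sort_key: &[&str],
) -> Result<Option<Partition>, Box<dyn std::error::Error + Send + Sync>> {
    let mut repos = catalog.repositories().await;
    match repos
        .partitions()
        // After this revert, cas_sort_key takes only the column names;
        // no parallel column-ID set has to be fetched or kept in sync.
        .cas_sort_key(partition_id, old_sort_key, new_sort_key)
        .await
    {
        // The catalog row was updated atomically.
        Ok(partition) => Ok(Some(partition)),
        // Another writer got there first; `observed` is the sort key
        // currently stored in the catalog (column names as strings).
        Err(CasFailure::ValueMismatch(observed)) => {
            if observed.iter().map(String::as_str).eq(new_sort_key.iter().copied()) {
                // The concurrent writer stored exactly the value we wanted:
                // treat the CAS as a no-op success, as the persist worker does.
                Ok(None)
            } else {
                Err(format!("sort key changed concurrently to {observed:?}").into())
            }
        }
        // Any other catalog failure is surfaced to the caller (simplified here).
        Err(_) => Err("sort key CAS failed with a catalog query error".into()),
    }
}
```

One consequence visible throughout the patch: because callers no longer supply column IDs alongside the names, the ingester's `fetch_column_map` step (and its requirement that the sort key be loaded first) is removed from the persist worker, and the column map is fetched only where the `ParquetFileParams` are built.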