From ac426fe5e1f7dab9d21b3f67fd0eae49b5356cdf Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Thu, 14 Sep 2023 10:06:21 -0400 Subject: [PATCH] feat: ingester reads `sort_key_ids` instead of `sort_key` (#8588) * feat: have ingester's SortKeyState include sort_key_ids * fix: test failures * chore: address review comments * feat: first step to compare sort_key_ids * feat: compare sort_key_ids in cas_sort_key * fix: comment typos * feat: ingester reads sort_key_ids instead of sort_key * refactor: use direct assert instead of going true a function * chore: fix typo * test: add tests and comments * chore: fix typos * test: add more test to handle empty sort key * chore: address review comments * fix: type * chore: address review comments --------- Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- .../buffer_tree/partition/resolver/catalog.rs | 38 +++- .../partition/resolver/sort_key.rs | 164 ++++++++++++++++-- ingester/src/test_util.rs | 29 ++++ 3 files changed, 213 insertions(+), 18 deletions(-) diff --git a/ingester/src/buffer_tree/partition/resolver/catalog.rs b/ingester/src/buffer_tree/partition/resolver/catalog.rs index 224506cbe9..a540766c2b 100644 --- a/ingester/src/buffer_tree/partition/resolver/catalog.rs +++ b/ingester/src/buffer_tree/partition/resolver/catalog.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use async_trait::async_trait; use backoff::{Backoff, BackoffConfig}; -use data_types::{NamespaceId, Partition, PartitionKey, TableId}; +use data_types::{Column, NamespaceId, Partition, PartitionKey, TableId}; use iox_catalog::interface::Catalog; use observability_deps::tracing::debug; use parking_lot::Mutex; @@ -14,7 +14,9 @@ use super::r#trait::PartitionProvider; use crate::{ buffer_tree::{ namespace::NamespaceName, - partition::{PartitionData, SortKeyState}, + partition::{ + resolver::build_sort_key_from_sort_key_ids_and_columns, PartitionData, SortKeyState, + }, table::metadata::TableMetadata, }, deferred_load::DeferredLoad, @@ -51,6 +53,18 @@ impl CatalogPartitionResolver { .create_or_get(partition_key, table_id) .await } + + async fn get_columns( + &self, + table_id: TableId, + ) -> Result, iox_catalog::interface::Error> { + self.catalog + .repositories() + .await + .columns() + .list_by_table_id(table_id) + .await + } } #[async_trait] @@ -79,10 +93,20 @@ impl PartitionProvider for CatalogPartitionResolver { let p_sort_key = p.sort_key(); let p_sort_key_ids = p.sort_key_ids_none_if_empty(); - assert_eq!( - p_sort_key.as_ref().map(|v| v.len()), - p_sort_key_ids.as_ref().map(|v| v.len()) - ); + // fetch columns of the table to build sort_key from sort_key_ids + let columns = Backoff::new(&self.backoff_config) + .retry_all_errors("resolve partition's table columns", || { + self.get_columns(table_id) + }) + .await + .expect("retry forever"); + + // build sort_key from sort_key_ids and columns + let sort_key = + build_sort_key_from_sort_key_ids_and_columns(&p_sort_key_ids, columns.into_iter()); + + // This is here to catch bugs and will be removed once the sort_key is removed from the partition + assert_eq!(sort_key, p_sort_key); Arc::new(Mutex::new(PartitionData::new( p.transition_partition_id(), @@ -94,7 +118,7 @@ impl PartitionProvider for CatalogPartitionResolver { namespace_name, table_id, table, - SortKeyState::Provided(p_sort_key, p_sort_key_ids), + SortKeyState::Provided(sort_key, p_sort_key_ids), ))) } } diff --git a/ingester/src/buffer_tree/partition/resolver/sort_key.rs b/ingester/src/buffer_tree/partition/resolver/sort_key.rs index 097b1befd5..46555f0c82 100644 --- a/ingester/src/buffer_tree/partition/resolver/sort_key.rs +++ b/ingester/src/buffer_tree/partition/resolver/sort_key.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use backoff::{Backoff, BackoffConfig}; -use data_types::{PartitionKey, SortedColumnSet, TableId}; +use data_types::{Column, PartitionKey, SortedColumnSet, TableId}; use iox_catalog::interface::Catalog; use schema::sort::SortKey; @@ -37,34 +37,64 @@ impl SortKeyResolver { Backoff::new(&self.backoff_config) .retry_all_errors("fetch partition sort key", || async { let mut repos = self.catalog.repositories().await; + + // fetch partition let partition = repos .partitions() .create_or_get(self.partition_key.clone(), self.table_id) .await?; - let (sort_key, sort_key_ids) = + let (p_sort_key, p_sort_key_ids) = (partition.sort_key(), partition.sort_key_ids_none_if_empty()); - assert_eq!( - sort_key.as_ref().map(|v| v.len()), - sort_key_ids.as_ref().map(|v| v.len()) + // fetch partition's table columns + let columns = repos.columns().list_by_table_id(self.table_id).await?; + + // build sort_key from sort_key_ids and columns + let sort_key = build_sort_key_from_sort_key_ids_and_columns( + &p_sort_key_ids, + columns.into_iter(), ); - Result::<_, iox_catalog::interface::Error>::Ok((sort_key, sort_key_ids)) + // This is here to catch bugs and will be removed once the sort_key is removed from the partition + assert_eq!(sort_key, p_sort_key); + + Result::<_, iox_catalog::interface::Error>::Ok((sort_key, p_sort_key_ids)) }) .await .expect("retry forever") } } +// build sort_key from sort_key_ids and columns +// panic if the sort_key_ids are not found in the columns +pub(crate) fn build_sort_key_from_sort_key_ids_and_columns( + sort_key_ids: &Option, + columns: I, +) -> Option +where + I: Iterator, +{ + let mut column_names = columns + .map(|c| (c.id, c.name)) + .collect::>(); + sort_key_ids.as_ref().map(|ids| { + SortKey::from_columns(ids.iter().map(|id| { + column_names + .remove(id) + .unwrap_or_else(|| panic!("cannot find column names for sort key id {}", id.get())) + })) + }) +} + #[cfg(test)] mod tests { use std::sync::Arc; - use data_types::SortedColumnSet; + use data_types::{ColumnId, ColumnType, SortedColumnSet}; use super::*; - use crate::test_util::populate_catalog; + use crate::test_util::populate_catalog_with_table_columns; const TABLE_NAME: &str = "bananas"; const NAMESPACE_NAME: &str = "platanos"; @@ -77,8 +107,14 @@ mod tests { let catalog: Arc = Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics))); - // Populate the catalog with the namespace / table - let (_ns_id, table_id) = populate_catalog(&*catalog, NAMESPACE_NAME, TABLE_NAME).await; + // Populate the catalog with the namespace, table & columns + let (_ns_id, table_id, col_ids) = populate_catalog_with_table_columns( + &*catalog, + NAMESPACE_NAME, + TABLE_NAME, + &["bananas", "uno", "dos", "tres"], + ) + .await; let partition = catalog .repositories() @@ -108,7 +144,7 @@ mod tests { None, None, &["uno", "dos", "bananas"], - &SortedColumnSet::from([1, 2, 3]), + &SortedColumnSet::from(vec![col_ids[1].get(), col_ids[2].get(), col_ids[0].get()]), ) .await .expect("should update existing partition key"); @@ -122,4 +158,110 @@ mod tests { catalog_state.sort_key_ids_none_if_empty() ); } + + // panic if the sort_key_ids are not found in the columns + #[tokio::test] + #[should_panic(expected = "cannot find column names for sort key id 3")] + async fn test_panic_build_sort_key_from_sort_key_ids_and_columns() { + // table columns + let columns = vec![ + Column { + name: "uno".into(), + id: ColumnId::new(1), + column_type: ColumnType::Tag, + table_id: TableId::new(1), + }, + Column { + name: "dos".into(), + id: ColumnId::new(2), + column_type: ColumnType::Tag, + table_id: TableId::new(1), + }, + ]; + + // sort_key_ids include some columns that are not in the columns + let sort_key_ids = Some(SortedColumnSet::from([2, 3])); + let _sort_key = + build_sort_key_from_sort_key_ids_and_columns(&sort_key_ids, columns.into_iter()); + } + + #[tokio::test] + async fn test_build_sort_key_from_sort_key_ids_and_columns() { + // table columns + let columns = vec![ + Column { + name: "uno".into(), + id: ColumnId::new(1), + column_type: ColumnType::Tag, + table_id: TableId::new(1), + }, + Column { + name: "dos".into(), + id: ColumnId::new(2), + column_type: ColumnType::Tag, + table_id: TableId::new(1), + }, + Column { + name: "tres".into(), + id: ColumnId::new(3), + column_type: ColumnType::Tag, + table_id: TableId::new(1), + }, + ]; + + // sort_key_ids is None + let sort_key_ids = None; + let sort_key = build_sort_key_from_sort_key_ids_and_columns( + &sort_key_ids, + columns.clone().into_iter(), + ); + assert_eq!(sort_key, None); + + // sort_key_ids is empty + let sort_key_ids = Some(SortedColumnSet::new(vec![])); + let sort_key = build_sort_key_from_sort_key_ids_and_columns( + &sort_key_ids, + columns.clone().into_iter(), + ); + let vec: Vec<&str> = vec![]; + assert_eq!(sort_key, Some(SortKey::from_columns(vec))); + + // sort_key_ids include all columns and in the same order + let sort_key_ids = Some(SortedColumnSet::from([1, 2, 3])); + let sort_key = build_sort_key_from_sort_key_ids_and_columns( + &sort_key_ids, + columns.clone().into_iter(), + ); + assert_eq!( + sort_key, + Some(SortKey::from_columns(vec!["uno", "dos", "tres"])) + ); + + // sort_key_ids include all columns but in different order + let sort_key_ids = Some(SortedColumnSet::from([2, 3, 1])); + let sort_key = build_sort_key_from_sort_key_ids_and_columns( + &sort_key_ids, + columns.clone().into_iter(), + ); + assert_eq!( + sort_key, + Some(SortKey::from_columns(vec!["dos", "tres", "uno"])) + ); + + // sort_key_ids include some columns + let sort_key_ids = Some(SortedColumnSet::from([2, 3])); + let sort_key = build_sort_key_from_sort_key_ids_and_columns( + &sort_key_ids, + columns.clone().into_iter(), + ); + assert_eq!(sort_key, Some(SortKey::from_columns(vec!["dos", "tres"]))); + + // sort_key_ids include some columns in different order + let sort_key_ids = Some(SortedColumnSet::from([3, 1])); + let sort_key = build_sort_key_from_sort_key_ids_and_columns( + &sort_key_ids, + columns.clone().into_iter(), + ); + assert_eq!(sort_key, Some(SortKey::from_columns(vec!["tres", "uno"]))); + } } diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index f5c4ae41d3..e383882d59 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -413,6 +413,35 @@ pub(crate) async fn populate_catalog( (ns_id, table_id) } +pub(crate) async fn populate_catalog_with_table_columns( + catalog: &dyn Catalog, + namespace: &str, + table: &str, + columns: &[&str], +) -> (NamespaceId, TableId, Vec) { + let mut c = catalog.repositories().await; + let ns_id = arbitrary_namespace(&mut *c, namespace).await.id; + let table_id = c + .tables() + .create(table, Default::default(), ns_id) + .await + .unwrap() + .id; + + let mut column_ids = Vec::with_capacity(columns.len()); + for column in columns { + column_ids.push( + c.columns() + .create_or_get(column, table_id, data_types::ColumnType::Tag) + .await + .expect("create column failed") + .id, + ); + } + + (ns_id, table_id, column_ids) +} + /// Assert `a` and `b` have identical metadata, and that when converting /// them to Arrow batches they produces identical output. #[track_caller]