feat: ingester reads `sort_key_ids` instead of `sort_key` (#8588)

* feat: have ingester's SortKeyState include sort_key_ids

* fix: test failures

* chore: address review comments

* feat: first step to compare sort_key_ids

* feat: compare sort_key_ids in cas_sort_key

* fix: comment typos

* feat: ingester reads sort_key_ids instead of sort_key

* refactor: use direct assert instead of going true a function

* chore: fix typo

* test: add tests and comments

* chore: fix typos

* test: add more test to handle empty sort key

* chore: address review comments

* fix: type

* chore: address review comments

---------

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Nga Tran 2023-09-14 10:06:21 -04:00 committed by GitHub
parent 75ceafff54
commit ac426fe5e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 213 additions and 18 deletions

View File

@ -5,7 +5,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig};
use data_types::{NamespaceId, Partition, PartitionKey, TableId};
use data_types::{Column, NamespaceId, Partition, PartitionKey, TableId};
use iox_catalog::interface::Catalog;
use observability_deps::tracing::debug;
use parking_lot::Mutex;
@ -14,7 +14,9 @@ use super::r#trait::PartitionProvider;
use crate::{
buffer_tree::{
namespace::NamespaceName,
partition::{PartitionData, SortKeyState},
partition::{
resolver::build_sort_key_from_sort_key_ids_and_columns, PartitionData, SortKeyState,
},
table::metadata::TableMetadata,
},
deferred_load::DeferredLoad,
@ -51,6 +53,18 @@ impl CatalogPartitionResolver {
.create_or_get(partition_key, table_id)
.await
}
async fn get_columns(
&self,
table_id: TableId,
) -> Result<Vec<Column>, iox_catalog::interface::Error> {
self.catalog
.repositories()
.await
.columns()
.list_by_table_id(table_id)
.await
}
}
#[async_trait]
@ -79,10 +93,20 @@ impl PartitionProvider for CatalogPartitionResolver {
let p_sort_key = p.sort_key();
let p_sort_key_ids = p.sort_key_ids_none_if_empty();
assert_eq!(
p_sort_key.as_ref().map(|v| v.len()),
p_sort_key_ids.as_ref().map(|v| v.len())
);
// fetch columns of the table to build sort_key from sort_key_ids
let columns = Backoff::new(&self.backoff_config)
.retry_all_errors("resolve partition's table columns", || {
self.get_columns(table_id)
})
.await
.expect("retry forever");
// build sort_key from sort_key_ids and columns
let sort_key =
build_sort_key_from_sort_key_ids_and_columns(&p_sort_key_ids, columns.into_iter());
// This is here to catch bugs and will be removed once the sort_key is removed from the partition
assert_eq!(sort_key, p_sort_key);
Arc::new(Mutex::new(PartitionData::new(
p.transition_partition_id(),
@ -94,7 +118,7 @@ impl PartitionProvider for CatalogPartitionResolver {
namespace_name,
table_id,
table,
SortKeyState::Provided(p_sort_key, p_sort_key_ids),
SortKeyState::Provided(sort_key, p_sort_key_ids),
)))
}
}

View File

@ -3,7 +3,7 @@
use std::sync::Arc;
use backoff::{Backoff, BackoffConfig};
use data_types::{PartitionKey, SortedColumnSet, TableId};
use data_types::{Column, PartitionKey, SortedColumnSet, TableId};
use iox_catalog::interface::Catalog;
use schema::sort::SortKey;
@ -37,34 +37,64 @@ impl SortKeyResolver {
Backoff::new(&self.backoff_config)
.retry_all_errors("fetch partition sort key", || async {
let mut repos = self.catalog.repositories().await;
// fetch partition
let partition = repos
.partitions()
.create_or_get(self.partition_key.clone(), self.table_id)
.await?;
let (sort_key, sort_key_ids) =
let (p_sort_key, p_sort_key_ids) =
(partition.sort_key(), partition.sort_key_ids_none_if_empty());
assert_eq!(
sort_key.as_ref().map(|v| v.len()),
sort_key_ids.as_ref().map(|v| v.len())
// fetch partition's table columns
let columns = repos.columns().list_by_table_id(self.table_id).await?;
// build sort_key from sort_key_ids and columns
let sort_key = build_sort_key_from_sort_key_ids_and_columns(
&p_sort_key_ids,
columns.into_iter(),
);
Result::<_, iox_catalog::interface::Error>::Ok((sort_key, sort_key_ids))
// This is here to catch bugs and will be removed once the sort_key is removed from the partition
assert_eq!(sort_key, p_sort_key);
Result::<_, iox_catalog::interface::Error>::Ok((sort_key, p_sort_key_ids))
})
.await
.expect("retry forever")
}
}
// build sort_key from sort_key_ids and columns
// panic if the sort_key_ids are not found in the columns
pub(crate) fn build_sort_key_from_sort_key_ids_and_columns<I>(
sort_key_ids: &Option<SortedColumnSet>,
columns: I,
) -> Option<SortKey>
where
I: Iterator<Item = Column>,
{
let mut column_names = columns
.map(|c| (c.id, c.name))
.collect::<hashbrown::HashMap<_, _>>();
sort_key_ids.as_ref().map(|ids| {
SortKey::from_columns(ids.iter().map(|id| {
column_names
.remove(id)
.unwrap_or_else(|| panic!("cannot find column names for sort key id {}", id.get()))
}))
})
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use data_types::SortedColumnSet;
use data_types::{ColumnId, ColumnType, SortedColumnSet};
use super::*;
use crate::test_util::populate_catalog;
use crate::test_util::populate_catalog_with_table_columns;
const TABLE_NAME: &str = "bananas";
const NAMESPACE_NAME: &str = "platanos";
@ -77,8 +107,14 @@ mod tests {
let catalog: Arc<dyn Catalog> =
Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics)));
// Populate the catalog with the namespace / table
let (_ns_id, table_id) = populate_catalog(&*catalog, NAMESPACE_NAME, TABLE_NAME).await;
// Populate the catalog with the namespace, table & columns
let (_ns_id, table_id, col_ids) = populate_catalog_with_table_columns(
&*catalog,
NAMESPACE_NAME,
TABLE_NAME,
&["bananas", "uno", "dos", "tres"],
)
.await;
let partition = catalog
.repositories()
@ -108,7 +144,7 @@ mod tests {
None,
None,
&["uno", "dos", "bananas"],
&SortedColumnSet::from([1, 2, 3]),
&SortedColumnSet::from(vec![col_ids[1].get(), col_ids[2].get(), col_ids[0].get()]),
)
.await
.expect("should update existing partition key");
@ -122,4 +158,110 @@ mod tests {
catalog_state.sort_key_ids_none_if_empty()
);
}
// panic if the sort_key_ids are not found in the columns
#[tokio::test]
#[should_panic(expected = "cannot find column names for sort key id 3")]
async fn test_panic_build_sort_key_from_sort_key_ids_and_columns() {
// table columns
let columns = vec![
Column {
name: "uno".into(),
id: ColumnId::new(1),
column_type: ColumnType::Tag,
table_id: TableId::new(1),
},
Column {
name: "dos".into(),
id: ColumnId::new(2),
column_type: ColumnType::Tag,
table_id: TableId::new(1),
},
];
// sort_key_ids include some columns that are not in the columns
let sort_key_ids = Some(SortedColumnSet::from([2, 3]));
let _sort_key =
build_sort_key_from_sort_key_ids_and_columns(&sort_key_ids, columns.into_iter());
}
#[tokio::test]
async fn test_build_sort_key_from_sort_key_ids_and_columns() {
// table columns
let columns = vec![
Column {
name: "uno".into(),
id: ColumnId::new(1),
column_type: ColumnType::Tag,
table_id: TableId::new(1),
},
Column {
name: "dos".into(),
id: ColumnId::new(2),
column_type: ColumnType::Tag,
table_id: TableId::new(1),
},
Column {
name: "tres".into(),
id: ColumnId::new(3),
column_type: ColumnType::Tag,
table_id: TableId::new(1),
},
];
// sort_key_ids is None
let sort_key_ids = None;
let sort_key = build_sort_key_from_sort_key_ids_and_columns(
&sort_key_ids,
columns.clone().into_iter(),
);
assert_eq!(sort_key, None);
// sort_key_ids is empty
let sort_key_ids = Some(SortedColumnSet::new(vec![]));
let sort_key = build_sort_key_from_sort_key_ids_and_columns(
&sort_key_ids,
columns.clone().into_iter(),
);
let vec: Vec<&str> = vec![];
assert_eq!(sort_key, Some(SortKey::from_columns(vec)));
// sort_key_ids include all columns and in the same order
let sort_key_ids = Some(SortedColumnSet::from([1, 2, 3]));
let sort_key = build_sort_key_from_sort_key_ids_and_columns(
&sort_key_ids,
columns.clone().into_iter(),
);
assert_eq!(
sort_key,
Some(SortKey::from_columns(vec!["uno", "dos", "tres"]))
);
// sort_key_ids include all columns but in different order
let sort_key_ids = Some(SortedColumnSet::from([2, 3, 1]));
let sort_key = build_sort_key_from_sort_key_ids_and_columns(
&sort_key_ids,
columns.clone().into_iter(),
);
assert_eq!(
sort_key,
Some(SortKey::from_columns(vec!["dos", "tres", "uno"]))
);
// sort_key_ids include some columns
let sort_key_ids = Some(SortedColumnSet::from([2, 3]));
let sort_key = build_sort_key_from_sort_key_ids_and_columns(
&sort_key_ids,
columns.clone().into_iter(),
);
assert_eq!(sort_key, Some(SortKey::from_columns(vec!["dos", "tres"])));
// sort_key_ids include some columns in different order
let sort_key_ids = Some(SortedColumnSet::from([3, 1]));
let sort_key = build_sort_key_from_sort_key_ids_and_columns(
&sort_key_ids,
columns.clone().into_iter(),
);
assert_eq!(sort_key, Some(SortKey::from_columns(vec!["tres", "uno"])));
}
}

View File

@ -413,6 +413,35 @@ pub(crate) async fn populate_catalog(
(ns_id, table_id)
}
pub(crate) async fn populate_catalog_with_table_columns(
catalog: &dyn Catalog,
namespace: &str,
table: &str,
columns: &[&str],
) -> (NamespaceId, TableId, Vec<ColumnId>) {
let mut c = catalog.repositories().await;
let ns_id = arbitrary_namespace(&mut *c, namespace).await.id;
let table_id = c
.tables()
.create(table, Default::default(), ns_id)
.await
.unwrap()
.id;
let mut column_ids = Vec::with_capacity(columns.len());
for column in columns {
column_ids.push(
c.columns()
.create_or_get(column, table_id, data_types::ColumnType::Tag)
.await
.expect("create column failed")
.id,
);
}
(ns_id, table_id, column_ids)
}
/// Assert `a` and `b` have identical metadata, and that when converting
/// them to Arrow batches they produces identical output.
#[track_caller]