feat: fill sort_key_ids when partition is inserted and updated (#8517)

* feat: read null sort_key_ids

* chore: clearer explanation about test strategy

* chore: Apply suggestions from code review

Co-authored-by: Marco Neumann <marco@crepererum.net>

* test: tests that add partition with NULL sort_key_ids

* feat: set sort_key_ids to empty array {} during partition insertion

* feat: initial step to update sort_key_ids

* chore: address review comments

* chore: remove unnecessary comments and tests

* fix: typos

* chore: remove unnecessary tests

* feat: continue the work of updating sort_key_ids

* fix: check duplicates for SortedColumnSet

* test: tests for sort key ids

* test: fix a test

* chore: remove unused comments

* chore: address first half of review comments and remove tests of tests

* chore: address review comments for fetching columns in ingester

---------

Co-authored-by: Marco Neumann <marco@crepererum.net>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Nga Tran 2023-08-21 10:26:57 -04:00 committed by GitHub
parent fbb2460c84
commit 3e98f7ea5c
20 changed files with 503 additions and 148 deletions

View File

@ -42,7 +42,7 @@ use compactor::{
PartitionInfo,
};
use compactor_scheduler::SchedulerConfig;
use data_types::{ColumnType, CompactionLevel, ParquetFile, TableId};
use data_types::{ColumnType, CompactionLevel, ParquetFile, SortedColumnSet, TableId};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion_util::config::register_iox_object_store;
use futures::TryStreamExt;
@ -98,17 +98,21 @@ impl TestSetupBuilder<false> {
let ns = catalog.create_namespace_1hr_retention("ns").await;
let table = ns.create_table("table").await;
table.create_column("field_int", ColumnType::I64).await;
table.create_column("tag1", ColumnType::Tag).await;
table.create_column("tag2", ColumnType::Tag).await;
table.create_column("tag3", ColumnType::Tag).await;
table.create_column("time", ColumnType::Time).await;
let tag1 = table.create_column("tag1", ColumnType::Tag).await;
let tag2 = table.create_column("tag2", ColumnType::Tag).await;
let tag3 = table.create_column("tag3", ColumnType::Tag).await;
let col_time = table.create_column("time", ColumnType::Time).await;
let partition = table.create_partition("2022-07-13").await;
// The sort key comes from the catalog and should be the union of all tags the
// ingester has seen
let sort_key = SortKey::from_columns(["tag1", "tag2", "tag3", "time"]);
let partition = partition.update_sort_key(sort_key.clone()).await;
let sort_key_col_ids =
SortedColumnSet::from([tag1.id(), tag2.id(), tag3.id(), col_time.id()]);
let partition = partition
.update_sort_key(sort_key.clone(), &sort_key_col_ids)
.await;
// Ensure the input scenario conforms to the expected invariants.
let invariant_check = Arc::new(CatalogInvariants {

View File

@ -90,6 +90,17 @@ impl ColumnsByName {
self.0.values().map(|c| c.id)
}
/// Return the column ids of the given column names.
/// Panics if any of the names are not found.
pub fn ids_for_names(&self, names: &[&str]) -> SortedColumnSet {
SortedColumnSet::from(names.iter().map(|name| {
self.get(name)
.unwrap_or_else(|| panic!("column name not found: {}", name))
.id
.get()
}))
}
/// Get a column by its name.
pub fn get(&self, name: &str) -> Option<&ColumnSchema> {
self.0.get(name)
@ -364,6 +375,11 @@ impl ColumnSet {
pub fn size(&self) -> usize {
std::mem::size_of_val(self) + (std::mem::size_of::<ColumnId>() * self.0.capacity())
}
/// Returns true if the set is empty
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
}
impl From<ColumnSet> for Vec<ColumnId> {
@ -531,4 +547,61 @@ mod tests {
ColumnSchema::try_from(&proto).expect_err("should fail");
}
#[test]
fn test_columns_by_names_exist() {
let columns = build_columns_by_names();
let ids = columns.ids_for_names(&["foo", "bar"]);
assert_eq!(ids, SortedColumnSet::from([1, 2]));
}
#[test]
fn test_columns_by_names_exist_different_order() {
let columns = build_columns_by_names();
let ids = columns.ids_for_names(&["bar", "foo"]);
assert_eq!(ids, SortedColumnSet::from([2, 1]));
}
#[test]
#[should_panic = "column name not found: baz"]
fn test_columns_by_names_not_exist() {
let columns = build_columns_by_names();
columns.ids_for_names(&["foo", "baz"]);
}
fn build_columns_by_names() -> ColumnsByName {
let mut columns: BTreeMap<String, ColumnSchema> = BTreeMap::new();
columns.insert(
"foo".to_string(),
ColumnSchema {
id: ColumnId::new(1),
column_type: ColumnType::I64,
},
);
columns.insert(
"bar".to_string(),
ColumnSchema {
id: ColumnId::new(2),
column_type: ColumnType::I64,
},
);
columns.insert(
"time".to_string(),
ColumnSchema {
id: ColumnId::new(3),
column_type: ColumnType::Time,
},
);
columns.insert(
"tag1".to_string(),
ColumnSchema {
id: ColumnId::new(4),
column_type: ColumnType::Tag,
},
);
ColumnsByName(columns)
}
}

View File

@ -467,6 +467,11 @@ impl Partition {
Some(SortKey::from_columns(self.sort_key.iter().map(|s| &**s)))
}
/// The sort_key_ids if present
pub fn sort_key_ids(&self) -> Option<&SortedColumnSet> {
self.sort_key_ids.as_ref()
}
}
#[cfg(test)]

View File

@ -50,6 +50,14 @@ message Partition {
repeated string array_sort_key = 6;
PartitionIdentifier identifier = 8;
// the sort key ids (sort_key_ids) for data in parquet files of this partition, which
// is an array of column ids of the sort key columns
optional SortKeyIds sort_key_ids = 9;
}
message SortKeyIds {
repeated int64 array_sort_key_ids = 1;
}
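For orientation (a sketch, not part of the generated code): on the Rust side, the prost-generated `SortKeyIds` is populated from the catalog's optional `SortedColumnSet`, with `None` collapsing to an empty array, mirroring the `to_partition` conversion further down in this commit:

// Hedged sketch; `SortedColumnSet` is the catalog-side type used by this commit.
fn to_proto_sort_key_ids(sort_key_ids: Option<&SortedColumnSet>) -> SortKeyIds {
    SortKeyIds {
        // Each ColumnId unwraps to a raw i64 for the proto's repeated int64 field.
        array_sort_key_ids: sort_key_ids
            .map(|cols| cols.iter().map(|id| id.get()).collect())
            .unwrap_or_default(),
    }
}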
message GetPartitionsByTableIdRequest {

View File

@ -6,8 +6,9 @@ use data_types::{
partition_template::{
NamespacePartitionTemplateOverride, TablePartitionTemplateOverride, PARTITION_BY_DAY_PROTO,
},
ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceName, NamespaceNameError,
ParquetFileParams, Partition, Statistics, Table, TableId, Timestamp,
ColumnSet, ColumnType, ColumnsByName, CompactionLevel, Namespace, NamespaceName,
NamespaceNameError, ParquetFileParams, Partition, PartitionKey, SortedColumnSet, Statistics,
Table, TableId, Timestamp,
};
use generated_types::influxdata::iox::catalog::v1 as proto;
// ParquetFile as ProtoParquetFile, Partition as ProtoPartition,
@ -369,8 +370,10 @@ impl RemoteImporter {
let table_id = table.id;
debug!(%table_id, "Inserting catalog records into table");
let partition = self
.partition_for_parquet_file(repos.as_mut(), &table, &iox_metadata)
// Create a new partition
let partition_key = iox_metadata.partition_key.clone();
let mut partition = self
.create_partition(repos.as_mut(), &table, partition_key)
.await?;
// Note that for some reason, the object_store_id that is
@ -416,6 +419,11 @@ impl RemoteImporter {
}
};
// Update partition sort key
let partition = self
.update_partition(&mut partition, repos.as_mut(), &table, &iox_metadata)
.await?;
// Now copy the parquet files into the object store
let transition_partition_id = partition.transition_partition_id();
@ -478,25 +486,42 @@ impl RemoteImporter {
Ok(table)
}
/// Return the catalog [`Partition`] into which the specified parquet
/// Create the catalog [`Partition`] into which the specified parquet
/// file should be inserted.
///
/// The sort_key and sort_key_ids of the partition should be empty when it is first created
/// because there are no columns in any parquet files to use for sorting yet.
/// The sort_key and sort_key_ids will be updated after the parquet files are created.
async fn create_partition(
&self,
repos: &mut dyn RepoCollection,
table: &Table,
partition_key: PartitionKey,
) -> Result<Partition> {
let partition = repos
.partitions()
.create_or_get(partition_key, table.id)
.await?;
Ok(partition)
}
/// Update sort keys of the partition
///
/// file should be inserted.
///
/// First attempts to use any available metadata from the
/// catalog export, and falls back to what is in the iox
/// metadata stored in the parquet file, if needed
async fn partition_for_parquet_file(
async fn update_partition(
&self,
partition: &mut Partition,
repos: &mut dyn RepoCollection,
table: &Table,
iox_metadata: &IoxMetadata,
) -> Result<Partition> {
let partition_key = iox_metadata.partition_key.clone();
let partition = repos
.partitions()
.create_or_get(partition_key.clone(), table.id)
.await?;
// Note we use the table_id embedded in the file's metadata
// from the source catalog to match the exported catalog (which
// is different than the new table we just created in the
@ -505,14 +530,24 @@ impl RemoteImporter {
.exported_contents
.partition_metadata(iox_metadata.table_id.get(), partition_key.inner());
let new_sort_key: Vec<&str> = if let Some(proto_partition) = proto_partition.as_ref() {
let (new_sort_key, new_sort_key_ids) = if let Some(proto_partition) =
proto_partition.as_ref()
{
// Use the sort key from the source catalog
debug!(array_sort_key=?proto_partition.array_sort_key, "Using sort key from catalog export");
proto_partition
let new_sort_key = proto_partition
.array_sort_key
.iter()
.map(|s| s.as_str())
.collect()
.collect::<Vec<&str>>();
let new_sort_key_ids = match &proto_partition.sort_key_ids {
Some(sort_key_ids) => sort_key_ids.array_sort_key_ids.clone(),
None => vec![],
};
let new_sort_key_ids = SortedColumnSet::from(new_sort_key_ids);
(new_sort_key, new_sort_key_ids)
} else {
warn!("Could not find sort key in catalog metadata export, falling back to embedded metadata");
let sort_key = iox_metadata
@ -520,7 +555,13 @@ impl RemoteImporter {
.as_ref()
.ok_or_else(|| Error::NoSortKey)?;
sort_key.to_columns().collect()
let new_sort_key = sort_key.to_columns().collect::<Vec<_>>();
// fetch table columns
let columns = ColumnsByName::new(repos.columns().list_by_table_id(table.id).await?);
let new_sort_key_ids = columns.ids_for_names(&new_sort_key);
(new_sort_key, new_sort_key_ids)
};
if !partition.sort_key.is_empty() && partition.sort_key != new_sort_key {
@ -536,6 +577,7 @@ impl RemoteImporter {
&partition.transition_partition_id(),
Some(partition.sort_key.clone()),
&new_sort_key,
&new_sort_key_ids,
)
.await;

View File

@ -3,8 +3,8 @@
use std::sync::Arc;
use data_types::{
sequence_number_set::SequenceNumberSet, NamespaceId, PartitionKey, SequenceNumber, TableId,
TimestampMinMax, TransitionPartitionId,
sequence_number_set::SequenceNumberSet, NamespaceId, PartitionKey, SequenceNumber,
SortedColumnSet, TableId, TimestampMinMax, TransitionPartitionId,
};
use mutable_batch::MutableBatch;
use observability_deps::tracing::*;
@ -31,7 +31,7 @@ pub(crate) enum SortKeyState {
/// The [`SortKey`] has not yet been fetched from the catalog, and will be
/// lazy loaded (or loaded in the background) by a call to
/// [`DeferredLoad::get()`].
Deferred(Arc<DeferredLoad<Option<SortKey>>>),
Deferred(Arc<DeferredLoad<(Option<SortKey>, Option<SortedColumnSet>)>>),
/// The sort key is known and specified.
Provided(Option<SortKey>),
}
@ -39,7 +39,7 @@ pub(crate) enum SortKeyState {
impl SortKeyState {
pub(crate) async fn get(&self) -> Option<SortKey> {
match self {
Self::Deferred(v) => v.get().await,
Self::Deferred(v) => v.get().await.0,
Self::Provided(v) => v.clone(),
}
}
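A deferred load now resolves to the `(Option<SortKey>, Option<SortedColumnSet>)` pair produced by `SortKeyResolver::fetch()` (changed later in this commit), and `get()` exposes only the key half. A minimal sketch, reusing the tuple shape from the tests below (`metrics` is assumed to be a `metric::Registry`):

let state = SortKeyState::Deferred(Arc::new(DeferredLoad::new(
    Duration::from_secs(1),
    async {
        (
            Some(SortKey::from_columns(["time"])),
            Some(SortedColumnSet::from([3])),
        )
    },
    &metrics,
)));
// Callers that only need the key are unchanged; the column ids travel alongside.
assert_eq!(state.get().await, Some(SortKey::from_columns(["time"])));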
@ -423,6 +423,7 @@ mod tests {
use arrow_util::assert_batches_eq;
use assert_matches::assert_matches;
use backoff::BackoffConfig;
use data_types::SortedColumnSet;
use datafusion::{
physical_expr::PhysicalSortExpr,
physical_plan::{expressions::col, memory::MemoryExec, ExecutionPlan},
@ -1003,18 +1004,26 @@ mod tests {
.create_or_get(partition_key.clone(), table_id)
.await
.expect("should create");
// Test: sort_key_ids from create_or_get
assert!(partition.sort_key_ids.is_none());
// Test: sort_key_ids from create_or_get are empty
assert!(partition.sort_key_ids().unwrap().is_empty());
let updated_partition = catalog
.repositories()
.await
.partitions()
.cas_sort_key(&partition.transition_partition_id(), None, &["terrific"])
.cas_sort_key(
&partition.transition_partition_id(),
None,
&["terrific"],
&SortedColumnSet::from([1]),
)
.await
.unwrap();
// Test: sort_key_ids after updating
assert!(updated_partition.sort_key_ids.is_none());
assert_eq!(
updated_partition.sort_key_ids(),
Some(&SortedColumnSet::from([1]))
);
// Read the just-created sort key (None)
let fetcher = Arc::new(DeferredLoad::new(

View File

@ -216,6 +216,7 @@ mod tests {
// Harmless in tests - saves a bunch of extra vars.
#![allow(clippy::await_holding_lock)]
use data_types::SortedColumnSet;
use iox_catalog::mem::MemCatalog;
use super::*;
@ -288,7 +289,7 @@ mod tests {
ARBITRARY_TABLE_ID,
stored_partition_key.clone(),
vec!["dos".to_string(), "bananas".to_string()],
None,
Some(SortedColumnSet::from([1, 2])),
Default::default(),
);

View File

@ -3,7 +3,7 @@
use std::sync::Arc;
use backoff::{Backoff, BackoffConfig};
use data_types::{PartitionKey, TableId};
use data_types::{PartitionKey, SortedColumnSet, TableId};
use iox_catalog::interface::Catalog;
use schema::sort::SortKey;
@ -31,19 +31,20 @@ impl SortKeyResolver {
}
}
/// Fetch the [`SortKey`] from the [`Catalog`] for `partition_id`, retrying
/// endlessly when errors occur.
pub(crate) async fn fetch(self) -> Option<SortKey> {
/// Fetch the [`SortKey`] and its corresponding sort key ids from the [`Catalog`]
/// for `partition_id`, retrying endlessly when errors occur.
pub(crate) async fn fetch(self) -> (Option<SortKey>, Option<SortedColumnSet>) {
Backoff::new(&self.backoff_config)
.retry_all_errors("fetch partition sort key", || async {
let mut repos = self.catalog.repositories().await;
let s = repos
let partition = repos
.partitions()
.create_or_get(self.partition_key.clone(), self.table_id)
.await?
.sort_key();
.await?;
Result::<_, iox_catalog::interface::Error>::Ok(s)
let (sort_key, sort_key_ids) = (partition.sort_key(), partition.sort_key_ids);
Result::<_, iox_catalog::interface::Error>::Ok((sort_key, sort_key_ids))
})
.await
.expect("retry forever")
@ -54,6 +55,8 @@ impl SortKeyResolver {
mod tests {
use std::sync::Arc;
use data_types::SortedColumnSet;
use super::*;
use crate::test_util::populate_catalog;
@ -78,8 +81,9 @@ mod tests {
.create_or_get(PARTITION_KEY.into(), table_id)
.await
.expect("should create");
// Test: sort_key_ids from create_or_get
assert!(partition.sort_key_ids.is_none());
// Test: sort_key_ids from create_or_get are empty
assert!(partition.sort_key_ids().unwrap().is_empty());
let fetcher = SortKeyResolver::new(
PARTITION_KEY.into(),
@ -97,14 +101,15 @@ mod tests {
&partition.transition_partition_id(),
None,
&["uno", "dos", "bananas"],
&SortedColumnSet::from([1, 2, 3]),
)
.await
.expect("should update existing partition key");
let fetched = fetcher.fetch().await;
assert_eq!(fetched, catalog_state.sort_key());
// Test: sort_key_ids after updating
assert!(catalog_state.sort_key_ids.is_none());
// Test: sort_key_ids from cas_sort_key
// fetch sort key for the partition from the catalog
let (fetched_sort_key, fetched_sort_key_ids) = fetcher.fetch().await;
assert_eq!(fetched_sort_key, catalog_state.sort_key());
assert_eq!(fetched_sort_key_ids, catalog_state.sort_key_ids);
}
}

View File

@ -418,7 +418,13 @@ impl PersistQueue for PersistHandle {
// queries to at most `n_workers`.
let sort_key = match partition.lock().sort_key() {
SortKeyState::Deferred(v) => v.peek().flatten(),
SortKeyState::Deferred(v) => {
let sort_key = v.peek();
match sort_key {
None => None,
Some((sort_key, _sort_key_ids)) => sort_key,
}
}
SortKeyState::Provided(v) => v.as_ref().cloned(),
};
@ -475,6 +481,7 @@ mod tests {
use std::{sync::Arc, task::Poll, time::Duration};
use assert_matches::assert_matches;
use data_types::SortedColumnSet;
use futures::Future;
use iox_catalog::mem::MemCatalog;
use object_store::memory::InMemory;
@ -648,7 +655,7 @@ mod tests {
// Generate a partition with a resolved, but empty sort key.
let p = new_partition(SortKeyState::Deferred(Arc::new(DeferredLoad::new(
Duration::from_secs(1),
async { None },
async { (None, None) },
&metrics,
))))
.await;
@ -736,7 +743,12 @@ mod tests {
// the data within the partition's buffer.
let p = new_partition(SortKeyState::Deferred(Arc::new(DeferredLoad::new(
Duration::from_secs(1),
async { Some(SortKey::from_columns(["time", "some-other-column"])) },
async {
(
Some(SortKey::from_columns(["time", "some-other-column"])),
Some(SortedColumnSet::from([1, 2])),
)
},
&metrics,
))))
.await;
@ -823,7 +835,12 @@ mod tests {
// the data within the partition's buffer.
let p = new_partition(SortKeyState::Deferred(Arc::new(DeferredLoad::new(
Duration::from_secs(1),
async { Some(SortKey::from_columns(["time", "good"])) },
async {
(
Some(SortKey::from_columns(["time", "good"])),
Some(SortedColumnSet::from([1, 2])),
)
},
&metrics,
))))
.await;
@ -904,7 +921,12 @@ mod tests {
// Generate a partition
let p = new_partition(SortKeyState::Deferred(Arc::new(DeferredLoad::new(
Duration::from_secs(1),
async { Some(SortKey::from_columns(["time", "good"])) },
async {
(
Some(SortKey::from_columns(["time", "good"])),
Some(SortedColumnSet::from([1, 2])),
)
},
&metrics,
))))
.await;
@ -916,7 +938,12 @@ mod tests {
// Generate a second partition
let p = new_partition(SortKeyState::Deferred(Arc::new(DeferredLoad::new(
Duration::from_secs(1),
async { Some(SortKey::from_columns(["time", "good"])) },
async {
(
Some(SortKey::from_columns(["time", "good"])),
Some(SortedColumnSet::from([1, 2])),
)
},
&metrics,
))))
.await;

View File

@ -16,7 +16,7 @@ mod tests {
use std::{sync::Arc, time::Duration};
use assert_matches::assert_matches;
use data_types::{CompactionLevel, ParquetFile};
use data_types::{CompactionLevel, ParquetFile, SortedColumnSet};
use futures::TryStreamExt;
use iox_catalog::{
interface::{get_schema_by_id, Catalog, SoftDeletedRows},
@ -345,12 +345,18 @@ mod tests {
.cas_sort_key(
&partition_id,
None,
&["bananas", "are", "good", "for", "you"],
// must use column names that exist in the partition data
&["region"],
// column id of region
&SortedColumnSet::from([2]),
)
.await
.expect("failed to set catalog sort key");
// Test: sort_key_ids after updating
assert!(updated_partition.sort_key_ids.is_none());
assert_eq!(
updated_partition.sort_key_ids(),
Some(&SortedColumnSet::from([2]))
);
// Enqueue the persist job
let notify = handle.enqueue(Arc::clone(&partition), data).await;
@ -382,10 +388,11 @@ mod tests {
// mark_persisted() was called.
assert_eq!(partition.lock().completed_persistence_count(), 1);
// Assert the sort key was also updated, adding the new columns to the
// Assert the sort key was also updated, adding the new column (time) to the
// end of the concurrently updated catalog sort key.
assert_matches!(partition.lock().sort_key(), SortKeyState::Provided(Some(p)) => {
assert_eq!(p.to_columns().collect::<Vec<_>>(), &["bananas", "are", "good", "for", "you", "region", "time"]);
// Before, there was only ["region"] (from the manual sort key update above); now it is ["region", "time"]
assert_eq!(p.to_columns().collect::<Vec<_>>(), &["region", "time"]);
});
// Ensure a file was made visible in the catalog

View File

@ -2,7 +2,7 @@ use std::{ops::ControlFlow, sync::Arc};
use async_channel::RecvError;
use backoff::Backoff;
use data_types::{CompactionLevel, ParquetFileParams};
use data_types::{ColumnsByName, CompactionLevel, ParquetFileParams};
use iox_catalog::interface::{get_table_columns_by_id, CasFailure, Catalog};
use iox_query::exec::Executor;
use iox_time::{SystemProvider, TimeProvider};
@ -174,8 +174,15 @@ async fn compact_and_upload<O>(
where
O: Send + Sync,
{
let compacted = compact(ctx, worker_state).await;
let (sort_key_update, parquet_table_data) = upload(ctx, worker_state, compacted).await;
// load sort key
let sort_key = ctx.sort_key().get().await;
// fetch column map
// THIS MUST BE DONE AFTER THE SORT KEY IS LOADED
let (sort_key, columns) = fetch_column_map(ctx, worker_state, sort_key).await?;
let compacted = compact(ctx, worker_state, sort_key).await;
let (sort_key_update, parquet_table_data) =
upload(ctx, worker_state, compacted, &columns).await;
if let Some(update) = sort_key_update {
update_catalog_sort_key(
@ -183,6 +190,7 @@ where
worker_state,
update,
parquet_table_data.object_store_id,
&columns,
)
.await?
}
@ -192,12 +200,14 @@ where
/// Compact the data in `ctx`, sorted by the sort key returned from
/// [`Context::sort_key()`].
async fn compact<O>(ctx: &Context, worker_state: &SharedWorkerState<O>) -> CompactedStream
async fn compact<O>(
ctx: &Context,
worker_state: &SharedWorkerState<O>,
sort_key: Option<SortKey>,
) -> CompactedStream
where
O: Send + Sync,
{
let sort_key = ctx.sort_key().get().await;
debug!(
namespace_id = %ctx.namespace_id(),
namespace_name = %ctx.namespace_name(),
@ -231,6 +241,7 @@ async fn upload<O>(
ctx: &Context,
worker_state: &SharedWorkerState<O>,
compacted: CompactedStream,
columns: &ColumnsByName,
) -> (Option<SortKey>, ParquetFileParams)
where
O: Send + Sync,
@ -294,15 +305,6 @@ where
"partition parquet uploaded"
);
// Read the table's columns from the catalog to get a map of column name -> column IDs.
let columns = Backoff::new(&Default::default())
.retry_all_errors("get table schema", || async {
let mut repos = worker_state.catalog.repositories().await;
get_table_columns_by_id(ctx.table_id(), repos.as_mut()).await
})
.await
.expect("retry forever");
// Build the data that must be inserted into the parquet_files catalog
// table in order to make the file visible to queriers.
let parquet_table_data =
@ -321,6 +323,45 @@ where
(catalog_sort_key_update, parquet_table_data)
}
/// Fetch the table column map from the catalog and verify that it contains all columns in the sort key
async fn fetch_column_map<O>(
ctx: &Context,
worker_state: &SharedWorkerState<O>,
// NOTE: THE CALLER MUST LOAD THE SORT KEY BEFORE CALLING THIS FUNCTION, EVEN IF THE SORT KEY IS NONE.
// THIS IS A MUST TO GUARANTEE THE RETURNED COLUMN MAP CONTAINS ALL COLUMNS IN THE SORT KEY.
// Taking the sort key as a parameter ensures the caller has already loaded it;
// the same sort key is handed back to the caller.
sort_key: Option<SortKey>,
) -> Result<(Option<SortKey>, ColumnsByName), PersistError>
where
O: Send + Sync,
{
// Read the table's columns from the catalog to get a map of column name -> column IDs.
let column_map = Backoff::new(&Default::default())
.retry_all_errors("get table schema", || async {
let mut repos = worker_state.catalog.repositories().await;
get_table_columns_by_id(ctx.table_id(), repos.as_mut()).await
})
.await
.expect("retry forever");
// Verify that the sort key columns are in the column map
if let Some(sort_key) = &sort_key {
for sort_key_column in sort_key.to_columns() {
if !column_map.contains_column_name(sort_key_column) {
panic!(
"sort key column {} of partition id {} is not in the column map {:?}",
sort_key_column,
ctx.partition_id(),
column_map
);
}
}
}
Ok((sort_key, column_map))
}
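To restate the ordering contract that `compact_and_upload` follows above (a recap under the same names, assuming, as elsewhere in the catalog, that columns are only ever added to a table): loading the sort key first and the column map second guarantees every sort key column is present in the map, whereas the reverse order could miss a column added in between.

// 1: load the sort key first
let sort_key = ctx.sort_key().get().await;
// 2: then fetch the column map, which must now cover all sort key columns
let (sort_key, columns) = fetch_column_map(ctx, worker_state, sort_key).await?;
// 3: compact using the already-loaded key
let compacted = compact(ctx, worker_state, sort_key).await;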
/// Update the sort key value stored in the catalog for this [`Context`].
///
/// # Concurrent Updates
@ -333,6 +374,7 @@ async fn update_catalog_sort_key<O>(
worker_state: &SharedWorkerState<O>,
new_sort_key: SortKey,
object_store_id: Uuid,
columns: &ColumnsByName,
) -> Result<(), PersistError>
where
O: Send + Sync,
@ -360,13 +402,19 @@ where
.retry_with_backoff("cas_sort_key", || {
let old_sort_key = old_sort_key.clone();
let new_sort_key_str = new_sort_key.to_columns().collect::<Vec<_>>();
let new_sort_key_colids = columns.ids_for_names(&new_sort_key_str);
let catalog = Arc::clone(&worker_state.catalog);
let ctx = &ctx;
async move {
let mut repos = catalog.repositories().await;
match repos
.partitions()
.cas_sort_key(ctx.partition_id(), old_sort_key.clone(), &new_sort_key_str)
.cas_sort_key(
ctx.partition_id(),
old_sort_key.clone(),
&new_sort_key_str,
&new_sort_key_colids,
)
.await
{
Ok(_) => ControlFlow::Break(Ok(())),
@ -389,7 +437,8 @@ where
partition_key = %ctx.partition_key(),
expected=?old_sort_key,
?observed,
update=?new_sort_key_str,
update_sort_key=?new_sort_key_str,
update_sort_key_ids=?new_sort_key_colids,
"detected matching concurrent sort key update"
);
ControlFlow::Break(Ok(()))
@ -415,7 +464,8 @@ where
partition_key = %ctx.partition_key(),
expected=?old_sort_key,
?observed,
update=?new_sort_key_str,
update_sort_key=?new_sort_key_str,
update_sort_key_ids=?new_sort_key_colids,
"detected concurrent sort key update, regenerating parquet"
);
// Stop the retry loop with an error containing the

View File

@ -6,7 +6,7 @@ use data_types::{
Column, ColumnType, ColumnsByName, CompactionLevel, Namespace, NamespaceId, NamespaceName,
NamespaceSchema, NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId,
ParquetFileParams, Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction,
Table, TableId, TableSchema, Timestamp, TransitionPartitionId,
SortedColumnSet, Table, TableId, TableSchema, Timestamp, TransitionPartitionId,
};
use iox_time::TimeProvider;
use snafu::{OptionExt, Snafu};
@ -408,11 +408,15 @@ pub trait PartitionRepo: Send + Sync {
/// Implementations are allowed to spuriously return
/// [`CasFailure::ValueMismatch`] for performance reasons in the presence of
/// concurrent writers.
// TODO: After the sort_key_ids field is converted into NOT NULL, the implementation of this function
// must be changed to compare old_sort_key_ids with the existing sort_key_ids instead of
// comparing old_sort_key with existing sort_key
async fn cas_sort_key(
&mut self,
partition_id: &TransitionPartitionId,
old_sort_key: Option<Vec<String>>,
new_sort_key: &[&str],
new_sort_key_ids: &SortedColumnSet,
) -> Result<Partition, CasFailure<Vec<String>>>;
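A hedged sketch of driving this compare-and-swap from a caller (the `CasFailure` variants are the ones the persist worker in this commit matches on; the key names and ids are hypothetical):

match repos
    .partitions()
    .cas_sort_key(
        &partition_id,
        Some(observed_sort_key.clone()), // the sort key we last read from the catalog
        &["tag1", "time"],               // hypothetical new sort key
        &SortedColumnSet::from([1, 3]),  // matching column ids, same length
    )
    .await
{
    // Both sort_key and sort_key_ids were written in one compare-and-swap.
    Ok(updated) => { /* ... */ }
    // A concurrent writer won; merge against `observed` and retry.
    Err(CasFailure::ValueMismatch(observed)) => { /* ... */ }
    // Transient catalog error; back off and retry.
    Err(CasFailure::QueryError(e)) => { /* ... */ }
}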
/// Record an instance of a partition being selected for compaction but compaction was not
@ -713,6 +717,17 @@ pub async fn list_schemas(
Ok(iter)
}
/// Panics if sort_key and sort_key_ids have different lengths
pub(crate) fn verify_sort_key_length(sort_key: &[&str], sort_key_ids: &SortedColumnSet) {
assert_eq!(
sort_key.len(),
sort_key_ids.len(),
"sort_key {:?} and sort_key_ids {:?} are not the same length",
sort_key,
sort_key_ids
);
}
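For example (hypothetical values), matching lengths pass while a mismatch trips the assertion:

verify_sort_key_length(&["tag2", "tag1", "time"], &SortedColumnSet::from([2, 1, 3])); // ok
// verify_sort_key_length(&["tag2", "tag1", "time"], &SortedColumnSet::from([2, 1])); // would panic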
#[cfg(test)]
pub(crate) mod test_helpers {
use crate::{
@ -1498,7 +1513,7 @@ pub(crate) mod test_helpers {
.await
.expect("failed to create partition");
// Test: sort_key_ids from create_or_get
assert!(partition.sort_key_ids.is_none());
assert!(partition.sort_key_ids().unwrap().is_empty());
created.insert(partition.id, partition.clone());
// partition to use
let partition_bar = repos
@ -1572,7 +1587,7 @@ pub(crate) mod test_helpers {
batch.sort_by_key(|p| p.id);
assert_eq!(created_sorted, batch);
// Test: sort_key_ids from get_by_id_batch
assert!(batch.iter().all(|p| p.sort_key_ids.is_none()));
assert!(batch.iter().all(|p| p.sort_key_ids().unwrap().is_empty()));
let mut batch = repos
.partitions()
.get_by_hash_id_batch(
@ -1586,7 +1601,7 @@ pub(crate) mod test_helpers {
.unwrap();
batch.sort_by_key(|p| p.id);
// Test: sort_key_ids from get_by_hash_id_batch
assert!(batch.iter().all(|p| p.sort_key_ids.is_none()));
assert!(batch.iter().all(|p| p.sort_key_ids().unwrap().is_empty()));
assert_eq!(created_sorted, batch);
let listed = repos
@ -1598,7 +1613,9 @@ pub(crate) mod test_helpers {
.map(|v| (v.id, v))
.collect::<BTreeMap<_, _>>();
// Test: sort_key_ids from list_by_table_id
assert!(listed.values().all(|p| p.sort_key_ids.is_none()));
assert!(listed
.values()
.all(|p| p.sort_key_ids().unwrap().is_empty()));
assert_eq!(created, listed);
@ -1631,11 +1648,17 @@ pub(crate) mod test_helpers {
&to_skip_partition.transition_partition_id(),
None,
&["tag2", "tag1", "time"],
&SortedColumnSet::from([2, 1, 3]),
)
.await
.unwrap();
// Test: sort_key_ids after updating from cas_sort_key
assert!(updated_partition.sort_key_ids.is_none());
// verify sort key and sort key ids are updated
assert_eq!(updated_partition.sort_key, &["tag2", "tag1", "time"]);
assert_eq!(
updated_partition.sort_key_ids,
Some(SortedColumnSet::from([2, 1, 3]))
);
// test sort key CAS with an incorrect value
let err = repos
@ -1644,6 +1667,7 @@ pub(crate) mod test_helpers {
&to_skip_partition.transition_partition_id(),
Some(["bananas".to_string()].to_vec()),
&["tag2", "tag1", "tag3 , with comma", "time"],
&SortedColumnSet::from([1, 2, 3, 4]),
)
.await
.expect_err("CAS with incorrect value should fail");
@ -1658,12 +1682,15 @@ pub(crate) mod test_helpers {
.await
.unwrap()
.unwrap();
// still has the old sort key
assert_eq!(
updated_other_partition.sort_key,
vec!["tag2", "tag1", "time"]
);
// Test: sort_key_ids from get_by_id
assert!(updated_other_partition.sort_key_ids.is_none());
assert_eq!(
updated_other_partition.sort_key_ids,
Some(SortedColumnSet::from([2, 1, 3]))
);
let updated_other_partition = repos
.partitions()
@ -1676,7 +1703,10 @@ pub(crate) mod test_helpers {
vec!["tag2", "tag1", "time"]
);
// Test: sort_key_ids from get_by_hash_id
assert!(updated_other_partition.sort_key_ids.is_none());
assert_eq!(
updated_other_partition.sort_key_ids,
Some(SortedColumnSet::from([2, 1, 3]))
);
// test sort key CAS with no value
let err = repos
@ -1685,6 +1715,7 @@ pub(crate) mod test_helpers {
&to_skip_partition.transition_partition_id(),
None,
&["tag2", "tag1", "tag3 , with comma", "time"],
&SortedColumnSet::from([1, 2, 3, 4]),
)
.await
.expect_err("CAS with incorrect value should fail");
@ -1699,6 +1730,7 @@ pub(crate) mod test_helpers {
&to_skip_partition.transition_partition_id(),
Some(["bananas".to_string()].to_vec()),
&["tag2", "tag1", "tag3 , with comma", "time"],
&SortedColumnSet::from([1, 2, 3, 4]),
)
.await
.expect_err("CAS with incorrect value should fail");
@ -1718,11 +1750,18 @@ pub(crate) mod test_helpers {
.collect(),
),
&["tag2", "tag1", "tag3 , with comma", "time"],
&SortedColumnSet::from([2, 1, 4, 3]),
)
.await
.unwrap();
// Test: sort_key_ids after updating from cas_sort_key
assert!(updated_partition.sort_key_ids.is_none());
assert_eq!(
updated_partition.sort_key,
vec!["tag2", "tag1", "tag3 , with comma", "time"]
);
assert_eq!(
updated_partition.sort_key_ids,
Some(SortedColumnSet::from([2, 1, 4, 3]))
);
// test getting the new sort key
let updated_partition = repos
@ -1735,8 +1774,10 @@ pub(crate) mod test_helpers {
updated_partition.sort_key,
vec!["tag2", "tag1", "tag3 , with comma", "time"]
);
// Test: sort_key_ids from get_by_id after updating
assert!(updated_partition.sort_key_ids.is_none());
assert_eq!(
updated_partition.sort_key_ids,
Some(SortedColumnSet::from([2, 1, 4, 3]))
);
let updated_partition = repos
.partitions()
@ -1748,8 +1789,10 @@ pub(crate) mod test_helpers {
updated_partition.sort_key,
vec!["tag2", "tag1", "tag3 , with comma", "time"]
);
// sort_key_ids gotten back from the update is still null
assert!(updated_partition.sort_key_ids.is_none());
assert_eq!(
updated_partition.sort_key_ids,
Some(SortedColumnSet::from([2, 1, 4, 3]))
);
// The compactor can log why compaction was skipped
let skipped_compactions = repos.partitions().list_skipped_compactions().await.unwrap();
@ -1931,8 +1974,32 @@ pub(crate) mod test_helpers {
.await
.expect("should list most recent");
assert_eq!(recent.len(), 4);
// Test: sort_key_ids from most_recent_n
assert!(recent.iter().all(|p| p.sort_key_ids.is_none()));
// Only the second one has values; the other three are empty
let empty_vec_string: Vec<String> = vec![];
assert_eq!(recent[0].sort_key, empty_vec_string);
assert_eq!(recent[0].sort_key_ids, Some(SortedColumnSet::from(vec![])));
assert_eq!(
recent[1].sort_key,
vec![
"tag2".to_string(),
"tag1".to_string(),
"tag3 , with comma".to_string(),
"time".to_string()
]
);
assert_eq!(
recent[1].sort_key_ids,
Some(SortedColumnSet::from(vec![2, 1, 4, 3]))
);
assert_eq!(recent[2].sort_key, empty_vec_string);
assert_eq!(recent[2].sort_key_ids, Some(SortedColumnSet::from(vec![])));
assert_eq!(recent[3].sort_key, empty_vec_string);
assert_eq!(recent[3].sort_key_ids, Some(SortedColumnSet::from(vec![])));
let recent = repos
.partitions()

View File

@ -1,7 +1,7 @@
//! This module implements an in-memory implementation of the iox_catalog interface. It can be
//! used for testing or for an IOx designed to run without catalog persistence.
use crate::interface::MAX_PARQUET_FILES_SELECTED_ONCE_FOR_DELETE;
use crate::interface::{verify_sort_key_length, MAX_PARQUET_FILES_SELECTED_ONCE_FOR_DELETE};
use crate::{
interface::{
CasFailure, Catalog, ColumnRepo, ColumnTypeMismatchSnafu, Error, NamespaceRepo,
@ -12,6 +12,7 @@ use crate::{
DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES,
};
use async_trait::async_trait;
use data_types::SortedColumnSet;
use data_types::{
partition_template::{
NamespacePartitionTemplateOverride, TablePartitionTemplateOverride, TemplatePart,
@ -566,7 +567,7 @@ impl PartitionRepo for MemTxn {
table_id,
key,
vec![],
None,
Some(SortedColumnSet::new(vec![])),
None,
);
stage.partitions.push(p);
@ -662,7 +663,10 @@ impl PartitionRepo for MemTxn {
partition_id: &TransitionPartitionId,
old_sort_key: Option<Vec<String>>,
new_sort_key: &[&str],
new_sort_key_ids: &SortedColumnSet,
) -> Result<Partition, CasFailure<Vec<String>>> {
verify_sort_key_length(new_sort_key, new_sort_key_ids);
let stage = self.stage();
let old_sort_key = old_sort_key.unwrap_or_default();
@ -674,6 +678,7 @@ impl PartitionRepo for MemTxn {
}) {
Some(p) if p.sort_key == old_sort_key => {
p.sort_key = new_sort_key.iter().map(|s| s.to_string()).collect();
p.sort_key_ids = Some(new_sort_key_ids.clone());
Ok(p.clone())
}
Some(p) => return Err(CasFailure::ValueMismatch(p.sort_key.clone())),

View File

@ -9,8 +9,8 @@ use data_types::{
partition_template::{NamespacePartitionTemplateOverride, TablePartitionTemplateOverride},
Column, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName,
NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId, ParquetFileParams,
Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction, Table, TableId,
Timestamp, TransitionPartitionId,
Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction, SortedColumnSet,
Table, TableId, Timestamp, TransitionPartitionId,
};
use iox_time::{SystemProvider, TimeProvider};
use metric::{DurationHistogram, Metric};
@ -176,7 +176,7 @@ decorate!(
"partition_get_by_hash_id_batch" = get_by_hash_id_batch(&mut self, partition_hash_ids: &[&PartitionHashId]) -> Result<Vec<Partition>>;
"partition_list_by_table_id" = list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>>;
"partition_list_ids" = list_ids(&mut self) -> Result<Vec<PartitionId>>;
"partition_update_sort_key" = cas_sort_key(&mut self, partition_id: &TransitionPartitionId, old_sort_key: Option<Vec<String>>, new_sort_key: &[&str]) -> Result<Partition, CasFailure<Vec<String>>>;
"partition_update_sort_key" = cas_sort_key(&mut self, partition_id: &TransitionPartitionId, old_sort_key: Option<Vec<String>>, new_sort_key: &[&str], new_sort_key_ids: &SortedColumnSet) -> Result<Partition, CasFailure<Vec<String>>>;
"partition_record_skipped_compaction" = record_skipped_compaction(&mut self, partition_id: PartitionId, reason: &str, num_files: usize, limit_num_files: usize, limit_num_files_first_in_partition: usize, estimated_bytes: u64, limit_bytes: u64) -> Result<()>;
"partition_list_skipped_compactions" = list_skipped_compactions(&mut self) -> Result<Vec<SkippedCompaction>>;
"partition_delete_skipped_compactions" = delete_skipped_compactions(&mut self, partition_id: PartitionId) -> Result<Option<SkippedCompaction>>;

View File

@ -1,6 +1,6 @@
//! A Postgres backed implementation of the Catalog
use crate::interface::MAX_PARQUET_FILES_SELECTED_ONCE_FOR_DELETE;
use crate::interface::{verify_sort_key_length, MAX_PARQUET_FILES_SELECTED_ONCE_FOR_DELETE};
use crate::{
interface::{
self, CasFailure, Catalog, ColumnRepo, ColumnTypeMismatchSnafu, Error, NamespaceRepo,
@ -16,6 +16,7 @@ use crate::{
DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES,
};
use async_trait::async_trait;
use data_types::SortedColumnSet;
use data_types::{
partition_template::{
NamespacePartitionTemplateOverride, TablePartitionTemplateOverride, TemplatePart,
@ -1159,9 +1160,9 @@ impl PartitionRepo for PostgresTxn {
let v = sqlx::query_as::<_, Partition>(
r#"
INSERT INTO partition
(partition_key, shard_id, table_id, hash_id, sort_key)
(partition_key, shard_id, table_id, hash_id, sort_key, sort_key_ids)
VALUES
( $1, $2, $3, $4, '{}')
( $1, $2, $3, $4, '{}', '{}')
ON CONFLICT ON CONSTRAINT partition_key_unique
DO UPDATE SET partition_key = partition.partition_key
RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at;
@ -1301,32 +1302,37 @@ WHERE table_id = $1;
partition_id: &TransitionPartitionId,
old_sort_key: Option<Vec<String>>,
new_sort_key: &[&str],
new_sort_key_ids: &SortedColumnSet,
) -> Result<Partition, CasFailure<Vec<String>>> {
verify_sort_key_length(new_sort_key, new_sort_key_ids);
let old_sort_key = old_sort_key.unwrap_or_default();
// This `match` will go away when all partitions have hash IDs in the database.
let query = match partition_id {
TransitionPartitionId::Deterministic(hash_id) => sqlx::query_as::<_, Partition>(
r#"
UPDATE partition
SET sort_key = $1
SET sort_key = $1, sort_key_ids = $4
WHERE hash_id = $2 AND sort_key = $3
RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at;
"#,
)
.bind(new_sort_key) // $1
.bind(hash_id) // $2
.bind(&old_sort_key), // $3
.bind(&old_sort_key) // $3
.bind(new_sort_key_ids), // $4
TransitionPartitionId::Deprecated(id) => sqlx::query_as::<_, Partition>(
r#"
UPDATE partition
SET sort_key = $1
SET sort_key = $1, sort_key_ids = $4
WHERE id = $2 AND sort_key = $3
RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at;
"#,
)
.bind(new_sort_key) // $1
.bind(id) // $2
.bind(&old_sort_key), // $3
.bind(&old_sort_key) // $3
.bind(new_sort_key_ids), // $4
};
let res = query.fetch_one(&mut self.inner).await;
@ -1362,6 +1368,7 @@ RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file
?partition_id,
?old_sort_key,
?new_sort_key,
?new_sort_key_ids,
"partition sort key cas successful"
);
@ -2173,7 +2180,7 @@ mod tests {
assert_eq!(a.hash_id().unwrap(), &hash_id);
// Test: sort_key_ids from partition_create_or_get_idempotent
assert!(a.sort_key_ids.is_none());
assert!(a.sort_key_ids().unwrap().is_empty());
// Call create_or_get for the same (key, table_id) pair, to ensure the write is idempotent.
let b = repos
@ -2194,8 +2201,9 @@ mod tests {
.unwrap();
assert_eq!(table_partitions.len(), 1);
assert_eq!(table_partitions[0].hash_id().unwrap(), &hash_id);
// Test: sort_key_ids from partition_create_or_get_idempotent
assert!(table_partitions[0].sort_key_ids.is_none());
assert!(table_partitions[0].sort_key_ids().unwrap().is_empty());
}
#[tokio::test]
@ -2217,9 +2225,9 @@ mod tests {
sqlx::query(
r#"
INSERT INTO partition
(partition_key, shard_id, table_id, sort_key)
(partition_key, shard_id, table_id, sort_key, sort_key_ids)
VALUES
( $1, $2, $3, '{}')
( $1, $2, $3, '{}', '{}')
ON CONFLICT ON CONSTRAINT partition_key_unique
DO UPDATE SET partition_key = partition.partition_key
RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at;
@ -2246,6 +2254,9 @@ RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file
.await
.expect("idempotent write should succeed");
// Test: sort_key_ids from a fresh insert are empty
assert!(inserted_again.sort_key_ids().unwrap().is_empty());
assert_eq!(partition, &inserted_again);
// Create a Parquet file record in this partition to ensure we don't break new data

View File

@ -2,9 +2,9 @@
use crate::{
interface::{
self, CasFailure, Catalog, ColumnRepo, ColumnTypeMismatchSnafu, Error, NamespaceRepo,
ParquetFileRepo, PartitionRepo, RepoCollection, Result, SoftDeletedRows, TableRepo,
MAX_PARQUET_FILES_SELECTED_ONCE_FOR_RETENTION,
self, verify_sort_key_length, CasFailure, Catalog, ColumnRepo, ColumnTypeMismatchSnafu,
Error, NamespaceRepo, ParquetFileRepo, PartitionRepo, RepoCollection, Result,
SoftDeletedRows, TableRepo, MAX_PARQUET_FILES_SELECTED_ONCE_FOR_RETENTION,
},
kafkaless_transition::{
SHARED_QUERY_POOL, SHARED_QUERY_POOL_ID, SHARED_TOPIC_ID, SHARED_TOPIC_NAME,
@ -852,9 +852,9 @@ impl PartitionRepo for SqliteTxn {
let v = sqlx::query_as::<_, PartitionPod>(
r#"
INSERT INTO partition
(partition_key, shard_id, table_id, hash_id, sort_key)
(partition_key, shard_id, table_id, hash_id, sort_key, sort_key_ids)
VALUES
($1, $2, $3, $4, '[]')
($1, $2, $3, $4, '[]', '[]')
ON CONFLICT (table_id, partition_key)
DO UPDATE SET partition_key = partition.partition_key
RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at;
@ -1012,33 +1012,39 @@ WHERE table_id = $1;
partition_id: &TransitionPartitionId,
old_sort_key: Option<Vec<String>>,
new_sort_key: &[&str],
new_sort_key_ids: &SortedColumnSet,
) -> Result<Partition, CasFailure<Vec<String>>> {
verify_sort_key_length(new_sort_key, new_sort_key_ids);
let old_sort_key = old_sort_key.unwrap_or_default();
let raw_new_sort_key_ids: Vec<_> = new_sort_key_ids.iter().map(|cid| cid.get()).collect();
// This `match` will go away when all partitions have hash IDs in the database.
let query = match partition_id {
TransitionPartitionId::Deterministic(hash_id) => sqlx::query_as::<_, PartitionPod>(
r#"
UPDATE partition
SET sort_key = $1
SET sort_key = $1, sort_key_ids = $4
WHERE hash_id = $2 AND sort_key = $3
RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at;
"#,
)
.bind(Json(new_sort_key)) // $1
.bind(hash_id) // $2
.bind(Json(&old_sort_key)), // $3
.bind(Json(&old_sort_key)) // $3
.bind(Json(&raw_new_sort_key_ids)), // $4
TransitionPartitionId::Deprecated(id) => sqlx::query_as::<_, PartitionPod>(
r#"
UPDATE partition
SET sort_key = $1
SET sort_key = $1, sort_key_ids = $4
WHERE id = $2 AND sort_key = $3
RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at;
"#,
)
.bind(Json(new_sort_key)) // $1
.bind(id) // $2
.bind(Json(&old_sort_key)), // $3
.bind(Json(&old_sort_key)) // $3
.bind(Json(&raw_new_sort_key_ids)), // $4
};
let res = query.fetch_one(self.inner.get_mut()).await;
@ -1779,13 +1785,14 @@ mod tests {
.unwrap();
assert_eq!(table_partitions.len(), 1);
assert_eq!(table_partitions[0].hash_id().unwrap(), &hash_id);
// Test: sort_key_ids from partition_create_or_get_idempotent
assert!(table_partitions[0].sort_key_ids.is_none());
assert!(table_partitions[0].sort_key_ids().unwrap().is_empty());
}
#[tokio::test]
async fn existing_partitions_without_hash_id() {
let sqlite = setup_db().await;
let sqlite: SqliteCatalog = setup_db().await;
let pool = sqlite.pool.clone();
let sqlite: Arc<dyn Catalog> = Arc::new(sqlite);
let mut repos = sqlite.repositories().await;
@ -1800,9 +1807,9 @@ mod tests {
sqlx::query(
r#"
INSERT INTO partition
(partition_key, shard_id, table_id, sort_key)
(partition_key, shard_id, table_id, sort_key, sort_key_ids)
VALUES
($1, $2, $3, '[]')
($1, $2, $3, '[]', '[]')
ON CONFLICT (table_id, partition_key)
DO UPDATE SET partition_key = partition.partition_key
RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file_at;
@ -1828,6 +1835,9 @@ RETURNING id, hash_id, table_id, partition_key, sort_key, sort_key_ids, new_file
.await
.expect("idempotent write should succeed");
// Test: sort_key_ids from a fresh insert are empty
assert!(inserted_again.sort_key_ids().unwrap().is_empty());
assert_eq!(partition, &inserted_again);
// Create a Parquet file record in this partition to ensure we don't break new data

View File

@ -7,8 +7,8 @@ use arrow::{
use data_types::{
partition_template::TablePartitionTemplateOverride, Column, ColumnSet, ColumnType,
ColumnsByName, CompactionLevel, Namespace, NamespaceName, NamespaceSchema, ParquetFile,
ParquetFileParams, Partition, PartitionId, Table, TableId, TableSchema, Timestamp,
TransitionPartitionId,
ParquetFileParams, Partition, PartitionId, SortedColumnSet, Table, TableId, TableSchema,
Timestamp, TransitionPartitionId,
};
use datafusion::physical_plan::metrics::Count;
use datafusion_util::{unbounded_memory_pool, MemoryStream};
@ -318,6 +318,7 @@ impl TestTable {
self: &Arc<Self>,
key: &str,
sort_key: &[&str],
sort_key_ids: &[i64],
) -> Arc<TestPartition> {
let mut repos = self.catalog.catalog.repositories().await;
@ -333,11 +334,10 @@ impl TestTable {
&TransitionPartitionId::Deprecated(partition.id),
None,
sort_key,
&SortedColumnSet::from(sort_key_ids.iter().cloned()),
)
.await
.unwrap();
// Test: sort_key_ids after updating
assert!(partition.sort_key_ids.is_none());
Arc::new(TestPartition {
catalog: Arc::clone(&self.catalog),
@ -427,6 +427,12 @@ pub struct TestColumn {
pub column: Column,
}
impl TestColumn {
pub fn id(&self) -> i64 {
self.column.id.get()
}
}
/// A test catalog with specified namespace, table, partition
#[allow(missing_docs)]
#[derive(Debug)]
@ -439,7 +445,11 @@ pub struct TestPartition {
impl TestPartition {
/// Update sort key.
pub async fn update_sort_key(self: &Arc<Self>, sort_key: SortKey) -> Arc<Self> {
pub async fn update_sort_key(
self: &Arc<Self>,
sort_key: SortKey,
sort_key_ids: &SortedColumnSet,
) -> Arc<Self> {
let old_sort_key = partition_lookup(
self.catalog.catalog.repositories().await.as_mut(),
&self.partition.transition_partition_id(),
@ -459,11 +469,10 @@ impl TestPartition {
&self.partition.transition_partition_id(),
Some(old_sort_key),
&sort_key.to_columns().collect::<Vec<_>>(),
sort_key_ids,
)
.await
.unwrap();
// Test: sort_key_ids after updating
assert!(partition.sort_key_ids.is_none());
Arc::new(Self {
catalog: Arc::clone(&self.catalog),
@ -779,6 +788,11 @@ async fn update_catalog_sort_key_if_needed<R>(
// Fetch the latest partition info from the catalog
let partition = partition_lookup(repos, id).await.unwrap().unwrap();
// fetch column ids from the catalog
let columns = get_table_columns_by_id(partition.table_id, repos)
.await
.unwrap();
// Similarly to what the ingester does, if there's an existing sort key in the catalog, add new
// columns onto the end
match partition.sort_key() {
@ -792,7 +806,10 @@ async fn update_catalog_sort_key_if_needed<R>(
catalog_sort_key.to_columns().collect::<Vec<_>>(),
&new_columns,
);
let updated_partition = repos
let column_ids = columns.ids_for_names(&new_columns);
repos
.partitions()
.cas_sort_key(
id,
@ -803,23 +820,21 @@ async fn update_catalog_sort_key_if_needed<R>(
.collect::<Vec<_>>(),
),
&new_columns,
&column_ids,
)
.await
.unwrap();
// Test: sort_key_ids after updating
assert!(updated_partition.sort_key_ids.is_none());
}
}
None => {
let new_columns = sort_key.to_columns().collect::<Vec<_>>();
debug!("Updating sort key from None to {:?}", &new_columns);
let updated_partition = repos
let column_ids = columns.ids_for_names(&new_columns);
repos
.partitions()
.cas_sort_key(id, None, &new_columns)
.cas_sort_key(id, None, &new_columns, &column_ids)
.await
.unwrap();
// Test: sort_key_ids after updating
assert!(updated_partition.sort_key_ids.is_none());
}
}
}

View File

@ -391,7 +391,7 @@ mod tests {
use async_trait::async_trait;
use data_types::{
partition_template::TablePartitionTemplateOverride, ColumnType, PartitionId, PartitionKey,
TableId,
SortedColumnSet, TableId,
};
use futures::StreamExt;
use generated_types::influxdata::iox::partition_template::v1::{
@ -410,7 +410,7 @@ mod tests {
let c1 = t.create_column("tag", ColumnType::Tag).await;
let c2 = t.create_column("time", ColumnType::Time).await;
let p1 = t
.create_partition_with_sort_key("k1", &["tag", "time"])
.create_partition_with_sort_key("k1", &["tag", "time"], &[c1.id(), c2.id()])
.await
.partition
.clone();
@ -865,10 +865,10 @@ mod tests {
// set sort key
let p = p
.update_sort_key(SortKey::from_columns([
c1.column.name.as_str(),
c2.column.name.as_str(),
]))
.update_sort_key(
SortKey::from_columns([c1.column.name.as_str(), c2.column.name.as_str()]),
&SortedColumnSet::from([c1.column.id.get(), c2.column.id.get()]),
)
.await;
assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_hash_id", 1);
@ -1110,11 +1110,12 @@ mod tests {
partition_template: TablePartitionTemplateOverride::default(),
});
const N_PARTITIONS: usize = 20;
let c_id = c.column.id.get();
let mut partitions = futures::stream::iter(0..N_PARTITIONS)
.then(|i| {
let t = Arc::clone(&t);
async move {
t.create_partition_with_sort_key(&format!("p{i}"), &["time"])
t.create_partition_with_sort_key(&format!("p{i}"), &["time"], &[c_id])
.await
.partition
.transition_partition_id()

View File

@ -105,7 +105,7 @@ pub mod tests {
use super::*;
use arrow::{datatypes::DataType, record_batch::RecordBatch};
use arrow_util::assert_batches_eq;
use data_types::{ColumnType, ParquetFile};
use data_types::{ColumnType, ParquetFile, SortedColumnSet};
use datafusion_util::config::register_iox_object_store;
use iox_query::{
exec::{ExecutorType, IOxSessionContext},
@ -186,17 +186,20 @@ pub mod tests {
.join("\n");
let ns = catalog.create_namespace_1hr_retention("ns").await;
let table = ns.create_table("table").await;
table.create_column("tag1", ColumnType::Tag).await;
table.create_column("tag2", ColumnType::Tag).await;
let tag1 = table.create_column("tag1", ColumnType::Tag).await;
let tag2 = table.create_column("tag2", ColumnType::Tag).await;
table.create_column("tag3", ColumnType::Tag).await;
table.create_column("tag4", ColumnType::Tag).await;
let tag4 = table.create_column("tag4", ColumnType::Tag).await;
table.create_column("field_int", ColumnType::I64).await;
table.create_column("field_float", ColumnType::F64).await;
table.create_column("time", ColumnType::Time).await;
let col_time = table.create_column("time", ColumnType::Time).await;
let partition = table
.create_partition("part")
.await
.update_sort_key(SortKey::from_columns(["tag1", "tag2", "tag4", "time"]))
.update_sort_key(
SortKey::from_columns(["tag1", "tag2", "tag4", "time"]),
&SortedColumnSet::from([tag1.id(), tag2.id(), tag4.id(), col_time.id()]),
)
.await;
let builder = TestParquetFileBuilder::default().with_line_protocol(&lp);
let parquet_file = Arc::new(partition.create_parquet_file(builder).await.parquet_file);

View File

@ -232,11 +232,23 @@ fn to_parquet_file(p: data_types::ParquetFile) -> ParquetFile {
fn to_partition(p: data_types::Partition) -> Partition {
let identifier = to_partition_identifier(&p.transition_partition_id());
let array_sort_key_ids = p
.sort_key_ids
.map(|cols| cols.iter().map(|id| id.get()).collect::<Vec<_>>());
let array_sort_key_ids = match array_sort_key_ids {
None => vec![],
Some(array_sort_key_ids) => array_sort_key_ids,
};
let proto_sort_key_id = SortKeyIds { array_sort_key_ids };
Partition {
identifier: Some(identifier),
key: p.partition_key.to_string(),
table_id: p.table_id.get(),
array_sort_key: p.sort_key,
sort_key_ids: Some(proto_sort_key_id),
}
}
@ -269,7 +281,7 @@ mod tests {
.await
.unwrap();
// Test: sort_key_ids from create_or_get in catalog_service
assert!(partition.sort_key_ids.is_none());
assert!(partition.sort_key_ids().unwrap().is_empty());
let p1params = ParquetFileParams {
namespace_id: namespace.id,
table_id: table.id,