diff --git a/ingester/src/data.rs b/ingester/src/data.rs
index e8ee96c8a4..6a2e1dd542 100644
--- a/ingester/src/data.rs
+++ b/ingester/src/data.rs
@@ -251,16 +251,16 @@ impl Persister for IngesterData {
             .unwrap_or_else(|| panic!("namespace {namespace_id} not in shard {shard_id} state"));
         let namespace_name = namespace.namespace_name();
 
+        let table_data = namespace.table_id(table_id).unwrap_or_else(|| {
+            panic!("table {table_id} in namespace {namespace_id} not in shard {shard_id} state")
+        });
+
         let partition_key;
         let table_name;
         let batch;
         let sort_key;
         let last_persisted_sequence_number;
         {
-            let table_data = namespace.table_id(table_id).unwrap_or_else(|| {
-                panic!("table {table_id} in namespace {namespace_id} not in shard {shard_id} state")
-            });
-
             let mut guard = table_data.write().await;
 
             table_name = guard.table_name().clone();
@@ -276,6 +276,7 @@ impl Persister for IngesterData {
             last_persisted_sequence_number = partition.max_persisted_sequence_number();
         };
 
+        let sort_key = sort_key.get().await;
         trace!(
             %shard_id,
             %namespace_id,
@@ -284,9 +285,9 @@ impl Persister for IngesterData {
             %table_name,
             %partition_id,
             %partition_key,
-            "fetching sort key"
+            ?sort_key,
+            "fetched sort key"
         );
-        let sort_key = sort_key.get().await;
 
         debug!(
             %shard_id,
@@ -382,6 +383,15 @@ impl Persister for IngesterData {
                 })
                 .await
                 .expect("retry forever");
+
+            // Update the sort key in the partition cache.
+            table_data
+                .write()
+                .await
+                .get_partition(partition_id)
+                .unwrap()
+                .update_sort_key(Some(new_sort_key.clone()));
+
             debug!(
                 %object_store_id,
                 %shard_id,
@@ -549,6 +559,7 @@ mod tests {
     use mutable_batch_lp::lines_to_batches;
     use object_store::memory::InMemory;
+    use schema::sort::SortKey;
     use uuid::Uuid;
 
     use super::*;
 
@@ -927,6 +938,30 @@ mod tests {
             Some(SequenceNumber::new(2))
         );
 
+        // verify it set a sort key on the partition in the catalog
+        assert_eq!(partition.sort_key, vec!["time"]);
+
+        // Verify the partition sort key cache was updated to reflect the new
+        // catalog value.
+        let cached_sort_key = data
+            .shard(shard1.id)
+            .unwrap()
+            .namespace_by_id(namespace.id)
+            .unwrap()
+            .table_id(table_id)
+            .unwrap()
+            .write()
+            .await
+            .get_partition(partition_id)
+            .unwrap()
+            .sort_key()
+            .get()
+            .await;
+        assert_eq!(
+            cached_sort_key,
+            Some(SortKey::from_columns(partition.sort_key))
+        );
+
         // This value should be recorded in the metrics asserted next;
         // it is less than 500 KB
         //
@@ -967,15 +1002,6 @@ mod tests {
         // Only the < 500 KB bucket has a count
         assert_eq!(buckets_with_counts, &[500 * 1024]);
 
-        // verify it set a sort key on the partition in the catalog
-        let partition_info = repos
-            .partitions()
-            .get_by_id(partition_id)
-            .await
-            .unwrap()
-            .unwrap();
-        assert_eq!(partition_info.sort_key, vec!["time"]);
-
         let mem_table = n.table_data(&"mem".into()).unwrap();
         let mem_table = mem_table.read().await;
 
diff --git a/ingester/src/data/partition.rs b/ingester/src/data/partition.rs
index c91254407e..1333849165 100644
--- a/ingester/src/data/partition.rs
+++ b/ingester/src/data/partition.rs
@@ -326,13 +326,29 @@ impl PartitionData {
     pub(crate) fn sort_key(&self) -> &SortKeyState {
         &self.sort_key
     }
+
+    /// Set the cached [`SortKey`] to the specified value.
+    ///
+    /// All subsequent calls to [`Self::sort_key`] will return
+    /// [`SortKeyState::Provided`] containing `new`.
+    pub(crate) fn update_sort_key(&mut self, new: Option<SortKey>) {
+        self.sort_key = SortKeyState::Provided(new);
+    }
 }
 
 #[cfg(test)]
 mod tests {
+    use std::time::Duration;
+
     use arrow_util::assert_batches_sorted_eq;
+    use assert_matches::assert_matches;
+    use backoff::BackoffConfig;
+    use data_types::ShardIndex;
+    use iox_catalog::interface::Catalog;
     use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
 
+    use crate::test_util::populate_catalog;
+
     use super::*;
 
     #[test]
@@ -505,4 +521,83 @@ mod tests {
         assert_eq!(p.data.snapshots[0].min_sequence_number.get(), 8);
         assert_eq!(p.data.snapshots[0].max_sequence_number.get(), 8);
     }
+
+    #[tokio::test]
+    async fn test_update_provided_sort_key() {
+        let starting_state =
+            SortKeyState::Provided(Some(SortKey::from_columns(["banana", "time"])));
+
+        let mut p = PartitionData::new(
+            PartitionId::new(1),
+            "bananas".into(),
+            ShardId::new(1),
+            NamespaceId::new(42),
+            TableId::new(1),
+            "platanos".into(),
+            starting_state,
+            None,
+        );
+
+        let want = Some(SortKey::from_columns(["banana", "platanos", "time"]));
+        p.update_sort_key(want.clone());
+
+        assert_matches!(p.sort_key(), SortKeyState::Provided(_));
+        assert_eq!(p.sort_key().get().await, want);
+    }
+
+    #[tokio::test]
+    async fn test_update_deferred_sort_key() {
+        let metrics = Arc::new(metric::Registry::default());
+        let backoff_config = BackoffConfig::default();
+        let catalog: Arc<dyn Catalog> =
+            Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics)));
+
+        // Populate the catalog with the shard / namespace / table
+        let (shard_id, _ns_id, table_id) =
+            populate_catalog(&*catalog, ShardIndex::new(1), "bananas", "platanos").await;
+
+        let partition_id = catalog
+            .repositories()
+            .await
+            .partitions()
+            .create_or_get("test".into(), shard_id, table_id)
+            .await
+            .expect("should create")
+            .id;
+
+        catalog
+            .repositories()
+            .await
+            .partitions()
+            .update_sort_key(partition_id, &["terrific"])
+            .await
+            .unwrap();
+
+        // Construct a deferred fetcher that reads the sort key from the catalog
+        let fetcher = Arc::new(DeferredSortKey::new(
+            partition_id,
+            Duration::from_nanos(1),
+            Arc::clone(&catalog),
+            backoff_config.clone(),
+        ));
+
+        let starting_state = SortKeyState::Deferred(fetcher);
+
+        let mut p = PartitionData::new(
+            PartitionId::new(1),
+            "bananas".into(),
+            ShardId::new(1),
+            NamespaceId::new(42),
+            TableId::new(1),
+            "platanos".into(),
+            starting_state,
+            None,
+        );
+
+        let want = Some(SortKey::from_columns(["banana", "platanos", "time"]));
+        p.update_sort_key(want.clone());
+
+        assert_matches!(p.sort_key(), SortKeyState::Provided(_));
+        assert_eq!(p.sort_key().get().await, want);
+    }
 }