fix: update cached sort key
Once persist() has successfully updated the sort key in the catalog, set the partition sort key cache to reflect the new value.pull/24376/head
parent
3e70dc44a0
commit
86d28d3359
|
@ -251,16 +251,16 @@ impl Persister for IngesterData {
|
|||
.unwrap_or_else(|| panic!("namespace {namespace_id} not in shard {shard_id} state"));
|
||||
let namespace_name = namespace.namespace_name();
|
||||
|
||||
let table_data = namespace.table_id(table_id).unwrap_or_else(|| {
|
||||
panic!("table {table_id} in namespace {namespace_id} not in shard {shard_id} state")
|
||||
});
|
||||
|
||||
let partition_key;
|
||||
let table_name;
|
||||
let batch;
|
||||
let sort_key;
|
||||
let last_persisted_sequence_number;
|
||||
{
|
||||
let table_data = namespace.table_id(table_id).unwrap_or_else(|| {
|
||||
panic!("table {table_id} in namespace {namespace_id} not in shard {shard_id} state")
|
||||
});
|
||||
|
||||
let mut guard = table_data.write().await;
|
||||
table_name = guard.table_name().clone();
|
||||
|
||||
|
@ -276,6 +276,7 @@ impl Persister for IngesterData {
|
|||
last_persisted_sequence_number = partition.max_persisted_sequence_number();
|
||||
};
|
||||
|
||||
let sort_key = sort_key.get().await;
|
||||
trace!(
|
||||
%shard_id,
|
||||
%namespace_id,
|
||||
|
@ -284,9 +285,9 @@ impl Persister for IngesterData {
|
|||
%table_name,
|
||||
%partition_id,
|
||||
%partition_key,
|
||||
"fetching sort key"
|
||||
?sort_key,
|
||||
"fetched sort key"
|
||||
);
|
||||
let sort_key = sort_key.get().await;
|
||||
|
||||
debug!(
|
||||
%shard_id,
|
||||
|
@ -382,6 +383,15 @@ impl Persister for IngesterData {
|
|||
})
|
||||
.await
|
||||
.expect("retry forever");
|
||||
|
||||
// Update the sort key in the partition cache.
|
||||
table_data
|
||||
.write()
|
||||
.await
|
||||
.get_partition(partition_id)
|
||||
.unwrap()
|
||||
.update_sort_key(Some(new_sort_key.clone()));
|
||||
|
||||
debug!(
|
||||
%object_store_id,
|
||||
%shard_id,
|
||||
|
@ -549,6 +559,7 @@ mod tests {
|
|||
use mutable_batch_lp::lines_to_batches;
|
||||
use object_store::memory::InMemory;
|
||||
|
||||
use schema::sort::SortKey;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::*;
|
||||
|
@ -927,6 +938,30 @@ mod tests {
|
|||
Some(SequenceNumber::new(2))
|
||||
);
|
||||
|
||||
// verify it set a sort key on the partition in the catalog
|
||||
assert_eq!(partition.sort_key, vec!["time"]);
|
||||
|
||||
// Verify the partition sort key cache was updated to reflect the new
|
||||
// catalog value.
|
||||
let cached_sort_key = data
|
||||
.shard(shard1.id)
|
||||
.unwrap()
|
||||
.namespace_by_id(namespace.id)
|
||||
.unwrap()
|
||||
.table_id(table_id)
|
||||
.unwrap()
|
||||
.write()
|
||||
.await
|
||||
.get_partition(partition_id)
|
||||
.unwrap()
|
||||
.sort_key()
|
||||
.get()
|
||||
.await;
|
||||
assert_eq!(
|
||||
cached_sort_key,
|
||||
Some(SortKey::from_columns(partition.sort_key))
|
||||
);
|
||||
|
||||
// This value should be recorded in the metrics asserted next;
|
||||
// it is less than 500 KB
|
||||
//
|
||||
|
@ -967,15 +1002,6 @@ mod tests {
|
|||
// Only the < 500 KB bucket has a count
|
||||
assert_eq!(buckets_with_counts, &[500 * 1024]);
|
||||
|
||||
// verify it set a sort key on the partition in the catalog
|
||||
let partition_info = repos
|
||||
.partitions()
|
||||
.get_by_id(partition_id)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(partition_info.sort_key, vec!["time"]);
|
||||
|
||||
let mem_table = n.table_data(&"mem".into()).unwrap();
|
||||
let mem_table = mem_table.read().await;
|
||||
|
||||
|
|
|
@ -326,13 +326,29 @@ impl PartitionData {
|
|||
pub(crate) fn sort_key(&self) -> &SortKeyState {
|
||||
&self.sort_key
|
||||
}
|
||||
|
||||
/// Set the cached [`SortKey`] to the specified value.
|
||||
///
|
||||
/// All subsequent calls to [`Self::sort_key`] will return
|
||||
/// [`SortKeyState::Provided`] with the `new`.
|
||||
pub(crate) fn update_sort_key(&mut self, new: Option<SortKey>) {
|
||||
self.sort_key = SortKeyState::Provided(new);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use arrow_util::assert_batches_sorted_eq;
|
||||
use assert_matches::assert_matches;
|
||||
use backoff::BackoffConfig;
|
||||
use data_types::ShardIndex;
|
||||
use iox_catalog::interface::Catalog;
|
||||
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
|
||||
|
||||
use crate::test_util::populate_catalog;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
|
@ -505,4 +521,83 @@ mod tests {
|
|||
assert_eq!(p.data.snapshots[0].min_sequence_number.get(), 8);
|
||||
assert_eq!(p.data.snapshots[0].max_sequence_number.get(), 8);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_update_provided_sort_key() {
|
||||
let starting_state =
|
||||
SortKeyState::Provided(Some(SortKey::from_columns(["banana", "time"])));
|
||||
|
||||
let mut p = PartitionData::new(
|
||||
PartitionId::new(1),
|
||||
"bananas".into(),
|
||||
ShardId::new(1),
|
||||
NamespaceId::new(42),
|
||||
TableId::new(1),
|
||||
"platanos".into(),
|
||||
starting_state,
|
||||
None,
|
||||
);
|
||||
|
||||
let want = Some(SortKey::from_columns(["banana", "platanos", "time"]));
|
||||
p.update_sort_key(want.clone());
|
||||
|
||||
assert_matches!(p.sort_key(), SortKeyState::Provided(_));
|
||||
assert_eq!(p.sort_key().get().await, want);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_update_deferred_sort_key() {
|
||||
let metrics = Arc::new(metric::Registry::default());
|
||||
let backoff_config = BackoffConfig::default();
|
||||
let catalog: Arc<dyn Catalog> =
|
||||
Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics)));
|
||||
|
||||
// Populate the catalog with the shard / namespace / table
|
||||
let (shard_id, _ns_id, table_id) =
|
||||
populate_catalog(&*catalog, ShardIndex::new(1), "bananas", "platanos").await;
|
||||
|
||||
let partition_id = catalog
|
||||
.repositories()
|
||||
.await
|
||||
.partitions()
|
||||
.create_or_get("test".into(), shard_id, table_id)
|
||||
.await
|
||||
.expect("should create")
|
||||
.id;
|
||||
|
||||
catalog
|
||||
.repositories()
|
||||
.await
|
||||
.partitions()
|
||||
.update_sort_key(partition_id, &["terrific"])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Read the just-created sort key (None)
|
||||
let fetcher = Arc::new(DeferredSortKey::new(
|
||||
partition_id,
|
||||
Duration::from_nanos(1),
|
||||
Arc::clone(&catalog),
|
||||
backoff_config.clone(),
|
||||
));
|
||||
|
||||
let starting_state = SortKeyState::Deferred(fetcher);
|
||||
|
||||
let mut p = PartitionData::new(
|
||||
PartitionId::new(1),
|
||||
"bananas".into(),
|
||||
ShardId::new(1),
|
||||
NamespaceId::new(42),
|
||||
TableId::new(1),
|
||||
"platanos".into(),
|
||||
starting_state,
|
||||
None,
|
||||
);
|
||||
|
||||
let want = Some(SortKey::from_columns(["banana", "platanos", "time"]));
|
||||
p.update_sort_key(want.clone());
|
||||
|
||||
assert_matches!(p.sort_key(), SortKeyState::Provided(_));
|
||||
assert_eq!(p.sort_key().get().await, want);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue