feat: Use TransitionPartitionId everywhere in the querier

pull/24376/head
Carol (Nichols || Goulding) 2023-07-21 14:51:56 -04:00
parent 61a485227a
commit 308d7f3d4b
5 changed files with 113 additions and 143 deletions
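
Editor's note, for orientation (not part of the commit): `TransitionPartitionId` is the identifier this commit standardizes on. Its shape, inferred from how the diff below uses it, is roughly the following sketch; the real definition lives in the `data_types` crate.

```rust
use data_types::{PartitionHashId, PartitionId};

// Sketch only -- simplified from `data_types`, shown here for orientation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TransitionPartitionId {
    /// Old-style partitions, identified only by their catalog-assigned row ID.
    Deprecated(PartitionId),
    /// New-style partitions, identified by a deterministic hash of the
    /// table ID and partition key.
    Deterministic(PartitionHashId),
}
```

Because the `Deterministic` variant carries hash bytes, the type is `Clone` but not `Copy`, which is why several hunks below add explicit `.clone()` calls where a plain integer `PartitionId` used to be copied.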

View File

@@ -15,9 +15,10 @@ use datafusion_util::{unbounded_memory_pool, MemoryStream};
 use generated_types::influxdata::iox::partition_template::v1::PartitionTemplate;
 use iox_catalog::{
     interface::{
-        get_schema_by_id, get_table_columns_by_id, Catalog, PartitionRepo, SoftDeletedRows,
+        get_schema_by_id, get_table_columns_by_id, Catalog, RepoCollection, SoftDeletedRows,
     },
     mem::MemCatalog,
+    partition_lookup,
     test_helpers::arbitrary_table,
 };
 use iox_query::{
@@ -437,17 +438,14 @@ pub struct TestPartition {
 impl TestPartition {
     /// Update sort key.
     pub async fn update_sort_key(self: &Arc<Self>, sort_key: SortKey) -> Arc<Self> {
-        let old_sort_key = self
-            .catalog
-            .catalog
-            .repositories()
-            .await
-            .partitions()
-            .get_by_id(self.partition.id)
-            .await
-            .unwrap()
-            .unwrap()
-            .sort_key;
+        let old_sort_key = partition_lookup(
+            self.catalog.catalog.repositories().await.as_mut(),
+            &self.partition.transition_partition_id(),
+        )
+        .await
+        .unwrap()
+        .unwrap()
+        .sort_key;
 
         let partition = self
             .catalog
@@ -456,7 +454,7 @@ impl TestPartition {
             .await
             .partitions()
             .cas_sort_key(
-                &TransitionPartitionId::Deprecated(self.partition.id),
+                &self.partition.transition_partition_id(),
                 Some(old_sort_key),
                 &sort_key.to_columns().collect::<Vec<_>>(),
             )
@@ -553,7 +551,12 @@
         let result = self.create_parquet_file_catalog_record(builder).await;
 
         let mut repos = self.catalog.catalog.repositories().await;
-        update_catalog_sort_key_if_needed(repos.partitions(), self.partition.id, sort_key).await;
+        update_catalog_sort_key_if_needed(
+            repos.as_mut(),
+            &self.partition.transition_partition_id(),
+            sort_key,
+        )
+        .await;
 
         result
     }
@@ -762,17 +765,15 @@ impl TestParquetFileBuilder {
     }
 }
 
-async fn update_catalog_sort_key_if_needed(
-    partitions_catalog: &mut dyn PartitionRepo,
-    partition_id: PartitionId,
+async fn update_catalog_sort_key_if_needed<R>(
+    repos: &mut R,
+    id: &TransitionPartitionId,
     sort_key: SortKey,
-) {
+) where
+    R: RepoCollection + ?Sized,
+{
     // Fetch the latest partition info from the catalog
-    let partition = partitions_catalog
-        .get_by_id(partition_id)
-        .await
-        .unwrap()
-        .unwrap();
+    let partition = partition_lookup(repos, id).await.unwrap().unwrap();
 
     // Similarly to what the ingester does, if there's an existing sort key in the catalog, add new
     // columns onto the end
@@ -787,9 +788,10 @@ async fn update_catalog_sort_key_if_needed(
                 catalog_sort_key.to_columns().collect::<Vec<_>>(),
                 &new_columns,
             );
-            partitions_catalog
+            repos
+                .partitions()
                 .cas_sort_key(
-                    &TransitionPartitionId::Deprecated(partition_id),
+                    id,
                     Some(
                         catalog_sort_key
                             .to_columns()
@@ -805,12 +807,9 @@
         None => {
             let new_columns = sort_key.to_columns().collect::<Vec<_>>();
             debug!("Updating sort key from None to {:?}", &new_columns);
-            partitions_catalog
-                .cas_sort_key(
-                    &TransitionPartitionId::Deprecated(partition_id),
-                    None,
-                    &new_columns,
-                )
+            repos
+                .partitions()
+                .cas_sort_key(id, None, &new_columns)
                 .await
                 .unwrap();
         }

View File

@@ -193,7 +193,7 @@ impl PartitionCache {
             if invalidates {
                 debug!(
-                    partition_id = %partition_id,
+                    %partition_id,
                     "invalidate partition cache",
                 );
             }
@@ -496,7 +496,7 @@ mod tests {
             .get_one(
                 Arc::clone(&cached_table),
                 &TransitionPartitionId::Deprecated(PartitionId::new(i64::MAX)),
-                &Vec::new(),
+                &[],
                 None,
             )
             .await;
@@ -847,7 +847,7 @@
             c2.column.name.as_str(),
         ]))
         .await;
-        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);
+        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_hash_id", 1);
 
         // expire & fetch
         let p_sort_key = p.partition.sort_key();

View File

@ -116,7 +116,7 @@ pub enum Error {
"Duplicate partition info for partition {partition_id}, ingester: {ingester_address}"
))]
DuplicatePartitionInfo {
partition_id: PartitionId,
partition_id: TransitionPartitionId,
ingester_address: String,
},
@@ -130,6 +130,12 @@ pub enum Error {
     PartitionHashId {
         source: data_types::PartitionHashIdError,
     },
+
+    #[snafu(display(
+        "The partition failed to specify either a `partition_id` or a `partition_hash_id`; \
+        at least one of these is required."
+    ))]
+    NoPartitionIdentifier,
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -501,7 +507,7 @@ async fn execute(
 /// This should be used AFTER the stream was drained because we will perform some catalog IO and
 /// this should likely not block the ingester.
 struct IngesterStreamDecoder {
-    finished_partitions: HashMap<PartitionId, IngesterPartition>,
+    finished_partitions: HashMap<TransitionPartitionId, IngesterPartition>,
     current_partition: Option<IngesterPartition>,
     current_chunk: Option<(Schema, Vec<RecordBatch>)>,
     ingester_address: Arc<str>,
@@ -544,7 +550,7 @@ impl IngesterStreamDecoder {
         if let Some(current_partition) = self.current_partition.take() {
             self.finished_partitions
-                .insert(current_partition.partition_id, current_partition);
+                .insert(current_partition.partition_id.clone(), current_partition);
         }
 
         Ok(())
@@ -561,15 +567,26 @@ impl IngesterStreamDecoder {
                     // new partition announced
                     self.flush_partition()?;
 
-                    // This unwrap is temporary for this commit only due to the far-reaching
-                    // implications that changing this has
-                    let partition_id = PartitionId::new(md.partition_id.unwrap());
+                    let deprecated_partition_id = md.partition_id.map(PartitionId::new);
                     let partition_hash_id = md
                         .partition_hash_id
                         .map(|bytes| {
                             PartitionHashId::try_from(&bytes[..]).context(PartitionHashIdSnafu)
                         })
                         .transpose()?;
+
+                    let partition_id = match (deprecated_partition_id, partition_hash_id) {
+                        // If the ingester sends a hash ID for this partition, use it no matter what
+                        // was sent or not for the partition ID.
+                        (_, Some(hash_id)) => TransitionPartitionId::Deterministic(hash_id),
+                        // If the ingester only sends a partition ID, this is an old-style partition
+                        // that doesn't have a hash ID in the catalog so we need to use the database ID.
+                        (Some(id), None) => TransitionPartitionId::Deprecated(id),
+                        // The ingester needs to send at least one identifier for the partition. This
+                        // should be impossible as long as the ingester is behaving.
+                        (None, None) => return NoPartitionIdentifierSnafu.fail(),
+                    };
+
                     ensure!(
                         !self.finished_partitions.contains_key(&partition_id),
                         DuplicatePartitionInfoSnafu {
@@ -586,7 +603,6 @@ impl IngesterStreamDecoder {
                     let partition = IngesterPartition::new(
                         ingester_uuid,
                         partition_id,
-                        partition_hash_id,
                         md.completed_persistence_count,
                     );
                     self.current_partition = Some(partition);
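
Editor's note: that match is the behavioral heart of the change. A hash ID always wins, a bare row ID falls back to the `Deprecated` variant, and an empty pair is now a decode error instead of an `unwrap`. A standalone, hedged illustration of the precedence, with constructors borrowed from the test helpers later in this diff (not part of the commit):

```rust
use data_types::{PartitionHashId, PartitionId, PartitionKey, TableId, TransitionPartitionId};

fn main() {
    let hash_id = PartitionHashId::new(TableId::new(1), &PartitionKey::from("arbitrary"));

    // Both identifiers present: the hash ID takes precedence and the
    // database row ID is ignored.
    let id = match (Some(PartitionId::new(1)), Some(hash_id.clone())) {
        (_, Some(hash_id)) => TransitionPartitionId::Deterministic(hash_id),
        (Some(id), None) => TransitionPartitionId::Deprecated(id),
        (None, None) => panic!("a well-behaved ingester always sends an identifier"),
    };
    assert_eq!(id, TransitionPartitionId::Deterministic(hash_id));
}
```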
@@ -771,13 +787,8 @@ pub struct IngesterPartition {
     /// to refresh the catalog cache or not.
     ingester_uuid: Uuid,
 
-    /// The database-assigned partition identifier. In the process of being deprecated.
-    partition_id: PartitionId,
-
-    /// Deterministic partition identifier based on the table ID and partition key. Not all
-    /// ingester responses will contain this value yet.
-    #[allow(dead_code)] // Nothing is using this value yet.
-    partition_hash_id: Option<PartitionHashId>,
+    /// The partition identifier.
+    partition_id: TransitionPartitionId,
 
     /// The number of Parquet files this ingester UUID has persisted for this partition.
     completed_persistence_count: u64,
@@ -786,18 +797,16 @@
 }
 
 impl IngesterPartition {
-    /// Creates a new IngesterPartition, translating the passed
-    /// `RecordBatches` into the correct types
+    /// Creates a new IngesterPartition, translating the passed `RecordBatches` into the correct
+    /// types
     pub fn new(
         ingester_uuid: Uuid,
-        partition_id: PartitionId,
-        partition_hash_id: Option<PartitionHashId>,
+        partition_id: TransitionPartitionId,
         completed_persistence_count: u64,
     ) -> Self {
         Self {
             ingester_uuid,
             partition_id,
-            partition_hash_id,
             completed_persistence_count,
             chunks: vec![],
         }
@@ -828,7 +837,7 @@ impl IngesterPartition {
         let chunk = IngesterChunk {
             chunk_id,
-            partition_id: self.transition_partition_id(),
+            partition_id: self.partition_id.clone(),
             schema: expected_schema,
             batches,
             stats: None,
@@ -860,8 +869,8 @@ impl IngesterPartition {
         }
     }
 
-    pub(crate) fn transition_partition_id(&self) -> TransitionPartitionId {
-        TransitionPartitionId::from((self.partition_id, self.partition_hash_id.as_ref()))
+    pub(crate) fn partition_id(&self) -> TransitionPartitionId {
+        self.partition_id.clone()
     }
 
     pub(crate) fn ingester_uuid(&self) -> Uuid {
@@ -1138,8 +1147,7 @@ mod tests {
         assert_eq!(partitions.len(), 1);
 
         let p = &partitions[0];
-        assert_eq!(p.partition_id.get(), 1);
-        assert_eq!(p.partition_hash_id.as_ref().unwrap(), &partition_hash_id(1));
+        assert_eq!(p.partition_id, partition_id(1));
         assert_eq!(p.chunks.len(), 0);
         assert_eq!(p.ingester_uuid, ingester_uuid);
         assert_eq!(p.completed_persistence_count, 5);
@@ -1174,8 +1182,10 @@ mod tests {
         assert_eq!(partitions.len(), 1);
 
         let p = &partitions[0];
-        assert_eq!(p.partition_id.get(), 1);
-        assert!(p.partition_hash_id.is_none());
+        assert_eq!(
+            p.partition_id,
+            TransitionPartitionId::Deprecated(PartitionId::new(1))
+        );
         assert_eq!(p.chunks.len(), 0);
         assert_eq!(p.ingester_uuid, ingester_uuid);
         assert_eq!(p.completed_persistence_count, 5);
@@ -1324,11 +1334,7 @@ mod tests {
         assert_eq!(partitions.len(), 3);
 
         let p1 = &partitions[0];
-        assert_eq!(p1.partition_id.get(), 1);
-        assert_eq!(
-            p1.partition_hash_id.as_ref().unwrap(),
-            &partition_hash_id(1)
-        );
+        assert_eq!(p1.partition_id, partition_id(1));
         assert_eq!(p1.chunks.len(), 2);
         assert_eq!(p1.chunks[0].schema().as_arrow(), schema_1_1);
         assert_eq!(p1.chunks[0].batches.len(), 2);
@@ -1338,27 +1344,21 @@ mod tests {
         assert_eq!(p1.chunks[1].batches.len(), 1);
         assert_eq!(p1.chunks[1].batches[0].schema(), schema_1_2);
 
-        let p2 = &partitions[1];
-        assert_eq!(p2.partition_id.get(), 2);
-        assert_eq!(
-            p2.partition_hash_id.as_ref().unwrap(),
-            &partition_hash_id(2)
-        );
-        assert_eq!(p2.chunks.len(), 1);
-        assert_eq!(p2.chunks[0].schema().as_arrow(), schema_2_1);
-        assert_eq!(p2.chunks[0].batches.len(), 1);
-        assert_eq!(p2.chunks[0].batches[0].schema(), schema_2_1);
-
-        let p3 = &partitions[2];
-        assert_eq!(p3.partition_id.get(), 3);
-        assert_eq!(
-            p3.partition_hash_id.as_ref().unwrap(),
-            &partition_hash_id(3)
-        );
+        // The Partition for table 3 deterministically sorts second
+        let p3 = &partitions[1];
+        assert_eq!(p3.partition_id, partition_id(3));
         assert_eq!(p3.chunks.len(), 1);
         assert_eq!(p3.chunks[0].schema().as_arrow(), schema_3_1);
         assert_eq!(p3.chunks[0].batches.len(), 1);
         assert_eq!(p3.chunks[0].batches[0].schema(), schema_3_1);
+
+        // The Partition for table 2 deterministically sorts third
+        let p2 = &partitions[2];
+        assert_eq!(p2.partition_id, partition_id(2));
+        assert_eq!(p2.chunks.len(), 1);
+        assert_eq!(p2.chunks[0].schema().as_arrow(), schema_2_1);
+        assert_eq!(p2.chunks[0].batches.len(), 1);
+        assert_eq!(p2.chunks[0].batches[0].schema(), schema_2_1);
     }
 
     #[tokio::test]
@@ -1471,29 +1471,17 @@ mod tests {
         let p1 = &partitions[0];
         assert_eq!(p1.ingester_uuid, ingester_uuid1);
         assert_eq!(p1.completed_persistence_count, 0);
-        assert_eq!(p1.partition_id.get(), 1);
-        assert_eq!(
-            p1.partition_hash_id.as_ref().unwrap(),
-            &partition_hash_id(1)
-        );
+        assert_eq!(p1.partition_id, partition_id(1));
 
-        let p2 = &partitions[1];
+        let p2 = &partitions[2];
         assert_eq!(p2.ingester_uuid, ingester_uuid1);
         assert_eq!(p2.completed_persistence_count, 42);
-        assert_eq!(p2.partition_id.get(), 2);
-        assert_eq!(
-            p2.partition_hash_id.as_ref().unwrap(),
-            &partition_hash_id(2)
-        );
+        assert_eq!(p2.partition_id, partition_id(2));
 
-        let p3 = &partitions[2];
+        let p3 = &partitions[1];
         assert_eq!(p3.ingester_uuid, ingester_uuid2);
         assert_eq!(p3.completed_persistence_count, 9000);
-        assert_eq!(p3.partition_id.get(), 3);
-        assert_eq!(
-            p3.partition_hash_id.as_ref().unwrap(),
-            &partition_hash_id(3)
-        );
+        assert_eq!(p3.partition_id, partition_id(3));
     }
 
     #[tokio::test]
@@ -1630,27 +1618,26 @@ mod tests {
     type MockFlightResult = Result<(DecodedPayload, IngesterQueryResponseMetadata), FlightError>;
 
     fn metadata(
-        partition_id: i64,
+        table_id: i64,
         ingester_uuid: impl Into<String>,
         completed_persistence_count: u64,
     ) -> MockFlightResult {
         Ok((
             DecodedPayload::None,
             IngesterQueryResponseMetadata {
-                // Using the partition_id as both the PartitionId and the TableId in the
-                // PartitionHashId is a temporary way to reduce duplication in tests where
-                // the important part is which batches are in the same partition and which
-                // batches are in a different partition, not what the actual identifier
-                // values are. This will go away when the ingester no longer sends
-                // PartitionIds.
-                partition_id: Some(partition_id),
-                partition_hash_id: Some(partition_hash_id(partition_id).as_bytes().to_owned()),
+                partition_id: None,
+                partition_hash_id: Some(partition_hash_id(table_id).as_bytes().to_owned()),
                 ingester_uuid: ingester_uuid.into(),
                 completed_persistence_count,
             },
         ))
     }
 
+    // Create a `TransitionPartitionId` from the given table ID and the arbitrary partition key.
+    fn partition_id(table_id: i64) -> TransitionPartitionId {
+        TransitionPartitionId::Deterministic(partition_hash_id(table_id))
+    }
+
     fn partition_hash_id(table_id: i64) -> PartitionHashId {
         PartitionHashId::new(TableId::new(table_id), &PartitionKey::from("arbitrary"))
     }
@@ -1760,17 +1747,9 @@ mod tests {
         for case in cases {
             // Construct a partition and ensure it doesn't error
-            let ingester_partition = IngesterPartition::new(
-                ingester_uuid,
-                PartitionId::new(1),
-                Some(PartitionHashId::new(
-                    TableId::new(1),
-                    &PartitionKey::from("arbitrary"),
-                )),
-                0,
-            )
-            .try_add_chunk(ChunkId::new(), expected_schema.clone(), vec![case])
-            .unwrap();
+            let ingester_partition = IngesterPartition::new(ingester_uuid, partition_id(1), 0)
+                .try_add_chunk(ChunkId::new(), expected_schema.clone(), vec![case])
+                .unwrap();
 
             for batch in &ingester_partition.chunks[0].batches {
                 assert_eq!(batch.schema(), expected_schema.as_arrow());
@@ -1790,17 +1769,9 @@ mod tests {
         let batch =
             RecordBatch::try_from_iter(vec![("b", int64_array()), ("time", ts_array())]).unwrap();
 
-        let err = IngesterPartition::new(
-            ingester_uuid,
-            PartitionId::new(1),
-            Some(PartitionHashId::new(
-                TableId::new(1),
-                &PartitionKey::from("arbitrary"),
-            )),
-            0,
-        )
-        .try_add_chunk(ChunkId::new(), expected_schema, vec![batch])
-        .unwrap_err();
+        let err = IngesterPartition::new(ingester_uuid, partition_id(1), 0)
+            .try_add_chunk(ChunkId::new(), expected_schema, vec![batch])
+            .unwrap_err();
 
         assert_matches!(err, Error::RecordBatchType { .. });
     }

View File

@@ -282,7 +282,7 @@ impl QuerierTable {
         let chunks = partitions
             .into_iter()
             .filter_map(|mut c| {
-                let cached_partition = cached_partitions.get(&c.transition_partition_id())?;
+                let cached_partition = cached_partitions.get(&c.partition_id())?;
                 c.set_partition_column_ranges(&cached_partition.column_ranges);
                 Some(c)
             })
@@ -328,13 +328,14 @@ impl QuerierTable {
         let mut should_cover: HashMap<TransitionPartitionId, HashSet<ColumnId>> =
             HashMap::with_capacity(ingester_partitions.len());
 
-        // For ingester partitions we only need the column ranges -- which are static -- not the sort key. So it is
-        // sufficient to collect the partition IDs.
+        // For ingester partitions we only need the column ranges -- which are static -- not the
+        // sort key. So it is sufficient to collect the partition IDs.
         for p in ingester_partitions {
-            should_cover.entry(p.transition_partition_id()).or_default();
+            should_cover.entry(p.partition_id()).or_default();
         }
 
-        // For parquet files we must ensure that the -- potentially evolving -- sort key covers the primary key.
+        // For parquet files we must ensure that the -- potentially evolving -- sort key covers
+        // the primary key.
         let pk = cached_table
             .primary_key_column_ids
             .iter()
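
Editor's note: a hedged, self-contained sketch of the two-tier coverage bookkeeping those comments describe, with stand-in types so it runs on its own (the real code keys the map by `TransitionPartitionId` and collects `ColumnId`s):

```rust
use std::collections::{HashMap, HashSet};

fn main() {
    type ColumnId = i64;
    let ingester_partition_ids = ["p-ingester-1", "p-ingester-2"];
    let primary_key: HashSet<ColumnId> = HashSet::from([1, 2]);

    let mut should_cover: HashMap<&str, HashSet<ColumnId>> = HashMap::new();

    // Ingester partitions: column ranges are static, so registering the
    // partition with an empty requirement set is enough.
    for p in ingester_partition_ids {
        should_cover.entry(p).or_default();
    }

    // Parquet-backed partitions: the (potentially evolving) sort key must
    // cover the primary key, so those columns are required explicitly.
    should_cover
        .entry("p-parquet-1")
        .or_default()
        .extend(&primary_key);

    assert!(should_cover["p-ingester-1"].is_empty());
    assert_eq!(should_cover["p-parquet-1"].len(), 2);
}
```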
@@ -878,7 +879,7 @@ mod tests {
             .with_lp(["table foo=1 1"]);
 
         // set up performs a few lookups
-        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 4);
+        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_hash_id", 4);
 
         let querier_table = TestQuerierTable::new(&catalog, &table)
             .await
@@ -886,7 +887,7 @@
         let chunks = querier_table.chunks().await.unwrap();
         assert_eq!(chunks.len(), 5);
-        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 4);
+        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_hash_id", 4);
         assert_catalog_access_metric_count(
             &catalog.metric_registry,
             "partition_get_by_hash_id_batch",
@@ -896,7 +897,7 @@
         let chunks = querier_table.chunks().await.unwrap();
         assert_eq!(chunks.len(), 5);
-        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 4);
+        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_hash_id", 4);
         assert_catalog_access_metric_count(
             &catalog.metric_registry,
             "partition_get_by_hash_id_batch",
@@ -909,7 +910,7 @@
             TestParquetFileBuilder::default().with_line_protocol("table,tag1=a foo=1,bar=1 11"),
         )
         .await;
-        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5);
+        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_hash_id", 5);
         assert_catalog_access_metric_count(
             &catalog.metric_registry,
             "partition_get_by_hash_id_batch",
@@ -919,7 +920,7 @@
         // file not visible yet
         let chunks = querier_table.chunks().await.unwrap();
         assert_eq!(chunks.len(), 5);
-        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5);
+        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_hash_id", 5);
         assert_catalog_access_metric_count(
             &catalog.metric_registry,
             "partition_get_by_hash_id_batch",
@@ -933,7 +934,7 @@
             .with_ingester_partition(ingester_partition_builder.build());
 
         let chunks = querier_table.chunks().await.unwrap();
         assert_eq!(chunks.len(), 6);
-        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5);
+        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_hash_id", 5);
         assert_catalog_access_metric_count(
             &catalog.metric_registry,
             "partition_get_by_hash_id_batch",

View File

@@ -104,8 +104,7 @@ impl IngesterPartitionBuilder {
         let mut part = IngesterPartition::new(
             Uuid::new_v4(),
-            self.partition.partition.id,
-            self.partition.partition.hash_id().cloned(),
+            self.partition.partition.transition_partition_id(),
             0,
         )
         .try_add_chunk(