fix: do not panic when partition was removed from catalog (#6773)

Fixes https://github.com/influxdata/idpe/issues/17040.
Marco Neumann 2023-01-31 12:54:34 +01:00 committed by GitHub
parent b682ffcb36
commit 7cadd38a3c
2 changed files with 155 additions and 20 deletions
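The core change in the first file is that the partition cache now stores Option<CachedPartition> instead of CachedPartition: a partition that was removed from the catalog is cached as None and surfaced to callers, rather than tripping the old expect("partition gone from catalog?!") panic. A minimal standalone sketch of that pattern follows; the HashMap-backed cache and the types are hypothetical stand-ins, not the actual IOx Cache API.

use std::collections::HashMap;

// Hypothetical stand-ins for the catalog types; not the actual IOx cache API.
#[derive(Debug, Clone, Copy, PartialEq)]
struct ShardId(i64);

#[derive(Debug, Clone)]
struct CachedPartition {
    shard_id: ShardId,
}

struct PartitionCache {
    // The cached value is Option<CachedPartition>: a partition that is gone
    // from the catalog is cached as None instead of panicking at load time.
    entries: HashMap<i64, Option<CachedPartition>>,
}

impl PartitionCache {
    fn shard_id(&self, partition_id: i64) -> Option<ShardId> {
        // Map through the Option instead of unwrapping it.
        self.entries
            .get(&partition_id)
            .cloned()
            .flatten()
            .map(|p| p.shard_id)
    }
}

fn main() {
    let mut entries = HashMap::new();
    entries.insert(1, Some(CachedPartition { shard_id: ShardId(10) }));
    entries.insert(2, None); // partition 2 was removed from the catalog
    let cache = PartitionCache { entries };

    assert_eq!(cache.shard_id(1), Some(ShardId(10)));
    assert_eq!(cache.shard_id(2), None); // no panic for the missing partition
    assert_eq!(cache.shard_id(3), None); // never seen at all
}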

@@ -29,7 +29,7 @@ const CACHE_ID: &str = "partition";
 type CacheT = Box<
     dyn Cache<
         K = PartitionId,
-        V = CachedPartition,
+        V = Option<CachedPartition>,
         GetExtra = (Arc<CachedTable>, Option<Span>),
         PeekExtra = ((), Option<Span>),
     >,
@@ -39,7 +39,7 @@ type CacheT = Box<
 #[derive(Debug)]
 pub struct PartitionCache {
     cache: CacheT,
-    remove_if_handle: RemoveIfHandle<PartitionId, CachedPartition>,
+    remove_if_handle: RemoveIfHandle<PartitionId, Option<CachedPartition>>,
 }
 
 impl PartitionCache {
@@ -68,17 +68,16 @@ impl PartitionCache {
                             .await
                     })
                     .await
-                    .expect("retry forever")
-                    .expect("partition gone from catalog?!");
+                    .expect("retry forever")?;
 
                 let sort_key = partition.sort_key().map(|sort_key| {
                     Arc::new(PartitionSortKey::new(sort_key, &extra.column_id_map_rev))
                 });
 
-                CachedPartition {
+                Some(CachedPartition {
                     shard_id: partition.shard_id,
                     sort_key,
-                }
+                })
             }
         });
         let loader = Arc::new(MetricsLoader::new(
@@ -96,8 +95,12 @@
         backend.add_policy(LruPolicy::new(
             ram_pool,
             CACHE_ID,
-            Arc::new(FunctionEstimator::new(|k, v: &CachedPartition| {
-                RamSize(size_of_val(k) + size_of_val(v) + v.size())
+            Arc::new(FunctionEstimator::new(|k, v: &Option<CachedPartition>| {
+                RamSize(
+                    size_of_val(k)
+                        + size_of_val(v)
+                        + v.as_ref().map(|v| v.size()).unwrap_or_default(),
+                )
             })),
         ));
 
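The estimator above only adds the inner heap size when the cached value is present: size_of_val already accounts for the stack footprint of the Option wrapper, in the None case as well as the Some case. A minimal sketch of that accounting, with hypothetical types in place of CachedPartition and RamSize:

use std::mem::size_of_val;

// Hypothetical value type with a heap-size accessor; not the IOx CachedPartition.
struct Value {
    payload: Vec<u8>,
}

impl Value {
    // Heap bytes owned by this value.
    fn size(&self) -> usize {
        self.payload.capacity()
    }
}

// size_of_val(value) already covers the stack footprint of the Option itself
// (for Some and None alike), so only the inner heap size is added when present.
fn entry_size(key: &i64, value: &Option<Value>) -> usize {
    size_of_val(key) + size_of_val(value) + value.as_ref().map(|v| v.size()).unwrap_or_default()
}

fn main() {
    let empty: Option<Value> = None;
    let full = Some(Value { payload: vec![0u8; 16] });

    // A cached miss costs only the fixed-size key and Option, no heap payload.
    assert_eq!(entry_size(&1, &empty), size_of_val(&1i64) + size_of_val(&empty));
    // A cached hit additionally accounts for the heap allocation it owns.
    assert!(entry_size(&1, &full) > entry_size(&1, &empty));
}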
@@ -121,11 +124,11 @@
         cached_table: Arc<CachedTable>,
         partition_id: PartitionId,
         span: Option<Span>,
-    ) -> ShardId {
+    ) -> Option<ShardId> {
         self.cache
             .get(partition_id, (cached_table, span))
             .await
-            .shard_id
+            .map(|p| p.shard_id)
     }
 
     /// Get sort key
@@ -143,7 +146,7 @@
                 &self.cache,
                 partition_id,
                 |cached_partition| {
-                    if let Some(sort_key) = &cached_partition.sort_key {
+                    if let Some(sort_key) = &cached_partition.and_then(|p| p.sort_key) {
                         should_cover
                             .iter()
                             .any(|col| !sort_key.column_set.contains(col))
@@ -155,7 +158,7 @@
                 (cached_table, span),
             )
             .await
-            .sort_key
+            .and_then(|p| p.sort_key)
     }
 }
 
@@ -261,17 +264,35 @@ mod tests {
             true,
         );
 
-        let id1 = cache.shard_id(Arc::clone(&cached_table), p1.id, None).await;
+        let id1 = cache
+            .shard_id(Arc::clone(&cached_table), p1.id, None)
+            .await
+            .unwrap();
         assert_eq!(id1, s1.shard.id);
         assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);
 
-        let id2 = cache.shard_id(Arc::clone(&cached_table), p2.id, None).await;
+        let id2 = cache
+            .shard_id(Arc::clone(&cached_table), p2.id, None)
+            .await
+            .unwrap();
         assert_eq!(id2, s2.shard.id);
         assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);
 
-        let id1 = cache.shard_id(Arc::clone(&cached_table), p1.id, None).await;
+        let id1 = cache
+            .shard_id(Arc::clone(&cached_table), p1.id, None)
+            .await
+            .unwrap();
         assert_eq!(id1, s1.shard.id);
         assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);
+
+        // non-existing partition
+        for _ in 0..2 {
+            let res = cache
+                .shard_id(Arc::clone(&cached_table), PartitionId::new(i64::MAX), None)
+                .await;
+            assert_eq!(res, None);
+            assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 3);
+        }
     }
 
     #[tokio::test]
@@ -346,6 +367,20 @@
             sort_key1b.as_ref().unwrap()
         ));
         assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);
+
+        // non-existing partition
+        for _ in 0..2 {
+            let res = cache
+                .sort_key(
+                    Arc::clone(&cached_table),
+                    PartitionId::new(i64::MAX),
+                    &Vec::new(),
+                    None,
+                )
+                .await;
+            assert_eq!(res, None);
+            assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 3);
+        }
     }
 
     #[tokio::test]
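The second changed file adapts the querier's ingester stream decoder to the new Option-returning cache API: the tests above unwrap the value, while the production path skips partitions that are gone from the catalog. A minimal sketch of that caller-side pattern, with hypothetical names in place of the real cache and decoder:

// Hypothetical caller, not the real querier API: the cache lookup now yields
// Option<ShardId>, so callers either unwrap (tests) or skip (production path).
#[derive(Debug, Clone, Copy, PartialEq)]
struct ShardId(i64);

// Stand-in for PartitionCache::shard_id, which returns None for partitions
// that were removed from the catalog.
fn lookup_shard_id(partition_id: i64) -> Option<ShardId> {
    (partition_id < 1000).then(|| ShardId(partition_id))
}

fn process_partition(partition_id: i64) -> Result<(), String> {
    let Some(shard_id) = lookup_shard_id(partition_id) else {
        // Partition is gone from the catalog: skip it instead of panicking.
        return Ok(());
    };
    println!("processing partition {partition_id} on shard {shard_id:?}");
    Ok(())
}

fn main() {
    process_partition(1).unwrap(); // known partition
    process_partition(i64::MAX).unwrap(); // unknown partition is silently skipped
}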

@@ -580,13 +580,46 @@ async fn execute(
     decoder.finalize().await
 }
 
+/// Current partition used while decoding the ingester response stream.
+#[derive(Debug)]
+enum CurrentPartition {
+    /// There exists a partition.
+    Some(IngesterPartition),
+
+    /// There is no existing partition.
+    None,
+
+    /// Skip the current partition (e.g. because it is gone from the catalog).
+    Skip,
+}
+
+impl CurrentPartition {
+    fn take(&mut self) -> Option<IngesterPartition> {
+        let mut tmp = Self::None;
+        std::mem::swap(&mut tmp, self);
+
+        match tmp {
+            Self::None | Self::Skip => None,
+            Self::Some(p) => Some(p),
+        }
+    }
+
+    fn is_skip(&self) -> bool {
+        matches!(self, Self::Skip)
+    }
+
+    fn is_some(&self) -> bool {
+        matches!(self, Self::Some(_))
+    }
+}
+
 /// Helper to disassemble the data from the ingester Apache Flight arrow stream.
 ///
 /// This should be used AFTER the stream was drained because we will perform some catalog IO and
 /// this should likely not block the ingester.
 struct IngesterStreamDecoder {
     finished_partitions: HashMap<PartitionId, IngesterPartition>,
-    current_partition: Option<IngesterPartition>,
+    current_partition: CurrentPartition,
     current_chunk: Option<(Schema, Vec<RecordBatch>)>,
     ingester_address: Arc<str>,
     catalog_cache: Arc<CatalogCache>,
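The CurrentPartition enum added above distinguishes "no partition yet" from "partition known to be gone, skip its payloads". A small usage sketch of the same tri-state idiom with a generic placeholder payload; it uses std::mem::replace, which is equivalent to the mem::swap in the diff, and is not the actual IngesterPartition type:

// Minimal sketch of the tri-state idiom above, with a placeholder payload.
#[derive(Debug)]
enum Current<T> {
    Some(T),
    None,
    Skip,
}

impl<T> Current<T> {
    // Move the value out and reset to None; both None and Skip yield nothing.
    fn take(&mut self) -> Option<T> {
        match std::mem::replace(self, Self::None) {
            Self::Some(v) => Some(v),
            Self::None | Self::Skip => None,
        }
    }

    fn is_skip(&self) -> bool {
        matches!(self, Self::Skip)
    }
}

fn main() {
    let mut current = Current::Some("chunk data");
    assert_eq!(current.take(), Some("chunk data"));
    assert_eq!(current.take(), None); // take() leaves None behind

    let mut skipped: Current<&str> = Current::Skip;
    assert!(skipped.is_skip()); // payloads for this partition are ignored
    assert_eq!(skipped.take(), None);
}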
@@ -604,7 +637,7 @@ impl IngesterStreamDecoder {
     ) -> Self {
         Self {
             finished_partitions: HashMap::new(),
-            current_partition: None,
+            current_partition: CurrentPartition::None,
             current_chunk: None,
             ingester_address,
             catalog_cache,
@@ -620,8 +653,11 @@
                 .current_partition
                 .take()
                 .expect("Partition should have been checked before chunk creation");
-            self.current_partition =
-                Some(current_partition.try_add_chunk(ChunkId::new(), schema, batches)?);
+            self.current_partition = CurrentPartition::Some(current_partition.try_add_chunk(
+                ChunkId::new(),
+                schema,
+                batches,
+            )?);
         }
 
         Ok(())
@@ -705,6 +741,11 @@
                     )
                     .await;
 
+                let Some(shard_id) = shard_id else {
+                    self.current_partition = CurrentPartition::Skip;
+                    return Ok(())
+                };
+
                 // Use a temporary empty partition sort key. We are going to fetch this AFTER we
                 // know all chunks because then we are able to detect all relevant primary key
                 // columns that the sort key must cover.
@@ -731,9 +772,13 @@
                     None,
                     partition_sort_key,
                 );
-                self.current_partition = Some(partition);
+                self.current_partition = CurrentPartition::Some(partition);
             }
             DecodedPayload::Schema(schema) => {
+                if self.current_partition.is_skip() {
+                    return Ok(());
+                }
+
                 self.flush_chunk()?;
                 ensure!(
                     self.current_partition.is_some(),
@@ -755,6 +800,10 @@
                 self.current_chunk = Some((schema, vec![]));
             }
             DecodedPayload::RecordBatch(batch) => {
+                if self.current_partition.is_skip() {
+                    return Ok(());
+                }
+
                 let current_chunk =
                     self.current_chunk
                         .as_mut()
@@ -1431,6 +1480,57 @@
         assert!(partitions.is_empty());
     }
 
+    #[tokio::test]
+    async fn test_flight_unknown_partitions() {
+        let record_batch = lp_to_record_batch("table foo=1 1");
+        let schema = record_batch.schema();
+
+        let mock_flight_client = Arc::new(
+            MockFlightClient::new([(
+                "addr1",
+                Ok(MockQueryData {
+                    results: vec![
+                        metadata(
+                            1000,
+                            Some(PartitionStatus {
+                                parquet_max_sequence_number: Some(11),
+                            }),
+                        ),
+                        metadata(
+                            1001,
+                            Some(PartitionStatus {
+                                parquet_max_sequence_number: Some(11),
+                            }),
+                        ),
+                        Ok((
+                            DecodedPayload::Schema(Arc::clone(&schema)),
+                            IngesterQueryResponseMetadata::default(),
+                        )),
+                        metadata(
+                            1002,
+                            Some(PartitionStatus {
+                                parquet_max_sequence_number: Some(11),
+                            }),
+                        ),
+                        Ok((
+                            DecodedPayload::Schema(Arc::clone(&schema)),
+                            IngesterQueryResponseMetadata::default(),
+                        )),
+                        Ok((
+                            DecodedPayload::RecordBatch(record_batch),
+                            IngesterQueryResponseMetadata::default(),
+                        )),
+                    ],
+                }),
+            )])
+            .await,
+        );
+
+        let ingester_conn = mock_flight_client.ingester_conn().await;
+        let partitions = get_partitions(&ingester_conn, &[1]).await.unwrap();
+        assert!(partitions.is_empty());
+    }
+
     #[tokio::test]
     async fn no_ingester_addresses_found_is_a_configuration_error() {
         let mock_flight_client = Arc::new(