fix: do not create empty batches in ingester (#4443)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Marco Neumann 2022-04-27 19:52:22 +02:00 committed by GitHub
parent a195973cfb
commit 59f6556483
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 45 additions and 30 deletions

View File

@ -645,7 +645,7 @@ impl NamespaceData {
return table_data
.partition_data
.get_mut(&partition_info.partition.partition_key)
.map(|partition_data| {
.and_then(|partition_data| {
partition_data.snapshot_to_persisting_batch(
partition_info.partition.sequencer_id,
partition_info.partition.table_id,
@ -958,7 +958,7 @@ impl PartitionData {
table_id: TableId,
partition_id: PartitionId,
table_name: &str,
) -> Arc<PersistingBatch> {
) -> Option<Arc<PersistingBatch>> {
self.data
.snapshot_to_persisting(sequencer_id, table_id, partition_id, table_name)
}
@ -1027,13 +1027,16 @@ impl PartitionData {
// First apply the tombstone on all in-memeory & non-persisting data
// Make a QueryableBatch for all buffer + snapshots + the given tombstone
let max_sequencer_number = tombstone.sequence_number;
let query_batch = self
let query_batch = match self
.data
.snapshot_to_queryable_batch(table_name, Some(tombstone.clone()));
if query_batch.is_empty() {
//No need to procedd further
return;
}
.snapshot_to_queryable_batch(table_name, Some(tombstone.clone()))
{
Some(query_batch) if !query_batch.is_empty() => query_batch,
_ => {
// No need to procedd further
return;
}
};
let (min_sequencer_number, _) = query_batch.min_max_sequence_numbers();
assert!(min_sequencer_number <= max_sequencer_number);
@ -1201,7 +1204,7 @@ impl DataBuffer {
&mut self,
table_name: &str,
tombstone: Option<Tombstone>,
) -> QueryableBatch {
) -> Option<QueryableBatch> {
self.snapshot()
.expect("This mutable batch snapshot error should be impossible.");
@ -1212,7 +1215,13 @@ impl DataBuffer {
if let Some(tombstone) = tombstone {
tombstones.push(tombstone);
}
QueryableBatch::new(table_name, data, tombstones)
// only produce batch if there is any data
if data.is_empty() {
None
} else {
Some(QueryableBatch::new(table_name, data, tombstones))
}
}
/// Returns all existing snapshots plus data in the buffer
@ -1230,32 +1239,36 @@ impl DataBuffer {
Ok(snapshots)
}
/// Snapshots the buffer and moves snapshots over to the `PersistingBatch`. Returns error
/// if there is already a persisting batch.
/// Snapshots the buffer and moves snapshots over to the `PersistingBatch`.
///
/// # Panic
/// Panics if there is already a persisting batch.
pub fn snapshot_to_persisting(
&mut self,
sequencer_id: SequencerId,
table_id: TableId,
partition_id: PartitionId,
table_name: &str,
) -> Arc<PersistingBatch> {
) -> Option<Arc<PersistingBatch>> {
if self.persisting.is_some() {
panic!("Unable to snapshot while persisting. This is an unexpected state.")
}
let queryable_batch = self.snapshot_to_queryable_batch(table_name, None);
if let Some(queryable_batch) = self.snapshot_to_queryable_batch(table_name, None) {
let persisting_batch = Arc::new(PersistingBatch {
sequencer_id,
table_id,
partition_id,
object_store_id: Uuid::new_v4(),
data: Arc::new(queryable_batch),
});
let persisting_batch = Arc::new(PersistingBatch {
sequencer_id,
table_id,
partition_id,
object_store_id: Uuid::new_v4(),
data: Arc::new(queryable_batch),
});
self.persisting = Some(Arc::clone(&persisting_batch));
self.persisting = Some(Arc::clone(&persisting_batch));
persisting_batch
Some(persisting_batch)
} else {
None
}
}
/// Add a persiting batch into the buffer persisting list
@ -1990,12 +2003,14 @@ mod tests {
// ------------------------------------------
// Persisting
let p_batch = p.snapshot_to_persisting_batch(
SequencerId::new(s_id),
TableId::new(t_id),
PartitionId::new(p_id),
table_name,
);
let p_batch = p
.snapshot_to_persisting_batch(
SequencerId::new(s_id),
TableId::new(t_id),
PartitionId::new(p_id),
table_name,
)
.unwrap();
// verify data
assert!(p.data.buffer.is_none()); // always empty after issuing persit