From 59f65564830cac70944feb965264798c0054a9ff Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 27 Apr 2022 19:52:22 +0200 Subject: [PATCH] fix: do not create empty batches in ingester (#4443) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- ingester/src/data.rs | 75 ++++++++++++++++++++++++++------------------ 1 file changed, 45 insertions(+), 30 deletions(-) diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 24f5090b88..2f9d1efe4f 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -645,7 +645,7 @@ impl NamespaceData { return table_data .partition_data .get_mut(&partition_info.partition.partition_key) - .map(|partition_data| { + .and_then(|partition_data| { partition_data.snapshot_to_persisting_batch( partition_info.partition.sequencer_id, partition_info.partition.table_id, @@ -958,7 +958,7 @@ impl PartitionData { table_id: TableId, partition_id: PartitionId, table_name: &str, - ) -> Arc { + ) -> Option> { self.data .snapshot_to_persisting(sequencer_id, table_id, partition_id, table_name) } @@ -1027,13 +1027,16 @@ impl PartitionData { // First apply the tombstone on all in-memeory & non-persisting data // Make a QueryableBatch for all buffer + snapshots + the given tombstone let max_sequencer_number = tombstone.sequence_number; - let query_batch = self + let query_batch = match self .data - .snapshot_to_queryable_batch(table_name, Some(tombstone.clone())); - if query_batch.is_empty() { - //No need to procedd further - return; - } + .snapshot_to_queryable_batch(table_name, Some(tombstone.clone())) + { + Some(query_batch) if !query_batch.is_empty() => query_batch, + _ => { + // No need to procedd further + return; + } + }; let (min_sequencer_number, _) = query_batch.min_max_sequence_numbers(); assert!(min_sequencer_number <= max_sequencer_number); @@ -1201,7 +1204,7 @@ impl DataBuffer { &mut self, table_name: &str, tombstone: Option, - ) -> QueryableBatch { + ) -> Option { self.snapshot() .expect("This mutable batch snapshot error should be impossible."); @@ -1212,7 +1215,13 @@ impl DataBuffer { if let Some(tombstone) = tombstone { tombstones.push(tombstone); } - QueryableBatch::new(table_name, data, tombstones) + + // only produce batch if there is any data + if data.is_empty() { + None + } else { + Some(QueryableBatch::new(table_name, data, tombstones)) + } } /// Returns all existing snapshots plus data in the buffer @@ -1230,32 +1239,36 @@ impl DataBuffer { Ok(snapshots) } - /// Snapshots the buffer and moves snapshots over to the `PersistingBatch`. Returns error - /// if there is already a persisting batch. + /// Snapshots the buffer and moves snapshots over to the `PersistingBatch`. + /// + /// # Panic + /// Panics if there is already a persisting batch. pub fn snapshot_to_persisting( &mut self, sequencer_id: SequencerId, table_id: TableId, partition_id: PartitionId, table_name: &str, - ) -> Arc { + ) -> Option> { if self.persisting.is_some() { panic!("Unable to snapshot while persisting. This is an unexpected state.") } - let queryable_batch = self.snapshot_to_queryable_batch(table_name, None); + if let Some(queryable_batch) = self.snapshot_to_queryable_batch(table_name, None) { + let persisting_batch = Arc::new(PersistingBatch { + sequencer_id, + table_id, + partition_id, + object_store_id: Uuid::new_v4(), + data: Arc::new(queryable_batch), + }); - let persisting_batch = Arc::new(PersistingBatch { - sequencer_id, - table_id, - partition_id, - object_store_id: Uuid::new_v4(), - data: Arc::new(queryable_batch), - }); + self.persisting = Some(Arc::clone(&persisting_batch)); - self.persisting = Some(Arc::clone(&persisting_batch)); - - persisting_batch + Some(persisting_batch) + } else { + None + } } /// Add a persiting batch into the buffer persisting list @@ -1990,12 +2003,14 @@ mod tests { // ------------------------------------------ // Persisting - let p_batch = p.snapshot_to_persisting_batch( - SequencerId::new(s_id), - TableId::new(t_id), - PartitionId::new(p_id), - table_name, - ); + let p_batch = p + .snapshot_to_persisting_batch( + SequencerId::new(s_id), + TableId::new(t_id), + PartitionId::new(p_id), + table_name, + ) + .unwrap(); // verify data assert!(p.data.buffer.is_none()); // always empty after issuing persit