perf: streaming compaction in ingester

Reduces memory usage in the ingester during persist operations by
streaming the results of the snapshot merge/sort/dedupe directly to
the parquet file.

Prior to this commit the output of the compaction was buffered in memory
before being written to the parquet file.
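
To make the change concrete, here is a minimal, self-contained sketch (not
the IOx API; `Batch`, `persist_buffered` and `persist_streaming` are
hypothetical stand-ins, with only `futures` and `tokio` assumed as
dependencies) contrasting the old buffer-then-write path with the new
streaming path:

// Sketch only: `Batch` stands in for `arrow::record_batch::RecordBatch`.
use futures::{stream, Stream, StreamExt};

struct Batch(Vec<u8>);

/// Old path: buffer every batch in memory, then hand the Vec to the writer.
/// Peak memory is proportional to the whole compacted partition.
async fn persist_buffered(batches: impl Stream<Item = Batch>) -> usize {
    let all: Vec<Batch> = batches.collect().await;
    all.iter().map(|b| b.0.len()).sum()
}

/// New path: pull batches off the stream one at a time and hand each to the
/// writer, so only a single batch is resident at once.
async fn persist_streaming(mut batches: impl Stream<Item = Batch> + Unpin) -> usize {
    let mut written = 0;
    while let Some(batch) = batches.next().await {
        // In the ingester this is the parquet encoder consuming the batch.
        written += batch.0.len();
    }
    written
}

#[tokio::main]
async fn main() {
    let make_input = || stream::iter((0..4usize).map(|i| Batch(vec![0; 1024 * (i + 1)])));
    // Both paths write the same bytes; only the peak memory profile differs.
    assert_eq!(persist_buffered(make_input()).await, persist_streaming(make_input()).await);
}

In the real code the stream is the SendableRecordBatchStream returned by
compact() and the consumer is the ParquetStorage upload in the persist path.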
pull/24376/head
Dom Dwyer 2022-06-07 11:45:29 +01:00
parent 20e832a5d1
commit 1fc5596023
4 changed files with 57 additions and 186 deletions


@@ -1,7 +1,6 @@
//! This module is responsible for compacting Ingester's data
use crate::data::{PersistingBatch, QueryableBatch};
use arrow::record_batch::RecordBatch;
use data_types::{NamespaceId, PartitionInfo};
use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
use iox_catalog::interface::INITIAL_COMPACTION_LEVEL;
@@ -30,9 +29,6 @@ pub enum Error {
#[snafu(display("Error while executing Ingester's compaction"))]
ExecutePlan { source: DataFusionError },
#[snafu(display("Error while collecting Ingester's compaction data into a Record Batch"))]
CollectStream { source: DataFusionError },
#[snafu(display("Error while building delete predicate from start time, {}, stop time, {}, and serialized predicate, {}", min, max, predicate))]
DeletePredicate {
source: predicate::delete_predicate::Error,
@@ -51,15 +47,17 @@ pub enum Error {
/// A specialized `Error` for Ingester's Compact errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Compact a given persisting batch
/// Return compacted data with its metadata
/// Compact a given persisting batch, returning a stream of compacted
/// [`RecordBatch`] and the associated [`IoxMetadata`].
///
/// [`RecordBatch`]: arrow::record_batch::RecordBatch
pub async fn compact_persisting_batch(
time_provider: Arc<dyn TimeProvider>,
executor: &Executor,
namespace_id: i64,
partition_info: &PartitionInfo,
batch: Arc<PersistingBatch>,
) -> Result<Option<(Vec<RecordBatch>, IoxMetadata, Option<SortKey>)>> {
) -> Result<Option<(SendableRecordBatchStream, IoxMetadata, Option<SortKey>)>> {
// Nothing to compact
if batch.data.data.is_empty() {
return Ok(None);
@@ -93,16 +91,6 @@ pub async fn compact_persisting_batch(
// Compact
let stream = compact(executor, Arc::clone(&batch.data), metadata_sort_key.clone()).await?;
// Collect compacted data into record batches for computing statistics
let output_batches = datafusion::physical_plan::common::collect(stream)
.await
.context(CollectStreamSnafu {})?;
// Filter empty record batches
let output_batches: Vec<_> = output_batches
.into_iter()
.filter(|b| b.num_rows() != 0)
.collect();
// Compute min and max sequence numbers
let (min_seq, max_seq) = batch.data.min_max_sequence_numbers();
@@ -123,7 +111,7 @@ pub async fn compact_persisting_batch(
sort_key: Some(metadata_sort_key),
};
Ok(Some((output_batches, meta, sort_key_update)))
Ok(Some((stream, meta, sort_key_update)))
}
/// Compact a given Queryable Batch
@@ -227,12 +215,16 @@ mod tests {
},
};
let (output_batches, _, _) =
let (stream, _, _) =
compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
.await
.unwrap()
.unwrap();
let output_batches = datafusion::physical_plan::common::collect(stream)
.await
.expect("should execute plan");
// verify compacted data
// should be the same as the input but sorted on tag1 & time
let expected_data = vec![
@@ -293,12 +285,16 @@ mod tests {
},
};
let (output_batches, meta, sort_key_update) =
let (stream, meta, sort_key_update) =
compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
.await
.unwrap()
.unwrap();
let output_batches = datafusion::physical_plan::common::collect(stream)
.await
.expect("should execute plan");
// verify compacted data
// should be the same as the input but sorted on tag1 & time
let expected_data = vec![
@@ -384,12 +380,16 @@ mod tests {
},
};
let (output_batches, meta, sort_key_update) =
let (stream, meta, sort_key_update) =
compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
.await
.unwrap()
.unwrap();
let output_batches = datafusion::physical_plan::common::collect(stream)
.await
.expect("should execute plan");
// verify compacted data
// should be the same as the input but sorted on the computed sort key of tag1, tag3, & time
let expected_data = vec![
@@ -478,12 +478,16 @@ mod tests {
},
};
let (output_batches, meta, sort_key_update) =
let (stream, meta, sort_key_update) =
compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
.await
.unwrap()
.unwrap();
let output_batches = datafusion::physical_plan::common::collect(stream)
.await
.expect("should execute plan");
// verify compacted data
// should be the same as the input but sorted on the specified sort key of tag3, tag1, &
// time
@@ -573,12 +577,16 @@ mod tests {
},
};
let (output_batches, meta, sort_key_update) =
let (stream, meta, sort_key_update) =
compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
.await
.unwrap()
.unwrap();
let output_batches = datafusion::physical_plan::common::collect(stream)
.await
.expect("should execute plan");
// verify compacted data
// should be the same as the input but sorted on the specified sort key of tag3, tag1, &
// time
@@ -671,12 +679,16 @@ mod tests {
},
};
let (output_batches, meta, sort_key_update) =
let (stream, meta, sort_key_update) =
compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
.await
.unwrap()
.unwrap();
let output_batches = datafusion::physical_plan::common::collect(stream)
.await
.expect("should execute plan");
// verify compacted data
// should be the same as the input but sorted on the specified sort key of tag3, tag1, &
// time


@@ -4,7 +4,6 @@ use crate::{
compact::compact_persisting_batch,
lifecycle::LifecycleHandle,
partioning::{Partitioner, PartitionerError},
persist::persist,
querier_handler::query,
};
use arrow::record_batch::RecordBatch;
@@ -279,7 +278,7 @@ impl Persister for IngesterData {
if let Some(persisting_batch) = persisting_batch {
// do the CPU intensive work of compaction, de-duplication and sorting
let (record_batches, iox_meta, sort_key_update) = match compact_persisting_batch(
let (record_stream, iox_meta, sort_key_update) = match compact_persisting_batch(
Arc::new(SystemProvider::new()),
&self.exec,
namespace.namespace_id.get(),
@@ -300,31 +299,30 @@ impl Persister for IngesterData {
}
};
// save the compacted data to a parquet file in object storage
let file_size_and_md = Backoff::new(&self.backoff_config)
.retry_all_errors("persist to object store", || {
persist(&iox_meta, record_batches.to_vec(), self.store.clone())
// Save the compacted data to a parquet file in object storage.
//
// This call retries until it completes.
let (md, file_size) = self
.store
.upload(record_stream, &iox_meta)
.await
.expect("unexpected fatal persist error");
// Add the parquet file to the catalog until succeed
let parquet_file = iox_meta.to_parquet_file(partition_id, file_size, &md);
Backoff::new(&self.backoff_config)
.retry_all_errors("add parquet file to catalog", || async {
let mut repos = self.catalog.repositories().await;
debug!(
table_name=%iox_meta.table_name,
"adding parquet file to catalog"
);
repos.parquet_files().create(parquet_file.clone()).await
})
.await
.expect("retry forever");
if let Some((file_size, md)) = file_size_and_md {
// Add the parquet file to the catalog until succeed
let parquet_file = iox_meta.to_parquet_file(partition_id, file_size, &md);
Backoff::new(&self.backoff_config)
.retry_all_errors("add parquet file to catalog", || async {
let mut repos = self.catalog.repositories().await;
debug!(
table_name=%iox_meta.table_name,
"adding parquet file to catalog"
);
repos.parquet_files().create(parquet_file.clone()).await
})
.await
.expect("retry forever");
}
// Update the sort key in the catalog if there are additional columns
if let Some(new_sort_key) = sort_key_update {
let sort_key_string = new_sort_key.to_columns();


@@ -19,7 +19,6 @@ pub mod handler;
mod job;
pub mod lifecycle;
pub mod partioning;
pub mod persist;
mod poison;
pub mod querier_handler;
pub mod query;


@@ -1,138 +0,0 @@
//! Persist compacted data to parquet files in object storage
use arrow::record_batch::RecordBatch;
use futures::StreamExt;
use parquet_file::{
metadata::{IoxMetadata, IoxParquetMetaData},
storage::ParquetStorage,
};
use snafu::{ResultExt, Snafu};
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub enum Error {
#[snafu(display("Could not serialize and persist record batches {}", source))]
Persist {
source: parquet_file::storage::UploadError,
},
}
/// A specialized `Error` for Ingester's persistence errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Write the given data to the given location in the given object storage.
///
/// Returns the persisted file size (in bytes) and metadata if a file was created.
pub async fn persist(
metadata: &IoxMetadata,
record_batches: Vec<RecordBatch>,
store: ParquetStorage,
) -> Result<Option<(usize, IoxParquetMetaData)>> {
if record_batches.is_empty() {
return Ok(None);
}
// TODO(4324): Yield the buffered RecordBatch instances as a stream as a
// temporary measure until streaming compaction is complete.
let stream = futures::stream::iter(record_batches).map(Ok);
let (meta, file_size) = store.upload(stream, metadata).await.context(PersistSnafu)?;
Ok(Some((file_size, meta)))
}
#[cfg(test)]
mod tests {
use super::*;
use data_types::{NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId};
use futures::{StreamExt, TryStreamExt};
use iox_catalog::interface::INITIAL_COMPACTION_LEVEL;
use iox_query::test::{raw_data, TestChunk};
use iox_time::Time;
use object_store::{memory::InMemory, DynObjectStore};
use std::sync::Arc;
use uuid::Uuid;
fn now() -> Time {
Time::from_timestamp(0, 0)
}
fn object_store() -> Arc<DynObjectStore> {
Arc::new(InMemory::new())
}
#[tokio::test]
async fn empty_list_writes_nothing() {
let metadata = IoxMetadata {
object_store_id: Uuid::new_v4(),
creation_timestamp: now(),
namespace_id: NamespaceId::new(1),
namespace_name: "mydata".into(),
sequencer_id: SequencerId::new(2),
table_id: TableId::new(3),
table_name: "temperature".into(),
partition_id: PartitionId::new(4),
partition_key: "somehour".into(),
min_sequence_number: SequenceNumber::new(5),
max_sequence_number: SequenceNumber::new(6),
compaction_level: INITIAL_COMPACTION_LEVEL,
sort_key: None,
};
let object_store = object_store();
persist(
&metadata,
vec![],
ParquetStorage::new(Arc::clone(&object_store)),
)
.await
.unwrap();
let mut list = object_store.list(None).await.unwrap();
assert!(list.next().await.is_none());
}
#[tokio::test]
async fn list_with_batches_writes_to_object_store() {
let metadata = IoxMetadata {
object_store_id: Uuid::new_v4(),
creation_timestamp: now(),
namespace_id: NamespaceId::new(1),
namespace_name: "mydata".into(),
sequencer_id: SequencerId::new(2),
table_id: TableId::new(3),
table_name: "temperature".into(),
partition_id: PartitionId::new(4),
partition_key: "somehour".into(),
min_sequence_number: SequenceNumber::new(5),
max_sequence_number: SequenceNumber::new(6),
compaction_level: INITIAL_COMPACTION_LEVEL,
sort_key: None,
};
let chunk1 = Arc::new(
TestChunk::new("t")
.with_id(1)
.with_time_column() //_with_full_stats(
.with_tag_column("tag1")
.with_i64_field_column("field_int")
.with_three_rows_of_data(),
);
let batches = raw_data(&[chunk1]).await;
assert_eq!(batches.len(), 1);
let object_store = object_store();
persist(
&metadata,
batches,
ParquetStorage::new(Arc::clone(&object_store)),
)
.await
.unwrap();
let list = object_store.list(None).await.unwrap();
let obj_store_paths: Vec<_> = list.try_collect().await.unwrap();
assert_eq!(obj_store_paths.len(), 1);
}
}