diff --git a/ingester/src/compact.rs b/ingester/src/compact.rs
index c37db0a960..2b26d97989 100644
--- a/ingester/src/compact.rs
+++ b/ingester/src/compact.rs
@@ -1,7 +1,6 @@
 //! This module is responsible for compacting Ingester's data
 
 use crate::data::{PersistingBatch, QueryableBatch};
-use arrow::record_batch::RecordBatch;
 use data_types::{NamespaceId, PartitionInfo};
 use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
 use iox_catalog::interface::INITIAL_COMPACTION_LEVEL;
@@ -30,9 +29,6 @@ pub enum Error {
     #[snafu(display("Error while executing Ingester's compaction"))]
     ExecutePlan { source: DataFusionError },
 
-    #[snafu(display("Error while collecting Ingester's compaction data into a Record Batch"))]
-    CollectStream { source: DataFusionError },
-
     #[snafu(display("Error while building delete predicate from start time, {}, stop time, {}, and serialized predicate, {}", min, max, predicate))]
     DeletePredicate {
         source: predicate::delete_predicate::Error,
@@ -51,15 +47,17 @@ pub enum Error {
 /// A specialized `Error` for Ingester's Compact errors
 pub type Result<T, E = Error> = std::result::Result<T, E>;
 
-/// Compact a given persisting batch
-/// Return compacted data with its metadata
+/// Compact a given persisting batch, returning a stream of compacted
+/// [`RecordBatch`] and the associated [`IoxMetadata`].
+///
+/// [`RecordBatch`]: arrow::record_batch::RecordBatch
 pub async fn compact_persisting_batch(
     time_provider: Arc<dyn TimeProvider>,
     executor: &Executor,
     namespace_id: i64,
     partition_info: &PartitionInfo,
     batch: Arc<PersistingBatch>,
-) -> Result<Option<(Vec<RecordBatch>, IoxMetadata, Option<SortKey>)>> {
+) -> Result<Option<(SendableRecordBatchStream, IoxMetadata, Option<SortKey>)>> {
     // Nothing to compact
     if batch.data.data.is_empty() {
         return Ok(None);
@@ -93,16 +91,6 @@ pub async fn compact_persisting_batch(
     // Compact
     let stream = compact(executor, Arc::clone(&batch.data), metadata_sort_key.clone()).await?;
 
-    // Collect compacted data into record batches for computing statistics
-    let output_batches = datafusion::physical_plan::common::collect(stream)
-        .await
-        .context(CollectStreamSnafu {})?;
-
-    // Filter empty record batches
-    let output_batches: Vec<_> = output_batches
-        .into_iter()
-        .filter(|b| b.num_rows() != 0)
-        .collect();
 
     // Compute min and max sequence numbers
     let (min_seq, max_seq) = batch.data.min_max_sequence_numbers();
@@ -123,7 +111,7 @@ pub async fn compact_persisting_batch(
         sort_key: Some(metadata_sort_key),
     };
 
-    Ok(Some((output_batches, meta, sort_key_update)))
+    Ok(Some((stream, meta, sort_key_update)))
 }
 
 /// Compact a given Queryable Batch
@@ -227,12 +215,16 @@ mod tests {
             },
         };
 
-        let (output_batches, _, _) =
+        let (stream, _, _) =
            compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
                 .await
                 .unwrap()
                 .unwrap();
 
+        let output_batches = datafusion::physical_plan::common::collect(stream)
+            .await
+            .expect("should execute plan");
+
         // verify compacted data
         // should be the same as the input but sorted on tag1 & time
         let expected_data = vec![
@@ -293,12 +285,16 @@ mod tests {
             },
         };
 
-        let (output_batches, meta, sort_key_update) =
+        let (stream, meta, sort_key_update) =
            compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
                 .await
                 .unwrap()
                 .unwrap();
 
+        let output_batches = datafusion::physical_plan::common::collect(stream)
+            .await
+            .expect("should execute plan");
+
         // verify compacted data
         // should be the same as the input but sorted on tag1 & time
         let expected_data = vec![
@@ -384,12 +380,16 @@ mod tests {
             },
         };
 
-        let (output_batches, meta, sort_key_update) =
+        let (stream, meta, sort_key_update) =
            compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
                 .await
                 .unwrap()
                 .unwrap();
 
+        let output_batches = datafusion::physical_plan::common::collect(stream)
+            .await
+            .expect("should execute plan");
+
         // verify compacted data
         // should be the same as the input but sorted on the computed sort key of tag1, tag3, & time
         let expected_data = vec![
@@ -478,12 +478,16 @@ mod tests {
             },
         };
 
-        let (output_batches, meta, sort_key_update) =
+        let (stream, meta, sort_key_update) =
            compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
                 .await
                 .unwrap()
                 .unwrap();
 
+        let output_batches = datafusion::physical_plan::common::collect(stream)
+            .await
+            .expect("should execute plan");
+
         // verify compacted data
         // should be the same as the input but sorted on the specified sort key of tag3, tag1, &
         // time
@@ -573,12 +577,16 @@ mod tests {
             },
         };
 
-        let (output_batches, meta, sort_key_update) =
+        let (stream, meta, sort_key_update) =
            compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
                 .await
                 .unwrap()
                 .unwrap();
 
+        let output_batches = datafusion::physical_plan::common::collect(stream)
+            .await
+            .expect("should execute plan");
+
         // verify compacted data
         // should be the same as the input but sorted on the specified sort key of tag3, tag1, &
         // time
@@ -671,12 +679,16 @@ mod tests {
             },
         };
 
-        let (output_batches, meta, sort_key_update) =
+        let (stream, meta, sort_key_update) =
            compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
                 .await
                 .unwrap()
                 .unwrap();
 
+        let output_batches = datafusion::physical_plan::common::collect(stream)
+            .await
+            .expect("should execute plan");
+
         // verify compacted data
         // should be the same as the input but sorted on the specified sort key of tag3, tag1, &
         // time
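Note on the API change above: `compact_persisting_batch` now hands back the `SendableRecordBatchStream` directly instead of eagerly collecting (and empty-filtering) a `Vec<RecordBatch>`. Callers that still need materialized batches, such as the updated tests, drain the stream themselves. A minimal sketch of that consumer-side pattern, using the same DataFusion `collect` helper the tests call; the empty-batch filter is an assumption about preserving the old behaviour, not something the tests do:

```rust
use arrow::record_batch::RecordBatch;
use datafusion::physical_plan::{common, SendableRecordBatchStream};

/// Buffer a compaction stream into memory, dropping empty batches.
///
/// Collecting like this is fine for tests, but it re-buffers everything and
/// so defeats the point of the streaming change on production paths.
async fn drain_stream(stream: SendableRecordBatchStream) -> Vec<RecordBatch> {
    common::collect(stream)
        .await
        .expect("should execute plan")
        .into_iter()
        // The pre-change code filtered empty batches before persisting; with
        // a stream, any such filtering moves to the consumer.
        .filter(|b| b.num_rows() != 0)
        .collect()
}
```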
diff --git a/ingester/src/data.rs b/ingester/src/data.rs
index 38d16f2359..d6437af2a9 100644
--- a/ingester/src/data.rs
+++ b/ingester/src/data.rs
@@ -4,7 +4,6 @@ use crate::{
     compact::compact_persisting_batch,
     lifecycle::LifecycleHandle,
     partioning::{Partitioner, PartitionerError},
-    persist::persist,
     querier_handler::query,
 };
 use arrow::record_batch::RecordBatch;
@@ -279,7 +278,7 @@ impl Persister for IngesterData {
 
         if let Some(persisting_batch) = persisting_batch {
             // do the CPU intensive work of compaction, de-duplication and sorting
-            let (record_batches, iox_meta, sort_key_update) = match compact_persisting_batch(
+            let (record_stream, iox_meta, sort_key_update) = match compact_persisting_batch(
                 Arc::new(SystemProvider::new()),
                 &self.exec,
                 namespace.namespace_id.get(),
@@ -300,31 +299,30 @@ impl Persister for IngesterData {
                 }
             };
 
-            // save the compacted data to a parquet file in object storage
-            let file_size_and_md = Backoff::new(&self.backoff_config)
-                .retry_all_errors("persist to object store", || {
-                    persist(&iox_meta, record_batches.to_vec(), self.store.clone())
+            // Save the compacted data to a parquet file in object storage.
+            //
+            // This call retries until it completes.
+            let (md, file_size) = self
+                .store
+                .upload(record_stream, &iox_meta)
+                .await
+                .expect("unexpected fatal persist error");
+
+            // Add the parquet file to the catalog until it succeeds
+            let parquet_file = iox_meta.to_parquet_file(partition_id, file_size, &md);
+            Backoff::new(&self.backoff_config)
+                .retry_all_errors("add parquet file to catalog", || async {
+                    let mut repos = self.catalog.repositories().await;
+                    debug!(
+                        table_name=%iox_meta.table_name,
+                        "adding parquet file to catalog"
+                    );
+
+                    repos.parquet_files().create(parquet_file.clone()).await
                 })
                 .await
                 .expect("retry forever");
 
-            if let Some((file_size, md)) = file_size_and_md {
-                // Add the parquet file to the catalog until succeed
-                let parquet_file = iox_meta.to_parquet_file(partition_id, file_size, &md);
-                Backoff::new(&self.backoff_config)
-                    .retry_all_errors("add parquet file to catalog", || async {
-                        let mut repos = self.catalog.repositories().await;
-                        debug!(
-                            table_name=%iox_meta.table_name,
-                            "adding parquet file to catalog"
-                        );
-
-                        repos.parquet_files().create(parquet_file.clone()).await
-                    })
-                    .await
-                    .expect("retry forever");
-            }
-
             // Update the sort key in the catalog if there are additional columns
             if let Some(new_sort_key) = sort_key_update {
                 let sort_key_string = new_sort_key.to_columns();
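The persist path above now splits its error handling: `ParquetStorage::upload` retries transparently until it completes (hence the `expect` on a fatal error), while only the catalog insert remains wrapped in `Backoff::retry_all_errors`. For readers unfamiliar with the `backoff` crate used here, a dependency-free sketch of the "retry forever on all errors" semantics; the real crate adds exponential backoff with jitter configured via `BackoffConfig`, and the names below are illustrative:

```rust
use std::{fmt::Debug, future::Future, time::Duration};

/// Retry `op` until it returns `Ok`, sleeping between attempts.
///
/// A simplified model of the `Backoff::retry_all_errors` behaviour used in
/// the diff above; the real implementation grows the delay with jitter.
async fn retry_forever<T, E, Fut>(task: &str, mut op: impl FnMut() -> Fut) -> T
where
    Fut: Future<Output = Result<T, E>>,
    E: Debug,
{
    let mut delay = Duration::from_millis(100);
    loop {
        match op().await {
            Ok(v) => return v,
            Err(e) => {
                eprintln!("{task} failed, retrying in {delay:?}: {e:?}");
                tokio::time::sleep(delay).await;
                delay = (delay * 2).min(Duration::from_secs(30)); // capped growth
            }
        }
    }
}
```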
diff --git a/ingester/src/lib.rs b/ingester/src/lib.rs
index a4534839d9..bdfc7d919f 100644
--- a/ingester/src/lib.rs
+++ b/ingester/src/lib.rs
@@ -19,7 +19,6 @@ pub mod handler;
 mod job;
 pub mod lifecycle;
 pub mod partioning;
-pub mod persist;
 mod poison;
 pub mod querier_handler;
 pub mod query;
diff --git a/ingester/src/persist.rs b/ingester/src/persist.rs
deleted file mode 100644
index bfc60e7dbe..0000000000
--- a/ingester/src/persist.rs
+++ /dev/null
@@ -1,138 +0,0 @@
-//! Persist compacted data to parquet files in object storage
-
-use arrow::record_batch::RecordBatch;
-use futures::StreamExt;
-use parquet_file::{
-    metadata::{IoxMetadata, IoxParquetMetaData},
-    storage::ParquetStorage,
-};
-use snafu::{ResultExt, Snafu};
-
-#[derive(Debug, Snafu)]
-#[allow(missing_docs)]
-pub enum Error {
-    #[snafu(display("Could not serialize and persist record batches {}", source))]
-    Persist {
-        source: parquet_file::storage::UploadError,
-    },
-}
-
-/// A specialized `Error` for Ingester's persistence errors
-pub type Result<T, E = Error> = std::result::Result<T, E>;
-
-/// Write the given data to the given location in the given object storage.
-///
-/// Returns the persisted file size (in bytes) and metadata if a file was created.
-pub async fn persist(
-    metadata: &IoxMetadata,
-    record_batches: Vec<RecordBatch>,
-    store: ParquetStorage,
-) -> Result<Option<(usize, IoxParquetMetaData)>> {
-    if record_batches.is_empty() {
-        return Ok(None);
-    }
-
-    // TODO(4324): Yield the buffered RecordBatch instances as a stream as a
-    // temporary measure until streaming compaction is complete.
-    let stream = futures::stream::iter(record_batches).map(Ok);
-
-    let (meta, file_size) = store.upload(stream, metadata).await.context(PersistSnafu)?;
-
-    Ok(Some((file_size, meta)))
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use data_types::{NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId};
-    use futures::{StreamExt, TryStreamExt};
-    use iox_catalog::interface::INITIAL_COMPACTION_LEVEL;
-    use iox_query::test::{raw_data, TestChunk};
-    use iox_time::Time;
-    use object_store::{memory::InMemory, DynObjectStore};
-    use std::sync::Arc;
-    use uuid::Uuid;
-
-    fn now() -> Time {
-        Time::from_timestamp(0, 0)
-    }
-
-    fn object_store() -> Arc<DynObjectStore> {
-        Arc::new(InMemory::new())
-    }
-
-    #[tokio::test]
-    async fn empty_list_writes_nothing() {
-        let metadata = IoxMetadata {
-            object_store_id: Uuid::new_v4(),
-            creation_timestamp: now(),
-            namespace_id: NamespaceId::new(1),
-            namespace_name: "mydata".into(),
-            sequencer_id: SequencerId::new(2),
-            table_id: TableId::new(3),
-            table_name: "temperature".into(),
-            partition_id: PartitionId::new(4),
-            partition_key: "somehour".into(),
-            min_sequence_number: SequenceNumber::new(5),
-            max_sequence_number: SequenceNumber::new(6),
-            compaction_level: INITIAL_COMPACTION_LEVEL,
-            sort_key: None,
-        };
-        let object_store = object_store();
-
-        persist(
-            &metadata,
-            vec![],
-            ParquetStorage::new(Arc::clone(&object_store)),
-        )
-        .await
-        .unwrap();
-
-        let mut list = object_store.list(None).await.unwrap();
-        assert!(list.next().await.is_none());
-    }
-
-    #[tokio::test]
-    async fn list_with_batches_writes_to_object_store() {
-        let metadata = IoxMetadata {
-            object_store_id: Uuid::new_v4(),
-            creation_timestamp: now(),
-            namespace_id: NamespaceId::new(1),
-            namespace_name: "mydata".into(),
-            sequencer_id: SequencerId::new(2),
-            table_id: TableId::new(3),
-            table_name: "temperature".into(),
-            partition_id: PartitionId::new(4),
-            partition_key: "somehour".into(),
-            min_sequence_number: SequenceNumber::new(5),
-            max_sequence_number: SequenceNumber::new(6),
-            compaction_level: INITIAL_COMPACTION_LEVEL,
-            sort_key: None,
-        };
-
-        let chunk1 = Arc::new(
-            TestChunk::new("t")
-                .with_id(1)
-                .with_time_column() //_with_full_stats(
-                .with_tag_column("tag1")
-                .with_i64_field_column("field_int")
-                .with_three_rows_of_data(),
-        );
-        let batches = raw_data(&[chunk1]).await;
-        assert_eq!(batches.len(), 1);
-
-        let object_store = object_store();
-
-        persist(
-            &metadata,
-            batches,
-            ParquetStorage::new(Arc::clone(&object_store)),
-        )
-        .await
-        .unwrap();
-
-        let list = object_store.list(None).await.unwrap();
-        let obj_store_paths: Vec<_> = list.try_collect().await.unwrap();
-        assert_eq!(obj_store_paths.len(), 1);
-    }
-}
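With streaming compaction in place, the deleted module's only real logic, adapting already-buffered batches into the fallible stream shape that `ParquetStorage::upload` consumes (the `TODO(4324)` block above), is no longer needed on this path. For reference, that adapter as a standalone sketch; pinning the error type to `DataFusionError` is an assumption, since the deleted code left it to type inference:

```rust
use arrow::record_batch::RecordBatch;
use datafusion::error::DataFusionError;
use futures::{stream, Stream, StreamExt};

/// Turn buffered batches into a fallible stream, as the deleted `persist()`
/// did before handing them to `ParquetStorage::upload`.
fn batches_to_stream(
    batches: Vec<RecordBatch>,
) -> impl Stream<Item = Result<RecordBatch, DataFusionError>> {
    stream::iter(batches).map(Ok)
}
```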