diff --git a/Cargo.lock b/Cargo.lock index c9b8d8c2b6..38e191c851 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2495,6 +2495,7 @@ dependencies = [ "observability_deps", "once_cell", "parking_lot 0.12.1", + "parquet_file", "paste", "pin-project", "predicate", @@ -2502,6 +2503,7 @@ dependencies = [ "rand", "schema", "service_grpc_catalog", + "sharder", "tempfile", "test_helpers", "thiserror", diff --git a/ingester2/Cargo.toml b/ingester2/Cargo.toml index 312bf3d456..a9a7bc5a84 100644 --- a/ingester2/Cargo.toml +++ b/ingester2/Cargo.toml @@ -31,12 +31,14 @@ object_store = "0.5.1" observability_deps = { version = "0.1.0", path = "../observability_deps" } once_cell = "1.16.0" parking_lot = "0.12.1" +parquet_file = { version = "0.1.0", path = "../parquet_file" } pin-project = "1.0.12" predicate = { version = "0.1.0", path = "../predicate" } prost = { version = "0.11.2", default-features = false, features = ["std"] } rand = "0.8.5" schema = { version = "0.1.0", path = "../schema" } service_grpc_catalog = { version = "0.1.0", path = "../service_grpc_catalog" } +sharder = { version = "0.1.0", path = "../sharder" } thiserror = "1.0.37" tokio = { version = "1.22", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] } tonic = "0.8.3" diff --git a/ingester2/src/buffer_tree/partition.rs b/ingester2/src/buffer_tree/partition.rs index a22bbb32e3..63e5f19b34 100644 --- a/ingester2/src/buffer_tree/partition.rs +++ b/ingester2/src/buffer_tree/partition.rs @@ -15,7 +15,7 @@ use super::{namespace::NamespaceName, table::TableName}; use crate::{deferred_load::DeferredLoad, query_adaptor::QueryAdaptor}; mod buffer; -mod persisting; +pub(crate) mod persisting; pub(crate) mod resolver; /// The load state of the [`SortKey`] for a given partition. diff --git a/ingester2/src/lib.rs b/ingester2/src/lib.rs index 0235aace75..07c662e1b3 100644 --- a/ingester2/src/lib.rs +++ b/ingester2/src/lib.rs @@ -75,6 +75,7 @@ mod arcmap; mod buffer_tree; mod deferred_load; mod dml_sink; +mod persist; mod query; mod query_adaptor; pub(crate) mod server; diff --git a/ingester2/src/persist/actor.rs b/ingester2/src/persist/actor.rs new file mode 100644 index 0000000000..327d4c2450 --- /dev/null +++ b/ingester2/src/persist/actor.rs @@ -0,0 +1,104 @@ +use std::sync::Arc; + +use iox_catalog::interface::Catalog; +use iox_query::exec::Executor; +use parquet_file::storage::ParquetStorage; +use sharder::JumpHash; +use tokio::{sync::mpsc, task::JoinHandle}; + +use super::context::{Context, PersistRequest}; + +/// An actor implementation that fans out incoming persistence jobs to a set of +/// workers. +/// +/// See [`PersistHandle`]. +/// +/// [`PersistHandle`]: super::handle::PersistHandle +#[must_use = "PersistActor must be ran by calling run()"] +pub(crate) struct PersistActor { + rx: mpsc::Receiver, + + /// THe state/dependencies shared across all worker tasks. + inner: Arc, + + /// A consistent hash implementation used to consistently map buffers from + /// one partition to the same worker queue. + /// + /// This ensures persistence is serialised per-partition, but in parallel + /// across partitions (up to the number of worker tasks). + persist_queues: JumpHash>, + + /// Task handles for the worker tasks, aborted on drop of this + /// [`PersistActor`]. + tasks: Vec>, +} + +impl Drop for PersistActor { + fn drop(&mut self) { + // Stop all background tasks when the actor goes out of scope. 
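+        // Note: `JoinHandle::abort()` only requests cancellation; each worker
+        // task stops at its next await point, so any in-flight persist job is
+        // dropped rather than run to completion.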
+ self.tasks.iter().for_each(|v| v.abort()) + } +} + +impl PersistActor { + pub(super) fn new( + rx: mpsc::Receiver, + exec: Executor, + store: ParquetStorage, + catalog: Arc, + workers: usize, + worker_queue_depth: usize, + ) -> Self { + let inner = Arc::new(Inner { + exec, + store, + catalog, + }); + + let (tx_handles, tasks): (Vec<_>, Vec<_>) = (0..workers) + .map(|_| { + let inner = Arc::clone(&inner); + let (tx, rx) = mpsc::channel(worker_queue_depth); + (tx, tokio::spawn(run_task(inner, rx))) + }) + .unzip(); + + // TODO(test): N workers running + // TODO(test): queue depths + + Self { + rx, + inner, + persist_queues: JumpHash::new(tx_handles), + tasks, + } + } + + /// Execute this actor task and block until all [`PersistHandle`] are + /// dropped. + /// + /// [`PersistHandle`]: super::handle::PersistHandle + pub(crate) async fn run(mut self) { + while let Some(req) = self.rx.recv().await { + let tx = self.persist_queues.hash(req.partition_id()); + tx.send(req).await.expect("persist worker has stopped;") + } + } +} + +pub(super) struct Inner { + pub(super) exec: Executor, + pub(super) store: ParquetStorage, + pub(super) catalog: Arc, +} + +async fn run_task(inner: Arc, mut rx: mpsc::Receiver) { + while let Some(req) = rx.recv().await { + let ctx = Context::new(req, Arc::clone(&inner)); + + let compacted = ctx.compact().await; + let (sort_key_update, parquet_table_data) = ctx.upload(compacted).await; + ctx.update_database(sort_key_update, parquet_table_data) + .await; + } +} diff --git a/ingester2/src/persist/compact.rs b/ingester2/src/persist/compact.rs new file mode 100644 index 0000000000..effa1da88a --- /dev/null +++ b/ingester2/src/persist/compact.rs @@ -0,0 +1,1114 @@ +use std::sync::Arc; + +use datafusion::physical_plan::SendableRecordBatchStream; +use iox_query::{ + exec::{Executor, ExecutorType}, + frontend::reorg::ReorgPlanner, + QueryChunk, QueryChunkMeta, +}; +use schema::sort::{adjust_sort_key_columns, compute_sort_key, SortKey}; + +use crate::{buffer_tree::table::TableName, query_adaptor::QueryAdaptor}; + +/// Result of calling [`compact_persisting_batch`] +pub(super) struct CompactedStream { + /// A stream of compacted, deduplicated + /// [`RecordBatch`](arrow::record_batch::RecordBatch)es + pub(super) stream: SendableRecordBatchStream, + + /// The sort key value the catalog should be updated to, if any. + /// + /// If returned, the compaction required extending the partition's + /// [`SortKey`] (typically because new columns were in this parquet file + /// that were not in previous files). + pub(super) catalog_sort_key_update: Option, + + /// The sort key to be used for compaction. + /// + /// This should be used in the [`IoxMetadata`] for the compacted data, and + /// may be a subset of the full sort key contained in + /// [`Self::catalog_sort_key_update`] (or the existing sort key in the + /// catalog). + /// + /// [`IoxMetadata`]: parquet_file::metadata::IoxMetadata + pub(super) data_sort_key: SortKey, +} + +impl std::fmt::Debug for CompactedStream { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CompactedStream") + .field("stream", &"") + .field("data_sort_key", &self.data_sort_key) + .field("catalog_sort_key_update", &self.catalog_sort_key_update) + .finish() + } +} + +/// Compact a given batch into a [`CompactedStream`] or `None` if there is no +/// data to compact, returning an updated sort key, if any. 
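+///
+/// # Panics
+///
+/// Panics if `batch` contains no record batches.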
+pub(super) async fn compact_persisting_batch( + executor: &Executor, + sort_key: Option, + table_name: TableName, + batch: QueryAdaptor, +) -> Result { + assert!(!batch.record_batches().is_empty()); + + // Get sort key from the catalog or compute it from + // cardinality. + let (data_sort_key, catalog_sort_key_update) = match sort_key { + Some(sk) => { + // Remove any columns not present in this data from the + // sort key that will be used to compact this parquet file + // (and appear in its metadata) + // + // If there are any new columns, add them to the end of the sort key in the catalog and + // return that to be updated in the catalog. + adjust_sort_key_columns(&sk, &batch.schema().primary_key()) + } + None => { + let sort_key = compute_sort_key( + batch.schema().as_ref(), + batch.record_batches().iter().map(|sb| sb.as_ref()), + ); + // Use the sort key computed from the cardinality as the sort key for this parquet + // file's metadata, also return the sort key to be stored in the catalog + (sort_key.clone(), Some(sort_key)) + } + }; + + let batch = Arc::new(batch); + + // Build logical plan for compaction + let ctx = executor.new_context(ExecutorType::Reorg); + let logical_plan = ReorgPlanner::new(ctx.child_ctx("ReorgPlanner")) + .compact_plan( + table_name.into(), + batch.schema(), + [batch as Arc], + data_sort_key.clone(), + ) + .unwrap(); + + // Build physical plan + let physical_plan = ctx.create_physical_plan(&logical_plan).await.unwrap(); + + // Execute the plan and return the compacted stream + let output_stream = ctx.execute_stream(physical_plan).await.unwrap(); + + Ok(CompactedStream { + stream: output_stream, + catalog_sort_key_update, + data_sort_key, + }) +} + +#[cfg(test)] +mod tests { + use arrow::record_batch::RecordBatch; + use arrow_util::assert_batches_eq; + use data_types::PartitionId; + use iox_query::test::{raw_data, TestChunk}; + use mutable_batch_lp::lines_to_batches; + use schema::Projection; + + use super::*; + + // this test was added to guard against https://github.com/influxdata/influxdb_iox/issues/3782 + // where if sending in a single row it would compact into an output of two batches, one of + // which was empty, which would cause this to panic. + #[tokio::test] + async fn test_compact_batch_on_one_record_batch_with_one_row() { + // create input data + let batch = lines_to_batches("cpu bar=2 20", 0) + .unwrap() + .get("cpu") + .unwrap() + .to_arrow(Projection::All) + .unwrap(); + + let batch = QueryAdaptor::new(PartitionId::new(1), vec![Arc::new(batch)]); + + // verify PK + let schema = batch.schema(); + let pk = schema.primary_key(); + let expected_pk = vec!["time"]; + assert_eq!(expected_pk, pk); + + // compact + let exc = Executor::new(1); + let CompactedStream { stream, .. 
} = + compact_persisting_batch(&exc, Some(SortKey::empty()), "test_table".into(), batch) + .await + .unwrap(); + + let output_batches = datafusion::physical_plan::common::collect(stream) + .await + .expect("should execute plan"); + + // verify compacted data + // should be the same as the input but sorted on tag1 & time + let expected_data = vec![ + "+-----+--------------------------------+", + "| bar | time |", + "+-----+--------------------------------+", + "| 2 | 1970-01-01T00:00:00.000000020Z |", + "+-----+--------------------------------+", + ]; + assert_batches_eq!(&expected_data, &output_batches); + } + + #[tokio::test] + async fn test_compact_batch_on_one_record_batch_no_dupilcates() { + // create input data + let batch = QueryAdaptor::new( + PartitionId::new(1), + create_one_record_batch_with_influxtype_no_duplicates().await, + ); + + // verify PK + let schema = batch.schema(); + let pk = schema.primary_key(); + let expected_pk = vec!["tag1", "time"]; + assert_eq!(expected_pk, pk); + + // compact + let exc = Executor::new(1); + let CompactedStream { + stream, + data_sort_key, + catalog_sort_key_update, + } = compact_persisting_batch(&exc, Some(SortKey::empty()), "test_table".into(), batch) + .await + .unwrap(); + + let output_batches = datafusion::physical_plan::common::collect(stream) + .await + .expect("should execute plan"); + + // verify compacted data + // should be the same as the input but sorted on tag1 & time + let expected_data = vec![ + "+-----------+------+-----------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+-----------------------------+", + "| 70 | UT | 1970-01-01T00:00:00.000020Z |", + "| 10 | VT | 1970-01-01T00:00:00.000010Z |", + "| 1000 | WA | 1970-01-01T00:00:00.000008Z |", + "+-----------+------+-----------------------------+", + ]; + assert_batches_eq!(&expected_data, &output_batches); + + assert_eq!(data_sort_key, SortKey::from_columns(["tag1", "time"])); + + assert_eq!( + catalog_sort_key_update.unwrap(), + SortKey::from_columns(["tag1", "time"]) + ); + } + + #[tokio::test] + async fn test_compact_batch_no_sort_key() { + // create input data + let batch = QueryAdaptor::new( + PartitionId::new(1), + create_batches_with_influxtype_different_cardinality().await, + ); + + // verify PK + let schema = batch.schema(); + let pk = schema.primary_key(); + let expected_pk = vec!["tag1", "tag3", "time"]; + assert_eq!(expected_pk, pk); + + let exc = Executor::new(1); + + // NO SORT KEY from the catalog here, first persisting batch + let CompactedStream { + stream, + data_sort_key, + catalog_sort_key_update, + } = compact_persisting_batch(&exc, Some(SortKey::empty()), "test_table".into(), batch) + .await + .unwrap(); + + let output_batches = datafusion::physical_plan::common::collect(stream) + .await + .expect("should execute plan"); + + // verify compacted data + // should be the same as the input but sorted on the computed sort key of tag1, tag3, & time + let expected_data = vec![ + "+-----------+------+------+-----------------------------+", + "| field_int | tag1 | tag3 | time |", + "+-----------+------+------+-----------------------------+", + "| 70 | UT | OR | 1970-01-01T00:00:00.000220Z |", + "| 50 | VT | AL | 1970-01-01T00:00:00.000210Z |", + "| 10 | VT | PR | 1970-01-01T00:00:00.000210Z |", + "| 1000 | WA | TX | 1970-01-01T00:00:00.000028Z |", + "+-----------+------+------+-----------------------------+", + ]; + assert_batches_eq!(&expected_data, &output_batches); + + assert_eq!( + data_sort_key, + SortKey::from_columns(["tag1", 
"tag3", "time"]) + ); + + assert_eq!( + catalog_sort_key_update.unwrap(), + SortKey::from_columns(["tag1", "tag3", "time"]) + ); + } + + #[tokio::test] + async fn test_compact_batch_with_specified_sort_key() { + // create input data + let batch = QueryAdaptor::new( + PartitionId::new(1), + create_batches_with_influxtype_different_cardinality().await, + ); + + // verify PK + let schema = batch.schema(); + let pk = schema.primary_key(); + let expected_pk = vec!["tag1", "tag3", "time"]; + assert_eq!(expected_pk, pk); + + let exc = Executor::new(1); + + // SPECIFY A SORT KEY HERE to simulate a sort key being stored in the catalog + // this is NOT what the computed sort key would be based on this data's cardinality + let CompactedStream { + stream, + data_sort_key, + catalog_sort_key_update, + } = compact_persisting_batch( + &exc, + Some(SortKey::from_columns(["tag3", "tag1", "time"])), + "test_table".into(), + batch, + ) + .await + .unwrap(); + + let output_batches = datafusion::physical_plan::common::collect(stream) + .await + .expect("should execute plan"); + + // verify compacted data + // should be the same as the input but sorted on the specified sort key of tag3, tag1, & + // time + let expected_data = vec![ + "+-----------+------+------+-----------------------------+", + "| field_int | tag1 | tag3 | time |", + "+-----------+------+------+-----------------------------+", + "| 50 | VT | AL | 1970-01-01T00:00:00.000210Z |", + "| 70 | UT | OR | 1970-01-01T00:00:00.000220Z |", + "| 10 | VT | PR | 1970-01-01T00:00:00.000210Z |", + "| 1000 | WA | TX | 1970-01-01T00:00:00.000028Z |", + "+-----------+------+------+-----------------------------+", + ]; + assert_batches_eq!(&expected_data, &output_batches); + + assert_eq!( + data_sort_key, + SortKey::from_columns(["tag3", "tag1", "time"]) + ); + + // The sort key does not need to be updated in the catalog + assert!(catalog_sort_key_update.is_none()); + } + + #[tokio::test] + async fn test_compact_batch_new_column_for_sort_key() { + // create input data + let batch = QueryAdaptor::new( + PartitionId::new(1), + create_batches_with_influxtype_different_cardinality().await, + ); + + // verify PK + let schema = batch.schema(); + let pk = schema.primary_key(); + let expected_pk = vec!["tag1", "tag3", "time"]; + assert_eq!(expected_pk, pk); + + let exc = Executor::new(1); + + // SPECIFY A SORT KEY HERE to simulate a sort key being stored in the catalog + // this is NOT what the computed sort key would be based on this data's cardinality + // The new column, tag1, should get added just before the time column + let CompactedStream { + stream, + data_sort_key, + catalog_sort_key_update, + } = compact_persisting_batch( + &exc, + Some(SortKey::from_columns(["tag3", "time"])), + "test_table".into(), + batch, + ) + .await + .unwrap(); + + let output_batches = datafusion::physical_plan::common::collect(stream) + .await + .expect("should execute plan"); + + // verify compacted data + // should be the same as the input but sorted on the specified sort key of tag3, tag1, & + // time + let expected_data = vec![ + "+-----------+------+------+-----------------------------+", + "| field_int | tag1 | tag3 | time |", + "+-----------+------+------+-----------------------------+", + "| 50 | VT | AL | 1970-01-01T00:00:00.000210Z |", + "| 70 | UT | OR | 1970-01-01T00:00:00.000220Z |", + "| 10 | VT | PR | 1970-01-01T00:00:00.000210Z |", + "| 1000 | WA | TX | 1970-01-01T00:00:00.000028Z |", + "+-----------+------+------+-----------------------------+", + ]; + 
assert_batches_eq!(&expected_data, &output_batches); + + assert_eq!( + data_sort_key, + SortKey::from_columns(["tag3", "tag1", "time"]) + ); + + // The sort key in the catalog needs to be updated to include the new column + assert_eq!( + catalog_sort_key_update.unwrap(), + SortKey::from_columns(["tag3", "tag1", "time"]) + ); + } + + #[tokio::test] + async fn test_compact_batch_missing_column_for_sort_key() { + // create input data + let batch = QueryAdaptor::new( + PartitionId::new(1), + create_batches_with_influxtype_different_cardinality().await, + ); + + // verify PK + let schema = batch.schema(); + let pk = schema.primary_key(); + let expected_pk = vec!["tag1", "tag3", "time"]; + assert_eq!(expected_pk, pk); + + let exc = Executor::new(1); + + // SPECIFY A SORT KEY HERE to simulate a sort key being stored in the catalog + // this is NOT what the computed sort key would be based on this data's cardinality + // This contains a sort key, "tag4", that doesn't appear in the data. + let CompactedStream { + stream, + data_sort_key, + catalog_sort_key_update, + } = compact_persisting_batch( + &exc, + Some(SortKey::from_columns(["tag3", "tag1", "tag4", "time"])), + "test_table".into(), + batch, + ) + .await + .unwrap(); + + let output_batches = datafusion::physical_plan::common::collect(stream) + .await + .expect("should execute plan"); + + // verify compacted data + // should be the same as the input but sorted on the specified sort key of tag3, tag1, & + // time + let expected_data = vec![ + "+-----------+------+------+-----------------------------+", + "| field_int | tag1 | tag3 | time |", + "+-----------+------+------+-----------------------------+", + "| 50 | VT | AL | 1970-01-01T00:00:00.000210Z |", + "| 70 | UT | OR | 1970-01-01T00:00:00.000220Z |", + "| 10 | VT | PR | 1970-01-01T00:00:00.000210Z |", + "| 1000 | WA | TX | 1970-01-01T00:00:00.000028Z |", + "+-----------+------+------+-----------------------------+", + ]; + assert_batches_eq!(&expected_data, &output_batches); + + assert_eq!( + data_sort_key, + SortKey::from_columns(["tag3", "tag1", "time"]) + ); + + // The sort key in the catalog should NOT get a new value + assert!(catalog_sort_key_update.is_none()); + } + + #[tokio::test] + async fn test_compact_one_row_batch() { + test_helpers::maybe_start_logging(); + + // create input data + let batch = QueryAdaptor::new( + PartitionId::new(1), + create_one_row_record_batch_with_influxtype().await, + ); + + // verify PK + let schema = batch.schema(); + let pk = schema.primary_key(); + let expected_pk = vec!["tag1", "time"]; + assert_eq!(expected_pk, pk); + + let sort_key = + compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref())); + assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"])); + + // compact + let exc = Executor::new(1); + let stream = compact_persisting_batch(&exc, Some(sort_key), "test_table".into(), batch) + .await + .unwrap(); + let output_batches = datafusion::physical_plan::common::collect(stream.stream) + .await + .unwrap(); + + // verify no empty record batches - bug #3782 + assert_eq!(output_batches.len(), 1); + + // verify compacted data + let expected = vec![ + "+-----------+------+-----------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+-----------------------------+", + "| 1000 | MA | 1970-01-01T00:00:00.000001Z |", + "+-----------+------+-----------------------------+", + ]; + assert_batches_eq!(&expected, &output_batches); + } + + #[tokio::test] + async fn test_compact_one_batch_with_duplicates() { 
+ // create input data + let batch = QueryAdaptor::new( + PartitionId::new(1), + create_one_record_batch_with_influxtype_duplicates().await, + ); + + // verify PK + let schema = batch.schema(); + let pk = schema.primary_key(); + let expected_pk = vec!["tag1", "time"]; + assert_eq!(expected_pk, pk); + + let sort_key = + compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref())); + assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"])); + + // compact + let exc = Executor::new(1); + let stream = compact_persisting_batch(&exc, Some(sort_key), "test_table".into(), batch) + .await + .unwrap(); + let output_batches = datafusion::physical_plan::common::collect(stream.stream) + .await + .unwrap(); + // verify no empty record bacthes - bug #3782 + assert_eq!(output_batches.len(), 2); + assert_eq!(output_batches[0].num_rows(), 6); + assert_eq!(output_batches[1].num_rows(), 1); + + // verify compacted data + // data is sorted and all duplicates are removed + let expected = vec![ + "+-----------+------+--------------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+--------------------------------+", + "| 10 | AL | 1970-01-01T00:00:00.000000050Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000100Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000500Z |", + "| 30 | MT | 1970-01-01T00:00:00.000000005Z |", + "| 1000 | MT | 1970-01-01T00:00:00.000001Z |", + "| 1000 | MT | 1970-01-01T00:00:00.000002Z |", + "| 20 | MT | 1970-01-01T00:00:00.000007Z |", + "+-----------+------+--------------------------------+", + ]; + assert_batches_eq!(&expected, &output_batches); + } + + #[tokio::test] + async fn test_compact_many_batches_same_columns_with_duplicates() { + // create many-batches input data + let batch = QueryAdaptor::new(PartitionId::new(1), create_batches_with_influxtype().await); + + // verify PK + let schema = batch.schema(); + let pk = schema.primary_key(); + let expected_pk = vec!["tag1", "time"]; + assert_eq!(expected_pk, pk); + + let sort_key = + compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref())); + assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"])); + + // compact + let exc = Executor::new(1); + let stream = compact_persisting_batch(&exc, Some(sort_key), "test_table".into(), batch) + .await + .unwrap() + .stream; + let output_batches = datafusion::physical_plan::common::collect(stream) + .await + .unwrap(); + + // verify compacted data + // data is sorted and all duplicates are removed + let expected = vec![ + "+-----------+------+--------------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+--------------------------------+", + "| 100 | AL | 1970-01-01T00:00:00.000000050Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000100Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000500Z |", + "| 30 | MT | 1970-01-01T00:00:00.000000005Z |", + "| 1000 | MT | 1970-01-01T00:00:00.000001Z |", + "| 1000 | MT | 1970-01-01T00:00:00.000002Z |", + "| 5 | MT | 1970-01-01T00:00:00.000005Z |", + "| 10 | MT | 1970-01-01T00:00:00.000007Z |", + "+-----------+------+--------------------------------+", + ]; + assert_batches_eq!(&expected, &output_batches); + } + + #[tokio::test] + async fn test_compact_many_batches_different_columns_with_duplicates() { + // create many-batches input data + let batch = QueryAdaptor::new( + PartitionId::new(1), + create_batches_with_influxtype_different_columns().await, + ); + + // verify PK + let schema = batch.schema(); + let pk = schema.primary_key(); + let expected_pk = vec!["tag1", 
"tag2", "time"]; + assert_eq!(expected_pk, pk); + + let sort_key = + compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref())); + assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"])); + + // compact + let exc = Executor::new(1); + let stream = compact_persisting_batch(&exc, Some(sort_key), "test_table".into(), batch) + .await + .unwrap() + .stream; + let output_batches = datafusion::physical_plan::common::collect(stream) + .await + .unwrap(); + + // verify compacted data + // data is sorted and all duplicates are removed + let expected = vec![ + "+-----------+------------+------+------+--------------------------------+", + "| field_int | field_int2 | tag1 | tag2 | time |", + "+-----------+------------+------+------+--------------------------------+", + "| 10 | | AL | | 1970-01-01T00:00:00.000000050Z |", + "| 100 | 100 | AL | MA | 1970-01-01T00:00:00.000000050Z |", + "| 70 | | CT | | 1970-01-01T00:00:00.000000100Z |", + "| 70 | | CT | | 1970-01-01T00:00:00.000000500Z |", + "| 70 | 70 | CT | CT | 1970-01-01T00:00:00.000000100Z |", + "| 30 | | MT | | 1970-01-01T00:00:00.000000005Z |", + "| 1000 | | MT | | 1970-01-01T00:00:00.000001Z |", + "| 1000 | | MT | | 1970-01-01T00:00:00.000002Z |", + "| 20 | | MT | | 1970-01-01T00:00:00.000007Z |", + "| 5 | 5 | MT | AL | 1970-01-01T00:00:00.000005Z |", + "| 10 | 10 | MT | AL | 1970-01-01T00:00:00.000007Z |", + "| 1000 | 1000 | MT | CT | 1970-01-01T00:00:00.000001Z |", + "+-----------+------------+------+------+--------------------------------+", + ]; + assert_batches_eq!(&expected, &output_batches); + } + + #[tokio::test] + async fn test_compact_many_batches_different_columns_different_order_with_duplicates() { + // create many-batches input data + let batch = QueryAdaptor::new( + PartitionId::new(1), + create_batches_with_influxtype_different_columns_different_order().await, + ); + + // verify PK + let schema = batch.schema(); + let pk = schema.primary_key(); + let expected_pk = vec!["tag1", "tag2", "time"]; + assert_eq!(expected_pk, pk); + + let sort_key = + compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref())); + assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"])); + + // compact + let exc = Executor::new(1); + let stream = compact_persisting_batch(&exc, Some(sort_key), "test_table".into(), batch) + .await + .unwrap() + .stream; + let output_batches = datafusion::physical_plan::common::collect(stream) + .await + .unwrap(); + + // verify compacted data + // data is sorted and all duplicates are removed + // CORRECT RESULT + let expected = vec![ + "+-----------+------+------+--------------------------------+", + "| field_int | tag1 | tag2 | time |", + "+-----------+------+------+--------------------------------+", + "| 5 | | AL | 1970-01-01T00:00:00.000005Z |", + "| 10 | | AL | 1970-01-01T00:00:00.000007Z |", + "| 70 | | CT | 1970-01-01T00:00:00.000000100Z |", + "| 1000 | | CT | 1970-01-01T00:00:00.000001Z |", + "| 100 | | MA | 1970-01-01T00:00:00.000000050Z |", + "| 10 | AL | MA | 1970-01-01T00:00:00.000000050Z |", + "| 70 | CT | CT | 1970-01-01T00:00:00.000000100Z |", + "| 70 | CT | CT | 1970-01-01T00:00:00.000000500Z |", + "| 30 | MT | AL | 1970-01-01T00:00:00.000000005Z |", + "| 20 | MT | AL | 1970-01-01T00:00:00.000007Z |", + "| 1000 | MT | CT | 1970-01-01T00:00:00.000001Z |", + "| 1000 | MT | CT | 1970-01-01T00:00:00.000002Z |", + "+-----------+------+------+--------------------------------+", + ]; + + assert_batches_eq!(&expected, &output_batches); + } + + 
#[tokio::test] + #[should_panic(expected = "Schemas compatible")] + async fn test_compact_many_batches_same_columns_different_types() { + // create many-batches input data + let batch = QueryAdaptor::new( + PartitionId::new(1), + create_batches_with_influxtype_same_columns_different_type().await, + ); + + // the schema merge should throw a panic + batch.schema(); + } + + async fn create_one_row_record_batch_with_influxtype() -> Vec> { + let chunk1 = Arc::new( + TestChunk::new("t") + .with_id(1) + .with_time_column() + .with_tag_column("tag1") + .with_i64_field_column("field_int") + .with_one_row_of_data(), + ); + let batches = raw_data(&[chunk1]).await; + + // Make sure all data in one record batch + assert_eq!(batches.len(), 1); + + // verify data + let expected = vec![ + "+-----------+------+-----------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+-----------------------------+", + "| 1000 | MA | 1970-01-01T00:00:00.000001Z |", + "+-----------+------+-----------------------------+", + ]; + assert_batches_eq!(&expected, &batches); + + let batches: Vec<_> = batches.iter().map(|r| Arc::new(r.clone())).collect(); + batches + } + + async fn create_one_record_batch_with_influxtype_no_duplicates() -> Vec> { + let chunk1 = Arc::new( + TestChunk::new("t") + .with_id(1) + .with_time_column() + .with_tag_column("tag1") + .with_i64_field_column("field_int") + .with_three_rows_of_data(), + ); + let batches = raw_data(&[chunk1]).await; + + // Make sure all data in one record batch + assert_eq!(batches.len(), 1); + + // verify data + let expected = vec![ + "+-----------+------+-----------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+-----------------------------+", + "| 1000 | WA | 1970-01-01T00:00:00.000008Z |", + "| 10 | VT | 1970-01-01T00:00:00.000010Z |", + "| 70 | UT | 1970-01-01T00:00:00.000020Z |", + "+-----------+------+-----------------------------+", + ]; + assert_batches_eq!(&expected, &batches); + + let batches: Vec<_> = batches.iter().map(|r| Arc::new(r.clone())).collect(); + batches + } + + async fn create_one_record_batch_with_influxtype_duplicates() -> Vec> { + let chunk1 = Arc::new( + TestChunk::new("t") + .with_id(1) + .with_time_column() //_with_full_stats( + .with_tag_column("tag1") + .with_i64_field_column("field_int") + .with_ten_rows_of_data_some_duplicates(), + ); + let batches = raw_data(&[chunk1]).await; + + // Make sure all data in one record batch + assert_eq!(batches.len(), 1); + + // verify data + let expected = vec![ + "+-----------+------+--------------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+--------------------------------+", + "| 1000 | MT | 1970-01-01T00:00:00.000001Z |", + "| 10 | MT | 1970-01-01T00:00:00.000007Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000100Z |", + "| 100 | AL | 1970-01-01T00:00:00.000000050Z |", + "| 5 | MT | 1970-01-01T00:00:00.000000005Z |", + "| 1000 | MT | 1970-01-01T00:00:00.000002Z |", + "| 20 | MT | 1970-01-01T00:00:00.000007Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000500Z |", + "| 10 | AL | 1970-01-01T00:00:00.000000050Z |", + "| 30 | MT | 1970-01-01T00:00:00.000000005Z |", + "+-----------+------+--------------------------------+", + ]; + assert_batches_eq!(&expected, &batches); + + let batches: Vec<_> = batches.iter().map(|r| Arc::new(r.clone())).collect(); + batches + } + + /// RecordBatches with knowledge of influx metadata + async fn create_batches_with_influxtype() -> Vec> { + // Use the available TestChunk to create chunks and 
then convert them to raw RecordBatches + let mut batches = vec![]; + + // chunk1 with 10 rows and 3 columns + let chunk1 = Arc::new( + TestChunk::new("t") + .with_id(1) + .with_time_column() + .with_tag_column("tag1") + .with_i64_field_column("field_int") + .with_ten_rows_of_data_some_duplicates(), + ); + let batch1 = raw_data(&[chunk1]).await[0].clone(); + let expected = vec![ + "+-----------+------+--------------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+--------------------------------+", + "| 1000 | MT | 1970-01-01T00:00:00.000001Z |", + "| 10 | MT | 1970-01-01T00:00:00.000007Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000100Z |", + "| 100 | AL | 1970-01-01T00:00:00.000000050Z |", + "| 5 | MT | 1970-01-01T00:00:00.000000005Z |", + "| 1000 | MT | 1970-01-01T00:00:00.000002Z |", + "| 20 | MT | 1970-01-01T00:00:00.000007Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000500Z |", + "| 10 | AL | 1970-01-01T00:00:00.000000050Z |", + "| 30 | MT | 1970-01-01T00:00:00.000000005Z |", + "+-----------+------+--------------------------------+", + ]; + assert_batches_eq!(&expected, &[batch1.clone()]); + batches.push(Arc::new(batch1)); + + // chunk2 having duplicate data with chunk 1 + let chunk2 = Arc::new( + TestChunk::new("t") + .with_id(2) + .with_time_column() + .with_tag_column("tag1") + .with_i64_field_column("field_int") + .with_five_rows_of_data(), + ); + let batch2 = raw_data(&[chunk2]).await[0].clone(); + let expected = vec![ + "+-----------+------+--------------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+--------------------------------+", + "| 1000 | MT | 1970-01-01T00:00:00.000001Z |", + "| 10 | MT | 1970-01-01T00:00:00.000007Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000100Z |", + "| 100 | AL | 1970-01-01T00:00:00.000000050Z |", + "| 5 | MT | 1970-01-01T00:00:00.000005Z |", + "+-----------+------+--------------------------------+", + ]; + assert_batches_eq!(&expected, &[batch2.clone()]); + batches.push(Arc::new(batch2)); + + // verify data from both batches + let expected = vec![ + "+-----------+------+--------------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+--------------------------------+", + "| 1000 | MT | 1970-01-01T00:00:00.000001Z |", + "| 10 | MT | 1970-01-01T00:00:00.000007Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000100Z |", + "| 100 | AL | 1970-01-01T00:00:00.000000050Z |", + "| 5 | MT | 1970-01-01T00:00:00.000000005Z |", + "| 1000 | MT | 1970-01-01T00:00:00.000002Z |", + "| 20 | MT | 1970-01-01T00:00:00.000007Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000500Z |", + "| 10 | AL | 1970-01-01T00:00:00.000000050Z |", + "| 30 | MT | 1970-01-01T00:00:00.000000005Z |", + "| 1000 | MT | 1970-01-01T00:00:00.000001Z |", + "| 10 | MT | 1970-01-01T00:00:00.000007Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000100Z |", + "| 100 | AL | 1970-01-01T00:00:00.000000050Z |", + "| 5 | MT | 1970-01-01T00:00:00.000005Z |", + "+-----------+------+--------------------------------+", + ]; + let b: Vec<_> = batches.iter().map(|b| (**b).clone()).collect(); + assert_batches_eq!(&expected, &b); + + batches + } + + /// RecordBatches with knowledge of influx metadata + async fn create_batches_with_influxtype_different_columns() -> Vec> { + // Use the available TestChunk to create chunks and then convert them to raw RecordBatches + let mut batches = vec![]; + + // chunk1 with 10 rows and 3 columns + let chunk1 = Arc::new( + TestChunk::new("t") + .with_id(1) + .with_time_column() + .with_tag_column("tag1") 
+ .with_i64_field_column("field_int") + .with_ten_rows_of_data_some_duplicates(), + ); + let batch1 = raw_data(&[chunk1]).await[0].clone(); + let expected = vec![ + "+-----------+------+--------------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+--------------------------------+", + "| 1000 | MT | 1970-01-01T00:00:00.000001Z |", + "| 10 | MT | 1970-01-01T00:00:00.000007Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000100Z |", + "| 100 | AL | 1970-01-01T00:00:00.000000050Z |", + "| 5 | MT | 1970-01-01T00:00:00.000000005Z |", + "| 1000 | MT | 1970-01-01T00:00:00.000002Z |", + "| 20 | MT | 1970-01-01T00:00:00.000007Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000500Z |", + "| 10 | AL | 1970-01-01T00:00:00.000000050Z |", + "| 30 | MT | 1970-01-01T00:00:00.000000005Z |", + "+-----------+------+--------------------------------+", + ]; + assert_batches_eq!(&expected, &[batch1.clone()]); + batches.push(Arc::new(batch1)); + + // chunk2 having duplicate data with chunk 1 + // mmore columns + let chunk2 = Arc::new( + TestChunk::new("t") + .with_id(2) + .with_time_column() + .with_tag_column("tag1") + .with_i64_field_column("field_int") + .with_tag_column("tag2") + .with_i64_field_column("field_int2") + .with_five_rows_of_data(), + ); + let batch2 = raw_data(&[chunk2]).await[0].clone(); + let expected = vec![ + "+-----------+------------+------+------+--------------------------------+", + "| field_int | field_int2 | tag1 | tag2 | time |", + "+-----------+------------+------+------+--------------------------------+", + "| 1000 | 1000 | MT | CT | 1970-01-01T00:00:00.000001Z |", + "| 10 | 10 | MT | AL | 1970-01-01T00:00:00.000007Z |", + "| 70 | 70 | CT | CT | 1970-01-01T00:00:00.000000100Z |", + "| 100 | 100 | AL | MA | 1970-01-01T00:00:00.000000050Z |", + "| 5 | 5 | MT | AL | 1970-01-01T00:00:00.000005Z |", + "+-----------+------------+------+------+--------------------------------+", + ]; + assert_batches_eq!(&expected, &[batch2.clone()]); + batches.push(Arc::new(batch2)); + + batches + } + + /// RecordBatches with knowledge of influx metadata + async fn create_batches_with_influxtype_different_columns_different_order( + ) -> Vec> { + // Use the available TestChunk to create chunks and then convert them to raw RecordBatches + let mut batches = vec![]; + + // chunk1 with 10 rows and 3 columns + let chunk1 = Arc::new( + TestChunk::new("t") + .with_id(1) + .with_time_column() + .with_tag_column("tag1") + .with_i64_field_column("field_int") + .with_tag_column("tag2") + .with_ten_rows_of_data_some_duplicates(), + ); + let batch1 = raw_data(&[chunk1]).await[0].clone(); + let expected = vec![ + "+-----------+------+------+--------------------------------+", + "| field_int | tag1 | tag2 | time |", + "+-----------+------+------+--------------------------------+", + "| 1000 | MT | CT | 1970-01-01T00:00:00.000001Z |", + "| 10 | MT | AL | 1970-01-01T00:00:00.000007Z |", + "| 70 | CT | CT | 1970-01-01T00:00:00.000000100Z |", + "| 100 | AL | MA | 1970-01-01T00:00:00.000000050Z |", + "| 5 | MT | AL | 1970-01-01T00:00:00.000000005Z |", + "| 1000 | MT | CT | 1970-01-01T00:00:00.000002Z |", + "| 20 | MT | AL | 1970-01-01T00:00:00.000007Z |", + "| 70 | CT | CT | 1970-01-01T00:00:00.000000500Z |", + "| 10 | AL | MA | 1970-01-01T00:00:00.000000050Z |", + "| 30 | MT | AL | 1970-01-01T00:00:00.000000005Z |", + "+-----------+------+------+--------------------------------+", + ]; + assert_batches_eq!(&expected, &[batch1.clone()]); + batches.push(Arc::new(batch1.clone())); + + // chunk2 having 
duplicate data with chunk 1 + // mmore columns + let chunk2 = Arc::new( + TestChunk::new("t") + .with_id(2) + .with_time_column() + .with_tag_column("tag2") + .with_i64_field_column("field_int") + .with_five_rows_of_data(), + ); + let batch2 = raw_data(&[chunk2]).await[0].clone(); + let expected = vec![ + "+-----------+------+--------------------------------+", + "| field_int | tag2 | time |", + "+-----------+------+--------------------------------+", + "| 1000 | CT | 1970-01-01T00:00:00.000001Z |", + "| 10 | AL | 1970-01-01T00:00:00.000007Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000100Z |", + "| 100 | MA | 1970-01-01T00:00:00.000000050Z |", + "| 5 | AL | 1970-01-01T00:00:00.000005Z |", + "+-----------+------+--------------------------------+", + ]; + assert_batches_eq!(&expected, &[batch2.clone()]); + batches.push(Arc::new(batch2)); + + batches + } + + /// Has 2 tag columns; tag1 has a lower cardinality (3) than tag3 (4) + async fn create_batches_with_influxtype_different_cardinality() -> Vec> { + // Use the available TestChunk to create chunks and then convert them to raw RecordBatches + let mut batches = vec![]; + + let chunk1 = Arc::new( + TestChunk::new("t") + .with_id(1) + .with_time_column() + .with_tag_column("tag1") + .with_i64_field_column("field_int") + .with_tag_column("tag3") + .with_four_rows_of_data(), + ); + let batch1 = raw_data(&[chunk1]).await[0].clone(); + let expected = vec![ + "+-----------+------+------+-----------------------------+", + "| field_int | tag1 | tag3 | time |", + "+-----------+------+------+-----------------------------+", + "| 1000 | WA | TX | 1970-01-01T00:00:00.000028Z |", + "| 10 | VT | PR | 1970-01-01T00:00:00.000210Z |", + "| 70 | UT | OR | 1970-01-01T00:00:00.000220Z |", + "| 50 | VT | AL | 1970-01-01T00:00:00.000210Z |", + "+-----------+------+------+-----------------------------+", + ]; + assert_batches_eq!(&expected, &[batch1.clone()]); + batches.push(Arc::new(batch1.clone())); + + let chunk2 = Arc::new( + TestChunk::new("t") + .with_id(2) + .with_time_column() + .with_tag_column("tag1") + .with_tag_column("tag3") + .with_i64_field_column("field_int") + .with_four_rows_of_data(), + ); + let batch2 = raw_data(&[chunk2]).await[0].clone(); + let expected = vec![ + "+-----------+------+------+-----------------------------+", + "| field_int | tag1 | tag3 | time |", + "+-----------+------+------+-----------------------------+", + "| 1000 | WA | TX | 1970-01-01T00:00:00.000028Z |", + "| 10 | VT | PR | 1970-01-01T00:00:00.000210Z |", + "| 70 | UT | OR | 1970-01-01T00:00:00.000220Z |", + "| 50 | VT | AL | 1970-01-01T00:00:00.000210Z |", + "+-----------+------+------+-----------------------------+", + ]; + assert_batches_eq!(&expected, &[batch2.clone()]); + batches.push(Arc::new(batch2)); + + batches + } + + /// RecordBatches with knowledge of influx metadata + async fn create_batches_with_influxtype_same_columns_different_type() -> Vec> { + // Use the available TestChunk to create chunks and then convert them to raw RecordBatches + let mut batches = vec![]; + + // chunk1 + let chunk1 = Arc::new( + TestChunk::new("t") + .with_id(1) + .with_time_column() + .with_tag_column("tag1") + .with_i64_field_column("field_int") + .with_three_rows_of_data(), + ); + let batch1 = raw_data(&[chunk1]).await[0].clone(); + let expected = vec![ + "+-----------+------+-----------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+-----------------------------+", + "| 1000 | WA | 1970-01-01T00:00:00.000008Z |", + "| 10 | VT | 
1970-01-01T00:00:00.000010Z |", + "| 70 | UT | 1970-01-01T00:00:00.000020Z |", + "+-----------+------+-----------------------------+", + ]; + assert_batches_eq!(&expected, &[batch1.clone()]); + batches.push(Arc::new(batch1)); + + // chunk2 having duplicate data with chunk 1 + // mmore columns + let chunk2 = Arc::new( + TestChunk::new("t") + .with_id(2) + .with_time_column() + .with_u64_column("field_int") // u64 type but on existing name "field_int" used for i64 in chunk 1 + .with_tag_column("tag2") + .with_three_rows_of_data(), + ); + let batch2 = raw_data(&[chunk2]).await[0].clone(); + let expected = vec![ + "+-----------+------+-----------------------------+", + "| field_int | tag2 | time |", + "+-----------+------+-----------------------------+", + "| 1000 | SC | 1970-01-01T00:00:00.000008Z |", + "| 10 | NC | 1970-01-01T00:00:00.000010Z |", + "| 70 | RI | 1970-01-01T00:00:00.000020Z |", + "+-----------+------+-----------------------------+", + ]; + assert_batches_eq!(&expected, &[batch2.clone()]); + batches.push(Arc::new(batch2)); + + batches + } +} diff --git a/ingester2/src/persist/context.rs b/ingester2/src/persist/context.rs new file mode 100644 index 0000000000..d82a3a5784 --- /dev/null +++ b/ingester2/src/persist/context.rs @@ -0,0 +1,384 @@ +use std::sync::Arc; + +use backoff::Backoff; +use data_types::{ + CompactionLevel, NamespaceId, ParquetFileParams, PartitionId, PartitionKey, SequenceNumber, + TableId, +}; +use iox_catalog::interface::get_table_schema_by_id; +use iox_time::{SystemProvider, TimeProvider}; +use observability_deps::tracing::*; +use parking_lot::Mutex; +use parquet_file::metadata::IoxMetadata; +use schema::sort::SortKey; +use tokio::sync::Notify; +use uuid::Uuid; + +use crate::{ + buffer_tree::{ + namespace::NamespaceName, + partition::{persisting::PersistingData, PartitionData, SortKeyState}, + table::TableName, + }, + deferred_load::DeferredLoad, + persist::compact::{compact_persisting_batch, CompactedStream}, + TRANSITION_SHARD_ID, +}; + +use super::actor::Inner; + +/// An internal type that contains all necessary information to run a persist task. +/// +/// Used to communicate between actor handles & actor task. +#[derive(Debug)] +pub(super) struct PersistRequest { + complete: Arc, + partition: Arc>, + data: PersistingData, +} + +impl PersistRequest { + pub(super) fn new(partition: Arc>, data: PersistingData) -> Self { + Self { + complete: Arc::new(Notify::default()), + partition, + data, + } + } + + /// Return the partition ID of the persisting data. + pub(super) fn partition_id(&self) -> PartitionId { + self.data.partition_id() + } + + /// Obtain the completion notification handle for this request. + /// + /// This notification is fired once persistence is complete. + pub(super) fn complete_notification(&self) -> Arc { + Arc::clone(&self.complete) + } +} + +pub(super) struct Context { + partition: Arc>, + data: PersistingData, + inner: Arc, + + /// IDs loaded from the partition at construction time. + namespace_id: NamespaceId, + table_id: TableId, + partition_id: PartitionId, + + // The partition key for this partition + partition_key: PartitionKey, + + /// Deferred strings needed for persistence. + /// + /// These [`DeferredLoad`] are given a pre-fetch hint when this [`Context`] + /// is constructed to load them in the background (if not already resolved) + /// in order to avoid incurring the query latency when the values are + /// needed. 
+ namespace_name: Arc>, + table_name: Arc>, + + /// The [`SortKey`] for the [`PartitionData`] at the time of [`Context`] + /// construction. + /// + /// The [`SortKey`] MUST NOT change during persistence, as updates to the + /// sort key are not commutative and thus require serialising. This + /// precludes parallel persists of partitions, unless they can be proven not + /// to need to update the sort key. + sort_key: SortKeyState, + + /// A notification signal to indicate to the caller that this partition has + /// persisted. + complete: Arc, +} + +impl Context { + /// Construct a persistence job [`Context`] from `req`. + /// + /// Locks the [`PartitionData`] in `req` to read various properties which + /// are then cached in the [`Context`]. + pub(super) fn new(req: PersistRequest, inner: Arc) -> Self { + let partition_id = req.data.partition_id(); + + // Obtain the partition lock and load the immutable values that will be + // used during this persistence. + let s = { + let complete = req.complete_notification(); + let p = Arc::clone(&req.partition); + let guard = p.lock(); + + assert_eq!(partition_id, guard.partition_id()); + + Self { + partition: req.partition, + data: req.data, + inner, + namespace_id: guard.namespace_id(), + table_id: guard.table_id(), + partition_id, + partition_key: guard.partition_key().clone(), + namespace_name: Arc::clone(guard.namespace_name()), + table_name: Arc::clone(guard.table_name()), + + // Technically the sort key isn't immutable, but MUST NOT change + // during the execution of this persist. + sort_key: guard.sort_key().clone(), + + complete, + } + }; + + // Pre-fetch the deferred values in a background thread (if not already + // resolved) + s.namespace_name.prefetch_now(); + s.table_name.prefetch_now(); + if let SortKeyState::Deferred(ref d) = s.sort_key { + d.prefetch_now(); + } + + s + } + + pub(super) async fn compact(&self) -> CompactedStream { + debug!( + namespace_id = %self.namespace_id, + namespace_name = %self.namespace_name, + table_id = %self.table_id, + table_name = %self.table_name, + partition_id = %self.partition_id, + partition_key = %self.partition_key, + "compacting partition" + ); + + assert!(!self.data.record_batches().is_empty()); + + // Run a compaction sort the data and resolve any duplicate values. + // + // This demands the deferred load values and may have to wait for them + // to be loaded before compaction starts. + compact_persisting_batch( + &self.inner.exec, + self.sort_key.get().await, + self.table_name.get().await, + self.data.query_adaptor(), + ) + .await + .expect("unable to compact persisting batch") + } + + pub(super) async fn upload( + &self, + compacted: CompactedStream, + ) -> (Option, ParquetFileParams) { + let CompactedStream { + stream: record_stream, + catalog_sort_key_update, + data_sort_key, + } = compacted; + + // Generate a UUID to uniquely identify this parquet file in + // object storage. + let object_store_id = Uuid::new_v4(); + + debug!( + namespace_id = %self.namespace_id, + namespace_name = %self.namespace_name, + table_id = %self.table_id, + table_name = %self.table_name, + partition_id = %self.partition_id, + partition_key = %self.partition_key, + %object_store_id, + sort_key = %data_sort_key, + "uploading partition parquet" + ); + + // Construct the metadata for this parquet file. 
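+        // The metadata is embedded into the parquet file itself, and is also
+        // used below to derive the `ParquetFileParams` catalog entry.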
+ let iox_metadata = IoxMetadata { + object_store_id, + creation_timestamp: SystemProvider::new().now(), + shard_id: TRANSITION_SHARD_ID, + namespace_id: self.namespace_id, + namespace_name: Arc::clone(&*self.namespace_name.get().await), + table_id: self.table_id, + table_name: Arc::clone(&*self.table_name.get().await), + partition_id: self.partition_id, + partition_key: self.partition_key.clone(), + max_sequence_number: SequenceNumber::new(0), // TODO: not ordered! + compaction_level: CompactionLevel::Initial, + sort_key: Some(data_sort_key), + }; + + // Save the compacted data to a parquet file in object storage. + // + // This call retries until it completes. + let (md, file_size) = self + .inner + .store + .upload(record_stream, &iox_metadata) + .await + .expect("unexpected fatal persist error"); + + debug!( + namespace_id = %self.namespace_id, + namespace_name = %self.namespace_name, + table_id = %self.table_id, + table_name = %self.table_name, + partition_id = %self.partition_id, + partition_key = %self.partition_key, + %object_store_id, + file_size, + "partition parquet uploaded" + ); + + // Read the table schema from the catalog to act as a map of column name + // -> column IDs. + let table_schema = Backoff::new(&Default::default()) + .retry_all_errors("get table schema", || async { + let mut repos = self.inner.catalog.repositories().await; + get_table_schema_by_id(self.table_id, repos.as_mut()).await + }) + .await + .expect("retry forever"); + + // Build the data that must be inserted into the parquet_files catalog + // table in order to make the file visible to queriers. + let parquet_table_data = + iox_metadata.to_parquet_file(self.partition_id, file_size, &md, |name| { + table_schema.columns.get(name).expect("unknown column").id + }); + + (catalog_sort_key_update, parquet_table_data) + } + + pub(crate) async fn update_database( + self, + sort_key_update: Option, + parquet_table_data: ParquetFileParams, + ) { + // Extract the object store ID to the local scope so that it can easily + // be referenced in debug logging to aid correlation of persist events + // for a specific file. + let object_store_id = parquet_table_data.object_store_id; + + debug!( + namespace_id = %self.namespace_id, + namespace_name = %self.namespace_name, + table_id = %self.table_id, + table_name = %self.table_name, + partition_id = %self.partition_id, + partition_key = %self.partition_key, + %object_store_id, + ?parquet_table_data, + ?sort_key_update, + "updating catalog" + ); + + // If necessary, update the partition sort key in the catalog and update + // the local cached copy in the PartitionData. + // + // This update MUST be made visibile before the parquet file, otherwise + // the consumer of the parquet file will observe an inconsistent sort + // key. + if let Some(new_sort_key) = sort_key_update { + let sort_key = new_sort_key.to_columns().collect::>(); + Backoff::new(&Default::default()) + .retry_all_errors("update_sort_key", || async { + let mut repos = self.inner.catalog.repositories().await; + let _partition = repos + .partitions() + .update_sort_key(self.partition_id, &sort_key) + .await?; + Ok(()) as Result<(), iox_catalog::interface::Error> + }) + .await + .expect("retry forever"); + + // Update the sort key in the partition cache. + let old_key; + { + let mut guard = self.partition.lock(); + old_key = guard.sort_key().clone(); + guard.update_sort_key(Some(new_sort_key.clone())); + }; + + // Assert the serialisation of sort key updates. 
+ // + // Both of these get() should not block due to both of the + // values having been previously resolved / used. + assert_eq!(old_key.get().await, self.sort_key.get().await); + + debug!( + namespace_id = %self.namespace_id, + namespace_name = %self.namespace_name, + table_id = %self.table_id, + table_name = %self.table_name, + partition_id = %self.partition_id, + partition_key = %self.partition_key, + %object_store_id, + old_sort_key = ?sort_key, + %new_sort_key, + "adjusted partition sort key" + ); + } + + // Add the parquet file to the catalog. + // + // This has the effect of allowing the queriers to "discover" the + // parquet file by polling / querying the catalog. + Backoff::new(&Default::default()) + .retry_all_errors("add parquet file to catalog", || async { + let mut repos = self.inner.catalog.repositories().await; + let parquet_file = repos + .parquet_files() + .create(parquet_table_data.clone()) + .await?; + + debug!( + namespace_id = %self.namespace_id, + namespace_name = %self.namespace_name, + table_id = %self.table_id, + table_name = %self.table_name, + partition_id = %self.partition_id, + partition_key = %self.partition_key, + %object_store_id, + ?parquet_table_data, + parquet_file_id=?parquet_file.id, + "parquet file added to catalog" + ); + + // compiler insisted on getting told the type of the error :shrug: + Ok(()) as Result<(), iox_catalog::interface::Error> + }) + .await + .expect("retry forever"); + + // Mark the partition as having completed persistence, causing it to + // release the reference to the in-flight persistence data it is + // holding. + // + // This SHOULD cause the data to be dropped, but there MAY be ongoing + // queries that currently hold a reference to the data. In either case, + // the persisted data will be dropped "shortly". + self.partition.lock().mark_persisted(self.data); + + info!( + %object_store_id, + namespace_id = %self.namespace_id, + namespace_name = %self.namespace_name, + table_id = %self.table_id, + table_name = %self.table_name, + partition_id = %self.partition_id, + partition_key = %self.partition_key, + "persisted partition" + ); + + // Notify all observers of this persistence task + self.complete.notify_waiters(); + } +} + +// TODO(test): persist +// TODO(test): persist completion notification diff --git a/ingester2/src/persist/handle.rs b/ingester2/src/persist/handle.rs new file mode 100644 index 0000000000..ad03ed23f5 --- /dev/null +++ b/ingester2/src/persist/handle.rs @@ -0,0 +1,173 @@ +use std::sync::Arc; + +use iox_catalog::interface::Catalog; +use iox_query::exec::Executor; +use parking_lot::Mutex; +use parquet_file::storage::ParquetStorage; +use thiserror::Error; +use tokio::sync::{ + mpsc::{self}, + Notify, +}; + +use crate::buffer_tree::partition::{persisting::PersistingData, PartitionData}; + +use super::{actor::PersistActor, context::PersistRequest}; + +#[derive(Debug, Error)] +pub(crate) enum PersistError { + #[error("persist queue is full")] + QueueFull, +} + +/// A persistence task submission handle. +/// +/// This type is cheap to clone to share across threads. +/// +/// # Usage +/// +/// The caller should construct an [`PersistHandle`] and [`PersistActor`] by +/// calling [`PersistHandle::new()`], and run the provided [`PersistActor`] +/// instance in another thread / task (by calling [`PersistActor::run()`]). +/// +/// From this point on, the caller interacts only with the [`PersistHandle`] +/// which can be cheaply cloned and passed over thread/task boundaries to +/// enqueue persist tasks. 
+/// +/// Dropping all [`PersistHandle`] instances stops the [`PersistActor`], which +/// immediately stops all workers. +/// +/// # Topology +/// +/// The persist actor is uses an internal work group to parallelise persistence +/// operations up to `n_workers` number of parallel tasks. +/// +/// Submitting a persistence request places the job into a bounded queue, +/// providing a buffer for persistence requests to wait for an available worker. +/// +/// Two types of queue exists: +/// +/// * Submission queue (bounded by `submission_queue_depth`) +/// * `n_worker` number of worker queues (bounded by `worker_queue_depth`) +/// +/// ```text +/// ┌─────────────┐ +/// │PersistHandle├┐ +/// └┬────────────┘├┐ +/// └┬────────────┘│ +/// └─────┬───────┘ +/// │ +/// submission_queue_depth +/// │ +/// ▼ +/// ╔═══════════════╗ +/// ║ PersistActor ║ +/// ╚═══════════════╝ +/// │ +/// ┌────────────────┼────────────────┐ +/// │ │ │ +/// │ │ │ +/// worker_queue_depth worker_queue_depth worker_queue_depth +/// │ │ │ +/// ▼ ▼ ▼ +/// ┌────────────┐ ┌────────────┐ ┌────────────┐ +/// │ Worker 1 │ │ Worker 2 │ │ Worker N │ +/// └────────────┘ └────────────┘ └────────────┘ +/// ▲ ▲ ▲ +/// │ +/// │ ▼ │ +/// ┌ ─ ─ ─ ─ ─ ─ +/// └ ─ ─ ─ ─▶ Executor │◀ ─ ─ ─ ─ ┘ +/// └ ─ ─ ─ ─ ─ ─ +/// ``` +/// +/// Compaction is performed in the provided [`Executor`] re-org thread-pool and +/// is shared across all workers. +/// +/// At any one time, the number of outstanding persist tasks is bounded by: +/// +/// ```text +/// +/// submission_queue_depth + (workers * worker_queue_depth) +/// +/// ``` +/// +/// At any one time, there may be at most `submission_queue_depth + +/// worker_queue_depth` number of outstanding persist jobs for a single +/// partition. +/// +/// These two queues are used to decouple submissions from individual workers - +/// this prevents a "hot" / backlogged worker with a full worker queue from +/// blocking tasks from being passed through to workers with spare capacity. +/// +/// # Parallelism & Partition Serialisation +/// +/// Persistence jobs are parallelised across partitions, with up to at most +/// `n_worker` parallel persist executions at once. +/// +/// Because updates of a partition's [`SortKey`] are not commutative, they must +/// be serialised. For this reason, persist operations for given partition are +/// always placed in the same worker queue, ensuring they execute sequentially. +/// +/// [`SortKey`]: schema::sort::SortKey +#[derive(Debug, Clone)] +pub(crate) struct PersistHandle { + tx: mpsc::Sender, +} + +impl PersistHandle { + /// Initialise a new persist actor & obtain the first handle. + /// + /// The caller should call [`PersistActor::run()`] in a separate + /// thread / task to start the persistence executor. + pub(crate) fn new( + submission_queue_depth: usize, + n_workers: usize, + worker_queue_depth: usize, + exec: Executor, + store: ParquetStorage, + catalog: Arc, + ) -> (Self, PersistActor) { + let (tx, rx) = mpsc::channel(submission_queue_depth); + + let actor = PersistActor::new(rx, exec, store, catalog, n_workers, worker_queue_depth); + + (Self { tx }, actor) + } + + /// Place `data` from `partition` into the persistence queue. + /// + /// This call (asynchronously) waits for space to become available in the + /// submission queue. + /// + /// Once persistence is complete, the partition will be locked and the sort + /// key will be updated, and [`PartitionData::mark_persisted()`] is called + /// with `data`. 
+    ///
+    /// Once all persistence related tasks are complete, the returned [`Notify`]
+    /// broadcasts a notification.
+    ///
+    /// # Panics
+    ///
+    /// Panics (asynchronously) if the [`PartitionData`]'s sort key is updated
+    /// between persistence starting and ending.
+    ///
+    /// This will panic (asynchronously) if `data` was not from `partition` or
+    /// all worker threads have stopped.
+    pub(crate) async fn queue_persist(
+        &self,
+        partition: Arc<Mutex<PartitionData>>,
+        data: PersistingData,
+    ) -> Arc<Notify> {
+        // Build the persist task request
+        let r = PersistRequest::new(partition, data);
+        let notify = r.complete_notification();
+
+        self.tx
+            .send(r)
+            .await
+            .expect("no persist worker tasks running");
+
+        notify
+    }
+}
diff --git a/ingester2/src/persist/mod.rs b/ingester2/src/persist/mod.rs
new file mode 100644
index 0000000000..b9365e2567
--- /dev/null
+++ b/ingester2/src/persist/mod.rs
@@ -0,0 +1,4 @@
+mod actor;
+pub(super) mod compact;
+mod context;
+mod handle;
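
For reference, a minimal sketch of how a caller is expected to wire the pieces in this diff together, based on the `PersistHandle` / `PersistActor` documentation above. The helper function, the queue depths, and the surrounding setup are illustrative assumptions and are not part of this change:

```rust
use std::sync::Arc;

use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
use parquet_file::storage::ParquetStorage;

// `PersistHandle` is assumed to be in scope (e.g. used from within the
// `persist` module).

/// Hypothetical wiring helper, not part of this diff.
fn start_persist_subsystem(
    exec: Executor,
    store: ParquetStorage,
    catalog: Arc<dyn Catalog>,
) -> PersistHandle {
    // Illustrative queue depths; per the docs, at most
    //   submission_queue_depth + n_workers * worker_queue_depth
    // persist jobs can be outstanding at any one time.
    let (handle, actor) = PersistHandle::new(5, 4, 10, exec, store, catalog);

    // The actor runs until every clone of the handle has been dropped.
    tokio::spawn(actor.run());

    handle
}
```

Enqueuing work then takes the form `handle.queue_persist(partition, data).await`, and awaiting the returned `Notify` resolves once the parquet file has been uploaded and made visible in the catalog.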