From f5246876028e1476d609892d071d7c5a5a61df18 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Thu, 1 Dec 2022 10:37:25 +0100 Subject: [PATCH] feat(ingester2): parallel partition persistence Implements actor-based, parallel persistence in ingester2 with controllable fan-out parallelism and queue depths. This implementation encapsulates the complexity of persistence, queuing and parallelism - the caller simply uses the handle to persist a partition, while the actor handles fan-out to a set of persistence workers, compaction in a separate thread-pool, and optional completion notifications. By consistently hashing persist jobs onto workers, parallelism is achieved across partitions, but serialisation of partition persists is enforced so that the sort key update is correctly serialised. --- Cargo.lock | 2 + ingester2/Cargo.toml | 2 + ingester2/src/buffer_tree/partition.rs | 2 +- ingester2/src/lib.rs | 1 + ingester2/src/persist/actor.rs | 104 +++ ingester2/src/persist/compact.rs | 1114 ++++++++++++++++++++++++ ingester2/src/persist/context.rs | 384 ++++++++ ingester2/src/persist/handle.rs | 173 ++++ ingester2/src/persist/mod.rs | 4 + 9 files changed, 1785 insertions(+), 1 deletion(-) create mode 100644 ingester2/src/persist/actor.rs create mode 100644 ingester2/src/persist/compact.rs create mode 100644 ingester2/src/persist/context.rs create mode 100644 ingester2/src/persist/handle.rs create mode 100644 ingester2/src/persist/mod.rs diff --git a/Cargo.lock b/Cargo.lock index c9b8d8c2b6..38e191c851 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2495,6 +2495,7 @@ dependencies = [ "observability_deps", "once_cell", "parking_lot 0.12.1", + "parquet_file", "paste", "pin-project", "predicate", @@ -2502,6 +2503,7 @@ dependencies = [ "rand", "schema", "service_grpc_catalog", + "sharder", "tempfile", "test_helpers", "thiserror", diff --git a/ingester2/Cargo.toml b/ingester2/Cargo.toml index 312bf3d456..a9a7bc5a84 100644 --- a/ingester2/Cargo.toml +++ b/ingester2/Cargo.toml @@ -31,12 +31,14 @@ object_store = "0.5.1" observability_deps = { version = "0.1.0", path = "../observability_deps" } once_cell = "1.16.0" parking_lot = "0.12.1" +parquet_file = { version = "0.1.0", path = "../parquet_file" } pin-project = "1.0.12" predicate = { version = "0.1.0", path = "../predicate" } prost = { version = "0.11.2", default-features = false, features = ["std"] } rand = "0.8.5" schema = { version = "0.1.0", path = "../schema" } service_grpc_catalog = { version = "0.1.0", path = "../service_grpc_catalog" } +sharder = { version = "0.1.0", path = "../sharder" } thiserror = "1.0.37" tokio = { version = "1.22", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] } tonic = "0.8.3" diff --git a/ingester2/src/buffer_tree/partition.rs b/ingester2/src/buffer_tree/partition.rs index a22bbb32e3..63e5f19b34 100644 --- a/ingester2/src/buffer_tree/partition.rs +++ b/ingester2/src/buffer_tree/partition.rs @@ -15,7 +15,7 @@ use super::{namespace::NamespaceName, table::TableName}; use crate::{deferred_load::DeferredLoad, query_adaptor::QueryAdaptor}; mod buffer; -mod persisting; +pub(crate) mod persisting; pub(crate) mod resolver; /// The load state of the [`SortKey`] for a given partition. 
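The routing scheme described in the commit message (pin each partition to one worker so that per-partition persists are serialised while distinct partitions proceed in parallel) can be sketched with plain tokio channels. This is a minimal, hypothetical stand-in rather than the sharder::JumpHash used by the patch; the Job type, the worker_for() helper, and the queue sizes are illustrative only.

// Jobs for the same partition always hash to the same worker queue, so they
// run in order on that worker; jobs for different partitions may run on
// different workers concurrently.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

use tokio::sync::mpsc;

#[derive(Debug)]
struct Job {
    partition_id: i64,
}

fn worker_for(partition_id: i64, n_workers: usize) -> usize {
    let mut h = DefaultHasher::new();
    partition_id.hash(&mut h);
    (h.finish() as usize) % n_workers
}

#[tokio::main]
async fn main() {
    const N_WORKERS: usize = 4;
    const QUEUE_DEPTH: usize = 8;

    // One bounded queue per worker; a full queue applies back-pressure.
    let (txs, rxs): (Vec<_>, Vec<_>) = (0..N_WORKERS)
        .map(|_| mpsc::channel::<Job>(QUEUE_DEPTH))
        .unzip();

    let workers: Vec<_> = rxs
        .into_iter()
        .enumerate()
        .map(|(i, mut rx)| {
            tokio::spawn(async move {
                while let Some(job) = rx.recv().await {
                    // A real worker would compact and upload here.
                    println!("worker {i} persisting partition {}", job.partition_id);
                }
            })
        })
        .collect();

    // Jobs for the same partition land on the same worker, in order.
    for partition_id in [1, 2, 3, 1, 2, 1] {
        let idx = worker_for(partition_id, N_WORKERS);
        txs[idx].send(Job { partition_id }).await.unwrap();
    }

    drop(txs); // close the queues so the workers exit
    for w in workers {
        w.await.unwrap();
    }
}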
diff --git a/ingester2/src/lib.rs b/ingester2/src/lib.rs
index 0235aace75..07c662e1b3 100644
--- a/ingester2/src/lib.rs
+++ b/ingester2/src/lib.rs
@@ -75,6 +75,7 @@ mod arcmap;
 mod buffer_tree;
 mod deferred_load;
 mod dml_sink;
+mod persist;
 mod query;
 mod query_adaptor;
 pub(crate) mod server;
diff --git a/ingester2/src/persist/actor.rs b/ingester2/src/persist/actor.rs
new file mode 100644
index 0000000000..327d4c2450
--- /dev/null
+++ b/ingester2/src/persist/actor.rs
@@ -0,0 +1,104 @@
+use std::sync::Arc;
+
+use iox_catalog::interface::Catalog;
+use iox_query::exec::Executor;
+use parquet_file::storage::ParquetStorage;
+use sharder::JumpHash;
+use tokio::{sync::mpsc, task::JoinHandle};
+
+use super::context::{Context, PersistRequest};
+
+/// An actor implementation that fans out incoming persistence jobs to a set of
+/// workers.
+///
+/// See [`PersistHandle`].
+///
+/// [`PersistHandle`]: super::handle::PersistHandle
+#[must_use = "PersistActor must be run by calling run()"]
+pub(crate) struct PersistActor {
+    rx: mpsc::Receiver<PersistRequest>,
+
+    /// The state/dependencies shared across all worker tasks.
+    inner: Arc<Inner>,
+
+    /// A consistent hash implementation used to consistently map buffers from
+    /// one partition to the same worker queue.
+    ///
+    /// This ensures persistence is serialised per-partition, but in parallel
+    /// across partitions (up to the number of worker tasks).
+    persist_queues: JumpHash<mpsc::Sender<PersistRequest>>,
+
+    /// Task handles for the worker tasks, aborted on drop of this
+    /// [`PersistActor`].
+    tasks: Vec<JoinHandle<()>>,
+}
+
+impl Drop for PersistActor {
+    fn drop(&mut self) {
+        // Stop all background tasks when the actor goes out of scope.
+        self.tasks.iter().for_each(|v| v.abort())
+    }
+}
+
+impl PersistActor {
+    pub(super) fn new(
+        rx: mpsc::Receiver<PersistRequest>,
+        exec: Executor,
+        store: ParquetStorage,
+        catalog: Arc<dyn Catalog>,
+        workers: usize,
+        worker_queue_depth: usize,
+    ) -> Self {
+        let inner = Arc::new(Inner {
+            exec,
+            store,
+            catalog,
+        });
+
+        let (tx_handles, tasks): (Vec<_>, Vec<_>) = (0..workers)
+            .map(|_| {
+                let inner = Arc::clone(&inner);
+                let (tx, rx) = mpsc::channel(worker_queue_depth);
+                (tx, tokio::spawn(run_task(inner, rx)))
+            })
+            .unzip();
+
+        // TODO(test): N workers running
+        // TODO(test): queue depths
+
+        Self {
+            rx,
+            inner,
+            persist_queues: JumpHash::new(tx_handles),
+            tasks,
+        }
+    }
+
+    /// Execute this actor task and block until all [`PersistHandle`] are
+    /// dropped.
+    ///
+    /// [`PersistHandle`]: super::handle::PersistHandle
+    pub(crate) async fn run(mut self) {
+        while let Some(req) = self.rx.recv().await {
+            let tx = self.persist_queues.hash(req.partition_id());
+            tx.send(req).await.expect("persist worker has stopped")
+        }
+    }
+}
+
+pub(super) struct Inner {
+    pub(super) exec: Executor,
+    pub(super) store: ParquetStorage,
+    pub(super) catalog: Arc<dyn Catalog>,
+}
+
+async fn run_task(inner: Arc<Inner>, mut rx: mpsc::Receiver<PersistRequest>) {
+    while let Some(req) = rx.recv().await {
+        let ctx = Context::new(req, Arc::clone(&inner));
+
+        let compacted = ctx.compact().await;
+        let (sort_key_update, parquet_table_data) = ctx.upload(compacted).await;
+        ctx.update_database(sort_key_update, parquet_table_data)
+            .await;
+    }
+}
diff --git a/ingester2/src/persist/compact.rs b/ingester2/src/persist/compact.rs
new file mode 100644
index 0000000000..effa1da88a
--- /dev/null
+++ b/ingester2/src/persist/compact.rs
@@ -0,0 +1,1114 @@
+use std::sync::Arc;
+
+use datafusion::physical_plan::SendableRecordBatchStream;
+use iox_query::{
+    exec::{Executor, ExecutorType},
+    frontend::reorg::ReorgPlanner,
+    QueryChunk, QueryChunkMeta,
+};
+use schema::sort::{adjust_sort_key_columns, compute_sort_key, SortKey};
+
+use crate::{buffer_tree::table::TableName, query_adaptor::QueryAdaptor};
+
+/// Result of calling [`compact_persisting_batch`]
+pub(super) struct CompactedStream {
+    /// A stream of compacted, deduplicated
+    /// [`RecordBatch`](arrow::record_batch::RecordBatch)es
+    pub(super) stream: SendableRecordBatchStream,
+
+    /// The sort key value the catalog should be updated to, if any.
+    ///
+    /// If returned, the compaction required extending the partition's
+    /// [`SortKey`] (typically because new columns were in this parquet file
+    /// that were not in previous files).
+    pub(super) catalog_sort_key_update: Option<SortKey>,
+
+    /// The sort key to be used for compaction.
+    ///
+    /// This should be used in the [`IoxMetadata`] for the compacted data, and
+    /// may be a subset of the full sort key contained in
+    /// [`Self::catalog_sort_key_update`] (or the existing sort key in the
+    /// catalog).
+    ///
+    /// [`IoxMetadata`]: parquet_file::metadata::IoxMetadata
+    pub(super) data_sort_key: SortKey,
+}
+
+impl std::fmt::Debug for CompactedStream {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("CompactedStream")
+            .field("stream", &"<SendableRecordBatchStream>")
+            .field("data_sort_key", &self.data_sort_key)
+            .field("catalog_sort_key_update", &self.catalog_sort_key_update)
+            .finish()
+    }
+}
+
+/// Compact the given batch into a [`CompactedStream`], returning the sort key
+/// the catalog should be updated to, if any.
+///
+/// The given batch MUST contain at least one record batch.
+pub(super) async fn compact_persisting_batch(
+    executor: &Executor,
+    sort_key: Option<SortKey>,
+    table_name: TableName,
+    batch: QueryAdaptor,
+) -> Result<CompactedStream> {
+    assert!(!batch.record_batches().is_empty());
+
+    // Get sort key from the catalog or compute it from
+    // cardinality.
+    let (data_sort_key, catalog_sort_key_update) = match sort_key {
+        Some(sk) => {
+            // Remove any columns not present in this data from the
+            // sort key that will be used to compact this parquet file
+            // (and appear in its metadata)
+            //
+            // If there are any new columns, add them to the end of the sort key in the catalog and
+            // return that to be updated in the catalog.
+            adjust_sort_key_columns(&sk, &batch.schema().primary_key())
+        }
+        None => {
+            let sort_key = compute_sort_key(
+                batch.schema().as_ref(),
+                batch.record_batches().iter().map(|sb| sb.as_ref()),
+            );
+            // Use the sort key computed from the cardinality as the sort key for this parquet
+            // file's metadata, also return the sort key to be stored in the catalog
+            (sort_key.clone(), Some(sort_key))
+        }
+    };
+
+    let batch = Arc::new(batch);
+
+    // Build logical plan for compaction
+    let ctx = executor.new_context(ExecutorType::Reorg);
+    let logical_plan = ReorgPlanner::new(ctx.child_ctx("ReorgPlanner"))
+        .compact_plan(
+            table_name.into(),
+            batch.schema(),
+            [batch as Arc<dyn QueryChunk>],
+            data_sort_key.clone(),
+        )
+        .unwrap();
+
+    // Build physical plan
+    let physical_plan = ctx.create_physical_plan(&logical_plan).await.unwrap();
+
+    // Execute the plan and return the compacted stream
+    let output_stream = ctx.execute_stream(physical_plan).await.unwrap();
+
+    Ok(CompactedStream {
+        stream: output_stream,
+        catalog_sort_key_update,
+        data_sort_key,
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::record_batch::RecordBatch;
+    use arrow_util::assert_batches_eq;
+    use data_types::PartitionId;
+    use iox_query::test::{raw_data, TestChunk};
+    use mutable_batch_lp::lines_to_batches;
+    use schema::Projection;
+
+    use super::*;
+
+    // this test was added to guard against https://github.com/influxdata/influxdb_iox/issues/3782
+    // where if sending in a single row it would compact into an output of two batches, one of
+    // which was empty, which would cause this to panic.
+    #[tokio::test]
+    async fn test_compact_batch_on_one_record_batch_with_one_row() {
+        // create input data
+        let batch = lines_to_batches("cpu bar=2 20", 0)
+            .unwrap()
+            .get("cpu")
+            .unwrap()
+            .to_arrow(Projection::All)
+            .unwrap();
+
+        let batch = QueryAdaptor::new(PartitionId::new(1), vec![Arc::new(batch)]);
+
+        // verify PK
+        let schema = batch.schema();
+        let pk = schema.primary_key();
+        let expected_pk = vec!["time"];
+        assert_eq!(expected_pk, pk);
+
+        // compact
+        let exc = Executor::new(1);
+        let CompactedStream { stream, ..
} = + compact_persisting_batch(&exc, Some(SortKey::empty()), "test_table".into(), batch) + .await + .unwrap(); + + let output_batches = datafusion::physical_plan::common::collect(stream) + .await + .expect("should execute plan"); + + // verify compacted data + // should be the same as the input but sorted on tag1 & time + let expected_data = vec![ + "+-----+--------------------------------+", + "| bar | time |", + "+-----+--------------------------------+", + "| 2 | 1970-01-01T00:00:00.000000020Z |", + "+-----+--------------------------------+", + ]; + assert_batches_eq!(&expected_data, &output_batches); + } + + #[tokio::test] + async fn test_compact_batch_on_one_record_batch_no_dupilcates() { + // create input data + let batch = QueryAdaptor::new( + PartitionId::new(1), + create_one_record_batch_with_influxtype_no_duplicates().await, + ); + + // verify PK + let schema = batch.schema(); + let pk = schema.primary_key(); + let expected_pk = vec!["tag1", "time"]; + assert_eq!(expected_pk, pk); + + // compact + let exc = Executor::new(1); + let CompactedStream { + stream, + data_sort_key, + catalog_sort_key_update, + } = compact_persisting_batch(&exc, Some(SortKey::empty()), "test_table".into(), batch) + .await + .unwrap(); + + let output_batches = datafusion::physical_plan::common::collect(stream) + .await + .expect("should execute plan"); + + // verify compacted data + // should be the same as the input but sorted on tag1 & time + let expected_data = vec![ + "+-----------+------+-----------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+-----------------------------+", + "| 70 | UT | 1970-01-01T00:00:00.000020Z |", + "| 10 | VT | 1970-01-01T00:00:00.000010Z |", + "| 1000 | WA | 1970-01-01T00:00:00.000008Z |", + "+-----------+------+-----------------------------+", + ]; + assert_batches_eq!(&expected_data, &output_batches); + + assert_eq!(data_sort_key, SortKey::from_columns(["tag1", "time"])); + + assert_eq!( + catalog_sort_key_update.unwrap(), + SortKey::from_columns(["tag1", "time"]) + ); + } + + #[tokio::test] + async fn test_compact_batch_no_sort_key() { + // create input data + let batch = QueryAdaptor::new( + PartitionId::new(1), + create_batches_with_influxtype_different_cardinality().await, + ); + + // verify PK + let schema = batch.schema(); + let pk = schema.primary_key(); + let expected_pk = vec!["tag1", "tag3", "time"]; + assert_eq!(expected_pk, pk); + + let exc = Executor::new(1); + + // NO SORT KEY from the catalog here, first persisting batch + let CompactedStream { + stream, + data_sort_key, + catalog_sort_key_update, + } = compact_persisting_batch(&exc, Some(SortKey::empty()), "test_table".into(), batch) + .await + .unwrap(); + + let output_batches = datafusion::physical_plan::common::collect(stream) + .await + .expect("should execute plan"); + + // verify compacted data + // should be the same as the input but sorted on the computed sort key of tag1, tag3, & time + let expected_data = vec![ + "+-----------+------+------+-----------------------------+", + "| field_int | tag1 | tag3 | time |", + "+-----------+------+------+-----------------------------+", + "| 70 | UT | OR | 1970-01-01T00:00:00.000220Z |", + "| 50 | VT | AL | 1970-01-01T00:00:00.000210Z |", + "| 10 | VT | PR | 1970-01-01T00:00:00.000210Z |", + "| 1000 | WA | TX | 1970-01-01T00:00:00.000028Z |", + "+-----------+------+------+-----------------------------+", + ]; + assert_batches_eq!(&expected_data, &output_batches); + + assert_eq!( + data_sort_key, + SortKey::from_columns(["tag1", 
"tag3", "time"]) + ); + + assert_eq!( + catalog_sort_key_update.unwrap(), + SortKey::from_columns(["tag1", "tag3", "time"]) + ); + } + + #[tokio::test] + async fn test_compact_batch_with_specified_sort_key() { + // create input data + let batch = QueryAdaptor::new( + PartitionId::new(1), + create_batches_with_influxtype_different_cardinality().await, + ); + + // verify PK + let schema = batch.schema(); + let pk = schema.primary_key(); + let expected_pk = vec!["tag1", "tag3", "time"]; + assert_eq!(expected_pk, pk); + + let exc = Executor::new(1); + + // SPECIFY A SORT KEY HERE to simulate a sort key being stored in the catalog + // this is NOT what the computed sort key would be based on this data's cardinality + let CompactedStream { + stream, + data_sort_key, + catalog_sort_key_update, + } = compact_persisting_batch( + &exc, + Some(SortKey::from_columns(["tag3", "tag1", "time"])), + "test_table".into(), + batch, + ) + .await + .unwrap(); + + let output_batches = datafusion::physical_plan::common::collect(stream) + .await + .expect("should execute plan"); + + // verify compacted data + // should be the same as the input but sorted on the specified sort key of tag3, tag1, & + // time + let expected_data = vec![ + "+-----------+------+------+-----------------------------+", + "| field_int | tag1 | tag3 | time |", + "+-----------+------+------+-----------------------------+", + "| 50 | VT | AL | 1970-01-01T00:00:00.000210Z |", + "| 70 | UT | OR | 1970-01-01T00:00:00.000220Z |", + "| 10 | VT | PR | 1970-01-01T00:00:00.000210Z |", + "| 1000 | WA | TX | 1970-01-01T00:00:00.000028Z |", + "+-----------+------+------+-----------------------------+", + ]; + assert_batches_eq!(&expected_data, &output_batches); + + assert_eq!( + data_sort_key, + SortKey::from_columns(["tag3", "tag1", "time"]) + ); + + // The sort key does not need to be updated in the catalog + assert!(catalog_sort_key_update.is_none()); + } + + #[tokio::test] + async fn test_compact_batch_new_column_for_sort_key() { + // create input data + let batch = QueryAdaptor::new( + PartitionId::new(1), + create_batches_with_influxtype_different_cardinality().await, + ); + + // verify PK + let schema = batch.schema(); + let pk = schema.primary_key(); + let expected_pk = vec!["tag1", "tag3", "time"]; + assert_eq!(expected_pk, pk); + + let exc = Executor::new(1); + + // SPECIFY A SORT KEY HERE to simulate a sort key being stored in the catalog + // this is NOT what the computed sort key would be based on this data's cardinality + // The new column, tag1, should get added just before the time column + let CompactedStream { + stream, + data_sort_key, + catalog_sort_key_update, + } = compact_persisting_batch( + &exc, + Some(SortKey::from_columns(["tag3", "time"])), + "test_table".into(), + batch, + ) + .await + .unwrap(); + + let output_batches = datafusion::physical_plan::common::collect(stream) + .await + .expect("should execute plan"); + + // verify compacted data + // should be the same as the input but sorted on the specified sort key of tag3, tag1, & + // time + let expected_data = vec![ + "+-----------+------+------+-----------------------------+", + "| field_int | tag1 | tag3 | time |", + "+-----------+------+------+-----------------------------+", + "| 50 | VT | AL | 1970-01-01T00:00:00.000210Z |", + "| 70 | UT | OR | 1970-01-01T00:00:00.000220Z |", + "| 10 | VT | PR | 1970-01-01T00:00:00.000210Z |", + "| 1000 | WA | TX | 1970-01-01T00:00:00.000028Z |", + "+-----------+------+------+-----------------------------+", + ]; + 
assert_batches_eq!(&expected_data, &output_batches); + + assert_eq!( + data_sort_key, + SortKey::from_columns(["tag3", "tag1", "time"]) + ); + + // The sort key in the catalog needs to be updated to include the new column + assert_eq!( + catalog_sort_key_update.unwrap(), + SortKey::from_columns(["tag3", "tag1", "time"]) + ); + } + + #[tokio::test] + async fn test_compact_batch_missing_column_for_sort_key() { + // create input data + let batch = QueryAdaptor::new( + PartitionId::new(1), + create_batches_with_influxtype_different_cardinality().await, + ); + + // verify PK + let schema = batch.schema(); + let pk = schema.primary_key(); + let expected_pk = vec!["tag1", "tag3", "time"]; + assert_eq!(expected_pk, pk); + + let exc = Executor::new(1); + + // SPECIFY A SORT KEY HERE to simulate a sort key being stored in the catalog + // this is NOT what the computed sort key would be based on this data's cardinality + // This contains a sort key, "tag4", that doesn't appear in the data. + let CompactedStream { + stream, + data_sort_key, + catalog_sort_key_update, + } = compact_persisting_batch( + &exc, + Some(SortKey::from_columns(["tag3", "tag1", "tag4", "time"])), + "test_table".into(), + batch, + ) + .await + .unwrap(); + + let output_batches = datafusion::physical_plan::common::collect(stream) + .await + .expect("should execute plan"); + + // verify compacted data + // should be the same as the input but sorted on the specified sort key of tag3, tag1, & + // time + let expected_data = vec![ + "+-----------+------+------+-----------------------------+", + "| field_int | tag1 | tag3 | time |", + "+-----------+------+------+-----------------------------+", + "| 50 | VT | AL | 1970-01-01T00:00:00.000210Z |", + "| 70 | UT | OR | 1970-01-01T00:00:00.000220Z |", + "| 10 | VT | PR | 1970-01-01T00:00:00.000210Z |", + "| 1000 | WA | TX | 1970-01-01T00:00:00.000028Z |", + "+-----------+------+------+-----------------------------+", + ]; + assert_batches_eq!(&expected_data, &output_batches); + + assert_eq!( + data_sort_key, + SortKey::from_columns(["tag3", "tag1", "time"]) + ); + + // The sort key in the catalog should NOT get a new value + assert!(catalog_sort_key_update.is_none()); + } + + #[tokio::test] + async fn test_compact_one_row_batch() { + test_helpers::maybe_start_logging(); + + // create input data + let batch = QueryAdaptor::new( + PartitionId::new(1), + create_one_row_record_batch_with_influxtype().await, + ); + + // verify PK + let schema = batch.schema(); + let pk = schema.primary_key(); + let expected_pk = vec!["tag1", "time"]; + assert_eq!(expected_pk, pk); + + let sort_key = + compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref())); + assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"])); + + // compact + let exc = Executor::new(1); + let stream = compact_persisting_batch(&exc, Some(sort_key), "test_table".into(), batch) + .await + .unwrap(); + let output_batches = datafusion::physical_plan::common::collect(stream.stream) + .await + .unwrap(); + + // verify no empty record batches - bug #3782 + assert_eq!(output_batches.len(), 1); + + // verify compacted data + let expected = vec![ + "+-----------+------+-----------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+-----------------------------+", + "| 1000 | MA | 1970-01-01T00:00:00.000001Z |", + "+-----------+------+-----------------------------+", + ]; + assert_batches_eq!(&expected, &output_batches); + } + + #[tokio::test] + async fn test_compact_one_batch_with_duplicates() { 
+ // create input data + let batch = QueryAdaptor::new( + PartitionId::new(1), + create_one_record_batch_with_influxtype_duplicates().await, + ); + + // verify PK + let schema = batch.schema(); + let pk = schema.primary_key(); + let expected_pk = vec!["tag1", "time"]; + assert_eq!(expected_pk, pk); + + let sort_key = + compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref())); + assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"])); + + // compact + let exc = Executor::new(1); + let stream = compact_persisting_batch(&exc, Some(sort_key), "test_table".into(), batch) + .await + .unwrap(); + let output_batches = datafusion::physical_plan::common::collect(stream.stream) + .await + .unwrap(); + // verify no empty record bacthes - bug #3782 + assert_eq!(output_batches.len(), 2); + assert_eq!(output_batches[0].num_rows(), 6); + assert_eq!(output_batches[1].num_rows(), 1); + + // verify compacted data + // data is sorted and all duplicates are removed + let expected = vec![ + "+-----------+------+--------------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+--------------------------------+", + "| 10 | AL | 1970-01-01T00:00:00.000000050Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000100Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000500Z |", + "| 30 | MT | 1970-01-01T00:00:00.000000005Z |", + "| 1000 | MT | 1970-01-01T00:00:00.000001Z |", + "| 1000 | MT | 1970-01-01T00:00:00.000002Z |", + "| 20 | MT | 1970-01-01T00:00:00.000007Z |", + "+-----------+------+--------------------------------+", + ]; + assert_batches_eq!(&expected, &output_batches); + } + + #[tokio::test] + async fn test_compact_many_batches_same_columns_with_duplicates() { + // create many-batches input data + let batch = QueryAdaptor::new(PartitionId::new(1), create_batches_with_influxtype().await); + + // verify PK + let schema = batch.schema(); + let pk = schema.primary_key(); + let expected_pk = vec!["tag1", "time"]; + assert_eq!(expected_pk, pk); + + let sort_key = + compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref())); + assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"])); + + // compact + let exc = Executor::new(1); + let stream = compact_persisting_batch(&exc, Some(sort_key), "test_table".into(), batch) + .await + .unwrap() + .stream; + let output_batches = datafusion::physical_plan::common::collect(stream) + .await + .unwrap(); + + // verify compacted data + // data is sorted and all duplicates are removed + let expected = vec![ + "+-----------+------+--------------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+--------------------------------+", + "| 100 | AL | 1970-01-01T00:00:00.000000050Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000100Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000500Z |", + "| 30 | MT | 1970-01-01T00:00:00.000000005Z |", + "| 1000 | MT | 1970-01-01T00:00:00.000001Z |", + "| 1000 | MT | 1970-01-01T00:00:00.000002Z |", + "| 5 | MT | 1970-01-01T00:00:00.000005Z |", + "| 10 | MT | 1970-01-01T00:00:00.000007Z |", + "+-----------+------+--------------------------------+", + ]; + assert_batches_eq!(&expected, &output_batches); + } + + #[tokio::test] + async fn test_compact_many_batches_different_columns_with_duplicates() { + // create many-batches input data + let batch = QueryAdaptor::new( + PartitionId::new(1), + create_batches_with_influxtype_different_columns().await, + ); + + // verify PK + let schema = batch.schema(); + let pk = schema.primary_key(); + let expected_pk = vec!["tag1", 
"tag2", "time"]; + assert_eq!(expected_pk, pk); + + let sort_key = + compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref())); + assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"])); + + // compact + let exc = Executor::new(1); + let stream = compact_persisting_batch(&exc, Some(sort_key), "test_table".into(), batch) + .await + .unwrap() + .stream; + let output_batches = datafusion::physical_plan::common::collect(stream) + .await + .unwrap(); + + // verify compacted data + // data is sorted and all duplicates are removed + let expected = vec![ + "+-----------+------------+------+------+--------------------------------+", + "| field_int | field_int2 | tag1 | tag2 | time |", + "+-----------+------------+------+------+--------------------------------+", + "| 10 | | AL | | 1970-01-01T00:00:00.000000050Z |", + "| 100 | 100 | AL | MA | 1970-01-01T00:00:00.000000050Z |", + "| 70 | | CT | | 1970-01-01T00:00:00.000000100Z |", + "| 70 | | CT | | 1970-01-01T00:00:00.000000500Z |", + "| 70 | 70 | CT | CT | 1970-01-01T00:00:00.000000100Z |", + "| 30 | | MT | | 1970-01-01T00:00:00.000000005Z |", + "| 1000 | | MT | | 1970-01-01T00:00:00.000001Z |", + "| 1000 | | MT | | 1970-01-01T00:00:00.000002Z |", + "| 20 | | MT | | 1970-01-01T00:00:00.000007Z |", + "| 5 | 5 | MT | AL | 1970-01-01T00:00:00.000005Z |", + "| 10 | 10 | MT | AL | 1970-01-01T00:00:00.000007Z |", + "| 1000 | 1000 | MT | CT | 1970-01-01T00:00:00.000001Z |", + "+-----------+------------+------+------+--------------------------------+", + ]; + assert_batches_eq!(&expected, &output_batches); + } + + #[tokio::test] + async fn test_compact_many_batches_different_columns_different_order_with_duplicates() { + // create many-batches input data + let batch = QueryAdaptor::new( + PartitionId::new(1), + create_batches_with_influxtype_different_columns_different_order().await, + ); + + // verify PK + let schema = batch.schema(); + let pk = schema.primary_key(); + let expected_pk = vec!["tag1", "tag2", "time"]; + assert_eq!(expected_pk, pk); + + let sort_key = + compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref())); + assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"])); + + // compact + let exc = Executor::new(1); + let stream = compact_persisting_batch(&exc, Some(sort_key), "test_table".into(), batch) + .await + .unwrap() + .stream; + let output_batches = datafusion::physical_plan::common::collect(stream) + .await + .unwrap(); + + // verify compacted data + // data is sorted and all duplicates are removed + // CORRECT RESULT + let expected = vec![ + "+-----------+------+------+--------------------------------+", + "| field_int | tag1 | tag2 | time |", + "+-----------+------+------+--------------------------------+", + "| 5 | | AL | 1970-01-01T00:00:00.000005Z |", + "| 10 | | AL | 1970-01-01T00:00:00.000007Z |", + "| 70 | | CT | 1970-01-01T00:00:00.000000100Z |", + "| 1000 | | CT | 1970-01-01T00:00:00.000001Z |", + "| 100 | | MA | 1970-01-01T00:00:00.000000050Z |", + "| 10 | AL | MA | 1970-01-01T00:00:00.000000050Z |", + "| 70 | CT | CT | 1970-01-01T00:00:00.000000100Z |", + "| 70 | CT | CT | 1970-01-01T00:00:00.000000500Z |", + "| 30 | MT | AL | 1970-01-01T00:00:00.000000005Z |", + "| 20 | MT | AL | 1970-01-01T00:00:00.000007Z |", + "| 1000 | MT | CT | 1970-01-01T00:00:00.000001Z |", + "| 1000 | MT | CT | 1970-01-01T00:00:00.000002Z |", + "+-----------+------+------+--------------------------------+", + ]; + + assert_batches_eq!(&expected, &output_batches); + } + + 
#[tokio::test] + #[should_panic(expected = "Schemas compatible")] + async fn test_compact_many_batches_same_columns_different_types() { + // create many-batches input data + let batch = QueryAdaptor::new( + PartitionId::new(1), + create_batches_with_influxtype_same_columns_different_type().await, + ); + + // the schema merge should throw a panic + batch.schema(); + } + + async fn create_one_row_record_batch_with_influxtype() -> Vec> { + let chunk1 = Arc::new( + TestChunk::new("t") + .with_id(1) + .with_time_column() + .with_tag_column("tag1") + .with_i64_field_column("field_int") + .with_one_row_of_data(), + ); + let batches = raw_data(&[chunk1]).await; + + // Make sure all data in one record batch + assert_eq!(batches.len(), 1); + + // verify data + let expected = vec![ + "+-----------+------+-----------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+-----------------------------+", + "| 1000 | MA | 1970-01-01T00:00:00.000001Z |", + "+-----------+------+-----------------------------+", + ]; + assert_batches_eq!(&expected, &batches); + + let batches: Vec<_> = batches.iter().map(|r| Arc::new(r.clone())).collect(); + batches + } + + async fn create_one_record_batch_with_influxtype_no_duplicates() -> Vec> { + let chunk1 = Arc::new( + TestChunk::new("t") + .with_id(1) + .with_time_column() + .with_tag_column("tag1") + .with_i64_field_column("field_int") + .with_three_rows_of_data(), + ); + let batches = raw_data(&[chunk1]).await; + + // Make sure all data in one record batch + assert_eq!(batches.len(), 1); + + // verify data + let expected = vec![ + "+-----------+------+-----------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+-----------------------------+", + "| 1000 | WA | 1970-01-01T00:00:00.000008Z |", + "| 10 | VT | 1970-01-01T00:00:00.000010Z |", + "| 70 | UT | 1970-01-01T00:00:00.000020Z |", + "+-----------+------+-----------------------------+", + ]; + assert_batches_eq!(&expected, &batches); + + let batches: Vec<_> = batches.iter().map(|r| Arc::new(r.clone())).collect(); + batches + } + + async fn create_one_record_batch_with_influxtype_duplicates() -> Vec> { + let chunk1 = Arc::new( + TestChunk::new("t") + .with_id(1) + .with_time_column() //_with_full_stats( + .with_tag_column("tag1") + .with_i64_field_column("field_int") + .with_ten_rows_of_data_some_duplicates(), + ); + let batches = raw_data(&[chunk1]).await; + + // Make sure all data in one record batch + assert_eq!(batches.len(), 1); + + // verify data + let expected = vec![ + "+-----------+------+--------------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+--------------------------------+", + "| 1000 | MT | 1970-01-01T00:00:00.000001Z |", + "| 10 | MT | 1970-01-01T00:00:00.000007Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000100Z |", + "| 100 | AL | 1970-01-01T00:00:00.000000050Z |", + "| 5 | MT | 1970-01-01T00:00:00.000000005Z |", + "| 1000 | MT | 1970-01-01T00:00:00.000002Z |", + "| 20 | MT | 1970-01-01T00:00:00.000007Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000500Z |", + "| 10 | AL | 1970-01-01T00:00:00.000000050Z |", + "| 30 | MT | 1970-01-01T00:00:00.000000005Z |", + "+-----------+------+--------------------------------+", + ]; + assert_batches_eq!(&expected, &batches); + + let batches: Vec<_> = batches.iter().map(|r| Arc::new(r.clone())).collect(); + batches + } + + /// RecordBatches with knowledge of influx metadata + async fn create_batches_with_influxtype() -> Vec> { + // Use the available TestChunk to create chunks and 
then convert them to raw RecordBatches + let mut batches = vec![]; + + // chunk1 with 10 rows and 3 columns + let chunk1 = Arc::new( + TestChunk::new("t") + .with_id(1) + .with_time_column() + .with_tag_column("tag1") + .with_i64_field_column("field_int") + .with_ten_rows_of_data_some_duplicates(), + ); + let batch1 = raw_data(&[chunk1]).await[0].clone(); + let expected = vec![ + "+-----------+------+--------------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+--------------------------------+", + "| 1000 | MT | 1970-01-01T00:00:00.000001Z |", + "| 10 | MT | 1970-01-01T00:00:00.000007Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000100Z |", + "| 100 | AL | 1970-01-01T00:00:00.000000050Z |", + "| 5 | MT | 1970-01-01T00:00:00.000000005Z |", + "| 1000 | MT | 1970-01-01T00:00:00.000002Z |", + "| 20 | MT | 1970-01-01T00:00:00.000007Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000500Z |", + "| 10 | AL | 1970-01-01T00:00:00.000000050Z |", + "| 30 | MT | 1970-01-01T00:00:00.000000005Z |", + "+-----------+------+--------------------------------+", + ]; + assert_batches_eq!(&expected, &[batch1.clone()]); + batches.push(Arc::new(batch1)); + + // chunk2 having duplicate data with chunk 1 + let chunk2 = Arc::new( + TestChunk::new("t") + .with_id(2) + .with_time_column() + .with_tag_column("tag1") + .with_i64_field_column("field_int") + .with_five_rows_of_data(), + ); + let batch2 = raw_data(&[chunk2]).await[0].clone(); + let expected = vec![ + "+-----------+------+--------------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+--------------------------------+", + "| 1000 | MT | 1970-01-01T00:00:00.000001Z |", + "| 10 | MT | 1970-01-01T00:00:00.000007Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000100Z |", + "| 100 | AL | 1970-01-01T00:00:00.000000050Z |", + "| 5 | MT | 1970-01-01T00:00:00.000005Z |", + "+-----------+------+--------------------------------+", + ]; + assert_batches_eq!(&expected, &[batch2.clone()]); + batches.push(Arc::new(batch2)); + + // verify data from both batches + let expected = vec![ + "+-----------+------+--------------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+--------------------------------+", + "| 1000 | MT | 1970-01-01T00:00:00.000001Z |", + "| 10 | MT | 1970-01-01T00:00:00.000007Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000100Z |", + "| 100 | AL | 1970-01-01T00:00:00.000000050Z |", + "| 5 | MT | 1970-01-01T00:00:00.000000005Z |", + "| 1000 | MT | 1970-01-01T00:00:00.000002Z |", + "| 20 | MT | 1970-01-01T00:00:00.000007Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000500Z |", + "| 10 | AL | 1970-01-01T00:00:00.000000050Z |", + "| 30 | MT | 1970-01-01T00:00:00.000000005Z |", + "| 1000 | MT | 1970-01-01T00:00:00.000001Z |", + "| 10 | MT | 1970-01-01T00:00:00.000007Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000100Z |", + "| 100 | AL | 1970-01-01T00:00:00.000000050Z |", + "| 5 | MT | 1970-01-01T00:00:00.000005Z |", + "+-----------+------+--------------------------------+", + ]; + let b: Vec<_> = batches.iter().map(|b| (**b).clone()).collect(); + assert_batches_eq!(&expected, &b); + + batches + } + + /// RecordBatches with knowledge of influx metadata + async fn create_batches_with_influxtype_different_columns() -> Vec> { + // Use the available TestChunk to create chunks and then convert them to raw RecordBatches + let mut batches = vec![]; + + // chunk1 with 10 rows and 3 columns + let chunk1 = Arc::new( + TestChunk::new("t") + .with_id(1) + .with_time_column() + .with_tag_column("tag1") 
+ .with_i64_field_column("field_int") + .with_ten_rows_of_data_some_duplicates(), + ); + let batch1 = raw_data(&[chunk1]).await[0].clone(); + let expected = vec![ + "+-----------+------+--------------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+--------------------------------+", + "| 1000 | MT | 1970-01-01T00:00:00.000001Z |", + "| 10 | MT | 1970-01-01T00:00:00.000007Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000100Z |", + "| 100 | AL | 1970-01-01T00:00:00.000000050Z |", + "| 5 | MT | 1970-01-01T00:00:00.000000005Z |", + "| 1000 | MT | 1970-01-01T00:00:00.000002Z |", + "| 20 | MT | 1970-01-01T00:00:00.000007Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000500Z |", + "| 10 | AL | 1970-01-01T00:00:00.000000050Z |", + "| 30 | MT | 1970-01-01T00:00:00.000000005Z |", + "+-----------+------+--------------------------------+", + ]; + assert_batches_eq!(&expected, &[batch1.clone()]); + batches.push(Arc::new(batch1)); + + // chunk2 having duplicate data with chunk 1 + // mmore columns + let chunk2 = Arc::new( + TestChunk::new("t") + .with_id(2) + .with_time_column() + .with_tag_column("tag1") + .with_i64_field_column("field_int") + .with_tag_column("tag2") + .with_i64_field_column("field_int2") + .with_five_rows_of_data(), + ); + let batch2 = raw_data(&[chunk2]).await[0].clone(); + let expected = vec![ + "+-----------+------------+------+------+--------------------------------+", + "| field_int | field_int2 | tag1 | tag2 | time |", + "+-----------+------------+------+------+--------------------------------+", + "| 1000 | 1000 | MT | CT | 1970-01-01T00:00:00.000001Z |", + "| 10 | 10 | MT | AL | 1970-01-01T00:00:00.000007Z |", + "| 70 | 70 | CT | CT | 1970-01-01T00:00:00.000000100Z |", + "| 100 | 100 | AL | MA | 1970-01-01T00:00:00.000000050Z |", + "| 5 | 5 | MT | AL | 1970-01-01T00:00:00.000005Z |", + "+-----------+------------+------+------+--------------------------------+", + ]; + assert_batches_eq!(&expected, &[batch2.clone()]); + batches.push(Arc::new(batch2)); + + batches + } + + /// RecordBatches with knowledge of influx metadata + async fn create_batches_with_influxtype_different_columns_different_order( + ) -> Vec> { + // Use the available TestChunk to create chunks and then convert them to raw RecordBatches + let mut batches = vec![]; + + // chunk1 with 10 rows and 3 columns + let chunk1 = Arc::new( + TestChunk::new("t") + .with_id(1) + .with_time_column() + .with_tag_column("tag1") + .with_i64_field_column("field_int") + .with_tag_column("tag2") + .with_ten_rows_of_data_some_duplicates(), + ); + let batch1 = raw_data(&[chunk1]).await[0].clone(); + let expected = vec![ + "+-----------+------+------+--------------------------------+", + "| field_int | tag1 | tag2 | time |", + "+-----------+------+------+--------------------------------+", + "| 1000 | MT | CT | 1970-01-01T00:00:00.000001Z |", + "| 10 | MT | AL | 1970-01-01T00:00:00.000007Z |", + "| 70 | CT | CT | 1970-01-01T00:00:00.000000100Z |", + "| 100 | AL | MA | 1970-01-01T00:00:00.000000050Z |", + "| 5 | MT | AL | 1970-01-01T00:00:00.000000005Z |", + "| 1000 | MT | CT | 1970-01-01T00:00:00.000002Z |", + "| 20 | MT | AL | 1970-01-01T00:00:00.000007Z |", + "| 70 | CT | CT | 1970-01-01T00:00:00.000000500Z |", + "| 10 | AL | MA | 1970-01-01T00:00:00.000000050Z |", + "| 30 | MT | AL | 1970-01-01T00:00:00.000000005Z |", + "+-----------+------+------+--------------------------------+", + ]; + assert_batches_eq!(&expected, &[batch1.clone()]); + batches.push(Arc::new(batch1.clone())); + + // chunk2 having 
duplicate data with chunk 1 + // mmore columns + let chunk2 = Arc::new( + TestChunk::new("t") + .with_id(2) + .with_time_column() + .with_tag_column("tag2") + .with_i64_field_column("field_int") + .with_five_rows_of_data(), + ); + let batch2 = raw_data(&[chunk2]).await[0].clone(); + let expected = vec![ + "+-----------+------+--------------------------------+", + "| field_int | tag2 | time |", + "+-----------+------+--------------------------------+", + "| 1000 | CT | 1970-01-01T00:00:00.000001Z |", + "| 10 | AL | 1970-01-01T00:00:00.000007Z |", + "| 70 | CT | 1970-01-01T00:00:00.000000100Z |", + "| 100 | MA | 1970-01-01T00:00:00.000000050Z |", + "| 5 | AL | 1970-01-01T00:00:00.000005Z |", + "+-----------+------+--------------------------------+", + ]; + assert_batches_eq!(&expected, &[batch2.clone()]); + batches.push(Arc::new(batch2)); + + batches + } + + /// Has 2 tag columns; tag1 has a lower cardinality (3) than tag3 (4) + async fn create_batches_with_influxtype_different_cardinality() -> Vec> { + // Use the available TestChunk to create chunks and then convert them to raw RecordBatches + let mut batches = vec![]; + + let chunk1 = Arc::new( + TestChunk::new("t") + .with_id(1) + .with_time_column() + .with_tag_column("tag1") + .with_i64_field_column("field_int") + .with_tag_column("tag3") + .with_four_rows_of_data(), + ); + let batch1 = raw_data(&[chunk1]).await[0].clone(); + let expected = vec![ + "+-----------+------+------+-----------------------------+", + "| field_int | tag1 | tag3 | time |", + "+-----------+------+------+-----------------------------+", + "| 1000 | WA | TX | 1970-01-01T00:00:00.000028Z |", + "| 10 | VT | PR | 1970-01-01T00:00:00.000210Z |", + "| 70 | UT | OR | 1970-01-01T00:00:00.000220Z |", + "| 50 | VT | AL | 1970-01-01T00:00:00.000210Z |", + "+-----------+------+------+-----------------------------+", + ]; + assert_batches_eq!(&expected, &[batch1.clone()]); + batches.push(Arc::new(batch1.clone())); + + let chunk2 = Arc::new( + TestChunk::new("t") + .with_id(2) + .with_time_column() + .with_tag_column("tag1") + .with_tag_column("tag3") + .with_i64_field_column("field_int") + .with_four_rows_of_data(), + ); + let batch2 = raw_data(&[chunk2]).await[0].clone(); + let expected = vec![ + "+-----------+------+------+-----------------------------+", + "| field_int | tag1 | tag3 | time |", + "+-----------+------+------+-----------------------------+", + "| 1000 | WA | TX | 1970-01-01T00:00:00.000028Z |", + "| 10 | VT | PR | 1970-01-01T00:00:00.000210Z |", + "| 70 | UT | OR | 1970-01-01T00:00:00.000220Z |", + "| 50 | VT | AL | 1970-01-01T00:00:00.000210Z |", + "+-----------+------+------+-----------------------------+", + ]; + assert_batches_eq!(&expected, &[batch2.clone()]); + batches.push(Arc::new(batch2)); + + batches + } + + /// RecordBatches with knowledge of influx metadata + async fn create_batches_with_influxtype_same_columns_different_type() -> Vec> { + // Use the available TestChunk to create chunks and then convert them to raw RecordBatches + let mut batches = vec![]; + + // chunk1 + let chunk1 = Arc::new( + TestChunk::new("t") + .with_id(1) + .with_time_column() + .with_tag_column("tag1") + .with_i64_field_column("field_int") + .with_three_rows_of_data(), + ); + let batch1 = raw_data(&[chunk1]).await[0].clone(); + let expected = vec![ + "+-----------+------+-----------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+-----------------------------+", + "| 1000 | WA | 1970-01-01T00:00:00.000008Z |", + "| 10 | VT | 
1970-01-01T00:00:00.000010Z |", + "| 70 | UT | 1970-01-01T00:00:00.000020Z |", + "+-----------+------+-----------------------------+", + ]; + assert_batches_eq!(&expected, &[batch1.clone()]); + batches.push(Arc::new(batch1)); + + // chunk2 having duplicate data with chunk 1 + // mmore columns + let chunk2 = Arc::new( + TestChunk::new("t") + .with_id(2) + .with_time_column() + .with_u64_column("field_int") // u64 type but on existing name "field_int" used for i64 in chunk 1 + .with_tag_column("tag2") + .with_three_rows_of_data(), + ); + let batch2 = raw_data(&[chunk2]).await[0].clone(); + let expected = vec![ + "+-----------+------+-----------------------------+", + "| field_int | tag2 | time |", + "+-----------+------+-----------------------------+", + "| 1000 | SC | 1970-01-01T00:00:00.000008Z |", + "| 10 | NC | 1970-01-01T00:00:00.000010Z |", + "| 70 | RI | 1970-01-01T00:00:00.000020Z |", + "+-----------+------+-----------------------------+", + ]; + assert_batches_eq!(&expected, &[batch2.clone()]); + batches.push(Arc::new(batch2)); + + batches + } +} diff --git a/ingester2/src/persist/context.rs b/ingester2/src/persist/context.rs new file mode 100644 index 0000000000..d82a3a5784 --- /dev/null +++ b/ingester2/src/persist/context.rs @@ -0,0 +1,384 @@ +use std::sync::Arc; + +use backoff::Backoff; +use data_types::{ + CompactionLevel, NamespaceId, ParquetFileParams, PartitionId, PartitionKey, SequenceNumber, + TableId, +}; +use iox_catalog::interface::get_table_schema_by_id; +use iox_time::{SystemProvider, TimeProvider}; +use observability_deps::tracing::*; +use parking_lot::Mutex; +use parquet_file::metadata::IoxMetadata; +use schema::sort::SortKey; +use tokio::sync::Notify; +use uuid::Uuid; + +use crate::{ + buffer_tree::{ + namespace::NamespaceName, + partition::{persisting::PersistingData, PartitionData, SortKeyState}, + table::TableName, + }, + deferred_load::DeferredLoad, + persist::compact::{compact_persisting_batch, CompactedStream}, + TRANSITION_SHARD_ID, +}; + +use super::actor::Inner; + +/// An internal type that contains all necessary information to run a persist task. +/// +/// Used to communicate between actor handles & actor task. +#[derive(Debug)] +pub(super) struct PersistRequest { + complete: Arc, + partition: Arc>, + data: PersistingData, +} + +impl PersistRequest { + pub(super) fn new(partition: Arc>, data: PersistingData) -> Self { + Self { + complete: Arc::new(Notify::default()), + partition, + data, + } + } + + /// Return the partition ID of the persisting data. + pub(super) fn partition_id(&self) -> PartitionId { + self.data.partition_id() + } + + /// Obtain the completion notification handle for this request. + /// + /// This notification is fired once persistence is complete. + pub(super) fn complete_notification(&self) -> Arc { + Arc::clone(&self.complete) + } +} + +pub(super) struct Context { + partition: Arc>, + data: PersistingData, + inner: Arc, + + /// IDs loaded from the partition at construction time. + namespace_id: NamespaceId, + table_id: TableId, + partition_id: PartitionId, + + // The partition key for this partition + partition_key: PartitionKey, + + /// Deferred strings needed for persistence. + /// + /// These [`DeferredLoad`] are given a pre-fetch hint when this [`Context`] + /// is constructed to load them in the background (if not already resolved) + /// in order to avoid incurring the query latency when the values are + /// needed. 
+ namespace_name: Arc>, + table_name: Arc>, + + /// The [`SortKey`] for the [`PartitionData`] at the time of [`Context`] + /// construction. + /// + /// The [`SortKey`] MUST NOT change during persistence, as updates to the + /// sort key are not commutative and thus require serialising. This + /// precludes parallel persists of partitions, unless they can be proven not + /// to need to update the sort key. + sort_key: SortKeyState, + + /// A notification signal to indicate to the caller that this partition has + /// persisted. + complete: Arc, +} + +impl Context { + /// Construct a persistence job [`Context`] from `req`. + /// + /// Locks the [`PartitionData`] in `req` to read various properties which + /// are then cached in the [`Context`]. + pub(super) fn new(req: PersistRequest, inner: Arc) -> Self { + let partition_id = req.data.partition_id(); + + // Obtain the partition lock and load the immutable values that will be + // used during this persistence. + let s = { + let complete = req.complete_notification(); + let p = Arc::clone(&req.partition); + let guard = p.lock(); + + assert_eq!(partition_id, guard.partition_id()); + + Self { + partition: req.partition, + data: req.data, + inner, + namespace_id: guard.namespace_id(), + table_id: guard.table_id(), + partition_id, + partition_key: guard.partition_key().clone(), + namespace_name: Arc::clone(guard.namespace_name()), + table_name: Arc::clone(guard.table_name()), + + // Technically the sort key isn't immutable, but MUST NOT change + // during the execution of this persist. + sort_key: guard.sort_key().clone(), + + complete, + } + }; + + // Pre-fetch the deferred values in a background thread (if not already + // resolved) + s.namespace_name.prefetch_now(); + s.table_name.prefetch_now(); + if let SortKeyState::Deferred(ref d) = s.sort_key { + d.prefetch_now(); + } + + s + } + + pub(super) async fn compact(&self) -> CompactedStream { + debug!( + namespace_id = %self.namespace_id, + namespace_name = %self.namespace_name, + table_id = %self.table_id, + table_name = %self.table_name, + partition_id = %self.partition_id, + partition_key = %self.partition_key, + "compacting partition" + ); + + assert!(!self.data.record_batches().is_empty()); + + // Run a compaction sort the data and resolve any duplicate values. + // + // This demands the deferred load values and may have to wait for them + // to be loaded before compaction starts. + compact_persisting_batch( + &self.inner.exec, + self.sort_key.get().await, + self.table_name.get().await, + self.data.query_adaptor(), + ) + .await + .expect("unable to compact persisting batch") + } + + pub(super) async fn upload( + &self, + compacted: CompactedStream, + ) -> (Option, ParquetFileParams) { + let CompactedStream { + stream: record_stream, + catalog_sort_key_update, + data_sort_key, + } = compacted; + + // Generate a UUID to uniquely identify this parquet file in + // object storage. + let object_store_id = Uuid::new_v4(); + + debug!( + namespace_id = %self.namespace_id, + namespace_name = %self.namespace_name, + table_id = %self.table_id, + table_name = %self.table_name, + partition_id = %self.partition_id, + partition_key = %self.partition_key, + %object_store_id, + sort_key = %data_sort_key, + "uploading partition parquet" + ); + + // Construct the metadata for this parquet file. 
+ let iox_metadata = IoxMetadata { + object_store_id, + creation_timestamp: SystemProvider::new().now(), + shard_id: TRANSITION_SHARD_ID, + namespace_id: self.namespace_id, + namespace_name: Arc::clone(&*self.namespace_name.get().await), + table_id: self.table_id, + table_name: Arc::clone(&*self.table_name.get().await), + partition_id: self.partition_id, + partition_key: self.partition_key.clone(), + max_sequence_number: SequenceNumber::new(0), // TODO: not ordered! + compaction_level: CompactionLevel::Initial, + sort_key: Some(data_sort_key), + }; + + // Save the compacted data to a parquet file in object storage. + // + // This call retries until it completes. + let (md, file_size) = self + .inner + .store + .upload(record_stream, &iox_metadata) + .await + .expect("unexpected fatal persist error"); + + debug!( + namespace_id = %self.namespace_id, + namespace_name = %self.namespace_name, + table_id = %self.table_id, + table_name = %self.table_name, + partition_id = %self.partition_id, + partition_key = %self.partition_key, + %object_store_id, + file_size, + "partition parquet uploaded" + ); + + // Read the table schema from the catalog to act as a map of column name + // -> column IDs. + let table_schema = Backoff::new(&Default::default()) + .retry_all_errors("get table schema", || async { + let mut repos = self.inner.catalog.repositories().await; + get_table_schema_by_id(self.table_id, repos.as_mut()).await + }) + .await + .expect("retry forever"); + + // Build the data that must be inserted into the parquet_files catalog + // table in order to make the file visible to queriers. + let parquet_table_data = + iox_metadata.to_parquet_file(self.partition_id, file_size, &md, |name| { + table_schema.columns.get(name).expect("unknown column").id + }); + + (catalog_sort_key_update, parquet_table_data) + } + + pub(crate) async fn update_database( + self, + sort_key_update: Option, + parquet_table_data: ParquetFileParams, + ) { + // Extract the object store ID to the local scope so that it can easily + // be referenced in debug logging to aid correlation of persist events + // for a specific file. + let object_store_id = parquet_table_data.object_store_id; + + debug!( + namespace_id = %self.namespace_id, + namespace_name = %self.namespace_name, + table_id = %self.table_id, + table_name = %self.table_name, + partition_id = %self.partition_id, + partition_key = %self.partition_key, + %object_store_id, + ?parquet_table_data, + ?sort_key_update, + "updating catalog" + ); + + // If necessary, update the partition sort key in the catalog and update + // the local cached copy in the PartitionData. + // + // This update MUST be made visibile before the parquet file, otherwise + // the consumer of the parquet file will observe an inconsistent sort + // key. + if let Some(new_sort_key) = sort_key_update { + let sort_key = new_sort_key.to_columns().collect::>(); + Backoff::new(&Default::default()) + .retry_all_errors("update_sort_key", || async { + let mut repos = self.inner.catalog.repositories().await; + let _partition = repos + .partitions() + .update_sort_key(self.partition_id, &sort_key) + .await?; + Ok(()) as Result<(), iox_catalog::interface::Error> + }) + .await + .expect("retry forever"); + + // Update the sort key in the partition cache. + let old_key; + { + let mut guard = self.partition.lock(); + old_key = guard.sort_key().clone(); + guard.update_sort_key(Some(new_sort_key.clone())); + }; + + // Assert the serialisation of sort key updates. 
+ // + // Both of these get() should not block due to both of the + // values having been previously resolved / used. + assert_eq!(old_key.get().await, self.sort_key.get().await); + + debug!( + namespace_id = %self.namespace_id, + namespace_name = %self.namespace_name, + table_id = %self.table_id, + table_name = %self.table_name, + partition_id = %self.partition_id, + partition_key = %self.partition_key, + %object_store_id, + old_sort_key = ?sort_key, + %new_sort_key, + "adjusted partition sort key" + ); + } + + // Add the parquet file to the catalog. + // + // This has the effect of allowing the queriers to "discover" the + // parquet file by polling / querying the catalog. + Backoff::new(&Default::default()) + .retry_all_errors("add parquet file to catalog", || async { + let mut repos = self.inner.catalog.repositories().await; + let parquet_file = repos + .parquet_files() + .create(parquet_table_data.clone()) + .await?; + + debug!( + namespace_id = %self.namespace_id, + namespace_name = %self.namespace_name, + table_id = %self.table_id, + table_name = %self.table_name, + partition_id = %self.partition_id, + partition_key = %self.partition_key, + %object_store_id, + ?parquet_table_data, + parquet_file_id=?parquet_file.id, + "parquet file added to catalog" + ); + + // compiler insisted on getting told the type of the error :shrug: + Ok(()) as Result<(), iox_catalog::interface::Error> + }) + .await + .expect("retry forever"); + + // Mark the partition as having completed persistence, causing it to + // release the reference to the in-flight persistence data it is + // holding. + // + // This SHOULD cause the data to be dropped, but there MAY be ongoing + // queries that currently hold a reference to the data. In either case, + // the persisted data will be dropped "shortly". + self.partition.lock().mark_persisted(self.data); + + info!( + %object_store_id, + namespace_id = %self.namespace_id, + namespace_name = %self.namespace_name, + table_id = %self.table_id, + table_name = %self.table_name, + partition_id = %self.partition_id, + partition_key = %self.partition_key, + "persisted partition" + ); + + // Notify all observers of this persistence task + self.complete.notify_waiters(); + } +} + +// TODO(test): persist +// TODO(test): persist completion notification diff --git a/ingester2/src/persist/handle.rs b/ingester2/src/persist/handle.rs new file mode 100644 index 0000000000..ad03ed23f5 --- /dev/null +++ b/ingester2/src/persist/handle.rs @@ -0,0 +1,173 @@ +use std::sync::Arc; + +use iox_catalog::interface::Catalog; +use iox_query::exec::Executor; +use parking_lot::Mutex; +use parquet_file::storage::ParquetStorage; +use thiserror::Error; +use tokio::sync::{ + mpsc::{self}, + Notify, +}; + +use crate::buffer_tree::partition::{persisting::PersistingData, PartitionData}; + +use super::{actor::PersistActor, context::PersistRequest}; + +#[derive(Debug, Error)] +pub(crate) enum PersistError { + #[error("persist queue is full")] + QueueFull, +} + +/// A persistence task submission handle. +/// +/// This type is cheap to clone to share across threads. +/// +/// # Usage +/// +/// The caller should construct an [`PersistHandle`] and [`PersistActor`] by +/// calling [`PersistHandle::new()`], and run the provided [`PersistActor`] +/// instance in another thread / task (by calling [`PersistActor::run()`]). +/// +/// From this point on, the caller interacts only with the [`PersistHandle`] +/// which can be cheaply cloned and passed over thread/task boundaries to +/// enqueue persist tasks. 
+/// +/// Dropping all [`PersistHandle`] instances stops the [`PersistActor`], which +/// immediately stops all workers. +/// +/// # Topology +/// +/// The persist actor is uses an internal work group to parallelise persistence +/// operations up to `n_workers` number of parallel tasks. +/// +/// Submitting a persistence request places the job into a bounded queue, +/// providing a buffer for persistence requests to wait for an available worker. +/// +/// Two types of queue exists: +/// +/// * Submission queue (bounded by `submission_queue_depth`) +/// * `n_worker` number of worker queues (bounded by `worker_queue_depth`) +/// +/// ```text +/// ┌─────────────┐ +/// │PersistHandle├┐ +/// └┬────────────┘├┐ +/// └┬────────────┘│ +/// └─────┬───────┘ +/// │ +/// submission_queue_depth +/// │ +/// ▼ +/// ╔═══════════════╗ +/// ║ PersistActor ║ +/// ╚═══════════════╝ +/// │ +/// ┌────────────────┼────────────────┐ +/// │ │ │ +/// │ │ │ +/// worker_queue_depth worker_queue_depth worker_queue_depth +/// │ │ │ +/// ▼ ▼ ▼ +/// ┌────────────┐ ┌────────────┐ ┌────────────┐ +/// │ Worker 1 │ │ Worker 2 │ │ Worker N │ +/// └────────────┘ └────────────┘ └────────────┘ +/// ▲ ▲ ▲ +/// │ +/// │ ▼ │ +/// ┌ ─ ─ ─ ─ ─ ─ +/// └ ─ ─ ─ ─▶ Executor │◀ ─ ─ ─ ─ ┘ +/// └ ─ ─ ─ ─ ─ ─ +/// ``` +/// +/// Compaction is performed in the provided [`Executor`] re-org thread-pool and +/// is shared across all workers. +/// +/// At any one time, the number of outstanding persist tasks is bounded by: +/// +/// ```text +/// +/// submission_queue_depth + (workers * worker_queue_depth) +/// +/// ``` +/// +/// At any one time, there may be at most `submission_queue_depth + +/// worker_queue_depth` number of outstanding persist jobs for a single +/// partition. +/// +/// These two queues are used to decouple submissions from individual workers - +/// this prevents a "hot" / backlogged worker with a full worker queue from +/// blocking tasks from being passed through to workers with spare capacity. +/// +/// # Parallelism & Partition Serialisation +/// +/// Persistence jobs are parallelised across partitions, with up to at most +/// `n_worker` parallel persist executions at once. +/// +/// Because updates of a partition's [`SortKey`] are not commutative, they must +/// be serialised. For this reason, persist operations for given partition are +/// always placed in the same worker queue, ensuring they execute sequentially. +/// +/// [`SortKey`]: schema::sort::SortKey +#[derive(Debug, Clone)] +pub(crate) struct PersistHandle { + tx: mpsc::Sender, +} + +impl PersistHandle { + /// Initialise a new persist actor & obtain the first handle. + /// + /// The caller should call [`PersistActor::run()`] in a separate + /// thread / task to start the persistence executor. + pub(crate) fn new( + submission_queue_depth: usize, + n_workers: usize, + worker_queue_depth: usize, + exec: Executor, + store: ParquetStorage, + catalog: Arc, + ) -> (Self, PersistActor) { + let (tx, rx) = mpsc::channel(submission_queue_depth); + + let actor = PersistActor::new(rx, exec, store, catalog, n_workers, worker_queue_depth); + + (Self { tx }, actor) + } + + /// Place `data` from `partition` into the persistence queue. + /// + /// This call (asynchronously) waits for space to become available in the + /// submission queue. + /// + /// Once persistence is complete, the partition will be locked and the sort + /// key will be updated, and [`PartitionData::mark_persisted()`] is called + /// with `data`. 
+    ///
+    /// Once all persistence related tasks are complete, the returned [`Notify`]
+    /// broadcasts a notification.
+    ///
+    /// # Panics
+    ///
+    /// Panics (asynchronously) if the [`PartitionData`]'s sort key is updated
+    /// between persistence starting and ending.
+    ///
+    /// This will panic (asynchronously) if `data` was not from `partition` or
+    /// all worker threads have stopped.
+    pub(crate) async fn queue_persist(
+        &self,
+        partition: Arc<Mutex<PartitionData>>,
+        data: PersistingData,
+    ) -> Arc<Notify> {
+        // Build the persist task request
+        let r = PersistRequest::new(partition, data);
+        let notify = r.complete_notification();
+
+        self.tx
+            .send(r)
+            .await
+            .expect("no persist worker tasks running");
+
+        notify
+    }
+}
diff --git a/ingester2/src/persist/mod.rs b/ingester2/src/persist/mod.rs
new file mode 100644
index 0000000000..b9365e2567
--- /dev/null
+++ b/ingester2/src/persist/mod.rs
@@ -0,0 +1,4 @@
+mod actor;
+pub(super) mod compact;
+mod context;
+mod handle;
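The intended call pattern, per the PersistHandle documentation above, is roughly the following sketch. The queue depths are arbitrary, and exec, store, catalog, partition and data stand in for values the ingester runtime already holds; this is an illustration of the API introduced by this patch, not additional patch content.

// Somewhere during ingester initialisation (sketch; depths are arbitrary).
let (handle, actor) = PersistHandle::new(
    5,  // submission_queue_depth
    4,  // n_workers
    10, // worker_queue_depth
    exec,
    store,
    Arc::clone(&catalog),
);

// Drive the actor (and its worker tasks) on a dedicated task; it stops once
// all handles are dropped.
tokio::spawn(actor.run());

// Later, enqueue the PersistingData snapshot taken from a partition. The
// returned Notify fires once the partition has been compacted, uploaded, and
// recorded in the catalog.
let notify = handle.queue_persist(Arc::clone(&partition), data).await;
notify.notified().await;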