From 678fb81892cdb2042e3ab562b051178476e8972d Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 21 Oct 2022 14:25:43 +0200 Subject: [PATCH] refactor(ingester): use partition buffer FSM This commit makes use of the partition buffer state machine introduced in https://github.com/influxdata/influxdb_iox/pull/5943. This commit significantly changes the buffering, and querying, of data from a partition, swapping out the existing "DataBuffer" for the new state machine implementation (itself simplified due to temporary lack of incremental snapshot generation, see #5944). This commit simplifies the query path, removing multiple types that wrapped one-another to pass around various state necessary to perform a query, with various query functions needing different types or combinations of types. The query path now operates using a single type (named "QueryAdaptor") that provides a queryable interface over the set of RecordBatch returned from a partition. There is significantly increased testing of the PartitionData itself, covering data in various states and the ordering of returned RecordBatch (to ensure correct materialisation of updates). There are also invariants upheld by the type system / compiler to minimise the complexities of working with empty batches & states, and many asserts that ensure (mostly existing!) invariants are upheld. --- Cargo.lock | 2 +- ingester/Cargo.toml | 4 +- ingester/src/compact.rs | 356 ++--- ingester/src/data.rs | 144 +- ingester/src/data/namespace.rs | 72 +- ingester/src/data/partition.rs | 1236 ++++++++++++----- ingester/src/data/partition/buffer.rs | 346 ++--- .../src/data/partition/buffer/always_some.rs | 69 +- .../data/partition/buffer/state_machine.rs | 10 +- .../src/data/partition/resolver/catalog.rs | 2 +- ingester/src/data/sequence_range.rs | 2 +- ingester/src/data/table.rs | 80 +- ingester/src/lib.rs | 2 +- ingester/src/querier_handler.rs | 210 ++- ingester/src/{query.rs => query_adaptor.rs} | 160 ++- ingester/src/test_util.rs | 109 +- iox_query/src/provider.rs | 3 +- 17 files changed, 1469 insertions(+), 1338 deletions(-) rename ingester/src/{query.rs => query_adaptor.rs} (57%) diff --git a/Cargo.lock b/Cargo.lock index 6285ded57d..4238ca6ff7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2279,7 +2279,6 @@ dependencies = [ "assert_matches", "async-trait", "backoff", - "bitflags", "bytes", "chrono", "data_types", @@ -2293,6 +2292,7 @@ dependencies = [ "iox_catalog", "iox_query", "iox_time", + "lazy_static", "metric", "mutable_batch", "mutable_batch_lp", diff --git a/ingester/Cargo.toml b/ingester/Cargo.toml index 857900e923..910493463c 100644 --- a/ingester/Cargo.toml +++ b/ingester/Cargo.toml @@ -47,11 +47,11 @@ write_summary = { path = "../write_summary" } tokio-util = { version = "0.7.4" } trace = { path = "../trace" } rand = "0.8.5" +once_cell = "1" [dev-dependencies] assert_matches = "1.5.0" -bitflags = {version = "1.3.2"} -once_cell = "1" +lazy_static = "1.4.0" paste = "1.0.9" test_helpers = { path = "../test_helpers", features = ["future_timeout"] } tokio-stream = {version = "0.1.11", default_features = false } diff --git a/ingester/src/compact.rs b/ingester/src/compact.rs index ba08c693ee..7c57dd7510 100644 --- a/ingester/src/compact.rs +++ b/ingester/src/compact.rs @@ -11,7 +11,7 @@ use iox_query::{ use schema::sort::{adjust_sort_key_columns, compute_sort_key, SortKey}; use snafu::{ResultExt, Snafu}; -use crate::{data::partition::PersistingBatch, query::QueryableBatch}; +use crate::query_adaptor::QueryAdaptor; #[derive(Debug, Snafu)] #[allow(missing_copy_implementations, missing_docs)] @@ -85,14 +85,14 @@ impl std::fmt::Debug for CompactedStream { } } -/// Compact a given persisting batch into a [`CompactedStream`] or -/// `None` if there is no data to compact. +/// Compact a given batch into a [`CompactedStream`] or `None` if there is no +/// data to compact, returning an updated sort key, if any. pub(crate) async fn compact_persisting_batch( executor: &Executor, sort_key: Option, - batch: Arc, + batch: QueryAdaptor, ) -> Result { - assert!(!batch.data.data.is_empty()); + assert!(!batch.record_batches().is_empty()); // Get sort key from the catalog or compute it from // cardinality. @@ -104,12 +104,12 @@ pub(crate) async fn compact_persisting_batch( // // If there are any new columns, add them to the end of the sort key in the catalog and // return that to be updated in the catalog. - adjust_sort_key_columns(&sk, &batch.data.schema().primary_key()) + adjust_sort_key_columns(&sk, &batch.schema().primary_key()) } None => { let sort_key = compute_sort_key( - batch.data.schema().as_ref(), - batch.data.data.iter().map(|sb| sb.data.as_ref()), + batch.schema().as_ref(), + batch.record_batches().iter().map(|sb| sb.as_ref()), ); // Use the sort key computed from the cardinality as the sort key for this parquet // file's metadata, also return the sort key to be stored in the catalog @@ -118,7 +118,7 @@ pub(crate) async fn compact_persisting_batch( }; // Compact - let stream = compact(executor, Arc::clone(&batch.data), data_sort_key.clone()).await?; + let stream = compact(executor, Arc::new(batch), data_sort_key.clone()).await?; Ok(CompactedStream { stream, @@ -127,10 +127,10 @@ pub(crate) async fn compact_persisting_batch( }) } -/// Compact a given Queryable Batch +/// Compact a given batch without updating the sort key. pub(crate) async fn compact( executor: &Executor, - data: Arc, + data: Arc, sort_key: SortKey, ) -> Result { // Build logical plan for compaction @@ -157,9 +157,9 @@ pub(crate) async fn compact( #[cfg(test)] mod tests { use arrow_util::assert_batches_eq; + use data_types::PartitionId; use mutable_batch_lp::lines_to_batches; use schema::selection::Selection; - use uuid::Uuid; use super::*; use crate::test_util::{ @@ -169,14 +169,14 @@ mod tests { create_batches_with_influxtype_same_columns_different_type, create_one_record_batch_with_influxtype_duplicates, create_one_record_batch_with_influxtype_no_duplicates, - create_one_row_record_batch_with_influxtype, make_persisting_batch, make_queryable_batch, + create_one_row_record_batch_with_influxtype, }; // this test was added to guard against https://github.com/influxdata/influxdb_iox/issues/3782 // where if sending in a single row it would compact into an output of two batches, one of // which was empty, which would cause this to panic. #[tokio::test] - async fn test_compact_persisting_batch_on_one_record_batch_with_one_row() { + async fn test_compact_batch_on_one_record_batch_with_one_row() { // create input data let batch = lines_to_batches("cpu bar=2 20", 0) .unwrap() @@ -184,26 +184,15 @@ mod tests { .unwrap() .to_arrow(Selection::All) .unwrap(); - let batches = vec![Arc::new(batch)]; - // build persisting batch from the input batches - let uuid = Uuid::new_v4(); - let table_name = "test_table"; - let shard_id = 1; - let seq_num_start: i64 = 1; - let table_id = 1; - let partition_id = 1; - let persisting_batch = make_persisting_batch( - shard_id, - seq_num_start, - table_id, - table_name, - partition_id, - uuid, - batches, + + let batch = QueryAdaptor::new( + "test_table".into(), + PartitionId::new(1), + vec![Arc::new(batch)], ); // verify PK - let schema = persisting_batch.data.schema(); + let schema = batch.schema(); let pk = schema.primary_key(); let expected_pk = vec!["time"]; assert_eq!(expected_pk, pk); @@ -211,7 +200,7 @@ mod tests { // compact let exc = Executor::new(1); let CompactedStream { stream, .. } = - compact_persisting_batch(&exc, Some(SortKey::empty()), persisting_batch) + compact_persisting_batch(&exc, Some(SortKey::empty()), batch) .await .unwrap(); @@ -232,29 +221,16 @@ mod tests { } #[tokio::test] - async fn test_compact_persisting_batch_on_one_record_batch_no_dupilcates() { + async fn test_compact_batch_on_one_record_batch_no_dupilcates() { // create input data - let batches = create_one_record_batch_with_influxtype_no_duplicates().await; - - // build persisting batch from the input batches - let uuid = Uuid::new_v4(); - let table_name = "test_table"; - let shard_id = 1; - let seq_num_start: i64 = 1; - let table_id = 1; - let partition_id = 1; - let persisting_batch = make_persisting_batch( - shard_id, - seq_num_start, - table_id, - table_name, - partition_id, - uuid, - batches, + let batch = QueryAdaptor::new( + "test_table".into(), + PartitionId::new(1), + create_one_record_batch_with_influxtype_no_duplicates().await, ); // verify PK - let schema = persisting_batch.data.schema(); + let schema = batch.schema(); let pk = schema.primary_key(); let expected_pk = vec!["tag1", "time"]; assert_eq!(expected_pk, pk); @@ -265,7 +241,7 @@ mod tests { stream, data_sort_key, catalog_sort_key_update, - } = compact_persisting_batch(&exc, Some(SortKey::empty()), persisting_batch) + } = compact_persisting_batch(&exc, Some(SortKey::empty()), batch) .await .unwrap(); @@ -295,29 +271,16 @@ mod tests { } #[tokio::test] - async fn test_compact_persisting_batch_no_sort_key() { + async fn test_compact_batch_no_sort_key() { // create input data - let batches = create_batches_with_influxtype_different_cardinality().await; - - // build persisting batch from the input batches - let uuid = Uuid::new_v4(); - let table_name = "test_table"; - let shard_id = 1; - let seq_num_start: i64 = 1; - let table_id = 1; - let partition_id = 1; - let persisting_batch = make_persisting_batch( - shard_id, - seq_num_start, - table_id, - table_name, - partition_id, - uuid, - batches, + let batch = QueryAdaptor::new( + "test_table".into(), + PartitionId::new(1), + create_batches_with_influxtype_different_cardinality().await, ); // verify PK - let schema = persisting_batch.data.schema(); + let schema = batch.schema(); let pk = schema.primary_key(); let expected_pk = vec!["tag1", "tag3", "time"]; assert_eq!(expected_pk, pk); @@ -329,7 +292,7 @@ mod tests { stream, data_sort_key, catalog_sort_key_update, - } = compact_persisting_batch(&exc, Some(SortKey::empty()), persisting_batch) + } = compact_persisting_batch(&exc, Some(SortKey::empty()), batch) .await .unwrap(); @@ -363,29 +326,16 @@ mod tests { } #[tokio::test] - async fn test_compact_persisting_batch_with_specified_sort_key() { + async fn test_compact_batch_with_specified_sort_key() { // create input data - let batches = create_batches_with_influxtype_different_cardinality().await; - - // build persisting batch from the input batches - let uuid = Uuid::new_v4(); - let table_name = "test_table"; - let shard_id = 1; - let seq_num_start: i64 = 1; - let table_id = 1; - let partition_id = 1; - let persisting_batch = make_persisting_batch( - shard_id, - seq_num_start, - table_id, - table_name, - partition_id, - uuid, - batches, + let batch = QueryAdaptor::new( + "test_table".into(), + PartitionId::new(1), + create_batches_with_influxtype_different_cardinality().await, ); // verify PK - let schema = persisting_batch.data.schema(); + let schema = batch.schema(); let pk = schema.primary_key(); let expected_pk = vec!["tag1", "tag3", "time"]; assert_eq!(expected_pk, pk); @@ -401,7 +351,7 @@ mod tests { } = compact_persisting_batch( &exc, Some(SortKey::from_columns(["tag3", "tag1", "time"])), - persisting_batch, + batch, ) .await .unwrap(); @@ -435,29 +385,16 @@ mod tests { } #[tokio::test] - async fn test_compact_persisting_batch_new_column_for_sort_key() { + async fn test_compact_batch_new_column_for_sort_key() { // create input data - let batches = create_batches_with_influxtype_different_cardinality().await; - - // build persisting batch from the input batches - let uuid = Uuid::new_v4(); - let table_name = "test_table"; - let shard_id = 1; - let seq_num_start: i64 = 1; - let table_id = 1; - let partition_id = 1; - let persisting_batch = make_persisting_batch( - shard_id, - seq_num_start, - table_id, - table_name, - partition_id, - uuid, - batches, + let batch = QueryAdaptor::new( + "test_table".into(), + PartitionId::new(1), + create_batches_with_influxtype_different_cardinality().await, ); // verify PK - let schema = persisting_batch.data.schema(); + let schema = batch.schema(); let pk = schema.primary_key(); let expected_pk = vec!["tag1", "tag3", "time"]; assert_eq!(expected_pk, pk); @@ -471,13 +408,9 @@ mod tests { stream, data_sort_key, catalog_sort_key_update, - } = compact_persisting_batch( - &exc, - Some(SortKey::from_columns(["tag3", "time"])), - persisting_batch, - ) - .await - .unwrap(); + } = compact_persisting_batch(&exc, Some(SortKey::from_columns(["tag3", "time"])), batch) + .await + .unwrap(); let output_batches = datafusion::physical_plan::common::collect(stream) .await @@ -511,29 +444,16 @@ mod tests { } #[tokio::test] - async fn test_compact_persisting_batch_missing_column_for_sort_key() { + async fn test_compact_batch_missing_column_for_sort_key() { // create input data - let batches = create_batches_with_influxtype_different_cardinality().await; - - // build persisting batch from the input batches - let uuid = Uuid::new_v4(); - let table_name = "test_table"; - let shard_id = 1; - let seq_num_start: i64 = 1; - let table_id = 1; - let partition_id = 1; - let persisting_batch = make_persisting_batch( - shard_id, - seq_num_start, - table_id, - table_name, - partition_id, - uuid, - batches, + let batch = QueryAdaptor::new( + "test_table".into(), + PartitionId::new(1), + create_batches_with_influxtype_different_cardinality().await, ); // verify PK - let schema = persisting_batch.data.schema(); + let schema = batch.schema(); let pk = schema.primary_key(); let expected_pk = vec!["tag1", "tag3", "time"]; assert_eq!(expected_pk, pk); @@ -550,7 +470,7 @@ mod tests { } = compact_persisting_batch( &exc, Some(SortKey::from_columns(["tag3", "tag1", "tag4", "time"])), - persisting_batch, + batch, ) .await .unwrap(); @@ -588,26 +508,25 @@ mod tests { test_helpers::maybe_start_logging(); // create input data - let batches = create_one_row_record_batch_with_influxtype().await; - - // build queryable batch from the input batches - let compact_batch = make_queryable_batch("test_table", 0, 1, batches); + let batch = QueryAdaptor::new( + "test_table".into(), + PartitionId::new(1), + create_one_row_record_batch_with_influxtype().await, + ); // verify PK - let schema = compact_batch.schema(); + let schema = batch.schema(); let pk = schema.primary_key(); let expected_pk = vec!["tag1", "time"]; assert_eq!(expected_pk, pk); - let sort_key = compute_sort_key( - &schema, - compact_batch.data.iter().map(|sb| sb.data.as_ref()), - ); + let sort_key = + compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref())); assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"])); // compact let exc = Executor::new(1); - let stream = compact(&exc, compact_batch, sort_key).await.unwrap(); + let stream = compact(&exc, Arc::new(batch), sort_key).await.unwrap(); let output_batches = datafusion::physical_plan::common::collect(stream) .await .unwrap(); @@ -629,26 +548,25 @@ mod tests { #[tokio::test] async fn test_compact_one_batch_with_duplicates() { // create input data - let batches = create_one_record_batch_with_influxtype_duplicates().await; - - // build queryable batch from the input batches - let compact_batch = make_queryable_batch("test_table", 0, 1, batches); + let batch = QueryAdaptor::new( + "test_table".into(), + PartitionId::new(1), + create_one_record_batch_with_influxtype_duplicates().await, + ); // verify PK - let schema = compact_batch.schema(); + let schema = batch.schema(); let pk = schema.primary_key(); let expected_pk = vec!["tag1", "time"]; assert_eq!(expected_pk, pk); - let sort_key = compute_sort_key( - &schema, - compact_batch.data.iter().map(|sb| sb.data.as_ref()), - ); + let sort_key = + compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref())); assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"])); // compact let exc = Executor::new(1); - let stream = compact(&exc, compact_batch, sort_key).await.unwrap(); + let stream = compact(&exc, Arc::new(batch), sort_key).await.unwrap(); let output_batches = datafusion::physical_plan::common::collect(stream) .await .unwrap(); @@ -678,26 +596,25 @@ mod tests { #[tokio::test] async fn test_compact_many_batches_same_columns_with_duplicates() { // create many-batches input data - let batches = create_batches_with_influxtype().await; - - // build queryable batch from the input batches - let compact_batch = make_queryable_batch("test_table", 0, 1, batches); + let batch = QueryAdaptor::new( + "test_table".into(), + PartitionId::new(1), + create_batches_with_influxtype().await, + ); // verify PK - let schema = compact_batch.schema(); + let schema = batch.schema(); let pk = schema.primary_key(); let expected_pk = vec!["tag1", "time"]; assert_eq!(expected_pk, pk); - let sort_key = compute_sort_key( - &schema, - compact_batch.data.iter().map(|sb| sb.data.as_ref()), - ); + let sort_key = + compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref())); assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"])); // compact let exc = Executor::new(1); - let stream = compact(&exc, compact_batch, sort_key).await.unwrap(); + let stream = compact(&exc, Arc::new(batch), sort_key).await.unwrap(); let output_batches = datafusion::physical_plan::common::collect(stream) .await .unwrap(); @@ -724,26 +641,25 @@ mod tests { #[tokio::test] async fn test_compact_many_batches_different_columns_with_duplicates() { // create many-batches input data - let batches = create_batches_with_influxtype_different_columns().await; - - // build queryable batch from the input batches - let compact_batch = make_queryable_batch("test_table", 0, 1, batches); + let batch = QueryAdaptor::new( + "test_table".into(), + PartitionId::new(1), + create_batches_with_influxtype_different_columns().await, + ); // verify PK - let schema = compact_batch.schema(); + let schema = batch.schema(); let pk = schema.primary_key(); let expected_pk = vec!["tag1", "tag2", "time"]; assert_eq!(expected_pk, pk); - let sort_key = compute_sort_key( - &schema, - compact_batch.data.iter().map(|sb| sb.data.as_ref()), - ); + let sort_key = + compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref())); assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"])); // compact let exc = Executor::new(1); - let stream = compact(&exc, compact_batch, sort_key).await.unwrap(); + let stream = compact(&exc, Arc::new(batch), sort_key).await.unwrap(); let output_batches = datafusion::physical_plan::common::collect(stream) .await .unwrap(); @@ -774,26 +690,25 @@ mod tests { #[tokio::test] async fn test_compact_many_batches_different_columns_different_order_with_duplicates() { // create many-batches input data - let batches = create_batches_with_influxtype_different_columns_different_order().await; - - // build queryable batch from the input batches - let compact_batch = make_queryable_batch("test_table", 0, 1, batches); + let batch = QueryAdaptor::new( + "test_table".into(), + PartitionId::new(1), + create_batches_with_influxtype_different_columns_different_order().await, + ); // verify PK - let schema = compact_batch.schema(); + let schema = batch.schema(); let pk = schema.primary_key(); let expected_pk = vec!["tag1", "tag2", "time"]; assert_eq!(expected_pk, pk); - let sort_key = compute_sort_key( - &schema, - compact_batch.data.iter().map(|sb| sb.data.as_ref()), - ); + let sort_key = + compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref())); assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"])); // compact let exc = Executor::new(1); - let stream = compact(&exc, compact_batch, sort_key).await.unwrap(); + let stream = compact(&exc, Arc::new(batch), sort_key).await.unwrap(); let output_batches = datafusion::physical_plan::common::collect(stream) .await .unwrap(); @@ -823,68 +738,17 @@ mod tests { assert_batches_eq!(&expected, &output_batches); } - // BUG - #[tokio::test] - async fn test_compact_many_batches_different_columns_different_order_with_duplicates2() { - // create many-batches input data - let batches = create_batches_with_influxtype_different_columns_different_order().await; - - // build queryable batch from the input batches - let compact_batch = make_queryable_batch("test_table", 0, 1, batches); - - // verify PK - let schema = compact_batch.schema(); - let pk = schema.primary_key(); - let expected_pk = vec!["tag1", "tag2", "time"]; - assert_eq!(expected_pk, pk); - - let sort_key = compute_sort_key( - &schema, - compact_batch.data.iter().map(|sb| sb.data.as_ref()), - ); - assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"])); - - // compact - let exc = Executor::new(1); - let stream = compact(&exc, compact_batch, sort_key).await.unwrap(); - let output_batches = datafusion::physical_plan::common::collect(stream) - .await - .unwrap(); - - // verify compacted data - // data is sorted and all duplicates are removed - let expected = vec![ - "+-----------+------+------+--------------------------------+", - "| field_int | tag1 | tag2 | time |", - "+-----------+------+------+--------------------------------+", - "| 5 | | AL | 1970-01-01T00:00:00.000005Z |", - "| 10 | | AL | 1970-01-01T00:00:00.000007Z |", - "| 70 | | CT | 1970-01-01T00:00:00.000000100Z |", - "| 1000 | | CT | 1970-01-01T00:00:00.000001Z |", - "| 100 | | MA | 1970-01-01T00:00:00.000000050Z |", - "| 10 | AL | MA | 1970-01-01T00:00:00.000000050Z |", - "| 70 | CT | CT | 1970-01-01T00:00:00.000000100Z |", - "| 70 | CT | CT | 1970-01-01T00:00:00.000000500Z |", - "| 30 | MT | AL | 1970-01-01T00:00:00.000000005Z |", - "| 20 | MT | AL | 1970-01-01T00:00:00.000007Z |", - "| 1000 | MT | CT | 1970-01-01T00:00:00.000001Z |", - "| 1000 | MT | CT | 1970-01-01T00:00:00.000002Z |", - "+-----------+------+------+--------------------------------+", - ]; - - assert_batches_eq!(&expected, &output_batches); - } - #[tokio::test] #[should_panic(expected = "Schemas compatible")] async fn test_compact_many_batches_same_columns_different_types() { // create many-batches input data - let batches = create_batches_with_influxtype_same_columns_different_type().await; + let batch = QueryAdaptor::new( + "test_table".into(), + PartitionId::new(1), + create_batches_with_influxtype_same_columns_different_type().await, + ); - // build queryable batch from the input batches - let compact_batch = make_queryable_batch("test_table", 0, 1, batches); - - // the schema merge will thorw a panic - compact_batch.schema(); + // the schema merge should throw a panic + batch.schema(); } } diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 0f0270d910..70131538b7 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -20,6 +20,7 @@ use parquet_file::{ storage::{ParquetStorage, StorageId}, }; use snafu::{OptionExt, Snafu}; +use uuid::Uuid; use write_summary::ShardProgress; use crate::{ @@ -29,9 +30,12 @@ use crate::{ pub(crate) mod namespace; pub mod partition; +mod sequence_range; pub(crate) mod shard; pub(crate) mod table; +pub(crate) use sequence_range::*; + use self::{partition::resolver::PartitionProvider, shard::ShardData}; #[cfg(test)] @@ -245,26 +249,32 @@ impl Persister for IngesterData { ) { // lookup the state from the ingester data. If something isn't found, // it's unexpected. Crash so someone can take a look. - let shard_data = self + let namespace = self .shards .get(&shard_id) - .unwrap_or_else(|| panic!("shard state for {shard_id} not in ingester data")); - let namespace = shard_data - .namespace_by_id(namespace_id) + .and_then(|s| s.namespace_by_id(namespace_id)) .unwrap_or_else(|| panic!("namespace {namespace_id} not in shard {shard_id} state")); let namespace_name = namespace.namespace_name(); + // Assert the namespace ID matches the index key. + assert_eq!(namespace.namespace_id(), namespace_id); let table_data = namespace.table_id(table_id).unwrap_or_else(|| { panic!("table {table_id} in namespace {namespace_id} not in shard {shard_id} state") }); - let partition_key; let table_name; - let batch; + let partition_key; let sort_key; let last_persisted_sequence_number; + let batch; + let batch_sequence_number_range; { let mut guard = table_data.write().await; + // Assert various properties of the table to ensure the index is + // correct, out of an abundance of caution. + assert_eq!(guard.shard_id(), shard_id); + assert_eq!(guard.namespace_id(), namespace_id); + assert_eq!(guard.table_id(), table_id); table_name = guard.table_name().clone(); let partition = guard.get_partition(partition_id).unwrap_or_else(|| { @@ -273,12 +283,34 @@ impl Persister for IngesterData { ) }); + // Assert various properties of the partition to ensure the index is + // correct, out of an abundance of caution. + assert_eq!(partition.partition_id(), partition_id); + assert_eq!(partition.shard_id(), shard_id); + assert_eq!(partition.namespace_id(), namespace_id); + assert_eq!(partition.table_id(), table_id); + assert_eq!(*partition.table_name(), table_name); + partition_key = partition.partition_key().clone(); - batch = partition.snapshot_to_persisting_batch(); sort_key = partition.sort_key().clone(); last_persisted_sequence_number = partition.max_persisted_sequence_number(); + + // The sequence number MUST be read without releasing the write lock + // to ensure a consistent snapshot of batch contents and batch + // sequence number range. + batch = partition.mark_persisting(); + batch_sequence_number_range = partition.sequence_number_range(); }; + // From this point on, the code MUST be infallible. + // + // The partition data was moved to the persisting slot, and any + // subsequent calls would be an error. + // + // This is NOT an invariant, and this could be changed in the future to + // allow partitions to marked as persisting repeatedly. Today however, + // the code is infallible (or rather, terminal - it does cause a retry). + let sort_key = sort_key.get().await; trace!( %shard_id, @@ -306,8 +338,13 @@ impl Persister for IngesterData { // Check if there is any data to persist. let batch = match batch { - Some(v) if !v.data.data.is_empty() => v, - _ => { + Some(v) => { + // The partition state machine will NOT return an empty batch. + assert!(!v.record_batches().is_empty()); + v + } + None => { + // But it MAY return no batch at all. warn!( %shard_id, %namespace_id, @@ -322,17 +359,6 @@ impl Persister for IngesterData { } }; - assert_eq!(batch.shard_id(), shard_id); - assert_eq!(batch.table_id(), table_id); - assert_eq!(batch.partition_id(), partition_id); - - // Read the maximum SequenceNumber in the batch. - let (_min, max_sequence_number) = batch.data.min_max_sequence_numbers(); - - // Read the future object store ID before passing the batch into - // compaction, instead of retaining a copy of the data post-compaction. - let object_store_id = batch.object_store_id(); - // do the CPU intensive work of compaction, de-duplication and sorting let CompactedStream { stream: record_stream, @@ -342,6 +368,10 @@ impl Persister for IngesterData { .await .expect("unable to compact persisting batch"); + // Generate a UUID to uniquely identify this parquet file in object + // storage. + let object_store_id = Uuid::new_v4(); + // Construct the metadata for this parquet file. let iox_metadata = IoxMetadata { object_store_id, @@ -353,7 +383,7 @@ impl Persister for IngesterData { table_name: Arc::clone(&*table_name), partition_id, partition_key: partition_key.clone(), - max_sequence_number, + max_sequence_number: batch_sequence_number_range.inclusive_max().unwrap(), compaction_level: CompactionLevel::Initial, sort_key: Some(data_sort_key), }; @@ -503,15 +533,28 @@ impl Persister for IngesterData { .recorder(attributes) .record(file_size as u64); - // and remove the persisted data from memory - namespace - .mark_persisted( - &table_name, - &partition_key, - iox_metadata.max_sequence_number, - ) - .await; - debug!( + // Mark the partition as having completed persistence, causing it to + // release the reference to the in-flight persistence data it is + // holding. + // + // This SHOULD cause the data to be dropped, but there MAY be ongoing + // queries that currently hold a reference to the data. In either case, + // the persisted data will be dropped "shortly". + table_data + .write() + .await + .get_partition(partition_id) + .unwrap() + .mark_persisted(iox_metadata.max_sequence_number); + + // BUG: ongoing queries retain references to the persisting data, + // preventing it from being dropped, but memory is released back to + // lifecycle memory tracker when this fn returns. + // + // https://github.com/influxdata/influxdb_iox/issues/5805 + // + + info!( %object_store_id, %shard_id, %namespace_id, @@ -521,7 +564,7 @@ impl Persister for IngesterData { %partition_id, %partition_key, max_sequence_number=%iox_metadata.max_sequence_number.get(), - "marked partition as persisted" + "persisted partition" ); } @@ -656,8 +699,21 @@ mod tests { .await .unwrap(); assert_matches!(action, DmlApplyAction::Applied(false)); + + let w2 = DmlWrite::new( + "foo", + lines_to_batches("mem foo=1 10", 0).unwrap(), + Some("1970-01-01".into()), + DmlMeta::sequenced( + Sequence::new(ShardIndex::new(1), SequenceNumber::new(2)), + ignored_ts, + None, + 50, + ), + ); + let action = data - .buffer_operation(shard1.id, DmlOperation::Write(w1), &manager.handle()) + .buffer_operation(shard1.id, DmlOperation::Write(w2), &manager.handle()) .await .unwrap(); assert_matches!(action, DmlApplyAction::Applied(true)); @@ -1016,11 +1072,15 @@ mod tests { assert_eq!(buckets_with_counts, &[500 * 1024]); let mem_table = n.table_data(&"mem".into()).unwrap(); - let mem_table = mem_table.read().await; // verify that the parquet_max_sequence_number got updated assert_eq!( - mem_table.parquet_max_sequence_number(), + mem_table + .write() + .await + .get_partition(partition_id) + .unwrap() + .max_persisted_sequence_number(), Some(SequenceNumber::new(2)) ); @@ -1310,13 +1370,17 @@ mod tests { .unwrap(); { let table_data = data.table_data(&"mem".into()).unwrap(); - let table = table_data.read().await; - let p = table.get_partition_by_key(&"1970-01-01".into()).unwrap(); + let mut table = table_data.write().await; + assert!(table + .partition_iter_mut() + .all(|p| p.get_query_data().is_none())); assert_eq!( - p.max_persisted_sequence_number(), + table + .get_partition_by_key(&"1970-01-01".into()) + .unwrap() + .max_persisted_sequence_number(), Some(SequenceNumber::new(1)) ); - assert!(p.data.buffer.is_none()); } assert_matches!(action, DmlApplyAction::Skipped); @@ -1329,8 +1393,8 @@ mod tests { let table = table_data.read().await; let partition = table.get_partition_by_key(&"1970-01-01".into()).unwrap(); assert_eq!( - partition.data.buffer.as_ref().unwrap().min_sequence_number, - SequenceNumber::new(2) + partition.sequence_number_range().inclusive_min(), + Some(SequenceNumber::new(2)) ); assert_matches!(data.table_count().observe(), Observation::U64Counter(v) => { diff --git a/ingester/src/data/namespace.rs b/ingester/src/data/namespace.rs index 500345dcf3..74d6449f9d 100644 --- a/ingester/src/data/namespace.rs +++ b/ingester/src/data/namespace.rs @@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::Arc}; -use data_types::{NamespaceId, PartitionKey, SequenceNumber, ShardId, TableId}; +use data_types::{NamespaceId, SequenceNumber, ShardId, TableId}; use dml::DmlOperation; use iox_catalog::interface::Catalog; use metric::U64Counter; @@ -253,52 +253,7 @@ impl NamespaceData { } } - /// Snapshots the mutable buffer for the partition, which clears it out and moves it over to - /// snapshots. Then return a vec of the snapshots and the optional persisting batch. - #[cfg(test)] // Only used in tests - pub(crate) async fn snapshot( - &self, - table_name: &TableName, - partition_key: &PartitionKey, - ) -> Option<( - Vec>, - Option>, - )> { - if let Some(t) = self.table_data(table_name) { - let mut t = t.write().await; - - return t.get_partition_by_key_mut(partition_key).map(|p| { - p.data - .generate_snapshot() - .expect("snapshot on mutable batch should never fail"); - (p.data.snapshots.to_vec(), p.data.persisting.clone()) - }); - } - - None - } - - /// Snapshots the mutable buffer for the partition, which clears it out and then moves all - /// snapshots over to a persisting batch, which is returned. If there is no data to snapshot - /// or persist, None will be returned. - #[cfg(test)] // Only used in tests - pub(crate) async fn snapshot_to_persisting( - &self, - table_name: &TableName, - partition_key: &PartitionKey, - ) -> Option> { - if let Some(table_data) = self.table_data(table_name) { - let mut table_data = table_data.write().await; - - return table_data - .get_partition_by_key_mut(partition_key) - .and_then(|partition_data| partition_data.snapshot_to_persisting_batch()); - } - - None - } - - /// Gets the buffered table data + /// Return the specified [`TableData`] if it exists. pub(crate) fn table_data( &self, table_name: &TableName, @@ -353,30 +308,11 @@ impl NamespaceData { }) } - /// Walks down the table and partition and clears the persisting batch. The sequence number is - /// the max_sequence_number for the persisted parquet file, which should be kept in the table - /// data buffer. - pub(super) async fn mark_persisted( - &self, - table_name: &TableName, - partition_key: &PartitionKey, - sequence_number: SequenceNumber, - ) { - if let Some(t) = self.table_data(table_name) { - let mut t = t.write().await; - let partition = t.get_partition_by_key_mut(partition_key); - - if let Some(p) = partition { - p.mark_persisted(sequence_number); - } - } - } - /// Return progress from this Namespace pub(super) async fn progress(&self) -> ShardProgress { let tables: Vec<_> = self.tables.read().by_id.values().map(Arc::clone).collect(); - // Consolidate progtress across partitions. + // Consolidate progress across partitions. let mut progress = ShardProgress::new() // Properly account for any sequence number that is // actively buffering and thus not yet completely @@ -442,7 +378,7 @@ impl<'a> Drop for ScopedSequenceNumber<'a> { mod tests { use std::sync::Arc; - use data_types::{PartitionId, ShardIndex}; + use data_types::{PartitionId, PartitionKey, ShardIndex}; use metric::{Attributes, Metric}; use crate::{ diff --git a/ingester/src/data/partition.rs b/ingester/src/data/partition.rs index 1333849165..d6b0ded343 100644 --- a/ingester/src/data/partition.rs +++ b/ingester/src/data/partition.rs @@ -2,125 +2,23 @@ use std::sync::Arc; -use arrow::record_batch::RecordBatch; use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, ShardId, TableId}; use mutable_batch::MutableBatch; use observability_deps::tracing::*; -use schema::{selection::Selection, sort::SortKey}; -use snafu::ResultExt; -use uuid::Uuid; +use schema::sort::SortKey; use write_summary::ShardProgress; use self::{ - buffer::{BufferBatch, DataBuffer}, + buffer::{traits::Queryable, BufferState, DataBuffer, Persisting}, resolver::DeferredSortKey, }; -use crate::{querier_handler::PartitionStatus, query::QueryableBatch}; +use crate::query_adaptor::QueryAdaptor; -use super::table::TableName; +use super::{sequence_range::SequenceNumberRange, table::TableName}; mod buffer; pub mod resolver; -/// Read only copy of the unpersisted data for a partition in the ingester for a specific partition. -#[derive(Debug)] -pub(crate) struct UnpersistedPartitionData { - pub(crate) partition_id: PartitionId, - pub(crate) non_persisted: Vec>, - pub(crate) persisting: Option, - pub(crate) partition_status: PartitionStatus, -} - -/// PersistingBatch contains all needed info and data for creating -/// a parquet file for given set of SnapshotBatches -#[derive(Debug, PartialEq, Clone)] -pub(crate) struct PersistingBatch { - /// Shard id of the data - pub(crate) shard_id: ShardId, - - /// Table id of the data - pub(crate) table_id: TableId, - - /// Partition Id of the data - pub(crate) partition_id: PartitionId, - - /// Id of to-be-created parquet file of this data - pub(crate) object_store_id: Uuid, - - /// data - pub(crate) data: Arc, -} - -impl PersistingBatch { - pub(crate) fn object_store_id(&self) -> Uuid { - self.object_store_id - } - - pub(crate) fn shard_id(&self) -> ShardId { - self.shard_id - } - - pub(crate) fn table_id(&self) -> TableId { - self.table_id - } - - pub(crate) fn partition_id(&self) -> PartitionId { - self.partition_id - } -} - -/// SnapshotBatch contains data of many contiguous BufferBatches -#[derive(Debug, PartialEq)] -pub(crate) struct SnapshotBatch { - /// Min sequence number of its combined BufferBatches - pub(crate) min_sequence_number: SequenceNumber, - /// Max sequence number of its combined BufferBatches - pub(crate) max_sequence_number: SequenceNumber, - /// Data of its combined BufferBatches kept in one RecordBatch - pub(crate) data: Arc, -} - -impl SnapshotBatch { - /// Return only data of the given columns - pub(crate) fn scan( - &self, - selection: Selection<'_>, - ) -> Result>, super::Error> { - Ok(match selection { - Selection::All => Some(Arc::clone(&self.data)), - Selection::Some(columns) => { - let schema = self.data.schema(); - - let indices = columns - .iter() - .filter_map(|&column_name| { - match schema.index_of(column_name) { - Ok(idx) => Some(idx), - _ => None, // this batch does not include data of this column_name - } - }) - .collect::>(); - if indices.is_empty() { - None - } else { - Some(Arc::new( - self.data - .project(&indices) - .context(super::FilterColumnSnafu {})?, - )) - } - } - }) - } - - /// Return progress in this data - fn progress(&self) -> ShardProgress { - ShardProgress::new() - .with_buffered(self.min_sequence_number) - .with_buffered(self.max_sequence_number) - } -} - /// The load state of the [`SortKey`] for a given partition. #[derive(Debug, Clone)] pub(crate) enum SortKeyState { @@ -146,7 +44,7 @@ impl SortKeyState { #[derive(Debug)] pub struct PartitionData { /// The catalog ID of the partition this buffer is for. - id: PartitionId, + partition_id: PartitionId, /// The string partition key for this partition. partition_key: PartitionKey, @@ -168,7 +66,11 @@ pub struct PartitionData { /// The name of the table this partition is part of. table_name: TableName, - pub(super) data: DataBuffer, + /// A buffer for incoming writes. + buffer: DataBuffer, + + /// The currently persisting [`DataBuffer`], if any. + persisting: Option>, /// The max_persisted_sequence number for any parquet_file in this /// partition. @@ -189,92 +91,249 @@ impl PartitionData { max_persisted_sequence_number: Option, ) -> Self { Self { - id, + partition_id: id, partition_key, sort_key, shard_id, namespace_id, table_id, table_name, - data: Default::default(), + buffer: DataBuffer::default(), + persisting: None, max_persisted_sequence_number, } } - /// Snapshot anything in the buffer and move all snapshot data into a persisting batch - pub(super) fn snapshot_to_persisting_batch(&mut self) -> Option> { - self.data - .snapshot_to_persisting(self.shard_id, self.table_id, self.id, &self.table_name) - } - - /// Snapshot whatever is in the buffer and return a new vec of the - /// arc cloned snapshots - #[cfg(test)] - fn snapshot(&mut self) -> Result>, super::Error> { - self.data - .generate_snapshot() - .context(super::SnapshotSnafu)?; - Ok(self.data.get_snapshots().to_vec()) - } - - /// Return non persisting data - pub(super) fn get_non_persisting_data(&self) -> Result>, super::Error> { - self.data.buffer_and_snapshots() - } - - /// Return persisting data - pub(super) fn get_persisting_data(&self) -> Option { - self.data.get_persisting_data() - } - - /// Write the given mb in the buffer + /// Buffer the given [`MutableBatch`] in memory, ordered by the specified + /// [`SequenceNumber`]. + /// + /// # Panics + /// + /// This method panics if `sequence_number` is not strictly greater than + /// previous calls or the persisted maximum. pub(super) fn buffer_write( &mut self, - sequence_number: SequenceNumber, mb: MutableBatch, + sequence_number: SequenceNumber, ) -> Result<(), super::Error> { - let (min_sequence_number, max_sequence_number) = match &mut self.data.buffer { - Some(buf) => { - buf.max_sequence_number = sequence_number.max(buf.max_sequence_number); - buf.data.extend_from(&mb).context(super::BufferWriteSnafu)?; - (buf.min_sequence_number, buf.max_sequence_number) - } - None => { - self.data.buffer = Some(BufferBatch { - min_sequence_number: sequence_number, - max_sequence_number: sequence_number, - data: mb, - }); - (sequence_number, sequence_number) - } - }; + // Ensure that this write is strictly after any persisted ops. + if let Some(min) = self.max_persisted_sequence_number { + assert!(sequence_number > min, "monotonicity violation"); + } + + // Buffer the write, which ensures monotonicity of writes within the + // buffer itself. + self.buffer + .buffer_write(mb, sequence_number) + .map_err(|e| super::Error::BufferWrite { source: e })?; + trace!( - min_sequence_number=?min_sequence_number, - max_sequence_number=?max_sequence_number, + shard_id = %self.shard_id, + namespace_id = %self.namespace_id, + table_id = %self.table_id, + table_name = %self.table_name, + partition_id = %self.partition_id, + partition_key = %self.partition_key, + min_sequence_number=?self.buffer.sequence_number_range().inclusive_min(), + max_sequence_number=?self.buffer.sequence_number_range().inclusive_max(), "buffered write" ); Ok(()) } + /// Return all data for this partition, ordered by the [`SequenceNumber`] + /// from which it was buffered with. + pub(crate) fn get_query_data(&mut self) -> Option { + // Extract the buffered data, if any. + let buffered_data = self.buffer.get_query_data(); + + // Prepend any currently persisting batches. + // + // The persisting RecordBatch instances MUST be ordered before the + // buffered data to preserve the ordering of writes such that updates to + // existing rows materialise to the correct output. + let data = self + .persisting + .iter() + .flat_map(|b| b.get_query_data()) + .chain(buffered_data) + .collect::>(); + + trace!( + shard_id = %self.shard_id, + namespace_id = %self.namespace_id, + table_id = %self.table_id, + table_name = %self.table_name, + partition_id = %self.partition_id, + partition_key = %self.partition_key, + min_sequence_number=?self.buffer.sequence_number_range().inclusive_min(), + max_sequence_number=?self.buffer.sequence_number_range().inclusive_max(), + max_persisted=?self.max_persisted_sequence_number(), + n_batches = data.len(), + "read partition data" + ); + + if data.is_empty() { + return None; + } + + // Construct the query adaptor over the partition data. + // + // `data` MUST contain at least one row, or the constructor panics. This + // is upheld by the FSM, which ensures only non-empty snapshots / + // RecordBatch are generated. Because `data` contains at least one + // RecordBatch, this invariant holds. + Some(QueryAdaptor::new( + self.table_name.clone(), + self.partition_id, + data, + )) + } + + /// Return the range of [`SequenceNumber`] currently queryable by calling + /// [`PartitionData::get_query_data()`]. + /// + /// This includes buffered data, snapshots, and currently persisting data. + pub(super) fn sequence_number_range(&self) -> SequenceNumberRange { + self.persisting + .as_ref() + .map(|v| v.sequence_number_range().clone()) + .unwrap_or_default() + .merge(self.buffer.sequence_number_range()) + } + /// Return the progress from this Partition pub(super) fn progress(&self) -> ShardProgress { - self.data.progress() + let mut p = ShardProgress::default(); + + let range = self.buffer.sequence_number_range(); + // Observe both the min & max, as the ShardProgress tracks both. + if let Some(v) = range.inclusive_min() { + p = p.with_buffered(v); + p = p.with_buffered(range.inclusive_max().unwrap()); + } + + // Observe the buffered state, if any. + if let Some(range) = self.persisting.as_ref().map(|p| p.sequence_number_range()) { + // Observe both the min & max, as the ShardProgress tracks both. + // + // All persisting batches MUST contain data. This is an invariant + // upheld by the state machine. + p = p.with_buffered(range.inclusive_min().unwrap()); + p = p.with_buffered(range.inclusive_max().unwrap()); + } + + // And finally report the persist watermark for this partition. + if let Some(v) = self.max_persisted_sequence_number() { + p = p.with_persisted(v) + } + + trace!( + shard_id = %self.shard_id, + namespace_id = %self.namespace_id, + table_id = %self.table_id, + table_name = %self.table_name, + partition_id = %self.partition_id, + partition_key = %self.partition_key, + progress = ?p, + "progress query" + ); + + p } - pub(super) fn partition_id(&self) -> PartitionId { - self.id - } + /// Snapshot and mark all buffered data as persisting. + /// + /// This method returns [`None`] if no data is buffered in [`Self`]. + /// + /// A reference to the persisting data is retained until a corresponding + /// call to [`Self::mark_persisted()`] is made to release it. + /// + /// # Panics + /// + /// This method panics if [`Self`] contains data already an ongoing persist + /// operation. All calls to [`Self::mark_persisting()`] must be followed by + /// a matching call to [`Self::mark_persisted()`] before a new persist can + /// begin. + pub(super) fn mark_persisting(&mut self) -> Option { + // Assert that there is at most one persist operation per partition + // ongoing at any one time. + // + // This is not a system invariant, however the system MUST make + // persisted partitions visible in monotonic order w.r.t their sequence + // numbers. + assert!( + self.persisting.is_none(), + "starting persistence on partition in persisting state" + ); - /// Return the [`SequenceNumber`] that forms the (inclusive) persistence - /// watermark for this partition. - pub(crate) fn max_persisted_sequence_number(&self) -> Option { - self.max_persisted_sequence_number + let persisting = std::mem::take(&mut self.buffer).into_persisting()?; + + // From this point on, all code MUST be infallible or the buffered data + // contained within persisting may be dropped. + + debug!( + shard_id = %self.shard_id, + namespace_id = %self.namespace_id, + table_id = %self.table_id, + table_name = %self.table_name, + partition_id = %self.partition_id, + partition_key = %self.partition_key, + current_max_persisted_sequence_number = ?self.max_persisted_sequence_number, + persisting_min_sequence_number = ?persisting.sequence_number_range().inclusive_min(), + persisting_max_sequence_number = ?persisting.sequence_number_range().inclusive_max(), + "marking partition as persisting" + ); + + let data = persisting.get_query_data(); + self.persisting = Some(persisting); + + Some(QueryAdaptor::new( + self.table_name.clone(), + self.partition_id, + data, + )) } /// Mark this partition as having completed persistence up to, and /// including, the specified [`SequenceNumber`]. + /// + /// All references to actively persisting are released. + /// + /// # Panics + /// + /// This method panics if [`Self`] is not marked as undergoing a persist + /// operation. All calls to [`Self::mark_persisted()`] must be preceded by a + /// matching call to [`Self::mark_persisting()`]. pub(super) fn mark_persisted(&mut self, sequence_number: SequenceNumber) { + // Assert there is a batch marked as persisting in self, that it has a + // non-empty sequence number range, and that the persisted upper bound + // matches the data in the batch being dropped. + // + // TODO: once this has been deployed without issue (the assert does not + // fire), passing the sequence number is redundant and can be removed. + let persisting_max = self + .persisting + .as_ref() + .expect("must be a persisting batch when marking complete") + .sequence_number_range() + .inclusive_max() + .expect("persisting batch must contain sequence numbers"); + assert_eq!( + persisting_max, sequence_number, + "marking {:?} as persisted but persisting batch max is {:?}", + sequence_number, persisting_max + ); + + // Additionally assert the persisting batch is ordered strictly before + // the data in the buffer, if any. + // + // This asserts writes are monotonically applied. + if let Some(buffer_min) = self.buffer.sequence_number_range().inclusive_min() { + assert!(persisting_max < buffer_min, "monotonicity violation"); + } + // It is an invariant that partitions are persisted in order so that // both the per-shard, and per-partition watermarks are correctly // advanced and accurate. @@ -288,35 +347,53 @@ impl PartitionData { } self.max_persisted_sequence_number = Some(sequence_number); - self.data.mark_persisted(); + self.persisting = None; + + debug!( + shard_id = %self.shard_id, + namespace_id = %self.namespace_id, + table_id = %self.table_id, + table_name = %self.table_name, + partition_id = %self.partition_id, + partition_key = %self.partition_key, + current_max_persisted_sequence_number = ?self.max_persisted_sequence_number, + "marking partition persistence complete" + ); + } + + pub(crate) fn partition_id(&self) -> PartitionId { + self.partition_id + } + + /// Return the [`SequenceNumber`] that forms the (inclusive) persistence + /// watermark for this partition. + pub(crate) fn max_persisted_sequence_number(&self) -> Option { + self.max_persisted_sequence_number } /// Return the name of the table this [`PartitionData`] is buffering writes /// for. - #[cfg(test)] - pub(crate) fn table_name(&self) -> &str { - self.table_name.as_ref() + pub(crate) fn table_name(&self) -> &TableName { + &self.table_name } /// Return the shard ID for this partition. - #[cfg(test)] pub(crate) fn shard_id(&self) -> ShardId { self.shard_id } /// Return the table ID for this partition. - #[cfg(test)] pub(crate) fn table_id(&self) -> TableId { self.table_id } /// Return the partition key for this partition. - pub fn partition_key(&self) -> &PartitionKey { + pub(crate) fn partition_key(&self) -> &PartitionKey { &self.partition_key } /// Return the [`NamespaceId`] this partition is a part of. - pub fn namespace_id(&self) -> NamespaceId { + pub(crate) fn namespace_id(&self) -> NamespaceId { self.namespace_id } @@ -338,190 +415,467 @@ impl PartitionData { #[cfg(test)] mod tests { - use std::time::Duration; + use std::{ops::Deref, time::Duration}; - use arrow_util::assert_batches_sorted_eq; + use arrow::compute::SortOptions; + use arrow_util::assert_batches_eq; use assert_matches::assert_matches; use backoff::BackoffConfig; use data_types::ShardIndex; + use datafusion::{ + physical_expr::PhysicalSortExpr, + physical_plan::{expressions::col, memory::MemoryExec, ExecutionPlan}, + }; + use datafusion_util::test_collect; use iox_catalog::interface::Catalog; + use iox_query::QueryChunk; + use lazy_static::lazy_static; use mutable_batch_lp::test_helpers::lp_to_mutable_batch; use crate::test_util::populate_catalog; use super::*; - #[test] - fn snapshot_buffer_different_but_compatible_schemas() { - let mut partition_data = PartitionData::new( - PartitionId::new(1), - "bananas".into(), - ShardId::new(1), - NamespaceId::new(42), - TableId::new(1), - "foo".into(), - SortKeyState::Provided(None), - None, - ); + const PARTITION_ID: PartitionId = PartitionId::new(1); - let seq_num1 = SequenceNumber::new(1); - // Missing tag `t1` - let (_, mut mutable_batch1) = - lp_to_mutable_batch(r#"foo iv=1i,uv=774u,fv=1.0,bv=true,sv="hi" 1"#); - partition_data - .buffer_write(seq_num1, mutable_batch1.clone()) - .unwrap(); - - let seq_num2 = SequenceNumber::new(2); - // Missing field `iv` - let (_, mutable_batch2) = - lp_to_mutable_batch(r#"foo,t1=aoeu uv=1u,fv=12.0,bv=false,sv="bye" 10000"#); - - partition_data - .buffer_write(seq_num2, mutable_batch2.clone()) - .unwrap(); - partition_data.data.generate_snapshot().unwrap(); - - assert!(partition_data.data.buffer.is_none()); - assert_eq!(partition_data.data.snapshots.len(), 1); - - let snapshot = &partition_data.data.snapshots[0]; - assert_eq!(snapshot.min_sequence_number, seq_num1); - assert_eq!(snapshot.max_sequence_number, seq_num2); - - mutable_batch1.extend_from(&mutable_batch2).unwrap(); - let combined_record_batch = mutable_batch1.to_arrow(Selection::All).unwrap(); - assert_eq!(&*snapshot.data, &combined_record_batch); + lazy_static! { + static ref PARTITION_KEY: PartitionKey = PartitionKey::from("platanos"); + static ref TABLE_NAME: TableName = TableName::from("bananas"); } - // Test deletes mixed with writes on a single parittion + // Write some data and read it back from the buffer. + // + // This ensures the sequence range, progress API, buffering, snapshot + // generation & query all work as intended. #[tokio::test] - async fn writes() { - // Make a partition with empty DataBuffer - let s_id = 1; - let t_id = 1; - let p_id = 1; + async fn test_write_read() { let mut p = PartitionData::new( - PartitionId::new(p_id), - "bananas".into(), - ShardId::new(s_id), - NamespaceId::new(42), - TableId::new(t_id), - "restaurant".into(), + PARTITION_ID, + PARTITION_KEY.clone(), + ShardId::new(2), + NamespaceId::new(3), + TableId::new(4), + TABLE_NAME.clone(), SortKeyState::Provided(None), None, ); - // ------------------------------------------ - // Fill `buffer` - // --- seq_num: 1 - let (_, mb) = lp_to_mutable_batch(r#"restaurant,city=Boston day="fri",temp=50 10"#); - p.buffer_write(SequenceNumber::new(1), mb).unwrap(); + // No writes should report no sequence offsets. + { + let range = p.sequence_number_range(); + assert_eq!(range.inclusive_min(), None); + assert_eq!(range.inclusive_max(), None); + } - // --- seq_num: 2 - let (_, mb) = lp_to_mutable_batch(r#"restaurant,city=Andover day="thu",temp=44 15"#); + // The progress API should indicate there is no progress status. + assert!(p.progress().is_empty()); - p.buffer_write(SequenceNumber::new(2), mb).unwrap(); + // And no data should be returned when queried. + assert!(p.get_query_data().is_none()); - // verify data - assert_eq!( - p.data.buffer.as_ref().unwrap().min_sequence_number, - SequenceNumber::new(1) - ); - assert_eq!( - p.data.buffer.as_ref().unwrap().max_sequence_number, - SequenceNumber::new(2) - ); - assert_eq!(p.data.snapshots.len(), 0); - assert_eq!(p.data.persisting, None); + // Perform a single write. + let mb = lp_to_mutable_batch(r#"bananas,city=London people=2,pigeons="millions" 10"#).1; + p.buffer_write(mb, SequenceNumber::new(1)) + .expect("write should succeed"); - // ------------------------------------------ - // Fill `buffer` - // --- seq_num: 4 - let (_, mb) = lp_to_mutable_batch( - r#" - restaurant,city=Medford day="sun",temp=55 22 - restaurant,city=Boston day="sun",temp=57 24 - "#, - ); - p.buffer_write(SequenceNumber::new(4), mb).unwrap(); + // The sequence range should now cover the single write. + { + let range = p.sequence_number_range(); + assert_eq!(range.inclusive_min(), Some(SequenceNumber::new(1))); + assert_eq!(range.inclusive_max(), Some(SequenceNumber::new(1))); + } - // --- seq_num: 5 - let (_, mb) = lp_to_mutable_batch(r#"restaurant,city=Andover day="tue",temp=56 30"#); + // The progress API should indicate there is some data buffered, but not + // persisted. + { + let progress = p.progress(); + assert!(progress.readable(SequenceNumber::new(1))); + assert!(!progress.persisted(SequenceNumber::new(1))); + } - p.buffer_write(SequenceNumber::new(5), mb).unwrap(); + // The data should be readable. + { + let data = p.get_query_data().expect("should return data"); + assert_eq!(data.partition_id(), PARTITION_ID); + assert_eq!(data.table_name(), TABLE_NAME.to_string()); - // verify data - assert_eq!( - p.data.buffer.as_ref().unwrap().min_sequence_number, - SequenceNumber::new(1) - ); - assert_eq!( - p.data.buffer.as_ref().unwrap().max_sequence_number, - SequenceNumber::new(5) - ); - assert_eq!(p.data.snapshots.len(), 0); - assert_eq!(p.data.persisting, None); - assert!(p.data.buffer.is_some()); + let expected = [ + "+--------+--------+----------+--------------------------------+", + "| city | people | pigeons | time |", + "+--------+--------+----------+--------------------------------+", + "| London | 2 | millions | 1970-01-01T00:00:00.000000010Z |", + "+--------+--------+----------+--------------------------------+", + ]; + assert_batches_eq!( + expected, + &*data + .record_batches() + .iter() + .map(Deref::deref) + .cloned() + .collect::>() + ); + } - // ------------------------------------------ - // Persisting - let p_batch = p.snapshot_to_persisting_batch().unwrap(); + // Perform a another write, adding data to the existing queryable data + // snapshot. + let mb = lp_to_mutable_batch(r#"bananas,city=Madrid people=4,pigeons="none" 20"#).1; + p.buffer_write(mb, SequenceNumber::new(2)) + .expect("write should succeed"); - // verify data - assert!(p.data.buffer.is_none()); // always empty after issuing persit - assert_eq!(p.data.snapshots.len(), 0); // always empty after issuing persit - assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch))); + // The sequence range should now cover both writes. + { + let range = p.sequence_number_range(); + assert_eq!(range.inclusive_min(), Some(SequenceNumber::new(1))); + assert_eq!(range.inclusive_max(), Some(SequenceNumber::new(2))); + } - // verify data - assert!(p.data.buffer.is_none()); - assert_eq!(p.data.snapshots.len(), 0); // no snpashots becasue buffer has not data yet and the - // snapshot was empty too - assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch))); + // The progress API should indicate there is more data buffered, but not + // persisted. + { + let progress = p.progress(); + assert!(progress.readable(SequenceNumber::new(1))); + assert!(progress.readable(SequenceNumber::new(2))); + assert!(!progress.persisted(SequenceNumber::new(1))); + assert!(!progress.persisted(SequenceNumber::new(2))); + } - // ------------------------------------------ - // Fill `buffer` - // --- seq_num: 8 - let (_, mb) = lp_to_mutable_batch( - r#" - restaurant,city=Wilmington day="sun",temp=55 35 - restaurant,city=Boston day="sun",temp=60 36 - restaurant,city=Boston day="sun",temp=62 38 - "#, - ); - p.buffer_write(SequenceNumber::new(8), mb).unwrap(); + // And finally both writes should be readable. + { + let data = p.get_query_data().expect("should contain data"); + assert_eq!(data.partition_id(), PARTITION_ID); + assert_eq!(data.table_name(), TABLE_NAME.to_string()); - // verify data - assert_eq!( - p.data.buffer.as_ref().unwrap().min_sequence_number, - SequenceNumber::new(8) - ); // 1 newly added mutable batch of 3 rows of data - assert_eq!(p.data.snapshots.len(), 0); // still empty - assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch))); - - // ------------------------------------------ - // Take snapshot of the `buffer` - p.snapshot().unwrap(); - // verify data - assert!(p.data.buffer.is_none()); // empty after snapshot - assert_eq!(p.data.snapshots.len(), 1); // data moved from buffer - assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch))); - // snapshot has three rows moved from buffer - let data = (*p.data.snapshots[0].data).clone(); - let expected = vec![ - "+------------+-----+------+--------------------------------+", - "| city | day | temp | time |", - "+------------+-----+------+--------------------------------+", - "| Wilmington | sun | 55 | 1970-01-01T00:00:00.000000035Z |", - "| Boston | sun | 60 | 1970-01-01T00:00:00.000000036Z |", - "| Boston | sun | 62 | 1970-01-01T00:00:00.000000038Z |", - "+------------+-----+------+--------------------------------+", - ]; - assert_batches_sorted_eq!(&expected, &[data]); - assert_eq!(p.data.snapshots[0].min_sequence_number.get(), 8); - assert_eq!(p.data.snapshots[0].max_sequence_number.get(), 8); + let expected = [ + "+--------+--------+----------+--------------------------------+", + "| city | people | pigeons | time |", + "+--------+--------+----------+--------------------------------+", + "| London | 2 | millions | 1970-01-01T00:00:00.000000010Z |", + "| Madrid | 4 | none | 1970-01-01T00:00:00.000000020Z |", + "+--------+--------+----------+--------------------------------+", + ]; + assert_batches_eq!( + expected, + &*data + .record_batches() + .iter() + .map(Deref::deref) + .cloned() + .collect::>() + ); + } } + // Test persist operations against the partition, ensuring data is readable + // both before, during, and after a persist takes place. + #[tokio::test] + async fn test_persist() { + let mut p = PartitionData::new( + PARTITION_ID, + PARTITION_KEY.clone(), + ShardId::new(2), + NamespaceId::new(3), + TableId::new(4), + TABLE_NAME.clone(), + SortKeyState::Provided(None), + None, + ); + + assert!(p.max_persisted_sequence_number().is_none()); + assert!(p.get_query_data().is_none()); + + // Perform a single write. + let mb = lp_to_mutable_batch(r#"bananas,city=London people=2,pigeons="millions" 10"#).1; + p.buffer_write(mb, SequenceNumber::new(1)) + .expect("write should succeed"); + + // Begin persisting the partition. + let persisting_data = p.mark_persisting().expect("must contain existing data"); + // And validate the data being persisted. + assert_eq!(persisting_data.partition_id(), PARTITION_ID); + assert_eq!(persisting_data.table_name(), TABLE_NAME.to_string()); + assert_eq!(persisting_data.record_batches().len(), 1); + let expected = [ + "+--------+--------+----------+--------------------------------+", + "| city | people | pigeons | time |", + "+--------+--------+----------+--------------------------------+", + "| London | 2 | millions | 1970-01-01T00:00:00.000000010Z |", + "+--------+--------+----------+--------------------------------+", + ]; + assert_batches_eq!( + expected, + &*persisting_data + .record_batches() + .iter() + .map(Deref::deref) + .cloned() + .collect::>() + ); + + // The sequence range should now cover the single persisting write. + { + let range = p.sequence_number_range(); + assert_eq!(range.inclusive_min(), Some(SequenceNumber::new(1))); + assert_eq!(range.inclusive_max(), Some(SequenceNumber::new(1))); + } + + // The progress API should indicate there is some data buffered, but not + // yet persisted. + { + let progress = p.progress(); + assert!(progress.readable(SequenceNumber::new(1))); + assert!(!progress.persisted(SequenceNumber::new(1))); + } + + // And the max_persisted_sequence_number should not have changed. + assert!(p.max_persisted_sequence_number().is_none()); + + // Buffer another write during an ongoing persist. + let mb = lp_to_mutable_batch(r#"bananas,city=Madrid people=4,pigeons="none" 20"#).1; + p.buffer_write(mb, SequenceNumber::new(2)) + .expect("write should succeed"); + + // Which must be readable, alongside the ongoing persist data. + { + let data = p.get_query_data().expect("must have data"); + assert_eq!(data.partition_id(), PARTITION_ID); + assert_eq!(data.table_name(), TABLE_NAME.to_string()); + assert_eq!(data.record_batches().len(), 2); + let expected = [ + "+--------+--------+----------+--------------------------------+", + "| city | people | pigeons | time |", + "+--------+--------+----------+--------------------------------+", + "| London | 2 | millions | 1970-01-01T00:00:00.000000010Z |", + "| Madrid | 4 | none | 1970-01-01T00:00:00.000000020Z |", + "+--------+--------+----------+--------------------------------+", + ]; + assert_batches_eq!( + expected, + &*data + .record_batches() + .iter() + .map(Deref::deref) + .cloned() + .collect::>() + ); + } + + // The sequence range should still cover both writes. + { + let range = p.sequence_number_range(); + assert_eq!(range.inclusive_min(), Some(SequenceNumber::new(1))); + assert_eq!(range.inclusive_max(), Some(SequenceNumber::new(2))); + } + + // The progress API should indicate that both writes are still data + // buffered. + { + let progress = p.progress(); + assert!(progress.readable(SequenceNumber::new(1))); + assert!(progress.readable(SequenceNumber::new(2))); + assert!(!progress.persisted(SequenceNumber::new(1))); + assert!(!progress.persisted(SequenceNumber::new(2))); + } + + // And the max_persisted_sequence_number should not have changed. + assert!(p.max_persisted_sequence_number().is_none()); + + // The persist now "completes". + p.mark_persisted(SequenceNumber::new(1)); + + // The sequence range should now cover only the second remaining + // buffered write. + { + let range = p.sequence_number_range(); + assert_eq!(range.inclusive_min(), Some(SequenceNumber::new(2))); + assert_eq!(range.inclusive_max(), Some(SequenceNumber::new(2))); + } + + // The progress API should indicate that the writes are readable + // (somewhere, not necessarily in the ingester), and the first write is + // persisted. + { + let progress = p.progress(); + assert!(progress.readable(SequenceNumber::new(1))); + assert!(progress.readable(SequenceNumber::new(2))); + assert!(progress.persisted(SequenceNumber::new(1))); + assert!(!progress.persisted(SequenceNumber::new(2))); + } + + // And the max_persisted_sequence_number should reflect the completed + // persist op. + assert_eq!( + p.max_persisted_sequence_number(), + Some(SequenceNumber::new(1)) + ); + + // Querying the buffer should now return only the second write. + { + let data = p.get_query_data().expect("must have data"); + assert_eq!(data.partition_id(), PARTITION_ID); + assert_eq!(data.table_name(), TABLE_NAME.to_string()); + assert_eq!(data.record_batches().len(), 1); + let expected = [ + "+--------+--------+---------+--------------------------------+", + "| city | people | pigeons | time |", + "+--------+--------+---------+--------------------------------+", + "| Madrid | 4 | none | 1970-01-01T00:00:00.000000020Z |", + "+--------+--------+---------+--------------------------------+", + ]; + assert_batches_eq!( + expected, + &*data + .record_batches() + .iter() + .map(Deref::deref) + .cloned() + .collect::>() + ); + } + } + + // Ensure the ordering of snapshots & persisting data is preserved such that + // updates resolve correctly. + #[tokio::test] + async fn test_record_batch_ordering() { + // A helper function to dedupe the record batches in [`QueryAdaptor`] + // and assert the resulting batch contents. + async fn assert_deduped(expect: &[&str], batch: QueryAdaptor) { + let batch = batch + .record_batches() + .iter() + .map(Deref::deref) + .cloned() + .collect::>(); + + let sort_keys = vec![PhysicalSortExpr { + expr: col("time", &batch[0].schema()).unwrap(), + options: SortOptions { + descending: false, + nulls_first: false, + }, + }]; + + // Setup in memory stream + let schema = batch[0].schema(); + let projection = None; + let input = Arc::new(MemoryExec::try_new(&[batch], schema, projection).unwrap()); + + // Create and run the deduplicator + let exec = Arc::new(iox_query::provider::DeduplicateExec::new(input, sort_keys)); + let got = test_collect(Arc::clone(&exec) as Arc).await; + + assert_batches_eq!(expect, &*got); + } + + let mut p = PartitionData::new( + PARTITION_ID, + PARTITION_KEY.clone(), + ShardId::new(2), + NamespaceId::new(3), + TableId::new(4), + TABLE_NAME.clone(), + SortKeyState::Provided(None), + None, + ); + + // Perform the initial write. + // + // In the next series of writes this test will overwrite the value of x + // and assert the deduped resulting state. + let mb = lp_to_mutable_batch(r#"bananas x=1 42"#).1; + p.buffer_write(mb, SequenceNumber::new(1)) + .expect("write should succeed"); + + assert_eq!(p.get_query_data().unwrap().record_batches().len(), 1); + assert_deduped( + &[ + "+--------------------------------+---+", + "| time | x |", + "+--------------------------------+---+", + "| 1970-01-01T00:00:00.000000042Z | 1 |", + "+--------------------------------+---+", + ], + p.get_query_data().unwrap(), + ) + .await; + + // Write an update + let mb = lp_to_mutable_batch(r#"bananas x=2 42"#).1; + p.buffer_write(mb, SequenceNumber::new(2)) + .expect("write should succeed"); + + assert_eq!(p.get_query_data().unwrap().record_batches().len(), 1); + assert_deduped( + &[ + "+--------------------------------+---+", + "| time | x |", + "+--------------------------------+---+", + "| 1970-01-01T00:00:00.000000042Z | 2 |", + "+--------------------------------+---+", + ], + p.get_query_data().unwrap(), + ) + .await; + + // Begin persisting the data, moving the buffer to the persisting state. + { + let batches = p.mark_persisting().unwrap(); + assert_eq!(batches.record_batches().len(), 1); + assert_deduped( + &[ + "+--------------------------------+---+", + "| time | x |", + "+--------------------------------+---+", + "| 1970-01-01T00:00:00.000000042Z | 2 |", + "+--------------------------------+---+", + ], + batches, + ) + .await; + } + + // Buffer another write, and generate a snapshot by querying it. + let mb = lp_to_mutable_batch(r#"bananas x=3 42"#).1; + p.buffer_write(mb, SequenceNumber::new(3)) + .expect("write should succeed"); + + assert_eq!(p.get_query_data().unwrap().record_batches().len(), 2); + assert_deduped( + &[ + "+--------------------------------+---+", + "| time | x |", + "+--------------------------------+---+", + "| 1970-01-01T00:00:00.000000042Z | 3 |", + "+--------------------------------+---+", + ], + p.get_query_data().unwrap(), + ) + .await; + + // Finish persisting. + p.mark_persisted(SequenceNumber::new(2)); + assert_eq!( + p.max_persisted_sequence_number(), + Some(SequenceNumber::new(2)) + ); + + // And assert the correct value remains. + assert_eq!(p.get_query_data().unwrap().record_batches().len(), 1); + assert_deduped( + &[ + "+--------------------------------+---+", + "| time | x |", + "+--------------------------------+---+", + "| 1970-01-01T00:00:00.000000042Z | 3 |", + "+--------------------------------+---+", + ], + p.get_query_data().unwrap(), + ) + .await; + } + + // Ensure an updated sort key is returned. #[tokio::test] async fn test_update_provided_sort_key() { let starting_state = @@ -545,6 +899,7 @@ mod tests { assert_eq!(p.sort_key().get().await, want); } + // Test loading a deferred sort key from the catalog on demand. #[tokio::test] async fn test_update_deferred_sort_key() { let metrics = Arc::new(metric::Registry::default()); @@ -600,4 +955,243 @@ mod tests { assert_matches!(p.sort_key(), SortKeyState::Provided(_)); assert_eq!(p.sort_key().get().await, want); } + + // Perform writes with non-monotonic sequence numbers. + #[tokio::test] + #[should_panic(expected = "monotonicity violation")] + async fn test_non_monotonic_writes() { + let mut p = PartitionData::new( + PARTITION_ID, + PARTITION_KEY.clone(), + ShardId::new(2), + NamespaceId::new(3), + TableId::new(4), + TABLE_NAME.clone(), + SortKeyState::Provided(None), + None, + ); + + // Perform out of order writes. + let mb = lp_to_mutable_batch(r#"bananas,city=London people=2,pigeons="millions" 10"#).1; + p.buffer_write(mb.clone(), SequenceNumber::new(2)) + .expect("write should succeed"); + let _ = p.buffer_write(mb, SequenceNumber::new(1)); + } + + #[tokio::test] + #[should_panic(expected = "must be a persisting batch when marking complete")] + async fn test_mark_persisted_not_persisting() { + let mut p = PartitionData::new( + PARTITION_ID, + PARTITION_KEY.clone(), + ShardId::new(2), + NamespaceId::new(3), + TableId::new(4), + TABLE_NAME.clone(), + SortKeyState::Provided(None), + None, + ); + + p.mark_persisted(SequenceNumber::new(1)); + } + + #[tokio::test] + async fn test_mark_persisting_no_data() { + let mut p = PartitionData::new( + PARTITION_ID, + PARTITION_KEY.clone(), + ShardId::new(2), + NamespaceId::new(3), + TableId::new(4), + TABLE_NAME.clone(), + SortKeyState::Provided(None), + None, + ); + + assert!(p.mark_persisting().is_none()); + } + + #[tokio::test] + #[should_panic(expected = "starting persistence on partition in persisting state")] + async fn test_mark_persisting_twice() { + let mut p = PartitionData::new( + PARTITION_ID, + PARTITION_KEY.clone(), + ShardId::new(2), + NamespaceId::new(3), + TableId::new(4), + TABLE_NAME.clone(), + SortKeyState::Provided(None), + None, + ); + + let mb = lp_to_mutable_batch(r#"bananas,city=London people=2,pigeons="millions" 10"#).1; + p.buffer_write(mb, SequenceNumber::new(2)) + .expect("write should succeed"); + + assert!(p.mark_persisting().is_some()); + + p.mark_persisting(); + } + + #[tokio::test] + #[should_panic( + expected = "marking SequenceNumber(42) as persisted but persisting batch max is SequenceNumber(2)" + )] + async fn test_mark_persisted_wrong_sequence_number() { + let mut p = PartitionData::new( + PARTITION_ID, + PARTITION_KEY.clone(), + ShardId::new(2), + NamespaceId::new(3), + TableId::new(4), + TABLE_NAME.clone(), + SortKeyState::Provided(None), + None, + ); + + let mb = lp_to_mutable_batch(r#"bananas,city=London people=2,pigeons="millions" 10"#).1; + p.buffer_write(mb, SequenceNumber::new(2)) + .expect("write should succeed"); + + assert!(p.mark_persisting().is_some()); + + p.mark_persisted(SequenceNumber::new(42)); + } + + // Because persisting moves the data out of the "hot" buffer, the sequence + // numbers are not validated as being monotonic (the new buffer has no + // sequence numbers to compare against). + // + // Instead this check is performed when marking the persist op as complete. + #[tokio::test] + #[should_panic(expected = "monotonicity violation")] + async fn test_non_monotonic_writes_with_persistence() { + let mut p = PartitionData::new( + PARTITION_ID, + PARTITION_KEY.clone(), + ShardId::new(2), + NamespaceId::new(3), + TableId::new(4), + TABLE_NAME.clone(), + SortKeyState::Provided(None), + None, + ); + + let mb = lp_to_mutable_batch(r#"bananas,city=London people=2,pigeons="millions" 10"#).1; + p.buffer_write(mb.clone(), SequenceNumber::new(42)) + .expect("write should succeed"); + + assert!(p.mark_persisting().is_some()); + + // This succeeds due to a new buffer being in place that cannot track + // previous sequence numbers. + p.buffer_write(mb, SequenceNumber::new(1)) + .expect("out of order write should succeed"); + + // The assert on non-monotonic writes moves to here instead. + p.mark_persisted(SequenceNumber::new(42)); + } + + // As above, the sequence numbers are not tracked between buffer instances. + // + // This ensures that a write after a batch is persisted is still required to + // be monotonic. + #[tokio::test] + #[should_panic(expected = "monotonicity violation")] + async fn test_non_monotonic_writes_after_persistence() { + let mut p = PartitionData::new( + PARTITION_ID, + PARTITION_KEY.clone(), + ShardId::new(2), + NamespaceId::new(3), + TableId::new(4), + TABLE_NAME.clone(), + SortKeyState::Provided(None), + None, + ); + + let mb = lp_to_mutable_batch(r#"bananas,city=London people=2,pigeons="millions" 10"#).1; + p.buffer_write(mb.clone(), SequenceNumber::new(42)) + .expect("write should succeed"); + + assert!(p.mark_persisting().is_some()); + p.mark_persisted(SequenceNumber::new(42)); + + // This should fail as the write "goes backwards". + p.buffer_write(mb, SequenceNumber::new(1)) + .expect("out of order write should succeed"); + } + + // As above, but with a pre-configured persist marker. + #[tokio::test] + #[should_panic(expected = "monotonicity violation")] + async fn test_non_monotonic_writes_persist_marker() { + let mut p = PartitionData::new( + PARTITION_ID, + PARTITION_KEY.clone(), + ShardId::new(2), + NamespaceId::new(3), + TableId::new(4), + TABLE_NAME.clone(), + SortKeyState::Provided(None), + Some(SequenceNumber::new(42)), + ); + assert_eq!( + p.max_persisted_sequence_number(), + Some(SequenceNumber::new(42)) + ); + + let mb = lp_to_mutable_batch(r#"bananas,city=London people=2,pigeons="millions" 10"#).1; + + // This should fail as the write "goes backwards". + p.buffer_write(mb, SequenceNumber::new(1)) + .expect("out of order write should succeed"); + } + + // Restoring a persist marker is included in progress reports. + #[tokio::test] + async fn test_persist_marker_progress() { + let p = PartitionData::new( + PARTITION_ID, + PARTITION_KEY.clone(), + ShardId::new(2), + NamespaceId::new(3), + TableId::new(4), + TABLE_NAME.clone(), + SortKeyState::Provided(None), + Some(SequenceNumber::new(42)), + ); + assert_eq!( + p.max_persisted_sequence_number(), + Some(SequenceNumber::new(42)) + ); + + // Sequence number ranges cover buffered data only. + assert!(p.sequence_number_range().inclusive_min().is_none()); + assert!(p.sequence_number_range().inclusive_max().is_none()); + + // Progress API returns that the op is persisted and readable (not on + // the ingester, but via object storage) + assert!(p.progress().readable(SequenceNumber::new(42))); + assert!(p.progress().persisted(SequenceNumber::new(42))); + } + + // Ensure an empty PartitionData does not panic due to constructing an empty + // QueryAdaptor. + #[test] + fn test_empty_partition_no_queryadaptor_panic() { + let mut p = PartitionData::new( + PARTITION_ID, + PARTITION_KEY.clone(), + ShardId::new(2), + NamespaceId::new(3), + TableId::new(4), + TABLE_NAME.clone(), + SortKeyState::Provided(None), + Some(SequenceNumber::new(42)), + ); + + assert!(p.get_query_data().is_none()); + } } diff --git a/ingester/src/data/partition/buffer.rs b/ingester/src/data/partition/buffer.rs index 866e7a966c..00e6f376ba 100644 --- a/ingester/src/data/partition/buffer.rs +++ b/ingester/src/data/partition/buffer.rs @@ -1,274 +1,112 @@ -//! Data for the lifecycle of the Ingester - use std::sync::Arc; -use data_types::{PartitionId, SequenceNumber, ShardId, TableId}; +use arrow::record_batch::RecordBatch; +use data_types::SequenceNumber; use mutable_batch::MutableBatch; -use schema::selection::Selection; -use snafu::ResultExt; -use uuid::Uuid; -use write_summary::ShardProgress; -use crate::data::table::TableName; +use crate::data::SequenceNumberRange; -use super::{PersistingBatch, QueryableBatch, SnapshotBatch}; +mod always_some; +mod mutable_buffer; +mod state_machine; +pub(crate) mod traits; -/// Data of an IOx partition split into batches -/// ┌────────────────────────┐ ┌────────────────────────┐ ┌─────────────────────────┐ -/// │ Buffer │ │ Snapshots │ │ Persisting │ -/// │ ┌───────────────────┐ │ │ │ │ │ -/// │ │ ┌───────────────┐│ │ │ ┌───────────────────┐ │ │ ┌───────────────────┐ │ -/// │ │ ┌┴──────────────┐│├─┼────────┼─┼─▶┌───────────────┐│ │ │ │ ┌───────────────┐│ │ -/// │ │┌┴──────────────┐├┘│ │ │ │ ┌┴──────────────┐││ │ │ │ ┌┴──────────────┐││ │ -/// │ ││ BufferBatch ├┘ │ │ │ │┌┴──────────────┐├┘│──┼──────┼─▶│┌┴──────────────┐├┘│ │ -/// │ │└───────────────┘ │ │ ┌───┼─▶│ SnapshotBatch ├┘ │ │ │ ││ SnapshotBatch ├┘ │ │ -/// │ └───────────────────┘ │ │ │ │└───────────────┘ │ │ │ │└───────────────┘ │ │ -/// │ ... │ │ │ └───────────────────┘ │ │ └───────────────────┘ │ -/// │ ┌───────────────────┐ │ │ │ │ │ │ -/// │ │ ┌───────────────┐│ │ │ │ ... │ │ ... │ -/// │ │ ┌┴──────────────┐││ │ │ │ │ │ │ -/// │ │┌┴──────────────┐├┘│─┼────┘ │ ┌───────────────────┐ │ │ ┌───────────────────┐ │ -/// │ ││ BufferBatch ├┘ │ │ │ │ ┌───────────────┐│ │ │ │ ┌───────────────┐│ │ -/// │ │└───────────────┘ │ │ │ │ ┌┴──────────────┐││ │ │ │ ┌┴──────────────┐││ │ -/// │ └───────────────────┘ │ │ │┌┴──────────────┐├┘│──┼──────┼─▶│┌┴──────────────┐├┘│ │ -/// │ │ │ ││ SnapshotBatch ├┘ │ │ │ ││ SnapshotBatch ├┘ │ │ -/// │ ... │ │ │└───────────────┘ │ │ │ │└───────────────┘ │ │ -/// │ │ │ └───────────────────┘ │ │ └───────────────────┘ │ -/// └────────────────────────┘ └────────────────────────┘ └─────────────────────────┘ -#[derive(Debug, Default)] -pub(crate) struct DataBuffer { - /// Buffer of incoming writes - pub(crate) buffer: Option, +pub(crate) use state_machine::*; - /// Data in `buffer` will be moved to a `snapshot` when one of these happens: - /// . A background persist is called - /// . A read request from Querier - /// The `buffer` will be empty when this happens. - pub(crate) snapshots: Vec>, - /// When a persist is called, data in `buffer` will be moved to a `snapshot` - /// and then all `snapshots` will be moved to a `persisting`. - /// Both `buffer` and 'snaphots` will be empty when this happens. - pub(crate) persisting: Option>, - // Extra Notes: - // . In MVP, we will only persist a set of snapshots at a time. - // In later version, multiple persisting operations may be happening concurrently but - // their persisted info must be added into the Catalog in their data - // ingesting order. - // . When a read request comes from a Querier, all data from `snapshots` - // and `persisting` must be sent to the Querier. - // . After the `persisting` data is persisted and successfully added - // into the Catalog, it will be removed from this Data Buffer. - // This data might be added into an extra cache to serve up to - // Queriers that may not have loaded the parquet files from object - // storage yet. But this will be decided after MVP. +use self::{always_some::AlwaysSome, traits::Queryable}; + +/// The current state of the [`BufferState`] state machine. +/// +/// NOTE that this does NOT contain the [`Persisting`] state, as this is a +/// immutable, terminal state that does not accept further writes and is +/// directly queryable. +#[derive(Debug)] +#[must_use = "FSM should not be dropped unused"] +enum FsmState { + /// The data buffer contains no data snapshots, and is accepting writes. + Buffering(BufferState), } +impl Default for FsmState { + fn default() -> Self { + Self::Buffering(BufferState::new()) + } +} + +impl FsmState { + /// Return the current range of writes in the [`BufferState`] state machine, + /// if any. + pub(crate) fn sequence_number_range(&self) -> &SequenceNumberRange { + match self { + Self::Buffering(v) => v.sequence_number_range(), + } + } +} + +/// A helper wrapper over the [`BufferState`] FSM to abstract the caller from +/// state transitions during reads and writes from the underlying buffer. +#[derive(Debug, Default)] +#[must_use = "DataBuffer should not be dropped unused"] +pub(crate) struct DataBuffer(AlwaysSome); + impl DataBuffer { - /// If a [`BufferBatch`] exists, convert it to a [`SnapshotBatch`] and add - /// it to the list of snapshots. + /// Return the range of [`SequenceNumber`] currently queryable by calling + /// [`Self::get_query_data()`]. + pub(crate) fn sequence_number_range(&self) -> &SequenceNumberRange { + self.0.sequence_number_range() + } + + /// Buffer the given [`MutableBatch`] in memory, ordered by the specified + /// [`SequenceNumber`]. /// - /// Does nothing if there is no [`BufferBatch`]. - pub(crate) fn generate_snapshot(&mut self) -> Result<(), mutable_batch::Error> { - let snapshot = self.copy_buffer_to_snapshot()?; - if let Some(snapshot) = snapshot { - self.snapshots.push(snapshot); - self.buffer = None; - } - - Ok(()) - } - - /// Returns snapshot of the buffer but keeps data in the buffer - fn copy_buffer_to_snapshot(&self) -> Result>, mutable_batch::Error> { - if let Some(buf) = &self.buffer { - return Ok(Some(Arc::new(SnapshotBatch { - min_sequence_number: buf.min_sequence_number, - max_sequence_number: buf.max_sequence_number, - data: Arc::new(buf.data.to_arrow(Selection::All)?), - }))); - } - - Ok(None) - } - - /// Snapshots the buffer and make a QueryableBatch for all the snapshots - /// Both buffer and snapshots will be empty after this - pub(super) fn snapshot_to_queryable_batch( + /// # Panics + /// + /// This method panics if `sequence_number` is not strictly greater than + /// previous calls. + pub(crate) fn buffer_write( &mut self, - table_name: &TableName, - partition_id: PartitionId, - ) -> Option { - self.generate_snapshot() - .expect("This mutable batch snapshot error should be impossible."); - - let mut data = vec![]; - std::mem::swap(&mut data, &mut self.snapshots); - - // only produce batch if there is any data - if data.is_empty() { - None - } else { - Some(QueryableBatch::new(table_name.clone(), partition_id, data)) - } + mb: MutableBatch, + sequence_number: SequenceNumber, + ) -> Result<(), mutable_batch::Error> { + // Take ownership of the FSM and apply the write. + self.0.mutate(|fsm| match fsm { + // Mutable stats simply have the write applied. + FsmState::Buffering(mut b) => { + let ret = b.write(mb, sequence_number); + (FsmState::Buffering(b), ret) + } + }) } - /// Returns all existing snapshots plus data in the buffer - /// This only read data. Data in the buffer will be kept in the buffer - pub(super) fn buffer_and_snapshots( - &self, - ) -> Result>, crate::data::Error> { - // Existing snapshots - let mut snapshots = self.snapshots.clone(); - - // copy the buffer to a snapshot - let buffer_snapshot = self - .copy_buffer_to_snapshot() - .context(crate::data::BufferToSnapshotSnafu)?; - snapshots.extend(buffer_snapshot); - - Ok(snapshots) + /// Return all data for this buffer, ordered by the [`SequenceNumber`] from + /// which it was buffered with. + pub(crate) fn get_query_data(&mut self) -> Vec> { + // Take ownership of the FSM and return the data within it. + self.0.mutate(|fsm| match fsm { + // The buffering state can return data. + FsmState::Buffering(b) => { + let ret = b.get_query_data(); + (FsmState::Buffering(b), ret) + } + }) } - /// Snapshots the buffer and moves snapshots over to the `PersistingBatch`. - /// - /// # Panic - /// - /// Panics if there is already a persisting batch. - pub(super) fn snapshot_to_persisting( - &mut self, - shard_id: ShardId, - table_id: TableId, - partition_id: PartitionId, - table_name: &TableName, - ) -> Option> { - if self.persisting.is_some() { - panic!("Unable to snapshot while persisting. This is an unexpected state.") - } - - if let Some(queryable_batch) = self.snapshot_to_queryable_batch(table_name, partition_id) { - let persisting_batch = Arc::new(PersistingBatch { - shard_id, - table_id, - partition_id, - object_store_id: Uuid::new_v4(), - data: Arc::new(queryable_batch), - }); - - self.persisting = Some(Arc::clone(&persisting_batch)); - - Some(persisting_batch) - } else { - None - } - } - - /// Return a QueryableBatch of the persisting batch after applying new tombstones - pub(super) fn get_persisting_data(&self) -> Option { - let persisting = match &self.persisting { - Some(p) => p, - None => return None, + // Deconstruct the [`DataBuffer`] into the underlying FSM in a + // [`Persisting`] state, if the buffer contains any data. + pub(crate) fn into_persisting(self) -> Option> { + let p = match self.0.into_inner() { + FsmState::Buffering(b) => { + // Attempt to snapshot the buffer to an immutable state. + match b.snapshot() { + Transition::Ok(b) => b.into_persisting(), + Transition::Unchanged(_) => { + // The buffer contains no data. + return None; + } + } + } }; - // persisting data - Some((*persisting.data).clone()) - } - - /// Return the progress in this DataBuffer - pub(super) fn progress(&self) -> ShardProgress { - let progress = ShardProgress::new(); - - let progress = if let Some(buffer) = &self.buffer { - progress.combine(buffer.progress()) - } else { - progress - }; - - let progress = self.snapshots.iter().fold(progress, |progress, snapshot| { - progress.combine(snapshot.progress()) - }); - - if let Some(persisting) = &self.persisting { - persisting - .data - .data - .iter() - .fold(progress, |progress, snapshot| { - progress.combine(snapshot.progress()) - }) - } else { - progress - } - } - - #[cfg(test)] - pub(super) fn get_snapshots(&self) -> &[Arc] { - self.snapshots.as_ref() - } - - pub(crate) fn mark_persisted(&mut self) { - self.persisting = None; - } -} - -/// BufferBatch is a MutableBatch with its ingesting order, sequence_number, that helps the -/// ingester keep the batches of data in their ingesting order -#[derive(Debug)] -pub(crate) struct BufferBatch { - /// Sequence number of the first write in this batch - pub(crate) min_sequence_number: SequenceNumber, - /// Sequence number of the last write in this batch - pub(super) max_sequence_number: SequenceNumber, - /// Ingesting data - pub(super) data: MutableBatch, -} - -impl BufferBatch { - /// Return the progress in this DataBuffer - fn progress(&self) -> ShardProgress { - ShardProgress::new() - .with_buffered(self.min_sequence_number) - .with_buffered(self.max_sequence_number) - } -} - -#[cfg(test)] -mod tests { - use mutable_batch_lp::test_helpers::lp_to_mutable_batch; - - use super::*; - - #[test] - fn snapshot_empty_buffer_adds_no_snapshots() { - let mut data_buffer = DataBuffer::default(); - - data_buffer.generate_snapshot().unwrap(); - - assert!(data_buffer.snapshots.is_empty()); - } - - #[test] - fn snapshot_buffer_batch_moves_to_snapshots() { - let mut data_buffer = DataBuffer::default(); - - let seq_num1 = SequenceNumber::new(1); - let (_, mutable_batch1) = - lp_to_mutable_batch(r#"foo,t1=asdf iv=1i,uv=774u,fv=1.0,bv=true,sv="hi" 1"#); - let buffer_batch1 = BufferBatch { - min_sequence_number: seq_num1, - max_sequence_number: seq_num1, - data: mutable_batch1, - }; - let record_batch1 = buffer_batch1.data.to_arrow(Selection::All).unwrap(); - data_buffer.buffer = Some(buffer_batch1); - - data_buffer.generate_snapshot().unwrap(); - - assert!(data_buffer.buffer.is_none()); - assert_eq!(data_buffer.snapshots.len(), 1); - - let snapshot = &data_buffer.snapshots[0]; - assert_eq!(snapshot.min_sequence_number, seq_num1); - assert_eq!(snapshot.max_sequence_number, seq_num1); - assert_eq!(&*snapshot.data, &record_batch1); + Some(p) } } diff --git a/ingester/src/data/partition/buffer/always_some.rs b/ingester/src/data/partition/buffer/always_some.rs index 2ae59380e0..ce85e4accd 100644 --- a/ingester/src/data/partition/buffer/always_some.rs +++ b/ingester/src/data/partition/buffer/always_some.rs @@ -1,31 +1,8 @@ //! A helper type that ensures an `Option` is always `Some` once the guard is //! dropped. -/// A guard through which a value can be placed back into the [`AlwaysSome`]. -#[derive(Debug)] -#[must_use = "Guard must be used to restore the value"] -pub(super) struct Guard<'a, T>(&'a mut Option); - -impl<'a, T> Guard<'a, T> { - /// Store `value` in the [`AlwaysSome`] for subsequent - /// [`AlwaysSome::take()`] calls. - pub(super) fn store(self, value: T) { - assert!(self.0.is_none()); - *self.0 = Some(value); - } -} - -/// A helper type that aims to ease working with an [`Option`] that must always -/// be restored in a given scope. -/// -/// Accessing the value within an [`AlwaysSome`] returns a [`Guard`], which MUST -/// be used to store the value before going out of scope. Failure to store a -/// value cause a subsequent [`Self::take()`] call to panic. -/// -/// Failing to store a value in the [`Guard`] causes a compiler warning, however -/// this does not prevent failing to return a value to the [`AlwaysSome`] as the -/// warning can be falsely silenced by using it within one conditional code path -/// and not the other. +/// A helper type that aims to ease calling methods on a type that takes `self`, +/// that must always be restored at the end of the method call. #[derive(Debug)] pub(super) struct AlwaysSome(Option); @@ -52,14 +29,14 @@ impl AlwaysSome { Self(Some(value)) } - /// Read the value. - pub(super) fn take(&mut self) -> (Guard<'_, T>, T) { + pub(super) fn mutate(&mut self, f: F) -> R + where + F: FnOnce(T) -> (T, R), + { let value = std::mem::take(&mut self.0); - - ( - Guard(&mut self.0), - value.expect("AlwaysSome value is None!"), - ) + let (value, ret) = f(value.expect("AlwaysSome value is None!")); + self.0 = Some(value); + ret } /// Deconstruct `self`, returning the inner value. @@ -76,24 +53,18 @@ mod tests { fn test_always_some() { let mut a = AlwaysSome::::default(); - let (guard, value) = a.take(); - assert_eq!(value, 0); - guard.store(42); + let ret = a.mutate(|value| { + assert_eq!(value, 0); + (42, true) + }); + assert!(ret); - let (guard, value) = a.take(); - assert_eq!(value, 42); - guard.store(24); + let ret = a.mutate(|value| { + assert_eq!(value, 42); + (13, "bananas") + }); + assert_eq!(ret, "bananas"); - assert_eq!(a.into_inner(), 24); - } - - #[test] - #[should_panic = "AlwaysSome value is None!"] - fn test_drops_guard() { - let mut a = AlwaysSome::::default(); - { - let _ = a.take(); - } - let _ = a.take(); + assert_eq!(a.into_inner(), 13); } } diff --git a/ingester/src/data/partition/buffer/state_machine.rs b/ingester/src/data/partition/buffer/state_machine.rs index 278d0384f9..f15e714a04 100644 --- a/ingester/src/data/partition/buffer/state_machine.rs +++ b/ingester/src/data/partition/buffer/state_machine.rs @@ -31,7 +31,7 @@ pub(crate) enum Transition { impl Transition { /// A helper function to construct [`Self::Ok`] variants. - pub(super) fn ok(v: A, sequence_range: SequenceNumberRange) -> Transition { + pub(super) fn ok(v: A, sequence_range: SequenceNumberRange) -> Self { Self::Ok(BufferState { state: v, sequence_range, @@ -39,7 +39,7 @@ impl Transition { } /// A helper function to construct [`Self::Unchanged`] variants. - pub(super) fn unchanged(v: BufferState) -> Transition { + pub(super) fn unchanged(v: BufferState) -> Self { Self::Unchanged(v) } } @@ -164,7 +164,7 @@ mod tests { // Keep the data to validate they are ref-counted copies after further // writes below. Note this construct allows the caller to decide when/if // to allocate. - let w1_data = buffer.get_query_data().to_owned(); + let w1_data = buffer.get_query_data(); let expected = vec![ "+-------+----------+----------+--------------------------------+", @@ -193,7 +193,7 @@ mod tests { }; // Verify the writes are still queryable. - let w2_data = buffer.get_query_data().to_owned(); + let w2_data = buffer.get_query_data(); let expected = vec![ "+-------+----------+----------+--------------------------------+", "| great | how_much | tag | time |", @@ -214,7 +214,7 @@ mod tests { let same_arcs = w2_data .iter() .zip(second_read.iter()) - .all(|(a, b)| Arc::ptr_eq(a, &b)); + .all(|(a, b)| Arc::ptr_eq(a, b)); assert!(same_arcs); } diff --git a/ingester/src/data/partition/resolver/catalog.rs b/ingester/src/data/partition/resolver/catalog.rs index a7189e632b..ef34b6e681 100644 --- a/ingester/src/data/partition/resolver/catalog.rs +++ b/ingester/src/data/partition/resolver/catalog.rs @@ -157,7 +157,7 @@ mod tests { .repositories() .await .partitions() - .get_by_id(got.id) + .get_by_id(got.partition_id) .await .unwrap() .expect("partition not created"); diff --git a/ingester/src/data/sequence_range.rs b/ingester/src/data/sequence_range.rs index d8e2bb2033..42c4a15baa 100644 --- a/ingester/src/data/sequence_range.rs +++ b/ingester/src/data/sequence_range.rs @@ -39,7 +39,7 @@ impl SequenceNumberRange { let merged_range = self .range .into_iter() - .chain(other.range.clone()) + .chain(other.range) .reduce(|a, b| (a.0.min(b.0), a.1.max(b.1))); Self { diff --git a/ingester/src/data/table.rs b/ingester/src/data/table.rs index 3e0fd0d6c4..e314dc3821 100644 --- a/ingester/src/data/table.rs +++ b/ingester/src/data/table.rs @@ -7,8 +7,11 @@ use mutable_batch::MutableBatch; use observability_deps::tracing::*; use write_summary::ShardProgress; -use super::partition::{resolver::PartitionProvider, PartitionData, UnpersistedPartitionData}; -use crate::{data::DmlApplyAction, lifecycle::LifecycleHandle, querier_handler::PartitionStatus}; +use super::{ + partition::{resolver::PartitionProvider, PartitionData}, + DmlApplyAction, +}; +use crate::lifecycle::LifecycleHandle; /// A double-referenced map where [`PartitionData`] can be looked up by /// [`PartitionKey`], or ID. @@ -72,6 +75,12 @@ impl std::ops::Deref for TableName { } } +impl PartialEq for TableName { + fn eq(&self, other: &str) -> bool { + &*self.0 == other + } +} + /// Data of a Table in a given Namesapce that belongs to a given Shard #[derive(Debug)] pub(crate) struct TableData { @@ -119,16 +128,6 @@ impl TableData { } } - /// Return parquet_max_sequence_number - pub(super) fn parquet_max_sequence_number(&self) -> Option { - self.partition_data - .by_key - .values() - .map(|p| p.max_persisted_sequence_number()) - .max() - .flatten() - } - // buffers the table write and returns true if the lifecycle manager indicates that // ingest should be paused. pub(super) async fn buffer_table_write( @@ -171,7 +170,7 @@ impl TableData { let size = batch.size(); let rows = batch.rows(); - partition_data.buffer_write(sequence_number, batch)?; + partition_data.buffer_write(batch, sequence_number)?; // Record the write as having been buffered. // @@ -191,6 +190,18 @@ impl TableData { Ok(DmlApplyAction::Applied(should_pause)) } + /// Return a mutable reference to all partitions buffered for this table. + /// + /// # Ordering + /// + /// The order of [`PartitionData`] in the iterator is arbitrary and should + /// not be relied upon. + pub(crate) fn partition_iter_mut( + &mut self, + ) -> impl Iterator + ExactSizeIterator { + self.partition_data.by_key.values_mut() + } + /// Return the [`PartitionData`] for the specified ID. #[allow(unused)] pub(crate) fn get_partition( @@ -209,43 +220,12 @@ impl TableData { self.partition_data.by_key(partition_key) } - /// Return the [`PartitionData`] for the specified partition key. - pub(crate) fn get_partition_by_key_mut( - &mut self, - partition_key: &PartitionKey, - ) -> Option<&mut PartitionData> { - self.partition_data.by_key_mut(partition_key) - } - - pub(crate) fn unpersisted_partition_data(&self) -> Vec { - self.partition_data - .by_key - .values() - .map(|p| UnpersistedPartitionData { - partition_id: p.partition_id(), - non_persisted: p - .get_non_persisting_data() - .expect("get_non_persisting should always work"), - persisting: p.get_persisting_data(), - partition_status: PartitionStatus { - parquet_max_sequence_number: p.max_persisted_sequence_number(), - }, - }) - .collect() - } - /// Return progress from this Table pub(super) fn progress(&self) -> ShardProgress { - let progress = ShardProgress::new(); - let progress = match self.parquet_max_sequence_number() { - Some(n) => progress.with_persisted(n), - None => progress, - }; - self.partition_data .by_key .values() - .fold(progress, |progress, partition_data| { + .fold(Default::default(), |progress, partition_data| { progress.combine(partition_data.progress()) }) } @@ -259,6 +239,16 @@ impl TableData { pub(crate) fn table_name(&self) -> &TableName { &self.table_name } + + /// Return the shard ID for this table. + pub(crate) fn shard_id(&self) -> ShardId { + self.shard_id + } + + /// Return the [`NamespaceId`] this table is a part of. + pub fn namespace_id(&self) -> NamespaceId { + self.namespace_id + } } #[cfg(test)] diff --git a/ingester/src/lib.rs b/ingester/src/lib.rs index f2ebc67cc0..eee2692e26 100644 --- a/ingester/src/lib.rs +++ b/ingester/src/lib.rs @@ -24,7 +24,7 @@ mod job; pub mod lifecycle; mod poison; pub mod querier_handler; -pub(crate) mod query; +pub(crate) mod query_adaptor; pub mod server; pub(crate) mod stream_handler; diff --git a/ingester/src/querier_handler.rs b/ingester/src/querier_handler.rs index 7eaa269289..4851571177 100644 --- a/ingester/src/querier_handler.rs +++ b/ingester/src/querier_handler.rs @@ -1,12 +1,7 @@ //! Handle all requests from Querier -use crate::{ - data::{ - namespace::NamespaceName, partition::UnpersistedPartitionData, table::TableName, - IngesterData, - }, - query::QueryableBatch, -}; +use std::{pin::Pin, sync::Arc}; + use arrow::{array::new_null_array, error::ArrowError, record_batch::RecordBatch}; use arrow_util::optimize::{optimize_record_batch, optimize_schema}; use data_types::{PartitionId, SequenceNumber}; @@ -17,9 +12,10 @@ use generated_types::ingester::IngesterQueryRequest; use observability_deps::tracing::debug; use schema::{merge::SchemaMerger, selection::Selection}; use snafu::{ensure, Snafu}; -use std::{pin::Pin, sync::Arc}; use trace::span::{Span, SpanRecorder}; +use crate::data::{namespace::NamespaceName, table::TableName, IngesterData}; + /// Number of table data read locks that shall be acquired in parallel const CONCURRENT_TABLE_DATA_LOCKS: usize = 10; @@ -264,10 +260,11 @@ pub async fn prepare_data_to_querier( ) -> Result { debug!(?request, "prepare_data_to_querier"); - let span_recorder = SpanRecorder::new(span); + let mut span_recorder = SpanRecorder::new(span); - let mut tables_data = vec![]; + let mut table_refs = vec![]; let mut found_namespace = false; + for (shard_id, shard_data) in ingest_data.shards() { debug!(shard_id=%shard_id.get()); let namespace_name = NamespaceName::from(&request.namespace); @@ -293,7 +290,7 @@ pub async fn prepare_data_to_querier( } }; - tables_data.push(table_data); + table_refs.push(table_data); } ensure!( @@ -303,113 +300,83 @@ pub async fn prepare_data_to_querier( }, ); - // acquire locks in parallel - let unpersisted_partitions: Vec<_> = futures::stream::iter(tables_data) - .map(|table_data| async move { - let table_data = table_data.read().await; - table_data.unpersisted_partition_data() - }) - // Note: the order doesn't matter - .buffer_unordered(CONCURRENT_TABLE_DATA_LOCKS) - .concat() - .await; - ensure!( - !unpersisted_partitions.is_empty(), + !table_refs.is_empty(), TableNotFoundSnafu { namespace_name: &request.namespace, table_name: &request.table }, ); - let request = Arc::clone(request); - let partitions = - futures::stream::iter(unpersisted_partitions.into_iter().map(move |partition| { - // extract payload - let partition_id = partition.partition_id; - let status = partition.partition_status.clone(); - let snapshots: Vec<_> = prepare_data_to_querier_for_partition( - partition, - &request, - span_recorder.child_span("ingester prepare data to querier for partition"), - ) - .into_iter() - .map(Ok) - .collect(); - - // Note: include partition in `unpersisted_partitions` even when there we might filter - // out all the data, because the metadata (e.g. max persisted parquet file) is - // important for the querier. - Ok(IngesterQueryPartition::new( - Box::pin(futures::stream::iter(snapshots)), - partition_id, - status, - )) - })); - - Ok(IngesterQueryResponse::new(Box::pin(partitions))) -} - -fn prepare_data_to_querier_for_partition( - unpersisted_partition_data: UnpersistedPartitionData, - request: &IngesterQueryRequest, - span: Option, -) -> Vec { - let mut span_recorder = SpanRecorder::new(span); - - // ------------------------------------------------ - // Accumulate data - - // Make Filters - let selection_columns: Vec<_> = request.columns.iter().map(String::as_str).collect(); - let selection = if selection_columns.is_empty() { - Selection::All - } else { - Selection::Some(&selection_columns) - }; - - // figure out what batches - let queryable_batch = unpersisted_partition_data - .persisting - .unwrap_or_else(|| { - QueryableBatch::new( - request.table.clone().into(), - unpersisted_partition_data.partition_id, - vec![], - ) + // acquire locks and read table data in parallel + let unpersisted_partitions: Vec<_> = futures::stream::iter(table_refs) + .map(|table_data| async move { + let mut table_data = table_data.write().await; + table_data + .partition_iter_mut() + .map(|p| { + ( + p.partition_id(), + p.get_query_data(), + p.max_persisted_sequence_number(), + ) + }) + .collect::>() }) - .with_data(unpersisted_partition_data.non_persisted); + // Note: the order doesn't matter + .buffer_unordered(CONCURRENT_TABLE_DATA_LOCKS) + .concat() + .await; - let streams = queryable_batch - .data - .iter() - .map(|snapshot_batch| { - let batch = snapshot_batch.data.as_ref(); - let schema = batch.schema(); + let request = Arc::clone(request); + let partitions = futures::stream::iter(unpersisted_partitions.into_iter().map( + move |(partition_id, data, max_persisted_sequence_number)| { + let snapshots = match data { + None => Box::pin(futures::stream::empty()) as SnapshotStream, - // Apply selection to in-memory batch - let batch = match selection { - Selection::All => batch.clone(), - Selection::Some(columns) => { - let projection = columns + Some(batch) => { + assert_eq!(partition_id, batch.partition_id()); + + // Project the data if necessary + let columns = request + .columns .iter() - .flat_map(|&column_name| { - // ignore non-existing columns - schema.index_of(column_name).ok() - }) + .map(String::as_str) .collect::>(); - batch.project(&projection).expect("bug in projection") + let selection = if columns.is_empty() { + Selection::All + } else { + Selection::Some(columns.as_ref()) + }; + + let snapshots = batch.project_selection(selection).into_iter().map(|batch| { + // Create a stream from the batch. + Ok(Box::pin(MemoryStream::new(vec![batch])) as SendableRecordBatchStream) + }); + + Box::pin(futures::stream::iter(snapshots)) as SnapshotStream } }; - // create stream - Box::pin(MemoryStream::new(vec![batch])) as SendableRecordBatchStream - }) - .collect(); + // NOTE: the partition persist watermark MUST always be provided to + // the querier for any partition that has performed (or is aware of) + // a persist operation. + // + // This allows the querier to use the per-partition persist marker + // when planning queries. + Ok(IngesterQueryPartition::new( + snapshots, + partition_id, + PartitionStatus { + parquet_max_sequence_number: max_persisted_sequence_number, + }, + )) + }, + )); span_recorder.ok("done"); - streams + Ok(IngesterQueryResponse::new(Box::pin(partitions))) } #[cfg(test)] @@ -427,7 +394,7 @@ mod tests { use predicate::Predicate; use super::*; - use crate::test_util::{make_ingester_data, DataLocation, TEST_NAMESPACE, TEST_TABLE}; + use crate::test_util::{make_ingester_data, TEST_NAMESPACE, TEST_TABLE}; #[tokio::test] async fn test_ingester_query_response_flatten() { @@ -517,23 +484,11 @@ mod tests { async fn test_prepare_data_to_querier() { test_helpers::maybe_start_logging(); - let span = None; - // make 14 scenarios for ingester data let mut scenarios = vec![]; for two_partitions in [false, true] { - for loc in [ - DataLocation::BUFFER, - DataLocation::BUFFER_SNAPSHOT, - DataLocation::BUFFER_PERSISTING, - DataLocation::BUFFER_SNAPSHOT_PERSISTING, - DataLocation::SNAPSHOT, - DataLocation::SNAPSHOT_PERSISTING, - DataLocation::PERSISTING, - ] { - let scenario = Arc::new(make_ingester_data(two_partitions, loc).await); - scenarios.push((loc, scenario)); - } + let scenario = Arc::new(make_ingester_data(two_partitions).await); + scenarios.push(scenario); } // read data from all scenarios without any filters @@ -557,9 +512,8 @@ mod tests { "| Wilmington | mon | | 1970-01-01T00:00:00.000000035Z |", // in group 3 - seq_num: 6 "+------------+-----+------+--------------------------------+", ]; - for (loc, scenario) in &scenarios { - println!("Location: {loc:?}"); - let result = prepare_data_to_querier(scenario, &request, span.clone()) + for scenario in &scenarios { + let result = prepare_data_to_querier(scenario, &request, None) .await .unwrap() .into_record_batches() @@ -593,9 +547,8 @@ mod tests { "| Wilmington | | 1970-01-01T00:00:00.000000035Z |", "+------------+------+--------------------------------+", ]; - for (loc, scenario) in &scenarios { - println!("Location: {loc:?}"); - let result = prepare_data_to_querier(scenario, &request, span.clone()) + for scenario in &scenarios { + let result = prepare_data_to_querier(scenario, &request, None) .await .unwrap() .into_record_batches() @@ -638,9 +591,8 @@ mod tests { "| Wilmington | | 1970-01-01T00:00:00.000000035Z |", "+------------+------+--------------------------------+", ]; - for (loc, scenario) in &scenarios { - println!("Location: {loc:?}"); - let result = prepare_data_to_querier(scenario, &request, span.clone()) + for scenario in &scenarios { + let result = prepare_data_to_querier(scenario, &request, None) .await .unwrap() .into_record_batches() @@ -655,9 +607,8 @@ mod tests { vec![], None, )); - for (loc, scenario) in &scenarios { - println!("Location: {loc:?}"); - let err = prepare_data_to_querier(scenario, &request, span.clone()) + for scenario in &scenarios { + let err = prepare_data_to_querier(scenario, &request, None) .await .unwrap_err(); assert_matches!(err, Error::TableNotFound { .. }); @@ -670,9 +621,8 @@ mod tests { vec![], None, )); - for (loc, scenario) in &scenarios { - println!("Location: {loc:?}"); - let err = prepare_data_to_querier(scenario, &request, span.clone()) + for scenario in &scenarios { + let err = prepare_data_to_querier(scenario, &request, None) .await .unwrap_err(); assert_matches!(err, Error::NamespaceNotFound { .. }); diff --git a/ingester/src/query.rs b/ingester/src/query_adaptor.rs similarity index 57% rename from ingester/src/query.rs rename to ingester/src/query_adaptor.rs index dc38001e4f..67902ab83f 100644 --- a/ingester/src/query.rs +++ b/ingester/src/query_adaptor.rs @@ -1,12 +1,12 @@ -//! Module to handle query on Ingester's data +//! An adaptor over a set of [`RecordBatch`] allowing them to be used as an IOx +//! [`QueryChunk`]. use std::{any::Any, sync::Arc}; use arrow::record_batch::RecordBatch; use arrow_util::util::ensure_schema; use data_types::{ - ChunkId, ChunkOrder, DeletePredicate, PartitionId, SequenceNumber, TableSummary, - TimestampMinMax, + ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary, TimestampMinMax, }; use datafusion::{ error::DataFusionError, @@ -21,11 +21,12 @@ use iox_query::{ QueryChunk, QueryChunkMeta, }; use observability_deps::tracing::trace; +use once_cell::sync::OnceCell; use predicate::Predicate; use schema::{merge::merge_record_batch_schemas, selection::Selection, sort::SortKey, Schema}; use snafu::{ResultExt, Snafu}; -use crate::data::{partition::SnapshotBatch, table::TableName}; +use crate::data::table::TableName; #[allow(clippy::enum_variant_names)] #[derive(Debug, Snafu)] @@ -47,72 +48,106 @@ pub enum Error { /// A specialized `Error` for Ingester's Query errors pub type Result = std::result::Result; -/// Queryable data used for both query and persistence +/// A queryable wrapper over a set of ordered [`RecordBatch`] snapshot from a +/// single [`PartitionData`]. +/// +/// It is an invariant that a [`QueryAdaptor`] MUST always contain at least one +/// row. This frees the caller of having to reason about empty [`QueryAdaptor`] +/// instances yielding empty [`RecordBatch`]. +/// +/// [`PartitionData`]: crate::data::partition::PartitionData #[derive(Debug, PartialEq, Clone)] -pub(crate) struct QueryableBatch { - /// data - pub(crate) data: Vec>, +pub(crate) struct QueryAdaptor { + /// The snapshot data from a partition. + /// + /// This MUST be non-pub / closed for modification / immutable to support + /// interning the merged schema in [`Self::schema()`]. + data: Vec>, - /// This is needed to return a reference for a trait function - pub(crate) table_name: TableName, + /// The name of the table this data is part of. + table_name: TableName, - /// Partition ID - pub(crate) partition_id: PartitionId, + /// The catalog ID of the partition the this data is part of. + partition_id: PartitionId, + + /// An interned schema for all [`RecordBatch`] in data. + schema: OnceCell>, } -impl QueryableBatch { - /// Initilaize a QueryableBatch +impl QueryAdaptor { + /// Construct a [`QueryAdaptor`]. + /// + /// # Panics + /// + /// This constructor panics if `data` contains no [`RecordBatch`], or all + /// [`RecordBatch`] are empty. pub(crate) fn new( table_name: TableName, partition_id: PartitionId, - data: Vec>, + data: Vec>, ) -> Self { + // There must always be at least one record batch and one row. + // + // This upholds an invariant that simplifies dealing with empty + // partitions - if there is a QueryAdaptor, it contains data. + assert!(data.iter().map(|b| b.num_rows()).sum::() > 0); + Self { data, table_name, partition_id, + schema: OnceCell::default(), } } - /// Add snapshots to this batch - pub(crate) fn with_data(mut self, mut data: Vec>) -> Self { - self.data.append(&mut data); - self + pub(crate) fn project_selection(&self, selection: Selection<'_>) -> Vec { + // Project the column selection across all RecordBatch + self.data + .iter() + .map(|data| { + let batch = data.as_ref(); + let schema = batch.schema(); + + // Apply selection to in-memory batch + match selection { + Selection::All => batch.clone(), + Selection::Some(columns) => { + let projection = columns + .iter() + .flat_map(|&column_name| { + // ignore non-existing columns + schema.index_of(column_name).ok() + }) + .collect::>(); + batch.project(&projection).expect("bug in projection") + } + } + }) + .collect() } - /// return min and max of all the snapshots - pub(crate) fn min_max_sequence_numbers(&self) -> (SequenceNumber, SequenceNumber) { - let min = self - .data - .first() - .expect("The Queryable Batch should not empty") - .min_sequence_number; + /// Returns the [`RecordBatch`] instances in this [`QueryAdaptor`]. + pub(crate) fn record_batches(&self) -> &[Arc] { + self.data.as_ref() + } - let max = self - .data - .first() - .expect("The Queryable Batch should not empty") - .max_sequence_number; - - assert!(min <= max); - - (min, max) + /// Returns the partition ID from which the data this [`QueryAdaptor`] was + /// sourced from. + pub(crate) fn partition_id(&self) -> PartitionId { + self.partition_id } } -impl QueryChunkMeta for QueryableBatch { +impl QueryChunkMeta for QueryAdaptor { fn summary(&self) -> Option> { None } fn schema(&self) -> Arc { - // TODO: may want store this schema as a field of QueryableBatch and - // only do this schema merge the first time it is called - - // Merge schema of all RecordBatches of the PerstingBatch - let batches: Vec> = - self.data.iter().map(|s| Arc::clone(&s.data)).collect(); - merge_record_batch_schemas(&batches) + Arc::clone( + self.schema + .get_or_init(|| merge_record_batch_schemas(&self.data)), + ) } fn partition_sort_key(&self) -> Option<&SortKey> { @@ -139,11 +174,11 @@ impl QueryChunkMeta for QueryableBatch { } } -impl QueryChunk for QueryableBatch { +impl QueryChunk for QueryAdaptor { // This function should not be used in QueryBatch context fn id(&self) -> ChunkId { - // To return a value for debugging and make it consistent with ChunkId created in Compactor, - // use Uuid for this + // To return a value for debugging and make it consistent with ChunkId + // created in Compactor, use Uuid for this ChunkId::new() } @@ -152,10 +187,11 @@ impl QueryChunk for QueryableBatch { &self.table_name } - /// Returns true if the chunk may contain a duplicate "primary - /// key" within itself + /// Returns true if the chunk may contain a duplicate "primary key" within + /// itself fn may_contain_pk_duplicates(&self) -> bool { - // always true because they are not deduplicated yet + // always true because the rows across record batches have not been + // de-duplicated. true } @@ -204,22 +240,15 @@ impl QueryChunk for QueryableBatch { .context(SchemaSnafu) .map_err(|e| DataFusionError::External(Box::new(e)))?; - // Get all record batches from their snapshots + // Apply the projection over all the data in self, ensuring each batch + // has the specified schema. let batches = self - .data - .iter() - .filter_map(|snapshot| { - let batch = snapshot - // Only return columns in the selection - .scan(selection) - .context(FilterColumnsSnafu {}) - .transpose()? - // ensure batch has desired schema - .and_then(|batch| { - ensure_schema(&schema.as_arrow(), &batch).context(ConcatBatchesSnafu {}) - }) - .map(Arc::new); - Some(batch) + .project_selection(selection) + .into_iter() + .map(|batch| { + ensure_schema(&schema.as_arrow(), &batch) + .context(ConcatBatchesSnafu {}) + .map(Arc::new) }) .collect::, _>>() .map_err(|e| DataFusionError::External(Box::new(e)))?; @@ -233,10 +262,9 @@ impl QueryChunk for QueryableBatch { /// Returns chunk type fn chunk_type(&self) -> &str { - "PersistingBatch" + "QueryAdaptor" } - // This function should not be used in PersistingBatch context fn order(&self) -> ChunkOrder { unimplemented!() } diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index 693d9cbdc9..e58c573e11 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -7,9 +7,8 @@ use std::{sync::Arc, time::Duration}; use arrow::record_batch::RecordBatch; use arrow_util::assert_batches_eq; -use bitflags::bitflags; use data_types::{ - NamespaceId, PartitionId, PartitionKey, Sequence, SequenceNumber, ShardId, ShardIndex, TableId, + NamespaceId, PartitionKey, Sequence, SequenceNumber, ShardId, ShardIndex, TableId, }; use dml::{DmlMeta, DmlOperation, DmlWrite}; use iox_catalog::{interface::Catalog, mem::MemCatalog}; @@ -17,71 +16,12 @@ use iox_query::test::{raw_data, TestChunk}; use iox_time::{SystemProvider, Time}; use mutable_batch_lp::lines_to_batches; use object_store::memory::InMemory; -use uuid::Uuid; use crate::{ - data::{ - partition::{resolver::CatalogPartitionResolver, PersistingBatch, SnapshotBatch}, - IngesterData, - }, + data::{partition::resolver::CatalogPartitionResolver, IngesterData}, lifecycle::{LifecycleConfig, LifecycleManager}, - query::QueryableBatch, }; -#[allow(clippy::too_many_arguments)] -pub(crate) fn make_persisting_batch( - shard_id: i64, - seq_num_start: i64, - table_id: i64, - table_name: &str, - partition_id: i64, - object_store_id: Uuid, - batches: Vec>, -) -> Arc { - let queryable_batch = make_queryable_batch(table_name, partition_id, seq_num_start, batches); - Arc::new(PersistingBatch { - shard_id: ShardId::new(shard_id), - table_id: TableId::new(table_id), - partition_id: PartitionId::new(partition_id), - object_store_id, - data: queryable_batch, - }) -} - -pub(crate) fn make_queryable_batch( - table_name: &str, - partition_id: i64, - seq_num_start: i64, - batches: Vec>, -) -> Arc { - // make snapshots for the batches - let mut snapshots = vec![]; - let mut seq_num = seq_num_start; - for batch in batches { - let seq = SequenceNumber::new(seq_num); - snapshots.push(Arc::new(make_snapshot_batch(batch, seq, seq))); - seq_num += 1; - } - - Arc::new(QueryableBatch::new( - table_name.into(), - PartitionId::new(partition_id), - snapshots, - )) -} - -pub(crate) fn make_snapshot_batch( - batch: Arc, - min: SequenceNumber, - max: SequenceNumber, -) -> SnapshotBatch { - SnapshotBatch { - min_sequence_number: min, - max_sequence_number: max, - data: batch, - } -} - pub(crate) async fn create_one_row_record_batch_with_influxtype() -> Vec> { let chunk1 = Arc::new( TestChunk::new("t") @@ -506,32 +446,9 @@ pub(crate) const TEST_TABLE: &str = "test_table"; pub(crate) const TEST_PARTITION_1: &str = "test+partition_1"; pub(crate) const TEST_PARTITION_2: &str = "test+partition_2"; -bitflags! { - /// Make the same in-memory data but data are split between: - /// . one or two partition - /// . The first partition will have a choice to have data in either - /// . buffer only - /// . snapshot only - /// . persisting only - /// . buffer + snapshot - /// . buffer + persisting - /// . snapshot + persisting - /// . buffer + snapshot + persisting - /// . If the second partittion exists, it only has data in its buffer - pub(crate) struct DataLocation: u8 { - const BUFFER = 0b001; - const SNAPSHOT = 0b010; - const PERSISTING = 0b100; - const BUFFER_SNAPSHOT = Self::BUFFER.bits | Self::SNAPSHOT.bits; - const BUFFER_PERSISTING = Self::BUFFER.bits | Self::PERSISTING.bits; - const SNAPSHOT_PERSISTING = Self::SNAPSHOT.bits | Self::PERSISTING.bits; - const BUFFER_SNAPSHOT_PERSISTING = Self::BUFFER.bits | Self::SNAPSHOT.bits | Self::PERSISTING.bits; - } -} - /// This function produces one scenario but with the parameter combination (2*7), /// you will be able to produce 14 scenarios by calling it in 2 loops -pub(crate) async fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterData { +pub(crate) async fn make_ingester_data(two_partitions: bool) -> IngesterData { // Whatever data because they won't be used in the tests let metrics: Arc = Default::default(); let catalog: Arc = Arc::new(MemCatalog::new(Arc::clone(&metrics))); @@ -576,26 +493,6 @@ pub(crate) async fn make_ingester_data(two_partitions: bool, loc: DataLocation) .unwrap(); } - if loc.contains(DataLocation::PERSISTING) { - // Move partition 1 data to persisting - let _ignored = ingester - .shard(shard_id) - .unwrap() - .namespace(&TEST_NAMESPACE.into()) - .unwrap() - .snapshot_to_persisting(&TEST_TABLE.into(), &PartitionKey::from(TEST_PARTITION_1)) - .await; - } else if loc.contains(DataLocation::SNAPSHOT) { - // move partition 1 data to snapshot - let _ignored = ingester - .shard(shard_id) - .unwrap() - .namespace(&TEST_NAMESPACE.into()) - .unwrap() - .snapshot(&TEST_TABLE.into(), &PartitionKey::from(TEST_PARTITION_1)) - .await; - } - ingester } diff --git a/iox_query/src/provider.rs b/iox_query/src/provider.rs index 8b9912f45e..d649345e6b 100644 --- a/iox_query/src/provider.rs +++ b/iox_query/src/provider.rs @@ -40,8 +40,7 @@ mod deduplicate; pub mod overlap; mod physical; use self::overlap::group_potential_duplicates; -pub(crate) use deduplicate::DeduplicateExec; -pub use deduplicate::RecordBatchDeduplicator; +pub use deduplicate::{DeduplicateExec, RecordBatchDeduplicator}; pub(crate) use physical::IOxReadFilterNode; #[derive(Debug, Snafu)]