refactor(ingester): use partition buffer FSM
This commit makes use of the partition buffer state machine introduced in https://github.com/influxdata/influxdb_iox/pull/5943. This commit significantly changes the buffering, and querying, of data from a partition, swapping out the existing "DataBuffer" for the new state machine implementation (itself simplified due to temporary lack of incremental snapshot generation, see #5944). This commit simplifies the query path, removing multiple types that wrapped one-another to pass around various state necessary to perform a query, with various query functions needing different types or combinations of types. The query path now operates using a single type (named "QueryAdaptor") that provides a queryable interface over the set of RecordBatch returned from a partition. There is significantly increased testing of the PartitionData itself, covering data in various states and the ordering of returned RecordBatch (to ensure correct materialisation of updates). There are also invariants upheld by the type system / compiler to minimise the complexities of working with empty batches & states, and many asserts that ensure (mostly existing!) invariants are upheld.pull/24376/head
parent
5f35e88706
commit
678fb81892
|
@ -2279,7 +2279,6 @@ dependencies = [
|
|||
"assert_matches",
|
||||
"async-trait",
|
||||
"backoff",
|
||||
"bitflags",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"data_types",
|
||||
|
@ -2293,6 +2292,7 @@ dependencies = [
|
|||
"iox_catalog",
|
||||
"iox_query",
|
||||
"iox_time",
|
||||
"lazy_static",
|
||||
"metric",
|
||||
"mutable_batch",
|
||||
"mutable_batch_lp",
|
||||
|
|
|
@ -47,11 +47,11 @@ write_summary = { path = "../write_summary" }
|
|||
tokio-util = { version = "0.7.4" }
|
||||
trace = { path = "../trace" }
|
||||
rand = "0.8.5"
|
||||
once_cell = "1"
|
||||
|
||||
[dev-dependencies]
|
||||
assert_matches = "1.5.0"
|
||||
bitflags = {version = "1.3.2"}
|
||||
once_cell = "1"
|
||||
lazy_static = "1.4.0"
|
||||
paste = "1.0.9"
|
||||
test_helpers = { path = "../test_helpers", features = ["future_timeout"] }
|
||||
tokio-stream = {version = "0.1.11", default_features = false }
|
||||
|
|
|
@ -11,7 +11,7 @@ use iox_query::{
|
|||
use schema::sort::{adjust_sort_key_columns, compute_sort_key, SortKey};
|
||||
use snafu::{ResultExt, Snafu};
|
||||
|
||||
use crate::{data::partition::PersistingBatch, query::QueryableBatch};
|
||||
use crate::query_adaptor::QueryAdaptor;
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
#[allow(missing_copy_implementations, missing_docs)]
|
||||
|
@ -85,14 +85,14 @@ impl std::fmt::Debug for CompactedStream {
|
|||
}
|
||||
}
|
||||
|
||||
/// Compact a given persisting batch into a [`CompactedStream`] or
|
||||
/// `None` if there is no data to compact.
|
||||
/// Compact a given batch into a [`CompactedStream`] or `None` if there is no
|
||||
/// data to compact, returning an updated sort key, if any.
|
||||
pub(crate) async fn compact_persisting_batch(
|
||||
executor: &Executor,
|
||||
sort_key: Option<SortKey>,
|
||||
batch: Arc<PersistingBatch>,
|
||||
batch: QueryAdaptor,
|
||||
) -> Result<CompactedStream> {
|
||||
assert!(!batch.data.data.is_empty());
|
||||
assert!(!batch.record_batches().is_empty());
|
||||
|
||||
// Get sort key from the catalog or compute it from
|
||||
// cardinality.
|
||||
|
@ -104,12 +104,12 @@ pub(crate) async fn compact_persisting_batch(
|
|||
//
|
||||
// If there are any new columns, add them to the end of the sort key in the catalog and
|
||||
// return that to be updated in the catalog.
|
||||
adjust_sort_key_columns(&sk, &batch.data.schema().primary_key())
|
||||
adjust_sort_key_columns(&sk, &batch.schema().primary_key())
|
||||
}
|
||||
None => {
|
||||
let sort_key = compute_sort_key(
|
||||
batch.data.schema().as_ref(),
|
||||
batch.data.data.iter().map(|sb| sb.data.as_ref()),
|
||||
batch.schema().as_ref(),
|
||||
batch.record_batches().iter().map(|sb| sb.as_ref()),
|
||||
);
|
||||
// Use the sort key computed from the cardinality as the sort key for this parquet
|
||||
// file's metadata, also return the sort key to be stored in the catalog
|
||||
|
@ -118,7 +118,7 @@ pub(crate) async fn compact_persisting_batch(
|
|||
};
|
||||
|
||||
// Compact
|
||||
let stream = compact(executor, Arc::clone(&batch.data), data_sort_key.clone()).await?;
|
||||
let stream = compact(executor, Arc::new(batch), data_sort_key.clone()).await?;
|
||||
|
||||
Ok(CompactedStream {
|
||||
stream,
|
||||
|
@ -127,10 +127,10 @@ pub(crate) async fn compact_persisting_batch(
|
|||
})
|
||||
}
|
||||
|
||||
/// Compact a given Queryable Batch
|
||||
/// Compact a given batch without updating the sort key.
|
||||
pub(crate) async fn compact(
|
||||
executor: &Executor,
|
||||
data: Arc<QueryableBatch>,
|
||||
data: Arc<QueryAdaptor>,
|
||||
sort_key: SortKey,
|
||||
) -> Result<SendableRecordBatchStream> {
|
||||
// Build logical plan for compaction
|
||||
|
@ -157,9 +157,9 @@ pub(crate) async fn compact(
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use arrow_util::assert_batches_eq;
|
||||
use data_types::PartitionId;
|
||||
use mutable_batch_lp::lines_to_batches;
|
||||
use schema::selection::Selection;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::*;
|
||||
use crate::test_util::{
|
||||
|
@ -169,14 +169,14 @@ mod tests {
|
|||
create_batches_with_influxtype_same_columns_different_type,
|
||||
create_one_record_batch_with_influxtype_duplicates,
|
||||
create_one_record_batch_with_influxtype_no_duplicates,
|
||||
create_one_row_record_batch_with_influxtype, make_persisting_batch, make_queryable_batch,
|
||||
create_one_row_record_batch_with_influxtype,
|
||||
};
|
||||
|
||||
// this test was added to guard against https://github.com/influxdata/influxdb_iox/issues/3782
|
||||
// where if sending in a single row it would compact into an output of two batches, one of
|
||||
// which was empty, which would cause this to panic.
|
||||
#[tokio::test]
|
||||
async fn test_compact_persisting_batch_on_one_record_batch_with_one_row() {
|
||||
async fn test_compact_batch_on_one_record_batch_with_one_row() {
|
||||
// create input data
|
||||
let batch = lines_to_batches("cpu bar=2 20", 0)
|
||||
.unwrap()
|
||||
|
@ -184,26 +184,15 @@ mod tests {
|
|||
.unwrap()
|
||||
.to_arrow(Selection::All)
|
||||
.unwrap();
|
||||
let batches = vec![Arc::new(batch)];
|
||||
// build persisting batch from the input batches
|
||||
let uuid = Uuid::new_v4();
|
||||
let table_name = "test_table";
|
||||
let shard_id = 1;
|
||||
let seq_num_start: i64 = 1;
|
||||
let table_id = 1;
|
||||
let partition_id = 1;
|
||||
let persisting_batch = make_persisting_batch(
|
||||
shard_id,
|
||||
seq_num_start,
|
||||
table_id,
|
||||
table_name,
|
||||
partition_id,
|
||||
uuid,
|
||||
batches,
|
||||
|
||||
let batch = QueryAdaptor::new(
|
||||
"test_table".into(),
|
||||
PartitionId::new(1),
|
||||
vec![Arc::new(batch)],
|
||||
);
|
||||
|
||||
// verify PK
|
||||
let schema = persisting_batch.data.schema();
|
||||
let schema = batch.schema();
|
||||
let pk = schema.primary_key();
|
||||
let expected_pk = vec!["time"];
|
||||
assert_eq!(expected_pk, pk);
|
||||
|
@ -211,7 +200,7 @@ mod tests {
|
|||
// compact
|
||||
let exc = Executor::new(1);
|
||||
let CompactedStream { stream, .. } =
|
||||
compact_persisting_batch(&exc, Some(SortKey::empty()), persisting_batch)
|
||||
compact_persisting_batch(&exc, Some(SortKey::empty()), batch)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
@ -232,29 +221,16 @@ mod tests {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_compact_persisting_batch_on_one_record_batch_no_dupilcates() {
|
||||
async fn test_compact_batch_on_one_record_batch_no_dupilcates() {
|
||||
// create input data
|
||||
let batches = create_one_record_batch_with_influxtype_no_duplicates().await;
|
||||
|
||||
// build persisting batch from the input batches
|
||||
let uuid = Uuid::new_v4();
|
||||
let table_name = "test_table";
|
||||
let shard_id = 1;
|
||||
let seq_num_start: i64 = 1;
|
||||
let table_id = 1;
|
||||
let partition_id = 1;
|
||||
let persisting_batch = make_persisting_batch(
|
||||
shard_id,
|
||||
seq_num_start,
|
||||
table_id,
|
||||
table_name,
|
||||
partition_id,
|
||||
uuid,
|
||||
batches,
|
||||
let batch = QueryAdaptor::new(
|
||||
"test_table".into(),
|
||||
PartitionId::new(1),
|
||||
create_one_record_batch_with_influxtype_no_duplicates().await,
|
||||
);
|
||||
|
||||
// verify PK
|
||||
let schema = persisting_batch.data.schema();
|
||||
let schema = batch.schema();
|
||||
let pk = schema.primary_key();
|
||||
let expected_pk = vec!["tag1", "time"];
|
||||
assert_eq!(expected_pk, pk);
|
||||
|
@ -265,7 +241,7 @@ mod tests {
|
|||
stream,
|
||||
data_sort_key,
|
||||
catalog_sort_key_update,
|
||||
} = compact_persisting_batch(&exc, Some(SortKey::empty()), persisting_batch)
|
||||
} = compact_persisting_batch(&exc, Some(SortKey::empty()), batch)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
@ -295,29 +271,16 @@ mod tests {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_compact_persisting_batch_no_sort_key() {
|
||||
async fn test_compact_batch_no_sort_key() {
|
||||
// create input data
|
||||
let batches = create_batches_with_influxtype_different_cardinality().await;
|
||||
|
||||
// build persisting batch from the input batches
|
||||
let uuid = Uuid::new_v4();
|
||||
let table_name = "test_table";
|
||||
let shard_id = 1;
|
||||
let seq_num_start: i64 = 1;
|
||||
let table_id = 1;
|
||||
let partition_id = 1;
|
||||
let persisting_batch = make_persisting_batch(
|
||||
shard_id,
|
||||
seq_num_start,
|
||||
table_id,
|
||||
table_name,
|
||||
partition_id,
|
||||
uuid,
|
||||
batches,
|
||||
let batch = QueryAdaptor::new(
|
||||
"test_table".into(),
|
||||
PartitionId::new(1),
|
||||
create_batches_with_influxtype_different_cardinality().await,
|
||||
);
|
||||
|
||||
// verify PK
|
||||
let schema = persisting_batch.data.schema();
|
||||
let schema = batch.schema();
|
||||
let pk = schema.primary_key();
|
||||
let expected_pk = vec!["tag1", "tag3", "time"];
|
||||
assert_eq!(expected_pk, pk);
|
||||
|
@ -329,7 +292,7 @@ mod tests {
|
|||
stream,
|
||||
data_sort_key,
|
||||
catalog_sort_key_update,
|
||||
} = compact_persisting_batch(&exc, Some(SortKey::empty()), persisting_batch)
|
||||
} = compact_persisting_batch(&exc, Some(SortKey::empty()), batch)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
@ -363,29 +326,16 @@ mod tests {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_compact_persisting_batch_with_specified_sort_key() {
|
||||
async fn test_compact_batch_with_specified_sort_key() {
|
||||
// create input data
|
||||
let batches = create_batches_with_influxtype_different_cardinality().await;
|
||||
|
||||
// build persisting batch from the input batches
|
||||
let uuid = Uuid::new_v4();
|
||||
let table_name = "test_table";
|
||||
let shard_id = 1;
|
||||
let seq_num_start: i64 = 1;
|
||||
let table_id = 1;
|
||||
let partition_id = 1;
|
||||
let persisting_batch = make_persisting_batch(
|
||||
shard_id,
|
||||
seq_num_start,
|
||||
table_id,
|
||||
table_name,
|
||||
partition_id,
|
||||
uuid,
|
||||
batches,
|
||||
let batch = QueryAdaptor::new(
|
||||
"test_table".into(),
|
||||
PartitionId::new(1),
|
||||
create_batches_with_influxtype_different_cardinality().await,
|
||||
);
|
||||
|
||||
// verify PK
|
||||
let schema = persisting_batch.data.schema();
|
||||
let schema = batch.schema();
|
||||
let pk = schema.primary_key();
|
||||
let expected_pk = vec!["tag1", "tag3", "time"];
|
||||
assert_eq!(expected_pk, pk);
|
||||
|
@ -401,7 +351,7 @@ mod tests {
|
|||
} = compact_persisting_batch(
|
||||
&exc,
|
||||
Some(SortKey::from_columns(["tag3", "tag1", "time"])),
|
||||
persisting_batch,
|
||||
batch,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -435,29 +385,16 @@ mod tests {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_compact_persisting_batch_new_column_for_sort_key() {
|
||||
async fn test_compact_batch_new_column_for_sort_key() {
|
||||
// create input data
|
||||
let batches = create_batches_with_influxtype_different_cardinality().await;
|
||||
|
||||
// build persisting batch from the input batches
|
||||
let uuid = Uuid::new_v4();
|
||||
let table_name = "test_table";
|
||||
let shard_id = 1;
|
||||
let seq_num_start: i64 = 1;
|
||||
let table_id = 1;
|
||||
let partition_id = 1;
|
||||
let persisting_batch = make_persisting_batch(
|
||||
shard_id,
|
||||
seq_num_start,
|
||||
table_id,
|
||||
table_name,
|
||||
partition_id,
|
||||
uuid,
|
||||
batches,
|
||||
let batch = QueryAdaptor::new(
|
||||
"test_table".into(),
|
||||
PartitionId::new(1),
|
||||
create_batches_with_influxtype_different_cardinality().await,
|
||||
);
|
||||
|
||||
// verify PK
|
||||
let schema = persisting_batch.data.schema();
|
||||
let schema = batch.schema();
|
||||
let pk = schema.primary_key();
|
||||
let expected_pk = vec!["tag1", "tag3", "time"];
|
||||
assert_eq!(expected_pk, pk);
|
||||
|
@ -471,13 +408,9 @@ mod tests {
|
|||
stream,
|
||||
data_sort_key,
|
||||
catalog_sort_key_update,
|
||||
} = compact_persisting_batch(
|
||||
&exc,
|
||||
Some(SortKey::from_columns(["tag3", "time"])),
|
||||
persisting_batch,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
} = compact_persisting_batch(&exc, Some(SortKey::from_columns(["tag3", "time"])), batch)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let output_batches = datafusion::physical_plan::common::collect(stream)
|
||||
.await
|
||||
|
@ -511,29 +444,16 @@ mod tests {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_compact_persisting_batch_missing_column_for_sort_key() {
|
||||
async fn test_compact_batch_missing_column_for_sort_key() {
|
||||
// create input data
|
||||
let batches = create_batches_with_influxtype_different_cardinality().await;
|
||||
|
||||
// build persisting batch from the input batches
|
||||
let uuid = Uuid::new_v4();
|
||||
let table_name = "test_table";
|
||||
let shard_id = 1;
|
||||
let seq_num_start: i64 = 1;
|
||||
let table_id = 1;
|
||||
let partition_id = 1;
|
||||
let persisting_batch = make_persisting_batch(
|
||||
shard_id,
|
||||
seq_num_start,
|
||||
table_id,
|
||||
table_name,
|
||||
partition_id,
|
||||
uuid,
|
||||
batches,
|
||||
let batch = QueryAdaptor::new(
|
||||
"test_table".into(),
|
||||
PartitionId::new(1),
|
||||
create_batches_with_influxtype_different_cardinality().await,
|
||||
);
|
||||
|
||||
// verify PK
|
||||
let schema = persisting_batch.data.schema();
|
||||
let schema = batch.schema();
|
||||
let pk = schema.primary_key();
|
||||
let expected_pk = vec!["tag1", "tag3", "time"];
|
||||
assert_eq!(expected_pk, pk);
|
||||
|
@ -550,7 +470,7 @@ mod tests {
|
|||
} = compact_persisting_batch(
|
||||
&exc,
|
||||
Some(SortKey::from_columns(["tag3", "tag1", "tag4", "time"])),
|
||||
persisting_batch,
|
||||
batch,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -588,26 +508,25 @@ mod tests {
|
|||
test_helpers::maybe_start_logging();
|
||||
|
||||
// create input data
|
||||
let batches = create_one_row_record_batch_with_influxtype().await;
|
||||
|
||||
// build queryable batch from the input batches
|
||||
let compact_batch = make_queryable_batch("test_table", 0, 1, batches);
|
||||
let batch = QueryAdaptor::new(
|
||||
"test_table".into(),
|
||||
PartitionId::new(1),
|
||||
create_one_row_record_batch_with_influxtype().await,
|
||||
);
|
||||
|
||||
// verify PK
|
||||
let schema = compact_batch.schema();
|
||||
let schema = batch.schema();
|
||||
let pk = schema.primary_key();
|
||||
let expected_pk = vec!["tag1", "time"];
|
||||
assert_eq!(expected_pk, pk);
|
||||
|
||||
let sort_key = compute_sort_key(
|
||||
&schema,
|
||||
compact_batch.data.iter().map(|sb| sb.data.as_ref()),
|
||||
);
|
||||
let sort_key =
|
||||
compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
|
||||
assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));
|
||||
|
||||
// compact
|
||||
let exc = Executor::new(1);
|
||||
let stream = compact(&exc, compact_batch, sort_key).await.unwrap();
|
||||
let stream = compact(&exc, Arc::new(batch), sort_key).await.unwrap();
|
||||
let output_batches = datafusion::physical_plan::common::collect(stream)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -629,26 +548,25 @@ mod tests {
|
|||
#[tokio::test]
|
||||
async fn test_compact_one_batch_with_duplicates() {
|
||||
// create input data
|
||||
let batches = create_one_record_batch_with_influxtype_duplicates().await;
|
||||
|
||||
// build queryable batch from the input batches
|
||||
let compact_batch = make_queryable_batch("test_table", 0, 1, batches);
|
||||
let batch = QueryAdaptor::new(
|
||||
"test_table".into(),
|
||||
PartitionId::new(1),
|
||||
create_one_record_batch_with_influxtype_duplicates().await,
|
||||
);
|
||||
|
||||
// verify PK
|
||||
let schema = compact_batch.schema();
|
||||
let schema = batch.schema();
|
||||
let pk = schema.primary_key();
|
||||
let expected_pk = vec!["tag1", "time"];
|
||||
assert_eq!(expected_pk, pk);
|
||||
|
||||
let sort_key = compute_sort_key(
|
||||
&schema,
|
||||
compact_batch.data.iter().map(|sb| sb.data.as_ref()),
|
||||
);
|
||||
let sort_key =
|
||||
compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
|
||||
assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));
|
||||
|
||||
// compact
|
||||
let exc = Executor::new(1);
|
||||
let stream = compact(&exc, compact_batch, sort_key).await.unwrap();
|
||||
let stream = compact(&exc, Arc::new(batch), sort_key).await.unwrap();
|
||||
let output_batches = datafusion::physical_plan::common::collect(stream)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -678,26 +596,25 @@ mod tests {
|
|||
#[tokio::test]
|
||||
async fn test_compact_many_batches_same_columns_with_duplicates() {
|
||||
// create many-batches input data
|
||||
let batches = create_batches_with_influxtype().await;
|
||||
|
||||
// build queryable batch from the input batches
|
||||
let compact_batch = make_queryable_batch("test_table", 0, 1, batches);
|
||||
let batch = QueryAdaptor::new(
|
||||
"test_table".into(),
|
||||
PartitionId::new(1),
|
||||
create_batches_with_influxtype().await,
|
||||
);
|
||||
|
||||
// verify PK
|
||||
let schema = compact_batch.schema();
|
||||
let schema = batch.schema();
|
||||
let pk = schema.primary_key();
|
||||
let expected_pk = vec!["tag1", "time"];
|
||||
assert_eq!(expected_pk, pk);
|
||||
|
||||
let sort_key = compute_sort_key(
|
||||
&schema,
|
||||
compact_batch.data.iter().map(|sb| sb.data.as_ref()),
|
||||
);
|
||||
let sort_key =
|
||||
compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
|
||||
assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));
|
||||
|
||||
// compact
|
||||
let exc = Executor::new(1);
|
||||
let stream = compact(&exc, compact_batch, sort_key).await.unwrap();
|
||||
let stream = compact(&exc, Arc::new(batch), sort_key).await.unwrap();
|
||||
let output_batches = datafusion::physical_plan::common::collect(stream)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -724,26 +641,25 @@ mod tests {
|
|||
#[tokio::test]
|
||||
async fn test_compact_many_batches_different_columns_with_duplicates() {
|
||||
// create many-batches input data
|
||||
let batches = create_batches_with_influxtype_different_columns().await;
|
||||
|
||||
// build queryable batch from the input batches
|
||||
let compact_batch = make_queryable_batch("test_table", 0, 1, batches);
|
||||
let batch = QueryAdaptor::new(
|
||||
"test_table".into(),
|
||||
PartitionId::new(1),
|
||||
create_batches_with_influxtype_different_columns().await,
|
||||
);
|
||||
|
||||
// verify PK
|
||||
let schema = compact_batch.schema();
|
||||
let schema = batch.schema();
|
||||
let pk = schema.primary_key();
|
||||
let expected_pk = vec!["tag1", "tag2", "time"];
|
||||
assert_eq!(expected_pk, pk);
|
||||
|
||||
let sort_key = compute_sort_key(
|
||||
&schema,
|
||||
compact_batch.data.iter().map(|sb| sb.data.as_ref()),
|
||||
);
|
||||
let sort_key =
|
||||
compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
|
||||
assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"]));
|
||||
|
||||
// compact
|
||||
let exc = Executor::new(1);
|
||||
let stream = compact(&exc, compact_batch, sort_key).await.unwrap();
|
||||
let stream = compact(&exc, Arc::new(batch), sort_key).await.unwrap();
|
||||
let output_batches = datafusion::physical_plan::common::collect(stream)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -774,26 +690,25 @@ mod tests {
|
|||
#[tokio::test]
|
||||
async fn test_compact_many_batches_different_columns_different_order_with_duplicates() {
|
||||
// create many-batches input data
|
||||
let batches = create_batches_with_influxtype_different_columns_different_order().await;
|
||||
|
||||
// build queryable batch from the input batches
|
||||
let compact_batch = make_queryable_batch("test_table", 0, 1, batches);
|
||||
let batch = QueryAdaptor::new(
|
||||
"test_table".into(),
|
||||
PartitionId::new(1),
|
||||
create_batches_with_influxtype_different_columns_different_order().await,
|
||||
);
|
||||
|
||||
// verify PK
|
||||
let schema = compact_batch.schema();
|
||||
let schema = batch.schema();
|
||||
let pk = schema.primary_key();
|
||||
let expected_pk = vec!["tag1", "tag2", "time"];
|
||||
assert_eq!(expected_pk, pk);
|
||||
|
||||
let sort_key = compute_sort_key(
|
||||
&schema,
|
||||
compact_batch.data.iter().map(|sb| sb.data.as_ref()),
|
||||
);
|
||||
let sort_key =
|
||||
compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
|
||||
assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"]));
|
||||
|
||||
// compact
|
||||
let exc = Executor::new(1);
|
||||
let stream = compact(&exc, compact_batch, sort_key).await.unwrap();
|
||||
let stream = compact(&exc, Arc::new(batch), sort_key).await.unwrap();
|
||||
let output_batches = datafusion::physical_plan::common::collect(stream)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -823,68 +738,17 @@ mod tests {
|
|||
assert_batches_eq!(&expected, &output_batches);
|
||||
}
|
||||
|
||||
// BUG
|
||||
#[tokio::test]
|
||||
async fn test_compact_many_batches_different_columns_different_order_with_duplicates2() {
|
||||
// create many-batches input data
|
||||
let batches = create_batches_with_influxtype_different_columns_different_order().await;
|
||||
|
||||
// build queryable batch from the input batches
|
||||
let compact_batch = make_queryable_batch("test_table", 0, 1, batches);
|
||||
|
||||
// verify PK
|
||||
let schema = compact_batch.schema();
|
||||
let pk = schema.primary_key();
|
||||
let expected_pk = vec!["tag1", "tag2", "time"];
|
||||
assert_eq!(expected_pk, pk);
|
||||
|
||||
let sort_key = compute_sort_key(
|
||||
&schema,
|
||||
compact_batch.data.iter().map(|sb| sb.data.as_ref()),
|
||||
);
|
||||
assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"]));
|
||||
|
||||
// compact
|
||||
let exc = Executor::new(1);
|
||||
let stream = compact(&exc, compact_batch, sort_key).await.unwrap();
|
||||
let output_batches = datafusion::physical_plan::common::collect(stream)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// verify compacted data
|
||||
// data is sorted and all duplicates are removed
|
||||
let expected = vec![
|
||||
"+-----------+------+------+--------------------------------+",
|
||||
"| field_int | tag1 | tag2 | time |",
|
||||
"+-----------+------+------+--------------------------------+",
|
||||
"| 5 | | AL | 1970-01-01T00:00:00.000005Z |",
|
||||
"| 10 | | AL | 1970-01-01T00:00:00.000007Z |",
|
||||
"| 70 | | CT | 1970-01-01T00:00:00.000000100Z |",
|
||||
"| 1000 | | CT | 1970-01-01T00:00:00.000001Z |",
|
||||
"| 100 | | MA | 1970-01-01T00:00:00.000000050Z |",
|
||||
"| 10 | AL | MA | 1970-01-01T00:00:00.000000050Z |",
|
||||
"| 70 | CT | CT | 1970-01-01T00:00:00.000000100Z |",
|
||||
"| 70 | CT | CT | 1970-01-01T00:00:00.000000500Z |",
|
||||
"| 30 | MT | AL | 1970-01-01T00:00:00.000000005Z |",
|
||||
"| 20 | MT | AL | 1970-01-01T00:00:00.000007Z |",
|
||||
"| 1000 | MT | CT | 1970-01-01T00:00:00.000001Z |",
|
||||
"| 1000 | MT | CT | 1970-01-01T00:00:00.000002Z |",
|
||||
"+-----------+------+------+--------------------------------+",
|
||||
];
|
||||
|
||||
assert_batches_eq!(&expected, &output_batches);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[should_panic(expected = "Schemas compatible")]
|
||||
async fn test_compact_many_batches_same_columns_different_types() {
|
||||
// create many-batches input data
|
||||
let batches = create_batches_with_influxtype_same_columns_different_type().await;
|
||||
let batch = QueryAdaptor::new(
|
||||
"test_table".into(),
|
||||
PartitionId::new(1),
|
||||
create_batches_with_influxtype_same_columns_different_type().await,
|
||||
);
|
||||
|
||||
// build queryable batch from the input batches
|
||||
let compact_batch = make_queryable_batch("test_table", 0, 1, batches);
|
||||
|
||||
// the schema merge will thorw a panic
|
||||
compact_batch.schema();
|
||||
// the schema merge should throw a panic
|
||||
batch.schema();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ use parquet_file::{
|
|||
storage::{ParquetStorage, StorageId},
|
||||
};
|
||||
use snafu::{OptionExt, Snafu};
|
||||
use uuid::Uuid;
|
||||
use write_summary::ShardProgress;
|
||||
|
||||
use crate::{
|
||||
|
@ -29,9 +30,12 @@ use crate::{
|
|||
|
||||
pub(crate) mod namespace;
|
||||
pub mod partition;
|
||||
mod sequence_range;
|
||||
pub(crate) mod shard;
|
||||
pub(crate) mod table;
|
||||
|
||||
pub(crate) use sequence_range::*;
|
||||
|
||||
use self::{partition::resolver::PartitionProvider, shard::ShardData};
|
||||
|
||||
#[cfg(test)]
|
||||
|
@ -245,26 +249,32 @@ impl Persister for IngesterData {
|
|||
) {
|
||||
// lookup the state from the ingester data. If something isn't found,
|
||||
// it's unexpected. Crash so someone can take a look.
|
||||
let shard_data = self
|
||||
let namespace = self
|
||||
.shards
|
||||
.get(&shard_id)
|
||||
.unwrap_or_else(|| panic!("shard state for {shard_id} not in ingester data"));
|
||||
let namespace = shard_data
|
||||
.namespace_by_id(namespace_id)
|
||||
.and_then(|s| s.namespace_by_id(namespace_id))
|
||||
.unwrap_or_else(|| panic!("namespace {namespace_id} not in shard {shard_id} state"));
|
||||
let namespace_name = namespace.namespace_name();
|
||||
// Assert the namespace ID matches the index key.
|
||||
assert_eq!(namespace.namespace_id(), namespace_id);
|
||||
|
||||
let table_data = namespace.table_id(table_id).unwrap_or_else(|| {
|
||||
panic!("table {table_id} in namespace {namespace_id} not in shard {shard_id} state")
|
||||
});
|
||||
|
||||
let partition_key;
|
||||
let table_name;
|
||||
let batch;
|
||||
let partition_key;
|
||||
let sort_key;
|
||||
let last_persisted_sequence_number;
|
||||
let batch;
|
||||
let batch_sequence_number_range;
|
||||
{
|
||||
let mut guard = table_data.write().await;
|
||||
// Assert various properties of the table to ensure the index is
|
||||
// correct, out of an abundance of caution.
|
||||
assert_eq!(guard.shard_id(), shard_id);
|
||||
assert_eq!(guard.namespace_id(), namespace_id);
|
||||
assert_eq!(guard.table_id(), table_id);
|
||||
table_name = guard.table_name().clone();
|
||||
|
||||
let partition = guard.get_partition(partition_id).unwrap_or_else(|| {
|
||||
|
@ -273,12 +283,34 @@ impl Persister for IngesterData {
|
|||
)
|
||||
});
|
||||
|
||||
// Assert various properties of the partition to ensure the index is
|
||||
// correct, out of an abundance of caution.
|
||||
assert_eq!(partition.partition_id(), partition_id);
|
||||
assert_eq!(partition.shard_id(), shard_id);
|
||||
assert_eq!(partition.namespace_id(), namespace_id);
|
||||
assert_eq!(partition.table_id(), table_id);
|
||||
assert_eq!(*partition.table_name(), table_name);
|
||||
|
||||
partition_key = partition.partition_key().clone();
|
||||
batch = partition.snapshot_to_persisting_batch();
|
||||
sort_key = partition.sort_key().clone();
|
||||
last_persisted_sequence_number = partition.max_persisted_sequence_number();
|
||||
|
||||
// The sequence number MUST be read without releasing the write lock
|
||||
// to ensure a consistent snapshot of batch contents and batch
|
||||
// sequence number range.
|
||||
batch = partition.mark_persisting();
|
||||
batch_sequence_number_range = partition.sequence_number_range();
|
||||
};
|
||||
|
||||
// From this point on, the code MUST be infallible.
|
||||
//
|
||||
// The partition data was moved to the persisting slot, and any
|
||||
// subsequent calls would be an error.
|
||||
//
|
||||
// This is NOT an invariant, and this could be changed in the future to
|
||||
// allow partitions to marked as persisting repeatedly. Today however,
|
||||
// the code is infallible (or rather, terminal - it does cause a retry).
|
||||
|
||||
let sort_key = sort_key.get().await;
|
||||
trace!(
|
||||
%shard_id,
|
||||
|
@ -306,8 +338,13 @@ impl Persister for IngesterData {
|
|||
|
||||
// Check if there is any data to persist.
|
||||
let batch = match batch {
|
||||
Some(v) if !v.data.data.is_empty() => v,
|
||||
_ => {
|
||||
Some(v) => {
|
||||
// The partition state machine will NOT return an empty batch.
|
||||
assert!(!v.record_batches().is_empty());
|
||||
v
|
||||
}
|
||||
None => {
|
||||
// But it MAY return no batch at all.
|
||||
warn!(
|
||||
%shard_id,
|
||||
%namespace_id,
|
||||
|
@ -322,17 +359,6 @@ impl Persister for IngesterData {
|
|||
}
|
||||
};
|
||||
|
||||
assert_eq!(batch.shard_id(), shard_id);
|
||||
assert_eq!(batch.table_id(), table_id);
|
||||
assert_eq!(batch.partition_id(), partition_id);
|
||||
|
||||
// Read the maximum SequenceNumber in the batch.
|
||||
let (_min, max_sequence_number) = batch.data.min_max_sequence_numbers();
|
||||
|
||||
// Read the future object store ID before passing the batch into
|
||||
// compaction, instead of retaining a copy of the data post-compaction.
|
||||
let object_store_id = batch.object_store_id();
|
||||
|
||||
// do the CPU intensive work of compaction, de-duplication and sorting
|
||||
let CompactedStream {
|
||||
stream: record_stream,
|
||||
|
@ -342,6 +368,10 @@ impl Persister for IngesterData {
|
|||
.await
|
||||
.expect("unable to compact persisting batch");
|
||||
|
||||
// Generate a UUID to uniquely identify this parquet file in object
|
||||
// storage.
|
||||
let object_store_id = Uuid::new_v4();
|
||||
|
||||
// Construct the metadata for this parquet file.
|
||||
let iox_metadata = IoxMetadata {
|
||||
object_store_id,
|
||||
|
@ -353,7 +383,7 @@ impl Persister for IngesterData {
|
|||
table_name: Arc::clone(&*table_name),
|
||||
partition_id,
|
||||
partition_key: partition_key.clone(),
|
||||
max_sequence_number,
|
||||
max_sequence_number: batch_sequence_number_range.inclusive_max().unwrap(),
|
||||
compaction_level: CompactionLevel::Initial,
|
||||
sort_key: Some(data_sort_key),
|
||||
};
|
||||
|
@ -503,15 +533,28 @@ impl Persister for IngesterData {
|
|||
.recorder(attributes)
|
||||
.record(file_size as u64);
|
||||
|
||||
// and remove the persisted data from memory
|
||||
namespace
|
||||
.mark_persisted(
|
||||
&table_name,
|
||||
&partition_key,
|
||||
iox_metadata.max_sequence_number,
|
||||
)
|
||||
.await;
|
||||
debug!(
|
||||
// Mark the partition as having completed persistence, causing it to
|
||||
// release the reference to the in-flight persistence data it is
|
||||
// holding.
|
||||
//
|
||||
// This SHOULD cause the data to be dropped, but there MAY be ongoing
|
||||
// queries that currently hold a reference to the data. In either case,
|
||||
// the persisted data will be dropped "shortly".
|
||||
table_data
|
||||
.write()
|
||||
.await
|
||||
.get_partition(partition_id)
|
||||
.unwrap()
|
||||
.mark_persisted(iox_metadata.max_sequence_number);
|
||||
|
||||
// BUG: ongoing queries retain references to the persisting data,
|
||||
// preventing it from being dropped, but memory is released back to
|
||||
// lifecycle memory tracker when this fn returns.
|
||||
//
|
||||
// https://github.com/influxdata/influxdb_iox/issues/5805
|
||||
//
|
||||
|
||||
info!(
|
||||
%object_store_id,
|
||||
%shard_id,
|
||||
%namespace_id,
|
||||
|
@ -521,7 +564,7 @@ impl Persister for IngesterData {
|
|||
%partition_id,
|
||||
%partition_key,
|
||||
max_sequence_number=%iox_metadata.max_sequence_number.get(),
|
||||
"marked partition as persisted"
|
||||
"persisted partition"
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -656,8 +699,21 @@ mod tests {
|
|||
.await
|
||||
.unwrap();
|
||||
assert_matches!(action, DmlApplyAction::Applied(false));
|
||||
|
||||
let w2 = DmlWrite::new(
|
||||
"foo",
|
||||
lines_to_batches("mem foo=1 10", 0).unwrap(),
|
||||
Some("1970-01-01".into()),
|
||||
DmlMeta::sequenced(
|
||||
Sequence::new(ShardIndex::new(1), SequenceNumber::new(2)),
|
||||
ignored_ts,
|
||||
None,
|
||||
50,
|
||||
),
|
||||
);
|
||||
|
||||
let action = data
|
||||
.buffer_operation(shard1.id, DmlOperation::Write(w1), &manager.handle())
|
||||
.buffer_operation(shard1.id, DmlOperation::Write(w2), &manager.handle())
|
||||
.await
|
||||
.unwrap();
|
||||
assert_matches!(action, DmlApplyAction::Applied(true));
|
||||
|
@ -1016,11 +1072,15 @@ mod tests {
|
|||
assert_eq!(buckets_with_counts, &[500 * 1024]);
|
||||
|
||||
let mem_table = n.table_data(&"mem".into()).unwrap();
|
||||
let mem_table = mem_table.read().await;
|
||||
|
||||
// verify that the parquet_max_sequence_number got updated
|
||||
assert_eq!(
|
||||
mem_table.parquet_max_sequence_number(),
|
||||
mem_table
|
||||
.write()
|
||||
.await
|
||||
.get_partition(partition_id)
|
||||
.unwrap()
|
||||
.max_persisted_sequence_number(),
|
||||
Some(SequenceNumber::new(2))
|
||||
);
|
||||
|
||||
|
@ -1310,13 +1370,17 @@ mod tests {
|
|||
.unwrap();
|
||||
{
|
||||
let table_data = data.table_data(&"mem".into()).unwrap();
|
||||
let table = table_data.read().await;
|
||||
let p = table.get_partition_by_key(&"1970-01-01".into()).unwrap();
|
||||
let mut table = table_data.write().await;
|
||||
assert!(table
|
||||
.partition_iter_mut()
|
||||
.all(|p| p.get_query_data().is_none()));
|
||||
assert_eq!(
|
||||
p.max_persisted_sequence_number(),
|
||||
table
|
||||
.get_partition_by_key(&"1970-01-01".into())
|
||||
.unwrap()
|
||||
.max_persisted_sequence_number(),
|
||||
Some(SequenceNumber::new(1))
|
||||
);
|
||||
assert!(p.data.buffer.is_none());
|
||||
}
|
||||
assert_matches!(action, DmlApplyAction::Skipped);
|
||||
|
||||
|
@ -1329,8 +1393,8 @@ mod tests {
|
|||
let table = table_data.read().await;
|
||||
let partition = table.get_partition_by_key(&"1970-01-01".into()).unwrap();
|
||||
assert_eq!(
|
||||
partition.data.buffer.as_ref().unwrap().min_sequence_number,
|
||||
SequenceNumber::new(2)
|
||||
partition.sequence_number_range().inclusive_min(),
|
||||
Some(SequenceNumber::new(2))
|
||||
);
|
||||
|
||||
assert_matches!(data.table_count().observe(), Observation::U64Counter(v) => {
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use data_types::{NamespaceId, PartitionKey, SequenceNumber, ShardId, TableId};
|
||||
use data_types::{NamespaceId, SequenceNumber, ShardId, TableId};
|
||||
use dml::DmlOperation;
|
||||
use iox_catalog::interface::Catalog;
|
||||
use metric::U64Counter;
|
||||
|
@ -253,52 +253,7 @@ impl NamespaceData {
|
|||
}
|
||||
}
|
||||
|
||||
/// Snapshots the mutable buffer for the partition, which clears it out and moves it over to
|
||||
/// snapshots. Then return a vec of the snapshots and the optional persisting batch.
|
||||
#[cfg(test)] // Only used in tests
|
||||
pub(crate) async fn snapshot(
|
||||
&self,
|
||||
table_name: &TableName,
|
||||
partition_key: &PartitionKey,
|
||||
) -> Option<(
|
||||
Vec<Arc<super::partition::SnapshotBatch>>,
|
||||
Option<Arc<super::partition::PersistingBatch>>,
|
||||
)> {
|
||||
if let Some(t) = self.table_data(table_name) {
|
||||
let mut t = t.write().await;
|
||||
|
||||
return t.get_partition_by_key_mut(partition_key).map(|p| {
|
||||
p.data
|
||||
.generate_snapshot()
|
||||
.expect("snapshot on mutable batch should never fail");
|
||||
(p.data.snapshots.to_vec(), p.data.persisting.clone())
|
||||
});
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
/// Snapshots the mutable buffer for the partition, which clears it out and then moves all
|
||||
/// snapshots over to a persisting batch, which is returned. If there is no data to snapshot
|
||||
/// or persist, None will be returned.
|
||||
#[cfg(test)] // Only used in tests
|
||||
pub(crate) async fn snapshot_to_persisting(
|
||||
&self,
|
||||
table_name: &TableName,
|
||||
partition_key: &PartitionKey,
|
||||
) -> Option<Arc<super::partition::PersistingBatch>> {
|
||||
if let Some(table_data) = self.table_data(table_name) {
|
||||
let mut table_data = table_data.write().await;
|
||||
|
||||
return table_data
|
||||
.get_partition_by_key_mut(partition_key)
|
||||
.and_then(|partition_data| partition_data.snapshot_to_persisting_batch());
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
/// Gets the buffered table data
|
||||
/// Return the specified [`TableData`] if it exists.
|
||||
pub(crate) fn table_data(
|
||||
&self,
|
||||
table_name: &TableName,
|
||||
|
@ -353,30 +308,11 @@ impl NamespaceData {
|
|||
})
|
||||
}
|
||||
|
||||
/// Walks down the table and partition and clears the persisting batch. The sequence number is
|
||||
/// the max_sequence_number for the persisted parquet file, which should be kept in the table
|
||||
/// data buffer.
|
||||
pub(super) async fn mark_persisted(
|
||||
&self,
|
||||
table_name: &TableName,
|
||||
partition_key: &PartitionKey,
|
||||
sequence_number: SequenceNumber,
|
||||
) {
|
||||
if let Some(t) = self.table_data(table_name) {
|
||||
let mut t = t.write().await;
|
||||
let partition = t.get_partition_by_key_mut(partition_key);
|
||||
|
||||
if let Some(p) = partition {
|
||||
p.mark_persisted(sequence_number);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Return progress from this Namespace
|
||||
pub(super) async fn progress(&self) -> ShardProgress {
|
||||
let tables: Vec<_> = self.tables.read().by_id.values().map(Arc::clone).collect();
|
||||
|
||||
// Consolidate progtress across partitions.
|
||||
// Consolidate progress across partitions.
|
||||
let mut progress = ShardProgress::new()
|
||||
// Properly account for any sequence number that is
|
||||
// actively buffering and thus not yet completely
|
||||
|
@ -442,7 +378,7 @@ impl<'a> Drop for ScopedSequenceNumber<'a> {
|
|||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use data_types::{PartitionId, ShardIndex};
|
||||
use data_types::{PartitionId, PartitionKey, ShardIndex};
|
||||
use metric::{Attributes, Metric};
|
||||
|
||||
use crate::{
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1,274 +1,112 @@
|
|||
//! Data for the lifecycle of the Ingester
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use data_types::{PartitionId, SequenceNumber, ShardId, TableId};
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use data_types::SequenceNumber;
|
||||
use mutable_batch::MutableBatch;
|
||||
use schema::selection::Selection;
|
||||
use snafu::ResultExt;
|
||||
use uuid::Uuid;
|
||||
use write_summary::ShardProgress;
|
||||
|
||||
use crate::data::table::TableName;
|
||||
use crate::data::SequenceNumberRange;
|
||||
|
||||
use super::{PersistingBatch, QueryableBatch, SnapshotBatch};
|
||||
mod always_some;
|
||||
mod mutable_buffer;
|
||||
mod state_machine;
|
||||
pub(crate) mod traits;
|
||||
|
||||
/// Data of an IOx partition split into batches
|
||||
/// ┌────────────────────────┐ ┌────────────────────────┐ ┌─────────────────────────┐
|
||||
/// │ Buffer │ │ Snapshots │ │ Persisting │
|
||||
/// │ ┌───────────────────┐ │ │ │ │ │
|
||||
/// │ │ ┌───────────────┐│ │ │ ┌───────────────────┐ │ │ ┌───────────────────┐ │
|
||||
/// │ │ ┌┴──────────────┐│├─┼────────┼─┼─▶┌───────────────┐│ │ │ │ ┌───────────────┐│ │
|
||||
/// │ │┌┴──────────────┐├┘│ │ │ │ ┌┴──────────────┐││ │ │ │ ┌┴──────────────┐││ │
|
||||
/// │ ││ BufferBatch ├┘ │ │ │ │┌┴──────────────┐├┘│──┼──────┼─▶│┌┴──────────────┐├┘│ │
|
||||
/// │ │└───────────────┘ │ │ ┌───┼─▶│ SnapshotBatch ├┘ │ │ │ ││ SnapshotBatch ├┘ │ │
|
||||
/// │ └───────────────────┘ │ │ │ │└───────────────┘ │ │ │ │└───────────────┘ │ │
|
||||
/// │ ... │ │ │ └───────────────────┘ │ │ └───────────────────┘ │
|
||||
/// │ ┌───────────────────┐ │ │ │ │ │ │
|
||||
/// │ │ ┌───────────────┐│ │ │ │ ... │ │ ... │
|
||||
/// │ │ ┌┴──────────────┐││ │ │ │ │ │ │
|
||||
/// │ │┌┴──────────────┐├┘│─┼────┘ │ ┌───────────────────┐ │ │ ┌───────────────────┐ │
|
||||
/// │ ││ BufferBatch ├┘ │ │ │ │ ┌───────────────┐│ │ │ │ ┌───────────────┐│ │
|
||||
/// │ │└───────────────┘ │ │ │ │ ┌┴──────────────┐││ │ │ │ ┌┴──────────────┐││ │
|
||||
/// │ └───────────────────┘ │ │ │┌┴──────────────┐├┘│──┼──────┼─▶│┌┴──────────────┐├┘│ │
|
||||
/// │ │ │ ││ SnapshotBatch ├┘ │ │ │ ││ SnapshotBatch ├┘ │ │
|
||||
/// │ ... │ │ │└───────────────┘ │ │ │ │└───────────────┘ │ │
|
||||
/// │ │ │ └───────────────────┘ │ │ └───────────────────┘ │
|
||||
/// └────────────────────────┘ └────────────────────────┘ └─────────────────────────┘
|
||||
#[derive(Debug, Default)]
|
||||
pub(crate) struct DataBuffer {
|
||||
/// Buffer of incoming writes
|
||||
pub(crate) buffer: Option<BufferBatch>,
|
||||
pub(crate) use state_machine::*;
|
||||
|
||||
/// Data in `buffer` will be moved to a `snapshot` when one of these happens:
|
||||
/// . A background persist is called
|
||||
/// . A read request from Querier
|
||||
/// The `buffer` will be empty when this happens.
|
||||
pub(crate) snapshots: Vec<Arc<SnapshotBatch>>,
|
||||
/// When a persist is called, data in `buffer` will be moved to a `snapshot`
|
||||
/// and then all `snapshots` will be moved to a `persisting`.
|
||||
/// Both `buffer` and 'snaphots` will be empty when this happens.
|
||||
pub(crate) persisting: Option<Arc<PersistingBatch>>,
|
||||
// Extra Notes:
|
||||
// . In MVP, we will only persist a set of snapshots at a time.
|
||||
// In later version, multiple persisting operations may be happening concurrently but
|
||||
// their persisted info must be added into the Catalog in their data
|
||||
// ingesting order.
|
||||
// . When a read request comes from a Querier, all data from `snapshots`
|
||||
// and `persisting` must be sent to the Querier.
|
||||
// . After the `persisting` data is persisted and successfully added
|
||||
// into the Catalog, it will be removed from this Data Buffer.
|
||||
// This data might be added into an extra cache to serve up to
|
||||
// Queriers that may not have loaded the parquet files from object
|
||||
// storage yet. But this will be decided after MVP.
|
||||
use self::{always_some::AlwaysSome, traits::Queryable};
|
||||
|
||||
/// The current state of the [`BufferState`] state machine.
|
||||
///
|
||||
/// NOTE that this does NOT contain the [`Persisting`] state, as this is a
|
||||
/// immutable, terminal state that does not accept further writes and is
|
||||
/// directly queryable.
|
||||
#[derive(Debug)]
|
||||
#[must_use = "FSM should not be dropped unused"]
|
||||
enum FsmState {
|
||||
/// The data buffer contains no data snapshots, and is accepting writes.
|
||||
Buffering(BufferState<Buffering>),
|
||||
}
|
||||
|
||||
impl Default for FsmState {
|
||||
fn default() -> Self {
|
||||
Self::Buffering(BufferState::new())
|
||||
}
|
||||
}
|
||||
|
||||
impl FsmState {
|
||||
/// Return the current range of writes in the [`BufferState`] state machine,
|
||||
/// if any.
|
||||
pub(crate) fn sequence_number_range(&self) -> &SequenceNumberRange {
|
||||
match self {
|
||||
Self::Buffering(v) => v.sequence_number_range(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A helper wrapper over the [`BufferState`] FSM to abstract the caller from
|
||||
/// state transitions during reads and writes from the underlying buffer.
|
||||
#[derive(Debug, Default)]
|
||||
#[must_use = "DataBuffer should not be dropped unused"]
|
||||
pub(crate) struct DataBuffer(AlwaysSome<FsmState>);
|
||||
|
||||
impl DataBuffer {
|
||||
/// If a [`BufferBatch`] exists, convert it to a [`SnapshotBatch`] and add
|
||||
/// it to the list of snapshots.
|
||||
/// Return the range of [`SequenceNumber`] currently queryable by calling
|
||||
/// [`Self::get_query_data()`].
|
||||
pub(crate) fn sequence_number_range(&self) -> &SequenceNumberRange {
|
||||
self.0.sequence_number_range()
|
||||
}
|
||||
|
||||
/// Buffer the given [`MutableBatch`] in memory, ordered by the specified
|
||||
/// [`SequenceNumber`].
|
||||
///
|
||||
/// Does nothing if there is no [`BufferBatch`].
|
||||
pub(crate) fn generate_snapshot(&mut self) -> Result<(), mutable_batch::Error> {
|
||||
let snapshot = self.copy_buffer_to_snapshot()?;
|
||||
if let Some(snapshot) = snapshot {
|
||||
self.snapshots.push(snapshot);
|
||||
self.buffer = None;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns snapshot of the buffer but keeps data in the buffer
|
||||
fn copy_buffer_to_snapshot(&self) -> Result<Option<Arc<SnapshotBatch>>, mutable_batch::Error> {
|
||||
if let Some(buf) = &self.buffer {
|
||||
return Ok(Some(Arc::new(SnapshotBatch {
|
||||
min_sequence_number: buf.min_sequence_number,
|
||||
max_sequence_number: buf.max_sequence_number,
|
||||
data: Arc::new(buf.data.to_arrow(Selection::All)?),
|
||||
})));
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Snapshots the buffer and make a QueryableBatch for all the snapshots
|
||||
/// Both buffer and snapshots will be empty after this
|
||||
pub(super) fn snapshot_to_queryable_batch(
|
||||
/// # Panics
|
||||
///
|
||||
/// This method panics if `sequence_number` is not strictly greater than
|
||||
/// previous calls.
|
||||
pub(crate) fn buffer_write(
|
||||
&mut self,
|
||||
table_name: &TableName,
|
||||
partition_id: PartitionId,
|
||||
) -> Option<QueryableBatch> {
|
||||
self.generate_snapshot()
|
||||
.expect("This mutable batch snapshot error should be impossible.");
|
||||
|
||||
let mut data = vec![];
|
||||
std::mem::swap(&mut data, &mut self.snapshots);
|
||||
|
||||
// only produce batch if there is any data
|
||||
if data.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(QueryableBatch::new(table_name.clone(), partition_id, data))
|
||||
}
|
||||
mb: MutableBatch,
|
||||
sequence_number: SequenceNumber,
|
||||
) -> Result<(), mutable_batch::Error> {
|
||||
// Take ownership of the FSM and apply the write.
|
||||
self.0.mutate(|fsm| match fsm {
|
||||
// Mutable stats simply have the write applied.
|
||||
FsmState::Buffering(mut b) => {
|
||||
let ret = b.write(mb, sequence_number);
|
||||
(FsmState::Buffering(b), ret)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns all existing snapshots plus data in the buffer
|
||||
/// This only read data. Data in the buffer will be kept in the buffer
|
||||
pub(super) fn buffer_and_snapshots(
|
||||
&self,
|
||||
) -> Result<Vec<Arc<SnapshotBatch>>, crate::data::Error> {
|
||||
// Existing snapshots
|
||||
let mut snapshots = self.snapshots.clone();
|
||||
|
||||
// copy the buffer to a snapshot
|
||||
let buffer_snapshot = self
|
||||
.copy_buffer_to_snapshot()
|
||||
.context(crate::data::BufferToSnapshotSnafu)?;
|
||||
snapshots.extend(buffer_snapshot);
|
||||
|
||||
Ok(snapshots)
|
||||
/// Return all data for this buffer, ordered by the [`SequenceNumber`] from
|
||||
/// which it was buffered with.
|
||||
pub(crate) fn get_query_data(&mut self) -> Vec<Arc<RecordBatch>> {
|
||||
// Take ownership of the FSM and return the data within it.
|
||||
self.0.mutate(|fsm| match fsm {
|
||||
// The buffering state can return data.
|
||||
FsmState::Buffering(b) => {
|
||||
let ret = b.get_query_data();
|
||||
(FsmState::Buffering(b), ret)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Snapshots the buffer and moves snapshots over to the `PersistingBatch`.
|
||||
///
|
||||
/// # Panic
|
||||
///
|
||||
/// Panics if there is already a persisting batch.
|
||||
pub(super) fn snapshot_to_persisting(
|
||||
&mut self,
|
||||
shard_id: ShardId,
|
||||
table_id: TableId,
|
||||
partition_id: PartitionId,
|
||||
table_name: &TableName,
|
||||
) -> Option<Arc<PersistingBatch>> {
|
||||
if self.persisting.is_some() {
|
||||
panic!("Unable to snapshot while persisting. This is an unexpected state.")
|
||||
}
|
||||
|
||||
if let Some(queryable_batch) = self.snapshot_to_queryable_batch(table_name, partition_id) {
|
||||
let persisting_batch = Arc::new(PersistingBatch {
|
||||
shard_id,
|
||||
table_id,
|
||||
partition_id,
|
||||
object_store_id: Uuid::new_v4(),
|
||||
data: Arc::new(queryable_batch),
|
||||
});
|
||||
|
||||
self.persisting = Some(Arc::clone(&persisting_batch));
|
||||
|
||||
Some(persisting_batch)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Return a QueryableBatch of the persisting batch after applying new tombstones
|
||||
pub(super) fn get_persisting_data(&self) -> Option<QueryableBatch> {
|
||||
let persisting = match &self.persisting {
|
||||
Some(p) => p,
|
||||
None => return None,
|
||||
// Deconstruct the [`DataBuffer`] into the underlying FSM in a
|
||||
// [`Persisting`] state, if the buffer contains any data.
|
||||
pub(crate) fn into_persisting(self) -> Option<BufferState<Persisting>> {
|
||||
let p = match self.0.into_inner() {
|
||||
FsmState::Buffering(b) => {
|
||||
// Attempt to snapshot the buffer to an immutable state.
|
||||
match b.snapshot() {
|
||||
Transition::Ok(b) => b.into_persisting(),
|
||||
Transition::Unchanged(_) => {
|
||||
// The buffer contains no data.
|
||||
return None;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// persisting data
|
||||
Some((*persisting.data).clone())
|
||||
}
|
||||
|
||||
/// Return the progress in this DataBuffer
|
||||
pub(super) fn progress(&self) -> ShardProgress {
|
||||
let progress = ShardProgress::new();
|
||||
|
||||
let progress = if let Some(buffer) = &self.buffer {
|
||||
progress.combine(buffer.progress())
|
||||
} else {
|
||||
progress
|
||||
};
|
||||
|
||||
let progress = self.snapshots.iter().fold(progress, |progress, snapshot| {
|
||||
progress.combine(snapshot.progress())
|
||||
});
|
||||
|
||||
if let Some(persisting) = &self.persisting {
|
||||
persisting
|
||||
.data
|
||||
.data
|
||||
.iter()
|
||||
.fold(progress, |progress, snapshot| {
|
||||
progress.combine(snapshot.progress())
|
||||
})
|
||||
} else {
|
||||
progress
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(super) fn get_snapshots(&self) -> &[Arc<SnapshotBatch>] {
|
||||
self.snapshots.as_ref()
|
||||
}
|
||||
|
||||
pub(crate) fn mark_persisted(&mut self) {
|
||||
self.persisting = None;
|
||||
}
|
||||
}
|
||||
|
||||
/// BufferBatch is a MutableBatch with its ingesting order, sequence_number, that helps the
|
||||
/// ingester keep the batches of data in their ingesting order
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct BufferBatch {
|
||||
/// Sequence number of the first write in this batch
|
||||
pub(crate) min_sequence_number: SequenceNumber,
|
||||
/// Sequence number of the last write in this batch
|
||||
pub(super) max_sequence_number: SequenceNumber,
|
||||
/// Ingesting data
|
||||
pub(super) data: MutableBatch,
|
||||
}
|
||||
|
||||
impl BufferBatch {
|
||||
/// Return the progress in this DataBuffer
|
||||
fn progress(&self) -> ShardProgress {
|
||||
ShardProgress::new()
|
||||
.with_buffered(self.min_sequence_number)
|
||||
.with_buffered(self.max_sequence_number)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn snapshot_empty_buffer_adds_no_snapshots() {
|
||||
let mut data_buffer = DataBuffer::default();
|
||||
|
||||
data_buffer.generate_snapshot().unwrap();
|
||||
|
||||
assert!(data_buffer.snapshots.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn snapshot_buffer_batch_moves_to_snapshots() {
|
||||
let mut data_buffer = DataBuffer::default();
|
||||
|
||||
let seq_num1 = SequenceNumber::new(1);
|
||||
let (_, mutable_batch1) =
|
||||
lp_to_mutable_batch(r#"foo,t1=asdf iv=1i,uv=774u,fv=1.0,bv=true,sv="hi" 1"#);
|
||||
let buffer_batch1 = BufferBatch {
|
||||
min_sequence_number: seq_num1,
|
||||
max_sequence_number: seq_num1,
|
||||
data: mutable_batch1,
|
||||
};
|
||||
let record_batch1 = buffer_batch1.data.to_arrow(Selection::All).unwrap();
|
||||
data_buffer.buffer = Some(buffer_batch1);
|
||||
|
||||
data_buffer.generate_snapshot().unwrap();
|
||||
|
||||
assert!(data_buffer.buffer.is_none());
|
||||
assert_eq!(data_buffer.snapshots.len(), 1);
|
||||
|
||||
let snapshot = &data_buffer.snapshots[0];
|
||||
assert_eq!(snapshot.min_sequence_number, seq_num1);
|
||||
assert_eq!(snapshot.max_sequence_number, seq_num1);
|
||||
assert_eq!(&*snapshot.data, &record_batch1);
|
||||
Some(p)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,31 +1,8 @@
|
|||
//! A helper type that ensures an `Option` is always `Some` once the guard is
|
||||
//! dropped.
|
||||
|
||||
/// A guard through which a value can be placed back into the [`AlwaysSome`].
|
||||
#[derive(Debug)]
|
||||
#[must_use = "Guard must be used to restore the value"]
|
||||
pub(super) struct Guard<'a, T>(&'a mut Option<T>);
|
||||
|
||||
impl<'a, T> Guard<'a, T> {
|
||||
/// Store `value` in the [`AlwaysSome`] for subsequent
|
||||
/// [`AlwaysSome::take()`] calls.
|
||||
pub(super) fn store(self, value: T) {
|
||||
assert!(self.0.is_none());
|
||||
*self.0 = Some(value);
|
||||
}
|
||||
}
|
||||
|
||||
/// A helper type that aims to ease working with an [`Option`] that must always
|
||||
/// be restored in a given scope.
|
||||
///
|
||||
/// Accessing the value within an [`AlwaysSome`] returns a [`Guard`], which MUST
|
||||
/// be used to store the value before going out of scope. Failure to store a
|
||||
/// value cause a subsequent [`Self::take()`] call to panic.
|
||||
///
|
||||
/// Failing to store a value in the [`Guard`] causes a compiler warning, however
|
||||
/// this does not prevent failing to return a value to the [`AlwaysSome`] as the
|
||||
/// warning can be falsely silenced by using it within one conditional code path
|
||||
/// and not the other.
|
||||
/// A helper type that aims to ease calling methods on a type that takes `self`,
|
||||
/// that must always be restored at the end of the method call.
|
||||
#[derive(Debug)]
|
||||
pub(super) struct AlwaysSome<T>(Option<T>);
|
||||
|
||||
|
@ -52,14 +29,14 @@ impl<T> AlwaysSome<T> {
|
|||
Self(Some(value))
|
||||
}
|
||||
|
||||
/// Read the value.
|
||||
pub(super) fn take(&mut self) -> (Guard<'_, T>, T) {
|
||||
pub(super) fn mutate<F, R>(&mut self, f: F) -> R
|
||||
where
|
||||
F: FnOnce(T) -> (T, R),
|
||||
{
|
||||
let value = std::mem::take(&mut self.0);
|
||||
|
||||
(
|
||||
Guard(&mut self.0),
|
||||
value.expect("AlwaysSome value is None!"),
|
||||
)
|
||||
let (value, ret) = f(value.expect("AlwaysSome value is None!"));
|
||||
self.0 = Some(value);
|
||||
ret
|
||||
}
|
||||
|
||||
/// Deconstruct `self`, returning the inner value.
|
||||
|
@ -76,24 +53,18 @@ mod tests {
|
|||
fn test_always_some() {
|
||||
let mut a = AlwaysSome::<usize>::default();
|
||||
|
||||
let (guard, value) = a.take();
|
||||
assert_eq!(value, 0);
|
||||
guard.store(42);
|
||||
let ret = a.mutate(|value| {
|
||||
assert_eq!(value, 0);
|
||||
(42, true)
|
||||
});
|
||||
assert!(ret);
|
||||
|
||||
let (guard, value) = a.take();
|
||||
assert_eq!(value, 42);
|
||||
guard.store(24);
|
||||
let ret = a.mutate(|value| {
|
||||
assert_eq!(value, 42);
|
||||
(13, "bananas")
|
||||
});
|
||||
assert_eq!(ret, "bananas");
|
||||
|
||||
assert_eq!(a.into_inner(), 24);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic = "AlwaysSome value is None!"]
|
||||
fn test_drops_guard() {
|
||||
let mut a = AlwaysSome::<usize>::default();
|
||||
{
|
||||
let _ = a.take();
|
||||
}
|
||||
let _ = a.take();
|
||||
assert_eq!(a.into_inner(), 13);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,7 +31,7 @@ pub(crate) enum Transition<A, B> {
|
|||
|
||||
impl<A, B> Transition<A, B> {
|
||||
/// A helper function to construct [`Self::Ok`] variants.
|
||||
pub(super) fn ok(v: A, sequence_range: SequenceNumberRange) -> Transition<A, B> {
|
||||
pub(super) fn ok(v: A, sequence_range: SequenceNumberRange) -> Self {
|
||||
Self::Ok(BufferState {
|
||||
state: v,
|
||||
sequence_range,
|
||||
|
@ -39,7 +39,7 @@ impl<A, B> Transition<A, B> {
|
|||
}
|
||||
|
||||
/// A helper function to construct [`Self::Unchanged`] variants.
|
||||
pub(super) fn unchanged(v: BufferState<B>) -> Transition<A, B> {
|
||||
pub(super) fn unchanged(v: BufferState<B>) -> Self {
|
||||
Self::Unchanged(v)
|
||||
}
|
||||
}
|
||||
|
@ -164,7 +164,7 @@ mod tests {
|
|||
// Keep the data to validate they are ref-counted copies after further
|
||||
// writes below. Note this construct allows the caller to decide when/if
|
||||
// to allocate.
|
||||
let w1_data = buffer.get_query_data().to_owned();
|
||||
let w1_data = buffer.get_query_data();
|
||||
|
||||
let expected = vec![
|
||||
"+-------+----------+----------+--------------------------------+",
|
||||
|
@ -193,7 +193,7 @@ mod tests {
|
|||
};
|
||||
|
||||
// Verify the writes are still queryable.
|
||||
let w2_data = buffer.get_query_data().to_owned();
|
||||
let w2_data = buffer.get_query_data();
|
||||
let expected = vec![
|
||||
"+-------+----------+----------+--------------------------------+",
|
||||
"| great | how_much | tag | time |",
|
||||
|
@ -214,7 +214,7 @@ mod tests {
|
|||
let same_arcs = w2_data
|
||||
.iter()
|
||||
.zip(second_read.iter())
|
||||
.all(|(a, b)| Arc::ptr_eq(a, &b));
|
||||
.all(|(a, b)| Arc::ptr_eq(a, b));
|
||||
assert!(same_arcs);
|
||||
}
|
||||
|
||||
|
|
|
@ -157,7 +157,7 @@ mod tests {
|
|||
.repositories()
|
||||
.await
|
||||
.partitions()
|
||||
.get_by_id(got.id)
|
||||
.get_by_id(got.partition_id)
|
||||
.await
|
||||
.unwrap()
|
||||
.expect("partition not created");
|
||||
|
|
|
@ -39,7 +39,7 @@ impl SequenceNumberRange {
|
|||
let merged_range = self
|
||||
.range
|
||||
.into_iter()
|
||||
.chain(other.range.clone())
|
||||
.chain(other.range)
|
||||
.reduce(|a, b| (a.0.min(b.0), a.1.max(b.1)));
|
||||
|
||||
Self {
|
||||
|
|
|
@ -7,8 +7,11 @@ use mutable_batch::MutableBatch;
|
|||
use observability_deps::tracing::*;
|
||||
use write_summary::ShardProgress;
|
||||
|
||||
use super::partition::{resolver::PartitionProvider, PartitionData, UnpersistedPartitionData};
|
||||
use crate::{data::DmlApplyAction, lifecycle::LifecycleHandle, querier_handler::PartitionStatus};
|
||||
use super::{
|
||||
partition::{resolver::PartitionProvider, PartitionData},
|
||||
DmlApplyAction,
|
||||
};
|
||||
use crate::lifecycle::LifecycleHandle;
|
||||
|
||||
/// A double-referenced map where [`PartitionData`] can be looked up by
|
||||
/// [`PartitionKey`], or ID.
|
||||
|
@ -72,6 +75,12 @@ impl std::ops::Deref for TableName {
|
|||
}
|
||||
}
|
||||
|
||||
impl PartialEq<str> for TableName {
|
||||
fn eq(&self, other: &str) -> bool {
|
||||
&*self.0 == other
|
||||
}
|
||||
}
|
||||
|
||||
/// Data of a Table in a given Namesapce that belongs to a given Shard
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct TableData {
|
||||
|
@ -119,16 +128,6 @@ impl TableData {
|
|||
}
|
||||
}
|
||||
|
||||
/// Return parquet_max_sequence_number
|
||||
pub(super) fn parquet_max_sequence_number(&self) -> Option<SequenceNumber> {
|
||||
self.partition_data
|
||||
.by_key
|
||||
.values()
|
||||
.map(|p| p.max_persisted_sequence_number())
|
||||
.max()
|
||||
.flatten()
|
||||
}
|
||||
|
||||
// buffers the table write and returns true if the lifecycle manager indicates that
|
||||
// ingest should be paused.
|
||||
pub(super) async fn buffer_table_write(
|
||||
|
@ -171,7 +170,7 @@ impl TableData {
|
|||
|
||||
let size = batch.size();
|
||||
let rows = batch.rows();
|
||||
partition_data.buffer_write(sequence_number, batch)?;
|
||||
partition_data.buffer_write(batch, sequence_number)?;
|
||||
|
||||
// Record the write as having been buffered.
|
||||
//
|
||||
|
@ -191,6 +190,18 @@ impl TableData {
|
|||
Ok(DmlApplyAction::Applied(should_pause))
|
||||
}
|
||||
|
||||
/// Return a mutable reference to all partitions buffered for this table.
|
||||
///
|
||||
/// # Ordering
|
||||
///
|
||||
/// The order of [`PartitionData`] in the iterator is arbitrary and should
|
||||
/// not be relied upon.
|
||||
pub(crate) fn partition_iter_mut(
|
||||
&mut self,
|
||||
) -> impl Iterator<Item = &mut PartitionData> + ExactSizeIterator {
|
||||
self.partition_data.by_key.values_mut()
|
||||
}
|
||||
|
||||
/// Return the [`PartitionData`] for the specified ID.
|
||||
#[allow(unused)]
|
||||
pub(crate) fn get_partition(
|
||||
|
@ -209,43 +220,12 @@ impl TableData {
|
|||
self.partition_data.by_key(partition_key)
|
||||
}
|
||||
|
||||
/// Return the [`PartitionData`] for the specified partition key.
|
||||
pub(crate) fn get_partition_by_key_mut(
|
||||
&mut self,
|
||||
partition_key: &PartitionKey,
|
||||
) -> Option<&mut PartitionData> {
|
||||
self.partition_data.by_key_mut(partition_key)
|
||||
}
|
||||
|
||||
pub(crate) fn unpersisted_partition_data(&self) -> Vec<UnpersistedPartitionData> {
|
||||
self.partition_data
|
||||
.by_key
|
||||
.values()
|
||||
.map(|p| UnpersistedPartitionData {
|
||||
partition_id: p.partition_id(),
|
||||
non_persisted: p
|
||||
.get_non_persisting_data()
|
||||
.expect("get_non_persisting should always work"),
|
||||
persisting: p.get_persisting_data(),
|
||||
partition_status: PartitionStatus {
|
||||
parquet_max_sequence_number: p.max_persisted_sequence_number(),
|
||||
},
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Return progress from this Table
|
||||
pub(super) fn progress(&self) -> ShardProgress {
|
||||
let progress = ShardProgress::new();
|
||||
let progress = match self.parquet_max_sequence_number() {
|
||||
Some(n) => progress.with_persisted(n),
|
||||
None => progress,
|
||||
};
|
||||
|
||||
self.partition_data
|
||||
.by_key
|
||||
.values()
|
||||
.fold(progress, |progress, partition_data| {
|
||||
.fold(Default::default(), |progress, partition_data| {
|
||||
progress.combine(partition_data.progress())
|
||||
})
|
||||
}
|
||||
|
@ -259,6 +239,16 @@ impl TableData {
|
|||
pub(crate) fn table_name(&self) -> &TableName {
|
||||
&self.table_name
|
||||
}
|
||||
|
||||
/// Return the shard ID for this table.
|
||||
pub(crate) fn shard_id(&self) -> ShardId {
|
||||
self.shard_id
|
||||
}
|
||||
|
||||
/// Return the [`NamespaceId`] this table is a part of.
|
||||
pub fn namespace_id(&self) -> NamespaceId {
|
||||
self.namespace_id
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
@ -24,7 +24,7 @@ mod job;
|
|||
pub mod lifecycle;
|
||||
mod poison;
|
||||
pub mod querier_handler;
|
||||
pub(crate) mod query;
|
||||
pub(crate) mod query_adaptor;
|
||||
pub mod server;
|
||||
pub(crate) mod stream_handler;
|
||||
|
||||
|
|
|
@ -1,12 +1,7 @@
|
|||
//! Handle all requests from Querier
|
||||
|
||||
use crate::{
|
||||
data::{
|
||||
namespace::NamespaceName, partition::UnpersistedPartitionData, table::TableName,
|
||||
IngesterData,
|
||||
},
|
||||
query::QueryableBatch,
|
||||
};
|
||||
use std::{pin::Pin, sync::Arc};
|
||||
|
||||
use arrow::{array::new_null_array, error::ArrowError, record_batch::RecordBatch};
|
||||
use arrow_util::optimize::{optimize_record_batch, optimize_schema};
|
||||
use data_types::{PartitionId, SequenceNumber};
|
||||
|
@ -17,9 +12,10 @@ use generated_types::ingester::IngesterQueryRequest;
|
|||
use observability_deps::tracing::debug;
|
||||
use schema::{merge::SchemaMerger, selection::Selection};
|
||||
use snafu::{ensure, Snafu};
|
||||
use std::{pin::Pin, sync::Arc};
|
||||
use trace::span::{Span, SpanRecorder};
|
||||
|
||||
use crate::data::{namespace::NamespaceName, table::TableName, IngesterData};
|
||||
|
||||
/// Number of table data read locks that shall be acquired in parallel
|
||||
const CONCURRENT_TABLE_DATA_LOCKS: usize = 10;
|
||||
|
||||
|
@ -264,10 +260,11 @@ pub async fn prepare_data_to_querier(
|
|||
) -> Result<IngesterQueryResponse> {
|
||||
debug!(?request, "prepare_data_to_querier");
|
||||
|
||||
let span_recorder = SpanRecorder::new(span);
|
||||
let mut span_recorder = SpanRecorder::new(span);
|
||||
|
||||
let mut tables_data = vec![];
|
||||
let mut table_refs = vec![];
|
||||
let mut found_namespace = false;
|
||||
|
||||
for (shard_id, shard_data) in ingest_data.shards() {
|
||||
debug!(shard_id=%shard_id.get());
|
||||
let namespace_name = NamespaceName::from(&request.namespace);
|
||||
|
@ -293,7 +290,7 @@ pub async fn prepare_data_to_querier(
|
|||
}
|
||||
};
|
||||
|
||||
tables_data.push(table_data);
|
||||
table_refs.push(table_data);
|
||||
}
|
||||
|
||||
ensure!(
|
||||
|
@ -303,113 +300,83 @@ pub async fn prepare_data_to_querier(
|
|||
},
|
||||
);
|
||||
|
||||
// acquire locks in parallel
|
||||
let unpersisted_partitions: Vec<_> = futures::stream::iter(tables_data)
|
||||
.map(|table_data| async move {
|
||||
let table_data = table_data.read().await;
|
||||
table_data.unpersisted_partition_data()
|
||||
})
|
||||
// Note: the order doesn't matter
|
||||
.buffer_unordered(CONCURRENT_TABLE_DATA_LOCKS)
|
||||
.concat()
|
||||
.await;
|
||||
|
||||
ensure!(
|
||||
!unpersisted_partitions.is_empty(),
|
||||
!table_refs.is_empty(),
|
||||
TableNotFoundSnafu {
|
||||
namespace_name: &request.namespace,
|
||||
table_name: &request.table
|
||||
},
|
||||
);
|
||||
|
||||
let request = Arc::clone(request);
|
||||
let partitions =
|
||||
futures::stream::iter(unpersisted_partitions.into_iter().map(move |partition| {
|
||||
// extract payload
|
||||
let partition_id = partition.partition_id;
|
||||
let status = partition.partition_status.clone();
|
||||
let snapshots: Vec<_> = prepare_data_to_querier_for_partition(
|
||||
partition,
|
||||
&request,
|
||||
span_recorder.child_span("ingester prepare data to querier for partition"),
|
||||
)
|
||||
.into_iter()
|
||||
.map(Ok)
|
||||
.collect();
|
||||
|
||||
// Note: include partition in `unpersisted_partitions` even when there we might filter
|
||||
// out all the data, because the metadata (e.g. max persisted parquet file) is
|
||||
// important for the querier.
|
||||
Ok(IngesterQueryPartition::new(
|
||||
Box::pin(futures::stream::iter(snapshots)),
|
||||
partition_id,
|
||||
status,
|
||||
))
|
||||
}));
|
||||
|
||||
Ok(IngesterQueryResponse::new(Box::pin(partitions)))
|
||||
}
|
||||
|
||||
fn prepare_data_to_querier_for_partition(
|
||||
unpersisted_partition_data: UnpersistedPartitionData,
|
||||
request: &IngesterQueryRequest,
|
||||
span: Option<Span>,
|
||||
) -> Vec<SendableRecordBatchStream> {
|
||||
let mut span_recorder = SpanRecorder::new(span);
|
||||
|
||||
// ------------------------------------------------
|
||||
// Accumulate data
|
||||
|
||||
// Make Filters
|
||||
let selection_columns: Vec<_> = request.columns.iter().map(String::as_str).collect();
|
||||
let selection = if selection_columns.is_empty() {
|
||||
Selection::All
|
||||
} else {
|
||||
Selection::Some(&selection_columns)
|
||||
};
|
||||
|
||||
// figure out what batches
|
||||
let queryable_batch = unpersisted_partition_data
|
||||
.persisting
|
||||
.unwrap_or_else(|| {
|
||||
QueryableBatch::new(
|
||||
request.table.clone().into(),
|
||||
unpersisted_partition_data.partition_id,
|
||||
vec![],
|
||||
)
|
||||
// acquire locks and read table data in parallel
|
||||
let unpersisted_partitions: Vec<_> = futures::stream::iter(table_refs)
|
||||
.map(|table_data| async move {
|
||||
let mut table_data = table_data.write().await;
|
||||
table_data
|
||||
.partition_iter_mut()
|
||||
.map(|p| {
|
||||
(
|
||||
p.partition_id(),
|
||||
p.get_query_data(),
|
||||
p.max_persisted_sequence_number(),
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
.with_data(unpersisted_partition_data.non_persisted);
|
||||
// Note: the order doesn't matter
|
||||
.buffer_unordered(CONCURRENT_TABLE_DATA_LOCKS)
|
||||
.concat()
|
||||
.await;
|
||||
|
||||
let streams = queryable_batch
|
||||
.data
|
||||
.iter()
|
||||
.map(|snapshot_batch| {
|
||||
let batch = snapshot_batch.data.as_ref();
|
||||
let schema = batch.schema();
|
||||
let request = Arc::clone(request);
|
||||
let partitions = futures::stream::iter(unpersisted_partitions.into_iter().map(
|
||||
move |(partition_id, data, max_persisted_sequence_number)| {
|
||||
let snapshots = match data {
|
||||
None => Box::pin(futures::stream::empty()) as SnapshotStream,
|
||||
|
||||
// Apply selection to in-memory batch
|
||||
let batch = match selection {
|
||||
Selection::All => batch.clone(),
|
||||
Selection::Some(columns) => {
|
||||
let projection = columns
|
||||
Some(batch) => {
|
||||
assert_eq!(partition_id, batch.partition_id());
|
||||
|
||||
// Project the data if necessary
|
||||
let columns = request
|
||||
.columns
|
||||
.iter()
|
||||
.flat_map(|&column_name| {
|
||||
// ignore non-existing columns
|
||||
schema.index_of(column_name).ok()
|
||||
})
|
||||
.map(String::as_str)
|
||||
.collect::<Vec<_>>();
|
||||
batch.project(&projection).expect("bug in projection")
|
||||
let selection = if columns.is_empty() {
|
||||
Selection::All
|
||||
} else {
|
||||
Selection::Some(columns.as_ref())
|
||||
};
|
||||
|
||||
let snapshots = batch.project_selection(selection).into_iter().map(|batch| {
|
||||
// Create a stream from the batch.
|
||||
Ok(Box::pin(MemoryStream::new(vec![batch])) as SendableRecordBatchStream)
|
||||
});
|
||||
|
||||
Box::pin(futures::stream::iter(snapshots)) as SnapshotStream
|
||||
}
|
||||
};
|
||||
|
||||
// create stream
|
||||
Box::pin(MemoryStream::new(vec![batch])) as SendableRecordBatchStream
|
||||
})
|
||||
.collect();
|
||||
// NOTE: the partition persist watermark MUST always be provided to
|
||||
// the querier for any partition that has performed (or is aware of)
|
||||
// a persist operation.
|
||||
//
|
||||
// This allows the querier to use the per-partition persist marker
|
||||
// when planning queries.
|
||||
Ok(IngesterQueryPartition::new(
|
||||
snapshots,
|
||||
partition_id,
|
||||
PartitionStatus {
|
||||
parquet_max_sequence_number: max_persisted_sequence_number,
|
||||
},
|
||||
))
|
||||
},
|
||||
));
|
||||
|
||||
span_recorder.ok("done");
|
||||
|
||||
streams
|
||||
Ok(IngesterQueryResponse::new(Box::pin(partitions)))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
@ -427,7 +394,7 @@ mod tests {
|
|||
use predicate::Predicate;
|
||||
|
||||
use super::*;
|
||||
use crate::test_util::{make_ingester_data, DataLocation, TEST_NAMESPACE, TEST_TABLE};
|
||||
use crate::test_util::{make_ingester_data, TEST_NAMESPACE, TEST_TABLE};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_ingester_query_response_flatten() {
|
||||
|
@ -517,23 +484,11 @@ mod tests {
|
|||
async fn test_prepare_data_to_querier() {
|
||||
test_helpers::maybe_start_logging();
|
||||
|
||||
let span = None;
|
||||
|
||||
// make 14 scenarios for ingester data
|
||||
let mut scenarios = vec![];
|
||||
for two_partitions in [false, true] {
|
||||
for loc in [
|
||||
DataLocation::BUFFER,
|
||||
DataLocation::BUFFER_SNAPSHOT,
|
||||
DataLocation::BUFFER_PERSISTING,
|
||||
DataLocation::BUFFER_SNAPSHOT_PERSISTING,
|
||||
DataLocation::SNAPSHOT,
|
||||
DataLocation::SNAPSHOT_PERSISTING,
|
||||
DataLocation::PERSISTING,
|
||||
] {
|
||||
let scenario = Arc::new(make_ingester_data(two_partitions, loc).await);
|
||||
scenarios.push((loc, scenario));
|
||||
}
|
||||
let scenario = Arc::new(make_ingester_data(two_partitions).await);
|
||||
scenarios.push(scenario);
|
||||
}
|
||||
|
||||
// read data from all scenarios without any filters
|
||||
|
@ -557,9 +512,8 @@ mod tests {
|
|||
"| Wilmington | mon | | 1970-01-01T00:00:00.000000035Z |", // in group 3 - seq_num: 6
|
||||
"+------------+-----+------+--------------------------------+",
|
||||
];
|
||||
for (loc, scenario) in &scenarios {
|
||||
println!("Location: {loc:?}");
|
||||
let result = prepare_data_to_querier(scenario, &request, span.clone())
|
||||
for scenario in &scenarios {
|
||||
let result = prepare_data_to_querier(scenario, &request, None)
|
||||
.await
|
||||
.unwrap()
|
||||
.into_record_batches()
|
||||
|
@ -593,9 +547,8 @@ mod tests {
|
|||
"| Wilmington | | 1970-01-01T00:00:00.000000035Z |",
|
||||
"+------------+------+--------------------------------+",
|
||||
];
|
||||
for (loc, scenario) in &scenarios {
|
||||
println!("Location: {loc:?}");
|
||||
let result = prepare_data_to_querier(scenario, &request, span.clone())
|
||||
for scenario in &scenarios {
|
||||
let result = prepare_data_to_querier(scenario, &request, None)
|
||||
.await
|
||||
.unwrap()
|
||||
.into_record_batches()
|
||||
|
@ -638,9 +591,8 @@ mod tests {
|
|||
"| Wilmington | | 1970-01-01T00:00:00.000000035Z |",
|
||||
"+------------+------+--------------------------------+",
|
||||
];
|
||||
for (loc, scenario) in &scenarios {
|
||||
println!("Location: {loc:?}");
|
||||
let result = prepare_data_to_querier(scenario, &request, span.clone())
|
||||
for scenario in &scenarios {
|
||||
let result = prepare_data_to_querier(scenario, &request, None)
|
||||
.await
|
||||
.unwrap()
|
||||
.into_record_batches()
|
||||
|
@ -655,9 +607,8 @@ mod tests {
|
|||
vec![],
|
||||
None,
|
||||
));
|
||||
for (loc, scenario) in &scenarios {
|
||||
println!("Location: {loc:?}");
|
||||
let err = prepare_data_to_querier(scenario, &request, span.clone())
|
||||
for scenario in &scenarios {
|
||||
let err = prepare_data_to_querier(scenario, &request, None)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_matches!(err, Error::TableNotFound { .. });
|
||||
|
@ -670,9 +621,8 @@ mod tests {
|
|||
vec![],
|
||||
None,
|
||||
));
|
||||
for (loc, scenario) in &scenarios {
|
||||
println!("Location: {loc:?}");
|
||||
let err = prepare_data_to_querier(scenario, &request, span.clone())
|
||||
for scenario in &scenarios {
|
||||
let err = prepare_data_to_querier(scenario, &request, None)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_matches!(err, Error::NamespaceNotFound { .. });
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
//! Module to handle query on Ingester's data
|
||||
//! An adaptor over a set of [`RecordBatch`] allowing them to be used as an IOx
|
||||
//! [`QueryChunk`].
|
||||
|
||||
use std::{any::Any, sync::Arc};
|
||||
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use arrow_util::util::ensure_schema;
|
||||
use data_types::{
|
||||
ChunkId, ChunkOrder, DeletePredicate, PartitionId, SequenceNumber, TableSummary,
|
||||
TimestampMinMax,
|
||||
ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary, TimestampMinMax,
|
||||
};
|
||||
use datafusion::{
|
||||
error::DataFusionError,
|
||||
|
@ -21,11 +21,12 @@ use iox_query::{
|
|||
QueryChunk, QueryChunkMeta,
|
||||
};
|
||||
use observability_deps::tracing::trace;
|
||||
use once_cell::sync::OnceCell;
|
||||
use predicate::Predicate;
|
||||
use schema::{merge::merge_record_batch_schemas, selection::Selection, sort::SortKey, Schema};
|
||||
use snafu::{ResultExt, Snafu};
|
||||
|
||||
use crate::data::{partition::SnapshotBatch, table::TableName};
|
||||
use crate::data::table::TableName;
|
||||
|
||||
#[allow(clippy::enum_variant_names)]
|
||||
#[derive(Debug, Snafu)]
|
||||
|
@ -47,72 +48,106 @@ pub enum Error {
|
|||
/// A specialized `Error` for Ingester's Query errors
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
/// Queryable data used for both query and persistence
|
||||
/// A queryable wrapper over a set of ordered [`RecordBatch`] snapshot from a
|
||||
/// single [`PartitionData`].
|
||||
///
|
||||
/// It is an invariant that a [`QueryAdaptor`] MUST always contain at least one
|
||||
/// row. This frees the caller of having to reason about empty [`QueryAdaptor`]
|
||||
/// instances yielding empty [`RecordBatch`].
|
||||
///
|
||||
/// [`PartitionData`]: crate::data::partition::PartitionData
|
||||
#[derive(Debug, PartialEq, Clone)]
|
||||
pub(crate) struct QueryableBatch {
|
||||
/// data
|
||||
pub(crate) data: Vec<Arc<SnapshotBatch>>,
|
||||
pub(crate) struct QueryAdaptor {
|
||||
/// The snapshot data from a partition.
|
||||
///
|
||||
/// This MUST be non-pub / closed for modification / immutable to support
|
||||
/// interning the merged schema in [`Self::schema()`].
|
||||
data: Vec<Arc<RecordBatch>>,
|
||||
|
||||
/// This is needed to return a reference for a trait function
|
||||
pub(crate) table_name: TableName,
|
||||
/// The name of the table this data is part of.
|
||||
table_name: TableName,
|
||||
|
||||
/// Partition ID
|
||||
pub(crate) partition_id: PartitionId,
|
||||
/// The catalog ID of the partition the this data is part of.
|
||||
partition_id: PartitionId,
|
||||
|
||||
/// An interned schema for all [`RecordBatch`] in data.
|
||||
schema: OnceCell<Arc<Schema>>,
|
||||
}
|
||||
|
||||
impl QueryableBatch {
|
||||
/// Initilaize a QueryableBatch
|
||||
impl QueryAdaptor {
|
||||
/// Construct a [`QueryAdaptor`].
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This constructor panics if `data` contains no [`RecordBatch`], or all
|
||||
/// [`RecordBatch`] are empty.
|
||||
pub(crate) fn new(
|
||||
table_name: TableName,
|
||||
partition_id: PartitionId,
|
||||
data: Vec<Arc<SnapshotBatch>>,
|
||||
data: Vec<Arc<RecordBatch>>,
|
||||
) -> Self {
|
||||
// There must always be at least one record batch and one row.
|
||||
//
|
||||
// This upholds an invariant that simplifies dealing with empty
|
||||
// partitions - if there is a QueryAdaptor, it contains data.
|
||||
assert!(data.iter().map(|b| b.num_rows()).sum::<usize>() > 0);
|
||||
|
||||
Self {
|
||||
data,
|
||||
table_name,
|
||||
partition_id,
|
||||
schema: OnceCell::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Add snapshots to this batch
|
||||
pub(crate) fn with_data(mut self, mut data: Vec<Arc<SnapshotBatch>>) -> Self {
|
||||
self.data.append(&mut data);
|
||||
self
|
||||
pub(crate) fn project_selection(&self, selection: Selection<'_>) -> Vec<RecordBatch> {
|
||||
// Project the column selection across all RecordBatch
|
||||
self.data
|
||||
.iter()
|
||||
.map(|data| {
|
||||
let batch = data.as_ref();
|
||||
let schema = batch.schema();
|
||||
|
||||
// Apply selection to in-memory batch
|
||||
match selection {
|
||||
Selection::All => batch.clone(),
|
||||
Selection::Some(columns) => {
|
||||
let projection = columns
|
||||
.iter()
|
||||
.flat_map(|&column_name| {
|
||||
// ignore non-existing columns
|
||||
schema.index_of(column_name).ok()
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
batch.project(&projection).expect("bug in projection")
|
||||
}
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// return min and max of all the snapshots
|
||||
pub(crate) fn min_max_sequence_numbers(&self) -> (SequenceNumber, SequenceNumber) {
|
||||
let min = self
|
||||
.data
|
||||
.first()
|
||||
.expect("The Queryable Batch should not empty")
|
||||
.min_sequence_number;
|
||||
/// Returns the [`RecordBatch`] instances in this [`QueryAdaptor`].
|
||||
pub(crate) fn record_batches(&self) -> &[Arc<RecordBatch>] {
|
||||
self.data.as_ref()
|
||||
}
|
||||
|
||||
let max = self
|
||||
.data
|
||||
.first()
|
||||
.expect("The Queryable Batch should not empty")
|
||||
.max_sequence_number;
|
||||
|
||||
assert!(min <= max);
|
||||
|
||||
(min, max)
|
||||
/// Returns the partition ID from which the data this [`QueryAdaptor`] was
|
||||
/// sourced from.
|
||||
pub(crate) fn partition_id(&self) -> PartitionId {
|
||||
self.partition_id
|
||||
}
|
||||
}
|
||||
|
||||
impl QueryChunkMeta for QueryableBatch {
|
||||
impl QueryChunkMeta for QueryAdaptor {
|
||||
fn summary(&self) -> Option<Arc<TableSummary>> {
|
||||
None
|
||||
}
|
||||
|
||||
fn schema(&self) -> Arc<Schema> {
|
||||
// TODO: may want store this schema as a field of QueryableBatch and
|
||||
// only do this schema merge the first time it is called
|
||||
|
||||
// Merge schema of all RecordBatches of the PerstingBatch
|
||||
let batches: Vec<Arc<RecordBatch>> =
|
||||
self.data.iter().map(|s| Arc::clone(&s.data)).collect();
|
||||
merge_record_batch_schemas(&batches)
|
||||
Arc::clone(
|
||||
self.schema
|
||||
.get_or_init(|| merge_record_batch_schemas(&self.data)),
|
||||
)
|
||||
}
|
||||
|
||||
fn partition_sort_key(&self) -> Option<&SortKey> {
|
||||
|
@ -139,11 +174,11 @@ impl QueryChunkMeta for QueryableBatch {
|
|||
}
|
||||
}
|
||||
|
||||
impl QueryChunk for QueryableBatch {
|
||||
impl QueryChunk for QueryAdaptor {
|
||||
// This function should not be used in QueryBatch context
|
||||
fn id(&self) -> ChunkId {
|
||||
// To return a value for debugging and make it consistent with ChunkId created in Compactor,
|
||||
// use Uuid for this
|
||||
// To return a value for debugging and make it consistent with ChunkId
|
||||
// created in Compactor, use Uuid for this
|
||||
ChunkId::new()
|
||||
}
|
||||
|
||||
|
@ -152,10 +187,11 @@ impl QueryChunk for QueryableBatch {
|
|||
&self.table_name
|
||||
}
|
||||
|
||||
/// Returns true if the chunk may contain a duplicate "primary
|
||||
/// key" within itself
|
||||
/// Returns true if the chunk may contain a duplicate "primary key" within
|
||||
/// itself
|
||||
fn may_contain_pk_duplicates(&self) -> bool {
|
||||
// always true because they are not deduplicated yet
|
||||
// always true because the rows across record batches have not been
|
||||
// de-duplicated.
|
||||
true
|
||||
}
|
||||
|
||||
|
@ -204,22 +240,15 @@ impl QueryChunk for QueryableBatch {
|
|||
.context(SchemaSnafu)
|
||||
.map_err(|e| DataFusionError::External(Box::new(e)))?;
|
||||
|
||||
// Get all record batches from their snapshots
|
||||
// Apply the projection over all the data in self, ensuring each batch
|
||||
// has the specified schema.
|
||||
let batches = self
|
||||
.data
|
||||
.iter()
|
||||
.filter_map(|snapshot| {
|
||||
let batch = snapshot
|
||||
// Only return columns in the selection
|
||||
.scan(selection)
|
||||
.context(FilterColumnsSnafu {})
|
||||
.transpose()?
|
||||
// ensure batch has desired schema
|
||||
.and_then(|batch| {
|
||||
ensure_schema(&schema.as_arrow(), &batch).context(ConcatBatchesSnafu {})
|
||||
})
|
||||
.map(Arc::new);
|
||||
Some(batch)
|
||||
.project_selection(selection)
|
||||
.into_iter()
|
||||
.map(|batch| {
|
||||
ensure_schema(&schema.as_arrow(), &batch)
|
||||
.context(ConcatBatchesSnafu {})
|
||||
.map(Arc::new)
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.map_err(|e| DataFusionError::External(Box::new(e)))?;
|
||||
|
@ -233,10 +262,9 @@ impl QueryChunk for QueryableBatch {
|
|||
|
||||
/// Returns chunk type
|
||||
fn chunk_type(&self) -> &str {
|
||||
"PersistingBatch"
|
||||
"QueryAdaptor"
|
||||
}
|
||||
|
||||
// This function should not be used in PersistingBatch context
|
||||
fn order(&self) -> ChunkOrder {
|
||||
unimplemented!()
|
||||
}
|
|
@ -7,9 +7,8 @@ use std::{sync::Arc, time::Duration};
|
|||
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use arrow_util::assert_batches_eq;
|
||||
use bitflags::bitflags;
|
||||
use data_types::{
|
||||
NamespaceId, PartitionId, PartitionKey, Sequence, SequenceNumber, ShardId, ShardIndex, TableId,
|
||||
NamespaceId, PartitionKey, Sequence, SequenceNumber, ShardId, ShardIndex, TableId,
|
||||
};
|
||||
use dml::{DmlMeta, DmlOperation, DmlWrite};
|
||||
use iox_catalog::{interface::Catalog, mem::MemCatalog};
|
||||
|
@ -17,71 +16,12 @@ use iox_query::test::{raw_data, TestChunk};
|
|||
use iox_time::{SystemProvider, Time};
|
||||
use mutable_batch_lp::lines_to_batches;
|
||||
use object_store::memory::InMemory;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
data::{
|
||||
partition::{resolver::CatalogPartitionResolver, PersistingBatch, SnapshotBatch},
|
||||
IngesterData,
|
||||
},
|
||||
data::{partition::resolver::CatalogPartitionResolver, IngesterData},
|
||||
lifecycle::{LifecycleConfig, LifecycleManager},
|
||||
query::QueryableBatch,
|
||||
};
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn make_persisting_batch(
|
||||
shard_id: i64,
|
||||
seq_num_start: i64,
|
||||
table_id: i64,
|
||||
table_name: &str,
|
||||
partition_id: i64,
|
||||
object_store_id: Uuid,
|
||||
batches: Vec<Arc<RecordBatch>>,
|
||||
) -> Arc<PersistingBatch> {
|
||||
let queryable_batch = make_queryable_batch(table_name, partition_id, seq_num_start, batches);
|
||||
Arc::new(PersistingBatch {
|
||||
shard_id: ShardId::new(shard_id),
|
||||
table_id: TableId::new(table_id),
|
||||
partition_id: PartitionId::new(partition_id),
|
||||
object_store_id,
|
||||
data: queryable_batch,
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn make_queryable_batch(
|
||||
table_name: &str,
|
||||
partition_id: i64,
|
||||
seq_num_start: i64,
|
||||
batches: Vec<Arc<RecordBatch>>,
|
||||
) -> Arc<QueryableBatch> {
|
||||
// make snapshots for the batches
|
||||
let mut snapshots = vec![];
|
||||
let mut seq_num = seq_num_start;
|
||||
for batch in batches {
|
||||
let seq = SequenceNumber::new(seq_num);
|
||||
snapshots.push(Arc::new(make_snapshot_batch(batch, seq, seq)));
|
||||
seq_num += 1;
|
||||
}
|
||||
|
||||
Arc::new(QueryableBatch::new(
|
||||
table_name.into(),
|
||||
PartitionId::new(partition_id),
|
||||
snapshots,
|
||||
))
|
||||
}
|
||||
|
||||
pub(crate) fn make_snapshot_batch(
|
||||
batch: Arc<RecordBatch>,
|
||||
min: SequenceNumber,
|
||||
max: SequenceNumber,
|
||||
) -> SnapshotBatch {
|
||||
SnapshotBatch {
|
||||
min_sequence_number: min,
|
||||
max_sequence_number: max,
|
||||
data: batch,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn create_one_row_record_batch_with_influxtype() -> Vec<Arc<RecordBatch>> {
|
||||
let chunk1 = Arc::new(
|
||||
TestChunk::new("t")
|
||||
|
@ -506,32 +446,9 @@ pub(crate) const TEST_TABLE: &str = "test_table";
|
|||
pub(crate) const TEST_PARTITION_1: &str = "test+partition_1";
|
||||
pub(crate) const TEST_PARTITION_2: &str = "test+partition_2";
|
||||
|
||||
bitflags! {
|
||||
/// Make the same in-memory data but data are split between:
|
||||
/// . one or two partition
|
||||
/// . The first partition will have a choice to have data in either
|
||||
/// . buffer only
|
||||
/// . snapshot only
|
||||
/// . persisting only
|
||||
/// . buffer + snapshot
|
||||
/// . buffer + persisting
|
||||
/// . snapshot + persisting
|
||||
/// . buffer + snapshot + persisting
|
||||
/// . If the second partittion exists, it only has data in its buffer
|
||||
pub(crate) struct DataLocation: u8 {
|
||||
const BUFFER = 0b001;
|
||||
const SNAPSHOT = 0b010;
|
||||
const PERSISTING = 0b100;
|
||||
const BUFFER_SNAPSHOT = Self::BUFFER.bits | Self::SNAPSHOT.bits;
|
||||
const BUFFER_PERSISTING = Self::BUFFER.bits | Self::PERSISTING.bits;
|
||||
const SNAPSHOT_PERSISTING = Self::SNAPSHOT.bits | Self::PERSISTING.bits;
|
||||
const BUFFER_SNAPSHOT_PERSISTING = Self::BUFFER.bits | Self::SNAPSHOT.bits | Self::PERSISTING.bits;
|
||||
}
|
||||
}
|
||||
|
||||
/// This function produces one scenario but with the parameter combination (2*7),
|
||||
/// you will be able to produce 14 scenarios by calling it in 2 loops
|
||||
pub(crate) async fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterData {
|
||||
pub(crate) async fn make_ingester_data(two_partitions: bool) -> IngesterData {
|
||||
// Whatever data because they won't be used in the tests
|
||||
let metrics: Arc<metric::Registry> = Default::default();
|
||||
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
|
||||
|
@ -576,26 +493,6 @@ pub(crate) async fn make_ingester_data(two_partitions: bool, loc: DataLocation)
|
|||
.unwrap();
|
||||
}
|
||||
|
||||
if loc.contains(DataLocation::PERSISTING) {
|
||||
// Move partition 1 data to persisting
|
||||
let _ignored = ingester
|
||||
.shard(shard_id)
|
||||
.unwrap()
|
||||
.namespace(&TEST_NAMESPACE.into())
|
||||
.unwrap()
|
||||
.snapshot_to_persisting(&TEST_TABLE.into(), &PartitionKey::from(TEST_PARTITION_1))
|
||||
.await;
|
||||
} else if loc.contains(DataLocation::SNAPSHOT) {
|
||||
// move partition 1 data to snapshot
|
||||
let _ignored = ingester
|
||||
.shard(shard_id)
|
||||
.unwrap()
|
||||
.namespace(&TEST_NAMESPACE.into())
|
||||
.unwrap()
|
||||
.snapshot(&TEST_TABLE.into(), &PartitionKey::from(TEST_PARTITION_1))
|
||||
.await;
|
||||
}
|
||||
|
||||
ingester
|
||||
}
|
||||
|
||||
|
|
|
@ -40,8 +40,7 @@ mod deduplicate;
|
|||
pub mod overlap;
|
||||
mod physical;
|
||||
use self::overlap::group_potential_duplicates;
|
||||
pub(crate) use deduplicate::DeduplicateExec;
|
||||
pub use deduplicate::RecordBatchDeduplicator;
|
||||
pub use deduplicate::{DeduplicateExec, RecordBatchDeduplicator};
|
||||
pub(crate) use physical::IOxReadFilterNode;
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
|
|
Loading…
Reference in New Issue