Merge pull request #5958 from influxdata/dom/buffer-state-machine

refactor(ingester): use partition buffer FSM
pull/24376/head
kodiakhq[bot] 2022-10-28 08:52:56 +00:00 committed by GitHub
commit 3568564d39
18 changed files with 1471 additions and 1339 deletions

Cargo.lock (generated)

@ -2279,7 +2279,6 @@ dependencies = [
"assert_matches",
"async-trait",
"backoff",
"bitflags",
"bytes",
"chrono",
"data_types",
@ -2293,6 +2292,7 @@ dependencies = [
"iox_catalog",
"iox_query",
"iox_time",
"lazy_static",
"metric",
"mutable_batch",
"mutable_batch_lp",


@ -47,11 +47,11 @@ write_summary = { path = "../write_summary" }
tokio-util = { version = "0.7.4" }
trace = { path = "../trace" }
rand = "0.8.5"
once_cell = "1"
[dev-dependencies]
assert_matches = "1.5.0"
bitflags = {version = "1.3.2"}
once_cell = "1"
lazy_static = "1.4.0"
paste = "1.0.9"
test_helpers = { path = "../test_helpers", features = ["future_timeout"] }
tokio-stream = {version = "0.1.11", default_features = false }
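The dependency shuffle above promotes `once_cell` from a dev-dependency to a regular dependency (and adds `lazy_static` to dev-dependencies); the `use once_cell::sync::OnceCell` import near the end of this diff suggests it is used to lazily cache derived state. Below is a minimal sketch of that caching pattern, assuming only that the `once_cell` crate is on the dependency path; the `Adaptor` and `Schema` names are illustrative, not the crate's real types.

```rust
// A minimal sketch of the lazy-caching pattern that motivates promoting
// `once_cell` to a regular dependency: derived state (here a toy "schema")
// is computed at most once and then reused. Illustrative types only.
use once_cell::sync::OnceCell;

#[derive(Debug)]
struct Schema(Vec<String>);

struct Adaptor {
    columns: Vec<String>,
    schema: OnceCell<Schema>,
}

impl Adaptor {
    fn schema(&self) -> &Schema {
        // Computed on first access, cached for all later calls.
        self.schema.get_or_init(|| Schema(self.columns.clone()))
    }
}

fn main() {
    let a = Adaptor {
        columns: vec!["tag1".into(), "time".into()],
        schema: OnceCell::new(),
    };
    assert_eq!(a.schema().0, a.schema().0); // second call hits the cache
}
```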


@ -11,7 +11,7 @@ use iox_query::{
use schema::sort::{adjust_sort_key_columns, compute_sort_key, SortKey};
use snafu::{ResultExt, Snafu};
use crate::{data::partition::PersistingBatch, query::QueryableBatch};
use crate::query_adaptor::QueryAdaptor;
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations, missing_docs)]
@ -85,14 +85,14 @@ impl std::fmt::Debug for CompactedStream {
}
}
/// Compact a given persisting batch into a [`CompactedStream`] or
/// `None` if there is no data to compact.
/// Compact a given batch into a [`CompactedStream`] or `None` if there is no
/// data to compact, returning an updated sort key, if any.
pub(crate) async fn compact_persisting_batch(
executor: &Executor,
sort_key: Option<SortKey>,
batch: Arc<PersistingBatch>,
batch: QueryAdaptor,
) -> Result<CompactedStream> {
assert!(!batch.data.data.is_empty());
assert!(!batch.record_batches().is_empty());
// Get sort key from the catalog or compute it from
// cardinality.
@ -104,12 +104,12 @@ pub(crate) async fn compact_persisting_batch(
//
// If there are any new columns, add them to the end of the sort key in the catalog and
// return that to be updated in the catalog.
adjust_sort_key_columns(&sk, &batch.data.schema().primary_key())
adjust_sort_key_columns(&sk, &batch.schema().primary_key())
}
None => {
let sort_key = compute_sort_key(
batch.data.schema().as_ref(),
batch.data.data.iter().map(|sb| sb.data.as_ref()),
batch.schema().as_ref(),
batch.record_batches().iter().map(|sb| sb.as_ref()),
);
// Use the sort key computed from the cardinality as the sort key for this parquet
// file's metadata, also return the sort key to be stored in the catalog
@ -118,7 +118,7 @@ pub(crate) async fn compact_persisting_batch(
};
// Compact
let stream = compact(executor, Arc::clone(&batch.data), data_sort_key.clone()).await?;
let stream = compact(executor, Arc::new(batch), data_sort_key.clone()).await?;
Ok(CompactedStream {
stream,
@ -127,10 +127,10 @@ pub(crate) async fn compact_persisting_batch(
})
}
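The hunk above shows how `compact_persisting_batch` picks a sort key: if the catalog already has one, it is adjusted against the batch's primary key and any update is returned for the catalog; otherwise a key is computed from the data and returned for the catalog as well. The standalone sketch below mirrors only that branch structure; `choose_sort_key` and its naive column placement are illustrative stand-ins, and the real `adjust_sort_key_columns`/`compute_sort_key` in the `schema` crate have their own ordering rules.

```rust
// A simplified, self-contained sketch of the sort-key decision shown above.
// Column names are plain Strings; placement of new columns is naive here.
fn choose_sort_key(
    catalog_key: Option<Vec<String>>,
    primary_key: &[&str],
) -> (Vec<String>, Option<Vec<String>>) {
    match catalog_key {
        // Keep the catalog order, appending any primary-key columns it lacks;
        // the second element is the updated key to write back, if any changed.
        Some(existing) => {
            let mut updated = existing.clone();
            for col in primary_key {
                if !updated.iter().any(|c| c == col) {
                    updated.push((*col).to_string());
                }
            }
            let update = (updated != existing).then(|| updated.clone());
            (updated, update)
        }
        // No catalog key yet: derive one and return it for the catalog too.
        None => {
            let computed: Vec<String> = primary_key.iter().map(|c| c.to_string()).collect();
            (computed.clone(), Some(computed))
        }
    }
}

fn main() {
    let (data_key, catalog_update) =
        choose_sort_key(Some(vec!["tag1".into(), "time".into()]), &["tag1", "tag3", "time"]);
    assert_eq!(data_key, vec!["tag1", "time", "tag3"]);
    assert!(catalog_update.is_some());
}
```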
/// Compact a given Queryable Batch
/// Compact a given batch without updating the sort key.
pub(crate) async fn compact(
executor: &Executor,
data: Arc<QueryableBatch>,
data: Arc<QueryAdaptor>,
sort_key: SortKey,
) -> Result<SendableRecordBatchStream> {
// Build logical plan for compaction
@ -157,9 +157,9 @@ pub(crate) async fn compact(
#[cfg(test)]
mod tests {
use arrow_util::assert_batches_eq;
use data_types::PartitionId;
use mutable_batch_lp::lines_to_batches;
use schema::selection::Selection;
use uuid::Uuid;
use super::*;
use crate::test_util::{
@ -169,14 +169,14 @@ mod tests {
create_batches_with_influxtype_same_columns_different_type,
create_one_record_batch_with_influxtype_duplicates,
create_one_record_batch_with_influxtype_no_duplicates,
create_one_row_record_batch_with_influxtype, make_persisting_batch, make_queryable_batch,
create_one_row_record_batch_with_influxtype,
};
// this test was added to guard against https://github.com/influxdata/influxdb_iox/issues/3782
// where if sending in a single row it would compact into an output of two batches, one of
// which was empty, which would cause this to panic.
#[tokio::test]
async fn test_compact_persisting_batch_on_one_record_batch_with_one_row() {
async fn test_compact_batch_on_one_record_batch_with_one_row() {
// create input data
let batch = lines_to_batches("cpu bar=2 20", 0)
.unwrap()
@ -184,26 +184,15 @@ mod tests {
.unwrap()
.to_arrow(Selection::All)
.unwrap();
let batches = vec![Arc::new(batch)];
// build persisting batch from the input batches
let uuid = Uuid::new_v4();
let table_name = "test_table";
let shard_id = 1;
let seq_num_start: i64 = 1;
let table_id = 1;
let partition_id = 1;
let persisting_batch = make_persisting_batch(
shard_id,
seq_num_start,
table_id,
table_name,
partition_id,
uuid,
batches,
let batch = QueryAdaptor::new(
"test_table".into(),
PartitionId::new(1),
vec![Arc::new(batch)],
);
// verify PK
let schema = persisting_batch.data.schema();
let schema = batch.schema();
let pk = schema.primary_key();
let expected_pk = vec!["time"];
assert_eq!(expected_pk, pk);
@ -211,7 +200,7 @@ mod tests {
// compact
let exc = Executor::new(1);
let CompactedStream { stream, .. } =
compact_persisting_batch(&exc, Some(SortKey::empty()), persisting_batch)
compact_persisting_batch(&exc, Some(SortKey::empty()), batch)
.await
.unwrap();
@ -232,29 +221,16 @@ mod tests {
}
#[tokio::test]
async fn test_compact_persisting_batch_on_one_record_batch_no_dupilcates() {
async fn test_compact_batch_on_one_record_batch_no_dupilcates() {
// create input data
let batches = create_one_record_batch_with_influxtype_no_duplicates().await;
// build persisting batch from the input batches
let uuid = Uuid::new_v4();
let table_name = "test_table";
let shard_id = 1;
let seq_num_start: i64 = 1;
let table_id = 1;
let partition_id = 1;
let persisting_batch = make_persisting_batch(
shard_id,
seq_num_start,
table_id,
table_name,
partition_id,
uuid,
batches,
let batch = QueryAdaptor::new(
"test_table".into(),
PartitionId::new(1),
create_one_record_batch_with_influxtype_no_duplicates().await,
);
// verify PK
let schema = persisting_batch.data.schema();
let schema = batch.schema();
let pk = schema.primary_key();
let expected_pk = vec!["tag1", "time"];
assert_eq!(expected_pk, pk);
@ -265,7 +241,7 @@ mod tests {
stream,
data_sort_key,
catalog_sort_key_update,
} = compact_persisting_batch(&exc, Some(SortKey::empty()), persisting_batch)
} = compact_persisting_batch(&exc, Some(SortKey::empty()), batch)
.await
.unwrap();
@ -295,29 +271,16 @@ mod tests {
}
#[tokio::test]
async fn test_compact_persisting_batch_no_sort_key() {
async fn test_compact_batch_no_sort_key() {
// create input data
let batches = create_batches_with_influxtype_different_cardinality().await;
// build persisting batch from the input batches
let uuid = Uuid::new_v4();
let table_name = "test_table";
let shard_id = 1;
let seq_num_start: i64 = 1;
let table_id = 1;
let partition_id = 1;
let persisting_batch = make_persisting_batch(
shard_id,
seq_num_start,
table_id,
table_name,
partition_id,
uuid,
batches,
let batch = QueryAdaptor::new(
"test_table".into(),
PartitionId::new(1),
create_batches_with_influxtype_different_cardinality().await,
);
// verify PK
let schema = persisting_batch.data.schema();
let schema = batch.schema();
let pk = schema.primary_key();
let expected_pk = vec!["tag1", "tag3", "time"];
assert_eq!(expected_pk, pk);
@ -329,7 +292,7 @@ mod tests {
stream,
data_sort_key,
catalog_sort_key_update,
} = compact_persisting_batch(&exc, Some(SortKey::empty()), persisting_batch)
} = compact_persisting_batch(&exc, Some(SortKey::empty()), batch)
.await
.unwrap();
@ -363,29 +326,16 @@ mod tests {
}
#[tokio::test]
async fn test_compact_persisting_batch_with_specified_sort_key() {
async fn test_compact_batch_with_specified_sort_key() {
// create input data
let batches = create_batches_with_influxtype_different_cardinality().await;
// build persisting batch from the input batches
let uuid = Uuid::new_v4();
let table_name = "test_table";
let shard_id = 1;
let seq_num_start: i64 = 1;
let table_id = 1;
let partition_id = 1;
let persisting_batch = make_persisting_batch(
shard_id,
seq_num_start,
table_id,
table_name,
partition_id,
uuid,
batches,
let batch = QueryAdaptor::new(
"test_table".into(),
PartitionId::new(1),
create_batches_with_influxtype_different_cardinality().await,
);
// verify PK
let schema = persisting_batch.data.schema();
let schema = batch.schema();
let pk = schema.primary_key();
let expected_pk = vec!["tag1", "tag3", "time"];
assert_eq!(expected_pk, pk);
@ -401,7 +351,7 @@ mod tests {
} = compact_persisting_batch(
&exc,
Some(SortKey::from_columns(["tag3", "tag1", "time"])),
persisting_batch,
batch,
)
.await
.unwrap();
@ -435,29 +385,16 @@ mod tests {
}
#[tokio::test]
async fn test_compact_persisting_batch_new_column_for_sort_key() {
async fn test_compact_batch_new_column_for_sort_key() {
// create input data
let batches = create_batches_with_influxtype_different_cardinality().await;
// build persisting batch from the input batches
let uuid = Uuid::new_v4();
let table_name = "test_table";
let shard_id = 1;
let seq_num_start: i64 = 1;
let table_id = 1;
let partition_id = 1;
let persisting_batch = make_persisting_batch(
shard_id,
seq_num_start,
table_id,
table_name,
partition_id,
uuid,
batches,
let batch = QueryAdaptor::new(
"test_table".into(),
PartitionId::new(1),
create_batches_with_influxtype_different_cardinality().await,
);
// verify PK
let schema = persisting_batch.data.schema();
let schema = batch.schema();
let pk = schema.primary_key();
let expected_pk = vec!["tag1", "tag3", "time"];
assert_eq!(expected_pk, pk);
@ -471,13 +408,9 @@ mod tests {
stream,
data_sort_key,
catalog_sort_key_update,
} = compact_persisting_batch(
&exc,
Some(SortKey::from_columns(["tag3", "time"])),
persisting_batch,
)
.await
.unwrap();
} = compact_persisting_batch(&exc, Some(SortKey::from_columns(["tag3", "time"])), batch)
.await
.unwrap();
let output_batches = datafusion::physical_plan::common::collect(stream)
.await
@ -511,29 +444,16 @@ mod tests {
}
#[tokio::test]
async fn test_compact_persisting_batch_missing_column_for_sort_key() {
async fn test_compact_batch_missing_column_for_sort_key() {
// create input data
let batches = create_batches_with_influxtype_different_cardinality().await;
// build persisting batch from the input batches
let uuid = Uuid::new_v4();
let table_name = "test_table";
let shard_id = 1;
let seq_num_start: i64 = 1;
let table_id = 1;
let partition_id = 1;
let persisting_batch = make_persisting_batch(
shard_id,
seq_num_start,
table_id,
table_name,
partition_id,
uuid,
batches,
let batch = QueryAdaptor::new(
"test_table".into(),
PartitionId::new(1),
create_batches_with_influxtype_different_cardinality().await,
);
// verify PK
let schema = persisting_batch.data.schema();
let schema = batch.schema();
let pk = schema.primary_key();
let expected_pk = vec!["tag1", "tag3", "time"];
assert_eq!(expected_pk, pk);
@ -550,7 +470,7 @@ mod tests {
} = compact_persisting_batch(
&exc,
Some(SortKey::from_columns(["tag3", "tag1", "tag4", "time"])),
persisting_batch,
batch,
)
.await
.unwrap();
@ -588,26 +508,25 @@ mod tests {
test_helpers::maybe_start_logging();
// create input data
let batches = create_one_row_record_batch_with_influxtype().await;
// build queryable batch from the input batches
let compact_batch = make_queryable_batch("test_table", 0, 1, batches);
let batch = QueryAdaptor::new(
"test_table".into(),
PartitionId::new(1),
create_one_row_record_batch_with_influxtype().await,
);
// verify PK
let schema = compact_batch.schema();
let schema = batch.schema();
let pk = schema.primary_key();
let expected_pk = vec!["tag1", "time"];
assert_eq!(expected_pk, pk);
let sort_key = compute_sort_key(
&schema,
compact_batch.data.iter().map(|sb| sb.data.as_ref()),
);
let sort_key =
compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));
// compact
let exc = Executor::new(1);
let stream = compact(&exc, compact_batch, sort_key).await.unwrap();
let stream = compact(&exc, Arc::new(batch), sort_key).await.unwrap();
let output_batches = datafusion::physical_plan::common::collect(stream)
.await
.unwrap();
@ -629,26 +548,25 @@ mod tests {
#[tokio::test]
async fn test_compact_one_batch_with_duplicates() {
// create input data
let batches = create_one_record_batch_with_influxtype_duplicates().await;
// build queryable batch from the input batches
let compact_batch = make_queryable_batch("test_table", 0, 1, batches);
let batch = QueryAdaptor::new(
"test_table".into(),
PartitionId::new(1),
create_one_record_batch_with_influxtype_duplicates().await,
);
// verify PK
let schema = compact_batch.schema();
let schema = batch.schema();
let pk = schema.primary_key();
let expected_pk = vec!["tag1", "time"];
assert_eq!(expected_pk, pk);
let sort_key = compute_sort_key(
&schema,
compact_batch.data.iter().map(|sb| sb.data.as_ref()),
);
let sort_key =
compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));
// compact
let exc = Executor::new(1);
let stream = compact(&exc, compact_batch, sort_key).await.unwrap();
let stream = compact(&exc, Arc::new(batch), sort_key).await.unwrap();
let output_batches = datafusion::physical_plan::common::collect(stream)
.await
.unwrap();
@ -678,26 +596,25 @@ mod tests {
#[tokio::test]
async fn test_compact_many_batches_same_columns_with_duplicates() {
// create many-batches input data
let batches = create_batches_with_influxtype().await;
// build queryable batch from the input batches
let compact_batch = make_queryable_batch("test_table", 0, 1, batches);
let batch = QueryAdaptor::new(
"test_table".into(),
PartitionId::new(1),
create_batches_with_influxtype().await,
);
// verify PK
let schema = compact_batch.schema();
let schema = batch.schema();
let pk = schema.primary_key();
let expected_pk = vec!["tag1", "time"];
assert_eq!(expected_pk, pk);
let sort_key = compute_sort_key(
&schema,
compact_batch.data.iter().map(|sb| sb.data.as_ref()),
);
let sort_key =
compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));
// compact
let exc = Executor::new(1);
let stream = compact(&exc, compact_batch, sort_key).await.unwrap();
let stream = compact(&exc, Arc::new(batch), sort_key).await.unwrap();
let output_batches = datafusion::physical_plan::common::collect(stream)
.await
.unwrap();
@ -724,26 +641,25 @@ mod tests {
#[tokio::test]
async fn test_compact_many_batches_different_columns_with_duplicates() {
// create many-batches input data
let batches = create_batches_with_influxtype_different_columns().await;
// build queryable batch from the input batches
let compact_batch = make_queryable_batch("test_table", 0, 1, batches);
let batch = QueryAdaptor::new(
"test_table".into(),
PartitionId::new(1),
create_batches_with_influxtype_different_columns().await,
);
// verify PK
let schema = compact_batch.schema();
let schema = batch.schema();
let pk = schema.primary_key();
let expected_pk = vec!["tag1", "tag2", "time"];
assert_eq!(expected_pk, pk);
let sort_key = compute_sort_key(
&schema,
compact_batch.data.iter().map(|sb| sb.data.as_ref()),
);
let sort_key =
compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"]));
// compact
let exc = Executor::new(1);
let stream = compact(&exc, compact_batch, sort_key).await.unwrap();
let stream = compact(&exc, Arc::new(batch), sort_key).await.unwrap();
let output_batches = datafusion::physical_plan::common::collect(stream)
.await
.unwrap();
@ -774,26 +690,25 @@ mod tests {
#[tokio::test]
async fn test_compact_many_batches_different_columns_different_order_with_duplicates() {
// create many-batches input data
let batches = create_batches_with_influxtype_different_columns_different_order().await;
// build queryable batch from the input batches
let compact_batch = make_queryable_batch("test_table", 0, 1, batches);
let batch = QueryAdaptor::new(
"test_table".into(),
PartitionId::new(1),
create_batches_with_influxtype_different_columns_different_order().await,
);
// verify PK
let schema = compact_batch.schema();
let schema = batch.schema();
let pk = schema.primary_key();
let expected_pk = vec!["tag1", "tag2", "time"];
assert_eq!(expected_pk, pk);
let sort_key = compute_sort_key(
&schema,
compact_batch.data.iter().map(|sb| sb.data.as_ref()),
);
let sort_key =
compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"]));
// compact
let exc = Executor::new(1);
let stream = compact(&exc, compact_batch, sort_key).await.unwrap();
let stream = compact(&exc, Arc::new(batch), sort_key).await.unwrap();
let output_batches = datafusion::physical_plan::common::collect(stream)
.await
.unwrap();
@ -823,68 +738,17 @@ mod tests {
assert_batches_eq!(&expected, &output_batches);
}
// BUG
#[tokio::test]
async fn test_compact_many_batches_different_columns_different_order_with_duplicates2() {
// create many-batches input data
let batches = create_batches_with_influxtype_different_columns_different_order().await;
// build queryable batch from the input batches
let compact_batch = make_queryable_batch("test_table", 0, 1, batches);
// verify PK
let schema = compact_batch.schema();
let pk = schema.primary_key();
let expected_pk = vec!["tag1", "tag2", "time"];
assert_eq!(expected_pk, pk);
let sort_key = compute_sort_key(
&schema,
compact_batch.data.iter().map(|sb| sb.data.as_ref()),
);
assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"]));
// compact
let exc = Executor::new(1);
let stream = compact(&exc, compact_batch, sort_key).await.unwrap();
let output_batches = datafusion::physical_plan::common::collect(stream)
.await
.unwrap();
// verify compacted data
// data is sorted and all duplicates are removed
let expected = vec![
"+-----------+------+------+--------------------------------+",
"| field_int | tag1 | tag2 | time |",
"+-----------+------+------+--------------------------------+",
"| 5 | | AL | 1970-01-01T00:00:00.000005Z |",
"| 10 | | AL | 1970-01-01T00:00:00.000007Z |",
"| 70 | | CT | 1970-01-01T00:00:00.000000100Z |",
"| 1000 | | CT | 1970-01-01T00:00:00.000001Z |",
"| 100 | | MA | 1970-01-01T00:00:00.000000050Z |",
"| 10 | AL | MA | 1970-01-01T00:00:00.000000050Z |",
"| 70 | CT | CT | 1970-01-01T00:00:00.000000100Z |",
"| 70 | CT | CT | 1970-01-01T00:00:00.000000500Z |",
"| 30 | MT | AL | 1970-01-01T00:00:00.000000005Z |",
"| 20 | MT | AL | 1970-01-01T00:00:00.000007Z |",
"| 1000 | MT | CT | 1970-01-01T00:00:00.000001Z |",
"| 1000 | MT | CT | 1970-01-01T00:00:00.000002Z |",
"+-----------+------+------+--------------------------------+",
];
assert_batches_eq!(&expected, &output_batches);
}
#[tokio::test]
#[should_panic(expected = "Schemas compatible")]
async fn test_compact_many_batches_same_columns_different_types() {
// create many-batches input data
let batches = create_batches_with_influxtype_same_columns_different_type().await;
let batch = QueryAdaptor::new(
"test_table".into(),
PartitionId::new(1),
create_batches_with_influxtype_same_columns_different_type().await,
);
// build queryable batch from the input batches
let compact_batch = make_queryable_batch("test_table", 0, 1, batches);
// the schema merge will thorw a panic
compact_batch.schema();
// the schema merge should throw a panic
batch.schema();
}
}


@ -20,6 +20,7 @@ use parquet_file::{
storage::{ParquetStorage, StorageId},
};
use snafu::{OptionExt, Snafu};
use uuid::Uuid;
use write_summary::ShardProgress;
use crate::{
@ -29,9 +30,12 @@ use crate::{
pub(crate) mod namespace;
pub mod partition;
mod sequence_range;
pub(crate) mod shard;
pub(crate) mod table;
pub(crate) use sequence_range::*;
use self::{partition::resolver::PartitionProvider, shard::ShardData};
#[cfg(test)]
@ -245,26 +249,32 @@ impl Persister for IngesterData {
) {
// lookup the state from the ingester data. If something isn't found,
// it's unexpected. Crash so someone can take a look.
let shard_data = self
let namespace = self
.shards
.get(&shard_id)
.unwrap_or_else(|| panic!("shard state for {shard_id} not in ingester data"));
let namespace = shard_data
.namespace_by_id(namespace_id)
.and_then(|s| s.namespace_by_id(namespace_id))
.unwrap_or_else(|| panic!("namespace {namespace_id} not in shard {shard_id} state"));
let namespace_name = namespace.namespace_name();
// Assert the namespace ID matches the index key.
assert_eq!(namespace.namespace_id(), namespace_id);
let table_data = namespace.table_id(table_id).unwrap_or_else(|| {
panic!("table {table_id} in namespace {namespace_id} not in shard {shard_id} state")
});
let partition_key;
let table_name;
let batch;
let partition_key;
let sort_key;
let last_persisted_sequence_number;
let batch;
let batch_sequence_number_range;
{
let mut guard = table_data.write().await;
// Assert various properties of the table to ensure the index is
// correct, out of an abundance of caution.
assert_eq!(guard.shard_id(), shard_id);
assert_eq!(guard.namespace_id(), namespace_id);
assert_eq!(guard.table_id(), table_id);
table_name = guard.table_name().clone();
let partition = guard.get_partition(partition_id).unwrap_or_else(|| {
@ -273,12 +283,35 @@ impl Persister for IngesterData {
)
});
// Assert various properties of the partition to ensure the index is
// correct, out of an abundance of caution.
assert_eq!(partition.partition_id(), partition_id);
assert_eq!(partition.shard_id(), shard_id);
assert_eq!(partition.namespace_id(), namespace_id);
assert_eq!(partition.table_id(), table_id);
assert_eq!(*partition.table_name(), table_name);
partition_key = partition.partition_key().clone();
batch = partition.snapshot_to_persisting_batch();
sort_key = partition.sort_key().clone();
last_persisted_sequence_number = partition.max_persisted_sequence_number();
// The sequence number MUST be read without releasing the write lock
// to ensure a consistent snapshot of batch contents and batch
// sequence number range.
batch = partition.mark_persisting();
batch_sequence_number_range = partition.sequence_number_range();
};
// From this point on, the code MUST be infallible.
//
// The partition data was moved to the persisting slot, and any
// subsequent calls would be an error.
//
// This is NOT an invariant, and this could be changed in the future to
// allow partitions to be marked as persisting repeatedly. Today
// however, the code is infallible (or rather, terminal - it does cause
// a retry).
let sort_key = sort_key.get().await;
trace!(
%shard_id,
@ -306,8 +339,13 @@ impl Persister for IngesterData {
// Check if there is any data to persist.
let batch = match batch {
Some(v) if !v.data.data.is_empty() => v,
_ => {
Some(v) => {
// The partition state machine will NOT return an empty batch.
assert!(!v.record_batches().is_empty());
v
}
None => {
// But it MAY return no batch at all.
warn!(
%shard_id,
%namespace_id,
@ -322,17 +360,6 @@ impl Persister for IngesterData {
}
};
assert_eq!(batch.shard_id(), shard_id);
assert_eq!(batch.table_id(), table_id);
assert_eq!(batch.partition_id(), partition_id);
// Read the maximum SequenceNumber in the batch.
let (_min, max_sequence_number) = batch.data.min_max_sequence_numbers();
// Read the future object store ID before passing the batch into
// compaction, instead of retaining a copy of the data post-compaction.
let object_store_id = batch.object_store_id();
// do the CPU intensive work of compaction, de-duplication and sorting
let CompactedStream {
stream: record_stream,
@ -342,6 +369,10 @@ impl Persister for IngesterData {
.await
.expect("unable to compact persisting batch");
// Generate a UUID to uniquely identify this parquet file in object
// storage.
let object_store_id = Uuid::new_v4();
// Construct the metadata for this parquet file.
let iox_metadata = IoxMetadata {
object_store_id,
@ -353,7 +384,7 @@ impl Persister for IngesterData {
table_name: Arc::clone(&*table_name),
partition_id,
partition_key: partition_key.clone(),
max_sequence_number,
max_sequence_number: batch_sequence_number_range.inclusive_max().unwrap(),
compaction_level: CompactionLevel::Initial,
sort_key: Some(data_sort_key),
};
@ -503,15 +534,28 @@ impl Persister for IngesterData {
.recorder(attributes)
.record(file_size as u64);
// and remove the persisted data from memory
namespace
.mark_persisted(
&table_name,
&partition_key,
iox_metadata.max_sequence_number,
)
.await;
debug!(
// Mark the partition as having completed persistence, causing it to
// release the reference to the in-flight persistence data it is
// holding.
//
// This SHOULD cause the data to be dropped, but there MAY be ongoing
// queries that currently hold a reference to the data. In either case,
// the persisted data will be dropped "shortly".
table_data
.write()
.await
.get_partition(partition_id)
.unwrap()
.mark_persisted(iox_metadata.max_sequence_number);
// BUG: ongoing queries retain references to the persisting data,
// preventing it from being dropped, but memory is released back to
// lifecycle memory tracker when this fn returns.
//
// https://github.com/influxdata/influxdb_iox/issues/5872
//
info!(
%object_store_id,
%shard_id,
%namespace_id,
@ -521,7 +565,7 @@ impl Persister for IngesterData {
%partition_id,
%partition_key,
max_sequence_number=%iox_metadata.max_sequence_number.get(),
"marked partition as persisted"
"persisted partition"
);
}
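The rewritten persist path above captures the partition's data and sequence-number range under a single write lock via `mark_persisting()`, treats everything after that point as infallible, generates the object store UUID just before building the parquet metadata, and finally calls `mark_persisted()` so the in-memory reference can be released. The following is a simplified, self-contained sketch of that ordering only; `Partition`, its `String` 'batches', and the fake sequence numbers are stand-ins, not the ingester's real types.

```rust
// A simplified sketch of the persist ordering introduced above:
// (1) under one lock, move the buffer into a "persisting" slot and capture
//     its sequence range,
// (2) do the must-not-fail work (compaction/upload is just simulated),
// (3) mark the partition persisted, recording the watermark.
use std::sync::Mutex;

#[derive(Debug, Default)]
struct Partition {
    buffer: Vec<String>,             // stand-in for buffered record batches
    persisting: Option<Vec<String>>, // data currently being persisted
    max_persisted: Option<u64>,
}

impl Partition {
    // Equivalent in spirit to `mark_persisting()`: move the buffer out and
    // return it together with its sequence range, atomically w.r.t. writers.
    fn mark_persisting(&mut self) -> Option<(Vec<String>, u64)> {
        if self.buffer.is_empty() {
            return None; // nothing to persist
        }
        let batch = std::mem::take(&mut self.buffer);
        let max_seq = batch.len() as u64; // pretend sequence numbers are 1..=len
        self.persisting = Some(batch.clone());
        Some((batch, max_seq))
    }

    fn mark_persisted(&mut self, max_seq: u64) {
        self.persisting = None;
        self.max_persisted = Some(max_seq);
    }
}

fn persist(p: &Mutex<Partition>) {
    // Step 1: capture batch + sequence range under the same lock.
    let (batch, max_seq) = match p.lock().unwrap().mark_persisting() {
        Some(v) => v,
        None => return, // empty partition: skip, as the warn!() branch does
    };

    // Step 2: from here on the work must not fail; the data now lives only in
    // the persisting slot. Generate the object store ID at this point.
    let object_store_id = format!("uuid-for-{}-rows", batch.len());
    let _compacted = batch; // compaction and upload elided

    // Step 3: release the persisting reference and record the watermark.
    p.lock().unwrap().mark_persisted(max_seq);
    let _ = object_store_id;
}

fn main() {
    let p = Mutex::new(Partition {
        buffer: vec!["row".into(); 3],
        ..Default::default()
    });
    persist(&p);
    assert_eq!(p.lock().unwrap().max_persisted, Some(3));
}
```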
@ -656,8 +700,21 @@ mod tests {
.await
.unwrap();
assert_matches!(action, DmlApplyAction::Applied(false));
let w2 = DmlWrite::new(
"foo",
lines_to_batches("mem foo=1 10", 0).unwrap(),
Some("1970-01-01".into()),
DmlMeta::sequenced(
Sequence::new(ShardIndex::new(1), SequenceNumber::new(2)),
ignored_ts,
None,
50,
),
);
let action = data
.buffer_operation(shard1.id, DmlOperation::Write(w1), &manager.handle())
.buffer_operation(shard1.id, DmlOperation::Write(w2), &manager.handle())
.await
.unwrap();
assert_matches!(action, DmlApplyAction::Applied(true));
@ -1016,11 +1073,15 @@ mod tests {
assert_eq!(buckets_with_counts, &[500 * 1024]);
let mem_table = n.table_data(&"mem".into()).unwrap();
let mem_table = mem_table.read().await;
// verify that the parquet_max_sequence_number got updated
assert_eq!(
mem_table.parquet_max_sequence_number(),
mem_table
.write()
.await
.get_partition(partition_id)
.unwrap()
.max_persisted_sequence_number(),
Some(SequenceNumber::new(2))
);
@ -1310,13 +1371,17 @@ mod tests {
.unwrap();
{
let table_data = data.table_data(&"mem".into()).unwrap();
let table = table_data.read().await;
let p = table.get_partition_by_key(&"1970-01-01".into()).unwrap();
let mut table = table_data.write().await;
assert!(table
.partition_iter_mut()
.all(|p| p.get_query_data().is_none()));
assert_eq!(
p.max_persisted_sequence_number(),
table
.get_partition_by_key(&"1970-01-01".into())
.unwrap()
.max_persisted_sequence_number(),
Some(SequenceNumber::new(1))
);
assert!(p.data.buffer.is_none());
}
assert_matches!(action, DmlApplyAction::Skipped);
@ -1329,8 +1394,8 @@ mod tests {
let table = table_data.read().await;
let partition = table.get_partition_by_key(&"1970-01-01".into()).unwrap();
assert_eq!(
partition.data.buffer.as_ref().unwrap().min_sequence_number,
SequenceNumber::new(2)
partition.sequence_number_range().inclusive_min(),
Some(SequenceNumber::new(2))
);
assert_matches!(data.table_count().observe(), Observation::U64Counter(v) => {


@ -2,7 +2,7 @@
use std::{collections::HashMap, sync::Arc};
use data_types::{NamespaceId, PartitionKey, SequenceNumber, ShardId, TableId};
use data_types::{NamespaceId, SequenceNumber, ShardId, TableId};
use dml::DmlOperation;
use iox_catalog::interface::Catalog;
use metric::U64Counter;
@ -253,52 +253,7 @@ impl NamespaceData {
}
}
/// Snapshots the mutable buffer for the partition, which clears it out and moves it over to
/// snapshots. Then return a vec of the snapshots and the optional persisting batch.
#[cfg(test)] // Only used in tests
pub(crate) async fn snapshot(
&self,
table_name: &TableName,
partition_key: &PartitionKey,
) -> Option<(
Vec<Arc<super::partition::SnapshotBatch>>,
Option<Arc<super::partition::PersistingBatch>>,
)> {
if let Some(t) = self.table_data(table_name) {
let mut t = t.write().await;
return t.get_partition_by_key_mut(partition_key).map(|p| {
p.data
.generate_snapshot()
.expect("snapshot on mutable batch should never fail");
(p.data.snapshots.to_vec(), p.data.persisting.clone())
});
}
None
}
/// Snapshots the mutable buffer for the partition, which clears it out and then moves all
/// snapshots over to a persisting batch, which is returned. If there is no data to snapshot
/// or persist, None will be returned.
#[cfg(test)] // Only used in tests
pub(crate) async fn snapshot_to_persisting(
&self,
table_name: &TableName,
partition_key: &PartitionKey,
) -> Option<Arc<super::partition::PersistingBatch>> {
if let Some(table_data) = self.table_data(table_name) {
let mut table_data = table_data.write().await;
return table_data
.get_partition_by_key_mut(partition_key)
.and_then(|partition_data| partition_data.snapshot_to_persisting_batch());
}
None
}
/// Gets the buffered table data
/// Return the specified [`TableData`] if it exists.
pub(crate) fn table_data(
&self,
table_name: &TableName,
@ -353,30 +308,11 @@ impl NamespaceData {
})
}
/// Walks down the table and partition and clears the persisting batch. The sequence number is
/// the max_sequence_number for the persisted parquet file, which should be kept in the table
/// data buffer.
pub(super) async fn mark_persisted(
&self,
table_name: &TableName,
partition_key: &PartitionKey,
sequence_number: SequenceNumber,
) {
if let Some(t) = self.table_data(table_name) {
let mut t = t.write().await;
let partition = t.get_partition_by_key_mut(partition_key);
if let Some(p) = partition {
p.mark_persisted(sequence_number);
}
}
}
/// Return progress from this Namespace
pub(super) async fn progress(&self) -> ShardProgress {
let tables: Vec<_> = self.tables.read().by_id.values().map(Arc::clone).collect();
// Consolidate progtress across partitions.
// Consolidate progress across partitions.
let mut progress = ShardProgress::new()
// Properly account for any sequence number that is
// actively buffering and thus not yet completely
@ -442,7 +378,7 @@ impl<'a> Drop for ScopedSequenceNumber<'a> {
mod tests {
use std::sync::Arc;
use data_types::{PartitionId, ShardIndex};
use data_types::{PartitionId, PartitionKey, ShardIndex};
use metric::{Attributes, Metric};
use crate::{

File diff suppressed because it is too large.


@ -1,274 +1,112 @@
//! Data for the lifecycle of the Ingester
use std::sync::Arc;
use data_types::{PartitionId, SequenceNumber, ShardId, TableId};
use arrow::record_batch::RecordBatch;
use data_types::SequenceNumber;
use mutable_batch::MutableBatch;
use schema::selection::Selection;
use snafu::ResultExt;
use uuid::Uuid;
use write_summary::ShardProgress;
use crate::data::table::TableName;
use crate::data::SequenceNumberRange;
use super::{PersistingBatch, QueryableBatch, SnapshotBatch};
mod always_some;
mod mutable_buffer;
mod state_machine;
pub(crate) mod traits;
/// Data of an IOx partition split into batches
/// ┌────────────────────────┐ ┌────────────────────────┐ ┌─────────────────────────┐
/// │ Buffer │ │ Snapshots │ │ Persisting │
/// │ ┌───────────────────┐ │ │ │ │ │
/// │ │ ┌───────────────┐│ │ │ ┌───────────────────┐ │ │ ┌───────────────────┐ │
/// │ │ ┌┴──────────────┐│├─┼────────┼─┼─▶┌───────────────┐│ │ │ │ ┌───────────────┐│ │
/// │ │┌┴──────────────┐├┘│ │ │ │ ┌┴──────────────┐││ │ │ │ ┌┴──────────────┐││ │
/// │ ││ BufferBatch ├┘ │ │ │ │┌┴──────────────┐├┘│──┼──────┼─▶│┌┴──────────────┐├┘│ │
/// │ │└───────────────┘ │ │ ┌───┼─▶│ SnapshotBatch ├┘ │ │ │ ││ SnapshotBatch ├┘ │ │
/// │ └───────────────────┘ │ │ │ │└───────────────┘ │ │ │ │└───────────────┘ │ │
/// │ ... │ │ │ └───────────────────┘ │ │ └───────────────────┘ │
/// │ ┌───────────────────┐ │ │ │ │ │ │
/// │ │ ┌───────────────┐│ │ │ │ ... │ │ ... │
/// │ │ ┌┴──────────────┐││ │ │ │ │ │ │
/// │ │┌┴──────────────┐├┘│─┼────┘ │ ┌───────────────────┐ │ │ ┌───────────────────┐ │
/// │ ││ BufferBatch ├┘ │ │ │ │ ┌───────────────┐│ │ │ │ ┌───────────────┐│ │
/// │ │└───────────────┘ │ │ │ │ ┌┴──────────────┐││ │ │ │ ┌┴──────────────┐││ │
/// │ └───────────────────┘ │ │ │┌┴──────────────┐├┘│──┼──────┼─▶│┌┴──────────────┐├┘│ │
/// │ │ │ ││ SnapshotBatch ├┘ │ │ │ ││ SnapshotBatch ├┘ │ │
/// │ ... │ │ │└───────────────┘ │ │ │ │└───────────────┘ │ │
/// │ │ │ └───────────────────┘ │ │ └───────────────────┘ │
/// └────────────────────────┘ └────────────────────────┘ └─────────────────────────┘
#[derive(Debug, Default)]
pub(crate) struct DataBuffer {
/// Buffer of incoming writes
pub(crate) buffer: Option<BufferBatch>,
pub(crate) use state_machine::*;
/// Data in `buffer` will be moved to a `snapshot` when one of these happens:
/// . A background persist is called
/// . A read request from Querier
/// The `buffer` will be empty when this happens.
pub(crate) snapshots: Vec<Arc<SnapshotBatch>>,
/// When a persist is called, data in `buffer` will be moved to a `snapshot`
/// and then all `snapshots` will be moved to a `persisting`.
/// Both `buffer` and 'snaphots` will be empty when this happens.
pub(crate) persisting: Option<Arc<PersistingBatch>>,
// Extra Notes:
// . In MVP, we will only persist a set of snapshots at a time.
// In later version, multiple persisting operations may be happening concurrently but
// their persisted info must be added into the Catalog in their data
// ingesting order.
// . When a read request comes from a Querier, all data from `snapshots`
// and `persisting` must be sent to the Querier.
// . After the `persisting` data is persisted and successfully added
// into the Catalog, it will be removed from this Data Buffer.
// This data might be added into an extra cache to serve up to
// Queriers that may not have loaded the parquet files from object
// storage yet. But this will be decided after MVP.
use self::{always_some::AlwaysSome, traits::Queryable};
/// The current state of the [`BufferState`] state machine.
///
/// NOTE that this does NOT contain the [`Persisting`] state, as this is a
/// immutable, terminal state that does not accept further writes and is
/// directly queryable.
#[derive(Debug)]
#[must_use = "FSM should not be dropped unused"]
enum FsmState {
/// The data buffer contains no data snapshots, and is accepting writes.
Buffering(BufferState<Buffering>),
}
impl Default for FsmState {
fn default() -> Self {
Self::Buffering(BufferState::new())
}
}
impl FsmState {
/// Return the current range of writes in the [`BufferState`] state machine,
/// if any.
pub(crate) fn sequence_number_range(&self) -> &SequenceNumberRange {
match self {
Self::Buffering(v) => v.sequence_number_range(),
}
}
}
/// A helper wrapper over the [`BufferState`] FSM to abstract the caller from
/// state transitions during reads and writes from the underlying buffer.
#[derive(Debug, Default)]
#[must_use = "DataBuffer should not be dropped unused"]
pub(crate) struct DataBuffer(AlwaysSome<FsmState>);
impl DataBuffer {
/// If a [`BufferBatch`] exists, convert it to a [`SnapshotBatch`] and add
/// it to the list of snapshots.
/// Return the range of [`SequenceNumber`] currently queryable by calling
/// [`Self::get_query_data()`].
pub(crate) fn sequence_number_range(&self) -> &SequenceNumberRange {
self.0.sequence_number_range()
}
/// Buffer the given [`MutableBatch`] in memory, ordered by the specified
/// [`SequenceNumber`].
///
/// Does nothing if there is no [`BufferBatch`].
pub(crate) fn generate_snapshot(&mut self) -> Result<(), mutable_batch::Error> {
let snapshot = self.copy_buffer_to_snapshot()?;
if let Some(snapshot) = snapshot {
self.snapshots.push(snapshot);
self.buffer = None;
}
Ok(())
}
/// Returns snapshot of the buffer but keeps data in the buffer
fn copy_buffer_to_snapshot(&self) -> Result<Option<Arc<SnapshotBatch>>, mutable_batch::Error> {
if let Some(buf) = &self.buffer {
return Ok(Some(Arc::new(SnapshotBatch {
min_sequence_number: buf.min_sequence_number,
max_sequence_number: buf.max_sequence_number,
data: Arc::new(buf.data.to_arrow(Selection::All)?),
})));
}
Ok(None)
}
/// Snapshots the buffer and make a QueryableBatch for all the snapshots
/// Both buffer and snapshots will be empty after this
pub(super) fn snapshot_to_queryable_batch(
/// # Panics
///
/// This method panics if `sequence_number` is not strictly greater than
/// previous calls.
pub(crate) fn buffer_write(
&mut self,
table_name: &TableName,
partition_id: PartitionId,
) -> Option<QueryableBatch> {
self.generate_snapshot()
.expect("This mutable batch snapshot error should be impossible.");
let mut data = vec![];
std::mem::swap(&mut data, &mut self.snapshots);
// only produce batch if there is any data
if data.is_empty() {
None
} else {
Some(QueryableBatch::new(table_name.clone(), partition_id, data))
}
mb: MutableBatch,
sequence_number: SequenceNumber,
) -> Result<(), mutable_batch::Error> {
// Take ownership of the FSM and apply the write.
self.0.mutate(|fsm| match fsm {
// Mutable stats simply have the write applied.
FsmState::Buffering(mut b) => {
let ret = b.write(mb, sequence_number);
(FsmState::Buffering(b), ret)
}
})
}
/// Returns all existing snapshots plus data in the buffer
/// This only read data. Data in the buffer will be kept in the buffer
pub(super) fn buffer_and_snapshots(
&self,
) -> Result<Vec<Arc<SnapshotBatch>>, crate::data::Error> {
// Existing snapshots
let mut snapshots = self.snapshots.clone();
// copy the buffer to a snapshot
let buffer_snapshot = self
.copy_buffer_to_snapshot()
.context(crate::data::BufferToSnapshotSnafu)?;
snapshots.extend(buffer_snapshot);
Ok(snapshots)
/// Return all data for this buffer, ordered by the [`SequenceNumber`] from
/// which it was buffered with.
pub(crate) fn get_query_data(&mut self) -> Vec<Arc<RecordBatch>> {
// Take ownership of the FSM and return the data within it.
self.0.mutate(|fsm| match fsm {
// The buffering state can return data.
FsmState::Buffering(b) => {
let ret = b.get_query_data();
(FsmState::Buffering(b), ret)
}
})
}
/// Snapshots the buffer and moves snapshots over to the `PersistingBatch`.
///
/// # Panic
///
/// Panics if there is already a persisting batch.
pub(super) fn snapshot_to_persisting(
&mut self,
shard_id: ShardId,
table_id: TableId,
partition_id: PartitionId,
table_name: &TableName,
) -> Option<Arc<PersistingBatch>> {
if self.persisting.is_some() {
panic!("Unable to snapshot while persisting. This is an unexpected state.")
}
if let Some(queryable_batch) = self.snapshot_to_queryable_batch(table_name, partition_id) {
let persisting_batch = Arc::new(PersistingBatch {
shard_id,
table_id,
partition_id,
object_store_id: Uuid::new_v4(),
data: Arc::new(queryable_batch),
});
self.persisting = Some(Arc::clone(&persisting_batch));
Some(persisting_batch)
} else {
None
}
}
/// Return a QueryableBatch of the persisting batch after applying new tombstones
pub(super) fn get_persisting_data(&self) -> Option<QueryableBatch> {
let persisting = match &self.persisting {
Some(p) => p,
None => return None,
// Deconstruct the [`DataBuffer`] into the underlying FSM in a
// [`Persisting`] state, if the buffer contains any data.
pub(crate) fn into_persisting(self) -> Option<BufferState<Persisting>> {
let p = match self.0.into_inner() {
FsmState::Buffering(b) => {
// Attempt to snapshot the buffer to an immutable state.
match b.snapshot() {
Transition::Ok(b) => b.into_persisting(),
Transition::Unchanged(_) => {
// The buffer contains no data.
return None;
}
}
}
};
// persisting data
Some((*persisting.data).clone())
}
/// Return the progress in this DataBuffer
pub(super) fn progress(&self) -> ShardProgress {
let progress = ShardProgress::new();
let progress = if let Some(buffer) = &self.buffer {
progress.combine(buffer.progress())
} else {
progress
};
let progress = self.snapshots.iter().fold(progress, |progress, snapshot| {
progress.combine(snapshot.progress())
});
if let Some(persisting) = &self.persisting {
persisting
.data
.data
.iter()
.fold(progress, |progress, snapshot| {
progress.combine(snapshot.progress())
})
} else {
progress
}
}
#[cfg(test)]
pub(super) fn get_snapshots(&self) -> &[Arc<SnapshotBatch>] {
self.snapshots.as_ref()
}
pub(crate) fn mark_persisted(&mut self) {
self.persisting = None;
}
}
/// BufferBatch is a MutableBatch with its ingesting order, sequence_number, that helps the
/// ingester keep the batches of data in their ingesting order
#[derive(Debug)]
pub(crate) struct BufferBatch {
/// Sequence number of the first write in this batch
pub(crate) min_sequence_number: SequenceNumber,
/// Sequence number of the last write in this batch
pub(super) max_sequence_number: SequenceNumber,
/// Ingesting data
pub(super) data: MutableBatch,
}
impl BufferBatch {
/// Return the progress in this DataBuffer
fn progress(&self) -> ShardProgress {
ShardProgress::new()
.with_buffered(self.min_sequence_number)
.with_buffered(self.max_sequence_number)
}
}
#[cfg(test)]
mod tests {
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use super::*;
#[test]
fn snapshot_empty_buffer_adds_no_snapshots() {
let mut data_buffer = DataBuffer::default();
data_buffer.generate_snapshot().unwrap();
assert!(data_buffer.snapshots.is_empty());
}
#[test]
fn snapshot_buffer_batch_moves_to_snapshots() {
let mut data_buffer = DataBuffer::default();
let seq_num1 = SequenceNumber::new(1);
let (_, mutable_batch1) =
lp_to_mutable_batch(r#"foo,t1=asdf iv=1i,uv=774u,fv=1.0,bv=true,sv="hi" 1"#);
let buffer_batch1 = BufferBatch {
min_sequence_number: seq_num1,
max_sequence_number: seq_num1,
data: mutable_batch1,
};
let record_batch1 = buffer_batch1.data.to_arrow(Selection::All).unwrap();
data_buffer.buffer = Some(buffer_batch1);
data_buffer.generate_snapshot().unwrap();
assert!(data_buffer.buffer.is_none());
assert_eq!(data_buffer.snapshots.len(), 1);
let snapshot = &data_buffer.snapshots[0];
assert_eq!(snapshot.min_sequence_number, seq_num1);
assert_eq!(snapshot.max_sequence_number, seq_num1);
assert_eq!(&*snapshot.data, &record_batch1);
Some(p)
}
}
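The new `DataBuffer` drives a single-variant FSM (`FsmState::Buffering`) through `AlwaysSome::mutate`, so every operation takes the state by value and must hand a state back, and `into_persisting()` consumes the buffer entirely. Below is a standalone sketch of that take-by-value pattern, with a plain `Option` standing in for `AlwaysSome` and `Vec<u64>` rows standing in for mutable batches; it is not the crate's real API.

```rust
// A standalone sketch of the "take the FSM by value, return the next state"
// pattern used by the new DataBuffer. Simplified stand-in types only.
#[derive(Debug, Default)]
struct Buffering {
    rows: Vec<u64>,
}

#[derive(Debug)]
struct Persisting {
    rows: Vec<u64>,
}

#[derive(Debug)]
enum FsmState {
    Buffering(Buffering),
}

impl Default for FsmState {
    fn default() -> Self {
        Self::Buffering(Buffering::default())
    }
}

#[derive(Debug, Default)]
struct DataBuffer(Option<FsmState>); // Option plays the role of AlwaysSome

impl DataBuffer {
    // Take ownership of the state, apply `f`, and always put a state back.
    fn mutate<R>(&mut self, f: impl FnOnce(FsmState) -> (FsmState, R)) -> R {
        let state = self.0.take().unwrap_or_default();
        let (state, ret) = f(state);
        self.0 = Some(state);
        ret
    }

    fn buffer_write(&mut self, row: u64) {
        self.mutate(|fsm| match fsm {
            FsmState::Buffering(mut b) => {
                b.rows.push(row);
                (FsmState::Buffering(b), ())
            }
        })
    }

    // Terminal transition: consumes the buffer, yielding None if it is empty.
    fn into_persisting(self) -> Option<Persisting> {
        match self.0.unwrap_or_default() {
            FsmState::Buffering(b) if b.rows.is_empty() => None,
            FsmState::Buffering(b) => Some(Persisting { rows: b.rows }),
        }
    }
}

fn main() {
    let mut buf = DataBuffer::default();
    buf.buffer_write(1);
    buf.buffer_write(2);
    let persisting = buf.into_persisting().expect("buffer had data");
    assert_eq!(persisting.rows, vec![1, 2]);
}
```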


@ -1,31 +1,8 @@
//! A helper type that ensures an `Option` is always `Some` once the guard is
//! dropped.
/// A guard through which a value can be placed back into the [`AlwaysSome`].
#[derive(Debug)]
#[must_use = "Guard must be used to restore the value"]
pub(super) struct Guard<'a, T>(&'a mut Option<T>);
impl<'a, T> Guard<'a, T> {
/// Store `value` in the [`AlwaysSome`] for subsequent
/// [`AlwaysSome::take()`] calls.
pub(super) fn store(self, value: T) {
assert!(self.0.is_none());
*self.0 = Some(value);
}
}
/// A helper type that aims to ease working with an [`Option`] that must always
/// be restored in a given scope.
///
/// Accessing the value within an [`AlwaysSome`] returns a [`Guard`], which MUST
/// be used to store the value before going out of scope. Failure to store a
/// value cause a subsequent [`Self::take()`] call to panic.
///
/// Failing to store a value in the [`Guard`] causes a compiler warning, however
/// this does not prevent failing to return a value to the [`AlwaysSome`] as the
/// warning can be falsely silenced by using it within one conditional code path
/// and not the other.
/// A helper type that aims to ease calling methods on a type that takes `self`,
/// that must always be restored at the end of the method call.
#[derive(Debug)]
pub(super) struct AlwaysSome<T>(Option<T>);
@ -52,14 +29,14 @@ impl<T> AlwaysSome<T> {
Self(Some(value))
}
/// Read the value.
pub(super) fn take(&mut self) -> (Guard<'_, T>, T) {
pub(super) fn mutate<F, R>(&mut self, f: F) -> R
where
F: FnOnce(T) -> (T, R),
{
let value = std::mem::take(&mut self.0);
(
Guard(&mut self.0),
value.expect("AlwaysSome value is None!"),
)
let (value, ret) = f(value.expect("AlwaysSome value is None!"));
self.0 = Some(value);
ret
}
/// Deconstruct `self`, returning the inner value.
@ -76,24 +53,18 @@ mod tests {
fn test_always_some() {
let mut a = AlwaysSome::<usize>::default();
let (guard, value) = a.take();
assert_eq!(value, 0);
guard.store(42);
let ret = a.mutate(|value| {
assert_eq!(value, 0);
(42, true)
});
assert!(ret);
let (guard, value) = a.take();
assert_eq!(value, 42);
guard.store(24);
let ret = a.mutate(|value| {
assert_eq!(value, 42);
(13, "bananas")
});
assert_eq!(ret, "bananas");
assert_eq!(a.into_inner(), 24);
}
#[test]
#[should_panic = "AlwaysSome value is None!"]
fn test_drops_guard() {
let mut a = AlwaysSome::<usize>::default();
{
let _ = a.take();
}
let _ = a.take();
assert_eq!(a.into_inner(), 13);
}
}
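For comparison with the `take()`/`Guard` API removed above, here is a minimal self-contained version of `AlwaysSome` after the change: because the closure passed to `mutate` must return a value of type `T`, the slot cannot be left empty by accident, which is what the deleted `test_drops_guard` test previously had to check at runtime.

```rust
// A minimal sketch of AlwaysSome after this change. The closure must return
// the (possibly replaced) value, so the Option is always restored.
#[derive(Debug, Default)]
struct AlwaysSome<T>(Option<T>);

impl<T> AlwaysSome<T> {
    fn mutate<F, R>(&mut self, f: F) -> R
    where
        F: FnOnce(T) -> (T, R),
    {
        let value = self.0.take().expect("AlwaysSome value is None!");
        let (value, ret) = f(value);
        self.0 = Some(value);
        ret
    }

    fn into_inner(self) -> T {
        self.0.expect("AlwaysSome value is None!")
    }
}

fn main() {
    let mut a = AlwaysSome(Some(String::from("hello")));
    let len = a.mutate(|s| {
        let len = s.len();
        (s + ", world", len) // the new value AND a return value
    });
    assert_eq!(len, 5);
    assert_eq!(a.into_inner(), "hello, world");
}
```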


@ -31,7 +31,7 @@ pub(crate) enum Transition<A, B> {
impl<A, B> Transition<A, B> {
/// A helper function to construct [`Self::Ok`] variants.
pub(super) fn ok(v: A, sequence_range: SequenceNumberRange) -> Transition<A, B> {
pub(super) fn ok(v: A, sequence_range: SequenceNumberRange) -> Self {
Self::Ok(BufferState {
state: v,
sequence_range,
@ -39,7 +39,7 @@ impl<A, B> Transition<A, B> {
}
/// A helper function to construct [`Self::Unchanged`] variants.
pub(super) fn unchanged(v: BufferState<B>) -> Transition<A, B> {
pub(super) fn unchanged(v: BufferState<B>) -> Self {
Self::Unchanged(v)
}
}
@ -164,7 +164,7 @@ mod tests {
// Keep the data to validate they are ref-counted copies after further
// writes below. Note this construct allows the caller to decide when/if
// to allocate.
let w1_data = buffer.get_query_data().to_owned();
let w1_data = buffer.get_query_data();
let expected = vec![
"+-------+----------+----------+--------------------------------+",
@ -193,7 +193,7 @@ mod tests {
};
// Verify the writes are still queryable.
let w2_data = buffer.get_query_data().to_owned();
let w2_data = buffer.get_query_data();
let expected = vec![
"+-------+----------+----------+--------------------------------+",
"| great | how_much | tag | time |",
@ -214,7 +214,7 @@ mod tests {
let same_arcs = w2_data
.iter()
.zip(second_read.iter())
.all(|(a, b)| Arc::ptr_eq(a, &b));
.all(|(a, b)| Arc::ptr_eq(a, b));
assert!(same_arcs);
}
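The `Transition::ok`/`Transition::unchanged` helpers touched above let a state change report either the next state or "no change" (for example, snapshotting an empty buffer). A small sketch of that two-outcome pattern follows, with stand-in `Buffering`/`Snapshot` states rather than the crate's real `BufferState`.

```rust
// A sketch of the Transition pattern: a state change either produces the next
// state (`Ok`) or hands the original state back (`Unchanged`).
#[derive(Debug)]
enum Transition<A, B> {
    Ok(A),
    Unchanged(B),
}

#[derive(Debug)]
struct Buffering {
    rows: Vec<u64>,
}

#[derive(Debug)]
struct Snapshot {
    rows: Vec<u64>,
}

impl Buffering {
    fn snapshot(self) -> Transition<Snapshot, Buffering> {
        if self.rows.is_empty() {
            // Nothing buffered: the caller keeps the Buffering state.
            Transition::Unchanged(self)
        } else {
            Transition::Ok(Snapshot { rows: self.rows })
        }
    }
}

fn main() {
    match (Buffering { rows: vec![] }).snapshot() {
        Transition::Unchanged(b) => assert!(b.rows.is_empty()),
        Transition::Ok(_) => unreachable!("empty buffer must not snapshot"),
    }
    match (Buffering { rows: vec![1, 2] }).snapshot() {
        Transition::Ok(s) => assert_eq!(s.rows, vec![1, 2]),
        Transition::Unchanged(_) => unreachable!(),
    }
}
```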


@ -28,7 +28,7 @@ pub(crate) struct Buffering {
///
/// In the future this [`Queryable`] should NOT be implemented for
/// [`Buffering`], and instead snapshots should be incrementally generated and
/// compacted. See https://github.com/influxdata/influxdb_iox/issues/5805 for
/// compacted. See <https://github.com/influxdata/influxdb_iox/issues/5805> for
/// context.
impl Queryable for Buffering {
fn get_query_data(&self) -> Vec<Arc<RecordBatch>> {


@ -157,7 +157,7 @@ mod tests {
.repositories()
.await
.partitions()
.get_by_id(got.id)
.get_by_id(got.partition_id)
.await
.unwrap()
.expect("partition not created");


@ -39,7 +39,7 @@ impl SequenceNumberRange {
let merged_range = self
.range
.into_iter()
.chain(other.range.clone())
.chain(other.range)
.reduce(|a, b| (a.0.min(b.0), a.1.max(b.1)));
Self {
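The `merge` logic in this hunk chains two optional ranges and reduces them to the overall minimum and maximum. A tiny standalone equivalent over plain `u64` pairs (standing in for `SequenceNumber`):

```rust
// Merge two optional (min, max) ranges by taking the min of mins and the
// max of maxes, mirroring the reduce in the hunk above.
fn merge(a: Option<(u64, u64)>, b: Option<(u64, u64)>) -> Option<(u64, u64)> {
    a.into_iter()
        .chain(b)
        .reduce(|x, y| (x.0.min(y.0), x.1.max(y.1)))
}

fn main() {
    assert_eq!(merge(Some((3, 7)), Some((5, 12))), Some((3, 12)));
    assert_eq!(merge(None, Some((5, 12))), Some((5, 12)));
    assert_eq!(merge(None, None), None);
}
```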


@ -7,8 +7,11 @@ use mutable_batch::MutableBatch;
use observability_deps::tracing::*;
use write_summary::ShardProgress;
use super::partition::{resolver::PartitionProvider, PartitionData, UnpersistedPartitionData};
use crate::{data::DmlApplyAction, lifecycle::LifecycleHandle, querier_handler::PartitionStatus};
use super::{
partition::{resolver::PartitionProvider, PartitionData},
DmlApplyAction,
};
use crate::lifecycle::LifecycleHandle;
/// A double-referenced map where [`PartitionData`] can be looked up by
/// [`PartitionKey`], or ID.
@ -72,6 +75,12 @@ impl std::ops::Deref for TableName {
}
}
impl PartialEq<str> for TableName {
fn eq(&self, other: &str) -> bool {
&*self.0 == other
}
}
/// Data of a Table in a given Namespace that belongs to a given Shard
#[derive(Debug)]
pub(crate) struct TableData {
@ -119,16 +128,6 @@ impl TableData {
}
}
/// Return parquet_max_sequence_number
pub(super) fn parquet_max_sequence_number(&self) -> Option<SequenceNumber> {
self.partition_data
.by_key
.values()
.map(|p| p.max_persisted_sequence_number())
.max()
.flatten()
}
// buffers the table write and returns true if the lifecycle manager indicates that
// ingest should be paused.
pub(super) async fn buffer_table_write(
@ -171,7 +170,7 @@ impl TableData {
let size = batch.size();
let rows = batch.rows();
partition_data.buffer_write(sequence_number, batch)?;
partition_data.buffer_write(batch, sequence_number)?;
// Record the write as having been buffered.
//
@ -191,6 +190,18 @@ impl TableData {
Ok(DmlApplyAction::Applied(should_pause))
}
/// Return a mutable reference to all partitions buffered for this table.
///
/// # Ordering
///
/// The order of [`PartitionData`] in the iterator is arbitrary and should
/// not be relied upon.
pub(crate) fn partition_iter_mut(
&mut self,
) -> impl Iterator<Item = &mut PartitionData> + ExactSizeIterator {
self.partition_data.by_key.values_mut()
}
/// Return the [`PartitionData`] for the specified ID.
#[allow(unused)]
pub(crate) fn get_partition(
@ -209,43 +220,12 @@ impl TableData {
self.partition_data.by_key(partition_key)
}
/// Return the [`PartitionData`] for the specified partition key.
pub(crate) fn get_partition_by_key_mut(
&mut self,
partition_key: &PartitionKey,
) -> Option<&mut PartitionData> {
self.partition_data.by_key_mut(partition_key)
}
pub(crate) fn unpersisted_partition_data(&self) -> Vec<UnpersistedPartitionData> {
self.partition_data
.by_key
.values()
.map(|p| UnpersistedPartitionData {
partition_id: p.partition_id(),
non_persisted: p
.get_non_persisting_data()
.expect("get_non_persisting should always work"),
persisting: p.get_persisting_data(),
partition_status: PartitionStatus {
parquet_max_sequence_number: p.max_persisted_sequence_number(),
},
})
.collect()
}
/// Return progress from this Table
pub(super) fn progress(&self) -> ShardProgress {
let progress = ShardProgress::new();
let progress = match self.parquet_max_sequence_number() {
Some(n) => progress.with_persisted(n),
None => progress,
};
self.partition_data
.by_key
.values()
.fold(progress, |progress, partition_data| {
.fold(Default::default(), |progress, partition_data| {
progress.combine(partition_data.progress())
})
}
@ -259,6 +239,16 @@ impl TableData {
pub(crate) fn table_name(&self) -> &TableName {
&self.table_name
}
/// Return the shard ID for this table.
pub(crate) fn shard_id(&self) -> ShardId {
self.shard_id
}
/// Return the [`NamespaceId`] this table is a part of.
pub fn namespace_id(&self) -> NamespaceId {
self.namespace_id
}
}
#[cfg(test)]


@ -24,7 +24,7 @@ mod job;
pub mod lifecycle;
mod poison;
pub mod querier_handler;
pub(crate) mod query;
pub(crate) mod query_adaptor;
pub mod server;
pub(crate) mod stream_handler;


@ -1,12 +1,7 @@
//! Handle all requests from Querier
use crate::{
data::{
namespace::NamespaceName, partition::UnpersistedPartitionData, table::TableName,
IngesterData,
},
query::QueryableBatch,
};
use std::{pin::Pin, sync::Arc};
use arrow::{array::new_null_array, error::ArrowError, record_batch::RecordBatch};
use arrow_util::optimize::{optimize_record_batch, optimize_schema};
use data_types::{PartitionId, SequenceNumber};
@ -17,9 +12,10 @@ use generated_types::ingester::IngesterQueryRequest;
use observability_deps::tracing::debug;
use schema::{merge::SchemaMerger, selection::Selection};
use snafu::{ensure, Snafu};
use std::{pin::Pin, sync::Arc};
use trace::span::{Span, SpanRecorder};
use crate::data::{namespace::NamespaceName, table::TableName, IngesterData};
/// Number of table data read locks that shall be acquired in parallel
const CONCURRENT_TABLE_DATA_LOCKS: usize = 10;
@ -264,10 +260,11 @@ pub async fn prepare_data_to_querier(
) -> Result<IngesterQueryResponse> {
debug!(?request, "prepare_data_to_querier");
let span_recorder = SpanRecorder::new(span);
let mut span_recorder = SpanRecorder::new(span);
let mut tables_data = vec![];
let mut table_refs = vec![];
let mut found_namespace = false;
for (shard_id, shard_data) in ingest_data.shards() {
debug!(shard_id=%shard_id.get());
let namespace_name = NamespaceName::from(&request.namespace);
@ -293,7 +290,7 @@ pub async fn prepare_data_to_querier(
}
};
tables_data.push(table_data);
table_refs.push(table_data);
}
ensure!(
@ -303,113 +300,83 @@ pub async fn prepare_data_to_querier(
},
);
// acquire locks in parallel
let unpersisted_partitions: Vec<_> = futures::stream::iter(tables_data)
.map(|table_data| async move {
let table_data = table_data.read().await;
table_data.unpersisted_partition_data()
})
// Note: the order doesn't matter
.buffer_unordered(CONCURRENT_TABLE_DATA_LOCKS)
.concat()
.await;
ensure!(
!unpersisted_partitions.is_empty(),
!table_refs.is_empty(),
TableNotFoundSnafu {
namespace_name: &request.namespace,
table_name: &request.table
},
);
let request = Arc::clone(request);
let partitions =
futures::stream::iter(unpersisted_partitions.into_iter().map(move |partition| {
// extract payload
let partition_id = partition.partition_id;
let status = partition.partition_status.clone();
let snapshots: Vec<_> = prepare_data_to_querier_for_partition(
partition,
&request,
span_recorder.child_span("ingester prepare data to querier for partition"),
)
.into_iter()
.map(Ok)
.collect();
// Note: include partition in `unpersisted_partitions` even when there we might filter
// out all the data, because the metadata (e.g. max persisted parquet file) is
// important for the querier.
Ok(IngesterQueryPartition::new(
Box::pin(futures::stream::iter(snapshots)),
partition_id,
status,
))
}));
Ok(IngesterQueryResponse::new(Box::pin(partitions)))
}
fn prepare_data_to_querier_for_partition(
unpersisted_partition_data: UnpersistedPartitionData,
request: &IngesterQueryRequest,
span: Option<Span>,
) -> Vec<SendableRecordBatchStream> {
let mut span_recorder = SpanRecorder::new(span);
// ------------------------------------------------
// Accumulate data
// Make Filters
let selection_columns: Vec<_> = request.columns.iter().map(String::as_str).collect();
let selection = if selection_columns.is_empty() {
Selection::All
} else {
Selection::Some(&selection_columns)
};
// figure out what batches
let queryable_batch = unpersisted_partition_data
.persisting
.unwrap_or_else(|| {
QueryableBatch::new(
request.table.clone().into(),
unpersisted_partition_data.partition_id,
vec![],
)
// acquire locks and read table data in parallel
let unpersisted_partitions: Vec<_> = futures::stream::iter(table_refs)
.map(|table_data| async move {
let mut table_data = table_data.write().await;
table_data
.partition_iter_mut()
.map(|p| {
(
p.partition_id(),
p.get_query_data(),
p.max_persisted_sequence_number(),
)
})
.collect::<Vec<_>>()
})
.with_data(unpersisted_partition_data.non_persisted);
// Note: the order doesn't matter
.buffer_unordered(CONCURRENT_TABLE_DATA_LOCKS)
.concat()
.await;
let streams = queryable_batch
.data
.iter()
.map(|snapshot_batch| {
let batch = snapshot_batch.data.as_ref();
let schema = batch.schema();
let request = Arc::clone(request);
let partitions = futures::stream::iter(unpersisted_partitions.into_iter().map(
move |(partition_id, data, max_persisted_sequence_number)| {
let snapshots = match data {
None => Box::pin(futures::stream::empty()) as SnapshotStream,
// Apply selection to in-memory batch
let batch = match selection {
Selection::All => batch.clone(),
Selection::Some(columns) => {
let projection = columns
Some(batch) => {
assert_eq!(partition_id, batch.partition_id());
// Project the data if necessary
let columns = request
.columns
.iter()
.flat_map(|&column_name| {
// ignore non-existing columns
schema.index_of(column_name).ok()
})
.map(String::as_str)
.collect::<Vec<_>>();
batch.project(&projection).expect("bug in projection")
let selection = if columns.is_empty() {
Selection::All
} else {
Selection::Some(columns.as_ref())
};
let snapshots = batch.project_selection(selection).into_iter().map(|batch| {
// Create a stream from the batch.
Ok(Box::pin(MemoryStream::new(vec![batch])) as SendableRecordBatchStream)
});
Box::pin(futures::stream::iter(snapshots)) as SnapshotStream
}
};
// create stream
Box::pin(MemoryStream::new(vec![batch])) as SendableRecordBatchStream
})
.collect();
// NOTE: the partition persist watermark MUST always be provided to
// the querier for any partition that has performed (or is aware of)
// a persist operation.
//
// This allows the querier to use the per-partition persist marker
// when planning queries.
Ok(IngesterQueryPartition::new(
snapshots,
partition_id,
PartitionStatus {
parquet_max_sequence_number: max_persisted_sequence_number,
},
))
},
));
span_recorder.ok("done");
streams
Ok(IngesterQueryResponse::new(Box::pin(partitions)))
}
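The rewritten `prepare_data_to_querier` above takes each table's write lock in parallel, snapshots per-partition data, and then projects each batch to the requested columns, treating an empty column list as "select all" and silently ignoring columns a batch does not contain. The sketch below models only that selection step, with named `i64` columns standing in for Arrow record batches.

```rust
// A simplified sketch of the per-partition column selection above. A batch is
// modeled as named columns of i64s instead of an Arrow RecordBatch.
type Batch = Vec<(String, Vec<i64>)>;

fn project(batch: &Batch, requested: &[&str]) -> Batch {
    if requested.is_empty() {
        // Equivalent to Selection::All.
        return batch.clone();
    }
    // Equivalent to Selection::Some(..): keep only requested, existing columns.
    batch
        .iter()
        .filter(|(name, _)| requested.contains(&name.as_str()))
        .cloned()
        .collect()
}

fn main() {
    let batch: Batch = vec![
        ("tag1".to_string(), vec![1, 2]),
        ("field_int".to_string(), vec![10, 20]),
        ("time".to_string(), vec![100, 200]),
    ];
    // The request names a column that does not exist; it is ignored.
    let projected = project(&batch, &["tag1", "does_not_exist", "time"]);
    let names: Vec<_> = projected.iter().map(|(n, _)| n.as_str()).collect();
    assert_eq!(names, vec!["tag1", "time"]);
}
```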
#[cfg(test)]
@ -427,7 +394,7 @@ mod tests {
use predicate::Predicate;
use super::*;
use crate::test_util::{make_ingester_data, DataLocation, TEST_NAMESPACE, TEST_TABLE};
use crate::test_util::{make_ingester_data, TEST_NAMESPACE, TEST_TABLE};
#[tokio::test]
async fn test_ingester_query_response_flatten() {
@ -517,23 +484,11 @@ mod tests {
async fn test_prepare_data_to_querier() {
test_helpers::maybe_start_logging();
let span = None;
// make 14 scenarios for ingester data
let mut scenarios = vec![];
for two_partitions in [false, true] {
for loc in [
DataLocation::BUFFER,
DataLocation::BUFFER_SNAPSHOT,
DataLocation::BUFFER_PERSISTING,
DataLocation::BUFFER_SNAPSHOT_PERSISTING,
DataLocation::SNAPSHOT,
DataLocation::SNAPSHOT_PERSISTING,
DataLocation::PERSISTING,
] {
let scenario = Arc::new(make_ingester_data(two_partitions, loc).await);
scenarios.push((loc, scenario));
}
let scenario = Arc::new(make_ingester_data(two_partitions).await);
scenarios.push(scenario);
}
// read data from all scenarios without any filters
@ -557,9 +512,8 @@ mod tests {
"| Wilmington | mon | | 1970-01-01T00:00:00.000000035Z |", // in group 3 - seq_num: 6
"+------------+-----+------+--------------------------------+",
];
for (loc, scenario) in &scenarios {
println!("Location: {loc:?}");
let result = prepare_data_to_querier(scenario, &request, span.clone())
for scenario in &scenarios {
let result = prepare_data_to_querier(scenario, &request, None)
.await
.unwrap()
.into_record_batches()
@ -593,9 +547,8 @@ mod tests {
"| Wilmington | | 1970-01-01T00:00:00.000000035Z |",
"+------------+------+--------------------------------+",
];
for (loc, scenario) in &scenarios {
println!("Location: {loc:?}");
let result = prepare_data_to_querier(scenario, &request, span.clone())
for scenario in &scenarios {
let result = prepare_data_to_querier(scenario, &request, None)
.await
.unwrap()
.into_record_batches()
@ -638,9 +591,8 @@ mod tests {
"| Wilmington | | 1970-01-01T00:00:00.000000035Z |",
"+------------+------+--------------------------------+",
];
for (loc, scenario) in &scenarios {
println!("Location: {loc:?}");
let result = prepare_data_to_querier(scenario, &request, span.clone())
for scenario in &scenarios {
let result = prepare_data_to_querier(scenario, &request, None)
.await
.unwrap()
.into_record_batches()
@ -655,9 +607,8 @@ mod tests {
vec![],
None,
));
for (loc, scenario) in &scenarios {
println!("Location: {loc:?}");
let err = prepare_data_to_querier(scenario, &request, span.clone())
for scenario in &scenarios {
let err = prepare_data_to_querier(scenario, &request, None)
.await
.unwrap_err();
assert_matches!(err, Error::TableNotFound { .. });
@ -670,9 +621,8 @@ mod tests {
vec![],
None,
));
for (loc, scenario) in &scenarios {
println!("Location: {loc:?}");
let err = prepare_data_to_querier(scenario, &request, span.clone())
for scenario in &scenarios {
let err = prepare_data_to_querier(scenario, &request, None)
.await
.unwrap_err();
assert_matches!(err, Error::NamespaceNotFound { .. });

View File

@ -1,12 +1,12 @@
//! Module to handle query on Ingester's data
//! An adaptor over a set of [`RecordBatch`] allowing them to be used as an IOx
//! [`QueryChunk`].
use std::{any::Any, sync::Arc};
use arrow::record_batch::RecordBatch;
use arrow_util::util::ensure_schema;
use data_types::{
ChunkId, ChunkOrder, DeletePredicate, PartitionId, SequenceNumber, TableSummary,
TimestampMinMax,
ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary, TimestampMinMax,
};
use datafusion::{
error::DataFusionError,
@ -21,11 +21,12 @@ use iox_query::{
QueryChunk, QueryChunkMeta,
};
use observability_deps::tracing::trace;
use once_cell::sync::OnceCell;
use predicate::Predicate;
use schema::{merge::merge_record_batch_schemas, selection::Selection, sort::SortKey, Schema};
use snafu::{ResultExt, Snafu};
use crate::data::{partition::SnapshotBatch, table::TableName};
use crate::data::table::TableName;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Snafu)]
@ -47,72 +48,106 @@ pub enum Error {
/// A specialized `Error` for Ingester's Query errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Queryable data used for both query and persistence
/// A queryable wrapper over a set of ordered [`RecordBatch`] snapshot from a
/// single [`PartitionData`].
///
/// It is an invariant that a [`QueryAdaptor`] MUST always contain at least one
/// row. This frees the caller from having to reason about empty [`QueryAdaptor`]
/// instances yielding empty [`RecordBatch`].
///
/// [`PartitionData`]: crate::data::partition::PartitionData
#[derive(Debug, PartialEq, Clone)]
pub(crate) struct QueryableBatch {
/// data
pub(crate) data: Vec<Arc<SnapshotBatch>>,
pub(crate) struct QueryAdaptor {
/// The snapshot data from a partition.
///
/// This MUST be non-pub / closed for modification / immutable to support
/// interning the merged schema in [`Self::schema()`].
data: Vec<Arc<RecordBatch>>,
/// This is needed to return a reference for a trait function
pub(crate) table_name: TableName,
/// The name of the table this data is part of.
table_name: TableName,
/// Partition ID
pub(crate) partition_id: PartitionId,
/// The catalog ID of the partition this data is part of.
partition_id: PartitionId,
/// An interned schema for all [`RecordBatch`] in data.
schema: OnceCell<Arc<Schema>>,
}
impl QueryableBatch {
/// Initilaize a QueryableBatch
impl QueryAdaptor {
/// Construct a [`QueryAdaptor`].
///
/// # Panics
///
/// This constructor panics if `data` contains no [`RecordBatch`], or all
/// [`RecordBatch`] are empty.
pub(crate) fn new(
table_name: TableName,
partition_id: PartitionId,
data: Vec<Arc<SnapshotBatch>>,
data: Vec<Arc<RecordBatch>>,
) -> Self {
// There must always be at least one record batch and one row.
//
// This upholds an invariant that simplifies dealing with empty
// partitions - if there is a QueryAdaptor, it contains data.
assert!(data.iter().map(|b| b.num_rows()).sum::<usize>() > 0);
Self {
data,
table_name,
partition_id,
schema: OnceCell::default(),
}
}
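The assert above enforces the documented invariant: an adaptor can only be built when its batches hold at least one row, so callers never have to handle an "empty adaptor". A small, std-only sketch of that check; Batch and Adaptor are illustrative stand-ins, not the real ingester types.

/// Illustrative stand-in for one RecordBatch: only its row count matters here.
struct Batch {
    rows: usize,
}

/// Illustrative stand-in for QueryAdaptor holding the snapshot batches.
struct Adaptor {
    data: Vec<Batch>,
}

impl Adaptor {
    /// Panics when `data` holds no rows at all, mirroring the assert above:
    /// if an adaptor exists, it is guaranteed to contain data.
    fn new(data: Vec<Batch>) -> Self {
        assert!(data.iter().map(|b| b.rows).sum::<usize>() > 0);
        Self { data }
    }

    fn num_batches(&self) -> usize {
        self.data.len()
    }
}

fn main() {
    // Fine: at least one row somewhere across the batches.
    let ok = Adaptor::new(vec![Batch { rows: 0 }, Batch { rows: 3 }]);
    assert_eq!(ok.num_batches(), 2);

    // All-empty input violates the invariant and panics.
    let all_empty = std::panic::catch_unwind(|| Adaptor::new(vec![Batch { rows: 0 }]));
    assert!(all_empty.is_err());
}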
/// Add snapshots to this batch
pub(crate) fn with_data(mut self, mut data: Vec<Arc<SnapshotBatch>>) -> Self {
self.data.append(&mut data);
self
}
pub(crate) fn project_selection(&self, selection: Selection<'_>) -> Vec<RecordBatch> {
// Project the column selection across all RecordBatch
self.data
.iter()
.map(|data| {
let batch = data.as_ref();
let schema = batch.schema();
// Apply selection to in-memory batch
match selection {
Selection::All => batch.clone(),
Selection::Some(columns) => {
let projection = columns
.iter()
.flat_map(|&column_name| {
// ignore non-existing columns
schema.index_of(column_name).ok()
})
.collect::<Vec<_>>();
batch.project(&projection).expect("bug in projection")
}
}
})
.collect()
}
/// return min and max of all the snapshots
pub(crate) fn min_max_sequence_numbers(&self) -> (SequenceNumber, SequenceNumber) {
let min = self
.data
.first()
.expect("The Queryable Batch should not empty")
.min_sequence_number;
let max = self
.data
.first()
.expect("The Queryable Batch should not empty")
.max_sequence_number;
assert!(min <= max);
(min, max)
}
/// Returns the [`RecordBatch`] instances in this [`QueryAdaptor`].
pub(crate) fn record_batches(&self) -> &[Arc<RecordBatch>] {
self.data.as_ref()
}
/// Returns the partition ID from which the data in this [`QueryAdaptor`]
/// was sourced.
pub(crate) fn partition_id(&self) -> PartitionId {
self.partition_id
}
}
impl QueryChunkMeta for QueryableBatch {
impl QueryChunkMeta for QueryAdaptor {
fn summary(&self) -> Option<Arc<TableSummary>> {
None
}
fn schema(&self) -> Arc<Schema> {
// TODO: may want store this schema as a field of QueryableBatch and
// only do this schema merge the first time it is called
// Merge schema of all RecordBatches of the PersistingBatch
let batches: Vec<Arc<RecordBatch>> =
self.data.iter().map(|s| Arc::clone(&s.data)).collect();
merge_record_batch_schemas(&batches)
Arc::clone(
self.schema
.get_or_init(|| merge_record_batch_schemas(&self.data)),
)
}
fn partition_sort_key(&self) -> Option<&SortKey> {
@ -139,11 +174,11 @@ impl QueryChunkMeta for QueryableBatch {
}
}
impl QueryChunk for QueryableBatch {
impl QueryChunk for QueryAdaptor {
// This function should not be used in QueryBatch context
fn id(&self) -> ChunkId {
// To return a value for debugging and make it consistent with ChunkId created in Compactor,
// use Uuid for this
// To return a value for debugging and make it consistent with ChunkId
// created in Compactor, use Uuid for this
ChunkId::new()
}
@ -152,10 +187,11 @@ impl QueryChunk for QueryableBatch {
&self.table_name
}
/// Returns true if the chunk may contain a duplicate "primary
/// key" within itself
/// Returns true if the chunk may contain a duplicate "primary key" within
/// itself
fn may_contain_pk_duplicates(&self) -> bool {
// always true because they are not deduplicated yet
// always true because the rows across record batches have not been
// de-duplicated.
true
}
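Because the ingester chunk always reports possible primary-key duplicates, de-duplication is left to the query layer (see the DeduplicateExec re-export at the end of this diff). The sketch below is a loose, std-only illustration of the last-write-wins idea only; the real operator works on sorted Arrow record batches and is not reproduced here, and Row is an invented stand-in type.

use std::collections::BTreeMap;

/// One row: a primary key (tag + timestamp stands in for the full key) plus a field value.
#[derive(Debug, Clone, PartialEq)]
struct Row {
    key: (String, i64),
    value: f64,
}

/// Later batches override earlier ones for the same primary key,
/// a last-write-wins style of de-duplication.
fn deduplicate(batches: &[Vec<Row>]) -> Vec<Row> {
    let mut latest: BTreeMap<(String, i64), f64> = BTreeMap::new();
    for batch in batches {
        for row in batch {
            latest.insert(row.key.clone(), row.value);
        }
    }
    latest
        .into_iter()
        .map(|(key, value)| Row { key, value })
        .collect()
}

fn main() {
    let batches = vec![
        vec![Row { key: ("cpu".into(), 10), value: 1.0 }],
        vec![
            Row { key: ("cpu".into(), 10), value: 2.0 }, // duplicate key, newer value
            Row { key: ("cpu".into(), 20), value: 3.0 },
        ],
    ];
    let rows = deduplicate(&batches);
    assert_eq!(rows.len(), 2);
    assert_eq!(rows[0].value, 2.0); // last write wins for (cpu, 10)
}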
@ -204,22 +240,15 @@ impl QueryChunk for QueryableBatch {
.context(SchemaSnafu)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
// Get all record batches from their snapshots
// Apply the projection over all the data in self, ensuring each batch
// has the specified schema.
let batches = self
.data
.iter()
.filter_map(|snapshot| {
let batch = snapshot
// Only return columns in the selection
.scan(selection)
.context(FilterColumnsSnafu {})
.transpose()?
// ensure batch has desired schema
.and_then(|batch| {
ensure_schema(&schema.as_arrow(), &batch).context(ConcatBatchesSnafu {})
})
.map(Arc::new);
Some(batch)
.project_selection(selection)
.into_iter()
.map(|batch| {
ensure_schema(&schema.as_arrow(), &batch)
.context(ConcatBatchesSnafu {})
.map(Arc::new)
})
.collect::<Result<Vec<_>, _>>()
.map_err(|e| DataFusionError::External(Box::new(e)))?;
@ -233,10 +262,9 @@ impl QueryChunk for QueryableBatch {
/// Returns chunk type
fn chunk_type(&self) -> &str {
"PersistingBatch"
"QueryAdaptor"
}
// This function should not be used in PersistingBatch context
fn order(&self) -> ChunkOrder {
unimplemented!()
}
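The schema() implementation above interns the merged schema in a OnceCell, so the merge across all record batches happens at most once per QueryAdaptor and later callers share the same Arc. A minimal sketch of that pattern using once_cell (already a dependency of this crate); the "schema" is modelled as a set of column names purely for illustration, and Adaptor is not the real type.

use std::collections::BTreeSet;
use std::sync::Arc;

use once_cell::sync::OnceCell;

struct Adaptor {
    /// Each inner vec stands in for the column names of one RecordBatch.
    data: Vec<Vec<String>>,
    /// Interned merged schema, computed on first access.
    schema: OnceCell<Arc<BTreeSet<String>>>,
}

impl Adaptor {
    fn new(data: Vec<Vec<String>>) -> Self {
        Self {
            data,
            schema: OnceCell::new(),
        }
    }

    /// Merge the per-batch column sets exactly once; later calls return the
    /// same interned Arc.
    fn schema(&self) -> Arc<BTreeSet<String>> {
        Arc::clone(
            self.schema
                .get_or_init(|| Arc::new(self.data.iter().flatten().cloned().collect())),
        )
    }
}

fn main() {
    let a = Adaptor::new(vec![
        vec!["time".to_string(), "temp".to_string()],
        vec!["time".to_string(), "city".to_string()],
    ]);
    let s1 = a.schema();
    let s2 = a.schema();
    // The same interned value is handed out on every call.
    assert!(Arc::ptr_eq(&s1, &s2));
    assert_eq!(s1.len(), 3); // city, temp, time
}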

View File

@ -7,9 +7,8 @@ use std::{sync::Arc, time::Duration};
use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_eq;
use bitflags::bitflags;
use data_types::{
NamespaceId, PartitionId, PartitionKey, Sequence, SequenceNumber, ShardId, ShardIndex, TableId,
NamespaceId, PartitionKey, Sequence, SequenceNumber, ShardId, ShardIndex, TableId,
};
use dml::{DmlMeta, DmlOperation, DmlWrite};
use iox_catalog::{interface::Catalog, mem::MemCatalog};
@ -17,71 +16,12 @@ use iox_query::test::{raw_data, TestChunk};
use iox_time::{SystemProvider, Time};
use mutable_batch_lp::lines_to_batches;
use object_store::memory::InMemory;
use uuid::Uuid;
use crate::{
data::{
partition::{resolver::CatalogPartitionResolver, PersistingBatch, SnapshotBatch},
IngesterData,
},
data::{partition::resolver::CatalogPartitionResolver, IngesterData},
lifecycle::{LifecycleConfig, LifecycleManager},
query::QueryableBatch,
};
#[allow(clippy::too_many_arguments)]
pub(crate) fn make_persisting_batch(
shard_id: i64,
seq_num_start: i64,
table_id: i64,
table_name: &str,
partition_id: i64,
object_store_id: Uuid,
batches: Vec<Arc<RecordBatch>>,
) -> Arc<PersistingBatch> {
let queryable_batch = make_queryable_batch(table_name, partition_id, seq_num_start, batches);
Arc::new(PersistingBatch {
shard_id: ShardId::new(shard_id),
table_id: TableId::new(table_id),
partition_id: PartitionId::new(partition_id),
object_store_id,
data: queryable_batch,
})
}
pub(crate) fn make_queryable_batch(
table_name: &str,
partition_id: i64,
seq_num_start: i64,
batches: Vec<Arc<RecordBatch>>,
) -> Arc<QueryableBatch> {
// make snapshots for the batches
let mut snapshots = vec![];
let mut seq_num = seq_num_start;
for batch in batches {
let seq = SequenceNumber::new(seq_num);
snapshots.push(Arc::new(make_snapshot_batch(batch, seq, seq)));
seq_num += 1;
}
Arc::new(QueryableBatch::new(
table_name.into(),
PartitionId::new(partition_id),
snapshots,
))
}
pub(crate) fn make_snapshot_batch(
batch: Arc<RecordBatch>,
min: SequenceNumber,
max: SequenceNumber,
) -> SnapshotBatch {
SnapshotBatch {
min_sequence_number: min,
max_sequence_number: max,
data: batch,
}
}
pub(crate) async fn create_one_row_record_batch_with_influxtype() -> Vec<Arc<RecordBatch>> {
let chunk1 = Arc::new(
TestChunk::new("t")
@ -506,32 +446,9 @@ pub(crate) const TEST_TABLE: &str = "test_table";
pub(crate) const TEST_PARTITION_1: &str = "test+partition_1";
pub(crate) const TEST_PARTITION_2: &str = "test+partition_2";
bitflags! {
/// Make the same in-memory data but data are split between:
/// . one or two partition
/// . The first partition will have a choice to have data in either
/// . buffer only
/// . snapshot only
/// . persisting only
/// . buffer + snapshot
/// . buffer + persisting
/// . snapshot + persisting
/// . buffer + snapshot + persisting
/// . If the second partittion exists, it only has data in its buffer
pub(crate) struct DataLocation: u8 {
const BUFFER = 0b001;
const SNAPSHOT = 0b010;
const PERSISTING = 0b100;
const BUFFER_SNAPSHOT = Self::BUFFER.bits | Self::SNAPSHOT.bits;
const BUFFER_PERSISTING = Self::BUFFER.bits | Self::PERSISTING.bits;
const SNAPSHOT_PERSISTING = Self::SNAPSHOT.bits | Self::PERSISTING.bits;
const BUFFER_SNAPSHOT_PERSISTING = Self::BUFFER.bits | Self::SNAPSHOT.bits | Self::PERSISTING.bits;
}
}
/// This function produces one scenario but with the parameter combination (2*7),
/// you will be able to produce 14 scenarios by calling it in 2 loops
pub(crate) async fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterData {
pub(crate) async fn make_ingester_data(two_partitions: bool) -> IngesterData {
// Whatever data because they won't be used in the tests
let metrics: Arc<metric::Registry> = Default::default();
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
@ -576,26 +493,6 @@ pub(crate) async fn make_ingester_data(two_partitions: bool, loc: DataLocation)
.unwrap();
}
if loc.contains(DataLocation::PERSISTING) {
// Move partition 1 data to persisting
let _ignored = ingester
.shard(shard_id)
.unwrap()
.namespace(&TEST_NAMESPACE.into())
.unwrap()
.snapshot_to_persisting(&TEST_TABLE.into(), &PartitionKey::from(TEST_PARTITION_1))
.await;
} else if loc.contains(DataLocation::SNAPSHOT) {
// move partition 1 data to snapshot
let _ignored = ingester
.shard(shard_id)
.unwrap()
.namespace(&TEST_NAMESPACE.into())
.unwrap()
.snapshot(&TEST_TABLE.into(), &PartitionKey::from(TEST_PARTITION_1))
.await;
}
ingester
}

View File

@ -40,8 +40,7 @@ mod deduplicate;
pub mod overlap;
mod physical;
use self::overlap::group_potential_duplicates;
pub(crate) use deduplicate::DeduplicateExec;
pub use deduplicate::RecordBatchDeduplicator;
pub use deduplicate::{DeduplicateExec, RecordBatchDeduplicator};
pub(crate) use physical::IOxReadFilterNode;
#[derive(Debug, Snafu)]