Merge pull request #6322 from influxdata/dom/buffer-tree-iter
refactor(ingester2): BufferTree partition iterator

commit e234187a94
@@ -116,6 +116,15 @@ impl NamespaceData {
    pub(crate) fn namespace_name(&self) -> &DeferredLoad<NamespaceName> {
        &self.namespace_name
    }

    /// Obtain a snapshot of the tables within this [`NamespaceData`].
    ///
    /// NOTE: the snapshot is an atomic / point-in-time snapshot of the set of
    /// [`TableData`], but the tables (and partitions) within them may
    /// change as they continue to buffer DML operations.
    pub(super) fn tables(&self) -> Vec<Arc<TableData>> {
        self.tables.values()
    }
}

#[async_trait]
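A minimal usage sketch of the new NamespaceData::tables() snapshot (not part of the diff; ns is a hypothetical Arc<NamespaceData> handle). The returned Vec is an owned, point-in-time copy of the table set: tables created by later writes do not appear in it, while data buffered into the tables it already references stays visible through the shared Arcs.

    // Sketch only: `ns` is assumed to be an Arc<NamespaceData> obtained elsewhere.
    let snapshot: Vec<Arc<TableData>> = ns.tables();
    // Tables created after this point are absent from `snapshot`; the entries it
    // does hold keep reflecting newly buffered writes.
    for table in snapshot {
        // Recurse into the per-table partition snapshot (added further down in
        // this diff as TableData::partitions()).
        for partition in table.partitions() {
            let _guard = partition.lock();
        }
    }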
@@ -4,11 +4,12 @@ use async_trait::async_trait;
use data_types::{NamespaceId, TableId};
use dml::DmlOperation;
use metric::U64Counter;
use parking_lot::Mutex;
use trace::span::Span;

use super::{
    namespace::{name_resolver::NamespaceNameProvider, NamespaceData},
-    partition::resolver::PartitionProvider,
+    partition::{resolver::PartitionProvider, PartitionData},
    table::name_resolver::TableNameProvider,
};
use crate::{
@@ -124,6 +125,26 @@ impl BufferTree {
    pub(crate) fn namespace(&self, namespace_id: NamespaceId) -> Option<Arc<NamespaceData>> {
        self.namespaces.get(&namespace_id)
    }

    /// Iterate over a snapshot of [`PartitionData`] in the tree.
    ///
    /// This iterator will iterate over a consistent snapshot of namespaces
    /// taken at the time this fn was called, recursing into each table &
    /// partition incrementally. Each time a namespace is read, a snapshot of
    /// tables is taken, and these are then iterated on. Likewise the first read
    /// of a table causes a snapshot of partitions to be taken, and it is those
    /// partitions that are read.
    ///
    /// Because of this, concurrent writes may add new data to partitions/tables
    /// and these MAY be readable depending on the progress of the iterator
    /// through the tree.
    pub(crate) fn partitions(&self) -> impl Iterator<Item = Arc<Mutex<PartitionData>>> + Send {
        self.namespaces
            .values()
            .into_iter()
            .flat_map(|v| v.tables())
            .flat_map(|v| v.partitions())
    }
}

#[async_trait]
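A brief consumption sketch of BufferTree::partitions() (not part of the diff; buf is a hypothetical, already-populated BufferTree, mirroring the test added below). Each yielded item is an Arc<Mutex<PartitionData>>, so a reader takes each lock only briefly while the tree keeps accepting concurrent writes.

    // Sketch only: `buf` is assumed to be a BufferTree that has buffered some writes.
    let mut ids = buf
        .partitions()
        .map(|p| p.lock().partition_id().get())
        .collect::<Vec<_>>();
    ids.sort_unstable();
    // `ids` now holds the catalog ID of every partition visible to this snapshot;
    // partitions created afterwards may or may not have been included.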
@@ -178,6 +199,7 @@ impl QueryExec for BufferTree {
mod tests {
    use std::{sync::Arc, time::Duration};

    use assert_matches::assert_matches;
    use data_types::{PartitionId, PartitionKey};
    use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
    use futures::{StreamExt, TryStreamExt};
@@ -677,6 +699,118 @@ mod tests {
        assert_eq!(m, 1, "tables counter mismatch");
    }

    /// Assert that writes spanning multiple partitions and tables are all
    /// visible to the [`BufferTree`] partition iterator.
    #[tokio::test]
    async fn test_partition_iter() {
        const TABLE2_ID: TableId = TableId::new(1234321);

        // Configure the mock partition provider to return three partitions,
        // named p1, p2 and p3, spread across two tables.
        let partition_provider = Arc::new(
            MockPartitionProvider::default()
                .with_partition(PartitionData::new(
                    PartitionId::new(0),
                    PartitionKey::from("p1"),
                    NAMESPACE_ID,
                    Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
                        NamespaceName::from(NAMESPACE_NAME)
                    })),
                    TABLE_ID,
                    Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
                        TableName::from(TABLE_NAME)
                    })),
                    SortKeyState::Provided(None),
                ))
                .with_partition(PartitionData::new(
                    PartitionId::new(1),
                    PartitionKey::from("p2"),
                    NAMESPACE_ID,
                    Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
                        NamespaceName::from(NAMESPACE_NAME)
                    })),
                    TABLE_ID,
                    Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
                        TableName::from(TABLE_NAME)
                    })),
                    SortKeyState::Provided(None),
                ))
                .with_partition(PartitionData::new(
                    PartitionId::new(2),
                    PartitionKey::from("p3"),
                    NAMESPACE_ID,
                    Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
                        NamespaceName::from(NAMESPACE_NAME)
                    })),
                    TABLE2_ID,
                    Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
                        TableName::from("another_table")
                    })),
                    SortKeyState::Provided(None),
                )),
        );
        // Init the buffer tree
        let buf = BufferTree::new(
            Arc::new(MockNamespaceNameProvider::default()),
            Arc::new(MockTableNameProvider::new(TABLE_NAME)),
            partition_provider,
            Arc::clone(&Arc::new(metric::Registry::default())),
        );

        assert_eq!(buf.partitions().count(), 0);

        // Write data to partition p1, in table "bananas".
        buf.apply(DmlOperation::Write(make_write_op(
            &PartitionKey::from("p1"),
            NAMESPACE_ID,
            TABLE_NAME,
            TABLE_ID,
            0,
            r#"bananas,region=Asturias temp=35 4242424242"#,
        )))
        .await
        .expect("failed to write initial data");

        assert_eq!(buf.partitions().count(), 1);

        // Write data to partition p2, in table "bananas".
        buf.apply(DmlOperation::Write(make_write_op(
            &PartitionKey::from("p2"),
            NAMESPACE_ID,
            TABLE_NAME,
            TABLE_ID,
            0,
            r#"bananas,region=Asturias temp=35 4242424242"#,
        )))
        .await
        .expect("failed to write initial data");

        assert_eq!(buf.partitions().count(), 2);

        // Write data to partition p3, in the second table.
        buf.apply(DmlOperation::Write(make_write_op(
            &PartitionKey::from("p3"),
            NAMESPACE_ID,
            "another_table",
            TABLE2_ID,
            0,
            r#"another_table,region=Asturias temp=35 4242424242"#,
        )))
        .await
        .expect("failed to write initial data");
        // Iterate over the partitions and ensure they were all visible.
        let mut ids = buf
            .partitions()
            .map(|p| p.lock().partition_id().get())
            .collect::<Vec<_>>();

        ids.sort_unstable();

        assert_matches!(*ids, [0, 1, 2]);
    }

    /// Assert the correct "not found" errors are generated for missing
    /// table/namespaces, and that querying an entirely empty buffer tree
    /// returns no data (as opposed to panicking, etc).
@@ -711,7 +845,7 @@ mod tests {
            .query_exec(NAMESPACE_ID, TABLE_ID, vec![], None)
            .await
            .expect_err("query should fail");
-        assert_matches::assert_matches!(err, QueryError::NamespaceNotFound(ns) => {
+        assert_matches!(err, QueryError::NamespaceNotFound(ns) => {
            assert_eq!(ns, NAMESPACE_ID);
        });
@@ -732,7 +866,7 @@ mod tests {
            .query_exec(NAMESPACE_ID, TableId::new(1234), vec![], None)
            .await
            .expect_err("query should fail");
-        assert_matches::assert_matches!(err, QueryError::TableNotFound(ns, t) => {
+        assert_matches!(err, QueryError::TableNotFound(ns, t) => {
            assert_eq!(ns, NAMESPACE_ID);
            assert_eq!(t, TableId::new(1234));
        });
@@ -184,6 +184,13 @@ impl TableData {
    ///
    /// The order of [`PartitionData`] in the iterator is arbitrary and should
    /// not be relied upon.
    ///
    /// # Snapshot
    ///
    /// The set of [`PartitionData`] returned is an atomic / point-in-time
    /// snapshot of the set of [`PartitionData`] at the time this function is
    /// invoked, but the data within them may change as they continue to buffer
    /// DML operations.
    pub(crate) fn partitions(&self) -> Vec<Arc<Mutex<PartitionData>>> {
        self.partition_data.read().by_key.values()
    }
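A short sketch of the per-table snapshot semantics (not part of the diff; table is a hypothetical Arc<TableData> with buffered data). The Vec fixes the set of partitions at call time, but each PartitionData behind its Arc<Mutex<_>> may continue to grow.

    // Sketch only: `table` is assumed to be an Arc<TableData> with buffered data.
    let partitions = table.partitions();
    // The partition set is fixed at the moment of the call...
    for p in &partitions {
        // ...but each PartitionData may keep buffering concurrent DML writes,
        // visible the next time its lock is taken.
        let _id = p.lock().partition_id().get();
    }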