Merge pull request #6263 from influxdata/dom/buffer-tree-query

perf(ingester2): streaming buffer tree queries
pull/24376/head
Dom 2022-11-29 15:27:21 +00:00 committed by GitHub
commit 366e60383f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 888 additions and 28 deletions

View File

@ -14,6 +14,7 @@ backoff = { version = "0.1.0", path = "../backoff" }
bytes = "1.3.0"
data_types = { version = "0.1.0", path = "../data_types" }
datafusion.workspace = true
datafusion_util = { path = "../datafusion_util" }
dml = { version = "0.1.0", path = "../dml" }
flatbuffers = "22"
futures = "0.3.25"

View File

@ -9,12 +9,18 @@ use data_types::{NamespaceId, TableId};
use dml::DmlOperation;
use metric::U64Counter;
use observability_deps::tracing::warn;
use trace::span::Span;
use super::{
partition::resolver::PartitionProvider,
table::{name_resolver::TableNameProvider, TableData},
};
use crate::{arcmap::ArcMap, deferred_load::DeferredLoad, dml_sink::DmlSink};
use crate::{
arcmap::ArcMap,
deferred_load::DeferredLoad,
dml_sink::DmlSink,
query::{response::QueryResponse, tracing::QueryExecTracing, QueryError, QueryExec},
};
/// The string name / identifier of a Namespace.
///
@ -106,11 +112,6 @@ impl NamespaceData {
self.namespace_id
}
#[cfg(test)]
pub(super) fn table_count(&self) -> &U64Counter {
&self.table_count
}
/// Returns the [`NamespaceName`] for this namespace.
pub(crate) fn namespace_name(&self) -> &DeferredLoad<NamespaceName> {
&self.namespace_name
@ -168,6 +169,37 @@ impl DmlSink for NamespaceData {
}
}
#[async_trait]
impl QueryExec for NamespaceData {
    type Response = QueryResponse;

    /// Execute a query against the table identified by `table_id` within
    /// this namespace.
    ///
    /// # Errors
    ///
    /// Returns [`QueryError::TableNotFound`] if this namespace has no table
    /// for `table_id`, and propagates any error returned by the table's own
    /// query execution.
    ///
    /// # Panics
    ///
    /// Panics if `namespace_id` does not match the ID of this namespace.
    async fn query_exec(
        &self,
        namespace_id: NamespaceId,
        table_id: TableId,
        columns: Vec<String>,
        span: Option<Span>,
    ) -> Result<Self::Response, QueryError> {
        assert_eq!(
            self.namespace_id, namespace_id,
            "buffer tree index inconsistency"
        );

        // Extract the table if it exists.
        let inner = self
            .table(table_id)
            .ok_or(QueryError::TableNotFound(namespace_id, table_id))?;

        // Delegate query execution to the table, wrapping the execution in a
        // tracing delegate to emit a child span.
        Ok(QueryResponse::new(
            QueryExecTracing::new(inner, "table")
                .query_exec(namespace_id, table_id, columns, span)
                .await?,
        ))
    }
}
#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};

View File

@ -1,17 +1,71 @@
use std::sync::Arc;
use async_trait::async_trait;
use data_types::NamespaceId;
use data_types::{NamespaceId, TableId};
use dml::DmlOperation;
use metric::U64Counter;
use trace::span::Span;
use super::{
namespace::{name_resolver::NamespaceNameProvider, NamespaceData},
partition::resolver::PartitionProvider,
table::name_resolver::TableNameProvider,
};
use crate::{arcmap::ArcMap, dml_sink::DmlSink};
use crate::{
arcmap::ArcMap,
dml_sink::DmlSink,
query::{response::QueryResponse, tracing::QueryExecTracing, QueryError, QueryExec},
};
/// A [`BufferTree`] is the root of an in-memory tree of many [`NamespaceData`]
/// containing one or more child [`TableData`] nodes, which in turn contain one
/// or more [`PartitionData`] nodes:
///
/// ```text
///
///                 ╔════════════════╗
///                 ║   BufferTree   ║
///                 ╚═══════╦════════╝
///                         ▼
///                  ┌────────────┐
///                  │ Namespace  ├┐
///                  └┬───────────┘├┐
///                   └┬───────────┘│
///                    └────┬───────┘
///                         ▼
///                  ┌────────────┐
///                  │   Table    ├┐
///                  └┬───────────┘├┐
///                   └┬───────────┘│
///                    └────┬───────┘
///                         ▼
///                  ┌────────────┐
///                  │ Partition  ├┐
///                  └┬───────────┘├┐
///                   └┬───────────┘│
///                    └────────────┘
/// ```
///
/// A buffer tree is a mutable data structure that implements [`DmlSink`] to
/// apply successive [`DmlOperation`] to its internal state, and makes the
/// materialised result available through a streaming [`QueryExec`] execution.
///
/// # Read Consistency
///
/// When [`BufferTree::query_exec()`] is called for a given table, a snapshot of
/// the table's current set of partitions is created and the data within these
/// partitions will be streamed to the client as they consume the response. New
/// partitions that are created concurrently to the query execution do not ever
/// become visible.
///
/// Concurrent writes during query execution to a partition that forms part of
/// this snapshot will be visible iff the write has been fully applied to the
/// partition's data buffer before the query stream reads the data from that
/// partition. Once a partition has been read, the data within it is immutable
/// from the caller's perspective, and subsequent writes DO NOT become visible.
///
/// [`TableData`]: crate::buffer_tree::table::TableData
/// [`PartitionData`]: crate::buffer_tree::partition::PartitionData
#[derive(Debug)]
pub(crate) struct BufferTree {
/// The resolver of `(table_id, partition_key)` to [`PartitionData`].
@ -95,3 +149,701 @@ impl DmlSink for BufferTree {
namespace_data.apply(op).await
}
}
#[async_trait]
impl QueryExec for BufferTree {
    type Response = QueryResponse;

    /// Route a query to the namespace identified by `namespace_id`.
    ///
    /// # Errors
    ///
    /// Returns [`QueryError::NamespaceNotFound`] when the namespace lookup
    /// yields nothing; otherwise returns whatever the namespace's own query
    /// execution returns.
    async fn query_exec(
        &self,
        namespace_id: NamespaceId,
        table_id: TableId,
        columns: Vec<String>,
        span: Option<Span>,
    ) -> Result<Self::Response, QueryError> {
        // Resolve the namespace node, bailing out early if there is none.
        let namespace = match self.namespace(namespace_id) {
            Some(ns) => ns,
            None => return Err(QueryError::NamespaceNotFound(namespace_id)),
        };

        // Hand the query to the namespace through a tracing decorator so
        // that a child span is emitted for this layer of the tree.
        let traced = QueryExecTracing::new(namespace, "namespace");
        traced
            .query_exec(namespace_id, table_id, columns, span)
            .await
    }
}
#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};
use data_types::{PartitionId, PartitionKey};
use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
use futures::{StreamExt, TryStreamExt};
use metric::{Attributes, Metric};
use super::*;
use crate::{
buffer_tree::{
namespace::{name_resolver::mock::MockNamespaceNameProvider, NamespaceData},
partition::{resolver::mock::MockPartitionProvider, PartitionData, SortKeyState},
table::{name_resolver::mock::MockTableNameProvider, TableName},
},
deferred_load::{self, DeferredLoad},
query::partition_response::PartitionResponse,
test_util::make_write_op,
};
/// ID of the table that the tests in this module write to and query.
const TABLE_ID: TableId = TableId::new(44);
/// Name handed to the mock table name provider and used in the write ops.
const TABLE_NAME: &str = "bananas";
/// Name the namespace's deferred name loader resolves to in these tests.
const NAMESPACE_NAME: &str = "platanos";
/// ID of the namespace that the tests in this module write to and query.
const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
/// A write to a fresh [`NamespaceData`] creates the table on demand, bumps
/// the `ingester_tables` counter, and leaves the namespace name resolvable.
#[tokio::test]
async fn test_namespace_init_table() {
    // The one partition the mock provider hands out for this table ID.
    let partition = PartitionData::new(
        PartitionId::new(0),
        PartitionKey::from("banana-split"),
        NAMESPACE_ID,
        TABLE_ID,
        Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
            TableName::from(TABLE_NAME)
        })),
        SortKeyState::Provided(None),
        None,
    );
    let partition_provider = Arc::new(MockPartitionProvider::default().with_partition(partition));

    let registry = Arc::new(metric::Registry::default());

    // Build the namespace under test.
    let ns = NamespaceData::new(
        NAMESPACE_ID,
        DeferredLoad::new(Duration::from_millis(1), async { NAMESPACE_NAME.into() }),
        Arc::new(MockTableNameProvider::new(TABLE_NAME)),
        partition_provider,
        &registry,
    );

    // The name is loaded lazily, so at this point it may render either as
    // the real name or as the "unresolved" placeholder.
    let name = ns.namespace_name().to_string();
    assert!(
        (name == NAMESPACE_NAME) || (name == deferred_load::UNRESOLVED_DISPLAY_STRING),
        "unexpected namespace name: {name}"
    );

    // Nothing has been written yet, so the table must not exist.
    assert!(ns.table(TABLE_ID).is_none());

    // Buffer a single row for the table.
    let write = make_write_op(
        &PartitionKey::from("banana-split"),
        NAMESPACE_ID,
        TABLE_NAME,
        TABLE_ID,
        0,
        r#"bananas,city=Madrid day="sun",temp=55 22"#,
    );
    ns.apply(DmlOperation::Write(write))
        .await
        .expect("buffer op should succeed");

    // The write must have created the table...
    assert!(ns.table(TABLE_ID).is_some());

    // ...and incremented the table counter metric.
    let instrument = registry
        .get_instrument::<Metric<U64Counter>>("ingester_tables")
        .expect("failed to read metric");
    let table_count = instrument
        .get_observer(&Attributes::from([]))
        .expect("failed to get observer")
        .fetch();
    assert_eq!(table_count, 1);

    // Force the deferred namespace name to load and check both accessors.
    let resolved = ns.namespace_name().get().await;
    assert_eq!(&**resolved, NAMESPACE_NAME);
    assert_eq!(ns.namespace_name().to_string(), NAMESPACE_NAME);
}
/// Generate a test that performs a set of writes and asserts the data within
/// the table with TABLE_ID in the namespace with NAMESPACE_ID.
///
/// The generated test function is named `test_write_query_<name>`. It builds
/// a [`BufferTree`] backed by a mock partition provider, applies each write
/// in the order given, queries NAMESPACE_ID / TABLE_ID with an empty column
/// list, and compares the collected record batches against `want`.
///
/// At least one partition must be provided (`+` repetition); the set of
/// writes may be empty (`*` repetition).
///
/// NOTE: the comparison uses `assert_batches_sorted_eq!`, so the relative
/// order of the returned rows is not asserted by tests generated here.
macro_rules! test_write_query {
(
$name:ident,
partitions = [$($partition:expr), +], // The set of PartitionData for the mock partition provider
writes = [$($write:expr), *], // The set of DmlWrite to apply()
want = $want:expr // The expected results of querying NAMESPACE_ID and TABLE_ID
) => {
paste::paste! {
#[tokio::test]
async fn [<test_write_query_ $name>]() {
// Configure the mock partition provider with the provided
// partitions.
let partition_provider = Arc::new(MockPartitionProvider::default()
$(
.with_partition($partition)
)+
);
// Init the buffer tree
let buf = BufferTree::new(
Arc::new(MockNamespaceNameProvider::default()),
Arc::new(MockTableNameProvider::new(TABLE_NAME)),
partition_provider,
Arc::new(metric::Registry::default()),
);
// Write the provided DmlWrites, one at a time, in the order given.
$(
buf.apply(DmlOperation::Write($write))
.await
.expect("failed to perform write");
)*
// Execute the query against NAMESPACE_ID and TABLE_ID, passing an
// empty column list, and collect every record batch in the response.
let batches = buf
.query_exec(NAMESPACE_ID, TABLE_ID, vec![], None)
.await
.expect("query should succeed")
.into_record_batches()
.try_collect::<Vec<_>>()
.await
.expect("query failed");
// Assert the contents of NAMESPACE_ID and TABLE_ID (row order is not
// significant for this comparison).
assert_batches_sorted_eq!(
$want,
&batches
);
}
}
};
}
// A simple "read your writes" test: a single row written to a single
// partition must be returned by a subsequent query.
//
// The expected rows are padded to the full column width so that they line
// up with the border rows, matching the pretty-printed batch output.
test_write_query!(
    read_writes,
    partitions = [PartitionData::new(
        PartitionId::new(0),
        PartitionKey::from("p1"),
        NAMESPACE_ID,
        TABLE_ID,
        Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
            TableName::from(TABLE_NAME)
        })),
        SortKeyState::Provided(None),
        None,
    )],
    writes = [make_write_op(
        &PartitionKey::from("p1"),
        NAMESPACE_ID,
        TABLE_NAME,
        TABLE_ID,
        0,
        r#"bananas,region=Asturias temp=35 4242424242"#,
    )],
    want = [
        "+----------+------+-------------------------------+",
        "| region   | temp | time                          |",
        "+----------+------+-------------------------------+",
        "| Asturias | 35   | 1970-01-01T00:00:04.242424242 |",
        "+----------+------+-------------------------------+",
    ]
);
// A query that ensures the data across multiple partitions within a single
// table are returned.
//
// The expected rows are padded to the full column width so that they line
// up with the border rows, matching the pretty-printed batch output.
test_write_query!(
    multiple_partitions,
    partitions = [
        PartitionData::new(
            PartitionId::new(0),
            PartitionKey::from("p1"),
            NAMESPACE_ID,
            TABLE_ID,
            Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
                TableName::from(TABLE_NAME)
            })),
            SortKeyState::Provided(None),
            None,
        ),
        PartitionData::new(
            PartitionId::new(1),
            PartitionKey::from("p2"),
            NAMESPACE_ID,
            TABLE_ID,
            Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
                TableName::from(TABLE_NAME)
            })),
            SortKeyState::Provided(None),
            None,
        )
    ],
    writes = [
        make_write_op(
            &PartitionKey::from("p1"),
            NAMESPACE_ID,
            TABLE_NAME,
            TABLE_ID,
            0,
            r#"bananas,region=Madrid temp=35 4242424242"#,
        ),
        make_write_op(
            &PartitionKey::from("p2"),
            NAMESPACE_ID,
            TABLE_NAME,
            TABLE_ID,
            0,
            r#"bananas,region=Asturias temp=25 4242424242"#,
        )
    ],
    want = [
        "+----------+------+-------------------------------+",
        "| region   | temp | time                          |",
        "+----------+------+-------------------------------+",
        "| Madrid   | 35   | 1970-01-01T00:00:04.242424242 |",
        "| Asturias | 25   | 1970-01-01T00:00:04.242424242 |",
        "+----------+------+-------------------------------+",
    ]
);
// A query that ensures the data across multiple namespaces is correctly
// filtered to return only the rows of the queried namespace and table.
//
// The second partition and write use a different namespace ID and a
// different table ID, so none of their data may appear in the result.
//
// The expected rows are padded to the full column width so that they line
// up with the border rows, matching the pretty-printed batch output.
test_write_query!(
    filter_multiple_namespaces,
    partitions = [
        PartitionData::new(
            PartitionId::new(0),
            PartitionKey::from("p1"),
            NAMESPACE_ID,
            TABLE_ID,
            Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
                TableName::from(TABLE_NAME)
            })),
            SortKeyState::Provided(None),
            None,
        ),
        PartitionData::new(
            PartitionId::new(1),
            PartitionKey::from("p2"),
            NamespaceId::new(4321), // A different namespace ID.
            TableId::new(1234),     // A different table ID.
            Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
                TableName::from(TABLE_NAME)
            })),
            SortKeyState::Provided(None),
            None,
        )
    ],
    writes = [
        make_write_op(
            &PartitionKey::from("p1"),
            NAMESPACE_ID,
            TABLE_NAME,
            TABLE_ID,
            0,
            r#"bananas,region=Madrid temp=25 4242424242"#,
        ),
        make_write_op(
            &PartitionKey::from("p2"),
            NamespaceId::new(4321), // A different namespace ID.
            TABLE_NAME,
            TableId::new(1234), // A different table ID
            0,
            r#"bananas,region=Asturias temp=35 4242424242"#,
        )
    ],
    want = [
        "+--------+------+-------------------------------+",
        "| region | temp | time                          |",
        "+--------+------+-------------------------------+",
        "| Madrid | 25   | 1970-01-01T00:00:04.242424242 |",
        "+--------+------+-------------------------------+",
    ]
);
// A query that ensures the data across multiple tables (with the same table
// name!) is correctly filtered to return only the queried table.
//
// Both partitions live in the same namespace; only the table ID differs.
//
// The expected rows are padded to the full column width so that they line
// up with the border rows, matching the pretty-printed batch output.
test_write_query!(
    filter_multiple_tables,
    partitions = [
        PartitionData::new(
            PartitionId::new(0),
            PartitionKey::from("p1"),
            NAMESPACE_ID,
            TABLE_ID,
            Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
                TableName::from(TABLE_NAME)
            })),
            SortKeyState::Provided(None),
            None,
        ),
        PartitionData::new(
            PartitionId::new(1),
            PartitionKey::from("p2"),
            NAMESPACE_ID,
            TableId::new(1234), // A different table ID.
            Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
                TableName::from(TABLE_NAME)
            })),
            SortKeyState::Provided(None),
            None,
        )
    ],
    writes = [
        make_write_op(
            &PartitionKey::from("p1"),
            NAMESPACE_ID,
            TABLE_NAME,
            TABLE_ID,
            0,
            r#"bananas,region=Madrid temp=25 4242424242"#,
        ),
        make_write_op(
            &PartitionKey::from("p2"),
            NAMESPACE_ID,
            TABLE_NAME,
            TableId::new(1234), // A different table ID
            0,
            r#"bananas,region=Asturias temp=35 4242424242"#,
        )
    ],
    want = [
        "+--------+------+-------------------------------+",
        "| region | temp | time                          |",
        "+--------+------+-------------------------------+",
        "| Madrid | 25   | 1970-01-01T00:00:04.242424242 |",
        "+--------+------+-------------------------------+",
    ]
);
// Assert that no dedupe operations are performed when querying a partition
// that contains duplicate rows for a single series/primary key: both the
// earlier and the later write must be present in the result.
//
// NOTE: `test_write_query!` compares with `assert_batches_sorted_eq!`, so
// this test verifies that both rows are returned but does not pin their
// relative order.
//
// The expected rows are padded to the full column width so that they line
// up with the border rows, matching the pretty-printed batch output.
test_write_query!(
    duplicate_writes,
    partitions = [PartitionData::new(
        PartitionId::new(0),
        PartitionKey::from("p1"),
        NAMESPACE_ID,
        TABLE_ID,
        Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
            TableName::from(TABLE_NAME)
        })),
        SortKeyState::Provided(None),
        None,
    )],
    writes = [
        make_write_op(
            &PartitionKey::from("p1"),
            NAMESPACE_ID,
            TABLE_NAME,
            TABLE_ID,
            0,
            r#"bananas,region=Asturias temp=35 4242424242"#,
        ),
        make_write_op(
            &PartitionKey::from("p1"),
            NAMESPACE_ID,
            TABLE_NAME,
            TABLE_ID,
            1,
            r#"bananas,region=Asturias temp=12 4242424242"#,
        )
    ],
    want = [
        "+----------+------+-------------------------------+",
        "| region   | temp | time                          |",
        "+----------+------+-------------------------------+",
        "| Asturias | 35   | 1970-01-01T00:00:04.242424242 |",
        "| Asturias | 12   | 1970-01-01T00:00:04.242424242 |",
        "+----------+------+-------------------------------+",
    ]
);
/// Assert that multiple writes to a single namespace/table (spread across
/// two partitions) result in a single namespace and a single table being
/// created, with matching `ingester_namespaces` / `ingester_tables` metrics.
#[tokio::test]
async fn test_metrics() {
    // Configure the mock partition provider to return two partitions of the
    // same table, named p1 and p2, each with its own partition ID.
    let partition_provider = Arc::new(
        MockPartitionProvider::default()
            .with_partition(PartitionData::new(
                PartitionId::new(0),
                PartitionKey::from("p1"),
                NAMESPACE_ID,
                TABLE_ID,
                Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
                    TableName::from(TABLE_NAME)
                })),
                SortKeyState::Provided(None),
                None,
            ))
            .with_partition(PartitionData::new(
                PartitionId::new(1),
                PartitionKey::from("p2"),
                NAMESPACE_ID,
                TABLE_ID,
                Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
                    TableName::from(TABLE_NAME)
                })),
                SortKeyState::Provided(None),
                None,
            )),
    );

    let metrics = Arc::new(metric::Registry::default());

    // Init the buffer tree
    let buf = BufferTree::new(
        Arc::new(MockNamespaceNameProvider::default()),
        Arc::new(MockTableNameProvider::new(TABLE_NAME)),
        partition_provider,
        Arc::clone(&metrics),
    );

    // Write data to partition p1, in table "bananas".
    buf.apply(DmlOperation::Write(make_write_op(
        &PartitionKey::from("p1"),
        NAMESPACE_ID,
        TABLE_NAME,
        TABLE_ID,
        0,
        r#"bananas,region=Asturias temp=35 4242424242"#,
    )))
    .await
    .expect("failed to write initial data");

    // Write a second record to partition p2 of the same namespace & table.
    // It shares the series key & timestamp of the first write but carries a
    // different temp value.
    buf.apply(DmlOperation::Write(make_write_op(
        &PartitionKey::from("p2"),
        NAMESPACE_ID,
        TABLE_NAME,
        TABLE_ID,
        1,
        r#"bananas,region=Asturias temp=12 4242424242"#,
    )))
    .await
    .expect("failed to overwrite data");

    // Validate namespace count: both writes targeted the same namespace.
    assert_eq!(buf.namespaces.values().len(), 1);
    let m = metrics
        .get_instrument::<Metric<U64Counter>>("ingester_namespaces")
        .expect("failed to read metric")
        .get_observer(&Attributes::from(&[]))
        .expect("failed to find metric with attributes")
        .fetch();
    assert_eq!(m, 1, "namespace counter mismatch");

    // Validate table count: both writes targeted the same table.
    let m = metrics
        .get_instrument::<Metric<U64Counter>>("ingester_tables")
        .expect("failed to read metric")
        .get_observer(&Attributes::from(&[]))
        .expect("failed to find metric with attributes")
        .fetch();
    assert_eq!(m, 1, "tables counter mismatch");
}
/// Assert the correct "not found" errors are generated for missing
/// table/namespaces, and that querying an entirely empty buffer tree
/// returns no data (as opposed to panicking, etc).
#[tokio::test]
async fn test_not_found() {
let partition_provider = Arc::new(MockPartitionProvider::default().with_partition(
PartitionData::new(
PartitionId::new(0),
PartitionKey::from("p1"),
NAMESPACE_ID,
TABLE_ID,
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
),
));
// Init the BufferTree
let buf = BufferTree::new(
Arc::new(MockNamespaceNameProvider::default()),
Arc::new(MockTableNameProvider::new(TABLE_NAME)),
partition_provider,
Arc::new(metric::Registry::default()),
);
// Query the empty tree
let err = buf
.query_exec(NAMESPACE_ID, TABLE_ID, vec![], None)
.await
.expect_err("query should fail");
assert_matches::assert_matches!(err, QueryError::NamespaceNotFound(ns) => {
assert_eq!(ns, NAMESPACE_ID);
});
// Write data to partition p1, in table "bananas".
buf.apply(DmlOperation::Write(make_write_op(
&PartitionKey::from("p1"),
NAMESPACE_ID,
TABLE_NAME,
TABLE_ID,
0,
r#"bananas,region=Asturias temp=35 4242424242"#,
)))
.await
.expect("failed to write data");
// Ensure an unknown table errors
let err = buf
.query_exec(NAMESPACE_ID, TableId::new(1234), vec![], None)
.await
.expect_err("query should fail");
assert_matches::assert_matches!(err, QueryError::TableNotFound(ns, t) => {
assert_eq!(ns, NAMESPACE_ID);
assert_eq!(t, TableId::new(1234));
});
// Ensure a valid namespace / table does not error
buf.query_exec(NAMESPACE_ID, TABLE_ID, vec![], None)
.await
.expect("namespace / table should exist");
}
/// This test asserts the read consistency properties defined in the
/// [`BufferTree`] type docs.
///
/// Specifically, this test ensures:
///
/// * A read snapshot of the set of partitions is created during the
/// construction of the query stream. New partitions added (or existing
/// partitions removed) do not change the query results once the stream
/// has been initialised.
/// * Concurrent writes to partitions that form part of the read snapshot
/// become visible if they are ordered/applied before the acquisition of
/// the partition data by the query stream. Writes ordered after the
/// partition lock acquisition do not become readable.
///
/// All writes use the same write timestamp as it is not a factor in
/// ordering of writes.
#[tokio::test]
async fn test_read_consistency() {
// Configure the mock partition provider to return two partitions, named
// p1 and p2.
let partition_provider = Arc::new(
MockPartitionProvider::default()
.with_partition(PartitionData::new(
PartitionId::new(0),
PartitionKey::from("p1"),
NAMESPACE_ID,
TABLE_ID,
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
))
.with_partition(PartitionData::new(
PartitionId::new(1),
PartitionKey::from("p2"),
NAMESPACE_ID,
TABLE_ID,
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
)),
);
// Init the buffer tree
let buf = BufferTree::new(
Arc::new(MockNamespaceNameProvider::default()),
Arc::new(MockTableNameProvider::new(TABLE_NAME)),
partition_provider,
Arc::new(metric::Registry::default()),
);
// Write data to partition p1, in table "bananas".
buf.apply(DmlOperation::Write(make_write_op(
&PartitionKey::from("p1"),
NAMESPACE_ID,
TABLE_NAME,
TABLE_ID,
0,
r#"bananas,region=Madrid temp=35 4242424242"#,
)))
.await
.expect("failed to write initial data");
// Execute a query of the buffer tree, generating the result stream, but
// DO NOT consume it.
let stream = buf
.query_exec(NAMESPACE_ID, TABLE_ID, vec![], None)
.await
.expect("query should succeed")
.into_partition_stream();
// Perform a write concurrent to the consumption of the query stream
// that creates a new partition (p2) in the same table.
buf.apply(DmlOperation::Write(make_write_op(
&PartitionKey::from("p2"),
NAMESPACE_ID,
TABLE_NAME,
TABLE_ID,
1,
r#"bananas,region=Asturias temp=20 4242424242"#,
)))
.await
.expect("failed to perform concurrent write to new partition");
// Perform another write that hits the partition within the query
// results snapshot (p1) before the partition is read.
buf.apply(DmlOperation::Write(make_write_op(
&PartitionKey::from("p1"),
NAMESPACE_ID,
TABLE_NAME,
TABLE_ID,
2,
r#"bananas,region=Murcia temp=30 4242424242"#,
)))
.await
.expect("failed to perform concurrent write to existing partition");
// Consume the set of partitions within the query stream.
//
// Under the specified query consistency guarantees, both the first and
// third writes (both to p1) should be visible. The second write to p2
// should not be visible.
let mut partitions: Vec<PartitionResponse> = stream.collect().await;
assert_eq!(partitions.len(), 1); // only p1, not p2
let partition = partitions.pop().unwrap();
// Perform the partition read
let batches =
datafusion::physical_plan::common::collect(partition.into_record_batch_stream())
.await
.expect("failed to collate query results");
// Assert the contents of p1 contains both the initial write, and the
// 3rd write in a single RecordBatch.
assert_batches_eq!(
[
"+--------+------+-------------------------------+",
"| region | temp | time |",
"+--------+------+-------------------------------+",
"| Madrid | 35 | 1970-01-01T00:00:04.242424242 |",
"| Murcia | 30 | 1970-01-01T00:00:04.242424242 |",
"+--------+------+-------------------------------+",
],
&batches
);
}
}

View File

@ -4,12 +4,22 @@ pub(crate) mod name_resolver;
use std::sync::Arc;
use async_trait::async_trait;
use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, TableId};
use datafusion_util::MemoryStream;
use mutable_batch::MutableBatch;
use parking_lot::{Mutex, RwLock};
use schema::Projection;
use trace::span::{Span, SpanRecorder};
use super::partition::{resolver::PartitionProvider, PartitionData};
use crate::{arcmap::ArcMap, deferred_load::DeferredLoad};
use crate::{
arcmap::ArcMap,
deferred_load::DeferredLoad,
query::{
partition_response::PartitionResponse, response::PartitionStream, QueryError, QueryExec,
},
};
/// A double-referenced map where [`PartitionData`] can be looked up by
/// [`PartitionKey`], or ID.
@ -172,10 +182,7 @@ impl TableData {
}
/// Return the [`PartitionData`] for the specified ID.
pub(crate) fn get_partition(
&self,
partition_id: PartitionId,
) -> Option<Arc<Mutex<PartitionData>>> {
pub(crate) fn partition(&self, partition_id: PartitionId) -> Option<Arc<Mutex<PartitionData>>> {
self.partition_data.read().by_id(partition_id)
}
@ -203,6 +210,57 @@ impl TableData {
}
}
#[async_trait]
impl QueryExec for TableData {
    type Response = PartitionStream;

    /// Build a stream of per-partition responses for this table.
    ///
    /// The set of partitions is captured when this method is called, while
    /// the data of each partition is only read (under that partition's
    /// lock) when the returned stream yields the corresponding item.
    /// Partitions for which `get_query_data()` returns `None` are skipped.
    ///
    /// An empty `columns` list selects all columns; otherwise only the
    /// named columns are projected.
    ///
    /// # Panics
    ///
    /// Panics if `table_id` or `namespace_id` do not match this table, or
    /// if the data read from a partition reports a different partition ID
    /// than the partition it was read from.
    async fn query_exec(
        &self,
        namespace_id: NamespaceId,
        table_id: TableId,
        columns: Vec<String>,
        span: Option<Span>,
    ) -> Result<Self::Response, QueryError> {
        assert_eq!(self.table_id, table_id, "buffer tree index inconsistency");
        assert_eq!(
            self.namespace_id, namespace_id,
            "buffer tree index inconsistency"
        );

        // Capture the current set of partitions up front; the closure below
        // runs lazily, once per partition, as the stream is consumed.
        let snapshot = self.partitions();

        let responses = snapshot.into_iter().filter_map(move |partition| {
            let child_span = span.clone().map(|s| s.child("partition read"));
            let mut recorder = SpanRecorder::new(child_span);

            // Hold the partition lock only for as long as it takes to grab
            // the ID and the query data; skip the partition if it has none.
            let (partition_id, query_data) = {
                let mut guard = partition.lock();
                let partition_id = guard.partition_id();
                (partition_id, guard.get_query_data()?)
            };
            assert_eq!(partition_id, query_data.partition_id());

            // Project the data if a column subset was requested.
            let projected: Vec<&str> = columns.iter().map(String::as_str).collect();
            let selection = if !projected.is_empty() {
                Projection::Some(projected.as_ref())
            } else {
                Projection::All
            };

            let batches = query_data
                .project_selection(selection)
                .into_iter()
                .collect();
            let response = PartitionResponse::new(
                Box::pin(MemoryStream::new(batches)),
                partition_id,
                None,
            );

            recorder.ok("read partition data");
            Some(response)
        });

        Ok(PartitionStream::new(futures::stream::iter(responses)))
    }
}
#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};

View File

@ -17,13 +17,15 @@ impl QueryRunner {
#[async_trait]
impl QueryExec for QueryRunner {
type Response = QueryResponse;
async fn query_exec(
&self,
namespace_id: NamespaceId,
table_id: TableId,
columns: Vec<String>,
span: Option<Span>,
) -> Result<QueryResponse, QueryError> {
) -> Result<Self::Response, QueryError> {
let mut _span_recorder = SpanRecorder::new(span);
info!(

View File

@ -4,7 +4,7 @@ use iox_time::{SystemProvider, TimeProvider};
use metric::{DurationHistogram, Metric};
use trace::span::Span;
use super::{response::QueryResponse, QueryExec};
use super::QueryExec;
use crate::query::QueryError;
/// An instrumentation decorator over a [`QueryExec`] implementation.
@ -49,6 +49,8 @@ where
T: QueryExec,
P: TimeProvider,
{
type Response = T::Response;
#[inline(always)]
async fn query_exec(
&self,
@ -56,7 +58,7 @@ where
table_id: TableId,
columns: Vec<String>,
span: Option<Span>,
) -> Result<QueryResponse, QueryError> {
) -> Result<Self::Response, QueryError> {
let t = self.time_provider.now();
let res = self
@ -83,7 +85,10 @@ mod tests {
use metric::Attributes;
use super::*;
use crate::query::{mock_query_exec::MockQueryExec, response::PartitionStream};
use crate::query::{
mock_query_exec::MockQueryExec,
response::{PartitionStream, QueryResponse},
};
macro_rules! test_metric {
(

View File

@ -19,13 +19,15 @@ impl MockQueryExec {
#[async_trait]
impl QueryExec for MockQueryExec {
type Response = QueryResponse;
async fn query_exec(
&self,
_namespace_id: NamespaceId,
_table_id: TableId,
_columns: Vec<String>,
_span: Option<Span>,
) -> Result<QueryResponse, QueryError> {
) -> Result<Self::Response, QueryError> {
self.response
.lock()
.take()

View File

@ -4,7 +4,7 @@ use async_trait::async_trait;
use data_types::{NamespaceId, TableId};
use trace::span::{Span, SpanRecorder};
use super::{response::QueryResponse, QueryExec};
use super::QueryExec;
use crate::query::QueryError;
/// A tracing decorator over a [`QueryExec`] implementation.
@ -33,6 +33,8 @@ impl<T> QueryExec for QueryExecTracing<T>
where
T: QueryExec,
{
type Response = T::Response;
#[inline(always)]
async fn query_exec(
&self,
@ -40,7 +42,7 @@ where
table_id: TableId,
columns: Vec<String>,
span: Option<Span>,
) -> Result<QueryResponse, QueryError> {
) -> Result<Self::Response, QueryError> {
let span = span.map(|s| s.child(self.name.clone()));
let mut recorder = SpanRecorder::new(span.clone());

View File

@ -5,8 +5,6 @@ use data_types::{NamespaceId, TableId};
use thiserror::Error;
use trace::span::Span;
use super::response::QueryResponse;
#[derive(Debug, Error)]
#[allow(missing_copy_implementations)]
pub(crate) enum QueryError {
@ -19,13 +17,15 @@ pub(crate) enum QueryError {
#[async_trait]
pub(crate) trait QueryExec: Send + Sync + Debug {
type Response: Send + Debug;
async fn query_exec(
&self,
namespace_id: NamespaceId,
table_id: TableId,
columns: Vec<String>,
span: Option<Span>,
) -> Result<QueryResponse, QueryError>;
) -> Result<Self::Response, QueryError>;
}
#[async_trait]
@ -33,13 +33,15 @@ impl<T> QueryExec for Arc<T>
where
T: QueryExec,
{
type Response = T::Response;
async fn query_exec(
&self,
namespace_id: NamespaceId,
table_id: TableId,
columns: Vec<String>,
span: Option<Span>,
) -> Result<QueryResponse, QueryError> {
) -> Result<Self::Response, QueryError> {
self.deref()
.query_exec(namespace_id, table_id, columns, span)
.await

View File

@ -13,7 +13,11 @@ use generated_types::influxdata::iox::{
use iox_catalog::interface::Catalog;
use service_grpc_catalog::CatalogService;
use crate::{dml_sink::DmlSink, init::IngesterRpcInterface, query::QueryExec};
use crate::{
dml_sink::DmlSink,
init::IngesterRpcInterface,
query::{response::QueryResponse, QueryExec},
};
use self::rpc_write::RpcWrite;
@ -31,7 +35,7 @@ pub(crate) struct GrpcDelegate<D, Q> {
impl<D, Q> GrpcDelegate<D, Q>
where
D: DmlSink + 'static,
Q: QueryExec + 'static,
Q: QueryExec<Response = QueryResponse> + 'static,
{
/// Initialise a new [`GrpcDelegate`].
pub(crate) fn new(dml_sink: Arc<D>, query_exec: Arc<Q>) -> Self {
@ -47,7 +51,7 @@ where
impl<D, Q> IngesterRpcInterface for GrpcDelegate<D, Q>
where
D: DmlSink + 'static,
Q: QueryExec + 'static,
Q: QueryExec<Response = QueryResponse> + 'static,
{
type CatalogHandler = CatalogService;
type WriteHandler = RpcWrite<Arc<D>>;

View File

@ -135,7 +135,7 @@ type TonicStream<T> = Pin<Box<dyn Stream<Item = Result<T, tonic::Status>> + Send
#[tonic::async_trait]
impl<Q> Flight for FlightService<Q>
where
Q: QueryExec + 'static,
Q: QueryExec<Response = QueryResponse> + 'static,
{
type HandshakeStream = TonicStream<HandshakeResponse>;
type ListFlightsStream = TonicStream<FlightInfo>;