From de6f0468d8248ac4e30dc53e534c18e0f6ab79a8 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 25 Nov 2022 21:40:43 +0100 Subject: [PATCH 1/2] refactor: associated QueryExec return type Allow the return type of the QueryExec trait's query_exec() method to be parametrised by the implementer. This allows the trait to be reused across different data sources that return differing concrete types. --- ingester2/src/query/exec.rs | 4 +++- ingester2/src/query/instrumentation.rs | 11 ++++++++--- ingester2/src/query/mock_query_exec.rs | 4 +++- ingester2/src/query/tracing.rs | 6 ++++-- ingester2/src/query/trait.rs | 10 ++++++---- ingester2/src/server/grpc.rs | 10 +++++++--- ingester2/src/server/grpc/query.rs | 2 +- 7 files changed, 32 insertions(+), 15 deletions(-) diff --git a/ingester2/src/query/exec.rs b/ingester2/src/query/exec.rs index e0e3f3c5df..b0693ea3d0 100644 --- a/ingester2/src/query/exec.rs +++ b/ingester2/src/query/exec.rs @@ -17,13 +17,15 @@ impl QueryRunner { #[async_trait] impl QueryExec for QueryRunner { + type Response = QueryResponse; + async fn query_exec( &self, namespace_id: NamespaceId, table_id: TableId, columns: Vec, span: Option, - ) -> Result { + ) -> Result { let mut _span_recorder = SpanRecorder::new(span); info!( diff --git a/ingester2/src/query/instrumentation.rs b/ingester2/src/query/instrumentation.rs index 35379fa34e..bb97aee7ca 100644 --- a/ingester2/src/query/instrumentation.rs +++ b/ingester2/src/query/instrumentation.rs @@ -4,7 +4,7 @@ use iox_time::{SystemProvider, TimeProvider}; use metric::{DurationHistogram, Metric}; use trace::span::Span; -use super::{response::QueryResponse, QueryExec}; +use super::QueryExec; use crate::query::QueryError; /// An instrumentation decorator over a [`QueryExec`] implementation. @@ -49,6 +49,8 @@ where T: QueryExec, P: TimeProvider, { + type Response = T::Response; + #[inline(always)] async fn query_exec( &self, @@ -56,7 +58,7 @@ where table_id: TableId, columns: Vec, span: Option, - ) -> Result { + ) -> Result { let t = self.time_provider.now(); let res = self @@ -83,7 +85,10 @@ mod tests { use metric::Attributes; use super::*; - use crate::query::{mock_query_exec::MockQueryExec, response::PartitionStream}; + use crate::query::{ + mock_query_exec::MockQueryExec, + response::{PartitionStream, QueryResponse}, + }; macro_rules! test_metric { ( diff --git a/ingester2/src/query/mock_query_exec.rs b/ingester2/src/query/mock_query_exec.rs index c4e9602eb2..580e139f02 100644 --- a/ingester2/src/query/mock_query_exec.rs +++ b/ingester2/src/query/mock_query_exec.rs @@ -19,13 +19,15 @@ impl MockQueryExec { #[async_trait] impl QueryExec for MockQueryExec { + type Response = QueryResponse; + async fn query_exec( &self, _namespace_id: NamespaceId, _table_id: TableId, _columns: Vec, _span: Option, - ) -> Result { + ) -> Result { self.response .lock() .take() diff --git a/ingester2/src/query/tracing.rs b/ingester2/src/query/tracing.rs index 08355d8043..59a76f4f37 100644 --- a/ingester2/src/query/tracing.rs +++ b/ingester2/src/query/tracing.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use data_types::{NamespaceId, TableId}; use trace::span::{Span, SpanRecorder}; -use super::{response::QueryResponse, QueryExec}; +use super::QueryExec; use crate::query::QueryError; /// An tracing decorator over a [`QueryExec`] implementation. 
@@ -33,6 +33,8 @@ impl QueryExec for QueryExecTracing where T: QueryExec, { + type Response = T::Response; + #[inline(always)] async fn query_exec( &self, @@ -40,7 +42,7 @@ where table_id: TableId, columns: Vec, span: Option, - ) -> Result { + ) -> Result { let span = span.map(|s| s.child(self.name.clone())); let mut recorder = SpanRecorder::new(span.clone()); diff --git a/ingester2/src/query/trait.rs b/ingester2/src/query/trait.rs index aede8f3595..30655eff66 100644 --- a/ingester2/src/query/trait.rs +++ b/ingester2/src/query/trait.rs @@ -5,8 +5,6 @@ use data_types::{NamespaceId, TableId}; use thiserror::Error; use trace::span::Span; -use super::response::QueryResponse; - #[derive(Debug, Error)] #[allow(missing_copy_implementations)] pub(crate) enum QueryError { @@ -19,13 +17,15 @@ pub(crate) enum QueryError { #[async_trait] pub(crate) trait QueryExec: Send + Sync + Debug { + type Response: Send + Debug; + async fn query_exec( &self, namespace_id: NamespaceId, table_id: TableId, columns: Vec, span: Option, - ) -> Result; + ) -> Result; } #[async_trait] @@ -33,13 +33,15 @@ impl QueryExec for Arc where T: QueryExec, { + type Response = T::Response; + async fn query_exec( &self, namespace_id: NamespaceId, table_id: TableId, columns: Vec, span: Option, - ) -> Result { + ) -> Result { self.deref() .query_exec(namespace_id, table_id, columns, span) .await diff --git a/ingester2/src/server/grpc.rs b/ingester2/src/server/grpc.rs index 69002a1a32..a7d403e73f 100644 --- a/ingester2/src/server/grpc.rs +++ b/ingester2/src/server/grpc.rs @@ -13,7 +13,11 @@ use generated_types::influxdata::iox::{ use iox_catalog::interface::Catalog; use service_grpc_catalog::CatalogService; -use crate::{dml_sink::DmlSink, init::IngesterRpcInterface, query::QueryExec}; +use crate::{ + dml_sink::DmlSink, + init::IngesterRpcInterface, + query::{response::QueryResponse, QueryExec}, +}; use self::rpc_write::RpcWrite; @@ -31,7 +35,7 @@ pub(crate) struct GrpcDelegate { impl GrpcDelegate where D: DmlSink + 'static, - Q: QueryExec + 'static, + Q: QueryExec + 'static, { /// Initialise a new [`GrpcDelegate`]. pub(crate) fn new(dml_sink: Arc, query_exec: Arc) -> Self { @@ -47,7 +51,7 @@ where impl IngesterRpcInterface for GrpcDelegate where D: DmlSink + 'static, - Q: QueryExec + 'static, + Q: QueryExec + 'static, { type CatalogHandler = CatalogService; type WriteHandler = RpcWrite>; diff --git a/ingester2/src/server/grpc/query.rs b/ingester2/src/server/grpc/query.rs index efb5fe3e88..a8fad38823 100644 --- a/ingester2/src/server/grpc/query.rs +++ b/ingester2/src/server/grpc/query.rs @@ -133,7 +133,7 @@ type TonicStream = Pin> + Send #[tonic::async_trait] impl Flight for FlightService where - Q: QueryExec + 'static, + Q: QueryExec + 'static, { type HandshakeStream = TonicStream; type ListFlightsStream = TonicStream; From 95216055d8bbca2b36149f225fcf1de71c08b87b Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 25 Nov 2022 15:35:39 +0100 Subject: [PATCH 2/2] perf(ingester2): stream BufferTree partition data This commit implements the QueryExec trait for the BufferTree, allow it to be queried for the partition data it contains. With this change, the BufferTree now provides "read your writes" functionality. Notably the implementation streams the contents of individual partitions to the caller on demand (pull-based execution), deferring acquiring the partition lock until actually necessary and minimising the duration of time a strong reference to a specific RecordBatch is held in order to minimise the memory overhead. 
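For reference, the pull-based behaviour described above reduces to the following minimal, self-contained sketch. The names (`Partition`, `snapshot_stream`) are illustrative stand-ins rather than ingester2 types: the point is that the closure which locks a partition and reads its data runs only when the consumer polls the stream for that element, so the lock is held briefly and no strong reference to the data outlives the poll.

```rust
// Illustrative sketch only -- not the ingester2 API. The `map` closure runs
// lazily as the stream is polled, so each partition's lock is taken, and its
// data read, only when the consumer actually reaches that partition.
use std::sync::{Arc, Mutex};

use futures::{stream, Stream, StreamExt};

struct Partition {
    rows: Vec<String>,
}

fn snapshot_stream(
    partitions: Vec<Arc<Mutex<Partition>>>,
) -> impl Stream<Item = Vec<String>> + Unpin {
    // The Vec of Arc handles is the "snapshot set": partitions created after
    // this point are not part of the iterator and never become visible.
    stream::iter(partitions.into_iter().map(|p| {
        // Lock acquisition is deferred until this element is pulled.
        p.lock().unwrap().rows.clone()
    }))
}

fn main() {
    futures::executor::block_on(async {
        let p1 = Arc::new(Mutex::new(Partition {
            rows: vec!["w1".into()],
        }));
        let mut stream = snapshot_stream(vec![Arc::clone(&p1)]);

        // A write applied before p1 is pulled from the stream is visible.
        p1.lock().unwrap().rows.push("w2".into());

        while let Some(rows) = stream.next().await {
            println!("{rows:?}"); // prints ["w1", "w2"]
        }
    });
}
```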
During query execution a client sees a consistent snapshot of partitions: once a client begins streaming the query response, incoming writes that create new partitions do not become visible. However incoming writes to an existing partition that forms part of the snapshot set become visible iff they are ordered before the acquisition of the partition lock when streaming that partition data to the client. --- ingester2/Cargo.toml | 1 + ingester2/src/buffer_tree/namespace.rs | 44 +- ingester2/src/buffer_tree/root.rs | 756 ++++++++++++++++++++++++- ingester2/src/buffer_tree/table.rs | 68 ++- 4 files changed, 856 insertions(+), 13 deletions(-) diff --git a/ingester2/Cargo.toml b/ingester2/Cargo.toml index 98b26da5c0..87583b472f 100644 --- a/ingester2/Cargo.toml +++ b/ingester2/Cargo.toml @@ -14,6 +14,7 @@ backoff = { version = "0.1.0", path = "../backoff" } bytes = "1.3.0" data_types = { version = "0.1.0", path = "../data_types" } datafusion.workspace = true +datafusion_util = { path = "../datafusion_util" } dml = { version = "0.1.0", path = "../dml" } flatbuffers = "22" futures = "0.3.25" diff --git a/ingester2/src/buffer_tree/namespace.rs b/ingester2/src/buffer_tree/namespace.rs index c383835ae6..a73766a384 100644 --- a/ingester2/src/buffer_tree/namespace.rs +++ b/ingester2/src/buffer_tree/namespace.rs @@ -9,12 +9,18 @@ use data_types::{NamespaceId, TableId}; use dml::DmlOperation; use metric::U64Counter; use observability_deps::tracing::warn; +use trace::span::Span; use super::{ partition::resolver::PartitionProvider, table::{name_resolver::TableNameProvider, TableData}, }; -use crate::{arcmap::ArcMap, deferred_load::DeferredLoad, dml_sink::DmlSink}; +use crate::{ + arcmap::ArcMap, + deferred_load::DeferredLoad, + dml_sink::DmlSink, + query::{response::QueryResponse, tracing::QueryExecTracing, QueryError, QueryExec}, +}; /// The string name / identifier of a Namespace. /// @@ -106,11 +112,6 @@ impl NamespaceData { self.namespace_id } - #[cfg(test)] - pub(super) fn table_count(&self) -> &U64Counter { - &self.table_count - } - /// Returns the [`NamespaceName`] for this namespace. pub(crate) fn namespace_name(&self) -> &DeferredLoad { &self.namespace_name @@ -168,6 +169,37 @@ impl DmlSink for NamespaceData { } } +#[async_trait] +impl QueryExec for NamespaceData { + type Response = QueryResponse; + + async fn query_exec( + &self, + namespace_id: NamespaceId, + table_id: TableId, + columns: Vec, + span: Option, + ) -> Result { + assert_eq!( + self.namespace_id, namespace_id, + "buffer tree index inconsistency" + ); + + // Extract the table if it exists. + let inner = self + .table(table_id) + .ok_or(QueryError::TableNotFound(namespace_id, table_id))?; + + // Delegate query execution to the namespace, wrapping the execution in + // a tracing delegate to emit a child span. 
+ Ok(QueryResponse::new( + QueryExecTracing::new(inner, "table") + .query_exec(namespace_id, table_id, columns, span) + .await?, + )) + } +} + #[cfg(test)] mod tests { use std::{sync::Arc, time::Duration}; diff --git a/ingester2/src/buffer_tree/root.rs b/ingester2/src/buffer_tree/root.rs index 773a994ca6..076c196f8d 100644 --- a/ingester2/src/buffer_tree/root.rs +++ b/ingester2/src/buffer_tree/root.rs @@ -1,17 +1,71 @@ use std::sync::Arc; use async_trait::async_trait; -use data_types::NamespaceId; +use data_types::{NamespaceId, TableId}; use dml::DmlOperation; use metric::U64Counter; +use trace::span::Span; use super::{ namespace::{name_resolver::NamespaceNameProvider, NamespaceData}, partition::resolver::PartitionProvider, table::name_resolver::TableNameProvider, }; -use crate::{arcmap::ArcMap, dml_sink::DmlSink}; +use crate::{ + arcmap::ArcMap, + dml_sink::DmlSink, + query::{response::QueryResponse, tracing::QueryExecTracing, QueryError, QueryExec}, +}; +/// A [`BufferTree`] is the root of an in-memory tree of many [`NamespaceData`] +/// containing one or more child [`TableData`] nodes, which in turn contain one +/// or more [`PartitionData`] nodes: +/// +/// ```text +/// +/// ╔════════════════╗ +/// ║ BufferTree ║ +/// ╚═══════╦════════╝ +/// ▼ +/// ┌────────────┐ +/// │ Namespace ├┐ +/// └┬───────────┘├┐ +/// └┬───────────┘│ +/// └────┬───────┘ +/// ▼ +/// ┌────────────┐ +/// │ Table ├┐ +/// └┬───────────┘├┐ +/// └┬───────────┘│ +/// └────┬───────┘ +/// ▼ +/// ┌────────────┐ +/// │ Partition ├┐ +/// └┬───────────┘├┐ +/// └┬───────────┘│ +/// └────────────┘ +/// ``` +/// +/// A buffer tree is a mutable data structure that implements [`DmlSink`] to +/// apply successive [`DmlOperation`] to its internal state, and makes the +/// materialised result available through a streaming [`QueryExec`] execution. +/// +/// # Read Consistency +/// +/// When [`BufferTree::query_exec()`] is called for a given table, a snapshot of +/// the table's current set of partitions is created and the data within these +/// partitions will be streamed to the client as they consume the response. New +/// partitions that are created concurrently to the query execution do not ever +/// become visible. +/// +/// Concurrent writes during query execution to a partition that forms part of +/// this snapshot will be visible iff the write has been fully applied to the +/// partition's data buffer before the query stream reads the data from that +/// partition. Once a partition has been read, the data within it is immutable +/// from the caller's perspective, and subsequent writes DO NOT become visible. +/// +/// [`TableData`]: crate::buffer_tree::table::TableData +/// [`PartitionData`]: crate::buffer_tree::partition::PartitionData #[derive(Debug)] pub(crate) struct BufferTree { /// The resolver of `(table_id, partition_key)` to [`PartitionData`]. @@ -95,3 +149,701 @@ impl DmlSink for BufferTree { namespace_data.apply(op).await } } + +#[async_trait] +impl QueryExec for BufferTree { + type Response = QueryResponse; + + async fn query_exec( + &self, + namespace_id: NamespaceId, + table_id: TableId, + columns: Vec, + span: Option, + ) -> Result { + // Extract the namespace if it exists. + let inner = self + .namespace(namespace_id) + .ok_or(QueryError::NamespaceNotFound(namespace_id))?; + + // Delegate query execution to the namespace, wrapping the execution in + // a tracing delegate to emit a child span. 
+ QueryExecTracing::new(inner, "namespace") + .query_exec(namespace_id, table_id, columns, span) + .await + } +} + +#[cfg(test)] +mod tests { + use std::{sync::Arc, time::Duration}; + + use data_types::{PartitionId, PartitionKey}; + use datafusion::{assert_batches_eq, assert_batches_sorted_eq}; + use futures::{StreamExt, TryStreamExt}; + use metric::{Attributes, Metric}; + + use super::*; + use crate::{ + buffer_tree::{ + namespace::{name_resolver::mock::MockNamespaceNameProvider, NamespaceData}, + partition::{resolver::mock::MockPartitionProvider, PartitionData, SortKeyState}, + table::{name_resolver::mock::MockTableNameProvider, TableName}, + }, + deferred_load::{self, DeferredLoad}, + query::partition_response::PartitionResponse, + test_util::make_write_op, + }; + + const TABLE_ID: TableId = TableId::new(44); + const TABLE_NAME: &str = "bananas"; + const NAMESPACE_NAME: &str = "platanos"; + const NAMESPACE_ID: NamespaceId = NamespaceId::new(42); + + #[tokio::test] + async fn test_namespace_init_table() { + let metrics = Arc::new(metric::Registry::default()); + + // Configure the mock partition provider to return a partition for this + // table ID. + let partition_provider = Arc::new(MockPartitionProvider::default().with_partition( + PartitionData::new( + PartitionId::new(0), + PartitionKey::from("banana-split"), + NAMESPACE_ID, + TABLE_ID, + Arc::new(DeferredLoad::new(Duration::from_secs(1), async { + TableName::from(TABLE_NAME) + })), + SortKeyState::Provided(None), + None, + ), + )); + + // Init the namespace + let ns = NamespaceData::new( + NAMESPACE_ID, + DeferredLoad::new(Duration::from_millis(1), async { NAMESPACE_NAME.into() }), + Arc::new(MockTableNameProvider::new(TABLE_NAME)), + partition_provider, + &metrics, + ); + + // Assert the namespace name was stored + let name = ns.namespace_name().to_string(); + assert!( + (name == NAMESPACE_NAME) || (name == deferred_load::UNRESOLVED_DISPLAY_STRING), + "unexpected namespace name: {name}" + ); + + // Assert the namespace does not contain the test data + assert!(ns.table(TABLE_ID).is_none()); + + // Write some test data + ns.apply(DmlOperation::Write(make_write_op( + &PartitionKey::from("banana-split"), + NAMESPACE_ID, + TABLE_NAME, + TABLE_ID, + 0, + r#"bananas,city=Madrid day="sun",temp=55 22"#, + ))) + .await + .expect("buffer op should succeed"); + + // Referencing the table should succeed + assert!(ns.table(TABLE_ID).is_some()); + + // And the table counter metric should increase + let tables = metrics + .get_instrument::>("ingester_tables") + .expect("failed to read metric") + .get_observer(&Attributes::from([])) + .expect("failed to get observer") + .fetch(); + assert_eq!(tables, 1); + + // Ensure the deferred namespace name is loaded. + let name = ns.namespace_name().get().await; + assert_eq!(&**name, NAMESPACE_NAME); + assert_eq!(ns.namespace_name().to_string(), NAMESPACE_NAME); + } + + /// Generate a test that performs a set of writes and assert the data within + /// the table with TABLE_ID in the namespace with NAMESPACE_ID. + macro_rules! test_write_query { + ( + $name:ident, + partitions = [$($partition:expr), +], // The set of PartitionData for the mock partition provider + writes = [$($write:expr), *], // The set of DmlWrite to apply() + want = $want:expr // The expected results of querying NAMESPACE_ID and TABLE_ID + ) => { + paste::paste! { + #[tokio::test] + async fn []() { + // Configure the mock partition provider with the provided + // partitions. 
+ let partition_provider = Arc::new(MockPartitionProvider::default() + $( + .with_partition($partition) + )+ + ); + + // Init the buffer tree + let buf = BufferTree::new( + Arc::new(MockNamespaceNameProvider::default()), + Arc::new(MockTableNameProvider::new(TABLE_NAME)), + partition_provider, + Arc::new(metric::Registry::default()), + ); + + // Write the provided DmlWrites + $( + buf.apply(DmlOperation::Write($write)) + .await + .expect("failed to perform write"); + )* + + // Execute the query against NAMESPACE_ID and TABLE_ID + let batches = buf + .query_exec(NAMESPACE_ID, TABLE_ID, vec![], None) + .await + .expect("query should succeed") + .into_record_batches() + .try_collect::>() + .await + .expect("query failed"); + + // Assert the contents of NAMESPACE_ID and TABLE_ID + assert_batches_sorted_eq!( + $want, + &batches + ); + } + } + }; + } + + // A simple "read your writes" test. + test_write_query!( + read_writes, + partitions = [PartitionData::new( + PartitionId::new(0), + PartitionKey::from("p1"), + NAMESPACE_ID, + TABLE_ID, + Arc::new(DeferredLoad::new(Duration::from_secs(1), async { + TableName::from(TABLE_NAME) + })), + SortKeyState::Provided(None), + None, + )], + writes = [make_write_op( + &PartitionKey::from("p1"), + NAMESPACE_ID, + TABLE_NAME, + TABLE_ID, + 0, + r#"bananas,region=Asturias temp=35 4242424242"#, + )], + want = [ + "+----------+------+-------------------------------+", + "| region | temp | time |", + "+----------+------+-------------------------------+", + "| Asturias | 35 | 1970-01-01T00:00:04.242424242 |", + "+----------+------+-------------------------------+", + ] + ); + + // A query that ensures the data across multiple partitions within a single + // table are returned. + test_write_query!( + multiple_partitions, + partitions = [ + PartitionData::new( + PartitionId::new(0), + PartitionKey::from("p1"), + NAMESPACE_ID, + TABLE_ID, + Arc::new(DeferredLoad::new(Duration::from_secs(1), async { + TableName::from(TABLE_NAME) + })), + SortKeyState::Provided(None), + None, + ), + PartitionData::new( + PartitionId::new(1), + PartitionKey::from("p2"), + NAMESPACE_ID, + TABLE_ID, + Arc::new(DeferredLoad::new(Duration::from_secs(1), async { + TableName::from(TABLE_NAME) + })), + SortKeyState::Provided(None), + None, + ) + ], + writes = [ + make_write_op( + &PartitionKey::from("p1"), + NAMESPACE_ID, + TABLE_NAME, + TABLE_ID, + 0, + r#"bananas,region=Madrid temp=35 4242424242"#, + ), + make_write_op( + &PartitionKey::from("p2"), + NAMESPACE_ID, + TABLE_NAME, + TABLE_ID, + 0, + r#"bananas,region=Asturias temp=25 4242424242"#, + ) + ], + want = [ + "+----------+------+-------------------------------+", + "| region | temp | time |", + "+----------+------+-------------------------------+", + "| Madrid | 35 | 1970-01-01T00:00:04.242424242 |", + "| Asturias | 25 | 1970-01-01T00:00:04.242424242 |", + "+----------+------+-------------------------------+", + ] + ); + + // A query that ensures the data across multiple namespaces is correctly + // filtered to return only the queried table. + test_write_query!( + filter_multiple_namespaces, + partitions = [ + PartitionData::new( + PartitionId::new(0), + PartitionKey::from("p1"), + NAMESPACE_ID, + TABLE_ID, + Arc::new(DeferredLoad::new(Duration::from_secs(1), async { + TableName::from(TABLE_NAME) + })), + SortKeyState::Provided(None), + None, + ), + PartitionData::new( + PartitionId::new(1), + PartitionKey::from("p2"), + NamespaceId::new(4321), // A different namespace ID. + TableId::new(1234), // A different table ID. 
+ Arc::new(DeferredLoad::new(Duration::from_secs(1), async { + TableName::from(TABLE_NAME) + })), + SortKeyState::Provided(None), + None, + ) + ], + writes = [ + make_write_op( + &PartitionKey::from("p1"), + NAMESPACE_ID, + TABLE_NAME, + TABLE_ID, + 0, + r#"bananas,region=Madrid temp=25 4242424242"#, + ), + make_write_op( + &PartitionKey::from("p2"), + NamespaceId::new(4321), // A different namespace ID. + TABLE_NAME, + TableId::new(1234), // A different table ID + 0, + r#"bananas,region=Asturias temp=35 4242424242"#, + ) + ], + want = [ + "+--------+------+-------------------------------+", + "| region | temp | time |", + "+--------+------+-------------------------------+", + "| Madrid | 25 | 1970-01-01T00:00:04.242424242 |", + "+--------+------+-------------------------------+", + ] + ); + + // A query that ensures the data across multiple tables (with the same table + // name!) is correctly filtered to return only the queried table. + test_write_query!( + filter_multiple_tabls, + partitions = [ + PartitionData::new( + PartitionId::new(0), + PartitionKey::from("p1"), + NAMESPACE_ID, + TABLE_ID, + Arc::new(DeferredLoad::new(Duration::from_secs(1), async { + TableName::from(TABLE_NAME) + })), + SortKeyState::Provided(None), + None, + ), + PartitionData::new( + PartitionId::new(1), + PartitionKey::from("p2"), + NAMESPACE_ID, + TableId::new(1234), // A different table ID. + Arc::new(DeferredLoad::new(Duration::from_secs(1), async { + TableName::from(TABLE_NAME) + })), + SortKeyState::Provided(None), + None, + ) + ], + writes = [ + make_write_op( + &PartitionKey::from("p1"), + NAMESPACE_ID, + TABLE_NAME, + TABLE_ID, + 0, + r#"bananas,region=Madrid temp=25 4242424242"#, + ), + make_write_op( + &PartitionKey::from("p2"), + NAMESPACE_ID, + TABLE_NAME, + TableId::new(1234), // A different table ID + 0, + r#"bananas,region=Asturias temp=35 4242424242"#, + ) + ], + want = [ + "+--------+------+-------------------------------+", + "| region | temp | time |", + "+--------+------+-------------------------------+", + "| Madrid | 25 | 1970-01-01T00:00:04.242424242 |", + "+--------+------+-------------------------------+", + ] + ); + + // Assert that no dedupe operations are performed when querying a partition + // that contains duplicate rows for a single series/primary key, but the + // operations maintain their ordering (later writes appear after earlier + // writes). + test_write_query!( + duplicate_writes, + partitions = [PartitionData::new( + PartitionId::new(0), + PartitionKey::from("p1"), + NAMESPACE_ID, + TABLE_ID, + Arc::new(DeferredLoad::new(Duration::from_secs(1), async { + TableName::from(TABLE_NAME) + })), + SortKeyState::Provided(None), + None, + )], + writes = [ + make_write_op( + &PartitionKey::from("p1"), + NAMESPACE_ID, + TABLE_NAME, + TABLE_ID, + 0, + r#"bananas,region=Asturias temp=35 4242424242"#, + ), + make_write_op( + &PartitionKey::from("p1"), + NAMESPACE_ID, + TABLE_NAME, + TABLE_ID, + 1, + r#"bananas,region=Asturias temp=12 4242424242"#, + ) + ], + want = [ + "+----------+------+-------------------------------+", + "| region | temp | time |", + "+----------+------+-------------------------------+", + "| Asturias | 35 | 1970-01-01T00:00:04.242424242 |", + "| Asturias | 12 | 1970-01-01T00:00:04.242424242 |", + "+----------+------+-------------------------------+", + ] + ); + + /// Assert that multiple writes to a single namespace/table results in a + /// single namespace being created, and matching metrics. 
+ #[tokio::test] + async fn test_metrics() { + // Configure the mock partition provider to return a single partition, named + // p1. + let partition_provider = Arc::new( + MockPartitionProvider::default() + .with_partition(PartitionData::new( + PartitionId::new(0), + PartitionKey::from("p1"), + NAMESPACE_ID, + TABLE_ID, + Arc::new(DeferredLoad::new(Duration::from_secs(1), async { + TableName::from(TABLE_NAME) + })), + SortKeyState::Provided(None), + None, + )) + .with_partition(PartitionData::new( + PartitionId::new(0), + PartitionKey::from("p2"), + NAMESPACE_ID, + TABLE_ID, + Arc::new(DeferredLoad::new(Duration::from_secs(1), async { + TableName::from(TABLE_NAME) + })), + SortKeyState::Provided(None), + None, + )), + ); + + let metrics = Arc::new(metric::Registry::default()); + + // Init the buffer tree + let buf = BufferTree::new( + Arc::new(MockNamespaceNameProvider::default()), + Arc::new(MockTableNameProvider::new(TABLE_NAME)), + partition_provider, + Arc::clone(&metrics), + ); + + // Write data to partition p1, in table "bananas". + buf.apply(DmlOperation::Write(make_write_op( + &PartitionKey::from("p1"), + NAMESPACE_ID, + TABLE_NAME, + TABLE_ID, + 0, + r#"bananas,region=Asturias temp=35 4242424242"#, + ))) + .await + .expect("failed to write initial data"); + + // Write a duplicate record with the same series key & timestamp, but a + // different temp value. + buf.apply(DmlOperation::Write(make_write_op( + &PartitionKey::from("p2"), + NAMESPACE_ID, + TABLE_NAME, + TABLE_ID, + 1, + r#"bananas,region=Asturias temp=12 4242424242"#, + ))) + .await + .expect("failed to overwrite data"); + + // Validate namespace count + assert_eq!(buf.namespaces.values().len(), 1); + let m = metrics + .get_instrument::>("ingester_namespaces") + .expect("failed to read metric") + .get_observer(&Attributes::from(&[])) + .expect("failed to find metric with attributes") + .fetch(); + assert_eq!(m, 1, "namespace counter mismatch"); + + // Validate table count + let m = metrics + .get_instrument::>("ingester_tables") + .expect("failed to read metric") + .get_observer(&Attributes::from(&[])) + .expect("failed to find metric with attributes") + .fetch(); + assert_eq!(m, 1, "tables counter mismatch"); + } + + /// Assert the correct "not found" errors are generated for missing + /// table/namespaces, and that querying an entirely empty buffer tree + /// returns no data (as opposed to panicking, etc). + #[tokio::test] + async fn test_not_found() { + let partition_provider = Arc::new(MockPartitionProvider::default().with_partition( + PartitionData::new( + PartitionId::new(0), + PartitionKey::from("p1"), + NAMESPACE_ID, + TABLE_ID, + Arc::new(DeferredLoad::new(Duration::from_secs(1), async { + TableName::from(TABLE_NAME) + })), + SortKeyState::Provided(None), + None, + ), + )); + + // Init the BufferTree + let buf = BufferTree::new( + Arc::new(MockNamespaceNameProvider::default()), + Arc::new(MockTableNameProvider::new(TABLE_NAME)), + partition_provider, + Arc::new(metric::Registry::default()), + ); + + // Query the empty tree + let err = buf + .query_exec(NAMESPACE_ID, TABLE_ID, vec![], None) + .await + .expect_err("query should fail"); + assert_matches::assert_matches!(err, QueryError::NamespaceNotFound(ns) => { + assert_eq!(ns, NAMESPACE_ID); + }); + + // Write data to partition p1, in table "bananas". 
+ buf.apply(DmlOperation::Write(make_write_op( + &PartitionKey::from("p1"), + NAMESPACE_ID, + TABLE_NAME, + TABLE_ID, + 0, + r#"bananas,region=Asturias temp=35 4242424242"#, + ))) + .await + .expect("failed to write data"); + + // Ensure an unknown table errors + let err = buf + .query_exec(NAMESPACE_ID, TableId::new(1234), vec![], None) + .await + .expect_err("query should fail"); + assert_matches::assert_matches!(err, QueryError::TableNotFound(ns, t) => { + assert_eq!(ns, NAMESPACE_ID); + assert_eq!(t, TableId::new(1234)); + }); + + // Ensure a valid namespace / table does not error + buf.query_exec(NAMESPACE_ID, TABLE_ID, vec![], None) + .await + .expect("namespace / table should exist"); + } + + /// This test asserts the read consistency properties defined in the + /// [`BufferTree`] type docs. + /// + /// Specifically, this test ensures: + /// + /// * A read snapshot of the set of partitions is created during the + /// construction of the query stream. New partitions added (or existing + /// partitions removed) do not change the query results once the stream + /// has been initialised. + /// * Concurrent writes to partitions that form part of the read snapshot + /// become visible if they are ordered/applied before the acquisition of + /// the partition data by the query stream. Writes ordered after the + /// partition lock acquisition do not become readable. + /// + /// All writes use the same write timestamp as it is not a factor in + /// ordering of writes. + #[tokio::test] + async fn test_read_consistency() { + // Configure the mock partition provider to return two partitions, named + // p1 and p2. + let partition_provider = Arc::new( + MockPartitionProvider::default() + .with_partition(PartitionData::new( + PartitionId::new(0), + PartitionKey::from("p1"), + NAMESPACE_ID, + TABLE_ID, + Arc::new(DeferredLoad::new(Duration::from_secs(1), async { + TableName::from(TABLE_NAME) + })), + SortKeyState::Provided(None), + None, + )) + .with_partition(PartitionData::new( + PartitionId::new(1), + PartitionKey::from("p2"), + NAMESPACE_ID, + TABLE_ID, + Arc::new(DeferredLoad::new(Duration::from_secs(1), async { + TableName::from(TABLE_NAME) + })), + SortKeyState::Provided(None), + None, + )), + ); + + // Init the buffer tree + let buf = BufferTree::new( + Arc::new(MockNamespaceNameProvider::default()), + Arc::new(MockTableNameProvider::new(TABLE_NAME)), + partition_provider, + Arc::new(metric::Registry::default()), + ); + + // Write data to partition p1, in table "bananas". + buf.apply(DmlOperation::Write(make_write_op( + &PartitionKey::from("p1"), + NAMESPACE_ID, + TABLE_NAME, + TABLE_ID, + 0, + r#"bananas,region=Madrid temp=35 4242424242"#, + ))) + .await + .expect("failed to write initial data"); + + // Execute a query of the buffer tree, generating the result stream, but + // DO NOT consume it. + let stream = buf + .query_exec(NAMESPACE_ID, TABLE_ID, vec![], None) + .await + .expect("query should succeed") + .into_partition_stream(); + + // Perform a write concurrent to the consumption of the query stream + // that creates a new partition (p2) in the same table. + buf.apply(DmlOperation::Write(make_write_op( + &PartitionKey::from("p2"), + NAMESPACE_ID, + TABLE_NAME, + TABLE_ID, + 1, + r#"bananas,region=Asturias temp=20 4242424242"#, + ))) + .await + .expect("failed to perform concurrent write to new partition"); + + // Perform another write that hits the partition within the query + // results snapshot (p1) before the partition is read. 
+ buf.apply(DmlOperation::Write(make_write_op( + &PartitionKey::from("p1"), + NAMESPACE_ID, + TABLE_NAME, + TABLE_ID, + 2, + r#"bananas,region=Murcia temp=30 4242424242"#, + ))) + .await + .expect("failed to perform concurrent write to existing partition"); + + // Consume the set of partitions within the query stream. + // + // Under the specified query consistency guarantees, both the first and + // third writes (both to p1) should be visible. The second write to p2 + // should not be visible. + let mut partitions: Vec = stream.collect().await; + assert_eq!(partitions.len(), 1); // only p1, not p2 + let partition = partitions.pop().unwrap(); + + // Perform the partition read + let batches = + datafusion::physical_plan::common::collect(partition.into_record_batch_stream()) + .await + .expect("failed to collate query results"); + + // Assert the contents of p1 contains both the initial write, and the + // 3rd write in a single RecordBatch. + assert_batches_eq!( + [ + "+--------+------+-------------------------------+", + "| region | temp | time |", + "+--------+------+-------------------------------+", + "| Madrid | 35 | 1970-01-01T00:00:04.242424242 |", + "| Murcia | 30 | 1970-01-01T00:00:04.242424242 |", + "+--------+------+-------------------------------+", + ], + &batches + ); + } +} diff --git a/ingester2/src/buffer_tree/table.rs b/ingester2/src/buffer_tree/table.rs index 9e24c27615..244ea7b785 100644 --- a/ingester2/src/buffer_tree/table.rs +++ b/ingester2/src/buffer_tree/table.rs @@ -4,12 +4,22 @@ pub(crate) mod name_resolver; use std::sync::Arc; +use async_trait::async_trait; use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, TableId}; +use datafusion_util::MemoryStream; use mutable_batch::MutableBatch; use parking_lot::{Mutex, RwLock}; +use schema::Projection; +use trace::span::{Span, SpanRecorder}; use super::partition::{resolver::PartitionProvider, PartitionData}; -use crate::{arcmap::ArcMap, deferred_load::DeferredLoad}; +use crate::{ + arcmap::ArcMap, + deferred_load::DeferredLoad, + query::{ + partition_response::PartitionResponse, response::PartitionStream, QueryError, QueryExec, + }, +}; /// A double-referenced map where [`PartitionData`] can be looked up by /// [`PartitionKey`], or ID. @@ -172,10 +182,7 @@ impl TableData { } /// Return the [`PartitionData`] for the specified ID. - pub(crate) fn get_partition( - &self, - partition_id: PartitionId, - ) -> Option>> { + pub(crate) fn partition(&self, partition_id: PartitionId) -> Option>> { self.partition_data.read().by_id(partition_id) } @@ -203,6 +210,57 @@ impl TableData { } } +#[async_trait] +impl QueryExec for TableData { + type Response = PartitionStream; + + async fn query_exec( + &self, + namespace_id: NamespaceId, + table_id: TableId, + columns: Vec, + span: Option, + ) -> Result { + assert_eq!(self.table_id, table_id, "buffer tree index inconsistency"); + assert_eq!( + self.namespace_id, namespace_id, + "buffer tree index inconsistency" + ); + + // Gather the partition data from all of the partitions in this table. + let partitions = self.partitions().into_iter().filter_map(move |p| { + let mut span = SpanRecorder::new(span.clone().map(|s| s.child("partition read"))); + + let (id, data) = { + let mut p = p.lock(); + (p.partition_id(), p.get_query_data()?) 
+ }; + assert_eq!(id, data.partition_id()); + + // Project the data if necessary + let columns = columns.iter().map(String::as_str).collect::>(); + let selection = if columns.is_empty() { + Projection::All + } else { + Projection::Some(columns.as_ref()) + }; + + let ret = PartitionResponse::new( + Box::pin(MemoryStream::new( + data.project_selection(selection).into_iter().collect(), + )), + id, + None, + ); + + span.ok("read partition data"); + Some(ret) + }); + + Ok(PartitionStream::new(futures::stream::iter(partitions))) + } +} + #[cfg(test)] mod tests { use std::{sync::Arc, time::Duration};